]> git.0d.be Git - empathy.git/blob - libempathy/empathy-tp-file.c
Added EmpathyFile and EmpathyTpFile objects. (Jonny Lamb)
[empathy.git] / libempathy / empathy-tp-file.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2007 Collabora Ltd.
4  *   @author: Xavier Claessens <xclaesse@gmail.com>
5  * Copyright (C) 2007 Marco Barisione <marco@barisione.org>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20  */
21
22 #include <config.h>
23
24 #include <string.h>
25 #include <unistd.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29
30 #include <glib/gi18n.h>
31
32 #include <gio/gio.h>
33 #include <gio/gunixinputstream.h>
34 #include <gio/gunixoutputstream.h>
35
36 #include <libtelepathy/tp-conn.h>
37 #include <libtelepathy/tp-helpers.h>
38 #include <libtelepathy/tp-props-iface.h>
39
40 #include <telepathy-glib/proxy-subclass.h>
41
42 #include "empathy-tp-file.h"
43 #include "empathy-file.h"
44 #include "empathy-tp-file-priv.h"
45 #include "empathy-contact-factory.h"
46 #include "empathy-marshal.h"
47 #include "empathy-time.h"
48 #include "empathy-utils.h"
49
50 #define DEBUG_FLAG EMPATHY_DEBUG_FT
51 #include "empathy-debug.h"
52
53 /**
54  * SECTION:empathy-tp-file
55  * @short_description: File channel
56  * @see_also: #EmpathyFile, #EmpathyContact, empathy_send_file()
57  * @include: libempathy/empathy-tp-file.h
58  *
59  * The #EmpathyTpFile object represents a Telepathy file channel.
60  */
61
62 static void      empathy_tp_file_class_init           (EmpathyTpFileClass      *klass);
63 static void      empathy_tp_file_init                 (EmpathyTpFile           *tp_file);
64 static void      tp_file_finalize                     (GObject               *object);
65 static GObject * tp_file_constructor                  (GType                  type,
66                  guint                  n_props,
67                  GObjectConstructParam *props);
68 static void      tp_file_get_property                 (GObject               *object,
69                  guint                  param_id,
70                  GValue                *value,
71                  GParamSpec            *pspec);
72 static void      tp_file_set_property                 (GObject               *object,
73                  guint                  param_id,
74                  const GValue          *value,
75                  GParamSpec            *pspec);
76 static void      tp_file_destroy_cb                   (TpChannel             *file_chan,
77                  EmpathyTpFile           *tp_file);
78 static void      tp_file_closed_cb                    (TpChannel             *file_chan,
79                  EmpathyTpFile           *tp_file,
80                  GObject                 *weak_object);
81 static void      tp_file_state_changed_cb             (DBusGProxy            *file_iface,
82                  guint                  state,
83                  guint                  reason,
84                  EmpathyTpFile           *tp_file);
85 static void      tp_file_transferred_bytes_changed_cb (DBusGProxy            *file_iface,
86                  guint64                transferred_bytes,
87                  EmpathyTpFile           *tp_file);
88 static void      copy_stream                        (GInputStream          *in,
89                  GOutputStream         *out,
90                  GCancellable          *cancellable);
91
92 /* EmpathyTpFile object */
93
94 #define GET_PRIV(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), \
95            EMPATHY_TYPE_TP_FILE, EmpathyTpFilePriv))
96
97 typedef struct _EmpathyTpFilePriv  EmpathyTpFilePriv;
98
99 struct _EmpathyTpFilePriv {
100   EmpathyContactFactory *factory;
101   McAccount             *account;
102   gchar                 *id;
103   MissionControl        *mc;
104
105   TpChannel             *channel;
106
107   /* Previously in File/FT struct */
108   EmpathyFile                            *cached_empathy_file;
109   EmpathyContact                         *contact;
110   GInputStream                           *in_stream;
111   GOutputStream                          *out_stream;
112   gchar                                  *filename;
113   EmpFileTransferDirection                direction;
114   EmpFileTransferState                    state;
115   EmpFileTransferStateChangeReason        state_change_reason;
116   guint64                                 size;
117   guint64                                 transferred_bytes;
118   gint64                                  start_time;
119   gchar                                  *unix_socket_path;
120   gchar                                  *content_md5;
121   gchar                                  *content_type;
122   gchar                                  *description;
123   GCancellable                           *cancellable;
124 };
125
/* GObject property identifiers; PROP_0 is the reserved "no property" id. */
enum
{
  PROP_0,
  PROP_ACCOUNT,       /* "account" */
  PROP_CHANNEL,       /* "channel" */
  PROP_STATE,         /* "state" */
  PROP_DIRECTION,     /* "direction" */
  PROP_FILENAME,      /* "filename" */
  PROP_SIZE,          /* "size" */
  PROP_CONTENT_TYPE,  /* "content-type" */
  PROP_CONTENT_MD5,   /* "content-md5" */
  PROP_IN_STREAM      /* "in-stream" */
};

G_DEFINE_TYPE (EmpathyTpFile, empathy_tp_file, G_TYPE_OBJECT);
140
141 static void
142 empathy_tp_file_class_init (EmpathyTpFileClass *klass)
143 {
144   GObjectClass *object_class = G_OBJECT_CLASS (klass);
145
146   object_class->finalize = tp_file_finalize;
147   object_class->constructor = tp_file_constructor;
148   object_class->get_property = tp_file_get_property;
149   object_class->set_property = tp_file_set_property;
150
151   /* Construct-only properties */
152   g_object_class_install_property (object_class,
153       PROP_ACCOUNT,
154       g_param_spec_object ("account",
155           "channel Account",
156           "The account associated with the channel",
157           MC_TYPE_ACCOUNT,
158           G_PARAM_READWRITE |
159           G_PARAM_CONSTRUCT_ONLY));
160
161   g_object_class_install_property (object_class,
162       PROP_CHANNEL,
163       g_param_spec_object ("channel",
164           "telepathy channel",
165           "The file transfer channel",
166           TP_TYPE_CHANNEL,
167           G_PARAM_READWRITE |
168           G_PARAM_CONSTRUCT_ONLY));
169
170   g_object_class_install_property (object_class,
171       PROP_STATE,
172       g_param_spec_uint ("state",
173           "state of the transfer",
174           "The file transfer state",
175           0,
176           G_MAXUINT,
177           G_MAXUINT,
178           G_PARAM_READWRITE |
179           G_PARAM_CONSTRUCT));
180
181   g_object_class_install_property (object_class,
182       PROP_DIRECTION,
183       g_param_spec_uint ("direction",
184           "direction of the transfer",
185           "The file transfer direction",
186           0,
187           G_MAXUINT,
188           G_MAXUINT,
189           G_PARAM_READWRITE |
190           G_PARAM_CONSTRUCT));
191
192   g_object_class_install_property (object_class,
193       PROP_FILENAME,
194       g_param_spec_string ("filename",
195           "name of the transfer",
196           "The file transfer filename",
197           "",
198           G_PARAM_READWRITE));
199
200   g_object_class_install_property (object_class,
201       PROP_SIZE,
202       g_param_spec_uint64 ("size",
203           "size of the file",
204           "The file transfer size",
205           0,
206           G_MAXUINT64,
207           G_MAXUINT64,
208           G_PARAM_READWRITE));
209
210   g_object_class_install_property (object_class,
211       PROP_CONTENT_TYPE,
212       g_param_spec_string ("content-type",
213           "file transfer content-type",
214           "The file transfer content-type",
215           "",
216           G_PARAM_READWRITE));
217
218   g_object_class_install_property (object_class,
219       PROP_CONTENT_MD5,
220       g_param_spec_string ("content-md5",
221           "file transfer md5sum",
222           "The md5 sum of the transfer's contents",
223           "",
224           G_PARAM_READWRITE));
225
226   g_object_class_install_property (object_class,
227       PROP_IN_STREAM,
228       g_param_spec_object ("in-stream",
229           "transfer input stream",
230           "The input stream for file transfer",
231           G_TYPE_INPUT_STREAM,
232           G_PARAM_READWRITE));
233
234   g_type_class_add_private (object_class, sizeof (EmpathyTpFilePriv));
235 }
236
237 static void
238 empathy_tp_file_init (EmpathyTpFile *tp_file)
239 {
240 }
241
242 static void
243 tp_file_finalize (GObject *object)
244 {
245   EmpathyTpFilePriv *priv;
246   EmpathyTpFile     *tp_file;
247
248   tp_file = EMPATHY_TP_FILE (object);
249   priv = GET_PRIV (tp_file);
250
251   if (priv->channel)
252     {
253       DEBUG ("Closing channel..");
254       g_signal_handlers_disconnect_by_func (priv->channel,
255           tp_file_destroy_cb, object);
256       tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
257       g_object_unref (priv->channel);
258     }
259
260   if (priv->factory)
261     {
262       g_object_unref (priv->factory);
263     }
264   if (priv->account)
265     {
266       g_object_unref (priv->account);
267     }
268   if (priv->mc)
269     {
270       g_object_unref (priv->mc);
271     }
272
273   g_free (priv->id);
274   g_free (priv->filename);
275   g_free (priv->unix_socket_path);
276   g_free (priv->description);
277   g_free (priv->content_md5);
278   g_free (priv->content_type);
279
280   if (priv->in_stream)
281     g_object_unref (priv->in_stream);
282
283   if (priv->out_stream)
284     g_object_unref (priv->out_stream);
285
286   if (priv->contact)
287     g_object_unref (priv->contact);
288
289   if (priv->cancellable)
290     g_object_unref (priv->cancellable);
291
292   G_OBJECT_CLASS (empathy_tp_file_parent_class)->finalize (object);
293 }
294
295 static GObject *
296 tp_file_constructor (GType                  type,
297        guint                  n_props,
298        GObjectConstructParam *props)
299 {
300   GObject         *tp_file;
301   EmpathyTpFilePriv *priv;
302   GError            *error = NULL;
303   GHashTable        *properties;
304
305   tp_file = G_OBJECT_CLASS (empathy_tp_file_parent_class)->constructor (type, n_props, props);
306
307   priv = GET_PRIV (tp_file);
308
309   priv->factory = empathy_contact_factory_new ();
310   priv->mc = empathy_mission_control_new ();
311
312   tp_cli_channel_connect_to_closed (priv->channel,
313                                     (tp_cli_channel_signal_callback_closed) tp_file_closed_cb,
314                                     tp_file,
315                                     NULL, NULL, NULL);
316
317   emp_cli_channel_type_file_connect_to_file_transfer_state_changed (TP_PROXY (priv->channel),
318                                                                     (emp_cli_channel_type_file_signal_callback_file_transfer_state_changed) tp_file_state_changed_cb,
319                                                                     tp_file,
320                                                                     NULL, NULL, NULL);
321
322   emp_cli_channel_type_file_connect_to_transferred_bytes_changed (TP_PROXY (priv->channel),
323                                                                   (emp_cli_channel_type_file_signal_callback_transferred_bytes_changed) tp_file_transferred_bytes_changed_cb,
324                                                                   tp_file,
325                                                                   NULL, NULL, NULL);
326
327   if (!tp_cli_dbus_properties_run_get_all (priv->channel,
328       -1, EMP_IFACE_CHANNEL_TYPE_FILE, &properties, &error, NULL))
329     {
330       DEBUG ("Failed to get properties: %s",
331           error ? error->message : "No error given");
332       g_clear_error (&error);
333       return NULL;
334     }
335
336   priv->size = g_value_get_uint64 (g_hash_table_lookup (properties, "Size"));
337
338   priv->state = g_value_get_uint (g_hash_table_lookup (properties, "State"));
339
340   /* Invalid reason, so empathy_file_get_state_change_reason() can give
341    * a warning if called for a not closed file transfer. */
342   priv->state_change_reason = -1;
343
344   priv->transferred_bytes = g_value_get_uint64 (g_hash_table_lookup (properties, "TransferredBytes"));
345
346   priv->filename = g_value_dup_string (g_hash_table_lookup (properties, "Filename"));
347
348   priv->content_md5 = g_value_dup_string (g_hash_table_lookup (properties, "ContentMD5"));
349
350   priv->description = g_value_dup_string (g_hash_table_lookup (properties, "Description"));
351
352   priv->unix_socket_path = g_value_dup_string (g_hash_table_lookup (properties, "SocketPath"));
353
354   priv->direction = g_value_get_uint (g_hash_table_lookup (properties, "Direction"));
355
356   g_hash_table_destroy (properties);
357
358   return tp_file;
359 }
360
361 static void
362 tp_file_get_property (GObject    *object,
363         guint       param_id,
364         GValue     *value,
365         GParamSpec *pspec)
366 {
367   EmpathyTpFilePriv *priv;
368   EmpathyTpFile     *tp_file;
369
370   priv = GET_PRIV (object);
371   tp_file = EMPATHY_TP_FILE (object);
372
373   switch (param_id)
374     {
375       case PROP_ACCOUNT:
376         g_value_set_object (value, priv->account);
377         break;
378       case PROP_CHANNEL:
379         g_value_set_object (value, priv->channel);
380         break;
381       default:
382         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
383         break;
384     };
385 }
386
387 static void
388 tp_file_channel_set_dbus_property (gpointer proxy,
389                                    const gchar *property,
390                                    const GValue *value)
391 {
392         DEBUG ("Setting %s property", property);
393         tp_cli_dbus_properties_run_set (TP_PROXY (proxy),
394                                         -1,
395                                         EMP_IFACE_CHANNEL_TYPE_FILE,
396                                         property,
397                                         value,
398                                         NULL,
399                                         NULL);
400         DEBUG ("done");
401 }
402
403
404 static void
405 tp_file_set_property (GObject      *object,
406         guint         param_id,
407         const GValue *value,
408         GParamSpec   *pspec)
409 {
410   EmpathyTpFilePriv *priv;
411
412   priv = GET_PRIV (object);
413
414   switch (param_id)
415     {
416       case PROP_ACCOUNT:
417         priv->account = g_object_ref (g_value_get_object (value));
418         break;
419       case PROP_CHANNEL:
420         priv->channel = g_object_ref (g_value_get_object (value));
421         break;
422       case PROP_STATE:
423         priv->state = g_value_get_uint (value);
424         break;
425       case PROP_DIRECTION:
426         priv->direction = g_value_get_uint (value);
427         break;
428       case PROP_FILENAME:
429         g_free (priv->filename);
430         priv->filename = g_value_dup_string (value);
431         tp_file_channel_set_dbus_property (priv->channel, "Filename", value);
432         break;
433       case PROP_SIZE:
434         priv->size = g_value_get_uint64 (value);
435         tp_file_channel_set_dbus_property (priv->channel, "Size", value);
436         break;
437       case PROP_CONTENT_TYPE:
438         tp_file_channel_set_dbus_property (priv->channel, "ContentType", value);
439         g_free (priv->content_type);
440         priv->content_type = g_value_dup_string (value);
441         break;
442       case PROP_CONTENT_MD5:
443         tp_file_channel_set_dbus_property (priv->channel, "ContentMD5", value);
444         g_free (priv->content_md5);
445         priv->content_md5 = g_value_dup_string (value);
446         break;
447       case PROP_IN_STREAM:
448         if (priv->in_stream)
449           g_object_unref (priv->in_stream);
450         priv->in_stream = g_object_ref (g_value_get_object (value));
451         break;
452       default:
453         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
454         break;
455     };
456 }
457
458 /**
459  * empathy_tp_file_new:
460  * @account: the #McAccount for the channel
461  * @channel: a Telepathy channel
462  *
463  * Creates a new #EmpathyTpFile wrapping @channel.
464  *
465  * Returns: a new #EmpathyTpFile
466  */
467 EmpathyTpFile *
468 empathy_tp_file_new (McAccount *account,
469        TpChannel *channel)
470 {
471   return g_object_new (EMPATHY_TYPE_TP_FILE,
472            "account", account,
473            "channel", channel,
474            NULL);
475 }
476
477 /**
478  * empathy_tp_file_get_id:
479  * @tp_file: an #EmpathyTpFile
480  *
481  * Returns the ID of @tp_file.
482  *
483  * Returns: the ID
484  */
485 const gchar *
486 empathy_tp_file_get_id (EmpathyTpFile *tp_file)
487 {
488   EmpathyTpFilePriv *priv;
489
490   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
491
492   priv = GET_PRIV (tp_file);
493
494   return priv->id;
495 }
496
497 /**
498  * empathy_tp_file_get_channel
499  * @tp_file: an #EmpathyTpFile
500  *
501  * Returns the Telepathy file transfer channel
502  *
503  * Returns: the #TpChannel
504  */
505 TpChannel *
506 empathy_tp_file_get_channel (EmpathyTpFile *tp_file)
507 {
508   EmpathyTpFilePriv *priv;
509
510   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
511
512   priv = GET_PRIV (tp_file);
513
514   return priv->channel;
515 }
516
517 static void
518 tp_file_destroy_cb (TpChannel *file_channel, EmpathyTpFile *tp_file)
519 {
520   EmpathyTpFilePriv *priv;
521
522   priv = GET_PRIV (tp_file);
523
524   DEBUG ("Channel Closed or CM crashed");
525
526   g_object_unref  (priv->channel);
527   priv->channel = NULL;
528 }
529
530 static void
531 tp_file_closed_cb (TpChannel *file_channel, EmpathyTpFile *tp_file, GObject *weak_object)
532 {
533   EmpathyTpFilePriv *priv;
534
535   priv = GET_PRIV (tp_file);
536
537   /* The channel is closed, do just like if the proxy was destroyed */
538   g_signal_handlers_disconnect_by_func (priv->channel,
539                 tp_file_destroy_cb,
540                 tp_file);
541   tp_file_destroy_cb (file_channel, tp_file);
542 }
543
544 static gint64
545 get_time_msec (void)
546 {
547   GTimeVal tv;
548
549   g_get_current_time (&tv);
550   return ((gint64) tv.tv_sec) * 1000 + tv.tv_usec / 1000;
551 }
552
553 static gint
554 _get_local_socket (EmpathyTpFile *tp_file)
555 {
556   gint fd;
557   size_t path_len;
558   struct sockaddr_un addr;
559   EmpathyTpFilePriv *priv;
560
561   priv = GET_PRIV (tp_file);
562
563   if (priv->unix_socket_path == NULL)
564     return -1;
565
566   fd = socket (PF_UNIX, SOCK_STREAM, 0);
567   if (fd < 0)
568     return -1;
569
570   memset (&addr, 0, sizeof (addr));
571   addr.sun_family = AF_UNIX;
572   path_len = strlen (priv->unix_socket_path);
573   strncpy (addr.sun_path, priv->unix_socket_path, path_len);
574
575   if (connect (fd, (struct sockaddr*) &addr,
576       G_STRUCT_OFFSET (struct sockaddr_un, sun_path) + path_len) < 0)
577     {
578       close (fd);
579       return -1;
580     }
581
582   return fd;
583 }
584
585 static void
586 send_file (EmpathyTpFile *tp_file)
587 {
588   gint           socket_fd;
589   GOutputStream *socket_stream;
590   EmpathyTpFilePriv *priv;
591
592   priv = GET_PRIV (tp_file);
593
594   DEBUG ("Sending file content: filename=%s",
595            priv->filename);
596
597   g_return_if_fail (priv->in_stream);
598
599   socket_fd = _get_local_socket (tp_file);
600   if (socket_fd < 0)
601     {
602       DEBUG ("failed to get local socket fd");
603       return;
604     }
605   DEBUG ("got local socket fd");
606   socket_stream = g_unix_output_stream_new (socket_fd, TRUE);
607
608   priv->cancellable = g_cancellable_new ();
609
610   copy_stream (priv->in_stream, socket_stream, priv->cancellable);
611
612   g_object_unref (socket_stream);
613 }
614
615 static void
616 tp_file_state_changed_cb (DBusGProxy *file_iface,
617     EmpFileTransferState state,
618     EmpFileTransferStateChangeReason reason,
619     EmpathyTpFile *tp_file)
620 {
621   EmpathyTpFilePriv *priv;
622
623   priv = GET_PRIV (tp_file);
624
625   DEBUG ("File transfer state changed: filename=%s, "
626            "old state=%u, state=%u, reason=%u",
627            priv->filename, priv->state, state, reason);
628
629   if (state == EMP_FILE_TRANSFER_STATE_OPEN)
630     priv->start_time = get_time_msec ();
631
632   DEBUG ("state = %u, direction = %u, in_stream = %s",
633          state, priv->direction, priv->in_stream ? "present" : "not present");
634
635   if (state == EMP_FILE_TRANSFER_STATE_OPEN &&
636       priv->direction == EMP_FILE_TRANSFER_DIRECTION_OUTGOING &&
637       priv->in_stream)
638     send_file (tp_file);
639
640   priv->state = state;
641   priv->state_change_reason = reason;
642
643   g_object_notify (G_OBJECT (tp_file), "state");
644 }
645
646 static void
647 tp_file_transferred_bytes_changed_cb (DBusGProxy *file_iface,
648     guint64 transferred_bytes, EmpathyTpFile *tp_file)
649 {
650   EmpathyTpFilePriv *priv;
651
652   priv = GET_PRIV (tp_file);
653
654   if (priv->transferred_bytes == transferred_bytes)
655     return;
656
657   priv->transferred_bytes = transferred_bytes;
658
659   g_object_notify (G_OBJECT (tp_file), "transferred-bytes");
660 }
661
662 EmpathyContact *
663 _empathy_tp_file_get_contact (EmpathyTpFile *tp_file)
664 {
665   EmpathyTpFilePriv *priv;
666
667   priv = GET_PRIV (tp_file);
668
669   return priv->contact;
670 }
671
672 GInputStream *
673 _empathy_tp_file_get_input_stream (EmpathyTpFile *tp_file)
674 {
675   EmpathyTpFilePriv *priv;
676
677   priv = GET_PRIV (tp_file);
678
679   return priv->in_stream;
680 }
681
682 GOutputStream *
683 _empathy_tp_file_get_output_stream (EmpathyTpFile *tp_file)
684 {
685   EmpathyTpFilePriv *priv;
686
687   priv = GET_PRIV (tp_file);
688
689   return priv->out_stream;
690 }
691
692 const gchar *
693 _empathy_tp_file_get_filename (EmpathyTpFile *tp_file)
694 {
695   EmpathyTpFilePriv *priv;
696
697   priv = GET_PRIV (tp_file);
698
699   return priv->filename;
700 }
701
702 EmpFileTransferDirection
703 _empathy_tp_file_get_direction (EmpathyTpFile *tp_file)
704 {
705   EmpathyTpFilePriv *priv;
706
707   priv = GET_PRIV (tp_file);
708
709   return priv->direction;
710 }
711
712 EmpFileTransferState
713 _empathy_tp_file_get_state (EmpathyTpFile *tp_file)
714 {
715   EmpathyTpFilePriv *priv;
716
717   priv = GET_PRIV (tp_file);
718
719   return priv->state;
720 }
721
722 EmpFileTransferStateChangeReason
723 _empathy_tp_file_get_state_change_reason (EmpathyTpFile *tp_file)
724 {
725   EmpathyTpFilePriv *priv;
726
727   priv = GET_PRIV (tp_file);
728
729   g_return_val_if_fail (priv->state_change_reason >= 0,
730             EMP_FILE_TRANSFER_STATE_CHANGE_REASON_NONE);
731
732   return priv->state_change_reason;
733 }
734
735 guint64
736 _empathy_tp_file_get_size (EmpathyTpFile *tp_file)
737 {
738   EmpathyTpFilePriv *priv;
739
740   priv = GET_PRIV (tp_file);
741
742   return priv->size;
743 }
744
745 guint64
746 _empathy_tp_file_get_transferred_bytes (EmpathyTpFile *tp_file)
747 {
748   EmpathyTpFilePriv *priv;
749
750   priv = GET_PRIV (tp_file);
751
752   return priv->transferred_bytes;
753 }
754
755 gint
756 _empathy_tp_file_get_remaining_time (EmpathyTpFile *tp_file)
757 {
758   EmpathyTpFilePriv *priv;
759   gint64   curr_time, elapsed_time;
760   gdouble  time_per_byte;
761   gdouble  remaining_time;
762
763   priv = GET_PRIV (tp_file);
764
765   if (priv->size == EMPATHY_FILE_UNKNOWN_SIZE)
766     return -1;
767
768   if (priv->transferred_bytes == priv->size)
769     return 0;
770
771   curr_time = get_time_msec ();
772   elapsed_time = curr_time - priv->start_time;
773   time_per_byte = (gdouble) elapsed_time / (gdouble) priv->transferred_bytes;
774   remaining_time = (time_per_byte * (priv->size - priv->transferred_bytes)) / 1000;
775
776   return (gint) (remaining_time + 0.5);
777 }
778
779 void
780 _empathy_tp_file_set_input_stream (EmpathyTpFile *tp_file,
781     GInputStream *in_stream)
782 {
783   EmpathyTpFilePriv *priv;
784
785   priv = GET_PRIV (tp_file);
786
787   if (priv->in_stream == in_stream)
788     return;
789
790   if (priv->direction == EMP_FILE_TRANSFER_DIRECTION_INCOMING)
791     g_warning ("Setting an input stream for incoming file "
792          "transfers is useless");
793
794   if (priv->in_stream)
795     g_object_unref (priv->in_stream);
796   if (in_stream)
797     g_object_ref (in_stream);
798   priv->in_stream = in_stream;
799
800   g_object_notify (G_OBJECT (tp_file), "in-stream");
801 }
802
803 void
804 _empathy_tp_file_set_output_stream (EmpathyTpFile *tp_file,
805     GOutputStream *out_stream)
806 {
807   EmpathyTpFilePriv *priv;
808
809   priv = GET_PRIV (tp_file);
810
811   if (priv->out_stream == out_stream)
812     return;
813
814   if (priv->direction == EMP_FILE_TRANSFER_DIRECTION_OUTGOING)
815     g_warning ("Setting an output stream for outgoing file "
816          "transfers is useless");
817
818   if (priv->out_stream)
819     g_object_unref (priv->out_stream);
820   if (out_stream)
821     g_object_ref (out_stream);
822   priv->out_stream = out_stream;
823
824   g_object_notify (G_OBJECT (tp_file), "output-stream");
825 }
826
827 void
828 _empathy_tp_file_set_filename (EmpathyTpFile *tp_file,
829     const gchar *filename)
830 {
831   EmpathyTpFilePriv *priv;
832
833   priv = GET_PRIV (tp_file);
834
835   g_return_if_fail (filename != NULL);
836
837   if (priv->filename && strcmp (filename, priv->filename) == 0) {
838     return;
839   }
840
841   g_free (priv->filename);
842   priv->filename = g_strdup (filename);
843
844   g_object_notify (G_OBJECT (tp_file), "filename");
845 }
846
847 /* Functions to copy the content of a GInputStream to a GOutputStream */
848
849 #define N_BUFFERS 2
850 #define BUFFER_SIZE 4096
851
852 typedef struct {
853   GInputStream  *in;
854   GOutputStream *out;
855   GCancellable  *cancellable;
856   char          *buff[N_BUFFERS]; /* the temporary buffers */
857   gsize          count[N_BUFFERS]; /* how many bytes are used in the buffers */
858   gboolean       is_full[N_BUFFERS]; /* whether the buffers contain data */
859   gint           curr_read; /* index of the buffer used for reading */
860   gint           curr_write; /* index of the buffer used for writing */
861   gboolean       is_reading; /* we are reading */
862   gboolean       is_writing; /* we are writing */
863   guint          n_closed; /* number of streams that have been closed */
864 } CopyData;
865
866 static void schedule_next (CopyData *copy);
867
868 static void
869 free_copy_data_if_closed (CopyData *copy)
870 {
871   gint i;
872
873   /* Free the data only if both the input and output streams have
874    * been closed. */
875   copy->n_closed++;
876   if (copy->n_closed < 2)
877     return;
878
879   if (copy->in != NULL)
880     g_object_unref (copy->in);
881
882   if (copy->out != NULL)
883     g_object_unref (copy->out);
884
885   for (i = 0; i < N_BUFFERS; i++)
886     g_free (copy->buff[i]);
887
888   g_object_unref (copy->cancellable);
889   g_free (copy);
890 }
891
892 static void
893 io_error (CopyData *copy, GError *error)
894 {
895   g_cancellable_cancel (copy->cancellable);
896
897   if (error == NULL)
898     g_warning ("I/O error");
899   else if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_CANCELLED)
900     ; /* Ignore cancellations */
901   else
902     g_warning ("I/O error: %d: %s\n", error->code, error->message);
903
904   if (copy->in != NULL)
905     g_input_stream_close (copy->in, NULL, NULL);
906
907   if (copy->out != NULL)
908     g_output_stream_close (copy->out, NULL, NULL);
909
910   free_copy_data_if_closed (copy);
911 }
912
913 static void
914 close_done (GObject *source_object, GAsyncResult *res, gpointer user_data)
915 {
916   CopyData *copy = user_data;
917
918   g_object_unref (source_object);
919   free_copy_data_if_closed (copy);
920 }
921
922 static void
923 write_done_cb (GObject *source_object, GAsyncResult *res, gpointer user_data)
924 {
925   CopyData *copy = user_data;
926   gssize    count_write;
927   GError   *error = NULL;
928
929   count_write = g_output_stream_write_finish (copy->out, res, &error);
930
931   if (count_write <= 0) {
932     io_error (copy, error);
933     g_error_free (error);
934     return;
935   }
936
937   copy->is_full[copy->curr_write] = FALSE;
938   copy->curr_write = (copy->curr_write + 1) % N_BUFFERS;
939   copy->is_writing = FALSE;
940
941   schedule_next (copy);
942 }
943
944 static void
945 read_done_cb (GObject *source_object, GAsyncResult *res, gpointer user_data)
946 {
947   CopyData *copy = user_data;
948   gssize    count_read;
949   GError   *error = NULL;
950
951   count_read = g_input_stream_read_finish (copy->in, res, &error);
952
953   if (count_read == 0) {
954     g_input_stream_close_async (copy->in, 0, copy->cancellable,
955               close_done, copy);
956     copy->in = NULL;
957   }
958   else if (count_read < 0) {
959     io_error (copy, error);
960     g_error_free (error);
961     return;
962   }
963
964   copy->count[copy->curr_read] = count_read;
965   copy->is_full[copy->curr_read] = TRUE;
966   copy->curr_read = (copy->curr_read + 1) % N_BUFFERS;
967   copy->is_reading = FALSE;
968
969   schedule_next (copy);
970 }
971
972 static void
973 schedule_next (CopyData *copy)
974 {
975   if (copy->in != NULL &&
976       !copy->is_reading &&
977       !copy->is_full[copy->curr_read]) {
978     /* We are not reading and the current buffer is empty, so
979      * start an async read. */
980     copy->is_reading = TRUE;
981     g_input_stream_read_async (copy->in,
982              copy->buff[copy->curr_read],
983              BUFFER_SIZE, 0, copy->cancellable,
984              read_done_cb, copy);
985   }
986
987   if (!copy->is_writing &&
988       copy->is_full[copy->curr_write]) {
989     if (copy->count[copy->curr_write] == 0) {
990       /* The last read on the buffer read 0 bytes, this
991        * means that we got an EOF, so we can close
992        * the output channel. */
993       g_output_stream_close_async (copy->out, 0,
994                  copy->cancellable,
995                  close_done, copy);
996       copy->out = NULL;
997     } else {
998       /* We are not writing and the current buffer contains
999        * data, so start an async write. */
1000       copy->is_writing = TRUE;
1001       g_output_stream_write_async (copy->out,
1002                  copy->buff[copy->curr_write],
1003                  copy->count[copy->curr_write],
1004                  0, copy->cancellable,
1005                  write_done_cb, copy);
1006     }
1007   }
1008 }
1009
1010 static void
1011 copy_stream (GInputStream *in, GOutputStream *out, GCancellable *cancellable)
1012 {
1013   CopyData *copy;
1014   gint      i;
1015
1016   g_return_if_fail (in != NULL);
1017   g_return_if_fail (out != NULL);
1018
1019   copy = g_new0 (CopyData, 1);
1020   copy->in = g_object_ref (in);
1021   copy->out = g_object_ref (out);
1022
1023   if (cancellable != NULL)
1024     copy->cancellable = g_object_ref (cancellable);
1025   else
1026     copy->cancellable = g_cancellable_new ();
1027
1028   for (i = 0; i < N_BUFFERS; i++)
1029     copy->buff[i] = g_malloc (BUFFER_SIZE);
1030
1031   schedule_next (copy);
1032 }