]> git.0d.be Git - empathy.git/blob - libempathy/empathy-tp-file.c
Added a workaround to EmpathyTpFile's finalize function so it doesn't run g_object_un...
[empathy.git] / libempathy / empathy-tp-file.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2007 Collabora Ltd.
4  *   @author: Xavier Claessens <xclaesse@gmail.com>
5  * Copyright (C) 2007 Marco Barisione <marco@barisione.org>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20  */
21
22 #include <config.h>
23
24 #include <string.h>
25 #include <unistd.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29
30 #include <glib/gi18n.h>
31
32 #include <gio/gio.h>
33 #include <gio/gunixinputstream.h>
34 #include <gio/gunixoutputstream.h>
35
36 #include <libtelepathy/tp-conn.h>
37 #include <libtelepathy/tp-helpers.h>
38 #include <libtelepathy/tp-props-iface.h>
39
40 #include <telepathy-glib/proxy-subclass.h>
41
42 #include "empathy-tp-file.h"
43 #include "empathy-file.h"
44 #include "empathy-tp-file-priv.h"
45 #include "empathy-contact-factory.h"
46 #include "empathy-marshal.h"
47 #include "empathy-time.h"
48 #include "empathy-utils.h"
49
50 #define DEBUG_FLAG EMPATHY_DEBUG_FT
51 #include "empathy-debug.h"
52
53 /**
54  * SECTION:empathy-tp-file
55  * @short_description: File channel
56  * @see_also: #EmpathyFile, #EmpathyContact, empathy_send_file()
57  * @include: libempthy/empathy-tp-file.h
58  *
59  * The #EmpathyTpFile object represents a Telepathy file channel.
60  */
61
62 static void      empathy_tp_file_class_init           (EmpathyTpFileClass      *klass);
63 static void      empathy_tp_file_init                 (EmpathyTpFile           *tp_file);
64 static void      tp_file_finalize                     (GObject               *object);
65 static GObject * tp_file_constructor                  (GType                  type,
66                  guint                  n_props,
67                  GObjectConstructParam *props);
68 static void      tp_file_get_property                 (GObject               *object,
69                  guint                  param_id,
70                  GValue                *value,
71                  GParamSpec            *pspec);
72 static void      tp_file_set_property                 (GObject               *object,
73                  guint                  param_id,
74                  const GValue          *value,
75                  GParamSpec            *pspec);
76 static void      tp_file_destroy_cb                   (TpChannel             *file_chan,
77                  EmpathyTpFile           *tp_file);
78 static void      tp_file_closed_cb                    (TpChannel             *file_chan,
79                  EmpathyTpFile           *tp_file,
80                  GObject                 *weak_object);
81 static void      tp_file_state_changed_cb             (DBusGProxy            *file_iface,
82                  guint                  state,
83                  guint                  reason,
84                  EmpathyTpFile           *tp_file);
85 static void      tp_file_transferred_bytes_changed_cb (DBusGProxy            *file_iface,
86                  guint64                transferred_bytes,
87                  EmpathyTpFile           *tp_file);
88 static void      copy_stream                        (GInputStream          *in,
89                  GOutputStream         *out,
90                  GCancellable          *cancellable);
91
92 /* EmpathyTpFile object */
93
94 #define GET_PRIV(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), \
95            EMPATHY_TYPE_TP_FILE, EmpathyTpFilePriv))
96
97 typedef struct _EmpathyTpFilePriv  EmpathyTpFilePriv;
98
99 struct _EmpathyTpFilePriv {
100   EmpathyContactFactory *factory;
101   McAccount             *account;
102   gchar                 *id;
103   MissionControl        *mc;
104
105   TpChannel             *channel;
106
107   /* Previously in File/FT struct */
108   EmpathyFile                            *cached_empathy_file;
109   EmpathyContact                         *contact;
110   GInputStream                           *in_stream;
111   GOutputStream                          *out_stream;
112   gchar                                  *filename;
113   EmpFileTransferDirection                direction;
114   EmpFileTransferState                    state;
115   EmpFileTransferStateChangeReason        state_change_reason;
116   guint64                                 size;
117   guint64                                 transferred_bytes;
118   gint64                                  start_time;
119   gchar                                  *unix_socket_path;
120   gchar                                  *content_md5;
121   gchar                                  *content_type;
122   gchar                                  *description;
123   GCancellable                           *cancellable;
124 };
125
126 enum {
127   PROP_0,
128   PROP_ACCOUNT,
129   PROP_CHANNEL,
130   PROP_STATE,
131   PROP_DIRECTION,
132   PROP_FILENAME,
133   PROP_SIZE,
134   PROP_CONTENT_TYPE,
135   PROP_CONTENT_MD5,
136   PROP_IN_STREAM,
137 };
138
139 G_DEFINE_TYPE (EmpathyTpFile, empathy_tp_file, G_TYPE_OBJECT);
140
141 static void
142 empathy_tp_file_class_init (EmpathyTpFileClass *klass)
143 {
144   GObjectClass *object_class = G_OBJECT_CLASS (klass);
145
146   object_class->finalize = tp_file_finalize;
147   object_class->constructor = tp_file_constructor;
148   object_class->get_property = tp_file_get_property;
149   object_class->set_property = tp_file_set_property;
150
151   /* Construct-only properties */
152   g_object_class_install_property (object_class,
153       PROP_ACCOUNT,
154       g_param_spec_object ("account",
155           "channel Account",
156           "The account associated with the channel",
157           MC_TYPE_ACCOUNT,
158           G_PARAM_READWRITE |
159           G_PARAM_CONSTRUCT_ONLY));
160
161   g_object_class_install_property (object_class,
162       PROP_CHANNEL,
163       g_param_spec_object ("channel",
164           "telepathy channel",
165           "The file transfer channel",
166           TP_TYPE_CHANNEL,
167           G_PARAM_READWRITE |
168           G_PARAM_CONSTRUCT_ONLY));
169
170   g_object_class_install_property (object_class,
171       PROP_STATE,
172       g_param_spec_uint ("state",
173           "state of the transfer",
174           "The file transfer state",
175           0,
176           G_MAXUINT,
177           G_MAXUINT,
178           G_PARAM_READWRITE |
179           G_PARAM_CONSTRUCT));
180
181   g_object_class_install_property (object_class,
182       PROP_DIRECTION,
183       g_param_spec_uint ("direction",
184           "direction of the transfer",
185           "The file transfer direction",
186           0,
187           G_MAXUINT,
188           G_MAXUINT,
189           G_PARAM_READWRITE |
190           G_PARAM_CONSTRUCT));
191
192   g_object_class_install_property (object_class,
193       PROP_FILENAME,
194       g_param_spec_string ("filename",
195           "name of the transfer",
196           "The file transfer filename",
197           "",
198           G_PARAM_READWRITE));
199
200   g_object_class_install_property (object_class,
201       PROP_SIZE,
202       g_param_spec_uint64 ("size",
203           "size of the file",
204           "The file transfer size",
205           0,
206           G_MAXUINT64,
207           G_MAXUINT64,
208           G_PARAM_READWRITE));
209
210   g_object_class_install_property (object_class,
211       PROP_CONTENT_TYPE,
212       g_param_spec_string ("content-type",
213           "file transfer content-type",
214           "The file transfer content-type",
215           "",
216           G_PARAM_READWRITE));
217
218   g_object_class_install_property (object_class,
219       PROP_CONTENT_MD5,
220       g_param_spec_string ("content-md5",
221           "file transfer md5sum",
222           "The md5 sum of the transfer's contents",
223           "",
224           G_PARAM_READWRITE));
225
226   g_object_class_install_property (object_class,
227       PROP_IN_STREAM,
228       g_param_spec_object ("in-stream",
229           "transfer input stream",
230           "The input stream for file transfer",
231           G_TYPE_INPUT_STREAM,
232           G_PARAM_READWRITE));
233
234   g_type_class_add_private (object_class, sizeof (EmpathyTpFilePriv));
235 }
236
237 static void
238 empathy_tp_file_init (EmpathyTpFile *tp_file)
239 {
240 }
241
242 static void
243 tp_file_finalize (GObject *object)
244 {
245   EmpathyTpFilePriv *priv;
246   EmpathyTpFile     *tp_file;
247
248   tp_file = EMPATHY_TP_FILE (object);
249   priv = GET_PRIV (tp_file);
250
251   if (priv->channel)
252     {
253       DEBUG ("Closing channel..");
254       g_signal_handlers_disconnect_by_func (priv->channel,
255           tp_file_destroy_cb, object);
256       tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
257       if (G_IS_OBJECT (priv->channel))
258         g_object_unref (priv->channel);
259     }
260
261   if (priv->factory)
262     {
263       g_object_unref (priv->factory);
264     }
265   if (priv->account)
266     {
267       g_object_unref (priv->account);
268     }
269   if (priv->mc)
270     {
271       g_object_unref (priv->mc);
272     }
273
274   g_free (priv->id);
275   g_free (priv->filename);
276   g_free (priv->unix_socket_path);
277   g_free (priv->description);
278   g_free (priv->content_md5);
279   g_free (priv->content_type);
280
281   if (priv->in_stream)
282     g_object_unref (priv->in_stream);
283
284   if (priv->out_stream)
285     g_object_unref (priv->out_stream);
286
287   if (priv->contact)
288     g_object_unref (priv->contact);
289
290   if (priv->cancellable)
291     g_object_unref (priv->cancellable);
292
293   G_OBJECT_CLASS (empathy_tp_file_parent_class)->finalize (object);
294 }
295
296 static GObject *
297 tp_file_constructor (GType                  type,
298        guint                  n_props,
299        GObjectConstructParam *props)
300 {
301   GObject         *tp_file;
302   EmpathyTpFilePriv *priv;
303   GError            *error = NULL;
304   GHashTable        *properties;
305
306   tp_file = G_OBJECT_CLASS (empathy_tp_file_parent_class)->constructor (type, n_props, props);
307
308   priv = GET_PRIV (tp_file);
309
310   priv->factory = empathy_contact_factory_new ();
311   priv->mc = empathy_mission_control_new ();
312
313   tp_cli_channel_connect_to_closed (priv->channel,
314                                     (tp_cli_channel_signal_callback_closed) tp_file_closed_cb,
315                                     tp_file,
316                                     NULL, NULL, NULL);
317
318   emp_cli_channel_type_file_connect_to_file_transfer_state_changed (TP_PROXY (priv->channel),
319                                                                     (emp_cli_channel_type_file_signal_callback_file_transfer_state_changed) tp_file_state_changed_cb,
320                                                                     tp_file,
321                                                                     NULL, NULL, NULL);
322
323   emp_cli_channel_type_file_connect_to_transferred_bytes_changed (TP_PROXY (priv->channel),
324                                                                   (emp_cli_channel_type_file_signal_callback_transferred_bytes_changed) tp_file_transferred_bytes_changed_cb,
325                                                                   tp_file,
326                                                                   NULL, NULL, NULL);
327
328   if (!tp_cli_dbus_properties_run_get_all (priv->channel,
329       -1, EMP_IFACE_CHANNEL_TYPE_FILE, &properties, &error, NULL))
330     {
331       DEBUG ("Failed to get properties: %s",
332           error ? error->message : "No error given");
333       g_clear_error (&error);
334       return NULL;
335     }
336
337   priv->size = g_value_get_uint64 (g_hash_table_lookup (properties, "Size"));
338
339   priv->state = g_value_get_uint (g_hash_table_lookup (properties, "State"));
340
341   /* Invalid reason, so empathy_file_get_state_change_reason() can give
342    * a warning if called for a not closed file transfer. */
343   priv->state_change_reason = -1;
344
345   priv->transferred_bytes = g_value_get_uint64 (g_hash_table_lookup (properties, "TransferredBytes"));
346
347   priv->filename = g_value_dup_string (g_hash_table_lookup (properties, "Filename"));
348
349   priv->content_md5 = g_value_dup_string (g_hash_table_lookup (properties, "ContentMD5"));
350
351   priv->description = g_value_dup_string (g_hash_table_lookup (properties, "Description"));
352
353   priv->unix_socket_path = g_value_dup_string (g_hash_table_lookup (properties, "SocketPath"));
354
355   priv->direction = g_value_get_uint (g_hash_table_lookup (properties, "Direction"));
356
357   g_hash_table_destroy (properties);
358
359   return tp_file;
360 }
361
362 static void
363 tp_file_get_property (GObject    *object,
364         guint       param_id,
365         GValue     *value,
366         GParamSpec *pspec)
367 {
368   EmpathyTpFilePriv *priv;
369   EmpathyTpFile     *tp_file;
370
371   priv = GET_PRIV (object);
372   tp_file = EMPATHY_TP_FILE (object);
373
374   switch (param_id)
375     {
376       case PROP_ACCOUNT:
377         g_value_set_object (value, priv->account);
378         break;
379       case PROP_CHANNEL:
380         g_value_set_object (value, priv->channel);
381         break;
382       default:
383         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
384         break;
385     };
386 }
387
388 static void
389 tp_file_channel_set_dbus_property (gpointer proxy,
390                                    const gchar *property,
391                                    const GValue *value)
392 {
393         DEBUG ("Setting %s property", property);
394         tp_cli_dbus_properties_run_set (TP_PROXY (proxy),
395                                         -1,
396                                         EMP_IFACE_CHANNEL_TYPE_FILE,
397                                         property,
398                                         value,
399                                         NULL,
400                                         NULL);
401         DEBUG ("done");
402 }
403
404
405 static void
406 tp_file_set_property (GObject      *object,
407         guint         param_id,
408         const GValue *value,
409         GParamSpec   *pspec)
410 {
411   EmpathyTpFilePriv *priv;
412
413   priv = GET_PRIV (object);
414
415   switch (param_id)
416     {
417       case PROP_ACCOUNT:
418         priv->account = g_object_ref (g_value_get_object (value));
419         break;
420       case PROP_CHANNEL:
421         priv->channel = g_object_ref (g_value_get_object (value));
422         break;
423       case PROP_STATE:
424         priv->state = g_value_get_uint (value);
425         break;
426       case PROP_DIRECTION:
427         priv->direction = g_value_get_uint (value);
428         break;
429       case PROP_FILENAME:
430         g_free (priv->filename);
431         priv->filename = g_value_dup_string (value);
432         tp_file_channel_set_dbus_property (priv->channel, "Filename", value);
433         break;
434       case PROP_SIZE:
435         priv->size = g_value_get_uint64 (value);
436         tp_file_channel_set_dbus_property (priv->channel, "Size", value);
437         break;
438       case PROP_CONTENT_TYPE:
439         tp_file_channel_set_dbus_property (priv->channel, "ContentType", value);
440         g_free (priv->content_type);
441         priv->content_type = g_value_dup_string (value);
442         break;
443       case PROP_CONTENT_MD5:
444         tp_file_channel_set_dbus_property (priv->channel, "ContentMD5", value);
445         g_free (priv->content_md5);
446         priv->content_md5 = g_value_dup_string (value);
447         break;
448       case PROP_IN_STREAM:
449         if (priv->in_stream)
450           g_object_unref (priv->in_stream);
451         priv->in_stream = g_object_ref (g_value_get_object (value));
452         break;
453       default:
454         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
455         break;
456     };
457 }
458
459 /**
460  * empathy_tp_file_new:
461  * @account: the #McAccount for the channel
462  * @channel: a Telepathy channel
463  *
464  * Creates a new #EmpathyTpFile wrapping @channel.
465  *
466  * Returns: a new #EmpathyTpFile
467  */
468 EmpathyTpFile *
469 empathy_tp_file_new (McAccount *account,
470        TpChannel *channel)
471 {
472   return g_object_new (EMPATHY_TYPE_TP_FILE,
473            "account", account,
474            "channel", channel,
475            NULL);
476 }
477
478 /**
479  * empathy_tp_file_get_id:
480  * @tp_file: an #EmpathyTpFile
481  *
482  * Returns the ID of @tp_file.
483  *
484  * Returns: the ID
485  */
486 const gchar *
487 empathy_tp_file_get_id (EmpathyTpFile *tp_file)
488 {
489   EmpathyTpFilePriv *priv;
490
491   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
492
493   priv = GET_PRIV (tp_file);
494
495   return priv->id;
496 }
497
498 /**
499  * empathy_tp_file_get_channel
500  * @tp_file: an #EmpathyTpFile
501  *
502  * Returns the Telepathy file transfer channel
503  *
504  * Returns: the #TpChannel
505  */
506 TpChannel *
507 empathy_tp_file_get_channel (EmpathyTpFile *tp_file)
508 {
509   EmpathyTpFilePriv *priv;
510
511   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
512
513   priv = GET_PRIV (tp_file);
514
515   return priv->channel;
516 }
517
518 static void
519 tp_file_destroy_cb (TpChannel *file_channel, EmpathyTpFile *tp_file)
520 {
521   EmpathyTpFilePriv *priv;
522
523   priv = GET_PRIV (tp_file);
524
525   DEBUG ("Channel Closed or CM crashed");
526
527   g_object_unref  (priv->channel);
528   priv->channel = NULL;
529 }
530
531 static void
532 tp_file_closed_cb (TpChannel *file_channel, EmpathyTpFile *tp_file, GObject *weak_object)
533 {
534   EmpathyTpFilePriv *priv;
535
536   priv = GET_PRIV (tp_file);
537
538   /* The channel is closed, do just like if the proxy was destroyed */
539   g_signal_handlers_disconnect_by_func (priv->channel,
540                 tp_file_destroy_cb,
541                 tp_file);
542   tp_file_destroy_cb (file_channel, tp_file);
543 }
544
545 static gint64
546 get_time_msec (void)
547 {
548   GTimeVal tv;
549
550   g_get_current_time (&tv);
551   return ((gint64) tv.tv_sec) * 1000 + tv.tv_usec / 1000;
552 }
553
554 static gint
555 _get_local_socket (EmpathyTpFile *tp_file)
556 {
557   gint fd;
558   size_t path_len;
559   struct sockaddr_un addr;
560   EmpathyTpFilePriv *priv;
561
562   priv = GET_PRIV (tp_file);
563
564   if (priv->unix_socket_path == NULL)
565     return -1;
566
567   fd = socket (PF_UNIX, SOCK_STREAM, 0);
568   if (fd < 0)
569     return -1;
570
571   memset (&addr, 0, sizeof (addr));
572   addr.sun_family = AF_UNIX;
573   path_len = strlen (priv->unix_socket_path);
574   strncpy (addr.sun_path, priv->unix_socket_path, path_len);
575
576   if (connect (fd, (struct sockaddr*) &addr,
577       G_STRUCT_OFFSET (struct sockaddr_un, sun_path) + path_len) < 0)
578     {
579       close (fd);
580       return -1;
581     }
582
583   return fd;
584 }
585
586 static void
587 send_file (EmpathyTpFile *tp_file)
588 {
589   gint           socket_fd;
590   GOutputStream *socket_stream;
591   EmpathyTpFilePriv *priv;
592
593   priv = GET_PRIV (tp_file);
594
595   DEBUG ("Sending file content: filename=%s",
596            priv->filename);
597
598   g_return_if_fail (priv->in_stream);
599
600   socket_fd = _get_local_socket (tp_file);
601   if (socket_fd < 0)
602     {
603       DEBUG ("failed to get local socket fd");
604       return;
605     }
606   DEBUG ("got local socket fd");
607   socket_stream = g_unix_output_stream_new (socket_fd, TRUE);
608
609   priv->cancellable = g_cancellable_new ();
610
611   copy_stream (priv->in_stream, socket_stream, priv->cancellable);
612
613   g_object_unref (socket_stream);
614 }
615
616 static void
617 tp_file_state_changed_cb (DBusGProxy *file_iface,
618     EmpFileTransferState state,
619     EmpFileTransferStateChangeReason reason,
620     EmpathyTpFile *tp_file)
621 {
622   EmpathyTpFilePriv *priv;
623
624   priv = GET_PRIV (tp_file);
625
626   DEBUG ("File transfer state changed: filename=%s, "
627            "old state=%u, state=%u, reason=%u",
628            priv->filename, priv->state, state, reason);
629
630   if (state == EMP_FILE_TRANSFER_STATE_OPEN)
631     priv->start_time = get_time_msec ();
632
633   DEBUG ("state = %u, direction = %u, in_stream = %s",
634          state, priv->direction, priv->in_stream ? "present" : "not present");
635
636   if (state == EMP_FILE_TRANSFER_STATE_OPEN &&
637       priv->direction == EMP_FILE_TRANSFER_DIRECTION_OUTGOING &&
638       priv->in_stream)
639     send_file (tp_file);
640
641   priv->state = state;
642   priv->state_change_reason = reason;
643
644   g_object_notify (G_OBJECT (tp_file), "state");
645 }
646
647 static void
648 tp_file_transferred_bytes_changed_cb (DBusGProxy *file_iface,
649     guint64 transferred_bytes, EmpathyTpFile *tp_file)
650 {
651   EmpathyTpFilePriv *priv;
652
653   priv = GET_PRIV (tp_file);
654
655   if (priv->transferred_bytes == transferred_bytes)
656     return;
657
658   priv->transferred_bytes = transferred_bytes;
659
660   g_object_notify (G_OBJECT (tp_file), "transferred-bytes");
661 }
662
663 EmpathyContact *
664 _empathy_tp_file_get_contact (EmpathyTpFile *tp_file)
665 {
666   EmpathyTpFilePriv *priv;
667
668   priv = GET_PRIV (tp_file);
669
670   return priv->contact;
671 }
672
673 GInputStream *
674 _empathy_tp_file_get_input_stream (EmpathyTpFile *tp_file)
675 {
676   EmpathyTpFilePriv *priv;
677
678   priv = GET_PRIV (tp_file);
679
680   return priv->in_stream;
681 }
682
683 GOutputStream *
684 _empathy_tp_file_get_output_stream (EmpathyTpFile *tp_file)
685 {
686   EmpathyTpFilePriv *priv;
687
688   priv = GET_PRIV (tp_file);
689
690   return priv->out_stream;
691 }
692
693 const gchar *
694 _empathy_tp_file_get_filename (EmpathyTpFile *tp_file)
695 {
696   EmpathyTpFilePriv *priv;
697
698   priv = GET_PRIV (tp_file);
699
700   return priv->filename;
701 }
702
703 EmpFileTransferDirection
704 _empathy_tp_file_get_direction (EmpathyTpFile *tp_file)
705 {
706   EmpathyTpFilePriv *priv;
707
708   priv = GET_PRIV (tp_file);
709
710   return priv->direction;
711 }
712
713 EmpFileTransferState
714 _empathy_tp_file_get_state (EmpathyTpFile *tp_file)
715 {
716   EmpathyTpFilePriv *priv;
717
718   priv = GET_PRIV (tp_file);
719
720   return priv->state;
721 }
722
723 EmpFileTransferStateChangeReason
724 _empathy_tp_file_get_state_change_reason (EmpathyTpFile *tp_file)
725 {
726   EmpathyTpFilePriv *priv;
727
728   priv = GET_PRIV (tp_file);
729
730   g_return_val_if_fail (priv->state_change_reason >= 0,
731             EMP_FILE_TRANSFER_STATE_CHANGE_REASON_NONE);
732
733   return priv->state_change_reason;
734 }
735
736 guint64
737 _empathy_tp_file_get_size (EmpathyTpFile *tp_file)
738 {
739   EmpathyTpFilePriv *priv;
740
741   priv = GET_PRIV (tp_file);
742
743   return priv->size;
744 }
745
746 guint64
747 _empathy_tp_file_get_transferred_bytes (EmpathyTpFile *tp_file)
748 {
749   EmpathyTpFilePriv *priv;
750
751   priv = GET_PRIV (tp_file);
752
753   return priv->transferred_bytes;
754 }
755
756 gint
757 _empathy_tp_file_get_remaining_time (EmpathyTpFile *tp_file)
758 {
759   EmpathyTpFilePriv *priv;
760   gint64   curr_time, elapsed_time;
761   gdouble  time_per_byte;
762   gdouble  remaining_time;
763
764   priv = GET_PRIV (tp_file);
765
766   if (priv->size == EMPATHY_FILE_UNKNOWN_SIZE)
767     return -1;
768
769   if (priv->transferred_bytes == priv->size)
770     return 0;
771
772   curr_time = get_time_msec ();
773   elapsed_time = curr_time - priv->start_time;
774   time_per_byte = (gdouble) elapsed_time / (gdouble) priv->transferred_bytes;
775   remaining_time = (time_per_byte * (priv->size - priv->transferred_bytes)) / 1000;
776
777   return (gint) (remaining_time + 0.5);
778 }
779
780 void
781 _empathy_tp_file_set_input_stream (EmpathyTpFile *tp_file,
782     GInputStream *in_stream)
783 {
784   EmpathyTpFilePriv *priv;
785
786   priv = GET_PRIV (tp_file);
787
788   if (priv->in_stream == in_stream)
789     return;
790
791   if (priv->direction == EMP_FILE_TRANSFER_DIRECTION_INCOMING)
792     g_warning ("Setting an input stream for incoming file "
793          "transfers is useless");
794
795   if (priv->in_stream)
796     g_object_unref (priv->in_stream);
797   if (in_stream)
798     g_object_ref (in_stream);
799   priv->in_stream = in_stream;
800
801   g_object_notify (G_OBJECT (tp_file), "in-stream");
802 }
803
804 void
805 _empathy_tp_file_set_output_stream (EmpathyTpFile *tp_file,
806     GOutputStream *out_stream)
807 {
808   EmpathyTpFilePriv *priv;
809
810   priv = GET_PRIV (tp_file);
811
812   if (priv->out_stream == out_stream)
813     return;
814
815   if (priv->direction == EMP_FILE_TRANSFER_DIRECTION_OUTGOING)
816     g_warning ("Setting an output stream for outgoing file "
817          "transfers is useless");
818
819   if (priv->out_stream)
820     g_object_unref (priv->out_stream);
821   if (out_stream)
822     g_object_ref (out_stream);
823   priv->out_stream = out_stream;
824
825   g_object_notify (G_OBJECT (tp_file), "output-stream");
826 }
827
828 void
829 _empathy_tp_file_set_filename (EmpathyTpFile *tp_file,
830     const gchar *filename)
831 {
832   EmpathyTpFilePriv *priv;
833
834   priv = GET_PRIV (tp_file);
835
836   g_return_if_fail (filename != NULL);
837
838   if (priv->filename && strcmp (filename, priv->filename) == 0) {
839     return;
840   }
841
842   g_free (priv->filename);
843   priv->filename = g_strdup (filename);
844
845   g_object_notify (G_OBJECT (tp_file), "filename");
846 }
847
848 /* Functions to copy the content of a GInputStream to a GOutputStream */
849
850 #define N_BUFFERS 2
851 #define BUFFER_SIZE 4096
852
853 typedef struct {
854   GInputStream  *in;
855   GOutputStream *out;
856   GCancellable  *cancellable;
857   char          *buff[N_BUFFERS]; /* the temporary buffers */
858   gsize          count[N_BUFFERS]; /* how many bytes are used in the buffers */
859   gboolean       is_full[N_BUFFERS]; /* whether the buffers contain data */
860   gint           curr_read; /* index of the buffer used for reading */
861   gint           curr_write; /* index of the buffer used for writing */
862   gboolean       is_reading; /* we are reading */
863   gboolean       is_writing; /* we are writing */
864   guint          n_closed; /* number of streams that have been closed */
865 } CopyData;
866
867 static void schedule_next (CopyData *copy);
868
869 static void
870 free_copy_data_if_closed (CopyData *copy)
871 {
872   gint i;
873
874   /* Free the data only if both the input and output streams have
875    * been closed. */
876   copy->n_closed++;
877   if (copy->n_closed < 2)
878     return;
879
880   if (copy->in != NULL)
881     g_object_unref (copy->in);
882
883   if (copy->out != NULL)
884     g_object_unref (copy->out);
885
886   for (i = 0; i < N_BUFFERS; i++)
887     g_free (copy->buff[i]);
888
889   g_object_unref (copy->cancellable);
890   g_free (copy);
891 }
892
893 static void
894 io_error (CopyData *copy, GError *error)
895 {
896   g_cancellable_cancel (copy->cancellable);
897
898   if (error == NULL)
899     g_warning ("I/O error");
900   else if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_CANCELLED)
901     ; /* Ignore cancellations */
902   else
903     g_warning ("I/O error: %d: %s\n", error->code, error->message);
904
905   if (copy->in != NULL)
906     g_input_stream_close (copy->in, NULL, NULL);
907
908   if (copy->out != NULL)
909     g_output_stream_close (copy->out, NULL, NULL);
910
911   free_copy_data_if_closed (copy);
912 }
913
914 static void
915 close_done (GObject *source_object, GAsyncResult *res, gpointer user_data)
916 {
917   CopyData *copy = user_data;
918
919   g_object_unref (source_object);
920   free_copy_data_if_closed (copy);
921 }
922
923 static void
924 write_done_cb (GObject *source_object, GAsyncResult *res, gpointer user_data)
925 {
926   CopyData *copy = user_data;
927   gssize    count_write;
928   GError   *error = NULL;
929
930   count_write = g_output_stream_write_finish (copy->out, res, &error);
931
932   if (count_write <= 0) {
933     io_error (copy, error);
934     g_error_free (error);
935     return;
936   }
937
938   copy->is_full[copy->curr_write] = FALSE;
939   copy->curr_write = (copy->curr_write + 1) % N_BUFFERS;
940   copy->is_writing = FALSE;
941
942   schedule_next (copy);
943 }
944
945 static void
946 read_done_cb (GObject *source_object, GAsyncResult *res, gpointer user_data)
947 {
948   CopyData *copy = user_data;
949   gssize    count_read;
950   GError   *error = NULL;
951
952   count_read = g_input_stream_read_finish (copy->in, res, &error);
953
954   if (count_read == 0) {
955     g_input_stream_close_async (copy->in, 0, copy->cancellable,
956               close_done, copy);
957     copy->in = NULL;
958   }
959   else if (count_read < 0) {
960     io_error (copy, error);
961     g_error_free (error);
962     return;
963   }
964
965   copy->count[copy->curr_read] = count_read;
966   copy->is_full[copy->curr_read] = TRUE;
967   copy->curr_read = (copy->curr_read + 1) % N_BUFFERS;
968   copy->is_reading = FALSE;
969
970   schedule_next (copy);
971 }
972
973 static void
974 schedule_next (CopyData *copy)
975 {
976   if (copy->in != NULL &&
977       !copy->is_reading &&
978       !copy->is_full[copy->curr_read]) {
979     /* We are not reading and the current buffer is empty, so
980      * start an async read. */
981     copy->is_reading = TRUE;
982     g_input_stream_read_async (copy->in,
983              copy->buff[copy->curr_read],
984              BUFFER_SIZE, 0, copy->cancellable,
985              read_done_cb, copy);
986   }
987
988   if (!copy->is_writing &&
989       copy->is_full[copy->curr_write]) {
990     if (copy->count[copy->curr_write] == 0) {
991       /* The last read on the buffer read 0 bytes, this
992        * means that we got an EOF, so we can close
993        * the output channel. */
994       g_output_stream_close_async (copy->out, 0,
995                  copy->cancellable,
996                  close_done, copy);
997       copy->out = NULL;
998     } else {
999       /* We are not writing and the current buffer contains
1000        * data, so start an async write. */
1001       copy->is_writing = TRUE;
1002       g_output_stream_write_async (copy->out,
1003                  copy->buff[copy->curr_write],
1004                  copy->count[copy->curr_write],
1005                  0, copy->cancellable,
1006                  write_done_cb, copy);
1007     }
1008   }
1009 }
1010
1011 static void
1012 copy_stream (GInputStream *in, GOutputStream *out, GCancellable *cancellable)
1013 {
1014   CopyData *copy;
1015   gint      i;
1016
1017   g_return_if_fail (in != NULL);
1018   g_return_if_fail (out != NULL);
1019
1020   copy = g_new0 (CopyData, 1);
1021   copy->in = g_object_ref (in);
1022   copy->out = g_object_ref (out);
1023
1024   if (cancellable != NULL)
1025     copy->cancellable = g_object_ref (cancellable);
1026   else
1027     copy->cancellable = g_cancellable_new ();
1028
1029   for (i = 0; i < N_BUFFERS; i++)
1030     copy->buff[i] = g_malloc (BUFFER_SIZE);
1031
1032   schedule_next (copy);
1033 }