]> git.0d.be Git - empathy.git/blob - libempathy/empathy-tp-file.c
Renamed EmpathyFile to EmpathyTpFile. (Jonny Lamb)
[empathy.git] / libempathy / empathy-tp-file.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2007 Collabora Ltd.
4  *   @author: Xavier Claessens <xclaesse@gmail.com>
5  * Copyright (C) 2007 Marco Barisione <marco@barisione.org>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20  */
21
22 #include <config.h>
23
24 #include <string.h>
25 #include <unistd.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29
30 #include <glib/gi18n.h>
31
32 #include <gio/gio.h>
33 #include <gio/gunixinputstream.h>
34 #include <gio/gunixoutputstream.h>
35
36 #include <libtelepathy/tp-conn.h>
37 #include <libtelepathy/tp-helpers.h>
38 #include <libtelepathy/tp-props-iface.h>
39
40 #include <telepathy-glib/proxy-subclass.h>
41
42 #include "empathy-tp-file.h"
43 #include "empathy-contact-factory.h"
44 #include "empathy-marshal.h"
45 #include "empathy-time.h"
46 #include "empathy-utils.h"
47
48 #define DEBUG_FLAG EMPATHY_DEBUG_FT
49 #include "empathy-debug.h"
50
51 /**
52  * SECTION:empathy-tp-file
53  * @short_description: File channel
54  * @see_also: #EmpathyTpFile, #EmpathyContact, empathy_send_file()
55  * @include: libempathy/empathy-tp-file.h
56  *
57  * The #EmpathyTpFile object represents a Telepathy file channel.
58  */
59
60 /**
61  * EMPATHY_TP_FILE_UNKNOWN_SIZE:
62  *
63  * Value used for the "size" or "estimated-size" properties when the size of
64  * the transferred file is unknown.
65  */
66
67 static void empathy_tp_file_class_init (EmpathyTpFileClass *klass);
68 static void empathy_tp_file_init (EmpathyTpFile *tp_file);
69 static void tp_file_finalize (GObject *object);
70 static GObject *tp_file_constructor (GType type, guint n_props,
71     GObjectConstructParam *props);
72 static void tp_file_get_property (GObject *object, guint param_id,
73     GValue *value, GParamSpec *pspec);
74 static void tp_file_set_property (GObject *object, guint param_id,
75     const GValue *value, GParamSpec *pspec);
76 static void tp_file_destroy_cb (TpChannel *file_chan, EmpathyTpFile *tp_file);
77 static void tp_file_closed_cb (TpChannel *file_chan, EmpathyTpFile *tp_file,
78     GObject *weak_object);
79 static void tp_file_state_changed_cb (DBusGProxy *file_iface, guint state,
80     guint reason, EmpathyTpFile *tp_file);
81 static void tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
82     guint64 count, EmpathyTpFile *tp_file, GObject *weak_object);
83 static void copy_stream (GInputStream *in, GOutputStream *out,
84     GCancellable *cancellable);
85
86 /* EmpathyTpFile object */
87
88 #define GET_PRIV(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), \
89            EMPATHY_TYPE_TP_FILE, EmpathyTpFilePriv))
90
91 typedef struct _EmpathyTpFilePriv  EmpathyTpFilePriv;
92
93 struct _EmpathyTpFilePriv {
94   EmpathyContactFactory *factory;
95   McAccount *account;
96   gchar *id;
97   MissionControl *mc;
98   TpChannel *channel;
99
100   EmpathyTpFile *cached_empathy_file;
101   EmpathyContact *contact;
102   GInputStream *in_stream;
103   GOutputStream *out_stream;
104   gchar *filename;
105   EmpFileTransferDirection direction;
106   EmpFileTransferState state;
107   EmpFileTransferStateChangeReason state_change_reason;
108   guint64 size;
109   guint64 transferred_bytes;
110   gint64 start_time;
111   gchar *unix_socket_path;
112   gchar *content_md5;
113   gchar *content_type;
114   gchar *description;
115   GCancellable *cancellable;
116 };
117
/* GObject property identifiers; PROP_0 is the reserved "no property" id. */
enum {
  PROP_0 = 0,
  PROP_ACCOUNT,           /* "account" */
  PROP_CHANNEL,           /* "channel" */
  PROP_STATE,             /* "state" */
  PROP_DIRECTION,         /* "direction" */
  PROP_FILENAME,          /* "filename" */
  PROP_SIZE,              /* "size" */
  PROP_CONTENT_TYPE,      /* "content-type" */
  PROP_TRANSFERRED_BYTES, /* "transferred-bytes" */
  PROP_CONTENT_MD5,       /* "content-md5" */
  PROP_IN_STREAM          /* "in-stream" */
};
131
132 G_DEFINE_TYPE (EmpathyTpFile, empathy_tp_file, G_TYPE_OBJECT);
133
134 static void
135 empathy_tp_file_class_init (EmpathyTpFileClass *klass)
136 {
137   GObjectClass *object_class = G_OBJECT_CLASS (klass);
138
139   object_class->finalize = tp_file_finalize;
140   object_class->constructor = tp_file_constructor;
141   object_class->get_property = tp_file_get_property;
142   object_class->set_property = tp_file_set_property;
143
144   /* Construct-only properties */
145   g_object_class_install_property (object_class,
146       PROP_ACCOUNT,
147       g_param_spec_object ("account",
148           "channel Account",
149           "The account associated with the channel",
150           MC_TYPE_ACCOUNT,
151           G_PARAM_READWRITE |
152           G_PARAM_CONSTRUCT_ONLY));
153
154   g_object_class_install_property (object_class,
155       PROP_CHANNEL,
156       g_param_spec_object ("channel",
157           "telepathy channel",
158           "The file transfer channel",
159           TP_TYPE_CHANNEL,
160           G_PARAM_READWRITE |
161           G_PARAM_CONSTRUCT_ONLY));
162
163   g_object_class_install_property (object_class,
164       PROP_STATE,
165       g_param_spec_uint ("state",
166           "state of the transfer",
167           "The file transfer state",
168           0,
169           G_MAXUINT,
170           G_MAXUINT,
171           G_PARAM_READWRITE |
172           G_PARAM_CONSTRUCT));
173
174   g_object_class_install_property (object_class,
175       PROP_DIRECTION,
176       g_param_spec_uint ("direction",
177           "direction of the transfer",
178           "The file transfer direction",
179           0,
180           G_MAXUINT,
181           G_MAXUINT,
182           G_PARAM_READWRITE |
183           G_PARAM_CONSTRUCT));
184
185   g_object_class_install_property (object_class,
186       PROP_FILENAME,
187       g_param_spec_string ("filename",
188           "name of the transfer",
189           "The file transfer filename",
190           "",
191           G_PARAM_READWRITE));
192
193   g_object_class_install_property (object_class,
194       PROP_SIZE,
195       g_param_spec_uint64 ("size",
196           "size of the file",
197           "The file transfer size",
198           0,
199           G_MAXUINT64,
200           G_MAXUINT64,
201           G_PARAM_READWRITE));
202
203   g_object_class_install_property (object_class,
204       PROP_CONTENT_TYPE,
205       g_param_spec_string ("content-type",
206           "file transfer content-type",
207           "The file transfer content-type",
208           "",
209           G_PARAM_READWRITE));
210
211   g_object_class_install_property (object_class,
212       PROP_CONTENT_MD5,
213       g_param_spec_string ("content-md5",
214           "file transfer md5sum",
215           "The md5 sum of the transfer's contents",
216           "",
217           G_PARAM_READWRITE));
218
219   g_object_class_install_property (object_class,
220       PROP_TRANSFERRED_BYTES,
221       g_param_spec_uint64 ("transferred-bytes",
222           "bytes transferred",
223           "The number of bytes transferred",
224           0,
225           G_MAXUINT64,
226           0,
227           G_PARAM_READWRITE));
228
229   g_object_class_install_property (object_class,
230       PROP_IN_STREAM,
231       g_param_spec_object ("in-stream",
232           "transfer input stream",
233           "The input stream for file transfer",
234           G_TYPE_INPUT_STREAM,
235           G_PARAM_READWRITE));
236
237   g_type_class_add_private (object_class, sizeof (EmpathyTpFilePriv));
238 }
239
240 static void
241 empathy_tp_file_init (EmpathyTpFile *tp_file)
242 {
243 }
244
245 static void
246 tp_file_finalize (GObject *object)
247 {
248   EmpathyTpFilePriv *priv;
249   EmpathyTpFile *tp_file;
250
251   tp_file = EMPATHY_TP_FILE (object);
252   priv = GET_PRIV (tp_file);
253
254   if (priv->channel)
255     {
256       DEBUG ("Closing channel..");
257       g_signal_handlers_disconnect_by_func (priv->channel,
258           tp_file_destroy_cb, object);
259       tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
260       if (G_IS_OBJECT (priv->channel))
261         g_object_unref (priv->channel);
262     }
263
264   if (priv->factory)
265     {
266       g_object_unref (priv->factory);
267     }
268   if (priv->account)
269     {
270       g_object_unref (priv->account);
271     }
272   if (priv->mc)
273     {
274       g_object_unref (priv->mc);
275     }
276
277   g_free (priv->id);
278   g_free (priv->filename);
279   g_free (priv->unix_socket_path);
280   g_free (priv->description);
281   g_free (priv->content_md5);
282   g_free (priv->content_type);
283
284   if (priv->in_stream)
285     g_object_unref (priv->in_stream);
286
287   if (priv->out_stream)
288     g_object_unref (priv->out_stream);
289
290   if (priv->contact)
291     g_object_unref (priv->contact);
292
293   if (priv->cancellable)
294     g_object_unref (priv->cancellable);
295
296   G_OBJECT_CLASS (empathy_tp_file_parent_class)->finalize (object);
297 }
298
299 static GObject *
300 tp_file_constructor (GType type,
301                      guint n_props,
302                      GObjectConstructParam *props)
303 {
304   GObject *tp_file;
305   EmpathyTpFilePriv *priv;
306   GError *error = NULL;
307   GHashTable *properties;
308   TpHandle handle;
309
310   tp_file = G_OBJECT_CLASS (empathy_tp_file_parent_class)->constructor (type,
311       n_props, props);
312
313   priv = GET_PRIV (tp_file);
314
315   priv->factory = empathy_contact_factory_new ();
316   priv->mc = empathy_mission_control_new ();
317
318   tp_cli_channel_connect_to_closed (priv->channel,
319       (tp_cli_channel_signal_callback_closed) tp_file_closed_cb,
320       tp_file,
321       NULL, NULL, NULL);
322
323   emp_cli_channel_type_file_connect_to_file_transfer_state_changed (
324       TP_PROXY (priv->channel),
325       (emp_cli_channel_type_file_signal_callback_file_transfer_state_changed)
326           tp_file_state_changed_cb,
327       tp_file,
328       NULL, NULL, NULL);
329
330   emp_cli_channel_type_file_connect_to_transferred_bytes_changed (
331       TP_PROXY (priv->channel),
332       (emp_cli_channel_type_file_signal_callback_transferred_bytes_changed)
333           tp_file_transferred_bytes_changed_cb,
334       tp_file,
335       NULL, NULL, NULL);
336
337
338   handle = tp_channel_get_handle (priv->channel, NULL);
339   priv->contact = empathy_contact_factory_get_from_handle (priv->factory,
340       priv->account,
341       (guint) handle);
342
343   if (!tp_cli_dbus_properties_run_get_all (priv->channel,
344       -1, EMP_IFACE_CHANNEL_TYPE_FILE, &properties, &error, NULL))
345     {
346       DEBUG ("Failed to get properties: %s",
347           error ? error->message : "No error given");
348       g_clear_error (&error);
349       return NULL;
350     }
351
352   priv->size = g_value_get_uint64 (g_hash_table_lookup (properties, "Size"));
353
354   priv->state = g_value_get_uint (g_hash_table_lookup (properties, "State"));
355
356   /* Invalid reason, so empathy_file_get_state_change_reason() can give
357    * a warning if called for a not closed file transfer. */
358   priv->state_change_reason = -1;
359
360   priv->transferred_bytes = g_value_get_uint64 (g_hash_table_lookup (
361       properties, "TransferredBytes"));
362
363   priv->filename = g_value_dup_string (g_hash_table_lookup (properties,
364       "Filename"));
365
366   priv->content_md5 = g_value_dup_string (g_hash_table_lookup (properties,
367       "ContentMD5"));
368
369   priv->description = g_value_dup_string (g_hash_table_lookup (properties,
370       "Description"));
371
372   priv->unix_socket_path = g_value_dup_string (g_hash_table_lookup (properties,
373       "SocketPath"));
374
375   priv->direction = g_value_get_uint (g_hash_table_lookup (properties,
376       "Direction"));
377
378   g_hash_table_destroy (properties);
379
380   return tp_file;
381 }
382
383 static void
384 tp_file_get_property (GObject *object,
385                       guint param_id,
386                       GValue *value,
387                       GParamSpec *pspec)
388 {
389   EmpathyTpFilePriv *priv;
390   EmpathyTpFile *tp_file;
391
392   priv = GET_PRIV (object);
393   tp_file = EMPATHY_TP_FILE (object);
394
395   switch (param_id)
396     {
397       case PROP_ACCOUNT:
398         g_value_set_object (value, priv->account);
399         break;
400       case PROP_CHANNEL:
401         g_value_set_object (value, priv->channel);
402         break;
403       default:
404         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
405         break;
406     };
407 }
408
409 static void
410 tp_file_channel_set_dbus_property (gpointer proxy,
411                                    const gchar *property,
412                                    const GValue *value)
413 {
414         DEBUG ("Setting %s property", property);
415         tp_cli_dbus_properties_run_set (TP_PROXY (proxy),
416             -1,
417             EMP_IFACE_CHANNEL_TYPE_FILE,
418             property,
419             value,
420             NULL, NULL);
421         DEBUG ("done");
422 }
423
424
425 static void
426 tp_file_set_property (GObject *object,
427                       guint param_id,
428                       const GValue *value,
429                       GParamSpec *pspec)
430 {
431   EmpathyTpFilePriv *priv;
432
433   priv = GET_PRIV (object);
434
435   switch (param_id)
436     {
437       case PROP_ACCOUNT:
438         priv->account = g_object_ref (g_value_get_object (value));
439         break;
440       case PROP_CHANNEL:
441         priv->channel = g_object_ref (g_value_get_object (value));
442         break;
443       case PROP_STATE:
444         priv->state = g_value_get_uint (value);
445         break;
446       case PROP_DIRECTION:
447         priv->direction = g_value_get_uint (value);
448         break;
449       case PROP_FILENAME:
450         g_free (priv->filename);
451         priv->filename = g_value_dup_string (value);
452         tp_file_channel_set_dbus_property (priv->channel, "Filename", value);
453         break;
454       case PROP_SIZE:
455         priv->size = g_value_get_uint64 (value);
456         tp_file_channel_set_dbus_property (priv->channel, "Size", value);
457         break;
458       case PROP_CONTENT_TYPE:
459         tp_file_channel_set_dbus_property (priv->channel, "ContentType", value);
460         g_free (priv->content_type);
461         priv->content_type = g_value_dup_string (value);
462         break;
463       case PROP_CONTENT_MD5:
464         tp_file_channel_set_dbus_property (priv->channel, "ContentMD5", value);
465         g_free (priv->content_md5);
466         priv->content_md5 = g_value_dup_string (value);
467         break;
468       case PROP_IN_STREAM:
469         if (priv->in_stream)
470           g_object_unref (priv->in_stream);
471         priv->in_stream = g_object_ref (g_value_get_object (value));
472         break;
473       default:
474         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
475         break;
476     };
477 }
478
479 /**
480  * empathy_tp_file_new:
481  * @account: the #McAccount for the channel
482  * @channel: a Telepathy channel
483  *
484  * Creates a new #EmpathyTpFile wrapping @channel.
485  *
486  * Returns: a new #EmpathyTpFile
487  */
488 EmpathyTpFile *
489 empathy_tp_file_new (McAccount *account,
490                   TpChannel *channel)
491 {
492   return g_object_new (EMPATHY_TYPE_TP_FILE,
493       "account", account,
494       "channel", channel,
495       NULL);
496 }
497
498 /**
499  * empathy_tp_file_get_id:
500  * @tp_file: an #EmpathyTpFile
501  *
502  * Returns the ID of @tp_file.
503  *
504  * Returns: the ID
505  */
506 const gchar *
507 empathy_tp_file_get_id (EmpathyTpFile *tp_file)
508 {
509   EmpathyTpFilePriv *priv;
510
511   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
512
513   priv = GET_PRIV (tp_file);
514
515   return priv->id;
516 }
517
518 /**
519  * empathy_tp_file_get_channel
520  * @tp_file: an #EmpathyTpFile
521  *
522  * Returns the Telepathy file transfer channel
523  *
524  * Returns: the #TpChannel
525  */
526 TpChannel *
527 empathy_tp_file_get_channel (EmpathyTpFile *tp_file)
528 {
529   EmpathyTpFilePriv *priv;
530
531   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
532
533   priv = GET_PRIV (tp_file);
534
535   return priv->channel;
536 }
537
538 static void
539 tp_file_destroy_cb (TpChannel *file_channel,
540                     EmpathyTpFile *tp_file)
541 {
542   EmpathyTpFilePriv *priv;
543
544   priv = GET_PRIV (tp_file);
545
546   DEBUG ("Channel Closed or CM crashed");
547
548   g_object_unref (priv->channel);
549   priv->channel = NULL;
550 }
551
552 static void
553 tp_file_closed_cb (TpChannel *file_channel,
554                    EmpathyTpFile *tp_file,
555                    GObject *weak_object)
556 {
557   EmpathyTpFilePriv *priv;
558
559   priv = GET_PRIV (tp_file);
560
561   /* The channel is closed, do just like if the proxy was destroyed */
562   g_signal_handlers_disconnect_by_func (priv->channel,
563       tp_file_destroy_cb,
564       tp_file);
565   tp_file_destroy_cb (file_channel, tp_file);
566 }
567
568 static gint64
569 get_time_msec (void)
570 {
571   GTimeVal tv;
572
573   g_get_current_time (&tv);
574   return ((gint64) tv.tv_sec) * 1000 + tv.tv_usec / 1000;
575 }
576
577 static gint
578 _get_local_socket (EmpathyTpFile *tp_file)
579 {
580   gint fd;
581   size_t path_len;
582   struct sockaddr_un addr;
583   EmpathyTpFilePriv *priv;
584   GValue *socket_path;
585
586   priv = GET_PRIV (tp_file);
587
588   /* TODO: This could probably be a little nicer. */
589   tp_cli_dbus_properties_run_get (priv->channel,
590       -1,
591       EMP_IFACE_CHANNEL_TYPE_FILE,
592       "SocketPath",
593       &socket_path,
594       NULL,
595       NULL);
596
597   if (priv->unix_socket_path)
598     g_free (priv->unix_socket_path);
599
600   priv->unix_socket_path = g_value_dup_string (socket_path);
601   g_value_unset (socket_path);
602
603   if (G_STR_EMPTY (priv->unix_socket_path))
604     return -1;
605
606   fd = socket (PF_UNIX, SOCK_STREAM, 0);
607   if (fd < 0)
608     return -1;
609
610   memset (&addr, 0, sizeof (addr));
611   addr.sun_family = AF_UNIX;
612   path_len = strlen (priv->unix_socket_path);
613   strncpy (addr.sun_path, priv->unix_socket_path, path_len);
614
615   if (connect (fd, (struct sockaddr*) &addr,
616       sizeof (addr)) < 0)
617     {
618       close (fd);
619       return -1;
620     }
621
622   return fd;
623 }
624
625 /**
626  * empathy_tp_file_accept:
627  * @tp_file: an #EmpathyTpFile
628  *
629  * Accepts a file transfer that's in the "local pending" state (i.e.
630  * EMP_FILE_TRANSFER_STATE_LOCAL_PENDING).
631  */
632 void
633 empathy_tp_file_accept (EmpathyTpFile *tp_file)
634 {
635   EmpathyTpFilePriv *priv;
636   GValue *address;
637   GValue nothing = { 0 };
638   GError *error = NULL;
639
640   g_return_if_fail (EMPATHY_IS_TP_FILE (tp_file));
641
642   priv = GET_PRIV (tp_file);
643
644   g_return_if_fail (priv->out_stream != NULL);
645
646   DEBUG ("Accepting file: filename=%s", priv->filename);
647
648   g_value_init (&nothing, G_TYPE_STRING);
649   g_value_set_string (&nothing, "");
650
651   if (!emp_cli_channel_type_file_run_accept_file (TP_PROXY (priv->channel),
652       -1, TP_SOCKET_ADDRESS_TYPE_UNIX, TP_SOCKET_ACCESS_CONTROL_LOCALHOST,
653       &nothing, &address, &error, NULL))
654     {
655       DEBUG ("Accept error: %s",
656           error ? error->message : "No message given");
657       g_clear_error (&error);
658     }
659
660   if (priv->unix_socket_path)
661     g_free (priv->unix_socket_path);
662
663   priv->unix_socket_path = g_value_dup_string (address);
664   g_value_unset (address);
665
666   DEBUG ("Got unix socket path: %s", priv->unix_socket_path);
667 }
668
669 static void
670 receive_tp_file (EmpathyTpFile *tp_file)
671 {
672   EmpathyTpFilePriv *priv;
673   GInputStream *socket_stream;
674   gint socket_fd;
675
676   priv = GET_PRIV (tp_file);
677
678   socket_fd = _get_local_socket (tp_file);
679
680   if (socket_fd < 0)
681     return;
682
683   socket_stream = g_unix_input_stream_new (socket_fd, TRUE);
684
685   priv->cancellable = g_cancellable_new ();
686
687   copy_stream (socket_stream, priv->out_stream, priv->cancellable);
688
689   g_object_unref (socket_stream);
690 }
691
692
693 static void
694 send_tp_file (EmpathyTpFile *tp_file)
695 {
696   gint socket_fd;
697   GOutputStream *socket_stream;
698   EmpathyTpFilePriv *priv;
699
700   priv = GET_PRIV (tp_file);
701
702   DEBUG ("Sending file content: filename=%s",
703            priv->filename);
704
705   g_return_if_fail (priv->in_stream);
706
707   socket_fd = _get_local_socket (tp_file);
708   if (socket_fd < 0)
709     {
710       DEBUG ("failed to get local socket fd");
711       return;
712     }
713   DEBUG ("got local socket fd");
714   socket_stream = g_unix_output_stream_new (socket_fd, TRUE);
715
716   priv->cancellable = g_cancellable_new ();
717
718   copy_stream (priv->in_stream, socket_stream, priv->cancellable);
719
720   g_object_unref (socket_stream);
721 }
722
723 static void
724 tp_file_state_changed_cb (DBusGProxy *tp_file_iface,
725                           EmpFileTransferState state,
726                           EmpFileTransferStateChangeReason reason,
727                           EmpathyTpFile *tp_file)
728 {
729   EmpathyTpFilePriv *priv;
730
731   priv = GET_PRIV (tp_file);
732
733   DEBUG ("File transfer state changed: filename=%s, "
734       "old state=%u, state=%u, reason=%u",
735       priv->filename, priv->state, state, reason);
736
737   if (state == EMP_FILE_TRANSFER_STATE_OPEN)
738     priv->start_time = get_time_msec ();
739
740   DEBUG ("state = %u, direction = %u, in_stream = %s, out_stream = %s",
741       state, priv->direction,
742       priv->in_stream ? "present" : "not present",
743       priv->out_stream ? "present" : "not present");
744
745   if (state == EMP_FILE_TRANSFER_STATE_OPEN &&
746       priv->direction == EMP_FILE_TRANSFER_DIRECTION_OUTGOING &&
747       priv->in_stream)
748     send_tp_file (tp_file);
749   else if (state == EMP_FILE_TRANSFER_STATE_OPEN &&
750       priv->direction == EMP_FILE_TRANSFER_DIRECTION_INCOMING &&
751       priv->out_stream)
752       receive_tp_file (tp_file);
753
754   priv->state = state;
755   priv->state_change_reason = reason;
756
757   g_object_notify (G_OBJECT (tp_file), "state");
758 }
759
760 static void
761 tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
762                                       guint64 count,
763                                       EmpathyTpFile *tp_file,
764                                       GObject *weak_object)
765 {
766   EmpathyTpFilePriv *priv;
767
768   priv = GET_PRIV (tp_file);
769
770   if (priv->transferred_bytes == count)
771     return;
772
773   priv->transferred_bytes = count;
774
775   g_object_notify (G_OBJECT (tp_file), "transferred-bytes");
776 }
777
778 EmpathyContact *
779 empathy_tp_file_get_contact (EmpathyTpFile *tp_file)
780 {
781   EmpathyTpFilePriv *priv;
782
783   priv = GET_PRIV (tp_file);
784
785   return priv->contact;
786 }
787
788 GInputStream *
789 empathy_tp_file_get_input_stream (EmpathyTpFile *tp_file)
790 {
791   EmpathyTpFilePriv *priv;
792
793   priv = GET_PRIV (tp_file);
794
795   return priv->in_stream;
796 }
797
798 GOutputStream *
799 empathy_tp_file_get_output_stream (EmpathyTpFile *tp_file)
800 {
801   EmpathyTpFilePriv *priv;
802
803   priv = GET_PRIV (tp_file);
804
805   return priv->out_stream;
806 }
807
808 const gchar *
809 empathy_tp_file_get_filename (EmpathyTpFile *tp_file)
810 {
811   EmpathyTpFilePriv *priv;
812
813   priv = GET_PRIV (tp_file);
814
815   return priv->filename;
816 }
817
818 EmpFileTransferDirection
819 empathy_tp_file_get_direction (EmpathyTpFile *tp_file)
820 {
821   EmpathyTpFilePriv *priv;
822
823   priv = GET_PRIV (tp_file);
824
825   return priv->direction;
826 }
827
828 EmpFileTransferState
829 empathy_tp_file_get_state (EmpathyTpFile *tp_file)
830 {
831   EmpathyTpFilePriv *priv;
832
833   priv = GET_PRIV (tp_file);
834
835   return priv->state;
836 }
837
838 EmpFileTransferStateChangeReason
839 empathy_tp_file_get_state_change_reason (EmpathyTpFile *tp_file)
840 {
841   EmpathyTpFilePriv *priv;
842
843   priv = GET_PRIV (tp_file);
844
845   g_return_val_if_fail (priv->state_change_reason >= 0,
846       EMP_FILE_TRANSFER_STATE_CHANGE_REASON_NONE);
847
848   return priv->state_change_reason;
849 }
850
851 guint64
852 empathy_tp_file_get_size (EmpathyTpFile *tp_file)
853 {
854   EmpathyTpFilePriv *priv;
855
856   priv = GET_PRIV (tp_file);
857
858   return priv->size;
859 }
860
861 guint64
862 empathy_tp_file_get_transferred_bytes (EmpathyTpFile *tp_file)
863 {
864   EmpathyTpFilePriv *priv;
865
866   priv = GET_PRIV (tp_file);
867
868   return priv->transferred_bytes;
869 }
870
871 gint
872 empathy_tp_file_get_remaining_time (EmpathyTpFile *tp_file)
873 {
874   EmpathyTpFilePriv *priv;
875   gint64 curr_time, elapsed_time;
876   gdouble time_per_byte;
877   gdouble remaining_time;
878
879   priv = GET_PRIV (tp_file);
880
881   if (priv->size == EMPATHY_TP_FILE_UNKNOWN_SIZE)
882     return -1;
883
884   if (priv->transferred_bytes == priv->size)
885     return 0;
886
887   curr_time = get_time_msec ();
888   elapsed_time = curr_time - priv->start_time;
889   time_per_byte = (gdouble) elapsed_time / (gdouble) priv->transferred_bytes;
890   remaining_time = (time_per_byte * (priv->size - priv->transferred_bytes)) / 1000;
891
892   return (gint) (remaining_time + 0.5);
893 }
894
895 void
896 empathy_tp_file_cancel (EmpathyTpFile *tp_file)
897 {
898   EmpathyTpFilePriv *priv;
899
900   priv = GET_PRIV (tp_file);
901
902   tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
903
904   g_cancellable_cancel (priv->cancellable);
905 }
906
907 void
908 empathy_tp_file_set_input_stream (EmpathyTpFile *tp_file,
909                                   GInputStream *in_stream)
910 {
911   EmpathyTpFilePriv *priv;
912
913   priv = GET_PRIV (tp_file);
914
915   if (priv->in_stream == in_stream)
916     return;
917
918   if (priv->direction == EMP_FILE_TRANSFER_DIRECTION_INCOMING)
919     g_warning ("Setting an input stream for incoming file "
920          "transfers is useless");
921
922   if (priv->in_stream)
923     g_object_unref (priv->in_stream);
924
925   if (in_stream)
926     g_object_ref (in_stream);
927
928   priv->in_stream = in_stream;
929
930   g_object_notify (G_OBJECT (tp_file), "in-stream");
931 }
932
933 void
934 empathy_tp_file_set_output_stream (EmpathyTpFile *tp_file,
935                                    GOutputStream *out_stream)
936 {
937   EmpathyTpFilePriv *priv;
938
939   priv = GET_PRIV (tp_file);
940
941   if (priv->out_stream == out_stream)
942     return;
943
944   if (priv->direction == EMP_FILE_TRANSFER_DIRECTION_OUTGOING)
945     g_warning ("Setting an output stream for outgoing file "
946          "transfers is useless");
947
948   if (priv->out_stream)
949     g_object_unref (priv->out_stream);
950
951   if (out_stream)
952     g_object_ref (out_stream);
953
954   priv->out_stream = out_stream;
955 }
956
957 void
958 empathy_tp_file_set_filename (EmpathyTpFile *tp_file,
959                               const gchar *filename)
960 {
961   EmpathyTpFilePriv *priv;
962
963   priv = GET_PRIV (tp_file);
964
965   g_return_if_fail (filename != NULL);
966
967   if (priv->filename && strcmp (filename, priv->filename) == 0)
968     return;
969
970   g_free (priv->filename);
971   priv->filename = g_strdup (filename);
972
973   g_object_notify (G_OBJECT (tp_file), "filename");
974 }
975
976 /* Functions to copy the content of a GInputStream to a GOutputStream */
977
978 #define N_BUFFERS 2
979 #define BUFFER_SIZE 4096
980
981 typedef struct {
982   GInputStream *in;
983   GOutputStream *out;
984   GCancellable  *cancellable;
985   char *buff[N_BUFFERS]; /* the temporary buffers */
986   gsize count[N_BUFFERS]; /* how many bytes are used in the buffers */
987   gboolean is_full[N_BUFFERS]; /* whether the buffers contain data */
988   gint curr_read; /* index of the buffer used for reading */
989   gint curr_write; /* index of the buffer used for writing */
990   gboolean is_reading; /* we are reading */
991   gboolean is_writing; /* we are writing */
992   guint n_closed; /* number of streams that have been closed */
993 } CopyData;
994
995 static void schedule_next (CopyData *copy);
996
997 static void
998 free_copy_data_if_closed (CopyData *copy)
999 {
1000   gint i;
1001
1002   /* Free the data only if both the input and output streams have
1003    * been closed. */
1004   copy->n_closed++;
1005   if (copy->n_closed < 2)
1006     return;
1007
1008   if (copy->in != NULL)
1009     g_object_unref (copy->in);
1010
1011   if (copy->out != NULL)
1012     g_object_unref (copy->out);
1013
1014   for (i = 0; i < N_BUFFERS; i++)
1015     g_free (copy->buff[i]);
1016
1017   g_object_unref (copy->cancellable);
1018   g_free (copy);
1019 }
1020
1021 static void
1022 io_error (CopyData *copy,
1023           GError *error)
1024 {
1025   g_cancellable_cancel (copy->cancellable);
1026
1027   if (error == NULL)
1028     g_warning ("I/O error");
1029   else if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_CANCELLED)
1030     ; /* Ignore cancellations */
1031   else
1032     g_warning ("I/O error: %d: %s\n", error->code, error->message);
1033
1034   if (copy->in != NULL)
1035     g_input_stream_close (copy->in, NULL, NULL);
1036
1037   if (copy->out != NULL)
1038     g_output_stream_close (copy->out, NULL, NULL);
1039
1040   free_copy_data_if_closed (copy);
1041 }
1042
1043 static void
1044 close_done (GObject *source_object,
1045             GAsyncResult *res,
1046             gpointer user_data)
1047 {
1048   CopyData *copy = user_data;
1049
1050   g_object_unref (source_object);
1051   free_copy_data_if_closed (copy);
1052 }
1053
1054 static void
1055 write_done_cb (GObject *source_object,
1056                GAsyncResult *res,
1057                gpointer user_data)
1058 {
1059   CopyData *copy = user_data;
1060   gssize count_write;
1061   GError *error = NULL;
1062
1063   count_write = g_output_stream_write_finish (copy->out, res, &error);
1064
1065   if (count_write <= 0)
1066     {
1067       io_error (copy, error);
1068       g_error_free (error);
1069       return;
1070     }
1071
1072   copy->is_full[copy->curr_write] = FALSE;
1073   copy->curr_write = (copy->curr_write + 1) % N_BUFFERS;
1074   copy->is_writing = FALSE;
1075
1076   schedule_next (copy);
1077 }
1078
1079 static void
1080 read_done_cb (GObject *source_object,
1081               GAsyncResult *res,
1082               gpointer user_data)
1083 {
1084   CopyData *copy = user_data;
1085   gssize count_read;
1086   GError *error = NULL;
1087
1088   count_read = g_input_stream_read_finish (copy->in, res, &error);
1089
1090   if (count_read == 0)
1091     {
1092       g_input_stream_close_async (copy->in, 0, copy->cancellable,
1093           close_done, copy);
1094       copy->in = NULL;
1095     }
1096   else if (count_read < 0)
1097     {
1098       io_error (copy, error);
1099       g_error_free (error);
1100       return;
1101     }
1102
1103   copy->count[copy->curr_read] = count_read;
1104   copy->is_full[copy->curr_read] = TRUE;
1105   copy->curr_read = (copy->curr_read + 1) % N_BUFFERS;
1106   copy->is_reading = FALSE;
1107
1108   schedule_next (copy);
1109 }
1110
1111 static void
1112 schedule_next (CopyData *copy)
1113 {
1114   if (copy->in != NULL &&
1115       !copy->is_reading &&
1116       !copy->is_full[copy->curr_read])
1117     {
1118       /* We are not reading and the current buffer is empty, so
1119        * start an async read. */
1120       copy->is_reading = TRUE;
1121       g_input_stream_read_async (copy->in,
1122           copy->buff[copy->curr_read],
1123           BUFFER_SIZE, 0, copy->cancellable,
1124           read_done_cb, copy);
1125     }
1126
1127   if (!copy->is_writing &&
1128       copy->is_full[copy->curr_write])
1129     {
1130       if (copy->count[copy->curr_write] == 0)
1131         {
1132           /* The last read on the buffer read 0 bytes, this
1133            * means that we got an EOF, so we can close
1134            * the output channel. */
1135           g_output_stream_close_async (copy->out, 0,
1136               copy->cancellable,
1137               close_done, copy);
1138       copy->out = NULL;
1139         }
1140       else
1141         {
1142           /* We are not writing and the current buffer contains
1143            * data, so start an async write. */
1144           copy->is_writing = TRUE;
1145           g_output_stream_write_async (copy->out,
1146               copy->buff[copy->curr_write],
1147               copy->count[copy->curr_write],
1148               0, copy->cancellable,
1149               write_done_cb, copy);
1150         }
1151     }
1152 }
1153
1154 static void
1155 copy_stream (GInputStream *in,
1156              GOutputStream *out,
1157              GCancellable *cancellable)
1158 {
1159   CopyData *copy;
1160   gint i;
1161
1162   g_return_if_fail (in != NULL);
1163   g_return_if_fail (out != NULL);
1164
1165   copy = g_new0 (CopyData, 1);
1166   copy->in = g_object_ref (in);
1167   copy->out = g_object_ref (out);
1168
1169   if (cancellable != NULL)
1170     copy->cancellable = g_object_ref (cancellable);
1171   else
1172     copy->cancellable = g_cancellable_new ();
1173
1174   for (i = 0; i < N_BUFFERS; i++)
1175     copy->buff[i] = g_malloc (BUFFER_SIZE);
1176
1177   schedule_next (copy);
1178 }