]> git.0d.be Git - empathy.git/blob - libempathy/empathy-tp-file.c
Added ContentHashType channel property, and renamed ContentMD5 to ContentHash. (Jonny...
[empathy.git] / libempathy / empathy-tp-file.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2007 Collabora Ltd.
4  *   @author: Xavier Claessens <xclaesse@gmail.com>
5  * Copyright (C) 2007 Marco Barisione <marco@barisione.org>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20  */
21
22 #include <config.h>
23
24 #include <string.h>
25 #include <unistd.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29
30 #include <glib/gi18n.h>
31
32 #include <gio/gio.h>
33 #include <gio/gunixinputstream.h>
34 #include <gio/gunixoutputstream.h>
35
36 #include <libtelepathy/tp-conn.h>
37 #include <libtelepathy/tp-helpers.h>
38 #include <libtelepathy/tp-props-iface.h>
39
40 #include <telepathy-glib/proxy-subclass.h>
41
42 #include "empathy-tp-file.h"
43 #include "empathy-contact-factory.h"
44 #include "empathy-marshal.h"
45 #include "empathy-time.h"
46 #include "empathy-utils.h"
47
48 #define DEBUG_FLAG EMPATHY_DEBUG_FT
49 #include "empathy-debug.h"
50
51 /**
52  * SECTION:empathy-tp-file
53  * @short_description: File channel
54  * @see_also: #EmpathyTpFile, #EmpathyContact, empathy_send_file()
55  * @include: libempthy/empathy-tp-file.h
56  *
57  * The #EmpathyTpFile object represents a Telepathy file channel.
58  */
59
60 /**
61  * EMPATHY_TP_FILE_UNKNOWN_SIZE:
62  *
63  * Value used for the "size" or "estimated-size" properties when the size of
64  * the transferred file is unknown.
65  */
66
67 static void empathy_tp_file_class_init (EmpathyTpFileClass *klass);
68 static void empathy_tp_file_init (EmpathyTpFile *tp_file);
69 static void tp_file_finalize (GObject *object);
70 static GObject *tp_file_constructor (GType type, guint n_props,
71     GObjectConstructParam *props);
72 static void tp_file_get_property (GObject *object, guint param_id,
73     GValue *value, GParamSpec *pspec);
74 static void tp_file_set_property (GObject *object, guint param_id,
75     const GValue *value, GParamSpec *pspec);
76 static void tp_file_destroy_cb (TpChannel *file_chan, EmpathyTpFile *tp_file);
77 static void tp_file_closed_cb (TpChannel *file_chan, EmpathyTpFile *tp_file,
78     GObject *weak_object);
79 static void tp_file_state_changed_cb (DBusGProxy *file_iface, guint state,
80     guint reason, EmpathyTpFile *tp_file);
81 static void tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
82     guint64 count, EmpathyTpFile *tp_file, GObject *weak_object);
83 static void copy_stream (GInputStream *in, GOutputStream *out,
84     GCancellable *cancellable);
85
86 /* EmpathyTpFile object */
87
88 #define GET_PRIV(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), \
89            EMPATHY_TYPE_TP_FILE, EmpathyTpFilePriv))
90
91 typedef struct _EmpathyTpFilePriv  EmpathyTpFilePriv;
92
93 struct _EmpathyTpFilePriv {
94   EmpathyContactFactory *factory;
95   McAccount *account;
96   gchar *id;
97   MissionControl *mc;
98   TpChannel *channel;
99
100   EmpathyTpFile *cached_empathy_file;
101   EmpathyContact *contact;
102   GInputStream *in_stream;
103   GOutputStream *out_stream;
104   gboolean incoming;
105   gchar *filename;
106   EmpFileTransferState state;
107   EmpFileTransferStateChangeReason state_change_reason;
108   guint64 size;
109   guint64 transferred_bytes;
110   gint64 start_time;
111   gchar *unix_socket_path;
112   gchar *content_hash;
113   EmpFileHashType content_hash_type;
114   gchar *content_type;
115   gchar *description;
116   GCancellable *cancellable;
117 };
118
119 enum {
120   PROP_0,
121   PROP_ACCOUNT,
122   PROP_CHANNEL,
123   PROP_STATE,
124   PROP_INCOMING,
125   PROP_FILENAME,
126   PROP_SIZE,
127   PROP_CONTENT_TYPE,
128   PROP_TRANSFERRED_BYTES,
129   PROP_CONTENT_HASH_TYPE,
130   PROP_CONTENT_HASH,
131   PROP_IN_STREAM,
132 };
133
134 G_DEFINE_TYPE (EmpathyTpFile, empathy_tp_file, G_TYPE_OBJECT);
135
136 static void
137 empathy_tp_file_class_init (EmpathyTpFileClass *klass)
138 {
139   GObjectClass *object_class = G_OBJECT_CLASS (klass);
140
141   object_class->finalize = tp_file_finalize;
142   object_class->constructor = tp_file_constructor;
143   object_class->get_property = tp_file_get_property;
144   object_class->set_property = tp_file_set_property;
145
146   /* Construct-only properties */
147   g_object_class_install_property (object_class,
148       PROP_ACCOUNT,
149       g_param_spec_object ("account",
150           "channel Account",
151           "The account associated with the channel",
152           MC_TYPE_ACCOUNT,
153           G_PARAM_READWRITE |
154           G_PARAM_CONSTRUCT_ONLY));
155
156   g_object_class_install_property (object_class,
157       PROP_CHANNEL,
158       g_param_spec_object ("channel",
159           "telepathy channel",
160           "The file transfer channel",
161           TP_TYPE_CHANNEL,
162           G_PARAM_READWRITE |
163           G_PARAM_CONSTRUCT_ONLY));
164
165   g_object_class_install_property (object_class,
166       PROP_STATE,
167       g_param_spec_uint ("state",
168           "state of the transfer",
169           "The file transfer state",
170           0,
171           G_MAXUINT,
172           G_MAXUINT,
173           G_PARAM_READWRITE |
174           G_PARAM_CONSTRUCT));
175
176   g_object_class_install_property (object_class,
177       PROP_INCOMING,
178       g_param_spec_boolean ("incoming",
179           "incoming",
180           "Whether the transfer is incoming",
181           FALSE,
182           G_PARAM_READWRITE |
183           G_PARAM_CONSTRUCT));
184
185   g_object_class_install_property (object_class,
186       PROP_FILENAME,
187       g_param_spec_string ("filename",
188           "name of the transfer",
189           "The file transfer filename",
190           "",
191           G_PARAM_READWRITE));
192
193   g_object_class_install_property (object_class,
194       PROP_SIZE,
195       g_param_spec_uint64 ("size",
196           "size of the file",
197           "The file transfer size",
198           0,
199           G_MAXUINT64,
200           G_MAXUINT64,
201           G_PARAM_READWRITE));
202
203   g_object_class_install_property (object_class,
204       PROP_CONTENT_TYPE,
205       g_param_spec_string ("content-type",
206           "file transfer content-type",
207           "The file transfer content-type",
208           "",
209           G_PARAM_READWRITE));
210
211   g_object_class_install_property (object_class,
212       PROP_CONTENT_HASH_TYPE,
213       g_param_spec_uint ("content-hash-type",
214           "file transfer hash type",
215           "The type of the file transfer hash",
216           0,
217           G_MAXUINT,
218           0,
219           G_PARAM_READWRITE));
220
221   g_object_class_install_property (object_class,
222       PROP_CONTENT_HASH,
223       g_param_spec_string ("content-hash",
224           "file transfer hash",
225           "The hash of the transfer's contents",
226           "",
227           G_PARAM_READWRITE));
228
229   g_object_class_install_property (object_class,
230       PROP_TRANSFERRED_BYTES,
231       g_param_spec_uint64 ("transferred-bytes",
232           "bytes transferred",
233           "The number of bytes transferred",
234           0,
235           G_MAXUINT64,
236           0,
237           G_PARAM_READWRITE));
238
239   g_object_class_install_property (object_class,
240       PROP_IN_STREAM,
241       g_param_spec_object ("in-stream",
242           "transfer input stream",
243           "The input stream for file transfer",
244           G_TYPE_INPUT_STREAM,
245           G_PARAM_READWRITE));
246
247   g_type_class_add_private (object_class, sizeof (EmpathyTpFilePriv));
248 }
249
250 static void
251 empathy_tp_file_init (EmpathyTpFile *tp_file)
252 {
253 }
254
255 static void
256 tp_file_finalize (GObject *object)
257 {
258   EmpathyTpFilePriv *priv;
259   EmpathyTpFile *tp_file;
260
261   tp_file = EMPATHY_TP_FILE (object);
262   priv = GET_PRIV (tp_file);
263
264   if (priv->channel)
265     {
266       DEBUG ("Closing channel..");
267       g_signal_handlers_disconnect_by_func (priv->channel,
268           tp_file_destroy_cb, object);
269       tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
270       if (G_IS_OBJECT (priv->channel))
271         g_object_unref (priv->channel);
272     }
273
274   if (priv->factory)
275     {
276       g_object_unref (priv->factory);
277     }
278   if (priv->account)
279     {
280       g_object_unref (priv->account);
281     }
282   if (priv->mc)
283     {
284       g_object_unref (priv->mc);
285     }
286
287   g_free (priv->id);
288   g_free (priv->filename);
289   g_free (priv->unix_socket_path);
290   g_free (priv->description);
291   g_free (priv->content_hash);
292   g_free (priv->content_type);
293
294   if (priv->in_stream)
295     g_object_unref (priv->in_stream);
296
297   if (priv->out_stream)
298     g_object_unref (priv->out_stream);
299
300   if (priv->contact)
301     g_object_unref (priv->contact);
302
303   if (priv->cancellable)
304     g_object_unref (priv->cancellable);
305
306   G_OBJECT_CLASS (empathy_tp_file_parent_class)->finalize (object);
307 }
308
309 static GObject *
310 tp_file_constructor (GType type,
311                      guint n_props,
312                      GObjectConstructParam *props)
313 {
314   GObject *tp_file;
315   EmpathyTpFilePriv *priv;
316   GError *error = NULL;
317   GHashTable *properties;
318   TpHandle handle;
319
320   tp_file = G_OBJECT_CLASS (empathy_tp_file_parent_class)->constructor (type,
321       n_props, props);
322
323   priv = GET_PRIV (tp_file);
324
325   priv->factory = empathy_contact_factory_new ();
326   priv->mc = empathy_mission_control_new ();
327
328   tp_cli_channel_connect_to_closed (priv->channel,
329       (tp_cli_channel_signal_callback_closed) tp_file_closed_cb,
330       tp_file,
331       NULL, NULL, NULL);
332
333   emp_cli_channel_type_file_connect_to_file_transfer_state_changed (
334       TP_PROXY (priv->channel),
335       (emp_cli_channel_type_file_signal_callback_file_transfer_state_changed)
336           tp_file_state_changed_cb,
337       tp_file,
338       NULL, NULL, NULL);
339
340   emp_cli_channel_type_file_connect_to_transferred_bytes_changed (
341       TP_PROXY (priv->channel),
342       (emp_cli_channel_type_file_signal_callback_transferred_bytes_changed)
343           tp_file_transferred_bytes_changed_cb,
344       tp_file,
345       NULL, NULL, NULL);
346
347
348   handle = tp_channel_get_handle (priv->channel, NULL);
349   priv->contact = empathy_contact_factory_get_from_handle (priv->factory,
350       priv->account,
351       (guint) handle);
352
353   if (!tp_cli_dbus_properties_run_get_all (priv->channel,
354       -1, EMP_IFACE_CHANNEL_TYPE_FILE, &properties, &error, NULL))
355     {
356       DEBUG ("Failed to get properties: %s",
357           error ? error->message : "No error given");
358       g_clear_error (&error);
359       return NULL;
360     }
361
362   priv->size = g_value_get_uint64 (g_hash_table_lookup (properties, "Size"));
363
364   priv->state = g_value_get_uint (g_hash_table_lookup (properties, "State"));
365
366   /* Invalid reason, so empathy_file_get_state_change_reason() can give
367    * a warning if called for a not closed file transfer. */
368   priv->state_change_reason = -1;
369
370   priv->transferred_bytes = g_value_get_uint64 (g_hash_table_lookup (
371       properties, "TransferredBytes"));
372
373   priv->filename = g_value_dup_string (g_hash_table_lookup (properties,
374       "Filename"));
375
376   priv->content_hash = g_value_dup_string (g_hash_table_lookup (properties,
377       "ContentHash"));
378
379   priv->description = g_value_dup_string (g_hash_table_lookup (properties,
380       "Description"));
381
382   priv->unix_socket_path = g_value_dup_string (g_hash_table_lookup (properties,
383       "SocketPath"));
384
385   if (priv->state == EMP_FILE_TRANSFER_STATE_LOCAL_PENDING)
386     priv->incoming = TRUE;
387
388   g_hash_table_destroy (properties);
389
390   return tp_file;
391 }
392
393 static void
394 tp_file_get_property (GObject *object,
395                       guint param_id,
396                       GValue *value,
397                       GParamSpec *pspec)
398 {
399   EmpathyTpFilePriv *priv;
400   EmpathyTpFile *tp_file;
401
402   priv = GET_PRIV (object);
403   tp_file = EMPATHY_TP_FILE (object);
404
405   switch (param_id)
406     {
407       case PROP_ACCOUNT:
408         g_value_set_object (value, priv->account);
409         break;
410       case PROP_CHANNEL:
411         g_value_set_object (value, priv->channel);
412         break;
413       default:
414         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
415         break;
416     };
417 }
418
419 static void
420 tp_file_channel_set_dbus_property (gpointer proxy,
421                                    const gchar *property,
422                                    const GValue *value)
423 {
424         DEBUG ("Setting %s property", property);
425         tp_cli_dbus_properties_run_set (TP_PROXY (proxy),
426             -1,
427             EMP_IFACE_CHANNEL_TYPE_FILE,
428             property,
429             value,
430             NULL, NULL);
431         DEBUG ("done");
432 }
433
434
435 static void
436 tp_file_set_property (GObject *object,
437                       guint param_id,
438                       const GValue *value,
439                       GParamSpec *pspec)
440 {
441   EmpathyTpFilePriv *priv;
442
443   priv = GET_PRIV (object);
444
445   switch (param_id)
446     {
447       case PROP_ACCOUNT:
448         priv->account = g_object_ref (g_value_get_object (value));
449         break;
450       case PROP_CHANNEL:
451         priv->channel = g_object_ref (g_value_get_object (value));
452         break;
453       case PROP_STATE:
454         priv->state = g_value_get_uint (value);
455         break;
456       case PROP_INCOMING:
457         priv->incoming = g_value_get_boolean (value);
458         break;
459       case PROP_FILENAME:
460         g_free (priv->filename);
461         priv->filename = g_value_dup_string (value);
462         tp_file_channel_set_dbus_property (priv->channel, "Filename", value);
463         break;
464       case PROP_SIZE:
465         priv->size = g_value_get_uint64 (value);
466         tp_file_channel_set_dbus_property (priv->channel, "Size", value);
467         break;
468       case PROP_CONTENT_TYPE:
469         tp_file_channel_set_dbus_property (priv->channel, "ContentType", value);
470         g_free (priv->content_type);
471         priv->content_type = g_value_dup_string (value);
472         break;
473       case PROP_CONTENT_HASH:
474         tp_file_channel_set_dbus_property (priv->channel, "ContentHash", value);
475         g_free (priv->content_hash);
476         priv->content_hash = g_value_dup_string (value);
477         break;
478       case PROP_IN_STREAM:
479         if (priv->in_stream)
480           g_object_unref (priv->in_stream);
481         priv->in_stream = g_object_ref (g_value_get_object (value));
482         break;
483       default:
484         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
485         break;
486     };
487 }
488
489 /**
490  * empathy_tp_file_new:
491  * @account: the #McAccount for the channel
492  * @channel: a Telepathy channel
493  *
494  * Creates a new #EmpathyTpFile wrapping @channel.
495  *
496  * Returns: a new #EmpathyTpFile
497  */
498 EmpathyTpFile *
499 empathy_tp_file_new (McAccount *account,
500                   TpChannel *channel)
501 {
502   return g_object_new (EMPATHY_TYPE_TP_FILE,
503       "account", account,
504       "channel", channel,
505       NULL);
506 }
507
508 /**
509  * empathy_tp_file_get_id:
510  * @tp_file: an #EmpathyTpFile
511  *
512  * Returns the ID of @tp_file.
513  *
514  * Returns: the ID
515  */
516 const gchar *
517 empathy_tp_file_get_id (EmpathyTpFile *tp_file)
518 {
519   EmpathyTpFilePriv *priv;
520
521   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
522
523   priv = GET_PRIV (tp_file);
524
525   return priv->id;
526 }
527
528 /**
529  * empathy_tp_file_get_channel
530  * @tp_file: an #EmpathyTpFile
531  *
532  * Returns the Telepathy file transfer channel
533  *
534  * Returns: the #TpChannel
535  */
536 TpChannel *
537 empathy_tp_file_get_channel (EmpathyTpFile *tp_file)
538 {
539   EmpathyTpFilePriv *priv;
540
541   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
542
543   priv = GET_PRIV (tp_file);
544
545   return priv->channel;
546 }
547
548 static void
549 tp_file_destroy_cb (TpChannel *file_channel,
550                     EmpathyTpFile *tp_file)
551 {
552   EmpathyTpFilePriv *priv;
553
554   priv = GET_PRIV (tp_file);
555
556   DEBUG ("Channel Closed or CM crashed");
557
558   g_object_unref (priv->channel);
559   priv->channel = NULL;
560 }
561
562 static void
563 tp_file_closed_cb (TpChannel *file_channel,
564                    EmpathyTpFile *tp_file,
565                    GObject *weak_object)
566 {
567   EmpathyTpFilePriv *priv;
568
569   priv = GET_PRIV (tp_file);
570
571   /* The channel is closed, do just like if the proxy was destroyed */
572   g_signal_handlers_disconnect_by_func (priv->channel,
573       tp_file_destroy_cb,
574       tp_file);
575   tp_file_destroy_cb (file_channel, tp_file);
576 }
577
578 static gint64
579 get_time_msec (void)
580 {
581   GTimeVal tv;
582
583   g_get_current_time (&tv);
584   return ((gint64) tv.tv_sec) * 1000 + tv.tv_usec / 1000;
585 }
586
587 static gint
588 _get_local_socket (EmpathyTpFile *tp_file)
589 {
590   gint fd;
591   size_t path_len;
592   struct sockaddr_un addr;
593   EmpathyTpFilePriv *priv;
594   GValue *socket_path;
595
596   priv = GET_PRIV (tp_file);
597
598   /* TODO: This could probably be a little nicer. */
599   tp_cli_dbus_properties_run_get (priv->channel,
600       -1,
601       EMP_IFACE_CHANNEL_TYPE_FILE,
602       "SocketPath",
603       &socket_path,
604       NULL,
605       NULL);
606
607   if (priv->unix_socket_path)
608     g_free (priv->unix_socket_path);
609
610   priv->unix_socket_path = g_value_dup_string (socket_path);
611   g_value_unset (socket_path);
612
613   if (G_STR_EMPTY (priv->unix_socket_path))
614     return -1;
615
616   fd = socket (PF_UNIX, SOCK_STREAM, 0);
617   if (fd < 0)
618     return -1;
619
620   memset (&addr, 0, sizeof (addr));
621   addr.sun_family = AF_UNIX;
622   path_len = strlen (priv->unix_socket_path);
623   strncpy (addr.sun_path, priv->unix_socket_path, path_len);
624
625   if (connect (fd, (struct sockaddr*) &addr,
626       sizeof (addr)) < 0)
627     {
628       close (fd);
629       return -1;
630     }
631
632   return fd;
633 }
634
635 /**
636  * empathy_tp_file_accept:
637  * @tp_file: an #EmpathyTpFile
638  *
639  * Accepts a file transfer that's in the "local pending" state (i.e.
640  * EMP_FILE_TRANSFER_STATE_LOCAL_PENDING).
641  */
642 void
643 empathy_tp_file_accept (EmpathyTpFile *tp_file)
644 {
645   EmpathyTpFilePriv *priv;
646   GValue *address;
647   GValue nothing = { 0 };
648   GError *error = NULL;
649
650   g_return_if_fail (EMPATHY_IS_TP_FILE (tp_file));
651
652   priv = GET_PRIV (tp_file);
653
654   g_return_if_fail (priv->out_stream != NULL);
655
656   DEBUG ("Accepting file: filename=%s", priv->filename);
657
658   g_value_init (&nothing, G_TYPE_STRING);
659   g_value_set_string (&nothing, "");
660
661   if (!emp_cli_channel_type_file_run_accept_file (TP_PROXY (priv->channel),
662       -1, TP_SOCKET_ADDRESS_TYPE_UNIX, TP_SOCKET_ACCESS_CONTROL_LOCALHOST,
663       &nothing, &address, &error, NULL))
664     {
665       DEBUG ("Accept error: %s",
666           error ? error->message : "No message given");
667       g_clear_error (&error);
668     }
669
670   if (priv->unix_socket_path)
671     g_free (priv->unix_socket_path);
672
673   priv->unix_socket_path = g_value_dup_string (address);
674   g_value_unset (address);
675
676   DEBUG ("Got unix socket path: %s", priv->unix_socket_path);
677 }
678
679 static void
680 receive_tp_file (EmpathyTpFile *tp_file)
681 {
682   EmpathyTpFilePriv *priv;
683   GInputStream *socket_stream;
684   gint socket_fd;
685
686   priv = GET_PRIV (tp_file);
687
688   socket_fd = _get_local_socket (tp_file);
689
690   if (socket_fd < 0)
691     return;
692
693   socket_stream = g_unix_input_stream_new (socket_fd, TRUE);
694
695   priv->cancellable = g_cancellable_new ();
696
697   copy_stream (socket_stream, priv->out_stream, priv->cancellable);
698
699   g_object_unref (socket_stream);
700 }
701
702
703 static void
704 send_tp_file (EmpathyTpFile *tp_file)
705 {
706   gint socket_fd;
707   GOutputStream *socket_stream;
708   EmpathyTpFilePriv *priv;
709
710   priv = GET_PRIV (tp_file);
711
712   DEBUG ("Sending file content: filename=%s",
713            priv->filename);
714
715   g_return_if_fail (priv->in_stream);
716
717   socket_fd = _get_local_socket (tp_file);
718   if (socket_fd < 0)
719     {
720       DEBUG ("failed to get local socket fd");
721       return;
722     }
723   DEBUG ("got local socket fd");
724   socket_stream = g_unix_output_stream_new (socket_fd, TRUE);
725
726   priv->cancellable = g_cancellable_new ();
727
728   copy_stream (priv->in_stream, socket_stream, priv->cancellable);
729
730   g_object_unref (socket_stream);
731 }
732
733 static void
734 tp_file_state_changed_cb (DBusGProxy *tp_file_iface,
735                           EmpFileTransferState state,
736                           EmpFileTransferStateChangeReason reason,
737                           EmpathyTpFile *tp_file)
738 {
739   EmpathyTpFilePriv *priv;
740
741   priv = GET_PRIV (tp_file);
742
743   DEBUG ("File transfer state changed: filename=%s, "
744       "old state=%u, state=%u, reason=%u",
745       priv->filename, priv->state, state, reason);
746
747   if (state == EMP_FILE_TRANSFER_STATE_OPEN)
748     priv->start_time = get_time_msec ();
749
750   DEBUG ("state = %u, incoming = %s, in_stream = %s, out_stream = %s",
751       state, priv->incoming ? "yes" : "no",
752       priv->in_stream ? "present" : "not present",
753       priv->out_stream ? "present" : "not present");
754
755   if (state == EMP_FILE_TRANSFER_STATE_OPEN && !priv->incoming &&
756       priv->in_stream)
757     send_tp_file (tp_file);
758   else if (state == EMP_FILE_TRANSFER_STATE_OPEN && priv->incoming &&
759       priv->out_stream)
760       receive_tp_file (tp_file);
761
762   priv->state = state;
763   priv->state_change_reason = reason;
764
765   g_object_notify (G_OBJECT (tp_file), "state");
766 }
767
768 static void
769 tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
770                                       guint64 count,
771                                       EmpathyTpFile *tp_file,
772                                       GObject *weak_object)
773 {
774   EmpathyTpFilePriv *priv;
775
776   priv = GET_PRIV (tp_file);
777
778   if (priv->transferred_bytes == count)
779     return;
780
781   priv->transferred_bytes = count;
782
783   g_object_notify (G_OBJECT (tp_file), "transferred-bytes");
784 }
785
786 EmpathyContact *
787 empathy_tp_file_get_contact (EmpathyTpFile *tp_file)
788 {
789   EmpathyTpFilePriv *priv;
790
791   priv = GET_PRIV (tp_file);
792
793   return priv->contact;
794 }
795
796 GInputStream *
797 empathy_tp_file_get_input_stream (EmpathyTpFile *tp_file)
798 {
799   EmpathyTpFilePriv *priv;
800
801   priv = GET_PRIV (tp_file);
802
803   return priv->in_stream;
804 }
805
806 GOutputStream *
807 empathy_tp_file_get_output_stream (EmpathyTpFile *tp_file)
808 {
809   EmpathyTpFilePriv *priv;
810
811   priv = GET_PRIV (tp_file);
812
813   return priv->out_stream;
814 }
815
816 const gchar *
817 empathy_tp_file_get_filename (EmpathyTpFile *tp_file)
818 {
819   EmpathyTpFilePriv *priv;
820
821   priv = GET_PRIV (tp_file);
822
823   return priv->filename;
824 }
825
826 gboolean
827 empathy_tp_file_get_incoming (EmpathyTpFile *tp_file)
828 {
829   EmpathyTpFilePriv *priv;
830
831   priv = GET_PRIV (tp_file);
832
833   return priv->incoming;
834 }
835
836 EmpFileTransferState
837 empathy_tp_file_get_state (EmpathyTpFile *tp_file)
838 {
839   EmpathyTpFilePriv *priv;
840
841   priv = GET_PRIV (tp_file);
842
843   return priv->state;
844 }
845
846 EmpFileTransferStateChangeReason
847 empathy_tp_file_get_state_change_reason (EmpathyTpFile *tp_file)
848 {
849   EmpathyTpFilePriv *priv;
850
851   priv = GET_PRIV (tp_file);
852
853   g_return_val_if_fail (priv->state_change_reason >= 0,
854       EMP_FILE_TRANSFER_STATE_CHANGE_REASON_NONE);
855
856   return priv->state_change_reason;
857 }
858
859 guint64
860 empathy_tp_file_get_size (EmpathyTpFile *tp_file)
861 {
862   EmpathyTpFilePriv *priv;
863
864   priv = GET_PRIV (tp_file);
865
866   return priv->size;
867 }
868
869 guint64
870 empathy_tp_file_get_transferred_bytes (EmpathyTpFile *tp_file)
871 {
872   EmpathyTpFilePriv *priv;
873
874   priv = GET_PRIV (tp_file);
875
876   return priv->transferred_bytes;
877 }
878
879 gint
880 empathy_tp_file_get_remaining_time (EmpathyTpFile *tp_file)
881 {
882   EmpathyTpFilePriv *priv;
883   gint64 curr_time, elapsed_time;
884   gdouble time_per_byte;
885   gdouble remaining_time;
886
887   priv = GET_PRIV (tp_file);
888
889   if (priv->size == EMPATHY_TP_FILE_UNKNOWN_SIZE)
890     return -1;
891
892   if (priv->transferred_bytes == priv->size)
893     return 0;
894
895   curr_time = get_time_msec ();
896   elapsed_time = curr_time - priv->start_time;
897   time_per_byte = (gdouble) elapsed_time / (gdouble) priv->transferred_bytes;
898   remaining_time = (time_per_byte * (priv->size - priv->transferred_bytes)) / 1000;
899
900   return (gint) (remaining_time + 0.5);
901 }
902
903 void
904 empathy_tp_file_cancel (EmpathyTpFile *tp_file)
905 {
906   EmpathyTpFilePriv *priv;
907
908   priv = GET_PRIV (tp_file);
909
910   tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
911
912   g_cancellable_cancel (priv->cancellable);
913 }
914
915 void
916 empathy_tp_file_set_input_stream (EmpathyTpFile *tp_file,
917                                   GInputStream *in_stream)
918 {
919   EmpathyTpFilePriv *priv;
920
921   priv = GET_PRIV (tp_file);
922
923   if (priv->in_stream == in_stream)
924     return;
925
926   if (priv->incoming)
927     g_warning ("Setting an input stream for incoming file "
928          "transfers is useless");
929
930   if (priv->in_stream)
931     g_object_unref (priv->in_stream);
932
933   if (in_stream)
934     g_object_ref (in_stream);
935
936   priv->in_stream = in_stream;
937
938   g_object_notify (G_OBJECT (tp_file), "in-stream");
939 }
940
941 void
942 empathy_tp_file_set_output_stream (EmpathyTpFile *tp_file,
943                                    GOutputStream *out_stream)
944 {
945   EmpathyTpFilePriv *priv;
946
947   priv = GET_PRIV (tp_file);
948
949   if (priv->out_stream == out_stream)
950     return;
951
952   if (!priv->incoming)
953     g_warning ("Setting an output stream for outgoing file "
954          "transfers is useless");
955
956   if (priv->out_stream)
957     g_object_unref (priv->out_stream);
958
959   if (out_stream)
960     g_object_ref (out_stream);
961
962   priv->out_stream = out_stream;
963 }
964
965 void
966 empathy_tp_file_set_filename (EmpathyTpFile *tp_file,
967                               const gchar *filename)
968 {
969   EmpathyTpFilePriv *priv;
970
971   priv = GET_PRIV (tp_file);
972
973   g_return_if_fail (filename != NULL);
974
975   if (priv->filename && strcmp (filename, priv->filename) == 0)
976     return;
977
978   g_free (priv->filename);
979   priv->filename = g_strdup (filename);
980
981   g_object_notify (G_OBJECT (tp_file), "filename");
982 }
983
984 /* Functions to copy the content of a GInputStream to a GOutputStream */
985
986 #define N_BUFFERS 2
987 #define BUFFER_SIZE 4096
988
989 typedef struct {
990   GInputStream *in;
991   GOutputStream *out;
992   GCancellable  *cancellable;
993   char *buff[N_BUFFERS]; /* the temporary buffers */
994   gsize count[N_BUFFERS]; /* how many bytes are used in the buffers */
995   gboolean is_full[N_BUFFERS]; /* whether the buffers contain data */
996   gint curr_read; /* index of the buffer used for reading */
997   gint curr_write; /* index of the buffer used for writing */
998   gboolean is_reading; /* we are reading */
999   gboolean is_writing; /* we are writing */
1000   guint n_closed; /* number of streams that have been closed */
1001 } CopyData;
1002
1003 static void schedule_next (CopyData *copy);
1004
1005 static void
1006 free_copy_data_if_closed (CopyData *copy)
1007 {
1008   gint i;
1009
1010   /* Free the data only if both the input and output streams have
1011    * been closed. */
1012   copy->n_closed++;
1013   if (copy->n_closed < 2)
1014     return;
1015
1016   if (copy->in != NULL)
1017     g_object_unref (copy->in);
1018
1019   if (copy->out != NULL)
1020     g_object_unref (copy->out);
1021
1022   for (i = 0; i < N_BUFFERS; i++)
1023     g_free (copy->buff[i]);
1024
1025   g_object_unref (copy->cancellable);
1026   g_free (copy);
1027 }
1028
1029 static void
1030 io_error (CopyData *copy,
1031           GError *error)
1032 {
1033   g_cancellable_cancel (copy->cancellable);
1034
1035   if (error == NULL)
1036     g_warning ("I/O error");
1037   else if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_CANCELLED)
1038     ; /* Ignore cancellations */
1039   else
1040     g_warning ("I/O error: %d: %s\n", error->code, error->message);
1041
1042   if (copy->in != NULL)
1043     g_input_stream_close (copy->in, NULL, NULL);
1044
1045   if (copy->out != NULL)
1046     g_output_stream_close (copy->out, NULL, NULL);
1047
1048   free_copy_data_if_closed (copy);
1049 }
1050
1051 static void
1052 close_done (GObject *source_object,
1053             GAsyncResult *res,
1054             gpointer user_data)
1055 {
1056   CopyData *copy = user_data;
1057
1058   g_object_unref (source_object);
1059   free_copy_data_if_closed (copy);
1060 }
1061
1062 static void
1063 write_done_cb (GObject *source_object,
1064                GAsyncResult *res,
1065                gpointer user_data)
1066 {
1067   CopyData *copy = user_data;
1068   gssize count_write;
1069   GError *error = NULL;
1070
1071   count_write = g_output_stream_write_finish (copy->out, res, &error);
1072
1073   if (count_write <= 0)
1074     {
1075       io_error (copy, error);
1076       g_error_free (error);
1077       return;
1078     }
1079
1080   copy->is_full[copy->curr_write] = FALSE;
1081   copy->curr_write = (copy->curr_write + 1) % N_BUFFERS;
1082   copy->is_writing = FALSE;
1083
1084   schedule_next (copy);
1085 }
1086
1087 static void
1088 read_done_cb (GObject *source_object,
1089               GAsyncResult *res,
1090               gpointer user_data)
1091 {
1092   CopyData *copy = user_data;
1093   gssize count_read;
1094   GError *error = NULL;
1095
1096   count_read = g_input_stream_read_finish (copy->in, res, &error);
1097
1098   if (count_read == 0)
1099     {
1100       g_input_stream_close_async (copy->in, 0, copy->cancellable,
1101           close_done, copy);
1102       copy->in = NULL;
1103     }
1104   else if (count_read < 0)
1105     {
1106       io_error (copy, error);
1107       g_error_free (error);
1108       return;
1109     }
1110
1111   copy->count[copy->curr_read] = count_read;
1112   copy->is_full[copy->curr_read] = TRUE;
1113   copy->curr_read = (copy->curr_read + 1) % N_BUFFERS;
1114   copy->is_reading = FALSE;
1115
1116   schedule_next (copy);
1117 }
1118
1119 static void
1120 schedule_next (CopyData *copy)
1121 {
1122   if (copy->in != NULL &&
1123       !copy->is_reading &&
1124       !copy->is_full[copy->curr_read])
1125     {
1126       /* We are not reading and the current buffer is empty, so
1127        * start an async read. */
1128       copy->is_reading = TRUE;
1129       g_input_stream_read_async (copy->in,
1130           copy->buff[copy->curr_read],
1131           BUFFER_SIZE, 0, copy->cancellable,
1132           read_done_cb, copy);
1133     }
1134
1135   if (!copy->is_writing &&
1136       copy->is_full[copy->curr_write])
1137     {
1138       if (copy->count[copy->curr_write] == 0)
1139         {
1140           /* The last read on the buffer read 0 bytes, this
1141            * means that we got an EOF, so we can close
1142            * the output channel. */
1143           g_output_stream_close_async (copy->out, 0,
1144               copy->cancellable,
1145               close_done, copy);
1146       copy->out = NULL;
1147         }
1148       else
1149         {
1150           /* We are not writing and the current buffer contains
1151            * data, so start an async write. */
1152           copy->is_writing = TRUE;
1153           g_output_stream_write_async (copy->out,
1154               copy->buff[copy->curr_write],
1155               copy->count[copy->curr_write],
1156               0, copy->cancellable,
1157               write_done_cb, copy);
1158         }
1159     }
1160 }
1161
1162 static void
1163 copy_stream (GInputStream *in,
1164              GOutputStream *out,
1165              GCancellable *cancellable)
1166 {
1167   CopyData *copy;
1168   gint i;
1169
1170   g_return_if_fail (in != NULL);
1171   g_return_if_fail (out != NULL);
1172
1173   copy = g_new0 (CopyData, 1);
1174   copy->in = g_object_ref (in);
1175   copy->out = g_object_ref (out);
1176
1177   if (cancellable != NULL)
1178     copy->cancellable = g_object_ref (cancellable);
1179   else
1180     copy->cancellable = g_cancellable_new ();
1181
1182   for (i = 0; i < N_BUFFERS; i++)
1183     copy->buff[i] = g_malloc (BUFFER_SIZE);
1184
1185   schedule_next (copy);
1186 }