]> git.0d.be Git - empathy.git/blob - libempathy/empathy-tp-file.c
Updated yet more places to use s/direction/incoming/ (Jonny Lamb)
[empathy.git] / libempathy / empathy-tp-file.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2007 Collabora Ltd.
4  *   @author: Xavier Claessens <xclaesse@gmail.com>
5  * Copyright (C) 2007 Marco Barisione <marco@barisione.org>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
20  */
21
22 #include <config.h>
23
24 #include <string.h>
25 #include <unistd.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <sys/un.h>
29
30 #include <glib/gi18n.h>
31
32 #include <gio/gio.h>
33 #include <gio/gunixinputstream.h>
34 #include <gio/gunixoutputstream.h>
35
36 #include <libtelepathy/tp-conn.h>
37 #include <libtelepathy/tp-helpers.h>
38 #include <libtelepathy/tp-props-iface.h>
39
40 #include <telepathy-glib/proxy-subclass.h>
41
42 #include "empathy-tp-file.h"
43 #include "empathy-contact-factory.h"
44 #include "empathy-marshal.h"
45 #include "empathy-time.h"
46 #include "empathy-utils.h"
47
48 #define DEBUG_FLAG EMPATHY_DEBUG_FT
49 #include "empathy-debug.h"
50
51 /**
52  * SECTION:empathy-tp-file
53  * @short_description: File channel
54  * @see_also: #EmpathyTpFile, #EmpathyContact, empathy_send_file()
55  * @include: libempathy/empathy-tp-file.h
56  *
57  * The #EmpathyTpFile object represents a Telepathy file channel.
58  */
59
60 /**
61  * EMPATHY_TP_FILE_UNKNOWN_SIZE:
62  *
63  * Value used for the "size" or "estimated-size" properties when the size of
64  * the transferred file is unknown.
65  */
66
67 static void empathy_tp_file_class_init (EmpathyTpFileClass *klass);
68 static void empathy_tp_file_init (EmpathyTpFile *tp_file);
69 static void tp_file_finalize (GObject *object);
70 static GObject *tp_file_constructor (GType type, guint n_props,
71     GObjectConstructParam *props);
72 static void tp_file_get_property (GObject *object, guint param_id,
73     GValue *value, GParamSpec *pspec);
74 static void tp_file_set_property (GObject *object, guint param_id,
75     const GValue *value, GParamSpec *pspec);
76 static void tp_file_destroy_cb (TpChannel *file_chan, EmpathyTpFile *tp_file);
77 static void tp_file_closed_cb (TpChannel *file_chan, EmpathyTpFile *tp_file,
78     GObject *weak_object);
79 static void tp_file_state_changed_cb (DBusGProxy *file_iface, guint state,
80     guint reason, EmpathyTpFile *tp_file);
81 static void tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
82     guint64 count, EmpathyTpFile *tp_file, GObject *weak_object);
83 static void copy_stream (GInputStream *in, GOutputStream *out,
84     GCancellable *cancellable);
85
86 /* EmpathyTpFile object */
87
88 #define GET_PRIV(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), \
89            EMPATHY_TYPE_TP_FILE, EmpathyTpFilePriv))
90
91 typedef struct _EmpathyTpFilePriv  EmpathyTpFilePriv;
92
93 struct _EmpathyTpFilePriv {
94   EmpathyContactFactory *factory;
95   McAccount *account;
96   gchar *id;
97   MissionControl *mc;
98   TpChannel *channel;
99
100   EmpathyTpFile *cached_empathy_file;
101   EmpathyContact *contact;
102   GInputStream *in_stream;
103   GOutputStream *out_stream;
104   gboolean incoming;
105   gchar *filename;
106   EmpFileTransferState state;
107   EmpFileTransferStateChangeReason state_change_reason;
108   guint64 size;
109   guint64 transferred_bytes;
110   gint64 start_time;
111   gchar *unix_socket_path;
112   gchar *content_md5;
113   gchar *content_type;
114   gchar *description;
115   GCancellable *cancellable;
116 };
117
/* GObject property identifiers; PROP_0 is the unused zero slot. */
enum {
  PROP_0,

  /* Construct-only properties */
  PROP_ACCOUNT,
  PROP_CHANNEL,

  /* Transfer description and progress */
  PROP_STATE,
  PROP_INCOMING,
  PROP_FILENAME,
  PROP_SIZE,
  PROP_CONTENT_TYPE,
  PROP_TRANSFERRED_BYTES,
  PROP_CONTENT_MD5,

  /* Local data source */
  PROP_IN_STREAM,
};
131
132 G_DEFINE_TYPE (EmpathyTpFile, empathy_tp_file, G_TYPE_OBJECT);
133
134 static void
135 empathy_tp_file_class_init (EmpathyTpFileClass *klass)
136 {
137   GObjectClass *object_class = G_OBJECT_CLASS (klass);
138
139   object_class->finalize = tp_file_finalize;
140   object_class->constructor = tp_file_constructor;
141   object_class->get_property = tp_file_get_property;
142   object_class->set_property = tp_file_set_property;
143
144   /* Construct-only properties */
145   g_object_class_install_property (object_class,
146       PROP_ACCOUNT,
147       g_param_spec_object ("account",
148           "channel Account",
149           "The account associated with the channel",
150           MC_TYPE_ACCOUNT,
151           G_PARAM_READWRITE |
152           G_PARAM_CONSTRUCT_ONLY));
153
154   g_object_class_install_property (object_class,
155       PROP_CHANNEL,
156       g_param_spec_object ("channel",
157           "telepathy channel",
158           "The file transfer channel",
159           TP_TYPE_CHANNEL,
160           G_PARAM_READWRITE |
161           G_PARAM_CONSTRUCT_ONLY));
162
163   g_object_class_install_property (object_class,
164       PROP_STATE,
165       g_param_spec_uint ("state",
166           "state of the transfer",
167           "The file transfer state",
168           0,
169           G_MAXUINT,
170           G_MAXUINT,
171           G_PARAM_READWRITE |
172           G_PARAM_CONSTRUCT));
173
174   g_object_class_install_property (object_class,
175       PROP_INCOMING,
176       g_param_spec_boolean ("incoming",
177           "incoming",
178           "Whether the transfer is incoming",
179           FALSE,
180           G_PARAM_READWRITE |
181           G_PARAM_CONSTRUCT));
182
183   g_object_class_install_property (object_class,
184       PROP_FILENAME,
185       g_param_spec_string ("filename",
186           "name of the transfer",
187           "The file transfer filename",
188           "",
189           G_PARAM_READWRITE));
190
191   g_object_class_install_property (object_class,
192       PROP_SIZE,
193       g_param_spec_uint64 ("size",
194           "size of the file",
195           "The file transfer size",
196           0,
197           G_MAXUINT64,
198           G_MAXUINT64,
199           G_PARAM_READWRITE));
200
201   g_object_class_install_property (object_class,
202       PROP_CONTENT_TYPE,
203       g_param_spec_string ("content-type",
204           "file transfer content-type",
205           "The file transfer content-type",
206           "",
207           G_PARAM_READWRITE));
208
209   g_object_class_install_property (object_class,
210       PROP_CONTENT_MD5,
211       g_param_spec_string ("content-md5",
212           "file transfer md5sum",
213           "The md5 sum of the transfer's contents",
214           "",
215           G_PARAM_READWRITE));
216
217   g_object_class_install_property (object_class,
218       PROP_TRANSFERRED_BYTES,
219       g_param_spec_uint64 ("transferred-bytes",
220           "bytes transferred",
221           "The number of bytes transferred",
222           0,
223           G_MAXUINT64,
224           0,
225           G_PARAM_READWRITE));
226
227   g_object_class_install_property (object_class,
228       PROP_IN_STREAM,
229       g_param_spec_object ("in-stream",
230           "transfer input stream",
231           "The input stream for file transfer",
232           G_TYPE_INPUT_STREAM,
233           G_PARAM_READWRITE));
234
235   g_type_class_add_private (object_class, sizeof (EmpathyTpFilePriv));
236 }
237
238 static void
239 empathy_tp_file_init (EmpathyTpFile *tp_file)
240 {
241 }
242
243 static void
244 tp_file_finalize (GObject *object)
245 {
246   EmpathyTpFilePriv *priv;
247   EmpathyTpFile *tp_file;
248
249   tp_file = EMPATHY_TP_FILE (object);
250   priv = GET_PRIV (tp_file);
251
252   if (priv->channel)
253     {
254       DEBUG ("Closing channel..");
255       g_signal_handlers_disconnect_by_func (priv->channel,
256           tp_file_destroy_cb, object);
257       tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
258       if (G_IS_OBJECT (priv->channel))
259         g_object_unref (priv->channel);
260     }
261
262   if (priv->factory)
263     {
264       g_object_unref (priv->factory);
265     }
266   if (priv->account)
267     {
268       g_object_unref (priv->account);
269     }
270   if (priv->mc)
271     {
272       g_object_unref (priv->mc);
273     }
274
275   g_free (priv->id);
276   g_free (priv->filename);
277   g_free (priv->unix_socket_path);
278   g_free (priv->description);
279   g_free (priv->content_md5);
280   g_free (priv->content_type);
281
282   if (priv->in_stream)
283     g_object_unref (priv->in_stream);
284
285   if (priv->out_stream)
286     g_object_unref (priv->out_stream);
287
288   if (priv->contact)
289     g_object_unref (priv->contact);
290
291   if (priv->cancellable)
292     g_object_unref (priv->cancellable);
293
294   G_OBJECT_CLASS (empathy_tp_file_parent_class)->finalize (object);
295 }
296
297 static GObject *
298 tp_file_constructor (GType type,
299                      guint n_props,
300                      GObjectConstructParam *props)
301 {
302   GObject *tp_file;
303   EmpathyTpFilePriv *priv;
304   GError *error = NULL;
305   GHashTable *properties;
306   TpHandle handle;
307
308   tp_file = G_OBJECT_CLASS (empathy_tp_file_parent_class)->constructor (type,
309       n_props, props);
310
311   priv = GET_PRIV (tp_file);
312
313   priv->factory = empathy_contact_factory_new ();
314   priv->mc = empathy_mission_control_new ();
315
316   tp_cli_channel_connect_to_closed (priv->channel,
317       (tp_cli_channel_signal_callback_closed) tp_file_closed_cb,
318       tp_file,
319       NULL, NULL, NULL);
320
321   emp_cli_channel_type_file_connect_to_file_transfer_state_changed (
322       TP_PROXY (priv->channel),
323       (emp_cli_channel_type_file_signal_callback_file_transfer_state_changed)
324           tp_file_state_changed_cb,
325       tp_file,
326       NULL, NULL, NULL);
327
328   emp_cli_channel_type_file_connect_to_transferred_bytes_changed (
329       TP_PROXY (priv->channel),
330       (emp_cli_channel_type_file_signal_callback_transferred_bytes_changed)
331           tp_file_transferred_bytes_changed_cb,
332       tp_file,
333       NULL, NULL, NULL);
334
335
336   handle = tp_channel_get_handle (priv->channel, NULL);
337   priv->contact = empathy_contact_factory_get_from_handle (priv->factory,
338       priv->account,
339       (guint) handle);
340
341   if (!tp_cli_dbus_properties_run_get_all (priv->channel,
342       -1, EMP_IFACE_CHANNEL_TYPE_FILE, &properties, &error, NULL))
343     {
344       DEBUG ("Failed to get properties: %s",
345           error ? error->message : "No error given");
346       g_clear_error (&error);
347       return NULL;
348     }
349
350   priv->size = g_value_get_uint64 (g_hash_table_lookup (properties, "Size"));
351
352   priv->state = g_value_get_uint (g_hash_table_lookup (properties, "State"));
353
354   /* Invalid reason, so empathy_file_get_state_change_reason() can give
355    * a warning if called for a not closed file transfer. */
356   priv->state_change_reason = -1;
357
358   priv->transferred_bytes = g_value_get_uint64 (g_hash_table_lookup (
359       properties, "TransferredBytes"));
360
361   priv->filename = g_value_dup_string (g_hash_table_lookup (properties,
362       "Filename"));
363
364   priv->content_md5 = g_value_dup_string (g_hash_table_lookup (properties,
365       "ContentMD5"));
366
367   priv->description = g_value_dup_string (g_hash_table_lookup (properties,
368       "Description"));
369
370   priv->unix_socket_path = g_value_dup_string (g_hash_table_lookup (properties,
371       "SocketPath"));
372
373   if (priv->state == EMP_FILE_TRANSFER_STATE_LOCAL_PENDING)
374     priv->incoming = TRUE;
375
376   g_hash_table_destroy (properties);
377
378   return tp_file;
379 }
380
381 static void
382 tp_file_get_property (GObject *object,
383                       guint param_id,
384                       GValue *value,
385                       GParamSpec *pspec)
386 {
387   EmpathyTpFilePriv *priv;
388   EmpathyTpFile *tp_file;
389
390   priv = GET_PRIV (object);
391   tp_file = EMPATHY_TP_FILE (object);
392
393   switch (param_id)
394     {
395       case PROP_ACCOUNT:
396         g_value_set_object (value, priv->account);
397         break;
398       case PROP_CHANNEL:
399         g_value_set_object (value, priv->channel);
400         break;
401       default:
402         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
403         break;
404     };
405 }
406
407 static void
408 tp_file_channel_set_dbus_property (gpointer proxy,
409                                    const gchar *property,
410                                    const GValue *value)
411 {
412         DEBUG ("Setting %s property", property);
413         tp_cli_dbus_properties_run_set (TP_PROXY (proxy),
414             -1,
415             EMP_IFACE_CHANNEL_TYPE_FILE,
416             property,
417             value,
418             NULL, NULL);
419         DEBUG ("done");
420 }
421
422
423 static void
424 tp_file_set_property (GObject *object,
425                       guint param_id,
426                       const GValue *value,
427                       GParamSpec *pspec)
428 {
429   EmpathyTpFilePriv *priv;
430
431   priv = GET_PRIV (object);
432
433   switch (param_id)
434     {
435       case PROP_ACCOUNT:
436         priv->account = g_object_ref (g_value_get_object (value));
437         break;
438       case PROP_CHANNEL:
439         priv->channel = g_object_ref (g_value_get_object (value));
440         break;
441       case PROP_STATE:
442         priv->state = g_value_get_uint (value);
443         break;
444       case PROP_INCOMING:
445         priv->incoming = g_value_get_boolean (value);
446         break;
447       case PROP_FILENAME:
448         g_free (priv->filename);
449         priv->filename = g_value_dup_string (value);
450         tp_file_channel_set_dbus_property (priv->channel, "Filename", value);
451         break;
452       case PROP_SIZE:
453         priv->size = g_value_get_uint64 (value);
454         tp_file_channel_set_dbus_property (priv->channel, "Size", value);
455         break;
456       case PROP_CONTENT_TYPE:
457         tp_file_channel_set_dbus_property (priv->channel, "ContentType", value);
458         g_free (priv->content_type);
459         priv->content_type = g_value_dup_string (value);
460         break;
461       case PROP_CONTENT_MD5:
462         tp_file_channel_set_dbus_property (priv->channel, "ContentMD5", value);
463         g_free (priv->content_md5);
464         priv->content_md5 = g_value_dup_string (value);
465         break;
466       case PROP_IN_STREAM:
467         if (priv->in_stream)
468           g_object_unref (priv->in_stream);
469         priv->in_stream = g_object_ref (g_value_get_object (value));
470         break;
471       default:
472         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
473         break;
474     };
475 }
476
477 /**
478  * empathy_tp_file_new:
479  * @account: the #McAccount for the channel
480  * @channel: a Telepathy channel
481  *
482  * Creates a new #EmpathyTpFile wrapping @channel.
483  *
484  * Returns: a new #EmpathyTpFile
485  */
486 EmpathyTpFile *
487 empathy_tp_file_new (McAccount *account,
488                   TpChannel *channel)
489 {
490   return g_object_new (EMPATHY_TYPE_TP_FILE,
491       "account", account,
492       "channel", channel,
493       NULL);
494 }
495
496 /**
497  * empathy_tp_file_get_id:
498  * @tp_file: an #EmpathyTpFile
499  *
500  * Returns the ID of @tp_file.
501  *
502  * Returns: the ID
503  */
504 const gchar *
505 empathy_tp_file_get_id (EmpathyTpFile *tp_file)
506 {
507   EmpathyTpFilePriv *priv;
508
509   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
510
511   priv = GET_PRIV (tp_file);
512
513   return priv->id;
514 }
515
516 /**
517  * empathy_tp_file_get_channel
518  * @tp_file: an #EmpathyTpFile
519  *
520  * Returns the Telepathy file transfer channel
521  *
522  * Returns: the #TpChannel
523  */
524 TpChannel *
525 empathy_tp_file_get_channel (EmpathyTpFile *tp_file)
526 {
527   EmpathyTpFilePriv *priv;
528
529   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
530
531   priv = GET_PRIV (tp_file);
532
533   return priv->channel;
534 }
535
536 static void
537 tp_file_destroy_cb (TpChannel *file_channel,
538                     EmpathyTpFile *tp_file)
539 {
540   EmpathyTpFilePriv *priv;
541
542   priv = GET_PRIV (tp_file);
543
544   DEBUG ("Channel Closed or CM crashed");
545
546   g_object_unref (priv->channel);
547   priv->channel = NULL;
548 }
549
550 static void
551 tp_file_closed_cb (TpChannel *file_channel,
552                    EmpathyTpFile *tp_file,
553                    GObject *weak_object)
554 {
555   EmpathyTpFilePriv *priv;
556
557   priv = GET_PRIV (tp_file);
558
559   /* The channel is closed, do just like if the proxy was destroyed */
560   g_signal_handlers_disconnect_by_func (priv->channel,
561       tp_file_destroy_cb,
562       tp_file);
563   tp_file_destroy_cb (file_channel, tp_file);
564 }
565
566 static gint64
567 get_time_msec (void)
568 {
569   GTimeVal tv;
570
571   g_get_current_time (&tv);
572   return ((gint64) tv.tv_sec) * 1000 + tv.tv_usec / 1000;
573 }
574
575 static gint
576 _get_local_socket (EmpathyTpFile *tp_file)
577 {
578   gint fd;
579   size_t path_len;
580   struct sockaddr_un addr;
581   EmpathyTpFilePriv *priv;
582   GValue *socket_path;
583
584   priv = GET_PRIV (tp_file);
585
586   /* TODO: This could probably be a little nicer. */
587   tp_cli_dbus_properties_run_get (priv->channel,
588       -1,
589       EMP_IFACE_CHANNEL_TYPE_FILE,
590       "SocketPath",
591       &socket_path,
592       NULL,
593       NULL);
594
595   if (priv->unix_socket_path)
596     g_free (priv->unix_socket_path);
597
598   priv->unix_socket_path = g_value_dup_string (socket_path);
599   g_value_unset (socket_path);
600
601   if (G_STR_EMPTY (priv->unix_socket_path))
602     return -1;
603
604   fd = socket (PF_UNIX, SOCK_STREAM, 0);
605   if (fd < 0)
606     return -1;
607
608   memset (&addr, 0, sizeof (addr));
609   addr.sun_family = AF_UNIX;
610   path_len = strlen (priv->unix_socket_path);
611   strncpy (addr.sun_path, priv->unix_socket_path, path_len);
612
613   if (connect (fd, (struct sockaddr*) &addr,
614       sizeof (addr)) < 0)
615     {
616       close (fd);
617       return -1;
618     }
619
620   return fd;
621 }
622
623 /**
624  * empathy_tp_file_accept:
625  * @tp_file: an #EmpathyTpFile
626  *
627  * Accepts a file transfer that's in the "local pending" state (i.e.
628  * EMP_FILE_TRANSFER_STATE_LOCAL_PENDING).
629  */
630 void
631 empathy_tp_file_accept (EmpathyTpFile *tp_file)
632 {
633   EmpathyTpFilePriv *priv;
634   GValue *address;
635   GValue nothing = { 0 };
636   GError *error = NULL;
637
638   g_return_if_fail (EMPATHY_IS_TP_FILE (tp_file));
639
640   priv = GET_PRIV (tp_file);
641
642   g_return_if_fail (priv->out_stream != NULL);
643
644   DEBUG ("Accepting file: filename=%s", priv->filename);
645
646   g_value_init (&nothing, G_TYPE_STRING);
647   g_value_set_string (&nothing, "");
648
649   if (!emp_cli_channel_type_file_run_accept_file (TP_PROXY (priv->channel),
650       -1, TP_SOCKET_ADDRESS_TYPE_UNIX, TP_SOCKET_ACCESS_CONTROL_LOCALHOST,
651       &nothing, &address, &error, NULL))
652     {
653       DEBUG ("Accept error: %s",
654           error ? error->message : "No message given");
655       g_clear_error (&error);
656     }
657
658   if (priv->unix_socket_path)
659     g_free (priv->unix_socket_path);
660
661   priv->unix_socket_path = g_value_dup_string (address);
662   g_value_unset (address);
663
664   DEBUG ("Got unix socket path: %s", priv->unix_socket_path);
665 }
666
667 static void
668 receive_tp_file (EmpathyTpFile *tp_file)
669 {
670   EmpathyTpFilePriv *priv;
671   GInputStream *socket_stream;
672   gint socket_fd;
673
674   priv = GET_PRIV (tp_file);
675
676   socket_fd = _get_local_socket (tp_file);
677
678   if (socket_fd < 0)
679     return;
680
681   socket_stream = g_unix_input_stream_new (socket_fd, TRUE);
682
683   priv->cancellable = g_cancellable_new ();
684
685   copy_stream (socket_stream, priv->out_stream, priv->cancellable);
686
687   g_object_unref (socket_stream);
688 }
689
690
691 static void
692 send_tp_file (EmpathyTpFile *tp_file)
693 {
694   gint socket_fd;
695   GOutputStream *socket_stream;
696   EmpathyTpFilePriv *priv;
697
698   priv = GET_PRIV (tp_file);
699
700   DEBUG ("Sending file content: filename=%s",
701            priv->filename);
702
703   g_return_if_fail (priv->in_stream);
704
705   socket_fd = _get_local_socket (tp_file);
706   if (socket_fd < 0)
707     {
708       DEBUG ("failed to get local socket fd");
709       return;
710     }
711   DEBUG ("got local socket fd");
712   socket_stream = g_unix_output_stream_new (socket_fd, TRUE);
713
714   priv->cancellable = g_cancellable_new ();
715
716   copy_stream (priv->in_stream, socket_stream, priv->cancellable);
717
718   g_object_unref (socket_stream);
719 }
720
721 static void
722 tp_file_state_changed_cb (DBusGProxy *tp_file_iface,
723                           EmpFileTransferState state,
724                           EmpFileTransferStateChangeReason reason,
725                           EmpathyTpFile *tp_file)
726 {
727   EmpathyTpFilePriv *priv;
728
729   priv = GET_PRIV (tp_file);
730
731   DEBUG ("File transfer state changed: filename=%s, "
732       "old state=%u, state=%u, reason=%u",
733       priv->filename, priv->state, state, reason);
734
735   if (state == EMP_FILE_TRANSFER_STATE_OPEN)
736     priv->start_time = get_time_msec ();
737
738   DEBUG ("state = %u, incoming = %s, in_stream = %s, out_stream = %s",
739       state, priv->incoming ? "yes" : "no",
740       priv->in_stream ? "present" : "not present",
741       priv->out_stream ? "present" : "not present");
742
743   if (state == EMP_FILE_TRANSFER_STATE_OPEN && !priv->incoming &&
744       priv->in_stream)
745     send_tp_file (tp_file);
746   else if (state == EMP_FILE_TRANSFER_STATE_OPEN && priv->incoming &&
747       priv->out_stream)
748       receive_tp_file (tp_file);
749
750   priv->state = state;
751   priv->state_change_reason = reason;
752
753   g_object_notify (G_OBJECT (tp_file), "state");
754 }
755
756 static void
757 tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
758                                       guint64 count,
759                                       EmpathyTpFile *tp_file,
760                                       GObject *weak_object)
761 {
762   EmpathyTpFilePriv *priv;
763
764   priv = GET_PRIV (tp_file);
765
766   if (priv->transferred_bytes == count)
767     return;
768
769   priv->transferred_bytes = count;
770
771   g_object_notify (G_OBJECT (tp_file), "transferred-bytes");
772 }
773
774 EmpathyContact *
775 empathy_tp_file_get_contact (EmpathyTpFile *tp_file)
776 {
777   EmpathyTpFilePriv *priv;
778
779   priv = GET_PRIV (tp_file);
780
781   return priv->contact;
782 }
783
784 GInputStream *
785 empathy_tp_file_get_input_stream (EmpathyTpFile *tp_file)
786 {
787   EmpathyTpFilePriv *priv;
788
789   priv = GET_PRIV (tp_file);
790
791   return priv->in_stream;
792 }
793
794 GOutputStream *
795 empathy_tp_file_get_output_stream (EmpathyTpFile *tp_file)
796 {
797   EmpathyTpFilePriv *priv;
798
799   priv = GET_PRIV (tp_file);
800
801   return priv->out_stream;
802 }
803
804 const gchar *
805 empathy_tp_file_get_filename (EmpathyTpFile *tp_file)
806 {
807   EmpathyTpFilePriv *priv;
808
809   priv = GET_PRIV (tp_file);
810
811   return priv->filename;
812 }
813
814 gboolean
815 empathy_tp_file_get_incoming (EmpathyTpFile *tp_file)
816 {
817   EmpathyTpFilePriv *priv;
818
819   priv = GET_PRIV (tp_file);
820
821   return priv->incoming;
822 }
823
824 EmpFileTransferState
825 empathy_tp_file_get_state (EmpathyTpFile *tp_file)
826 {
827   EmpathyTpFilePriv *priv;
828
829   priv = GET_PRIV (tp_file);
830
831   return priv->state;
832 }
833
834 EmpFileTransferStateChangeReason
835 empathy_tp_file_get_state_change_reason (EmpathyTpFile *tp_file)
836 {
837   EmpathyTpFilePriv *priv;
838
839   priv = GET_PRIV (tp_file);
840
841   g_return_val_if_fail (priv->state_change_reason >= 0,
842       EMP_FILE_TRANSFER_STATE_CHANGE_REASON_NONE);
843
844   return priv->state_change_reason;
845 }
846
847 guint64
848 empathy_tp_file_get_size (EmpathyTpFile *tp_file)
849 {
850   EmpathyTpFilePriv *priv;
851
852   priv = GET_PRIV (tp_file);
853
854   return priv->size;
855 }
856
857 guint64
858 empathy_tp_file_get_transferred_bytes (EmpathyTpFile *tp_file)
859 {
860   EmpathyTpFilePriv *priv;
861
862   priv = GET_PRIV (tp_file);
863
864   return priv->transferred_bytes;
865 }
866
867 gint
868 empathy_tp_file_get_remaining_time (EmpathyTpFile *tp_file)
869 {
870   EmpathyTpFilePriv *priv;
871   gint64 curr_time, elapsed_time;
872   gdouble time_per_byte;
873   gdouble remaining_time;
874
875   priv = GET_PRIV (tp_file);
876
877   if (priv->size == EMPATHY_TP_FILE_UNKNOWN_SIZE)
878     return -1;
879
880   if (priv->transferred_bytes == priv->size)
881     return 0;
882
883   curr_time = get_time_msec ();
884   elapsed_time = curr_time - priv->start_time;
885   time_per_byte = (gdouble) elapsed_time / (gdouble) priv->transferred_bytes;
886   remaining_time = (time_per_byte * (priv->size - priv->transferred_bytes)) / 1000;
887
888   return (gint) (remaining_time + 0.5);
889 }
890
891 void
892 empathy_tp_file_cancel (EmpathyTpFile *tp_file)
893 {
894   EmpathyTpFilePriv *priv;
895
896   priv = GET_PRIV (tp_file);
897
898   tp_cli_channel_run_close (priv->channel, -1, NULL, NULL);
899
900   g_cancellable_cancel (priv->cancellable);
901 }
902
903 void
904 empathy_tp_file_set_input_stream (EmpathyTpFile *tp_file,
905                                   GInputStream *in_stream)
906 {
907   EmpathyTpFilePriv *priv;
908
909   priv = GET_PRIV (tp_file);
910
911   if (priv->in_stream == in_stream)
912     return;
913
914   if (priv->incoming)
915     g_warning ("Setting an input stream for incoming file "
916          "transfers is useless");
917
918   if (priv->in_stream)
919     g_object_unref (priv->in_stream);
920
921   if (in_stream)
922     g_object_ref (in_stream);
923
924   priv->in_stream = in_stream;
925
926   g_object_notify (G_OBJECT (tp_file), "in-stream");
927 }
928
929 void
930 empathy_tp_file_set_output_stream (EmpathyTpFile *tp_file,
931                                    GOutputStream *out_stream)
932 {
933   EmpathyTpFilePriv *priv;
934
935   priv = GET_PRIV (tp_file);
936
937   if (priv->out_stream == out_stream)
938     return;
939
940   if (!priv->incoming)
941     g_warning ("Setting an output stream for outgoing file "
942          "transfers is useless");
943
944   if (priv->out_stream)
945     g_object_unref (priv->out_stream);
946
947   if (out_stream)
948     g_object_ref (out_stream);
949
950   priv->out_stream = out_stream;
951 }
952
953 void
954 empathy_tp_file_set_filename (EmpathyTpFile *tp_file,
955                               const gchar *filename)
956 {
957   EmpathyTpFilePriv *priv;
958
959   priv = GET_PRIV (tp_file);
960
961   g_return_if_fail (filename != NULL);
962
963   if (priv->filename && strcmp (filename, priv->filename) == 0)
964     return;
965
966   g_free (priv->filename);
967   priv->filename = g_strdup (filename);
968
969   g_object_notify (G_OBJECT (tp_file), "filename");
970 }
971
972 /* Functions to copy the content of a GInputStream to a GOutputStream */
973
974 #define N_BUFFERS 2
975 #define BUFFER_SIZE 4096
976
977 typedef struct {
978   GInputStream *in;
979   GOutputStream *out;
980   GCancellable  *cancellable;
981   char *buff[N_BUFFERS]; /* the temporary buffers */
982   gsize count[N_BUFFERS]; /* how many bytes are used in the buffers */
983   gboolean is_full[N_BUFFERS]; /* whether the buffers contain data */
984   gint curr_read; /* index of the buffer used for reading */
985   gint curr_write; /* index of the buffer used for writing */
986   gboolean is_reading; /* we are reading */
987   gboolean is_writing; /* we are writing */
988   guint n_closed; /* number of streams that have been closed */
989 } CopyData;
990
991 static void schedule_next (CopyData *copy);
992
993 static void
994 free_copy_data_if_closed (CopyData *copy)
995 {
996   gint i;
997
998   /* Free the data only if both the input and output streams have
999    * been closed. */
1000   copy->n_closed++;
1001   if (copy->n_closed < 2)
1002     return;
1003
1004   if (copy->in != NULL)
1005     g_object_unref (copy->in);
1006
1007   if (copy->out != NULL)
1008     g_object_unref (copy->out);
1009
1010   for (i = 0; i < N_BUFFERS; i++)
1011     g_free (copy->buff[i]);
1012
1013   g_object_unref (copy->cancellable);
1014   g_free (copy);
1015 }
1016
1017 static void
1018 io_error (CopyData *copy,
1019           GError *error)
1020 {
1021   g_cancellable_cancel (copy->cancellable);
1022
1023   if (error == NULL)
1024     g_warning ("I/O error");
1025   else if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_CANCELLED)
1026     ; /* Ignore cancellations */
1027   else
1028     g_warning ("I/O error: %d: %s\n", error->code, error->message);
1029
1030   if (copy->in != NULL)
1031     g_input_stream_close (copy->in, NULL, NULL);
1032
1033   if (copy->out != NULL)
1034     g_output_stream_close (copy->out, NULL, NULL);
1035
1036   free_copy_data_if_closed (copy);
1037 }
1038
1039 static void
1040 close_done (GObject *source_object,
1041             GAsyncResult *res,
1042             gpointer user_data)
1043 {
1044   CopyData *copy = user_data;
1045
1046   g_object_unref (source_object);
1047   free_copy_data_if_closed (copy);
1048 }
1049
1050 static void
1051 write_done_cb (GObject *source_object,
1052                GAsyncResult *res,
1053                gpointer user_data)
1054 {
1055   CopyData *copy = user_data;
1056   gssize count_write;
1057   GError *error = NULL;
1058
1059   count_write = g_output_stream_write_finish (copy->out, res, &error);
1060
1061   if (count_write <= 0)
1062     {
1063       io_error (copy, error);
1064       g_error_free (error);
1065       return;
1066     }
1067
1068   copy->is_full[copy->curr_write] = FALSE;
1069   copy->curr_write = (copy->curr_write + 1) % N_BUFFERS;
1070   copy->is_writing = FALSE;
1071
1072   schedule_next (copy);
1073 }
1074
1075 static void
1076 read_done_cb (GObject *source_object,
1077               GAsyncResult *res,
1078               gpointer user_data)
1079 {
1080   CopyData *copy = user_data;
1081   gssize count_read;
1082   GError *error = NULL;
1083
1084   count_read = g_input_stream_read_finish (copy->in, res, &error);
1085
1086   if (count_read == 0)
1087     {
1088       g_input_stream_close_async (copy->in, 0, copy->cancellable,
1089           close_done, copy);
1090       copy->in = NULL;
1091     }
1092   else if (count_read < 0)
1093     {
1094       io_error (copy, error);
1095       g_error_free (error);
1096       return;
1097     }
1098
1099   copy->count[copy->curr_read] = count_read;
1100   copy->is_full[copy->curr_read] = TRUE;
1101   copy->curr_read = (copy->curr_read + 1) % N_BUFFERS;
1102   copy->is_reading = FALSE;
1103
1104   schedule_next (copy);
1105 }
1106
1107 static void
1108 schedule_next (CopyData *copy)
1109 {
1110   if (copy->in != NULL &&
1111       !copy->is_reading &&
1112       !copy->is_full[copy->curr_read])
1113     {
1114       /* We are not reading and the current buffer is empty, so
1115        * start an async read. */
1116       copy->is_reading = TRUE;
1117       g_input_stream_read_async (copy->in,
1118           copy->buff[copy->curr_read],
1119           BUFFER_SIZE, 0, copy->cancellable,
1120           read_done_cb, copy);
1121     }
1122
1123   if (!copy->is_writing &&
1124       copy->is_full[copy->curr_write])
1125     {
1126       if (copy->count[copy->curr_write] == 0)
1127         {
1128           /* The last read on the buffer read 0 bytes, this
1129            * means that we got an EOF, so we can close
1130            * the output channel. */
1131           g_output_stream_close_async (copy->out, 0,
1132               copy->cancellable,
1133               close_done, copy);
1134       copy->out = NULL;
1135         }
1136       else
1137         {
1138           /* We are not writing and the current buffer contains
1139            * data, so start an async write. */
1140           copy->is_writing = TRUE;
1141           g_output_stream_write_async (copy->out,
1142               copy->buff[copy->curr_write],
1143               copy->count[copy->curr_write],
1144               0, copy->cancellable,
1145               write_done_cb, copy);
1146         }
1147     }
1148 }
1149
1150 static void
1151 copy_stream (GInputStream *in,
1152              GOutputStream *out,
1153              GCancellable *cancellable)
1154 {
1155   CopyData *copy;
1156   gint i;
1157
1158   g_return_if_fail (in != NULL);
1159   g_return_if_fail (out != NULL);
1160
1161   copy = g_new0 (CopyData, 1);
1162   copy->in = g_object_ref (in);
1163   copy->out = g_object_ref (out);
1164
1165   if (cancellable != NULL)
1166     copy->cancellable = g_object_ref (cancellable);
1167   else
1168     copy->cancellable = g_cancellable_new ();
1169
1170   for (i = 0; i < N_BUFFERS; i++)
1171     copy->buff[i] = g_malloc (BUFFER_SIZE);
1172
1173   schedule_next (copy);
1174 }