]> git.0d.be Git - empathy.git/blob - libempathy/empathy-tp-file.c
Changed _run_ method calls to _call_ calls. (Jonny Lamb)
[empathy.git] / libempathy / empathy-tp-file.c
1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * Copyright (C) 2007-2008 Collabora Ltd.
4  * Copyright (C) 2007 Marco Barisione <marco@barisione.org>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Lesser General Public
8  * License as published by the Free Software Foundation; either
9  * version 2.1 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public
17  * License along with this library; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
19  *
20  * Authors: Marco Barisione <marco@barisione.org>
21  *          Jonny Lamb <jonny.lamb@collabora.co.uk>
22  */
23
24 #include <config.h>
25
26 #include <string.h>
27 #include <unistd.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <sys/un.h>
31
32 #include <glib/gi18n.h>
33
34 #include <gio/gio.h>
35 #include <gio/gunixinputstream.h>
36 #include <gio/gunixoutputstream.h>
37
38 #include <libtelepathy/tp-conn.h>
39 #include <libtelepathy/tp-helpers.h>
40 #include <libtelepathy/tp-props-iface.h>
41
42 #include <telepathy-glib/proxy-subclass.h>
43
44 #include "empathy-tp-file.h"
45 #include "empathy-contact-factory.h"
46 #include "empathy-marshal.h"
47 #include "empathy-time.h"
48 #include "empathy-utils.h"
49
50 #define DEBUG_FLAG EMPATHY_DEBUG_FT
51 #include "empathy-debug.h"
52
53 /**
54  * SECTION:empathy-tp-file
55  * @short_description: File channel
56  * @see_also: #EmpathyTpFile, #EmpathyContact, empathy_send_file()
57  * @include: libempathy/empathy-tp-file.h
58  *
59  * The #EmpathyTpFile object represents a Telepathy file channel.
60  */
61
62 /**
63  * EMPATHY_TP_FILE_UNKNOWN_SIZE:
64  *
65  * Value used for the "size" or "estimated-size" properties when the size of
66  * the transferred file is unknown.
67  */
68
69 static void empathy_tp_file_class_init (EmpathyTpFileClass *klass);
70 static void empathy_tp_file_init (EmpathyTpFile *tp_file);
71 static void tp_file_finalize (GObject *object);
72 static GObject *tp_file_constructor (GType type, guint n_props,
73     GObjectConstructParam *props);
74 static void tp_file_get_property (GObject *object, guint param_id,
75     GValue *value, GParamSpec *pspec);
76 static void tp_file_set_property (GObject *object, guint param_id,
77     const GValue *value, GParamSpec *pspec);
78 static void tp_file_destroy_cb (TpChannel *file_chan, EmpathyTpFile *tp_file);
79 static void tp_file_closed_cb (TpChannel *file_chan, EmpathyTpFile *tp_file,
80     GObject *weak_object);
81 static void tp_file_state_changed_cb (DBusGProxy *file_iface, guint state,
82     guint reason, EmpathyTpFile *tp_file);
83 static void tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
84     guint64 count, EmpathyTpFile *tp_file, GObject *weak_object);
85 static void copy_stream (GInputStream *in, GOutputStream *out,
86     GCancellable *cancellable);
87
88 /* EmpathyTpFile object */
89
90 #define GET_PRIV(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), \
91            EMPATHY_TYPE_TP_FILE, EmpathyTpFilePriv))
92
93 typedef struct _EmpathyTpFilePriv  EmpathyTpFilePriv;
94
95 struct _EmpathyTpFilePriv {
96   EmpathyContactFactory *factory;
97   McAccount *account;
98   gchar *id;
99   MissionControl *mc;
100   TpChannel *channel;
101
102   EmpathyTpFile *cached_empathy_file;
103   EmpathyContact *contact;
104   GInputStream *in_stream;
105   GOutputStream *out_stream;
106   gboolean incoming;
107   gchar *filename;
108   EmpFileTransferState state;
109   EmpFileTransferStateChangeReason state_change_reason;
110   guint64 size;
111   guint64 transferred_bytes;
112   gint64 start_time;
113   gchar *unix_socket_path;
114   gchar *content_hash;
115   EmpFileHashType content_hash_type;
116   gchar *content_type;
117   gchar *description;
118   GCancellable *cancellable;
119 };
120
/* GObject property identifiers; PROP_0 is the reserved "no property" slot. */
enum
{
  PROP_0,
  PROP_ACCOUNT,
  PROP_CHANNEL,
  PROP_STATE,
  PROP_INCOMING,
  PROP_FILENAME,
  PROP_SIZE,
  PROP_CONTENT_TYPE,
  PROP_TRANSFERRED_BYTES,
  PROP_CONTENT_HASH_TYPE,
  PROP_CONTENT_HASH,
  PROP_IN_STREAM,
};

/* Registers the EmpathyTpFile type as a direct GObject subclass. */
G_DEFINE_TYPE (EmpathyTpFile, empathy_tp_file, G_TYPE_OBJECT);
137
138 static void
139 empathy_tp_file_class_init (EmpathyTpFileClass *klass)
140 {
141   GObjectClass *object_class = G_OBJECT_CLASS (klass);
142
143   object_class->finalize = tp_file_finalize;
144   object_class->constructor = tp_file_constructor;
145   object_class->get_property = tp_file_get_property;
146   object_class->set_property = tp_file_set_property;
147
148   /* Construct-only properties */
149   g_object_class_install_property (object_class,
150       PROP_ACCOUNT,
151       g_param_spec_object ("account",
152           "channel Account",
153           "The account associated with the channel",
154           MC_TYPE_ACCOUNT,
155           G_PARAM_READWRITE |
156           G_PARAM_CONSTRUCT_ONLY));
157
158   g_object_class_install_property (object_class,
159       PROP_CHANNEL,
160       g_param_spec_object ("channel",
161           "telepathy channel",
162           "The file transfer channel",
163           TP_TYPE_CHANNEL,
164           G_PARAM_READWRITE |
165           G_PARAM_CONSTRUCT_ONLY));
166
167   g_object_class_install_property (object_class,
168       PROP_STATE,
169       g_param_spec_uint ("state",
170           "state of the transfer",
171           "The file transfer state",
172           0,
173           G_MAXUINT,
174           G_MAXUINT,
175           G_PARAM_READWRITE |
176           G_PARAM_CONSTRUCT));
177
178   g_object_class_install_property (object_class,
179       PROP_INCOMING,
180       g_param_spec_boolean ("incoming",
181           "incoming",
182           "Whether the transfer is incoming",
183           FALSE,
184           G_PARAM_READWRITE |
185           G_PARAM_CONSTRUCT));
186
187   g_object_class_install_property (object_class,
188       PROP_FILENAME,
189       g_param_spec_string ("filename",
190           "name of the transfer",
191           "The file transfer filename",
192           "",
193           G_PARAM_READWRITE));
194
195   g_object_class_install_property (object_class,
196       PROP_SIZE,
197       g_param_spec_uint64 ("size",
198           "size of the file",
199           "The file transfer size",
200           0,
201           G_MAXUINT64,
202           G_MAXUINT64,
203           G_PARAM_READWRITE));
204
205   g_object_class_install_property (object_class,
206       PROP_CONTENT_TYPE,
207       g_param_spec_string ("content-type",
208           "file transfer content-type",
209           "The file transfer content-type",
210           "",
211           G_PARAM_READWRITE));
212
213   g_object_class_install_property (object_class,
214       PROP_CONTENT_HASH_TYPE,
215       g_param_spec_uint ("content-hash-type",
216           "file transfer hash type",
217           "The type of the file transfer hash",
218           0,
219           G_MAXUINT,
220           0,
221           G_PARAM_READWRITE));
222
223   g_object_class_install_property (object_class,
224       PROP_CONTENT_HASH,
225       g_param_spec_string ("content-hash",
226           "file transfer hash",
227           "The hash of the transfer's contents",
228           "",
229           G_PARAM_READWRITE));
230
231   g_object_class_install_property (object_class,
232       PROP_TRANSFERRED_BYTES,
233       g_param_spec_uint64 ("transferred-bytes",
234           "bytes transferred",
235           "The number of bytes transferred",
236           0,
237           G_MAXUINT64,
238           0,
239           G_PARAM_READWRITE));
240
241   g_object_class_install_property (object_class,
242       PROP_IN_STREAM,
243       g_param_spec_object ("in-stream",
244           "transfer input stream",
245           "The input stream for file transfer",
246           G_TYPE_INPUT_STREAM,
247           G_PARAM_READWRITE));
248
249   g_type_class_add_private (object_class, sizeof (EmpathyTpFilePriv));
250 }
251
252 static void
253 empathy_tp_file_init (EmpathyTpFile *tp_file)
254 {
255 }
256
257 static void
258 tp_file_finalize (GObject *object)
259 {
260   EmpathyTpFilePriv *priv;
261   EmpathyTpFile *tp_file;
262
263   tp_file = EMPATHY_TP_FILE (object);
264   priv = GET_PRIV (tp_file);
265
266   if (priv->channel)
267     {
268       DEBUG ("Closing channel..");
269       g_signal_handlers_disconnect_by_func (priv->channel,
270           tp_file_destroy_cb, object);
271       tp_cli_channel_call_close (priv->channel, -1, NULL, NULL, NULL, NULL);
272       if (G_IS_OBJECT (priv->channel))
273         g_object_unref (priv->channel);
274     }
275
276   if (priv->factory)
277     {
278       g_object_unref (priv->factory);
279     }
280   if (priv->account)
281     {
282       g_object_unref (priv->account);
283     }
284   if (priv->mc)
285     {
286       g_object_unref (priv->mc);
287     }
288
289   g_free (priv->id);
290   g_free (priv->filename);
291   g_free (priv->unix_socket_path);
292   g_free (priv->description);
293   g_free (priv->content_hash);
294   g_free (priv->content_type);
295
296   if (priv->in_stream)
297     g_object_unref (priv->in_stream);
298
299   if (priv->out_stream)
300     g_object_unref (priv->out_stream);
301
302   if (priv->contact)
303     g_object_unref (priv->contact);
304
305   if (priv->cancellable)
306     g_object_unref (priv->cancellable);
307
308   G_OBJECT_CLASS (empathy_tp_file_parent_class)->finalize (object);
309 }
310
311 static void
312 tp_file_get_all_cb (TpProxy *proxy,
313                     GHashTable *properties,
314                     const GError *error,
315                     gpointer user_data,
316                     GObject *weak_object)
317 {
318   EmpathyTpFilePriv *priv = (EmpathyTpFilePriv *) user_data;
319
320   if (error)
321     {
322       DEBUG ("Failed to get properties: %s", error->message);
323       return;
324     }
325
326   priv->size = g_value_get_uint64 (
327       g_hash_table_lookup (properties, "Size"));
328
329   priv->state = g_value_get_uint (
330       g_hash_table_lookup (properties, "State"));
331
332   /* Invalid reason, so empathy_file_get_state_change_reason() can give
333    * a warning if called for a not closed file transfer. */
334   priv->state_change_reason = -1;
335
336   priv->transferred_bytes = g_value_get_uint64 (
337       g_hash_table_lookup (properties, "TransferredBytes"));
338
339   priv->filename = g_value_dup_string (
340       g_hash_table_lookup (properties, "Filename"));
341
342   priv->content_hash = g_value_dup_string (
343       g_hash_table_lookup (properties, "ContentHash"));
344
345   priv->description = g_value_dup_string (
346       g_hash_table_lookup (properties, "Description"));
347
348   if (priv->state == EMP_FILE_TRANSFER_STATE_LOCAL_PENDING)
349     priv->incoming = TRUE;
350
351   g_hash_table_destroy (properties);
352 }
353
354 static GObject *
355 tp_file_constructor (GType type,
356                      guint n_props,
357                      GObjectConstructParam *props)
358 {
359   GObject *tp_file;
360   EmpathyTpFilePriv *priv;
361   TpHandle handle;
362
363   tp_file = G_OBJECT_CLASS (empathy_tp_file_parent_class)->constructor (type,
364       n_props, props);
365
366   priv = GET_PRIV (tp_file);
367
368   priv->factory = empathy_contact_factory_new ();
369   priv->mc = empathy_mission_control_new ();
370
371   tp_cli_channel_connect_to_closed (priv->channel,
372       (tp_cli_channel_signal_callback_closed) tp_file_closed_cb,
373       tp_file,
374       NULL, NULL, NULL);
375
376   emp_cli_channel_type_file_connect_to_file_transfer_state_changed (
377       TP_PROXY (priv->channel),
378       (emp_cli_channel_type_file_signal_callback_file_transfer_state_changed)
379           tp_file_state_changed_cb,
380       tp_file,
381       NULL, NULL, NULL);
382
383   emp_cli_channel_type_file_connect_to_transferred_bytes_changed (
384       TP_PROXY (priv->channel),
385       (emp_cli_channel_type_file_signal_callback_transferred_bytes_changed)
386           tp_file_transferred_bytes_changed_cb,
387       tp_file,
388       NULL, NULL, NULL);
389
390
391   handle = tp_channel_get_handle (priv->channel, NULL);
392   priv->contact = empathy_contact_factory_get_from_handle (priv->factory,
393       priv->account,
394       (guint) handle);
395
396   tp_cli_dbus_properties_call_get_all (priv->channel,
397       -1, EMP_IFACE_CHANNEL_TYPE_FILE, tp_file_get_all_cb, priv, NULL, NULL);
398
399   return tp_file;
400 }
401
402 static void
403 tp_file_get_property (GObject *object,
404                       guint param_id,
405                       GValue *value,
406                       GParamSpec *pspec)
407 {
408   EmpathyTpFilePriv *priv;
409   EmpathyTpFile *tp_file;
410
411   priv = GET_PRIV (object);
412   tp_file = EMPATHY_TP_FILE (object);
413
414   switch (param_id)
415     {
416       case PROP_ACCOUNT:
417         g_value_set_object (value, priv->account);
418         break;
419       case PROP_CHANNEL:
420         g_value_set_object (value, priv->channel);
421         break;
422       default:
423         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
424         break;
425     };
426 }
427
428 static void
429 tp_file_channel_set_dbus_property (gpointer proxy,
430                                    const gchar *property,
431                                    const GValue *value)
432 {
433         DEBUG ("Setting %s property", property);
434         tp_cli_dbus_properties_call_set (TP_PROXY (proxy), -1,
435             EMP_IFACE_CHANNEL_TYPE_FILE, property, value,
436             NULL, NULL, NULL, NULL);
437 }
438
439
440 static void
441 tp_file_set_property (GObject *object,
442                       guint param_id,
443                       const GValue *value,
444                       GParamSpec *pspec)
445 {
446   EmpathyTpFilePriv *priv;
447
448   priv = GET_PRIV (object);
449
450   switch (param_id)
451     {
452       case PROP_ACCOUNT:
453         priv->account = g_object_ref (g_value_get_object (value));
454         break;
455       case PROP_CHANNEL:
456         priv->channel = g_object_ref (g_value_get_object (value));
457         break;
458       case PROP_STATE:
459         priv->state = g_value_get_uint (value);
460         break;
461       case PROP_INCOMING:
462         priv->incoming = g_value_get_boolean (value);
463         break;
464       case PROP_FILENAME:
465         g_free (priv->filename);
466         priv->filename = g_value_dup_string (value);
467         tp_file_channel_set_dbus_property (priv->channel, "Filename", value);
468         break;
469       case PROP_SIZE:
470         priv->size = g_value_get_uint64 (value);
471         tp_file_channel_set_dbus_property (priv->channel, "Size", value);
472         break;
473       case PROP_CONTENT_TYPE:
474         tp_file_channel_set_dbus_property (priv->channel, "ContentType", value);
475         g_free (priv->content_type);
476         priv->content_type = g_value_dup_string (value);
477         break;
478       case PROP_CONTENT_HASH:
479         tp_file_channel_set_dbus_property (priv->channel, "ContentHash", value);
480         g_free (priv->content_hash);
481         priv->content_hash = g_value_dup_string (value);
482         break;
483       case PROP_IN_STREAM:
484         if (priv->in_stream)
485           g_object_unref (priv->in_stream);
486         priv->in_stream = g_object_ref (g_value_get_object (value));
487         break;
488       default:
489         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
490         break;
491     };
492 }
493
494 /**
495  * empathy_tp_file_new:
496  * @account: the #McAccount for the channel
497  * @channel: a Telepathy channel
498  *
499  * Creates a new #EmpathyTpFile wrapping @channel.
500  *
501  * Returns: a new #EmpathyTpFile
502  */
503 EmpathyTpFile *
504 empathy_tp_file_new (McAccount *account,
505                   TpChannel *channel)
506 {
507   return g_object_new (EMPATHY_TYPE_TP_FILE,
508       "account", account,
509       "channel", channel,
510       NULL);
511 }
512
513 /**
514  * empathy_tp_file_get_id:
515  * @tp_file: an #EmpathyTpFile
516  *
517  * Returns the ID of @tp_file.
518  *
519  * Returns: the ID
520  */
521 const gchar *
522 empathy_tp_file_get_id (EmpathyTpFile *tp_file)
523 {
524   EmpathyTpFilePriv *priv;
525
526   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
527
528   priv = GET_PRIV (tp_file);
529
530   return priv->id;
531 }
532
533 /**
534  * empathy_tp_file_get_channel
535  * @tp_file: an #EmpathyTpFile
536  *
537  * Returns the Telepathy file transfer channel
538  *
539  * Returns: the #TpChannel
540  */
541 TpChannel *
542 empathy_tp_file_get_channel (EmpathyTpFile *tp_file)
543 {
544   EmpathyTpFilePriv *priv;
545
546   g_return_val_if_fail (EMPATHY_IS_TP_FILE (tp_file), NULL);
547
548   priv = GET_PRIV (tp_file);
549
550   return priv->channel;
551 }
552
553 static void
554 tp_file_destroy_cb (TpChannel *file_channel,
555                     EmpathyTpFile *tp_file)
556 {
557   EmpathyTpFilePriv *priv;
558
559   priv = GET_PRIV (tp_file);
560
561   DEBUG ("Channel Closed or CM crashed");
562
563   g_object_unref (priv->channel);
564   priv->channel = NULL;
565 }
566
567 static void
568 tp_file_closed_cb (TpChannel *file_channel,
569                    EmpathyTpFile *tp_file,
570                    GObject *weak_object)
571 {
572   EmpathyTpFilePriv *priv;
573
574   priv = GET_PRIV (tp_file);
575
576   /* The channel is closed, do just like if the proxy was destroyed */
577   g_signal_handlers_disconnect_by_func (priv->channel,
578       tp_file_destroy_cb,
579       tp_file);
580   tp_file_destroy_cb (file_channel, tp_file);
581 }
582
583 static gint64
584 get_time_msec (void)
585 {
586   GTimeVal tv;
587
588   g_get_current_time (&tv);
589   return ((gint64) tv.tv_sec) * 1000 + tv.tv_usec / 1000;
590 }
591
592 static gint
593 _get_local_socket (EmpathyTpFile *tp_file)
594 {
595   gint fd;
596   size_t path_len;
597   struct sockaddr_un addr;
598   EmpathyTpFilePriv *priv;
599
600   priv = GET_PRIV (tp_file);
601
602   if (G_STR_EMPTY (priv->unix_socket_path))
603     return -1;
604
605   fd = socket (PF_UNIX, SOCK_STREAM, 0);
606   if (fd < 0)
607     return -1;
608
609   memset (&addr, 0, sizeof (addr));
610   addr.sun_family = AF_UNIX;
611   path_len = strlen (priv->unix_socket_path);
612   strncpy (addr.sun_path, priv->unix_socket_path, path_len);
613
614   if (connect (fd, (struct sockaddr*) &addr,
615       sizeof (addr)) < 0)
616     {
617       close (fd);
618       return -1;
619     }
620
621   return fd;
622 }
623
624 static void
625 tp_file_method_cb (TpProxy *proxy,
626                    const GValue *address,
627                    const GError *error,
628                    gpointer user_data,
629                    GObject *weak_object)
630 {
631   EmpathyTpFilePriv *priv = (EmpathyTpFilePriv *) user_data;
632
633   if (error)
634     {
635       DEBUG ("Error: %s", error->message);
636       return;
637     }
638
639   if (priv->unix_socket_path)
640     g_free (priv->unix_socket_path);
641
642   priv->unix_socket_path = g_value_dup_string (address);
643
644   DEBUG ("Got unix socket path: %s", priv->unix_socket_path);
645 }
646
647
648 /**
649  * empathy_tp_file_accept:
650  * @tp_file: an #EmpathyTpFile
651  *
652  * Accepts a file transfer that's in the "local pending" state (i.e.
653  * EMP_FILE_TRANSFER_STATE_LOCAL_PENDING).
654  */
655 void
656 empathy_tp_file_accept (EmpathyTpFile *tp_file,
657                         guint64 offset)
658 {
659   EmpathyTpFilePriv *priv;
660   GValue nothing = { 0 };
661
662   g_return_if_fail (EMPATHY_IS_TP_FILE (tp_file));
663
664   priv = GET_PRIV (tp_file);
665
666   g_return_if_fail (priv->out_stream != NULL);
667
668   DEBUG ("Accepting file: filename=%s", priv->filename);
669
670   g_value_init (&nothing, G_TYPE_STRING);
671   g_value_set_string (&nothing, "");
672
673   emp_cli_channel_type_file_call_accept_file (TP_PROXY (priv->channel),
674       -1, TP_SOCKET_ADDRESS_TYPE_UNIX, TP_SOCKET_ACCESS_CONTROL_LOCALHOST,
675       &nothing, offset, tp_file_method_cb, priv, NULL, NULL);
676 }
677
678 /**
679  * empathy_tp_file_offer:
680  * @tp_file: an #EmpathyTpFile
681  *
682  * Offers a file transfer that's in the "not offered" state (i.e.
683  * EMP_FILE_TRANSFER_STATE_NOT_OFFERED).
684  */
685 void
686 empathy_tp_file_offer (EmpathyTpFile *tp_file)
687 {
688   EmpathyTpFilePriv *priv;
689   GValue nothing = { 0 };
690
691   g_return_if_fail (EMPATHY_IS_TP_FILE (tp_file));
692
693   priv = GET_PRIV (tp_file);
694
695   g_value_init (&nothing, G_TYPE_STRING);
696   g_value_set_string (&nothing, "");
697
698   emp_cli_channel_type_file_call_offer_file (TP_PROXY (priv->channel),
699       -1, TP_SOCKET_ADDRESS_TYPE_UNIX, TP_SOCKET_ACCESS_CONTROL_LOCALHOST,
700       &nothing, tp_file_method_cb, priv, NULL, NULL);
701 }
702
703 static void
704 receive_tp_file (EmpathyTpFile *tp_file)
705 {
706   EmpathyTpFilePriv *priv;
707   GInputStream *socket_stream;
708   gint socket_fd;
709
710   priv = GET_PRIV (tp_file);
711
712   socket_fd = _get_local_socket (tp_file);
713
714   if (socket_fd < 0)
715     return;
716
717   socket_stream = g_unix_input_stream_new (socket_fd, TRUE);
718
719   priv->cancellable = g_cancellable_new ();
720
721   copy_stream (socket_stream, priv->out_stream, priv->cancellable);
722
723   g_object_unref (socket_stream);
724 }
725
726 static void
727 send_tp_file (EmpathyTpFile *tp_file)
728 {
729   gint socket_fd;
730   GOutputStream *socket_stream;
731   EmpathyTpFilePriv *priv;
732
733   priv = GET_PRIV (tp_file);
734
735   DEBUG ("Sending file content: filename=%s",
736            priv->filename);
737
738   g_return_if_fail (priv->in_stream);
739
740   socket_fd = _get_local_socket (tp_file);
741   if (socket_fd < 0)
742     {
743       DEBUG ("failed to get local socket fd");
744       return;
745     }
746   DEBUG ("got local socket fd");
747   socket_stream = g_unix_output_stream_new (socket_fd, TRUE);
748
749   priv->cancellable = g_cancellable_new ();
750
751   copy_stream (priv->in_stream, socket_stream, priv->cancellable);
752
753   g_object_unref (socket_stream);
754 }
755
756 static void
757 tp_file_state_changed_cb (DBusGProxy *tp_file_iface,
758                           EmpFileTransferState state,
759                           EmpFileTransferStateChangeReason reason,
760                           EmpathyTpFile *tp_file)
761 {
762   EmpathyTpFilePriv *priv;
763
764   priv = GET_PRIV (tp_file);
765
766   DEBUG ("File transfer state changed: filename=%s, "
767       "old state=%u, state=%u, reason=%u",
768       priv->filename, priv->state, state, reason);
769
770   if (state == EMP_FILE_TRANSFER_STATE_OPEN)
771     priv->start_time = get_time_msec ();
772
773   DEBUG ("state = %u, incoming = %s, in_stream = %s, out_stream = %s",
774       state, priv->incoming ? "yes" : "no",
775       priv->in_stream ? "present" : "not present",
776       priv->out_stream ? "present" : "not present");
777
778   if (state == EMP_FILE_TRANSFER_STATE_OPEN && !priv->incoming &&
779       priv->in_stream)
780     send_tp_file (tp_file);
781   else if (state == EMP_FILE_TRANSFER_STATE_OPEN && priv->incoming &&
782       priv->out_stream)
783     receive_tp_file (tp_file);
784
785   priv->state = state;
786   priv->state_change_reason = reason;
787
788   g_object_notify (G_OBJECT (tp_file), "state");
789 }
790
791 static void
792 tp_file_transferred_bytes_changed_cb (TpProxy *proxy,
793                                       guint64 count,
794                                       EmpathyTpFile *tp_file,
795                                       GObject *weak_object)
796 {
797   EmpathyTpFilePriv *priv;
798
799   priv = GET_PRIV (tp_file);
800
801   if (priv->transferred_bytes == count)
802     return;
803
804   priv->transferred_bytes = count;
805
806   g_object_notify (G_OBJECT (tp_file), "transferred-bytes");
807 }
808
809 EmpathyContact *
810 empathy_tp_file_get_contact (EmpathyTpFile *tp_file)
811 {
812   EmpathyTpFilePriv *priv;
813
814   priv = GET_PRIV (tp_file);
815
816   return priv->contact;
817 }
818
819 GInputStream *
820 empathy_tp_file_get_input_stream (EmpathyTpFile *tp_file)
821 {
822   EmpathyTpFilePriv *priv;
823
824   priv = GET_PRIV (tp_file);
825
826   return priv->in_stream;
827 }
828
829 GOutputStream *
830 empathy_tp_file_get_output_stream (EmpathyTpFile *tp_file)
831 {
832   EmpathyTpFilePriv *priv;
833
834   priv = GET_PRIV (tp_file);
835
836   return priv->out_stream;
837 }
838
839 const gchar *
840 empathy_tp_file_get_filename (EmpathyTpFile *tp_file)
841 {
842   EmpathyTpFilePriv *priv;
843
844   priv = GET_PRIV (tp_file);
845
846   return priv->filename;
847 }
848
849 gboolean
850 empathy_tp_file_get_incoming (EmpathyTpFile *tp_file)
851 {
852   EmpathyTpFilePriv *priv;
853
854   priv = GET_PRIV (tp_file);
855
856   return priv->incoming;
857 }
858
859 EmpFileTransferState
860 empathy_tp_file_get_state (EmpathyTpFile *tp_file)
861 {
862   EmpathyTpFilePriv *priv;
863
864   priv = GET_PRIV (tp_file);
865
866   return priv->state;
867 }
868
869 EmpFileTransferStateChangeReason
870 empathy_tp_file_get_state_change_reason (EmpathyTpFile *tp_file)
871 {
872   EmpathyTpFilePriv *priv;
873
874   priv = GET_PRIV (tp_file);
875
876   g_return_val_if_fail (priv->state_change_reason >= 0,
877       EMP_FILE_TRANSFER_STATE_CHANGE_REASON_NONE);
878
879   return priv->state_change_reason;
880 }
881
882 guint64
883 empathy_tp_file_get_size (EmpathyTpFile *tp_file)
884 {
885   EmpathyTpFilePriv *priv;
886
887   priv = GET_PRIV (tp_file);
888
889   return priv->size;
890 }
891
892 guint64
893 empathy_tp_file_get_transferred_bytes (EmpathyTpFile *tp_file)
894 {
895   EmpathyTpFilePriv *priv;
896
897   priv = GET_PRIV (tp_file);
898
899   return priv->transferred_bytes;
900 }
901
902 gint
903 empathy_tp_file_get_remaining_time (EmpathyTpFile *tp_file)
904 {
905   EmpathyTpFilePriv *priv;
906   gint64 curr_time, elapsed_time;
907   gdouble time_per_byte;
908   gdouble remaining_time;
909
910   priv = GET_PRIV (tp_file);
911
912   if (priv->size == EMPATHY_TP_FILE_UNKNOWN_SIZE)
913     return -1;
914
915   if (priv->transferred_bytes == priv->size)
916     return 0;
917
918   curr_time = get_time_msec ();
919   elapsed_time = curr_time - priv->start_time;
920   time_per_byte = (gdouble) elapsed_time / (gdouble) priv->transferred_bytes;
921   remaining_time = (time_per_byte * (priv->size - priv->transferred_bytes)) / 1000;
922
923   return (gint) (remaining_time + 0.5);
924 }
925
926 void
927 empathy_tp_file_cancel (EmpathyTpFile *tp_file)
928 {
929   EmpathyTpFilePriv *priv;
930
931   priv = GET_PRIV (tp_file);
932
933   tp_cli_channel_call_close (priv->channel, -1, NULL, NULL, NULL, NULL);
934
935   g_cancellable_cancel (priv->cancellable);
936 }
937
938 void
939 empathy_tp_file_set_input_stream (EmpathyTpFile *tp_file,
940                                   GInputStream *in_stream)
941 {
942   EmpathyTpFilePriv *priv;
943
944   priv = GET_PRIV (tp_file);
945
946   if (priv->in_stream == in_stream)
947     return;
948
949   if (priv->incoming)
950     g_warning ("Setting an input stream for incoming file "
951          "transfers is useless");
952
953   if (priv->in_stream)
954     g_object_unref (priv->in_stream);
955
956   if (in_stream)
957     g_object_ref (in_stream);
958
959   priv->in_stream = in_stream;
960
961   g_object_notify (G_OBJECT (tp_file), "in-stream");
962 }
963
964 void
965 empathy_tp_file_set_output_stream (EmpathyTpFile *tp_file,
966                                    GOutputStream *out_stream)
967 {
968   EmpathyTpFilePriv *priv;
969
970   priv = GET_PRIV (tp_file);
971
972   if (priv->out_stream == out_stream)
973     return;
974
975   if (!priv->incoming)
976     g_warning ("Setting an output stream for outgoing file "
977          "transfers is useless");
978
979   if (priv->out_stream)
980     g_object_unref (priv->out_stream);
981
982   if (out_stream)
983     g_object_ref (out_stream);
984
985   priv->out_stream = out_stream;
986 }
987
988 void
989 empathy_tp_file_set_filename (EmpathyTpFile *tp_file,
990                               const gchar *filename)
991 {
992   EmpathyTpFilePriv *priv;
993
994   priv = GET_PRIV (tp_file);
995
996   g_return_if_fail (filename != NULL);
997
998   if (priv->filename && strcmp (filename, priv->filename) == 0)
999     return;
1000
1001   g_free (priv->filename);
1002   priv->filename = g_strdup (filename);
1003
1004   g_object_notify (G_OBJECT (tp_file), "filename");
1005 }
1006
1007 /* Functions to copy the content of a GInputStream to a GOutputStream */
1008
1009 #define N_BUFFERS 2
1010 #define BUFFER_SIZE 4096
1011
1012 typedef struct {
1013   GInputStream *in;
1014   GOutputStream *out;
1015   GCancellable  *cancellable;
1016   char *buff[N_BUFFERS]; /* the temporary buffers */
1017   gsize count[N_BUFFERS]; /* how many bytes are used in the buffers */
1018   gboolean is_full[N_BUFFERS]; /* whether the buffers contain data */
1019   gint curr_read; /* index of the buffer used for reading */
1020   gint curr_write; /* index of the buffer used for writing */
1021   gboolean is_reading; /* we are reading */
1022   gboolean is_writing; /* we are writing */
1023   guint n_closed; /* number of streams that have been closed */
1024 } CopyData;
1025
1026 static void schedule_next (CopyData *copy);
1027
1028 static void
1029 free_copy_data_if_closed (CopyData *copy)
1030 {
1031   gint i;
1032
1033   /* Free the data only if both the input and output streams have
1034    * been closed. */
1035   copy->n_closed++;
1036   if (copy->n_closed < 2)
1037     return;
1038
1039   if (copy->in != NULL)
1040     g_object_unref (copy->in);
1041
1042   if (copy->out != NULL)
1043     g_object_unref (copy->out);
1044
1045   for (i = 0; i < N_BUFFERS; i++)
1046     g_free (copy->buff[i]);
1047
1048   g_object_unref (copy->cancellable);
1049   g_free (copy);
1050 }
1051
1052 static void
1053 io_error (CopyData *copy,
1054           GError *error)
1055 {
1056   g_cancellable_cancel (copy->cancellable);
1057
1058   if (error == NULL)
1059     g_warning ("I/O error");
1060   else if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_CANCELLED)
1061     ; /* Ignore cancellations */
1062   else
1063     g_warning ("I/O error: %d: %s\n", error->code, error->message);
1064
1065   if (copy->in != NULL)
1066     g_input_stream_close (copy->in, NULL, NULL);
1067
1068   if (copy->out != NULL)
1069     g_output_stream_close (copy->out, NULL, NULL);
1070
1071   free_copy_data_if_closed (copy);
1072 }
1073
1074 static void
1075 close_done (GObject *source_object,
1076             GAsyncResult *res,
1077             gpointer user_data)
1078 {
1079   CopyData *copy = user_data;
1080
1081   g_object_unref (source_object);
1082   free_copy_data_if_closed (copy);
1083 }
1084
1085 static void
1086 write_done_cb (GObject *source_object,
1087                GAsyncResult *res,
1088                gpointer user_data)
1089 {
1090   CopyData *copy = user_data;
1091   gssize count_write;
1092   GError *error = NULL;
1093
1094   count_write = g_output_stream_write_finish (copy->out, res, &error);
1095
1096   if (count_write <= 0)
1097     {
1098       io_error (copy, error);
1099       g_error_free (error);
1100       return;
1101     }
1102
1103   copy->is_full[copy->curr_write] = FALSE;
1104   copy->curr_write = (copy->curr_write + 1) % N_BUFFERS;
1105   copy->is_writing = FALSE;
1106
1107   schedule_next (copy);
1108 }
1109
1110 static void
1111 read_done_cb (GObject *source_object,
1112               GAsyncResult *res,
1113               gpointer user_data)
1114 {
1115   CopyData *copy = user_data;
1116   gssize count_read;
1117   GError *error = NULL;
1118
1119   count_read = g_input_stream_read_finish (copy->in, res, &error);
1120
1121   if (count_read == 0)
1122     {
1123       g_input_stream_close_async (copy->in, 0, copy->cancellable,
1124           close_done, copy);
1125       copy->in = NULL;
1126     }
1127   else if (count_read < 0)
1128     {
1129       io_error (copy, error);
1130       g_error_free (error);
1131       return;
1132     }
1133
1134   copy->count[copy->curr_read] = count_read;
1135   copy->is_full[copy->curr_read] = TRUE;
1136   copy->curr_read = (copy->curr_read + 1) % N_BUFFERS;
1137   copy->is_reading = FALSE;
1138
1139   schedule_next (copy);
1140 }
1141
1142 static void
1143 schedule_next (CopyData *copy)
1144 {
1145   if (copy->in != NULL &&
1146       !copy->is_reading &&
1147       !copy->is_full[copy->curr_read])
1148     {
1149       /* We are not reading and the current buffer is empty, so
1150        * start an async read. */
1151       copy->is_reading = TRUE;
1152       g_input_stream_read_async (copy->in,
1153           copy->buff[copy->curr_read],
1154           BUFFER_SIZE, 0, copy->cancellable,
1155           read_done_cb, copy);
1156     }
1157
1158   if (!copy->is_writing &&
1159       copy->is_full[copy->curr_write])
1160     {
1161       if (copy->count[copy->curr_write] == 0)
1162         {
1163           /* The last read on the buffer read 0 bytes, this
1164            * means that we got an EOF, so we can close
1165            * the output channel. */
1166           g_output_stream_close_async (copy->out, 0,
1167               copy->cancellable,
1168               close_done, copy);
1169       copy->out = NULL;
1170         }
1171       else
1172         {
1173           /* We are not writing and the current buffer contains
1174            * data, so start an async write. */
1175           copy->is_writing = TRUE;
1176           g_output_stream_write_async (copy->out,
1177               copy->buff[copy->curr_write],
1178               copy->count[copy->curr_write],
1179               0, copy->cancellable,
1180               write_done_cb, copy);
1181         }
1182     }
1183 }
1184
1185 static void
1186 copy_stream (GInputStream *in,
1187              GOutputStream *out,
1188              GCancellable *cancellable)
1189 {
1190   CopyData *copy;
1191   gint i;
1192
1193   g_return_if_fail (in != NULL);
1194   g_return_if_fail (out != NULL);
1195
1196   copy = g_new0 (CopyData, 1);
1197   copy->in = g_object_ref (in);
1198   copy->out = g_object_ref (out);
1199
1200   if (cancellable != NULL)
1201     copy->cancellable = g_object_ref (cancellable);
1202   else
1203     copy->cancellable = g_cancellable_new ();
1204
1205   for (i = 0; i < N_BUFFERS; i++)
1206     copy->buff[i] = g_malloc (BUFFER_SIZE);
1207
1208   schedule_next (copy);
1209 }