may have gotten the gremlin; was an interaction that separate used to handle, but...
[feisty_meow.git] / octopi / library / tentacles / file_transfer_tentacle.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : file_transfer_tentacle                                            *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2005-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "file_transfer_tentacle.h"
16
17 #include <basis/mutex.h>
18 #include <filesystem/directory_tree.h>
19 #include <filesystem/filename.h>
20 #include <filesystem/filename_list.h>
21 #include <filesystem/heavy_file_ops.h>
22 #include <loggers/program_wide_logger.h>
23 #include <octopus/entity_defs.h>
24 #include <octopus/unhandled_request.h>
25 #include <processes/ethread.h>
26 #include <textual/parser_bits.h>
27
28 using namespace basis;
29 using namespace filesystem;
30 using namespace loggers;
31 using namespace octopi;
32 using namespace processes;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
36
37 namespace octopi {
38
39 #undef AUTO_LOCK
40 #define AUTO_LOCK auto_synchronizer loc(*_lock);
41   // protects our lists.
42
43 const int FTT_CLEANING_INTERVAL = 30 * SECOND_ms;
44   // this is how frequently we clean up the list to remove outdated transfers.
45
46 const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
47   // if it hasn't been touched in this long, it's out of there.
48
49 #define DEBUG_FILE_TRANSFER_TENTACLE
50   // uncomment for noisier version.
51
52 #undef LOG
53 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
54
55 //////////////
56
57 class file_transfer_record 
58 {
59 public:
60   // valid for both transfers and correspondences.
61   astring _src_root;  // where the info is on the data provider.
62   time_stamp _last_active;  // when this was last used.
63
64   // valid for file transfers only.
65   octopus_entity _ent;  // the entity requesting this service.
66   astring _dest_root;  // where the info is on the data sink.
67   filename_list *_diffs;  // the differences to be transferred.
68   file_transfer_header _last_sent;  // the last chunk that was sent.
69   bool _done;  // true if the transfer is finished.
70   string_array _includes;  // the set to include.
71
72   // valid for correspondence records only.
73   directory_tree *_local_dir;  // our local information about the transfer.
74   astring _source_mapping;  // valid for a correspondence record.
75   int _refresh_interval;  // the rate of refreshing the source tree.
76
77   file_transfer_record() : _diffs(NIL), _last_sent(file_time()),
78       _done(false), _local_dir(NIL)
79   {}
80
81   ~file_transfer_record() {
82     WHACK(_local_dir);
83     WHACK(_diffs);
84   }
85
86   astring text_form() const {
87     astring to_return;
88     to_return += astring("src=") + _src_root + astring(" last act=")
89         + _last_active.text_form();
90     if (_ent.blank()) to_return += astring(" ent=") + _ent.text_form();
91     if (_dest_root.t()) {
92       to_return += astring(" dest=") + _dest_root;
93       to_return += astring(" last_sent=") + _last_sent.text_form();
94     }
95     return to_return;
96   }
97 };
98
99 //////////////
100
101 // this implementation assumes that the same entity will never simultaneously
102 // transfer the same source to the same destination.  that assumption holds
103 // up fine for different clients, since they should have different entities.
104 // when there is a collision on the entity/src/dest, then the default action
105 // is to assume that the transfer is just being started over.
106
107 class file_transfer_status : public amorph<file_transfer_record>
108 {
109 public:
110   // find a transfer record by the key fields.
111   file_transfer_record *find(const octopus_entity &ent, const astring &src,
112       const astring &dest) {
113     for (int i = 0; i < elements(); i++) {
114       const file_transfer_record *rec = get(i);
115       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
116           && (rec->_dest_root == dest) ) {
117         return borrow(i);
118       }
119     }
120     return NIL;
121   }
122
123   virtual ~file_transfer_status() {}
124
125   DEFINE_CLASS_NAME("file_transfer_status");
126
127   // find a file correspondence record by the mapping name.
128   file_transfer_record *find_mapping(const astring &source_mapping) {
129     for (int i = 0; i < elements(); i++) {
130       const file_transfer_record *rec = get(i);
131       if (rec && (rec->_source_mapping == source_mapping) )
132         return borrow(i);
133     }
134     return NIL;
135   }
136
137   // turns a source mapping into the location that it corresponds to.
138   astring translate(const astring &source_path) const {
139     FUNCDEF("translate");
140     string_array pieces;
141     bool rooted;
142     filename(source_path).separate(rooted, pieces);
143     astring source_mapping = pieces[0];
144     pieces.zap(0, 0);  // remove source part.
145
146     for (int i = 0; i < elements(); i++) {
147       const file_transfer_record *rec = get(i);
148       if (rec && (rec->_source_mapping == source_mapping) ) {
149         return rec->_src_root;
150       }
151     }
152     return astring::empty_string();
153   }
154
155   // removes a file transfer record by the key fields.
156   bool whack(const octopus_entity &ent, const astring &src,
157       const astring &dest) {
158     for (int i = 0; i < elements(); i++) {
159       const file_transfer_record *rec = get(i);
160       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
161           && (rec->_dest_root == dest) ) {
162         zap(i, i);
163         return true;
164       }
165     }
166     return false;
167   }
168
169   // clean all records for the entity "ent".
170   void whack_all(const octopus_entity &ent) {
171     for (int i = elements() - 1; i >= 0; i--) {
172       const file_transfer_record *rec = get(i);
173       if (rec && (rec->_ent == ent) )
174         zap(i, i);
175     }
176   }
177
178   // removes a file transfer correspondence.
179   bool whack_mapping(const astring &source_mapping) {
180     for (int i = elements() - 1; i >= 0; i--) {
181       const file_transfer_record *rec = get(i);
182       if (rec && (rec->_source_mapping == source_mapping) ) {
183         zap(i, i);
184         return true;
185       }
186     }
187     return false;
188   }
189
190   // returns a string dump of the fields in this list.
191   astring text_form() const {
192     astring to_return;
193     for (int i = 0; i < elements(); i++) {
194       const file_transfer_record *rec = get(i);
195       if (rec)
196         to_return += rec->text_form() + parser_bits::platform_eol_to_chars();
197     }
198     return to_return;
199   }
200 };
201
202 //////////////
203
204 class file_transfer_cleaner : public ethread
205 {
206 public:
207   file_transfer_cleaner(file_transfer_tentacle &parent)
208       : ethread(FTT_CLEANING_INTERVAL, SLACK_INTERVAL), _parent(parent) {}
209
210   virtual void perform_activity(void *formal(ptr)) { _parent.periodic_actions(); }
211
212 private:
213   file_transfer_tentacle &_parent;
214 };
215
216 //////////////
217
218 file_transfer_tentacle::file_transfer_tentacle(int maximum_transfer,
219     file_transfer_tentacle::transfer_modes mode_of_transfer)
220 : tentacle_helper<file_transfer_infoton>
221       (file_transfer_infoton::file_transfer_classifier(), false),
222   _maximum_transfer(maximum_transfer),
223   _transfers(new file_transfer_status),
224   _correspondences(new file_transfer_status),
225   _lock(new mutex),
226   _cleaner(new file_transfer_cleaner(*this)),
227   _mode(mode_of_transfer)
228 {
229   _cleaner->start(NIL);
230 }
231
232 file_transfer_tentacle::~file_transfer_tentacle()
233 {
234   _cleaner->stop();
235   WHACK(_transfers);
236   WHACK(_correspondences);
237   WHACK(_cleaner);
238   WHACK(_lock);
239 }
240
241 astring file_transfer_tentacle::text_form() const
242 {
243   AUTO_LOCK;
244   return _transfers->text_form();
245 }
246
247 void file_transfer_tentacle::expunge(const octopus_entity &to_remove)
248 {
249   AUTO_LOCK;
250   _transfers->whack_all(to_remove);
251 }
252
253 outcome file_transfer_tentacle::add_correspondence
254     (const astring &source_mapping, const astring &source_root,
255      int refresh_interval)
256 {
257   FUNCDEF("add_correspondence");
258   AUTO_LOCK;
259
260   remove_correspondence(source_mapping);  // clean the old one out first.
261
262   // create new file transfer record to hold this correspondence.
263   file_transfer_record *new_record = new file_transfer_record;
264   new_record->_source_mapping = source_mapping;
265   new_record->_src_root = source_root;
266   new_record->_refresh_interval = refresh_interval;
267   new_record->_local_dir = new directory_tree(source_root);
268 //hmmm: doesn't say anything about a pattern.  do we need to worry about that?
269
270   // check that the directory looked healthy.
271   if (!new_record->_local_dir->good()) {
272     WHACK(new_record);
273     return common::ACCESS_DENIED;
274   }
275 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
276   LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
277       + " src=" + new_record->_src_root);
278 #endif
279   // calculate size and checksum info for the directory.
280   new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
281
282 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
283   LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
284       + " src=" + new_record->_src_root);
285 #endif
286
287   _correspondences->append(new_record);
288
289   return OKAY;
290 }
291
292 outcome file_transfer_tentacle::remove_correspondence
293     (const astring &source_mapping)
294 {
295   AUTO_LOCK;
296   if (!_correspondences->whack_mapping(source_mapping))
297     return NOT_FOUND;
298   return OKAY;
299 }
300
301 bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
302     const astring &src, const astring &dest, filename_list &diffs)
303 {
304   FUNCDEF("get_differences");
305   diffs.reset();
306   AUTO_LOCK;
307   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
308   if (!the_rec) return false;
309   if (!the_rec->_diffs) return false;  // no diffs listed.
310   diffs = *the_rec->_diffs;
311   return true;
312 }
313
314 bool file_transfer_tentacle::status(const octopus_entity &ent,
315     const astring &src, const astring &dest, double &total_size,
316     int &total_files, double &current_size, int &current_files, bool &done,
317     time_stamp &last_active)
318 {
319   FUNCDEF("status");
320   total_size = 0;
321   total_files = 0;
322   current_files = 0;
323   current_size = 0;
324   AUTO_LOCK;
325   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
326   if (!the_rec) return false;
327   done = the_rec->_done;
328   last_active = the_rec->_last_active;
329
330   if (the_rec->_diffs) {
331     the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
332         the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
333         current_files, current_size);
334     total_files = the_rec->_diffs->total_files();
335     total_size = the_rec->_diffs->total_size();
336   }
337
338   return true;
339 }
340
341 outcome file_transfer_tentacle::register_file_transfer
342     (const octopus_entity &ent, const astring &src_root,
343     const astring &dest_root, const string_array &includes)
344 {
345   FUNCDEF("register_file_transfer");
346   AUTO_LOCK;
347   // make sure that this isn't an existing transfer.  if so, we just update
348   // the status.
349   file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
350   if (!the_rec) {
351     the_rec = new file_transfer_record;
352     the_rec->_src_root = src_root;
353     the_rec->_dest_root = dest_root;
354     the_rec->_ent = ent;
355     the_rec->_includes = includes;
356     _transfers->append(the_rec);  // add the new record.
357   } else {
358     the_rec->_done = false;
359     the_rec->_includes = includes;
360     the_rec->_last_active.reset();  // freshen up the last activity time.
361   }
362   return OKAY;
363 }
364
365 outcome file_transfer_tentacle::cancel_file_transfer(const octopus_entity &ent,
366     const astring &src_root, const astring &dest_root)
367 {
368   AUTO_LOCK;
369   return _transfers->whack(ent, src_root, dest_root)?  OKAY : NOT_FOUND;
370 }
371
372 directory_tree *file_transfer_tentacle::lock_directory(const astring &key)
373 {
374   _lock->lock();
375   file_transfer_record *the_rec = _correspondences->find_mapping(key);
376   if (!the_rec || !the_rec->_local_dir) {
377     _lock->unlock();
378     return NIL;  // unknown transfer.
379   }
380   return the_rec->_local_dir;
381 }
382
383 void file_transfer_tentacle::unlock_directory()
384 {
385   _lock->unlock();
386 }
387
388 bool file_transfer_tentacle::add_path(const astring &key,
389     const astring &new_path)
390 {
391   AUTO_LOCK;
392   file_transfer_record *the_rec = _correspondences->find_mapping(key);
393   if (!the_rec) return false;  // unknown transfer.
394   if (!the_rec->_local_dir) return false;  // not right type.
395   return the_rec->_local_dir->add_path(new_path) == common::OKAY;
396 }
397
398 bool file_transfer_tentacle::remove_path(const astring &key,
399     const astring &old_path)
400 {
401   AUTO_LOCK;
402   file_transfer_record *the_rec = _correspondences->find_mapping(key);
403   if (!the_rec) return false;  // unknown transfer.
404   if (!the_rec->_local_dir) return false;  // not right type.
405   return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
406 }
407
408 void file_transfer_tentacle::periodic_actions()
409 {
410   FUNCDEF("periodic_actions");
411   AUTO_LOCK;
412
413   // first, we'll clean out old transfers.
414   time_stamp oldest_allowed(-TRANSFER_TIMEOUT);
415     // nothing older than this should be kept.
416   for (int i = _transfers->elements() - 1; i >= 0; i--) {
417     const file_transfer_record *curr = _transfers->get(i);
418     if (curr->_last_active < oldest_allowed) {
419 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
420       LOG(astring("cleaning record for: ent=") + curr->_ent.text_form()
421           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
422 #endif
423       _transfers->zap(i, i);
424     }
425   }
426
427   // then we'll rescan any trees that are ready for it.
428   for (int i = 0; i < _correspondences->elements(); i++) {
429     file_transfer_record *curr = _correspondences->borrow(i);
430     if (curr->_last_active < time_stamp(-curr->_refresh_interval)) {
431       if (curr->_local_dir) {
432 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
433         LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
434             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
435 #endif
436         WHACK(curr->_local_dir);
437         curr->_local_dir = new directory_tree(curr->_src_root);
438         curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
439 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
440         LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
441             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
442 #endif
443       }
444       curr->_last_active.reset();  // reset our action time.
445     }
446   }
447 }
448
449 outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
450     byte_array &packed_form, infoton * &reformed)
451 {
452   // this method doesn't use the lists, so it doesn't need locking.
453   if (classifier != file_transfer_infoton::file_transfer_classifier())
454     return NO_HANDLER;
455   return reconstituter(classifier, packed_form, reformed,
456       (file_transfer_infoton *)NIL);
457 }
458
459 // the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
460 // their invocations.
461
462 outcome file_transfer_tentacle::handle_tree_compare_request
463     (file_transfer_infoton &req, const octopus_request_id &item_id)
464 {
465   FUNCDEF("handle_tree_compare_request");
466
467   // get the mapping from the specified location on this side.
468   filename splitting(req._src_root);
469   string_array pieces;
470   bool rooted;
471   splitting.separate(rooted, pieces);
472   astring source_mapping = pieces[0];
473
474   // patch the name up to find the sub_path for the source.
475   filename source_start;
476   pieces.zap(0, 0);
477   source_start.join(rooted, pieces);
478
479   // locate the allowed transfer depot for the mapping they provided.
480   file_transfer_record *mapping_record
481       = _correspondences->find_mapping(source_mapping);
482   if (!mapping_record) {
483     LOG(astring("could not find source mapping of ") + source_mapping);
484     return NOT_FOUND;
485   }
486
487   // unpack the tree that they sent us which describes their local area.
488   directory_tree *dest_tree = new directory_tree;
489   if (!dest_tree->unpack(req._packed_data)) {
490     LOG(astring("could not unpack requester's directory tree"));
491     WHACK(dest_tree);
492     return GARBAGE;
493   }
494
495   string_array requested_names;
496   if (!requested_names.unpack(req._packed_data)) {
497     LOG(astring("could not unpack requester's filename includes"));
498     WHACK(dest_tree);
499     return GARBAGE;
500   }
501
502   // look up to see if this is about something that has already been seen.
503   // we don't want to add a new transfer record if they're already working on
504   // this.  that also lets them do a new tree compare to restart the transfer.
505   file_transfer_record *the_rec = _transfers->find(item_id._entity,
506       req._src_root, req._dest_root);
507   if (!the_rec) {
508     // there was no existing record; we'll create a new one.
509     the_rec = new file_transfer_record;
510     the_rec->_ent = item_id._entity;
511     the_rec->_src_root = req._src_root;
512     the_rec->_dest_root = req._dest_root;
513     _transfers->append(the_rec);
514   } else {
515     // record some activity on this record.
516     the_rec->_done = false;
517     the_rec->_last_active.reset();
518   }
519
520   the_rec->_diffs = new filename_list;
521
522   int how_comp = file_info::EQUAL_NAME;  // the prize for doing nothing.
523   if (_mode & COMPARE_SIZE_AND_TIME)
524     how_comp |= file_info::EQUAL_FILESIZE | file_info::EQUAL_TIMESTAMP;
525   if (_mode & COMPARE_CONTENT_SAMPLE)
526     how_comp |= file_info::EQUAL_CHECKSUM;
527
528   // compare the two trees of files.
529   directory_tree::compare_trees(*mapping_record->_local_dir,
530       source_start.raw(), *dest_tree, astring::empty_string(),
531       *the_rec->_diffs, (file_info::file_similarity)how_comp);
532
533 //LOG(astring("filenames decided as different:\n") + the_rec->_diffs->text_form());
534
535   // now prune the diffs to accord with what they claim they want.
536   if (requested_names.length()) {
537     for (int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
538       filename diff_curr = *the_rec->_diffs->get(i);
539       bool found = false;
540       for (int j = 0; j < requested_names.length(); j++) {
541         filename req_curr(requested_names[j]);
542         if (req_curr.compare_suffix(diff_curr)) {
543           found = true;
544 //LOG(astring("will use: ") + req_curr);
545           break;
546         }
547       }
548       if (!found) the_rec->_diffs->zap(i, i);
549     }
550   }
551
552   req._packed_data.reset();  // clear out existing stuff before cloning.
553   file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
554   the_rec->_diffs->pack(reply->_packed_data);
555
556 //hmmm: does the other side really need the list of filenames?  i guess we
557 //      could check validity of what's transferred or check space available
558 //      before the client starts the transfer.
559
560   reply->_request = false;  // it's a response now.
561   store_product(reply, item_id);
562     // send back the comparison list.
563
564   return OKAY;
565 }
566
567 outcome file_transfer_tentacle::handle_tree_compare_response
568     (file_transfer_infoton &resp, const octopus_request_id &item_id)
569 {
570   FUNCDEF("handle_tree_compare_response");
571   file_transfer_record *the_rec = _transfers->find(item_id._entity,
572       resp._src_root, resp._dest_root);
573   if (!the_rec) {
574     LOG(astring("could not find the record for this transfer: item=")
575         + item_id.text_form() + " src=" + resp._src_root + " dest="
576         + resp._dest_root);
577     return NOT_FOUND;  // not registered, so reject it.
578   }
579
580   the_rec->_last_active.reset();  // record some activity on this record.
581
582   filename_list *flist = new filename_list;
583   if (!flist->unpack(resp._packed_data)) {
584     WHACK(flist);
585     return GARBAGE;
586   }
587
588 //hmmm: verify space on device?
589
590   the_rec->_diffs = flist;  // set the list of differences.
591   return OKAY;
592 }
593
594 outcome file_transfer_tentacle::handle_storage_request
595     (file_transfer_infoton &req, const octopus_request_id &item_id)
596 {
597   FUNCDEF("handle_storage_request");
598   if (_mode & ONLY_REPORT_DIFFS) {
599     // store an unhandled infoton.
600     unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
601     store_product(deny, item_id);
602     return NO_HANDLER;
603   }
604
605   // look up the transfer record.
606   file_transfer_record *the_rec = _transfers->find(item_id._entity,
607       req._src_root, req._dest_root);
608   if (!the_rec) {
609     LOG(astring("could not find the record for this transfer: item=")
610         + item_id.text_form() + " src=" + req._src_root + " dest="
611         + req._dest_root);
612     return NOT_FOUND;  // not registered, so reject it.
613   }
614
615   the_rec->_last_active.reset();  // mark it as still active.
616
617   file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
618
619   if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
620
621   outcome bufret = heavy_file_operations::buffer_files
622       (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
623       the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
624   if (bufret == heavy_file_operations::FINISHED) {
625     bufret = OKAY;  // in either case, we don't emit a finished outcome; handled elsewhere.
626     if (!resp->_packed_data.length()) {
627       // blank packages, so finish by setting command to be a conclude marker.
628       the_rec->_done = true;
629       resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
630     }
631   } else if (bufret != OKAY) {
632     // complain, but still send.
633     LOG(astring("buffer files returned an error on item=")
634         + item_id.text_form() + " src=" + req._src_root + " dest="
635         + req._dest_root);
636   }
637
638 //can remove this block if stops saying it.
639   if ((bufret == OKAY) && !resp->_packed_data.length() ) {
640     LOG("marking empty transfer as done; why not caught above at FINISHED check?");
641     the_rec->_done = true;
642     resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
643   }
644 //end of can remove.
645
646   resp->_request = false;  // it's a response now.
647   store_product(resp, item_id);
648   return bufret;
649 }
650
651 outcome file_transfer_tentacle::handle_storage_response
652     (file_transfer_infoton &resp, const octopus_request_id &item_id)
653 {
654   FUNCDEF("handle_storage_response");
655   if (_mode & ONLY_REPORT_DIFFS) {
656     // not spoken here.
657     return NO_HANDLER;
658   }
659
660   // look up the transfer record.
661   file_transfer_record *the_rec = _transfers->find(item_id._entity,
662       resp._src_root, resp._dest_root);
663   if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
664
665   the_rec->_last_active.reset();  // mark it as still active.
666
667   if (!resp._packed_data.length()) {
668     // mark that we're done now.
669     the_rec->_done = true;
670   }
671
672   // chew on all the things they sent us.
673   while (resp._packed_data.length()) {
674     file_time empty;
675     file_transfer_header found(empty);
676     if (!found.unpack(resp._packed_data)) {
677       // bomb out now.
678       LOG(astring("corruption seen on item=") + item_id.text_form()
679           + " src=" + resp._src_root + " dest=" + resp._dest_root);
680       return GARBAGE;
681     }
682     the_rec->_last_sent = found;
683
684     if (found._length > resp._packed_data.length()) {
685       // another case for leaving--not enough data left in the buffer.
686       LOG(astring("data underflow seen on item=") + item_id.text_form()
687           + " src=" + resp._src_root + " dest=" + resp._dest_root);
688       return GARBAGE;
689     }
690     byte_array to_write = resp._packed_data.subarray(0, found._length - 1);
691     resp._packed_data.zap(0, found._length - 1);
692
693     if (!the_rec->_diffs) return BAD_INPUT;
694
695     const file_info *recorded_info = the_rec->_diffs->find(found._filename);
696     if (!recorded_info) {
697       LOG(astring("unrequested file seen: ") + found._filename);
698       continue;  // maybe there are others that aren't confused.
699     }
700
701     astring full_file = resp._dest_root + filename::default_separator()
702         + recorded_info->secondary();
703 LOG(astring("telling it to write to fullfile: ") + full_file);
704
705     outcome ret = heavy_file_operations::write_file_chunk(full_file,
706         found._byte_start, to_write);
707     if (ret != OKAY) {
708       LOG(astring("failed to write file chunk: error=")
709           + heavy_file_operations::outcome_name(ret) + " file=" + full_file
710           + a_sprintf(" start=%d len=%d", found._byte_start, found._length));
711     }
712     found._time.set_time(full_file);
713   }
714
715   // there is no response product to store.
716   return OKAY;
717 }
718
719 outcome file_transfer_tentacle::conclude_storage_request
720     (file_transfer_infoton &req, const octopus_request_id &item_id)
721 {
722   FUNCDEF("conclude_storage_request");
723   if (_mode & ONLY_REPORT_DIFFS) {
724     // store an unhandled infoton.
725     unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
726     store_product(deny, item_id);
727     return NO_HANDLER;
728   }
729
730   // look up the transfer record.
731   file_transfer_record *the_rec = _transfers->find(item_id._entity,
732       req._src_root, req._dest_root);
733   if (!the_rec) {
734     LOG(astring("could not find the record for this transfer: item=")
735         + item_id.text_form() + " src=" + req._src_root + " dest="
736         + req._dest_root);
737     return NOT_FOUND;  // not registered, so reject it.
738   }
739
740   the_rec->_last_active.reset();  // mark it as still active.
741
742   file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
743
744   if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
745
746   the_rec->_done = true;  // we're concluding the transfer, so that's that.
747   resp->_request = false;  // it's a response now.
748   store_product(resp, item_id);
749
750   LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
751       + req._dest_root);
752
753   return common::OKAY;
754 }
755
756 outcome file_transfer_tentacle::conclude_storage_response
757     (file_transfer_infoton &resp, const octopus_request_id &item_id)
758 {
759   FUNCDEF("conclude_storage_response");
760   if (_mode & ONLY_REPORT_DIFFS) {
761     // not spoken here.
762     return NO_HANDLER;
763   }
764
765   // look up the transfer record.
766   file_transfer_record *the_rec = _transfers->find(item_id._entity,
767       resp._src_root, resp._dest_root);
768   if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
769
770   the_rec->_last_active.reset();  // mark it as still active.
771
772   // mark that we're done now.
773   the_rec->_done = true;
774
775   LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
776       + resp._dest_root);
777
778   // there is no response product to store.
779   return OKAY;
780 }
781
782 // consume() is the only method that is allowed to invoke the "handle_X" methods
783 // and it must lock the object beforehand.
784
785 outcome file_transfer_tentacle::consume(infoton &to_chow,
786     const octopus_request_id &item_id, byte_array &transformed)
787 {
788   FUNCDEF("consume");
789   transformed.reset();
790   file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
791   if (!inf) return DISALLOWED;  // not for us.
792
793   AUTO_LOCK;  // protect our lists while we're working on them.
794
795   switch (inf->_command) {
796     case file_transfer_infoton::TREE_COMPARISON: {
797       if (inf->_request) return handle_tree_compare_request(*inf, item_id);
798       else return handle_tree_compare_response(*inf, item_id);
799     }
800     case file_transfer_infoton::PLACE_FILE_CHUNKS: {
801       if (inf->_request) return handle_storage_request(*inf, item_id);
802       else return handle_storage_response(*inf, item_id);
803     }
804     case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: {
805       if (inf->_request) return conclude_storage_request(*inf, item_id);
806       else return conclude_storage_response(*inf, item_id);
807     }
808   }
809   return BAD_INPUT;  // not a recognized command.
810 }
811
812 outcome file_transfer_tentacle::refresh_now(const astring &source_mapping)
813 {
814   FUNCDEF("refresh_now");
815   AUTO_LOCK;
816   for (int i = 0; i < _correspondences->elements(); i++) {
817     file_transfer_record *curr = _correspondences->borrow(i);
818     if (!curr) continue;
819     if (curr->_source_mapping != source_mapping) continue;
820     if (curr->_local_dir) {
821 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
822       LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
823           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
824 #endif
825       WHACK(curr->_local_dir);
826       curr->_local_dir = new directory_tree(curr->_src_root);
827       curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
828 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
829       LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
830           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
831 #endif
832     }
833     curr->_last_active.reset();  // reset our action time.
834     return OKAY;
835   }
836   return NOT_FOUND;
837 }
838
839 } //namespace.
840
841