first check-in of feisty meow codebase. many things broken still due to recent
[feisty_meow.git] / octopi / library / tentacles / file_transfer_tentacle.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : file_transfer_tentacle                                            *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2005-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "file_transfer_tentacle.h"
16
17 #include <basis/mutex.h>
18 #include <filesystem/directory_tree.h>
19 #include <filesystem/filename.h>
20 #include <filesystem/filename_list.h>
21 #include <filesystem/heavy_file_ops.h>
22 #include <loggers/program_wide_logger.h>
23 #include <octopus/entity_defs.h>
24 #include <octopus/unhandled_request.h>
25 #include <processes/ethread.h>
26 #include <textual/parser_bits.h>
27
28 using namespace basis;
29 using namespace filesystem;
30 using namespace loggers;
31 using namespace octopi;
32 using namespace processes;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
36
37 namespace octopi {
38
39 #undef AUTO_LOCK
40 #define AUTO_LOCK auto_synchronizer loc(*_lock);
41   // protects our lists.
42
43 const int FTT_CLEANING_INTERVAL = 30 * SECOND_ms;
44   // this is how frequently we clean up the list to remove outdated transfers.
45
46 const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
47   // if it hasn't been touched in this long, it's out of there.
48
49 //#define DEBUG_FILE_TRANSFER_TENTACLE
50   // uncomment for noisier version.
51
52 #undef LOG
53 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
54
55 //////////////
56
57 class file_transfer_record 
58 {
59 public:
60   // valid for both transfers and correspondences.
61   astring _src_root;  // where the info is on the data provider.
62   time_stamp _last_active;  // when this was last used.
63
64   // valid for file transfers only.
65   octopus_entity _ent;  // the entity requesting this service.
66   astring _dest_root;  // where the info is on the data sink.
67   filename_list *_diffs;  // the differences to be transferred.
68   file_transfer_header _last_sent;  // the last chunk that was sent.
69   bool _done;  // true if the transfer is finished.
70   string_array _includes;  // the set to include.
71
72   // valid for correspondence records only.
73   directory_tree *_local_dir;  // our local information about the transfer.
74   astring _source_mapping;  // valid for a correspondence record.
75   int _refresh_interval;  // the rate of refreshing the source tree.
76
77   file_transfer_record() : _diffs(NIL), _last_sent(file_time()),
78       _done(false), _local_dir(NIL)
79   {}
80
81   ~file_transfer_record() {
82     WHACK(_local_dir);
83     WHACK(_diffs);
84   }
85
86   astring text_form() const {
87     astring to_return;
88     to_return += astring("src=") + _src_root + astring(" last act=")
89         + _last_active.text_form();
90     if (_ent.blank()) to_return += astring(" ent=") + _ent.text_form();
91     if (_dest_root.t()) {
92       to_return += astring(" dest=") + _dest_root;
93       to_return += astring(" last_sent=") + _last_sent.text_form();
94     }
95     return to_return;
96   }
97 };
98
99 //////////////
100
101 // this implementation assumes that the same entity will never simultaneously
102 // transfer the same source to the same destination.  that assumption holds
103 // up fine for different clients, since they should have different entities.
104 // when there is a collision on the entity/src/dest, then the default action
105 // is to assume that the transfer is just being started over.
106
107 class file_transfer_status : public amorph<file_transfer_record>
108 {
109 public:
110   // find a transfer record by the key fields.
111   file_transfer_record *find(const octopus_entity &ent, const astring &src,
112       const astring &dest) {
113     for (int i = 0; i < elements(); i++) {
114       const file_transfer_record *rec = get(i);
115       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
116           && (rec->_dest_root == dest) ) {
117         return borrow(i);
118       }
119     }
120     return NIL;
121   }
122
123   virtual ~file_transfer_status() {}
124
125   DEFINE_CLASS_NAME("file_transfer_status");
126
127   // find a file correspondence record by the mapping name.
128   file_transfer_record *find_mapping(const astring &source_mapping) {
129     for (int i = 0; i < elements(); i++) {
130       const file_transfer_record *rec = get(i);
131       if (rec && (rec->_source_mapping == source_mapping) )
132         return borrow(i);
133     }
134     return NIL;
135   }
136
137   // turns a source mapping into the location that it corresponds to.
138   astring translate(const astring &source_path) const {
139 //    FUNCDEF("translate");
140     string_array pieces;
141     filename(source_path).separate(pieces);
142     astring source_mapping = pieces[0];
143     pieces.zap(0, 0);  // remove source part.
144
145     for (int i = 0; i < elements(); i++) {
146       const file_transfer_record *rec = get(i);
147       if (rec && (rec->_source_mapping == source_mapping) ) {
148         return rec->_src_root;
149       }
150     }
151     return astring::empty_string();
152   }
153
154   // removes a file transfer record by the key fields.
155   bool whack(const octopus_entity &ent, const astring &src,
156       const astring &dest) {
157     for (int i = 0; i < elements(); i++) {
158       const file_transfer_record *rec = get(i);
159       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
160           && (rec->_dest_root == dest) ) {
161         zap(i, i);
162         return true;
163       }
164     }
165     return false;
166   }
167
168   // clean all records for the entity "ent".
169   void whack_all(const octopus_entity &ent) {
170     for (int i = elements() - 1; i >= 0; i--) {
171       const file_transfer_record *rec = get(i);
172       if (rec && (rec->_ent == ent) )
173         zap(i, i);
174     }
175   }
176
177   // removes a file transfer correspondence.
178   bool whack_mapping(const astring &source_mapping) {
179     for (int i = elements() - 1; i >= 0; i--) {
180       const file_transfer_record *rec = get(i);
181       if (rec && (rec->_source_mapping == source_mapping) ) {
182         zap(i, i);
183         return true;
184       }
185     }
186     return false;
187   }
188
189   // returns a string dump of the fields in this list.
190   astring text_form() const {
191     astring to_return;
192     for (int i = 0; i < elements(); i++) {
193       const file_transfer_record *rec = get(i);
194       if (rec)
195         to_return += rec->text_form() + parser_bits::platform_eol_to_chars();
196     }
197     return to_return;
198   }
199 };
200
201 //////////////
202
203 class file_transfer_cleaner : public ethread
204 {
205 public:
206   file_transfer_cleaner(file_transfer_tentacle &parent)
207       : ethread(FTT_CLEANING_INTERVAL, SLACK_INTERVAL), _parent(parent) {}
208
209   virtual void perform_activity(void *formal(ptr)) { _parent.periodic_actions(); }
210
211 private:
212   file_transfer_tentacle &_parent;
213 };
214
215 //////////////
216
217 file_transfer_tentacle::file_transfer_tentacle(int maximum_transfer,
218     file_transfer_tentacle::transfer_modes mode_of_transfer)
219 : tentacle_helper<file_transfer_infoton>
220       (file_transfer_infoton::file_transfer_classifier(), false),
221   _maximum_transfer(maximum_transfer),
222   _transfers(new file_transfer_status),
223   _correspondences(new file_transfer_status),
224   _lock(new mutex),
225   _cleaner(new file_transfer_cleaner(*this)),
226   _mode(mode_of_transfer)
227 {
228   _cleaner->start(NIL);
229 }
230
231 file_transfer_tentacle::~file_transfer_tentacle()
232 {
233   _cleaner->stop();
234   WHACK(_transfers);
235   WHACK(_correspondences);
236   WHACK(_cleaner);
237   WHACK(_lock);
238 }
239
240 astring file_transfer_tentacle::text_form() const
241 {
242   AUTO_LOCK;
243   return _transfers->text_form();
244 }
245
246 void file_transfer_tentacle::expunge(const octopus_entity &to_remove)
247 {
248   AUTO_LOCK;
249   _transfers->whack_all(to_remove);
250 }
251
252 outcome file_transfer_tentacle::add_correspondence
253     (const astring &source_mapping, const astring &source_root,
254      int refresh_interval)
255 {
256 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
257   FUNCDEF("add_correspondence");
258 #endif
259   AUTO_LOCK;
260
261   remove_correspondence(source_mapping);  // clean the old one out first.
262
263   // create new file transfer record to hold this correspondence.
264   file_transfer_record *new_record = new file_transfer_record;
265   new_record->_source_mapping = source_mapping;
266   new_record->_src_root = source_root;
267   new_record->_refresh_interval = refresh_interval;
268   new_record->_local_dir = new directory_tree(source_root);
269 //hmmm: doesn't say anything about a pattern.  do we need to worry about that?
270
271   // check that the directory looked healthy.
272   if (!new_record->_local_dir->good()) {
273     WHACK(new_record);
274     return common::ACCESS_DENIED;
275   }
276 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
277   LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
278       + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
279 #endif
280   // calculate size and checksum info for the directory.
281   new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
282
283 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
284   LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
285       + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
286 #endif
287
288   _correspondences->append(new_record);
289
290   return OKAY;
291 }
292
293 outcome file_transfer_tentacle::remove_correspondence
294     (const astring &source_mapping)
295 {
296   AUTO_LOCK;
297   if (!_correspondences->whack_mapping(source_mapping))
298     return NOT_FOUND;
299   return OKAY;
300 }
301
302 bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
303     const astring &src, const astring &dest, filename_list &diffs)
304 {
305 //  FUNCDEF("get_differences");
306   diffs.reset();
307   AUTO_LOCK;
308   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
309   if (!the_rec) return false;
310   if (!the_rec->_diffs) return false;  // no diffs listed.
311   diffs = *the_rec->_diffs;
312   return true;
313 }
314
315 bool file_transfer_tentacle::status(const octopus_entity &ent,
316     const astring &src, const astring &dest, double &total_size,
317     int &total_files, double &current_size, int &current_files, bool &done,
318     time_stamp &last_active)
319 {
320 //  FUNCDEF("status");
321   total_size = 0;
322   total_files = 0;
323   current_files = 0;
324   current_size = 0;
325   AUTO_LOCK;
326   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
327   if (!the_rec) return false;
328   done = the_rec->_done;
329   last_active = the_rec->_last_active;
330
331   if (the_rec->_diffs) {
332     the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
333         the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
334         current_files, current_size);
335     total_files = the_rec->_diffs->total_files();
336     total_size = the_rec->_diffs->total_size();
337   }
338
339   return true;
340 }
341
342 outcome file_transfer_tentacle::register_file_transfer
343     (const octopus_entity &ent, const astring &src_root,
344     const astring &dest_root, const string_array &includes)
345 {
346 //  FUNCDEF("register_file_transfer");
347   AUTO_LOCK;
348   // make sure that this isn't an existing transfer.  if so, we just update
349   // the status.
350   file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
351   if (!the_rec) {
352     the_rec = new file_transfer_record;
353     the_rec->_src_root = src_root;
354     the_rec->_dest_root = dest_root;
355     the_rec->_ent = ent;
356     the_rec->_includes = includes;
357     _transfers->append(the_rec);  // add the new record.
358   } else {
359     the_rec->_done = false;
360     the_rec->_includes = includes;
361     the_rec->_last_active.reset();  // freshen up the last activity time.
362   }
363   return OKAY;
364 }
365
366 outcome file_transfer_tentacle::cancel_file_transfer(const octopus_entity &ent,
367     const astring &src_root, const astring &dest_root)
368 {
369   AUTO_LOCK;
370   return _transfers->whack(ent, src_root, dest_root)?  OKAY : NOT_FOUND;
371 }
372
373 directory_tree *file_transfer_tentacle::lock_directory(const astring &key)
374 {
375   _lock->lock();
376   file_transfer_record *the_rec = _correspondences->find_mapping(key);
377   if (!the_rec || !the_rec->_local_dir) {
378     _lock->unlock();
379     return NIL;  // unknown transfer.
380   }
381   return the_rec->_local_dir;
382 }
383
384 void file_transfer_tentacle::unlock_directory()
385 {
386   _lock->unlock();
387 }
388
389 bool file_transfer_tentacle::add_path(const astring &key,
390     const astring &new_path)
391 {
392   AUTO_LOCK;
393   file_transfer_record *the_rec = _correspondences->find_mapping(key);
394   if (!the_rec) return false;  // unknown transfer.
395   if (!the_rec->_local_dir) return false;  // not right type.
396   return the_rec->_local_dir->add_path(new_path) == common::OKAY;
397 }
398
399 bool file_transfer_tentacle::remove_path(const astring &key,
400     const astring &old_path)
401 {
402   AUTO_LOCK;
403   file_transfer_record *the_rec = _correspondences->find_mapping(key);
404   if (!the_rec) return false;  // unknown transfer.
405   if (!the_rec->_local_dir) return false;  // not right type.
406   return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
407 }
408
409 void file_transfer_tentacle::periodic_actions()
410 {
411 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
412   FUNCDEF("periodic_actions");
413 #endif
414   AUTO_LOCK;
415
416   // first, we'll clean out old transfers.
417   time_stamp oldest_allowed(-TRANSFER_TIMEOUT);
418     // nothing older than this should be kept.
419   for (int i = _transfers->elements() - 1; i >= 0; i--) {
420     const file_transfer_record *curr = _transfers->get(i);
421     if (curr->_last_active < oldest_allowed) {
422 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
423       LOG(astring("cleaning record for: ent=") + curr->_ent.text_form()
424           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
425 #endif
426       _transfers->zap(i, i);
427     }
428   }
429
430   // then we'll rescan any trees that are ready for it.
431   for (int i = 0; i < _correspondences->elements(); i++) {
432     file_transfer_record *curr = _correspondences->borrow(i);
433     if (curr->_last_active < time_stamp(-curr->_refresh_interval)) {
434       if (curr->_local_dir) {
435 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
436         LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
437             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
438 #endif
439         WHACK(curr->_local_dir);
440         curr->_local_dir = new directory_tree(curr->_src_root);
441         curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
442 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
443         LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
444             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
445 #endif
446       }
447       curr->_last_active.reset();  // reset our action time.
448     }
449   }
450 }
451
452 outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
453     byte_array &packed_form, infoton * &reformed)
454 {
455   // this method doesn't use the lists, so it doesn't need locking.
456   if (classifier != file_transfer_infoton::file_transfer_classifier())
457     return NO_HANDLER;
458   return reconstituter(classifier, packed_form, reformed,
459       (file_transfer_infoton *)NIL);
460 }
461
462 // the "handle_" methods are thread-safe because the mutex is locked before
463 // their invocations.
464
465 outcome file_transfer_tentacle::handle_tree_compare_request
466     (file_transfer_infoton &req, const octopus_request_id &item_id)
467 {
468   FUNCDEF("handle_tree_compare_request");
469
470   // get the mapping from the specified location on this side.
471   filename splitting(req._src_root);
472   string_array pieces;
473   splitting.separate(pieces);
474   astring source_mapping = pieces[0];
475
476   // patch the name up to find the sub_path for the source.
477   filename source_start;
478   pieces.zap(0, 0);
479   source_start.join(pieces);
480
481   // locate the allowed transfer depot for the mapping they provided.
482   file_transfer_record *mapping_record
483       = _correspondences->find_mapping(source_mapping);
484   if (!mapping_record) {
485     LOG(astring("could not find source mapping of ") + source_mapping);
486     return NOT_FOUND;
487   }
488
489   // unpack the tree that they sent us which describes their local area.
490   directory_tree *dest_tree = new directory_tree;
491   if (!dest_tree->unpack(req._packed_data)) {
492     LOG(astring("could not unpack requester's directory tree"));
493     WHACK(dest_tree);
494     return GARBAGE;
495   }
496
497   string_array requested_names;
498   if (!requested_names.unpack(req._packed_data)) {
499     LOG(astring("could not unpack requester's filename includes"));
500     WHACK(dest_tree);
501     return GARBAGE;
502   }
503
504   // look up to see if this is about something that has already been seen.
505   // we don't want to add a new transfer record if they're already working on
506   // this.  that also lets them do a new tree compare to restart the transfer.
507   file_transfer_record *the_rec = _transfers->find(item_id._entity,
508       req._src_root, req._dest_root);
509   if (!the_rec) {
510     // there was no existing record; we'll create a new one.
511     the_rec = new file_transfer_record;
512     the_rec->_ent = item_id._entity;
513     the_rec->_src_root = req._src_root;
514     the_rec->_dest_root = req._dest_root;
515     _transfers->append(the_rec);
516   } else {
517     // record some activity on this record.
518     the_rec->_done = false;
519     the_rec->_last_active.reset();
520   }
521
522   the_rec->_diffs = new filename_list;
523
524   int how_comp = file_info::EQUAL_NAME;  // the prize for doing nothing.
525   if (_mode & COMPARE_SIZE_AND_TIME)
526     how_comp |= file_info::EQUAL_FILESIZE | file_info::EQUAL_TIMESTAMP;
527   if (_mode & COMPARE_CONTENT_SAMPLE)
528     how_comp |= file_info::EQUAL_CHECKSUM;
529
530   // compare the two trees of files.
531   directory_tree::compare_trees(*mapping_record->_local_dir,
532       source_start.raw(), *dest_tree, astring::empty_string(),
533       *the_rec->_diffs, (file_info::file_similarity)how_comp);
534
535 //LOG(astring("filenames decided as different:\n") + the_rec->_diffs->text_form());
536
537   // now prune the diffs to accord with what they claim they want.
538   if (requested_names.length()) {
539     for (int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
540       filename diff_curr = *the_rec->_diffs->get(i);
541       bool found = false;
542       for (int j = 0; j < requested_names.length(); j++) {
543         filename req_curr(requested_names[j]);
544         if (req_curr.compare_suffix(diff_curr)) {
545           found = true;
546 //LOG(astring("will use: ") + req_curr);
547           break;
548         }
549       }
550       if (!found) the_rec->_diffs->zap(i, i);
551     }
552   }
553
554   req._packed_data.reset();  // clear out existing stuff before cloning.
555   file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
556   the_rec->_diffs->pack(reply->_packed_data);
557
558 //hmmm: does the other side really need the list of filenames?  i guess we
559 //      could check validity of what's transferred or check space available
560 //      before the client starts the transfer.
561
562   reply->_request = false;  // it's a response now.
563   store_product(reply, item_id);
564     // send back the comparison list.
565
566   return OKAY;
567 }
568
569 outcome file_transfer_tentacle::handle_tree_compare_response
570     (file_transfer_infoton &resp, const octopus_request_id &item_id)
571 {
572   FUNCDEF("handle_tree_compare_response");
573   file_transfer_record *the_rec = _transfers->find(item_id._entity,
574       resp._src_root, resp._dest_root);
575   if (!the_rec) {
576     LOG(astring("could not find the record for this transfer: item=")
577         + item_id.text_form() + " src=" + resp._src_root + " dest="
578         + resp._dest_root);
579     return NOT_FOUND;  // not registered, so reject it.
580   }
581
582   the_rec->_last_active.reset();  // record some activity on this record.
583
584   filename_list *flist = new filename_list;
585   if (!flist->unpack(resp._packed_data)) {
586     WHACK(flist);
587     return GARBAGE;
588   }
589
590 //hmmm: verify space on device?
591
592   the_rec->_diffs = flist;  // set the list of differences.
593   return OKAY;
594 }
595
596 outcome file_transfer_tentacle::handle_storage_request
597     (file_transfer_infoton &req, const octopus_request_id &item_id)
598 {
599   FUNCDEF("handle_storage_request");
600   if (_mode & ONLY_REPORT_DIFFS) {
601     // store an unhandled infoton.
602     unhandled_request *deny = new unhandled_request(item_id, req.classifier(),
603         NO_HANDLER);
604     store_product(deny, item_id);
605     return NO_HANDLER;
606   }
607
608   // look up the transfer record.
609   file_transfer_record *the_rec = _transfers->find(item_id._entity,
610       req._src_root, req._dest_root);
611   if (!the_rec) {
612     LOG(astring("could not find the record for this transfer: item=")
613         + item_id.text_form() + " src=" + req._src_root + " dest="
614         + req._dest_root);
615     return NOT_FOUND;  // not registered, so reject it.
616   }
617
618   the_rec->_last_active.reset();  // mark it as still active.
619
620   file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
621
622   if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
623
624   outcome bufret = heavy_file_operations::buffer_files
625       (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
626       the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
627   if (bufret != OKAY) {
628     // complain, but still send.
629     LOG(astring("buffer files returned an error on item=")
630         + item_id.text_form() + " src=" + req._src_root + " dest="
631         + req._dest_root);
632   }
633
634   if ( (bufret == OKAY) && !resp->_packed_data.length() ) {
635     // seems like the transfer is done.
636
637     the_rec->_done = true;
638 //hmmm: mark the record and time out faster?
639   }
640
641   resp->_request = false;  // it's a response now.
642   store_product(resp, item_id);
643   return bufret;
644 }
645
646 outcome file_transfer_tentacle::handle_storage_response
647     (file_transfer_infoton &resp, const octopus_request_id &item_id)
648 {
649   FUNCDEF("handle_storage_response");
650   if (_mode & ONLY_REPORT_DIFFS) {
651     // not spoken here.
652     return NO_HANDLER;
653   }
654
655   // look up the transfer record.
656   file_transfer_record *the_rec = _transfers->find(item_id._entity,
657       resp._src_root, resp._dest_root);
658   if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
659
660   the_rec->_last_active.reset();  // mark it as still active.
661
662   if (!resp._packed_data.length()) {
663     // mark that we're done now.
664     the_rec->_done = true;
665   }
666
667   // chew on all the things they sent us.
668   while (resp._packed_data.length()) {
669     file_time empty;
670     file_transfer_header found(empty);
671     if (!found.unpack(resp._packed_data)) {
672       // bomb out now.
673       LOG(astring("corruption seen on item=") + item_id.text_form()
674           + " src=" + resp._src_root + " dest=" + resp._dest_root);
675       return GARBAGE;
676     }
677     the_rec->_last_sent = found;
678
679     if (found._length > resp._packed_data.length()) {
680       // another case for leaving--not enough data left in the buffer.
681       LOG(astring("data underflow seen on item=") + item_id.text_form()
682           + " src=" + resp._src_root + " dest=" + resp._dest_root);
683       return GARBAGE;
684     }
685     byte_array to_write = resp._packed_data.subarray(0, found._length - 1);
686     resp._packed_data.zap(0, found._length - 1);
687
688     if (!the_rec->_diffs) return BAD_INPUT;
689
690     const file_info *recorded_info = the_rec->_diffs->find(found._filename);
691     if (!recorded_info) {
692       LOG(astring("unrequested file seen: ") + found._filename);
693       continue;  // maybe there are others that aren't confused.
694     }
695
696     astring full_file = resp._dest_root + filename::default_separator()
697         + recorded_info->secondary();
698
699     outcome ret = heavy_file_operations::write_file_chunk(full_file,
700         found._byte_start, to_write);
701     if (ret != OKAY) {
702       LOG(astring("failed to write file chunk: error=")
703           + heavy_file_operations::outcome_name(ret) + " file=" + full_file
704           + a_sprintf(" start=%d len=%d", found._byte_start, found._length));
705     }
706     found._time.set_time(full_file);
707   }
708
709   // there is no response product to store.
710   return OKAY;
711 }
712
713 // this is the only method that is allowed to invoke the "handle_X" methods
714 // and it must lock the object beforehand.
715
716 outcome file_transfer_tentacle::consume(infoton &to_chow,
717     const octopus_request_id &item_id, byte_array &transformed)
718 {
719 //  FUNCDEF("consume");
720   transformed.reset();
721   file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
722   if (!inf) return DISALLOWED;  // not for us.
723
724   AUTO_LOCK;  // protect our lists while we're working on them.
725
726   switch (inf->_command) {
727     case file_transfer_infoton::TREE_COMPARISON: {
728       if (inf->_request) return handle_tree_compare_request(*inf, item_id);
729       else return handle_tree_compare_response(*inf, item_id);
730     }
731     case file_transfer_infoton::PLACE_FILE_CHUNKS: {
732       if (inf->_request) return handle_storage_request(*inf, item_id);
733       else return handle_storage_response(*inf, item_id);
734     }
735   }
736   return BAD_INPUT;  // not a recognized command.
737 }
738
739 outcome file_transfer_tentacle::refresh_now(const astring &source_mapping)
740 {
741 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
742   FUNCDEF("refresh_now");
743 #endif
744   AUTO_LOCK;
745   for (int i = 0; i < _correspondences->elements(); i++) {
746     file_transfer_record *curr = _correspondences->borrow(i);
747     if (!curr) continue;
748     if (curr->_source_mapping != source_mapping) continue;
749     if (curr->_local_dir) {
750 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
751       LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
752           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
753 #endif
754       WHACK(curr->_local_dir);
755       curr->_local_dir = new directory_tree(curr->_src_root);
756       curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
757 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
758       LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
759           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
760 #endif
761     }
762     curr->_last_active.reset();  // reset our action time.
763     return OKAY;
764   }
765   return NOT_FOUND;
766 }
767
768 } //namespace.
769
770