fixing extensions
[feisty_meow.git] / octopi / library / tentacles / file_transfer_tentacle.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : file_transfer_tentacle                                            *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2005-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "file_transfer_tentacle.h"
16
17 #include <basis/mutex.h>
18 #include <filesystem/directory_tree.h>
19 #include <filesystem/filename.h>
20 #include <filesystem/filename_list.h>
21 #include <filesystem/heavy_file_ops.h>
22 #include <loggers/program_wide_logger.h>
23 #include <octopus/entity_defs.h>
24 #include <octopus/entity_data_bin.h>
25 #include <octopus/unhandled_request.h>
26 #include <processes/ethread.h>
27 #include <textual/parser_bits.h>
28
29 using namespace basis;
30 using namespace filesystem;
31 using namespace loggers;
32 using namespace octopi;
33 using namespace processes;
34 using namespace structures;
35 using namespace textual;
36 using namespace timely;
37
38 namespace octopi {
39
40 #undef AUTO_LOCK
41 #define AUTO_LOCK auto_synchronizer loc(*_lock);
42   // protects our lists.
43
44 const int FTT_CLEANING_INTERVAL = 30 * SECOND_ms;
45   // this is how frequently we clean up the list to remove outdated transfers.
46
47 const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
48   // if it hasn't been touched in this long, it's out of there.
49
50 //#define DEBUG_FILE_TRANSFER_TENTACLE
51   // uncomment for noisier version.
52
53 #undef LOG
54 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
55
56 //////////////
57
58 class file_transfer_record 
59 {
60 public:
61   // valid for both transfers and correspondences.
62   astring _src_root;  // where the info is on the data provider.
63   time_stamp _last_active;  // when this was last used.
64
65   // valid for file transfers only.
66   octopus_entity _ent;  // the entity requesting this service.
67   astring _dest_root;  // where the info is on the data sink.
68   filename_list *_diffs;  // the differences to be transferred.
69   file_transfer_header _last_sent;  // the last chunk that was sent.
70   bool _done;  // true if the transfer is finished.
71   string_array _includes;  // the set to include.
72
73   // valid for correspondence records only.
74   directory_tree *_local_dir;  // our local information about the transfer.
75   astring _source_mapping;  // valid for a correspondence record.
76   int _refresh_interval;  // the rate of refreshing the source tree.
77
78   file_transfer_record() : _diffs(NULL_POINTER), _last_sent(file_time()),
79       _done(false), _local_dir(NULL_POINTER)
80   {}
81
82   ~file_transfer_record() {
83     WHACK(_local_dir);
84     WHACK(_diffs);
85   }
86
87   astring text_form() const {
88     astring to_return;
89     to_return += astring("src=") + _src_root + astring(" last act=")
90         + _last_active.text_form();
91     if (_ent.blank()) to_return += astring(" ent=") + _ent.text_form();
92     if (_dest_root.t()) {
93       to_return += astring(" dest=") + _dest_root;
94       to_return += astring(" last_sent=") + _last_sent.text_form();
95     }
96     return to_return;
97   }
98 };
99
100 //////////////
101
102 // this implementation assumes that the same entity will never simultaneously
103 // transfer the same source to the same destination.  that assumption holds
104 // up fine for different clients, since they should have different entities.
105 // when there is a collision on the entity/src/dest, then the default action
106 // is to assume that the transfer is just being started over.
107
108 class file_transfer_status : public amorph<file_transfer_record>
109 {
110 public:
111   // find a transfer record by the key fields.
112   file_transfer_record *find(const octopus_entity &ent, const astring &src,
113       const astring &dest) {
114     for (int i = 0; i < elements(); i++) {
115       const file_transfer_record *rec = get(i);
116       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
117           && (rec->_dest_root == dest) ) {
118         return borrow(i);
119       }
120     }
121     return NULL_POINTER;
122   }
123
124   virtual ~file_transfer_status() {}
125
126   DEFINE_CLASS_NAME("file_transfer_status");
127
128   // find a file correspondence record by the mapping name.
129   file_transfer_record *find_mapping(const astring &source_mapping) {
130     for (int i = 0; i < elements(); i++) {
131       const file_transfer_record *rec = get(i);
132       if (rec && (rec->_source_mapping == source_mapping) )
133         return borrow(i);
134     }
135     return NULL_POINTER;
136   }
137
138   // turns a source mapping into the location that it corresponds to.
139   astring translate(const astring &source_path) const {
140     FUNCDEF("translate");
141     string_array pieces;
142     bool rooted;
143     filename(source_path).separate(rooted, pieces);
144     astring source_mapping = pieces[0];
145     pieces.zap(0, 0);  // remove source part.
146
147     for (int i = 0; i < elements(); i++) {
148       const file_transfer_record *rec = get(i);
149       if (rec && (rec->_source_mapping == source_mapping) ) {
150         return rec->_src_root;
151       }
152     }
153     return astring::empty_string();
154   }
155
156   // removes a file transfer record by the key fields.
157   bool whack(const octopus_entity &ent, const astring &src,
158       const astring &dest) {
159     for (int i = 0; i < elements(); i++) {
160       const file_transfer_record *rec = get(i);
161       if (rec && (rec->_ent == ent) && (rec->_src_root == src)
162           && (rec->_dest_root == dest) ) {
163         zap(i, i);
164         return true;
165       }
166     }
167     return false;
168   }
169
170   // clean all records for the entity "ent".
171   void whack_all(const octopus_entity &ent) {
172     for (int i = elements() - 1; i >= 0; i--) {
173       const file_transfer_record *rec = get(i);
174       if (rec && (rec->_ent == ent) )
175         zap(i, i);
176     }
177   }
178
179   // removes a file transfer correspondence.
180   bool whack_mapping(const astring &source_mapping) {
181     for (int i = elements() - 1; i >= 0; i--) {
182       const file_transfer_record *rec = get(i);
183       if (rec && (rec->_source_mapping == source_mapping) ) {
184         zap(i, i);
185         return true;
186       }
187     }
188     return false;
189   }
190
191   // returns a string dump of the fields in this list.
192   astring text_form() const {
193     astring to_return;
194     for (int i = 0; i < elements(); i++) {
195       const file_transfer_record *rec = get(i);
196       if (rec)
197         to_return += rec->text_form() + parser_bits::platform_eol_to_chars();
198     }
199     return to_return;
200   }
201 };
202
203 //////////////
204
205 class file_transfer_cleaner : public ethread
206 {
207 public:
208   file_transfer_cleaner(file_transfer_tentacle &parent)
209       : ethread(FTT_CLEANING_INTERVAL, SLACK_INTERVAL), _parent(parent) {}
210
211   virtual void perform_activity(void *formal(ptr)) { _parent.periodic_actions(); }
212
213 private:
214   file_transfer_tentacle &_parent;
215 };
216
217 //////////////
218
219 file_transfer_tentacle::file_transfer_tentacle(int maximum_transfer,
220     file_transfer_tentacle::transfer_modes mode_of_transfer)
221 : tentacle_helper<file_transfer_infoton>
222       (file_transfer_infoton::file_transfer_classifier(), false),
223   _maximum_transfer(maximum_transfer),
224   _transfers(new file_transfer_status),
225   _correspondences(new file_transfer_status),
226   _lock(new mutex),
227   _cleaner(new file_transfer_cleaner(*this)),
228   _mode(mode_of_transfer)
229 {
230   _cleaner->start(NULL_POINTER);
231 }
232
233 file_transfer_tentacle::~file_transfer_tentacle()
234 {
235   _cleaner->stop();
236   WHACK(_transfers);
237   WHACK(_correspondences);
238   WHACK(_cleaner);
239   WHACK(_lock);
240 }
241
242 astring file_transfer_tentacle::text_form() const
243 {
244   AUTO_LOCK;
245   return _transfers->text_form();
246 }
247
248 void file_transfer_tentacle::expunge(const octopus_entity &to_remove)
249 {
250   AUTO_LOCK;
251   _transfers->whack_all(to_remove);
252 }
253
254 outcome file_transfer_tentacle::add_correspondence
255     (const astring &source_mapping, const astring &source_root,
256      int refresh_interval)
257 {
258   FUNCDEF("add_correspondence");
259   AUTO_LOCK;
260
261   remove_correspondence(source_mapping);  // clean the old one out first.
262
263   // create new file transfer record to hold this correspondence.
264   file_transfer_record *new_record = new file_transfer_record;
265   new_record->_source_mapping = source_mapping;
266   new_record->_src_root = source_root;
267   new_record->_refresh_interval = refresh_interval;
268   new_record->_local_dir = new directory_tree(source_root);
269 //hmmm: doesn't say anything about a pattern.  do we need to worry about that?
270
271   // check that the directory looked healthy.
272   if (!new_record->_local_dir->good()) {
273     WHACK(new_record);
274     return common::ACCESS_DENIED;
275   }
276 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
277   LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
278       + " src=" + new_record->_src_root);
279 #endif
280   // calculate size and checksum info for the directory.
281   new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
282
283 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
284   LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
285       + " src=" + new_record->_src_root);
286 #endif
287
288   _correspondences->append(new_record);
289
290   return OKAY;
291 }
292
293 outcome file_transfer_tentacle::remove_correspondence
294     (const astring &source_mapping)
295 {
296   AUTO_LOCK;
297   if (!_correspondences->whack_mapping(source_mapping))
298     return NOT_FOUND;
299   return OKAY;
300 }
301
302 bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
303     const astring &src, const astring &dest, filename_list &diffs)
304 {
305   FUNCDEF("get_differences");
306   diffs.reset();
307   AUTO_LOCK;
308   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
309   if (!the_rec) return false;
310   if (!the_rec->_diffs) return false;  // no diffs listed.
311   diffs = *the_rec->_diffs;
312   return true;
313 }
314
315 bool file_transfer_tentacle::status(const octopus_entity &ent,
316     const astring &src, const astring &dest, double &total_size,
317     int &total_files, double &current_size, int &current_files, bool &done,
318     time_stamp &last_active)
319 {
320   FUNCDEF("status");
321   total_size = 0;
322   total_files = 0;
323   current_files = 0;
324   current_size = 0;
325   AUTO_LOCK;
326   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
327   if (!the_rec) return false;
328   done = the_rec->_done;
329   last_active = the_rec->_last_active;
330
331   if (the_rec->_diffs) {
332     the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
333         the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
334         current_files, current_size);
335     total_files = the_rec->_diffs->total_files();
336     total_size = the_rec->_diffs->total_size();
337   }
338
339   return true;
340 }
341
342 outcome file_transfer_tentacle::register_file_transfer
343     (const octopus_entity &ent, const astring &src_root,
344     const astring &dest_root, const string_array &includes)
345 {
346   FUNCDEF("register_file_transfer");
347   AUTO_LOCK;
348   // make sure that this isn't an existing transfer.  if so, we just update
349   // the status.
350   file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
351   if (!the_rec) {
352     the_rec = new file_transfer_record;
353     the_rec->_src_root = src_root;
354     the_rec->_dest_root = dest_root;
355     the_rec->_ent = ent;
356     the_rec->_includes = includes;
357     _transfers->append(the_rec);  // add the new record.
358   } else {
359     the_rec->_done = false;
360     the_rec->_includes = includes;
361     the_rec->_last_active.reset();  // freshen up the last activity time.
362   }
363   return OKAY;
364 }
365
366 outcome file_transfer_tentacle::cancel_file_transfer(const octopus_entity &ent,
367     const astring &src_root, const astring &dest_root)
368 {
369   AUTO_LOCK;
370   return _transfers->whack(ent, src_root, dest_root)?  OKAY : NOT_FOUND;
371 }
372
373 directory_tree *file_transfer_tentacle::lock_directory(const astring &key)
374 {
375   _lock->lock();
376   file_transfer_record *the_rec = _correspondences->find_mapping(key);
377   if (!the_rec || !the_rec->_local_dir) {
378     _lock->unlock();
379     return NULL_POINTER;  // unknown transfer.
380   }
381   return the_rec->_local_dir;
382 }
383
384 void file_transfer_tentacle::unlock_directory()
385 {
386   _lock->unlock();
387 }
388
389 bool file_transfer_tentacle::add_path(const astring &key,
390     const astring &new_path)
391 {
392   AUTO_LOCK;
393   file_transfer_record *the_rec = _correspondences->find_mapping(key);
394   if (!the_rec) return false;  // unknown transfer.
395   if (!the_rec->_local_dir) return false;  // not right type.
396   return the_rec->_local_dir->add_path(new_path) == common::OKAY;
397 }
398
399 bool file_transfer_tentacle::remove_path(const astring &key,
400     const astring &old_path)
401 {
402   AUTO_LOCK;
403   file_transfer_record *the_rec = _correspondences->find_mapping(key);
404   if (!the_rec) return false;  // unknown transfer.
405   if (!the_rec->_local_dir) return false;  // not right type.
406   return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
407 }
408
409 void file_transfer_tentacle::periodic_actions()
410 {
411   FUNCDEF("periodic_actions");
412   AUTO_LOCK;
413
414   // first, we'll clean out old transfers.
415   time_stamp oldest_allowed(-TRANSFER_TIMEOUT);
416     // nothing older than this should be kept.
417   for (int i = _transfers->elements() - 1; i >= 0; i--) {
418     const file_transfer_record *curr = _transfers->get(i);
419     if (curr->_last_active < oldest_allowed) {
420 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
421       LOG(astring("cleaning record for: ent=") + curr->_ent.text_form()
422           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
423 #endif
424       _transfers->zap(i, i);
425     }
426   }
427
428   // then we'll rescan any trees that are ready for it.
429   for (int i = 0; i < _correspondences->elements(); i++) {
430     file_transfer_record *curr = _correspondences->borrow(i);
431     if (curr->_last_active < time_stamp(-curr->_refresh_interval)) {
432       if (curr->_local_dir) {
433 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
434         LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
435             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
436 #endif
437         WHACK(curr->_local_dir);
438         curr->_local_dir = new directory_tree(curr->_src_root);
439         curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
440 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
441         LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
442             + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
443 #endif
444       }
445       curr->_last_active.reset();  // reset our action time.
446     }
447   }
448 }
449
450 outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
451     byte_array &packed_form, infoton * &reformed)
452 {
453   // this method doesn't use the lists, so it doesn't need locking.
454   if (classifier != file_transfer_infoton::file_transfer_classifier())
455     return NO_HANDLER;
456   return reconstituter(classifier, packed_form, reformed,
457       (file_transfer_infoton *)NULL_POINTER);
458 }
459
460 // the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
461 // their invocations.
462
463 basis::outcome file_transfer_tentacle::handle_build_target_tree_request(file_transfer_infoton &req,
464     const octopus_request_id &item_id)
465 {
466   FUNCDEF("handle_build_target_tree_request");
467
468   // get the mapping from the specified location on this side.
469   filename splitting(req._src_root);
470   string_array pieces;
471   bool rooted;
472   splitting.separate(rooted, pieces);
473   astring source_mapping = pieces[0];
474
475   // patch the name up to find the sub_path for the source.
476   filename source_start;
477   pieces.zap(0, 0);
478   source_start.join(rooted, pieces);
479
480   // locate the allowed transfer depot for the mapping they provided.
481   file_transfer_record *mapping_record = _correspondences->find_mapping(source_mapping);
482   if (!mapping_record) {
483     LOG(astring("could not find source mapping of ") + source_mapping);
484     return NOT_FOUND;
485   }
486
487   // unpack the tree that they sent us which describes their local area.
488   directory_tree *dest_tree = new directory_tree;
489   if (!dest_tree->unpack(req._packed_data)) {
490     LOG(astring("could not unpack requester's directory tree"));
491     WHACK(dest_tree);
492     return GARBAGE;
493   }
494
495   string_array requested_names;
496   if (!requested_names.unpack(req._packed_data)) {
497     LOG(astring("could not unpack requester's filename includes"));
498     WHACK(dest_tree);
499     return GARBAGE;
500   }
501
502   // look up to see if this is about something that has already been seen.
503   // we don't want to add a new transfer record if they're already working on
504   // this.  that also lets them do a new tree compare to restart the transfer.
505   file_transfer_record *the_rec = _transfers->find(item_id._entity,
506       req._src_root, req._dest_root);
507   if (!the_rec) {
508     // there was no existing record; we'll create a new one.
509     the_rec = new file_transfer_record;
510     the_rec->_ent = item_id._entity;
511     the_rec->_src_root = req._src_root;
512     the_rec->_dest_root = req._dest_root;
513     _transfers->append(the_rec);
514   } else {
515     // record some activity on this record.
516     the_rec->_done = false;
517     the_rec->_last_active.reset();
518   }
519
520   // create any directories that are missing at this point.
521   basis::outcome result = dest_tree->make_directories(req._dest_root);
522   if (result != common::OKAY) {
523     LOG("ERROR: got bad result from make_directories!");
524   }
525
526   req._packed_data.reset();  // clear out existing stuff before cloning.
527   file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
528
529   reply->_request = false;  // it's a response now.
530   reply->_success = result;
531   store_product(reply, item_id);
532     // send back the comparison list.
533
534   return OKAY;
535 }
536
537 basis::outcome file_transfer_tentacle::handle_build_target_tree_response(file_transfer_infoton &resp,
538     const octopus_request_id &item_id)
539 {
540 //go to next step, tree comparison.
541 //look at the handle tree compare response for help though.
542   FUNCDEF("handle_build_target_tree_response");
543   file_transfer_record *the_rec = _transfers->find(item_id._entity,
544       resp._src_root, resp._dest_root);
545   if (!the_rec) {
546     LOG(astring("could not find the record for this transfer: item=")
547         + item_id.text_form() + " src=" + resp._src_root + " dest="
548         + resp._dest_root);
549     return NOT_FOUND;  // not registered, so reject it.
550   }
551
552   the_rec->_last_active.reset();  // record some activity on this record.
553
554   return resp._success;
555 }
556
557 outcome file_transfer_tentacle::handle_tree_compare_request
558     (file_transfer_infoton &req, const octopus_request_id &item_id)
559 {
560   FUNCDEF("handle_tree_compare_request");
561
562   // get the mapping from the specified location on this side.
563   filename splitting(req._src_root);
564   string_array pieces;
565   bool rooted;
566   splitting.separate(rooted, pieces);
567   astring source_mapping = pieces[0];
568
569   // patch the name up to find the sub_path for the source.
570   filename source_start;
571   pieces.zap(0, 0);
572   source_start.join(rooted, pieces);
573
574   // locate the allowed transfer depot for the mapping they provided.
575   file_transfer_record *mapping_record
576       = _correspondences->find_mapping(source_mapping);
577   if (!mapping_record) {
578     LOG(astring("could not find source mapping of ") + source_mapping);
579     return NOT_FOUND;
580   }
581
582   // unpack the tree that they sent us which describes their local area.
583   directory_tree *dest_tree = new directory_tree;
584   if (!dest_tree->unpack(req._packed_data)) {
585     LOG(astring("could not unpack requester's directory tree"));
586     WHACK(dest_tree);
587     return GARBAGE;
588   }
589
590   string_array requested_names;
591   if (!requested_names.unpack(req._packed_data)) {
592     LOG(astring("could not unpack requester's filename includes"));
593     WHACK(dest_tree);
594     return GARBAGE;
595   }
596
597   // look up to see if this is about something that has already been seen.
598   // we don't want to add a new transfer record if they're already working on
599   // this.  that also lets them do a new tree compare to restart the transfer.
600   file_transfer_record *the_rec = _transfers->find(item_id._entity,
601       req._src_root, req._dest_root);
602   if (!the_rec) {
603     // there was no existing record; we'll create a new one.
604     the_rec = new file_transfer_record;
605     the_rec->_ent = item_id._entity;
606     the_rec->_src_root = req._src_root;
607     the_rec->_dest_root = req._dest_root;
608     _transfers->append(the_rec);
609   } else {
610     // record some activity on this record.
611     the_rec->_done = false;
612     the_rec->_last_active.reset();
613   }
614
615   the_rec->_diffs = new filename_list;
616
617   int how_comp = file_info::EQUAL_NAME;  // the prize for doing nothing.
618   if (_mode & COMPARE_SIZE_AND_TIME)
619     how_comp |= file_info::EQUAL_FILESIZE | file_info::EQUAL_TIMESTAMP;
620   if (_mode & COMPARE_CONTENT_SAMPLE)
621     how_comp |= file_info::EQUAL_CHECKSUM;
622
623   // compare the two trees of files.
624   directory_tree::compare_trees(*mapping_record->_local_dir,
625       source_start.raw(), *dest_tree, astring::empty_string(),
626       *the_rec->_diffs, (file_info::file_similarity)how_comp);
627
628 //LOG(astring("filenames decided as different:\n") + the_rec->_diffs->text_form());
629
630   // now prune the diffs to accord with what they claim they want.
631   if (requested_names.length()) {
632     for (int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
633       filename diff_curr = *the_rec->_diffs->get(i);
634       bool found = false;
635       for (int j = 0; j < requested_names.length(); j++) {
636         filename req_curr(requested_names[j]);
637         if (req_curr.compare_suffix(diff_curr)) {
638           found = true;
639           break;
640         }
641       }
642       if (!found) the_rec->_diffs->zap(i, i);
643     }
644   }
645
646   req._packed_data.reset();  // clear out existing stuff before cloning.
647   file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
648   the_rec->_diffs->pack(reply->_packed_data);
649
650 //hmmm: does the other side really need the list of filenames?  i guess we
651 //      could check validity of what's transferred or check space available
652 //      before the client starts the transfer.
653
654   reply->_request = false;  // it's a response now.
655   store_product(reply, item_id);
656     // send back the comparison list.
657
658   return OKAY;
659 }
660
661 outcome file_transfer_tentacle::handle_tree_compare_response
662     (file_transfer_infoton &resp, const octopus_request_id &item_id)
663 {
664   FUNCDEF("handle_tree_compare_response");
665   file_transfer_record *the_rec = _transfers->find(item_id._entity,
666       resp._src_root, resp._dest_root);
667   if (!the_rec) {
668     LOG(astring("could not find the record for this transfer: item=")
669         + item_id.text_form() + " src=" + resp._src_root + " dest="
670         + resp._dest_root);
671     return NOT_FOUND;  // not registered, so reject it.
672   }
673
674   the_rec->_last_active.reset();  // record some activity on this record.
675
676   filename_list *flist = new filename_list;
677   if (!flist->unpack(resp._packed_data)) {
678     WHACK(flist);
679     return GARBAGE;
680   }
681
682 //hmmm: verify space on device?
683
684   the_rec->_diffs = flist;  // set the list of differences.
685   return OKAY;
686 }
687
688 outcome file_transfer_tentacle::handle_storage_request
689     (file_transfer_infoton &req, const octopus_request_id &item_id)
690 {
691   FUNCDEF("handle_storage_request");
692   if (_mode & ONLY_REPORT_DIFFS) {
693     // store an unhandled infoton.
694     unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
695     store_product(deny, item_id);
696     return NO_HANDLER;
697   }
698
699   // look up the transfer record.
700   file_transfer_record *the_rec = _transfers->find(item_id._entity,
701       req._src_root, req._dest_root);
702   if (!the_rec) {
703     LOG(astring("could not find the record for this transfer: item=")
704         + item_id.text_form() + " src=" + req._src_root + " dest="
705         + req._dest_root);
706     return NOT_FOUND;  // not registered, so reject it.
707   }
708
709   the_rec->_last_active.reset();  // mark it as still active.
710
711   file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
712
713   if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
714
715   outcome bufret = heavy_file_operations::buffer_files
716       (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
717       the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
718   if (bufret == heavy_file_operations::FINISHED) {
719     bufret = OKAY;  // in either case, we don't emit a finished outcome; handled elsewhere.
720     if (!resp->_packed_data.length()) {
721       // blank packages, so finish by setting command to be a conclude marker.
722       the_rec->_done = true;
723       resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
724     }
725   } else if (bufret != OKAY) {
726     // complain, but still send.
727     LOG(astring("buffer files returned an error on item=")
728         + item_id.text_form() + " src=" + req._src_root + " dest="
729         + req._dest_root);
730   }
731
732 //can remove this block if stops saying it.
733   if ((bufret == OKAY) && !resp->_packed_data.length() ) {
734     LOG("marking empty transfer as done; why not caught above at FINISHED check?");
735     the_rec->_done = true;
736     resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
737   }
738 //end of can remove.
739
740   resp->_request = false;  // it's a response now.
741   store_product(resp, item_id);
742   return bufret;
743 }
744
745 outcome file_transfer_tentacle::handle_storage_response
746     (file_transfer_infoton &resp, const octopus_request_id &item_id)
747 {
748   FUNCDEF("handle_storage_response");
749   if (_mode & ONLY_REPORT_DIFFS) {
750     // not spoken here.
751     return NO_HANDLER;
752   }
753
754   // look up the transfer record.
755   file_transfer_record *the_rec = _transfers->find(item_id._entity,
756       resp._src_root, resp._dest_root);
757   if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
758
759   the_rec->_last_active.reset();  // mark it as still active.
760
761   if (!resp._packed_data.length()) {
762     // mark that we're done now.
763     the_rec->_done = true;
764   }
765
766   // chew on all the things they sent us.
767   while (resp._packed_data.length()) {
768     file_time empty;
769     file_transfer_header found(empty);
770     if (!found.unpack(resp._packed_data)) {
771       // bomb out now.
772       LOG(astring("corruption seen on item=") + item_id.text_form()
773           + " src=" + resp._src_root + " dest=" + resp._dest_root);
774       return GARBAGE;
775     }
776     the_rec->_last_sent = found;
777
778     if (found._length > resp._packed_data.length()) {
779       // another case for leaving--not enough data left in the buffer.
780       LOG(astring("data underflow seen on item=") + item_id.text_form()
781           + " src=" + resp._src_root + " dest=" + resp._dest_root);
782       return GARBAGE;
783     }
784     byte_array to_write = resp._packed_data.subarray(0, found._length - 1);
785     resp._packed_data.zap(0, found._length - 1);
786
787     if (!the_rec->_diffs) return BAD_INPUT;
788
789     const file_info *recorded_info = the_rec->_diffs->find(found._filename);
790     if (!recorded_info) {
791       LOG(astring("unrequested file seen: ") + found._filename);
792       continue;  // maybe there are others that aren't confused.
793     }
794
795     astring full_file = resp._dest_root + filename::default_separator()
796         + recorded_info->secondary();
797 //     LOG(astring("telling it to write to fullfile: ") + full_file);
798
799     outcome ret = heavy_file_operations::write_file_chunk(full_file,
800         found._byte_start, to_write);
801     if (ret != OKAY) {
802       LOG(astring("failed to write file chunk: error=")
803           + heavy_file_operations::outcome_name(ret) + " file=" + full_file
804           + a_sprintf(" start=%d len=%d", found._byte_start, found._length));
805     }
806     found._time.set_time(full_file);
807   }
808
809   // there is no response product to store.
810   return OKAY;
811 }
812
813 outcome file_transfer_tentacle::conclude_storage_request
814     (file_transfer_infoton &req, const octopus_request_id &item_id)
815 {
816   FUNCDEF("conclude_storage_request");
817   if (_mode & ONLY_REPORT_DIFFS) {
818     // store an unhandled infoton.
819     unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
820     store_product(deny, item_id);
821     return NO_HANDLER;
822   }
823
824   // look up the transfer record.
825   file_transfer_record *the_rec = _transfers->find(item_id._entity,
826       req._src_root, req._dest_root);
827   if (!the_rec) {
828     LOG(astring("could not find the record for this transfer: item=")
829         + item_id.text_form() + " src=" + req._src_root + " dest="
830         + req._dest_root);
831     return NOT_FOUND;  // not registered, so reject it.
832   }
833
834   the_rec->_last_active.reset();  // mark it as still active.
835
836   file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
837
838   if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
839
840   the_rec->_done = true;  // we're concluding the transfer, so that's that.
841   resp->_request = false;  // it's a response now.
842   store_product(resp, item_id);
843
844   LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
845       + req._dest_root);
846
847   return common::OKAY;
848 }
849
850 outcome file_transfer_tentacle::conclude_storage_response
851     (file_transfer_infoton &resp, const octopus_request_id &item_id)
852 {
853   FUNCDEF("conclude_storage_response");
854   if (_mode & ONLY_REPORT_DIFFS) {
855     // not spoken here.
856     return NO_HANDLER;
857   }
858
859   // look up the transfer record.
860   file_transfer_record *the_rec = _transfers->find(item_id._entity,
861       resp._src_root, resp._dest_root);
862   if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
863
864   the_rec->_last_active.reset();  // mark it as still active.
865
866   // mark that we're done now.
867   the_rec->_done = true;
868
869   LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
870       + resp._dest_root);
871
872   // there is no response product to store.
873   return OKAY;
874 }
875
876 // consume() is the only method that is allowed to invoke the "handle_X" methods
877 // and it must lock the object beforehand.
878
879 outcome file_transfer_tentacle::consume(infoton &to_chow,
880     const octopus_request_id &item_id, byte_array &transformed)
881 {
882   FUNCDEF("consume");
883   transformed.reset();
884   file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
885   if (!inf) return DISALLOWED;  // not for us.
886
887   AUTO_LOCK;  // protect our lists while we're working on them.
888
889   switch (inf->_command) {
890     case file_transfer_infoton::BUILD_TARGET_TREE: {
891       if (inf->_request) return handle_build_target_tree_request(*inf, item_id);
892       else return handle_build_target_tree_response(*inf, item_id);
893     }
894     case file_transfer_infoton::TREE_COMPARISON: {
895       if (inf->_request) return handle_tree_compare_request(*inf, item_id);
896       else return handle_tree_compare_response(*inf, item_id);
897     }
898     case file_transfer_infoton::PLACE_FILE_CHUNKS: {
899       if (inf->_request) return handle_storage_request(*inf, item_id);
900       else return handle_storage_response(*inf, item_id);
901     }
902     case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: {
903       if (inf->_request) return conclude_storage_request(*inf, item_id);
904       else return conclude_storage_response(*inf, item_id);
905     }
906   }
907   return BAD_INPUT;  // not a recognized command.
908 }
909
910 outcome file_transfer_tentacle::refresh_now(const astring &source_mapping)
911 {
912   FUNCDEF("refresh_now");
913   AUTO_LOCK;
914   for (int i = 0; i < _correspondences->elements(); i++) {
915     file_transfer_record *curr = _correspondences->borrow(i);
916     if (!curr) continue;
917     if (curr->_source_mapping != source_mapping) continue;
918     if (curr->_local_dir) {
919 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
920       LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
921           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
922 #endif
923       WHACK(curr->_local_dir);
924       curr->_local_dir = new directory_tree(curr->_src_root);
925       curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
926 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
927       LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
928           + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
929 #endif
930     }
931     curr->_last_active.reset();  // reset our action time.
932     return OKAY;
933   }
934   return NOT_FOUND;
935 }
936
937 } //namespace.
938
939