1 /*****************************************************************************\
3 * Name : file_transfer_tentacle *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2005-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "file_transfer_tentacle.h"
17 #include <basis/mutex.h>
18 #include <filesystem/directory_tree.h>
19 #include <filesystem/filename.h>
20 #include <filesystem/filename_list.h>
21 #include <filesystem/heavy_file_ops.h>
22 #include <loggers/program_wide_logger.h>
23 #include <octopus/entity_defs.h>
24 #include <octopus/entity_data_bin.h>
25 #include <octopus/unhandled_request.h>
26 #include <processes/ethread.h>
27 #include <textual/parser_bits.h>
29 using namespace basis;
30 using namespace filesystem;
31 using namespace loggers;
32 using namespace octopi;
33 using namespace processes;
34 using namespace structures;
35 using namespace textual;
36 using namespace timely;
// AUTO_LOCK declares a local auto_synchronizer named "loc" on *_lock.  the
// macro text already ends in a semicolon, so "AUTO_LOCK;" at a call site
// expands to the declaration followed by an empty statement.
41 #define AUTO_LOCK auto_synchronizer loc(*_lock);
42 // protects our lists.
// interval handed to the cleaner thread's ethread constructor.
44 const int FTT_CLEANING_INTERVAL = 30 * SECOND_ms;
45 // this is how frequently we clean up the list to remove outdated transfers.
// age limit that periodic_actions() applies to a transfer's _last_active.
47 const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
48 // if it hasn't been touched in this long, it's out of there.
50 //#define DEBUG_FILE_TRANSFER_TENTACLE
51 // uncomment for noisier version.
// LOG sends "s" through CLASS_EMERGENCY_LOG using the program wide logger.
54 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
// one entry in a file_transfer_status list.  the same record type serves two
// roles--an active file transfer and a source correspondence--and the field
// comments below say which members are meaningful for which role.
58 class file_transfer_record
61 // valid for both transfers and correspondences.
62 astring _src_root; // where the info is on the data provider.
63 time_stamp _last_active; // when this was last used.
65 // valid for file transfers only.
66 octopus_entity _ent; // the entity requesting this service.
67 astring _dest_root; // where the info is on the data sink.
68 filename_list *_diffs; // the differences to be transferred.
69 file_transfer_header _last_sent; // the last chunk that was sent.
70 bool _done; // true if the transfer is finished.
71 string_array _includes; // the set to include.
73 // valid for correspondence records only.
74 directory_tree *_local_dir; // our local information about the transfer.
75 astring _source_mapping; // valid for a correspondence record.
76 int _refresh_interval; // the rate of refreshing the source tree.
// a new record starts with no diffs list, no local directory tree, _done
// false, and a _last_sent header built from a default file_time().
// NOTE(review): _refresh_interval is not in the visible initializer list;
// confirm it is always assigned before periodic_actions() reads it.
78 file_transfer_record() : _diffs(NULL_POINTER), _last_sent(file_time()),
79 _done(false), _local_dir(NULL_POINTER)
// NOTE(review): the destructor body is not visible in this excerpt; confirm
// that it releases _diffs and _local_dir.
82 ~file_transfer_record() {
// builds a one-line description of the record for logging.
87 astring text_form() const {
89 to_return += astring("src=") + _src_root + astring(" last act=")
90 + _last_active.text_form();
// NOTE(review): this adds the entity text only when blank() is true.  if
// blank() means "no entity set", the test looks inverted--confirm.
91 if (_ent.blank()) to_return += astring(" ent=") + _ent.text_form();
93 to_return += astring(" dest=") + _dest_root;
94 to_return += astring(" last_sent=") + _last_sent.text_form();
102 // this implementation assumes that the same entity will never simultaneously
103 // transfer the same source to the same destination. that assumption holds
104 // up fine for different clients, since they should have different entities.
105 // when there is a collision on the entity/src/dest, then the default action
106 // is to assume that the transfer is just being started over.
//
// the list is an amorph of file_transfer_record.  every lookup below is a
// linear scan over elements() that tests each slot for null before use.
108 class file_transfer_status : public amorph<file_transfer_record>
111 // find a transfer record by the key fields.
// the key is the triple of entity, source root and destination root.
112 file_transfer_record *find(const octopus_entity &ent, const astring &src,
113 const astring &dest) {
114 for (int i = 0; i < elements(); i++) {
115 const file_transfer_record *rec = get(i);
116 if (rec && (rec->_ent == ent) && (rec->_src_root == src)
117 && (rec->_dest_root == dest) ) {
124 virtual ~file_transfer_status() {}
126 DEFINE_CLASS_NAME("file_transfer_status");
128 // find a file correspondence record by the mapping name.
129 file_transfer_record *find_mapping(const astring &source_mapping) {
130 for (int i = 0; i < elements(); i++) {
131 const file_transfer_record *rec = get(i);
132 if (rec && (rec->_source_mapping == source_mapping) )
138 // turns a source mapping into the location that it corresponds to.
// the first component of "source_path" is treated as the mapping name.  the
// first record with that mapping supplies its _src_root as the result, and an
// empty string is returned when no record matches.
139 astring translate(const astring &source_path) const {
140 FUNCDEF("translate");
// split the path into its components; the leading one names the mapping.
143 filename(source_path).separate(rooted, pieces);
144 astring source_mapping = pieces[0];
// NOTE(review): the pieces left after this zap are not used by the visible
// lines; the value returned below is just the record's _src_root.
145 pieces.zap(0, 0); // remove source part.
147 for (int i = 0; i < elements(); i++) {
148 const file_transfer_record *rec = get(i);
149 if (rec && (rec->_source_mapping == source_mapping) ) {
150 return rec->_src_root;
153 return astring::empty_string();
156 // removes a file transfer record by the key fields.
// uses the same entity/src/dest key as find().
157 bool whack(const octopus_entity &ent, const astring &src,
158 const astring &dest) {
159 for (int i = 0; i < elements(); i++) {
160 const file_transfer_record *rec = get(i);
161 if (rec && (rec->_ent == ent) && (rec->_src_root == src)
162 && (rec->_dest_root == dest) ) {
170 // clean all records for the entity "ent".
// the scan runs from the last index down to zero.
171 void whack_all(const octopus_entity &ent) {
172 for (int i = elements() - 1; i >= 0; i--) {
173 const file_transfer_record *rec = get(i);
174 if (rec && (rec->_ent == ent) )
179 // removes a file transfer correspondence.
// also scans from the last index down to zero.
180 bool whack_mapping(const astring &source_mapping) {
181 for (int i = elements() - 1; i >= 0; i--) {
182 const file_transfer_record *rec = get(i);
183 if (rec && (rec->_source_mapping == source_mapping) ) {
191 // returns a string dump of the fields in this list.
// each record's text_form() is followed by the platform's end-of-line chars.
192 astring text_form() const {
194 for (int i = 0; i < elements(); i++) {
195 const file_transfer_record *rec = get(i);
197 to_return += rec->text_form() + parser_bits::platform_eol_to_chars();
// an ethread whose activity is to call the parent tentacle's
// periodic_actions() method.
205 class file_transfer_cleaner : public ethread
// the thread is built with FTT_CLEANING_INTERVAL and SLACK_INTERVAL and keeps
// a reference to the tentacle it services.
208 file_transfer_cleaner(file_transfer_tentacle &parent)
209 : ethread(FTT_CLEANING_INTERVAL, SLACK_INTERVAL), _parent(parent) {}
// the thread's activity; the pointer argument is unused.
211 virtual void perform_activity(void *formal(ptr)) { _parent.periodic_actions(); }
// the tentacle whose periodic_actions() this thread invokes.
214 file_transfer_tentacle &_parent;
// builds the tentacle.  "maximum_transfer" is stored and later passed to
// heavy_file_operations::buffer_files in handle_storage_request, and
// "mode_of_transfer" is stored as the _mode flags tested throughout this file.
219 file_transfer_tentacle::file_transfer_tentacle(int maximum_transfer,
220 file_transfer_tentacle::transfer_modes mode_of_transfer)
221 : tentacle_helper<file_transfer_infoton>
222 (file_transfer_infoton::file_transfer_classifier(), false),
223 _maximum_transfer(maximum_transfer),
// separate lists for active transfers and for source correspondences.
224 _transfers(new file_transfer_status),
225 _correspondences(new file_transfer_status),
227 _cleaner(new file_transfer_cleaner(*this)),
228 _mode(mode_of_transfer)
// start the cleaner thread as part of construction.
230 _cleaner->start(NULL_POINTER);
// tears the tentacle down.  only the release of the correspondence list is
// visible in this excerpt.
// NOTE(review): confirm the cleaner thread is stopped before the lists it
// uses are destroyed, and that the other members are released too.
233 file_transfer_tentacle::~file_transfer_tentacle()
237 WHACK(_correspondences);
// returns the text dump of the transfers list; the correspondences list is
// not part of the result.
242 astring file_transfer_tentacle::text_form() const
245 return _transfers->text_form();
// drops every transfer record held for the entity "to_remove".
248 void file_transfer_tentacle::expunge(const octopus_entity &to_remove)
251 _transfers->whack_all(to_remove);
// registers "source_root" under the name "source_mapping", replacing any
// existing correspondence of that name.  the directory is scanned right away.
// when the scan of the directory is not good(), common::ACCESS_DENIED is
// returned.  "refresh_interval" is stored on the record and is what
// periodic_actions() uses to decide when to rescan.
254 outcome file_transfer_tentacle::add_correspondence
255 (const astring &source_mapping, const astring &source_root,
256 int refresh_interval)
258 FUNCDEF("add_correspondence");
261 remove_correspondence(source_mapping); // clean the old one out first.
263 // create new file transfer record to hold this correspondence.
264 file_transfer_record *new_record = new file_transfer_record;
265 new_record->_source_mapping = source_mapping;
266 new_record->_src_root = source_root;
267 new_record->_refresh_interval = refresh_interval;
268 new_record->_local_dir = new directory_tree(source_root);
269 //hmmm: doesn't say anything about a pattern. do we need to worry about that?
271 // check that the directory looked healthy.
// NOTE(review): new_record is not yet in the list on this path; confirm the
// line hidden between the test and the return releases it.
272 if (!new_record->_local_dir->good()) {
274 return common::ACCESS_DENIED;
276 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
277 LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
278 + " src=" + new_record->_src_root);
280 // calculate size and checksum info for the directory.
// the argument is true exactly when COMPARE_CONTENT_SAMPLE is not set in _mode.
281 new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
283 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
284 LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
285 + " src=" + new_record->_src_root);
// the list holds the record from here on.
288 _correspondences->append(new_record);
// removes the correspondence named "source_mapping".  the outcome depends on
// whether whack_mapping() found a record to remove; the return statements
// themselves are not visible in this excerpt.
293 outcome file_transfer_tentacle::remove_correspondence
294 (const astring &source_mapping)
297 if (!_correspondences->whack_mapping(source_mapping))
// copies the difference list of the transfer keyed by (ent, src, dest) into
// "diffs".  returns false when there is no such transfer or when the
// transfer has no difference list yet.
302 bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
303 const astring &src, const astring &dest, filename_list &diffs)
305 FUNCDEF("get_differences");
308 file_transfer_record *the_rec = _transfers->find(ent, src, dest);
309 if (!the_rec) return false;
310 if (!the_rec->_diffs) return false; // no diffs listed.
// hand back a copy, not the record's own list.
311 diffs = *the_rec->_diffs;
// reports on the transfer keyed by (ent, src, dest).  returns false when no
// such transfer is registered.  otherwise "done" and "last_active" are copied
// from the record.  when the record has a difference list, "current_files"
// and "current_size" are filled in by calculate_progress() from the last
// chunk sent, and "total_files" and "total_size" come from the list's totals.
315 bool file_transfer_tentacle::status(const octopus_entity &ent,
316 const astring &src, const astring &dest, double &total_size,
317 int &total_files, double &current_size, int &current_files, bool &done,
318 time_stamp &last_active)
326 file_transfer_record *the_rec = _transfers->find(ent, src, dest);
327 if (!the_rec) return false;
328 done = the_rec->_done;
329 last_active = the_rec->_last_active;
// progress is only computable once a difference list exists.
331 if (the_rec->_diffs) {
// the position handed over is the end offset of the last chunk sent, that is
// its starting byte plus its length.
332 the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
333 the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
334 current_files, current_size);
335 total_files = the_rec->_diffs->total_files();
336 total_size = the_rec->_diffs->total_size();
// registers, or re-registers, a transfer for entity "ent" from "src_root" to
// "dest_root" with the given "includes" list.  an existing record with the
// same key is reused, and either way the record ends up not done, carrying
// the new includes and a freshened activity time.
342 outcome file_transfer_tentacle::register_file_transfer
343 (const octopus_entity &ent, const astring &src_root,
344 const astring &dest_root, const string_array &includes)
346 FUNCDEF("register_file_transfer");
348 // make sure that this isn't an existing transfer. if so, we just update
350 file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
// NOTE(review): the guard around the creation below is not visible here, and
// no visible line stores "ent" on the new record--confirm both are present.
352 the_rec = new file_transfer_record;
353 the_rec->_src_root = src_root;
354 the_rec->_dest_root = dest_root;
356 the_rec->_includes = includes;
357 _transfers->append(the_rec); // add the new record.
// the fields below are refreshed on the record in use.
359 the_rec->_done = false;
360 the_rec->_includes = includes;
361 the_rec->_last_active.reset(); // freshen up the last activity time.
// removes the transfer keyed by (ent, src_root, dest_root).  returns OKAY
// when whack() reports success and NOT_FOUND otherwise.
366 outcome file_transfer_tentacle::cancel_file_transfer(const octopus_entity &ent,
367 const astring &src_root, const astring &dest_root)
370 return _transfers->whack(ent, src_root, dest_root)? OKAY : NOT_FOUND;
// returns the directory tree held for the correspondence named "key", or
// NULL_POINTER when the mapping is unknown or has no tree.
// NOTE(review): the locking implied by the name is not visible in this
// excerpt; presumably the lock is taken here and dropped by
// unlock_directory()--confirm.
373 directory_tree *file_transfer_tentacle::lock_directory(const astring &key)
376 file_transfer_record *the_rec = _correspondences->find_mapping(key);
377 if (!the_rec || !the_rec->_local_dir) {
379 return NULL_POINTER; // unknown transfer.
// the caller receives the record's own tree, not a copy.
381 return the_rec->_local_dir;
// counterpart to lock_directory().  the body is not visible in this excerpt.
384 void file_transfer_tentacle::unlock_directory()
// adds "new_path" to the directory tree of the correspondence named "key".
// returns false when the mapping is unknown or has no tree; otherwise returns
// true exactly when the tree's add_path() reports common::OKAY.
389 bool file_transfer_tentacle::add_path(const astring &key,
390 const astring &new_path)
393 file_transfer_record *the_rec = _correspondences->find_mapping(key);
394 if (!the_rec) return false; // unknown transfer.
395 if (!the_rec->_local_dir) return false; // not right type.
396 return the_rec->_local_dir->add_path(new_path) == common::OKAY;
// removes "old_path" from the directory tree of the correspondence named
// "key".  returns false when the mapping is unknown or has no tree; otherwise
// returns true exactly when the tree's remove_path() reports common::OKAY.
399 bool file_transfer_tentacle::remove_path(const astring &key,
400 const astring &old_path)
403 file_transfer_record *the_rec = _correspondences->find_mapping(key);
404 if (!the_rec) return false; // unknown transfer.
405 if (!the_rec->_local_dir) return false; // not right type.
406 return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
// housekeeping invoked by the file_transfer_cleaner thread.  it does two
// things: drops transfers that have been idle longer than TRANSFER_TIMEOUT,
// and rebuilds the directory tree of any correspondence whose refresh
// interval has elapsed.
409 void file_transfer_tentacle::periodic_actions()
411 FUNCDEF("periodic_actions");
414 // first, we'll clean out old transfers.
415 time_stamp oldest_allowed(-TRANSFER_TIMEOUT);
416 // nothing older than this should be kept.
// walk backwards so that zapping index i does not disturb indices yet to be
// visited.
417 for (int i = _transfers->elements() - 1; i >= 0; i--) {
418 const file_transfer_record *curr = _transfers->get(i);
// NOTE(review): curr is used without a null test here, while the list's own
// helpers check "rec &&" first--confirm null slots cannot occur.
419 if (curr->_last_active < oldest_allowed) {
420 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
421 LOG(astring("cleaning record for: ent=") + curr->_ent.text_form()
422 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
424 _transfers->zap(i, i);
428 // then we'll rescan any trees that are ready for it.
429 for (int i = 0; i < _correspondences->elements(); i++) {
430 file_transfer_record *curr = _correspondences->borrow(i);
// due for a rescan when the last activity predates the refresh interval.
431 if (curr->_last_active < time_stamp(-curr->_refresh_interval)) {
432 if (curr->_local_dir) {
433 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
434 LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
435 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
// throw away the old tree and scan the source root again from scratch.
437 WHACK(curr->_local_dir);
438 curr->_local_dir = new directory_tree(curr->_src_root);
439 curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
440 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
441 LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
442 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
445 curr->_last_active.reset(); // reset our action time.
// rebuilds an infoton from "packed_form" into "reformed".  the classifier is
// first compared with the file transfer classifier; the outcome for a
// mismatch is not visible in this excerpt.  matching classifiers are handed
// to reconstituter() with a typed null pointer selecting the infoton type.
450 outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
451 byte_array &packed_form, infoton * &reformed)
453 // this method doesn't use the lists, so it doesn't need locking.
454 if (classifier != file_transfer_infoton::file_transfer_classifier())
456 return reconstituter(classifier, packed_form, reformed,
457 (file_transfer_infoton *)NULL_POINTER);
460 // the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
461 // their invocations.
// services a BUILD_TARGET_TREE request.  the request's packed data is
// unpacked into a directory tree and a list of requested names, a transfer
// record for (entity, src, dest) is found or created, the tree's directories
// are made under the request's destination root, and a reply carrying that
// result in _success is stored as the product.
463 basis::outcome file_transfer_tentacle::handle_build_target_tree_request(file_transfer_infoton &req,
464 const octopus_request_id &item_id)
466 FUNCDEF("handle_build_target_tree_request");
468 // get the mapping from the specified location on this side.
469 filename splitting(req._src_root);
472 splitting.separate(rooted, pieces);
// the leading path component names the source mapping.
473 astring source_mapping = pieces[0];
475 // patch the name up to find the sub_path for the source.
476 filename source_start;
478 source_start.join(rooted, pieces);
480 // locate the allowed transfer depot for the mapping they provided.
481 file_transfer_record *mapping_record = _correspondences->find_mapping(source_mapping);
482 if (!mapping_record) {
483 LOG(astring("could not find source mapping of ") + source_mapping);
487 // unpack the tree that they sent us which describes their local area.
// NOTE(review): dest_tree is heap allocated and no release is visible in
// this excerpt; confirm every exit path frees it.
488 directory_tree *dest_tree = new directory_tree;
489 if (!dest_tree->unpack(req._packed_data)) {
490 LOG(astring("could not unpack requester's directory tree"));
// the list of names follows the tree in the same packed buffer.
495 string_array requested_names;
496 if (!requested_names.unpack(req._packed_data)) {
497 LOG(astring("could not unpack requester's filename includes"));
502 // look up to see if this is about something that has already been seen.
503 // we don't want to add a new transfer record if they're already working on
504 // this. that also lets them do a new tree compare to restart the transfer.
505 file_transfer_record *the_rec = _transfers->find(item_id._entity,
506 req._src_root, req._dest_root);
508 // there was no existing record; we'll create a new one.
509 the_rec = new file_transfer_record;
510 the_rec->_ent = item_id._entity;
511 the_rec->_src_root = req._src_root;
512 the_rec->_dest_root = req._dest_root;
513 _transfers->append(the_rec);
515 // record some activity on this record.
516 the_rec->_done = false;
517 the_rec->_last_active.reset();
520 // create any directories that are missing at this point.
521 basis::outcome result = dest_tree->make_directories(req._dest_root);
// a failure is logged, and the result is still sent back in the reply.
522 if (result != common::OKAY) {
523 LOG("ERROR: got bad result from make_directories!");
526 req._packed_data.reset(); // clear out existing stuff before cloning.
// NOTE(review): no null test on the dynamic_cast result is visible here.
527 file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
529 reply->_request = false; // it's a response now.
530 reply->_success = result;
531 store_product(reply, item_id);
532 // send back the comparison list.
// handles the reply to a BUILD_TARGET_TREE request.  the transfer record is
// looked up, NOT_FOUND is returned when it is not registered, and otherwise
// its activity time is refreshed and the response's _success is returned.
537 basis::outcome file_transfer_tentacle::handle_build_target_tree_response(file_transfer_infoton &resp,
538 const octopus_request_id &item_id)
540 //go to next step, tree comparison.
541 //look at the handle tree compare response for help though.
542 FUNCDEF("handle_build_target_tree_response");
543 file_transfer_record *the_rec = _transfers->find(item_id._entity,
544 resp._src_root, resp._dest_root);
546 LOG(astring("could not find the record for this transfer: item=")
547 + item_id.text_form() + " src=" + resp._src_root + " dest="
549 return NOT_FOUND; // not registered, so reject it.
552 the_rec->_last_active.reset(); // record some activity on this record.
// pass the remote side's result straight through.
554 return resp._success;
// services a TREE_COMPARISON request.  the requester's tree and name list
// are unpacked, compared against the local tree for the source mapping, and
// the differences are stored on the transfer record.  those differences,
// pruned to the requested names when any were given, are packed into a
// reply that is stored as the product.
557 outcome file_transfer_tentacle::handle_tree_compare_request
558 (file_transfer_infoton &req, const octopus_request_id &item_id)
560 FUNCDEF("handle_tree_compare_request");
562 // get the mapping from the specified location on this side.
563 filename splitting(req._src_root);
566 splitting.separate(rooted, pieces);
// the leading path component names the source mapping.
567 astring source_mapping = pieces[0];
569 // patch the name up to find the sub_path for the source.
570 filename source_start;
572 source_start.join(rooted, pieces);
574 // locate the allowed transfer depot for the mapping they provided.
575 file_transfer_record *mapping_record
576 = _correspondences->find_mapping(source_mapping);
577 if (!mapping_record) {
578 LOG(astring("could not find source mapping of ") + source_mapping);
582 // unpack the tree that they sent us which describes their local area.
// NOTE(review): dest_tree is heap allocated and no release is visible in
// this excerpt; confirm every exit path frees it.
583 directory_tree *dest_tree = new directory_tree;
584 if (!dest_tree->unpack(req._packed_data)) {
585 LOG(astring("could not unpack requester's directory tree"));
// the list of names follows the tree in the same packed buffer.
590 string_array requested_names;
591 if (!requested_names.unpack(req._packed_data)) {
592 LOG(astring("could not unpack requester's filename includes"));
597 // look up to see if this is about something that has already been seen.
598 // we don't want to add a new transfer record if they're already working on
599 // this. that also lets them do a new tree compare to restart the transfer.
600 file_transfer_record *the_rec = _transfers->find(item_id._entity,
601 req._src_root, req._dest_root);
603 // there was no existing record; we'll create a new one.
604 the_rec = new file_transfer_record;
605 the_rec->_ent = item_id._entity;
606 the_rec->_src_root = req._src_root;
607 the_rec->_dest_root = req._dest_root;
608 _transfers->append(the_rec);
610 // record some activity on this record.
611 the_rec->_done = false;
612 the_rec->_last_active.reset();
// NOTE(review): a reused record may already own a difference list; confirm
// the old list is released before this assignment.
615 the_rec->_diffs = new filename_list;
// build the comparison flags from the transfer mode.  names are always
// compared; size, timestamp and checksum are added per the mode bits.
617 int how_comp = file_info::EQUAL_NAME; // the prize for doing nothing.
618 if (_mode & COMPARE_SIZE_AND_TIME)
619 how_comp |= file_info::EQUAL_FILESIZE | file_info::EQUAL_TIMESTAMP;
620 if (_mode & COMPARE_CONTENT_SAMPLE)
621 how_comp |= file_info::EQUAL_CHECKSUM;
623 // compare the two trees of files.
// the local tree is entered at the sub-path under the mapping, the
// requester's tree at its root; results land in the record's list.
624 directory_tree::compare_trees(*mapping_record->_local_dir,
625 source_start.raw(), *dest_tree, astring::empty_string(),
626 *the_rec->_diffs, (file_info::file_similarity)how_comp);
628 //LOG(astring("filenames decided as different:\n") + the_rec->_diffs->text_form());
630 // now prune the diffs to accord with what they claim they want.
// an empty request list means no pruning at all.
631 if (requested_names.length()) {
// backwards walk so that zapping entry i leaves earlier indices valid.
632 for (int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
633 filename diff_curr = *the_rec->_diffs->get(i);
635 for (int j = 0; j < requested_names.length(); j++) {
636 filename req_curr(requested_names[j]);
// each requested name is tested against the diff via compare_suffix.
637 if (req_curr.compare_suffix(diff_curr)) {
642 if (!found) the_rec->_diffs->zap(i, i);
646 req._packed_data.reset(); // clear out existing stuff before cloning.
// NOTE(review): no null test on the dynamic_cast result is visible here.
647 file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
648 the_rec->_diffs->pack(reply->_packed_data);
650 //hmmm: does the other side really need the list of filenames? i guess we
651 // could check validity of what's transferred or check space available
652 // before the client starts the transfer.
654 reply->_request = false; // it's a response now.
655 store_product(reply, item_id);
656 // send back the comparison list.
// handles the reply to a TREE_COMPARISON request.  the transfer record is
// looked up, with NOT_FOUND returned when it is absent.  the list of
// differences is then unpacked from the response and stored on the record.
661 outcome file_transfer_tentacle::handle_tree_compare_response
662 (file_transfer_infoton &resp, const octopus_request_id &item_id)
664 FUNCDEF("handle_tree_compare_response");
665 file_transfer_record *the_rec = _transfers->find(item_id._entity,
666 resp._src_root, resp._dest_root);
668 LOG(astring("could not find the record for this transfer: item=")
669 + item_id.text_form() + " src=" + resp._src_root + " dest="
671 return NOT_FOUND; // not registered, so reject it.
674 the_rec->_last_active.reset(); // record some activity on this record.
// NOTE(review): flist is heap allocated; confirm the unpack failure path
// frees it.
676 filename_list *flist = new filename_list;
677 if (!flist->unpack(resp._packed_data)) {
682 //hmmm: verify space on device?
// NOTE(review): confirm any list the record already held is released before
// it is replaced here.
684 the_rec->_diffs = flist; // set the list of differences.
// services a PLACE_FILE_CHUNKS request.  in ONLY_REPORT_DIFFS mode an
// unhandled_request product is stored instead.  otherwise the next batch of
// file data for the transfer is buffered into a reply, limited by
// _maximum_transfer.  an empty batch turns the reply into a
// CONCLUDE_TRANSFER_MARKER and marks the record done.
688 outcome file_transfer_tentacle::handle_storage_request
689 (file_transfer_infoton &req, const octopus_request_id &item_id)
691 FUNCDEF("handle_storage_request");
692 if (_mode & ONLY_REPORT_DIFFS) {
693 // store an unhandled infoton.
694 unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
695 store_product(deny, item_id);
699 // look up the transfer record.
700 file_transfer_record *the_rec = _transfers->find(item_id._entity,
701 req._src_root, req._dest_root);
703 LOG(astring("could not find the record for this transfer: item=")
704 + item_id.text_form() + " src=" + req._src_root + " dest="
706 return NOT_FOUND; // not registered, so reject it.
709 the_rec->_last_active.reset(); // mark it as still active.
// NOTE(review): resp is created before the _diffs test just below.  if
// clone() returns a heap object owned by the caller, the early BAD_INPUT
// return leaks it--consider testing _diffs first.
711 file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
713 if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
// the source root is translated through the correspondences list, and
// _last_sent is passed along as the transfer's position.
715 outcome bufret = heavy_file_operations::buffer_files
716 (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
717 the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
718 if (bufret == heavy_file_operations::FINISHED) {
719 bufret = OKAY; // in either case, we don't emit a finished outcome; handled elsewhere.
720 if (!resp->_packed_data.length()) {
721 // blank packages, so finish by setting command to be a conclude marker.
722 the_rec->_done = true;
723 resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
725 } else if (bufret != OKAY) {
726 // complain, but still send.
727 LOG(astring("buffer files returned an error on item=")
728 + item_id.text_form() + " src=" + req._src_root + " dest="
732 //can remove this block if stops saying it.
// safety net: an OKAY result with no data is also treated as the end.
733 if ((bufret == OKAY) && !resp->_packed_data.length() ) {
734 LOG("marking empty transfer as done; why not caught above at FINISHED check?");
735 the_rec->_done = true;
736 resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
740 resp->_request = false; // it's a response now.
741 store_product(resp, item_id);
// handles a PLACE_FILE_CHUNKS response.  the packed data is consumed as a
// series of chunks, each a file_transfer_header followed by that header's
// _length bytes, and each chunk is written under the response's destination
// root.  an empty response marks the transfer as done.
745 outcome file_transfer_tentacle::handle_storage_response
746 (file_transfer_infoton &resp, const octopus_request_id &item_id)
748 FUNCDEF("handle_storage_response");
749 if (_mode & ONLY_REPORT_DIFFS) {
754 // look up the transfer record.
755 file_transfer_record *the_rec = _transfers->find(item_id._entity,
756 resp._src_root, resp._dest_root);
757 if (!the_rec) return NOT_FOUND; // not registered, so reject it.
759 the_rec->_last_active.reset(); // mark it as still active.
761 if (!resp._packed_data.length()) {
762 // mark that we're done now.
763 the_rec->_done = true;
766 // chew on all the things they sent us.
767 while (resp._packed_data.length()) {
769 file_transfer_header found(empty);
// a header that fails to unpack is logged as corruption.
770 if (!found.unpack(resp._packed_data)) {
772 LOG(astring("corruption seen on item=") + item_id.text_form()
773 + " src=" + resp._src_root + " dest=" + resp._dest_root);
// remember the most recent header seen; status() computes progress from it.
776 the_rec->_last_sent = found;
778 if (found._length > resp._packed_data.length()) {
779 // another case for leaving--not enough data left in the buffer.
780 LOG(astring("data underflow seen on item=") + item_id.text_form()
781 + " src=" + resp._src_root + " dest=" + resp._dest_root);
// copy this chunk's bytes out of the buffer, then remove them from it.
// NOTE(review): confirm a header with _length of zero cannot reach here,
// since both calls are given an end index of _length - 1.
784 byte_array to_write = resp._packed_data.subarray(0, found._length - 1);
785 resp._packed_data.zap(0, found._length - 1);
787 if (!the_rec->_diffs) return BAD_INPUT;
// only files that appear in the agreed difference list are written.
789 const file_info *recorded_info = the_rec->_diffs->find(found._filename);
790 if (!recorded_info) {
791 LOG(astring("unrequested file seen: ") + found._filename);
792 continue; // maybe there are others that aren't confused.
// the target is the destination root joined with the entry's secondary().
795 astring full_file = resp._dest_root + filename::default_separator()
796 + recorded_info->secondary();
797 // LOG(astring("telling it to write to fullfile: ") + full_file);
799 outcome ret = heavy_file_operations::write_file_chunk(full_file,
800 found._byte_start, to_write);
802 LOG(astring("failed to write file chunk: error=")
803 + heavy_file_operations::outcome_name(ret) + " file=" + full_file
804 + a_sprintf(" start=%d len=%d", found._byte_start, found._length));
// presumably stamps the written file with the time carried in the
// header--TODO confirm against file_time::set_time.
806 found._time.set_time(full_file);
809 // there is no response product to store.
// services a CONCLUDE_TRANSFER_MARKER request.  in ONLY_REPORT_DIFFS mode an
// unhandled_request product is stored instead.  otherwise the transfer
// record is marked done and a copy of the request is stored as the response
// product.
813 outcome file_transfer_tentacle::conclude_storage_request
814 (file_transfer_infoton &req, const octopus_request_id &item_id)
816 FUNCDEF("conclude_storage_request");
817 if (_mode & ONLY_REPORT_DIFFS) {
818 // store an unhandled infoton.
819 unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
820 store_product(deny, item_id);
824 // look up the transfer record.
825 file_transfer_record *the_rec = _transfers->find(item_id._entity,
826 req._src_root, req._dest_root);
828 LOG(astring("could not find the record for this transfer: item=")
829 + item_id.text_form() + " src=" + req._src_root + " dest="
831 return NOT_FOUND; // not registered, so reject it.
834 the_rec->_last_active.reset(); // mark it as still active.
// NOTE(review): resp is created before the _diffs test just below.  if
// clone() returns a heap object owned by the caller, the early BAD_INPUT
// return leaks it--consider testing _diffs first.
836 file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
838 if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
840 the_rec->_done = true; // we're concluding the transfer, so that's that.
841 resp->_request = false; // it's a response now.
842 store_product(resp, item_id);
844 LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
// handles a CONCLUDE_TRANSFER_MARKER response.  the transfer record is
// looked up, with NOT_FOUND returned when it is absent.  otherwise its
// activity time is refreshed and it is marked done.  no product is stored.
850 outcome file_transfer_tentacle::conclude_storage_response
851 (file_transfer_infoton &resp, const octopus_request_id &item_id)
853 FUNCDEF("conclude_storage_response");
854 if (_mode & ONLY_REPORT_DIFFS) {
859 // look up the transfer record.
860 file_transfer_record *the_rec = _transfers->find(item_id._entity,
861 resp._src_root, resp._dest_root);
862 if (!the_rec) return NOT_FOUND; // not registered, so reject it.
864 the_rec->_last_active.reset(); // mark it as still active.
866 // mark that we're done now.
867 the_rec->_done = true;
869 LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
872 // there is no response product to store.
876 // consume() is the only method that is allowed to invoke the "handle_X" methods
877 // and it must lock the object beforehand.
// entry point for incoming infotons.  anything that is not a
// file_transfer_infoton is refused with DISALLOWED.  otherwise the lock is
// taken and the infoton is dispatched on its _command, with _request
// selecting the request or the response handler.  an unrecognized command
// yields BAD_INPUT.  "transformed" is not referenced by the visible lines.
879 outcome file_transfer_tentacle::consume(infoton &to_chow,
880 const octopus_request_id &item_id, byte_array &transformed)
884 file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
885 if (!inf) return DISALLOWED; // not for us.
887 AUTO_LOCK; // protect our lists while we're working on them.
889 switch (inf->_command) {
890 case file_transfer_infoton::BUILD_TARGET_TREE: {
891 if (inf->_request) return handle_build_target_tree_request(*inf, item_id);
892 else return handle_build_target_tree_response(*inf, item_id);
894 case file_transfer_infoton::TREE_COMPARISON: {
895 if (inf->_request) return handle_tree_compare_request(*inf, item_id);
896 else return handle_tree_compare_response(*inf, item_id);
898 case file_transfer_infoton::PLACE_FILE_CHUNKS: {
899 if (inf->_request) return handle_storage_request(*inf, item_id);
900 else return handle_storage_response(*inf, item_id);
902 case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: {
903 if (inf->_request) return conclude_storage_request(*inf, item_id);
904 else return conclude_storage_response(*inf, item_id);
907 return BAD_INPUT; // not a recognized command.
// rebuilds the directory tree for the correspondence named "source_mapping"
// immediately, using the same rescan steps as periodic_actions() but without
// waiting for the refresh interval.  the end of this function, including its
// return values, lies beyond the visible excerpt.
910 outcome file_transfer_tentacle::refresh_now(const astring &source_mapping)
912 FUNCDEF("refresh_now");
914 for (int i = 0; i < _correspondences->elements(); i++) {
915 file_transfer_record *curr = _correspondences->borrow(i);
// skip records registered under other mapping names.
917 if (curr->_source_mapping != source_mapping) continue;
918 if (curr->_local_dir) {
919 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
920 LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
921 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
// throw away the old tree and scan the source root again from scratch.
923 WHACK(curr->_local_dir);
924 curr->_local_dir = new directory_tree(curr->_src_root);
925 curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
926 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
927 LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
928 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
931 curr->_last_active.reset(); // reset our action time.