X-Git-Url: https://feistymeow.org/gitweb/?a=blobdiff_plain;f=octopi%2Flibrary%2Ftentacles%2Ffile_transfer_tentacle.cpp;h=50cc5114b5ea97b1499d48e30d10417d868d6434;hb=94a850991b040a4b8000a3fa1a2906e7c7f4f612;hp=fa082b990073f94a8feb26b341329929e41b0695;hpb=2952ccf47b80174880141a7ecfa122089f349b8d;p=feisty_meow.git diff --git a/octopi/library/tentacles/file_transfer_tentacle.cpp b/octopi/library/tentacles/file_transfer_tentacle.cpp index fa082b99..50cc5114 100644 --- a/octopi/library/tentacles/file_transfer_tentacle.cpp +++ b/octopi/library/tentacles/file_transfer_tentacle.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -46,7 +47,7 @@ const int FTT_CLEANING_INTERVAL = 30 * SECOND_ms; const int TRANSFER_TIMEOUT = 10 * MINUTE_ms; // if it hasn't been touched in this long, it's out of there. -//#define DEBUG_FILE_TRANSFER_TENTACLE +#define DEBUG_FILE_TRANSFER_TENTACLE // uncomment for noisier version. #undef LOG @@ -136,9 +137,10 @@ public: // turns a source mapping into the location that it corresponds to. astring translate(const astring &source_path) const { -// FUNCDEF("translate"); + FUNCDEF("translate"); string_array pieces; - filename(source_path).separate(pieces); + bool rooted; + filename(source_path).separate(rooted, pieces); astring source_mapping = pieces[0]; pieces.zap(0, 0); // remove source part. @@ -253,9 +255,7 @@ outcome file_transfer_tentacle::add_correspondence (const astring &source_mapping, const astring &source_root, int refresh_interval) { -#ifdef DEBUG_FILE_TRANSFER_TENTACLE FUNCDEF("add_correspondence"); -#endif AUTO_LOCK; remove_correspondence(source_mapping); // clean the old one out first. @@ -275,14 +275,14 @@ outcome file_transfer_tentacle::add_correspondence } #ifdef DEBUG_FILE_TRANSFER_TENTACLE LOG(astring("adding tree for: ent=") + new_record->_ent.text_form() - + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root); + + " src=" + new_record->_src_root); #endif // calculate size and checksum info for the directory. new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) ); #ifdef DEBUG_FILE_TRANSFER_TENTACLE LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form() - + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root); + + " src=" + new_record->_src_root); #endif _correspondences->append(new_record); @@ -302,7 +302,7 @@ outcome file_transfer_tentacle::remove_correspondence bool file_transfer_tentacle::get_differences(const octopus_entity &ent, const astring &src, const astring &dest, filename_list &diffs) { -// FUNCDEF("get_differences"); + FUNCDEF("get_differences"); diffs.reset(); AUTO_LOCK; file_transfer_record *the_rec = _transfers->find(ent, src, dest); @@ -317,7 +317,7 @@ bool file_transfer_tentacle::status(const octopus_entity &ent, int &total_files, double ¤t_size, int ¤t_files, bool &done, time_stamp &last_active) { -// FUNCDEF("status"); + FUNCDEF("status"); total_size = 0; total_files = 0; current_files = 0; @@ -343,7 +343,7 @@ outcome file_transfer_tentacle::register_file_transfer (const octopus_entity &ent, const astring &src_root, const astring &dest_root, const string_array &includes) { -// FUNCDEF("register_file_transfer"); + FUNCDEF("register_file_transfer"); AUTO_LOCK; // make sure that this isn't an existing transfer. if so, we just update // the status. @@ -408,9 +408,7 @@ bool file_transfer_tentacle::remove_path(const astring &key, void file_transfer_tentacle::periodic_actions() { -#ifdef DEBUG_FILE_TRANSFER_TENTACLE FUNCDEF("periodic_actions"); -#endif AUTO_LOCK; // first, we'll clean out old transfers. @@ -459,7 +457,7 @@ outcome file_transfer_tentacle::reconstitute(const string_array &classifier, (file_transfer_infoton *)NIL); } -// the "handle_" methods are thread-safe because the mutex is locked before +// the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before // their invocations. outcome file_transfer_tentacle::handle_tree_compare_request @@ -470,13 +468,14 @@ outcome file_transfer_tentacle::handle_tree_compare_request // get the mapping from the specified location on this side. filename splitting(req._src_root); string_array pieces; - splitting.separate(pieces); + bool rooted; + splitting.separate(rooted, pieces); astring source_mapping = pieces[0]; // patch the name up to find the sub_path for the source. filename source_start; pieces.zap(0, 0); - source_start.join(pieces); + source_start.join(rooted, pieces); // locate the allowed transfer depot for the mapping they provided. file_transfer_record *mapping_record @@ -560,9 +559,13 @@ outcome file_transfer_tentacle::handle_tree_compare_request // before the client starts the transfer. reply->_request = false; // it's a response now. +LOG("storing product from transfer processing"); store_product(reply, item_id); // send back the comparison list. +LOG("now showing bin before return:"); +LOG(get_storage()->text_form()); + return OKAY; } @@ -599,8 +602,7 @@ outcome file_transfer_tentacle::handle_storage_request FUNCDEF("handle_storage_request"); if (_mode & ONLY_REPORT_DIFFS) { // store an unhandled infoton. - unhandled_request *deny = new unhandled_request(item_id, req.classifier(), - NO_HANDLER); + unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER); store_product(deny, item_id); return NO_HANDLER; } @@ -624,19 +626,27 @@ outcome file_transfer_tentacle::handle_storage_request outcome bufret = heavy_file_operations::buffer_files (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs, the_rec->_last_sent, resp->_packed_data, _maximum_transfer); - if (bufret != OKAY) { + if (bufret == heavy_file_operations::FINISHED) { + bufret = OKAY; // in either case, we don't emit a finished outcome; handled elsewhere. + if (!resp->_packed_data.length()) { + // blank packages, so finish by setting command to be a conclude marker. + the_rec->_done = true; + resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER; + } + } else if (bufret != OKAY) { // complain, but still send. LOG(astring("buffer files returned an error on item=") + item_id.text_form() + " src=" + req._src_root + " dest=" + req._dest_root); } - if ( (bufret == OKAY) && !resp->_packed_data.length() ) { - // seems like the transfer is done. - +//can remove this block if stops saying it. + if ((bufret == OKAY) && !resp->_packed_data.length() ) { + LOG("marking empty transfer as done; why not caught above at FINISHED check?"); the_rec->_done = true; -//hmmm: mark the record and time out faster? + resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER; } +//end of can remove. resp->_request = false; // it's a response now. store_product(resp, item_id); @@ -695,6 +705,7 @@ outcome file_transfer_tentacle::handle_storage_response astring full_file = resp._dest_root + filename::default_separator() + recorded_info->secondary(); +LOG(astring("telling it to write to fullfile: ") + full_file); outcome ret = heavy_file_operations::write_file_chunk(full_file, found._byte_start, to_write); @@ -710,13 +721,76 @@ outcome file_transfer_tentacle::handle_storage_response return OKAY; } -// this is the only method that is allowed to invoke the "handle_X" methods +outcome file_transfer_tentacle::conclude_storage_request + (file_transfer_infoton &req, const octopus_request_id &item_id) +{ + FUNCDEF("conclude_storage_request"); + if (_mode & ONLY_REPORT_DIFFS) { + // store an unhandled infoton. + unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER); + store_product(deny, item_id); + return NO_HANDLER; + } + + // look up the transfer record. + file_transfer_record *the_rec = _transfers->find(item_id._entity, + req._src_root, req._dest_root); + if (!the_rec) { + LOG(astring("could not find the record for this transfer: item=") + + item_id.text_form() + " src=" + req._src_root + " dest=" + + req._dest_root); + return NOT_FOUND; // not registered, so reject it. + } + + the_rec->_last_active.reset(); // mark it as still active. + + file_transfer_infoton *resp = dynamic_cast(req.clone()); + + if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object. + + the_rec->_done = true; // we're concluding the transfer, so that's that. + resp->_request = false; // it's a response now. + store_product(resp, item_id); + + LOG(astring("concluding transfer request on src=") + req._src_root + " dest=" + + req._dest_root); + + return common::OKAY; +} + +outcome file_transfer_tentacle::conclude_storage_response + (file_transfer_infoton &resp, const octopus_request_id &item_id) +{ + FUNCDEF("conclude_storage_response"); + if (_mode & ONLY_REPORT_DIFFS) { + // not spoken here. + return NO_HANDLER; + } + + // look up the transfer record. + file_transfer_record *the_rec = _transfers->find(item_id._entity, + resp._src_root, resp._dest_root); + if (!the_rec) return NOT_FOUND; // not registered, so reject it. + + the_rec->_last_active.reset(); // mark it as still active. + + // mark that we're done now. + the_rec->_done = true; + + LOG(astring("concluding transfer response on src=") + resp._src_root + " dest=" + + resp._dest_root); + + // there is no response product to store. + return OKAY; +} + +// consume() is the only method that is allowed to invoke the "handle_X" methods // and it must lock the object beforehand. outcome file_transfer_tentacle::consume(infoton &to_chow, const octopus_request_id &item_id, byte_array &transformed) { -// FUNCDEF("consume"); + FUNCDEF("consume"); transformed.reset(); file_transfer_infoton *inf = dynamic_cast(&to_chow); if (!inf) return DISALLOWED; // not for us. @@ -732,15 +806,17 @@ outcome file_transfer_tentacle::consume(infoton &to_chow, if (inf->_request) return handle_storage_request(*inf, item_id); else return handle_storage_response(*inf, item_id); } + case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: { + if (inf->_request) return conclude_storage_request(*inf, item_id); + else return conclude_storage_response(*inf, item_id); + } } return BAD_INPUT; // not a recognized command. } outcome file_transfer_tentacle::refresh_now(const astring &source_mapping) { -#ifdef DEBUG_FILE_TRANSFER_TENTACLE FUNCDEF("refresh_now"); -#endif AUTO_LOCK; for (int i = 0; i < _correspondences->elements(); i++) { file_transfer_record *curr = _correspondences->borrow(i);