From: Chris Koeritz Date: Sun, 16 Sep 2012 21:02:05 +0000 (-0400) Subject: updated to change how final part of transfer is tagged. X-Git-Tag: 2.140.90~1233 X-Git-Url: https://feistymeow.org/gitweb/?p=feisty_meow.git;a=commitdiff_plain;h=8e98ed433de0151e4356d580d2e5db6edd3e2c01 updated to change how final part of transfer is tagged. --- diff --git a/nucleus/library/filesystem/heavy_file_ops.cpp b/nucleus/library/filesystem/heavy_file_ops.cpp index 1e63ba31..e8f48ebd 100644 --- a/nucleus/library/filesystem/heavy_file_ops.cpp +++ b/nucleus/library/filesystem/heavy_file_ops.cpp @@ -235,14 +235,12 @@ outcome heavy_file_operations::buffer_files(const astring &source_root, const filename_list &to_transfer, file_transfer_header &last_action, byte_array &storage, int maximum_bytes) { -#ifdef DEBUG_HEAVY_FILE_OPS -// FUNCDEF("buffer_files"); -#endif + FUNCDEF("buffer_files"); storage.reset(); // clear out the current contents. if (!to_transfer.elements()) { // we seem to be done. - return OKAY; + return FINISHED; } outcome to_return = OKAY; diff --git a/nucleus/library/filesystem/heavy_file_ops.h b/nucleus/library/filesystem/heavy_file_ops.h index add49683..8cfdc42f 100644 --- a/nucleus/library/filesystem/heavy_file_ops.h +++ b/nucleus/library/filesystem/heavy_file_ops.h @@ -63,10 +63,7 @@ public: enum outcomes { OKAY = basis::common::OKAY, BAD_INPUT = basis::common::BAD_INPUT, -// GARBAGE = basis::common::GARBAGE, -// NOT_FOUND = basis::common::NOT_FOUND, -// NONE_READY = basis::common::NONE_READY, -// FAILURE = basis::common::FAILURE, + FINISHED = basis::common::IS_EMPTY, // nothing left to pack. DEFINE_OUTCOME(SOURCE_MISSING, -43, "The source file is not accessible"), DEFINE_OUTCOME(TARGET_DIR_ERROR, -44, "The target's directory could not " "be created"), diff --git a/octopi/library/tentacles/file_transfer_infoton.h b/octopi/library/tentacles/file_transfer_infoton.h index 5077d1ed..2ea10e2a 100644 --- a/octopi/library/tentacles/file_transfer_infoton.h +++ b/octopi/library/tentacles/file_transfer_infoton.h @@ -38,7 +38,7 @@ public: is the packed filename list that represents the differences between the two hierarchies. this is considered to start a file transfer based on those differences. */ - PLACE_FILE_CHUNKS + PLACE_FILE_CHUNKS, //!< the destination side requests a new set of chunks. /*!< this is based on the source's memory of where the transfer is at. this will only perform properly when the file transfer was requested to @@ -47,6 +47,11 @@ public: number of pairs of @code [ file_transfer_header + file chunk described in header ] @endcode */ + CONCLUDE_TRANSFER_MARKER + //!< this infoton marks the end of the transfer process. + /*!< we've added this type of transfer infoton to handle the finish + of the transfer. previously this was marked by a null data packet, + which turns out to be a really bad idea. */ }; basis::outcome _success; //!< reports what kind of result occurred. diff --git a/octopi/library/tentacles/file_transfer_tentacle.cpp b/octopi/library/tentacles/file_transfer_tentacle.cpp index fa082b99..bb9711e8 100644 --- a/octopi/library/tentacles/file_transfer_tentacle.cpp +++ b/octopi/library/tentacles/file_transfer_tentacle.cpp @@ -459,7 +459,7 @@ outcome file_transfer_tentacle::reconstitute(const string_array &classifier, (file_transfer_infoton *)NIL); } -// the "handle_" methods are thread-safe because the mutex is locked before +// the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before // their invocations. outcome file_transfer_tentacle::handle_tree_compare_request @@ -599,8 +599,7 @@ outcome file_transfer_tentacle::handle_storage_request FUNCDEF("handle_storage_request"); if (_mode & ONLY_REPORT_DIFFS) { // store an unhandled infoton. - unhandled_request *deny = new unhandled_request(item_id, req.classifier(), - NO_HANDLER); + unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER); store_product(deny, item_id); return NO_HANDLER; } @@ -624,7 +623,11 @@ outcome file_transfer_tentacle::handle_storage_request outcome bufret = heavy_file_operations::buffer_files (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs, the_rec->_last_sent, resp->_packed_data, _maximum_transfer); - if (bufret != OKAY) { + if (bufret == heavy_file_operations::FINISHED) { +//here we go. finish by setting command to conclude. +LOG("got the final marker saying heavy file ops done!"); + resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER; + } else if (bufret != OKAY) { // complain, but still send. LOG(astring("buffer files returned an error on item=") + item_id.text_form() + " src=" + req._src_root + " dest=" @@ -633,9 +636,8 @@ outcome file_transfer_tentacle::handle_storage_request if ( (bufret == OKAY) && !resp->_packed_data.length() ) { // seems like the transfer is done. - the_rec->_done = true; -//hmmm: mark the record and time out faster? + resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER; } resp->_request = false; // it's a response now. @@ -710,13 +712,69 @@ outcome file_transfer_tentacle::handle_storage_response return OKAY; } -// this is the only method that is allowed to invoke the "handle_X" methods +outcome file_transfer_tentacle::conclude_storage_request + (file_transfer_infoton &req, const octopus_request_id &item_id) +{ + FUNCDEF("handle_storage_request"); + if (_mode & ONLY_REPORT_DIFFS) { + // store an unhandled infoton. + unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER); + store_product(deny, item_id); + return NO_HANDLER; + } + + // look up the transfer record. + file_transfer_record *the_rec = _transfers->find(item_id._entity, + req._src_root, req._dest_root); + if (!the_rec) { + LOG(astring("could not find the record for this transfer: item=") + + item_id.text_form() + " src=" + req._src_root + " dest=" + + req._dest_root); + return NOT_FOUND; // not registered, so reject it. + } + + the_rec->_last_active.reset(); // mark it as still active. + + file_transfer_infoton *resp = dynamic_cast(req.clone()); + + if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object. + + the_rec->_done = true; // we're concluding the transfer, so that's that. + resp->_request = false; // it's a response now. + store_product(resp, item_id); + return common::OKAY; +} + +outcome file_transfer_tentacle::conclude_storage_response + (file_transfer_infoton &resp, const octopus_request_id &item_id) +{ + FUNCDEF("handle_storage_response"); + if (_mode & ONLY_REPORT_DIFFS) { + // not spoken here. + return NO_HANDLER; + } + + // look up the transfer record. + file_transfer_record *the_rec = _transfers->find(item_id._entity, + resp._src_root, resp._dest_root); + if (!the_rec) return NOT_FOUND; // not registered, so reject it. + + the_rec->_last_active.reset(); // mark it as still active. + + // mark that we're done now. + the_rec->_done = true; + + // there is no response product to store. + return OKAY; +} + +// consume() is the only method that is allowed to invoke the "handle_X" methods // and it must lock the object beforehand. outcome file_transfer_tentacle::consume(infoton &to_chow, const octopus_request_id &item_id, byte_array &transformed) { -// FUNCDEF("consume"); + FUNCDEF("consume"); transformed.reset(); file_transfer_infoton *inf = dynamic_cast(&to_chow); if (!inf) return DISALLOWED; // not for us. @@ -732,6 +790,10 @@ outcome file_transfer_tentacle::consume(infoton &to_chow, if (inf->_request) return handle_storage_request(*inf, item_id); else return handle_storage_response(*inf, item_id); } + case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: { + if (inf->_request) return conclude_storage_request(*inf, item_id); + else return conclude_storage_response(*inf, item_id); + } } return BAD_INPUT; // not a recognized command. } diff --git a/octopi/library/tentacles/file_transfer_tentacle.h b/octopi/library/tentacles/file_transfer_tentacle.h index f74b4999..fc1edb48 100644 --- a/octopi/library/tentacles/file_transfer_tentacle.h +++ b/octopi/library/tentacles/file_transfer_tentacle.h @@ -175,6 +175,10 @@ private: const octopus_request_id &item_id); basis::outcome handle_storage_response(file_transfer_infoton &req, const octopus_request_id &item_id); + basis::outcome conclude_storage_request(file_transfer_infoton &req, + const octopus_request_id &item_id); + basis::outcome conclude_storage_response(file_transfer_infoton &req, + const octopus_request_id &item_id); }; } //namespace. diff --git a/octopi/library/tentacles/recursive_file_copy.cpp b/octopi/library/tentacles/recursive_file_copy.cpp index 657bc80c..e2461ca8 100644 --- a/octopi/library/tentacles/recursive_file_copy.cpp +++ b/octopi/library/tentacles/recursive_file_copy.cpp @@ -39,6 +39,9 @@ using namespace textual; namespace octopi { +//#define DEBUG_RECURSIVE_FILE_COPY + // uncomment for noisier debugging. + #define FAKE_HOSTNAME "internal_fake_host" #undef LOG @@ -141,7 +144,9 @@ outcome recursive_file_copy::copy_hierarchy(int transfer_mode, int iter = 0; while (true) { -LOG(a_sprintf("ongoing chunk %d", ++iter)); +#ifdef DEBUG_RECURSIVE_FILE_COPY + LOG(a_sprintf("ongoing chunk %d", ++iter)); +#endif // keep going until we find a broken reply. file_transfer_infoton *ongoing = new file_transfer_infoton; @@ -160,25 +165,38 @@ LOG(a_sprintf("ongoing chunk %d", ++iter)); if (!reply) RETURN_ERROR_RFC("failed to get ongoing transfer reply", NONE_READY); - if (!reply->_packed_data.length()) { + if (reply->_command == file_transfer_infoton::CONCLUDE_TRANSFER_MARKER) { BASE_LOG(astring("finished transfer from \"") + source_dir + "\" to \"" + target_dir + "\""); break; } + if (!reply->_packed_data.length()) { + RETURN_ERROR_RFC("file transfer had no packed data", GARBAGE); + } + byte_array copy = reply->_packed_data; while (copy.length()) { +#ifdef DEBUG_RECURSIVE_FILE_COPY + LOG(a_sprintf("starging size in array: %d", copy.length())); +#endif file_time empty; file_transfer_header head(empty); if (!head.unpack(copy)) RETURN_ERROR_RFC("failed to unpack header", GARBAGE); -//LOG(a_sprintf("size in array: %d", copy.length())); +#ifdef DEBUG_RECURSIVE_FILE_COPY + LOG(a_sprintf("removed head size in array: %d", copy.length())); +#endif if (copy.length() < head._length) RETURN_ERROR_RFC("not enough length in array", GARBAGE); //hmmm: are we doing nothing here besides validating that we GOT something in the header? copy.zap(0, head._length - 1); -//LOG(a_sprintf("size in array now: %d", copy.length())); +#ifdef DEBUG_RECURSIVE_FILE_COPY + LOG(a_sprintf("size in array now: %d", copy.length())); +#endif +//hmmm: this needs better formatting, and should not repeat the same file name even +// if it's in multiple chunks. //hmmm: if logging, then... BASE_LOG(head.readable_text_form()); }