(file_transfer_infoton *)NIL);
}
-// the "handle_" methods are thread-safe because the mutex is locked before
+// the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
// their invocations.
outcome file_transfer_tentacle::handle_tree_compare_request
FUNCDEF("handle_storage_request");
if (_mode & ONLY_REPORT_DIFFS) {
// store an unhandled infoton.
- unhandled_request *deny = new unhandled_request(item_id, req.classifier(),
- NO_HANDLER);
+ unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
store_product(deny, item_id);
return NO_HANDLER;
}
outcome bufret = heavy_file_operations::buffer_files
(_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
- if (bufret != OKAY) {
+ if (bufret == heavy_file_operations::FINISHED) {
+//here we go. finish by setting command to conclude.
+LOG("got the final marker saying heavy file ops done!");
+ resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
+ } else if (bufret != OKAY) {
// complain, but still send.
LOG(astring("buffer files returned an error on item=")
+ item_id.text_form() + " src=" + req._src_root + " dest="
if ( (bufret == OKAY) && !resp->_packed_data.length() ) {
// seems like the transfer is done.
-
the_rec->_done = true;
-//hmmm: mark the record and time out faster?
+ resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
}
resp->_request = false; // it's a response now.
return OKAY;
}
-// this is the only method that is allowed to invoke the "handle_X" methods
+outcome file_transfer_tentacle::conclude_storage_request
+ (file_transfer_infoton &req, const octopus_request_id &item_id)
+{
+ FUNCDEF("handle_storage_request");
+ if (_mode & ONLY_REPORT_DIFFS) {
+ // store an unhandled infoton.
+ unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
+ store_product(deny, item_id);
+ return NO_HANDLER;
+ }
+
+ // look up the transfer record.
+ file_transfer_record *the_rec = _transfers->find(item_id._entity,
+ req._src_root, req._dest_root);
+ if (!the_rec) {
+ LOG(astring("could not find the record for this transfer: item=")
+ + item_id.text_form() + " src=" + req._src_root + " dest="
+ + req._dest_root);
+ return NOT_FOUND; // not registered, so reject it.
+ }
+
+ the_rec->_last_active.reset(); // mark it as still active.
+
+ file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
+
+ if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
+
+ the_rec->_done = true; // we're concluding the transfer, so that's that.
+ resp->_request = false; // it's a response now.
+ store_product(resp, item_id);
+ return common::OKAY;
+}
+
+outcome file_transfer_tentacle::conclude_storage_response
+ (file_transfer_infoton &resp, const octopus_request_id &item_id)
+{
+ FUNCDEF("handle_storage_response");
+ if (_mode & ONLY_REPORT_DIFFS) {
+ // not spoken here.
+ return NO_HANDLER;
+ }
+
+ // look up the transfer record.
+ file_transfer_record *the_rec = _transfers->find(item_id._entity,
+ resp._src_root, resp._dest_root);
+ if (!the_rec) return NOT_FOUND; // not registered, so reject it.
+
+ the_rec->_last_active.reset(); // mark it as still active.
+
+ // mark that we're done now.
+ the_rec->_done = true;
+
+ // there is no response product to store.
+ return OKAY;
+}
+
+// consume() is the only method that is allowed to invoke the "handle_X" methods
// and it must lock the object beforehand.
outcome file_transfer_tentacle::consume(infoton &to_chow,
const octopus_request_id &item_id, byte_array &transformed)
{
-// FUNCDEF("consume");
+ FUNCDEF("consume");
transformed.reset();
file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
if (!inf) return DISALLOWED; // not for us.
if (inf->_request) return handle_storage_request(*inf, item_id);
else return handle_storage_response(*inf, item_id);
}
+ case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: {
+ if (inf->_request) return conclude_storage_request(*inf, item_id);
+ else return conclude_storage_response(*inf, item_id);
+ }
}
return BAD_INPUT; // not a recognized command.
}
namespace octopi {
+//#define DEBUG_RECURSIVE_FILE_COPY
+ // uncomment for noisier debugging.
+
#define FAKE_HOSTNAME "internal_fake_host"
#undef LOG
int iter = 0;
while (true) {
-LOG(a_sprintf("ongoing chunk %d", ++iter));
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+ LOG(a_sprintf("ongoing chunk %d", ++iter));
+#endif
// keep going until we find a broken reply.
file_transfer_infoton *ongoing = new file_transfer_infoton;
if (!reply)
RETURN_ERROR_RFC("failed to get ongoing transfer reply", NONE_READY);
- if (!reply->_packed_data.length()) {
+ if (reply->_command == file_transfer_infoton::CONCLUDE_TRANSFER_MARKER) {
BASE_LOG(astring("finished transfer from \"") + source_dir
+ "\" to \"" + target_dir + "\"");
break;
}
+ if (!reply->_packed_data.length()) {
+ RETURN_ERROR_RFC("file transfer had no packed data", GARBAGE);
+ }
+
byte_array copy = reply->_packed_data;
while (copy.length()) {
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+ LOG(a_sprintf("starging size in array: %d", copy.length()));
+#endif
file_time empty;
file_transfer_header head(empty);
if (!head.unpack(copy))
RETURN_ERROR_RFC("failed to unpack header", GARBAGE);
-//LOG(a_sprintf("size in array: %d", copy.length()));
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+ LOG(a_sprintf("removed head size in array: %d", copy.length()));
+#endif
if (copy.length() < head._length)
RETURN_ERROR_RFC("not enough length in array", GARBAGE);
//hmmm: are we doing nothing here besides validating that we GOT something in the header?
copy.zap(0, head._length - 1);
-//LOG(a_sprintf("size in array now: %d", copy.length()));
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+ LOG(a_sprintf("size in array now: %d", copy.length()));
+#endif
+//hmmm: this needs better formatting, and should not repeat the same file name even
+// if it's in multiple chunks.
//hmmm: if logging, then...
BASE_LOG(head.readable_text_form());
}