updated to change how final part of transfer is tagged.
authorChris Koeritz <fred@gruntose.com>
Sun, 16 Sep 2012 21:02:05 +0000 (17:02 -0400)
committerChris Koeritz <fred@gruntose.com>
Sun, 16 Sep 2012 21:02:05 +0000 (17:02 -0400)
nucleus/library/filesystem/heavy_file_ops.cpp
nucleus/library/filesystem/heavy_file_ops.h
octopi/library/tentacles/file_transfer_infoton.h
octopi/library/tentacles/file_transfer_tentacle.cpp
octopi/library/tentacles/file_transfer_tentacle.h
octopi/library/tentacles/recursive_file_copy.cpp

index 1e63ba318f846d5cb899c79f841a1fe31e7028cf..e8f48ebd5d20e585546ffe5a024250e27abd7499 100644 (file)
@@ -235,14 +235,12 @@ outcome heavy_file_operations::buffer_files(const astring &source_root,
     const filename_list &to_transfer, file_transfer_header &last_action,
     byte_array &storage, int maximum_bytes)
 {
-#ifdef DEBUG_HEAVY_FILE_OPS
-//  FUNCDEF("buffer_files");
-#endif
+  FUNCDEF("buffer_files");
   storage.reset();  // clear out the current contents.
 
   if (!to_transfer.elements()) {
     // we seem to be done.
-    return OKAY;
+    return FINISHED;
   }
 
   outcome to_return = OKAY;
index add4968375f4545bf6d259087eb346c2a2ac930e..8cfdc42f258cc627d866094dcb7b232020d122f5 100644 (file)
@@ -63,10 +63,7 @@ public:
   enum outcomes {
     OKAY = basis::common::OKAY,
     BAD_INPUT = basis::common::BAD_INPUT,
-//    GARBAGE = basis::common::GARBAGE,
-//    NOT_FOUND = basis::common::NOT_FOUND,
-//    NONE_READY = basis::common::NONE_READY,
-//    FAILURE = basis::common::FAILURE,
+    FINISHED = basis::common::IS_EMPTY,  // nothing left to pack.
     DEFINE_OUTCOME(SOURCE_MISSING, -43, "The source file is not accessible"),
     DEFINE_OUTCOME(TARGET_DIR_ERROR, -44, "The target's directory could not "
         "be created"),
index 5077d1ed2acd2c25d781960f1d42bc20127e1e40..2ea10e2a48af359ede1d65ff7016dadefccd1d33 100644 (file)
@@ -38,7 +38,7 @@ public:
       is the packed filename list that represents the differences between the
       two hierarchies.  this is considered to start a file transfer based on
       those differences. */
-    PLACE_FILE_CHUNKS
+    PLACE_FILE_CHUNKS,
       //!< the destination side requests a new set of chunks.
       /*!< this is based on the source's memory of where the transfer is at.
       this will only perform properly when the file transfer was requested to
@@ -47,6 +47,11 @@ public:
       number of pairs of @code
       [ file_transfer_header + file chunk described in header ]
       @endcode */
+    CONCLUDE_TRANSFER_MARKER
+      //!< this infoton marks the end of the transfer process.
+      /*!< we've added this type of transfer infoton to handle the finish
+      of the transfer.  previously this was marked by a null data packet,
+      which turns out to be a really bad idea. */
   };
 
   basis::outcome _success;  //!< reports what kind of result occurred.
index fa082b990073f94a8feb26b341329929e41b0695..bb9711e82e3c2886f3e65c20186fde53bae5ed42 100644 (file)
@@ -459,7 +459,7 @@ outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
       (file_transfer_infoton *)NIL);
 }
 
-// the "handle_" methods are thread-safe because the mutex is locked before
+// the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
 // their invocations.
 
 outcome file_transfer_tentacle::handle_tree_compare_request
@@ -599,8 +599,7 @@ outcome file_transfer_tentacle::handle_storage_request
   FUNCDEF("handle_storage_request");
   if (_mode & ONLY_REPORT_DIFFS) {
     // store an unhandled infoton.
-    unhandled_request *deny = new unhandled_request(item_id, req.classifier(),
-        NO_HANDLER);
+    unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
     store_product(deny, item_id);
     return NO_HANDLER;
   }
@@ -624,7 +623,11 @@ outcome file_transfer_tentacle::handle_storage_request
   outcome bufret = heavy_file_operations::buffer_files
       (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
       the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
-  if (bufret != OKAY) {
+  if (bufret == heavy_file_operations::FINISHED) {
+//here we go.  finish by setting command to conclude.
+LOG("got the final marker saying heavy file ops done!");
+    resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
+  } else if (bufret != OKAY) {
     // complain, but still send.
     LOG(astring("buffer files returned an error on item=")
         + item_id.text_form() + " src=" + req._src_root + " dest="
@@ -633,9 +636,8 @@ outcome file_transfer_tentacle::handle_storage_request
 
   if ( (bufret == OKAY) && !resp->_packed_data.length() ) {
     // seems like the transfer is done.
-
     the_rec->_done = true;
-//hmmm: mark the record and time out faster?
+    resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
   }
 
   resp->_request = false;  // it's a response now.
@@ -710,13 +712,69 @@ outcome file_transfer_tentacle::handle_storage_response
   return OKAY;
 }
 
-// this is the only method that is allowed to invoke the "handle_X" methods
+outcome file_transfer_tentacle::conclude_storage_request
+    (file_transfer_infoton &req, const octopus_request_id &item_id)
+{
+  FUNCDEF("handle_storage_request");
+  if (_mode & ONLY_REPORT_DIFFS) {
+    // store an unhandled infoton.
+    unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
+    store_product(deny, item_id);
+    return NO_HANDLER;
+  }
+
+  // look up the transfer record.
+  file_transfer_record *the_rec = _transfers->find(item_id._entity,
+      req._src_root, req._dest_root);
+  if (!the_rec) {
+    LOG(astring("could not find the record for this transfer: item=")
+        + item_id.text_form() + " src=" + req._src_root + " dest="
+        + req._dest_root);
+    return NOT_FOUND;  // not registered, so reject it.
+  }
+
+  the_rec->_last_active.reset();  // mark it as still active.
+
+  file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
+
+  if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
+
+  the_rec->_done = true;  // we're concluding the transfer, so that's that.
+  resp->_request = false;  // it's a response now.
+  store_product(resp, item_id);
+  return common::OKAY;
+}
+
+outcome file_transfer_tentacle::conclude_storage_response
+    (file_transfer_infoton &resp, const octopus_request_id &item_id)
+{
+  FUNCDEF("handle_storage_response");
+  if (_mode & ONLY_REPORT_DIFFS) {
+    // not spoken here.
+    return NO_HANDLER;
+  }
+
+  // look up the transfer record.
+  file_transfer_record *the_rec = _transfers->find(item_id._entity,
+      resp._src_root, resp._dest_root);
+  if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
+
+  the_rec->_last_active.reset();  // mark it as still active.
+
+  // mark that we're done now.
+  the_rec->_done = true;
+
+  // there is no response product to store.
+  return OKAY;
+}
+
+// consume() is the only method that is allowed to invoke the "handle_X" methods
 // and it must lock the object beforehand.
 
 outcome file_transfer_tentacle::consume(infoton &to_chow,
     const octopus_request_id &item_id, byte_array &transformed)
 {
-//  FUNCDEF("consume");
+  FUNCDEF("consume");
   transformed.reset();
   file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
   if (!inf) return DISALLOWED;  // not for us.
@@ -732,6 +790,10 @@ outcome file_transfer_tentacle::consume(infoton &to_chow,
       if (inf->_request) return handle_storage_request(*inf, item_id);
       else return handle_storage_response(*inf, item_id);
     }
+    case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: {
+      if (inf->_request) return conclude_storage_request(*inf, item_id);
+      else return conclude_storage_response(*inf, item_id);
+    }
   }
   return BAD_INPUT;  // not a recognized command.
 }
index f74b499967da0a720e08a4c9ea5bc760f5cc387a..fc1edb483377aa14d34ae34e733604d9db8b2cf1 100644 (file)
@@ -175,6 +175,10 @@ private:
           const octopus_request_id &item_id);
   basis::outcome handle_storage_response(file_transfer_infoton &req,
           const octopus_request_id &item_id);
+  basis::outcome conclude_storage_request(file_transfer_infoton &req,
+          const octopus_request_id &item_id);
+  basis::outcome conclude_storage_response(file_transfer_infoton &req,
+          const octopus_request_id &item_id);
 };
 
 } //namespace.
index 657bc80cd24744678d5d81180f61740680ccf64f..e2461ca8a8a9254bbdc8a893d6df8dae80a8b9de 100644 (file)
@@ -39,6 +39,9 @@ using namespace textual;
 
 namespace octopi {
 
+//#define DEBUG_RECURSIVE_FILE_COPY
+  // uncomment for noisier debugging.
+
 #define FAKE_HOSTNAME "internal_fake_host"
 
 #undef LOG
@@ -141,7 +144,9 @@ outcome recursive_file_copy::copy_hierarchy(int transfer_mode,
 
   int iter = 0;
   while (true) {
-LOG(a_sprintf("ongoing chunk %d", ++iter));
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+    LOG(a_sprintf("ongoing chunk %d", ++iter));
+#endif
 
     // keep going until we find a broken reply.
     file_transfer_infoton *ongoing = new file_transfer_infoton;
@@ -160,25 +165,38 @@ LOG(a_sprintf("ongoing chunk %d", ++iter));
     if (!reply)
       RETURN_ERROR_RFC("failed to get ongoing transfer reply", NONE_READY);
 
-    if (!reply->_packed_data.length()) {
+    if (reply->_command == file_transfer_infoton::CONCLUDE_TRANSFER_MARKER) {
       BASE_LOG(astring("finished transfer from \"") + source_dir
           + "\" to \"" + target_dir + "\"");
       break;
     }
 
+    if (!reply->_packed_data.length()) {
+      RETURN_ERROR_RFC("file transfer had no packed data", GARBAGE);
+    }
+
     byte_array copy = reply->_packed_data;
     while (copy.length()) {
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+      LOG(a_sprintf("starging size in array: %d", copy.length()));
+#endif
       file_time empty;
       file_transfer_header head(empty);
       if (!head.unpack(copy)) 
         RETURN_ERROR_RFC("failed to unpack header", GARBAGE);
-//LOG(a_sprintf("size in array: %d", copy.length()));
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+      LOG(a_sprintf("removed head size in array: %d", copy.length()));
+#endif
       if (copy.length() < head._length)
         RETURN_ERROR_RFC("not enough length in array", GARBAGE);
 //hmmm: are we doing nothing here besides validating that we GOT something in the header?
       copy.zap(0, head._length - 1);
-//LOG(a_sprintf("size in array now: %d", copy.length()));
+#ifdef DEBUG_RECURSIVE_FILE_COPY
+      LOG(a_sprintf("size in array now: %d", copy.length()));
+#endif
 
+//hmmm: this needs better formatting, and should not repeat the same file name even
+//      if it's in multiple chunks.
 //hmmm: if logging, then...
       BASE_LOG(head.readable_text_form());
     }