more updates, still not right.
[feisty_meow.git] / octopi / library / tentacles / file_transfer_tentacle.cpp
index fa082b990073f94a8feb26b341329929e41b0695..877672b11477a482e4453efcdd31c8d8e6ad1c78 100644 (file)
@@ -46,7 +46,7 @@ const int FTT_CLEANING_INTERVAL = 30 * SECOND_ms;
 const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
   // if it hasn't been touched in this long, it's out of there.
 
-//#define DEBUG_FILE_TRANSFER_TENTACLE
+#define DEBUG_FILE_TRANSFER_TENTACLE
   // uncomment for noisier version.
 
 #undef LOG
@@ -136,7 +136,7 @@ public:
 
   // turns a source mapping into the location that it corresponds to.
   astring translate(const astring &source_path) const {
-//    FUNCDEF("translate");
+    FUNCDEF("translate");
     string_array pieces;
     filename(source_path).separate(pieces);
     astring source_mapping = pieces[0];
@@ -253,9 +253,7 @@ outcome file_transfer_tentacle::add_correspondence
     (const astring &source_mapping, const astring &source_root,
      int refresh_interval)
 {
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
   FUNCDEF("add_correspondence");
-#endif
   AUTO_LOCK;
 
   remove_correspondence(source_mapping);  // clean the old one out first.
@@ -302,7 +300,7 @@ outcome file_transfer_tentacle::remove_correspondence
 bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
     const astring &src, const astring &dest, filename_list &diffs)
 {
-//  FUNCDEF("get_differences");
+  FUNCDEF("get_differences");
   diffs.reset();
   AUTO_LOCK;
   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
@@ -317,7 +315,7 @@ bool file_transfer_tentacle::status(const octopus_entity &ent,
     int &total_files, double &current_size, int &current_files, bool &done,
     time_stamp &last_active)
 {
-//  FUNCDEF("status");
+  FUNCDEF("status");
   total_size = 0;
   total_files = 0;
   current_files = 0;
@@ -343,7 +341,7 @@ outcome file_transfer_tentacle::register_file_transfer
     (const octopus_entity &ent, const astring &src_root,
     const astring &dest_root, const string_array &includes)
 {
-//  FUNCDEF("register_file_transfer");
+  FUNCDEF("register_file_transfer");
   AUTO_LOCK;
   // make sure that this isn't an existing transfer.  if so, we just update
   // the status.
@@ -408,9 +406,7 @@ bool file_transfer_tentacle::remove_path(const astring &key,
 
 void file_transfer_tentacle::periodic_actions()
 {
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
   FUNCDEF("periodic_actions");
-#endif
   AUTO_LOCK;
 
   // first, we'll clean out old transfers.
@@ -459,7 +455,7 @@ outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
       (file_transfer_infoton *)NIL);
 }
 
-// the "handle_" methods are thread-safe because the mutex is locked before
+// the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
 // their invocations.
 
 outcome file_transfer_tentacle::handle_tree_compare_request
@@ -599,8 +595,7 @@ outcome file_transfer_tentacle::handle_storage_request
   FUNCDEF("handle_storage_request");
   if (_mode & ONLY_REPORT_DIFFS) {
     // store an unhandled infoton.
-    unhandled_request *deny = new unhandled_request(item_id, req.classifier(),
-        NO_HANDLER);
+    unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
     store_product(deny, item_id);
     return NO_HANDLER;
   }
@@ -624,19 +619,25 @@ outcome file_transfer_tentacle::handle_storage_request
   outcome bufret = heavy_file_operations::buffer_files
       (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
       the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
-  if (bufret != OKAY) {
+  if (bufret == heavy_file_operations::FINISHED) {
+    // finish by setting command to be a conclude marker.
+    the_rec->_done = true;
+    resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
+    bufret = OKAY;  // now it's no longer an exceptional outcome.
+  } else if (bufret != OKAY) {
     // complain, but still send.
     LOG(astring("buffer files returned an error on item=")
         + item_id.text_form() + " src=" + req._src_root + " dest="
         + req._dest_root);
   }
 
-  if ( (bufret == OKAY) && !resp->_packed_data.length() ) {
-    // seems like the transfer is done.
-
+//can remove this block if stops saying it.
+  if ((bufret == OKAY) && !resp->_packed_data.length() ) {
+    LOG("marking empty transfer as done; why not caught above at FINISHED check?");
     the_rec->_done = true;
-//hmmm: mark the record and time out faster?
+    resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
   }
+//end of can remove.
 
   resp->_request = false;  // it's a response now.
   store_product(resp, item_id);
@@ -710,13 +711,76 @@ outcome file_transfer_tentacle::handle_storage_response
   return OKAY;
 }
 
-// this is the only method that is allowed to invoke the "handle_X" methods
+outcome file_transfer_tentacle::conclude_storage_request
+    (file_transfer_infoton &req, const octopus_request_id &item_id)
+{
+  FUNCDEF("conclude_storage_request");
+  if (_mode & ONLY_REPORT_DIFFS) {
+    // store an unhandled infoton.
+    unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
+    store_product(deny, item_id);
+    return NO_HANDLER;
+  }
+
+  // look up the transfer record.
+  file_transfer_record *the_rec = _transfers->find(item_id._entity,
+      req._src_root, req._dest_root);
+  if (!the_rec) {
+    LOG(astring("could not find the record for this transfer: item=")
+        + item_id.text_form() + " src=" + req._src_root + " dest="
+        + req._dest_root);
+    return NOT_FOUND;  // not registered, so reject it.
+  }
+
+  the_rec->_last_active.reset();  // mark it as still active.
+
+  file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
+
+  if (!the_rec->_diffs) return BAD_INPUT;  // wrong type of object.
+
+  the_rec->_done = true;  // we're concluding the transfer, so that's that.
+  resp->_request = false;  // it's a response now.
+  store_product(resp, item_id);
+
+  LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
+      + req._dest_root);
+
+  return common::OKAY;
+}
+
+outcome file_transfer_tentacle::conclude_storage_response
+    (file_transfer_infoton &resp, const octopus_request_id &item_id)
+{
+  FUNCDEF("conclude_storage_response");
+  if (_mode & ONLY_REPORT_DIFFS) {
+    // not spoken here.
+    return NO_HANDLER;
+  }
+
+  // look up the transfer record.
+  file_transfer_record *the_rec = _transfers->find(item_id._entity,
+      resp._src_root, resp._dest_root);
+  if (!the_rec) return NOT_FOUND;  // not registered, so reject it.
+
+  the_rec->_last_active.reset();  // mark it as still active.
+
+  // mark that we're done now.
+  the_rec->_done = true;
+
+  LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
+      + resp._dest_root);
+
+  // there is no response product to store.
+  return OKAY;
+}
+
+// consume() is the only method that is allowed to invoke the "handle_X" methods
 // and it must lock the object beforehand.
 
 outcome file_transfer_tentacle::consume(infoton &to_chow,
     const octopus_request_id &item_id, byte_array &transformed)
 {
-//  FUNCDEF("consume");
+  FUNCDEF("consume");
   transformed.reset();
   file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
   if (!inf) return DISALLOWED;  // not for us.
@@ -732,15 +796,17 @@ outcome file_transfer_tentacle::consume(infoton &to_chow,
       if (inf->_request) return handle_storage_request(*inf, item_id);
       else return handle_storage_response(*inf, item_id);
     }
+    case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: {
+      if (inf->_request) return conclude_storage_request(*inf, item_id);
+      else return conclude_storage_response(*inf, item_id);
+    }
   }
   return BAD_INPUT;  // not a recognized command.
 }
 
 outcome file_transfer_tentacle::refresh_now(const astring &source_mapping)
 {
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
   FUNCDEF("refresh_now");
-#endif
   AUTO_LOCK;
   for (int i = 0; i < _correspondences->elements(); i++) {
     file_transfer_record *curr = _correspondences->borrow(i);