deep mods
[feisty_meow.git] / octopi / library / tentacles / file_transfer_tentacle.cpp
index bb9711e82e3c2886f3e65c20186fde53bae5ed42..1fb2ad9eb8e3a9d59ab6095f4901b1a0c6a43180 100644 (file)
@@ -21,6 +21,7 @@
 #include <filesystem/heavy_file_ops.h>
 #include <loggers/program_wide_logger.h>
 #include <octopus/entity_defs.h>
+#include <octopus/entity_data_bin.h>
 #include <octopus/unhandled_request.h>
 #include <processes/ethread.h>
 #include <textual/parser_bits.h>
@@ -74,8 +75,8 @@ public:
   astring _source_mapping;  // valid for a correspondence record.
   int _refresh_interval;  // the rate of refreshing the source tree.
 
-  file_transfer_record() : _diffs(NIL), _last_sent(file_time()),
-      _done(false), _local_dir(NIL)
+  file_transfer_record() : _diffs(NULL_POINTER), _last_sent(file_time()),
+      _done(false), _local_dir(NULL_POINTER)
   {}
 
   ~file_transfer_record() {
@@ -117,7 +118,7 @@ public:
         return borrow(i);
       }
     }
-    return NIL;
+    return NULL_POINTER;
   }
 
   virtual ~file_transfer_status() {}
@@ -131,14 +132,15 @@ public:
       if (rec && (rec->_source_mapping == source_mapping) )
         return borrow(i);
     }
-    return NIL;
+    return NULL_POINTER;
   }
 
   // turns a source mapping into the location that it corresponds to.
   astring translate(const astring &source_path) const {
-//    FUNCDEF("translate");
+    FUNCDEF("translate");
     string_array pieces;
-    filename(source_path).separate(pieces);
+    bool rooted;
+    filename(source_path).separate(rooted, pieces);
     astring source_mapping = pieces[0];
     pieces.zap(0, 0);  // remove source part.
 
@@ -225,7 +227,7 @@ file_transfer_tentacle::file_transfer_tentacle(int maximum_transfer,
   _cleaner(new file_transfer_cleaner(*this)),
   _mode(mode_of_transfer)
 {
-  _cleaner->start(NIL);
+  _cleaner->start(NULL_POINTER);
 }
 
 file_transfer_tentacle::~file_transfer_tentacle()
@@ -253,9 +255,7 @@ outcome file_transfer_tentacle::add_correspondence
     (const astring &source_mapping, const astring &source_root,
      int refresh_interval)
 {
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
   FUNCDEF("add_correspondence");
-#endif
   AUTO_LOCK;
 
   remove_correspondence(source_mapping);  // clean the old one out first.
@@ -275,14 +275,14 @@ outcome file_transfer_tentacle::add_correspondence
   }
 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
   LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
-      + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
+      + " src=" + new_record->_src_root);
 #endif
   // calculate size and checksum info for the directory.
   new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
 
 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
   LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
-      + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
+      + " src=" + new_record->_src_root);
 #endif
 
   _correspondences->append(new_record);
@@ -302,7 +302,7 @@ outcome file_transfer_tentacle::remove_correspondence
 bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
     const astring &src, const astring &dest, filename_list &diffs)
 {
-//  FUNCDEF("get_differences");
+  FUNCDEF("get_differences");
   diffs.reset();
   AUTO_LOCK;
   file_transfer_record *the_rec = _transfers->find(ent, src, dest);
@@ -317,7 +317,7 @@ bool file_transfer_tentacle::status(const octopus_entity &ent,
     int &total_files, double &current_size, int &current_files, bool &done,
     time_stamp &last_active)
 {
-//  FUNCDEF("status");
+  FUNCDEF("status");
   total_size = 0;
   total_files = 0;
   current_files = 0;
@@ -343,7 +343,7 @@ outcome file_transfer_tentacle::register_file_transfer
     (const octopus_entity &ent, const astring &src_root,
     const astring &dest_root, const string_array &includes)
 {
-//  FUNCDEF("register_file_transfer");
+  FUNCDEF("register_file_transfer");
   AUTO_LOCK;
   // make sure that this isn't an existing transfer.  if so, we just update
   // the status.
@@ -376,7 +376,7 @@ directory_tree *file_transfer_tentacle::lock_directory(const astring &key)
   file_transfer_record *the_rec = _correspondences->find_mapping(key);
   if (!the_rec || !the_rec->_local_dir) {
     _lock->unlock();
-    return NIL;  // unknown transfer.
+    return NULL_POINTER;  // unknown transfer.
   }
   return the_rec->_local_dir;
 }
@@ -408,9 +408,7 @@ bool file_transfer_tentacle::remove_path(const astring &key,
 
 void file_transfer_tentacle::periodic_actions()
 {
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
   FUNCDEF("periodic_actions");
-#endif
   AUTO_LOCK;
 
   // first, we'll clean out old transfers.
@@ -456,12 +454,106 @@ outcome file_transfer_tentacle::reconstitute(const string_array &classifier,
   if (classifier != file_transfer_infoton::file_transfer_classifier())
     return NO_HANDLER;
   return reconstituter(classifier, packed_form, reformed,
-      (file_transfer_infoton *)NIL);
+      (file_transfer_infoton *)NULL_POINTER);
 }
 
 // the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
 // their invocations.
 
+basis::outcome file_transfer_tentacle::handle_build_target_tree_request(file_transfer_infoton &req,
+    const octopus_request_id &item_id)
+{
+  FUNCDEF("handle_build_target_tree_request");
+
+  // get the mapping from the specified location on this side.
+  filename splitting(req._src_root);
+  string_array pieces;
+  bool rooted;
+  splitting.separate(rooted, pieces);
+  astring source_mapping = pieces[0];
+
+  // patch the name up to find the sub_path for the source.
+  filename source_start;
+  pieces.zap(0, 0);
+  source_start.join(rooted, pieces);
+
+  // locate the allowed transfer depot for the mapping they provided.
+  file_transfer_record *mapping_record = _correspondences->find_mapping(source_mapping);
+  if (!mapping_record) {
+    LOG(astring("could not find source mapping of ") + source_mapping);
+    return NOT_FOUND;
+  }
+
+  // unpack the tree that they sent us which describes their local area.
+  directory_tree *dest_tree = new directory_tree;
+  if (!dest_tree->unpack(req._packed_data)) {
+    LOG(astring("could not unpack requester's directory tree"));
+    WHACK(dest_tree);
+    return GARBAGE;
+  }
+
+  string_array requested_names;
+  if (!requested_names.unpack(req._packed_data)) {
+    LOG(astring("could not unpack requester's filename includes"));
+    WHACK(dest_tree);
+    return GARBAGE;
+  }
+
+  // look up to see if this is about something that has already been seen.
+  // we don't want to add a new transfer record if they're already working on
+  // this.  that also lets them do a new tree compare to restart the transfer.
+  file_transfer_record *the_rec = _transfers->find(item_id._entity,
+      req._src_root, req._dest_root);
+  if (!the_rec) {
+    // there was no existing record; we'll create a new one.
+    the_rec = new file_transfer_record;
+    the_rec->_ent = item_id._entity;
+    the_rec->_src_root = req._src_root;
+    the_rec->_dest_root = req._dest_root;
+    _transfers->append(the_rec);
+  } else {
+    // record some activity on this record.
+    the_rec->_done = false;
+    the_rec->_last_active.reset();
+  }
+
+  // create any directories that are missing at this point.
+  basis::outcome result = dest_tree->make_directories(req._dest_root);
+  if (result != common::OKAY) {
+    LOG("ERROR: got bad result from make_directories!");
+  }
+
+  req._packed_data.reset();  // clear out existing stuff before cloning.
+  file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
+
+  reply->_request = false;  // it's a response now.
+  reply->_success = result;
+  store_product(reply, item_id);
+    // send back the comparison list.
+
+  return OKAY;
+}
+
+basis::outcome file_transfer_tentacle::handle_build_target_tree_response(file_transfer_infoton &resp,
+    const octopus_request_id &item_id)
+{
+//go to next step, tree comparison.
+//look at the handle tree compare response for help though.
+  FUNCDEF("handle_build_target_tree_response");
+  file_transfer_record *the_rec = _transfers->find(item_id._entity,
+      resp._src_root, resp._dest_root);
+  if (!the_rec) {
+    LOG(astring("could not find the record for this transfer: item=")
+        + item_id.text_form() + " src=" + resp._src_root + " dest="
+        + resp._dest_root);
+    return NOT_FOUND;  // not registered, so reject it.
+  }
+
+  the_rec->_last_active.reset();  // record some activity on this record.
+
+  return resp._success;
+}
+
 outcome file_transfer_tentacle::handle_tree_compare_request
     (file_transfer_infoton &req, const octopus_request_id &item_id)
 {
@@ -470,13 +562,14 @@ outcome file_transfer_tentacle::handle_tree_compare_request
   // get the mapping from the specified location on this side.
   filename splitting(req._src_root);
   string_array pieces;
-  splitting.separate(pieces);
+  bool rooted;
+  splitting.separate(rooted, pieces);
   astring source_mapping = pieces[0];
 
   // patch the name up to find the sub_path for the source.
   filename source_start;
   pieces.zap(0, 0);
-  source_start.join(pieces);
+  source_start.join(rooted, pieces);
 
   // locate the allowed transfer depot for the mapping they provided.
   file_transfer_record *mapping_record
@@ -543,7 +636,6 @@ outcome file_transfer_tentacle::handle_tree_compare_request
         filename req_curr(requested_names[j]);
         if (req_curr.compare_suffix(diff_curr)) {
           found = true;
-//LOG(astring("will use: ") + req_curr);
           break;
         }
       }
@@ -624,9 +716,12 @@ outcome file_transfer_tentacle::handle_storage_request
       (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
       the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
   if (bufret == heavy_file_operations::FINISHED) {
-//here we go.  finish by setting command to conclude.
-LOG("got the final marker saying heavy file ops done!");
-    resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
+    bufret = OKAY;  // in either case, we don't emit a finished outcome; handled elsewhere.
+    if (!resp->_packed_data.length()) {
+      // blank packages, so finish by setting command to be a conclude marker.
+      the_rec->_done = true;
+      resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
+    }
   } else if (bufret != OKAY) {
     // complain, but still send.
     LOG(astring("buffer files returned an error on item=")
@@ -634,11 +729,13 @@ LOG("got the final marker saying heavy file ops done!");
         + req._dest_root);
   }
 
-  if ( (bufret == OKAY) && !resp->_packed_data.length() ) {
-    // seems like the transfer is done.
+//can remove this block if stops saying it.
+  if ((bufret == OKAY) && !resp->_packed_data.length() ) {
+    LOG("marking empty transfer as done; why not caught above at FINISHED check?");
     the_rec->_done = true;
     resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
   }
+//end of can remove.
 
   resp->_request = false;  // it's a response now.
   store_product(resp, item_id);
@@ -697,6 +794,7 @@ outcome file_transfer_tentacle::handle_storage_response
 
     astring full_file = resp._dest_root + filename::default_separator()
         + recorded_info->secondary();
+//     LOG(astring("telling it to write to fullfile: ") + full_file);
 
     outcome ret = heavy_file_operations::write_file_chunk(full_file,
         found._byte_start, to_write);
@@ -715,7 +813,7 @@ outcome file_transfer_tentacle::handle_storage_response
 outcome file_transfer_tentacle::conclude_storage_request
     (file_transfer_infoton &req, const octopus_request_id &item_id)
 {
-  FUNCDEF("handle_storage_request");
+  FUNCDEF("conclude_storage_request");
   if (_mode & ONLY_REPORT_DIFFS) {
     // store an unhandled infoton.
     unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
@@ -742,13 +840,17 @@ outcome file_transfer_tentacle::conclude_storage_request
   the_rec->_done = true;  // we're concluding the transfer, so that's that.
   resp->_request = false;  // it's a response now.
   store_product(resp, item_id);
+
+  LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
+      + req._dest_root);
+
   return common::OKAY;
 }
 
 outcome file_transfer_tentacle::conclude_storage_response
     (file_transfer_infoton &resp, const octopus_request_id &item_id)
 {
-  FUNCDEF("handle_storage_response");
+  FUNCDEF("conclude_storage_response");
   if (_mode & ONLY_REPORT_DIFFS) {
     // not spoken here.
     return NO_HANDLER;
@@ -764,6 +866,9 @@ outcome file_transfer_tentacle::conclude_storage_response
   // mark that we're done now.
   the_rec->_done = true;
 
+  LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
+      + resp._dest_root);
+
   // there is no response product to store.
   return OKAY;
 }
@@ -782,6 +887,10 @@ outcome file_transfer_tentacle::consume(infoton &to_chow,
   AUTO_LOCK;  // protect our lists while we're working on them.
 
   switch (inf->_command) {
+    case file_transfer_infoton::BUILD_TARGET_TREE: {
+      if (inf->_request) return handle_build_target_tree_request(*inf, item_id);
+      else return handle_build_target_tree_response(*inf, item_id);
+    }
     case file_transfer_infoton::TREE_COMPARISON: {
       if (inf->_request) return handle_tree_compare_request(*inf, item_id);
       else return handle_tree_compare_response(*inf, item_id);
@@ -800,9 +909,7 @@ outcome file_transfer_tentacle::consume(infoton &to_chow,
 
 outcome file_transfer_tentacle::refresh_now(const astring &source_mapping)
 {
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
   FUNCDEF("refresh_now");
-#endif
   AUTO_LOCK;
   for (int i = 0; i < _correspondences->elements(); i++) {
     file_transfer_record *curr = _correspondences->borrow(i);