#include <filesystem/heavy_file_ops.h>
#include <loggers/program_wide_logger.h>
#include <octopus/entity_defs.h>
+#include <octopus/entity_data_bin.h>
#include <octopus/unhandled_request.h>
#include <processes/ethread.h>
#include <textual/parser_bits.h>
astring _source_mapping; // valid for a correspondence record.
int _refresh_interval; // the rate of refreshing the source tree.
- file_transfer_record() : _diffs(NIL), _last_sent(file_time()),
- _done(false), _local_dir(NIL)
+ file_transfer_record() : _diffs(NULL_POINTER), _last_sent(file_time()),
+ _done(false), _local_dir(NULL_POINTER)
{}
~file_transfer_record() {
return borrow(i);
}
}
- return NIL;
+ return NULL_POINTER;
}
virtual ~file_transfer_status() {}
if (rec && (rec->_source_mapping == source_mapping) )
return borrow(i);
}
- return NIL;
+ return NULL_POINTER;
}
// turns a source mapping into the location that it corresponds to.
astring translate(const astring &source_path) const {
-// FUNCDEF("translate");
+ FUNCDEF("translate");
string_array pieces;
- filename(source_path).separate(pieces);
+ bool rooted;
+ filename(source_path).separate(rooted, pieces);
astring source_mapping = pieces[0];
pieces.zap(0, 0); // remove source part.
_cleaner(new file_transfer_cleaner(*this)),
_mode(mode_of_transfer)
{
- _cleaner->start(NIL);
+ _cleaner->start(NULL_POINTER);
}
file_transfer_tentacle::~file_transfer_tentacle()
(const astring &source_mapping, const astring &source_root,
int refresh_interval)
{
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
FUNCDEF("add_correspondence");
-#endif
AUTO_LOCK;
remove_correspondence(source_mapping); // clean the old one out first.
}
#ifdef DEBUG_FILE_TRANSFER_TENTACLE
LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
- + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
+ + " src=" + new_record->_src_root);
#endif
// calculate size and checksum info for the directory.
new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
#ifdef DEBUG_FILE_TRANSFER_TENTACLE
LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
- + " src=" + new_record->_src_root + " dest=" + new_record->_dest_root);
+ + " src=" + new_record->_src_root);
#endif
_correspondences->append(new_record);
bool file_transfer_tentacle::get_differences(const octopus_entity &ent,
const astring &src, const astring &dest, filename_list &diffs)
{
-// FUNCDEF("get_differences");
+ FUNCDEF("get_differences");
diffs.reset();
AUTO_LOCK;
file_transfer_record *the_rec = _transfers->find(ent, src, dest);
int &total_files, double ¤t_size, int ¤t_files, bool &done,
time_stamp &last_active)
{
-// FUNCDEF("status");
+ FUNCDEF("status");
total_size = 0;
total_files = 0;
current_files = 0;
(const octopus_entity &ent, const astring &src_root,
const astring &dest_root, const string_array &includes)
{
-// FUNCDEF("register_file_transfer");
+ FUNCDEF("register_file_transfer");
AUTO_LOCK;
// make sure that this isn't an existing transfer. if so, we just update
// the status.
file_transfer_record *the_rec = _correspondences->find_mapping(key);
if (!the_rec || !the_rec->_local_dir) {
_lock->unlock();
- return NIL; // unknown transfer.
+ return NULL_POINTER; // unknown transfer.
}
return the_rec->_local_dir;
}
void file_transfer_tentacle::periodic_actions()
{
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
FUNCDEF("periodic_actions");
-#endif
AUTO_LOCK;
// first, we'll clean out old transfers.
if (classifier != file_transfer_infoton::file_transfer_classifier())
return NO_HANDLER;
return reconstituter(classifier, packed_form, reformed,
- (file_transfer_infoton *)NIL);
+ (file_transfer_infoton *)NULL_POINTER);
}
-// the "handle_" methods are thread-safe because the mutex is locked before
+// the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
// their invocations.
+basis::outcome file_transfer_tentacle::handle_build_target_tree_request(file_transfer_infoton &req,
+ const octopus_request_id &item_id)
+{
+ FUNCDEF("handle_build_target_tree_request");
+
+ // get the mapping from the specified location on this side.
+ filename splitting(req._src_root);
+ string_array pieces;
+ bool rooted;
+ splitting.separate(rooted, pieces);
+ astring source_mapping = pieces[0];
+
+ // patch the name up to find the sub_path for the source.
+ filename source_start;
+ pieces.zap(0, 0);
+ source_start.join(rooted, pieces);
+
+ // locate the allowed transfer depot for the mapping they provided.
+ file_transfer_record *mapping_record = _correspondences->find_mapping(source_mapping);
+ if (!mapping_record) {
+ LOG(astring("could not find source mapping of ") + source_mapping);
+ return NOT_FOUND;
+ }
+
+ // unpack the tree that they sent us which describes their local area.
+ directory_tree *dest_tree = new directory_tree;
+ if (!dest_tree->unpack(req._packed_data)) {
+ LOG(astring("could not unpack requester's directory tree"));
+ WHACK(dest_tree);
+ return GARBAGE;
+ }
+
+ string_array requested_names;
+ if (!requested_names.unpack(req._packed_data)) {
+ LOG(astring("could not unpack requester's filename includes"));
+ WHACK(dest_tree);
+ return GARBAGE;
+ }
+
+ // look up to see if this is about something that has already been seen.
+ // we don't want to add a new transfer record if they're already working on
+ // this. that also lets them do a new tree compare to restart the transfer.
+ file_transfer_record *the_rec = _transfers->find(item_id._entity,
+ req._src_root, req._dest_root);
+ if (!the_rec) {
+ // there was no existing record; we'll create a new one.
+ the_rec = new file_transfer_record;
+ the_rec->_ent = item_id._entity;
+ the_rec->_src_root = req._src_root;
+ the_rec->_dest_root = req._dest_root;
+ _transfers->append(the_rec);
+ } else {
+ // record some activity on this record.
+ the_rec->_done = false;
+ the_rec->_last_active.reset();
+ }
+
+ // create any directories that are missing at this point.
+ basis::outcome result = dest_tree->make_directories(req._dest_root);
+ if (result != common::OKAY) {
+ LOG("ERROR: got bad result from make_directories!");
+ }
+
+ req._packed_data.reset(); // clear out existing stuff before cloning.
+ file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
+
+ reply->_request = false; // it's a response now.
+ reply->_success = result;
+ store_product(reply, item_id);
+ // send back the comparison list.
+
+ return OKAY;
+}
+
+basis::outcome file_transfer_tentacle::handle_build_target_tree_response(file_transfer_infoton &resp,
+ const octopus_request_id &item_id)
+{
+//go to next step, tree comparison.
+//look at the handle tree compare response for help though.
+ FUNCDEF("handle_build_target_tree_response");
+ file_transfer_record *the_rec = _transfers->find(item_id._entity,
+ resp._src_root, resp._dest_root);
+ if (!the_rec) {
+ LOG(astring("could not find the record for this transfer: item=")
+ + item_id.text_form() + " src=" + resp._src_root + " dest="
+ + resp._dest_root);
+ return NOT_FOUND; // not registered, so reject it.
+ }
+
+ the_rec->_last_active.reset(); // record some activity on this record.
+
+ return resp._success;
+}
+
outcome file_transfer_tentacle::handle_tree_compare_request
(file_transfer_infoton &req, const octopus_request_id &item_id)
{
// get the mapping from the specified location on this side.
filename splitting(req._src_root);
string_array pieces;
- splitting.separate(pieces);
+ bool rooted;
+ splitting.separate(rooted, pieces);
astring source_mapping = pieces[0];
// patch the name up to find the sub_path for the source.
filename source_start;
pieces.zap(0, 0);
- source_start.join(pieces);
+ source_start.join(rooted, pieces);
// locate the allowed transfer depot for the mapping they provided.
file_transfer_record *mapping_record
filename req_curr(requested_names[j]);
if (req_curr.compare_suffix(diff_curr)) {
found = true;
-//LOG(astring("will use: ") + req_curr);
break;
}
}
FUNCDEF("handle_storage_request");
if (_mode & ONLY_REPORT_DIFFS) {
// store an unhandled infoton.
- unhandled_request *deny = new unhandled_request(item_id, req.classifier(),
- NO_HANDLER);
+ unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
store_product(deny, item_id);
return NO_HANDLER;
}
outcome bufret = heavy_file_operations::buffer_files
(_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
- if (bufret != OKAY) {
+ if (bufret == heavy_file_operations::FINISHED) {
+ bufret = OKAY; // in either case, we don't emit a finished outcome; handled elsewhere.
+ if (!resp->_packed_data.length()) {
+ // blank packages, so finish by setting command to be a conclude marker.
+ the_rec->_done = true;
+ resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
+ }
+ } else if (bufret != OKAY) {
// complain, but still send.
LOG(astring("buffer files returned an error on item=")
+ item_id.text_form() + " src=" + req._src_root + " dest="
+ req._dest_root);
}
- if ( (bufret == OKAY) && !resp->_packed_data.length() ) {
- // seems like the transfer is done.
-
+//can remove this block if stops saying it.
+ if ((bufret == OKAY) && !resp->_packed_data.length() ) {
+ LOG("marking empty transfer as done; why not caught above at FINISHED check?");
the_rec->_done = true;
-//hmmm: mark the record and time out faster?
+ resp->_command = file_transfer_infoton::CONCLUDE_TRANSFER_MARKER;
}
+//end of can remove.
resp->_request = false; // it's a response now.
store_product(resp, item_id);
astring full_file = resp._dest_root + filename::default_separator()
+ recorded_info->secondary();
+// LOG(astring("telling it to write to fullfile: ") + full_file);
outcome ret = heavy_file_operations::write_file_chunk(full_file,
found._byte_start, to_write);
return OKAY;
}
-// this is the only method that is allowed to invoke the "handle_X" methods
+outcome file_transfer_tentacle::conclude_storage_request
+ (file_transfer_infoton &req, const octopus_request_id &item_id)
+{
+ FUNCDEF("conclude_storage_request");
+ if (_mode & ONLY_REPORT_DIFFS) {
+ // store an unhandled infoton.
+ unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
+ store_product(deny, item_id);
+ return NO_HANDLER;
+ }
+
+ // look up the transfer record.
+ file_transfer_record *the_rec = _transfers->find(item_id._entity,
+ req._src_root, req._dest_root);
+ if (!the_rec) {
+ LOG(astring("could not find the record for this transfer: item=")
+ + item_id.text_form() + " src=" + req._src_root + " dest="
+ + req._dest_root);
+ return NOT_FOUND; // not registered, so reject it.
+ }
+
+ the_rec->_last_active.reset(); // mark it as still active.
+
+ file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
+
+ if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
+
+ the_rec->_done = true; // we're concluding the transfer, so that's that.
+ resp->_request = false; // it's a response now.
+ store_product(resp, item_id);
+
+ LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
+ + req._dest_root);
+
+ return common::OKAY;
+}
+
+outcome file_transfer_tentacle::conclude_storage_response
+ (file_transfer_infoton &resp, const octopus_request_id &item_id)
+{
+ FUNCDEF("conclude_storage_response");
+ if (_mode & ONLY_REPORT_DIFFS) {
+ // not spoken here.
+ return NO_HANDLER;
+ }
+
+ // look up the transfer record.
+ file_transfer_record *the_rec = _transfers->find(item_id._entity,
+ resp._src_root, resp._dest_root);
+ if (!the_rec) return NOT_FOUND; // not registered, so reject it.
+
+ the_rec->_last_active.reset(); // mark it as still active.
+
+ // mark that we're done now.
+ the_rec->_done = true;
+
+ LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
+ + resp._dest_root);
+
+ // there is no response product to store.
+ return OKAY;
+}
+
+// consume() is the only method that is allowed to invoke the "handle_X" methods
// and it must lock the object beforehand.
outcome file_transfer_tentacle::consume(infoton &to_chow,
const octopus_request_id &item_id, byte_array &transformed)
{
-// FUNCDEF("consume");
+ FUNCDEF("consume");
transformed.reset();
file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
if (!inf) return DISALLOWED; // not for us.
AUTO_LOCK; // protect our lists while we're working on them.
switch (inf->_command) {
+ case file_transfer_infoton::BUILD_TARGET_TREE: {
+ if (inf->_request) return handle_build_target_tree_request(*inf, item_id);
+ else return handle_build_target_tree_response(*inf, item_id);
+ }
case file_transfer_infoton::TREE_COMPARISON: {
if (inf->_request) return handle_tree_compare_request(*inf, item_id);
else return handle_tree_compare_response(*inf, item_id);
if (inf->_request) return handle_storage_request(*inf, item_id);
else return handle_storage_response(*inf, item_id);
}
+ case file_transfer_infoton::CONCLUDE_TRANSFER_MARKER: {
+ if (inf->_request) return conclude_storage_request(*inf, item_id);
+ else return conclude_storage_response(*inf, item_id);
+ }
}
return BAD_INPUT; // not a recognized command.
}
outcome file_transfer_tentacle::refresh_now(const astring &source_mapping)
{
-#ifdef DEBUG_FILE_TRANSFER_TENTACLE
FUNCDEF("refresh_now");
-#endif
AUTO_LOCK;
for (int i = 0; i < _correspondences->elements(); i++) {
file_transfer_record *curr = _correspondences->borrow(i);