29 using namespace basis;
// AUTO_LOCK declares a local auto_synchronizer named "loc" built from *_lock.
// NOTE(review): presumably this holds the lock until the enclosing scope ends;
// confirm against auto_synchronizer.  the macro body already ends in a
// semicolon, so call sites do not need to add one.
41 #define AUTO_LOCK auto_synchronizer loc(*_lock);
// LOG(s) is shorthand that hands the message "s" and the logger returned by
// program_wide_logger::get() to CLASS_EMERGENCY_LOG.
54 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
58 class file_transfer_record
76 int _refresh_interval;
82 ~file_transfer_record() {
93 to_return +=
astring(
" dest=") + _dest_root;
108 class file_transfer_status :
public amorph<file_transfer_record>
114 for (
int i = 0; i < elements(); i++) {
115 const file_transfer_record *rec = get(i);
116 if (rec && (rec->_ent == ent) && (rec->_src_root == src)
117 && (rec->_dest_root == dest) ) {
// virtual destructor with an empty body; nothing is released explicitly here.
124 virtual ~file_transfer_status() {}
129 file_transfer_record *find_mapping(
const astring &source_mapping) {
130 for (
int i = 0; i < elements(); i++) {
131 const file_transfer_record *rec = get(i);
132 if (rec && (rec->_source_mapping == source_mapping) )
144 astring source_mapping = pieces[0];
147 for (
int i = 0; i < elements(); i++) {
148 const file_transfer_record *rec = get(i);
149 if (rec && (rec->_source_mapping == source_mapping) ) {
150 return rec->_src_root;
159 for (
int i = 0; i < elements(); i++) {
160 const file_transfer_record *rec = get(i);
161 if (rec && (rec->_ent == ent) && (rec->_src_root == src)
162 && (rec->_dest_root == dest) ) {
172 for (
int i = elements() - 1; i >= 0; i--) {
173 const file_transfer_record *rec = get(i);
174 if (rec && (rec->_ent == ent) )
180 bool whack_mapping(
const astring &source_mapping) {
181 for (
int i = elements() - 1; i >= 0; i--) {
182 const file_transfer_record *rec = get(i);
183 if (rec && (rec->_source_mapping == source_mapping) ) {
194 for (
int i = 0; i < elements(); i++) {
195 const file_transfer_record *rec = get(i);
197 to_return += rec->
text_form() + parser_bits::platform_eol_to_chars();
205 class file_transfer_cleaner :
public ethread
// activity hook of this ethread-derived cleaner: it simply forwards to the
// parent's periodic_actions().  the pointer parameter is unused, which is why
// it is wrapped in formal().
211 virtual void perform_activity(
void *
formal(ptr)) { _parent.periodic_actions(); }
219 file_transfer_tentacle::file_transfer_tentacle(
int maximum_transfer,
223 _maximum_transfer(maximum_transfer),
224 _transfers(new file_transfer_status),
225 _correspondences(new file_transfer_status),
227 _cleaner(new file_transfer_cleaner(*this)),
228 _mode(mode_of_transfer)
237 WHACK(_correspondences);
245 return _transfers->text_form();
251 _transfers->whack_all(to_remove);
256 int refresh_interval)
264 file_transfer_record *new_record =
new file_transfer_record;
265 new_record->_source_mapping = source_mapping;
266 new_record->_src_root = source_root;
267 new_record->_refresh_interval = refresh_interval;
272 if (!new_record->_local_dir->good()) {
274 return common::ACCESS_DENIED;
276 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
278 +
" src=" + new_record->_src_root);
283 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
285 +
" src=" + new_record->_src_root);
288 _correspondences->append(new_record);
294 (
const astring &source_mapping)
297 if (!_correspondences->whack_mapping(source_mapping))
308 file_transfer_record *the_rec = _transfers->find(ent, src, dest);
309 if (!the_rec)
return false;
310 if (!the_rec->_diffs)
return false;
311 diffs = *the_rec->_diffs;
317 int &total_files,
double &current_size,
int &current_files,
bool &done,
326 file_transfer_record *the_rec = _transfers->find(ent, src, dest);
327 if (!the_rec)
return false;
328 done = the_rec->_done;
329 last_active = the_rec->_last_active;
331 if (the_rec->_diffs) {
332 the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
333 the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
334 current_files, current_size);
335 total_files = the_rec->_diffs->total_files();
336 total_size = the_rec->_diffs->total_size();
346 FUNCDEF(
"register_file_transfer");
350 file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
352 the_rec =
new file_transfer_record;
353 the_rec->_src_root = src_root;
354 the_rec->_dest_root = dest_root;
356 the_rec->_includes = includes;
357 _transfers->append(the_rec);
359 the_rec->_done =
false;
360 the_rec->_includes = includes;
361 the_rec->_last_active.
reset();
370 return _transfers->whack(ent, src_root, dest_root)?
OKAY :
NOT_FOUND;
376 file_transfer_record *the_rec = _correspondences->find_mapping(key);
377 if (!the_rec || !the_rec->_local_dir) {
381 return the_rec->_local_dir;
393 file_transfer_record *the_rec = _correspondences->find_mapping(key);
394 if (!the_rec)
return false;
395 if (!the_rec->_local_dir)
return false;
396 return the_rec->_local_dir->add_path(new_path) == common::OKAY;
403 file_transfer_record *the_rec = _correspondences->find_mapping(key);
404 if (!the_rec)
return false;
405 if (!the_rec->_local_dir)
return false;
406 return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
417 for (
int i = _transfers->elements() - 1; i >= 0; i--) {
418 const file_transfer_record *curr = _transfers->get(i);
419 if (curr->_last_active < oldest_allowed) {
420 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
422 +
" src=" + curr->_src_root +
" dest=" + curr->_dest_root);
424 _transfers->zap(i, i);
429 for (
int i = 0; i < _correspondences->elements(); i++) {
430 file_transfer_record *curr = _correspondences->borrow(i);
431 if (curr->_last_active <
time_stamp(-curr->_refresh_interval)) {
432 if (curr->_local_dir) {
433 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
435 +
" src=" + curr->_src_root +
" dest=" + curr->_dest_root);
437 WHACK(curr->_local_dir);
440 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
442 +
" src=" + curr->_src_root +
" dest=" + curr->_dest_root);
445 curr->_last_active.reset();
466 FUNCDEF(
"handle_build_target_tree_request");
472 splitting.separate(rooted, pieces);
473 astring source_mapping = pieces[0];
478 source_start.
join(rooted, pieces);
481 file_transfer_record *mapping_record = _correspondences->find_mapping(source_mapping);
482 if (!mapping_record) {
483 LOG(
astring(
"could not find source mapping of ") + source_mapping);
490 LOG(
astring(
"could not unpack requester's directory tree"));
497 LOG(
astring(
"could not unpack requester's filename includes"));
505 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
509 the_rec =
new file_transfer_record;
510 the_rec->_ent = item_id.
_entity;
513 _transfers->append(the_rec);
516 the_rec->_done =
false;
517 the_rec->_last_active.reset();
522 if (result != common::OKAY) {
523 LOG(
"ERROR: got bad result from make_directories!");
542 FUNCDEF(
"handle_build_target_tree_response");
543 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
546 LOG(
astring(
"could not find the record for this transfer: item=")
552 the_rec->_last_active.reset();
557 outcome file_transfer_tentacle::handle_tree_compare_request
560 FUNCDEF(
"handle_tree_compare_request");
566 splitting.separate(rooted, pieces);
567 astring source_mapping = pieces[0];
572 source_start.
join(rooted, pieces);
575 file_transfer_record *mapping_record
576 = _correspondences->find_mapping(source_mapping);
577 if (!mapping_record) {
578 LOG(
astring(
"could not find source mapping of ") + source_mapping);
585 LOG(
astring(
"could not unpack requester's directory tree"));
592 LOG(
astring(
"could not unpack requester's filename includes"));
600 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
604 the_rec =
new file_transfer_record;
605 the_rec->_ent = item_id.
_entity;
608 _transfers->append(the_rec);
611 the_rec->_done =
false;
612 the_rec->_last_active.reset();
617 int how_comp = file_info::EQUAL_NAME;
619 how_comp |= file_info::EQUAL_FILESIZE | file_info::EQUAL_TIMESTAMP;
621 how_comp |= file_info::EQUAL_CHECKSUM;
624 directory_tree::compare_trees(*mapping_record->_local_dir,
625 source_start.
raw(), *dest_tree, astring::empty_string(),
631 if (requested_names.
length()) {
632 for (
int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
635 for (
int j = 0; j < requested_names.
length(); j++) {
636 filename req_curr(requested_names[j]);
637 if (req_curr.compare_suffix(diff_curr)) {
642 if (!found) the_rec->_diffs->zap(i, i);
661 outcome file_transfer_tentacle::handle_tree_compare_response
664 FUNCDEF(
"handle_tree_compare_response");
665 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
668 LOG(
astring(
"could not find the record for this transfer: item=")
674 the_rec->_last_active.reset();
684 the_rec->_diffs = flist;
688 outcome file_transfer_tentacle::handle_storage_request
691 FUNCDEF(
"handle_storage_request");
700 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
703 LOG(
astring(
"could not find the record for this transfer: item=")
709 the_rec->_last_active.reset();
715 outcome bufret = heavy_file_operations::buffer_files
716 (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
717 the_rec->_last_sent, resp->
_packed_data, _maximum_transfer);
718 if (bufret == heavy_file_operations::FINISHED) {
722 the_rec->_done =
true;
725 }
else if (bufret !=
OKAY) {
727 LOG(
astring(
"buffer files returned an error on item=")
734 LOG(
"marking empty transfer as done; why not caught above at FINISHED check?");
735 the_rec->_done =
true;
745 outcome file_transfer_tentacle::handle_storage_response
748 FUNCDEF(
"handle_storage_response");
755 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
759 the_rec->_last_active.reset();
763 the_rec->_done =
true;
776 the_rec->_last_sent = found;
789 const file_info *recorded_info = the_rec->_diffs->
find(found._filename);
790 if (!recorded_info) {
791 LOG(
astring(
"unrequested file seen: ") + found._filename);
799 outcome ret = heavy_file_operations::write_file_chunk(full_file,
800 found._byte_start, to_write);
802 LOG(
astring(
"failed to write file chunk: error=")
803 + heavy_file_operations::outcome_name(ret) +
" file=" + full_file
804 +
a_sprintf(
" start=%d len=%d", found._byte_start, found._length));
806 found._time.set_time(full_file);
813 outcome file_transfer_tentacle::conclude_storage_request
816 FUNCDEF(
"conclude_storage_request");
825 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
828 LOG(
astring(
"could not find the record for this transfer: item=")
834 the_rec->_last_active.reset();
840 the_rec->_done =
true;
850 outcome file_transfer_tentacle::conclude_storage_response
853 FUNCDEF(
"conclude_storage_response");
860 file_transfer_record *the_rec = _transfers->find(item_id.
_entity,
864 the_rec->_last_active.reset();
867 the_rec->_done =
true;
891 if (inf->
_request)
return handle_build_target_tree_request(*inf, item_id);
892 else return handle_build_target_tree_response(*inf, item_id);
895 if (inf->
_request)
return handle_tree_compare_request(*inf, item_id);
896 else return handle_tree_compare_response(*inf, item_id);
899 if (inf->
_request)
return handle_storage_request(*inf, item_id);
900 else return handle_storage_response(*inf, item_id);
903 if (inf->
_request)
return conclude_storage_request(*inf, item_id);
904 else return conclude_storage_response(*inf, item_id);
914 for (
int i = 0; i < _correspondences->elements(); i++) {
915 file_transfer_record *curr = _correspondences->borrow(i);
917 if (curr->_source_mapping != source_mapping)
continue;
918 if (curr->_local_dir) {
919 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
921 +
" src=" + curr->_src_root +
" dest=" + curr->_dest_root);
923 WHACK(curr->_local_dir);
926 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
928 +
" src=" + curr->_src_root +
" dest=" + curr->_dest_root);
931 curr->_last_active.reset();
a_sprintf is a specialization of astring that provides printf style support.
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
array subarray(int start, int end) const
Returns the array segment between the indices "start" and "end".
int length() const
Returns the current reported length of the allocated C array.
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Provides a dynamically resizable ASCII character string.
bool t() const
t() is a shortcut for the string being "true", as in non-empty.
virtual char get(int index) const
a constant peek at the string's internals at the specified index.
static const astring & empty_string()
useful wherever empty strings are needed, e.g., function defaults.
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
int find(char to_find, int position=0, bool reverse=false) const
Locates "to_find" in "this".
A very common template for a dynamic array of bytes.
void lock()
Clamps down on the mutex, if possible.
void unlock()
Gives up the possession of the mutex.
Outcomes describe the state of completion for an operation.
An object that traverses directory trees and provides a view of all files.
virtual bool unpack(basis::byte_array &packed_form)
unpacks the directory_tree from a byte_array.
basis::outcome make_directories(const basis::astring new_root)
creates all of the directories in this object, but start at the "new_root".
Encapsulates some measures and calculations based on a file's contents.
const basis::astring & secondary() const
observes the alternate form of the name.
file_similarity
this enum encapsulates how files may be compared.
virtual bool unpack(basis::byte_array &packed_form)
Restores the packable from the "packed_form".
Provides operations commonly needed on file names.
void join(bool rooted, const structures::string_array &pieces)
undoes a separate() operation to get the filename back.
void separate(bool &rooted, structures::string_array &pieces) const
breaks the filename into its component parts.
const basis::astring & raw() const
returns the astring that we're holding onto for the path.
Base objects used by the file transfer tentacle to schedule transfers.
virtual basis::clonable * clone() const
must be provided to allow creation of a copy of this object.
@ BUILD_TARGET_TREE
asks the target side to build the directory tree from the source.
@ CONCLUDE_TRANSFER_MARKER
this infoton marks the end of the transfer process.
@ PLACE_FILE_CHUNKS
the destination side requests a new set of chunks.
@ TREE_COMPARISON
the destination root will be compared with the source root.
basis::abyte _command
one of the commands above.
basis::outcome _success
reports what kind of result occurred.
static const structures::string_array & file_transfer_classifier()
returns the classifier for this type of infoton.
basis::astring _src_root
the top-level directory of the source.
bool _request
if it's not a request, then it's a response.
basis::astring _dest_root
the top-level directory of the destination.
basis::byte_array _packed_data
the packed headers and file chunks.
virtual void pack(basis::byte_array &packed_form) const
stuffs the data in the infoton into the "packed_form".
Manages the transference of directory trees from one place to another.
bool add_path(const basis::astring &source_mapping, const basis::astring &new_path)
inserts the "new_path" into a registered correspondence.
basis::outcome cancel_file_transfer(const octopus_entity &ent, const basis::astring &src_root, const basis::astring &dest_root)
tosses a previously registered file transfer.
void unlock_directory()
unlock MUST be called when one is done looking at the tree.
virtual ~file_transfer_tentacle()
virtual void expunge(const octopus_entity &to_remove)
throws out any transfers occurring for the entity "to_remove".
bool status(const octopus_entity &ent, const basis::astring &src, const basis::astring &dest, double &total_size, int &total_files, double &current_size, int &current_files, bool &done, timely::time_stamp &last_active)
locates the transfer specified and returns information about it.
basis::outcome refresh_now(const basis::astring &source_mapping)
refreshes the "source_mapping" right now, regardless of the interval.
virtual basis::outcome reconstitute(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)
recreates a "reformed" infoton from its packed form.
bool remove_path(const basis::astring &source_mapping, const basis::astring &old_path)
deletes the "old_path" out of an existing correspondence.
bool get_differences(const octopus_entity &ent, const basis::astring &src, const basis::astring &dest, filesystem::filename_list &diffs)
accesses the list of differences for an ongoing transfer.
virtual basis::outcome consume(infoton &to_chow, const octopus_request_id &item_id, basis::byte_array &transformed)
processes the "to_chow" infoton as a file transfer request.
basis::outcome remove_correspondence(const basis::astring &source_mapping)
takes out the "source_mapping" which was previously added.
void periodic_actions()
drops timed out transfers.
basis::outcome add_correspondence(const basis::astring &source_mapping, const basis::astring &source_root, int refresh_interval)
adds a file transfer correspondence.
basis::astring text_form() const
returns a string representing the current state of transfers.
@ COMPARE_SIZE_AND_TIME
uses size and time to see differences.
@ ONLY_REPORT_DIFFS
no actual file transfer, just reports.
@ COMPARE_CONTENT_SAMPLE
samples parts of file for comparison.
basis::outcome register_file_transfer(const octopus_entity &ent, const basis::astring &src_root, const basis::astring &dest_root, const structures::string_array &include)
records a transfer that is going to commence.
filesystem::directory_tree * lock_directory(const basis::astring &source_mapping)
provides a view of the tentacle's current state.
An infoton is an individual request parcel with accompanying information.
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Provides a way of identifying users of an octopus object.
basis::astring text_form() const
returns a readable form of the identifier.
bool blank() const
true if the entity is blank, as constructed by default constructor.
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
provides prefab implementations for parts of the tentacle object.
bool store_product(infoton *product, const octopus_request_id &original_id)
used by tentacles to store the objects they produce from infotons.
@ NO_HANDLER
no handler for that type of infoton.
Informs the caller that a request type was unknown to the server octopus.
Provides a platform-independent object for adding threads to a program.
void reset()
cleans out all of the contents.
An array of strings with some additional helpful methods.
virtual bool unpack(basis::byte_array &packed_form)
Unpacks a string array from the "packed_form" byte array.
Represents a point in time relative to the operating system startup time.
basis::astring text_form(stamp_display_style style=STAMP_RELATIVE) const
returns a simple textual representation of the time_stamp.
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
#define NULL_POINTER
The value representing a pointer to nothing.
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
The guards collection helps in testing preconditions and reporting errors.
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
const int SECOND_ms
Number of milliseconds in a second.
const int MINUTE_ms
Number of milliseconds in a minute.
A platform independent way to obtain the timestamp of a file.
A logger that sends to the console screen using the standard output device.
basis::outcome reconstituter(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed, contents *formal(junk))
reconstituter should work for most infotons to restore flattened infotons.
const int TRANSFER_TIMEOUT
const int FTT_CLEANING_INTERVAL
A dynamic container class that holds any kind of object via pointers.