feisty meow concerns codebase  2.140
file_transfer_tentacle.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : file_transfer_tentacle *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2005-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 #include "file_transfer_tentacle.h"
16 
17 #include <basis/mutex.h>
19 #include <filesystem/filename.h>
23 #include <octopus/entity_defs.h>
26 #include <processes/ethread.h>
27 #include <textual/parser_bits.h>
28 
29 using namespace basis;
30 using namespace filesystem;
31 using namespace loggers;
32 using namespace octopi;
33 using namespace processes;
34 using namespace structures;
35 using namespace textual;
36 using namespace timely;
37 
38 namespace octopi {
39 
40 #undef AUTO_LOCK
41 #define AUTO_LOCK auto_synchronizer loc(*_lock);
42  // protects our lists.
43 
45  // this is how frequently we clean up the list to remove outdated transfers.
46 
47 const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
48  // if it hasn't been touched in this long, it's out of there.
49 
50 //#define DEBUG_FILE_TRANSFER_TENTACLE
51  // uncomment for noisier version.
52 
53 #undef LOG
54 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
55 
57 
58 class file_transfer_record
59 {
60 public:
61  // valid for both transfers and correspondences.
62  astring _src_root; // where the info is on the data provider.
63  time_stamp _last_active; // when this was last used.
64 
65  // valid for file transfers only.
66  octopus_entity _ent; // the entity requesting this service.
67  astring _dest_root; // where the info is on the data sink.
68  filename_list *_diffs; // the differences to be transferred.
69  file_transfer_header _last_sent; // the last chunk that was sent.
70  bool _done; // true if the transfer is finished.
71  string_array _includes; // the set to include.
72 
73  // valid for correspondence records only.
74  directory_tree *_local_dir; // our local information about the transfer.
75  astring _source_mapping; // valid for a correspondence record.
76  int _refresh_interval; // the rate of refreshing the source tree.
77 
78  file_transfer_record() : _diffs(NULL_POINTER), _last_sent(file_time()),
79  _done(false), _local_dir(NULL_POINTER)
80  {}
81 
82  ~file_transfer_record() {
83  WHACK(_local_dir);
84  WHACK(_diffs);
85  }
86 
87  astring text_form() const {
88  astring to_return;
89  to_return += astring("src=") + _src_root + astring(" last act=")
90  + _last_active.text_form();
91  if (_ent.blank()) to_return += astring(" ent=") + _ent.text_form();
92  if (_dest_root.t()) {
93  to_return += astring(" dest=") + _dest_root;
94  to_return += astring(" last_sent=") + _last_sent.text_form();
95  }
96  return to_return;
97  }
98 };
99 
101 
102 // this implementation assumes that the same entity will never simultaneously
103 // transfer the same source to the same destination. that assumption holds
104 // up fine for different clients, since they should have different entities.
105 // when there is a collision on the entity/src/dest, then the default action
106 // is to assume that the transfer is just being started over.
107 
108 class file_transfer_status : public amorph<file_transfer_record>
109 {
110 public:
111  // find a transfer record by the key fields.
112  file_transfer_record *find(const octopus_entity &ent, const astring &src,
113  const astring &dest) {
114  for (int i = 0; i < elements(); i++) {
115  const file_transfer_record *rec = get(i);
116  if (rec && (rec->_ent == ent) && (rec->_src_root == src)
117  && (rec->_dest_root == dest) ) {
118  return borrow(i);
119  }
120  }
121  return NULL_POINTER;
122  }
123 
124  virtual ~file_transfer_status() {}
125 
126  DEFINE_CLASS_NAME("file_transfer_status");
127 
128  // find a file correspondence record by the mapping name.
129  file_transfer_record *find_mapping(const astring &source_mapping) {
130  for (int i = 0; i < elements(); i++) {
131  const file_transfer_record *rec = get(i);
132  if (rec && (rec->_source_mapping == source_mapping) )
133  return borrow(i);
134  }
135  return NULL_POINTER;
136  }
137 
138  // turns a source mapping into the location that it corresponds to.
139  astring translate(const astring &source_path) const {
140  FUNCDEF("translate");
141  string_array pieces;
142  bool rooted;
143  filename(source_path).separate(rooted, pieces);
144  astring source_mapping = pieces[0];
145  pieces.zap(0, 0); // remove source part.
146 
147  for (int i = 0; i < elements(); i++) {
148  const file_transfer_record *rec = get(i);
149  if (rec && (rec->_source_mapping == source_mapping) ) {
150  return rec->_src_root;
151  }
152  }
153  return astring::empty_string();
154  }
155 
156  // removes a file transfer record by the key fields.
157  bool whack(const octopus_entity &ent, const astring &src,
158  const astring &dest) {
159  for (int i = 0; i < elements(); i++) {
160  const file_transfer_record *rec = get(i);
161  if (rec && (rec->_ent == ent) && (rec->_src_root == src)
162  && (rec->_dest_root == dest) ) {
163  zap(i, i);
164  return true;
165  }
166  }
167  return false;
168  }
169 
170  // clean all records for the entity "ent".
171  void whack_all(const octopus_entity &ent) {
172  for (int i = elements() - 1; i >= 0; i--) {
173  const file_transfer_record *rec = get(i);
174  if (rec && (rec->_ent == ent) )
175  zap(i, i);
176  }
177  }
178 
179  // removes a file transfer correspondence.
180  bool whack_mapping(const astring &source_mapping) {
181  for (int i = elements() - 1; i >= 0; i--) {
182  const file_transfer_record *rec = get(i);
183  if (rec && (rec->_source_mapping == source_mapping) ) {
184  zap(i, i);
185  return true;
186  }
187  }
188  return false;
189  }
190 
191  // returns a string dump of the fields in this list.
192  astring text_form() const {
193  astring to_return;
194  for (int i = 0; i < elements(); i++) {
195  const file_transfer_record *rec = get(i);
196  if (rec)
197  to_return += rec->text_form() + parser_bits::platform_eol_to_chars();
198  }
199  return to_return;
200  }
201 };
202 
204 
205 class file_transfer_cleaner : public ethread
206 {
207 public:
208  file_transfer_cleaner(file_transfer_tentacle &parent)
209  : ethread(FTT_CLEANING_INTERVAL, SLACK_INTERVAL), _parent(parent) {}
210 
211  virtual void perform_activity(void *formal(ptr)) { _parent.periodic_actions(); }
212 
213 private:
214  file_transfer_tentacle &_parent;
215 };
216 
218 
219 file_transfer_tentacle::file_transfer_tentacle(int maximum_transfer,
222  (file_transfer_infoton::file_transfer_classifier(), false),
223  _maximum_transfer(maximum_transfer),
224  _transfers(new file_transfer_status),
225  _correspondences(new file_transfer_status),
226  _lock(new mutex),
227  _cleaner(new file_transfer_cleaner(*this)),
228  _mode(mode_of_transfer)
229 {
230  _cleaner->start(NULL_POINTER);
231 }
232 
234 {
235  _cleaner->stop();
236  WHACK(_transfers);
237  WHACK(_correspondences);
238  WHACK(_cleaner);
239  WHACK(_lock);
240 }
241 
243 {
244  AUTO_LOCK;
245  return _transfers->text_form();
246 }
247 
249 {
250  AUTO_LOCK;
251  _transfers->whack_all(to_remove);
252 }
253 
255  (const astring &source_mapping, const astring &source_root,
256  int refresh_interval)
257 {
258  FUNCDEF("add_correspondence");
259  AUTO_LOCK;
260 
261  remove_correspondence(source_mapping); // clean the old one out first.
262 
263  // create new file transfer record to hold this correspondence.
264  file_transfer_record *new_record = new file_transfer_record;
265  new_record->_source_mapping = source_mapping;
266  new_record->_src_root = source_root;
267  new_record->_refresh_interval = refresh_interval;
268  new_record->_local_dir = new directory_tree(source_root);
269 //hmmm: doesn't say anything about a pattern. do we need to worry about that?
270 
271  // check that the directory looked healthy.
272  if (!new_record->_local_dir->good()) {
273  WHACK(new_record);
274  return common::ACCESS_DENIED;
275  }
276 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
277  LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
278  + " src=" + new_record->_src_root);
279 #endif
280  // calculate size and checksum info for the directory.
281  new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
282 
283 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
284  LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
285  + " src=" + new_record->_src_root);
286 #endif
287 
288  _correspondences->append(new_record);
289 
290  return OKAY;
291 }
292 
294  (const astring &source_mapping)
295 {
296  AUTO_LOCK;
297  if (!_correspondences->whack_mapping(source_mapping))
298  return NOT_FOUND;
299  return OKAY;
300 }
301 
303  const astring &src, const astring &dest, filename_list &diffs)
304 {
305  FUNCDEF("get_differences");
306  diffs.reset();
307  AUTO_LOCK;
308  file_transfer_record *the_rec = _transfers->find(ent, src, dest);
309  if (!the_rec) return false;
310  if (!the_rec->_diffs) return false; // no diffs listed.
311  diffs = *the_rec->_diffs;
312  return true;
313 }
314 
316  const astring &src, const astring &dest, double &total_size,
317  int &total_files, double &current_size, int &current_files, bool &done,
318  time_stamp &last_active)
319 {
320  FUNCDEF("status");
321  total_size = 0;
322  total_files = 0;
323  current_files = 0;
324  current_size = 0;
325  AUTO_LOCK;
326  file_transfer_record *the_rec = _transfers->find(ent, src, dest);
327  if (!the_rec) return false;
328  done = the_rec->_done;
329  last_active = the_rec->_last_active;
330 
331  if (the_rec->_diffs) {
332  the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
333  the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
334  current_files, current_size);
335  total_files = the_rec->_diffs->total_files();
336  total_size = the_rec->_diffs->total_size();
337  }
338 
339  return true;
340 }
341 
343  (const octopus_entity &ent, const astring &src_root,
344  const astring &dest_root, const string_array &includes)
345 {
346  FUNCDEF("register_file_transfer");
347  AUTO_LOCK;
348  // make sure that this isn't an existing transfer. if so, we just update
349  // the status.
350  file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
351  if (!the_rec) {
352  the_rec = new file_transfer_record;
353  the_rec->_src_root = src_root;
354  the_rec->_dest_root = dest_root;
355  the_rec->_ent = ent;
356  the_rec->_includes = includes;
357  _transfers->append(the_rec); // add the new record.
358  } else {
359  the_rec->_done = false;
360  the_rec->_includes = includes;
361  the_rec->_last_active.reset(); // freshen up the last activity time.
362  }
363  return OKAY;
364 }
365 
367  const astring &src_root, const astring &dest_root)
368 {
369  AUTO_LOCK;
370  return _transfers->whack(ent, src_root, dest_root)? OKAY : NOT_FOUND;
371 }
372 
374 {
375  _lock->lock();
376  file_transfer_record *the_rec = _correspondences->find_mapping(key);
377  if (!the_rec || !the_rec->_local_dir) {
378  _lock->unlock();
379  return NULL_POINTER; // unknown transfer.
380  }
381  return the_rec->_local_dir;
382 }
383 
385 {
386  _lock->unlock();
387 }
388 
390  const astring &new_path)
391 {
392  AUTO_LOCK;
393  file_transfer_record *the_rec = _correspondences->find_mapping(key);
394  if (!the_rec) return false; // unknown transfer.
395  if (!the_rec->_local_dir) return false; // not right type.
396  return the_rec->_local_dir->add_path(new_path) == common::OKAY;
397 }
398 
400  const astring &old_path)
401 {
402  AUTO_LOCK;
403  file_transfer_record *the_rec = _correspondences->find_mapping(key);
404  if (!the_rec) return false; // unknown transfer.
405  if (!the_rec->_local_dir) return false; // not right type.
406  return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
407 }
408 
410 {
411  FUNCDEF("periodic_actions");
412  AUTO_LOCK;
413 
414  // first, we'll clean out old transfers.
415  time_stamp oldest_allowed(-TRANSFER_TIMEOUT);
416  // nothing older than this should be kept.
417  for (int i = _transfers->elements() - 1; i >= 0; i--) {
418  const file_transfer_record *curr = _transfers->get(i);
419  if (curr->_last_active < oldest_allowed) {
420 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
421  LOG(astring("cleaning record for: ent=") + curr->_ent.text_form()
422  + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
423 #endif
424  _transfers->zap(i, i);
425  }
426  }
427 
428  // then we'll rescan any trees that are ready for it.
429  for (int i = 0; i < _correspondences->elements(); i++) {
430  file_transfer_record *curr = _correspondences->borrow(i);
431  if (curr->_last_active < time_stamp(-curr->_refresh_interval)) {
432  if (curr->_local_dir) {
433 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
434  LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
435  + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
436 #endif
437  WHACK(curr->_local_dir);
438  curr->_local_dir = new directory_tree(curr->_src_root);
439  curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
440 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
441  LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
442  + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
443 #endif
444  }
445  curr->_last_active.reset(); // reset our action time.
446  }
447  }
448 }
449 
451  byte_array &packed_form, infoton * &reformed)
452 {
453  // this method doesn't use the lists, so it doesn't need locking.
455  return NO_HANDLER;
456  return reconstituter(classifier, packed_form, reformed,
458 }
459 
460 // the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
461 // their invocations.
462 
463 basis::outcome file_transfer_tentacle::handle_build_target_tree_request(file_transfer_infoton &req,
464  const octopus_request_id &item_id)
465 {
466  FUNCDEF("handle_build_target_tree_request");
467 
468  // get the mapping from the specified location on this side.
469  filename splitting(req._src_root);
470  string_array pieces;
471  bool rooted;
472  splitting.separate(rooted, pieces);
473  astring source_mapping = pieces[0];
474 
475  // patch the name up to find the sub_path for the source.
476  filename source_start;
477  pieces.zap(0, 0);
478  source_start.join(rooted, pieces);
479 
480  // locate the allowed transfer depot for the mapping they provided.
481  file_transfer_record *mapping_record = _correspondences->find_mapping(source_mapping);
482  if (!mapping_record) {
483  LOG(astring("could not find source mapping of ") + source_mapping);
484  return NOT_FOUND;
485  }
486 
487  // unpack the tree that they sent us which describes their local area.
488  directory_tree *dest_tree = new directory_tree;
489  if (!dest_tree->unpack(req._packed_data)) {
490  LOG(astring("could not unpack requester's directory tree"));
491  WHACK(dest_tree);
492  return GARBAGE;
493  }
494 
495  string_array requested_names;
496  if (!requested_names.unpack(req._packed_data)) {
497  LOG(astring("could not unpack requester's filename includes"));
498  WHACK(dest_tree);
499  return GARBAGE;
500  }
501 
502  // look up to see if this is about something that has already been seen.
503  // we don't want to add a new transfer record if they're already working on
504  // this. that also lets them do a new tree compare to restart the transfer.
505  file_transfer_record *the_rec = _transfers->find(item_id._entity,
506  req._src_root, req._dest_root);
507  if (!the_rec) {
508  // there was no existing record; we'll create a new one.
509  the_rec = new file_transfer_record;
510  the_rec->_ent = item_id._entity;
511  the_rec->_src_root = req._src_root;
512  the_rec->_dest_root = req._dest_root;
513  _transfers->append(the_rec);
514  } else {
515  // record some activity on this record.
516  the_rec->_done = false;
517  the_rec->_last_active.reset();
518  }
519 
520  // create any directories that are missing at this point.
521  basis::outcome result = dest_tree->make_directories(req._dest_root);
522  if (result != common::OKAY) {
523  LOG("ERROR: got bad result from make_directories!");
524  }
525 
526  req._packed_data.reset(); // clear out existing stuff before cloning.
527  file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
528 
529  reply->_request = false; // it's a response now.
530  reply->_success = result;
531  store_product(reply, item_id);
532  // send back the comparison list.
533 
534  return OKAY;
535 }
536 
537 basis::outcome file_transfer_tentacle::handle_build_target_tree_response(file_transfer_infoton &resp,
538  const octopus_request_id &item_id)
539 {
540 //go to next step, tree comparison.
541 //look at the handle tree compare response for help though.
542  FUNCDEF("handle_build_target_tree_response");
543  file_transfer_record *the_rec = _transfers->find(item_id._entity,
544  resp._src_root, resp._dest_root);
545  if (!the_rec) {
546  LOG(astring("could not find the record for this transfer: item=")
547  + item_id.text_form() + " src=" + resp._src_root + " dest="
548  + resp._dest_root);
549  return NOT_FOUND; // not registered, so reject it.
550  }
551 
552  the_rec->_last_active.reset(); // record some activity on this record.
553 
554  return resp._success;
555 }
556 
557 outcome file_transfer_tentacle::handle_tree_compare_request
558  (file_transfer_infoton &req, const octopus_request_id &item_id)
559 {
560  FUNCDEF("handle_tree_compare_request");
561 
562  // get the mapping from the specified location on this side.
563  filename splitting(req._src_root);
564  string_array pieces;
565  bool rooted;
566  splitting.separate(rooted, pieces);
567  astring source_mapping = pieces[0];
568 
569  // patch the name up to find the sub_path for the source.
570  filename source_start;
571  pieces.zap(0, 0);
572  source_start.join(rooted, pieces);
573 
574  // locate the allowed transfer depot for the mapping they provided.
575  file_transfer_record *mapping_record
576  = _correspondences->find_mapping(source_mapping);
577  if (!mapping_record) {
578  LOG(astring("could not find source mapping of ") + source_mapping);
579  return NOT_FOUND;
580  }
581 
582  // unpack the tree that they sent us which describes their local area.
583  directory_tree *dest_tree = new directory_tree;
584  if (!dest_tree->unpack(req._packed_data)) {
585  LOG(astring("could not unpack requester's directory tree"));
586  WHACK(dest_tree);
587  return GARBAGE;
588  }
589 
590  string_array requested_names;
591  if (!requested_names.unpack(req._packed_data)) {
592  LOG(astring("could not unpack requester's filename includes"));
593  WHACK(dest_tree);
594  return GARBAGE;
595  }
596 
597  // look up to see if this is about something that has already been seen.
598  // we don't want to add a new transfer record if they're already working on
599  // this. that also lets them do a new tree compare to restart the transfer.
600  file_transfer_record *the_rec = _transfers->find(item_id._entity,
601  req._src_root, req._dest_root);
602  if (!the_rec) {
603  // there was no existing record; we'll create a new one.
604  the_rec = new file_transfer_record;
605  the_rec->_ent = item_id._entity;
606  the_rec->_src_root = req._src_root;
607  the_rec->_dest_root = req._dest_root;
608  _transfers->append(the_rec);
609  } else {
610  // record some activity on this record.
611  the_rec->_done = false;
612  the_rec->_last_active.reset();
613  }
614 
615  the_rec->_diffs = new filename_list;
616 
617  int how_comp = file_info::EQUAL_NAME; // the prize for doing nothing.
618  if (_mode & COMPARE_SIZE_AND_TIME)
619  how_comp |= file_info::EQUAL_FILESIZE | file_info::EQUAL_TIMESTAMP;
620  if (_mode & COMPARE_CONTENT_SAMPLE)
621  how_comp |= file_info::EQUAL_CHECKSUM;
622 
623  // compare the two trees of files.
624  directory_tree::compare_trees(*mapping_record->_local_dir,
625  source_start.raw(), *dest_tree, astring::empty_string(),
626  *the_rec->_diffs, (file_info::file_similarity)how_comp);
627 
628 //LOG(astring("filenames decided as different:\n") + the_rec->_diffs->text_form());
629 
630  // now prune the diffs to accord with what they claim they want.
631  if (requested_names.length()) {
632  for (int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
633  filename diff_curr = *the_rec->_diffs->get(i);
634  bool found = false;
635  for (int j = 0; j < requested_names.length(); j++) {
636  filename req_curr(requested_names[j]);
637  if (req_curr.compare_suffix(diff_curr)) {
638  found = true;
639  break;
640  }
641  }
642  if (!found) the_rec->_diffs->zap(i, i);
643  }
644  }
645 
646  req._packed_data.reset(); // clear out existing stuff before cloning.
647  file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
648  the_rec->_diffs->pack(reply->_packed_data);
649 
650 //hmmm: does the other side really need the list of filenames? i guess we
651 // could check validity of what's transferred or check space available
652 // before the client starts the transfer.
653 
654  reply->_request = false; // it's a response now.
655  store_product(reply, item_id);
656  // send back the comparison list.
657 
658  return OKAY;
659 }
660 
661 outcome file_transfer_tentacle::handle_tree_compare_response
662  (file_transfer_infoton &resp, const octopus_request_id &item_id)
663 {
664  FUNCDEF("handle_tree_compare_response");
665  file_transfer_record *the_rec = _transfers->find(item_id._entity,
666  resp._src_root, resp._dest_root);
667  if (!the_rec) {
668  LOG(astring("could not find the record for this transfer: item=")
669  + item_id.text_form() + " src=" + resp._src_root + " dest="
670  + resp._dest_root);
671  return NOT_FOUND; // not registered, so reject it.
672  }
673 
674  the_rec->_last_active.reset(); // record some activity on this record.
675 
676  filename_list *flist = new filename_list;
677  if (!flist->unpack(resp._packed_data)) {
678  WHACK(flist);
679  return GARBAGE;
680  }
681 
682 //hmmm: verify space on device?
683 
684  the_rec->_diffs = flist; // set the list of differences.
685  return OKAY;
686 }
687 
688 outcome file_transfer_tentacle::handle_storage_request
689  (file_transfer_infoton &req, const octopus_request_id &item_id)
690 {
691  FUNCDEF("handle_storage_request");
692  if (_mode & ONLY_REPORT_DIFFS) {
693  // store an unhandled infoton.
694  unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
695  store_product(deny, item_id);
696  return NO_HANDLER;
697  }
698 
699  // look up the transfer record.
700  file_transfer_record *the_rec = _transfers->find(item_id._entity,
701  req._src_root, req._dest_root);
702  if (!the_rec) {
703  LOG(astring("could not find the record for this transfer: item=")
704  + item_id.text_form() + " src=" + req._src_root + " dest="
705  + req._dest_root);
706  return NOT_FOUND; // not registered, so reject it.
707  }
708 
709  the_rec->_last_active.reset(); // mark it as still active.
710 
711  file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
712 
713  if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
714 
715  outcome bufret = heavy_file_operations::buffer_files
716  (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
717  the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
718  if (bufret == heavy_file_operations::FINISHED) {
719  bufret = OKAY; // in either case, we don't emit a finished outcome; handled elsewhere.
720  if (!resp->_packed_data.length()) {
721  // blank packages, so finish by setting command to be a conclude marker.
722  the_rec->_done = true;
724  }
725  } else if (bufret != OKAY) {
726  // complain, but still send.
727  LOG(astring("buffer files returned an error on item=")
728  + item_id.text_form() + " src=" + req._src_root + " dest="
729  + req._dest_root);
730  }
731 
732 //can remove this block if stops saying it.
733  if ((bufret == OKAY) && !resp->_packed_data.length() ) {
734  LOG("marking empty transfer as done; why not caught above at FINISHED check?");
735  the_rec->_done = true;
737  }
738 //end of can remove.
739 
740  resp->_request = false; // it's a response now.
741  store_product(resp, item_id);
742  return bufret;
743 }
744 
745 outcome file_transfer_tentacle::handle_storage_response
746  (file_transfer_infoton &resp, const octopus_request_id &item_id)
747 {
748  FUNCDEF("handle_storage_response");
749  if (_mode & ONLY_REPORT_DIFFS) {
750  // not spoken here.
751  return NO_HANDLER;
752  }
753 
754  // look up the transfer record.
755  file_transfer_record *the_rec = _transfers->find(item_id._entity,
756  resp._src_root, resp._dest_root);
757  if (!the_rec) return NOT_FOUND; // not registered, so reject it.
758 
759  the_rec->_last_active.reset(); // mark it as still active.
760 
761  if (!resp._packed_data.length()) {
762  // mark that we're done now.
763  the_rec->_done = true;
764  }
765 
766  // chew on all the things they sent us.
767  while (resp._packed_data.length()) {
768  file_time empty;
769  file_transfer_header found(empty);
770  if (!found.unpack(resp._packed_data)) {
771  // bomb out now.
772  LOG(astring("corruption seen on item=") + item_id.text_form()
773  + " src=" + resp._src_root + " dest=" + resp._dest_root);
774  return GARBAGE;
775  }
776  the_rec->_last_sent = found;
777 
778  if (found._length > resp._packed_data.length()) {
779  // another case for leaving--not enough data left in the buffer.
780  LOG(astring("data underflow seen on item=") + item_id.text_form()
781  + " src=" + resp._src_root + " dest=" + resp._dest_root);
782  return GARBAGE;
783  }
784  byte_array to_write = resp._packed_data.subarray(0, found._length - 1);
785  resp._packed_data.zap(0, found._length - 1);
786 
787  if (!the_rec->_diffs) return BAD_INPUT;
788 
789  const file_info *recorded_info = the_rec->_diffs->find(found._filename);
790  if (!recorded_info) {
791  LOG(astring("unrequested file seen: ") + found._filename);
792  continue; // maybe there are others that aren't confused.
793  }
794 
795  astring full_file = resp._dest_root + filename::default_separator()
796  + recorded_info->secondary();
797 // LOG(astring("telling it to write to fullfile: ") + full_file);
798 
799  outcome ret = heavy_file_operations::write_file_chunk(full_file,
800  found._byte_start, to_write);
801  if (ret != OKAY) {
802  LOG(astring("failed to write file chunk: error=")
803  + heavy_file_operations::outcome_name(ret) + " file=" + full_file
804  + a_sprintf(" start=%d len=%d", found._byte_start, found._length));
805  }
806  found._time.set_time(full_file);
807  }
808 
809  // there is no response product to store.
810  return OKAY;
811 }
812 
813 outcome file_transfer_tentacle::conclude_storage_request
814  (file_transfer_infoton &req, const octopus_request_id &item_id)
815 {
816  FUNCDEF("conclude_storage_request");
817  if (_mode & ONLY_REPORT_DIFFS) {
818  // store an unhandled infoton.
819  unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
820  store_product(deny, item_id);
821  return NO_HANDLER;
822  }
823 
824  // look up the transfer record.
825  file_transfer_record *the_rec = _transfers->find(item_id._entity,
826  req._src_root, req._dest_root);
827  if (!the_rec) {
828  LOG(astring("could not find the record for this transfer: item=")
829  + item_id.text_form() + " src=" + req._src_root + " dest="
830  + req._dest_root);
831  return NOT_FOUND; // not registered, so reject it.
832  }
833 
834  the_rec->_last_active.reset(); // mark it as still active.
835 
836  file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
837 
838  if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
839 
840  the_rec->_done = true; // we're concluding the transfer, so that's that.
841  resp->_request = false; // it's a response now.
842  store_product(resp, item_id);
843 
844  LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
845  + req._dest_root);
846 
847  return common::OKAY;
848 }
849 
850 outcome file_transfer_tentacle::conclude_storage_response
851  (file_transfer_infoton &resp, const octopus_request_id &item_id)
852 {
853  FUNCDEF("conclude_storage_response");
854  if (_mode & ONLY_REPORT_DIFFS) {
855  // not spoken here.
856  return NO_HANDLER;
857  }
858 
859  // look up the transfer record.
860  file_transfer_record *the_rec = _transfers->find(item_id._entity,
861  resp._src_root, resp._dest_root);
862  if (!the_rec) return NOT_FOUND; // not registered, so reject it.
863 
864  the_rec->_last_active.reset(); // mark it as still active.
865 
866  // mark that we're done now.
867  the_rec->_done = true;
868 
869  LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
870  + resp._dest_root);
871 
872  // there is no response product to store.
873  return OKAY;
874 }
875 
876 // consume() is the only method that is allowed to invoke the "handle_X" methods
877 // and it must lock the object beforehand.
878 
880  const octopus_request_id &item_id, byte_array &transformed)
881 {
882  FUNCDEF("consume");
883  transformed.reset();
884  file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
885  if (!inf) return DISALLOWED; // not for us.
886 
887  AUTO_LOCK; // protect our lists while we're working on them.
888 
889  switch (inf->_command) {
891  if (inf->_request) return handle_build_target_tree_request(*inf, item_id);
892  else return handle_build_target_tree_response(*inf, item_id);
893  }
895  if (inf->_request) return handle_tree_compare_request(*inf, item_id);
896  else return handle_tree_compare_response(*inf, item_id);
897  }
899  if (inf->_request) return handle_storage_request(*inf, item_id);
900  else return handle_storage_response(*inf, item_id);
901  }
903  if (inf->_request) return conclude_storage_request(*inf, item_id);
904  else return conclude_storage_response(*inf, item_id);
905  }
906  }
907  return BAD_INPUT; // not a recognized command.
908 }
909 
911 {
912  FUNCDEF("refresh_now");
913  AUTO_LOCK;
914  for (int i = 0; i < _correspondences->elements(); i++) {
915  file_transfer_record *curr = _correspondences->borrow(i);
916  if (!curr) continue;
917  if (curr->_source_mapping != source_mapping) continue;
918  if (curr->_local_dir) {
919 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
920  LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
921  + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
922 #endif
923  WHACK(curr->_local_dir);
924  curr->_local_dir = new directory_tree(curr->_src_root);
925  curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
926 #ifdef DEBUG_FILE_TRANSFER_TENTACLE
927  LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
928  + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
929 #endif
930  }
931  curr->_last_active.reset(); // reset our action time.
932  return OKAY;
933  }
934  return NOT_FOUND;
935 }
936 
937 } //namespace.
938 
939 
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
Definition: array.h:349
array subarray(int start, int end) const
Returns the array segment between the indices "start" and "end".
Definition: array.h:443
int length() const
Returns the current reported length of the allocated C array.
Definition: array.h:115
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition: array.h:769
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
bool t() const
t() is a shortcut for the string being "true", as in non-empty.
Definition: astring.h:97
virtual char get(int index) const
a constant peek at the string's internals at the specified index.
Definition: astring.cpp:138
static const astring & empty_string()
useful wherever empty strings are needed, e.g., function defaults.
Definition: astring.cpp:128
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition: astring.cpp:130
int find(char to_find, int position=0, bool reverse=false) const
Locates "to_find" in "this".
Definition: astring.cpp:574
A very common template for a dynamic array of bytes.
Definition: byte_array.h:36
void lock()
Clamps down on the mutex, if possible.
Definition: mutex.cpp:95
void unlock()
Gives up the possession of the mutex.
Definition: mutex.cpp:107
Outcomes describe the state of completion for an operation.
Definition: outcome.h:31
An object that traverses directory trees and provides a view of all files.
virtual bool unpack(basis::byte_array &packed_form)
unpacks the directory_tree from a byte_array.
basis::outcome make_directories(const basis::astring new_root)
creates all of the directories in this object, but start at the "new_root".
Encapsulates some measures and calculations based on a file's contents.
Definition: file_info.h:29
const basis::astring & secondary() const
observes the alternate form of the name.
Definition: file_info.cpp:74
file_similarity
this enum encapsulates how files may be compared.
Definition: file_info.h:32
describes one portion of an ongoing file transfer.
basis::astring text_form() const
virtual bool unpack(basis::byte_array &packed_form)
Restores the packable from the "packed_form".
Provides operations commonly needed on file names.
Definition: filename.h:64
void join(bool rooted, const structures::string_array &pieces)
undoes a separate() operation to get the filename back.
Definition: filename.cpp:503
void separate(bool &rooted, structures::string_array &pieces) const
breaks the filename into its component parts.
Definition: filename.cpp:482
const basis::astring & raw() const
returns the astring that we're holding onto for the path.
Definition: filename.cpp:97
Base objects used by the file transfer tentacle to schedule transfers.
virtual basis::clonable * clone() const
must be provided to allow creation of a copy of this object.
@ BUILD_TARGET_TREE
asks the target side to build the directory tree from the source.
@ CONCLUDE_TRANSFER_MARKER
this infoton marks the end of the transfer process.
@ PLACE_FILE_CHUNKS
the destination side requests a new set of chunks.
@ TREE_COMPARISON
the destination root will be compared with the source root.
basis::abyte _command
one of the commands above.
basis::outcome _success
reports what kind of result occurred.
static const structures::string_array & file_transfer_classifier()
returns the classifier for this type of infoton.
basis::astring _src_root
the top-level directory of the source.
bool _request
if it's not a request, then it's a response.
basis::astring _dest_root
the top-level directory of the destination.
basis::byte_array _packed_data
the packed headers and file chunks.
virtual void pack(basis::byte_array &packed_form) const
stuffs the data in the infoton into the "packed_form".
Manages the transferrence of directory trees from one place to another.
bool add_path(const basis::astring &source_mapping, const basis::astring &new_path)
inserts the "new_path" into a registered correspondence.
basis::outcome cancel_file_transfer(const octopus_entity &ent, const basis::astring &src_root, const basis::astring &dest_root)
tosses a previously registered file transfer.
void unlock_directory()
unlock MUST be called when one is done looking at the tree.
virtual void expunge(const octopus_entity &to_remove)
throws out any transfers occurring for the entity "to_remove".
bool status(const octopus_entity &ent, const basis::astring &src, const basis::astring &dest, double &total_size, int &total_files, double &current_size, int &current_files, bool &done, timely::time_stamp &last_active)
locates the transfer specified and returns information about it.
basis::outcome refresh_now(const basis::astring &source_mapping)
refreshes the "source_mapping" right now, regardless of the interval.
virtual basis::outcome reconstitute(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)
recreates a "reformed" infoton from its packed form.
bool remove_path(const basis::astring &source_mapping, const basis::astring &old_path)
deletes the "old_path" out of an existing correspondence.
bool get_differences(const octopus_entity &ent, const basis::astring &src, const basis::astring &dest, filesystem::filename_list &diffs)
accesses the list of difference for an ongoing transfer.
virtual basis::outcome consume(infoton &to_chow, const octopus_request_id &item_id, basis::byte_array &transformed)
processes the "to_chow" infoton as a file transfer request.
basis::outcome remove_correspondence(const basis::astring &source_mapping)
takes out the "source_mapping" which was previously added.
void periodic_actions()
drops timed out transfers.
basis::outcome add_correspondence(const basis::astring &source_mapping, const basis::astring &source_root, int refresh_interval)
adds a file transfer correspondence.
basis::astring text_form() const
returns a string representing the current state of transfers.
@ COMPARE_SIZE_AND_TIME
uses size and time to see differences.
@ ONLY_REPORT_DIFFS
no actual file transfer, just reports.
@ COMPARE_CONTENT_SAMPLE
samples parts of file for comparison.
basis::outcome register_file_transfer(const octopus_entity &ent, const basis::astring &src_root, const basis::astring &dest_root, const structures::string_array &include)
records a transfer that is going to commence.
filesystem::directory_tree * lock_directory(const basis::astring &source_mapping)
provides a view of the tentacle's current state.
An infoton is an individual request parcel with accompanying information.
Definition: infoton.h:32
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition: infoton.cpp:85
Provides a way of identifying users of an octopus object.
Definition: entity_defs.h:35
basis::astring text_form() const
returns a readable form of the identifier.
bool blank() const
true if the entity is blank, as constructed by default constructor.
Definition: entity_defs.cpp:99
Identifies requests made on an octopus by users.
Definition: entity_defs.h:114
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
Definition: entity_defs.h:116
provides prefab implementations for parts of the tentacle object.
bool store_product(infoton *product, const octopus_request_id &original_id)
used by tentacles to store the objects they produce from infotons.
Definition: tentacle.cpp:118
@ NO_HANDLER
no handler for that type of infoton.
Definition: tentacle.h:73
Informs the caller that a request type was unknown to the server octopus.
Provides a platform-independent object for adding threads to a program.
Definition: ethread.h:36
void reset()
cleans out all of the contents.
Definition: amorph.h:81
An array of strings with some additional helpful methods.
Definition: string_array.h:32
virtual bool unpack(basis::byte_array &packed_form)
Unpacks a string array from the "packed_form" byte array.
Definition: string_array.h:111
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
basis::astring text_form(stamp_display_style style=STAMP_RELATIVE) const
returns a simple textual representation of the time_stamp.
Definition: time_stamp.cpp:61
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition: definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition: enhance_cpp.h:45
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:57
#define LOG(s)
#define AUTO_LOCK
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
const int SECOND_ms
Number of milliseconds in a second.
Definition: definitions.h:120
const int MINUTE_ms
Number of milliseconds in a minute.
Definition: definitions.h:121
A platform independent way to obtain the timestamp of a file.
Definition: byte_filer.cpp:37
A logger that sends to the console screen using the standard output device.
basis::outcome reconstituter(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed, contents *formal(junk))
< reconstituter should work for most infotons to restore flattened infotons.
const int TRANSFER_TIMEOUT
const int FTT_CLEANING_INTERVAL
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
#include <time.h>
Definition: earth_time.cpp:37