feisty meow concerns codebase 2.140
file_transfer_tentacle.cpp
Go to the documentation of this file.
1/*****************************************************************************\
2* *
3* Name : file_transfer_tentacle *
4* Author : Chris Koeritz *
5* *
6*******************************************************************************
7* Copyright (c) 2005-$now By Author. This program is free software; you can *
8* redistribute it and/or modify it under the terms of the GNU General Public *
9* License as published by the Free Software Foundation; either version 2 of *
10* the License or (at your option) any later version. This is online at: *
11* http://www.fsf.org/copyleft/gpl.html *
12* Please send any updates to: fred@gruntose.com *
13\*****************************************************************************/
14
16
17#include <basis/mutex.h>
19#include <filesystem/filename.h>
23#include <octopus/entity_defs.h>
26#include <processes/ethread.h>
27#include <textual/parser_bits.h>
28
29using namespace basis;
30using namespace filesystem;
31using namespace loggers;
32using namespace octopi;
33using namespace processes;
34using namespace structures;
35using namespace textual;
36using namespace timely;
37
38namespace octopi {
39
40#undef AUTO_LOCK
41#define AUTO_LOCK auto_synchronizer loc(*_lock);
42 // protects our lists.
43
45 // this is how frequently we clean up the list to remove outdated transfers.
46
47const int TRANSFER_TIMEOUT = 10 * MINUTE_ms;
48 // if it hasn't been touched in this long, it's out of there.
49
50//#define DEBUG_FILE_TRANSFER_TENTACLE
51 // uncomment for noisier version.
52
53#undef LOG
54#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
55
57
58class file_transfer_record
59{
60public:
61 // valid for both transfers and correspondences.
62 astring _src_root; // where the info is on the data provider.
63 time_stamp _last_active; // when this was last used.
64
65 // valid for file transfers only.
66 octopus_entity _ent; // the entity requesting this service.
67 astring _dest_root; // where the info is on the data sink.
68 filename_list *_diffs; // the differences to be transferred.
69 file_transfer_header _last_sent; // the last chunk that was sent.
70 bool _done; // true if the transfer is finished.
71 string_array _includes; // the set to include.
72
73 // valid for correspondence records only.
74 directory_tree *_local_dir; // our local information about the transfer.
75 astring _source_mapping; // valid for a correspondence record.
76 int _refresh_interval; // the rate of refreshing the source tree.
77
78 file_transfer_record() : _diffs(NULL_POINTER), _last_sent(file_time()),
79 _done(false), _local_dir(NULL_POINTER)
80 {}
81
82 ~file_transfer_record() {
83 WHACK(_local_dir);
84 WHACK(_diffs);
85 }
86
87 astring text_form() const {
88 astring to_return;
89 to_return += astring("src=") + _src_root + astring(" last act=")
90 + _last_active.text_form();
91 if (_ent.blank()) to_return += astring(" ent=") + _ent.text_form();
92 if (_dest_root.t()) {
93 to_return += astring(" dest=") + _dest_root;
94 to_return += astring(" last_sent=") + _last_sent.text_form();
95 }
96 return to_return;
97 }
98};
99
101
102// this implementation assumes that the same entity will never simultaneously
103// transfer the same source to the same destination. that assumption holds
104// up fine for different clients, since they should have different entities.
105// when there is a collision on the entity/src/dest, then the default action
106// is to assume that the transfer is just being started over.
107
108class file_transfer_status : public amorph<file_transfer_record>
109{
110public:
111 // find a transfer record by the key fields.
112 file_transfer_record *find(const octopus_entity &ent, const astring &src,
113 const astring &dest) {
114 for (int i = 0; i < elements(); i++) {
115 const file_transfer_record *rec = get(i);
116 if (rec && (rec->_ent == ent) && (rec->_src_root == src)
117 && (rec->_dest_root == dest) ) {
118 return borrow(i);
119 }
120 }
121 return NULL_POINTER;
122 }
123
124 virtual ~file_transfer_status() {}
125
126 DEFINE_CLASS_NAME("file_transfer_status");
127
128 // find a file correspondence record by the mapping name.
129 file_transfer_record *find_mapping(const astring &source_mapping) {
130 for (int i = 0; i < elements(); i++) {
131 const file_transfer_record *rec = get(i);
132 if (rec && (rec->_source_mapping == source_mapping) )
133 return borrow(i);
134 }
135 return NULL_POINTER;
136 }
137
138 // turns a source mapping into the location that it corresponds to.
139 astring translate(const astring &source_path) const {
140 FUNCDEF("translate");
141 string_array pieces;
142 bool rooted;
143 filename(source_path).separate(rooted, pieces);
144 astring source_mapping = pieces[0];
145 pieces.zap(0, 0); // remove source part.
146
147 for (int i = 0; i < elements(); i++) {
148 const file_transfer_record *rec = get(i);
149 if (rec && (rec->_source_mapping == source_mapping) ) {
150 return rec->_src_root;
151 }
152 }
153 return astring::empty_string();
154 }
155
156 // removes a file transfer record by the key fields.
157 bool whack(const octopus_entity &ent, const astring &src,
158 const astring &dest) {
159 for (int i = 0; i < elements(); i++) {
160 const file_transfer_record *rec = get(i);
161 if (rec && (rec->_ent == ent) && (rec->_src_root == src)
162 && (rec->_dest_root == dest) ) {
163 zap(i, i);
164 return true;
165 }
166 }
167 return false;
168 }
169
170 // clean all records for the entity "ent".
171 void whack_all(const octopus_entity &ent) {
172 for (int i = elements() - 1; i >= 0; i--) {
173 const file_transfer_record *rec = get(i);
174 if (rec && (rec->_ent == ent) )
175 zap(i, i);
176 }
177 }
178
179 // removes a file transfer correspondence.
180 bool whack_mapping(const astring &source_mapping) {
181 for (int i = elements() - 1; i >= 0; i--) {
182 const file_transfer_record *rec = get(i);
183 if (rec && (rec->_source_mapping == source_mapping) ) {
184 zap(i, i);
185 return true;
186 }
187 }
188 return false;
189 }
190
191 // returns a string dump of the fields in this list.
192 astring text_form() const {
193 astring to_return;
194 for (int i = 0; i < elements(); i++) {
195 const file_transfer_record *rec = get(i);
196 if (rec)
197 to_return += rec->text_form() + parser_bits::platform_eol_to_chars();
198 }
199 return to_return;
200 }
201};
202
204
205class file_transfer_cleaner : public ethread
206{
207public:
208 file_transfer_cleaner(file_transfer_tentacle &parent)
209 : ethread(FTT_CLEANING_INTERVAL, SLACK_INTERVAL), _parent(parent) {}
210
211 virtual void perform_activity(void *formal(ptr)) { _parent.periodic_actions(); }
212
213private:
214 file_transfer_tentacle &_parent;
215};
216
218
222 (file_transfer_infoton::file_transfer_classifier(), false),
223 _maximum_transfer(maximum_transfer),
224 _transfers(new file_transfer_status),
225 _correspondences(new file_transfer_status),
226 _lock(new mutex),
227 _cleaner(new file_transfer_cleaner(*this)),
228 _mode(mode_of_transfer)
229{
230 _cleaner->start(NULL_POINTER);
231}
232
234{
235 _cleaner->stop();
236 WHACK(_transfers);
237 WHACK(_correspondences);
238 WHACK(_cleaner);
239 WHACK(_lock);
240}
241
243{
244 AUTO_LOCK;
245 return _transfers->text_form();
246}
247
249{
250 AUTO_LOCK;
251 _transfers->whack_all(to_remove);
252}
253
255 (const astring &source_mapping, const astring &source_root,
256 int refresh_interval)
257{
258 FUNCDEF("add_correspondence");
259 AUTO_LOCK;
260
261 remove_correspondence(source_mapping); // clean the old one out first.
262
263 // create new file transfer record to hold this correspondence.
264 file_transfer_record *new_record = new file_transfer_record;
265 new_record->_source_mapping = source_mapping;
266 new_record->_src_root = source_root;
267 new_record->_refresh_interval = refresh_interval;
268 new_record->_local_dir = new directory_tree(source_root);
269//hmmm: doesn't say anything about a pattern. do we need to worry about that?
270
271 // check that the directory looked healthy.
272 if (!new_record->_local_dir->good()) {
273 WHACK(new_record);
274 return common::ACCESS_DENIED;
275 }
276#ifdef DEBUG_FILE_TRANSFER_TENTACLE
277 LOG(astring("adding tree for: ent=") + new_record->_ent.text_form()
278 + " src=" + new_record->_src_root);
279#endif
280 // calculate size and checksum info for the directory.
281 new_record->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
282
283#ifdef DEBUG_FILE_TRANSFER_TENTACLE
284 LOG(astring("done adding tree for: ent=") + new_record->_ent.text_form()
285 + " src=" + new_record->_src_root);
286#endif
287
288 _correspondences->append(new_record);
289
290 return OKAY;
291}
292
294 (const astring &source_mapping)
295{
296 AUTO_LOCK;
297 if (!_correspondences->whack_mapping(source_mapping))
298 return NOT_FOUND;
299 return OKAY;
300}
301
303 const astring &src, const astring &dest, filename_list &diffs)
304{
305 FUNCDEF("get_differences");
306 diffs.reset();
307 AUTO_LOCK;
308 file_transfer_record *the_rec = _transfers->find(ent, src, dest);
309 if (!the_rec) return false;
310 if (!the_rec->_diffs) return false; // no diffs listed.
311 diffs = *the_rec->_diffs;
312 return true;
313}
314
316 const astring &src, const astring &dest, double &total_size,
317 int &total_files, double &current_size, int &current_files, bool &done,
318 time_stamp &last_active)
319{
320 FUNCDEF("status");
321 total_size = 0;
322 total_files = 0;
323 current_files = 0;
324 current_size = 0;
325 AUTO_LOCK;
326 file_transfer_record *the_rec = _transfers->find(ent, src, dest);
327 if (!the_rec) return false;
328 done = the_rec->_done;
329 last_active = the_rec->_last_active;
330
331 if (the_rec->_diffs) {
332 the_rec->_diffs->calculate_progress(the_rec->_last_sent._filename,
333 the_rec->_last_sent._byte_start + the_rec->_last_sent._length,
334 current_files, current_size);
335 total_files = the_rec->_diffs->total_files();
336 total_size = the_rec->_diffs->total_size();
337 }
338
339 return true;
340}
341
343 (const octopus_entity &ent, const astring &src_root,
344 const astring &dest_root, const string_array &includes)
345{
346 FUNCDEF("register_file_transfer");
347 AUTO_LOCK;
348 // make sure that this isn't an existing transfer. if so, we just update
349 // the status.
350 file_transfer_record *the_rec = _transfers->find(ent, src_root, dest_root);
351 if (!the_rec) {
352 the_rec = new file_transfer_record;
353 the_rec->_src_root = src_root;
354 the_rec->_dest_root = dest_root;
355 the_rec->_ent = ent;
356 the_rec->_includes = includes;
357 _transfers->append(the_rec); // add the new record.
358 } else {
359 the_rec->_done = false;
360 the_rec->_includes = includes;
361 the_rec->_last_active.reset(); // freshen up the last activity time.
362 }
363 return OKAY;
364}
365
367 const astring &src_root, const astring &dest_root)
368{
369 AUTO_LOCK;
370 return _transfers->whack(ent, src_root, dest_root)? OKAY : NOT_FOUND;
371}
372
374{
375 _lock->lock();
376 file_transfer_record *the_rec = _correspondences->find_mapping(key);
377 if (!the_rec || !the_rec->_local_dir) {
378 _lock->unlock();
379 return NULL_POINTER; // unknown transfer.
380 }
381 return the_rec->_local_dir;
382}
383
388
390 const astring &new_path)
391{
392 AUTO_LOCK;
393 file_transfer_record *the_rec = _correspondences->find_mapping(key);
394 if (!the_rec) return false; // unknown transfer.
395 if (!the_rec->_local_dir) return false; // not right type.
396 return the_rec->_local_dir->add_path(new_path) == common::OKAY;
397}
398
400 const astring &old_path)
401{
402 AUTO_LOCK;
403 file_transfer_record *the_rec = _correspondences->find_mapping(key);
404 if (!the_rec) return false; // unknown transfer.
405 if (!the_rec->_local_dir) return false; // not right type.
406 return the_rec->_local_dir->remove_path(old_path) == common::OKAY;
407}
408
410{
411 FUNCDEF("periodic_actions");
412 AUTO_LOCK;
413
414 // first, we'll clean out old transfers.
415 time_stamp oldest_allowed(-TRANSFER_TIMEOUT);
416 // nothing older than this should be kept.
417 for (int i = _transfers->elements() - 1; i >= 0; i--) {
418 const file_transfer_record *curr = _transfers->get(i);
419 if (curr->_last_active < oldest_allowed) {
420#ifdef DEBUG_FILE_TRANSFER_TENTACLE
421 LOG(astring("cleaning record for: ent=") + curr->_ent.text_form()
422 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
423#endif
424 _transfers->zap(i, i);
425 }
426 }
427
428 // then we'll rescan any trees that are ready for it.
429 for (int i = 0; i < _correspondences->elements(); i++) {
430 file_transfer_record *curr = _correspondences->borrow(i);
431 if (curr->_last_active < time_stamp(-curr->_refresh_interval)) {
432 if (curr->_local_dir) {
433#ifdef DEBUG_FILE_TRANSFER_TENTACLE
434 LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
435 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
436#endif
437 WHACK(curr->_local_dir);
438 curr->_local_dir = new directory_tree(curr->_src_root);
439 curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
440#ifdef DEBUG_FILE_TRANSFER_TENTACLE
441 LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
442 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
443#endif
444 }
445 curr->_last_active.reset(); // reset our action time.
446 }
447 }
448}
449
451 byte_array &packed_form, infoton * &reformed)
452{
453 // this method doesn't use the lists, so it doesn't need locking.
455 return NO_HANDLER;
456 return reconstituter(classifier, packed_form, reformed,
458}
459
460// the "handle_" and "conclude_" methods are thread-safe because the mutex is locked before
461// their invocations.
462
463basis::outcome file_transfer_tentacle::handle_build_target_tree_request(file_transfer_infoton &req,
464 const octopus_request_id &item_id)
465{
466 FUNCDEF("handle_build_target_tree_request");
467
468 // get the mapping from the specified location on this side.
469 filename splitting(req._src_root);
470 string_array pieces;
471 bool rooted;
472 splitting.separate(rooted, pieces);
473 astring source_mapping = pieces[0];
474
475 // patch the name up to find the sub_path for the source.
476 filename source_start;
477 pieces.zap(0, 0);
478 source_start.join(rooted, pieces);
479
480 // locate the allowed transfer depot for the mapping they provided.
481 file_transfer_record *mapping_record = _correspondences->find_mapping(source_mapping);
482 if (!mapping_record) {
483 LOG(astring("could not find source mapping of ") + source_mapping);
484 return NOT_FOUND;
485 }
486
487 // unpack the tree that they sent us which describes their local area.
488 directory_tree *dest_tree = new directory_tree;
489 if (!dest_tree->unpack(req._packed_data)) {
490 LOG(astring("could not unpack requester's directory tree"));
491 WHACK(dest_tree);
492 return GARBAGE;
493 }
494
495 string_array requested_names;
496 if (!requested_names.unpack(req._packed_data)) {
497 LOG(astring("could not unpack requester's filename includes"));
498 WHACK(dest_tree);
499 return GARBAGE;
500 }
501
502 // look up to see if this is about something that has already been seen.
503 // we don't want to add a new transfer record if they're already working on
504 // this. that also lets them do a new tree compare to restart the transfer.
505 file_transfer_record *the_rec = _transfers->find(item_id._entity,
506 req._src_root, req._dest_root);
507 if (!the_rec) {
508 // there was no existing record; we'll create a new one.
509 the_rec = new file_transfer_record;
510 the_rec->_ent = item_id._entity;
511 the_rec->_src_root = req._src_root;
512 the_rec->_dest_root = req._dest_root;
513 _transfers->append(the_rec);
514 } else {
515 // record some activity on this record.
516 the_rec->_done = false;
517 the_rec->_last_active.reset();
518 }
519
520 // create any directories that are missing at this point.
521 basis::outcome result = dest_tree->make_directories(req._dest_root);
522 if (result != common::OKAY) {
523 LOG("ERROR: got bad result from make_directories!");
524 }
525
526 req._packed_data.reset(); // clear out existing stuff before cloning.
527 file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
528
529 reply->_request = false; // it's a response now.
530 reply->_success = result;
531 store_product(reply, item_id);
532 // send back the comparison list.
533
534 return OKAY;
535}
536
537basis::outcome file_transfer_tentacle::handle_build_target_tree_response(file_transfer_infoton &resp,
538 const octopus_request_id &item_id)
539{
540//go to next step, tree comparison.
541//look at the handle tree compare response for help though.
542 FUNCDEF("handle_build_target_tree_response");
543 file_transfer_record *the_rec = _transfers->find(item_id._entity,
544 resp._src_root, resp._dest_root);
545 if (!the_rec) {
546 LOG(astring("could not find the record for this transfer: item=")
547 + item_id.text_form() + " src=" + resp._src_root + " dest="
548 + resp._dest_root);
549 return NOT_FOUND; // not registered, so reject it.
550 }
551
552 the_rec->_last_active.reset(); // record some activity on this record.
553
554 return resp._success;
555}
556
557outcome file_transfer_tentacle::handle_tree_compare_request
558 (file_transfer_infoton &req, const octopus_request_id &item_id)
559{
560 FUNCDEF("handle_tree_compare_request");
561
562 // get the mapping from the specified location on this side.
563 filename splitting(req._src_root);
564 string_array pieces;
565 bool rooted;
566 splitting.separate(rooted, pieces);
567 astring source_mapping = pieces[0];
568
569 // patch the name up to find the sub_path for the source.
570 filename source_start;
571 pieces.zap(0, 0);
572 source_start.join(rooted, pieces);
573
574 // locate the allowed transfer depot for the mapping they provided.
575 file_transfer_record *mapping_record
576 = _correspondences->find_mapping(source_mapping);
577 if (!mapping_record) {
578 LOG(astring("could not find source mapping of ") + source_mapping);
579 return NOT_FOUND;
580 }
581
582 // unpack the tree that they sent us which describes their local area.
583 directory_tree *dest_tree = new directory_tree;
584 if (!dest_tree->unpack(req._packed_data)) {
585 LOG(astring("could not unpack requester's directory tree"));
586 WHACK(dest_tree);
587 return GARBAGE;
588 }
589
590 string_array requested_names;
591 if (!requested_names.unpack(req._packed_data)) {
592 LOG(astring("could not unpack requester's filename includes"));
593 WHACK(dest_tree);
594 return GARBAGE;
595 }
596
597 // look up to see if this is about something that has already been seen.
598 // we don't want to add a new transfer record if they're already working on
599 // this. that also lets them do a new tree compare to restart the transfer.
600 file_transfer_record *the_rec = _transfers->find(item_id._entity,
601 req._src_root, req._dest_root);
602 if (!the_rec) {
603 // there was no existing record; we'll create a new one.
604 the_rec = new file_transfer_record;
605 the_rec->_ent = item_id._entity;
606 the_rec->_src_root = req._src_root;
607 the_rec->_dest_root = req._dest_root;
608 _transfers->append(the_rec);
609 } else {
610 // record some activity on this record.
611 the_rec->_done = false;
612 the_rec->_last_active.reset();
613 }
614
615 the_rec->_diffs = new filename_list;
616
617 int how_comp = file_info::EQUAL_NAME; // the prize for doing nothing.
618 if (_mode & COMPARE_SIZE_AND_TIME)
620 if (_mode & COMPARE_CONTENT_SAMPLE)
621 how_comp |= file_info::EQUAL_CHECKSUM;
622
623 // compare the two trees of files.
624 directory_tree::compare_trees(*mapping_record->_local_dir,
625 source_start.raw(), *dest_tree, astring::empty_string(),
626 *the_rec->_diffs, (file_info::file_similarity)how_comp);
627
628//LOG(astring("filenames decided as different:\n") + the_rec->_diffs->text_form());
629
630 // now prune the diffs to accord with what they claim they want.
631 if (requested_names.length()) {
632 for (int i = the_rec->_diffs->elements() - 1; i >= 0; i--) {
633 filename diff_curr = *the_rec->_diffs->get(i);
634 bool found = false;
635 for (int j = 0; j < requested_names.length(); j++) {
636 filename req_curr(requested_names[j]);
637 if (req_curr.compare_suffix(diff_curr)) {
638 found = true;
639 break;
640 }
641 }
642 if (!found) the_rec->_diffs->zap(i, i);
643 }
644 }
645
646 req._packed_data.reset(); // clear out existing stuff before cloning.
647 file_transfer_infoton *reply = dynamic_cast<file_transfer_infoton *>(req.clone());
648 the_rec->_diffs->pack(reply->_packed_data);
649
650//hmmm: does the other side really need the list of filenames? i guess we
651// could check validity of what's transferred or check space available
652// before the client starts the transfer.
653
654 reply->_request = false; // it's a response now.
655 store_product(reply, item_id);
656 // send back the comparison list.
657
658 return OKAY;
659}
660
661outcome file_transfer_tentacle::handle_tree_compare_response
662 (file_transfer_infoton &resp, const octopus_request_id &item_id)
663{
664 FUNCDEF("handle_tree_compare_response");
665 file_transfer_record *the_rec = _transfers->find(item_id._entity,
666 resp._src_root, resp._dest_root);
667 if (!the_rec) {
668 LOG(astring("could not find the record for this transfer: item=")
669 + item_id.text_form() + " src=" + resp._src_root + " dest="
670 + resp._dest_root);
671 return NOT_FOUND; // not registered, so reject it.
672 }
673
674 the_rec->_last_active.reset(); // record some activity on this record.
675
676 filename_list *flist = new filename_list;
677 if (!flist->unpack(resp._packed_data)) {
678 WHACK(flist);
679 return GARBAGE;
680 }
681
682//hmmm: verify space on device?
683
684 the_rec->_diffs = flist; // set the list of differences.
685 return OKAY;
686}
687
688outcome file_transfer_tentacle::handle_storage_request
689 (file_transfer_infoton &req, const octopus_request_id &item_id)
690{
691 FUNCDEF("handle_storage_request");
692 if (_mode & ONLY_REPORT_DIFFS) {
693 // store an unhandled infoton.
694 unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
695 store_product(deny, item_id);
696 return NO_HANDLER;
697 }
698
699 // look up the transfer record.
700 file_transfer_record *the_rec = _transfers->find(item_id._entity,
701 req._src_root, req._dest_root);
702 if (!the_rec) {
703 LOG(astring("could not find the record for this transfer: item=")
704 + item_id.text_form() + " src=" + req._src_root + " dest="
705 + req._dest_root);
706 return NOT_FOUND; // not registered, so reject it.
707 }
708
709 the_rec->_last_active.reset(); // mark it as still active.
710
711 file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
712
713 if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
714
716 (_correspondences->translate(the_rec->_src_root), *the_rec->_diffs,
717 the_rec->_last_sent, resp->_packed_data, _maximum_transfer);
718 if (bufret == heavy_file_operations::FINISHED) {
719 bufret = OKAY; // in either case, we don't emit a finished outcome; handled elsewhere.
720 if (!resp->_packed_data.length()) {
721 // blank packages, so finish by setting command to be a conclude marker.
722 the_rec->_done = true;
724 }
725 } else if (bufret != OKAY) {
726 // complain, but still send.
727 LOG(astring("buffer files returned an error on item=")
728 + item_id.text_form() + " src=" + req._src_root + " dest="
729 + req._dest_root);
730 }
731
732//can remove this block if stops saying it.
733 if ((bufret == OKAY) && !resp->_packed_data.length() ) {
734 LOG("marking empty transfer as done; why not caught above at FINISHED check?");
735 the_rec->_done = true;
737 }
738//end of can remove.
739
740 resp->_request = false; // it's a response now.
741 store_product(resp, item_id);
742 return bufret;
743}
744
745outcome file_transfer_tentacle::handle_storage_response
746 (file_transfer_infoton &resp, const octopus_request_id &item_id)
747{
748 FUNCDEF("handle_storage_response");
749 if (_mode & ONLY_REPORT_DIFFS) {
750 // not spoken here.
751 return NO_HANDLER;
752 }
753
754 // look up the transfer record.
755 file_transfer_record *the_rec = _transfers->find(item_id._entity,
756 resp._src_root, resp._dest_root);
757 if (!the_rec) return NOT_FOUND; // not registered, so reject it.
758
759 the_rec->_last_active.reset(); // mark it as still active.
760
761 if (!resp._packed_data.length()) {
762 // mark that we're done now.
763 the_rec->_done = true;
764 }
765
766 // chew on all the things they sent us.
767 while (resp._packed_data.length()) {
768 file_time empty;
769 file_transfer_header found(empty);
770 if (!found.unpack(resp._packed_data)) {
771 // bomb out now.
772 LOG(astring("corruption seen on item=") + item_id.text_form()
773 + " src=" + resp._src_root + " dest=" + resp._dest_root);
774 return GARBAGE;
775 }
776 the_rec->_last_sent = found;
777
778 if (found._length > resp._packed_data.length()) {
779 // another case for leaving--not enough data left in the buffer.
780 LOG(astring("data underflow seen on item=") + item_id.text_form()
781 + " src=" + resp._src_root + " dest=" + resp._dest_root);
782 return GARBAGE;
783 }
784 byte_array to_write = resp._packed_data.subarray(0, found._length - 1);
785 resp._packed_data.zap(0, found._length - 1);
786
787 if (!the_rec->_diffs) return BAD_INPUT;
788
789 const file_info *recorded_info = the_rec->_diffs->find(found._filename);
790 if (!recorded_info) {
791 LOG(astring("unrequested file seen: ") + found._filename);
792 continue; // maybe there are others that aren't confused.
793 }
794
796 + recorded_info->secondary();
797// LOG(astring("telling it to write to fullfile: ") + full_file);
798
800 found._byte_start, to_write);
801 if (ret != OKAY) {
802 LOG(astring("failed to write file chunk: error=")
803 + heavy_file_operations::outcome_name(ret) + " file=" + full_file
804 + a_sprintf(" start=%d len=%d", found._byte_start, found._length));
805 }
806 found._time.set_time(full_file);
807 }
808
809 // there is no response product to store.
810 return OKAY;
811}
812
813outcome file_transfer_tentacle::conclude_storage_request
814 (file_transfer_infoton &req, const octopus_request_id &item_id)
815{
816 FUNCDEF("conclude_storage_request");
817 if (_mode & ONLY_REPORT_DIFFS) {
818 // store an unhandled infoton.
819 unhandled_request *deny = new unhandled_request(item_id, req.classifier(), NO_HANDLER);
820 store_product(deny, item_id);
821 return NO_HANDLER;
822 }
823
824 // look up the transfer record.
825 file_transfer_record *the_rec = _transfers->find(item_id._entity,
826 req._src_root, req._dest_root);
827 if (!the_rec) {
828 LOG(astring("could not find the record for this transfer: item=")
829 + item_id.text_form() + " src=" + req._src_root + " dest="
830 + req._dest_root);
831 return NOT_FOUND; // not registered, so reject it.
832 }
833
834 the_rec->_last_active.reset(); // mark it as still active.
835
836 file_transfer_infoton *resp = dynamic_cast<file_transfer_infoton *>(req.clone());
837
838 if (!the_rec->_diffs) return BAD_INPUT; // wrong type of object.
839
840 the_rec->_done = true; // we're concluding the transfer, so that's that.
841 resp->_request = false; // it's a response now.
842 store_product(resp, item_id);
843
844 LOG(astring("concluding transfer request on src=") + req._src_root + " dest="
845 + req._dest_root);
846
847 return common::OKAY;
848}
849
850outcome file_transfer_tentacle::conclude_storage_response
851 (file_transfer_infoton &resp, const octopus_request_id &item_id)
852{
853 FUNCDEF("conclude_storage_response");
854 if (_mode & ONLY_REPORT_DIFFS) {
855 // not spoken here.
856 return NO_HANDLER;
857 }
858
859 // look up the transfer record.
860 file_transfer_record *the_rec = _transfers->find(item_id._entity,
861 resp._src_root, resp._dest_root);
862 if (!the_rec) return NOT_FOUND; // not registered, so reject it.
863
864 the_rec->_last_active.reset(); // mark it as still active.
865
866 // mark that we're done now.
867 the_rec->_done = true;
868
869 LOG(astring("concluding transfer response on src=") + resp._src_root + " dest="
870 + resp._dest_root);
871
872 // there is no response product to store.
873 return OKAY;
874}
875
876// consume() is the only method that is allowed to invoke the "handle_X" methods
877// and it must lock the object beforehand.
878
880 const octopus_request_id &item_id, byte_array &transformed)
881{
882 FUNCDEF("consume");
883 transformed.reset();
884 file_transfer_infoton *inf = dynamic_cast<file_transfer_infoton *>(&to_chow);
885 if (!inf) return DISALLOWED; // not for us.
886
887 AUTO_LOCK; // protect our lists while we're working on them.
888
889 switch (inf->_command) {
891 if (inf->_request) return handle_build_target_tree_request(*inf, item_id);
892 else return handle_build_target_tree_response(*inf, item_id);
893 }
895 if (inf->_request) return handle_tree_compare_request(*inf, item_id);
896 else return handle_tree_compare_response(*inf, item_id);
897 }
899 if (inf->_request) return handle_storage_request(*inf, item_id);
900 else return handle_storage_response(*inf, item_id);
901 }
903 if (inf->_request) return conclude_storage_request(*inf, item_id);
904 else return conclude_storage_response(*inf, item_id);
905 }
906 }
907 return BAD_INPUT; // not a recognized command.
908}
909
911{
912 FUNCDEF("refresh_now");
913 AUTO_LOCK;
914 for (int i = 0; i < _correspondences->elements(); i++) {
915 file_transfer_record *curr = _correspondences->borrow(i);
916 if (!curr) continue;
917 if (curr->_source_mapping != source_mapping) continue;
918 if (curr->_local_dir) {
919#ifdef DEBUG_FILE_TRANSFER_TENTACLE
920 LOG(astring("refreshing tree for: ent=") + curr->_ent.text_form()
921 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
922#endif
923 WHACK(curr->_local_dir);
924 curr->_local_dir = new directory_tree(curr->_src_root);
925 curr->_local_dir->calculate( !(_mode & COMPARE_CONTENT_SAMPLE) );
926#ifdef DEBUG_FILE_TRANSFER_TENTACLE
927 LOG(astring("done refreshing tree for: ent=") + curr->_ent.text_form()
928 + " src=" + curr->_src_root + " dest=" + curr->_dest_root);
929#endif
930 }
931 curr->_last_active.reset(); // reset our action time.
932 return OKAY;
933 }
934 return NOT_FOUND;
935}
936
937} //namespace.
938
939
#define LOG(s)
a_sprintf is a specialization of astring that provides printf style support.
Definition astring.h:440
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
Definition array.h:349
array subarray(int start, int end) const
Returns the array segment between the indices "start" and "end".
Definition array.h:443
int length() const
Returns the current reported length of the allocated C array.
Definition array.h:115
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition array.h:769
Provides a dynamically resizable ASCII character string.
Definition astring.h:35
bool t() const
t() is a shortcut for the string being "true", as in non-empty.
Definition astring.h:97
static const astring & empty_string()
useful wherever empty strings are needed, e.g., function defaults.
Definition astring.cpp:128
A very common template for a dynamic array of bytes.
Definition byte_array.h:36
void lock()
Clamps down on the mutex, if possible.
Definition mutex.cpp:101
void unlock()
Gives up the possession of the mutex.
Definition mutex.cpp:113
Outcomes describe the state of completion for an operation.
Definition outcome.h:31
An object that traverses directory trees and provides a view of all files.
static bool compare_trees(const directory_tree &source, const directory_tree &target, filename_list &differences, file_info::file_similarity how_to_compare)
compares the tree in "source" with the tree in "target".
virtual bool unpack(basis::byte_array &packed_form)
unpacks the directory_tree from a byte_array.
basis::outcome make_directories(const basis::astring new_root)
creates all of the directories in this object, but start at the "new_root".
Encapsulates some measures and calculations based on a file's contents.
Definition file_info.h:29
const basis::astring & secondary() const
observes the alternate form of the name.
Definition file_info.cpp:74
file_similarity
this enum encapsulates how files may be compared.
Definition file_info.h:32
describes one portion of an ongoing file transfer.
basis::astring text_form() const
virtual bool unpack(basis::byte_array &packed_form)
Restores the packable from the "packed_form".
Provides operations commonly needed on file names.
Definition filename.h:64
void join(bool rooted, const structures::string_array &pieces)
undoes a separate() operation to get the filename back.
Definition filename.cpp:503
void separate(bool &rooted, structures::string_array &pieces) const
breaks the filename into its component parts.
Definition filename.cpp:482
static basis::astring default_separator()
returns the default separator character for this OS.
Definition filename.cpp:93
const basis::astring & raw() const
returns the astring that we're holding onto for the path.
Definition filename.cpp:97
static basis::outcome write_file_chunk(const basis::astring &target, double byte_start, const basis::byte_array &chunk, bool truncate=true, int copy_chunk_factor=heavy_file_operations::copy_chunk_factor())
stores a chunk of bytes into the "target" file.
static const char * outcome_name(const basis::outcome &to_name)
static basis::outcome buffer_files(const basis::astring &source_root, const filename_list &to_transfer, file_transfer_header &last_action, basis::byte_array &storage, int maximum_bytes)
reads files in "to_transfer" and packs them into a "storage" buffer.
Base objects used by the file transfer tentacle to schedule transfers.
@ BUILD_TARGET_TREE
asks the target side to build the directory tree from the source.
@ CONCLUDE_TRANSFER_MARKER
this infoton marks the end of the transfer process.
@ PLACE_FILE_CHUNKS
the destination side requests a new set of chunks.
@ TREE_COMPARISON
the destination root will be compared with the source root.
basis::abyte _command
one of the commands above.
basis::outcome _success
reports what kind of result occurred.
virtual basis::clonable * clone() const
must be provided to allow creation of a copy of this object.
basis::astring _src_root
the top-level directory of the source.
static const structures::string_array & file_transfer_classifier()
returns the classifier for this type of infoton.
bool _request
if it's not a request, then it's a response.
basis::astring _dest_root
the top-level directory of the destination.
basis::byte_array _packed_data
the packed headers and file chunks.
Manages the transferrence of directory trees from one place to another.
bool add_path(const basis::astring &source_mapping, const basis::astring &new_path)
inserts the "new_path" into a registered correspondence.
basis::outcome cancel_file_transfer(const octopus_entity &ent, const basis::astring &src_root, const basis::astring &dest_root)
tosses a previously registered file transfer.
void unlock_directory()
unlock MUST be called when one is done looking at the tree.
virtual void expunge(const octopus_entity &to_remove)
throws out any transfers occurring for the entity "to_remove".
bool status(const octopus_entity &ent, const basis::astring &src, const basis::astring &dest, double &total_size, int &total_files, double &current_size, int &current_files, bool &done, timely::time_stamp &last_active)
locates the transfer specified and returns information about it.
basis::outcome refresh_now(const basis::astring &source_mapping)
refreshes the "source_mapping" right now, regardless of the interval.
virtual basis::outcome reconstitute(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)
recreates a "reformed" infoton from its packed form.
bool remove_path(const basis::astring &source_mapping, const basis::astring &old_path)
deletes the "old_path" out of an existing correspondence.
bool get_differences(const octopus_entity &ent, const basis::astring &src, const basis::astring &dest, filesystem::filename_list &diffs)
accesses the list of difference for an ongoing transfer.
virtual basis::outcome consume(infoton &to_chow, const octopus_request_id &item_id, basis::byte_array &transformed)
processes the "to_chow" infoton as a file transfer request.
basis::outcome remove_correspondence(const basis::astring &source_mapping)
takes out the "source_mapping" which was previously added.
void periodic_actions()
drops timed out transfers.
basis::outcome add_correspondence(const basis::astring &source_mapping, const basis::astring &source_root, int refresh_interval)
adds a file transfer correspondence.
basis::astring text_form() const
returns a string representing the current state of transfers.
file_transfer_tentacle(int maximum_transfer, transfer_modes mode_of_transfer)
constructs a tentacle for either transfers or comparisons.
@ COMPARE_SIZE_AND_TIME
uses size and time to see differences.
@ ONLY_REPORT_DIFFS
no actual file transfer, just reports.
@ COMPARE_CONTENT_SAMPLE
samples parts of file for comparison.
basis::outcome register_file_transfer(const octopus_entity &ent, const basis::astring &src_root, const basis::astring &dest_root, const structures::string_array &include)
records a transfer that is going to commence.
filesystem::directory_tree * lock_directory(const basis::astring &source_mapping)
provides a view of the tentacle's current state.
An infoton is an individual request parcel with accompanying information.
Definition infoton.h:32
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition infoton.cpp:85
Provides a way of identifying users of an octopus object.
Definition entity_defs.h:35
basis::astring text_form() const
returns a readable form of the identifier.
bool blank() const
true if the entity is blank, as constructed by default constructor.
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
provides prefab implementations for parts of the tentacle object.
@ NO_HANDLER
no handler for that type of infoton.
Definition tentacle.h:73
bool store_product(infoton *product, const octopus_request_id &original_id)
used by tentacles to store the objects they produce from infotons.
Definition tentacle.cpp:118
Informs the caller that a request type was unknown to the server octopus.
Provides a platform-independent object for adding threads to a program.
Definition ethread.h:36
virtual void perform_activity(void *thread_data)=0
< invoked just after after start(), when the OS thread is created.
int elements() const
the maximum number of elements currently allowed in this amorph.
Definition amorph.h:66
basis::outcome zap(int start, int end)
Removes a range of indices from the amorph.
Definition amorph.h:357
void reset()
cleans out all of the contents.
Definition amorph.h:81
int find(const file_transfer_record *to_locate, basis::outcome &o)
Searches the amorph for the contents specified.
Definition amorph.h:432
const file_transfer_record * get(int field) const
Returns a constant pointer to the information at the index "field".
Definition amorph.h:312
file_transfer_record * borrow(int field)
Returns a pointer to the information at the index "field".
Definition amorph.h:448
An array of strings with some additional helpful methods.
virtual bool unpack(basis::byte_array &packed_form)
Unpacks a string array from the "packed_form" byte array.
static const char * platform_eol_to_chars()
provides the characters that make up this platform's line ending.
Represents a point in time relative to the operating system startup time.
Definition time_stamp.h:38
basis::astring text_form(stamp_display_style style=STAMP_RELATIVE) const
returns a simple textual representation of the time_stamp.
#define AUTO_LOCK
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition definitions.h:32
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition enhance_cpp.h:42
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition enhance_cpp.h:54
The guards collection helps in testing preconditions and reporting errors.
Definition array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition functions.h:121
const int SECOND_ms
Number of milliseconds in a second.
const int MINUTE_ms
Number of milliseconds in a minute.
A platform independent way to obtain the timestamp of a file.
A logger that sends to the console screen using the standard output device.
basis::outcome reconstituter(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed, contents *formal(junk))
< reconstituter should work for most infotons to restore flattened infotons.
const int TRANSFER_TIMEOUT
const int FTT_CLEANING_INTERVAL
A dynamic container class that holds any kind of object via pointers.
Definition amorph.h:55
#include <time.h>