1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include <basis/byte_array.h>
16 #include <basis/astring.h>
18 #include <application/hoople_main.h>
19 #include <application/command_line.h>
20 #include <cromp/cromp_client.h>
21 #include <cromp/cromp_server.h>
22 #include <filesystem/directory_tree.h>
23 #include <filesystem/filename_list.h>
24 #include <loggers/combo_logger.h>
25 #include <octopus/entity_defs.h>
26 #include <octopus/tentacle.h>
27 #include <octopus/unhandled_request.h>
28 #include <sockets/internet_address.h>
29 #include <sockets/machine_uid.h>
30 #include <sockets/tcpip_stack.h>
31 #include <structures/static_memory_gremlin.h>
32 #include <tentacles/file_transfer_tentacle.h>
33 #include <timely/time_control.h>
34 #include <timely/time_stamp.h>
36 using namespace application;
37 using namespace basis;
38 using namespace cromp;
39 using namespace filesystem;
40 using namespace loggers;
41 using namespace octopi;
42 using namespace sockets;
43 using namespace structures;
44 using namespace textual;
45 using namespace timely;
47 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger::get(), a)
49 const int REPORTING_INTERVAL = 28 * SECOND_ms; // how often to squawk.
51 const int REFRESH_INTERVAL = 120 * MINUTE_ms; // how often we check tree.
53 const int TRANSFER_PORT = 10808;
54 // simple port grabbed randomly for the default.
56 const int MAX_CHUNK = 2 * MEGABYTE;
57 // we will transfer fairly large chunks so we can get this done reasonably
58 // quickly. even at that size, it shouldn't cause most modern machines to
59 // hiccup even slightly.
63 class transporter : public application_shell
69 virtual int execute();
71 DEFINE_CLASS_NAME("transporter");
73 int push_client_download();
74 // for a client side download, this prods the transfer process.
76 int print_instructions();
77 // shows the instructions for this application.
80 bool _saw_clients; // true if we ever got a connection.
81 cromp_server *_server_side;
82 // provides connection and transmission services for servers.
83 cromp_client *_client_side; // client side connection.
84 bool _leave_when_no_clients; // true if we should just do one run.
85 bool _encryption; // true if we're encrypting.
86 astring _source; // the source path which a client will ask the server for.
87 astring _target; // the target path where files are stored for the client.
88 bool _started_okay; // true if we got past the command line checks.
93 transporter::transporter()
94 : application_shell(),
98 _leave_when_no_clients(false),
102 FUNCDEF("constructor");
107 command_line args(_global_argc, _global_argv);
108 // check for a port on the command line.
110 int port = TRANSFER_PORT;
111 if (args.get_value("port", port_text, false))
112 port = port_text.convert(TRANSFER_PORT);
114 if (args.find("exit", posn)) {
115 LOG("seeing the 'exit without clients' flag set.");
116 _leave_when_no_clients = true;
120 if (args.find("encrypt", indy, false)
121 || (args.find('e', indy, false)) ) {
122 LOG("enabling encryption!");
123 // they're saying that we should encrypt the communication.
129 if (args.find("client", indy, false)) {
130 LOG("client side chosen");
134 internet_address addr;
137 // check for a hostname on the command line.
138 astring hostname("local");
140 if (args.get_value("host", host_temp, false)) {
141 LOG(astring("using host: ") + host_temp);
142 hostname = host_temp;
143 } else LOG(astring("using host: ") + hostname);
144 strcpy(addr.hostname, hostname.s());
148 if (!args.get_value("key", key, false)) {
149 print_instructions();
150 LOG("No keyword specified on command line.");
154 if (!args.get_value("root", root, false)) {
155 print_instructions();
156 LOG("No transfer root was specified on the command line.");
160 LOG("starting transfer server");
161 _server_side = new cromp_server(cromp_server::any_address(port));
162 file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK,
163 (file_transfer_tentacle::transfer_modes)(file_transfer_tentacle::ONLY_REPORT_DIFFS
164 | file_transfer_tentacle::COMPARE_SIZE_AND_TIME
165 | file_transfer_tentacle::COMPARE_CONTENT_SAMPLE));
167 LOG(key + " => " + root);
168 new_tent->add_correspondence(key, root, REFRESH_INTERVAL);
169 _server_side->add_tentacle(new_tent);
170 _server_side->enable_servers(_encryption);
172 LOG("starting transfer client");
173 _client_side = new cromp_client(addr);
174 if (_encryption) _client_side->enable_encryption();
176 outcome ret = _client_side->connect();
177 if (ret != cromp_client::OKAY)
178 non_continuable_error(class_name(), func, astring("failed to connect to "
179 "the server: ") + cromp_client::outcome_name(ret));
181 file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK,
182 (file_transfer_tentacle::transfer_modes)(file_transfer_tentacle::ONLY_REPORT_DIFFS
183 | file_transfer_tentacle::COMPARE_SIZE_AND_TIME
184 | file_transfer_tentacle::COMPARE_CONTENT_SAMPLE));
186 if (!args.get_value("source", _source, false)) {
187 print_instructions();
188 LOG("No source path was specified on the command line.");
191 if (!args.get_value("target", _target, false)) {
192 print_instructions();
193 LOG("No target path was specified on the command line.");
197 string_array includes;
198 outcome regis = new_tent->register_file_transfer
199 (_client_side->entity(), _source, _target, includes);
200 if (regis != cromp_client::OKAY)
201 non_continuable_error(class_name(), func, "failed to register transfer");
203 _client_side->add_tentacle(new_tent);
206 _started_okay = true;
210 transporter::~transporter()
216 int transporter::print_instructions()
218 astring name = filename(_global_argv[0]).basename().raw();
219 log(a_sprintf("%s usage:", name.s()));
220 log(astring::empty_string());
222 This program can transfer directory trees across the network. It will only\n\
223 copy the files missing on the client's side given what the server offers.\n\
224 The program can function as either the server side or the client side.\n\
225 The available flags are:\n\
227 %s --client --host srvname --port P --source key_path --target cli_dest\n\
229 The client side needs to know the server host (srvname) and the port where\n\
230 the server is listening for connections (P). The client will compare its\n\
231 local path (cli_dest) with the server's keyed path (key_path) and copy the\n\
232 files that are missing on the client's side. The key path will begin with\n\
233 whatever keyword the server is offering, plus optional additional path\n\
234 components to retrieve less than the whole tree being served.\n\
237 %s --server --host srvname --port P --key keyname --root srv_path\n\
239 The server side needs to know what address and port to listen on (srvname\n\
240 and P). It will open a server there that provides a directory hierarchy\n\
241 starting at the root specified (srv_path). The directory tree will be known\n\
242 to clients as the key word (keyname), thus freeing the clients from needing\n\
243 to know absolute paths on the server.\n\
245 ", name.s(), name.s()));
250 int transporter::push_client_download()
252 FUNCDEF("push_client_download");
253 // prepare a client request
254 file_transfer_infoton initiate;
255 initiate._request = true;
256 initiate._command = file_transfer_infoton::TREE_COMPARISON;
257 initiate._src_root = _source;
258 initiate._dest_root = _target;
259 directory_tree target_area(_target);
260 target_area.calculate(false);
262 initiate.package_tree_info(target_area, includes);
263 octopus_request_id cmd_id;
264 outcome start_ret = _client_side->submit(initiate, cmd_id);
265 if (start_ret != tentacle::OKAY)
266 non_continuable_error(class_name(), func, astring("failed to initiate "
267 " the transfer: ") + cromp_client::outcome_name(start_ret));
269 infoton *start_reply_tmp = NIL;
270 //hmmm: set timeout appropriate to the speed of the connection!
271 outcome first_receipt = _client_side->acquire(start_reply_tmp, cmd_id);
272 if (first_receipt != cromp_client::OKAY)
273 non_continuable_error(class_name(), func, astring("failed to receive response: ")
274 + cromp_client::outcome_name(start_ret));
275 file_transfer_infoton *start_reply = dynamic_cast<file_transfer_infoton *>
278 non_continuable_error(class_name(), func, "failed to cast starting infoton to "
283 byte_array pack_copy = start_reply->_packed_data;
284 if (!diffs.unpack(pack_copy))
285 non_continuable_error(class_name(), func, "could not unpack filename list!");
286 LOG(astring("got list of diffs:\n") + diffs.text_form());
289 outcome eval_ret = _client_side->octo()->evaluate(start_reply, cmd_id, true);
290 if (eval_ret != cromp_client::OKAY)
291 non_continuable_error(class_name(), func, astring("failed to process the "
292 "start response: ") + cromp_client::outcome_name(eval_ret));
297 LOG(a_sprintf("ongoing chunk %d", ++iter));
298 // keep going until we find a broken reply.
299 file_transfer_infoton ongoing;
300 ongoing._request = true;
301 ongoing._command = file_transfer_infoton::PLACE_FILE_CHUNKS;
302 ongoing._src_root = _source;
303 ongoing._dest_root = _target;
305 octopus_request_id cmd_id;
306 outcome place_ret = _client_side->submit(ongoing, cmd_id);
307 if (place_ret != cromp_client::OKAY)
308 non_continuable_error(class_name(), func, astring("failed to send ongoing "
309 "chunk: ") + cromp_client::outcome_name(place_ret));
311 infoton *place_reply_tmp = NIL;
312 //hmmm: set timeout appropriate to the speed of the connection!
313 outcome place_receipt = _client_side->acquire(place_reply_tmp, cmd_id);
314 if (place_receipt != cromp_client::OKAY)
315 non_continuable_error(class_name(), func, astring("failed to receive "
316 "place response: ") + cromp_client::outcome_name(place_receipt));
318 file_transfer_infoton *place_reply = dynamic_cast<file_transfer_infoton *>
321 if (dynamic_cast<unhandled_request *>(place_reply_tmp)) {
322 log(astring("The server does not support file transfers."), ALWAYS_PRINT);
323 WHACK(place_reply_tmp);
326 non_continuable_error(class_name(), func, "failed to cast storage reply infoton "
330 int reply_size = place_reply->_packed_data.length();
332 outcome eval_ret2 = _client_side->octo()->evaluate(place_reply, cmd_id, true);
333 if (eval_ret2 != tentacle::OKAY)
334 non_continuable_error(class_name(), func, astring("failed to process the "
335 "place response: ") + cromp_client::outcome_name(eval_ret2));
338 LOG("hit termination condition: no data packed in for file chunks.");
345 int transporter::execute()
349 if (!_started_okay) return 32;
351 time_stamp next_report(REPORTING_INTERVAL);
354 // make sure we didn't see our exit condition.
356 if (_server_side && !_server_side->clients() && _leave_when_no_clients
362 if (_client_side) return push_client_download();
364 if (time_stamp() > next_report) {
366 LOG(a_sprintf("There are %d clients.", _server_side->clients()));
367 //report about client side also.
368 next_report.reset(REPORTING_INTERVAL);
371 time_control::sleep_ms(100);
378 HOOPLE_MAIN(transporter, )