making the revamp available in general
[feisty_meow.git] / octopi / applications / transporter / transporter.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : transporter                                                       *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include <basis/byte_array.h>
16 #include <basis/astring.h>
17
18 #include <application/hoople_main.h>
19 #include <application/command_line.h>
20 #include <cromp/cromp_client.h>
21 #include <cromp/cromp_server.h>
22 #include <filesystem/directory_tree.h>
23 #include <filesystem/filename_list.h>
24 #include <loggers/combo_logger.h>
25 #include <octopus/entity_defs.h>
26 #include <octopus/tentacle.h>
27 #include <octopus/unhandled_request.h>
28 #include <sockets/internet_address.h>
29 #include <sockets/machine_uid.h>
30 #include <sockets/tcpip_stack.h>
31 #include <structures/static_memory_gremlin.h>
32 #include <tentacles/file_transfer_tentacle.h>
33 #include <timely/time_control.h>
34 #include <timely/time_stamp.h>
35
36 using namespace application;
37 using namespace basis;
38 using namespace cromp;
39 using namespace filesystem;
40 using namespace loggers;
41 using namespace octopi;
42 using namespace sockets;
43 using namespace structures;
44 using namespace textual;
45 using namespace timely;
46
47 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger::get(), a)
48
49 const int REPORTING_INTERVAL = 28 * SECOND_ms;  // how often to squawk.
50
51 const int REFRESH_INTERVAL = 120 * MINUTE_ms;  // how often we check tree.
52
53 const int TRANSFER_PORT = 10808;
54   // simple port grabbed randomly for the default.
55
56 const int MAX_CHUNK = 2 * MEGABYTE;
57   // we will transfer fairly large chunks so we can get this done reasonably
58   // quickly.  even at that size, it shouldn't cause most modern machines to
59   // hiccup even slightly.
60
61 //////////////
62
63 class transporter : public application_shell
64 {
65 public:
66   transporter();
67   ~transporter();
68
69   virtual int execute();
70
71   DEFINE_CLASS_NAME("transporter");
72
73   int push_client_download();
74     // for a client side download, this prods the transfer process.
75
76   int print_instructions();
77     // shows the instructions for this application.
78
79 private:
80   bool _saw_clients;  // true if we ever got a connection.
81   cromp_server *_server_side;
82     // provides connection and transmission services for servers.
83   cromp_client *_client_side;  // client side connection.
84   bool _leave_when_no_clients;  // true if we should just do one run.
85   bool _encryption;  // true if we're encrypting.
86   astring _source;  // the source path which a client will ask the server for.
87   astring _target;  // the target path where files are stored for the client.
88   bool _started_okay;  // true if we got past the command line checks.
89 };
90
91 //////////////
92
93 transporter::transporter()
94 : application_shell(),
95   _saw_clients(false),
96   _server_side(NULL_POINTER),
97   _client_side(NULL_POINTER),
98   _leave_when_no_clients(false),
99   _encryption(false),
100   _started_okay(false)
101 {
102   FUNCDEF("constructor");
103   SETUP_COMBO_LOGGER;
104   LOG("");
105   LOG("");
106
107   command_line args(_global_argc, _global_argv);
108   // check for a port on the command line.
109   astring port_text;
110   int port = TRANSFER_PORT;
111   if (args.get_value("port", port_text, false))
112     port = port_text.convert(TRANSFER_PORT);
113   int posn = 0;
114   if (args.find("exit", posn)) {
115     LOG("seeing the 'exit without clients' flag set.");
116     _leave_when_no_clients = true;
117   }
118
119   int indy = 0;
120   if (args.find("encrypt", indy, false)
121       || (args.find('e', indy, false)) ) {
122 LOG("enabling encryption!");
123     // they're saying that we should encrypt the communication.
124     _encryption = true;
125   }
126
127   bool server = true;
128   indy = 0;
129   if (args.find("client", indy, false)) {
130 LOG("client side chosen");
131     server = false;
132   }
133
134   internet_address addr;
135   addr.port = port;
136
137   // check for a hostname on the command line.
138   astring hostname("local");
139   astring host_temp;
140   if (args.get_value("host", host_temp, false)) {
141     LOG(astring("using host: ") + host_temp);
142     hostname = host_temp;
143   } else LOG(astring("using host: ") + hostname);
144   strcpy(addr.hostname, hostname.s());
145
146   if (server) {
147     astring key;
148     if (!args.get_value("key", key, false)) {
149       print_instructions();
150       LOG("No keyword specified on command line.");
151       return;
152     }
153     astring root;
154     if (!args.get_value("root", root, false)) {
155       print_instructions();
156       LOG("No transfer root was specified on the command line.");
157       return;
158     }
159
160     LOG("starting transfer server");
161     _server_side = new cromp_server(cromp_server::any_address(port));
162     file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK,
163         (file_transfer_tentacle::transfer_modes)(file_transfer_tentacle::ONLY_REPORT_DIFFS
164         | file_transfer_tentacle::COMPARE_SIZE_AND_TIME
165         | file_transfer_tentacle::COMPARE_CONTENT_SAMPLE));
166
167 LOG(key + " => " + root);
168     new_tent->add_correspondence(key, root, REFRESH_INTERVAL);
169     _server_side->add_tentacle(new_tent);
170     _server_side->enable_servers(_encryption);
171   } else {
172     LOG("starting transfer client");
173     _client_side = new cromp_client(addr);
174     if (_encryption) _client_side->enable_encryption();
175
176     outcome ret = _client_side->connect();
177     if (ret != cromp_client::OKAY)
178       non_continuable_error(class_name(), func, astring("failed to connect to "
179           "the server: ") + cromp_client::outcome_name(ret));
180
181     file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK,
182         (file_transfer_tentacle::transfer_modes)(file_transfer_tentacle::ONLY_REPORT_DIFFS
183         | file_transfer_tentacle::COMPARE_SIZE_AND_TIME
184         | file_transfer_tentacle::COMPARE_CONTENT_SAMPLE));
185
186     if (!args.get_value("source", _source, false)) {
187       print_instructions();
188       LOG("No source path was specified on the command line.");
189       return;
190     }
191     if (!args.get_value("target", _target, false)) {
192       print_instructions();
193       LOG("No target path was specified on the command line.");
194       return;
195     }
196
197     string_array includes;
198     outcome regis = new_tent->register_file_transfer
199         (_client_side->entity(), _source, _target, includes);
200     if (regis != cromp_client::OKAY)
201       non_continuable_error(class_name(), func, "failed to register transfer");
202
203     _client_side->add_tentacle(new_tent);
204   }
205
206   _started_okay = true;
207
208 }
209
210 transporter::~transporter()
211 {
212   WHACK(_client_side);
213   WHACK(_server_side);
214 }
215
216 int transporter::print_instructions()
217 {
218   astring name = filename(_global_argv[0]).basename().raw();
219   log(a_sprintf("%s usage:", name.s()));
220   log(astring::empty_string());
221   log(a_sprintf("\
222 This program can transfer directory trees across the network.  It will only\n\
223 copy the files missing on the client's side given what the server offers.\n\
224 The program can function as either the server side or the client side.\n\
225 The available flags are:\n\
226 \n\
227 %s --client --host srvname --port P --source key_path --target cli_dest\n\
228 \n\
229 The client side needs to know the server host (srvname) and the port where\n\
230 the server is listening for connections (P).  The client will compare its\n\
231 local path (cli_dest) with the server's keyed path (key_path) and copy the\n\
232 files that are missing on the client's side.  The key path will begin with\n\
233 whatever keyword the server is offering, plus optional additional path\n\
234 components to retrieve less than the whole tree being served.\n\
235 \n\
236 \n\
237 %s --server --host srvname --port P --key keyname --root srv_path\n\
238 \n\
239 The server side needs to know what address and port to listen on (srvname\n\
240 and P).  It will open a server there that provides a directory hierarchy\n\
241 starting at the root specified (srv_path).  The directory tree will be known\n\
242 to clients as the key word (keyname), thus freeing the clients from needing\n\
243 to know absolute paths on the server.\n\
244 \n\
245 ", name.s(), name.s()));
246
247   return 23;
248 }
249
250 int transporter::push_client_download()
251 {
252   FUNCDEF("push_client_download");
253   // prepare a client request
254   file_transfer_infoton initiate;
255   initiate._request = true;
256   initiate._command = file_transfer_infoton::BUILD_TARGET_TREE;
257   initiate._src_root = _source;
258   initiate._dest_root = _target;
259
260   // make a directory snapshot with just directories, no files.
261   directory_tree target_area_just_dirs(_target, "*", true);
262   string_set includes;
263   initiate.package_tree_info(target_area_just_dirs, includes);
264   octopus_request_id cmd_id;
265   outcome build_ret = _client_side->submit(initiate, cmd_id);
266   if (build_ret != tentacle::OKAY)
267     non_continuable_error(class_name(), func, astring("failed to build the "
268         " target tree: ") + cromp_client::outcome_name(build_ret));
269
270   // now get the full contents going on.
271   initiate._command = file_transfer_infoton::TREE_COMPARISON;
272   directory_tree target_area(_target);
273   target_area.calculate(false);
274   includes.reset();
275   initiate.package_tree_info(target_area, includes);
276   outcome start_ret = _client_side->submit(initiate, cmd_id);
277   if (start_ret != tentacle::OKAY)
278     non_continuable_error(class_name(), func, astring("failed to initiate "
279         " the transfer: ") + cromp_client::outcome_name(start_ret));
280
281   infoton *start_reply_tmp = NULL_POINTER;
282 //hmmm: set timeout appropriate to the speed of the connection!
283   outcome first_receipt = _client_side->acquire(start_reply_tmp, cmd_id);
284   if (first_receipt != cromp_client::OKAY)
285     non_continuable_error(class_name(), func, astring("failed to receive response: ")
286         + cromp_client::outcome_name(start_ret));
287   file_transfer_infoton *start_reply = dynamic_cast<file_transfer_infoton *>
288       (start_reply_tmp);
289   if (!start_reply)
290     non_continuable_error(class_name(), func, "failed to cast starting infoton to "
291         "proper type");
292
293 //debugging start
294   filename_list diffs;
295   byte_array pack_copy = start_reply->_packed_data;
296   if (!diffs.unpack(pack_copy))
297     non_continuable_error(class_name(), func, "could not unpack filename list!");
298   LOG(astring("got list of diffs:\n") + diffs.text_form());
299 //debugging end
300
301   outcome eval_ret = _client_side->octo()->evaluate(start_reply, cmd_id, true);
302   if (eval_ret != cromp_client::OKAY)
303     non_continuable_error(class_name(), func, astring("failed to process the "
304         "start response: ") + cromp_client::outcome_name(eval_ret));
305
306   int iter = 0;
307
308   while (true) {
309 LOG(a_sprintf("ongoing chunk %d", ++iter));
310     // keep going until we find a broken reply.
311     file_transfer_infoton ongoing;
312     ongoing._request = true;
313     ongoing._command = file_transfer_infoton::PLACE_FILE_CHUNKS;
314     ongoing._src_root = _source;
315     ongoing._dest_root = _target;
316
317     octopus_request_id cmd_id;
318     outcome place_ret = _client_side->submit(ongoing, cmd_id);
319     if (place_ret != cromp_client::OKAY)
320       non_continuable_error(class_name(), func, astring("failed to send ongoing "
321           "chunk: ") + cromp_client::outcome_name(place_ret));
322
323     infoton *place_reply_tmp = NULL_POINTER;
324 //hmmm: set timeout appropriate to the speed of the connection!
325     outcome place_receipt = _client_side->acquire(place_reply_tmp, cmd_id);
326     if (place_receipt != cromp_client::OKAY)
327       non_continuable_error(class_name(), func, astring("failed to receive "
328           "place response: ") + cromp_client::outcome_name(place_receipt));
329
330     file_transfer_infoton *place_reply = dynamic_cast<file_transfer_infoton *>
331         (place_reply_tmp);
332     if (!place_reply) {
333       if (dynamic_cast<unhandled_request *>(place_reply_tmp)) {
334         log(astring("The server does not support file transfers."), ALWAYS_PRINT);
335         WHACK(place_reply_tmp);
336         break;
337       }
338       non_continuable_error(class_name(), func, "failed to cast storage reply infoton "
339           "to proper type");
340     }
341
342     int reply_size = place_reply->_packed_data.length();
343
344     outcome eval_ret2 = _client_side->octo()->evaluate(place_reply, cmd_id, true);
345     if (eval_ret2 != tentacle::OKAY)
346       non_continuable_error(class_name(), func, astring("failed to process the "
347           "place response: ") + cromp_client::outcome_name(eval_ret2));
348
349     if (!reply_size) {
350       LOG("hit termination condition: no data packed in for file chunks.");
351       break;
352     }
353   }
354   return 0;
355 }
356
357 int transporter::execute()
358 {
359   FUNCDEF("execute");
360
361   if (!_started_okay) return 32;
362
363   time_stamp next_report(REPORTING_INTERVAL);
364
365   while (true) {
366     // make sure we didn't see our exit condition.
367
368     if (_server_side && !_server_side->clients() && _leave_when_no_clients
369         && _saw_clients) {
370       LOG("exiting now");
371       break;
372     }
373
374     if (_client_side) return push_client_download();
375
376     if (time_stamp() > next_report) {
377       if (_server_side)
378         LOG(a_sprintf("There are %d clients.", _server_side->clients()));
379 //report about client side also.
380       next_report.reset(REPORTING_INTERVAL);
381     }
382
383     time_control::sleep_ms(100); 
384   }
385   return 0;
386 }
387
388 //////////////
389
390 HOOPLE_MAIN(transporter, )
391