first check-in of feisty meow codebase. many things broken still due to recent
[feisty_meow.git] / octopi / applications / transporter / transporter.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : transporter                                                       *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include <basis/byte_array.h>
16 #include <basis/astring.h>
17
18 #include <application/hoople_main.h>
19 #include <application/command_line.h>
20 #include <cromp/cromp_client.h>
21 #include <cromp/cromp_server.h>
22 #include <filesystem/directory_tree.h>
23 #include <filesystem/filename_list.h>
24 #include <loggers/combo_logger.h>
25 #include <octopus/entity_defs.h>
26 #include <octopus/tentacle.h>
27 #include <octopus/unhandled_request.h>
28 #include <sockets/internet_address.h>
29 #include <sockets/machine_uid.h>
30 #include <sockets/tcpip_stack.h>
31 #include <structures/static_memory_gremlin.h>
32 #include <tentacles/file_transfer_tentacle.h>
33 #include <timely/time_control.h>
34 #include <timely/time_stamp.h>
35
36 using namespace application;
37 using namespace basis;
38 using namespace cromp;
39 using namespace filesystem;
40 using namespace loggers;
41 using namespace octopi;
42 using namespace sockets;
43 using namespace structures;
44 using namespace textual;
45 using namespace timely;
46
47 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger::get(), a)
48
49 const int REPORTING_INTERVAL = 28 * SECOND_ms;  // how often to squawk.
50
51 const int REFRESH_INTERVAL = 120 * MINUTE_ms;  // how often we check tree.
52
53 const int TRANSFER_PORT = 10808;
54   // simple port grabbed randomly for the default.
55
56 const int MAX_CHUNK = 2 * MEGABYTE;
57   // we will transfer fairly large chunks so we can get this done reasonably
58   // quickly.  even at that size, it shouldn't cause most modern machines to
59   // hiccup even slightly.
60
61 //////////////
62
63 class transporter : public application_shell
64 {
65 public:
66   transporter();
67   ~transporter();
68
69   virtual int execute();
70
71   DEFINE_CLASS_NAME("transporter");
72
73   int push_client_download();
74     // for a client side download, this prods the transfer process.
75
76   int print_instructions();
77     // shows the instructions for this application.
78
79 private:
80   bool _saw_clients;  // true if we ever got a connection.
81   cromp_server *_server_side;
82     // provides connection and transmission services for servers.
83   cromp_client *_client_side;  // client side connection.
84   bool _leave_when_no_clients;  // true if we should just do one run.
85   bool _encryption;  // true if we're encrypting.
86   astring _source;  // the source path which a client will ask the server for.
87   astring _target;  // the target path where files are stored for the client.
88   bool _started_okay;  // true if we got past the command line checks.
89 };
90
91 //////////////
92
93 transporter::transporter()
94 : application_shell(),
95   _saw_clients(false),
96   _server_side(NIL),
97   _client_side(NIL),
98   _leave_when_no_clients(false),
99   _encryption(false),
100   _started_okay(false)
101 {
102   FUNCDEF("constructor");
103   SETUP_COMBO_LOGGER;
104   LOG("");
105   LOG("");
106
107   command_line args(_global_argc, _global_argv);
108   // check for a port on the command line.
109   astring port_text;
110   int port = TRANSFER_PORT;
111   if (args.get_value("port", port_text, false))
112     port = port_text.convert(TRANSFER_PORT);
113   int posn = 0;
114   if (args.find("exit", posn)) {
115     LOG("seeing the 'exit without clients' flag set.");
116     _leave_when_no_clients = true;
117   }
118
119   int indy = 0;
120   if (args.find("encrypt", indy, false)
121       || (args.find('e', indy, false)) ) {
122 LOG("enabling encryption!");
123     // they're saying that we should encrypt the communication.
124     _encryption = true;
125   }
126
127   bool server = true;
128   indy = 0;
129   if (args.find("client", indy, false)) {
130 LOG("client side chosen");
131     server = false;
132   }
133
134   internet_address addr;
135   addr.port = port;
136
137   // check for a hostname on the command line.
138   astring hostname("local");
139   astring host_temp;
140   if (args.get_value("host", host_temp, false)) {
141     LOG(astring("using host: ") + host_temp);
142     hostname = host_temp;
143   } else LOG(astring("using host: ") + hostname);
144   strcpy(addr.hostname, hostname.s());
145
146   if (server) {
147     astring key;
148     if (!args.get_value("key", key, false)) {
149       print_instructions();
150       LOG("No keyword specified on command line.");
151       return;
152     }
153     astring root;
154     if (!args.get_value("root", root, false)) {
155       print_instructions();
156       LOG("No transfer root was specified on the command line.");
157       return;
158     }
159
160     LOG("starting transfer server");
161     _server_side = new cromp_server(cromp_server::any_address(port));
162     file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK,
163         (file_transfer_tentacle::transfer_modes)(file_transfer_tentacle::ONLY_REPORT_DIFFS
164         | file_transfer_tentacle::COMPARE_SIZE_AND_TIME
165         | file_transfer_tentacle::COMPARE_CONTENT_SAMPLE));
166
167 LOG(key + " => " + root);
168     new_tent->add_correspondence(key, root, REFRESH_INTERVAL);
169     _server_side->add_tentacle(new_tent);
170     _server_side->enable_servers(_encryption);
171   } else {
172     LOG("starting transfer client");
173     _client_side = new cromp_client(addr);
174     if (_encryption) _client_side->enable_encryption();
175
176     outcome ret = _client_side->connect();
177     if (ret != cromp_client::OKAY)
178       non_continuable_error(class_name(), func, astring("failed to connect to "
179           "the server: ") + cromp_client::outcome_name(ret));
180
181     file_transfer_tentacle *new_tent = new file_transfer_tentacle(MAX_CHUNK,
182         (file_transfer_tentacle::transfer_modes)(file_transfer_tentacle::ONLY_REPORT_DIFFS
183         | file_transfer_tentacle::COMPARE_SIZE_AND_TIME
184         | file_transfer_tentacle::COMPARE_CONTENT_SAMPLE));
185
186     if (!args.get_value("source", _source, false)) {
187       print_instructions();
188       LOG("No source path was specified on the command line.");
189       return;
190     }
191     if (!args.get_value("target", _target, false)) {
192       print_instructions();
193       LOG("No target path was specified on the command line.");
194       return;
195     }
196
197     string_array includes;
198     outcome regis = new_tent->register_file_transfer
199         (_client_side->entity(), _source, _target, includes);
200     if (regis != cromp_client::OKAY)
201       non_continuable_error(class_name(), func, "failed to register transfer");
202
203     _client_side->add_tentacle(new_tent);
204   }
205
206   _started_okay = true;
207
208 }
209
210 transporter::~transporter()
211 {
212   WHACK(_client_side);
213   WHACK(_server_side);
214 }
215
216 int transporter::print_instructions()
217 {
218   astring name = filename(_global_argv[0]).basename().raw();
219   log(a_sprintf("%s usage:", name.s()));
220   log(astring::empty_string());
221   log(a_sprintf("\
222 This program can transfer directory trees across the network.  It will only\n\
223 copy the files missing on the client's side given what the server offers.\n\
224 The program can function as either the server side or the client side.\n\
225 The available flags are:\n\
226 \n\
227 %s --client --host srvname --port P --source key_path --target cli_dest\n\
228 \n\
229 The client side needs to know the server host (srvname) and the port where\n\
230 the server is listening for connections (P).  The client will compare its\n\
231 local path (cli_dest) with the server's keyed path (key_path) and copy the\n\
232 files that are missing on the client's side.  The key path will begin with\n\
233 whatever keyword the server is offering, plus optional additional path\n\
234 components to retrieve less than the whole tree being served.\n\
235 \n\
236 \n\
237 %s --server --host srvname --port P --key keyname --root srv_path\n\
238 \n\
239 The server side needs to know what address and port to listen on (srvname\n\
240 and P).  It will open a server there that provides a directory hierarchy\n\
241 starting at the root specified (srv_path).  The directory tree will be known\n\
242 to clients as the key word (keyname), thus freeing the clients from needing\n\
243 to know absolute paths on the server.\n\
244 \n\
245 ", name.s(), name.s()));
246
247   return 23;
248 }
249
250 int transporter::push_client_download()
251 {
252   FUNCDEF("push_client_download");
253   // prepare a client request
254   file_transfer_infoton initiate;
255   initiate._request = true;
256   initiate._command = file_transfer_infoton::TREE_COMPARISON;
257   initiate._src_root = _source;
258   initiate._dest_root = _target;
259   directory_tree target_area(_target);
260   target_area.calculate(false);
261   string_set includes;
262   initiate.package_tree_info(target_area, includes);
263   octopus_request_id cmd_id;
264   outcome start_ret = _client_side->submit(initiate, cmd_id);
265   if (start_ret != tentacle::OKAY)
266     non_continuable_error(class_name(), func, astring("failed to initiate "
267         " the transfer: ") + cromp_client::outcome_name(start_ret));
268
269   infoton *start_reply_tmp = NIL;
270 //hmmm: set timeout appropriate to the speed of the connection!
271   outcome first_receipt = _client_side->acquire(start_reply_tmp, cmd_id);
272   if (first_receipt != cromp_client::OKAY)
273     non_continuable_error(class_name(), func, astring("failed to receive response: ")
274         + cromp_client::outcome_name(start_ret));
275   file_transfer_infoton *start_reply = dynamic_cast<file_transfer_infoton *>
276       (start_reply_tmp);
277   if (!start_reply)
278     non_continuable_error(class_name(), func, "failed to cast starting infoton to "
279         "proper type");
280
281 //debugging start
282   filename_list diffs;
283   byte_array pack_copy = start_reply->_packed_data;
284   if (!diffs.unpack(pack_copy))
285     non_continuable_error(class_name(), func, "could not unpack filename list!");
286   LOG(astring("got list of diffs:\n") + diffs.text_form());
287 //debugging end
288
289   outcome eval_ret = _client_side->octo()->evaluate(start_reply, cmd_id, true);
290   if (eval_ret != cromp_client::OKAY)
291     non_continuable_error(class_name(), func, astring("failed to process the "
292         "start response: ") + cromp_client::outcome_name(eval_ret));
293
294   int iter = 0;
295
296   while (true) {
297 LOG(a_sprintf("ongoing chunk %d", ++iter));
298     // keep going until we find a broken reply.
299     file_transfer_infoton ongoing;
300     ongoing._request = true;
301     ongoing._command = file_transfer_infoton::PLACE_FILE_CHUNKS;
302     ongoing._src_root = _source;
303     ongoing._dest_root = _target;
304
305     octopus_request_id cmd_id;
306     outcome place_ret = _client_side->submit(ongoing, cmd_id);
307     if (place_ret != cromp_client::OKAY)
308       non_continuable_error(class_name(), func, astring("failed to send ongoing "
309           "chunk: ") + cromp_client::outcome_name(place_ret));
310
311     infoton *place_reply_tmp = NIL;
312 //hmmm: set timeout appropriate to the speed of the connection!
313     outcome place_receipt = _client_side->acquire(place_reply_tmp, cmd_id);
314     if (place_receipt != cromp_client::OKAY)
315       non_continuable_error(class_name(), func, astring("failed to receive "
316           "place response: ") + cromp_client::outcome_name(place_receipt));
317
318     file_transfer_infoton *place_reply = dynamic_cast<file_transfer_infoton *>
319         (place_reply_tmp);
320     if (!place_reply) {
321       if (dynamic_cast<unhandled_request *>(place_reply_tmp)) {
322         log(astring("The server does not support file transfers."), ALWAYS_PRINT);
323         WHACK(place_reply_tmp);
324         break;
325       }
326       non_continuable_error(class_name(), func, "failed to cast storage reply infoton "
327           "to proper type");
328     }
329
330     int reply_size = place_reply->_packed_data.length();
331
332     outcome eval_ret2 = _client_side->octo()->evaluate(place_reply, cmd_id, true);
333     if (eval_ret2 != tentacle::OKAY)
334       non_continuable_error(class_name(), func, astring("failed to process the "
335           "place response: ") + cromp_client::outcome_name(eval_ret2));
336
337     if (!reply_size) {
338       LOG("hit termination condition: no data packed in for file chunks.");
339       break;
340     }
341   }
342   return 0;
343 }
344
345 int transporter::execute()
346 {
347   FUNCDEF("execute");
348
349   if (!_started_okay) return 32;
350
351   time_stamp next_report(REPORTING_INTERVAL);
352
353   while (true) {
354     // make sure we didn't see our exit condition.
355
356     if (_server_side && !_server_side->clients() && _leave_when_no_clients
357         && _saw_clients) {
358       LOG("exiting now");
359       break;
360     }
361
362     if (_client_side) return push_client_download();
363
364     if (time_stamp() > next_report) {
365       if (_server_side)
366         LOG(a_sprintf("There are %d clients.", _server_side->clients()));
367 //report about client side also.
368       next_report.reset(REPORTING_INTERVAL);
369     }
370
371     time_control::sleep_ms(100); 
372   }
373   return 0;
374 }
375
376 //////////////
377
378 HOOPLE_MAIN(transporter, )
379