1 /*****************************************************************************\
3 * Name : cromp_common *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2000-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
17 // for a cromp_common that is "normal", the base octopus will be used for
18 // restoring infotons.
19 // for a dependent cromp_common with a singleton and preexisting socket,
20 // the socket will be used for communications and the singleton octopus will
21 // be used for restore().
23 // there are a few tiers of methods here. the lowest-level tier can be
24 // called by any other functions except those in the lowest-level (so being on
25 // tier A implies that a method may not call other methods in tier A, but being
26 // on a tier X allows calling of all existent tiers X-1, X-2, ...).
28 // last verified that conditions stated in header about variables protected
29 // by accumulator lock are true: 12/30/2002.
31 #include "cromp_common.h"
32 #include "cromp_transaction.h"
34 #include <basis/byte_array.h>
35 #include <basis/functions.h>
36 #include <basis/astring.h>
37 #include <basis/mutex.h>
38 #include <crypto/rsa_crypto.h>
39 #include <loggers/program_wide_logger.h>
40 #include <octopus/entity_data_bin.h>
41 #include <octopus/entity_defs.h>
42 #include <octopus/infoton.h>
43 #include <octopus/octopus.h>
44 #include <octopus/tentacle.h>
45 #include <octopus/unhandled_request.h>
46 #include <sockets/internet_address.h>
47 #include <sockets/machine_uid.h>
48 #include <sockets/spocket.h>
49 #include <sockets/tcpip_stack.h>
50 #include <structures/static_memory_gremlin.h>
51 #include <tentacles/encryption_infoton.h>
52 #include <textual/byte_formatter.h>
53 #include <timely/time_stamp.h>
55 using namespace basis;
56 using namespace crypto;
57 using namespace loggers;
58 using namespace octopi;
59 using namespace sockets;
60 using namespace structures;
61 using namespace textual;
62 using namespace timely;
66 //#define DEBUG_CROMP_COMMON
67 // uncomment for debugging info.
// LOG wraps its argument in an astring and hands the C-string form to
// CLASS_EMERGENCY_LOG on the program-wide logger.
70 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
// tuning constants for this file. the bare numeric timing values are
// presumably milliseconds, like the *_ms based ones -- TODO confirm.
72 const int STALENESS_PERIOD = 2 * MINUTE_ms;
73 // if data sits in the buffer this long without us seeing more, we assume it has gone stale (see CHECK_STALENESS).
76 const int SEND_DELAY_TIME = 200;
77 // if the send failed initially, we'll delay this long before trying again.
79 const int DATA_AWAIT_SNOOZE = 80;
80 // we sleep for this long while we await data.
82 const int QUICK_CROMP_SNOOZE = 28;
83 // we take a quick nap if we're looking for some data and it's not there
86 const int CROMP_BUFFER_CHUNK_SIZE = 256 * KILOBYTE;
87 // the initial allocation size for buffers.
89 const int MAXIMUM_RECEIVES = 70;
90 // the maximum number of receptions before we skip to next phase.
92 const int MAXIMUM_SEND = 128 * KILOBYTE;
93 // the largest chunk we try to send at a time. we want to limit this
94 // rather than continually asking the OS to consume a big transmission.
96 const int CLEANUP_INTERVAL = 28 * SECOND_ms;
97 // this is how frequently we'll flush out items from our data bin that are too old.
100 const int cromp_common::HOSTCHOP = 6;
101 // we take this many characters as the readable textual portion of the host name (see chew_hostname and decode_host).
// running byte totals shared by all cromp_common objects; send_buffer adds to
// the sent total and snarf_from_socket adds to the received total.
104 double cromp_common::_bytes_sent_total = 0.0;
105 double cromp_common::_bytes_received_total = 0.0;
// declares the hidden static rsa_crypto object (constructed with
// encryption_infoton::RSA_KEY_SIZE) that localhost_only_key() hands out.
107 SAFE_STATIC_CONST(rsa_crypto, _hidden_localhost_only_key,
108 (encryption_infoton::RSA_KEY_SIZE))
// returns a reference to the shared localhost-only RSA key by calling the
// _hidden_localhost_only_key() accessor. when DEBUG_CROMP_COMMON is defined,
// the start and end of the key fetch are logged.
// NOTE(review): this listing omits lines here (matching #endif directives,
// the code that uses "initted"/"was_initted", and the return statement);
// confirm details against the complete file.
109 const rsa_crypto &cromp_common::localhost_only_key() {
110 #ifdef DEBUG_CROMP_COMMON
111 FUNCDEF("localhost_only_key");
113 static bool initted = false;
114 #ifdef DEBUG_CROMP_COMMON
115 bool was_initted = initted;
118 #ifdef DEBUG_CROMP_COMMON
120 LOG("started creating localhost RSA key.");
122 const rsa_crypto &to_return = _hidden_localhost_only_key();
123 #ifdef DEBUG_CROMP_COMMON
125 LOG("done creating localhost RSA key.");
// constructor for a "normal" cromp_common: no socket yet (_commlink is null),
// no singleton, and a freshly allocated octopus built from "host" and
// "max_per_ent". the request bin is sized with the same "max_per_ent".
// the lock, both time stamps and all four byte buffers are heap allocated here.
130 cromp_common::cromp_common(const astring &host, int max_per_ent)
131 : _commlink(NULL_POINTER),
132 _octopus(new octopus(host, max_per_ent)),
133 _singleton(NULL_POINTER),
134 _requests(new entity_data_bin(max_per_ent)),
135 _accum_lock(new mutex),
136 _last_data_seen(new time_stamp),
137 _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
138 _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
139 _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
140 _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
141 _last_cleanup(new time_stamp)
143 FUNCDEF("constructor [host/max_per_ent]");
144 // clear pre-existing space.
// the buffers were created at CROMP_BUFFER_CHUNK_SIZE and are reset here so
// they start out empty (presumably keeping the allocation -- TODO confirm).
145 _accumulator->reset();
147 _receive_buffer->reset();
148 _still_flat->reset();
// constructor for a dependent cromp_common: it adopts an already existing
// socket ("preexisting") and remembers the caller's "singleton" octopus.
// the request bin takes its per-entity limit from the singleton's response
// bin when a singleton was given, and DEFAULT_MAX_ENTITY_QUEUE otherwise.
151 cromp_common::cromp_common(spocket *preexisting, octopus *singleton)
152 : _commlink(preexisting),
154 _singleton(singleton),
155 _requests(new entity_data_bin(singleton?
156 singleton->responses().max_bytes_per_entity()
157 : DEFAULT_MAX_ENTITY_QUEUE)),
158 _accum_lock(new mutex),
159 _last_data_seen(new time_stamp),
160 _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
161 _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
162 _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
163 _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
164 _last_cleanup(new time_stamp)
166 FUNCDEF("constructor [preexisting/singleton]");
// NOTE(review): the condition guarding the fallback below is not visible in
// this listing; from the log text it is presumably the null-singleton case.
168 // they passed us a bad singleton. carry on as best we can.
169 LOG("singleton passed as NULL_POINTER; constructing new octopus instead.");
// the fallback octopus is named after the chewed form of the localhost address.
170 internet_address local(internet_address::localhost(), "localhost", 0);
171 _octopus = new octopus(chew_hostname(local), DEFAULT_MAX_ENTITY_QUEUE);
173 // clear pre-existing space.
174 _accumulator->reset();
176 _receive_buffer->reset();
177 _still_flat->reset();
// destructor: closes the connection via close_common(), drops the octopus
// pointers and destroys the heap objects this instance allocated.
// NOTE(review): several lines are missing from this listing (the conditions
// around the pointer resets and part of the WHACK list), so which octopus is
// actually deleted cannot be confirmed from here.
180 cromp_common::~cromp_common()
182 FUNCDEF("destructor");
183 close_common(); // shuts down our socket and other stuff.
185 _singleton = NULL_POINTER; // reset the pointer we had.
186 _octopus = NULL_POINTER; // ditto.
188 // this one was ours so we need to clean it up.
195 WHACK(_last_cleanup);
196 WHACK(_last_data_seen);
197 WHACK(_receive_buffer);
// hands back the socket object in use; this is null until a socket has been
// supplied (constructor) or created (open_common).
202 spocket *cromp_common::spock() const { return _commlink; }
// the port number this class reports as its default: 10008.
204 int cromp_common::default_port() { return 10008; }
// forwards "to_add" and the "filter" flag to the underlying octopus and
// returns whatever outcome the octopus reports.
206 outcome cromp_common::add_tentacle(tentacle *to_add, bool filter)
207 { return _octopus->add_tentacle(to_add, filter); }
// reports how many bytes are currently queued in the outgoing buffer.
// the accumulator lock is held while the length is read.
209 int cromp_common::pending_sends() const
211 auto_synchronizer l(*_accum_lock);
212 return _sendings->length();
// reports how many received bytes are sitting in the accumulator awaiting
// processing. the accumulator lock is held while the length is read.
215 int cromp_common::accumulated_bytes() const
217 auto_synchronizer l(*_accum_lock);
218 return _accumulator->length();
// builds the coded host string for "addr": the first HOSTCHOP characters of
// the normalized host name (padded with "-" when shorter) followed by the
// compact form of the machine_uid converted from the resolved address.
// if "resolved_form" is non-null it receives the resolved address.
// decode_host() splits such a string back apart.
// NOTE(review): the declaration of "stack", the rest of the
// fill_and_resolve() call, the resolution-failure branch and the return
// statement are not visible in this listing.
221 astring cromp_common::chew_hostname(const internet_address &addr,
222 internet_address *resolved_form)
224 #ifdef DEBUG_CROMP_COMMON
225 FUNCDEF("chew_hostname");
226 LOG(astring("addr coming in ") + addr.text_form());
230 internet_address res1 = stack.fill_and_resolve(addr.hostname, addr.port,
233 if (resolved_form) *resolved_form = res1;
234 #ifdef DEBUG_CROMP_COMMON
235 LOG(astring("resolved addr ") + res1.text_form());
238 #ifdef DEBUG_CROMP_COMMON
239 LOG(astring("failed to resolve host=") + addr.hostname);
243 // get a readable form of the host.
244 astring just_host = res1.normalize_host();
// pad short names so the substring below always has HOSTCHOP characters.
245 while (just_host.length() < HOSTCHOP) just_host += "-"; // filler.
246 machine_uid converted = res1.convert();
// substring() is called with (start, end) indices here; decode_host() uses
// the same 0 .. HOSTCHOP - 1 range to recover the name part.
247 astring to_return = just_host.substring(0, HOSTCHOP - 1);
248 to_return += converted.compact_form();
250 #ifdef DEBUG_CROMP_COMMON
251 LOG(astring("returning machid ") + converted.text_form() + ", packed as "
252 + parser_bits::platform_eol_to_chars()
253 + byte_formatter::text_dump((abyte *)to_return.s(),
254 to_return.length() + 1));
// returns the textual form of the request/response bin (_requests).
260 astring cromp_common::responses_text_form() const
261 { return _requests->text_form(); }
// returns the address the socket reports via where(); when there is no
// socket yet, a default-constructed internet_address is returned instead.
263 internet_address cromp_common::other_side() const
265 if (!_commlink) return internet_address();
266 return _commlink->where();
// getter: the per-entity byte limit currently set on the request bin.
269 int cromp_common::max_bytes_per_entity() const
270 { return _requests->max_bytes_per_entity(); }
// setter: applies the new per-entity byte limit to both our own request bin
// and the response bin of the octopus so the two stay in step.
272 void cromp_common::max_bytes_per_entity(int max_bytes_per_entity)
274 _requests->max_bytes_per_entity(max_bytes_per_entity);
275 _octopus->responses().max_bytes_per_entity(max_bytes_per_entity);
// purges old items from the request bin, but at most once per
// CLEANUP_INTERVAL; calls in between do nothing.
278 void cromp_common::conditional_cleaning()
280 FUNCDEF("conditional_cleaning");
// presumably time_stamp(-CLEANUP_INTERVAL) is a stamp that far in the past,
// making this true once the last cleanup is older than the interval --
// TODO confirm against time_stamp.
281 if (time_stamp(-CLEANUP_INTERVAL) > *_last_cleanup) {
282 _requests->clean_out_deadwood();
283 // flush any items that are too old.
284 _last_cleanup->reset();
285 // record that we just cleaned up.
// prepares the socket for talking to "where". an object built around a
// pre-existing socket and singleton octopus is already set up and returns
// OKAY at once; otherwise any old socket is destroyed and a new spocket is
// created for the address.
// NOTE(review): the final return statement is not visible in this listing.
289 outcome cromp_common::open_common(const internet_address &where)
291 #ifdef DEBUG_CROMP_COMMON
292 FUNCDEF("open_common");
294 if (_singleton && _commlink)
295 return OKAY; // done if this uses pre-existing objects.
297 if (_commlink) WHACK(_commlink); // clean up any pre-existing socket.
299 internet_address other_side = where;
301 #ifdef DEBUG_CROMP_COMMON
302 LOG(astring("opening at ") + other_side.text_form());
304 _commlink = new spocket(other_side);
305 //hmmm: check socket health.
// disconnects the socket if one exists; the socket object itself is not
// destroyed by the visible code.
// NOTE(review): the return statement is not visible in this listing.
310 outcome cromp_common::close_common()
312 if (_commlink) _commlink->disconnect(); // make the thread stop bothering.
// returns a text name for "to_name". TOO_FULL and PARTIAL are named here;
// every other value is passed on to communication_commons::outcome_name().
316 const char *cromp_common::outcome_name(const outcome &to_name)
318 switch (to_name.value()) {
319 case TOO_FULL: return "TOO_FULL";
320 case PARTIAL: return "PARTIAL";
321 default: return communication_commons::outcome_name(to_name);
// flattens every request in "requests" into the outgoing buffer and then
// tries to push the buffer out with push_outgoing(max_tries).
// returns BAD_INPUT when no socket has been opened yet.
// NOTE(review): the rest of the parameter list, the action taken for a bad
// list entry and the remaining flatten() arguments are missing from this
// listing.
325 outcome cromp_common::pack_and_ship(const infoton_list &requests,
328 FUNCDEF("pack_and_ship [multiple]");
329 if (!_commlink) return BAD_INPUT; // they haven't opened this yet.
330 conditional_cleaning();
332 auto_synchronizer l(*_accum_lock); // lock while packing.
333 for (int i = 0; i < requests.elements(); i++) {
// each entry must exist and carry a data pointer before it can be packed.
334 if (!requests[i] || !requests[i]->_data) {
335 // this is a screw-up by someone.
336 LOG("error in infoton_list; missing data element.");
339 cromp_transaction::flatten(*_sendings, *requests[i]->_data,
344 return push_outgoing(max_tries);
// true when the outgoing buffer holds at least "max_buff" bytes. the
// accumulator lock is held while the length is read.
347 bool cromp_common::buffer_clog(int max_buff) const
349 auto_synchronizer l(*_accum_lock);
350 return _sendings->length() >= max_buff;
// flattens one "request" tagged with "item_id" into the outgoing buffer
// (under the accumulator lock) and then tries to push the buffer out with
// push_outgoing(max_tries). returns BAD_INPUT when no socket has been
// opened yet.
353 outcome cromp_common::pack_and_ship(const infoton &request,
354 const octopus_request_id &item_id, int max_tries)
356 #ifdef DEBUG_CROMP_COMMON
357 FUNCDEF("pack_and_ship [single]");
359 if (!_commlink) return BAD_INPUT; // they haven't opened this yet.
360 conditional_cleaning();
362 #ifdef DEBUG_CROMP_COMMON
363 LOG(astring("sending req ") + item_id.mangled_form());
367 auto_synchronizer l(*_accum_lock); // lock while packing.
368 cromp_transaction::flatten(*_sendings, request, item_id);
371 return push_outgoing(max_tries);
// repeatedly calls send_buffer() until the outgoing data is gone, an error
// other than TOO_FULL occurs, or "max_tries" consecutive TOO_FULL attempts
// have been used up. incoming data is pulled in between attempts.
// a "max_tries" of zero returns OKAY without sending anything.
// NOTE(review): the declaration of "attempts", the body of the
// await_writable check and the final return are missing from this listing.
374 outcome cromp_common::push_outgoing(int max_tries)
376 FUNCDEF("push_outgoing");
378 if (!max_tries) return cromp_common::OKAY;
379 // no tries means we're done already.
381 grab_anything(false); // suck any data in that happens to be waiting.
383 outcome to_return = cromp_common::TOO_FULL;
385 while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
386 to_return = send_buffer();
387 grab_anything(false); // suck any data in that happens to be waiting.
388 if (to_return == cromp_common::OKAY)
389 break; // happy returns.
390 if (to_return == cromp_common::PARTIAL) {
391 // we sent all we tried to but there's more left.
392 attempts = 0; // skip back since we had a successful attempt.
393 to_return = cromp_common::TOO_FULL;
394 // reset so that we treat this by staying in the send loop.
395 continue; // jump back without waiting.
397 if (to_return == cromp_common::TOO_FULL) {
398 // we can't send any more yet so delay for a bit to see if we can get
// wait up to SEND_DELAY_TIME, in QUICK_CROMP_SNOOZE slices, for the socket
// to become writable again; give up early if the connection drops.
400 time_stamp stop_pausing(SEND_DELAY_TIME);
401 while (time_stamp() < stop_pausing) {
402 LOG("into too full looping...");
403 if (!_commlink->connected()) break;
404 grab_anything(true); // suck any data in that happens to be waiting.
405 // snooze a bit until we think we can write again.
406 outcome ret = _commlink->await_writable(QUICK_CROMP_SNOOZE);
407 if (ret != spocket::NONE_READY)
411 LOG(astring("failed send: ") + cromp_common::outcome_name(to_return));
418 // rules for send_buffer: this function is in the lowest-level tier for using
419 // the spocket. it is allowed to be called by anyone. it must not call any
420 // other functions on the cromp_common class.
// sends up to MAXIMUM_SEND bytes from the front of the outgoing buffer in a
// single socket send() and removes the bytes that were sent. the accumulator
// lock is held for the whole call. the spocket outcome is translated into
// this class's outcome (TOO_FULL when nothing could be written).
// NOTE(review): the declarations of "len_sent" and "to_return", the rest of
// the send() argument list, the OKAY/PARTIAL assignments, the default case
// label and the final return are missing from this listing.
421 outcome cromp_common::send_buffer()
423 #ifdef DEBUG_CROMP_COMMON
424 FUNCDEF("send_buffer");
426 auto_synchronizer l(*_accum_lock);
428 // all done if nothing to send.
429 if (!_sendings->length())
432 int size_to_send = minimum(_sendings->length(), MAXIMUM_SEND);
433 #ifdef DEBUG_CROMP_COMMON
434 // LOG(a_sprintf("sending %d bytes on socket %d.", size_to_send,
435 // _commlink->OS_socket()));
439 outcome send_ret = _commlink->send(_sendings->observe(), size_to_send,
441 switch (send_ret.value()) {
442 case spocket::OKAY: {
444 #ifdef DEBUG_CROMP_COMMON
445 // LOG(a_sprintf("really sent %d bytes on socket %d.", len_sent,
446 // _commlink->OS_socket()));
448 _bytes_sent_total += len_sent;
452 case spocket::PARTIAL: {
453 // got something done hopefully.
454 #ifdef DEBUG_CROMP_COMMON
455 LOG(a_sprintf("partial send of %d bytes (of %d desired) on socket %d.",
456 len_sent, size_to_send, _commlink->OS_socket()));
458 _bytes_sent_total += len_sent;
462 case spocket::NONE_READY: {
463 // did nothing useful.
464 #ifdef DEBUG_CROMP_COMMON
465 LOG(a_sprintf("too full to send any on socket %d.",
466 _commlink->OS_socket()));
468 len_sent = 0; // reset just in case.
469 to_return = TOO_FULL;
473 // other things went wrong.
474 #ifdef DEBUG_CROMP_COMMON
475 LOG(astring("failing send with ") + spocket::outcome_name(send_ret));
477 len_sent = 0; // reset just in case.
479 //hmmm: these are unnecessary now since it's the same set of outcomes.
480 if (send_ret == spocket::NO_CONNECTION) to_return = NO_CONNECTION;
481 else if (send_ret == spocket::TIMED_OUT) to_return = TIMED_OUT;
483 else to_return = DISALLOWED;
485 #ifdef DEBUG_CROMP_COMMON
486 LOG(astring("failed to send--got error ") + outcome_name(to_return));
492 if ( (to_return == PARTIAL) || (to_return == OKAY) ) {
493 // accommodate our latest activity on the socket.
// zap() takes an inclusive (start, end) index pair, so this drops the first
// len_sent bytes. NOTE(review): with len_sent == 0 this becomes zap(0, -1);
// presumably a no-op in byte_array -- TODO confirm.
494 _sendings->zap(0, len_sent - 1); // sent just some of it.
// shared worker behind retrieve_and_restore() and retrieve_and_restore_any().
// looks in the request bin for a finished item (any item, or the one
// matching "req_id"), pulling in fresh socket data between looks, until the
// item shows up or "timeout" has elapsed.
// returns BAD_INPUT when no socket was opened, TIMED_OUT at once when
// "timeout" is zero and nothing is available, and NO_CONNECTION when the
// socket is found disconnected.
// NOTE(review): the "do {" opener, the get_anything branches, the success
// returns and the final return are missing from this listing.
500 outcome cromp_common::retrieve_and_restore_root(bool get_anything,
501 infoton * &item, octopus_request_id &req_id, int timeout)
503 FUNCDEF("retrieve_and_restore_root");
505 if (!_commlink) return BAD_INPUT; // they haven't opened this yet.
506 octopus_request_id tmp_id;
507 time_stamp leaving_time(timeout);
509 conditional_cleaning();
512 // check if it's already in the bin from someone else grabbing it.
514 item = _requests->acquire_for_any(req_id);
516 item = _requests->acquire_for_identifier(req_id);
520 // check to see if there's any data.
// only allow grab_anything() to wait when the caller gave a non-zero timeout.
521 grab_anything(timeout? true : false);
524 //hmmm: parameterize the push?
526 // check again just to make sure. this is before we check the timeout,
527 // since we could squeak in with something before that.
529 item = _requests->acquire_for_any(req_id);
531 item = _requests->acquire_for_identifier(req_id);
535 if (!timeout) return TIMED_OUT;
536 // timeout is not set so we leave right away.
538 if (!_commlink->connected()) return NO_CONNECTION;
540 // keep going if we haven't seen it yet and still have time.
541 } while (time_stamp() < leaving_time);
// waits up to "timeout" for the response matching "req_id_in" and stores it
// in "item". the id is copied first because the root function takes a
// non-const reference.
545 outcome cromp_common::retrieve_and_restore(infoton * &item,
546 const octopus_request_id &req_id_in, int timeout)
548 octopus_request_id req_id = req_id_in;
549 return retrieve_and_restore_root(false, item, req_id, timeout);
// waits up to "timeout" for any available response; calls the root function
// with get_anything set to true, passing "item" and "req_id" through.
552 outcome cromp_common::retrieve_and_restore_any(infoton * &item,
553 octopus_request_id &req_id, int timeout)
554 { return retrieve_and_restore_root(true, item, req_id, timeout); }
556 // rules: snarf_from_socket is in the second lowest-level tier. it must not
557 // call any other functions on cromp_common besides the send_buffer and
558 // process_accumulator methods.
// reads whatever the socket has ready and appends it to the accumulator.
// an optional wait phase polls for readability for up to DATA_AWAIT_SNOOZE,
// pushing pending output between polls; then up to MAXIMUM_RECEIVES receive
// calls are made while the socket keeps returning OKAY.
// NOTE(review): the "if (wait)" guard, the declaration of "receptions", some
// closing braces/#endif lines and the trailing send call are missing from
// this listing.
559 void cromp_common::snarf_from_socket(bool wait)
561 #ifdef DEBUG_CROMP_COMMON
562 FUNCDEF("snarf_from_socket");
565 #ifdef DEBUG_CROMP_COMMON
566 // LOG(a_sprintf("awaiting rcptblty on socket %d.", _commlink->OS_socket()));
568 // snooze until data seems ready for chewing or until we time out.
569 time_stamp stop_pausing(DATA_AWAIT_SNOOZE);
570 while (time_stamp() < stop_pausing) {
571 if (!_commlink->connected()) return;
572 outcome wait_ret = _commlink->await_readable(QUICK_CROMP_SNOOZE);
573 if (wait_ret != spocket::NONE_READY)
575 send_buffer(); // push out some data in between.
579 outcome rcv_ret = spocket::OKAY;
580 // this loop scrounges as much data as possible, within limits.
582 while ( (rcv_ret == spocket::OKAY) && (receptions++ < MAXIMUM_RECEIVES) ) {
// ask for at most one buffer chunk per receive call.
583 int rcv_size = CROMP_BUFFER_CHUNK_SIZE;
// the lock covers both the receive buffer and the accumulator update below.
585 auto_synchronizer l(*_accum_lock);
586 _receive_buffer->reset(); // clear pre-existing junk.
587 rcv_ret = _commlink->receive(*_receive_buffer, rcv_size);
588 #ifdef DEBUG_CROMP_COMMON
589 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
590 LOG(a_sprintf("received %d bytes on socket %d", rcv_size,
591 _commlink->OS_socket()));
592 } else if (rcv_ret != spocket::NONE_READY) {
593 LOG(a_sprintf("no data on sock %d--outcome=", _commlink->OS_socket())
594 + spocket::outcome_name(rcv_ret));
597 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
598 // we got some data from the receive, so store it.
599 _bytes_received_total += _receive_buffer->length();
600 *_accumulator += *_receive_buffer; // add to overall accumulator.
// remember when data last arrived; CHECK_STALENESS compares against this.
601 _last_data_seen->reset();
606 // force data to go out also.
// one full intake pass: read pending bytes from the socket (optionally
// waiting, per "wait"), then turn accumulated bytes into infotons.
610 void cromp_common::grab_anything(bool wait)
612 snarf_from_socket(wait); // get any data that's waiting.
613 process_accumulator(); // retrieve any commands we see.
// CHECK_STALENESS: when no data has been seen for longer than
// STALENESS_PERIOD, logs the fact, drops the first accumulator byte, asks
// cromp_transaction to resynchronize the accumulator and restarts the
// staleness clock. expects _last_data_seen and _accumulator to be in scope.
// NOTE(review): the macro's closing line is not visible in this listing.
616 #define CHECK_STALENESS \
617 if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
618 LOG("would resynch data due to staleness."); \
619 _accumulator->zap(0, 0); /* roast first byte */ \
620 cromp_transaction::resynchronize(*_accumulator); \
621 _last_data_seen->reset(); \
625 // process_accumulator should do nothing besides chewing on the buffer.
626 // this puts it in the lowest-level tier.
// pulls complete transactions out of the accumulator one at a time: peek at
// the header, unflatten the transaction into _still_flat, unpack the infoton
// classifier and payload, restore the infoton through the octopus and file
// the result in the request bin under its request id. when a stage fails,
// the first byte is dropped and a resynchronize is attempted.
// NOTE(review): declarations of "cmds_found" and "clas", several branch
// bodies, #endif lines and closing braces are missing from this listing.
628 void cromp_common::process_accumulator()
630 FUNCDEF("process_accumulator");
631 infoton *item = NULL_POINTER;
632 octopus_request_id req_id;
// NOTE(review): this length check and the loop condition below appear to
// read the accumulator before _accum_lock is taken -- confirm against the
// complete file.
636 if (!_accumulator->length()) return;
638 // a little gymnastics to get a large buffer on the first try.
639 byte_array temp_chow_buffer(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER);
640 temp_chow_buffer.reset();
644 while (_accumulator->length()) {
645 LOG(a_sprintf("eating command %d", cmds_found++));
647 // first block tries to extract data from the accumulator.
648 auto_synchronizer l(*_accum_lock);
649 // there are some contents; let's look at them.
650 int packed_length = 0;
651 outcome peek_ret = cromp_transaction::peek_header(*_accumulator,
// WAY_TOO_SMALL / PARTIAL: not enough bytes for a whole transaction yet.
653 if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
654 || (peek_ret == cromp_transaction::PARTIAL) ) {
658 } else if (peek_ret != cromp_transaction::OKAY) {
659 LOG(astring("error unpacking--peek error=")
660 + cromp_transaction::outcome_name(peek_ret));
661 // try to get to a real command.
662 _accumulator->zap(0, 0); // roast first byte.
663 if (cromp_transaction::resynchronize(*_accumulator)) continue;
667 #ifdef DEBUG_CROMP_COMMON
668 LOG("seeing command ready");
670 // temp buffer for undoing cromp transaction.
671 if (!cromp_transaction::unflatten(*_accumulator, *_still_flat, req_id)) {
672 LOG("failed to unpack even though peek was happy!");
673 // try to get to a real command.
674 _accumulator->zap(0, 0); // roast first byte.
675 if (cromp_transaction::resynchronize(*_accumulator)) continue;
678 #ifdef DEBUG_CROMP_COMMON
679 LOG(astring("got req id of ") + req_id.mangled_form());
682 // now unwrap the onion a bit more to find the real object being sent.
683 if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
684 // try to resynch on transaction boundary.
685 LOG("failed to get back a packed infoton!");
686 _accumulator->zap(0, 0); // roast first byte.
687 if (cromp_transaction::resynchronize(*_accumulator)) continue;
690 #ifdef DEBUG_CROMP_COMMON
691 LOG(astring("got classifier of ") + clas.text_form());
693 } // end of protected area.
695 // restore the infoton from the packed form.
// octo() supplies the octopus used for restoring; per the file header that is
// the base octopus normally and the singleton for a dependent object.
696 outcome rest_ret = octo()->restore(clas, temp_chow_buffer, item);
697 if (rest_ret != tentacle::OKAY) {
698 #ifdef DEBUG_CROMP_COMMON
699 LOG(astring("our octopus couldn't restore the packed data! ")
700 + outcome_name(rest_ret));
702 // publish an unhandled request back to the requestor.
703 _requests->add_item(new unhandled_request(req_id, clas, rest_ret),
706 // we finally have reached a point where we have a valid infoton.
707 if (_requests->add_item(item, req_id))
709 #ifdef DEBUG_CROMP_COMMON
711 LOG("failed to add item to bin due to space constraints.");
714 LOG(a_sprintf("ate command %d", cmds_found));
716 /// LOG(a_sprintf("added %d commands", cmds_found));
// splits a coded host string of the kind chew_hostname() builds: the first
// HOSTCHOP characters go to "hostname" and the remainder is expanded into
// "machine". returns false when the string is shorter than HOSTCHOP or the
// expanded machine_uid is not valid.
// NOTE(review): the success return and closing brace are not visible in
// this listing.
719 bool cromp_common::decode_host(const astring &coded_host, astring &hostname,
720 machine_uid &machine)
722 if (coded_host.length() < HOSTCHOP) return false; // not big enough.
723 hostname = coded_host.substring(0, cromp_common::HOSTCHOP - 1);
// everything after the name part is the compact machine id.
724 const astring compact_uid = coded_host.substring(cromp_common::HOSTCHOP,
725 coded_host.length() - 1);
726 machine = machine_uid::expand(compact_uid);
727 if (!machine.valid()) return false;