feisty meow concerns codebase  2.140
cromp_common.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : cromp_common *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2000-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 // NOTES:
16 //
17 // for a cromp_common that is "normal", the base octopus will be used for
18 // restoring infotons.
19 // for a dependent cromp_common with a singleton and preexisting socket,
20 // the socket will be used for communications and the singleton octopus will
21 // be used for restore().
22 //
23 // there are a few tiers of methods here. the lowest-level tier can be
24 // called by any other functions except those in the lowest-level (so being on
25 // tier A implies that a method may not call other methods in tier A, but being
26 // on a tier X allows calling of all existent tiers X-1, X-2, ...).
27 
28 // last verified that conditions stated in header about variables protected
29 // by accumulator lock are true: 12/30/2002.
30 
31 #include "cromp_common.h"
32 #include "cromp_transaction.h"
33 
34 #include <basis/byte_array.h>
35 #include <basis/functions.h>
36 #include <basis/astring.h>
37 #include <basis/mutex.h>
38 #include <crypto/rsa_crypto.h>
41 #include <octopus/entity_defs.h>
42 #include <octopus/infoton.h>
43 #include <octopus/octopus.h>
44 #include <octopus/tentacle.h>
47 #include <sockets/machine_uid.h>
48 #include <sockets/spocket.h>
49 #include <sockets/tcpip_stack.h>
52 #include <textual/byte_formatter.h>
53 #include <timely/time_stamp.h>
54 
55 using namespace basis;
56 using namespace crypto;
57 using namespace loggers;
58 using namespace octopi;
59 using namespace sockets;
60 using namespace structures;
61 using namespace textual;
62 using namespace timely;
63 
64 namespace cromp {
65 
66 //#define DEBUG_CROMP_COMMON
67  // uncomment for debugging info.
68 
69 #undef LOG
70 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
71 
// timing constants for the send/receive loops in this file.
// NOTE(review): STALENESS_PERIOD is built from MINUTE_ms, so it is in
// milliseconds. the bare numbers below are presumably milliseconds too, since
// they are passed to time_stamp constructors and to the socket's
// await_readable/await_writable timeouts--confirm against those classes.
72 const int STALENESS_PERIOD = 2 * MINUTE_ms;
73  // if data sits in the buffer this long without us seeing more, we assume
74  // it's gone stale.
75 
76 const int SEND_DELAY_TIME = 200;
77  // if the send failed initially, we'll delay this long before trying again.
78  // (push_outgoing uses this as the length of its pause when the socket
79  // reports it is too full.)
78 
79 const int DATA_AWAIT_SNOOZE = 80;
80  // we sleep for this long while we await data.
81  // (snarf_from_socket uses this as the total wait when asked to wait.)
81 
82 const int QUICK_CROMP_SNOOZE = 28;
83  // we take a quick nap if we're looking for some data and it's not there
84  // for us yet.
85  // (this is the timeout handed to each individual await_readable or
86  // await_writable call.)
85 
87  // the initial allocation size for buffers.
88 
// size and count limits for the transfer loops, followed by the definitions
// of the class's static members.
89 const int MAXIMUM_RECEIVES = 70;
90  // the maximum number of receptions before we skip to next phase.
91  // (this caps the receive loop in snarf_from_socket.)
91 
92 const int MAXIMUM_SEND = 128 * KILOBYTE;
93  // the largest chunk we try to send at a time. we want to limit this
94  // rather than continually asking the OS to consume a big transmission.
95 
96 const int CLEANUP_INTERVAL = 28 * SECOND_ms;
97  // this is how frequently we'll flush out items from our data bin that
98  // are too old.
99 
100 const int cromp_common::HOSTCHOP = 6;
101  // we take this many characters as the readable textual portion of the
102  // hostname.
103  // (chew_hostname pads shorter names with '-' up to this length, and
104  // decode_host splits a coded host at this offset.)
103 
// running byte counts. these are static data members, so every cromp_common
// object adds to the same two totals (send_buffer adds to the first and
// snarf_from_socket adds to the second).
104 double cromp_common::_bytes_sent_total = 0.0;
105 double cromp_common::_bytes_received_total = 0.0;
106 
107  SAFE_STATIC_CONST(rsa_crypto, _hidden_localhost_only_key,
108  (encryption_infoton::RSA_KEY_SIZE))
109  const rsa_crypto &cromp_common::localhost_only_key() {
110 #ifdef DEBUG_CROMP_COMMON
111  FUNCDEF("localhost_only_key");
112 #endif
113  static bool initted = false;
114 #ifdef DEBUG_CROMP_COMMON
115  bool was_initted = initted;
116 #endif
117  initted = true;
118 #ifdef DEBUG_CROMP_COMMON
119  if (!was_initted)
120  LOG("started creating localhost RSA key.");
121 #endif
122  const rsa_crypto &to_return = _hidden_localhost_only_key();
123 #ifdef DEBUG_CROMP_COMMON
124  if (!was_initted)
125  LOG("done creating localhost RSA key.");
126 #endif
127  return to_return;
128  }
129 
130 cromp_common::cromp_common(const astring &host, int max_per_ent)
131 : _commlink(NULL_POINTER),
132  _octopus(new octopus(host, max_per_ent)),
133  _singleton(NULL_POINTER),
134  _requests(new entity_data_bin(max_per_ent)),
135  _accum_lock(new mutex),
136  _last_data_seen(new time_stamp),
137  _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
139  _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
141  _last_cleanup(new time_stamp)
142 {
143  FUNCDEF("constructor [host/max_per_ent]");
144  // clear pre-existing space.
145  _accumulator->reset();
146  _sendings->reset();
147  _receive_buffer->reset();
148  _still_flat->reset();
149 }
150 
151 cromp_common::cromp_common(spocket *preexisting, octopus *singleton)
152 : _commlink(preexisting),
153  _octopus(singleton),
154  _singleton(singleton),
155  _requests(new entity_data_bin(singleton?
156  singleton->responses().max_bytes_per_entity()
158  _accum_lock(new mutex),
159  _last_data_seen(new time_stamp),
160  _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
162  _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NULL_POINTER)),
164  _last_cleanup(new time_stamp)
165 {
166  FUNCDEF("constructor [preexisting/singleton]");
167  if (!_octopus) {
168  // they passed us a bad singleton. carry on as best we can.
169  LOG("singleton passed as NULL_POINTER; constructing new octopus instead.");
170  internet_address local(internet_address::localhost(), "localhost", 0);
171  _octopus = new octopus(chew_hostname(local), DEFAULT_MAX_ENTITY_QUEUE);
172  }
173  // clear pre-existing space.
174  _accumulator->reset();
175  _sendings->reset();
176  _receive_buffer->reset();
177  _still_flat->reset();
178 }
179 
181 {
182  FUNCDEF("destructor");
183  close_common(); // shuts down our socket and other stuff.
184  if (_singleton) {
185  _singleton = NULL_POINTER; // reset the pointer we had.
186  _octopus = NULL_POINTER; // ditto.
187  } else {
188  // this one was ours so we need to clean it up.
189  WHACK(_octopus);
190  }
191  WHACK(_accumulator);
192  WHACK(_sendings);
193  WHACK(_commlink);
194  WHACK(_requests);
195  WHACK(_last_cleanup);
196  WHACK(_last_data_seen);
197  WHACK(_receive_buffer);
198  WHACK(_still_flat);
199  WHACK(_accum_lock);
200 }
201 
202 spocket *cromp_common::spock() const { return _commlink; }
203 
204 int cromp_common::default_port() { return 10008; }
205 
207 { return _octopus->add_tentacle(to_add, filter); }
208 
210 {
211  auto_synchronizer l(*_accum_lock);
212  return _sendings->length();
213 }
214 
216 {
217  auto_synchronizer l(*_accum_lock);
218  return _accumulator->length();
219 }
220 
222  internet_address *resolved_form)
223 {
224 #ifdef DEBUG_CROMP_COMMON
225  FUNCDEF("chew_hostname");
226  LOG(astring("addr coming in ") + addr.text_form());
227 #endif
229  bool worked;
230  internet_address res1 = stack.fill_and_resolve(addr.hostname, addr.port,
231  worked);
232  if (worked) {
233  if (resolved_form) *resolved_form = res1;
234 #ifdef DEBUG_CROMP_COMMON
235  LOG(astring("resolved addr ") + res1.text_form());
236 #endif
237  } else {
238 #ifdef DEBUG_CROMP_COMMON
239  LOG(astring("failed to resolve host=") + addr.hostname);
240 #endif
241  }
242 
243  // get a readable form of the host.
244  astring just_host = res1.normalize_host();
245  while (just_host.length() < HOSTCHOP) just_host += "-"; // filler.
246  machine_uid converted = res1.convert();
247  astring to_return = just_host.substring(0, HOSTCHOP - 1);
248  to_return += converted.compact_form();
249 
250 #ifdef DEBUG_CROMP_COMMON
251  LOG(astring("returning machid ") + converted.text_form() + ", packed as "
252  + parser_bits::platform_eol_to_chars()
253  + byte_formatter::text_dump((abyte *)to_return.s(),
254  to_return.length() + 1));
255 #endif
256 
257  return to_return;
258 }
259 
261 { return _requests->text_form(); }
262 
264 {
265  if (!_commlink) return internet_address();
266  return _commlink->where();
267 }
268 
270 { return _requests->max_bytes_per_entity(); }
271 
272 void cromp_common::max_bytes_per_entity(int max_bytes_per_entity)
273 {
276 }
277 
278 void cromp_common::conditional_cleaning()
279 {
280  FUNCDEF("conditional_cleaning");
281  if (time_stamp(-CLEANUP_INTERVAL) > *_last_cleanup) {
282  _requests->clean_out_deadwood();
283  // flush any items that are too old.
284  _last_cleanup->reset();
285  // record that we just cleaned up.
286  }
287 }
288 
290 {
291 #ifdef DEBUG_CROMP_COMMON
292  FUNCDEF("open_common");
293 #endif
294  if (_singleton && _commlink)
295  return OKAY; // done if this uses pre-existing objects.
296 
297  if (_commlink) WHACK(_commlink); // clean up any pre-existing socket.
298 
300 
301 #ifdef DEBUG_CROMP_COMMON
302  LOG(astring("opening at ") + other_side.text_form());
303 #endif
304  _commlink = new spocket(other_side);
305 //hmmm: check socket health.
306 
307  return OKAY;
308 }
309 
311 {
312  if (_commlink) _commlink->disconnect(); // make the thread stop bothering.
313  return OKAY;
314 }
315 
316 const char *cromp_common::outcome_name(const outcome &to_name)
317 {
318  switch (to_name.value()) {
319  case TOO_FULL: return "TOO_FULL";
320  case PARTIAL: return "PARTIAL";
321  default: return communication_commons::outcome_name(to_name);
322  }
323 }
324 
326  int max_tries)
327 {
328  FUNCDEF("pack_and_ship [multiple]");
329  if (!_commlink) return BAD_INPUT; // they haven't opened this yet.
330  conditional_cleaning();
331  {
332  auto_synchronizer l(*_accum_lock); // lock while packing.
333  for (int i = 0; i < requests.elements(); i++) {
334  if (!requests[i] || !requests[i]->_data) {
335  // this is a screw-up by someone.
336  LOG("error in infoton_list; missing data element.");
337  continue;
338  }
339  cromp_transaction::flatten(*_sendings, *requests[i]->_data,
340  requests[i]->_id);
341  }
342  }
343 
344  return push_outgoing(max_tries);
345 }
346 
347 bool cromp_common::buffer_clog(int max_buff) const
348 {
349  auto_synchronizer l(*_accum_lock);
350  return _sendings->length() >= max_buff;
351 }
352 
354  const octopus_request_id &item_id, int max_tries)
355 {
356 #ifdef DEBUG_CROMP_COMMON
357  FUNCDEF("pack_and_ship [single]");
358 #endif
359  if (!_commlink) return BAD_INPUT; // they haven't opened this yet.
360  conditional_cleaning();
361 
362 #ifdef DEBUG_CROMP_COMMON
363  LOG(astring("sending req ") + item_id.mangled_form());
364 #endif
365 
366  {
367  auto_synchronizer l(*_accum_lock); // lock while packing.
368  cromp_transaction::flatten(*_sendings, request, item_id);
369  }
370 
371  return push_outgoing(max_tries);
372 }
373 
375 {
376  FUNCDEF("push_outgoing");
377 
378  if (!max_tries) return cromp_common::OKAY;
379  // no tries means we're done already.
380 
381  grab_anything(false); // suck any data in that happens to be waiting.
382 
383  outcome to_return = cromp_common::TOO_FULL;
384  int attempts = 0;
385  while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
386  to_return = send_buffer();
387  grab_anything(false); // suck any data in that happens to be waiting.
388  if (to_return == cromp_common::OKAY)
389  break; // happy returns.
390  if (to_return == cromp_common::PARTIAL) {
391  // we sent all we tried to but there's more left.
392  attempts = 0; // skip back since we had a successful attempt.
393  to_return = cromp_common::TOO_FULL;
394  // reset so that we treat this by staying in the send loop.
395  continue; // jump back without waiting.
396  }
397  if (to_return == cromp_common::TOO_FULL) {
398  // we can't send any more yet so delay for a bit to see if we can get
399  // some more out.
400  time_stamp stop_pausing(SEND_DELAY_TIME);
401  while (time_stamp() < stop_pausing) {
402 LOG("into too full looping...");
403  if (!_commlink->connected()) break;
404  grab_anything(true); // suck any data in that happens to be waiting.
405  // snooze a bit until we think we can write again.
406  outcome ret = _commlink->await_writable(QUICK_CROMP_SNOOZE);
407  if (ret != spocket::NONE_READY)
408  break;
409  }
410  } else {
411  LOG(astring("failed send: ") + cromp_common::outcome_name(to_return));
412  break;
413  }
414  }
415  return to_return;
416 }
417 
418 // rules for send_buffer: this function is in the lowest-level tier for using
419 // the spocket. it is allowed to be called by anyone. it must not call any
420 // other functions on the cromp_common class.
422 {
423 #ifdef DEBUG_CROMP_COMMON
424  FUNCDEF("send_buffer");
425 #endif
426  auto_synchronizer l(*_accum_lock);
427 
428  // all done if nothing to send.
429  if (!_sendings->length())
430  return OKAY;
431 
432  int size_to_send = minimum(_sendings->length(), MAXIMUM_SEND);
433 #ifdef DEBUG_CROMP_COMMON
434 // LOG(a_sprintf("sending %d bytes on socket %d.", size_to_send,
435 // _commlink->OS_socket()));
436 #endif
437  int len_sent = 0;
438  outcome to_return;
439  outcome send_ret = _commlink->send(_sendings->observe(), size_to_send,
440  len_sent);
441  switch (send_ret.value()) {
442  case spocket::OKAY: {
443  // success.
444 #ifdef DEBUG_CROMP_COMMON
445 // LOG(a_sprintf("really sent %d bytes on socket %d.", len_sent,
446 // _commlink->OS_socket()));
447 #endif
448  _bytes_sent_total += len_sent;
449  to_return = OKAY;
450  break;
451  }
452  case spocket::PARTIAL: {
453  // got something done hopefully.
454 #ifdef DEBUG_CROMP_COMMON
455  LOG(a_sprintf("partial send of %d bytes (of %d desired) on socket %d.",
456  len_sent, size_to_send, _commlink->OS_socket()));
457 #endif
458  _bytes_sent_total += len_sent;
459  to_return = PARTIAL;
460  break;
461  }
462  case spocket::NONE_READY: {
463  // did nothing useful.
464 #ifdef DEBUG_CROMP_COMMON
465  LOG(a_sprintf("too full to send any on socket %d.",
466  _commlink->OS_socket()));
467 #endif
468  len_sent = 0; // reset just in case.
469  to_return = TOO_FULL;
470  break;
471  }
472  default: {
473  // other things went wrong.
474 #ifdef DEBUG_CROMP_COMMON
475  LOG(astring("failing send with ") + spocket::outcome_name(send_ret));
476 #endif
477  len_sent = 0; // reset just in case.
478 
479 //hmmm: these are unnecessary now since it's the same set of outcomes.
480  if (send_ret == spocket::NO_CONNECTION) to_return = NO_CONNECTION;
481  else if (send_ret == spocket::TIMED_OUT) to_return = TIMED_OUT;
482 //any other ideas?
483  else to_return = DISALLOWED;
484 
485 #ifdef DEBUG_CROMP_COMMON
486  LOG(astring("failed to send--got error ") + outcome_name(to_return));
487 #endif
488  break;
489  }
490  }
491 
492  if ( (to_return == PARTIAL) || (to_return == OKAY) ) {
493  // accomodate our latest activity on the socket.
494  _sendings->zap(0, len_sent - 1); // sent just some of it.
495  }
496 
497  return to_return;
498 }
499 
500 outcome cromp_common::retrieve_and_restore_root(bool get_anything,
501  infoton * &item, octopus_request_id &req_id, int timeout)
502 {
503  FUNCDEF("retrieve_and_restore_root");
504  item = NULL_POINTER;
505  if (!_commlink) return BAD_INPUT; // they haven't opened this yet.
506  octopus_request_id tmp_id;
507  time_stamp leaving_time(timeout);
508 
509  conditional_cleaning();
510 
511  do {
512  // check if it's already in the bin from someone else grabbing it.
513  if (get_anything)
514  item = _requests->acquire_for_any(req_id);
515  else
516  item = _requests->acquire_for_identifier(req_id);
517  if (item)
518  return OKAY;
519 
520  // check to see if there's any data.
521  grab_anything(timeout? true : false);
522 
523  push_outgoing(1);
524 //hmmm: parameterize the push?
525 
526  // check again just to make sure. this is before we check the timeout,
527  // since we could squeak in with something before that.
528  if (get_anything)
529  item = _requests->acquire_for_any(req_id);
530  else
531  item = _requests->acquire_for_identifier(req_id);
532  if (item)
533  return OKAY;
534 
535  if (!timeout) return TIMED_OUT;
536  // timeout is not set so we leave right away.
537 
538  if (!_commlink->connected()) return NO_CONNECTION;
539 
540  // keep going if we haven't seen it yet and still have time.
541  } while (time_stamp() < leaving_time);
542  return TIMED_OUT;
543 }
544 
546  const octopus_request_id &req_id_in, int timeout)
547 {
548  octopus_request_id req_id = req_id_in;
549  return retrieve_and_restore_root(false, item, req_id, timeout);
550 }
551 
553  octopus_request_id &req_id, int timeout)
554 { return retrieve_and_restore_root(true, item, req_id, timeout); }
555 
556 // rules: snarf_from_socket is in the second lowest-level tier. it must not
557 // call any other functions on cromp_common besides the send_buffer and
558 // process_accumulator methods.
559 void cromp_common::snarf_from_socket(bool wait)
560 {
561 #ifdef DEBUG_CROMP_COMMON
562  FUNCDEF("snarf_from_socket");
563 #endif
564  if (wait) {
565 #ifdef DEBUG_CROMP_COMMON
566 // LOG(a_sprintf("awaiting rcptblty on socket %d.", _commlink->OS_socket()));
567 #endif
568  // snooze until data seems ready for chewing or until we time out.
569  time_stamp stop_pausing(DATA_AWAIT_SNOOZE);
570  while (time_stamp() < stop_pausing) {
571  if (!_commlink->connected()) return;
572  outcome wait_ret = _commlink->await_readable(QUICK_CROMP_SNOOZE);
573  if (wait_ret != spocket::NONE_READY)
574  break;
575  send_buffer(); // push out some data in between.
576  }
577  }
578 
579  outcome rcv_ret = spocket::OKAY;
580  // this loop scrounges as much data as possible, within limits.
581  int receptions = 0;
582  while ( (rcv_ret == spocket::OKAY) && (receptions++ < MAXIMUM_RECEIVES) ) {
583  int rcv_size = CROMP_BUFFER_CHUNK_SIZE;
584  {
585  auto_synchronizer l(*_accum_lock);
586  _receive_buffer->reset(); // clear pre-existing junk.
587  rcv_ret = _commlink->receive(*_receive_buffer, rcv_size);
588 #ifdef DEBUG_CROMP_COMMON
589  if ( (rcv_ret == spocket::OKAY) && rcv_size) {
590  LOG(a_sprintf("received %d bytes on socket %d", rcv_size,
591  _commlink->OS_socket()));
592  } else if (rcv_ret != spocket::NONE_READY) {
593  LOG(a_sprintf("no data on sock %d--outcome=", _commlink->OS_socket())
594  + spocket::outcome_name(rcv_ret));
595  }
596 #endif
597  if ( (rcv_ret == spocket::OKAY) && rcv_size) {
598  // we got some data from the receive, so store it.
599  _bytes_received_total += _receive_buffer->length();
600  *_accumulator += *_receive_buffer; // add to overall accumulator.
601  _last_data_seen->reset();
602  }
603  }
604 
605  send_buffer();
606  // force data to go out also.
607  }
608 }
609 
611 {
612  snarf_from_socket(wait); // get any data that's waiting.
613  process_accumulator(); // retrieve any commands we see.
614 }
615 
// CHECK_STALENESS: if no data has been seen for longer than STALENESS_PERIOD,
// this drops the first byte of the accumulator, asks cromp_transaction to
// resynchronize what remains, restarts the staleness clock and then issues a
// "continue". because of that "continue", the macro can only be used inside
// a loop body.
#define CHECK_STALENESS \
  if (time_stamp(-STALENESS_PERIOD) > *_last_data_seen) { \
    LOG("would resynch data due to staleness."); \
    _accumulator->zap(0, 0);  /* discard the leading byte. */ \
    cromp_transaction::resynchronize(*_accumulator); \
    _last_data_seen->reset(); \
    continue; \
  }
624 
625 // process_accumulator should do nothing besides chewing on the buffer.
626 // this puts it in the lowest-level tier.
627 
628 void cromp_common::process_accumulator()
629 {
630  FUNCDEF("process_accumulator");
631  infoton *item = NULL_POINTER;
632  octopus_request_id req_id;
633 
634  string_array clas;
635 
636  if (!_accumulator->length()) return;
637 
638  // a little gymnastics to get a large buffer on the first try.
640  temp_chow_buffer.reset();
641 
642  int cmds_found = 0;
643 
644  while (_accumulator->length()) {
645 LOG(a_sprintf("eating command %d", cmds_found++));
646  {
647  // first block tries to extract data from the accumulator.
648  auto_synchronizer l(*_accum_lock);
649  // there are some contents; let's look at them.
650  int packed_length = 0;
651  outcome peek_ret = cromp_transaction::peek_header(*_accumulator,
652  packed_length);
653  if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
654  || (peek_ret == cromp_transaction::PARTIAL) ) {
655  // not ready yet.
657  return;
658  } else if (peek_ret != cromp_transaction::OKAY) {
659  LOG(astring("error unpacking--peek error=")
660  + cromp_transaction::outcome_name(peek_ret));
661  // try to get to a real command.
662  _accumulator->zap(0, 0); // roast first byte.
663  if (cromp_transaction::resynchronize(*_accumulator)) continue;
664  return;
665  }
666 
667 #ifdef DEBUG_CROMP_COMMON
668  LOG("seeing command ready");
669 #endif
670  // temp buffer for undoing cromp transaction.
671  if (!cromp_transaction::unflatten(*_accumulator, *_still_flat, req_id)) {
672  LOG("failed to unpack even though peek was happy!");
673  // try to get to a real command.
674  _accumulator->zap(0, 0); // roast first byte.
675  if (cromp_transaction::resynchronize(*_accumulator)) continue;
676  return;
677  }
678 #ifdef DEBUG_CROMP_COMMON
679  LOG(astring("got req id of ") + req_id.mangled_form());
680 #endif
681 
682  // now unwrap the onion a bit more to find the real object being sent.
683  if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
684  // try to resynch on transaction boundary.
685  LOG("failed to get back a packed infoton!");
686  _accumulator->zap(0, 0); // roast first byte.
687  if (cromp_transaction::resynchronize(*_accumulator)) continue;
688  return;
689  }
690 #ifdef DEBUG_CROMP_COMMON
691  LOG(astring("got classifier of ") + clas.text_form());
692 #endif
693  } // end of protected area.
694 
695  // restore the infoton from the packed form.
696  outcome rest_ret = octo()->restore(clas, temp_chow_buffer, item);
697  if (rest_ret != tentacle::OKAY) {
698 #ifdef DEBUG_CROMP_COMMON
699  LOG(astring("our octopus couldn't restore the packed data! ")
700  + outcome_name(rest_ret));
701 #endif
702  // publish an unhandled request back to the requestor.
703  _requests->add_item(new unhandled_request(req_id, clas, rest_ret),
704  req_id);
705  } else {
706  // we finally have reached a point where we have a valid infoton.
707  if (_requests->add_item(item, req_id))
708  cmds_found++;
709 #ifdef DEBUG_CROMP_COMMON
710  else
711  LOG("failed to add item to bin due to space constraints.");
712 #endif
713  }
714 LOG(a_sprintf("ate command %d", cmds_found));
715  }
717 }
718 
719 bool cromp_common::decode_host(const astring &coded_host, astring &hostname,
720  machine_uid &machine)
721 {
722  if (coded_host.length() < HOSTCHOP) return false; // not big enough.
723  hostname = coded_host.substring(0, cromp_common::HOSTCHOP - 1);
724  const astring compact_uid = coded_host.substring(cromp_common::HOSTCHOP,
725  coded_host.length() - 1);
726  machine = machine_uid::expand(compact_uid);
727  if (!machine.valid()) return false;
728  return true;
729 }
730 
731 } //namespace.
732 
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
Definition: array.h:349
const contents * observe() const
Returns a pointer to the underlying C array of data.
Definition: array.h:172
int length() const
Returns the current reported length of the allocated C array.
Definition: array.h:115
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition: array.h:769
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
const char * s() const
synonym for observe. the 's' stands for "string", if that helps.
Definition: astring.h:113
bool substring(astring &target, int start, int end) const
a version that stores the substring in an existing "target" string.
Definition: astring.cpp:865
int length() const
Returns the current length of the string.
Definition: astring.cpp:132
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition: mutex.h:113
A very common template for a dynamic array of bytes.
Definition: byte_array.h:36
Outcomes describe the state of completion for an operation.
Definition: outcome.h:31
int value() const
Definition: outcome.h:51
static const char * outcome_name(const basis::outcome &to_name)
octopi::octopus * octo() const
Definition: cromp_common.h:176
static const int HOSTCHOP
Definition: cromp_common.h:165
bool buffer_clog(int clog_point=1 *basis::MEGABYTE) const
basis::outcome open_common(const sockets::internet_address &where)
basis::outcome push_outgoing(int max_tries)
sockets::spocket * spock() const
cromp_common(const basis::astring &host, int max_per_ent)
basis::outcome retrieve_and_restore(octopi::infoton *&item, const octopi::octopus_request_id &req_id, int timeout)
basis::outcome close_common()
int accumulated_bytes() const
returns the number of bytes pending processing from the other side.
basis::outcome pack_and_ship(const octopi::infoton &request, const octopi::octopus_request_id &item_id, int max_tries)
sockets::internet_address other_side() const
static int default_port()
basis::outcome retrieve_and_restore_any(octopi::infoton *&item, octopi::octopus_request_id &req_id, int timeout)
static basis::astring chew_hostname(const sockets::internet_address &addr, sockets::internet_address *resolved=NULL_POINTER)
int max_bytes_per_entity() const
basis::astring responses_text_form() const
static bool decode_host(const basis::astring &coded_host, basis::astring &hostname, sockets::machine_uid &machine)
int pending_sends() const
returns the number of bytes still unsent.
virtual basis::outcome add_tentacle(octopi::tentacle *to_add, bool filter=false)
basis::outcome send_buffer()
void grab_anything(bool wait)
static basis::outcome peek_header(const basis::byte_array &packed_form, int &length)
static bool resynchronize(basis::byte_array &packed_form)
static void flatten(basis::byte_array &packed_form, const octopi::infoton &request, const octopi::octopus_request_id &id)
static bool unflatten(basis::byte_array &packed_form, basis::byte_array &still_flat, octopi::octopus_request_id &id)
static const char * outcome_name(const basis::outcome &to_name)
Supports public key encryption and decryption.
Definition: rsa_crypto.h:33
Stores a set of infotons grouped by the entity that owns them.
int max_bytes_per_entity() const
bool add_item(infoton *to_add, const octopus_request_id &id)
infoton * acquire_for_any(octopus_request_id &id)
infoton * acquire_for_identifier(const octopus_request_id &id)
void clean_out_deadwood(int decay_interval=4 *basis::MINUTE_ms)
basis::astring text_form() const
a list of pending requests and who made them.
Definition: entity_defs.h:181
An infoton is an individual request parcel with accompanying information.
Definition: infoton.h:32
Identifies requests made on an octopus by users.
Definition: entity_defs.h:114
basis::astring mangled_form() const
similar to entity id.
Octopus is a design pattern for generalized request processing systems.
Definition: octopus.h:47
entity_data_bin & responses()
allows external access to our set of results.
Definition: octopus.cpp:178
basis::outcome add_tentacle(tentacle *to_add, bool filter=false)
hooks a tentacle in to provide processing of one type of infoton.
Definition: octopus.cpp:253
basis::outcome restore(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)
regenerates a packed infoton given its classifier.
Definition: octopus.cpp:318
Manages a service within an octopus by processing certain infotons.
Definition: tentacle.h:36
Informs the caller that a request type was unknown to the server octopus.
this type of address describes a destination out on the internet.
machine_uid convert() const
basis::astring normalize_host() const
basis::astring text_form() const
char hostname[MAXIMUM_HOSTNAME_LENGTH]
basis::astring text_form() const
Definition: machine_uid.cpp:83
basis::astring compact_form() const
Definition: machine_uid.cpp:99
bool valid() const
Definition: machine_uid.h:68
Abstraction for a higher-level BSD socket that is platform independent.
Definition: spocket.h:40
basis::outcome await_writable(int timeout)
Definition: spocket.cpp:256
basis::outcome send(const basis::abyte *buffer, int size, int &len_sent)
Definition: spocket.cpp:573
basis::un_int OS_socket()
Definition: spocket.h:108
basis::outcome disconnect()
Definition: spocket.cpp:188
basis::outcome await_readable(int timeout)
Definition: spocket.cpp:238
bool connected()
Definition: spocket.cpp:209
const internet_address & where() const
Definition: spocket.cpp:147
basis::outcome receive(basis::abyte *buffer, int &size)
Definition: spocket.cpp:664
Helpful functions for interacting with TCP/IP stacks.
Definition: tcpip_stack.h:38
int elements() const
the maximum number of elements currently allowed in this amorph.
Definition: amorph.h:66
An abstraction that represents a stack data structure.
Definition: stack.h:30
An array of strings with some additional helpful methods.
Definition: string_array.h:32
basis::astring text_form() const
A synonym for the text_format() method.
Definition: string_array.h:71
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
void reset()
sets the stamp time back to now.
Definition: time_stamp.cpp:59
#define CHECK_STALENESS
#define LOG(to_print)
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:57
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
unsigned char abyte
A fairly important unit which is seldom defined...
Definition: definitions.h:51
const int SECOND_ms
Number of milliseconds in a second.
Definition: definitions.h:120
type minimum(type a, type b)
minimum returns the lesser of two values.
Definition: functions.h:29
const int MINUTE_ms
Number of milliseconds in a minute.
Definition: definitions.h:121
const int KILOBYTE
Number of bytes in a kilobyte.
Definition: definitions.h:134
const int CROMP_BUFFER_CHUNK_SIZE
const int MAXIMUM_RECEIVES
const int SEND_DELAY_TIME
const int STALENESS_PERIOD
const int MAXIMUM_SEND
const int DEFAULT_MAX_ENTITY_QUEUE
the default size we allow per each entity.
Definition: cromp_common.h:28
const int CLEANUP_INTERVAL
const int DATA_AWAIT_SNOOZE
const int QUICK_CROMP_SNOOZE
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
Definition: base_address.h:26
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
#include <time.h>
Definition: earth_time.cpp:37
#define SAFE_STATIC_CONST(type, func_name, parms)
this version returns a constant object instead.