107d92ae337fa30d69239fef63e2338c4edfdb0b
[feisty_meow.git] / octopi / library / cromp / cromp_common.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : cromp_common                                                      *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2000-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 // NOTES:
16 // 
17 //   for a cromp_common that is "normal", the base octopus will be used for
18 // restoring infotons.
19 //   for a dependent cromp_common with a singleton and preexisting socket,
20 // the socket will be used for communications and the singleton octopus will
21 // be used for restore().
22 //
23 //   there are a few tiers of methods here.  the lowest-level tier can be
24 // called by any other functions except those in the lowest-level (so being on
25 // tier A implies that a method may not call other methods in tier A, but being
26 // on a tier X allows calling of all existent tiers X-1, X-2, ...).
27
28 //   last verified that conditions stated in header about variables protected
29 // by accumulator lock are true: 12/30/2002.
30
31 #include "cromp_common.h"
32 #include "cromp_transaction.h"
33
34 #include <basis/byte_array.h>
35 #include <basis/functions.h>
36 #include <basis/astring.h>
37 #include <basis/mutex.h>
38 #include <crypto/rsa_crypto.h>
39 #include <loggers/program_wide_logger.h>
40 #include <octopus/entity_data_bin.h>
41 #include <octopus/entity_defs.h>
42 #include <octopus/infoton.h>
43 #include <octopus/octopus.h>
44 #include <octopus/tentacle.h>
45 #include <octopus/unhandled_request.h>
46 #include <sockets/internet_address.h>
47 #include <sockets/machine_uid.h>
48 #include <sockets/spocket.h>
49 #include <sockets/tcpip_stack.h>
50 #include <structures/static_memory_gremlin.h>
51 #include <tentacles/encryption_infoton.h>
52 #include <textual/byte_formatter.h>
53 #include <timely/time_stamp.h>
54
55 using namespace basis;
56 using namespace crypto;
57 using namespace loggers;
58 using namespace octopi;
59 using namespace sockets;
60 using namespace structures;
61 using namespace textual;
62 using namespace timely;
63
64 namespace cromp {
65
66 //#define DEBUG_CROMP_COMMON
67   // uncomment for debugging info.
68
69 #undef LOG
70 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
71
72 const int STALENESS_PERIOD = 2 * MINUTE_ms;
73   // if data sits in the buffer this long without us seeing more, we assume
74   // it's gone stale.
75
76 const int SEND_DELAY_TIME = 200;
77   // if the send failed initially, we'll delay this long before trying again.
78
79 const int DATA_AWAIT_SNOOZE = 80;
80   // we sleep for this long while we await data.
81
82 const int QUICK_CROMP_SNOOZE = 28;
83   // we take a quick nap if we're looking for some data and it's not there
84   // for us yet.
85
86 const int CROMP_BUFFER_CHUNK_SIZE = 256 * KILOBYTE;
87   // the initial allocation size for buffers.
88
89 const int MAXIMUM_RECEIVES = 70;
90   // the maximum number of receptions before we skip to next phase.
91
92 const int MAXIMUM_SEND = 128 * KILOBYTE;
93   // the largest chunk we try to send at a time.  we want to limit this
94   // rather than continually asking the OS to consume a big transmission.
95
96 const int CLEANUP_INTERVAL = 28 * SECOND_ms;
97   // this is how frequently we'll flush out items from our data bin that
98   // are too old.
99
100 const int cromp_common::HOSTCHOP = 6;
101   // we take this many characters as the readable textual portion of the
102   // hostname.
103
104 double cromp_common::_bytes_sent_total = 0.0;
105 double cromp_common::_bytes_received_total = 0.0;
106
107   SAFE_STATIC_CONST(rsa_crypto, _hidden_localhost_only_key,
108       (encryption_infoton::RSA_KEY_SIZE))
109   const rsa_crypto &cromp_common::localhost_only_key() {
110 #ifdef DEBUG_CROMP_COMMON
111     FUNCDEF("localhost_only_key");
112 #endif
113     static bool initted = false;
114 #ifdef DEBUG_CROMP_COMMON
115     bool was_initted = initted;
116 #endif
117     initted = true;
118 #ifdef DEBUG_CROMP_COMMON
119     if (!was_initted)
120       LOG("started creating localhost RSA key.");
121 #endif
122     const rsa_crypto &to_return = _hidden_localhost_only_key();
123 #ifdef DEBUG_CROMP_COMMON
124     if (!was_initted)
125       LOG("done creating localhost RSA key.");
126 #endif
127     return to_return;
128   }
129
130 cromp_common::cromp_common(const astring &host, int max_per_ent)
131 : _commlink(NIL),
132   _octopus(new octopus(host, max_per_ent)),
133   _singleton(NIL),
134   _requests(new entity_data_bin(max_per_ent)),
135   _accum_lock(new mutex),
136   _last_data_seen(new time_stamp),
137   _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
138   _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
139   _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
140   _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
141   _last_cleanup(new time_stamp)
142 {
143 //  FUNCDEF("constructor [host/max_per_ent]");
144   // clear pre-existing space.
145   _accumulator->reset();
146   _sendings->reset();
147   _receive_buffer->reset();
148   _still_flat->reset();
149 }
150
151 cromp_common::cromp_common(spocket *preexisting, octopus *singleton)
152 : _commlink(preexisting),
153   _octopus(singleton),
154   _singleton(singleton),
155   _requests(new entity_data_bin(singleton?
156       singleton->responses().max_bytes_per_entity()
157           : DEFAULT_MAX_ENTITY_QUEUE)),
158   _accum_lock(new mutex),
159   _last_data_seen(new time_stamp),
160   _accumulator(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
161   _sendings(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
162   _receive_buffer(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
163   _still_flat(new byte_array(CROMP_BUFFER_CHUNK_SIZE, NIL)),
164   _last_cleanup(new time_stamp)
165 {
166   FUNCDEF("constructor [preexisting/singleton]");
167   if (!_octopus) {
168     // they passed us a bad singleton.  carry on as best we can.
169     LOG("singleton passed as NIL; constructing new octopus instead.");
170     internet_address local(internet_address::localhost(), "localhost", 0);
171     _octopus = new octopus(chew_hostname(local), DEFAULT_MAX_ENTITY_QUEUE);
172   }
173   // clear pre-existing space.
174   _accumulator->reset();
175   _sendings->reset();
176   _receive_buffer->reset();
177   _still_flat->reset();
178 }
179
180 cromp_common::~cromp_common()
181 {
182 //  FUNCDEF("destructor");
183   close_common();  // shuts down our socket and other stuff.
184   if (_singleton) {
185     _singleton = NIL;  // reset the pointer we had.
186     _octopus = NIL;  // ditto.
187   } else {
188     // this one was ours so we need to clean it up.
189     WHACK(_octopus);
190   }
191   WHACK(_accumulator);
192   WHACK(_sendings);
193   WHACK(_commlink);
194   WHACK(_requests);
195   WHACK(_last_cleanup);
196   WHACK(_last_data_seen);
197   WHACK(_receive_buffer);
198   WHACK(_still_flat);
199   WHACK(_accum_lock);
200 }
201
202 spocket *cromp_common::spock() const { return _commlink; }
203
204 int cromp_common::default_port() { return 10008; }
205
206 outcome cromp_common::add_tentacle(tentacle *to_add, bool filter)
207 { return _octopus->add_tentacle(to_add, filter); }
208
209 int cromp_common::pending_sends() const
210 {
211   auto_synchronizer l(*_accum_lock);
212   return _sendings->length();
213 }
214
215 int cromp_common::accumulated_bytes() const
216 {
217   auto_synchronizer l(*_accum_lock);
218   return _accumulator->length();
219 }
220
221 astring cromp_common::chew_hostname(const internet_address &addr,
222     internet_address *resolved_form)
223 {
224 #ifdef DEBUG_CROMP_COMMON
225   FUNCDEF("chew_hostname");
226   LOG(astring("addr coming in ") + addr.text_form());
227 #endif
228   tcpip_stack stack;
229   bool worked;
230   internet_address res1 = stack.fill_and_resolve(addr.hostname, addr.port,
231       worked);
232   if (worked) {
233     if (resolved_form) *resolved_form = res1;
234 #ifdef DEBUG_CROMP_COMMON
235     LOG(astring("resolved addr ") + res1.text_form());
236 #endif
237   } else {
238 #ifdef DEBUG_CROMP_COMMON
239     LOG(astring("failed to resolve host=") + addr.hostname);
240 #endif
241   }
242
243   // get a readable form of the host.
244   astring just_host = res1.normalize_host();
245   while (just_host.length() < HOSTCHOP) just_host += "-";  // filler.
246   machine_uid converted = res1.convert();
247   astring to_return = just_host.substring(0, HOSTCHOP - 1);
248   to_return += converted.compact_form();
249
250 #ifdef DEBUG_CROMP_COMMON
251   LOG(astring("returning machid ") + converted.text_form() + ", packed as "
252       + parser_bits::platform_eol_to_chars()
253       + byte_formatter::text_dump((abyte *)to_return.s(),
254             to_return.length() + 1));
255 #endif
256
257   return to_return;
258 }
259
260 astring cromp_common::responses_text_form() const
261 { return _requests->text_form(); }
262
263 internet_address cromp_common::other_side() const
264 {
265   if (!_commlink) return internet_address();
266   return _commlink->where();
267 }
268
269 int cromp_common::max_bytes_per_entity() const
270 { return _requests->max_bytes_per_entity(); }
271
272 void cromp_common::max_bytes_per_entity(int max_bytes_per_entity)
273 {
274   _requests->max_bytes_per_entity(max_bytes_per_entity);
275   _octopus->responses().max_bytes_per_entity(max_bytes_per_entity);
276 }
277
278 void cromp_common::conditional_cleaning()
279 {
280 //  FUNCDEF("conditional_cleaning");
281   if (time_stamp(-CLEANUP_INTERVAL) > *_last_cleanup) {
282     _requests->clean_out_deadwood();
283       // flush any items that are too old.
284     _last_cleanup->reset();
285       // record that we just cleaned up.
286   }
287 }
288
289 outcome cromp_common::open_common(const internet_address &where)
290 {
291 #ifdef DEBUG_CROMP_COMMON
292   FUNCDEF("open_common");
293 #endif
294   if (_singleton && _commlink)
295     return OKAY;  // done if this uses pre-existing objects.
296
297   if (_commlink) WHACK(_commlink);  // clean up any pre-existing socket.
298
299   internet_address other_side = where;
300
301 #ifdef DEBUG_CROMP_COMMON
302   LOG(astring("opening at ") + other_side.text_form());
303 #endif
304   _commlink = new spocket(other_side);
305 //hmmm: check socket health.
306
307   return OKAY;
308 }
309
310 outcome cromp_common::close_common()
311 {
312   if (_commlink) _commlink->disconnect();  // make the thread stop bothering.
313   return OKAY;
314 }
315
316 const char *cromp_common::outcome_name(const outcome &to_name)
317 {
318   switch (to_name.value()) {
319     case TOO_FULL: return "TOO_FULL";
320     case PARTIAL: return "PARTIAL";
321     default: return communication_commons::outcome_name(to_name);
322   }
323 }
324
325 outcome cromp_common::pack_and_ship(const infoton_list &requests,
326     int max_tries)
327 {
328   FUNCDEF("pack_and_ship [multiple]");
329   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
330   conditional_cleaning();
331   {
332     auto_synchronizer l(*_accum_lock);  // lock while packing.
333     for (int i = 0; i < requests.elements(); i++) {
334       if (!requests[i] || !requests[i]->_data) {
335         // this is a screw-up by someone.
336         LOG("error in infoton_list; missing data element.");
337         continue;
338       }
339       cromp_transaction::flatten(*_sendings, *requests[i]->_data,
340           requests[i]->_id);
341     }
342   }
343
344   return push_outgoing(max_tries);
345 }
346
347 bool cromp_common::buffer_clog(int max_buff) const
348 {
349   auto_synchronizer l(*_accum_lock);
350   return _sendings->length() >= max_buff;
351 }
352
353 outcome cromp_common::pack_and_ship(const infoton &request,
354     const octopus_request_id &item_id, int max_tries)
355 {
356 #ifdef DEBUG_CROMP_COMMON
357   FUNCDEF("pack_and_ship [single]");
358 #endif
359   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
360   conditional_cleaning();
361
362 #ifdef DEBUG_CROMP_COMMON
363   LOG(astring("sending req ") + item_id.mangled_form());
364 #endif
365
366   {
367     auto_synchronizer l(*_accum_lock);  // lock while packing.
368     cromp_transaction::flatten(*_sendings, request, item_id);
369   }
370
371   return push_outgoing(max_tries);
372 }
373
374 outcome cromp_common::push_outgoing(int max_tries)
375 {
376   FUNCDEF("push_outgoing");
377
378   if (!max_tries) return cromp_common::OKAY;
379     // no tries means we're done already.
380
381   grab_anything(false);  // suck any data in that happens to be waiting.
382
383   outcome to_return = cromp_common::TOO_FULL;
384   int attempts = 0;
385   while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
386     to_return = send_buffer();
387     grab_anything(false);  // suck any data in that happens to be waiting.
388     if (to_return == cromp_common::OKAY)
389       break;  // happy returns.
390     if (to_return == cromp_common::PARTIAL) {
391       // we sent all we tried to but there's more left.
392       attempts = 0;  // skip back since we had a successful attempt.
393       to_return = cromp_common::TOO_FULL;
394         // reset so that we treat this by staying in the send loop.
395       continue;  // jump back without waiting.
396     }
397     if (to_return == cromp_common::TOO_FULL) {
398       // we can't send any more yet so delay for a bit to see if we can get
399       // some more out.
400       time_stamp stop_pausing(SEND_DELAY_TIME);
401       while (time_stamp() < stop_pausing) {
402 LOG("into too full looping...");
403         if (!_commlink->connected()) break;
404         grab_anything(true);  // suck any data in that happens to be waiting.
405         // snooze a bit until we think we can write again.
406         outcome ret = _commlink->await_writable(QUICK_CROMP_SNOOZE);
407         if (ret != spocket::NONE_READY)
408           break;
409       }
410     } else {
411       LOG(astring("failed send: ") + cromp_common::outcome_name(to_return));
412       break;
413     }
414   }
415   return to_return;
416 }
417
418 // rules for send_buffer: this function is in the lowest-level tier for using
419 // the spocket.  it is allowed to be called by anyone.  it must not call any
420 // other functions on the cromp_common class.
421 outcome cromp_common::send_buffer()
422 {
423 #ifdef DEBUG_CROMP_COMMON
424   FUNCDEF("send_buffer");
425 #endif
426   auto_synchronizer l(*_accum_lock);
427
428   // all done if nothing to send.
429   if (!_sendings->length())
430     return OKAY;
431
432   int size_to_send = minimum(_sendings->length(), MAXIMUM_SEND);
433 #ifdef DEBUG_CROMP_COMMON
434 //  LOG(a_sprintf("sending %d bytes on socket %d.", size_to_send,
435 //      _commlink->OS_socket()));
436 #endif
437   int len_sent = 0;
438   outcome to_return;
439   outcome send_ret = _commlink->send(_sendings->observe(), size_to_send,
440       len_sent);
441   switch (send_ret.value()) {
442     case spocket::OKAY: {
443       // success.
444 #ifdef DEBUG_CROMP_COMMON
445 //      LOG(a_sprintf("really sent %d bytes on socket %d.", len_sent,
446 //          _commlink->OS_socket()));
447 #endif
448       _bytes_sent_total += len_sent;
449       to_return = OKAY;
450       break;
451     }
452     case spocket::PARTIAL: {
453       // got something done hopefully.
454 #ifdef DEBUG_CROMP_COMMON
455       LOG(a_sprintf("partial send of %d bytes (of %d desired) on socket %d.",
456           len_sent, size_to_send, _commlink->OS_socket()));
457 #endif
458       _bytes_sent_total += len_sent;
459       to_return = PARTIAL;
460       break;
461     }
462     case spocket::NONE_READY: {
463       // did nothing useful.
464 #ifdef DEBUG_CROMP_COMMON
465       LOG(a_sprintf("too full to send any on socket %d.",
466           _commlink->OS_socket()));
467 #endif
468       len_sent = 0;  // reset just in case.
469       to_return = TOO_FULL;
470       break;
471     }
472     default: {
473       // other things went wrong.
474 #ifdef DEBUG_CROMP_COMMON
475       LOG(astring("failing send with ") + spocket::outcome_name(send_ret));
476 #endif
477       len_sent = 0;  // reset just in case.
478
479 //hmmm: these are unnecessary now since it's the same set of outcomes.
480       if (send_ret == spocket::NO_CONNECTION) to_return = NO_CONNECTION;
481       else if (send_ret == spocket::TIMED_OUT) to_return = TIMED_OUT;
482 //any other ideas?
483       else to_return = DISALLOWED;
484
485 #ifdef DEBUG_CROMP_COMMON
486       LOG(astring("failed to send--got error ") + outcome_name(to_return));
487 #endif
488       break;
489     }
490   }
491
492   if ( (to_return == PARTIAL) || (to_return == OKAY) ) {
493     // accomodate our latest activity on the socket.
494     _sendings->zap(0, len_sent - 1);  // sent just some of it.
495   }
496
497   return to_return;
498 }
499
500 outcome cromp_common::retrieve_and_restore_root(bool get_anything,
501     infoton * &item, octopus_request_id &req_id, int timeout)
502 {
503 //  FUNCDEF("retrieve_and_restore_root");
504   item = NIL;
505   if (!_commlink) return BAD_INPUT;  // they haven't opened this yet.
506   octopus_request_id tmp_id;
507   time_stamp leaving_time(timeout);
508
509   conditional_cleaning();
510
511   do {
512     // check if it's already in the bin from someone else grabbing it.
513     if (get_anything)
514       item = _requests->acquire_for_any(req_id);
515     else
516       item = _requests->acquire_for_identifier(req_id);
517     if (item)
518       return OKAY;
519
520     // check to see if there's any data.
521     grab_anything(timeout? true : false);
522
523     push_outgoing(1);
524 //hmmm: parameterize the push?
525
526     // check again just to make sure.  this is before we check the timeout,
527     // since we could squeak in with something before that.
528     if (get_anything)
529       item = _requests->acquire_for_any(req_id);
530     else
531       item = _requests->acquire_for_identifier(req_id);
532     if (item)
533       return OKAY;
534
535     if (!timeout) return TIMED_OUT;
536       // timeout is not set so we leave right away.
537
538     if (!_commlink->connected()) return NO_CONNECTION;
539
540     // keep going if we haven't seen it yet and still have time.
541   } while (time_stamp() < leaving_time);
542   return TIMED_OUT;
543 }
544
545 outcome cromp_common::retrieve_and_restore(infoton * &item,
546     const octopus_request_id &req_id_in, int timeout)
547 {
548   octopus_request_id req_id = req_id_in;
549   return retrieve_and_restore_root(false, item, req_id, timeout);
550 }
551
552 outcome cromp_common::retrieve_and_restore_any(infoton * &item,
553     octopus_request_id &req_id, int timeout)
554 { return retrieve_and_restore_root(true, item, req_id, timeout); }
555
556 // rules: snarf_from_socket is in the second lowest-level tier.  it must not
557 // call any other functions on cromp_common besides the send_buffer and
558 // process_accumulator methods.
559 void cromp_common::snarf_from_socket(bool wait)
560 {
561 #ifdef DEBUG_CROMP_COMMON
562   FUNCDEF("snarf_from_socket");
563 #endif
564   if (wait) {
565 #ifdef DEBUG_CROMP_COMMON
566 //    LOG(a_sprintf("awaiting rcptblty on socket %d.", _commlink->OS_socket()));
567 #endif
568     // snooze until data seems ready for chewing or until we time out.
569     time_stamp stop_pausing(DATA_AWAIT_SNOOZE);
570     while (time_stamp() < stop_pausing) {
571       if (!_commlink->connected()) return;
572       outcome wait_ret = _commlink->await_readable(QUICK_CROMP_SNOOZE);
573       if (wait_ret != spocket::NONE_READY)
574         break;
575       send_buffer();  // push out some data in between.
576     }
577   }
578
579   outcome rcv_ret = spocket::OKAY;
580   // this loop scrounges as much data as possible, within limits.
581   int receptions = 0;
582   while ( (rcv_ret == spocket::OKAY) && (receptions++ < MAXIMUM_RECEIVES) ) {
583     int rcv_size = CROMP_BUFFER_CHUNK_SIZE;
584     {
585       auto_synchronizer l(*_accum_lock);
586       _receive_buffer->reset();  // clear pre-existing junk.
587       rcv_ret = _commlink->receive(*_receive_buffer, rcv_size);
588 #ifdef DEBUG_CROMP_COMMON
589       if ( (rcv_ret == spocket::OKAY) && rcv_size) {
590         LOG(a_sprintf("received %d bytes on socket %d", rcv_size,
591             _commlink->OS_socket()));
592       } else if (rcv_ret != spocket::NONE_READY) {
593         LOG(a_sprintf("no data on sock %d--outcome=", _commlink->OS_socket())
594             + spocket::outcome_name(rcv_ret));
595       }
596 #endif
597       if ( (rcv_ret == spocket::OKAY) && rcv_size) {
598         // we got some data from the receive, so store it.
599         _bytes_received_total += _receive_buffer->length();
600         *_accumulator += *_receive_buffer;  // add to overall accumulator.
601         _last_data_seen->reset();
602       }
603     }
604
605     send_buffer();
606       // force data to go out also.
607   }
608 }
609
610 void cromp_common::grab_anything(bool wait)
611 {
612   snarf_from_socket(wait);  // get any data that's waiting.
613   process_accumulator();  // retrieve any commands we see.
614 }
615
616 #define CHECK_STALENESS \
617   if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
618     LOG("would resynch data due to staleness."); \
619     _accumulator->zap(0, 0);  /* roast first byte */ \
620     cromp_transaction::resynchronize(*_accumulator); \
621     _last_data_seen->reset(); \
622     continue; \
623   }
624
625 // process_accumulator should do nothing besides chewing on the buffer.
626 // this puts it in the lowest-level tier.
627
628 void cromp_common::process_accumulator()
629 {
630   FUNCDEF("process_accumulator");
631   infoton *item = NIL;
632   octopus_request_id req_id;
633
634   string_array clas;
635
636   if (!_accumulator->length()) return;
637
638   // a little gymnastics to get a large buffer on the first try.
639   byte_array temp_chow_buffer(CROMP_BUFFER_CHUNK_SIZE, NIL);
640   temp_chow_buffer.reset();
641
642   int cmds_found = 0;
643
644   while (_accumulator->length()) {
645 LOG(a_sprintf("eating command %d", cmds_found++));
646     {
647       // first block tries to extract data from the accumulator.
648       auto_synchronizer l(*_accum_lock);
649       // there are some contents; let's look at them.
650       int packed_length = 0;
651       outcome peek_ret = cromp_transaction::peek_header(*_accumulator,
652           packed_length);
653       if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
654           || (peek_ret == cromp_transaction::PARTIAL) ) {
655         // not ready yet.
656         CHECK_STALENESS;
657         return;
658       } else if (peek_ret != cromp_transaction::OKAY) {
659         LOG(astring("error unpacking--peek error=")
660             + cromp_transaction::outcome_name(peek_ret));
661         // try to get to a real command.
662         _accumulator->zap(0, 0);  // roast first byte.
663         if (cromp_transaction::resynchronize(*_accumulator)) continue;
664         return;
665       }
666
667 #ifdef DEBUG_CROMP_COMMON
668       LOG("seeing command ready");
669 #endif
670       // temp buffer for undoing cromp transaction.
671       if (!cromp_transaction::unflatten(*_accumulator, *_still_flat, req_id)) {
672         LOG("failed to unpack even though peek was happy!");
673         // try to get to a real command.
674         _accumulator->zap(0, 0);  // roast first byte.
675         if (cromp_transaction::resynchronize(*_accumulator)) continue;
676         return;
677       }
678 #ifdef DEBUG_CROMP_COMMON
679       LOG(astring("got req id of ") + req_id.mangled_form());
680 #endif
681
682       // now unwrap the onion a bit more to find the real object being sent.
683       if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
684         // try to resynch on transaction boundary.
685         LOG("failed to get back a packed infoton!");
686         _accumulator->zap(0, 0);  // roast first byte.
687         if (cromp_transaction::resynchronize(*_accumulator)) continue;
688         return;
689       }
690 #ifdef DEBUG_CROMP_COMMON
691       LOG(astring("got classifier of ") + clas.text_form());
692 #endif
693     } // end of protected area.
694
695     // restore the infoton from the packed form.
696     outcome rest_ret = octo()->restore(clas, temp_chow_buffer, item);
697     if (rest_ret != tentacle::OKAY) {
698 #ifdef DEBUG_CROMP_COMMON
699       LOG(astring("our octopus couldn't restore the packed data! ")
700           + outcome_name(rest_ret));
701 #endif
702       // publish an unhandled request back to the requestor.
703       _requests->add_item(new unhandled_request(req_id, clas, rest_ret),
704           req_id);
705     } else {
706       // we finally have reached a point where we have a valid infoton.
707       if (_requests->add_item(item, req_id))
708         cmds_found++;
709 #ifdef DEBUG_CROMP_COMMON
710       else
711         LOG("failed to add item to bin due to space constraints.");
712 #endif
713     }
714 LOG(a_sprintf("ate command %d", cmds_found));
715   }
716 ///  LOG(a_sprintf("added %d commands", cmds_found));
717 }
718
719 bool cromp_common::decode_host(const astring &coded_host, astring &hostname,
720     machine_uid &machine)
721 {
722   if (coded_host.length() < HOSTCHOP) return false;  // not big enough.
723   hostname = coded_host.substring(0, cromp_common::HOSTCHOP - 1);
724   const astring compact_uid = coded_host.substring(cromp_common::HOSTCHOP,
725       coded_host.length() - 1);
726   machine = machine_uid::expand(compact_uid);
727   if (!machine.valid()) return false;
728   return true;
729 }
730
731 } //namespace.
732