55 using namespace basis;
70 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
100 const int cromp_common::HOSTCHOP = 6;
104 double cromp_common::_bytes_sent_total = 0.0;
105 double cromp_common::_bytes_received_total = 0.0;
108 (encryption_infoton::RSA_KEY_SIZE))
109 const rsa_crypto &cromp_common::localhost_only_key() {
110 #ifdef DEBUG_CROMP_COMMON
113 static bool initted =
false;
114 #ifdef DEBUG_CROMP_COMMON
115 bool was_initted = initted;
118 #ifdef DEBUG_CROMP_COMMON
120 LOG(
"started creating localhost RSA key.");
122 const rsa_crypto &to_return = _hidden_localhost_only_key();
123 #ifdef DEBUG_CROMP_COMMON
125 LOG(
"done creating localhost RSA key.");
130 cromp_common::cromp_common(
const astring &host,
int max_per_ent)
132 _octopus(new
octopus(host, max_per_ent)),
135 _accum_lock(new
mutex),
143 FUNCDEF(
"constructor [host/max_per_ent]");
145 _accumulator->
reset();
147 _receive_buffer->
reset();
148 _still_flat->
reset();
152 : _commlink(preexisting),
154 _singleton(singleton),
156 singleton->responses().max_bytes_per_entity()
158 _accum_lock(new
mutex),
166 FUNCDEF(
"constructor [preexisting/singleton]");
169 LOG(
"singleton passed as NULL_POINTER; constructing new octopus instead.");
174 _accumulator->
reset();
176 _receive_buffer->
reset();
177 _still_flat->
reset();
195 WHACK(_last_cleanup);
196 WHACK(_last_data_seen);
197 WHACK(_receive_buffer);
212 return _sendings->
length();
218 return _accumulator->
length();
224 #ifdef DEBUG_CROMP_COMMON
233 if (resolved_form) *resolved_form = res1;
234 #ifdef DEBUG_CROMP_COMMON
238 #ifdef DEBUG_CROMP_COMMON
250 #ifdef DEBUG_CROMP_COMMON
252 + parser_bits::platform_eol_to_chars()
253 + byte_formatter::text_dump((
abyte *)to_return.
s(),
266 return _commlink->
where();
278 void cromp_common::conditional_cleaning()
280 FUNCDEF(
"conditional_cleaning");
284 _last_cleanup->
reset();
291 #ifdef DEBUG_CROMP_COMMON
294 if (_singleton && _commlink)
297 if (_commlink)
WHACK(_commlink);
301 #ifdef DEBUG_CROMP_COMMON
318 switch (to_name.
value()) {
319 case TOO_FULL:
return "TOO_FULL";
320 case PARTIAL:
return "PARTIAL";
321 default:
return communication_commons::outcome_name(to_name);
328 FUNCDEF(
"pack_and_ship [multiple]");
330 conditional_cleaning();
333 for (
int i = 0; i < requests.
elements(); i++) {
334 if (!requests[i] || !requests[i]->_data) {
336 LOG(
"error in infoton_list; missing data element.");
350 return _sendings->
length() >= max_buff;
356 #ifdef DEBUG_CROMP_COMMON
357 FUNCDEF(
"pack_and_ship [single]");
360 conditional_cleaning();
362 #ifdef DEBUG_CROMP_COMMON
383 outcome to_return = cromp_common::TOO_FULL;
385 while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
393 to_return = cromp_common::TOO_FULL;
397 if (to_return == cromp_common::TOO_FULL) {
402 LOG(
"into too full looping...");
407 if (ret != spocket::NONE_READY)
423 #ifdef DEBUG_CROMP_COMMON
433 #ifdef DEBUG_CROMP_COMMON
441 switch (send_ret.
value()) {
442 case spocket::OKAY: {
444 #ifdef DEBUG_CROMP_COMMON
448 _bytes_sent_total += len_sent;
452 case spocket::PARTIAL: {
454 #ifdef DEBUG_CROMP_COMMON
455 LOG(
a_sprintf(
"partial send of %d bytes (of %d desired) on socket %d.",
456 len_sent, size_to_send, _commlink->
OS_socket()));
458 _bytes_sent_total += len_sent;
462 case spocket::NONE_READY: {
464 #ifdef DEBUG_CROMP_COMMON
469 to_return = TOO_FULL;
474 #ifdef DEBUG_CROMP_COMMON
475 LOG(
astring(
"failing send with ") + spocket::outcome_name(send_ret));
480 if (send_ret == spocket::NO_CONNECTION) to_return =
NO_CONNECTION;
481 else if (send_ret == spocket::TIMED_OUT) to_return =
TIMED_OUT;
485 #ifdef DEBUG_CROMP_COMMON
492 if ( (to_return ==
PARTIAL) || (to_return ==
OKAY) ) {
494 _sendings->
zap(0, len_sent - 1);
500 outcome cromp_common::retrieve_and_restore_root(
bool get_anything,
503 FUNCDEF(
"retrieve_and_restore_root");
509 conditional_cleaning();
549 return retrieve_and_restore_root(
false, item, req_id, timeout);
554 {
return retrieve_and_restore_root(
true, item, req_id, timeout); }
559 void cromp_common::snarf_from_socket(
bool wait)
561 #ifdef DEBUG_CROMP_COMMON
565 #ifdef DEBUG_CROMP_COMMON
573 if (wait_ret != spocket::NONE_READY)
579 outcome rcv_ret = spocket::OKAY;
586 _receive_buffer->
reset();
587 rcv_ret = _commlink->
receive(*_receive_buffer, rcv_size);
588 #ifdef DEBUG_CROMP_COMMON
589 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
590 LOG(
a_sprintf(
"received %d bytes on socket %d", rcv_size,
592 }
else if (rcv_ret != spocket::NONE_READY) {
594 + spocket::outcome_name(rcv_ret));
597 if ( (rcv_ret == spocket::OKAY) && rcv_size) {
599 _bytes_received_total += _receive_buffer->
length();
600 *_accumulator += *_receive_buffer;
601 _last_data_seen->
reset();
612 snarf_from_socket(wait);
613 process_accumulator();
616 #define CHECK_STALENESS \
617 if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
618 LOG("would resynch data due to staleness."); \
619 _accumulator->zap(0, 0);
\
620 cromp_transaction::resynchronize(*_accumulator); \
621 _last_data_seen->reset(); \
628 void cromp_common::process_accumulator()
630 FUNCDEF(
"process_accumulator");
636 if (!_accumulator->
length())
return;
640 temp_chow_buffer.
reset();
644 while (_accumulator->
length()) {
650 int packed_length = 0;
653 if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
662 _accumulator->
zap(0, 0);
667 #ifdef DEBUG_CROMP_COMMON
668 LOG(
"seeing command ready");
672 LOG(
"failed to unpack even though peek was happy!");
674 _accumulator->
zap(0, 0);
678 #ifdef DEBUG_CROMP_COMMON
683 if (!infoton::fast_unpack(*_still_flat, clas, temp_chow_buffer)) {
685 LOG(
"failed to get back a packed infoton!");
686 _accumulator->
zap(0, 0);
690 #ifdef DEBUG_CROMP_COMMON
697 if (rest_ret != tentacle::OKAY) {
698 #ifdef DEBUG_CROMP_COMMON
699 LOG(
astring(
"our octopus couldn't restore the packed data! ")
707 if (_requests->
add_item(item, req_id))
709 #ifdef DEBUG_CROMP_COMMON
711 LOG(
"failed to add item to bin due to space constraints.");
726 machine = machine_uid::expand(compact_uid);
727 if (!machine.
valid())
return false;
a_sprintf is a specialization of astring that provides printf style support.
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
const contents * observe() const
Returns a pointer to the underlying C array of data.
int length() const
Returns the current reported length of the allocated C array.
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Provides a dynamically resizable ASCII character string.
const char * s() const
synonym for observe. the 's' stands for "string", if that helps.
bool substring(astring &target, int start, int end) const
a version that stores the substring in an existing "target" string.
int length() const
Returns the current length of the string.
auto_synchronizer simplifies concurrent code by automatically unlocking.
A very common template for a dynamic array of bytes.
Outcomes describe the state of completion for an operation.
static const char * outcome_name(const basis::outcome &to_name)
octopi::octopus * octo() const
static const int HOSTCHOP
bool buffer_clog(int clog_point=1 *basis::MEGABYTE) const
basis::outcome open_common(const sockets::internet_address &where)
basis::outcome push_outgoing(int max_tries)
sockets::spocket * spock() const
cromp_common(const basis::astring &host, int max_per_ent)
basis::outcome retrieve_and_restore(octopi::infoton *&item, const octopi::octopus_request_id &req_id, int timeout)
basis::outcome close_common()
int accumulated_bytes() const
returns the number of bytes pending processing from the other side.
basis::outcome pack_and_ship(const octopi::infoton &request, const octopi::octopus_request_id &item_id, int max_tries)
sockets::internet_address other_side() const
static int default_port()
basis::outcome retrieve_and_restore_any(octopi::infoton *&item, octopi::octopus_request_id &req_id, int timeout)
static basis::astring chew_hostname(const sockets::internet_address &addr, sockets::internet_address *resolved=NULL_POINTER)
int max_bytes_per_entity() const
basis::astring responses_text_form() const
static bool decode_host(const basis::astring &coded_host, basis::astring &hostname, sockets::machine_uid &machine)
int pending_sends() const
returns the number of bytes still unsent.
virtual basis::outcome add_tentacle(octopi::tentacle *to_add, bool filter=false)
basis::outcome send_buffer()
void grab_anything(bool wait)
static basis::outcome peek_header(const basis::byte_array &packed_form, int &length)
static bool resynchronize(basis::byte_array &packed_form)
static void flatten(basis::byte_array &packed_form, const octopi::infoton &request, const octopi::octopus_request_id &id)
static bool unflatten(basis::byte_array &packed_form, basis::byte_array &still_flat, octopi::octopus_request_id &id)
static const char * outcome_name(const basis::outcome &to_name)
Supports public key encryption and decryption.
Stores a set of infotons grouped by the entity that owns them.
int max_bytes_per_entity() const
bool add_item(infoton *to_add, const octopus_request_id &id)
infoton * acquire_for_any(octopus_request_id &id)
infoton * acquire_for_identifier(const octopus_request_id &id)
void clean_out_deadwood(int decay_interval=4 *basis::MINUTE_ms)
basis::astring text_form() const
a list of pending requests and who made them.
An infoton is an individual request parcel with accompanying information.
Identifies requests made on an octopus by users.
basis::astring mangled_form() const
similar to entity id.
Octopus is a design pattern for generalized request processing systems.
entity_data_bin & responses()
allows external access to our set of results.
basis::outcome add_tentacle(tentacle *to_add, bool filter=false)
hooks a tentacle in to provide processing of one type of infoton.
basis::outcome restore(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)
regenerates a packed infoton given its classifier.
Manages a service within an octopus by processing certain infotons.
Informs the caller that a request type was unknown to the server octopus.
this type of address describes a destination out on the internet.
machine_uid convert() const
basis::astring normalize_host() const
basis::astring text_form() const
char hostname[MAXIMUM_HOSTNAME_LENGTH]
basis::astring text_form() const
basis::astring compact_form() const
Abstraction for a higher-level BSD socket that is platform independent.
basis::outcome await_writable(int timeout)
basis::outcome send(const basis::abyte *buffer, int size, int &len_sent)
basis::un_int OS_socket()
basis::outcome disconnect()
basis::outcome await_readable(int timeout)
const internet_address & where() const
basis::outcome receive(basis::abyte *buffer, int &size)
Helpful functions for interacting with TCP/IP stacks.
int elements() const
the maximum number of elements currently allowed in this amorph.
An abstraction that represents a stack data structure.
An array of strings with some additional helpful methods.
basis::astring text_form() const
A synonym for the text_format() method.
Represents a point in time relative to the operating system startup time.
void reset()
sets the stamp time back to now.
#define NULL_POINTER
The value representing a pointer to nothing.
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
The guards collection helps in testing preconditions and reporting errors.
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
unsigned char abyte
A fairly important unit which is seldom defined...
const int SECOND_ms
Number of milliseconds in a second.
type minimum(type a, type b)
minimum returns the lesser of two values.
const int MINUTE_ms
Number of milliseconds in a minute.
const int KILOBYTE
Number of bytes in a kilobyte.
const int CROMP_BUFFER_CHUNK_SIZE
const int MAXIMUM_RECEIVES
const int SEND_DELAY_TIME
const int STALENESS_PERIOD
const int DEFAULT_MAX_ENTITY_QUEUE
the default size we allow per each entity.
const int CLEANUP_INTERVAL
const int DATA_AWAIT_SNOOZE
const int QUICK_CROMP_SNOOZE
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
A dynamic container class that holds any kind of object via pointers.
#define SAFE_STATIC_CONST(type, func_name, parms)
this version returns a constant object instead.