70#define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
104double cromp_common::_bytes_sent_total = 0.0;
105double cromp_common::_bytes_received_total = 0.0;
110#ifdef DEBUG_CROMP_COMMON
113 static bool initted =
false;
114#ifdef DEBUG_CROMP_COMMON
115 bool was_initted = initted;
118#ifdef DEBUG_CROMP_COMMON
120 LOG(
"started creating localhost RSA key.");
122 const rsa_crypto &to_return = _hidden_localhost_only_key();
123#ifdef DEBUG_CROMP_COMMON
125 LOG(
"done creating localhost RSA key.");
132 _octopus(new
octopus(host, max_per_ent)),
135 _accum_lock(new
mutex),
143 FUNCDEF(
"constructor [host/max_per_ent]");
145 _accumulator->
reset();
147 _receive_buffer->
reset();
148 _still_flat->
reset();
152: _commlink(preexisting),
154 _singleton(singleton),
156 singleton->responses().max_bytes_per_entity()
158 _accum_lock(new
mutex),
166 FUNCDEF(
"constructor [preexisting/singleton]");
169 LOG(
"singleton passed as NULL_POINTER; constructing new octopus instead.");
174 _accumulator->
reset();
176 _receive_buffer->
reset();
177 _still_flat->
reset();
195 WHACK(_last_cleanup);
196 WHACK(_last_data_seen);
197 WHACK(_receive_buffer);
212 return _sendings->
length();
218 return _accumulator->
length();
224#ifdef DEBUG_CROMP_COMMON
233 if (resolved_form) *resolved_form = res1;
234#ifdef DEBUG_CROMP_COMMON
238#ifdef DEBUG_CROMP_COMMON
250#ifdef DEBUG_CROMP_COMMON
266 return _commlink->
where();
278void cromp_common::conditional_cleaning()
280 FUNCDEF(
"conditional_cleaning");
284 _last_cleanup->
reset();
291#ifdef DEBUG_CROMP_COMMON
294 if (_singleton && _commlink)
297 if (_commlink)
WHACK(_commlink);
301#ifdef DEBUG_CROMP_COMMON
318 switch (to_name.
value()) {
319 case TOO_FULL:
return "TOO_FULL";
320 case PARTIAL:
return "PARTIAL";
328 FUNCDEF(
"pack_and_ship [multiple]");
330 conditional_cleaning();
333 for (
int i = 0; i < requests.
elements(); i++) {
334 if (!requests[i] || !requests[i]->_data) {
336 LOG(
"error in infoton_list; missing data element.");
350 return _sendings->
length() >= max_buff;
356#ifdef DEBUG_CROMP_COMMON
357 FUNCDEF(
"pack_and_ship [single]");
360 conditional_cleaning();
362#ifdef DEBUG_CROMP_COMMON
383 outcome to_return = cromp_common::TOO_FULL;
385 while ( (attempts++ < max_tries) && (to_return == cromp_common::TOO_FULL) ) {
393 to_return = cromp_common::TOO_FULL;
397 if (to_return == cromp_common::TOO_FULL) {
402LOG(
"into too full looping...");
423#ifdef DEBUG_CROMP_COMMON
433#ifdef DEBUG_CROMP_COMMON
441 switch (send_ret.
value()) {
444#ifdef DEBUG_CROMP_COMMON
448 _bytes_sent_total += len_sent;
454#ifdef DEBUG_CROMP_COMMON
455 LOG(
a_sprintf(
"partial send of %d bytes (of %d desired) on socket %d.",
456 len_sent, size_to_send, _commlink->
OS_socket()));
458 _bytes_sent_total += len_sent;
464#ifdef DEBUG_CROMP_COMMON
469 to_return = TOO_FULL;
474#ifdef DEBUG_CROMP_COMMON
485#ifdef DEBUG_CROMP_COMMON
492 if ( (to_return ==
PARTIAL) || (to_return ==
OKAY) ) {
494 _sendings->
zap(0, len_sent - 1);
500outcome cromp_common::retrieve_and_restore_root(
bool get_anything,
503 FUNCDEF(
"retrieve_and_restore_root");
509 conditional_cleaning();
549 return retrieve_and_restore_root(
false, item, req_id, timeout);
554{
return retrieve_and_restore_root(
true, item, req_id, timeout); }
559void cromp_common::snarf_from_socket(
bool wait)
561#ifdef DEBUG_CROMP_COMMON
565#ifdef DEBUG_CROMP_COMMON
586 _receive_buffer->
reset();
587 rcv_ret = _commlink->
receive(*_receive_buffer, rcv_size);
588#ifdef DEBUG_CROMP_COMMON
590 LOG(
a_sprintf(
"received %d bytes on socket %d", rcv_size,
599 _bytes_received_total += _receive_buffer->
length();
600 *_accumulator += *_receive_buffer;
601 _last_data_seen->
reset();
612 snarf_from_socket(wait);
613 process_accumulator();
616#define CHECK_STALENESS \
617 if (*_last_data_seen < time_stamp(-STALENESS_PERIOD)) { \
618 LOG("would resynch data due to staleness."); \
619 _accumulator->zap(0, 0); \
620 cromp_transaction::resynchronize(*_accumulator); \
621 _last_data_seen->reset(); \
628void cromp_common::process_accumulator()
630 FUNCDEF(
"process_accumulator");
636 if (!_accumulator->
length())
return;
640 temp_chow_buffer.
reset();
644 while (_accumulator->
length()) {
650 int packed_length = 0;
653 if ( (peek_ret == cromp_transaction::WAY_TOO_SMALL)
662 _accumulator->
zap(0, 0);
667#ifdef DEBUG_CROMP_COMMON
668 LOG(
"seeing command ready");
672 LOG(
"failed to unpack even though peek was happy!");
674 _accumulator->
zap(0, 0);
678#ifdef DEBUG_CROMP_COMMON
685 LOG(
"failed to get back a packed infoton!");
686 _accumulator->
zap(0, 0);
690#ifdef DEBUG_CROMP_COMMON
698#ifdef DEBUG_CROMP_COMMON
699 LOG(
astring(
"our octopus couldn't restore the packed data! ")
707 if (_requests->
add_item(item, req_id))
709#ifdef DEBUG_CROMP_COMMON
711 LOG(
"failed to add item to bin due to space constraints.");
727 if (!machine.
valid())
return false;
a_sprintf is a specialization of astring that provides printf style support.
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
const contents * observe() const
Returns a pointer to the underlying C array of data.
int length() const
Returns the current reported length of the allocated C array.
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Provides a dynamically resizable ASCII character string.
const char * s() const
synonym for observe. the 's' stands for "string", if that helps.
bool substring(astring &target, int start, int end) const
a version that stores the substring in an existing "target" string.
int length() const
Returns the current length of the string.
auto_synchronizer simplifies concurrent code by automatically unlocking.
A very common template for a dynamic array of bytes.
Outcomes describe the state of completion for an operation.
static const char * outcome_name(const basis::outcome &to_name)
static const int HOSTCHOP
bool buffer_clog(int clog_point=1 *basis::MEGABYTE) const
basis::outcome open_common(const sockets::internet_address &where)
basis::outcome push_outgoing(int max_tries)
sockets::spocket * spock() const
cromp_common(const basis::astring &host, int max_per_ent)
basis::outcome retrieve_and_restore(octopi::infoton *&item, const octopi::octopus_request_id &req_id, int timeout)
basis::outcome close_common()
int accumulated_bytes() const
returns the number of bytes pending processing from the other side.
basis::outcome pack_and_ship(const octopi::infoton &request, const octopi::octopus_request_id &item_id, int max_tries)
octopi::octopus * octo() const
sockets::internet_address other_side() const
static int default_port()
basis::outcome retrieve_and_restore_any(octopi::infoton *&item, octopi::octopus_request_id &req_id, int timeout)
static basis::astring chew_hostname(const sockets::internet_address &addr, sockets::internet_address *resolved=NULL_POINTER)
int max_bytes_per_entity() const
basis::astring responses_text_form() const
static bool decode_host(const basis::astring &coded_host, basis::astring &hostname, sockets::machine_uid &machine)
int pending_sends() const
returns the number of bytes still unsent.
virtual basis::outcome add_tentacle(octopi::tentacle *to_add, bool filter=false)
basis::outcome send_buffer()
void grab_anything(bool wait)
static const crypto::rsa_crypto & localhost_only_key()
static basis::outcome peek_header(const basis::byte_array &packed_form, int &length)
static bool resynchronize(basis::byte_array &packed_form)
static void flatten(basis::byte_array &packed_form, const octopi::infoton &request, const octopi::octopus_request_id &id)
static bool unflatten(basis::byte_array &packed_form, basis::byte_array &still_flat, octopi::octopus_request_id &id)
static const char * outcome_name(const basis::outcome &to_name)
Supports public key encryption and decryption.
static const int RSA_KEY_SIZE
this key size should be used for all RSA private keys.
Stores a set of infotons grouped by the entity that owns them.
int max_bytes_per_entity() const
bool add_item(infoton *to_add, const octopus_request_id &id)
infoton * acquire_for_any(octopus_request_id &id)
infoton * acquire_for_identifier(const octopus_request_id &id)
void clean_out_deadwood(int decay_interval=4 *basis::MINUTE_ms)
basis::astring text_form() const
a list of pending requests and who made them.
An infoton is an individual request parcel with accompanying information.
static bool fast_unpack(basis::byte_array &packed_form, structures::string_array &classifier, basis::byte_array &info)
undoes a previous fast_pack to restore the previous information.
Identifies requests made on an octopus by users.
basis::astring mangled_form() const
similar to entity id.
Octopus is a design pattern for generalized request processing systems.
entity_data_bin & responses()
allows external access to our set of results.
basis::outcome add_tentacle(tentacle *to_add, bool filter=false)
hooks a tentacle in to provide processing of one type of infoton.
basis::outcome restore(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)
regenerates a packed infoton given its classifier.
Manages a service within an octopus by processing certain infotons.
Informs the caller that a request type was unknown to the server octopus.
static const char * outcome_name(const basis::outcome &to_name)
this type of address describes a destination out on the internet.
machine_uid convert() const
basis::astring normalize_host() const
basis::astring text_form() const
char hostname[MAXIMUM_HOSTNAME_LENGTH]
static const basis::byte_array & localhost()
static machine_uid expand(const basis::astring &compacted)
basis::astring text_form() const
basis::astring compact_form() const
Abstraction for a higher-level BSD socket that is platform independent.
basis::outcome await_writable(int timeout)
basis::outcome send(const basis::abyte *buffer, int size, int &len_sent)
static const char * outcome_name(const basis::outcome &to_name)
basis::un_int OS_socket()
basis::outcome disconnect()
basis::outcome await_readable(int timeout)
const internet_address & where() const
basis::outcome receive(basis::abyte *buffer, int &size)
Helpful functions for interacting with TCP/IP stacks.
int elements() const
the maximum number of elements currently allowed in this amorph.
An abstraction that represents a stack data structure.
An array of strings with some additional helpful methods.
basis::astring text_form() const
A synonym for the text_format() method.
static void text_dump(basis::astring &output, const basis::abyte *location, basis::un_int length, basis::un_int label=0, const char *eol="\n")
prints out a block of memory in a human readable form.
static const char * platform_eol_to_chars()
provides the characters that make up this platform's line ending.
Represents a point in time relative to the operating system startup time.
void reset()
sets the stamp time back to now.
#define NULL_POINTER
The value representing a pointer to nothing.
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
The guards collection helps in testing preconditions and reporting errors.
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
unsigned char abyte
A fairly important unit which is seldom defined...
const int SECOND_ms
Number of milliseconds in a second.
type minimum(type a, type b)
minimum returns the lesser of two values.
const int MINUTE_ms
Number of milliseconds in a minute.
const int KILOBYTE
Number of bytes in a kilobyte.
const int CROMP_BUFFER_CHUNK_SIZE
const int MAXIMUM_RECEIVES
const int SEND_DELAY_TIME
const int STALENESS_PERIOD
const int DEFAULT_MAX_ENTITY_QUEUE
the default size we allow per each entity.
const int CLEANUP_INTERVAL
const int DATA_AWAIT_SNOOZE
const int QUICK_CROMP_SNOOZE
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
A dynamic container class that holds any kind of object via pointers.
#define SAFE_STATIC_CONST(type, func_name, parms)
this version returns a constant object instead.