88#define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
93class cromp_client_record;
95class cromp_data_grabber :
public ethread
98 cromp_data_grabber(cromp_client_record &parent,
octopus *octo)
99 :
ethread(), _parent(parent), _octo(octo) {}
103 virtual void perform_activity(
void *);
106 cromp_client_record &_parent;
112class cromp_client_record :
public cromp_common
123 _grabber(*this,
octo),
125 _still_connected(true),
126 _security_arm(security)
134 ~cromp_client_record() {
140 bool handle_client_needs(
ethread &prompter) {
141#ifdef DEBUG_CROMP_SERVER
142 FUNCDEF(
"handle_client_needs");
145 if (!_healthy)
return false;
146 if (!
spock()->connected()) {
147 _still_connected =
false;
150 bool keep_going =
true;
157 bool ret = get_incoming_data(actions);
158 if (ret) keep_going =
true;
159 ret = push_client_replies(actions);
160 if (ret) keep_going =
true;
163#ifdef DEBUG_CROMP_SERVER
166 LOG(
a_sprintf(
"%d pending send bytes, %d bytes accumulated, bin has "
168 octo()->responses().items_held()));
172 if (duration > 200) {
188 while (get_incoming_data(actions)) {
194 _security_arm.expunge(_ent);
198 bool healthy()
const {
return _healthy; }
201 bool still_connected()
const {
return _still_connected; }
204 cromp_server &parent()
const {
return _parent; }
206 bool push_client_replies(
int &actions) {
207 FUNCDEF(
"push_client_replies");
208 if (!healthy())
return false;
211#ifdef DEBUG_CROMP_SERVER
212 LOG(
"not pushing replies for blank.");
218LOG(
"buffer clog being cleared now.");
223LOG(
"could not completely clear buffer clog.");
226LOG(
"cleared out buffer clog.");
232 if (!_octo->responses().items_held()) {
239 int num_located = _octo->responses().acquire_for_entity(ent(),
247 if (_parent.encrypting()) {
248 for (
int i = 0; i < _waiting.elements(); i++) {
249 infoton *curr = _waiting[i]->_data;
250 infoton *processed = _parent.wrap_infoton(curr,
251 _waiting[i]->_id._entity);
252 if (processed) _waiting[i]->_data = processed;
262 LOG(
astring(
"failed to send package back to client: ")
270#ifdef DEBUG_CROMP_SERVER
279 if (!
spock()->connected()) {
280#ifdef DEBUG_CROMP_SERVER
281 LOG(
"noticed disconnection of client.");
283 _still_connected =
false;
288 bool get_incoming_data(
int &actions) {
290 if (!healthy())
return false;
291 int first_one =
true;
292 bool saw_something =
false;
304#ifdef DEBUG_CROMP_SERVER
308#ifdef DEBUG_CROMP_SERVER
309 LOG(
"noticed disconnection of client.");
311 _still_connected =
false;
317 saw_something =
true;
320 LOG(
astring(
"would have assigned ours to blank id! ")
325#ifdef DEBUG_CROMP_SERVER
331#ifdef DEBUG_CROMP_SERVER
332 LOG(
astring(
"assigned own entity to ") + _ent.mangled_form());
334 }
else if (!_fixated && (_ent != req_id.
_entity) ) {
335#ifdef DEBUG_CROMP_SERVER
337 +
" where we used to have " + _ent.mangled_form());
343#ifdef DEBUG_CROMP_SERVER
344 else if (_ent != req_id.
_entity) {
346#ifdef DEBUG_CROMP_SERVER
348 +
" when we fixated on " + _ent.mangled_form());
366 ret = _octo->evaluate(item, req_id, _parent.instantaneous());
368#ifdef DEBUG_CROMP_SERVER
369 LOG(
astring(
"failed to evaluate the infoton we got: ")
381 LOG(
astring(
"injecting unhandled note into response stream for ")
384 _parent.send_to_client(req_id,
392 return saw_something;
396 cromp_server &_parent;
401 cromp_data_grabber _grabber;
404 bool _still_connected;
411void cromp_data_grabber::perform_activity(
void *)
413#ifdef DEBUG_CROMP_SERVER
416 while (!should_stop()) {
418 bool ret = _parent.handle_client_needs(*
this);
422#ifdef DEBUG_CROMP_SERVER
423 LOG(
"done handling client needs.");
425 _octo->expunge(_parent.ent());
433class cromp_client_list :
public amorph<cromp_client_record>
437 for (
int i = 0; i <
elements(); i++)
438 if (to_find ==
get(i)->ent())
return i;
439 return common::NOT_FOUND;
445class client_dropping_thread :
public ethread
448 client_dropping_thread (cromp_server &parent)
454 _parent.drop_dead_clients();
458 cromp_server &_parent;
463class connection_management_thread :
public ethread
466 connection_management_thread(cromp_server &parent)
472 _parent.look_for_clients(*
this);
476 cromp_server &_parent;
482#define LOCK_LISTS auto_synchronizer l(*_list_lock)
486 int accepting_threads,
bool instantaneous,
int max_per_ent)
488 _clients(new cromp_client_list),
490 _list_lock(new
mutex),
492 _instantaneous(instantaneous),
494 _accepting_threads(accepting_threads),
495 _dropper(new client_dropping_thread(*this)),
510 WHACK(_next_droppage);
512 WHACK(_default_security);
525 const abyte any_list[] = { 0, 0, 0, 0 };
552#ifdef DEBUG_CROMP_SERVER
576#ifdef DEBUG_CROMP_SERVER
577 LOG(
astring(
"enabling encryption for ") + class_name()
584 WHACK(_security_arm);
596 outcome to_return = accept_one_client(
false);
597 if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
602#ifdef DEBUG_CROMP_SERVER
603 LOG(
a_sprintf(
"adding %d accepting threads.", _accepting_threads));
605 for (
int i = 0; i < _accepting_threads; i++) {
617 if (!_enabled)
return;
624 for (
int i = 0; i < _clients->elements(); i++) {
626 cromp_client_record *cli = (*_clients)[i];
627 if (cli) cli->croak();
646 return _clients? _clients->elements() : 0;
652 if (!_enabled)
return false;
654 int indy = _clients->find(
id);
656 cromp_client_record *cli = (*_clients)[indy];
666 if (!_enabled)
return false;
669 int indy = _clients->find(
id);
671 cromp_client_record *cli = (*_clients)[indy];
673 found = cli->spock()->remote();
677outcome cromp_server::accept_one_client(
bool wait)
679#ifdef DEBUG_CROMP_SERVER
682 if (!_enabled)
return common::INCOMPLETE;
690 cromp_client_record *adding =
new cromp_client_record(*
this, accepted,
691 octo(), *_security_arm);
692#ifdef DEBUG_CROMP_SERVER
696 _clients->append(adding);
701#ifdef DEBUG_CROMP_SERVER
711 if (!_enabled)
return;
714 outcome ret = accept_one_client(
false);
718 LOG(
astring(
"got real error on socket; leaving for good.")
733#ifdef DEBUG_CROMP_SERVER
736 if (!_enabled)
return common::INCOMPLETE;
737 if (!
octo()->responses().add_item(data,
id)) {
738#ifdef DEBUG_CROMP_SERVER
739 LOG(
"failed to store result for client--no space left currently.");
765 if (!_enabled)
return common::INCOMPLETE;
769 int indy = _clients->find(
id._entity);
771 cromp_client_record *cli = (*_clients)[indy];
772 return cli->retrieve_and_restore(data,
id, timeout);
777#ifdef DEBUG_CROMP_SERVER
780 if (!_enabled)
return;
789 for (
int i = 0; i < _clients->elements(); i++) {
790 cromp_client_record *cli = (*_clients)[i];
792#ifdef DEBUG_CROMP_SERVER
799 if (!cli->still_connected() || !cli->healthy()) {
800#ifdef DEBUG_CROMP_SERVER
801 LOG(
astring(
"dropping disconnected client ") + cli->ent().mangled_form());
a_sprintf is a specialization of astring that provides printf style support.
Provides a dynamically resizable ASCII character string.
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
A very common template for a dynamic array of bytes.
Outcomes describe the state of completion for an operation.
A few common features used by both CROMP clients and servers.
static const char * outcome_name(const basis::outcome &to_name)
bool buffer_clog(int clog_point=1 *basis::MEGABYTE) const
basis::outcome open_common(const sockets::internet_address &where)
basis::outcome push_outgoing(int max_tries)
sockets::spocket * spock() const
cromp_common(const basis::astring &host, int max_per_ent)
basis::outcome close_common()
int accumulated_bytes() const
returns the number of bytes pending processing from the other side.
basis::outcome pack_and_ship(const octopi::infoton &request, const octopi::octopus_request_id &item_id, int max_tries)
octopi::octopus * octo() const
basis::outcome retrieve_and_restore_any(octopi::infoton *&item, octopi::octopus_request_id &req_id, int timeout)
int pending_sends() const
returns the number of bytes still unsent.
virtual basis::outcome add_tentacle(octopi::tentacle *to_add, bool filter=false)
void grab_anything(bool wait)
Implements the client registry in a cromp-appropriate manner.
cromp_server(const sockets::internet_address &where, int accepting_threads=DEFAULT_ACCEPTERS(), bool instantaneous=true, int max_per_entity=DEFAULT_MAX_ENTITY_QUEUE)
basis::outcome enable_servers(bool encrypt, cromp_security *security=NULL_POINTER)
bool get_sizes(const octopi::octopus_entity &id, int &items, int &bytes)
octopi::infoton * wrap_infoton(octopi::infoton *&request, const octopi::octopus_entity &ent)
static sockets::internet_address any_address(int port)
basis::outcome send_to_client(const octopi::octopus_request_id &id, octopi::infoton *data)
basis::astring responses_text_form() const
sockets::internet_address location() const
void look_for_clients(processes::ethread &requester)
bool find_entity(const octopi::octopus_entity &id, sockets::internet_address &found)
basis::outcome get_from_client(const octopi::octopus_request_id &id, octopi::infoton *&data, int timeout)
static int DEFAULT_ACCEPTERS()
bool disconnect_entity(const octopi::octopus_entity &id)
returns true if the "id" can be found and disconnected.
bool encrypt(const basis::byte_array &source, basis::byte_array &target) const
encrypts the "source" array into the "target" array.
Encapsulates the chit-chat necessary to establish an encrypted connection.
Processes the encryption_infoton object for setting up an encrypted channel.
key_repository & keys() const
provides access to our list of keys.
Wraps an encrypted infoton when the octopus is in an encrypted mode.
basis::byte_array _wrapped
the encrypted data that's held here.
basis::astring text_form() const
bool get_sizes(const octopus_entity &id, int &items, int &bytes) const
Encapsulates just the action of identifying an octopus user.
a list of pending requests and who made them.
An infoton is an individual request parcel with accompanying information.
virtual void text_form(basis::base_string &state_fill) const =0
requires derived infotons to be able to show their state as a string.
static void fast_pack(basis::byte_array &packed_form, const infoton &to_pack)
flattens an infoton "to_pack" into the byte array "packed_form".
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
void unlock(octenc_key_record *to_unlock)
drops the lock on the key record in "to_unlock".
octenc_key_record * lock(const octopus_entity &ent)
locates the key for "ent", if it's stored.
Provides rudimentary login services.
Tracks the keys that have been assigned for a secure channel.
crypto::blowfish_crypto _key
used for communicating with an entity.
Provides a way of identifying users of an octopus object.
basis::astring text_form() const
returns a readable form of the identifier.
bool blank() const
true if the entity is blank, as constructed by default constructor.
basis::astring mangled_form() const
returns the combined string form of the identifier.
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
Octopus is a design pattern for generalized request processing systems.
entity_data_bin & responses()
allows external access to our set of results.
Informs the caller that a request type was unknown to the server octopus.
this simple tentacle just unpacks the encryption_wrapper infoton.
Provides a platform-independent object for adding threads to a program.
virtual void perform_activity(void *thread_data)=0
< invoked just after after start(), when the OS thread is created.
bool should_stop() const
reports whether the thread should stop right now.
Manages a collection of threads.
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void stop_all()
makes all of the threads quit.
this type of address describes a destination out on the internet.
basis::astring text_form() const
static const basis::byte_array & localhost()
Abstraction for a higher-level BSD socket that is platform independent.
tcpip_stack & stack() const
static const char * outcome_name(const basis::outcome &to_name)
basis::un_int OS_socket()
basis::outcome accept(spocket *&sock, bool wait)
basis::astring hostname() const
int elements() const
the maximum number of elements currently allowed in this amorph.
int find(const cromp_client_record *to_locate, basis::outcome &o)
Searches the amorph for the contents specified.
const cromp_client_record * get(int field) const
Returns a constant pointer to the information at the index "field".
An array of strings with some additional helpful methods.
basis::astring text_form() const
A synonym for the text_format() method.
static void sleep_ms(basis::un_int msec)
a system independent name for a forced snooze measured in milliseconds.
Represents a point in time relative to the operating system startup time.
void reset()
sets the stamp time back to now.
time_representation value() const
returns the time_stamp in terms of the lower level type.
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
#define NULL_POINTER
The value representing a pointer to nothing.
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
The guards collection helps in testing preconditions and reporting errors.
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
const int MEGABYTE
Number of bytes in a megabyte.
unsigned char abyte
A fairly important unit which is seldom defined...
const int SECOND_ms
Number of milliseconds in a second.
bool negative(const type &a)
negative returns true if "a" is less than zero.
const int KILOBYTE
Number of bytes in a kilobyte.
const int DEAD_CLIENT_CLEANING_INTERVAL
const int MAXIMUM_ACTIONS_PER_CLIENT
const int DROPPING_INTERVAL
const int SEND_TRIES_ALLOWED
const int MAXIMUM_SIZE_BATCH
const int ACCEPTANCE_SNOOZE
const int DATA_AWAIT_TIMEOUT
const int EXTREME_SEND_TRIES_ALLOWED
const int MAXIMUM_BYTES_PER_SEND
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
A dynamic container class that holds any kind of object via pointers.