42 using namespace basis;
// LOG(to_print): converts "to_print" to an astring and hands its s() form to
// CLASS_EMERGENCY_LOG, using the logger returned by program_wide_logger::get().
// NOTE(review): CLASS_EMERGENCY_LOG is defined elsewhere; it presumably relies
// on the enclosing class's name and the FUNCDEF-supplied function name, so LOG
// should only be used where those are available -- confirm.
88 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
93 class cromp_client_record;
95 class cromp_data_grabber :
public ethread
98 cromp_data_grabber(cromp_client_record &parent,
octopus *octo)
99 :
ethread(), _parent(parent), _octo(octo) {}
103 virtual void perform_activity(
void *);
106 cromp_client_record &_parent;
112 class cromp_client_record :
public cromp_common
115 cromp_client_record(cromp_server &parent,
spocket *client,
octopus *octo,
117 : cromp_common(client, octo),
123 _grabber(*this, octo),
125 _still_connected(true),
126 _security_arm(security)
129 (internet_address::localhost(), client->
stack().
hostname(), 0);
130 open_common(local_addr);
134 ~cromp_client_record() {
140 bool handle_client_needs(
ethread &prompter) {
141 #ifdef DEBUG_CROMP_SERVER
142 FUNCDEF(
"handle_client_needs");
145 if (!_healthy)
return false;
146 if (!spock()->connected()) {
147 _still_connected =
false;
150 bool keep_going =
true;
157 bool ret = get_incoming_data(actions);
158 if (ret) keep_going =
true;
159 ret = push_client_replies(actions);
160 if (ret) keep_going =
true;
163 #ifdef DEBUG_CROMP_SERVER
166 LOG(
a_sprintf(
"%d pending send bytes, %d bytes accumulated, bin has "
167 "%d items.", pending_sends(), accumulated_bytes(),
168 octo()->responses().items_held()));
172 if (duration > 200) {
188 while (get_incoming_data(actions)) {
194 _security_arm.expunge(_ent);
// Returns the current value of the _healthy flag.  The other methods of this
// record (handle_client_needs, push_client_replies, get_incoming_data) return
// false immediately when this flag is not set.
198 bool healthy()
const {
return _healthy; }
// Returns the _still_connected flag.  The flag starts out true in the
// constructor and is cleared when spock()->connected() reports false or when
// a receive attempt yields cromp_common::NO_CONNECTION.
201 bool still_connected()
const {
return _still_connected; }
// Returns a reference to the cromp_server that this client record was
// constructed with (held in the _parent member).
204 cromp_server &parent()
const {
return _parent; }
206 bool push_client_replies(
int &actions) {
207 FUNCDEF(
"push_client_replies");
208 if (!healthy())
return false;
211 #ifdef DEBUG_CROMP_SERVER
212 LOG(
"not pushing replies for blank.");
218 LOG(
"buffer clog being cleared now.");
223 LOG(
"could not completely clear buffer clog.");
226 LOG(
"cleared out buffer clog.");
232 if (!_octo->responses().items_held()) {
237 grab_anything(
false);
239 int num_located = _octo->responses().acquire_for_entity(ent(),
247 if (_parent.encrypting()) {
248 for (
int i = 0; i < _waiting.elements(); i++) {
249 infoton *curr = _waiting[i]->_data;
250 infoton *processed = _parent.wrap_infoton(curr,
251 _waiting[i]->_id._entity);
252 if (processed) _waiting[i]->_data = processed;
256 outcome ret = pack_and_ship(_waiting, 0);
258 if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
262 LOG(
astring(
"failed to send package back to client: ")
263 + cromp_common::outcome_name(ret));
270 #ifdef DEBUG_CROMP_SERVER
279 if (!spock()->connected()) {
280 #ifdef DEBUG_CROMP_SERVER
281 LOG(
"noticed disconnection of client.");
283 _still_connected =
false;
288 bool get_incoming_data(
int &actions) {
290 if (!healthy())
return false;
291 int first_one =
true;
292 bool saw_something =
false;
297 outcome ret = retrieve_and_restore_any(item, req_id,
300 if (ret == cromp_common::TIMED_OUT) {
303 }
else if (ret != cromp_common::OKAY) {
304 #ifdef DEBUG_CROMP_SERVER
305 LOG(
astring(
"got error ") + cromp_common::outcome_name(ret));
307 if (ret == cromp_common::NO_CONNECTION) {
308 #ifdef DEBUG_CROMP_SERVER
309 LOG(
"noticed disconnection of client.");
311 _still_connected =
false;
317 saw_something =
true;
320 LOG(
astring(
"would have assigned ours to blank id! ")
325 #ifdef DEBUG_CROMP_SERVER
331 #ifdef DEBUG_CROMP_SERVER
332 LOG(
astring(
"assigned own entity to ") + _ent.mangled_form());
334 }
else if (!_fixated && (_ent != req_id.
_entity) ) {
335 #ifdef DEBUG_CROMP_SERVER
337 +
" where we used to have " + _ent.mangled_form());
343 #ifdef DEBUG_CROMP_SERVER
344 else if (_ent != req_id.
_entity) {
346 #ifdef DEBUG_CROMP_SERVER
348 +
" when we fixated on " + _ent.mangled_form());
366 ret = _octo->evaluate(item, req_id, _parent.instantaneous());
367 if (ret != tentacle::OKAY) {
368 #ifdef DEBUG_CROMP_SERVER
369 LOG(
astring(
"failed to evaluate the infoton we got: ")
381 LOG(
astring(
"injecting unhandled note into response stream for ")
382 + req_id.
text_form() +
", got outcome " + outcome_name(ret));
384 _parent.send_to_client(req_id,
392 return saw_something;
396 cromp_server &_parent;
401 cromp_data_grabber _grabber;
404 bool _still_connected;
411 void cromp_data_grabber::perform_activity(
void *)
413 #ifdef DEBUG_CROMP_SERVER
416 while (!should_stop()) {
418 bool ret = _parent.handle_client_needs(*
this);
422 #ifdef DEBUG_CROMP_SERVER
423 LOG(
"done handling client needs.");
425 _octo->expunge(_parent.ent());
433 class cromp_client_list :
public amorph<cromp_client_record>
437 for (
int i = 0; i < elements(); i++)
438 if (to_find == get(i)->ent())
return i;
439 return common::NOT_FOUND;
445 class client_dropping_thread :
public ethread
448 client_dropping_thread (cromp_server &parent)
452 void perform_activity(
void *
formal(ptr)) {
454 _parent.drop_dead_clients();
458 cromp_server &_parent;
463 class connection_management_thread :
public ethread
466 connection_management_thread(cromp_server &parent)
470 void perform_activity(
void *
formal(ptr)) {
472 _parent.look_for_clients(*
this);
476 cromp_server &_parent;
// LOCK_LISTS: declares a local auto_synchronizer named "l" built on the mutex
// that _list_lock points to.  Because the local is always named "l", the macro
// can be used only once per scope.
// NOTE(review): presumably the synchronizer holds the lock until the enclosing
// scope ends, protecting the client list -- confirm against auto_synchronizer.
482 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
486 int accepting_threads,
bool instantaneous,
int max_per_ent)
488 _clients(new cromp_client_list),
490 _list_lock(new
mutex),
492 _instantaneous(instantaneous),
494 _accepting_threads(accepting_threads),
495 _dropper(new client_dropping_thread(*this)),
510 WHACK(_next_droppage);
512 WHACK(_default_security);
525 const abyte any_list[] = { 0, 0, 0, 0 };
552 #ifdef DEBUG_CROMP_SERVER
563 infoton::fast_pack(packed_request, *request);
576 #ifdef DEBUG_CROMP_SERVER
577 LOG(
astring(
"enabling encryption for ") + class_name()
584 WHACK(_security_arm);
596 outcome to_return = accept_one_client(
false);
597 if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
602 #ifdef DEBUG_CROMP_SERVER
603 LOG(
a_sprintf(
"adding %d accepting threads.", _accepting_threads));
605 for (
int i = 0; i < _accepting_threads; i++) {
617 if (!_enabled)
return;
624 for (
int i = 0; i < _clients->elements(); i++) {
626 cromp_client_record *cli = (*_clients)[i];
627 if (cli) cli->croak();
646 return _clients? _clients->elements() : 0;
652 if (!_enabled)
return false;
654 int indy = _clients->find(
id);
656 cromp_client_record *cli = (*_clients)[indy];
666 if (!_enabled)
return false;
669 int indy = _clients->find(
id);
671 cromp_client_record *cli = (*_clients)[indy];
673 found = cli->spock()->remote();
677 outcome cromp_server::accept_one_client(
bool wait)
679 #ifdef DEBUG_CROMP_SERVER
682 if (!_enabled)
return common::INCOMPLETE;
688 if ( (ret == spocket::OKAY) && accepted) {
690 cromp_client_record *adding =
new cromp_client_record(*
this, accepted,
691 octo(), *_security_arm);
692 #ifdef DEBUG_CROMP_SERVER
696 _clients->append(adding);
699 if (ret == spocket::NO_CONNECTION)
701 #ifdef DEBUG_CROMP_SERVER
702 LOG(
astring(
"error accepting client: ") + spocket::outcome_name(ret));
711 if (!_enabled)
return;
714 outcome ret = accept_one_client(
false);
718 LOG(
astring(
"got real error on socket; leaving for good.")
719 + spocket::outcome_name(ret));
733 #ifdef DEBUG_CROMP_SERVER
736 if (!_enabled)
return common::INCOMPLETE;
737 if (!
octo()->responses().add_item(data,
id)) {
738 #ifdef DEBUG_CROMP_SERVER
739 LOG(
"failed to store result for client--no space left currently.");
765 if (!_enabled)
return common::INCOMPLETE;
769 int indy = _clients->find(
id._entity);
771 cromp_client_record *cli = (*_clients)[indy];
772 return cli->retrieve_and_restore(data,
id, timeout);
777 #ifdef DEBUG_CROMP_SERVER
780 if (!_enabled)
return;
789 for (
int i = 0; i < _clients->elements(); i++) {
790 cromp_client_record *cli = (*_clients)[i];
792 #ifdef DEBUG_CROMP_SERVER
799 if (!cli->still_connected() || !cli->healthy()) {
800 #ifdef DEBUG_CROMP_SERVER
801 LOG(
astring(
"dropping disconnected client ") + cli->ent().mangled_form());
a_sprintf is a specialization of astring that provides printf style support.
Provides a dynamically resizable ASCII character string.
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
A very common template for a dynamic array of bytes.
Outcomes describe the state of completion for an operation.
A few common features used by both CROMP clients and servers.
static const char * outcome_name(const basis::outcome &to_name)
octopi::octopus * octo() const
basis::outcome open_common(const sockets::internet_address &where)
sockets::spocket * spock() const
basis::outcome close_common()
virtual basis::outcome add_tentacle(octopi::tentacle *to_add, bool filter=false)
Implements the client registry in a cromp-appropriate manner.
basis::outcome enable_servers(bool encrypt, cromp_security *security=NULL_POINTER)
bool get_sizes(const octopi::octopus_entity &id, int &items, int &bytes)
octopi::infoton * wrap_infoton(octopi::infoton *&request, const octopi::octopus_entity &ent)
static sockets::internet_address any_address(int port)
basis::outcome send_to_client(const octopi::octopus_request_id &id, octopi::infoton *data)
basis::astring responses_text_form() const
sockets::internet_address location() const
void look_for_clients(processes::ethread &requester)
bool find_entity(const octopi::octopus_entity &id, sockets::internet_address &found)
basis::outcome get_from_client(const octopi::octopus_request_id &id, octopi::infoton *&data, int timeout)
static int DEFAULT_ACCEPTERS()
bool disconnect_entity(const octopi::octopus_entity &id)
returns true if the "id" can be found and disconnected.
bool encrypt(const basis::byte_array &source, basis::byte_array &target) const
encrypts the "source" array into the "target" array.
Encapsulates the chit-chat necessary to establish an encrypted connection.
Processes the encryption_infoton object for setting up an encrypted channel.
key_repository & keys() const
provides access to our list of keys.
Wraps an encrypted infoton when the octopus is in an encrypted mode.
basis::byte_array _wrapped
the encrypted data that's held here.
basis::astring text_form() const
bool get_sizes(const octopus_entity &id, int &items, int &bytes)
Encapsulates just the action of identifying an octopus user.
a list of pending requests and who made them.
An infoton is an individual request parcel with accompanying information.
virtual void text_form(basis::base_string &state_fill) const =0
requires derived infotons to be able to show their state as a string.
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
void unlock(octenc_key_record *to_unlock)
drops the lock on the key record in "to_unlock".
octenc_key_record * lock(const octopus_entity &ent)
locates the key for "ent", if it's stored.
Provides rudimentary login services.
Tracks the keys that have been assigned for a secure channel.
crypto::blowfish_crypto _key
used for communicating with an entity.
Provides a way of identifying users of an octopus object.
basis::astring text_form() const
returns a readable form of the identifier.
bool blank() const
true if the entity is blank, as constructed by default constructor.
basis::astring mangled_form() const
returns the combined string form of the identifier.
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
Octopus is a design pattern for generalized request processing systems.
entity_data_bin & responses()
allows external access to our set of results.
Informs the caller that a request type was unknown to the server octopus.
this simple tentacle just unpacks the encryption_wrapper infoton.
Provides a platform-independent object for adding threads to a program.
bool should_stop() const
reports whether the thread should stop right now.
Manages a collection of threads.
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void stop_all()
makes all of the threads quit.
this type of address describes a destination out on the internet.
basis::astring text_form() const
Abstraction for a higher-level BSD socket that is platform independent.
tcpip_stack & stack() const
basis::un_int OS_socket()
basis::outcome accept(spocket *&sock, bool wait)
basis::astring hostname() const
An array of strings with some additional helpful methods.
basis::astring text_form() const
A synonym for the text_format() method.
Represents a point in time relative to the operating system startup time.
void reset()
sets the stamp time back to now.
time_representation value() const
returns the time_stamp in terms of the lower level type.
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
#define NULL_POINTER
The value representing a pointer to nothing.
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
The guards collection helps in testing preconditions and reporting errors.
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
const int MEGABYTE
Number of bytes in a megabyte.
unsigned char abyte
A fairly important unit which is seldom defined...
const int SECOND_ms
Number of milliseconds in a second.
bool negative(const type &a)
negative returns true if "a" is less than zero.
const int KILOBYTE
Number of bytes in a kilobyte.
const int DEAD_CLIENT_CLEANING_INTERVAL
const int MAXIMUM_ACTIONS_PER_CLIENT
const int DROPPING_INTERVAL
const int SEND_TRIES_ALLOWED
const int MAXIMUM_SIZE_BATCH
const int ACCEPTANCE_SNOOZE
const int DATA_AWAIT_TIMEOUT
const int EXTREME_SEND_TRIES_ALLOWED
const int MAXIMUM_BYTES_PER_SEND
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
A dynamic container class that holds any kind of object via pointers.