feisty meow concerns codebase 2.140
cromp_server.cpp
Go to the documentation of this file.
1/*****************************************************************************\
2* *
3* Name : cromp_server *
4* Author : Chris Koeritz *
5* *
6*******************************************************************************
7* Copyright (c) 2000-$now By Author. This program is free software; you can *
8* redistribute it and/or modify it under the terms of the GNU General Public *
9* License as published by the Free Software Foundation; either version 2 of *
10* the License or (at your option) any later version. This is online at: *
11* http://www.fsf.org/copyleft/gpl.html *
12* Please send any updates to: fred@gruntose.com *
13\*****************************************************************************/
14
15#include "cromp_common.h"
16#include "cromp_security.h"
17#include "cromp_server.h"
18
19#include <basis/astring.h>
20#include <basis/functions.h>
21#include <basis/mutex.h>
24#include <octopus/entity_defs.h>
26#include <octopus/infoton.h>
27#include <octopus/tentacle.h>
29#include <processes/ethread.h>
32#include <sockets/tcpip_stack.h>
33#include <sockets/spocket.h>
34#include <structures/amorph.h>
40#include <timely/time_control.h>
41
42using namespace basis;
43using namespace loggers;
44using namespace octopi;
45using namespace processes;
46using namespace sockets;
47using namespace structures;
48using namespace timely;
49
50namespace cromp {
51
52//#define DEBUG_CROMP_SERVER
53 // uncomment for noisy version.
54
56 // we will drop any clients that have disconnected this long ago.
57
59 // this is the maximum number of things we'll do in one run for a
60 // client, including both sends and receives.
61
62const int SEND_TRIES_ALLOWED = 1;
63 // the number of attempts we will make to get outgoing data to send.
64
65const int SEND_THRESHOLD = 512 * KILOBYTE;
66 // if we pile up some data to this point in our client gathering, we'll
67 // go ahead and start pushing it to the client.
68
70 // if we're clogged, we'll push this many times to get data out.
 // NOTE(review): the constant this comment describes (listing line 69) is
 // missing from this copy -- presumably EXTREME_SEND_TRIES_ALLOWED; confirm
 // against the real source.
71
73 // the maximum size we want our buffer to grow.
 // NOTE(review): the constant this comment describes (listing line 72) is
 // missing from this copy -- presumably MAXIMUM_BYTES_PER_SEND; confirm
 // against the real source.
74
75const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
76 // the largest chunk of updates we'll try to grab at one time.
 // handed to responses().acquire_for_entity() as its last argument when
 // push_client_replies() gathers pending replies for a client.
77
78const int DROPPING_INTERVAL = 500;
79 // the rate at which we'll check for dead clients and clean up.
80
81const int DATA_AWAIT_TIMEOUT = 14;
82 // how long the server zones out waiting for data.
 // get_incoming_data() passes this as the timeout of only the first
 // retrieve_and_restore_any() call in a pass; later calls in that pass use 0.
 // units are not stated here -- presumably milliseconds; TODO confirm.
83
84const int ACCEPTANCE_SNOOZE = 60;
85 // if the server sees no clients, it will take a little nap.
86
87#undef LOG
88#define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
89
91
92// forward.
93class cromp_client_record;
94
// background thread that services one cromp_client_record. its
// perform_activity() keeps calling the record's handle_client_needs() until
// that reports false or the thread is asked to stop.
95class cromp_data_grabber : public ethread
96{
97public:
 // "parent" is the client record to service; "octo" is the octopus that
 // perform_activity() uses to expunge the client's entity when servicing ends.
 // both are only stored here; nothing is started by the constructor.
98 cromp_data_grabber(cromp_client_record &parent, octopus *octo)
99 : ethread(), _parent(parent), _octo(octo) {}
100
101 DEFINE_CLASS_NAME("cromp_data_grabber");
102
 // thread body; defined after cromp_client_record since it needs the full
 // definition of that class. the pointer argument is not used.
103 virtual void perform_activity(void *);
104
105private:
 // the client record whose socket traffic this thread pumps (held by reference).
106 cromp_client_record &_parent;
 // the octopus holding responses for the client; not deleted by this class.
107 octopus *_octo;
108};
109
111
// the server's bookkeeping for a single accepted client connection. it owns
// the cromp_data_grabber thread that moves requests from the socket into the
// octopus and moves responses for the client's entity back out again.
// NOTE(review): several interior lines of this class are missing from this
// copy of the listing (see the notes inline); restore them from the real
// source before relying on the flow described here.
112class cromp_client_record : public cromp_common
113{
114public:
 // sets up the record for the accepted socket "client", opens the common
 // support and immediately starts the background grabber thread.
115 cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
116 login_tentacle &security)
117 : cromp_common(client, octo),
118 _parent(parent),
119 _octo(octo),
120 _ent(),
121 _healthy(true),
122 _fixated(false),
123 _grabber(*this, octo),
124 _waiting(),
125 _still_connected(true),
126 _security_arm(security)
127 {
 // NOTE(review): listing line 128 is missing here; it presumably declares
 // the "local_addr" object whose constructor arguments follow.
129 (internet_address::localhost(), client->stack().hostname(), 0);
130 open_common(local_addr); // open the common support for biz.
131 _grabber.start(NULL_POINTER); // crank up our background data pump on the socket.
132 }
133
 // shuts the client down via croak(); croak() may already have been called
 // by the server (drop_dead_clients / disconnect_entity) before destruction.
134 ~cromp_client_record() {
135 croak();
136 }
137
138 DEFINE_CLASS_NAME("cromp_client_record");
139
 // one servicing pass, invoked repeatedly by the grabber thread "prompter".
 // returns false when this record is no longer healthy, when the socket is
 // no longer connected, or when the prompter has been told to stop; returns
 // true otherwise so that the thread calls again.
140 bool handle_client_needs(ethread &prompter) {
141#ifdef DEBUG_CROMP_SERVER
142 FUNCDEF("handle_client_needs");
143 time_stamp start;
144#endif
145 if (!_healthy) return false; // done.
146 if (!spock()->connected()) {
147 _still_connected = false;
148 return false; // need to stop now.
149 }
150 bool keep_going = true;
 // "actions" is a budget shared by the receive and send helpers below; each
 // of them increments it as it works.
151 int actions = 0;
152 while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
153 // make sure we don't overstay our welcome when the thread's supposed
154 // to quit.
155 if (prompter.should_stop()) return false;
156 keep_going = false; // only continue if there's a reason.
157 bool ret = get_incoming_data(actions); // look for requests.
158 if (ret) keep_going = true;
159 ret = push_client_replies(actions); // send replies back to the client.
160 if (ret) keep_going = true;
161 }
162
163#ifdef DEBUG_CROMP_SERVER
164 if (actions > 10) {
165 LOG(a_sprintf("actions=%d", actions));
166 LOG(a_sprintf("%d pending send bytes, %d bytes accumulated, bin has "
167 "%d items.", pending_sends(), accumulated_bytes(),
168 octo()->responses().items_held()));
169 }
170
171 int duration = int(time_stamp().value() - start.value());
172 if (duration > 200) {
173 LOG(a_sprintf("duration=%d ms.", duration));
174 }
175#endif
176
177 return true;
178 }
179
 // the entity this client is currently known by; blank until the first
 // packet with a non-blank entity arrives (see get_incoming_data).
180 const octopus_entity &ent() const { return _ent; }
181
182 // stops the background activity of this object and drops the connection
183 // to the client.
 // order matters: the grabber is stopped first, then input is drained while
 // the record is still marked healthy (get_incoming_data refuses to run once
 // _healthy is false), and only then is the record marked unhealthy.
184 void croak() {
185 FUNCDEF("croak");
186 _grabber.stop();
187 int actions = 0;
188 while (get_incoming_data(actions)) {
189 // keep receiving whatever's there already. we are trying to drain
190 // the socket before destroying it.
191 }
192 _healthy = false;
193 // clean out any records for this goner.
194 _security_arm.expunge(_ent);
195 close_common();
196 }
197
198 bool healthy() const { return _healthy; }
199 // this is true unless the object has been told to shut down.
200
201 bool still_connected() const { return _still_connected; }
202 // this is true unless the client side dropped the connection.
203
204 cromp_server &parent() const { return _parent; }
205
 // gathers responses waiting in the octopus for this client's entity and
 // hands them to pack_and_ship(). "actions" is the caller's running action
 // count and is incremented here. returns false when unhealthy, when no
 // entity has been assigned yet, or when nothing more was found to send; a
 // true result tells handle_client_needs() to make another pass.
206 bool push_client_replies(int &actions) {
207 FUNCDEF("push_client_replies");
208 if (!healthy()) return false;
209 if (ent().blank()) {
210 // not pushing replies if we haven't even gotten a command yet.
211#ifdef DEBUG_CROMP_SERVER
212 LOG("not pushing replies for blank.");
213#endif
214 return false;
215 }
216
 // NOTE(review): listing lines 217, 220 and 222 are missing from this copy.
 // they presumably hold the buffer-clog tests and the push attempt that open
 // the two braces closed below -- TODO restore from the real source.
218LOG("buffer clog being cleared now.");
219 // the buffers are pretty full; we'll try later.
221 // if we're still clogged, then leave.
223LOG("could not completely clear buffer clog.");
224 return true;
225 }
226LOG("cleared out buffer clog.");
227 }
228
 // NOTE(review): any_left is declared int but only ever holds true/false and
 // is returned as bool.
229 int any_left = true;
230 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
231 // make sure we're not wasting our time.
232 if (!_octo->responses().items_held()) {
233 any_left = false;
234 break;
235 }
236 // make sure we don't ignore receptions.
237 grab_anything(false);
238 // try to grab a result for this entity.
239 int num_located = _octo->responses().acquire_for_entity(ent(),
240 _waiting, MAXIMUM_SIZE_BATCH);
241 if (!num_located) {
242 any_left = false;
243 break;
244 }
245
246 // if we're encrypting, we need to wrap these as well.
247 if (_parent.encrypting()) {
248 for (int i = 0; i < _waiting.elements(); i++) {
249 infoton *curr = _waiting[i]->_data;
 // wrap_infoton() returns NULL_POINTER for items it leaves unwrapped; in
 // that case the original infoton stays in the list untouched.
250 infoton *processed = _parent.wrap_infoton(curr,
251 _waiting[i]->_id._entity);
252 if (processed) _waiting[i]->_data = processed; // replace infoton.
253 }
254 }
255
256 outcome ret = pack_and_ship(_waiting, 0);
257 // no attempt to send yet; we're just stuffing the buffer.
258 if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
259//hmmm: what about keeping transmission as held in list; retry later on it?
260
261//#ifdef DEBUG_CROMP_SERVER
 // NOTE(review): listing line 263, which finishes this LOG statement, is
 // missing from this copy.
262 LOG(astring("failed to send package back to client: ")
264//#endif
265 any_left = false;
266 break;
267 }
268
 // NOTE(review): listing lines 269 and 273 are missing from this copy; they
 // presumably test the accumulated size against SEND_THRESHOLD and leave the
 // loop -- TODO restore from the real source.
270#ifdef DEBUG_CROMP_SERVER
271 LOG(astring("over sending threshold on ") + _ent.text_form());
272#endif
274 }
275
276 }
277 // now that we've got a pile possibly, we'll try to send them out.
 // NOTE(review): listing line 278 (presumably the actual push of the
 // outgoing buffer) is missing from this copy.
279 if (!spock()->connected()) {
280#ifdef DEBUG_CROMP_SERVER
281 LOG("noticed disconnection of client.");
282#endif
283 _still_connected = false;
284 }
285 return any_left;
286 }
287
 // pulls requests off the client's connection and hands each one to the
 // octopus for evaluation. "actions" is the caller's running action count;
 // it is incremented per attempt and decremented again when an attempt
 // produced no packet. returns true only when the loop ran out of action
 // budget after at least one packet was seen; a timeout or error returns
 // false even if earlier packets were handled in the same call.
288 bool get_incoming_data(int &actions) {
289 FUNCDEF("get_incoming_data");
290 if (!healthy()) return false;
 // NOTE(review): first_one is declared int but is used purely as a boolean.
291 int first_one = true;
292 bool saw_something = false; // true if we got a packet.
293 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
294 // pull in anything waiting.
295 infoton *item = NULL_POINTER;
296 octopus_request_id req_id;
 // only the first retrieval in a call is allowed to wait; the rest poll.
297 outcome ret = retrieve_and_restore_any(item, req_id,
298 first_one? DATA_AWAIT_TIMEOUT : 0);
299 first_one = false;
300 if (ret == cromp_common::TIMED_OUT) {
301 actions--; // didn't actually eat one.
302 return false;
303 } else if (ret != cromp_common::OKAY) {
304#ifdef DEBUG_CROMP_SERVER
305 LOG(astring("got error ") + cromp_common::outcome_name(ret));
306#endif
307 if (ret == cromp_common::NO_CONNECTION) {
308#ifdef DEBUG_CROMP_SERVER
309 LOG("noticed disconnection of client.");
310#endif
311 _still_connected = false;
312 }
313 actions--; // didn't actually eat one.
314 return false; // get outa here.
315 }
316 // got a packet.
317 saw_something = true;
 // entity bookkeeping: the first non-blank entity seen is adopted; the first
 // time a different entity shows up it replaces the old one and the record
 // becomes "fixated", after which _ent is no longer changed here.
318 if (!_fixated) {
319 if (req_id._entity.blank()) {
320 LOG(astring("would have assigned ours to blank id! ")
321 + req_id._entity.mangled_form());
322 WHACK(item);
323 continue;
324 }
325#ifdef DEBUG_CROMP_SERVER
326 LOG(astring("cmd with entity ") + req_id._entity.mangled_form());
327#endif
328 if (_ent.blank()) {
329 // assign the entity id now that we know it.
330 _ent = req_id._entity;
331#ifdef DEBUG_CROMP_SERVER
332 LOG(astring("assigned own entity to ") + _ent.mangled_form());
333#endif
 // NOTE(review): the !_fixated test below is always true at this point since
 // we are already inside the if (!_fixated) block.
334 } else if (!_fixated && (_ent != req_id._entity) ) {
335#ifdef DEBUG_CROMP_SERVER
336 LOG(astring("fixated on entity of ") + req_id._entity.mangled_form()
337 + " where we used to have " + _ent.mangled_form());
338#endif
339 _ent = req_id._entity;
340 _fixated = true;
341 }
342 } // connects to line after debug just below.
 // NOTE(review): the else-branch below exists only when DEBUG_CROMP_SERVER is
 // defined, so in normal builds a packet whose entity differs from the
 // fixated one is not rejected here and goes on to evaluate(). the inner
 // #ifdef is redundant. confirm that this is the intended behavior.
343#ifdef DEBUG_CROMP_SERVER
344 else if (_ent != req_id._entity) {
345 // this checks the validity of the entity.
346#ifdef DEBUG_CROMP_SERVER
347 LOG(astring("seeing wrong entity of ") + req_id._entity.mangled_form()
348 + " when we fixated on " + _ent.mangled_form());
349#endif
350 WHACK(item);
351 continue;
352 }
353#endif
354 // check again so we make sure we're still healthy; could have changed
355 // state while getting a command.
356 if (!healthy()) {
357 WHACK(item);
358 continue;
359 }
360 string_array classif = item->classifier();
361 // hang onto the classifier since the next time we get a chance, the
362 // object might be destroyed.
363
364 // we pass responsibility for this item over to the octopus. that's why
365 // we're not deleting it once evaluate gets the item.
366 ret = _octo->evaluate(item, req_id, _parent.instantaneous());
367 if (ret != tentacle::OKAY) {
368#ifdef DEBUG_CROMP_SERVER
369 LOG(astring("failed to evaluate the infoton we got: ")
370 + classif.text_form());
371#endif
372//hmmm: we have upgraded this response to be for all errors, since otherwise
373// clients will just time out waiting for something that's never coming.
374
375 // we do a special type of handling when the tentacle is missing. this
376 // is almost always because the wrong type of request is being sent to
377 // a server, or the server didn't register for all the objects it is
378 // supposed to handle.
 // NOTE(review): listing lines 379 and 389 are missing from this copy; the
 // statement that opens the brace closed at listing line 390 is not visible.
380//#ifdef DEBUG_CROMP_SERVER
381 LOG(astring("injecting unhandled note into response stream for ")
382 + req_id.text_form() + ", got outcome " + outcome_name(ret));
383//#endif
384 _parent.send_to_client(req_id,
385 new unhandled_request(req_id, classif, ret));
386 // this will always work, although it's not a surety that the
387 // client actually still exists. probably though, since we're
388 // just now handling this request.
390 }
391 }
392 return saw_something; // keep going if we actually did anything good.
393 }
394
395private:
396 cromp_server &_parent; // the object that owns this client.
 // the octopus that evaluates requests and holds responses; passed in by
 // the creator and not deleted by this class.
397 octopus *_octo;
398 octopus_entity _ent; // the entity by which we know this client.
399 bool _healthy; // reports our current state of happiness.
400 bool _fixated; // true if the entity id has become firm.
401 cromp_data_grabber _grabber; // the data grabbing thread.
402 infoton_list _waiting;
403 // used by the push_client_replies() method; allocated once to avoid churn.
404 bool _still_connected;
405 // set to true up until we notice that the client disconnected.
406 login_tentacle &_security_arm; // provides login checking.
407};
408
410
// thread body for the per-client data pump. keeps asking the owning client
// record to service its connection until the thread is told to stop or the
// record reports (by returning false) that servicing should end. the pointer
// argument is unused.
411void cromp_data_grabber::perform_activity(void *)
412{
413#ifdef DEBUG_CROMP_SERVER
414 FUNCDEF("perform_activity");
415#endif
416 while (!should_stop()) {
417// time_stamp started;
418 bool ret = _parent.handle_client_needs(*this);
419// int duration = int(time_stamp().value() - started.value());
420 if (!ret) {
421 // they said to stop.
422#ifdef DEBUG_CROMP_SERVER
423 LOG("done handling client needs.");
424#endif
 // remove the client's entity from the octopus before leaving. note this
 // happens only on the "false" exit, not when should_stop() ends the loop.
425 _octo->expunge(_parent.ent());
426 break;
427 }
428 }
429}
430
432
// the server's collection of client records, with a lookup by entity added
// on top of the amorph container.
433class cromp_client_list : public amorph<cromp_client_record>
434{
435public:
 // linear search for the first record whose entity equals "to_find".
 // returns its index, or common::NOT_FOUND when no record matches.
 // NOTE(review): get(i) is dereferenced without a null check, although
 // drop_dead_clients() guards against null entries in this same list.
436 int find(const octopus_entity &to_find) const {
437 for (int i = 0; i < elements(); i++)
438 if (to_find == get(i)->ent()) return i;
439 return common::NOT_FOUND;
440 }
441};
442
444
// helper thread whose activity is to ask the server to clean out clients
// that have disconnected or shut down.
445class client_dropping_thread : public ethread
446{
447public:
 // NOTE(review): listing line 449, the start of the initializer list
 // (presumably the ethread base construction), is missing from this copy.
448 client_dropping_thread (cromp_server &parent)
450 _parent(parent) {}
451
 // forwards to the server's drop_dead_clients(); the pointer is unused.
452 void perform_activity(void *formal(ptr)) {
453 FUNCDEF("perform_activity");
454 _parent.drop_dead_clients();
455 }
456
457private:
458 cromp_server &_parent; // we perform tricks for this object.
459};
460
462
// helper thread that accepts new client connections on behalf of the server.
// enable_servers() adds _accepting_threads of these to the thread cabinet.
463class connection_management_thread : public ethread
464{
465public:
466 connection_management_thread(cromp_server &parent)
467 : ethread(),
468 _parent(parent) {}
469
 // hands control to the server's look_for_clients(), passing this thread so
 // the server can poll should_stop(); the pointer argument is unused.
470 void perform_activity(void *formal(ptr)) {
471 FUNCDEF("perform_activity");
472 _parent.look_for_clients(*this);
473 }
474
475private:
476 cromp_server &_parent; // we perform tricks for this object.
477};
478
480
481#undef LOCK_LISTS
482#define LOCK_LISTS auto_synchronizer l(*_list_lock)
483 // takes over access to the client list and root socket.
484
486 int accepting_threads, bool instantaneous, int max_per_ent)
487: cromp_common(cromp_common::chew_hostname(where), max_per_ent),
488 _clients(new cromp_client_list),
489 _accepters(new thread_cabinet),
490 _list_lock(new mutex),
491 _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
492 _instantaneous(instantaneous),
493 _where(new internet_address(where)),
494 _accepting_threads(accepting_threads),
495 _dropper(new client_dropping_thread(*this)),
496 _enabled(false),
497 _encrypt_arm(NULL_POINTER),
498 _default_security(new cromp_security),
499 _security_arm(NULL_POINTER)
500{
501 FUNCDEF("constructor");
502}
503
505{
507 WHACK(_accepters);
508 WHACK(_dropper);
509 WHACK(_clients);
510 WHACK(_next_droppage);
511 WHACK(_where);
512 WHACK(_default_security);
513 WHACK(_list_lock);
514 _encrypt_arm = NULL_POINTER;
515 _security_arm = NULL_POINTER;
516}
517
// returns a copy of the address object held in _where, which the constructor
// allocates from its "where" argument.
518internet_address cromp_server::location() const { return *_where; }
519
// forwards to the octopus response bin's get_sizes() for the entity "id" and
// returns its result; "items" and "bytes" are filled in by that call.
// presumably they report the count and total size of responses pending for
// the entity -- TODO confirm against entity_data_bin.
520bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
521{ return octo()->responses().get_sizes(id, items, bytes); }
522
524{
525 const abyte any_list[] = { 0, 0, 0, 0 };
526 return internet_address(byte_array(4, any_list), "", port);
527}
528
531
533 // default number of listening threads; this is the maximum number of mostly
534 // simultaneous connections that the server can pick up at a time.
535 return 7; // others are not generally so limited on resources.
536}
537
539 const octopus_entity &ent)
540{
541 FUNCDEF("wrap_infoton");
542 if (!_enabled) return NULL_POINTER;
543 // identity is not wrapped with encryption; we need to establish an identity
544 // to talk on a distinct channel with the server. even if that identity were
545 // compromised, the interloper should still not be able to listen in on the
546 // establishment of an encryption channel. also, the encryption startup
547 // itself is not encrypted and we don't want to re-encrypt the wrapper.
548 if (dynamic_cast<identity_infoton *>(request)
549 || dynamic_cast<encryption_infoton *>(request)
550 || dynamic_cast<encryption_wrapper *>(request)) return NULL_POINTER;
551
552#ifdef DEBUG_CROMP_SERVER
553 LOG(astring("encrypting ") + request->text_form());
554#endif
555
556 octenc_key_record *key = _encrypt_arm->keys().lock(ent);
557 // lock here is released a bit down below.
558 if (!key) {
559 LOG(astring("failed to locate key for entity ") + ent.text_form());
560 return NULL_POINTER;
561 }
562 byte_array packed_request;
563 infoton::fast_pack(packed_request, *request);
564 WHACK(request);
565 encryption_wrapper *to_return = new encryption_wrapper;
566 key->_key.encrypt(packed_request, to_return->_wrapped);
567 _encrypt_arm->keys().unlock(key);
568 return to_return;
569}
570
572{
573 FUNCDEF("enable_servers");
574 if (encrypt) {
575 // add the tentacles needed for encryption.
576#ifdef DEBUG_CROMP_SERVER
577 LOG(astring("enabling encryption for ") + class_name()
578 + " on " + _where->text_form());
579#endif
580 _encrypt_arm = new encryption_tentacle;
581 add_tentacle(_encrypt_arm, true);
583 }
584 WHACK(_security_arm); // in case being reused.
585 if (security) {
586 _security_arm = new login_tentacle(*security);
587 add_tentacle(_security_arm, true);
588 } else {
589 _security_arm = new login_tentacle(*_default_security);
590 add_tentacle(_security_arm, true);
591 }
592 open_common(*_where); // open the common ground.
593
594 _enabled = true;
595 // try first accept, no waiting.
596 outcome to_return = accept_one_client(false);
597 if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
598 LOG(astring("failure starting up server: ") + outcome_name(to_return));
599 return to_return;
600 }
601
602#ifdef DEBUG_CROMP_SERVER
603 LOG(a_sprintf("adding %d accepting threads.", _accepting_threads));
604#endif
605 for (int i = 0; i < _accepting_threads; i++) {
606 // crank in a new thread and tell it yes on starting it.
607 _accepters->add_thread(new connection_management_thread(*this), true, NULL_POINTER);
608 }
609
610 _dropper->start(NULL_POINTER);
611 return OKAY;
612}
613
615{
616 FUNCDEF("disable_servers");
617 if (!_enabled) return;
618 _dropper->stop(); // signal the thread to leave when it can.
619 _accepters->stop_all(); // signal the accepting threads to exit.
620 if (_clients) {
622 // make sure no one rearranges or uses the client list while we're
623 // working on it.
624 for (int i = 0; i < _clients->elements(); i++) {
625 // stop the client's activities before the big shutdown.
626 cromp_client_record *cli = (*_clients)[i];
627 if (cli) cli->croak();
628 }
629 }
630
631 close_common(); // zap the socket so that our blocked waiters get woken up.
632
633 // now finalize the shutdown. we don't grab the lock because we don't want
634 // a deadlock, but we also shouldn't need to grab the lock. by here, we have
635 // cancelled all threads, no new clients should be able to be added, and the
636 // destruction of this list will ensure that each client's thread really is
637 // stopped.
638 WHACK(_clients);
639
640 _enabled = false; // record our defunctivity.
641}
642
644{
646 return _clients? _clients->elements() : 0;
647}
648
650{
651 FUNCDEF("disconnect_entity");
652 if (!_enabled) return false;
654 int indy = _clients->find(id);
655 if (negative(indy)) return false; // didn't find it.
656 cromp_client_record *cli = (*_clients)[indy];
657 // disconnect the client and zap its entity records.
658 cli->croak();
659 return true;
660}
661
663 internet_address &found)
664{
665 FUNCDEF("find_entity");
666 if (!_enabled) return false;
667 found = internet_address();
669 int indy = _clients->find(id);
670 if (negative(indy)) return false; // didn't find it.
671 cromp_client_record *cli = (*_clients)[indy];
672 // pull out the address from the record at that index.
673 found = cli->spock()->remote();
674 return true;
675}
676
// tries to accept a single client on the server socket; "wait" is passed
// straight through to spocket::accept().
// returns common::INCOMPLETE if the server is not enabled, OKAY when a client
// was accepted and added to the client list, NOT_FOUND when accept reported
// NO_CONNECTION (nobody was waiting), and DISALLOWED for any other result.
677outcome cromp_server::accept_one_client(bool wait)
678{
679#ifdef DEBUG_CROMP_SERVER
680 FUNCDEF("accept_one_client");
681#endif
682 if (!_enabled) return common::INCOMPLETE;
683 spocket *accepted = NULL_POINTER;
684//printf((timestamp(true, true) + "into accept\n").s());
685 outcome ret = spock()->accept(accepted, wait);
686//printf((timestamp(true, true) + "out of accept\n").s());
687 // accept and wait for it to finish.
688 if ( (ret == spocket::OKAY) && accepted) {
689 // we got a new client to talk to.
 // constructing the record also starts its grabber thread, so the client is
 // already being serviced before it is added to the list below.
690 cromp_client_record *adding = new cromp_client_record(*this, accepted,
691 octo(), *_security_arm);
692#ifdef DEBUG_CROMP_SERVER
693 LOG(a_sprintf("found a new client on sock %d.", accepted->OS_socket()));
694#endif
 // the lock is taken only around the append; the accept and the record
 // construction above run unlocked.
695 LOCK_LISTS; // short term lock.
696 _clients->append(adding);
697 return OKAY;
698 } else {
699 if (ret == spocket::NO_CONNECTION)
700 return NOT_FOUND; // normal occurrence.
701#ifdef DEBUG_CROMP_SERVER
702 LOG(astring("error accepting client: ") + spocket::outcome_name(ret));
703#endif
704 return DISALLOWED;
705 }
706}
707
709{
710 FUNCDEF("look_for_clients");
711 if (!_enabled) return;
712 // see if any clients have been accepted.
713 while (!requestor.should_stop()) {
714 outcome ret = accept_one_client(false);
715 if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
716 // we got an error condition besides our normal set.
717//#ifdef DEBUG_CROMP_SERVER
718 LOG(astring("got real error on socket; leaving for good.")
719 + spocket::outcome_name(ret));
720//#endif
721 break;
722 }
723 // if we weren't told we got a client, then we'll sleep. if we did get
724 // a client, we'll try again right away.
725 if (ret != OKAY)
727 }
728}
729
731 infoton *data)
732{
733#ifdef DEBUG_CROMP_SERVER
734 FUNCDEF("send_to_client");
735#endif
736 if (!_enabled) return common::INCOMPLETE;
737 if (!octo()->responses().add_item(data, id)) {
738#ifdef DEBUG_CROMP_SERVER
739 LOG("failed to store result for client--no space left currently.");
740#endif
741 return TOO_FULL;
742 }
743 return OKAY;
744}
745
746/*outcome cromp_server::get_any_from_client(const octopus_entity &ent,
747 infoton * &data, int timeout)
748{
749 FUNCDEF("get_from_client");
750//hmmm: this implementation locks the lists; can't we get the client to do
751// most of the work for this?
752 LOCK_LISTS;
753 int indy = _clients->find(id._entity);
754 if (negative(indy)) return NOT_FOUND; // didn't find it.
755 cromp_client_record *cli = (*_clients)[indy];
756 octopus_request_id id;
757 return cli->retrieve_and_restore_any(data, ent, timeout);
758}
759*/
760
762 infoton * &data, int timeout)
763{
764 FUNCDEF("get_from_client");
765 if (!_enabled) return common::INCOMPLETE;
766//hmmm: this implementation locks the lists; can't we get the client to do
767// most of the work for this?
769 int indy = _clients->find(id._entity);
770 if (negative(indy)) return NOT_FOUND; // didn't find it.
771 cromp_client_record *cli = (*_clients)[indy];
772 return cli->retrieve_and_restore(data, id, timeout);
773}
774
776{
777#ifdef DEBUG_CROMP_SERVER
778 FUNCDEF("drop_dead_clients");
779#endif
780 if (!_enabled) return;
781 // clean out any dead clients.
782
783 {
785 if (time_stamp() < *_next_droppage) return; // not time yet.
786 }
787
788 LOCK_LISTS; // keep locked from now on.
789 for (int i = 0; i < _clients->elements(); i++) {
790 cromp_client_record *cli = (*_clients)[i];
791 if (!cli) {
792#ifdef DEBUG_CROMP_SERVER
793 LOG(astring("error in list structure."));
794#endif
795 _clients->zap(i, i);
796 i--; // skip back before deleted guy.
797 continue;
798 }
799 if (!cli->still_connected() || !cli->healthy()) {
800#ifdef DEBUG_CROMP_SERVER
801 LOG(astring("dropping disconnected client ") + cli->ent().mangled_form());
802#endif
803 cli->croak(); // stop it from operating.
804
805//hmmm: check if it has data waiting and complain about it perhaps.
806 _clients->zap(i, i);
807 i--; // skip back before deleted guy.
808 continue;
809 }
810 }
811
812 _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
813}
814
815} //namespace.
816
#define LOG(s)
a_sprintf is a specialization of astring that provides printf style support.
Definition astring.h:440
Provides a dynamically resizable ASCII character string.
Definition astring.h:35
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition astring.cpp:130
A very common template for a dynamic array of bytes.
Definition byte_array.h:36
Outcomes describe the state of completion for an operation.
Definition outcome.h:31
A few common features used by both CROMP clients and servers.
static const char * outcome_name(const basis::outcome &to_name)
bool buffer_clog(int clog_point=1 *basis::MEGABYTE) const
basis::outcome open_common(const sockets::internet_address &where)
basis::outcome push_outgoing(int max_tries)
sockets::spocket * spock() const
cromp_common(const basis::astring &host, int max_per_ent)
basis::outcome close_common()
int accumulated_bytes() const
returns the number of bytes pending processing from the other side.
basis::outcome pack_and_ship(const octopi::infoton &request, const octopi::octopus_request_id &item_id, int max_tries)
octopi::octopus * octo() const
basis::outcome retrieve_and_restore_any(octopi::infoton *&item, octopi::octopus_request_id &req_id, int timeout)
int pending_sends() const
returns the number of bytes still unsent.
virtual basis::outcome add_tentacle(octopi::tentacle *to_add, bool filter=false)
void grab_anything(bool wait)
Implements the client registry in a cromp-appropriate manner.
cromp_server(const sockets::internet_address &where, int accepting_threads=DEFAULT_ACCEPTERS(), bool instantaneous=true, int max_per_entity=DEFAULT_MAX_ENTITY_QUEUE)
basis::outcome enable_servers(bool encrypt, cromp_security *security=NULL_POINTER)
bool get_sizes(const octopi::octopus_entity &id, int &items, int &bytes)
octopi::infoton * wrap_infoton(octopi::infoton *&request, const octopi::octopus_entity &ent)
static sockets::internet_address any_address(int port)
basis::outcome send_to_client(const octopi::octopus_request_id &id, octopi::infoton *data)
basis::astring responses_text_form() const
sockets::internet_address location() const
void look_for_clients(processes::ethread &requester)
bool find_entity(const octopi::octopus_entity &id, sockets::internet_address &found)
basis::outcome get_from_client(const octopi::octopus_request_id &id, octopi::infoton *&data, int timeout)
static int DEFAULT_ACCEPTERS()
bool disconnect_entity(const octopi::octopus_entity &id)
returns true if the "id" can be found and disconnected.
bool encrypt(const basis::byte_array &source, basis::byte_array &target) const
encrypts the "source" array into the "target" array.
Encapsulates the chit-chat necessary to establish an encrypted connection.
Processes the encryption_infoton object for setting up an encrypted channel.
key_repository & keys() const
provides access to our list of keys.
Wraps an encrypted infoton when the octopus is in an encrypted mode.
basis::byte_array _wrapped
the encrypted data that's held here.
basis::astring text_form() const
bool get_sizes(const octopus_entity &id, int &items, int &bytes) const
Encapsulates just the action of identifying an octopus user.
a list of pending requests and who made them.
An infoton is an individual request parcel with accompanying information.
Definition infoton.h:32
virtual void text_form(basis::base_string &state_fill) const =0
requires derived infotons to be able to show their state as a string.
static void fast_pack(basis::byte_array &packed_form, const infoton &to_pack)
flattens an infoton "to_pack" into the byte array "packed_form".
Definition infoton.cpp:162
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition infoton.cpp:85
void unlock(octenc_key_record *to_unlock)
drops the lock on the key record in "to_unlock".
octenc_key_record * lock(const octopus_entity &ent)
locates the key for "ent", if it's stored.
Provides rudimentary login services.
Tracks the keys that have been assigned for a secure channel.
crypto::blowfish_crypto _key
used for communicating with an entity.
Provides a way of identifying users of an octopus object.
Definition entity_defs.h:35
basis::astring text_form() const
returns a readable form of the identifier.
bool blank() const
true if the entity is blank, as constructed by default constructor.
basis::astring mangled_form() const
returns the combined string form of the identifier.
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
Octopus is a design pattern for generalized request processing systems.
Definition octopus.h:47
entity_data_bin & responses()
allows external access to our set of results.
Definition octopus.cpp:178
Informs the caller that a request type was unknown to the server octopus.
this simple tentacle just unpacks the encryption_wrapper infoton.
Provides a platform-independent object for adding threads to a program.
Definition ethread.h:36
virtual void perform_activity(void *thread_data)=0
invoked just after start(), when the OS thread is created.
bool should_stop() const
reports whether the thread should stop right now.
Definition ethread.h:136
Manages a collection of threads.
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void stop_all()
makes all of the threads quit.
this type of address describes a destination out on the internet.
basis::astring text_form() const
static const basis::byte_array & localhost()
Abstraction for a higher-level BSD socket that is platform independent.
Definition spocket.h:40
tcpip_stack & stack() const
Definition spocket.cpp:150
static const char * outcome_name(const basis::outcome &to_name)
Definition spocket.cpp:180
basis::un_int OS_socket()
Definition spocket.h:108
basis::outcome accept(spocket *&sock, bool wait)
Definition spocket.cpp:460
basis::astring hostname() const
int elements() const
the maximum number of elements currently allowed in this amorph.
Definition amorph.h:66
int find(const cromp_client_record *to_locate, basis::outcome &o)
Searches the amorph for the contents specified.
Definition amorph.h:432
const cromp_client_record * get(int field) const
Returns a constant pointer to the information at the index "field".
Definition amorph.h:312
An array of strings with some additional helpful methods.
basis::astring text_form() const
A synonym for the text_format() method.
static void sleep_ms(basis::un_int msec)
a system independent name for a forced snooze measured in milliseconds.
Represents a point in time relative to the operating system startup time.
Definition time_stamp.h:38
void reset()
sets the stamp time back to now.
time_representation value() const
returns the time_stamp in terms of the lower level type.
Definition time_stamp.h:61
#define LOCK_LISTS
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition definitions.h:32
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition enhance_cpp.h:42
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition enhance_cpp.h:54
The guards collection helps in testing preconditions and reporting errors.
Definition array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition functions.h:121
const int MEGABYTE
Number of bytes in a megabyte.
unsigned char abyte
A fairly important unit which is seldom defined...
Definition definitions.h:51
const int SECOND_ms
Number of milliseconds in a second.
bool negative(const type &a)
negative returns true if "a" is less than zero.
Definition functions.h:43
const int KILOBYTE
Number of bytes in a kilobyte.
const int DEAD_CLIENT_CLEANING_INTERVAL
const int MAXIMUM_ACTIONS_PER_CLIENT
const int DROPPING_INTERVAL
const int SEND_TRIES_ALLOWED
const int MAXIMUM_SIZE_BATCH
const int SEND_THRESHOLD
const int ACCEPTANCE_SNOOZE
const int DATA_AWAIT_TIMEOUT
const int EXTREME_SEND_TRIES_ALLOWED
const int MAXIMUM_BYTES_PER_SEND
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
A dynamic container class that holds any kind of object via pointers.
Definition amorph.h:55
#include <time.h>