1 /*****************************************************************************\
3 * Name : cromp_server *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2000-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "cromp_common.h"
16 #include "cromp_security.h"
17 #include "cromp_server.h"
19 #include <basis/astring.h>
20 #include <basis/functions.h>
21 #include <basis/mutex.h>
22 #include <loggers/program_wide_logger.h>
23 #include <octopus/entity_data_bin.h>
24 #include <octopus/entity_defs.h>
25 #include <octopus/identity_infoton.h>
26 #include <octopus/infoton.h>
27 #include <octopus/tentacle.h>
28 #include <octopus/unhandled_request.h>
29 #include <processes/ethread.h>
30 #include <processes/thread_cabinet.h>
31 #include <sockets/internet_address.h>
32 #include <sockets/tcpip_stack.h>
33 #include <sockets/spocket.h>
34 #include <structures/amorph.h>
35 #include <structures/unique_id.h>
36 #include <tentacles/key_repository.h>
37 #include <tentacles/login_tentacle.h>
38 #include <tentacles/encryption_tentacle.h>
39 #include <tentacles/encryption_wrapper.h>
40 #include <timely/time_control.h>
42 using namespace basis;
43 using namespace loggers;
44 using namespace octopi;
45 using namespace processes;
46 using namespace sockets;
47 using namespace structures;
48 using namespace timely;
52 //#define DEBUG_CROMP_SERVER
53 // uncomment for noisy version.
55 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
56 // we will drop any clients that have disconnected this long ago.
58 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
59 // this is the maximum number of things we'll do in one run for a
60 // client, including both sends and receives.
62 const int SEND_TRIES_ALLOWED = 1;
63 // the number of attempts we will make to get outgoing data to send.
65 const int SEND_THRESHOLD = 512 * KILOBYTE;
66 // if we pile up some data to this point in our client gathering, we'll
67 // go ahead and start pushing it to the client.
69 const int EXTREME_SEND_TRIES_ALLOWED = 28;
70 // if we're clogged, we'll push this many times to get data out.
72 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
73 // the maximum size we want our buffer to grow.
75 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
76 // the largest chunk of updates we'll try to grab at one time.
78 const int DROPPING_INTERVAL = 500;
79 // the rate at which we'll check for dead clients and clean up.
81 const int DATA_AWAIT_TIMEOUT = 14;
82 // how long the server zones out waiting for data.
84 const int ACCEPTANCE_SNOOZE = 60;
85 // if the server sees no clients, it will take a little nap.
88 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
93 class cromp_client_record;
95 class cromp_data_grabber : public ethread
98 cromp_data_grabber(cromp_client_record &parent, octopus *octo)
99 : ethread(), _parent(parent), _octo(octo) {}
101 DEFINE_CLASS_NAME("cromp_data_grabber");
103 virtual void perform_activity(void *);
106 cromp_client_record &_parent;
112 class cromp_client_record : public cromp_common
115 cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
116 login_tentacle &security)
117 : cromp_common(client, octo),
123 _grabber(*this, octo),
125 _still_connected(true),
126 _security_arm(security)
128 internet_address local_addr = internet_address
129 (internet_address::localhost(), client->stack().hostname(), 0);
130 open_common(local_addr); // open the common support for biz.
131 _grabber.start(NULL_POINTER); // crank up our background data pump on the socket.
134 ~cromp_client_record() {
138 DEFINE_CLASS_NAME("cromp_client_record");
140 bool handle_client_needs(ethread &prompter) {
141 #ifdef DEBUG_CROMP_SERVER
142 FUNCDEF("handle_client_needs");
145 if (!_healthy) return false; // done.
146 if (!spock()->connected()) {
147 _still_connected = false;
148 return false; // need to stop now.
150 bool keep_going = true;
152 while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
153 // make sure we don't overstay our welcome when the thread's supposed
155 if (prompter.should_stop()) return false;
156 keep_going = false; // only continue if there's a reason.
157 bool ret = get_incoming_data(actions); // look for requests.
158 if (ret) keep_going = true;
159 ret = push_client_replies(actions); // send replies back to the client.
160 if (ret) keep_going = true;
163 #ifdef DEBUG_CROMP_SERVER
165 LOG(a_sprintf("actions=%d", actions));
166 LOG(a_sprintf("%d pending send bytes, %d bytes accumulated, bin has "
167 "%d items.", pending_sends(), accumulated_bytes(),
168 octo()->responses().items_held()));
171 int duration = int(time_stamp().value() - start.value());
172 if (duration > 200) {
173 LOG(a_sprintf("duration=%d ms.", duration));
180 const octopus_entity &ent() const { return _ent; }
182 // stops the background activity of this object and drops the connection
188 while (get_incoming_data(actions)) {
189 // keep receiving whatever's there already. we are trying to drain
190 // the socket before destroying it.
193 // clean out any records for this goner.
194 _security_arm.expunge(_ent);
198 bool healthy() const { return _healthy; }
199 // this is true unless the object has been told to shut down.
201 bool still_connected() const { return _still_connected; }
202 // this is true unless the client side dropped the connection.
204 cromp_server &parent() const { return _parent; }
206 bool push_client_replies(int &actions) {
207 FUNCDEF("push_client_replies");
208 if (!healthy()) return false;
210 // not pushing replies if we haven't even gotten a command yet.
211 #ifdef DEBUG_CROMP_SERVER
212 LOG("not pushing replies for blank.");
217 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
218 LOG("buffer clog being cleared now.");
219 // the buffers are pretty full; we'll try later.
220 push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
221 // if we're still clogged, then leave.
222 if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
223 LOG("could not completely clear buffer clog.");
226 LOG("cleared out buffer clog.");
230 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
231 // make sure we're not wasting our time.
232 if (!_octo->responses().items_held()) {
236 // make sure we don't ignore receptions.
237 grab_anything(false);
238 // try to grab a result for this entity.
239 int num_located = _octo->responses().acquire_for_entity(ent(),
240 _waiting, MAXIMUM_SIZE_BATCH);
246 // if we're encrypting, we need to wrap these as well.
247 if (_parent.encrypting()) {
248 for (int i = 0; i < _waiting.elements(); i++) {
249 infoton *curr = _waiting[i]->_data;
250 infoton *processed = _parent.wrap_infoton(curr,
251 _waiting[i]->_id._entity);
252 if (processed) _waiting[i]->_data = processed; // replace infoton.
256 outcome ret = pack_and_ship(_waiting, 0);
257 // no attempt to send yet; we're just stuffing the buffer.
258 if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
259 //hmmm: what about keeping transmission as held in list; retry later on it?
261 //#ifdef DEBUG_CROMP_SERVER
262 LOG(astring("failed to send package back to client: ")
263 + cromp_common::outcome_name(ret));
269 if (pending_sends() > SEND_THRESHOLD) {
270 #ifdef DEBUG_CROMP_SERVER
271 LOG(astring("over sending threshold on ") + _ent.text_form());
273 push_outgoing(SEND_TRIES_ALLOWED);
277 // now that we've got a pile possibly, we'll try to send them out.
278 push_outgoing(SEND_TRIES_ALLOWED);
279 if (!spock()->connected()) {
280 #ifdef DEBUG_CROMP_SERVER
281 LOG("noticed disconnection of client.");
283 _still_connected = false;
288 bool get_incoming_data(int &actions) {
289 FUNCDEF("get_incoming_data");
290 if (!healthy()) return false;
291 int first_one = true;
292 bool saw_something = false; // true if we got a packet.
293 while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
294 // pull in anything waiting.
295 infoton *item = NULL_POINTER;
296 octopus_request_id req_id;
297 outcome ret = retrieve_and_restore_any(item, req_id,
298 first_one? DATA_AWAIT_TIMEOUT : 0);
300 if (ret == cromp_common::TIMED_OUT) {
301 actions--; // didn't actually eat one.
303 } else if (ret != cromp_common::OKAY) {
304 #ifdef DEBUG_CROMP_SERVER
305 LOG(astring("got error ") + cromp_common::outcome_name(ret));
307 if (ret == cromp_common::NO_CONNECTION) {
308 #ifdef DEBUG_CROMP_SERVER
309 LOG("noticed disconnection of client.");
311 _still_connected = false;
313 actions--; // didn't actually eat one.
314 return false; // get outa here.
317 saw_something = true;
319 if (req_id._entity.blank()) {
320 LOG(astring("would have assigned ours to blank id! ")
321 + req_id._entity.mangled_form());
325 #ifdef DEBUG_CROMP_SERVER
326 LOG(astring("cmd with entity ") + req_id._entity.mangled_form());
329 // assign the entity id now that we know it.
330 _ent = req_id._entity;
331 #ifdef DEBUG_CROMP_SERVER
332 LOG(astring("assigned own entity to ") + _ent.mangled_form());
334 } else if (!_fixated && (_ent != req_id._entity) ) {
335 #ifdef DEBUG_CROMP_SERVER
336 LOG(astring("fixated on entity of ") + req_id._entity.mangled_form()
337 + " where we used to have " + _ent.mangled_form());
339 _ent = req_id._entity;
342 } // connects to line after debug just below.
343 #ifdef DEBUG_CROMP_SERVER
344 else if (_ent != req_id._entity) {
345 // this checks the validity of the entity.
346 #ifdef DEBUG_CROMP_SERVER
347 LOG(astring("seeing wrong entity of ") + req_id._entity.mangled_form()
348 + " when we fixated on " + _ent.mangled_form());
354 // check again so we make sure we're still healthy; could have changed
355 // state while getting a command.
360 string_array classif = item->classifier();
361 // hang onto the classifier since the next time we get a chance, the
362 // object might be destroyed.
364 // we pass responsibility for this item over to the octopus. that's why
365 // we're not deleting it once evaluate gets the item.
366 ret = _octo->evaluate(item, req_id, _parent.instantaneous());
367 if (ret != tentacle::OKAY) {
368 #ifdef DEBUG_CROMP_SERVER
369 LOG(astring("failed to evaluate the infoton we got: ")
370 + classif.text_form());
372 //hmmm: we have upgraded this response to be for all errors, since otherwise
373 // clients will just time out waiting for something that's never coming.
375 // we do a special type of handling when the tentacle is missing. this
376 // is almost always because the wrong type of request is being sent to
377 // a server, or the server didn't register for all the objects it is
378 // supposed to handle.
379 ///// if (ret == tentacle::NOT_FOUND) {
380 //#ifdef DEBUG_CROMP_SERVER
381 LOG(astring("injecting unhandled note into response stream for ")
382 + req_id.text_form() + ", got outcome " + outcome_name(ret));
384 _parent.send_to_client(req_id,
385 new unhandled_request(req_id, classif, ret));
386 // this will always work, although it's not a surety that the
387 // client actually still exists. probably though, since we're
388 // just now handling this request.
392 return saw_something; // keep going if we actually did anything good.
396 cromp_server &_parent; // the object that owns this client.
398 octopus_entity _ent; // the entity by which we know this client.
399 bool _healthy; // reports our current state of happiness.
400 bool _fixated; // true if the entity id has become firm.
401 cromp_data_grabber _grabber; // the data grabbing thread.
402 infoton_list _waiting;
403 // used by the push_client_replies() method; allocated once to avoid churn.
404 bool _still_connected;
405 // set to true up until we notice that the client disconnected.
406 login_tentacle &_security_arm; // provides login checking.
411 void cromp_data_grabber::perform_activity(void *)
413 #ifdef DEBUG_CROMP_SERVER
414 FUNCDEF("perform_activity");
416 while (!should_stop()) {
417 // time_stamp started;
418 bool ret = _parent.handle_client_needs(*this);
419 // int duration = int(time_stamp().value() - started.value());
421 // they said to stop.
422 #ifdef DEBUG_CROMP_SERVER
423 LOG("done handling client needs.");
425 _octo->expunge(_parent.ent());
433 class cromp_client_list : public amorph<cromp_client_record>
436 int find(const octopus_entity &to_find) const {
437 for (int i = 0; i < elements(); i++)
438 if (to_find == get(i)->ent()) return i;
439 return common::NOT_FOUND;
445 class client_dropping_thread : public ethread
448 client_dropping_thread (cromp_server &parent)
449 : ethread(DROPPING_INTERVAL),
452 void perform_activity(void *formal(ptr)) {
453 FUNCDEF("perform_activity");
454 _parent.drop_dead_clients();
458 cromp_server &_parent; // we perform tricks for this object.
463 class connection_management_thread : public ethread
466 connection_management_thread(cromp_server &parent)
470 void perform_activity(void *formal(ptr)) {
471 FUNCDEF("perform_activity");
472 _parent.look_for_clients(*this);
476 cromp_server &_parent; // we perform tricks for this object.
482 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
483 // takes over access to the client list and root socket.
485 cromp_server::cromp_server(const internet_address &where,
486 int accepting_threads, bool instantaneous, int max_per_ent)
487 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
488 _clients(new cromp_client_list),
489 _accepters(new thread_cabinet),
490 _list_lock(new mutex),
491 _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
492 _instantaneous(instantaneous),
493 _where(new internet_address(where)),
494 _accepting_threads(accepting_threads),
495 _dropper(new client_dropping_thread(*this)),
497 _encrypt_arm(NULL_POINTER),
498 _default_security(new cromp_security),
499 _security_arm(NULL_POINTER)
501 FUNCDEF("constructor");
504 cromp_server::~cromp_server()
510 WHACK(_next_droppage);
512 WHACK(_default_security);
514 _encrypt_arm = NULL_POINTER;
515 _security_arm = NULL_POINTER;
518 internet_address cromp_server::location() const { return *_where; }
520 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
521 { return octo()->responses().get_sizes(id, items, bytes); }
523 internet_address cromp_server::any_address(int port)
525 const abyte any_list[] = { 0, 0, 0, 0 };
526 return internet_address(byte_array(4, any_list), "", port);
529 astring cromp_server::responses_text_form() const
530 { return octo()->responses().text_form(); }
532 int cromp_server::DEFAULT_ACCEPTERS() {
533 // default number of listening threads; this is the maximum number of mostly
534 // simultaneous connections that the server can pick up at a time.
535 return 7; // others are not generally so limited on resources.
538 infoton *cromp_server::wrap_infoton(infoton * &request,
539 const octopus_entity &ent)
541 FUNCDEF("wrap_infoton");
542 if (!_enabled) return NULL_POINTER;
543 // identity is not wrapped with encryption; we need to establish and identity
544 // to talk on a distinct channel with the server. even if that identity were
545 // compromised, the interloper should still not be able to listen in on the
546 // establishment of an encryption channel. also, the encryption startup
547 // itself is not encrypted and we don't want to re-encrypt the wrapper.
548 if (dynamic_cast<identity_infoton *>(request)
549 || dynamic_cast<encryption_infoton *>(request)
550 || dynamic_cast<encryption_wrapper *>(request)) return NULL_POINTER;
552 #ifdef DEBUG_CROMP_SERVER
553 LOG(astring("encrypting ") + request->text_form());
556 octenc_key_record *key = _encrypt_arm->keys().lock(ent);
557 // lock here is released a bit down below.
559 LOG(astring("failed to locate key for entity ") + ent.text_form());
562 byte_array packed_request;
563 infoton::fast_pack(packed_request, *request);
565 encryption_wrapper *to_return = new encryption_wrapper;
566 key->_key.encrypt(packed_request, to_return->_wrapped);
567 _encrypt_arm->keys().unlock(key);
571 outcome cromp_server::enable_servers(bool encrypt, cromp_security *security)
573 FUNCDEF("enable_servers");
575 // add the tentacles needed for encryption.
576 #ifdef DEBUG_CROMP_SERVER
577 LOG(astring("enabling encryption for ") + class_name()
578 + " on " + _where->text_form());
580 _encrypt_arm = new encryption_tentacle;
581 add_tentacle(_encrypt_arm, true);
582 add_tentacle(new unwrapping_tentacle, false);
584 WHACK(_security_arm); // in case being reused.
586 _security_arm = new login_tentacle(*security);
587 add_tentacle(_security_arm, true);
589 _security_arm = new login_tentacle(*_default_security);
590 add_tentacle(_security_arm, true);
592 open_common(*_where); // open the common ground.
595 // try first accept, no waiting.
596 outcome to_return = accept_one_client(false);
597 if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
598 LOG(astring("failure starting up server: ") + outcome_name(to_return));
602 #ifdef DEBUG_CROMP_SERVER
603 LOG(a_sprintf("adding %d accepting threads.", _accepting_threads));
605 for (int i = 0; i < _accepting_threads; i++) {
606 // crank in a new thread and tell it yes on starting it.
607 _accepters->add_thread(new connection_management_thread(*this), true, NULL_POINTER);
610 _dropper->start(NULL_POINTER);
614 void cromp_server::disable_servers()
616 FUNCDEF("disable_servers");
617 if (!_enabled) return;
618 _dropper->stop(); // signal the thread to leave when it can.
619 _accepters->stop_all(); // signal the accepting threads to exit.
622 // make sure no one rearranges or uses the client list while we're
624 for (int i = 0; i < _clients->elements(); i++) {
625 // stop the client's activities before the big shutdown.
626 cromp_client_record *cli = (*_clients)[i];
627 if (cli) cli->croak();
631 close_common(); // zap the socket so that our blocked waiters get woken up.
633 // now finalize the shutdown. we don't grab the lock because we don't want
634 // a deadlock, but we also shouldn't need to grab the lock. by here, we have
635 // cancelled all threads, no new clients should be able to be added, and the
636 // destruction of this list will ensure that each client's thread really is
640 _enabled = false; // record our defunctivity.
643 int cromp_server::clients() const
646 return _clients? _clients->elements() : 0;
649 bool cromp_server::disconnect_entity(const octopus_entity &id)
651 FUNCDEF("disconnect_entity");
652 if (!_enabled) return false;
654 int indy = _clients->find(id);
655 if (negative(indy)) return false; // didn't find it.
656 cromp_client_record *cli = (*_clients)[indy];
657 // disconnect the client and zap its entity records.
662 bool cromp_server::find_entity(const octopus_entity &id,
663 internet_address &found)
665 FUNCDEF("find_entity");
666 if (!_enabled) return false;
667 found = internet_address();
669 int indy = _clients->find(id);
670 if (negative(indy)) return false; // didn't find it.
671 cromp_client_record *cli = (*_clients)[indy];
672 // pull out the address from the record at that index.
673 found = cli->spock()->remote();
677 outcome cromp_server::accept_one_client(bool wait)
679 #ifdef DEBUG_CROMP_SERVER
680 FUNCDEF("accept_one_client");
682 if (!_enabled) return common::INCOMPLETE;
683 spocket *accepted = NULL_POINTER;
684 //printf((timestamp(true, true) + "into accept\n").s());
685 outcome ret = spock()->accept(accepted, wait);
686 //printf((timestamp(true, true) + "out of accept\n").s());
687 // accept and wait for it to finish.
688 if ( (ret == spocket::OKAY) && accepted) {
689 // we got a new client to talk to.
690 cromp_client_record *adding = new cromp_client_record(*this, accepted,
691 octo(), *_security_arm);
692 #ifdef DEBUG_CROMP_SERVER
693 LOG(a_sprintf("found a new client on sock %d.", accepted->OS_socket()));
695 LOCK_LISTS; // short term lock.
696 _clients->append(adding);
699 if (ret == spocket::NO_CONNECTION)
700 return NOT_FOUND; // normal occurrence.
701 #ifdef DEBUG_CROMP_SERVER
702 LOG(astring("error accepting client: ") + spocket::outcome_name(ret));
708 void cromp_server::look_for_clients(ethread &requestor)
710 FUNCDEF("look_for_clients");
711 if (!_enabled) return;
712 // see if any clients have been accepted.
713 while (!requestor.should_stop()) {
714 outcome ret = accept_one_client(false);
715 if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
716 // we got an error condition besides our normal set.
717 //#ifdef DEBUG_CROMP_SERVER
718 LOG(astring("got real error on socket; leaving for good.")
719 + spocket::outcome_name(ret));
723 // if we weren't told we got a client, then we'll sleep. if we did get
724 // a client, we'll try again right away.
726 time_control::sleep_ms(ACCEPTANCE_SNOOZE);
730 outcome cromp_server::send_to_client(const octopus_request_id &id,
733 #ifdef DEBUG_CROMP_SERVER
734 FUNCDEF("send_to_client");
736 if (!_enabled) return common::INCOMPLETE;
737 if (!octo()->responses().add_item(data, id)) {
738 #ifdef DEBUG_CROMP_SERVER
739 LOG("failed to store result for client--no space left currently.");
746 /*outcome cromp_server::get_any_from_client(const octopus_entity &ent,
747 infoton * &data, int timeout)
749 FUNCDEF("get_from_client");
750 //hmmm: this implementation locks the lists; can't we get the client to do
751 // most of the work for this?
753 int indy = _clients->find(id._entity);
754 if (negative(indy)) return NOT_FOUND; // didn't find it.
755 cromp_client_record *cli = (*_clients)[indy];
756 octopus_request_id id;
757 return cli->retrieve_and_restore_any(data, ent, timeout);
761 outcome cromp_server::get_from_client(const octopus_request_id &id,
762 infoton * &data, int timeout)
764 FUNCDEF("get_from_client");
765 if (!_enabled) return common::INCOMPLETE;
766 //hmmm: this implementation locks the lists; can't we get the client to do
767 // most of the work for this?
769 int indy = _clients->find(id._entity);
770 if (negative(indy)) return NOT_FOUND; // didn't find it.
771 cromp_client_record *cli = (*_clients)[indy];
772 return cli->retrieve_and_restore(data, id, timeout);
775 void cromp_server::drop_dead_clients()
777 #ifdef DEBUG_CROMP_SERVER
778 FUNCDEF("drop_dead_clients");
780 if (!_enabled) return;
781 // clean out any dead clients.
785 if (time_stamp() < *_next_droppage) return; // not time yet.
788 LOCK_LISTS; // keep locked from now on.
789 for (int i = 0; i < _clients->elements(); i++) {
790 cromp_client_record *cli = (*_clients)[i];
792 #ifdef DEBUG_CROMP_SERVER
793 LOG(astring("error in list structure."));
796 i--; // skip back before deleted guy.
799 if (!cli->still_connected() || !cli->healthy()) {
800 #ifdef DEBUG_CROMP_SERVER
801 LOG(astring("dropping disconnected client ") + cli->ent().mangled_form());
803 cli->croak(); // stop it from operating.
805 //hmmm: check if it has data waiting and complain about it perhaps.
807 i--; // skip back before deleted guy.
812 _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);