feisty meow concerns codebase  2.140
cromp_server.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : cromp_server *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2000-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 #include "cromp_common.h"
16 #include "cromp_security.h"
17 #include "cromp_server.h"
18 
19 #include <basis/astring.h>
20 #include <basis/functions.h>
21 #include <basis/mutex.h>
24 #include <octopus/entity_defs.h>
26 #include <octopus/infoton.h>
27 #include <octopus/tentacle.h>
29 #include <processes/ethread.h>
32 #include <sockets/tcpip_stack.h>
33 #include <sockets/spocket.h>
34 #include <structures/amorph.h>
35 #include <structures/unique_id.h>
40 #include <timely/time_control.h>
41 
42 using namespace basis;
43 using namespace loggers;
44 using namespace octopi;
45 using namespace processes;
46 using namespace sockets;
47 using namespace structures;
48 using namespace timely;
49 
50 namespace cromp {
51 
52 //#define DEBUG_CROMP_SERVER
53  // uncomment for noisy version.
54 
56  // we will drop any clients that have disconnected this long ago.
57 
58 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
59  // this is the maximum number of things we'll do in one run for a
60  // client, including both sends and receives.
61 
62 const int SEND_TRIES_ALLOWED = 1;
63  // the number of attempts we will make to get outgoing data to send.
64 
65 const int SEND_THRESHOLD = 512 * KILOBYTE;
66  // if we pile up some data to this point in our client gathering, we'll
67  // go ahead and start pushing it to the client.
68 
70  // if we're clogged, we'll push this many times to get data out.
71 
73  // the maximum size we want our buffer to grow.
74 
75 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
76  // the largest chunk of updates we'll try to grab at one time.
77 
78 const int DROPPING_INTERVAL = 500;
79  // the rate at which we'll check for dead clients and clean up.
80 
81 const int DATA_AWAIT_TIMEOUT = 14;
82  // how long the server zones out waiting for data.
83 
84 const int ACCEPTANCE_SNOOZE = 60;
85  // if the server sees no clients, it will take a little nap.
86 
87 #undef LOG
88 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
89 
91 
92 // forward.
93 class cromp_client_record;
94 
95 class cromp_data_grabber : public ethread
96 {
97 public:
98  cromp_data_grabber(cromp_client_record &parent, octopus *octo)
99  : ethread(), _parent(parent), _octo(octo) {}
100 
101  DEFINE_CLASS_NAME("cromp_data_grabber");
102 
103  virtual void perform_activity(void *);
104 
105 private:
106  cromp_client_record &_parent;
107  octopus *_octo;
108 };
109 
111 
112 class cromp_client_record : public cromp_common
113 {
114 public:
// builds the record for one accepted client.  "parent" is the owning server,
// "client" is the accepted socket (passed to cromp_common along with "octo"),
// and "security" is the login tentacle that croak() later expunges from.
// the record starts out healthy, connected and without a fixated entity.
115  cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
116  login_tentacle &security)
117  : cromp_common(client, octo),
118  _parent(parent),
119  _octo(octo),
120  _ent(),
121  _healthy(true),
122  _fixated(false),
123  _grabber(*this, octo),
124  _waiting(),
125  _still_connected(true),
126  _security_arm(security)
127  {
// NOTE(review): this listing appears to have lost the line that begins the
// declaration of "local_addr"; the next line is only its argument list.
129  (internet_address::localhost(), client->stack().hostname(), 0);
130  open_common(local_addr); // open the common support for biz.
// the grabber thread is started from inside the constructor, so it may call
// handle_client_needs() on this record right away.
131  _grabber.start(NULL_POINTER); // crank up our background data pump on the socket.
132  }
133 
134  ~cromp_client_record() {
135  croak();
136  }
137 
138  DEFINE_CLASS_NAME("cromp_client_record");
139 
140  bool handle_client_needs(ethread &prompter) {
141 #ifdef DEBUG_CROMP_SERVER
142  FUNCDEF("handle_client_needs");
143  time_stamp start;
144 #endif
145  if (!_healthy) return false; // done.
146  if (!spock()->connected()) {
147  _still_connected = false;
148  return false; // need to stop now.
149  }
150  bool keep_going = true;
151  int actions = 0;
152  while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
153  // make sure we don't overstay our welcome when the thread's supposed
154  // to quit.
155  if (prompter.should_stop()) return false;
156  keep_going = false; // only continue if there's a reason.
157  bool ret = get_incoming_data(actions); // look for requests.
158  if (ret) keep_going = true;
159  ret = push_client_replies(actions); // send replies back to the client.
160  if (ret) keep_going = true;
161  }
162 
163 #ifdef DEBUG_CROMP_SERVER
164  if (actions > 10) {
165  LOG(a_sprintf("actions=%d", actions));
166  LOG(a_sprintf("%d pending send bytes, %d bytes accumulated, bin has "
167  "%d items.", pending_sends(), accumulated_bytes(),
168  octo()->responses().items_held()));
169  }
170 
171  int duration = int(time_stamp().value() - start.value());
172  if (duration > 200) {
173  LOG(a_sprintf("duration=%d ms.", duration));
174  }
175 #endif
176 
177  return true;
178  }
179 
180  const octopus_entity &ent() const { return _ent; }
181 
182  // stops the background activity of this object and drops the connection
183  // to the client.
184  void croak() {
185  FUNCDEF("croak");
186  _grabber.stop();
187  int actions = 0;
188  while (get_incoming_data(actions)) {
189  // keep receiving whatever's there already. we are trying to drain
190  // the socket before destroying it.
191  }
192  _healthy = false;
193  // clean out any records for this goner.
194  _security_arm.expunge(_ent);
195  close_common();
196  }
197 
198  bool healthy() const { return _healthy; }
199  // this is true unless the object has been told to shut down.
200 
201  bool still_connected() const { return _still_connected; }
202  // this is true unless the client side dropped the connection.
203 
204  cromp_server &parent() const { return _parent; }
205 
206  bool push_client_replies(int &actions) {
207  FUNCDEF("push_client_replies");
208  if (!healthy()) return false;
209  if (ent().blank()) {
210  // not pushing replies if we haven't even gotten a command yet.
211 #ifdef DEBUG_CROMP_SERVER
212  LOG("not pushing replies for blank.");
213 #endif
214  return false;
215  }
216 
217  if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
218 LOG("buffer clog being cleared now.");
219  // the buffers are pretty full; we'll try later.
220  push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
221  // if we're still clogged, then leave.
222  if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
223 LOG("could not completely clear buffer clog.");
224  return true;
225  }
226 LOG("cleared out buffer clog.");
227  }
228 
229  int any_left = true;
230  while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
231  // make sure we're not wasting our time.
232  if (!_octo->responses().items_held()) {
233  any_left = false;
234  break;
235  }
236  // make sure we don't ignore receptions.
237  grab_anything(false);
238  // try to grab a result for this entity.
239  int num_located = _octo->responses().acquire_for_entity(ent(),
240  _waiting, MAXIMUM_SIZE_BATCH);
241  if (!num_located) {
242  any_left = false;
243  break;
244  }
245 
246  // if we're encrypting, we need to wrap these as well.
247  if (_parent.encrypting()) {
248  for (int i = 0; i < _waiting.elements(); i++) {
249  infoton *curr = _waiting[i]->_data;
250  infoton *processed = _parent.wrap_infoton(curr,
251  _waiting[i]->_id._entity);
252  if (processed) _waiting[i]->_data = processed; // replace infoton.
253  }
254  }
255 
256  outcome ret = pack_and_ship(_waiting, 0);
257  // no attempt to send yet; we're just stuffing the buffer.
258  if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
259 //hmmm: what about keeping transmission as held in list; retry later on it?
260 
261 //#ifdef DEBUG_CROMP_SERVER
262  LOG(astring("failed to send package back to client: ")
263  + cromp_common::outcome_name(ret));
264 //#endif
265  any_left = false;
266  break;
267  }
268 
269  if (pending_sends() > SEND_THRESHOLD) {
270 #ifdef DEBUG_CROMP_SERVER
271  LOG(astring("over sending threshold on ") + _ent.text_form());
272 #endif
273  push_outgoing(SEND_TRIES_ALLOWED);
274  }
275 
276  }
277  // now that we've got a pile possibly, we'll try to send them out.
278  push_outgoing(SEND_TRIES_ALLOWED);
279  if (!spock()->connected()) {
280 #ifdef DEBUG_CROMP_SERVER
281  LOG("noticed disconnection of client.");
282 #endif
283  _still_connected = false;
284  }
285  return any_left;
286  }
287 
// receives requests waiting on this client's socket and hands each one to
// the octopus for evaluation.  "actions" is incremented for every pass of the
// loop (and decremented again when a pass received nothing), and the loop
// ends once it reaches MAXIMUM_ACTIONS_PER_CLIENT.
// returns false immediately when the record is unhealthy, when a retrieval
// times out, or when a retrieval fails; otherwise returns whether at least
// one packet was received before the action limit ended the loop.
288  bool get_incoming_data(int &actions) {
289  FUNCDEF("get_incoming_data");
290  if (!healthy()) return false;
291  int first_one = true;
292  bool saw_something = false; // true if we got a packet.
293  while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
294  // pull in anything waiting.
295  infoton *item = NULL_POINTER;
296  octopus_request_id req_id;
// only the first retrieval waits (DATA_AWAIT_TIMEOUT); later ones use zero.
297  outcome ret = retrieve_and_restore_any(item, req_id,
298  first_one? DATA_AWAIT_TIMEOUT : 0);
299  first_one = false;
300  if (ret == cromp_common::TIMED_OUT) {
301  actions--; // didn't actually eat one.
302  return false;
303  } else if (ret != cromp_common::OKAY) {
304 #ifdef DEBUG_CROMP_SERVER
305  LOG(astring("got error ") + cromp_common::outcome_name(ret));
306 #endif
307  if (ret == cromp_common::NO_CONNECTION) {
308 #ifdef DEBUG_CROMP_SERVER
309  LOG("noticed disconnection of client.");
310 #endif
311  _still_connected = false;
312  }
313  actions--; // didn't actually eat one.
314  return false; // get outa here.
315  }
316  // got a packet.
317  saw_something = true;
// until the entity is fixated: a blank sender entity is rejected, the first
// non-blank entity seen is adopted, and the first differing entity after
// that replaces it and fixates the record.
318  if (!_fixated) {
319  if (req_id._entity.blank()) {
320  LOG(astring("would have assigned ours to blank id! ")
321  + req_id._entity.mangled_form());
322  WHACK(item);
323  continue;
324  }
325 #ifdef DEBUG_CROMP_SERVER
326  LOG(astring("cmd with entity ") + req_id._entity.mangled_form());
327 #endif
328  if (_ent.blank()) {
329  // assign the entity id now that we know it.
330  _ent = req_id._entity;
331 #ifdef DEBUG_CROMP_SERVER
332  LOG(astring("assigned own entity to ") + _ent.mangled_form());
333 #endif
// the "!_fixated" test here repeats the enclosing condition.
334  } else if (!_fixated && (_ent != req_id._entity) ) {
335 #ifdef DEBUG_CROMP_SERVER
336  LOG(astring("fixated on entity of ") + req_id._entity.mangled_form()
337  + " where we used to have " + _ent.mangled_form());
338 #endif
339  _ent = req_id._entity;
340  _fixated = true;
341  }
342  } // connects to line after debug just below.
// NOTE(review): the whole "else" branch below sits inside the #ifdef, so
// packets from a non-matching entity are only rejected when
// DEBUG_CROMP_SERVER is defined; confirm that this is intended.
343 #ifdef DEBUG_CROMP_SERVER
344  else if (_ent != req_id._entity) {
345  // this checks the validity of the entity.
346 #ifdef DEBUG_CROMP_SERVER
347  LOG(astring("seeing wrong entity of ") + req_id._entity.mangled_form()
348  + " when we fixated on " + _ent.mangled_form());
349 #endif
350  WHACK(item);
351  continue;
352  }
353 #endif
354  // check again so we make sure we're still healthy; could have changed
355  // state while getting a command.
356  if (!healthy()) {
357  WHACK(item);
358  continue;
359  }
360  string_array classif = item->classifier();
361  // hang onto the classifier since the next time we get a chance, the
362  // object might be destroyed.
363 
364  // we pass responsibility for this item over to the octopus. that's why
365  // we're not deleting it once evaluate gets the item.
366  ret = _octo->evaluate(item, req_id, _parent.instantaneous());
367  if (ret != tentacle::OKAY) {
368 #ifdef DEBUG_CROMP_SERVER
369  LOG(astring("failed to evaluate the infoton we got: ")
370  + classif.text_form());
371 #endif
372 //hmmm: we have upgraded this response to be for all errors, since otherwise
373 // clients will just time out waiting for something that's never coming.
374 
375  // we do a special type of handling when the tentacle is missing. this
376  // is almost always because the wrong type of request is being sent to
377  // a server, or the server didn't register for all the objects it is
378  // supposed to handle.
// NOTE(review): this listing skips a line here and another before the
// closing brace of this "if"; check them against the real file.
380 //#ifdef DEBUG_CROMP_SERVER
381  LOG(astring("injecting unhandled note into response stream for ")
382  + req_id.text_form() + ", got outcome " + outcome_name(ret));
383 //#endif
// as written here, every evaluation failure queues an unhandled_request
// reply so the client receives an answer.
384  _parent.send_to_client(req_id,
385  new unhandled_request(req_id, classif, ret));
386  // this will always work, although it's not a surety that the
387  // client actually still exists. probably though, since we're
388  // just now handling this request.
390  }
391  }
392  return saw_something; // keep going if we actually did anything good.
393  }
394 
395 private:
396  cromp_server &_parent; // the object that owns this client.
397  octopus *_octo;
398  octopus_entity _ent; // the entity by which we know this client.
399  bool _healthy; // reports our current state of happiness.
400  bool _fixated; // true if the entity id has become firm.
401  cromp_data_grabber _grabber; // the data grabbing thread.
402  infoton_list _waiting;
403  // used by the push_client_replies() method; allocated once to avoid churn.
404  bool _still_connected;
405  // set to true up until we notice that the client disconnected.
406  login_tentacle &_security_arm; // provides login checking.
407 };
408 
410 
411 void cromp_data_grabber::perform_activity(void *)
412 {
413 #ifdef DEBUG_CROMP_SERVER
414  FUNCDEF("perform_activity");
415 #endif
416  while (!should_stop()) {
417 // time_stamp started;
418  bool ret = _parent.handle_client_needs(*this);
419 // int duration = int(time_stamp().value() - started.value());
420  if (!ret) {
421  // they said to stop.
422 #ifdef DEBUG_CROMP_SERVER
423  LOG("done handling client needs.");
424 #endif
425  _octo->expunge(_parent.ent());
426  break;
427  }
428  }
429 }
430 
432 
433 class cromp_client_list : public amorph<cromp_client_record>
434 {
435 public:
436  int find(const octopus_entity &to_find) const {
437  for (int i = 0; i < elements(); i++)
438  if (to_find == get(i)->ent()) return i;
439  return common::NOT_FOUND;
440  }
441 };
442 
444 
// a thread whose activity asks the owning cromp_server to drop dead clients.
445 class client_dropping_thread : public ethread
446 {
447 public:
// NOTE(review): this listing appears to have lost the line that starts the
// constructor's initializer list (the base class initialization); only the
// "_parent" initializer is visible below.
448  client_dropping_thread (cromp_server &parent)
450  _parent(parent) {}
451 
// each invocation simply delegates to cromp_server::drop_dead_clients().
452  void perform_activity(void *formal(ptr)) {
453  FUNCDEF("perform_activity");
454  _parent.drop_dead_clients();
455  }
456 
457 private:
458  cromp_server &_parent; // we perform tricks for this object.
459 };
460 
462 
463 class connection_management_thread : public ethread
464 {
465 public:
466  connection_management_thread(cromp_server &parent)
467  : ethread(),
468  _parent(parent) {}
469 
470  void perform_activity(void *formal(ptr)) {
471  FUNCDEF("perform_activity");
472  _parent.look_for_clients(*this);
473  }
474 
475 private:
476  cromp_server &_parent; // we perform tricks for this object.
477 };
478 
480 
481 #undef LOCK_LISTS
482 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
483  // takes over access to the client list and root socket.
484 
485 cromp_server::cromp_server(const internet_address &where,
486  int accepting_threads, bool instantaneous, int max_per_ent)
487 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
488  _clients(new cromp_client_list),
489  _accepters(new thread_cabinet),
490  _list_lock(new mutex),
491  _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
492  _instantaneous(instantaneous),
493  _where(new internet_address(where)),
494  _accepting_threads(accepting_threads),
495  _dropper(new client_dropping_thread(*this)),
496  _enabled(false),
497  _encrypt_arm(NULL_POINTER),
498  _default_security(new cromp_security),
499  _security_arm(NULL_POINTER)
500 {
501  FUNCDEF("constructor");
502 }
503 
505 {
506  disable_servers();
507  WHACK(_accepters);
508  WHACK(_dropper);
509  WHACK(_clients);
510  WHACK(_next_droppage);
511  WHACK(_where);
512  WHACK(_default_security);
513  WHACK(_list_lock);
514  _encrypt_arm = NULL_POINTER;
515  _security_arm = NULL_POINTER;
516 }
517 
518 internet_address cromp_server::location() const { return *_where; }
519 
520 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
521 { return octo()->responses().get_sizes(id, items, bytes); }
522 
524 {
525  const abyte any_list[] = { 0, 0, 0, 0 };
526  return internet_address(byte_array(4, any_list), "", port);
527 }
528 
530 { return octo()->responses().text_form(); }
531 
533  // default number of listening threads; this is the maximum number of mostly
534  // simultaneous connections that the server can pick up at a time.
535  return 7; // others are not generally so limited on resources.
536 }
537 
539  const octopus_entity &ent)
540 {
541  FUNCDEF("wrap_infoton");
542  if (!_enabled) return NULL_POINTER;
543  // identity is not wrapped with encryption; we need to establish and identity
544  // to talk on a distinct channel with the server. even if that identity were
545  // compromised, the interloper should still not be able to listen in on the
546  // establishment of an encryption channel. also, the encryption startup
547  // itself is not encrypted and we don't want to re-encrypt the wrapper.
548  if (dynamic_cast<identity_infoton *>(request)
549  || dynamic_cast<encryption_infoton *>(request)
550  || dynamic_cast<encryption_wrapper *>(request)) return NULL_POINTER;
551 
552 #ifdef DEBUG_CROMP_SERVER
553  LOG(astring("encrypting ") + request->text_form());
554 #endif
555 
556  octenc_key_record *key = _encrypt_arm->keys().lock(ent);
557  // lock here is released a bit down below.
558  if (!key) {
559  LOG(astring("failed to locate key for entity ") + ent.text_form());
560  return NULL_POINTER;
561  }
562  byte_array packed_request;
563  infoton::fast_pack(packed_request, *request);
564  WHACK(request);
565  encryption_wrapper *to_return = new encryption_wrapper;
566  key->_key.encrypt(packed_request, to_return->_wrapped);
567  _encrypt_arm->keys().unlock(key);
568  return to_return;
569 }
570 
572 {
573  FUNCDEF("enable_servers");
574  if (encrypt) {
575  // add the tentacles needed for encryption.
576 #ifdef DEBUG_CROMP_SERVER
577  LOG(astring("enabling encryption for ") + class_name()
578  + " on " + _where->text_form());
579 #endif
580  _encrypt_arm = new encryption_tentacle;
581  add_tentacle(_encrypt_arm, true);
582  add_tentacle(new unwrapping_tentacle, false);
583  }
584  WHACK(_security_arm); // in case being reused.
585  if (security) {
586  _security_arm = new login_tentacle(*security);
587  add_tentacle(_security_arm, true);
588  } else {
589  _security_arm = new login_tentacle(*_default_security);
590  add_tentacle(_security_arm, true);
591  }
592  open_common(*_where); // open the common ground.
593 
594  _enabled = true;
595  // try first accept, no waiting.
596  outcome to_return = accept_one_client(false);
597  if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
598  LOG(astring("failure starting up server: ") + outcome_name(to_return));
599  return to_return;
600  }
601 
602 #ifdef DEBUG_CROMP_SERVER
603  LOG(a_sprintf("adding %d accepting threads.", _accepting_threads));
604 #endif
605  for (int i = 0; i < _accepting_threads; i++) {
606  // crank in a new thread and tell it yes on starting it.
607  _accepters->add_thread(new connection_management_thread(*this), true, NULL_POINTER);
608  }
609 
610  _dropper->start(NULL_POINTER);
611  return OKAY;
612 }
613 
615 {
616  FUNCDEF("disable_servers");
617  if (!_enabled) return;
618  _dropper->stop(); // signal the thread to leave when it can.
619  _accepters->stop_all(); // signal the accepting threads to exit.
620  if (_clients) {
621  LOCK_LISTS;
622  // make sure no one rearranges or uses the client list while we're
623  // working on it.
624  for (int i = 0; i < _clients->elements(); i++) {
625  // stop the client's activities before the big shutdown.
626  cromp_client_record *cli = (*_clients)[i];
627  if (cli) cli->croak();
628  }
629  }
630 
631  close_common(); // zap the socket so that our blocked waiters get woken up.
632 
633  // now finalize the shutdown. we don't grab the lock because we don't want
634  // a deadlock, but we also shouldn't need to grab the lock. by here, we have
635  // cancelled all threads, no new clients should be able to be added, and the
636  // destruction of this list will ensure that each client's thread really is
637  // stopped.
638  WHACK(_clients);
639 
640  _enabled = false; // record our defunctivity.
641 }
642 
644 {
645  LOCK_LISTS;
646  return _clients? _clients->elements() : 0;
647 }
648 
650 {
651  FUNCDEF("disconnect_entity");
652  if (!_enabled) return false;
653  LOCK_LISTS;
654  int indy = _clients->find(id);
655  if (negative(indy)) return false; // didn't find it.
656  cromp_client_record *cli = (*_clients)[indy];
657  // disconnect the client and zap its entity records.
658  cli->croak();
659  return true;
660 }
661 
663  internet_address &found)
664 {
665  FUNCDEF("find_entity");
666  if (!_enabled) return false;
667  found = internet_address();
668  LOCK_LISTS;
669  int indy = _clients->find(id);
670  if (negative(indy)) return false; // didn't find it.
671  cromp_client_record *cli = (*_clients)[indy];
672  // pull out the address from the record at that index.
673  found = cli->spock()->remote();
674  return true;
675 }
676 
677 outcome cromp_server::accept_one_client(bool wait)
678 {
679 #ifdef DEBUG_CROMP_SERVER
680  FUNCDEF("accept_one_client");
681 #endif
682  if (!_enabled) return common::INCOMPLETE;
683  spocket *accepted = NULL_POINTER;
684 //printf((timestamp(true, true) + "into accept\n").s());
685  outcome ret = spock()->accept(accepted, wait);
686 //printf((timestamp(true, true) + "out of accept\n").s());
687  // accept and wait for it to finish.
688  if ( (ret == spocket::OKAY) && accepted) {
689  // we got a new client to talk to.
690  cromp_client_record *adding = new cromp_client_record(*this, accepted,
691  octo(), *_security_arm);
692 #ifdef DEBUG_CROMP_SERVER
693  LOG(a_sprintf("found a new client on sock %d.", accepted->OS_socket()));
694 #endif
695  LOCK_LISTS; // short term lock.
696  _clients->append(adding);
697  return OKAY;
698  } else {
699  if (ret == spocket::NO_CONNECTION)
700  return NOT_FOUND; // normal occurrence.
701 #ifdef DEBUG_CROMP_SERVER
702  LOG(astring("error accepting client: ") + spocket::outcome_name(ret));
703 #endif
704  return DISALLOWED;
705  }
706 }
707 
709 {
710  FUNCDEF("look_for_clients");
711  if (!_enabled) return;
712  // see if any clients have been accepted.
713  while (!requestor.should_stop()) {
714  outcome ret = accept_one_client(false);
715  if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
716  // we got an error condition besides our normal set.
717 //#ifdef DEBUG_CROMP_SERVER
718  LOG(astring("got real error on socket; leaving for good.")
719  + spocket::outcome_name(ret));
720 //#endif
721  break;
722  }
723  // if we weren't told we got a client, then we'll sleep. if we did get
724  // a client, we'll try again right away.
725  if (ret != OKAY)
726  time_control::sleep_ms(ACCEPTANCE_SNOOZE);
727  }
728 }
729 
731  infoton *data)
732 {
733 #ifdef DEBUG_CROMP_SERVER
734  FUNCDEF("send_to_client");
735 #endif
736  if (!_enabled) return common::INCOMPLETE;
737  if (!octo()->responses().add_item(data, id)) {
738 #ifdef DEBUG_CROMP_SERVER
739  LOG("failed to store result for client--no space left currently.");
740 #endif
741  return TOO_FULL;
742  }
743  return OKAY;
744 }
745 
746 /*outcome cromp_server::get_any_from_client(const octopus_entity &ent,
747  infoton * &data, int timeout)
748 {
749  FUNCDEF("get_from_client");
750 //hmmm: this implementation locks the lists; can't we get the client to do
751 // most of the work for this?
752  LOCK_LISTS;
753  int indy = _clients->find(id._entity);
754  if (negative(indy)) return NOT_FOUND; // didn't find it.
755  cromp_client_record *cli = (*_clients)[indy];
756  octopus_request_id id;
757  return cli->retrieve_and_restore_any(data, ent, timeout);
758 }
759 */
760 
762  infoton * &data, int timeout)
763 {
764  FUNCDEF("get_from_client");
765  if (!_enabled) return common::INCOMPLETE;
766 //hmmm: this implementation locks the lists; can't we get the client to do
767 // most of the work for this?
768  LOCK_LISTS;
769  int indy = _clients->find(id._entity);
770  if (negative(indy)) return NOT_FOUND; // didn't find it.
771  cromp_client_record *cli = (*_clients)[indy];
772  return cli->retrieve_and_restore(data, id, timeout);
773 }
774 
776 {
777 #ifdef DEBUG_CROMP_SERVER
778  FUNCDEF("drop_dead_clients");
779 #endif
780  if (!_enabled) return;
781  // clean out any dead clients.
782 
783  {
784  LOCK_LISTS;
785  if (time_stamp() < *_next_droppage) return; // not time yet.
786  }
787 
788  LOCK_LISTS; // keep locked from now on.
789  for (int i = 0; i < _clients->elements(); i++) {
790  cromp_client_record *cli = (*_clients)[i];
791  if (!cli) {
792 #ifdef DEBUG_CROMP_SERVER
793  LOG(astring("error in list structure."));
794 #endif
795  _clients->zap(i, i);
796  i--; // skip back before deleted guy.
797  continue;
798  }
799  if (!cli->still_connected() || !cli->healthy()) {
800 #ifdef DEBUG_CROMP_SERVER
801  LOG(astring("dropping disconnected client ") + cli->ent().mangled_form());
802 #endif
803  cli->croak(); // stop it from operating.
804 
805 //hmmm: check if it has data waiting and complain about it perhaps.
806  _clients->zap(i, i);
807  i--; // skip back before deleted guy.
808  continue;
809  }
810  }
811 
812  _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
813 }
814 
815 } //namespace.
816 
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition: astring.cpp:130
A very common template for a dynamic array of bytes.
Definition: byte_array.h:36
Outcomes describe the state of completion for an operation.
Definition: outcome.h:31
A few common features used by both CROMP clients and servers.
Definition: cromp_common.h:36
static const char * outcome_name(const basis::outcome &to_name)
octopi::octopus * octo() const
Definition: cromp_common.h:176
basis::outcome open_common(const sockets::internet_address &where)
sockets::spocket * spock() const
basis::outcome close_common()
virtual basis::outcome add_tentacle(octopi::tentacle *to_add, bool filter=false)
Implements the client registry in a cromp-appropriate manner.
basis::outcome enable_servers(bool encrypt, cromp_security *security=NULL_POINTER)
bool get_sizes(const octopi::octopus_entity &id, int &items, int &bytes)
octopi::infoton * wrap_infoton(octopi::infoton *&request, const octopi::octopus_entity &ent)
static sockets::internet_address any_address(int port)
basis::outcome send_to_client(const octopi::octopus_request_id &id, octopi::infoton *data)
basis::astring responses_text_form() const
sockets::internet_address location() const
void look_for_clients(processes::ethread &requester)
bool find_entity(const octopi::octopus_entity &id, sockets::internet_address &found)
basis::outcome get_from_client(const octopi::octopus_request_id &id, octopi::infoton *&data, int timeout)
static int DEFAULT_ACCEPTERS()
bool disconnect_entity(const octopi::octopus_entity &id)
returns true if the "id" can be found and disconnected.
bool encrypt(const basis::byte_array &source, basis::byte_array &target) const
encrypts the "source" array into the "target" array.
Encapsulates the chit-chat necessary to establish an encrypted connection.
Processes the encryption_infoton object for setting up an encrypted channel.
key_repository & keys() const
provides access to our list of keys.
Wraps an encrypted infoton when the octopus is in an encrypted mode.
basis::byte_array _wrapped
the encrypted data that's held here.
basis::astring text_form() const
bool get_sizes(const octopus_entity &id, int &items, int &bytes)
Encapsulates just the action of identifying an octopus user.
a list of pending requests and who made them.
Definition: entity_defs.h:181
An infoton is an individual request parcel with accompanying information.
Definition: infoton.h:32
virtual void text_form(basis::base_string &state_fill) const =0
requires derived infotons to be able to show their state as a string.
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition: infoton.cpp:85
void unlock(octenc_key_record *to_unlock)
drops the lock on the key record in "to_unlock".
octenc_key_record * lock(const octopus_entity &ent)
locates the key for "ent", if it's stored.
Provides rudimentary login services.
Tracks the keys that have been assigned for a secure channel.
crypto::blowfish_crypto _key
used for communicating with an entity.
Provides a way of identifying users of an octopus object.
Definition: entity_defs.h:35
basis::astring text_form() const
returns a readable form of the identifier.
bool blank() const
true if the entity is blank, as constructed by default constructor.
Definition: entity_defs.cpp:99
basis::astring mangled_form() const
returns the combined string form of the identifier.
Identifies requests made on an octopus by users.
Definition: entity_defs.h:114
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
Definition: entity_defs.h:116
Octopus is a design pattern for generalized request processing systems.
Definition: octopus.h:47
entity_data_bin & responses()
allows external access to our set of results.
Definition: octopus.cpp:178
Informs the caller that a request type was unknown to the server octopus.
this simple tentacle just unpacks the encryption_wrapper infoton.
Provides a platform-independent object for adding threads to a program.
Definition: ethread.h:36
bool should_stop() const
reports whether the thread should stop right now.
Definition: ethread.h:136
Manages a collection of threads.
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void stop_all()
makes all of the threads quit.
this type of address describes a destination out on the internet.
basis::astring text_form() const
Abstraction for a higher-level BSD socket that is platform independent.
Definition: spocket.h:40
tcpip_stack & stack() const
Definition: spocket.cpp:150
basis::un_int OS_socket()
Definition: spocket.h:108
basis::outcome accept(spocket *&sock, bool wait)
Definition: spocket.cpp:460
basis::astring hostname() const
An array of strings with some additional helpful methods.
Definition: string_array.h:32
basis::astring text_form() const
A synonym for the text_format() method.
Definition: string_array.h:71
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
void reset()
sets the stamp time back to now.
Definition: time_stamp.cpp:59
time_representation value() const
returns the time_stamp in terms of the lower level type.
Definition: time_stamp.h:61
#define LOG(to_print)
#define LOCK_LISTS
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition: definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition: enhance_cpp.h:45
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:57
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
const int MEGABYTE
Number of bytes in a megabyte.
Definition: definitions.h:135
unsigned char abyte
A fairly important unit which is seldom defined...
Definition: definitions.h:51
const int SECOND_ms
Number of milliseconds in a second.
Definition: definitions.h:120
bool negative(const type &a)
negative returns true if "a" is less than zero.
Definition: functions.h:43
const int KILOBYTE
Number of bytes in a kilobyte.
Definition: definitions.h:134
const int DEAD_CLIENT_CLEANING_INTERVAL
const int MAXIMUM_ACTIONS_PER_CLIENT
const int DROPPING_INTERVAL
const int SEND_TRIES_ALLOWED
const int MAXIMUM_SIZE_BATCH
const int SEND_THRESHOLD
const int ACCEPTANCE_SNOOZE
const int DATA_AWAIT_TIMEOUT
const int EXTREME_SEND_TRIES_ALLOWED
const int MAXIMUM_BYTES_PER_SEND
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
Definition: base_address.h:26
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
#include <time.h>
Definition: earth_time.cpp:37