first check-in of feisty meow codebase. many things broken still due to recent
[feisty_meow.git] / octopi / library / cromp / cromp_server.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : cromp_server                                                      *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2000-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "cromp_common.h"
16 #include "cromp_security.h"
17 #include "cromp_server.h"
18
19 #include <basis/astring.h>
20 #include <basis/functions.h>
21 #include <basis/mutex.h>
22 #include <loggers/program_wide_logger.h>
23 #include <octopus/entity_data_bin.h>
24 #include <octopus/entity_defs.h>
25 #include <octopus/identity_infoton.h>
26 #include <octopus/infoton.h>
27 #include <octopus/tentacle.h>
28 #include <octopus/unhandled_request.h>
29 #include <processes/ethread.h>
30 #include <processes/thread_cabinet.h>
31 #include <sockets/internet_address.h>
32 #include <sockets/tcpip_stack.h>
33 #include <sockets/spocket.h>
34 #include <structures/amorph.h>
35 #include <structures/unique_id.h>
36 #include <tentacles/key_repository.h>
37 #include <tentacles/login_tentacle.h>
38 #include <tentacles/encryption_tentacle.h>
39 #include <tentacles/encryption_wrapper.h>
40 #include <timely/time_control.h>
41
42 using namespace basis;
43 using namespace loggers;
44 using namespace octopi;
45 using namespace processes;
46 using namespace sockets;
47 using namespace structures;
48 using namespace timely;
49
50 namespace cromp {
51
52 //#define DEBUG_CROMP_SERVER
53   // uncomment for noisy version.
54
55 const int DEAD_CLIENT_CLEANING_INTERVAL = 1 * SECOND_ms;
56   // we will drop any clients that have disconnected this long ago.
57
58 const int MAXIMUM_ACTIONS_PER_CLIENT = 4000;
59   // this is the maximum number of things we'll do in one run for a
60   // client, including both sends and receives.
61
62 const int SEND_TRIES_ALLOWED = 1;
63   // the number of attempts we will make to get outgoing data to send.
64
65 const int SEND_THRESHOLD = 512 * KILOBYTE;
66   // if we pile up some data to this point in our client gathering, we'll
67   // go ahead and start pushing it to the client.
68
69 const int EXTREME_SEND_TRIES_ALLOWED = 28;
70   // if we're clogged, we'll push this many times to get data out.
71
72 const int MAXIMUM_BYTES_PER_SEND = 2 * MEGABYTE;
73   // the maximum size we want our buffer to grow.
74
75 const int MAXIMUM_SIZE_BATCH = 384 * KILOBYTE;
76   // the largest chunk of updates we'll try to grab at one time.
77
78 const int DROPPING_INTERVAL = 500;
79   // the rate at which we'll check for dead clients and clean up.
80
81 const int DATA_AWAIT_TIMEOUT = 14;
82   // how long the server zones out waiting for data.
83
84 const int ACCEPTANCE_SNOOZE = 60;
85   // if the server sees no clients, it will take a little nap.
86
87 #undef LOG
88 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(to_print).s())
89
90 //////////////
91
92 // forward.
93 class cromp_client_record;
94
95 class cromp_data_grabber : public ethread
96 {
97 public:
98   cromp_data_grabber(cromp_client_record &parent, octopus *octo)
99       : ethread(), _parent(parent), _octo(octo) {}
100
101   DEFINE_CLASS_NAME("cromp_data_grabber");
102
103   virtual void perform_activity(void *);
104
105 private:
106   cromp_client_record &_parent;
107   octopus *_octo;
108 };
109  
110 //////////////
111
112 class cromp_client_record : public cromp_common
113 {
114 public:
115   cromp_client_record(cromp_server &parent, spocket *client, octopus *octo,
116       login_tentacle &security)
117   : cromp_common(client, octo),
118     _parent(parent),
119     _octo(octo),
120     _ent(),
121     _healthy(true),
122     _fixated(false),
123     _grabber(*this, octo),
124     _waiting(),
125     _still_connected(true),
126     _security_arm(security)
127   {
128     internet_address local_addr = internet_address
129         (internet_address::localhost(), client->stack().hostname(), 0);
130     open_common(local_addr);  // open the common support for biz.
131     _grabber.start(NIL);  // crank up our background data pump on the socket.
132   }
133
134   ~cromp_client_record() {
135     croak();
136   }
137
138   DEFINE_CLASS_NAME("cromp_client_record");
139
140   bool handle_client_needs(ethread &prompter) {
141 #ifdef DEBUG_CROMP_SERVER
142     FUNCDEF("handle_client_needs");
143     time_stamp start;
144 #endif
145     if (!_healthy) return false;  // done.
146     if (!spock()->connected()) {
147       _still_connected = false;
148       return false;  // need to stop now.
149     }
150     bool keep_going = true;
151     int actions = 0;
152     while (keep_going && (actions < MAXIMUM_ACTIONS_PER_CLIENT) ) {
153       // make sure we don't overstay our welcome when the thread's supposed
154       // to quit.
155       if (prompter.should_stop()) return false;
156       keep_going = false;  // only continue if there's a reason.
157       bool ret = get_incoming_data(actions);  // look for requests.
158       if (ret) keep_going = true;
159       ret = push_client_replies(actions);  // send replies back to the client.
160       if (ret) keep_going = true;
161     }
162
163 #ifdef DEBUG_CROMP_SERVER
164     if (actions > 10) {
165       LOG(a_sprintf("actions=%d", actions));
166       LOG(a_sprintf("%d pending send bytes, %d bytes accumulated, bin has "
167           "%d items.", pending_sends(), accumulated_bytes(),
168           octo()->responses().items_held()));
169     }
170
171     int duration = int(time_stamp().value() - start.value());
172     if (duration > 200) {
173       LOG(a_sprintf("duration=%d ms.", duration));
174     }
175 #endif
176
177     return true;
178   }
179
180   const octopus_entity &ent() const { return _ent; }
181
182   // stops the background activity of this object and drops the connection
183   // to the client.
184   void croak() {
185 //    FUNCDEF("croak");
186     _grabber.stop();
187     int actions = 0;
188     while (get_incoming_data(actions)) {
189       // keep receiving whatever's there already.  we are trying to drain
190       // the socket before destroying it.
191     }
192     _healthy = false;
193     // clean out any records for this goner.
194     _security_arm.expunge(_ent);
195     close_common();
196   }
197
198   bool healthy() const { return _healthy; }
199     // this is true unless the object has been told to shut down.
200
201   bool still_connected() const { return _still_connected; }
202     // this is true unless the client side dropped the connection.
203
204   cromp_server &parent() const { return _parent; }
205
206   bool push_client_replies(int &actions) {
207     FUNCDEF("push_client_replies");
208     if (!healthy()) return false;
209     if (ent().blank()) {
210       // not pushing replies if we haven't even gotten a command yet.
211 #ifdef DEBUG_CROMP_SERVER
212       LOG("not pushing replies for blank.");
213 #endif
214       return false;
215     }
216
217     if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
218 LOG("buffer clog being cleared now.");
219       // the buffers are pretty full; we'll try later.
220       push_outgoing(EXTREME_SEND_TRIES_ALLOWED);
221       // if we're still clogged, then leave.
222       if (buffer_clog(MAXIMUM_BYTES_PER_SEND)) {
223 LOG("could not completely clear buffer clog.");
224         return true;
225       }
226 LOG("cleared out buffer clog.");
227     }
228
229     int any_left = true;
230     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
231       // make sure we're not wasting our time.
232       if (!_octo->responses().items_held()) {
233         any_left = false;
234         break;
235       }
236       // make sure we don't ignore receptions.
237       grab_anything(false);
238       // try to grab a result for this entity.
239       int num_located = _octo->responses().acquire_for_entity(ent(),
240           _waiting, MAXIMUM_SIZE_BATCH);
241       if (!num_located) {
242         any_left = false;
243         break;
244       }
245
246       // if we're encrypting, we need to wrap these as well.
247       if (_parent.encrypting()) {
248         for (int i = 0; i < _waiting.elements(); i++) {
249           infoton *curr = _waiting[i]->_data;
250           infoton *processed = _parent.wrap_infoton(curr,
251               _waiting[i]->_id._entity);
252           if (processed) _waiting[i]->_data = processed;  // replace infoton.
253         }
254       }
255
256       outcome ret = pack_and_ship(_waiting, 0);
257         // no attempt to send yet; we're just stuffing the buffer.
258       if ( (ret != cromp_common::OKAY) && (ret != cromp_common::TIMED_OUT) ) {
259 //hmmm: what about keeping transmission as held in list; retry later on it?
260
261 //#ifdef DEBUG_CROMP_SERVER
262         LOG(astring("failed to send package back to client: ")
263             + cromp_common::outcome_name(ret));
264 //#endif
265         any_left = false;
266         break;
267       }
268
269       if (pending_sends() > SEND_THRESHOLD) {
270 #ifdef DEBUG_CROMP_SERVER
271         LOG(astring("over sending threshold on ") + _ent.text_form());
272 #endif
273         push_outgoing(SEND_TRIES_ALLOWED);
274       }
275
276     }
277     // now that we've got a pile possibly, we'll try to send them out.
278     push_outgoing(SEND_TRIES_ALLOWED);
279     if (!spock()->connected()) {
280 #ifdef DEBUG_CROMP_SERVER
281       LOG("noticed disconnection of client.");
282 #endif
283       _still_connected = false;
284     }
285     return any_left;
286   }
287
288   bool get_incoming_data(int &actions) {
289     FUNCDEF("get_incoming_data");
290     if (!healthy()) return false;
291     int first_one = true;
292     bool saw_something = false;  // true if we got a packet.
293     while (actions++ < MAXIMUM_ACTIONS_PER_CLIENT) {
294       // pull in anything waiting.
295       infoton *item = NIL;
296       octopus_request_id req_id;
297       outcome ret = retrieve_and_restore_any(item, req_id,
298           first_one? DATA_AWAIT_TIMEOUT : 0);
299       first_one = false;
300       if (ret == cromp_common::TIMED_OUT) {
301         actions--;  // didn't actually eat one.
302         return false;
303       } else if (ret != cromp_common::OKAY) {
304 #ifdef DEBUG_CROMP_SERVER
305         LOG(astring("got error ") + cromp_common::outcome_name(ret));
306 #endif
307         if (ret == cromp_common::NO_CONNECTION) {
308 #ifdef DEBUG_CROMP_SERVER
309           LOG("noticed disconnection of client.");
310 #endif
311           _still_connected = false;
312         }
313         actions--;  // didn't actually eat one.
314         return false;  // get outa here.
315       }
316       // got a packet.
317       saw_something = true;
318       if (!_fixated) {
319         if (req_id._entity.blank()) {
320           LOG(astring("would have assigned ours to blank id! ")
321               + req_id._entity.mangled_form());
322           WHACK(item);
323           continue;
324         }
325 #ifdef DEBUG_CROMP_SERVER
326         LOG(astring("cmd with entity ") + req_id._entity.mangled_form());
327 #endif
328         if (_ent.blank()) {
329           // assign the entity id now that we know it.
330           _ent = req_id._entity;
331 #ifdef DEBUG_CROMP_SERVER
332           LOG(astring("assigned own entity to ") + _ent.mangled_form());
333 #endif
334         } else if (!_fixated && (_ent != req_id._entity) ) {
335 #ifdef DEBUG_CROMP_SERVER
336           LOG(astring("fixated on entity of ") + req_id._entity.mangled_form()
337               + " where we used to have " + _ent.mangled_form());
338 #endif
339           _ent = req_id._entity;
340           _fixated = true;
341         }
342       }  // connects to line after debug just below.
343 #ifdef DEBUG_CROMP_SERVER
344         else if (_ent != req_id._entity) {
345         // this checks the validity of the entity.
346 #ifdef DEBUG_CROMP_SERVER
347         LOG(astring("seeing wrong entity of ") + req_id._entity.mangled_form()
348             + " when we fixated on " + _ent.mangled_form());
349 #endif
350         WHACK(item);
351         continue;
352       }
353 #endif
354       // check again so we make sure we're still healthy; could have changed
355       // state while getting a command.
356       if (!healthy()) {
357         WHACK(item);
358         continue;
359       }
360       string_array classif = item->classifier();
361         // hang onto the classifier since the next time we get a chance, the
362         // object might be destroyed.
363
364       // we pass responsibility for this item over to the octopus.  that's why
365       // we're not deleting it once evaluate gets the item.
366       ret = _octo->evaluate(item, req_id, _parent.instantaneous());
367       if (ret != tentacle::OKAY) {
368 #ifdef DEBUG_CROMP_SERVER
369         LOG(astring("failed to evaluate the infoton we got: ")
370             + classif.text_form());
371 #endif
372 //hmmm: we have upgraded this response to be for all errors, since otherwise
373 //      clients will just time out waiting for something that's never coming.
374
375         // we do a special type of handling when the tentacle is missing.  this
376         // is almost always because the wrong type of request is being sent to
377         // a server, or the server didn't register for all the objects it is
378         // supposed to handle.
379 /////        if (ret == tentacle::NOT_FOUND) {
380 //#ifdef DEBUG_CROMP_SERVER
381           LOG(astring("injecting unhandled note into response stream for ")
382               + req_id.text_form() + ", got outcome " + outcome_name(ret));
383 //#endif
384           _parent.send_to_client(req_id,
385               new unhandled_request(req_id, classif, ret));
386             // this will always work, although it's not a surety that the
387             // client actually still exists.  probably though, since we're
388             // just now handling this request.
389 /////        }
390       }
391     }
392     return saw_something;  // keep going if we actually did anything good.
393   }
394
395 private:
396   cromp_server &_parent;  // the object that owns this client.
397   octopus *_octo;
398   octopus_entity _ent;  // the entity by which we know this client.
399   bool _healthy;  // reports our current state of happiness.
400   bool _fixated;  // true if the entity id has become firm.
401   cromp_data_grabber _grabber;  // the data grabbing thread.
402   infoton_list _waiting;
403     // used by the push_client_replies() method; allocated once to avoid churn.
404   bool _still_connected;
405     // set to true up until we notice that the client disconnected.
406   login_tentacle &_security_arm;  // provides login checking.
407 };
408
409 //////////////
410
411 void cromp_data_grabber::perform_activity(void *)
412 {
413 #ifdef DEBUG_CROMP_SERVER
414   FUNCDEF("perform_activity");
415 #endif
416   while (!should_stop()) {
417 //    time_stamp started;
418     bool ret = _parent.handle_client_needs(*this);
419 //    int duration = int(time_stamp().value() - started.value());
420     if (!ret) {
421       // they said to stop.
422 #ifdef DEBUG_CROMP_SERVER
423       LOG("done handling client needs.");
424 #endif
425       _octo->expunge(_parent.ent());
426       break;
427     }
428   }
429 }
430
431 //////////////
432
433 class cromp_client_list : public amorph<cromp_client_record>
434 {
435 public:
436   int find(const octopus_entity &to_find) const {
437     for (int i = 0; i < elements(); i++)
438       if (to_find == get(i)->ent()) return i;
439     return common::NOT_FOUND;
440   }
441 };
442
443 //////////////
444
445 class client_dropping_thread : public ethread
446 {
447 public:
448   client_dropping_thread (cromp_server &parent)
449   : ethread(DROPPING_INTERVAL),
450     _parent(parent) {}
451
452   void perform_activity(void *formal(ptr)) {
453 //    FUNCDEF("perform_activity");
454     _parent.drop_dead_clients(); 
455   }
456
457 private:
458   cromp_server &_parent;  // we perform tricks for this object.
459 };
460
461 //////////////
462
463 class connection_management_thread : public ethread
464 {
465 public:
466   connection_management_thread(cromp_server &parent)
467   : ethread(),
468     _parent(parent) {}
469
470   void perform_activity(void *formal(ptr)) {
471 //    FUNCDEF("perform_activity");
472     _parent.look_for_clients(*this); 
473   }
474
475 private:
476   cromp_server &_parent;  // we perform tricks for this object.
477 };
478
479 //////////////
480
481 #undef LOCK_LISTS
482 #define LOCK_LISTS auto_synchronizer l(*_list_lock)
483   // takes over access to the client list and root socket.
484
485 cromp_server::cromp_server(const internet_address &where,
486     int accepting_threads, bool instantaneous, int max_per_ent)
487 : cromp_common(cromp_common::chew_hostname(where), max_per_ent),
488   _clients(new cromp_client_list),
489   _accepters(new thread_cabinet),
490   _list_lock(new mutex),
491   _next_droppage(new time_stamp(DEAD_CLIENT_CLEANING_INTERVAL)),
492   _instantaneous(instantaneous),
493   _where(new internet_address(where)),
494   _accepting_threads(accepting_threads),
495   _dropper(new client_dropping_thread(*this)),
496   _enabled(false),
497   _encrypt_arm(NIL),
498   _default_security(new cromp_security),
499   _security_arm(NIL)
500 {
501 //  FUNCDEF("constructor");
502 }
503  
504 cromp_server::~cromp_server()
505 {
506   disable_servers();
507   WHACK(_accepters);
508   WHACK(_dropper);
509   WHACK(_clients);
510   WHACK(_next_droppage);
511   WHACK(_where);
512   WHACK(_default_security);
513   WHACK(_list_lock);
514   _encrypt_arm = NIL;
515   _security_arm = NIL;
516 }
517
518 internet_address cromp_server::location() const { return *_where; }
519
520 bool cromp_server::get_sizes(const octopus_entity &id, int &items, int &bytes)
521 { return octo()->responses().get_sizes(id, items, bytes); }
522
523 internet_address cromp_server::any_address(int port)
524 {
525   const abyte any_list[] = { 0, 0, 0, 0 };
526   return internet_address(byte_array(4, any_list), "", port);
527 }
528
529 astring cromp_server::responses_text_form() const
530 { return octo()->responses().text_form(); }
531
532 int cromp_server::DEFAULT_ACCEPTERS() {
533   // default number of listening threads; this is the maximum number of mostly
534   // simultaneous connections that the server can pick up at a time.
535   return 7;  // others are not generally so limited on resources.
536 }
537
538 infoton *cromp_server::wrap_infoton(infoton * &request,
539     const octopus_entity &ent)
540 {
541   FUNCDEF("wrap_infoton");
542   if (!_enabled) return NIL;
543   // identity is not wrapped with encryption; we need to establish and identity
544   // to talk on a distinct channel with the server.  even if that identity were
545   // compromised, the interloper should still not be able to listen in on the
546   // establishment of an encryption channel.  also, the encryption startup
547   // itself is not encrypted and we don't want to re-encrypt the wrapper.
548   if (dynamic_cast<identity_infoton *>(request)
549       || dynamic_cast<encryption_infoton *>(request)
550       || dynamic_cast<encryption_wrapper *>(request)) return NIL;
551
552 #ifdef DEBUG_CROMP_SERVER
553   LOG(astring("encrypting ") + request->text_form());
554 #endif
555
556   octenc_key_record *key = _encrypt_arm->keys().lock(ent);
557     // lock here is released a bit down below.
558   if (!key) {
559     LOG(astring("failed to locate key for entity ") + ent.text_form());
560     return NIL;
561   }
562   byte_array packed_request;
563   infoton::fast_pack(packed_request, *request);
564   WHACK(request);
565   encryption_wrapper *to_return = new encryption_wrapper;
566   key->_key.encrypt(packed_request, to_return->_wrapped);
567   _encrypt_arm->keys().unlock(key);
568   return to_return;
569 }
570
571 outcome cromp_server::enable_servers(bool encrypt, cromp_security *security)
572 {
573   FUNCDEF("enable_servers");
574   if (encrypt) {
575     // add the tentacles needed for encryption.
576 #ifdef DEBUG_CROMP_SERVER
577     LOG(astring("enabling encryption for ") + class_name()
578         + " on " + _where->text_form());
579 #endif
580     _encrypt_arm = new encryption_tentacle;
581     add_tentacle(_encrypt_arm, true);
582     add_tentacle(new unwrapping_tentacle, false);
583   }
584   WHACK(_security_arm);  // in case being reused.
585   if (security) {
586     _security_arm = new login_tentacle(*security);
587     add_tentacle(_security_arm, true);
588   } else {
589     _security_arm = new login_tentacle(*_default_security);
590     add_tentacle(_security_arm, true);
591   }
592   open_common(*_where);  // open the common ground.
593
594   _enabled = true;
595   // try first accept, no waiting.
596   outcome to_return = accept_one_client(false);
597   if ( (to_return != common::NOT_FOUND) && (to_return != common::OKAY) ) {
598     LOG(astring("failure starting up server: ") + outcome_name(to_return));
599     return to_return;
600   }
601
602 #ifdef DEBUG_CROMP_SERVER
603   LOG(a_sprintf("adding %d accepting threads.", _accepting_threads));
604 #endif
605   for (int i = 0; i < _accepting_threads; i++) {
606     // crank in a new thread and tell it yes on starting it.
607     _accepters->add_thread(new connection_management_thread(*this), true, NIL);
608   }
609
610   _dropper->start(NIL);
611   return OKAY;
612 }
613
614 void cromp_server::disable_servers()
615 {
616 //  FUNCDEF("disable_servers");
617   if (!_enabled) return;
618   _dropper->stop();  // signal the thread to leave when it can.
619   _accepters->stop_all();  // signal the accepting threads to exit.
620   if (_clients) {
621     LOCK_LISTS;
622       // make sure no one rearranges or uses the client list while we're
623       // working on it.
624     for (int i = 0; i < _clients->elements(); i++) {
625       // stop the client's activities before the big shutdown.
626       cromp_client_record *cli = (*_clients)[i];
627       if (cli) cli->croak();
628     }
629   }
630
631   close_common();  // zap the socket so that our blocked waiters get woken up.
632
633   // now finalize the shutdown.  we don't grab the lock because we don't want
634   // a deadlock, but we also shouldn't need to grab the lock.  by here, we have
635   // cancelled all threads, no new clients should be able to be added, and the
636   // destruction of this list will ensure that each client's thread really is
637   // stopped.
638   WHACK(_clients);
639
640   _enabled = false;  // record our defunctivity.
641 }
642
643 int cromp_server::clients() const
644 {
645   LOCK_LISTS;
646   return _clients? _clients->elements() : 0;
647 }
648
649 bool cromp_server::disconnect_entity(const octopus_entity &id)
650 {
651 //  FUNCDEF("disconnect_entity");
652   if (!_enabled) return false;
653   LOCK_LISTS;
654   int indy = _clients->find(id);
655   if (negative(indy)) return false;  // didn't find it.
656   cromp_client_record *cli = (*_clients)[indy];
657   // disconnect the client and zap its entity records.
658   cli->croak();
659   return true;
660 }
661
662 bool cromp_server::find_entity(const octopus_entity &id,
663     internet_address &found)
664 {
665 //  FUNCDEF("find_entity");
666   if (!_enabled) return false;
667   found = internet_address();
668   LOCK_LISTS;
669   int indy = _clients->find(id);
670   if (negative(indy)) return false;  // didn't find it.
671   cromp_client_record *cli = (*_clients)[indy];
672     // pull out the address from the record at that index.
673   found = cli->spock()->remote();
674   return true;
675 }
676
677 outcome cromp_server::accept_one_client(bool wait)
678 {
679 #ifdef DEBUG_CROMP_SERVER
680   FUNCDEF("accept_one_client");
681 #endif
682   if (!_enabled) return common::INCOMPLETE;
683   spocket *accepted = NIL;
684 //printf((timestamp(true, true) + "into accept\n").s());
685   outcome ret = spock()->accept(accepted, wait);
686 //printf((timestamp(true, true) + "out of accept\n").s());
687     // accept and wait for it to finish.
688   if ( (ret == spocket::OKAY) && accepted) {
689     // we got a new client to talk to.
690     cromp_client_record *adding = new cromp_client_record(*this, accepted,
691         octo(), *_security_arm);
692 #ifdef DEBUG_CROMP_SERVER
693     LOG(a_sprintf("found a new client on sock %d.", accepted->OS_socket()));
694 #endif
695     LOCK_LISTS;  // short term lock.
696     _clients->append(adding);
697     return OKAY;
698   } else {
699     if (ret == spocket::NO_CONNECTION)
700       return NOT_FOUND;  // normal occurrence.
701 #ifdef DEBUG_CROMP_SERVER
702     LOG(astring("error accepting client: ") + spocket::outcome_name(ret));
703 #endif
704     return DISALLOWED;
705   }
706 }
707
708 void cromp_server::look_for_clients(ethread &requestor)
709 {
710   FUNCDEF("look_for_clients");
711   if (!_enabled) return;
712   // see if any clients have been accepted.
713   while (!requestor.should_stop()) {
714     outcome ret = accept_one_client(false);
715     if ( (ret != OKAY) && (ret != NOT_FOUND) ) {
716       // we got an error condition besides our normal set.
717 //#ifdef DEBUG_CROMP_SERVER
718       LOG(astring("got real error on socket; leaving for good.")
719           + spocket::outcome_name(ret));
720 //#endif
721       break;
722     }
723     // if we weren't told we got a client, then we'll sleep.  if we did get
724     // a client, we'll try again right away.
725     if (ret != OKAY)
726       time_control::sleep_ms(ACCEPTANCE_SNOOZE);
727   }
728 }
729
730 outcome cromp_server::send_to_client(const octopus_request_id &id,
731     infoton *data)
732 {
733 #ifdef DEBUG_CROMP_SERVER
734   FUNCDEF("send_to_client");
735 #endif
736   if (!_enabled) return common::INCOMPLETE;
737   if (!octo()->responses().add_item(data, id)) {
738 #ifdef DEBUG_CROMP_SERVER
739     LOG("failed to store result for client--no space left currently.");
740 #endif
741     return TOO_FULL;
742   }
743   return OKAY;
744 }
745
746 /*outcome cromp_server::get_any_from_client(const octopus_entity &ent,
747     infoton * &data, int timeout)
748 {
749   FUNCDEF("get_from_client");
750 //hmmm: this implementation locks the lists; can't we get the client to do
751 //      most of the work for this?
752   LOCK_LISTS;
753   int indy = _clients->find(id._entity);
754   if (negative(indy)) return NOT_FOUND;  // didn't find it.
755   cromp_client_record *cli = (*_clients)[indy];
756   octopus_request_id id;
757   return cli->retrieve_and_restore_any(data, ent, timeout);
758 }
759 */
760
761 outcome cromp_server::get_from_client(const octopus_request_id &id,
762     infoton * &data, int timeout)
763 {
764 //  FUNCDEF("get_from_client");
765   if (!_enabled) return common::INCOMPLETE;
766 //hmmm: this implementation locks the lists; can't we get the client to do
767 //      most of the work for this?
768   LOCK_LISTS;
769   int indy = _clients->find(id._entity);
770   if (negative(indy)) return NOT_FOUND;  // didn't find it.
771   cromp_client_record *cli = (*_clients)[indy];
772   return cli->retrieve_and_restore(data, id, timeout);
773 }
774
775 void cromp_server::drop_dead_clients()
776 {
777 #ifdef DEBUG_CROMP_SERVER
778   FUNCDEF("drop_dead_clients");
779 #endif
780   if (!_enabled) return;
781   // clean out any dead clients.
782
783   {
784     LOCK_LISTS;
785     if (time_stamp() < *_next_droppage) return;  // not time yet.
786   }
787
788   LOCK_LISTS;  // keep locked from now on.
789   for (int i = 0; i < _clients->elements(); i++) {
790     cromp_client_record *cli = (*_clients)[i];
791     if (!cli) {
792 #ifdef DEBUG_CROMP_SERVER
793       LOG(astring("error in list structure."));
794 #endif
795       _clients->zap(i, i);
796       i--;   // skip back before deleted guy.
797       continue;
798     }
799     if (!cli->still_connected() || !cli->healthy()) {
800 #ifdef DEBUG_CROMP_SERVER
801       LOG(astring("dropping disconnected client ") + cli->ent().mangled_form());
802 #endif
803       cli->croak();  // stop it from operating.
804
805 //hmmm: check if it has data waiting and complain about it perhaps.
806       _clients->zap(i, i);
807       i--;   // skip back before deleted guy.
808       continue;
809     }
810   }
811
812   _next_droppage->reset(DEAD_CLIENT_CLEANING_INTERVAL);
813 }
814
815 } //namespace.
816