fixing extensions
[feisty_meow.git] / octopi / library / octopus / octopus.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : octopus                                                           *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
17 #include "identity_tentacle.h"
18 #include "infoton.h"
19 #include "octopus.h"
20 #include "tentacle.h"
21 #include "unhandled_request.h"
22
23 #include <basis/astring.h>
24 #include <basis/mutex.h>
25 #include <configuration/application_configuration.h>
26 #include <loggers/critical_events.h>
27 #include <loggers/program_wide_logger.h>
28 #include <mathematics/chaos.h>
29 #include <structures/amorph.h>
30 #include <structures/string_hash.h>
31 #include <timely/time_control.h>
32 #include <timely/time_stamp.h>
33
34 using namespace basis;
35 using namespace configuration;
36 using namespace loggers;
37 using namespace mathematics;
38 using namespace processes;
39 using namespace structures;
40 using namespace timely;
41
42 namespace octopi {
43
44 //#define DEBUG_OCTOPUS
45   // uncomment for debugging noise.
46 //#define DEBUG_OCTOPUS_FILTERS
47   // uncomment for noisy filter processing.
48
49 #undef GRAB_LOCK
50 #define GRAB_LOCK \
51   auto_synchronizer l(*_molock)
52
53 // this macro returns a result and deletes the request due to a failure.  it
54 // stores a response for the request, in case they were expecting one, since
55 // otherwise they will wait a long time for a response that isn't coming.  if
56 // those responses are never picked up, they will eventually be cleaned out.
57 #define WHACK_RETURN(to_ret, to_whack) { \
58   unhandled_request *bad_response = new unhandled_request(id, \
59       request->classifier(), to_ret); \
60   _responses->add_item(bad_response, id); \
61   WHACK(to_whack); \
62   return to_ret; \
63 }
64
65 const int MAXIMUM_TRASH_SIZE = 128 * KILOBYTE;
66   // this is how much we'll toss out on closing an entity.
67
68 #undef LOG
69 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
70
71 const int OCTOPUS_CHECKING_INTERVAL = 4 * MINUTE_ms;
72   // the frequency in milliseconds of cleaning on the response bin.  this
73   // doesn't need to happen very often; it only tosses data that has been
74   // abandoned in the response bin.
75
76 //////////////
77
78 class filter_list : public array<tentacle *>
79 {
80 public:
81   bool remove(tentacle *to_remove) {
82     for (int i = 0; i < length(); i++) {
83       if (get(i) == to_remove) {
84         zap(i, i);
85         return true;
86       }
87     }
88     return false;
89   }
90 };
91
92 //////////////
93
94 class tentacle_record 
95 {
96 public:
97   tentacle *_limb;
98   bool _filter;
99
100   tentacle_record(tentacle *limb, bool filter)
101       : _limb(limb), _filter(filter) {}
102
103   ~tentacle_record() { WHACK(_limb); }
104 };
105
106 //////////////
107
108 class modula_oblongata : public amorph<tentacle_record>
109 {
110 public:
111   modula_oblongata() : amorph<tentacle_record>() {}
112
113   int find_index(const string_array &group) {
114     for (int i = 0; i < elements(); i++) {
115       if (borrow(i)->_limb->group().prefix_compare(group))
116         return i;
117     }
118     return common::NOT_FOUND;
119   }
120
121   tentacle *find(const string_array &group) {
122     int indy = find_index(group);
123     if (negative(indy)) return NULL_POINTER;
124     return borrow(indy)->_limb;
125   }
126
127   bool zap(int a, int b) {
128     outcome ret = amorph<tentacle_record>::zap(a, b);
129     return ret == common::OKAY;
130   }
131
132   bool zap(const string_array &group) {
133     int indy = find_index(group);
134     if (negative(indy)) return false;
135     amorph<tentacle_record>::zap(indy, indy);
136     return true;
137   }
138 };
139
140 //////////////
141
142 octopus::octopus(const astring &name, int max_per_ent)
143 : _name(new astring(name)),
144   _tentacles(new modula_oblongata),
145   _molock(new mutex),
146   _responses(new entity_data_bin(max_per_ent)),
147   _disallow_removals(0),
148   _next_cleaning(new time_stamp(OCTOPUS_CHECKING_INTERVAL)),
149   _clean_lock(new mutex),
150   _filters(new filter_list),
151   _sequencer(new safe_roller(1, MAXINT32 / 2)),
152   _rando(new chaos)
153 {
154   add_tentacle(new identity_tentacle(*this), true);
155     // register a way to issue identities.  this is a filter.
156   add_tentacle(new unhandled_request_tentacle(false), false);
157     // provide a way to unpack the unhandled_request object.
158 }
159
160 octopus::~octopus()
161 {
162   FUNCDEF("destructor");
163   WHACK(_filters);
164   WHACK(_tentacles);
165   WHACK(_responses);
166   WHACK(_next_cleaning);
167   WHACK(_clean_lock);
168   WHACK(_name);
169   WHACK(_molock);
170   WHACK(_rando);
171   WHACK(_sequencer);
172 }
173
174 void octopus::lock_tentacles() { _molock->lock(); }
175
176 void octopus::unlock_tentacles() { _molock->unlock(); }
177
178 entity_data_bin &octopus::responses() { return *_responses; }
179
180 int octopus::locked_tentacle_count() { return _tentacles->elements(); }
181
182 const astring &octopus::name() const { return *_name; }
183
184 tentacle *octopus::locked_get_tentacle(int indy)
185 { return _tentacles->borrow(indy)->_limb; }
186
187 infoton *octopus::acquire_specific_result(const octopus_request_id &id)
188 { return _responses->acquire_for_identifier(id); }
189
190 infoton *octopus::acquire_result(const octopus_entity &requester,
191     octopus_request_id &id)
192 { return _responses->acquire_for_entity(requester, id); }
193
194 void octopus::unlock_tentacle(tentacle *to_unlock)
195 {
196   to_unlock = NULL_POINTER;
197   _molock->unlock();
198 }
199
200 void octopus::expunge(const octopus_entity &to_remove)
201 {
202   FUNCDEF("expunge");
203   {
204     // temporary lock so we can keep tentacles from evaporating.
205     GRAB_LOCK;
206     _disallow_removals++;
207   }
208
209   // we've now ensured that no tentacles will be removed, so at most the
210   // list would get longer.  we'll settle on its current length.
211   int len = _tentacles->elements();
212   for (int i = 0; i < len; i++) {
213     tentacle_record *curr = _tentacles->borrow(i);
214     if (!curr || !curr->_limb) {
215 //complain... logic error.
216       continue;
217     }
218     // activate the expunge method on the current tentacle.
219     curr->_limb->expunge(to_remove);
220   }
221
222   {
223     // re-enable tentacle removals.
224     GRAB_LOCK;
225     _disallow_removals--;
226   }
227
228   // throw out any data that was waiting for that guy.
229   int items_found = 1;
230   infoton_list junk;
231   while (items_found) {
232     // grab a chunk of items to be trashed.
233     items_found = responses().acquire_for_entity(to_remove, junk,
234         MAXIMUM_TRASH_SIZE);
235     junk.reset();
236 //#ifdef DEBUG_OCTOPUS
237     if (items_found)
238       LOG(a_sprintf("cleaned %d items for expunged entity ", items_found)
239           + to_remove.mangled_form());
240 //#endif
241   }
242
243 }
244
245 outcome octopus::zap_tentacle(const string_array &tentacle_name)
246 {
247   tentacle *found = NULL_POINTER;
248   outcome ret = remove_tentacle(tentacle_name, found);
249   WHACK(found);
250   return ret;
251 }
252
253 outcome octopus::add_tentacle(tentacle *to_add, bool filter)
254 {
255   FUNCDEF("add_tentacle");
256   if (!to_add) return tentacle::BAD_INPUT;
257   if (!to_add->group().length()) return tentacle::BAD_INPUT;
258   outcome zapped_it = zap_tentacle(to_add->group());
259   if (zapped_it == tentacle::OKAY) {
260 //#ifdef DEBUG_OCTOPUS
261     LOG(astring("removed existing tentacle: ") + to_add->group().text_form());
262 //#endif
263   }
264   GRAB_LOCK;
265   tentacle *found = _tentacles->find(to_add->group());
266   // if found is non-null, then that would be a serious logic error since
267   // we just zapped it above.
268   if (found) return tentacle::ALREADY_EXISTS;
269   to_add->attach_storage(*_responses);
270   tentacle_record *new_record = new tentacle_record(to_add, filter);
271   _tentacles->append(new_record);
272   if (filter) *_filters += to_add;
273 #ifdef DEBUG_OCTOPUS
274   LOG(astring("added tentacle on ") + to_add->group().text_form());
275 #endif
276   return tentacle::OKAY;
277 }
278
279 outcome octopus::remove_tentacle(const string_array &group_name,
280     tentacle * &free_me)
281 {
282   FUNCDEF("remove_tentacle");
283   free_me = NULL_POINTER;
284   if (!group_name.length()) return tentacle::BAD_INPUT;
285   while (true) {
286     // repeatedly grab the lock and make sure we're allowed to remove.  if
287     // we're told we can't remove yet, then we drop the lock again and pause.
288     _molock->lock();
289     if (!_disallow_removals) {
290       // we ARE allowed to remove it right now.  we leave the loop in
291       // possession of the lock.
292       break;
293     }
294     if (_disallow_removals < 0) {
295       continuable_error(class_name(), func, "logic error in removal "
296           "reference counter.");
297     }
298     _molock->unlock();
299     time_control::sleep_ms(0);  // yield thread's execution to another thread.
300   }
301   int indy = _tentacles->find_index(group_name);
302   if (negative(indy)) {
303     // nope, no match.
304     _molock->unlock();
305     return tentacle::NOT_FOUND;
306   }
307   // found the match.
308   tentacle_record *freeing = _tentacles->acquire(indy);
309   _tentacles->zap(indy, indy);
310   free_me = freeing->_limb;
311   _filters->remove(free_me);
312   _molock->unlock();
313   freeing->_limb = NULL_POINTER;
314   WHACK(freeing);
315   return tentacle::OKAY;
316 }
317
318 outcome octopus::restore(const string_array &classifier,
319     byte_array &packed_form, infoton * &reformed)
320 {
321 #ifdef DEBUG_OCTOPUS
322   FUNCDEF("restore");
323 #endif
324   periodic_cleaning();  // freshen up if it's that time.
325
326   reformed = NULL_POINTER;
327   if (!classifier.length()) return tentacle::BAD_INPUT;
328   if (!packed_form.length()) return tentacle::BAD_INPUT;
329   if (!classifier.length()) return tentacle::BAD_INPUT;
330   {
331     // keep anyone from being removed until we're done.
332     GRAB_LOCK;
333     _disallow_removals++;
334   }
335   tentacle *found = _tentacles->find(classifier);
336   outcome to_return;
337   if (!found) {
338 #ifdef DEBUG_OCTOPUS
339     LOG(astring("tentacle not found for: ") + classifier.text_form());
340 #endif
341     to_return = tentacle::NOT_FOUND;
342   } else {
343     to_return = found->reconstitute(classifier, packed_form, reformed);
344   }
345   // re-enable tentacle removals.
346   GRAB_LOCK;
347   _disallow_removals--;
348   return to_return;
349 }
350
351 outcome octopus::evaluate(infoton *request, const octopus_request_id &id,
352     bool now)
353 {
354   FUNCDEF("evaluate");
355   periodic_cleaning();  // freshen up if it's that time.
356
357   // check that the classifier is well formed.
358   if (!request->classifier().length()) {
359 #ifdef DEBUG_OCTOPUS
360     LOG("failed due to empty classifier.");
361 #endif
362     WHACK_RETURN(tentacle::BAD_INPUT, request);
363   }
364
365   _molock->lock();
366
367   // block tentacle removals while we're working.
368   _disallow_removals++;
369
370   // ensure that we pass this infoton through all the filters for vetting.
371   for (int i = 0; i < _filters->length(); i++) {
372     tentacle *current = (*_filters)[i];
373 #ifdef DEBUG_OCTOPUS_FILTERS
374     LOG(a_sprintf("%d: checking ", i + 1) + current->group().text_form());
375 #endif
376
377     // check if the infoton is addressed specifically by this filter.
378     bool is_relevant = current->group().prefix_compare(request->classifier());
379
380 #ifdef DEBUG_OCTOPUS_FILTERS
381     if (is_relevant)
382       LOG(astring("found it to be relevant!  for ") + id.text_form())
383     else
384       LOG(astring("found it to not be relevant.  for ") + id.text_form());
385 #endif
386
387     // this infoton is _for_ this filter.
388     _molock->unlock();
389       // unlock octopus to allow others to operate.
390
391     byte_array transformed;
392 //hmmm: maybe there should be a separate filter method?
393     outcome to_return = current->consume(*request, id, transformed);
394       // pass the infoton into the current filter.
395
396     if (is_relevant) {
397       // the infoton was _for_ the current filter.  that means that we are
398       // done processing it now.
399 #ifdef DEBUG_OCTOPUS_FILTERS
400       LOG(astring("filter ") + current->group().text_form() + " consumed "
401           "infoton from " + id.text_form() + " with result "
402           + tentacle::outcome_name(to_return));
403 #endif
404       WHACK(request);
405       GRAB_LOCK;  // short re-establishment of the lock.
406       _disallow_removals--;
407       return to_return;
408     } else {
409       // the infoton was vetted by the filter.  make sure it was liked.
410 #ifdef DEBUG_OCTOPUS_FILTERS
411       LOG(astring("filter ") + current->group().text_form() + " vetted "
412           "infoton " + id.text_form() + " with result "
413           + tentacle::outcome_name(to_return));
414 #endif
415       if (to_return == tentacle::PARTIAL) {
416         // if the infoton is partially complete, then we're allowed to keep
417         // going.  this outcome means it was not prohibited.
418
419         // make sure they didn't switch it out on us.
420         if (transformed.length()) {
421           // we need to substitute the transformed version for the original.
422           string_array classif;
423           byte_array decro;  // decrypted packed infoton.
424           bool worked = infoton::fast_unpack(transformed, classif, decro);
425           if (!worked) {
426             LOG("failed to fast_unpack the transformed data.");
427           } else {
428             infoton *new_req = NULL_POINTER;
429             outcome rest_ret = restore(classif, decro, new_req);
430             if (rest_ret == tentacle::OKAY) {
431               // we got a good transformed version.
432               WHACK(request);
433               request = new_req;  // substitution complete.
434             } else {
435               LOG("failed to restore transformed infoton.");
436             }
437           }
438         }
439
440         _molock->lock();  // get the lock again.
441         continue;
442       } else {
443         // this is a failure to process that object.
444 #ifdef DEBUG_OCTOPUS_FILTERS
445         LOG(astring("filter ") + current->group().text_form() + " denied "
446             "infoton from " + id.text_form());
447 #endif
448         {
449           GRAB_LOCK;  // short re-establishment of the lock.
450           _disallow_removals--;
451         }
452         WHACK_RETURN(to_return, request);
453       }
454     }
455   }
456
457   // if we're here, then the infoton has been approved by all filters.
458
459 #ifdef DEBUG_OCTOPUS_FILTERS
460   LOG(astring("all filters approved infoton: ") + id.text_form());
461 #endif
462
463   // locate the appropriate tentacle for this request.
464   tentacle *found = _tentacles->find(request->classifier());
465
466   _molock->unlock();
467     // from here in, the octopus itself is not locked up.  but we have sent
468     // the signal that no one must remove any tentacles for now.
469
470   if (!found) {
471 #ifdef DEBUG_OCTOPUS
472     LOG(astring("tentacle not found for: ")
473         + request->classifier().text_form());
474 #endif
475     GRAB_LOCK;  // short re-establishment of the lock.
476     _disallow_removals--;
477     WHACK_RETURN(tentacle::NOT_FOUND, request);
478   }
479   // make sure they want background execution and that the tentacle can
480   // support this.
481   if (!now && found->backgrounding()) {
482     // pass responsibility over to the tentacle.
483     outcome to_return = found->enqueue(request, id);
484     GRAB_LOCK;  // short re-establishment of the lock.
485     _disallow_removals--;
486     return to_return;
487   } else {
488     // call the tentacle directly.
489     byte_array ignored;
490     outcome to_return = found->consume(*request, id, ignored);
491     WHACK(request);
492     GRAB_LOCK;  // short re-establishment of the lock.
493     _disallow_removals--;
494     return to_return;
495   }
496 }
497
498 void octopus::periodic_cleaning()
499 {
500   FUNCDEF("periodic_cleaning");
501   time_stamp next_time;
502   {
503     auto_synchronizer l(*_clean_lock);
504     next_time = *_next_cleaning;
505   }
506   if (next_time < time_stamp()) {
507     // the bin locks itself, so we don't need to grab the lock here.
508     _responses->clean_out_deadwood(); 
509     auto_synchronizer l(*_clean_lock);
510       // lock before modifying the time stamp; only one writer.
511     _next_cleaning->reset(OCTOPUS_CHECKING_INTERVAL);
512   }
513 }
514
515 tentacle *octopus::lock_tentacle(const string_array &tentacle_name)
516 {
517   if (!tentacle_name.length()) return NULL_POINTER;
518   _molock->lock();
519   tentacle *found = _tentacles->find(tentacle_name);
520   if (!found) {
521     _molock->unlock();
522     return NULL_POINTER;
523   }
524   return found;
525 }
526
527 octopus_entity octopus::issue_identity()
528 {
529   return octopus_entity(*_name, application_configuration::process_id(), _sequencer->next_id(),
530       _rando->inclusive(0, MAXINT32 / 4));
531 }
532
533 } //namespace.
534