feisty meow concerns codebase  2.140
octopus.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : octopus *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
17 #include "identity_tentacle.h"
18 #include "infoton.h"
19 #include "octopus.h"
20 #include "tentacle.h"
21 #include "unhandled_request.h"
22 
23 #include <basis/astring.h>
24 #include <basis/mutex.h>
28 #include <mathematics/chaos.h>
29 #include <structures/amorph.h>
30 #include <structures/string_hash.h>
31 #include <timely/time_control.h>
32 #include <timely/time_stamp.h>
33 
34 using namespace basis;
35 using namespace configuration;
36 using namespace loggers;
37 using namespace mathematics;
38 using namespace processes;
39 using namespace structures;
40 using namespace timely;
41 
42 namespace octopi {
43 
44 //#define DEBUG_OCTOPUS
45  // uncomment for debugging noise.
46 //#define DEBUG_OCTOPUS_FILTERS
47  // uncomment for noisy filter processing.
48 
49 #undef GRAB_LOCK
50 #define GRAB_LOCK \
51  auto_synchronizer l(*_molock)
52 
// this macro returns a result and deletes the request due to a failure.  it
// stores a response for the request, in case the requester was expecting one,
// since otherwise they would wait a long time for a response that isn't
// coming.  if those responses are never picked up, they will eventually be
// cleaned out.  note that the macro expects "id", "request" and "_responses"
// to be in scope wherever it is used.
57 #define WHACK_RETURN(to_ret, to_whack) { \
58  unhandled_request *bad_response = new unhandled_request(id, \
59  request->classifier(), to_ret); \
60  _responses->add_item(bad_response, id); \
61  WHACK(to_whack); \
62  return to_ret; \
63 }
64 
65 const int MAXIMUM_TRASH_SIZE = 128 * KILOBYTE;
66  // this is how much we'll toss out on closing an entity.
67 
68 #undef LOG
69 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
70 
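// OCTOPUS_CHECKING_INTERVAL (defined at line 71 of octopus.cpp; the definition
// itself is absent from this listing) is the frequency in milliseconds of
// cleaning on the response bin.  this doesn't need to happen very often; it
// only tosses data that has been abandoned in the response bin.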
75 
77 
78 class filter_list : public array<tentacle *>
79 {
80 public:
81  bool remove(tentacle *to_remove) {
82  for (int i = 0; i < length(); i++) {
83  if (get(i) == to_remove) {
84  zap(i, i);
85  return true;
86  }
87  }
88  return false;
89  }
90 };
91 
93 
94 class tentacle_record
95 {
96 public:
97  tentacle *_limb;
98  bool _filter;
99 
100  tentacle_record(tentacle *limb, bool filter)
101  : _limb(limb), _filter(filter) {}
102 
103  ~tentacle_record() { WHACK(_limb); }
104 };
105 
107 
108 class modula_oblongata : public amorph<tentacle_record>
109 {
110 public:
111  modula_oblongata() : amorph<tentacle_record>() {}
112 
113  int find_index(const string_array &group) {
114  for (int i = 0; i < elements(); i++) {
115  if (borrow(i)->_limb->group().prefix_compare(group))
116  return i;
117  }
118  return common::NOT_FOUND;
119  }
120 
121  tentacle *find(const string_array &group) {
122  int indy = find_index(group);
123  if (negative(indy)) return NULL_POINTER;
124  return borrow(indy)->_limb;
125  }
126 
127  bool zap(int a, int b) {
129  return ret == common::OKAY;
130  }
131 
132  bool zap(const string_array &group) {
133  int indy = find_index(group);
134  if (negative(indy)) return false;
135  amorph<tentacle_record>::zap(indy, indy);
136  return true;
137  }
138 };
139 
141 
142 octopus::octopus(const astring &name, int max_per_ent)
143 : _name(new astring(name)),
144  _tentacles(new modula_oblongata),
145  _molock(new mutex),
146  _responses(new entity_data_bin(max_per_ent)),
147  _disallow_removals(0),
148  _next_cleaning(new time_stamp(OCTOPUS_CHECKING_INTERVAL)),
149  _clean_lock(new mutex),
150  _filters(new filter_list),
151  _sequencer(new safe_roller(1, MAXINT32 / 2)),
152  _rando(new chaos)
153 {
154  add_tentacle(new identity_tentacle(*this), true);
155  // register a way to issue identities. this is a filter.
156  add_tentacle(new unhandled_request_tentacle(false), false);
157  // provide a way to unpack the unhandled_request object.
158 }
159 
161 {
162  FUNCDEF("destructor");
163  WHACK(_filters);
164  WHACK(_tentacles);
165  WHACK(_responses);
166  WHACK(_next_cleaning);
167  WHACK(_clean_lock);
168  WHACK(_name);
169  WHACK(_molock);
170  WHACK(_rando);
171  WHACK(_sequencer);
172 }
173 
174 void octopus::lock_tentacles() { _molock->lock(); }
175 
176 void octopus::unlock_tentacles() { _molock->unlock(); }
177 
178 entity_data_bin &octopus::responses() { return *_responses; }
179 
180 int octopus::locked_tentacle_count() { return _tentacles->elements(); }
181 
182 const astring &octopus::name() const { return *_name; }
183 
185 { return _tentacles->borrow(indy)->_limb; }
186 
188 { return _responses->acquire_for_identifier(id); }
189 
191  octopus_request_id &id)
192 { return _responses->acquire_for_entity(requester, id); }
193 
195 {
196  to_unlock = NULL_POINTER;
197  _molock->unlock();
198 }
199 
200 void octopus::expunge(const octopus_entity &to_remove)
201 {
202  FUNCDEF("expunge");
203  {
204  // temporary lock so we can keep tentacles from evaporating.
205  GRAB_LOCK;
206  _disallow_removals++;
207  }
208 
209  // we've now ensured that no tentacles will be removed, so at most the
210  // list would get longer. we'll settle on its current length.
211  int len = _tentacles->elements();
212  for (int i = 0; i < len; i++) {
213  tentacle_record *curr = _tentacles->borrow(i);
214  if (!curr || !curr->_limb) {
215 //complain... logic error.
216  continue;
217  }
218  // activate the expunge method on the current tentacle.
219  curr->_limb->expunge(to_remove);
220  }
221 
222  {
223  // re-enable tentacle removals.
224  GRAB_LOCK;
225  _disallow_removals--;
226  }
227 
228  // throw out any data that was waiting for that guy.
229  int items_found = 1;
230  infoton_list junk;
231  while (items_found) {
232  // grab a chunk of items to be trashed.
233  items_found = responses().acquire_for_entity(to_remove, junk,
235  junk.reset();
236 //#ifdef DEBUG_OCTOPUS
237  if (items_found)
238  LOG(a_sprintf("cleaned %d items for expunged entity ", items_found)
239  + to_remove.mangled_form());
240 //#endif
241  }
242 
243 }
244 
246 {
247  tentacle *found = NULL_POINTER;
248  outcome ret = remove_tentacle(tentacle_name, found);
249  WHACK(found);
250  return ret;
251 }
252 
254 {
255  FUNCDEF("add_tentacle");
256  if (!to_add) return tentacle::BAD_INPUT;
257  if (!to_add->group().length()) return tentacle::BAD_INPUT;
258  outcome zapped_it = zap_tentacle(to_add->group());
259  if (zapped_it == tentacle::OKAY) {
260 //#ifdef DEBUG_OCTOPUS
261  LOG(astring("removed existing tentacle: ") + to_add->group().text_form());
262 //#endif
263  }
264  GRAB_LOCK;
265  tentacle *found = _tentacles->find(to_add->group());
266  // if found is non-null, then that would be a serious logic error since
267  // we just zapped it above.
268  if (found) return tentacle::ALREADY_EXISTS;
269  to_add->attach_storage(*_responses);
270  tentacle_record *new_record = new tentacle_record(to_add, filter);
271  _tentacles->append(new_record);
272  if (filter) *_filters += to_add;
273 #ifdef DEBUG_OCTOPUS
274  LOG(astring("added tentacle on ") + to_add->group().text_form());
275 #endif
276  return tentacle::OKAY;
277 }
278 
280  tentacle * &free_me)
281 {
282  FUNCDEF("remove_tentacle");
283  free_me = NULL_POINTER;
284  if (!group_name.length()) return tentacle::BAD_INPUT;
285  while (true) {
286  // repeatedly grab the lock and make sure we're allowed to remove. if
287  // we're told we can't remove yet, then we drop the lock again and pause.
288  _molock->lock();
289  if (!_disallow_removals) {
290  // we ARE allowed to remove it right now. we leave the loop in
291  // possession of the lock.
292  break;
293  }
294  if (_disallow_removals < 0) {
295  continuable_error(class_name(), func, "logic error in removal "
296  "reference counter.");
297  }
298  _molock->unlock();
299  time_control::sleep_ms(0); // yield thread's execution to another thread.
300  }
301  int indy = _tentacles->find_index(group_name);
302  if (negative(indy)) {
303  // nope, no match.
304  _molock->unlock();
305  return tentacle::NOT_FOUND;
306  }
307  // found the match.
308  tentacle_record *freeing = _tentacles->acquire(indy);
309  _tentacles->zap(indy, indy);
310  free_me = freeing->_limb;
311  _filters->remove(free_me);
312  _molock->unlock();
313  freeing->_limb = NULL_POINTER;
314  WHACK(freeing);
315  return tentacle::OKAY;
316 }
317 
319  byte_array &packed_form, infoton * &reformed)
320 {
321 #ifdef DEBUG_OCTOPUS
322  FUNCDEF("restore");
323 #endif
324  periodic_cleaning(); // freshen up if it's that time.
325 
326  reformed = NULL_POINTER;
327  if (!classifier.length()) return tentacle::BAD_INPUT;
328  if (!packed_form.length()) return tentacle::BAD_INPUT;
329  if (!classifier.length()) return tentacle::BAD_INPUT;
330  {
331  // keep anyone from being removed until we're done.
332  GRAB_LOCK;
333  _disallow_removals++;
334  }
335  tentacle *found = _tentacles->find(classifier);
336  outcome to_return;
337  if (!found) {
338 #ifdef DEBUG_OCTOPUS
339  LOG(astring("tentacle not found for: ") + classifier.text_form());
340 #endif
341  to_return = tentacle::NOT_FOUND;
342  } else {
343  to_return = found->reconstitute(classifier, packed_form, reformed);
344  }
345  // re-enable tentacle removals.
346  GRAB_LOCK;
347  _disallow_removals--;
348  return to_return;
349 }
350 
352  bool now)
353 {
354  FUNCDEF("evaluate");
355  periodic_cleaning(); // freshen up if it's that time.
356 
357  // check that the classifier is well formed.
358  if (!request->classifier().length()) {
359 #ifdef DEBUG_OCTOPUS
360  LOG("failed due to empty classifier.");
361 #endif
363  }
364 
365  _molock->lock();
366 
367  // block tentacle removals while we're working.
368  _disallow_removals++;
369 
370  // ensure that we pass this infoton through all the filters for vetting.
371  for (int i = 0; i < _filters->length(); i++) {
372  tentacle *current = (*_filters)[i];
373 #ifdef DEBUG_OCTOPUS_FILTERS
374  LOG(a_sprintf("%d: checking ", i + 1) + current->group().text_form());
375 #endif
376 
377  // check if the infoton is addressed specifically by this filter.
378  bool is_relevant = current->group().prefix_compare(request->classifier());
379 
380 #ifdef DEBUG_OCTOPUS_FILTERS
381  if (is_relevant)
382  LOG(astring("found it to be relevant! for ") + id.text_form())
383  else
384  LOG(astring("found it to not be relevant. for ") + id.text_form());
385 #endif
386 
387  // this infoton is _for_ this filter.
388  _molock->unlock();
389  // unlock octopus to allow others to operate.
390 
391  byte_array transformed;
392 //hmmm: maybe there should be a separate filter method?
393  outcome to_return = current->consume(*request, id, transformed);
394  // pass the infoton into the current filter.
395 
396  if (is_relevant) {
397  // the infoton was _for_ the current filter. that means that we are
398  // done processing it now.
399 #ifdef DEBUG_OCTOPUS_FILTERS
400  LOG(astring("filter ") + current->group().text_form() + " consumed "
401  "infoton from " + id.text_form() + " with result "
402  + tentacle::outcome_name(to_return));
403 #endif
404  WHACK(request);
405  GRAB_LOCK; // short re-establishment of the lock.
406  _disallow_removals--;
407  return to_return;
408  } else {
409  // the infoton was vetted by the filter. make sure it was liked.
410 #ifdef DEBUG_OCTOPUS_FILTERS
411  LOG(astring("filter ") + current->group().text_form() + " vetted "
412  "infoton " + id.text_form() + " with result "
413  + tentacle::outcome_name(to_return));
414 #endif
415  if (to_return == tentacle::PARTIAL) {
416  // if the infoton is partially complete, then we're allowed to keep
417  // going. this outcome means it was not prohibited.
418 
419  // make sure they didn't switch it out on us.
420  if (transformed.length()) {
421  // we need to substitute the transformed version for the original.
422  string_array classif;
423  byte_array decro; // decrypted packed infoton.
424  bool worked = infoton::fast_unpack(transformed, classif, decro);
425  if (!worked) {
426  LOG("failed to fast_unpack the transformed data.");
427  } else {
428  infoton *new_req = NULL_POINTER;
429  outcome rest_ret = restore(classif, decro, new_req);
430  if (rest_ret == tentacle::OKAY) {
431  // we got a good transformed version.
432  WHACK(request);
433  request = new_req; // substitution complete.
434  } else {
435  LOG("failed to restore transformed infoton.");
436  }
437  }
438  }
439 
440  _molock->lock(); // get the lock again.
441  continue;
442  } else {
443  // this is a failure to process that object.
444 #ifdef DEBUG_OCTOPUS_FILTERS
445  LOG(astring("filter ") + current->group().text_form() + " denied "
446  "infoton from " + id.text_form());
447 #endif
448  {
449  GRAB_LOCK; // short re-establishment of the lock.
450  _disallow_removals--;
451  }
452  WHACK_RETURN(to_return, request);
453  }
454  }
455  }
456 
457  // if we're here, then the infoton has been approved by all filters.
458 
459 #ifdef DEBUG_OCTOPUS_FILTERS
460  LOG(astring("all filters approved infoton: ") + id.text_form());
461 #endif
462 
463  // locate the appropriate tentacle for this request.
464  tentacle *found = _tentacles->find(request->classifier());
465 
466  _molock->unlock();
467  // from here in, the octopus itself is not locked up. but we have sent
468  // the signal that no one must remove any tentacles for now.
469 
470  if (!found) {
471 #ifdef DEBUG_OCTOPUS
472  LOG(astring("tentacle not found for: ")
473  + request->classifier().text_form());
474 #endif
475  GRAB_LOCK; // short re-establishment of the lock.
476  _disallow_removals--;
478  }
479  // make sure they want background execution and that the tentacle can
480  // support this.
481  if (!now && found->backgrounding()) {
482  // pass responsibility over to the tentacle.
483  outcome to_return = found->enqueue(request, id);
484  GRAB_LOCK; // short re-establishment of the lock.
485  _disallow_removals--;
486  return to_return;
487  } else {
488  // call the tentacle directly.
489  byte_array ignored;
490  outcome to_return = found->consume(*request, id, ignored);
491  WHACK(request);
492  GRAB_LOCK; // short re-establishment of the lock.
493  _disallow_removals--;
494  return to_return;
495  }
496 }
497 
499 {
500  FUNCDEF("periodic_cleaning");
501  time_stamp next_time;
502  {
503  auto_synchronizer l(*_clean_lock);
504  next_time = *_next_cleaning;
505  }
506  if (next_time < time_stamp()) {
507  // the bin locks itself, so we don't need to grab the lock here.
508  _responses->clean_out_deadwood();
509  auto_synchronizer l(*_clean_lock);
510  // lock before modifying the time stamp; only one writer.
511  _next_cleaning->reset(OCTOPUS_CHECKING_INTERVAL);
512  }
513 }
514 
516 {
517  if (!tentacle_name.length()) return NULL_POINTER;
518  _molock->lock();
519  tentacle *found = _tentacles->find(tentacle_name);
520  if (!found) {
521  _molock->unlock();
522  return NULL_POINTER;
523  }
524  return found;
525 }
526 
528 {
529  return octopus_entity(*_name, application_configuration::process_id(), _sequencer->next_id(),
530  _rando->inclusive(0, MAXINT32 / 4));
531 }
532 
533 } //namespace.
534 
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
Represents a sequential, ordered, contiguous collection of objects.
Definition: array.h:54
int length() const
Returns the current reported length of the allocated C array.
Definition: array.h:115
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition: astring.cpp:130
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition: mutex.h:113
A very common template for a dynamic array of bytes.
Definition: byte_array.h:36
void lock()
Clamps down on the mutex, if possible.
Definition: mutex.cpp:95
void unlock()
Gives up the possession of the mutex.
Definition: mutex.cpp:107
Outcomes describe the state of completion for an operation.
Definition: outcome.h:31
a platform-independent way to acquire random numbers in a specific range.
Definition: chaos.h:51
int inclusive(int low, int high) const
< Returns a pseudo-random number r, such that "low" <= r <= "high".
Definition: chaos.h:88
Stores a set of infotons grouped by the entity that owns them.
infoton * acquire_for_identifier(const octopus_request_id &id)
void clean_out_deadwood(int decay_interval=4 *basis::MINUTE_ms)
infoton * acquire_for_entity(const octopus_entity &requester, octopus_request_id &id)
Supports an early step in using octopus services: getting an identity.
a list of pending requests and who made them.
Definition: entity_defs.h:181
An infoton is an individual request parcel with accompanying information.
Definition: infoton.h:32
static bool fast_unpack(basis::byte_array &packed_form, structures::string_array &classifier, basis::byte_array &info)
undoes a previous fast_pack to restore the previous information.
Definition: infoton.cpp:227
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition: infoton.cpp:85
Provides a way of identifying users of an octopus object.
Definition: entity_defs.h:35
basis::astring mangled_form() const
returns the combined string form of the identifier.
Identifies requests made on an octopus by users.
Definition: entity_defs.h:114
entity_data_bin & responses()
allows external access to our set of results.
Definition: octopus.cpp:178
tentacle * locked_get_tentacle(int indy)
access indy'th tentacle.
Definition: octopus.cpp:184
void unlock_tentacles()
unlocks the list.
Definition: octopus.cpp:176
basis::outcome add_tentacle(tentacle *to_add, bool filter=false)
hooks a tentacle in to provide processing of one type of infoton.
Definition: octopus.cpp:253
void periodic_cleaning()
flushes any abandoned data from the response bin.
Definition: octopus.cpp:498
tentacle * lock_tentacle(const structures::string_array &tentacle_name)
locates the tentacle with the "tentacle_name" and returns it.
Definition: octopus.cpp:515
int locked_tentacle_count()
number of tentacles.
Definition: octopus.cpp:180
infoton * acquire_result(const octopus_entity &requester, octopus_request_id &original_id)
acquires responses to previous requests if there are any waiting.
Definition: octopus.cpp:190
basis::outcome evaluate(infoton *request, const octopus_request_id &item_id, bool now=false)
tries to process the "request" using the current set of tentacles.
Definition: octopus.cpp:351
void unlock_tentacle(tentacle *to_unlock)
unlocks the octopus when given a previously locked tentacle.
Definition: octopus.cpp:194
virtual ~octopus()
Definition: octopus.cpp:160
octopus_entity issue_identity()
creates an entity identifier that is unique for this octopus.
Definition: octopus.cpp:527
infoton * acquire_specific_result(const octopus_request_id &original_id)
supports seeking the result for a specific request.
Definition: octopus.cpp:187
basis::outcome restore(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)
regenerates a packed infoton given its classifier.
Definition: octopus.cpp:318
basis::outcome zap_tentacle(const structures::string_array &group_name)
similar to remove_tentacle(), but destroys the tentacle.
Definition: octopus.cpp:245
const basis::astring & name() const
returns the name that the octopus was constructed with.
Definition: octopus.cpp:182
void expunge(const octopus_entity &to_remove)
invokes every tentacle's expunge() method on the id "to_remove".
Definition: octopus.cpp:200
basis::outcome remove_tentacle(const structures::string_array &group_name, tentacle *&free_me)
removes the tentacle listed for the "group_name", if any.
Definition: octopus.cpp:279
void lock_tentacles()
locks the tentacle list for use with locked_get_tentacle.
Definition: octopus.cpp:174
Manages a service within an octopus by processing certain infotons.
Definition: tentacle.h:36
virtual basis::outcome consume(infoton &to_chow, const octopus_request_id &item_id, basis::byte_array &transformed)=0
this is the main function that processes infotons for this tentacle.
basis::outcome enqueue(infoton *to_chow, const octopus_request_id &item_id)
holds onto infotons coming from the octopus for backgrounding.
Definition: tentacle.cpp:131
bool backgrounding() const
reports on whether this tentacle supports background operation or not.
Definition: tentacle.h:58
const structures::string_array & group() const
returns the name of the group that this tentacle services.
Definition: tentacle.cpp:96
virtual basis::outcome reconstitute(const structures::string_array &classifier, basis::byte_array &packed_form, infoton *&reformed)=0
regenerates an infoton from its packed form.
static const char * outcome_name(const basis::outcome &to_name)
returns the textual form of the outcome "to_name".
Definition: tentacle.cpp:98
@ PARTIAL
processing of request is partially done.
Definition: tentacle.h:74
void attach_storage(entity_data_bin &storage)
used when a tentacle is being integrated with an octopus.
Definition: tentacle.cpp:106
Implements a thread-safe roller object.
Definition: safe_roller.h:30
int next_id()
returns a unique (per instance of this type) id.
Definition: safe_roller.cpp:49
void reset()
cleans out all of the contents.
Definition: amorph.h:81
An array of strings with some additional helpful methods.
Definition: string_array.h:32
bool prefix_compare(const string_array &second) const
Returns true if all of the elements in this are the same in "second".
Definition: string_array.h:98
basis::astring text_form() const
A synonym for the text_format() method.
Definition: string_array.h:71
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
void reset()
sets the stamp time back to now.
Definition: time_stamp.cpp:59
#define continuable_error(c, f, i)
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define MAXINT32
Maximum 32-bit integer value.
Definition: definitions.h:75
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:57
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
bool negative(const type &a)
negative returns true if "a" is less than zero.
Definition: functions.h:43
const int MINUTE_ms
Number of milliseconds in a minute.
Definition: definitions.h:121
const int KILOBYTE
Number of bytes in a kilobyte.
Definition: definitions.h:134
A logger that sends to the console screen using the standard output device.
An extension to floating point primitives providing approximate equality.
Definition: averager.h:21
const int MAXIMUM_TRASH_SIZE
Definition: octopus.cpp:65
const int OCTOPUS_CHECKING_INTERVAL
Definition: octopus.cpp:71
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
#include <time.h>
Definition: earth_time.cpp:37
time_locus now()
returns our current locus in the time continuum.
Definition: earth_time.cpp:352
#define GRAB_LOCK
Definition: octopus.cpp:50
#define WHACK_RETURN(to_ret, to_whack)
Definition: octopus.cpp:57
#define LOG(t)
Definition: octopus.cpp:69
chaos _rando