1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
17 #include "identity_tentacle.h"
21 #include "unhandled_request.h"
23 #include <basis/astring.h>
24 #include <basis/mutex.h>
25 #include <configuration/application_configuration.h>
26 #include <loggers/critical_events.h>
27 #include <loggers/program_wide_logger.h>
28 #include <mathematics/chaos.h>
29 #include <structures/amorph.h>
30 #include <structures/string_hash.h>
31 #include <timely/time_control.h>
32 #include <timely/time_stamp.h>
34 using namespace basis;
35 using namespace configuration;
36 using namespace loggers;
37 using namespace mathematics;
38 using namespace processes;
39 using namespace structures;
40 using namespace timely;
44 //#define DEBUG_OCTOPUS
45 // uncomment for debugging noise.
46 //#define DEBUG_OCTOPUS_FILTERS
47 // uncomment for noisy filter processing.
51 auto_synchronizer l(*_molock)
// WHACK_RETURN(to_ret, to_whack): failure-path helper macro.
// NOTE: the expansion refers to "id", "request" and "_responses" by name, so
// it can only be used where those identifiers are in scope at the call site.
// the body below builds an unhandled_request from the id, the request's
// classifier and to_ret, and files it in the response bin under that id.
53 // this macro returns a result and deletes the request due to a failure. it
54 // stores a response for the request, in case they were expecting one, since
55 // otherwise they will wait a long time for a response that isn't coming. if
56 // those responses are never picked up, they will eventually be cleaned out.
57 #define WHACK_RETURN(to_ret, to_whack) { \
58 unhandled_request *bad_response = new unhandled_request(id, \
59 request->classifier(), to_ret); \
60 _responses->add_item(bad_response, id); \
// cap on discarded data, written as 128 * KILOBYTE.
// NOTE(review): presumably a byte count -- confirm against where it is used.
65 const int MAXIMUM_TRASH_SIZE = 128 * KILOBYTE;
66 // this is how much we'll toss out on closing an entity.
// logging shorthand for this file: hands the text "t" to CLASS_EMERGENCY_LOG
// together with the program-wide logger instance.
69 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
// four minutes, built from the MINUTE_ms constant; used both as the initial
// cleaning deadline and as the re-arm interval in periodic_cleaning().
71 const int OCTOPUS_CHECKING_INTERVAL = 4 * MINUTE_ms;
72 // the frequency in milliseconds of cleaning on the response bin. this
73 // doesn't need to happen very often; it only tosses data that has been
74 // abandoned in the response bin.
// filter_list: an array of tentacle pointers. the octopus stores the
// tentacles that were registered as filters in one of these (_filters).
78 class filter_list : public array<tentacle *>
// remove(): walks the array front to back looking for an element whose
// pointer value equals "to_remove".
// NOTE(review): presumably removes the match and reports success -- confirm.
81 bool remove(tentacle *to_remove) {
82 for (int i = 0; i < length(); i++) {
83 if (get(i) == to_remove) {
// pairs a tentacle ("limb") with a flag saying whether it was registered
// as a filter.
100 tentacle_record(tentacle *limb, bool filter)
101 : _limb(limb), _filter(filter) {}
// the record applies WHACK to its limb on destruction, so code that wants
// the tentacle to outlive the record must clear _limb first (as
// remove_tentacle() does).
// NOTE(review): WHACK presumably deletes the pointer -- confirm.
103 ~tentacle_record() { WHACK(_limb); }
// modula_oblongata: the octopus's tentacle registry. it is an amorph of
// tentacle_record objects that can be searched by tentacle group name.
108 class modula_oblongata : public amorph<tentacle_record>
111 modula_oblongata() : amorph<tentacle_record>() {}
// scans the records in order, testing each tentacle's group() against
// "group" using prefix_compare(); yields common::NOT_FOUND when the scan
// finishes without a match.
113 int find_index(const string_array &group) {
114 for (int i = 0; i < elements(); i++) {
115 if (borrow(i)->_limb->group().prefix_compare(group))
118 return common::NOT_FOUND;
// returns the tentacle registered for "group", or NULL_POINTER when
// find_index() reports a negative index.
121 tentacle *find(const string_array &group) {
122 int indy = find_index(group);
123 if (negative(indy)) return NULL_POINTER;
124 return borrow(indy)->_limb;
// range removal: forwards to the base class zap and reduces its outcome to
// a boolean (true only for common::OKAY).
127 bool zap(int a, int b) {
128 outcome ret = amorph<tentacle_record>::zap(a, b);
129 return ret == common::OKAY;
// removes the single record whose tentacle matches "group"; returns false
// when no record matches.
132 bool zap(const string_array &group) {
133 int indy = find_index(group);
134 if (negative(indy)) return false;
135 amorph<tentacle_record>::zap(indy, indy);
// builds an octopus called "name". "max_per_ent" is passed straight to the
// entity_data_bin that holds responses. the helper objects listed below are
// heap allocated, removals start out permitted (_disallow_removals == 0) and
// the first cleaning deadline is set from OCTOPUS_CHECKING_INTERVAL.
142 octopus::octopus(const astring &name, int max_per_ent)
143 : _name(new astring(name)),
144 _tentacles(new modula_oblongata),
146 _responses(new entity_data_bin(max_per_ent)),
147 _disallow_removals(0),
148 _next_cleaning(new time_stamp(OCTOPUS_CHECKING_INTERVAL)),
149 _clean_lock(new mutex),
150 _filters(new filter_list),
151 _sequencer(new safe_roller(1, MAXINT32 / 2)),
// two tentacles are always installed: the identity tentacle is added as a
// filter (second argument true), the unhandled_request tentacle is not.
154 add_tentacle(new identity_tentacle(*this), true);
155 // register a way to issue identities. this is a filter.
156 add_tentacle(new unhandled_request_tentacle(false), false);
157 // provide a way to unpack the unhandled_request object.
// teardown of the octopus.
162 FUNCDEF("destructor");
// releases the time stamp that the constructor allocated with new.
// NOTE(review): WHACK presumably deletes and clears the pointer -- confirm.
166 WHACK(_next_cleaning);
174 void octopus::lock_tentacles() { _molock->lock(); }
176 void octopus::unlock_tentacles() { _molock->unlock(); }
178 entity_data_bin &octopus::responses() { return *_responses; }
180 int octopus::locked_tentacle_count() { return _tentacles->elements(); }
182 const astring &octopus::name() const { return *_name; }
184 tentacle *octopus::locked_get_tentacle(int indy)
185 { return _tentacles->borrow(indy)->_limb; }
187 infoton *octopus::acquire_specific_result(const octopus_request_id &id)
188 { return _responses->acquire_for_identifier(id); }
190 infoton *octopus::acquire_result(const octopus_entity &requester,
191 octopus_request_id &id)
192 { return _responses->acquire_for_entity(requester, id); }
// counterpart of lock_tentacle().
// NOTE(review): "to_unlock" is received by value, so the assignment below
// only clears this function's local copy; the caller's pointer is untouched.
194 void octopus::unlock_tentacle(tentacle *to_unlock)
196 to_unlock = NULL_POINTER;
// expunge: invokes expunge(to_remove) on every registered tentacle, then
// drains the response bin of items still held for that entity.
// tentacle removals are held off through the _disallow_removals counter
// while the tentacle list is being walked.
200 void octopus::expunge(const octopus_entity &to_remove)
204 // temporary lock so we can keep tentacles from evaporating.
206 _disallow_removals++;
209 // we've now ensured that no tentacles will be removed, so at most the
210 // list would get longer. we'll settle on its current length.
211 int len = _tentacles->elements();
212 for (int i = 0; i < len; i++) {
213 tentacle_record *curr = _tentacles->borrow(i);
// defensive check: a missing record or a record without a tentacle.
214 if (!curr || !curr->_limb) {
215 //complain... logic error.
218 // activate the expunge method on the current tentacle.
219 curr->_limb->expunge(to_remove);
223 // re-enable tentacle removals.
225 _disallow_removals--;
228 // throw out any data that was waiting for that guy.
// keeps pulling batches from the bin until a pass finds nothing.
231 while (items_found) {
232 // grab a chunk of items to be trashed.
233 items_found = responses().acquire_for_entity(to_remove, junk,
// NOTE(review): the DEBUG_OCTOPUS guard is commented out here, so the log
// call below is presumably always compiled in -- confirm that is intended.
236 //#ifdef DEBUG_OCTOPUS
238 LOG(a_sprintf("cleaned %d items for expunged entity ", items_found)
239 + to_remove.mangled_form());
// zap_tentacle: detaches the tentacle registered under "tentacle_name" by
// delegating to remove_tentacle().
// NOTE(review): presumably the tentacle handed back in "found" is then
// destroyed and "ret" returned -- confirm.
245 outcome octopus::zap_tentacle(const string_array &tentacle_name)
247 tentacle *found = NULL_POINTER;
248 outcome ret = remove_tentacle(tentacle_name, found);
// add_tentacle: registers "to_add" with the octopus; "filter" marks it as a
// filter, in which case it is also appended to the filter list.
// returns tentacle::BAD_INPUT for a null tentacle or one with an empty
// group, tentacle::ALREADY_EXISTS if a tentacle for the group is still
// present after the replacement attempt, and tentacle::OKAY otherwise.
// NOTE(review): the record built here WHACKs its limb when destroyed, so the
// octopus presumably takes ownership of "to_add" -- confirm.
253 outcome octopus::add_tentacle(tentacle *to_add, bool filter)
255 FUNCDEF("add_tentacle");
256 if (!to_add) return tentacle::BAD_INPUT;
257 if (!to_add->group().length()) return tentacle::BAD_INPUT;
// any tentacle already registered for the same group is removed first.
258 outcome zapped_it = zap_tentacle(to_add->group());
259 if (zapped_it == tentacle::OKAY) {
260 //#ifdef DEBUG_OCTOPUS
261 LOG(astring("removed existing tentacle: ") + to_add->group().text_form());
265 tentacle *found = _tentacles->find(to_add->group());
266 // if found is non-null, then that would be a serious logic error since
267 // we just zapped it above.
268 if (found) return tentacle::ALREADY_EXISTS;
// point the tentacle at the octopus's response bin before registering it.
269 to_add->attach_storage(*_responses);
270 tentacle_record *new_record = new tentacle_record(to_add, filter);
271 _tentacles->append(new_record);
272 if (filter) *_filters += to_add;
274 LOG(astring("added tentacle on ") + to_add->group().text_form());
276 return tentacle::OKAY;
// remove_tentacle: unregisters the tentacle for "group_name" and hands the
// tentacle itself back through "free_me" (cleared to NULL_POINTER first).
// returns tentacle::BAD_INPUT for an empty group name, tentacle::NOT_FOUND
// when no tentacle matches, and tentacle::OKAY on success.
// the call waits, yielding the thread, for as long as _disallow_removals is
// non-zero.
279 outcome octopus::remove_tentacle(const string_array &group_name,
282 FUNCDEF("remove_tentacle");
283 free_me = NULL_POINTER;
284 if (!group_name.length()) return tentacle::BAD_INPUT;
286 // repeatedly grab the lock and make sure we're allowed to remove. if
287 // we're told we can't remove yet, then we drop the lock again and pause.
289 if (!_disallow_removals) {
290 // we ARE allowed to remove it right now. we leave the loop in
291 // possession of the lock.
// a negative counter means increments and decrements are unbalanced.
294 if (_disallow_removals < 0) {
295 continuable_error(class_name(), func, "logic error in removal "
296 "reference counter.");
299 time_control::sleep_ms(0); // yield thread's execution to another thread.
301 int indy = _tentacles->find_index(group_name);
302 if (negative(indy)) {
305 return tentacle::NOT_FOUND;
// take the record out of the list, then drop its (now vacated) slot.
308 tentacle_record *freeing = _tentacles->acquire(indy);
309 _tentacles->zap(indy, indy);
310 free_me = freeing->_limb;
311 _filters->remove(free_me);
// detach the limb from the record so the record's destructor, which WHACKs
// _limb, cannot touch the tentacle that is being returned to the caller.
313 freeing->_limb = NULL_POINTER;
315 return tentacle::OKAY;
// restore: rebuilds an infoton from "packed_form" by finding the tentacle
// that handles "classifier" and calling its reconstitute() method; the
// result is delivered through "reformed" (cleared to NULL_POINTER first).
// returns tentacle::BAD_INPUT for an empty classifier or empty packed data,
// tentacle::NOT_FOUND when no tentacle handles the classifier, and
// otherwise whatever reconstitute() reports.
318 outcome octopus::restore(const string_array &classifier,
319 byte_array &packed_form, infoton * &reformed)
324 periodic_cleaning(); // freshen up if it's that time.
326 reformed = NULL_POINTER;
327 if (!classifier.length()) return tentacle::BAD_INPUT;
328 if (!packed_form.length()) return tentacle::BAD_INPUT;
// NOTE(review): this repeats the classifier check made two lines above and
// can never fire; it looks like a leftover that could be removed.
329 if (!classifier.length()) return tentacle::BAD_INPUT;
331 // keep anyone from being removed until we're done.
333 _disallow_removals++;
335 tentacle *found = _tentacles->find(classifier);
339 LOG(astring("tentacle not found for: ") + classifier.text_form());
341 to_return = tentacle::NOT_FOUND;
343 to_return = found->reconstitute(classifier, packed_form, reformed);
345 // re-enable tentacle removals.
347 _disallow_removals--;
// evaluate: the main request entry point. the request is first shown to
// every registered filter; if none rejects it, it is routed to the tentacle
// whose group matches the request's classifier, either queued for background
// handling or consumed on the spot.
// every exit path after the increment of _disallow_removals re-takes the
// lock and decrements the counter again, so removals are only blocked for
// the duration of the call.
// failure paths use WHACK_RETURN, which files an unhandled_request response
// under "id" (see the macro's description near the top of the file).
351 outcome octopus::evaluate(infoton *request, const octopus_request_id &id,
355 periodic_cleaning(); // freshen up if it's that time.
357 // check that the classifier is well formed.
358 if (!request->classifier().length()) {
360 LOG("failed due to empty classifier.");
362 WHACK_RETURN(tentacle::BAD_INPUT, request);
367 // block tentacle removals while we're working.
368 _disallow_removals++;
370 // ensure that we pass this infoton through all the filters for vetting.
371 for (int i = 0; i < _filters->length(); i++) {
372 tentacle *current = (*_filters)[i];
373 #ifdef DEBUG_OCTOPUS_FILTERS
374 LOG(a_sprintf("%d: checking ", i + 1) + current->group().text_form());
377 // check if the infoton is addressed specifically by this filter.
// a filter whose group prefix-matches the classifier is the addressee of
// the request; any other filter merely vets it.
378 bool is_relevant = current->group().prefix_compare(request->classifier());
380 #ifdef DEBUG_OCTOPUS_FILTERS
// NOTE(review): the first LOG call below has no trailing semicolon; this
// only compiles if the LOG expansion is a complete statement -- confirm.
382 LOG(astring("found it to be relevant! for ") + id.text_form())
384 LOG(astring("found it to not be relevant. for ") + id.text_form());
387 // this infoton is _for_ this filter.
389 // unlock octopus to allow others to operate.
// "transformed" lets the filter hand back a replacement for the request.
391 byte_array transformed;
392 //hmmm: maybe there should be a separate filter method?
393 outcome to_return = current->consume(*request, id, transformed);
394 // pass the infoton into the current filter.
397 // the infoton was _for_ the current filter. that means that we are
398 // done processing it now.
399 #ifdef DEBUG_OCTOPUS_FILTERS
400 LOG(astring("filter ") + current->group().text_form() + " consumed "
401 "infoton from " + id.text_form() + " with result "
402 + tentacle::outcome_name(to_return));
405 GRAB_LOCK; // short re-establishment of the lock.
406 _disallow_removals--;
409 // the infoton was vetted by the filter. make sure it was liked.
410 #ifdef DEBUG_OCTOPUS_FILTERS
411 LOG(astring("filter ") + current->group().text_form() + " vetted "
412 "infoton " + id.text_form() + " with result "
413 + tentacle::outcome_name(to_return));
// tentacle::PARTIAL is the "carry on" verdict from a vetting filter.
415 if (to_return == tentacle::PARTIAL) {
416 // if the infoton is partially complete, then we're allowed to keep
417 // going. this outcome means it was not prohibited.
419 // make sure they didn't switch it out on us.
420 if (transformed.length()) {
421 // we need to substitute the transformed version for the original.
422 string_array classif;
423 byte_array decro; // decrypted packed infoton.
424 bool worked = infoton::fast_unpack(transformed, classif, decro);
426 LOG("failed to fast_unpack the transformed data.");
428 infoton *new_req = NULL_POINTER;
429 outcome rest_ret = restore(classif, decro, new_req);
430 if (rest_ret == tentacle::OKAY) {
431 // we got a good transformed version.
// from here on the restored infoton stands in for the caller's request.
// NOTE(review): confirm the original request object is released before the
// pointer is overwritten.
433 request = new_req; // substitution complete.
435 LOG("failed to restore transformed infoton.");
440 _molock->lock(); // get the lock again.
443 // this is a failure to process that object.
444 #ifdef DEBUG_OCTOPUS_FILTERS
445 LOG(astring("filter ") + current->group().text_form() + " denied "
446 "infoton from " + id.text_form());
449 GRAB_LOCK; // short re-establishment of the lock.
450 _disallow_removals--;
452 WHACK_RETURN(to_return, request);
457 // if we're here, then the infoton has been approved by all filters.
459 #ifdef DEBUG_OCTOPUS_FILTERS
460 LOG(astring("all filters approved infoton: ") + id.text_form());
463 // locate the appropriate tentacle for this request.
464 tentacle *found = _tentacles->find(request->classifier());
467 // from here in, the octopus itself is not locked up. but we have sent
468 // the signal that no one must remove any tentacles for now.
472 LOG(astring("tentacle not found for: ")
473 + request->classifier().text_form());
475 GRAB_LOCK; // short re-establishment of the lock.
476 _disallow_removals--;
477 WHACK_RETURN(tentacle::NOT_FOUND, request);
479 // make sure they want background execution and that the tentacle can
// the request is queued only when "now" is false and the tentacle reports
// that it supports backgrounding; otherwise it is consumed directly below.
481 if (!now && found->backgrounding()) {
482 // pass responsibility over to the tentacle.
483 outcome to_return = found->enqueue(request, id);
484 GRAB_LOCK; // short re-establishment of the lock.
485 _disallow_removals--;
488 // call the tentacle directly.
490 outcome to_return = found->consume(*request, id, ignored);
492 GRAB_LOCK; // short re-establishment of the lock.
493 _disallow_removals--;
// periodic_cleaning: once the stored deadline has passed, purges abandoned
// items from the response bin and re-arms the deadline for another
// OCTOPUS_CHECKING_INTERVAL.
498 void octopus::periodic_cleaning()
500 FUNCDEF("periodic_cleaning");
501 time_stamp next_time;
// copy the deadline while holding _clean_lock so the comparison below works
// on a private snapshot.
503 auto_synchronizer l(*_clean_lock);
504 next_time = *_next_cleaning;
// a default-constructed time_stamp is compared against the deadline.
// NOTE(review): presumably it represents the current time -- confirm.
506 if (next_time < time_stamp()) {
507 // the bin locks itself, so we don't need to grab the lock here.
508 _responses->clean_out_deadwood();
509 auto_synchronizer l(*_clean_lock);
510 // lock before modifying the time stamp; only one writer.
511 _next_cleaning->reset(OCTOPUS_CHECKING_INTERVAL);
// lock_tentacle: looks up the tentacle registered under "tentacle_name".
// an empty name yields NULL_POINTER without any lookup.
// NOTE(review): by its name and its pairing with unlock_tentacle(), this
// presumably leaves the octopus lock held on success -- confirm.
515 tentacle *octopus::lock_tentacle(const string_array &tentacle_name)
517 if (!tentacle_name.length()) return NULL_POINTER;
519 tentacle *found = _tentacles->find(tentacle_name);
// issue_identity: creates a fresh octopus_entity made of this octopus's
// name, the current process id, the next id from the sequencer, and a value
// drawn from _rando->inclusive(0, MAXINT32 / 4).
527 octopus_entity octopus::issue_identity()
529 return octopus_entity(*_name, application_configuration::process_id(), _sequencer->next_id(),
530 _rando->inclusive(0, MAXINT32 / 4));