1 /*****************************************************************************\
3 * Name : entity_data_bin *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
20 #include <basis/astring.h>
22 #include <basis/mutex.h>
23 #include <loggers/program_wide_logger.h>
24 #include <structures/set.h>
25 #include <structures/string_array.h>
26 #include <structures/amorph.h>
27 #include <structures/string_hash.h>
28 #include <textual/parser_bits.h>
29 #include <timely/time_stamp.h>
31 using namespace basis;
32 using namespace loggers;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
39 //#define DEBUG_ENTITY_DATA_BIN
40 // uncomment for more debugging information.
44 auto_synchronizer l(*_ent_lock)
47 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
49 const int OCTOPUS_TABLE_BITS = 6;
50 // the hash table for items will have 2^N entries.
52 //hmmm: parameterize in class interface?
53 ////const int DATA_DECAY_INTERVAL = 4 * MINUTE_ms;
54 // if we haven't gotten a data item out to its entity in this long, then
55 // we assume the entity has croaked or doesn't want its data.
62 infoton *_item; // the data making up the production.
63 octopus_request_id _id; // the id, if any, of the original request.
64 time_stamp _when_added; // when the data became available.
66 infoton_holder(const octopus_request_id &id = octopus_request_id(),
67 infoton *item = NULL_POINTER)
68 : _item(item), _id(id), _when_added() {}
70 ~infoton_holder() { WHACK(_item); }
72 astring text_form() const {
73 return astring("id=") + _id.text_form() + ", added="
74 + _when_added.text_form() + ", item="
75 + _item->classifier().text_form() + ", data="
82 class entity_basket : public amorph<infoton_holder>
85 time_stamp _last_active;
87 astring text_form() const {
89 for (int i = 0; i < elements(); i++)
90 to_return += get(i)->text_form() + parser_bits::platform_eol_to_chars();
97 class entity_hasher : public hashing_algorithm
100 virtual hashing_algorithm *clone() const { return new entity_hasher; }
102 virtual basis::un_int hash(const void *key_data, int formal(key_length)) const {
103 octopus_entity *key = (octopus_entity *)key_data;
104 // jiggle the pieces of the id into a number.
105 return basis::un_int(
107 + (key->add_in() << 10)
108 + (key->sequencer() << 14)
109 + (key->hostname()[0] << 20)
110 + (key->hostname()[1] << 24) );
116 class entity_item_hash
117 : public hash_table<octopus_entity, entity_basket>
120 entity_item_hash(const entity_hasher &hash)
121 : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
127 class basketcase : public structures::set<octopus_entity>
134 // used for our apply methods for communicating back to the caller.
137 basketcase *_empty_baskets;
138 entity_basket *_any_item;
139 int &_items_held; // hooks to parent's item count.
140 int _decay_interval; // how long are items allowed to live?
142 apply_struct(int &items_held)
143 : _empty_baskets(NULL_POINTER), _any_item(NULL_POINTER), _items_held(items_held),
144 _decay_interval(0) {}
149 entity_data_bin::entity_data_bin(int max_size_per_entity)
150 : _table(new entity_item_hash(entity_hasher())),
151 _ent_lock(new mutex),
153 _max_per_ent(max_size_per_entity),
157 entity_data_bin::~entity_data_bin()
163 int entity_data_bin::entities() const
166 return _table->elements();
169 struct text_form_accumulator { astring _accum; };
171 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
174 text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
175 shuttle->_accum += bask.text_form();
179 astring entity_data_bin::text_form() const
182 text_form_accumulator shuttle;
183 _table->apply(text_form_applier, &shuttle);
184 return shuttle._accum;
187 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
190 #undef static_class_name
191 #define static_class_name() "entity_data_bin"
192 FUNCDEF("scramble_applier");
193 int *county = (int *)data_link;
194 *county += bask.elements();
196 #undef static_class_name
199 // this could be extended to do more interesting checks also; currently it's
200 // just like the entities() method really.
201 int entity_data_bin::scramble_counter()
205 _table->apply(scramble_applier, &count);
209 #ifdef DEBUG_ENTITY_DATA_BIN
211 if ( !(_action_count++ % 100) ) { \
212 int items = scramble_counter(); \
213 LOG(a_sprintf("-> %d items counted.", items)); \
219 bool entity_data_bin::add_item(infoton *to_add,
220 const octopus_request_id &orig_id)
224 // create a record to add to the appropriate bin.
225 infoton_holder *holder = new infoton_holder(orig_id, to_add);
227 // see if a basket already exists for the entity.
228 entity_basket *bask = _table->find(orig_id._entity);
230 // this entity doesn't have a basket so add one.
231 bask = new entity_basket;
232 _table->add(orig_id._entity, bask);
235 bask->_last_active = time_stamp(); // reset activity time.
237 // count up the current amount of data in use.
238 int current_size = 0;
239 for (int i = 0; i < bask->elements(); i++)
240 current_size += bask->borrow(i)->_item->packed_size();
242 if (current_size + to_add->packed_size() > _max_per_ent) {
244 LOG(astring("size limit would be exceeded if we stored this product"));
248 // append the latest production to the list.
249 bask->append(holder);
254 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
257 #define static_class_name() "entity_data_bin"
258 FUNCDEF("any_item_applier");
259 apply_struct *apple = (apply_struct *)data_link;
260 // check the basket to see if it has any items.
261 if (!bask.elements()) {
262 //#ifdef DEBUG_ENTITY_DATA_BIN
263 // LOG(astring("saw empty basket ") + key.mangled_form());
265 return true; // continue iterating.
267 apple->_any_item = &bask;
268 return false; // stop iteration.
269 #undef static_class_name
272 infoton *entity_data_bin::acquire_for_any(octopus_request_id &id)
274 FUNCDEF("acquire_for_any");
276 apply_struct apple(_items_held);
277 _table->apply(any_item_applier, &apple);
278 if (!apple._any_item) return NULL_POINTER;
280 // retrieve the information from our basket that was provided.
281 infoton_holder *found = apple._any_item->acquire(0);
282 apple._any_item->zap(0, 0);
283 if (!apple._any_item->elements()) {
284 // toss this empty basket.
285 #ifdef DEBUG_ENTITY_DATA_BIN
286 LOG(astring("tossing empty basket ") + found->_id._entity.mangled_form());
288 _table->zap(found->_id._entity);
290 apple._any_item = NULL_POINTER;
291 infoton *to_return = found->_item;
293 found->_item = NULL_POINTER; // clear so it won't be whacked.
296 //#ifdef DEBUG_ENTITY_DATA_BIN
298 LOG("logic error: number of items went below zero.");
303 int entity_data_bin::acquire_for_entity(const octopus_entity &requester,
304 infoton_list &items, int maximum_size)
306 FUNCDEF("acquire_for_entity [multiple]");
307 // this method does not grab the lock because it simply composes other
308 // class methods without interacting with class data members.
310 if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
311 // pick a reasonable default.
312 octopus_request_id id;
314 while (maximum_size > 0) {
315 infoton *inf = acquire_for_entity(requester, id);
318 items.append(new infoton_id_pair(inf, id));
319 maximum_size -= inf->packed_size();
325 infoton *entity_data_bin::acquire_for_entity(const octopus_entity &requester,
326 octopus_request_id &id)
328 FUNCDEF("acquire_for_entity [single]");
329 id = octopus_request_id(); // reset it.
331 infoton *to_return = NULL_POINTER;
332 entity_basket *bask = _table->find(requester);
336 if (!bask->elements()) {
337 #ifdef DEBUG_ENTITY_DATA_BIN
338 LOG(astring("tossing empty basket ") + requester.mangled_form());
340 _table->zap(requester);
344 id = bask->get(0)->_id;
345 to_return = bask->borrow(0)->_item;
346 bask->borrow(0)->_item = NULL_POINTER;
348 if (!bask->elements()) {
349 #ifdef DEBUG_ENTITY_DATA_BIN
350 LOG(astring("tossing empty basket ") + requester.mangled_form());
352 _table->zap(requester);
355 //#ifdef DEBUG_ENTITY_DATA_BIN
357 LOG("logic error: number of items went below zero.");
362 infoton *entity_data_bin::acquire_for_identifier(const octopus_request_id &id)
364 FUNCDEF("acquire_for_identifier");
365 infoton *to_return = NULL_POINTER;
367 entity_basket *bask = _table->find(id._entity);
368 if (!bask) return NULL_POINTER;
369 if (!bask->elements()) {
370 #ifdef DEBUG_ENTITY_DATA_BIN
371 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
373 _table->zap(id._entity);
376 for (int i = 0; i < bask->elements(); i++) {
377 if (bask->get(i)->_id == id) {
378 to_return = bask->borrow(i)->_item; // snag the item.
379 bask->borrow(i)->_item = NULL_POINTER; // clear the list's version out.
380 bask->zap(i, i); // whack the sanitized element.
382 if (!bask->elements()) {
383 #ifdef DEBUG_ENTITY_DATA_BIN
384 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
386 _table->zap(id._entity);
389 //#ifdef DEBUG_ENTITY_DATA_BIN
391 LOG("logic error: number of items went below zero.");
399 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
402 #define static_class_name() "entity_data_bin"
403 FUNCDEF("cleaning_applier");
404 apply_struct *apple = (apply_struct *)data_link;
405 time_stamp expiration_time(-apple->_decay_interval);
408 for (int i = 0; i < bask.elements(); i++) {
409 infoton_holder &rec = *bask.borrow(i);
410 if (rec._when_added <= expiration_time) {
411 // if a requester hasn't picked this up in N seconds, then drop it.
412 #ifdef DEBUG_ENTITY_DATA_BIN
413 LOG(astring("whacking old item ") + rec._id.text_form());
416 apple->_items_held--;
417 //#ifdef DEBUG_ENTITY_DATA_BIN
418 if (apple->_items_held < 0)
419 LOG("logic error: number of items went below zero.");
422 i--; // skip back before the delete.
424 // NOTE: this break is based on an assumption about the storage of
425 // items; if it's ever the case in the future that items can be
426 // disordered on time of arrival in the queue, then the break should
431 #ifdef DEBUG_ENTITY_DATA_BIN
433 LOG(a_sprintf("==> whacked %d old items.", whack_count));
435 if (!bask.elements()) {
436 // if the basket has nothing left in it then we signal the parent that
437 // it can be deleted.
438 //LOG("adding to empty basket list.");
439 *apple->_empty_baskets += key;
440 //LOG("added to empty basket list.");
443 // keep iterating on items unless we know it's time to go.
445 #undef static_class_name
448 void entity_data_bin::clean_out_deadwood(int decay_interval)
450 #ifdef DEBUG_ENTITY_DATA_BIN
451 FUNCDEF("clean_out_deadwood");
454 // check that no items have timed out.
455 apply_struct apple(_items_held);
456 basketcase empty_baskets;
457 apple._empty_baskets = &empty_baskets;
458 apple._decay_interval = decay_interval;
459 _table->apply(cleaning_applier, &apple);
461 // clean up any entities whose baskets are empty.
462 for (int i = empty_baskets.length() - 1; i >= 0; i--) {
463 #ifdef DEBUG_ENTITY_DATA_BIN
464 LOG(astring("removing basket ") + empty_baskets.get(i).mangled_form());
466 _table->zap(empty_baskets.get(i));
467 empty_baskets.zap(i, i);
468 // we don't skip back since we're scanning the array from its end.
472 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
475 FUNCDEF("get_sizes");
479 entity_basket *bask = _table->find(id);
480 if (!bask || !bask->elements()) return false;
481 items = bask->elements();
482 for (int i = 0; i < bask->elements(); i++)
483 bytes += bask->borrow(i)->_item->packed_size();