1 /*****************************************************************************\
3 * Name : entity_data_bin *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
20 #include <basis/astring.h>
22 #include <basis/mutex.h>
23 #include <loggers/program_wide_logger.h>
24 #include <structures/set.h>
25 #include <structures/string_array.h>
26 #include <structures/amorph.h>
27 #include <structures/string_hash.h>
28 #include <textual/parser_bits.h>
29 #include <timely/time_stamp.h>
31 using namespace basis;
32 using namespace loggers;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
39 //#define DEBUG_ENTITY_DATA_BIN
40 // uncomment for more debugging information.
44 auto_synchronizer l(*_ent_lock)
47 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
49 const int OCTOPUS_TABLE_BITS = 6;
50 // the hash table for items will have 2^N entries.
52 //hmmm: parameterize in class interface?
53 ////const int DATA_DECAY_INTERVAL = 4 * MINUTE_ms;
54 // if we haven't gotten a data item out to its entity in this long, then
55 // we assume the entity has croaked or doesn't want its data.
62 infoton *_item; // the data making up the production.
63 octopus_request_id _id; // the id, if any, of the original request.
64 time_stamp _when_added; // when the data became available.
66 infoton_holder(const octopus_request_id &id = octopus_request_id(),
68 : _item(item), _id(id), _when_added() {}
70 ~infoton_holder() { WHACK(_item); }
72 astring text_form() const {
73 return astring("id=") + _id.text_form() + ", added="
74 + _when_added.text_form() + ", item="
75 + _item->classifier().text_form() + ", data="
82 class entity_basket : public amorph<infoton_holder>
85 time_stamp _last_active;
87 astring text_form() const {
89 for (int i = 0; i < elements(); i++)
90 to_return += get(i)->text_form() + parser_bits::platform_eol_to_chars();
97 class entity_hasher : public hashing_algorithm
100 virtual hashing_algorithm *clone() const { return new entity_hasher; }
102 virtual basis::un_int hash(const void *key_data, int formal(key_length)) const {
103 octopus_entity *key = (octopus_entity *)key_data;
104 // jiggle the pieces of the id into a number.
105 return basis::un_int(
107 + (key->add_in() << 10)
108 + (key->sequencer() << 14)
109 + (key->hostname()[0] << 20)
110 + (key->hostname()[1] << 24) );
116 class entity_item_hash
117 : public hash_table<octopus_entity, entity_basket>
120 entity_item_hash(const entity_hasher &hash)
121 : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
127 class basketcase : public structures::set<octopus_entity>
134 // used for our apply methods for communicating back to the caller.
137 basketcase *_empty_baskets;
138 entity_basket *_any_item;
139 int &_items_held; // hooks to parent's item count.
140 int _decay_interval; // how long are items allowed to live?
142 apply_struct(int &items_held)
143 : _empty_baskets(NIL), _any_item(NIL), _items_held(items_held),
144 _decay_interval(0) {}
149 entity_data_bin::entity_data_bin(int max_size_per_entity)
150 : _table(new entity_item_hash(entity_hasher())),
151 _ent_lock(new mutex),
153 _max_per_ent(max_size_per_entity),
157 entity_data_bin::~entity_data_bin()
163 int entity_data_bin::entities() const
166 return _table->elements();
169 struct text_form_accumulator { astring _accum; };
171 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
174 text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
175 shuttle->_accum += bask.text_form();
179 astring entity_data_bin::text_form() const
182 text_form_accumulator shuttle;
183 _table->apply(text_form_applier, &shuttle);
184 return shuttle._accum;
187 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
190 #undef static_class_name
191 #define static_class_name() "entity_data_bin"
192 // FUNCDEF("scramble_applier");
193 int *county = (int *)data_link;
194 *county += bask.elements();
196 #undef static_class_name
199 // this could be extended to do more interesting checks also; currently it's
200 // just like the entities() method really.
201 int entity_data_bin::scramble_counter()
205 _table->apply(scramble_applier, &count);
209 #ifdef DEBUG_ENTITY_DATA_BIN
211 if ( !(_action_count++ % 100) ) { \
212 int items = scramble_counter(); \
213 LOG(a_sprintf("-> %d items counted.", items)); \
219 bool entity_data_bin::add_item(infoton *to_add,
220 const octopus_request_id &orig_id)
222 // FUNCDEF("add_item");
224 // create a record to add to the appropriate bin.
225 infoton_holder *holder = new infoton_holder(orig_id, to_add);
227 // see if a basket already exists for the entity.
228 entity_basket *bask = _table->find(orig_id._entity);
230 // this entity doesn't have a basket so add one.
231 bask = new entity_basket;
232 _table->add(orig_id._entity, bask);
235 bask->_last_active = time_stamp(); // reset activity time.
237 // count up the current amount of data in use.
238 int current_size = 0;
239 for (int i = 0; i < bask->elements(); i++)
240 current_size += bask->borrow(i)->_item->packed_size();
242 if (current_size + to_add->packed_size() > _max_per_ent) {
247 // append the latest production to the list.
248 bask->append(holder);
253 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
256 //#ifdef DEBUG_ENTITY_DATA_BIN
257 // #define static_class_name() "entity_data_bin"
258 // FUNCDEF("any_item_applier");
260 apply_struct *apple = (apply_struct *)data_link;
261 // check the basket to see if it has any items.
262 if (!bask.elements()) {
263 //#ifdef DEBUG_ENTITY_DATA_BIN
264 // LOG(astring("saw empty basket ") + key.mangled_form());
266 return true; // continue iterating.
268 apple->_any_item = &bask;
269 return false; // stop iteration.
270 #undef static_class_name
273 infoton *entity_data_bin::acquire_for_any(octopus_request_id &id)
275 FUNCDEF("acquire_for_any");
277 apply_struct apple(_items_held);
278 _table->apply(any_item_applier, &apple);
279 if (!apple._any_item) return NIL;
281 // retrieve the information from our basket that was provided.
282 infoton_holder *found = apple._any_item->acquire(0);
283 apple._any_item->zap(0, 0);
284 if (!apple._any_item->elements()) {
285 // toss this empty basket.
286 #ifdef DEBUG_ENTITY_DATA_BIN
287 LOG(astring("tossing empty basket ") + found->_id._entity.mangled_form());
289 _table->zap(found->_id._entity);
291 apple._any_item = NIL;
292 infoton *to_return = found->_item;
294 found->_item = NIL; // clear so it won't be whacked.
297 //#ifdef DEBUG_ENTITY_DATA_BIN
299 LOG("logic error: number of items went below zero.");
304 int entity_data_bin::acquire_for_entity(const octopus_entity &requester,
305 infoton_list &items, int maximum_size)
307 // FUNCDEF("acquire_for_entity [multiple]");
308 // this method does not grab the lock because it simply composes other
309 // class methods without interacting with class data members.
311 if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
312 // pick a reasonable default.
313 octopus_request_id id;
315 while (maximum_size > 0) {
316 infoton *inf = acquire_for_entity(requester, id);
319 items.append(new infoton_id_pair(inf, id));
320 maximum_size -= inf->packed_size();
326 infoton *entity_data_bin::acquire_for_entity(const octopus_entity &requester,
327 octopus_request_id &id)
329 FUNCDEF("acquire_for_entity [single]");
330 id = octopus_request_id(); // reset it.
332 infoton *to_return = NIL;
333 entity_basket *bask = _table->find(requester);
337 if (!bask->elements()) {
338 #ifdef DEBUG_ENTITY_DATA_BIN
339 LOG(astring("tossing empty basket ") + requester.mangled_form());
341 _table->zap(requester);
345 id = bask->get(0)->_id;
346 to_return = bask->borrow(0)->_item;
347 bask->borrow(0)->_item = NIL;
349 if (!bask->elements()) {
350 #ifdef DEBUG_ENTITY_DATA_BIN
351 LOG(astring("tossing empty basket ") + requester.mangled_form());
353 _table->zap(requester);
356 //#ifdef DEBUG_ENTITY_DATA_BIN
358 LOG("logic error: number of items went below zero.");
363 infoton *entity_data_bin::acquire_for_identifier(const octopus_request_id &id)
365 FUNCDEF("acquire_for_identifier");
366 infoton *to_return = NIL;
368 entity_basket *bask = _table->find(id._entity);
369 if (!bask) return NIL;
370 if (!bask->elements()) {
371 #ifdef DEBUG_ENTITY_DATA_BIN
372 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
374 _table->zap(id._entity);
377 for (int i = 0; i < bask->elements(); i++) {
378 if (bask->get(i)->_id == id) {
379 to_return = bask->borrow(i)->_item; // snag the item.
380 bask->borrow(i)->_item = NIL; // clear the list's version out.
381 bask->zap(i, i); // whack the sanitized element.
383 if (!bask->elements()) {
384 #ifdef DEBUG_ENTITY_DATA_BIN
385 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
387 _table->zap(id._entity);
390 //#ifdef DEBUG_ENTITY_DATA_BIN
392 LOG("logic error: number of items went below zero.");
400 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
403 #define static_class_name() "entity_data_bin"
404 FUNCDEF("cleaning_applier");
405 apply_struct *apple = (apply_struct *)data_link;
406 time_stamp expiration_time(-apple->_decay_interval);
409 for (int i = 0; i < bask.elements(); i++) {
410 infoton_holder &rec = *bask.borrow(i);
411 if (rec._when_added <= expiration_time) {
412 // if a requester hasn't picked this up in N seconds, then drop it.
413 #ifdef DEBUG_ENTITY_DATA_BIN
414 LOG(astring("whacking old item ") + rec._id.text_form());
417 apple->_items_held--;
418 //#ifdef DEBUG_ENTITY_DATA_BIN
419 if (apple->_items_held < 0)
420 LOG("logic error: number of items went below zero.");
423 i--; // skip back before the delete.
425 // NOTE: this break is based on an assumption about the storage of
426 // items; if it's ever the case in the future that items can be
427 // disordered on time of arrival in the queue, then the break should
432 #ifdef DEBUG_ENTITY_DATA_BIN
434 LOG(a_sprintf("==> whacked %d old items.", whack_count));
436 if (!bask.elements()) {
437 // if the basket has nothing left in it then we signal the parent that
438 // it can be deleted.
439 //LOG("adding to empty basket list.");
440 *apple->_empty_baskets += key;
441 //LOG("added to empty basket list.");
444 // keep iterating on items unless we know it's time to go.
446 #undef static_class_name
449 void entity_data_bin::clean_out_deadwood(int decay_interval)
451 #ifdef DEBUG_ENTITY_DATA_BIN
452 FUNCDEF("clean_out_deadwood");
455 // check that no items have timed out.
456 apply_struct apple(_items_held);
457 basketcase empty_baskets;
458 apple._empty_baskets = &empty_baskets;
459 apple._decay_interval = decay_interval;
460 _table->apply(cleaning_applier, &apple);
462 // clean up any entities whose baskets are empty.
463 for (int i = empty_baskets.length() - 1; i >= 0; i--) {
464 #ifdef DEBUG_ENTITY_DATA_BIN
465 LOG(astring("removing basket ") + empty_baskets.get(i).mangled_form());
467 _table->zap(empty_baskets.get(i));
468 empty_baskets.zap(i, i);
469 // we don't skip back since we're scanning the array from its end.
473 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
476 // FUNCDEF("get_sizes");
480 entity_basket *bask = _table->find(id);
481 if (!bask || !bask->elements()) return false;
482 items = bask->elements();
483 for (int i = 0; i < bask->elements(); i++)
484 bytes += bask->borrow(i)->_item->packed_size();