first check-in of feisty meow codebase. many things broken still due to recent
[feisty_meow.git] / octopi / library / octopus / entity_data_bin.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : entity_data_bin                                                   *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
17 #include "infoton.h"
18 #include "tentacle.h"
19
20 #include <basis/astring.h>
21
22 #include <basis/mutex.h>
23 #include <loggers/program_wide_logger.h>
24 #include <structures/set.h>
25 #include <structures/string_array.h>
26 #include <structures/amorph.h>
27 #include <structures/string_hash.h>
28 #include <textual/parser_bits.h>
29 #include <timely/time_stamp.h>
30
31 using namespace basis;
32 using namespace loggers;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
36
37 namespace octopi {
38
39 //#define DEBUG_ENTITY_DATA_BIN
40   // uncomment for more debugging information.
41
42 #undef GRAB_LOCK
43 #define GRAB_LOCK \
44   auto_synchronizer l(*_ent_lock)
45
46 #undef LOG
47 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
48
49 const int OCTOPUS_TABLE_BITS = 6;
50   // the hash table for items will have 2^N entries.
51
52 //hmmm: parameterize in class interface?
53 ////const int DATA_DECAY_INTERVAL = 4 * MINUTE_ms;
54   // if we haven't gotten a data item out to its entity in this long, then
55   // we assume the entity has croaked or doesn't want its data.
56
57 //////////////
58
59 class infoton_holder
60 {
61 public:
62   infoton *_item;      // the data making up the production.
63   octopus_request_id _id;  // the id, if any, of the original request.
64   time_stamp _when_added;  // when the data became available.
65
66   infoton_holder(const octopus_request_id &id = octopus_request_id(),
67       infoton *item = NIL)
68   : _item(item), _id(id), _when_added() {}
69
70   ~infoton_holder() { WHACK(_item); }
71
72   astring text_form() const {
73     return astring("id=") + _id.text_form() + ", added="
74         + _when_added.text_form() + ", item="
75         + _item->classifier().text_form() + ", data="
76         + _item->text_form();
77   }
78 };
79
80 //////////////
81
82 class entity_basket : public amorph<infoton_holder>
83 {
84 public:
85   time_stamp _last_active;
86
87   astring text_form() const {
88     astring to_return;
89     for (int i = 0; i < elements(); i++)
90       to_return += get(i)->text_form() + parser_bits::platform_eol_to_chars();
91     return to_return;
92   }
93 };
94
95 //////////////
96
97 class entity_hasher : public hashing_algorithm
98 {
99 public:
100   virtual hashing_algorithm *clone() const { return new entity_hasher; }
101
102   virtual basis::un_int hash(const void *key_data, int formal(key_length)) const {
103     octopus_entity *key = (octopus_entity *)key_data;
104     // jiggle the pieces of the id into a number.
105     return basis::un_int(
106         key->process_id()
107         + (key->add_in() << 10)
108         + (key->sequencer() << 14)
109         + (key->hostname()[0] << 20)
110         + (key->hostname()[1] << 24) );
111   }
112 };
113
114 //////////////
115
116 class entity_item_hash
117 : public hash_table<octopus_entity, entity_basket>
118 {
119 public:
120   entity_item_hash(const entity_hasher &hash)
121   : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
122   {}
123 };
124
125 //////////////
126
127 class basketcase : public structures::set<octopus_entity>
128 {
129 public:
130 };
131
132 //////////////
133
134 // used for our apply methods for communicating back to the caller.
135 struct apply_struct
136 {
137   basketcase *_empty_baskets;
138   entity_basket *_any_item;
139   int &_items_held;  // hooks to parent's item count.
140   int _decay_interval;  // how long are items allowed to live?
141
142   apply_struct(int &items_held)
143       : _empty_baskets(NIL), _any_item(NIL), _items_held(items_held),
144         _decay_interval(0) {}
145 };
146
147 //////////////
148
149 entity_data_bin::entity_data_bin(int max_size_per_entity)
150 : _table(new entity_item_hash(entity_hasher())),
151   _ent_lock(new mutex),
152   _action_count(0),
153   _max_per_ent(max_size_per_entity),
154   _items_held(0)
155 {}
156
157 entity_data_bin::~entity_data_bin()
158 {
159   WHACK(_table);
160   WHACK(_ent_lock);
161 }
162
163 int entity_data_bin::entities() const
164 {
165   GRAB_LOCK;
166   return _table->elements();
167 }
168
169 struct text_form_accumulator { astring _accum; };
170
171 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
172     void *data_link)
173 {
174   text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
175   shuttle->_accum += bask.text_form();
176   return true;
177 }
178
179 astring entity_data_bin::text_form() const
180 {
181   GRAB_LOCK;
182   text_form_accumulator shuttle;
183   _table->apply(text_form_applier, &shuttle);
184   return shuttle._accum;
185 }
186
187 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
188     void *data_link)
189 {
190   #undef static_class_name
191   #define static_class_name() "entity_data_bin"
192 //  FUNCDEF("scramble_applier");
193   int *county = (int *)data_link;
194   *county += bask.elements();
195   return true;
196   #undef static_class_name
197 }
198
199 // this could be extended to do more interesting checks also; currently it's
200 // just like the entities() method really.
201 int entity_data_bin::scramble_counter()
202 {
203   GRAB_LOCK;
204   int count = 0;
205   _table->apply(scramble_applier, &count);
206   return count;
207 }
208
209 #ifdef DEBUG_ENTITY_DATA_BIN
210   #define DUMP_STATE \
211     if ( !(_action_count++ % 100) ) { \
212       int items = scramble_counter(); \
213       LOG(a_sprintf("-> %d items counted.", items)); \
214     }
215 #else
216   #define DUMP_STATE
217 #endif
218
219 bool entity_data_bin::add_item(infoton *to_add,
220     const octopus_request_id &orig_id)
221 {
222 //  FUNCDEF("add_item");
223   GRAB_LOCK;
224   // create a record to add to the appropriate bin.
225   infoton_holder *holder = new infoton_holder(orig_id, to_add);
226
227   // see if a basket already exists for the entity.
228   entity_basket *bask = _table->find(orig_id._entity);
229   if (!bask) {
230     // this entity doesn't have a basket so add one.
231     bask = new entity_basket;
232     _table->add(orig_id._entity, bask);
233   }
234
235   bask->_last_active = time_stamp();  // reset activity time.
236
237   // count up the current amount of data in use.
238   int current_size = 0;
239   for (int i = 0; i < bask->elements(); i++)
240     current_size += bask->borrow(i)->_item->packed_size();
241
242   if (current_size + to_add->packed_size() > _max_per_ent) {
243     WHACK(holder);
244     return false;
245   }
246   
247   // append the latest production to the list.
248   bask->append(holder);
249   _items_held++;
250   return true;
251 }
252
253 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
254     void *data_link)
255 {
256 //#ifdef DEBUG_ENTITY_DATA_BIN
257 //  #define static_class_name() "entity_data_bin"
258 //  FUNCDEF("any_item_applier");
259 //#endif
260   apply_struct *apple = (apply_struct *)data_link;
261   // check the basket to see if it has any items.
262   if (!bask.elements()) {
263 //#ifdef DEBUG_ENTITY_DATA_BIN
264 //    LOG(astring("saw empty basket ") + key.mangled_form());
265 //#endif
266     return true;  // continue iterating.
267   }
268   apple->_any_item = &bask;
269   return false;  // stop iteration.
270   #undef static_class_name
271 }
272
273 infoton *entity_data_bin::acquire_for_any(octopus_request_id &id)
274 {
275   FUNCDEF("acquire_for_any");
276   GRAB_LOCK;
277   apply_struct apple(_items_held);
278   _table->apply(any_item_applier, &apple);
279   if (!apple._any_item) return NIL;
280   DUMP_STATE;
281   // retrieve the information from our basket that was provided.
282   infoton_holder *found = apple._any_item->acquire(0);
283   apple._any_item->zap(0, 0);
284   if (!apple._any_item->elements()) {
285     // toss this empty basket.
286 #ifdef DEBUG_ENTITY_DATA_BIN
287     LOG(astring("tossing empty basket ") + found->_id._entity.mangled_form());
288 #endif
289     _table->zap(found->_id._entity);
290   }
291   apple._any_item = NIL;
292   infoton *to_return = found->_item;
293   id = found->_id;
294   found->_item = NIL;  // clear so it won't be whacked.
295   WHACK(found);
296   _items_held--;
297 //#ifdef DEBUG_ENTITY_DATA_BIN
298   if (_items_held < 0)
299     LOG("logic error: number of items went below zero.");
300 //#endif
301   return to_return;
302 }
303
304 int entity_data_bin::acquire_for_entity(const octopus_entity &requester,
305     infoton_list &items, int maximum_size)
306 {
307 //  FUNCDEF("acquire_for_entity [multiple]");
308   // this method does not grab the lock because it simply composes other
309   // class methods without interacting with class data members.
310   items.reset();
311   if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
312     // pick a reasonable default.
313   octopus_request_id id;
314   int items_found = 0;
315   while (maximum_size > 0) {
316     infoton *inf = acquire_for_entity(requester, id);
317     if (!inf)
318       break;  // none left.
319     items.append(new infoton_id_pair(inf, id));    
320     maximum_size -= inf->packed_size();
321     items_found++;
322   }
323   return items_found;
324 }
325
326 infoton *entity_data_bin::acquire_for_entity(const octopus_entity &requester,
327     octopus_request_id &id)
328 {
329   FUNCDEF("acquire_for_entity [single]");
330   id = octopus_request_id();  // reset it.
331   GRAB_LOCK;
332   infoton *to_return = NIL;
333   entity_basket *bask = _table->find(requester);
334   if (!bask) {
335     return NIL;
336   }
337   if (!bask->elements()) {
338 #ifdef DEBUG_ENTITY_DATA_BIN
339     LOG(astring("tossing empty basket ") + requester.mangled_form());
340 #endif
341     _table->zap(requester);
342     return NIL;
343   }
344   DUMP_STATE;
345   id = bask->get(0)->_id;
346   to_return = bask->borrow(0)->_item;
347   bask->borrow(0)->_item = NIL;
348   bask->zap(0, 0);
349   if (!bask->elements()) {
350 #ifdef DEBUG_ENTITY_DATA_BIN
351     LOG(astring("tossing empty basket ") + requester.mangled_form());
352 #endif
353     _table->zap(requester);
354   }
355   _items_held--;
356 //#ifdef DEBUG_ENTITY_DATA_BIN
357   if (_items_held < 0)
358     LOG("logic error: number of items went below zero.");
359 //#endif
360   return to_return;
361 }
362
363 infoton *entity_data_bin::acquire_for_identifier(const octopus_request_id &id)
364 {
365   FUNCDEF("acquire_for_identifier");
366   infoton *to_return = NIL;
367   GRAB_LOCK;
368   entity_basket *bask = _table->find(id._entity);
369   if (!bask) return NIL;
370   if (!bask->elements()) {
371 #ifdef DEBUG_ENTITY_DATA_BIN
372     LOG(astring("tossing empty basket ") + id._entity.mangled_form());
373 #endif
374     _table->zap(id._entity);
375     return NIL;
376   }
377   for (int i = 0; i < bask->elements(); i++) {
378     if (bask->get(i)->_id == id) {
379       to_return = bask->borrow(i)->_item;  // snag the item.
380       bask->borrow(i)->_item = NIL;  // clear the list's version out.
381       bask->zap(i, i);  // whack the sanitized element.
382       DUMP_STATE;
383       if (!bask->elements()) {
384 #ifdef DEBUG_ENTITY_DATA_BIN
385         LOG(astring("tossing empty basket ") + id._entity.mangled_form());
386 #endif
387         _table->zap(id._entity);
388       }
389       _items_held--;
390 //#ifdef DEBUG_ENTITY_DATA_BIN
391       if (_items_held < 0)
392         LOG("logic error: number of items went below zero.");
393 //#endif
394       return to_return;
395     }
396   }
397   return NIL;
398 }
399
400 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
401     void *data_link)
402 {
403   #define static_class_name() "entity_data_bin"
404   FUNCDEF("cleaning_applier");
405   apply_struct *apple = (apply_struct *)data_link;
406   time_stamp expiration_time(-apple->_decay_interval);
407
408   int whack_count = 0;
409   for (int i = 0; i < bask.elements(); i++) {
410     infoton_holder &rec = *bask.borrow(i);
411     if (rec._when_added <= expiration_time) {
412       // if a requester hasn't picked this up in N seconds, then drop it.
413 #ifdef DEBUG_ENTITY_DATA_BIN
414       LOG(astring("whacking old item ") + rec._id.text_form());
415 #endif
416       whack_count++;
417       apple->_items_held--;
418 //#ifdef DEBUG_ENTITY_DATA_BIN
419       if (apple->_items_held < 0)
420         LOG("logic error: number of items went below zero.");
421 //#endif
422       bask.zap(i, i);
423       i--;  // skip back before the delete.
424     } else {
425       // NOTE: this break is based on an assumption about the storage of
426       // items; if it's ever the case in the future that items can be
427       // disordered on time of arrival in the queue, then the break should
428       // be removed.
429       break;
430     }
431   }
432 #ifdef DEBUG_ENTITY_DATA_BIN
433   if (whack_count)
434     LOG(a_sprintf("==> whacked %d old items.", whack_count));
435 #endif
436   if (!bask.elements()) {
437     // if the basket has nothing left in it then we signal the parent that
438     // it can be deleted.
439 //LOG("adding to empty basket list.");
440     *apple->_empty_baskets += key;
441 //LOG("added to empty basket list.");
442   }
443
444   // keep iterating on items unless we know it's time to go.
445   return true;
446   #undef static_class_name
447 }
448
449 void entity_data_bin::clean_out_deadwood(int decay_interval)
450 {
451 #ifdef DEBUG_ENTITY_DATA_BIN
452   FUNCDEF("clean_out_deadwood");
453 #endif
454   GRAB_LOCK;
455   // check that no items have timed out.
456   apply_struct apple(_items_held);
457   basketcase empty_baskets;
458   apple._empty_baskets = &empty_baskets;
459   apple._decay_interval = decay_interval;
460   _table->apply(cleaning_applier, &apple);
461
462   // clean up any entities whose baskets are empty.
463   for (int i = empty_baskets.length() - 1; i >= 0; i--) {
464 #ifdef DEBUG_ENTITY_DATA_BIN
465      LOG(astring("removing basket ") + empty_baskets.get(i).mangled_form());
466 #endif
467     _table->zap(empty_baskets.get(i));
468     empty_baskets.zap(i, i);
469     // we don't skip back since we're scanning the array from its end.
470   }
471 }
472
473 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
474     int &bytes)
475 {
476 //  FUNCDEF("get_sizes");
477   items = 0;
478   bytes = 0;
479   GRAB_LOCK;
480   entity_basket *bask = _table->find(id);
481   if (!bask || !bask->elements()) return false;
482   items = bask->elements();
483   for (int i = 0; i < bask->elements(); i++)
484     bytes += bask->borrow(i)->_item->packed_size();
485   return true;
486 }
487
488 } //namespace.
489