feisty meow concerns codebase  2.140
entity_data_bin.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : entity_data_bin *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
17 #include "infoton.h"
18 #include "tentacle.h"
19 
20 #include <basis/astring.h>
21 
22 #include <basis/mutex.h>
24 #include <structures/set.h>
26 #include <structures/amorph.h>
27 #include <structures/string_hash.h>
28 #include <textual/parser_bits.h>
29 #include <timely/time_stamp.h>
30 
31 using namespace basis;
32 using namespace loggers;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
36 
37 namespace octopi {
38 
39 //#define DEBUG_ENTITY_DATA_BIN
40  // uncomment for more debugging information.
41 
// GRAB_LOCK declares a local auto_synchronizer named "l" built from the
// object's *_ent_lock.  Member functions place it at the top of the region
// that touches the table.
// NOTE(review): presumably the synchronizer holds the mutex until the end of
// the enclosing scope -- confirm against basis/mutex.h.
42 #undef GRAB_LOCK
43 #define GRAB_LOCK \
44  auto_synchronizer l(*_ent_lock)
45 
// LOG(s) forwards "s" to CLASS_EMERGENCY_LOG using the program-wide logger.
// NOTE(review): every function in this file that uses LOG also sets up
// FUNCDEF (and static_class_name for the free functions); presumably the
// logging macro needs those names in scope -- confirm.
46 #undef LOG
47 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
48 
// Sizing parameter handed to the hash_table constructor by entity_item_hash.
49 const int OCTOPUS_TABLE_BITS = 6;
50  // the hash table for items will have 2^N entries (2^6 = 64 with this value).
51 
52 //hmmm: parameterize in class interface?
54  // if we haven't gotten a data item out to its entity in this long, then
55  // we assume the entity has croaked or doesn't want its data.
56 
58 
59 class infoton_holder
60 {
61 public:
62  infoton *_item; // the data making up the production.
63  octopus_request_id _id; // the id, if any, of the original request.
64  time_stamp _when_added; // when the data became available.
65 
66  infoton_holder(const octopus_request_id &id = octopus_request_id(),
67  infoton *item = NULL_POINTER)
68  : _item(item), _id(id), _when_added() {}
69 
70  ~infoton_holder() { WHACK(_item); }
71 
72  astring text_form() const {
73  return astring("id=") + _id.text_form() + ", added="
74  + _when_added.text_form() + ", item="
75  + _item->classifier().text_form() + ", data="
76  + _item->text_form();
77  }
78 };
79 
81 
82 class entity_basket : public amorph<infoton_holder>
83 {
84 public:
85  time_stamp _last_active;
86 
87  astring text_form() const {
88  astring to_return;
89  for (int i = 0; i < elements(); i++)
90  to_return += get(i)->text_form() + parser_bits::platform_eol_to_chars();
91  return to_return;
92  }
93 };
94 
96 
97 class entity_hasher : public hashing_algorithm
98 {
99 public:
100  virtual hashing_algorithm *clone() const { return new entity_hasher; }
101 
102  virtual basis::un_int hash(const void *key_data, int formal(key_length)) const {
103  octopus_entity *key = (octopus_entity *)key_data;
104  // jiggle the pieces of the id into a number.
105  return basis::un_int(
106  key->process_id()
107  + (key->add_in() << 10)
108  + (key->sequencer() << 14)
109  + (key->hostname()[0] << 20)
110  + (key->hostname()[1] << 24) );
111  }
112 };
113 
115 
116 class entity_item_hash
117 : public hash_table<octopus_entity, entity_basket>
118 {
119 public:
120  entity_item_hash(const entity_hasher &hash)
121  : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
122  {}
123 };
124 
126 
127 class basketcase : public structures::set<octopus_entity>
128 {
129 public:
130 };
131 
133 
134 // used for our apply methods for communicating back to the caller.
135 struct apply_struct
136 {
137  basketcase *_empty_baskets;
138  entity_basket *_any_item;
139  int &_items_held; // hooks to parent's item count.
140  int _decay_interval; // how long are items allowed to live?
141 
142  apply_struct(int &items_held)
143  : _empty_baskets(NULL_POINTER), _any_item(NULL_POINTER), _items_held(items_held),
144  _decay_interval(0) {}
145 };
146 
148 
149 entity_data_bin::entity_data_bin(int max_size_per_entity)
150 : _table(new entity_item_hash(entity_hasher())),
151  _ent_lock(new mutex),
152  _action_count(0),
153  _max_per_ent(max_size_per_entity),
154  _items_held(0)
155 {}
156 
// NOTE(review): the line holding this definition's signature is missing from
// this listing.  The body releases the two objects the constructor allocates,
// so this is presumably the destructor -- confirm against entity_data_bin.h.
158 {
159  WHACK(_table);  // deletes the entity table and clears the pointer.
160  WHACK(_ent_lock);  // the lock goes last, after the table it protects.
161 }
162 
// NOTE(review): the signature line is missing from this listing; a comment
// further down in this file refers to an "entities() method", which is
// presumably this one -- confirm against entity_data_bin.h.
// Returns the table's element count while holding the lock; the table is
// keyed by entity, so this counts baskets rather than stored items.
164 {
165  GRAB_LOCK;
166  return _table->elements();
167 }
168 
169 struct text_form_accumulator { astring _accum; };
170 
171 bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
172  void *data_link)
173 {
174  text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
175  shuttle->_accum += bask.text_form();
176  return true;
177 }
178 
// NOTE(review): the signature line is missing from this listing; the body
// matches a member declared as "basis::astring text_form() const" -- confirm
// against entity_data_bin.h.
// Builds one string out of the text forms of all baskets, under the lock.
180 {
181  GRAB_LOCK;
182  text_form_accumulator shuttle;
       // text_form_applier appends each basket's text to the shuttle.
183  _table->apply(text_form_applier, &shuttle);
184  return shuttle._accum;
185 }
186 
187 bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
188  void *data_link)
189 {
190  #undef static_class_name
191  #define static_class_name() "entity_data_bin"
192  FUNCDEF("scramble_applier");
193  int *county = (int *)data_link;
194  *county += bask.elements();
195  return true;
196  #undef static_class_name
197 }
198 
199 // this could be extended to do more interesting checks also; currently it's
200 // just like the entities() method really.
201 int entity_data_bin::scramble_counter()
202 {
203  GRAB_LOCK;
204  int count = 0;
205  _table->apply(scramble_applier, &count);
206  return count;
207 }
208 
// DUMP_STATE: when DEBUG_ENTITY_DATA_BIN is defined, every 100th use (as
// counted by _action_count) recounts the stored items with scramble_counter()
// and logs the total; otherwise the macro expands to nothing.
// NOTE(review): DUMP_STATE is used after GRAB_LOCK, and scramble_counter()
// grabs the lock again; presumably the mutex may be re-acquired by the thread
// that already holds it -- confirm against basis/mutex.h.
209 #ifdef DEBUG_ENTITY_DATA_BIN
210  #define DUMP_STATE \
211  if ( !(_action_count++ % 100) ) { \
212  int items = scramble_counter(); \
213  LOG(a_sprintf("-> %d items counted.", items)); \
214  }
215 #else
216  #define DUMP_STATE
217 #endif
218 
// NOTE(review): the first line of this signature is missing from this
// listing; the body matches a member declared as
// "bool add_item(infoton *to_add, const octopus_request_id &id)" -- confirm
// against entity_data_bin.h.
//
// Stores "to_add" in the basket of the entity named by "orig_id".  Returns
// true when the item was stored.  Returns false when the bytes already held
// for that entity plus the new item would exceed _max_per_ent; in that case
// "to_add" is destroyed together with its holder, so the bin takes ownership
// of the infoton on both paths.  "to_add" is dereferenced without a check.
220  const octopus_request_id &orig_id)
221 {
222  FUNCDEF("add_item");
223  GRAB_LOCK;
224  // create a record to add to the appropriate bin.
225  infoton_holder *holder = new infoton_holder(orig_id, to_add);
226 
227  // see if a basket already exists for the entity.
228  entity_basket *bask = _table->find(orig_id._entity);
229  if (!bask) {
230  // this entity doesn't have a basket so add one.
       // note: the new basket stays in the table even if the item is rejected
       // below; an empty basket is removed later by the acquire or cleaning code.
231  bask = new entity_basket;
232  _table->add(orig_id._entity, bask);
233  }
234 
235  bask->_last_active = time_stamp(); // reset activity time.
236 
237  // count up the current amount of data in use.
238  int current_size = 0;
239  for (int i = 0; i < bask->elements(); i++)
240  current_size += bask->borrow(i)->_item->packed_size();
241 
242  if (current_size + to_add->packed_size() > _max_per_ent) {
       // whacking the holder also whacks "to_add" (see ~infoton_holder).
243  WHACK(holder);
244 LOG(astring("size limit would be exceeded if we stored this product"));
245  return false;
246  }
247 
248  // append the latest production to the list.
249  bask->append(holder);
250  _items_held++;
251  return true;
252 }
253 
254 bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
255  void *data_link)
256 {
257  #define static_class_name() "entity_data_bin"
258  FUNCDEF("any_item_applier");
259  apply_struct *apple = (apply_struct *)data_link;
260  // check the basket to see if it has any items.
261  if (!bask.elements()) {
262 //#ifdef DEBUG_ENTITY_DATA_BIN
263 // LOG(astring("saw empty basket ") + key.mangled_form());
264 //#endif
265  return true; // continue iterating.
266  }
267  apple->_any_item = &bask;
268  return false; // stop iteration.
269  #undef static_class_name
270 }
271 
// NOTE(review): the signature line is missing from this listing; the body
// matches a member declared as "infoton *acquire_for_any(octopus_request_id
// &id)" -- confirm against entity_data_bin.h.
//
// Removes the first item of the first non-empty basket found in the table.
// Returns that infoton and stores its request id in "id"; returns
// NULL_POINTER (leaving "id" untouched) when no basket holds anything.
273 {
274  FUNCDEF("acquire_for_any");
275  GRAB_LOCK;
276  apply_struct apple(_items_held);
       // any_item_applier records the first non-empty basket in the struct.
277  _table->apply(any_item_applier, &apple);
278  if (!apple._any_item) return NULL_POINTER;
279  DUMP_STATE;
280  // retrieve the information from our basket that was provided.
       // NOTE(review): acquire() presumably hands back the holder without
       // deleting it, and zap() then drops the vacated slot -- confirm in amorph.h.
281  infoton_holder *found = apple._any_item->acquire(0);
282  apple._any_item->zap(0, 0);
283  if (!apple._any_item->elements()) {
284  // toss this empty basket.
285 #ifdef DEBUG_ENTITY_DATA_BIN
286  LOG(astring("tossing empty basket ") + found->_id._entity.mangled_form());
287 #endif
288  _table->zap(found->_id._entity);
289  }
       // the basket pointer is not used again after this point.
290  apple._any_item = NULL_POINTER;
291  infoton *to_return = found->_item;
292  id = found->_id;
293  found->_item = NULL_POINTER; // clear so it won't be whacked.
294  WHACK(found);
295  _items_held--;
296 //#ifdef DEBUG_ENTITY_DATA_BIN
297  if (_items_held < 0)
298  LOG("logic error: number of items went below zero.");
299 //#endif
300  return to_return;
301 }
302 
// NOTE(review): the first line of this signature is missing from this
// listing, and so is listing line 312, which presumably declared the local
// "id" used in the loop below -- confirm both against the real source file.
//
// Collects pending items for "requester" into "items" (which is emptied
// first) and returns how many were collected.  "maximum_size" is a byte
// budget measured with packed_size(); zero or less selects 20 kilobytes.  The
// budget is tested before each item is taken, so the item that exhausts it is
// still included.
304  infoton_list &items, int maximum_size)
305 {
306  FUNCDEF("acquire_for_entity [multiple]");
307  // this method does not grab the lock because it simply composes other
308  // class methods without interacting with class data members.
309  items.reset();
310  if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
311  // pick a reasonable default.
313  int items_found = 0;
314  while (maximum_size > 0) {
       // each round pulls one item through the single-item overload.
315  infoton *inf = acquire_for_entity(requester, id);
316  if (!inf)
317  break; // none left.
318  items.append(new infoton_id_pair(inf, id));
319  maximum_size -= inf->packed_size();
320  items_found++;
321  }
322  return items_found;
323 }
324 
// NOTE(review): the first line of this signature is missing from this
// listing; the body matches a member declared as "infoton
// *acquire_for_entity(const octopus_entity &requester, octopus_request_id
// &id)" -- confirm against entity_data_bin.h.
//
// Removes and returns the first item waiting for "requester", storing its
// request id in "id".  Returns NULL_POINTER, with "id" reset to a default
// constructed value, when the entity has no basket or the basket is empty.
// A basket found or left empty is removed from the table.
326  octopus_request_id &id)
327 {
328  FUNCDEF("acquire_for_entity [single]");
329  id = octopus_request_id(); // reset it.
330  GRAB_LOCK;
331  infoton *to_return = NULL_POINTER;
332  entity_basket *bask = _table->find(requester);
333  if (!bask) {
334  return NULL_POINTER;
335  }
336  if (!bask->elements()) {
337 #ifdef DEBUG_ENTITY_DATA_BIN
338  LOG(astring("tossing empty basket ") + requester.mangled_form());
339 #endif
340  _table->zap(requester);
341  return NULL_POINTER;
342  }
343  DUMP_STATE;
344  id = bask->get(0)->_id;
345  to_return = bask->borrow(0)->_item;
       // detach the infoton so that zapping the holder does not whack it.
346  bask->borrow(0)->_item = NULL_POINTER;
347  bask->zap(0, 0);
348  if (!bask->elements()) {
349 #ifdef DEBUG_ENTITY_DATA_BIN
350  LOG(astring("tossing empty basket ") + requester.mangled_form());
351 #endif
352  _table->zap(requester);
353  }
354  _items_held--;
355 //#ifdef DEBUG_ENTITY_DATA_BIN
356  if (_items_held < 0)
357  LOG("logic error: number of items went below zero.");
358 //#endif
359  return to_return;
360 }
361 
// NOTE(review): the signature line is missing from this listing; the body
// matches a member declared as "infoton *acquire_for_identifier(const
// octopus_request_id &id)" -- confirm against entity_data_bin.h.
//
// Removes and returns the stored item whose request id equals "id".  Only
// the basket of id._entity is searched.  Returns NULL_POINTER when that
// entity has no basket, the basket is empty, or no held item carries the id.
// A basket found or left empty is removed from the table.
363 {
364  FUNCDEF("acquire_for_identifier");
365  infoton *to_return = NULL_POINTER;
366  GRAB_LOCK;
367  entity_basket *bask = _table->find(id._entity);
368  if (!bask) return NULL_POINTER;
369  if (!bask->elements()) {
370 #ifdef DEBUG_ENTITY_DATA_BIN
371  LOG(astring("tossing empty basket ") + id._entity.mangled_form());
372 #endif
373  _table->zap(id._entity);
374  return NULL_POINTER;
375  }
376  for (int i = 0; i < bask->elements(); i++) {
377  if (bask->get(i)->_id == id) {
378  to_return = bask->borrow(i)->_item; // snag the item.
379  bask->borrow(i)->_item = NULL_POINTER; // clear the list's version out.
380  bask->zap(i, i); // whack the sanitized element.
381  DUMP_STATE;
382  if (!bask->elements()) {
383 #ifdef DEBUG_ENTITY_DATA_BIN
384  LOG(astring("tossing empty basket ") + id._entity.mangled_form());
385 #endif
386  _table->zap(id._entity);
387  }
388  _items_held--;
389 //#ifdef DEBUG_ENTITY_DATA_BIN
390  if (_items_held < 0)
391  LOG("logic error: number of items went below zero.");
392 //#endif
       // the first match wins; later holders are not examined.
393  return to_return;
394  }
395  }
396  return NULL_POINTER;
397 }
398 
399 bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
400  void *data_link)
401 {
402  #define static_class_name() "entity_data_bin"
403  FUNCDEF("cleaning_applier");
404  apply_struct *apple = (apply_struct *)data_link;
405  time_stamp expiration_time(-apple->_decay_interval);
406 
407  int whack_count = 0;
408  for (int i = 0; i < bask.elements(); i++) {
409  infoton_holder &rec = *bask.borrow(i);
410  if (rec._when_added <= expiration_time) {
411  // if a requester hasn't picked this up in N seconds, then drop it.
412 #ifdef DEBUG_ENTITY_DATA_BIN
413  LOG(astring("whacking old item ") + rec._id.text_form());
414 #endif
415  whack_count++;
416  apple->_items_held--;
417 //#ifdef DEBUG_ENTITY_DATA_BIN
418  if (apple->_items_held < 0)
419  LOG("logic error: number of items went below zero.");
420 //#endif
421  bask.zap(i, i);
422  i--; // skip back before the delete.
423  } else {
424  // NOTE: this break is based on an assumption about the storage of
425  // items; if it's ever the case in the future that items can be
426  // disordered on time of arrival in the queue, then the break should
427  // be removed.
428  break;
429  }
430  }
431 #ifdef DEBUG_ENTITY_DATA_BIN
432  if (whack_count)
433  LOG(a_sprintf("==> whacked %d old items.", whack_count));
434 #endif
435  if (!bask.elements()) {
436  // if the basket has nothing left in it then we signal the parent that
437  // it can be deleted.
438 //LOG("adding to empty basket list.");
439  *apple->_empty_baskets += key;
440 //LOG("added to empty basket list.");
441  }
442 
443  // keep iterating on items unless we know it's time to go.
444  return true;
445  #undef static_class_name
446 }
447 
448 void entity_data_bin::clean_out_deadwood(int decay_interval)
449 {
450 #ifdef DEBUG_ENTITY_DATA_BIN
451  FUNCDEF("clean_out_deadwood");
452 #endif
453  GRAB_LOCK;
454  // check that no items have timed out.
455  apply_struct apple(_items_held);
456  basketcase empty_baskets;
457  apple._empty_baskets = &empty_baskets;
458  apple._decay_interval = decay_interval;
459  _table->apply(cleaning_applier, &apple);
460 
461  // clean up any entities whose baskets are empty.
462  for (int i = empty_baskets.length() - 1; i >= 0; i--) {
463 #ifdef DEBUG_ENTITY_DATA_BIN
464  LOG(astring("removing basket ") + empty_baskets.get(i).mangled_form());
465 #endif
466  _table->zap(empty_baskets.get(i));
467  empty_baskets.zap(i, i);
468  // we don't skip back since we're scanning the array from its end.
469  }
470 }
471 
472 bool entity_data_bin::get_sizes(const octopus_entity &id, int &items,
473  int &bytes)
474 {
475  FUNCDEF("get_sizes");
476  items = 0;
477  bytes = 0;
478  GRAB_LOCK;
479  entity_basket *bask = _table->find(id);
480  if (!bask || !bask->elements()) return false;
481  items = bask->elements();
482  for (int i = 0; i < bask->elements(); i++)
483  bytes += bask->borrow(i)->_item->packed_size();
484  return true;
485 }
486 
487 } //namespace.
488 
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
virtual char get(int index) const
a constant peek at the string's internals at the specified index.
Definition: astring.cpp:138
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition: astring.cpp:130
bool add_item(infoton *to_add, const octopus_request_id &id)
infoton * acquire_for_any(octopus_request_id &id)
infoton * acquire_for_identifier(const octopus_request_id &id)
void clean_out_deadwood(int decay_interval=4 *basis::MINUTE_ms)
basis::astring text_form() const
bool get_sizes(const octopus_entity &id, int &items, int &bytes)
infoton * acquire_for_entity(const octopus_entity &requester, octopus_request_id &id)
implements a list of waiting infotons.
Definition: entity_defs.h:166
a list of pending requests and who made them.
Definition: entity_defs.h:181
An infoton is an individual request parcel with accompanying information.
Definition: infoton.h:32
virtual void text_form(basis::base_string &state_fill) const =0
requires derived infotons to be able to show their state as a string.
virtual int packed_size() const =0
reports how large the infoton will be when packed.
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition: infoton.cpp:85
Provides a way of identifying users of an octopus object.
Definition: entity_defs.h:35
basis::astring mangled_form() const
returns the combined string form of the identifier.
Identifies requests made on an octopus by users.
Definition: entity_defs.h:114
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
Definition: entity_defs.h:116
basis::outcome append(const contents *data)
puts "data" on the end of this amorph.
Definition: amorph.h:303
void reset()
cleans out all of the contents.
Definition: amorph.h:81
Implements hashing into buckets for quick object access.
Definition: hash_table.h:70
A hashing algorithm takes a key and derives a related integer from it.
Definition: hash_table.h:41
Emulates a mathematical set, providing several standard set operations.
Definition: set.h:36
basis::astring text_form() const
A synonym for the text_format() method.
Definition: string_array.h:71
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
basis::astring text_form(stamp_display_style style=STAMP_RELATIVE) const
returns a simple textual representation of the time_stamp.
Definition: time_stamp.cpp:61
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition: definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:57
#define GRAB_LOCK
#define LOG(s)
#define DUMP_STATE
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
unsigned int un_int
Abbreviated name for unsigned integers.
Definition: definitions.h:62
const int KILOBYTE
Number of bytes in a kilobyte.
Definition: definitions.h:134
A logger that sends to the console screen using the standard output device.
bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask, void *data_link)
bool cleaning_applier(const octopus_entity &key, entity_basket &bask, void *data_link)
const int OCTOPUS_TABLE_BITS
bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask, void *data_link)
bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask, void *data_link)
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
#include <time.h>
Definition: earth_time.cpp:37