feisty meow concerns codebase 2.140
entity_data_bin.cpp
Go to the documentation of this file.
1/*****************************************************************************\
2* *
3* Name : entity_data_bin *
4* Author : Chris Koeritz *
5* *
6*******************************************************************************
7* Copyright (c) 2002-$now By Author. This program is free software; you can *
8* redistribute it and/or modify it under the terms of the GNU General Public *
9* License as published by the Free Software Foundation; either version 2 of *
10* the License or (at your option) any later version. This is online at: *
11* http://www.fsf.org/copyleft/gpl.html *
12* Please send any updates to: fred@gruntose.com *
13\*****************************************************************************/
14
15#include "entity_data_bin.h"
16#include "entity_defs.h"
17#include "infoton.h"
18#include "tentacle.h"
19
20#include <basis/astring.h>
21
22#include <basis/mutex.h>
24#include <structures/set.h>
26#include <structures/amorph.h>
28#include <textual/parser_bits.h>
29#include <timely/time_stamp.h>
30
31using namespace basis;
32using namespace loggers;
33using namespace structures;
34using namespace textual;
35using namespace timely;
36
37namespace octopi {
38
39//#define DEBUG_ENTITY_DATA_BIN
40 // uncomment for more debugging information.
41
42#undef GRAB_LOCK
43#define GRAB_LOCK \
44 auto_synchronizer l(*_ent_lock)
45
46#undef LOG
47#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
48
49const int OCTOPUS_TABLE_BITS = 6;
50 // the hash table for items will have 2^N entries.
51
52//hmmm: parameterize in class interface?
54 // if we haven't gotten a data item out to its entity in this long, then
55 // we assume the entity has croaked or doesn't want its data.
56
58
59class infoton_holder
60{
61public:
62 infoton *_item; // the data making up the production.
63 octopus_request_id _id; // the id, if any, of the original request.
64 time_stamp _when_added; // when the data became available.
65
66 infoton_holder(const octopus_request_id &id = octopus_request_id(),
67 infoton *item = NULL_POINTER)
68 : _item(item), _id(id), _when_added() {}
69
70 ~infoton_holder() { WHACK(_item); }
71
72 astring text_form() const {
73 return astring("id=") + _id.text_form() + ", added="
74 + _when_added.text_form() + ", item="
75 + _item->classifier().text_form() + ", data="
76 + _item->text_form();
77 }
78};
79
81
82class entity_basket : public amorph<infoton_holder>
83{
84public:
85 time_stamp _last_active;
86
87 astring text_form() const {
88 astring to_return;
89 for (int i = 0; i < elements(); i++)
91 return to_return;
92 }
93};
94
96
97class entity_hasher : public hashing_algorithm
98{
99public:
100 virtual hashing_algorithm *clone() const { return new entity_hasher; }
101
102 virtual basis::un_int hash(const void *key_data, int formal(key_length)) const {
103 octopus_entity *key = (octopus_entity *)key_data;
104 // jiggle the pieces of the id into a number.
105 return basis::un_int(
106 key->process_id()
107 + (key->add_in() << 10)
108 + (key->sequencer() << 14)
109 + (key->hostname()[0] << 20)
110 + (key->hostname()[1] << 24) );
111 }
112};
113
115
116class entity_item_hash
117: public hash_table<octopus_entity, entity_basket>
118{
119public:
120 entity_item_hash(const entity_hasher &hash)
121 : hash_table<octopus_entity, entity_basket>(hash, OCTOPUS_TABLE_BITS)
122 {}
123};
124
126
127class basketcase : public structures::set<octopus_entity>
128{
129public:
130};
131
133
134// used for our apply methods for communicating back to the caller.
135struct apply_struct
136{
137 basketcase *_empty_baskets;
138 entity_basket *_any_item;
139 int &_items_held; // hooks to parent's item count.
140 int _decay_interval; // how long are items allowed to live?
141
142 apply_struct(int &items_held)
143 : _empty_baskets(NULL_POINTER), _any_item(NULL_POINTER), _items_held(items_held),
144 _decay_interval(0) {}
145};
146
148
149entity_data_bin::entity_data_bin(int max_size_per_entity)
150: _table(new entity_item_hash(entity_hasher())),
151 _ent_lock(new mutex),
152 _action_count(0),
153 _max_per_ent(max_size_per_entity),
154 _items_held(0)
155{}
156
158{
159 WHACK(_table);
160 WHACK(_ent_lock);
161}
162
164{
165 GRAB_LOCK;
166 return _table->elements();
167}
168
169struct text_form_accumulator { astring _accum; };
170
171bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask,
172 void *data_link)
173{
174 text_form_accumulator *shuttle = (text_form_accumulator *)data_link;
175 shuttle->_accum += bask.text_form();
176 return true;
177}
178
180{
181 GRAB_LOCK;
182 text_form_accumulator shuttle;
183 _table->apply(text_form_applier, &shuttle);
184 return shuttle._accum;
185}
186
187bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask,
188 void *data_link)
189{
190 #undef static_class_name
191 #define static_class_name() "entity_data_bin"
192 FUNCDEF("scramble_applier");
193 int *county = (int *)data_link;
194 *county += bask.elements();
195 return true;
196 #undef static_class_name
197}
198
199// this could be extended to do more interesting checks also; currently it's
200// just like the entities() method really.
201int entity_data_bin::scramble_counter()
202{
203 GRAB_LOCK;
204 int count = 0;
205 _table->apply(scramble_applier, &count);
206 return count;
207}
208
209#ifdef DEBUG_ENTITY_DATA_BIN
210 #define DUMP_STATE \
211 if ( !(_action_count++ % 100) ) { \
212 int items = scramble_counter(); \
213 LOG(a_sprintf("-> %d items counted.", items)); \
214 }
215#else
216 #define DUMP_STATE
217#endif
218
220 const octopus_request_id &orig_id)
221{
222 FUNCDEF("add_item");
223 GRAB_LOCK;
224 // create a record to add to the appropriate bin.
225 infoton_holder *holder = new infoton_holder(orig_id, to_add);
226
227 // see if a basket already exists for the entity.
228 entity_basket *bask = _table->find(orig_id._entity);
229 if (!bask) {
230 // this entity doesn't have a basket so add one.
231 bask = new entity_basket;
232 _table->add(orig_id._entity, bask);
233 }
234
235 bask->_last_active = time_stamp(); // reset activity time.
236
237 // count the current amount of data in use.
238 int current_count = 0; int current_size = 0;
239 bool worked = get_sizes(orig_id._entity, current_count, current_size);
240#ifdef DEBUG_ENTITY_DATA_BIN
241// LOG(a_sprintf("size before add=%d", current_size));
242#endif
243 if (current_size + to_add->packed_size() > _max_per_ent) {
244#ifdef DEBUG_ENTITY_DATA_BIN
245 LOG(a_sprintf("size limit would be exceeded if we stored this product (would grow to %d with limit of %d).", current_size + to_add->packed_size(), _max_per_ent));
246#endif
247 WHACK(holder);
248 return false;
249 }
250
251 // append the latest production to the list.
252 bask->append(holder);
253 _items_held++;
254
255#ifdef DEBUG_ENTITY_DATA_BIN
256// worked = get_sizes(orig_id._entity, current_count, current_size);
257// LOG(a_sprintf("size after add=%d", current_size));
258#endif
259
260 return true;
261}
262
263bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask,
264 void *data_link)
265{
266 #define static_class_name() "entity_data_bin"
267 FUNCDEF("any_item_applier");
268 apply_struct *apple = (apply_struct *)data_link;
269 // check the basket to see if it has any items.
270 if (!bask.elements()) {
271//#ifdef DEBUG_ENTITY_DATA_BIN
272// LOG(astring("saw empty basket ") + key.mangled_form());
273//#endif
274 return true; // continue iterating.
275 }
276 apple->_any_item = &bask;
277 return false; // stop iteration.
278 #undef static_class_name
279}
280
282{
283 FUNCDEF("acquire_for_any");
284 GRAB_LOCK;
285 apply_struct apple(_items_held);
286 _table->apply(any_item_applier, &apple);
287 if (!apple._any_item) return NULL_POINTER;
289#ifdef DEBUG_ENTITY_DATA_BIN
290// int current_count = 0;
291// int current_size = 0;
292// bool worked = get_sizes(id._entity, current_count, current_size);
293// LOG(a_sprintf("size before remove=%d", current_size));
294#endif
295 // retrieve the information from our basket that was provided.
296 infoton_holder *found = apple._any_item->acquire(0);
297 apple._any_item->zap(0, 0);
298 if (!apple._any_item->elements()) {
299 // toss this empty basket.
300#ifdef DEBUG_ENTITY_DATA_BIN
301 LOG(astring("tossing empty basket ") + found->_id._entity.mangled_form());
302#endif
303 _table->zap(found->_id._entity);
304 }
305 apple._any_item = NULL_POINTER;
306 infoton *to_return = found->_item;
307 id = found->_id;
308 found->_item = NULL_POINTER; // clear so it won't be whacked.
309 WHACK(found);
310 _items_held--;
311
312//#ifdef DEBUG_ENTITY_DATA_BIN
313 if (_items_held < 0)
314 LOG("logic error: number of items went below zero.");
315//#endif
316
317#ifdef DEBUG_ENTITY_DATA_BIN
318// worked = get_sizes(id._entity, current_count, current_size);
319// LOG(a_sprintf("size after remove=%d", current_size));
320#endif
321 return to_return;
322}
323
325 infoton_list &items, int maximum_size)
326{
327 FUNCDEF("acquire_for_entity [multiple]");
328 // this method does not grab the lock because it simply composes other
329 // class methods without interacting with class data members.
330 items.reset();
331 if (maximum_size <= 0) maximum_size = 20 * KILOBYTE;
332 // pick a reasonable default.
334 int items_found = 0;
335 while (maximum_size > 0) {
336 infoton *inf = acquire_for_entity(requester, id);
337 if (!inf)
338 break; // none left.
339 items.append(new infoton_id_pair(inf, id));
340 maximum_size -= inf->packed_size();
341 items_found++;
342 }
343 return items_found;
344}
345
348{
349 FUNCDEF("acquire_for_entity [single]");
350 id = octopus_request_id(); // reset it.
351 GRAB_LOCK;
352 infoton *to_return = NULL_POINTER;
353 entity_basket *bask = _table->find(requester);
354 if (!bask) {
355 return NULL_POINTER;
356 }
357 if (!bask->elements()) {
358#ifdef DEBUG_ENTITY_DATA_BIN
359 LOG(astring("tossing empty basket ") + requester.mangled_form());
360#endif
361 _table->zap(requester);
362 return NULL_POINTER;
363 }
365 id = bask->get(0)->_id;
366 to_return = bask->borrow(0)->_item;
367 bask->borrow(0)->_item = NULL_POINTER;
368 bask->zap(0, 0);
369 if (!bask->elements()) {
370#ifdef DEBUG_ENTITY_DATA_BIN
371 LOG(astring("tossing empty basket ") + requester.mangled_form());
372#endif
373 _table->zap(requester);
374 }
375 _items_held--;
376//#ifdef DEBUG_ENTITY_DATA_BIN
377 if (_items_held < 0)
378 LOG("logic error: number of items went below zero.");
379//#endif
380 return to_return;
381}
382
384{
385 FUNCDEF("acquire_for_identifier");
386 infoton *to_return = NULL_POINTER;
387 GRAB_LOCK;
388 entity_basket *bask = _table->find(id._entity);
389 if (!bask) return NULL_POINTER;
390 if (!bask->elements()) {
391#ifdef DEBUG_ENTITY_DATA_BIN
392 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
393#endif
394 _table->zap(id._entity);
395 return NULL_POINTER;
396 }
397 for (int i = 0; i < bask->elements(); i++) {
398 if (bask->get(i)->_id == id) {
399 to_return = bask->borrow(i)->_item; // snag the item.
400 bask->borrow(i)->_item = NULL_POINTER; // clear the list's version out.
401 bask->zap(i, i); // whack the sanitized element.
403 if (!bask->elements()) {
404#ifdef DEBUG_ENTITY_DATA_BIN
405 LOG(astring("tossing empty basket ") + id._entity.mangled_form());
406#endif
407 _table->zap(id._entity);
408 }
409 _items_held--;
410//#ifdef DEBUG_ENTITY_DATA_BIN
411 if (_items_held < 0)
412 LOG("logic error: number of items went below zero.");
413//#endif
414 return to_return;
415 }
416 }
417 return NULL_POINTER;
418}
419
420bool cleaning_applier(const octopus_entity &key, entity_basket &bask,
421 void *data_link)
422{
423 #define static_class_name() "entity_data_bin"
424 FUNCDEF("cleaning_applier");
425 apply_struct *apple = (apply_struct *)data_link;
426 time_stamp expiration_time(-apple->_decay_interval);
427
428 int whack_count = 0;
429 for (int i = 0; i < bask.elements(); i++) {
430 infoton_holder &rec = *bask.borrow(i);
431 if (rec._when_added <= expiration_time) {
432 // if a requester hasn't picked this up in N seconds, then drop it.
433#ifdef DEBUG_ENTITY_DATA_BIN
434 LOG(astring("whacking old item ") + rec._id.text_form());
435#endif
436 whack_count++;
437 apple->_items_held--;
438//#ifdef DEBUG_ENTITY_DATA_BIN
439 if (apple->_items_held < 0)
440 LOG("logic error: number of items went below zero.");
441//#endif
442 bask.zap(i, i);
443 i--; // skip back before the delete.
444 } else {
445 // NOTE: this break is based on an assumption about the storage of
446 // items; if it's ever the case in the future that items can be
447 // disordered on time of arrival in the queue, then the break should
448 // be removed.
449 break;
450 }
451 }
452#ifdef DEBUG_ENTITY_DATA_BIN
453 if (whack_count)
454 LOG(a_sprintf("==> whacked %d old items.", whack_count));
455#endif
456 if (!bask.elements()) {
457 // if the basket has nothing left in it then we signal the parent that
458 // it can be deleted.
459//LOG("adding to empty basket list.");
460 *apple->_empty_baskets += key;
461//LOG("added to empty basket list.");
462 }
463
464 // keep iterating on items unless we know it's time to go.
465 return true;
466 #undef static_class_name
467}
468
470{
471#ifdef DEBUG_ENTITY_DATA_BIN
472 FUNCDEF("clean_out_deadwood");
473#endif
474 GRAB_LOCK;
475 // check that no items have timed out.
476 apply_struct apple(_items_held);
477 basketcase empty_baskets;
478 apple._empty_baskets = &empty_baskets;
479 apple._decay_interval = decay_interval;
480 _table->apply(cleaning_applier, &apple);
481
482 // clean up any entities whose baskets are empty.
483 for (int i = empty_baskets.length() - 1; i >= 0; i--) {
484#ifdef DEBUG_ENTITY_DATA_BIN
485 LOG(astring("removing basket ") + empty_baskets.get(i).mangled_form());
486#endif
487 _table->zap(empty_baskets.get(i));
488 empty_baskets.zap(i, i);
489 // we don't skip back since we're scanning the array from its end.
490 }
491}
492
493bool entity_data_bin::get_sizes(const octopus_entity &id, int &items, int &bytes) const
494{
495 FUNCDEF("get_sizes");
496 items = 0;
497 bytes = 0;
498 GRAB_LOCK;
499 entity_basket *bask = _table->find(id);
500 if (!bask || !bask->elements()) return false;
501 items = bask->elements();
502 for (int i = 0; i < bask->elements(); i++)
503 bytes += bask->borrow(i)->_item->packed_size();
504 return true;
505}
506
507} //namespace.
508
#define LOG(s)
a_sprintf is a specialization of astring that provides printf style support.
Definition astring.h:440
Provides a dynamically resizable ASCII character string.
Definition astring.h:35
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition astring.cpp:130
entity_data_bin(int max_bytes_per_entity)
allows each entity in the bin to have "max_bytes_per_entity" bytes stored.
bool add_item(infoton *to_add, const octopus_request_id &id)
infoton * acquire_for_any(octopus_request_id &id)
infoton * acquire_for_identifier(const octopus_request_id &id)
void clean_out_deadwood(int decay_interval=4 *basis::MINUTE_ms)
basis::astring text_form() const
bool get_sizes(const octopus_entity &id, int &items, int &bytes) const
infoton * acquire_for_entity(const octopus_entity &requester, octopus_request_id &id)
implements a list of waiting infotons.
a list of pending requests and who made them.
An infoton is an individual request parcel with accompanying information.
Definition infoton.h:32
virtual void text_form(basis::base_string &state_fill) const =0
requires derived infotons to be able to show their state as a string.
virtual int packed_size() const =0
reports how large the infoton will be when packed.
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition infoton.cpp:85
Provides a way of identifying users of an octopus object.
Definition entity_defs.h:35
basis::astring mangled_form() const
returns the combined string form of the identifier.
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
octopus_entity _entity
the entity.
int elements() const
the maximum number of elements currently allowed in this amorph.
Definition amorph.h:66
basis::outcome append(const contents *data)
puts "data" on the end of this amorph.
Definition amorph.h:303
void reset()
cleans out all of the contents.
Definition amorph.h:81
const infoton_holder * get(int field) const
Returns a constant pointer to the information at the index "field".
Definition amorph.h:312
Implements hashing into buckets for quick object access.
Definition hash_table.h:75
A hashing algorithm takes a key and derives a related integer from it.
Definition hash_table.h:46
virtual basis::un_int hash(const void *key_data, int key_length) const =0
returns a hash value based on the "key_data" and "key_length".
Emulates a mathematical set, providing several standard set operations.
Definition set.h:36
basis::astring text_form() const
A synonym for the text_format() method.
static const char * platform_eol_to_chars()
provides the characters that make up this platform's line ending.
Represents a point in time relative to the operating system startup time.
Definition time_stamp.h:38
basis::astring text_form(stamp_display_style style=STAMP_RELATIVE) const
returns a simple textual representation of the time_stamp.
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition definitions.h:32
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition enhance_cpp.h:54
#define GRAB_LOCK
#define DUMP_STATE
The guards collection helps in testing preconditions and reporting errors.
Definition array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition functions.h:121
unsigned int un_int
Abbreviated name for unsigned integers.
Definition definitions.h:62
const int KILOBYTE
Number of bytes in a kilobyte.
A logger that sends to the console screen using the standard output device.
bool text_form_applier(const octopus_entity &formal(key), entity_basket &bask, void *data_link)
bool cleaning_applier(const octopus_entity &key, entity_basket &bask, void *data_link)
const int OCTOPUS_TABLE_BITS
bool scramble_applier(const octopus_entity &formal(key), entity_basket &bask, void *data_link)
bool any_item_applier(const octopus_entity &formal(key), entity_basket &bask, void *data_link)
A dynamic container class that holds any kind of object via pointers.
Definition amorph.h:55
#include <time.h>