1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
20 #include <basis/astring.h>
21 #include <basis/mutex.h>
22 #include <loggers/program_wide_logger.h>
23 #include <processes/ethread.h>
24 #include <structures/amorph.h>
26 using namespace basis;
27 using namespace loggers;
28 using namespace processes;
29 using namespace structures;
33 //#define DEBUG_TENTACLE
34 // uncomment for noisier version.
// GRAB_CONSUMER_LOCK is (re)defined for this file: it expands to the declaration
// of a local auto_synchronizer named "l" constructed from *_input_guard, so it
// can only be used where the _input_guard member is in scope.
// NOTE(review): presumably the synchronizer holds the mutex for the rest of the
// enclosing scope -- confirm against auto_synchronizer's definition.
36 #undef GRAB_CONSUMER_LOCK
37 #define GRAB_CONSUMER_LOCK auto_synchronizer l(*_input_guard)
// LOG(t) forwards "t" to CLASS_EMERGENCY_LOG together with the logger returned
// by program_wide_logger::get().
40 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
// Pairs an infoton pointer (_product) with the octopus_request_id it was
// queued under; these records are what the pending queue stores.
// NOTE(review): the declaration of _product and the closing brace of the
// struct are not visible in this listing.
44 struct infoton_record {
// the request id kept alongside the product.
46 octopus_request_id _id;
// stores both arguments as given; no copy of the infoton is made here.
48 infoton_record(infoton *product, octopus_request_id id)
49 : _product(product), _id(id) {}
// WHACK is applied to _product when the record is destroyed.  code that wants
// to keep the infoton (see next_request) sets _product to NULL_POINTER first.
51 ~infoton_record() { WHACK(_product); }
// queueton is an amorph of infoton_record objects; it adds no members of its
// own and serves as the type of the tentacle's pending-request queue.
54 class queueton : public amorph<infoton_record> {};
// An ethread subclass whose activity is to call propel_arm() on the tentacle
// it was constructed with.
// NOTE(review): parts of this class (opening brace, access specifier, the
// _parent member and the closing brace) are not visible in this listing.
58 class pod_motivator : public ethread
// motivational_rate is handed to the ethread base along with
// ethread::SLACK_INTERVAL.
61 pod_motivator(tentacle &parent, int motivational_rate)
62 : ethread(motivational_rate, ethread::SLACK_INTERVAL),
// the pointer argument is not used by the body (formal() presumably marks it
// as intentionally unused); all work is delegated to the parent tentacle.
65 void perform_activity(void *formal(ptr)) { _parent.propel_arm(); }
// Constructs a tentacle.
//   group_name: copied into a newly allocated string_array held in _group.
//   backgrounded: stored in _backgrounded.
//   motivational_rate: passed to the pod_motivator when one is created.
// a new queueton (_pending) and a new mutex (_input_guard) are allocated,
// while _action and _products start out as NULL_POINTER.
73 tentacle::tentacle(const string_array &group_name, bool backgrounded,
74 int motivational_rate)
75 : _group(new string_array(group_name)),
76 _pending(new queueton),
77 _input_guard(new mutex),
78 _action(NULL_POINTER),
79 _products(NULL_POINTER),
80 _backgrounded(backgrounded)
82 // we only start the thread if they've said they'll support backgrounding.
// NOTE(review): the condition guarding this allocation is not visible in this
// listing; per the comment above it presumably tests "backgrounded".  the
// motivator is only constructed here -- start() is called on it later, in
// attach_storage().
84 _action = new pod_motivator(*this, motivational_rate);
// NOTE(review): the header of the function containing this statement is not
// visible in this listing (presumably the tentacle destructor -- confirm).
// calls stop() on the motivator if one was created.
89 if (_action) _action->stop();
// Returns a const reference to the group name array held in _group (the copy
// made by the constructor).
96 const string_array &tentacle::group() const { return *_group; }
// Returns the name for the outcome "to_name" by forwarding to
// common::outcome_name(); no tentacle state is consulted.
98 const char *tentacle::outcome_name(const outcome &to_name)
99 { return common::outcome_name(to_name); }
// Returns the motivator's sleep_time() when a motivator (_action) exists;
// returns 0 when there is none.
101 int tentacle::motivational_rate() const
102 { if (_action) return _action->sleep_time(); else return 0; }
// Returns the currently attached storage bin.  the pointer is NULL_POINTER
// until attach_storage() is called and again after detach_storage().
104 entity_data_bin *tentacle::get_storage() { return _products; }
// Attaches "storage" as the bin used for products.  only the address of
// "storage" is kept (no copy is made).  if a motivator exists, start() is
// called on it with NULL_POINTER once the bin is recorded.
// NOTE(review): the braces of this function are not visible in this listing.
106 void tentacle::attach_storage(entity_data_bin &storage)
108 _products = &storage;
109 if (_action) _action->start(NULL_POINTER);
// Detaches the storage bin.  stop() is called on the motivator (if any)
// before _products is reset to NULL_POINTER.
// NOTE(review): presumably the stop comes first so the motivator is not
// running while the bin pointer is cleared -- confirm against ethread::stop().
// the braces of this function are not visible in this listing.
112 void tentacle::detach_storage()
114 if (_action) _action->stop();
115 _products = NULL_POINTER;
// Hands "product" to the attached entity_data_bin under "original_id" and
// returns whatever add_item() returns.
// NOTE(review): the check that leads to the log message below, and the value
// returned on that path, are not visible in this listing.
118 bool tentacle::store_product(infoton *product,
119 const octopus_request_id &original_id)
121 FUNCDEF("store_product");
// the DEBUG_TENTACLE guard on the next line is commented out.
123 //#ifdef DEBUG_TENTACLE
124 LOG("storage bunker has not been established!");
// dereferences _products, so a bin must have been attached by this point.
128 return _products->add_item(product, original_id);
// Adds a request to the pending queue: a new infoton_record holding "to_chow"
// and "item_id" is appended to _pending.  before that, a size limit taken from
// the attached bin's max_bytes_per_entity() is compared against the packed
// sizes of everything already queued plus the new item.
// NOTE(review): the local declarations, the guards around the size check, the
// body of the over-limit branch and the return statements are not visible in
// this listing.
131 outcome tentacle::enqueue(infoton *to_chow, const octopus_request_id &item_id)
135 // this may be a bad assumption, but here goes: we assume that the limit
136 // on per entity storage in the bin is pretty much the same as a reasonable
137 // limit here on the queue of pending items. we need to limit it and would
138 // rather not add another numerical parameter to the constructor.
140 max_size = _products->max_bytes_per_entity();
143 // check that the pending queue is also constrained.
// sums packed_size() over the product of every record currently in _pending.
144 for (int i = 0; i < _pending->elements(); i++) {
145 curr_size += _pending->borrow(i)->_product->packed_size();
// true when adding to_chow would push the queued total past the limit.
147 if (curr_size + to_chow->packed_size() > max_size) {
// the new record takes over the to_chow pointer (see ~infoton_record).
152 *_pending += new infoton_record(to_chow, item_id);
153 //is there ever a failure outcome?
154 //yes, when space is tight!
// Retrieves the oldest pending request: the product pointer of the first
// record in _pending is taken (to_return) and that record's id is copied
// into "item_id".  returns NULL_POINTER when the queue is empty, in which
// case "item_id" is not modified by the visible code.
// NOTE(review): the removal of the record from the queue and the final return
// statement are not visible in this listing.
158 infoton *tentacle::next_request(octopus_request_id &item_id)
161 if (!_pending->elements()) return NULL_POINTER; // nothing to return.
162 infoton *to_return = (*_pending)[0]->_product;
163 (*_pending)[0]->_product = NULL_POINTER;
164 // clean out so destructor doesn't delete the object.
165 item_id = (*_pending)[0]->_id;
// Processes pending requests: each item obtained from next_request() is
// passed to consume() along with its id, and WHACK is then applied to it.
// this is the method that pod_motivator::perform_activity() invokes.
// NOTE(review): the loop construct, the declaration of "ignored" and the test
// on "ret" are not visible in this listing.
170 void tentacle::propel_arm()
172 FUNCDEF("propel_arm");
173 infoton *next_item = NULL_POINTER;
175 octopus_request_id id;
176 next_item = next_request(id);
// a null item means the queue is empty, so processing stops.
177 if (!next_item) break;
179 outcome ret = consume(*next_item, id, ignored);
// the failure message is only compiled in when DEBUG_TENTACLE is defined.
181 #ifdef DEBUG_TENTACLE
182 LOG(astring("failed to act on ") + next_item->classifier().text_form());
185 WHACK(next_item); // fulfill responsibility for cleanup.