1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
20 #include <basis/astring.h>
21 #include <basis/mutex.h>
22 #include <processes/ethread.h>
23 #include <structures/amorph.h>
25 using namespace basis;
26 using namespace processes;
27 using namespace structures;
31 //#define DEBUG_TENTACLE
32 // uncomment for noisier version.
34 #undef GRAB_CONSUMER_LOCK
35 #define GRAB_CONSUMER_LOCK auto_synchronizer l(*_input_guard)
38 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
42 struct infoton_record {
44 octopus_request_id _id;
46 infoton_record(infoton *product, octopus_request_id id)
47 : _product(product), _id(id) {}
49 ~infoton_record() { WHACK(_product); }
52 class queueton : public amorph<infoton_record> {};
56 class pod_motivator : public ethread
59 pod_motivator(tentacle &parent, int motivational_rate)
60 : ethread(motivational_rate, ethread::SLACK_INTERVAL),
63 void perform_activity(void *formal(ptr)) { _parent.propel_arm(); }
71 tentacle::tentacle(const string_array &group_name, bool backgrounded,
72 int motivational_rate)
73 : _group(new string_array(group_name)),
74 _pending(new queueton),
75 _input_guard(new mutex),
78 _backgrounded(backgrounded)
80 // we only start the thread if they've said they'll support backgrounding.
82 _action = new pod_motivator(*this, motivational_rate);
87 if (_action) _action->stop();
94 const string_array &tentacle::group() const { return *_group; }
96 const char *tentacle::outcome_name(const outcome &to_name)
97 { return common::outcome_name(to_name); }
99 int tentacle::motivational_rate() const
100 { if (_action) return _action->sleep_time(); else return 0; }
102 void tentacle::attach_storage(entity_data_bin &storage)
104 _products = &storage;
105 if (_action) _action->start(NIL);
108 void tentacle::detach_storage()
110 if (_action) _action->stop();
114 bool tentacle::store_product(infoton *product,
115 const octopus_request_id &original_id)
117 #ifdef DEBUG_TENTACLE
118 FUNCDEF("store_product");
121 #ifdef DEBUG_TENTACLE
122 LOG("storage bunker has not been established!");
126 return _products->add_item(product, original_id);
129 outcome tentacle::enqueue(infoton *to_chow, const octopus_request_id &item_id)
133 // this may be a bad assumption, but here goes: we assume that the limit
134 // on per entity storage in the bin is pretty much the same as a reasonable
135 // limit here on the queue of pending items. we need to limit it and would
136 // rather not add another numerical parameter to the constructor.
138 max_size = _products->max_bytes_per_entity();
141 // check that the pending queue is also constrained.
142 for (int i = 0; i < _pending->elements(); i++) {
143 curr_size += _pending->borrow(i)->_product->packed_size();
145 if (curr_size + to_chow->packed_size() > max_size) {
150 *_pending += new infoton_record(to_chow, item_id);
151 //is there ever a failure outcome?
152 //yes, when space is tight!
156 infoton *tentacle::next_request(octopus_request_id &item_id)
159 if (!_pending->elements()) return NIL; // nothing to return.
160 infoton *to_return = (*_pending)[0]->_product;
161 (*_pending)[0]->_product = NIL;
162 // clean out so destructor doesn't delete the object.
163 item_id = (*_pending)[0]->_id;
168 void tentacle::propel_arm()
170 #ifdef DEBUG_TENTACLE
171 FUNCDEF("propel_arm");
173 infoton *next_item = NIL;
175 octopus_request_id id;
176 next_item = next_request(id);
177 if (!next_item) break;
179 outcome ret = consume(*next_item, id, ignored);
181 #ifdef DEBUG_TENTACLE
182 LOG(astring("failed to act on ") + next_item->classifier().text_form());
185 WHACK(next_item); // fulfill responsibility for cleanup.