first check-in of feisty meow codebase. many things broken still due to recent
[feisty_meow.git] / octopi / library / octopus / tentacle.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : tentacle                                                          *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
17 #include "infoton.h"
18 #include "tentacle.h"
19
20 #include <basis/astring.h>
21 #include <basis/mutex.h>
22 #include <processes/ethread.h>
23 #include <structures/amorph.h>
24
25 using namespace basis;
26 using namespace processes;
27 using namespace structures;
28
29 namespace octopi {
30
31 //#define DEBUG_TENTACLE
32   // uncomment for noisier version.
33
34 #undef GRAB_CONSUMER_LOCK
35 #define GRAB_CONSUMER_LOCK auto_synchronizer l(*_input_guard)
36
37 #undef LOG
38 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
39
40 //////////////
41
42 struct infoton_record {
43   infoton *_product;
44   octopus_request_id _id;
45
46   infoton_record(infoton *product, octopus_request_id id)
47       : _product(product), _id(id) {}
48
49   ~infoton_record() { WHACK(_product); }
50 };
51
52 class queueton : public amorph<infoton_record> {};
53
54 //////////////
55
56 class pod_motivator : public ethread
57 {
58 public:
59   pod_motivator(tentacle &parent, int motivational_rate)
60   : ethread(motivational_rate, ethread::SLACK_INTERVAL),
61     _parent(parent) {}
62
63   void perform_activity(void *formal(ptr)) { _parent.propel_arm(); }
64
65 private:
66   tentacle &_parent;
67 };
68
69 //////////////
70
71 tentacle::tentacle(const string_array &group_name, bool backgrounded,
72     int motivational_rate)
73 : _group(new string_array(group_name)),
74   _pending(new queueton),
75   _input_guard(new mutex),
76   _action(NIL),
77   _products(NIL),
78   _backgrounded(backgrounded)
79 {
80   // we only start the thread if they've said they'll support backgrounding.
81   if (backgrounded)
82     _action = new pod_motivator(*this, motivational_rate);
83 }
84
85 tentacle::~tentacle()
86 {
87   if (_action) _action->stop();
88   WHACK(_action);
89   WHACK(_group);
90   WHACK(_pending);
91   WHACK(_input_guard);
92 }
93
94 const string_array &tentacle::group() const { return *_group; }
95
96 const char *tentacle::outcome_name(const outcome &to_name)
97 { return common::outcome_name(to_name); }
98
99 int tentacle::motivational_rate() const
100 { if (_action) return _action->sleep_time(); else return 0; }
101
102 void tentacle::attach_storage(entity_data_bin &storage)
103 {
104   _products = &storage;
105   if (_action) _action->start(NIL);
106 }
107
108 void tentacle::detach_storage()
109 {
110   if (_action) _action->stop();
111   _products = NIL;
112 }
113
114 bool tentacle::store_product(infoton *product,
115     const octopus_request_id &original_id)
116 {
117 #ifdef DEBUG_TENTACLE
118   FUNCDEF("store_product");
119 #endif
120   if (!_products) {
121 #ifdef DEBUG_TENTACLE
122     LOG("storage bunker has not been established!");
123 #endif
124     return false;
125   }
126   return _products->add_item(product, original_id);
127 }
128
129 outcome tentacle::enqueue(infoton *to_chow, const octopus_request_id &item_id)
130 {
131   GRAB_CONSUMER_LOCK;
132   int max_size = 0;
133   // this may be a bad assumption, but here goes: we assume that the limit
134   // on per entity storage in the bin is pretty much the same as a reasonable
135   // limit here on the queue of pending items.  we need to limit it and would
136   // rather not add another numerical parameter to the constructor.
137   if (_products)
138     max_size = _products->max_bytes_per_entity();
139   int curr_size = 0;
140   if (max_size) {
141     // check that the pending queue is also constrained.
142     for (int i = 0; i < _pending->elements(); i++) {
143       curr_size += _pending->borrow(i)->_product->packed_size();
144     }
145     if (curr_size + to_chow->packed_size() > max_size) {
146       WHACK(to_chow);
147       return NO_SPACE;
148     }
149   }
150   *_pending += new infoton_record(to_chow, item_id);
151 //is there ever a failure outcome?
152 //yes, when space is tight!
153   return OKAY;
154 }
155
156 infoton *tentacle::next_request(octopus_request_id &item_id)
157 {
158   GRAB_CONSUMER_LOCK;
159   if (!_pending->elements()) return NIL;  // nothing to return.
160   infoton *to_return = (*_pending)[0]->_product;
161   (*_pending)[0]->_product = NIL;
162     // clean out so destructor doesn't delete the object.
163   item_id = (*_pending)[0]->_id;
164   _pending->zap(0, 0);
165   return to_return;
166 }
167
168 void tentacle::propel_arm()
169 {
170 #ifdef DEBUG_TENTACLE
171   FUNCDEF("propel_arm");
172 #endif
173   infoton *next_item = NIL;
174   do {
175     octopus_request_id id;
176     next_item = next_request(id);
177     if (!next_item) break;
178     byte_array ignored;
179     outcome ret = consume(*next_item, id, ignored);
180     if (ret != OKAY) {
181 #ifdef DEBUG_TENTACLE
182       LOG(astring("failed to act on ") + next_item->classifier().text_form());
183 #endif
184     }
185     WHACK(next_item);  // fulfill responsibility for cleanup.
186   } while (next_item);
187 }
188
189 } //namespace.
190