fixing extensions
[feisty_meow.git] / octopi / library / octopus / tentacle.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : tentacle                                                          *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "entity_data_bin.h"
16 #include "entity_defs.h"
17 #include "infoton.h"
18 #include "tentacle.h"
19
20 #include <basis/astring.h>
21 #include <basis/mutex.h>
22 #include <loggers/program_wide_logger.h>
23 #include <processes/ethread.h>
24 #include <structures/amorph.h>
25
26 using namespace basis;
27 using namespace loggers;
28 using namespace processes;
29 using namespace structures;
30
31 namespace octopi {
32
33 //#define DEBUG_TENTACLE
34   // uncomment for noisier version.
35
36 #undef GRAB_CONSUMER_LOCK
37 #define GRAB_CONSUMER_LOCK auto_synchronizer l(*_input_guard)
38
39 #undef LOG
40 #define LOG(t) CLASS_EMERGENCY_LOG(program_wide_logger::get(), t)
41
42 //////////////
43
44 struct infoton_record {
45   infoton *_product;
46   octopus_request_id _id;
47
48   infoton_record(infoton *product, octopus_request_id id)
49       : _product(product), _id(id) {}
50
51   ~infoton_record() { WHACK(_product); }
52 };
53
54 class queueton : public amorph<infoton_record> {};
55
56 //////////////
57
58 class pod_motivator : public ethread
59 {
60 public:
61   pod_motivator(tentacle &parent, int motivational_rate)
62   : ethread(motivational_rate, ethread::SLACK_INTERVAL),
63     _parent(parent) {}
64
65   void perform_activity(void *formal(ptr)) { _parent.propel_arm(); }
66
67 private:
68   tentacle &_parent;
69 };
70
71 //////////////
72
73 tentacle::tentacle(const string_array &group_name, bool backgrounded,
74     int motivational_rate)
75 : _group(new string_array(group_name)),
76   _pending(new queueton),
77   _input_guard(new mutex),
78   _action(NULL_POINTER),
79   _products(NULL_POINTER),
80   _backgrounded(backgrounded)
81 {
82   // we only start the thread if they've said they'll support backgrounding.
83   if (backgrounded)
84     _action = new pod_motivator(*this, motivational_rate);
85 }
86
87 tentacle::~tentacle()
88 {
89   if (_action) _action->stop();
90   WHACK(_action);
91   WHACK(_group);
92   WHACK(_pending);
93   WHACK(_input_guard);
94 }
95
96 const string_array &tentacle::group() const { return *_group; }
97
98 const char *tentacle::outcome_name(const outcome &to_name)
99 { return common::outcome_name(to_name); }
100
101 int tentacle::motivational_rate() const
102 { if (_action) return _action->sleep_time(); else return 0; }
103
104 entity_data_bin *tentacle::get_storage() { return _products; }
105
106 void tentacle::attach_storage(entity_data_bin &storage)
107 {
108   _products = &storage;
109   if (_action) _action->start(NULL_POINTER);
110 }
111
112 void tentacle::detach_storage()
113 {
114   if (_action) _action->stop();
115   _products = NULL_POINTER;
116 }
117
118 bool tentacle::store_product(infoton *product,
119     const octopus_request_id &original_id)
120 {
121   FUNCDEF("store_product");
122   if (!_products) {
123 //#ifdef DEBUG_TENTACLE
124     LOG("storage bunker has not been established!");
125 //#endif
126     return false;
127   }
128   return _products->add_item(product, original_id);
129 }
130
131 outcome tentacle::enqueue(infoton *to_chow, const octopus_request_id &item_id)
132 {
133   GRAB_CONSUMER_LOCK;
134   int max_size = 0;
135   // this may be a bad assumption, but here goes: we assume that the limit
136   // on per entity storage in the bin is pretty much the same as a reasonable
137   // limit here on the queue of pending items.  we need to limit it and would
138   // rather not add another numerical parameter to the constructor.
139   if (_products)
140     max_size = _products->max_bytes_per_entity();
141   int curr_size = 0;
142   if (max_size) {
143     // check that the pending queue is also constrained.
144     for (int i = 0; i < _pending->elements(); i++) {
145       curr_size += _pending->borrow(i)->_product->packed_size();
146     }
147     if (curr_size + to_chow->packed_size() > max_size) {
148       WHACK(to_chow);
149       return NO_SPACE;
150     }
151   }
152   *_pending += new infoton_record(to_chow, item_id);
153 //is there ever a failure outcome?
154 //yes, when space is tight!
155   return OKAY;
156 }
157
158 infoton *tentacle::next_request(octopus_request_id &item_id)
159 {
160   GRAB_CONSUMER_LOCK;
161   if (!_pending->elements()) return NULL_POINTER;  // nothing to return.
162   infoton *to_return = (*_pending)[0]->_product;
163   (*_pending)[0]->_product = NULL_POINTER;
164     // clean out so destructor doesn't delete the object.
165   item_id = (*_pending)[0]->_id;
166   _pending->zap(0, 0);
167   return to_return;
168 }
169
170 void tentacle::propel_arm()
171 {
172   FUNCDEF("propel_arm");
173   infoton *next_item = NULL_POINTER;
174   do {
175     octopus_request_id id;
176     next_item = next_request(id);
177     if (!next_item) break;
178     byte_array ignored;
179     outcome ret = consume(*next_item, id, ignored);
180     if (ret != OKAY) {
181 #ifdef DEBUG_TENTACLE
182       LOG(astring("failed to act on ") + next_item->classifier().text_form());
183 #endif
184     }
185     WHACK(next_item);  // fulfill responsibility for cleanup.
186   } while (next_item);
187 }
188
189 } //namespace.
190