Merge branch 'dev' of feistymeow.org:feisty_meow into dev
[feisty_meow.git] / nucleus / library / processes / post_office.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : post_office                                                       *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 1998-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "ethread.h"
16 #include "letter.h"
17 #include "mailbox.h"
18 #include "post_office.h"
19 #include "thread_cabinet.h"
20
21 #include <basis/mutex.h>
22 #include <configuration/application_configuration.h>
23 #include <loggers/program_wide_logger.h>
24 #include <structures/set.h>
25 #include <structures/amorph.h>
26 #include <structures/unique_id.h>
27 #include <textual/parser_bits.h>
28 #include <timely/time_stamp.h>
29
30 using namespace basis;
31 using namespace configuration;
32 using namespace loggers;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
36
37 namespace processes {
38
39 //#define DEBUG_POST_OFFICE
40   // uncomment if you want the noisy version.
41
42 #undef LOG
43 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger::get(), a)
44
45 const int CLEANING_INTERVAL = 14 * SECOND_ms;
46   // the interval between cleaning of extra letters and dead mailboxes.
47
48 const int SNOOZE_TIME_FOR_POSTMAN = 42;
49   // we'll snooze for this long if absolutely nothing happened during the
50   // thread's activation.  if things are going on, our snooze time is reduced
51   // by the length of time we were delivering items.
52
53 const int DELIVERIES_ALLOWED = 350;
54   // the maximum number of deliveries we'll try to get done per thread run.
55
56 //////////////
57
58 //hmmm: arrhhh--maybe we need to spawn a thread per postal route.
59
60 class postal_carrier : public ethread
61 {
62 public:
63   postal_carrier(post_office &parent, const unique_int &route)
64   : ethread(SNOOZE_TIME_FOR_POSTMAN, ethread::SLACK_INTERVAL),
65     _parent(parent),
66     _route(route)
67   {}
68
69   DEFINE_CLASS_NAME("postal_carrier");
70
71   void perform_activity(void *) {
72     FUNCDEF("perform_activity");
73     bool finished;
74     try {
75       finished = _parent.deliver_mail_on_route(_route, *this); 
76     } catch(...) {
77       LOG(astring("caught exception during mail delivery!"));
78     }
79     if (!finished) {
80       // not finished delivering all items.
81       reschedule();
82     } else {
83       reschedule(SNOOZE_TIME_FOR_POSTMAN);
84     }
85   }
86
87 private:
88   post_office &_parent;
89   unique_int _route;
90 };
91
92 //////////////
93
94 class postal_cache : public mailbox {};
95
96 //////////////
97
98 class tagged_mail_stop : public virtual text_formable
99 {
100 public:
101   mail_stop *_route;
102   unique_int _thread_id;
103   unique_int _id;
104
105   tagged_mail_stop(const unique_int &id = 0, mail_stop *route = NULL_POINTER,
106           const unique_int &thread_id = 0)
107       : _route(route), _thread_id(thread_id), _id(id) {}
108
109   DEFINE_CLASS_NAME("tagged_mail_stop");
110
111   virtual void text_form(basis::base_string &fill) const {
112     fill.assign(text_form());
113   }
114
115   virtual astring text_form() const {
116     return a_sprintf("%s: id=%d, addr=%08lx, thr_id=%d",
117         static_class_name(), _id.raw_id(), _route, _thread_id.raw_id());
118   }
119 };
120
121 //////////////
122
123 class route_map : public amorph<tagged_mail_stop>
124 {
125 public:
126   tagged_mail_stop *find(const unique_int &id) {
127     for (int i = 0; i < elements(); i++) {
128       tagged_mail_stop *curr = borrow(i);
129       if (curr && (curr->_id == id)) return curr;
130     }
131     return NULL_POINTER;
132   }
133
134   bool zap(const unique_int &id) {
135     for (int i = 0; i < elements(); i++) {
136       tagged_mail_stop *curr = borrow(i);
137       if (curr && (curr->_id == id)) {
138         amorph<tagged_mail_stop>::zap(i, i);
139         return true;
140       }
141     }
142     return false;
143   }
144
145 };
146
147 //////////////
148
149 class letter_morph : public amorph<letter> {};
150
151 //////////////
152
153 post_office::post_office()
154 : _post(new mailbox),
155   _routes(new route_map),
156   _next_cleaning(new time_stamp),
157   _threads(new thread_cabinet)
158 {
159 }
160
161 post_office::~post_office()
162 {
163   stop_serving();
164   WHACK(_post);
165   WHACK(_routes);
166   WHACK(_next_cleaning);
167   WHACK(_threads);
168 }
169
170 void post_office::show_routes(astring &to_fill)
171 {
172   auto_synchronizer l(c_mutt);
173 //hmmm: simplify this; just use the int_set returning func and print that.
174   astring current_line;
175   astring temp;
176   if (_routes->elements())
177     to_fill += astring("Mail Delivery Routes:") + parser_bits::platform_eol_to_chars();
178
179   for (int i = 0; i < _routes->elements(); i++) {
180     const tagged_mail_stop *tag = _routes->get(i);
181     if (!tag) continue;
182     temp = astring(astring::SPRINTF, "%d ", tag->_id.raw_id());
183     if (current_line.length() + temp.length() >= 80) {
184       current_line += parser_bits::platform_eol_to_chars();
185       to_fill += current_line;
186       current_line.reset();
187     }
188     current_line += temp;
189   }
190   // catch the last line we created.
191   if (!!current_line) to_fill += current_line;
192 }
193
194 void post_office::stop_serving() { if (_threads) _threads->stop_all(); }
195
196 void post_office::show_mail(astring &output)
197 {
198   output.reset();
199   output += parser_bits::platform_eol_to_chars();
200   output += astring("Mailbox Contents at ") + time_stamp::notarize(true)
201       + parser_bits::platform_eol_to_chars() + parser_bits::platform_eol_to_chars();
202   astring box_state;
203   _post->show(box_state); 
204   if (box_state.t()) output += box_state;
205   else
206     output += astring("No items are awaiting delivery.")
207         + parser_bits::platform_eol_to_chars();
208 }
209
210 void post_office::drop_off(const unique_int &id, letter *package)
211 {
212 #ifdef DEBUG_POST_OFFICE
213   FUNCDEF("drop_off");
214   LOG(astring(astring::SPRINTF, "mailbox drop for %d: ", id)
215       + package->text_form());
216 #endif
217   _post->drop_off(id, package); 
218 #ifdef DEBUG_POST_OFFICE
219   if (!route_listed(id)) {
220     LOG(a_sprintf("letter for %d has no route!", id));
221   }
222 #endif
223 }
224
225 bool post_office::pick_up(const unique_int &id, letter * &package)
226 {
227 #ifdef DEBUG_POST_OFFICE
228   FUNCDEF("pick_up");
229 #endif
230   bool to_return = _post->pick_up(id, package); 
231 #ifdef DEBUG_POST_OFFICE
232   if (to_return)
233     LOG(astring(astring::SPRINTF, "mailbox grab for %d: ", id)
234         + package->text_form());
235 #endif
236   return to_return;
237 }
238
239 bool post_office::route_listed(const unique_int &id)
240 {
241   int_set route_set;
242   get_route_list(route_set);
243   return route_set.member(id.raw_id());
244 }
245
246 void post_office::get_route_list(int_set &route_set)
247 {
248   auto_synchronizer l(c_mutt);
249
250   // gather the set of routes that we should carry mail to.
251   route_set.reset();
252
253   if (!_routes->elements()) {
254     // if there are no elements, why bother iterating?
255     return;
256   }
257
258   for (int i = 0; i < _routes->elements(); i++) {
259     const tagged_mail_stop *tag = _routes->get(i);
260     if (!tag) continue;
261     route_set.add(tag->_id.raw_id());
262   }
263 }
264
265 void post_office::clean_package_list(post_office &formal(post),
266     letter_morph &to_clean)
267 {
268   FUNCDEF("clean_package_list");
269   auto_synchronizer l(c_mutt);
270
271   // recycle all the stuff we had in the list.
272   while (to_clean.elements()) {
273     letter *package = to_clean.acquire(0);
274     to_clean.zap(0, 0);
275     if (!package) {
276       LOG("saw empty package in list to clean!");
277       continue;
278     }
279     WHACK(package);
280   }
281 }
282
283 bool post_office::deliver_mail_on_route(const unique_int &route,
284     ethread &carrier)
285 {
286   FUNCDEF("deliver_mail_on_route");
287   auto_synchronizer l(c_mutt);
288
289 #ifdef DEBUG_POST_OFFICE
290   time_stamp enter;
291 #endif
292   if (carrier.should_stop()) return true;  // get out if thread was told to.
293
294   int deliveries = 0;  // number of items delivered so far.
295   letter_morph items_for_route;
296     // holds the items that need to be sent to this route.
297
298   // pickup all of the mail that we can for this route.
299   while (deliveries < DELIVERIES_ALLOWED) {
300     if (carrier.should_stop())
301       return true;  // get out if thread was told to.
302     letter *package;
303     if (!_post->pick_up(route, package)) {
304       // there are no more letters for this route.
305       break;  // skip out of the loop.
306     }
307     deliveries++;  // count this item as a delivery.
308     items_for_route.append(package);
309   }
310
311   if (!items_for_route.elements()) return true;  // nothing to handle.
312
313   // locate the destination for this route.
314   tagged_mail_stop *real_route = _routes->find(route);  // find the route.
315   if (!real_route) {
316     // we failed to find the route we wanted...
317     LOG(astring(astring::SPRINTF, "route %d disappeared!", route.raw_id()));
318     clean_package_list(*this, items_for_route);
319     return true;
320   }
321
322   // now deliver what we found for this route.
323   for (int t = 0; t < items_for_route.elements(); t++) {
324     if (carrier.should_stop()) {
325       // get out if thread was told to.
326       return true;
327     }
328     letter *package = items_for_route.acquire(t);
329     // hand the package out on the route.
330     mail_stop::items_to_deliver pack(route, package);
331     real_route->_route->invoke_callback(pack);
332       // the callee is responsible for cleaning up.
333   }
334
335   bool finished_all = (deliveries < DELIVERIES_ALLOWED);
336     // true if we handled everything we could have.
337
338   if (carrier.should_stop()) return true;  // get out if thread was told to.
339
340   // this bit is for the post office at large, but we don't want an extra
341   // thread when we've got all these others handy.
342   bool cleaning_time = time_stamp() > *_next_cleaning;
343   if (cleaning_time) {
344     _post->clean_up();  // get rid of dead mailboxes in main post office.
345     _next_cleaning->reset(CLEANING_INTERVAL);
346   }
347
348   time_stamp exit;
349 #ifdef DEBUG_POST_OFFICE
350   int duration = int(exit.value() - enter.value());
351   if (duration > 20)
352     LOG(a_sprintf("deliveries took %d ms.", duration));
353 #endif
354   return finished_all;
355 }
356
357 bool post_office::register_route(const unique_int &id,
358     mail_stop &carrier_path)
359 {
360   auto_synchronizer l(c_mutt);
361
362   tagged_mail_stop *found = _routes->find(id);
363   if (found) return false;  // already exists.
364
365   postal_carrier *po = new postal_carrier(*this, id);
366   unique_int thread_id = _threads->add_thread(po, false, NULL_POINTER);
367     // add the thread so we can record its id.
368   tagged_mail_stop *new_stop = new tagged_mail_stop(id, &carrier_path,
369       thread_id);
370   _routes->append(new_stop);
371     // add the mail stop to our listings.
372   po->start(NULL_POINTER);
373     // now start the thread so it can begin cranking.
374   return true;
375 }
376
377 bool post_office::unregister_route(const unique_int &id)
378 {
379   auto_synchronizer l(c_mutt);
380
381   tagged_mail_stop *tag = _routes->find(id);
382   if (!tag) return false;  // doesn't exist yet.
383   unique_int thread_id = tag->_id;
384   _routes->zap(id);
385   _threads->zap_thread(thread_id);
386   return true;
387 }
388
389 } //namespace.
390
391