feisty meow concerns codebase  2.140
post_office.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : post_office *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 1998-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 #include "ethread.h"
16 #include "letter.h"
17 #include "mailbox.h"
18 #include "post_office.h"
19 #include "thread_cabinet.h"
20 
21 #include <basis/mutex.h>
24 #include <structures/set.h>
25 #include <structures/amorph.h>
26 #include <structures/unique_id.h>
27 #include <textual/parser_bits.h>
28 #include <timely/time_stamp.h>
29 
30 using namespace basis;
31 using namespace configuration;
32 using namespace loggers;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
36 
37 namespace processes {
38 
// #define DEBUG_POST_OFFICE
  // enable the definition above to get the noisy version of this file.

// messages logged from this file are sent to the program-wide logger.
#undef LOG
#define LOG(to_print) \
  CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
44 
45 const int CLEANING_INTERVAL = 14 * SECOND_ms;
46  // the interval between cleaning of extra letters and dead mailboxes.
47 
48 const int SNOOZE_TIME_FOR_POSTMAN = 42;
49  // we'll snooze for this long if absolutely nothing happened during the
50  // thread's activation. if things are going on, our snooze time is reduced
51  // by the length of time we were delivering items.
52 
53 const int DELIVERIES_ALLOWED = 350;
54  // the maximum number of deliveries we'll try to get done per thread run.
55 
57 
58 //hmmm: arrhhh--maybe we need to spawn a thread per postal route.
59 
60 class postal_carrier : public ethread
61 {
62 public:
63  postal_carrier(post_office &parent, const unique_int &route)
64  : ethread(SNOOZE_TIME_FOR_POSTMAN, ethread::SLACK_INTERVAL),
65  _parent(parent),
66  _route(route)
67  {}
68 
69  DEFINE_CLASS_NAME("postal_carrier");
70 
71  void perform_activity(void *) {
72  FUNCDEF("perform_activity");
73  bool finished;
74  try {
75  finished = _parent.deliver_mail_on_route(_route, *this);
76  } catch(...) {
77  LOG(astring("caught exception during mail delivery!"));
78  }
79  if (!finished) {
80  // not finished delivering all items.
81  reschedule();
82  } else {
83  reschedule(SNOOZE_TIME_FOR_POSTMAN);
84  }
85  }
86 
87 private:
88  post_office &_parent;
89  unique_int _route;
90 };
91 
93 
94 class postal_cache : public mailbox {};
95 
97 
98 class tagged_mail_stop : public virtual text_formable
99 {
100 public:
101  mail_stop *_route;
102  unique_int _thread_id;
103  unique_int _id;
104 
105  tagged_mail_stop(const unique_int &id = 0, mail_stop *route = NULL_POINTER,
106  const unique_int &thread_id = 0)
107  : _route(route), _thread_id(thread_id), _id(id) {}
108 
109  DEFINE_CLASS_NAME("tagged_mail_stop");
110 
111  virtual void text_form(basis::base_string &fill) const {
112  fill.assign(text_form());
113  }
114 
115  virtual astring text_form() const {
116  return a_sprintf("%s: id=%d, addr=%08lx, thr_id=%d",
117  static_class_name(), _id.raw_id(), _route, _thread_id.raw_id());
118  }
119 };
120 
122 
123 class route_map : public amorph<tagged_mail_stop>
124 {
125 public:
126  tagged_mail_stop *find(const unique_int &id) {
127  for (int i = 0; i < elements(); i++) {
128  tagged_mail_stop *curr = borrow(i);
129  if (curr && (curr->_id == id)) return curr;
130  }
131  return NULL_POINTER;
132  }
133 
134  bool zap(const unique_int &id) {
135  for (int i = 0; i < elements(); i++) {
136  tagged_mail_stop *curr = borrow(i);
137  if (curr && (curr->_id == id)) {
139  return true;
140  }
141  }
142  return false;
143  }
144 
145 };
146 
148 
149 class letter_morph : public amorph<letter> {};
150 
152 
153 post_office::post_office()
154 : _post(new mailbox),
155  _routes(new route_map),
156  _next_cleaning(new time_stamp),
157  _threads(new thread_cabinet)
158 {
159 }
160 
162 {
163  stop_serving();
164  WHACK(_post);
165  WHACK(_routes);
166  WHACK(_next_cleaning);
167  WHACK(_threads);
168 }
169 
171 {
172  auto_synchronizer l(c_mutt);
173 //hmmm: simplify this; just use the int_set returning func and print that.
174  astring current_line;
175  astring temp;
176  if (_routes->elements())
177  to_fill += astring("Mail Delivery Routes:") + parser_bits::platform_eol_to_chars();
178 
179  for (int i = 0; i < _routes->elements(); i++) {
180  const tagged_mail_stop *tag = _routes->get(i);
181  if (!tag) continue;
182  temp = astring(astring::SPRINTF, "%d ", tag->_id.raw_id());
183  if (current_line.length() + temp.length() >= 80) {
184  current_line += parser_bits::platform_eol_to_chars();
185  to_fill += current_line;
186  current_line.reset();
187  }
188  current_line += temp;
189  }
190  // catch the last line we created.
191  if (!!current_line) to_fill += current_line;
192 }
193 
194 void post_office::stop_serving() { if (_threads) _threads->stop_all(); }
195 
197 {
198  output.reset();
199  output += parser_bits::platform_eol_to_chars();
200  output += astring("Mailbox Contents at ") + time_stamp::notarize(true)
201  + parser_bits::platform_eol_to_chars() + parser_bits::platform_eol_to_chars();
202  astring box_state;
203  _post->show(box_state);
204  if (box_state.t()) output += box_state;
205  else
206  output += astring("No items are awaiting delivery.")
207  + parser_bits::platform_eol_to_chars();
208 }
209 
210 void post_office::drop_off(const unique_int &id, letter *package)
211 {
212 #ifdef DEBUG_POST_OFFICE
213  FUNCDEF("drop_off");
214  LOG(astring(astring::SPRINTF, "mailbox drop for %d: ", id)
215  + package->text_form());
216 #endif
217  _post->drop_off(id, package);
218 #ifdef DEBUG_POST_OFFICE
219  if (!route_listed(id)) {
220  LOG(a_sprintf("letter for %d has no route!", id));
221  }
222 #endif
223 }
224 
225 bool post_office::pick_up(const unique_int &id, letter * &package)
226 {
227 #ifdef DEBUG_POST_OFFICE
228  FUNCDEF("pick_up");
229 #endif
230  bool to_return = _post->pick_up(id, package);
231 #ifdef DEBUG_POST_OFFICE
232  if (to_return)
233  LOG(astring(astring::SPRINTF, "mailbox grab for %d: ", id)
234  + package->text_form());
235 #endif
236  return to_return;
237 }
238 
240 {
241  int_set route_set;
242  get_route_list(route_set);
243  return route_set.member(id.raw_id());
244 }
245 
246 void post_office::get_route_list(int_set &route_set)
247 {
248  auto_synchronizer l(c_mutt);
249 
250  // gather the set of routes that we should carry mail to.
251  route_set.reset();
252 
253  if (!_routes->elements()) {
254  // if there are no elements, why bother iterating?
255  return;
256  }
257 
258  for (int i = 0; i < _routes->elements(); i++) {
259  const tagged_mail_stop *tag = _routes->get(i);
260  if (!tag) continue;
261  route_set.add(tag->_id.raw_id());
262  }
263 }
264 
265 void post_office::clean_package_list(post_office &formal(post),
266  letter_morph &to_clean)
267 {
268  FUNCDEF("clean_package_list");
269  auto_synchronizer l(c_mutt);
270 
271  // recycle all the stuff we had in the list.
272  while (to_clean.elements()) {
273  letter *package = to_clean.acquire(0);
274  to_clean.zap(0, 0);
275  if (!package) {
276  LOG("saw empty package in list to clean!");
277  continue;
278  }
279  WHACK(package);
280  }
281 }
282 
284  ethread &carrier)
285 {
286  FUNCDEF("deliver_mail_on_route");
287  auto_synchronizer l(c_mutt);
288 
289 #ifdef DEBUG_POST_OFFICE
290  time_stamp enter;
291 #endif
292  if (carrier.should_stop()) return true; // get out if thread was told to.
293 
294  int deliveries = 0; // number of items delivered so far.
295  letter_morph items_for_route;
296  // holds the items that need to be sent to this route.
297 
298  // pickup all of the mail that we can for this route.
299  while (deliveries < DELIVERIES_ALLOWED) {
300  if (carrier.should_stop())
301  return true; // get out if thread was told to.
302  letter *package;
303  if (!_post->pick_up(route, package)) {
304  // there are no more letters for this route.
305  break; // skip out of the loop.
306  }
307  deliveries++; // count this item as a delivery.
308  items_for_route.append(package);
309  }
310 
311  if (!items_for_route.elements()) return true; // nothing to handle.
312 
313  // locate the destination for this route.
314  tagged_mail_stop *real_route = _routes->find(route); // find the route.
315  if (!real_route) {
316  // we failed to find the route we wanted...
317  LOG(astring(astring::SPRINTF, "route %d disappeared!", route.raw_id()));
318  clean_package_list(*this, items_for_route);
319  return true;
320  }
321 
322  // now deliver what we found for this route.
323  for (int t = 0; t < items_for_route.elements(); t++) {
324  if (carrier.should_stop()) {
325  // get out if thread was told to.
326  return true;
327  }
328  letter *package = items_for_route.acquire(t);
329  // hand the package out on the route.
330  mail_stop::items_to_deliver pack(route, package);
331  real_route->_route->invoke_callback(pack);
332  // the callee is responsible for cleaning up.
333  }
334 
335  bool finished_all = (deliveries < DELIVERIES_ALLOWED);
336  // true if we handled everything we could have.
337 
338  if (carrier.should_stop()) return true; // get out if thread was told to.
339 
340  // this bit is for the post office at large, but we don't want an extra
341  // thread when we've got all these others handy.
342  bool cleaning_time = time_stamp() > *_next_cleaning;
343  if (cleaning_time) {
344  _post->clean_up(); // get rid of dead mailboxes in main post office.
345  _next_cleaning->reset(CLEANING_INTERVAL);
346  }
347 
348  time_stamp exit;
349 #ifdef DEBUG_POST_OFFICE
350  int duration = int(exit.value() - enter.value());
351  if (duration > 20)
352  LOG(a_sprintf("deliveries took %d ms.", duration));
353 #endif
354  return finished_all;
355 }
356 
358  mail_stop &carrier_path)
359 {
360  auto_synchronizer l(c_mutt);
361 
362  tagged_mail_stop *found = _routes->find(id);
363  if (found) return false; // already exists.
364 
365  postal_carrier *po = new postal_carrier(*this, id);
366  unique_int thread_id = _threads->add_thread(po, false, NULL_POINTER);
367  // add the thread so we can record its id.
368  tagged_mail_stop *new_stop = new tagged_mail_stop(id, &carrier_path,
369  thread_id);
370  _routes->append(new_stop);
371  // add the mail stop to our listings.
372  po->start(NULL_POINTER);
373  // now start the thread so it can begin cranking.
374  return true;
375 }
376 
378 {
379  auto_synchronizer l(c_mutt);
380 
381  tagged_mail_stop *tag = _routes->find(id);
382  if (!tag) return false; // doesn't exist yet.
383  unique_int thread_id = tag->_id;
384  _routes->zap(id);
385  _threads->zap_thread(thread_id);
386  return true;
387 }
388 
389 } //namespace.
390 
391 
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
Definition: array.h:349
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
bool t() const
t() is a shortcut for the string being "true", as in non-empty.
Definition: astring.h:97
void reset()
clears out the contents string.
Definition: astring.h:202
int length() const
Returns the current length of the string.
Definition: astring.cpp:132
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition: mutex.h:113
Defines the base class for all string processing objects in hoople.
Definition: base_string.h:28
virtual base_string & assign(const base_string &s)=0
Sets the contents of this string to "s".
A base class for objects that can provide a synopsis of their current state.
Definition: contracts.h:142
Provides a platform-independent object for adding threads to a program.
Definition: ethread.h:36
bool should_stop() const
reports whether the thread should stop right now.
Definition: ethread.h:136
A virtual base class for pieces of "mail". Used by the mailbox object.
Definition: letter.h:25
virtual void text_form(basis::base_string &fill) const =0
derived letters must print a status blurb describing their contents.
Base class for routes on which letters are automatically delivered.
Definition: mail_stop.h:34
Implements a thread safe "mail" delivery system.
Definition: mailbox.h:37
void drop_off(const structures::unique_int &id, letter *package)
drops a "package" in the mailbox for "id".
Definition: mailbox.cpp:166
bool pick_up(const structures::unique_int &id, letter *&package)
returns true if the mailbox for "id" had a "package" to be delivered.
Definition: mailbox.cpp:189
void clean_up()
removes any empty mailboxes from our list.
Definition: mailbox.cpp:172
void show(basis::astring &to_fill)
provides a picture of what's waiting in the mailbox.
Definition: mailbox.cpp:203
Manages a collection of mailboxes and implements delivery routes for mail.
Definition: post_office.h:35
bool pick_up(const structures::unique_int &id, letter *&package)
retrieves a "package" intended for the "id" if one exists.
bool register_route(const structures::unique_int &id, mail_stop &carrier_path)
registers a route "carrier_path" for mail deliveries to the "id".
bool unregister_route(const structures::unique_int &id)
removes a route for the "id".
virtual ~post_office()
stop_serving must be invoked prior to this destructor.
void show_routes(basis::astring &to_fill)
writes a listing of the current routes into "to_fill".
void show_mail(basis::astring &to_fill)
prints a snapshot of all currently pending letters into "to_fill".
void stop_serving()
gets the mailbox to stop delivering items prior to a shutdown.
void drop_off(const structures::unique_int &id, letter *package)
sends a "package" on its way to the "id" via the registered route.
bool route_listed(const structures::unique_int &id)
returns true if there is a route listed for the "id".
bool deliver_mail_on_route(const structures::unique_int &route, ethread &carrier)
for internal use only–delivers the letters to known routes.
Manages a collection of threads.
bool zap_thread(const structures::unique_int &to_whack)
removes the thread with the id "to_whack".
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void stop_all()
makes all of the threads quit.
A simple object that wraps a templated set of ints.
Definition: set.h:156
bool member(const contents &to_test) const
Returns true if the item "to_test" is a member of this set.
Definition: set.h:223
bool add(const contents &to_add)
Adds a new element "to_add" to the set.
Definition: set.h:232
uniquifier raw_id() const
Returns the held identifier in its native form.
Definition: unique_id.h:57
A unique identifier based on integers.
Definition: unique_id.h:97
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
void reset()
sets the stamp time back to now.
Definition: time_stamp.cpp:59
time_representation value() const
returns the time_stamp in terms of the lower level type.
Definition: time_stamp.h:61
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition: definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition: definitions.h:32
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition: enhance_cpp.h:45
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:57
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
const int SECOND_ms
Number of milliseconds in a second.
Definition: definitions.h:120
A logger that sends to the console screen using the standard output device.
const int CLEANING_INTERVAL
Definition: post_office.cpp:45
const int SNOOZE_TIME_FOR_POSTMAN
Definition: post_office.cpp:48
const int DELIVERIES_ALLOWED
Definition: post_office.cpp:53
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
void pack(basis::byte_array &packed_form, const set< contents > &to_pack)
provides a way to pack any set that stores packable objects.
Definition: set.h:131
#include <time.h>
Definition: earth_time.cpp:37
#define LOG(a)
Definition: post_office.cpp:43
#define static_class_name()