1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 1998-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
18 #include "post_office.h"
19 #include "thread_cabinet.h"
21 #include <basis/mutex.h>
22 #include <configuration/application_configuration.h>
23 #include <loggers/program_wide_logger.h>
24 #include <structures/set.h>
25 #include <structures/amorph.h>
26 #include <structures/unique_id.h>
27 #include <textual/parser_bits.h>
28 #include <timely/time_stamp.h>
30 using namespace basis;
31 using namespace configuration;
32 using namespace loggers;
33 using namespace structures;
34 using namespace textual;
35 using namespace timely;
39 //#define DEBUG_POST_OFFICE
40 // uncomment if you want the noisy version.
43 #define LOG(a) CLASS_EMERGENCY_LOG(program_wide_logger::get(), a)
45 const int CLEANING_INTERVAL = 14 * SECOND_ms;
46 // the interval between cleaning of extra letters and dead mailboxes.
48 const int SNOOZE_TIME_FOR_POSTMAN = 42;
49 // we'll snooze for this long if absolutely nothing happened during the
50 // thread's activation. if things are going on, our snooze time is reduced
51 // by the length of time we were delivering items.
53 const int DELIVERIES_ALLOWED = 350;
54 // the maximum number of deliveries we'll try to get done per thread run.
58 //hmmm: arrhhh--maybe we need to spawn a thread per postal route.
60 class postal_carrier : public ethread
63 postal_carrier(post_office &parent, const unique_int &route)
64 : ethread(SNOOZE_TIME_FOR_POSTMAN, ethread::SLACK_INTERVAL),
69 DEFINE_CLASS_NAME("postal_carrier");
71 void perform_activity(void *) {
72 FUNCDEF("perform_activity");
75 finished = _parent.deliver_mail_on_route(_route, *this);
77 LOG(astring("caught exception during mail delivery!"));
80 // not finished delivering all items.
83 reschedule(SNOOZE_TIME_FOR_POSTMAN);
94 class postal_cache : public mailbox {};
98 class tagged_mail_stop : public virtual text_formable
102 unique_int _thread_id;
105 tagged_mail_stop(const unique_int &id = 0, mail_stop *route = NULL_POINTER,
106 const unique_int &thread_id = 0)
107 : _route(route), _thread_id(thread_id), _id(id) {}
109 DEFINE_CLASS_NAME("tagged_mail_stop");
111 virtual void text_form(basis::base_string &fill) const {
112 fill.assign(text_form());
115 virtual astring text_form() const {
116 return a_sprintf("%s: id=%d, addr=%08lx, thr_id=%d",
117 static_class_name(), _id.raw_id(), _route, _thread_id.raw_id());
123 class route_map : public amorph<tagged_mail_stop>
126 tagged_mail_stop *find(const unique_int &id) {
127 for (int i = 0; i < elements(); i++) {
128 tagged_mail_stop *curr = borrow(i);
129 if (curr && (curr->_id == id)) return curr;
134 bool zap(const unique_int &id) {
135 for (int i = 0; i < elements(); i++) {
136 tagged_mail_stop *curr = borrow(i);
137 if (curr && (curr->_id == id)) {
138 amorph<tagged_mail_stop>::zap(i, i);
149 class letter_morph : public amorph<letter> {};
153 post_office::post_office()
154 : _post(new mailbox),
155 _routes(new route_map),
156 _next_cleaning(new time_stamp),
157 _threads(new thread_cabinet)
161 post_office::~post_office()
166 WHACK(_next_cleaning);
170 void post_office::show_routes(astring &to_fill)
172 auto_synchronizer l(c_mutt);
173 //hmmm: simplify this; just use the int_set returning func and print that.
174 astring current_line;
176 if (_routes->elements())
177 to_fill += astring("Mail Delivery Routes:") + parser_bits::platform_eol_to_chars();
179 for (int i = 0; i < _routes->elements(); i++) {
180 const tagged_mail_stop *tag = _routes->get(i);
182 temp = astring(astring::SPRINTF, "%d ", tag->_id.raw_id());
183 if (current_line.length() + temp.length() >= 80) {
184 current_line += parser_bits::platform_eol_to_chars();
185 to_fill += current_line;
186 current_line.reset();
188 current_line += temp;
190 // catch the last line we created.
191 if (!!current_line) to_fill += current_line;
194 void post_office::stop_serving() { if (_threads) _threads->stop_all(); }
196 void post_office::show_mail(astring &output)
199 output += parser_bits::platform_eol_to_chars();
200 output += astring("Mailbox Contents at ") + time_stamp::notarize(true)
201 + parser_bits::platform_eol_to_chars() + parser_bits::platform_eol_to_chars();
203 _post->show(box_state);
204 if (box_state.t()) output += box_state;
206 output += astring("No items are awaiting delivery.")
207 + parser_bits::platform_eol_to_chars();
210 void post_office::drop_off(const unique_int &id, letter *package)
212 #ifdef DEBUG_POST_OFFICE
214 LOG(astring(astring::SPRINTF, "mailbox drop for %d: ", id)
215 + package->text_form());
217 _post->drop_off(id, package);
218 #ifdef DEBUG_POST_OFFICE
219 if (!route_listed(id)) {
220 LOG(a_sprintf("letter for %d has no route!", id));
225 bool post_office::pick_up(const unique_int &id, letter * &package)
227 #ifdef DEBUG_POST_OFFICE
230 bool to_return = _post->pick_up(id, package);
231 #ifdef DEBUG_POST_OFFICE
233 LOG(astring(astring::SPRINTF, "mailbox grab for %d: ", id)
234 + package->text_form());
239 bool post_office::route_listed(const unique_int &id)
242 get_route_list(route_set);
243 return route_set.member(id.raw_id());
246 void post_office::get_route_list(int_set &route_set)
248 auto_synchronizer l(c_mutt);
250 // gather the set of routes that we should carry mail to.
253 if (!_routes->elements()) {
254 // if there are no elements, why bother iterating?
258 for (int i = 0; i < _routes->elements(); i++) {
259 const tagged_mail_stop *tag = _routes->get(i);
261 route_set.add(tag->_id.raw_id());
265 void post_office::clean_package_list(post_office &formal(post),
266 letter_morph &to_clean)
268 FUNCDEF("clean_package_list");
269 auto_synchronizer l(c_mutt);
271 // recycle all the stuff we had in the list.
272 while (to_clean.elements()) {
273 letter *package = to_clean.acquire(0);
276 LOG("saw empty package in list to clean!");
283 bool post_office::deliver_mail_on_route(const unique_int &route,
286 FUNCDEF("deliver_mail_on_route");
287 auto_synchronizer l(c_mutt);
289 #ifdef DEBUG_POST_OFFICE
292 if (carrier.should_stop()) return true; // get out if thread was told to.
294 int deliveries = 0; // number of items delivered so far.
295 letter_morph items_for_route;
296 // holds the items that need to be sent to this route.
298 // pickup all of the mail that we can for this route.
299 while (deliveries < DELIVERIES_ALLOWED) {
300 if (carrier.should_stop())
301 return true; // get out if thread was told to.
303 if (!_post->pick_up(route, package)) {
304 // there are no more letters for this route.
305 break; // skip out of the loop.
307 deliveries++; // count this item as a delivery.
308 items_for_route.append(package);
311 if (!items_for_route.elements()) return true; // nothing to handle.
313 // locate the destination for this route.
314 tagged_mail_stop *real_route = _routes->find(route); // find the route.
316 // we failed to find the route we wanted...
317 LOG(astring(astring::SPRINTF, "route %d disappeared!", route.raw_id()));
318 clean_package_list(*this, items_for_route);
322 // now deliver what we found for this route.
323 for (int t = 0; t < items_for_route.elements(); t++) {
324 if (carrier.should_stop()) {
325 // get out if thread was told to.
328 letter *package = items_for_route.acquire(t);
329 // hand the package out on the route.
330 mail_stop::items_to_deliver pack(route, package);
331 real_route->_route->invoke_callback(pack);
332 // the callee is responsible for cleaning up.
335 bool finished_all = (deliveries < DELIVERIES_ALLOWED);
336 // true if we handled everything we could have.
338 if (carrier.should_stop()) return true; // get out if thread was told to.
340 // this bit is for the post office at large, but we don't want an extra
341 // thread when we've got all these others handy.
342 bool cleaning_time = time_stamp() > *_next_cleaning;
344 _post->clean_up(); // get rid of dead mailboxes in main post office.
345 _next_cleaning->reset(CLEANING_INTERVAL);
349 #ifdef DEBUG_POST_OFFICE
350 int duration = int(exit.value() - enter.value());
352 LOG(a_sprintf("deliveries took %d ms.", duration));
357 bool post_office::register_route(const unique_int &id,
358 mail_stop &carrier_path)
360 auto_synchronizer l(c_mutt);
362 tagged_mail_stop *found = _routes->find(id);
363 if (found) return false; // already exists.
365 postal_carrier *po = new postal_carrier(*this, id);
366 unique_int thread_id = _threads->add_thread(po, false, NULL_POINTER);
367 // add the thread so we can record its id.
368 tagged_mail_stop *new_stop = new tagged_mail_stop(id, &carrier_path,
370 _routes->append(new_stop);
371 // add the mail stop to our listings.
372 po->start(NULL_POINTER);
373 // now start the thread so it can begin cranking.
377 bool post_office::unregister_route(const unique_int &id)
379 auto_synchronizer l(c_mutt);
381 tagged_mail_stop *tag = _routes->find(id);
382 if (!tag) return false; // doesn't exist yet.
383 unique_int thread_id = tag->_id;
385 _threads->zap_thread(thread_id);