using namespace timely;
using namespace unit_test;
+// synchronization for logged messages to avoid overwriting on the console.
+SAFE_STATIC(mutex, __loggers_lock, )
+
// our macros for logging (with or without a timestamp).
-#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(s))
+#define LOG(s) { \
+ auto_synchronizer critical_section(__loggers_lock()); \
+ CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(s)); \
+}
// the base log feature just prints the text to the console with no carriage return or extra flair.
// it does count up how many characters have been printed though, and does an EOL when it seems like it would be reasonable (80 chars-ish).
// terminal so far.
static int chars_printed = 0;
const int MAXIMUM_CHARS_PER_LINE = 108;
-//hmmm: may want to make the line size selectable, if we keep some version of this code around.
SAFE_STATIC(console_logger, ted, );
-SAFE_STATIC(mutex, __teds_lock, )
#define BASE_LOG(s) { \
- auto_synchronizer critical_section(__teds_lock()); \
+ auto_synchronizer critical_section(__loggers_lock()); \
/* we set the eol every time, since console_logger constructor doesn't currently provide. */ \
ted().eol(parser_bits::NO_ENDING); \
astring joe(s); \
chars_printed += len; \
ted().log(joe, basis::ALWAYS_PRINT); \
}
+//hmmm: may want to make the line size selectable, if we keep some version of the above line handling code around.
// protects our logging stream really, by keeping all the threads chomping at the bit rather
// than running right away. when this flag switches to true, then *bam* they're off.
private:
bool _value;
};
-SAFE_STATIC(bool_scared_ya, __time_to_start, (false));
+SAFE_STATIC(bool_scared_ya, __threads_can_run_wild_and_free, (false));
// global constants...
const int MAX_TIDIER_THREAD_PAUSE = 500;
// monk is kept asleep most of the time or he'd be trashing all our data too frequently.
-const int MIN_MONK_THREAD_PAUSE = 42 * SECOND_ms;
-const int MAX_MONK_THREAD_PAUSE = 64 * SECOND_ms;
+const int MIN_MONK_THREAD_PAUSE = 14 * SECOND_ms;
+const int MAX_MONK_THREAD_PAUSE = 28 * SECOND_ms;
// the range of new items added whenever the creator or destroyer threads are hit.
const int MINIMUM_ITEMS_HANDLED = 1;
class ballot_box_stuffer : public ethread
{
public:
- ballot_box_stuffer() : ethread(MIN_ADDER_THREAD_PAUSE) {
+ ballot_box_stuffer() : ethread(MIN_ADDER_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
FUNCDEF("constructor");
- LOG("+creator");
+ LOG(">> new creator >>");
}
virtual ~ballot_box_stuffer() {
FUNCDEF("destructor");
- LOG("~creator");
+ LOG("<< creator exits <<");
}
DEFINE_CLASS_NAME("ballot_box_stuffer");
void perform_activity(void *formal(data)) {
FUNCDEF("perform_activity");
- if (!__time_to_start()) return; // starting gun hasn't fired yet.
+ if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_ADDER_THREAD_PAUSE); return; }
// add a new item to the cache.
int how_many = randomizer().inclusive(MINIMUM_ITEMS_HANDLED,
MAXIMUM_ITEMS_HANDLED);
for (int q = 0; q < string_count; q++) {
random_strings += string_manipulation::make_random_name();
}
+ // check exit sentry again just before adding.
+ if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_ADDER_THREAD_PAUSE); return; }
unhandled_request *newbert = new unhandled_request(create_request_id(),
random_strings);
BASE_LOG("+");
class vote_destroyer : public ethread
{
public:
- vote_destroyer() : ethread(MIN_WHACKER_THREAD_PAUSE) {
+ vote_destroyer() : ethread(MIN_WHACKER_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
FUNCDEF("constructor");
- BASE_LOG("+destroyer");
+ LOG(">> new destroyer >>");
}
virtual ~vote_destroyer() {
FUNCDEF("destructor");
- LOG("~destroyer");
+ LOG("<< destroyer exits <<");
}
DEFINE_CLASS_NAME("vote_destroyer");
void perform_activity(void *formal(data)) {
FUNCDEF("perform_activity");
- if (!__time_to_start()) return; // starting gun hasn't fired yet.
+ if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_WHACKER_THREAD_PAUSE); return; }
int how_many = randomizer().inclusive(MINIMUM_ITEMS_HANDLED,
MAXIMUM_ITEMS_HANDLED);
for (int i = 0; i < how_many; i++) {
+ // check exit sentry again just before removing.
+ if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_WHACKER_THREAD_PAUSE); return; }
// snag any old item and drop it on the floor.
octopus_request_id id;
infoton *found = binger.acquire_for_any(id);
class obsessive_compulsive : public ethread
{
public:
- obsessive_compulsive() : ethread(0) {
+ obsessive_compulsive() : ethread(MIN_TIDIER_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
FUNCDEF("constructor");
- LOG("+cleaner");
+ LOG("<< new cleaner <<");
}
virtual ~obsessive_compulsive() {
FUNCDEF("destructor");
- LOG("~cleaner");
+ LOG(">> cleaner exits >>");
}
DEFINE_CLASS_NAME("obsessive_compulsive");
void perform_activity(void *formal(data)) {
FUNCDEF("perform_activity");
- if (!__time_to_start()) return; // starting gun hasn't fired yet.
+ if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_TIDIER_THREAD_PAUSE); return; }
// make sure there's nothing rotting too long.
binger.clean_out_deadwood(DATA_DECAY_TIME);
// snooze.
class monk_the_detective : public ethread
{
public:
- monk_the_detective() : ethread(0) {
+ monk_the_detective() : ethread(MIN_MONK_THREAD_PAUSE, ethread::TIGHT_INTERVAL) {
FUNCDEF("constructor");
- LOG("+monk");
+ LOG(">> new monk >>");
}
virtual ~monk_the_detective() {
FUNCDEF("destructor");
- LOG("~monk");
+ LOG("<< monk exits <<");
}
DEFINE_CLASS_NAME("monk_the_detective");
void perform_activity(void *formal(data)) {
FUNCDEF("perform_activity");
- if (!__time_to_start()) return; // starting gun hasn't fired yet.
+ if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_MONK_THREAD_PAUSE); return; }
// one activation of monk has devastating consequences. we empty out
// the data one item at a time until we see no data at all. after
// cleaning each item, we ensure that the deadwood is cleaned out.
auto_synchronizer l(binger.locker());
LOG(a_sprintf("monk sees %d items and will clean them all.", binger.items_held()));
+ int check_count = 0;
while (binger.items_held()) {
+ // check exit sentry again just before obsessing over cleanliness.
+ if (!__threads_can_run_wild_and_free()) { sleep_time(MIN_MONK_THREAD_PAUSE); return; }
// grab one instance of any item in the bin.
octopus_request_id id;
infoton *found = binger.acquire_for_any(id);
if (!found) break; // nothing to see here.
+ check_count++;
BASE_LOG("-");
WHACK(found);
// also clean out things a lot faster than normal.
binger.clean_out_deadwood(MONKS_CLEANING_TIME);
+//hmmm: interesting--deadwood cleaning above will possibly cause us not to clean out the number of items reported above for items_held().
}
-LOG(a_sprintf("after a little light cleaning, monk sees %d items.", binger.items_held()));
+LOG(a_sprintf("monk manually cleaned %d items very carefully...", check_count));
+LOG(a_sprintf("after this little light cleaning, monk sees %d items held.", binger.items_held()));
// snooze.
int sleepy_time = randomizer().inclusive(MIN_MONK_THREAD_PAUSE, MAX_MONK_THREAD_PAUSE);
// reschedule the thread for the new snooze. and note how we are not actually stuck waiting
}
}
+ /* we could use a thread_cabinet here, but it's kind of funny that we're using a bare amorph and
+ that its built-in reset() method is shutting down all the threads for us. so leaving this as
+ a nice example. */
amorph<ethread> thread_list;
for (int i = 0; i < DEFAULT_THREADS; i++) {
}
// set our sentinel variable to allow the threads to run now.
- __time_to_start() = true;
+ __threads_can_run_wild_and_free() = true;
time_stamp when_to_leave(duration);
while (when_to_leave > time_stamp()) {
time_control::sleep_ms(20);
}
+ __threads_can_run_wild_and_free() = false;
+
+#ifdef FANCY_UNNECESSARY_THREAD_STOP
+ //hmmm: this code shouldn't be needed! thread cabinet should do it!!!!
LOG("now cancelling all threads...");
-//hmmm: this code shouldn't be needed! thread cabinet should do it!!!!
-///for (int j = 0; j < thread_list.elements(); j++) thread_list[j]->cancel();
-///LOG("now stopping all threads....");
-///for (int k = 0; k < thread_list.elements(); k++) thread_list[k]->stop();
-///LOG("resetting thread list....");
- thread_list.reset(); // should whack all threads.
+ for (int j = 0; j < thread_list.elements(); j++) { thread_list[j]->cancel(); }
+ LOG("now stopping all threads...");
+ for (int k = 0; k < thread_list.elements(); k++) { thread_list[k]->stop(); }
+#endif
- LOG("done exiting from all threads.");
+ LOG("now resetting thread list...");
+ thread_list.reset(); // should whack all threads.
+ LOG("...done exiting from all threads.");
//report the results:
// how many objects created.