#include "crompish_pax.h"
-#include <basis/chaos.h>
-#include <basis/istring.h>
-#include <basis/portable.h>
-#include <basis/set.h>
+#include <mathematics/chaos.h>
+#include <basis/astring.h>
+
+#include <structures/set.h>
#include <cromp/cromp_client.h>
-#include <mechanisms/ithread.h>
-#include <mechanisms/thread_cabinet.h>
-#include <mechanisms/throughput_counter.h>
+#include <processes/ethread.h>
+#include <processes/thread_cabinet.h>
+#include <sockets/throughput_counter.h>
#include <octopus/entity_data_bin.h>
#include <octopus/entity_defs.h>
#include <octopus/infoton.h>
-#include <opsystem/application_shell.h>
-#include <opsystem/command_line.h>
+#include <application/application_shell.h>
+#include <application/command_line.h>
#include <loggers/console_logger.h>
#include <loggers/file_logger.h>
-#include <opsystem/filename.h>
-#include <opsystem/rendezvous.h>
-#include <data_struct/static_memory_gremlin.h>
-#include <sockets/address.h>
+#include <filesystem/filename.h>
+#include <processes/rendezvous.h>
+#include <structures/static_memory_gremlin.h>
+#include <sockets/internet_address.h>
#include <stdlib.h>
#undef LOG
-#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
+#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
#define DEBUG_TESTER
// uncomment for noisier version.
const int CHANCE_OF_RECONSTRUCT = 14;
// how frequently a bus reconstruction occurs, in 1000.
-#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger(), s)
-#define BASE_LOG(s) EMERGENCY_LOG(program_wide_logger(), s)
+#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
+#define BASE_LOG(s) EMERGENCY_LOG(program_wide_logger::get(), s)
class cromp_client_tester : public application_shell
{
virtual int execute();
- IMPLEMENT_CLASS_NAME("cromp_client_tester");
+ DEFINE_CLASS_NAME("cromp_client_tester");
- void bite_server(basis::set<octopus_request_id> &ids,
- basis::set<octopus_request_id> &delinquents, void *originator);
+ void bite_server(structures::set<octopus_request_id> &ids,
+ structures::set<octopus_request_id> &delinquents, void *originator);
// performs the big chunk of testing. the "ids" are the history of the
// sends that were made and they're managed by this method. the
// "originator" is a tag we can use to generate unique print outs.
FUNCDEF("increment_thread_count");
auto_synchronizer l(*_lock);
_threads_active++;
-//LOG(isprintf("count now %d", _threads_active));
+//LOG(a_sprintf("count now %d", _threads_active));
}
void decrement_thread_count() {
FUNCDEF("decrement_thread_count");
auto_synchronizer l(*_lock);
_threads_active--;
-//LOG(isprintf("count now %d", _threads_active));
+//LOG(a_sprintf("count now %d", _threads_active));
}
void report(const time_stamp &start_time, double bytes_transmitted,
bool _reconstruct_object; //!< true if we periodically tear down object.
internet_address _server_loc; //!< holds onto the requested address.
- void look_for_receipts(int count, basis::set<octopus_request_id> &ids,
- basis::set<octopus_request_id> &delinquents, bool wait = false);
+ void look_for_receipts(int count, structures::set<octopus_request_id> &ids,
+ structures::set<octopus_request_id> &delinquents, bool wait = false);
// attempts to get "count" items from the list of "ids".
};
-////////////////////////////////////////////////////////////////////////////
+//////////////
-class bitey_thread : public ithread
+class bitey_thread : public ethread
{
public:
bitey_thread(cromp_client_tester &parent)
- : ithread(), _parent(parent) {}
+ : ethread(), _parent(parent) {}
void perform_activity(void *formal(ptr)) {
FUNCDEF("perform_activity");
private:
cromp_client_tester &_parent;
- basis::set<octopus_request_id> _ids; // the ids for commands we've sent.
- basis::set<octopus_request_id> _delinquents; // missing ids during rcv.
+ structures::set<octopus_request_id> _ids; // the ids for commands we've sent.
+ structures::set<octopus_request_id> _delinquents; // missing ids during rcv.
};
-////////////////////////////////////////////////////////////////////////////
+//////////////
//hmmm: next stop; inject the types of items they're expecting in grab_items.
-class grabby_thread : public ithread
+class grabby_thread : public ethread
{
public:
grabby_thread(cromp_client_tester &parent)
- : ithread(), _parent(parent) {}
+ : ethread(), _parent(parent) {}
void perform_activity(void *formal(ptr)) {
while (!should_stop()) {
_parent.grab_items();
if (_rando.inclusive(0, 100) > 10)
- portable::sleep_ms(_rando.inclusive(5, 38));
+ time_control::sleep_ms(_rando.inclusive(5, 38));
}
}
chaos _rando;
};
-////////////////////////////////////////////////////////////////////////////
+//////////////
cromp_client_tester::cromp_client_tester()
: application_shell("cromp_client_tester"),
LOG("");
LOG("");
- command_line args(__argc, __argv);
-//LOG(isprintf("argc is %d and first is %s", __argc, __argv[0]));
+ command_line args(application::_global_argc, application::_global_argv);
+//LOG(a_sprintf("argc is %d and first is %s", application::_global_argc, application::_global_argv[0]));
int indy = 0;
if (args.find("help", indy, false)
}
// check for a port on the command line.
- istring port_text;
+ astring port_text;
int port = 5678;
if (args.get_value("port", port_text, false)) {
- LOG(istring("using port: ") + port_text);
+ LOG(astring("using port: ") + port_text);
port = port_text.convert(5678);
}
_server_loc.port = port;
}
// check for a hostname on the command line.
- istring hostname("local");
- istring host_temp;
+ astring hostname("local");
+ astring host_temp;
if (args.get_value("host", host_temp, false)) {
- LOG(istring("using host: ") + host_temp);
+ LOG(astring("using host: ") + host_temp);
hostname = host_temp;
}
-LOG(istring("using host: ") + hostname);
+LOG(astring("using host: ") + hostname);
strcpy(_server_loc.hostname, hostname.s());
- istring send_temp;
+ astring send_temp;
int send_count = MAXIMUM_SENDS;
if (args.get_value("sends", send_temp, false)) {
- LOG(istring("using send count: ") + send_temp);
+ LOG(astring("using send count: ") + send_temp);
send_count = send_temp.convert(send_count);
if (send_count <= 0) send_count = 1;
}
_send_count = send_count;
- istring thread_temp;
+ astring thread_temp;
int thread_count = NUMBER_OF_THREADS;
if (args.get_value("threads", thread_temp, false)) {
- LOG(istring("using thread count: ") + thread_temp);
+ LOG(astring("using thread count: ") + thread_temp);
thread_count = thread_temp.convert(thread_count);
if (thread_count <= 0) thread_count = 1;
}
_thread_count = thread_count;
- istring grabber_temp;
+ astring grabber_temp;
int grabber_count = GRABBER_THREADS;
if (args.get_value("grab", grabber_temp, false)) {
- LOG(istring("using grabber count: ") + grabber_temp);
+ LOG(astring("using grabber count: ") + grabber_temp);
grabber_count = grabber_temp.convert(grabber_count);
if (grabber_count < 0) grabber_count = 0;
}
_grabber_count = grabber_count;
- istring send_tries_temp;
+ astring send_tries_temp;
int send_tries = MAX_SEND_TRIES;
if (args.get_value("trysend", send_tries_temp, false)) {
- LOG(istring("using send tries: ") + send_tries_temp);
+ LOG(astring("using send tries: ") + send_tries_temp);
send_tries = send_tries_temp.convert(send_tries);
if (send_tries < 0) send_tries = 0;
}
//hmmm: how tiresome. how about a macro here? could help in general
// with command_line also.
- istring checkpoint_temp;
+ astring checkpoint_temp;
int checkpoint_count = CHECKPOINT_SIZE;
if (args.get_value("print", checkpoint_temp, false)) {
- LOG(istring("using checkpoint count: ") + checkpoint_temp);
+ LOG(astring("using checkpoint count: ") + checkpoint_temp);
checkpoint_count = checkpoint_temp.convert(checkpoint_count);
if (checkpoint_count <= 0) checkpoint_count = 1;
}
_checkpoint_count = checkpoint_count;
- istring dataseg_temp;
+ astring dataseg_temp;
int dataseg_size = DATA_SEGMENT_SIZE;
if (args.get_value("dataseg", dataseg_temp, false)) {
- LOG(istring("using dataseg size: ") + dataseg_temp);
+ LOG(astring("using dataseg size: ") + dataseg_temp);
dataseg_size = dataseg_temp.convert(dataseg_size);
if (dataseg_size < 0) dataseg_size = 0;
}
_dataseg_size = dataseg_size;
- istring report_temp;
+ astring report_temp;
int report_interval = REPORTING_INTERVAL;
if (args.get_value("report", report_temp, false)) {
- LOG(istring("using report interval: ") + report_temp);
+ LOG(astring("using report interval: ") + report_temp);
report_interval = report_temp.convert(report_interval);
if (report_interval <= 0) report_interval = 1;
report_interval *= SECOND_ms; // convert to milliseconds.
}
_report_interval = report_interval;
- istring snooze_temp;
+ astring snooze_temp;
int snooze_duration = 0; // no snooze by default.
if (args.get_value("snooze", snooze_temp, false)) {
- LOG(istring("using snooze duration: ") + snooze_temp);
+ LOG(astring("using snooze duration: ") + snooze_temp);
snooze_duration = snooze_temp.convert(snooze_duration);
if (snooze_duration < 0) snooze_duration = 0;
}
_reconstruct_object = true;
}
-LOG(istring("opening at ") + _server_loc.text_form());
+LOG(astring("opening at ") + _server_loc.text_form());
_uplink = new cromp_client(_server_loc);
_uplink->add_tentacle(new bubbles_tentacle(false));
int cromp_client_tester::print_instructions()
{
- istring name = filename(__argv[0]).basename().raw();
- log(isprintf("%s usage:", name.s()));
+ astring name = filename(application::_global_argv[0]).basename().raw();
+ log(a_sprintf("%s usage:", name.s()));
log("");
- log(isprintf("\
+ log(a_sprintf("\
This program connects to a cromp test server and exchanges packets to test\n\
the performance of the cromp protocol. All command line flags are optional\n\
but can be added to specify how the test should be performed. Currently,\n\
}
void cromp_client_tester::look_for_receipts(int count,
- basis::set<octopus_request_id> &ids,
- basis::set<octopus_request_id> &delinquents, bool wait)
+ structures::set<octopus_request_id> &ids,
+ structures::set<octopus_request_id> &delinquents, bool wait)
{
FUNCDEF("look_for_receipts");
infoton *received = NIL;
int acquire_duration = int(time_stamp().value() - start_acquire.value());
if (acquire_duration >= MAXIMUM_ACQUISITION_DELAY - 1) {
LOG("passed time limit for acquire! this is the faux dual-cpu bug!");
- LOG(isprintf("there were %d items left to acquire.", count));
- LOG(isprintf("pending %d bytes to send, %d bytes accumulated.",
+ LOG(a_sprintf("there were %d items left to acquire.", count));
+ LOG(a_sprintf("pending %d bytes to send, %d bytes accumulated.",
_uplink->pending_sends(), _uplink->accumulated_bytes()));
- LOG(isprintf("the data bin had %d items awaiting pickup.",
+ LOG(a_sprintf("the data bin had %d items awaiting pickup.",
_uplink->octo()->responses().items_held()));
if (ret != cromp_client::TIMED_OUT) {
LOG("cromp client lied about outcome?? didn't call this timed out!!");
if (ret != cromp_client::OKAY) {
if (ret != cromp_client::TIMED_OUT) {
- LOG(istring("failed to acquire the response--got error ")
+ LOG(astring("failed to acquire the response--got error ")
+ cromp_client::outcome_name(ret));
// give it another chance later.
ids += the_id;
-LOG(isprintf("moved %s back to main id queue.", the_id.text_form().s()));
+LOG(a_sprintf("moved %s back to main id queue.", the_id.text_form().s()));
} else {
if (delinquents.member(the_id))
continuable_error(class_name(), func,
- istring("a delinquent response is still missing: ")
+ astring("a delinquent response is still missing: ")
+ the_id.text_form());
// if we hadn't already seen it, we'll watch for it next time.
delinquents += the_id;
-LOG(isprintf("added %s to delinquents.", the_id.text_form().s()));
+LOG(a_sprintf("added %s to delinquents.", the_id.text_form().s()));
}
return;
}
// check that the right type is coming back to us.
bubble *cast = dynamic_cast<bubble *>(received);
if (!cast) {
- continuable_error(class_name(), func, istring("got the wrong type "
+ continuable_error(class_name(), func, astring("got the wrong type "
"of response: ") + received->classifier().text_form());
}
}
}
-void cromp_client_tester::bite_server(basis::set<octopus_request_id> &ids,
- basis::set<octopus_request_id> &delinquents,
+void cromp_client_tester::bite_server(structures::set<octopus_request_id> &ids,
+ structures::set<octopus_request_id> &delinquents,
void *originator)
{
FUNCDEF("bite_server");
if (failure_count++ < 20) {
sends--; // skip back for the failed one.
overall_sent -= curr_sending; // remove unsent portion.
- LOG(istring("got failure outcome ") + cromp_client::outcome_name(ret)
+ LOG(astring("got failure outcome ") + cromp_client::outcome_name(ret)
+ " from attempt to submit request.");
if (_snooze_duration) {
_uplink->keep_alive_pause(_snooze_duration, 60);
continue; // try again.
}
continuable_error(class_name(), func,
- istring("failed to submit the request--got error ")
+ astring("failed to submit the request--got error ")
+ cromp_client::outcome_name(ret));
break;
}
}
if (! (sends % _checkpoint_count)) {
- BASE_LOG(isprintf("%x send #%d", originator, sends));
+ BASE_LOG(a_sprintf("%x send #%d", originator, sends));
}
}
- BASE_LOG(isprintf("%x final send #%d", originator, _send_count));
+ BASE_LOG(a_sprintf("%x final send #%d", originator, _send_count));
/// LOG(timestamp(true, true) + " done.");
-/// LOG(isprintf("sent %d items.", _send_count));
+/// LOG(a_sprintf("sent %d items.", _send_count));
look_for_receipts(ids.elements(), ids, delinquents);
- LOG(isprintf("concluded %d test requests and responses.", _send_count));
+ LOG(a_sprintf("concluded %d test requests and responses.", _send_count));
}
void cromp_client_tester::grab_items()
// calculate how much space bubble's naming takes up.
byte_array packed_classifier;
- basis::pack(packed_classifier, my_bubble.classifier());
+ structures::pack_array(packed_classifier, my_bubble.classifier());
double classifier_size = packed_classifier.length() - sizeof(int);
// that's how much space is used by our goofy classifier name. there are
// a few bytes extra overhead for packing a string array and we remove
// get additional facts about how much of a packed infoton is wasted.
byte_array packed_infote;
infoton::fast_pack(packed_infote, my_bubble);
- log(isprintf("sane? -- overhead for just packed infoton is %d bytes.",
+ log(a_sprintf("sane? -- overhead for just packed infoton is %d bytes.",
packed_infote.length() - payload_portion));
octopus_request_id example_request(_uplink->entity(), 23982);
byte_array packed_req_id;
example_request.pack(packed_req_id);
- log(isprintf(" -- overhead for octo request id is %d bytes.",
+ log(a_sprintf(" -- overhead for octo request id is %d bytes.",
packed_req_id.length()));
byte_array packed_transa;
cromp_transaction::flatten(packed_transa, my_bubble,
octopus_request_id(_uplink->entity(), 23982));
- log(isprintf(" -- overhead for cromp transation is %d bytes.",
+ log(a_sprintf(" -- overhead for cromp transation is %d bytes.",
packed_transa.length() - payload_portion));
#endif
- BASE_LOG(isprintf("sent %.0f items, %.0f bytes, %.0f bytes per item,%s"
+ BASE_LOG(a_sprintf("sent %.0f items, %.0f bytes, %.0f bytes per item,%s"
"payload %.0f bytes, overhead %.0f bytes, percent overhead %.1f%%,%s"
"in %.2f seconds is %f ms/item%s"
"at %.2f %cb/sec & %.2f items/sec.",
bandwidth.number_of_sends(), bandwidth.bytes_sent(),
bytes_per_item,
- log_base::platform_ending(),
+ parser_bits::platform_eol_to_chars(),
payload_portion, overhead, percent_overhead * 100.0,
- log_base::platform_ending(),
+ parser_bits::platform_eol_to_chars(),
bandwidth.total_time() / SECOND_ms,
bandwidth.total_time() / bandwidth.number_of_sends(),
- log_base::platform_ending(),
+ parser_bits::platform_eol_to_chars(),
(bandwidth.kilobytes_per_second() < 1024.0?
bandwidth.kilobytes_per_second()
: bandwidth.megabytes_per_second()),
FUNCDEF("cause_object_reconstruction");
int rando = chaos().inclusive(1, 100);
if (rando > CHANCE_OF_RECONSTRUCT) return; // not doing it this time.
- LOG(istring("reconstructing client at ") + _server_loc.text_form());
+ LOG(astring("reconstructing client at ") + _server_loc.text_form());
//below is not good when multiple threads are allowed to romp on client.
//// WHACK(_uplink);
//// _uplink = new cromp_client(_server_loc);
while ( (ret != common::OKAY) && (counter-- >= 0) ) {
ret = _uplink->connect();
if (ret != cromp_client::OKAY) {
- LOG(istring("couldn't reconnect this time: ")
+ LOG(astring("couldn't reconnect this time: ")
+ cromp_client::outcome_name(ret));
- portable::sleep_ms(420);
+ time_control::sleep_ms(420);
}
}
}
outcome ret = _uplink->connect();
if (ret != cromp_client::OKAY) {
- deadly_error(class_name(), func, istring("connection failed with error: ")
+ deadly_error(class_name(), func, astring("connection failed with error: ")
+ cromp_client::outcome_name(ret));
}
thread_cabinet cab; // we store a bunch of threads here.
- LOG(isprintf("adding %d grabber threads to test.", _grabber_count));
+ LOG(a_sprintf("adding %d grabber threads to test.", _grabber_count));
// create the extra grabber threads.
for (int i = 0; i < _grabber_count; i++) {
cab.add_thread(to_add, false, NIL);
}
- LOG(isprintf("adding %d transmitter threads to test.", _thread_count));
+ LOG(a_sprintf("adding %d transmitter threads to test.", _thread_count));
// create the specified number of threads.
for (int j = 0; j < _thread_count; j++) {
cab.start_all(NIL);
//LOG("done starting threads...");
- portable::sleep_ms(400); // wait until a few get cranked up.
+ time_control::sleep_ms(400); // wait until a few get cranked up.
//LOG("did our initial sleep...");
while (cab.any_running()) {
- portable::sleep_ms(30);
+ time_control::sleep_ms(30);
if (!_threads_active) {
break;
}
outcome ret = _uplink->connect();
if (ret != cromp_client::OKAY) {
// snooze a bit so as not to drive server crazy or log too much noise.
- portable::sleep_ms(10 * SECOND_ms);
+ time_control::sleep_ms(10 * SECOND_ms);
}
}
}
LOG("- done testing -");
if (_finished_loops != double(_thread_count) * _send_count)
- LOG(isprintf("number of loops was calculated differently: wanted %d, "
+ LOG(a_sprintf("number of loops was calculated differently: wanted %d, "
"got %d", _thread_count * _send_count, _finished_loops));
report(start, cromp_common::total_bytes_sent()
return 0;
}
-////////////////////////////////////////////////////////////////////////////
+//////////////
HOOPLE_MAIN(cromp_client_tester, )