feisty meow concerns codebase 2.140
test_cromp_client.cpp
Go to the documentation of this file.
1/*****************************************************************************\
2* *
3* Name : test_cromp_client *
4* Author : Chris Koeritz *
5* *
6*******************************************************************************
7* Copyright (c) 2002-$now By Author. This program is free software; you can *
8* redistribute it and/or modify it under the terms of the GNU General Public *
9* License as published by the Free Software Foundation; either version 2 of *
10* the License or (at your option) any later version. This is online at: *
11* http://www.fsf.org/copyleft/gpl.html *
12* Please send any updates to: fred@gruntose.com *
13\*****************************************************************************/
14
15#include "crompish_pax.h"
16
17#include <mathematics/chaos.h>
18#include <basis/astring.h>
19
23#include <cromp/cromp_client.h>
24#include <filesystem/filename.h>
26#include <loggers/file_logger.h>
29#include <octopus/entity_defs.h>
30#include <octopus/infoton.h>
31#include <processes/ethread.h>
37#include <structures/set.h>
38#include <timely/time_control.h>
39#include <unit_test/unit_base.h>
40
41#include <stdlib.h>
42
43using namespace application;
44using namespace basis;
45using namespace configuration;
46using namespace cromp;
47using namespace mathematics;
48using namespace filesystem;
49using namespace loggers;
50using namespace octopi;
51using namespace processes;
52using namespace sockets;
53using namespace structures;
54using namespace textual;
55using namespace timely;
56using namespace unit_test;
57
58#undef LOG
59#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), astring(s))
60
61#define DEBUG_TESTER
62 // uncomment for noisier version.
63
64// the number of transactions to send during a test. if timing connection
65// duration, then use a maximum of 1. if timing speed of operation once
66// connected, use a large number.
67//const int MAXIMUM_SENDS = 10008;
68const int MAXIMUM_SENDS = 100008;
69//const int MAXIMUM_SENDS = 10000008;
70 // have had success with up to 10000000 sends using small data segments.
71
72const int NUMBER_OF_THREADS = 1;
73//const int NUMBER_OF_THREADS = 10;
74//const int NUMBER_OF_THREADS = 20;
75 // the number of simultaneous actors on the single cromp_client.
76
77//const int GRABBER_THREADS = 5;
78const int GRABBER_THREADS = 0;
79 // the number of threads that just pluck at the cromp_client trying to
80 // interfere with the testing threads.
81
82//const int MAX_SEND_TRIES = 0; // don't pause.
83const int MAX_SEND_TRIES = 1; // try to get stuff out but don't wait long.
84//const int MAX_SEND_TRIES = 5; // wait a reasonable amount of times to send.
85//const int MAX_SEND_TRIES = 10000; // force it to get out, hopefully.
86 // the number of times we try to push the sends out. zero means never
87 // try to push anything, just add it to the buffer. 1 or more is that
88 // many tries to push the send.
89
90//const int CHECKPOINT_SIZE = 1000;
91//const int CHECKPOINT_SIZE = 100;
92const int CHECKPOINT_SIZE = 100;
93 // prints a counter out when we reach a multiple of this many sends.
94
95//const int DATA_SEGMENT_SIZE = 0;
96const int DATA_SEGMENT_SIZE = 64;
97//const int DATA_SEGMENT_SIZE = 128 * KILOBYTE;
98//const int DATA_SEGMENT_SIZE = 1 * MEGABYTE;
99 // the chunk size that we attach.
100
102 // this is the period between reports on how the test is going.
103
104//***this is where we are in testing the faux dual cpu problem.
105//***the longer delay shows the problem more easily. the shorter delay
106//***is being used for a long test.
108//const int MAXIMUM_ACQUISITION_DELAY = int(0.5 * SECOND_ms);
109 // the longest we'll snooze off waiting for pending receptions to occur.
110
111//const int MAXIMUM_PENDING_REQUESTS = 1;
113 // this is a threshold for the number of requests; once hit, we start
114 // awaiting the responses.
115
116//const int PENDING_REQUESTS_FORCED = 3;
117//const int PENDING_REQUESTS_FORCED = 80;
119 // when we've been forced to gather some pending responses to previous
120 // requests, this is how many we'll try to get at once. numbers closer
121 // to the MAXIMUM_PENDING_REQUESTS will force more synchrony.
122
124 // how frequently a bus reconstruction occurs, in 1000.
125
126class cromp_client_tester : virtual public unit_base, virtual public application_shell
127{
128public:
129 cromp_client_tester();
130 ~cromp_client_tester();
131
132 virtual int execute();
133
134 DEFINE_CLASS_NAME("cromp_client_tester");
135
136 void bite_server(structures::set<octopus_request_id> &ids,
137 structures::set<octopus_request_id> &delinquents, void *originator);
138 // performs the big chunk of testing. the "ids" are the history of the
139 // sends that were made and they're managed by this method. the
140 // "originator" is a tag we can use to generate unique print outs.
141
142 int print_instructions();
144
145 void grab_items();
147
148 void cause_object_reconstruction();
150
151 void increment_thread_count() {
152 FUNCDEF("increment_thread_count");
153 auto_synchronizer l(*_lock);
154 _threads_active++;
155//LOG(a_sprintf("count now %d", _threads_active));
156 }
157
158 void decrement_thread_count() {
159 FUNCDEF("decrement_thread_count");
160 auto_synchronizer l(*_lock);
161 _threads_active--;
162//LOG(a_sprintf("count now %d", _threads_active));
163 }
164
165 void report(const time_stamp &start_time, double bytes_transmitted,
166 double conversations);
167 // describes how the test is going.
168
169private:
170 cromp_client *_uplink; // provides the connection and transmission services.
171
172 mutex *_lock; // protects the objects below.
173 int _threads_active; // the number of transmitter threads running.
174 time_stamp _last_report; // when we last reported on progress.
175 double _finished_loops; // counts number of loops we've achieved.
176 bool _encryption; // true if we're encrypting.
177 int _send_count;
178 int _thread_count;
179 int _grabber_count;
180 int _send_tries;
181 int _checkpoint_count;
182 int _dataseg_size;
183 int _report_interval;
184 int _snooze_duration;
185 bool _rpc_style;
186 bool _reconstruct_object;
187 internet_address _server_loc;
188
189 void look_for_receipts(int count, structures::set<octopus_request_id> &ids,
190 structures::set<octopus_request_id> &delinquents, bool wait = false);
191 // attempts to get "count" items from the list of "ids".
192};
193
195
196class bitey_thread : public ethread
197{
198public:
199 bitey_thread(cromp_client_tester &parent)
200 : ethread(), _parent(parent) {}
201
202 void perform_activity(void *formal(ptr)) {
203 FUNCDEF("perform_activity");
204 _parent.increment_thread_count();
205 _parent.bite_server(_ids, _delinquents, this);
206 _parent.decrement_thread_count();
207 }
208
209private:
210 cromp_client_tester &_parent;
211 structures::set<octopus_request_id> _ids; // the ids for commands we've sent.
212 structures::set<octopus_request_id> _delinquents; // missing ids during rcv.
213};
214
216
217//hmmm: next stop; inject the types of items they're expecting in grab_items.
218
219class grabby_thread : public ethread
220{
221public:
222 grabby_thread(cromp_client_tester &parent)
223 : ethread(), _parent(parent) {}
224
225 void perform_activity(void *formal(ptr)) {
226 while (!should_stop()) {
227 _parent.grab_items();
228 if (_rando.inclusive(0, 100) > 10)
229 time_control::sleep_ms(_rando.inclusive(5, 38));
230 }
231 }
232
233private:
234 cromp_client_tester &_parent;
235 chaos _rando;
236};
237
239
240cromp_client_tester::cromp_client_tester()
242//"cromp_client_tester"),
243 _uplink(NULL_POINTER),
244 _lock(new mutex),
245 _threads_active(0),
246 _finished_loops(0.0),
247 _encryption(false),
248 _send_count(0),
249 _thread_count(0),
250 _grabber_count(0),
251 _send_tries(0),
252 _checkpoint_count(0),
253 _dataseg_size(0),
254 _report_interval(0),
255 _snooze_duration(0),
256 _rpc_style(false),
257 _reconstruct_object(false),
258 _server_loc()
259{
260 FUNCDEF("constructor");
261 LOG("");
262 LOG("");
263
265//LOG(a_sprintf("argc is %d and first is %s", application::_global_argc, application::_global_argv[0]));
266
267 int indy = 0;
268 if (args.find("help", indy, false)
269 || (args.find("?", indy, false))
270 || (args.find('?', indy, false)) ) {
272 exit(0);
273 }
274
275 // check for a port on the command line.
276 astring port_text;
277 int port = 5678;
278 if (args.get_value("port", port_text, false)) {
279 LOG(astring("using port: ") + port_text);
280 port = port_text.convert(5678);
281 }
282 _server_loc.port = port;
283
284//hmmm:normalize host so this can take either name or IP.
285
286 indy = 0;
287 if (args.find("encrypt", indy, false) || (args.find('e', indy, true)) ) {
288 // they're saying that we should encrypt the communication.
289 LOG("turning on encryption.");
290 _encryption = true;
291 }
292
293 indy = 0;
294 if (args.find("rpc", indy, false) || (args.find('R', indy, true)) ) {
295 // this is telling us to turn on RPC mode. we will make each request and
296 // reply pair synchronous, i.e., each reply will be awaited for when a
297 // request has been made.
298 LOG("turning on RPC style requests.");
299 _rpc_style = true;
300 }
301
302 // check for a hostname on the command line.
303 astring hostname("local");
304 astring host_temp;
305 if (args.get_value("host", host_temp, false)) {
306 LOG(astring("using host: ") + host_temp);
307 hostname = host_temp;
308 }
309LOG(astring("using host: ") + hostname);
310 strcpy(_server_loc.hostname, hostname.s());
311
312 astring send_temp;
313 int send_count = MAXIMUM_SENDS;
314 if (args.get_value("sends", send_temp, false)) {
315 LOG(astring("using send count: ") + send_temp);
316 send_count = send_temp.convert(send_count);
317 if (send_count <= 0) send_count = 1;
318 }
319 _send_count = send_count;
320
321 astring thread_temp;
322 int thread_count = NUMBER_OF_THREADS;
323 if (args.get_value("threads", thread_temp, false)) {
324 LOG(astring("using thread count: ") + thread_temp);
325 thread_count = thread_temp.convert(thread_count);
326 if (thread_count <= 0) thread_count = 1;
327 }
328 _thread_count = thread_count;
329
330 astring grabber_temp;
331 int grabber_count = GRABBER_THREADS;
332 if (args.get_value("grab", grabber_temp, false)) {
333 LOG(astring("using grabber count: ") + grabber_temp);
334 grabber_count = grabber_temp.convert(grabber_count);
335 if (grabber_count < 0) grabber_count = 0;
336 }
337 _grabber_count = grabber_count;
338
339 astring send_tries_temp;
340 int send_tries = MAX_SEND_TRIES;
341 if (args.get_value("trysend", send_tries_temp, false)) {
342 LOG(astring("using send tries: ") + send_tries_temp);
343 send_tries = send_tries_temp.convert(send_tries);
344 if (send_tries < 0) send_tries = 0;
345 }
346 _send_tries = send_tries;
347
348//hmmm: how tiresome. how about a macro here? could help in general
349// with command_line also.
350
351 astring checkpoint_temp;
352 int checkpoint_count = CHECKPOINT_SIZE;
353 if (args.get_value("print", checkpoint_temp, false)) {
354 LOG(astring("using checkpoint count: ") + checkpoint_temp);
355 checkpoint_count = checkpoint_temp.convert(checkpoint_count);
356 if (checkpoint_count <= 0) checkpoint_count = 1;
357 }
358 _checkpoint_count = checkpoint_count;
359
360 astring dataseg_temp;
361 int dataseg_size = DATA_SEGMENT_SIZE;
362 if (args.get_value("dataseg", dataseg_temp, false)) {
363 LOG(astring("using dataseg size: ") + dataseg_temp);
364 dataseg_size = dataseg_temp.convert(dataseg_size);
365 if (dataseg_size < 0) dataseg_size = 0;
366 }
367 _dataseg_size = dataseg_size;
368
369 astring report_temp;
370 int report_interval = REPORTING_INTERVAL;
371 if (args.get_value("report", report_temp, false)) {
372 LOG(astring("using report interval: ") + report_temp);
373 report_interval = report_temp.convert(report_interval);
374 if (report_interval <= 0) report_interval = 1;
375 report_interval *= SECOND_ms; // convert to milliseconds.
376 }
377 _report_interval = report_interval;
378
379 astring snooze_temp;
380 int snooze_duration = 0; // no snooze by default.
381 if (args.get_value("snooze", snooze_temp, false)) {
382 LOG(astring("using snooze duration: ") + snooze_temp);
383 snooze_duration = snooze_temp.convert(snooze_duration);
384 if (snooze_duration < 0) snooze_duration = 0;
385 }
386 _snooze_duration = snooze_duration;
387
388 if (args.find("reconstruct", indy, false)) {
389 LOG("saw reconstruct flag; will periodically tear down object.");
390 _reconstruct_object = true;
391 }
392
393LOG(astring("opening at ") + _server_loc.text_form());
394 _uplink = new cromp_client(_server_loc);
395
396 _uplink->add_tentacle(new bubbles_tentacle(false));
397//we don't need backgrounding right now.
398}
399
400cromp_client_tester::~cromp_client_tester()
401{
402 WHACK(_lock);
403 WHACK(_uplink);
404}
405
406int cromp_client_tester::print_instructions()
407{
409 log(a_sprintf("%s usage:", name.s()));
410 log(astring(""));
411 log(a_sprintf("\
412This program connects to a cromp test server and exchanges packets to test\n\
413the performance of the cromp protocol. All command line flags are optional\n\
414but can be added to specify how the test should be performed. Currently,\n\
415the valid options are:\n\
416 --help\tShow this set of command-line help.\n\
417 -?\t\tditto.\n\
418 --port N\tConnect to the server on the port specified.\n\
419 --host X\tConnect to server at IP address or hostname X.\n\
420 --encrypt\tEncrypt the connection. Server must do this also.\n\
421 -e\t\tditto.\n\
422 --sends N\tThe number of sends to perform.\n\
423 --threads N\tNumber of threads competing for single cromp link.\n\
424 --grab N\tNumber of additional threads stressing retrievals.\n\
425 --trysend N\tCount of tries for sending if not all data went out.\n\
426 --print N\tItems handled in between showing send counter.\n\
427 --dataseg N\tSize of extra data packed in each test packet.\n\
428 --report N\tDuration of time between reports, in seconds.\n\
429 --snooze N\tSleep N ms between each send; this invalidates timing info.\n\
430 --rpc\tEmulate Remote Procedure Call by awaiting each response.\n\
431 -R\t\tditto\n\
432"));
433 return -3;
434}
435
436void cromp_client_tester::look_for_receipts(int count,
438 structures::set<octopus_request_id> &delinquents, bool wait)
439{
440 FUNCDEF("look_for_receipts");
441 infoton *received = NULL_POINTER;
442 while (count--) {
443 if (!ids.length()) break; // nothing to check on.
444 octopus_request_id the_id = ids[0];
445 ids.zap(0, 0); // take out the one we're inspecting right now.
446
447 time_stamp start_acquire;
448 int delay = MAXIMUM_ACQUISITION_DELAY;
449 if (wait) delay = 2 * MINUTE_ms; // force a long delay.
450 outcome ret = _uplink->acquire(received, the_id, delay);
451 int acquire_duration = int(time_stamp().value() - start_acquire.value());
452 if (acquire_duration >= MAXIMUM_ACQUISITION_DELAY - 1) {
453 LOG("passed time limit for acquire! this is the faux dual-cpu bug!");
454 LOG(a_sprintf("there were %d items left to acquire.", count));
455 LOG(a_sprintf("pending %d bytes to send, %d bytes accumulated.",
456 _uplink->pending_sends(), _uplink->accumulated_bytes()));
457 LOG(a_sprintf("the data bin had %d items awaiting pickup.",
458 _uplink->octo()->responses().items_held()));
459 if (ret != cromp_client::TIMED_OUT) {
460 LOG("cromp client lied about outcome?? didn't call this timed out!!");
461 }
462 }
463
464 if (ret != cromp_client::OKAY) {
465 if (ret != cromp_client::TIMED_OUT) {
466 LOG(astring("failed to acquire the response--got error ")
467 + cromp_client::outcome_name(ret));
468 // give it another chance later.
469 ids += the_id;
470LOG(a_sprintf("moved %s back to main id queue.", the_id.text_form().s()));
471 } else {
472 if (delinquents.member(the_id))
473 continuable_error(class_name(), func,
474 astring("a delinquent response is still missing: ")
475 + the_id.text_form());
476 // if we hadn't already seen it, we'll watch for it next time.
477 delinquents += the_id;
478LOG(a_sprintf("added %s to delinquents.", the_id.text_form().s()));
479 }
480 return;
481 }
482
483if (!received) {
484deadly_error(class_name(), func,
485"received packet was NULL_POINTER even though outcome was OKAY!");
486}
487
488 // check that the right type is coming back to us.
489 bubble *cast = dynamic_cast<bubble *>(received);
490 if (!cast) {
491 continuable_error(class_name(), func, astring("got the wrong type "
492 "of response: ") + received->classifier().text_form());
493 }
494
495 // if we had a problem with this item earlier, we remove it since it
496 // succeeded this time.
497 if (delinquents.member(the_id))
498 delinquents.remove(the_id);
499 WHACK(received);
500 }
501}
502
503void cromp_client_tester::bite_server(structures::set<octopus_request_id> &ids,
505 void *originator)
506{
507 FUNCDEF("bite_server");
508 octopus_request_id cmd_id;
509
511
512 outcome ret;
513
514 double overall_sent = 0;
515
516//hmmm: not very interesting boundaries below, non-randomized and identical in both places.
517 const char *bounds_init[] = { "0", "120", "220", "280" };
518 string_array boundish(4, bounds_init);
519
520 // this computes the size of the exchange object with no extra data attached.
522 bubble test_size(_dataseg_size, boundish, 238843);
523 test_size.data().reset();
524 // set the data segment to zero length.
525 test_size.pack(temp);
526 int base_length = temp.length();
527 // this is the base packed length of the bubble object.
528
529 int failure_count = 0;
530
531 time_stamp start; // record when our testing started.
532
533 for (int sends = 1; sends <= _send_count; sends++) {
534 bubble to_send(_dataseg_size, boundish, 238843);
535 int curr_sending = to_send.data_length() + base_length * 2;
536 overall_sent += curr_sending;
537 // we compute the overall sent by what's sent in the request (which is
538 // of the base length plus the attached array size) and the reply (which
539 // is the base length only since the server resets the data attachment).
540 // we go ahead and count it as sent before the send, since we're going
541 // to bomb out if the send doesn't work.
542 ret = _uplink->submit(to_send, cmd_id, _send_tries);
543 switch (ret.value()) {
544 case cromp_client::OKAY: {
545 // complete success in sending that chunk out.
546 ids.add(cmd_id); // record it.
547 if (_rpc_style) {
548 // this call is used to force single requests and replies RPC style.
549 look_for_receipts(1, ids, delinquents, true);
550 }
551 // sleep if we were asked to.
552 if (_snooze_duration) {
553 _uplink->keep_alive_pause(_snooze_duration, 60);
554 look_for_receipts(1, ids, delinquents);
555 }
556 break;
557 }
558 case cromp_client::TOO_FULL: {
559//treating as failure right now.
560LOG("got too full outcome!");
561 sends--;
562 overall_sent -= curr_sending;
563 continue;
564 break;
565 }
566 case cromp_client::TIMED_OUT: {
567//treating as failure right now.
568LOG("got timed out outcome!");
569 sends--;
570 overall_sent -= curr_sending;
571 continue;
572 break;
573 }
574 default: {
575 // a failure case that we have no other handling for.
576 if (failure_count++ < 20) {
577 sends--; // skip back for the failed one.
578 overall_sent -= curr_sending; // remove unsent portion.
579 LOG(astring("got failure outcome ") + cromp_client::outcome_name(ret)
580 + " from attempt to submit request.");
581 if (_snooze_duration) {
582 _uplink->keep_alive_pause(_snooze_duration, 60);
583 }
584 continue; // try again.
585 }
586 continuable_error(class_name(), func,
587 astring("failed to submit the request--got error ")
588 + cromp_client::outcome_name(ret));
589 break;
590 }
591 }
592
593 _finished_loops += 1.0;
594
596 // grab some of the items waiting. hopefully they're back by now.
597 look_for_receipts(PENDING_REQUESTS_FORCED, ids, delinquents);
598 }
599
600 if (! (sends % _checkpoint_count)) {
601 LOG(a_sprintf("%x send #%d", originator, sends));
602 }
603 }
604 LOG(a_sprintf("%x final send #%d", originator, _send_count));
605
608
609 look_for_receipts(ids.elements(), ids, delinquents);
610
611 LOG(a_sprintf("concluded %d test requests and responses.", _send_count));
612}
613
614void cromp_client_tester::grab_items()
615{
616 FUNCDEF("grab_items");
617 octopus_request_id id(_uplink->entity(), -12);
618 // look for an id we don't expect to have any thing waiting for.
619 infoton *found = NULL_POINTER;
620 outcome ret = _uplink->retrieve_and_restore(found, id, 0);
621 WHACK(found);
622}
623
624void cromp_client_tester::report(const time_stamp &start_time,
625 double bytes_transmitted, double conversations)
626{
627 FUNCDEF("report");
628 throughput_counter bandwidth; // calculator for communication speed.
629 double duration = time_stamp().value() - start_time.value();
630 // the elapsed duration so far.
631 bandwidth.add_run(bytes_transmitted, duration, conversations * 2);
632 // create a portrait of how the run has progressed. we multiply the
633 // conversations by two since we are counting both the request and the
634 // response (send and receive) as a transfer.
635
636 // calculate the number of bytes per item for real as it plays out in
637 // cromp sending.
638 double bytes_per_item = bandwidth.bytes_sent() / bandwidth.number_of_sends();
639
640 bubble my_bubble(_dataseg_size); // an exemplar for our sends.
641
642 // calculate how much space bubble's naming takes up.
643 byte_array packed_classifier;
644 structures::pack_array(packed_classifier, my_bubble.classifier());
645 double classifier_size = packed_classifier.length() - sizeof(int);
646 // that's how much space is used by our goofy classifier name. there are
647 // a few bytes extra overhead for packing a string array and we remove
648 // them from consideration; we only want credit for the name, since that
649 // is not truly overhead, given that the bubble infoton chose it.
650 double payload_portion = my_bubble.packed_size() + classifier_size;
651 // calculate the portion of our transmissions that are solely the
652 // result of what we are putting into the package.
653 double overhead = bytes_per_item - payload_portion;
654 // okay, this is how many bytes per item is cromp noise, rather than
655 // something the user is responsible for.
656 double percent_overhead = overhead / bytes_per_item;
657
658// change 0 to 1 to enable this section of information.
659#if 0
660 // get additional facts about how much of a packed infoton is wasted.
661 byte_array packed_infote;
662 infoton::fast_pack(packed_infote, my_bubble);
663 log(a_sprintf("sane? -- overhead for just packed infoton is %d bytes.",
664 packed_infote.length() - payload_portion));
665 octopus_request_id example_request(_uplink->entity(), 23982);
666 byte_array packed_req_id;
667 example_request.pack(packed_req_id);
668 log(a_sprintf(" -- overhead for octo request id is %d bytes.",
669 packed_req_id.length()));
670 byte_array packed_transa;
671 cromp_transaction::flatten(packed_transa, my_bubble,
672 octopus_request_id(_uplink->entity(), 23982));
673 log(a_sprintf(" -- overhead for cromp transation is %d bytes.",
674 packed_transa.length() - payload_portion));
675#endif
676
677 LOG(a_sprintf("sent %.0f items, %.0f bytes, %.0f bytes per item,%s"
678 "payload %.0f bytes, overhead %.0f bytes, percent overhead %.1f%%,%s"
679 "in %.2f seconds is %f ms/item%s"
680 "at %.2f %cb/sec & %.2f items/sec.",
681 bandwidth.number_of_sends(), bandwidth.bytes_sent(),
682 bytes_per_item,
684 payload_portion, overhead, percent_overhead * 100.0,
686 bandwidth.total_time() / SECOND_ms,
687 bandwidth.total_time() / bandwidth.number_of_sends(),
689 (bandwidth.kilobytes_per_second() < 1024.0?
690 bandwidth.kilobytes_per_second()
691 : bandwidth.megabytes_per_second()),
692 (bandwidth.kilobytes_per_second() < 1024.0? 'K' : 'M'),
693 bandwidth.number_of_sends() / (bandwidth.total_time() / SECOND_ms)));
694}
695
696void cromp_client_tester::cause_object_reconstruction()
697{
698 FUNCDEF("cause_object_reconstruction");
699 int rando = chaos().inclusive(1, 100);
700 if (rando > CHANCE_OF_RECONSTRUCT) return; // not doing it this time.
701 LOG(astring("reconstructing client at ") + _server_loc.text_form());
702//below is not good when multiple threads are allowed to romp on client.
705
706 _uplink->disconnect();
707 outcome ret = common::INVALID;
708 int counter = 100; // allowed this many times to try to reconnect.
709 while ( (ret != common::OKAY) && (counter-- >= 0) ) {
710 ret = _uplink->connect();
711 if (ret != cromp_client::OKAY) {
712 LOG(astring("couldn't reconnect this time: ")
713 + cromp_client::outcome_name(ret));
715 }
716 }
717}
718
719int cromp_client_tester::execute()
720{
721 FUNCDEF("execute");
722
723 // testing that crompish pax are done right.
724 bubble fud(randomizer().inclusive(12, 2829));
725 byte_array packed_fud;
726 fud.pack(packed_fud);
727 if (packed_fud.length() != fud.packed_size())
728 deadly_error(class_name(), func, "bubble's packed size method is wrong.");
729
730 if (_encryption) _uplink->enable_encryption();
731
732 outcome ret = _uplink->connect();
733 if (ret != cromp_client::OKAY) {
734 deadly_error(class_name(), func, astring("connection failed with error: ")
735 + cromp_client::outcome_name(ret));
736 }
737
738 thread_cabinet cab; // we store a bunch of threads here.
739
740 LOG(a_sprintf("adding %d grabber threads to test.", _grabber_count));
741
742 // create the extra grabber threads.
743 for (int i = 0; i < _grabber_count; i++) {
744 grabby_thread *to_add = new grabby_thread(*this);
745 cab.add_thread(to_add, false, NULL_POINTER);
746 }
747
748 LOG(a_sprintf("adding %d transmitter threads to test.", _thread_count));
749
750 // create the specified number of threads.
751 for (int j = 0; j < _thread_count; j++) {
752 bitey_thread *to_add = new bitey_thread(*this);
753 cab.add_thread(to_add, false, NULL_POINTER);
754 }
755
756//LOG("starting all threads...");
757 time_stamp start;
759//LOG("done starting threads...");
760
761 time_control::sleep_ms(400); // wait until a few get cranked up.
762
763//LOG("did our initial sleep...");
764
765 while (cab.any_running()) {
767 if (!_threads_active) {
768 break;
769 }
770//LOG("main loop...");
771 if (time_stamp(-_report_interval) > _last_report) {
772 report(start, cromp_common::total_bytes_sent()
774 _finished_loops);
775 _last_report.reset();
776 }
777 if (_reconstruct_object) {
778 cause_object_reconstruction();
779 }
780 if (!_uplink->connected()) {
781 LOG("connection dropped. trying to connect again.");
782 outcome ret = _uplink->connect();
783 if (ret != cromp_client::OKAY) {
784 // snooze a bit so as not to drive server crazy or log too much noise.
786 }
787 }
788 }
789
790 LOG("- done testing -");
791
792 if (_finished_loops != double(_thread_count) * _send_count)
793 LOG(a_sprintf("number of loops was calculated differently: wanted %d, "
794 "got %d", _thread_count * _send_count, _finished_loops));
795
796 report(start, cromp_common::total_bytes_sent()
798 _thread_count * _send_count);
799
800//LOG("stopping all threads...");
801 cab.stop_all();
802 LOG("all threads exited.");
803
804#ifdef DEBUG_TESTER
807#endif
808
809 LOG("works for those functions tested.");
810
811 return 0;
812}
813
815
816HOOPLE_MAIN(cromp_client_tester, )
817
int print_instructions(bool good, const astring &program_name)
Definition checker.cpp:45
The application_shell is a base object for console programs.
virtual int execute()=0
< retrieves the command line from the /proc hierarchy on linux.
a_sprintf is a specialization of astring that provides printf style support.
Definition astring.h:440
int length() const
Returns the current reported length of the allocated C array.
Definition array.h:115
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition array.h:769
Provides a dynamically resizable ASCII character string.
Definition astring.h:35
const char * s() const
synonym for observe. the 's' stands for "string", if that helps.
Definition astring.h:113
int convert(int default_value) const
Converts the string into a corresponding integer.
Definition astring.cpp:760
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition astring.cpp:130
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition mutex.h:113
A very common template for a dynamic array of bytes.
Definition byte_array.h:36
Outcomes describe the state of completion for an operation.
Definition outcome.h:31
int value() const
Definition outcome.h:51
static double total_bytes_sent()
static double total_bytes_received()
static void flatten(basis::byte_array &packed_form, const octopi::infoton &request, const octopi::octopus_request_id &id)
Provides operations commonly needed on file names.
Definition filename.h:64
const basis::astring & raw() const
returns the astring that we're holding onto for the path.
Definition filename.cpp:97
filename basename() const
returns the base of the filename; no directory.
Definition filename.cpp:385
a platform-independent way to acquire random numbers in a specific range.
Definition chaos.h:51
int inclusive(int low, int high) const
< Returns a pseudo-random number r, such that "low" <= r <= "high".
Definition chaos.h:88
An infoton is an individual request parcel with accompanying information.
Definition infoton.h:32
static void fast_pack(basis::byte_array &packed_form, const infoton &to_pack)
flattens an infoton "to_pack" into the byte array "packed_form".
Definition infoton.cpp:162
const structures::string_array & classifier() const
this array of strings is the "name" for this infoton.
Definition infoton.cpp:85
Identifies requests made on an octopus by users.
basis::astring text_form() const
human readable form of the request.
Provides a platform-independent object for adding threads to a program.
Definition ethread.h:36
virtual void perform_activity(void *thread_data)=0
< invoked just after after start(), when the OS thread is created.
bool should_stop() const
reports whether the thread should stop right now.
Definition ethread.h:136
Manages a collection of threads.
bool any_running() const
returns true if any threads are currently running.
structures::unique_int add_thread(ethread *to_add, bool start_it, void *parm)
adds a thread to be managed by the thread_cabinet.
void start_all(void *pointer)
cranks up any threads that are not already running.
void stop_all()
makes all of the threads quit.
this type of address describes a destination out on the internet.
Reports on average bandwidth of the transfers being measured.
double kilobytes_per_second() const
returns the number of kilobytes that transfers are getting per second.
double bytes_sent() const
returns the number of bytes sent so far.
double total_time() const
the run time so far, in milliseconds.
double number_of_sends() const
returns the number of sends that have occurred.
void add_run(double size_of_send, double time_of_send, double number_of_runs=1.0)
records a run without changing the state of the current run.
Emulates a mathematical set, providing several standard set operations.
Definition set.h:36
int elements() const
Returns the number of elements in this set.
Definition set.h:47
bool member(const contents &to_test) const
Returns true if the item "to_test" is a member of this set.
Definition set.h:223
bool add(const contents &to_add)
Adds a new element "to_add" to the set.
Definition set.h:232
bool remove(const contents &to_remove)
Removes the item "to_remove" from the set.
Definition set.h:249
An array of strings with some additional helpful methods.
basis::astring text_form() const
A synonym for the text_format() method.
static const char * platform_eol_to_chars()
provides the characters that make up this platform's line ending.
static void sleep_ms(basis::un_int msec)
a system independent name for a forced snooze measured in milliseconds.
Represents a point in time relative to the operating system startup time.
Definition time_stamp.h:38
time_representation value() const
returns the time_stamp in terms of the lower level type.
Definition time_stamp.h:61
#define continuable_error(c, f, i)
#define deadly_error(c, f, i)
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition definitions.h:32
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition enhance_cpp.h:42
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition enhance_cpp.h:54
Provides macros that implement the 'main' program of an application.
#define HOOPLE_MAIN(obj_name, obj_args)
options that should work for most unix and linux apps.
Definition hoople_main.h:61
Implements an application lock to ensure only one is running at once.
char ** _global_argv
The guards collection helps in testing preconditions and reporting errors.
Definition array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition functions.h:121
const int SECOND_ms
Number of milliseconds in a second.
const int MINUTE_ms
Number of milliseconds in a minute.
A platform independent way to obtain the timestamp of a file.
A logger that sends to the console screen using the standard output device.
An extension to floating point primitives providing approximate equality.
Definition averager.h:21
Provides access to the operating system's socket methods.
A dynamic container class that holds any kind of object via pointers.
Definition amorph.h:55
void pack_array(basis::byte_array &packed_form, const basis::array< contents > &to_pack)
provides a way to pack any array that stores packable objects.
#include <time.h>
Useful support functions for unit testing, especially within hoople.
Definition unit_base.cpp:35
#define randomizer()
const int MAX_SEND_TRIES
const int MAXIMUM_ACQUISITION_DELAY
const int CHECKPOINT_SIZE
#define LOG(s)
const int PENDING_REQUESTS_FORCED
const int DATA_SEGMENT_SIZE
const int CHANCE_OF_RECONSTRUCT
const int GRABBER_THREADS
const int MAXIMUM_SENDS
const int NUMBER_OF_THREADS
const int REPORTING_INTERVAL
const int MAXIMUM_PENDING_REQUESTS
chaos rando