1 /*****************************************************************************\
3 * Name : sequence_tracker *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "machine_uid.h"
16 #include "sequence_tracker.h"
18 #include <basis/functions.h>
19 #include <basis/astring.h>
21 #include <basis/mutex.h>
22 #include <loggers/program_wide_logger.h>
23 #include <structures/amorph.h>
24 #include <structures/int_hash.h>
25 #include <textual/parser_bits.h>
26 #include <timely/time_stamp.h>
28 using namespace basis;
29 using namespace loggers;
30 using namespace structures;
31 using namespace textual;
32 using namespace timely;
36 const int MAX_BITS_FOR_SEQ_HASH = 10;
37 // the number of bits in the hash table of sequences, allowing 2^max buckets.
39 const int CLEANING_SPAN = 20000;
40 // if the sequence number is this far off from the one received, we will
41 // clean up the span list.
43 const int MAX_ITEMS = 200;
44 // maximum number of items in tracker. this is quite low since we don't
45 // want to be lugging around thousands of indices. for connection oriented,
46 // it will never be much of an issue, although for a broadcast style bus it
47 // could be kind of an issue if we do retransmissions with a lot of lag.
50 #define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
52 //hmmm: we need to address when a host has:
53 // a) rolled over in sequences (not for 4 years at least?)
54 // b) changed its address where another host now has old address (which
55 // is also a weirdo sequence jump, maybe backwards, maybe not).
56 // would our timing help to guarantee that any oddness introduced is
57 // swamped out in a few minutes? the worst thing would be a lost packet
58 // we dumped because we thought we'd seen it but hadn't.
59 // probably we will see a sequence that's really old seeming; should that
60 // be enough to trigger flushing the whole host?
65 int _sequence; // the sequence number in question.
66 time_stamp _when; // when we received this sequence.
68 sequence_record(int seq = 0) : _sequence(seq) {}
70 astring text_form() const {
71 return a_sprintf("seq=%d, time=", _sequence) + _when.text_form();
80 int _received_to; // highest sequence we've got full reception prior to.
81 machine_uid _host; // the host we're concerned with.
82 int_hash<sequence_record> _sequences; // record of active sequences.
83 time_stamp _last_active; // the last time we heard from this host.
84 // we could piece this together from the sequences but we prefer not to.
86 host_record(const machine_uid &host)
87 : _received_to(0), _host(host), _sequences(MAX_BITS_FOR_SEQ_HASH),
91 void clean_up(int coalesce_time) {
92 // check sequences for being right next to the highest received sequence.
93 // if they're one up, they can be collapsed without waiting for the aging
98 // we restrict the size of the array with this block.
99 if (ids.elements() > MAX_ITEMS) {
100 int zap_point = ids.elements() - MAX_ITEMS;
101 // we want to remove anything before this index.
102 for (int s0 = 0; s0 < zap_point; s0++) {
105 // set our received_to value from the current element.
106 if (_received_to < seq)
109 // now clean the list of our ids since they're gone.
110 ids.zap(0, zap_point - 1);
113 for (int s1 = 0; s1 < ids.elements(); s1++) {
114 sequence_record *seq;
116 if (!_sequences.find(id, seq)) continue; // bad mistake going on.
117 if (_received_to + 1 == seq->_sequence) {
118 // we've hit one that can be collapsed.
122 s1--; // skip back before deleted item.
126 // check sequence ages. coalesce any older ones.
127 for (int s2 = 0; s2 < ids.elements(); s2++) {
128 sequence_record *seq;
130 if (!_sequences.find(id, seq)) continue; // bad mistake going on.
131 if (seq->_when < time_stamp(-coalesce_time)) {
132 // this sequence number has floated too long; crush it now.
133 if (_received_to < seq->_sequence)
134 _received_to = seq->_sequence; // update highest received seq.
140 astring text_form(bool verbose) {
142 to_return += astring("host=") + _host.text_form()
143 + a_sprintf(", rec_to=%d", _received_to)
144 + ", active=" + _last_active.text_form();
148 for (int i = 0; i < ids.elements(); i++) {
149 sequence_record *found;
150 if (!_sequences.find(ids[i], found))
151 continue; // that's a bad thing.
152 to_return += astring(parser_bits::platform_eol_to_chars()) + "\t"
153 + found->text_form();
156 to_return += a_sprintf(", sequences held=%d", _sequences.elements());
165 //hmmm: should this be a hash table?
167 class host_history : public amorph<host_record>
170 virtual ~host_history() {}
172 DEFINE_CLASS_NAME("host_history");
174 int find_host(const machine_uid &to_find) {
175 for (int i = 0; i < elements(); i++) {
176 if (borrow(i)->_host == to_find) return i;
178 return common::NOT_FOUND;
181 bool whack_host(const machine_uid &to_find) {
182 int indy = find_host(to_find);
183 if (negative(indy)) return false;
188 void clean_up(int silence_time, int coalesce_time) {
189 for (int h = 0; h < elements(); h++) {
190 host_record &rec = *borrow(h);
191 // check host liveliness.
192 if (rec._last_active < time_stamp(-silence_time)) {
193 // this host got too stale; whack it now.
195 h--; // skip back to prior element.
198 rec.clean_up(coalesce_time);
202 bool add_sequence(const machine_uid &to_find, int sequence,
203 int silence_time, int coalesce_time) {
204 FUNCDEF("add_sequence");
205 int indy = find_host(to_find);
206 if (negative(indy)) {
207 host_record *rec = new host_record(to_find);
209 indy = find_host(to_find);
210 if (negative(indy)) {
211 LOG(astring("*** failure to add a host to the tracker: ")
212 + to_find.text_form());
213 return false; // serious error.
216 host_record &rec = *borrow(indy);
217 if (borrow(indy)->_received_to + 1 == sequence) {
218 // this is just one up from our last received guy, so optimize it out.
219 rec._received_to = sequence;
220 } else if (sequence - borrow(indy)->_received_to > CLEANING_SPAN) {
221 // if the number is wildly different, assume we haven't dealt with this
223 rec._received_to = sequence;
224 #ifdef DEBUG_SEQUENCE_TRACKER
225 LOG("sequence is wildly different, cleaning.");
227 clean_up(silence_time, coalesce_time);
229 // standard treatment, add it to the list.
230 rec._sequences.add(sequence, new sequence_record(sequence));
231 if (rec._sequences.elements() > MAX_ITEMS) {
232 // too many sequences floating around now. clean them up.
233 clean_up(silence_time, coalesce_time);
236 rec._last_active = time_stamp();
240 astring text_form(bool verbose) {
242 for (int i = 0; i < elements(); i++) {
243 to_return += borrow(i)->text_form(verbose);
244 if (i < elements() - 1)
245 to_return += parser_bits::platform_eol_to_chars();
254 sequence_tracker::sequence_tracker(int coalesce_time, int silence_time)
255 : _coalesce_time(coalesce_time),
256 _silence_time(silence_time),
257 _hosts(new host_history),
262 sequence_tracker::~sequence_tracker()
268 astring sequence_tracker::text_form(bool verbose) const
270 auto_synchronizer l(*_lock);
271 return _hosts->text_form(verbose);
274 void sequence_tracker::add_pair(const machine_uid &host, int sequence)
276 auto_synchronizer l(*_lock);
277 if (!_hosts->add_sequence(host, sequence, _silence_time, _coalesce_time)) {
283 bool sequence_tracker::have_seen(const machine_uid &host, int sequence)
285 auto_synchronizer l(*_lock);
286 int indy = _hosts->find_host(host);
287 if (negative(indy)) return false;
288 host_record &rec = *_hosts->borrow(indy);
289 if (sequence <= rec._received_to) return true;
290 sequence_record *found;
291 return !!rec._sequences.find(sequence, found);
294 void sequence_tracker::clean_up()
296 auto_synchronizer l(*_lock);
297 _hosts->clean_up(_silence_time, _coalesce_time);