feisty meow concerns codebase 2.140
sequence_tracker.cpp
Go to the documentation of this file.
1/*****************************************************************************\
2* *
3* Name : sequence_tracker *
4* Author : Chris Koeritz *
5* *
6*******************************************************************************
7* Copyright (c) 2002-$now By Author. This program is free software; you can *
8* redistribute it and/or modify it under the terms of the GNU General Public *
9* License as published by the Free Software Foundation; either version 2 of *
10* the License or (at your option) any later version. This is online at: *
11* http://www.fsf.org/copyleft/gpl.html *
12* Please send any updates to: fred@gruntose.com *
13\*****************************************************************************/
14
15#include "machine_uid.h"
16#include "sequence_tracker.h"
17
18#include <basis/functions.h>
19#include <basis/astring.h>
20
21#include <basis/mutex.h>
23#include <structures/amorph.h>
24#include <structures/int_hash.h>
25#include <textual/parser_bits.h>
26#include <timely/time_stamp.h>
27
28using namespace basis;
29using namespace loggers;
30using namespace structures;
31using namespace textual;
32using namespace timely;
33
34namespace sockets {
35
37 // the estimated elements in the hash table of sequences, before we start needing buckets.
38
// when an incoming sequence number is more than this far beyond the highest
// one already received from the host, we will clean up the span list.
const int CLEANING_SPAN = 20000;

// maximum number of items in tracker.  this is quite low since we don't
// want to be lugging around thousands of indices.  for connection oriented,
// it will never be much of an issue, although for a broadcast style bus it
// could be kind of an issue if we do retransmissions with a lot of lag.
const int MAX_ITEMS = 200;

// redefine LOG so that it forwards to CLASS_EMERGENCY_LOG using the
// program-wide logger.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
51
//hmmm: we need to address what happens when a host has:
//  a) rolled over in its sequence numbers (not for 4 years at least?), or
//  b) changed its address such that another host now has the old address
//     (which is also a weird sequence jump, maybe backwards, maybe not).
//  would our timing help to guarantee that any oddness introduced is
//  swamped out in a few minutes?  the worst thing would be a lost packet
//  that we dumped because we thought we'd seen it but hadn't.
//  probably we will see a sequence that seems really old; should that
//  be enough to trigger flushing the whole host?
61
62class sequence_record
63{
64public:
65 int _sequence; // the sequence number in question.
66 time_stamp _when; // when we received this sequence.
67
68 sequence_record(int seq = 0) : _sequence(seq) {}
69
70 astring text_form() const {
71 return a_sprintf("seq=%d, time=", _sequence) + _when.text_form();
72 }
73};
74
76
77class host_record
78{
79public:
80 int _received_to; // highest sequence we've got full reception prior to.
81 machine_uid _host; // the host we're concerned with.
82 int_hash<sequence_record> _sequences; // record of active sequences.
83 time_stamp _last_active; // the last time we heard from this host.
84 // we could piece this together from the sequences but we prefer not to.
85
86 host_record(const machine_uid &host)
87 : _received_to(0), _host(host), _sequences(MAX_ELEMENTS_FOR_SEQ_HASH),
88 _last_active()
89 {}
90
91 void clean_up(int coalesce_time) {
92 // check sequences for being right next to the highest received sequence.
93 // if they're one up, they can be collapsed without waiting for the aging
94 // process.
95 int_set ids;
96 _sequences.ids(ids);
97
98 // we restrict the size of the array with this block.
99 if (ids.elements() > MAX_ITEMS) {
100 int zap_point = ids.elements() - MAX_ITEMS;
101 // we want to remove anything before this index.
102 for (int s0 = 0; s0 < zap_point; s0++) {
103 int seq = ids[s0];
104 _sequences.zap(seq);
105 // set our received_to value from the current element.
106 if (_received_to < seq)
107 _received_to = seq;
108 }
109 // now clean the list of our ids since they're gone.
110 ids.zap(0, zap_point - 1);
111 }
112
113 for (int s1 = 0; s1 < ids.elements(); s1++) {
114 sequence_record *seq;
115 int id = ids[s1];
116 if (!_sequences.find(id, seq)) continue; // bad mistake going on.
117 if (_received_to + 1 == seq->_sequence) {
118 // we've hit one that can be collapsed.
119 _received_to++;
120 _sequences.zap(id);
121 ids.zap(s1, s1);
122 s1--; // skip back before deleted item.
123 }
124 }
125
126 // check sequence ages. coalesce any older ones.
127 for (int s2 = 0; s2 < ids.elements(); s2++) {
128 sequence_record *seq;
129 int id = ids[s2];
130 if (!_sequences.find(id, seq)) continue; // bad mistake going on.
131 if (seq->_when < time_stamp(-coalesce_time)) {
132 // this sequence number has floated too long; crush it now.
133 if (_received_to < seq->_sequence)
134 _received_to = seq->_sequence; // update highest received seq.
135 _sequences.zap(id);
136 }
137 }
138 }
139
140 astring text_form(bool verbose) {
141 astring to_return;
142 to_return += astring("host=") + _host.text_form()
143 + a_sprintf(", rec_to=%d", _received_to)
144 + ", active=" + _last_active.text_form();
145 if (verbose) {
146 int_set ids;
147 _sequences.ids(ids);
148 for (int i = 0; i < ids.elements(); i++) {
149 sequence_record *found;
150 if (!_sequences.find(ids[i], found))
151 continue; // that's a bad thing.
152 to_return += astring(parser_bits::platform_eol_to_chars()) + "\t"
153 + found->text_form();
154 }
155 } else {
156 to_return += a_sprintf(", sequences held=%d", _sequences.elements());
157 }
158 return to_return;
159 }
160
161};
162
164
165//hmmm: should this be a hash table?
166
167class host_history : public amorph<host_record>
168{
169public:
170 virtual ~host_history() {}
171
172 DEFINE_CLASS_NAME("host_history");
173
174 int find_host(const machine_uid &to_find) {
175 for (int i = 0; i < elements(); i++) {
176 if (borrow(i)->_host == to_find) return i;
177 }
178 return common::NOT_FOUND;
179 }
180
181 bool whack_host(const machine_uid &to_find) {
182 int indy = find_host(to_find);
183 if (negative(indy)) return false;
184 zap(indy, indy);
185 return true;
186 }
187
188 void clean_up(int silence_time, int coalesce_time) {
189 for (int h = 0; h < elements(); h++) {
190 host_record &rec = *borrow(h);
191 // check host liveliness.
192 if (rec._last_active < time_stamp(-silence_time)) {
193 // this host got too stale; whack it now.
194 zap(h, h);
195 h--; // skip back to prior element.
196 continue;
197 }
198 rec.clean_up(coalesce_time);
199 }
200 }
201
202 bool add_sequence(const machine_uid &to_find, int sequence,
203 int silence_time, int coalesce_time) {
204 FUNCDEF("add_sequence");
205 int indy = find_host(to_find);
206 if (negative(indy)) {
207 host_record *rec = new host_record(to_find);
208 append(rec);
209 indy = find_host(to_find);
210 if (negative(indy)) {
211 LOG(astring("*** failure to add a host to the tracker: ")
212 + to_find.text_form());
213 return false; // serious error.
214 }
215 }
216 host_record &rec = *borrow(indy);
217 if (borrow(indy)->_received_to + 1 == sequence) {
218 // this is just one up from our last received guy, so optimize it out.
219 rec._received_to = sequence;
220 } else if (sequence - borrow(indy)->_received_to > CLEANING_SPAN) {
221 // if the number is wildly different, assume we haven't dealt with this
222 // for too long.
223 rec._received_to = sequence;
224#ifdef DEBUG_SEQUENCE_TRACKER
225 LOG("sequence is wildly different, cleaning.");
226#endif
227 clean_up(silence_time, coalesce_time);
228 } else {
229 // standard treatment, add it to the list.
230 rec._sequences.add(sequence, new sequence_record(sequence));
231 if (rec._sequences.elements() > MAX_ITEMS) {
232 // too many sequences floating around now. clean them up.
233 clean_up(silence_time, coalesce_time);
234 }
235 }
236 rec._last_active = time_stamp();
237 return true;
238 }
239
240 astring text_form(bool verbose) {
241 astring to_return;
242 for (int i = 0; i < elements(); i++) {
243 to_return += borrow(i)->text_form(verbose);
244 if (i < elements() - 1)
246 }
247 return to_return;
248 }
249
250};
251
253
254sequence_tracker::sequence_tracker(int coalesce_time, int silence_time)
255: _coalesce_time(coalesce_time),
256 _silence_time(silence_time),
257 _hosts(new host_history),
258 _lock(new mutex)
259{
260}
261
263{
264 WHACK(_lock);
265 WHACK(_hosts);
266}
267
269{
270 auto_synchronizer l(*_lock);
271 return _hosts->text_form(verbose);
272}
273
274void sequence_tracker::add_pair(const machine_uid &host, int sequence)
275{
276 auto_synchronizer l(*_lock);
277 if (!_hosts->add_sequence(host, sequence, _silence_time, _coalesce_time)) {
278//complain?
279 return;
280 }
281}
282
283bool sequence_tracker::have_seen(const machine_uid &host, int sequence)
284{
285 auto_synchronizer l(*_lock);
286 int indy = _hosts->find_host(host);
287 if (negative(indy)) return false;
288 host_record &rec = *_hosts->borrow(indy);
289 if (sequence <= rec._received_to) return true;
290 sequence_record *found;
291 return !!rec._sequences.find(sequence, found);
292}
293
295{
296 auto_synchronizer l(*_lock);
297 _hosts->clean_up(_silence_time, _coalesce_time);
298}
299
300} //namespace.
301
302
#define LOG(s)
a_sprintf is a specialization of astring that provides printf style support.
Definition astring.h:440
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition array.h:769
Provides a dynamically resizable ASCII character string.
Definition astring.h:35
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition astring.cpp:130
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition mutex.h:113
bool have_seen(const machine_uid &host, int sequence)
void add_pair(const machine_uid &host, int sequence)
sequence_tracker(int coalesce_time, int silence_time)
basis::astring text_form(bool verbose=false) const
int elements() const
the maximum number of elements currently allowed in this amorph.
Definition amorph.h:66
basis::outcome zap(int start, int end)
Removes a range of indices from the amorph.
Definition amorph.h:357
host_record * borrow(int field)
Returns a pointer to the information at the index "field".
Definition amorph.h:448
int elements() const
the number of valid items we found by traversing the hash table.
Definition hash_table.h:581
bool find(const key_type &key, contents *&item_found) const
locates the item specified by the "key", if possible.
Definition hash_table.h:465
A hash table for storing integers.
Definition int_hash.h:36
bool zap(int key)
overrides base zap() method plus keeps id list updated.
Definition int_hash.h:102
const int_set & ids() const
Definition int_hash.h:82
A simple object that wraps a templated set of ints.
Definition set.h:156
int elements() const
Returns the number of elements in this set.
Definition set.h:47
static const char * platform_eol_to_chars()
provides the characters that make up this platform's line ending.
Represents a point in time relative to the operating system startup time.
Definition time_stamp.h:38
basis::astring text_form(stamp_display_style style=STAMP_RELATIVE) const
returns a simple textual representation of the time_stamp.
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition enhance_cpp.h:42
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition enhance_cpp.h:54
bool verbose
Definition makedep.cpp:112
bool append
Definition makedep.cpp:110
The guards collection helps in testing preconditions and reporting errors.
Definition array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition functions.h:121
bool negative(const type &a)
negative returns true if "a" is less than zero.
Definition functions.h:43
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
const int CLEANING_SPAN
const int MAX_ITEMS
const int MAX_ELEMENTS_FOR_SEQ_HASH
A dynamic container class that holds any kind of object via pointers.
Definition amorph.h:55
#include <time.h>