feisty meow concerns codebase  2.140
sequence_tracker.cpp
Go to the documentation of this file.
1 /*****************************************************************************\
2 * *
3 * Name : sequence_tracker *
4 * Author : Chris Koeritz *
5 * *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
14 
15 #include "machine_uid.h"
16 #include "sequence_tracker.h"
17 
18 #include <basis/functions.h>
19 #include <basis/astring.h>
20 
21 #include <basis/mutex.h>
23 #include <structures/amorph.h>
24 #include <structures/int_hash.h>
25 #include <textual/parser_bits.h>
26 #include <timely/time_stamp.h>
27 
28 using namespace basis;
29 using namespace loggers;
30 using namespace structures;
31 using namespace textual;
32 using namespace timely;
33 
34 namespace sockets {
35 
// the sequence hash table is sized by bit count; this many bits allows
// 2^bits buckets.
const int MAX_BITS_FOR_SEQ_HASH = 10;

// when an incoming sequence number is more than this far beyond the highest
// one already received, the tracker jumps ahead and cleans up its lists.
const int CLEANING_SPAN = 20000;

// cap on the number of individually tracked sequences per host. it is kept
// low on purpose so we are not lugging around thousands of indices. for
// connection oriented use it should rarely matter; a broadcast style bus with
// laggy retransmissions could run into it.
const int MAX_ITEMS = 200;

// route this file's LOG calls to the program-wide logger.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
51 
52 //hmmm: we need to address when a host has:
53 // a) rolled over in sequences (not for 4 years at least?)
54 // b) changed its address where another host now has old address (which
55 // is also a weirdo sequence jump, maybe backwards, maybe not).
56 // would our timing help to guarantee that any oddness introduced is
57 // swamped out in a few minutes? the worst thing would be a lost packet
58 // we dumped because we thought we'd seen it but hadn't.
59 // probably we will see a sequence that's really old seeming; should that
60 // be enough to trigger flushing the whole host?
61 
62 class sequence_record
63 {
64 public:
65  int _sequence; // the sequence number in question.
66  time_stamp _when; // when we received this sequence.
67 
68  sequence_record(int seq = 0) : _sequence(seq) {}
69 
70  astring text_form() const {
71  return a_sprintf("seq=%d, time=", _sequence) + _when.text_form();
72  }
73 };
74 
76 
77 class host_record
78 {
79 public:
80  int _received_to; // highest sequence we've got full reception prior to.
81  machine_uid _host; // the host we're concerned with.
82  int_hash<sequence_record> _sequences; // record of active sequences.
83  time_stamp _last_active; // the last time we heard from this host.
84  // we could piece this together from the sequences but we prefer not to.
85 
86  host_record(const machine_uid &host)
87  : _received_to(0), _host(host), _sequences(MAX_BITS_FOR_SEQ_HASH),
88  _last_active()
89  {}
90 
91  void clean_up(int coalesce_time) {
92  // check sequences for being right next to the highest received sequence.
93  // if they're one up, they can be collapsed without waiting for the aging
94  // process.
95  int_set ids;
96  _sequences.ids(ids);
97 
98  // we restrict the size of the array with this block.
99  if (ids.elements() > MAX_ITEMS) {
100  int zap_point = ids.elements() - MAX_ITEMS;
101  // we want to remove anything before this index.
102  for (int s0 = 0; s0 < zap_point; s0++) {
103  int seq = ids[s0];
104  _sequences.zap(seq);
105  // set our received_to value from the current element.
106  if (_received_to < seq)
107  _received_to = seq;
108  }
109  // now clean the list of our ids since they're gone.
110  ids.zap(0, zap_point - 1);
111  }
112 
113  for (int s1 = 0; s1 < ids.elements(); s1++) {
114  sequence_record *seq;
115  int id = ids[s1];
116  if (!_sequences.find(id, seq)) continue; // bad mistake going on.
117  if (_received_to + 1 == seq->_sequence) {
118  // we've hit one that can be collapsed.
119  _received_to++;
120  _sequences.zap(id);
121  ids.zap(s1, s1);
122  s1--; // skip back before deleted item.
123  }
124  }
125 
126  // check sequence ages. coalesce any older ones.
127  for (int s2 = 0; s2 < ids.elements(); s2++) {
128  sequence_record *seq;
129  int id = ids[s2];
130  if (!_sequences.find(id, seq)) continue; // bad mistake going on.
131  if (seq->_when < time_stamp(-coalesce_time)) {
132  // this sequence number has floated too long; crush it now.
133  if (_received_to < seq->_sequence)
134  _received_to = seq->_sequence; // update highest received seq.
135  _sequences.zap(id);
136  }
137  }
138  }
139 
140  astring text_form(bool verbose) {
141  astring to_return;
142  to_return += astring("host=") + _host.text_form()
143  + a_sprintf(", rec_to=%d", _received_to)
144  + ", active=" + _last_active.text_form();
145  if (verbose) {
146  int_set ids;
147  _sequences.ids(ids);
148  for (int i = 0; i < ids.elements(); i++) {
149  sequence_record *found;
150  if (!_sequences.find(ids[i], found))
151  continue; // that's a bad thing.
152  to_return += astring(parser_bits::platform_eol_to_chars()) + "\t"
153  + found->text_form();
154  }
155  } else {
156  to_return += a_sprintf(", sequences held=%d", _sequences.elements());
157  }
158  return to_return;
159  }
160 
161 };
162 
164 
165 //hmmm: should this be a hash table?
166 
167 class host_history : public amorph<host_record>
168 {
169 public:
170  virtual ~host_history() {}
171 
172  DEFINE_CLASS_NAME("host_history");
173 
174  int find_host(const machine_uid &to_find) {
175  for (int i = 0; i < elements(); i++) {
176  if (borrow(i)->_host == to_find) return i;
177  }
178  return common::NOT_FOUND;
179  }
180 
181  bool whack_host(const machine_uid &to_find) {
182  int indy = find_host(to_find);
183  if (negative(indy)) return false;
184  zap(indy, indy);
185  return true;
186  }
187 
188  void clean_up(int silence_time, int coalesce_time) {
189  for (int h = 0; h < elements(); h++) {
190  host_record &rec = *borrow(h);
191  // check host liveliness.
192  if (rec._last_active < time_stamp(-silence_time)) {
193  // this host got too stale; whack it now.
194  zap(h, h);
195  h--; // skip back to prior element.
196  continue;
197  }
198  rec.clean_up(coalesce_time);
199  }
200  }
201 
202  bool add_sequence(const machine_uid &to_find, int sequence,
203  int silence_time, int coalesce_time) {
204  FUNCDEF("add_sequence");
205  int indy = find_host(to_find);
206  if (negative(indy)) {
207  host_record *rec = new host_record(to_find);
208  append(rec);
209  indy = find_host(to_find);
210  if (negative(indy)) {
211  LOG(astring("*** failure to add a host to the tracker: ")
212  + to_find.text_form());
213  return false; // serious error.
214  }
215  }
216  host_record &rec = *borrow(indy);
217  if (borrow(indy)->_received_to + 1 == sequence) {
218  // this is just one up from our last received guy, so optimize it out.
219  rec._received_to = sequence;
220  } else if (sequence - borrow(indy)->_received_to > CLEANING_SPAN) {
221  // if the number is wildly different, assume we haven't dealt with this
222  // for too long.
223  rec._received_to = sequence;
224 #ifdef DEBUG_SEQUENCE_TRACKER
225  LOG("sequence is wildly different, cleaning.");
226 #endif
227  clean_up(silence_time, coalesce_time);
228  } else {
229  // standard treatment, add it to the list.
230  rec._sequences.add(sequence, new sequence_record(sequence));
231  if (rec._sequences.elements() > MAX_ITEMS) {
232  // too many sequences floating around now. clean them up.
233  clean_up(silence_time, coalesce_time);
234  }
235  }
236  rec._last_active = time_stamp();
237  return true;
238  }
239 
240  astring text_form(bool verbose) {
241  astring to_return;
242  for (int i = 0; i < elements(); i++) {
243  to_return += borrow(i)->text_form(verbose);
244  if (i < elements() - 1)
245  to_return += parser_bits::platform_eol_to_chars();
246  }
247  return to_return;
248  }
249 
250 };
251 
253 
254 sequence_tracker::sequence_tracker(int coalesce_time, int silence_time)
255 : _coalesce_time(coalesce_time),
256  _silence_time(silence_time),
257  _hosts(new host_history),
258  _lock(new mutex)
259 {
260 }
261 
263 {
264  WHACK(_lock);
265  WHACK(_hosts);
266 }
267 
269 {
270  auto_synchronizer l(*_lock);
271  return _hosts->text_form(verbose);
272 }
273 
274 void sequence_tracker::add_pair(const machine_uid &host, int sequence)
275 {
276  auto_synchronizer l(*_lock);
277  if (!_hosts->add_sequence(host, sequence, _silence_time, _coalesce_time)) {
278 //complain?
279  return;
280  }
281 }
282 
283 bool sequence_tracker::have_seen(const machine_uid &host, int sequence)
284 {
285  auto_synchronizer l(*_lock);
286  int indy = _hosts->find_host(host);
287  if (negative(indy)) return false;
288  host_record &rec = *_hosts->borrow(indy);
289  if (sequence <= rec._received_to) return true;
290  sequence_record *found;
291  return !!rec._sequences.find(sequence, found);
292 }
293 
295 {
296  auto_synchronizer l(*_lock);
297  _hosts->clean_up(_silence_time, _coalesce_time);
298 }
299 
300 } //namespace.
301 
302 
a_sprintf is a specialization of astring that provides printf style support.
Definition: astring.h:440
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition: array.h:769
Provides a dynamically resizable ASCII character string.
Definition: astring.h:35
virtual void text_form(base_string &state_fill) const
Provides a text view of all the important info owned by this object.
Definition: astring.cpp:130
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition: mutex.h:113
bool have_seen(const machine_uid &host, int sequence)
void add_pair(const machine_uid &host, int sequence)
basis::astring text_form(bool verbose=false) const
int elements() const
the number of valid items we found by traversing the hash table.
Definition: hash_table.h:576
bool find(const key_type &key, contents *&item_found) const
locates the item specified by the "key", if possible.
Definition: hash_table.h:460
bool zap(int key)
overrides base zap() method plus keeps id list updated.
Definition: int_hash.h:102
const int_set & ids() const
Definition: int_hash.h:82
A simple object that wraps a templated set of ints.
Definition: set.h:156
int elements() const
Returns the number of elements in this set.
Definition: set.h:47
Represents a point in time relative to the operating system startup time.
Definition: time_stamp.h:38
basis::astring text_form(stamp_display_style style=STAMP_RELATIVE) const
returns a simple textual representation of the time_stamp.
Definition: time_stamp.cpp:61
#define DEFINE_CLASS_NAME(objname)
Defines the name of a class by providing a couple standard methods.
Definition: enhance_cpp.h:45
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition: enhance_cpp.h:57
bool verbose
Definition: makedep.cpp:112
bool append
Definition: makedep.cpp:110
The guards collection helps in testing preconditions and reporting errors.
Definition: array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition: functions.h:121
bool negative(const type &a)
negative returns true if "a" is less than zero.
Definition: functions.h:43
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
Definition: base_address.h:26
const int CLEANING_SPAN
const int MAX_BITS_FOR_SEQ_HASH
const int MAX_ITEMS
A dynamic container class that holds any kind of object via pointers.
Definition: amorph.h:55
#include <time.h>
Definition: earth_time.cpp:37
#define LOG(s)