Merge branch 'master' of feistymeow.org:feisty_meow
[feisty_meow.git] / octopi / library / sockets / sequence_tracker.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : sequence_tracker                                                  *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 2002-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "machine_uid.h"
16 #include "sequence_tracker.h"
17
18 #include <basis/functions.h>
19 #include <basis/astring.h>
20
21 #include <basis/mutex.h>
22 #include <loggers/program_wide_logger.h>
23 #include <structures/amorph.h>
24 #include <structures/int_hash.h>
25 #include <textual/parser_bits.h>
26 #include <timely/time_stamp.h>
27
28 using namespace basis;
29 using namespace loggers;
30 using namespace structures;
31 using namespace textual;
32 using namespace timely;
33
34 namespace sockets {
35
//! number of bits used for each host's sequence hash table (2^bits buckets).
const int MAX_BITS_FOR_SEQ_HASH = 10;

//! when an arriving sequence is more than this far above a host's highest
//! fully-received sequence, the tracker jumps that host's mark forward to the
//! new sequence and runs a full cleaning pass.
const int CLEANING_SPAN = 20000;

//! the most out-of-order sequences held per host before a cleaning pass trims
//! the list.  kept deliberately small so we don't lug around thousands of
//! indices; connection-oriented use will rarely approach it, but a
//! broadcast-style bus with laggy retransmissions could.
const int MAX_ITEMS = 200;

// route this file's logging to the program-wide logger at emergency level.
#undef LOG
#define LOG(s) CLASS_EMERGENCY_LOG(program_wide_logger::get(), s)
51
//hmmm: we still need to handle the cases where a host has:
//        a) rolled its sequence numbers over (not expected for at least
//           4 years?)
//        b) changed its address, so that another host now uses the old
//           address (which also shows up as an odd jump in sequence,
//           possibly backwards, possibly not).
//      would our timing guarantee that any such oddness is washed out within
//      a few minutes?  the worst outcome would be dropping a packet because
//      we wrongly believed we had already seen it.
//      most likely we would see a sequence that looks very old; should that
//      alone be enough to trigger flushing the whole host?
61
62 class sequence_record
63 {
64 public:
65   int _sequence;  // the sequence number in question.
66   time_stamp _when;  // when we received this sequence.
67
68   sequence_record(int seq = 0) : _sequence(seq) {}
69
70   astring text_form() const {
71     return a_sprintf("seq=%d, time=", _sequence) + _when.text_form();
72   }
73 };
74
75 //////////////
76
77 class host_record
78 {
79 public:
80   int _received_to;  // highest sequence we've got full reception prior to.
81   machine_uid _host;  // the host we're concerned with.
82   int_hash<sequence_record> _sequences;  // record of active sequences.
83   time_stamp _last_active;  // the last time we heard from this host.
84     // we could piece this together from the sequences but we prefer not to.
85
86   host_record(const machine_uid &host)
87   : _received_to(0), _host(host), _sequences(MAX_BITS_FOR_SEQ_HASH),
88     _last_active()
89   {}
90
91   void clean_up(int coalesce_time) {
92     // check sequences for being right next to the highest received sequence.
93     // if they're one up, they can be collapsed without waiting for the aging
94     // process.
95     int_set ids;
96     _sequences.ids(ids);
97
98     // we restrict the size of the array with this block.
99     if (ids.elements() > MAX_ITEMS) {
100       int zap_point = ids.elements() - MAX_ITEMS;
101         // we want to remove anything before this index.
102       for (int s0 = 0; s0 < zap_point; s0++) {
103         int seq = ids[s0];
104         _sequences.zap(seq);
105         // set our received_to value from the current element.
106         if (_received_to < seq)
107           _received_to = seq;
108       }
109       // now clean the list of our ids since they're gone.
110       ids.zap(0, zap_point - 1);
111     }
112
113     for (int s1 = 0; s1 < ids.elements(); s1++) {
114       sequence_record *seq;
115       int id = ids[s1];
116       if (!_sequences.find(id, seq)) continue;  // bad mistake going on.
117       if (_received_to + 1 == seq->_sequence) {
118         // we've hit one that can be collapsed.
119         _received_to++;
120         _sequences.zap(id);
121         ids.zap(s1, s1);
122         s1--;  // skip back before deleted item.
123       }
124     }
125
126     // check sequence ages.  coalesce any older ones.
127     for (int s2 = 0; s2 < ids.elements(); s2++) {
128       sequence_record *seq;
129       int id = ids[s2];
130       if (!_sequences.find(id, seq)) continue;  // bad mistake going on.
131       if (seq->_when < time_stamp(-coalesce_time)) {
132         // this sequence number has floated too long; crush it now.
133         if (_received_to < seq->_sequence)
134           _received_to = seq->_sequence;  // update highest received seq.
135         _sequences.zap(id);
136       }
137     }
138   }
139
140   astring text_form(bool verbose) {
141     astring to_return;
142     to_return += astring("host=") + _host.text_form()
143         + a_sprintf(", rec_to=%d", _received_to)
144         + ", active=" + _last_active.text_form();
145     if (verbose) {
146       int_set ids;
147       _sequences.ids(ids);
148       for (int i = 0; i < ids.elements(); i++) {
149         sequence_record *found;
150         if (!_sequences.find(ids[i], found))
151           continue;  // that's a bad thing.
152         to_return += astring(parser_bits::platform_eol_to_chars()) + "\t"
153             + found->text_form();
154       }
155     } else {
156       to_return += a_sprintf(", sequences held=%d", _sequences.elements());
157     }
158     return to_return;
159   }
160
161 };
162
163 //////////////
164
165 //hmmm: should this be a hash table?
166
167 class host_history : public amorph<host_record>
168 {
169 public:
170   virtual ~host_history() {}
171
172   DEFINE_CLASS_NAME("host_history");
173
174   int find_host(const machine_uid &to_find) {
175     for (int i = 0; i < elements(); i++) {
176       if (borrow(i)->_host == to_find) return i;
177     }
178     return common::NOT_FOUND;
179   }
180
181   bool whack_host(const machine_uid &to_find) {
182     int indy = find_host(to_find);
183     if (negative(indy)) return false;
184     zap(indy, indy);
185     return true;
186   }
187
188   void clean_up(int silence_time, int coalesce_time) {
189     for (int h = 0; h < elements(); h++) {
190       host_record &rec = *borrow(h);
191       // check host liveliness.
192       if (rec._last_active < time_stamp(-silence_time)) {
193         // this host got too stale; whack it now.
194         zap(h, h);
195         h--;  // skip back to prior element.
196         continue;
197       }
198       rec.clean_up(coalesce_time);
199     }
200   }
201
202   bool add_sequence(const machine_uid &to_find, int sequence,
203       int silence_time, int coalesce_time) {
204     FUNCDEF("add_sequence");
205     int indy = find_host(to_find);
206     if (negative(indy)) {
207       host_record *rec = new host_record(to_find);
208       append(rec);
209       indy = find_host(to_find);
210       if (negative(indy)) {
211         LOG(astring("*** failure to add a host to the tracker: ")
212             + to_find.text_form());
213         return false;  // serious error.
214       }
215     }
216     host_record &rec = *borrow(indy);
217     if (borrow(indy)->_received_to + 1 == sequence) {
218       // this is just one up from our last received guy, so optimize it out.
219       rec._received_to = sequence;
220     } else if (sequence - borrow(indy)->_received_to > CLEANING_SPAN) {
221       // if the number is wildly different, assume we haven't dealt with this
222       // for too long.
223       rec._received_to = sequence;
224 #ifdef DEBUG_SEQUENCE_TRACKER
225       LOG("sequence is wildly different, cleaning.");
226 #endif
227       clean_up(silence_time, coalesce_time);
228     } else {
229       // standard treatment, add it to the list.
230       rec._sequences.add(sequence, new sequence_record(sequence));
231       if (rec._sequences.elements() > MAX_ITEMS) {
232         // too many sequences floating around now.  clean them up.
233         clean_up(silence_time, coalesce_time);
234       }
235     }
236     rec._last_active = time_stamp();
237     return true;
238   }
239
240   astring text_form(bool verbose) {
241     astring to_return;
242     for (int i = 0; i < elements(); i++) {
243       to_return += borrow(i)->text_form(verbose);
244       if (i < elements() - 1)
245         to_return += parser_bits::platform_eol_to_chars();
246     }
247     return to_return;
248   }
249
250 };
251
252 //////////////
253
254 sequence_tracker::sequence_tracker(int coalesce_time, int silence_time)
255 : _coalesce_time(coalesce_time),
256   _silence_time(silence_time),
257   _hosts(new host_history),
258   _lock(new mutex)
259 {
260 }
261
262 sequence_tracker::~sequence_tracker()
263 {
264   WHACK(_lock);
265   WHACK(_hosts);
266 }
267
268 astring sequence_tracker::text_form(bool verbose) const
269 {
270   auto_synchronizer l(*_lock);
271   return _hosts->text_form(verbose);
272 }
273
274 void sequence_tracker::add_pair(const machine_uid &host, int sequence)
275 {
276   auto_synchronizer l(*_lock);
277   if (!_hosts->add_sequence(host, sequence, _silence_time, _coalesce_time)) {
278 //complain? 
279     return;
280   }
281 }
282
283 bool sequence_tracker::have_seen(const machine_uid &host, int sequence)
284 {
285   auto_synchronizer l(*_lock);
286   int indy = _hosts->find_host(host);
287   if (negative(indy)) return false;
288   host_record &rec = *_hosts->borrow(indy);
289   if (sequence <= rec._received_to) return true;
290   sequence_record *found;
291   return !!rec._sequences.find(sequence, found);
292 }
293
294 void sequence_tracker::clean_up()
295 {
296   auto_synchronizer l(*_lock);
297   _hosts->clean_up(_silence_time, _coalesce_time);
298 }
299
300 } //namespace.
301
302