e4fc5511fdd937fd2ead375d99de3807fa1f0d8d
[feisty_meow.git] / octopi / library / sockets / socket_minder.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : socket_minder                                                     *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 1999-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "raw_socket.h"
16 #include "socket_data.h"
17 #include "socket_minder.h"
18 #include "tcpip_stack.h"
19
20 #include <basis/mutex.h>
21 #include <loggers/critical_events.h>
22 #include <loggers/program_wide_logger.h>
23 #include <processes/ethread.h>
24 #include <processes/os_event.h>
25 #include <structures/set.h>
26 #include <structures/amorph.h>
27 #include <structures/unique_id.h>
28 #include <textual/parser_bits.h>
29
30 #include <errno.h>
31 #ifdef __WIN32__
32   #include <ws2tcpip.h>
33 #endif
34 #ifdef __UNIX__
35   #include <arpa/inet.h>
36   #include <sys/socket.h>
37 #endif
38
39 using namespace basis;
40 using namespace loggers;
41 using namespace processes;
42 using namespace structures;
43 using namespace textual;
44 using namespace timely;
45
46 namespace sockets {
47
48 //#define DEBUG_SOCKET_MINDER
49   // uncomment for noisiness.
50
51 #undef LOG
52 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
53
54 const int SOCKET_CHECK_INTERVAL = 50;
55   // we will scoot around in our sockets this frequently.
56
57 const int SOCKMIND_MAXIMUM_RECEIVES = 10;
58   // we'll receive this many items from the socket in one go.
59
60 const int MAXIMUM_TRANSFER_CHUNK = 512 * KILOBYTE;
61   // largest block of data we'll try to deal with at one time.
62
63 const int CONN_ALERT_INTERVAL = 100;
64   // this is the most frequently that we will generate a connection checking
65   // event.
66
67 const int MULTI_SELECT_TIMEOUT = 250;
68   // the snooze on select will occur for this long.  during this interval,
69   // it is likely no new sockets will be considered.
70
71 //////////////
72
73 class socket_data_amorph : public amorph<socket_data> {};
74
75 //////////////
76
77 class socket_minder_prompter : public ethread
78 {
79 public:
80   socket_minder_prompter(socket_minder &parent)
81   : ethread(SOCKET_CHECK_INTERVAL, ethread::SLACK_INTERVAL),
82     _parent(parent)
83   {
84     start(NULL_POINTER);
85   }
86
87   ~socket_minder_prompter() {
88     stop();  // shut down our thread.
89   }
90
91   virtual void perform_activity(void *formal(ptr)) { _parent.snoozy_select(); }
92
93 private:
94   socket_minder &_parent;  // the object we're hooked to.
95 };
96
97 //////////////
98
99 socket_minder::socket_minder(post_office &post, int parent_route,
100     int event_type, int message)
101 : _post(post),
102   _parent_route(parent_route),
103   _event_type(event_type),
104   _lock(new mutex),
105   _socket_list(new socket_data_amorph),
106   _socks(new raw_socket),
107   _stack(new tcpip_stack),
108   _message(message),
109   _pending_sox(new int_set),
110   _prompter(new socket_minder_prompter(*this))
111 {
112   _prompter->start(NULL_POINTER);
113 }
114
115 socket_minder::~socket_minder()
116 {
117   _prompter->stop();
118   WHACK(_prompter);
119   WHACK(_socket_list);
120   WHACK(_lock);
121   WHACK(_pending_sox);
122   WHACK(_socks); 
123   WHACK(_stack);
124 }
125
126 void socket_minder::disengage()
127 {
128   _prompter->stop();
129 }
130
131 astring socket_minder::text_form() const
132 {
133   auto_synchronizer l(*_lock);
134   astring to_return;
135
136   for (int i = 0; i < _socket_list->elements(); i++) {
137     const socket_data *curr = _socket_list->get(i);
138     to_return += curr->text_form();
139     if (i != _socket_list->elements() - 1)
140       to_return += parser_bits::platform_eol_to_chars();
141   }
142
143   return to_return;
144 }
145
146 void socket_minder::snoozy_select()
147 {
148   FUNCDEF("snoozy_select");
149   int_array read_sox;
150   int_array write_sox;
151   int_array pending;
152
153   get_sockets(read_sox, write_sox, pending);
154
155   // process any with pending connections right now, rather than later.
156   for (int p = 0; p < pending.length(); p++) {
157     socket_data *sd = lock_socket_data(pending[p]);
158     if (!sd) continue;  // something hosed there.
159     handle_pending_connecters(*sd);
160     unlock_socket_data(sd);
161   }
162
163   // now select on all of our sockets simultaneously.
164   int ret = _socks->select(read_sox, write_sox, MULTI_SELECT_TIMEOUT);
165   if (!ret || (!read_sox.length() && !write_sox.length()) ) {
166     return;  // nothing happened.
167   }
168
169   // rotate through the lists and push socket_minders around as needed.
170   // any sockets we have events for but no socket_data are orphans and will
171   // be ignored.
172
173   // check read sockets.
174   for (int r = 0; r < read_sox.length(); r++) {
175     const int sock = read_sox[r];
176     if (owns_socket(sock)) {
177       socket_data *sd = lock_socket_data(sock);
178       if (!sd) continue;  // something hosed there.
179       push_receives(*sd, SI_READABLE);
180       unlock_socket_data(sd);
181       read_sox.zap(r, r);
182       r--;  // skip back before deleted guy.
183     }
184   }
185
186   // check write sockets.
187   for (int w = 0; w < write_sox.length(); w++) {
188     const int sock = write_sox[w];
189     if (owns_socket(sock)) {
190       socket_data *sd = lock_socket_data(sock);
191       if (!sd) continue;  // something hosed there.
192       push_sends(*sd, SI_WRITABLE);
193       unlock_socket_data(sd);
194       write_sox.zap(w, w);
195       w--;  // skip back before deleted guy.
196     }
197   }
198 }
199
200 void socket_minder::get_sockets(int_array &read_sox, int_array &write_sox,
201     int_array &pendings) const
202 {
203   auto_synchronizer l(*_lock);
204   for (int i = 0; i < _socket_list->elements(); i++) {
205     socket_data *sd = _socket_list->borrow(i);
206     if (sd->_connection_pending) {
207       // this is not ready for sends and receives yet.
208       pendings += sd->_socket;
209     } else {
210       // always add sockets to test if they have data waiting.
211       read_sox += sd->_socket;
212       // only check on writability if there is data pending for sending.
213       if (sd->_partially_sent.length())
214         write_sox += sd->_socket;
215     }
216   }
217 }
218
219 bool socket_minder::owns_socket(int socket) const
220 {
221   auto_synchronizer l(*_lock);
222   for (int i = 0; i < _socket_list->elements(); i++) {
223     if (_socket_list->borrow(i)->_socket == socket) return true;
224   }
225   return false;
226 }
227
228 socket_data *socket_minder::lock_socket_data(int socket)
229 {
230   _lock->lock();
231   for (int i = 0; i < _socket_list->elements(); i++)
232     if (_socket_list->borrow(i)->_socket == socket)
233       return _socket_list->borrow(i);
234   // this is a failure to get here; there was no suitable socket.
235   _lock->unlock();
236   return NULL_POINTER;
237 }
238
239 void socket_minder::unlock_socket_data(socket_data *to_unlock)
240 {
241   if (!to_unlock) return;
242 //can't affect it now.  to_unlock = NULL_POINTER;
243   _lock->unlock();
244 }
245
246 bool socket_minder::add_socket_data(int socket, bool server, int server_socket,
247     bool connected_mode, bool connection_pending)
248 {
249   auto_synchronizer l(*_lock);
250   socket_data *harpo = lock_socket_data(socket);
251   if (harpo) {
252     unlock_socket_data(harpo);
253     return false;
254   }
255   socket_data *new_data = new socket_data(socket, server, server_socket,
256       connected_mode);
257   _socks->set_non_blocking(socket);
258     // ensure the new guy is treated as non-blocking.  unix does not seem
259     // to inherit this from the parent.
260   new_data->_connection_pending = connection_pending;
261   _socket_list->append(new_data);
262   return true;
263 }
264
265 bool socket_minder::remove_socket_data(int socket)
266 {
267   FUNCDEF("remove_socket_data");
268   auto_synchronizer l(*_lock);
269   for (int i = 0; i < _socket_list->elements(); i++) {
270     if (_socket_list->borrow(i)->_socket == socket) {
271       // give the socket a last chance to get its data out.
272       evaluate_interest(*_socket_list->borrow(i));
273       _socket_list->zap(i, i);
274       return true;
275     }
276   }
277 //  LOG(a_sprintf("couldn't find socket %d.", socket));
278   return false;
279 }
280
281 bool socket_minder::register_interest(int socket, int interests)
282 {
283 #ifdef DEBUG_SOCKET_MINDER
284   FUNCDEF("register_interest");
285 #endif
286   socket_data *harpo = lock_socket_data(socket);
287 #ifdef DEBUG_SOCKET_MINDER
288   LOG(astring(astring::SPRINTF, "registering interest of %d for socket "
289       "%d.", interests, socket));
290 #endif
291   if (!harpo) return false;
292   harpo->_registered_interests = interests;
293   unlock_socket_data(harpo);
294   return true;
295 }
296
297 bool socket_minder::is_connection_pending(int socket)
298 {
299   socket_data *harpo = lock_socket_data(socket);
300   if (!harpo) return false;
301   bool to_return = harpo->_connection_pending;
302   unlock_socket_data(harpo);
303   return to_return;
304 }
305
306 bool socket_minder::set_connection_pending(int socket, bool pending)
307 {
308   socket_data *harpo = lock_socket_data(socket);
309   if (!harpo) return false;
310   harpo->_connection_pending = pending;
311   unlock_socket_data(harpo);
312   return true;
313 }
314
315 void socket_minder::fire_event(int to_fire, int at_whom,
316     basis::un_int parm1, basis::un_int parm2)
317 {
318   _post.drop_off(at_whom, new OS_event(_event_type, to_fire, parm1, parm2));
319 }
320
321 void socket_minder::put_pending_server(int to_put, bool at_head)
322 {
323   if (!to_put) return;  // bogus.
324   auto_synchronizer l(*_lock);
325   if (at_head) {
326     _pending_sox->insert(0, 1);
327     (*_pending_sox)[0] = to_put;
328   } else {
329     *_pending_sox += to_put;
330   }
331 }
332
333 bool socket_minder::zap_pending_server(int socket)
334 {
335   auto_synchronizer l(*_lock);
336   if (!_pending_sox->member(socket)) return false;
337   _pending_sox->remove(socket);
338   return true;
339 }
340
341 int socket_minder::get_pending_server()
342 {
343   auto_synchronizer l(*_lock);
344   if (!_pending_sox->length()) return 0;
345   int to_return = _pending_sox->get(0);
346   _pending_sox->zap(0, 0);
347   return to_return;
348 }
349
350 bool socket_minder::handle_pending_connecters(socket_data &to_peek)
351 {
352   FUNCDEF("handle_pending_connecters");
353   if (!to_peek._connection_pending) return false;  // not needed here.
354
355   if (to_peek._last_conn_alert > time_stamp(-CONN_ALERT_INTERVAL)) {
356     // not time yet.
357     return false;
358   }
359   to_peek._last_conn_alert.reset();
360
361   // force the issue; there is no simple way to portably know whether
362   // the socket got a connection or not, so just say it did.
363   if (!to_peek._is_server
364       && (to_peek._registered_interests & SI_CONNECTED) ) {
365     // deal with a client first.  this just means, zing an event.
366 #ifdef DEBUG_SOCKET_MINDER
367     LOG(a_sprintf("sending client SI_CONNECTED event on parent %d",
368         _parent_route));
369 #endif
370     fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
371   } else if (to_peek._is_server
372       && (to_peek._registered_interests & SI_CONNECTED) ) {
373     // special handling for servers.  we accept the waiting guy if he's
374     // there.  otherwise we don't send the event.
375     sockaddr sock_addr;
376     socklen_t sock_len = sizeof(sock_addr);
377     int new_sock = int(::accept(to_peek._socket, &sock_addr, &sock_len));
378       // check for a new socket.
379     if (new_sock != INVALID_SOCKET) {
380       LOG(a_sprintf("accept got a client socket %d.", new_sock));
381       if (_pending_sox->member(new_sock)) {
382         LOG(a_sprintf("already have seen socket %d in pending!", new_sock));
383       } else {
384         *_pending_sox += new_sock;
385 #ifdef DEBUG_SOCKET_MINDER
386         LOG(a_sprintf("sending server SI_CONNECTED event on parent %d",
387             _parent_route));
388 #endif
389         fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
390       }
391     } else if (_pending_sox->length()) {
392       // there are still pending connectees.
393       fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
394     }
395   }
396   // also, if the connection is still pending, we don't want to select on
397   // it yet.
398   return true;
399 }
400
401 bool socket_minder::evaluate_interest(socket_data &to_peek)
402 {
403   FUNCDEF("evaluate_interest");
404   if (to_peek._connection_pending) {
405     return handle_pending_connecters(to_peek);
406   }
407
408   int sel_mode = 0;
409
410   int states = _socks->select(to_peek._socket, sel_mode);
411
412   if (!states) {
413     return true;  // nothing to report.
414   }
415
416   if (! (states & SI_ERRONEOUS) && ! (states & SI_DISCONNECTED) ) {
417     push_sends(to_peek, states);
418     push_receives(to_peek, states);
419   }
420
421   if ( (to_peek._registered_interests & SI_ERRONEOUS)
422       && (states & SI_ERRONEOUS) ) {
423     // there's some kind of bad problem on the socket.
424     LOG(a_sprintf("socket %d has status of erroneous.", to_peek._socket));
425 //hmmm: what to do?  generate an event?
426   }
427
428   if ( (to_peek._registered_interests & SI_DISCONNECTED)
429       && (states & SI_DISCONNECTED) ) {
430     // we lost our state of connectedness.
431     fire_event(_message, _parent_route, to_peek._socket,
432         SI_DISCONNECTED);
433     return true;  // get out now.
434   }
435
436   return true;
437 }
438
439 void socket_minder::push_sends(socket_data &to_poke, int states)
440 {
441   FUNCDEF("push_sends");
442   if (to_poke._connection_pending) {
443     LOG("not supposed to try this when not connected yet...");
444   }
445
446 #ifdef DEBUG_SOCKET_MINDER
447   if (to_poke._partially_sent.length()) {
448     LOG(a_sprintf("socket %d: %d bytes to send.", to_poke._socket,
449         to_poke._partially_sent.length()));
450   }
451 #endif
452
453   int error_code = 0;
454
455   if ( (states & SI_WRITABLE) && to_poke._partially_sent.length()) {
456     // write to the socket since we see an opportunity.
457     byte_array &to_send = to_poke._partially_sent;
458     int len_sent = send(to_poke._socket, (char *)to_send.observe(),
459         to_send.length(), 0);
460     if (!len_sent) {
461       // nothing got sent.
462       LOG(a_sprintf("didn't send any data on socket %d.", to_poke._socket));
463     } else if (len_sent == SOCKET_ERROR) {
464       // something bad happened.
465       error_code = critical_events::system_error();
466     } else {
467 #ifdef DEBUG_SOCKET_MINDER
468       LOG(a_sprintf("updating that %d bytes got sent out of %d to send.",
469           len_sent, to_send.length()));
470       if (to_send.length() != len_sent)
471         LOG(a_sprintf("size to send (%d) not same as size sent (%d).",
472             to_send.length(), len_sent));
473 #endif
474       // update the partially sent chunk for the bit we sent out.
475       to_send.zap(0, len_sent - 1);
476     }
477   }
478
479   // handle errors we have seen.
480   if (error_code) {
481     if (error_code != SOCK_EWOULDBLOCK)
482       LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
483           to_poke._socket, _stack->tcpip_error_name(error_code).s()));
484
485     switch (error_code) {
486       case SOCK_ENOTSOCK:  // fall-through.
487       case SOCK_ECONNABORTED:  // fall-through.
488       case SOCK_ECONNRESET: {
489         // the connection got hammerlocked somehow.
490         LOG(a_sprintf("due to %s condition, closing socket %d.",
491             _stack->tcpip_error_name(error_code).s(), to_poke._socket));
492         fire_event(_message, _parent_route, to_poke._socket,
493             SI_DISCONNECTED);
494         to_poke._partially_sent.reset();  // clear with no connection.
495         break;
496       }
497     }
498   }
499 }
500
501 void socket_minder::push_receives(socket_data &to_poke, int states)
502 {
503   FUNCDEF("push_receives");
504   if (to_poke._connection_pending) {
505     LOG("not supposed to try this when not connected yet...");
506   }
507
508 #ifdef DEBUG_SOCKET_MINDER
509   if (to_poke._partially_received.length())
510     LOG(a_sprintf("socket %d: %d bytes already received.", to_poke._socket,
511         to_poke._partially_received.length()));
512 #endif
513
514   int error_code = 0;
515
516   if ( (states & SI_READABLE) && to_poke._connected_mode) {
517     // grab any data that's waiting on the connection-oriented socket.
518
519     bool got_something = true;  // hopeful for now.
520     to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
521       // allocate space where we'll get new data.
522     int count = 0;
523     while (got_something && (count++ < SOCKMIND_MAXIMUM_RECEIVES)) {
524       got_something = false;  // now get all pessimistic.
525       int len = recv(to_poke._socket, (char *)to_poke._receive_buffer.observe(),
526           to_poke._receive_buffer.length(), 0);
527       if (len > 0) {
528 #ifdef DEBUG_SOCKET_MINDER
529         LOG(a_sprintf("received %d bytes on socket %d.", len, to_poke._socket));
530 #endif
531         // we received some actual data; we're happy again.
532         got_something = true;
533         // zap any extra off.
534         if (len < MAXIMUM_TRANSFER_CHUNK)
535           to_poke._receive_buffer.zap(len, to_poke._receive_buffer.last());
536         // add in what we were given.
537         to_poke._partially_received += to_poke._receive_buffer;
538         to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
539           // reset to the right size for trying some more.
540       } else {
541         error_code = critical_events::system_error();
542
543         // reset the states flag based on current state.
544         states = _socks->select(to_poke._socket,
545             raw_socket::SELECTING_JUST_READ);
546         if (states & SI_DISCONNECTED) {
547           error_code = SOCK_ECONNRESET;  // make like regular disconnect.
548           LOG(a_sprintf("noticed disconnection on socket %d.",
549               to_poke._socket));
550           // close the socket; we use a temporary because the close method
551           // wants to actually store zero in the socket.
552           basis::un_int deader = to_poke._socket;
553           _socks->close(deader);
554         }
555
556       }
557     }
558
559     if ( !(states & SI_DISCONNECTED)
560         && to_poke._partially_received.length()) { 
561 #ifdef DEBUG_SOCKET_MINDER
562       LOG(a_sprintf("firing readable now: sock=%d", to_poke._socket));
563 #endif
564       fire_event(_message, _parent_route, to_poke._socket, SI_READABLE);
565     }
566   } else if ( (states & SI_READABLE) && !to_poke._connected_mode) {
567     // datagram style; we need to still alert the parent.
568     fire_event(_message, _parent_route, to_poke._socket,
569         SI_READABLE);
570   }
571
572   // handle errors we have seen.
573   if (error_code) {
574     if (error_code != SOCK_EWOULDBLOCK)
575       LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
576           to_poke._socket, _stack->tcpip_error_name(error_code).s()));
577
578     switch (error_code) {
579       case SOCK_ENOTSOCK:  // fall-through.
580       case SOCK_ECONNABORTED:  // fall-through.
581       case SOCK_ECONNRESET: {
582         // the connection got hammerlocked somehow.
583         LOG(a_sprintf("due to %s condition, closing socket %d.",
584             _stack->tcpip_error_name(error_code).s(), to_poke._socket));
585         fire_event(_message, _parent_route, to_poke._socket,
586             SI_DISCONNECTED);
587         basis::un_int deader = to_poke._socket;
588         _socks->close(deader);
589         to_poke._partially_sent.reset();  // clear with no connection.
590         break;
591       }
592     }
593   }
594 }
595
596 } //namespace.
597