updates from orpheus for windoze build
[feisty_meow.git] / octopi / library / sockets / socket_minder.cpp
1 /*****************************************************************************\
2 *                                                                             *
3 *  Name   : socket_minder                                                     *
4 *  Author : Chris Koeritz                                                     *
5 *                                                                             *
6 *******************************************************************************
7 * Copyright (c) 1999-$now By Author.  This program is free software; you can  *
8 * redistribute it and/or modify it under the terms of the GNU General Public  *
9 * License as published by the Free Software Foundation; either version 2 of   *
10 * the License or (at your option) any later version.  This is online at:      *
11 *     http://www.fsf.org/copyleft/gpl.html                                    *
12 * Please send any updates to: fred@gruntose.com                               *
13 \*****************************************************************************/
14
15 #include "raw_socket.h"
16 #include "socket_data.h"
17 #include "socket_minder.h"
18 #include "tcpip_stack.h"
19
20 #include <basis/mutex.h>
21 #include <loggers/critical_events.h>
22 #include <loggers/program_wide_logger.h>
23 #include <processes/ethread.h>
24 #include <processes/os_event.h>
25 #include <structures/set.h>
26 #include <structures/amorph.h>
27 #include <structures/unique_id.h>
28 #include <textual/parser_bits.h>
29
30 #include <errno.h>
31 /*#ifdef __WIN32__
32   #include <ws2tcpip.h>
33 #endif
34 #ifdef __UNIX__
35 */
36   #include <arpa/inet.h>
37   #include <sys/socket.h>
38 //#endif
39
40 using namespace basis;
41 using namespace loggers;
42 using namespace processes;
43 using namespace structures;
44 using namespace textual;
45 using namespace timely;
46
47 namespace sockets {
48
49 //#define DEBUG_SOCKET_MINDER
50   // uncomment for noisiness.
51
52 #undef LOG
53 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
54
55 const int SOCKET_CHECK_INTERVAL = 50;
56   // we will scoot around in our sockets this frequently.
57
58 const int SOCKMIND_MAXIMUM_RECEIVES = 10;
59   // we'll receive this many items from the socket in one go.
60
61 const int MAXIMUM_TRANSFER_CHUNK = 512 * KILOBYTE;
62   // largest block of data we'll try to deal with at one time.
63
64 const int CONN_ALERT_INTERVAL = 100;
65   // this is the most frequently that we will generate a connection checking
66   // event.
67
68 const int MULTI_SELECT_TIMEOUT = 250;
69   // the snooze on select will occur for this long.  during this interval,
70   // it is likely no new sockets will be considered.
71
72 //////////////
73
74 class socket_data_amorph : public amorph<socket_data> {};
75
76 //////////////
77
78 class socket_minder_prompter : public ethread
79 {
80 public:
81   socket_minder_prompter(socket_minder &parent)
82   : ethread(SOCKET_CHECK_INTERVAL, ethread::SLACK_INTERVAL),
83     _parent(parent)
84   {
85     start(NULL_POINTER);
86   }
87
88   ~socket_minder_prompter() {
89     stop();  // shut down our thread.
90   }
91
92   virtual void perform_activity(void *formal(ptr)) { _parent.snoozy_select(); }
93
94 private:
95   socket_minder &_parent;  // the object we're hooked to.
96 };
97
98 //////////////
99
100 socket_minder::socket_minder(post_office &post, int parent_route,
101     int event_type, int message)
102 : _post(post),
103   _parent_route(parent_route),
104   _event_type(event_type),
105   _lock(new mutex),
106   _socket_list(new socket_data_amorph),
107   _socks(new raw_socket),
108   _stack(new tcpip_stack),
109   _message(message),
110   _pending_sox(new int_set),
111   _prompter(new socket_minder_prompter(*this))
112 {
113   _prompter->start(NULL_POINTER);
114 }
115
116 socket_minder::~socket_minder()
117 {
118   _prompter->stop();
119   WHACK(_prompter);
120   WHACK(_socket_list);
121   WHACK(_lock);
122   WHACK(_pending_sox);
123   WHACK(_socks); 
124   WHACK(_stack);
125 }
126
127 void socket_minder::disengage()
128 {
129   _prompter->stop();
130 }
131
132 astring socket_minder::text_form() const
133 {
134   auto_synchronizer l(*_lock);
135   astring to_return;
136
137   for (int i = 0; i < _socket_list->elements(); i++) {
138     const socket_data *curr = _socket_list->get(i);
139     to_return += curr->text_form();
140     if (i != _socket_list->elements() - 1)
141       to_return += parser_bits::platform_eol_to_chars();
142   }
143
144   return to_return;
145 }
146
147 void socket_minder::snoozy_select()
148 {
149   FUNCDEF("snoozy_select");
150   int_array read_sox;
151   int_array write_sox;
152   int_array pending;
153
154   get_sockets(read_sox, write_sox, pending);
155
156   // process any with pending connections right now, rather than later.
157   for (int p = 0; p < pending.length(); p++) {
158     socket_data *sd = lock_socket_data(pending[p]);
159     if (!sd) continue;  // something hosed there.
160     handle_pending_connecters(*sd);
161     unlock_socket_data(sd);
162   }
163
164   // now select on all of our sockets simultaneously.
165   int ret = _socks->select(read_sox, write_sox, MULTI_SELECT_TIMEOUT);
166   if (!ret || (!read_sox.length() && !write_sox.length()) ) {
167     return;  // nothing happened.
168   }
169
170   // rotate through the lists and push socket_minders around as needed.
171   // any sockets we have events for but no socket_data are orphans and will
172   // be ignored.
173
174   // check read sockets.
175   for (int r = 0; r < read_sox.length(); r++) {
176     const int sock = read_sox[r];
177     if (owns_socket(sock)) {
178       socket_data *sd = lock_socket_data(sock);
179       if (!sd) continue;  // something hosed there.
180       push_receives(*sd, SI_READABLE);
181       unlock_socket_data(sd);
182       read_sox.zap(r, r);
183       r--;  // skip back before deleted guy.
184     }
185   }
186
187   // check write sockets.
188   for (int w = 0; w < write_sox.length(); w++) {
189     const int sock = write_sox[w];
190     if (owns_socket(sock)) {
191       socket_data *sd = lock_socket_data(sock);
192       if (!sd) continue;  // something hosed there.
193       push_sends(*sd, SI_WRITABLE);
194       unlock_socket_data(sd);
195       write_sox.zap(w, w);
196       w--;  // skip back before deleted guy.
197     }
198   }
199 }
200
201 void socket_minder::get_sockets(int_array &read_sox, int_array &write_sox,
202     int_array &pendings) const
203 {
204   auto_synchronizer l(*_lock);
205   for (int i = 0; i < _socket_list->elements(); i++) {
206     socket_data *sd = _socket_list->borrow(i);
207     if (sd->_connection_pending) {
208       // this is not ready for sends and receives yet.
209       pendings += sd->_socket;
210     } else {
211       // always add sockets to test if they have data waiting.
212       read_sox += sd->_socket;
213       // only check on writability if there is data pending for sending.
214       if (sd->_partially_sent.length())
215         write_sox += sd->_socket;
216     }
217   }
218 }
219
220 bool socket_minder::owns_socket(int socket) const
221 {
222   auto_synchronizer l(*_lock);
223   for (int i = 0; i < _socket_list->elements(); i++) {
224     if (_socket_list->borrow(i)->_socket == socket) return true;
225   }
226   return false;
227 }
228
229 socket_data *socket_minder::lock_socket_data(int socket)
230 {
231   _lock->lock();
232   for (int i = 0; i < _socket_list->elements(); i++)
233     if (_socket_list->borrow(i)->_socket == socket)
234       return _socket_list->borrow(i);
235   // this is a failure to get here; there was no suitable socket.
236   _lock->unlock();
237   return NULL_POINTER;
238 }
239
240 void socket_minder::unlock_socket_data(socket_data *to_unlock)
241 {
242   if (!to_unlock) return;
243 //can't affect it now.  to_unlock = NULL_POINTER;
244   _lock->unlock();
245 }
246
247 bool socket_minder::add_socket_data(int socket, bool server, int server_socket,
248     bool connected_mode, bool connection_pending)
249 {
250   auto_synchronizer l(*_lock);
251   socket_data *harpo = lock_socket_data(socket);
252   if (harpo) {
253     unlock_socket_data(harpo);
254     return false;
255   }
256   socket_data *new_data = new socket_data(socket, server, server_socket,
257       connected_mode);
258   _socks->set_non_blocking(socket);
259     // ensure the new guy is treated as non-blocking.  unix does not seem
260     // to inherit this from the parent.
261   new_data->_connection_pending = connection_pending;
262   _socket_list->append(new_data);
263   return true;
264 }
265
266 bool socket_minder::remove_socket_data(int socket)
267 {
268   FUNCDEF("remove_socket_data");
269   auto_synchronizer l(*_lock);
270   for (int i = 0; i < _socket_list->elements(); i++) {
271     if (_socket_list->borrow(i)->_socket == socket) {
272       // give the socket a last chance to get its data out.
273       evaluate_interest(*_socket_list->borrow(i));
274       _socket_list->zap(i, i);
275       return true;
276     }
277   }
278 //  LOG(a_sprintf("couldn't find socket %d.", socket));
279   return false;
280 }
281
282 bool socket_minder::register_interest(int socket, int interests)
283 {
284 #ifdef DEBUG_SOCKET_MINDER
285   FUNCDEF("register_interest");
286 #endif
287   socket_data *harpo = lock_socket_data(socket);
288 #ifdef DEBUG_SOCKET_MINDER
289   LOG(astring(astring::SPRINTF, "registering interest of %d for socket "
290       "%d.", interests, socket));
291 #endif
292   if (!harpo) return false;
293   harpo->_registered_interests = interests;
294   unlock_socket_data(harpo);
295   return true;
296 }
297
298 bool socket_minder::is_connection_pending(int socket)
299 {
300   socket_data *harpo = lock_socket_data(socket);
301   if (!harpo) return false;
302   bool to_return = harpo->_connection_pending;
303   unlock_socket_data(harpo);
304   return to_return;
305 }
306
307 bool socket_minder::set_connection_pending(int socket, bool pending)
308 {
309   socket_data *harpo = lock_socket_data(socket);
310   if (!harpo) return false;
311   harpo->_connection_pending = pending;
312   unlock_socket_data(harpo);
313   return true;
314 }
315
316 void socket_minder::fire_event(int to_fire, int at_whom,
317     basis::un_int parm1, basis::un_int parm2)
318 {
319   _post.drop_off(at_whom, new OS_event(_event_type, to_fire, parm1, parm2));
320 }
321
322 void socket_minder::put_pending_server(int to_put, bool at_head)
323 {
324   if (!to_put) return;  // bogus.
325   auto_synchronizer l(*_lock);
326   if (at_head) {
327     _pending_sox->insert(0, 1);
328     (*_pending_sox)[0] = to_put;
329   } else {
330     *_pending_sox += to_put;
331   }
332 }
333
334 bool socket_minder::zap_pending_server(int socket)
335 {
336   auto_synchronizer l(*_lock);
337   if (!_pending_sox->member(socket)) return false;
338   _pending_sox->remove(socket);
339   return true;
340 }
341
342 int socket_minder::get_pending_server()
343 {
344   auto_synchronizer l(*_lock);
345   if (!_pending_sox->length()) return 0;
346   int to_return = _pending_sox->get(0);
347   _pending_sox->zap(0, 0);
348   return to_return;
349 }
350
351 bool socket_minder::handle_pending_connecters(socket_data &to_peek)
352 {
353   FUNCDEF("handle_pending_connecters");
354   if (!to_peek._connection_pending) return false;  // not needed here.
355
356   if (to_peek._last_conn_alert > time_stamp(-CONN_ALERT_INTERVAL)) {
357     // not time yet.
358     return false;
359   }
360   to_peek._last_conn_alert.reset();
361
362   // force the issue; there is no simple way to portably know whether
363   // the socket got a connection or not, so just say it did.
364   if (!to_peek._is_server
365       && (to_peek._registered_interests & SI_CONNECTED) ) {
366     // deal with a client first.  this just means, zing an event.
367 #ifdef DEBUG_SOCKET_MINDER
368     LOG(a_sprintf("sending client SI_CONNECTED event on parent %d",
369         _parent_route));
370 #endif
371     fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
372   } else if (to_peek._is_server
373       && (to_peek._registered_interests & SI_CONNECTED) ) {
374     // special handling for servers.  we accept the waiting guy if he's
375     // there.  otherwise we don't send the event.
376     sockaddr sock_addr;
377     socklen_t sock_len = sizeof(sock_addr);
378     int new_sock = int(::accept(to_peek._socket, &sock_addr, &sock_len));
379       // check for a new socket.
380     if (new_sock != INVALID_SOCKET) {
381       LOG(a_sprintf("accept got a client socket %d.", new_sock));
382       if (_pending_sox->member(new_sock)) {
383         LOG(a_sprintf("already have seen socket %d in pending!", new_sock));
384       } else {
385         *_pending_sox += new_sock;
386 #ifdef DEBUG_SOCKET_MINDER
387         LOG(a_sprintf("sending server SI_CONNECTED event on parent %d",
388             _parent_route));
389 #endif
390         fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
391       }
392     } else if (_pending_sox->length()) {
393       // there are still pending connectees.
394       fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
395     }
396   }
397   // also, if the connection is still pending, we don't want to select on
398   // it yet.
399   return true;
400 }
401
402 bool socket_minder::evaluate_interest(socket_data &to_peek)
403 {
404   FUNCDEF("evaluate_interest");
405   if (to_peek._connection_pending) {
406     return handle_pending_connecters(to_peek);
407   }
408
409   int sel_mode = 0;
410
411   int states = _socks->select(to_peek._socket, sel_mode);
412
413   if (!states) {
414     return true;  // nothing to report.
415   }
416
417   if (! (states & SI_ERRONEOUS) && ! (states & SI_DISCONNECTED) ) {
418     push_sends(to_peek, states);
419     push_receives(to_peek, states);
420   }
421
422   if ( (to_peek._registered_interests & SI_ERRONEOUS)
423       && (states & SI_ERRONEOUS) ) {
424     // there's some kind of bad problem on the socket.
425     LOG(a_sprintf("socket %d has status of erroneous.", to_peek._socket));
426 //hmmm: what to do?  generate an event?
427   }
428
429   if ( (to_peek._registered_interests & SI_DISCONNECTED)
430       && (states & SI_DISCONNECTED) ) {
431     // we lost our state of connectedness.
432     fire_event(_message, _parent_route, to_peek._socket,
433         SI_DISCONNECTED);
434     return true;  // get out now.
435   }
436
437   return true;
438 }
439
440 void socket_minder::push_sends(socket_data &to_poke, int states)
441 {
442   FUNCDEF("push_sends");
443   if (to_poke._connection_pending) {
444     LOG("not supposed to try this when not connected yet...");
445   }
446
447 #ifdef DEBUG_SOCKET_MINDER
448   if (to_poke._partially_sent.length()) {
449     LOG(a_sprintf("socket %d: %d bytes to send.", to_poke._socket,
450         to_poke._partially_sent.length()));
451   }
452 #endif
453
454   int error_code = 0;
455
456   if ( (states & SI_WRITABLE) && to_poke._partially_sent.length()) {
457     // write to the socket since we see an opportunity.
458     byte_array &to_send = to_poke._partially_sent;
459     int len_sent = send(to_poke._socket, (char *)to_send.observe(),
460         to_send.length(), 0);
461     if (!len_sent) {
462       // nothing got sent.
463       LOG(a_sprintf("didn't send any data on socket %d.", to_poke._socket));
464     } else if (len_sent == SOCKET_ERROR) {
465       // something bad happened.
466       error_code = critical_events::system_error();
467     } else {
468 #ifdef DEBUG_SOCKET_MINDER
469       LOG(a_sprintf("updating that %d bytes got sent out of %d to send.",
470           len_sent, to_send.length()));
471       if (to_send.length() != len_sent)
472         LOG(a_sprintf("size to send (%d) not same as size sent (%d).",
473             to_send.length(), len_sent));
474 #endif
475       // update the partially sent chunk for the bit we sent out.
476       to_send.zap(0, len_sent - 1);
477     }
478   }
479
480   // handle errors we have seen.
481   if (error_code) {
482     if (error_code != SOCK_EWOULDBLOCK)
483       LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
484           to_poke._socket, _stack->tcpip_error_name(error_code).s()));
485
486     switch (error_code) {
487       case SOCK_ENOTSOCK:  // fall-through.
488       case SOCK_ECONNABORTED:  // fall-through.
489       case SOCK_ECONNRESET: {
490         // the connection got hammerlocked somehow.
491         LOG(a_sprintf("due to %s condition, closing socket %d.",
492             _stack->tcpip_error_name(error_code).s(), to_poke._socket));
493         fire_event(_message, _parent_route, to_poke._socket,
494             SI_DISCONNECTED);
495         to_poke._partially_sent.reset();  // clear with no connection.
496         break;
497       }
498     }
499   }
500 }
501
502 void socket_minder::push_receives(socket_data &to_poke, int states)
503 {
504   FUNCDEF("push_receives");
505   if (to_poke._connection_pending) {
506     LOG("not supposed to try this when not connected yet...");
507   }
508
509 #ifdef DEBUG_SOCKET_MINDER
510   if (to_poke._partially_received.length())
511     LOG(a_sprintf("socket %d: %d bytes already received.", to_poke._socket,
512         to_poke._partially_received.length()));
513 #endif
514
515   int error_code = 0;
516
517   if ( (states & SI_READABLE) && to_poke._connected_mode) {
518     // grab any data that's waiting on the connection-oriented socket.
519
520     bool got_something = true;  // hopeful for now.
521     to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
522       // allocate space where we'll get new data.
523     int count = 0;
524     while (got_something && (count++ < SOCKMIND_MAXIMUM_RECEIVES)) {
525       got_something = false;  // now get all pessimistic.
526       int len = recv(to_poke._socket, (char *)to_poke._receive_buffer.observe(),
527           to_poke._receive_buffer.length(), 0);
528       if (len > 0) {
529 #ifdef DEBUG_SOCKET_MINDER
530         LOG(a_sprintf("received %d bytes on socket %d.", len, to_poke._socket));
531 #endif
532         // we received some actual data; we're happy again.
533         got_something = true;
534         // zap any extra off.
535         if (len < MAXIMUM_TRANSFER_CHUNK)
536           to_poke._receive_buffer.zap(len, to_poke._receive_buffer.last());
537         // add in what we were given.
538         to_poke._partially_received += to_poke._receive_buffer;
539         to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
540           // reset to the right size for trying some more.
541       } else {
542         error_code = critical_events::system_error();
543
544         // reset the states flag based on current state.
545         states = _socks->select(to_poke._socket,
546             raw_socket::SELECTING_JUST_READ);
547         if (states & SI_DISCONNECTED) {
548           error_code = SOCK_ECONNRESET;  // make like regular disconnect.
549           LOG(a_sprintf("noticed disconnection on socket %d.",
550               to_poke._socket));
551           // close the socket; we use a temporary because the close method
552           // wants to actually store zero in the socket.
553           basis::un_int deader = to_poke._socket;
554           _socks->close(deader);
555         }
556
557       }
558     }
559
560     if ( !(states & SI_DISCONNECTED)
561         && to_poke._partially_received.length()) { 
562 #ifdef DEBUG_SOCKET_MINDER
563       LOG(a_sprintf("firing readable now: sock=%d", to_poke._socket));
564 #endif
565       fire_event(_message, _parent_route, to_poke._socket, SI_READABLE);
566     }
567   } else if ( (states & SI_READABLE) && !to_poke._connected_mode) {
568     // datagram style; we need to still alert the parent.
569     fire_event(_message, _parent_route, to_poke._socket,
570         SI_READABLE);
571   }
572
573   // handle errors we have seen.
574   if (error_code) {
575     if (error_code != SOCK_EWOULDBLOCK)
576       LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
577           to_poke._socket, _stack->tcpip_error_name(error_code).s()));
578
579     switch (error_code) {
580       case SOCK_ENOTSOCK:  // fall-through.
581       case SOCK_ECONNABORTED:  // fall-through.
582       case SOCK_ECONNRESET: {
583         // the connection got hammerlocked somehow.
584         LOG(a_sprintf("due to %s condition, closing socket %d.",
585             _stack->tcpip_error_name(error_code).s(), to_poke._socket));
586         fire_event(_message, _parent_route, to_poke._socket,
587             SI_DISCONNECTED);
588         basis::un_int deader = to_poke._socket;
589         _socks->close(deader);
590         to_poke._partially_sent.reset();  // clear with no connection.
591         break;
592       }
593     }
594   }
595 }
596
597 } //namespace.
598