1 /*****************************************************************************\
3 * Name : socket_minder *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 1999-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "raw_socket.h"
16 #include "socket_data.h"
17 #include "socket_minder.h"
18 #include "tcpip_stack.h"
20 #include <basis/mutex.h>
21 #include <loggers/critical_events.h>
22 #include <loggers/program_wide_logger.h>
23 #include <processes/ethread.h>
24 #include <processes/os_event.h>
25 #include <structures/set.h>
26 #include <structures/amorph.h>
27 #include <structures/unique_id.h>
28 #include <textual/parser_bits.h>
35 #include <arpa/inet.h>
36 #include <sys/socket.h>
39 using namespace basis;
40 using namespace loggers;
41 using namespace processes;
42 using namespace structures;
43 using namespace textual;
44 using namespace timely;
48 //#define DEBUG_SOCKET_MINDER
49 // uncomment for noisiness.
// Logging shorthand used throughout this file: forwards to_print to
// CLASS_EMERGENCY_LOG with the program-wide logger as the destination.
52 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
// Interval handed to the ethread base of socket_minder_prompter.
// NOTE(review): the unit is not stated in this file; presumably milliseconds.
54 const int SOCKET_CHECK_INTERVAL = 50;
55 // we will scoot around in our sockets this frequently.
// Caps the number of passes through the recv() loop in push_receives().
57 const int SOCKMIND_MAXIMUM_RECEIVES = 10;
58 // we'll receive this many items from the socket in one go.
// Size the per-socket _receive_buffer is reset to before each recv() call.
60 const int MAXIMUM_TRANSFER_CHUNK = 512 * KILOBYTE;
61 // largest block of data we'll try to deal with at one time.
// Used by handle_pending_connecters() when comparing against a socket's
// _last_conn_alert time stamp.
63 const int CONN_ALERT_INTERVAL = 100;
64 // this is the most frequently that we will generate a connection checking
// Timeout argument for the multi-socket select() call in snoozy_select().
67 const int MULTI_SELECT_TIMEOUT = 250;
68 // the snooze on select will occur for this long. during this interval,
69 // it is likely no new sockets will be considered.
// A named type for an amorph of socket_data objects; it adds no members of
// its own.  socket_minder keeps its tracked sockets in one of these
// (_socket_list).
73 class socket_data_amorph : public amorph<socket_data> {};
// Helper thread object for socket_minder.  Its only activity is to call
// snoozy_select() on the socket_minder it was constructed with.
77 class socket_minder_prompter : public ethread
80 socket_minder_prompter(socket_minder &parent)
// the ethread base is configured with SOCKET_CHECK_INTERVAL and
// ethread::SLACK_INTERVAL; the meaning of those arguments is defined by
// ethread (presumably period and timing mode -- confirm there).
81 : ethread(SOCKET_CHECK_INTERVAL, ethread::SLACK_INTERVAL),
87 ~socket_minder_prompter() {
88 stop(); // shut down our thread.
// the unused pointer argument is wrapped in formal() and ignored; each
// invocation performs a single snoozy_select() pass on the parent.
91 virtual void perform_activity(void *formal(ptr)) { _parent.snoozy_select(); }
// held by reference, so the parent must outlive this object.
94 socket_minder &_parent; // the object we're hooked to.
// Constructs a socket_minder.  The visible initializers record the parent
// route and event type used later by fire_event() callers, and allocate the
// helper objects (socket list, raw_socket, tcpip_stack, pending-socket set and
// the prompter thread object).
// NOTE(review): part of the initializer list is not visible in this excerpt.
99 socket_minder::socket_minder(post_office &post, int parent_route,
100 int event_type, int message)
102 _parent_route(parent_route),
103 _event_type(event_type),
105 _socket_list(new socket_data_amorph),
106 _socks(new raw_socket),
107 _stack(new tcpip_stack),
109 _pending_sox(new int_set),
// the prompter keeps a reference to this object.
110 _prompter(new socket_minder_prompter(*this))
// start the prompter; from here on its perform_activity() may call
// snoozy_select() on this object (presumably from another thread).
112 _prompter->start(NULL_POINTER);
115 socket_minder::~socket_minder()
126 void socket_minder::disengage()
// Builds a textual dump of all tracked sockets: the text_form() of each
// socket_data in _socket_list is appended to to_return, with the platform
// end-of-line sequence between entries.
131 astring socket_minder::text_form() const
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
133 auto_synchronizer l(*_lock);
136 for (int i = 0; i < _socket_list->elements(); i++) {
137 const socket_data *curr = _socket_list->get(i);
138 to_return += curr->text_form();
// the separator is added after every entry except the last one.
139 if (i != _socket_list->elements() - 1)
140 to_return += parser_bits::platform_eol_to_chars();
// Performs one polling pass over the tracked sockets.  It gathers the socket
// lists with get_sockets(), services every socket whose connection is still
// pending, then issues one select() across the read/write lists and calls
// push_receives() / push_sends() for the sockets left in those lists.
// This is what socket_minder_prompter::perform_activity() invokes.
146 void socket_minder::snoozy_select()
148 FUNCDEF("snoozy_select");
// fills the three lists from the current contents of _socket_list.
153 get_sockets(read_sox, write_sox, pending);
155 // process any with pending connections right now, rather than later.
156 for (int p = 0; p < pending.length(); p++) {
157 socket_data *sd = lock_socket_data(pending[p]);
158 if (!sd) continue; // something hosed there.
159 handle_pending_connecters(*sd);
160 unlock_socket_data(sd);
163 // now select on all of our sockets simultaneously.
// the loops below iterate over whatever select() leaves in the two lists;
// presumably it prunes them down to the ready sockets -- confirm against
// raw_socket::select().
164 int ret = _socks->select(read_sox, write_sox, MULTI_SELECT_TIMEOUT);
// bail out when select() returned zero or both lists are empty.
165 if (!ret || (!read_sox.length() && !write_sox.length()) ) {
166 return; // nothing happened.
169 // rotate through the lists and push socket_minders around as needed.
170 // any sockets we have events for but no socket_data are orphans and will
173 // check read sockets.
174 for (int r = 0; r < read_sox.length(); r++) {
175 const int sock = read_sox[r];
// only sockets still present in _socket_list are serviced here.
176 if (owns_socket(sock)) {
177 socket_data *sd = lock_socket_data(sock);
178 if (!sd) continue; // something hosed there.
179 push_receives(*sd, SI_READABLE);
180 unlock_socket_data(sd);
// NOTE(review): the branch that leads to this decrement is not visible in
// this excerpt; the comment implies an entry was just removed from read_sox.
182 r--; // skip back before deleted guy.
186 // check write sockets.
187 for (int w = 0; w < write_sox.length(); w++) {
188 const int sock = write_sox[w];
189 if (owns_socket(sock)) {
190 socket_data *sd = lock_socket_data(sock);
191 if (!sd) continue; // something hosed there.
192 push_sends(*sd, SI_WRITABLE);
193 unlock_socket_data(sd);
// NOTE(review): same situation as the read loop; the enclosing branch is
// not visible here.
195 w--; // skip back before deleted guy.
// Walks _socket_list and appends socket handles to the caller's arrays:
// sockets flagged _connection_pending go into pendings; sockets are added to
// read_sox, and to write_sox only when they have unsent data queued.
// NOTE(review): the line separating the pending case from the read/write case
// is not visible in this excerpt; the existing comments suggest the two are
// mutually exclusive -- confirm.
200 void socket_minder::get_sockets(int_array &read_sox, int_array &write_sox,
201 int_array &pendings) const
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
203 auto_synchronizer l(*_lock);
204 for (int i = 0; i < _socket_list->elements(); i++) {
205 socket_data *sd = _socket_list->borrow(i);
206 if (sd->_connection_pending) {
207 // this is not ready for sends and receives yet.
208 pendings += sd->_socket;
210 // always add sockets to test if they have data waiting.
211 read_sox += sd->_socket;
212 // only check on writability if there is data pending for sending.
213 if (sd->_partially_sent.length())
214 write_sox += sd->_socket;
// Reports whether a socket_data entry with the given socket handle is present
// in _socket_list.  Linear scan; returns true on the first match.
219 bool socket_minder::owns_socket(int socket) const
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
221 auto_synchronizer l(*_lock);
222 for (int i = 0; i < _socket_list->elements(); i++) {
223 if (_socket_list->borrow(i)->_socket == socket) return true;
// Finds the socket_data entry for the given socket handle by linear scan of
// _socket_list and returns the borrowed pointer (the list keeps the object).
// Callers in this file pair every successful call with unlock_socket_data().
// NOTE(review): any lock acquisition performed here, and the handling of the
// not-found case, are not visible in this excerpt.
228 socket_data *socket_minder::lock_socket_data(int socket)
231 for (int i = 0; i < _socket_list->elements(); i++)
232 if (_socket_list->borrow(i)->_socket == socket)
233 return _socket_list->borrow(i);
234 // this is a failure to get here; there was no suitable socket.
// Counterpart to lock_socket_data().  A null pointer is accepted and ignored.
// NOTE(review): whatever is released for a non-null pointer is not visible in
// this excerpt.
239 void socket_minder::unlock_socket_data(socket_data *to_unlock)
241 if (!to_unlock) return;
242 //can't affect it now. to_unlock = NULL_POINTER;
// Starts tracking a socket: creates a socket_data record for it, switches the
// socket to non-blocking mode, stores the connection_pending flag on the
// record and appends the record to _socket_list.
246 bool socket_minder::add_socket_data(int socket, bool server, int server_socket,
247 bool connected_mode, bool connection_pending)
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
249 auto_synchronizer l(*_lock);
// look for an existing record for this socket first.
// NOTE(review): the check on harpo is not visible here; presumably an
// already-tracked socket is rejected -- confirm.
250 socket_data *harpo = lock_socket_data(socket);
252 unlock_socket_data(harpo);
255 socket_data *new_data = new socket_data(socket, server, server_socket,
257 _socks->set_non_blocking(socket);
258 // ensure the new guy is treated as non-blocking. unix does not seem
259 // to inherit this from the parent.
260 new_data->_connection_pending = connection_pending;
// the list receives the heap-allocated record.
261 _socket_list->append(new_data);
// Stops tracking the given socket.  The matching entry gets one final
// evaluate_interest() pass and is then removed from _socket_list.
265 bool socket_minder::remove_socket_data(int socket)
267 FUNCDEF("remove_socket_data");
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
268 auto_synchronizer l(*_lock);
269 for (int i = 0; i < _socket_list->elements(); i++) {
270 if (_socket_list->borrow(i)->_socket == socket) {
271 // give the socket a last chance to get its data out.
272 evaluate_interest(*_socket_list->borrow(i));
// zap(i, i) removes just the entry at index i.
273 _socket_list->zap(i, i);
// reaching this point means no entry matched; the log line is disabled.
277 // LOG(a_sprintf("couldn't find socket %d.", socket));
// Sets the interest mask for a tracked socket.  The new value replaces the
// previous _registered_interests (it is assigned, not OR-ed in).
// Returns false when the socket has no socket_data record.
281 bool socket_minder::register_interest(int socket, int interests)
283 #ifdef DEBUG_SOCKET_MINDER
284 FUNCDEF("register_interest");
286 socket_data *harpo = lock_socket_data(socket);
287 #ifdef DEBUG_SOCKET_MINDER
288 LOG(astring(astring::SPRINTF, "registering interest of %d for socket "
289 "%d.", interests, socket));
291 if (!harpo) return false;
292 harpo->_registered_interests = interests;
293 unlock_socket_data(harpo);
// Reads the _connection_pending flag of the given socket's record.
// An untracked socket yields false.
297 bool socket_minder::is_connection_pending(int socket)
299 socket_data *harpo = lock_socket_data(socket);
300 if (!harpo) return false;
// copy the flag out before releasing the record.
301 bool to_return = harpo->_connection_pending;
302 unlock_socket_data(harpo);
// Writes the _connection_pending flag of the given socket's record.
// Returns false when the socket is not tracked.
306 bool socket_minder::set_connection_pending(int socket, bool pending)
308 socket_data *harpo = lock_socket_data(socket);
309 if (!harpo) return false;
310 harpo->_connection_pending = pending;
311 unlock_socket_data(harpo);
// Posts an event to a recipient: allocates an OS_event carrying this minder's
// _event_type plus to_fire, parm1 and parm2, and drops it off at the post
// office for route at_whom.
// NOTE(review): the OS_event is heap-allocated here and not freed in this
// function; presumably the post office takes ownership -- confirm.
315 void socket_minder::fire_event(int to_fire, int at_whom,
316 basis::un_int parm1, basis::un_int parm2)
318 _post.drop_off(at_whom, new OS_event(_event_type, to_fire, parm1, parm2));
// Adds a socket to the pending-servers collection, either at index 0 or at
// the end.  A socket value of zero is rejected.
// NOTE(review): the condition choosing between the two insertion paths is not
// visible in this excerpt; presumably it tests at_head.
321 void socket_minder::put_pending_server(int to_put, bool at_head)
323 if (!to_put) return; // bogus.
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
324 auto_synchronizer l(*_lock);
// front insertion: open one slot at index 0, then fill it.
326 _pending_sox->insert(0, 1);
327 (*_pending_sox)[0] = to_put;
// otherwise append.
329 *_pending_sox += to_put;
// Removes the given socket from the pending-servers collection.
// Returns false when the socket was not a member.
333 bool socket_minder::zap_pending_server(int socket)
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
335 auto_synchronizer l(*_lock);
336 if (!_pending_sox->member(socket)) return false;
337 _pending_sox->remove(socket);
// Takes one socket out of the pending-servers collection: the element at
// index 0 is read into to_return and then removed.  Yields 0 when the
// collection is empty (zero is never stored; see put_pending_server()).
341 int socket_minder::get_pending_server()
// auto_synchronizer on _lock -- presumably holds the lock until scope exit.
343 auto_synchronizer l(*_lock);
344 if (!_pending_sox->length()) return 0;
345 int to_return = _pending_sox->get(0);
346 _pending_sox->zap(0, 0);
// Services a socket whose connection has not completed yet.
// Client sockets with SI_CONNECTED interest get an SI_CONNECTED event fired at
// the parent.  Server sockets with SI_CONNECTED interest have accept() called
// on them; an accepted socket is recorded in _pending_sox and an SI_CONNECTED
// event is fired for the listening socket.
// Returns false immediately when the socket is not in the pending state.
// NOTE(review): _pending_sox is read and modified in this function with no
// auto_synchronizer visible here, unlike put/zap/get_pending_server();
// confirm that every caller already holds _lock.
350 bool socket_minder::handle_pending_connecters(socket_data &to_peek)
352 FUNCDEF("handle_pending_connecters");
353 if (!to_peek._connection_pending) return false; // not needed here.
// rate limiting: compares the last alert time against a time stamp built
// from -CONN_ALERT_INTERVAL (the body of this branch is not visible here).
355 if (to_peek._last_conn_alert > time_stamp(-CONN_ALERT_INTERVAL)) {
// restart the alert timer for this socket.
359 to_peek._last_conn_alert.reset();
361 // force the issue; there is no simple way to portably know whether
362 // the socket got a connection or not, so just say it did.
363 if (!to_peek._is_server
364 && (to_peek._registered_interests & SI_CONNECTED) ) {
365 // deal with a client first. this just means, zing an event.
366 #ifdef DEBUG_SOCKET_MINDER
367 LOG(a_sprintf("sending client SI_CONNECTED event on parent %d",
370 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
371 } else if (to_peek._is_server
372 && (to_peek._registered_interests & SI_CONNECTED) ) {
373 // special handling for servers. we accept the waiting guy if he's
374 // there. otherwise we don't send the event.
376 socklen_t sock_len = sizeof(sock_addr);
// the listening socket was made non-blocking in add_socket_data(), so this
// call presumably returns right away when no client is waiting.
377 int new_sock = int(::accept(to_peek._socket, &sock_addr, &sock_len));
378 // check for a new socket.
379 if (new_sock != INVALID_SOCKET) {
// this log line is not inside a DEBUG_SOCKET_MINDER guard.
380 LOG(a_sprintf("accept got a client socket %d.", new_sock));
381 if (_pending_sox->member(new_sock)) {
382 LOG(a_sprintf("already have seen socket %d in pending!", new_sock));
// remember the accepted socket so get_pending_server() can hand it out.
384 *_pending_sox += new_sock;
385 #ifdef DEBUG_SOCKET_MINDER
386 LOG(a_sprintf("sending server SI_CONNECTED event on parent %d",
// the event carries the listening socket, not the newly accepted one.
389 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
391 } else if (_pending_sox->length()) {
392 // there are still pending connectees.
393 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
396 // also, if the connection is still pending, we don't want to select on
// Checks a single socket and acts on its state.  A socket with a pending
// connection is delegated to handle_pending_connecters().  Otherwise the
// socket is polled with select(); when it is neither erroneous nor
// disconnected, sends and receives are pushed.  An erroneous state the owner
// registered for is logged, and a disconnected state the owner registered for
// causes an event to be fired at the parent.
401 bool socket_minder::evaluate_interest(socket_data &to_peek)
403 FUNCDEF("evaluate_interest");
404 if (to_peek._connection_pending) {
405 return handle_pending_connecters(to_peek);
// single-socket select; sel_mode is set up on lines not visible here.
410 int states = _socks->select(to_peek._socket, sel_mode);
413 return true; // nothing to report.
// data is only moved when the socket is healthy.
416 if (! (states & SI_ERRONEOUS) && ! (states & SI_DISCONNECTED) ) {
417 push_sends(to_peek, states);
418 push_receives(to_peek, states);
421 if ( (to_peek._registered_interests & SI_ERRONEOUS)
422 && (states & SI_ERRONEOUS) ) {
423 // there's some kind of bad problem on the socket.
424 LOG(a_sprintf("socket %d has status of erroneous.", to_peek._socket));
425 //hmmm: what to do? generate an event?
428 if ( (to_peek._registered_interests & SI_DISCONNECTED)
429 && (states & SI_DISCONNECTED) ) {
430 // we lost our state of connectedness.
431 fire_event(_message, _parent_route, to_peek._socket,
433 return true; // get out now.
// Tries to transmit the data queued in to_poke._partially_sent.
// When states includes SI_WRITABLE and data is queued, the whole queue is
// offered to send(); the bytes that were accepted are removed from the front
// of the queue, so any remainder stays queued.  Socket errors are
// recorded in error_code and handled at the end: connection-loss codes cause
// an event to be fired at the parent and the queue to be cleared.
439 void socket_minder::push_sends(socket_data &to_poke, int states)
441 FUNCDEF("push_sends");
// sending on a socket that has not finished connecting is flagged as misuse.
442 if (to_poke._connection_pending) {
443 LOG("not supposed to try this when not connected yet...");
446 #ifdef DEBUG_SOCKET_MINDER
447 if (to_poke._partially_sent.length()) {
448 LOG(a_sprintf("socket %d: %d bytes to send.", to_poke._socket,
449 to_poke._partially_sent.length()));
455 if ( (states & SI_WRITABLE) && to_poke._partially_sent.length()) {
456 // write to the socket since we see an opportunity.
// to_send is a reference, so the zap below edits the queue in place.
457 byte_array &to_send = to_poke._partially_sent;
458 int len_sent = send(to_poke._socket, (char *)to_send.observe(),
459 to_send.length(), 0);
462 LOG(a_sprintf("didn't send any data on socket %d.", to_poke._socket));
463 } else if (len_sent == SOCKET_ERROR) {
464 // something bad happened.
465 error_code = critical_events::system_error();
467 #ifdef DEBUG_SOCKET_MINDER
468 LOG(a_sprintf("updating that %d bytes got sent out of %d to send.",
469 len_sent, to_send.length()));
470 if (to_send.length() != len_sent)
471 LOG(a_sprintf("size to send (%d) not same as size sent (%d).",
472 to_send.length(), len_sent));
474 // update the partially sent chunk for the bit we sent out.
// removes indices 0 through len_sent - 1, i.e. exactly the bytes sent.
475 to_send.zap(0, len_sent - 1);
479 // handle errors we have seen.
// a would-block result is not logged.
481 if (error_code != SOCK_EWOULDBLOCK)
482 LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
483 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
485 switch (error_code) {
486 case SOCK_ENOTSOCK: // fall-through.
487 case SOCK_ECONNABORTED: // fall-through.
488 case SOCK_ECONNRESET: {
489 // the connection got hammerlocked somehow.
490 LOG(a_sprintf("due to %s condition, closing socket %d.",
491 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
492 fire_event(_message, _parent_route, to_poke._socket,
// queued data can no longer be delivered, so it is discarded.
494 to_poke._partially_sent.reset(); // clear with no connection.
// Pulls incoming data for a socket.
// Connection-oriented sockets: when states includes SI_READABLE, recv() is
// called in a loop (bounded by SOCKMIND_MAXIMUM_RECEIVES) and each chunk read
// is appended to to_poke._partially_received.  The socket is re-polled to
// detect disconnection, and an SI_READABLE event is fired at the parent when
// data has accumulated and the socket is still connected.
// Datagram-style sockets: the parent is simply alerted with an event.
// Errors recorded in error_code are handled at the end; connection-loss codes
// fire an event, close the socket and clear the outgoing queue.
// NOTE(review): this excerpt ends inside the error switch; the tail of the
// function is not visible.
501 void socket_minder::push_receives(socket_data &to_poke, int states)
503 FUNCDEF("push_receives");
// receiving on a socket that has not finished connecting is flagged as misuse.
504 if (to_poke._connection_pending) {
505 LOG("not supposed to try this when not connected yet...");
508 #ifdef DEBUG_SOCKET_MINDER
509 if (to_poke._partially_received.length())
510 LOG(a_sprintf("socket %d: %d bytes already received.", to_poke._socket,
511 to_poke._partially_received.length()));
516 if ( (states & SI_READABLE) && to_poke._connected_mode) {
517 // grab any data that's waiting on the connection-oriented socket.
519 bool got_something = true; // hopeful for now.
520 to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
521 // allocate space where we'll get new data.
// loop until a recv() yields nothing or the per-call cap is reached.
523 while (got_something && (count++ < SOCKMIND_MAXIMUM_RECEIVES)) {
524 got_something = false; // now get all pessimistic.
// NOTE(review): recv() writes into the pointer returned by observe(), with a
// C-style cast applied; the name suggests a read-only accessor -- confirm
// whether byte_array provides a mutable accessor that should be used here.
525 int len = recv(to_poke._socket, (char *)to_poke._receive_buffer.observe(),
526 to_poke._receive_buffer.length(), 0);
528 #ifdef DEBUG_SOCKET_MINDER
529 LOG(a_sprintf("received %d bytes on socket %d.", len, to_poke._socket));
531 // we received some actual data; we're happy again.
532 got_something = true;
533 // zap any extra off.
// trims the buffer to len bytes by removing index len through the last index.
534 if (len < MAXIMUM_TRANSFER_CHUNK)
535 to_poke._receive_buffer.zap(len, to_poke._receive_buffer.last());
536 // add in what we were given.
537 to_poke._partially_received += to_poke._receive_buffer;
538 to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
539 // reset to the right size for trying some more.
541 error_code = critical_events::system_error();
543 // reset the states flag based on current state.
544 states = _socks->select(to_poke._socket,
545 raw_socket::SELECTING_JUST_READ);
546 if (states & SI_DISCONNECTED) {
547 error_code = SOCK_ECONNRESET; // make like regular disconnect.
548 LOG(a_sprintf("noticed disconnection on socket %d.",
550 // close the socket; we use a temporary because the close method
551 // wants to actually store zero in the socket.
552 basis::un_int deader = to_poke._socket;
553 _socks->close(deader);
// only announce readable data while the socket is still connected.
559 if ( !(states & SI_DISCONNECTED)
560 && to_poke._partially_received.length()) {
561 #ifdef DEBUG_SOCKET_MINDER
562 LOG(a_sprintf("firing readable now: sock=%d", to_poke._socket));
564 fire_event(_message, _parent_route, to_poke._socket, SI_READABLE);
566 } else if ( (states & SI_READABLE) && !to_poke._connected_mode) {
567 // datagram style; we need to still alert the parent.
568 fire_event(_message, _parent_route, to_poke._socket,
572 // handle errors we have seen.
// a would-block result is not logged.
574 if (error_code != SOCK_EWOULDBLOCK)
575 LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
576 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
578 switch (error_code) {
579 case SOCK_ENOTSOCK: // fall-through.
580 case SOCK_ECONNABORTED: // fall-through.
581 case SOCK_ECONNRESET: {
582 // the connection got hammerlocked somehow.
583 LOG(a_sprintf("due to %s condition, closing socket %d.",
584 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
585 fire_event(_message, _parent_route, to_poke._socket,
// close through a temporary so that to_poke._socket keeps its value.
587 basis::un_int deader = to_poke._socket;
588 _socks->close(deader);
589 to_poke._partially_sent.reset(); // clear with no connection.