1 /*****************************************************************************\
3 * Name : socket_minder *
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 1999-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "raw_socket.h"
16 #include "socket_data.h"
17 #include "socket_minder.h"
18 #include "tcpip_stack.h"
20 #include <basis/mutex.h>
21 #include <loggers/critical_events.h>
22 #include <loggers/program_wide_logger.h>
23 #include <processes/ethread.h>
24 #include <processes/os_event.h>
25 #include <structures/set.h>
26 #include <structures/amorph.h>
27 #include <structures/unique_id.h>
28 #include <textual/parser_bits.h>
36 #include <arpa/inet.h>
37 #include <sys/socket.h>
40 using namespace basis;
41 using namespace loggers;
42 using namespace processes;
43 using namespace structures;
44 using namespace textual;
45 using namespace timely;
49 //#define DEBUG_SOCKET_MINDER
50 // uncomment for noisiness.
// (defining it enables the extra LOG calls wrapped in
// "#ifdef DEBUG_SOCKET_MINDER" later in this file.)
// LOG hands its argument to CLASS_EMERGENCY_LOG along with the program-wide
// logger.
53 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
55 const int SOCKET_CHECK_INTERVAL = 50;
56 // we will scoot around in our sockets this frequently.
// passed as the first argument of the ethread constructor used by
// socket_minder_prompter. NOTE(review): presumably milliseconds -- confirm
// against ethread.
58 const int SOCKMIND_MAXIMUM_RECEIVES = 10;
59 // we'll receive this many items from the socket in one go.
// bounds the number of passes of the recv() loop in push_receives().
61 const int MAXIMUM_TRANSFER_CHUNK = 512 * KILOBYTE;
62 // largest block of data we'll try to deal with at one time.
// push_receives() resets the receive buffer to this size before each recv().
64 const int CONN_ALERT_INTERVAL = 100;
65 // this is the most frequently that we will generate a connection checking
// (the sentence above is cut off here.) handle_pending_connecters() compares
// a socket's _last_conn_alert against time_stamp(-CONN_ALERT_INTERVAL).
// NOTE(review): units are not shown here -- confirm against time_stamp.
68 const int MULTI_SELECT_TIMEOUT = 250;
69 // the snooze on select will occur for this long. during this interval,
70 // it is likely no new sockets will be considered.
// passed as the timeout to the multi-socket select() in snoozy_select().
// names the amorph<socket_data> container type; it adds no members of its
// own. the socket_minder constructor allocates one as _socket_list.
74 class socket_data_amorph : public amorph<socket_data> {};
// ethread subclass that drives a socket_minder: each perform_activity() call
// invokes snoozy_select() on the parent it was constructed with.
// NOTE(review): braces, access specifiers and the _parent initializer are not
// visible here.
78 class socket_minder_prompter : public ethread
// builds the thread object with SOCKET_CHECK_INTERVAL and
// ethread::SLACK_INTERVAL; the thread itself is started by the socket_minder
// constructor, not here.
81 socket_minder_prompter(socket_minder &parent)
82 : ethread(SOCKET_CHECK_INTERVAL, ethread::SLACK_INTERVAL),
// the destructor calls stop() so the thread is shut down along with this
// object.
88 ~socket_minder_prompter() {
89 stop(); // shut down our thread.
// thread body: the pointer argument is unused (wrapped in formal()).
92 virtual void perform_activity(void *formal(ptr)) { _parent.snoozy_select(); }
// held by reference.  NOTE(review): presumably the parent must outlive this
// object -- confirm against the socket_minder destructor.
95 socket_minder &_parent; // the object we're hooked to.
// constructs the minder and immediately starts its prompter thread.
//   post: the post_office that fire_event() drops events into (fire_event()
//     uses _post; its initializer is not visible here).
//   parent_route: stored in _parent_route; the event-firing code in this file
//     passes it to fire_event() as the destination.
//   event_type: stored in _event_type; fire_event() gives it to each OS_event.
//   message: the event-firing code passes _message to fire_event() as the
//     value to fire (its initializer is not visible here).
// NOTE(review): _lock is used throughout the class but its initializer is not
// visible here either.
100 socket_minder::socket_minder(post_office &post, int parent_route,
101 int event_type, int message)
103 _parent_route(parent_route),
104 _event_type(event_type),
106 _socket_list(new socket_data_amorph),
107 _socks(new raw_socket),
108 _stack(new tcpip_stack),
110 _pending_sox(new int_set),
111 _prompter(new socket_minder_prompter(*this))
// the prompter receives *this before construction has finished and is started
// right away, so snoozy_select() can run from this point on.
113 _prompter->start(NULL_POINTER);
116 socket_minder::~socket_minder()
127 void socket_minder::disengage()
// builds a textual dump of every tracked socket: the text_form() of each
// socket_data in _socket_list, with a platform end-of-line between entries
// (none after the last one).  _lock is held for the whole traversal.
// NOTE(review): the declaration and return of to_return are not visible here.
132 astring socket_minder::text_form() const
134 auto_synchronizer l(*_lock);
137 for (int i = 0; i < _socket_list->elements(); i++) {
// NOTE(review): curr is dereferenced without a null check -- confirm that
// get() cannot return null for an in-range index.
138 const socket_data *curr = _socket_list->get(i);
139 to_return += curr->text_form();
140 if (i != _socket_list->elements() - 1)
141 to_return += parser_bits::platform_eol_to_chars();
// one pass of the polling loop; socket_minder_prompter::perform_activity()
// calls this.  it (1) asks get_sockets() for the current socket lists,
// (2) runs handle_pending_connecters() on every socket still connecting,
// (3) selects on the read/write lists with MULTI_SELECT_TIMEOUT, and
// (4) calls push_receives() / push_sends() on each listed socket that this
// object still owns.
// NOTE(review): the declarations of read_sox, write_sox and pending are not
// visible here.
147 void socket_minder::snoozy_select()
149 FUNCDEF("snoozy_select");
154 get_sockets(read_sox, write_sox, pending);
156 // process any with pending connections right now, rather than later.
157 for (int p = 0; p < pending.length(); p++) {
158 socket_data *sd = lock_socket_data(pending[p]);
159 if (!sd) continue; // something hosed there.
160 handle_pending_connecters(*sd);
161 unlock_socket_data(sd);
164 // now select on all of our sockets simultaneously.
// NOTE(review): the length checks below and the loops that follow suggest
// select() trims read_sox / write_sox down to the sockets that had events --
// confirm against raw_socket::select.
165 int ret = _socks->select(read_sox, write_sox, MULTI_SELECT_TIMEOUT);
166 if (!ret || (!read_sox.length() && !write_sox.length()) ) {
167 return; // nothing happened.
170 // rotate through the lists and push socket_minders around as needed.
171 // any sockets we have events for but no socket_data are orphans and will
174 // check read sockets.
175 for (int r = 0; r < read_sox.length(); r++) {
176 const int sock = read_sox[r];
// ownership is re-checked because the socket list may have changed since
// get_sockets() took its snapshot.
177 if (owns_socket(sock)) {
178 socket_data *sd = lock_socket_data(sock);
179 if (!sd) continue; // something hosed there.
180 push_receives(*sd, SI_READABLE);
181 unlock_socket_data(sd);
// NOTE(review): the statement before the decrement is not visible here; it
// presumably removes the orphan's entry from read_sox (the "deleted guy").
183 r--; // skip back before deleted guy.
187 // check write sockets.
188 for (int w = 0; w < write_sox.length(); w++) {
189 const int sock = write_sox[w];
190 if (owns_socket(sock)) {
191 socket_data *sd = lock_socket_data(sock);
192 if (!sd) continue; // something hosed there.
193 push_sends(*sd, SI_WRITABLE);
194 unlock_socket_data(sd);
// NOTE(review): as in the read loop, the removal this decrement compensates
// for is not visible here.
196 w--; // skip back before deleted guy.
// takes a snapshot, under _lock, of the tracked sockets and appends their
// descriptors to the caller's arrays.
//   pendings: sockets whose _connection_pending flag is set.
//   read_sox: sockets to be tested for readability.
//   write_sox: sockets that also have unsent data in _partially_sent.
// the arrays are only appended to; they are not cleared in the lines visible
// here.
201 void socket_minder::get_sockets(int_array &read_sox, int_array &write_sox,
202 int_array &pendings) const
204 auto_synchronizer l(*_lock);
205 for (int i = 0; i < _socket_list->elements(); i++) {
206 socket_data *sd = _socket_list->borrow(i);
207 if (sd->_connection_pending) {
208 // this is not ready for sends and receives yet.
209 pendings += sd->_socket;
// NOTE(review): the line separating the two branches is not visible here; the
// comment above implies the additions below happen only for sockets that are
// not pending -- confirm.
211 // always add sockets to test if they have data waiting.
212 read_sox += sd->_socket;
213 // only check on writability if there is data pending for sending.
214 if (sd->_partially_sent.length())
215 write_sox += sd->_socket;
// reports whether a socket_data for this descriptor is in _socket_list.
// does a linear scan under _lock and returns true on the first match.
// NOTE(review): the not-found return is not visible here; presumably false.
220 bool socket_minder::owns_socket(int socket) const
222 auto_synchronizer l(*_lock);
223 for (int i = 0; i < _socket_list->elements(); i++) {
224 if (_socket_list->borrow(i)->_socket == socket) return true;
// finds the socket_data whose _socket equals the requested descriptor and
// returns the pointer borrowed from _socket_list; the list keeps ownership.
// callers in this file treat a null result as "no such socket" and pair each
// successful call with unlock_socket_data().
// NOTE(review): the statement preceding the loop is not visible here.  the
// function's name suggests it acquires _lock and leaves it held on success --
// confirm, and confirm the failure path releases it.
229 socket_data *socket_minder::lock_socket_data(int socket)
232 for (int i = 0; i < _socket_list->elements(); i++)
233 if (_socket_list->borrow(i)->_socket == socket)
234 return _socket_list->borrow(i);
235 // this is a failure to get here; there was no suitable socket.
// counterpart of lock_socket_data(); a null pointer is ignored.
// the pointer is passed by value, so the caller's copy cannot be cleared from
// here (which is what the retained comment below is about).
// NOTE(review): the remaining statements are not visible here; presumably
// this releases _lock -- confirm.
240 void socket_minder::unlock_socket_data(socket_data *to_unlock)
242 if (!to_unlock) return;
243 //can't affect it now. to_unlock = NULL_POINTER;
// starts tracking a socket: creates a socket_data for it, switches the
// descriptor to non-blocking mode, records whether its connection is still
// pending, and appends the record to _socket_list.
//   socket, server, server_socket: forwarded to the socket_data constructor
//     (its remaining arguments are not visible here).
//   connection_pending: stored in the new record's _connection_pending.
// NOTE(review): an existing record for the same descriptor is looked up
// first; the test on the result and the return statements are not visible
// here.  presumably a duplicate is rejected with false -- confirm.
247 bool socket_minder::add_socket_data(int socket, bool server, int server_socket,
248 bool connected_mode, bool connection_pending)
// NOTE(review): _lock is taken here and lock_socket_data() is called while it
// is held; if that call also acquires _lock the mutex must be re-entrant --
// confirm.
250 auto_synchronizer l(*_lock);
251 socket_data *harpo = lock_socket_data(socket);
253 unlock_socket_data(harpo);
256 socket_data *new_data = new socket_data(socket, server, server_socket,
258 _socks->set_non_blocking(socket);
259 // ensure the new guy is treated as non-blocking. unix does not seem
260 // to inherit this from the parent.
261 new_data->_connection_pending = connection_pending;
// NOTE(review): the list presumably owns new_data from this point -- confirm
// against amorph::append.
262 _socket_list->append(new_data);
// stops tracking a socket.  under _lock, the first record whose _socket
// matches gets one final evaluate_interest() pass and is then zapped out of
// _socket_list.
// NOTE(review): the return statements are not visible here; presumably true
// when a record was removed and false otherwise -- confirm.
266 bool socket_minder::remove_socket_data(int socket)
268 FUNCDEF("remove_socket_data");
269 auto_synchronizer l(*_lock);
270 for (int i = 0; i < _socket_list->elements(); i++) {
271 if (_socket_list->borrow(i)->_socket == socket) {
272 // give the socket a last chance to get its data out.
273 evaluate_interest(*_socket_list->borrow(i));
// NOTE(review): presumably zap() also destroys the socket_data -- confirm
// against amorph::zap.
274 _socket_list->zap(i, i);
278 // LOG(a_sprintf("couldn't find socket %d.", socket));
// replaces the interest mask recorded for a socket.
//   interests: stored as-is in _registered_interests (it overwrites the
//     previous value rather than being or-ed in); other code in this file
//     tests it against SI_* flags.
// returns false when the socket is not tracked.
// NOTE(review): the success return is not visible here; presumably true.
282 bool socket_minder::register_interest(int socket, int interests)
284 #ifdef DEBUG_SOCKET_MINDER
285 FUNCDEF("register_interest");
287 socket_data *harpo = lock_socket_data(socket);
288 #ifdef DEBUG_SOCKET_MINDER
289 LOG(astring(astring::SPRINTF, "registering interest of %d for socket "
290 "%d.", interests, socket));
292 if (!harpo) return false;
293 harpo->_registered_interests = interests;
294 unlock_socket_data(harpo);
// reads the _connection_pending flag of a tracked socket.
// returns false when the socket is not tracked, so a false result does not
// by itself mean the socket is connected.
// NOTE(review): the final return is not visible here; presumably it returns
// to_return.
298 bool socket_minder::is_connection_pending(int socket)
300 socket_data *harpo = lock_socket_data(socket);
301 if (!harpo) return false;
// the flag is copied before the record is unlocked.
302 bool to_return = harpo->_connection_pending;
303 unlock_socket_data(harpo);
// writes the _connection_pending flag of a tracked socket.
// returns false when the socket is not tracked.
// NOTE(review): the success return is not visible here; presumably true.
307 bool socket_minder::set_connection_pending(int socket, bool pending)
309 socket_data *harpo = lock_socket_data(socket);
310 if (!harpo) return false;
311 harpo->_connection_pending = pending;
312 unlock_socket_data(harpo);
// posts a notification: allocates an OS_event built from _event_type,
// to_fire, parm1 and parm2, and drops it off at route at_whom through _post.
// callers in this file pass _message as to_fire, _parent_route as at_whom,
// the socket descriptor as parm1 and an SI_* flag as parm2.
// NOTE(review): the event is allocated with new and handed to drop_off();
// presumably the post office takes ownership -- confirm.
316 void socket_minder::fire_event(int to_fire, int at_whom,
317 basis::un_int parm1, basis::un_int parm2)
319 _post.drop_off(at_whom, new OS_event(_event_type, to_fire, parm1, parm2));
// adds a socket to _pending_sox under _lock.  a value of zero is rejected as
// bogus.
//   at_head: NOTE(review): the test on this flag is not visible here.
//     presumably the insert-at-index-0 pair below runs when it is true and the
//     append runs otherwise -- confirm.
322 void socket_minder::put_pending_server(int to_put, bool at_head)
324 if (!to_put) return; // bogus.
325 auto_synchronizer l(*_lock);
// open one slot at the front, then fill it.
327 _pending_sox->insert(0, 1);
328 (*_pending_sox)[0] = to_put;
// append form.
330 *_pending_sox += to_put;
// removes a specific socket from _pending_sox under _lock.
// returns false when the socket is not a member.
// NOTE(review): the success return is not visible here; presumably true.
334 bool socket_minder::zap_pending_server(int socket)
336 auto_synchronizer l(*_lock);
337 if (!_pending_sox->member(socket)) return false;
338 _pending_sox->remove(socket);
// takes the socket at index zero of _pending_sox, removing it from the
// collection, under _lock.  returns 0 when nothing is pending (zero is the
// same value put_pending_server() rejects as bogus).
// NOTE(review): the final return is not visible here; presumably it returns
// to_return.
342 int socket_minder::get_pending_server()
344 auto_synchronizer l(*_lock);
345 if (!_pending_sox->length()) return 0;
346 int to_return = _pending_sox->get(0);
347 _pending_sox->zap(0, 0);
// services a socket whose connection is still pending.
//   - returns false at once if the socket is not pending.
//   - client sockets registered for SI_CONNECTED: an SI_CONNECTED event is
//     fired at the parent unconditionally (see the "force the issue" comment).
//   - server sockets registered for SI_CONNECTED: accept() is attempted; an
//     accepted descriptor is recorded in _pending_sox and an SI_CONNECTED
//     event is fired for the listening socket.  another branch fires the same
//     event while _pending_sox is non-empty.
// NOTE(review): several statements (the body of the throttle test, the
// declaration of sock_addr, the final return) are not visible here.
// NOTE(review): _pending_sox is read and modified below without any locking
// statement visible in this function -- confirm every caller holds _lock.
351 bool socket_minder::handle_pending_connecters(socket_data &to_peek)
353 FUNCDEF("handle_pending_connecters");
354 if (!to_peek._connection_pending) return false; // not needed here.
// throttle: true when the last alert is more recent than CONN_ALERT_INTERVAL
// ago.  NOTE(review): the body of this test is not visible here; presumably
// it returns early -- confirm.
356 if (to_peek._last_conn_alert > time_stamp(-CONN_ALERT_INTERVAL)) {
360 to_peek._last_conn_alert.reset();
362 // force the issue; there is no simple way to portably know whether
363 // the socket got a connection or not, so just say it did.
364 if (!to_peek._is_server
365 && (to_peek._registered_interests & SI_CONNECTED) ) {
366 // deal with a client first. this just means, zing an event.
367 #ifdef DEBUG_SOCKET_MINDER
368 LOG(a_sprintf("sending client SI_CONNECTED event on parent %d",
371 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
372 } else if (to_peek._is_server
373 && (to_peek._registered_interests & SI_CONNECTED) ) {
374 // special handling for servers. we accept the waiting guy if he's
375 // there. otherwise we don't send the event.
377 socklen_t sock_len = sizeof(sock_addr);
378 int new_sock = int(::accept(to_peek._socket, &sock_addr, &sock_len));
379 // check for a new socket.
380 if (new_sock != INVALID_SOCKET) {
381 LOG(a_sprintf("accept got a client socket %d.", new_sock));
382 if (_pending_sox->member(new_sock)) {
383 LOG(a_sprintf("already have seen socket %d in pending!", new_sock));
// NOTE(review): the line between the duplicate warning and this insertion is
// not visible here, so it is unclear whether the insertion is skipped for a
// duplicate -- confirm.
385 *_pending_sox += new_sock;
386 #ifdef DEBUG_SOCKET_MINDER
387 LOG(a_sprintf("sending server SI_CONNECTED event on parent %d",
// the event carries the listening socket, not new_sock; the accepted
// descriptor is retrieved separately through get_pending_server().
390 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
392 } else if (_pending_sox->length()) {
393 // there are still pending connectees.
394 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
397 // also, if the connection is still pending, we don't want to select on
// checks a single socket and acts on its state.
//   - a socket still connecting is delegated to handle_pending_connecters().
//   - otherwise the socket is selected on by itself; when the result carries
//     neither SI_ERRONEOUS nor SI_DISCONNECTED, push_sends() and
//     push_receives() are run with the reported states.
//   - an erroneous state the owner registered for is logged only.
//   - a disconnected state the owner registered for fires an event at the
//     parent and returns true.
// NOTE(review): how sel_mode is computed, the test guarding the "nothing to
// report" return and the final return are not visible here.
402 bool socket_minder::evaluate_interest(socket_data &to_peek)
404 FUNCDEF("evaluate_interest");
405 if (to_peek._connection_pending) {
406 return handle_pending_connecters(to_peek);
411 int states = _socks->select(to_peek._socket, sel_mode);
414 return true; // nothing to report.
417 if (! (states & SI_ERRONEOUS) && ! (states & SI_DISCONNECTED) ) {
418 push_sends(to_peek, states);
419 push_receives(to_peek, states);
422 if ( (to_peek._registered_interests & SI_ERRONEOUS)
423 && (states & SI_ERRONEOUS) ) {
424 // there's some kind of bad problem on the socket.
425 LOG(a_sprintf("socket %d has status of erroneous.", to_peek._socket));
426 //hmmm: what to do? generate an event?
429 if ( (to_peek._registered_interests & SI_DISCONNECTED)
430 && (states & SI_DISCONNECTED) ) {
431 // we lost our state of connectedness.
432 fire_event(_message, _parent_route, to_peek._socket,
434 return true; // get out now.
// tries to flush queued outbound data for one socket.
//   states: SI_* flags; data is sent only when SI_WRITABLE is set and
//     _partially_sent is non-empty.
// a single send() is issued for the whole queue; the bytes that went out are
// zapped from the front of _partially_sent so the rest stays queued.
// on SOCK_ENOTSOCK / SOCK_ECONNABORTED / SOCK_ECONNRESET the condition is
// logged, an event is fired at the parent and the queue is cleared.
// NOTE(review): the declaration of error_code, the zero-length test and the
// guard around the error handling are not visible here.
440 void socket_minder::push_sends(socket_data &to_poke, int states)
442 FUNCDEF("push_sends");
443 if (to_poke._connection_pending) {
// NOTE(review): presumably followed by an early return -- not visible here.
444 LOG("not supposed to try this when not connected yet...");
447 #ifdef DEBUG_SOCKET_MINDER
448 if (to_poke._partially_sent.length()) {
449 LOG(a_sprintf("socket %d: %d bytes to send.", to_poke._socket,
450 to_poke._partially_sent.length()));
456 if ( (states & SI_WRITABLE) && to_poke._partially_sent.length()) {
457 // write to the socket since we see an opportunity.
// to_send aliases the queue itself, so the zap() below edits it in place.
458 byte_array &to_send = to_poke._partially_sent;
459 int len_sent = send(to_poke._socket, (char *)to_send.observe(),
460 to_send.length(), 0);
463 LOG(a_sprintf("didn't send any data on socket %d.", to_poke._socket));
464 } else if (len_sent == SOCKET_ERROR) {
465 // something bad happened.
466 error_code = critical_events::system_error();
468 #ifdef DEBUG_SOCKET_MINDER
469 LOG(a_sprintf("updating that %d bytes got sent out of %d to send.",
470 len_sent, to_send.length()));
471 if (to_send.length() != len_sent)
472 LOG(a_sprintf("size to send (%d) not same as size sent (%d).",
473 to_send.length(), len_sent));
475 // update the partially sent chunk for the bit we sent out.
// removes indices 0 .. len_sent-1, i.e. exactly the bytes send() accepted.
476 to_send.zap(0, len_sent - 1);
480 // handle errors we have seen.
// SOCK_EWOULDBLOCK is not logged.
482 if (error_code != SOCK_EWOULDBLOCK)
483 LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
484 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
486 switch (error_code) {
487 case SOCK_ENOTSOCK: // fall-through.
488 case SOCK_ECONNABORTED: // fall-through.
489 case SOCK_ECONNRESET: {
490 // the connection got hammerlocked somehow.
491 LOG(a_sprintf("due to %s condition, closing socket %d.",
492 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
493 fire_event(_message, _parent_route, to_poke._socket,
// NOTE(review): the log text above says "closing socket", yet no close call
// appears in the lines visible here, unlike the matching branch of
// push_receives() -- confirm whether that is intended.
495 to_poke._partially_sent.reset(); // clear with no connection.
// pulls inbound data for one socket and notifies the parent.
//   states: SI_* flags; work is done only when SI_READABLE is set.
//   - connected-mode sockets: recv() is retried while data keeps arriving,
//     bounded by SOCKMIND_MAXIMUM_RECEIVES passes; each chunk is appended to
//     _partially_received.  the socket is then re-selected for read, and
//     SI_READABLE is fired if it is still connected and data is buffered.
//   - other sockets (datagram style): the parent is alerted and no data is
//     read here.
//   - on SOCK_ENOTSOCK / SOCK_ECONNABORTED / SOCK_ECONNRESET the condition is
//     logged, an event is fired, the socket is closed and _partially_sent is
//     cleared.
// NOTE(review): the declarations of count and error_code, the tests on len
// and the end of this function are not visible here.
502 void socket_minder::push_receives(socket_data &to_poke, int states)
504 FUNCDEF("push_receives");
505 if (to_poke._connection_pending) {
// NOTE(review): presumably followed by an early return -- not visible here.
506 LOG("not supposed to try this when not connected yet...");
509 #ifdef DEBUG_SOCKET_MINDER
510 if (to_poke._partially_received.length())
511 LOG(a_sprintf("socket %d: %d bytes already received.", to_poke._socket,
512 to_poke._partially_received.length()));
517 if ( (states & SI_READABLE) && to_poke._connected_mode) {
518 // grab any data that's waiting on the connection-oriented socket.
520 bool got_something = true; // hopeful for now.
521 to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
522 // allocate space where we'll get new data.
524 while (got_something && (count++ < SOCKMIND_MAXIMUM_RECEIVES)) {
525 got_something = false; // now get all pessimistic.
// NOTE(review): recv() writes through the pointer returned by observe(),
// cast to char *.  if observe() is a read-only accessor this writes through a
// cast-away const -- confirm whether byte_array offers a mutable accessor
// that should be used instead.
526 int len = recv(to_poke._socket, (char *)to_poke._receive_buffer.observe(),
527 to_poke._receive_buffer.length(), 0);
529 #ifdef DEBUG_SOCKET_MINDER
530 LOG(a_sprintf("received %d bytes on socket %d.", len, to_poke._socket));
532 // we received some actual data; we're happy again.
533 got_something = true;
534 // zap any extra off.
// trims the buffer to the len bytes recv() filled.
535 if (len < MAXIMUM_TRANSFER_CHUNK)
536 to_poke._receive_buffer.zap(len, to_poke._receive_buffer.last());
537 // add in what we were given.
538 to_poke._partially_received += to_poke._receive_buffer;
539 to_poke._receive_buffer.reset(MAXIMUM_TRANSFER_CHUNK);
540 // reset to the right size for trying some more.
542 error_code = critical_events::system_error();
544 // reset the states flag based on current state.
545 states = _socks->select(to_poke._socket,
546 raw_socket::SELECTING_JUST_READ);
547 if (states & SI_DISCONNECTED) {
548 error_code = SOCK_ECONNRESET; // make like regular disconnect.
549 LOG(a_sprintf("noticed disconnection on socket %d.",
551 // close the socket; we use a temporary because the close method
552 // wants to actually store zero in the socket.
// to_poke._socket therefore keeps its value after the close.
// NOTE(review): error_code was just set to SOCK_ECONNRESET, and the
// SOCK_ECONNRESET case further down closes a copy of the same descriptor
// again.  unless a line not visible here prevents it, the descriptor is
// closed twice -- confirm.
553 basis::un_int deader = to_poke._socket;
554 _socks->close(deader);
// buffered data is announced only while the socket is still connected.
560 if ( !(states & SI_DISCONNECTED)
561 && to_poke._partially_received.length()) {
562 #ifdef DEBUG_SOCKET_MINDER
563 LOG(a_sprintf("firing readable now: sock=%d", to_poke._socket));
565 fire_event(_message, _parent_route, to_poke._socket, SI_READABLE);
567 } else if ( (states & SI_READABLE) && !to_poke._connected_mode) {
568 // datagram style; we need to still alert the parent.
569 fire_event(_message, _parent_route, to_poke._socket,
573 // handle errors we have seen.
// SOCK_EWOULDBLOCK is not logged.
575 if (error_code != SOCK_EWOULDBLOCK)
576 LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
577 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
579 switch (error_code) {
580 case SOCK_ENOTSOCK: // fall-through.
581 case SOCK_ECONNABORTED: // fall-through.
582 case SOCK_ECONNRESET: {
583 // the connection got hammerlocked somehow.
584 LOG(a_sprintf("due to %s condition, closing socket %d.",
585 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
586 fire_event(_message, _parent_route, to_poke._socket,
// closes through a copy, so to_poke._socket is left unchanged here as well.
588 basis::un_int deader = to_poke._socket;
589 _socks->close(deader);
590 to_poke._partially_sent.reset(); // clear with no connection.