feisty meow concerns codebase 2.140
socket_minder.cpp
Go to the documentation of this file.
1/*****************************************************************************\
2* *
3* Name : socket_minder *
4* Author : Chris Koeritz *
5* *
6*******************************************************************************
7* Copyright (c) 1999-$now By Author. This program is free software; you can *
8* redistribute it and/or modify it under the terms of the GNU General Public *
9* License as published by the Free Software Foundation; either version 2 of *
10* the License or (at your option) any later version. This is online at: *
11* http://www.fsf.org/copyleft/gpl.html *
12* Please send any updates to: fred@gruntose.com *
13\*****************************************************************************/
14
15#include "raw_socket.h"
16#include "socket_data.h"
17#include "socket_minder.h"
18#include "tcpip_stack.h"
19
20#include <basis/mutex.h>
23#include <processes/ethread.h>
24#include <processes/os_event.h>
25#include <structures/set.h>
26#include <structures/amorph.h>
28#include <textual/parser_bits.h>
29
30#include <errno.h>
31/*#ifdef __WIN32__
32 #include <ws2tcpip.h>
33#endif
34#ifdef __UNIX__
35*/
36 #include <arpa/inet.h>
37 #include <sys/socket.h>
38//#endif
39
40using namespace basis;
41using namespace loggers;
42using namespace processes;
43using namespace structures;
44using namespace textual;
45using namespace timely;
46
47namespace sockets {
48
49//#define DEBUG_SOCKET_MINDER
50 // uncomment for noisiness.
51
52#undef LOG
53#define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
54
55const int SOCKET_CHECK_INTERVAL = 50;
56 // we will scoot around in our sockets this frequently.
57
59 // we'll receive this many items from the socket in one go.
60
62 // largest block of data we'll try to deal with at one time.
63
64const int CONN_ALERT_INTERVAL = 100;
65 // this is the most frequently that we will generate a connection checking
66 // event.
67
68const int MULTI_SELECT_TIMEOUT = 250;
69 // the snooze on select will occur for this long. during this interval,
70 // it is likely no new sockets will be considered.
71
73
74class socket_data_amorph : public amorph<socket_data> {};
75
77
// thread object that keeps a socket_minder polling: every activation of the
// thread calls snoozy_select() on the minder it was constructed with.
78class socket_minder_prompter : public ethread
79{
80public:
81 socket_minder_prompter(socket_minder &parent)
83 _parent(parent)
84 {
86 }
87
// the thread is halted before this object goes away.
88 ~socket_minder_prompter() {
89 stop(); // shut down our thread.
90 }
91
// the thread's work function; the thread data pointer is not used here.
92 virtual void perform_activity(void *formal(ptr)) { _parent.snoozy_select(); }
93
94private:
95 socket_minder &_parent; // the object we're hooked to.
96};
97
99
101 int event_type, int message)
102: _post(post),
103 _parent_route(parent_route),
104 _event_type(event_type),
105 _lock(new mutex),
106 _socket_list(new socket_data_amorph),
107 _socks(new raw_socket),
108 _stack(new tcpip_stack),
109 _message(message),
110 _pending_sox(new int_set),
111 _prompter(new socket_minder_prompter(*this))
112{
113 _prompter->start(NULL_POINTER);
114}
115
117{
118 _prompter->stop();
119 WHACK(_prompter);
120 WHACK(_socket_list);
121 WHACK(_lock);
122 WHACK(_pending_sox);
123 WHACK(_socks);
124 WHACK(_stack);
125}
126
128{
129 _prompter->stop();
130}
131
133{
134 auto_synchronizer l(*_lock);
135 astring to_return;
136
137 for (int i = 0; i < _socket_list->elements(); i++) {
138 const socket_data *curr = _socket_list->get(i);
139 to_return += curr->text_form();
140 if (i != _socket_list->elements() - 1)
142 }
143
144 return to_return;
145}
146
148{
149 FUNCDEF("snoozy_select");
150 int_array read_sox;
151 int_array write_sox;
152 int_array pending;
153
154 get_sockets(read_sox, write_sox, pending);
155
156 // process any with pending connections right now, rather than later.
157 for (int p = 0; p < pending.length(); p++) {
158 socket_data *sd = lock_socket_data(pending[p]);
159 if (!sd) continue; // something hosed there.
162 }
163
164 // now select on all of our sockets simultaneously.
165 int ret = _socks->select(read_sox, write_sox, MULTI_SELECT_TIMEOUT);
166 if (!ret || (!read_sox.length() && !write_sox.length()) ) {
167 return; // nothing happened.
168 }
169
170 // rotate through the lists and push socket_minders around as needed.
171 // any sockets we have events for but no socket_data are orphans and will
172 // be ignored.
173
174 // check read sockets.
175 for (int r = 0; r < read_sox.length(); r++) {
176 const int sock = read_sox[r];
177 if (owns_socket(sock)) {
178 socket_data *sd = lock_socket_data(sock);
179 if (!sd) continue; // something hosed there.
182 read_sox.zap(r, r);
183 r--; // skip back before deleted guy.
184 }
185 }
186
187 // check write sockets.
188 for (int w = 0; w < write_sox.length(); w++) {
189 const int sock = write_sox[w];
190 if (owns_socket(sock)) {
191 socket_data *sd = lock_socket_data(sock);
192 if (!sd) continue; // something hosed there.
195 write_sox.zap(w, w);
196 w--; // skip back before deleted guy.
197 }
198 }
199}
200
202 int_array &pendings) const
203{
204 auto_synchronizer l(*_lock);
205 for (int i = 0; i < _socket_list->elements(); i++) {
206 socket_data *sd = _socket_list->borrow(i);
207 if (sd->_connection_pending) {
208 // this is not ready for sends and receives yet.
209 pendings += sd->_socket;
210 } else {
211 // always add sockets to test if they have data waiting.
212 read_sox += sd->_socket;
213 // only check on writability if there is data pending for sending.
214 if (sd->_partially_sent.length())
215 write_sox += sd->_socket;
216 }
217 }
218}
219
220bool socket_minder::owns_socket(int socket) const
221{
222 auto_synchronizer l(*_lock);
223 for (int i = 0; i < _socket_list->elements(); i++) {
224 if (_socket_list->borrow(i)->_socket == socket) return true;
225 }
226 return false;
227}
228
230{
// takes the minder's lock, then looks for the record whose _socket matches.
// when a match is found, the record is returned with the lock STILL HELD;
// there is no unlock on that path.  the lock is only given back here when
// no record matches, in which case NULL_POINTER is returned.
// NOTE(review): the callers visible in this file hand a non-null result to
// unlock_socket_data() once they are done with it.
231 _lock->lock();
232 for (int i = 0; i < _socket_list->elements(); i++)
233 if (_socket_list->borrow(i)->_socket == socket)
234 return _socket_list->borrow(i);
235 // this is a failure to get here; there was no suitable socket.
236 _lock->unlock();
237 return NULL_POINTER;
238}
239
241{
// a null pointer is ignored and the lock is left untouched.
242 if (!to_unlock) return;
243//can't affect it now. to_unlock = NULL_POINTER;
// gives up the minder's lock; the record itself is not modified.
244 _lock->unlock();
245}
246
247bool socket_minder::add_socket_data(int socket, bool server, int server_socket,
248 bool connected_mode, bool connection_pending)
249{
250 auto_synchronizer l(*_lock);
251 socket_data *harpo = lock_socket_data(socket);
252 if (harpo) {
253 unlock_socket_data(harpo);
254 return false;
255 }
256 socket_data *new_data = new socket_data(socket, server, server_socket,
257 connected_mode);
258 _socks->set_non_blocking(socket);
259 // ensure the new guy is treated as non-blocking. unix does not seem
260 // to inherit this from the parent.
261 new_data->_connection_pending = connection_pending;
262 _socket_list->append(new_data);
263 return true;
264}
265
267{
268 FUNCDEF("remove_socket_data");
269 auto_synchronizer l(*_lock);
270 for (int i = 0; i < _socket_list->elements(); i++) {
271 if (_socket_list->borrow(i)->_socket == socket) {
272 // give the socket a last chance to get its data out.
273 evaluate_interest(*_socket_list->borrow(i));
274 _socket_list->zap(i, i);
275 return true;
276 }
277 }
278// LOG(a_sprintf("couldn't find socket %d.", socket));
279 return false;
280}
281
282bool socket_minder::register_interest(int socket, int interests)
283{
284#ifdef DEBUG_SOCKET_MINDER
285 FUNCDEF("register_interest");
286#endif
287 socket_data *harpo = lock_socket_data(socket);
288#ifdef DEBUG_SOCKET_MINDER
289 LOG(astring(astring::SPRINTF, "registering interest of %d for socket "
290 "%d.", interests, socket));
291#endif
292 if (!harpo) return false;
293 harpo->_registered_interests = interests;
294 unlock_socket_data(harpo);
295 return true;
296}
297
299{
300 socket_data *harpo = lock_socket_data(socket);
301 if (!harpo) return false;
302 bool to_return = harpo->_connection_pending;
303 unlock_socket_data(harpo);
304 return to_return;
305}
306
307bool socket_minder::set_connection_pending(int socket, bool pending)
308{
309 socket_data *harpo = lock_socket_data(socket);
310 if (!harpo) return false;
311 harpo->_connection_pending = pending;
312 unlock_socket_data(harpo);
313 return true;
314}
315
316void socket_minder::fire_event(int to_fire, int at_whom,
317 basis::un_int parm1, basis::un_int parm2)
318{
319 _post.drop_off(at_whom, new OS_event(_event_type, to_fire, parm1, parm2));
320}
321
322void socket_minder::put_pending_server(int to_put, bool at_head)
323{
324 if (!to_put) return; // bogus.
325 auto_synchronizer l(*_lock);
326 if (at_head) {
327 _pending_sox->insert(0, 1);
328 (*_pending_sox)[0] = to_put;
329 } else {
330 *_pending_sox += to_put;
331 }
332}
333
335{
336 auto_synchronizer l(*_lock);
337 if (!_pending_sox->member(socket)) return false;
338 _pending_sox->remove(socket);
339 return true;
340}
341
343{
344 auto_synchronizer l(*_lock);
345 if (!_pending_sox->length()) return 0;
346 int to_return = _pending_sox->get(0);
347 _pending_sox->zap(0, 0);
348 return to_return;
349}
350
352{
353 FUNCDEF("handle_pending_connecters");
354 if (!to_peek._connection_pending) return false; // not needed here.
355
357 // not time yet.
358 return false;
359 }
360 to_peek._last_conn_alert.reset();
361
362 // force the issue; there is no simple way to portably know whether
363 // the socket got a connection or not, so just say it did.
364 if (!to_peek._is_server
365 && (to_peek._registered_interests & SI_CONNECTED) ) {
366 // deal with a client first. this just means, zing an event.
367#ifdef DEBUG_SOCKET_MINDER
368 LOG(a_sprintf("sending client SI_CONNECTED event on parent %d",
369 _parent_route));
370#endif
371 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
372 } else if (to_peek._is_server
373 && (to_peek._registered_interests & SI_CONNECTED) ) {
374 // special handling for servers. we accept the waiting guy if he's
375 // there. otherwise we don't send the event.
376 sockaddr sock_addr;
377 socklen_t sock_len = sizeof(sock_addr);
378 int new_sock = int(::accept(to_peek._socket, &sock_addr, &sock_len));
379 // check for a new socket.
380 if (new_sock != INVALID_SOCKET) {
381 LOG(a_sprintf("accept got a client socket %d.", new_sock));
382 if (_pending_sox->member(new_sock)) {
383 LOG(a_sprintf("already have seen socket %d in pending!", new_sock));
384 } else {
385 *_pending_sox += new_sock;
386#ifdef DEBUG_SOCKET_MINDER
387 LOG(a_sprintf("sending server SI_CONNECTED event on parent %d",
388 _parent_route));
389#endif
390 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
391 }
392 } else if (_pending_sox->length()) {
393 // there are still pending connectees.
394 fire_event(_message, _parent_route, to_peek._socket, SI_CONNECTED);
395 }
396 }
397 // also, if the connection is still pending, we don't want to select on
398 // it yet.
399 return true;
400}
401
403{
404 FUNCDEF("evaluate_interest");
405 if (to_peek._connection_pending) {
406 return handle_pending_connecters(to_peek);
407 }
408
409 int sel_mode = 0;
410
411 int states = _socks->select(to_peek._socket, sel_mode);
412
413 if (!states) {
414 return true; // nothing to report.
415 }
416
417 if (! (states & SI_ERRONEOUS) && ! (states & SI_DISCONNECTED) ) {
418 push_sends(to_peek, states);
419 push_receives(to_peek, states);
420 }
421
422 if ( (to_peek._registered_interests & SI_ERRONEOUS)
423 && (states & SI_ERRONEOUS) ) {
424 // there's some kind of bad problem on the socket.
425 LOG(a_sprintf("socket %d has status of erroneous.", to_peek._socket));
426//hmmm: what to do? generate an event?
427 }
428
430 && (states & SI_DISCONNECTED) ) {
431 // we lost our state of connectedness.
432 fire_event(_message, _parent_route, to_peek._socket,
434 return true; // get out now.
435 }
436
437 return true;
438}
439
440void socket_minder::push_sends(socket_data &to_poke, int states)
441{
442 FUNCDEF("push_sends");
443 if (to_poke._connection_pending) {
444 LOG("not supposed to try this when not connected yet...");
445 }
446
447#ifdef DEBUG_SOCKET_MINDER
448 if (to_poke._partially_sent.length()) {
449 LOG(a_sprintf("socket %d: %d bytes to send.", to_poke._socket,
450 to_poke._partially_sent.length()));
451 }
452#endif
453
454 int error_code = 0;
455
456 if ( (states & SI_WRITABLE) && to_poke._partially_sent.length()) {
457 // write to the socket since we see an opportunity.
458 byte_array &to_send = to_poke._partially_sent;
459 int len_sent = send(to_poke._socket, (char *)to_send.observe(),
460 to_send.length(), 0);
461 if (!len_sent) {
462 // nothing got sent.
463 LOG(a_sprintf("didn't send any data on socket %d.", to_poke._socket));
464 } else if (len_sent == SOCKET_ERROR) {
465 // something bad happened.
466 error_code = critical_events::system_error();
467 } else {
468#ifdef DEBUG_SOCKET_MINDER
469 LOG(a_sprintf("updating that %d bytes got sent out of %d to send.",
470 len_sent, to_send.length()));
471 if (to_send.length() != len_sent)
472 LOG(a_sprintf("size to send (%d) not same as size sent (%d).",
473 to_send.length(), len_sent));
474#endif
475 // update the partially sent chunk for the bit we sent out.
476 to_send.zap(0, len_sent - 1);
477 }
478 }
479
480 // handle errors we have seen.
481 if (error_code) {
482 if (error_code != SOCK_EWOULDBLOCK)
483 LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
484 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
485
486 switch (error_code) {
487 case SOCK_ENOTSOCK: // fall-through.
488 case SOCK_ECONNABORTED: // fall-through.
489 case SOCK_ECONNRESET: {
490 // the connection got hammerlocked somehow.
491 LOG(a_sprintf("due to %s condition, closing socket %d.",
492 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
493 fire_event(_message, _parent_route, to_poke._socket,
495 to_poke._partially_sent.reset(); // clear with no connection.
496 break;
497 }
498 }
499 }
500}
501
503{
504 FUNCDEF("push_receives");
505 if (to_poke._connection_pending) {
506 LOG("not supposed to try this when not connected yet...");
507 }
508
509#ifdef DEBUG_SOCKET_MINDER
510 if (to_poke._partially_received.length())
511 LOG(a_sprintf("socket %d: %d bytes already received.", to_poke._socket,
512 to_poke._partially_received.length()));
513#endif
514
515 int error_code = 0;
516
517 if ( (states & SI_READABLE) && to_poke._connected_mode) {
518 // grab any data that's waiting on the connection-oriented socket.
519
520 bool got_something = true; // hopeful for now.
522 // allocate space where we'll get new data.
523 int count = 0;
524 while (got_something && (count++ < SOCKMIND_MAXIMUM_RECEIVES)) {
525 got_something = false; // now get all pessimistic.
526 int len = recv(to_poke._socket, (char *)to_poke._receive_buffer.observe(),
527 to_poke._receive_buffer.length(), 0);
528 if (len > 0) {
529#ifdef DEBUG_SOCKET_MINDER
530 LOG(a_sprintf("received %d bytes on socket %d.", len, to_poke._socket));
531#endif
532 // we received some actual data; we're happy again.
533 got_something = true;
534 // zap any extra off.
535 if (len < MAXIMUM_TRANSFER_CHUNK)
536 to_poke._receive_buffer.zap(len, to_poke._receive_buffer.last());
537 // add in what we were given.
538 to_poke._partially_received += to_poke._receive_buffer;
540 // reset to the right size for trying some more.
541 } else {
542 error_code = critical_events::system_error();
543
544 // reset the states flag based on current state.
545 states = _socks->select(to_poke._socket,
547 if (states & SI_DISCONNECTED) {
548 error_code = SOCK_ECONNRESET; // make like regular disconnect.
549 LOG(a_sprintf("noticed disconnection on socket %d.",
550 to_poke._socket));
551 // close the socket; we use a temporary because the close method
552 // wants to actually store zero in the socket.
553 basis::un_int deader = to_poke._socket;
554 _socks->close(deader);
555 }
556
557 }
558 }
559
560 if ( !(states & SI_DISCONNECTED)
561 && to_poke._partially_received.length()) {
562#ifdef DEBUG_SOCKET_MINDER
563 LOG(a_sprintf("firing readable now: sock=%d", to_poke._socket));
564#endif
565 fire_event(_message, _parent_route, to_poke._socket, SI_READABLE);
566 }
567 } else if ( (states & SI_READABLE) && !to_poke._connected_mode) {
568 // datagram style; we need to still alert the parent.
569 fire_event(_message, _parent_route, to_poke._socket,
571 }
572
573 // handle errors we have seen.
574 if (error_code) {
575 if (error_code != SOCK_EWOULDBLOCK)
576 LOG(astring(astring::SPRINTF, "error on socket %d is %s.",
577 to_poke._socket, _stack->tcpip_error_name(error_code).s()));
578
579 switch (error_code) {
580 case SOCK_ENOTSOCK: // fall-through.
581 case SOCK_ECONNABORTED: // fall-through.
582 case SOCK_ECONNRESET: {
583 // the connection got hammerlocked somehow.
584 LOG(a_sprintf("due to %s condition, closing socket %d.",
585 _stack->tcpip_error_name(error_code).s(), to_poke._socket));
586 fire_event(_message, _parent_route, to_poke._socket,
588 basis::un_int deader = to_poke._socket;
589 _socks->close(deader);
590 to_poke._partially_sent.reset(); // clear with no connection.
591 break;
592 }
593 }
594 }
595}
596
597} //namespace.
598
#define LOG(s)
a_sprintf is a specialization of astring that provides printf style support.
Definition astring.h:440
outcome insert(int index, int new_indices)
Adds "new_indices" new positions for objects into the array at "index".
Definition array.h:803
void reset(int number=0, const contents *initial_contents=NULL_POINTER)
Resizes this array and sets the contents from an array of contents.
Definition array.h:349
const contents & get(int index) const
Accesses individual objects stored in "this" at the "index" position.
Definition array.h:372
const contents * observe() const
Returns a pointer to the underlying C array of data.
Definition array.h:172
int length() const
Returns the current reported length of the allocated C array.
Definition array.h:115
outcome zap(int start, int end)
Deletes from "this" the objects inclusively between "start" and "end".
Definition array.h:769
int last() const
Returns the last valid element in the array.
Definition array.h:118
Provides a dynamically resizable ASCII character string.
Definition astring.h:35
const char * s() const
synonym for observe. the 's' stands for "string", if that helps.
Definition astring.h:113
auto_synchronizer simplifies concurrent code by automatically unlocking.
Definition mutex.h:113
A very common template for a dynamic array of bytes.
Definition byte_array.h:36
A simple object that wraps a templated array of ints.
Definition array.h:275
void lock()
Clamps down on the mutex, if possible.
Definition mutex.cpp:101
void unlock()
Gives up the possession of the mutex.
Definition mutex.cpp:113
static basis::un_int system_error()
gets the most recent system error reported on this thread.
Models an OS-level event so we can represent activities occurring there.
Definition os_event.h:31
Provides a platform-independent object for adding threads to a program.
Definition ethread.h:36
bool start(void *thread_data)
causes the thread to start, if it has not already been started.
Definition ethread.cpp:145
virtual void perform_activity(void *thread_data)=0
invoked just after start(), when the OS thread is created.
void stop()
tells the thread to shutdown and waits for the shutdown to occur.
Definition ethread.cpp:194
Manages a collection of mailboxes and implements delivery routes for mail.
Definition post_office.h:35
void drop_off(const structures::unique_int &id, letter *package)
sends a "package" on its way to the "id" via the registered route.
int close(basis::un_int &socket)
int select(basis::un_int socket, int selection_mode, int timeout=0) const
bool set_non_blocking(basis::un_int socket, bool non_blocking=true)
basis::astring text_form() const
Definition socket_data.h:69
basis::byte_array _receive_buffer
Definition socket_data.h:37
basis::byte_array _partially_received
Definition socket_data.h:36
timely::time_stamp _last_conn_alert
Definition socket_data.h:48
basis::byte_array _partially_sent
Definition socket_data.h:35
void get_sockets(basis::int_array &read_sox, basis::int_array &write_sox, basis::int_array &pending) const
bool is_connection_pending(int socket)
bool handle_pending_connecters(socket_data &to_peek)
void put_pending_server(int to_put, bool at_head)
bool set_connection_pending(int socket, bool pending)
bool remove_socket_data(int socket)
bool register_interest(int socket, int interests)
virtual bool evaluate_interest(socket_data &to_examine)
bool zap_pending_server(int socket)
socket_data * lock_socket_data(int socket)
void push_sends(socket_data &to_poke, int states)
bool add_socket_data(int socket, bool server, int server_socket, bool connected_mode, bool connection_pending)
void push_receives(socket_data &to_poke, int states)
basis::astring text_form() const
bool owns_socket(int socket) const
void unlock_socket_data(socket_data *to_unlock)
socket_minder(processes::post_office &post, int parent_route, int event_type, int message)
Helpful functions for interacting with TCP/IP stacks.
Definition tcpip_stack.h:38
static basis::astring tcpip_error_name(int error_value)
A simple object that wraps a templated set of ints.
Definition set.h:156
bool member(const contents &to_test) const
Returns true if the item "to_test" is a member of this set.
Definition set.h:223
bool remove(const contents &to_remove)
Removes the item "to_remove" from the set.
Definition set.h:249
static const char * platform_eol_to_chars()
provides the characters that make up this platform's line ending.
Represents a point in time relative to the operating system startup time.
Definition time_stamp.h:38
void reset()
sets the stamp time back to now.
#define formal(parameter)
This macro just eats what it's passed; it marks unused formal parameters.
Definition definitions.h:48
#define NULL_POINTER
The value representing a pointer to nothing.
Definition definitions.h:32
#define FUNCDEF(func_in)
FUNCDEF sets the name of a function (and plugs it into the callstack).
Definition enhance_cpp.h:54
The guards collection helps in testing preconditions and reporting errors.
Definition array.h:30
void WHACK(contents *&ptr)
deletion with clearing of the pointer.
Definition functions.h:121
unsigned int un_int
Abbreviated name for unsigned integers.
Definition definitions.h:62
const int KILOBYTE
Number of bytes in a kilobyte.
A logger that sends to the console screen using the standard output device.
Provides access to the operating system's socket methods.
const int SOCKMIND_MAXIMUM_RECEIVES
const int MAXIMUM_TRANSFER_CHUNK
@ SI_CONNECTED
Definition raw_socket.h:45
@ SI_DISCONNECTED
Definition raw_socket.h:46
@ SI_ERRONEOUS
Definition raw_socket.h:47
@ SI_READABLE
Definition raw_socket.h:43
@ SI_WRITABLE
Definition raw_socket.h:44
const int CONN_ALERT_INTERVAL
const int MULTI_SELECT_TIMEOUT
const int SOCKET_CHECK_INTERVAL
A dynamic container class that holds any kind of object via pointers.
Definition amorph.h:55
#include <time.h>
#define INVALID_SOCKET
#define SOCK_ECONNRESET
#define SOCK_ECONNABORTED
#define SOCKET_ERROR
#define SOCK_EWOULDBLOCK
#define SOCK_ENOTSOCK