1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2001-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "internet_address.h"
16 #include "raw_socket.h"
18 #include "tcpip_stack.h"
20 #include <basis/byte_array.h>
21 #include <basis/functions.h>
22 #include <basis/astring.h>
23 #include <basis/mutex.h>
24 #include <loggers/critical_events.h>
25 #include <loggers/program_wide_logger.h>
26 #include <structures/static_memory_gremlin.h>
27 #include <timely/time_control.h>
28 #include <timely/time_stamp.h>
30 //hmmm: put this bag o headers into a similar thing to windoze helper. maybe just have an os_helper file that combines both?
32 #include <arpa/inet.h>
37 #include <sys/ioctl.h>
38 #include <sys/socket.h>
39 #include <sys/types.h>
44 using namespace basis;
45 using namespace loggers;
46 using namespace structures;
47 using namespace timely;
51 //#define DEBUG_SPOCKET
52 // uncomment for noisy version.
// shorthand used throughout this file: hands "to_print" to
// CLASS_EMERGENCY_LOG together with the program-wide logger instance.
55 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
// backlog value that accept() passes to listen() when it sets up the server
// socket.
57 const int PENDING_CONNECTIONS_ALLOWED = 14;
58 // we allow this many connections to queue up before they get rejected.
59 // if the OS is windoze, this number is ignored if it's greater than the
60 // hardcoded maximum of like 5.
// NOTE(review): the only reference to this constant visible in this file is
// inside commented-out code in connect(), which now always re-resolves. the
// time unit is whatever time_stamp's offset constructor expects -- TODO
// confirm before relying on the value.
62 const int RESOLVE_INTERVAL = 300;
63 // we'll re-resolve the ip address at this rate. this mainly comes into
64 // play for the connect call, since the address passed in could have changed
65 // or been invalid to start with. we're not losing much by trying to
66 // resolve the address again during connection time.
// records that the connection is gone: clears both the client-bind flag and
// the was-connected flag. the expansion is two statements without a trailing
// semicolon, so the use site must supply the final ';'.
68 #define RECOGNIZE_DISCO \
69 _client_bind = false; \
70 _was_connected = false
72 // ensure that the socket is in a good state.
// expands to two checks that make the enclosing function return "retval":
// first when was_connected() is false, and second when _socket is zero (the
// second case also applies RECOGNIZE_DISCO before returning).
73 #define ENSURE_HEALTH(retval) \
74 if (!was_connected()) return retval; /* never has been. */ \
75 if (!_socket) { RECOGNIZE_DISCO; return retval; /* not set. */ }
// makes the enclosing function return "retval" when is_bogus() reports true.
77 #define CHECK_BOGUS(retval) \
78 if (is_bogus()) { return retval; /* this spocket is junk. */ }
83 // win32 seems to trip over selects unless we protect them.
// declares a local auto_synchronizer named "l" built from *_select_lock.
// NOTE(review): presumably the lock is held until "l" leaves scope -- confirm
// against auto_synchronizer. because the local is always named "l", the
// macro can only be used once per scope.
84 #define GRAB_LOCK auto_synchronizer l(*_select_lock)
85 // and in truth, the locking turns out to be needed on win32 if we're
86 // going to allow sharing a spocket across threads. this is one of the
87 // design goals so we're honor bound to support that.
// NOTE(review): SAFE_STATIC is defined elsewhere; this presumably provides a
// static mutex reachable as __broken_pipe_synch -- confirm. no use of it is
// visible in this part of the file.
96 SAFE_STATIC(mutex, __broken_pipe_synch, )
// constructs a spocket for the address "where" with the socket flavor "type".
// the initializer list heap-allocates the helpers this object uses: a copy of
// "where", an empty remote address, a raw_socket, a tcpip_stack, the select
// mutex, a time_stamp for the last resolve and the client-bind address.
// an unrecognized type is only logged; construction still completes.
// NOTE(review): several initializer and body lines are not visible here, so
// this summary covers only what is shown.
99 spocket::spocket(const internet_address &where, sock_types type)
103 _was_connected(false),
105 _where(new internet_address(where)),
106 _remote(new internet_address),
107 _socks(new raw_socket),
108 _stack(new tcpip_stack),
109 _select_lock(new mutex),
110 _last_resolve(new time_stamp), // don't force an immediate resolve.
112 _cli_bind(new internet_address)
114 FUNCDEF("constructor");
115 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
116 // casting types are never servers.
// NOTE(review): the test below reads the parameter "type" while the test
// above reads the member "_type"; presumably both hold the same value once
// the initializer list has run -- confirm.
118 } else if ( (type == CONNECTED) || (type == BOGUS_SOCK) ) {
119 // nothing special here currently.
121 // this is an unknown type.
122 LOG(a_sprintf("unknown socket type %d; failing out.", _type));
123 //hmmm: without a validity flag of some sort, this doesn't mean much.
// destructor body; the signature line is not visible in this excerpt.
// the visible steps log the spocket's text form, release _last_resolve with
// WHACK and clear the client-bind flag.
// NOTE(review): the other owned members are presumably released on lines not
// shown here -- confirm.
130 FUNCDEF("destructor");
132 LOG(a_sprintf("closing spocket: ") + text_form());
140 WHACK(_last_resolve);
142 _client_bind = false;
145 // where and remote don't need to be protected unless we revise the design of
146 // the class and allow a reset or re-open kind of method.
// returns the address object held in _where, which the constructor creates as
// a copy of the address it was given.
147 const internet_address &spocket::where() const { return *_where; }
// returns the address object held in _remote; accept() fills this in on the
// spockets it hands out.
148 const internet_address &spocket::remote() const { return *_remote; }
// gives access to the tcpip_stack helper that the constructor allocates.
150 tcpip_stack &spocket::stack() const { return *_stack; }
152 // doesn't need to be protected since the sockets are being treated as simple
153 // ints and since _where currently does not get destroyed.
// builds a one-line description of this spocket: its role (client,
// root-server or server), a connection status phrase, the socket number (plus
// the server socket number for a root server) and the text form of _where.
// NOTE(review): the condition selecting "connected" versus the other status
// phrases, and the return statement, are on lines not visible here.
154 astring spocket::text_form()
156 FUNCDEF("text_form");
157 astring to_return = is_client()? "client" :
158 (is_root_server()? "root-server" : "server");
159 to_return += " spocket: ";
161 to_return += "connected, ";
163 if (was_connected()) to_return += "unconnected (was once), ";
164 else to_return += "never-connected, ";
166 to_return += a_sprintf("socket=%u, ", _socket);
167 if (is_root_server()) {
168 to_return += a_sprintf("root-socket=%u, ", _server_socket);
170 to_return += _where->text_form().s();
// NOTE(review): only the signature is visible here. presumably this stores
// "addr" in _cli_bind and sets _client_bind so that connect() binds the
// client socket to that address before connecting -- confirm against the
// body.
174 void spocket::bind_client(const internet_address &addr)
// translates an outcome into a printable name. NOT_SERVER is named here and
// any value without its own case is delegated to
// communication_commons::outcome_name.
// NOTE(review): further cases may exist on lines not visible here.
180 const char *spocket::outcome_name(const outcome &to_name)
182 switch (to_name.value()) {
183 case NOT_SERVER: return "NOT_SERVER";
184 default: return communication_commons::outcome_name(to_name);
// shuts this spocket down: closes the data socket through _socks and, when a
// server socket is present, closes that one as well.
// NOTE(review): the conditions guarding the first close, any state resets and
// the returned outcome are on lines not visible here.
188 outcome spocket::disconnect()
190 FUNCDEF("disconnect");
194 LOG(a_sprintf("closing socket %d", _socket));
196 _socks->close(_socket);
199 if (_server_socket) {
201 LOG(a_sprintf("closing server socket %d", _server_socket));
203 _socks->close(_server_socket);
// reports whether this spocket currently looks connected.
// returns false when ENSURE_HEALTH fails or _socket is zero. types other than
// CONNECTED just report was_connected(). for CONNECTED spockets the socket is
// examined with _socks->select() and the SI_DISCONNECTED / SI_ERRONEOUS bits
// of the result are inspected.
// NOTE(review): the select mode setup, the handling inside the
// disconnected/erroneous branch and the exception handler that emits the
// last log message are on lines not visible here.
209 bool spocket::connected()
211 FUNCDEF("connected");
212 ENSURE_HEALTH(false);
214 if (_type != CONNECTED) return was_connected();
216 if (!_socket) return false;
218 // do examination on spocket.
223 int ret = _socks->select(_socket, sel_mode);
225 return true; // we are happy.
227 if ( (ret & SI_DISCONNECTED) || (ret & SI_ERRONEOUS) ) {
233 LOG("caught exception thrown from select, returning false.");
// checks the socket for readability, passing "timeout" through to
// _socks->select() (the timeout's unit is defined by raw_socket, not here).
// returns OKAY when the readable bit is set; NO_CONNECTION when the spocket
// is bogus, fails ENSURE_HEALTH, or select reports a disconnect; otherwise
// NONE_READY, or NO_CONNECTION if _socket is zero by then.
// NOTE(review): the line(s) inside the disconnect branch before the return
// are not visible here.
238 outcome spocket::await_readable(int timeout)
240 FUNCDEF("await_readable");
241 CHECK_BOGUS(NO_CONNECTION);
242 ENSURE_HEALTH(NO_CONNECTION);
244 int mode = raw_socket::SELECTING_JUST_READ;
245 int ret = _socks->select(_socket, mode, timeout);
246 if (ret & SI_READABLE) return OKAY;
247 // we found something to report.
248 if (ret & SI_DISCONNECTED) {
250 return NO_CONNECTION;
252 return _socket? NONE_READY : NO_CONNECTION;
253 // nothing is ready currently.
// checks the socket for writability, passing "timeout" through to
// _socks->select() (the timeout's unit is defined by raw_socket, not here).
// returns OKAY when the writable bit is set; NO_CONNECTION when the spocket
// is bogus, fails ENSURE_HEALTH, or select reports a disconnect; otherwise
// NONE_READY, or NO_CONNECTION if _socket is zero by then.
// NOTE(review): the line(s) inside the disconnect branch before the return
// are not visible here.
256 outcome spocket::await_writable(int timeout)
258 FUNCDEF("await_writable");
259 CHECK_BOGUS(NO_CONNECTION);
260 ENSURE_HEALTH(NO_CONNECTION);
262 int mode = raw_socket::SELECTING_JUST_WRITE;
263 int ret = _socks->select(_socket, mode, timeout);
264 if (ret & SI_WRITABLE) return OKAY;
265 // we found something to report.
266 if (ret & SI_DISCONNECTED) {
268 return NO_CONNECTION;
270 return _socket? NONE_READY : NO_CONNECTION;
271 // nothing is ready currently.
// opens the client side of this spocket. "communication_wait" bounds how long
// the connection attempt for a CONNECTED spocket may keep retrying; it is
// handed to a time_stamp, so its unit is whatever time_stamp expects.
//
// visible sequence:
//   1. reject bogus spockets and objects already in use as a server.
//   2. create the socket (stream/TCP by default, datagram for BROADCAST or
//      UNICAST) and set its options: blocking mode, broadcast, address
//      re-use and, for CONNECTED, keep-alive.
//   3. re-resolve the hostname in _where when one is present.
//   4. bind the socket: to *_cli_bind for the forced client bind, or to
//      *_where for BROADCAST/UNICAST (which then counts as connected).
//   5. for CONNECTED, retry ::connect until it succeeds, fails with a real
//      error, or the abort time passes, sleeping 10 ms between attempts.
//
// visible return values: NO_CONNECTION (bogus), ACCESS_DENIED (socket or
// option setup failed, or a connect error other than refusal) and NO_ANSWER
// (connection refused).
// NOTE(review): many lines of this function, including several conditions,
// closing braces and the final returns, are not visible in this excerpt.
274 outcome spocket::connect(int communication_wait)
277 CHECK_BOGUS(NO_CONNECTION);
279 GRAB_LOCK; // short lock.
280 if ( (was_connected() && !_client) || _server_socket) {
282 LOG("this object was already opened as a server!");
286 _client = true; // set our state now that we're sure this is okay.
287 _was_connected = false; // reset this, since we're connecting now.
291 // the socket was never created (or was cleaned up previously). this is
292 // where we create the socket so we can communicate.
294 LOG(astring("creating socket now for ") + _where->text_form());
297 int sock_type = SOCK_STREAM;
298 int proto = IPPROTO_TCP;
300 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
301 sock_type = SOCK_DGRAM;
304 _socket = int(::socket(AF_INET, sock_type, proto));
// a zero descriptor is treated as a failure too, since this class uses zero
// to mean "no socket" (see ENSURE_HEALTH).
305 if ( (_socket == basis::un_int(INVALID_SOCKET)) || !_socket) {
307 LOG("Failed to open the client's connecting spocket.");
308 return ACCESS_DENIED;
311 // mark the spocket for _blocking_ I/O. we want connect to sit there
312 // until it's connected or returns with an error.
313 _socks->set_non_blocking(_socket, false);
315 if (_type == BROADCAST) {
316 if (!_socks->set_broadcast(_socket)) return ACCESS_DENIED;
317 // mark the socket for broadcast capability.
320 if (!_socks->set_reuse_address(_socket)) return ACCESS_DENIED;
321 // mark the socket so we don't get bind errors on in-use conditions.
324 if (_type == CONNECTED) {
326 // turn on the keepalive timer so that loss of the connection will
327 // eventually be detected by the OS. the duration that is allowed to
328 // elapse before a dead connection is noticed varies with the operating
329 // system and is not configured at this level.
330 if (!_socks->set_keep_alive(_socket)) {
332 LOG("couldn't set watchdog timer on socket.");
336 //hmmm: doesn't this need to be done for bcast too?
338 // create the spocket address that we will connect to.
339 if (strlen(_where->hostname)
340 // && (_where->is_nil_address()
341 // || (*_last_resolve < time_stamp(-RESOLVE_INTERVAL) ) ) ) {
343 //moving to always re-resolving before a connect. otherwise we have somewhat
344 //hard to predict behavior about when the re-resolve will happen.
346 // we know we need to resolve if the address is NULL_POINTER or if the re-resolve
347 // interval has elapsed.
349 byte_array ip_addr = _stack->full_resolve(_where->hostname, full_host);
// an empty result leaves the previously stored ip address untouched.
350 if (ip_addr.length()) {
351 ip_addr.stuff(internet_address::ADDRESS_SIZE, _where->ip_address);
352 LOG(astring("successfully re-resolved address--") + _where->text_form());
354 *_last_resolve = time_stamp(); // reset since we just resolved.
357 // special code for forcing a client to bind.
359 sockaddr sock = _stack->convert(*_cli_bind);
362 LOG(a_sprintf("binding client socket %d to ", _socket)
363 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
366 // now, the socket address is bound to our socket.
367 if (negative(bind(_socket, &sock, sizeof(sock)))) {
368 LOG(a_sprintf("error binding socket %d to ", _socket)
369 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
373 } else if ( (_type == BROADCAST) || (_type == UNICAST) ) {
374 // this is the last piece of preparation for a broadcast or unicast socket.
375 // there's no real connection, so we just need to get it bound and ready
378 sockaddr sock = _stack->convert(*_where);
381 LOG(a_sprintf("binding socket %d to ", _socket)
382 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
385 // now, the socket address is bound to our socket.
386 if (negative(bind(_socket, &sock, sizeof(sock)))) {
387 LOG(a_sprintf("error binding socket %d to ", _socket)
388 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
391 // that's it for broadcast preparation. we should be ready.
392 _was_connected = true;
396 // the following is for connected mode only.
398 sockaddr sock = _stack->convert(*_where);
400 // attempt the connection now.
402 //hmmm: error returns are done differently on bsd, right?
403 //hmmm: perhaps hide the base connect in a func that sets our internal
404 // error variable and then allows comparison to enums we provide.
// the deadline after which the retry loop below gives up.
406 time_stamp abort_time(communication_wait);
408 bool connected = false; // did we connect.
410 int sock_len = sizeof(sock);
412 while (time_stamp() < abort_time) {
413 // make the low-level socket connection.
414 int ret = ::connect(_socket, &sock, sock_len);
415 if (ret != SOCKET_ERROR) {
// success: the socket goes back to non-blocking mode for later i/o.
417 _socks->set_non_blocking(_socket, true);
421 basis::un_int last_error = critical_events::system_error();
423 // if we're already done, then make this look like a normal connect.
424 if (last_error == SOCK_EISCONN) {
// would-block and in-progress are the only errors that lead to another try.
429 if ( (last_error != SOCK_EWOULDBLOCK)
430 && (last_error != SOCK_EINPROGRESS) ) {
431 // this seems like a real error here.
433 LOG(a_sprintf("Connect failed (error %s or %d) on address:",
434 critical_events::system_error_text(last_error).s(), last_error)
435 + _where->text_form());
437 if (last_error == SOCK_ECONNREFUSED) return NO_ANSWER;
438 //hmmm: fix more of the possibilities to be sensible outcomes?
439 return ACCESS_DENIED;
442 if (time_stamp() >= abort_time) break; // skip before sleeping if T.O.
444 // snooze for a bit before trying again.
445 time_control::sleep_ms(10);
450 LOG(a_sprintf("socket %d connected to server.", _socket));
452 GRAB_LOCK; // short lock.
453 _was_connected = true;
// server-side entry point: accepts one pending client connection and hands
// it back through "sock" as a newly allocated spocket. "wait" selects between
// a blocking and a non-blocking accept.
//
// visible sequence:
//   1. reject bogus spockets, and non-CONNECTED types with BAD_INPUT; reset
//      "sock" to NULL_POINTER.
//   2. if no server socket exists yet, create it, mark it for address
//      re-use, bind it to *_where and listen with
//      PENDING_CONNECTIONS_ALLOWED as the backlog. bind or listen failure
//      closes the server socket and returns ACCESS_DENIED.
//   3. set the server socket's blocking mode, then call ::accept.
//   4. with no client: NO_CONNECTION for a would-block error, ACCESS_DENIED
//      otherwise.
//   5. with a client: make the new socket non-blocking, enable SO_KEEPALIVE
//      and wrap it in a new spocket whose remote address and socket are
//      filled in.
//
// NOTE(review): the caller presumably owns the spocket returned in "sock" --
// confirm. several conditions (including which branch "wait" selects), the
// declarations of new_sock and sock_hop and the final return are on lines
// not visible in this excerpt.
460 outcome spocket::accept(spocket * &sock, bool wait)
463 CHECK_BOGUS(NO_CONNECTION);
464 if (_type != CONNECTED) return BAD_INPUT;
466 // we don't lock in here; we should not be locking on the server socket.
468 sock = NULL_POINTER; // reset.
472 LOG("tried to accept on a client spocket.");
478 if (!_server_socket) {
479 _server_socket = int(::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP));
481 LOG(a_sprintf("srv sock is %d", _server_socket));
482 LOG(astring("creating server socket now for ") + _where->text_form());
485 if (_server_socket == basis::un_int(INVALID_SOCKET)) {
486 LOG("Failed to open the serving spocket.");
490 // mark the socket so we don't get bind errors on in-use conditions.
// a failure here is only logged; setup continues.
491 if (!_socks->set_reuse_address(_server_socket))
492 LOG("Failed to mark the socket for re-use.");
494 // create the spocket address for where we exist.
495 sockaddr sock = _stack->convert(*_where);
497 // now, the spocket address is bound to our spocket.
498 int sock_len = sizeof(sock);
499 if (bind(_server_socket, (sockaddr *)&sock, sock_len) < 0) {
500 LOG(astring("Error on bind of ") + critical_events::system_error_text(critical_events::system_error()));
501 _socks->close(_server_socket);
502 return ACCESS_DENIED;
505 // now listen for a connection on our spocket.
506 if (listen(_server_socket, PENDING_CONNECTIONS_ALLOWED) < 0) {
507 LOG(astring("Listen failed with error of ")
508 + critical_events::system_error_text(critical_events::system_error()));
509 _socks->close(_server_socket);
510 return ACCESS_DENIED;
514 // do the kind of accept they want; either block on it or don't.
515 // since our server socket is never used for sends or receives, we pretty
516 // much control it completely and this is safe.
518 _socks->set_non_blocking(_server_socket, true);
519 // mark our socket as non-blocking so we don't get stuck in accepts.
521 _socks->set_non_blocking(_server_socket, false);
522 // mark our socket as blocking; we will be paused until accept occurs.
525 // now try accepting a connection on the spocket.
527 socklen_t sock_len = sizeof(new_sock);
528 basis::un_int accepted = int(::accept(_server_socket, &new_sock, &sock_len));
// the error code is captured right after ::accept, before any other call is
// made.
529 int error = critical_events::system_error();
530 if (!accepted || (accepted == INVALID_SOCKET)) {
531 if (error == SOCK_EWOULDBLOCK) return NO_CONNECTION;
533 LOG(astring("Accept got no client, with an error of ")
534 + critical_events::system_error_text(error));
536 return ACCESS_DENIED;
539 // mark the new spocket for non-blocking I/O.
540 _socks->set_non_blocking(accepted, true);
542 //move to socks object!
544 if (setsockopt(accepted, SOL_SOCKET, SO_KEEPALIVE, (char *)&sock_hop,
545 sizeof(sock_hop)) < 0) {
547 LOG("couldn't set watchdog timer on socket.");
552 LOG(astring("accepted a client on our socket: ") + _where->text_form());
555 // NOTE: normally, our network code sets the spocket to be kept alive (using
556 // keep alives), but we are trying to have a minimal spocket usage and
557 // a minimal network load for this test scenario.
559 // create the spocket address that we will connect to.
// the new spocket is built from this server's own address (using the default
// type argument); the client's address goes into its _remote.
560 sock = new spocket(*_where);
561 *sock->_remote = _stack->convert(new_sock);
562 sock->_socket = accepted;
563 sock->_server_socket = 0; // reset to avoid whacking.
564 sock->_was_connected = true;
// convenience overload: forwards the array's data pointer and length to the
// raw-buffer send() and returns its outcome.
568 outcome spocket::send(const byte_array &to_send, int &len_sent)
570 return send(to_send.observe(), to_send.length(), len_sent);
// sends "size" bytes from "buffer" over a CONNECTED spocket using ::send.
// "len_sent" receives ::send's raw return value, so it can hold SOCKET_ERROR
// after a failure.
// visible return values: BAD_INPUT for non-CONNECTED types, NO_CONNECTION
// when ENSURE_HEALTH fails or connected() is false after an error, and
// ACCESS_DENIED for other send errors.
// NOTE(review): the conditions around several log messages, the handling of
// a partial send and the success return are on lines not visible here.
573 outcome spocket::send(const abyte *buffer, int size, int &len_sent)
577 if (_type != CONNECTED) return BAD_INPUT;
579 ENSURE_HEALTH(NO_CONNECTION);
581 len_sent = ::send(_socket, (char *)buffer, size, 0);
// the error code is captured right after ::send, before any other call is
// made.
582 int error_code = critical_events::system_error();
585 LOG("No data went out on the spocket.");
589 if (len_sent == SOCKET_ERROR) {
590 if (error_code == SOCK_EWOULDBLOCK) {
592 LOG("would block, will try later...");
594 LOG("HEY HEY! some was sent but we were not counting it!!!");
599 LOG(astring("Error ") + critical_events::system_error_text(error_code)
600 + " occurred during the send!");
602 if (!connected()) return NO_CONNECTION;
604 LOG(a_sprintf("forcing disconnect on socket %d.", _socket));
606 // we're trying this new approach here... we found that the socket doesn't
607 // really know that it got disconnected in some circumstances.
609 return ACCESS_DENIED;
611 if (len_sent != size) {
612 // only sent part of the buffer.
614 LOG(a_sprintf("sent %d bytes out of %d.", len_sent, size));
// convenience overload: forwards the array's data pointer and length to the
// raw-buffer send_to() and returns its outcome.
622 outcome spocket::send_to(const internet_address &where_to,
623 const byte_array &to_send, int &len_sent)
625 return send_to(where_to, to_send.observe(), to_send.length(), len_sent);
// sends "size" bytes from "to_send" as a datagram to "where_to" using sendto.
// only valid for non-CONNECTED types; a CONNECTED spocket gets BAD_INPUT.
// visible return values: BAD_INPUT, NONE_READY when the error is would-block
// and ACCESS_DENIED for other failures.
// NOTE(review): the conditions that test "ret", the assignment of
// "len_sent" and the success return are on lines not visible here.
628 outcome spocket::send_to(const internet_address &where_to, const abyte *to_send,
629 int size, int &len_sent)
633 if (_type == CONNECTED) return BAD_INPUT;
634 sockaddr dest = _stack->convert(where_to);
635 int ret = sendto(_socket, (char *)to_send, size, 0, &dest, sizeof(dest));
636 int error = critical_events::system_error();
638 if (error == SOCK_EWOULDBLOCK) return NONE_READY; // no buffer space?
639 LOG(astring("failed to send packet; error ")
640 + _stack->tcpip_error_name(error));
641 return ACCESS_DENIED;
644 LOG(astring("didn't send whole datagram!"));
// receives data from a CONNECTED spocket into "buffer" by way of the
// raw-buffer receive(). "size" must be positive on entry; when the outcome is
// OKAY the buffer is trimmed with zap(size, buffer.last()).
// returns NONE_READY for a bogus spocket and BAD_INPUT for a non-CONNECTED
// type or a non-positive size; otherwise the raw-buffer overload supplies the
// outcome.
// NOTE(review): any buffer sizing before the inner call, and the return
// statement, are on lines not visible here.
650 outcome spocket::receive(byte_array &buffer, int &size)
653 CHECK_BOGUS(NONE_READY);
654 if (_type != CONNECTED) return BAD_INPUT;
655 if (size <= 0) return BAD_INPUT;
657 outcome to_return = receive(buffer.access(), size);
658 // trim the buffer to the actual received size.
659 if (to_return == OKAY)
660 buffer.zap(size, buffer.last());
// receives bytes from a CONNECTED spocket into the raw "buffer" using recv.
// visible return values: NONE_READY (bogus spocket, or a would-block error),
// BAD_INPUT (non-CONNECTED type or non-positive expected length),
// NO_CONNECTION (ENSURE_HEALTH failed, select reported a disconnect, or
// connected() was false after an error) and ACCESS_DENIED (other errors).
// NOTE(review): the definition of "expected", the branch that tests for a
// zero-length read, the update of "size" and the success return are on lines
// not visible here.
664 outcome spocket::receive(abyte *buffer, int &size)
667 CHECK_BOGUS(NONE_READY);
668 if (_type != CONNECTED) return BAD_INPUT;
669 ENSURE_HEALTH(NO_CONNECTION);
672 if (expected <= 0) return BAD_INPUT;
674 int len = recv(_socket, (char *)buffer, expected, 0);
676 // check to make sure we're not disconnected.
677 int ret = _socks->select(_socket, raw_socket::SELECTING_JUST_READ);
678 if (ret & SI_DISCONNECTED) {
680 return NO_CONNECTION;
682 // seems like more normal absence of data.
684 } else if (len < 0) {
685 if (critical_events::system_error() == SOCK_EWOULDBLOCK) return NONE_READY;
687 LOG(astring("The receive failed with an error ")
688 + critical_events::system_error_text(critical_events::system_error()));
690 if (!connected()) return NO_CONNECTION;
691 return ACCESS_DENIED;
// receives a datagram on a non-CONNECTED spocket into "buffer" by way of the
// raw-buffer receive_from(). "where_from" is first reset to a
// default-constructed address, so it is overwritten even on the early error
// returns. "size" must be positive on entry; when the outcome is OKAY the
// buffer is trimmed with zap(size, buffer.last()).
// returns NONE_READY for a bogus spocket and BAD_INPUT for a CONNECTED type
// or a non-positive size; otherwise the raw-buffer overload supplies the
// outcome.
// NOTE(review): any buffer sizing before the inner call, and the return
// statement, are on lines not visible here.
697 outcome spocket::receive_from(byte_array &buffer, int &size,
698 internet_address &where_from)
700 FUNCDEF("receive_from");
701 where_from = internet_address();
702 CHECK_BOGUS(NONE_READY);
703 if (_type == CONNECTED) return BAD_INPUT;
704 if (size <= 0) return BAD_INPUT;
706 outcome to_return = receive_from(buffer.access(), size, where_from);
707 // trim the buffer to the actual received size.
708 if (to_return == OKAY)
709 buffer.zap(size, buffer.last());
713 outcome spocket::receive_from(abyte *buffer, int &size,
714 internet_address &where_from)
716 FUNCDEF("receive_from");
717 where_from = internet_address();
718 CHECK_BOGUS(NONE_READY);
719 if (_type == CONNECTED) return BAD_INPUT;
720 ENSURE_HEALTH(NO_CONNECTION);
723 if (expected <= 0) return BAD_INPUT;
726 socklen_t fromlen = sizeof(from);
727 int len = recvfrom(_socket, (char *)buffer, expected, 0, &from, &fromlen);
728 int err = critical_events::system_error();
729 if (!len) return NONE_READY;
732 LOG(a_sprintf("actual sys err value=%d", err));
734 if (err == SOCK_EWOULDBLOCK) return NONE_READY;
735 if (err == SOCK_ECONNRESET) return NONE_READY;
736 // this seems to be a necessary windoze kludge; we're not connected
737 // and never were but it says this idiotic garbage about the connection
740 LOG(astring("The recvfrom failed with an error ")
741 + critical_events::system_error_text(err));
743 if (!connected()) return NO_CONNECTION;
744 return ACCESS_DENIED;
746 where_from = _stack->convert(from);