1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2001-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "internet_address.h"
16 #include "raw_socket.h"
18 #include "tcpip_stack.h"
20 #include <basis/byte_array.h>
21 #include <basis/functions.h>
22 #include <basis/astring.h>
23 #include <basis/mutex.h>
24 #include <loggers/critical_events.h>
25 #include <loggers/program_wide_logger.h>
26 #include <structures/static_memory_gremlin.h>
27 #include <timely/time_control.h>
28 #include <timely/time_stamp.h>
31 #include <arpa/inet.h>
36 #include <sys/ioctl.h>
37 #include <sys/socket.h>
38 #include <sys/types.h>
43 using namespace basis;
44 using namespace loggers;
45 using namespace structures;
46 using namespace timely;
50 //#define DEBUG_SPOCKET
51 // uncomment for noisy version.
54 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
56 const int PENDING_CONNECTIONS_ALLOWED = 14;
57 // we allow this many connections to queue up before they get rejected.
58 // if the OS is windoze, this number is ignored if it's greater than the
59 // hardcoded maximum of like 5.
61 const int RESOLVE_INTERVAL = 300;
62 // we'll re-resolve the ip address at this rate. this mainly comes into
63 // play for the connect call, since the address passed in could have changed
64 // or been invalid to start with. we're not losing much by trying to
65 // resolve the address again during connection time.
67 #define RECOGNIZE_DISCO \
68 _client_bind = false; \
69 _was_connected = false
71 // ensure that the socket is in a good state.
72 #define ENSURE_HEALTH(retval) \
73 if (!was_connected()) return retval; /* never has been. */ \
74 if (!_socket) { RECOGNIZE_DISCO; return retval; /* not set. */ }
76 #define CHECK_BOGUS(retval) \
77 if (is_bogus()) { return retval; /* this spocket is junk. */ }
81 // win32 seems to trip over selects unless we protect them.
82 #define GRAB_LOCK auto_synchronizer l(*_select_lock)
83 // and in truth, the locking turns out to be needed on win32 if we're
84 // going to allow sharing a spocket across threads. this is one of the
85 // design goals so we're honor bound to support that.
91 SAFE_STATIC(mutex, __broken_pipe_synch, )
94 spocket::spocket(const internet_address &where, sock_types type)
98 _was_connected(false),
100 _where(new internet_address(where)),
101 _remote(new internet_address),
102 _socks(new raw_socket),
103 _stack(new tcpip_stack),
104 _select_lock(new mutex),
105 _last_resolve(new time_stamp), // don't force an immediate resolve.
107 _cli_bind(new internet_address)
109 FUNCDEF("constructor");
110 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
111 // casting types are never servers.
113 } else if ( (type == CONNECTED) || (type == BOGUS_SOCK) ) {
114 // nothing special here currently.
116 // this is an unknown type.
117 LOG(a_sprintf("unknown socket type %d; failing out.", _type));
118 //hmmm: without a validity flag of some sort, this doesn't mean much.
125 FUNCDEF("destructor");
127 LOG(a_sprintf("closing spocket: ") + text_form());
135 WHACK(_last_resolve);
137 _client_bind = false;
140 // where and remote don't need to be protected unless we revise the design of
141 // the class and allow a reset or re-open kind of method.
142 const internet_address &spocket::where() const { return *_where; }
143 const internet_address &spocket::remote() const { return *_remote; }
145 tcpip_stack &spocket::stack() const { return *_stack; }
147 // doesn't need to be protected since the sockets are being treated as simple
148 // ints and since _where currently does not get destroyed.
149 astring spocket::text_form()
151 FUNCDEF("text_form");
152 astring to_return = is_client()? "client" :
153 (is_root_server()? "root-server" : "server");
154 to_return += " spocket: ";
156 to_return += "connected, ";
158 if (was_connected()) to_return += "unconnected (was once), ";
159 else to_return += "never-connected, ";
161 to_return += a_sprintf("socket=%u, ", _socket);
162 if (is_root_server()) {
163 to_return += a_sprintf("root-socket=%u, ", _server_socket);
165 to_return += _where->text_form().s();
169 void spocket::bind_client(const internet_address &addr)
175 const char *spocket::outcome_name(const outcome &to_name)
177 switch (to_name.value()) {
178 case NOT_SERVER: return "NOT_SERVER";
179 default: return communication_commons::outcome_name(to_name);
183 outcome spocket::disconnect()
185 FUNCDEF("disconnect");
189 LOG(a_sprintf("closing socket %d", _socket));
191 _socks->close(_socket);
194 if (_server_socket) {
196 LOG(a_sprintf("closing server socket %d", _server_socket));
198 _socks->close(_server_socket);
204 bool spocket::connected()
206 FUNCDEF("connected");
207 ENSURE_HEALTH(false);
209 if (_type != CONNECTED) return was_connected();
211 if (!_socket) return false;
213 // do examination on spocket.
218 int ret = _socks->select(_socket, sel_mode);
220 return true; // we are happy.
222 if ( (ret & SI_DISCONNECTED) || (ret & SI_ERRONEOUS) ) {
228 LOG("caught exception thrown from select, returning false.");
233 outcome spocket::await_readable(int timeout)
235 FUNCDEF("await_readable");
236 CHECK_BOGUS(NO_CONNECTION);
237 ENSURE_HEALTH(NO_CONNECTION);
239 int mode = raw_socket::SELECTING_JUST_READ;
240 int ret = _socks->select(_socket, mode, timeout);
241 if (ret & SI_READABLE) return OKAY;
242 // we found something to report.
243 if (ret & SI_DISCONNECTED) {
245 return NO_CONNECTION;
247 return _socket? NONE_READY : NO_CONNECTION;
248 // nothing is ready currently.
251 outcome spocket::await_writable(int timeout)
253 FUNCDEF("await_writable");
254 CHECK_BOGUS(NO_CONNECTION);
255 ENSURE_HEALTH(NO_CONNECTION);
257 int mode = raw_socket::SELECTING_JUST_WRITE;
258 int ret = _socks->select(_socket, mode, timeout);
259 if (ret & SI_WRITABLE) return OKAY;
260 // we found something to report.
261 if (ret & SI_DISCONNECTED) {
263 return NO_CONNECTION;
265 return _socket? NONE_READY : NO_CONNECTION;
266 // nothing is ready currently.
269 outcome spocket::connect(int communication_wait)
272 CHECK_BOGUS(NO_CONNECTION);
274 GRAB_LOCK; // short lock.
275 if ( (was_connected() && !_client) || _server_socket) {
277 LOG("this object was already opened as a server!");
281 _client = true; // set our state now that we're sure this is okay.
282 _was_connected = false; // reset this, since we're connecting now.
286 // the socket was never created (or was cleaned up previously). this is
287 // where we create the socket so we can communicate.
289 LOG(astring("creating socket now for ") + _where->text_form());
292 int sock_type = SOCK_STREAM;
293 int proto = IPPROTO_TCP;
295 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
296 sock_type = SOCK_DGRAM;
299 _socket = int(::socket(AF_INET, sock_type, proto));
300 if ( (_socket == basis::un_int(INVALID_SOCKET)) || !_socket) {
302 LOG("Failed to open the client's connecting spocket.");
303 return ACCESS_DENIED;
306 // mark the spocket for _blocking_ I/O. we want connect to sit there
307 // until it's connected or returns with an error.
308 _socks->set_non_blocking(_socket, false);
310 if (_type == BROADCAST) {
311 if (!_socks->set_broadcast(_socket)) return ACCESS_DENIED;
312 // mark the socket for broadcast capability.
315 if (!_socks->set_reuse_address(_socket)) return ACCESS_DENIED;
316 // mark the socket so we don't get bind errors on in-use conditions.
319 if (_type == CONNECTED) {
321 // turn on the keepalive timer so that loss of the connection will
322 // eventually be detected by the OS. the duration that is allowed to
323 // elapse before a dead connection is noticed varies with the operating
324 // system and is not configured at this level.
325 if (!_socks->set_keep_alive(_socket)) {
327 LOG("couldn't set watchdog timer on socket.");
331 //hmmm: doesn't this need to be done for bcast too?
333 // create the spocket address that we will connect to.
334 if (strlen(_where->hostname)
335 // && (_where->is_nil_address()
336 // || (*_last_resolve < time_stamp(-RESOLVE_INTERVAL) ) ) ) {
338 //moving to always re-resolving before a connect. otherwise we have somewhat
339 //hard to predict behavior about when the re-resolve will happen.
341 // we know we need to resolve if the address is NULL_POINTER or if the re-resolve
342 // interval has elapsed.
344 byte_array ip_addr = _stack->full_resolve(_where->hostname, full_host);
345 if (ip_addr.length()) {
346 ip_addr.stuff(internet_address::ADDRESS_SIZE, _where->ip_address);
347 LOG(astring("successfully re-resolved address--") + _where->text_form());
349 *_last_resolve = time_stamp(); // reset since we just resolved.
352 // special code for forcing a client to bind.
354 sockaddr sock = _stack->convert(*_cli_bind);
357 LOG(a_sprintf("binding client socket %d to ", _socket)
358 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
361 // now, the socket address is bound to our socket.
362 if (negative(bind(_socket, &sock, sizeof(sock)))) {
363 LOG(a_sprintf("error binding socket %d to ", _socket)
364 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
368 } else if ( (_type == BROADCAST) || (_type == UNICAST) ) {
369 // this is the last piece of preparation for a broadcast or unicast socket.
370 // there's no real connection, so we just need to get it bound and ready
373 sockaddr sock = _stack->convert(*_where);
376 LOG(a_sprintf("binding socket %d to ", _socket)
377 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
380 // now, the socket address is bound to our socket.
381 if (negative(bind(_socket, &sock, sizeof(sock)))) {
382 LOG(a_sprintf("error binding socket %d to ", _socket)
383 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
386 // that's it for broadcast preparation. we should be ready.
387 _was_connected = true;
391 // the following is for connected mode only.
393 sockaddr sock = _stack->convert(*_where);
395 // attempt the connection now.
397 //hmmm: error returns are done differently on bsd, right?
398 //hmmm: perhaps hide the base connect in a func that sets our internal
399 // error variable and then allows comparison to enums we provide.
401 time_stamp abort_time(communication_wait);
403 bool connected = false; // did we connect.
405 int sock_len = sizeof(sock);
407 while (time_stamp() < abort_time) {
408 // make the low-level socket connection.
409 int ret = ::connect(_socket, &sock, sock_len);
410 if (ret != SOCKET_ERROR) {
412 _socks->set_non_blocking(_socket, true);
416 basis::un_int last_error = critical_events::system_error();
418 // if we're already done, then make this look like a normal connect.
419 if (last_error == SOCK_EISCONN) {
424 if ( (last_error != SOCK_EWOULDBLOCK)
425 && (last_error != SOCK_EINPROGRESS) ) {
426 // this seems like a real error here.
428 LOG(a_sprintf("Connect failed (error %s or %d) on address:",
429 critical_events::system_error_text(last_error).s(), last_error)
430 + _where->text_form());
432 if (last_error == SOCK_ECONNREFUSED) return NO_ANSWER;
433 //hmmm: fix more of the possibilities to be sensible outcomes?
434 return ACCESS_DENIED;
437 if (time_stamp() >= abort_time) break; // skip before sleeping if T.O.
439 // snooze for a bit before trying again.
440 time_control::sleep_ms(10);
445 LOG(a_sprintf("socket %d connected to server.", _socket));
447 GRAB_LOCK; // short lock.
448 _was_connected = true;
455 outcome spocket::accept(spocket * &sock, bool wait)
458 CHECK_BOGUS(NO_CONNECTION);
459 if (_type != CONNECTED) return BAD_INPUT;
461 // we don't lock in here; we should not be locking on the server socket.
463 sock = NULL_POINTER; // reset.
467 LOG("tried to accept on a client spocket.");
473 if (!_server_socket) {
474 _server_socket = int(::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP));
476 LOG(a_sprintf("srv sock is %d", _server_socket));
477 LOG(astring("creating server socket now for ") + _where->text_form());
480 if (_server_socket == basis::un_int(INVALID_SOCKET)) {
481 LOG("Failed to open the serving spocket.");
485 // mark the socket so we don't get bind errors on in-use conditions.
486 if (!_socks->set_reuse_address(_server_socket))
487 LOG("Failed to mark the socket for re-use.");
489 // create the spocket address for where we exist.
490 sockaddr sock = _stack->convert(*_where);
492 // now, the spocket address is bound to our spocket.
493 int sock_len = sizeof(sock);
494 if (bind(_server_socket, (sockaddr *)&sock, sock_len) < 0) {
495 LOG(astring("Error on bind of ") + critical_events::system_error_text(critical_events::system_error()));
496 _socks->close(_server_socket);
497 return ACCESS_DENIED;
500 // now listen for a connection on our spocket.
501 if (listen(_server_socket, PENDING_CONNECTIONS_ALLOWED) < 0) {
502 LOG(astring("Listen failed with error of ")
503 + critical_events::system_error_text(critical_events::system_error()));
504 _socks->close(_server_socket);
505 return ACCESS_DENIED;
509 // do the kind of accept they want; either block on it or don't.
510 // since our server socket is never used for sends or receives, we pretty
511 // much control it completely and this is safe.
513 _socks->set_non_blocking(_server_socket, true);
514 // mark our socket as non-blocking so we don't get stuck in accepts.
516 _socks->set_non_blocking(_server_socket, false);
517 // mark our socket as blocking; we will be paused until accept occurs.
520 // now try accepting a connection on the spocket.
522 socklen_t sock_len = sizeof(new_sock);
523 basis::un_int accepted = int(::accept(_server_socket, &new_sock, &sock_len));
524 int error = critical_events::system_error();
525 if (!accepted || (accepted == INVALID_SOCKET)) {
526 if (error == SOCK_EWOULDBLOCK) return NO_CONNECTION;
528 LOG(astring("Accept got no client, with an error of ")
529 + critical_events::system_error_text(error));
531 return ACCESS_DENIED;
534 // mark the new spocket for non-blocking I/O.
535 _socks->set_non_blocking(accepted, true);
537 //move to socks object!
539 if (setsockopt(accepted, SOL_SOCKET, SO_KEEPALIVE, (char *)&sock_hop,
540 sizeof(sock_hop)) < 0) {
542 LOG("couldn't set watchdog timer on socket.");
547 LOG(astring("accepted a client on our socket: ") + _where->text_form());
550 // NOTE: normally, our network code sets the spocket to be kept alive (using
551 // keep alives), but we are trying to have a minimal spocket usage and
552 // a minimal network load for this test scenario.
554 // create the spocket address that we will connect to.
555 sock = new spocket(*_where);
556 *sock->_remote = _stack->convert(new_sock);
557 sock->_socket = accepted;
558 sock->_server_socket = 0; // reset to avoid whacking.
559 sock->_was_connected = true;
563 outcome spocket::send(const byte_array &to_send, int &len_sent)
565 return send(to_send.observe(), to_send.length(), len_sent);
568 outcome spocket::send(const abyte *buffer, int size, int &len_sent)
572 if (_type != CONNECTED) return BAD_INPUT;
574 ENSURE_HEALTH(NO_CONNECTION);
576 len_sent = ::send(_socket, (char *)buffer, size, 0);
577 int error_code = critical_events::system_error();
580 LOG("No data went out on the spocket.");
584 if (len_sent == SOCKET_ERROR) {
585 if (error_code == SOCK_EWOULDBLOCK) {
587 LOG("would block, will try later...");
589 LOG("HEY HEY! some was sent but we were not counting it!!!");
594 LOG(astring("Error ") + critical_events::system_error_text(error_code)
595 + " occurred during the send!");
597 if (!connected()) return NO_CONNECTION;
599 LOG(a_sprintf("forcing disconnect on socket %d.", _socket));
601 // we're trying this new approach here... we found that the socket doesn't
602 // really know that it got disconnected in some circumstances.
604 return ACCESS_DENIED;
606 if (len_sent != size) {
607 // only sent part of the buffer.
609 LOG(a_sprintf("sent %d bytes out of %d.", len_sent, size));
617 outcome spocket::send_to(const internet_address &where_to,
618 const byte_array &to_send, int &len_sent)
620 return send_to(where_to, to_send.observe(), to_send.length(), len_sent);
623 outcome spocket::send_to(const internet_address &where_to, const abyte *to_send,
624 int size, int &len_sent)
628 if (_type == CONNECTED) return BAD_INPUT;
629 sockaddr dest = _stack->convert(where_to);
630 int ret = sendto(_socket, (char *)to_send, size, 0, &dest, sizeof(dest));
631 int error = critical_events::system_error();
633 if (error == SOCK_EWOULDBLOCK) return NONE_READY; // no buffer space?
634 LOG(astring("failed to send packet; error ")
635 + _stack->tcpip_error_name(error));
636 return ACCESS_DENIED;
639 LOG(astring("didn't send whole datagram!"));
645 outcome spocket::receive(byte_array &buffer, int &size)
648 CHECK_BOGUS(NONE_READY);
649 if (_type != CONNECTED) return BAD_INPUT;
650 if (size <= 0) return BAD_INPUT;
652 outcome to_return = receive(buffer.access(), size);
653 // trim the buffer to the actual received size.
654 if (to_return == OKAY)
655 buffer.zap(size, buffer.last());
659 outcome spocket::receive(abyte *buffer, int &size)
662 CHECK_BOGUS(NONE_READY);
663 if (_type != CONNECTED) return BAD_INPUT;
664 ENSURE_HEALTH(NO_CONNECTION);
667 if (expected <= 0) return BAD_INPUT;
669 int len = recv(_socket, (char *)buffer, expected, 0);
671 // check to make sure we're not disconnected.
672 int ret = _socks->select(_socket, raw_socket::SELECTING_JUST_READ);
673 if (ret & SI_DISCONNECTED) {
675 return NO_CONNECTION;
677 // seems like more normal absence of data.
679 } else if (len < 0) {
680 if (critical_events::system_error() == SOCK_EWOULDBLOCK) return NONE_READY;
682 LOG(astring("The receive failed with an error ")
683 + critical_events::system_error_text(critical_events::system_error()));
685 if (!connected()) return NO_CONNECTION;
686 return ACCESS_DENIED;
692 outcome spocket::receive_from(byte_array &buffer, int &size,
693 internet_address &where_from)
695 FUNCDEF("receive_from");
696 where_from = internet_address();
697 CHECK_BOGUS(NONE_READY);
698 if (_type == CONNECTED) return BAD_INPUT;
699 if (size <= 0) return BAD_INPUT;
701 outcome to_return = receive_from(buffer.access(), size, where_from);
702 // trim the buffer to the actual received size.
703 if (to_return == OKAY)
704 buffer.zap(size, buffer.last());
708 outcome spocket::receive_from(abyte *buffer, int &size,
709 internet_address &where_from)
711 FUNCDEF("receive_from");
712 where_from = internet_address();
713 CHECK_BOGUS(NONE_READY);
714 if (_type == CONNECTED) return BAD_INPUT;
715 ENSURE_HEALTH(NO_CONNECTION);
718 if (expected <= 0) return BAD_INPUT;
721 socklen_t fromlen = sizeof(from);
722 int len = recvfrom(_socket, (char *)buffer, expected, 0, &from, &fromlen);
723 int err = critical_events::system_error();
724 if (!len) return NONE_READY;
727 LOG(a_sprintf("actual sys err value=%d", err));
729 if (err == SOCK_EWOULDBLOCK) return NONE_READY;
730 if (err == SOCK_ECONNRESET) return NONE_READY;
731 // this seems to be a necessary windoze kludge; we're not connected
732 // and never were but it says this idiotic garbage about the connection
735 LOG(astring("The recvfrom failed with an error ")
736 + critical_events::system_error_text(err));
738 if (!connected()) return NO_CONNECTION;
739 return ACCESS_DENIED;
741 where_from = _stack->convert(from);