1 /*****************************************************************************\
4 * Author : Chris Koeritz *
6 *******************************************************************************
7 * Copyright (c) 2001-$now By Author. This program is free software; you can *
8 * redistribute it and/or modify it under the terms of the GNU General Public *
9 * License as published by the Free Software Foundation; either version 2 of *
10 * the License or (at your option) any later version. This is online at: *
11 * http://www.fsf.org/copyleft/gpl.html *
12 * Please send any updates to: fred@gruntose.com *
13 \*****************************************************************************/
15 #include "internet_address.h"
16 #include "raw_socket.h"
18 #include "tcpip_stack.h"
20 #include <basis/byte_array.h>
21 #include <basis/functions.h>
22 #include <basis/astring.h>
23 #include <basis/mutex.h>
24 #include <loggers/critical_events.h>
25 #include <loggers/program_wide_logger.h>
26 #include <structures/static_memory_gremlin.h>
27 #include <timely/time_control.h>
28 #include <timely/time_stamp.h>
31 #include <arpa/inet.h>
36 #include <sys/ioctl.h>
37 #include <sys/socket.h>
38 #include <sys/types.h>
43 using namespace basis;
44 using namespace loggers;
45 using namespace structures;
46 using namespace timely;
51 // uncomment for noisy version.
54 #define LOG(to_print) CLASS_EMERGENCY_LOG(program_wide_logger::get(), to_print)
56 const int PENDING_CONNECTIONS_ALLOWED = 14;
57 // we allow this many connections to queue up before they get rejected.
58 // if the OS is windoze, this number is ignored if it's greater than the
59 // hardcoded maximum of like 5.
61 const int RESOLVE_INTERVAL = 300;
62 // we'll re-resolve the ip address at this rate. this mainly comes into
63 // play for the connect call, since the address passed in could have changed
64 // or been invalid to start with. we're not losing much by trying to
65 // resolve the address again during connection time.
67 #define RECOGNIZE_DISCO \
68 _client_bind = false; \
69 _was_connected = false
71 // ensure that the socket is in a good state.
72 #define ENSURE_HEALTH(retval) \
73 if (!was_connected()) return retval; /* never has been. */ \
74 if (!_socket) { RECOGNIZE_DISCO; return retval; /* not set. */ }
76 #define CHECK_BOGUS(retval) \
77 if (is_bogus()) { return retval; /* this spocket is junk. */ }
81 // win32 seems to trip over selects unless we protect them.
82 #define GRAB_LOCK auto_synchronizer l(*_select_lock)
83 // and in truth, the locking turns out to be needed on win32 if we're
84 // going to allow sharing a spocket across threads. this is one of the
85 // design goals so we're honor bound to support that.
91 SAFE_STATIC(mutex, __broken_pipe_synch, )
94 spocket::spocket(const internet_address &where, sock_types type)
98 _was_connected(false),
100 _where(new internet_address(where)),
101 _remote(new internet_address),
102 _socks(new raw_socket),
103 _stack(new tcpip_stack),
104 _select_lock(new mutex),
105 _last_resolve(new time_stamp), // don't force an immediate resolve.
107 _cli_bind(new internet_address)
109 FUNCDEF("constructor");
110 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
111 // casting types are never servers.
113 } else if ( (type == CONNECTED) || (type == BOGUS_SOCK) ) {
114 // nothing special here currently.
116 // this is an unknown type.
117 LOG(a_sprintf("unknown socket type %d; failing out.", _type));
118 //hmmm: without a validity flag of some sort, this doesn't mean much.
125 FUNCDEF("destructor");
127 LOG(a_sprintf("closing spocket: ") + text_form());
135 WHACK(_last_resolve);
137 _client_bind = false;
140 // where and remote don't need to be protected unless we revise the design of
141 // the class and allow a reset or re-open kind of method.
142 const internet_address &spocket::where() const { return *_where; }
143 const internet_address &spocket::remote() const { return *_remote; }
145 tcpip_stack &spocket::stack() const { return *_stack; }
147 // doesn't need to be protected since the sockets are being treated as simple
148 // ints and since _where currently does not get destroyed.
149 astring spocket::text_form()
151 FUNCDEF("text_form");
152 LOG("INTO TEXT_FORM A");
153 astring to_return = is_client()? "client" :
154 (is_root_server()? "root-server" : "server");
155 LOG("INTO TEXT_FORM B");
156 to_return += " spocket: ";
157 LOG("INTO TEXT_FORM C");
159 LOG("INTO TEXT_FORM C.1");
160 to_return += "connected, ";
162 LOG("INTO TEXT_FORM C.2");
163 if (was_connected()) to_return += "unconnected (was once), ";
164 else to_return += "never-connected, ";
165 LOG("INTO TEXT_FORM C.3");
167 LOG("INTO TEXT_FORM D");
168 to_return += a_sprintf("socket=%u, ", _socket);
169 LOG("INTO TEXT_FORM E");
170 if (is_root_server()) {
171 LOG("INTO TEXT_FORM F");
172 to_return += a_sprintf("root-socket=%u, ", _server_socket);
173 LOG("INTO TEXT_FORM G");
175 LOG("INTO TEXT_FORM H");
176 to_return += _where->text_form().s();
177 LOG("INTO TEXT_FORM X");
181 void spocket::bind_client(const internet_address &addr)
187 const char *spocket::outcome_name(const outcome &to_name)
189 switch (to_name.value()) {
190 case NOT_SERVER: return "NOT_SERVER";
191 default: return communication_commons::outcome_name(to_name);
195 outcome spocket::disconnect()
197 FUNCDEF("disconnect");
201 LOG(a_sprintf("closing socket %d", _socket));
203 _socks->close(_socket);
206 if (_server_socket) {
208 LOG(a_sprintf("closing server socket %d", _server_socket));
210 _socks->close(_server_socket);
216 bool spocket::connected()
218 FUNCDEF("connected");
219 ENSURE_HEALTH(false);
221 if (_type != CONNECTED) return was_connected();
223 if (!_socket) return false;
225 LOG("conn check, _sock not null");
227 // do examination on spocket.
230 LOG(a_sprintf("lock was grabbed, socks is %x", _socks));
233 int ret = _socks->select(_socket, sel_mode);
238 return true; // we are happy.
240 if ( (ret & SI_DISCONNECTED) || (ret & SI_ERRONEOUS) ) {
246 LOG("caught exception thrown from select, returning false.");
251 outcome spocket::await_readable(int timeout)
253 FUNCDEF("await_readable");
254 CHECK_BOGUS(NO_CONNECTION);
255 ENSURE_HEALTH(NO_CONNECTION);
257 int mode = raw_socket::SELECTING_JUST_READ;
258 int ret = _socks->select(_socket, mode, timeout);
259 if (ret & SI_READABLE) return OKAY;
260 // we found something to report.
261 if (ret & SI_DISCONNECTED) {
263 return NO_CONNECTION;
265 return _socket? NONE_READY : NO_CONNECTION;
266 // nothing is ready currently.
269 outcome spocket::await_writable(int timeout)
271 FUNCDEF("await_writable");
272 CHECK_BOGUS(NO_CONNECTION);
273 ENSURE_HEALTH(NO_CONNECTION);
275 int mode = raw_socket::SELECTING_JUST_WRITE;
276 int ret = _socks->select(_socket, mode, timeout);
277 if (ret & SI_WRITABLE) return OKAY;
278 // we found something to report.
279 if (ret & SI_DISCONNECTED) {
281 return NO_CONNECTION;
283 return _socket? NONE_READY : NO_CONNECTION;
284 // nothing is ready currently.
287 outcome spocket::connect(int communication_wait)
290 CHECK_BOGUS(NO_CONNECTION);
292 GRAB_LOCK; // short lock.
293 if ( (was_connected() && !_client) || _server_socket) {
295 LOG("this object was already opened as a server!");
299 _client = true; // set our state now that we're sure this is okay.
300 _was_connected = false; // reset this, since we're connecting now.
304 // the socket was never created (or was cleaned up previously). this is
305 // where we create the socket so we can communicate.
307 LOG(astring("creating socket now for ") + _where->text_form());
310 int sock_type = SOCK_STREAM;
311 int proto = IPPROTO_TCP;
313 if ( (_type == BROADCAST) || (_type == UNICAST) ) {
314 sock_type = SOCK_DGRAM;
317 _socket = int(::socket(AF_INET, sock_type, proto));
318 if ( (_socket == basis::un_int(INVALID_SOCKET)) || !_socket) {
320 LOG("Failed to open the client's connecting spocket.");
321 return ACCESS_DENIED;
323 LOG(a_sprintf("hola, socket value received is: %d", _socket));
325 // mark the spocket for _blocking_ I/O. we want connect to sit there
326 // until it's connected or returns with an error.
327 _socks->set_non_blocking(_socket, false);
329 LOG(a_sprintf("set nonblock false on : %d", _socket));
331 if (_type == BROADCAST) {
332 if (!_socks->set_broadcast(_socket)) return ACCESS_DENIED;
333 // mark the socket for broadcast capability.
336 if (!_socks->set_reuse_address(_socket)) return ACCESS_DENIED;
337 // mark the socket so we don't get bind errors on in-use conditions.
338 LOG(a_sprintf("set reuse addr : %d", _socket));
341 if (_type == CONNECTED) {
343 // turn on the keepalive timer so that loss of the connection will
344 // eventually be detected by the OS. the duration that is allowed to
345 // elapse before a dead connection is noticed varies with the operating
346 // system and is not configured at this level.
347 if (!_socks->set_keep_alive(_socket)) {
349 LOG("couldn't set watchdog timer on socket.");
353 //hmmm: doesn't this need to be done for bcast too?
355 // create the spocket address that we will connect to.
356 if (strlen(_where->hostname)
357 // && (_where->is_nil_address()
358 // || (*_last_resolve < time_stamp(-RESOLVE_INTERVAL) ) ) ) {
360 //moving to always re-resolving before a connect. otherwise we have somewhat
361 //hard to predict behavior about when the re-resolve will happen.
363 // we know we need to resolve if the address is NIL or if the re-resolve
364 // interval has elapsed.
366 byte_array ip_addr = _stack->full_resolve(_where->hostname, full_host);
367 if (ip_addr.length()) {
368 ip_addr.stuff(internet_address::ADDRESS_SIZE, _where->ip_address);
369 LOG(astring("successfully re-resolved address--") + _where->text_form());
371 *_last_resolve = time_stamp(); // reset since we just resolved.
374 // special code for forcing a client to bind.
376 sockaddr sock = _stack->convert(*_cli_bind);
379 LOG(a_sprintf("binding client socket %d to ", _socket)
380 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
383 // now, the socket address is bound to our socket.
384 if (negative(bind(_socket, &sock, sizeof(sock)))) {
385 LOG(a_sprintf("error binding socket %d to ", _socket)
386 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
390 } else if ( (_type == BROADCAST) || (_type == UNICAST) ) {
391 // this is the last piece of preparation for a broadcast or unicast socket.
392 // there's no real connection, so we just need to get it bound and ready
395 sockaddr sock = _stack->convert(*_where);
398 LOG(a_sprintf("binding socket %d to ", _socket)
399 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
402 // now, the socket address is bound to our socket.
403 if (negative(bind(_socket, &sock, sizeof(sock)))) {
404 LOG(a_sprintf("error binding socket %d to ", _socket)
405 + inet_ntoa(((sockaddr_in *)&sock)->sin_addr));
408 // that's it for broadcast preparation. we should be ready.
409 _was_connected = true;
413 // the following is for connected mode only.
415 sockaddr sock = _stack->convert(*_where);
417 // attempt the connection now.
419 //hmmm: error returns are done differently on bsd, right?
420 //hmmm: perhaps hide the base connect in a func that sets our internal
421 // error variable and then allows comparison to enums we provide.
423 time_stamp abort_time(communication_wait);
425 bool connected = false; // did we connect.
427 int sock_len = sizeof(sock);
429 while (time_stamp() < abort_time) {
430 // make the low-level socket connection.
431 int ret = ::connect(_socket, &sock, sock_len);
432 if (ret != SOCKET_ERROR) {
434 _socks->set_non_blocking(_socket, true);
438 basis::un_int last_error = critical_events::system_error();
440 // if we're already done, then make this look like a normal connect.
441 if (last_error == SOCK_EISCONN) {
446 if ( (last_error != SOCK_EWOULDBLOCK)
447 && (last_error != SOCK_EINPROGRESS) ) {
448 // this seems like a real error here.
450 LOG(a_sprintf("Connect failed (error %s or %d) on address:",
451 critical_events::system_error_text(last_error).s(), last_error)
452 + _where->text_form());
454 if (last_error == SOCK_ECONNREFUSED) return NO_ANSWER;
455 //hmmm: fix more of the possibilities to be sensible outcomes?
456 return ACCESS_DENIED;
459 if (time_stamp() >= abort_time) break; // skip before sleeping if T.O.
461 // snooze for a bit before trying again.
462 time_control::sleep_ms(10);
467 LOG(a_sprintf("socket %d connected to server.", _socket));
469 GRAB_LOCK; // short lock.
470 _was_connected = true;
477 outcome spocket::accept(spocket * &sock, bool wait)
480 CHECK_BOGUS(NO_CONNECTION);
481 if (_type != CONNECTED) return BAD_INPUT;
483 // we don't lock in here; we should not be locking on the server socket.
485 sock = NIL; // reset.
489 LOG("tried to accept on a client spocket.");
495 if (!_server_socket) {
496 _server_socket = int(::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP));
498 LOG(a_sprintf("srv sock is %d", _server_socket));
499 LOG(astring("creating server socket now for ") + _where->text_form());
502 if (_server_socket == basis::un_int(INVALID_SOCKET)) {
503 LOG("Failed to open the serving spocket.");
507 // mark the socket so we don't get bind errors on in-use conditions.
508 if (!_socks->set_reuse_address(_server_socket))
509 LOG("Failed to mark the socket for re-use.");
511 // create the spocket address for where we exist.
512 sockaddr sock = _stack->convert(*_where);
514 // now, the spocket address is bound to our spocket.
515 int sock_len = sizeof(sock);
516 if (bind(_server_socket, (sockaddr *)&sock, sock_len) < 0) {
517 LOG(astring("Error on bind of ") + critical_events::system_error_text(critical_events::system_error()));
518 _socks->close(_server_socket);
519 return ACCESS_DENIED;
522 // now listen for a connection on our spocket.
523 if (listen(_server_socket, PENDING_CONNECTIONS_ALLOWED) < 0) {
524 LOG(astring("Listen failed with error of ")
525 + critical_events::system_error_text(critical_events::system_error()));
526 _socks->close(_server_socket);
527 return ACCESS_DENIED;
531 // do the kind of accept they want; either block on it or don't.
532 // since our server socket is never used for sends or receives, we pretty
533 // much control it completely and this is safe.
535 _socks->set_non_blocking(_server_socket, true);
536 // mark our socket as non-blocking so we don't get stuck in accepts.
538 _socks->set_non_blocking(_server_socket, false);
539 // mark our socket as blocking; we will be paused until accept occurs.
542 // now try accepting a connection on the spocket.
544 socklen_t sock_len = sizeof(new_sock);
545 basis::un_int accepted = int(::accept(_server_socket, &new_sock, &sock_len));
546 int error = critical_events::system_error();
547 if (!accepted || (accepted == INVALID_SOCKET)) {
548 if (error == SOCK_EWOULDBLOCK) return NO_CONNECTION;
550 LOG(astring("Accept got no client, with an error of ")
551 + critical_events::system_error_text(error));
553 return ACCESS_DENIED;
556 LOG(a_sprintf("accepted socket number is %d", accepted));
558 // mark the new spocket for non-blocking I/O.
559 _socks->set_non_blocking(accepted, true);
561 LOG("after set nonblockheading");
563 //move to socks object!
565 if (setsockopt(accepted, SOL_SOCKET, SO_KEEPALIVE, (char *)&sock_hop,
566 sizeof(sock_hop)) < 0) {
568 LOG("couldn't set watchdog timer on socket.");
573 LOG(astring("accepted a client on our socket: ") + _where->text_form());
576 // NOTE: normally, our network code sets the spocket to be kept alive (using
577 // keep alives), but we are trying to have a minimal spocket usage and
578 // a minimal network load for this test scenario.
580 // create the spocket address that we will connect to.
581 sock = new spocket(*_where);
582 LOG("preview new spock A:"); LOG(sock->text_form());
583 *sock->_remote = _stack->convert(new_sock);
584 LOG("preview new spock B:"); LOG(sock->text_form());
585 sock->_socket = accepted;
586 LOG("preview new spock C:"); LOG(sock->text_form());
587 sock->_server_socket = 0; // reset to avoid whacking.
588 LOG("preview new spock D:"); LOG(sock->text_form());
589 sock->_was_connected = true;
590 LOG("about to return this spocket as new spock:");
591 LOG(sock->text_form());
595 outcome spocket::send(const byte_array &to_send, int &len_sent)
597 return send(to_send.observe(), to_send.length(), len_sent);
600 outcome spocket::send(const abyte *buffer, int size, int &len_sent)
604 if (_type != CONNECTED) return BAD_INPUT;
606 ENSURE_HEALTH(NO_CONNECTION);
608 len_sent = ::send(_socket, (char *)buffer, size, 0);
609 int error_code = critical_events::system_error();
612 LOG("No data went out on the spocket.");
616 if (len_sent == SOCKET_ERROR) {
617 if (error_code == SOCK_EWOULDBLOCK) {
619 LOG("would block, will try later...");
621 LOG("HEY HEY! some was sent but we were not counting it!!!");
626 LOG(astring("Error ") + critical_events::system_error_text(error_code)
627 + " occurred during the send!");
629 if (!connected()) return NO_CONNECTION;
631 LOG(a_sprintf("forcing disconnect on socket %d.", _socket));
633 // we're trying this new approach here... we found that the socket doesn't
634 // really know that it got disconnected in some circumstances.
636 return ACCESS_DENIED;
638 if (len_sent != size) {
639 // only sent part of the buffer.
641 LOG(a_sprintf("sent %d bytes out of %d.", len_sent, size));
649 outcome spocket::send_to(const internet_address &where_to,
650 const byte_array &to_send, int &len_sent)
652 return send_to(where_to, to_send.observe(), to_send.length(), len_sent);
655 outcome spocket::send_to(const internet_address &where_to, const abyte *to_send,
656 int size, int &len_sent)
660 if (_type == CONNECTED) return BAD_INPUT;
661 sockaddr dest = _stack->convert(where_to);
662 int ret = sendto(_socket, (char *)to_send, size, 0, &dest, sizeof(dest));
663 int error = critical_events::system_error();
665 if (error == SOCK_EWOULDBLOCK) return NONE_READY; // no buffer space?
666 LOG(astring("failed to send packet; error ")
667 + _stack->tcpip_error_name(error));
668 return ACCESS_DENIED;
671 LOG(astring("didn't send whole datagram!"));
677 outcome spocket::receive(byte_array &buffer, int &size)
680 CHECK_BOGUS(NONE_READY);
681 if (_type != CONNECTED) return BAD_INPUT;
682 if (size <= 0) return BAD_INPUT;
684 outcome to_return = receive(buffer.access(), size);
685 // trim the buffer to the actual received size.
686 if (to_return == OKAY)
687 buffer.zap(size, buffer.last());
691 outcome spocket::receive(abyte *buffer, int &size)
695 CHECK_BOGUS(NONE_READY);
697 if (_type != CONNECTED) return BAD_INPUT;
698 ENSURE_HEALTH(NO_CONNECTION);
702 if (expected <= 0) return BAD_INPUT;
706 int len = recv(_socket, (char *)buffer, expected, 0);
710 // check to make sure we're not disconnected.
711 int ret = _socks->select(_socket, raw_socket::SELECTING_JUST_READ);
713 if (ret & SI_DISCONNECTED) {
715 return NO_CONNECTION;
718 // seems like more normal absence of data.
720 } else if (len < 0) {
722 if (critical_events::system_error() == SOCK_EWOULDBLOCK) return NONE_READY;
724 LOG(astring("The receive failed with an error ")
725 + critical_events::system_error_text(critical_events::system_error()));
728 if (!connected()) return NO_CONNECTION;
730 return ACCESS_DENIED;
737 outcome spocket::receive_from(byte_array &buffer, int &size,
738 internet_address &where_from)
740 FUNCDEF("receive_from");
741 where_from = internet_address();
742 CHECK_BOGUS(NONE_READY);
743 if (_type == CONNECTED) return BAD_INPUT;
744 if (size <= 0) return BAD_INPUT;
746 outcome to_return = receive_from(buffer.access(), size, where_from);
747 // trim the buffer to the actual received size.
748 if (to_return == OKAY)
749 buffer.zap(size, buffer.last());
753 outcome spocket::receive_from(abyte *buffer, int &size,
754 internet_address &where_from)
756 FUNCDEF("receive_from");
757 where_from = internet_address();
758 CHECK_BOGUS(NONE_READY);
759 if (_type == CONNECTED) return BAD_INPUT;
760 ENSURE_HEALTH(NO_CONNECTION);
763 if (expected <= 0) return BAD_INPUT;
766 socklen_t fromlen = sizeof(from);
767 int len = recvfrom(_socket, (char *)buffer, expected, 0, &from, &fromlen);
768 int err = critical_events::system_error();
769 if (!len) return NONE_READY;
772 LOG(a_sprintf("actual sys err value=%d", err));
774 if (err == SOCK_EWOULDBLOCK) return NONE_READY;
775 if (err == SOCK_ECONNRESET) return NONE_READY;
776 // this seems to be a necessary windoze kludge; we're not connected
777 // and never were but it says this idiotic garbage about the connection
780 LOG(astring("The recvfrom failed with an error ")
781 + critical_events::system_error_text(err));
783 if (!connected()) return NO_CONNECTION;
784 return ACCESS_DENIED;
786 where_from = _stack->convert(from);