1 #include <c10d/Utils.hpp> 7 #include <netinet/in.h> 8 #include <netinet/tcp.h> 24 constexpr
int LISTEN_QUEUE_SIZE = 64;
26 void setSocketNoDelay(
int socket) {
28 socklen_t optlen =
sizeof(flag);
29 SYSCHECK_ERR_RETURN_NEG1(setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, (
char*)&flag, optlen));
32 PortType getSocketPort(
int fd) {
34 struct ::sockaddr_storage addrStorage;
35 socklen_t addrLen =
sizeof(addrStorage);
36 SYSCHECK_ERR_RETURN_NEG1(getsockname(
37 fd, reinterpret_cast<struct ::sockaddr*>(&addrStorage), &addrLen));
39 if (addrStorage.ss_family == AF_INET) {
40 struct ::sockaddr_in* addr =
41 reinterpret_cast<struct ::sockaddr_in*
>(&addrStorage);
42 listenPort = ntohs(addr->sin_port);
44 }
else if (addrStorage.ss_family == AF_INET6) {
45 struct ::sockaddr_in6* addr =
46 reinterpret_cast<struct ::sockaddr_in6*
>(&addrStorage);
47 listenPort = ntohs(addr->sin6_port);
50 throw std::runtime_error(
"unsupported protocol");
57 std::string sockaddrToString(struct ::sockaddr* addr) {
58 char address[INET6_ADDRSTRLEN + 1];
59 if (addr->sa_family == AF_INET) {
60 struct ::sockaddr_in* s =
reinterpret_cast<struct ::sockaddr_in*
>(addr);
61 SYSCHECK(::inet_ntop(AF_INET, &(s->sin_addr), address, INET_ADDRSTRLEN), __output !=
nullptr)
62 address[INET_ADDRSTRLEN] = '\0';
63 } else if (addr->sa_family == AF_INET6) {
64 struct ::sockaddr_in6* s =
reinterpret_cast<struct ::sockaddr_in6*
>(addr);
65 SYSCHECK(::inet_ntop(AF_INET6, &(s->sin6_addr), address, INET6_ADDRSTRLEN), __output !=
nullptr)
66 address[INET6_ADDRSTRLEN] = '\0';
68 throw std::runtime_error(
"unsupported protocol");
74 std::pair<int, PortType> listen(PortType port) {
75 struct ::addrinfo hints, *res = NULL;
76 std::memset(&hints, 0x00,
sizeof(hints));
77 hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
78 hints.ai_family = AF_UNSPEC;
79 hints.ai_socktype = SOCK_STREAM;
84 int err = ::getaddrinfo(
nullptr, std::to_string(port).data(), &hints, &res);
85 if (err != 0 || !res) {
86 throw std::invalid_argument(
87 "cannot find host to listen on: " + std::string(gai_strerror(err)));
90 std::shared_ptr<struct ::addrinfo> addresses(
91 res, [](struct ::addrinfo* p) { ::freeaddrinfo(p); });
93 struct ::addrinfo* nextAddr = addresses.get();
97 SYSCHECK_ERR_RETURN_NEG1(
100 nextAddr->ai_socktype,
101 nextAddr->ai_protocol))
104 SYSCHECK_ERR_RETURN_NEG1(
105 ::setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(
int)))
107 SYSCHECK_ERR_RETURN_NEG1(::bind(socket, nextAddr->ai_addr, nextAddr->ai_addrlen))
108 SYSCHECK_ERR_RETURN_NEG1(::listen(socket, LISTEN_QUEUE_SIZE))
111 } catch (const
std::system_error& e) {
113 nextAddr = nextAddr->ai_next;
124 return {socket, getSocketPort(socket)};
128 const std::string& address,
131 const std::chrono::milliseconds& timeout) {
132 struct ::addrinfo hints, *res = NULL;
133 std::memset(&hints, 0x00,
sizeof(hints));
134 hints.ai_flags = AI_NUMERICSERV;
135 hints.ai_family = AF_UNSPEC;
136 hints.ai_socktype = SOCK_STREAM;
142 ::getaddrinfo(address.data(), std::to_string(port).data(), &hints, &res);
143 if (err != 0 || !res) {
144 throw std::invalid_argument(
145 "host not found: " + std::string(gai_strerror(err)));
148 std::shared_ptr<struct ::addrinfo> addresses(
149 res, [](struct ::addrinfo* p) { ::freeaddrinfo(p); });
151 struct ::addrinfo* nextAddr = addresses.get();
155 bool anyRefused =
false;
158 SYSCHECK_ERR_RETURN_NEG1(
161 nextAddr->ai_socktype,
162 nextAddr->ai_protocol))
164 ResourceGuard socketGuard([socket]() { ::close(socket); });
167 SYSCHECK_ERR_RETURN_NEG1(::fcntl(socket, F_SETFL, O_NONBLOCK));
169 int ret = ::connect(socket, nextAddr->ai_addr, nextAddr->ai_addrlen);
171 if (ret != 0 && errno != EINPROGRESS) {
172 throw std::system_error(errno, std::system_category());
177 pfd.events = POLLOUT;
179 int numReady = ::poll(&pfd, 1, timeout.count());
181 throw std::system_error(errno, std::system_category());
182 }
else if (numReady == 0) {
184 throw std::runtime_error(
"connect() timed out");
187 socklen_t errLen =
sizeof(errno);
189 ::getsockopt(socket, SOL_SOCKET, SO_ERROR, &errno, &errLen);
196 throw std::system_error(errno, std::system_category());
201 SYSCHECK_ERR_RETURN_NEG1(flags = ::fcntl(socket, F_GETFL));
202 SYSCHECK_ERR_RETURN_NEG1(::fcntl(socket, F_SETFL, flags & (~O_NONBLOCK)));
203 socketGuard.release();
206 }
catch (std::exception& e) {
207 if (errno == ECONNREFUSED) {
213 nextAddr = nextAddr->ai_next;
217 if (!wait || !anyRefused) {
220 std::this_thread::sleep_for(std::chrono::seconds(1));
222 nextAddr = addresses.get();
227 setSocketNoDelay(socket);
232 std::tuple<int, std::string> accept(
234 const std::chrono::milliseconds& timeout) {
236 std::unique_ptr<struct ::pollfd[]> events(
new struct ::pollfd[1]);
237 events[0] = {.fd = listenSocket, .events = POLLIN};
240 int res = ::poll(events.get(), 1, timeout.count());
242 throw std::runtime_error(
243 "waiting for processes to " 244 "connect has timed out");
245 }
else if (res == -1) {
246 if (errno == EINTR) {
249 throw std::system_error(errno, std::system_category());
251 if (!(events[0].revents & POLLIN))
252 throw std::system_error(ECONNABORTED, std::system_category());
258 SYSCHECK_ERR_RETURN_NEG1(socket = ::accept(listenSocket, NULL, NULL))
261 struct ::sockaddr_storage addr;
262 socklen_t addrLen = sizeof(addr);
263 SYSCHECK_ERR_RETURN_NEG1(::getpeername(
264 socket, reinterpret_cast<struct ::sockaddr*>(&addr), &addrLen))
266 setSocketNoDelay(socket);
268 return
std::make_tuple(
269 socket, sockaddrToString(reinterpret_cast<struct ::sockaddr*>(&addr)));