1//! @file socket.cpp
2//! @author ryftchen
3//! @brief The definitions (socket) in the utility module.
4//! @version 0.1.0
5//! @copyright Copyright (c) 2022-2025 ryftchen. All rights reserved.
6
7#include "socket.hpp"
8
9#include <sys/poll.h>
10#include <netdb.h>
11#include <algorithm>
12#include <array>
13#include <cstring>
14#include <vector>
15
16namespace utility::socket
17{
18//! @brief Function version number.
19//! @return version number (major.minor.patch)
20const char* version() noexcept
21{
22 static const char* const ver = "0.1.0";
23 return ver;
24}
25
26//! @brief Get the ip address from transport information.
27//! @param addr - transport information
28//! @return ip address string
29static std::string ipAddrString(const ::sockaddr_in& addr)
30{
31 std::array<char, INET_ADDRSTRLEN> ip{};
32 ::inet_ntop(AF_INET, cp: &addr.sin_addr, buf: ip.data(), len: ip.size());
33 return std::string{ip.data()};
34}
35
36//! @brief Get the errno string safely.
37//! @return errno string
38static std::string safeStrErrno()
39{
40 std::array<char, 64> buffer{};
41#ifdef _GNU_SOURCE
42 return ::strerror_r(errno, buf: buffer.data(), buflen: buffer.size());
43#else
44 return (::strerror_r(errno, buffer.data(), buffer.size()) == 0) ? std::string{buffer.data()} : "Unknown error";
45#endif // _GNU_SOURCE
46}
47
48// NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast)
49Socket::Socket(const Type sockType, const int sockId)
50{
51 ::pthread_spin_init(lock: &sockLock, pshared: ::PTHREAD_PROCESS_PRIVATE);
52 if (sockId != -1)
53 {
54 sock = sockId;
55 return;
56 }
57
58 sock = ::socket(AF_INET, type: static_cast<std::uint8_t>(sockType), protocol: 0);
59 if (sock == -1)
60 {
61 throw std::runtime_error{"Socket creation error, errno: " + safeStrErrno() + '.'};
62 }
63}
64
65Socket::~Socket()
66{
67 toClose();
68 ::pthread_spin_destroy(lock: &sockLock);
69}
70
71void Socket::toClose()
72{
73 requestStop();
74
75 const Guard lock(*this);
76 ::shutdown(fd: sock, how: ::SHUT_RDWR);
77 ::close(fd: sock);
78}
79
80void Socket::toJoin()
81{
82 if (asyncTask.valid() && (asyncTask.wait_until(abs: std::chrono::system_clock::now()) != std::future_status::ready))
83 {
84 asyncTask.wait();
85 }
86
87 while (!stopRequested())
88 {
89 std::this_thread::yield();
90 }
91}
92
93void Socket::requestStop()
94{
95 exitReady.store(i: true);
96}
97
98bool Socket::stopRequested() const
99{
100 return exitReady.load();
101}
102
103std::string Socket::transportAddress() const
104{
105 return ipAddrString(addr: sockAddr);
106}
107
108std::uint16_t Socket::transportPort() const
109{
110 return ::ntohs(netshort: sockAddr.sin_port);
111}
112
113void Socket::spinLock() const
114{
115 ::pthread_spin_lock(lock: &sockLock);
116}
117
118void Socket::spinUnlock() const
119{
120 ::pthread_spin_unlock(lock: &sockLock);
121}
122
123template <typename Func, typename... Args>
124void Socket::launchAsyncTask(Func&& func, Args&&... args)
125{
126 asyncTask = std::async(std::launch::async, std::forward<Func>(func), std::forward<Args>(args)...);
127}
128
129::ssize_t TCPSocket::toSend(const char* const bytes, const std::size_t size)
130{
131 const Guard lock(*this);
132 return ::send(fd: sock, buf: bytes, n: size, flags: 0);
133}
134
135::ssize_t TCPSocket::toSend(const std::string_view message)
136{
137 return toSend(bytes: message.data(), size: message.length());
138}
139
140void TCPSocket::toConnect(const std::string& ip, const std::uint16_t port)
141{
142 ::addrinfo* addrInfo = nullptr;
143 ::addrinfo hints{};
144 hints.ai_family = AF_INET;
145 hints.ai_socktype = ::SOCK_STREAM;
146
147 if (const int status = ::getaddrinfo(name: ip.c_str(), service: nullptr, req: &hints, pai: &addrInfo); status != 0)
148 {
149 throw std::runtime_error{
150 "Invalid address, status: " + std::string{::gai_strerror(ecode: status)} + ", errno: " + safeStrErrno() + '.'};
151 }
152
153 for (const ::addrinfo* entry = addrInfo; entry != nullptr; entry = entry->ai_next)
154 {
155 if (entry->ai_family == AF_INET)
156 {
157 std::memcpy(dest: static_cast<void*>(&sockAddr), src: static_cast<void*>(entry->ai_addr), n: sizeof(::sockaddr_in));
158 break;
159 }
160 }
161 ::freeaddrinfo(ai: addrInfo);
162
163 sockAddr.sin_family = AF_INET;
164 sockAddr.sin_port = ::htons(hostshort: port);
165 sockAddr.sin_addr.s_addr = static_cast<std::uint32_t>(sockAddr.sin_addr.s_addr);
166 if (::connect(fd: sock, addr: reinterpret_cast<const ::sockaddr*>(&sockAddr), len: sizeof(::sockaddr_in)) == -1)
167 {
168 throw std::runtime_error{"Failed to connect to the socket, errno: " + safeStrErrno() + '.'};
169 }
170
171 toReceive();
172}
173
174void TCPSocket::toReceive(const bool detach)
175{
176 if (auto self = shared_from_this(); !detach)
177 {
178 launchAsyncTask(func&: toRecv, args&: self);
179 }
180 else
181 {
182 std::thread(toRecv, self).detach();
183 }
184}
185
186void TCPSocket::subscribeMessage(MessageCallback callback)
187{
188 msgCb.store(desired: std::make_shared<decltype(callback)>(args: std::move(callback)), o: std::memory_order_release);
189}
190
191void TCPSocket::subscribeRawMessage(RawMessageCallback callback)
192{
193 rawMsgCb.store(desired: std::make_shared<decltype(callback)>(args: std::move(callback)), o: std::memory_order_release);
194}
195
196void TCPSocket::toRecv(const std::shared_ptr<TCPSocket> socket) // NOLINT(performance-unnecessary-value-param)
197{
198 std::array<char, bufferSize> tempBuffer{};
199 std::vector<::pollfd> pollFDs(1);
200 pollFDs.at(n: 0).fd = socket->sock;
201 pollFDs.at(n: 0).events = POLLIN;
202 for (constexpr std::uint8_t timeout = 10; !socket->stopRequested();)
203 {
204 const int status = ::poll(fds: pollFDs.data(), nfds: pollFDs.size(), timeout: timeout);
205 if (status == -1)
206 {
207 throw std::runtime_error{"Not the expected wait result for poll, errno: " + safeStrErrno() + '.'};
208 }
209 if (status == 0)
210 {
211 continue;
212 }
213
214 if (::ssize_t msgLen = 0; pollFDs.at(n: 0).revents & POLLIN)
215 {
216 if (const Guard lock(*socket); true)
217 {
218 msgLen = ::recv(fd: socket->sock, buf: tempBuffer.data(), n: tempBuffer.size(), flags: 0);
219 if (msgLen <= 0)
220 {
221 break;
222 }
223 }
224
225 tempBuffer[msgLen] = '\0';
226 socket->onMessage(message: std::string(tempBuffer.data(), msgLen));
227 socket->onRawMessage(bytes: tempBuffer.data(), size: msgLen);
228 }
229 }
230
231 socket->toClose();
232}
233
234void TCPSocket::onMessage(const std::string_view message) const
235{
236 const auto& callback = msgCb.load(o: std::memory_order_acquire);
237 if (callback && *callback)
238 {
239 (*callback)(message);
240 }
241}
242
243void TCPSocket::onRawMessage(char* const bytes, const std::size_t size) const
244{
245 const auto& callback = rawMsgCb.load(o: std::memory_order_acquire);
246 if (callback && *callback)
247 {
248 (*callback)(bytes, size);
249 }
250}
251
252TCPServer::TCPServer() : Socket(Type::tcp)
253{
254 const Guard lock(*this);
255 int opt1 = 1;
256 int opt2 = 0;
257 ::setsockopt(fd: sock, SOL_SOCKET, SO_REUSEADDR, optval: &opt1, optlen: sizeof(opt1));
258 ::setsockopt(fd: sock, SOL_SOCKET, SO_REUSEPORT, optval: &opt2, optlen: sizeof(opt2));
259}
260
261void TCPServer::toBind(const std::string& ip, const std::uint16_t port)
262{
263 if (::inet_pton(AF_INET, cp: ip.c_str(), buf: &sockAddr.sin_addr) == -1)
264 {
265 throw std::runtime_error{"Invalid address, address type is not supported, errno: " + safeStrErrno() + '.'};
266 }
267
268 sockAddr.sin_family = AF_INET;
269 sockAddr.sin_port = ::htons(hostshort: port);
270 if (const Guard lock(*this); ::bind(fd: sock, addr: reinterpret_cast<const ::sockaddr*>(&sockAddr), len: sizeof(sockAddr)) == -1)
271 {
272 throw std::runtime_error{"Failed to bind the socket, errno: " + safeStrErrno() + '.'};
273 }
274}
275
276void TCPServer::toBind(const std::uint16_t port)
277{
278 toBind(ip: "0.0.0.0", port);
279}
280
281void TCPServer::toListen()
282{
283 constexpr std::uint8_t retryTimes = 10;
284 if (const Guard lock(*this); ::listen(fd: sock, n: retryTimes) == -1)
285 {
286 throw std::runtime_error{"Server could not listen on the socket, errno: " + safeStrErrno() + '.'};
287 }
288}
289
290void TCPServer::toAccept(const bool detach)
291{
292 if (auto weakSelf = std::weak_ptr<TCPServer>(shared_from_this()); !detach)
293 {
294 launchAsyncTask(
295 func: [weakSelf]
296 {
297 if (auto sharedSelf = weakSelf.lock())
298 {
299 sharedSelf->toAccept(server: sharedSelf);
300 }
301 });
302 }
303 else
304 {
305 std::thread(
306 [weakSelf]
307 {
308 if (auto sharedSelf = weakSelf.lock())
309 {
310 sharedSelf->toAccept(server: sharedSelf);
311 }
312 })
313 .detach();
314 }
315}
316
317void TCPServer::subscribeConnection(ConnectionCallback callback)
318{
319 connCb.store(desired: std::make_shared<decltype(callback)>(args: std::move(callback)), o: std::memory_order_release);
320}
321
322void TCPServer::toAccept(const std::shared_ptr<TCPServer> server) // NOLINT(performance-unnecessary-value-param)
323{
324 ::sockaddr_in newSockAddr{};
325 ::socklen_t newSockAddrLen = sizeof(newSockAddr);
326
327 for (std::vector<std::shared_ptr<TCPSocket>> activeSockets{};;)
328 {
329 const int newSock = ::accept(fd: server->sock, addr: reinterpret_cast<::sockaddr*>(&newSockAddr), addr_len: &newSockAddrLen);
330 if (newSock == -1)
331 {
332 std::for_each(
333 first: activeSockets.cbegin(), last: activeSockets.cend(), f: [](const auto& socket) { socket->requestStop(); });
334 if ((errno == EBADF) || (errno == EINVAL))
335 {
336 return;
337 }
338 throw std::runtime_error{"Error while accepting a new connection, errno: " + safeStrErrno() + '.'};
339 }
340
341 auto newSocket = std::make_shared<TCPSocket>(args: newSock);
342 newSocket->sockAddr = newSockAddr;
343 server->onConnection(client: newSocket);
344
345 newSocket->toReceive(detach: true);
346 activeSockets.emplace_back(args: std::move(newSocket));
347 }
348}
349
350void TCPServer::onConnection(
351 const std::shared_ptr<TCPSocket> client) const // NOLINT(performance-unnecessary-value-param)
352{
353 const auto& callback = connCb.load(o: std::memory_order_acquire);
354 if (callback && *callback)
355 {
356 (*callback)(client);
357 }
358}
359
360::ssize_t UDPSocket::toSendTo(
361 const char* const bytes, const std::size_t size, const std::string& ip, const std::uint16_t port)
362{
363 ::addrinfo* addrInfo = nullptr;
364 ::addrinfo hints{};
365 hints.ai_family = AF_INET;
366 hints.ai_socktype = ::SOCK_DGRAM;
367
368 if (const int status = ::getaddrinfo(name: ip.c_str(), service: nullptr, req: &hints, pai: &addrInfo); status != 0)
369 {
370 throw std::runtime_error{
371 "Invalid address, status: " + std::string{::gai_strerror(ecode: status)} + ", errno: " + safeStrErrno() + '.'};
372 }
373
374 ::sockaddr_in addr{};
375 for (const ::addrinfo* entry = addrInfo; entry != nullptr; entry = entry->ai_next)
376 {
377 if (entry->ai_family == AF_INET)
378 {
379 std::memcpy(dest: static_cast<void*>(&addr), src: static_cast<void*>(entry->ai_addr), n: sizeof(::sockaddr_in));
380 break;
381 }
382 }
383 ::freeaddrinfo(ai: addrInfo);
384
385 addr.sin_port = ::htons(hostshort: port);
386 addr.sin_family = AF_INET;
387 ::ssize_t sent = 0;
388 if (const Guard lock(*this); true)
389 {
390 sent = ::sendto(fd: sock, buf: bytes, n: size, flags: 0, addr: reinterpret_cast<const ::sockaddr*>(&addr), addr_len: sizeof(addr));
391 if (sent == -1)
392 {
393 throw std::runtime_error{"Unable to send message to address, errno: " + safeStrErrno() + '.'};
394 }
395 }
396 return sent;
397}
398
399::ssize_t UDPSocket::toSendTo(const std::string_view message, const std::string& ip, const std::uint16_t port)
400{
401 return toSendTo(bytes: message.data(), size: message.length(), ip, port);
402}
403
404::ssize_t UDPSocket::toSend(const char* const bytes, const std::size_t size)
405{
406 const Guard lock(*this);
407 return ::send(fd: sock, buf: bytes, n: size, flags: 0);
408}
409
410::ssize_t UDPSocket::toSend(const std::string_view message)
411{
412 return toSend(bytes: message.data(), size: message.length());
413}
414
415void UDPSocket::toConnect(const std::string& ip, const std::uint16_t port)
416{
417 ::addrinfo* addrInfo = nullptr;
418 ::addrinfo hints{};
419 hints.ai_family = AF_INET;
420 hints.ai_socktype = ::SOCK_DGRAM;
421
422 if (const int status = ::getaddrinfo(name: ip.c_str(), service: nullptr, req: &hints, pai: &addrInfo); status != 0)
423 {
424 throw std::runtime_error{
425 "Invalid address, status: " + std::string{::gai_strerror(ecode: status)} + ", errno: " + safeStrErrno() + '.'};
426 }
427
428 for (const ::addrinfo* entry = addrInfo; entry != nullptr; entry = entry->ai_next)
429 {
430 if (entry->ai_family == AF_INET)
431 {
432 std::memcpy(dest: static_cast<void*>(&sockAddr), src: static_cast<void*>(entry->ai_addr), n: sizeof(::sockaddr_in));
433 break;
434 }
435 }
436 ::freeaddrinfo(ai: addrInfo);
437
438 sockAddr.sin_family = AF_INET;
439 sockAddr.sin_port = ::htons(hostshort: port);
440 sockAddr.sin_addr.s_addr = static_cast<std::uint32_t>(sockAddr.sin_addr.s_addr);
441 if (::connect(fd: sock, addr: reinterpret_cast<const ::sockaddr*>(&sockAddr), len: sizeof(::sockaddr_in)) == -1)
442 {
443 throw std::runtime_error{"Failed to connect to the socket, errno: " + safeStrErrno() + '.'};
444 }
445}
446
447void UDPSocket::toReceive(const bool detach)
448{
449 if (auto self = shared_from_this(); !detach)
450 {
451 launchAsyncTask(func&: toRecv, args&: self);
452 }
453 else
454 {
455 std::thread(toRecv, self).detach();
456 }
457}
458
459void UDPSocket::toReceiveFrom(const bool detach)
460{
461 auto self = shared_from_this();
462 if (!detach)
463 {
464 launchAsyncTask(func&: toRecvFrom, args&: self);
465 }
466 else
467 {
468 std::thread(toRecvFrom, self).detach();
469 }
470}
471
472void UDPSocket::subscribeMessage(MessageCallback callback)
473{
474 msgCb.store(desired: std::make_shared<decltype(callback)>(args: std::move(callback)), o: std::memory_order_release);
475}
476
477void UDPSocket::subscribeRawMessage(RawMessageCallback callback)
478{
479 rawMsgCb.store(desired: std::make_shared<decltype(callback)>(args: std::move(callback)), o: std::memory_order_release);
480}
481
482void UDPSocket::toRecv(const std::shared_ptr<UDPSocket> socket) // NOLINT(performance-unnecessary-value-param)
483{
484 std::array<char, bufferSize> tempBuffer{};
485 std::vector<::pollfd> pollFDs(1);
486 pollFDs.at(n: 0).fd = socket->sock;
487 pollFDs.at(n: 0).events = POLLIN;
488 for (constexpr std::uint8_t timeout = 10; !socket->stopRequested();)
489 {
490 const int status = ::poll(fds: pollFDs.data(), nfds: pollFDs.size(), timeout: timeout);
491 if (status == -1)
492 {
493 throw std::runtime_error{"Not the expected wait result for poll, errno: " + safeStrErrno() + '.'};
494 }
495 if (status == 0)
496 {
497 continue;
498 }
499
500 if (::ssize_t msgLen = 0; pollFDs.at(n: 0).revents & POLLIN)
501 {
502 if (const Guard lock(*socket); true)
503 {
504 msgLen = ::recv(fd: socket->sock, buf: tempBuffer.data(), n: tempBuffer.size(), flags: 0);
505 if (msgLen == -1)
506 {
507 break;
508 }
509 }
510
511 tempBuffer[msgLen] = '\0';
512 socket->onMessage(
513 message: std::string_view(tempBuffer.data(), msgLen), ip: socket->transportAddress(), port: socket->transportPort());
514 socket->onRawMessage(bytes: tempBuffer.data(), size: msgLen, ip: socket->transportAddress(), port: socket->transportPort());
515 }
516 }
517}
518
519void UDPSocket::toRecvFrom(const std::shared_ptr<UDPSocket> socket) // NOLINT(performance-unnecessary-value-param)
520{
521 ::sockaddr_in addr{};
522 ::socklen_t hostAddrSize = sizeof(addr);
523
524 std::array<char, bufferSize> tempBuffer{};
525 std::vector<::pollfd> pollFDs(1);
526 pollFDs.at(n: 0).fd = socket->sock;
527 pollFDs.at(n: 0).events = POLLIN;
528 for (constexpr std::uint8_t timeout = 10; !socket->stopRequested();)
529 {
530 const int status = ::poll(fds: pollFDs.data(), nfds: pollFDs.size(), timeout: timeout);
531 if (status == -1)
532 {
533 throw std::runtime_error{"Not the expected wait result for poll, errno: " + safeStrErrno() + '.'};
534 }
535 if (status == 0)
536 {
537 continue;
538 }
539
540 if (::ssize_t msgLen = 0; pollFDs.at(n: 0).revents & POLLIN)
541 {
542 if (const Guard lock(*socket); true)
543 {
544 msgLen = ::recvfrom(
545 fd: socket->sock,
546 buf: tempBuffer.data(),
547 n: tempBuffer.size(),
548 flags: 0,
549 addr: reinterpret_cast<::sockaddr*>(&addr),
550 addr_len: &hostAddrSize);
551 if (msgLen == -1)
552 {
553 break;
554 }
555 }
556
557 tempBuffer[msgLen] = '\0';
558 socket->onMessage(message: std::string_view(tempBuffer.data(), msgLen), ip: ipAddrString(addr), port: ::ntohs(netshort: addr.sin_port));
559 socket->onRawMessage(bytes: tempBuffer.data(), size: msgLen, ip: ipAddrString(addr), port: ::ntohs(netshort: addr.sin_port));
560 }
561 }
562}
563
564void UDPSocket::onMessage(const std::string_view message, const std::string& ip, const std::uint16_t port) const
565{
566 const auto& callback = msgCb.load(o: std::memory_order_acquire);
567 if (callback && *callback)
568 {
569 (*callback)(message, ip, port);
570 }
571}
572
573void UDPSocket::onRawMessage(
574 char* const bytes, const std::size_t size, const std::string& ip, const std::uint16_t port) const
575{
576 const auto& callback = rawMsgCb.load(o: std::memory_order_acquire);
577 if (callback && *callback)
578 {
579 (*callback)(bytes, size, ip, port);
580 }
581}
582
583void UDPServer::toBind(const std::string& ip, const std::uint16_t port)
584{
585 if (::inet_pton(AF_INET, cp: ip.c_str(), buf: &sockAddr.sin_addr) == -1)
586 {
587 throw std::runtime_error{"Invalid address, address type is not supported, errno: " + safeStrErrno() + '.'};
588 }
589
590 sockAddr.sin_family = AF_INET;
591 sockAddr.sin_port = ::htons(hostshort: port);
592 if (const Guard lock(*this); ::bind(fd: sock, addr: reinterpret_cast<const ::sockaddr*>(&sockAddr), len: sizeof(sockAddr)) == -1)
593 {
594 throw std::runtime_error{"Failed to bind the socket, errno: " + safeStrErrno() + '.'};
595 }
596}
597
598void UDPServer::toBind(const std::uint16_t port)
599{
600 toBind(ip: "0.0.0.0", port);
601}
602// NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast)
603} // namespace utility::socket
604