diff --git a/game/skeldal.c b/game/skeldal.c index c289f50..24c5859 100644 --- a/game/skeldal.c +++ b/game/skeldal.c @@ -283,7 +283,7 @@ int vmode=2; #include static SSE_RECEIVER *sse_receiver = NULL; -static MTQUEUE *mtqueue = NULL; + void purge_temps(char _) { temp_storage_clear(); @@ -745,8 +745,7 @@ void done_skeldal(void) cur_config = NULL; } kill_timer(); - if (sse_receiver) sse_receiver_stop(sse_receiver); - if (mtqueue) mtqueue_destroy(mtqueue); + if (sse_receiver) sse_receiver_destroy(sse_receiver); } @@ -987,10 +986,9 @@ void show_loading_picture(char *filename) void sse_listener_watch(EVENT_MSG *msg, void **userdata) { if (msg->msg == E_WATCH) { - char *s = mtqueue_pop(mtqueue); + const char *s = sse_receiver_receive(sse_receiver); if (s) { send_message(E_EXTERNAL_MSG, s); - free(s); } } } @@ -1005,14 +1003,11 @@ void sse_listener_init(const char *hostport) { ++port; } - MTQUEUE *q = mtqueue_create(); - SSE_RECEIVER *rcv = sse_receiver_install(q, host, port); + SSE_RECEIVER *rcv = sse_receiver_create(host, port); if (rcv == NULL) { - mtqueue_destroy(q); return; } - mtqueue = q; sse_receiver = rcv; send_message(E_ADD, E_WATCH, sse_listener_watch); @@ -1165,28 +1160,32 @@ extern char running_battle; int sector; int i; - if (sscanf(m, "RELOAD %12s %d", fname, §or) != 2) return 0; + if (sscanf(m, "RELOAD %12s %d", fname, §or) == 2) { - strcopy_n(loadlevel.name,fname,sizeof(loadlevel.name)); - loadlevel.start_pos=sector; - for(i=0;i -#include -#include -#include -#include -struct StringDeleter { - void operator()(char *x) { - free(x); - } -}; - -std::unique_ptr alloc_string(const char *x) { - return std::unique_ptr(strdup(x)); -} - - -typedef struct tag_mtqueue { - std::queue > _q; - std::mutex _mx; - - -} MTQUEUE; - -MTQUEUE *mtqueue_create() { - return new MTQUEUE(); -} -void mtqueue_push(MTQUEUE *q, const char *message) { - std::lock_guard _(q->_mx); - q->_q.push(alloc_string(message)); -} -char *mtqueue_pop(MTQUEUE *q) { - std::lock_guard _(q->_mx); - if (q->_q.empty()) return NULL; - else { - char *c = q->_q.front().release(); - q->_q.pop(); - return c; - } -} -void mtqueue_destroy(MTQUEUE *q) { - delete q; -} diff --git a/platform/mtqueue.h b/platform/mtqueue.h deleted file mode 100644 index 07919e0..0000000 --- a/platform/mtqueue.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" { -#endif - - -typedef struct tag_mtqueue MTQUEUE; -///Create multithread queue -MTQUEUE *mtqueue_create(); -///push to queue (string is copied) -/** - * @param q queue - * @param message message (string is copied) - */ -void mtqueue_push(MTQUEUE *q, const char *message); -///pop from the queue -/** - * - * @param q queue - * @return NULL, if queue is empty, or string. You have to release - * string by calling free() when you finish. - */ -char *mtqueue_pop(MTQUEUE *q); - -///destroy the queue -void mtqueue_destroy(MTQUEUE *q); - - -#ifdef __cplusplus -} -#endif diff --git a/platform/sse_receiver.c b/platform/sse_receiver.c new file mode 100644 index 0000000..c16015e --- /dev/null +++ b/platform/sse_receiver.c @@ -0,0 +1,181 @@ +// sse_receiver.c +#include "sse_receiver.h" +#include +#include +#include +#include +#include + +#ifdef _WIN32 + #include + #include + #pragma comment(lib, "ws2_32.lib") + typedef SOCKET sock_t; + #define CLOSESOCK closesocket + #define sock_init() { WSADATA wsa; WSAStartup(MAKEWORD(2,2), &wsa); } + #define sock_cleanup() WSACleanup() +#else + #include + #include + #include + #include + #include + typedef int sock_t; + #define INVALID_SOCKET -1 + #define CLOSESOCK close + #define sock_init() + #define sock_cleanup() +#endif + +#define BUFFER_SIZE 4096 + +struct tag_sse_receiver { + char host[256]; + char port[16]; + sock_t sock; + int connected; + char buffer[BUFFER_SIZE+1]; + size_t buf_len; + size_t line_len; + time_t next_attempt; +}; + +static void set_nonblocking(sock_t sock) { +#ifdef _WIN32 + u_long mode = 1; + ioctlsocket(sock, FIONBIO, &mode); +#else + fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK); +#endif +} + +static int connect_and_handshake(SSE_RECEIVER *sse) { + struct addrinfo hints = {0}, *res = NULL; + + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + if (getaddrinfo(sse->host, sse->port, &hints, &res) != 0) { + return 0; + } + + sock_t sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sock == INVALID_SOCKET) { + freeaddrinfo(res); + return 0; + } + + if (connect(sock, res->ai_addr, (int)res->ai_addrlen) != 0) { + CLOSESOCK(sock); + freeaddrinfo(res); + return 0; + } + + freeaddrinfo(res); + + const char *req_fmt = + "GET /command HTTP/1.1\r\n" + "Host: %s\r\n" + "Accept: text/event-stream\r\n" + "Connection: keep-alive\r\n\r\n"; + + char req[512]; + snprintf(req, sizeof(req), req_fmt, sse->host); + send(sock, req, (int)strlen(req), 0); + + set_nonblocking(sock); + sse->sock = sock; + sse->buf_len = 0; + sse->line_len = 0; + sse->connected = 1; + + return 1; +} + +SSE_RECEIVER *sse_receiver_create(const char *host, const char *port) { + sock_init(); + SSE_RECEIVER *sse = calloc(1, sizeof(SSE_RECEIVER)); + strncpy(sse->host, host, sizeof(sse->host) - 1); + strncpy(sse->port, port, sizeof(sse->port) - 1); + sse->sock = INVALID_SOCKET; + sse->connected = 0; + sse->buf_len = 0; + sse->line_len = 0; + sse->next_attempt = 0; + return sse; +} + +size_t find_sep(const char *buffer, size_t size, char sep) { + for (size_t i = 0; i < size; ++i) { + if (buffer[i] == sep) return i; + } + return size; +} + +const char *sse_receiver_receive(SSE_RECEIVER *sse) { + while(1) { + size_t rm = sse->buf_len-sse->line_len; + if (rm) { + const char *r = sse->buffer+sse->line_len; + size_t sep = find_sep(r, rm, '\n'); + if (sep < rm) { + sep += sse->line_len; + sse->buffer[sep] = 0; + sse->line_len = sep+1; + + if (strncmp(r,"data:",5) == 0) { + r+=5; + while (*r && isspace(*r)) ++r; + if (*r) { + return r; + } + } + continue; + } + if (r != sse->buffer) memmove(sse->buffer, r, rm); + } + sse->line_len = 0; + sse->buf_len = rm; + + if (!sse->connected) { + time_t t = time(NULL); + if (t < sse->next_attempt) { + return NULL; + } + if (!connect_and_handshake(sse)) { + snprintf(sse->buffer, sizeof(sse->buffer) - 1, + "MESSAGE Failed to connect the command server %s:%s ", + sse->host, sse->port); + sse->next_attempt = t+5; + return sse->buffer; + } else { + return "MESSAGE Connected to command server"; + } + } + + int n = recv(sse->sock, sse->buffer + sse->buf_len, (int)(BUFFER_SIZE - sse->buf_len), 0); + if (n <= 0) { +#ifdef _WIN32 + int err = WSAGetLastError(); + if (err != WSAEWOULDBLOCK && err != WSAEINPROGRESS) +#else + if (errno != EWOULDBLOCK && errno != EAGAIN) +#endif + { + CLOSESOCK(sse->sock); + sse->connected = 0; + } + return NULL; + } + + sse->buf_len += n; + } +} + +void sse_receiver_destroy(SSE_RECEIVER *sse) { + if (sse->connected) { + CLOSESOCK(sse->sock); + } + free(sse); + sock_cleanup(); +} diff --git a/platform/sse_receiver.cpp b/platform/sse_receiver.cpp deleted file mode 100644 index f043e83..0000000 --- a/platform/sse_receiver.cpp +++ /dev/null @@ -1,142 +0,0 @@ -#include "sse_receiver.h" - -#include -#include -#include -#include -#include -#include -#include - -#ifdef _WIN32 - #include - #include - #pragma comment(lib, "ws2_32.lib") - typedef SOCKET sock_t; - #define CLOSESOCK closesocket - #define sock_init() { WSADATA wsa; WSAStartup(MAKEWORD(2,2), &wsa); } - #define sock_cleanup() WSACleanup() - #define SHUT_RD SD_RECEIVE -#else - #include - #include - #include - #include - #include - typedef int sock_t; - #define INVALID_SOCKET -1 - #define CLOSESOCK close - #define sock_init() - #define sock_cleanup() -#endif - -#define BUFFER_SIZE 1024 - - -void sse_client_loop(const char *host, const char *port, std::function callback, std::stop_token tkn) { - sock_init(); - - struct addrinfo hints = {}, *res = NULL; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - - if (getaddrinfo(host, port, &hints, &res) != 0) { - perror("getaddrinfo"); - return; - } - - sock_t sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); - if (sock == INVALID_SOCKET) { - perror("socket"); - freeaddrinfo(res); - return; - } - - - - if (connect(sock, res->ai_addr, res->ai_addrlen) != 0) { - perror("connect"); - CLOSESOCK(sock); - freeaddrinfo(res); - return; - } - - - freeaddrinfo(res); - - { - std::stop_callback _cb(tkn, [&]{ - shutdown(sock, SHUT_RD); - }); - - // Send HTTP GET request - char req[512]; - snprintf(req, sizeof(req), - "GET /command HTTP/1.1\r\n" - "Host: %s\r\n" - "Accept: text/event-stream\r\n" - "Connection: keep-alive\r\n\r\n", host); - send(sock, req, strlen(req), 0); - - // Read response and extract "data: " lines - char buffer[BUFFER_SIZE]; - int buf_len = 0; - - while (!tkn.stop_requested()) { - int n = recv(sock, buffer + buf_len, BUFFER_SIZE - buf_len - 1, 0); - if (n <= 0) { - break; - } - - buf_len += n; - buffer[buf_len] = '\0'; - - char *line_start = buffer; - while (1) { - char *newline = strstr(line_start, "\n"); - if (!newline) break; - - *newline = '\0'; - - if (strncmp(line_start, "data: ", 6) == 0) { - callback(line_start + 6); - } - - line_start = newline + 1; - } - - // Move leftover data to start - buf_len = strlen(line_start); - memmove(buffer, line_start, buf_len); - } - - } - CLOSESOCK(sock); - sock_cleanup(); - return; -} - - - - -typedef struct tag_sse_receiver { - std::jthread thr; -} -SSE_RECEIVER; - -SSE_RECEIVER *sse_receiver_install(MTQUEUE *q, const char *host, const char *port) { - SSE_RECEIVER *sse = new SSE_RECEIVER; - sse->thr = std::jthread([sse, q, host = std::string(host), port = std::string(port)](std::stop_token tkn){ - while (!tkn.stop_requested()) { - std::this_thread::sleep_for(std::chrono::seconds(2)); - sse_client_loop(host.c_str(), port.c_str(), [q](const char *msg){ - mtqueue_push(q, msg); - }, tkn); - } - }); - return sse; -} -void sse_receiver_stop(SSE_RECEIVER *inst) { - delete inst; -} - diff --git a/platform/sse_receiver.h b/platform/sse_receiver.h index 044c5a7..a567c67 100644 --- a/platform/sse_receiver.h +++ b/platform/sse_receiver.h @@ -1,28 +1,14 @@ #pragma once -#include "mtqueue.h" #ifdef __cplusplus extern "C" { #endif typedef struct tag_sse_receiver SSE_RECEIVER; - -///Install sse receiver -/** - * @param q mtqueue, which receives messages received by the receiver - * @param host host - * @param port port - * @return pointer to instance of receiver - */ -SSE_RECEIVER *sse_receiver_install(MTQUEUE *q, const char *host, const char *port); - -///Stops the receiver -/** - * @param inst instance of receiver - * @note the associated queue is not destroyed - */ -void sse_receiver_stop(SSE_RECEIVER *inst); +SSE_RECEIVER *sse_receiver_create(const char *host, const char *port); +const char *sse_receiver_receive(SSE_RECEIVER *sse); +void sse_receiver_destroy(SSE_RECEIVER *inst); #ifdef __cplusplus