improve sse receiver

This commit is contained in:
Ondrej Novak 2025-05-07 22:01:26 +02:00
parent bb5be10adc
commit 240c8764fb
7 changed files with 213 additions and 266 deletions

View file

@ -283,7 +283,7 @@ int vmode=2;
#include <platform/sse_receiver.h>
static SSE_RECEIVER *sse_receiver = NULL;
static MTQUEUE *mtqueue = NULL;
void purge_temps(char _) {
temp_storage_clear();
@ -745,8 +745,7 @@ void done_skeldal(void)
cur_config = NULL;
}
kill_timer();
if (sse_receiver) sse_receiver_stop(sse_receiver);
if (mtqueue) mtqueue_destroy(mtqueue);
if (sse_receiver) sse_receiver_destroy(sse_receiver);
}
@ -987,10 +986,9 @@ void show_loading_picture(char *filename)
void sse_listener_watch(EVENT_MSG *msg, void **userdata) {
if (msg->msg == E_WATCH) {
char *s = mtqueue_pop(mtqueue);
const char *s = sse_receiver_receive(sse_receiver);
if (s) {
send_message(E_EXTERNAL_MSG, s);
free(s);
}
}
}
@ -1005,14 +1003,11 @@ void sse_listener_init(const char *hostport) {
++port;
}
MTQUEUE *q = mtqueue_create();
SSE_RECEIVER *rcv = sse_receiver_install(q, host, port);
SSE_RECEIVER *rcv = sse_receiver_create(host, port);
if (rcv == NULL) {
mtqueue_destroy(q);
return;
}
mtqueue = q;
sse_receiver = rcv;
send_message(E_ADD, E_WATCH, sse_listener_watch);
@ -1165,28 +1160,32 @@ extern char running_battle;
int sector;
int i;
if (sscanf(m, "RELOAD %12s %d", fname, &sector) != 2) return 0;
if (sscanf(m, "RELOAD %12s %d", fname, &sector) == 2) {
strcopy_n(loadlevel.name,fname,sizeof(loadlevel.name));
loadlevel.start_pos=sector;
for(i=0;i<POCET_POSTAV;i++) {
postavy[i].sektor=loadlevel.start_pos;
postavy[i].groupnum = 1;
strcopy_n(loadlevel.name,fname,sizeof(loadlevel.name));
loadlevel.start_pos=sector;
for(i=0;i<POCET_POSTAV;i++) if (postavy[i].used) {
postavy[i].sektor=loadlevel.start_pos;
postavy[i].groupnum = 1;
}
SEND_LOG("(WIZARD) Load map '%s' %d",loadlevel.name,loadlevel.start_pos);
unwire_proc();
if (battle) konec_kola();
battle=0;
running_battle=0;
doNotLoadMapState=1;
hl_ptr=ikon_libs;
destroy_fly_map();
load_items();
zneplatnit_block(H_ENEMY);
zneplatnit_block(H_SHOP_PIC);
zneplatnit_block(H_DIALOGY_DAT);
load_shops();
send_message(E_CLOSE_MAP);
} else if (strncmp(m, "MESSAGE ", 8) == 0) {
bott_disp_text(m+8);
}
SEND_LOG("(WIZARD) Load map '%s' %d",loadlevel.name,loadlevel.start_pos);
unwire_proc();
if (battle) konec_kola();
battle=0;
running_battle=0;
doNotLoadMapState=1;
hl_ptr=ikon_libs;
destroy_fly_map();
load_items();
zneplatnit_block(H_ENEMY);
zneplatnit_block(H_SHOP_PIC);
zneplatnit_block(H_DIALOGY_DAT);
load_shops();
send_message(E_CLOSE_MAP);
}
return 0;
}

View file

@ -12,8 +12,7 @@ target_sources(skeldal_platform PRIVATE
timer.cpp
getopt.c
achievements.cpp
mtqueue.cpp
sse_receiver.cpp
sse_receiver.c
)
set(all_libs

View file

@ -1,44 +0,0 @@
#include "mtqueue.h"
#include <cstring>
#include <malloc.h>
#include <memory>
#include <mutex>
#include <queue>
struct StringDeleter {
void operator()(char *x) {
free(x);
}
};
std::unique_ptr<char, StringDeleter> alloc_string(const char *x) {
return std::unique_ptr<char, StringDeleter>(strdup(x));
}
typedef struct tag_mtqueue {
std::queue<std::unique_ptr<char, StringDeleter> > _q;
std::mutex _mx;
} MTQUEUE;
MTQUEUE *mtqueue_create() {
return new MTQUEUE();
}
void mtqueue_push(MTQUEUE *q, const char *message) {
std::lock_guard _(q->_mx);
q->_q.push(alloc_string(message));
}
char *mtqueue_pop(MTQUEUE *q) {
std::lock_guard _(q->_mx);
if (q->_q.empty()) return NULL;
else {
char *c = q->_q.front().release();
q->_q.pop();
return c;
}
}
void mtqueue_destroy(MTQUEUE *q) {
delete q;
}

View file

@ -1,32 +0,0 @@
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
typedef struct tag_mtqueue MTQUEUE;
///Create multithread queue
MTQUEUE *mtqueue_create();
///push to queue (string is copied)
/**
* @param q queue
* @param message message (string is copied)
*/
void mtqueue_push(MTQUEUE *q, const char *message);
///pop from the queue
/**
*
* @param q queue
* @return NULL, if queue is empty, or string. You have to release
* string by calling free() when you finish.
*/
char *mtqueue_pop(MTQUEUE *q);
///destroy the queue
void mtqueue_destroy(MTQUEUE *q);
#ifdef __cplusplus
}
#endif

181
platform/sse_receiver.c Normal file
View file

@ -0,0 +1,181 @@
// sse_receiver.c
#include "sse_receiver.h"
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <stdio.h>
#include <errno.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
typedef SOCKET sock_t;
#define CLOSESOCK closesocket
#define sock_init() { WSADATA wsa; WSAStartup(MAKEWORD(2,2), &wsa); }
#define sock_cleanup() WSACleanup()
#else
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
typedef int sock_t;
#define INVALID_SOCKET -1
#define CLOSESOCK close
#define sock_init()
#define sock_cleanup()
#endif
#define BUFFER_SIZE 4096
struct tag_sse_receiver {
char host[256];
char port[16];
sock_t sock;
int connected;
char buffer[BUFFER_SIZE+1];
size_t buf_len;
size_t line_len;
time_t next_attempt;
};
static void set_nonblocking(sock_t sock) {
#ifdef _WIN32
u_long mode = 1;
ioctlsocket(sock, FIONBIO, &mode);
#else
fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
#endif
}
static int connect_and_handshake(SSE_RECEIVER *sse) {
struct addrinfo hints = {0}, *res = NULL;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(sse->host, sse->port, &hints, &res) != 0) {
return 0;
}
sock_t sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sock == INVALID_SOCKET) {
freeaddrinfo(res);
return 0;
}
if (connect(sock, res->ai_addr, (int)res->ai_addrlen) != 0) {
CLOSESOCK(sock);
freeaddrinfo(res);
return 0;
}
freeaddrinfo(res);
const char *req_fmt =
"GET /command HTTP/1.1\r\n"
"Host: %s\r\n"
"Accept: text/event-stream\r\n"
"Connection: keep-alive\r\n\r\n";
char req[512];
snprintf(req, sizeof(req), req_fmt, sse->host);
send(sock, req, (int)strlen(req), 0);
set_nonblocking(sock);
sse->sock = sock;
sse->buf_len = 0;
sse->line_len = 0;
sse->connected = 1;
return 1;
}
SSE_RECEIVER *sse_receiver_create(const char *host, const char *port) {
sock_init();
SSE_RECEIVER *sse = calloc(1, sizeof(SSE_RECEIVER));
strncpy(sse->host, host, sizeof(sse->host) - 1);
strncpy(sse->port, port, sizeof(sse->port) - 1);
sse->sock = INVALID_SOCKET;
sse->connected = 0;
sse->buf_len = 0;
sse->line_len = 0;
sse->next_attempt = 0;
return sse;
}
size_t find_sep(const char *buffer, size_t size, char sep) {
for (size_t i = 0; i < size; ++i) {
if (buffer[i] == sep) return i;
}
return size;
}
const char *sse_receiver_receive(SSE_RECEIVER *sse) {
while(1) {
size_t rm = sse->buf_len-sse->line_len;
if (rm) {
const char *r = sse->buffer+sse->line_len;
size_t sep = find_sep(r, rm, '\n');
if (sep < rm) {
sep += sse->line_len;
sse->buffer[sep] = 0;
sse->line_len = sep+1;
if (strncmp(r,"data:",5) == 0) {
r+=5;
while (*r && isspace(*r)) ++r;
if (*r) {
return r;
}
}
continue;
}
if (r != sse->buffer) memmove(sse->buffer, r, rm);
}
sse->line_len = 0;
sse->buf_len = rm;
if (!sse->connected) {
time_t t = time(NULL);
if (t < sse->next_attempt) {
return NULL;
}
if (!connect_and_handshake(sse)) {
snprintf(sse->buffer, sizeof(sse->buffer) - 1,
"MESSAGE Failed to connect the command server %s:%s ",
sse->host, sse->port);
sse->next_attempt = t+5;
return sse->buffer;
} else {
return "MESSAGE Connected to command server";
}
}
int n = recv(sse->sock, sse->buffer + sse->buf_len, (int)(BUFFER_SIZE - sse->buf_len), 0);
if (n <= 0) {
#ifdef _WIN32
int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK && err != WSAEINPROGRESS)
#else
if (errno != EWOULDBLOCK && errno != EAGAIN)
#endif
{
CLOSESOCK(sse->sock);
sse->connected = 0;
}
return NULL;
}
sse->buf_len += n;
}
}
void sse_receiver_destroy(SSE_RECEIVER *sse) {
if (sse->connected) {
CLOSESOCK(sse->sock);
}
free(sse);
sock_cleanup();
}

View file

@ -1,142 +0,0 @@
#include "sse_receiver.h"
#include <atomic>
#include <thread>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <functional>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")
typedef SOCKET sock_t;
#define CLOSESOCK closesocket
#define sock_init() { WSADATA wsa; WSAStartup(MAKEWORD(2,2), &wsa); }
#define sock_cleanup() WSACleanup()
#define SHUT_RD SD_RECEIVE
#else
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <fcntl.h>
typedef int sock_t;
#define INVALID_SOCKET -1
#define CLOSESOCK close
#define sock_init()
#define sock_cleanup()
#endif
#define BUFFER_SIZE 1024
void sse_client_loop(const char *host, const char *port, std::function<void(const char *)> callback, std::stop_token tkn) {
sock_init();
struct addrinfo hints = {}, *res = NULL;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(host, port, &hints, &res) != 0) {
perror("getaddrinfo");
return;
}
sock_t sock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (sock == INVALID_SOCKET) {
perror("socket");
freeaddrinfo(res);
return;
}
if (connect(sock, res->ai_addr, res->ai_addrlen) != 0) {
perror("connect");
CLOSESOCK(sock);
freeaddrinfo(res);
return;
}
freeaddrinfo(res);
{
std::stop_callback _cb(tkn, [&]{
shutdown(sock, SHUT_RD);
});
// Send HTTP GET request
char req[512];
snprintf(req, sizeof(req),
"GET /command HTTP/1.1\r\n"
"Host: %s\r\n"
"Accept: text/event-stream\r\n"
"Connection: keep-alive\r\n\r\n", host);
send(sock, req, strlen(req), 0);
// Read response and extract "data: " lines
char buffer[BUFFER_SIZE];
int buf_len = 0;
while (!tkn.stop_requested()) {
int n = recv(sock, buffer + buf_len, BUFFER_SIZE - buf_len - 1, 0);
if (n <= 0) {
break;
}
buf_len += n;
buffer[buf_len] = '\0';
char *line_start = buffer;
while (1) {
char *newline = strstr(line_start, "\n");
if (!newline) break;
*newline = '\0';
if (strncmp(line_start, "data: ", 6) == 0) {
callback(line_start + 6);
}
line_start = newline + 1;
}
// Move leftover data to start
buf_len = strlen(line_start);
memmove(buffer, line_start, buf_len);
}
}
CLOSESOCK(sock);
sock_cleanup();
return;
}
typedef struct tag_sse_receiver {
std::jthread thr;
}
SSE_RECEIVER;
SSE_RECEIVER *sse_receiver_install(MTQUEUE *q, const char *host, const char *port) {
SSE_RECEIVER *sse = new SSE_RECEIVER;
sse->thr = std::jthread([sse, q, host = std::string(host), port = std::string(port)](std::stop_token tkn){
while (!tkn.stop_requested()) {
std::this_thread::sleep_for(std::chrono::seconds(2));
sse_client_loop(host.c_str(), port.c_str(), [q](const char *msg){
mtqueue_push(q, msg);
}, tkn);
}
});
return sse;
}
void sse_receiver_stop(SSE_RECEIVER *inst) {
delete inst;
}

View file

@ -1,28 +1,14 @@
#pragma once
#include "mtqueue.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct tag_sse_receiver SSE_RECEIVER;
///Install sse receiver
/**
* @param q mtqueue, which receives messages received by the receiver
* @param host host
* @param port port
* @return pointer to instance of receiver
*/
SSE_RECEIVER *sse_receiver_install(MTQUEUE *q, const char *host, const char *port);
///Stops the receiver
/**
* @param inst instance of receiver
* @note the associated queue is not destroyed
*/
void sse_receiver_stop(SSE_RECEIVER *inst);
SSE_RECEIVER *sse_receiver_create(const char *host, const char *port);
const char *sse_receiver_receive(SSE_RECEIVER *sse);
void sse_receiver_destroy(SSE_RECEIVER *inst);
#ifdef __cplusplus