revision of events

This commit is contained in:
Ondřej Novák 2025-01-27 17:33:59 +01:00
parent 858c4384e8
commit 669f72908e
33 changed files with 661 additions and 382 deletions

View file

@ -4,6 +4,7 @@
#include <thread>
#include <atomic>
#include <memory>
#include <queue>
#include <unordered_map>
struct TaskInfo {
@ -13,6 +14,7 @@ struct TaskInfo {
std::chrono::system_clock::time_point _wake_up_after = {};
int wake_up_msg = -1;
bool request_exit = false;
bool exited = false;
TaskInfo(int id):id(id) {}
};
@ -32,41 +34,83 @@ static std::atomic<bool> resume_master_flag = {false};
static TaskInfo *current_task_inst = NULL;
static EVENT_MSG *cur_message = NULL;
struct MsgQueue {
EVENT_MSG *msg;
TaskInfo *sender;
};
static std::queue<MsgQueue> msg_queue;
void flush_message_queue();
static void switch_to_task(TaskInfo *task) {
if (task == current_task_inst) return;
if (task == NULL) {
TaskInfo *me = current_task_inst;
current_task_inst = NULL;
me->resume_flag = false;
resume_master_flag = true;
resume_master_flag.notify_all();
me->resume_flag.wait(false);
me->resume_flag = false;
} else if (current_task_inst == NULL) {
if (task->exited) return ;
current_task_inst = task;
resume_master_flag = false;
task->resume_flag = true;
task->resume_flag.notify_all();
resume_master_flag.wait(false);
resume_master_flag = false;
flush_message_queue();
} else {
if (task->exited) return ;
TaskInfo *me = current_task_inst;
me->resume_flag = false;
current_task_inst = task;
task->resume_flag = true;
task->resume_flag.notify_all();
me->resume_flag.wait(false);
me->resume_flag = false;
}
}
static void clean_task_table() {
for (auto iter = task_list.begin(); iter != task_list.end();) {
if (iter->second->exited) {
iter = task_list.erase(iter);
} else {
++iter;
}
}
}
static void clean_up_current_task() {
TaskInfo *me = current_task_inst;
if (!me) return;
int id = me->id;
me->thr.detach();
task_list.erase(id);
me->exited = true;
current_task_inst = NULL;
resume_master_flag = true;
resume_master_flag.notify_all();
}
void flush_message_queue() {
while (!msg_queue.empty()) {
auto m = msg_queue.front();
msg_queue.pop();
for (auto &[id, task]: task_list) {
if (task->wake_up_msg == m.msg->msg && task.get() != m.sender) {
EVENT_MSG cpy;
cpy.msg = m.msg->msg;
va_copy(cpy.data, m.msg->data);
cur_message = &cpy;
switch_to_task(task.get());
va_end(cpy.data);
cur_message = NULL;
}
}
clean_task_table();
}
}
int add_task(int stack,TaskerFunctionName fcname,...) {
int id = get_new_task_id();
auto st = task_list.emplace(id, std::make_unique<TaskInfo>(id));
@ -75,6 +119,7 @@ int add_task(int stack,TaskerFunctionName fcname,...) {
va_start(args, fcname);
new_task->thr = std::thread([&]{
new_task->resume_flag.wait(false);
new_task->resume_flag = false;
fcname(args);
clean_up_current_task();
});
@ -93,17 +138,13 @@ char is_running(int id_num) {
return id_num < 0 || task_list.find(id_num) != task_list.end();
}
void unsuspend_task(EVENT_MSG *msg) {
for (auto &[id, task]: task_list) {
if (task->wake_up_msg == msg->msg) {
EVENT_MSG cpy;
cpy.msg = msg->msg;
va_copy(cpy.data, msg->data);
cur_message = &cpy;
switch_to_task(task.get());
va_end(cpy.data);
cur_message = NULL;
}
msg_queue.push({msg, current_task_inst});
if (current_task_inst) {
switch_to_task(NULL);
} else {
flush_message_queue();
}
}
void task_sleep(void) {
if (current_task_inst) {
@ -115,13 +156,14 @@ void task_sleep(void) {
switch_to_task(task.get());
}
}
clean_task_table();
}
}
EVENT_MSG *task_wait_event(int32_t event_number) {
if (current_task_inst == NULL) return NULL;
current_task_inst->wake_up_msg = event_number;
switch_to_task(NULL);
return NULL;
return cur_message;
}
int q_any_task() {
return task_list.size();