threaded pipe reading

This commit is contained in:
Nordup 2024-11-03 01:17:27 +04:00
parent 1b4681aee9
commit 741ca371cc
3 changed files with 82 additions and 62 deletions

View file

@ -7,6 +7,7 @@ class_name SandboxLogger
const LOG_FOLDER := "user://logs"
const LOG_FILE := "log.txt"
const PRINT_LOGS_ARG := "--sandbox-logs"
const BUFFER_SIZE = 2048
const FLUSH_DELAY = 5
var flush_timer: Timer
@ -16,9 +17,11 @@ var pipe: Dictionary
var gate: Gate
var print_logs: bool
var is_started: bool
var logs_sent: bool
var thread1: Thread = Thread.new()
var thread2: Thread = Thread.new()
func _ready() -> void:
gate_events.not_responding.connect(send_logs)
@ -28,25 +31,68 @@ func _ready() -> void:
func start(_pipe: Dictionary, _gate: Gate) -> void:
pipe = _pipe
gate = _gate
is_started = true
var path = LOG_FOLDER + "/" + get_folder_name(gate.url) + "/" + LOG_FILE
create_log_file()
start_reading_pipes()
start_flushing_logs()
func create_log_file() -> void:
var folder = gate.url.replace("http://", "").replace("https://", "").replace(".gate", "")
folder = folder.replace(":", "_") # remove ':' before port
var path = LOG_FOLDER + "/" + folder + "/" + LOG_FILE
var global_path = ProjectSettings.globalize_path(path)
DirAccess.make_dir_recursive_absolute(path.get_base_dir())
log_file = FileAccess.open(path, FileAccess.WRITE_READ)
Debug.logclr("Logs written to [url]%s[/url]" % [global_path], Color.GRAY)
# READING FROM PIPES
func start_reading_pipes() -> void:
thread1 = Thread.new()
thread2 = Thread.new()
thread1.start(read_stdio)
thread2.start(read_stderr)
func read_stdio() -> void:
var stdio = pipe["stdio"] as FileAccess
var buffer: PackedByteArray
start_flush_timer()
while stdio.is_open():
buffer = stdio.get_buffer(BUFFER_SIZE)
if not buffer.is_empty():
store_buffer.call_deferred(buffer)
func get_folder_name(url: String) -> String:
var folder = gate.url.replace("http://", "").replace("https://", "").replace(".gate", "")
folder = folder.replace(":", "_") # remove ':' before port
return folder
func read_stderr() -> void:
var stderr = pipe["stderr"] as FileAccess
var buffer: PackedByteArray
while stderr.is_open():
buffer = stderr.get_buffer(BUFFER_SIZE)
if not buffer.is_empty():
store_buffer.call_deferred(buffer)
func start_flush_timer() -> void:
func store_buffer(buffer: PackedByteArray) -> void:
if print_logs: printraw(buffer.get_string_from_utf8())
log_file.store_buffer(buffer)
func cleanup() -> void:
pipe["stdio"].close()
pipe["stderr"].close()
if thread1 != null: thread1.wait_to_finish()
if thread2 != null: thread2.wait_to_finish()
# FLUSH AND SEND LOGS
func start_flushing_logs() -> void:
flush_timer = Timer.new()
add_child(flush_timer)
flush_timer.timeout.connect(flush_logs)
@ -54,34 +100,12 @@ func start_flush_timer() -> void:
func flush_logs() -> void:
if not log_file.is_open(): return
if log_file == null or not log_file.is_open(): return
log_file.flush()
func _process(_delta: float) -> void:
if not is_started: return
if pipe["stdio"].is_open():
var buffer = PackedByteArray()
while true:
buffer.append_array(pipe["stdio"].get_buffer(2048))
if pipe["stdio"].get_error() != OK:
break;
while true:
buffer.append_array(pipe["stderr"].get_buffer(2048))
if pipe["stderr"].get_error() != OK:
break;
if !buffer.is_empty():
if print_logs: printraw(buffer.get_string_from_utf8())
log_file.store_buffer(buffer)
func send_logs() -> void:
if not is_started: return
if logs_sent: return
if log_file == null or logs_sent: return
logs_sent = true
flush_logs()

View file

@ -17,22 +17,28 @@ func _ready() -> void:
func start_sandbox(gate: Gate) -> void:
var pipe: Dictionary
match Platform.get_platform():
Platform.WINDOWS:
start_sandbox_windows(gate)
pipe = start_sandbox_windows(gate)
Platform.LINUX_BSD:
start_sandbox_linux(gate)
pipe = start_sandbox_linux(gate)
Platform.MACOS:
start_sandbox_macos(gate)
pipe = start_sandbox_macos(gate)
_:
assert(false, "Platform is not supported")
if pipe.is_empty(): return
snbx_pid = pipe["pid"]
snbx_logger.call_thread_safe("start", pipe, gate)
gate_events.gate_entered_emit()
func start_sandbox_linux(gate: Gate) -> void:
func start_sandbox_linux(gate: Gate) -> Dictionary:
if not snbx_executable.exists():
Debug.logerr("Sandbox executable not found at " + snbx_executable.path); return
Debug.logerr("Sandbox executable not found at " + snbx_executable.path); return {}
if not snbx_env.zip_exists():
Debug.logerr("Sandbox environment not found at " + snbx_env.zip_path); return
Debug.logerr("Sandbox environment not found at " + snbx_env.zip_path); return {}
snbx_env.create_env(snbx_executable.path, gate)
@ -42,18 +48,14 @@ func start_sandbox_linux(gate: Gate) -> void:
"--resolution", "%dx%d" % [render_result.width, render_result.height],
"--verbose"
]
Debug.logclr(snbx_env.start + " " + " ".join(args), Color.DIM_GRAY)
var pipe = OS.execute_with_pipe(snbx_env.start, args, false)
snbx_logger.start(pipe, gate)
snbx_pid = pipe["pid"]
gate_events.gate_entered_emit()
return OS.execute_with_pipe(snbx_env.start, args)
func start_sandbox_windows(gate: Gate) -> void:
func start_sandbox_windows(gate: Gate) -> Dictionary:
if not snbx_executable.exists():
Debug.logerr("Sandbox executable not found at " + snbx_executable.path); return
Debug.logerr("Sandbox executable not found at " + snbx_executable.path); return {}
DirAccess.make_dir_recursive_absolute(IPC_FOLDER) # TODO: move to snbx_env
@ -65,18 +67,14 @@ func start_sandbox_windows(gate: Gate) -> void:
"--verbose"
]
if not shared_libs.is_empty(): args += ["--gdext-libs-dir", shared_libs]
Debug.logclr(snbx_executable.path + " " + " ".join(args), Color.DIM_GRAY)
var pipe = OS.execute_with_pipe(snbx_executable.path, args, false)
snbx_logger.start(pipe, gate)
snbx_pid = pipe["pid"]
gate_events.gate_entered_emit()
return OS.execute_with_pipe(snbx_executable.path, args)
func start_sandbox_macos(gate: Gate) -> void:
func start_sandbox_macos(gate: Gate) -> Dictionary:
if not snbx_executable.exists():
Debug.logerr("Sandbox executable not found at " + snbx_executable.path); return
Debug.logerr("Sandbox executable not found at " + snbx_executable.path); return {}
var pack_file = ProjectSettings.globalize_path(gate.resource_pack)
var shared_libs = ProjectSettings.globalize_path(gate.shared_libs_dir)
@ -86,13 +84,9 @@ func start_sandbox_macos(gate: Gate) -> void:
"--verbose"
]
if not shared_libs.is_empty(): args += ["--gdext-libs-dir", shared_libs]
Debug.logclr(snbx_executable.path + " " + " ".join(args), Color.DIM_GRAY)
var pipe = OS.execute_with_pipe(snbx_executable.path, args, false)
snbx_logger.start(pipe, gate)
snbx_pid = pipe["pid"]
gate_events.gate_entered_emit()
return OS.execute_with_pipe(snbx_executable.path, args)
func kill_sandbox() -> void:
@ -105,6 +99,8 @@ func kill_sandbox() -> void:
kill_sandbox_macos()
_:
assert(false, "Platform is not supported")
snbx_logger.call_thread_safe("cleanup")
func kill_sandbox_linux() -> void: