Files
esengine/packages/editor-app/src-tauri/src/profiler_ws.rs

194 lines
6.7 KiB
Rust
Raw Normal View History

2025-10-15 22:30:49 +08:00
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, Mutex};
use tokio::task::JoinHandle;
use tokio_tungstenite::{accept_async, tungstenite::Message};
use futures_util::{SinkExt, StreamExt};
pub struct ProfilerServer {
tx: broadcast::Sender<String>,
port: u16,
shutdown_tx: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
task_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl ProfilerServer {
pub fn new(port: u16) -> Self {
let (tx, _) = broadcast::channel(100);
Self {
tx,
port,
shutdown_tx: Arc::new(Mutex::new(None)),
task_handle: Arc::new(Mutex::new(None)),
}
}
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
let addr = format!("127.0.0.1:{}", self.port);
let listener = TcpListener::bind(&addr).await?;
println!("[ProfilerServer] Listening on: {}", addr);
let tx = self.tx.clone();
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
// 存储 shutdown sender
*self.shutdown_tx.lock().await = Some(shutdown_tx);
// 启动服务器任务
let task = tokio::spawn(async move {
loop {
tokio::select! {
// 监听新连接
result = listener.accept() => {
match result {
Ok((stream, peer_addr)) => {
let tx = tx.clone();
tokio::spawn(handle_connection(stream, peer_addr, tx));
}
Err(e) => {
eprintln!("[ProfilerServer] Failed to accept connection: {}", e);
}
}
}
// 监听关闭信号
_ = &mut shutdown_rx => {
println!("[ProfilerServer] Received shutdown signal");
break;
}
}
}
println!("[ProfilerServer] Server task ending");
});
// 存储任务句柄
*self.task_handle.lock().await = Some(task);
Ok(())
}
pub async fn stop(&self) {
println!("[ProfilerServer] Stopping server...");
// 发送关闭信号
if let Some(shutdown_tx) = self.shutdown_tx.lock().await.take() {
let _ = shutdown_tx.send(());
}
// 等待任务完成
if let Some(handle) = self.task_handle.lock().await.take() {
let _ = handle.await;
}
println!("[ProfilerServer] Server stopped");
}
refactor(editor): 提取行为树编辑器为独立包并重构编辑器架构 (#216) * refactor(editor): 提取行为树编辑器为独立包并重构编辑器架构 * feat(editor): 添加插件市场功能 * feat(editor): 重构插件市场以支持版本管理和ZIP打包 * feat(editor): 重构插件发布流程并修复React渲染警告 * fix(plugin): 修复插件发布和市场的路径不一致问题 * feat: 重构插件发布流程并添加插件删除功能 * fix(editor): 完善插件删除功能并修复多个关键问题 * fix(auth): 修复自动登录与手动登录的竞态条件问题 * feat(editor): 重构插件管理流程 * feat(editor): 支持 ZIP 文件直接发布插件 - 新增 PluginSourceParser 解析插件源 - 重构发布流程支持文件夹和 ZIP 两种方式 - 优化发布向导 UI * feat(editor): 插件市场支持多版本安装 - 插件解压到项目 plugins 目录 - 新增 Tauri 后端安装/卸载命令 - 支持选择任意版本安装 - 修复打包逻辑,保留完整 dist 目录结构 * feat(editor): 个人中心支持多版本管理 - 合并同一插件的不同版本 - 添加版本历史展开/折叠功能 - 禁止有待审核 PR 时更新插件 * fix(editor): 修复 InspectorRegistry 服务注册 - InspectorRegistry 实现 IService 接口 - 注册到 Core.services 供插件使用 * feat(behavior-tree-editor): 完善插件注册和文件操作 - 添加文件创建模板和操作处理器 - 实现右键菜单创建行为树功能 - 修复文件读取权限问题(使用 Tauri 命令) - 添加 BehaviorTreeEditorPanel 组件 - 修复 rollup 配置支持动态导入 * feat(plugin): 完善插件构建和发布流程 * fix(behavior-tree-editor): 完整恢复编辑器并修复 Toast 集成 * fix(behavior-tree-editor): 修复节点选中、连线跟随和文件加载问题并优化性能 * fix(behavior-tree-editor): 修复端口连接失败问题并优化连线样式 * refactor(behavior-tree-editor): 移除调试面板功能简化代码结构 * refactor(behavior-tree-editor): 清理冗余代码合并重复逻辑 * feat(behavior-tree-editor): 完善编辑器核心功能增强扩展性 * fix(lint): 修复ESLint错误确保CI通过 * refactor(behavior-tree-editor): 优化编辑器工具栏和编译器功能 * refactor(behavior-tree-editor): 清理技术债务,优化代码质量 * fix(editor-app): 修复字符串替换安全问题
2025-11-18 14:46:51 +08:00
#[allow(dead_code)]
2025-10-15 22:30:49 +08:00
pub fn broadcast(&self, message: String) {
let _ = self.tx.send(message);
}
}
async fn handle_connection(
stream: TcpStream,
peer_addr: SocketAddr,
tx: broadcast::Sender<String>,
) {
let ws_stream = match accept_async(stream).await {
Ok(ws) => ws,
Err(e) => {
// 忽略非 WebSocket 连接的错误(如普通 HTTP 请求、健康检查等)
// 这些是正常现象,不需要输出错误日志
let error_str = e.to_string();
if !error_str.contains("Connection: upgrade") && !error_str.contains("protocol error") {
eprintln!("[ProfilerServer] WebSocket error: {}", e);
}
2025-10-15 22:30:49 +08:00
return;
}
};
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
let mut rx = tx.subscribe();
println!("[ProfilerServer] Client {} connected", peer_addr);
// Send initial connection confirmation
let _ = ws_sender
.send(Message::Text(
serde_json::json!({
"type": "connected",
"message": "Connected to ECS Editor Profiler"
})
.to_string(),
))
.await;
// Spawn task to forward broadcast messages to this client
let forward_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
if ws_sender.send(Message::Text(msg)).await.is_err() {
break;
}
}
});
// Handle incoming messages from client
while let Some(msg) = ws_receiver.next().await {
match msg {
Ok(Message::Text(text)) => {
2025-10-16 11:55:41 +08:00
// Parse incoming messages
2025-10-16 17:33:43 +08:00
if let Ok(mut json_value) = serde_json::from_str::<serde_json::Value>(&text) {
let msg_type = json_value.get("type").and_then(|t| t.as_str());
if msg_type == Some("debug_data") {
2025-10-16 11:55:41 +08:00
// Broadcast debug data from game client to all clients (including frontend)
2025-10-15 22:30:49 +08:00
tx.send(text).ok();
2025-10-16 17:33:43 +08:00
} else if msg_type == Some("ping") {
2025-10-15 22:30:49 +08:00
// Respond to ping
let _ = tx.send(
serde_json::json!({
"type": "pong",
"timestamp": chrono::Utc::now().timestamp_millis()
})
.to_string(),
);
2025-10-16 17:33:43 +08:00
} else if msg_type == Some("log") {
// Inject clientId into log messages
if let Some(data) = json_value.get_mut("data").and_then(|d| d.as_object_mut()) {
data.insert("clientId".to_string(), serde_json::Value::String(peer_addr.to_string()));
}
tx.send(json_value.to_string()).ok();
2025-10-16 11:55:41 +08:00
} else {
// Forward all other messages (like get_raw_entity_list, get_entity_details, etc.)
// to all connected clients (this enables frontend -> game client communication)
tx.send(text).ok();
2025-10-15 22:30:49 +08:00
}
}
}
Ok(Message::Close(_)) => {
println!("[ProfilerServer] Client {} disconnected", peer_addr);
break;
}
Ok(Message::Ping(data)) => {
// Respond to WebSocket ping
tx.send(
serde_json::json!({
"type": "pong",
"data": String::from_utf8_lossy(&data)
})
.to_string(),
)
.ok();
}
Err(e) => {
eprintln!("[ProfilerServer] Error: {}", e);
break;
}
_ => {}
}
}
forward_task.abort();
println!("[ProfilerServer] Connection handler ended for {}", peer_addr);
}