// Source: esengine/packages/network-client/src/transport/WebSocketClientTransport.ts
// Snapshot: 2025-08-12 09:39:07 +08:00 (282 lines, 7.3 KiB, TypeScript)
/**
 * WebSocket client transport implementation.
 */
import { Core, ITimer } from '@esengine/ecs-framework';
import {
ClientTransport,
ClientTransportConfig,
ConnectionState,
ClientMessage
} from './ClientTransport';
/**
 * Configuration for the WebSocket client transport.
 *
 * Extends the base client transport configuration with WebSocket-specific
 * options. All fields declared here are optional.
 */
export interface WebSocketClientConfig extends ClientTransportConfig {
/** WebSocket path appended after the host and port when the connection URL is built. */
path?: string;
/** Sub-protocol name(s) passed as the second argument of the WebSocket constructor. */
protocols?: string | string[];
/** Additional request headers. NOTE(review): not read by the transport code visible in this file - confirm where it is consumed. */
headers?: Record<string, string>;
/** Value assigned to the socket's `binaryType`, i.e. how binary frames are delivered. */
binaryType?: 'blob' | 'arraybuffer';
/** WebSocket extensions. NOTE(review): not read by the transport code visible in this file - confirm where it is consumed. */
extensions?: any;
}
/**
 * Client transport that exchanges JSON-encoded messages with a server over a
 * WebSocket connection.
 *
 * State tracking, reconnection, message queueing, statistics and event
 * emission are delegated to the `ClientTransport` base class; this class
 * owns the socket itself and the connection-timeout timer.
 */
export class WebSocketClientTransport extends ClientTransport {
    /** The socket currently owned by this transport, or `null` when there is none. */
    private websocket: WebSocket | null = null;
    /** Promise of the connection attempt in flight (or last established), if any. */
    private connectionPromise: Promise<void> | null = null;
    /** Timer that aborts a connection attempt which takes too long. */
    private connectionTimeoutTimer: ITimer<any> | null = null;
    protected override config: WebSocketClientConfig;

    /**
     * @param config Transport configuration. WebSocket-specific options that
     * are absent fall back to: path `/ws`, no sub-protocols, no headers and
     * `arraybuffer` binary frames.
     */
    constructor(config: WebSocketClientConfig) {
        super(config);
        // NOTE(review): this replaces whatever the base constructor stored in
        // `config`; if the base class applies its own defaults there (e.g. for
        // connectionTimeout / maxReconnectAttempts) they are not carried over
        // here - confirm against ClientTransport.
        this.config = {
            path: '/ws',
            protocols: [],
            headers: {},
            binaryType: 'arraybuffer',
            ...config
        };
    }

    /**
     * Opens the WebSocket connection.
     *
     * When a connection is already being established or is established, the
     * existing attempt's promise (or a resolved promise) is returned instead
     * of opening a second socket.
     *
     * @returns A promise that resolves once the socket is open.
     * @throws Rejects with `Connection timeout`, `WebSocket error`,
     * `Connection failed: <reason>`, or whatever the WebSocket constructor
     * threw, when the socket cannot be opened.
     */
    async connect(): Promise<void> {
        if (this.state === ConnectionState.CONNECTING ||
            this.state === ConnectionState.CONNECTED) {
            return this.connectionPromise || Promise.resolve();
        }

        this.setState(ConnectionState.CONNECTING);
        this.stopReconnect(); // stop any reconnect that is in progress
        // A timer left over from an earlier attempt must not close the new socket.
        this.clearConnectionTimeout();

        const attempt = new Promise<void>((resolve, reject) => {
            try {
                // Build the WebSocket URL.
                const scheme = this.config.secure ? 'wss' : 'ws';
                const url = `${scheme}://${this.config.host}:${this.config.port}${this.config.path}`;

                // Create the socket. Handlers below close over `socket` so
                // that events from a replaced socket can be told apart from
                // events of the current one.
                const socket = new WebSocket(url, this.config.protocols);
                this.websocket = socket;
                if (this.config.binaryType) {
                    socket.binaryType = this.config.binaryType;
                }

                // Connection timeout.
                // NOTE(review): the division suggests connectionTimeout is in
                // milliseconds and Core.schedule takes seconds - confirm.
                this.connectionTimeoutTimer = Core.schedule(this.config.connectionTimeout! / 1000, false, this, () => {
                    this.connectionTimeoutTimer = null;
                    if (this.websocket === socket && socket.readyState === WebSocket.CONNECTING) {
                        socket.close();
                        reject(new Error('Connection timeout'));
                    }
                });

                socket.onopen = () => {
                    if (this.websocket !== socket) {
                        // The transport no longer owns this socket.
                        return;
                    }
                    this.clearConnectionTimeout();
                    this.setState(ConnectionState.CONNECTED);
                    resolve();
                };

                socket.onclose = (event) => {
                    const failure = new Error(`Connection failed: ${event.reason || 'Unknown error'}`);
                    if (this.isSuperseded(socket)) {
                        // A newer socket owns the transport state; only settle
                        // this attempt's promise and leave everything else alone.
                        reject(failure);
                        return;
                    }
                    this.clearConnectionTimeout();
                    this.handleClose(event.code, event.reason);
                    // Settling an already-settled promise is a no-op, so this
                    // only has an effect when the socket closed before it ever
                    // opened. Rejecting unconditionally (rather than checking
                    // the state, which handleClose has just changed) guarantees
                    // that connect() cannot hang.
                    reject(failure);
                };

                socket.onerror = () => {
                    if (this.isSuperseded(socket)) {
                        return;
                    }
                    this.clearConnectionTimeout();
                    // Read the state before handleError(), which may change it.
                    const wasConnecting = this.state === ConnectionState.CONNECTING;
                    const error = new Error('WebSocket error');
                    this.handleError(error);
                    if (wasConnecting) {
                        reject(error);
                    }
                };

                socket.onmessage = (event) => {
                    this.handleWebSocketMessage(event);
                };
            } catch (error) {
                this.clearConnectionTimeout();
                this.setState(ConnectionState.ERROR);
                reject(error);
            }
        });

        this.connectionPromise = attempt;
        return attempt;
    }

    /**
     * Closes the connection on behalf of the client.
     *
     * Stops any reconnect and the connection-timeout timer, marks the
     * transport as disconnected and closes the socket with code 1000.
     */
    async disconnect(): Promise<void> {
        this.stopReconnect();
        this.clearConnectionTimeout();

        const socket = this.websocket;
        if (socket) {
            // Mark as disconnected first so the close is not treated as a
            // connection loss that should trigger a reconnect.
            this.setState(ConnectionState.DISCONNECTED);
            if (socket.readyState === WebSocket.OPEN ||
                socket.readyState === WebSocket.CONNECTING) {
                socket.close(1000, 'Client disconnect');
            }
            this.websocket = null;
        }
        this.connectionPromise = null;
    }

    /**
     * Sends a message as a JSON text frame.
     *
     * While the socket is not open, the message is queued if the transport is
     * connecting or reconnecting, and dropped otherwise.
     *
     * @param message Message to send; a timestamp is filled in when missing.
     * @returns `true` when the message was handed to the socket, the result of
     * `queueMessage` when it was queued, `false` when it was dropped or
     * sending failed.
     */
    async sendMessage(message: ClientMessage): Promise<boolean> {
        const socket = this.websocket;
        if (!socket || socket.readyState !== WebSocket.OPEN) {
            // Not connected: queue only while a connection is being (re)established.
            if (this.state === ConnectionState.CONNECTING ||
                this.state === ConnectionState.RECONNECTING) {
                return this.queueMessage(message);
            }
            return false;
        }

        try {
            // Serialize the message.
            const serialized = JSON.stringify({
                ...message,
                timestamp: message.timestamp || Date.now()
            });
            // Send it.
            socket.send(serialized);
            this.updateSendStats(message);
            return true;
        } catch (error) {
            // Anything can be thrown; make sure handleError receives an Error.
            this.handleError(error instanceof Error ? error : new Error(String(error)));
            return false;
        }
    }

    /**
     * Converts an incoming WebSocket frame to text and forwards it to
     * `processMessage`.
     *
     * @param event The `message` event raised by the socket.
     */
    private handleWebSocketMessage(event: MessageEvent): void {
        try {
            let text: string;
            if (event.data instanceof ArrayBuffer) {
                // Binary frame delivered as an ArrayBuffer.
                text = new TextDecoder().decode(event.data);
            } else if (event.data instanceof Blob) {
                // A Blob can only be read asynchronously; report a failed read
                // instead of leaving an unhandled rejection.
                event.data.text()
                    .then(blobText => {
                        this.processMessage(blobText);
                    })
                    .catch((error: unknown) => {
                        console.error('Error processing WebSocket message:', error);
                    });
                return;
            } else {
                // Text frame.
                text = event.data;
            }
            this.processMessage(text);
        } catch (error) {
            console.error('Error processing WebSocket message:', error);
        }
    }

    /**
     * Parses a JSON payload and hands the resulting message to
     * `handleMessage`. Parse failures and non-object payloads are logged and
     * otherwise ignored.
     *
     * @param data JSON text received from the server.
     */
    private processMessage(data: string): void {
        try {
            const parsed: unknown = JSON.parse(data);
            if (typeof parsed !== 'object' || parsed === null) {
                throw new Error('Message payload is not a JSON object');
            }
            this.handleMessage(parsed as ClientMessage);
        } catch (error) {
            console.error('Error parsing message:', error);
        }
    }

    /**
     * Reacts to the socket having closed: drops the socket reference, emits
     * `disconnected` and decides whether to reconnect.
     *
     * @param code Close code reported by the socket.
     * @param reason Close reason reported by the socket (may be empty).
     */
    private handleClose(code: number, reason: string): void {
        const wasConnected = this.isConnected();
        this.websocket = null;
        this.connectionPromise = null;

        if (code === 1000) {
            // Normal closure: do not reconnect.
            this.setState(ConnectionState.DISCONNECTED);
            this.emit('disconnected', reason || 'Normal closure');
            return;
        }

        if (wasConnected && this.reconnectAttempts < this.config.maxReconnectAttempts!) {
            // Abnormal closure of an established connection: try to reconnect.
            this.emit('disconnected', reason || `Abnormal closure (${code})`);
            this.startReconnect();
            return;
        }

        // Reconnect attempts exhausted, or the connection was never established.
        this.setState(ConnectionState.DISCONNECTED);
        this.emit('disconnected', reason || `Connection lost (${code})`);
    }

    /**
     * Stops the connection-timeout timer, if one is pending, and forgets it.
     */
    private clearConnectionTimeout(): void {
        if (this.connectionTimeoutTimer) {
            this.connectionTimeoutTimer.stop();
            this.connectionTimeoutTimer = null;
        }
    }

    /**
     * Tells whether `socket` has been replaced by a different, newer socket.
     * A socket that was merely detached (the transport currently owns no
     * socket) does not count as superseded.
     */
    private isSuperseded(socket: WebSocket): boolean {
        return this.websocket !== null && this.websocket !== socket;
    }

    /**
     * @returns The socket's `readyState`, or `WebSocket.CLOSED` when the
     * transport owns no socket.
     */
    getReadyState(): number {
        return this.websocket?.readyState ?? WebSocket.CLOSED;
    }

    /**
     * @returns The underlying WebSocket instance, or `null` when there is none.
     */
    getWebSocket(): WebSocket | null {
        return this.websocket;
    }

    /**
     * @returns `true` when a global `WebSocket` implementation is available.
     */
    static isSupported(): boolean {
        return typeof WebSocket !== 'undefined';
    }

    /**
     * Releases the transport: stops the timeout timer, disconnects and then
     * runs the base class clean-up.
     */
    override destroy(): void {
        this.clearConnectionTimeout();
        // disconnect() contains no awaits, so its work is finished by the time
        // it returns; `void` marks the returned promise as deliberately ignored.
        void this.disconnect();
        super.destroy();
    }
}