diff --git a/packages/network-client/src/core/NetworkClient.ts b/packages/network-client/src/core/NetworkClient.ts index 91b953b5..7d9d5bb4 100644 --- a/packages/network-client/src/core/NetworkClient.ts +++ b/packages/network-client/src/core/NetworkClient.ts @@ -33,6 +33,11 @@ export interface NetworkClientConfig { enableCompression: boolean; enableMessageQueue: boolean; }; + messageQueue?: { + maxSize: number; + flushOnAuthentication: boolean; + processInterval: number; + }; authentication: { autoAuthenticate: boolean; credentials?: Record; @@ -135,6 +140,12 @@ export class NetworkClient extends EventEmitter { enableMessageQueue: true, ...config.features }, + messageQueue: { + maxSize: 100, + flushOnAuthentication: true, + processInterval: 1000, + ...config.messageQueue + }, authentication: { autoAuthenticate: true, ...config.authentication @@ -270,21 +281,67 @@ export class NetworkClient extends EventEmitter { * 发送消息 */ send(message: T): boolean { - if (!this.transport || this.state !== ClientState.Connected && this.state !== ClientState.Authenticated) { - if (this.config.features.enableMessageQueue) { - this.queueMessage(message); - return true; - } else { - this.logger.warn('客户端未连接,无法发送消息'); + // 验证消息基本格式 + if (!this.validateMessage(message)) { + this.logger.warn('消息格式无效,发送失败'); + this.stats.messages.errors++; + return false; + } + + // 根据状态决定发送策略 + switch (this.state) { + case ClientState.Authenticated: + // 已认证,直接发送 + return this.sendImmediate(message); + + case ClientState.Connected: + case ClientState.Connecting: + // 已连接但未认证,缓存消息 + if (this.config.features.enableMessageQueue) { + this.queueMessage(message); + this.logger.debug('消息已缓存,等待认证完成'); + return true; + } else { + this.logger.warn('未启用消息队列,消息被丢弃'); + return false; + } + + case ClientState.Reconnecting: + // 重连中,缓存消息 + if (this.config.features.enableMessageQueue) { + this.queueMessage(message); + this.logger.debug('重连中,消息已缓存'); + return true; + } else { + this.logger.warn('重连中且未启用消息队列,消息被丢弃'); + return false; + } + + default: + this.logger.warn(`客户端状态 ${this.state},无法发送消息`); return false; - } + } + } + + /** + * 立即发送消息(不进行状态检查) + */ + private sendImmediate(message: T): boolean { + if (!this.transport) { + this.logger.error('传输层未初始化'); + return false; } try { const serializedMessage = this.serializer.serialize(message); - this.transport.send(serializedMessage.data); + // 确保发送的数据类型正确 + const dataToSend = typeof serializedMessage.data === 'string' + ? serializedMessage.data + : serializedMessage.data.toString(); + this.transport.send(dataToSend); this.stats.messages.sent++; + this.logger.debug(`消息发送成功: ${message.type}`); return true; } catch (error) { @@ -295,6 +352,46 @@ export class NetworkClient extends EventEmitter { } } + /** + * 验证消息格式 + */ + private validateMessage(message: T): boolean { + if (!message) { + return false; + } + + if (!message.messageId) { + this.logger.warn('消息缺少messageId'); + return false; + } + + if (!message.type) { + this.logger.warn('消息缺少type'); + return false; + } + + if (!message.timestamp) { + this.logger.warn('消息缺少timestamp'); + return false; + } + + // 对于游戏消息,验证senderId + if (message.type === MessageType.GAME_EVENT) { + if (!message.senderId) { + this.logger.warn('游戏消息缺少senderId'); + return false; + } + + // 检查senderId是否与当前clientId一致 + if (this.clientId && message.senderId !== this.clientId) { + this.logger.warn(`消息发送者ID不匹配: 期望 ${this.clientId}, 实际 ${message.senderId}`); + return false; + } + } + + return true; + } + /** * 获取客户端状态 */ @@ -412,9 +509,42 @@ export class NetworkClient extends EventEmitter { this.stats.state = newState; this.logger.debug(`客户端状态变化: ${oldState} -> ${newState}`); + + // 状态变更的副作用处理 + this.handleStateTransition(oldState, newState); + this.eventHandlers.stateChanged?.(oldState, newState); } + /** + * 处理状态转换的副作用 + */ + private handleStateTransition(oldState: ClientState, newState: ClientState): void { + switch (newState) { + case ClientState.Authenticated: + // 认证成功后自动处理消息队列 + if (oldState !== ClientState.Authenticated) { + this.logger.info('认证完成,处理缓存消息队列'); + this.processMessageQueue(); + } + break; + + case ClientState.Disconnected: + // 断开连接时清理资源 + if (this.config.features.enableMessageQueue && this.messageQueue.length > 0) { + this.logger.info(`连接断开,清理 ${this.messageQueue.length} 条缓存消息`); + this.clearMessageQueue(); + } + this.clientId = undefined; + break; + + case ClientState.Connecting: + // 连接开始时重置统计 + this.stats.connectionTime = Date.now(); + break; + } + } + /** * 设置事件处理器 */ @@ -637,10 +767,18 @@ export class NetworkClient extends EventEmitter { * 将消息加入队列 */ private queueMessage(message: INetworkMessage): void { + // 检查队列大小限制 + const maxSize = this.config.messageQueue?.maxSize || 100; + if (this.messageQueue.length >= maxSize) { + // 移除最旧的消息 + const removed = this.messageQueue.shift(); + this.logger.warn(`消息队列已满 (${maxSize}),移除最旧消息:`, removed?.type); + } + this.messageQueue.push(message); this.stats.messages.queued = this.messageQueue.length; - this.logger.debug(`消息已加入队列,当前队列长度: ${this.messageQueue.length}`); + this.logger.debug(`消息已加入队列,当前队列长度: ${this.messageQueue.length}/${maxSize}`); } /** @@ -651,20 +789,43 @@ export class NetworkClient extends EventEmitter { return; } + if (this.state !== ClientState.Authenticated) { + this.logger.debug('未认证,跳过消息队列处理'); + return; + } + this.isProcessingQueue = true; + const startQueueSize = this.messageQueue.length; + + this.logger.info(`开始处理消息队列,共 ${startQueueSize} 条消息`); try { - while (this.messageQueue.length > 0 && this.isConnected()) { + let processedCount = 0; + let failedCount = 0; + + while (this.messageQueue.length > 0 && this.state === ClientState.Authenticated) { const message = this.messageQueue.shift()!; - if (this.send(message)) { - this.logger.debug(`队列消息发送成功,剩余: ${this.messageQueue.length}`); + // 使用sendImmediate避免递归调用 + if (this.sendImmediate(message)) { + processedCount++; + this.logger.debug(`队列消息发送成功 [${processedCount}/${startQueueSize}]`); } else { + failedCount++; // 发送失败,重新加入队列头部 this.messageQueue.unshift(message); + this.logger.warn(`队列消息发送失败,剩余: ${this.messageQueue.length}`); break; } + + // 避免阻塞,每处理一定数量消息后暂停 + if (processedCount % 10 === 0) { + await new Promise(resolve => setTimeout(resolve, 1)); + } } + + this.logger.info(`消息队列处理完成: 成功 ${processedCount}, 失败 ${failedCount}, 剩余 ${this.messageQueue.length}`); + } finally { this.isProcessingQueue = false; this.stats.messages.queued = this.messageQueue.length; @@ -695,4 +856,46 @@ export class NetworkClient extends EventEmitter { public getHeartbeatStats() { return this.heartbeatManager.getStats(); } + + /** + * 检查客户端是否已认证并可以发送消息 + */ + public isReady(): boolean { + return this.state === ClientState.Authenticated && !!this.clientId; + } + + /** + * 获取消息队列状态 + */ + public getQueueStatus(): { length: number; isProcessing: boolean } { + return { + length: this.messageQueue.length, + isProcessing: this.isProcessingQueue + }; + } + + /** + * 手动触发消息队列处理(调试用) + */ + public flushMessageQueue(): void { + if (this.state === ClientState.Authenticated) { + this.processMessageQueue(); + } else { + this.logger.warn('客户端未认证,无法处理消息队列'); + } + } + + /** + * 设置消息队列最大大小 + */ + public setMaxQueueSize(size: number): void { + if (size > 0) { + // 如果新大小比当前队列小,截取队列 + if (size < this.messageQueue.length) { + const removed = this.messageQueue.length - size; + this.messageQueue.splice(0, removed); + this.logger.info(`消息队列大小调整为 ${size},移除了 ${removed} 条最旧消息`); + } + } + } } \ No newline at end of file diff --git a/packages/network-server/src/core/NetworkServer.ts b/packages/network-server/src/core/NetworkServer.ts index fc28d094..253e5db7 100644 --- a/packages/network-server/src/core/NetworkServer.ts +++ b/packages/network-server/src/core/NetworkServer.ts @@ -531,7 +531,7 @@ export class NetworkServer extends EventEmitter { // 反序列化消息 const deserializationResult = this.serializer.deserialize(data); if (!deserializationResult.isValid) { - this.logger.warn(`消息反序列化失败: ${deserializationResult.errors?.join(', ')}`); + this.logger.debug(`消息反序列化失败 (${clientId}): ${deserializationResult.errors?.join(', ')}`); this.stats.messages.errors++; return; } diff --git a/packages/network-server/src/transport/WebSocketTransport.ts b/packages/network-server/src/transport/WebSocketTransport.ts index b9a622a2..5aff3c78 100644 --- a/packages/network-server/src/transport/WebSocketTransport.ts +++ b/packages/network-server/src/transport/WebSocketTransport.ts @@ -327,6 +327,12 @@ export class WebSocketTransport extends EventEmitter implements ITransport { */ private handleClientMessage(clientId: string, data: WebSocket.Data): void { try { + // 检查是否为有效的应用消息 + if (!this.isApplicationMessage(data)) { + this.logger.debug(`忽略非应用消息 (${clientId}): ${typeof data} ${data instanceof ArrayBuffer ? data.byteLength : data.toString().length} bytes`); + return; + } + const message = data instanceof ArrayBuffer ? data : new TextEncoder().encode(data.toString()).buffer; // 触发消息事件 @@ -344,6 +350,35 @@ export class WebSocketTransport extends EventEmitter implements ITransport { } } + /** + * 检查是否为有效的应用消息 + */ + private isApplicationMessage(data: WebSocket.Data): boolean { + try { + // 转换为字符串进行检查 + const jsonString = data instanceof ArrayBuffer + ? new TextDecoder().decode(data) + : data.toString(); + + // 基本长度检查 - 空消息或过短消息通常不是应用消息 + if (!jsonString || jsonString.length < 10) { + return false; + } + + // 尝试解析JSON + const parsed = JSON.parse(jsonString); + + // 检查是否有基本的消息结构 + return parsed && + typeof parsed === 'object' && + (parsed.type || parsed.messageId || parsed.data); + + } catch (error) { + // JSON解析失败,可能是握手数据或其他非JSON消息 + return false; + } + } + /** * 处理客户端断开连接 */