网络层完善消息队列

This commit is contained in:
YHH
2025-09-04 16:25:18 +08:00
parent 20a3f03e12
commit 4137eb2bce
3 changed files with 251 additions and 13 deletions

View File

@@ -33,6 +33,11 @@ export interface NetworkClientConfig {
enableCompression: boolean;
enableMessageQueue: boolean;
};
messageQueue?: {
maxSize: number;
flushOnAuthentication: boolean;
processInterval: number;
};
authentication: {
autoAuthenticate: boolean;
credentials?: Record<string, any>;
@@ -135,6 +140,12 @@ export class NetworkClient extends EventEmitter {
enableMessageQueue: true,
...config.features
},
messageQueue: {
maxSize: 100,
flushOnAuthentication: true,
processInterval: 1000,
...config.messageQueue
},
authentication: {
autoAuthenticate: true,
...config.authentication
@@ -270,21 +281,67 @@ export class NetworkClient extends EventEmitter {
* 发送消息
*/
send<T extends INetworkMessage>(message: T): boolean {
if (!this.transport || this.state !== ClientState.Connected && this.state !== ClientState.Authenticated) {
// 验证消息基本格式
if (!this.validateMessage(message)) {
this.logger.warn('消息格式无效,发送失败');
this.stats.messages.errors++;
return false;
}
// 根据状态决定发送策略
switch (this.state) {
case ClientState.Authenticated:
// 已认证,直接发送
return this.sendImmediate(message);
case ClientState.Connected:
case ClientState.Connecting:
// 已连接但未认证,缓存消息
if (this.config.features.enableMessageQueue) {
this.queueMessage(message);
this.logger.debug('消息已缓存,等待认证完成');
return true;
} else {
this.logger.warn('客户端未连接,无法发送消息');
this.logger.warn('未启用消息队列,消息被丢弃');
return false;
}
case ClientState.Reconnecting:
// 重连中,缓存消息
if (this.config.features.enableMessageQueue) {
this.queueMessage(message);
this.logger.debug('重连中,消息已缓存');
return true;
} else {
this.logger.warn('重连中且未启用消息队列,消息被丢弃');
return false;
}
default:
this.logger.warn(`客户端状态 ${this.state},无法发送消息`);
return false;
}
}
/**
* 立即发送消息(不进行状态检查)
*/
private sendImmediate<T extends INetworkMessage>(message: T): boolean {
if (!this.transport) {
this.logger.error('传输层未初始化');
return false;
}
try {
const serializedMessage = this.serializer.serialize(message);
this.transport.send(serializedMessage.data);
// 确保发送的数据类型正确
const dataToSend = typeof serializedMessage.data === 'string'
? serializedMessage.data
: serializedMessage.data.toString();
this.transport.send(dataToSend);
this.stats.messages.sent++;
this.logger.debug(`消息发送成功: ${message.type}`);
return true;
} catch (error) {
@@ -295,6 +352,46 @@ export class NetworkClient extends EventEmitter {
}
}
/**
* 验证消息格式
*/
private validateMessage<T extends INetworkMessage>(message: T): boolean {
if (!message) {
return false;
}
if (!message.messageId) {
this.logger.warn('消息缺少messageId');
return false;
}
if (!message.type) {
this.logger.warn('消息缺少type');
return false;
}
if (!message.timestamp) {
this.logger.warn('消息缺少timestamp');
return false;
}
// 对于游戏消息验证senderId
if (message.type === MessageType.GAME_EVENT) {
if (!message.senderId) {
this.logger.warn('游戏消息缺少senderId');
return false;
}
// 检查senderId是否与当前clientId一致
if (this.clientId && message.senderId !== this.clientId) {
this.logger.warn(`消息发送者ID不匹配: 期望 ${this.clientId}, 实际 ${message.senderId}`);
return false;
}
}
return true;
}
/**
* 获取客户端状态
*/
@@ -412,9 +509,42 @@ export class NetworkClient extends EventEmitter {
this.stats.state = newState;
this.logger.debug(`客户端状态变化: ${oldState} -> ${newState}`);
// 状态变更的副作用处理
this.handleStateTransition(oldState, newState);
this.eventHandlers.stateChanged?.(oldState, newState);
}
/**
* 处理状态转换的副作用
*/
private handleStateTransition(oldState: ClientState, newState: ClientState): void {
switch (newState) {
case ClientState.Authenticated:
// 认证成功后自动处理消息队列
if (oldState !== ClientState.Authenticated) {
this.logger.info('认证完成,处理缓存消息队列');
this.processMessageQueue();
}
break;
case ClientState.Disconnected:
// 断开连接时清理资源
if (this.config.features.enableMessageQueue && this.messageQueue.length > 0) {
this.logger.info(`连接断开,清理 ${this.messageQueue.length} 条缓存消息`);
this.clearMessageQueue();
}
this.clientId = undefined;
break;
case ClientState.Connecting:
// 连接开始时重置统计
this.stats.connectionTime = Date.now();
break;
}
}
/**
* 设置事件处理器
*/
@@ -637,10 +767,18 @@ export class NetworkClient extends EventEmitter {
* 将消息加入队列
*/
private queueMessage(message: INetworkMessage): void {
// 检查队列大小限制
const maxSize = this.config.messageQueue?.maxSize || 100;
if (this.messageQueue.length >= maxSize) {
// 移除最旧的消息
const removed = this.messageQueue.shift();
this.logger.warn(`消息队列已满 (${maxSize}),移除最旧消息:`, removed?.type);
}
this.messageQueue.push(message);
this.stats.messages.queued = this.messageQueue.length;
this.logger.debug(`消息已加入队列,当前队列长度: ${this.messageQueue.length}`);
this.logger.debug(`消息已加入队列,当前队列长度: ${this.messageQueue.length}/${maxSize}`);
}
/**
@@ -651,20 +789,43 @@ export class NetworkClient extends EventEmitter {
return;
}
if (this.state !== ClientState.Authenticated) {
this.logger.debug('未认证,跳过消息队列处理');
return;
}
this.isProcessingQueue = true;
const startQueueSize = this.messageQueue.length;
this.logger.info(`开始处理消息队列,共 ${startQueueSize} 条消息`);
try {
while (this.messageQueue.length > 0 && this.isConnected()) {
let processedCount = 0;
let failedCount = 0;
while (this.messageQueue.length > 0 && this.state === ClientState.Authenticated) {
const message = this.messageQueue.shift()!;
if (this.send(message)) {
this.logger.debug(`队列消息发送成功,剩余: ${this.messageQueue.length}`);
// 使用sendImmediate避免递归调用
if (this.sendImmediate(message)) {
processedCount++;
this.logger.debug(`队列消息发送成功 [${processedCount}/${startQueueSize}]`);
} else {
failedCount++;
// 发送失败,重新加入队列头部
this.messageQueue.unshift(message);
this.logger.warn(`队列消息发送失败,剩余: ${this.messageQueue.length}`);
break;
}
// 避免阻塞,每处理一定数量消息后暂停
if (processedCount % 10 === 0) {
await new Promise(resolve => setTimeout(resolve, 1));
}
}
this.logger.info(`消息队列处理完成: 成功 ${processedCount}, 失败 ${failedCount}, 剩余 ${this.messageQueue.length}`);
} finally {
this.isProcessingQueue = false;
this.stats.messages.queued = this.messageQueue.length;
@@ -695,4 +856,46 @@ export class NetworkClient extends EventEmitter {
public getHeartbeatStats() {
return this.heartbeatManager.getStats();
}
/**
* 检查客户端是否已认证并可以发送消息
*/
public isReady(): boolean {
return this.state === ClientState.Authenticated && !!this.clientId;
}
/**
* 获取消息队列状态
*/
public getQueueStatus(): { length: number; isProcessing: boolean } {
return {
length: this.messageQueue.length,
isProcessing: this.isProcessingQueue
};
}
/**
* 手动触发消息队列处理(调试用)
*/
public flushMessageQueue(): void {
if (this.state === ClientState.Authenticated) {
this.processMessageQueue();
} else {
this.logger.warn('客户端未认证,无法处理消息队列');
}
}
/**
* 设置消息队列最大大小
*/
public setMaxQueueSize(size: number): void {
if (size > 0) {
// 如果新大小比当前队列小,截取队列
if (size < this.messageQueue.length) {
const removed = this.messageQueue.length - size;
this.messageQueue.splice(0, removed);
this.logger.info(`消息队列大小调整为 ${size},移除了 ${removed} 条最旧消息`);
}
}
}
}

View File

@@ -531,7 +531,7 @@ export class NetworkServer extends EventEmitter {
// 反序列化消息
const deserializationResult = this.serializer.deserialize<INetworkMessage>(data);
if (!deserializationResult.isValid) {
this.logger.warn(`消息反序列化失败: ${deserializationResult.errors?.join(', ')}`);
this.logger.debug(`消息反序列化失败 (${clientId}): ${deserializationResult.errors?.join(', ')}`);
this.stats.messages.errors++;
return;
}

View File

@@ -327,6 +327,12 @@ export class WebSocketTransport extends EventEmitter implements ITransport {
*/
private handleClientMessage(clientId: string, data: WebSocket.Data): void {
try {
// 检查是否为有效的应用消息
if (!this.isApplicationMessage(data)) {
this.logger.debug(`忽略非应用消息 (${clientId}): ${typeof data} ${data instanceof ArrayBuffer ? data.byteLength : data.toString().length} bytes`);
return;
}
const message = data instanceof ArrayBuffer ? data : new TextEncoder().encode(data.toString()).buffer;
// 触发消息事件
@@ -344,6 +350,35 @@ export class WebSocketTransport extends EventEmitter implements ITransport {
}
}
/**
* 检查是否为有效的应用消息
*/
private isApplicationMessage(data: WebSocket.Data): boolean {
try {
// 转换为字符串进行检查
const jsonString = data instanceof ArrayBuffer
? new TextDecoder().decode(data)
: data.toString();
// 基本长度检查 - 空消息或过短消息通常不是应用消息
if (!jsonString || jsonString.length < 10) {
return false;
}
// 尝试解析JSON
const parsed = JSON.parse(jsonString);
// 检查是否有基本的消息结构
return parsed &&
typeof parsed === 'object' &&
(parsed.type || parsed.messageId || parsed.data);
} catch (error) {
// JSON解析失败可能是握手数据或其他非JSON消息
return false;
}
}
/**
* 处理客户端断开连接
*/