Files
esengine/packages/network-client/src/core/MessageQueue.ts

680 lines
18 KiB
TypeScript
Raw Normal View History

/**
*
*
*/
import { createLogger, ITimer } from '@esengine/ecs-framework';
import { INetworkMessage, MessageType } from '@esengine/network-shared';
import { NetworkTimerManager } from '../utils';
/**
*
*/
export enum MessagePriority {
Low = 1,
Normal = 5,
High = 8,
Critical = 10
}
/**
*
*/
export interface QueuedMessage {
id: string;
message: INetworkMessage;
priority: MessagePriority;
timestamp: number;
retryCount: number;
maxRetries: number;
reliable: boolean;
timeoutMs?: number;
callback?: (success: boolean, error?: Error) => void;
}
/**
*
*/
export interface MessageQueueConfig {
maxQueueSize: number;
maxRetries: number;
retryDelay: number;
processingInterval: number;
enablePriority: boolean;
enableReliableDelivery: boolean;
defaultTimeout: number;
}
/**
*
*/
export interface QueueStats {
totalQueued: number;
totalProcessed: number;
totalFailed: number;
currentSize: number;
averageProcessingTime: number;
messagesByPriority: Record<MessagePriority, number>;
reliableMessages: number;
expiredMessages: number;
}
/**
*
*/
export interface MessageQueueEvents {
messageQueued: (message: QueuedMessage) => void;
messageProcessed: (message: QueuedMessage, success: boolean) => void;
messageFailed: (message: QueuedMessage, error: Error) => void;
messageExpired: (message: QueuedMessage) => void;
queueFull: (droppedMessage: QueuedMessage) => void;
}
/**
*
*/
export class MessageQueue {
private logger = createLogger('MessageQueue');
private config: MessageQueueConfig;
private stats: QueueStats;
// 队列存储
private primaryQueue: QueuedMessage[] = [];
private priorityQueues: Map<MessagePriority, QueuedMessage[]> = new Map();
private retryQueue: QueuedMessage[] = [];
private processingMap: Map<string, QueuedMessage> = new Map();
// 定时器
private processingTimer?: ITimer;
private retryTimer?: ITimer;
private cleanupTimer?: ITimer;
// 事件处理器
private eventHandlers: Partial<MessageQueueEvents> = {};
// 发送回调
private sendCallback?: (message: INetworkMessage) => Promise<boolean>;
// 性能统计
private processingTimes: number[] = [];
/**
*
*/
constructor(config: Partial<MessageQueueConfig> = {}) {
this.config = {
maxQueueSize: 1000,
maxRetries: 3,
retryDelay: 1000,
processingInterval: 100,
enablePriority: true,
enableReliableDelivery: true,
defaultTimeout: 30000,
...config
};
this.stats = {
totalQueued: 0,
totalProcessed: 0,
totalFailed: 0,
currentSize: 0,
averageProcessingTime: 0,
messagesByPriority: {
[MessagePriority.Low]: 0,
[MessagePriority.Normal]: 0,
[MessagePriority.High]: 0,
[MessagePriority.Critical]: 0
},
reliableMessages: 0,
expiredMessages: 0
};
// 初始化优先级队列
if (this.config.enablePriority) {
this.priorityQueues.set(MessagePriority.Critical, []);
this.priorityQueues.set(MessagePriority.High, []);
this.priorityQueues.set(MessagePriority.Normal, []);
this.priorityQueues.set(MessagePriority.Low, []);
}
}
/**
*
*/
start(sendCallback: (message: INetworkMessage) => Promise<boolean>): void {
this.sendCallback = sendCallback;
this.processingTimer = NetworkTimerManager.schedule(
this.config.processingInterval / 1000,
true, // 重复执行
this,
() => this.processQueue()
);
if (this.config.maxRetries > 0) {
this.retryTimer = NetworkTimerManager.schedule(
this.config.retryDelay / 1000,
true, // 重复执行
this,
() => this.processRetryQueue()
);
}
this.cleanupTimer = NetworkTimerManager.schedule(
10, // 10秒
true, // 重复执行
this,
() => this.cleanupExpiredMessages()
);
this.logger.info('消息队列已启动');
}
/**
*
*/
stop(): void {
if (this.processingTimer) {
this.processingTimer.stop();
this.processingTimer = undefined;
}
if (this.retryTimer) {
this.retryTimer.stop();
this.retryTimer = undefined;
}
if (this.cleanupTimer) {
this.cleanupTimer.stop();
this.cleanupTimer = undefined;
}
this.logger.info('消息队列已停止');
}
/**
*
*/
enqueue(
message: INetworkMessage,
options: {
priority?: MessagePriority;
reliable?: boolean;
timeout?: number;
maxRetries?: number;
callback?: (success: boolean, error?: Error) => void;
} = {}
): boolean {
// 检查队列大小限制
if (this.getTotalSize() >= this.config.maxQueueSize) {
const droppedMessage = this.createQueuedMessage(message, options);
this.eventHandlers.queueFull?.(droppedMessage);
this.logger.warn('队列已满,丢弃消息:', message.type);
return false;
}
const queuedMessage = this.createQueuedMessage(message, options);
// 根据配置选择队列策略
if (this.config.enablePriority) {
this.enqueueToPriorityQueue(queuedMessage);
} else {
this.primaryQueue.push(queuedMessage);
}
this.updateQueueStats(queuedMessage);
this.eventHandlers.messageQueued?.(queuedMessage);
return true;
}
/**
*
*/
clear(): number {
const count = this.getTotalSize();
this.primaryQueue.length = 0;
this.retryQueue.length = 0;
this.processingMap.clear();
for (const queue of this.priorityQueues.values()) {
queue.length = 0;
}
this.stats.currentSize = 0;
this.logger.info(`已清空队列,清理了 ${count} 条消息`);
return count;
}
/**
*
*/
getStats(): QueueStats {
this.updateCurrentStats();
return { ...this.stats };
}
/**
*
*/
resetStats(): void {
this.stats = {
totalQueued: 0,
totalProcessed: 0,
totalFailed: 0,
currentSize: this.getTotalSize(),
averageProcessingTime: 0,
messagesByPriority: {
[MessagePriority.Low]: 0,
[MessagePriority.Normal]: 0,
[MessagePriority.High]: 0,
[MessagePriority.Critical]: 0
},
reliableMessages: 0,
expiredMessages: 0
};
this.processingTimes.length = 0;
}
/**
*
*/
on<K extends keyof MessageQueueEvents>(event: K, handler: MessageQueueEvents[K]): void {
this.eventHandlers[event] = handler;
}
/**
*
*/
off<K extends keyof MessageQueueEvents>(event: K): void {
delete this.eventHandlers[event];
}
/**
*
*/
updateConfig(newConfig: Partial<MessageQueueConfig>): void {
Object.assign(this.config, newConfig);
this.logger.info('消息队列配置已更新:', newConfig);
}
/**
*
*/
size(): number {
return this.getTotalSize();
}
/**
*
*/
isEmpty(): boolean {
return this.getTotalSize() === 0;
}
/**
*
*/
isFull(): boolean {
return this.getTotalSize() >= this.config.maxQueueSize;
}
/**
*
*/
private createQueuedMessage(
message: INetworkMessage,
options: any
): QueuedMessage {
const priority = options.priority || this.getMessagePriority(message);
const reliable = options.reliable ?? this.isReliableMessage(message);
return {
id: `${message.messageId}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`,
message,
priority,
timestamp: Date.now(),
retryCount: 0,
maxRetries: options.maxRetries ?? this.config.maxRetries,
reliable,
timeoutMs: options.timeout ?? this.config.defaultTimeout,
callback: options.callback
};
}
/**
*
*/
private enqueueToPriorityQueue(message: QueuedMessage): void {
const queue = this.priorityQueues.get(message.priority);
if (queue) {
queue.push(message);
} else {
this.primaryQueue.push(message);
}
}
/**
*
*/
private dequeue(): QueuedMessage | undefined {
if (this.config.enablePriority) {
// 按优先级顺序处理
for (const priority of [MessagePriority.Critical, MessagePriority.High, MessagePriority.Normal, MessagePriority.Low]) {
const queue = this.priorityQueues.get(priority);
if (queue && queue.length > 0) {
return queue.shift();
}
}
}
return this.primaryQueue.shift();
}
/**
*
*/
private async processQueue(): Promise<void> {
if (!this.sendCallback || this.getTotalSize() === 0) {
return;
}
const message = this.dequeue();
if (!message) {
return;
}
// 检查消息是否过期
if (this.isMessageExpired(message)) {
this.handleExpiredMessage(message);
return;
}
const startTime = Date.now();
try {
// 将消息标记为处理中
this.processingMap.set(message.id, message);
const success = await this.sendCallback(message.message);
const processingTime = Date.now() - startTime;
this.updateProcessingTime(processingTime);
if (success) {
this.handleSuccessfulMessage(message);
} else {
this.handleFailedMessage(message, new Error('发送失败'));
}
} catch (error) {
this.handleFailedMessage(message, error as Error);
} finally {
this.processingMap.delete(message.id);
}
}
/**
*
*/
private async processRetryQueue(): Promise<void> {
if (this.retryQueue.length === 0 || !this.sendCallback) {
return;
}
const message = this.retryQueue.shift();
if (!message) {
return;
}
// 检查是否可以重试
if (message.retryCount >= message.maxRetries) {
this.handleFailedMessage(message, new Error('达到最大重试次数'));
return;
}
// 检查消息是否过期
if (this.isMessageExpired(message)) {
this.handleExpiredMessage(message);
return;
}
message.retryCount++;
try {
const success = await this.sendCallback(message.message);
if (success) {
this.handleSuccessfulMessage(message);
} else {
// 重新加入重试队列
this.retryQueue.push(message);
}
} catch (error) {
// 重新加入重试队列
this.retryQueue.push(message);
}
}
/**
*
*/
private handleSuccessfulMessage(message: QueuedMessage): void {
this.stats.totalProcessed++;
if (message.callback) {
try {
message.callback(true);
} catch (error) {
this.logger.error('消息回调执行失败:', error);
}
}
this.eventHandlers.messageProcessed?.(message, true);
}
/**
*
*/
private handleFailedMessage(message: QueuedMessage, error: Error): void {
// 如果是可靠消息且未达到最大重试次数,加入重试队列
if (message.reliable && message.retryCount < message.maxRetries) {
this.retryQueue.push(message);
} else {
this.stats.totalFailed++;
if (message.callback) {
try {
message.callback(false, error);
} catch (callbackError) {
this.logger.error('消息回调执行失败:', callbackError);
}
}
this.eventHandlers.messageFailed?.(message, error);
}
this.eventHandlers.messageProcessed?.(message, false);
}
/**
*
*/
private handleExpiredMessage(message: QueuedMessage): void {
this.stats.expiredMessages++;
if (message.callback) {
try {
message.callback(false, new Error('消息已过期'));
} catch (error) {
this.logger.error('消息回调执行失败:', error);
}
}
this.eventHandlers.messageExpired?.(message);
}
/**
*
*/
private cleanupExpiredMessages(): void {
const now = Date.now();
let cleanedCount = 0;
// 清理主队列
this.primaryQueue = this.primaryQueue.filter(msg => {
if (this.isMessageExpired(msg)) {
this.handleExpiredMessage(msg);
cleanedCount++;
return false;
}
return true;
});
// 清理优先级队列
for (const [priority, queue] of this.priorityQueues) {
this.priorityQueues.set(priority, queue.filter(msg => {
if (this.isMessageExpired(msg)) {
this.handleExpiredMessage(msg);
cleanedCount++;
return false;
}
return true;
}));
}
// 清理重试队列
this.retryQueue = this.retryQueue.filter(msg => {
if (this.isMessageExpired(msg)) {
this.handleExpiredMessage(msg);
cleanedCount++;
return false;
}
return true;
});
if (cleanedCount > 0) {
this.logger.debug(`清理了 ${cleanedCount} 条过期消息`);
}
}
/**
*
*/
private isMessageExpired(message: QueuedMessage): boolean {
if (!message.timeoutMs) {
return false;
}
return Date.now() - message.timestamp > message.timeoutMs;
}
/**
*
*/
private getMessagePriority(message: INetworkMessage): MessagePriority {
switch (message.type) {
case MessageType.HEARTBEAT:
return MessagePriority.Low;
case MessageType.CONNECT:
case MessageType.DISCONNECT:
return MessagePriority.High;
case MessageType.ERROR:
return MessagePriority.Critical;
default:
return MessagePriority.Normal;
}
}
/**
*
*/
private isReliableMessage(message: INetworkMessage): boolean {
if (!this.config.enableReliableDelivery) {
return false;
}
// 某些消息类型默认需要可靠传输
const reliableTypes = [
MessageType.CONNECT,
MessageType.RPC_CALL,
MessageType.ENTITY_CREATE,
MessageType.ENTITY_DESTROY
];
return reliableTypes.includes(message.type) || message.reliable === true;
}
/**
*
*/
private getTotalSize(): number {
let size = this.primaryQueue.length + this.retryQueue.length + this.processingMap.size;
for (const queue of this.priorityQueues.values()) {
size += queue.length;
}
return size;
}
/**
*
*/
private updateQueueStats(message: QueuedMessage): void {
this.stats.totalQueued++;
this.stats.currentSize = this.getTotalSize();
this.stats.messagesByPriority[message.priority]++;
if (message.reliable) {
this.stats.reliableMessages++;
}
}
/**
*
*/
private updateCurrentStats(): void {
this.stats.currentSize = this.getTotalSize();
}
/**
*
*/
private updateProcessingTime(time: number): void {
this.processingTimes.push(time);
// 保持最近1000个样本
if (this.processingTimes.length > 1000) {
this.processingTimes.shift();
}
// 计算平均处理时间
this.stats.averageProcessingTime =
this.processingTimes.reduce((sum, t) => sum + t, 0) / this.processingTimes.length;
}
/**
*
*/
getDetailedStatus() {
return {
stats: this.getStats(),
config: this.config,
queueSizes: {
primary: this.primaryQueue.length,
retry: this.retryQueue.length,
processing: this.processingMap.size,
priorities: Object.fromEntries(
Array.from(this.priorityQueues.entries()).map(([priority, queue]) => [priority, queue.length])
)
},
isRunning: !!this.processingTimer,
processingTimes: [...this.processingTimes]
};
}
}