实现ServerRpc和ClientRpc装饰器

This commit is contained in:
YHH
2025-08-20 10:32:56 +08:00
parent 0a1d7ac083
commit 69616bbddc
9 changed files with 2826 additions and 1 deletions

View File

@@ -0,0 +1,454 @@
import { createLogger } from '@esengine/ecs-framework';
import { EventEmitter } from '../utils/EventEmitter';
import {
RpcCallRequest,
RpcCallResponse,
RpcError,
RpcErrorType,
RpcStats,
RpcMethodMetadata
} from '../types/RpcTypes';
import { RpcMetadataManager } from './RpcMetadataManager';
/**
* RPC调用统计信息
*/
interface CallStats {
startTime: number;
endTime?: number;
duration?: number;
success: boolean;
error?: RpcError;
}
/**
* 速率限制器
*/
interface RateLimiter {
calls: number[];
limit: number;
window: number; // 时间窗口(毫秒)
}
/**
* RPC调用处理器事件
*/
export interface RpcCallHandlerEvents {
callStarted: (request: RpcCallRequest) => void;
callCompleted: (request: RpcCallRequest, response: RpcCallResponse) => void;
callFailed: (request: RpcCallRequest, error: RpcError) => void;
rateLimitExceeded: (methodName: string, senderId: string) => void;
permissionDenied: (methodName: string, senderId: string) => void;
}
/**
* RPC调用处理器配置
*/
export interface RpcCallHandlerConfig {
/** 最大并发调用数 */
maxConcurrentCalls: number;
/** 默认超时时间(毫秒) */
defaultTimeout: number;
/** 是否启用速率限制 */
enableRateLimit: boolean;
/** 是否启用权限检查 */
enablePermissionCheck: boolean;
/** 是否启用性能监控 */
enablePerformanceMonitoring: boolean;
/** 统计数据保留时间(毫秒) */
statsRetentionTime: number;
}
/**
* RPC调用处理器
* 负责处理来自客户端的RPC调用请求
*/
export class RpcCallHandler extends EventEmitter {
private logger = createLogger('RpcCallHandler');
private config: RpcCallHandlerConfig;
private metadataManager: RpcMetadataManager;
/** 当前活跃的调用 */
private activeCalls = new Map<string, CallStats>();
/** 速率限制器 */
private rateLimiters = new Map<string, RateLimiter>();
/** 统计信息 */
private stats: RpcStats = {
totalCalls: 0,
successfulCalls: 0,
failedCalls: 0,
averageResponseTime: 0,
pendingCalls: 0,
timeoutCalls: 0,
retryCount: 0,
lastUpdated: Date.now()
};
/** 历史调用统计 */
private callHistory: CallStats[] = [];
/** 权限检查器 */
private permissionChecker?: (methodName: string, senderId: string) => boolean;
constructor(
metadataManager: RpcMetadataManager,
config: Partial<RpcCallHandlerConfig> = {}
) {
super();
this.metadataManager = metadataManager;
this.config = {
maxConcurrentCalls: 100,
defaultTimeout: 30000,
enableRateLimit: true,
enablePermissionCheck: true,
enablePerformanceMonitoring: true,
statsRetentionTime: 300000, // 5分钟
...config
};
}
/**
* 设置权限检查器
*/
public setPermissionChecker(checker: (methodName: string, senderId: string) => boolean): void {
this.permissionChecker = checker;
}
/**
* 处理RPC调用请求
*/
public async handleCall<T extends readonly unknown[], R>(
request: RpcCallRequest<T>
): Promise<RpcCallResponse<R>> {
const startTime = Date.now();
const callStats: CallStats = {
startTime,
success: false
};
this.activeCalls.set(request.callId, callStats);
this.stats.pendingCalls++;
this.emit('callStarted', request);
try {
// 1. 检查并发限制
if (this.activeCalls.size > this.config.maxConcurrentCalls) {
throw this.createError(
RpcErrorType.RATE_LIMITED,
`超过最大并发调用数限制: ${this.config.maxConcurrentCalls}`
);
}
// 2. 获取方法元数据
const metadata = this.metadataManager.getMethodMetadata(request.methodName);
if (!metadata) {
throw this.createError(
RpcErrorType.METHOD_NOT_FOUND,
`RPC方法不存在: ${request.methodName}`
);
}
// 3. 验证方法调用
const validation = this.metadataManager.validateMethodCall(
request.methodName,
Array.from(request.args),
request.senderId
);
if (!validation.valid) {
throw this.createError(
RpcErrorType.INVALID_ARGUMENTS,
validation.error || '参数验证失败'
);
}
// 4. 权限检查
if (this.config.enablePermissionCheck && !this.checkPermission(metadata, request.senderId)) {
this.emit('permissionDenied', request.methodName, request.senderId);
throw this.createError(
RpcErrorType.PERMISSION_DENIED,
`没有调用权限: ${request.methodName}`
);
}
// 5. 速率限制检查
if (this.config.enableRateLimit && !this.checkRateLimit(metadata, request.senderId)) {
this.emit('rateLimitExceeded', request.methodName, request.senderId);
throw this.createError(
RpcErrorType.RATE_LIMITED,
`调用频率超限: ${request.methodName}`
);
}
// 6. 执行方法调用
const handler = this.metadataManager.getMethodHandler(request.methodName);
if (!handler) {
throw this.createError(
RpcErrorType.SERVER_ERROR,
`方法处理器不存在: ${request.methodName}`
);
}
// 创建带超时的Promise
const timeout = request.options.timeout || metadata.options.timeout || this.config.defaultTimeout;
const result = await this.executeWithTimeout(handler, request.args, timeout);
// 7. 创建成功响应
const endTime = Date.now();
const duration = endTime - startTime;
callStats.endTime = endTime;
callStats.duration = duration;
callStats.success = true;
const response: RpcCallResponse<R> = {
callId: request.callId,
success: true,
result: result as R,
timestamp: endTime,
duration
};
this.updateStats(callStats);
this.emit('callCompleted', request, response);
return response;
} catch (error) {
// 8. 处理错误
const endTime = Date.now();
const duration = endTime - startTime;
const rpcError = error instanceof Error && 'type' in error
? error as RpcError
: this.createError(RpcErrorType.SERVER_ERROR, String(error));
callStats.endTime = endTime;
callStats.duration = duration;
callStats.error = rpcError;
const response: RpcCallResponse<R> = {
callId: request.callId,
success: false,
error: rpcError,
timestamp: endTime,
duration
};
this.updateStats(callStats);
this.emit('callFailed', request, rpcError);
return response;
} finally {
// 9. 清理
this.activeCalls.delete(request.callId);
this.stats.pendingCalls--;
this.addToHistory(callStats);
}
}
/**
* 获取统计信息
*/
public getStats(): RpcStats {
this.stats.lastUpdated = Date.now();
return { ...this.stats };
}
/**
* 获取当前活跃调用数
*/
public getActiveCalls(): number {
return this.activeCalls.size;
}
/**
* 获取方法调用历史
*/
public getCallHistory(methodName?: string, limit: number = 100): CallStats[] {
let history = [...this.callHistory];
if (methodName) {
// 这里需要扩展CallStats接口来包含methodName
// 暂时返回所有历史
}
return history.slice(-limit);
}
/**
* 重置统计信息
*/
public resetStats(): void {
this.stats = {
totalCalls: 0,
successfulCalls: 0,
failedCalls: 0,
averageResponseTime: 0,
pendingCalls: this.activeCalls.size,
timeoutCalls: 0,
retryCount: 0,
lastUpdated: Date.now()
};
this.callHistory.length = 0;
this.rateLimiters.clear();
}
/**
* 更新配置
*/
public updateConfig(newConfig: Partial<RpcCallHandlerConfig>): void {
Object.assign(this.config, newConfig);
}
/**
* 销毁处理器
*/
public destroy(): void {
this.activeCalls.clear();
this.rateLimiters.clear();
this.callHistory.length = 0;
this.removeAllListeners();
}
/**
* 检查权限
*/
private checkPermission(metadata: RpcMethodMetadata, senderId: string): boolean {
// 如果方法不需要认证,直接通过
if (!metadata.options.requireAuth) {
return true;
}
// 使用自定义权限检查器
if (this.permissionChecker) {
return this.permissionChecker(metadata.methodName, senderId);
}
// 默认:需要认证但没有检查器,拒绝访问
return false;
}
/**
* 检查速率限制
*/
private checkRateLimit(metadata: RpcMethodMetadata, senderId: string): boolean {
const rateLimit = metadata.options.rateLimit;
if (!rateLimit) {
return true;
}
const key = `${senderId}:${metadata.methodName}`;
let limiter = this.rateLimiters.get(key);
if (!limiter) {
limiter = {
calls: [],
limit: rateLimit,
window: 60000 // 1分钟窗口
};
this.rateLimiters.set(key, limiter);
}
const now = Date.now();
// 清理过期的调用记录
limiter.calls = limiter.calls.filter(time => now - time < limiter.window);
// 检查是否超限
if (limiter.calls.length >= limiter.limit) {
return false;
}
// 记录本次调用
limiter.calls.push(now);
return true;
}
/**
* 执行带超时的方法调用
*/
private async executeWithTimeout(
handler: Function,
args: readonly unknown[],
timeout: number
): Promise<unknown> {
return new Promise((resolve, reject) => {
const timer = setTimeout(() => {
reject(this.createError(
RpcErrorType.TIMEOUT,
`方法调用超时: ${timeout}ms`
));
}, timeout);
Promise.resolve(handler(...args))
.then(result => {
clearTimeout(timer);
resolve(result);
})
.catch(error => {
clearTimeout(timer);
reject(error);
});
});
}
/**
* 创建RPC错误
*/
private createError(type: RpcErrorType, message: string, code?: number): RpcError {
return {
type,
message,
code
};
}
/**
* 更新统计信息
*/
private updateStats(callStats: CallStats): void {
this.stats.totalCalls++;
if (callStats.success) {
this.stats.successfulCalls++;
} else {
this.stats.failedCalls++;
if (callStats.error?.type === RpcErrorType.TIMEOUT) {
this.stats.timeoutCalls++;
}
}
// 更新平均响应时间
if (callStats.duration !== undefined) {
const totalTime = this.stats.averageResponseTime * (this.stats.totalCalls - 1) + callStats.duration;
this.stats.averageResponseTime = totalTime / this.stats.totalCalls;
}
}
/**
* 添加到历史记录
*/
private addToHistory(callStats: CallStats): void {
if (!this.config.enablePerformanceMonitoring) {
return;
}
this.callHistory.push(callStats);
// 清理过期的历史记录
const cutoffTime = Date.now() - this.config.statsRetentionTime;
this.callHistory = this.callHistory.filter(stats => stats.startTime > cutoffTime);
// 限制历史记录数量
if (this.callHistory.length > 10000) {
this.callHistory = this.callHistory.slice(-5000);
}
}
}

View File

@@ -0,0 +1,504 @@
import { createLogger } from '@esengine/ecs-framework';
import { EventEmitter } from '../utils/EventEmitter';
import {
RpcCallRequest,
RpcCallResponse,
RpcError,
RpcErrorType,
RpcCallInfo,
RpcCallStatus,
RpcStats,
RpcOptions,
ClientRpcInvoker,
ServerRpcInvoker
} from '../types/RpcTypes';
import { MessageType, RpcTarget } from '../types/NetworkTypes';
/**
* 网络发送器接口
*/
export interface NetworkSender {
sendMessage(message: object): Promise<void>;
}
/**
* RPC调用代理事件
*/
export interface RpcCallProxyEvents {
callSent: (request: RpcCallRequest) => void;
responseReceived: (response: RpcCallResponse) => void;
callTimeout: (callId: string) => void;
callFailed: (callId: string, error: RpcError) => void;
retryAttempt: (callId: string, attempt: number) => void;
}
/**
* RPC调用代理配置
*/
export interface RpcCallProxyConfig {
/** 默认超时时间(毫秒) */
defaultTimeout: number;
/** 最大重试次数 */
maxRetries: number;
/** 重试延迟基数(毫秒) */
retryDelayBase: number;
/** 重试延迟倍数 */
retryDelayMultiplier: number;
/** 最大重试延迟(毫秒) */
maxRetryDelay: number;
/** 是否启用离线队列 */
enableOfflineQueue: boolean;
/** 离线队列最大大小 */
maxOfflineQueueSize: number;
/** 调用ID生成器 */
generateCallId?: () => string;
}
/**
* RPC调用代理
* 负责发送RPC调用并处理响应
*/
export class RpcCallProxy extends EventEmitter {
private logger = createLogger('RpcCallProxy');
private config: RpcCallProxyConfig;
private networkSender: NetworkSender;
/** 待处理的调用 */
private pendingCalls = new Map<string, RpcCallInfo>();
/** 离线队列 */
private offlineQueue: RpcCallRequest[] = [];
/** 是否在线 */
private isOnline = true;
/** 统计信息 */
private stats: RpcStats = {
totalCalls: 0,
successfulCalls: 0,
failedCalls: 0,
averageResponseTime: 0,
pendingCalls: 0,
timeoutCalls: 0,
retryCount: 0,
lastUpdated: Date.now()
};
/** 重试定时器 */
private retryTimers = new Map<string, ReturnType<typeof setTimeout>>();
constructor(
networkSender: NetworkSender,
config: Partial<RpcCallProxyConfig> = {}
) {
super();
this.networkSender = networkSender;
this.config = {
defaultTimeout: 30000,
maxRetries: 3,
retryDelayBase: 1000,
retryDelayMultiplier: 2,
maxRetryDelay: 10000,
enableOfflineQueue: true,
maxOfflineQueueSize: 100,
generateCallId: () => `rpc_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`,
...config
};
}
/**
* 客户端RPC调用器
*/
public get clientRpc(): ClientRpcInvoker {
return <TArgs extends readonly unknown[], TReturn>(
methodName: string,
args: TArgs,
options?: Partial<RpcOptions>
): Promise<TReturn> => {
return this.call(methodName, args, options);
};
}
/**
* 服务端RPC调用器用于服务端调用客户端
*/
public get serverRpc(): ServerRpcInvoker {
return <TArgs extends readonly unknown[], TReturn>(
clientId: string,
methodName: string,
args: TArgs,
options?: Partial<RpcOptions>
): Promise<TReturn> => {
const callOptions = {
...options,
target: RpcTarget.Client
};
return this.call(methodName, args, callOptions, clientId);
};
}
/**
* 发起RPC调用
*/
public async call<TArgs extends readonly unknown[], TReturn>(
methodName: string,
args: TArgs,
options: Partial<RpcOptions> = {},
targetId?: string
): Promise<TReturn> {
const callId = this.config.generateCallId!();
const timeout = options.timeout || this.config.defaultTimeout;
const request: RpcCallRequest<TArgs> = {
callId,
methodName,
args,
senderId: 'client', // 这应该从认证系统获取
targetId,
timestamp: Date.now(),
options: {
reliable: true,
priority: 5,
timeout,
...options
}
};
// 创建Promise和调用信息
return new Promise<TReturn>((resolve, reject) => {
const callInfo: RpcCallInfo<TArgs> = {
request,
status: RpcCallStatus.PENDING,
resolve: resolve as (value: unknown) => void,
reject: (reason: RpcError) => reject(reason),
retryCount: 0,
createdAt: Date.now()
};
this.pendingCalls.set(callId, callInfo);
this.stats.pendingCalls++;
this.stats.totalCalls++;
// 设置超时
setTimeout(() => {
this.handleTimeout(callId);
}, timeout);
// 发送调用
this.sendCall(callInfo);
});
}
/**
* 处理RPC响应
*/
public handleResponse(response: RpcCallResponse): void {
const callInfo = this.pendingCalls.get(response.callId);
if (!callInfo) {
this.logger.warn(`收到未知调用的响应: ${response.callId}`);
return;
}
// 清理定时器
const timer = this.retryTimers.get(response.callId);
if (timer) {
clearTimeout(timer);
this.retryTimers.delete(response.callId);
}
// 更新状态
callInfo.status = response.success ? RpcCallStatus.COMPLETED : RpcCallStatus.FAILED;
callInfo.completedAt = Date.now();
// 更新统计
if (response.success) {
this.stats.successfulCalls++;
this.updateAverageResponseTime(response.duration);
} else {
this.stats.failedCalls++;
}
this.stats.pendingCalls--;
// 处理结果
if (response.success) {
callInfo.resolve!(response.result);
} else {
callInfo.reject!(response.error!);
this.emit('callFailed', response.callId, response.error!);
}
// 清理
this.pendingCalls.delete(response.callId);
this.emit('responseReceived', response);
}
/**
* 设置网络状态
*/
public setOnlineStatus(online: boolean): void {
const wasOnline = this.isOnline;
this.isOnline = online;
if (online && !wasOnline) {
// 从离线状态恢复,处理离线队列
this.processOfflineQueue();
}
}
/**
* 取消RPC调用
*/
public cancelCall(callId: string): boolean {
const callInfo = this.pendingCalls.get(callId);
if (!callInfo) {
return false;
}
// 清理定时器
const timer = this.retryTimers.get(callId);
if (timer) {
clearTimeout(timer);
this.retryTimers.delete(callId);
}
// 更新状态
callInfo.status = RpcCallStatus.CANCELLED;
// 拒绝Promise
callInfo.reject!({
type: RpcErrorType.CLIENT_ERROR,
message: '调用被取消'
});
// 清理
this.pendingCalls.delete(callId);
this.stats.pendingCalls--;
return true;
}
/**
* 获取统计信息
*/
public getStats(): RpcStats {
this.stats.lastUpdated = Date.now();
return { ...this.stats };
}
/**
* 获取待处理的调用
*/
public getPendingCalls(): RpcCallInfo[] {
return Array.from(this.pendingCalls.values());
}
/**
* 重置统计信息
*/
public resetStats(): void {
this.stats = {
totalCalls: 0,
successfulCalls: 0,
failedCalls: 0,
averageResponseTime: 0,
pendingCalls: this.pendingCalls.size,
timeoutCalls: 0,
retryCount: 0,
lastUpdated: Date.now()
};
}
/**
* 更新配置
*/
public updateConfig(newConfig: Partial<RpcCallProxyConfig>): void {
Object.assign(this.config, newConfig);
}
/**
* 销毁代理
*/
public destroy(): void {
// 取消所有待处理的调用
const pendingCallIds = Array.from(this.pendingCalls.keys());
for (const callId of pendingCallIds) {
this.cancelCall(callId);
}
// 清理定时器
for (const timer of this.retryTimers.values()) {
clearTimeout(timer);
}
this.retryTimers.clear();
// 清理队列
this.offlineQueue.length = 0;
this.removeAllListeners();
}
/**
* 发送调用
*/
private async sendCall<T extends readonly unknown[]>(callInfo: RpcCallInfo<T>): Promise<void> {
try {
// 检查网络状态
if (!this.isOnline) {
if (this.config.enableOfflineQueue) {
this.addToOfflineQueue(callInfo.request);
} else {
throw new Error('网络不可用');
}
return;
}
// 构建网络消息
const message = {
type: MessageType.RPC_CALL,
messageId: callInfo.request.callId,
timestamp: Date.now(),
senderId: callInfo.request.senderId,
data: callInfo.request,
reliable: callInfo.request.options.reliable,
priority: callInfo.request.options.priority
};
// 发送消息
await this.networkSender.sendMessage(message);
callInfo.status = RpcCallStatus.SENT;
callInfo.sentAt = Date.now();
this.emit('callSent', callInfo.request);
} catch (error) {
this.logger.error(`发送RPC调用失败: ${callInfo.request.methodName}`, error);
// 检查是否可以重试
if (callInfo.retryCount < this.config.maxRetries) {
this.scheduleRetry(callInfo);
} else {
this.handleCallFailure(callInfo, {
type: RpcErrorType.NETWORK_ERROR,
message: String(error)
});
}
}
}
/**
* 处理超时
*/
private handleTimeout(callId: string): void {
const callInfo = this.pendingCalls.get(callId);
if (!callInfo || callInfo.status === RpcCallStatus.COMPLETED) {
return;
}
// 检查是否可以重试
if (callInfo.retryCount < this.config.maxRetries) {
this.scheduleRetry(callInfo);
} else {
this.stats.timeoutCalls++;
this.handleCallFailure(callInfo, {
type: RpcErrorType.TIMEOUT,
message: `调用超时: ${callInfo.request.options.timeout}ms`
});
this.emit('callTimeout', callId);
}
}
/**
* 安排重试
*/
private scheduleRetry<T extends readonly unknown[]>(callInfo: RpcCallInfo<T>): void {
callInfo.retryCount++;
this.stats.retryCount++;
// 计算延迟时间(指数退避)
const baseDelay = this.config.retryDelayBase * Math.pow(this.config.retryDelayMultiplier, callInfo.retryCount - 1);
const delay = Math.min(baseDelay, this.config.maxRetryDelay);
callInfo.nextRetryTime = Date.now() + delay;
this.emit('retryAttempt', callInfo.request.callId, callInfo.retryCount);
const timer = setTimeout(() => {
this.retryTimers.delete(callInfo.request.callId);
this.sendCall(callInfo);
}, delay);
this.retryTimers.set(callInfo.request.callId, timer);
}
/**
* 处理调用失败
*/
private handleCallFailure<T extends readonly unknown[]>(callInfo: RpcCallInfo<T>, error: RpcError): void {
callInfo.status = RpcCallStatus.FAILED;
callInfo.completedAt = Date.now();
callInfo.reject!(error);
this.pendingCalls.delete(callInfo.request.callId);
this.stats.pendingCalls--;
this.stats.failedCalls++;
this.emit('callFailed', callInfo.request.callId, error);
}
/**
* 添加到离线队列
*/
private addToOfflineQueue<T extends readonly unknown[]>(request: RpcCallRequest<T>): void {
if (this.offlineQueue.length >= this.config.maxOfflineQueueSize) {
// 移除最旧的请求
this.offlineQueue.shift();
}
this.offlineQueue.push(request);
}
/**
* 处理离线队列
*/
private async processOfflineQueue(): Promise<void> {
if (!this.isOnline || this.offlineQueue.length === 0) {
return;
}
const queue = [...this.offlineQueue];
this.offlineQueue.length = 0;
for (const request of queue) {
try {
// 重新创建调用信息
const callInfo: RpcCallInfo = {
request,
status: RpcCallStatus.PENDING,
retryCount: 0,
createdAt: Date.now()
};
this.pendingCalls.set(request.callId, callInfo);
await this.sendCall(callInfo);
} catch (error) {
this.logger.error(`处理离线队列失败: ${request.methodName}`, error);
}
}
}
/**
* 更新平均响应时间
*/
private updateAverageResponseTime(responseTime: number): void {
const totalResponses = this.stats.successfulCalls;
const currentAverage = this.stats.averageResponseTime;
this.stats.averageResponseTime =
(currentAverage * (totalResponses - 1) + responseTime) / totalResponses;
}
}

View File

@@ -0,0 +1,345 @@
import { createLogger } from '@esengine/ecs-framework';
import { EventEmitter } from '../utils/EventEmitter';
import { RpcMethodMetadata, RpcMethodRegistry } from '../types/RpcTypes';
import { getRpcMethods, RpcMethodValidator } from '../decorators/RpcDecorators';
/**
* RPC元数据管理器事件
*/
export interface RpcMetadataManagerEvents {
methodRegistered: (metadata: RpcMethodMetadata) => void;
methodUnregistered: (methodName: string) => void;
classRegistered: (className: string, methodCount: number) => void;
classUnregistered: (className: string) => void;
}
/**
* RPC元数据管理器
* 负责管理所有RPC方法的元数据和注册信息
*/
export class RpcMetadataManager extends EventEmitter {
private logger = createLogger('RpcMetadataManager');
/** 方法注册表 */
private registry: RpcMethodRegistry = new Map();
/** 类到方法的映射 */
private classMethods = new Map<string, Set<string>>();
/** 方法名到类的映射 */
private methodToClass = new Map<string, string>();
/** 实例缓存 */
private instances = new Map<string, object>();
/**
* 注册RPC类
*/
public registerClass(instance: object): void {
const className = instance.constructor.name;
try {
const rpcMethods = getRpcMethods(instance.constructor as Function);
if (rpcMethods.length === 0) {
this.logger.warn(`${className} 没有RPC方法`);
return;
}
// 验证所有方法定义
for (const metadata of rpcMethods) {
const validation = RpcMethodValidator.validateMethodDefinition(metadata);
if (!validation.valid) {
throw new Error(`${className}.${metadata.methodName}: ${validation.error}`);
}
}
// 注册方法
const methodNames = new Set<string>();
for (const metadata of rpcMethods) {
const fullMethodName = `${className}.${metadata.methodName}`;
// 检查方法是否已存在
if (this.registry.has(fullMethodName)) {
throw new Error(`RPC方法已存在: ${fullMethodName}`);
}
// 获取实际方法处理器
const handler = (instance as Record<string, unknown>)[metadata.methodName];
if (typeof handler !== 'function') {
throw new Error(`方法不存在或不是函数: ${fullMethodName}`);
}
// 注册方法
this.registry.set(fullMethodName, {
metadata,
handler: handler.bind(instance)
});
methodNames.add(metadata.methodName);
this.methodToClass.set(fullMethodName, className);
this.logger.debug(`已注册RPC方法: ${fullMethodName}`);
this.emit('methodRegistered', metadata);
}
// 更新类映射
this.classMethods.set(className, methodNames);
this.instances.set(className, instance);
this.logger.info(`已注册RPC类: ${className},方法数: ${methodNames.size}`);
this.emit('classRegistered', className, methodNames.size);
} catch (error) {
this.logger.error(`注册RPC类失败: ${className}`, error);
throw error;
}
}
/**
* 注销RPC类
*/
public unregisterClass(classNameOrInstance: string | object): void {
const className = typeof classNameOrInstance === 'string'
? classNameOrInstance
: classNameOrInstance.constructor.name;
const methodNames = this.classMethods.get(className);
if (!methodNames) {
this.logger.warn(`RPC类未注册: ${className}`);
return;
}
// 移除所有方法
for (const methodName of methodNames) {
const fullMethodName = `${className}.${methodName}`;
this.registry.delete(fullMethodName);
this.methodToClass.delete(fullMethodName);
this.emit('methodUnregistered', fullMethodName);
}
// 清理映射
this.classMethods.delete(className);
this.instances.delete(className);
this.logger.info(`已注销RPC类: ${className}`);
this.emit('classUnregistered', className);
}
/**
* 获取RPC方法元数据
*/
public getMethodMetadata(methodName: string): RpcMethodMetadata | undefined {
const entry = this.registry.get(methodName);
return entry?.metadata;
}
/**
* 获取RPC方法处理器
*/
public getMethodHandler(methodName: string): Function | undefined {
const entry = this.registry.get(methodName);
return entry?.handler;
}
/**
* 检查方法是否存在
*/
public hasMethod(methodName: string): boolean {
return this.registry.has(methodName);
}
/**
* 获取所有服务端RPC方法
*/
public getServerRpcMethods(): RpcMethodMetadata[] {
const methods: RpcMethodMetadata[] = [];
for (const [, entry] of this.registry) {
if (entry.metadata.isServerRpc) {
methods.push(entry.metadata);
}
}
return methods;
}
/**
* 获取所有客户端RPC方法
*/
public getClientRpcMethods(): RpcMethodMetadata[] {
const methods: RpcMethodMetadata[] = [];
for (const [, entry] of this.registry) {
if (!entry.metadata.isServerRpc) {
methods.push(entry.metadata);
}
}
return methods;
}
/**
* 获取类的所有RPC方法
*/
public getClassMethods(className: string): RpcMethodMetadata[] {
const methodNames = this.classMethods.get(className);
if (!methodNames) {
return [];
}
const methods: RpcMethodMetadata[] = [];
for (const methodName of methodNames) {
const fullMethodName = `${className}.${methodName}`;
const entry = this.registry.get(fullMethodName);
if (entry) {
methods.push(entry.metadata);
}
}
return methods;
}
/**
* 获取已注册的类名列表
*/
public getRegisteredClasses(): string[] {
return Array.from(this.classMethods.keys());
}
/**
* 获取所有已注册的方法名
*/
public getAllMethodNames(): string[] {
return Array.from(this.registry.keys());
}
/**
* 根据方法名获取所属类
*/
public getMethodClass(methodName: string): string | undefined {
return this.methodToClass.get(methodName);
}
/**
* 获取类实例
*/
public getClassInstance(className: string): object | undefined {
return this.instances.get(className);
}
/**
* 获取注册统计信息
*/
public getStats(): {
totalMethods: number;
serverRpcMethods: number;
clientRpcMethods: number;
registeredClasses: number;
} {
let serverRpcCount = 0;
let clientRpcCount = 0;
for (const [, entry] of this.registry) {
if (entry.metadata.isServerRpc) {
serverRpcCount++;
} else {
clientRpcCount++;
}
}
return {
totalMethods: this.registry.size,
serverRpcMethods: serverRpcCount,
clientRpcMethods: clientRpcCount,
registeredClasses: this.classMethods.size
};
}
/**
* 验证方法调用
*/
public validateMethodCall(
methodName: string,
args: unknown[],
callerId?: string
): { valid: boolean; error?: string } {
const metadata = this.getMethodMetadata(methodName);
if (!metadata) {
return {
valid: false,
error: `RPC方法不存在: ${methodName}`
};
}
return RpcMethodValidator.validateCall(metadata, args, callerId);
}
/**
* 搜索方法
*/
public searchMethods(query: {
className?: string;
isServerRpc?: boolean;
requireAuth?: boolean;
target?: string;
}): RpcMethodMetadata[] {
const results: RpcMethodMetadata[] = [];
for (const [methodName, entry] of this.registry) {
const metadata = entry.metadata;
// 类名过滤
if (query.className && metadata.className !== query.className) {
continue;
}
// RPC类型过滤
if (query.isServerRpc !== undefined && metadata.isServerRpc !== query.isServerRpc) {
continue;
}
// 认证要求过滤
if (query.requireAuth !== undefined && metadata.options.requireAuth !== query.requireAuth) {
continue;
}
// 目标过滤
if (query.target && metadata.options.target !== query.target) {
continue;
}
results.push(metadata);
}
return results;
}
/**
* 清空所有注册
*/
public clear(): void {
const classNames = Array.from(this.classMethods.keys());
for (const className of classNames) {
this.unregisterClass(className);
}
this.registry.clear();
this.classMethods.clear();
this.methodToClass.clear();
this.instances.clear();
this.logger.info('已清空所有RPC注册');
}
/**
* 销毁管理器
*/
public destroy(): void {
this.clear();
this.removeAllListeners();
}
}

View File

@@ -0,0 +1,552 @@
import { createLogger } from '@esengine/ecs-framework';
import { EventEmitter } from '../utils/EventEmitter';
import { RpcCallRequest, RpcCallResponse, RpcError, RpcErrorType } from '../types/RpcTypes';
/**
* 重复调用记录
*/
interface DuplicateCallRecord {
callId: string;
methodName: string;
senderId: string;
firstCallTime: number;
lastCallTime: number;
callCount: number;
response?: RpcCallResponse;
}
/**
* 幂等性配置
*/
interface IdempotencyConfig {
/** 是否启用幂等性检查 */
enabled: boolean;
/** 记录保留时间(毫秒) */
recordRetentionTime: number;
/** 最大记录数量 */
maxRecords: number;
/** 检查窗口时间(毫秒) */
checkWindowTime: number;
}
/**
* 顺序执行配置
*/
interface OrderedExecutionConfig {
/** 是否启用顺序执行 */
enabled: boolean;
/** 最大等待时间(毫秒) */
maxWaitTime: number;
/** 队列最大大小 */
maxQueueSize: number;
}
/**
* 事务配置
*/
interface TransactionConfig {
/** 是否启用事务支持 */
enabled: boolean;
/** 事务超时时间(毫秒) */
transactionTimeout: number;
/** 最大事务数量 */
maxTransactions: number;
}
/**
* RPC可靠性管理器配置
*/
export interface RpcReliabilityConfig {
/** 幂等性配置 */
idempotency: IdempotencyConfig;
/** 顺序执行配置 */
orderedExecution: OrderedExecutionConfig;
/** 事务配置 */
transaction: TransactionConfig;
}
/**
* 事务信息
*/
interface TransactionInfo {
transactionId: string;
calls: RpcCallRequest[];
startTime: number;
status: 'pending' | 'committed' | 'rolledback';
rollbackActions: Array<() => Promise<void>>;
}
/**
* 顺序执行队列项
*/
interface OrderedQueueItem {
request: RpcCallRequest;
handler: () => Promise<RpcCallResponse>;
resolve: (response: RpcCallResponse) => void;
reject: (error: RpcError) => void;
enqueuedAt: number;
}
/**
* RPC可靠性管理器事件
*/
export interface RpcReliabilityManagerEvents {
duplicateCallDetected: (record: DuplicateCallRecord) => void;
transactionStarted: (transactionId: string) => void;
transactionCommitted: (transactionId: string) => void;
transactionRolledback: (transactionId: string, reason: string) => void;
orderedCallQueued: (callId: string, queueSize: number) => void;
orderedCallProcessed: (callId: string, waitTime: number) => void;
}
/**
* RPC可靠性管理器
* 提供重复检测、幂等性、顺序执行和事务支持
*/
export class RpcReliabilityManager extends EventEmitter {
private logger = createLogger('RpcReliabilityManager');
private config: RpcReliabilityConfig;
/** 重复调用记录 */
private duplicateRecords = new Map<string, DuplicateCallRecord>();
/** 活跃事务 */
private transactions = new Map<string, TransactionInfo>();
/** 顺序执行队列(按发送者分组) */
private orderedQueues = new Map<string, OrderedQueueItem[]>();
/** 正在处理的有序调用 */
private processingOrdered = new Set<string>();
/** 清理定时器 */
private cleanupTimer: ReturnType<typeof setInterval> | null = null;
constructor(config: Partial<RpcReliabilityConfig> = {}) {
super();
this.config = {
idempotency: {
enabled: true,
recordRetentionTime: 300000, // 5分钟
maxRecords: 10000,
checkWindowTime: 60000, // 1分钟
...config.idempotency
},
orderedExecution: {
enabled: false,
maxWaitTime: 30000,
maxQueueSize: 1000,
...config.orderedExecution
},
transaction: {
enabled: false,
transactionTimeout: 60000,
maxTransactions: 100,
...config.transaction
}
};
this.startCleanupTimer();
}
/**
* 检查并处理重复调用
*/
public checkDuplicateCall(request: RpcCallRequest): {
isDuplicate: boolean;
response?: RpcCallResponse;
shouldProcess: boolean;
} {
if (!this.config.idempotency.enabled) {
return { isDuplicate: false, shouldProcess: true };
}
const key = `${request.senderId}:${request.callId}`;
const existing = this.duplicateRecords.get(key);
const now = Date.now();
if (existing) {
// 更新重复调用记录
existing.lastCallTime = now;
existing.callCount++;
this.emit('duplicateCallDetected', existing);
// 如果已有响应,直接返回
if (existing.response) {
return {
isDuplicate: true,
response: existing.response,
shouldProcess: false
};
}
// 如果在检查窗口内,认为是重复调用但还在处理中
if (now - existing.firstCallTime < this.config.idempotency.checkWindowTime) {
return {
isDuplicate: true,
shouldProcess: false
};
}
}
// 记录新的调用
const record: DuplicateCallRecord = {
callId: request.callId,
methodName: request.methodName,
senderId: request.senderId,
firstCallTime: now,
lastCallTime: now,
callCount: 1
};
this.duplicateRecords.set(key, record);
return { isDuplicate: false, shouldProcess: true };
}
/**
* 记录调用响应(用于幂等性)
*/
public recordCallResponse(request: RpcCallRequest, response: RpcCallResponse): void {
if (!this.config.idempotency.enabled) {
return;
}
const key = `${request.senderId}:${request.callId}`;
const record = this.duplicateRecords.get(key);
if (record) {
record.response = response;
}
}
/**
* 处理有序调用
*/
public async handleOrderedCall(
request: RpcCallRequest,
handler: () => Promise<RpcCallResponse>
): Promise<RpcCallResponse> {
if (!this.config.orderedExecution.enabled) {
return handler();
}
const senderId = request.senderId;
return new Promise<RpcCallResponse>((resolve, reject) => {
const queueItem: OrderedQueueItem = {
request,
handler,
resolve,
reject,
enqueuedAt: Date.now()
};
// 获取或创建队列
let queue = this.orderedQueues.get(senderId);
if (!queue) {
queue = [];
this.orderedQueues.set(senderId, queue);
}
// 检查队列大小
if (queue.length >= this.config.orderedExecution.maxQueueSize) {
reject({
type: RpcErrorType.RATE_LIMITED,
message: '有序执行队列已满'
});
return;
}
queue.push(queueItem);
this.emit('orderedCallQueued', request.callId, queue.length);
// 如果没有正在处理的调用,开始处理
if (!this.processingOrdered.has(senderId)) {
this.processOrderedQueue(senderId);
}
});
}
/**
* 开始事务
*/
public startTransaction(transactionId: string): void {
if (!this.config.transaction.enabled) {
throw new Error('事务功能未启用');
}
if (this.transactions.has(transactionId)) {
throw new Error(`事务已存在: ${transactionId}`);
}
if (this.transactions.size >= this.config.transaction.maxTransactions) {
throw new Error('超过最大事务数量限制');
}
const transaction: TransactionInfo = {
transactionId,
calls: [],
startTime: Date.now(),
status: 'pending',
rollbackActions: []
};
this.transactions.set(transactionId, transaction);
this.emit('transactionStarted', transactionId);
// 设置事务超时
setTimeout(() => {
if (this.transactions.has(transactionId)) {
this.rollbackTransaction(transactionId, '事务超时');
}
}, this.config.transaction.transactionTimeout);
}
/**
* 添加事务调用
*/
public addTransactionCall(
transactionId: string,
request: RpcCallRequest,
rollbackAction?: () => Promise<void>
): void {
const transaction = this.transactions.get(transactionId);
if (!transaction) {
throw new Error(`事务不存在: ${transactionId}`);
}
if (transaction.status !== 'pending') {
throw new Error(`事务状态无效: ${transaction.status}`);
}
transaction.calls.push(request);
if (rollbackAction) {
transaction.rollbackActions.push(rollbackAction);
}
}
/**
* 提交事务
*/
public async commitTransaction(transactionId: string): Promise<void> {
const transaction = this.transactions.get(transactionId);
if (!transaction) {
throw new Error(`事务不存在: ${transactionId}`);
}
if (transaction.status !== 'pending') {
throw new Error(`事务状态无效: ${transaction.status}`);
}
transaction.status = 'committed';
this.transactions.delete(transactionId);
this.emit('transactionCommitted', transactionId);
this.logger.info(`事务已提交: ${transactionId},包含 ${transaction.calls.length} 个调用`);
}
/**
* 回滚事务
*/
public async rollbackTransaction(transactionId: string, reason: string): Promise<void> {
const transaction = this.transactions.get(transactionId);
if (!transaction) {
throw new Error(`事务不存在: ${transactionId}`);
}
if (transaction.status !== 'pending') {
return; // 已经处理过
}
transaction.status = 'rolledback';
// 执行回滚操作
for (const rollbackAction of transaction.rollbackActions.reverse()) {
try {
await rollbackAction();
} catch (error) {
this.logger.error(`回滚操作失败: ${transactionId}`, error);
}
}
this.transactions.delete(transactionId);
this.emit('transactionRolledback', transactionId, reason);
this.logger.warn(`事务已回滚: ${transactionId},原因: ${reason}`);
}
/**
* 获取事务信息
*/
public getTransaction(transactionId: string): TransactionInfo | undefined {
return this.transactions.get(transactionId);
}
/**
* 获取统计信息
*/
public getStats(): {
duplicateRecords: number;
activeTransactions: number;
totalQueuedCalls: number;
processingQueues: number;
} {
let totalQueuedCalls = 0;
for (const queue of this.orderedQueues.values()) {
totalQueuedCalls += queue.length;
}
return {
duplicateRecords: this.duplicateRecords.size,
activeTransactions: this.transactions.size,
totalQueuedCalls,
processingQueues: this.processingOrdered.size
};
}
/**
* 更新配置
*/
public updateConfig(newConfig: Partial<RpcReliabilityConfig>): void {
if (newConfig.idempotency) {
Object.assign(this.config.idempotency, newConfig.idempotency);
}
if (newConfig.orderedExecution) {
Object.assign(this.config.orderedExecution, newConfig.orderedExecution);
}
if (newConfig.transaction) {
Object.assign(this.config.transaction, newConfig.transaction);
}
}
/**
* 销毁管理器
*/
public destroy(): void {
// 停止清理定时器
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = null;
}
// 回滚所有活跃事务
const transactionIds = Array.from(this.transactions.keys());
for (const transactionId of transactionIds) {
this.rollbackTransaction(transactionId, '管理器销毁').catch(error => {
this.logger.error(`销毁时回滚事务失败: ${transactionId}`, error);
});
}
// 清理队列
for (const queue of this.orderedQueues.values()) {
for (const item of queue) {
item.reject({
type: RpcErrorType.CLIENT_ERROR,
message: '服务关闭'
});
}
}
this.duplicateRecords.clear();
this.transactions.clear();
this.orderedQueues.clear();
this.processingOrdered.clear();
this.removeAllListeners();
}
/**
* 处理有序队列
*/
private async processOrderedQueue(senderId: string): Promise<void> {
this.processingOrdered.add(senderId);
try {
const queue = this.orderedQueues.get(senderId);
if (!queue || queue.length === 0) {
return;
}
while (queue.length > 0) {
const item = queue.shift()!;
const waitTime = Date.now() - item.enqueuedAt;
// 检查等待时间是否超限
if (waitTime > this.config.orderedExecution.maxWaitTime) {
item.reject({
type: RpcErrorType.TIMEOUT,
message: `有序执行等待超时: ${waitTime}ms`
});
continue;
}
try {
const response = await item.handler();
item.resolve(response);
this.emit('orderedCallProcessed', item.request.callId, waitTime);
} catch (error) {
item.reject(error as RpcError);
}
}
} finally {
this.processingOrdered.delete(senderId);
// 如果队列还有新的项目,继续处理
const queue = this.orderedQueues.get(senderId);
if (queue && queue.length > 0) {
setImmediate(() => this.processOrderedQueue(senderId));
}
}
}
/**
* 开始清理定时器
*/
private startCleanupTimer(): void {
this.cleanupTimer = setInterval(() => {
this.cleanup();
}, 60000); // 每分钟清理一次
}
/**
* 清理过期数据
*/
private cleanup(): void {
const now = Date.now();
// 清理过期的重复调用记录
if (this.config.idempotency.enabled) {
for (const [key, record] of this.duplicateRecords) {
if (now - record.lastCallTime > this.config.idempotency.recordRetentionTime) {
this.duplicateRecords.delete(key);
}
}
// 限制记录数量
if (this.duplicateRecords.size > this.config.idempotency.maxRecords) {
const sortedRecords = Array.from(this.duplicateRecords.entries())
.sort(([,a], [,b]) => a.lastCallTime - b.lastCallTime);
const keepCount = Math.floor(this.config.idempotency.maxRecords * 0.8);
for (let i = 0; i < sortedRecords.length - keepCount; i++) {
this.duplicateRecords.delete(sortedRecords[i][0]);
}
}
}
// 清理空的有序队列
for (const [senderId, queue] of this.orderedQueues) {
if (queue.length === 0 && !this.processingOrdered.has(senderId)) {
this.orderedQueues.delete(senderId);
}
}
}
}