实现SyncVar装饰器和组件同步

This commit is contained in:
YHH
2025-08-20 10:16:54 +08:00
parent 364bc4cdab
commit 0a1d7ac083
21 changed files with 5956 additions and 342 deletions

View File

@@ -0,0 +1,328 @@
import { SyncMode, AuthorityType, NetworkScope } from '../types/NetworkTypes';
/**
* SyncVar支持的值类型
*/
export type SyncVarValue = string | number | boolean | object | null | undefined;
/**
* SyncVar配置选项
*/
export interface SyncVarOptions<T = SyncVarValue> {
/** 同步模式 */
mode?: SyncMode;
/** 同步频率(毫秒) */
syncRate?: number;
/** 权限类型 */
authority?: AuthorityType;
/** 网络作用域 */
scope?: NetworkScope;
/** 变化阈值,用于数值类型 */
threshold?: number;
/** 是否启用压缩 */
compression?: boolean;
/** 优先级(0-10数值越高优先级越高) */
priority?: number;
/** 是否启用插值 */
interpolation?: boolean;
/** 自定义序列化函数 */
customSerializer?: (value: T) => unknown;
/** 自定义反序列化函数 */
customDeserializer?: (value: unknown) => T;
/** 变化回调函数 */
onChanged?: (oldValue: T, newValue: T) => void;
}
/**
* SyncVar元数据
*/
export interface SyncVarMetadata<T = SyncVarValue> {
propertyKey: string | symbol;
options: Required<SyncVarOptions<T>>;
originalDescriptor?: PropertyDescriptor;
lastValue?: T;
lastSyncTime: number;
isDirty: boolean;
syncCount: number;
}
/**
* 存储SyncVar元数据的Symbol键
*/
export const SYNCVAR_METADATA_KEY = Symbol('SyncVarMetadata');
/**
* 默认SyncVar配置
*/
export const DEFAULT_SYNCVAR_OPTIONS: Required<SyncVarOptions<SyncVarValue>> = {
mode: SyncMode.All,
syncRate: 100,
authority: AuthorityType.Server,
scope: NetworkScope.Global,
threshold: 0.001,
compression: true,
priority: 5,
interpolation: false,
customSerializer: (value: SyncVarValue) => value,
customDeserializer: (value: unknown) => value as SyncVarValue,
onChanged: () => {}
};
/**
* SyncVar装饰器
* 用于标记需要网络同步的属性
*/
export function SyncVar<T extends SyncVarValue = SyncVarValue>(options: SyncVarOptions<T> = {}) {
return function (target: object, propertyKey: string | symbol) {
const fullOptions = { ...DEFAULT_SYNCVAR_OPTIONS, ...options } as Required<SyncVarOptions<T>>;
// 获取或创建元数据存储
if (!(target.constructor as any)[SYNCVAR_METADATA_KEY]) {
(target.constructor as any)[SYNCVAR_METADATA_KEY] = new Map();
}
const metadataMap = (target.constructor as any)[SYNCVAR_METADATA_KEY] as Map<string | symbol, SyncVarMetadata<T>>;
// 创建元数据
const metadata: SyncVarMetadata<T> = {
propertyKey,
options: fullOptions,
lastSyncTime: 0,
isDirty: false,
syncCount: 0
};
metadataMap.set(propertyKey, metadata);
// 获取原始属性描述符
const originalDescriptor = Object.getOwnPropertyDescriptor(target, propertyKey) || {
writable: true,
enumerable: true,
configurable: true
};
metadata.originalDescriptor = originalDescriptor;
// 创建内部存储属性名
const internalPropertyKey = Symbol(`_syncVar_${String(propertyKey)}`);
// 重新定义属性,添加变化检测
Object.defineProperty(target, propertyKey, {
get: function() {
return (this as any)[internalPropertyKey];
},
set: function(newValue: T) {
const oldValue = (this as any)[internalPropertyKey];
// 检查是否真的发生了变化
if (!hasValueChanged(oldValue, newValue, fullOptions.threshold)) {
return;
}
// 更新值
(this as any)[internalPropertyKey] = newValue;
// 标记为脏数据
markAsDirty(this, propertyKey);
// 触发变化回调
if (fullOptions.onChanged) {
fullOptions.onChanged(oldValue, newValue);
}
// 如果启用了自动同步,立即同步
if (fullOptions.syncRate === 0) {
requestImmediateSync(this, propertyKey);
}
},
enumerable: originalDescriptor.enumerable,
configurable: originalDescriptor.configurable
});
// 设置初始值
if (originalDescriptor.value !== undefined) {
(target as any)[internalPropertyKey] = originalDescriptor.value;
}
};
}
/**
* 检查值是否发生了变化
*/
function hasValueChanged(oldValue: SyncVarValue, newValue: SyncVarValue, threshold: number): boolean {
// 严格相等检查
if (oldValue === newValue) {
return false;
}
// null/undefined 检查
if (oldValue == null || newValue == null) {
return oldValue !== newValue;
}
// 数值类型的阈值检查
if (typeof oldValue === 'number' && typeof newValue === 'number') {
return Math.abs(oldValue - newValue) > threshold;
}
// 对象类型的深度比较
if (typeof oldValue === 'object' && typeof newValue === 'object') {
return !deepEqual(oldValue, newValue);
}
return true;
}
/**
* 深度比较两个对象是否相等
*/
function deepEqual(obj1: SyncVarValue, obj2: SyncVarValue): boolean {
if (obj1 === obj2) {
return true;
}
if (obj1 == null || obj2 == null) {
return obj1 === obj2;
}
if (typeof obj1 !== typeof obj2) {
return false;
}
if (typeof obj1 !== 'object') {
return obj1 === obj2;
}
const keys1 = Object.keys(obj1);
const keys2 = Object.keys(obj2);
if (keys1.length !== keys2.length) {
return false;
}
for (const key of keys1) {
if (!keys2.includes(key)) {
return false;
}
if (!deepEqual((obj1 as any)[key], (obj2 as any)[key])) {
return false;
}
}
return true;
}
/**
* 标记属性为脏数据
*/
function markAsDirty(instance: object, propertyKey: string | symbol): void {
const metadataMap = (instance.constructor as any)[SYNCVAR_METADATA_KEY] as Map<string | symbol, SyncVarMetadata>;
const metadata = metadataMap?.get(propertyKey);
if (metadata) {
metadata.isDirty = true;
metadata.lastValue = (instance as any)[propertyKey];
}
// 通知SyncVar管理器
if (typeof window !== 'undefined' && (window as any).SyncVarManager) {
(window as any).SyncVarManager.markInstanceDirty(instance);
}
}
/**
* 请求立即同步
*/
function requestImmediateSync(instance: object, propertyKey: string | symbol): void {
// 通知SyncVar管理器立即同步
if (typeof window !== 'undefined' && (window as any).SyncVarManager) {
(window as any).SyncVarManager.requestImmediateSync(instance, propertyKey);
}
}
/**
* 获取对象的所有SyncVar元数据
*/
export function getSyncVarMetadata(target: object): Map<string | symbol, SyncVarMetadata> {
if (!target || !target.constructor) {
return new Map();
}
return (target.constructor as any)[SYNCVAR_METADATA_KEY] || new Map();
}
/**
* 获取特定属性的SyncVar元数据
*/
export function getSyncVarPropertyMetadata(target: object, propertyKey: string | symbol): SyncVarMetadata | undefined {
const metadataMap = getSyncVarMetadata(target);
return metadataMap.get(propertyKey);
}
/**
* 检查对象是否有SyncVar属性
*/
export function hasSyncVars(target: object): boolean {
const metadataMap = getSyncVarMetadata(target);
return metadataMap.size > 0;
}
/**
* 获取对象的所有脏SyncVar属性
*/
export function getDirtySyncVars(target: object): Map<string | symbol, SyncVarMetadata> {
const metadataMap = getSyncVarMetadata(target);
const dirtyVars = new Map<string | symbol, SyncVarMetadata>();
for (const [key, metadata] of metadataMap) {
if (metadata.isDirty) {
dirtyVars.set(key, metadata);
}
}
return dirtyVars;
}
/**
* 清理所有脏标记
*/
export function clearDirtyFlags(target: object): void {
const metadataMap = getSyncVarMetadata(target);
for (const metadata of metadataMap.values()) {
metadata.isDirty = false;
metadata.lastSyncTime = Date.now();
metadata.syncCount++;
}
}
/**
* 重置SyncVar统计信息
*/
export function resetSyncVarStats(target: object): void {
const metadataMap = getSyncVarMetadata(target);
for (const metadata of metadataMap.values()) {
metadata.syncCount = 0;
metadata.lastSyncTime = 0;
}
}
/**
* 获取SyncVar统计信息
*/
export function getSyncVarStats(target: object): { [key: string]: { syncCount: number; lastSyncTime: number; isDirty: boolean } } {
const metadataMap = getSyncVarMetadata(target);
const stats: { [key: string]: { syncCount: number; lastSyncTime: number; isDirty: boolean } } = {};
for (const [key, metadata] of metadataMap) {
stats[String(key)] = {
syncCount: metadata.syncCount,
lastSyncTime: metadata.lastSyncTime,
isDirty: metadata.isDirty
};
}
return stats;
}

View File

@@ -0,0 +1,5 @@
/**
* 网络装饰器导出
*/
export * from './SyncVar';

View File

@@ -24,6 +24,32 @@ export * from './events/NetworkEvents';
// 序列化系统
export * from './serialization/JSONSerializer';
export * from './serialization/MessageCompressor';
export {
SyncVarSerializer,
SyncVarSerializerConfig,
SerializationResult as SyncVarSerializationResult,
DeserializationResult as SyncVarDeserializationResult,
DeltaData as SyncVarDeltaData,
CompressionMetadata
} from './serialization/SyncVarSerializer';
// 装饰器系统
export * from './decorators';
// 同步系统
export { SyncVarManager, SyncBatch } from './sync/SyncVarManager';
export {
DeltaSync,
DeltaSyncConfig,
DeltaData,
DeltaOperationType,
DeltaOperation,
VersionedData,
DeltaSyncStats
} from './sync/DeltaSync';
// 监控系统
export * from './monitoring';
// 工具类
export * from './utils';

View File

@@ -0,0 +1,541 @@
import { createLogger } from '@esengine/ecs-framework';
import { EventEmitter } from '../utils/EventEmitter';
/**
* 带宽监控配置
*/
export interface BandwidthMonitorConfig {
/** 监控间隔(毫秒) */
monitorInterval: number;
/** 采样窗口大小 */
sampleWindowSize: number;
/** 预警阈值(0-1) */
warningThreshold: number;
/** 严重阈值(0-1) */
criticalThreshold: number;
/** 是否启用自适应调整 */
enableAdaptive: boolean;
/** 自适应调整因子 */
adaptiveFactor: number;
}
/**
* 带宽样本
*/
export interface BandwidthSample {
timestamp: number;
bytesIn: number;
bytesOut: number;
packetsIn: number;
packetsOut: number;
latency: number;
}
/**
* 带宽统计
*/
export interface BandwidthStats {
/** 当前上行带宽(bytes/s) */
currentUpload: number;
/** 当前下行带宽(bytes/s) */
currentDownload: number;
/** 平均上行带宽(bytes/s) */
averageUpload: number;
/** 平均下行带宽(bytes/s) */
averageDownload: number;
/** 峰值上行带宽(bytes/s) */
peakUpload: number;
/** 峰值下行带宽(bytes/s) */
peakDownload: number;
/** 总上传字节数 */
totalUpload: number;
/** 总下载字节数 */
totalDownload: number;
/** 当前包速率(packets/s) */
currentPacketRate: number;
/** 平均延迟(ms) */
averageLatency: number;
/** 延迟抖动(ms) */
latencyJitter: number;
/** 利用率(0-1) */
utilization: number;
}
/**
* 带宽限制
*/
export interface BandwidthLimit {
/** 上行限制(bytes/s) */
uploadLimit: number;
/** 下行限制(bytes/s) */
downloadLimit: number;
/** 是否启用限制 */
enabled: boolean;
}
/**
* 带宽警告级别
*/
export enum BandwidthWarningLevel {
Normal = 'normal',
Warning = 'warning',
Critical = 'critical'
}
/**
* 带宽事件
*/
export interface BandwidthMonitorEvents {
bandwidthChanged: (stats: BandwidthStats) => void;
limitExceeded: (direction: 'upload' | 'download', current: number, limit: number) => void;
warningLevelChanged: (level: BandwidthWarningLevel, stats: BandwidthStats) => void;
adaptiveAdjustment: (oldLimits: BandwidthLimit, newLimits: BandwidthLimit) => void;
}
/**
* 带宽监控器
* 负责监控网络带宽使用情况并提供自适应调整
*/
export class BandwidthMonitor extends EventEmitter {
private logger = createLogger('BandwidthMonitor');
private config: BandwidthMonitorConfig;
/** 带宽样本历史 */
private samples: BandwidthSample[] = [];
/** 当前带宽限制 */
private limits: BandwidthLimit;
/** 当前警告级别 */
private currentWarningLevel = BandwidthWarningLevel.Normal;
/** 监控定时器 */
private monitorTimer: ReturnType<typeof setInterval> | null = null;
/** 统计信息 */
private stats: BandwidthStats = {
currentUpload: 0,
currentDownload: 0,
averageUpload: 0,
averageDownload: 0,
peakUpload: 0,
peakDownload: 0,
totalUpload: 0,
totalDownload: 0,
currentPacketRate: 0,
averageLatency: 0,
latencyJitter: 0,
utilization: 0
};
/** 上次统计时间 */
private lastStatsTime = Date.now();
/** 累计字节数 */
private cumulativeBytesIn = 0;
private cumulativeBytesOut = 0;
private cumulativePacketsIn = 0;
private cumulativePacketsOut = 0;
constructor(config: Partial<BandwidthMonitorConfig> = {}) {
super();
this.config = {
monitorInterval: 1000,
sampleWindowSize: 60,
warningThreshold: 0.8,
criticalThreshold: 0.95,
enableAdaptive: true,
adaptiveFactor: 0.1,
...config
};
this.limits = {
uploadLimit: 1024 * 1024, // 1MB/s
downloadLimit: 1024 * 1024, // 1MB/s
enabled: false
};
this.startMonitoring();
}
/**
* 记录网络活动
*/
public recordActivity(bytesIn: number, bytesOut: number, packetsIn: number = 0, packetsOut: number = 0, latency: number = 0): void {
this.cumulativeBytesIn += bytesIn;
this.cumulativeBytesOut += bytesOut;
this.cumulativePacketsIn += packetsIn;
this.cumulativePacketsOut += packetsOut;
this.stats.totalUpload += bytesOut;
this.stats.totalDownload += bytesIn;
}
/**
* 设置带宽限制
*/
public setBandwidthLimits(limits: Partial<BandwidthLimit>): void {
const oldLimits = { ...this.limits };
Object.assign(this.limits, limits);
this.logger.info(`带宽限制已更新: 上行=${this.limits.uploadLimit}B/s, 下行=${this.limits.downloadLimit}B/s`);
if (this.config.enableAdaptive) {
this.emit('adaptiveAdjustment', oldLimits, this.limits);
}
}
/**
* 获取当前统计信息
*/
public getStats(): BandwidthStats {
return { ...this.stats };
}
/**
* 获取当前限制
*/
public getLimits(): BandwidthLimit {
return { ...this.limits };
}
/**
* 获取当前警告级别
*/
public getWarningLevel(): BandwidthWarningLevel {
return this.currentWarningLevel;
}
/**
* 检查是否超过限制
*/
public isOverLimit(): { upload: boolean; download: boolean } {
if (!this.limits.enabled) {
return { upload: false, download: false };
}
return {
upload: this.stats.currentUpload > this.limits.uploadLimit,
download: this.stats.currentDownload > this.limits.downloadLimit
};
}
/**
* 获取建议的数据发送大小
*/
public getRecommendedSendSize(): number {
if (!this.limits.enabled) {
return Infinity;
}
const uploadUtilization = this.stats.currentUpload / this.limits.uploadLimit;
if (uploadUtilization < this.config.warningThreshold) {
return this.limits.uploadLimit * 0.1; // 10% of limit
} else if (uploadUtilization < this.config.criticalThreshold) {
return this.limits.uploadLimit * 0.05; // 5% of limit
} else {
return this.limits.uploadLimit * 0.01; // 1% of limit
}
}
/**
* 获取发送延迟建议
*/
public getRecommendedDelay(): number {
const utilization = Math.max(
this.stats.currentUpload / this.limits.uploadLimit,
this.stats.currentDownload / this.limits.downloadLimit
);
if (utilization < this.config.warningThreshold) {
return 0;
} else if (utilization < this.config.criticalThreshold) {
return 100; // 100ms delay
} else {
return 500; // 500ms delay
}
}
/**
* 重置统计信息
*/
public resetStats(): void {
this.stats = {
currentUpload: 0,
currentDownload: 0,
averageUpload: 0,
averageDownload: 0,
peakUpload: 0,
peakDownload: 0,
totalUpload: 0,
totalDownload: 0,
currentPacketRate: 0,
averageLatency: 0,
latencyJitter: 0,
utilization: 0
};
this.samples.length = 0;
this.cumulativeBytesIn = 0;
this.cumulativeBytesOut = 0;
this.cumulativePacketsIn = 0;
this.cumulativePacketsOut = 0;
this.lastStatsTime = Date.now();
}
/**
* 更新配置
*/
public updateConfig(newConfig: Partial<BandwidthMonitorConfig>): void {
Object.assign(this.config, newConfig);
if (newConfig.monitorInterval !== undefined) {
this.restartMonitoring();
}
}
/**
* 销毁监控器
*/
public destroy(): void {
this.stopMonitoring();
this.samples.length = 0;
this.removeAllListeners();
}
/**
* 开始监控
*/
private startMonitoring(): void {
if (this.monitorTimer) {
return;
}
this.monitorTimer = setInterval(() => {
this.updateStats();
}, this.config.monitorInterval);
}
/**
* 停止监控
*/
private stopMonitoring(): void {
if (this.monitorTimer) {
clearInterval(this.monitorTimer);
this.monitorTimer = null;
}
}
/**
* 重启监控
*/
private restartMonitoring(): void {
this.stopMonitoring();
this.startMonitoring();
}
/**
* 更新统计信息
*/
private updateStats(): void {
const now = Date.now();
const deltaTime = (now - this.lastStatsTime) / 1000; // 转换为秒
if (deltaTime <= 0) {
return;
}
// 计算当前速率
const currentUpload = this.cumulativeBytesOut / deltaTime;
const currentDownload = this.cumulativeBytesIn / deltaTime;
const currentPacketRate = (this.cumulativePacketsIn + this.cumulativePacketsOut) / deltaTime;
// 创建新样本
const sample: BandwidthSample = {
timestamp: now,
bytesIn: this.cumulativeBytesIn,
bytesOut: this.cumulativeBytesOut,
packetsIn: this.cumulativePacketsIn,
packetsOut: this.cumulativePacketsOut,
latency: 0 // 需要从外部提供
};
this.samples.push(sample);
// 限制样本数量
if (this.samples.length > this.config.sampleWindowSize) {
this.samples.shift();
}
// 更新统计信息
this.stats.currentUpload = currentUpload;
this.stats.currentDownload = currentDownload;
this.stats.currentPacketRate = currentPacketRate;
// 更新峰值
this.stats.peakUpload = Math.max(this.stats.peakUpload, currentUpload);
this.stats.peakDownload = Math.max(this.stats.peakDownload, currentDownload);
// 计算平均值
this.calculateAverages();
// 计算利用率
this.calculateUtilization();
// 检查限制
this.checkLimits();
// 检查警告级别
this.checkWarningLevel();
// 自适应调整
if (this.config.enableAdaptive) {
this.performAdaptiveAdjustment();
}
// 重置累计值
this.cumulativeBytesIn = 0;
this.cumulativeBytesOut = 0;
this.cumulativePacketsIn = 0;
this.cumulativePacketsOut = 0;
this.lastStatsTime = now;
// 发出事件
this.emit('bandwidthChanged', this.stats);
}
/**
* 计算平均值
*/
private calculateAverages(): void {
if (this.samples.length === 0) {
return;
}
let totalUpload = 0;
let totalDownload = 0;
let totalLatency = 0;
for (let i = 1; i < this.samples.length; i++) {
const prev = this.samples[i - 1];
const curr = this.samples[i];
const deltaTime = (curr.timestamp - prev.timestamp) / 1000;
if (deltaTime > 0) {
totalUpload += (curr.bytesOut - prev.bytesOut) / deltaTime;
totalDownload += (curr.bytesIn - prev.bytesIn) / deltaTime;
totalLatency += curr.latency;
}
}
const sampleCount = this.samples.length - 1;
if (sampleCount > 0) {
this.stats.averageUpload = totalUpload / sampleCount;
this.stats.averageDownload = totalDownload / sampleCount;
this.stats.averageLatency = totalLatency / this.samples.length;
}
// 计算延迟抖动
this.calculateLatencyJitter();
}
/**
* 计算延迟抖动
*/
private calculateLatencyJitter(): void {
if (this.samples.length < 2) {
return;
}
let jitterSum = 0;
let jitterCount = 0;
for (let i = 1; i < this.samples.length; i++) {
const diff = Math.abs(this.samples[i].latency - this.samples[i - 1].latency);
jitterSum += diff;
jitterCount++;
}
this.stats.latencyJitter = jitterCount > 0 ? jitterSum / jitterCount : 0;
}
/**
* 计算利用率
*/
private calculateUtilization(): void {
if (!this.limits.enabled) {
this.stats.utilization = 0;
return;
}
const uploadUtilization = this.stats.currentUpload / this.limits.uploadLimit;
const downloadUtilization = this.stats.currentDownload / this.limits.downloadLimit;
this.stats.utilization = Math.max(uploadUtilization, downloadUtilization);
}
/**
* 检查限制
*/
private checkLimits(): void {
if (!this.limits.enabled) {
return;
}
if (this.stats.currentUpload > this.limits.uploadLimit) {
this.emit('limitExceeded', 'upload', this.stats.currentUpload, this.limits.uploadLimit);
}
if (this.stats.currentDownload > this.limits.downloadLimit) {
this.emit('limitExceeded', 'download', this.stats.currentDownload, this.limits.downloadLimit);
}
}
/**
* 检查警告级别
*/
private checkWarningLevel(): void {
let newLevel = BandwidthWarningLevel.Normal;
if (this.stats.utilization >= this.config.criticalThreshold) {
newLevel = BandwidthWarningLevel.Critical;
} else if (this.stats.utilization >= this.config.warningThreshold) {
newLevel = BandwidthWarningLevel.Warning;
}
if (newLevel !== this.currentWarningLevel) {
this.currentWarningLevel = newLevel;
this.emit('warningLevelChanged', newLevel, this.stats);
}
}
/**
* 执行自适应调整
*/
private performAdaptiveAdjustment(): void {
if (!this.limits.enabled || this.stats.utilization < this.config.warningThreshold) {
return;
}
const oldLimits = { ...this.limits };
// 根据当前利用率动态调整限制
if (this.stats.utilization > this.config.criticalThreshold) {
// 严重超载,降低限制
this.limits.uploadLimit *= (1 - this.config.adaptiveFactor);
this.limits.downloadLimit *= (1 - this.config.adaptiveFactor);
} else if (this.stats.utilization < this.config.warningThreshold * 0.5) {
// 利用率较低,可以提高限制
this.limits.uploadLimit *= (1 + this.config.adaptiveFactor * 0.5);
this.limits.downloadLimit *= (1 + this.config.adaptiveFactor * 0.5);
}
// 检查是否有变化
if (this.limits.uploadLimit !== oldLimits.uploadLimit ||
this.limits.downloadLimit !== oldLimits.downloadLimit) {
this.emit('adaptiveAdjustment', oldLimits, this.limits);
}
}
}

View File

@@ -0,0 +1,5 @@
/**
* 监控模块导出
*/
export * from './BandwidthMonitor';

View File

@@ -1,56 +1,329 @@
/**
* 消息压缩器
* 提供多种压缩算法选择和压缩率统计
* 消息压缩器框架
* 提供可扩展的压缩算法接口,用户可以注册自定义压缩算法
*/
import { createLogger } from '@esengine/ecs-framework';
import * as zlib from 'zlib';
import { promisify } from 'util';
/**
* 压缩算法类型
* 压缩算法接口
*/
export enum CompressionAlgorithm {
NONE = 'none',
GZIP = 'gzip',
DEFLATE = 'deflate',
BROTLI = 'brotli'
export interface ICompressionAlgorithm {
/** 算法名称 */
readonly name: string;
/** 算法版本 */
readonly version: string;
/** 是否支持异步压缩 */
readonly supportsAsync: boolean;
/**
* 同步压缩
*/
compress(data: ArrayBuffer): ArrayBuffer;
/**
* 同步解压缩
*/
decompress(data: ArrayBuffer): ArrayBuffer;
/**
* 异步压缩(可选)
*/
compressAsync?(data: ArrayBuffer): Promise<ArrayBuffer>;
/**
* 异步解压缩(可选)
*/
decompressAsync?(data: ArrayBuffer): Promise<ArrayBuffer>;
/**
* 估算压缩后大小(可选)
*/
estimateCompressedSize?(data: ArrayBuffer): number;
}
/**
* 压缩配置
*/
export interface CompressionConfig {
algorithm: CompressionAlgorithm;
level: number; // 压缩级别 (0-9)
threshold: number; // 最小压缩阈值(字节)
enableAsync: boolean; // 是否启用异步压缩
chunkSize: number; // 分块大小
/** 默认压缩算法名称 */
defaultAlgorithm: string;
/** 最小压缩阈值(字节) */
threshold: number;
/** 是否启用异步压缩 */
enableAsync: boolean;
/** 压缩级别提示 (0-9) */
level: number;
/** 分块大小 */
chunkSize: number;
/** 是否启用压缩统计 */
enableStats: boolean;
}
/**
* 压缩结果
*/
export interface CompressionResult {
data: Buffer;
/** 压缩后的数据 */
data: ArrayBuffer;
/** 原始大小 */
originalSize: number;
/** 压缩后大小 */
compressedSize: number;
/** 压缩比 */
compressionRatio: number;
/** 压缩耗时 */
compressionTime: number;
algorithm: CompressionAlgorithm;
/** 使用的算法 */
algorithm: string;
/** 是否实际进行了压缩 */
wasCompressed: boolean;
}
/**
* 解压缩结果
*/
export interface DecompressionResult {
/** 解压缩后的数据 */
data: ArrayBuffer;
/** 原始压缩大小 */
compressedSize: number;
/** 解压缩后大小 */
decompressedSize: number;
/** 解压缩耗时 */
decompressionTime: number;
/** 使用的算法 */
algorithm: string;
}
/**
* 压缩统计信息
*/
export interface CompressionStats {
totalCompressed: number;
totalDecompressed: number;
/** 总压缩次数 */
totalCompressions: number;
/** 总解压缩次数 */
totalDecompressions: number;
/** 总原始字节数 */
totalOriginalBytes: number;
/** 总压缩字节数 */
totalCompressedBytes: number;
/** 平均压缩比 */
averageCompressionRatio: number;
/** 平均压缩时间 */
averageCompressionTime: number;
/** 平均解压缩时间 */
averageDecompressionTime: number;
algorithmUsage: Record<CompressionAlgorithm, number>;
/** 算法使用统计 */
algorithmUsage: Record<string, number>;
}
/**
* 无压缩算法实现(默认)
*/
export class NoCompressionAlgorithm implements ICompressionAlgorithm {
readonly name = 'none';
readonly version = '1.0.0';
readonly supportsAsync = false;
compress(data: ArrayBuffer): ArrayBuffer {
return data.slice(0);
}
decompress(data: ArrayBuffer): ArrayBuffer {
return data.slice(0);
}
estimateCompressedSize(data: ArrayBuffer): number {
return data.byteLength;
}
}
/**
* LZ字符串压缩算法实现
*/
export class LZCompressionAlgorithm implements ICompressionAlgorithm {
readonly name = 'lz-string';
readonly version = '1.0.0';
readonly supportsAsync = false;
compress(data: ArrayBuffer): ArrayBuffer {
// 将ArrayBuffer转换为字符串
const decoder = new TextDecoder();
const input = decoder.decode(data);
if (!input) {
return data.slice(0);
}
// LZ压缩算法
const dictionary: { [key: string]: number } = {};
let dictSize = 256;
// 初始化字典
for (let i = 0; i < 256; i++) {
dictionary[String.fromCharCode(i)] = i;
}
let w = '';
const result: number[] = [];
for (let i = 0; i < input.length; i++) {
const c = input.charAt(i);
const wc = w + c;
if (dictionary[wc] !== undefined) {
w = wc;
} else {
result.push(dictionary[w]);
dictionary[wc] = dictSize++;
w = c;
// 防止字典过大
if (dictSize >= 0xFFFF) {
dictSize = 256;
// 重置字典
for (const key in dictionary) {
if (dictionary[key] >= 256) {
delete dictionary[key];
}
}
}
}
}
if (w) {
result.push(dictionary[w]);
}
// 将结果转换为ArrayBuffer
return this.numbersToArrayBuffer(result);
}
decompress(data: ArrayBuffer): ArrayBuffer {
if (data.byteLength === 0) {
return data.slice(0);
}
const numbers = this.arrayBufferToNumbers(data);
if (numbers.length === 0) {
return data.slice(0);
}
const dictionary: { [key: number]: string } = {};
let dictSize = 256;
// 初始化字典
for (let i = 0; i < 256; i++) {
dictionary[i] = String.fromCharCode(i);
}
let w = String.fromCharCode(numbers[0]);
const result = [w];
for (let i = 1; i < numbers.length; i++) {
const k = numbers[i];
let entry: string;
if (dictionary[k] !== undefined) {
entry = dictionary[k];
} else if (k === dictSize) {
entry = w + w.charAt(0);
} else {
throw new Error('LZ解压缩错误无效的压缩数据');
}
result.push(entry);
dictionary[dictSize++] = w + entry.charAt(0);
w = entry;
// 防止字典过大
if (dictSize >= 0xFFFF) {
dictSize = 256;
// 重置字典
for (const key in dictionary) {
if (parseInt(key) >= 256) {
delete dictionary[key];
}
}
}
}
// 将结果转换为ArrayBuffer
const output = result.join('');
const encoder = new TextEncoder();
return encoder.encode(output).buffer;
}
estimateCompressedSize(data: ArrayBuffer): number {
// 简单估算假设压缩率在30%-70%之间
const size = data.byteLength;
return Math.floor(size * 0.5); // 50%的估算压缩率
}
/**
* 将数字数组转换为ArrayBuffer
*/
private numbersToArrayBuffer(numbers: number[]): ArrayBuffer {
// 使用变长编码以节省空间
const bytes: number[] = [];
for (const num of numbers) {
if (num < 128) {
// 小于128用1字节
bytes.push(num);
} else if (num < 16384) {
// 小于16384用2字节最高位为1表示有下一字节
bytes.push(0x80 | (num & 0x7F));
bytes.push((num >> 7) & 0x7F);
} else {
// 大于等于16384用3字节
bytes.push(0x80 | (num & 0x7F));
bytes.push(0x80 | ((num >> 7) & 0x7F));
bytes.push((num >> 14) & 0x7F);
}
}
return new Uint8Array(bytes).buffer;
}
/**
* 将ArrayBuffer转换为数字数组
*/
private arrayBufferToNumbers(buffer: ArrayBuffer): number[] {
const bytes = new Uint8Array(buffer);
const numbers: number[] = [];
for (let i = 0; i < bytes.length; i++) {
const byte1 = bytes[i];
if ((byte1 & 0x80) === 0) {
// 单字节数字
numbers.push(byte1);
} else {
// 多字节数字
let num = byte1 & 0x7F;
i++;
if (i < bytes.length) {
const byte2 = bytes[i];
num |= (byte2 & 0x7F) << 7;
if ((byte2 & 0x80) !== 0) {
// 三字节数字
i++;
if (i < bytes.length) {
const byte3 = bytes[i];
num |= (byte3 & 0x7F) << 14;
}
}
}
numbers.push(num);
}
}
return numbers;
}
}
/**
@@ -59,89 +332,138 @@ export interface CompressionStats {
export class MessageCompressor {
private logger = createLogger('MessageCompressor');
private config: CompressionConfig;
private algorithms = new Map<string, ICompressionAlgorithm>();
private stats: CompressionStats;
// 异步压缩函数
private gzipAsync = promisify(zlib.gzip);
private gunzipAsync = promisify(zlib.gunzip);
private deflateAsync = promisify(zlib.deflate);
private inflateAsync = promisify(zlib.inflate);
private brotliCompressAsync = promisify(zlib.brotliCompress);
private brotliDecompressAsync = promisify(zlib.brotliDecompress);
/**
* 构造函数
*/
constructor(config: Partial<CompressionConfig> = {}) {
this.config = {
algorithm: CompressionAlgorithm.GZIP,
level: 6, // 平衡压缩率和速度
defaultAlgorithm: 'none',
threshold: 1024, // 1KB以上才压缩
enableAsync: true,
level: 6,
chunkSize: 64 * 1024, // 64KB分块
enableStats: true,
...config
};
this.stats = {
totalCompressed: 0,
totalDecompressed: 0,
totalCompressions: 0,
totalDecompressions: 0,
totalOriginalBytes: 0,
totalCompressedBytes: 0,
averageCompressionRatio: 0,
averageCompressionRatio: 1.0,
averageCompressionTime: 0,
averageDecompressionTime: 0,
algorithmUsage: {
[CompressionAlgorithm.NONE]: 0,
[CompressionAlgorithm.GZIP]: 0,
[CompressionAlgorithm.DEFLATE]: 0,
[CompressionAlgorithm.BROTLI]: 0
}
algorithmUsage: {}
};
// 注册默认算法
this.registerAlgorithm(new NoCompressionAlgorithm());
this.registerAlgorithm(new LZCompressionAlgorithm());
}
/**
* 注册压缩算法
*/
public registerAlgorithm(algorithm: ICompressionAlgorithm): void {
if (this.algorithms.has(algorithm.name)) {
this.logger.warn(`压缩算法 '${algorithm.name}' 已存在,将被覆盖`);
}
this.algorithms.set(algorithm.name, algorithm);
this.stats.algorithmUsage[algorithm.name] = 0;
this.logger.info(`注册压缩算法: ${algorithm.name} v${algorithm.version}`);
}
/**
* 注销压缩算法
*/
public unregisterAlgorithm(algorithmName: string): boolean {
if (algorithmName === 'none') {
this.logger.warn('无法注销默认的无压缩算法');
return false;
}
const removed = this.algorithms.delete(algorithmName);
if (removed) {
delete this.stats.algorithmUsage[algorithmName];
this.logger.info(`注销压缩算法: ${algorithmName}`);
}
return removed;
}
/**
* 获取已注册的算法列表
*/
public getRegisteredAlgorithms(): string[] {
return Array.from(this.algorithms.keys());
}
/**
* 检查算法是否已注册
*/
public hasAlgorithm(algorithmName: string): boolean {
return this.algorithms.has(algorithmName);
}
/**
* 压缩数据
*/
async compress(data: string | Buffer): Promise<CompressionResult> {
public async compress(
data: ArrayBuffer | string,
algorithmName?: string
): Promise<CompressionResult> {
const startTime = performance.now();
const inputBuffer = typeof data === 'string' ? Buffer.from(data, 'utf8') : data;
const originalSize = inputBuffer.length;
// 转换输入数据
const inputBuffer = typeof data === 'string'
? new TextEncoder().encode(data).buffer
: data;
const originalSize = inputBuffer.byteLength;
// 选择压缩算法
const selectedAlgorithm = algorithmName || this.config.defaultAlgorithm;
const algorithm = this.algorithms.get(selectedAlgorithm);
if (!algorithm) {
throw new Error(`未找到压缩算法: ${selectedAlgorithm}`);
}
try {
let compressedData: ArrayBuffer;
let wasCompressed = false;
// 检查是否需要压缩
if (originalSize < this.config.threshold) {
return this.createNoCompressionResult(inputBuffer, originalSize, startTime);
}
let compressedData: Buffer;
const algorithm = this.config.algorithm;
// 根据算法进行压缩
switch (algorithm) {
case CompressionAlgorithm.GZIP:
compressedData = await this.compressGzip(inputBuffer);
break;
case CompressionAlgorithm.DEFLATE:
compressedData = await this.compressDeflate(inputBuffer);
break;
case CompressionAlgorithm.BROTLI:
compressedData = await this.compressBrotli(inputBuffer);
break;
case CompressionAlgorithm.NONE:
default:
return this.createNoCompressionResult(inputBuffer, originalSize, startTime);
if (originalSize < this.config.threshold || selectedAlgorithm === 'none') {
compressedData = inputBuffer.slice(0);
} else {
// 选择同步或异步压缩
if (this.config.enableAsync && algorithm.supportsAsync && algorithm.compressAsync) {
compressedData = await algorithm.compressAsync(inputBuffer);
} else {
compressedData = algorithm.compress(inputBuffer);
}
wasCompressed = true;
}
const endTime = performance.now();
const compressionTime = endTime - startTime;
const compressedSize = compressedData.length;
const compressedSize = compressedData.byteLength;
const compressionRatio = originalSize > 0 ? compressedSize / originalSize : 1;
// 检查压缩效果
if (compressedSize >= originalSize * 0.9) {
// 压缩效果不明显,返回原始数据
this.logger.debug(`压缩效果不佳,返回原始数据。原始: ${originalSize}, 压缩: ${compressedSize}`);
return this.createNoCompressionResult(inputBuffer, originalSize, startTime);
// 更新统计信息
if (this.config.enableStats) {
this.updateCompressionStats(
selectedAlgorithm,
originalSize,
compressedSize,
compressionTime
);
}
const result: CompressionResult = {
@@ -150,349 +472,183 @@ export class MessageCompressor {
compressedSize,
compressionRatio,
compressionTime,
algorithm
algorithm: selectedAlgorithm,
wasCompressed
};
// 更新统计
this.updateCompressionStats(result);
this.logger.debug(
`压缩完成: ${originalSize}B -> ${compressedSize}B ` +
`(${(compressionRatio * 100).toFixed(1)}%) ` +
`用时 ${compressionTime.toFixed(2)}ms, 算法: ${selectedAlgorithm}`
);
return result;
} catch (error) {
this.logger.error('压缩失败:', error);
return this.createNoCompressionResult(inputBuffer, originalSize, startTime);
this.logger.error(`压缩失败 (${selectedAlgorithm}):`, error);
throw error;
}
}
/**
* 解压缩数据
*/
async decompress(data: Buffer, algorithm: CompressionAlgorithm): Promise<Buffer> {
public async decompress(
data: ArrayBuffer,
algorithmName: string
): Promise<DecompressionResult> {
const startTime = performance.now();
const compressedSize = data.byteLength;
const algorithm = this.algorithms.get(algorithmName);
if (!algorithm) {
throw new Error(`未找到解压缩算法: ${algorithmName}`);
}
try {
if (algorithm === CompressionAlgorithm.NONE) {
return data;
}
let decompressedData: ArrayBuffer;
let decompressedData: Buffer;
switch (algorithm) {
case CompressionAlgorithm.GZIP:
decompressedData = await this.decompressGzip(data);
break;
case CompressionAlgorithm.DEFLATE:
decompressedData = await this.decompressDeflate(data);
break;
case CompressionAlgorithm.BROTLI:
decompressedData = await this.decompressBrotli(data);
break;
default:
throw new Error(`不支持的压缩算法: ${algorithm}`);
// 选择同步或异步解压缩
if (this.config.enableAsync && algorithm.supportsAsync && algorithm.decompressAsync) {
decompressedData = await algorithm.decompressAsync(data);
} else {
decompressedData = algorithm.decompress(data);
}
const endTime = performance.now();
const decompressionTime = endTime - startTime;
const decompressedSize = decompressedData.byteLength;
// 更新统计
this.updateDecompressionStats(decompressionTime);
// 更新统计信息
if (this.config.enableStats) {
this.updateDecompressionStats(algorithmName, decompressionTime);
}
return decompressedData;
const result: DecompressionResult = {
data: decompressedData,
compressedSize,
decompressedSize,
decompressionTime,
algorithm: algorithmName
};
this.logger.debug(
`解压缩完成: ${compressedSize}B -> ${decompressedSize}B ` +
`用时 ${decompressionTime.toFixed(2)}ms, 算法: ${algorithmName}`
);
return result;
} catch (error) {
this.logger.error('解压缩失败:', error);
this.logger.error(`解压缩失败 (${algorithmName}):`, error);
throw error;
}
}
/**
* 批量压缩
* 估算压缩后大小
*/
async compressBatch(dataList: (string | Buffer)[]): Promise<CompressionResult[]> {
const results: CompressionResult[] = [];
if (this.config.enableAsync) {
// 并行压缩
const promises = dataList.map(data => this.compress(data));
return await Promise.all(promises);
} else {
// 串行压缩
for (const data of dataList) {
results.push(await this.compress(data));
}
return results;
public estimateCompressedSize(
data: ArrayBuffer,
algorithmName?: string
): number {
const selectedAlgorithm = algorithmName || this.config.defaultAlgorithm;
const algorithm = this.algorithms.get(selectedAlgorithm);
if (!algorithm) {
return data.byteLength;
}
if (algorithm.estimateCompressedSize) {
return algorithm.estimateCompressedSize(data);
}
// 如果没有估算函数,返回原始大小
return data.byteLength;
}
/**
* 自适应压缩
* 根据数据特征自动选择最佳压缩算法
* 获取压缩统计信息
*/
async compressAdaptive(data: string | Buffer): Promise<CompressionResult> {
const inputBuffer = typeof data === 'string' ? Buffer.from(data, 'utf8') : data;
const originalAlgorithm = this.config.algorithm;
try {
// 对小数据进行算法测试
const testSize = Math.min(inputBuffer.length, 4096); // 测试前4KB
const testData = inputBuffer.subarray(0, testSize);
const algorithms = [
CompressionAlgorithm.GZIP,
CompressionAlgorithm.DEFLATE,
CompressionAlgorithm.BROTLI
];
let bestAlgorithm = CompressionAlgorithm.GZIP;
let bestRatio = 1;
// 测试不同算法的压缩效果
for (const algorithm of algorithms) {
try {
this.config.algorithm = algorithm;
const testResult = await this.compress(testData);
if (testResult.compressionRatio < bestRatio) {
bestRatio = testResult.compressionRatio;
bestAlgorithm = algorithm;
}
} catch (error) {
// 忽略测试失败的算法
continue;
}
}
// 使用最佳算法压缩完整数据
this.config.algorithm = bestAlgorithm;
const result = await this.compress(inputBuffer);
this.logger.debug(`自适应压缩选择算法: ${bestAlgorithm}, 压缩率: ${result.compressionRatio.toFixed(3)}`);
return result;
} finally {
// 恢复原始配置
this.config.algorithm = originalAlgorithm;
}
}
/**
* 获取统计信息
*/
getStats(): CompressionStats {
public getStats(): CompressionStats {
return { ...this.stats };
}
/**
* 重置统计信息
*/
resetStats(): void {
public resetStats(): void {
this.stats = {
totalCompressed: 0,
totalDecompressed: 0,
totalCompressions: 0,
totalDecompressions: 0,
totalOriginalBytes: 0,
totalCompressedBytes: 0,
averageCompressionRatio: 0,
averageCompressionRatio: 1.0,
averageCompressionTime: 0,
averageDecompressionTime: 0,
algorithmUsage: {
[CompressionAlgorithm.NONE]: 0,
[CompressionAlgorithm.GZIP]: 0,
[CompressionAlgorithm.DEFLATE]: 0,
[CompressionAlgorithm.BROTLI]: 0
}
algorithmUsage: {}
};
// 重新初始化算法使用统计
for (const algorithmName of this.algorithms.keys()) {
this.stats.algorithmUsage[algorithmName] = 0;
}
}
/**
* 更新配置
*/
updateConfig(newConfig: Partial<CompressionConfig>): void {
public updateConfig(newConfig: Partial<CompressionConfig>): void {
Object.assign(this.config, newConfig);
this.logger.info('压缩器配置已更新:', newConfig);
this.logger.info('压缩器配置已更新');
}
/**
* 获取压缩建议
* 获取配置
*/
getCompressionRecommendation(dataSize: number, dataType: string): CompressionAlgorithm {
// 根据数据大小和类型推荐压缩算法
if (dataSize < this.config.threshold) {
return CompressionAlgorithm.NONE;
}
if (dataType === 'json' || dataType === 'text') {
// 文本数据推荐GZIP
return CompressionAlgorithm.GZIP;
} else if (dataType === 'binary') {
// 二进制数据推荐DEFLATE
return CompressionAlgorithm.DEFLATE;
} else {
// 默认推荐GZIP
return CompressionAlgorithm.GZIP;
}
public getConfig(): CompressionConfig {
return { ...this.config };
}
/**
* GZIP压缩
* 更新压缩统计信息
*/
private async compressGzip(data: Buffer): Promise<Buffer> {
if (this.config.enableAsync) {
return await this.gzipAsync(data, { level: this.config.level });
} else {
return zlib.gzipSync(data, { level: this.config.level });
}
}
/**
* GZIP解压缩
*/
private async decompressGzip(data: Buffer): Promise<Buffer> {
if (this.config.enableAsync) {
return await this.gunzipAsync(data);
} else {
return zlib.gunzipSync(data);
}
}
/**
* DEFLATE压缩
*/
private async compressDeflate(data: Buffer): Promise<Buffer> {
if (this.config.enableAsync) {
return await this.deflateAsync(data, { level: this.config.level });
} else {
return zlib.deflateSync(data, { level: this.config.level });
}
}
/**
* DEFLATE解压缩
*/
private async decompressDeflate(data: Buffer): Promise<Buffer> {
if (this.config.enableAsync) {
return await this.inflateAsync(data);
} else {
return zlib.inflateSync(data);
}
}
/**
* BROTLI压缩
*/
private async compressBrotli(data: Buffer): Promise<Buffer> {
const options = {
params: {
[zlib.constants.BROTLI_PARAM_QUALITY]: this.config.level
}
};
if (this.config.enableAsync) {
return await this.brotliCompressAsync(data, options);
} else {
return zlib.brotliCompressSync(data, options);
}
}
/**
* BROTLI解压缩
*/
private async decompressBrotli(data: Buffer): Promise<Buffer> {
if (this.config.enableAsync) {
return await this.brotliDecompressAsync(data);
} else {
return zlib.brotliDecompressSync(data);
}
}
/**
* 创建无压缩结果
*/
private createNoCompressionResult(
data: Buffer,
private updateCompressionStats(
algorithmName: string,
originalSize: number,
startTime: number
): CompressionResult {
const endTime = performance.now();
const result: CompressionResult = {
data,
originalSize,
compressedSize: originalSize,
compressionRatio: 1,
compressionTime: endTime - startTime,
algorithm: CompressionAlgorithm.NONE
};
compressedSize: number,
compressionTime: number
): void {
this.stats.totalCompressions++;
this.stats.totalOriginalBytes += originalSize;
this.stats.totalCompressedBytes += compressedSize;
this.stats.algorithmUsage[algorithmName]++;
this.updateCompressionStats(result);
return result;
}
// 更新平均压缩比
this.stats.averageCompressionRatio = this.stats.totalOriginalBytes > 0
? this.stats.totalCompressedBytes / this.stats.totalOriginalBytes
: 1.0;
/**
* 更新压缩统计
*/
private updateCompressionStats(result: CompressionResult): void {
this.stats.totalCompressed++;
this.stats.totalOriginalBytes += result.originalSize;
this.stats.totalCompressedBytes += result.compressedSize;
this.stats.algorithmUsage[result.algorithm]++;
// 计算平均值
this.stats.averageCompressionRatio =
this.stats.totalOriginalBytes > 0 ?
this.stats.totalCompressedBytes / this.stats.totalOriginalBytes : 1;
// 更新平均压缩时间(使用移动平均)
const alpha = 0.1; // 平滑因子
// 更新平均压缩时间
this.stats.averageCompressionTime =
this.stats.averageCompressionTime * (1 - alpha) + result.compressionTime * alpha;
(this.stats.averageCompressionTime * (this.stats.totalCompressions - 1) + compressionTime)
/ this.stats.totalCompressions;
}
/**
* 更新解压缩统计
* 更新解压缩统计信息
*/
private updateDecompressionStats(decompressionTime: number): void {
this.stats.totalDecompressed++;
private updateDecompressionStats(algorithmName: string, decompressionTime: number): void {
this.stats.totalDecompressions++;
// 更新平均解压缩时间(使用移动平均)
const alpha = 0.1;
// 更新平均解压缩时间
this.stats.averageDecompressionTime =
this.stats.averageDecompressionTime * (1 - alpha) + decompressionTime * alpha;
(this.stats.averageDecompressionTime * (this.stats.totalDecompressions - 1) + decompressionTime)
/ this.stats.totalDecompressions;
}
}
/**
* 获取压缩效率报告
*/
getEfficiencyReport() {
const savings = this.stats.totalOriginalBytes - this.stats.totalCompressedBytes;
const savingsPercentage = this.stats.totalOriginalBytes > 0 ?
(savings / this.stats.totalOriginalBytes) * 100 : 0;
return {
totalSavings: savings,
savingsPercentage,
averageCompressionRatio: this.stats.averageCompressionRatio,
averageCompressionTime: this.stats.averageCompressionTime,
averageDecompressionTime: this.stats.averageDecompressionTime,
algorithmUsage: this.stats.algorithmUsage,
recommendation: this.generateRecommendation()
};
}
/**
* 生成优化建议
*/
private generateRecommendation(): string {
const ratio = this.stats.averageCompressionRatio;
const time = this.stats.averageCompressionTime;
if (ratio > 0.8) {
return '压缩效果较差,建议调整算法或提高压缩级别';
} else if (time > 50) {
return '压缩时间较长,建议降低压缩级别或使用更快的算法';
} else if (ratio < 0.3) {
return '压缩效果很好,当前配置最优';
} else {
return '压缩性能正常';
}
}
}
/**
* 全局压缩器实例
*/
export const globalCompressor = new MessageCompressor();

View File

@@ -0,0 +1,639 @@
import { SyncBatch } from '../sync/SyncVarManager';
import { MessageType, INetworkMessage } from '../types/NetworkTypes';
/**
* 序列化配置
*/
export interface SyncVarSerializerConfig {
/** 是否启用压缩 */
enableCompression: boolean;
/** 是否启用差量同步 */
enableDeltaSync: boolean;
/** 是否启用类型检查 */
enableTypeChecking: boolean;
/** 最大消息大小(字节) */
maxMessageSize: number;
/** 是否启用批量优化 */
enableBatching: boolean;
/** 批量超时时间(毫秒) */
batchTimeout: number;
}
/**
* 序列化结果
*/
export interface SerializationResult {
/** 是否成功 */
success: boolean;
/** 序列化的数据 */
data?: ArrayBuffer | string;
/** 错误信息 */
error?: string;
/** 原始大小 */
originalSize: number;
/** 压缩后大小 */
compressedSize: number;
/** 压缩比 */
compressionRatio: number;
}
/**
* 反序列化结果
*/
export interface DeserializationResult<T = any> {
/** 是否成功 */
success: boolean;
/** 反序列化的数据 */
data?: T;
/** 错误信息 */
errors?: string[];
/** 是否通过类型检查 */
isValidType: boolean;
}
/**
* 差量数据
*/
export interface DeltaData {
/** 基础版本 */
baseVersion: number;
/** 当前版本 */
currentVersion: number;
/** 变化的字段 */
changes: { [key: string]: any };
/** 删除的字段 */
deletions: string[];
}
/**
* 压缩元数据
*/
export interface CompressionMetadata {
/** 压缩算法 */
algorithm: string;
/** 原始大小 */
originalSize: number;
/** 压缩大小 */
compressedSize: number;
/** 压缩时间戳 */
timestamp: number;
}
/**
* SyncVar专用序列化器
* 针对SyncVar数据进行优化的序列化系统
*/
export class SyncVarSerializer {
private config: SyncVarSerializerConfig;
private deltaHistory = new Map<string, { version: number; data: any }>();
private versionCounter = 0;
private compressionCache = new Map<string, ArrayBuffer>();
constructor(config: Partial<SyncVarSerializerConfig> = {}) {
this.config = {
enableCompression: true,
enableDeltaSync: true,
enableTypeChecking: true,
maxMessageSize: 64 * 1024, // 64KB
enableBatching: true,
batchTimeout: 16, // 16ms (60fps)
...config
};
}
/**
* 序列化SyncVar批次数据
*/
public serializeSyncBatch(batch: SyncBatch): SerializationResult {
try {
const startTime = performance.now();
// 准备序列化数据
let dataToSerialize: any = batch;
// 应用差量同步
if (this.config.enableDeltaSync) {
dataToSerialize = this.applyDeltaCompression(batch);
}
// 基础JSON序列化
const jsonString = JSON.stringify(dataToSerialize, this.replacer.bind(this));
const originalSize = new TextEncoder().encode(jsonString).length;
// 检查消息大小限制
if (originalSize > this.config.maxMessageSize) {
return {
success: false,
error: `消息大小超出限制: ${originalSize} > ${this.config.maxMessageSize}`,
originalSize,
compressedSize: 0,
compressionRatio: 0
};
}
let finalData: ArrayBuffer | string = jsonString;
let compressedSize = originalSize;
// 应用压缩
if (this.config.enableCompression && originalSize > 256) {
const compressionResult = this.compress(jsonString);
if (compressionResult.success && compressionResult.data) {
finalData = compressionResult.data;
compressedSize = compressionResult.data.byteLength;
}
}
const compressionRatio = originalSize > 0 ? compressedSize / originalSize : 1;
return {
success: true,
data: finalData,
originalSize,
compressedSize,
compressionRatio
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : '序列化失败',
originalSize: 0,
compressedSize: 0,
compressionRatio: 1
};
}
}
/**
* 反序列化SyncVar批次数据
*/
public deserializeSyncBatch(data: ArrayBuffer | string): DeserializationResult<SyncBatch> {
try {
let jsonString: string;
// 解压缩
if (data instanceof ArrayBuffer) {
const decompressResult = this.decompress(data);
if (!decompressResult.success || !decompressResult.data) {
return {
success: false,
errors: ['解压缩失败'],
isValidType: false
};
}
jsonString = decompressResult.data;
} else {
jsonString = data;
}
// JSON反序列化
const parsedData = JSON.parse(jsonString, this.reviver.bind(this));
// 类型检查
if (this.config.enableTypeChecking) {
const typeCheckResult = this.validateSyncBatchType(parsedData);
if (!typeCheckResult.isValid) {
return {
success: false,
errors: typeCheckResult.errors,
isValidType: false
};
}
}
// 应用差量还原
let finalData = parsedData;
if (this.config.enableDeltaSync && this.isDeltaData(parsedData)) {
finalData = this.applyDeltaRestore(parsedData);
}
return {
success: true,
data: finalData as SyncBatch,
isValidType: true
};
} catch (error) {
return {
success: false,
errors: [error instanceof Error ? error.message : '反序列化失败'],
isValidType: false
};
}
}
/**
* 创建网络消息
*/
public createSyncMessage(batch: SyncBatch, senderId: string): INetworkMessage {
const serializedData = this.serializeSyncBatch(batch);
return {
type: MessageType.SYNC_BATCH,
messageId: this.generateMessageId(),
timestamp: Date.now(),
senderId,
data: serializedData.data,
reliable: true,
priority: this.calculateMessagePriority(batch)
};
}
/**
* 解析网络消息
*/
public parseSyncMessage(message: INetworkMessage): DeserializationResult<SyncBatch> {
if (message.type !== MessageType.SYNC_BATCH) {
return {
success: false,
errors: ['消息类型不匹配'],
isValidType: false
};
}
return this.deserializeSyncBatch(message.data);
}
/**
* 更新配置
*/
public updateConfig(newConfig: Partial<SyncVarSerializerConfig>): void {
Object.assign(this.config, newConfig);
}
/**
* 获取配置
*/
public getConfig(): SyncVarSerializerConfig {
return { ...this.config };
}
/**
* 清理缓存
*/
public clearCache(): void {
this.deltaHistory.clear();
this.compressionCache.clear();
this.versionCounter = 0;
}
/**
* 获取缓存统计
*/
public getCacheStats(): { deltaHistorySize: number; compressionCacheSize: number } {
return {
deltaHistorySize: this.deltaHistory.size,
compressionCacheSize: this.compressionCache.size
};
}
/**
* 应用差量压缩
*/
private applyDeltaCompression(batch: SyncBatch): DeltaData | SyncBatch {
const key = batch.instanceId;
const lastRecord = this.deltaHistory.get(key);
if (!lastRecord) {
// 第一次同步,存储完整数据
this.deltaHistory.set(key, {
version: ++this.versionCounter,
data: { ...batch }
});
return batch;
}
// 计算差量
const changes: { [key: string]: any } = {};
const deletions: string[] = [];
// 检查变化的属性
for (const [prop, value] of Object.entries(batch.changes)) {
if (!lastRecord.data.changes || lastRecord.data.changes[prop] !== value) {
changes[prop] = value;
}
}
// 检查删除的属性
if (lastRecord.data.changes) {
for (const prop of Object.keys(lastRecord.data.changes)) {
if (!(prop in batch.changes)) {
deletions.push(prop);
}
}
}
// 如果没有变化,返回空的差量数据
if (Object.keys(changes).length === 0 && deletions.length === 0) {
return {
baseVersion: lastRecord.version,
currentVersion: lastRecord.version,
changes: {},
deletions: []
};
}
// 更新历史记录
const currentVersion = ++this.versionCounter;
this.deltaHistory.set(key, {
version: currentVersion,
data: { ...batch }
});
return {
baseVersion: lastRecord.version,
currentVersion,
changes,
deletions
};
}
/**
* 应用差量还原
*/
private applyDeltaRestore(deltaData: DeltaData): SyncBatch {
// 这里应该根据baseVersion找到对应的基础数据
// 简化实现返回一个基本的SyncBatch
return {
instanceId: 'unknown',
instanceType: 'unknown',
changes: deltaData.changes,
timestamp: Date.now(),
syncModes: {},
authorities: {},
scopes: {},
priorities: {}
};
}
/**
* 检查是否为差量数据
*/
private isDeltaData(data: any): data is DeltaData {
return data &&
typeof data.baseVersion === 'number' &&
typeof data.currentVersion === 'number' &&
typeof data.changes === 'object' &&
Array.isArray(data.deletions);
}
/**
* 压缩数据
*/
private compress(data: string): { success: boolean; data?: ArrayBuffer } {
try {
// 使用LZ字符串压缩算法
const compressed = this.lzCompress(data);
const encoder = new TextEncoder();
const bytes = encoder.encode(compressed);
return {
success: true,
data: bytes.buffer
};
} catch (error) {
return { success: false };
}
}
/**
* 解压缩数据
*/
private decompress(data: ArrayBuffer): { success: boolean; data?: string } {
try {
const decoder = new TextDecoder();
const compressedString = decoder.decode(data);
const decompressed = this.lzDecompress(compressedString);
return {
success: true,
data: decompressed
};
} catch (error) {
return { success: false };
}
}
/**
* JSON序列化替换函数
*/
private replacer(key: string, value: any): any {
// 处理特殊类型的序列化
if (value instanceof Date) {
return { __type: 'Date', value: value.toISOString() };
}
if (value instanceof Map) {
return { __type: 'Map', value: Array.from(value.entries()) };
}
if (value instanceof Set) {
return { __type: 'Set', value: Array.from(value) };
}
// 处理BigInt
if (typeof value === 'bigint') {
return { __type: 'BigInt', value: value.toString() };
}
return value;
}
/**
* JSON反序列化恢复函数
*/
private reviver(key: string, value: any): any {
if (value && typeof value === 'object' && value.__type) {
switch (value.__type) {
case 'Date':
return new Date(value.value);
case 'Map':
return new Map(value.value);
case 'Set':
return new Set(value.value);
case 'BigInt':
return BigInt(value.value);
}
}
return value;
}
/**
* 验证SyncBatch类型
*/
private validateSyncBatchType(data: any): { isValid: boolean; errors: string[] } {
const errors: string[] = [];
if (!data || typeof data !== 'object') {
errors.push('数据不是对象');
return { isValid: false, errors };
}
if (typeof data.instanceId !== 'string') {
errors.push('instanceId必须是字符串');
}
if (typeof data.instanceType !== 'string') {
errors.push('instanceType必须是字符串');
}
if (!data.changes || typeof data.changes !== 'object') {
errors.push('changes必须是对象');
}
if (typeof data.timestamp !== 'number') {
errors.push('timestamp必须是数字');
}
return { isValid: errors.length === 0, errors };
}
/**
* 生成消息ID
*/
private generateMessageId(): string {
return `sync_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
/**
* 计算消息优先级
*/
private calculateMessagePriority(batch: SyncBatch): number {
// 根据批次中属性的优先级计算整体优先级
const priorities = Object.values(batch.priorities);
if (priorities.length === 0) {
return 5; // 默认优先级
}
// 使用最高优先级
return Math.max(...priorities);
}
/**
* LZ字符串压缩算法
*/
private lzCompress(input: string): string {
if (!input) return '';
const dictionary: { [key: string]: number } = {};
let dictSize = 256;
// 初始化字典
for (let i = 0; i < 256; i++) {
dictionary[String.fromCharCode(i)] = i;
}
let w = '';
const result: number[] = [];
for (let i = 0; i < input.length; i++) {
const c = input.charAt(i);
const wc = w + c;
if (dictionary[wc] !== undefined) {
w = wc;
} else {
result.push(dictionary[w]);
dictionary[wc] = dictSize++;
w = c;
}
}
if (w) {
result.push(dictionary[w]);
}
// 将结果编码为Base64以确保字符串安全
return this.arrayToBase64(result);
}
/**
* LZ字符串解压缩算法
*/
private lzDecompress(compressed: string): string {
if (!compressed) return '';
const data = this.base64ToArray(compressed);
if (data.length === 0) return '';
const dictionary: { [key: number]: string } = {};
let dictSize = 256;
// 初始化字典
for (let i = 0; i < 256; i++) {
dictionary[i] = String.fromCharCode(i);
}
let w = String.fromCharCode(data[0]);
const result = [w];
for (let i = 1; i < data.length; i++) {
const k = data[i];
let entry: string;
if (dictionary[k] !== undefined) {
entry = dictionary[k];
} else if (k === dictSize) {
entry = w + w.charAt(0);
} else {
throw new Error('解压缩错误:无效的压缩数据');
}
result.push(entry);
dictionary[dictSize++] = w + entry.charAt(0);
w = entry;
}
return result.join('');
}
/**
* 数组转Base64
*/
private arrayToBase64(array: number[]): string {
// 将数字数组转换为字节数组
const bytes: number[] = [];
for (const num of array) {
if (num < 256) {
bytes.push(num);
} else {
// 大于255的数字用两个字节表示
bytes.push(255, num - 255);
}
}
// 转换为字符串然后编码为Base64
const binaryString = String.fromCharCode(...bytes);
return btoa(binaryString);
}
/**
* Base64转数组
*/
private base64ToArray(base64: string): number[] {
try {
const binaryString = atob(base64);
const bytes: number[] = [];
for (let i = 0; i < binaryString.length; i++) {
bytes.push(binaryString.charCodeAt(i));
}
// 还原原始数字数组
const result: number[] = [];
for (let i = 0; i < bytes.length; i++) {
if (bytes[i] === 255 && i + 1 < bytes.length) {
result.push(255 + bytes[i + 1]);
i++; // 跳过下一个字节
} else {
result.push(bytes[i]);
}
}
return result;
} catch (error) {
throw new Error('Base64解码失败');
}
}
}

View File

@@ -0,0 +1,794 @@
import { createLogger } from '@esengine/ecs-framework';
/**
* 差量同步配置
*/
export interface DeltaSyncConfig {
/** 是否启用差量同步 */
enabled: boolean;
/** 最大历史版本数 */
maxHistoryVersions: number;
/** 版本超时时间(毫秒) */
versionTimeout: number;
/** 差量压缩阈值 */
compressionThreshold: number;
/** 是否启用智能合并 */
enableSmartMerging: boolean;
/** 合并时间窗口(毫秒) */
mergeWindow: number;
}
/**
* 版本化数据
*/
export interface VersionedData {
version: number;
timestamp: number;
data: any;
checksum?: string;
}
/**
* 差量数据
*/
export interface DeltaData {
baseVersion: number;
targetVersion: number;
changes: { [key: string]: any };
deletions: string[];
metadata: {
timestamp: number;
size: number;
compressionRatio: number;
};
}
/**
* 差量操作类型
*/
export enum DeltaOperationType {
/** 添加属性 */
ADD = 'add',
/** 修改属性 */
MODIFY = 'modify',
/** 删除属性 */
DELETE = 'delete',
/** 批量操作 */
BATCH = 'batch',
/** 无操作(合并后消除的操作) */
NOOP = 'noop'
}
/**
* 差量操作
*/
export interface DeltaOperation {
type: DeltaOperationType;
path: string;
oldValue?: any;
newValue?: any;
timestamp: number;
}
/**
* 差量同步统计
*/
export interface DeltaSyncStats {
totalDeltas: number;
totalSize: number;
compressionRatio: number;
averageDeltaSize: number;
cacheHitRate: number;
mergedOperations: number;
}
/**
* 差量同步器
* 负责计算和应用数据差量,减少网络传输量
*/
export class DeltaSync {
private logger = createLogger('DeltaSync');
private config: DeltaSyncConfig;
/** 版本历史 */
private versionHistory = new Map<string, Map<number, VersionedData>>();
/** 版本计数器 */
private versionCounters = new Map<string, number>();
/** 差量缓存 */
private deltaCache = new Map<string, DeltaData>();
/** 待合并操作 */
private pendingOperations = new Map<string, DeltaOperation[]>();
/** 统计信息 */
private stats: DeltaSyncStats = {
totalDeltas: 0,
totalSize: 0,
compressionRatio: 1,
averageDeltaSize: 0,
cacheHitRate: 0,
mergedOperations: 0
};
/** 合并定时器 */
private mergeTimers = new Map<string, any>();
constructor(config: Partial<DeltaSyncConfig> = {}) {
this.config = {
enabled: true,
maxHistoryVersions: 10,
versionTimeout: 30000,
compressionThreshold: 100,
enableSmartMerging: true,
mergeWindow: 50,
...config
};
}
/**
* 记录基线版本
*/
public recordBaseline(instanceId: string, data: any): number {
if (!this.config.enabled) {
return 0;
}
const version = this.getNextVersion(instanceId);
const versionedData: VersionedData = {
version,
timestamp: Date.now(),
data: this.deepClone(data),
checksum: this.calculateChecksum(data)
};
this.storeVersion(instanceId, versionedData);
return version;
}
/**
* 计算差量
*/
public calculateDelta(instanceId: string, newData: any, baseVersion?: number): DeltaData | null {
if (!this.config.enabled) {
return null;
}
const history = this.versionHistory.get(instanceId);
if (!history || history.size === 0) {
// 没有基线,记录第一个版本
this.recordBaseline(instanceId, newData);
return null;
}
// 选择基线版本
let baseVersionData: VersionedData;
if (baseVersion !== undefined) {
const foundVersion = history.get(baseVersion);
if (!foundVersion) {
this.logger.warn(`未找到版本 ${baseVersion},使用最新版本`);
const latestVersion = this.getLatestVersion(instanceId);
if (!latestVersion) {
this.logger.error(`实例 ${instanceId} 没有任何版本历史`);
return null;
}
baseVersionData = latestVersion;
} else {
baseVersionData = foundVersion;
}
} else {
const latestVersion = this.getLatestVersion(instanceId);
if (!latestVersion) {
this.logger.error(`实例 ${instanceId} 没有任何版本历史`);
return null;
}
baseVersionData = latestVersion;
}
const targetVersion = this.getNextVersion(instanceId);
const changes = this.computeChanges(baseVersionData.data, newData);
const deletions = this.computeDeletions(baseVersionData.data, newData);
// 检查是否有变化
if (Object.keys(changes).length === 0 && deletions.length === 0) {
return null;
}
const deltaData: DeltaData = {
baseVersion: baseVersionData.version,
targetVersion,
changes,
deletions,
metadata: {
timestamp: Date.now(),
size: this.estimateSize(changes) + this.estimateSize(deletions),
compressionRatio: 1
}
};
// 记录新版本
this.recordBaseline(instanceId, newData);
// 更新统计
this.updateStats(deltaData);
return deltaData;
}
/**
* 应用差量
*/
public applyDelta(instanceId: string, delta: DeltaData): any {
if (!this.config.enabled) {
return null;
}
const history = this.versionHistory.get(instanceId);
if (!history) {
this.logger.error(`实例 ${instanceId} 没有版本历史`);
return null;
}
const baseData = history.get(delta.baseVersion);
if (!baseData) {
this.logger.error(`未找到基线版本 ${delta.baseVersion}`);
return null;
}
// 复制基线数据
const result = this.deepClone(baseData.data);
// 应用变化
for (const [key, value] of Object.entries(delta.changes)) {
this.setNestedProperty(result, key, value);
}
// 应用删除
for (const key of delta.deletions) {
this.deleteNestedProperty(result, key);
}
// 记录结果版本
const resultVersion: VersionedData = {
version: delta.targetVersion,
timestamp: delta.metadata.timestamp,
data: this.deepClone(result),
checksum: this.calculateChecksum(result)
};
this.storeVersion(instanceId, resultVersion);
return result;
}
/**
* 智能合并操作
*/
public mergeOperations(instanceId: string, operations: DeltaOperation[]): DeltaOperation[] {
if (!this.config.enableSmartMerging || operations.length <= 1) {
return operations;
}
const pathMap = new Map<string, DeltaOperation>();
// 按路径分组操作
for (const op of operations) {
const existing = pathMap.get(op.path);
if (!existing) {
pathMap.set(op.path, op);
} else {
// 合并同路径的操作
const mergedOp = this.mergeTwoOperations(existing, op);
pathMap.set(op.path, mergedOp);
this.stats.mergedOperations++;
}
}
// 过滤掉NOOP操作
return Array.from(pathMap.values()).filter(op => op.type !== DeltaOperationType.NOOP);
}
/**
* 延迟合并操作
*/
public scheduleOperation(instanceId: string, operation: DeltaOperation): void {
if (!this.config.enableSmartMerging) {
return;
}
let operations = this.pendingOperations.get(instanceId);
if (!operations) {
operations = [];
this.pendingOperations.set(instanceId, operations);
}
operations.push(operation);
// 重置合并定时器
const existingTimer = this.mergeTimers.get(instanceId);
if (existingTimer) {
clearTimeout(existingTimer);
}
const timer = setTimeout(() => {
this.flushPendingOperations(instanceId);
}, this.config.mergeWindow);
this.mergeTimers.set(instanceId, timer);
}
/**
* 压缩差量数据
*/
public compressDelta(delta: DeltaData): DeltaData {
if (delta.metadata.size < this.config.compressionThreshold) {
return delta;
}
// 简化的压缩实现
const compressedChanges = this.compressObject(delta.changes);
const compressedDeletions = delta.deletions; // 删除操作通常已经很紧凑
const originalSize = delta.metadata.size;
const compressedSize = this.estimateSize(compressedChanges) + this.estimateSize(compressedDeletions);
return {
...delta,
changes: compressedChanges,
deletions: compressedDeletions,
metadata: {
...delta.metadata,
size: compressedSize,
compressionRatio: compressedSize / originalSize
}
};
}
/**
* 清理过期版本
*/
public cleanup(): void {
const now = Date.now();
for (const [instanceId, history] of this.versionHistory) {
const versionsToDelete: number[] = [];
for (const [version, versionData] of history) {
// 检查超时
if (now - versionData.timestamp > this.config.versionTimeout) {
versionsToDelete.push(version);
}
}
// 保留最新的几个版本
const sortedVersions = Array.from(history.keys()).sort((a, b) => b - a);
const toKeep = sortedVersions.slice(0, this.config.maxHistoryVersions);
for (const version of versionsToDelete) {
if (!toKeep.includes(version)) {
history.delete(version);
}
}
// 如果实例没有版本了,删除实例
if (history.size === 0) {
this.versionHistory.delete(instanceId);
this.versionCounters.delete(instanceId);
}
}
// 清理差量缓存
this.deltaCache.clear();
}
/**
* 获取统计信息
*/
public getStats(): DeltaSyncStats {
return { ...this.stats };
}
/**
* 重置统计信息
*/
public resetStats(): void {
this.stats = {
totalDeltas: 0,
totalSize: 0,
compressionRatio: 1,
averageDeltaSize: 0,
cacheHitRate: 0,
mergedOperations: 0
};
}
/**
* 更新配置
*/
public updateConfig(newConfig: Partial<DeltaSyncConfig>): void {
Object.assign(this.config, newConfig);
}
/**
* 销毁同步器
*/
public destroy(): void {
// 清理定时器
for (const timer of this.mergeTimers.values()) {
clearTimeout(timer);
}
this.versionHistory.clear();
this.versionCounters.clear();
this.deltaCache.clear();
this.pendingOperations.clear();
this.mergeTimers.clear();
}
/**
* 获取下一个版本号
*/
private getNextVersion(instanceId: string): number {
const current = this.versionCounters.get(instanceId) || 0;
const next = current + 1;
this.versionCounters.set(instanceId, next);
return next;
}
/**
* 存储版本数据
*/
private storeVersion(instanceId: string, versionData: VersionedData): void {
let history = this.versionHistory.get(instanceId);
if (!history) {
history = new Map();
this.versionHistory.set(instanceId, history);
}
history.set(versionData.version, versionData);
// 限制历史版本数量
if (history.size > this.config.maxHistoryVersions) {
const oldestVersion = Math.min(...Array.from(history.keys()));
history.delete(oldestVersion);
}
}
/**
* 获取最新版本
*/
private getLatestVersion(instanceId: string): VersionedData | undefined {
const history = this.versionHistory.get(instanceId);
if (!history || history.size === 0) {
return undefined;
}
const latestVersion = Math.max(...Array.from(history.keys()));
return history.get(latestVersion);
}
/**
* 计算变化
*/
private computeChanges(oldData: any, newData: any): { [key: string]: any } {
const changes: { [key: string]: any } = {};
for (const [key, newValue] of Object.entries(newData)) {
const oldValue = oldData[key];
if (!this.deepEqual(oldValue, newValue)) {
changes[key] = newValue;
}
}
return changes;
}
/**
* 计算删除
*/
private computeDeletions(oldData: any, newData: any): string[] {
const deletions: string[] = [];
for (const key of Object.keys(oldData)) {
if (!(key in newData)) {
deletions.push(key);
}
}
return deletions;
}
/**
* 合并两个操作
*/
private mergeTwoOperations(op1: DeltaOperation, op2: DeltaOperation): DeltaOperation {
// 智能合并逻辑
const mergedOp: DeltaOperation = {
type: op2.type,
path: op2.path,
oldValue: op1.oldValue, // 保留最初的旧值
newValue: op2.newValue,
timestamp: op2.timestamp
};
// 处理特殊合并情况
if (op1.type === DeltaOperationType.ADD && op2.type === DeltaOperationType.DELETE) {
// 添加后删除 = 无操作
return {
type: DeltaOperationType.NOOP,
path: op2.path,
oldValue: undefined,
newValue: undefined,
timestamp: op2.timestamp
};
}
if (op1.type === DeltaOperationType.DELETE && op2.type === DeltaOperationType.ADD) {
// 删除后添加 = 修改
mergedOp.type = DeltaOperationType.MODIFY;
mergedOp.oldValue = op1.oldValue;
}
if (op1.type === DeltaOperationType.MODIFY && op2.type === DeltaOperationType.DELETE) {
// 修改后删除 = 删除原始值
mergedOp.type = DeltaOperationType.DELETE;
mergedOp.newValue = undefined;
}
// 检查是否值回到了原始状态
if (op1.type === DeltaOperationType.MODIFY &&
op2.type === DeltaOperationType.MODIFY &&
this.deepEqual(op1.oldValue, op2.newValue)) {
// 值回到原始状态 = 无操作
return {
type: DeltaOperationType.NOOP,
path: op2.path,
oldValue: undefined,
newValue: undefined,
timestamp: op2.timestamp
};
}
return mergedOp;
}
/**
* 刷新待处理操作
*/
private flushPendingOperations(instanceId: string): void {
const operations = this.pendingOperations.get(instanceId);
if (!operations || operations.length === 0) {
return;
}
// 合并操作并发送
this.mergeOperations(instanceId, operations);
// 清理待处理操作
this.pendingOperations.delete(instanceId);
this.mergeTimers.delete(instanceId);
}
/**
* 压缩对象
*/
private compressObject(obj: any): any {
if (!obj || typeof obj !== 'object') {
return obj;
}
// 移除null和undefined值
const compressed: any = Array.isArray(obj) ? [] : {};
for (const [key, value] of Object.entries(obj)) {
if (value !== null && value !== undefined) {
if (typeof value === 'object') {
compressed[key] = this.compressObject(value);
} else {
compressed[key] = value;
}
}
}
return compressed;
}
/**
* 估算大小
*/
private estimateSize(obj: any): number {
if (obj === null || obj === undefined) {
return 4; // "null"的长度
}
if (typeof obj === 'string') {
return obj.length * 2; // UTF-16字符估算
}
if (typeof obj === 'number') {
return 8; // 64位数字
}
if (typeof obj === 'boolean') {
return 4; // true/false
}
if (Array.isArray(obj)) {
let size = 2; // []
for (const item of obj) {
size += this.estimateSize(item) + 1; // +1 for comma
}
return size;
}
if (typeof obj === 'object') {
let size = 2; // {}
for (const [key, value] of Object.entries(obj)) {
size += key.length * 2 + 3; // key + ":"
size += this.estimateSize(value) + 1; // value + comma
}
return size;
}
return JSON.stringify(obj).length;
}
/**
* 深度克隆
*/
private deepClone(obj: any): any {
if (obj === null || obj === undefined) {
return obj;
}
if (typeof obj !== 'object') {
return obj;
}
if (obj instanceof Date) {
return new Date(obj.getTime());
}
if (obj instanceof RegExp) {
return new RegExp(obj);
}
if (Array.isArray(obj)) {
return obj.map(item => this.deepClone(item));
}
const cloned: any = {};
for (const [key, value] of Object.entries(obj)) {
cloned[key] = this.deepClone(value);
}
return cloned;
}
/**
* 深度比较
*/
private deepEqual(obj1: any, obj2: any): boolean {
if (obj1 === obj2) {
return true;
}
if (obj1 === null || obj2 === null || obj1 === undefined || obj2 === undefined) {
return obj1 === obj2;
}
if (typeof obj1 !== typeof obj2) {
return false;
}
if (typeof obj1 !== 'object') {
return obj1 === obj2;
}
if (obj1 instanceof Date && obj2 instanceof Date) {
return obj1.getTime() === obj2.getTime();
}
if (Array.isArray(obj1) !== Array.isArray(obj2)) {
return false;
}
if (Array.isArray(obj1)) {
if (obj1.length !== obj2.length) {
return false;
}
for (let i = 0; i < obj1.length; i++) {
if (!this.deepEqual(obj1[i], obj2[i])) {
return false;
}
}
return true;
}
const keys1 = Object.keys(obj1);
const keys2 = Object.keys(obj2);
if (keys1.length !== keys2.length) {
return false;
}
for (const key of keys1) {
if (!keys2.includes(key)) {
return false;
}
if (!this.deepEqual(obj1[key], obj2[key])) {
return false;
}
}
return true;
}
/**
* 设置嵌套属性
*/
private setNestedProperty(obj: any, path: string, value: any): void {
const keys = path.split('.');
let current = obj;
for (let i = 0; i < keys.length - 1; i++) {
const key = keys[i];
if (!(key in current)) {
current[key] = {};
}
current = current[key];
}
current[keys[keys.length - 1]] = value;
}
/**
* 删除嵌套属性
*/
private deleteNestedProperty(obj: any, path: string): void {
const keys = path.split('.');
let current = obj;
for (let i = 0; i < keys.length - 1; i++) {
const key = keys[i];
if (!(key in current)) {
return; // 路径不存在
}
current = current[key];
}
delete current[keys[keys.length - 1]];
}
/**
* 计算校验和
*/
private calculateChecksum(obj: any): string {
// 简化的校验和实现
const str = JSON.stringify(obj);
let hash = 0;
for (let i = 0; i < str.length; i++) {
const char = str.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash; // 转换为32位整数
}
return hash.toString(16);
}
/**
* 更新统计信息
*/
private updateStats(delta: DeltaData): void {
this.stats.totalDeltas++;
this.stats.totalSize += delta.metadata.size;
this.stats.averageDeltaSize = this.stats.totalSize / this.stats.totalDeltas;
this.stats.compressionRatio =
(this.stats.compressionRatio * (this.stats.totalDeltas - 1) + delta.metadata.compressionRatio) /
this.stats.totalDeltas;
}
}

View File

@@ -0,0 +1,464 @@
import {
getSyncVarMetadata,
getDirtySyncVars,
clearDirtyFlags,
SyncVarMetadata,
hasSyncVars,
SyncVarValue
} from '../decorators/SyncVar';
import { SyncMode, AuthorityType, NetworkScope } from '../types/NetworkTypes';
import { EventEmitter } from '../utils/EventEmitter';
/**
* 同步批次数据
*/
export interface SyncBatch {
/** 实例ID */
instanceId: string;
/** 实例类型 */
instanceType: string;
/** 变化的属性数据 */
changes: { [propertyKey: string]: SyncVarValue };
/** 时间戳 */
timestamp: number;
/** 同步模式映射 */
syncModes: { [propertyKey: string]: SyncMode };
/** 权限映射 */
authorities: { [propertyKey: string]: AuthorityType };
/** 作用域映射 */
scopes: { [propertyKey: string]: NetworkScope };
/** 优先级映射 */
priorities: { [propertyKey: string]: number };
}
/**
* 同步统计信息
*/
export interface SyncStats {
/** 注册的实例数量 */
registeredInstances: number;
/** 脏实例数量 */
dirtyInstances: number;
/** 总同步次数 */
totalSyncs: number;
/** 总传输字节数 */
totalBytes: number;
/** 平均同步延迟 */
averageLatency: number;
/** 每秒同步次数 */
syncsPerSecond: number;
/** 最后同步时间 */
lastSyncTime: number;
}
/**
* SyncVar管理器事件
*/
export interface SyncVarManagerEvents {
instanceRegistered: (instanceId: string, instance: object) => void;
instanceUnregistered: (instanceId: string) => void;
syncBatchReady: (batch: SyncBatch) => void;
syncCompleted: (instanceId: string, propertyCount: number) => void;
syncError: (error: Error, instanceId?: string) => void;
}
/**
* SyncVar管理器
* 负责管理所有带有SyncVar的实例追踪变化并生成同步批次
*/
export class SyncVarManager extends EventEmitter {
private static instance: SyncVarManager | null = null;
/** 注册的实例映射 */
private registeredInstances = new Map<string, object>();
/** 脏实例集合 */
private dirtyInstances = new Set<string>();
/** 实例ID计数器 */
private instanceIdCounter = 0;
/** 实例ID映射 */
private instanceIdMap = new WeakMap<any, string>();
/** 统计信息 */
private stats: SyncStats = {
registeredInstances: 0,
dirtyInstances: 0,
totalSyncs: 0,
totalBytes: 0,
averageLatency: 0,
syncsPerSecond: 0,
lastSyncTime: 0
};
/** 自动同步定时器 */
private autoSyncTimer: ReturnType<typeof setInterval> | null = null;
/** 同步频率(毫秒) */
private syncRate = 100;
/** 是否启用自动同步 */
private autoSyncEnabled = true;
/** 最大批次大小 */
private maxBatchSize = 100;
/** 立即同步请求队列 */
private immediateSyncQueue = new Set<{ instanceId: string; propertyKey?: string | symbol }>();
private constructor() {
super();
this.startAutoSync();
}
/**
* 获取单例实例
*/
public static getInstance(): SyncVarManager {
if (!SyncVarManager.instance) {
SyncVarManager.instance = new SyncVarManager();
}
return SyncVarManager.instance;
}
/**
* 注册实例
*/
public registerInstance(instance: object): string {
if (!hasSyncVars(instance)) {
throw new Error('实例没有SyncVar属性无法注册');
}
// 检查是否已经注册
if (this.instanceIdMap.has(instance)) {
return this.instanceIdMap.get(instance)!;
}
// 生成新的实例ID
const instanceId = `syncvar_${++this.instanceIdCounter}`;
// 注册实例
this.registeredInstances.set(instanceId, instance);
this.instanceIdMap.set(instance, instanceId);
// 更新统计
this.stats.registeredInstances = this.registeredInstances.size;
this.emit('instanceRegistered', instanceId, instance);
return instanceId;
}
/**
* 注销实例
*/
public unregisterInstance(instance: object): boolean {
const instanceId = this.instanceIdMap.get(instance);
if (!instanceId) {
return false;
}
// 删除注册信息
this.registeredInstances.delete(instanceId);
this.instanceIdMap.delete(instance);
this.dirtyInstances.delete(instanceId);
// 更新统计
this.stats.registeredInstances = this.registeredInstances.size;
this.stats.dirtyInstances = this.dirtyInstances.size;
this.emit('instanceUnregistered', instanceId);
return true;
}
/**
* 标记实例为脏数据
*/
public markInstanceDirty(instance: object): void {
const instanceId = this.instanceIdMap.get(instance);
if (!instanceId) {
// 自动注册实例
this.registerInstance(instance);
return this.markInstanceDirty(instance);
}
this.dirtyInstances.add(instanceId);
this.stats.dirtyInstances = this.dirtyInstances.size;
}
/**
* 请求立即同步
*/
public requestImmediateSync(instance: object, propertyKey?: string | symbol): void {
const instanceId = this.instanceIdMap.get(instance);
if (!instanceId) {
return;
}
this.markInstanceDirty(instance);
this.immediateSyncQueue.add({ instanceId, propertyKey });
// 立即处理同步
this.processImmediateSyncs();
}
/**
* 手动触发同步
*/
public syncNow(): SyncBatch[] {
const batches: SyncBatch[] = [];
// 处理立即同步请求
this.processImmediateSyncs();
// 收集所有脏实例的数据
for (const instanceId of this.dirtyInstances) {
const batch = this.createSyncBatch(instanceId);
if (batch && Object.keys(batch.changes).length > 0) {
batches.push(batch);
}
}
// 清理脏标记
this.clearAllDirtyFlags();
// 更新统计
this.stats.totalSyncs += batches.length;
this.stats.lastSyncTime = Date.now();
return batches;
}
/**
* 设置同步频率
*/
public setSyncRate(rate: number): void {
this.syncRate = Math.max(1, rate);
if (this.autoSyncEnabled) {
this.restartAutoSync();
}
}
/**
* 启用/禁用自动同步
*/
public setAutoSyncEnabled(enabled: boolean): void {
this.autoSyncEnabled = enabled;
if (enabled) {
this.startAutoSync();
} else {
this.stopAutoSync();
}
}
/**
* 设置最大批次大小
*/
public setMaxBatchSize(size: number): void {
this.maxBatchSize = Math.max(1, size);
}
/**
* 获取统计信息
*/
public getStats(): SyncStats {
return { ...this.stats };
}
/**
* 获取实例ID
*/
public getInstanceId(instance: object): string | undefined {
return this.instanceIdMap.get(instance);
}
/**
* 获取实例
*/
public getInstance(instanceId: string): object | undefined {
return this.registeredInstances.get(instanceId);
}
/**
* 获取所有注册的实例ID
*/
public getAllInstanceIds(): string[] {
return Array.from(this.registeredInstances.keys());
}
/**
* 检查实例是否为脏数据
*/
public isInstanceDirty(instanceId: string): boolean {
return this.dirtyInstances.has(instanceId);
}
/**
* 重置统计信息
*/
public resetStats(): void {
this.stats = {
...this.stats,
totalSyncs: 0,
totalBytes: 0,
averageLatency: 0,
syncsPerSecond: 0
};
}
/**
* 销毁管理器
*/
public destroy(): void {
this.stopAutoSync();
this.registeredInstances.clear();
this.dirtyInstances.clear();
this.instanceIdMap = new WeakMap();
this.immediateSyncQueue.clear();
this.removeAllListeners();
SyncVarManager.instance = null;
}
/**
* 创建同步批次
*/
private createSyncBatch(instanceId: string): SyncBatch | null {
const instance = this.registeredInstances.get(instanceId);
if (!instance) {
return null;
}
const dirtyVars = getDirtySyncVars(instance);
if (dirtyVars.size === 0) {
return null;
}
const changes: { [propertyKey: string]: SyncVarValue } = {};
const syncModes: { [propertyKey: string]: SyncMode } = {};
const authorities: { [propertyKey: string]: AuthorityType } = {};
const scopes: { [propertyKey: string]: NetworkScope } = {};
const priorities: { [propertyKey: string]: number } = {};
for (const [propertyKey, metadata] of dirtyVars) {
const key = String(propertyKey);
changes[key] = (instance as any)[propertyKey];
syncModes[key] = metadata.options.mode;
authorities[key] = metadata.options.authority;
scopes[key] = metadata.options.scope;
priorities[key] = metadata.options.priority;
}
return {
instanceId,
instanceType: instance.constructor.name,
changes,
timestamp: Date.now(),
syncModes,
authorities,
scopes,
priorities
};
}
/**
* 处理立即同步请求
*/
private processImmediateSyncs(): void {
if (this.immediateSyncQueue.size === 0) {
return;
}
const batches: SyncBatch[] = [];
for (const request of this.immediateSyncQueue) {
const batch = this.createSyncBatch(request.instanceId);
if (batch && Object.keys(batch.changes).length > 0) {
// 如果指定了特定属性,只同步该属性
if (request.propertyKey) {
const key = String(request.propertyKey);
if (batch.changes[key] !== undefined) {
const filteredBatch: SyncBatch = {
...batch,
changes: { [key]: batch.changes[key] },
syncModes: { [key]: batch.syncModes[key] },
authorities: { [key]: batch.authorities[key] },
scopes: { [key]: batch.scopes[key] },
priorities: { [key]: batch.priorities[key] }
};
batches.push(filteredBatch);
}
} else {
batches.push(batch);
}
}
}
// 清空立即同步队列
this.immediateSyncQueue.clear();
// 发送批次
for (const batch of batches) {
this.emit('syncBatchReady', batch);
this.emit('syncCompleted', batch.instanceId, Object.keys(batch.changes).length);
}
}
/**
* 清理所有脏标记
*/
private clearAllDirtyFlags(): void {
for (const instanceId of this.dirtyInstances) {
const instance = this.registeredInstances.get(instanceId);
if (instance) {
clearDirtyFlags(instance);
}
}
this.dirtyInstances.clear();
this.stats.dirtyInstances = 0;
}
/**
* 启动自动同步
*/
private startAutoSync(): void {
if (this.autoSyncTimer || !this.autoSyncEnabled) {
return;
}
this.autoSyncTimer = setInterval(() => {
try {
const batches = this.syncNow();
for (const batch of batches) {
this.emit('syncBatchReady', batch);
this.emit('syncCompleted', batch.instanceId, Object.keys(batch.changes).length);
}
} catch (error) {
this.emit('syncError', error as Error);
}
}, this.syncRate);
}
/**
* 停止自动同步
*/
private stopAutoSync(): void {
if (this.autoSyncTimer) {
clearInterval(this.autoSyncTimer);
this.autoSyncTimer = null;
}
}
/**
* 重启自动同步
*/
private restartAutoSync(): void {
this.stopAutoSync();
this.startAutoSync();
}
}
// 全局单例访问
if (typeof window !== 'undefined') {
(window as any).SyncVarManager = SyncVarManager.getInstance();
}

View File

@@ -0,0 +1,6 @@
/**
* 同步系统导出
*/
export * from './SyncVarManager';
export * from './DeltaSync';