refactor(core): 优化 PlatformWorkerPool 和 IncrementalSerializer 代码规范 (#335)

PlatformWorkerPool:
- 添加 IWorkerPoolStatus 导出接口
- 添加便捷 getter: isDestroyed, workerCount, isReady, hasPendingTasks
- 提取 createTask/completeTask 方法减少重复代码
- 添加 ERROR_POOL_DESTROYED 常量
- 添加代码段分隔符和双语注释

IncrementalSerializer:
- 所有公共 API 添加 @zh/@en 双语注释
- 类型改用 interface 并添加 readonly
- SceneDataChange.value 从 any 改为 unknown
- 导出 SceneSnapshot, IIncrementalStats 接口
- 提取 DEFAULT_OPTIONS 常量
- 优化 getIncrementalStats 从 6 次 filter 改为 2 次循环
This commit is contained in:
YHH
2025-12-25 18:39:15 +08:00
committed by GitHub
parent c2ebd387f2
commit 56e322de7f
3 changed files with 475 additions and 314 deletions

View File

@@ -1,8 +1,11 @@
/**
* 增量序列化器
* @zh 增量序列化器
* @en Incremental Serializer
*
* 提供高性能的增量序列化支持,只序列化变更的数据
* 适用于网络同步、大场景存档、时间回溯等场景
* @zh 提供高性能的增量序列化支持,只序列化变更的数据
* 适用于网络同步、大场景存档、时间回溯等场景
* @en Provides high-performance incremental serialization, serializing only changed data.
* Suitable for network sync, large scene archiving, and time rewind scenarios.
*/
import type { IScene } from '../IScene';
@@ -14,177 +17,263 @@ import { BinarySerializer } from '../../Utils/BinarySerializer';
import { HierarchyComponent } from '../Components/HierarchyComponent';
import { HierarchySystem } from '../Systems/HierarchySystem';
// =============================================================================
// 枚举 | Enums
// =============================================================================
/**
* 变更操作类型
* @zh 变更操作类型
* @en Change operation type
*/
export enum ChangeOperation {
/** 添加新实体 */
/** @zh 添加新实体 @en Entity added */
EntityAdded = 'entity_added',
/** 删除实体 */
/** @zh 删除实体 @en Entity removed */
EntityRemoved = 'entity_removed',
/** 实体属性更新 */
/** @zh 实体属性更新 @en Entity updated */
EntityUpdated = 'entity_updated',
/** 添加组件 */
/** @zh 添加组件 @en Component added */
ComponentAdded = 'component_added',
/** 删除组件 */
/** @zh 删除组件 @en Component removed */
ComponentRemoved = 'component_removed',
/** 组件数据更新 */
/** @zh 组件数据更新 @en Component updated */
ComponentUpdated = 'component_updated',
/** 场景数据更新 */
/** @zh 场景数据更新 @en Scene data updated */
SceneDataUpdated = 'scene_data_updated'
}
// =============================================================================
// 类型定义 | Type Definitions
// =============================================================================
/**
* 实体变更记录
* @zh 实体变更记录
* @en Entity change record
*/
export type EntityChange = {
/** 操作类型 */
operation: ChangeOperation;
/** 实体ID */
entityId: number;
/** 实体名称用于Added操作 */
entityName?: string;
/** 实体数据用于Added/Updated操作 */
entityData?: Partial<SerializedEntity>;
export interface EntityChange {
/** @zh 操作类型 @en Operation type */
readonly operation: ChangeOperation;
/** @zh 实体ID @en Entity ID */
readonly entityId: number;
/** @zh 实体名称用于Added操作@en Entity name (for Added operation) */
readonly entityName?: string;
/** @zh 实体数据用于Added/Updated操作@en Entity data (for Added/Updated operation) */
readonly entityData?: Partial<SerializedEntity>;
}
/**
* 组件变更记录
* @zh 组件变更记录
* @en Component change record
*/
export type ComponentChange = {
/** 操作类型 */
operation: ChangeOperation;
/** 实体ID */
entityId: number;
/** 组件类型名称 */
componentType: string;
/** 组件数据用于Added/Updated操作 */
componentData?: SerializedComponent;
export interface ComponentChange {
/** @zh 操作类型 @en Operation type */
readonly operation: ChangeOperation;
/** @zh 实体ID @en Entity ID */
readonly entityId: number;
/** @zh 组件类型名称 @en Component type name */
readonly componentType: string;
/** @zh 组件数据用于Added/Updated操作@en Component data (for Added/Updated operation) */
readonly componentData?: SerializedComponent;
}
/**
* 场景数据变更记录
* @zh 场景数据变更记录
* @en Scene data change record
*/
export type SceneDataChange = {
/** 操作类型 */
operation: ChangeOperation;
/** 变更的键 */
key: string;
/** 新值 */
value: any;
/** 是否删除 */
deleted?: boolean;
export interface SceneDataChange {
/** @zh 操作类型 @en Operation type */
readonly operation: ChangeOperation;
/** @zh 变更的键 @en Changed key */
readonly key: string;
/** @zh 新值 @en New value */
readonly value: unknown;
/** @zh 是否删除 @en Whether deleted */
readonly deleted?: boolean;
}
/**
* 增量序列化数据
* @zh 增量序列化数据
* @en Incremental snapshot data
*/
export type IncrementalSnapshot = {
/** 快照版本号 */
version: number;
/** 时间戳 */
timestamp: number;
/** 场景名称 */
sceneName: string;
/** 基础版本号(相对于哪个快照的增量) */
baseVersion: number;
/** 实体变更列表 */
entityChanges: EntityChange[];
/** 组件变更列表 */
componentChanges: ComponentChange[];
/** 场景数据变更列表 */
sceneDataChanges: SceneDataChange[];
export interface IncrementalSnapshot {
/** @zh 快照版本号 @en Snapshot version */
readonly version: number;
/** @zh 时间戳 @en Timestamp */
readonly timestamp: number;
/** @zh 场景名称 @en Scene name */
readonly sceneName: string;
/** @zh 基础版本号(相对于哪个快照的增量)@en Base version (incremental from which snapshot) */
readonly baseVersion: number;
/** @zh 实体变更列表 @en Entity changes list */
readonly entityChanges: EntityChange[];
/** @zh 组件变更列表 @en Component changes list */
readonly componentChanges: ComponentChange[];
/** @zh 场景数据变更列表 @en Scene data changes list */
readonly sceneDataChanges: SceneDataChange[];
}
/**
* 场景快照(用于对比)
* @zh 实体快照数据
* @en Entity snapshot data
*/
interface SceneSnapshot {
/** 快照版本号 */
version: number;
/** 实体ID集合 */
entityIds: Set<number>;
/** 实体数据映射 */
entities: Map<number, {
name: string;
tag: number;
active: boolean;
enabled: boolean;
updateOrder: number;
parentId?: number;
}>;
/** 组件数据映射 (entityId -> componentType -> serializedData) */
components: Map<number, Map<string, string>>; // 使用JSON字符串存储组件数据
/** 场景自定义数据 */
sceneData: Map<string, string>; // 使用JSON字符串存储场景数据
interface EntitySnapshotData {
readonly name: string;
readonly tag: number;
readonly active: boolean;
readonly enabled: boolean;
readonly updateOrder: number;
readonly parentId?: number;
}
/**
* 增量序列化格式
* @zh 场景快照(用于对比)
* @en Scene snapshot (for comparison)
*/
export interface SceneSnapshot {
/** @zh 快照版本号 @en Snapshot version */
readonly version: number;
/** @zh 实体ID集合 @en Entity ID set */
readonly entityIds: Set<number>;
/** @zh 实体数据映射 @en Entity data map */
readonly entities: Map<number, EntitySnapshotData>;
/** @zh 组件数据映射 (entityId -> componentType -> serializedData JSON) @en Component data map */
readonly components: Map<number, Map<string, string>>;
/** @zh 场景自定义数据 @en Scene custom data */
readonly sceneData: Map<string, string>;
}
/**
* @zh 增量序列化格式
* @en Incremental serialization format
*/
export type IncrementalSerializationFormat = 'json' | 'binary';
/**
* 增量序列化选项
* @zh 增量快照统计信息
* @en Incremental snapshot statistics
*/
export type IncrementalSerializationOptions = {
export interface IIncrementalStats {
/** @zh 总变更数 @en Total changes */
readonly totalChanges: number;
/** @zh 实体变更数 @en Entity changes count */
readonly entityChanges: number;
/** @zh 组件变更数 @en Component changes count */
readonly componentChanges: number;
/** @zh 场景数据变更数 @en Scene data changes count */
readonly sceneDataChanges: number;
/** @zh 新增实体数 @en Added entities count */
readonly addedEntities: number;
/** @zh 删除实体数 @en Removed entities count */
readonly removedEntities: number;
/** @zh 更新实体数 @en Updated entities count */
readonly updatedEntities: number;
/** @zh 新增组件数 @en Added components count */
readonly addedComponents: number;
/** @zh 删除组件数 @en Removed components count */
readonly removedComponents: number;
/** @zh 更新组件数 @en Updated components count */
readonly updatedComponents: number;
}
/**
* @zh 增量序列化选项
* @en Incremental serialization options
*/
export interface IncrementalSerializationOptions {
/**
* 是否包含组件数据的深度对比
* 默认true设为false可提升性能但可能漏掉组件内部字段变更
* @zh 实体过滤器 - 只快照符合条件的实体
* @en Entity filter - only snapshot entities that match the condition
*
* @example
* ```typescript
* // 只快照玩家实体
* const snapshot = IncrementalSerializer.createSnapshot(scene, {
* entityFilter: (entity) => entity.tag === PLAYER_TAG
* });
*
* // 只快照有特定组件的实体
* const snapshot = IncrementalSerializer.createSnapshot(scene, {
* entityFilter: (entity) => entity.hasComponent(PlayerMarker)
* });
* ```
*/
entityFilter?: (entity: Entity) => boolean;
/**
* @zh 是否包含组件数据的深度对比默认true
* @en Whether to deep compare component data, default true
*/
deepComponentComparison?: boolean;
/**
* 是否跟踪场景数据变更
* 默认true
* @zh 是否跟踪场景数据变更默认true
* @en Whether to track scene data changes, default true
*/
trackSceneData?: boolean;
/**
* 是否压缩快照使用JSON序列化
* 默认false设为true可减少内存占用但增加CPU开销
* @zh 是否压缩快照默认false
* @en Whether to compress snapshot, default false
*/
compressSnapshot?: boolean;
/**
* 序列化格式
* - 'json': JSON格式
* - 'binary': 二进制格式
* 默认 'json'
* @zh 序列化格式,默认 'json'
* @en Serialization format, default 'json'
*/
format?: IncrementalSerializationFormat;
/**
* 是否美化JSON输出仅在format='json'时有效)
* 默认false
* @zh 是否美化JSON输出仅在format='json'时有效)默认false
* @en Whether to prettify JSON output (only for format='json'), default false
*/
pretty?: boolean;
}
// =============================================================================
// 常量 | Constants
// =============================================================================
const DEFAULT_OPTIONS: Required<Omit<IncrementalSerializationOptions, 'entityFilter'>> = {
deepComponentComparison: true,
trackSceneData: true,
compressSnapshot: false,
format: 'json',
pretty: false
};
// =============================================================================
// IncrementalSerializer
// =============================================================================
/**
* 增量序列化器类
* @zh 增量序列化器类
* @en Incremental serializer class
*
* @zh 提供场景快照创建、增量计算、应用和序列化功能
* @en Provides scene snapshot creation, incremental computation, application and serialization
*/
export class IncrementalSerializer {
/** 当前快照版本号 */
/** @zh 当前快照版本号 @en Current snapshot version */
private static snapshotVersion = 0;
// =========================================================================
// 快照创建 | Snapshot Creation
// =========================================================================
/**
* 创建场景快照
* @zh 创建场景快照
* @en Create scene snapshot
*
* @param scene 要快照的场景
* @param options 序列化选项
* @returns 场景快照对象
* @param scene - @zh 要快照的场景 @en Scene to snapshot
* @param options - @zh 序列化选项 @en Serialization options
* @returns @zh 场景快照对象 @en Scene snapshot object
*/
public static createSnapshot(
scene: IScene,
options?: IncrementalSerializationOptions
): SceneSnapshot {
const opts = {
deepComponentComparison: true,
trackSceneData: true,
compressSnapshot: false,
...options
};
const opts = { ...DEFAULT_OPTIONS, ...options };
const snapshot: SceneSnapshot = {
version: ++this.snapshotVersion,
@@ -194,8 +283,13 @@ export class IncrementalSerializer {
sceneData: new Map()
};
// 快照所有实体
// 快照实体(支持过滤)
for (const entity of scene.entities.buffer) {
// 应用实体过滤器
if (opts.entityFilter && !opts.entityFilter(entity)) {
continue;
}
snapshot.entityIds.add(entity.id);
// 获取层级信息
@@ -243,24 +337,25 @@ export class IncrementalSerializer {
return snapshot;
}
// =========================================================================
// 增量计算 | Incremental Computation
// =========================================================================
/**
* 计算增量变更
* @zh 计算增量变更
* @en Compute incremental changes
*
* @param scene 当前场景
* @param baseSnapshot 基础快照
* @param options 序列化选项
* @returns 增量快照
* @param scene - @zh 当前场景 @en Current scene
* @param baseSnapshot - @zh 基础快照 @en Base snapshot
* @param options - @zh 序列化选项 @en Serialization options
* @returns @zh 增量快照 @en Incremental snapshot
*/
public static computeIncremental(
scene: IScene,
baseSnapshot: SceneSnapshot,
options?: IncrementalSerializationOptions
): IncrementalSnapshot {
const opts = {
deepComponentComparison: true,
trackSceneData: true,
...options
};
const opts = { ...DEFAULT_OPTIONS, ...options };
const incremental: IncrementalSnapshot = {
version: ++this.snapshotVersion,
@@ -274,8 +369,13 @@ export class IncrementalSerializer {
const currentEntityIds = new Set<number>();
// 检测实体变更
// 检测实体变更(支持过滤)
for (const entity of scene.entities.buffer) {
// 应用实体过滤器
if (opts.entityFilter && !opts.entityFilter(entity)) {
continue;
}
currentEntityIds.add(entity.id);
// 获取层级信息
@@ -372,8 +472,13 @@ export class IncrementalSerializer {
return incremental;
}
// =========================================================================
// 私有方法 - 变更检测 | Private Methods - Change Detection
// =========================================================================
/**
* 检测组件变更
* @zh 检测组件变更
* @en Detect component changes
*/
private static detectComponentChanges(
entity: Entity,
@@ -429,7 +534,8 @@ export class IncrementalSerializer {
}
/**
* 检测场景数据变更
* @zh 检测场景数据变更
* @en Detect scene data changes
*/
private static detectSceneDataChanges(
scene: IScene,
@@ -466,12 +572,17 @@ export class IncrementalSerializer {
}
}
// =========================================================================
// 增量应用 | Incremental Application
// =========================================================================
/**
* 应用增量变更到场景
* @zh 应用增量变更到场景
* @en Apply incremental changes to scene
*
* @param scene 目标场景
* @param incremental 增量快照
* @param componentRegistry 组件类型注册表
* @param scene - @zh 目标场景 @en Target scene
* @param incremental - @zh 增量快照 @en Incremental snapshot
* @param componentRegistry - @zh 组件类型注册表 @en Component type registry
*/
public static applyIncremental(
scene: IScene,
@@ -617,12 +728,17 @@ export class IncrementalSerializer {
}
}
// =========================================================================
// 序列化与反序列化 | Serialization & Deserialization
// =========================================================================
/**
* 序列化增量快照
* @zh 序列化增量快照
* @en Serialize incremental snapshot
*
* @param incremental 增量快照
* @param options 序列化选项
* @returns 序列化后的数据JSON字符串或二进制Uint8Array
* @param incremental - @zh 增量快照 @en Incremental snapshot
* @param options - @zh 序列化选项 @en Serialization options
* @returns @zh 序列化后的数据 @en Serialized data
*
* @example
* ```typescript
@@ -633,74 +749,62 @@ export class IncrementalSerializer {
* const binaryData = IncrementalSerializer.serializeIncremental(snapshot, {
* format: 'binary'
* });
*
* // 美化JSON
* const prettyJson = IncrementalSerializer.serializeIncremental(snapshot, {
* format: 'json',
* pretty: true
* });
* ```
*/
public static serializeIncremental(
incremental: IncrementalSnapshot,
options?: { format?: IncrementalSerializationFormat; pretty?: boolean }
): string | Uint8Array {
const opts = {
format: 'json' as IncrementalSerializationFormat,
pretty: false,
...options
};
const format = options?.format ?? 'json';
const pretty = options?.pretty ?? false;
if (opts.format === 'binary') {
if (format === 'binary') {
return BinarySerializer.encode(incremental);
} else {
return opts.pretty
? JSON.stringify(incremental, null, 2)
: JSON.stringify(incremental);
}
return pretty ? JSON.stringify(incremental, null, 2) : JSON.stringify(incremental);
}
/**
* 反序列化增量快照
* @zh 反序列化增量快照
* @en Deserialize incremental snapshot
*
* @param data 序列化的数据JSON字符串或二进制Uint8Array
* @returns 增量快照
*
* @example
* ```typescript
* // 从JSON反序列化
* const snapshot = IncrementalSerializer.deserializeIncremental(jsonString);
*
* // 从二进制反序列化
* const snapshot = IncrementalSerializer.deserializeIncremental(buffer);
* ```
* @param data - @zh 序列化的数据 @en Serialized data
* @returns @zh 增量快照 @en Incremental snapshot
*/
public static deserializeIncremental(data: string | Uint8Array): IncrementalSnapshot {
if (typeof data === 'string') {
return JSON.parse(data);
} else {
return BinarySerializer.decode(data) as IncrementalSnapshot;
}
return BinarySerializer.decode(data) as IncrementalSnapshot;
}
// =========================================================================
// 统计与工具 | Statistics & Utilities
// =========================================================================
/**
* 获取增量快照的统计信息
* @zh 获取增量快照的统计信息
* @en Get incremental snapshot statistics
*
* @param incremental 增量快照
* @returns 统计信息
* @param incremental - @zh 增量快照 @en Incremental snapshot
* @returns @zh 统计信息 @en Statistics
*/
public static getIncrementalStats(incremental: IncrementalSnapshot): {
totalChanges: number;
entityChanges: number;
componentChanges: number;
sceneDataChanges: number;
addedEntities: number;
removedEntities: number;
updatedEntities: number;
addedComponents: number;
removedComponents: number;
updatedComponents: number;
} {
public static getIncrementalStats(incremental: IncrementalSnapshot): IIncrementalStats {
const entityStats = { added: 0, removed: 0, updated: 0 };
const componentStats = { added: 0, removed: 0, updated: 0 };
for (const change of incremental.entityChanges) {
if (change.operation === ChangeOperation.EntityAdded) entityStats.added++;
else if (change.operation === ChangeOperation.EntityRemoved) entityStats.removed++;
else if (change.operation === ChangeOperation.EntityUpdated) entityStats.updated++;
}
for (const change of incremental.componentChanges) {
if (change.operation === ChangeOperation.ComponentAdded) componentStats.added++;
else if (change.operation === ChangeOperation.ComponentRemoved) componentStats.removed++;
else if (change.operation === ChangeOperation.ComponentUpdated) componentStats.updated++;
}
return {
totalChanges:
incremental.entityChanges.length +
@@ -709,29 +813,18 @@ export class IncrementalSerializer {
entityChanges: incremental.entityChanges.length,
componentChanges: incremental.componentChanges.length,
sceneDataChanges: incremental.sceneDataChanges.length,
addedEntities: incremental.entityChanges.filter(
(c) => c.operation === ChangeOperation.EntityAdded
).length,
removedEntities: incremental.entityChanges.filter(
(c) => c.operation === ChangeOperation.EntityRemoved
).length,
updatedEntities: incremental.entityChanges.filter(
(c) => c.operation === ChangeOperation.EntityUpdated
).length,
addedComponents: incremental.componentChanges.filter(
(c) => c.operation === ChangeOperation.ComponentAdded
).length,
removedComponents: incremental.componentChanges.filter(
(c) => c.operation === ChangeOperation.ComponentRemoved
).length,
updatedComponents: incremental.componentChanges.filter(
(c) => c.operation === ChangeOperation.ComponentUpdated
).length
addedEntities: entityStats.added,
removedEntities: entityStats.removed,
updatedEntities: entityStats.updated,
addedComponents: componentStats.added,
removedComponents: componentStats.removed,
updatedComponents: componentStats.updated
};
}
/**
* 重置快照版本号(用于测试)
* @zh 重置快照版本号(用于测试)
* @en Reset snapshot version (for testing)
*/
public static resetVersion(): void {
this.snapshotVersion = 0;

View File

@@ -5,6 +5,12 @@
import type { PlatformWorker } from '../../Platform/IPlatformAdapter';
// =============================================================================
// 常量 | Constants
// =============================================================================
const ERROR_POOL_DESTROYED = 'Worker pool has been destroyed';
// =============================================================================
// 类型定义 | Type Definitions
// =============================================================================
@@ -37,8 +43,25 @@ interface IWorkerMessageData {
}
/**
* @zh Worker 状态
* @en Worker state
* @zh Worker 状态接口
* @en Worker pool status interface
*/
export interface IWorkerPoolStatus {
/** @zh Worker 总数 @en Total number of workers */
readonly total: number;
/** @zh 空闲 Worker 数量 @en Number of idle workers */
readonly idle: number;
/** @zh 忙碌 Worker 数量 @en Number of busy workers */
readonly busy: number;
/** @zh 初始化中的 Worker 数量 @en Number of initializing workers */
readonly initializing: number;
/** @zh 队列中等待的任务数 @en Number of queued tasks */
readonly queuedTasks: number;
}
/**
* @zh Worker 状态枚举
* @en Worker state enum
*/
const enum WorkerState {
/** @zh 初始化中 @en Initializing */
@@ -66,7 +89,11 @@ export class PlatformWorkerPool {
private readonly pendingTasks: Map<number, IWorkerTask> = new Map();
private readonly taskQueue: IWorkerTask[] = [];
private taskCounter = 0;
private isDestroyed = false;
private _isDestroyed = false;
// =========================================================================
// 构造函数 | Constructor
// =========================================================================
/**
* @zh 创建 Worker 池
@@ -83,6 +110,136 @@ export class PlatformWorkerPool {
this.initializeWorkers(sharedBuffer);
}
// =========================================================================
// 公共属性 | Public Properties
// =========================================================================
/**
* @zh 池是否已销毁
* @en Whether the pool has been destroyed
*/
get isDestroyed(): boolean {
return this._isDestroyed;
}
/**
* @zh Worker 数量
* @en Number of workers in the pool
*/
get workerCount(): number {
return this.workers.length;
}
/**
* @zh 所有 Worker 是否已就绪(无初始化中的 Worker
* @en Whether all workers are ready (no initializing workers)
*/
get isReady(): boolean {
if (this._isDestroyed) return false;
for (const state of this.workerStates.values()) {
if (state === WorkerState.Initializing) return false;
}
return this.workers.length > 0;
}
/**
* @zh 是否有待处理的任务(队列中或执行中)
* @en Whether there are pending tasks (queued or executing)
*/
get hasPendingTasks(): boolean {
return this.taskQueue.length > 0 || this.pendingTasks.size > 0;
}
// =========================================================================
// 公共方法 | Public Methods
// =========================================================================
/**
* @zh 执行 SharedArrayBuffer 任务
* @en Execute SharedArrayBuffer task
*
* @param data - @zh 任务数据 @en Task data
* @returns @zh 任务完成的 Promise @en Promise that resolves when task completes
*/
executeSharedBuffer(data: Record<string, unknown>): Promise<void> {
return this.createTask<void>(
`shared-${++this.taskCounter}`,
{ ...data, type: 'shared' },
() => undefined
);
}
/**
* @zh 执行普通任务
* @en Execute normal task
*
* @param data - @zh 任务数据 @en Task data
* @returns @zh 包含任务结果的 Promise @en Promise with task result
*/
execute<TResult = unknown>(data: Record<string, unknown>): Promise<TResult> {
return this.createTask<TResult>(
`task-${++this.taskCounter}`,
data,
(result) => result as TResult
);
}
/**
* @zh 获取 Worker 池状态
* @en Get Worker pool status
*
* @returns @zh 池状态对象 @en Pool status object
*/
getStatus(): IWorkerPoolStatus {
const status = { idle: 0, busy: 0, initializing: 0 };
for (const state of this.workerStates.values()) {
if (state === WorkerState.Idle) status.idle++;
else if (state === WorkerState.Busy) status.busy++;
else if (state === WorkerState.Initializing) status.initializing++;
}
return {
total: this.workers.length,
...status,
queuedTasks: this.taskQueue.length
};
}
/**
* @zh 销毁 Worker 池,释放所有资源
* @en Destroy Worker pool and release all resources
*/
destroy(): void {
if (this._isDestroyed) return;
this._isDestroyed = true;
const destroyError = new Error(ERROR_POOL_DESTROYED);
// Reject all pending and queued tasks
for (const task of this.pendingTasks.values()) {
task.reject(destroyError);
}
for (const task of this.taskQueue) {
task.reject(destroyError);
}
// Terminate all workers
for (const worker of this.workers) {
worker.terminate();
}
// Clear state
this.workers.length = 0;
this.taskQueue.length = 0;
this.pendingTasks.clear();
this.workerStates.clear();
}
// =========================================================================
// 私有方法 | Private Methods
// =========================================================================
/**
* @zh 初始化所有 Worker
* @en Initialize all Workers
@@ -92,14 +249,10 @@ export class PlatformWorkerPool {
const worker = this.workers[i];
if (!worker) continue;
// Set initial state
this.workerStates.set(i, sharedBuffer ? WorkerState.Initializing : WorkerState.Idle);
// Bind message and error handlers
worker.onMessage((event) => this.handleMessage(i, event.data));
worker.onError((error) => this.handleError(i, error));
// Initialize SharedArrayBuffer if provided
if (sharedBuffer) {
worker.postMessage({ type: 'init', sharedBuffer });
}
@@ -107,46 +260,26 @@ export class PlatformWorkerPool {
}
/**
* @zh 执行 SharedArrayBuffer 任务
* @en Execute SharedArrayBuffer task
* @zh 创建并入队任务
* @en Create and enqueue task
*/
executeSharedBuffer(data: Record<string, unknown>): Promise<void> {
private createTask<T>(
id: string,
data: Record<string, unknown>,
transform: (result: unknown) => T
): Promise<T> {
return new Promise((resolve, reject) => {
if (this.isDestroyed) {
reject(new Error('Worker pool has been destroyed'));
if (this._isDestroyed) {
reject(new Error(ERROR_POOL_DESTROYED));
return;
}
const task: IWorkerTask = {
id: `shared-${++this.taskCounter}`,
data: { ...data, type: 'shared' },
resolve: () => resolve(),
reject
};
this.enqueueTask(task);
});
}
/**
* @zh 执行普通任务
* @en Execute normal task
*/
execute<TResult = unknown>(data: Record<string, unknown>): Promise<TResult> {
return new Promise((resolve, reject) => {
if (this.isDestroyed) {
reject(new Error('Worker pool has been destroyed'));
return;
}
const task: IWorkerTask = {
id: `task-${++this.taskCounter}`,
this.enqueueTask({
id,
data,
resolve: (result) => resolve(result as TResult),
resolve: (result) => resolve(transform(result)),
reject
};
this.enqueueTask(task);
});
});
}
@@ -208,27 +341,17 @@ export class PlatformWorkerPool {
* @en Handle Worker message
*/
private handleMessage(workerIndex: number, data: IWorkerMessageData): void {
// Handle initialization response
if (data.type === 'init') {
this.workerStates.set(workerIndex, WorkerState.Idle);
this.dispatchTasks();
return;
}
// Handle task response
const task = this.pendingTasks.get(workerIndex);
if (!task) return;
this.pendingTasks.delete(workerIndex);
this.workerStates.set(workerIndex, WorkerState.Idle);
if (data.error) {
task.reject(new Error(data.error));
this.completeTask(workerIndex, new Error(data.error));
} else {
task.resolve(data.result);
this.completeTask(workerIndex, undefined, data.result);
}
this.dispatchTasks();
}
/**
@@ -236,82 +359,26 @@ export class PlatformWorkerPool {
* @en Handle Worker error
*/
private handleError(workerIndex: number, error: ErrorEvent): void {
const task = this.pendingTasks.get(workerIndex);
this.completeTask(workerIndex, new Error(error.message));
}
if (task) {
this.pendingTasks.delete(workerIndex);
this.workerStates.set(workerIndex, WorkerState.Idle);
task.reject(new Error(error.message));
/**
* @zh 完成任务并释放 Worker
* @en Complete task and release Worker
*/
private completeTask(workerIndex: number, error?: Error, result?: unknown): void {
const task = this.pendingTasks.get(workerIndex);
if (!task) return;
this.pendingTasks.delete(workerIndex);
this.workerStates.set(workerIndex, WorkerState.Idle);
if (error) {
task.reject(error);
} else {
task.resolve(result);
}
this.dispatchTasks();
}
/**
* @zh 获取 Worker 池状态
* @en Get Worker pool status
*/
getStatus(): {
total: number;
idle: number;
busy: number;
initializing: number;
queuedTasks: number;
} {
let idle = 0;
let busy = 0;
let initializing = 0;
for (const state of this.workerStates.values()) {
switch (state) {
case WorkerState.Idle:
idle++;
break;
case WorkerState.Busy:
busy++;
break;
case WorkerState.Initializing:
initializing++;
break;
}
}
return {
total: this.workers.length,
idle,
busy,
initializing,
queuedTasks: this.taskQueue.length
};
}
/**
* @zh 销毁 Worker 池
* @en Destroy Worker pool
*/
destroy(): void {
if (this.isDestroyed) return;
this.isDestroyed = true;
// Reject all pending tasks
for (const task of this.pendingTasks.values()) {
task.reject(new Error('Worker pool destroyed'));
}
// Reject all queued tasks
for (const task of this.taskQueue) {
task.reject(new Error('Worker pool destroyed'));
}
// Terminate all workers
for (const worker of this.workers) {
worker.terminate();
}
// Clear state
this.workers.length = 0;
this.taskQueue.length = 0;
this.pendingTasks.clear();
this.workerStates.clear();
}
}

View File

@@ -14,6 +14,7 @@ export { IntervalSystem } from './IntervalSystem';
export { WorkerEntitySystem } from './WorkerEntitySystem';
export { HierarchySystem } from './HierarchySystem';
export { PlatformWorkerPool } from './PlatformWorkerPool';
export type { IWorkerPoolStatus } from './PlatformWorkerPool';
// =============================================================================
// Worker 系统类型导出 | Worker System Type Exports