diff --git a/docs/guide/worker-system.md b/docs/guide/worker-system.md index d5650c45..e1e63eff 100644 --- a/docs/guide/worker-system.md +++ b/docs/guide/worker-system.md @@ -30,7 +30,8 @@ class PhysicsWorkerSystem extends WorkerEntitySystem { constructor() { super(Matcher.all(Position, Velocity, Physics), { enableWorker: true, // 启用Worker并行处理 - workerCount: 4, // Worker数量 + workerCount: 8, // Worker数量,系统会自动限制在硬件支持范围内 + entitiesPerWorker: 100, // 每个Worker处理的实体数量 useSharedArrayBuffer: true, // 启用SharedArrayBuffer优化 entityDataSize: 7, // 每个实体数据大小 maxEntities: 10000, // 最大实体数量 @@ -132,8 +133,10 @@ Worker系统支持丰富的配置选项: interface WorkerSystemConfig { /** 是否启用Worker并行处理 */ enableWorker?: boolean; - /** Worker数量,默认为CPU核心数 */ + /** Worker数量,默认为CPU核心数,自动限制在系统最大值内 */ workerCount?: number; + /** 每个Worker处理的实体数量,用于控制负载分布 */ + entitiesPerWorker?: number; /** 系统配置数据,会传递给Worker */ systemConfig?: any; /** 是否使用SharedArrayBuffer优化 */ @@ -153,8 +156,11 @@ constructor() { // 根据任务复杂度决定是否启用 enableWorker: this.shouldUseWorker(), - // 限制Worker数量,避免创建过多线程 - workerCount: Math.min(navigator.hardwareConcurrency || 2, 4), + // Worker数量:系统会自动限制在硬件支持范围内 + workerCount: 8, // 请求8个Worker,实际数量受CPU核心数限制 + + // 每个Worker处理的实体数量(可选) + entitiesPerWorker: 200, // 精确控制负载分布 // 大量简单计算时启用SharedArrayBuffer useSharedArrayBuffer: this.entityCount > 1000, @@ -178,6 +184,14 @@ private shouldUseWorker(): boolean { // 根据实体数量和计算复杂度决定 return this.expectedEntityCount > 100; } + +// 获取系统信息 +getSystemInfo() { + const info = this.getWorkerInfo(); + console.log(`Worker数量: ${info.workerCount}/${info.maxSystemWorkerCount}`); + console.log(`每Worker实体数: ${info.entitiesPerWorker || '自动分配'}`); + console.log(`当前模式: ${info.currentMode}`); +} ``` ## 处理模式 @@ -271,7 +285,8 @@ class ParticlePhysicsWorkerSystem extends WorkerEntitySystem { constructor() { super(Matcher.all(Position, Velocity, Physics, Renderable), { enableWorker: true, - workerCount: navigator.hardwareConcurrency || 2, + workerCount: 6, // 请求6个Worker,自动限制在CPU核心数内 + entitiesPerWorker: 150, // 每个Worker处理150个粒子 useSharedArrayBuffer: true, entityDataSize: 9, maxEntities: 5000, @@ -435,8 +450,11 @@ class ParticlePhysicsWorkerSystem extends WorkerEntitySystem { public getPerformanceInfo(): { enabled: boolean; workerCount: number; + entitiesPerWorker?: number; + maxSystemWorkerCount: number; entityCount: number; isProcessing: boolean; + currentMode: string; } { const workerInfo = this.getWorkerInfo(); return { @@ -519,10 +537,12 @@ interface ComplexData { ### 3. Worker数量控制 ```typescript -// ✅ 推荐:适当的Worker数量 +// ✅ 推荐:灵活的Worker配置 constructor() { super(matcher, { - workerCount: Math.min(navigator.hardwareConcurrency || 2, 4), // 限制最大数量 + // 直接指定需要的Worker数量,系统会自动限制在硬件支持范围内 + workerCount: 8, // 请求8个Worker + entitiesPerWorker: 100, // 每个Worker处理100个实体 enableWorker: this.shouldUseWorker(), // 条件启用 }); } @@ -531,6 +551,15 @@ private shouldUseWorker(): boolean { // 根据实体数量和复杂度决定是否使用Worker return this.expectedEntityCount > 100; } + +// 获取实际使用的Worker信息 +checkWorkerConfiguration() { + const info = this.getWorkerInfo(); + console.log(`请求Worker数量: 8`); + console.log(`实际Worker数量: ${info.workerCount}`); + console.log(`系统最大支持: ${info.maxSystemWorkerCount}`); + console.log(`每Worker实体数: ${info.entitiesPerWorker || '自动分配'}`); +} ``` ### 4. 性能监控 @@ -557,15 +586,15 @@ public getPerformanceMetrics(): WorkerPerformanceMetrics { - 保持数据结构简单和扁平 - 避免频繁的大数据传输 -### 3. 批处理大小 -根据实体数量和Worker数量调整批处理大小,平衡负载和开销。 - -### 4. 降级策略 +### 3. 降级策略 始终提供主线程回退方案,确保在不支持Worker的环境中正常运行。 -### 5. 内存管理 +### 4. 内存管理 及时清理Worker池和共享缓冲区,避免内存泄漏。 +### 5. 负载均衡 +使用 `entitiesPerWorker` 参数精确控制负载分布,避免某些Worker空闲而其他Worker过载。 + ## 在线演示 查看完整的Worker系统演示:[Worker系统演示](https://esengine.github.io/ecs-framework/demos/worker-system/) diff --git a/packages/core/src/ECS/Systems/EntitySystem.ts b/packages/core/src/ECS/Systems/EntitySystem.ts index cbf1a5cd..a789f0d8 100644 --- a/packages/core/src/ECS/Systems/EntitySystem.ts +++ b/packages/core/src/ECS/Systems/EntitySystem.ts @@ -5,6 +5,7 @@ import type { Scene } from '../Scene'; import type { ISystemBase } from '../../Types'; import type { QuerySystem } from '../Core/QuerySystem'; import { getSystemInstanceTypeName } from '../Decorators'; +import { createLogger } from '../../Utils/Logger'; import type { EventListenerConfig, TypeSafeEventSystem, EventHandler } from '../Core/EventSystem'; /** @@ -49,6 +50,7 @@ export abstract class EntitySystem implements ISystemBase { private _matcher: Matcher; private _eventListeners: EventListenerRecord[]; private _scene: Scene | null; + protected logger = createLogger('EntitySystem'); /** * 实体ID映射缓存 @@ -702,7 +704,7 @@ export abstract class EntitySystem implements ISystemBase { config?: EventListenerConfig ): void { if (!this.scene?.eventSystem) { - console.warn(`[${this.systemName}] Cannot add event listener: scene.eventSystem not available`); + this.logger.warn(`${this.systemName}: 无法添加事件监听器,scene.eventSystem 不可用`); return; } @@ -754,7 +756,7 @@ export abstract class EntitySystem implements ISystemBase { try { listener.eventSystem.off(listener.eventType, listener.listenerRef); } catch (error) { - console.warn(`[${this.systemName}] Failed to remove event listener for "${listener.eventType}":`, error); + this.logger.warn(`${this.systemName}: 移除事件监听器失败 "${listener.eventType}"`, error); } } diff --git a/packages/core/src/ECS/Systems/WorkerEntitySystem.ts b/packages/core/src/ECS/Systems/WorkerEntitySystem.ts index 98155dbc..523657fc 100644 --- a/packages/core/src/ECS/Systems/WorkerEntitySystem.ts +++ b/packages/core/src/ECS/Systems/WorkerEntitySystem.ts @@ -2,6 +2,7 @@ import { Entity } from '../Entity'; import { EntitySystem } from './EntitySystem'; import { Matcher } from '../Utils/Matcher'; import { Time } from '../../Utils/Time'; +import { createLogger } from '../../Utils/Logger'; import type { IComponent } from '../../Types'; /** @@ -20,8 +21,10 @@ export type WorkerProcessFunction = any> = ( export interface WorkerSystemConfig { /** 是否启用Worker并行处理 */ enableWorker?: boolean; - /** Worker数量,默认为CPU核心数 */ + /** Worker数量,默认为CPU核心数,自动限制在系统最大值内 */ workerCount?: number; + /** 每个Worker处理的实体数量,用于控制负载分布 */ + entitiesPerWorker?: number; /** 系统配置数据,会传递给Worker */ systemConfig?: any; /** 是否使用SharedArrayBuffer优化 */ @@ -63,7 +66,8 @@ export type SharedArrayBufferProcessFunction = ( * constructor() { * super(Matcher.all(Transform, Velocity), { * enableWorker: true, - * workerCount: 4, + * workerCount: 8, // 指定8个worker,系统会自动限制在系统最大值内 + * entitiesPerWorker: 100, // 每个worker处理100个实体 * useSharedArrayBuffer: true, * entityDataSize: 6, // x, y, vx, vy, radius, mass * maxEntities: 10000, @@ -183,19 +187,34 @@ export type SharedArrayBufferProcessFunction = ( * ``` */ export abstract class WorkerEntitySystem extends EntitySystem { - protected config: Required> & { systemConfig?: any }; + protected config: Required> & { + systemConfig?: any; + entitiesPerWorker?: number; + }; private workerPool: WebWorkerPool | null = null; private isProcessing = false; protected sharedBuffer: SharedArrayBuffer | null = null; protected sharedFloatArray: Float32Array | null = null; + private logger = createLogger('WorkerEntitySystem'); constructor(matcher?: Matcher, config: WorkerSystemConfig = {}) { super(matcher); + // 验证和调整 worker 数量,确保不超过系统最大值 + const requestedWorkerCount = config.workerCount ?? this.getMaxSystemWorkerCount(); + const maxSystemWorkerCount = this.getMaxSystemWorkerCount(); + const validatedWorkerCount = Math.min(requestedWorkerCount, maxSystemWorkerCount); + + // 如果用户请求的数量超过系统最大值,给出警告 + if (requestedWorkerCount > maxSystemWorkerCount) { + this.logger.warn(`请求 ${requestedWorkerCount} 个Worker,但系统最多支持 ${maxSystemWorkerCount} 个。实际使用 ${validatedWorkerCount} 个Worker。`); + } + this.config = { enableWorker: config.enableWorker ?? true, - workerCount: config.workerCount ?? this.getOptimalWorkerCount(), + workerCount: validatedWorkerCount, systemConfig: config.systemConfig, + entitiesPerWorker: config.entitiesPerWorker, useSharedArrayBuffer: config.useSharedArrayBuffer ?? this.isSharedArrayBufferSupported(), entityDataSize: config.entityDataSize ?? this.getDefaultEntityDataSize(), maxEntities: config.maxEntities ?? 10000 @@ -226,13 +245,14 @@ export abstract class WorkerEntitySystem extends EntitySystem } /** - * 获取最优Worker数量 + * 获取系统支持的最大Worker数量 */ - private getOptimalWorkerCount(): number { + private getMaxSystemWorkerCount(): number { if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) { - return Math.min(navigator.hardwareConcurrency, 4); + // 使用全部CPU核心数作为最大限制 + return navigator.hardwareConcurrency; } - return 2; + return 4; // 降级默认值 } /** @@ -248,7 +268,7 @@ export abstract class WorkerEntitySystem extends EntitySystem try { // 检查是否支持SharedArrayBuffer if (!this.isSharedArrayBufferSupported()) { - console.warn(`[${this.systemName}] SharedArrayBuffer not supported, falling back to single Worker mode for collision detection integrity`); + this.logger.warn(`${this.systemName}: 不支持SharedArrayBuffer,降级到单Worker模式以保证数据处理完整性`); this.config.useSharedArrayBuffer = false; // 降级到单Worker模式:确保所有实体在同一个Worker中处理,维持实体间交互的完整性 this.config.workerCount = 1; @@ -261,9 +281,9 @@ export abstract class WorkerEntitySystem extends EntitySystem this.sharedBuffer = new SharedArrayBuffer(bufferSize); this.sharedFloatArray = new Float32Array(this.sharedBuffer); - console.log(`[${this.systemName}] SharedArrayBuffer initialized successfully (${bufferSize} bytes)`); + this.logger.info(`${this.systemName}: SharedArrayBuffer初始化成功 (${bufferSize} 字节)`); } catch (error) { - console.warn(`[${this.systemName}] SharedArrayBuffer init failed, falling back to single Worker mode for collision detection integrity:`, error); + this.logger.warn(`${this.systemName}: SharedArrayBuffer初始化失败,降级到单Worker模式以保证数据处理完整性`, error); this.config.useSharedArrayBuffer = false; this.sharedBuffer = null; this.sharedFloatArray = null; @@ -283,7 +303,7 @@ export abstract class WorkerEntitySystem extends EntitySystem this.sharedBuffer // 传递SharedArrayBuffer给Worker池 ); } catch (error) { - console.error(`[${this.systemName}] Failed to initialize worker pool:`, error); + this.logger.error(`${this.systemName}: Worker池初始化失败`, error); this.config.enableWorker = false; } } @@ -399,7 +419,7 @@ export abstract class WorkerEntitySystem extends EntitySystem } else { // 如果配置了SharedArrayBuffer但不可用,记录降级信息 if (this.config.useSharedArrayBuffer) { - console.log(`[${this.systemName}] Falling back to traditional Worker mode for this frame`); + this.logger.info(`${this.systemName}: 本帧降级到传统Worker模式`); } // 传统Worker异步处理 this.processWithWorker(entities).finally(() => { @@ -408,13 +428,13 @@ export abstract class WorkerEntitySystem extends EntitySystem } } else { // 同步处理(最后的fallback) - console.log(`[${this.systemName}] Worker not available, processing synchronously`); + this.logger.info(`${this.systemName}: Worker不可用,使用同步处理`); this.processSynchronously(entities); this.isProcessing = false; } } catch (error) { this.isProcessing = false; - console.error(`[${this.systemName}] Processing failed:`, error); + this.logger.error(`${this.systemName}: 处理失败`, error); throw error; } } @@ -506,19 +526,36 @@ export abstract class WorkerEntitySystem extends EntitySystem } /** - * 创建数据批次 - 按Worker数量平均分配 + * 创建数据批次 - 支持用户指定每个Worker的实体数量 */ private createBatches(data: T[]): T[][] { const workerCount = this.config.workerCount; const batches: T[][] = []; - const batchSize = Math.ceil(data.length / workerCount); - for (let i = 0; i < workerCount; i++) { - const startIndex = i * batchSize; - const endIndex = Math.min(startIndex + batchSize, data.length); + // 如果用户指定了每个Worker处理的实体数量 + if (this.config.entitiesPerWorker) { + const entitiesPerWorker = this.config.entitiesPerWorker; - if (startIndex < data.length) { - batches.push(data.slice(startIndex, endIndex)); + for (let i = 0; i < data.length; i += entitiesPerWorker) { + const endIndex = Math.min(i + entitiesPerWorker, data.length); + batches.push(data.slice(i, endIndex)); + } + + // 限制批次数量不超过Worker数量 + if (batches.length > workerCount) { + this.logger.warn(`${this.systemName}: 创建了 ${batches.length} 个批次,但只有 ${workerCount} 个Worker。某些Worker将依次处理多个批次。`); + } + } else { + // 默认行为:按Worker数量平均分配 + const batchSize = Math.ceil(data.length / workerCount); + + for (let i = 0; i < workerCount; i++) { + const startIndex = i * batchSize; + const endIndex = Math.min(startIndex + batchSize, data.length); + + if (startIndex < data.length) { + batches.push(data.slice(startIndex, endIndex)); + } } } @@ -549,17 +586,26 @@ export abstract class WorkerEntitySystem extends EntitySystem protected abstract writeEntityToBuffer(entityData: TEntityData, offset: number): void; /** - * 创建SharedArrayBuffer任务 + * 创建SharedArrayBuffer任务 - 支持用户指定每个Worker的实体数量 */ private createSharedArrayBufferTasks(entityCount: number): Promise[] { const promises: Promise[] = []; - const entitiesPerWorker = Math.ceil(entityCount / this.config.workerCount); - for (let i = 0; i < this.config.workerCount; i++) { - const startIndex = i * entitiesPerWorker; - const endIndex = Math.min(startIndex + entitiesPerWorker, entityCount); + // 如果用户指定了每个Worker处理的实体数量 + if (this.config.entitiesPerWorker) { + const entitiesPerWorker = this.config.entitiesPerWorker; + const tasksNeeded = Math.ceil(entityCount / entitiesPerWorker); + const availableWorkers = this.config.workerCount; + + // 如果任务数超过Worker数量,警告用户 + if (tasksNeeded > availableWorkers) { + this.logger.warn(`${this.systemName}: 需要 ${tasksNeeded} 个任务处理 ${entityCount} 个实体(每任务 ${entitiesPerWorker} 个),但只有 ${availableWorkers} 个Worker。某些Worker将依次处理多个任务。`); + } + + for (let i = 0; i < entityCount; i += entitiesPerWorker) { + const startIndex = i; + const endIndex = Math.min(i + entitiesPerWorker, entityCount); - if (startIndex < entityCount) { const promise = this.workerPool!.executeSharedBuffer({ startIndex, endIndex, @@ -568,6 +614,24 @@ export abstract class WorkerEntitySystem extends EntitySystem }); promises.push(promise); } + } else { + // 默认行为:按Worker数量平均分配 + const entitiesPerWorker = Math.ceil(entityCount / this.config.workerCount); + + for (let i = 0; i < this.config.workerCount; i++) { + const startIndex = i * entitiesPerWorker; + const endIndex = Math.min(startIndex + entitiesPerWorker, entityCount); + + if (startIndex < entityCount) { + const promise = this.workerPool!.executeSharedBuffer({ + startIndex, + endIndex, + deltaTime: Time.deltaTime, + systemConfig: this.config.systemConfig + }); + promises.push(promise); + } + } } return promises; @@ -637,6 +701,19 @@ export abstract class WorkerEntitySystem extends EntitySystem */ public updateConfig(newConfig: Partial): void { const oldConfig = { ...this.config }; + + // 如果更新了workerCount,需要验证并调整 + if (newConfig.workerCount !== undefined) { + const maxSystemWorkerCount = this.getMaxSystemWorkerCount(); + const validatedWorkerCount = Math.min(newConfig.workerCount, maxSystemWorkerCount); + + if (newConfig.workerCount > maxSystemWorkerCount) { + this.logger.warn(`请求 ${newConfig.workerCount} 个Worker,但系统最多支持 ${maxSystemWorkerCount} 个。实际使用 ${validatedWorkerCount} 个Worker。`); + } + + newConfig.workerCount = validatedWorkerCount; + } + Object.assign(this.config, newConfig); // 如果 SharedArrayBuffer 设置发生变化,需要重新初始化 @@ -709,6 +786,8 @@ export abstract class WorkerEntitySystem extends EntitySystem public getWorkerInfo(): { enabled: boolean; workerCount: number; + entitiesPerWorker?: number; + maxSystemWorkerCount: number; isProcessing: boolean; sharedArrayBufferSupported: boolean; sharedArrayBufferEnabled: boolean; @@ -727,6 +806,8 @@ export abstract class WorkerEntitySystem extends EntitySystem return { enabled: this.config.enableWorker, workerCount: this.config.workerCount, + entitiesPerWorker: this.config.entitiesPerWorker, + maxSystemWorkerCount: this.getMaxSystemWorkerCount(), isProcessing: this.isProcessing, sharedArrayBufferSupported: this.isSharedArrayBufferSupported(), sharedArrayBufferEnabled: this.config.useSharedArrayBuffer,