允许用户自定义核心数量

This commit is contained in:
YHH
2025-09-28 23:35:25 +08:00
parent 66dc9780b9
commit 05f04ef37e
3 changed files with 154 additions and 42 deletions

View File

@@ -30,7 +30,8 @@ class PhysicsWorkerSystem extends WorkerEntitySystem<PhysicsData> {
constructor() {
super(Matcher.all(Position, Velocity, Physics), {
enableWorker: true, // 启用Worker并行处理
workerCount: 4, // Worker数量
workerCount: 8, // Worker数量,系统会自动限制在硬件支持范围内
entitiesPerWorker: 100, // 每个Worker处理的实体数量
useSharedArrayBuffer: true, // 启用SharedArrayBuffer优化
entityDataSize: 7, // 每个实体数据大小
maxEntities: 10000, // 最大实体数量
@@ -132,8 +133,10 @@ Worker系统支持丰富的配置选项
interface WorkerSystemConfig {
/** 是否启用Worker并行处理 */
enableWorker?: boolean;
/** Worker数量默认为CPU核心数 */
/** Worker数量默认为CPU核心数,自动限制在系统最大值内 */
workerCount?: number;
/** 每个Worker处理的实体数量用于控制负载分布 */
entitiesPerWorker?: number;
/** 系统配置数据会传递给Worker */
systemConfig?: any;
/** 是否使用SharedArrayBuffer优化 */
@@ -153,8 +156,11 @@ constructor() {
// 根据任务复杂度决定是否启用
enableWorker: this.shouldUseWorker(),
// 限制Worker数量,避免创建过多线程
workerCount: Math.min(navigator.hardwareConcurrency || 2, 4),
// Worker数量:系统会自动限制在硬件支持范围内
workerCount: 8, // 请求8个Worker实际数量受CPU核心数限制
// 每个Worker处理的实体数量可选
entitiesPerWorker: 200, // 精确控制负载分布
// 大量简单计算时启用SharedArrayBuffer
useSharedArrayBuffer: this.entityCount > 1000,
@@ -178,6 +184,14 @@ private shouldUseWorker(): boolean {
// 根据实体数量和计算复杂度决定
return this.expectedEntityCount > 100;
}
// 获取系统信息
getSystemInfo() {
const info = this.getWorkerInfo();
console.log(`Worker数量: ${info.workerCount}/${info.maxSystemWorkerCount}`);
console.log(`每Worker实体数: ${info.entitiesPerWorker || '自动分配'}`);
console.log(`当前模式: ${info.currentMode}`);
}
```
## 处理模式
@@ -271,7 +285,8 @@ class ParticlePhysicsWorkerSystem extends WorkerEntitySystem<ParticleData> {
constructor() {
super(Matcher.all(Position, Velocity, Physics, Renderable), {
enableWorker: true,
workerCount: navigator.hardwareConcurrency || 2,
workerCount: 6, // 请求6个Worker自动限制在CPU核心数内
entitiesPerWorker: 150, // 每个Worker处理150个粒子
useSharedArrayBuffer: true,
entityDataSize: 9,
maxEntities: 5000,
@@ -435,8 +450,11 @@ class ParticlePhysicsWorkerSystem extends WorkerEntitySystem<ParticleData> {
public getPerformanceInfo(): {
enabled: boolean;
workerCount: number;
entitiesPerWorker?: number;
maxSystemWorkerCount: number;
entityCount: number;
isProcessing: boolean;
currentMode: string;
} {
const workerInfo = this.getWorkerInfo();
return {
@@ -519,10 +537,12 @@ interface ComplexData {
### 3. Worker数量控制
```typescript
// ✅ 推荐:适当的Worker数量
// ✅ 推荐:灵活的Worker配置
constructor() {
super(matcher, {
workerCount: Math.min(navigator.hardwareConcurrency || 2, 4), // 限制最大数量
// 直接指定需要的Worker数量系统会自动限制在硬件支持范围内
workerCount: 8, // 请求8个Worker
entitiesPerWorker: 100, // 每个Worker处理100个实体
enableWorker: this.shouldUseWorker(), // 条件启用
});
}
@@ -531,6 +551,15 @@ private shouldUseWorker(): boolean {
// 根据实体数量和复杂度决定是否使用Worker
return this.expectedEntityCount > 100;
}
// 获取实际使用的Worker信息
checkWorkerConfiguration() {
const info = this.getWorkerInfo();
console.log(`请求Worker数量: 8`);
console.log(`实际Worker数量: ${info.workerCount}`);
console.log(`系统最大支持: ${info.maxSystemWorkerCount}`);
console.log(`每Worker实体数: ${info.entitiesPerWorker || '自动分配'}`);
}
```
### 4. 性能监控
@@ -557,15 +586,15 @@ public getPerformanceMetrics(): WorkerPerformanceMetrics {
- 保持数据结构简单和扁平
- 避免频繁的大数据传输
### 3. 批处理大小
根据实体数量和Worker数量调整批处理大小平衡负载和开销。
### 4. 降级策略
### 3. 降级策略
始终提供主线程回退方案确保在不支持Worker的环境中正常运行。
### 5. 内存管理
### 4. 内存管理
及时清理Worker池和共享缓冲区避免内存泄漏。
### 5. 负载均衡
使用 `entitiesPerWorker` 参数精确控制负载分布避免某些Worker空闲而其他Worker过载。
## 在线演示
查看完整的Worker系统演示[Worker系统演示](https://esengine.github.io/ecs-framework/demos/worker-system/)

View File

@@ -5,6 +5,7 @@ import type { Scene } from '../Scene';
import type { ISystemBase } from '../../Types';
import type { QuerySystem } from '../Core/QuerySystem';
import { getSystemInstanceTypeName } from '../Decorators';
import { createLogger } from '../../Utils/Logger';
import type { EventListenerConfig, TypeSafeEventSystem, EventHandler } from '../Core/EventSystem';
/**
@@ -49,6 +50,7 @@ export abstract class EntitySystem implements ISystemBase {
private _matcher: Matcher;
private _eventListeners: EventListenerRecord[];
private _scene: Scene | null;
protected logger = createLogger('EntitySystem');
/**
* 实体ID映射缓存
@@ -702,7 +704,7 @@ export abstract class EntitySystem implements ISystemBase {
config?: EventListenerConfig
): void {
if (!this.scene?.eventSystem) {
console.warn(`[${this.systemName}] Cannot add event listener: scene.eventSystem not available`);
this.logger.warn(`${this.systemName}: 无法添加事件监听器,scene.eventSystem 不可用`);
return;
}
@@ -754,7 +756,7 @@ export abstract class EntitySystem implements ISystemBase {
try {
listener.eventSystem.off(listener.eventType, listener.listenerRef);
} catch (error) {
console.warn(`[${this.systemName}] Failed to remove event listener for "${listener.eventType}":`, error);
this.logger.warn(`${this.systemName}: 移除事件监听器失败 "${listener.eventType}"`, error);
}
}

View File

@@ -2,6 +2,7 @@ import { Entity } from '../Entity';
import { EntitySystem } from './EntitySystem';
import { Matcher } from '../Utils/Matcher';
import { Time } from '../../Utils/Time';
import { createLogger } from '../../Utils/Logger';
import type { IComponent } from '../../Types';
/**
@@ -20,8 +21,10 @@ export type WorkerProcessFunction<T extends Record<string, any> = any> = (
export interface WorkerSystemConfig {
/** 是否启用Worker并行处理 */
enableWorker?: boolean;
/** Worker数量默认为CPU核心数 */
/** Worker数量默认为CPU核心数,自动限制在系统最大值内 */
workerCount?: number;
/** 每个Worker处理的实体数量用于控制负载分布 */
entitiesPerWorker?: number;
/** 系统配置数据会传递给Worker */
systemConfig?: any;
/** 是否使用SharedArrayBuffer优化 */
@@ -63,7 +66,8 @@ export type SharedArrayBufferProcessFunction = (
* constructor() {
* super(Matcher.all(Transform, Velocity), {
* enableWorker: true,
* workerCount: 4,
* workerCount: 8, // 指定8个worker系统会自动限制在系统最大值内
* entitiesPerWorker: 100, // 每个worker处理100个实体
* useSharedArrayBuffer: true,
* entityDataSize: 6, // x, y, vx, vy, radius, mass
* maxEntities: 10000,
@@ -183,19 +187,34 @@ export type SharedArrayBufferProcessFunction = (
* ```
*/
export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem {
protected config: Required<Omit<WorkerSystemConfig, 'systemConfig'>> & { systemConfig?: any };
protected config: Required<Omit<WorkerSystemConfig, 'systemConfig' | 'entitiesPerWorker'>> & {
systemConfig?: any;
entitiesPerWorker?: number;
};
private workerPool: WebWorkerPool | null = null;
private isProcessing = false;
protected sharedBuffer: SharedArrayBuffer | null = null;
protected sharedFloatArray: Float32Array | null = null;
private logger = createLogger('WorkerEntitySystem');
constructor(matcher?: Matcher, config: WorkerSystemConfig = {}) {
super(matcher);
// 验证和调整 worker 数量,确保不超过系统最大值
const requestedWorkerCount = config.workerCount ?? this.getMaxSystemWorkerCount();
const maxSystemWorkerCount = this.getMaxSystemWorkerCount();
const validatedWorkerCount = Math.min(requestedWorkerCount, maxSystemWorkerCount);
// 如果用户请求的数量超过系统最大值,给出警告
if (requestedWorkerCount > maxSystemWorkerCount) {
this.logger.warn(`请求 ${requestedWorkerCount} 个Worker但系统最多支持 ${maxSystemWorkerCount} 个。实际使用 ${validatedWorkerCount} 个Worker。`);
}
this.config = {
enableWorker: config.enableWorker ?? true,
workerCount: config.workerCount ?? this.getOptimalWorkerCount(),
workerCount: validatedWorkerCount,
systemConfig: config.systemConfig,
entitiesPerWorker: config.entitiesPerWorker,
useSharedArrayBuffer: config.useSharedArrayBuffer ?? this.isSharedArrayBufferSupported(),
entityDataSize: config.entityDataSize ?? this.getDefaultEntityDataSize(),
maxEntities: config.maxEntities ?? 10000
@@ -226,13 +245,14 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
}
/**
* 获取最优Worker数量
* 获取系统支持的最大Worker数量
*/
private getOptimalWorkerCount(): number {
private getMaxSystemWorkerCount(): number {
if (typeof navigator !== 'undefined' && navigator.hardwareConcurrency) {
return Math.min(navigator.hardwareConcurrency, 4);
// 使用全部CPU核心数作为最大限制
return navigator.hardwareConcurrency;
}
return 2;
return 4; // 降级默认值
}
/**
@@ -248,7 +268,7 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
try {
// 检查是否支持SharedArrayBuffer
if (!this.isSharedArrayBufferSupported()) {
console.warn(`[${this.systemName}] SharedArrayBuffer not supported, falling back to single Worker mode for collision detection integrity`);
this.logger.warn(`${this.systemName}: 不支持SharedArrayBuffer降级到单Worker模式以保证数据处理完整性`);
this.config.useSharedArrayBuffer = false;
// 降级到单Worker模式确保所有实体在同一个Worker中处理维持实体间交互的完整性
this.config.workerCount = 1;
@@ -261,9 +281,9 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
this.sharedBuffer = new SharedArrayBuffer(bufferSize);
this.sharedFloatArray = new Float32Array(this.sharedBuffer);
console.log(`[${this.systemName}] SharedArrayBuffer initialized successfully (${bufferSize} bytes)`);
this.logger.info(`${this.systemName}: SharedArrayBuffer初始化成功 (${bufferSize} 字节)`);
} catch (error) {
console.warn(`[${this.systemName}] SharedArrayBuffer init failed, falling back to single Worker mode for collision detection integrity:`, error);
this.logger.warn(`${this.systemName}: SharedArrayBuffer初始化失败降级到单Worker模式以保证数据处理完整性`, error);
this.config.useSharedArrayBuffer = false;
this.sharedBuffer = null;
this.sharedFloatArray = null;
@@ -283,7 +303,7 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
this.sharedBuffer // 传递SharedArrayBuffer给Worker池
);
} catch (error) {
console.error(`[${this.systemName}] Failed to initialize worker pool:`, error);
this.logger.error(`${this.systemName}: Worker池初始化失败`, error);
this.config.enableWorker = false;
}
}
@@ -399,7 +419,7 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
} else {
// 如果配置了SharedArrayBuffer但不可用记录降级信息
if (this.config.useSharedArrayBuffer) {
console.log(`[${this.systemName}] Falling back to traditional Worker mode for this frame`);
this.logger.info(`${this.systemName}: 本帧降级到传统Worker模式`);
}
// 传统Worker异步处理
this.processWithWorker(entities).finally(() => {
@@ -408,13 +428,13 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
}
} else {
// 同步处理最后的fallback
console.log(`[${this.systemName}] Worker not available, processing synchronously`);
this.logger.info(`${this.systemName}: Worker不可用,使用同步处理`);
this.processSynchronously(entities);
this.isProcessing = false;
}
} catch (error) {
this.isProcessing = false;
console.error(`[${this.systemName}] Processing failed:`, error);
this.logger.error(`${this.systemName}: 处理失败`, error);
throw error;
}
}
@@ -506,11 +526,27 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
}
/**
* 创建数据批次 - 按Worker数量平均分配
* 创建数据批次 - 支持用户指定每个Worker的实体数量
*/
private createBatches<T>(data: T[]): T[][] {
const workerCount = this.config.workerCount;
const batches: T[][] = [];
// 如果用户指定了每个Worker处理的实体数量
if (this.config.entitiesPerWorker) {
const entitiesPerWorker = this.config.entitiesPerWorker;
for (let i = 0; i < data.length; i += entitiesPerWorker) {
const endIndex = Math.min(i + entitiesPerWorker, data.length);
batches.push(data.slice(i, endIndex));
}
// 限制批次数量不超过Worker数量
if (batches.length > workerCount) {
this.logger.warn(`${this.systemName}: 创建了 ${batches.length} 个批次,但只有 ${workerCount} 个Worker。某些Worker将依次处理多个批次。`);
}
} else {
// 默认行为按Worker数量平均分配
const batchSize = Math.ceil(data.length / workerCount);
for (let i = 0; i < workerCount; i++) {
@@ -521,6 +557,7 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
batches.push(data.slice(startIndex, endIndex));
}
}
}
return batches;
}
@@ -549,10 +586,36 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
protected abstract writeEntityToBuffer(entityData: TEntityData, offset: number): void;
/**
* 创建SharedArrayBuffer任务
* 创建SharedArrayBuffer任务 - 支持用户指定每个Worker的实体数量
*/
private createSharedArrayBufferTasks(entityCount: number): Promise<void>[] {
const promises: Promise<void>[] = [];
// 如果用户指定了每个Worker处理的实体数量
if (this.config.entitiesPerWorker) {
const entitiesPerWorker = this.config.entitiesPerWorker;
const tasksNeeded = Math.ceil(entityCount / entitiesPerWorker);
const availableWorkers = this.config.workerCount;
// 如果任务数超过Worker数量警告用户
if (tasksNeeded > availableWorkers) {
this.logger.warn(`${this.systemName}: 需要 ${tasksNeeded} 个任务处理 ${entityCount} 个实体(每任务 ${entitiesPerWorker} 个),但只有 ${availableWorkers} 个Worker。某些Worker将依次处理多个任务。`);
}
for (let i = 0; i < entityCount; i += entitiesPerWorker) {
const startIndex = i;
const endIndex = Math.min(i + entitiesPerWorker, entityCount);
const promise = this.workerPool!.executeSharedBuffer({
startIndex,
endIndex,
deltaTime: Time.deltaTime,
systemConfig: this.config.systemConfig
});
promises.push(promise);
}
} else {
// 默认行为按Worker数量平均分配
const entitiesPerWorker = Math.ceil(entityCount / this.config.workerCount);
for (let i = 0; i < this.config.workerCount; i++) {
@@ -569,6 +632,7 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
promises.push(promise);
}
}
}
return promises;
}
@@ -637,6 +701,19 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
*/
public updateConfig(newConfig: Partial<WorkerSystemConfig>): void {
const oldConfig = { ...this.config };
// 如果更新了workerCount需要验证并调整
if (newConfig.workerCount !== undefined) {
const maxSystemWorkerCount = this.getMaxSystemWorkerCount();
const validatedWorkerCount = Math.min(newConfig.workerCount, maxSystemWorkerCount);
if (newConfig.workerCount > maxSystemWorkerCount) {
this.logger.warn(`请求 ${newConfig.workerCount} 个Worker但系统最多支持 ${maxSystemWorkerCount} 个。实际使用 ${validatedWorkerCount} 个Worker。`);
}
newConfig.workerCount = validatedWorkerCount;
}
Object.assign(this.config, newConfig);
// 如果 SharedArrayBuffer 设置发生变化,需要重新初始化
@@ -709,6 +786,8 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
public getWorkerInfo(): {
enabled: boolean;
workerCount: number;
entitiesPerWorker?: number;
maxSystemWorkerCount: number;
isProcessing: boolean;
sharedArrayBufferSupported: boolean;
sharedArrayBufferEnabled: boolean;
@@ -727,6 +806,8 @@ export abstract class WorkerEntitySystem<TEntityData = any> extends EntitySystem
return {
enabled: this.config.enableWorker,
workerCount: this.config.workerCount,
entitiesPerWorker: this.config.entitiesPerWorker,
maxSystemWorkerCount: this.getMaxSystemWorkerCount(),
isProcessing: this.isProcessing,
sharedArrayBufferSupported: this.isSharedArrayBufferSupported(),
sharedArrayBufferEnabled: this.config.useSharedArrayBuffer,