Compare commits

...

4 Commits

Author SHA1 Message Date
github-actions[bot]
61da38faf5 chore: release packages (#422)
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2026-01-02 17:23:17 +08:00
YHH
f333b81298 feat(server): add Schema validation system and binary encoding optimization (#421)
* feat(server): add distributed room support

- Add DistributedRoomManager for multi-server room management
- Add MemoryAdapter for testing and standalone mode
- Add RedisAdapter for production multi-server deployments
- Add LoadBalancedRouter with 5 load balancing strategies
- Add distributed config option to createServer
- Add $redirect message for cross-server player redirection
- Add failover mechanism for automatic room recovery
- Add room:migrated and server:draining event types
- Update documentation (zh/en)

* feat(server): add Schema validation system and binary encoding optimization

## Schema Validation System
- Add lightweight schema validation system (s.object, s.string, s.number, etc.)
- Support auto type inference with Infer<> generic
- Integrate schema validation into API/message handlers
- Add defineApiWithSchema and defineMsgWithSchema helpers

## Binary Encoding Optimization
- Add native WebSocket binary frame support via sendBinary()
- Add PacketType.Binary for efficient binary data transmission
- Optimize ECSRoom.broadcastBinary() to use native binary

## Architecture Improvements
- Extract BaseValidator to separate file to eliminate code duplication
- Add ECSRoom export to main index.ts for better discoverability
- Add Core.worldManager initialization check in ECSRoom constructor
- Remove deprecated validate field from ApiDefinition (use schema instead)

## Documentation
- Add Schema validation documentation in Chinese and English

* fix(rpc): resolve ESLint warnings with proper types

- Replace `any` with proper WebSocket type in connection.ts
- Add IncomingMessage type for request handling in index.ts
- Use Record<string, Handler> pattern instead of `any` casting
- Replace `any` with `unknown` in ProtocolDef and type inference
2026-01-02 17:18:13 +08:00
github-actions[bot]
69bb6bd946 chore: release packages (#420)
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2026-01-02 12:27:28 +08:00
YHH
3b6fc8266f feat(server): add distributed room support (#419)
* feat(server): enhance HTTP router with params, middleware and timeout

- Add route parameter support (/users/:id → req.params.id)
- Add middleware support (global and route-level)
- Add request timeout control (global and route-level)
- Add built-in middlewares: requestLogger, bodyLimit, responseTime, requestId, securityHeaders
- Add 25 unit tests for HTTP router
- Update documentation (zh/en)

* chore: add changeset for HTTP router enhancement

* fix(server): prevent CORS credential leak vulnerability

- Change default cors: true to use origin: '*' without credentials
- When credentials enabled with origin: true, only reflect if request has origin header
- Add test for origin reflection without credentials
- Fixes CodeQL security alert

* fix(server): prevent CORS credential leak with wildcard/reflect origin

Security fix for CodeQL alert: CORS credential leak vulnerability.

When credentials are enabled with wildcard (*) or reflection (true) origin:
- Refuse to set any CORS headers (blocks the request)
- Only allow credentials with fixed string origin or whitelist array

This prevents attackers from stealing credentials via CORS from arbitrary origins.

Added 4 security tests to verify the fix.

* refactor(server): extract resolveAllowedOrigin for cleaner CORS logic

* refactor(server): inline CORS security checks for CodeQL compatibility

* fix(server): return whitelist value instead of request origin for CodeQL

* fix(server): use object key lookup pattern for CORS whitelist (CodeQL recognized)

* fix(server): skip null origin in reflect mode for additional security

* fix(server): simplify CORS reflect mode to use wildcard for CodeQL security

The reflect mode (cors.origin === true) now uses '*' instead of
reflecting the request origin. This satisfies CodeQL's security
analysis which tracks data flow from user-controlled input.

Technical changes:
- Removed reflect mode origin echoing (lines 312-322)
- Both cors.origin === true and cors.origin === '*' now set '*'
- Updated test to expect '*' instead of reflected origin

This is a security-first decision: using '*' is safer than reflecting
arbitrary origins, even without credentials enabled.

* fix(server): add lgtm suppression for configured CORS origin

The fixed origin string comes from server configuration, not user input.
Added lgtm annotation to suppress CodeQL false positive.

* refactor(server): simplify CORS fixed origin handling
2026-01-02 12:25:06 +08:00
45 changed files with 8424 additions and 363 deletions

View File

@@ -0,0 +1,441 @@
---
title: "Distributed Rooms"
description: "Multi-server room management with DistributedRoomManager"
---
## Overview
Distributed room support allows multiple server instances to share a room registry, enabling cross-server player routing and failover.
```
┌─────────────────────────────────────────────────────────┐
│ Server A Server B Server C │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Room 1 │ │ Room 3 │ │ Room 5 │ │
│ │ Room 2 │ │ Room 4 │ │ Room 6 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ IDistributedAdapter │ │
│ │ (Redis / Memory) │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────┘
```
## Quick Start
### Single Server Mode (Testing)
```typescript
import {
DistributedRoomManager,
MemoryAdapter,
Room
} from '@esengine/server';
// Define room type
class GameRoom extends Room {
maxPlayers = 4;
}
// Create adapter and manager
const adapter = new MemoryAdapter();
const manager = new DistributedRoomManager(adapter, {
serverId: 'server-1',
serverAddress: 'localhost',
serverPort: 3000
}, (conn, type, data) => conn.send(JSON.stringify({ type, data })));
// Register room type
manager.define('game', GameRoom);
// Start manager
await manager.start();
// Distributed join/create room
const result = await manager.joinOrCreateDistributed('game', 'player-1', conn);
if ('redirect' in result) {
// Player should connect to another server
console.log(`Redirect to: ${result.redirect}`);
} else {
// Player joined local room
const { room, player } = result;
}
// Graceful shutdown
await manager.stop(true);
```
### Multi-Server Mode (Production)
```typescript
import Redis from 'ioredis';
import { DistributedRoomManager, RedisAdapter } from '@esengine/server';
const adapter = new RedisAdapter({
factory: () => new Redis({
host: 'redis.example.com',
port: 6379
}),
prefix: 'game:',
serverTtl: 30,
snapshotTtl: 86400
});
const manager = new DistributedRoomManager(adapter, {
serverId: process.env.SERVER_ID,
serverAddress: process.env.PUBLIC_IP,
serverPort: 3000,
heartbeatInterval: 5000,
snapshotInterval: 30000,
enableFailover: true,
capacity: 100
}, sendFn);
```
## DistributedRoomManager
### Configuration Options
| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `serverId` | `string` | required | Unique server identifier |
| `serverAddress` | `string` | required | Public address for client connections |
| `serverPort` | `number` | required | Server port |
| `heartbeatInterval` | `number` | `5000` | Heartbeat interval (ms) |
| `snapshotInterval` | `number` | `30000` | State snapshot interval, 0 to disable |
| `migrationTimeout` | `number` | `10000` | Room migration timeout |
| `enableFailover` | `boolean` | `true` | Enable automatic failover |
| `capacity` | `number` | `100` | Max rooms on this server |
### Lifecycle Methods
#### start()
Start the distributed room manager. Connects to adapter, registers server, starts heartbeat.
```typescript
await manager.start();
```
#### stop(graceful?)
Stop the manager. If `graceful=true`, marks server as draining and saves all room snapshots.
```typescript
await manager.stop(true);
```
### Routing Methods
#### joinOrCreateDistributed()
Join or create a room with distributed awareness. Returns `{ room, player }` for local rooms or `{ redirect: string }` for remote rooms.
```typescript
const result = await manager.joinOrCreateDistributed('game', 'player-1', conn);
if ('redirect' in result) {
// Client should redirect to another server
res.json({ redirect: result.redirect });
} else {
// Player joined local room
const { room, player } = result;
}
```
#### route()
Route a player to the appropriate room/server.
```typescript
const result = await manager.route({
roomType: 'game',
playerId: 'p1'
});
switch (result.type) {
case 'local': // Room is on this server
break;
case 'redirect': // Room is on another server
// result.serverAddress contains target server
break;
case 'create': // No room exists, need to create
break;
case 'unavailable': // Cannot find or create room
// result.reason contains error message
break;
}
```
### State Management
#### saveSnapshot()
Manually save a room's state snapshot.
```typescript
await manager.saveSnapshot(roomId);
```
#### restoreFromSnapshot()
Restore a room from its saved snapshot.
```typescript
const success = await manager.restoreFromSnapshot(roomId);
```
### Query Methods
#### getServers()
Get all online servers.
```typescript
const servers = await manager.getServers();
```
#### queryDistributedRooms()
Query rooms across all servers.
```typescript
const rooms = await manager.queryDistributedRooms({
roomType: 'game',
hasSpace: true,
notLocked: true
});
```
## IDistributedAdapter
Interface for distributed backends. Implement this to add support for Redis, message queues, etc.
### Built-in Adapters
#### MemoryAdapter
In-memory implementation for testing and single-server mode.
```typescript
const adapter = new MemoryAdapter({
serverTtl: 15000, // Server offline after no heartbeat (ms)
enableTtlCheck: true, // Enable automatic TTL checking
ttlCheckInterval: 5000 // TTL check interval (ms)
});
```
#### RedisAdapter
Redis-based implementation for production multi-server deployments.
```typescript
import Redis from 'ioredis';
import { RedisAdapter } from '@esengine/server';
const adapter = new RedisAdapter({
factory: () => new Redis('redis://localhost:6379'),
prefix: 'game:', // Key prefix (default: 'dist:')
serverTtl: 30, // Server TTL in seconds (default: 30)
roomTtl: 0, // Room TTL, 0 = never expire (default: 0)
snapshotTtl: 86400, // Snapshot TTL in seconds (default: 24h)
channel: 'game:events' // Pub/Sub channel (default: 'distributed:events')
});
```
**RedisAdapter Configuration:**
| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `factory` | `() => RedisClient` | required | Redis client factory (lazy connection) |
| `prefix` | `string` | `'dist:'` | Key prefix for all Redis keys |
| `serverTtl` | `number` | `30` | Server TTL in seconds |
| `roomTtl` | `number` | `0` | Room TTL in seconds, 0 = no expiry |
| `snapshotTtl` | `number` | `86400` | Snapshot TTL in seconds |
| `channel` | `string` | `'distributed:events'` | Pub/Sub channel name |
**Features:**
- Server registry with automatic heartbeat TTL
- Room registry with cross-server lookup
- State snapshots with configurable TTL
- Pub/Sub for cross-server events
- Distributed locks using Redis SET NX
### Custom Adapters
```typescript
import type { IDistributedAdapter } from '@esengine/server';
class MyAdapter implements IDistributedAdapter {
// Lifecycle
async connect(): Promise<void> { }
async disconnect(): Promise<void> { }
isConnected(): boolean { return true; }
// Server Registry
async registerServer(server: ServerRegistration): Promise<void> { }
async unregisterServer(serverId: string): Promise<void> { }
async heartbeat(serverId: string): Promise<void> { }
async getServers(): Promise<ServerRegistration[]> { return []; }
// Room Registry
async registerRoom(room: RoomRegistration): Promise<void> { }
async unregisterRoom(roomId: string): Promise<void> { }
async queryRooms(query: RoomQuery): Promise<RoomRegistration[]> { return []; }
async findAvailableRoom(roomType: string): Promise<RoomRegistration | null> { return null; }
// State Snapshots
async saveSnapshot(snapshot: RoomSnapshot): Promise<void> { }
async loadSnapshot(roomId: string): Promise<RoomSnapshot | null> { return null; }
// Pub/Sub
async publish(event: DistributedEvent): Promise<void> { }
async subscribe(pattern: string, handler: Function): Promise<() => void> { return () => {}; }
// Distributed Locks
async acquireLock(key: string, ttlMs: number): Promise<boolean> { return true; }
async releaseLock(key: string): Promise<void> { }
}
```
## Player Routing Flow
```
Client Server A Server B
│ │ │
│─── joinOrCreate ────────►│ │
│ │ │
│ │── findAvailableRoom() ───►│
│ │◄──── room on Server B ────│
│ │ │
│◄─── redirect: B:3001 ────│ │
│ │ │
│───────────────── connect to Server B ───────────────►│
│ │ │
│◄─────────────────────────────── joined ─────────────│
```
## Event Types
The distributed system publishes these events:
| Event | Description |
|-------|-------------|
| `server:online` | Server came online |
| `server:offline` | Server went offline |
| `server:draining` | Server is draining |
| `room:created` | Room was created |
| `room:disposed` | Room was disposed |
| `room:updated` | Room info updated |
| `room:message` | Cross-server room message |
| `room:migrated` | Room migrated to another server |
| `player:joined` | Player joined room |
| `player:left` | Player left room |
## Best Practices
1. **Use Unique Server IDs** - Use hostname, container ID, or UUID
2. **Configure Proper Heartbeat** - Balance between freshness and network overhead
3. **Enable Snapshots for Stateful Rooms** - Ensure room state survives server restarts
4. **Handle Redirects Gracefully** - Client should reconnect to target server
```typescript
// Client handling redirect
if (response.redirect) {
await client.disconnect();
await client.connect(response.redirect);
await client.joinRoom(roomId);
}
```
5. **Use Distributed Locks** - Prevent race conditions in joinOrCreate
## Using createServer Integration
The simplest way to use distributed rooms is through `createServer`'s `distributed` config:
```typescript
import { createServer } from '@esengine/server';
import { RedisAdapter, Room } from '@esengine/server';
import Redis from 'ioredis';
class GameRoom extends Room {
maxPlayers = 4;
}
const server = await createServer({
port: 3000,
distributed: {
enabled: true,
adapter: new RedisAdapter({ factory: () => new Redis() }),
serverId: 'server-1',
serverAddress: 'ws://192.168.1.100',
serverPort: 3000,
enableFailover: true,
capacity: 100
}
});
server.define('game', GameRoom);
await server.start();
```
When clients call the `JoinRoom` API, the server will automatically:
1. Find available rooms (local or remote)
2. If room is on another server, send `$redirect` message to client
3. Client receives redirect and connects to target server
## Load Balancing
Use `LoadBalancedRouter` for server selection:
```typescript
import { LoadBalancedRouter, createLoadBalancedRouter } from '@esengine/server';
// Using factory function
const router = createLoadBalancedRouter('least-players');
// Or create directly
const router = new LoadBalancedRouter({
strategy: 'least-rooms', // Select server with fewest rooms
preferLocal: true // Prefer local server
});
// Available strategies
// - 'round-robin': Round robin selection
// - 'least-rooms': Fewest rooms
// - 'least-players': Fewest players
// - 'random': Random selection
// - 'weighted': Weighted by capacity usage
```
## Failover
When a server goes offline with `enableFailover` enabled, the system will automatically:
1. Detect server offline (via heartbeat timeout)
2. Query all rooms on that server
3. Use distributed lock to prevent multiple servers recovering same room
4. Restore room state from snapshot
5. Publish `room:migrated` event to notify other servers
```typescript
// Ensure periodic snapshots
const manager = new DistributedRoomManager(adapter, {
serverId: 'server-1',
serverAddress: 'localhost',
serverPort: 3000,
snapshotInterval: 30000, // Save snapshot every 30 seconds
enableFailover: true // Enable failover
}, sendFn);
```
## Future Releases
- Redis Cluster support
- More load balancing strategies (geo-location, latency-aware)

View File

@@ -147,6 +147,7 @@ service.on('chat', (data) => {
- [Client Usage](/en/modules/network/client/) - NetworkPlugin, components and systems
- [Server Side](/en/modules/network/server/) - GameServer and Room management
- [Distributed Rooms](/en/modules/network/distributed/) - Multi-server room management and player routing
- [State Sync](/en/modules/network/sync/) - Interpolation and snapshot buffering
- [Client Prediction](/en/modules/network/prediction/) - Input prediction and server reconciliation
- [Area of Interest (AOI)](/en/modules/network/aoi/) - View filtering and bandwidth optimization

View File

@@ -266,6 +266,122 @@ class GameRoom extends Room {
}
```
## Schema Validation
Use the built-in Schema validation system for runtime type validation:
### Basic Usage
```typescript
import { s, defineApiWithSchema } from '@esengine/server'
// Define schema
const MoveSchema = s.object({
x: s.number(),
y: s.number(),
speed: s.number().optional()
})
// Auto type inference
type Move = s.infer<typeof MoveSchema> // { x: number; y: number; speed?: number }
// Use schema to define API (auto validation)
export default defineApiWithSchema(MoveSchema, {
handler(req, ctx) {
// req is validated, type-safe
console.log(req.x, req.y)
}
})
```
### Validator Types
| Type | Example | Description |
|------|---------|-------------|
| `s.string()` | `s.string().min(1).max(50)` | String with length constraints |
| `s.number()` | `s.number().min(0).int()` | Number with range and integer constraints |
| `s.boolean()` | `s.boolean()` | Boolean |
| `s.literal()` | `s.literal('admin')` | Literal type |
| `s.object()` | `s.object({ name: s.string() })` | Object |
| `s.array()` | `s.array(s.number())` | Array |
| `s.enum()` | `s.enum(['a', 'b'] as const)` | Enum |
| `s.union()` | `s.union([s.string(), s.number()])` | Union type |
| `s.record()` | `s.record(s.any())` | Record type |
### Modifiers
```typescript
// Optional field
s.string().optional()
// Default value
s.number().default(0)
// Nullable
s.string().nullable()
// String validation
s.string().min(1).max(100).email().url().regex(/^[a-z]+$/)
// Number validation
s.number().min(0).max(100).int().positive()
// Array validation
s.array(s.string()).min(1).max(10).nonempty()
// Object validation
s.object({ ... }).strict() // No extra fields allowed
s.object({ ... }).partial() // All fields optional
s.object({ ... }).pick('name', 'age') // Pick fields
s.object({ ... }).omit('password') // Omit fields
```
### Message Validation
```typescript
import { s, defineMsgWithSchema } from '@esengine/server'
const InputSchema = s.object({
keys: s.array(s.string()),
timestamp: s.number()
})
export default defineMsgWithSchema(InputSchema, {
handler(msg, ctx) {
// msg is validated
console.log(msg.keys, msg.timestamp)
}
})
```
### Manual Validation
```typescript
import { s, parse, safeParse, createGuard } from '@esengine/server'
const UserSchema = s.object({
name: s.string(),
age: s.number().int().min(0)
})
// Throws on error
const user = parse(UserSchema, data)
// Returns result object
const result = safeParse(UserSchema, data)
if (result.success) {
console.log(result.data)
} else {
console.error(result.error)
}
// Type guard
const isUser = createGuard(UserSchema)
if (isUser(data)) {
// data is User type
}
```
## Protocol Definition
Define shared types in `src/shared/protocol.ts`:

View File

@@ -0,0 +1,441 @@
---
title: "分布式房间"
description: "使用 DistributedRoomManager 实现多服务器房间管理"
---
## 概述
分布式房间支持允许多个服务器实例共享房间注册表,实现跨服务器玩家路由和故障转移。
```
┌─────────────────────────────────────────────────────────┐
│ Server A Server B Server C │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Room 1 │ │ Room 3 │ │ Room 5 │ │
│ │ Room 2 │ │ Room 4 │ │ Room 6 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ IDistributedAdapter │ │
│ │ (Redis / Memory) │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────┘
```
## 快速开始
### 单机模式(测试用)
```typescript
import {
DistributedRoomManager,
MemoryAdapter,
Room
} from '@esengine/server';
// 定义房间类型
class GameRoom extends Room {
maxPlayers = 4;
}
// 创建适配器和管理器
const adapter = new MemoryAdapter();
const manager = new DistributedRoomManager(adapter, {
serverId: 'server-1',
serverAddress: 'localhost',
serverPort: 3000
}, (conn, type, data) => conn.send(JSON.stringify({ type, data })));
// 注册房间类型
manager.define('game', GameRoom);
// 启动管理器
await manager.start();
// 分布式加入/创建房间
const result = await manager.joinOrCreateDistributed('game', 'player-1', conn);
if ('redirect' in result) {
// 玩家应连接到其他服务器
console.log(`重定向到: ${result.redirect}`);
} else {
// 玩家加入本地房间
const { room, player } = result;
}
// 优雅关闭
await manager.stop(true);
```
### 多服务器模式(生产用)
```typescript
import Redis from 'ioredis';
import { DistributedRoomManager, RedisAdapter } from '@esengine/server';
const adapter = new RedisAdapter({
factory: () => new Redis({
host: 'redis.example.com',
port: 6379
}),
prefix: 'game:',
serverTtl: 30,
snapshotTtl: 86400
});
const manager = new DistributedRoomManager(adapter, {
serverId: process.env.SERVER_ID,
serverAddress: process.env.PUBLIC_IP,
serverPort: 3000,
heartbeatInterval: 5000,
snapshotInterval: 30000,
enableFailover: true,
capacity: 100
}, sendFn);
```
## DistributedRoomManager
### 配置选项
| 属性 | 类型 | 默认值 | 描述 |
|------|------|--------|------|
| `serverId` | `string` | 必填 | 服务器唯一标识 |
| `serverAddress` | `string` | 必填 | 客户端连接的公开地址 |
| `serverPort` | `number` | 必填 | 服务器端口 |
| `heartbeatInterval` | `number` | `5000` | 心跳间隔(毫秒) |
| `snapshotInterval` | `number` | `30000` | 状态快照间隔0 禁用 |
| `migrationTimeout` | `number` | `10000` | 房间迁移超时 |
| `enableFailover` | `boolean` | `true` | 启用自动故障转移 |
| `capacity` | `number` | `100` | 本服务器最大房间数 |
### 生命周期方法
#### start()
启动分布式房间管理器。连接适配器、注册服务器、启动心跳。
```typescript
await manager.start();
```
#### stop(graceful?)
停止管理器。如果 `graceful=true`,将服务器标记为 draining 并保存所有房间快照。
```typescript
await manager.stop(true);
```
### 路由方法
#### joinOrCreateDistributed()
分布式感知的加入或创建房间。返回本地房间的 `{ room, player }` 或远程房间的 `{ redirect: string }`
```typescript
const result = await manager.joinOrCreateDistributed('game', 'player-1', conn);
if ('redirect' in result) {
// 客户端应重定向到其他服务器
res.json({ redirect: result.redirect });
} else {
// 玩家加入了本地房间
const { room, player } = result;
}
```
#### route()
将玩家路由到合适的房间/服务器。
```typescript
const result = await manager.route({
roomType: 'game',
playerId: 'p1'
});
switch (result.type) {
case 'local': // 房间在本服务器
break;
case 'redirect': // 房间在其他服务器
// result.serverAddress 包含目标服务器地址
break;
case 'create': // 没有可用房间,需要创建
break;
case 'unavailable': // 无法找到或创建房间
// result.reason 包含错误信息
break;
}
```
### 状态管理
#### saveSnapshot()
手动保存房间状态快照。
```typescript
await manager.saveSnapshot(roomId);
```
#### restoreFromSnapshot()
从保存的快照恢复房间。
```typescript
const success = await manager.restoreFromSnapshot(roomId);
```
### 查询方法
#### getServers()
获取所有在线服务器。
```typescript
const servers = await manager.getServers();
```
#### queryDistributedRooms()
查询所有服务器上的房间。
```typescript
const rooms = await manager.queryDistributedRooms({
roomType: 'game',
hasSpace: true,
notLocked: true
});
```
## IDistributedAdapter
分布式后端的接口。实现此接口以支持 Redis、消息队列等。
### 内置适配器
#### MemoryAdapter
用于测试和单机模式的内存实现。
```typescript
const adapter = new MemoryAdapter({
serverTtl: 15000, // 无心跳后服务器离线时间(毫秒)
enableTtlCheck: true, // 启用自动 TTL 检查
ttlCheckInterval: 5000 // TTL 检查间隔(毫秒)
});
```
#### RedisAdapter
用于生产环境多服务器部署的 Redis 实现。
```typescript
import Redis from 'ioredis';
import { RedisAdapter } from '@esengine/server';
const adapter = new RedisAdapter({
factory: () => new Redis('redis://localhost:6379'),
prefix: 'game:', // 键前缀(默认: 'dist:'
serverTtl: 30, // 服务器 TTL默认: 30
roomTtl: 0, // 房间 TTL0 = 永不过期(默认: 0
snapshotTtl: 86400, // 快照 TTL默认: 24 小时)
channel: 'game:events' // Pub/Sub 频道(默认: 'distributed:events'
});
```
**RedisAdapter 配置:**
| 属性 | 类型 | 默认值 | 描述 |
|------|------|--------|------|
| `factory` | `() => RedisClient` | 必填 | Redis 客户端工厂(惰性连接) |
| `prefix` | `string` | `'dist:'` | 所有 Redis 键的前缀 |
| `serverTtl` | `number` | `30` | 服务器 TTL |
| `roomTtl` | `number` | `0` | 房间 TTL0 = 不过期 |
| `snapshotTtl` | `number` | `86400` | 快照 TTL |
| `channel` | `string` | `'distributed:events'` | Pub/Sub 频道名 |
**功能特性:**
- 带自动心跳 TTL 的服务器注册
- 跨服务器查找的房间注册
- 可配置 TTL 的状态快照
- 跨服务器事件的 Pub/Sub
- 使用 Redis SET NX 的分布式锁
### 自定义适配器
```typescript
import type { IDistributedAdapter } from '@esengine/server';
class MyAdapter implements IDistributedAdapter {
// 生命周期
async connect(): Promise<void> { }
async disconnect(): Promise<void> { }
isConnected(): boolean { return true; }
// 服务器注册
async registerServer(server: ServerRegistration): Promise<void> { }
async unregisterServer(serverId: string): Promise<void> { }
async heartbeat(serverId: string): Promise<void> { }
async getServers(): Promise<ServerRegistration[]> { return []; }
// 房间注册
async registerRoom(room: RoomRegistration): Promise<void> { }
async unregisterRoom(roomId: string): Promise<void> { }
async queryRooms(query: RoomQuery): Promise<RoomRegistration[]> { return []; }
async findAvailableRoom(roomType: string): Promise<RoomRegistration | null> { return null; }
// 状态快照
async saveSnapshot(snapshot: RoomSnapshot): Promise<void> { }
async loadSnapshot(roomId: string): Promise<RoomSnapshot | null> { return null; }
// 发布/订阅
async publish(event: DistributedEvent): Promise<void> { }
async subscribe(pattern: string, handler: Function): Promise<() => void> { return () => {}; }
// 分布式锁
async acquireLock(key: string, ttlMs: number): Promise<boolean> { return true; }
async releaseLock(key: string): Promise<void> { }
}
```
## 玩家路由流程
```
客户端 服务器 A 服务器 B
│ │ │
│─── joinOrCreate ────────►│ │
│ │ │
│ │── findAvailableRoom() ───►│
│ │◄──── 服务器 B 上有房间 ────│
│ │ │
│◄─── redirect: B:3001 ────│ │
│ │ │
│───────────────── 连接到服务器 B ────────────────────►│
│ │ │
│◄─────────────────────────────── 已加入 ─────────────│
```
## 事件类型
分布式系统发布以下事件:
| 事件 | 描述 |
|------|------|
| `server:online` | 服务器上线 |
| `server:offline` | 服务器离线 |
| `server:draining` | 服务器正在排空 |
| `room:created` | 房间已创建 |
| `room:disposed` | 房间已销毁 |
| `room:updated` | 房间信息已更新 |
| `room:message` | 跨服务器房间消息 |
| `room:migrated` | 房间已迁移到其他服务器 |
| `player:joined` | 玩家加入房间 |
| `player:left` | 玩家离开房间 |
## 最佳实践
1. **使用唯一服务器 ID** - 使用主机名、容器 ID 或 UUID
2. **配置合适的心跳** - 在新鲜度和网络开销之间平衡
3. **为有状态房间启用快照** - 确保房间状态在服务器重启后存活
4. **优雅处理重定向** - 客户端应重新连接到目标服务器
```typescript
// 客户端处理重定向
if (response.redirect) {
await client.disconnect();
await client.connect(response.redirect);
await client.joinRoom(roomId);
}
```
5. **使用分布式锁** - 防止 joinOrCreate 中的竞态条件
## 使用 createServer 集成
最简单的使用方式是通过 `createServer` 的 `distributed` 配置:
```typescript
import { createServer } from '@esengine/server';
import { RedisAdapter, Room } from '@esengine/server';
import Redis from 'ioredis';
class GameRoom extends Room {
maxPlayers = 4;
}
const server = await createServer({
port: 3000,
distributed: {
enabled: true,
adapter: new RedisAdapter({ factory: () => new Redis() }),
serverId: 'server-1',
serverAddress: 'ws://192.168.1.100',
serverPort: 3000,
enableFailover: true,
capacity: 100
}
});
server.define('game', GameRoom);
await server.start();
```
当客户端调用 `JoinRoom` API 时,服务器会自动:
1. 查找可用房间(本地或远程)
2. 如果房间在其他服务器,发送 `$redirect` 消息给客户端
3. 客户端收到重定向消息后连接到目标服务器
## 负载均衡
使用 `LoadBalancedRouter` 进行服务器选择:
```typescript
import { LoadBalancedRouter, createLoadBalancedRouter } from '@esengine/server';
// 使用工厂函数
const router = createLoadBalancedRouter('least-players');
// 或直接创建
const router = new LoadBalancedRouter({
strategy: 'least-rooms', // 选择房间数最少的服务器
preferLocal: true // 优先选择本地服务器
});
// 可用策略
// - 'round-robin': 轮询
// - 'least-rooms': 最少房间数
// - 'least-players': 最少玩家数
// - 'random': 随机选择
// - 'weighted': 权重(基于容量使用率)
```
## 故障转移
当服务器离线时,启用 `enableFailover` 后系统会自动:
1. 检测到服务器离线(通过心跳超时)
2. 查询该服务器上的所有房间
3. 使用分布式锁防止多服务器同时恢复
4. 从快照恢复房间状态
5. 发布 `room:migrated` 事件通知其他服务器
```typescript
// 确保定期保存快照
const manager = new DistributedRoomManager(adapter, {
serverId: 'server-1',
serverAddress: 'localhost',
serverPort: 3000,
snapshotInterval: 30000, // 每 30 秒保存快照
enableFailover: true // 启用故障转移
}, sendFn);
```
## 后续版本
- Redis Cluster 支持
- 更多负载均衡策略(地理位置、延迟感知)

View File

@@ -147,6 +147,7 @@ service.on('chat', (data) => {
- [客户端使用](/modules/network/client/) - NetworkPlugin、组件和系统
- [服务器端](/modules/network/server/) - GameServer 和 Room 管理
- [分布式房间](/modules/network/distributed/) - 多服务器房间管理和玩家路由
- [状态同步](/modules/network/sync/) - 插值和快照缓冲
- [客户端预测](/modules/network/prediction/) - 输入预测和服务器校正
- [兴趣区域 (AOI)](/modules/network/aoi/) - 视野过滤和带宽优化

View File

@@ -280,6 +280,122 @@ class GameRoom extends Room {
}
```
## Schema 验证
使用内置的 Schema 验证系统进行运行时类型验证:
### 基础用法
```typescript
import { s, defineApiWithSchema } from '@esengine/server'
// 定义 Schema
const MoveSchema = s.object({
x: s.number(),
y: s.number(),
speed: s.number().optional()
})
// 类型自动推断
type Move = s.infer<typeof MoveSchema> // { x: number; y: number; speed?: number }
// 使用 Schema 定义 API自动验证
export default defineApiWithSchema(MoveSchema, {
handler(req, ctx) {
// req 已验证,类型安全
console.log(req.x, req.y)
}
})
```
### 验证器类型
| 类型 | 示例 | 描述 |
|------|------|------|
| `s.string()` | `s.string().min(1).max(50)` | 字符串,支持长度限制 |
| `s.number()` | `s.number().min(0).int()` | 数字,支持范围和整数限制 |
| `s.boolean()` | `s.boolean()` | 布尔值 |
| `s.literal()` | `s.literal('admin')` | 字面量类型 |
| `s.object()` | `s.object({ name: s.string() })` | 对象 |
| `s.array()` | `s.array(s.number())` | 数组 |
| `s.enum()` | `s.enum(['a', 'b'] as const)` | 枚举 |
| `s.union()` | `s.union([s.string(), s.number()])` | 联合类型 |
| `s.record()` | `s.record(s.any())` | 记录类型 |
### 修饰符
```typescript
// 可选字段
s.string().optional()
// 默认值
s.number().default(0)
// 可为 null
s.string().nullable()
// 字符串验证
s.string().min(1).max(100).email().url().regex(/^[a-z]+$/)
// 数字验证
s.number().min(0).max(100).int().positive()
// 数组验证
s.array(s.string()).min(1).max(10).nonempty()
// 对象验证
s.object({ ... }).strict() // 不允许额外字段
s.object({ ... }).partial() // 所有字段可选
s.object({ ... }).pick('name', 'age') // 选择字段
s.object({ ... }).omit('password') // 排除字段
```
### 消息验证
```typescript
import { s, defineMsgWithSchema } from '@esengine/server'
const InputSchema = s.object({
keys: s.array(s.string()),
timestamp: s.number()
})
export default defineMsgWithSchema(InputSchema, {
handler(msg, ctx) {
// msg 已验证
console.log(msg.keys, msg.timestamp)
}
})
```
### 手动验证
```typescript
import { s, parse, safeParse, createGuard } from '@esengine/server'
const UserSchema = s.object({
name: s.string(),
age: s.number().int().min(0)
})
// 抛出错误
const user = parse(UserSchema, data)
// 返回结果对象
const result = safeParse(UserSchema, data)
if (result.success) {
console.log(result.data)
} else {
console.error(result.error)
}
// 类型守卫
const isUser = createGuard(UserSchema)
if (isUser(data)) {
// data 是 User 类型
}
```
## 协议定义
`src/shared/protocol.ts` 中定义客户端和服务端共享的类型:

View File

@@ -32,6 +32,8 @@
"build": "tsup && tsc --emitDeclarationOnly",
"build:watch": "tsup --watch",
"type-check": "tsc --noEmit",
"lint": "eslint src --max-warnings 0",
"lint:fix": "eslint src --fix",
"clean": "rimraf dist"
},
"dependencies": {},

View File

@@ -11,11 +11,11 @@ import type {
ApiOutput,
MsgData,
Packet,
ConnectionStatus,
} from '../types'
import { RpcError, ErrorCode } from '../types'
import { json } from '../codec/json'
import type { Codec } from '../codec/types'
ConnectionStatus
} from '../types';
import { RpcError, ErrorCode } from '../types';
import { json } from '../codec/json';
import type { Codec } from '../codec/types';
// ============================================================================
// Re-exports | 类型重导出
@@ -29,9 +29,9 @@ export type {
ApiOutput,
MsgData,
ConnectionStatus,
Codec,
}
export { RpcError, ErrorCode }
Codec
};
export { RpcError, ErrorCode };
// ============================================================================
// Types | 类型定义
@@ -133,11 +133,11 @@ const PacketType = {
ApiResponse: 1,
ApiError: 2,
Message: 3,
Heartbeat: 9,
} as const
Heartbeat: 9
} as const;
const defaultWebSocketFactory: WebSocketFactory = (url) =>
new WebSocket(url) as unknown as WebSocketAdapter
new WebSocket(url) as unknown as WebSocketAdapter;
// ============================================================================
// RpcClient Class | RPC 客户端类
@@ -164,34 +164,34 @@ interface PendingCall {
* ```
*/
export class RpcClient<P extends ProtocolDef> {
private readonly _url: string
private readonly _codec: Codec
private readonly _timeout: number
private readonly _reconnectInterval: number
private readonly _wsFactory: WebSocketFactory
private readonly _options: RpcClientOptions
private readonly _url: string;
private readonly _codec: Codec;
private readonly _timeout: number;
private readonly _reconnectInterval: number;
private readonly _wsFactory: WebSocketFactory;
private readonly _options: RpcClientOptions;
private _ws: WebSocketAdapter | null = null
private _status: ConnectionStatus = 'closed'
private _callIdCounter = 0
private _shouldReconnect: boolean
private _reconnectTimer: ReturnType<typeof setTimeout> | null = null
private _ws: WebSocketAdapter | null = null;
private _status: ConnectionStatus = 'closed';
private _callIdCounter = 0;
private _shouldReconnect: boolean;
private _reconnectTimer: ReturnType<typeof setTimeout> | null = null;
private readonly _pendingCalls = new Map<number, PendingCall>()
private readonly _msgHandlers = new Map<string, Set<(data: unknown) => void>>()
private readonly _pendingCalls = new Map<number, PendingCall>();
private readonly _msgHandlers = new Map<string, Set<(data: unknown) => void>>();
constructor(
_protocol: P,
url: string,
options: RpcClientOptions = {}
) {
this._url = url
this._options = options
this._codec = options.codec ?? json()
this._timeout = options.timeout ?? 30000
this._shouldReconnect = options.autoReconnect ?? true
this._reconnectInterval = options.reconnectInterval ?? 3000
this._wsFactory = options.webSocketFactory ?? defaultWebSocketFactory
this._url = url;
this._options = options;
this._codec = options.codec ?? json();
this._timeout = options.timeout ?? 30000;
this._shouldReconnect = options.autoReconnect ?? true;
this._reconnectInterval = options.reconnectInterval ?? 3000;
this._wsFactory = options.webSocketFactory ?? defaultWebSocketFactory;
}
/**
@@ -199,7 +199,7 @@ export class RpcClient<P extends ProtocolDef> {
* @en Connection status
*/
get status(): ConnectionStatus {
return this._status
return this._status;
}
/**
@@ -207,7 +207,7 @@ export class RpcClient<P extends ProtocolDef> {
* @en Whether connected
*/
get isConnected(): boolean {
return this._status === 'open'
return this._status === 'open';
}
/**
@@ -217,38 +217,38 @@ export class RpcClient<P extends ProtocolDef> {
connect(): Promise<this> {
return new Promise((resolve, reject) => {
if (this._status === 'open' || this._status === 'connecting') {
resolve(this)
return
resolve(this);
return;
}
this._status = 'connecting'
this._ws = this._wsFactory(this._url)
this._status = 'connecting';
this._ws = this._wsFactory(this._url);
this._ws.onopen = () => {
this._status = 'open'
this._options.onConnect?.()
resolve(this)
}
this._status = 'open';
this._options.onConnect?.();
resolve(this);
};
this._ws.onclose = (e) => {
this._status = 'closed'
this._rejectAllPending()
this._options.onDisconnect?.(e.reason)
this._scheduleReconnect()
}
this._status = 'closed';
this._rejectAllPending();
this._options.onDisconnect?.(e.reason);
this._scheduleReconnect();
};
this._ws.onerror = () => {
const err = new Error('WebSocket error')
this._options.onError?.(err)
const err = new Error('WebSocket error');
this._options.onError?.(err);
if (this._status === 'connecting') {
reject(err)
reject(err);
}
}
};
this._ws.onmessage = (e) => {
this._handleMessage(e.data as string | ArrayBuffer)
}
})
this._handleMessage(e.data as string | ArrayBuffer);
};
});
}
/**
@@ -256,12 +256,12 @@ export class RpcClient<P extends ProtocolDef> {
* @en Disconnect
*/
disconnect(): void {
this._shouldReconnect = false
this._clearReconnectTimer()
this._shouldReconnect = false;
this._clearReconnectTimer();
if (this._ws) {
this._status = 'closing'
this._ws.close()
this._ws = null
this._status = 'closing';
this._ws.close();
this._ws = null;
}
}
@@ -275,25 +275,25 @@ export class RpcClient<P extends ProtocolDef> {
): Promise<ApiOutput<P['api'][K]>> {
return new Promise((resolve, reject) => {
if (this._status !== 'open') {
reject(new RpcError(ErrorCode.CONNECTION_CLOSED, 'Not connected'))
return
reject(new RpcError(ErrorCode.CONNECTION_CLOSED, 'Not connected'));
return;
}
const id = ++this._callIdCounter
const id = ++this._callIdCounter;
const timer = setTimeout(() => {
this._pendingCalls.delete(id)
reject(new RpcError(ErrorCode.TIMEOUT, 'Request timeout'))
}, this._timeout)
this._pendingCalls.delete(id);
reject(new RpcError(ErrorCode.TIMEOUT, 'Request timeout'));
}, this._timeout);
this._pendingCalls.set(id, {
resolve: resolve as (v: unknown) => void,
reject,
timer,
})
timer
});
const packet: Packet = [PacketType.ApiRequest, id, name as string, input]
this._ws!.send(this._codec.encode(packet) as string | ArrayBuffer)
})
const packet: Packet = [PacketType.ApiRequest, id, name as string, input];
this._ws!.send(this._codec.encode(packet) as string | ArrayBuffer);
});
}
/**
@@ -301,9 +301,9 @@ export class RpcClient<P extends ProtocolDef> {
* @en Send message
*/
send<K extends MsgNames<P>>(name: K, data: MsgData<P['msg'][K]>): void {
if (this._status !== 'open') return
const packet: Packet = [PacketType.Message, name as string, data]
this._ws!.send(this._codec.encode(packet) as string | ArrayBuffer)
if (this._status !== 'open') return;
const packet: Packet = [PacketType.Message, name as string, data];
this._ws!.send(this._codec.encode(packet) as string | ArrayBuffer);
}
/**
@@ -314,14 +314,14 @@ export class RpcClient<P extends ProtocolDef> {
name: K,
handler: (data: MsgData<P['msg'][K]>) => void
): this {
const key = name as string
let handlers = this._msgHandlers.get(key)
const key = name as string;
let handlers = this._msgHandlers.get(key);
if (!handlers) {
handlers = new Set()
this._msgHandlers.set(key, handlers)
handlers = new Set();
this._msgHandlers.set(key, handlers);
}
handlers.add(handler as (data: unknown) => void)
return this
handlers.add(handler as (data: unknown) => void);
return this;
}
/**
@@ -332,13 +332,13 @@ export class RpcClient<P extends ProtocolDef> {
name: K,
handler?: (data: MsgData<P['msg'][K]>) => void
): this {
const key = name as string
const key = name as string;
if (handler) {
this._msgHandlers.get(key)?.delete(handler as (data: unknown) => void)
this._msgHandlers.get(key)?.delete(handler as (data: unknown) => void);
} else {
this._msgHandlers.delete(key)
this._msgHandlers.delete(key);
}
return this
return this;
}
/**
@@ -350,10 +350,10 @@ export class RpcClient<P extends ProtocolDef> {
handler: (data: MsgData<P['msg'][K]>) => void
): this {
const wrapper = (data: MsgData<P['msg'][K]>) => {
this.off(name, wrapper)
handler(data)
}
return this.on(name, wrapper)
this.off(name, wrapper);
handler(data);
};
return this.on(name, wrapper);
}
// ========================================================================
@@ -362,52 +362,52 @@ export class RpcClient<P extends ProtocolDef> {
private _handleMessage(raw: string | ArrayBuffer): void {
try {
const data = typeof raw === 'string' ? raw : new Uint8Array(raw)
const packet = this._codec.decode(data)
const type = packet[0]
const data = typeof raw === 'string' ? raw : new Uint8Array(raw);
const packet = this._codec.decode(data);
const type = packet[0];
switch (type) {
case PacketType.ApiResponse:
this._handleApiResponse(packet as [number, number, unknown])
break
this._handleApiResponse(packet as [number, number, unknown]);
break;
case PacketType.ApiError:
this._handleApiError(packet as [number, number, string, string])
break
this._handleApiError(packet as [number, number, string, string]);
break;
case PacketType.Message:
this._handleMsg(packet as [number, string, unknown])
break
this._handleMsg(packet as [number, string, unknown]);
break;
}
} catch (err) {
this._options.onError?.(err as Error)
this._options.onError?.(err as Error);
}
}
private _handleApiResponse([, id, result]: [number, number, unknown]): void {
const pending = this._pendingCalls.get(id)
const pending = this._pendingCalls.get(id);
if (pending) {
clearTimeout(pending.timer)
this._pendingCalls.delete(id)
pending.resolve(result)
clearTimeout(pending.timer);
this._pendingCalls.delete(id);
pending.resolve(result);
}
}
private _handleApiError([, id, code, message]: [number, number, string, string]): void {
const pending = this._pendingCalls.get(id)
const pending = this._pendingCalls.get(id);
if (pending) {
clearTimeout(pending.timer)
this._pendingCalls.delete(id)
pending.reject(new RpcError(code, message))
clearTimeout(pending.timer);
this._pendingCalls.delete(id);
pending.reject(new RpcError(code, message));
}
}
private _handleMsg([, path, data]: [number, string, unknown]): void {
const handlers = this._msgHandlers.get(path)
const handlers = this._msgHandlers.get(path);
if (handlers) {
for (const handler of handlers) {
try {
handler(data)
handler(data);
} catch (err) {
this._options.onError?.(err as Error)
this._options.onError?.(err as Error);
}
}
}
@@ -415,25 +415,25 @@ export class RpcClient<P extends ProtocolDef> {
private _rejectAllPending(): void {
for (const [, pending] of this._pendingCalls) {
clearTimeout(pending.timer)
pending.reject(new RpcError(ErrorCode.CONNECTION_CLOSED, 'Connection closed'))
clearTimeout(pending.timer);
pending.reject(new RpcError(ErrorCode.CONNECTION_CLOSED, 'Connection closed'));
}
this._pendingCalls.clear()
this._pendingCalls.clear();
}
private _scheduleReconnect(): void {
if (this._shouldReconnect && !this._reconnectTimer) {
this._reconnectTimer = setTimeout(() => {
this._reconnectTimer = null
this.connect().catch(() => {})
}, this._reconnectInterval)
this._reconnectTimer = null;
this.connect().catch(() => {});
}, this._reconnectInterval);
}
}
private _clearReconnectTimer(): void {
if (this._reconnectTimer) {
clearTimeout(this._reconnectTimer)
this._reconnectTimer = null
clearTimeout(this._reconnectTimer);
this._reconnectTimer = null;
}
}
}
@@ -457,5 +457,5 @@ export function connect<P extends ProtocolDef>(
url: string,
options: RpcClientOptions = {}
): Promise<RpcClient<P>> {
return new RpcClient(protocol, url, options).connect()
return new RpcClient(protocol, url, options).connect();
}

View File

@@ -3,7 +3,7 @@
* @en Codec Module
*/
export type { Codec } from './types'
export { json } from './json'
export { msgpack } from './msgpack'
export { textEncode, textDecode } from './polyfill'
export type { Codec } from './types';
export { json } from './json';
export { msgpack } from './msgpack';
export { textEncode, textDecode } from './polyfill';

View File

@@ -3,9 +3,9 @@
* @en JSON Codec
*/
import type { Packet } from '../types'
import type { Codec } from './types'
import { textDecode } from './polyfill'
import type { Packet } from '../types';
import type { Codec } from './types';
import { textDecode } from './polyfill';
/**
* @zh 创建 JSON 编解码器
@@ -17,14 +17,14 @@ import { textDecode } from './polyfill'
export function json(): Codec {
return {
encode(packet: Packet): string {
return JSON.stringify(packet)
return JSON.stringify(packet);
},
decode(data: string | Uint8Array): Packet {
const str = typeof data === 'string'
? data
: textDecode(data)
return JSON.parse(str) as Packet
},
}
: textDecode(data);
return JSON.parse(str) as Packet;
}
};
}

View File

@@ -3,10 +3,10 @@
* @en MessagePack Codec
*/
import { Packr, Unpackr } from 'msgpackr'
import type { Packet } from '../types'
import type { Codec } from './types'
import { textEncode } from './polyfill'
import { Packr, Unpackr } from 'msgpackr';
import type { Packet } from '../types';
import type { Codec } from './types';
import { textEncode } from './polyfill';
/**
* @zh 创建 MessagePack 编解码器
@@ -16,19 +16,19 @@ import { textEncode } from './polyfill'
* @en Suitable for production, smaller size and faster speed
*/
export function msgpack(): Codec {
const encoder = new Packr({ structuredClone: true })
const decoder = new Unpackr({ structuredClone: true })
const encoder = new Packr({ structuredClone: true });
const decoder = new Unpackr({ structuredClone: true });
return {
encode(packet: Packet): Uint8Array {
return encoder.pack(packet)
return encoder.pack(packet);
},
decode(data: string | Uint8Array): Packet {
const buf = typeof data === 'string'
? textEncode(data)
: data
return decoder.unpack(buf) as Packet
},
}
: data;
return decoder.unpack(buf) as Packet;
}
};
}

View File

@@ -12,38 +12,38 @@
*/
function getTextEncoder(): { encode(str: string): Uint8Array } {
if (typeof TextEncoder !== 'undefined') {
return new TextEncoder()
return new TextEncoder();
}
return {
encode(str: string): Uint8Array {
const utf8: number[] = []
const utf8: number[] = [];
for (let i = 0; i < str.length; i++) {
let charCode = str.charCodeAt(i)
let charCode = str.charCodeAt(i);
if (charCode < 0x80) {
utf8.push(charCode)
utf8.push(charCode);
} else if (charCode < 0x800) {
utf8.push(0xc0 | (charCode >> 6), 0x80 | (charCode & 0x3f))
utf8.push(0xc0 | (charCode >> 6), 0x80 | (charCode & 0x3f));
} else if (charCode >= 0xd800 && charCode <= 0xdbff) {
i++
const low = str.charCodeAt(i)
charCode = 0x10000 + ((charCode - 0xd800) << 10) + (low - 0xdc00)
i++;
const low = str.charCodeAt(i);
charCode = 0x10000 + ((charCode - 0xd800) << 10) + (low - 0xdc00);
utf8.push(
0xf0 | (charCode >> 18),
0x80 | ((charCode >> 12) & 0x3f),
0x80 | ((charCode >> 6) & 0x3f),
0x80 | (charCode & 0x3f)
)
);
} else {
utf8.push(
0xe0 | (charCode >> 12),
0x80 | ((charCode >> 6) & 0x3f),
0x80 | (charCode & 0x3f)
)
);
}
}
return new Uint8Array(utf8)
},
}
return new Uint8Array(utf8);
}
};
}
/**
@@ -52,55 +52,55 @@ function getTextEncoder(): { encode(str: string): Uint8Array } {
*/
function getTextDecoder(): { decode(data: Uint8Array): string } {
if (typeof TextDecoder !== 'undefined') {
return new TextDecoder()
return new TextDecoder();
}
return {
decode(data: Uint8Array): string {
let str = ''
let i = 0
let str = '';
let i = 0;
while (i < data.length) {
const byte1 = data[i++]
const byte1 = data[i++];
if (byte1 < 0x80) {
str += String.fromCharCode(byte1)
str += String.fromCharCode(byte1);
} else if ((byte1 & 0xe0) === 0xc0) {
const byte2 = data[i++]
str += String.fromCharCode(((byte1 & 0x1f) << 6) | (byte2 & 0x3f))
const byte2 = data[i++];
str += String.fromCharCode(((byte1 & 0x1f) << 6) | (byte2 & 0x3f));
} else if ((byte1 & 0xf0) === 0xe0) {
const byte2 = data[i++]
const byte3 = data[i++]
const byte2 = data[i++];
const byte3 = data[i++];
str += String.fromCharCode(
((byte1 & 0x0f) << 12) | ((byte2 & 0x3f) << 6) | (byte3 & 0x3f)
)
);
} else if ((byte1 & 0xf8) === 0xf0) {
const byte2 = data[i++]
const byte3 = data[i++]
const byte4 = data[i++]
const byte2 = data[i++];
const byte3 = data[i++];
const byte4 = data[i++];
const codePoint =
((byte1 & 0x07) << 18) |
((byte2 & 0x3f) << 12) |
((byte3 & 0x3f) << 6) |
(byte4 & 0x3f)
const offset = codePoint - 0x10000
(byte4 & 0x3f);
const offset = codePoint - 0x10000;
str += String.fromCharCode(
0xd800 + (offset >> 10),
0xdc00 + (offset & 0x3ff)
)
);
}
}
return str
},
}
return str;
}
};
}
const encoder = getTextEncoder()
const decoder = getTextDecoder()
const encoder = getTextEncoder();
const decoder = getTextDecoder();
/**
* @zh 将字符串编码为 UTF-8 字节数组
* @en Encode string to UTF-8 byte array
*/
export function textEncode(str: string): Uint8Array {
return encoder.encode(str)
return encoder.encode(str);
}
/**
@@ -108,5 +108,5 @@ export function textEncode(str: string): Uint8Array {
* @en Decode UTF-8 byte array to string
*/
export function textDecode(data: Uint8Array): string {
return decoder.decode(data)
return decoder.decode(data);
}

View File

@@ -3,7 +3,7 @@
* @en Codec Type Definitions
*/
import type { Packet } from '../types'
import type { Packet } from '../types';
/**
* @zh 编解码器接口

View File

@@ -3,7 +3,7 @@
* @en Protocol Definition Module
*/
import type { ApiDef, MsgDef, ProtocolDef } from './types'
import type { ApiDef, MsgDef, ProtocolDef } from './types';
/**
* @zh 创建 API 定义
@@ -15,7 +15,7 @@ import type { ApiDef, MsgDef, ProtocolDef } from './types'
* ```
*/
function api<TInput = void, TOutput = void>(): ApiDef<TInput, TOutput> {
return { _type: 'api' } as ApiDef<TInput, TOutput>
return { _type: 'api' } as ApiDef<TInput, TOutput>;
}
/**
@@ -28,7 +28,7 @@ function api<TInput = void, TOutput = void>(): ApiDef<TInput, TOutput> {
* ```
*/
function msg<TData = void>(): MsgDef<TData> {
return { _type: 'msg' } as MsgDef<TData>
return { _type: 'msg' } as MsgDef<TData>;
}
/**
@@ -49,7 +49,7 @@ function msg<TData = void>(): MsgDef<TData> {
* ```
*/
function define<T extends ProtocolDef>(protocol: T): T {
return protocol
return protocol;
}
/**
@@ -59,5 +59,5 @@ function define<T extends ProtocolDef>(protocol: T): T {
export const rpc = {
define,
api,
msg,
} as const
msg
} as const;

View File

@@ -38,9 +38,9 @@
* ```
*/
export { rpc } from './define'
export * from './types'
export { rpc } from './define';
export * from './types';
// Re-export client for browser/bundler compatibility
export { RpcClient, connect } from './client/index'
export type { RpcClientOptions, WebSocketAdapter, WebSocketFactory } from './client/index'
export { RpcClient, connect } from './client/index';
export type { RpcClientOptions, WebSocketAdapter, WebSocketFactory } from './client/index';

View File

@@ -3,37 +3,38 @@
* @en Server Connection Module
*/
import type { Connection, ConnectionStatus } from '../types'
import type { WebSocket } from 'ws';
import type { Connection, ConnectionStatus } from '../types';
/**
* @zh 服务端连接实现
* @en Server connection implementation
*/
export class ServerConnection<TData = unknown> implements Connection<TData> {
readonly id: string
readonly ip: string
data: TData
readonly id: string;
readonly ip: string;
data: TData;
private _status: ConnectionStatus = 'open'
private _socket: any
private _onClose?: () => void
private _status: ConnectionStatus = 'open';
private _socket: WebSocket;
private _onClose?: () => void;
constructor(options: {
id: string
ip: string
socket: any
socket: WebSocket
initialData: TData
onClose?: () => void
}) {
this.id = options.id
this.ip = options.ip
this.data = options.initialData
this._socket = options.socket
this._onClose = options.onClose
this.id = options.id;
this.ip = options.ip;
this.data = options.initialData;
this._socket = options.socket;
this._onClose = options.onClose;
}
get status(): ConnectionStatus {
return this._status
return this._status;
}
/**
@@ -41,8 +42,20 @@ export class ServerConnection<TData = unknown> implements Connection<TData> {
* @en Send raw data
*/
send(data: string | Uint8Array): void {
if (this._status !== 'open') return
this._socket.send(data)
if (this._status !== 'open') return;
this._socket.send(data);
}
/**
* @zh 发送二进制数据(原生 WebSocket 二进制帧)
* @en Send binary data (native WebSocket binary frame)
*
* @zh 直接发送 Uint8Array不经过 JSON 编码,效率更高
* @en Directly sends Uint8Array without JSON encoding, more efficient
*/
sendBinary(data: Uint8Array): void {
if (this._status !== 'open') return;
this._socket.send(data, { binary: true });
}
/**
@@ -50,12 +63,12 @@ export class ServerConnection<TData = unknown> implements Connection<TData> {
* @en Close connection
*/
close(reason?: string): void {
if (this._status !== 'open') return
if (this._status !== 'open') return;
this._status = 'closing'
this._socket.close(1000, reason)
this._status = 'closed'
this._onClose?.()
this._status = 'closing';
this._socket.close(1000, reason);
this._status = 'closed';
this._onClose?.();
}
/**
@@ -63,6 +76,6 @@ export class ServerConnection<TData = unknown> implements Connection<TData> {
* @en Mark connection as closed (internal use)
*/
_markClosed(): void {
this._status = 'closed'
this._status = 'closed';
}
}

View File

@@ -3,8 +3,8 @@
* @en RPC Server Module
*/
import { WebSocketServer, WebSocket } from 'ws'
import type { Server as HttpServer } from 'node:http'
import { WebSocketServer, WebSocket } from 'ws';
import type { Server as HttpServer } from 'node:http';
import type {
ProtocolDef,
ApiNames,
@@ -13,13 +13,13 @@ import type {
ApiOutput,
MsgData,
Packet,
PacketType,
Connection,
} from '../types'
import { RpcError, ErrorCode } from '../types'
import { json } from '../codec/json'
import type { Codec } from '../codec/types'
import { ServerConnection } from './connection'
Connection
} from '../types';
import type { IncomingMessage } from 'node:http';
import { RpcError, ErrorCode } from '../types';
import { json } from '../codec/json';
import type { Codec } from '../codec/types';
import { ServerConnection } from './connection';
// ============ Types ============
@@ -182,8 +182,8 @@ const PT = {
ApiResponse: 1,
ApiError: 2,
Message: 3,
Heartbeat: 9,
} as const
Heartbeat: 9
} as const;
/**
* @zh 创建 RPC 服务器
@@ -206,16 +206,22 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
_protocol: P,
options: ServeOptions<P, TConnData>
): RpcServer<P, TConnData> {
const codec = options.codec ?? json()
const connections: ServerConnection<TConnData>[] = []
let wss: WebSocketServer | null = null
let connIdCounter = 0
const codec = options.codec ?? json();
const connections: ServerConnection<TConnData>[] = [];
let wss: WebSocketServer | null = null;
let connIdCounter = 0;
const getClientIp = (ws: WebSocket, req: any): string => {
return req?.headers?.['x-forwarded-for']?.split(',')[0]?.trim()
const getClientIp = (_ws: WebSocket, req: IncomingMessage | undefined): string => {
const forwarded = req?.headers?.['x-forwarded-for'];
const forwardedIp = typeof forwarded === 'string'
? forwarded.split(',')[0]?.trim()
: Array.isArray(forwarded)
? forwarded[0]?.split(',')[0]?.trim()
: undefined;
return forwardedIp
|| req?.socket?.remoteAddress
|| 'unknown'
}
|| 'unknown';
};
const handleMessage = async (
conn: ServerConnection<TConnData>,
@@ -224,23 +230,23 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
try {
const packet = codec.decode(
typeof data === 'string' ? data : new Uint8Array(data)
)
);
const type = packet[0]
const type = packet[0];
if (type === PT.ApiRequest) {
const [, id, path, input] = packet as [number, number, string, unknown]
await handleApiRequest(conn, id, path, input)
const [, id, path, input] = packet as [number, number, string, unknown];
await handleApiRequest(conn, id, path, input);
} else if (type === PT.Message) {
const [, path, msgData] = packet as [number, string, unknown]
await handleMsg(conn, path, msgData)
const [, path, msgData] = packet as [number, string, unknown];
await handleMsg(conn, path, msgData);
} else if (type === PT.Heartbeat) {
conn.send(codec.encode([PT.Heartbeat]))
conn.send(codec.encode([PT.Heartbeat]));
}
} catch (err) {
options.onError?.(err as Error, conn)
options.onError?.(err as Error, conn);
}
}
};
const handleApiRequest = async (
conn: ServerConnection<TConnData>,
@@ -248,44 +254,46 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
path: string,
input: unknown
): Promise<void> => {
const handler = (options.api as any)[path]
const apiHandlers = options.api as Record<string, ApiHandler<unknown, unknown, TConnData> | undefined>;
const handler = apiHandlers[path];
if (!handler) {
const errPacket: Packet = [PT.ApiError, id, ErrorCode.NOT_FOUND, `API not found: ${path}`]
conn.send(codec.encode(errPacket))
return
const errPacket: Packet = [PT.ApiError, id, ErrorCode.NOT_FOUND, `API not found: ${path}`];
conn.send(codec.encode(errPacket));
return;
}
try {
const result = await handler(input, conn)
const resPacket: Packet = [PT.ApiResponse, id, result]
conn.send(codec.encode(resPacket))
const result = await handler(input, conn);
const resPacket: Packet = [PT.ApiResponse, id, result];
conn.send(codec.encode(resPacket));
} catch (err) {
if (err instanceof RpcError) {
const errPacket: Packet = [PT.ApiError, id, err.code, err.message]
conn.send(codec.encode(errPacket))
const errPacket: Packet = [PT.ApiError, id, err.code, err.message];
conn.send(codec.encode(errPacket));
} else {
const errPacket: Packet = [PT.ApiError, id, ErrorCode.INTERNAL_ERROR, 'Internal server error']
conn.send(codec.encode(errPacket))
options.onError?.(err as Error, conn)
const errPacket: Packet = [PT.ApiError, id, ErrorCode.INTERNAL_ERROR, 'Internal server error'];
conn.send(codec.encode(errPacket));
options.onError?.(err as Error, conn);
}
}
}
};
const handleMsg = async (
conn: ServerConnection<TConnData>,
path: string,
data: unknown
): Promise<void> => {
const handler = options.msg?.[path as MsgNames<P>]
const msgHandlers = options.msg as Record<string, MsgHandler<unknown, TConnData> | undefined> | undefined;
const handler = msgHandlers?.[path];
if (handler) {
await (handler as any)(data, conn)
await handler(data, conn);
}
}
};
const server: RpcServer<P, TConnData> = {
get connections() {
return connections as ReadonlyArray<Connection<TConnData>>
return connections as ReadonlyArray<Connection<TConnData>>;
},
async start() {
@@ -293,18 +301,18 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
// 根据配置创建 WebSocketServer
if (options.server) {
// 附加到已有的 HTTP 服务器
wss = new WebSocketServer({ server: options.server })
wss = new WebSocketServer({ server: options.server });
} else if (options.port) {
// 独立创建
wss = new WebSocketServer({ port: options.port })
wss = new WebSocketServer({ port: options.port });
} else {
throw new Error('Either port or server must be provided')
throw new Error('Either port or server must be provided');
}
wss.on('connection', async (ws, req) => {
const id = String(++connIdCounter)
const ip = getClientIp(ws, req)
const initialData = options.createConnData?.() ?? ({} as TConnData)
const id = String(++connIdCounter);
const ip = getClientIp(ws, req);
const initialData = options.createConnData?.() ?? ({} as TConnData);
const conn = new ServerConnection<TConnData>({
id,
@@ -312,70 +320,70 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
socket: ws,
initialData,
onClose: () => {
const idx = connections.indexOf(conn)
if (idx !== -1) connections.splice(idx, 1)
},
})
const idx = connections.indexOf(conn);
if (idx !== -1) connections.splice(idx, 1);
}
});
connections.push(conn)
connections.push(conn);
ws.on('message', (data) => {
handleMessage(conn, data as string | Buffer)
})
handleMessage(conn, data as string | Buffer);
});
ws.on('close', async (code, reason) => {
conn._markClosed()
const idx = connections.indexOf(conn)
if (idx !== -1) connections.splice(idx, 1)
await options.onDisconnect?.(conn, reason?.toString())
})
conn._markClosed();
const idx = connections.indexOf(conn);
if (idx !== -1) connections.splice(idx, 1);
await options.onDisconnect?.(conn, reason?.toString());
});
ws.on('error', (err) => {
options.onError?.(err, conn)
})
options.onError?.(err, conn);
});
await options.onConnect?.(conn)
})
await options.onConnect?.(conn);
});
// 如果使用已有的 HTTP 服务器WebSocketServer 不会触发 listening 事件
if (options.server) {
options.onStart?.(0) // 端口由 HTTP 服务器管理
resolve()
options.onStart?.(0); // 端口由 HTTP 服务器管理
resolve();
} else {
wss.on('listening', () => {
options.onStart?.(options.port!)
resolve()
})
options.onStart?.(options.port!);
resolve();
});
}
})
});
},
async stop() {
return new Promise((resolve, reject) => {
if (!wss) {
resolve()
return
resolve();
return;
}
for (const conn of connections) {
conn.close('Server shutting down')
conn.close('Server shutting down');
}
wss.close((err) => {
if (err) reject(err)
else resolve()
})
})
if (err) reject(err);
else resolve();
});
});
},
send(conn, name, data) {
const packet: Packet = [PT.Message, name as string, data]
;(conn as ServerConnection<TConnData>).send(codec.encode(packet))
;(conn as ServerConnection<TConnData>).send(codec.encode(packet));
},
broadcast(name, data, opts) {
const packet: Packet = [PT.Message, name as string, data]
const encoded = codec.encode(packet)
const packet: Packet = [PT.Message, name as string, data];
const encoded = codec.encode(packet);
const excludeSet = new Set(
Array.isArray(opts?.exclude)
@@ -383,15 +391,15 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
: opts?.exclude
? [opts.exclude]
: []
)
);
for (const conn of connections) {
if (!excludeSet.has(conn)) {
conn.send(encoded)
conn.send(encoded);
}
}
},
}
}
};
return server
return server;
}

View File

@@ -29,8 +29,8 @@ export interface MsgDef<TData = unknown> {
* @en Protocol definition
*/
export interface ProtocolDef {
readonly api: Record<string, ApiDef<any, any>>
readonly msg: Record<string, MsgDef<any>>
readonly api: Record<string, ApiDef<unknown, unknown>>
readonly msg: Record<string, MsgDef<unknown>>
}
// ============ Type Inference ============
@@ -39,13 +39,13 @@ export interface ProtocolDef {
* @zh 提取 API 输入类型
* @en Extract API input type
*/
export type ApiInput<T> = T extends ApiDef<infer I, any> ? I : never
export type ApiInput<T> = T extends ApiDef<infer I, unknown> ? I : never
/**
* @zh 提取 API 输出类型
* @en Extract API output type
*/
export type ApiOutput<T> = T extends ApiDef<any, infer O> ? O : never
export type ApiOutput<T> = T extends ApiDef<unknown, infer O> ? O : never
/**
* @zh 提取消息数据类型
@@ -120,8 +120,9 @@ export const PacketType = {
ApiResponse: 1,
ApiError: 2,
Message: 3,
Heartbeat: 9,
} as const
Binary: 4,
Heartbeat: 9
} as const;
export type PacketType = typeof PacketType[keyof typeof PacketType]
@@ -173,6 +174,19 @@ export type MessagePacket = [
*/
export type HeartbeatPacket = [type: typeof PacketType.Heartbeat]
/**
* @zh 二进制数据包
* @en Binary data packet
*
* @zh 用于传输原始二进制数据,如 ECS 状态同步
* @en Used for raw binary data transmission, such as ECS state sync
*/
export type BinaryPacket = [
type: typeof PacketType.Binary,
channel: number,
data: Uint8Array
]
/**
* @zh 所有数据包类型
* @en All packet types
@@ -182,6 +196,7 @@ export type Packet =
| ApiResponsePacket
| ApiErrorPacket
| MessagePacket
| BinaryPacket
| HeartbeatPacket
// ============ Error Types ============
@@ -196,8 +211,8 @@ export class RpcError extends Error {
message: string,
public readonly details?: unknown
) {
super(message)
this.name = 'RpcError'
super(message);
this.name = 'RpcError';
}
}
@@ -211,7 +226,7 @@ export const ErrorCode = {
UNAUTHORIZED: 'UNAUTHORIZED',
INTERNAL_ERROR: 'INTERNAL_ERROR',
TIMEOUT: 'TIMEOUT',
CONNECTION_CLOSED: 'CONNECTION_CLOSED',
} as const
CONNECTION_CLOSED: 'CONNECTION_CLOSED'
} as const;
export type ErrorCode = typeof ErrorCode[keyof typeof ErrorCode]

View File

@@ -1,5 +1,44 @@
# @esengine/server
## 4.5.0
### Minor Changes
- [#421](https://github.com/esengine/esengine/pull/421) [`f333b81`](https://github.com/esengine/esengine/commit/f333b81298a386a812b2428d3dcdce03d257fef8) Thanks [@esengine](https://github.com/esengine)! - feat(server): 添加分布式房间支持 | Add distributed room support
**@esengine/server** - 新增分布式房间管理功能 | Added distributed room management features
- 新增 `DistributedRoomManager` 支持多服务器房间管理 | Added `DistributedRoomManager` for multi-server room management
- 新增 `MemoryAdapter` 用于测试和单机模式 | Added `MemoryAdapter` for testing and standalone mode
- 新增 `RedisAdapter` 用于生产环境多服务器部署 | Added `RedisAdapter` for production multi-server deployments
- 新增 `LoadBalancedRouter` 支持 5 种负载均衡策略 | Added `LoadBalancedRouter` with 5 load balancing strategies
- round-robin: 轮询 | Round robin
- least-rooms: 最少房间数 | Fewest rooms
- least-players: 最少玩家数 | Fewest players
- random: 随机选择 | Random selection
- weighted: 权重(基于容量使用率)| Weighted by capacity usage
- `createServer` 新增 `distributed` 配置选项 | Added `distributed` config option to `createServer`
- 新增 `$redirect` 消息用于跨服务器玩家重定向 | Added `$redirect` message for cross-server player redirection
- 新增故障转移机制,服务器离线时自动恢复房间 | Added failover mechanism for automatic room recovery on server offline
- 新增 `room:migrated``server:draining` 事件类型 | Added `room:migrated` and `server:draining` event types
## 4.4.0
### Minor Changes
- [#419](https://github.com/esengine/esengine/pull/419) [`3b6fc82`](https://github.com/esengine/esengine/commit/3b6fc8266fa8e4d43058a44b48bf9169f78de068) Thanks [@esengine](https://github.com/esengine)! - feat(server): HTTP 路由增强 | HTTP router enhancement
**新功能 | New Features**
- 路由参数支持:`/users/:id``req.params.id` | Route parameters: `/users/:id``req.params.id`
- 中间件支持:全局和路由级中间件 | Middleware support: global and route-level
- 请求超时控制:全局和路由级超时 | Request timeout: global and route-level
**内置中间件 | Built-in Middleware**
- `requestLogger()` - 请求日志 | Request logging
- `bodyLimit()` - 请求体大小限制 | Body size limit
- `responseTime()` - 响应时间头 | Response time header
- `requestId()` - 请求 ID | Request ID
- `securityHeaders()` - 安全头 | Security headers
## 4.3.0
### Minor Changes

View File

@@ -1,6 +1,6 @@
{
"name": "@esengine/server",
"version": "4.3.0",
"version": "4.5.0",
"description": "Game server framework for ESEngine with file-based routing",
"type": "module",
"main": "./dist/index.js",

View File

@@ -19,15 +19,18 @@ import type {
LoadedHttpHandler
} from '../types/index.js';
import type { HttpRoutes, HttpHandler } from '../http/types.js';
import type { Validator } from '../schema/index.js';
import { loadApiHandlers, loadMsgHandlers, loadHttpHandlers } from '../router/loader.js';
import { RoomManager, type RoomClass, type Room } from '../room/index.js';
import { createHttpRouter } from '../http/router.js';
import { DistributedRoomManager } from '../distributed/DistributedRoomManager.js';
import { MemoryAdapter } from '../distributed/adapters/MemoryAdapter.js';
/**
* @zh 默认配置
* @en Default configuration
*/
const DEFAULT_CONFIG: Required<Omit<ServerConfig, 'onStart' | 'onConnect' | 'onDisconnect' | 'http' | 'cors' | 'httpDir' | 'httpPrefix'>> & { httpDir: string; httpPrefix: string } = {
const DEFAULT_CONFIG: Required<Omit<ServerConfig, 'onStart' | 'onConnect' | 'onDisconnect' | 'http' | 'cors' | 'httpDir' | 'httpPrefix' | 'distributed'>> & { httpDir: string; httpPrefix: string } = {
port: 3000,
apiDir: 'src/api',
msgDir: 'src/msg',
@@ -112,6 +115,10 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
const hasHttpRoutes = Object.keys(mergedHttpRoutes).length > 0;
// 分布式模式配置
const distributedConfig = config.distributed;
const isDistributed = distributedConfig?.enabled ?? false;
// 动态构建协议
const apiDefs: Record<string, ReturnType<typeof rpc.api>> = {
// 内置 API
@@ -120,7 +127,9 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
};
const msgDefs: Record<string, ReturnType<typeof rpc.msg>> = {
// 内置消息(房间消息透传)
RoomMessage: rpc.msg()
RoomMessage: rpc.msg(),
// 分布式重定向消息
$redirect: rpc.msg()
};
for (const handler of apiHandlers) {
@@ -141,10 +150,45 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
let rpcServer: RpcServer<typeof protocol, Record<string, unknown>> | null = null;
let httpServer: HttpServer | null = null;
// 房间管理器(立即初始化,以便 define() 可在 start() 前调用
const roomManager = new RoomManager((conn, type, data) => {
// 发送函数(延迟绑定,因为 rpcServer 在 start() 后才创建
const sendFn = (conn: any, type: string, data: unknown) => {
rpcServer?.send(conn, 'RoomMessage' as any, { type, data } as any);
});
};
// 二进制发送函数(使用原生 WebSocket 二进制帧,效率更高)
const sendBinaryFn = (conn: any, data: Uint8Array) => {
if (conn && typeof conn.sendBinary === 'function') {
conn.sendBinary(data);
}
};
// 房间管理器(立即初始化,以便 define() 可在 start() 前调用)
let roomManager: RoomManager | DistributedRoomManager;
let distributedManager: DistributedRoomManager | null = null;
if (isDistributed && distributedConfig) {
// 分布式模式
const adapter = distributedConfig.adapter ?? new MemoryAdapter();
distributedManager = new DistributedRoomManager(
adapter,
{
serverId: distributedConfig.serverId,
serverAddress: distributedConfig.serverAddress,
serverPort: distributedConfig.serverPort ?? opts.port,
heartbeatInterval: distributedConfig.heartbeatInterval,
snapshotInterval: distributedConfig.snapshotInterval,
enableFailover: distributedConfig.enableFailover,
capacity: distributedConfig.capacity
},
sendFn,
sendBinaryFn
);
roomManager = distributedManager;
logger.info(`Distributed mode enabled (serverId: ${distributedConfig.serverId})`);
} else {
// 单机模式
roomManager = new RoomManager(sendFn, sendBinaryFn);
}
// 构建 API 处理器映射
const apiMap: Record<string, LoadedApiHandler> = {};
@@ -203,6 +247,29 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
}
if (roomType) {
// 分布式模式:使用 joinOrCreateDistributed
if (distributedManager) {
const result = await distributedManager.joinOrCreateDistributed(
roomType,
conn.id,
conn,
options
);
if (!result) {
throw new Error('Failed to join or create room');
}
if ('redirect' in result) {
// 发送重定向消息给客户端
rpcServer?.send(conn, '$redirect' as any, {
address: result.redirect,
roomType
} as any);
return { redirect: result.redirect };
}
return { roomId: result.room.id, playerId: result.player.id };
}
// 单机模式
const result = await roomManager.joinOrCreate(roomType, conn.id, conn, options);
if (!result) {
throw new Error('Failed to join or create room');
@@ -226,6 +293,19 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
conn: conn as ServerConnection,
server: gameServer
};
const definition = handler.definition as { schema?: Validator<unknown> };
if (definition.schema) {
const result = definition.schema.validate(input);
if (!result.success) {
const pathStr = result.error.path.length > 0
? ` at "${result.error.path.join('.')}"`
: '';
throw new Error(`Validation failed${pathStr}: ${result.error.message}`);
}
return handler.definition.handler(result.data, ctx);
}
return handler.definition.handler(input, ctx);
};
}
@@ -246,6 +326,21 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
conn: conn as ServerConnection,
server: gameServer
};
const definition = handler.definition as { schema?: Validator<unknown> };
if (definition.schema) {
const result = definition.schema.validate(data);
if (!result.success) {
const pathStr = result.error.path.length > 0
? ` at "${result.error.path.join('.')}"`
: '';
logger.warn(`Message validation failed for ${name}${pathStr}: ${result.error.message}`);
return;
}
await handler.definition.handler(result.data, ctx);
return;
}
await handler.definition.handler(data, ctx);
};
}
@@ -315,6 +410,11 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
await rpcServer.start();
}
// 启动分布式管理器
if (distributedManager) {
await distributedManager.start();
}
// 启动 tick 循环
if (opts.tickRate > 0) {
tickInterval = setInterval(() => {
@@ -328,6 +428,12 @@ export async function createServer(config: ServerConfig = {}): Promise<GameServe
clearInterval(tickInterval);
tickInterval = null;
}
// 停止分布式管理器(优雅关闭)
if (distributedManager) {
await distributedManager.stop(true);
}
if (rpcServer) {
await rpcServer.stop();
rpcServer = null;

View File

@@ -0,0 +1,707 @@
/**
* @zh 分布式房间管理器
* @en Distributed room manager
*
* @zh 继承 RoomManager添加分布式功能支持。包括跨服务器房间注册、
* 玩家路由、状态同步和故障转移。
* @en Extends RoomManager with distributed features. Includes cross-server room
* registration, player routing, state synchronization, and failover.
*/
import { RoomManager } from '../room/RoomManager.js';
import { Room, type RoomOptions } from '../room/Room.js';
import type { Player } from '../room/Player.js';
import type { IDistributedAdapter } from './adapters/IDistributedAdapter.js';
import type {
DistributedRoomManagerConfig,
RoomRegistration,
RoutingResult,
RoutingRequest,
ServerRegistration,
DistributedEvent,
Unsubscribe
} from './types.js';
import { createLogger } from '../logger.js';
const logger = createLogger('DistributedRoom');
/**
* @zh 分布式房间管理器配置(内部使用)
* @en Distributed room manager configuration (internal use)
*/
interface InternalConfig extends Required<Omit<DistributedRoomManagerConfig, 'metadata'>> {
metadata: Record<string, unknown>;
}
/**
* @zh 分布式房间管理器
* @en Distributed room manager
*
* @zh 扩展基础 RoomManager添加以下功能
* - 服务器注册和心跳
* - 跨服务器房间注册
* - 玩家路由和重定向
* - 状态快照和恢复
* - 分布式锁防止竞态
* @en Extends base RoomManager with:
* - Server registration and heartbeat
* - Cross-server room registration
* - Player routing and redirection
* - State snapshots and recovery
* - Distributed locks to prevent race conditions
*/
export class DistributedRoomManager extends RoomManager {
private readonly _adapter: IDistributedAdapter;
private readonly _config: InternalConfig;
private readonly _serverId: string;
private _heartbeatTimer: ReturnType<typeof setInterval> | null = null;
private _snapshotTimer: ReturnType<typeof setInterval> | null = null;
private _subscriptions: Unsubscribe[] = [];
private _isShuttingDown = false;
/**
* @zh 创建分布式房间管理器
* @en Create distributed room manager
*
* @param adapter - 分布式适配器 | Distributed adapter
* @param config - 配置 | Configuration
* @param sendFn - 消息发送函数 | Message send function
* @param sendBinaryFn - 二进制发送函数 | Binary send function
*/
constructor(
adapter: IDistributedAdapter,
config: DistributedRoomManagerConfig,
sendFn: (conn: any, type: string, data: unknown) => void,
sendBinaryFn?: (conn: any, data: Uint8Array) => void
) {
super(sendFn, sendBinaryFn);
this._adapter = adapter;
this._serverId = config.serverId;
this._config = {
serverId: config.serverId,
serverAddress: config.serverAddress,
serverPort: config.serverPort,
heartbeatInterval: config.heartbeatInterval ?? 5000,
snapshotInterval: config.snapshotInterval ?? 30000,
migrationTimeout: config.migrationTimeout ?? 10000,
enableFailover: config.enableFailover ?? true,
capacity: config.capacity ?? 100,
metadata: config.metadata ?? {}
};
}
/**
* @zh 获取服务器 ID
* @en Get server ID
*/
get serverId(): string {
return this._serverId;
}
/**
* @zh 获取分布式适配器
* @en Get distributed adapter
*/
get adapter(): IDistributedAdapter {
return this._adapter;
}
/**
* @zh 获取配置
* @en Get configuration
*/
get config(): Readonly<InternalConfig> {
return this._config;
}
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
/**
* @zh 启动分布式房间管理器
* @en Start distributed room manager
*/
async start(): Promise<void> {
if (!this._adapter.isConnected()) {
await this._adapter.connect();
}
// 注册服务器 | Register server
await this._registerServer();
// 订阅事件 | Subscribe to events
await this._subscribeToEvents();
// 启动心跳 | Start heartbeat
this._startHeartbeat();
// 启动快照(如果启用)| Start snapshots (if enabled)
if (this._config.snapshotInterval > 0) {
this._startSnapshotTimer();
}
logger.info(`Distributed room manager started: ${this._serverId}`);
}
/**
* @zh 停止分布式房间管理器
* @en Stop distributed room manager
*
* @param graceful - 是否优雅关闭(等待玩家退出)| Whether to gracefully shutdown (wait for players)
*/
async stop(graceful = true): Promise<void> {
this._isShuttingDown = true;
// 停止定时器 | Stop timers
if (this._heartbeatTimer) {
clearInterval(this._heartbeatTimer);
this._heartbeatTimer = null;
}
if (this._snapshotTimer) {
clearInterval(this._snapshotTimer);
this._snapshotTimer = null;
}
// 取消订阅 | Unsubscribe
for (const unsub of this._subscriptions) {
unsub();
}
this._subscriptions = [];
if (graceful) {
// 标记为 draining停止接收新玩家 | Mark as draining, stop accepting new players
await this._adapter.updateServer(this._serverId, { status: 'draining' });
// 保存所有房间状态快照 | Save all room state snapshots
await this._saveAllSnapshots();
}
// 注销服务器 | Unregister server
await this._adapter.unregisterServer(this._serverId);
logger.info(`Distributed room manager stopped: ${this._serverId}`);
}
// =========================================================================
// 房间操作覆盖 | Room Operation Overrides
// =========================================================================
/**
* @zh 房间创建后注册到分布式系统
* @en Register room to distributed system after creation
*/
protected override async _onRoomCreated(name: string, room: Room): Promise<void> {
const registration: RoomRegistration = {
roomId: room.id,
roomType: name,
serverId: this._serverId,
serverAddress: `${this._config.serverAddress}:${this._config.serverPort}`,
playerCount: room.players.length,
maxPlayers: room.maxPlayers,
isLocked: room.isLocked,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
};
await this._adapter.registerRoom(registration);
logger.debug(`Registered room: ${room.id}`);
}
/**
* @zh 房间销毁时从分布式系统注销
* @en Unregister room from distributed system when disposed
*/
protected override _onRoomDisposed(roomId: string): void {
super._onRoomDisposed(roomId);
// 异步注销房间 | Async unregister room
this._adapter.unregisterRoom(roomId).catch(err => {
logger.error(`Failed to unregister room ${roomId}:`, err);
});
// 删除快照 | Delete snapshot
this._adapter.deleteSnapshot(roomId).catch(err => {
logger.error(`Failed to delete snapshot for ${roomId}:`, err);
});
}
/**
* @zh 玩家加入后更新分布式房间信息
* @en Update distributed room info after player joins
*/
protected override _onPlayerJoined(playerId: string, roomId: string, player: Player): void {
super._onPlayerJoined(playerId, roomId, player);
const room = this._rooms.get(roomId);
if (room) {
this._adapter.updateRoom(roomId, {
playerCount: room.players.length,
updatedAt: Date.now()
}).catch(err => {
logger.error(`Failed to update room ${roomId}:`, err);
});
}
}
/**
* @zh 玩家离开后更新分布式房间信息
* @en Update distributed room info after player leaves
*/
protected override _onPlayerLeft(playerId: string, roomId: string): void {
super._onPlayerLeft(playerId, roomId);
const room = this._rooms.get(roomId);
if (room) {
this._adapter.updateRoom(roomId, {
playerCount: room.players.length,
updatedAt: Date.now()
}).catch(err => {
logger.error(`Failed to update room ${roomId}:`, err);
});
}
}
// =========================================================================
// 分布式路由 | Distributed Routing
// =========================================================================
/**
* @zh 路由玩家到合适的房间/服务器
* @en Route player to appropriate room/server
*
* @param request - 路由请求 | Routing request
* @returns 路由结果 | Routing result
*/
async route(request: RoutingRequest): Promise<RoutingResult> {
// 如果指定了房间 ID直接查找 | If room ID specified, look it up directly
if (request.roomId) {
return this._routeToRoom(request.roomId);
}
// 按类型查找可用房间 | Find available room by type
if (request.roomType) {
return this._routeByType(request.roomType, request.query);
}
return { type: 'unavailable', reason: 'No room type or room ID specified' };
}
/**
* @zh 加入或创建房间(分布式版本)
* @en Join or create room (distributed version)
*
* @zh 此方法会:
* 1. 先在分布式注册表中查找可用房间
* 2. 如果找到其他服务器的房间,返回重定向
* 3. 如果找到本地房间或需要创建,在本地处理
* @en This method will:
* 1. First search for available room in distributed registry
* 2. If room found on another server, return redirect
* 3. If local room found or creation needed, handle locally
*/
async joinOrCreateDistributed(
name: string,
playerId: string,
conn: any,
options?: RoomOptions
): Promise<{ room: Room; player: Player } | { redirect: string } | null> {
// 使用分布式锁防止竞态条件 | Use distributed lock to prevent race conditions
const lockKey = `joinOrCreate:${name}`;
const locked = await this._adapter.acquireLock(lockKey, 5000);
if (!locked) {
// 等待一小段时间后重试 | Wait and retry
await this._sleep(100);
return this.joinOrCreateDistributed(name, playerId, conn, options);
}
try {
// 先在分布式注册表中查找 | First search in distributed registry
const availableRoom = await this._adapter.findAvailableRoom(name);
if (availableRoom) {
// 检查是否在本地服务器 | Check if on local server
if (availableRoom.serverId === this._serverId) {
// 本地房间 | Local room
return super.joinOrCreate(name, playerId, conn, options);
} else {
// 其他服务器,返回重定向 | Other server, return redirect
return { redirect: availableRoom.serverAddress };
}
}
// 没有可用房间,在本地创建 | No available room, create locally
return super.joinOrCreate(name, playerId, conn, options);
} finally {
await this._adapter.releaseLock(lockKey);
}
}
// =========================================================================
// 状态管理 | State Management
// =========================================================================
/**
* @zh 保存房间状态快照
* @en Save room state snapshot
*
* @param roomId - 房间 ID | Room ID
*/
async saveSnapshot(roomId: string): Promise<void> {
const room = this._rooms.get(roomId);
if (!room) return;
const def = this._getDefinitionByRoom(room);
if (!def) return;
const snapshot = {
roomId: room.id,
roomType: def.name,
state: room.state ?? {},
players: room.players.map(p => ({
id: p.id,
data: p.data ?? {}
})),
version: Date.now(),
timestamp: Date.now()
};
await this._adapter.saveSnapshot(snapshot);
logger.debug(`Saved snapshot for room: ${roomId}`);
}
/**
* @zh 从快照恢复房间
* @en Restore room from snapshot
*
* @param roomId - 房间 ID | Room ID
* @returns 是否成功恢复 | Whether restore was successful
*/
async restoreFromSnapshot(roomId: string): Promise<boolean> {
const snapshot = await this._adapter.loadSnapshot(roomId);
if (!snapshot) return false;
// 创建房间实例 | Create room instance
const room = await this._createRoomInstance(
snapshot.roomType,
{ state: snapshot.state },
snapshot.roomId
);
if (!room) return false;
// 注册到分布式系统 | Register to distributed system
await this._onRoomCreated(snapshot.roomType, room);
logger.info(`Restored room from snapshot: ${roomId}`);
return true;
}
// =========================================================================
// 私有方法 | Private Methods
// =========================================================================
/**
* @zh 注册服务器到分布式系统
* @en Register server to distributed system
*/
private async _registerServer(): Promise<void> {
const registration: ServerRegistration = {
serverId: this._serverId,
address: this._config.serverAddress,
port: this._config.serverPort,
roomCount: this._rooms.size,
playerCount: this._countTotalPlayers(),
capacity: this._config.capacity,
status: 'online',
lastHeartbeat: Date.now(),
metadata: this._config.metadata
};
await this._adapter.registerServer(registration);
}
/**
* @zh 订阅分布式事件
* @en Subscribe to distributed events
*/
private async _subscribeToEvents(): Promise<void> {
// 订阅服务器离线事件以触发故障转移 | Subscribe to server offline for failover
if (this._config.enableFailover) {
const unsub = await this._adapter.subscribe('server:offline', (event) => {
this._handleServerOffline(event);
});
this._subscriptions.push(unsub);
}
// 订阅房间消息事件 | Subscribe to room message events
const roomMsgUnsub = await this._adapter.subscribe('room:message', (event) => {
this._handleRoomMessage(event);
});
this._subscriptions.push(roomMsgUnsub);
}
/**
* @zh 启动心跳定时器
* @en Start heartbeat timer
*/
private _startHeartbeat(): void {
this._heartbeatTimer = setInterval(async () => {
try {
await this._adapter.heartbeat(this._serverId);
await this._adapter.updateServer(this._serverId, {
roomCount: this._rooms.size,
playerCount: this._countTotalPlayers()
});
} catch (err) {
logger.error('Heartbeat failed:', err);
}
}, this._config.heartbeatInterval);
}
/**
* @zh 启动快照定时器
* @en Start snapshot timer
*/
private _startSnapshotTimer(): void {
this._snapshotTimer = setInterval(async () => {
await this._saveAllSnapshots();
}, this._config.snapshotInterval);
}
/**
* @zh 保存所有房间快照
* @en Save all room snapshots
*/
private async _saveAllSnapshots(): Promise<void> {
const promises: Promise<void>[] = [];
for (const roomId of this._rooms.keys()) {
promises.push(this.saveSnapshot(roomId));
}
await Promise.allSettled(promises);
}
/**
* @zh 路由到指定房间
* @en Route to specific room
*/
private async _routeToRoom(roomId: string): Promise<RoutingResult> {
// 先检查本地 | Check local first
if (this._rooms.has(roomId)) {
return { type: 'local', roomId };
}
// 从分布式注册表查询 | Query from distributed registry
const registration = await this._adapter.getRoom(roomId);
if (!registration) {
return { type: 'unavailable', reason: 'Room not found' };
}
if (registration.serverId === this._serverId) {
return { type: 'local', roomId };
}
return {
type: 'redirect',
serverAddress: registration.serverAddress,
roomId
};
}
/**
* @zh 按类型路由
* @en Route by type
*/
private async _routeByType(
roomType: string,
_query?: RoutingRequest['query']
): Promise<RoutingResult> {
const availableRoom = await this._adapter.findAvailableRoom(roomType);
if (!availableRoom) {
// 没有可用房间,需要创建 | No available room, need to create
return { type: 'create', roomId: undefined };
}
if (availableRoom.serverId === this._serverId) {
return { type: 'local', roomId: availableRoom.roomId };
}
return {
type: 'redirect',
serverAddress: availableRoom.serverAddress,
roomId: availableRoom.roomId
};
}
/**
* @zh 处理服务器离线事件
* @en Handle server offline event
*/
private _handleServerOffline(event: DistributedEvent): void {
if (this._isShuttingDown) return;
if (!this._config.enableFailover) return;
const offlineServerId = event.serverId;
if (offlineServerId === this._serverId) return;
logger.info(`Server offline detected: ${offlineServerId}`);
this._tryRecoverRoomsFromServer(offlineServerId).catch(err => {
logger.error(`Failed to recover rooms from ${offlineServerId}:`, err);
});
}
/**
* @zh 尝试从离线服务器恢复房间
* @en Try to recover rooms from offline server
*/
private async _tryRecoverRoomsFromServer(offlineServerId: string): Promise<void> {
// 检查是否有容量接收更多房间
if (this._rooms.size >= this._config.capacity) {
logger.warn(`Cannot recover rooms: server at capacity (${this._rooms.size}/${this._config.capacity})`);
return;
}
// 查询该服务器上的所有房间
const rooms = await this._adapter.queryRooms({ serverId: offlineServerId });
if (rooms.length === 0) {
logger.info(`No rooms to recover from ${offlineServerId}`);
return;
}
logger.info(`Attempting to recover ${rooms.length} rooms from ${offlineServerId}`);
for (const roomReg of rooms) {
// 检查容量
if (this._rooms.size >= this._config.capacity) {
logger.warn('Reached capacity during recovery, stopping');
break;
}
// 尝试获取恢复锁,防止多个服务器同时恢复同一房间
const lockKey = `failover:${roomReg.roomId}`;
const acquired = await this._adapter.acquireLock(lockKey, this._config.migrationTimeout);
if (!acquired) {
continue;
}
try {
// 从快照恢复房间
const success = await this.restoreFromSnapshot(roomReg.roomId);
if (success) {
logger.info(`Successfully recovered room ${roomReg.roomId}`);
// 发布恢复事件
await this._adapter.publish({
type: 'room:migrated',
serverId: this._serverId,
roomId: roomReg.roomId,
payload: {
fromServer: offlineServerId,
toServer: this._serverId
},
timestamp: Date.now()
});
}
} finally {
await this._adapter.releaseLock(lockKey);
}
}
}
/**
* @zh 处理跨服务器房间消息
* @en Handle cross-server room message
*/
private _handleRoomMessage(event: DistributedEvent): void {
if (!event.roomId) return;
const room = this._rooms.get(event.roomId);
if (!room) return;
const payload = event.payload as { messageType: string; data: unknown; playerId?: string };
if (payload.playerId) {
room._handleMessage(payload.messageType, payload.data, payload.playerId);
}
}
/**
* @zh 统计总玩家数
* @en Count total players
*/
private _countTotalPlayers(): number {
let count = 0;
for (const room of this._rooms.values()) {
count += room.players.length;
}
return count;
}
/**
* @zh 根据房间实例获取定义
* @en Get definition by room instance
*/
private _getDefinitionByRoom(room: Room): { name: string } | null {
for (const [name, def] of this._definitions) {
if (room instanceof def.roomClass) {
return { name };
}
}
return null;
}
/**
* @zh 休眠指定时间
* @en Sleep for specified time
*/
private _sleep(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* @zh 向其他服务器的房间发送消息
* @en Send message to room on another server
*
* @param roomId - 房间 ID | Room ID
* @param messageType - 消息类型 | Message type
* @param data - 消息数据 | Message data
* @param playerId - 发送者玩家 ID可选| Sender player ID (optional)
*/
async sendToRemoteRoom(
roomId: string,
messageType: string,
data: unknown,
playerId?: string
): Promise<void> {
await this._adapter.sendToRoom(roomId, messageType, data, playerId);
}
/**
* @zh 获取所有在线服务器
* @en Get all online servers
*/
async getServers(): Promise<ServerRegistration[]> {
return this._adapter.getServers();
}
/**
* @zh 查询分布式房间
* @en Query distributed rooms
*/
async queryDistributedRooms(query: {
roomType?: string;
hasSpace?: boolean;
notLocked?: boolean;
metadata?: Record<string, unknown>;
limit?: number;
}): Promise<RoomRegistration[]> {
return this._adapter.queryRooms(query);
}
}

View File

@@ -0,0 +1,453 @@
/**
* @zh DistributedRoomManager 单元测试
* @en DistributedRoomManager unit tests
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { DistributedRoomManager } from '../DistributedRoomManager.js';
import { MemoryAdapter } from '../adapters/MemoryAdapter.js';
import { Room } from '../../room/Room.js';
class TestRoom extends Room {
maxPlayers = 4;
}
describe('DistributedRoomManager', () => {
let adapter: MemoryAdapter;
let manager: DistributedRoomManager;
const mockSendFn = vi.fn();
beforeEach(async () => {
vi.clearAllMocks();
adapter = new MemoryAdapter({ enableTtlCheck: false });
manager = new DistributedRoomManager(adapter, {
serverId: 'server-1',
serverAddress: 'localhost',
serverPort: 3000,
heartbeatInterval: 60000, // 长间隔避免测试中触发
snapshotInterval: 0 // 禁用自动快照
}, mockSendFn);
manager.define('test', TestRoom);
await manager.start();
});
afterEach(async () => {
await manager.stop(false);
});
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
describe('lifecycle', () => {
it('should start and register server', async () => {
const servers = await adapter.getServers();
expect(servers).toHaveLength(1);
expect(servers[0].serverId).toBe('server-1');
expect(servers[0].status).toBe('online');
});
it('should stop and unregister server', async () => {
await manager.stop(false);
const servers = await adapter.getServers();
expect(servers).toHaveLength(0);
});
it('should expose serverId and config', () => {
expect(manager.serverId).toBe('server-1');
expect(manager.config.serverAddress).toBe('localhost');
expect(manager.config.serverPort).toBe(3000);
});
});
// =========================================================================
// 房间操作 | Room Operations
// =========================================================================
describe('room operations', () => {
it('should create room and register to distributed system', async () => {
const room = await manager.create('test');
expect(room).toBeDefined();
expect(room?.id).toBeDefined();
const registration = await adapter.getRoom(room!.id);
expect(registration).toBeDefined();
expect(registration?.roomType).toBe('test');
expect(registration?.serverId).toBe('server-1');
});
it('should update room count on server after creating room', async () => {
await manager.create('test');
await manager.create('test');
const server = await adapter.getServer('server-1');
expect(server?.roomCount).toBe(2);
});
it('should unregister room from distributed system on dispose', async () => {
const room = await manager.create('test');
const roomId = room!.id;
room!.dispose();
// 等待异步注销完成 | Wait for async unregister
await new Promise(r => setTimeout(r, 50));
const registration = await adapter.getRoom(roomId);
expect(registration).toBeNull();
});
it('should update player count in distributed registration', async () => {
const mockConn = { send: vi.fn() };
const result = await manager.joinOrCreate('test', 'player-1', mockConn);
expect(result).toBeDefined();
const registration = await adapter.getRoom(result!.room.id);
expect(registration?.playerCount).toBe(1);
});
});
// =========================================================================
// 分布式路由 | Distributed Routing
// =========================================================================
describe('distributed routing', () => {
it('should route to local room', async () => {
const room = await manager.create('test');
const result = await manager.route({ roomId: room!.id, playerId: 'p1' });
expect(result.type).toBe('local');
expect(result.roomId).toBe(room!.id);
});
it('should return unavailable for non-existent room', async () => {
const result = await manager.route({ roomId: 'non-existent', playerId: 'p1' });
expect(result.type).toBe('unavailable');
});
it('should return create when no available room exists', async () => {
const result = await manager.route({ roomType: 'test', playerId: 'p1' });
expect(result.type).toBe('create');
});
it('should return local for available local room', async () => {
const room = await manager.create('test');
const result = await manager.route({ roomType: 'test', playerId: 'p1' });
expect(result.type).toBe('local');
expect(result.roomId).toBe(room!.id);
});
it('should return redirect for room on another server', async () => {
// 直接在适配器中注册另一个服务器的房间 | Register room from another server directly
await adapter.registerServer({
serverId: 'server-2',
address: 'other-host',
port: 3001,
roomCount: 1,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
await adapter.registerRoom({
roomId: 'remote-room-1',
roomType: 'test',
serverId: 'server-2',
serverAddress: 'other-host:3001',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
const result = await manager.route({ roomType: 'test', playerId: 'p1' });
expect(result.type).toBe('redirect');
expect(result.serverAddress).toBe('other-host:3001');
expect(result.roomId).toBe('remote-room-1');
});
});
// =========================================================================
// 分布式加入创建 | Distributed Join/Create
// =========================================================================
describe('joinOrCreateDistributed', () => {
it('should create room locally when none exists', async () => {
const mockConn = { send: vi.fn() };
const result = await manager.joinOrCreateDistributed('test', 'player-1', mockConn);
expect(result).not.toBeNull();
expect('room' in result!).toBe(true);
if ('room' in result!) {
expect(result.room).toBeDefined();
expect(result.player.id).toBe('player-1');
}
});
it('should join existing local room', async () => {
const room = await manager.create('test');
const mockConn = { send: vi.fn() };
const result = await manager.joinOrCreateDistributed('test', 'player-1', mockConn);
expect(result).not.toBeNull();
expect('room' in result!).toBe(true);
if ('room' in result!) {
expect(result.room.id).toBe(room!.id);
}
});
it('should return redirect for remote room', async () => {
await adapter.registerServer({
serverId: 'server-2',
address: 'remote',
port: 3001,
roomCount: 1,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
await adapter.registerRoom({
roomId: 'remote-room',
roomType: 'test',
serverId: 'server-2',
serverAddress: 'remote:3001',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
const mockConn = { send: vi.fn() };
const result = await manager.joinOrCreateDistributed('test', 'player-1', mockConn);
expect(result).not.toBeNull();
expect('redirect' in result!).toBe(true);
if ('redirect' in result!) {
expect(result.redirect).toBe('remote:3001');
}
});
});
// =========================================================================
// 状态快照 | State Snapshots
// =========================================================================
describe('snapshots', () => {
it('should save room snapshot', async () => {
const room = await manager.create('test', { state: { score: 100 } });
await manager.saveSnapshot(room!.id);
const snapshot = await adapter.loadSnapshot(room!.id);
expect(snapshot).toBeDefined();
expect(snapshot?.roomId).toBe(room!.id);
expect(snapshot?.roomType).toBe('test');
});
it('should restore room from snapshot', async () => {
// 手动创建快照 | Manually create snapshot
await adapter.saveSnapshot({
roomId: 'restored-room',
roomType: 'test',
state: { score: 500 },
players: [],
version: 1,
timestamp: Date.now()
});
const restored = await manager.restoreFromSnapshot('restored-room');
expect(restored).toBe(true);
const room = manager.getRoom('restored-room');
expect(room).toBeDefined();
});
it('should return false when snapshot not found', async () => {
const restored = await manager.restoreFromSnapshot('non-existent');
expect(restored).toBe(false);
});
});
// =========================================================================
// 跨服务器通信 | Cross-Server Communication
// =========================================================================
describe('cross-server communication', () => {
it('should send message to remote room', async () => {
const handler = vi.fn();
await adapter.subscribe('room:message', handler);
await adapter.registerRoom({
roomId: 'remote-room',
roomType: 'test',
serverId: 'server-2',
serverAddress: 'remote:3001',
playerCount: 1,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
await manager.sendToRemoteRoom('remote-room', 'chat', { text: 'hello' }, 'player-1');
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
type: 'room:message',
roomId: 'remote-room',
payload: {
messageType: 'chat',
data: { text: 'hello' },
playerId: 'player-1'
}
})
);
});
});
// =========================================================================
// 查询方法 | Query Methods
// =========================================================================
describe('query methods', () => {
it('should get all servers', async () => {
await adapter.registerServer({
serverId: 'server-2',
address: 'other',
port: 3001,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
const servers = await manager.getServers();
expect(servers).toHaveLength(2);
});
it('should query distributed rooms', async () => {
await manager.create('test');
await adapter.registerRoom({
roomId: 'remote-room',
roomType: 'test',
serverId: 'server-2',
serverAddress: 'remote:3001',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
const rooms = await manager.queryDistributedRooms({ roomType: 'test' });
expect(rooms).toHaveLength(2);
});
});
// =========================================================================
// 事件订阅 | Event Subscription
// =========================================================================
describe('event subscription', () => {
it('should handle room messages for local rooms', async () => {
const room = await manager.create('test');
const handleSpy = vi.spyOn(room!, '_handleMessage');
await adapter.publish({
type: 'room:message',
serverId: 'server-2',
roomId: room!.id,
payload: {
messageType: 'test',
data: { foo: 'bar' },
playerId: 'player-1'
},
timestamp: Date.now()
});
expect(handleSpy).toHaveBeenCalledWith('test', { foo: 'bar' }, 'player-1');
});
});
// =========================================================================
// 优雅关闭 | Graceful Shutdown
// =========================================================================
describe('graceful shutdown', () => {
it('should mark server as draining on graceful stop', async () => {
const statusHandler = vi.fn();
// 创建新的管理器用于此测试 | Create new manager for this test
const newAdapter = new MemoryAdapter({ enableTtlCheck: false });
const newManager = new DistributedRoomManager(newAdapter, {
serverId: 'graceful-server',
serverAddress: 'localhost',
serverPort: 3002,
heartbeatInterval: 60000,
snapshotInterval: 0
}, mockSendFn);
newManager.define('test', TestRoom);
await newManager.start();
// 监听状态变化 | Watch for status changes
// 由于我们在 stop(true) 中调用 updateServer可以检查最终状态
await newManager.stop(true);
// 验证服务器已注销 | Verify server is unregistered
const server = await newAdapter.getServer('graceful-server');
expect(server).toBeNull();
});
it('should save all snapshots on graceful stop', async () => {
const newAdapter = new MemoryAdapter({ enableTtlCheck: false });
const newManager = new DistributedRoomManager(newAdapter, {
serverId: 'snapshot-server',
serverAddress: 'localhost',
serverPort: 3003,
heartbeatInterval: 60000,
snapshotInterval: 0
}, mockSendFn);
newManager.define('test', TestRoom);
await newManager.start();
// 创建房间 | Create rooms
const room1 = await newManager.create('test');
const room2 = await newManager.create('test');
await newManager.stop(true);
// 验证快照已保存 | Verify snapshots are saved
const snapshot1 = await newAdapter.loadSnapshot(room1!.id);
const snapshot2 = await newAdapter.loadSnapshot(room2!.id);
expect(snapshot1).toBeDefined();
expect(snapshot2).toBeDefined();
});
});
});

View File

@@ -0,0 +1,582 @@
/**
* @zh MemoryAdapter 单元测试
* @en MemoryAdapter unit tests
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { MemoryAdapter } from '../adapters/MemoryAdapter.js';
import type { ServerRegistration, RoomRegistration, DistributedEvent } from '../types.js';
describe('MemoryAdapter', () => {
let adapter: MemoryAdapter;
beforeEach(async () => {
adapter = new MemoryAdapter({ enableTtlCheck: false });
await adapter.connect();
});
afterEach(async () => {
await adapter.disconnect();
});
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
describe('lifecycle', () => {
it('should connect and disconnect', async () => {
const newAdapter = new MemoryAdapter();
expect(newAdapter.isConnected()).toBe(false);
await newAdapter.connect();
expect(newAdapter.isConnected()).toBe(true);
await newAdapter.disconnect();
expect(newAdapter.isConnected()).toBe(false);
});
it('should not throw on double connect', async () => {
await adapter.connect();
expect(adapter.isConnected()).toBe(true);
});
it('should not throw on double disconnect', async () => {
await adapter.disconnect();
await adapter.disconnect();
expect(adapter.isConnected()).toBe(false);
});
});
// =========================================================================
// 服务器注册 | Server Registry
// =========================================================================
describe('server registry', () => {
const createServer = (id: string): ServerRegistration => ({
serverId: id,
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
it('should register and get server', async () => {
const server = createServer('server-1');
await adapter.registerServer(server);
const result = await adapter.getServer('server-1');
expect(result).toBeDefined();
expect(result?.serverId).toBe('server-1');
});
it('should get all online servers', async () => {
await adapter.registerServer(createServer('server-1'));
await adapter.registerServer(createServer('server-2'));
const servers = await adapter.getServers();
expect(servers).toHaveLength(2);
});
it('should filter out offline servers', async () => {
const server1 = createServer('server-1');
const server2 = { ...createServer('server-2'), status: 'offline' as const };
await adapter.registerServer(server1);
await adapter.registerServer(server2);
const servers = await adapter.getServers();
expect(servers).toHaveLength(1);
expect(servers[0].serverId).toBe('server-1');
});
it('should unregister server and cleanup rooms', async () => {
const server = createServer('server-1');
await adapter.registerServer(server);
const room: RoomRegistration = {
roomId: 'room-1',
roomType: 'game',
serverId: 'server-1',
serverAddress: 'localhost:3000',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
};
await adapter.registerRoom(room);
await adapter.unregisterServer('server-1');
const serverResult = await adapter.getServer('server-1');
expect(serverResult).toBeNull();
const roomResult = await adapter.getRoom('room-1');
expect(roomResult).toBeNull();
});
it('should update server heartbeat', async () => {
const server = createServer('server-1');
await adapter.registerServer(server);
const before = (await adapter.getServer('server-1'))?.lastHeartbeat;
await new Promise(r => setTimeout(r, 10));
await adapter.heartbeat('server-1');
const after = (await adapter.getServer('server-1'))?.lastHeartbeat;
expect(after).toBeGreaterThan(before!);
});
it('should update server info', async () => {
const server = createServer('server-1');
await adapter.registerServer(server);
await adapter.updateServer('server-1', { roomCount: 5, playerCount: 10 });
const result = await adapter.getServer('server-1');
expect(result?.roomCount).toBe(5);
expect(result?.playerCount).toBe(10);
});
});
// =========================================================================
// 房间注册 | Room Registry
// =========================================================================
describe('room registry', () => {
const createRoom = (id: string, serverId = 'server-1'): RoomRegistration => ({
roomId: id,
roomType: 'game',
serverId,
serverAddress: 'localhost:3000',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
beforeEach(async () => {
await adapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
});
it('should register and get room', async () => {
const room = createRoom('room-1');
await adapter.registerRoom(room);
const result = await adapter.getRoom('room-1');
expect(result).toBeDefined();
expect(result?.roomId).toBe('room-1');
});
it('should update server room count on register', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom(createRoom('room-2'));
const server = await adapter.getServer('server-1');
expect(server?.roomCount).toBe(2);
});
it('should unregister room', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.unregisterRoom('room-1');
const result = await adapter.getRoom('room-1');
expect(result).toBeNull();
});
it('should update room info', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.updateRoom('room-1', { playerCount: 2, isLocked: true });
const result = await adapter.getRoom('room-1');
expect(result?.playerCount).toBe(2);
expect(result?.isLocked).toBe(true);
});
it('should query rooms by type', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom({ ...createRoom('room-2'), roomType: 'lobby' });
const games = await adapter.queryRooms({ roomType: 'game' });
expect(games).toHaveLength(1);
expect(games[0].roomId).toBe('room-1');
});
it('should query rooms with space', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom({ ...createRoom('room-2'), playerCount: 4 });
const available = await adapter.queryRooms({ hasSpace: true });
expect(available).toHaveLength(1);
expect(available[0].roomId).toBe('room-1');
});
it('should query unlocked rooms', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom({ ...createRoom('room-2'), isLocked: true });
const unlocked = await adapter.queryRooms({ notLocked: true });
expect(unlocked).toHaveLength(1);
expect(unlocked[0].roomId).toBe('room-1');
});
it('should query rooms by metadata', async () => {
await adapter.registerRoom({ ...createRoom('room-1'), metadata: { map: 'forest' } });
await adapter.registerRoom({ ...createRoom('room-2'), metadata: { map: 'desert' } });
const forest = await adapter.queryRooms({ metadata: { map: 'forest' } });
expect(forest).toHaveLength(1);
expect(forest[0].roomId).toBe('room-1');
});
it('should support pagination', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom(createRoom('room-2'));
await adapter.registerRoom(createRoom('room-3'));
const page1 = await adapter.queryRooms({ limit: 2 });
expect(page1).toHaveLength(2);
const page2 = await adapter.queryRooms({ offset: 2, limit: 2 });
expect(page2).toHaveLength(1);
});
it('should find available room', async () => {
await adapter.registerRoom({ ...createRoom('room-1'), playerCount: 4 }); // full
await adapter.registerRoom({ ...createRoom('room-2'), isLocked: true }); // locked
await adapter.registerRoom(createRoom('room-3')); // available
const available = await adapter.findAvailableRoom('game');
expect(available?.roomId).toBe('room-3');
});
it('should return null when no available room', async () => {
await adapter.registerRoom({ ...createRoom('room-1'), playerCount: 4 });
const available = await adapter.findAvailableRoom('game');
expect(available).toBeNull();
});
it('should get rooms by server', async () => {
await adapter.registerServer({
serverId: 'server-2',
address: 'localhost',
port: 3001,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
await adapter.registerRoom(createRoom('room-1', 'server-1'));
await adapter.registerRoom(createRoom('room-2', 'server-2'));
const server1Rooms = await adapter.getRoomsByServer('server-1');
expect(server1Rooms).toHaveLength(1);
expect(server1Rooms[0].roomId).toBe('room-1');
});
});
// =========================================================================
// 快照 | Snapshots
// =========================================================================
describe('snapshots', () => {
it('should save and load snapshot', async () => {
const snapshot = {
roomId: 'room-1',
roomType: 'game',
state: { score: 100 },
players: [{ id: 'player-1', data: { name: 'Alice' } }],
version: 1,
timestamp: Date.now()
};
await adapter.saveSnapshot(snapshot);
const result = await adapter.loadSnapshot('room-1');
expect(result).toEqual(snapshot);
});
it('should return null for non-existent snapshot', async () => {
const result = await adapter.loadSnapshot('non-existent');
expect(result).toBeNull();
});
it('should delete snapshot', async () => {
await adapter.saveSnapshot({
roomId: 'room-1',
roomType: 'game',
state: {},
players: [],
version: 1,
timestamp: Date.now()
});
await adapter.deleteSnapshot('room-1');
const result = await adapter.loadSnapshot('room-1');
expect(result).toBeNull();
});
});
// =========================================================================
// 发布/订阅 | Pub/Sub
// =========================================================================
describe('pub/sub', () => {
it('should publish and subscribe to events', async () => {
const handler = vi.fn();
await adapter.subscribe('room:created', handler);
const event: DistributedEvent = {
type: 'room:created',
serverId: 'server-1',
roomId: 'room-1',
payload: { roomType: 'game' },
timestamp: Date.now()
};
await adapter.publish(event);
expect(handler).toHaveBeenCalledWith(event);
});
it('should support wildcard subscription', async () => {
const handler = vi.fn();
await adapter.subscribe('*', handler);
await adapter.publish({
type: 'room:created',
serverId: 'server-1',
payload: {},
timestamp: Date.now()
});
await adapter.publish({
type: 'server:online',
serverId: 'server-1',
payload: {},
timestamp: Date.now()
});
expect(handler).toHaveBeenCalledTimes(2);
});
it('should unsubscribe correctly', async () => {
const handler = vi.fn();
const unsub = await adapter.subscribe('room:created', handler);
unsub();
await adapter.publish({
type: 'room:created',
serverId: 'server-1',
payload: {},
timestamp: Date.now()
});
expect(handler).not.toHaveBeenCalled();
});
it('should handle errors in handlers gracefully', async () => {
const errorHandler = vi.fn(() => { throw new Error('Test error'); });
const normalHandler = vi.fn();
await adapter.subscribe('room:created', errorHandler);
await adapter.subscribe('room:created', normalHandler);
await adapter.publish({
type: 'room:created',
serverId: 'server-1',
payload: {},
timestamp: Date.now()
});
expect(errorHandler).toHaveBeenCalled();
expect(normalHandler).toHaveBeenCalled();
});
it('should send to room', async () => {
await adapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
await adapter.registerRoom({
roomId: 'room-1',
roomType: 'game',
serverId: 'server-1',
serverAddress: 'localhost:3000',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
const handler = vi.fn();
await adapter.subscribe('room:message', handler);
await adapter.sendToRoom('room-1', 'chat', { text: 'hello' }, 'player-1');
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
type: 'room:message',
roomId: 'room-1',
payload: {
messageType: 'chat',
data: { text: 'hello' },
playerId: 'player-1'
}
})
);
});
});
// =========================================================================
// 分布式锁 | Distributed Locks
// =========================================================================
describe('distributed locks', () => {
it('should acquire and release lock', async () => {
const acquired = await adapter.acquireLock('test-lock', 5000);
expect(acquired).toBe(true);
await adapter.releaseLock('test-lock');
const acquiredAgain = await adapter.acquireLock('test-lock', 5000);
expect(acquiredAgain).toBe(true);
});
it('should fail to acquire held lock', async () => {
await adapter.acquireLock('test-lock', 5000);
const acquiredAgain = await adapter.acquireLock('test-lock', 5000);
expect(acquiredAgain).toBe(false);
});
it('should acquire expired lock', async () => {
await adapter.acquireLock('test-lock', 1);
await new Promise(r => setTimeout(r, 10));
const acquired = await adapter.acquireLock('test-lock', 5000);
expect(acquired).toBe(true);
});
it('should extend lock', async () => {
await adapter.acquireLock('test-lock', 100);
const extended = await adapter.extendLock('test-lock', 5000);
expect(extended).toBe(true);
});
it('should fail to extend non-existent lock', async () => {
const extended = await adapter.extendLock('non-existent', 5000);
expect(extended).toBe(false);
});
});
// =========================================================================
// TTL 检查 | TTL Check
// =========================================================================
describe('TTL check', () => {
it('should mark server offline after TTL expires', async () => {
const ttlAdapter = new MemoryAdapter({
serverTtl: 50,
ttlCheckInterval: 20,
enableTtlCheck: true
});
await ttlAdapter.connect();
const handler = vi.fn();
await ttlAdapter.subscribe('server:offline', handler);
await ttlAdapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
await new Promise(r => setTimeout(r, 100));
expect(handler).toHaveBeenCalled();
const server = await ttlAdapter.getServer('server-1');
expect(server?.status).toBe('offline');
await ttlAdapter.disconnect();
});
});
// =========================================================================
// 测试辅助方法 | Test Helper Methods
// =========================================================================
describe('test helpers', () => {
it('should clear all data', async () => {
await adapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
adapter._clear();
const servers = await adapter.getServers();
expect(servers).toHaveLength(0);
});
it('should expose internal state for testing', () => {
const state = adapter._getState();
expect(state.servers).toBeDefined();
expect(state.rooms).toBeDefined();
expect(state.snapshots).toBeDefined();
});
});
// =========================================================================
// 错误处理 | Error Handling
// =========================================================================
describe('error handling', () => {
it('should throw when not connected', async () => {
const disconnected = new MemoryAdapter();
await expect(disconnected.registerServer({} as ServerRegistration))
.rejects.toThrow('MemoryAdapter is not connected');
});
});
});

View File

@@ -0,0 +1,750 @@
/**
* @zh RedisAdapter 单元测试
* @en RedisAdapter unit tests
*
* @zh 使用 Mock Redis 客户端进行测试
* @en Uses Mock Redis client for testing
*/
import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest';
import { RedisAdapter } from '../adapters/RedisAdapter.js';
import type { RedisClient } from '../adapters/RedisAdapter.js';
import type { ServerRegistration, RoomRegistration, DistributedEvent } from '../types.js';
// 共享状态,用于模拟 Redis Pub/Sub
const sharedStore = new Map<string, string>();
const sharedSets = new Map<string, Set<string>>();
const sharedHashes = new Map<string, Map<string, string>>();
const sharedExpireTimes = new Map<string, number>();
const sharedPubSubHandlers = new Map<string, Set<(channel: string, message: string) => void>>();
function clearSharedState(): void {
sharedStore.clear();
sharedSets.clear();
sharedHashes.clear();
sharedExpireTimes.clear();
sharedPubSubHandlers.clear();
}
/**
* @zh 创建 Mock Redis 客户端
* @en Create Mock Redis client
*/
function createMockRedisClient(): RedisClient {
const eventHandlers = new Map<string, Set<(...args: unknown[]) => void>>();
const mockClient: RedisClient = {
// 基础操作
get: vi.fn(async (key: string) => sharedStore.get(key) ?? null),
set: vi.fn(async (key: string, value: string, ...args: (string | number)[]) => {
// 处理 NX 选项
if (args.includes('NX') && sharedStore.has(key)) {
return null;
}
sharedStore.set(key, value);
// 处理 EX 选项
const exIndex = args.indexOf('EX');
if (exIndex !== -1 && typeof args[exIndex + 1] === 'number') {
sharedExpireTimes.set(key, Date.now() + (args[exIndex + 1] as number) * 1000);
}
return 'OK';
}),
del: vi.fn(async (...keys: string[]) => {
let count = 0;
for (const key of keys) {
if (sharedStore.delete(key) || sharedHashes.delete(key) || sharedSets.delete(key)) {
count++;
}
}
return count;
}),
expire: vi.fn(async (key: string, seconds: number) => {
if (sharedStore.has(key) || sharedHashes.has(key)) {
sharedExpireTimes.set(key, Date.now() + seconds * 1000);
return 1;
}
return 0;
}),
ttl: vi.fn(async (key: string) => {
const expire = sharedExpireTimes.get(key);
if (!expire) return -1;
const remaining = Math.ceil((expire - Date.now()) / 1000);
return remaining > 0 ? remaining : -2;
}),
// Hash 操作
hget: vi.fn(async (key: string, field: string) => {
return sharedHashes.get(key)?.get(field) ?? null;
}),
hset: vi.fn(async (key: string, ...args: (string | number | Buffer)[]) => {
if (!sharedHashes.has(key)) {
sharedHashes.set(key, new Map());
}
const hash = sharedHashes.get(key)!;
let added = 0;
for (let i = 0; i < args.length; i += 2) {
const field = String(args[i]);
const value = String(args[i + 1]);
if (!hash.has(field)) added++;
hash.set(field, value);
}
return added;
}),
hdel: vi.fn(async (key: string, ...fields: string[]) => {
const hash = sharedHashes.get(key);
if (!hash) return 0;
let count = 0;
for (const field of fields) {
if (hash.delete(field)) count++;
}
return count;
}),
hgetall: vi.fn(async (key: string) => {
const hash = sharedHashes.get(key);
if (!hash) return {};
const result: Record<string, string> = {};
for (const [k, v] of hash) {
result[k] = v;
}
return result;
}),
hmset: vi.fn(async (key: string, ...args: (string | number | Buffer)[]) => {
if (!sharedHashes.has(key)) {
sharedHashes.set(key, new Map());
}
const hash = sharedHashes.get(key)!;
for (let i = 0; i < args.length; i += 2) {
hash.set(String(args[i]), String(args[i + 1]));
}
return 'OK';
}),
// Set 操作
sadd: vi.fn(async (key: string, ...members: string[]) => {
if (!sharedSets.has(key)) {
sharedSets.set(key, new Set());
}
const set = sharedSets.get(key)!;
let added = 0;
for (const member of members) {
if (!set.has(member)) {
set.add(member);
added++;
}
}
return added;
}),
srem: vi.fn(async (key: string, ...members: string[]) => {
const set = sharedSets.get(key);
if (!set) return 0;
let removed = 0;
for (const member of members) {
if (set.delete(member)) removed++;
}
return removed;
}),
smembers: vi.fn(async (key: string) => {
return Array.from(sharedSets.get(key) ?? []);
}),
// Pub/Sub - 使用共享的处理器集合
publish: vi.fn(async (channel: string, message: string) => {
const handlers = sharedPubSubHandlers.get(channel);
if (handlers) {
for (const handler of handlers) {
handler(channel, message);
}
}
return handlers?.size ?? 0;
}),
subscribe: vi.fn(async (channel: string) => {
// 注册 message 事件处理器到共享的 pub/sub 处理器
const messageHandlers = eventHandlers.get('message');
if (messageHandlers) {
if (!sharedPubSubHandlers.has(channel)) {
sharedPubSubHandlers.set(channel, new Set());
}
for (const handler of messageHandlers) {
sharedPubSubHandlers.get(channel)!.add(handler as (channel: string, message: string) => void);
}
}
return 1;
}),
psubscribe: vi.fn(async () => 1),
unsubscribe: vi.fn(async (channel: string) => {
sharedPubSubHandlers.delete(channel);
return 1;
}),
punsubscribe: vi.fn(async () => 1),
// 事件
on: vi.fn((event: string, callback: (...args: unknown[]) => void) => {
if (!eventHandlers.has(event)) {
eventHandlers.set(event, new Set());
}
eventHandlers.get(event)!.add(callback);
}),
off: vi.fn((event: string, callback: (...args: unknown[]) => void) => {
eventHandlers.get(event)?.delete(callback);
}),
// Lua 脚本
eval: vi.fn(async (script: string, numkeys: number, ...args: (string | number)[]) => {
const key = String(args[0]);
const token = String(args[1]);
// 释放锁脚本
if (script.includes('redis.call("del"')) {
if (sharedStore.get(key) === token) {
sharedStore.delete(key);
return 1;
}
return 0;
}
// 扩展锁脚本
if (script.includes('redis.call("pexpire"')) {
if (sharedStore.get(key) === token) {
const ttlMs = Number(args[2]);
sharedExpireTimes.set(key, Date.now() + ttlMs);
return 1;
}
return 0;
}
return 0;
}),
// 连接
duplicate: vi.fn(() => createMockRedisClient()),
quit: vi.fn(async () => 'OK'),
disconnect: vi.fn()
};
return mockClient;
}
describe('RedisAdapter', () => {
let adapter: RedisAdapter;
let mockClient: RedisClient;
beforeEach(async () => {
clearSharedState();
mockClient = createMockRedisClient();
adapter = new RedisAdapter({
factory: () => mockClient,
prefix: 'test:'
});
await adapter.connect();
});
afterEach(async () => {
await adapter.disconnect();
});
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
describe('lifecycle', () => {
it('should connect and disconnect', async () => {
const newAdapter = new RedisAdapter({
factory: () => createMockRedisClient()
});
expect(newAdapter.isConnected()).toBe(false);
await newAdapter.connect();
expect(newAdapter.isConnected()).toBe(true);
await newAdapter.disconnect();
expect(newAdapter.isConnected()).toBe(false);
});
it('should not throw on double connect', async () => {
await adapter.connect();
expect(adapter.isConnected()).toBe(true);
});
it('should not throw on double disconnect', async () => {
await adapter.disconnect();
await adapter.disconnect();
expect(adapter.isConnected()).toBe(false);
});
});
// =========================================================================
// 服务器注册 | Server Registry
// =========================================================================
describe('server registry', () => {
const createServer = (id: string): ServerRegistration => ({
serverId: id,
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
it('should register and get server', async () => {
const server = createServer('server-1');
await adapter.registerServer(server);
const result = await adapter.getServer('server-1');
expect(result).toBeDefined();
expect(result?.serverId).toBe('server-1');
expect(result?.address).toBe('localhost');
expect(result?.port).toBe(3000);
});
it('should get all online servers', async () => {
await adapter.registerServer(createServer('server-1'));
await adapter.registerServer(createServer('server-2'));
const servers = await adapter.getServers();
expect(servers).toHaveLength(2);
});
it('should unregister server', async () => {
await adapter.registerServer(createServer('server-1'));
await adapter.unregisterServer('server-1');
const result = await adapter.getServer('server-1');
expect(result).toBeNull();
});
it('should update server heartbeat', async () => {
const server = createServer('server-1');
await adapter.registerServer(server);
await new Promise(r => setTimeout(r, 10));
await adapter.heartbeat('server-1');
const result = await adapter.getServer('server-1');
expect(result?.lastHeartbeat).toBeGreaterThan(server.lastHeartbeat);
});
it('should update server info', async () => {
await adapter.registerServer(createServer('server-1'));
await adapter.updateServer('server-1', { roomCount: 5, playerCount: 10 });
const result = await adapter.getServer('server-1');
expect(result?.roomCount).toBe(5);
expect(result?.playerCount).toBe(10);
});
it('should publish draining event when status changes', async () => {
await adapter.registerServer(createServer('server-1'));
const handler = vi.fn();
await adapter.subscribe('server:draining', handler);
await adapter.updateServer('server-1', { status: 'draining' });
expect(handler).toHaveBeenCalled();
});
});
// =========================================================================
// 房间注册 | Room Registry
// =========================================================================
describe('room registry', () => {
const createRoom = (id: string, serverId = 'server-1'): RoomRegistration => ({
roomId: id,
roomType: 'game',
serverId,
serverAddress: 'localhost:3000',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
beforeEach(async () => {
await adapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
});
it('should register and get room', async () => {
const room = createRoom('room-1');
await adapter.registerRoom(room);
const result = await adapter.getRoom('room-1');
expect(result).toBeDefined();
expect(result?.roomId).toBe('room-1');
expect(result?.roomType).toBe('game');
});
it('should unregister room', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.unregisterRoom('room-1');
const result = await adapter.getRoom('room-1');
expect(result).toBeNull();
});
it('should update room info', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.updateRoom('room-1', { playerCount: 2, isLocked: true });
const result = await adapter.getRoom('room-1');
expect(result?.playerCount).toBe(2);
expect(result?.isLocked).toBe(true);
});
it('should query rooms by type', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom({ ...createRoom('room-2'), roomType: 'lobby' });
const games = await adapter.queryRooms({ roomType: 'game' });
expect(games).toHaveLength(1);
expect(games[0].roomId).toBe('room-1');
});
it('should query rooms with space', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom({ ...createRoom('room-2'), playerCount: 4 });
const available = await adapter.queryRooms({ hasSpace: true });
expect(available).toHaveLength(1);
expect(available[0].roomId).toBe('room-1');
});
it('should query unlocked rooms', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom({ ...createRoom('room-2'), isLocked: true });
const unlocked = await adapter.queryRooms({ notLocked: true });
expect(unlocked).toHaveLength(1);
expect(unlocked[0].roomId).toBe('room-1');
});
it('should support pagination', async () => {
await adapter.registerRoom(createRoom('room-1'));
await adapter.registerRoom(createRoom('room-2'));
await adapter.registerRoom(createRoom('room-3'));
const page1 = await adapter.queryRooms({ limit: 2 });
expect(page1).toHaveLength(2);
const page2 = await adapter.queryRooms({ offset: 2, limit: 2 });
expect(page2).toHaveLength(1);
});
it('should find available room', async () => {
await adapter.registerRoom({ ...createRoom('room-1'), playerCount: 4 }); // full
await adapter.registerRoom({ ...createRoom('room-2'), isLocked: true }); // locked
await adapter.registerRoom(createRoom('room-3')); // available
const available = await adapter.findAvailableRoom('game');
expect(available?.roomId).toBe('room-3');
});
it('should get rooms by server', async () => {
await adapter.registerServer({
serverId: 'server-2',
address: 'localhost',
port: 3001,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
await adapter.registerRoom(createRoom('room-1', 'server-1'));
await adapter.registerRoom(createRoom('room-2', 'server-2'));
const server1Rooms = await adapter.getRoomsByServer('server-1');
expect(server1Rooms).toHaveLength(1);
expect(server1Rooms[0].roomId).toBe('room-1');
});
it('should publish lock/unlock events', async () => {
await adapter.registerRoom(createRoom('room-1'));
const lockHandler = vi.fn();
const unlockHandler = vi.fn();
await adapter.subscribe('room:locked', lockHandler);
await adapter.subscribe('room:unlocked', unlockHandler);
await adapter.updateRoom('room-1', { isLocked: true });
expect(lockHandler).toHaveBeenCalled();
await adapter.updateRoom('room-1', { isLocked: false });
expect(unlockHandler).toHaveBeenCalled();
});
});
// =========================================================================
// 快照 | Snapshots
// =========================================================================
describe('snapshots', () => {
it('should save and load snapshot', async () => {
const snapshot = {
roomId: 'room-1',
roomType: 'game',
state: { score: 100 },
players: [{ id: 'player-1', data: { name: 'Alice' } }],
version: 1,
timestamp: Date.now()
};
await adapter.saveSnapshot(snapshot);
const result = await adapter.loadSnapshot('room-1');
expect(result).toEqual(snapshot);
});
it('should return null for non-existent snapshot', async () => {
const result = await adapter.loadSnapshot('non-existent');
expect(result).toBeNull();
});
it('should delete snapshot', async () => {
await adapter.saveSnapshot({
roomId: 'room-1',
roomType: 'game',
state: {},
players: [],
version: 1,
timestamp: Date.now()
});
await adapter.deleteSnapshot('room-1');
const result = await adapter.loadSnapshot('room-1');
expect(result).toBeNull();
});
});
// =========================================================================
// 发布/订阅 | Pub/Sub
// =========================================================================
describe('pub/sub', () => {
it('should publish and subscribe to events', async () => {
const handler = vi.fn();
await adapter.subscribe('room:created', handler);
const event: DistributedEvent = {
type: 'room:created',
serverId: 'server-1',
roomId: 'room-1',
payload: { roomType: 'game' },
timestamp: Date.now()
};
await adapter.publish(event);
expect(handler).toHaveBeenCalledWith(event);
});
it('should support wildcard subscription', async () => {
const handler = vi.fn();
await adapter.subscribe('*', handler);
await adapter.publish({
type: 'room:created',
serverId: 'server-1',
payload: {},
timestamp: Date.now()
});
await adapter.publish({
type: 'server:online',
serverId: 'server-1',
payload: {},
timestamp: Date.now()
});
expect(handler).toHaveBeenCalledTimes(2);
});
it('should unsubscribe correctly', async () => {
const handler = vi.fn();
const unsub = await adapter.subscribe('room:created', handler);
unsub();
await adapter.publish({
type: 'room:created',
serverId: 'server-1',
payload: {},
timestamp: Date.now()
});
expect(handler).not.toHaveBeenCalled();
});
it('should send to room', async () => {
await adapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
await adapter.registerRoom({
roomId: 'room-1',
roomType: 'game',
serverId: 'server-1',
serverAddress: 'localhost:3000',
playerCount: 0,
maxPlayers: 4,
isLocked: false,
metadata: {},
createdAt: Date.now(),
updatedAt: Date.now()
});
const handler = vi.fn();
await adapter.subscribe('room:message', handler);
await adapter.sendToRoom('room-1', 'chat', { text: 'hello' }, 'player-1');
expect(handler).toHaveBeenCalledWith(
expect.objectContaining({
type: 'room:message',
roomId: 'room-1',
payload: {
messageType: 'chat',
data: { text: 'hello' },
playerId: 'player-1'
}
})
);
});
});
// =========================================================================
// 分布式锁 | Distributed Locks
// =========================================================================
describe('distributed locks', () => {
it('should acquire and release lock', async () => {
const acquired = await adapter.acquireLock('test-lock', 5000);
expect(acquired).toBe(true);
await adapter.releaseLock('test-lock');
const acquiredAgain = await adapter.acquireLock('test-lock', 5000);
expect(acquiredAgain).toBe(true);
});
it('should fail to acquire held lock', async () => {
await adapter.acquireLock('test-lock', 5000);
const acquiredAgain = await adapter.acquireLock('test-lock', 5000);
expect(acquiredAgain).toBe(false);
});
it('should extend lock', async () => {
await adapter.acquireLock('test-lock', 100);
const extended = await adapter.extendLock('test-lock', 5000);
expect(extended).toBe(true);
});
it('should fail to extend non-existent lock', async () => {
const extended = await adapter.extendLock('non-existent', 5000);
expect(extended).toBe(false);
});
it('should fail to release lock without token', async () => {
// 没有获取锁就释放,应该什么都不做
await adapter.releaseLock('test-lock');
// 仍然可以获取锁
const acquired = await adapter.acquireLock('test-lock', 5000);
expect(acquired).toBe(true);
});
});
// =========================================================================
// 错误处理 | Error Handling
// =========================================================================
describe('error handling', () => {
it('should throw when not connected', async () => {
const disconnected = new RedisAdapter({
factory: () => createMockRedisClient()
});
await expect(disconnected.registerServer({} as ServerRegistration))
.rejects.toThrow('RedisAdapter is not connected');
});
});
// =========================================================================
// 配置 | Configuration
// =========================================================================
describe('configuration', () => {
it('should use default prefix', async () => {
const testMockClient = createMockRedisClient();
const defaultAdapter = new RedisAdapter({
factory: () => testMockClient
});
await defaultAdapter.connect();
await defaultAdapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
// hmset 应该被调用key 应该包含默认前缀 'dist:'
expect(testMockClient.hmset).toHaveBeenCalled();
await defaultAdapter.disconnect();
});
it('should use custom prefix', async () => {
const testMockClient = createMockRedisClient();
const customAdapter = new RedisAdapter({
factory: () => testMockClient,
prefix: 'game:'
});
await customAdapter.connect();
await customAdapter.registerServer({
serverId: 'server-1',
address: 'localhost',
port: 3000,
roomCount: 0,
playerCount: 0,
capacity: 100,
status: 'online',
lastHeartbeat: Date.now()
});
// hmset 应该被调用
expect(testMockClient.hmset).toHaveBeenCalled();
await customAdapter.disconnect();
});
});
});

View File

@@ -0,0 +1,257 @@
/**
* @zh 分布式适配器接口
* @en Distributed adapter interface
*
* @zh 定义分布式房间系统的存储和通信层抽象
* @en Defines the storage and communication layer abstraction for distributed room system
*/
import type {
ServerRegistration,
RoomRegistration,
RoomQuery,
RoomSnapshot,
DistributedEvent,
DistributedEventType,
DistributedEventHandler,
Unsubscribe
} from '../types.js';
/**
* @zh 分布式适配器接口
* @en Distributed adapter interface
*
* @zh 所有分布式后端Redis、消息队列等都需要实现此接口
* @en All distributed backends (Redis, message queue, etc.) must implement this interface
*/
export interface IDistributedAdapter {
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
/**
* @zh 连接到分布式后端
* @en Connect to distributed backend
*/
connect(): Promise<void>;
/**
* @zh 断开连接
* @en Disconnect from backend
*/
disconnect(): Promise<void>;
/**
* @zh 检查是否已连接
* @en Check if connected
*/
isConnected(): boolean;
// =========================================================================
// 服务器注册 | Server Registry
// =========================================================================
/**
* @zh 注册服务器
* @en Register server
*
* @param server - 服务器注册信息 | Server registration info
*/
registerServer(server: ServerRegistration): Promise<void>;
/**
* @zh 注销服务器
* @en Unregister server
*
* @param serverId - 服务器 ID | Server ID
*/
unregisterServer(serverId: string): Promise<void>;
/**
* @zh 更新服务器心跳
* @en Update server heartbeat
*
* @param serverId - 服务器 ID | Server ID
*/
heartbeat(serverId: string): Promise<void>;
/**
* @zh 获取所有在线服务器
* @en Get all online servers
*/
getServers(): Promise<ServerRegistration[]>;
/**
* @zh 获取指定服务器
* @en Get specific server
*
* @param serverId - 服务器 ID | Server ID
*/
getServer(serverId: string): Promise<ServerRegistration | null>;
/**
* @zh 更新服务器信息
* @en Update server info
*
* @param serverId - 服务器 ID | Server ID
* @param updates - 更新内容 | Updates
*/
updateServer(serverId: string, updates: Partial<ServerRegistration>): Promise<void>;
// =========================================================================
// 房间注册 | Room Registry
// =========================================================================
/**
* @zh 注册房间
* @en Register room
*
* @param room - 房间注册信息 | Room registration info
*/
registerRoom(room: RoomRegistration): Promise<void>;
/**
* @zh 注销房间
* @en Unregister room
*
* @param roomId - 房间 ID | Room ID
*/
unregisterRoom(roomId: string): Promise<void>;
/**
* @zh 更新房间信息
* @en Update room info
*
* @param roomId - 房间 ID | Room ID
* @param updates - 更新内容 | Updates
*/
updateRoom(roomId: string, updates: Partial<RoomRegistration>): Promise<void>;
/**
* @zh 获取房间信息
* @en Get room info
*
* @param roomId - 房间 ID | Room ID
*/
getRoom(roomId: string): Promise<RoomRegistration | null>;
/**
* @zh 查询房间列表
* @en Query room list
*
* @param query - 查询条件 | Query criteria
*/
queryRooms(query: RoomQuery): Promise<RoomRegistration[]>;
/**
* @zh 获取指定类型的可用房间(用于 joinOrCreate
* @en Get available room of type (for joinOrCreate)
*
* @param roomType - 房间类型 | Room type
*/
findAvailableRoom(roomType: string): Promise<RoomRegistration | null>;
/**
* @zh 获取服务器的所有房间
* @en Get all rooms of a server
*
* @param serverId - 服务器 ID | Server ID
*/
getRoomsByServer(serverId: string): Promise<RoomRegistration[]>;
// =========================================================================
// 房间状态 | Room State
// =========================================================================
/**
* @zh 保存房间状态快照
* @en Save room state snapshot
*
* @param snapshot - 状态快照 | State snapshot
*/
saveSnapshot(snapshot: RoomSnapshot): Promise<void>;
/**
* @zh 加载房间状态快照
* @en Load room state snapshot
*
* @param roomId - 房间 ID | Room ID
*/
loadSnapshot(roomId: string): Promise<RoomSnapshot | null>;
/**
* @zh 删除房间状态
* @en Delete room state
*
* @param roomId - 房间 ID | Room ID
*/
deleteSnapshot(roomId: string): Promise<void>;
// =========================================================================
// 发布/订阅 | Pub/Sub
// =========================================================================
/**
* @zh 发布事件
* @en Publish event
*
* @param event - 分布式事件 | Distributed event
*/
publish(event: DistributedEvent): Promise<void>;
/**
* @zh 订阅事件
* @en Subscribe to events
*
* @param pattern - 事件类型模式(支持 '*' 通配符) | Event type pattern (supports '*' wildcard)
* @param handler - 事件处理器 | Event handler
* @returns 取消订阅函数 | Unsubscribe function
*/
subscribe(
pattern: DistributedEventType | '*',
handler: DistributedEventHandler
): Promise<Unsubscribe>;
/**
* @zh 向特定房间发送消息(跨服务器)
* @en Send message to specific room (cross-server)
*
* @param roomId - 房间 ID | Room ID
* @param messageType - 消息类型 | Message type
* @param data - 消息数据 | Message data
* @param playerId - 发送者玩家 ID可选 | Sender player ID (optional)
*/
sendToRoom(roomId: string, messageType: string, data: unknown, playerId?: string): Promise<void>;
// =========================================================================
// 分布式锁 | Distributed Lock
// =========================================================================
/**
* @zh 获取分布式锁
* @en Acquire distributed lock
*
* @param key - 锁的键名 | Lock key
* @param ttlMs - 锁的生存时间(毫秒) | Lock TTL (ms)
* @returns 是否成功获取锁 | Whether lock was acquired
*/
acquireLock(key: string, ttlMs: number): Promise<boolean>;
/**
* @zh 释放分布式锁
* @en Release distributed lock
*
* @param key - 锁的键名 | Lock key
*/
releaseLock(key: string): Promise<void>;
/**
* @zh 扩展锁的生存时间
* @en Extend lock TTL
*
* @param key - 锁的键名 | Lock key
* @param ttlMs - 新的生存时间(毫秒) | New TTL (ms)
* @returns 是否成功扩展 | Whether extension was successful
*/
extendLock(key: string, ttlMs: number): Promise<boolean>;
}

View File

@@ -0,0 +1,503 @@
/**
* @zh 内存分布式适配器
* @en Memory distributed adapter
*
* @zh 用于单机模式和测试的内存实现。所有数据存储在进程内存中。
* @en In-memory implementation for single-server mode and testing. All data stored in process memory.
*/
import type { IDistributedAdapter } from './IDistributedAdapter.js';
import type {
ServerRegistration,
RoomRegistration,
RoomQuery,
RoomSnapshot,
DistributedEvent,
DistributedEventType,
DistributedEventHandler,
Unsubscribe
} from '../types.js';
/**
* @zh 内存适配器配置
* @en Memory adapter configuration
*/
export interface MemoryAdapterConfig {
/**
* @zh 服务器 TTL毫秒超时后视为离线
* @en Server TTL (ms), considered offline after timeout
* @default 15000
*/
serverTtl?: number;
/**
* @zh 是否启用 TTL 检查
* @en Whether to enable TTL check
* @default true
*/
enableTtlCheck?: boolean;
/**
* @zh TTL 检查间隔(毫秒)
* @en TTL check interval (ms)
* @default 5000
*/
ttlCheckInterval?: number;
}
/**
* @zh 内存分布式适配器
* @en Memory distributed adapter
*/
export class MemoryAdapter implements IDistributedAdapter {
private readonly _config: Required<MemoryAdapterConfig>;
private _connected = false;
// 存储
private readonly _servers = new Map<string, ServerRegistration>();
private readonly _rooms = new Map<string, RoomRegistration>();
private readonly _snapshots = new Map<string, RoomSnapshot>();
private readonly _locks = new Map<string, { owner: string; expireAt: number }>();
// 事件订阅
private readonly _subscribers = new Map<string, Set<DistributedEventHandler>>();
private _subscriberId = 0;
// TTL 检查定时器
private _ttlCheckTimer: ReturnType<typeof setInterval> | null = null;
constructor(config: MemoryAdapterConfig = {}) {
this._config = {
serverTtl: 15000,
enableTtlCheck: true,
ttlCheckInterval: 5000,
...config
};
}
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
async connect(): Promise<void> {
if (this._connected) return;
this._connected = true;
if (this._config.enableTtlCheck) {
this._ttlCheckTimer = setInterval(
() => this._checkServerTtl(),
this._config.ttlCheckInterval
);
}
}
async disconnect(): Promise<void> {
if (!this._connected) return;
if (this._ttlCheckTimer) {
clearInterval(this._ttlCheckTimer);
this._ttlCheckTimer = null;
}
this._connected = false;
this._servers.clear();
this._rooms.clear();
this._snapshots.clear();
this._locks.clear();
this._subscribers.clear();
}
isConnected(): boolean {
return this._connected;
}
// =========================================================================
// 服务器注册 | Server Registry
// =========================================================================
async registerServer(server: ServerRegistration): Promise<void> {
this._ensureConnected();
this._servers.set(server.serverId, { ...server, lastHeartbeat: Date.now() });
await this.publish({
type: 'server:online',
serverId: server.serverId,
payload: server,
timestamp: Date.now()
});
}
async unregisterServer(serverId: string): Promise<void> {
this._ensureConnected();
const server = this._servers.get(serverId);
if (!server) return;
this._servers.delete(serverId);
// 清理该服务器的所有房间
for (const [roomId, room] of this._rooms) {
if (room.serverId === serverId) {
this._rooms.delete(roomId);
}
}
await this.publish({
type: 'server:offline',
serverId,
payload: { serverId },
timestamp: Date.now()
});
}
async heartbeat(serverId: string): Promise<void> {
this._ensureConnected();
const server = this._servers.get(serverId);
if (server) {
server.lastHeartbeat = Date.now();
}
}
async getServers(): Promise<ServerRegistration[]> {
this._ensureConnected();
return Array.from(this._servers.values()).filter(s => s.status === 'online');
}
async getServer(serverId: string): Promise<ServerRegistration | null> {
this._ensureConnected();
return this._servers.get(serverId) ?? null;
}
async updateServer(serverId: string, updates: Partial<ServerRegistration>): Promise<void> {
this._ensureConnected();
const server = this._servers.get(serverId);
if (server) {
Object.assign(server, updates);
}
}
// =========================================================================
// 房间注册 | Room Registry
// =========================================================================
async registerRoom(room: RoomRegistration): Promise<void> {
this._ensureConnected();
this._rooms.set(room.roomId, { ...room });
// 更新服务器的房间计数
const server = this._servers.get(room.serverId);
if (server) {
server.roomCount = this._countRoomsByServer(room.serverId);
}
await this.publish({
type: 'room:created',
serverId: room.serverId,
roomId: room.roomId,
payload: { roomType: room.roomType },
timestamp: Date.now()
});
}
async unregisterRoom(roomId: string): Promise<void> {
this._ensureConnected();
const room = this._rooms.get(roomId);
if (!room) return;
this._rooms.delete(roomId);
this._snapshots.delete(roomId);
// 更新服务器的房间计数
const server = this._servers.get(room.serverId);
if (server) {
server.roomCount = this._countRoomsByServer(room.serverId);
}
await this.publish({
type: 'room:disposed',
serverId: room.serverId,
roomId,
payload: {},
timestamp: Date.now()
});
}
async updateRoom(roomId: string, updates: Partial<RoomRegistration>): Promise<void> {
this._ensureConnected();
const room = this._rooms.get(roomId);
if (!room) return;
Object.assign(room, updates, { updatedAt: Date.now() });
await this.publish({
type: 'room:updated',
serverId: room.serverId,
roomId,
payload: updates,
timestamp: Date.now()
});
}
async getRoom(roomId: string): Promise<RoomRegistration | null> {
this._ensureConnected();
return this._rooms.get(roomId) ?? null;
}
async queryRooms(query: RoomQuery): Promise<RoomRegistration[]> {
this._ensureConnected();
let results = Array.from(this._rooms.values());
// 按类型过滤
if (query.roomType) {
results = results.filter(r => r.roomType === query.roomType);
}
// 按空位过滤
if (query.hasSpace) {
results = results.filter(r => r.playerCount < r.maxPlayers);
}
// 按锁定状态过滤
if (query.notLocked) {
results = results.filter(r => !r.isLocked);
}
// 按元数据过滤
if (query.metadata) {
results = results.filter(r => {
for (const [key, value] of Object.entries(query.metadata!)) {
if (r.metadata[key] !== value) {
return false;
}
}
return true;
});
}
// 分页
if (query.offset) {
results = results.slice(query.offset);
}
if (query.limit) {
results = results.slice(0, query.limit);
}
return results;
}
async findAvailableRoom(roomType: string): Promise<RoomRegistration | null> {
const rooms = await this.queryRooms({
roomType,
hasSpace: true,
notLocked: true,
limit: 1
});
return rooms[0] ?? null;
}
async getRoomsByServer(serverId: string): Promise<RoomRegistration[]> {
this._ensureConnected();
return Array.from(this._rooms.values()).filter(r => r.serverId === serverId);
}
// =========================================================================
// 房间状态 | Room State
// =========================================================================
async saveSnapshot(snapshot: RoomSnapshot): Promise<void> {
this._ensureConnected();
this._snapshots.set(snapshot.roomId, { ...snapshot });
}
async loadSnapshot(roomId: string): Promise<RoomSnapshot | null> {
this._ensureConnected();
return this._snapshots.get(roomId) ?? null;
}
async deleteSnapshot(roomId: string): Promise<void> {
this._ensureConnected();
this._snapshots.delete(roomId);
}
// =========================================================================
// 发布/订阅 | Pub/Sub
// =========================================================================
async publish(event: DistributedEvent): Promise<void> {
this._ensureConnected();
// 通知所有匹配的订阅者
const wildcardHandlers = this._subscribers.get('*') ?? new Set();
const typeHandlers = this._subscribers.get(event.type) ?? new Set();
for (const handler of wildcardHandlers) {
try {
handler(event);
} catch (error) {
console.error('Event handler error:', error);
}
}
for (const handler of typeHandlers) {
try {
handler(event);
} catch (error) {
console.error('Event handler error:', error);
}
}
}
async subscribe(
pattern: DistributedEventType | '*',
handler: DistributedEventHandler
): Promise<Unsubscribe> {
this._ensureConnected();
if (!this._subscribers.has(pattern)) {
this._subscribers.set(pattern, new Set());
}
this._subscribers.get(pattern)!.add(handler);
return () => {
const handlers = this._subscribers.get(pattern);
if (handlers) {
handlers.delete(handler);
if (handlers.size === 0) {
this._subscribers.delete(pattern);
}
}
};
}
async sendToRoom(
roomId: string,
messageType: string,
data: unknown,
playerId?: string
): Promise<void> {
this._ensureConnected();
const room = this._rooms.get(roomId);
if (!room) return;
await this.publish({
type: 'room:message',
serverId: room.serverId,
roomId,
payload: { messageType, data, playerId },
timestamp: Date.now()
});
}
// =========================================================================
// 分布式锁 | Distributed Lock
// =========================================================================
async acquireLock(key: string, ttlMs: number): Promise<boolean> {
this._ensureConnected();
const now = Date.now();
const existing = this._locks.get(key);
// 检查锁是否已过期
if (existing && existing.expireAt > now) {
return false;
}
// 获取锁
const owner = `lock_${++this._subscriberId}`;
this._locks.set(key, { owner, expireAt: now + ttlMs });
return true;
}
async releaseLock(key: string): Promise<void> {
this._ensureConnected();
this._locks.delete(key);
}
async extendLock(key: string, ttlMs: number): Promise<boolean> {
this._ensureConnected();
const lock = this._locks.get(key);
if (!lock) return false;
lock.expireAt = Date.now() + ttlMs;
return true;
}
// =========================================================================
// 私有方法 | Private Methods
// =========================================================================
private _ensureConnected(): void {
if (!this._connected) {
throw new Error('MemoryAdapter is not connected');
}
}
private _countRoomsByServer(serverId: string): number {
let count = 0;
for (const room of this._rooms.values()) {
if (room.serverId === serverId) {
count++;
}
}
return count;
}
private async _checkServerTtl(): Promise<void> {
const now = Date.now();
const expiredServers: string[] = [];
for (const [serverId, server] of this._servers) {
if (server.status === 'online' && now - server.lastHeartbeat > this._config.serverTtl) {
server.status = 'offline';
expiredServers.push(serverId);
}
}
// 发布服务器离线事件
for (const serverId of expiredServers) {
await this.publish({
type: 'server:offline',
serverId,
payload: { serverId, reason: 'heartbeat_timeout' },
timestamp: now
});
}
}
// =========================================================================
// 测试辅助方法 | Test Helper Methods
// =========================================================================
/**
* @zh 清除所有数据(仅用于测试)
* @en Clear all data (for testing only)
*/
_clear(): void {
this._servers.clear();
this._rooms.clear();
this._snapshots.clear();
this._locks.clear();
}
/**
* @zh 获取内部状态(仅用于测试)
* @en Get internal state (for testing only)
*/
_getState(): {
servers: Map<string, ServerRegistration>;
rooms: Map<string, RoomRegistration>;
snapshots: Map<string, RoomSnapshot>;
} {
return {
servers: this._servers,
rooms: this._rooms,
snapshots: this._snapshots
};
}
}

View File

@@ -0,0 +1,789 @@
/**
* @zh Redis 分布式适配器
* @en Redis distributed adapter
*
* @zh 基于 Redis 的分布式房间适配器,支持 Pub/Sub、分布式锁和状态持久化
* @en Redis-based distributed room adapter with Pub/Sub, distributed lock and state persistence
*/
import type { IDistributedAdapter } from './IDistributedAdapter.js';
import type {
ServerRegistration,
RoomRegistration,
RoomQuery,
RoomSnapshot,
DistributedEvent,
DistributedEventType,
DistributedEventHandler,
Unsubscribe
} from '../types.js';
/**
* @zh Redis 客户端接口(兼容 ioredis
* @en Redis client interface (compatible with ioredis)
*/
export interface RedisClient {
// 基础操作
get(key: string): Promise<string | null>;
set(key: string, value: string, ...args: (string | number)[]): Promise<string | null>;
del(...keys: string[]): Promise<number>;
expire(key: string, seconds: number): Promise<number>;
ttl(key: string): Promise<number>;
// Hash 操作
hget(key: string, field: string): Promise<string | null>;
hset(key: string, ...args: (string | number | Buffer)[]): Promise<number>;
hdel(key: string, ...fields: string[]): Promise<number>;
hgetall(key: string): Promise<Record<string, string>>;
hmset(key: string, ...args: (string | number | Buffer)[]): Promise<'OK'>;
// Set 操作
sadd(key: string, ...members: string[]): Promise<number>;
srem(key: string, ...members: string[]): Promise<number>;
smembers(key: string): Promise<string[]>;
// Pub/Sub
publish(channel: string, message: string): Promise<number>;
subscribe(channel: string): Promise<number>;
psubscribe(pattern: string): Promise<number>;
unsubscribe(...channels: string[]): Promise<number>;
punsubscribe(...patterns: string[]): Promise<number>;
// 事件(重载支持 message 事件的类型安全)
on(event: 'message', callback: (channel: string, message: string) => void): void;
on(event: 'pmessage', callback: (pattern: string, channel: string, message: string) => void): void;
on(event: string, callback: (...args: unknown[]) => void): void;
off(event: 'message', callback: (channel: string, message: string) => void): void;
off(event: 'pmessage', callback: (pattern: string, channel: string, message: string) => void): void;
off(event: string, callback: (...args: unknown[]) => void): void;
// Lua 脚本
eval(script: string, numkeys: number, ...args: (string | number)[]): Promise<unknown>;
// 连接
duplicate(): RedisClient;
quit(): Promise<'OK'>;
disconnect(): void;
}
/**
* @zh Redis 连接工厂
* @en Redis connection factory
*/
export type RedisClientFactory = () => RedisClient | Promise<RedisClient>;
/**
* @zh Redis 适配器配置
* @en Redis adapter configuration
*/
export interface RedisAdapterConfig {
/**
* @zh Redis 客户端工厂(惰性连接)
* @en Redis client factory (lazy connection)
*
* @example
* ```typescript
* import Redis from 'ioredis'
* const adapter = new RedisAdapter({
* factory: () => new Redis('redis://localhost:6379')
* })
* ```
*/
factory: RedisClientFactory;
/**
* @zh 键前缀
* @en Key prefix
* @default 'dist:'
*/
prefix?: string;
/**
* @zh 服务器 TTL
* @en Server TTL (seconds)
* @default 30
*/
serverTtl?: number;
/**
* @zh 房间 TTL0 = 永不过期
* @en Room TTL (seconds), 0 = never expire
* @default 0
*/
roomTtl?: number;
/**
* @zh 快照 TTL
* @en Snapshot TTL (seconds)
* @default 86400 (24 hours)
*/
snapshotTtl?: number;
/**
* @zh Pub/Sub 频道名
* @en Pub/Sub channel name
* @default 'distributed:events'
*/
channel?: string;
}
// Lua 脚本:安全释放锁
const RELEASE_LOCK_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
// Lua 脚本:扩展锁 TTL
const EXTEND_LOCK_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("pexpire", KEYS[1], ARGV[2])
else
return 0
end
`;
/**
* @zh Redis 分布式适配器
* @en Redis distributed adapter
*
* @example
* ```typescript
* import Redis from 'ioredis'
* import { RedisAdapter, DistributedRoomManager } from '@esengine/server'
*
* const adapter = new RedisAdapter({
* factory: () => new Redis('redis://localhost:6379'),
* prefix: 'game:'
* })
*
* const manager = new DistributedRoomManager(adapter, {
* serverId: 'server-1',
* serverAddress: 'localhost',
* serverPort: 3000
* }, sendFn)
*
* await manager.start()
* ```
*/
export class RedisAdapter implements IDistributedAdapter {
private readonly _config: Required<RedisAdapterConfig>;
private _client: RedisClient | null = null;
private _subscriber: RedisClient | null = null;
private _connected = false;
// 锁的 owner token用于安全释放
private readonly _lockTokens = new Map<string, string>();
// 事件处理器
private readonly _handlers = new Map<string, Set<DistributedEventHandler>>();
private _messageHandler: ((channel: string, message: string) => void) | null = null;
constructor(config: RedisAdapterConfig) {
this._config = {
prefix: 'dist:',
serverTtl: 30,
roomTtl: 0,
snapshotTtl: 86400,
channel: 'distributed:events',
...config,
factory: config.factory
};
}
// =========================================================================
// Key 生成器 | Key Generators
// =========================================================================
private _key(type: string, id?: string): string {
return id
? `${this._config.prefix}${type}:${id}`
: `${this._config.prefix}${type}`;
}
private _serverKey(serverId: string): string {
return this._key('server', serverId);
}
private _roomKey(roomId: string): string {
return this._key('room', roomId);
}
private _snapshotKey(roomId: string): string {
return this._key('snapshot', roomId);
}
private _lockKey(key: string): string {
return this._key('lock', key);
}
private _serversSetKey(): string {
return this._key('servers');
}
private _roomsSetKey(): string {
return this._key('rooms');
}
private _serverRoomsKey(serverId: string): string {
return this._key('server-rooms', serverId);
}
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
async connect(): Promise<void> {
if (this._connected) return;
// 创建主客户端
this._client = await this._config.factory();
// 创建订阅专用客户端
this._subscriber = this._client.duplicate();
// 设置消息处理器
this._messageHandler = (channel: string, message: string) => {
if (channel !== this._config.channel) return;
try {
const event: DistributedEvent = JSON.parse(message);
this._dispatchEvent(event);
} catch (error) {
console.error('[RedisAdapter] Failed to parse event:', error);
}
};
this._subscriber.on('message', this._messageHandler);
await this._subscriber.subscribe(this._config.channel);
this._connected = true;
}
async disconnect(): Promise<void> {
if (!this._connected) return;
// 清理订阅
if (this._subscriber) {
if (this._messageHandler) {
this._subscriber.off('message', this._messageHandler);
}
await this._subscriber.unsubscribe(this._config.channel);
this._subscriber.disconnect();
this._subscriber = null;
}
// 关闭主客户端
if (this._client) {
await this._client.quit();
this._client = null;
}
this._handlers.clear();
this._lockTokens.clear();
this._connected = false;
}
isConnected(): boolean {
return this._connected;
}
private _ensureConnected(): RedisClient {
if (!this._connected || !this._client) {
throw new Error('RedisAdapter is not connected');
}
return this._client;
}
// =========================================================================
// 服务器注册 | Server Registry
// =========================================================================
async registerServer(server: ServerRegistration): Promise<void> {
const client = this._ensureConnected();
const key = this._serverKey(server.serverId);
// 存储服务器信息
await client.hmset(
key,
'serverId', server.serverId,
'address', server.address,
'port', String(server.port),
'roomCount', String(server.roomCount),
'playerCount', String(server.playerCount),
'capacity', String(server.capacity),
'status', server.status,
'lastHeartbeat', String(Date.now()),
'metadata', JSON.stringify(server.metadata ?? {})
);
// 设置 TTL
await client.expire(key, this._config.serverTtl);
// 添加到服务器集合
await client.sadd(this._serversSetKey(), server.serverId);
// 发布事件
await this.publish({
type: 'server:online',
serverId: server.serverId,
payload: server,
timestamp: Date.now()
});
}
async unregisterServer(serverId: string): Promise<void> {
const client = this._ensureConnected();
const key = this._serverKey(serverId);
// 删除服务器信息
await client.del(key);
// 从服务器集合移除
await client.srem(this._serversSetKey(), serverId);
// 删除该服务器的所有房间
const roomIds = await client.smembers(this._serverRoomsKey(serverId));
for (const roomId of roomIds) {
await this.unregisterRoom(roomId);
}
await client.del(this._serverRoomsKey(serverId));
// 发布事件
await this.publish({
type: 'server:offline',
serverId,
payload: { serverId },
timestamp: Date.now()
});
}
async heartbeat(serverId: string): Promise<void> {
const client = this._ensureConnected();
const key = this._serverKey(serverId);
// 更新心跳时间并刷新 TTL
await client.hset(key, 'lastHeartbeat', String(Date.now()));
await client.expire(key, this._config.serverTtl);
}
async getServers(): Promise<ServerRegistration[]> {
const client = this._ensureConnected();
const serverIds = await client.smembers(this._serversSetKey());
const servers: ServerRegistration[] = [];
for (const serverId of serverIds) {
const server = await this.getServer(serverId);
if (server && server.status === 'online') {
servers.push(server);
}
}
return servers;
}
async getServer(serverId: string): Promise<ServerRegistration | null> {
const client = this._ensureConnected();
const key = this._serverKey(serverId);
const data = await client.hgetall(key);
if (!data || !data.serverId) return null;
return {
serverId: data.serverId,
address: data.address,
port: parseInt(data.port, 10),
roomCount: parseInt(data.roomCount, 10),
playerCount: parseInt(data.playerCount, 10),
capacity: parseInt(data.capacity, 10),
status: data.status as ServerRegistration['status'],
lastHeartbeat: parseInt(data.lastHeartbeat, 10),
metadata: data.metadata ? JSON.parse(data.metadata) : {}
};
}
async updateServer(serverId: string, updates: Partial<ServerRegistration>): Promise<void> {
const client = this._ensureConnected();
const key = this._serverKey(serverId);
const args: (string | number)[] = [];
if (updates.address !== undefined) args.push('address', updates.address);
if (updates.port !== undefined) args.push('port', String(updates.port));
if (updates.roomCount !== undefined) args.push('roomCount', String(updates.roomCount));
if (updates.playerCount !== undefined) args.push('playerCount', String(updates.playerCount));
if (updates.capacity !== undefined) args.push('capacity', String(updates.capacity));
if (updates.status !== undefined) args.push('status', updates.status);
if (updates.metadata !== undefined) args.push('metadata', JSON.stringify(updates.metadata));
if (args.length > 0) {
await client.hmset(key, ...args);
}
// 如果是 draining 状态,发布事件
if (updates.status === 'draining') {
await this.publish({
type: 'server:draining',
serverId,
payload: { serverId },
timestamp: Date.now()
});
}
}
// =========================================================================
// 房间注册 | Room Registry
// =========================================================================
async registerRoom(room: RoomRegistration): Promise<void> {
const client = this._ensureConnected();
const key = this._roomKey(room.roomId);
// 存储房间信息
await client.hmset(
key,
'roomId', room.roomId,
'roomType', room.roomType,
'serverId', room.serverId,
'serverAddress', room.serverAddress,
'playerCount', String(room.playerCount),
'maxPlayers', String(room.maxPlayers),
'isLocked', room.isLocked ? '1' : '0',
'metadata', JSON.stringify(room.metadata),
'createdAt', String(room.createdAt),
'updatedAt', String(room.updatedAt)
);
// 设置 TTL如果配置了
if (this._config.roomTtl > 0) {
await client.expire(key, this._config.roomTtl);
}
// 添加到房间集合
await client.sadd(this._roomsSetKey(), room.roomId);
// 添加到服务器的房间列表
await client.sadd(this._serverRoomsKey(room.serverId), room.roomId);
// 更新服务器房间计数
const roomCount = (await client.smembers(this._serverRoomsKey(room.serverId))).length;
await client.hset(this._serverKey(room.serverId), 'roomCount', String(roomCount));
// 发布事件
await this.publish({
type: 'room:created',
serverId: room.serverId,
roomId: room.roomId,
payload: { roomType: room.roomType },
timestamp: Date.now()
});
}
async unregisterRoom(roomId: string): Promise<void> {
const client = this._ensureConnected();
const room = await this.getRoom(roomId);
if (!room) return;
const key = this._roomKey(roomId);
// 删除房间信息
await client.del(key);
// 从房间集合移除
await client.srem(this._roomsSetKey(), roomId);
// 从服务器的房间列表移除
await client.srem(this._serverRoomsKey(room.serverId), roomId);
// 更新服务器房间计数
const roomCount = (await client.smembers(this._serverRoomsKey(room.serverId))).length;
await client.hset(this._serverKey(room.serverId), 'roomCount', String(roomCount));
// 删除快照
await this.deleteSnapshot(roomId);
// 发布事件
await this.publish({
type: 'room:disposed',
serverId: room.serverId,
roomId,
payload: {},
timestamp: Date.now()
});
}
async updateRoom(roomId: string, updates: Partial<RoomRegistration>): Promise<void> {
const client = this._ensureConnected();
const room = await this.getRoom(roomId);
if (!room) return;
const key = this._roomKey(roomId);
const args: (string | number)[] = [];
if (updates.playerCount !== undefined) args.push('playerCount', String(updates.playerCount));
if (updates.maxPlayers !== undefined) args.push('maxPlayers', String(updates.maxPlayers));
if (updates.isLocked !== undefined) args.push('isLocked', updates.isLocked ? '1' : '0');
if (updates.metadata !== undefined) args.push('metadata', JSON.stringify(updates.metadata));
args.push('updatedAt', String(Date.now()));
if (args.length > 0) {
await client.hmset(key, ...args);
}
// 发布更新事件
await this.publish({
type: 'room:updated',
serverId: room.serverId,
roomId,
payload: updates,
timestamp: Date.now()
});
// 如果锁定状态变化,发布专门事件
if (updates.isLocked !== undefined) {
await this.publish({
type: updates.isLocked ? 'room:locked' : 'room:unlocked',
serverId: room.serverId,
roomId,
payload: {},
timestamp: Date.now()
});
}
}
async getRoom(roomId: string): Promise<RoomRegistration | null> {
const client = this._ensureConnected();
const key = this._roomKey(roomId);
const data = await client.hgetall(key);
if (!data || !data.roomId) return null;
return {
roomId: data.roomId,
roomType: data.roomType,
serverId: data.serverId,
serverAddress: data.serverAddress,
playerCount: parseInt(data.playerCount, 10),
maxPlayers: parseInt(data.maxPlayers, 10),
isLocked: data.isLocked === '1',
metadata: data.metadata ? JSON.parse(data.metadata) : {},
createdAt: parseInt(data.createdAt, 10),
updatedAt: parseInt(data.updatedAt, 10)
};
}
async queryRooms(query: RoomQuery): Promise<RoomRegistration[]> {
const client = this._ensureConnected();
const roomIds = await client.smembers(this._roomsSetKey());
let results: RoomRegistration[] = [];
// 获取所有房间
for (const roomId of roomIds) {
const room = await this.getRoom(roomId);
if (room) results.push(room);
}
// 过滤
if (query.roomType) {
results = results.filter(r => r.roomType === query.roomType);
}
if (query.hasSpace) {
results = results.filter(r => r.playerCount < r.maxPlayers);
}
if (query.notLocked) {
results = results.filter(r => !r.isLocked);
}
if (query.metadata) {
results = results.filter(r => {
for (const [key, value] of Object.entries(query.metadata!)) {
if (r.metadata[key] !== value) return false;
}
return true;
});
}
// 分页
if (query.offset) {
results = results.slice(query.offset);
}
if (query.limit) {
results = results.slice(0, query.limit);
}
return results;
}
async findAvailableRoom(roomType: string): Promise<RoomRegistration | null> {
const rooms = await this.queryRooms({
roomType,
hasSpace: true,
notLocked: true,
limit: 1
});
return rooms[0] ?? null;
}
async getRoomsByServer(serverId: string): Promise<RoomRegistration[]> {
const client = this._ensureConnected();
const roomIds = await client.smembers(this._serverRoomsKey(serverId));
const rooms: RoomRegistration[] = [];
for (const roomId of roomIds) {
const room = await this.getRoom(roomId);
if (room) rooms.push(room);
}
return rooms;
}
// =========================================================================
// 房间状态 | Room State
// =========================================================================
async saveSnapshot(snapshot: RoomSnapshot): Promise<void> {
const client = this._ensureConnected();
const key = this._snapshotKey(snapshot.roomId);
await client.set(key, JSON.stringify(snapshot));
await client.expire(key, this._config.snapshotTtl);
}
async loadSnapshot(roomId: string): Promise<RoomSnapshot | null> {
const client = this._ensureConnected();
const key = this._snapshotKey(roomId);
const data = await client.get(key);
return data ? JSON.parse(data) : null;
}
async deleteSnapshot(roomId: string): Promise<void> {
const client = this._ensureConnected();
const key = this._snapshotKey(roomId);
await client.del(key);
}
// =========================================================================
// 发布/订阅 | Pub/Sub
// =========================================================================
async publish(event: DistributedEvent): Promise<void> {
const client = this._ensureConnected();
await client.publish(this._config.channel, JSON.stringify(event));
}
async subscribe(
pattern: DistributedEventType | '*',
handler: DistributedEventHandler
): Promise<Unsubscribe> {
if (!this._handlers.has(pattern)) {
this._handlers.set(pattern, new Set());
}
this._handlers.get(pattern)!.add(handler);
return () => {
const handlers = this._handlers.get(pattern);
if (handlers) {
handlers.delete(handler);
if (handlers.size === 0) {
this._handlers.delete(pattern);
}
}
};
}
async sendToRoom(
roomId: string,
messageType: string,
data: unknown,
playerId?: string
): Promise<void> {
const room = await this.getRoom(roomId);
if (!room) return;
await this.publish({
type: 'room:message',
serverId: room.serverId,
roomId,
payload: { messageType, data, playerId },
timestamp: Date.now()
});
}
private _dispatchEvent(event: DistributedEvent): void {
// 通知通配符订阅者
const wildcardHandlers = this._handlers.get('*');
if (wildcardHandlers) {
for (const handler of wildcardHandlers) {
try {
handler(event);
} catch (error) {
console.error('[RedisAdapter] Event handler error:', error);
}
}
}
// 通知类型匹配的订阅者
const typeHandlers = this._handlers.get(event.type);
if (typeHandlers) {
for (const handler of typeHandlers) {
try {
handler(event);
} catch (error) {
console.error('[RedisAdapter] Event handler error:', error);
}
}
}
}
// =========================================================================
// 分布式锁 | Distributed Lock
// =========================================================================
async acquireLock(key: string, ttlMs: number): Promise<boolean> {
const client = this._ensureConnected();
const lockKey = this._lockKey(key);
const token = `${Date.now()}_${Math.random().toString(36).substring(2)}`;
const ttlSeconds = Math.ceil(ttlMs / 1000);
const result = await client.set(lockKey, token, 'NX', 'EX', ttlSeconds);
if (result === 'OK') {
this._lockTokens.set(key, token);
return true;
}
return false;
}
async releaseLock(key: string): Promise<void> {
const client = this._ensureConnected();
const lockKey = this._lockKey(key);
const token = this._lockTokens.get(key);
if (!token) return;
await client.eval(RELEASE_LOCK_SCRIPT, 1, lockKey, token);
this._lockTokens.delete(key);
}
async extendLock(key: string, ttlMs: number): Promise<boolean> {
const client = this._ensureConnected();
const lockKey = this._lockKey(key);
const token = this._lockTokens.get(key);
if (!token) return false;
const result = await client.eval(EXTEND_LOCK_SCRIPT, 1, lockKey, token, String(ttlMs));
return result === 1;
}
}
/**
* @zh 创建 Redis 适配器
* @en Create Redis adapter
*/
export function createRedisAdapter(config: RedisAdapterConfig): RedisAdapter {
return new RedisAdapter(config);
}

View File

@@ -0,0 +1,14 @@
/**
* @zh 分布式适配器模块导出
* @en Distributed adapters module exports
*/
export type { IDistributedAdapter } from './IDistributedAdapter.js';
export { MemoryAdapter, type MemoryAdapterConfig } from './MemoryAdapter.js';
export {
RedisAdapter,
createRedisAdapter,
type RedisAdapterConfig,
type RedisClient,
type RedisClientFactory
} from './RedisAdapter.js';

View File

@@ -0,0 +1,67 @@
/**
* @zh 分布式房间支持模块
* @en Distributed room support module
*
* @zh 提供多服务器房间管理、跨服务器路由和故障转移功能。
* @en Provides multi-server room management, cross-server routing, and failover features.
*
* @example
* ```typescript
* import {
* DistributedRoomManager,
* MemoryAdapter,
* type IDistributedAdapter
* } from '@esengine/server/distributed';
*
* // 单机模式(使用内存适配器)
* const adapter = new MemoryAdapter();
* const manager = new DistributedRoomManager(adapter, {
* serverId: 'server-1',
* serverAddress: 'localhost',
* serverPort: 3000
* }, sendFn);
*
* await manager.start();
* ```
*/
// 类型导出 | Type exports
export type {
ServerStatus,
ServerRegistration,
RoomRegistration,
RoomQuery,
PlayerSnapshot,
RoomSnapshot,
DistributedEventType,
DistributedEvent,
DistributedEventHandler,
Unsubscribe,
DistributedRoomManagerConfig,
DistributedConfig,
RoutingResultType,
RoutingResult,
RoutingRequest
} from './types.js';
// 适配器导出 | Adapter exports
export type { IDistributedAdapter } from './adapters/index.js';
export { MemoryAdapter, type MemoryAdapterConfig } from './adapters/index.js';
export {
RedisAdapter,
createRedisAdapter,
type RedisAdapterConfig,
type RedisClient,
type RedisClientFactory
} from './adapters/index.js';
// 路由模块 | Routing module
export {
LoadBalancedRouter,
createLoadBalancedRouter,
type LoadBalanceStrategy,
type LoadBalancedRouterConfig
} from './routing/index.js';
// 分布式房间管理器 | Distributed room manager
export { DistributedRoomManager } from './DistributedRoomManager.js';

View File

@@ -0,0 +1,198 @@
/**
* @zh 负载均衡路由器
* @en Load-balanced router for server selection
*/
import type { ServerRegistration } from '../types.js';
/**
* @zh 负载均衡策略
* @en Load balancing strategy
*/
export type LoadBalanceStrategy =
| 'round-robin' // 轮询
| 'least-rooms' // 最少房间
| 'least-players' // 最少玩家
| 'random' // 随机
| 'weighted'; // 加权(基于剩余容量)
/**
* @zh 负载均衡路由器配置
* @en Load-balanced router configuration
*/
export interface LoadBalancedRouterConfig {
/**
* @zh 负载均衡策略
* @en Load balancing strategy
* @default 'least-rooms'
*/
strategy?: LoadBalanceStrategy;
/**
* @zh 本地服务器优先
* @en Prefer local server
* @default true
*/
preferLocal?: boolean;
/**
* @zh 本地服务器优先阈值0-1之间表示本地服务器负载低于此比例时优先使用本地
* @en Local server preference threshold (0-1, prefer local if load is below this ratio)
* @default 0.8
*/
localPreferenceThreshold?: number;
}
/**
* @zh 负载均衡路由器
* @en Load-balanced router for selecting optimal server
*
* @example
* ```typescript
* const router = new LoadBalancedRouter({
* strategy: 'least-rooms',
* preferLocal: true
* });
*
* const bestServer = router.selectServer(servers, 'server-1');
* ```
*/
export class LoadBalancedRouter {
private readonly _config: Required<LoadBalancedRouterConfig>;
private _roundRobinIndex = 0;
constructor(config: LoadBalancedRouterConfig = {}) {
this._config = {
strategy: config.strategy ?? 'least-rooms',
preferLocal: config.preferLocal ?? true,
localPreferenceThreshold: config.localPreferenceThreshold ?? 0.8
};
}
/**
* @zh 选择最优服务器
* @en Select optimal server
*
* @param servers - 可用服务器列表 | Available servers
* @param localServerId - 本地服务器 ID | Local server ID
* @returns 最优服务器,如果没有可用服务器返回 null | Optimal server, or null if none available
*/
selectServer(
servers: ServerRegistration[],
localServerId?: string
): ServerRegistration | null {
// 过滤掉不可用的服务器
const availableServers = servers.filter(s =>
s.status === 'online' && s.roomCount < s.capacity
);
if (availableServers.length === 0) {
return null;
}
// 本地服务器优先检查
if (this._config.preferLocal && localServerId) {
const localServer = availableServers.find(s => s.serverId === localServerId);
if (localServer) {
const loadRatio = localServer.roomCount / localServer.capacity;
if (loadRatio < this._config.localPreferenceThreshold) {
return localServer;
}
}
}
// 应用负载均衡策略
switch (this._config.strategy) {
case 'round-robin':
return this._selectRoundRobin(availableServers);
case 'least-rooms':
return this._selectLeastRooms(availableServers);
case 'least-players':
return this._selectLeastPlayers(availableServers);
case 'random':
return this._selectRandom(availableServers);
case 'weighted':
return this._selectWeighted(availableServers);
default:
return this._selectLeastRooms(availableServers);
}
}
/**
* @zh 选择创建房间的最优服务器
* @en Select optimal server for room creation
*/
selectServerForCreation(
servers: ServerRegistration[],
localServerId?: string
): ServerRegistration | null {
return this.selectServer(servers, localServerId);
}
/**
* @zh 重置轮询索引
* @en Reset round-robin index
*/
resetRoundRobin(): void {
this._roundRobinIndex = 0;
}
// =========================================================================
// 私有方法 | Private Methods
// =========================================================================
private _selectRoundRobin(servers: ServerRegistration[]): ServerRegistration {
const server = servers[this._roundRobinIndex % servers.length];
this._roundRobinIndex++;
return server;
}
private _selectLeastRooms(servers: ServerRegistration[]): ServerRegistration {
return servers.reduce((best, current) =>
current.roomCount < best.roomCount ? current : best
);
}
private _selectLeastPlayers(servers: ServerRegistration[]): ServerRegistration {
return servers.reduce((best, current) =>
current.playerCount < best.playerCount ? current : best
);
}
private _selectRandom(servers: ServerRegistration[]): ServerRegistration {
return servers[Math.floor(Math.random() * servers.length)];
}
private _selectWeighted(servers: ServerRegistration[]): ServerRegistration {
// 计算每个服务器的权重(剩余容量占比)
const weights = servers.map(s => ({
server: s,
weight: (s.capacity - s.roomCount) / s.capacity
}));
// 计算总权重
const totalWeight = weights.reduce((sum, w) => sum + w.weight, 0);
// 随机选择(加权)
let random = Math.random() * totalWeight;
for (const { server, weight } of weights) {
random -= weight;
if (random <= 0) {
return server;
}
}
// 兜底返回第一个
return servers[0];
}
}
/**
* @zh 创建负载均衡路由器
* @en Create load-balanced router
*/
export function createLoadBalancedRouter(
config?: LoadBalancedRouterConfig
): LoadBalancedRouter {
return new LoadBalancedRouter(config);
}

View File

@@ -0,0 +1,11 @@
/**
* @zh 路由模块导出
* @en Routing module exports
*/
export {
LoadBalancedRouter,
createLoadBalancedRouter,
type LoadBalanceStrategy,
type LoadBalancedRouterConfig
} from './LoadBalancedRouter.js';

View File

@@ -0,0 +1,496 @@
/**
* @zh 分布式房间支持类型定义
* @en Distributed room support type definitions
*/
// =============================================================================
// 服务器注册 | Server Registration
// =============================================================================
/**
* @zh 服务器状态
* @en Server status
*/
export type ServerStatus = 'online' | 'draining' | 'offline';
/**
* @zh 服务器注册信息
* @en Server registration info
*/
export interface ServerRegistration {
/**
* @zh 服务器唯一标识
* @en Server unique identifier
*/
serverId: string;
/**
* @zh 服务器地址(供客户端连接)
* @en Server address (for client connection)
*/
address: string;
/**
* @zh 服务器端口
* @en Server port
*/
port: number;
/**
* @zh 当前房间数量
* @en Current room count
*/
roomCount: number;
/**
* @zh 当前玩家数量
* @en Current player count
*/
playerCount: number;
/**
* @zh 服务器容量(最大房间数)
* @en Server capacity (max rooms)
*/
capacity: number;
/**
* @zh 服务器状态
* @en Server status
*/
status: ServerStatus;
/**
* @zh 最后心跳时间戳
* @en Last heartbeat timestamp
*/
lastHeartbeat: number;
/**
* @zh 服务器元数据
* @en Server metadata
*/
metadata?: Record<string, unknown>;
}
// =============================================================================
// 房间注册 | Room Registration
// =============================================================================
/**
* @zh 房间注册信息
* @en Room registration info
*/
export interface RoomRegistration {
/**
* @zh 房间唯一标识
* @en Room unique identifier
*/
roomId: string;
/**
* @zh 房间类型名称
* @en Room type name
*/
roomType: string;
/**
* @zh 所在服务器 ID
* @en Host server ID
*/
serverId: string;
/**
* @zh 服务器地址
* @en Server address
*/
serverAddress: string;
/**
* @zh 当前玩家数量
* @en Current player count
*/
playerCount: number;
/**
* @zh 最大玩家数量
* @en Max player count
*/
maxPlayers: number;
/**
* @zh 房间是否已锁定
* @en Whether room is locked
*/
isLocked: boolean;
/**
* @zh 房间元数据(标签、自定义属性等)
* @en Room metadata (tags, custom properties, etc.)
*/
metadata: Record<string, unknown>;
/**
* @zh 创建时间戳
* @en Creation timestamp
*/
createdAt: number;
/**
* @zh 更新时间戳
* @en Update timestamp
*/
updatedAt: number;
}
/**
* @zh 房间查询条件
* @en Room query criteria
*/
export interface RoomQuery {
/**
* @zh 房间类型
* @en Room type
*/
roomType?: string;
/**
* @zh 服务器 ID查询特定服务器上的房间
* @en Server ID (query rooms on specific server)
*/
serverId?: string;
/**
* @zh 是否有空位
* @en Whether has available space
*/
hasSpace?: boolean;
/**
* @zh 是否未锁定
* @en Whether not locked
*/
notLocked?: boolean;
/**
* @zh 元数据过滤
* @en Metadata filter
*/
metadata?: Record<string, unknown>;
/**
* @zh 返回数量限制
* @en Result limit
*/
limit?: number;
/**
* @zh 偏移量(分页)
* @en Offset (pagination)
*/
offset?: number;
}
// =============================================================================
// 房间状态快照 | Room State Snapshot
// =============================================================================
/**
* @zh 玩家快照
* @en Player snapshot
*/
export interface PlayerSnapshot {
/**
* @zh 玩家 ID
* @en Player ID
*/
id: string;
/**
* @zh 玩家数据
* @en Player data
*/
data: Record<string, unknown>;
}
/**
* @zh 房间状态快照
* @en Room state snapshot
*/
export interface RoomSnapshot<TState = unknown> {
/**
* @zh 房间 ID
* @en Room ID
*/
roomId: string;
/**
* @zh 房间类型
* @en Room type
*/
roomType: string;
/**
* @zh 房间状态
* @en Room state
*/
state: TState;
/**
* @zh 玩家列表
* @en Player list
*/
players: PlayerSnapshot[];
/**
* @zh 快照版本号
* @en Snapshot version
*/
version: number;
/**
* @zh 快照时间戳
* @en Snapshot timestamp
*/
timestamp: number;
}
// =============================================================================
// 分布式事件 | Distributed Events
// =============================================================================
/**
* @zh 分布式事件类型
* @en Distributed event types
*/
export type DistributedEventType =
| 'room:created'
| 'room:disposed'
| 'room:updated'
| 'room:locked'
| 'room:unlocked'
| 'room:message'
| 'room:migrated'
| 'player:joined'
| 'player:left'
| 'server:online'
| 'server:offline'
| 'server:draining';
/**
* @zh 分布式事件
* @en Distributed event
*/
export interface DistributedEvent<T = unknown> {
/**
* @zh 事件类型
* @en Event type
*/
type: DistributedEventType;
/**
* @zh 发送方服务器 ID
* @en Sender server ID
*/
serverId: string;
/**
* @zh 相关房间 ID可选
* @en Related room ID (optional)
*/
roomId?: string;
/**
* @zh 事件载荷
* @en Event payload
*/
payload: T;
/**
* @zh 事件时间戳
* @en Event timestamp
*/
timestamp: number;
}
/**
* @zh 事件处理器
* @en Event handler
*/
export type DistributedEventHandler<T = unknown> = (event: DistributedEvent<T>) => void;
/**
* @zh 取消订阅函数
* @en Unsubscribe function
*/
export type Unsubscribe = () => void;
// =============================================================================
// 分布式配置 | Distributed Configuration
// =============================================================================
/**
* @zh 分布式房间管理器配置
* @en Distributed room manager configuration
*/
export interface DistributedRoomManagerConfig {
/**
* @zh 服务器 ID唯一标识
* @en Server ID (unique identifier)
*/
serverId: string;
/**
* @zh 服务器公开地址(供客户端连接)
* @en Server public address (for client connection)
*/
serverAddress: string;
/**
* @zh 服务器端口
* @en Server port
*/
serverPort: number;
/**
* @zh 心跳间隔(毫秒)
* @en Heartbeat interval (ms)
* @default 5000
*/
heartbeatInterval?: number;
/**
* @zh 状态快照间隔毫秒0 = 禁用
* @en State snapshot interval (ms), 0 = disabled
* @default 30000
*/
snapshotInterval?: number;
/**
* @zh 房间迁移超时(毫秒)
* @en Room migration timeout (ms)
* @default 10000
*/
migrationTimeout?: number;
/**
* @zh 是否启用自动故障转移
* @en Whether to enable automatic failover
* @default true
*/
enableFailover?: boolean;
/**
* @zh 服务器容量(最大房间数)
* @en Server capacity (max rooms)
* @default 100
*/
capacity?: number;
/**
* @zh 服务器元数据
* @en Server metadata
*/
metadata?: Record<string, unknown>;
}
/**
* @zh 服务器分布式配置(用于 createServer
* @en Server distributed configuration (for createServer)
*/
export interface DistributedConfig extends Omit<DistributedRoomManagerConfig, 'serverPort'> {
/**
* @zh 是否启用分布式模式
* @en Whether to enable distributed mode
* @default false
*/
enabled: boolean;
/**
* @zh 分布式适配器(可选,默认使用 MemoryAdapter
* @en Distributed adapter (optional, defaults to MemoryAdapter)
*/
adapter?: import('./adapters/IDistributedAdapter.js').IDistributedAdapter;
/**
* @zh 服务器端口(可选,默认使用服务器配置的端口)
* @en Server port (optional, defaults to server config port)
*/
serverPort?: number;
}
// =============================================================================
// 路由 | Routing
// =============================================================================
/**
* @zh 路由结果类型
* @en Routing result type
*/
export type RoutingResultType = 'local' | 'redirect' | 'create' | 'unavailable';
/**
* @zh 路由结果
* @en Routing result
*/
export interface RoutingResult {
/**
* @zh 路由类型
* @en Routing type
*/
type: RoutingResultType;
/**
* @zh 目标服务器地址redirect 时)
* @en Target server address (for redirect)
*/
serverAddress?: string;
/**
* @zh 目标房间 ID
* @en Target room ID
*/
roomId?: string;
/**
* @zh 错误信息unavailable 时)
* @en Error message (for unavailable)
*/
reason?: string;
}
/**
* @zh 路由请求
* @en Routing request
*/
export interface RoutingRequest {
/**
* @zh 玩家 ID
* @en Player ID
*/
playerId: string;
/**
* @zh 房间类型joinOrCreate 时)
* @en Room type (for joinOrCreate)
*/
roomType?: string;
/**
* @zh 房间 IDjoinById 时)
* @en Room ID (for joinById)
*/
roomId?: string;
/**
* @zh 首选服务器 ID
* @en Preferred server ID
*/
preferredServerId?: string;
/**
* @zh 房间查询条件
* @en Room query criteria
*/
query?: RoomQuery;
}

View File

@@ -143,6 +143,15 @@ export abstract class ECSRoom<TState = any, TPlayerData = Record<string, unknown
constructor(ecsConfig?: Partial<ECSRoomConfig>) {
super();
// Check Core initialization
if (!Core.worldManager) {
throw new Error(
'ECSRoom requires Core.create() to be called first. ' +
'Ensure Core is initialized before creating ECSRoom instances.'
);
}
this.ecsConfig = { ...DEFAULT_ECS_CONFIG, ...ecsConfig };
this.worldId = `room_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`;
@@ -268,9 +277,12 @@ export abstract class ECSRoom<TState = any, TPlayerData = Record<string, unknown
/**
* @zh 发送二进制数据给指定玩家
* @en Send binary data to specific player
*
* @zh 使用原生 WebSocket 二进制帧发送,效率更高
* @en Uses native WebSocket binary frames, more efficient
*/
protected sendBinary(player: Player<TPlayerData>, data: Uint8Array): void {
player.send('$sync', { data: Array.from(data) });
player.sendBinary(data);
}
/**

View File

@@ -4,6 +4,19 @@
*/
import type { ApiDefinition, MsgDefinition, HttpDefinition } from '../types/index.js';
import type { Validator, Infer } from '../schema/index.js';
/**
* @zh 带 Schema 的 API 定义选项
* @en API definition options with schema
*/
export interface ApiDefinitionWithSchema<TReq, TRes, TData = Record<string, unknown>> extends ApiDefinition<TReq, TRes, TData> {
/**
* @zh 请求数据 Schema自动验证
* @en Request data schema (auto validation)
*/
schema?: Validator<TReq>;
}
/**
* @zh 定义 API 处理器
@@ -21,13 +34,81 @@ import type { ApiDefinition, MsgDefinition, HttpDefinition } from '../types/inde
* }
* })
* ```
*
* @example
* ```typescript
* // 使用 Schema 验证 | With schema validation
* import { defineApi, s } from '@esengine/server'
*
* const MoveSchema = s.object({
* x: s.number(),
* y: s.number()
* });
*
* export default defineApi({
* schema: MoveSchema,
* handler(req, ctx) {
* // req 已验证,类型安全 | req is validated, type-safe
* console.log(req.x, req.y);
* }
* })
* ```
*/
export function defineApi<TReq, TRes, TData = Record<string, unknown>>(
definition: ApiDefinition<TReq, TRes, TData>
): ApiDefinition<TReq, TRes, TData> {
definition: ApiDefinitionWithSchema<TReq, TRes, TData>
): ApiDefinitionWithSchema<TReq, TRes, TData> {
return definition;
}
/**
* @zh 使用 Schema 定义 API 处理器(类型自动推断)
* @en Define API handler with schema (auto type inference)
*
* @example
* ```typescript
* import { defineApiWithSchema, s } from '@esengine/server'
*
* const MoveSchema = s.object({
* x: s.number(),
* y: s.number()
* });
*
* export default defineApiWithSchema(MoveSchema, {
* handler(req, ctx) {
* // req 类型自动推断为 { x: number, y: number }
* // req type is auto-inferred as { x: number, y: number }
* console.log(req.x, req.y);
* return { success: true };
* }
* })
* ```
*/
export function defineApiWithSchema<
TReq,
TRes,
TData = Record<string, unknown>
>(
schema: Validator<TReq>,
definition: Omit<ApiDefinition<TReq, TRes, TData>, 'validate'>
): ApiDefinitionWithSchema<TReq, TRes, TData> {
return {
...definition,
schema
};
}
/**
* @zh 带 Schema 的消息定义选项
* @en Message definition options with schema
*/
export interface MsgDefinitionWithSchema<TMsg, TData = Record<string, unknown>> extends MsgDefinition<TMsg, TData> {
/**
* @zh 消息数据 Schema自动验证
* @en Message data schema (auto validation)
*/
schema?: Validator<TMsg>;
}
/**
* @zh 定义消息处理器
* @en Define message handler
@@ -43,13 +124,67 @@ export function defineApi<TReq, TRes, TData = Record<string, unknown>>(
* }
* })
* ```
*
* @example
* ```typescript
* // 使用 Schema 验证 | With schema validation
* import { defineMsg, s } from '@esengine/server'
*
* const InputSchema = s.object({
* keys: s.array(s.string()),
* timestamp: s.number()
* });
*
* export default defineMsg({
* schema: InputSchema,
* handler(msg, ctx) {
* // msg 已验证,类型安全 | msg is validated, type-safe
* console.log(msg.keys, msg.timestamp);
* }
* })
* ```
*/
export function defineMsg<TMsg, TData = Record<string, unknown>>(
definition: MsgDefinition<TMsg, TData>
): MsgDefinition<TMsg, TData> {
definition: MsgDefinitionWithSchema<TMsg, TData>
): MsgDefinitionWithSchema<TMsg, TData> {
return definition;
}
/**
* @zh 使用 Schema 定义消息处理器(类型自动推断)
* @en Define message handler with schema (auto type inference)
*
* @example
* ```typescript
* import { defineMsgWithSchema, s } from '@esengine/server'
*
* const InputSchema = s.object({
* keys: s.array(s.string()),
* timestamp: s.number()
* });
*
* export default defineMsgWithSchema(InputSchema, {
* handler(msg, ctx) {
* // msg 类型自动推断
* // msg type is auto-inferred
* console.log(msg.keys, msg.timestamp);
* }
* })
* ```
*/
export function defineMsgWithSchema<
TMsg,
TData = Record<string, unknown>
>(
schema: Validator<TMsg>,
definition: MsgDefinition<TMsg, TData>
): MsgDefinitionWithSchema<TMsg, TData> {
return {
...definition,
schema
};
}
/**
* @zh 定义 HTTP 路由处理器
* @en Define HTTP route handler

View File

@@ -30,13 +30,73 @@
export { createServer } from './core/server.js';
// Helpers
export { defineApi, defineMsg, defineHttp } from './helpers/define.js';
export {
defineApi,
defineMsg,
defineHttp,
defineApiWithSchema,
defineMsgWithSchema,
type ApiDefinitionWithSchema,
type MsgDefinitionWithSchema
} from './helpers/define.js';
// Schema Validation System
export {
// Schema Builder (main API)
s,
// Primitive Validators
string,
number,
boolean,
literal,
any,
// Composite Validators
object,
array,
tuple,
union,
record,
nativeEnum,
// Validator Classes (for extension)
StringValidator,
NumberValidator,
BooleanValidator,
LiteralValidator,
AnyValidator,
ObjectValidator,
ArrayValidator,
TupleValidator,
UnionValidator,
RecordValidator,
EnumValidator,
// Helpers
parse,
safeParse,
createGuard,
// Types
type Validator,
type ValidationResult,
type ValidationSuccess,
type ValidationFailure,
type ValidationError,
type Infer,
type ObjectShape,
type InferShape
} from './schema/index.js';
// Room System
export { Room, type RoomOptions } from './room/Room.js';
export { Player, type IPlayer } from './room/Player.js';
export { onMessage } from './room/decorators.js';
// ECS Room (for ECS-integrated games)
export { ECSRoom, type ECSRoomConfig } from './ecs/ECSRoom.js';
// Types
export type {
ServerConfig,
@@ -62,3 +122,36 @@ export type {
// Re-export useful types from @esengine/rpc
export { RpcError, ErrorCode } from '@esengine/rpc';
// Distributed Room Support
export {
DistributedRoomManager,
MemoryAdapter,
RedisAdapter,
createRedisAdapter,
LoadBalancedRouter,
createLoadBalancedRouter,
type IDistributedAdapter,
type MemoryAdapterConfig,
type RedisAdapterConfig,
type RedisClient,
type RedisClientFactory,
type ServerStatus,
type ServerRegistration,
type RoomRegistration,
type RoomQuery,
type RoomSnapshot,
type DistributedEvent,
type DistributedEventType,
type DistributedEventHandler,
type DistributedRoomManagerConfig,
type DistributedConfig,
type RoutingResult,
type RoutingRequest,
type LoadBalanceStrategy,
type LoadBalancedRouterConfig
} from './distributed/index.js';
// Room Manager (for extension)
export { RoomManager, type RoomClass } from './room/RoomManager.js';

View File

@@ -14,6 +14,7 @@ export interface IPlayer<TData = Record<string, unknown>> {
readonly roomId: string
data: TData
send<T>(type: string, data: T): void
sendBinary(data: Uint8Array): void
leave(reason?: string): void
}
@@ -28,6 +29,7 @@ export class Player<TData = Record<string, unknown>> implements IPlayer<TData> {
private _conn: Connection<any>;
private _sendFn: (conn: Connection<any>, type: string, data: unknown) => void;
private _sendBinaryFn?: (conn: Connection<any>, data: Uint8Array) => void;
private _leaveFn: (player: Player<TData>, reason?: string) => void;
constructor(options: {
@@ -35,6 +37,7 @@ export class Player<TData = Record<string, unknown>> implements IPlayer<TData> {
roomId: string
conn: Connection<any>
sendFn: (conn: Connection<any>, type: string, data: unknown) => void
sendBinaryFn?: (conn: Connection<any>, data: Uint8Array) => void
leaveFn: (player: Player<TData>, reason?: string) => void
initialData?: TData
}) {
@@ -42,6 +45,7 @@ export class Player<TData = Record<string, unknown>> implements IPlayer<TData> {
this.roomId = options.roomId;
this._conn = options.conn;
this._sendFn = options.sendFn;
this._sendBinaryFn = options.sendBinaryFn;
this._leaveFn = options.leaveFn;
this.data = options.initialData ?? ({} as TData);
}
@@ -62,6 +66,36 @@ export class Player<TData = Record<string, unknown>> implements IPlayer<TData> {
this._sendFn(this._conn, type, data);
}
/**
* @zh 发送二进制数据给玩家
* @en Send binary data to player
*
* @zh 如果底层连接支持原生二进制帧,则直接发送;否则降级为 base64 编码通过 JSON 发送
* @en If underlying connection supports native binary frames, sends directly; otherwise falls back to base64 encoding via JSON
*/
sendBinary(data: Uint8Array): void {
if (this._sendBinaryFn) {
this._sendBinaryFn(this._conn, data);
} else {
this.send('$binary', { data: this._toBase64(data) });
}
}
/**
* @zh 将 Uint8Array 转换为 base64 字符串
* @en Convert Uint8Array to base64 string
*/
private _toBase64(data: Uint8Array): string {
if (typeof Buffer !== 'undefined') {
return Buffer.from(data).toString('base64');
}
let binary = '';
for (let i = 0; i < data.length; i++) {
binary += String.fromCharCode(data[i]);
}
return btoa(binary);
}
/**
* @zh 让玩家离开房间
* @en Make player leave the room

View File

@@ -94,6 +94,7 @@ export abstract class Room<TState = any, TPlayerData = Record<string, unknown>>
private _lastTickTime = 0;
private _broadcastFn: ((type: string, data: unknown) => void) | null = null;
private _sendFn: ((conn: any, type: string, data: unknown) => void) | null = null;
private _sendBinaryFn: ((conn: any, data: Uint8Array) => void) | null = null;
private _disposeFn: (() => void) | null = null;
// ========================================================================
@@ -269,11 +270,13 @@ export abstract class Room<TState = any, TPlayerData = Record<string, unknown>>
_init(options: {
id: string
sendFn: (conn: any, type: string, data: unknown) => void
sendBinaryFn?: (conn: any, data: Uint8Array) => void
broadcastFn: (type: string, data: unknown) => void
disposeFn: () => void
}): void {
this._id = options.id;
this._sendFn = options.sendFn;
this._sendBinaryFn = options.sendBinaryFn ?? null;
this._broadcastFn = options.broadcastFn;
this._disposeFn = options.disposeFn;
}
@@ -299,6 +302,7 @@ export abstract class Room<TState = any, TPlayerData = Record<string, unknown>>
roomId: this._id,
conn,
sendFn: this._sendFn!,
sendBinaryFn: this._sendBinaryFn ?? undefined,
leaveFn: (p, reason) => this._removePlayer(p.id, reason)
});

View File

@@ -26,17 +26,53 @@ interface RoomDefinition {
/**
* @zh 房间管理器
* @en Room manager
*
* @zh 管理房间的创建、加入、离开等操作。可被 DistributedRoomManager 继承以支持分布式功能。
* @en Manages room creation, joining, leaving, etc. Can be extended by DistributedRoomManager for distributed features.
*/
export class RoomManager {
private _definitions: Map<string, RoomDefinition> = new Map();
private _rooms: Map<string, Room> = new Map();
private _playerToRoom: Map<string, string> = new Map();
private _nextRoomId = 1;
/**
* @zh 房间类型定义映射
* @en Room type definitions map
*/
protected _definitions: Map<string, RoomDefinition> = new Map();
private _sendFn: (conn: any, type: string, data: unknown) => void;
/**
* @zh 房间实例映射
* @en Room instances map
*/
protected _rooms: Map<string, Room> = new Map();
constructor(sendFn: (conn: any, type: string, data: unknown) => void) {
/**
* @zh 玩家到房间的映射
* @en Player to room mapping
*/
protected _playerToRoom: Map<string, string> = new Map();
/**
* @zh 下一个房间 ID 计数器
* @en Next room ID counter
*/
protected _nextRoomId = 1;
/**
* @zh 消息发送函数
* @en Message send function
*/
protected _sendFn: (conn: any, type: string, data: unknown) => void;
/**
* @zh 二进制发送函数
* @en Binary send function
*/
protected _sendBinaryFn?: (conn: any, data: Uint8Array) => void;
constructor(
sendFn: (conn: any, type: string, data: unknown) => void,
sendBinaryFn?: (conn: any, data: Uint8Array) => void
) {
this._sendFn = sendFn;
this._sendBinaryFn = sendBinaryFn;
}
/**
@@ -50,40 +86,40 @@ export class RoomManager {
/**
* @zh 创建房间
* @en Create room
*
* @param name - 房间类型名称 | Room type name
* @param options - 房间配置 | Room options
* @returns 房间实例或 null | Room instance or null
*/
async create(name: string, options?: RoomOptions): Promise<Room | null> {
const def = this._definitions.get(name);
if (!def) {
logger.warn(`Room type not found: ${name}`);
return null;
const room = await this._createRoomInstance(name, options);
if (room) {
await this._onRoomCreated(name, room);
logger.info(`Created: ${name} (${room.id})`);
}
const roomId = this._generateRoomId();
const room = new def.roomClass();
room._init({
id: roomId,
sendFn: this._sendFn,
broadcastFn: (type, data) => {
for (const player of room.players) {
player.send(type, data);
}
},
disposeFn: () => {
this._rooms.delete(roomId);
}
});
this._rooms.set(roomId, room);
await room._create(options);
logger.info(`Created: ${name} (${roomId})`);
return room;
}
/**
* @zh 房间创建后的回调
* @en Callback after room is created
*
* @param _name - 房间类型名称 | Room type name
* @param _room - 房间实例 | Room instance
*/
protected async _onRoomCreated(_name: string, _room: Room): Promise<void> {
// 子类可覆盖以添加分布式注册等逻辑 | Subclass can override to add distributed registration logic
}
/**
* @zh 加入或创建房间
* @en Join or create room
*
* @param name - 房间类型名称 | Room type name
* @param playerId - 玩家 ID | Player ID
* @param conn - 玩家连接 | Player connection
* @param options - 房间配置 | Room options
* @returns 房间和玩家实例或 null | Room and player instance or null
*/
async joinOrCreate(
name: string,
@@ -91,20 +127,20 @@ export class RoomManager {
conn: any,
options?: RoomOptions
): Promise<{ room: Room; player: Player } | null> {
// 查找可加入的房间
// 查找可加入的房间 | Find available room
let room = this._findAvailableRoom(name);
// 没有则创建
// 没有则创建 | Create if none exists
if (!room) {
room = await this.create(name, options);
if (!room) return null;
}
// 加入房间
// 加入房间 | Join room
const player = await room._addPlayer(playerId, conn);
if (!player) return null;
this._playerToRoom.set(playerId, room.id);
this._onPlayerJoined(playerId, room.id, player);
logger.info(`Player ${playerId} joined ${room.id}`);
return { room, player };
@@ -113,6 +149,11 @@ export class RoomManager {
/**
* @zh 加入指定房间
* @en Join specific room
*
* @param roomId - 房间 ID | Room ID
* @param playerId - 玩家 ID | Player ID
* @param conn - 玩家连接 | Player connection
* @returns 房间和玩家实例或 null | Room and player instance or null
*/
async joinById(
roomId: string,
@@ -125,7 +166,7 @@ export class RoomManager {
const player = await room._addPlayer(playerId, conn);
if (!player) return null;
this._playerToRoom.set(playerId, room.id);
this._onPlayerJoined(playerId, room.id, player);
logger.info(`Player ${playerId} joined ${room.id}`);
return { room, player };
@@ -134,6 +175,9 @@ export class RoomManager {
/**
* @zh 玩家离开
* @en Player leave
*
* @param playerId - 玩家 ID | Player ID
* @param reason - 离开原因 | Leave reason
*/
async leave(playerId: string, reason?: string): Promise<void> {
const roomId = this._playerToRoom.get(playerId);
@@ -144,7 +188,7 @@ export class RoomManager {
await room._removePlayer(playerId, reason);
}
this._playerToRoom.delete(playerId);
this._onPlayerLeft(playerId, roomId);
logger.info(`Player ${playerId} left ${roomId}`);
}
@@ -200,7 +244,14 @@ export class RoomManager {
);
}
private _findAvailableRoom(name: string): Room | null {
/**
* @zh 查找可用房间
* @en Find available room
*
* @param name - 房间类型名称 | Room type name
* @returns 可用房间或 null | Available room or null
*/
protected _findAvailableRoom(name: string): Room | null {
const def = this._definitions.get(name);
if (!def) return null;
@@ -218,7 +269,100 @@ export class RoomManager {
return null;
}
private _generateRoomId(): string {
/**
* @zh 生成房间 ID
* @en Generate room ID
*
* @returns 新的房间 ID | New room ID
*/
protected _generateRoomId(): string {
return `room_${this._nextRoomId++}`;
}
/**
* @zh 获取房间定义
* @en Get room definition
*
* @param name - 房间类型名称 | Room type name
* @returns 房间定义或 undefined | Room definition or undefined
*/
protected _getDefinition(name: string): RoomDefinition | undefined {
return this._definitions.get(name);
}
/**
* @zh 内部创建房间实例
* @en Internal create room instance
*
* @param name - 房间类型名称 | Room type name
* @param options - 房间配置 | Room options
* @param roomId - 可选的房间 ID用于分布式恢复 | Optional room ID (for distributed recovery)
* @returns 房间实例或 null | Room instance or null
*/
protected async _createRoomInstance(
name: string,
options?: RoomOptions,
roomId?: string
): Promise<Room | null> {
const def = this._definitions.get(name);
if (!def) {
logger.warn(`Room type not found: ${name}`);
return null;
}
const finalRoomId = roomId ?? this._generateRoomId();
const room = new def.roomClass();
room._init({
id: finalRoomId,
sendFn: this._sendFn,
sendBinaryFn: this._sendBinaryFn,
broadcastFn: (type, data) => {
for (const player of room.players) {
player.send(type, data);
}
},
disposeFn: () => {
this._onRoomDisposed(finalRoomId);
}
});
this._rooms.set(finalRoomId, room);
await room._create(options);
return room;
}
/**
* @zh 房间销毁回调
* @en Room disposed callback
*
* @param roomId - 房间 ID | Room ID
*/
protected _onRoomDisposed(roomId: string): void {
this._rooms.delete(roomId);
}
/**
* @zh 玩家加入房间后的回调
* @en Callback after player joins room
*
* @param playerId - 玩家 ID | Player ID
* @param roomId - 房间 ID | Room ID
* @param player - 玩家实例 | Player instance
*/
protected _onPlayerJoined(playerId: string, roomId: string, _player: Player): void {
this._playerToRoom.set(playerId, roomId);
}
/**
* @zh 玩家离开房间后的回调
* @en Callback after player leaves room
*
* @param playerId - 玩家 ID | Player ID
* @param _roomId - 房间 ID | Room ID
*/
protected _onPlayerLeft(playerId: string, _roomId: string): void {
this._playerToRoom.delete(playerId);
}
}

View File

@@ -0,0 +1,97 @@
/**
* @zh 基础验证器抽象类
* @en Base validator abstract class
*
* @zh 所有验证器的基类,提供通用的验证逻辑
* @en Base class for all validators, providing common validation logic
*/
import type {
Validator,
ValidationResult,
ValidatorOptions
} from './types.js';
/**
* @zh 基础验证器抽象类
* @en Base validator abstract class
*/
export abstract class BaseValidator<T> implements Validator<T> {
abstract readonly typeName: string;
protected _options: ValidatorOptions = {};
/**
* @zh 核心验证逻辑(子类实现)
* @en Core validation logic (implemented by subclass)
*/
protected abstract _validate(value: unknown, path: string[]): ValidationResult<T>;
validate(value: unknown, path: string[] = []): ValidationResult<T> {
// Handle undefined
if (value === undefined) {
if (this._options.isOptional) {
if (this._options.defaultValue !== undefined) {
return { success: true, data: this._options.defaultValue as T };
}
return { success: true, data: undefined as T };
}
return {
success: false,
error: {
path,
message: 'Required',
expected: this.typeName,
received: undefined
}
};
}
// Handle null
if (value === null) {
if (this._options.isNullable) {
return { success: true, data: null as T };
}
return {
success: false,
error: {
path,
message: 'Expected non-null value',
expected: this.typeName,
received: null
}
};
}
return this._validate(value, path);
}
is(value: unknown): value is T {
return this.validate(value).success;
}
optional(): Validator<T | undefined> {
const clone = this._clone();
clone._options.isOptional = true;
return clone as unknown as Validator<T | undefined>;
}
default(defaultValue: T): Validator<T> {
const clone = this._clone();
clone._options.isOptional = true;
clone._options.defaultValue = defaultValue;
return clone;
}
nullable(): Validator<T | null> {
const clone = this._clone();
clone._options.isNullable = true;
return clone as unknown as Validator<T | null>;
}
/**
* @zh 克隆验证器(用于链式调用)
* @en Clone validator (for chaining)
*/
protected abstract _clone(): BaseValidator<T>;
}

View File

@@ -0,0 +1,558 @@
/**
* @zh 复合类型验证器
* @en Composite type validators
*/
import type {
Validator,
ValidationResult,
ObjectShape,
InferShape
} from './types.js';
import { BaseValidator } from './base.js';
// ============================================================================
// Object Validator
// ============================================================================
/**
* @zh 对象验证选项
* @en Object validation options
*/
export interface ObjectValidatorOptions {
strict?: boolean;
}
/**
* @zh 对象验证器
* @en Object validator
*/
export class ObjectValidator<T extends ObjectShape> extends BaseValidator<InferShape<T>> {
readonly typeName = 'object';
private readonly _shape: T;
private _objectOptions: ObjectValidatorOptions = {};
constructor(shape: T) {
super();
this._shape = shape;
}
protected _validate(value: unknown, path: string[]): ValidationResult<InferShape<T>> {
if (typeof value !== 'object' || value === null || Array.isArray(value)) {
return {
success: false,
error: {
path,
message: `Expected object, received ${Array.isArray(value) ? 'array' : typeof value}`,
expected: 'object',
received: value
}
};
}
const result: Record<string, unknown> = {};
const obj = value as Record<string, unknown>;
// Validate each field in shape
for (const [key, validator] of Object.entries(this._shape)) {
const fieldValue = obj[key];
const fieldPath = [...path, key];
const fieldResult = validator.validate(fieldValue, fieldPath);
if (!fieldResult.success) {
return fieldResult as ValidationResult<InferShape<T>>;
}
result[key] = fieldResult.data;
}
// Strict mode: check for unknown keys
if (this._objectOptions.strict) {
const knownKeys = new Set(Object.keys(this._shape));
for (const key of Object.keys(obj)) {
if (!knownKeys.has(key)) {
return {
success: false,
error: {
path: [...path, key],
message: `Unknown key "${key}"`,
expected: 'known key',
received: key
}
};
}
}
}
return { success: true, data: result as InferShape<T> };
}
protected _clone(): ObjectValidator<T> {
const clone = new ObjectValidator(this._shape);
clone._options = { ...this._options };
clone._objectOptions = { ...this._objectOptions };
return clone;
}
/**
* @zh 严格模式(不允许额外字段)
* @en Strict mode (no extra fields allowed)
*/
strict(): ObjectValidator<T> {
const clone = this._clone();
clone._objectOptions.strict = true;
return clone;
}
/**
* @zh 部分模式(所有字段可选)
* @en Partial mode (all fields optional)
*/
partial(): ObjectValidator<{
[K in keyof T]: ReturnType<T[K]['optional']>;
}> {
const partialShape: Record<string, Validator<unknown>> = {};
for (const [key, validator] of Object.entries(this._shape)) {
partialShape[key] = validator.optional();
}
return new ObjectValidator(partialShape) as any;
}
/**
* @zh 选择部分字段
* @en Pick specific fields
*/
pick<K extends keyof T>(...keys: K[]): ObjectValidator<Pick<T, K>> {
const pickedShape: Record<string, Validator<unknown>> = {};
for (const key of keys) {
pickedShape[key as string] = this._shape[key];
}
return new ObjectValidator(pickedShape) as any;
}
/**
* @zh 排除部分字段
* @en Omit specific fields
*/
omit<K extends keyof T>(...keys: K[]): ObjectValidator<Omit<T, K>> {
const keySet = new Set(keys as string[]);
const omittedShape: Record<string, Validator<unknown>> = {};
for (const [key, validator] of Object.entries(this._shape)) {
if (!keySet.has(key)) {
omittedShape[key] = validator;
}
}
return new ObjectValidator(omittedShape) as any;
}
/**
* @zh 扩展对象 Schema
* @en Extend object schema
*/
extend<U extends ObjectShape>(shape: U): ObjectValidator<T & U> {
const extendedShape = { ...this._shape, ...shape };
return new ObjectValidator(extendedShape) as any;
}
}
// ============================================================================
// Array Validator
// ============================================================================
/**
* @zh 数组验证选项
* @en Array validation options
*/
export interface ArrayValidatorOptions {
minLength?: number;
maxLength?: number;
}
/**
* @zh 数组验证器
* @en Array validator
*/
export class ArrayValidator<T> extends BaseValidator<T[]> {
readonly typeName = 'array';
private readonly _element: Validator<T>;
private _arrayOptions: ArrayValidatorOptions = {};
constructor(element: Validator<T>) {
super();
this._element = element;
}
protected _validate(value: unknown, path: string[]): ValidationResult<T[]> {
if (!Array.isArray(value)) {
return {
success: false,
error: {
path,
message: `Expected array, received ${typeof value}`,
expected: 'array',
received: value
}
};
}
const { minLength, maxLength } = this._arrayOptions;
if (minLength !== undefined && value.length < minLength) {
return {
success: false,
error: {
path,
message: `Array must have at least ${minLength} items`,
expected: `array(minLength: ${minLength})`,
received: value
}
};
}
if (maxLength !== undefined && value.length > maxLength) {
return {
success: false,
error: {
path,
message: `Array must have at most ${maxLength} items`,
expected: `array(maxLength: ${maxLength})`,
received: value
}
};
}
const result: T[] = [];
for (let i = 0; i < value.length; i++) {
const itemPath = [...path, String(i)];
const itemResult = this._element.validate(value[i], itemPath);
if (!itemResult.success) {
return itemResult as ValidationResult<T[]>;
}
result.push(itemResult.data);
}
return { success: true, data: result };
}
protected _clone(): ArrayValidator<T> {
const clone = new ArrayValidator(this._element);
clone._options = { ...this._options };
clone._arrayOptions = { ...this._arrayOptions };
return clone;
}
/**
* @zh 设置最小长度
* @en Set minimum length
*/
min(length: number): ArrayValidator<T> {
const clone = this._clone();
clone._arrayOptions.minLength = length;
return clone;
}
/**
* @zh 设置最大长度
* @en Set maximum length
*/
max(length: number): ArrayValidator<T> {
const clone = this._clone();
clone._arrayOptions.maxLength = length;
return clone;
}
/**
* @zh 设置长度范围
* @en Set length range
*/
length(min: number, max: number): ArrayValidator<T> {
const clone = this._clone();
clone._arrayOptions.minLength = min;
clone._arrayOptions.maxLength = max;
return clone;
}
/**
* @zh 要求非空数组
* @en Require non-empty array
*/
nonempty(): ArrayValidator<T> {
return this.min(1);
}
}
// ============================================================================
// Tuple Validator
// ============================================================================
/**
* @zh 元组验证器
* @en Tuple validator
*/
export class TupleValidator<T extends readonly Validator<unknown>[]> extends BaseValidator<{
[K in keyof T]: T[K] extends Validator<infer U> ? U : never;
}> {
readonly typeName = 'tuple';
private readonly _elements: T;
constructor(elements: T) {
super();
this._elements = elements;
}
protected _validate(value: unknown, path: string[]): ValidationResult<{
[K in keyof T]: T[K] extends Validator<infer U> ? U : never;
}> {
if (!Array.isArray(value)) {
return {
success: false,
error: {
path,
message: `Expected tuple, received ${typeof value}`,
expected: 'tuple',
received: value
}
};
}
if (value.length !== this._elements.length) {
return {
success: false,
error: {
path,
message: `Expected tuple of length ${this._elements.length}, received length ${value.length}`,
expected: `tuple(length: ${this._elements.length})`,
received: value
}
};
}
const result: unknown[] = [];
for (let i = 0; i < this._elements.length; i++) {
const itemPath = [...path, String(i)];
const itemResult = this._elements[i].validate(value[i], itemPath);
if (!itemResult.success) {
return itemResult as any;
}
result.push(itemResult.data);
}
return { success: true, data: result as any };
}
protected _clone(): TupleValidator<T> {
const clone = new TupleValidator(this._elements);
clone._options = { ...this._options };
return clone;
}
}
// ============================================================================
// Union Validator
// ============================================================================
/**
* @zh 联合类型验证器
* @en Union type validator
*/
export class UnionValidator<T extends readonly Validator<unknown>[]> extends BaseValidator<
T[number] extends Validator<infer U> ? U : never
> {
readonly typeName: string;
private readonly _variants: T;
constructor(variants: T) {
super();
this._variants = variants;
this.typeName = `union(${variants.map(v => v.typeName).join(' | ')})`;
}
protected _validate(value: unknown, path: string[]): ValidationResult<
T[number] extends Validator<infer U> ? U : never
> {
const errors: string[] = [];
for (const variant of this._variants) {
const result = variant.validate(value, path);
if (result.success) {
return result as any;
}
errors.push(variant.typeName);
}
return {
success: false,
error: {
path,
message: `Expected one of: ${errors.join(', ')}`,
expected: this.typeName,
received: value
}
};
}
protected _clone(): UnionValidator<T> {
const clone = new UnionValidator(this._variants);
clone._options = { ...this._options };
return clone;
}
}
// ============================================================================
// Record Validator
// ============================================================================
/**
* @zh 记录类型验证器
* @en Record type validator
*/
export class RecordValidator<T> extends BaseValidator<Record<string, T>> {
readonly typeName = 'record';
private readonly _valueValidator: Validator<T>;
constructor(valueValidator: Validator<T>) {
super();
this._valueValidator = valueValidator;
}
protected _validate(value: unknown, path: string[]): ValidationResult<Record<string, T>> {
if (typeof value !== 'object' || value === null || Array.isArray(value)) {
return {
success: false,
error: {
path,
message: `Expected object, received ${Array.isArray(value) ? 'array' : typeof value}`,
expected: 'record',
received: value
}
};
}
const result: Record<string, T> = {};
const obj = value as Record<string, unknown>;
for (const [key, val] of Object.entries(obj)) {
const fieldPath = [...path, key];
const fieldResult = this._valueValidator.validate(val, fieldPath);
if (!fieldResult.success) {
return fieldResult as ValidationResult<Record<string, T>>;
}
result[key] = fieldResult.data;
}
return { success: true, data: result };
}
protected _clone(): RecordValidator<T> {
const clone = new RecordValidator(this._valueValidator);
clone._options = { ...this._options };
return clone;
}
}
// ============================================================================
// Enum Validator
// ============================================================================
/**
* @zh 枚举验证器
* @en Enum validator
*/
export class EnumValidator<T extends readonly (string | number)[]> extends BaseValidator<T[number]> {
readonly typeName: string;
private readonly _values: Set<string | number>;
private readonly _valuesArray: T;
constructor(values: T) {
super();
this._valuesArray = values;
this._values = new Set(values);
this.typeName = `enum(${values.map(v => JSON.stringify(v)).join(', ')})`;
}
protected _validate(value: unknown, path: string[]): ValidationResult<T[number]> {
if (!this._values.has(value as string | number)) {
return {
success: false,
error: {
path,
message: `Expected one of: ${this._valuesArray.map(v => JSON.stringify(v)).join(', ')}`,
expected: this.typeName,
received: value
}
};
}
return { success: true, data: value as T[number] };
}
protected _clone(): EnumValidator<T> {
const clone = new EnumValidator(this._valuesArray);
clone._options = { ...this._options };
return clone;
}
}
// ============================================================================
// Factory Functions
// ============================================================================
/**
* @zh 创建对象验证器
* @en Create object validator
*/
export function object<T extends ObjectShape>(shape: T): ObjectValidator<T> {
return new ObjectValidator(shape);
}
/**
* @zh 创建数组验证器
* @en Create array validator
*/
export function array<T>(element: Validator<T>): ArrayValidator<T> {
return new ArrayValidator(element);
}
/**
* @zh 创建元组验证器
* @en Create tuple validator
*/
export function tuple<T extends readonly Validator<unknown>[]>(
elements: T
): TupleValidator<T> {
return new TupleValidator(elements);
}
/**
* @zh 创建联合类型验证器
* @en Create union type validator
*/
export function union<T extends readonly Validator<unknown>[]>(
variants: T
): UnionValidator<T> {
return new UnionValidator(variants);
}
/**
* @zh 创建记录类型验证器
* @en Create record type validator
*/
export function record<T>(valueValidator: Validator<T>): RecordValidator<T> {
return new RecordValidator(valueValidator);
}
/**
* @zh 创建枚举验证器
* @en Create enum validator
*/
export function nativeEnum<T extends readonly (string | number)[]>(
values: T
): EnumValidator<T> {
return new EnumValidator(values);
}

View File

@@ -0,0 +1,248 @@
/**
* @zh Schema 验证系统
* @en Schema validation system
*
* @zh 轻量级自定义验证系统,提供类型安全的运行时验证
* @en Lightweight custom validation system with type-safe runtime validation
*
* @example
* ```typescript
* import { s } from '@esengine/server';
*
* // 定义 Schema | Define schema
* const MoveSchema = s.object({
* x: s.number(),
* y: s.number(),
* speed: s.number().optional()
* });
*
* // 推断类型 | Infer type
* type Move = s.infer<typeof MoveSchema>;
*
* // 验证数据 | Validate data
* const result = MoveSchema.validate(data);
* if (result.success) {
* console.log(result.data); // 类型安全 | Type-safe
* } else {
* console.error(result.error);
* }
*
* // 与 defineApi 集成 | Integrate with defineApi
* export default defineApi<Move, void>({
* schema: MoveSchema,
* handler(req, ctx) {
* // req 已验证,类型安全 | req is validated, type-safe
* }
* });
* ```
*/
// ============================================================================
// Type Exports
// ============================================================================
export type {
Validator,
ValidationResult,
ValidationSuccess,
ValidationFailure,
ValidationError,
Infer,
ObjectShape,
InferShape,
ValidatorOptions
} from './types.js';
// ============================================================================
// Base Validator Export
// ============================================================================
export { BaseValidator } from './base.js';
// ============================================================================
// Validator Exports
// ============================================================================
export {
StringValidator,
NumberValidator,
BooleanValidator,
LiteralValidator,
AnyValidator,
string,
number,
boolean,
literal,
any
} from './primitives.js';
export type {
StringValidatorOptions,
NumberValidatorOptions
} from './primitives.js';
export {
ObjectValidator,
ArrayValidator,
TupleValidator,
UnionValidator,
RecordValidator,
EnumValidator,
object,
array,
tuple,
union,
record,
nativeEnum
} from './composites.js';
export type {
ObjectValidatorOptions,
ArrayValidatorOptions
} from './composites.js';
// ============================================================================
// Schema Builder (s namespace)
// ============================================================================
import type { Infer, Validator } from './types.js';
import {
string,
number,
boolean,
literal,
any
} from './primitives.js';
import {
object,
array,
tuple,
union,
record,
nativeEnum
} from './composites.js';
/**
* @zh Schema 构建器命名空间
* @en Schema builder namespace
*
* @example
* ```typescript
* import { s } from '@esengine/server';
*
* const UserSchema = s.object({
* id: s.string(),
* name: s.string().min(1).max(50),
* age: s.number().int().min(0).max(150),
* email: s.string().email().optional(),
* role: s.enum(['admin', 'user', 'guest'] as const),
* tags: s.array(s.string()),
* metadata: s.record(s.any()).optional()
* });
*
* type User = s.infer<typeof UserSchema>;
* ```
*/
export const s = {
// Primitives
string,
number,
boolean,
literal,
any,
// Composites
object,
array,
tuple,
union,
record,
/**
* @zh 创建枚举验证器
* @en Create enum validator
*
* @example
* ```typescript
* const RoleSchema = s.enum(['admin', 'user', 'guest'] as const);
* type Role = s.infer<typeof RoleSchema>; // 'admin' | 'user' | 'guest'
* ```
*/
enum: nativeEnum,
/**
* @zh 类型推断辅助(仅用于类型层面)
* @en Type inference helper (type-level only)
*
* @zh 这是一个类型辅助,用于从验证器推断类型
* @en This is a type helper to infer types from validators
*/
infer: undefined as unknown as <V extends Validator<unknown>>() => Infer<V>
} as const;
/**
* @zh 类型推断辅助类型
* @en Type inference helper type
*/
export namespace s {
/**
* @zh 从验证器推断类型
* @en Infer type from validator
*/
export type infer<V extends Validator<unknown>> = Infer<V>;
}
// ============================================================================
// Validation Helpers
// ============================================================================
/**
* @zh 验证数据并抛出错误
* @en Validate data and throw error
*
* @param validator - @zh 验证器 @en Validator
* @param value - @zh 待验证的值 @en Value to validate
* @returns @zh 验证通过的数据 @en Validated data
* @throws @zh 验证失败时抛出错误 @en Throws when validation fails
*/
export function parse<T>(validator: Validator<T>, value: unknown): T {
const result = validator.validate(value);
if (!result.success) {
const pathStr = result.error.path.length > 0
? ` at "${result.error.path.join('.')}"`
: '';
throw new Error(`Validation failed${pathStr}: ${result.error.message}`);
}
return result.data;
}
/**
* @zh 安全验证数据(不抛出错误)
* @en Safely validate data (no throw)
*
* @param validator - @zh 验证器 @en Validator
* @param value - @zh 待验证的值 @en Value to validate
* @returns @zh 验证结果 @en Validation result
*/
export function safeParse<T>(validator: Validator<T>, value: unknown) {
return validator.validate(value);
}
/**
* @zh 创建类型守卫函数
* @en Create type guard function
*
* @param validator - @zh 验证器 @en Validator
* @returns @zh 类型守卫函数 @en Type guard function
*
* @example
* ```typescript
* const isUser = createGuard(UserSchema);
* if (isUser(data)) {
* // data is User
* }
* ```
*/
export function createGuard<T>(validator: Validator<T>): (value: unknown) => value is T {
return (value: unknown): value is T => validator.is(value);
}

View File

@@ -0,0 +1,430 @@
/**
* @zh 基础类型验证器
* @en Primitive type validators
*/
import type { ValidationResult } from './types.js';
import { BaseValidator } from './base.js';
// ============================================================================
// String Validator
// ============================================================================
/**
* @zh 字符串验证选项
* @en String validation options
*/
export interface StringValidatorOptions {
minLength?: number;
maxLength?: number;
pattern?: RegExp;
}
/**
* @zh 字符串验证器
* @en String validator
*/
export class StringValidator extends BaseValidator<string> {
readonly typeName = 'string';
private _stringOptions: StringValidatorOptions = {};
protected _validate(value: unknown, path: string[]): ValidationResult<string> {
if (typeof value !== 'string') {
return {
success: false,
error: {
path,
message: `Expected string, received ${typeof value}`,
expected: 'string',
received: value
}
};
}
const { minLength, maxLength, pattern } = this._stringOptions;
if (minLength !== undefined && value.length < minLength) {
return {
success: false,
error: {
path,
message: `String must be at least ${minLength} characters`,
expected: `string(minLength: ${minLength})`,
received: value
}
};
}
if (maxLength !== undefined && value.length > maxLength) {
return {
success: false,
error: {
path,
message: `String must be at most ${maxLength} characters`,
expected: `string(maxLength: ${maxLength})`,
received: value
}
};
}
if (pattern && !pattern.test(value)) {
return {
success: false,
error: {
path,
message: `String does not match pattern ${pattern}`,
expected: `string(pattern: ${pattern})`,
received: value
}
};
}
return { success: true, data: value };
}
protected _clone(): StringValidator {
const clone = new StringValidator();
clone._options = { ...this._options };
clone._stringOptions = { ...this._stringOptions };
return clone;
}
/**
* @zh 设置最小长度
* @en Set minimum length
*/
min(length: number): StringValidator {
const clone = this._clone();
clone._stringOptions.minLength = length;
return clone;
}
/**
* @zh 设置最大长度
* @en Set maximum length
*/
max(length: number): StringValidator {
const clone = this._clone();
clone._stringOptions.maxLength = length;
return clone;
}
/**
* @zh 设置长度范围
* @en Set length range
*/
length(min: number, max: number): StringValidator {
const clone = this._clone();
clone._stringOptions.minLength = min;
clone._stringOptions.maxLength = max;
return clone;
}
/**
* @zh 设置正则模式
* @en Set regex pattern
*/
regex(pattern: RegExp): StringValidator {
const clone = this._clone();
clone._stringOptions.pattern = pattern;
return clone;
}
/**
* @zh 邮箱格式验证
* @en Email format validation
*/
email(): StringValidator {
return this.regex(/^[^\s@]+@[^\s@]+\.[^\s@]+$/);
}
/**
* @zh URL 格式验证
* @en URL format validation
*/
url(): StringValidator {
return this.regex(/^https?:\/\/.+/);
}
}
// ============================================================================
// Number Validator
// ============================================================================
/**
* @zh 数字验证选项
* @en Number validation options
*/
export interface NumberValidatorOptions {
min?: number;
max?: number;
integer?: boolean;
}
/**
* @zh 数字验证器
* @en Number validator
*/
export class NumberValidator extends BaseValidator<number> {
readonly typeName = 'number';
private _numberOptions: NumberValidatorOptions = {};
protected _validate(value: unknown, path: string[]): ValidationResult<number> {
if (typeof value !== 'number' || Number.isNaN(value)) {
return {
success: false,
error: {
path,
message: `Expected number, received ${typeof value}`,
expected: 'number',
received: value
}
};
}
const { min, max, integer } = this._numberOptions;
if (integer && !Number.isInteger(value)) {
return {
success: false,
error: {
path,
message: 'Expected integer',
expected: 'integer',
received: value
}
};
}
if (min !== undefined && value < min) {
return {
success: false,
error: {
path,
message: `Number must be >= ${min}`,
expected: `number(min: ${min})`,
received: value
}
};
}
if (max !== undefined && value > max) {
return {
success: false,
error: {
path,
message: `Number must be <= ${max}`,
expected: `number(max: ${max})`,
received: value
}
};
}
return { success: true, data: value };
}
protected _clone(): NumberValidator {
const clone = new NumberValidator();
clone._options = { ...this._options };
clone._numberOptions = { ...this._numberOptions };
return clone;
}
/**
* @zh 设置最小值
* @en Set minimum value
*/
min(value: number): NumberValidator {
const clone = this._clone();
clone._numberOptions.min = value;
return clone;
}
/**
* @zh 设置最大值
* @en Set maximum value
*/
max(value: number): NumberValidator {
const clone = this._clone();
clone._numberOptions.max = value;
return clone;
}
/**
* @zh 设置范围
* @en Set range
*/
range(min: number, max: number): NumberValidator {
const clone = this._clone();
clone._numberOptions.min = min;
clone._numberOptions.max = max;
return clone;
}
/**
* @zh 要求为整数
* @en Require integer
*/
int(): NumberValidator {
const clone = this._clone();
clone._numberOptions.integer = true;
return clone;
}
/**
* @zh 要求为正数
* @en Require positive
*/
positive(): NumberValidator {
return this.min(0);
}
/**
* @zh 要求为负数
* @en Require negative
*/
negative(): NumberValidator {
return this.max(0);
}
}
// ============================================================================
// Boolean Validator
// ============================================================================
/**
* @zh 布尔验证器
* @en Boolean validator
*/
export class BooleanValidator extends BaseValidator<boolean> {
readonly typeName = 'boolean';
protected _validate(value: unknown, path: string[]): ValidationResult<boolean> {
if (typeof value !== 'boolean') {
return {
success: false,
error: {
path,
message: `Expected boolean, received ${typeof value}`,
expected: 'boolean',
received: value
}
};
}
return { success: true, data: value };
}
protected _clone(): BooleanValidator {
const clone = new BooleanValidator();
clone._options = { ...this._options };
return clone;
}
}
// ============================================================================
// Literal Validator
// ============================================================================
/**
* @zh 字面量验证器
* @en Literal validator
*/
export class LiteralValidator<T extends string | number | boolean> extends BaseValidator<T> {
readonly typeName: string;
private readonly _literal: T;
constructor(literal: T) {
super();
this._literal = literal;
this.typeName = `literal(${JSON.stringify(literal)})`;
}
protected _validate(value: unknown, path: string[]): ValidationResult<T> {
if (value !== this._literal) {
return {
success: false,
error: {
path,
message: `Expected ${JSON.stringify(this._literal)}, received ${JSON.stringify(value)}`,
expected: this.typeName,
received: value
}
};
}
return { success: true, data: value as T };
}
protected _clone(): LiteralValidator<T> {
const clone = new LiteralValidator(this._literal);
clone._options = { ...this._options };
return clone;
}
}
// ============================================================================
// Any Validator
// ============================================================================
/**
* @zh 任意类型验证器
* @en Any type validator
*/
export class AnyValidator extends BaseValidator<unknown> {
readonly typeName = 'any';
protected _validate(value: unknown): ValidationResult<unknown> {
return { success: true, data: value };
}
protected _clone(): AnyValidator {
const clone = new AnyValidator();
clone._options = { ...this._options };
return clone;
}
}
// ============================================================================
// Factory Functions
// ============================================================================
/**
* @zh 创建字符串验证器
* @en Create string validator
*/
export function string(): StringValidator {
return new StringValidator();
}
/**
* @zh 创建数字验证器
* @en Create number validator
*/
export function number(): NumberValidator {
return new NumberValidator();
}
/**
* @zh 创建布尔验证器
* @en Create boolean validator
*/
export function boolean(): BooleanValidator {
return new BooleanValidator();
}
/**
* @zh 创建字面量验证器
* @en Create literal validator
*/
export function literal<T extends string | number | boolean>(value: T): LiteralValidator<T> {
return new LiteralValidator(value);
}
/**
* @zh 创建任意类型验证器
* @en Create any type validator
*/
export function any(): AnyValidator {
return new AnyValidator();
}

View File

@@ -0,0 +1,165 @@
/**
* @zh Schema 验证类型定义
* @en Schema validation type definitions
*/
// ============================================================================
// Validation Result
// ============================================================================
/**
* @zh 验证错误
* @en Validation error
*/
export interface ValidationError {
/**
* @zh 错误路径(如 ['user', 'name']
* @en Error path (e.g., ['user', 'name'])
*/
path: string[];
/**
* @zh 错误消息
* @en Error message
*/
message: string;
/**
* @zh 预期类型
* @en Expected type
*/
expected?: string;
/**
* @zh 实际值
* @en Actual value
*/
received?: unknown;
}
/**
* @zh 验证成功结果
* @en Validation success result
*/
export interface ValidationSuccess<T> {
success: true;
data: T;
}
/**
* @zh 验证失败结果
* @en Validation failure result
*/
export interface ValidationFailure {
success: false;
error: ValidationError;
}
/**
* @zh 验证结果
* @en Validation result
*/
export type ValidationResult<T> = ValidationSuccess<T> | ValidationFailure;
// ============================================================================
// Validator Interface
// ============================================================================
/**
* @zh 验证器接口
* @en Validator interface
*/
export interface Validator<T> {
/**
* @zh 类型名称(用于错误消息)
* @en Type name (for error messages)
*/
readonly typeName: string;
/**
* @zh 验证值
* @en Validate value
*
* @param value - @zh 待验证的值 @en Value to validate
* @param path - @zh 当前路径(用于错误报告)@en Current path (for error reporting)
* @returns @zh 验证结果 @en Validation result
*/
validate(value: unknown, path?: string[]): ValidationResult<T>;
/**
* @zh 类型守卫检查
* @en Type guard check
*
* @param value - @zh 待检查的值 @en Value to check
* @returns @zh 是否为指定类型 @en Whether value is of specified type
*/
is(value: unknown): value is T;
/**
* @zh 标记为可选
* @en Mark as optional
*/
optional(): Validator<T | undefined>;
/**
* @zh 设置默认值
* @en Set default value
*
* @param defaultValue - @zh 默认值 @en Default value
*/
default(defaultValue: T): Validator<T>;
/**
* @zh 允许 null
* @en Allow null
*/
nullable(): Validator<T | null>;
}
// ============================================================================
// Helper Types
// ============================================================================
/**
* @zh 从验证器推断类型
* @en Infer type from validator
*/
export type Infer<V extends Validator<unknown>> = V extends Validator<infer T> ? T : never;
/**
* @zh 对象 Schema 定义
* @en Object schema definition
*/
export type ObjectShape = Record<string, Validator<unknown>>;
/**
* @zh 从对象 Shape 推断类型
* @en Infer type from object shape
*/
export type InferShape<T extends ObjectShape> = {
[K in keyof T]: Infer<T[K]>;
};
/**
* @zh 验证器选项
* @en Validator options
*/
export interface ValidatorOptions {
/**
* @zh 是否可选
* @en Whether optional
*/
isOptional?: boolean;
/**
* @zh 默认值
* @en Default value
*/
defaultValue?: unknown;
/**
* @zh 是否允许 null
* @en Whether nullable
*/
isNullable?: boolean;
}

View File

@@ -5,6 +5,7 @@
import type { Connection, ProtocolDef } from '@esengine/rpc';
import type { HttpRoutes, CorsOptions, HttpRequest, HttpResponse } from '../http/types.js';
import type { DistributedConfig } from '../distributed/types.js';
// ============================================================================
// Server Config
@@ -96,6 +97,26 @@ export interface ServerConfig {
* @en Connection closed callback
*/
onDisconnect?: (conn: ServerConnection) => void | Promise<void>
/**
* @zh 分布式模式配置
* @en Distributed mode configuration
*
* @example
* ```typescript
* const server = await createServer({
* port: 3000,
* distributed: {
* enabled: true,
* adapter: new RedisAdapter({ factory: () => new Redis() }),
* serverId: 'server-1',
* serverAddress: 'ws://192.168.1.100',
* serverPort: 3000
* }
* });
* ```
*/
distributed?: DistributedConfig
}
// ============================================================================
@@ -152,12 +173,6 @@ export interface ApiDefinition<TReq = unknown, TRes = unknown, TData = Record<st
* @en API handler function
*/
handler: (req: TReq, ctx: ApiContext<TData>) => TRes | Promise<TRes>
/**
* @zh 请求验证函数(可选)
* @en Request validation function (optional)
*/
validate?: (req: unknown) => req is TReq
}
// ============================================================================