feat(server): add Schema validation system and binary encoding optimization (#421)

* feat(server): add distributed room support

- Add DistributedRoomManager for multi-server room management
- Add MemoryAdapter for testing and standalone mode
- Add RedisAdapter for production multi-server deployments
- Add LoadBalancedRouter with 5 load balancing strategies
- Add distributed config option to createServer
- Add $redirect message for cross-server player redirection
- Add failover mechanism for automatic room recovery
- Add room:migrated and server:draining event types
- Update documentation (zh/en)

* feat(server): add Schema validation system and binary encoding optimization

## Schema Validation System
- Add lightweight schema validation system (s.object, s.string, s.number, etc.)
- Support auto type inference with Infer<> generic
- Integrate schema validation into API/message handlers
- Add defineApiWithSchema and defineMsgWithSchema helpers

## Binary Encoding Optimization
- Add native WebSocket binary frame support via sendBinary()
- Add PacketType.Binary for efficient binary data transmission
- Optimize ECSRoom.broadcastBinary() to use native binary

## Architecture Improvements
- Extract BaseValidator to separate file to eliminate code duplication
- Add ECSRoom export to main index.ts for better discoverability
- Add Core.worldManager initialization check in ECSRoom constructor
- Remove deprecated validate field from ApiDefinition (use schema instead)

## Documentation
- Add Schema validation documentation in Chinese and English

* fix(rpc): resolve ESLint warnings with proper types

- Replace `any` with proper WebSocket type in connection.ts
- Add IncomingMessage type for request handling in index.ts
- Use Record<string, Handler> pattern instead of `any` casting
- Replace `any` with `unknown` in ProtocolDef and type inference
This commit is contained in:
YHH
2026-01-02 17:18:13 +08:00
committed by GitHub
parent 69bb6bd946
commit f333b81298
44 changed files with 8405 additions and 362 deletions

View File

@@ -3,8 +3,8 @@
* @en RPC Server Module
*/
import { WebSocketServer, WebSocket } from 'ws'
import type { Server as HttpServer } from 'node:http'
import { WebSocketServer, WebSocket } from 'ws';
import type { Server as HttpServer } from 'node:http';
import type {
ProtocolDef,
ApiNames,
@@ -13,13 +13,13 @@ import type {
ApiOutput,
MsgData,
Packet,
PacketType,
Connection,
} from '../types'
import { RpcError, ErrorCode } from '../types'
import { json } from '../codec/json'
import type { Codec } from '../codec/types'
import { ServerConnection } from './connection'
Connection
} from '../types';
import type { IncomingMessage } from 'node:http';
import { RpcError, ErrorCode } from '../types';
import { json } from '../codec/json';
import type { Codec } from '../codec/types';
import { ServerConnection } from './connection';
// ============ Types ============
@@ -182,8 +182,8 @@ const PT = {
ApiResponse: 1,
ApiError: 2,
Message: 3,
Heartbeat: 9,
} as const
Heartbeat: 9
} as const;
/**
* @zh 创建 RPC 服务器
@@ -206,16 +206,22 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
_protocol: P,
options: ServeOptions<P, TConnData>
): RpcServer<P, TConnData> {
const codec = options.codec ?? json()
const connections: ServerConnection<TConnData>[] = []
let wss: WebSocketServer | null = null
let connIdCounter = 0
const codec = options.codec ?? json();
const connections: ServerConnection<TConnData>[] = [];
let wss: WebSocketServer | null = null;
let connIdCounter = 0;
const getClientIp = (ws: WebSocket, req: any): string => {
return req?.headers?.['x-forwarded-for']?.split(',')[0]?.trim()
const getClientIp = (_ws: WebSocket, req: IncomingMessage | undefined): string => {
const forwarded = req?.headers?.['x-forwarded-for'];
const forwardedIp = typeof forwarded === 'string'
? forwarded.split(',')[0]?.trim()
: Array.isArray(forwarded)
? forwarded[0]?.split(',')[0]?.trim()
: undefined;
return forwardedIp
|| req?.socket?.remoteAddress
|| 'unknown'
}
|| 'unknown';
};
const handleMessage = async (
conn: ServerConnection<TConnData>,
@@ -224,23 +230,23 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
try {
const packet = codec.decode(
typeof data === 'string' ? data : new Uint8Array(data)
)
);
const type = packet[0]
const type = packet[0];
if (type === PT.ApiRequest) {
const [, id, path, input] = packet as [number, number, string, unknown]
await handleApiRequest(conn, id, path, input)
const [, id, path, input] = packet as [number, number, string, unknown];
await handleApiRequest(conn, id, path, input);
} else if (type === PT.Message) {
const [, path, msgData] = packet as [number, string, unknown]
await handleMsg(conn, path, msgData)
const [, path, msgData] = packet as [number, string, unknown];
await handleMsg(conn, path, msgData);
} else if (type === PT.Heartbeat) {
conn.send(codec.encode([PT.Heartbeat]))
conn.send(codec.encode([PT.Heartbeat]));
}
} catch (err) {
options.onError?.(err as Error, conn)
options.onError?.(err as Error, conn);
}
}
};
const handleApiRequest = async (
conn: ServerConnection<TConnData>,
@@ -248,44 +254,46 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
path: string,
input: unknown
): Promise<void> => {
const handler = (options.api as any)[path]
const apiHandlers = options.api as Record<string, ApiHandler<unknown, unknown, TConnData> | undefined>;
const handler = apiHandlers[path];
if (!handler) {
const errPacket: Packet = [PT.ApiError, id, ErrorCode.NOT_FOUND, `API not found: ${path}`]
conn.send(codec.encode(errPacket))
return
const errPacket: Packet = [PT.ApiError, id, ErrorCode.NOT_FOUND, `API not found: ${path}`];
conn.send(codec.encode(errPacket));
return;
}
try {
const result = await handler(input, conn)
const resPacket: Packet = [PT.ApiResponse, id, result]
conn.send(codec.encode(resPacket))
const result = await handler(input, conn);
const resPacket: Packet = [PT.ApiResponse, id, result];
conn.send(codec.encode(resPacket));
} catch (err) {
if (err instanceof RpcError) {
const errPacket: Packet = [PT.ApiError, id, err.code, err.message]
conn.send(codec.encode(errPacket))
const errPacket: Packet = [PT.ApiError, id, err.code, err.message];
conn.send(codec.encode(errPacket));
} else {
const errPacket: Packet = [PT.ApiError, id, ErrorCode.INTERNAL_ERROR, 'Internal server error']
conn.send(codec.encode(errPacket))
options.onError?.(err as Error, conn)
const errPacket: Packet = [PT.ApiError, id, ErrorCode.INTERNAL_ERROR, 'Internal server error'];
conn.send(codec.encode(errPacket));
options.onError?.(err as Error, conn);
}
}
}
};
const handleMsg = async (
conn: ServerConnection<TConnData>,
path: string,
data: unknown
): Promise<void> => {
const handler = options.msg?.[path as MsgNames<P>]
const msgHandlers = options.msg as Record<string, MsgHandler<unknown, TConnData> | undefined> | undefined;
const handler = msgHandlers?.[path];
if (handler) {
await (handler as any)(data, conn)
await handler(data, conn);
}
}
};
const server: RpcServer<P, TConnData> = {
get connections() {
return connections as ReadonlyArray<Connection<TConnData>>
return connections as ReadonlyArray<Connection<TConnData>>;
},
async start() {
@@ -293,18 +301,18 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
// 根据配置创建 WebSocketServer
if (options.server) {
// 附加到已有的 HTTP 服务器
wss = new WebSocketServer({ server: options.server })
wss = new WebSocketServer({ server: options.server });
} else if (options.port) {
// 独立创建
wss = new WebSocketServer({ port: options.port })
wss = new WebSocketServer({ port: options.port });
} else {
throw new Error('Either port or server must be provided')
throw new Error('Either port or server must be provided');
}
wss.on('connection', async (ws, req) => {
const id = String(++connIdCounter)
const ip = getClientIp(ws, req)
const initialData = options.createConnData?.() ?? ({} as TConnData)
const id = String(++connIdCounter);
const ip = getClientIp(ws, req);
const initialData = options.createConnData?.() ?? ({} as TConnData);
const conn = new ServerConnection<TConnData>({
id,
@@ -312,70 +320,70 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
socket: ws,
initialData,
onClose: () => {
const idx = connections.indexOf(conn)
if (idx !== -1) connections.splice(idx, 1)
},
})
const idx = connections.indexOf(conn);
if (idx !== -1) connections.splice(idx, 1);
}
});
connections.push(conn)
connections.push(conn);
ws.on('message', (data) => {
handleMessage(conn, data as string | Buffer)
})
handleMessage(conn, data as string | Buffer);
});
ws.on('close', async (code, reason) => {
conn._markClosed()
const idx = connections.indexOf(conn)
if (idx !== -1) connections.splice(idx, 1)
await options.onDisconnect?.(conn, reason?.toString())
})
conn._markClosed();
const idx = connections.indexOf(conn);
if (idx !== -1) connections.splice(idx, 1);
await options.onDisconnect?.(conn, reason?.toString());
});
ws.on('error', (err) => {
options.onError?.(err, conn)
})
options.onError?.(err, conn);
});
await options.onConnect?.(conn)
})
await options.onConnect?.(conn);
});
// 如果使用已有的 HTTP 服务器WebSocketServer 不会触发 listening 事件
if (options.server) {
options.onStart?.(0) // 端口由 HTTP 服务器管理
resolve()
options.onStart?.(0); // 端口由 HTTP 服务器管理
resolve();
} else {
wss.on('listening', () => {
options.onStart?.(options.port!)
resolve()
})
options.onStart?.(options.port!);
resolve();
});
}
})
});
},
async stop() {
return new Promise((resolve, reject) => {
if (!wss) {
resolve()
return
resolve();
return;
}
for (const conn of connections) {
conn.close('Server shutting down')
conn.close('Server shutting down');
}
wss.close((err) => {
if (err) reject(err)
else resolve()
})
})
if (err) reject(err);
else resolve();
});
});
},
send(conn, name, data) {
const packet: Packet = [PT.Message, name as string, data]
;(conn as ServerConnection<TConnData>).send(codec.encode(packet))
;(conn as ServerConnection<TConnData>).send(codec.encode(packet));
},
broadcast(name, data, opts) {
const packet: Packet = [PT.Message, name as string, data]
const encoded = codec.encode(packet)
const packet: Packet = [PT.Message, name as string, data];
const encoded = codec.encode(packet);
const excludeSet = new Set(
Array.isArray(opts?.exclude)
@@ -383,15 +391,15 @@ export function serve<P extends ProtocolDef, TConnData = unknown>(
: opts?.exclude
? [opts.exclude]
: []
)
);
for (const conn of connections) {
if (!excludeSet.has(conn)) {
conn.send(encoded)
conn.send(encoded);
}
}
},
}
}
};
return server
return server;
}