feat(framework): server testing utils, transaction storage simplify, pathfinding tests (#384)

## Server Testing Utils
- Add TestServer, TestClient, MockRoom for unit testing
- Export testing utilities from @esengine/server/testing

## Transaction Storage (BREAKING)
- Simplify RedisStorage/MongoStorage to factory pattern only
- Remove DI client injection option
- Add lazy connection and Symbol.asyncDispose support
- Add 161 unit tests with full coverage

## Pathfinding Tests
- Add 150 unit tests covering all components
- BinaryHeap, Heuristics, AStarPathfinder, GridMap, NavMesh, PathSmoother

## Docs
- Update storage.md for new factory pattern API
This commit is contained in:
YHH
2025-12-29 15:02:13 +08:00
committed by GitHub
parent 10c3891abd
commit 3b978384c7
50 changed files with 7591 additions and 660 deletions

View File

@@ -10,8 +10,8 @@ import type {
ITransactionStorage,
TransactionLog,
TransactionState,
OperationLog,
} from '../core/types.js'
OperationLog
} from '../core/types.js';
/**
* @zh 内存存储配置
@@ -33,13 +33,13 @@ export interface MemoryStorageConfig {
* @en Suitable for single-machine development and testing, data is stored in memory only
*/
export class MemoryStorage implements ITransactionStorage {
private _transactions: Map<string, TransactionLog> = new Map()
private _data: Map<string, { value: unknown; expireAt?: number }> = new Map()
private _locks: Map<string, { token: string; expireAt: number }> = new Map()
private _maxTransactions: number
private _transactions: Map<string, TransactionLog> = new Map();
private _data: Map<string, { value: unknown; expireAt?: number }> = new Map();
private _locks: Map<string, { token: string; expireAt: number }> = new Map();
private _maxTransactions: number;
constructor(config: MemoryStorageConfig = {}) {
this._maxTransactions = config.maxTransactions ?? 1000
this._maxTransactions = config.maxTransactions ?? 1000;
}
// =========================================================================
@@ -47,30 +47,30 @@ export class MemoryStorage implements ITransactionStorage {
// =========================================================================
async acquireLock(key: string, ttl: number): Promise<string | null> {
this._cleanExpiredLocks()
this._cleanExpiredLocks();
const existing = this._locks.get(key)
const existing = this._locks.get(key);
if (existing && existing.expireAt > Date.now()) {
return null
return null;
}
const token = `lock_${Date.now()}_${Math.random().toString(36).substring(2)}`
const token = `lock_${Date.now()}_${Math.random().toString(36).substring(2)}`;
this._locks.set(key, {
token,
expireAt: Date.now() + ttl,
})
expireAt: Date.now() + ttl
});
return token
return token;
}
async releaseLock(key: string, token: string): Promise<boolean> {
const lock = this._locks.get(key)
const lock = this._locks.get(key);
if (!lock || lock.token !== token) {
return false
return false;
}
this._locks.delete(key)
return true
this._locks.delete(key);
return true;
}
// =========================================================================
@@ -79,22 +79,22 @@ export class MemoryStorage implements ITransactionStorage {
async saveTransaction(tx: TransactionLog): Promise<void> {
if (this._transactions.size >= this._maxTransactions) {
this._cleanOldTransactions()
this._cleanOldTransactions();
}
this._transactions.set(tx.id, { ...tx })
this._transactions.set(tx.id, { ...tx });
}
async getTransaction(id: string): Promise<TransactionLog | null> {
const tx = this._transactions.get(id)
return tx ? { ...tx } : null
const tx = this._transactions.get(id);
return tx ? { ...tx } : null;
}
async updateTransactionState(id: string, state: TransactionState): Promise<void> {
const tx = this._transactions.get(id)
const tx = this._transactions.get(id);
if (tx) {
tx.state = state
tx.updatedAt = Date.now()
tx.state = state;
tx.updatedAt = Date.now();
}
}
@@ -104,37 +104,37 @@ export class MemoryStorage implements ITransactionStorage {
state: OperationLog['state'],
error?: string
): Promise<void> {
const tx = this._transactions.get(transactionId)
const tx = this._transactions.get(transactionId);
if (tx && tx.operations[operationIndex]) {
tx.operations[operationIndex].state = state
tx.operations[operationIndex].state = state;
if (error) {
tx.operations[operationIndex].error = error
tx.operations[operationIndex].error = error;
}
if (state === 'executed') {
tx.operations[operationIndex].executedAt = Date.now()
tx.operations[operationIndex].executedAt = Date.now();
} else if (state === 'compensated') {
tx.operations[operationIndex].compensatedAt = Date.now()
tx.operations[operationIndex].compensatedAt = Date.now();
}
tx.updatedAt = Date.now()
tx.updatedAt = Date.now();
}
}
async getPendingTransactions(serverId?: string): Promise<TransactionLog[]> {
const result: TransactionLog[] = []
const result: TransactionLog[] = [];
for (const tx of this._transactions.values()) {
if (tx.state === 'pending' || tx.state === 'executing') {
if (!serverId || tx.metadata?.serverId === serverId) {
result.push({ ...tx })
result.push({ ...tx });
}
}
}
return result
return result;
}
async deleteTransaction(id: string): Promise<void> {
this._transactions.delete(id)
this._transactions.delete(id);
}
// =========================================================================
@@ -142,28 +142,28 @@ export class MemoryStorage implements ITransactionStorage {
// =========================================================================
async get<T>(key: string): Promise<T | null> {
this._cleanExpiredData()
this._cleanExpiredData();
const entry = this._data.get(key)
if (!entry) return null
const entry = this._data.get(key);
if (!entry) return null;
if (entry.expireAt && entry.expireAt < Date.now()) {
this._data.delete(key)
return null
this._data.delete(key);
return null;
}
return entry.value as T
return entry.value as T;
}
async set<T>(key: string, value: T, ttl?: number): Promise<void> {
this._data.set(key, {
value,
expireAt: ttl ? Date.now() + ttl : undefined,
})
expireAt: ttl ? Date.now() + ttl : undefined
});
}
async delete(key: string): Promise<boolean> {
return this._data.delete(key)
return this._data.delete(key);
}
// =========================================================================
@@ -175,9 +175,9 @@ export class MemoryStorage implements ITransactionStorage {
* @en Clear all data (for testing)
*/
clear(): void {
this._transactions.clear()
this._data.clear()
this._locks.clear()
this._transactions.clear();
this._data.clear();
this._locks.clear();
}
/**
@@ -185,37 +185,37 @@ export class MemoryStorage implements ITransactionStorage {
* @en Get transaction count
*/
get transactionCount(): number {
return this._transactions.size
return this._transactions.size;
}
private _cleanExpiredLocks(): void {
const now = Date.now()
const now = Date.now();
for (const [key, lock] of this._locks) {
if (lock.expireAt < now) {
this._locks.delete(key)
this._locks.delete(key);
}
}
}
private _cleanExpiredData(): void {
const now = Date.now()
const now = Date.now();
for (const [key, entry] of this._data) {
if (entry.expireAt && entry.expireAt < now) {
this._data.delete(key)
this._data.delete(key);
}
}
}
private _cleanOldTransactions(): void {
const sorted = Array.from(this._transactions.entries())
.sort((a, b) => a[1].createdAt - b[1].createdAt)
.sort((a, b) => a[1].createdAt - b[1].createdAt);
const toRemove = sorted
.slice(0, Math.floor(this._maxTransactions * 0.2))
.filter(([_, tx]) => tx.state === 'committed' || tx.state === 'rolledback')
.filter(([_, tx]) => tx.state === 'committed' || tx.state === 'rolledback');
for (const [id] of toRemove) {
this._transactions.delete(id)
this._transactions.delete(id);
}
}
}
@@ -225,5 +225,5 @@ export class MemoryStorage implements ITransactionStorage {
* @en Create memory storage
*/
export function createMemoryStorage(config: MemoryStorageConfig = {}): MemoryStorage {
return new MemoryStorage(config)
return new MemoryStorage(config);
}

View File

@@ -10,8 +10,8 @@ import type {
ITransactionStorage,
TransactionLog,
TransactionState,
OperationLog,
} from '../core/types.js'
OperationLog
} from '../core/types.js';
/**
* @zh MongoDB Collection 接口
@@ -36,16 +36,50 @@ export interface MongoDb {
collection<T = unknown>(name: string): MongoCollection<T>
}
/**
* @zh MongoDB 客户端接口
* @en MongoDB client interface
*/
export interface MongoClient {
db(name?: string): MongoDb
close(): Promise<void>
}
/**
* @zh MongoDB 连接工厂
* @en MongoDB connection factory
*/
export type MongoClientFactory = () => MongoClient | Promise<MongoClient>
/**
* @zh MongoDB 存储配置
* @en MongoDB storage configuration
*/
export interface MongoStorageConfig {
/**
* @zh MongoDB 数据库实例
* @en MongoDB database instance
* @zh MongoDB 客户端工厂(惰性连接)
* @en MongoDB client factory (lazy connection)
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb'
* const storage = new MongoStorage({
* factory: async () => {
* const client = new MongoClient('mongodb://localhost:27017')
* await client.connect()
* return client
* },
* database: 'game'
* })
* ```
*/
db: MongoDb
factory: MongoClientFactory
/**
* @zh 数据库名称
* @en Database name
*/
database: string
/**
* @zh 事务日志集合名称
@@ -82,32 +116,94 @@ interface DataDocument {
* @zh MongoDB 存储
* @en MongoDB storage
*
* @zh 基于 MongoDB 的事务存储,支持持久化复杂查询
* @en MongoDB-based transaction storage with persistence and complex query support
* @zh 基于 MongoDB 的事务存储,支持持久化复杂查询和惰性连接
* @en MongoDB-based transaction storage with persistence, complex queries and lazy connection
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb'
*
* const client = new MongoClient('mongodb://localhost:27017')
* await client.connect()
* const db = client.db('game')
* // 创建存储(惰性连接,首次操作时才连接)
* const storage = new MongoStorage({
* factory: async () => {
* const client = new MongoClient('mongodb://localhost:27017')
* await client.connect()
* return client
* },
* database: 'game'
* })
*
* const storage = new MongoStorage({ db })
* await storage.ensureIndexes()
*
* // 使用后手动关闭
* await storage.close()
*
* // 或使用 await using 自动关闭 (TypeScript 5.2+)
* await using storage = new MongoStorage({ ... })
* // 作用域结束时自动关闭
* ```
*/
export class MongoStorage implements ITransactionStorage {
private _db: MongoDb
private _transactionCollection: string
private _dataCollection: string
private _lockCollection: string
private _client: MongoClient | null = null;
private _db: MongoDb | null = null;
private _factory: MongoClientFactory;
private _database: string;
private _transactionCollection: string;
private _dataCollection: string;
private _lockCollection: string;
private _closed: boolean = false;
constructor(config: MongoStorageConfig) {
this._db = config.db
this._transactionCollection = config.transactionCollection ?? 'transactions'
this._dataCollection = config.dataCollection ?? 'transaction_data'
this._lockCollection = config.lockCollection ?? 'transaction_locks'
this._factory = config.factory;
this._database = config.database;
this._transactionCollection = config.transactionCollection ?? 'transactions';
this._dataCollection = config.dataCollection ?? 'transaction_data';
this._lockCollection = config.lockCollection ?? 'transaction_locks';
}
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
/**
* @zh 获取数据库实例(惰性连接)
* @en Get database instance (lazy connection)
*/
private async _getDb(): Promise<MongoDb> {
if (this._closed) {
throw new Error('MongoStorage is closed');
}
if (!this._db) {
this._client = await this._factory();
this._db = this._client.db(this._database);
}
return this._db;
}
/**
* @zh 关闭存储连接
* @en Close storage connection
*/
async close(): Promise<void> {
if (this._closed) return;
this._closed = true;
if (this._client) {
await this._client.close();
this._client = null;
this._db = null;
}
}
/**
* @zh 支持 await using 语法
* @en Support await using syntax
*/
async [Symbol.asyncDispose](): Promise<void> {
await this.close();
}
/**
@@ -115,16 +211,17 @@ export class MongoStorage implements ITransactionStorage {
* @en Ensure indexes exist
*/
async ensureIndexes(): Promise<void> {
const txColl = this._db.collection<TransactionLog>(this._transactionCollection)
await txColl.createIndex({ state: 1 })
await txColl.createIndex({ 'metadata.serverId': 1 })
await txColl.createIndex({ createdAt: 1 })
const db = await this._getDb();
const txColl = db.collection<TransactionLog>(this._transactionCollection);
await txColl.createIndex({ state: 1 });
await txColl.createIndex({ 'metadata.serverId': 1 });
await txColl.createIndex({ createdAt: 1 });
const lockColl = this._db.collection<LockDocument>(this._lockCollection)
await lockColl.createIndex({ expireAt: 1 }, { expireAfterSeconds: 0 })
const lockColl = db.collection<LockDocument>(this._lockCollection);
await lockColl.createIndex({ expireAt: 1 }, { expireAfterSeconds: 0 });
const dataColl = this._db.collection<DataDocument>(this._dataCollection)
await dataColl.createIndex({ expireAt: 1 }, { expireAfterSeconds: 0 })
const dataColl = db.collection<DataDocument>(this._dataCollection);
await dataColl.createIndex({ expireAt: 1 }, { expireAfterSeconds: 0 });
}
// =========================================================================
@@ -132,36 +229,38 @@ export class MongoStorage implements ITransactionStorage {
// =========================================================================
async acquireLock(key: string, ttl: number): Promise<string | null> {
const coll = this._db.collection<LockDocument>(this._lockCollection)
const token = `${Date.now()}_${Math.random().toString(36).substring(2)}`
const expireAt = new Date(Date.now() + ttl)
const db = await this._getDb();
const coll = db.collection<LockDocument>(this._lockCollection);
const token = `${Date.now()}_${Math.random().toString(36).substring(2)}`;
const expireAt = new Date(Date.now() + ttl);
try {
await coll.insertOne({
_id: key,
token,
expireAt,
})
return token
expireAt
});
return token;
} catch (error) {
const existing = await coll.findOne({ _id: key })
const existing = await coll.findOne({ _id: key });
if (existing && existing.expireAt < new Date()) {
const result = await coll.updateOne(
{ _id: key, expireAt: { $lt: new Date() } },
{ $set: { token, expireAt } }
)
);
if (result.modifiedCount > 0) {
return token
return token;
}
}
return null
return null;
}
}
async releaseLock(key: string, token: string): Promise<boolean> {
const coll = this._db.collection<LockDocument>(this._lockCollection)
const result = await coll.deleteOne({ _id: key, token })
return result.deletedCount > 0
const db = await this._getDb();
const coll = db.collection<LockDocument>(this._lockCollection);
const result = await coll.deleteOne({ _id: key, token });
return result.deletedCount > 0;
}
// =========================================================================
@@ -169,35 +268,38 @@ export class MongoStorage implements ITransactionStorage {
// =========================================================================
async saveTransaction(tx: TransactionLog): Promise<void> {
const coll = this._db.collection<TransactionLog & { _id: string }>(this._transactionCollection)
const db = await this._getDb();
const coll = db.collection<TransactionLog & { _id: string }>(this._transactionCollection);
const existing = await coll.findOne({ _id: tx.id })
const existing = await coll.findOne({ _id: tx.id });
if (existing) {
await coll.updateOne(
{ _id: tx.id },
{ $set: { ...tx, _id: tx.id } }
)
);
} else {
await coll.insertOne({ ...tx, _id: tx.id })
await coll.insertOne({ ...tx, _id: tx.id });
}
}
async getTransaction(id: string): Promise<TransactionLog | null> {
const coll = this._db.collection<TransactionLog & { _id: string }>(this._transactionCollection)
const doc = await coll.findOne({ _id: id })
const db = await this._getDb();
const coll = db.collection<TransactionLog & { _id: string }>(this._transactionCollection);
const doc = await coll.findOne({ _id: id });
if (!doc) return null
if (!doc) return null;
const { _id, ...tx } = doc
return tx as TransactionLog
const { _id, ...tx } = doc;
return tx as TransactionLog;
}
async updateTransactionState(id: string, state: TransactionState): Promise<void> {
const coll = this._db.collection(this._transactionCollection)
const db = await this._getDb();
const coll = db.collection(this._transactionCollection);
await coll.updateOne(
{ _id: id },
{ $set: { state, updatedAt: Date.now() } }
)
);
}
async updateOperationState(
@@ -206,47 +308,50 @@ export class MongoStorage implements ITransactionStorage {
state: OperationLog['state'],
error?: string
): Promise<void> {
const coll = this._db.collection(this._transactionCollection)
const db = await this._getDb();
const coll = db.collection(this._transactionCollection);
const update: Record<string, unknown> = {
[`operations.${operationIndex}.state`]: state,
updatedAt: Date.now(),
}
updatedAt: Date.now()
};
if (error) {
update[`operations.${operationIndex}.error`] = error
update[`operations.${operationIndex}.error`] = error;
}
if (state === 'executed') {
update[`operations.${operationIndex}.executedAt`] = Date.now()
update[`operations.${operationIndex}.executedAt`] = Date.now();
} else if (state === 'compensated') {
update[`operations.${operationIndex}.compensatedAt`] = Date.now()
update[`operations.${operationIndex}.compensatedAt`] = Date.now();
}
await coll.updateOne(
{ _id: transactionId },
{ $set: update }
)
);
}
async getPendingTransactions(serverId?: string): Promise<TransactionLog[]> {
const coll = this._db.collection<TransactionLog & { _id: string }>(this._transactionCollection)
const db = await this._getDb();
const coll = db.collection<TransactionLog & { _id: string }>(this._transactionCollection);
const filter: Record<string, unknown> = {
state: { $in: ['pending', 'executing'] },
}
state: { $in: ['pending', 'executing'] }
};
if (serverId) {
filter['metadata.serverId'] = serverId
filter['metadata.serverId'] = serverId;
}
const docs = await coll.find(filter).toArray()
return docs.map(({ _id, ...tx }) => tx as TransactionLog)
const docs = await coll.find(filter).toArray();
return docs.map(({ _id, ...tx }) => tx as TransactionLog);
}
async deleteTransaction(id: string): Promise<void> {
const coll = this._db.collection(this._transactionCollection)
await coll.deleteOne({ _id: id })
const db = await this._getDb();
const coll = db.collection(this._transactionCollection);
await coll.deleteOne({ _id: id });
}
// =========================================================================
@@ -254,43 +359,46 @@ export class MongoStorage implements ITransactionStorage {
// =========================================================================
async get<T>(key: string): Promise<T | null> {
const coll = this._db.collection<DataDocument>(this._dataCollection)
const doc = await coll.findOne({ _id: key })
const db = await this._getDb();
const coll = db.collection<DataDocument>(this._dataCollection);
const doc = await coll.findOne({ _id: key });
if (!doc) return null
if (!doc) return null;
if (doc.expireAt && doc.expireAt < new Date()) {
await coll.deleteOne({ _id: key })
return null
await coll.deleteOne({ _id: key });
return null;
}
return doc.value as T
return doc.value as T;
}
async set<T>(key: string, value: T, ttl?: number): Promise<void> {
const coll = this._db.collection<DataDocument>(this._dataCollection)
const db = await this._getDb();
const coll = db.collection<DataDocument>(this._dataCollection);
const doc: DataDocument = {
_id: key,
value,
}
value
};
if (ttl) {
doc.expireAt = new Date(Date.now() + ttl)
doc.expireAt = new Date(Date.now() + ttl);
}
const existing = await coll.findOne({ _id: key })
const existing = await coll.findOne({ _id: key });
if (existing) {
await coll.updateOne({ _id: key }, { $set: doc })
await coll.updateOne({ _id: key }, { $set: doc });
} else {
await coll.insertOne(doc)
await coll.insertOne(doc);
}
}
async delete(key: string): Promise<boolean> {
const coll = this._db.collection(this._dataCollection)
const result = await coll.deleteOne({ _id: key })
return result.deletedCount > 0
const db = await this._getDb();
const coll = db.collection(this._dataCollection);
const result = await coll.deleteOne({ _id: key });
return result.deletedCount > 0;
}
}
@@ -299,5 +407,5 @@ export class MongoStorage implements ITransactionStorage {
* @en Create MongoDB storage
*/
export function createMongoStorage(config: MongoStorageConfig): MongoStorage {
return new MongoStorage(config)
return new MongoStorage(config);
}

View File

@@ -10,8 +10,8 @@ import type {
ITransactionStorage,
TransactionLog,
TransactionState,
OperationLog,
} from '../core/types.js'
OperationLog
} from '../core/types.js';
/**
* @zh Redis 客户端接口(兼容 ioredis
@@ -28,18 +28,33 @@ export interface RedisClient {
hgetall(key: string): Promise<Record<string, string>>
keys(pattern: string): Promise<string[]>
expire(key: string, seconds: number): Promise<number>
quit(): Promise<string>
}
/**
* @zh Redis 连接工厂
* @en Redis connection factory
*/
export type RedisClientFactory = () => RedisClient | Promise<RedisClient>
/**
* @zh Redis 存储配置
* @en Redis storage configuration
*/
export interface RedisStorageConfig {
/**
* @zh Redis 客户端实例
* @en Redis client instance
* @zh Redis 客户端工厂(惰性连接)
* @en Redis client factory (lazy connection)
*
* @example
* ```typescript
* import Redis from 'ioredis'
* const storage = new RedisStorage({
* factory: () => new Redis('redis://localhost:6379')
* })
* ```
*/
client: RedisClient
factory: RedisClientFactory
/**
* @zh 键前缀
@@ -60,32 +75,88 @@ if redis.call("get", KEYS[1]) == ARGV[1] then
else
return 0
end
`
`;
/**
* @zh Redis 存储
* @en Redis storage
*
* @zh 基于 Redis 的分布式事务存储,支持分布式锁
* @en Redis-based distributed transaction storage with distributed locking support
* @zh 基于 Redis 的分布式事务存储,支持分布式锁和惰性连接
* @en Redis-based distributed transaction storage with distributed locking and lazy connection
*
* @example
* ```typescript
* import Redis from 'ioredis'
*
* const redis = new Redis('redis://localhost:6379')
* const storage = new RedisStorage({ client: redis })
* // 创建存储(惰性连接,首次操作时才连接)
* const storage = new RedisStorage({
* factory: () => new Redis('redis://localhost:6379')
* })
*
* // 使用后手动关闭
* await storage.close()
*
* // 或使用 await using 自动关闭 (TypeScript 5.2+)
* await using storage = new RedisStorage({
* factory: () => new Redis('redis://localhost:6379')
* })
* // 作用域结束时自动关闭
* ```
*/
export class RedisStorage implements ITransactionStorage {
private _client: RedisClient
private _prefix: string
private _transactionTTL: number
private _client: RedisClient | null = null;
private _factory: RedisClientFactory;
private _prefix: string;
private _transactionTTL: number;
private _closed: boolean = false;
constructor(config: RedisStorageConfig) {
this._client = config.client
this._prefix = config.prefix ?? 'tx:'
this._transactionTTL = config.transactionTTL ?? 86400 // 24 hours
this._factory = config.factory;
this._prefix = config.prefix ?? 'tx:';
this._transactionTTL = config.transactionTTL ?? 86400; // 24 hours
}
// =========================================================================
// 生命周期 | Lifecycle
// =========================================================================
/**
* @zh 获取 Redis 客户端(惰性连接)
* @en Get Redis client (lazy connection)
*/
private async _getClient(): Promise<RedisClient> {
if (this._closed) {
throw new Error('RedisStorage is closed');
}
if (!this._client) {
this._client = await this._factory();
}
return this._client;
}
/**
* @zh 关闭存储连接
* @en Close storage connection
*/
async close(): Promise<void> {
if (this._closed) return;
this._closed = true;
if (this._client) {
await this._client.quit();
this._client = null;
}
}
/**
* @zh 支持 await using 语法
* @en Support await using syntax
*/
async [Symbol.asyncDispose](): Promise<void> {
await this.close();
}
// =========================================================================
@@ -93,20 +164,22 @@ export class RedisStorage implements ITransactionStorage {
// =========================================================================
async acquireLock(key: string, ttl: number): Promise<string | null> {
const lockKey = `${this._prefix}lock:${key}`
const token = `${Date.now()}_${Math.random().toString(36).substring(2)}`
const ttlSeconds = Math.ceil(ttl / 1000)
const client = await this._getClient();
const lockKey = `${this._prefix}lock:${key}`;
const token = `${Date.now()}_${Math.random().toString(36).substring(2)}`;
const ttlSeconds = Math.ceil(ttl / 1000);
const result = await this._client.set(lockKey, token, 'NX', 'EX', String(ttlSeconds))
const result = await client.set(lockKey, token, 'NX', 'EX', String(ttlSeconds));
return result === 'OK' ? token : null
return result === 'OK' ? token : null;
}
async releaseLock(key: string, token: string): Promise<boolean> {
const lockKey = `${this._prefix}lock:${key}`
const client = await this._getClient();
const lockKey = `${this._prefix}lock:${key}`;
const result = await this._client.eval(LOCK_SCRIPT, 1, lockKey, token)
return result === 1
const result = await client.eval(LOCK_SCRIPT, 1, lockKey, token);
return result === 1;
}
// =========================================================================
@@ -114,30 +187,32 @@ export class RedisStorage implements ITransactionStorage {
// =========================================================================
async saveTransaction(tx: TransactionLog): Promise<void> {
const key = `${this._prefix}tx:${tx.id}`
const client = await this._getClient();
const key = `${this._prefix}tx:${tx.id}`;
await this._client.set(key, JSON.stringify(tx))
await this._client.expire(key, this._transactionTTL)
await client.set(key, JSON.stringify(tx));
await client.expire(key, this._transactionTTL);
if (tx.metadata?.serverId) {
const serverKey = `${this._prefix}server:${tx.metadata.serverId}:txs`
await this._client.hset(serverKey, tx.id, String(tx.createdAt))
const serverKey = `${this._prefix}server:${tx.metadata.serverId}:txs`;
await client.hset(serverKey, tx.id, String(tx.createdAt));
}
}
async getTransaction(id: string): Promise<TransactionLog | null> {
const key = `${this._prefix}tx:${id}`
const data = await this._client.get(key)
const client = await this._getClient();
const key = `${this._prefix}tx:${id}`;
const data = await client.get(key);
return data ? JSON.parse(data) : null
return data ? JSON.parse(data) : null;
}
async updateTransactionState(id: string, state: TransactionState): Promise<void> {
const tx = await this.getTransaction(id)
const tx = await this.getTransaction(id);
if (tx) {
tx.state = state
tx.updatedAt = Date.now()
await this.saveTransaction(tx)
tx.state = state;
tx.updatedAt = Date.now();
await this.saveTransaction(tx);
}
}
@@ -147,62 +222,64 @@ export class RedisStorage implements ITransactionStorage {
state: OperationLog['state'],
error?: string
): Promise<void> {
const tx = await this.getTransaction(transactionId)
const tx = await this.getTransaction(transactionId);
if (tx && tx.operations[operationIndex]) {
tx.operations[operationIndex].state = state
tx.operations[operationIndex].state = state;
if (error) {
tx.operations[operationIndex].error = error
tx.operations[operationIndex].error = error;
}
if (state === 'executed') {
tx.operations[operationIndex].executedAt = Date.now()
tx.operations[operationIndex].executedAt = Date.now();
} else if (state === 'compensated') {
tx.operations[operationIndex].compensatedAt = Date.now()
tx.operations[operationIndex].compensatedAt = Date.now();
}
tx.updatedAt = Date.now()
await this.saveTransaction(tx)
tx.updatedAt = Date.now();
await this.saveTransaction(tx);
}
}
async getPendingTransactions(serverId?: string): Promise<TransactionLog[]> {
const result: TransactionLog[] = []
const client = await this._getClient();
const result: TransactionLog[] = [];
if (serverId) {
const serverKey = `${this._prefix}server:${serverId}:txs`
const txIds = await this._client.hgetall(serverKey)
const serverKey = `${this._prefix}server:${serverId}:txs`;
const txIds = await client.hgetall(serverKey);
for (const id of Object.keys(txIds)) {
const tx = await this.getTransaction(id)
const tx = await this.getTransaction(id);
if (tx && (tx.state === 'pending' || tx.state === 'executing')) {
result.push(tx)
result.push(tx);
}
}
} else {
const pattern = `${this._prefix}tx:*`
const keys = await this._client.keys(pattern)
const pattern = `${this._prefix}tx:*`;
const keys = await client.keys(pattern);
for (const key of keys) {
const data = await this._client.get(key)
const data = await client.get(key);
if (data) {
const tx: TransactionLog = JSON.parse(data)
const tx: TransactionLog = JSON.parse(data);
if (tx.state === 'pending' || tx.state === 'executing') {
result.push(tx)
result.push(tx);
}
}
}
}
return result
return result;
}
async deleteTransaction(id: string): Promise<void> {
const key = `${this._prefix}tx:${id}`
const tx = await this.getTransaction(id)
const client = await this._getClient();
const key = `${this._prefix}tx:${id}`;
const tx = await this.getTransaction(id);
await this._client.del(key)
await client.del(key);
if (tx?.metadata?.serverId) {
const serverKey = `${this._prefix}server:${tx.metadata.serverId}:txs`
await this._client.hdel(serverKey, id)
const serverKey = `${this._prefix}server:${tx.metadata.serverId}:txs`;
await client.hdel(serverKey, id);
}
}
@@ -211,27 +288,30 @@ export class RedisStorage implements ITransactionStorage {
// =========================================================================
async get<T>(key: string): Promise<T | null> {
const fullKey = `${this._prefix}data:${key}`
const data = await this._client.get(fullKey)
const client = await this._getClient();
const fullKey = `${this._prefix}data:${key}`;
const data = await client.get(fullKey);
return data ? JSON.parse(data) : null
return data ? JSON.parse(data) : null;
}
async set<T>(key: string, value: T, ttl?: number): Promise<void> {
const fullKey = `${this._prefix}data:${key}`
const client = await this._getClient();
const fullKey = `${this._prefix}data:${key}`;
if (ttl) {
const ttlSeconds = Math.ceil(ttl / 1000)
await this._client.set(fullKey, JSON.stringify(value), 'EX', String(ttlSeconds))
const ttlSeconds = Math.ceil(ttl / 1000);
await client.set(fullKey, JSON.stringify(value), 'EX', String(ttlSeconds));
} else {
await this._client.set(fullKey, JSON.stringify(value))
await client.set(fullKey, JSON.stringify(value));
}
}
async delete(key: string): Promise<boolean> {
const fullKey = `${this._prefix}data:${key}`
const result = await this._client.del(fullKey)
return result > 0
const client = await this._getClient();
const fullKey = `${this._prefix}data:${key}`;
const result = await client.del(fullKey);
return result > 0;
}
}
@@ -240,5 +320,5 @@ export class RedisStorage implements ITransactionStorage {
* @en Create Redis storage
*/
export function createRedisStorage(config: RedisStorageConfig): RedisStorage {
return new RedisStorage(config)
return new RedisStorage(config);
}

View File

@@ -3,6 +3,6 @@
* @en Storage module exports
*/
export { MemoryStorage, createMemoryStorage, type MemoryStorageConfig } from './MemoryStorage.js'
export { RedisStorage, createRedisStorage, type RedisStorageConfig, type RedisClient } from './RedisStorage.js'
export { MongoStorage, createMongoStorage, type MongoStorageConfig, type MongoDb, type MongoCollection } from './MongoStorage.js'
export { MemoryStorage, createMemoryStorage, type MemoryStorageConfig } from './MemoryStorage.js';
export { RedisStorage, createRedisStorage, type RedisStorageConfig, type RedisClient } from './RedisStorage.js';
export { MongoStorage, createMongoStorage, type MongoStorageConfig, type MongoDb, type MongoCollection } from './MongoStorage.js';