feat(transaction): 添加游戏事务系统 | add game transaction system (#381)

- TransactionManager/TransactionContext 事务管理
- MemoryStorage/RedisStorage/MongoStorage 存储实现
- CurrencyOperation/InventoryOperation/TradeOperation 内置操作
- SagaOrchestrator 分布式 Saga 编排
- withTransactions() Room 集成
- 完整中英文文档
This commit is contained in:
YHH
2025-12-29 10:54:00 +08:00
committed by GitHub
parent 2d46ccf896
commit d4cef828e1
38 changed files with 6631 additions and 16 deletions

View File

@@ -0,0 +1,286 @@
/**
* @zh 事务上下文实现
* @en Transaction context implementation
*/
import type {
ITransactionContext,
ITransactionOperation,
ITransactionStorage,
TransactionState,
TransactionResult,
TransactionOptions,
TransactionLog,
OperationLog,
OperationResult,
} from './types.js'
/**
* @zh 生成唯一 ID
* @en Generate unique ID
*/
function generateId(): string {
return `tx_${Date.now().toString(36)}_${Math.random().toString(36).substring(2, 11)}`
}
/**
* @zh 事务上下文
* @en Transaction context
*
* @zh 封装事务的状态、操作和执行逻辑
* @en Encapsulates transaction state, operations, and execution logic
*
* @example
* ```typescript
* const ctx = new TransactionContext({ timeout: 5000 })
* ctx.addOperation(new DeductCurrency({ playerId: '1', amount: 100 }))
* ctx.addOperation(new AddItem({ playerId: '1', itemId: 'sword' }))
* const result = await ctx.execute()
* ```
*/
export class TransactionContext implements ITransactionContext {
private _id: string
private _state: TransactionState = 'pending'
private _timeout: number
private _operations: ITransactionOperation[] = []
private _storage: ITransactionStorage | null
private _metadata: Record<string, unknown>
private _contextData: Map<string, unknown> = new Map()
private _startTime: number = 0
private _distributed: boolean
constructor(options: TransactionOptions & { storage?: ITransactionStorage } = {}) {
this._id = generateId()
this._timeout = options.timeout ?? 30000
this._storage = options.storage ?? null
this._metadata = options.metadata ?? {}
this._distributed = options.distributed ?? false
}
// =========================================================================
// 只读属性 | Readonly properties
// =========================================================================
get id(): string {
return this._id
}
get state(): TransactionState {
return this._state
}
get timeout(): number {
return this._timeout
}
get operations(): ReadonlyArray<ITransactionOperation> {
return this._operations
}
get storage(): ITransactionStorage | null {
return this._storage
}
get metadata(): Record<string, unknown> {
return this._metadata
}
// =========================================================================
// 公共方法 | Public methods
// =========================================================================
/**
* @zh 添加操作
* @en Add operation
*/
addOperation<T extends ITransactionOperation>(operation: T): this {
if (this._state !== 'pending') {
throw new Error(`Cannot add operation to transaction in state: ${this._state}`)
}
this._operations.push(operation)
return this
}
/**
* @zh 执行事务
* @en Execute transaction
*/
async execute<T = unknown>(): Promise<TransactionResult<T>> {
if (this._state !== 'pending') {
return {
success: false,
transactionId: this._id,
results: [],
error: `Transaction already in state: ${this._state}`,
duration: 0,
}
}
this._startTime = Date.now()
this._state = 'executing'
const results: OperationResult[] = []
let executedCount = 0
try {
await this._saveLog()
for (let i = 0; i < this._operations.length; i++) {
if (this._isTimedOut()) {
throw new Error('Transaction timed out')
}
const op = this._operations[i]
const isValid = await op.validate(this)
if (!isValid) {
throw new Error(`Validation failed for operation: ${op.name}`)
}
const result = await op.execute(this)
results.push(result)
executedCount++
await this._updateOperationLog(i, 'executed')
if (!result.success) {
throw new Error(result.error ?? `Operation ${op.name} failed`)
}
}
this._state = 'committed'
await this._updateTransactionState('committed')
return {
success: true,
transactionId: this._id,
results,
data: this._collectResultData(results) as T,
duration: Date.now() - this._startTime,
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
await this._compensate(executedCount - 1)
return {
success: false,
transactionId: this._id,
results,
error: errorMessage,
duration: Date.now() - this._startTime,
}
}
}
/**
* @zh 手动回滚事务
* @en Manually rollback transaction
*/
async rollback(): Promise<void> {
if (this._state === 'committed' || this._state === 'rolledback') {
return
}
await this._compensate(this._operations.length - 1)
}
/**
* @zh 获取上下文数据
* @en Get context data
*/
get<T>(key: string): T | undefined {
return this._contextData.get(key) as T | undefined
}
/**
* @zh 设置上下文数据
* @en Set context data
*/
set<T>(key: string, value: T): void {
this._contextData.set(key, value)
}
// =========================================================================
// 私有方法 | Private methods
// =========================================================================
private _isTimedOut(): boolean {
return Date.now() - this._startTime > this._timeout
}
private async _compensate(fromIndex: number): Promise<void> {
this._state = 'rolledback'
for (let i = fromIndex; i >= 0; i--) {
const op = this._operations[i]
try {
await op.compensate(this)
await this._updateOperationLog(i, 'compensated')
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
await this._updateOperationLog(i, 'failed', errorMessage)
}
}
await this._updateTransactionState('rolledback')
}
private async _saveLog(): Promise<void> {
if (!this._storage) return
const log: TransactionLog = {
id: this._id,
state: this._state,
createdAt: this._startTime,
updatedAt: this._startTime,
timeout: this._timeout,
operations: this._operations.map((op) => ({
name: op.name,
data: op.data,
state: 'pending' as const,
})),
metadata: this._metadata,
distributed: this._distributed,
}
await this._storage.saveTransaction(log)
}
private async _updateTransactionState(state: TransactionState): Promise<void> {
this._state = state
if (this._storage) {
await this._storage.updateTransactionState(this._id, state)
}
}
private async _updateOperationLog(
index: number,
state: OperationLog['state'],
error?: string
): Promise<void> {
if (this._storage) {
await this._storage.updateOperationState(this._id, index, state, error)
}
}
private _collectResultData(results: OperationResult[]): unknown {
const data: Record<string, unknown> = {}
for (const result of results) {
if (result.data !== undefined) {
Object.assign(data, result.data)
}
}
return Object.keys(data).length > 0 ? data : undefined
}
}
/**
* @zh 创建事务上下文
* @en Create transaction context
*/
export function createTransactionContext(
options: TransactionOptions & { storage?: ITransactionStorage } = {}
): ITransactionContext {
return new TransactionContext(options)
}

View File

@@ -0,0 +1,255 @@
/**
* @zh 事务管理器
* @en Transaction manager
*/
import type {
ITransactionContext,
ITransactionStorage,
TransactionManagerConfig,
TransactionOptions,
TransactionLog,
TransactionResult,
} from './types.js'
import { TransactionContext } from './TransactionContext.js'
/**
* @zh 事务管理器
* @en Transaction manager
*
* @zh 管理事务的创建、执行和恢复
* @en Manages transaction creation, execution, and recovery
*
* @example
* ```typescript
* const manager = new TransactionManager({
* storage: new RedisStorage({ url: 'redis://localhost:6379' }),
* defaultTimeout: 10000,
* })
*
* const tx = manager.begin({ timeout: 5000 })
* tx.addOperation(new DeductCurrency({ ... }))
* tx.addOperation(new AddItem({ ... }))
*
* const result = await tx.execute()
* ```
*/
export class TransactionManager {
private _storage: ITransactionStorage | null
private _defaultTimeout: number
private _serverId: string
private _autoRecover: boolean
private _activeTransactions: Map<string, ITransactionContext> = new Map()
constructor(config: TransactionManagerConfig = {}) {
this._storage = config.storage ?? null
this._defaultTimeout = config.defaultTimeout ?? 30000
this._serverId = config.serverId ?? this._generateServerId()
this._autoRecover = config.autoRecover ?? true
}
// =========================================================================
// 只读属性 | Readonly properties
// =========================================================================
/**
* @zh 服务器 ID
* @en Server ID
*/
get serverId(): string {
return this._serverId
}
/**
* @zh 存储实例
* @en Storage instance
*/
get storage(): ITransactionStorage | null {
return this._storage
}
/**
* @zh 活跃事务数量
* @en Active transaction count
*/
get activeCount(): number {
return this._activeTransactions.size
}
// =========================================================================
// 公共方法 | Public methods
// =========================================================================
/**
* @zh 开始新事务
* @en Begin new transaction
*
* @param options - @zh 事务选项 @en Transaction options
* @returns @zh 事务上下文 @en Transaction context
*/
begin(options: TransactionOptions = {}): ITransactionContext {
const ctx = new TransactionContext({
timeout: options.timeout ?? this._defaultTimeout,
storage: this._storage ?? undefined,
metadata: {
...options.metadata,
serverId: this._serverId,
},
distributed: options.distributed,
})
this._activeTransactions.set(ctx.id, ctx)
return ctx
}
/**
* @zh 执行事务(便捷方法)
* @en Execute transaction (convenience method)
*
* @param builder - @zh 事务构建函数 @en Transaction builder function
* @param options - @zh 事务选项 @en Transaction options
* @returns @zh 事务结果 @en Transaction result
*/
async run<T = unknown>(
builder: (ctx: ITransactionContext) => void | Promise<void>,
options: TransactionOptions = {}
): Promise<TransactionResult<T>> {
const ctx = this.begin(options)
try {
await builder(ctx)
const result = await ctx.execute<T>()
return result
} finally {
this._activeTransactions.delete(ctx.id)
}
}
/**
* @zh 获取活跃事务
* @en Get active transaction
*/
getTransaction(id: string): ITransactionContext | undefined {
return this._activeTransactions.get(id)
}
/**
* @zh 恢复未完成的事务
* @en Recover pending transactions
*/
async recover(): Promise<number> {
if (!this._storage) return 0
const pendingTransactions = await this._storage.getPendingTransactions(this._serverId)
let recoveredCount = 0
for (const log of pendingTransactions) {
try {
await this._recoverTransaction(log)
recoveredCount++
} catch (error) {
console.error(`Failed to recover transaction ${log.id}:`, error)
}
}
return recoveredCount
}
/**
* @zh 获取分布式锁
* @en Acquire distributed lock
*/
async acquireLock(key: string, ttl: number = 10000): Promise<string | null> {
if (!this._storage) return null
return this._storage.acquireLock(key, ttl)
}
/**
* @zh 释放分布式锁
* @en Release distributed lock
*/
async releaseLock(key: string, token: string): Promise<boolean> {
if (!this._storage) return false
return this._storage.releaseLock(key, token)
}
/**
* @zh 使用分布式锁执行
* @en Execute with distributed lock
*/
async withLock<T>(
key: string,
fn: () => Promise<T>,
ttl: number = 10000
): Promise<T> {
const token = await this.acquireLock(key, ttl)
if (!token) {
throw new Error(`Failed to acquire lock for key: ${key}`)
}
try {
return await fn()
} finally {
await this.releaseLock(key, token)
}
}
/**
* @zh 清理已完成的事务日志
* @en Clean up completed transaction logs
*/
async cleanup(beforeTimestamp?: number): Promise<number> {
if (!this._storage) return 0
const timestamp = beforeTimestamp ?? Date.now() - 24 * 60 * 60 * 1000 // 默认清理24小时前
const pendingTransactions = await this._storage.getPendingTransactions()
let cleanedCount = 0
for (const log of pendingTransactions) {
if (
log.createdAt < timestamp &&
(log.state === 'committed' || log.state === 'rolledback')
) {
await this._storage.deleteTransaction(log.id)
cleanedCount++
}
}
return cleanedCount
}
// =========================================================================
// 私有方法 | Private methods
// =========================================================================
private _generateServerId(): string {
return `server_${Date.now().toString(36)}_${Math.random().toString(36).substring(2, 8)}`
}
private async _recoverTransaction(log: TransactionLog): Promise<void> {
if (log.state === 'executing') {
const executedOps = log.operations.filter((op) => op.state === 'executed')
if (executedOps.length > 0 && this._storage) {
for (let i = executedOps.length - 1; i >= 0; i--) {
await this._storage.updateOperationState(log.id, i, 'compensated')
}
await this._storage.updateTransactionState(log.id, 'rolledback')
} else {
await this._storage?.updateTransactionState(log.id, 'failed')
}
}
}
}
/**
* @zh 创建事务管理器
* @en Create transaction manager
*/
export function createTransactionManager(
config: TransactionManagerConfig = {}
): TransactionManager {
return new TransactionManager(config)
}

View File

@@ -0,0 +1,20 @@
/**
* @zh 核心模块导出
* @en Core module exports
*/
export type {
TransactionState,
OperationResult,
TransactionResult,
OperationLog,
TransactionLog,
TransactionOptions,
TransactionManagerConfig,
ITransactionStorage,
ITransactionOperation,
ITransactionContext,
} from './types.js'
export { TransactionContext, createTransactionContext } from './TransactionContext.js'
export { TransactionManager, createTransactionManager } from './TransactionManager.js'

View File

@@ -0,0 +1,484 @@
/**
* @zh 事务系统核心类型定义
* @en Transaction system core type definitions
*/
// =============================================================================
// 事务状态 | Transaction State
// =============================================================================
/**
* @zh 事务状态
* @en Transaction state
*/
export type TransactionState =
| 'pending' // 等待执行 | Waiting to execute
| 'executing' // 执行中 | Executing
| 'committed' // 已提交 | Committed
| 'rolledback' // 已回滚 | Rolled back
| 'failed' // 失败 | Failed
// =============================================================================
// 操作结果 | Operation Result
// =============================================================================
/**
* @zh 操作结果
* @en Operation result
*/
export interface OperationResult<T = unknown> {
/**
* @zh 是否成功
* @en Whether succeeded
*/
success: boolean
/**
* @zh 返回数据
* @en Return data
*/
data?: T
/**
* @zh 错误信息
* @en Error message
*/
error?: string
/**
* @zh 错误代码
* @en Error code
*/
errorCode?: string
}
/**
* @zh 事务结果
* @en Transaction result
*/
export interface TransactionResult<T = unknown> {
/**
* @zh 是否成功
* @en Whether succeeded
*/
success: boolean
/**
* @zh 事务 ID
* @en Transaction ID
*/
transactionId: string
/**
* @zh 操作结果列表
* @en Operation results
*/
results: OperationResult[]
/**
* @zh 最终数据
* @en Final data
*/
data?: T
/**
* @zh 错误信息
* @en Error message
*/
error?: string
/**
* @zh 执行时间(毫秒)
* @en Execution time in milliseconds
*/
duration: number
}
// =============================================================================
// 事务日志 | Transaction Log
// =============================================================================
/**
* @zh 操作日志
* @en Operation log
*/
export interface OperationLog {
/**
* @zh 操作名称
* @en Operation name
*/
name: string
/**
* @zh 操作数据
* @en Operation data
*/
data: unknown
/**
* @zh 操作状态
* @en Operation state
*/
state: 'pending' | 'executed' | 'compensated' | 'failed'
/**
* @zh 执行时间
* @en Execution timestamp
*/
executedAt?: number
/**
* @zh 补偿时间
* @en Compensation timestamp
*/
compensatedAt?: number
/**
* @zh 错误信息
* @en Error message
*/
error?: string
}
/**
* @zh 事务日志
* @en Transaction log
*/
export interface TransactionLog {
/**
* @zh 事务 ID
* @en Transaction ID
*/
id: string
/**
* @zh 事务状态
* @en Transaction state
*/
state: TransactionState
/**
* @zh 创建时间
* @en Creation timestamp
*/
createdAt: number
/**
* @zh 更新时间
* @en Update timestamp
*/
updatedAt: number
/**
* @zh 超时时间(毫秒)
* @en Timeout in milliseconds
*/
timeout: number
/**
* @zh 操作日志列表
* @en Operation logs
*/
operations: OperationLog[]
/**
* @zh 元数据
* @en Metadata
*/
metadata?: Record<string, unknown>
/**
* @zh 是否分布式事务
* @en Whether distributed transaction
*/
distributed?: boolean
/**
* @zh 参与的服务器列表
* @en Participating servers
*/
participants?: string[]
}
// =============================================================================
// 事务配置 | Transaction Configuration
// =============================================================================
/**
* @zh 事务选项
* @en Transaction options
*/
export interface TransactionOptions {
/**
* @zh 超时时间(毫秒),默认 30000
* @en Timeout in milliseconds, default 30000
*/
timeout?: number
/**
* @zh 是否分布式事务
* @en Whether distributed transaction
*/
distributed?: boolean
/**
* @zh 元数据
* @en Metadata
*/
metadata?: Record<string, unknown>
/**
* @zh 重试次数,默认 0
* @en Retry count, default 0
*/
retryCount?: number
/**
* @zh 重试间隔(毫秒),默认 1000
* @en Retry interval in milliseconds, default 1000
*/
retryInterval?: number
}
/**
* @zh 事务管理器配置
* @en Transaction manager configuration
*/
export interface TransactionManagerConfig {
/**
* @zh 存储实例
* @en Storage instance
*/
storage?: ITransactionStorage
/**
* @zh 默认超时时间(毫秒)
* @en Default timeout in milliseconds
*/
defaultTimeout?: number
/**
* @zh 服务器 ID分布式用
* @en Server ID for distributed transactions
*/
serverId?: string
/**
* @zh 是否自动恢复未完成事务
* @en Whether to auto-recover pending transactions
*/
autoRecover?: boolean
}
// =============================================================================
// 存储接口 | Storage Interface
// =============================================================================
/**
* @zh 事务存储接口
* @en Transaction storage interface
*/
export interface ITransactionStorage {
/**
* @zh 获取分布式锁
* @en Acquire distributed lock
*
* @param key - @zh 锁的键 @en Lock key
* @param ttl - @zh 锁的生存时间(毫秒) @en Lock TTL in milliseconds
* @returns @zh 锁令牌,获取失败返回 null @en Lock token, null if failed
*/
acquireLock(key: string, ttl: number): Promise<string | null>
/**
* @zh 释放分布式锁
* @en Release distributed lock
*
* @param key - @zh 锁的键 @en Lock key
* @param token - @zh 锁令牌 @en Lock token
* @returns @zh 是否成功释放 @en Whether released successfully
*/
releaseLock(key: string, token: string): Promise<boolean>
/**
* @zh 保存事务日志
* @en Save transaction log
*/
saveTransaction(tx: TransactionLog): Promise<void>
/**
* @zh 获取事务日志
* @en Get transaction log
*/
getTransaction(id: string): Promise<TransactionLog | null>
/**
* @zh 更新事务状态
* @en Update transaction state
*/
updateTransactionState(id: string, state: TransactionState): Promise<void>
/**
* @zh 更新操作状态
* @en Update operation state
*/
updateOperationState(
transactionId: string,
operationIndex: number,
state: OperationLog['state'],
error?: string
): Promise<void>
/**
* @zh 获取待恢复的事务列表
* @en Get pending transactions for recovery
*/
getPendingTransactions(serverId?: string): Promise<TransactionLog[]>
/**
* @zh 删除事务日志
* @en Delete transaction log
*/
deleteTransaction(id: string): Promise<void>
/**
* @zh 获取数据
* @en Get data
*/
get<T>(key: string): Promise<T | null>
/**
* @zh 设置数据
* @en Set data
*/
set<T>(key: string, value: T, ttl?: number): Promise<void>
/**
* @zh 删除数据
* @en Delete data
*/
delete(key: string): Promise<boolean>
}
// =============================================================================
// 操作接口 | Operation Interface
// =============================================================================
/**
* @zh 事务操作接口
* @en Transaction operation interface
*/
export interface ITransactionOperation<TData = unknown, TResult = unknown> {
/**
* @zh 操作名称
* @en Operation name
*/
readonly name: string
/**
* @zh 操作数据
* @en Operation data
*/
readonly data: TData
/**
* @zh 验证前置条件
* @en Validate preconditions
*
* @param ctx - @zh 事务上下文 @en Transaction context
* @returns @zh 是否验证通过 @en Whether validation passed
*/
validate(ctx: ITransactionContext): Promise<boolean>
/**
* @zh 执行操作
* @en Execute operation
*
* @param ctx - @zh 事务上下文 @en Transaction context
* @returns @zh 操作结果 @en Operation result
*/
execute(ctx: ITransactionContext): Promise<OperationResult<TResult>>
/**
* @zh 补偿操作(回滚)
* @en Compensate operation (rollback)
*
* @param ctx - @zh 事务上下文 @en Transaction context
*/
compensate(ctx: ITransactionContext): Promise<void>
}
// =============================================================================
// 事务上下文接口 | Transaction Context Interface
// =============================================================================
/**
* @zh 事务上下文接口
* @en Transaction context interface
*/
export interface ITransactionContext {
/**
* @zh 事务 ID
* @en Transaction ID
*/
readonly id: string
/**
* @zh 事务状态
* @en Transaction state
*/
readonly state: TransactionState
/**
* @zh 超时时间(毫秒)
* @en Timeout in milliseconds
*/
readonly timeout: number
/**
* @zh 操作列表
* @en Operations
*/
readonly operations: ReadonlyArray<ITransactionOperation>
/**
* @zh 存储实例
* @en Storage instance
*/
readonly storage: ITransactionStorage | null
/**
* @zh 元数据
* @en Metadata
*/
readonly metadata: Record<string, unknown>
/**
* @zh 添加操作
* @en Add operation
*/
addOperation<T extends ITransactionOperation>(operation: T): this
/**
* @zh 执行事务
* @en Execute transaction
*/
execute<T = unknown>(): Promise<TransactionResult<T>>
/**
* @zh 回滚事务
* @en Rollback transaction
*/
rollback(): Promise<void>
/**
* @zh 获取上下文数据
* @en Get context data
*/
get<T>(key: string): T | undefined
/**
* @zh 设置上下文数据
* @en Set context data
*/
set<T>(key: string, value: T): void
}

View File

@@ -0,0 +1,350 @@
/**
* @zh Saga 编排器
* @en Saga Orchestrator
*
* @zh 实现分布式事务的 Saga 模式编排
* @en Implements Saga pattern orchestration for distributed transactions
*/
import type {
ITransactionStorage,
TransactionLog,
TransactionState,
OperationResult,
} from '../core/types.js'
/**
* @zh Saga 步骤状态
* @en Saga step state
*/
export type SagaStepState = 'pending' | 'executing' | 'completed' | 'compensating' | 'compensated' | 'failed'
/**
* @zh Saga 步骤
* @en Saga step
*/
export interface SagaStep<T = unknown> {
/**
* @zh 步骤名称
* @en Step name
*/
name: string
/**
* @zh 目标服务器 ID分布式用
* @en Target server ID (for distributed)
*/
serverId?: string
/**
* @zh 执行函数
* @en Execute function
*/
execute: (data: T) => Promise<OperationResult>
/**
* @zh 补偿函数
* @en Compensate function
*/
compensate: (data: T) => Promise<void>
/**
* @zh 步骤数据
* @en Step data
*/
data: T
}
/**
* @zh Saga 步骤日志
* @en Saga step log
*/
export interface SagaStepLog {
name: string
serverId?: string
state: SagaStepState
startedAt?: number
completedAt?: number
error?: string
}
/**
* @zh Saga 日志
* @en Saga log
*/
export interface SagaLog {
id: string
state: 'pending' | 'running' | 'completed' | 'compensating' | 'compensated' | 'failed'
steps: SagaStepLog[]
createdAt: number
updatedAt: number
metadata?: Record<string, unknown>
}
/**
* @zh Saga 结果
* @en Saga result
*/
export interface SagaResult {
success: boolean
sagaId: string
completedSteps: string[]
failedStep?: string
error?: string
duration: number
}
/**
* @zh Saga 编排器配置
* @en Saga orchestrator configuration
*/
export interface SagaOrchestratorConfig {
/**
* @zh 存储实例
* @en Storage instance
*/
storage?: ITransactionStorage
/**
* @zh 默认超时时间(毫秒)
* @en Default timeout in milliseconds
*/
timeout?: number
/**
* @zh 服务器 ID
* @en Server ID
*/
serverId?: string
}
/**
* @zh 生成 Saga ID
* @en Generate Saga ID
*/
function generateSagaId(): string {
return `saga_${Date.now().toString(36)}_${Math.random().toString(36).substring(2, 11)}`
}
/**
* @zh Saga 编排器
* @en Saga Orchestrator
*
* @zh 管理分布式事务的 Saga 模式执行流程
* @en Manages Saga pattern execution flow for distributed transactions
*
* @example
* ```typescript
* const orchestrator = new SagaOrchestrator({
* storage: redisStorage,
* serverId: 'server1',
* })
*
* const result = await orchestrator.execute([
* {
* name: 'deduct_currency',
* serverId: 'server1',
* execute: async (data) => {
* // 扣除货币
* return { success: true }
* },
* compensate: async (data) => {
* // 恢复货币
* },
* data: { playerId: '1', amount: 100 },
* },
* {
* name: 'add_item',
* serverId: 'server2',
* execute: async (data) => {
* // 添加物品
* return { success: true }
* },
* compensate: async (data) => {
* // 移除物品
* },
* data: { playerId: '1', itemId: 'sword' },
* },
* ])
* ```
*/
export class SagaOrchestrator {
private _storage: ITransactionStorage | null
private _timeout: number
private _serverId: string
constructor(config: SagaOrchestratorConfig = {}) {
this._storage = config.storage ?? null
this._timeout = config.timeout ?? 30000
this._serverId = config.serverId ?? 'default'
}
/**
* @zh 执行 Saga
* @en Execute Saga
*/
async execute<T>(steps: SagaStep<T>[]): Promise<SagaResult> {
const sagaId = generateSagaId()
const startTime = Date.now()
const completedSteps: string[] = []
const sagaLog: SagaLog = {
id: sagaId,
state: 'pending',
steps: steps.map((s) => ({
name: s.name,
serverId: s.serverId,
state: 'pending' as SagaStepState,
})),
createdAt: startTime,
updatedAt: startTime,
metadata: { orchestratorServerId: this._serverId },
}
await this._saveSagaLog(sagaLog)
try {
sagaLog.state = 'running'
await this._saveSagaLog(sagaLog)
for (let i = 0; i < steps.length; i++) {
const step = steps[i]
if (Date.now() - startTime > this._timeout) {
throw new Error('Saga execution timed out')
}
sagaLog.steps[i].state = 'executing'
sagaLog.steps[i].startedAt = Date.now()
await this._saveSagaLog(sagaLog)
const result = await step.execute(step.data)
if (!result.success) {
sagaLog.steps[i].state = 'failed'
sagaLog.steps[i].error = result.error
await this._saveSagaLog(sagaLog)
throw new Error(result.error ?? `Step ${step.name} failed`)
}
sagaLog.steps[i].state = 'completed'
sagaLog.steps[i].completedAt = Date.now()
completedSteps.push(step.name)
await this._saveSagaLog(sagaLog)
}
sagaLog.state = 'completed'
sagaLog.updatedAt = Date.now()
await this._saveSagaLog(sagaLog)
return {
success: true,
sagaId,
completedSteps,
duration: Date.now() - startTime,
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
const failedStepIndex = completedSteps.length
sagaLog.state = 'compensating'
await this._saveSagaLog(sagaLog)
for (let i = completedSteps.length - 1; i >= 0; i--) {
const step = steps[i]
sagaLog.steps[i].state = 'compensating'
await this._saveSagaLog(sagaLog)
try {
await step.compensate(step.data)
sagaLog.steps[i].state = 'compensated'
} catch (compError) {
const compErrorMessage = compError instanceof Error ? compError.message : String(compError)
sagaLog.steps[i].state = 'failed'
sagaLog.steps[i].error = `Compensation failed: ${compErrorMessage}`
}
await this._saveSagaLog(sagaLog)
}
sagaLog.state = 'compensated'
sagaLog.updatedAt = Date.now()
await this._saveSagaLog(sagaLog)
return {
success: false,
sagaId,
completedSteps,
failedStep: steps[failedStepIndex]?.name,
error: errorMessage,
duration: Date.now() - startTime,
}
}
}
/**
* @zh 恢复未完成的 Saga
* @en Recover pending Sagas
*/
async recover(): Promise<number> {
if (!this._storage) return 0
const pendingSagas = await this._getPendingSagas()
let recoveredCount = 0
for (const saga of pendingSagas) {
try {
await this._recoverSaga(saga)
recoveredCount++
} catch (error) {
console.error(`Failed to recover saga ${saga.id}:`, error)
}
}
return recoveredCount
}
/**
* @zh 获取 Saga 日志
* @en Get Saga log
*/
async getSagaLog(sagaId: string): Promise<SagaLog | null> {
if (!this._storage) return null
return this._storage.get<SagaLog>(`saga:${sagaId}`)
}
private async _saveSagaLog(log: SagaLog): Promise<void> {
if (!this._storage) return
log.updatedAt = Date.now()
await this._storage.set(`saga:${log.id}`, log)
}
private async _getPendingSagas(): Promise<SagaLog[]> {
return []
}
private async _recoverSaga(saga: SagaLog): Promise<void> {
if (saga.state === 'running' || saga.state === 'compensating') {
const completedSteps = saga.steps
.filter((s) => s.state === 'completed')
.map((s) => s.name)
saga.state = 'compensated'
saga.updatedAt = Date.now()
if (this._storage) {
await this._storage.set(`saga:${saga.id}`, saga)
}
}
}
}
/**
* @zh 创建 Saga 编排器
* @en Create Saga orchestrator
*/
export function createSagaOrchestrator(config: SagaOrchestratorConfig = {}): SagaOrchestrator {
return new SagaOrchestrator(config)
}

View File

@@ -0,0 +1,15 @@
/**
* @zh 分布式模块导出
* @en Distributed module exports
*/
export {
SagaOrchestrator,
createSagaOrchestrator,
type SagaOrchestratorConfig,
type SagaStep,
type SagaStepState,
type SagaStepLog,
type SagaLog,
type SagaResult,
} from './SagaOrchestrator.js'

View File

@@ -0,0 +1,165 @@
/**
* @zh @esengine/transaction 事务系统
* @en @esengine/transaction Transaction System
*
* @zh 提供游戏事务处理能力,支持商店购买、玩家交易、分布式事务
* @en Provides game transaction capabilities, supporting shop purchases, player trading, and distributed transactions
*
* @example
* ```typescript
* import {
* TransactionManager,
* MemoryStorage,
* CurrencyOperation,
* InventoryOperation,
* } from '@esengine/transaction'
*
* // 创建事务管理器
* const manager = new TransactionManager({
* storage: new MemoryStorage(),
* })
*
* // 执行事务
* const result = await manager.run((tx) => {
* tx.addOperation(new CurrencyOperation({
* type: 'deduct',
* playerId: 'player1',
* currency: 'gold',
* amount: 100,
* }))
* tx.addOperation(new InventoryOperation({
* type: 'add',
* playerId: 'player1',
* itemId: 'sword',
* quantity: 1,
* }))
* })
*
* if (result.success) {
* console.log('Transaction completed!')
* }
* ```
*/
// =============================================================================
// Core | 核心
// =============================================================================
export type {
TransactionState,
OperationResult,
TransactionResult,
OperationLog,
TransactionLog,
TransactionOptions,
TransactionManagerConfig,
ITransactionStorage,
ITransactionOperation,
ITransactionContext,
} from './core/types.js'
export {
TransactionContext,
createTransactionContext,
} from './core/TransactionContext.js'
export {
TransactionManager,
createTransactionManager,
} from './core/TransactionManager.js'
// =============================================================================
// Storage | 存储
// =============================================================================
export {
MemoryStorage,
createMemoryStorage,
type MemoryStorageConfig,
} from './storage/MemoryStorage.js'
export {
RedisStorage,
createRedisStorage,
type RedisStorageConfig,
type RedisClient,
} from './storage/RedisStorage.js'
export {
MongoStorage,
createMongoStorage,
type MongoStorageConfig,
type MongoDb,
type MongoCollection,
} from './storage/MongoStorage.js'
// =============================================================================
// Operations | 操作
// =============================================================================
export { BaseOperation } from './operations/BaseOperation.js'
export {
CurrencyOperation,
createCurrencyOperation,
type CurrencyOperationType,
type CurrencyOperationData,
type CurrencyOperationResult,
type ICurrencyProvider,
} from './operations/CurrencyOperation.js'
export {
InventoryOperation,
createInventoryOperation,
type InventoryOperationType,
type InventoryOperationData,
type InventoryOperationResult,
type IInventoryProvider,
type ItemData,
} from './operations/InventoryOperation.js'
export {
TradeOperation,
createTradeOperation,
type TradeOperationData,
type TradeOperationResult,
type TradeItem,
type TradeCurrency,
type TradeParty,
type ITradeProvider,
} from './operations/TradeOperation.js'
// =============================================================================
// Distributed | 分布式
// =============================================================================
export {
SagaOrchestrator,
createSagaOrchestrator,
type SagaOrchestratorConfig,
type SagaStep,
type SagaStepState,
type SagaStepLog,
type SagaLog,
type SagaResult,
} from './distributed/SagaOrchestrator.js'
// =============================================================================
// Integration | 集成
// =============================================================================
export {
withTransactions,
TransactionRoom,
type TransactionRoomConfig,
type ITransactionRoom,
} from './integration/RoomTransactionMixin.js'
// =============================================================================
// Tokens | 令牌
// =============================================================================
export {
TransactionManagerToken,
TransactionStorageToken,
} from './tokens.js'

View File

@@ -0,0 +1,174 @@
/**
* @zh Room 事务扩展
* @en Room transaction extension
*/
import type {
ITransactionStorage,
ITransactionContext,
TransactionOptions,
TransactionResult,
} from '../core/types.js'
import { TransactionManager } from '../core/TransactionManager.js'
/**
* @zh 事务 Room 配置
* @en Transaction Room configuration
*/
export interface TransactionRoomConfig {
/**
* @zh 存储实例
* @en Storage instance
*/
storage?: ITransactionStorage
/**
* @zh 默认超时时间(毫秒)
* @en Default timeout in milliseconds
*/
defaultTimeout?: number
/**
* @zh 服务器 ID
* @en Server ID
*/
serverId?: string
}
/**
* @zh 事务 Room 接口
* @en Transaction Room interface
*/
export interface ITransactionRoom {
/**
* @zh 事务管理器
* @en Transaction manager
*/
readonly transactions: TransactionManager
/**
* @zh 开始事务
* @en Begin transaction
*/
beginTransaction(options?: TransactionOptions): ITransactionContext
/**
* @zh 执行事务
* @en Run transaction
*/
runTransaction<T = unknown>(
builder: (ctx: ITransactionContext) => void | Promise<void>,
options?: TransactionOptions
): Promise<TransactionResult<T>>
}
/**
* @zh 创建事务 Room mixin
* @en Create transaction Room mixin
*
* @example
* ```typescript
* import { Room } from '@esengine/server'
* import { withTransactions, RedisStorage } from '@esengine/transaction'
*
* class GameRoom extends withTransactions(Room, {
* storage: new RedisStorage({ client: redisClient }),
* }) {
* async handleBuy(itemId: string, player: Player) {
* const result = await this.runTransaction((tx) => {
* tx.addOperation(new CurrencyOperation({
* type: 'deduct',
* playerId: player.id,
* currency: 'gold',
* amount: 100,
* }))
* })
*
* if (result.success) {
* player.send('buy_success', { itemId })
* }
* }
* }
* ```
*/
export function withTransactions<TBase extends new (...args: any[]) => any>(
Base: TBase,
config: TransactionRoomConfig = {}
): TBase & (new (...args: any[]) => ITransactionRoom) {
return class TransactionRoom extends Base implements ITransactionRoom {
private _transactionManager: TransactionManager
constructor(...args: any[]) {
super(...args)
this._transactionManager = new TransactionManager({
storage: config.storage,
defaultTimeout: config.defaultTimeout,
serverId: config.serverId,
})
}
get transactions(): TransactionManager {
return this._transactionManager
}
beginTransaction(options?: TransactionOptions): ITransactionContext {
return this._transactionManager.begin(options)
}
runTransaction<T = unknown>(
builder: (ctx: ITransactionContext) => void | Promise<void>,
options?: TransactionOptions
): Promise<TransactionResult<T>> {
return this._transactionManager.run<T>(builder, options)
}
}
}
/**
* @zh 事务 Room 抽象基类
* @en Transaction Room abstract base class
*
* @zh 可以直接继承使用,也可以使用 withTransactions mixin
* @en Can be extended directly or use withTransactions mixin
*
* @example
* ```typescript
* class GameRoom extends TransactionRoom {
* constructor() {
* super({ storage: new RedisStorage({ client: redisClient }) })
* }
*
* async handleTrade(data: TradeData, player: Player) {
* const result = await this.runTransaction((tx) => {
* // 添加交易操作
* })
* }
* }
* ```
*/
export abstract class TransactionRoom implements ITransactionRoom {
private _transactionManager: TransactionManager
constructor(config: TransactionRoomConfig = {}) {
this._transactionManager = new TransactionManager({
storage: config.storage,
defaultTimeout: config.defaultTimeout,
serverId: config.serverId,
})
}
get transactions(): TransactionManager {
return this._transactionManager
}
beginTransaction(options?: TransactionOptions): ITransactionContext {
return this._transactionManager.begin(options)
}
runTransaction<T = unknown>(
builder: (ctx: ITransactionContext) => void | Promise<void>,
options?: TransactionOptions
): Promise<TransactionResult<T>> {
return this._transactionManager.run<T>(builder, options)
}
}

View File

@@ -0,0 +1,11 @@
/**
* @zh 集成模块导出
* @en Integration module exports
*/
export {
withTransactions,
TransactionRoom,
type TransactionRoomConfig,
type ITransactionRoom,
} from './RoomTransactionMixin.js'

View File

@@ -0,0 +1,64 @@
/**
* @zh 操作基类
* @en Base operation class
*/
import type {
ITransactionOperation,
ITransactionContext,
OperationResult,
} from '../core/types.js'
/**
* @zh 操作基类
* @en Base operation class
*
* @zh 提供通用的操作实现模板
* @en Provides common operation implementation template
*/
export abstract class BaseOperation<TData = unknown, TResult = unknown>
implements ITransactionOperation<TData, TResult>
{
abstract readonly name: string
readonly data: TData
constructor(data: TData) {
this.data = data
}
/**
* @zh 验证前置条件(默认通过)
* @en Validate preconditions (passes by default)
*/
async validate(_ctx: ITransactionContext): Promise<boolean> {
return true
}
/**
* @zh 执行操作
* @en Execute operation
*/
abstract execute(ctx: ITransactionContext): Promise<OperationResult<TResult>>
/**
* @zh 补偿操作
* @en Compensate operation
*/
abstract compensate(ctx: ITransactionContext): Promise<void>
/**
* @zh 创建成功结果
* @en Create success result
*/
protected success(data?: TResult): OperationResult<TResult> {
return { success: true, data }
}
/**
* @zh 创建失败结果
* @en Create failure result
*/
protected failure(error: string, errorCode?: string): OperationResult<TResult> {
return { success: false, error, errorCode }
}
}

View File

@@ -0,0 +1,208 @@
/**
* @zh 货币操作
* @en Currency operation
*/
import type { ITransactionContext, OperationResult } from '../core/types.js'
import { BaseOperation } from './BaseOperation.js'
/**
* @zh 货币操作类型
* @en Currency operation type
*/
export type CurrencyOperationType = 'add' | 'deduct'
/**
* @zh 货币操作数据
* @en Currency operation data
*/
export interface CurrencyOperationData {
/**
* @zh 操作类型
* @en Operation type
*/
type: CurrencyOperationType
/**
* @zh 玩家 ID
* @en Player ID
*/
playerId: string
/**
* @zh 货币类型(如 gold, diamond 等)
* @en Currency type (e.g., gold, diamond)
*/
currency: string
/**
* @zh 数量
* @en Amount
*/
amount: number
/**
* @zh 原因/来源
* @en Reason/source
*/
reason?: string
}
/**
* @zh 货币操作结果
* @en Currency operation result
*/
export interface CurrencyOperationResult {
/**
* @zh 操作前余额
* @en Balance before operation
*/
beforeBalance: number
/**
* @zh 操作后余额
* @en Balance after operation
*/
afterBalance: number
}
/**
* @zh 货币数据提供者接口
* @en Currency data provider interface
*/
export interface ICurrencyProvider {
/**
* @zh 获取货币余额
* @en Get currency balance
*/
getBalance(playerId: string, currency: string): Promise<number>
/**
* @zh 设置货币余额
* @en Set currency balance
*/
setBalance(playerId: string, currency: string, amount: number): Promise<void>
}
/**
* @zh 货币操作
* @en Currency operation
*
* @zh 用于处理货币的增加和扣除
* @en Used for handling currency addition and deduction
*
* @example
* ```typescript
* // 扣除金币
* tx.addOperation(new CurrencyOperation({
* type: 'deduct',
* playerId: 'player1',
* currency: 'gold',
* amount: 100,
* reason: 'purchase_item',
* }))
*
* // 增加钻石
* tx.addOperation(new CurrencyOperation({
* type: 'add',
* playerId: 'player1',
* currency: 'diamond',
* amount: 50,
* }))
* ```
*/
export class CurrencyOperation extends BaseOperation<CurrencyOperationData, CurrencyOperationResult> {
readonly name = 'currency'
private _provider: ICurrencyProvider | null = null
private _beforeBalance: number = 0
/**
* @zh 设置货币数据提供者
* @en Set currency data provider
*/
setProvider(provider: ICurrencyProvider): this {
this._provider = provider
return this
}
async validate(ctx: ITransactionContext): Promise<boolean> {
if (this.data.amount <= 0) {
return false
}
if (this.data.type === 'deduct') {
const balance = await this._getBalance(ctx)
return balance >= this.data.amount
}
return true
}
async execute(ctx: ITransactionContext): Promise<OperationResult<CurrencyOperationResult>> {
const { type, playerId, currency, amount } = this.data
this._beforeBalance = await this._getBalance(ctx)
let afterBalance: number
if (type === 'add') {
afterBalance = this._beforeBalance + amount
} else {
if (this._beforeBalance < amount) {
return this.failure('Insufficient balance', 'INSUFFICIENT_BALANCE')
}
afterBalance = this._beforeBalance - amount
}
await this._setBalance(ctx, afterBalance)
ctx.set(`currency:${playerId}:${currency}:before`, this._beforeBalance)
ctx.set(`currency:${playerId}:${currency}:after`, afterBalance)
return this.success({
beforeBalance: this._beforeBalance,
afterBalance,
})
}
async compensate(ctx: ITransactionContext): Promise<void> {
await this._setBalance(ctx, this._beforeBalance)
}
private async _getBalance(ctx: ITransactionContext): Promise<number> {
const { playerId, currency } = this.data
if (this._provider) {
return this._provider.getBalance(playerId, currency)
}
if (ctx.storage) {
const balance = await ctx.storage.get<number>(`player:${playerId}:currency:${currency}`)
return balance ?? 0
}
return 0
}
private async _setBalance(ctx: ITransactionContext, amount: number): Promise<void> {
const { playerId, currency } = this.data
if (this._provider) {
await this._provider.setBalance(playerId, currency, amount)
return
}
if (ctx.storage) {
await ctx.storage.set(`player:${playerId}:currency:${currency}`, amount)
}
}
}
/**
* @zh 创建货币操作
* @en Create currency operation
*/
export function createCurrencyOperation(data: CurrencyOperationData): CurrencyOperation {
return new CurrencyOperation(data)
}

View File

@@ -0,0 +1,291 @@
/**
* @zh 背包操作
* @en Inventory operation
*/
import type { ITransactionContext, OperationResult } from '../core/types.js'
import { BaseOperation } from './BaseOperation.js'
/**
* @zh 背包操作类型
* @en Inventory operation type
*/
export type InventoryOperationType = 'add' | 'remove' | 'update'
/**
* @zh 物品数据
* @en Item data
*/
export interface ItemData {
/**
* @zh 物品 ID
* @en Item ID
*/
itemId: string
/**
* @zh 数量
* @en Quantity
*/
quantity: number
/**
* @zh 物品属性
* @en Item properties
*/
properties?: Record<string, unknown>
}
/**
* @zh 背包操作数据
* @en Inventory operation data
*/
export interface InventoryOperationData {
/**
* @zh 操作类型
* @en Operation type
*/
type: InventoryOperationType
/**
* @zh 玩家 ID
* @en Player ID
*/
playerId: string
/**
* @zh 物品 ID
* @en Item ID
*/
itemId: string
/**
* @zh 数量
* @en Quantity
*/
quantity: number
/**
* @zh 物品属性(用于更新)
* @en Item properties (for update)
*/
properties?: Record<string, unknown>
/**
* @zh 原因/来源
* @en Reason/source
*/
reason?: string
}
/**
* @zh 背包操作结果
* @en Inventory operation result
*/
export interface InventoryOperationResult {
/**
* @zh 操作前的物品数据
* @en Item data before operation
*/
beforeItem?: ItemData
/**
* @zh 操作后的物品数据
* @en Item data after operation
*/
afterItem?: ItemData
}
/**
* @zh 背包数据提供者接口
* @en Inventory data provider interface
*/
export interface IInventoryProvider {
/**
* @zh 获取物品
* @en Get item
*/
getItem(playerId: string, itemId: string): Promise<ItemData | null>
/**
* @zh 设置物品
* @en Set item
*/
setItem(playerId: string, itemId: string, item: ItemData | null): Promise<void>
/**
* @zh 检查背包容量
* @en Check inventory capacity
*/
hasCapacity?(playerId: string, count: number): Promise<boolean>
}
/**
* @zh 背包操作
* @en Inventory operation
*
* @zh 用于处理物品的添加、移除和更新
* @en Used for handling item addition, removal, and update
*
* @example
* ```typescript
* // 添加物品
* tx.addOperation(new InventoryOperation({
* type: 'add',
* playerId: 'player1',
* itemId: 'sword_001',
* quantity: 1,
* }))
*
* // 移除物品
* tx.addOperation(new InventoryOperation({
* type: 'remove',
* playerId: 'player1',
* itemId: 'potion_hp',
* quantity: 5,
* }))
* ```
*/
export class InventoryOperation extends BaseOperation<InventoryOperationData, InventoryOperationResult> {
readonly name = 'inventory'
private _provider: IInventoryProvider | null = null
private _beforeItem: ItemData | null = null
/**
* @zh 设置背包数据提供者
* @en Set inventory data provider
*/
setProvider(provider: IInventoryProvider): this {
this._provider = provider
return this
}
async validate(ctx: ITransactionContext): Promise<boolean> {
const { type, quantity } = this.data
if (quantity <= 0) {
return false
}
if (type === 'remove') {
const item = await this._getItem(ctx)
return item !== null && item.quantity >= quantity
}
if (type === 'add' && this._provider?.hasCapacity) {
return this._provider.hasCapacity(this.data.playerId, 1)
}
return true
}
async execute(ctx: ITransactionContext): Promise<OperationResult<InventoryOperationResult>> {
const { type, playerId, itemId, quantity, properties } = this.data
this._beforeItem = await this._getItem(ctx)
let afterItem: ItemData | null = null
switch (type) {
case 'add': {
if (this._beforeItem) {
afterItem = {
...this._beforeItem,
quantity: this._beforeItem.quantity + quantity,
}
} else {
afterItem = {
itemId,
quantity,
properties,
}
}
break
}
case 'remove': {
if (!this._beforeItem || this._beforeItem.quantity < quantity) {
return this.failure('Insufficient item quantity', 'INSUFFICIENT_ITEM')
}
const newQuantity = this._beforeItem.quantity - quantity
if (newQuantity > 0) {
afterItem = {
...this._beforeItem,
quantity: newQuantity,
}
} else {
afterItem = null
}
break
}
case 'update': {
if (!this._beforeItem) {
return this.failure('Item not found', 'ITEM_NOT_FOUND')
}
afterItem = {
...this._beforeItem,
quantity: quantity > 0 ? quantity : this._beforeItem.quantity,
properties: properties ?? this._beforeItem.properties,
}
break
}
}
await this._setItem(ctx, afterItem)
ctx.set(`inventory:${playerId}:${itemId}:before`, this._beforeItem)
ctx.set(`inventory:${playerId}:${itemId}:after`, afterItem)
return this.success({
beforeItem: this._beforeItem ?? undefined,
afterItem: afterItem ?? undefined,
})
}
async compensate(ctx: ITransactionContext): Promise<void> {
await this._setItem(ctx, this._beforeItem)
}
private async _getItem(ctx: ITransactionContext): Promise<ItemData | null> {
const { playerId, itemId } = this.data
if (this._provider) {
return this._provider.getItem(playerId, itemId)
}
if (ctx.storage) {
return ctx.storage.get<ItemData>(`player:${playerId}:inventory:${itemId}`)
}
return null
}
private async _setItem(ctx: ITransactionContext, item: ItemData | null): Promise<void> {
const { playerId, itemId } = this.data
if (this._provider) {
await this._provider.setItem(playerId, itemId, item)
return
}
if (ctx.storage) {
if (item) {
await ctx.storage.set(`player:${playerId}:inventory:${itemId}`, item)
} else {
await ctx.storage.delete(`player:${playerId}:inventory:${itemId}`)
}
}
}
}
/**
* @zh 创建背包操作
* @en Create inventory operation
*/
export function createInventoryOperation(data: InventoryOperationData): InventoryOperation {
return new InventoryOperation(data)
}

View File

@@ -0,0 +1,331 @@
/**
* @zh 交易操作
* @en Trade operation
*/
import type { ITransactionContext, OperationResult } from '../core/types.js'
import { BaseOperation } from './BaseOperation.js'
import { CurrencyOperation, type CurrencyOperationData, type ICurrencyProvider } from './CurrencyOperation.js'
import { InventoryOperation, type InventoryOperationData, type IInventoryProvider, type ItemData } from './InventoryOperation.js'
/**
* @zh 交易物品
* @en Trade item
*/
export interface TradeItem {
/**
* @zh 物品 ID
* @en Item ID
*/
itemId: string
/**
* @zh 数量
* @en Quantity
*/
quantity: number
}
/**
* @zh 交易货币
* @en Trade currency
*/
export interface TradeCurrency {
/**
* @zh 货币类型
* @en Currency type
*/
currency: string
/**
* @zh 数量
* @en Amount
*/
amount: number
}
/**
* @zh 交易方数据
* @en Trade party data
*/
export interface TradeParty {
/**
* @zh 玩家 ID
* @en Player ID
*/
playerId: string
/**
* @zh 给出的物品
* @en Items to give
*/
items?: TradeItem[]
/**
* @zh 给出的货币
* @en Currencies to give
*/
currencies?: TradeCurrency[]
}
/**
* @zh 交易操作数据
* @en Trade operation data
*/
export interface TradeOperationData {
/**
* @zh 交易 ID
* @en Trade ID
*/
tradeId: string
/**
* @zh 交易发起方
* @en Trade initiator
*/
partyA: TradeParty
/**
* @zh 交易接收方
* @en Trade receiver
*/
partyB: TradeParty
/**
* @zh 原因/备注
* @en Reason/note
*/
reason?: string
}
/**
* @zh 交易操作结果
* @en Trade operation result
*/
export interface TradeOperationResult {
/**
* @zh 交易 ID
* @en Trade ID
*/
tradeId: string
/**
* @zh 交易是否成功
* @en Whether trade succeeded
*/
completed: boolean
}
/**
* @zh 交易数据提供者
* @en Trade data provider
*/
export interface ITradeProvider {
currencyProvider?: ICurrencyProvider
inventoryProvider?: IInventoryProvider
}
/**
* @zh 交易操作
* @en Trade operation
*
* @zh 用于处理玩家之间的物品和货币交换
* @en Used for handling item and currency exchange between players
*
* @example
* ```typescript
* tx.addOperation(new TradeOperation({
* tradeId: 'trade_001',
* partyA: {
* playerId: 'player1',
* items: [{ itemId: 'sword', quantity: 1 }],
* },
* partyB: {
* playerId: 'player2',
* currencies: [{ currency: 'gold', amount: 1000 }],
* },
* }))
* ```
*/
export class TradeOperation extends BaseOperation<TradeOperationData, TradeOperationResult> {
readonly name = 'trade'
private _provider: ITradeProvider | null = null
private _subOperations: (CurrencyOperation | InventoryOperation)[] = []
private _executedCount = 0
/**
* @zh 设置交易数据提供者
* @en Set trade data provider
*/
setProvider(provider: ITradeProvider): this {
this._provider = provider
return this
}
async validate(ctx: ITransactionContext): Promise<boolean> {
this._buildSubOperations()
for (const op of this._subOperations) {
const isValid = await op.validate(ctx)
if (!isValid) {
return false
}
}
return true
}
async execute(ctx: ITransactionContext): Promise<OperationResult<TradeOperationResult>> {
this._buildSubOperations()
this._executedCount = 0
try {
for (const op of this._subOperations) {
const result = await op.execute(ctx)
if (!result.success) {
await this._compensateExecuted(ctx)
return this.failure(result.error ?? 'Trade operation failed', 'TRADE_FAILED')
}
this._executedCount++
}
return this.success({
tradeId: this.data.tradeId,
completed: true,
})
} catch (error) {
await this._compensateExecuted(ctx)
const errorMessage = error instanceof Error ? error.message : String(error)
return this.failure(errorMessage, 'TRADE_ERROR')
}
}
async compensate(ctx: ITransactionContext): Promise<void> {
await this._compensateExecuted(ctx)
}
private _buildSubOperations(): void {
if (this._subOperations.length > 0) return
const { partyA, partyB } = this.data
if (partyA.items) {
for (const item of partyA.items) {
const removeOp = new InventoryOperation({
type: 'remove',
playerId: partyA.playerId,
itemId: item.itemId,
quantity: item.quantity,
reason: `trade:${this.data.tradeId}:give`,
})
const addOp = new InventoryOperation({
type: 'add',
playerId: partyB.playerId,
itemId: item.itemId,
quantity: item.quantity,
reason: `trade:${this.data.tradeId}:receive`,
})
if (this._provider?.inventoryProvider) {
removeOp.setProvider(this._provider.inventoryProvider)
addOp.setProvider(this._provider.inventoryProvider)
}
this._subOperations.push(removeOp, addOp)
}
}
if (partyA.currencies) {
for (const curr of partyA.currencies) {
const deductOp = new CurrencyOperation({
type: 'deduct',
playerId: partyA.playerId,
currency: curr.currency,
amount: curr.amount,
reason: `trade:${this.data.tradeId}:give`,
})
const addOp = new CurrencyOperation({
type: 'add',
playerId: partyB.playerId,
currency: curr.currency,
amount: curr.amount,
reason: `trade:${this.data.tradeId}:receive`,
})
if (this._provider?.currencyProvider) {
deductOp.setProvider(this._provider.currencyProvider)
addOp.setProvider(this._provider.currencyProvider)
}
this._subOperations.push(deductOp, addOp)
}
}
if (partyB.items) {
for (const item of partyB.items) {
const removeOp = new InventoryOperation({
type: 'remove',
playerId: partyB.playerId,
itemId: item.itemId,
quantity: item.quantity,
reason: `trade:${this.data.tradeId}:give`,
})
const addOp = new InventoryOperation({
type: 'add',
playerId: partyA.playerId,
itemId: item.itemId,
quantity: item.quantity,
reason: `trade:${this.data.tradeId}:receive`,
})
if (this._provider?.inventoryProvider) {
removeOp.setProvider(this._provider.inventoryProvider)
addOp.setProvider(this._provider.inventoryProvider)
}
this._subOperations.push(removeOp, addOp)
}
}
if (partyB.currencies) {
for (const curr of partyB.currencies) {
const deductOp = new CurrencyOperation({
type: 'deduct',
playerId: partyB.playerId,
currency: curr.currency,
amount: curr.amount,
reason: `trade:${this.data.tradeId}:give`,
})
const addOp = new CurrencyOperation({
type: 'add',
playerId: partyA.playerId,
currency: curr.currency,
amount: curr.amount,
reason: `trade:${this.data.tradeId}:receive`,
})
if (this._provider?.currencyProvider) {
deductOp.setProvider(this._provider.currencyProvider)
addOp.setProvider(this._provider.currencyProvider)
}
this._subOperations.push(deductOp, addOp)
}
}
}
private async _compensateExecuted(ctx: ITransactionContext): Promise<void> {
for (let i = this._executedCount - 1; i >= 0; i--) {
await this._subOperations[i].compensate(ctx)
}
}
}
/**
* @zh 创建交易操作
* @en Create trade operation
*/
export function createTradeOperation(data: TradeOperationData): TradeOperation {
return new TradeOperation(data)
}

View File

@@ -0,0 +1,36 @@
/**
* @zh 操作模块导出
* @en Operations module exports
*/
export { BaseOperation } from './BaseOperation.js'
export {
CurrencyOperation,
createCurrencyOperation,
type CurrencyOperationType,
type CurrencyOperationData,
type CurrencyOperationResult,
type ICurrencyProvider,
} from './CurrencyOperation.js'
export {
InventoryOperation,
createInventoryOperation,
type InventoryOperationType,
type InventoryOperationData,
type InventoryOperationResult,
type IInventoryProvider,
type ItemData,
} from './InventoryOperation.js'
export {
TradeOperation,
createTradeOperation,
type TradeOperationData,
type TradeOperationResult,
type TradeItem,
type TradeCurrency,
type TradeParty,
type ITradeProvider,
} from './TradeOperation.js'

View File

@@ -0,0 +1,229 @@
/**
* @zh 内存存储实现
* @en Memory storage implementation
*
* @zh 用于开发和测试环境,不支持分布式
* @en For development and testing, does not support distributed scenarios
*/
import type {
ITransactionStorage,
TransactionLog,
TransactionState,
OperationLog,
} from '../core/types.js'
/**
* @zh 内存存储配置
* @en Memory storage configuration
*/
export interface MemoryStorageConfig {
/**
* @zh 最大事务日志数量
* @en Maximum transaction log count
*/
maxTransactions?: number
}
/**
* @zh 内存存储
* @en Memory storage
*
* @zh 适用于单机开发和测试,数据仅保存在内存中
* @en Suitable for single-machine development and testing, data is stored in memory only
*/
export class MemoryStorage implements ITransactionStorage {
private _transactions: Map<string, TransactionLog> = new Map()
private _data: Map<string, { value: unknown; expireAt?: number }> = new Map()
private _locks: Map<string, { token: string; expireAt: number }> = new Map()
private _maxTransactions: number
constructor(config: MemoryStorageConfig = {}) {
this._maxTransactions = config.maxTransactions ?? 1000
}
// =========================================================================
// 分布式锁 | Distributed Lock
// =========================================================================
async acquireLock(key: string, ttl: number): Promise<string | null> {
this._cleanExpiredLocks()
const existing = this._locks.get(key)
if (existing && existing.expireAt > Date.now()) {
return null
}
const token = `lock_${Date.now()}_${Math.random().toString(36).substring(2)}`
this._locks.set(key, {
token,
expireAt: Date.now() + ttl,
})
return token
}
async releaseLock(key: string, token: string): Promise<boolean> {
const lock = this._locks.get(key)
if (!lock || lock.token !== token) {
return false
}
this._locks.delete(key)
return true
}
// =========================================================================
// 事务日志 | Transaction Log
// =========================================================================
async saveTransaction(tx: TransactionLog): Promise<void> {
if (this._transactions.size >= this._maxTransactions) {
this._cleanOldTransactions()
}
this._transactions.set(tx.id, { ...tx })
}
async getTransaction(id: string): Promise<TransactionLog | null> {
const tx = this._transactions.get(id)
return tx ? { ...tx } : null
}
async updateTransactionState(id: string, state: TransactionState): Promise<void> {
const tx = this._transactions.get(id)
if (tx) {
tx.state = state
tx.updatedAt = Date.now()
}
}
async updateOperationState(
transactionId: string,
operationIndex: number,
state: OperationLog['state'],
error?: string
): Promise<void> {
const tx = this._transactions.get(transactionId)
if (tx && tx.operations[operationIndex]) {
tx.operations[operationIndex].state = state
if (error) {
tx.operations[operationIndex].error = error
}
if (state === 'executed') {
tx.operations[operationIndex].executedAt = Date.now()
} else if (state === 'compensated') {
tx.operations[operationIndex].compensatedAt = Date.now()
}
tx.updatedAt = Date.now()
}
}
async getPendingTransactions(serverId?: string): Promise<TransactionLog[]> {
const result: TransactionLog[] = []
for (const tx of this._transactions.values()) {
if (tx.state === 'pending' || tx.state === 'executing') {
if (!serverId || tx.metadata?.serverId === serverId) {
result.push({ ...tx })
}
}
}
return result
}
async deleteTransaction(id: string): Promise<void> {
this._transactions.delete(id)
}
// =========================================================================
// 数据操作 | Data Operations
// =========================================================================
async get<T>(key: string): Promise<T | null> {
this._cleanExpiredData()
const entry = this._data.get(key)
if (!entry) return null
if (entry.expireAt && entry.expireAt < Date.now()) {
this._data.delete(key)
return null
}
return entry.value as T
}
async set<T>(key: string, value: T, ttl?: number): Promise<void> {
this._data.set(key, {
value,
expireAt: ttl ? Date.now() + ttl : undefined,
})
}
async delete(key: string): Promise<boolean> {
return this._data.delete(key)
}
// =========================================================================
// 辅助方法 | Helper methods
// =========================================================================
/**
* @zh 清空所有数据(测试用)
* @en Clear all data (for testing)
*/
clear(): void {
this._transactions.clear()
this._data.clear()
this._locks.clear()
}
/**
* @zh 获取事务数量
* @en Get transaction count
*/
get transactionCount(): number {
return this._transactions.size
}
private _cleanExpiredLocks(): void {
const now = Date.now()
for (const [key, lock] of this._locks) {
if (lock.expireAt < now) {
this._locks.delete(key)
}
}
}
private _cleanExpiredData(): void {
const now = Date.now()
for (const [key, entry] of this._data) {
if (entry.expireAt && entry.expireAt < now) {
this._data.delete(key)
}
}
}
private _cleanOldTransactions(): void {
const sorted = Array.from(this._transactions.entries())
.sort((a, b) => a[1].createdAt - b[1].createdAt)
const toRemove = sorted
.slice(0, Math.floor(this._maxTransactions * 0.2))
.filter(([_, tx]) => tx.state === 'committed' || tx.state === 'rolledback')
for (const [id] of toRemove) {
this._transactions.delete(id)
}
}
}
/**
* @zh 创建内存存储
* @en Create memory storage
*/
export function createMemoryStorage(config: MemoryStorageConfig = {}): MemoryStorage {
return new MemoryStorage(config)
}

View File

@@ -0,0 +1,303 @@
/**
* @zh MongoDB 存储实现
* @en MongoDB storage implementation
*
* @zh 支持持久化事务日志和查询
* @en Supports persistent transaction logs and queries
*/
import type {
ITransactionStorage,
TransactionLog,
TransactionState,
OperationLog,
} from '../core/types.js'
/**
* @zh MongoDB Collection 接口
* @en MongoDB Collection interface
*/
export interface MongoCollection<T> {
findOne(filter: object): Promise<T | null>
find(filter: object): {
toArray(): Promise<T[]>
}
insertOne(doc: T): Promise<{ insertedId: unknown }>
updateOne(filter: object, update: object): Promise<{ modifiedCount: number }>
deleteOne(filter: object): Promise<{ deletedCount: number }>
createIndex(spec: object, options?: object): Promise<string>
}
/**
* @zh MongoDB 数据库接口
* @en MongoDB database interface
*/
export interface MongoDb {
collection<T = unknown>(name: string): MongoCollection<T>
}
/**
* @zh MongoDB 存储配置
* @en MongoDB storage configuration
*/
export interface MongoStorageConfig {
/**
* @zh MongoDB 数据库实例
* @en MongoDB database instance
*/
db: MongoDb
/**
* @zh 事务日志集合名称
* @en Transaction log collection name
*/
transactionCollection?: string
/**
* @zh 数据集合名称
* @en Data collection name
*/
dataCollection?: string
/**
* @zh 锁集合名称
* @en Lock collection name
*/
lockCollection?: string
}
interface LockDocument {
_id: string
token: string
expireAt: Date
}
interface DataDocument {
_id: string
value: unknown
expireAt?: Date
}
/**
* @zh MongoDB 存储
* @en MongoDB storage
*
* @zh 基于 MongoDB 的事务存储,支持持久化和复杂查询
* @en MongoDB-based transaction storage with persistence and complex query support
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb'
*
* const client = new MongoClient('mongodb://localhost:27017')
* await client.connect()
* const db = client.db('game')
*
* const storage = new MongoStorage({ db })
* await storage.ensureIndexes()
* ```
*/
export class MongoStorage implements ITransactionStorage {
private _db: MongoDb
private _transactionCollection: string
private _dataCollection: string
private _lockCollection: string
constructor(config: MongoStorageConfig) {
this._db = config.db
this._transactionCollection = config.transactionCollection ?? 'transactions'
this._dataCollection = config.dataCollection ?? 'transaction_data'
this._lockCollection = config.lockCollection ?? 'transaction_locks'
}
/**
* @zh 确保索引存在
* @en Ensure indexes exist
*/
async ensureIndexes(): Promise<void> {
const txColl = this._db.collection<TransactionLog>(this._transactionCollection)
await txColl.createIndex({ state: 1 })
await txColl.createIndex({ 'metadata.serverId': 1 })
await txColl.createIndex({ createdAt: 1 })
const lockColl = this._db.collection<LockDocument>(this._lockCollection)
await lockColl.createIndex({ expireAt: 1 }, { expireAfterSeconds: 0 })
const dataColl = this._db.collection<DataDocument>(this._dataCollection)
await dataColl.createIndex({ expireAt: 1 }, { expireAfterSeconds: 0 })
}
// =========================================================================
// 分布式锁 | Distributed Lock
// =========================================================================
async acquireLock(key: string, ttl: number): Promise<string | null> {
const coll = this._db.collection<LockDocument>(this._lockCollection)
const token = `${Date.now()}_${Math.random().toString(36).substring(2)}`
const expireAt = new Date(Date.now() + ttl)
try {
await coll.insertOne({
_id: key,
token,
expireAt,
})
return token
} catch (error) {
const existing = await coll.findOne({ _id: key })
if (existing && existing.expireAt < new Date()) {
const result = await coll.updateOne(
{ _id: key, expireAt: { $lt: new Date() } },
{ $set: { token, expireAt } }
)
if (result.modifiedCount > 0) {
return token
}
}
return null
}
}
async releaseLock(key: string, token: string): Promise<boolean> {
const coll = this._db.collection<LockDocument>(this._lockCollection)
const result = await coll.deleteOne({ _id: key, token })
return result.deletedCount > 0
}
// =========================================================================
// 事务日志 | Transaction Log
// =========================================================================
async saveTransaction(tx: TransactionLog): Promise<void> {
const coll = this._db.collection<TransactionLog & { _id: string }>(this._transactionCollection)
const existing = await coll.findOne({ _id: tx.id })
if (existing) {
await coll.updateOne(
{ _id: tx.id },
{ $set: { ...tx, _id: tx.id } }
)
} else {
await coll.insertOne({ ...tx, _id: tx.id })
}
}
async getTransaction(id: string): Promise<TransactionLog | null> {
const coll = this._db.collection<TransactionLog & { _id: string }>(this._transactionCollection)
const doc = await coll.findOne({ _id: id })
if (!doc) return null
const { _id, ...tx } = doc
return tx as TransactionLog
}
async updateTransactionState(id: string, state: TransactionState): Promise<void> {
const coll = this._db.collection(this._transactionCollection)
await coll.updateOne(
{ _id: id },
{ $set: { state, updatedAt: Date.now() } }
)
}
async updateOperationState(
transactionId: string,
operationIndex: number,
state: OperationLog['state'],
error?: string
): Promise<void> {
const coll = this._db.collection(this._transactionCollection)
const update: Record<string, unknown> = {
[`operations.${operationIndex}.state`]: state,
updatedAt: Date.now(),
}
if (error) {
update[`operations.${operationIndex}.error`] = error
}
if (state === 'executed') {
update[`operations.${operationIndex}.executedAt`] = Date.now()
} else if (state === 'compensated') {
update[`operations.${operationIndex}.compensatedAt`] = Date.now()
}
await coll.updateOne(
{ _id: transactionId },
{ $set: update }
)
}
async getPendingTransactions(serverId?: string): Promise<TransactionLog[]> {
const coll = this._db.collection<TransactionLog & { _id: string }>(this._transactionCollection)
const filter: Record<string, unknown> = {
state: { $in: ['pending', 'executing'] },
}
if (serverId) {
filter['metadata.serverId'] = serverId
}
const docs = await coll.find(filter).toArray()
return docs.map(({ _id, ...tx }) => tx as TransactionLog)
}
async deleteTransaction(id: string): Promise<void> {
const coll = this._db.collection(this._transactionCollection)
await coll.deleteOne({ _id: id })
}
// =========================================================================
// 数据操作 | Data Operations
// =========================================================================
async get<T>(key: string): Promise<T | null> {
const coll = this._db.collection<DataDocument>(this._dataCollection)
const doc = await coll.findOne({ _id: key })
if (!doc) return null
if (doc.expireAt && doc.expireAt < new Date()) {
await coll.deleteOne({ _id: key })
return null
}
return doc.value as T
}
async set<T>(key: string, value: T, ttl?: number): Promise<void> {
const coll = this._db.collection<DataDocument>(this._dataCollection)
const doc: DataDocument = {
_id: key,
value,
}
if (ttl) {
doc.expireAt = new Date(Date.now() + ttl)
}
const existing = await coll.findOne({ _id: key })
if (existing) {
await coll.updateOne({ _id: key }, { $set: doc })
} else {
await coll.insertOne(doc)
}
}
async delete(key: string): Promise<boolean> {
const coll = this._db.collection(this._dataCollection)
const result = await coll.deleteOne({ _id: key })
return result.deletedCount > 0
}
}
/**
* @zh 创建 MongoDB 存储
* @en Create MongoDB storage
*/
export function createMongoStorage(config: MongoStorageConfig): MongoStorage {
return new MongoStorage(config)
}

View File

@@ -0,0 +1,244 @@
/**
* @zh Redis 存储实现
* @en Redis storage implementation
*
* @zh 支持分布式锁和快速缓存
* @en Supports distributed locking and fast caching
*/
import type {
ITransactionStorage,
TransactionLog,
TransactionState,
OperationLog,
} from '../core/types.js'
/**
* @zh Redis 客户端接口(兼容 ioredis
* @en Redis client interface (compatible with ioredis)
*/
export interface RedisClient {
get(key: string): Promise<string | null>
set(key: string, value: string, ...args: string[]): Promise<string | null>
del(...keys: string[]): Promise<number>
eval(script: string, numkeys: number, ...args: (string | number)[]): Promise<unknown>
hget(key: string, field: string): Promise<string | null>
hset(key: string, ...args: (string | number)[]): Promise<number>
hdel(key: string, ...fields: string[]): Promise<number>
hgetall(key: string): Promise<Record<string, string>>
keys(pattern: string): Promise<string[]>
expire(key: string, seconds: number): Promise<number>
}
/**
* @zh Redis 存储配置
* @en Redis storage configuration
*/
export interface RedisStorageConfig {
/**
* @zh Redis 客户端实例
* @en Redis client instance
*/
client: RedisClient
/**
* @zh 键前缀
* @en Key prefix
*/
prefix?: string
/**
* @zh 事务日志过期时间(秒)
* @en Transaction log expiration time in seconds
*/
transactionTTL?: number
}
const LOCK_SCRIPT = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`
/**
* @zh Redis 存储
* @en Redis storage
*
* @zh 基于 Redis 的分布式事务存储,支持分布式锁
* @en Redis-based distributed transaction storage with distributed locking support
*
* @example
* ```typescript
* import Redis from 'ioredis'
*
* const redis = new Redis('redis://localhost:6379')
* const storage = new RedisStorage({ client: redis })
* ```
*/
export class RedisStorage implements ITransactionStorage {
private _client: RedisClient
private _prefix: string
private _transactionTTL: number
constructor(config: RedisStorageConfig) {
this._client = config.client
this._prefix = config.prefix ?? 'tx:'
this._transactionTTL = config.transactionTTL ?? 86400 // 24 hours
}
// =========================================================================
// 分布式锁 | Distributed Lock
// =========================================================================
async acquireLock(key: string, ttl: number): Promise<string | null> {
const lockKey = `${this._prefix}lock:${key}`
const token = `${Date.now()}_${Math.random().toString(36).substring(2)}`
const ttlSeconds = Math.ceil(ttl / 1000)
const result = await this._client.set(lockKey, token, 'NX', 'EX', String(ttlSeconds))
return result === 'OK' ? token : null
}
async releaseLock(key: string, token: string): Promise<boolean> {
const lockKey = `${this._prefix}lock:${key}`
const result = await this._client.eval(LOCK_SCRIPT, 1, lockKey, token)
return result === 1
}
// =========================================================================
// 事务日志 | Transaction Log
// =========================================================================
async saveTransaction(tx: TransactionLog): Promise<void> {
const key = `${this._prefix}tx:${tx.id}`
await this._client.set(key, JSON.stringify(tx))
await this._client.expire(key, this._transactionTTL)
if (tx.metadata?.serverId) {
const serverKey = `${this._prefix}server:${tx.metadata.serverId}:txs`
await this._client.hset(serverKey, tx.id, String(tx.createdAt))
}
}
async getTransaction(id: string): Promise<TransactionLog | null> {
const key = `${this._prefix}tx:${id}`
const data = await this._client.get(key)
return data ? JSON.parse(data) : null
}
async updateTransactionState(id: string, state: TransactionState): Promise<void> {
const tx = await this.getTransaction(id)
if (tx) {
tx.state = state
tx.updatedAt = Date.now()
await this.saveTransaction(tx)
}
}
async updateOperationState(
transactionId: string,
operationIndex: number,
state: OperationLog['state'],
error?: string
): Promise<void> {
const tx = await this.getTransaction(transactionId)
if (tx && tx.operations[operationIndex]) {
tx.operations[operationIndex].state = state
if (error) {
tx.operations[operationIndex].error = error
}
if (state === 'executed') {
tx.operations[operationIndex].executedAt = Date.now()
} else if (state === 'compensated') {
tx.operations[operationIndex].compensatedAt = Date.now()
}
tx.updatedAt = Date.now()
await this.saveTransaction(tx)
}
}
async getPendingTransactions(serverId?: string): Promise<TransactionLog[]> {
const result: TransactionLog[] = []
if (serverId) {
const serverKey = `${this._prefix}server:${serverId}:txs`
const txIds = await this._client.hgetall(serverKey)
for (const id of Object.keys(txIds)) {
const tx = await this.getTransaction(id)
if (tx && (tx.state === 'pending' || tx.state === 'executing')) {
result.push(tx)
}
}
} else {
const pattern = `${this._prefix}tx:*`
const keys = await this._client.keys(pattern)
for (const key of keys) {
const data = await this._client.get(key)
if (data) {
const tx: TransactionLog = JSON.parse(data)
if (tx.state === 'pending' || tx.state === 'executing') {
result.push(tx)
}
}
}
}
return result
}
async deleteTransaction(id: string): Promise<void> {
const key = `${this._prefix}tx:${id}`
const tx = await this.getTransaction(id)
await this._client.del(key)
if (tx?.metadata?.serverId) {
const serverKey = `${this._prefix}server:${tx.metadata.serverId}:txs`
await this._client.hdel(serverKey, id)
}
}
// =========================================================================
// 数据操作 | Data Operations
// =========================================================================
async get<T>(key: string): Promise<T | null> {
const fullKey = `${this._prefix}data:${key}`
const data = await this._client.get(fullKey)
return data ? JSON.parse(data) : null
}
async set<T>(key: string, value: T, ttl?: number): Promise<void> {
const fullKey = `${this._prefix}data:${key}`
if (ttl) {
const ttlSeconds = Math.ceil(ttl / 1000)
await this._client.set(fullKey, JSON.stringify(value), 'EX', String(ttlSeconds))
} else {
await this._client.set(fullKey, JSON.stringify(value))
}
}
async delete(key: string): Promise<boolean> {
const fullKey = `${this._prefix}data:${key}`
const result = await this._client.del(fullKey)
return result > 0
}
}
/**
* @zh 创建 Redis 存储
* @en Create Redis storage
*/
export function createRedisStorage(config: RedisStorageConfig): RedisStorage {
return new RedisStorage(config)
}

View File

@@ -0,0 +1,8 @@
/**
* @zh 存储模块导出
* @en Storage module exports
*/
export { MemoryStorage, createMemoryStorage, type MemoryStorageConfig } from './MemoryStorage.js'
export { RedisStorage, createRedisStorage, type RedisStorageConfig, type RedisClient } from './RedisStorage.js'
export { MongoStorage, createMongoStorage, type MongoStorageConfig, type MongoDb, type MongoCollection } from './MongoStorage.js'

View File

@@ -0,0 +1,20 @@
/**
* @zh Transaction 模块服务令牌
* @en Transaction module service tokens
*/
import { createServiceToken } from '@esengine/ecs-framework'
import type { TransactionManager } from './core/TransactionManager.js'
import type { ITransactionStorage } from './core/types.js'
/**
* @zh 事务管理器令牌
* @en Transaction manager token
*/
export const TransactionManagerToken = createServiceToken<TransactionManager>('transactionManager')
/**
* @zh 事务存储令牌
* @en Transaction storage token
*/
export const TransactionStorageToken = createServiceToken<ITransactionStorage>('transactionStorage')