@@ -2,10 +2,11 @@
* @zh MongoDB 存储实现
* @en MongoDB storage implementation
*
* @zh 支持持久化事务日志和查询
* @en Supports persistent transaction logs and querie s
* @zh 基于共享连接的事务存储,使用 @esengine/database-drivers 提供的连接
* @en Transaction storage based on shared connection from @esengine/database-driver s
*/
import type { IMongoConnection , IMongoCollection } from '@esengine/database-drivers' ;
import type {
ITransactionStorage ,
TransactionLog ,
@@ -13,43 +14,9 @@ import type {
OperationLog
} from '../core/types.js' ;
/**
* @zh MongoDB Collection 接口
* @en MongoDB Collection interface
*/
export interface MongoCollection < T > {
findOne ( filter : object ) : Promise < T | null >
find ( filter : object ) : {
toArray ( ) : Promise < T [ ] >
}
insertOne ( doc : T ) : Promise < { insertedId : unknown } >
updateOne ( filter : object , update : object ) : Promise < { modifiedCount : number } >
deleteOne ( filter : object ) : Promise < { deletedCount : number } >
createIndex ( spec : object , options? : object ) : Promise < string >
}
/**
* @zh MongoDB 数据库接口
* @en MongoDB database interface
*/
export interface MongoDb {
collection < T = unknown > ( name : string ) : MongoCollection < T >
}
/**
* @zh MongoDB 客户端接口
* @en MongoDB client interface
*/
export interface MongoClient {
db ( name? : string ) : MongoDb
close ( ) : Promise < void >
}
/**
* @zh MongoDB 连接工厂
* @en MongoDB connection factory
*/
export type MongoClientFactory = ( ) = > MongoClient | Promise < MongoClient >
// =============================================================================
// 配置类型 | Configuration Types
// =============================================================================
/**
* @zh MongoDB 存储配置
@@ -57,29 +24,10 @@ export type MongoClientFactory = () => MongoClient | Promise<MongoClient>
*/
export interface MongoStorageConfig {
/**
* @zh MongoDB 客户端工厂(惰性 连接)
* @en MongoDB client factory (lazy connection )
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb'
* const storage = new MongoStorage({
* factory: async () => {
* const client = new MongoClient('mongodb://localhost:27017')
* await client.connect()
* return client
* },
* database: 'game'
* })
* ```
* @zh MongoDB 连接(来自 @esengine/database-drivers )
* @en MongoDB connection (from @esengine/database-drivers )
*/
factory : MongoClientFactory
/**
* @zh 数据库名称
* @en Database name
*/
database : string
connection : I MongoConnection
/**
* @zh 事务日志集合名称
@@ -100,6 +48,10 @@ export interface MongoStorageConfig {
lockCollection? : string
}
// =============================================================================
// 内部类型 | Internal Types
// =============================================================================
interface LockDocument {
_id : string
token : string
@@ -112,50 +64,40 @@ interface DataDocument {
expireAt? : Date
}
// =============================================================================
// 实现 | Implementation
// =============================================================================
/**
* @zh MongoDB 存储
* @en MongoDB storage
*
* @zh 基于 MongoDB 的事务存储,支持持久化、复杂查询和惰性 连接
* @en MongoDB-based transaction storage with persistence, complex queries and lazy connection
* @zh 基于 MongoDB 的事务存储,使用 @esengine/database-drivers 的共享 连接
* @en MongoDB-based transaction storage using shared connection from @esengine/database-drivers
*
* @example
* ```typescript
* import { MongoClient } from 'mongodb '
* import { createMongoConnection } from '@esengine/database-drivers '
* import { MongoStorage } from '@esengine/transaction'
*
* // 创建存储(惰性连接,首次操作时才连接)
* const storage = new MongoStorage({
* factory: async () => {
* const client = new MongoClient('mongodb://localhost:27017')
* await client.connect()
* return client
* },
* const mongo = createMongoConnection({
* uri: 'mongodb://localhost:27017',
* database: 'game'
* })
* await mongo.connect()
*
* awai t storage.ensureIndexes( )
*
* // 使用后手动关闭
* await storage.close()
*
* // 或使用 await using 自动关闭 (TypeScript 5.2+)
* await using storage = new MongoStorage({ ... })
* // 作用域结束时自动关闭
* cons t storage = new MongoStorage({ connection: mongo } )
* ```
*/
export class MongoStorage implements ITransactionStorage {
private _client : MongoClient | null = null ;
private _db : MongoDb | null = null ;
private _factory : MongoClientFactory ;
private _database : string ;
private _transactionCollection : string ;
private _dataCollection : string ;
private _lockCollection : string ;
private readonly _connection : I MongoConnection ;
private readonly _transactionCollection : string ;
private readonly _dataCollection : string ;
private readonly _lockCollection : string ;
private _closed : boolean = false ;
constructor ( config : MongoStorageConfig ) {
this . _factory = config . factory ;
this . _database = config . database ;
this . _connection = config . connection ;
this . _transactionCollection = config . transactionCollection ? ? 'transactions' ;
this . _dataCollection = config . dataCollection ? ? 'transaction_data' ;
this . _lockCollection = config . lockCollection ? ? 'transaction_locks' ;
@@ -166,36 +108,30 @@ export class MongoStorage implements ITransactionStorage {
// =========================================================================
/**
* @zh 获取数据库实例(惰性连接)
* @en Get database instance (lazy conn ection)
* @zh 获取集合
* @en Get coll ection
*/
private async _getDb ( ) : Promise < MongoDb > {
private _getCollection < T extends object > ( name : string ) : IMongoCollection < T > {
if ( this . _closed ) {
throw new Error ( 'MongoStorage is closed' ) ;
}
if ( ! this . _db ) {
this . _client = await this . _factory ( ) ;
this . _db = this . _client . db ( this . _database ) ;
if ( ! this . _connection . isConnected ( ) ) {
throw new Error ( 'MongoDB connection is not connected' ) ;
}
return this . _db ;
return this . _connection . collection < T > ( name ) ;
}
/**
* @zh 关闭存储连接
* @en Close storage connection
* @zh 关闭存储
* @en Close storage
*
* @zh 不会关闭共享连接,只标记存储为已关闭
* @en Does not close shared connection, only marks storage as closed
*/
async close ( ) : Promise < void > {
if ( this . _closed ) return ;
this . _closed = true ;
if ( this . _client ) {
await this . _client . close ( ) ;
this . _client = null ;
this . _db = null ;
}
}
/**
@@ -211,16 +147,15 @@ export class MongoStorage implements ITransactionStorage {
* @en Ensure indexes exist
*/
async ensureIndexes ( ) : Promise < void > {
const db = await this . _getDb ( ) ;
const txColl = db . collection < TransactionLog > ( this . _transactionCollection ) ;
const txColl = this . _getCollection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
await txColl . createIndex ( { state : 1 } ) ;
await txColl . createIndex ( { 'metadata.serverId' : 1 } ) ;
await txColl . createIndex ( { createdAt : 1 } ) ;
const lockColl = db . c ollection< LockDocument > ( this . _lockCollection ) ;
const lockColl = this . _getC ollection< LockDocument > ( this . _lockCollection ) ;
await lockColl . createIndex ( { expireAt : 1 } , { expireAfterSeconds : 0 } ) ;
const dataColl = db . c ollection< DataDocument > ( this . _dataCollection ) ;
const dataColl = this . _getC ollection< DataDocument > ( this . _dataCollection ) ;
await dataColl . createIndex ( { expireAt : 1 } , { expireAfterSeconds : 0 } ) ;
}
@@ -229,19 +164,14 @@ export class MongoStorage implements ITransactionStorage {
// =========================================================================
async acquireLock ( key : string , ttl : number ) : Promise < string | null > {
const db = await this . _getDb ( ) ;
const coll = db . collection < LockDocument > ( this . _lockCollection ) ;
const coll = this . _getCollection < LockDocument > ( this . _lockCollection ) ;
const token = ` ${ Date . now ( ) } _ ${ Math . random ( ) . toString ( 36 ) . substring ( 2 ) } ` ;
const expireAt = new Date ( Date . now ( ) + ttl ) ;
try {
await coll . insertOne ( {
_id : key ,
token ,
expireAt
} ) ;
await coll . insertOne ( { _id : key , token , expireAt } as LockDocument ) ;
return token ;
} catch ( error ) {
} catch {
const existing = await coll . findOne ( { _id : key } ) ;
if ( existing && existing . expireAt < new Date ( ) ) {
const result = await coll . updateOne (
@@ -257,8 +187,7 @@ export class MongoStorage implements ITransactionStorage {
}
async releaseLock ( key : string , token : string ) : Promise < boolean > {
const db = await this . _getDb ( ) ;
const coll = db . collection < LockDocument > ( this . _lockCollection ) ;
const coll = this . _getCollection < LockDocument > ( this . _lockCollection ) ;
const result = await coll . deleteOne ( { _id : key , token } ) ;
return result . deletedCount > 0 ;
}
@@ -268,8 +197,7 @@ export class MongoStorage implements ITransactionStorage {
// =========================================================================
async saveTransaction ( tx : TransactionLog ) : Promise < void > {
const db = await this . _getDb ( ) ;
const coll = db . collection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
const coll = this . _getCollection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
const existing = await coll . findOne ( { _id : tx.id } ) ;
if ( existing ) {
@@ -278,13 +206,12 @@ export class MongoStorage implements ITransactionStorage {
{ $set : { . . . tx , _id : tx.id } }
) ;
} else {
await coll . insertOne ( { . . . tx , _id : tx.id } ) ;
await coll . insertOne ( { . . . tx , _id : tx.id } as TransactionLog & { _id : string } ) ;
}
}
async getTransaction ( id : string ) : Promise < TransactionLog | null > {
const db = await this . _getDb ( ) ;
const coll = db . collection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
const coll = this . _getCollection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
const doc = await coll . findOne ( { _id : id } ) ;
if ( ! doc ) return null ;
@@ -294,8 +221,7 @@ export class MongoStorage implements ITransactionStorage {
}
async updateTransactionState ( id : string , state : TransactionState ) : Promise < void > {
const db = await this . _getDb ( ) ;
const coll = db . collection ( this . _transactionCollection ) ;
const coll = this . _getCollection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
await coll . updateOne (
{ _id : id } ,
{ $set : { state , updatedAt : Date.now ( ) } }
@@ -308,8 +234,7 @@ export class MongoStorage implements ITransactionStorage {
state : OperationLog [ 'state' ] ,
error? : string
) : Promise < void > {
const db = await this . _getDb ( ) ;
const coll = db . collection ( this . _transactionCollection ) ;
const coll = this . _getCollection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
const update : Record < string , unknown > = {
[ ` operations. ${ operationIndex } .state ` ] : state ,
@@ -333,8 +258,7 @@ export class MongoStorage implements ITransactionStorage {
}
async getPendingTransactions ( serverId? : string ) : Promise < TransactionLog [ ] > {
const db = await this . _getDb ( ) ;
const coll = db . collection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
const coll = this . _getCollection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
const filter : Record < string , unknown > = {
state : { $in : [ 'pending' , 'executing' ] }
@@ -344,13 +268,12 @@ export class MongoStorage implements ITransactionStorage {
filter [ 'metadata.serverId' ] = serverId ;
}
const docs = await coll . find ( filter ) . toArray ( ) ;
const docs = await coll . find ( filter ) ;
return docs . map ( ( { _id , . . . tx } ) = > tx as TransactionLog ) ;
}
async deleteTransaction ( id : string ) : Promise < void > {
const db = await this . _getDb ( ) ;
const coll = db . collection ( this . _transactionCollection ) ;
const coll = this . _getCollection < TransactionLog & { _id : string } > ( this . _transactionCollection ) ;
await coll . deleteOne ( { _id : id } ) ;
}
@@ -359,8 +282,7 @@ export class MongoStorage implements ITransactionStorage {
// =========================================================================
async get < T > ( key : string ) : Promise < T | null > {
const db = await this . _getDb ( ) ;
const coll = db . collection < DataDocument > ( this . _dataCollection ) ;
const coll = this . _getCollection < DataDocument > ( this . _dataCollection ) ;
const doc = await coll . findOne ( { _id : key } ) ;
if ( ! doc ) return null ;
@@ -374,13 +296,9 @@ export class MongoStorage implements ITransactionStorage {
}
async set < T > ( key : string , value : T , ttl? : number ) : Promise < void > {
const db = await this . _getDb ( ) ;
const coll = db . collection < DataDocument > ( this . _dataCollection ) ;
const coll = this . _getCollection < DataDocument > ( this . _dataCollection ) ;
const doc : DataDocument = {
_id : key ,
value
} ;
const doc : DataDocument = { _id : key , value } ;
if ( ttl ) {
doc . expireAt = new Date ( Date . now ( ) + ttl ) ;
@@ -395,8 +313,7 @@ export class MongoStorage implements ITransactionStorage {
}
async delete ( key : string ) : Promise < boolean > {
const db = await this . _getDb ( ) ;
const coll = db . collection ( this . _dataCollection ) ;
const coll = this . _getCollection < DataDocument > ( this . _dataCollection ) ;
const result = await coll . deleteOne ( { _id : key } ) ;
return result . deletedCount > 0 ;
}
@@ -405,7 +322,24 @@ export class MongoStorage implements ITransactionStorage {
/**
* @zh 创建 MongoDB 存储
* @en Create MongoDB storage
*
* @example
* ```typescript
* import { createMongoConnection } from '@esengine/database-drivers'
* import { createMongoStorage } from '@esengine/transaction'
*
* const mongo = createMongoConnection({
* uri: 'mongodb://localhost:27017',
* database: 'game'
* })
* await mongo.connect()
*
* const storage = createMongoStorage(mongo)
* ```
*/
export function createMongoStorage ( config : MongoStorageConfig ) : MongoStorage {
return new MongoStorage ( config ) ;
export function createMongoStorage (
connection : IMongoConnection ,
options? : Omit < MongoStorageConfig , 'connection' >
) : MongoStorage {
return new MongoStorage ( { connection , . . . options } ) ;
}