diff --git a/battle_srv/models/room.go b/battle_srv/models/room.go index e25b100..0d37756 100644 --- a/battle_srv/models/room.go +++ b/battle_srv/models/room.go @@ -158,7 +158,7 @@ type Room struct { DismissalWaitGroup sync.WaitGroup Barriers map[int32]*Barrier InputsBuffer *RingBuffer // Indices are STRICTLY consecutive - DiscreteInputsBuffer sync.Map // Indices are NOT NECESSARILY consecutive + InputsBufferLock sync.Mutex RenderFrameBuffer *RingBuffer LastAllConfirmedInputFrameId int32 LastAllConfirmedInputFrameIdWithChange int32 @@ -172,7 +172,8 @@ type Room struct { BulletBattleLocalIdCounter int32 dilutedRollbackEstimatedDtNanos int64 - BattleColliderInfo // Compositing to send centralized magic numbers + + BattleColliderInfo // Compositing to send centralized magic numbers } func (pR *Room) updateScore() { @@ -412,13 +413,6 @@ func (pR *Room) StartBattle() { Logger.Info("The `battleMainLoop` is started for:", zap.Any("roomId", pR.Id)) for { - pR.markConfirmationIfApplicable() - unconfirmedMask := uint64(0) - // [WARNING] Downsync the all-confirmed inputFrames asap! - upperToSendInputFrameId := pR.LastAllConfirmedInputFrameId - for playerId, player := range pR.Players { - pR.downsyncToSinglePlayer(playerId, player, upperToSendInputFrameId, unconfirmedMask) - } stCalculation := utils.UnixtimeNano() elapsedNanosSinceLastFrameIdTriggered := stCalculation - pR.LastRenderFrameIdTriggeredAt @@ -436,21 +430,46 @@ func (pR *Room) StartBattle() { return } - // Prefab and buffer backend inputFrameDownsync - if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) { - noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0) - pR.prefabInputFrameDownsync(noDelayInputFrameId) + if 0 == pR.RenderFrameId { + for playerId, player := range pR.Players { + currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads + // [WARNING] DON'T try to send any message to an inactive player! + switch currPlayerBattleState { + case PlayerBattleStateIns.DISCONNECTED: + case PlayerBattleStateIns.LOST: + continue + } + kickoffFrame := pR.RenderFrameBuffer.GetByFrameId(0).(*RoomDownsyncFrame) + pR.sendSafely(kickoffFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_START, playerId) + } + Logger.Info(fmt.Sprintf("In `battleMainLoop` for roomId=%v sent out kickoffFrame", pR.Id)) } + upperToSendInputFrameId := pR.LastAllConfirmedInputFrameId + dynamicsDuration := int64(0) + unconfirmedMask := uint64(0) + // Prefab and buffer backend inputFrameDownsync if pR.BackendDynamicsForceConfirmationEnabled { + pR.InputsBufferLock.Lock() + defer func() { + pR.InputsBufferLock.Unlock() + }() + if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) { + noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0) + if existingInputFrame := pR.InputsBuffer.GetByFrameId(noDelayInputFrameId); nil == existingInputFrame { + pR.prefabInputFrameDownsync(noDelayInputFrameId) + } + } + // Force setting all-confirmed of buffered inputFrames periodically unconfirmedMask = pR.forceConfirmationIfApplicable() } - upperToSendInputFrameId = pR.LastAllConfirmedInputFrameId - - dynamicsDuration := int64(0) if pR.BackendDynamicsEnabled { + pR.InputsBufferLock.Lock() + defer func() { + pR.InputsBufferLock.Unlock() + }() if 0 <= pR.LastAllConfirmedInputFrameId { dynamicsStartedAt := utils.UnixtimeNano() // Apply "all-confirmed inputFrames" to move forward "pR.CurDynamicsRenderFrameId" @@ -488,39 +507,60 @@ func (pR *Room) toDiscreteInputsBufferIndex(inputFrameId int32, joinIndex int32) } func (pR *Room) OnBattleCmdReceived(pReq *WsReq) { + // [WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads! if swapped := atomic.CompareAndSwapInt32(&pR.State, RoomBattleStateIns.IN_BATTLE, RoomBattleStateIns.IN_BATTLE); !swapped { return } playerId := pReq.PlayerId + var player *Player = nil + var existent bool = false inputFrameUpsyncBatch := pReq.InputFrameUpsyncBatch ackingFrameId := pReq.AckingFrameId ackingInputFrameId := pReq.AckingInputFrameId - if _, existent := pR.Players[playerId]; !existent { + if player, existent = pR.Players[playerId]; !existent { Logger.Warn(fmt.Sprintf("upcmd player doesn't exist: roomId=%v, playerId=%v", pR.Id, playerId)) return } - if swapped := atomic.CompareAndSwapInt32(&(pR.Players[playerId].AckingFrameId), pR.Players[playerId].AckingFrameId, ackingFrameId); !swapped { - panic(fmt.Sprintf("Failed to update AckingFrameId to %v for roomId=%v, playerId=%v", ackingFrameId, pR.Id, playerId)) - } + atomic.StoreInt32(&(player.AckingFrameId), ackingFrameId) + atomic.StoreInt32(&(player.AckingInputFrameId), ackingInputFrameId) - if swapped := atomic.CompareAndSwapInt32(&(pR.Players[playerId].AckingInputFrameId), pR.Players[playerId].AckingInputFrameId, ackingInputFrameId); !swapped { - panic(fmt.Sprintf("Failed to update AckingInputFrameId to %v for roomId=%v, playerId=%v", ackingInputFrameId, pR.Id, playerId)) - } + pR.InputsBufferLock.Lock() + defer func() { + pR.InputsBufferLock.Unlock() + }() for _, inputFrameUpsync := range inputFrameUpsyncBatch { clientInputFrameId := inputFrameUpsync.InputFrameId if clientInputFrameId < pR.InputsBuffer.StFrameId { // The updates to "pR.InputsBuffer.StFrameId" is monotonically increasing, thus if "clientInputFrameId < pR.InputsBuffer.StFrameId" at any moment of time, it is obsolete in the future. - Logger.Debug(fmt.Sprintf("Omitting obsolete inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false))) + Logger.Warn(fmt.Sprintf("Omitting obsolete inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false))) continue } - bufIndex := pR.toDiscreteInputsBufferIndex(clientInputFrameId, pReq.JoinIndex) - pR.DiscreteInputsBuffer.Store(bufIndex, inputFrameUpsync) - - // TODO: "pR.DiscreteInputsBuffer" might become too large with outdated "inputFrameUpsync" items, maintain another queue orderd by timestamp to evict them + if clientInputFrameId > pR.InputsBuffer.EdFrameId { + Logger.Warn(fmt.Sprintf("Dropping too advanced inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v; is this player cheating?", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false))) + continue + } + var targetInputFrameDownsync *InputFrameDownsync = nil + if clientInputFrameId == pR.InputsBuffer.EdFrameId { + targetInputFrameDownsync = pR.prefabInputFrameDownsync(clientInputFrameId) + Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-Prefabbed new inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false))) + } else { + targetInputFrameDownsync = pR.InputsBuffer.GetByFrameId(clientInputFrameId).(*InputFrameDownsync) + Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-stuffing inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false))) + } + targetInputFrameDownsync.InputList[player.JoinIndex-1] = inputFrameUpsync.Encoded + targetInputFrameDownsync.ConfirmedList |= uint64(1 << uint32(player.JoinIndex-1)) + } + newAllConfirmedCount := pR.markConfirmationIfApplicable() + if 0 < newAllConfirmedCount { + // Downsync new all-confirmed inputFrames asap + unconfirmedMask := uint64(0) + for playerId, player := range pR.Players { + pR.downsyncToSinglePlayer(playerId, player, pR.LastAllConfirmedInputFrameId, unconfirmedMask) + } } } @@ -675,7 +715,6 @@ func (pR *Room) OnDismissed() { pR.Barriers = make(map[int32]*Barrier) pR.RenderCacheSize = 1024 pR.RenderFrameBuffer = NewRingBuffer(pR.RenderCacheSize) - pR.DiscreteInputsBuffer = sync.Map{} pR.InputsBuffer = NewRingBuffer((pR.RenderCacheSize >> 2) + 1) pR.LastAllConfirmedInputFrameId = -1 @@ -697,7 +736,7 @@ func (pR *Room) OnDismissed() { pR.InputFrameUpsyncDelayTolerance = 2 pR.MaxChasingRenderFramesPerUpdate = 8 - pR.BackendDynamicsEnabled = true // [WARNING] When "false", recovery upon reconnection wouldn't work! + pR.BackendDynamicsEnabled = false // [WARNING] When "false", recovery upon reconnection wouldn't work! pR.BackendDynamicsForceConfirmationEnabled = (pR.BackendDynamicsEnabled && true) punchSkillId := int32(1) pR.MeleeSkillConfig = make(map[int32]*MeleeBullet, 0) @@ -998,34 +1037,28 @@ func (pR *Room) prefabInputFrameDownsync(inputFrameId int32) *InputFrameDownsync return currInputFrameDownsync } -func (pR *Room) markConfirmationIfApplicable() { +func (pR *Room) markConfirmationIfApplicable() int { + newAllConfirmedCount := 0 inputFrameId1 := pR.LastAllConfirmedInputFrameId + 1 totPlayerCnt := uint32(pR.Capacity) allConfirmedMask := uint64((1 << totPlayerCnt) - 1) + for inputFrameId := inputFrameId1; inputFrameId < pR.InputsBuffer.EdFrameId; inputFrameId++ { tmp := pR.InputsBuffer.GetByFrameId(inputFrameId) if nil == tmp { - panic(fmt.Sprintf("inputFrameId=%v doesn't exist for roomId=%v, this is abnormal because the server should prefab inputFrameDownsync in a most advanced pace, check the prefab logic (Or maybe you're having a 'Room.RenderCacheSize' too small)! InputsBuffer=%v", inputFrameId, pR.Id, pR.InputsBufferString(false))) + panic(fmt.Sprintf("inputFrameId=%v doesn't exist for roomId=%v! InputsBuffer=%v", inputFrameId, pR.Id, pR.InputsBufferString(false))) } inputFrameDownsync := tmp.(*InputFrameDownsync) - for _, player := range pR.Players { - bufIndex := pR.toDiscreteInputsBufferIndex(inputFrameId, player.JoinIndex) - tmp, loaded := pR.DiscreteInputsBuffer.LoadAndDelete(bufIndex) // It's safe to "LoadAndDelete" here because the "inputFrameUpsync" of this player is already remembered by the corresponding "inputFrameDown". - if !loaded { - continue - } - inputFrameUpsync := tmp.(*InputFrameUpsync) - indiceInJoinIndexBooleanArr := uint32(player.JoinIndex - 1) - inputFrameDownsync.InputList[indiceInJoinIndexBooleanArr] = inputFrameUpsync.Encoded - inputFrameDownsync.ConfirmedList |= (1 << indiceInJoinIndexBooleanArr) - } - if allConfirmedMask == inputFrameDownsync.ConfirmedList { + newAllConfirmedCount += 1 pR.onInputFrameDownsyncAllConfirmed(inputFrameDownsync, -1) } else { break } } + + Logger.Info(fmt.Sprintf("markConfirmationIfApplicable checking inputFrameId=[%v, %v) for roomId=%v, newAllConfirmedCount=%d: InputsBuffer=%v", inputFrameId1, pR.InputsBuffer.EdFrameId, pR.Id, newAllConfirmedCount, pR.InputsBufferString(false))) + return newAllConfirmedCount } func (pR *Room) forceConfirmationIfApplicable() uint64 { @@ -1375,8 +1408,6 @@ func (pR *Room) printBarrier(barrierCollider *resolv.Object) { } func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSendInputFrameId int32, unconfirmedMask uint64) { - // [WARNING] "pR.InputsBuffer" isn't thread-safe, thus it's critical to guarantee that "pR.InputsBuffer.Put/Pop/GetByFrameId" are all executed by "battleMainLoop" only, i.e. this function should also be executed by "battleMainLoop" only! - currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads // [WARNING] DON'T try to send any message to an inactive player! @@ -1431,7 +1462,7 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe } if 0 >= len(toSendInputFrames) { - Logger.Warn(fmt.Sprintf("Not sending due to empty toSendInputFrames: roomId=%v, playerId=%v, refRenderFrameId=%v, upperToSendInputFrameId=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, refRenderFrameId, upperToSendInputFrameId, player.LastSentInputFrameId, player.AckingInputFrameId)) + Logger.Debug(fmt.Sprintf("Not sending due to empty toSendInputFrames: roomId=%v, playerId=%v, refRenderFrameId=%v, upperToSendInputFrameId=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, refRenderFrameId, upperToSendInputFrameId, player.LastSentInputFrameId, player.AckingInputFrameId)) return } diff --git a/battle_srv/ws/serve.go b/battle_srv/ws/serve.go index 7bcea1b..c0131c5 100644 --- a/battle_srv/ws/serve.go +++ b/battle_srv/ws/serve.go @@ -365,7 +365,7 @@ func Serve(c *gin.Context) { return nil } - // Tries to receive from client-side in a non-blocking manner. + // TODO: Is there any potential edge-trigger improvement like the epoll approach mentioned above for the following statement? See discussion in https://github.com/gorilla/websocket/issues/122 _, bytes, err := conn.ReadMessage() if nil != err { Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))