Added necessary locking for backend InputsBuffer.

This commit is contained in:
genxium 2022-11-30 16:53:48 +08:00
parent f3a12b2aa9
commit 26370dce61
2 changed files with 79 additions and 48 deletions

View File

@ -158,7 +158,7 @@ type Room struct {
DismissalWaitGroup sync.WaitGroup DismissalWaitGroup sync.WaitGroup
Barriers map[int32]*Barrier Barriers map[int32]*Barrier
InputsBuffer *RingBuffer // Indices are STRICTLY consecutive InputsBuffer *RingBuffer // Indices are STRICTLY consecutive
DiscreteInputsBuffer sync.Map // Indices are NOT NECESSARILY consecutive InputsBufferLock sync.Mutex
RenderFrameBuffer *RingBuffer RenderFrameBuffer *RingBuffer
LastAllConfirmedInputFrameId int32 LastAllConfirmedInputFrameId int32
LastAllConfirmedInputFrameIdWithChange int32 LastAllConfirmedInputFrameIdWithChange int32
@ -172,6 +172,7 @@ type Room struct {
BulletBattleLocalIdCounter int32 BulletBattleLocalIdCounter int32
dilutedRollbackEstimatedDtNanos int64 dilutedRollbackEstimatedDtNanos int64
BattleColliderInfo // Compositing to send centralized magic numbers BattleColliderInfo // Compositing to send centralized magic numbers
} }
@ -412,13 +413,6 @@ func (pR *Room) StartBattle() {
Logger.Info("The `battleMainLoop` is started for:", zap.Any("roomId", pR.Id)) Logger.Info("The `battleMainLoop` is started for:", zap.Any("roomId", pR.Id))
for { for {
pR.markConfirmationIfApplicable()
unconfirmedMask := uint64(0)
// [WARNING] Downsync the all-confirmed inputFrames asap!
upperToSendInputFrameId := pR.LastAllConfirmedInputFrameId
for playerId, player := range pR.Players {
pR.downsyncToSinglePlayer(playerId, player, upperToSendInputFrameId, unconfirmedMask)
}
stCalculation := utils.UnixtimeNano() stCalculation := utils.UnixtimeNano()
elapsedNanosSinceLastFrameIdTriggered := stCalculation - pR.LastRenderFrameIdTriggeredAt elapsedNanosSinceLastFrameIdTriggered := stCalculation - pR.LastRenderFrameIdTriggeredAt
@ -436,21 +430,46 @@ func (pR *Room) StartBattle() {
return return
} }
// Prefab and buffer backend inputFrameDownsync if 0 == pR.RenderFrameId {
if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) { for playerId, player := range pR.Players {
noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0) currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads
pR.prefabInputFrameDownsync(noDelayInputFrameId) // [WARNING] DON'T try to send any message to an inactive player!
switch currPlayerBattleState {
case PlayerBattleStateIns.DISCONNECTED:
case PlayerBattleStateIns.LOST:
continue
}
kickoffFrame := pR.RenderFrameBuffer.GetByFrameId(0).(*RoomDownsyncFrame)
pR.sendSafely(kickoffFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_START, playerId)
}
Logger.Info(fmt.Sprintf("In `battleMainLoop` for roomId=%v sent out kickoffFrame", pR.Id))
} }
upperToSendInputFrameId := pR.LastAllConfirmedInputFrameId
dynamicsDuration := int64(0)
unconfirmedMask := uint64(0)
// Prefab and buffer backend inputFrameDownsync
if pR.BackendDynamicsForceConfirmationEnabled { if pR.BackendDynamicsForceConfirmationEnabled {
pR.InputsBufferLock.Lock()
defer func() {
pR.InputsBufferLock.Unlock()
}()
if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) {
noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0)
if existingInputFrame := pR.InputsBuffer.GetByFrameId(noDelayInputFrameId); nil == existingInputFrame {
pR.prefabInputFrameDownsync(noDelayInputFrameId)
}
}
// Force setting all-confirmed of buffered inputFrames periodically // Force setting all-confirmed of buffered inputFrames periodically
unconfirmedMask = pR.forceConfirmationIfApplicable() unconfirmedMask = pR.forceConfirmationIfApplicable()
} }
upperToSendInputFrameId = pR.LastAllConfirmedInputFrameId
dynamicsDuration := int64(0)
if pR.BackendDynamicsEnabled { if pR.BackendDynamicsEnabled {
pR.InputsBufferLock.Lock()
defer func() {
pR.InputsBufferLock.Unlock()
}()
if 0 <= pR.LastAllConfirmedInputFrameId { if 0 <= pR.LastAllConfirmedInputFrameId {
dynamicsStartedAt := utils.UnixtimeNano() dynamicsStartedAt := utils.UnixtimeNano()
// Apply "all-confirmed inputFrames" to move forward "pR.CurDynamicsRenderFrameId" // Apply "all-confirmed inputFrames" to move forward "pR.CurDynamicsRenderFrameId"
@ -488,39 +507,60 @@ func (pR *Room) toDiscreteInputsBufferIndex(inputFrameId int32, joinIndex int32)
} }
func (pR *Room) OnBattleCmdReceived(pReq *WsReq) { func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
// [WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads!
if swapped := atomic.CompareAndSwapInt32(&pR.State, RoomBattleStateIns.IN_BATTLE, RoomBattleStateIns.IN_BATTLE); !swapped { if swapped := atomic.CompareAndSwapInt32(&pR.State, RoomBattleStateIns.IN_BATTLE, RoomBattleStateIns.IN_BATTLE); !swapped {
return return
} }
playerId := pReq.PlayerId playerId := pReq.PlayerId
var player *Player = nil
var existent bool = false
inputFrameUpsyncBatch := pReq.InputFrameUpsyncBatch inputFrameUpsyncBatch := pReq.InputFrameUpsyncBatch
ackingFrameId := pReq.AckingFrameId ackingFrameId := pReq.AckingFrameId
ackingInputFrameId := pReq.AckingInputFrameId ackingInputFrameId := pReq.AckingInputFrameId
if _, existent := pR.Players[playerId]; !existent { if player, existent = pR.Players[playerId]; !existent {
Logger.Warn(fmt.Sprintf("upcmd player doesn't exist: roomId=%v, playerId=%v", pR.Id, playerId)) Logger.Warn(fmt.Sprintf("upcmd player doesn't exist: roomId=%v, playerId=%v", pR.Id, playerId))
return return
} }
if swapped := atomic.CompareAndSwapInt32(&(pR.Players[playerId].AckingFrameId), pR.Players[playerId].AckingFrameId, ackingFrameId); !swapped { atomic.StoreInt32(&(player.AckingFrameId), ackingFrameId)
panic(fmt.Sprintf("Failed to update AckingFrameId to %v for roomId=%v, playerId=%v", ackingFrameId, pR.Id, playerId)) atomic.StoreInt32(&(player.AckingInputFrameId), ackingInputFrameId)
}
if swapped := atomic.CompareAndSwapInt32(&(pR.Players[playerId].AckingInputFrameId), pR.Players[playerId].AckingInputFrameId, ackingInputFrameId); !swapped { pR.InputsBufferLock.Lock()
panic(fmt.Sprintf("Failed to update AckingInputFrameId to %v for roomId=%v, playerId=%v", ackingInputFrameId, pR.Id, playerId))
}
defer func() {
pR.InputsBufferLock.Unlock()
}()
for _, inputFrameUpsync := range inputFrameUpsyncBatch { for _, inputFrameUpsync := range inputFrameUpsyncBatch {
clientInputFrameId := inputFrameUpsync.InputFrameId clientInputFrameId := inputFrameUpsync.InputFrameId
if clientInputFrameId < pR.InputsBuffer.StFrameId { if clientInputFrameId < pR.InputsBuffer.StFrameId {
// The updates to "pR.InputsBuffer.StFrameId" is monotonically increasing, thus if "clientInputFrameId < pR.InputsBuffer.StFrameId" at any moment of time, it is obsolete in the future. // The updates to "pR.InputsBuffer.StFrameId" is monotonically increasing, thus if "clientInputFrameId < pR.InputsBuffer.StFrameId" at any moment of time, it is obsolete in the future.
Logger.Debug(fmt.Sprintf("Omitting obsolete inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false))) Logger.Warn(fmt.Sprintf("Omitting obsolete inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
continue continue
} }
bufIndex := pR.toDiscreteInputsBufferIndex(clientInputFrameId, pReq.JoinIndex) if clientInputFrameId > pR.InputsBuffer.EdFrameId {
pR.DiscreteInputsBuffer.Store(bufIndex, inputFrameUpsync) Logger.Warn(fmt.Sprintf("Dropping too advanced inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v; is this player cheating?", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
continue
// TODO: "pR.DiscreteInputsBuffer" might become too large with outdated "inputFrameUpsync" items, maintain another queue orderd by timestamp to evict them }
var targetInputFrameDownsync *InputFrameDownsync = nil
if clientInputFrameId == pR.InputsBuffer.EdFrameId {
targetInputFrameDownsync = pR.prefabInputFrameDownsync(clientInputFrameId)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-Prefabbed new inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
} else {
targetInputFrameDownsync = pR.InputsBuffer.GetByFrameId(clientInputFrameId).(*InputFrameDownsync)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-stuffing inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
}
targetInputFrameDownsync.InputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
targetInputFrameDownsync.ConfirmedList |= uint64(1 << uint32(player.JoinIndex-1))
}
newAllConfirmedCount := pR.markConfirmationIfApplicable()
if 0 < newAllConfirmedCount {
// Downsync new all-confirmed inputFrames asap
unconfirmedMask := uint64(0)
for playerId, player := range pR.Players {
pR.downsyncToSinglePlayer(playerId, player, pR.LastAllConfirmedInputFrameId, unconfirmedMask)
}
} }
} }
@ -675,7 +715,6 @@ func (pR *Room) OnDismissed() {
pR.Barriers = make(map[int32]*Barrier) pR.Barriers = make(map[int32]*Barrier)
pR.RenderCacheSize = 1024 pR.RenderCacheSize = 1024
pR.RenderFrameBuffer = NewRingBuffer(pR.RenderCacheSize) pR.RenderFrameBuffer = NewRingBuffer(pR.RenderCacheSize)
pR.DiscreteInputsBuffer = sync.Map{}
pR.InputsBuffer = NewRingBuffer((pR.RenderCacheSize >> 2) + 1) pR.InputsBuffer = NewRingBuffer((pR.RenderCacheSize >> 2) + 1)
pR.LastAllConfirmedInputFrameId = -1 pR.LastAllConfirmedInputFrameId = -1
@ -697,7 +736,7 @@ func (pR *Room) OnDismissed() {
pR.InputFrameUpsyncDelayTolerance = 2 pR.InputFrameUpsyncDelayTolerance = 2
pR.MaxChasingRenderFramesPerUpdate = 8 pR.MaxChasingRenderFramesPerUpdate = 8
pR.BackendDynamicsEnabled = true // [WARNING] When "false", recovery upon reconnection wouldn't work! pR.BackendDynamicsEnabled = false // [WARNING] When "false", recovery upon reconnection wouldn't work!
pR.BackendDynamicsForceConfirmationEnabled = (pR.BackendDynamicsEnabled && true) pR.BackendDynamicsForceConfirmationEnabled = (pR.BackendDynamicsEnabled && true)
punchSkillId := int32(1) punchSkillId := int32(1)
pR.MeleeSkillConfig = make(map[int32]*MeleeBullet, 0) pR.MeleeSkillConfig = make(map[int32]*MeleeBullet, 0)
@ -998,34 +1037,28 @@ func (pR *Room) prefabInputFrameDownsync(inputFrameId int32) *InputFrameDownsync
return currInputFrameDownsync return currInputFrameDownsync
} }
func (pR *Room) markConfirmationIfApplicable() { func (pR *Room) markConfirmationIfApplicable() int {
newAllConfirmedCount := 0
inputFrameId1 := pR.LastAllConfirmedInputFrameId + 1 inputFrameId1 := pR.LastAllConfirmedInputFrameId + 1
totPlayerCnt := uint32(pR.Capacity) totPlayerCnt := uint32(pR.Capacity)
allConfirmedMask := uint64((1 << totPlayerCnt) - 1) allConfirmedMask := uint64((1 << totPlayerCnt) - 1)
for inputFrameId := inputFrameId1; inputFrameId < pR.InputsBuffer.EdFrameId; inputFrameId++ { for inputFrameId := inputFrameId1; inputFrameId < pR.InputsBuffer.EdFrameId; inputFrameId++ {
tmp := pR.InputsBuffer.GetByFrameId(inputFrameId) tmp := pR.InputsBuffer.GetByFrameId(inputFrameId)
if nil == tmp { if nil == tmp {
panic(fmt.Sprintf("inputFrameId=%v doesn't exist for roomId=%v, this is abnormal because the server should prefab inputFrameDownsync in a most advanced pace, check the prefab logic (Or maybe you're having a 'Room.RenderCacheSize' too small)! InputsBuffer=%v", inputFrameId, pR.Id, pR.InputsBufferString(false))) panic(fmt.Sprintf("inputFrameId=%v doesn't exist for roomId=%v! InputsBuffer=%v", inputFrameId, pR.Id, pR.InputsBufferString(false)))
} }
inputFrameDownsync := tmp.(*InputFrameDownsync) inputFrameDownsync := tmp.(*InputFrameDownsync)
for _, player := range pR.Players {
bufIndex := pR.toDiscreteInputsBufferIndex(inputFrameId, player.JoinIndex)
tmp, loaded := pR.DiscreteInputsBuffer.LoadAndDelete(bufIndex) // It's safe to "LoadAndDelete" here because the "inputFrameUpsync" of this player is already remembered by the corresponding "inputFrameDown".
if !loaded {
continue
}
inputFrameUpsync := tmp.(*InputFrameUpsync)
indiceInJoinIndexBooleanArr := uint32(player.JoinIndex - 1)
inputFrameDownsync.InputList[indiceInJoinIndexBooleanArr] = inputFrameUpsync.Encoded
inputFrameDownsync.ConfirmedList |= (1 << indiceInJoinIndexBooleanArr)
}
if allConfirmedMask == inputFrameDownsync.ConfirmedList { if allConfirmedMask == inputFrameDownsync.ConfirmedList {
newAllConfirmedCount += 1
pR.onInputFrameDownsyncAllConfirmed(inputFrameDownsync, -1) pR.onInputFrameDownsyncAllConfirmed(inputFrameDownsync, -1)
} else { } else {
break break
} }
} }
Logger.Info(fmt.Sprintf("markConfirmationIfApplicable checking inputFrameId=[%v, %v) for roomId=%v, newAllConfirmedCount=%d: InputsBuffer=%v", inputFrameId1, pR.InputsBuffer.EdFrameId, pR.Id, newAllConfirmedCount, pR.InputsBufferString(false)))
return newAllConfirmedCount
} }
func (pR *Room) forceConfirmationIfApplicable() uint64 { func (pR *Room) forceConfirmationIfApplicable() uint64 {
@ -1375,8 +1408,6 @@ func (pR *Room) printBarrier(barrierCollider *resolv.Object) {
} }
func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSendInputFrameId int32, unconfirmedMask uint64) { func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSendInputFrameId int32, unconfirmedMask uint64) {
// [WARNING] "pR.InputsBuffer" isn't thread-safe, thus it's critical to guarantee that "pR.InputsBuffer.Put/Pop/GetByFrameId" are all executed by "battleMainLoop" only, i.e. this function should also be executed by "battleMainLoop" only!
currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads
// [WARNING] DON'T try to send any message to an inactive player! // [WARNING] DON'T try to send any message to an inactive player!
@ -1431,7 +1462,7 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe
} }
if 0 >= len(toSendInputFrames) { if 0 >= len(toSendInputFrames) {
Logger.Warn(fmt.Sprintf("Not sending due to empty toSendInputFrames: roomId=%v, playerId=%v, refRenderFrameId=%v, upperToSendInputFrameId=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, refRenderFrameId, upperToSendInputFrameId, player.LastSentInputFrameId, player.AckingInputFrameId)) Logger.Debug(fmt.Sprintf("Not sending due to empty toSendInputFrames: roomId=%v, playerId=%v, refRenderFrameId=%v, upperToSendInputFrameId=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, refRenderFrameId, upperToSendInputFrameId, player.LastSentInputFrameId, player.AckingInputFrameId))
return return
} }

View File

@ -365,7 +365,7 @@ func Serve(c *gin.Context) {
return nil return nil
} }
// Tries to receive from client-side in a non-blocking manner. // TODO: Is there any potential edge-trigger improvement like the epoll approach mentioned above for the following statement? See discussion in https://github.com/gorilla/websocket/issues/122
_, bytes, err := conn.ReadMessage() _, bytes, err := conn.ReadMessage()
if nil != err { if nil != err {
Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err)) Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))