Improving backend use of room.InputsBufferLock.

This commit is contained in:
genxium 2022-12-04 23:36:55 +08:00
parent 0373665382
commit e3d844abad
2 changed files with 155 additions and 146 deletions

View File

@ -1,10 +1,5 @@
package models package models
import (
. "battle_srv/protos"
"sync"
)
type RingBuffer struct { type RingBuffer struct {
Ed int32 // write index, open index Ed int32 // write index, open index
St int32 // read index, closed index St int32 // read index, closed index
@ -83,42 +78,3 @@ func (rb *RingBuffer) GetByFrameId(frameId int32) interface{} {
} }
return rb.GetByOffset(frameId - rb.StFrameId) return rb.GetByOffset(frameId - rb.StFrameId)
} }
func (rb *RingBuffer) cloneInputFrameDownsyncsByFrameIdRange(stFrameId, edFrameId int32, mux *sync.Mutex) (int32, []*InputFrameDownsync) {
dst := make([]*InputFrameDownsync, 0, rb.Cnt)
if nil != mux {
mux.Lock()
defer func() {
mux.Unlock()
}()
}
prevFrameFound := true
j := stFrameId
for j < edFrameId {
tmp := rb.GetByFrameId(j)
if nil == tmp {
if false == prevFrameFound {
// The "id"s are always consecutive
break
} else {
prevFrameFound = false
continue
}
}
foo := tmp.(*InputFrameDownsync)
bar := &InputFrameDownsync{
InputFrameId: foo.InputFrameId,
InputList: make([]uint64, len(foo.InputList)),
ConfirmedList: foo.ConfirmedList,
}
for i, input := range foo.InputList {
bar.InputList[i] = input
}
dst = append(dst, bar)
j++
}
return j, dst
}

View File

@ -158,7 +158,7 @@ type Room struct {
Barriers map[int32]*Barrier Barriers map[int32]*Barrier
InputsBuffer *RingBuffer // Indices are STRICTLY consecutive InputsBuffer *RingBuffer // Indices are STRICTLY consecutive
InputsBufferLock sync.Mutex InputsBufferLock sync.Mutex
RenderFrameBuffer *RingBuffer RenderFrameBuffer *RingBuffer // Indices are STRICTLY consecutive
LastAllConfirmedInputFrameId int32 LastAllConfirmedInputFrameId int32
LastAllConfirmedInputFrameIdWithChange int32 LastAllConfirmedInputFrameIdWithChange int32
LastAllConfirmedInputList []uint64 LastAllConfirmedInputList []uint64
@ -342,7 +342,7 @@ func (pR *Room) ConvertToLastUsedRenderFrameId(inputFrameId int32, inputDelayFra
} }
func (pR *Room) RenderFrameBufferString() string { func (pR *Room) RenderFrameBufferString() string {
return fmt.Sprintf("{renderFrameId: %d, stRenderFrameId: %d, edRenderFrameId: %d, lastAllConfirmedRenderFrameId: %d}", pR.RenderFrameId, pR.RenderFrameBuffer.StFrameId, pR.RenderFrameBuffer.EdFrameId, pR.CurDynamicsRenderFrameId) return fmt.Sprintf("{renderFrameId: %d, stRenderFrameId: %d, edRenderFrameId: %d, curDynamicsRenderFrameId: %d}", pR.RenderFrameId, pR.RenderFrameBuffer.StFrameId, pR.RenderFrameBuffer.EdFrameId, pR.CurDynamicsRenderFrameId)
} }
func (pR *Room) InputsBufferString(allDetails bool) string { func (pR *Room) InputsBufferString(allDetails bool) string {
@ -350,7 +350,7 @@ func (pR *Room) InputsBufferString(allDetails bool) string {
// Appending of the array of strings can be very SLOW due to on-demand heap allocation! Use this printing with caution. // Appending of the array of strings can be very SLOW due to on-demand heap allocation! Use this printing with caution.
s := make([]string, 0) s := make([]string, 0)
s = append(s, fmt.Sprintf("{renderFrameId: %v, stInputFrameId: %v, edInputFrameId: %v, lastAllConfirmedInputFrameIdWithChange: %v, lastAllConfirmedInputFrameId: %v}", pR.RenderFrameId, pR.InputsBuffer.StFrameId, pR.InputsBuffer.EdFrameId, pR.LastAllConfirmedInputFrameIdWithChange, pR.LastAllConfirmedInputFrameId)) s = append(s, fmt.Sprintf("{renderFrameId: %v, stInputFrameId: %v, edInputFrameId: %v, lastAllConfirmedInputFrameIdWithChange: %v, lastAllConfirmedInputFrameId: %v}", pR.RenderFrameId, pR.InputsBuffer.StFrameId, pR.InputsBuffer.EdFrameId, pR.LastAllConfirmedInputFrameIdWithChange, pR.LastAllConfirmedInputFrameId))
for playerId, player := range pR.Players { for playerId, player := range pR.PlayersArr {
s = append(s, fmt.Sprintf("{playerId: %v, ackingFrameId: %v, ackingInputFrameId: %v, lastSentInputFrameId: %v}", playerId, player.AckingFrameId, player.AckingInputFrameId, player.LastSentInputFrameId)) s = append(s, fmt.Sprintf("{playerId: %v, ackingFrameId: %v, ackingInputFrameId: %v, lastSentInputFrameId: %v}", playerId, player.AckingFrameId, player.AckingInputFrameId, player.LastSentInputFrameId))
} }
for i := pR.InputsBuffer.StFrameId; i < pR.InputsBuffer.EdFrameId; i++ { for i := pR.InputsBuffer.StFrameId; i < pR.InputsBuffer.EdFrameId; i++ {
@ -428,7 +428,8 @@ func (pR *Room) StartBattle() {
} }
if 0 == pR.RenderFrameId { if 0 == pR.RenderFrameId {
for playerId, player := range pR.Players { for _, player := range pR.PlayersArr {
playerId := player.Id
thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads
// [WARNING] DON'T try to send any message to an inactive player! // [WARNING] DON'T try to send any message to an inactive player!
switch thatPlayerBattleState { switch thatPlayerBattleState {
@ -445,32 +446,14 @@ func (pR *Room) StartBattle() {
} }
dynamicsDuration := int64(0) dynamicsDuration := int64(0)
unconfirmedMask := uint64(0)
// Prefab and buffer backend inputFrameDownsync // Prefab and buffer backend inputFrameDownsync
if pR.BackendDynamicsEnabled { if pR.BackendDynamicsEnabled {
if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) { unconfirmedMask, inputsBufferSnapshot := pR.doBattleMainLoopPerTickBackendDynamicsWithProperLocking(&dynamicsDuration)
noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0)
if existingInputFrame := pR.InputsBuffer.GetByFrameId(noDelayInputFrameId); nil == existingInputFrame {
pR.prefabInputFrameDownsync(noDelayInputFrameId, true)
}
}
// Force setting all-confirmed of buffered inputFrames periodically, kindly note that if "pR.BackendDynamicsEnabled", what we want to achieve is "recovery upon reconnection", which certainly requires "forceConfirmationIfApplicable" to move "pR.LastAllConfirmedInputFrameId" forward as much as possible
unconfirmedMask = pR.forceConfirmationIfApplicable()
if 0 <= pR.LastAllConfirmedInputFrameId {
dynamicsStartedAt := utils.UnixtimeNano()
// Apply "all-confirmed inputFrames" to move forward "pR.CurDynamicsRenderFrameId"
nextDynamicsRenderFrameId := pR.ConvertToLastUsedRenderFrameId(pR.LastAllConfirmedInputFrameId, pR.InputDelayFrames)
Logger.Debug(fmt.Sprintf("roomId=%v, room.RenderFrameId=%v, room.CurDynamicsRenderFrameId=%v, LastAllConfirmedInputFrameId=%v, InputDelayFrames=%v, nextDynamicsRenderFrameId=%v", pR.Id, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, pR.LastAllConfirmedInputFrameId, pR.InputDelayFrames, nextDynamicsRenderFrameId))
pR.applyInputFrameDownsyncDynamics(pR.CurDynamicsRenderFrameId, nextDynamicsRenderFrameId, pR.collisionSpaceOffsetX, pR.collisionSpaceOffsetY)
dynamicsDuration = utils.UnixtimeNano() - dynamicsStartedAt
}
if 0 < unconfirmedMask { if 0 < unconfirmedMask {
Logger.Debug(fmt.Sprintf("roomId=%v, room.RenderFrameId=%v, room.CurDynamicsRenderFrameId=%v, room.LastAllConfirmedInputFrameId=%v, unconfirmedMask=%v", pR.Id, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, pR.LastAllConfirmedInputFrameId, unconfirmedMask))
// Otherwise no need to downsync immediately // Otherwise no need to downsync immediately
pR.downsyncToAllPlayers(pR.LastAllConfirmedInputFrameId, unconfirmedMask, false) Logger.Debug(fmt.Sprintf("roomId=%v, room.RenderFrameId=%v, room.CurDynamicsRenderFrameId=%v, room.LastAllConfirmedInputFrameId=%v, unconfirmedMask=%v", pR.Id, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, pR.LastAllConfirmedInputFrameId, unconfirmedMask))
pR.downsyncToAllPlayers(pR.LastAllConfirmedInputFrameId, unconfirmedMask, inputsBufferSnapshot)
} }
} }
@ -496,6 +479,7 @@ func (pR *Room) toDiscreteInputsBufferIndex(inputFrameId int32, joinIndex int32)
func (pR *Room) OnBattleCmdReceived(pReq *WsReq) { func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
// [WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads! // [WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads!
// TODO: Put a rate limiter on this function!
if swapped := atomic.CompareAndSwapInt32(&pR.State, RoomBattleStateIns.IN_BATTLE, RoomBattleStateIns.IN_BATTLE); !swapped { if swapped := atomic.CompareAndSwapInt32(&pR.State, RoomBattleStateIns.IN_BATTLE, RoomBattleStateIns.IN_BATTLE); !swapped {
return return
} }
@ -515,44 +499,16 @@ func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
atomic.StoreInt32(&(player.AckingFrameId), ackingFrameId) atomic.StoreInt32(&(player.AckingFrameId), ackingFrameId)
atomic.StoreInt32(&(player.AckingInputFrameId), ackingInputFrameId) atomic.StoreInt32(&(player.AckingInputFrameId), ackingInputFrameId)
Logger.Debug(fmt.Sprintf("InputsBufferLock about to lock: roomId=%v", pR.Id)) newAllConfirmedCount := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player)
pR.InputsBufferLock.Lock() // [WARNING] By now "pR.InputsBufferLock" is unlocked
Logger.Debug(fmt.Sprintf("InputsBufferLock locked: roomId=%v", pR.Id))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("InputsBufferLock unlocked: roomId=%v", pR.Id))
}()
for _, inputFrameUpsync := range inputFrameUpsyncBatch {
clientInputFrameId := inputFrameUpsync.InputFrameId
if clientInputFrameId < pR.InputsBuffer.StFrameId {
// The updates to "pR.InputsBuffer.StFrameId" is monotonically increasing, thus if "clientInputFrameId < pR.InputsBuffer.StFrameId" at any moment of time, it is obsolete in the future.
Logger.Warn(fmt.Sprintf("Omitting obsolete inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
continue
}
if clientInputFrameId > pR.InputsBuffer.EdFrameId {
Logger.Warn(fmt.Sprintf("Dropping too advanced inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v; is this player cheating?", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
continue
}
var targetInputFrameDownsync *InputFrameDownsync = nil
if clientInputFrameId == pR.InputsBuffer.EdFrameId {
targetInputFrameDownsync = pR.prefabInputFrameDownsync(clientInputFrameId, false)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-Prefabbed new inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
} else {
targetInputFrameDownsync = pR.InputsBuffer.GetByFrameId(clientInputFrameId).(*InputFrameDownsync)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-stuffing inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
}
targetInputFrameDownsync.InputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
targetInputFrameDownsync.ConfirmedList |= uint64(1 << uint32(player.JoinIndex-1))
}
newAllConfirmedCount := pR.markConfirmationIfApplicable()
if 0 < newAllConfirmedCount { if 0 < newAllConfirmedCount {
// Downsync new all-confirmed inputFrames asap // Downsync new all-confirmed inputFrames asap
unconfirmedMask := uint64(0) pR.downsyncToAllPlayers(pR.LastAllConfirmedInputFrameId, uint64(0), pR.createInputsBufferSnapshot())
pR.downsyncToAllPlayers(pR.LastAllConfirmedInputFrameId, unconfirmedMask, true)
} }
} }
func (pR *Room) onInputFrameDownsyncAllConfirmed(inputFrameDownsync *InputFrameDownsync, playerId int32) { func (pR *Room) onInputFrameDownsyncAllConfirmed(inputFrameDownsync *InputFrameDownsync, playerId int32) {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
inputFrameId := inputFrameDownsync.InputFrameId inputFrameId := inputFrameDownsync.InputFrameId
if -1 == pR.LastAllConfirmedInputFrameIdWithChange || false == pR.equalInputLists(inputFrameDownsync.InputList, pR.LastAllConfirmedInputList) { if -1 == pR.LastAllConfirmedInputFrameIdWithChange || false == pR.equalInputLists(inputFrameDownsync.InputList, pR.LastAllConfirmedInputList) {
if -1 == playerId { if -1 == playerId {
@ -560,9 +516,9 @@ func (pR *Room) onInputFrameDownsyncAllConfirmed(inputFrameDownsync *InputFrameD
} else { } else {
Logger.Debug(fmt.Sprintf("Key inputFrame change: roomId=%v, playerId=%v, newInputFrameId=%v, lastInputFrameId=%v, newInputList=%v, lastInputList=%v, InputsBuffer=%v", pR.Id, playerId, inputFrameId, pR.LastAllConfirmedInputFrameId, inputFrameDownsync.InputList, pR.LastAllConfirmedInputList, pR.InputsBufferString(false))) Logger.Debug(fmt.Sprintf("Key inputFrame change: roomId=%v, playerId=%v, newInputFrameId=%v, lastInputFrameId=%v, newInputList=%v, lastInputList=%v, InputsBuffer=%v", pR.Id, playerId, inputFrameId, pR.LastAllConfirmedInputFrameId, inputFrameDownsync.InputList, pR.LastAllConfirmedInputList, pR.InputsBufferString(false)))
} }
atomic.StoreInt32(&(pR.LastAllConfirmedInputFrameIdWithChange), inputFrameId) pR.LastAllConfirmedInputFrameIdWithChange = inputFrameId
} }
atomic.StoreInt32(&(pR.LastAllConfirmedInputFrameId), inputFrameId) // [WARNING] It's IMPORTANT that "pR.LastAllConfirmedInputFrameId" is NOT NECESSARILY CONSECUTIVE, i.e. if one of the players disconnects and reconnects within a considerable amount of frame delays! pR.LastAllConfirmedInputFrameId = inputFrameId
for i, v := range inputFrameDownsync.InputList { for i, v := range inputFrameDownsync.InputList {
// To avoid potential misuse of pointers // To avoid potential misuse of pointers
pR.LastAllConfirmedInputList[i] = v pR.LastAllConfirmedInputList[i] = v
@ -717,8 +673,8 @@ func (pR *Room) OnDismissed() {
pR.ServerFps = 60 pR.ServerFps = 60
pR.RollbackEstimatedDtMillis = 16.667 // Use fixed-and-low-precision to mitigate the inconsistent floating-point-number issue between Golang and JavaScript pR.RollbackEstimatedDtMillis = 16.667 // Use fixed-and-low-precision to mitigate the inconsistent floating-point-number issue between Golang and JavaScript
pR.RollbackEstimatedDtNanos = 16666666 // A little smaller than the actual per frame time, just for logging FAST FRAME pR.RollbackEstimatedDtNanos = 16666666 // A little smaller than the actual per frame time, just for logging FAST FRAME
dilutedServerFps := int64(58) dilutedServerFps := float64(59.5)
pR.dilutedRollbackEstimatedDtNanos = pR.RollbackEstimatedDtNanos * (int64(pR.ServerFps) / dilutedServerFps) pR.dilutedRollbackEstimatedDtNanos = pR.RollbackEstimatedDtNanos * int64(float64(pR.ServerFps)/dilutedServerFps)
pR.BattleDurationFrames = 30 * pR.ServerFps pR.BattleDurationFrames = 30 * pR.ServerFps
pR.BattleDurationNanos = int64(pR.BattleDurationFrames) * (pR.RollbackEstimatedDtNanos + 1) pR.BattleDurationNanos = int64(pR.BattleDurationFrames) * (pR.RollbackEstimatedDtNanos + 1)
pR.InputFrameUpsyncDelayTolerance = 2 pR.InputFrameUpsyncDelayTolerance = 2
@ -941,7 +897,7 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool {
case PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK: case PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK:
// only send resync info to the targetPlayer // only send resync info to the targetPlayer
// i.e. implies that "targetPlayer.LastSentInputFrameId == MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED" // i.e. implies that "targetPlayer.LastSentInputFrameId == MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED"
pR.downsyncToSinglePlayer(playerId, targetPlayer, pR.LastAllConfirmedInputFrameId, uint64(0), false) pR.downsyncToSinglePlayer(playerId, targetPlayer, pR.LastAllConfirmedInputFrameId, uint64(0), pR.createInputsBufferSnapshot())
default: default:
} }
@ -996,24 +952,13 @@ func (pR *Room) shouldPrefabInputFrameDownsync(renderFrameId int32) bool {
return ((renderFrameId & ((1 << pR.InputScaleFrames) - 1)) == 0) return ((renderFrameId & ((1 << pR.InputScaleFrames) - 1)) == 0)
} }
func (pR *Room) prefabInputFrameDownsync(inputFrameId int32, lockInputsBuffer bool) *InputFrameDownsync { func (pR *Room) prefabInputFrameDownsync(inputFrameId int32) *InputFrameDownsync {
/* /*
Kindly note that on backend the prefab is much simpler than its frontend counterpart, because frontend will upsync its latest command immediately if there's any change w.r.t. its own prev cmd, thus if no upsync received from a frontend, Kindly note that on backend the prefab is much simpler than its frontend counterpart, because frontend will upsync its latest command immediately if there's any change w.r.t. its own prev cmd, thus if no upsync received from a frontend,
- EITHER it's due to local lag and bad network, - EITHER it's due to local lag and bad network,
- OR there's no change w.r.t. to its prev cmd. - OR there's no change w.r.t. to its prev cmd.
*/ */
if lockInputsBuffer {
Logger.Debug(fmt.Sprintf("prefabInputFrameDownsync-InputsBufferLock to about lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("prefabInputFrameDownsync-InputsBufferLock locked: roomId=%v", pR.Id))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("prefabInputFrameDownsync-InputsBufferLock unlocked: roomId=%v", pR.Id))
}()
}
var currInputFrameDownsync *InputFrameDownsync = nil var currInputFrameDownsync *InputFrameDownsync = nil
if 0 == inputFrameId && 0 == pR.InputsBuffer.Cnt { if 0 == inputFrameId && 0 == pR.InputsBuffer.Cnt {
@ -1043,7 +988,42 @@ func (pR *Room) prefabInputFrameDownsync(inputFrameId int32, lockInputsBuffer bo
return currInputFrameDownsync return currInputFrameDownsync
} }
func (pR *Room) markConfirmationIfApplicable() int { func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*InputFrameUpsync, playerId int32, player *Player) int {
Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable-InputsBufferLock about to lock: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable-InputsBufferLock locked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable-InputsBufferLock unlocked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
}()
for _, inputFrameUpsync := range inputFrameUpsyncBatch {
clientInputFrameId := inputFrameUpsync.InputFrameId
if clientInputFrameId < pR.InputsBuffer.StFrameId {
// The updates to "pR.InputsBuffer.StFrameId" is monotonically increasing, thus if "clientInputFrameId < pR.InputsBuffer.StFrameId" at any moment of time, it is obsolete in the future.
Logger.Debug(fmt.Sprintf("Omitting obsolete inputFrameUpsync#1: roomId=%v, playerId=%v, clientInputFrameId=%v, lastAllConfirmedInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.LastAllConfirmedInputFrameId, pR.InputsBufferString(false)))
continue
}
if clientInputFrameId < pR.LastAllConfirmedInputFrameId {
Logger.Info(fmt.Sprintf("Omitting obsolete inputFrameUpsync#2: roomId=%v, playerId=%v, clientInputFrameId=%v, lastAllConfirmedInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.LastAllConfirmedInputFrameId, pR.InputsBufferString(false)))
continue
}
if clientInputFrameId > pR.InputsBuffer.EdFrameId {
Logger.Warn(fmt.Sprintf("Dropping too advanced inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, lastAllConfirmedInputFrameId=%v, InputsBuffer=%v; is this player cheating?", pR.Id, playerId, clientInputFrameId, pR.LastAllConfirmedInputFrameId, pR.InputsBufferString(false)))
continue
}
var targetInputFrameDownsync *InputFrameDownsync = nil
if clientInputFrameId == pR.InputsBuffer.EdFrameId {
targetInputFrameDownsync = pR.prefabInputFrameDownsync(clientInputFrameId)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-Prefabbed new inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
} else {
targetInputFrameDownsync = pR.InputsBuffer.GetByFrameId(clientInputFrameId).(*InputFrameDownsync)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-stuffing inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
}
targetInputFrameDownsync.InputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
targetInputFrameDownsync.ConfirmedList |= uint64(1 << uint32(player.JoinIndex-1))
}
newAllConfirmedCount := 0 newAllConfirmedCount := 0
inputFrameId1 := pR.LastAllConfirmedInputFrameId + 1 inputFrameId1 := pR.LastAllConfirmedInputFrameId + 1
totPlayerCnt := uint32(pR.Capacity) totPlayerCnt := uint32(pR.Capacity)
@ -1058,7 +1038,7 @@ func (pR *Room) markConfirmationIfApplicable() int {
inputFrameDownsync := tmp.(*InputFrameDownsync) inputFrameDownsync := tmp.(*InputFrameDownsync)
if allConfirmedMask != inputFrameDownsync.ConfirmedList { if allConfirmedMask != inputFrameDownsync.ConfirmedList {
for _, player := range pR.Players { for _, player := range pR.PlayersArr {
thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState))
thatPlayerJoinMask := uint64(1 << uint32(player.JoinIndex-1)) thatPlayerJoinMask := uint64(1 << uint32(player.JoinIndex-1))
if 0 == (inputFrameDownsync.ConfirmedList & thatPlayerJoinMask) { if 0 == (inputFrameDownsync.ConfirmedList & thatPlayerJoinMask) {
@ -1085,6 +1065,7 @@ func (pR *Room) markConfirmationIfApplicable() int {
} }
func (pR *Room) forceConfirmationIfApplicable() uint64 { func (pR *Room) forceConfirmationIfApplicable() uint64 {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
// Force confirmation of non-all-confirmed inputFrame EXACTLY ONE AT A TIME, returns the non-confirmed mask of players, e.g. in a 4-player-battle returning 1001 means that players with JoinIndex=1 and JoinIndex=4 are non-confirmed for inputFrameId2 // Force confirmation of non-all-confirmed inputFrame EXACTLY ONE AT A TIME, returns the non-confirmed mask of players, e.g. in a 4-player-battle returning 1001 means that players with JoinIndex=1 and JoinIndex=4 are non-confirmed for inputFrameId2
renderFrameId1 := (pR.RenderFrameId - pR.NstDelayFrames) // the renderFrameId which should've been rendered on frontend renderFrameId1 := (pR.RenderFrameId - pR.NstDelayFrames) // the renderFrameId which should've been rendered on frontend
if 0 > renderFrameId1 || !pR.shouldPrefabInputFrameDownsync(renderFrameId1) { if 0 > renderFrameId1 || !pR.shouldPrefabInputFrameDownsync(renderFrameId1) {
@ -1103,14 +1084,6 @@ func (pR *Room) forceConfirmationIfApplicable() uint64 {
return 0 return 0
} }
Logger.Debug(fmt.Sprintf("forceConfirmation-InputsBufferLock about to lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("forceConfirmation-InputsBufferLock locked: roomId=%v", pR.Id))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("forceConfirmation-InputsBufferLock unlocked: roomId=%v", pR.Id))
}()
tmp := pR.InputsBuffer.GetByFrameId(inputFrameId2) tmp := pR.InputsBuffer.GetByFrameId(inputFrameId2)
if nil == tmp { if nil == tmp {
panic(fmt.Sprintf("inputFrameId2=%v doesn't exist for roomId=%v, this is abnormal because the server should prefab inputFrameDownsync in a most advanced pace, check the prefab logic! InputsBuffer=%v", inputFrameId2, pR.Id, pR.InputsBufferString(false))) panic(fmt.Sprintf("inputFrameId2=%v doesn't exist for roomId=%v, this is abnormal because the server should prefab inputFrameDownsync in a most advanced pace, check the prefab logic! InputsBuffer=%v", inputFrameId2, pR.Id, pR.InputsBufferString(false)))
@ -1164,7 +1137,7 @@ func (pR *Room) applyInputFrameDownsyncDynamics(fromRenderFrameId int32, toRende
} }
delayedInputFrame = tmp.(*InputFrameDownsync) delayedInputFrame = tmp.(*InputFrameDownsync)
// [WARNING] It's possible that by now "allConfirmedMask != delayedInputFrame.ConfirmedList && delayedInputFrameId <= pR.LastAllConfirmedInputFrameId", we trust "pR.LastAllConfirmedInputFrameId" as the TOP AUTHORITY. // [WARNING] It's possible that by now "allConfirmedMask != delayedInputFrame.ConfirmedList && delayedInputFrameId <= pR.LastAllConfirmedInputFrameId", we trust "pR.LastAllConfirmedInputFrameId" as the TOP AUTHORITY.
atomic.StoreUint64(&(delayedInputFrame.ConfirmedList), allConfirmedMask) delayedInputFrame.ConfirmedList = allConfirmedMask
} }
nextRenderFrame := pR.applyInputFrameDownsyncDynamicsOnSingleRenderFrame(delayedInputFrame, currRenderFrame, pR.CollisionSysMap) nextRenderFrame := pR.applyInputFrameDownsyncDynamicsOnSingleRenderFrame(delayedInputFrame, currRenderFrame, pR.CollisionSysMap)
@ -1211,7 +1184,8 @@ func (pR *Room) applyInputFrameDownsyncDynamicsOnSingleRenderFrame(delayedInputF
effPushbacks := make([]Vec2D, pR.Capacity) // Guaranteed determinism regardless of traversal order effPushbacks := make([]Vec2D, pR.Capacity) // Guaranteed determinism regardless of traversal order
// Reset playerCollider position from the "virtual grid position" // Reset playerCollider position from the "virtual grid position"
for playerId, player := range pR.Players { for _, player := range pR.PlayersArr {
playerId := player.Id
joinIndex := player.JoinIndex joinIndex := player.JoinIndex
bulletPushbacks[joinIndex-1].X, bulletPushbacks[joinIndex-1].Y = float64(0), float64(0) bulletPushbacks[joinIndex-1].X, bulletPushbacks[joinIndex-1].Y = float64(0), float64(0)
effPushbacks[joinIndex-1].X, effPushbacks[joinIndex-1].Y = float64(0), float64(0) effPushbacks[joinIndex-1].X, effPushbacks[joinIndex-1].Y = float64(0), float64(0)
@ -1306,7 +1280,8 @@ func (pR *Room) applyInputFrameDownsyncDynamicsOnSingleRenderFrame(delayedInputF
} }
inputList := delayedInputFrame.InputList inputList := delayedInputFrame.InputList
// Process player inputs // Process player inputs
for playerId, player := range pR.Players { for _, player := range pR.PlayersArr {
playerId := player.Id
joinIndex := player.JoinIndex joinIndex := player.JoinIndex
collisionPlayerIndex := COLLISION_PLAYER_INDEX_PREFIX + joinIndex collisionPlayerIndex := COLLISION_PLAYER_INDEX_PREFIX + joinIndex
playerCollider := collisionSysMap[collisionPlayerIndex] playerCollider := collisionSysMap[collisionPlayerIndex]
@ -1368,7 +1343,7 @@ func (pR *Room) applyInputFrameDownsyncDynamicsOnSingleRenderFrame(delayedInputF
} }
// handle pushbacks upon collision after all movements treated as simultaneous // handle pushbacks upon collision after all movements treated as simultaneous
for _, player := range pR.Players { for _, player := range pR.PlayersArr {
joinIndex := player.JoinIndex joinIndex := player.JoinIndex
collisionPlayerIndex := COLLISION_PLAYER_INDEX_PREFIX + joinIndex collisionPlayerIndex := COLLISION_PLAYER_INDEX_PREFIX + joinIndex
playerCollider := collisionSysMap[collisionPlayerIndex] playerCollider := collisionSysMap[collisionPlayerIndex]
@ -1387,7 +1362,8 @@ func (pR *Room) applyInputFrameDownsyncDynamicsOnSingleRenderFrame(delayedInputF
} }
} }
for playerId, player := range pR.Players { for _, player := range pR.PlayersArr {
playerId := player.Id
joinIndex := player.JoinIndex joinIndex := player.JoinIndex
collisionPlayerIndex := COLLISION_PLAYER_INDEX_PREFIX + joinIndex collisionPlayerIndex := COLLISION_PLAYER_INDEX_PREFIX + joinIndex
playerCollider := collisionSysMap[collisionPlayerIndex] playerCollider := collisionSysMap[collisionPlayerIndex]
@ -1447,8 +1423,48 @@ func (pR *Room) printBarrier(barrierCollider *resolv.Object) {
Logger.Info(fmt.Sprintf("Barrier in roomId=%v: w=%v, h=%v, shape=%v", pR.Id, barrierCollider.W, barrierCollider.H, barrierCollider.Shape)) Logger.Info(fmt.Sprintf("Barrier in roomId=%v: w=%v, h=%v, shape=%v", pR.Id, barrierCollider.W, barrierCollider.H, barrierCollider.Shape))
} }
func (pR *Room) downsyncToAllPlayers(upperToSendInputFrameId int32, unconfirmedMask uint64, prohibitsInputsBufferLock bool) { func (pR *Room) doBattleMainLoopPerTickBackendDynamicsWithProperLocking(pDynamicsDuration *int64) (uint64, *RingBuffer) {
for playerId, player := range pR.Players { Logger.Debug(fmt.Sprintf("doBattleMainLoopPerTickBackendDynamicsWithProperLocking-InputsBufferLock to about lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("doBattleMainLoopPerTickBackendDynamicsWithProperLocking-InputsBufferLock locked: roomId=%v", pR.Id))
defer func() {
// [WARNING] Unlock before calling "downsyncToAllPlayers -> downsyncToSinglePlayer" to avoid unlocking lags due to blocking network I/O
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("doBattleMainLoopPerTickBackendDynamicsWithProperLocking-InputsBufferLock unlocked: roomId=%v", pR.Id))
}()
if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) {
noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0)
if existingInputFrame := pR.InputsBuffer.GetByFrameId(noDelayInputFrameId); nil == existingInputFrame {
pR.prefabInputFrameDownsync(noDelayInputFrameId)
}
}
// Force setting all-confirmed of buffered inputFrames periodically, kindly note that if "pR.BackendDynamicsEnabled", what we want to achieve is "recovery upon reconnection", which certainly requires "forceConfirmationIfApplicable" to move "pR.LastAllConfirmedInputFrameId" forward as much as possible
unconfirmedMask := pR.forceConfirmationIfApplicable()
if 0 <= pR.LastAllConfirmedInputFrameId {
dynamicsStartedAt := utils.UnixtimeNano()
// Apply "all-confirmed inputFrames" to move forward "pR.CurDynamicsRenderFrameId"
nextDynamicsRenderFrameId := pR.ConvertToLastUsedRenderFrameId(pR.LastAllConfirmedInputFrameId, pR.InputDelayFrames)
Logger.Debug(fmt.Sprintf("roomId=%v, room.RenderFrameId=%v, room.CurDynamicsRenderFrameId=%v, LastAllConfirmedInputFrameId=%v, InputDelayFrames=%v, nextDynamicsRenderFrameId=%v", pR.Id, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, pR.LastAllConfirmedInputFrameId, pR.InputDelayFrames, nextDynamicsRenderFrameId))
pR.applyInputFrameDownsyncDynamics(pR.CurDynamicsRenderFrameId, nextDynamicsRenderFrameId, pR.collisionSpaceOffsetX, pR.collisionSpaceOffsetY)
*pDynamicsDuration = utils.UnixtimeNano() - dynamicsStartedAt
}
var inputsBufferSnapshot *RingBuffer = nil
if 0 < unconfirmedMask {
// [WARNING] This condition should be rarely met!
// Otherwise there's no need to create the inputsBufferSnapshot
inputsBufferSnapshot = pR.createInputsBufferSnapshot()
}
return unconfirmedMask, inputsBufferSnapshot
}
func (pR *Room) downsyncToAllPlayers(upperToSendInputFrameId int32, unconfirmedMask uint64, inputsBufferSnapshot *RingBuffer) {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is unlocked -- otherwise the blocking "sendSafely" might cause significant lag!
for _, player := range pR.PlayersArr {
thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState))
switch thatPlayerBattleState { switch thatPlayerBattleState {
case PlayerBattleStateIns.DISCONNECTED: case PlayerBattleStateIns.DISCONNECTED:
@ -1458,12 +1474,12 @@ func (pR *Room) downsyncToAllPlayers(upperToSendInputFrameId int32, unconfirmedM
case PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK: // This is the reason why battleState filter is put at "downsyncToAllPlayers" instead of "downsyncToSinglePlayer" case PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK: // This is the reason why battleState filter is put at "downsyncToAllPlayers" instead of "downsyncToSinglePlayer"
continue continue
} }
pR.downsyncToSinglePlayer(playerId, player, pR.LastAllConfirmedInputFrameId, unconfirmedMask, prohibitsInputsBufferLock) pR.downsyncToSinglePlayer(player.Id, player, pR.LastAllConfirmedInputFrameId, unconfirmedMask, inputsBufferSnapshot)
} }
} }
func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSendInputFrameId int32, unconfirmedMask uint64, prohibitsInputsBufferLock bool) { func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSendInputFrameId int32, unconfirmedMask uint64, inputsBufferSnapshot *RingBuffer) {
// TODO: DON'T try to send any message to an inactive player! However, note that player.BattleState could be modified from different threads and needs to be synced! // [WARNING] This function MUST BE called while "pR.InputsBufferLock" is unlocked -- otherwise the blocking "sendSafely" might cause significant lag!
// [WARNING] Websocket is TCP-based, thus no need to re-send a previously sent inputFrame to a same player! // [WARNING] Websocket is TCP-based, thus no need to re-send a previously sent inputFrame to a same player!
lowerToSentInputFrameId := player.LastSentInputFrameId + 1 lowerToSentInputFrameId := player.LastSentInputFrameId + 1
/* /*
@ -1494,13 +1510,8 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe
} }
// [WARNING] EDGE CASE HERE: Upon initialization, all of "lastAllConfirmedInputFrameId", "lastAllConfirmedInputFrameIdWithChange" and "anchorInputFrameId" are "-1", thus "j" starts with "0", however "inputFrameId: 0" might not have been all confirmed! // [WARNING] EDGE CASE HERE: Upon initialization, all of "lastAllConfirmedInputFrameId", "lastAllConfirmedInputFrameIdWithChange" and "anchorInputFrameId" are "-1", thus "j" starts with "0", however "inputFrameId: 0" might not have been all confirmed!
var theInputsBufferLockToUse *sync.Mutex = &pR.InputsBufferLock
if prohibitsInputsBufferLock {
// Already locked in caller function
theInputsBufferLockToUse = nil
}
// [WARNING] Clone to deliberately avoid holding "pR.InputsBufferLock" while using network I/O // [WARNING] Clone to deliberately avoid holding "pR.InputsBufferLock" while using network I/O
j, toSendInputFrameDownsyncs := pR.InputsBuffer.cloneInputFrameDownsyncsByFrameIdRange(lowerToSentInputFrameId, upperToSendInputFrameId+1, theInputsBufferLockToUse) j, toSendInputFrameDownsyncs := pR.cloneInputFrameDownsyncsByFrameIdRange(lowerToSentInputFrameId, upperToSendInputFrameId+1, inputsBufferSnapshot)
if 0 >= len(toSendInputFrameDownsyncs) { if 0 >= len(toSendInputFrameDownsyncs) {
Logger.Debug(fmt.Sprintf("Not sending due to actually empty toSendInputFrameDownsyncs: roomId=%v, playerId=%v, playerLastSentInputFrameId=%v, refRenderFrameId=%v, lowerToSentInputFrameId=%v, upperToSendInputFrameId=%v, j=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, player.LastSentInputFrameId, refRenderFrameId, lowerToSentInputFrameId, upperToSendInputFrameId, j, player.LastSentInputFrameId, player.AckingInputFrameId)) Logger.Debug(fmt.Sprintf("Not sending due to actually empty toSendInputFrameDownsyncs: roomId=%v, playerId=%v, playerLastSentInputFrameId=%v, refRenderFrameId=%v, lowerToSentInputFrameId=%v, upperToSendInputFrameId=%v, j=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, player.LastSentInputFrameId, refRenderFrameId, lowerToSentInputFrameId, upperToSendInputFrameId, j, player.LastSentInputFrameId, player.AckingInputFrameId))
@ -1519,8 +1530,8 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe
} }
refRenderFrame := tmp.(*RoomDownsyncFrame) refRenderFrame := tmp.(*RoomDownsyncFrame)
for playerId, player := range pR.Players { for _, player := range pR.PlayersArr {
refRenderFrame.Players[playerId].ColliderRadius = player.ColliderRadius // hardcoded for now refRenderFrame.Players[player.Id].ColliderRadius = player.ColliderRadius // hardcoded for now
} }
refRenderFrame.BackendUnconfirmedMask = unconfirmedMask refRenderFrame.BackendUnconfirmedMask = unconfirmedMask
Logger.Warn(fmt.Sprintf("Sending refRenderFrameId=%v for roomId=%v, , playerId=%v, playerJoinIndex=%v, renderFrameId=%v, curDynamicsRenderFrameId=%v, playerLastSentInputFrameId=%v, lowerToSentInputFrameId=%v, upperToSendInputFrameId=%v, j=%v: InputsBuffer=%v", refRenderFrameId, pR.Id, playerId, player.JoinIndex, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, player.LastSentInputFrameId, lowerToSentInputFrameId, upperToSendInputFrameId, j, pR.InputsBufferString(false))) Logger.Warn(fmt.Sprintf("Sending refRenderFrameId=%v for roomId=%v, , playerId=%v, playerJoinIndex=%v, renderFrameId=%v, curDynamicsRenderFrameId=%v, playerLastSentInputFrameId=%v, lowerToSentInputFrameId=%v, upperToSendInputFrameId=%v, j=%v: InputsBuffer=%v", refRenderFrameId, pR.Id, playerId, player.JoinIndex, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, player.LastSentInputFrameId, lowerToSentInputFrameId, upperToSendInputFrameId, j, pR.InputsBufferString(false)))
@ -1530,3 +1541,45 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe
} }
player.LastSentInputFrameId = j - 1 player.LastSentInputFrameId = j - 1
} }
func (pR *Room) createInputsBufferSnapshot() *RingBuffer {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
snapshot := NewRingBuffer(pR.InputsBuffer.N)
for j := pR.InputsBuffer.StFrameId; j < pR.InputsBuffer.Ed; j++ {
tmp := pR.InputsBuffer.GetByFrameId(j)
snapshot.Put(tmp)
}
return snapshot
}
func (pR *Room) cloneInputFrameDownsyncsByFrameIdRange(stFrameId, edFrameId int32, src *RingBuffer) (int32, []*InputFrameDownsync) {
dst := make([]*InputFrameDownsync, 0, src.Cnt)
prevFrameFound := true
j := stFrameId
for j < edFrameId {
tmp := src.GetByFrameId(j)
if nil == tmp {
if false == prevFrameFound {
// The "id"s are always consecutive
break
} else {
prevFrameFound = false
continue
}
}
foo := tmp.(*InputFrameDownsync)
bar := &InputFrameDownsync{
InputFrameId: foo.InputFrameId,
InputList: make([]uint64, len(foo.InputList)),
ConfirmedList: foo.ConfirmedList,
}
for i, input := range foo.InputList {
bar.InputList[i] = input
}
dst = append(dst, bar)
j++
}
return j, dst
}