Further simplified signaling.

This commit is contained in:
genxium 2022-11-30 21:51:06 +08:00
parent 26370dce61
commit e165d49cb1
6 changed files with 166 additions and 112 deletions

View File

@ -1,5 +1,10 @@
package models package models
import (
. "battle_srv/protos"
"sync"
)
type RingBuffer struct { type RingBuffer struct {
Ed int32 // write index, open index Ed int32 // write index, open index
St int32 // read index, closed index St int32 // read index, closed index
@ -78,3 +83,42 @@ func (rb *RingBuffer) GetByFrameId(frameId int32) interface{} {
} }
return rb.GetByOffset(frameId - rb.StFrameId) return rb.GetByOffset(frameId - rb.StFrameId)
} }
func (rb *RingBuffer) cloneInputFrameDownsyncsByFrameIdRange(stFrameId, edFrameId int32, mux *sync.Mutex) (int32, []*InputFrameDownsync) {
dst := make([]*InputFrameDownsync, 0, rb.Cnt)
if nil != mux {
mux.Lock()
defer func() {
mux.Unlock()
}()
}
prevFrameFound := true
j := stFrameId
for j < edFrameId {
tmp := rb.GetByFrameId(j)
if nil == tmp {
if false == prevFrameFound {
// The "id"s are always consecutive
break
} else {
prevFrameFound = false
continue
}
}
foo := tmp.(*InputFrameDownsync)
bar := &InputFrameDownsync{
InputFrameId: foo.InputFrameId,
InputList: make([]uint64, len(foo.InputList)),
ConfirmedList: foo.ConfirmedList,
}
for i, input := range foo.InputList {
bar.InputList[i] = input
}
dst = append(dst, bar)
j++
}
return j, dst
}

View File

@ -35,8 +35,7 @@ const (
DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START = int32(-1) DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START = int32(-1)
DOWNSYNC_MSG_ACT_BATTLE_START = int32(0) DOWNSYNC_MSG_ACT_BATTLE_START = int32(0)
DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED = int32(-98) DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED = int32(-98)
DOWNSYNC_MSG_ACT_PLAYER_READDED_AND_ACKED = int32(-97)
) )
const ( const (
@ -373,7 +372,7 @@ func (pR *Room) InputsBufferString(allDetails bool) string {
func (pR *Room) StartBattle() { func (pR *Room) StartBattle() {
if RoomBattleStateIns.WAITING != pR.State { if RoomBattleStateIns.WAITING != pR.State {
Logger.Warn("[StartBattle] Battle not started due to not being WAITING!", zap.Any("roomId", pR.Id), zap.Any("roomState", pR.State)) Logger.Debug("[StartBattle] Battle not started due to not being WAITING!", zap.Any("roomId", pR.Id), zap.Any("roomState", pR.State))
return return
} }
@ -396,16 +395,18 @@ func (pR *Room) StartBattle() {
pR.refreshColliders(spaceW, spaceH) pR.refreshColliders(spaceW, spaceH)
/** /**
* Will be triggered from a goroutine which executes the critical `Room.AddPlayerIfPossible`, thus the `battleMainLoop` should be detached. * Will be triggered from a goroutine which executes the critical `Room.AddPlayerIfPossible`, thus the `battleMainLoop` should be detached.
* All of the consecutive stages, e.g. settlement, dismissal, should share the same goroutine with `battleMainLoop`. * All of the consecutive stages, e.g. settlement, dismissal, should share the same goroutine with `battleMainLoop`.
*/ *
* As "defer" is only applicable to function scope, the use of "pR.InputsBufferLock" within "battleMainLoop" is embedded into each subroutine call.
*/
battleMainLoop := func() { battleMainLoop := func() {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
Logger.Error("battleMainLoop, recovery spot#1, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("panic", r)) Logger.Error("battleMainLoop, recovery spot#1, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("panic", r))
pR.StopBattleForSettlement() pR.StopBattleForSettlement()
} }
Logger.Info("The `battleMainLoop` is stopped for:", zap.Any("roomId", pR.Id)) Logger.Info(fmt.Sprintf("The `battleMainLoop` for roomId=%v is stopped@renderFrameId=%v, with battleDurationFrames=%v:\n%v", pR.Id, pR.RenderFrameId, pR.BattleDurationFrames, pR.InputsBufferString(false))) // This takes sometime to print
pR.onBattleStoppedForSettlement() pR.onBattleStoppedForSettlement()
}() }()
@ -413,7 +414,6 @@ func (pR *Room) StartBattle() {
Logger.Info("The `battleMainLoop` is started for:", zap.Any("roomId", pR.Id)) Logger.Info("The `battleMainLoop` is started for:", zap.Any("roomId", pR.Id))
for { for {
stCalculation := utils.UnixtimeNano() stCalculation := utils.UnixtimeNano()
elapsedNanosSinceLastFrameIdTriggered := stCalculation - pR.LastRenderFrameIdTriggeredAt elapsedNanosSinceLastFrameIdTriggered := stCalculation - pR.LastRenderFrameIdTriggeredAt
if elapsedNanosSinceLastFrameIdTriggered < pR.dilutedRollbackEstimatedDtNanos { if elapsedNanosSinceLastFrameIdTriggered < pR.dilutedRollbackEstimatedDtNanos {
@ -421,8 +421,6 @@ func (pR *Room) StartBattle() {
} }
if pR.RenderFrameId > pR.BattleDurationFrames { if pR.RenderFrameId > pR.BattleDurationFrames {
Logger.Info(fmt.Sprintf("The `battleMainLoop` for roomId=%v is stopped@renderFrameId=%v, with battleDurationFrames=%v:\n%v", pR.Id, pR.RenderFrameId, pR.BattleDurationFrames, pR.InputsBufferString(true)))
pR.StopBattleForSettlement()
return return
} }
@ -432,9 +430,9 @@ func (pR *Room) StartBattle() {
if 0 == pR.RenderFrameId { if 0 == pR.RenderFrameId {
for playerId, player := range pR.Players { for playerId, player := range pR.Players {
currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads
// [WARNING] DON'T try to send any message to an inactive player! // [WARNING] DON'T try to send any message to an inactive player!
switch currPlayerBattleState { switch thatPlayerBattleState {
case PlayerBattleStateIns.DISCONNECTED: case PlayerBattleStateIns.DISCONNECTED:
case PlayerBattleStateIns.LOST: case PlayerBattleStateIns.LOST:
continue continue
@ -448,16 +446,13 @@ func (pR *Room) StartBattle() {
upperToSendInputFrameId := pR.LastAllConfirmedInputFrameId upperToSendInputFrameId := pR.LastAllConfirmedInputFrameId
dynamicsDuration := int64(0) dynamicsDuration := int64(0)
unconfirmedMask := uint64(0) unconfirmedMask := uint64(0)
// Prefab and buffer backend inputFrameDownsync // Prefab and buffer backend inputFrameDownsync
if pR.BackendDynamicsForceConfirmationEnabled { if pR.BackendDynamicsForceConfirmationEnabled {
pR.InputsBufferLock.Lock()
defer func() {
pR.InputsBufferLock.Unlock()
}()
if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) { if pR.shouldPrefabInputFrameDownsync(pR.RenderFrameId) {
noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0) noDelayInputFrameId := pR.ConvertToInputFrameId(pR.RenderFrameId, 0)
if existingInputFrame := pR.InputsBuffer.GetByFrameId(noDelayInputFrameId); nil == existingInputFrame { if existingInputFrame := pR.InputsBuffer.GetByFrameId(noDelayInputFrameId); nil == existingInputFrame {
pR.prefabInputFrameDownsync(noDelayInputFrameId) pR.prefabInputFrameDownsync(noDelayInputFrameId, true)
} }
} }
@ -466,10 +461,6 @@ func (pR *Room) StartBattle() {
} }
if pR.BackendDynamicsEnabled { if pR.BackendDynamicsEnabled {
pR.InputsBufferLock.Lock()
defer func() {
pR.InputsBufferLock.Unlock()
}()
if 0 <= pR.LastAllConfirmedInputFrameId { if 0 <= pR.LastAllConfirmedInputFrameId {
dynamicsStartedAt := utils.UnixtimeNano() dynamicsStartedAt := utils.UnixtimeNano()
// Apply "all-confirmed inputFrames" to move forward "pR.CurDynamicsRenderFrameId" // Apply "all-confirmed inputFrames" to move forward "pR.CurDynamicsRenderFrameId"
@ -479,10 +470,9 @@ func (pR *Room) StartBattle() {
dynamicsDuration = utils.UnixtimeNano() - dynamicsStartedAt dynamicsDuration = utils.UnixtimeNano() - dynamicsStartedAt
} }
if 0 < unconfirmedMask { if 0 < unconfirmedMask {
Logger.Warn(fmt.Sprintf("roomId=%v, room.RenderFrameId=%v, room.CurDynamicsRenderFrameId=%v, room.LastAllConfirmedInputFrameId=%v, unconfirmedMask=%v", pR.Id, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, pR.LastAllConfirmedInputFrameId, unconfirmedMask))
// Otherwise no need to downsync immediately // Otherwise no need to downsync immediately
for playerId, player := range pR.Players { pR.downsyncToAllPlayers(upperToSendInputFrameId, unconfirmedMask, false)
pR.downsyncToSinglePlayer(playerId, player, upperToSendInputFrameId, unconfirmedMask)
}
} }
} }
@ -527,10 +517,12 @@ func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
atomic.StoreInt32(&(player.AckingFrameId), ackingFrameId) atomic.StoreInt32(&(player.AckingFrameId), ackingFrameId)
atomic.StoreInt32(&(player.AckingInputFrameId), ackingInputFrameId) atomic.StoreInt32(&(player.AckingInputFrameId), ackingInputFrameId)
Logger.Debug(fmt.Sprintf("InputsBufferLock about to lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock() pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("InputsBufferLock locked: roomId=%v", pR.Id))
defer func() { defer func() {
pR.InputsBufferLock.Unlock() pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("InputsBufferLock unlocked: roomId=%v", pR.Id))
}() }()
for _, inputFrameUpsync := range inputFrameUpsyncBatch { for _, inputFrameUpsync := range inputFrameUpsyncBatch {
clientInputFrameId := inputFrameUpsync.InputFrameId clientInputFrameId := inputFrameUpsync.InputFrameId
@ -545,7 +537,7 @@ func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
} }
var targetInputFrameDownsync *InputFrameDownsync = nil var targetInputFrameDownsync *InputFrameDownsync = nil
if clientInputFrameId == pR.InputsBuffer.EdFrameId { if clientInputFrameId == pR.InputsBuffer.EdFrameId {
targetInputFrameDownsync = pR.prefabInputFrameDownsync(clientInputFrameId) targetInputFrameDownsync = pR.prefabInputFrameDownsync(clientInputFrameId, false)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-Prefabbed new inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false))) Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-Prefabbed new inputFrameDownsync from inputFrameUpsync: roomId=%v, playerId=%v, clientInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, pR.InputsBufferString(false)))
} else { } else {
targetInputFrameDownsync = pR.InputsBuffer.GetByFrameId(clientInputFrameId).(*InputFrameDownsync) targetInputFrameDownsync = pR.InputsBuffer.GetByFrameId(clientInputFrameId).(*InputFrameDownsync)
@ -558,9 +550,7 @@ func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
if 0 < newAllConfirmedCount { if 0 < newAllConfirmedCount {
// Downsync new all-confirmed inputFrames asap // Downsync new all-confirmed inputFrames asap
unconfirmedMask := uint64(0) unconfirmedMask := uint64(0)
for playerId, player := range pR.Players { pR.downsyncToAllPlayers(pR.LastAllConfirmedInputFrameId, unconfirmedMask, true)
pR.downsyncToSinglePlayer(playerId, player, pR.LastAllConfirmedInputFrameId, unconfirmedMask)
}
} }
} }
@ -736,8 +726,8 @@ func (pR *Room) OnDismissed() {
pR.InputFrameUpsyncDelayTolerance = 2 pR.InputFrameUpsyncDelayTolerance = 2
pR.MaxChasingRenderFramesPerUpdate = 8 pR.MaxChasingRenderFramesPerUpdate = 8
pR.BackendDynamicsEnabled = false // [WARNING] When "false", recovery upon reconnection wouldn't work! pR.BackendDynamicsEnabled = true // [WARNING] When "false", recovery upon reconnection wouldn't work!
pR.BackendDynamicsForceConfirmationEnabled = (pR.BackendDynamicsEnabled && true) pR.BackendDynamicsForceConfirmationEnabled = (pR.BackendDynamicsEnabled && false)
punchSkillId := int32(1) punchSkillId := int32(1)
pR.MeleeSkillConfig = make(map[int32]*MeleeBullet, 0) pR.MeleeSkillConfig = make(map[int32]*MeleeBullet, 0)
pR.MeleeSkillConfig[punchSkillId] = &MeleeBullet{ pR.MeleeSkillConfig[punchSkillId] = &MeleeBullet{
@ -801,8 +791,8 @@ func (pR *Room) OnPlayerDisconnected(playerId int32) {
}() }()
if player, existent := pR.Players[playerId]; existent { if player, existent := pR.Players[playerId]; existent {
currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState))
switch currPlayerBattleState { switch thatPlayerBattleState {
case PlayerBattleStateIns.DISCONNECTED: case PlayerBattleStateIns.DISCONNECTED:
case PlayerBattleStateIns.LOST: case PlayerBattleStateIns.LOST:
case PlayerBattleStateIns.EXPELLED_DURING_GAME: case PlayerBattleStateIns.EXPELLED_DURING_GAME:
@ -899,7 +889,7 @@ func (pR *Room) onPlayerAdded(playerId int32) {
} }
pR.updateScore() pR.updateScore()
Logger.Info("onPlayerAdded:", zap.Any("playerId", playerId), zap.Any("roomId", pR.Id), zap.Any("joinIndex", pR.Players[playerId].JoinIndex), zap.Any("EffectivePlayerCount", pR.EffectivePlayerCount), zap.Any("resulted pR.JoinIndexBooleanArr", pR.JoinIndexBooleanArr), zap.Any("RoomBattleState", pR.State)) Logger.Info("onPlayerAdded:", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("playerBattleState", pR.Players[playerId].BattleState), zap.Any("joinIndex", pR.Players[playerId].JoinIndex), zap.Any("EffectivePlayerCount", pR.EffectivePlayerCount), zap.Any("resulted pR.JoinIndexBooleanArr", pR.JoinIndexBooleanArr), zap.Any("RoomBattleState", pR.State))
} }
func (pR *Room) onPlayerReAdded(playerId int32) { func (pR *Room) onPlayerReAdded(playerId int32) {
@ -920,47 +910,46 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool {
return false return false
} }
// Broadcast added or readded player info to all players in the same room Logger.Info(fmt.Sprintf("OnPlayerBattleColliderAcked-before: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, capacity=%v, EffectivePlayerCount=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, pR.Capacity, pR.EffectivePlayerCount))
for _, eachPlayer := range pR.Players { switch targetPlayer.BattleState {
/* case PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK:
[WARNING] playerAckedFrame := &RoomDownsyncFrame{
This `playerAckedFrame` is the first ever "RoomDownsyncFrame" for every "PersistentSessionClient on the frontend", and it goes right after each "BattleColliderInfo". Id: pR.RenderFrameId,
Players: toPbPlayers(pR.Players, true),
By making use of the sequential nature of each ws session, all later "RoomDownsyncFrame"s generated after `pRoom.StartBattle()` will be put behind this `playerAckedFrame`.
This function is triggered by an upsync message via WebSocket, thus downsync sending is also available by now.
*/
currPlayerBattleState := atomic.LoadInt32(&(eachPlayer.BattleState))
if PlayerBattleStateIns.DISCONNECTED == currPlayerBattleState || PlayerBattleStateIns.LOST == currPlayerBattleState {
// [WARNING] DON'T try to send any message to an inactive player!
continue
} }
switch targetPlayer.BattleState { // Broadcast normally added player info to all players in the same room
case PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK: for thatPlayerId, thatPlayer := range pR.Players {
playerAckedFrame := &RoomDownsyncFrame{ /*
Id: pR.RenderFrameId, [WARNING]
Players: toPbPlayers(pR.Players, true), This `playerAckedFrame` is the first ever "RoomDownsyncFrame" for every "PersistentSessionClient on the frontend", and it goes right after each "BattleColliderInfo".
By making use of the sequential nature of each ws session, all later "RoomDownsyncFrame"s generated after `pRoom.StartBattle()` will be put behind this `playerAckedFrame`.
This function is triggered by an upsync message via WebSocket, thus downsync sending is also available by now.
*/
thatPlayerBattleState := atomic.LoadInt32(&(thatPlayer.BattleState))
Logger.Info(fmt.Sprintf("OnPlayerBattleColliderAcked-middle: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, thatPlayerId=%v, thatPlayerBattleState=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, thatPlayer.Id, thatPlayerBattleState))
if thatPlayerId == targetPlayer.Id || (PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK == thatPlayerBattleState || PlayerBattleStateIns.ACTIVE == thatPlayerBattleState) {
Logger.Info(fmt.Sprintf("OnPlayerBattleColliderAcked-sending DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, capacity=%v, EffectivePlayerCount=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, pR.Capacity, pR.EffectivePlayerCount))
pR.sendSafely(playerAckedFrame, nil, DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED, thatPlayer.Id)
} }
pR.sendSafely(playerAckedFrame, nil, DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED, eachPlayer.Id)
case PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK:
playerAckedFrame := &RoomDownsyncFrame{
Id: pR.RenderFrameId,
Players: toPbPlayers(pR.Players, true),
}
pR.sendSafely(playerAckedFrame, nil, DOWNSYNC_MSG_ACT_PLAYER_READDED_AND_ACKED, eachPlayer.Id)
default:
} }
case PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK:
// only send resync info to the targetPlayer
// i.e. implies that "targetPlayer.LastSentInputFrameId == MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED"
pR.downsyncToSinglePlayer(playerId, targetPlayer, pR.LastAllConfirmedInputFrameId, uint64(0), false)
default:
} }
targetPlayer.BattleState = PlayerBattleStateIns.ACTIVE targetPlayer.BattleState = PlayerBattleStateIns.ACTIVE
Logger.Info(fmt.Sprintf("OnPlayerBattleColliderAcked: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, capacity=%v, EffectivePlayerCount=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, pR.Capacity, pR.EffectivePlayerCount)) Logger.Info(fmt.Sprintf("OnPlayerBattleColliderAcked-post-downsync: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, capacity=%v, EffectivePlayerCount=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, pR.Capacity, pR.EffectivePlayerCount))
if pR.Capacity == int(pR.EffectivePlayerCount) { if pR.Capacity == int(pR.EffectivePlayerCount) {
allAcked := true allAcked := true
for _, p := range pR.Players { for _, p := range pR.Players {
if PlayerBattleStateIns.ACTIVE != p.BattleState { if PlayerBattleStateIns.ACTIVE != p.BattleState {
Logger.Info("unexpectedly got an inactive player", zap.Any("roomId", pR.Id), zap.Any("playerId", p.Id), zap.Any("battleState", p.BattleState)) Logger.Warn("unexpectedly got an inactive player", zap.Any("roomId", pR.Id), zap.Any("playerId", p.Id), zap.Any("battleState", p.BattleState))
allAcked = false allAcked = false
break break
} }
@ -974,7 +963,7 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool {
return true return true
} }
func (pR *Room) sendSafely(roomDownsyncFrame *RoomDownsyncFrame, toSendFrames []*InputFrameDownsync, act int32, playerId int32) { func (pR *Room) sendSafely(roomDownsyncFrame *RoomDownsyncFrame, toSendInputFrameDownsyncs []*InputFrameDownsync, act int32, playerId int32) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
pR.PlayerSignalToCloseDict[playerId](Constants.RetCode.UnknownError, fmt.Sprintf("%v", r)) pR.PlayerSignalToCloseDict[playerId](Constants.RetCode.UnknownError, fmt.Sprintf("%v", r))
@ -985,7 +974,7 @@ func (pR *Room) sendSafely(roomDownsyncFrame *RoomDownsyncFrame, toSendFrames []
Ret: int32(Constants.RetCode.Ok), Ret: int32(Constants.RetCode.Ok),
Act: act, Act: act,
Rdf: roomDownsyncFrame, Rdf: roomDownsyncFrame,
InputFrameDownsyncBatch: toSendFrames, InputFrameDownsyncBatch: toSendInputFrameDownsyncs,
} }
theBytes, marshalErr := proto.Marshal(pResp) theBytes, marshalErr := proto.Marshal(pResp)
@ -1002,12 +991,24 @@ func (pR *Room) shouldPrefabInputFrameDownsync(renderFrameId int32) bool {
return ((renderFrameId & ((1 << pR.InputScaleFrames) - 1)) == 0) return ((renderFrameId & ((1 << pR.InputScaleFrames) - 1)) == 0)
} }
func (pR *Room) prefabInputFrameDownsync(inputFrameId int32) *InputFrameDownsync { func (pR *Room) prefabInputFrameDownsync(inputFrameId int32, lockInputsBuffer bool) *InputFrameDownsync {
/* /*
Kindly note that on backend the prefab is much simpler than its frontend counterpart, because frontend will upsync its latest command immediately if there's any change w.r.t. its own prev cmd, thus if no upsync received from a frontend, Kindly note that on backend the prefab is much simpler than its frontend counterpart, because frontend will upsync its latest command immediately if there's any change w.r.t. its own prev cmd, thus if no upsync received from a frontend,
- EITHER it's due to local lag and bad network, - EITHER it's due to local lag and bad network,
- OR there's no change w.r.t. to its prev cmd. - OR there's no change w.r.t. to its prev cmd.
*/ */
if lockInputsBuffer {
Logger.Info(fmt.Sprintf("InputsBufferLock to about lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock()
Logger.Info(fmt.Sprintf("InputsBufferLock locked: roomId=%v", pR.Id))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Info(fmt.Sprintf("InputsBufferLock unlocked: roomId=%v", pR.Id))
}()
}
var currInputFrameDownsync *InputFrameDownsync = nil var currInputFrameDownsync *InputFrameDownsync = nil
if 0 == inputFrameId && 0 == pR.InputsBuffer.Cnt { if 0 == inputFrameId && 0 == pR.InputsBuffer.Cnt {
@ -1057,7 +1058,7 @@ func (pR *Room) markConfirmationIfApplicable() int {
} }
} }
Logger.Info(fmt.Sprintf("markConfirmationIfApplicable checking inputFrameId=[%v, %v) for roomId=%v, newAllConfirmedCount=%d: InputsBuffer=%v", inputFrameId1, pR.InputsBuffer.EdFrameId, pR.Id, newAllConfirmedCount, pR.InputsBufferString(false))) Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable checking inputFrameId=[%v, %v) for roomId=%v, newAllConfirmedCount=%d: InputsBuffer=%v", inputFrameId1, pR.InputsBuffer.EdFrameId, pR.Id, newAllConfirmedCount, pR.InputsBufferString(false)))
return newAllConfirmedCount return newAllConfirmedCount
} }
@ -1079,6 +1080,15 @@ func (pR *Room) forceConfirmationIfApplicable() uint64 {
Logger.Debug(fmt.Sprintf("inputFrameId2=%v is already all-confirmed for roomId=%v[type#1], no need to force confirmation of it", inputFrameId2, pR.Id)) Logger.Debug(fmt.Sprintf("inputFrameId2=%v is already all-confirmed for roomId=%v[type#1], no need to force confirmation of it", inputFrameId2, pR.Id))
return 0 return 0
} }
Logger.Info(fmt.Sprintf("InputsBufferLock about to lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock()
Logger.Info(fmt.Sprintf("InputsBufferLock locked: roomId=%v", pR.Id))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Info(fmt.Sprintf("InputsBufferLock unlocked: roomId=%v", pR.Id))
}()
tmp := pR.InputsBuffer.GetByFrameId(inputFrameId2) tmp := pR.InputsBuffer.GetByFrameId(inputFrameId2)
if nil == tmp { if nil == tmp {
panic(fmt.Sprintf("inputFrameId2=%v doesn't exist for roomId=%v, this is abnormal because the server should prefab inputFrameDownsync in a most advanced pace, check the prefab logic! InputsBuffer=%v", inputFrameId2, pR.Id, pR.InputsBufferString(false))) panic(fmt.Sprintf("inputFrameId2=%v doesn't exist for roomId=%v, this is abnormal because the server should prefab inputFrameDownsync in a most advanced pace, check the prefab logic! InputsBuffer=%v", inputFrameId2, pR.Id, pR.InputsBufferString(false)))
@ -1106,6 +1116,14 @@ func (pR *Room) applyInputFrameDownsyncDynamics(fromRenderFrameId int32, toRende
totPlayerCnt := uint32(pR.Capacity) totPlayerCnt := uint32(pR.Capacity)
allConfirmedMask := uint64((1 << totPlayerCnt) - 1) allConfirmedMask := uint64((1 << totPlayerCnt) - 1)
Logger.Debug(fmt.Sprintf("applyInputFrameDownsyncDynamics-InputsBufferLock about to lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("applyInputFrameDownsyncDynamics-InputsBufferLock locked: roomId=%v", pR.Id))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("applyInputFrameDownsyncDynamics-InputsBufferLock unlocked: roomId=%v", pR.Id))
}()
for collisionSysRenderFrameId := fromRenderFrameId; collisionSysRenderFrameId < toRenderFrameId; collisionSysRenderFrameId++ { for collisionSysRenderFrameId := fromRenderFrameId; collisionSysRenderFrameId < toRenderFrameId; collisionSysRenderFrameId++ {
currRenderFrameTmp := pR.RenderFrameBuffer.GetByFrameId(collisionSysRenderFrameId) currRenderFrameTmp := pR.RenderFrameBuffer.GetByFrameId(collisionSysRenderFrameId)
if nil == currRenderFrameTmp { if nil == currRenderFrameTmp {
@ -1407,7 +1425,13 @@ func (pR *Room) printBarrier(barrierCollider *resolv.Object) {
Logger.Info(fmt.Sprintf("Barrier in roomId=%v: w=%v, h=%v, shape=%v", pR.Id, barrierCollider.W, barrierCollider.H, barrierCollider.Shape)) Logger.Info(fmt.Sprintf("Barrier in roomId=%v: w=%v, h=%v, shape=%v", pR.Id, barrierCollider.W, barrierCollider.H, barrierCollider.Shape))
} }
func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSendInputFrameId int32, unconfirmedMask uint64) { func (pR *Room) downsyncToAllPlayers(upperToSendInputFrameId int32, unconfirmedMask uint64, prohibitsInputsBufferLock bool) {
for playerId, player := range pR.Players {
pR.downsyncToSinglePlayer(playerId, player, pR.LastAllConfirmedInputFrameId, unconfirmedMask, prohibitsInputsBufferLock)
}
}
func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSendInputFrameId int32, unconfirmedMask uint64, prohibitsInputsBufferLock bool) {
currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads currPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) // Might be changed in "OnPlayerDisconnected/OnPlayerLost" from other threads
// [WARNING] DON'T try to send any message to an inactive player! // [WARNING] DON'T try to send any message to an inactive player!
@ -1417,16 +1441,8 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe
return return
} }
if 0 == pR.RenderFrameId {
kickoffFrame := pR.RenderFrameBuffer.GetByFrameId(0).(*RoomDownsyncFrame)
pR.sendSafely(kickoffFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_START, playerId)
return
}
// [WARNING] Websocket is TCP-based, thus no need to re-send a previously sent inputFrame to a same player! // [WARNING] Websocket is TCP-based, thus no need to re-send a previously sent inputFrame to a same player!
toSendInputFrames := make([]*InputFrameDownsync, 0, pR.InputsBuffer.Cnt) lowerToSentInputFrameId := player.LastSentInputFrameId + 1
j := player.LastSentInputFrameId + 1
/* /*
[WARNING] [WARNING]
Upon resynced on frontend, "refRenderFrameId" MUST BE CAPPED somehow by "upperToSendInputFrameId", if frontend resyncs itself to a more advanced value than given below, upon the next renderFrame tick on the frontend it might generate non-consecutive "nextInputFrameId > frontend.recentInputCache.edFrameId+1". Upon resynced on frontend, "refRenderFrameId" MUST BE CAPPED somehow by "upperToSendInputFrameId", if frontend resyncs itself to a more advanced value than given below, upon the next renderFrame tick on the frontend it might generate non-consecutive "nextInputFrameId > frontend.recentInputCache.edFrameId+1".
@ -1442,27 +1458,26 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe
if MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED == player.LastSentInputFrameId { if MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED == player.LastSentInputFrameId {
// A rejoined player, should guarantee that when it resyncs to "refRenderFrameId" a matching inputFrame to apply exists // A rejoined player, should guarantee that when it resyncs to "refRenderFrameId" a matching inputFrame to apply exists
j = pR.ConvertToInputFrameId(refRenderFrameId, pR.InputDelayFrames) lowerToSentInputFrameId = pR.ConvertToInputFrameId(refRenderFrameId, pR.InputDelayFrames)
Logger.Warn(fmt.Sprintf("Resetting refRenderFrame for rejoined player: roomId=%v, playerId=%v, refRenderFrameId=%v, j=%v, upperToSendInputFrameId=%v", pR.Id, playerId, refRenderFrameId, j, upperToSendInputFrameId)) Logger.Warn(fmt.Sprintf("Resetting refRenderFrame for rejoined player: roomId=%v, renderFrameId=%v, playerId=%v, refRenderFrameId=%v, lowerToSentInputFrameId=%v, upperToSendInputFrameId=%v", pR.Id, pR.RenderFrameId, playerId, refRenderFrameId, lowerToSentInputFrameId, upperToSendInputFrameId))
}
if lowerToSentInputFrameId > upperToSendInputFrameId {
Logger.Warn(fmt.Sprintf("Not sending due to potentially empty toSendInputFrameDownsyncs: roomId=%v, playerId=%v, refRenderFrameId=%v, lowerToSentInputFrameId=%v, upperToSendInputFrameId=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, refRenderFrameId, lowerToSentInputFrameId, upperToSendInputFrameId, player.LastSentInputFrameId, player.AckingInputFrameId))
return
} }
// [WARNING] EDGE CASE HERE: Upon initialization, all of "lastAllConfirmedInputFrameId", "lastAllConfirmedInputFrameIdWithChange" and "anchorInputFrameId" are "-1", thus "j" starts with "0", however "inputFrameId: 0" might not have been all confirmed! // [WARNING] EDGE CASE HERE: Upon initialization, all of "lastAllConfirmedInputFrameId", "lastAllConfirmedInputFrameIdWithChange" and "anchorInputFrameId" are "-1", thus "j" starts with "0", however "inputFrameId: 0" might not have been all confirmed!
for j <= upperToSendInputFrameId { var theInputsBufferLockToUse *sync.Mutex = &pR.InputsBufferLock
tmp := pR.InputsBuffer.GetByFrameId(j) if prohibitsInputsBufferLock {
if nil == tmp { // Already locked in caller function
Logger.Warn(fmt.Sprintf("Required inputFrameId=%v for roomId=%v, playerId=%v doesn't exist! InputsBuffer=%v", j, pR.Id, playerId, pR.InputsBufferString(false))) theInputsBufferLockToUse = nil
continue
}
f := tmp.(*InputFrameDownsync)
if pR.inputFrameIdDebuggable(j) {
Logger.Debug("inputFrame lifecycle#3[sending]:", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("playerAckingInputFrameId", player.AckingInputFrameId), zap.Any("inputFrameId", j), zap.Any("inputFrameId-doublecheck", f.InputFrameId), zap.Any("InputsBuffer", pR.InputsBufferString(false)), zap.Any("ConfirmedList", f.ConfirmedList))
}
toSendInputFrames = append(toSendInputFrames, f)
j++
} }
// [WARNING] Clone to deliberately avoid holding "pR.InputsBufferLock" while using network I/O
j, toSendInputFrameDownsyncs := pR.InputsBuffer.cloneInputFrameDownsyncsByFrameIdRange(lowerToSentInputFrameId, upperToSendInputFrameId+1, theInputsBufferLockToUse)
if 0 >= len(toSendInputFrames) { if 0 >= len(toSendInputFrameDownsyncs) {
Logger.Debug(fmt.Sprintf("Not sending due to empty toSendInputFrames: roomId=%v, playerId=%v, refRenderFrameId=%v, upperToSendInputFrameId=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, refRenderFrameId, upperToSendInputFrameId, player.LastSentInputFrameId, player.AckingInputFrameId)) Logger.Debug(fmt.Sprintf("Not sending due to actually empty toSendInputFrameDownsyncs: roomId=%v, playerId=%v, refRenderFrameId=%v, lowerToSentInputFrameId=%v, upperToSendInputFrameId=%v, j=%v, lastSentInputFrameId=%v, playerAckingInputFrameId=%v", pR.Id, playerId, refRenderFrameId, lowerToSentInputFrameId, upperToSendInputFrameId, j, player.LastSentInputFrameId, player.AckingInputFrameId))
return return
} }
@ -1477,13 +1492,15 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, upperToSe
if pR.BackendDynamicsEnabled && (shouldResync1 || shouldResync2) { if pR.BackendDynamicsEnabled && (shouldResync1 || shouldResync2) {
tmp := pR.RenderFrameBuffer.GetByFrameId(refRenderFrameId) tmp := pR.RenderFrameBuffer.GetByFrameId(refRenderFrameId)
if nil == tmp { if nil == tmp {
panic(fmt.Sprintf("Required refRenderFrameId=%v for roomId=%v, playerId=%v, j=%v doesn't exist! InputsBuffer=%v, RenderFrameBuffer=%v", refRenderFrameId, pR.Id, playerId, j, pR.InputsBufferString(false), pR.RenderFrameBufferString())) panic(fmt.Sprintf("Required refRenderFrameId=%v for roomId=%v, renderFrameId=%v, playerId=%v, j=%v doesn't exist! InputsBuffer=%v, RenderFrameBuffer=%v", refRenderFrameId, pR.Id, pR.RenderFrameId, playerId, j, pR.InputsBufferString(false), pR.RenderFrameBufferString()))
} }
Logger.Warn(fmt.Sprintf("Sending refRenderFrameId=%v for roomId=%v, renderFrameId=%v, playerId=%v, j=%v", refRenderFrameId, pR.Id, pR.RenderFrameId, playerId, j))
refRenderFrame := tmp.(*RoomDownsyncFrame) refRenderFrame := tmp.(*RoomDownsyncFrame)
refRenderFrame.BackendUnconfirmedMask = unconfirmedMask refRenderFrame.BackendUnconfirmedMask = unconfirmedMask
pR.sendSafely(refRenderFrame, toSendInputFrames, DOWNSYNC_MSG_ACT_FORCED_RESYNC, playerId) pR.sendSafely(refRenderFrame, toSendInputFrameDownsyncs, DOWNSYNC_MSG_ACT_FORCED_RESYNC, playerId)
} else { } else {
pR.sendSafely(nil, toSendInputFrames, DOWNSYNC_MSG_ACT_INPUT_BATCH, playerId) pR.sendSafely(nil, toSendInputFrameDownsyncs, DOWNSYNC_MSG_ACT_INPUT_BATCH, playerId)
} }
player.LastSentInputFrameId = j - 1 player.LastSentInputFrameId = j - 1
} }

View File

@ -440,7 +440,7 @@
"array": [ "array": [
0, 0,
0, 0,
216.50635094610968, 216.84703350462206,
0, 0,
0, 0,
0, 0,

View File

@ -124,11 +124,8 @@ cc.Class({
self.recentInputCache.pop(); self.recentInputCache.pop();
} }
const [ret, oldStFrameId, oldEdFrameId] = self.recentInputCache.setByFrameId(inputFrameDownsync, inputFrameDownsync.inputFrameId); const [ret, oldStFrameId, oldEdFrameId] = self.recentInputCache.setByFrameId(inputFrameDownsync, inputFrameDownsync.inputFrameId);
if (window.RING_BUFF_NON_CONSECUTIVE_SET == ret) {
throw `Failed to dump input cache#1! inputFrameDownsync.inputFrameId=${inputFrameDownsync.inputFrameId}, lastAllConfirmedRenderFrameId=${self.lastAllConfirmedRenderFrameId}, lastAllConfirmedInputFrameId=${self.lastAllConfirmedInputFrameId}; recentRenderCache=${self._stringifyRecentRenderCache(false)}, recentInputCache=${self._stringifyRecentInputCache(false)}`;
}
if (window.RING_BUFF_FAILED_TO_SET == ret) { if (window.RING_BUFF_FAILED_TO_SET == ret) {
throw `Failed to dump input cache#2 (maybe recentInputCache too small)! inputFrameDownsync.inputFrameId=${inputFrameDownsync.inputFrameId}, lastAllConfirmedRenderFrameId=${self.lastAllConfirmedRenderFrameId}, lastAllConfirmedInputFrameId=${self.lastAllConfirmedInputFrameId}; recentRenderCache=${self._stringifyRecentRenderCache(false)}, recentInputCache=${self._stringifyRecentInputCache(false)}`; throw `Failed to dump input cache (maybe recentInputCache too small)! inputFrameDownsync.inputFrameId=${inputFrameDownsync.inputFrameId}, lastAllConfirmedRenderFrameId=${self.lastAllConfirmedRenderFrameId}, lastAllConfirmedInputFrameId=${self.lastAllConfirmedInputFrameId}; recentRenderCache=${self._stringifyRecentRenderCache(false)}, recentInputCache=${self._stringifyRecentInputCache(false)}`;
} }
return ret; return ret;
}, },

View File

@ -74,12 +74,12 @@ RingBuffer.prototype.getByFrameId = function(frameId) {
RingBuffer.prototype.setByFrameId = function(item, frameId) { RingBuffer.prototype.setByFrameId = function(item, frameId) {
const oldStFrameId = this.stFrameId, const oldStFrameId = this.stFrameId,
oldEdFrameId = this.edFrameId; oldEdFrameId = this.edFrameId;
if (frameId < this.stFrameId) { if (frameId < oldStFrameId) {
return [window.RING_BUFF_FAILED_TO_SET, oldStFrameId, oldEdFrameId]; return [window.RING_BUFF_FAILED_TO_SET, oldStFrameId, oldEdFrameId];
} }
// By now "this.stFrameId <= frameId" // By now "this.stFrameId <= frameId"
if (this.edFrameId > frameId) { if (oldEdFrameId > frameId) {
const arrIdx = this.getArrIdxByOffset(frameId - this.stFrameId); const arrIdx = this.getArrIdxByOffset(frameId - this.stFrameId);
if (null != arrIdx) { if (null != arrIdx) {
this.eles[arrIdx] = item; this.eles[arrIdx] = item;
@ -89,16 +89,16 @@ RingBuffer.prototype.setByFrameId = function(item, frameId) {
// By now "this.edFrameId <= frameId" // By now "this.edFrameId <= frameId"
let ret = window.RING_BUFF_CONSECUTIVE_SET; let ret = window.RING_BUFF_CONSECUTIVE_SET;
if (this.edFrameId < frameId) { if (oldEdFrameId < frameId) {
this.st = this.ed = 0; this.st = this.ed = 0;
this.stFrameId = this.edFrameId = frameId; this.stFrameId = this.edFrameId = frameId;
this.cnt = 0; this.cnt = 0;
ret = window.RING_BUFF_NON_CONSECUTIVE_SET; ret = window.RING_BUFF_NON_CONSECUTIVE_SET;
} else {
// this.edFrameId == frameId
this.put(item);
} }
// By now "this.edFrameId == frameId"
this.put(item);
return [ret, oldStFrameId, oldEdFrameId]; return [ret, oldStFrameId, oldEdFrameId];
}; };

View File

@ -5,7 +5,6 @@ window.UPSYNC_MSG_ACT_PLAYER_CMD = 2;
window.UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK = 3; window.UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK = 3;
window.DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED = -98; window.DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED = -98;
window.DOWNSYNC_MSG_ACT_PLAYER_READDED_AND_ACKED = -97;
window.DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START = -1; window.DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START = -1;
window.DOWNSYNC_MSG_ACT_BATTLE_START = 0; window.DOWNSYNC_MSG_ACT_BATTLE_START = 0;
window.DOWNSYNC_MSG_ACT_HB_REQ = 1; window.DOWNSYNC_MSG_ACT_HB_REQ = 1;
@ -154,10 +153,6 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) {
case window.DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED: case window.DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED:
mapIns.onPlayerAdded(resp.rdf); mapIns.onPlayerAdded(resp.rdf);
break; break;
case window.DOWNSYNC_MSG_ACT_PLAYER_READDED_AND_ACKED:
// Deliberately left blank for now
mapIns.hideFindingPlayersGUI(resp.rdf);
break;
case window.DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START: case window.DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START:
mapIns.onBattleReadyToStart(resp.rdf); mapIns.onBattleReadyToStart(resp.rdf);
break; break;
@ -176,6 +171,7 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) {
${JSON.stringify(resp, null, 2)}`); ${JSON.stringify(resp, null, 2)}`);
return; return;
} }
mapIns.hideFindingPlayersGUI(resp.rdf);
const inputFrameIdConsecutive = (resp.inputFrameDownsyncBatch[0].inputFrameId == mapIns.lastAllConfirmedInputFrameId + 1); const inputFrameIdConsecutive = (resp.inputFrameDownsyncBatch[0].inputFrameId == mapIns.lastAllConfirmedInputFrameId + 1);
// The following order of execution is important // The following order of execution is important
mapIns.onRoomDownsyncFrame(resp.rdf); mapIns.onRoomDownsyncFrame(resp.rdf);