Fixes for downsync order preservation.
@@ -147,7 +147,7 @@ type Room struct {
* Moreover, during the invocation of `PlayerSignalToCloseDict`, the `Player` instance is supposed to be deallocated (though not synchronously).
*/
PlayerDownsyncSessionDict map[int32]*websocket.Conn
PlayerDownsyncSessionLock map[int32]*sync.Mutex // Guards [PlayerDownsyncSessionDict, each player.LastSentInputFrameId]
PlayerDownsyncChanDict map[int32](chan InputsBufferSnapshot)
PlayerSignalToCloseDict map[int32]SignalToCloseConnCbType
Score float32
State int32
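The "Guards [...]" comment above pairs each player's ws session with a dedicated mutex and with `player.LastSentInputFrameId`. A minimal standalone sketch of that per-player guard; the `playerSlot` type and `send` helper are illustrative names, not the repo's:

```go
package sketch

import (
	"sync"

	"github.com/gorilla/websocket"
)

// playerSlot is an illustrative stand-in for one player's entries in the maps
// above: the mutex serializes ws writes and updates to lastSentInputFrameId.
type playerSlot struct {
	mu                   sync.Mutex
	conn                 *websocket.Conn
	lastSentInputFrameId int32
}

func (s *playerSlot) send(payload []byte, newLastSent int32) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.conn.WriteMessage(websocket.BinaryMessage, payload); err != nil {
		return err
	}
	s.lastSentInputFrameId = newLastSent
	return nil
}
```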
@@ -201,7 +201,6 @@ func (pR *Room) AddPlayerIfPossible(pPlayerFromDbInit *Player, session *websocke

pR.Players[playerId] = pPlayerFromDbInit
pR.PlayerDownsyncSessionDict[playerId] = session
pR.PlayerDownsyncSessionLock[playerId] = &sync.Mutex{}
pR.PlayerSignalToCloseDict[playerId] = signalToCloseConnOfThisPlayer
return true
}
@@ -232,7 +231,6 @@ func (pR *Room) ReAddPlayerIfPossible(pTmpPlayerInstance *Player, session *webso
pEffectiveInRoomPlayerInstance.ColliderRadius = DEFAULT_PLAYER_RADIUS // Hardcoded

pR.PlayerDownsyncSessionDict[playerId] = session
pR.PlayerDownsyncSessionLock[playerId] = &sync.Mutex{}
pR.PlayerSignalToCloseDict[playerId] = signalToCloseConnOfThisPlayer

Logger.Warn("ReAddPlayerIfPossible finished.", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("joinIndex", pEffectiveInRoomPlayerInstance.JoinIndex), zap.Any("playerBattleState", pEffectiveInRoomPlayerInstance.BattleState), zap.Any("roomState", pR.State), zap.Any("roomEffectivePlayerCount", pR.EffectivePlayerCount), zap.Any("AckingFrameId", pEffectiveInRoomPlayerInstance.AckingFrameId), zap.Any("AckingInputFrameId", pEffectiveInRoomPlayerInstance.AckingInputFrameId), zap.Any("LastSentInputFrameId", pEffectiveInRoomPlayerInstance.LastSentInputFrameId))
@@ -466,11 +464,7 @@ func (pR *Room) StartBattle() {

// Prefab and buffer backend inputFrameDownsync
if pR.BackendDynamicsEnabled {
unconfirmedMask, refRenderFrameId, inputsBufferSnapshot := pR.doBattleMainLoopPerTickBackendDynamicsWithProperLocking(prevRenderFrameId, &dynamicsDuration)
if 0 < unconfirmedMask {
Logger.Warn(fmt.Sprintf("roomId=%v, room.RenderFrameId=%v, room.CurDynamicsRenderFrameId=%v, room.LastAllConfirmedInputFrameId=%v, unconfirmedMask=%v", pR.Id, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, pR.LastAllConfirmedInputFrameId, unconfirmedMask))
pR.downsyncToAllPlayers(refRenderFrameId, unconfirmedMask, inputsBufferSnapshot)
}
pR.doBattleMainLoopPerTickBackendDynamicsWithProperLocking(prevRenderFrameId, &dynamicsDuration)
}

pR.LastRenderFrameIdTriggeredAt = utils.UnixtimeNano()
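This hunk sits inside the per-tick battle loop: each render tick now simply calls doBattleMainLoopPerTickBackendDynamicsWithProperLocking and lets that function decide whether anything needs downsyncing. A hedged sketch of a fixed-rate tick loop of that general shape (runTicks and its parameters are illustrative, not the repo's battleMainLoop):

```go
package sketch

import "time"

// runTicks drives perTick at a fixed rate until stop is closed; in the real
// battleMainLoop, perTick would be the "per tick backend dynamics" call above.
func runTicks(tickHz int, stop <-chan struct{}, perTick func()) {
	ticker := time.NewTicker(time.Second / time.Duration(tickHz))
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			perTick()
		}
	}
}
```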
@@ -486,6 +480,46 @@ func (pR *Room) StartBattle() {
}
}

downsyncLoop := func(playerId int32, player *Player, playerDownsyncChan chan InputsBufferSnapshot) {
defer func() {
if r := recover(); r != nil {
Logger.Error("downsyncLoop, recovery spot#1, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("panic", r))
}
Logger.Info(fmt.Sprintf("The `downsyncLoop` for (roomId=%v, playerId=%v) is stopped@renderFrameId=%v", pR.Id, playerId, pR.RenderFrameId))
}()

Logger.Debug(fmt.Sprintf("Started downsyncLoop for (roomId: %d, playerId:%d, playerDownsyncChan:%p)", pR.Id, playerId, playerDownsyncChan))

for {
select {
case inputsBufferSnapshot := <-playerDownsyncChan:
nowBattleState := atomic.LoadInt32(&pR.State)
switch nowBattleState {
case RoomBattleStateIns.IDLE:
case RoomBattleStateIns.STOPPING_BATTLE_FOR_SETTLEMENT:
case RoomBattleStateIns.IN_SETTLEMENT:
case RoomBattleStateIns.IN_DISMISSAL:
Logger.Warn(fmt.Sprintf("Battle is not waiting/preparing/active for playerDownsyncChan for (roomId: %d, playerId:%d)", pR.Id, playerId))
return
}

pR.downsyncToSinglePlayer(playerId, player, inputsBufferSnapshot.RefRenderFrameId, inputsBufferSnapshot.UnconfirmedMask, inputsBufferSnapshot.ToSendInputFrameDownsyncs)
Logger.Debug(fmt.Sprintf("Sent inputsBufferSnapshot(refRenderFrameId:%d, unconfirmedMask:%v) for (roomId: %d, playerId:%d)#2", inputsBufferSnapshot.RefRenderFrameId, inputsBufferSnapshot.UnconfirmedMask, pR.Id, playerId))
default:
}
}
}

for playerId, player := range pR.Players {
/*
Always instantiates a new channel and lets the old one die out due to not being retained by any root reference.

Each "playerDownsyncChan" stays alive throughout the lifecycle of the room rather than that of each "playerDownsyncSession", i.e. it is not closed or dereferenced upon disconnection.
*/
pR.PlayerDownsyncChanDict[playerId] = make(chan InputsBufferSnapshot, pR.InputsBuffer.N)
go downsyncLoop(playerId, player, pR.PlayerDownsyncChanDict[playerId])
}

pR.onBattlePrepare(func() {
pR.onBattleStarted() // NOTE: Deliberately not using `defer`.
go battleMainLoop()
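The per-player buffered channel plus dedicated downsyncLoop goroutine is what this commit uses to preserve downsync order: snapshots are enqueued in generation order, and a single consumer per player drains them FIFO, so slow ws writes never happen on the producer's side. A simplified, self-contained sketch of that shape (the snapshot type and function names here are illustrative, not the repo's):

```go
package sketch

import "sync"

type snapshot struct {
	refRenderFrameId int32
	unconfirmedMask  uint64
}

// startConsumer drains one player's channel; channel receive order always
// matches send order, which is what preserves downsync order per player.
func startConsumer(playerId int32, ch <-chan snapshot, send func(int32, snapshot)) {
	go func() {
		for s := range ch {
			send(playerId, s)
		}
	}()
}

// broadcast enqueues the same snapshot for every player while a lock is held,
// analogous to calling downsyncToAllPlayers under "InputsBufferLock" above.
func broadcast(mu *sync.Mutex, chans map[int32]chan snapshot, s snapshot) {
	mu.Lock()
	defer mu.Unlock()
	for _, ch := range chans {
		ch <- s // buffered in the real code (capacity InputsBuffer.N)
	}
}
```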
@@ -497,7 +531,18 @@ func (pR *Room) toDiscreteInputsBufferIndex(inputFrameId int32, joinIndex int32)
}

func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
// [WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads!
/*
[WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads!

That said, "markConfirmationIfApplicable" will still work as expected. Here's an example of weird call orders.
---------------------------------------------------
now lastAllConfirmedInputFrameId: 42; each "()" below indicates a "Lock/Unlock cycle of InputsBufferLock", and "x" indicates no new all-confirmed snapshot is created
A: ([44,50],x) ([49,54],snapshot=[51,53])
B: ([54,58],x)
C: ([42,53],snapshot=[43,50])
D: ([51,55],x)
---------------------------------------------------
*/
// TODO: Put a rate limiter on this function!
if swapped := atomic.CompareAndSwapInt32(&pR.State, RoomBattleStateIns.IN_BATTLE, RoomBattleStateIns.IN_BATTLE); !swapped {
return
@@ -518,15 +563,17 @@ func (pR *Room) OnBattleCmdReceived(pReq *WsReq) {
atomic.StoreInt32(&(player.AckingFrameId), ackingFrameId)
atomic.StoreInt32(&(player.AckingInputFrameId), ackingInputFrameId)

newAllConfirmedCount, refRenderFrameIdIfNeeded, inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player)
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-InputsBufferLock about to lock: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-InputsBufferLock locked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-InputsBufferLock unlocked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
}()

if swapped := atomic.CompareAndSwapInt32(&pR.State, RoomBattleStateIns.IN_BATTLE, RoomBattleStateIns.IN_BATTLE); !swapped {
return
}
// [WARNING] By now "pR.InputsBufferLock" is unlocked
if 0 < newAllConfirmedCount {
// Downsync new all-confirmed inputFrames asap
pR.downsyncToAllPlayers(refRenderFrameIdIfNeeded, uint64(0), inputsBufferSnapshot)
inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player)
if nil != inputsBufferSnapshot {
pR.downsyncToAllPlayers(inputsBufferSnapshot)
}
}
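One detail worth calling out from the hunk above: atomic.CompareAndSwapInt32 with identical old and new values is used as an atomic equality check on the room state; it returns true only if the state is currently IN_BATTLE and never writes a different value. A tiny runnable illustration with a made-up state constant:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const stateInBattle int32 = 2 // illustrative constant, not the repo's value

// isInBattle "swaps" stateInBattle for stateInBattle, so it only reports
// whether the current value equals stateInBattle; the stored value is unchanged.
func isInBattle(state *int32) bool {
	return atomic.CompareAndSwapInt32(state, stateInBattle, stateInBattle)
}

func main() {
	s := stateInBattle
	fmt.Println(isInBattle(&s)) // true
	s = 0
	fmt.Println(isInBattle(&s)) // false
}
```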
@@ -677,7 +724,10 @@ func (pR *Room) OnDismissed() {
pR.PlayersArr = make([]*Player, pR.Capacity)
pR.CollisionSysMap = make(map[int32]*resolv.Object)
pR.PlayerDownsyncSessionDict = make(map[int32]*websocket.Conn)
pR.PlayerDownsyncSessionLock = make(map[int32]*sync.Mutex, pR.Capacity)
for _, oldChan := range pR.PlayerDownsyncChanDict {
close(oldChan)
}
pR.PlayerDownsyncChanDict = make(map[int32](chan InputsBufferSnapshot))
pR.PlayerSignalToCloseDict = make(map[int32]SignalToCloseConnCbType)
pR.JoinIndexBooleanArr = make([]bool, pR.Capacity)
pR.Barriers = make(map[int32]*Barrier)
@@ -739,7 +789,7 @@ func (pR *Room) OnDismissed() {
pR.State = RoomBattleStateIns.IDLE
pR.updateScore()

Logger.Info("The room is completely dismissed:", zap.Any("roomId", pR.Id))
Logger.Info("The room is completely dismissed(all playerDownsyncChan closed):", zap.Any("roomId", pR.Id))
}

func (pR *Room) expelPlayerDuringGame(playerId int32) {
@@ -830,7 +880,7 @@ func (pR *Room) onPlayerLost(playerId int32) {
func (pR *Room) clearPlayerNetworkSession(playerId int32) {
if _, y := pR.PlayerDownsyncSessionDict[playerId]; y {
Logger.Debug("clearPlayerNetworkSession:", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId))
// [WARNING] No need to delete "pR.PlayerDownsyncSessionLock[playerId]" here!
// [WARNING] No need to close "pR.PlayerDownsyncChanDict[playerId]" immediately!
delete(pR.PlayerDownsyncSessionDict, playerId)
delete(pR.PlayerSignalToCloseDict, playerId)
}
@@ -952,20 +1002,12 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool {
}

func (pR *Room) sendSafely(roomDownsyncFrame *RoomDownsyncFrame, toSendInputFrameDownsyncs []*InputFrameDownsync, act int32, playerId int32, needLockExplicitly bool) {
// [WARNING] This function MUST BE called while "pR.PlayerDownsyncSessionLock[player.JoinIndex-1]" is locked!
defer func() {
if r := recover(); r != nil {
Logger.Error("sendSafely, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("panic", r))
}
}()

if needLockExplicitly {
pR.PlayerDownsyncSessionLock[playerId].Lock()
defer func() {
pR.PlayerDownsyncSessionLock[playerId].Unlock()
}()
}

if playerDownsyncSession, existent := pR.PlayerDownsyncSessionDict[playerId]; existent {
pResp := &WsResp{
Ret: int32(Constants.RetCode.Ok),
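sendSafely combines two defensive layers: a deferred recover so a panicking write (e.g. against a concurrently closed session) cannot kill the calling goroutine, and a caller-held per-player mutex, since Gorilla's ws writer is not safe for concurrent use. A stripped-down sketch of that shape (function signature and payload handling are illustrative, not the repo's):

```go
package sketch

import (
	"log"
	"sync"

	"github.com/gorilla/websocket"
)

// sendSafely serializes the write via mu and swallows a panic from a session
// that is being torn down concurrently, logging instead of crashing.
func sendSafely(mu *sync.Mutex, conn *websocket.Conn, payload []byte) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("sendSafely recovered: %v", r)
		}
	}()
	mu.Lock()
	defer mu.Unlock()
	if err := conn.WriteMessage(websocket.BinaryMessage, payload); err != nil {
		log.Printf("sendSafely write error: %v", err)
	}
}
```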
@@ -996,49 +1038,43 @@ func (pR *Room) shouldPrefabInputFrameDownsync(prevRenderFrameId int32, renderFr

func (pR *Room) prefabInputFrameDownsync(inputFrameId int32) *InputFrameDownsync {
/*
[WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked.

Kindly note that on the backend the prefab is much simpler than its frontend counterpart, because the frontend will upsync its latest command immediately if there's any change w.r.t. its own prev cmd; thus if no upsync is received from a frontend,
- EITHER it's due to local lag or a bad network,
- OR there's no change w.r.t. its prev cmd.
*/

var currInputFrameDownsync *InputFrameDownsync = nil
tmp1 := pR.InputsBuffer.GetByFrameId(inputFrameId) // Would be nil if "pR.InputsBuffer.EdFrameId <= inputFrameId", else if "pR.InputsBuffer.EdFrameId > inputFrameId" is already met, then by now we can just return "tmp1.(*InputFrameDownsync)"
if nil == tmp1 {
for pR.InputsBuffer.EdFrameId <= inputFrameId {
j := pR.InputsBuffer.EdFrameId
currInputFrameDownsync = &InputFrameDownsync{
InputFrameId: j,
InputList: make([]uint64, pR.Capacity),
ConfirmedList: uint64(0),
}

if 0 == inputFrameId && 0 == pR.InputsBuffer.Cnt {
currInputFrameDownsync = &InputFrameDownsync{
InputFrameId: 0,
InputList: make([]uint64, pR.Capacity),
ConfirmedList: uint64(0),
tmp2 := pR.InputsBuffer.GetByFrameId(j - 1) // There's no need for the backend to find the "lastAllConfirmed inputs" for prefabbing, whether "BackendDynamicsEnabled" is true or false
if nil != tmp2 {
prevInputFrameDownsync := tmp2.(*InputFrameDownsync)
for i, _ := range currInputFrameDownsync.InputList {
currInputFrameDownsync.InputList[i] = (prevInputFrameDownsync.InputList[i] & uint64(15)) // Don't predict attack input!
}
}

pR.InputsBuffer.Put(currInputFrameDownsync)
}
} else {
tmp := pR.InputsBuffer.GetByFrameId(inputFrameId - 1) // There's no need for the backend to find the "lastAllConfirmed inputs" for prefabbing, whether "BackendDynamicsEnabled" is true or false
if nil == tmp {
panic(fmt.Sprintf("Error prefabbing inputFrameDownsync: roomId=%v, InputsBuffer=%v", pR.Id, pR.InputsBufferString(false)))
}
prevInputFrameDownsync := tmp.(*InputFrameDownsync)
currInputList := make([]uint64, pR.Capacity) // Would be a clone of the values
for i, _ := range currInputList {
currInputList[i] = (prevInputFrameDownsync.InputList[i] & uint64(15)) // Don't predict attack input!
}
currInputFrameDownsync = &InputFrameDownsync{
InputFrameId: inputFrameId,
InputList: currInputList,
ConfirmedList: uint64(0),
}
currInputFrameDownsync = tmp1.(*InputFrameDownsync)
}

pR.InputsBuffer.Put(currInputFrameDownsync)
return currInputFrameDownsync
}

func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*InputFrameUpsync, playerId int32, player *Player) (int32, int32, []*InputFrameDownsync) {
Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable-InputsBufferLock about to lock: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable-InputsBufferLock locked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
defer func() {
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable-InputsBufferLock unlocked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
}()

func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*InputFrameUpsync, playerId int32, player *Player) *InputsBufferSnapshot {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
for _, inputFrameUpsync := range inputFrameUpsyncBatch {
clientInputFrameId := inputFrameUpsync.InputFrameId
if clientInputFrameId < pR.InputsBuffer.StFrameId {
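The prefab above carries the previous frame's inputs forward but masks them with uint64(15), i.e. it predicts only the low 4 bits and never the attack bits (per the "Don't predict attack input!" comment). A standalone restatement of just that masking step, assuming, as the comment implies, that the low 4 bits encode movement:

```go
package sketch

const movementBitsMask uint64 = 15 // 0b1111, the mask used by the prefab above

// prefabFromPrev predicts the next input frame from the previous one: movement
// bits are carried forward, attack/other high bits are dropped.
func prefabFromPrev(prev []uint64) []uint64 {
	curr := make([]uint64, len(prev))
	for i, in := range prev {
		curr[i] = in & movementBitsMask
	}
	return curr
}
```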
@@ -1102,44 +1138,36 @@ func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*InputFrame
}
}

refRenderFrameIdIfNeeded := pR.CurDynamicsRenderFrameId - 1
var inputsBufferSnapshot []*InputFrameDownsync = nil
if 0 < newAllConfirmedCount {
refRenderFrameIdIfNeeded := pR.CurDynamicsRenderFrameId - 1
/*
[WARNING]

If "pR.InputsBufferLock" was previously held by "battleMainLoop -> applyInputFrameDownsyncDynamics", then this value would be just (pR.LastAllConfirmedInputFrameId - newAllConfirmedCount).
If "pR.InputsBufferLock" was previously held by "doBattleMainLoopPerTickBackendDynamicsWithProperLocking", then this value would be just (pR.LastAllConfirmedInputFrameId - newAllConfirmedCount).

However if "pR.InputsBufferLock" was previously held by another "OnBattleCmdReceived -> markConfirmationIfApplicable", this value might be smaller than (pR.LastAllConfirmedInputFrameId - newAllConfirmedCount)!
However if "pR.InputsBufferLock" was previously held by another "OnBattleCmdReceived", this value might be smaller than (pR.LastAllConfirmedInputFrameId - newAllConfirmedCount)!
*/
snapshotStFrameId := pR.ConvertToInputFrameId(refRenderFrameIdIfNeeded, pR.InputDelayFrames)
// Duplicate downsynced inputFrameIds will be filtered out by the frontend.
inputsBufferSnapshot = pR.createInputsBufferSnapshot(snapshotStFrameId, pR.LastAllConfirmedInputFrameId+1)
toSendInputFrameDownsyncs := pR.cloneInputsBuffer(snapshotStFrameId, pR.LastAllConfirmedInputFrameId+1)
Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable for roomId=%v returning newAllConfirmedCount=%d: InputsBuffer=%v", pR.Id, newAllConfirmedCount, pR.InputsBufferString(false)))
return &InputsBufferSnapshot{
RefRenderFrameId: refRenderFrameIdIfNeeded,
UnconfirmedMask: uint64(0),
ToSendInputFrameDownsyncs: toSendInputFrameDownsyncs,
}
} else {
return nil
}

Logger.Debug(fmt.Sprintf("markConfirmationIfApplicable for roomId=%v returning newAllConfirmedCount=%d: InputsBuffer=%v", pR.Id, newAllConfirmedCount, pR.InputsBufferString(false)))
return newAllConfirmedCount, refRenderFrameIdIfNeeded, inputsBufferSnapshot
}

func (pR *Room) forceConfirmationIfApplicable(prevRenderFrameId int32) (uint64, int32, []*InputFrameDownsync) {
func (pR *Room) forceConfirmationIfApplicable(prevRenderFrameId int32) *InputsBufferSnapshot {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
// Force confirmation of non-all-confirmed inputFrame EXACTLY ONE AT A TIME, returns the non-confirmed mask of players, e.g. in a 4-player-battle returning 1001 means that players with JoinIndex=1 and JoinIndex=4 are non-confirmed for inputFrameId2

/*
Upon resync on the frontend, "refRenderFrameId" is set as advanced as possible, and it's now the frontend's responsibility to pave the way for the "gap inputFrames"

If "NstDelayFrames" becomes larger, "pR.RenderFrameId - refRenderFrameId" possibly becomes larger because the force confirmation is delayed more.

Upon resync, it's still possible that "refRenderFrameId < frontend.chaserRenderFrameId" -- and this is allowed.
*/
refRenderFrameIdIfNeeded := pR.CurDynamicsRenderFrameId - 1
if 0 > refRenderFrameIdIfNeeded {
// Without a "refRenderFrame", there's no point to force confirmation, i.e. nothing to downsync to the "ACTIVE but slowly ticking frontend(s)"
return 0, -1, nil
}
renderFrameId1 := (pR.RenderFrameId - pR.NstDelayFrames) // the renderFrameId which should've been rendered on frontend
if 0 > renderFrameId1 {
return 0, -1, nil
return nil
}
ok := false
renderFrameId2 := int32(-1)
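A structural note on the refactor in this hunk: the old (newAllConfirmedCount, refRenderFrameIdIfNeeded, []*InputFrameDownsync) tuple is replaced by a single nil-able *InputsBufferSnapshot. A minimal stand-in for that shape and the caller-side pattern, using lower-cased illustrative types (only the field names RefRenderFrameId / UnconfirmedMask / ToSendInputFrameDownsyncs come from the diff):

```go
package sketch

type inputFrameDownsync struct {
	inputFrameId  int32
	inputList     []uint64
	confirmedList uint64
}

type inputsBufferSnapshot struct {
	refRenderFrameId          int32
	unconfirmedMask           uint64
	toSendInputFrameDownsyncs []*inputFrameDownsync
}

// handleUpsync shows the caller-side contract after the refactor: a nil
// snapshot means nothing became newly all-confirmed, so nothing is downsynced.
func handleUpsync(mark func() *inputsBufferSnapshot, downsyncToAll func(*inputsBufferSnapshot)) {
	if snapshot := mark(); snapshot != nil {
		downsyncToAll(snapshot)
	}
}
```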
@@ -1149,14 +1177,14 @@ func (pR *Room) forceConfirmationIfApplicable(prevRenderFrameId int32) (uint64,

It's also important that "forceConfirmationIfApplicable" is NOT EXECUTED for every renderFrame, such that when a player is forced to resync, it has some time, i.e. (1 << InputScaleFrames) renderFrames, to upsync again.
*/
return 0, -1, nil
return nil
}

inputFrameId2 := pR.ConvertToInputFrameId(renderFrameId2, 0) // The inputFrame to force confirmation (if necessary)
if inputFrameId2 < pR.LastAllConfirmedInputFrameId {
// No need to force confirmation, the inputFrames already arrived
Logger.Debug(fmt.Sprintf("inputFrameId2=%v is already all-confirmed for roomId=%v[type#1], no need to force confirmation of it", inputFrameId2, pR.Id))
return 0, -1, nil
return nil
}

tmp := pR.InputsBuffer.GetByFrameId(inputFrameId2)
@@ -1174,14 +1202,31 @@ func (pR *Room) forceConfirmationIfApplicable(prevRenderFrameId int32) (uint64,
inputFrame2.ConfirmedList = allConfirmedMask
pR.onInputFrameDownsyncAllConfirmed(inputFrame2, -1)

var inputsBufferSnapshot []*InputFrameDownsync = nil
if 0 < unconfirmedMask {
// This condition should be rarely met!
snapshotStFrameId := pR.ConvertToInputFrameId(refRenderFrameIdIfNeeded, pR.InputDelayFrames)
inputsBufferSnapshot = pR.createInputsBufferSnapshot(snapshotStFrameId, pR.LastAllConfirmedInputFrameId+1)
}
nextDynamicsRenderFrameId := pR.ConvertToLastUsedRenderFrameId(pR.LastAllConfirmedInputFrameId, pR.InputDelayFrames)
/*
Upon resync on the frontend, "refRenderFrameId" is set as advanced as possible, and it's now the frontend's responsibility to pave the way for the "gap inputFrames"

return unconfirmedMask, refRenderFrameIdIfNeeded, inputsBufferSnapshot
If "NstDelayFrames" becomes larger, "pR.RenderFrameId - refRenderFrameId" possibly becomes larger because the force confirmation is delayed more.

Upon resync, it's still possible that "refRenderFrameId < frontend.chaserRenderFrameId" -- and this is allowed.
*/
refRenderFrameIdIfNeeded := nextDynamicsRenderFrameId - 1
if 0 > refRenderFrameIdIfNeeded {
// Without a "refRenderFrame", there's no point to force confirmation, i.e. nothing to downsync to the "ACTIVE but slowly ticking frontend(s)"
return nil
}
snapshotStFrameId := pR.ConvertToInputFrameId(refRenderFrameIdIfNeeded, pR.InputDelayFrames)
toSendInputFrameDownsyncs := pR.cloneInputsBuffer(snapshotStFrameId, pR.LastAllConfirmedInputFrameId+1)
return &InputsBufferSnapshot{
RefRenderFrameId: refRenderFrameIdIfNeeded,
UnconfirmedMask: unconfirmedMask,
ToSendInputFrameDownsyncs: toSendInputFrameDownsyncs,
}
} else {
return nil
}
}

func (pR *Room) applyInputFrameDownsyncDynamics(fromRenderFrameId int32, toRenderFrameId int32, spaceOffsetX, spaceOffsetY float64) {
@@ -1498,26 +1543,25 @@ func (pR *Room) printBarrier(barrierCollider *resolv.Object) {
Logger.Info(fmt.Sprintf("Barrier in roomId=%v: w=%v, h=%v, shape=%v", pR.Id, barrierCollider.W, barrierCollider.H, barrierCollider.Shape))
}

func (pR *Room) doBattleMainLoopPerTickBackendDynamicsWithProperLocking(prevRenderFrameId int32, pDynamicsDuration *int64) (uint64, int32, []*InputFrameDownsync) {
func (pR *Room) doBattleMainLoopPerTickBackendDynamicsWithProperLocking(prevRenderFrameId int32, pDynamicsDuration *int64) {
Logger.Debug(fmt.Sprintf("doBattleMainLoopPerTickBackendDynamicsWithProperLocking-InputsBufferLock about to lock: roomId=%v", pR.Id))
pR.InputsBufferLock.Lock()
Logger.Debug(fmt.Sprintf("doBattleMainLoopPerTickBackendDynamicsWithProperLocking-InputsBufferLock locked: roomId=%v", pR.Id))

defer func() {
// [WARNING] Unlock before calling "downsyncToAllPlayers -> downsyncToSinglePlayer" to avoid unlocking lags due to blocking network I/O
pR.InputsBufferLock.Unlock()
Logger.Debug(fmt.Sprintf("doBattleMainLoopPerTickBackendDynamicsWithProperLocking-InputsBufferLock unlocked: roomId=%v", pR.Id))
}()

if ok, inputFrameGeneratingRenderFrameId := pR.shouldPrefabInputFrameDownsync(prevRenderFrameId, pR.RenderFrameId); ok {
noDelayInputFrameId := pR.ConvertToInputFrameId(inputFrameGeneratingRenderFrameId, 0)
if ok, thatRenderFrameId := pR.shouldPrefabInputFrameDownsync(prevRenderFrameId, pR.RenderFrameId); ok {
noDelayInputFrameId := pR.ConvertToInputFrameId(thatRenderFrameId, 0)
if existingInputFrame := pR.InputsBuffer.GetByFrameId(noDelayInputFrameId); nil == existingInputFrame {
pR.prefabInputFrameDownsync(noDelayInputFrameId)
}
}

// Force setting all-confirmed of buffered inputFrames periodically, kindly note that if "pR.BackendDynamicsEnabled", what we want to achieve is "recovery upon reconnection", which certainly requires "forceConfirmationIfApplicable" to move "pR.LastAllConfirmedInputFrameId" forward as much as possible
unconfirmedMask, refRenderFrameId, inputsBufferSnapshot := pR.forceConfirmationIfApplicable(prevRenderFrameId)
inputsBufferSnapshot := pR.forceConfirmationIfApplicable(prevRenderFrameId)

if 0 <= pR.LastAllConfirmedInputFrameId {
dynamicsStartedAt := utils.UnixtimeNano()
@@ -1528,16 +1572,24 @@ func (pR *Room) doBattleMainLoopPerTickBackendDynamicsWithProperLocking(prevRend
*pDynamicsDuration = utils.UnixtimeNano() - dynamicsStartedAt
}

return unconfirmedMask, refRenderFrameId, inputsBufferSnapshot
if nil != inputsBufferSnapshot {
Logger.Warn(fmt.Sprintf("roomId=%v, room.RenderFrameId=%v, room.CurDynamicsRenderFrameId=%v, room.LastAllConfirmedInputFrameId=%v, unconfirmedMask=%v", pR.Id, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, pR.LastAllConfirmedInputFrameId, inputsBufferSnapshot.UnconfirmedMask))
pR.downsyncToAllPlayers(inputsBufferSnapshot)
}
}

func (pR *Room) downsyncToAllPlayers(refRenderFrameId int32, unconfirmedMask uint64, inputsBufferSnapshot []*InputFrameDownsync) {
func (pR *Room) downsyncToAllPlayers(inputsBufferSnapshot *InputsBufferSnapshot) {
/*
[WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked, to preserve the order of generation of "inputsBufferSnapshot" -- see comments in "OnBattleCmdReceived" and [this issue](https://github.com/genxium/DelayNoMore/issues/12).

Actually, if each player session were both intrinsically thread-safe & non-blocking for writing (like Java NIO), I could've just called "playerSession.WriteMessage" while holding "pR.InputsBufferLock" -- but the ws session provided by the Gorilla library is neither thread-safe nor non-blocking for writing, which is fine because it creates a chance for the users to solve an interesting problem :)
*/
/*
[WARNING] This function MUST BE called while "pR.InputsBufferLock" is unlocked -- otherwise the blocking "sendSafely" might cause significant lag!
Moreover, we're downsyncing the same "inputsBufferSnapshot" for all players in the same battle, and this is by design, i.e. not respecting "player.LastSentInputFrameId", because "new all-confirmed inputFrameDownsyncs" are the same for all players and ws is TCP-based (no loss of consecutive packets except upon reconnection -- which is already handled by READDED_BATTLE_COLLIDER_ACKED)
*/
for _, player := range pR.PlayersArr {
pR.downsyncToSinglePlayer(player.Id, player, refRenderFrameId, unconfirmedMask, inputsBufferSnapshot)
for playerId, playerDownsyncChan := range pR.PlayerDownsyncChanDict {
playerDownsyncChan <- (*inputsBufferSnapshot)
Logger.Debug(fmt.Sprintf("Sent inputsBufferSnapshot(refRenderFrameId:%d, unconfirmedMask:%v) for (roomId: %d, playerId:%d, playerDownsyncChan:%p)#1", inputsBufferSnapshot.RefRenderFrameId, inputsBufferSnapshot.UnconfirmedMask, pR.Id, playerId, playerDownsyncChan))
}
}
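A design note on the fan-out above: the channel send is a plain, potentially blocking send performed while "pR.InputsBufferLock" is held; it only stays cheap because each channel is buffered with capacity InputsBuffer.N and the per-player downsyncLoop keeps draining it. A hypothetical non-blocking variant, explicitly not what the diff does, would drop a snapshot rather than stall the producer if a consumer ever fell that far behind:

```go
package sketch

// trySend is a non-blocking alternative to the plain send above: it returns
// false instead of blocking when the buffered channel is full, leaving the
// caller to decide whether dropping a value is acceptable.
func trySend[T any](ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}
```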
@@ -1548,14 +1600,6 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, refRender
*/

playerJoinIndex := player.JoinIndex - 1
Logger.Debug(fmt.Sprintf("downsyncToSinglePlayer-SessionLock about to lock: roomId=%v, playerId=%v", pR.Id, playerId))
pR.PlayerDownsyncSessionLock[playerId].Lock()
Logger.Debug(fmt.Sprintf("downsyncToSinglePlayer-SessionLock locked: roomId=%v, playerId=%v", pR.Id, playerId))
defer func() {
pR.PlayerDownsyncSessionLock[playerId].Unlock()
Logger.Debug(fmt.Sprintf("downsyncToSinglePlayer-SessionLock unlocked: roomId=%v, playerId=%v", pR.Id, playerId))
}()

playerBattleState := atomic.LoadInt32(&(player.BattleState))
switch playerBattleState {
case PlayerBattleStateIns.DISCONNECTED:
@@ -1567,7 +1611,7 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, refRender
return
}

shouldResync1 := (PlayerBattleStateIns.READDED_BATTLE_COLLIDER_ACKED == playerBattleState) // i.e. implies that "MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED == player.LastSentInputFrameId", kindly note that this check as well as the re-assignment at the bottom of this function are both guarded by "pR.PlayerDownsyncSessionLock[playerId]"
shouldResync1 := (PlayerBattleStateIns.READDED_BATTLE_COLLIDER_ACKED == playerBattleState) // i.e. implies that "MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED == player.LastSentInputFrameId"
shouldResync2 := (0 < (unconfirmedMask & uint64(1<<uint32(playerJoinIndex)))) // This condition is critical, if we don't send resync upon this condition, the "reconnected or slowly-clocking player" might never get its input synced
// shouldResync2 := (0 < unconfirmedMask) // An easier version of the above, might keep sending "refRenderFrame"s to still connected players when any player is disconnected
shouldResyncOverall := (shouldResync1 || shouldResync2)
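For reference, the shouldResync2 bitmask test above checks whether this player's bit is set in unconfirmedMask; since JoinIndex is 1-based, player j occupies bit (j-1). A one-function restatement:

```go
package sketch

// isUnconfirmed reports whether the player with the given 1-based joinIndex is
// flagged in unconfirmedMask, mirroring the shouldResync2 check above.
func isUnconfirmed(unconfirmedMask uint64, joinIndex int32) bool {
	return 0 < (unconfirmedMask & uint64(1<<uint32(joinIndex-1)))
}
```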
@@ -1606,9 +1650,9 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, refRender
}
}

func (pR *Room) createInputsBufferSnapshot(stFrameId, edFrameId int32) []*InputFrameDownsync {
func (pR *Room) cloneInputsBuffer(stFrameId, edFrameId int32) []*InputFrameDownsync {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
snapshot := make([]*InputFrameDownsync, 0, edFrameId-stFrameId)
cloned := make([]*InputFrameDownsync, 0, edFrameId-stFrameId)
prevFrameFound := false
j := stFrameId
for j < edFrameId {
@@ -1632,9 +1676,9 @@ func (pR *Room) createInputsBufferSnapshot(stFrameId, edFrameId int32) []*InputF
for i, input := range foo.InputList {
bar.InputList[i] = input
}
snapshot = append(snapshot, bar)
cloned = append(cloned, bar)
j++
}

return snapshot
return cloned
}
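The rename from createInputsBufferSnapshot to cloneInputsBuffer emphasizes that the returned frames are deep copies, so the snapshot handed to the downsync channels stays valid even if the ring-buffer entries are mutated later. A compact sketch of the same copying intent, reusing the illustrative inputFrameDownsync type from the markConfirmationIfApplicable sketch earlier:

```go
package sketch

// cloneFrames deep-copies each frame's input list so the returned slice is
// independent of later in-place mutation of the source frames.
func cloneFrames(src []*inputFrameDownsync) []*inputFrameDownsync {
	cloned := make([]*inputFrameDownsync, 0, len(src))
	for _, foo := range src {
		bar := &inputFrameDownsync{
			inputFrameId:  foo.inputFrameId,
			inputList:     make([]uint64, len(foo.inputList)),
			confirmedList: foo.confirmedList,
		}
		copy(bar.inputList, foo.inputList)
		cloned = append(cloned, bar)
	}
	return cloned
}
```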