Drafted peer inputFrameUpsync broadcasting mechanism.

This commit is contained in:
genxium
2023-01-18 15:36:04 +08:00
parent 48074d48af
commit b81c470135
9 changed files with 690 additions and 314 deletions

View File

@@ -27,10 +27,11 @@ const (
UPSYNC_MSG_ACT_PLAYER_CMD = int32(2)
UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK = int32(3)
DOWNSYNC_MSG_ACT_HB_REQ = int32(1)
DOWNSYNC_MSG_ACT_INPUT_BATCH = int32(2)
DOWNSYNC_MSG_ACT_BATTLE_STOPPED = int32(3)
DOWNSYNC_MSG_ACT_FORCED_RESYNC = int32(4)
DOWNSYNC_MSG_ACT_HB_REQ = int32(1)
DOWNSYNC_MSG_ACT_INPUT_BATCH = int32(2)
DOWNSYNC_MSG_ACT_BATTLE_STOPPED = int32(3)
DOWNSYNC_MSG_ACT_FORCED_RESYNC = int32(4)
DOWNSYNC_MSG_ACT_PEER_INPUT_BATCH = int32(5)
DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START = int32(-1)
DOWNSYNC_MSG_ACT_BATTLE_START = int32(0)
@@ -116,10 +117,15 @@ type Room struct {
*
* Moreover, during the invocation of `PlayerSignalToCloseDict`, the `Player` instance is supposed to be deallocated (though not synchronously).
*/
PlayerDownsyncSessionDict map[int32]*websocket.Conn
PlayerDownsyncChanDict map[int32](chan pb.InputsBufferSnapshot)
PlayerDownsyncSessionDict map[int32]*websocket.Conn
PlayerSignalToCloseDict map[int32]SignalToCloseConnCbType
PlayerDownsyncChanDict map[int32](chan pb.InputsBufferSnapshot)
PlayerSecondaryDownsyncSessionDict map[int32]*websocket.Conn
PlayerSecondarySignalToCloseDict map[int32]SignalToCloseConnCbType
PlayerSecondaryDownsyncChanDict map[int32](chan pb.InputsBufferSnapshot)
PlayerActiveWatchdogDict map[int32](*Watchdog)
PlayerSignalToCloseDict map[int32]SignalToCloseConnCbType
Score float32
State int32
Index int
@@ -184,7 +190,7 @@ func (pR *Room) AddPlayerIfPossible(pPlayerFromDbInit *Player, session *websocke
pR.PlayerSignalToCloseDict[playerId] = signalToCloseConnOfThisPlayer
newWatchdog := NewWatchdog(ConstVals.Ws.WillKickIfInactiveFor, func() {
Logger.Warn("Conn inactive watchdog triggered#1:", zap.Any("playerId", playerId), zap.Any("roomId", pR.Id), zap.Any("roomState", pR.State), zap.Any("roomEffectivePlayerCount", pR.EffectivePlayerCount))
signalToCloseConnOfThisPlayer(Constants.RetCode.ActiveWatchdog, "")
pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.ActiveWatchdog)
})
newWatchdog.Stop()
pR.PlayerActiveWatchdogDict[playerId] = newWatchdog
@@ -221,7 +227,7 @@ func (pR *Room) ReAddPlayerIfPossible(pTmpPlayerInstance *Player, session *webso
pR.PlayerSignalToCloseDict[playerId] = signalToCloseConnOfThisPlayer
pR.PlayerActiveWatchdogDict[playerId] = NewWatchdog(ConstVals.Ws.WillKickIfInactiveFor, func() {
Logger.Warn("Conn inactive watchdog triggered#2:", zap.Any("playerId", playerId), zap.Any("roomId", pR.Id), zap.Any("roomState", pR.State), zap.Any("roomEffectivePlayerCount", pR.EffectivePlayerCount))
signalToCloseConnOfThisPlayer(Constants.RetCode.ActiveWatchdog, "")
pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.ActiveWatchdog)
}) // For ReAdded player the new watchdog starts immediately
Logger.Warn("ReAddPlayerIfPossible finished.", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("joinIndex", pEffectiveInRoomPlayerInstance.JoinIndex), zap.Any("playerBattleState", pEffectiveInRoomPlayerInstance.BattleState), zap.Any("roomState", pR.State), zap.Any("roomEffectivePlayerCount", pR.EffectivePlayerCount), zap.Any("AckingFrameId", pEffectiveInRoomPlayerInstance.AckingFrameId), zap.Any("AckingInputFrameId", pEffectiveInRoomPlayerInstance.AckingInputFrameId), zap.Any("LastSentInputFrameId", pEffectiveInRoomPlayerInstance.LastSentInputFrameId))
@@ -482,7 +488,7 @@ func (pR *Room) StartBattle() {
kickoffFrameJs := pR.RenderFrameBuffer.GetByFrameId(0).(*battle.RoomDownsyncFrame)
pbKickOffRenderFrame := toPbRoomDownsyncFrame(kickoffFrameJs)
pbKickOffRenderFrame.SpeciesIdList = pR.SpeciesIdList
pR.sendSafely(pbKickOffRenderFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_START, playerId, true)
pR.sendSafely(pbKickOffRenderFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_START, playerId, true, MAGIC_JOIN_INDEX_DEFAULT)
}
Logger.Info(fmt.Sprintf("In `battleMainLoop` for roomId=%v sent out kickoffFrame", pR.Id))
}
@@ -509,7 +515,7 @@ func (pR *Room) StartBattle() {
}
}
downsyncLoop := func(playerId int32, player *Player, playerDownsyncChan chan pb.InputsBufferSnapshot) {
downsyncLoop := func(playerId int32, player *Player, playerDownsyncChan chan pb.InputsBufferSnapshot, playerSecondaryDownsyncChan chan pb.InputsBufferSnapshot) {
defer func() {
if r := recover(); r != nil {
Logger.Error("downsyncLoop, recovery spot#1, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("panic", r))
@@ -517,19 +523,23 @@ func (pR *Room) StartBattle() {
Logger.Info(fmt.Sprintf("The `downsyncLoop` for (roomId=%v, playerId=%v) is stopped@renderFrameId=%v", pR.Id, playerId, pR.RenderFrameId))
}()
Logger.Debug(fmt.Sprintf("Started downsyncLoop for (roomId: %d, playerId:%d, playerDownsyncChan:%p)", pR.Id, playerId, playerDownsyncChan))
//Logger.Info(fmt.Sprintf("Started downsyncLoop for (roomId: %d, playerId:%d, playerDownsyncChan:%p)", pR.Id, playerId, playerDownsyncChan))
for {
nowBattleState := atomic.LoadInt32(&pR.State)
switch nowBattleState {
case RoomBattleStateIns.IDLE, RoomBattleStateIns.STOPPING_BATTLE_FOR_SETTLEMENT, RoomBattleStateIns.IN_SETTLEMENT, RoomBattleStateIns.IN_DISMISSAL:
Logger.Warn(fmt.Sprintf("Battle is not waiting/preparing/active for playerDownsyncChan for (roomId: %d, playerId:%d)", pR.Id, playerId))
return
}
select {
case inputsBufferSnapshot := <-playerDownsyncChan:
nowBattleState := atomic.LoadInt32(&pR.State)
switch nowBattleState {
case RoomBattleStateIns.IDLE, RoomBattleStateIns.STOPPING_BATTLE_FOR_SETTLEMENT, RoomBattleStateIns.IN_SETTLEMENT, RoomBattleStateIns.IN_DISMISSAL:
Logger.Warn(fmt.Sprintf("Battle is not waiting/preparing/active for playerDownsyncChan for (roomId: %d, playerId:%d)", pR.Id, playerId))
return
}
pR.downsyncToSinglePlayer(playerId, player, inputsBufferSnapshot.RefRenderFrameId, inputsBufferSnapshot.UnconfirmedMask, inputsBufferSnapshot.ToSendInputFrameDownsyncs, inputsBufferSnapshot.ShouldForceResync)
//Logger.Info(fmt.Sprintf("Sent inputsBufferSnapshot(refRenderFrameId:%d, unconfirmedMask:%v) to for (roomId: %d, playerId:%d)#2", inputsBufferSnapshot.RefRenderFrameId, inputsBufferSnapshot.UnconfirmedMask, pR.Id, playerId))
case inputsBufferSnapshot2 := <-playerSecondaryDownsyncChan:
pR.downsyncPeerInputFrameUpsyncToSinglePlayer(playerId, player, inputsBufferSnapshot2.ToSendInputFrameDownsyncs, inputsBufferSnapshot2.PeerJoinIndex)
//Logger.Info(fmt.Sprintf("Sent secondary inputsBufferSnapshot to for (roomId: %d, playerId:%d)#2", pR.Id, playerId))
default:
}
}
@@ -542,7 +552,8 @@ func (pR *Room) StartBattle() {
Each "playerDownsyncChan" stays alive through out the lifecycle of room instead of each "playerDownsyncSession", i.e. not closed or dereferenced upon disconnection.
*/
pR.PlayerDownsyncChanDict[playerId] = make(chan pb.InputsBufferSnapshot, pR.InputsBuffer.N)
go downsyncLoop(playerId, player, pR.PlayerDownsyncChanDict[playerId])
pR.PlayerSecondaryDownsyncChanDict[playerId] = make(chan pb.InputsBufferSnapshot, pR.InputsBuffer.N)
go downsyncLoop(playerId, player, pR.PlayerDownsyncChanDict[playerId], pR.PlayerSecondaryDownsyncChanDict[playerId])
}
pR.onBattlePrepare(func() {
@@ -599,6 +610,16 @@ func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq) {
inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player)
if nil != inputsBufferSnapshot {
pR.downsyncToAllPlayers(inputsBufferSnapshot)
} else {
// no new all-confirmed
toSendInputFrameDownsyncs := pR.cloneInputsBuffer(inputFrameUpsyncBatch[0].InputFrameId, inputFrameUpsyncBatch[len(inputFrameUpsyncBatch)-1].InputFrameId+1)
inputsBufferSnapshot = &pb.InputsBufferSnapshot{
ToSendInputFrameDownsyncs: toSendInputFrameDownsyncs,
PeerJoinIndex: player.JoinIndex,
}
//Logger.Info(fmt.Sprintf("OnBattleCmdReceived no new all-confirmed: roomId=%v, fromPlayerId=%v, forming peer broadcasting snapshot=%v", pR.Id, playerId, inputsBufferSnapshot))
pR.broadcastPeerUpsyncForBetterPrediction(inputsBufferSnapshot)
}
}
@@ -650,7 +671,7 @@ func (pR *Room) StopBattleForSettlement() {
PlayersArr: toPbPlayers(pR.Players, false),
CountdownNanos: -1, // TODO: Replace this magic constant!
}
pR.sendSafely(&assembledFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_STOPPED, playerId, true)
pR.sendSafely(&assembledFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_STOPPED, playerId, true, MAGIC_JOIN_INDEX_DEFAULT)
}
// Note that `pR.onBattleStoppedForSettlement` will be called by `battleMainLoop`.
}
@@ -679,7 +700,7 @@ func (pR *Room) onBattlePrepare(cb BattleStartCbType) {
Logger.Info("Sending out frame for RoomBattleState.PREPARE:", zap.Any("battleReadyToStartFrame", battleReadyToStartFrame))
for _, player := range pR.Players {
pR.sendSafely(battleReadyToStartFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START, player.Id, true)
pR.sendSafely(battleReadyToStartFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START, player.Id, true, MAGIC_JOIN_INDEX_DEFAULT)
}
battlePreparationNanos := int64(6000000000)
@@ -748,6 +769,7 @@ func (pR *Room) OnDismissed() {
pR.CharacterConfigsArr = make([]*battle.CharacterConfig, pR.Capacity)
pR.CollisionSysMap = make(map[int32]*resolv.Object)
pR.PlayerDownsyncSessionDict = make(map[int32]*websocket.Conn)
pR.PlayerSecondaryDownsyncSessionDict = make(map[int32]*websocket.Conn)
for _, oldWatchdog := range pR.PlayerActiveWatchdogDict {
oldWatchdog.Stop()
}
@@ -756,7 +778,12 @@ func (pR *Room) OnDismissed() {
close(oldChan)
}
pR.PlayerDownsyncChanDict = make(map[int32](chan pb.InputsBufferSnapshot))
for _, oldChan := range pR.PlayerSecondaryDownsyncChanDict {
close(oldChan)
}
pR.PlayerSecondaryDownsyncChanDict = make(map[int32](chan pb.InputsBufferSnapshot))
pR.PlayerSignalToCloseDict = make(map[int32]SignalToCloseConnCbType)
pR.PlayerSecondarySignalToCloseDict = make(map[int32]SignalToCloseConnCbType)
pR.JoinIndexBooleanArr = make([]bool, pR.Capacity)
pR.RenderCacheSize = 1024
pR.RenderFrameBuffer = battle.NewRingBuffer(pR.RenderCacheSize)
@@ -799,19 +826,24 @@ func (pR *Room) OnDismissed() {
}
func (pR *Room) expelPlayerDuringGame(playerId int32) {
if signalToCloseConnOfThisPlayer, existent := pR.PlayerSignalToCloseDict[playerId]; existent {
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "") // TODO: Specify an error code
}
pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.UnknownError)
pR.onPlayerExpelledDuringGame(playerId)
}
func (pR *Room) expelPlayerForDismissal(playerId int32) {
if signalToCloseConnOfThisPlayer, existent := pR.PlayerSignalToCloseDict[playerId]; existent {
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "") // TODO: Specify an error code
}
pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.UnknownError)
pR.onPlayerExpelledForDismissal(playerId)
}
func (pR *Room) signalToCloseAllSessionsOfPlayer(playerId int32, retCode int) {
if signalToCloseConnOfThisPlayer, existent := pR.PlayerSignalToCloseDict[playerId]; existent {
signalToCloseConnOfThisPlayer(retCode, "") // TODO: Specify an error code
}
if signalToCloseConnOfThisPlayer2, existent2 := pR.PlayerSecondarySignalToCloseDict[playerId]; existent2 {
signalToCloseConnOfThisPlayer2(retCode, "") // TODO: Specify an error code
}
}
func (pR *Room) onPlayerExpelledDuringGame(playerId int32) {
pR.onPlayerLost(playerId)
}
@@ -829,6 +861,10 @@ func (pR *Room) OnPlayerDisconnected(playerId int32) {
}
}()
if signalToCloseConnOfThisPlayer2, existent2 := pR.PlayerSecondarySignalToCloseDict[playerId]; existent2 {
signalToCloseConnOfThisPlayer2(Constants.RetCode.UnknownError, "") // TODO: Specify an error code
}
if player, existent := pR.Players[playerId]; existent {
thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState))
switch thatPlayerBattleState {
@@ -888,6 +924,8 @@ func (pR *Room) clearPlayerNetworkSession(playerId int32) {
delete(pR.PlayerActiveWatchdogDict, playerId)
delete(pR.PlayerDownsyncSessionDict, playerId)
delete(pR.PlayerSignalToCloseDict, playerId)
delete(pR.PlayerSecondaryDownsyncSessionDict, playerId)
delete(pR.PlayerSecondarySignalToCloseDict, playerId)
}
}
@@ -977,7 +1015,7 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool {
Logger.Debug(fmt.Sprintf("OnPlayerBattleColliderAcked-middle: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, thatPlayerId=%v, thatPlayerBattleState=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, thatPlayer.Id, thatPlayerBattleState))
if thatPlayerId == targetPlayer.Id || (PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK == thatPlayerBattleState || PlayerBattleStateIns.ACTIVE == thatPlayerBattleState) {
Logger.Debug(fmt.Sprintf("OnPlayerBattleColliderAcked-sending DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, capacity=%v, EffectivePlayerCount=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, pR.Capacity, pR.EffectivePlayerCount))
pR.sendSafely(playerAckedFrame, nil, DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED, thatPlayer.Id, true)
pR.sendSafely(playerAckedFrame, nil, DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED, thatPlayer.Id, true, MAGIC_JOIN_INDEX_DEFAULT)
}
}
atomic.StoreInt32(&(targetPlayer.BattleState), PlayerBattleStateIns.ACTIVE)
@@ -1010,28 +1048,43 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool {
return true
}
func (pR *Room) sendSafely(roomDownsyncFrame *pb.RoomDownsyncFrame, toSendInputFrameDownsyncs []*pb.InputFrameDownsync, act int32, playerId int32, needLockExplicitly bool) {
func (pR *Room) sendSafely(roomDownsyncFrame *pb.RoomDownsyncFrame, toSendInputFrameDownsyncs []*pb.InputFrameDownsync, act int32, playerId int32, needLockExplicitly bool, peerJoinIndex int32) {
defer func() {
if r := recover(); r != nil {
Logger.Error("sendSafely, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("panic", r))
}
}()
if playerDownsyncSession, existent := pR.PlayerDownsyncSessionDict[playerId]; existent {
pResp := &pb.WsResp{
Ret: int32(Constants.RetCode.Ok),
Act: act,
Rdf: roomDownsyncFrame,
InputFrameDownsyncBatch: toSendInputFrameDownsyncs,
}
pResp := &pb.WsResp{
Ret: int32(Constants.RetCode.Ok),
Act: act,
Rdf: roomDownsyncFrame,
InputFrameDownsyncBatch: toSendInputFrameDownsyncs,
PeerJoinIndex: peerJoinIndex,
}
theBytes, marshalErr := proto.Marshal(pResp)
if nil != marshalErr {
panic(fmt.Sprintf("Error marshaling downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount))
}
theBytes, marshalErr := proto.Marshal(pResp)
if nil != marshalErr {
panic(fmt.Sprintf("Error marshaling downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount))
}
if err := playerDownsyncSession.WriteMessage(websocket.BinaryMessage, theBytes); nil != err {
panic(fmt.Sprintf("Error sending downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v, err=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount, err))
if MAGIC_JOIN_INDEX_DEFAULT == peerJoinIndex {
if playerDownsyncSession, existent := pR.PlayerDownsyncSessionDict[playerId]; existent {
if err := playerDownsyncSession.WriteMessage(websocket.BinaryMessage, theBytes); nil != err {
panic(fmt.Sprintf("Error sending primary downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v, err=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount, err))
}
}
} else {
/*
[FIXME]
This branch is preferred to use an additional session of each player for sending, and the session is preferrably UDP instead of any TCP-based protocol, but I'm being lazy here.
See `<proj-root>/ConcerningEdgeCases.md` for the advantage of using UDP as a supplement.
*/
if playerSecondaryDownsyncSession, existent := pR.PlayerSecondaryDownsyncSessionDict[playerId]; existent {
if err := playerSecondaryDownsyncSession.WriteMessage(websocket.BinaryMessage, theBytes); nil != err {
panic(fmt.Sprintf("Error sending secondary downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v, err=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount, err))
}
}
}
}
@@ -1347,6 +1400,30 @@ func (pR *Room) doBattleMainLoopPerTickBackendDynamicsWithProperLocking(prevRend
}
}
func (pR *Room) broadcastPeerUpsyncForBetterPrediction(inputsBufferSnapshot *pb.InputsBufferSnapshot) {
// See `<proj-root>/ConcerningEdgeCases.md` for why this method exists.
for _, player := range pR.PlayersArr {
playerBattleState := atomic.LoadInt32(&(player.BattleState))
switch playerBattleState {
case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL, PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK, PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK:
continue
}
if player.JoinIndex == inputsBufferSnapshot.PeerJoinIndex {
continue
}
if playerSecondaryDownsyncChan, existent := pR.PlayerSecondaryDownsyncChanDict[player.Id]; existent {
/*
[FIXME]
This function is preferred to use an additional go-channel of each player for sending, see "downsyncLoop" & "Room.sendSafely" for more information!
*/
playerSecondaryDownsyncChan <- (*inputsBufferSnapshot)
} else {
Logger.Warn(fmt.Sprintf("playerDownsyncChan for (roomId: %d, playerId:%d) is gone", pR.Id, player.Id))
}
}
}
func (pR *Room) downsyncToAllPlayers(inputsBufferSnapshot *pb.InputsBufferSnapshot) {
/*
[WARNING] This function MUST BE called while "pR.InputsBufferLock" is LOCKED to **preserve the order of generation of "inputsBufferSnapshot" for sending** -- see comments in "OnBattleCmdReceived" and [this issue](https://github.com/genxium/DelayNoMore/issues/12).
@@ -1423,14 +1500,14 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, refRender
We hereby assume that Golang runtime allocates & frees small amount of RAM quickly enough compared to either network I/O blocking in worst cases or the high frequency "per inputFrameDownsync*player" locking (though "OnBattleCmdReceived" locks at the same frequency but it's inevitable).
*/
playerJoinIndex := player.JoinIndex - 1
playerJoinIndexInBooleanArr := player.JoinIndex - 1
playerBattleState := atomic.LoadInt32(&(player.BattleState))
switch playerBattleState {
case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL, PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK, PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK:
return
}
isSlowTicker := (0 < (unconfirmedMask & uint64(1<<uint32(playerJoinIndex))))
isSlowTicker := (0 < (unconfirmedMask & uint64(1<<uint32(playerJoinIndexInBooleanArr))))
shouldResync1 := (PlayerBattleStateIns.READDED_BATTLE_COLLIDER_ACKED == playerBattleState) // i.e. implies that "MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED == player.LastSentInputFrameId"
shouldResync2 := isSlowTicker // This condition is critical, if we don't send resync upon this condition, the "reconnected or slowly-clocking player" might never get its input synced
shouldResync3 := shouldForceResync
@@ -1455,13 +1532,13 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, refRender
refRenderFrame.BackendUnconfirmedMask = unconfirmedMask
pbRefRenderFrame := toPbRoomDownsyncFrame(refRenderFrame)
pbRefRenderFrame.SpeciesIdList = pR.SpeciesIdList
pR.sendSafely(pbRefRenderFrame, toSendInputFrameDownsyncsSnapshot, DOWNSYNC_MSG_ACT_FORCED_RESYNC, playerId, false)
pR.sendSafely(pbRefRenderFrame, toSendInputFrameDownsyncsSnapshot, DOWNSYNC_MSG_ACT_FORCED_RESYNC, playerId, false, MAGIC_JOIN_INDEX_DEFAULT)
//Logger.Warn(fmt.Sprintf("Sent refRenderFrameId=%v & inputFrameIds [%d, %d), for roomId=%v, playerId=%d, playerJoinIndex=%d, renderFrameId=%d, curDynamicsRenderFrameId=%d, playerLastSentInputFrameId=%d: InputsBuffer=%v", refRenderFrameId, toSendInputFrameIdSt, toSendInputFrameIdEd, pR.Id, playerId, player.JoinIndex, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, player.LastSentInputFrameId, pR.InputsBufferString(false)))
if shouldResync1 || shouldResync3 {
Logger.Debug(fmt.Sprintf("Sent refRenderFrameId=%v & inputFrameIds [%d, %d), for roomId=%v, playerId=%d, playerJoinIndex=%d, renderFrameId=%d, curDynamicsRenderFrameId=%d, playerLastSentInputFrameId=%d: shouldResync1=%v, shouldResync2=%v, shouldResync3=%v, playerBattleState=%d", refRenderFrameId, toSendInputFrameIdSt, toSendInputFrameIdEd, pR.Id, playerId, player.JoinIndex, pR.RenderFrameId, pR.CurDynamicsRenderFrameId, player.LastSentInputFrameId, shouldResync1, shouldResync2, shouldResync3, playerBattleState))
}
} else {
pR.sendSafely(nil, toSendInputFrameDownsyncsSnapshot, DOWNSYNC_MSG_ACT_INPUT_BATCH, playerId, false)
pR.sendSafely(nil, toSendInputFrameDownsyncsSnapshot, DOWNSYNC_MSG_ACT_INPUT_BATCH, playerId, false, MAGIC_JOIN_INDEX_DEFAULT)
}
player.LastSentInputFrameId = toSendInputFrameIdEd - 1
if shouldResync1 {
@@ -1469,6 +1546,16 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, refRender
}
}
func (pR *Room) downsyncPeerInputFrameUpsyncToSinglePlayer(playerId int32, player *Player, toSendInputFrameDownsyncsSnapshot []*pb.InputFrameDownsync, peerJoinIndex int32) {
playerBattleState := atomic.LoadInt32(&(player.BattleState))
switch playerBattleState {
case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL, PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK, PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK:
return
}
pR.sendSafely(nil, toSendInputFrameDownsyncsSnapshot, DOWNSYNC_MSG_ACT_PEER_INPUT_BATCH, playerId, false, peerJoinIndex)
}
func (pR *Room) cloneInputsBuffer(stFrameId, edFrameId int32) []*pb.InputFrameDownsync {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
cloned := make([]*pb.InputFrameDownsync, 0, edFrameId-stFrameId)
@@ -1501,3 +1588,22 @@ func (pR *Room) cloneInputsBuffer(stFrameId, edFrameId int32) []*pb.InputFrameDo
return cloned
}
func (pR *Room) SetSecondarySession(playerId int32, session *websocket.Conn, signalToCloseConnOfThisPlayer SignalToCloseConnCbType) {
// TODO: Use a dedicated lock
if player, ok := pR.Players[playerId]; ok {
playerBattleState := atomic.LoadInt32(&(player.BattleState))
switch playerBattleState {
case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL:
// Kindly note that "PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK, PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK" are allowed
return
}
if _, existent := pR.PlayerDownsyncSessionDict[playerId]; existent {
if _, existent2 := pR.PlayerSecondaryDownsyncSessionDict[playerId]; !existent2 {
Logger.Info(fmt.Sprintf("SetSecondarySession for roomId=%v, playerId=%d, pR.Players=%v", pR.Id, playerId, pR.Players))
pR.PlayerSecondaryDownsyncSessionDict[playerId] = session
pR.PlayerSecondarySignalToCloseDict[playerId] = signalToCloseConnOfThisPlayer
}
}
}
}