From b81c4701350b80f4cbeab9170427ac71f041eec6 Mon Sep 17 00:00:00 2001 From: genxium Date: Wed, 18 Jan 2023 15:36:04 +0800 Subject: [PATCH] Drafted peer inputFrameUpsync broadcasting mechanism. --- battle_srv/main.go | 1 + battle_srv/models/room.go | 198 +++++-- battle_srv/protos/room_downsync_frame.pb.go | 560 +++++++++--------- battle_srv/ws/serve.go | 87 +++ .../pbfiles/room_downsync_frame.proto | 3 + frontend/assets/scenes/login.fire | 2 +- frontend/assets/scripts/Map.js | 27 + frontend/assets/scripts/WsSessionMgr.js | 57 +- ...om_downsync_frame_proto_bundle.forcemsg.js | 69 +++ 9 files changed, 690 insertions(+), 314 deletions(-) diff --git a/battle_srv/main.go b/battle_srv/main.go index 25653ed..739835b 100644 --- a/battle_srv/main.go +++ b/battle_srv/main.go @@ -89,6 +89,7 @@ func setRouter(router *gin.Engine) { router.StaticFS("/asset", http.Dir(filepath.Join(Conf.General.AppRoot, "asset"))) router.GET("/ping", f) router.GET("/tsrht", ws.Serve) + router.GET("/tsrhtSecondary", ws.HandleSecondaryWsSessionForPlayer) apiRouter := router.Group("/api") { diff --git a/battle_srv/models/room.go b/battle_srv/models/room.go index 6c8c7ef..041456a 100644 --- a/battle_srv/models/room.go +++ b/battle_srv/models/room.go @@ -27,10 +27,11 @@ const ( UPSYNC_MSG_ACT_PLAYER_CMD = int32(2) UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK = int32(3) - DOWNSYNC_MSG_ACT_HB_REQ = int32(1) - DOWNSYNC_MSG_ACT_INPUT_BATCH = int32(2) - DOWNSYNC_MSG_ACT_BATTLE_STOPPED = int32(3) - DOWNSYNC_MSG_ACT_FORCED_RESYNC = int32(4) + DOWNSYNC_MSG_ACT_HB_REQ = int32(1) + DOWNSYNC_MSG_ACT_INPUT_BATCH = int32(2) + DOWNSYNC_MSG_ACT_BATTLE_STOPPED = int32(3) + DOWNSYNC_MSG_ACT_FORCED_RESYNC = int32(4) + DOWNSYNC_MSG_ACT_PEER_INPUT_BATCH = int32(5) DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START = int32(-1) DOWNSYNC_MSG_ACT_BATTLE_START = int32(0) @@ -116,10 +117,15 @@ type Room struct { * * Moreover, during the invocation of `PlayerSignalToCloseDict`, the `Player` instance is supposed to be deallocated (though not synchronously). */ - PlayerDownsyncSessionDict map[int32]*websocket.Conn - PlayerDownsyncChanDict map[int32](chan pb.InputsBufferSnapshot) + PlayerDownsyncSessionDict map[int32]*websocket.Conn + PlayerSignalToCloseDict map[int32]SignalToCloseConnCbType + PlayerDownsyncChanDict map[int32](chan pb.InputsBufferSnapshot) + + PlayerSecondaryDownsyncSessionDict map[int32]*websocket.Conn + PlayerSecondarySignalToCloseDict map[int32]SignalToCloseConnCbType + PlayerSecondaryDownsyncChanDict map[int32](chan pb.InputsBufferSnapshot) + PlayerActiveWatchdogDict map[int32](*Watchdog) - PlayerSignalToCloseDict map[int32]SignalToCloseConnCbType Score float32 State int32 Index int @@ -184,7 +190,7 @@ func (pR *Room) AddPlayerIfPossible(pPlayerFromDbInit *Player, session *websocke pR.PlayerSignalToCloseDict[playerId] = signalToCloseConnOfThisPlayer newWatchdog := NewWatchdog(ConstVals.Ws.WillKickIfInactiveFor, func() { Logger.Warn("Conn inactive watchdog triggered#1:", zap.Any("playerId", playerId), zap.Any("roomId", pR.Id), zap.Any("roomState", pR.State), zap.Any("roomEffectivePlayerCount", pR.EffectivePlayerCount)) - signalToCloseConnOfThisPlayer(Constants.RetCode.ActiveWatchdog, "") + pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.ActiveWatchdog) }) newWatchdog.Stop() pR.PlayerActiveWatchdogDict[playerId] = newWatchdog @@ -221,7 +227,7 @@ func (pR *Room) ReAddPlayerIfPossible(pTmpPlayerInstance *Player, session *webso pR.PlayerSignalToCloseDict[playerId] = signalToCloseConnOfThisPlayer pR.PlayerActiveWatchdogDict[playerId] = NewWatchdog(ConstVals.Ws.WillKickIfInactiveFor, func() { Logger.Warn("Conn inactive watchdog triggered#2:", zap.Any("playerId", playerId), zap.Any("roomId", pR.Id), zap.Any("roomState", pR.State), zap.Any("roomEffectivePlayerCount", pR.EffectivePlayerCount)) - signalToCloseConnOfThisPlayer(Constants.RetCode.ActiveWatchdog, "") + pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.ActiveWatchdog) }) // For ReAdded player the new watchdog starts immediately Logger.Warn("ReAddPlayerIfPossible finished.", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("joinIndex", pEffectiveInRoomPlayerInstance.JoinIndex), zap.Any("playerBattleState", pEffectiveInRoomPlayerInstance.BattleState), zap.Any("roomState", pR.State), zap.Any("roomEffectivePlayerCount", pR.EffectivePlayerCount), zap.Any("AckingFrameId", pEffectiveInRoomPlayerInstance.AckingFrameId), zap.Any("AckingInputFrameId", pEffectiveInRoomPlayerInstance.AckingInputFrameId), zap.Any("LastSentInputFrameId", pEffectiveInRoomPlayerInstance.LastSentInputFrameId)) @@ -482,7 +488,7 @@ func (pR *Room) StartBattle() { kickoffFrameJs := pR.RenderFrameBuffer.GetByFrameId(0).(*battle.RoomDownsyncFrame) pbKickOffRenderFrame := toPbRoomDownsyncFrame(kickoffFrameJs) pbKickOffRenderFrame.SpeciesIdList = pR.SpeciesIdList - pR.sendSafely(pbKickOffRenderFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_START, playerId, true) + pR.sendSafely(pbKickOffRenderFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_START, playerId, true, MAGIC_JOIN_INDEX_DEFAULT) } Logger.Info(fmt.Sprintf("In `battleMainLoop` for roomId=%v sent out kickoffFrame", pR.Id)) } @@ -509,7 +515,7 @@ func (pR *Room) StartBattle() { } } - downsyncLoop := func(playerId int32, player *Player, playerDownsyncChan chan pb.InputsBufferSnapshot) { + downsyncLoop := func(playerId int32, player *Player, playerDownsyncChan chan pb.InputsBufferSnapshot, playerSecondaryDownsyncChan chan pb.InputsBufferSnapshot) { defer func() { if r := recover(); r != nil { Logger.Error("downsyncLoop, recovery spot#1, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("panic", r)) @@ -517,19 +523,23 @@ func (pR *Room) StartBattle() { Logger.Info(fmt.Sprintf("The `downsyncLoop` for (roomId=%v, playerId=%v) is stopped@renderFrameId=%v", pR.Id, playerId, pR.RenderFrameId)) }() - Logger.Debug(fmt.Sprintf("Started downsyncLoop for (roomId: %d, playerId:%d, playerDownsyncChan:%p)", pR.Id, playerId, playerDownsyncChan)) + //Logger.Info(fmt.Sprintf("Started downsyncLoop for (roomId: %d, playerId:%d, playerDownsyncChan:%p)", pR.Id, playerId, playerDownsyncChan)) for { + nowBattleState := atomic.LoadInt32(&pR.State) + switch nowBattleState { + case RoomBattleStateIns.IDLE, RoomBattleStateIns.STOPPING_BATTLE_FOR_SETTLEMENT, RoomBattleStateIns.IN_SETTLEMENT, RoomBattleStateIns.IN_DISMISSAL: + Logger.Warn(fmt.Sprintf("Battle is not waiting/preparing/active for playerDownsyncChan for (roomId: %d, playerId:%d)", pR.Id, playerId)) + return + } + select { case inputsBufferSnapshot := <-playerDownsyncChan: - nowBattleState := atomic.LoadInt32(&pR.State) - switch nowBattleState { - case RoomBattleStateIns.IDLE, RoomBattleStateIns.STOPPING_BATTLE_FOR_SETTLEMENT, RoomBattleStateIns.IN_SETTLEMENT, RoomBattleStateIns.IN_DISMISSAL: - Logger.Warn(fmt.Sprintf("Battle is not waiting/preparing/active for playerDownsyncChan for (roomId: %d, playerId:%d)", pR.Id, playerId)) - return - } pR.downsyncToSinglePlayer(playerId, player, inputsBufferSnapshot.RefRenderFrameId, inputsBufferSnapshot.UnconfirmedMask, inputsBufferSnapshot.ToSendInputFrameDownsyncs, inputsBufferSnapshot.ShouldForceResync) //Logger.Info(fmt.Sprintf("Sent inputsBufferSnapshot(refRenderFrameId:%d, unconfirmedMask:%v) to for (roomId: %d, playerId:%d)#2", inputsBufferSnapshot.RefRenderFrameId, inputsBufferSnapshot.UnconfirmedMask, pR.Id, playerId)) + case inputsBufferSnapshot2 := <-playerSecondaryDownsyncChan: + pR.downsyncPeerInputFrameUpsyncToSinglePlayer(playerId, player, inputsBufferSnapshot2.ToSendInputFrameDownsyncs, inputsBufferSnapshot2.PeerJoinIndex) + //Logger.Info(fmt.Sprintf("Sent secondary inputsBufferSnapshot to for (roomId: %d, playerId:%d)#2", pR.Id, playerId)) default: } } @@ -542,7 +552,8 @@ func (pR *Room) StartBattle() { Each "playerDownsyncChan" stays alive through out the lifecycle of room instead of each "playerDownsyncSession", i.e. not closed or dereferenced upon disconnection. */ pR.PlayerDownsyncChanDict[playerId] = make(chan pb.InputsBufferSnapshot, pR.InputsBuffer.N) - go downsyncLoop(playerId, player, pR.PlayerDownsyncChanDict[playerId]) + pR.PlayerSecondaryDownsyncChanDict[playerId] = make(chan pb.InputsBufferSnapshot, pR.InputsBuffer.N) + go downsyncLoop(playerId, player, pR.PlayerDownsyncChanDict[playerId], pR.PlayerSecondaryDownsyncChanDict[playerId]) } pR.onBattlePrepare(func() { @@ -599,6 +610,16 @@ func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq) { inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player) if nil != inputsBufferSnapshot { pR.downsyncToAllPlayers(inputsBufferSnapshot) + } else { + // no new all-confirmed + toSendInputFrameDownsyncs := pR.cloneInputsBuffer(inputFrameUpsyncBatch[0].InputFrameId, inputFrameUpsyncBatch[len(inputFrameUpsyncBatch)-1].InputFrameId+1) + + inputsBufferSnapshot = &pb.InputsBufferSnapshot{ + ToSendInputFrameDownsyncs: toSendInputFrameDownsyncs, + PeerJoinIndex: player.JoinIndex, + } + //Logger.Info(fmt.Sprintf("OnBattleCmdReceived no new all-confirmed: roomId=%v, fromPlayerId=%v, forming peer broadcasting snapshot=%v", pR.Id, playerId, inputsBufferSnapshot)) + pR.broadcastPeerUpsyncForBetterPrediction(inputsBufferSnapshot) } } @@ -650,7 +671,7 @@ func (pR *Room) StopBattleForSettlement() { PlayersArr: toPbPlayers(pR.Players, false), CountdownNanos: -1, // TODO: Replace this magic constant! } - pR.sendSafely(&assembledFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_STOPPED, playerId, true) + pR.sendSafely(&assembledFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_STOPPED, playerId, true, MAGIC_JOIN_INDEX_DEFAULT) } // Note that `pR.onBattleStoppedForSettlement` will be called by `battleMainLoop`. } @@ -679,7 +700,7 @@ func (pR *Room) onBattlePrepare(cb BattleStartCbType) { Logger.Info("Sending out frame for RoomBattleState.PREPARE:", zap.Any("battleReadyToStartFrame", battleReadyToStartFrame)) for _, player := range pR.Players { - pR.sendSafely(battleReadyToStartFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START, player.Id, true) + pR.sendSafely(battleReadyToStartFrame, nil, DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START, player.Id, true, MAGIC_JOIN_INDEX_DEFAULT) } battlePreparationNanos := int64(6000000000) @@ -748,6 +769,7 @@ func (pR *Room) OnDismissed() { pR.CharacterConfigsArr = make([]*battle.CharacterConfig, pR.Capacity) pR.CollisionSysMap = make(map[int32]*resolv.Object) pR.PlayerDownsyncSessionDict = make(map[int32]*websocket.Conn) + pR.PlayerSecondaryDownsyncSessionDict = make(map[int32]*websocket.Conn) for _, oldWatchdog := range pR.PlayerActiveWatchdogDict { oldWatchdog.Stop() } @@ -756,7 +778,12 @@ func (pR *Room) OnDismissed() { close(oldChan) } pR.PlayerDownsyncChanDict = make(map[int32](chan pb.InputsBufferSnapshot)) + for _, oldChan := range pR.PlayerSecondaryDownsyncChanDict { + close(oldChan) + } + pR.PlayerSecondaryDownsyncChanDict = make(map[int32](chan pb.InputsBufferSnapshot)) pR.PlayerSignalToCloseDict = make(map[int32]SignalToCloseConnCbType) + pR.PlayerSecondarySignalToCloseDict = make(map[int32]SignalToCloseConnCbType) pR.JoinIndexBooleanArr = make([]bool, pR.Capacity) pR.RenderCacheSize = 1024 pR.RenderFrameBuffer = battle.NewRingBuffer(pR.RenderCacheSize) @@ -799,19 +826,24 @@ func (pR *Room) OnDismissed() { } func (pR *Room) expelPlayerDuringGame(playerId int32) { - if signalToCloseConnOfThisPlayer, existent := pR.PlayerSignalToCloseDict[playerId]; existent { - signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "") // TODO: Specify an error code - } + pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.UnknownError) pR.onPlayerExpelledDuringGame(playerId) } func (pR *Room) expelPlayerForDismissal(playerId int32) { - if signalToCloseConnOfThisPlayer, existent := pR.PlayerSignalToCloseDict[playerId]; existent { - signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "") // TODO: Specify an error code - } + pR.signalToCloseAllSessionsOfPlayer(playerId, Constants.RetCode.UnknownError) pR.onPlayerExpelledForDismissal(playerId) } +func (pR *Room) signalToCloseAllSessionsOfPlayer(playerId int32, retCode int) { + if signalToCloseConnOfThisPlayer, existent := pR.PlayerSignalToCloseDict[playerId]; existent { + signalToCloseConnOfThisPlayer(retCode, "") // TODO: Specify an error code + } + if signalToCloseConnOfThisPlayer2, existent2 := pR.PlayerSecondarySignalToCloseDict[playerId]; existent2 { + signalToCloseConnOfThisPlayer2(retCode, "") // TODO: Specify an error code + } +} + func (pR *Room) onPlayerExpelledDuringGame(playerId int32) { pR.onPlayerLost(playerId) } @@ -829,6 +861,10 @@ func (pR *Room) OnPlayerDisconnected(playerId int32) { } }() + if signalToCloseConnOfThisPlayer2, existent2 := pR.PlayerSecondarySignalToCloseDict[playerId]; existent2 { + signalToCloseConnOfThisPlayer2(Constants.RetCode.UnknownError, "") // TODO: Specify an error code + } + if player, existent := pR.Players[playerId]; existent { thatPlayerBattleState := atomic.LoadInt32(&(player.BattleState)) switch thatPlayerBattleState { @@ -888,6 +924,8 @@ func (pR *Room) clearPlayerNetworkSession(playerId int32) { delete(pR.PlayerActiveWatchdogDict, playerId) delete(pR.PlayerDownsyncSessionDict, playerId) delete(pR.PlayerSignalToCloseDict, playerId) + delete(pR.PlayerSecondaryDownsyncSessionDict, playerId) + delete(pR.PlayerSecondarySignalToCloseDict, playerId) } } @@ -977,7 +1015,7 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool { Logger.Debug(fmt.Sprintf("OnPlayerBattleColliderAcked-middle: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, thatPlayerId=%v, thatPlayerBattleState=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, thatPlayer.Id, thatPlayerBattleState)) if thatPlayerId == targetPlayer.Id || (PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK == thatPlayerBattleState || PlayerBattleStateIns.ACTIVE == thatPlayerBattleState) { Logger.Debug(fmt.Sprintf("OnPlayerBattleColliderAcked-sending DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED: roomId=%v, roomState=%v, targetPlayerId=%v, targetPlayerBattleState=%v, capacity=%v, EffectivePlayerCount=%v", pR.Id, pR.State, targetPlayer.Id, targetPlayer.BattleState, pR.Capacity, pR.EffectivePlayerCount)) - pR.sendSafely(playerAckedFrame, nil, DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED, thatPlayer.Id, true) + pR.sendSafely(playerAckedFrame, nil, DOWNSYNC_MSG_ACT_PLAYER_ADDED_AND_ACKED, thatPlayer.Id, true, MAGIC_JOIN_INDEX_DEFAULT) } } atomic.StoreInt32(&(targetPlayer.BattleState), PlayerBattleStateIns.ACTIVE) @@ -1010,28 +1048,43 @@ func (pR *Room) OnPlayerBattleColliderAcked(playerId int32) bool { return true } -func (pR *Room) sendSafely(roomDownsyncFrame *pb.RoomDownsyncFrame, toSendInputFrameDownsyncs []*pb.InputFrameDownsync, act int32, playerId int32, needLockExplicitly bool) { +func (pR *Room) sendSafely(roomDownsyncFrame *pb.RoomDownsyncFrame, toSendInputFrameDownsyncs []*pb.InputFrameDownsync, act int32, playerId int32, needLockExplicitly bool, peerJoinIndex int32) { defer func() { if r := recover(); r != nil { Logger.Error("sendSafely, recovered from: ", zap.Any("roomId", pR.Id), zap.Any("playerId", playerId), zap.Any("panic", r)) } }() - if playerDownsyncSession, existent := pR.PlayerDownsyncSessionDict[playerId]; existent { - pResp := &pb.WsResp{ - Ret: int32(Constants.RetCode.Ok), - Act: act, - Rdf: roomDownsyncFrame, - InputFrameDownsyncBatch: toSendInputFrameDownsyncs, - } + pResp := &pb.WsResp{ + Ret: int32(Constants.RetCode.Ok), + Act: act, + Rdf: roomDownsyncFrame, + InputFrameDownsyncBatch: toSendInputFrameDownsyncs, + PeerJoinIndex: peerJoinIndex, + } - theBytes, marshalErr := proto.Marshal(pResp) - if nil != marshalErr { - panic(fmt.Sprintf("Error marshaling downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount)) - } + theBytes, marshalErr := proto.Marshal(pResp) + if nil != marshalErr { + panic(fmt.Sprintf("Error marshaling downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount)) + } - if err := playerDownsyncSession.WriteMessage(websocket.BinaryMessage, theBytes); nil != err { - panic(fmt.Sprintf("Error sending downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v, err=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount, err)) + if MAGIC_JOIN_INDEX_DEFAULT == peerJoinIndex { + if playerDownsyncSession, existent := pR.PlayerDownsyncSessionDict[playerId]; existent { + if err := playerDownsyncSession.WriteMessage(websocket.BinaryMessage, theBytes); nil != err { + panic(fmt.Sprintf("Error sending primary downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v, err=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount, err)) + } + } + } else { + /* + [FIXME] + This branch is preferred to use an additional session of each player for sending, and the session is preferrably UDP instead of any TCP-based protocol, but I'm being lazy here. + + See `/ConcerningEdgeCases.md` for the advantage of using UDP as a supplement. + */ + if playerSecondaryDownsyncSession, existent := pR.PlayerSecondaryDownsyncSessionDict[playerId]; existent { + if err := playerSecondaryDownsyncSession.WriteMessage(websocket.BinaryMessage, theBytes); nil != err { + panic(fmt.Sprintf("Error sending secondary downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v, err=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount, err)) + } } } } @@ -1347,6 +1400,30 @@ func (pR *Room) doBattleMainLoopPerTickBackendDynamicsWithProperLocking(prevRend } } +func (pR *Room) broadcastPeerUpsyncForBetterPrediction(inputsBufferSnapshot *pb.InputsBufferSnapshot) { + // See `/ConcerningEdgeCases.md` for why this method exists. + for _, player := range pR.PlayersArr { + playerBattleState := atomic.LoadInt32(&(player.BattleState)) + switch playerBattleState { + case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL, PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK, PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK: + continue + } + if player.JoinIndex == inputsBufferSnapshot.PeerJoinIndex { + continue + } + + if playerSecondaryDownsyncChan, existent := pR.PlayerSecondaryDownsyncChanDict[player.Id]; existent { + /* + [FIXME] + This function is preferred to use an additional go-channel of each player for sending, see "downsyncLoop" & "Room.sendSafely" for more information! + */ + playerSecondaryDownsyncChan <- (*inputsBufferSnapshot) + } else { + Logger.Warn(fmt.Sprintf("playerDownsyncChan for (roomId: %d, playerId:%d) is gone", pR.Id, player.Id)) + } + } +} + func (pR *Room) downsyncToAllPlayers(inputsBufferSnapshot *pb.InputsBufferSnapshot) { /* [WARNING] This function MUST BE called while "pR.InputsBufferLock" is LOCKED to **preserve the order of generation of "inputsBufferSnapshot" for sending** -- see comments in "OnBattleCmdReceived" and [this issue](https://github.com/genxium/DelayNoMore/issues/12). @@ -1423,14 +1500,14 @@ func (pR *Room) downsyncToSinglePlayer(playerId int32, player *Player, refRender We hereby assume that Golang runtime allocates & frees small amount of RAM quickly enough compared to either network I/O blocking in worst cases or the high frequency "per inputFrameDownsync*player" locking (though "OnBattleCmdReceived" locks at the same frequency but it's inevitable). */ - playerJoinIndex := player.JoinIndex - 1 + playerJoinIndexInBooleanArr := player.JoinIndex - 1 playerBattleState := atomic.LoadInt32(&(player.BattleState)) switch playerBattleState { case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL, PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK, PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK: return } - isSlowTicker := (0 < (unconfirmedMask & uint64(1</ConcerningEdgeCases.md` for why this method exists. + if (null == batch) { + return; + } + const self = this; + if (!self.recentInputCache) { + return; + } + if (ALL_BATTLE_STATES.IN_SETTLEMENT == self.battleState) { + return; + } + + for (let k in batch) { + const inputFrameDownsync = batch[k]; + const inputFrameDownsyncId = inputFrameDownsync.inputFrameId; + if (inputFrameDownsyncId <= self.lastAllConfirmedInputFrameId) { + continue; + } + self.getOrPrefabInputFrameUpsync(inputFrameDownsyncId); // Make sure that inputFrame exists locally + const existingInputFrame = self.recentInputCache.GetByFrameId(inputFrameDownsyncId); + existingInputFrame.InputList[peerJoinIndex - 1] = inputFrameDownsync.inputList[peerJoinIndex - 1]; // No need to change "confirmedList", leave it to "onInputFrameDownsyncBatch" -- we're just helping prediction here + self.recentInputCache.SetByFrameId(existingInputFrame, inputFrameDownsyncId); + } + }, + onPlayerAdded(rdf /* pb.RoomDownsyncFrame */ ) { const self = this; // Update the "finding player" GUI and show it if not previously present diff --git a/frontend/assets/scripts/WsSessionMgr.js b/frontend/assets/scripts/WsSessionMgr.js index 7559608..76d409e 100644 --- a/frontend/assets/scripts/WsSessionMgr.js +++ b/frontend/assets/scripts/WsSessionMgr.js @@ -11,6 +11,7 @@ window.DOWNSYNC_MSG_ACT_HB_REQ = 1; window.DOWNSYNC_MSG_ACT_INPUT_BATCH = 2; window.DOWNSYNC_MSG_ACT_BATTLE_STOPPED = 3; window.DOWNSYNC_MSG_ACT_FORCED_RESYNC = 4; +window.DOWNSYNC_MSG_ACT_PEER_INPUT_BATCH = 5; window.sendSafely = function(msgStr) { /** @@ -65,6 +66,7 @@ window.handleHbRequirements = function(resp) { } if (window.handleBattleColliderInfo) { + window.initSecondarySession(null, window.boundRoomId); window.handleBattleColliderInfo(resp.bciFrame); } }; @@ -130,8 +132,6 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) { } } - const currentHistoryState = window.history && window.history.state ? window.history.state : {}; - const clientSession = new WebSocket(urlToConnect); clientSession.binaryType = 'arraybuffer'; // Make 'event.data' of 'onmessage' an "ArrayBuffer" instead of a "Blob" @@ -240,3 +240,56 @@ window.clearLocalStorageAndBackToLoginScene = function(shouldRetainBoundRoomIdIn cc.director.loadScene('login'); }; +// For secondary ws session +window.initSecondarySession = function(onopenCb, boundRoomId) { + if (window.secondarySession && window.secondarySession.readyState == WebSocket.OPEN) { + if (null != onopenCb) { + onopenCb(); + } + return; + } + + const selfPlayerStr = cc.sys.localStorage.getItem("selfPlayer"); + const selfPlayer = null == selfPlayerStr ? null : JSON.parse(selfPlayerStr); + const intAuthToken = null == selfPlayer ? "" : selfPlayer.intAuthToken; + + let urlToConnect = backendAddress.PROTOCOL.replace('http', 'ws') + '://' + backendAddress.HOST + ":" + backendAddress.PORT + "/tsrhtSecondary?isSecondary=true&intAuthToken=" + intAuthToken + "&boundRoomId=" + boundRoomId; + + const clientSession = new WebSocket(urlToConnect); + clientSession.binaryType = 'arraybuffer'; // Make 'event.data' of 'onmessage' an "ArrayBuffer" instead of a "Blob" + + clientSession.onopen = function(evt) { + console.warn("The secondary WS clientSession is opened."); + window.secondarySession = clientSession; + if (null == onopenCb) return; + onopenCb(); + }; + + clientSession.onmessage = function(evt) { + if (null == evt || null == evt.data) { + return; + } + try { + const resp = window.pb.protos.WsResp.decode(new Uint8Array(evt.data)); + //console.log(`Got non-empty onmessage decoded: resp.act=${resp.act}`); + switch (resp.act) { + case window.DOWNSYNC_MSG_ACT_PEER_INPUT_BATCH: + mapIns.onPeerInputFrameUpsync(resp.peerJoinIndex, resp.inputFrameDownsyncBatch); + break; + default: + break; + } + } catch (e) { + console.error("Secondary ws session, unexpected error when parsing data of:", evt.data, e); + } + }; + + clientSession.onerror = function(evt) { + console.error("Secondary ws session, error caught on the WS clientSession: ", evt); + }; + + clientSession.onclose = function(evt) { + // [WARNING] The callback "onclose" might be called AFTER the webpage is refreshed with "1001 == evt.code". + console.warn(`Secondary ws session is closed: evt=${JSON.stringify(evt)}, evt.code=${evt.code}`); + }; +}; diff --git a/frontend/assets/scripts/modules/room_downsync_frame_proto_bundle.forcemsg.js b/frontend/assets/scripts/modules/room_downsync_frame_proto_bundle.forcemsg.js index 5e41996..818085b 100644 --- a/frontend/assets/scripts/modules/room_downsync_frame_proto_bundle.forcemsg.js +++ b/frontend/assets/scripts/modules/room_downsync_frame_proto_bundle.forcemsg.js @@ -2360,6 +2360,7 @@ $root.protos = (function() { * @interface IInputFrameUpsync * @property {number|null} [inputFrameId] InputFrameUpsync inputFrameId * @property {number|Long|null} [encoded] InputFrameUpsync encoded + * @property {number|null} [joinIndex] InputFrameUpsync joinIndex */ /** @@ -2393,6 +2394,14 @@ $root.protos = (function() { */ InputFrameUpsync.prototype.encoded = $util.Long ? $util.Long.fromBits(0,0,true) : 0; + /** + * InputFrameUpsync joinIndex. + * @member {number} joinIndex + * @memberof protos.InputFrameUpsync + * @instance + */ + InputFrameUpsync.prototype.joinIndex = 0; + /** * Creates a new InputFrameUpsync instance using the specified properties. * @function create @@ -2421,6 +2430,8 @@ $root.protos = (function() { writer.uint32(/* id 1, wireType 0 =*/8).int32(message.inputFrameId); if (message.encoded != null && Object.hasOwnProperty.call(message, "encoded")) writer.uint32(/* id 2, wireType 0 =*/16).uint64(message.encoded); + if (message.joinIndex != null && Object.hasOwnProperty.call(message, "joinIndex")) + writer.uint32(/* id 3, wireType 0 =*/24).int32(message.joinIndex); return writer; }; @@ -2463,6 +2474,10 @@ $root.protos = (function() { message.encoded = reader.uint64(); break; } + case 3: { + message.joinIndex = reader.int32(); + break; + } default: reader.skipType(tag & 7); break; @@ -2504,6 +2519,9 @@ $root.protos = (function() { if (message.encoded != null && message.hasOwnProperty("encoded")) if (!$util.isInteger(message.encoded) && !(message.encoded && $util.isInteger(message.encoded.low) && $util.isInteger(message.encoded.high))) return "encoded: integer|Long expected"; + if (message.joinIndex != null && message.hasOwnProperty("joinIndex")) + if (!$util.isInteger(message.joinIndex)) + return "joinIndex: integer expected"; return null; }; @@ -2530,6 +2548,8 @@ $root.protos = (function() { message.encoded = object.encoded; else if (typeof object.encoded === "object") message.encoded = new $util.LongBits(object.encoded.low >>> 0, object.encoded.high >>> 0).toNumber(true); + if (object.joinIndex != null) + message.joinIndex = object.joinIndex | 0; return message; }; @@ -2553,6 +2573,7 @@ $root.protos = (function() { object.encoded = options.longs === String ? long.toString() : options.longs === Number ? long.toNumber() : long; } else object.encoded = options.longs === String ? "0" : 0; + object.joinIndex = 0; } if (message.inputFrameId != null && message.hasOwnProperty("inputFrameId")) object.inputFrameId = message.inputFrameId; @@ -2561,6 +2582,8 @@ $root.protos = (function() { object.encoded = options.longs === String ? String(message.encoded) : message.encoded; else object.encoded = options.longs === String ? $util.Long.prototype.toString.call(message.encoded) : options.longs === Number ? new $util.LongBits(message.encoded.low >>> 0, message.encoded.high >>> 0).toNumber(true) : message.encoded; + if (message.joinIndex != null && message.hasOwnProperty("joinIndex")) + object.joinIndex = message.joinIndex; return object; }; @@ -3513,6 +3536,7 @@ $root.protos = (function() { * @property {protos.RoomDownsyncFrame|null} [rdf] WsResp rdf * @property {Array.|null} [inputFrameDownsyncBatch] WsResp inputFrameDownsyncBatch * @property {protos.BattleColliderInfo|null} [bciFrame] WsResp bciFrame + * @property {number|null} [peerJoinIndex] WsResp peerJoinIndex */ /** @@ -3579,6 +3603,14 @@ $root.protos = (function() { */ WsResp.prototype.bciFrame = null; + /** + * WsResp peerJoinIndex. + * @member {number} peerJoinIndex + * @memberof protos.WsResp + * @instance + */ + WsResp.prototype.peerJoinIndex = 0; + /** * Creates a new WsResp instance using the specified properties. * @function create @@ -3616,6 +3648,8 @@ $root.protos = (function() { $root.protos.InputFrameDownsync.encode(message.inputFrameDownsyncBatch[i], writer.uint32(/* id 5, wireType 2 =*/42).fork()).ldelim(); if (message.bciFrame != null && Object.hasOwnProperty.call(message, "bciFrame")) $root.protos.BattleColliderInfo.encode(message.bciFrame, writer.uint32(/* id 6, wireType 2 =*/50).fork()).ldelim(); + if (message.peerJoinIndex != null && Object.hasOwnProperty.call(message, "peerJoinIndex")) + writer.uint32(/* id 7, wireType 0 =*/56).int32(message.peerJoinIndex); return writer; }; @@ -3676,6 +3710,10 @@ $root.protos = (function() { message.bciFrame = $root.protos.BattleColliderInfo.decode(reader, reader.uint32()); break; } + case 7: { + message.peerJoinIndex = reader.int32(); + break; + } default: reader.skipType(tag & 7); break; @@ -3739,6 +3777,9 @@ $root.protos = (function() { if (error) return "bciFrame." + error; } + if (message.peerJoinIndex != null && message.hasOwnProperty("peerJoinIndex")) + if (!$util.isInteger(message.peerJoinIndex)) + return "peerJoinIndex: integer expected"; return null; }; @@ -3780,6 +3821,8 @@ $root.protos = (function() { throw TypeError(".protos.WsResp.bciFrame: object expected"); message.bciFrame = $root.protos.BattleColliderInfo.fromObject(object.bciFrame); } + if (object.peerJoinIndex != null) + message.peerJoinIndex = object.peerJoinIndex | 0; return message; }; @@ -3804,6 +3847,7 @@ $root.protos = (function() { object.act = 0; object.rdf = null; object.bciFrame = null; + object.peerJoinIndex = 0; } if (message.ret != null && message.hasOwnProperty("ret")) object.ret = message.ret; @@ -3820,6 +3864,8 @@ $root.protos = (function() { } if (message.bciFrame != null && message.hasOwnProperty("bciFrame")) object.bciFrame = $root.protos.BattleColliderInfo.toObject(message.bciFrame, options); + if (message.peerJoinIndex != null && message.hasOwnProperty("peerJoinIndex")) + object.peerJoinIndex = message.peerJoinIndex; return object; }; @@ -3862,6 +3908,7 @@ $root.protos = (function() { * @property {number|Long|null} [unconfirmedMask] InputsBufferSnapshot unconfirmedMask * @property {Array.|null} [toSendInputFrameDownsyncs] InputsBufferSnapshot toSendInputFrameDownsyncs * @property {boolean|null} [shouldForceResync] InputsBufferSnapshot shouldForceResync + * @property {number|null} [peerJoinIndex] InputsBufferSnapshot peerJoinIndex */ /** @@ -3912,6 +3959,14 @@ $root.protos = (function() { */ InputsBufferSnapshot.prototype.shouldForceResync = false; + /** + * InputsBufferSnapshot peerJoinIndex. + * @member {number} peerJoinIndex + * @memberof protos.InputsBufferSnapshot + * @instance + */ + InputsBufferSnapshot.prototype.peerJoinIndex = 0; + /** * Creates a new InputsBufferSnapshot instance using the specified properties. * @function create @@ -3945,6 +4000,8 @@ $root.protos = (function() { $root.protos.InputFrameDownsync.encode(message.toSendInputFrameDownsyncs[i], writer.uint32(/* id 3, wireType 2 =*/26).fork()).ldelim(); if (message.shouldForceResync != null && Object.hasOwnProperty.call(message, "shouldForceResync")) writer.uint32(/* id 4, wireType 0 =*/32).bool(message.shouldForceResync); + if (message.peerJoinIndex != null && Object.hasOwnProperty.call(message, "peerJoinIndex")) + writer.uint32(/* id 5, wireType 0 =*/40).int32(message.peerJoinIndex); return writer; }; @@ -3997,6 +4054,10 @@ $root.protos = (function() { message.shouldForceResync = reader.bool(); break; } + case 5: { + message.peerJoinIndex = reader.int32(); + break; + } default: reader.skipType(tag & 7); break; @@ -4050,6 +4111,9 @@ $root.protos = (function() { if (message.shouldForceResync != null && message.hasOwnProperty("shouldForceResync")) if (typeof message.shouldForceResync !== "boolean") return "shouldForceResync: boolean expected"; + if (message.peerJoinIndex != null && message.hasOwnProperty("peerJoinIndex")) + if (!$util.isInteger(message.peerJoinIndex)) + return "peerJoinIndex: integer expected"; return null; }; @@ -4088,6 +4152,8 @@ $root.protos = (function() { } if (object.shouldForceResync != null) message.shouldForceResync = Boolean(object.shouldForceResync); + if (object.peerJoinIndex != null) + message.peerJoinIndex = object.peerJoinIndex | 0; return message; }; @@ -4114,6 +4180,7 @@ $root.protos = (function() { } else object.unconfirmedMask = options.longs === String ? "0" : 0; object.shouldForceResync = false; + object.peerJoinIndex = 0; } if (message.refRenderFrameId != null && message.hasOwnProperty("refRenderFrameId")) object.refRenderFrameId = message.refRenderFrameId; @@ -4129,6 +4196,8 @@ $root.protos = (function() { } if (message.shouldForceResync != null && message.hasOwnProperty("shouldForceResync")) object.shouldForceResync = message.shouldForceResync; + if (message.peerJoinIndex != null && message.hasOwnProperty("peerJoinIndex")) + object.peerJoinIndex = message.peerJoinIndex; return object; };