From 2b6cb570508f127cf8caabda90ada781cf884e49 Mon Sep 17 00:00:00 2001 From: genxium Date: Mon, 30 Jan 2023 00:20:43 +0800 Subject: [PATCH] Enabled backend udp tunnel forwarding. --- battle_srv/models/player.go | 3 +- battle_srv/models/room.go | 38 +++++++++++++------ frontend/assets/scripts/Map.js | 1 + frontend/assets/scripts/WsSessionMgr.js | 1 + .../runtime-src/Classes/udp_session.cpp | 13 ++++++- 5 files changed, 43 insertions(+), 13 deletions(-) diff --git a/battle_srv/models/player.go b/battle_srv/models/player.go index 973385e..eb11448 100644 --- a/battle_srv/models/player.go +++ b/battle_srv/models/player.go @@ -8,6 +8,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "net" ) type PlayerBattleState struct { @@ -52,7 +53,7 @@ type Player struct { AckingInputFrameId int32 UdpAddr *PeerUdpAddr - BattleUdpTunnelAddr *PeerUdpAddr // This addr is used by backend only, not visible to frontend + BattleUdpTunnelAddr *net.UDPAddr // This addr is used by backend only, not visible to frontend BattleUdpTunnelAuthKey int32 } diff --git a/battle_srv/models/room.go b/battle_srv/models/room.go index 0168cad..a4c157f 100644 --- a/battle_srv/models/room.go +++ b/battle_srv/models/room.go @@ -159,7 +159,7 @@ type Room struct { rdfIdToActuallyUsedInput map[int32]*pb.InputFrameDownsync LastIndividuallyConfirmedInputList []uint64 - BattleUdpTunnelLock sync.Mutex // Guards "startBattleUdpTunnel" + BattleUdpTunnelLock sync.Mutex BattleUdpTunnelAddr *pb.PeerUdpAddr BattleUdpTunnel *net.UDPConn } @@ -327,8 +327,8 @@ func (pR *Room) InputsBufferString(allDetails bool) string { // Appending of the array of strings can be very SLOW due to on-demand heap allocation! Use this printing with caution. s := make([]string, 0) s = append(s, fmt.Sprintf("{renderFrameId: %v, stInputFrameId: %v, edInputFrameId: %v, lastAllConfirmedInputFrameIdWithChange: %v, lastAllConfirmedInputFrameId: %v}", pR.RenderFrameId, pR.InputsBuffer.StFrameId, pR.InputsBuffer.EdFrameId, pR.LastAllConfirmedInputFrameIdWithChange, pR.LastAllConfirmedInputFrameId)) - for playerId, player := range pR.PlayersArr { - s = append(s, fmt.Sprintf("{playerId: %v, ackingFrameId: %v, ackingInputFrameId: %v, lastSentInputFrameId: %v}", playerId, player.AckingFrameId, player.AckingInputFrameId, player.LastSentInputFrameId)) + for _, player := range pR.PlayersArr { + s = append(s, fmt.Sprintf("{playerId: %v, ackingFrameId: %v, ackingInputFrameId: %v, lastSentInputFrameId: %v}", player.Id, player.AckingFrameId, player.AckingInputFrameId, player.LastSentInputFrameId)) } for i := pR.InputsBuffer.StFrameId; i < pR.InputsBuffer.EdFrameId; i++ { tmp := pR.InputsBuffer.GetByFrameId(i) @@ -821,7 +821,7 @@ func (pR *Room) OnDismissed() { pR.RollbackEstimatedDtNanos = 16666666 // A little smaller than the actual per frame time, just for logging FAST FRAME dilutedServerFps := float64(58.0) // Don't set this value too small, otherwise we might miss force confirmation needs for slow tickers! pR.dilutedRollbackEstimatedDtNanos = int64(float64(pR.RollbackEstimatedDtNanos) * float64(serverFps) / dilutedServerFps) - pR.BattleDurationFrames = int32(5 * serverFps) + pR.BattleDurationFrames = int32(60 * serverFps) pR.BattleDurationNanos = int64(pR.BattleDurationFrames) * (pR.RollbackEstimatedDtNanos + 1) pR.InputFrameUpsyncDelayTolerance = battle.ConvertToNoDelayInputFrameId(pR.NstDelayFrames) - 1 // this value should be strictly smaller than (NstDelayFrames >> InputScaleFrames), otherwise "type#1 forceConfirmation" might become a lag avalanche pR.MaxChasingRenderFramesPerUpdate = 9 // Don't set this value too high to avoid exhausting frontend CPU within a single frame, roughly as the "turn-around frames to recover" is empirically OK @@ -1727,12 +1727,13 @@ func (pR *Room) startBattleUdpTunnel() { panic(err) } pReq := new(pb.WsReq) - if unmarshalErr := proto.Unmarshal(message[0:rlen], pReq); nil != unmarshalErr { + bytes := message[0:rlen] + if unmarshalErr := proto.Unmarshal(bytes, pReq); nil != unmarshalErr { Logger.Warn("`BattleUdpTunnel` for roomId=%d failed to unmarshal", zap.Error(unmarshalErr)) continue } playerId := pReq.PlayerId - Logger.Info(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d received decoded WsReq:", pR.Id), zap.Any("pReq", pReq)) + Logger.Info(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d received decoded WsReq:", pR.Id), zap.Any("pReq", pReq)) if player, exists1 := pR.Players[playerId]; exists1 { authKey := pReq.AuthKey if authKey != player.BattleUdpTunnelAuthKey { @@ -1740,12 +1741,27 @@ func (pR *Room) startBattleUdpTunnel() { continue } if _, existent := pR.PlayerDownsyncSessionDict[playerId]; existent { - player.UdpAddr = &pb.PeerUdpAddr{ - Ip: remote.IP.String(), - Port: int32(remote.Port), - AuthKey: pReq.AuthKey, - } + player.BattleUdpTunnelAddr = remote Logger.Info(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d updated battleUdpAddr for playerId=%d to be %s\n", pR.Id, playerId, remote)) + + nowBattleState := atomic.LoadInt32(&pR.State) + if RoomBattleStateIns.IN_BATTLE == nowBattleState { + batch := pReq.InputFrameUpsyncBatch + if nil != batch && 0 < len(batch) { + peerJoinIndex := pReq.JoinIndex + // Broadcast to every other player in the same room/battle + for _, otherPlayer := range pR.PlayersArr { + if otherPlayer.JoinIndex == peerJoinIndex { + continue + } + _, wrerr := conn.WriteTo(bytes, otherPlayer.BattleUdpTunnelAddr) + if nil != wrerr { + Logger.Warn(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d failed to forward upsync from (playerId:%d, joinIndex:%d, addr:%s) to (otherPlayerId:%d, otherPlayerJoinIndex:%d, otherPlayerAddr:%s)\n", pR.Id, playerId, peerJoinIndex, remote, otherPlayer.Id, otherPlayer.JoinIndex, otherPlayer.BattleUdpTunnelAddr)) + } + } + } + + } } else { Logger.Warn(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d received validated %d bytes for playerId=%d from %s, but primary downsync session for it doesn't exist\n", pR.Id, rlen, playerId, remote)) } diff --git a/frontend/assets/scripts/Map.js b/frontend/assets/scripts/Map.js index 2469258..7033c4a 100644 --- a/frontend/assets/scripts/Map.js +++ b/frontend/assets/scripts/Map.js @@ -257,6 +257,7 @@ cc.Class({ joinIndex: self.selfPlayerInfo.JoinIndex, ackingInputFrameId: self.lastAllConfirmedInputFrameId, inputFrameUpsyncBatch: inputFrameUpsyncBatch, + authKey: self.selfPlayerInfo.udpTunnelAuthKey, }).finish(); if (cc.sys.isNative) { DelayNoMore.UdpSession.broadcastInputFrameUpsync(reqData, window.boundRoomCapacity, self.selfPlayerInfo.JoinIndex); diff --git a/frontend/assets/scripts/WsSessionMgr.js b/frontend/assets/scripts/WsSessionMgr.js index 45e1f2f..af491a8 100644 --- a/frontend/assets/scripts/WsSessionMgr.js +++ b/frontend/assets/scripts/WsSessionMgr.js @@ -98,6 +98,7 @@ window.handleHbRequirements = function(resp) { } else { console.log(`Handle hb requirements #5, native, bciFrame.battleUdpTunnel=${resp.bciFrame.battleUdpTunnel}, selfPlayerInfo=${JSON.stringify(window.mapIns.selfPlayerInfo)}`); const res1 = DelayNoMore.UdpSession.openUdpSession(8888 + window.mapIns.selfPlayerInfo.JoinIndex); + window.mapIns.selfPlayerInfo.udpTunnelAuthKey = resp.bciFrame.battleUdpTunnel.authKey; const intAuthToken = window.mapIns.selfPlayerInfo.intAuthToken; const authKey = Math.floor(Math.random() * 65535); window.mapIns.selfPlayerInfo.authKey = authKey; diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp index 8676fd0..a2d1f4b 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp @@ -15,6 +15,7 @@ struct PeerAddr peerAddrList[maxPeerCnt]; char SRV_IP[256]; int SRV_PORT = 0; int UDP_TUNNEL_SRV_PORT = 0; +struct PeerAddr udpTunnelAddr; void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) { if (nread < 0) { @@ -145,6 +146,7 @@ void _punchServerOnUvThread(uv_work_t* wrapper) { uv_buf_t udpTunnelSendBuffer = uv_buf_init(work->udpTunnelBytes, work->udpTunnelBytesLen); struct sockaddr_in udpTunnelDestAddr; uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr); + udpTunnelAddr.sockAddrIn = udpTunnelDestAddr; uv_udp_send(udpTunnelReq, udpSocket, &udpTunnelSendBuffer, 1, (struct sockaddr const*)&udpTunnelDestAddr, _onSend); } } @@ -254,6 +256,14 @@ void _broadcastInputFrameUpsyncOnUvThread(uv_work_t* wrapper) { BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data; int roomCapacity = work->roomCapacity; int selfJoinIndex = work->selfJoinIndex; + // Send to room udp tunnel in case of hole punching failure + for (int j = 0; j < broadcastUpsyncCnt; j++) { + uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); + uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen); + uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(udpTunnelAddr.sockAddrIn), _onSend); + CCLOG("UDP sent upsync to udp tunnel %s:%d by %u bytes round-%d", SRV_IP, UDP_TUNNEL_SRV_PORT, work->bytesLen, j); + } + for (int i = 0; i < roomCapacity; i++) { if (i + 1 == selfJoinIndex) { continue; @@ -268,10 +278,11 @@ void _broadcastInputFrameUpsyncOnUvThread(uv_work_t* wrapper) { for (int j = 0; j < broadcastUpsyncCnt; j++) { uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen); - uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend); + uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(peerAddrList[i].sockAddrIn), _onSend); CCLOG("UDP broadcasted upsync to peer %s:%d by %u bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), work->bytesLen, j); } } + } void _afterBroadcastInputFrameUpsync(uv_work_t* wrapper, int status) { BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;