From e7bf6ec16bff5d2cf388dea72e221d8cf9ec288b Mon Sep 17 00:00:00 2001 From: genxium Date: Thu, 26 Jan 2023 20:19:45 +0800 Subject: [PATCH] Added basic input upsync via udp. --- frontend/assets/scripts/Map.js | 43 ++++++++++++++++++- frontend/assets/scripts/WsSessionMgr.js | 10 +++-- .../runtime-src/Classes/udp_session.cpp | 25 ++++++++--- .../runtime-src/Classes/udp_session.hpp | 3 +- .../Classes/udp_session_bridge.cpp | 36 ++++++++++++++-- .../Classes/udp_session_bridge.hpp | 1 + 6 files changed, 103 insertions(+), 15 deletions(-) diff --git a/frontend/assets/scripts/Map.js b/frontend/assets/scripts/Map.js index 06f955d..3c5e1a9 100644 --- a/frontend/assets/scripts/Map.js +++ b/frontend/assets/scripts/Map.js @@ -34,6 +34,44 @@ window.PlayerBattleState = { EXPELLED_IN_DISMISSAL: 6 }; +window.onUdpMessage = (args) => { + const self = window.mapIns; + const len = args.length; + const ui8Arr = new Uint8Array(len); + for (let i = 0; i < len; i++) { + ui8Arr[i] = args.charCodeAt(i); + } + cc.log(`#1 Js called back by CPP: onUdpMessage: args=${args}, typeof(args)=${typeof (args)}, argslen=${args.length}, ui8Arr=${ui8Arr}`); + + if (6 == len) { + cc.log(`#2 Js called back by CPP for peer hole punching`); + } else { + const req = window.pb.protos.WsReq.decode(ui8Arr); + if (req) { + cc.log(`#2 Js called back by CPP for upsync: onUdpMessage: ${JSON.stringify(req)}`); + if (req.act && window.UPSYNC_MSG_ACT_PLAYER_CMD == req.act) { + const renderedInputFrameIdUpper = gopkgs.ConvertToDelayedInputFrameId(self.renderFrameId); + const peerJoinIndex = req.joinIndex; + const batch = req.inputFrameUpsyncBatch; + for (let k in batch) { + const inputFrameUpsync = batch[k]; + if (inputFrameUpsync.inputFrameId < renderedInputFrameIdUpper) { + // Avoid obfuscating already rendered history + continue; + } + if (inputFrameUpsync.inputFrameId <= self.lastAllConfirmedInputFrameId) { + continue; + } + self.getOrPrefabInputFrameUpsync(inputFrameUpsync.inputFrameId); // Make sure that inputFrame exists locally + const existingInputFrame = self.recentInputCache.GetByFrameId(inputFrameUpsync.inputFrameId); + existingInputFrame.InputList[inputFrameUpsync.joinIndex - 1] = inputFrameUpsync.encoded; // No need to change "confirmedList", leave it to "onInputFrameDownsyncBatch" -- we're just helping prediction here + self.recentInputCache.SetByFrameId(existingInputFrame, inputFrameUpsync.inputFrameId); + } + } + } + } +}; + cc.Class({ extends: cc.Component, @@ -182,7 +220,7 @@ cc.Class({ */ if (null == currSelfInput) return false; - const shouldUpsyncForEarlyAllConfirmedOnBackend = (currInputFrameId - lastUpsyncInputFrameId >= this.inputFrameUpsyncDelayTolerance); + const shouldUpsyncForEarlyAllConfirmedOnBackend = (currInputFrameId - lastUpsyncInputFrameId >= 1); return shouldUpsyncForEarlyAllConfirmedOnBackend || (prevSelfInput != currSelfInput); }, @@ -218,6 +256,9 @@ cc.Class({ ackingInputFrameId: self.lastAllConfirmedInputFrameId, inputFrameUpsyncBatch: inputFrameUpsyncBatch, }).finish(); + if (cc.sys.isNative) { + DelayNoMore.UdpSession.broadcastInputFrameUpsync(reqData, window.boundRoomCapacity, self.selfPlayerInfo.JoinIndex); + } window.sendSafely(reqData); self.lastUpsyncInputFrameId = latestLocalInputFrameId; if (self.lastUpsyncInputFrameId >= self.recentInputCache.EdFrameId) { diff --git a/frontend/assets/scripts/WsSessionMgr.js b/frontend/assets/scripts/WsSessionMgr.js index 41c4fc6..4eaab39 100644 --- a/frontend/assets/scripts/WsSessionMgr.js +++ b/frontend/assets/scripts/WsSessionMgr.js @@ -44,7 +44,8 @@ window.getBoundRoomIdFromPersistentStorage = function() { window.clearBoundRoomIdInBothVolatileAndPersistentStorage(); return null; } - return cc.sys.localStorage.getItem("boundRoomId"); + const boundRoomIdStr = cc.sys.localStorage.getItem("boundRoomId"); + return (null == boundRoomIdStr ? null : parseInt(boundRoomIdStr)); }; window.getBoundRoomCapacityFromPersistentStorage = function() { @@ -53,7 +54,8 @@ window.getBoundRoomCapacityFromPersistentStorage = function() { window.clearBoundRoomIdInBothVolatileAndPersistentStorage(); return null; } - return cc.sys.localStorage.getItem("boundRoomCapacity"); + const boundRoomCapacityStr = cc.sys.localStorage.getItem("boundRoomCapacity"); + return (null == boundRoomCapacityStr ? null : parseInt(boundRoomCapacityStr)); }; window.clearBoundRoomIdInBothVolatileAndPersistentStorage = function() { @@ -219,9 +221,9 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) { if (cc.sys.isNative) { const peerJoinIndex = resp.peerJoinIndex; const peerAddrList = resp.rdf.peerUdpAddrList; + console.log(`Got DOWNSYNC_MSG_ACT_PEER_UDP_ADDR peerAddrList=${JSON.stringify(peerAddrList)}; boundRoomCapacity=${window.boundRoomCapacity}`); const peerAddr = peerAddrList[peerJoinIndex - 1]; - console.log(`Got DOWNSYNC_MSG_ACT_PEER_UDP_ADDR peerAddr=${peerAddr}; boundRoomCapacity=${window.boundRoomCapacity}, mapIns.selfPlayerInfo=${window.mapIns.selfPlayerInfo}`); - DelayNoMore.UdpSession.upsertPeerUdpAddr(peerJoinIndex, peerAddr.ip, peerAddr.port, peerAddr.authKey, window.boundRoomCapacity, window.mapIns.selfPlayerInfo.JoinIndex); + DelayNoMore.UdpSession.upsertPeerUdpAddr(peerJoinIndex, peerAddr.ip, peerAddr.port, peerAddr.authKey, window.boundRoomCapacity, window.mapIns.selfPlayerInfo.JoinIndex); // In C++ impl it actually broadcasts the peer-punching message to all known peers within "window.boundRoomCapacity" } break; default: diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp index d36b97f..d53e0d6 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp @@ -40,7 +40,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct soc memset(gameThreadMsg, 0, gameThreadMsgSize); memcpy(gameThreadMsg, buf->base, nread); - CCLOG("Recv %d bytes from %s:%d, converted to %d bytes for the JS callback", nread, ip, port, strlen(gameThreadMsg)); + CCLOG("UDP read %d bytes from %s:%d, converted to %d bytes for the JS callback", nread, ip, port, strlen(gameThreadMsg)); free(buf->base); //uv_udp_recv_stop(req); @@ -153,13 +153,13 @@ void _onSend(uv_udp_send_t* req, int status) { bool DelayNoMore::UdpSession::upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey, int roomCapacity, int selfJoinIndex) { CCLOG("upsertPeerUdpAddr called by js for joinIndex=%d, ip=%s, port=%d, authKey=%lu; roomCapacity=%d, selfJoinIndex=%d.", joinIndex, ip, port, authKey, roomCapacity, selfJoinIndex); - uv_ip4_addr(ip, port, &(peerAddrList[joinIndex - 1].sockAddrIn)); - peerAddrList[joinIndex - 1].authKey = authKey; // Punching between existing peer-pairs for Address/Port-restricted Cone NAT (not need for Full Cone NAT) uv_mutex_lock(&sendLock); for (int i = 0; i < roomCapacity; i++) { if (i == selfJoinIndex - 1) continue; + uv_ip4_addr(ip, port, &(peerAddrList[i].sockAddrIn)); + peerAddrList[i].authKey = authKey; for (int j = 0; j < 10; j++) { uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now @@ -171,7 +171,7 @@ bool DelayNoMore::UdpSession::upsertPeerUdpAddr(int joinIndex, CHARC* const ip, return true; } -bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes) { +bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen) { /* [WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller. @@ -181,7 +181,7 @@ bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPor SRV_PORT = srvPort; uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); - uv_buf_t sendBuffer = uv_buf_init(bytes, strlen(bytes)); + uv_buf_t sendBuffer = uv_buf_init(bytes, bytesLen); struct sockaddr_in destAddr; uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr); @@ -191,3 +191,18 @@ bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPor return true; } + +bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex) { + uv_mutex_lock(&sendLock); + for (int i = 0; i < roomCapacity; i++) { + if (i == selfJoinIndex - 1) continue; + for (int j = 0; j < 10; j++) { + uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); + uv_buf_t sendBuffer = uv_buf_init(bytes, bytesLen); + uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend); + } + } + uv_mutex_unlock(&sendLock); + + return true; +} \ No newline at end of file diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp index a397518..12e7c74 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp @@ -13,7 +13,8 @@ namespace DelayNoMore { static bool closeUdpSession(); static bool upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey, int roomCapacity, int selfJoinIndex); //static bool clearPeerUDPAddrList(); - static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes); + static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen); + static bool broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex); }; } #endif diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp index caeb05f..055c2b4 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp @@ -30,17 +30,44 @@ bool punchToServer(se::State& s) { memset(bytes, 0, sizeof bytes); se::Object* obj = args[2].toObject(); size_t sz = 0; - uint8_t* ptr; + uint8_t* ptr = NULL; obj->getTypedArrayData(&ptr, &sz); - memcpy(bytes, ptr, sz); - CCLOG("Should punch %s:%d by %d bytes v.s. strlen(bytes)=%u.", srvIp, srvPort, sz, strlen(bytes)); - return DelayNoMore::UdpSession::punchToServer(srvIp, srvPort, bytes); + for (size_t i = 0; i < sz; i++) { + bytes[i] = (char)(*(ptr + i)); + } + CCLOG("Should punch %s:%d by %d bytes.", srvIp, srvPort, sz); + return DelayNoMore::UdpSession::punchToServer(srvIp, srvPort, bytes, sz); } SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 3); return false; } SE_BIND_FUNC(punchToServer) +bool broadcastInputFrameUpsync(se::State& s) { + const auto& args = s.args(); + size_t argc = args.size(); + CC_UNUSED bool ok = true; + if (3 == argc && args[0].toObject()->isTypedArray() && args[1].isNumber() && args[2].isNumber()) { + SE_PRECONDITION2(ok, false, "broadcastInputFrameUpsync: Error processing arguments"); + BYTEC bytes[1024]; + memset(bytes, 0, sizeof bytes); + se::Object* obj = args[0].toObject(); + size_t sz = 0; + uint8_t* ptr = NULL; + obj->getTypedArrayData(&ptr, &sz); + for (size_t i = 0; i < sz; i++) { + bytes[i] = (char)(*(ptr + i)); + } + int roomCapacity = args[1].toInt32(); + int selfJoinIndex = args[2].toInt32(); + CCLOG("Should broadcastInputFrameUpsync %u bytes; roomCapacity=%d, selfJoinIndex=%d.", sz, roomCapacity, selfJoinIndex); + return DelayNoMore::UdpSession::broadcastInputFrameUpsync(bytes, sz, roomCapacity, selfJoinIndex); + } + SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 3); + return false; +} +SE_BIND_FUNC(broadcastInputFrameUpsync) + bool closeUdpSession(se::State& s) { const auto& args = s.args(); size_t argc = args.size(); @@ -106,6 +133,7 @@ bool registerUdpSession(se::Object* obj) cls->defineStaticFunction("openUdpSession", _SE(openUdpSession)); cls->defineStaticFunction("punchToServer", _SE(punchToServer)); + cls->defineStaticFunction("broadcastInputFrameUpsync", _SE(broadcastInputFrameUpsync)); cls->defineStaticFunction("closeUdpSession", _SE(closeUdpSession)); cls->defineStaticFunction("upsertPeerUdpAddr", _SE(upsertPeerUdpAddr)); cls->defineFinalizeFunction(_SE(udpSessionFinalize)); diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp index dee035d..b7c92d9 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp @@ -12,6 +12,7 @@ bool registerUdpSession(se::Object* obj); SE_DECLARE_FUNC(openUdpSession); SE_DECLARE_FUNC(punchToServer); +SE_DECLARE_FUNC(broadcastInputFrameUpsync); SE_DECLARE_FUNC(closeUdpSession); SE_DECLARE_FUNC(upsertPeerUdpAddr);