diff --git a/frontend/assets/scripts/Map.js b/frontend/assets/scripts/Map.js index 89a802d..53ad007 100644 --- a/frontend/assets/scripts/Map.js +++ b/frontend/assets/scripts/Map.js @@ -1084,6 +1084,9 @@ fromUDP=${fromUDP}`); } try { let st = performance.now(); + if (cc.sys.isNative) { + DelayNoMore.UdpSession.pollUdpRecvRingBuff(); + } const noDelayInputFrameId = gopkgs.ConvertToNoDelayInputFrameId(self.renderFrameId); let prevSelfInput = null, currSelfInput = null; diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.cpp index 7dd2f9c..052eee5 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.cpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.cpp @@ -1,6 +1,7 @@ #include #include "send_ring_buff.hpp" +// Sending void SendRingBuff::put(BYTEC* const newBytes, size_t newBytesLen, PeerAddr* pNewPeerAddr) { while (0 < cnt && cnt >= n) { // Make room for the new element @@ -29,3 +30,34 @@ SendWork* SendRingBuff::pop() { } return ret; } + +// Recving +void RecvRingBuff::put(char* newBytes, size_t newBytesLen) { + while (0 < cnt && cnt >= n) { + // Make room for the new element + this->pop(); + } + eles[ed].bytesLen = newBytesLen; + memset(eles[ed].ui8Arr, 0, sizeof eles[ed].ui8Arr); + for (int i = 0; i < newBytesLen; i++) { + *(eles[ed].ui8Arr + i) = *(newBytes + i); + } + ed++; + cnt++; + if (ed >= n) { + ed -= n; // Deliberately not using "%" operator for performance concern + } +} + +RecvWork* RecvRingBuff::pop() { + if (0 == cnt) { + return NULL; + } + RecvWork* ret = &(eles[st]); + cnt--; + st++; + if (st >= n) { + st -= n; + } + return ret; +} \ No newline at end of file diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp index a7a3758..90fbae6 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp @@ -41,4 +41,25 @@ public: SendWork* pop(); }; +// TODO: Move "RecvXxxx" to a dedicated class. +class RecvWork { +public: + uint8_t ui8Arr[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction + size_t bytesLen; +}; + +// [WARNING] This class is specific to "RecvWork" +class RecvRingBuff { +public: + int ed, st, n, cnt; + RecvWork eles[maxBuffedMsgs]; // preallocated on stack to save heap alloc/dealloc time + RecvRingBuff(int newN) { + this->n = newN; + this->st = this->ed = this->cnt = 0; + } + + void put(char* newBytes, size_t newBytesLen); + + RecvWork* pop(); +}; #endif diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp index fd8969d..7681014 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp @@ -16,11 +16,15 @@ uv_loop_t *recvLoop = NULL, *sendLoop = NULL; uv_mutex_t sendRingBuffLock; // used along with "uvSendLoopTriggerSig" as a "uv_cond_t" SendRingBuff* sendRingBuff = NULL; +uv_mutex_t recvRingBuffLock; +RecvRingBuff* recvRingBuff = NULL; + char SRV_IP[256]; int SRV_PORT = 0; int UDP_TUNNEL_SRV_PORT = 0; struct PeerAddr udpPunchingServerAddr, udpTunnelAddr; struct PeerAddr peerAddrList[maxPeerCnt]; +bool peerPunchedMarks[maxPeerCnt]; void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) { if (nread < 0) { @@ -29,6 +33,8 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr free(buf->base); return; } + struct sockaddr_in const* sockAddr = (struct sockaddr_in const*)addr; + #if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0) char ip[INET_ADDRSTRLEN]; memset(ip, 0, sizeof ip); @@ -38,10 +44,8 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr // The null check for "addr" is necessary, on Android there'd be such mysterious call to "_onRead"! switch (addr->sa_family) { case AF_INET: { - struct sockaddr_in const* sockAddr = (struct sockaddr_in const*)addr; uv_inet_ntop(sockAddr->sin_family, &(sockAddr->sin_addr), ip, INET_ADDRSTRLEN); port = ntohs(sockAddr->sin_port); - CCLOG("UDP received %u bytes from %s:%d", nread, ip, port); break; } default: @@ -53,37 +57,24 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr #endif if (6 == nread) { - // holepunching - } else if (0 < nread) { - // Non-holepunching; it might be more effective in RAM usage to use a threadsafe RingBuff to pass msg to GameThread here, but as long as it's not a performance blocker don't bother optimize here... - uint8_t* const ui8Arr = (uint8_t*)malloc(maxUdpPayloadBytes*sizeof(uint8_t)); - memset(ui8Arr, 0, sizeof(ui8Arr)); - for (int i = 0; i < nread; i++) { - *(ui8Arr+i) = *(buf->base + i); + // Peer holepunching + for (int i = 0; i < maxPeerCnt; i++) { + if (peerAddrList[i].sockAddrIn.sin_addr.s_addr != sockAddr->sin_addr.s_addr) continue; + if (peerAddrList[i].sockAddrIn.sin_port != sockAddr->sin_port) continue; + peerPunchedMarks[i] = true; +#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0) + CCLOG("UDP received peer-holepunching from %s:%d", ip, port); +#endif + break; } - cocos2d::Application::getInstance()->getScheduler()->performFunctionInCocosThread([=]() { - // [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"! - se::Value onUdpMessageCb; - se::ScriptEngine::getInstance()->getGlobalObject()->getProperty("onUdpMessage", &onUdpMessageCb); - // [WARNING] Declaring "AutoHandleScope" is critical here, otherwise "onUdpMessageCb.toObject()" wouldn't be recognized as a function of the ScriptEngine! - se::AutoHandleScope hs; - //CCLOG("UDP received %d bytes upsync -- 1", nread); - se::Object* const gameThreadMsg = se::Object::createTypedArray(se::Object::TypedArrayType::UINT8, ui8Arr, nread); - //CCLOG("UDP received %d bytes upsync -- 2", nread); - se::ValueArray args = { se::Value(gameThreadMsg) }; - if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) { - // Temporarily assume that the "this" ptr within callback is NULL. - bool ok = onUdpMessageCb.toObject()->call(args, NULL); - if (!ok) { - se::ScriptEngine::getInstance()->clearException(); - } - } - //CCLOG("UDP received %d bytes upsync -- 3", nread); - gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject - //CCLOG("UDP received %d bytes upsync -- 4", nread); - free(ui8Arr); - //CCLOG("UDP received %d bytes upsync -- 5", nread); - }); + } else if (0 < nread) { + // Non-holepunching; the previously used "cocos2d::Application::getInstance()->getScheduler()->performFunctionInCocosThread(...)" approach was so non-deterministic in terms of the lag till GameThread actually recognizes this latest received packet due to scheduler uncertainty -- and was also heavier in RAM due to lambda usage +#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0) + CCLOG("UDP received %u bytes inputFrameUpsync from %s:%d", nread, ip, port); +#endif + uv_mutex_lock(&recvRingBuffLock); + recvRingBuff->put(buf->base, nread); + uv_mutex_unlock(&recvRingBuffLock); } free(buf->base); @@ -169,6 +160,7 @@ void startRecvLoop(void* arg) { int uvCloseRet = uv_loop_close(l); CCLOG("UDP recv loop is closed in UvRecvThread, uvCloseRet=%d", uvCloseRet); + uv_mutex_destroy(&recvRingBuffLock); } void startSendLoop(void* arg) { @@ -196,6 +188,10 @@ int initSendLoop(struct sockaddr const* pUdpAddr) { } uv_mutex_init(&sendRingBuffLock); sendRingBuff = new SendRingBuff(maxBuffedMsgs); + + uv_mutex_init(&recvRingBuffLock); + recvRingBuff = new RecvRingBuff(maxBuffedMsgs); + uv_async_init(sendLoop, &uvSendLoopStopSig, _onUvStopSig); uv_async_init(sendLoop, &uvSendLoopTriggerSig, _onUvSthNewToSend); @@ -222,6 +218,12 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) { struct sockaddr_in udpAddr; uv_ip4_addr("0.0.0.0", port, &udpAddr); struct sockaddr const* pUdpAddr = (struct sockaddr const*)&udpAddr; + + memset(peerPunchedMarks, false, sizeof(peerPunchedMarks)); + for (int i = 0; i < maxPeerCnt; i++) { + peerAddrList[i].authKey = -1; // hardcoded for now + memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn)); + } /* [WARNING] On Android, the libuv documentation of "UV_UDP_REUSEADDR" is true, i.e. only the socket that binds later on the same port will be triggered the recv callback; however on Windows, experiment shows that the exact reverse is true instead. @@ -259,11 +261,8 @@ bool DelayNoMore::UdpSession::closeUdpSession() { uv_thread_join(&recvTid); free(udpRecvSocket); free(recvLoop); + delete recvRingBuff; - for (int i = 0; i < maxPeerCnt; i++) { - peerAddrList[i].authKey = -1; // hardcoded for now - memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn)); - } CCLOG("Closed udp session and dealloc all resources in GameThread..."); return true; @@ -321,8 +320,7 @@ bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size uv_mutex_lock(&sendRingBuffLock); // Might want to send several times for better arrival rate for (int j = 0; j < broadcastUpsyncCnt; j++) { - // Send to room udp tunnel in case of hole punching failure - sendRingBuff->put(bytes, bytesLen, &udpTunnelAddr); + int peerPunchedCnt = 0; for (int i = 0; i < roomCapacity; i++) { if (i + 1 == selfJoinIndex) { continue; @@ -331,8 +329,17 @@ bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size // Peer addr not initialized continue; } + if (false == peerPunchedMarks[i]) { + // Not punched yet, save some bandwidth + continue; + } + sendRingBuff->put(bytes, bytesLen, &(peerAddrList[i])); + ++peerPunchedCnt; + } - sendRingBuff->put(bytes, bytesLen, &(peerAddrList[i])); // Content hardcoded for now + if (peerPunchedCnt + 1 < roomCapacity) { + // Send to room udp tunnel in case of ANY hole punching failure + sendRingBuff->put(bytes, bytesLen, &udpTunnelAddr); } } @@ -341,3 +348,40 @@ bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size return true; } + +bool DelayNoMore::UdpSession::pollUdpRecvRingBuff() { + // This function is called by GameThread 60 fps. + + if (0 >= recvRingBuff->cnt) { + // This check is NOT thread-safe, but as "pollUdpRecvRingBuff" is called by GameThread, we want it to lock as few as possible. + return true; + } + + uv_mutex_lock(&recvRingBuffLock); + RecvWork* f = NULL; + while (0 < recvRingBuff->cnt) { + f = recvRingBuff->pop(); + // [WARNING] Declaring "AutoHandleScope" is critical here, otherwise "onUdpMessageCb.toObject()" wouldn't be recognized as a function of the ScriptEngine! + se::AutoHandleScope hs; + // [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"! + se::Value onUdpMessageCb; + se::ScriptEngine::getInstance()->getGlobalObject()->getProperty("onUdpMessage", &onUdpMessageCb); + if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) { + //CCLOG("UDP received %d bytes upsync -- 1", nread); + se::Object* const gameThreadMsg = se::Object::createTypedArray(se::Object::TypedArrayType::UINT8, f->ui8Arr, f->bytesLen); + //CCLOG("UDP received %d bytes upsync -- 2", nread); + se::ValueArray args = { se::Value(gameThreadMsg) }; + + // Temporarily assume that the "this" ptr within callback is NULL. + bool ok = onUdpMessageCb.toObject()->call(args, NULL); + if (!ok) { + se::ScriptEngine::getInstance()->clearException(); + } + //CCLOG("UDP received %d bytes upsync -- 3", nread); + gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject + //CCLOG("UDP received %d bytes upsync -- 4", nread); + } + } + uv_mutex_unlock(&recvRingBuffLock); + return true; +} \ No newline at end of file diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp index b073791..f9e2042 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp @@ -14,6 +14,7 @@ namespace DelayNoMore { //static bool clearPeerUDPAddrList(); static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen); static bool broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex); + static bool pollUdpRecvRingBuff(); }; } #endif diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp index 998fb49..3ece0a0 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp @@ -126,6 +126,20 @@ bool upsertPeerUdpAddr(se::State& s) { } SE_BIND_FUNC(upsertPeerUdpAddr) +bool pollUdpRecvRingBuff(se::State& s) { + const auto& args = s.args(); + size_t argc = args.size(); + CC_UNUSED bool ok = true; + if (0 == argc) { + SE_PRECONDITION2(ok, false, "upsertPeerUdpAddr: Error processing arguments"); + return DelayNoMore::UdpSession::pollUdpRecvRingBuff(); + } + SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 0); + + return false; +} +SE_BIND_FUNC(pollUdpRecvRingBuff) + static bool udpSessionFinalize(se::State& s) { CCLOGINFO("jsbindings: finalizing JS object %p (DelayNoMore::UdpSession)", s.nativeThisObject()); auto iter = se::NonRefNativePtrCreatedByCtorMap::find(s.nativeThisObject()); @@ -158,6 +172,7 @@ bool registerUdpSession(se::Object* obj) { cls->defineStaticFunction("broadcastInputFrameUpsync", _SE(broadcastInputFrameUpsync)); cls->defineStaticFunction("closeUdpSession", _SE(closeUdpSession)); cls->defineStaticFunction("upsertPeerUdpAddr", _SE(upsertPeerUdpAddr)); + cls->defineStaticFunction("pollUdpRecvRingBuff", _SE(pollUdpRecvRingBuff)); cls->defineFinalizeFunction(_SE(udpSessionFinalize)); cls->install(); diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp index b7c92d9..e0f3da1 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.hpp @@ -15,5 +15,5 @@ SE_DECLARE_FUNC(punchToServer); SE_DECLARE_FUNC(broadcastInputFrameUpsync); SE_DECLARE_FUNC(closeUdpSession); SE_DECLARE_FUNC(upsertPeerUdpAddr); - +SE_DECLARE_FUNC(pollUdpRecvRingBuff); #endif