From 70a86c27b0ebe62692939c86384d645402453a96 Mon Sep 17 00:00:00 2001
From: genxium
Date: Thu, 2 Feb 2023 19:16:25 +0800
Subject: [PATCH] Enhancement for libuv thread safety.

---
 .../jsb-link/cocos-project-template.json      |   3 +
 .../runtime-src/Classes/send_ring_buff.hpp    |  69 +++
 .../runtime-src/Classes/udp_session.cpp       | 392 +++++++-----------
 .../runtime-src/Classes/udp_session.hpp       |  11 +-
 .../Classes/udp_session_bridge.cpp            |   6 +-
 .../proj.win32/DelayNoMore.vcxproj            |   3 +-
 .../proj.win32/DelayNoMore.vcxproj.filters    |   5 +-
 7 files changed, 237 insertions(+), 252 deletions(-)
 create mode 100644 frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp

diff --git a/frontend/build-templates/jsb-link/cocos-project-template.json b/frontend/build-templates/jsb-link/cocos-project-template.json
index 183cbd0..8744f8a 100644
--- a/frontend/build-templates/jsb-link/cocos-project-template.json
+++ b/frontend/build-templates/jsb-link/cocos-project-template.json
@@ -17,6 +17,9 @@
     "append_file": [{
         "from": "cocos/scripting/js-bindings/manual/jsb_module_register.cpp",
         "to": "frameworks/runtime-src/Classes/jsb_module_register.cpp"
+    }, {
+        "from": "frameworks/runtime-src/Classes/send_ring_buff.hpp",
+        "to": "frameworks/runtime-src/Classes/send_ring_buff.hpp"
     }, {
         "from": "frameworks/runtime-src/Classes/udp_session.hpp",
         "to": "frameworks/runtime-src/Classes/udp_session.hpp"
diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp
new file mode 100644
index 0000000..81c3854
--- /dev/null
+++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/send_ring_buff.hpp
@@ -0,0 +1,69 @@
+#ifndef send_ring_buff_hpp
+#define send_ring_buff_hpp
+
+#include "uv/uv.h"
+#define __SSIZE_T // Otherwise "ssize_t" would hit a conflicting-macro error that stops compilation
+
+int const RING_BUFF_CONSECUTIVE_SET = 0;
+int const RING_BUFF_NON_CONSECUTIVE_SET = 1;
+int const RING_BUFF_FAILED_TO_SET = 2;
+
+typedef char BYTEC;
+typedef char const CHARC;
+int const maxUdpPayloadBytes = 128;
+int const maxBuffedMsgs = 512;
+
+struct PeerAddr {
+    struct sockaddr_in sockAddrIn;
+    uint32_t authKey;
+};
+
+class SendWork {
+public:
+    BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here, thus no need for explicit recursive destruction
+    size_t bytesLen;
+    PeerAddr peerAddr;
+};
+
+// [WARNING] This class is specific to "SendWork"; it's designed and implemented only for use in a multithreaded environment, to save the time complexity of heap alloc/dealloc. It's by no means comparable to the Golang or JavaScript versions!
+class SendRingBuffer {
+public:
+    int ed, st, n, cnt;
+    SendWork eles[maxBuffedMsgs]; // preallocated on stack to save heap alloc/dealloc time
+    SendRingBuffer(int newN) {
+        this->n = newN;
+        this->st = this->ed = this->cnt = 0;
+    }
+
+    void put(BYTEC* const newBytes, size_t newBytesLen, PeerAddr* pNewPeerAddr) {
+        while (0 < cnt && cnt >= n) {
+            // Make room for the new element
+            this->pop();
+        }
+        eles[ed].bytesLen = newBytesLen;
+        memset(eles[ed].bytes, 0, sizeof eles[ed].bytes);
+        memcpy(eles[ed].bytes, newBytes, newBytesLen);
+        eles[ed].peerAddr = *(pNewPeerAddr);
+        ed++;
+        cnt++;
+        if (ed >= n) {
+            ed -= n; // Deliberately not using the "%" operator, out of performance concerns
+        }
+    }
+
+    // Sending is always sequential in UvSendThread, so there's no need to return a copy of the "SendWork" instance
+    SendWork* pop() {
+        if (0 == cnt) {
+            return NULL;
+        }
+        SendWork* ret = &(eles[st]);
+        cnt--;
+        st++;
+        if (st >= n) {
+            st -= n;
+        }
+        return ret;
+    }
+};
+
+#endif
\ No newline at end of file
diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp
index 4c3e02c..c2c60b1 100644
--- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp
+++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp
@@ -4,18 +4,23 @@
 #include "cocos/base/CCScheduler.h"
 #include "cocos/scripting/js-bindings/jswrapper/SeApi.h"
 
-uv_udp_t* udpSocket = NULL;
-uv_thread_t recvTid;
-uv_timer_t peerPunchTimer;
-uv_async_t uvLoopStopSig;
-uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC
+int const punchServerCnt = 3;
+int const punchPeerCnt = 3;
+int const broadcastUpsyncCnt = 3;
 
-struct PeerAddr peerAddrList[maxPeerCnt];
+uv_udp_t *udpRecvSocket = NULL, *udpSendSocket = NULL;
+uv_thread_t recvTid, sendTid;
+uv_async_t uvRecvLoopStopSig, uvSendLoopStopSig, uvSendLoopTriggerSig;
+uv_loop_t *recvLoop = NULL, *sendLoop = NULL;
+
+uv_mutex_t sendRingBuffLock; // used along with "uvSendLoopTriggerSig" as a "uv_cond_t"
+SendRingBuffer* sendRingBuff = NULL;
 
 char SRV_IP[256];
 int SRV_PORT = 0;
 int UDP_TUNNEL_SRV_PORT = 0;
-struct PeerAddr udpTunnelAddr;
+struct PeerAddr udpPunchingServerAddr, udpTunnelAddr;
+struct PeerAddr peerAddrList[maxPeerCnt];
 
 void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) {
     if (nread < 0) {
@@ -24,6 +29,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
         free(buf->base);
         return;
     }
+#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
     char ip[INET_ADDRSTRLEN];
     memset(ip, 0, sizeof ip);
     int port = 0;
@@ -44,6 +50,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
     } else {
         CCLOG("UDP received %d bytes from unknown sender", nread);
     }
+#endif
 
     if (6 == nread) {
         // holepunching
@@ -75,8 +82,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
             gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject
             //CCLOG("UDP received %d bytes upsync -- 4", nread);
             free(ui8Arr);
-            CCLOG("UDP received %d bytes upsync -- 5", nread);
-
+            //CCLOG("UDP received %d bytes upsync -- 5", nread);
         });
     }
     free(buf->base);
@@ -95,198 +101,57 @@ static void _allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
 }
 
 void _onUvStopSig(uv_async_t* handle) {
-    uv_stop(loop);
-    CCLOG("UDP recv loop is signaled to stop in UvThread");
+    if (!handle) return;
+    uv_stop(handle->loop);
+    CCLOG("UDP loop %p is signaled to stop in UvXxxxThread", handle->loop);
 }
 
-void _onSend(uv_udp_send_t* req, int status) {
-    CCLOG("UDP send about to free req for status:%d...", status);
-    free(req); // No need to free "req->base", it'll be handled in each "_afterXxx" callback
-    CCLOG("UDP send freed req for status:%d...", status);
+void _afterSend(uv_udp_send_t* req, int status) {
+    if (req) {
+        free(req);
+    }
     if (status) {
         CCLOGERROR("uv_udp_send_cb error: %s\n", uv_strerror(status));
     }
 }
 
-void _onUvTimerClosed(uv_handle_t* timer) {
-    free(timer);
-}
-
-int const punchServerCnt = 3;
-class PunchServerWork {
-public:
-    BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
-    size_t bytesLen;
-
-    BYTEC udpTunnelBytes[maxUdpPayloadBytes];
-    size_t udpTunnelBytesLen;
-
-    PunchServerWork(BYTEC* const newBytes, size_t newBytesLen, BYTEC* const newUdpTunnelBytes, size_t newUdpTunnelBytesLen) {
-        memset(this->bytes, 0, sizeof(this->bytes));
-        memcpy(this->bytes, newBytes, newBytesLen);
-
-        this->bytesLen = newBytesLen;
-
-        memset(this->udpTunnelBytes, 0, sizeof(this->udpTunnelBytes));
-        memcpy(this->udpTunnelBytes, newUdpTunnelBytes, newUdpTunnelBytesLen);
-
-        this->udpTunnelBytesLen = newUdpTunnelBytesLen;
-    }
-};
-void _punchServerOnUvThread(uv_work_t* wrapper) {
-    PunchServerWork* work = (PunchServerWork*)wrapper->data;
-    for (int i = 0; i < punchServerCnt; i++) {
-        uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-        uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
-        struct sockaddr_in destAddr;
-        uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr);
-        uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
-
-        uv_udp_send_t* udpTunnelReq = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-        uv_buf_t udpTunnelSendBuffer = uv_buf_init(work->udpTunnelBytes, work->udpTunnelBytesLen);
-        struct sockaddr_in udpTunnelDestAddr;
-        uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr);
-        udpTunnelAddr.sockAddrIn = udpTunnelDestAddr;
-        uv_udp_send(udpTunnelReq, udpSocket, &udpTunnelSendBuffer, 1, (struct sockaddr const*)&udpTunnelDestAddr, _onSend);
-    }
-}
-void _afterPunchServer(uv_work_t* wrapper, int status) {
-    CCLOG("UDP send about to free PunchServerWork for status:%d...", status);
-    PunchServerWork* work = (PunchServerWork*)wrapper->data;
-    delete work;
-    CCLOG("UDP freed PunchServerWork for status:%d...", status);
-}
-
-class PunchPeerWork {
-public:
-    int roomCapacity;
-    int selfJoinIndex;
-    int naiveRefCnt;
-    PunchPeerWork(int newRoomCapacity, int newSelfJoinIndex) {
-        this->roomCapacity = newRoomCapacity;
-        this->selfJoinIndex = newSelfJoinIndex;
-        this->naiveRefCnt = 0;
-    }
-    void refInc() {
-        ++this->naiveRefCnt;
-    }
-    void refDecAndDelIfZero() {
-        --this->naiveRefCnt;
-        if (0 >= this->naiveRefCnt) {
-            delete this;
-        }
-    }
-    virtual ~PunchPeerWork() {
-        CCLOG("PunchPeerWork instance deleted...");
-    }
-};
-void _punchPeerOnUvThreadDelayed(uv_timer_t* timer, int status) {
-    //CCLOG("_punchPeerOnUvThreadDelayed started...");
-    PunchPeerWork* work = (PunchPeerWork*)timer->data;
-    int roomCapacity = work->roomCapacity;
-    int selfJoinIndex = work->selfJoinIndex;
-
-    for (int i = 0; i < roomCapacity; i++) {
-        if (i + 1 == selfJoinIndex) {
-            continue;
-        }
-        if (0 == peerAddrList[i].sockAddrIn.sin_port) {
-            // Peer addr not initialized
-            continue;
-        }
-        //CCLOG("UDP about to punch peer joinIndex:%d", i);
-        char peerIp[17] = { 0 };
-        uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
-        int peerPortSt = ntohs(peerAddrList[i].sockAddrIn.sin_port);
-        int peerPortEd = ntohs(peerAddrList[i].sockAddrIn.sin_port) + 1; // Use tunnel of backend instead of sweeping ports blindly!
-        for (int peerPort = peerPortSt; peerPort < peerPortEd; peerPort++) {
-            if (0 > peerPort) continue;
-            uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-            uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now
-            struct sockaddr_in testPeerAddr;
-            uv_ip4_addr(peerIp, peerPort, &testPeerAddr);
-            uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&testPeerAddr, _onSend);
-            CCLOG("UDP punched peer %s:%d by 6 bytes", peerIp, peerPort);
-        }
-    }
-    uv_timer_stop(timer);
-    uv_close((uv_handle_t*)timer, _onUvTimerClosed);
-    //CCLOG("_punchPeerOnUvThreadDelayed stopped...");
-    work->refDecAndDelIfZero();
-}
-int const punchPeerCnt = 3;
-void _startPunchPeerTimerOnUvThread(uv_work_t* wrapper) {
-    PunchPeerWork* work = (PunchPeerWork*)wrapper->data;
-    int roomCapacity = work->roomCapacity;
-    int selfJoinIndex = work->selfJoinIndex;
-
-    for (int j = 0; j < punchPeerCnt; j++) {
-        work->refInc();
-    }
-    for (int j = 0; j < punchPeerCnt; j++) {
-        uv_timer_t* punchTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); // I don't think libuv timer is safe to be called from GameThread, thus calling it within UvThread here
-        uv_timer_init(loop, punchTimer);
-        punchTimer->data = work;
-        uv_timer_start(punchTimer, (uv_timer_cb)&_punchPeerOnUvThreadDelayed, j * 500, 0);
-    }
-}
-void _afterPunchPeerTimerStarted(uv_work_t* wrapper, int status) {
-    // RAM of PunchPeerWork handled by "naiveRefCnt"
-}
-
-class BroadcastInputFrameUpsyncWork {
-public:
-    BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
-    size_t bytesLen;
-    int roomCapacity;
-    int selfJoinIndex;
-
-    BroadcastInputFrameUpsyncWork(BYTEC* const newBytes, size_t newBytesLen, int newRoomCapacity, int newSelfJoinIndex) {
-        memset(this->bytes, 0, sizeof(this->bytes));
-        memcpy(this->bytes, newBytes, newBytesLen);
-
-        this->bytesLen = newBytesLen;
-
-        this->roomCapacity = newRoomCapacity;
-        this->selfJoinIndex = newSelfJoinIndex;
-    }
-};
-int const broadcastUpsyncCnt = 1;
-void _broadcastInputFrameUpsyncOnUvThread(uv_work_t* wrapper) {
-    BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
-    int roomCapacity = work->roomCapacity;
-    int selfJoinIndex = work->selfJoinIndex;
-    // Send to room udp tunnel in case of hole punching failure
-    for (int j = 0; j < broadcastUpsyncCnt; j++) {
-        uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-        uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
-        uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(udpTunnelAddr.sockAddrIn), _onSend);
-        CCLOG("UDP sent upsync to udp tunnel %s:%d by %u bytes round-%d", SRV_IP, UDP_TUNNEL_SRV_PORT, work->bytesLen, j);
-    }
+void _onUvSthNewToSend(uv_async_t* handle) {
 
-    for (int i = 0; i < roomCapacity; i++) {
-        if (i + 1 == selfJoinIndex) {
-            continue;
+    bool hasNext = true;
+    while (NULL != handle && true == hasNext) {
+        SendWork* work = NULL;
+        uv_mutex_lock(&sendRingBuffLock);
+        work = sendRingBuff->pop();
+
+        if (NULL == work) {
+            hasNext = false;
         }
-        if (0 == peerAddrList[i].sockAddrIn.sin_port) {
-            // Peer addr not initialized
-            continue;
-        }
-        char peerIp[17] = { 0 };
-        uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
-        // Might want to send several times for better arrival rate
-        for (int j = 0; j < broadcastUpsyncCnt; j++) {
-            uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
+        /*
+        [WARNING] The following "uv_udp_try_send" might block on I/O for a long time, hence we unlock "as soon as possible" to avoid blocking the "GameThread", which might be waiting to acquire this mutex!
+
+        There's a very small chance that "sendRingBuff->put(...)" could contaminate the just-popped "work" in "sendRingBuff->eles"; "sendRingBuff->n" is made quite large to avoid that, and in terms of protecting "work" we also unlock "as late as possible"!
+        */
+        uv_mutex_unlock(&sendRingBuffLock);
+        if (NULL != work) {
+            /*
+            // [WARNING] If "uv_udp_send" were used instead of "uv_udp_try_send", then because UvSendThread is always terminated from GameThread, it's a MUST to use the following heap-alloc form to initialize "uv_udp_send_t* req" such that "_afterSend" is guaranteed to be called; otherwise "int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);" for UvSendThread would block forever due to residual active handles.
+
+            uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
             uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
-            uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(peerAddrList[i].sockAddrIn), _onSend);
-            CCLOG("UDP broadcasted upsync to peer %s:%d by %u bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), work->bytesLen, j);
+            uv_udp_send(req, udpSendSocket, &sendBuffer, 1, (struct sockaddr const*)&(work->peerAddr.sockAddrIn), _afterSend);
+            */
+
+            uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
+            uv_udp_try_send(udpSendSocket, &sendBuffer, 1, (struct sockaddr const*)&(work->peerAddr.sockAddrIn));
+#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
+            char ip[INET_ADDRSTRLEN];
+            memset(ip, 0, sizeof ip);
+            uv_inet_ntop(work->peerAddr.sockAddrIn.sin_family, &(work->peerAddr.sockAddrIn.sin_addr), ip, INET_ADDRSTRLEN);
+            int port = ntohs(work->peerAddr.sockAddrIn.sin_port);
+            //CCLOG("UDP sent %d bytes to %s:%d", sendBuffer.len, ip, port);
+#endif
         }
     }
-
-}
-void _afterBroadcastInputFrameUpsync(uv_work_t* wrapper, int status) {
-    BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
-    delete work;
 }
 
 void _onWalkCleanup(uv_handle_t* handle, void* data) {
@@ -295,38 +160,67 @@ void _onWalkCleanup(uv_handle_t* handle, void* data) {
 }
 
 void startRecvLoop(void* arg) {
+    uv_udp_recv_start(udpRecvSocket, _allocBuffer, _onRead);
+
     uv_loop_t* l = (uv_loop_t*)arg;
     int uvRunRet1 = uv_run(l, UV_RUN_DEFAULT);
-    CCLOG("UDP recv loop is ended in UvThread, uvRunRet1=%d", uvRunRet1);
+    CCLOG("UDP recv loop is ended in UvRecvThread, uvRunRet1=%d", uvRunRet1);
 
     uv_walk(l, _onWalkCleanup, NULL);
+    CCLOG("UDP recv loop is walked in UvRecvThread");
     int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);
+    CCLOG("UDP recv loop is run after walking in UvRecvThread, uvRunRet2=%d", uvRunRet2);
 
     int uvCloseRet = uv_loop_close(l);
-    CCLOG("UDP recv loop is closed in UvThread, uvRunRet2=%d, uvCloseRet=%d", uvRunRet2, uvCloseRet);
+    CCLOG("UDP recv loop is closed in UvRecvThread, uvCloseRet=%d", uvCloseRet);
+}
+
+void startSendLoop(void* arg) {
+    uv_loop_t* l = (uv_loop_t*)arg;
+    int uvRunRet1 = uv_run(l, UV_RUN_DEFAULT);
+    CCLOG("UDP send loop is ended in UvSendThread, uvRunRet1=%d", uvRunRet1);
+    uv_walk(l, _onWalkCleanup, NULL);
+    CCLOG("UDP send loop is walked in UvSendThread");
+    int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);
+    CCLOG("UDP send loop is run after walking in UvSendThread, uvRunRet2=%d", uvRunRet2);
+
+    int uvCloseRet = uv_loop_close(l);
+    CCLOG("UDP send loop is closed in UvSendThread, uvCloseRet=%d", uvCloseRet);
+    uv_mutex_destroy(&sendRingBuffLock);
 }
 
 bool DelayNoMore::UdpSession::openUdpSession(int port) {
-    loop = uv_loop_new();
-    udpSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
+    recvLoop = uv_loop_new();
+    udpRecvSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
 
-    int sockInitRes = uv_udp_init(loop, udpSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
+    int recvSockInitRes = uv_udp_init(recvLoop, udpRecvSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
 
-    struct sockaddr_in recv_addr;
-    uv_ip4_addr("0.0.0.0", port, &recv_addr);
-    int bindRes = uv_udp_bind(udpSocket, (struct sockaddr const*)&recv_addr, UV_UDP_REUSEADDR);
-    if (0 != bindRes) {
-        CCLOGERROR("Failed to bind port=%d; bind result=%d, reason=%s", port, bindRes, uv_strerror(bindRes));
+    struct sockaddr_in udpAddr;
+    uv_ip4_addr("0.0.0.0", port, &udpAddr);
+    int bindRes1 = uv_udp_bind(udpRecvSocket, (struct sockaddr const*)&udpAddr, UV_UDP_REUSEADDR);
+    if (0 != bindRes1) {
+        CCLOGERROR("Failed to bind recv on port=%d; result=%d, reason=%s", port, bindRes1, uv_strerror(bindRes1));
         exit(-1);
     }
-    uv_async_init(loop, &uvLoopStopSig, _onUvStopSig);
+    sendLoop = uv_loop_new();
+    udpSendSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
+    int sendSockInitRes = uv_udp_init(sendLoop, udpSendSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
+    int bindRes2 = uv_udp_bind(udpSendSocket, (struct sockaddr const*)&udpAddr, UV_UDP_REUSEADDR);
+    if (0 != bindRes2) {
+        CCLOGERROR("Failed to bind send on port=%d; result=%d, reason=%s", port, bindRes2, uv_strerror(bindRes2));
+        exit(-1);
+    }
 
-    CCLOG("About to open UDP session at port=%d; bind result=%d, sock init result=%d...", port, bindRes, sockInitRes);
+    uv_async_init(recvLoop, &uvRecvLoopStopSig, _onUvStopSig);
+    uv_mutex_init(&sendRingBuffLock);
+    sendRingBuff = new SendRingBuffer(maxBuffedMsgs);
+    uv_async_init(sendLoop, &uvSendLoopStopSig, _onUvStopSig);
+    uv_async_init(sendLoop, &uvSendLoopTriggerSig, _onUvSthNewToSend);
 
-    uv_udp_recv_start(udpSocket, _allocBuffer, _onRead);
+    CCLOG("About to open UDP session at port=%d; bindRes1=%d, bindRes2=%d; recvSockInitRes=%d, sendSocketInitRes=%d; recvLoop=%p, sendLoop=%p...", port, bindRes1, bindRes2, recvSockInitRes, sendSockInitRes, recvLoop, sendLoop);
 
-    // TODO: Currently "sending" is also done in the "receiving loop thread", shall I segregate it to another dedicated thread?
-    uv_thread_create(&recvTid, startRecvLoop, loop);
+    uv_thread_create(&recvTid, startRecvLoop, recvLoop);
+    uv_thread_create(&sendTid, startSendLoop, sendLoop);
 
     CCLOG("Finished opening UDP session at port=%d", port);
 
@@ -336,71 +230,97 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) {
 
 bool DelayNoMore::UdpSession::closeUdpSession() {
     CCLOG("About to close udp session and dealloc all resources...");
+    uv_async_send(&uvSendLoopStopSig);
+    CCLOG("Signaling UvSendThread to end in GameThread...");
+    uv_thread_join(&sendTid);
+    free(udpSendSocket);
+    free(sendLoop);
+    delete sendRingBuff;
+
+    uv_async_send(&uvRecvLoopStopSig); // One of the few, if not the only, guaranteed-thread-safe utilities of libuv :) See http://docs.libuv.org/en/v1.x/async.html#c.uv_async_send
+    CCLOG("Signaling UvRecvThread to end in GameThread...");
+    uv_thread_join(&recvTid);
+    free(udpRecvSocket);
+    free(recvLoop);
+
     for (int i = 0; i < maxPeerCnt; i++) {
         peerAddrList[i].authKey = -1; // hardcoded for now
         memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn));
     }
-    uv_async_send(&uvLoopStopSig); // The few if not only guaranteed thread safe utility of libuv :) See http://docs.libuv.org/en/v1.x/async.html#c.uv_async_send
-    CCLOG("Signaling UvThread to end in GameThread...");
-
-    uv_thread_join(&recvTid);
-
-    free(udpSocket);
-    free(loop);
-
     CCLOG("Closed udp session and dealloc all resources in GameThread...");
 
     return true;
 }
 
 bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen) {
-    /*
-    [WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller which runs on the GameThread. Actual sending will be made on UvThread.
-
-    Therefore we make a copy of this message before dispatching it "GameThread -> UvThread".
-    */
     memset(SRV_IP, 0, sizeof SRV_IP);
     memcpy(SRV_IP, srvIp, strlen(srvIp));
    SRV_PORT = srvPort;
    UDP_TUNNEL_SRV_PORT = udpTunnelSrvPort;
-    PunchServerWork* work = new PunchServerWork(bytes, bytesLen, udpTunnelBytes, udpTunnelBytesBytesLen);
+
+    struct sockaddr_in udpPunchingServerDestAddr;
+    uv_ip4_addr(SRV_IP, SRV_PORT, &udpPunchingServerDestAddr);
+    udpPunchingServerAddr.sockAddrIn = udpPunchingServerDestAddr;
+
+    struct sockaddr_in udpTunnelDestAddr;
+    uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr);
+    udpTunnelAddr.sockAddrIn = udpTunnelDestAddr;
     /*
-    TODO: Libuv is really inconvenient here, neither "uv_queue_work" nor "uv_async_init" is threadsafe(http ://docs.libuv.org/en/v1.x/threadpool.html#c.uv_queue_work)! What's the point of such a queue? It's even more difficult than writing my own implementation -- again a threadsafe RingBuff could be used to the rescue, yet I'd like to investigate more into how to make the following threadsafe APIs with minimum cross-platform C++ codes
+    Libuv is really inconvenient here, neither "uv_queue_work" nor "uv_async_init" is threadsafe (http://docs.libuv.org/en/v1.x/threadpool.html#c.uv_queue_work)! What's the point of such a queue? It's even more difficult than writing my own implementation -- again, a threadsafe RingBuff comes to the rescue, yet I'd like to investigate more into how to make the following APIs threadsafe with minimal cross-platform C++ code
 
     - _sendMessage(...), should be both non-blocking & threadsafe, called from GameThread
 
-    - _onRead(...), should be called first in UvThread in an edge-triggered manner like idiomatic "epoll" or "kqueue", then dispatch the received message to GameThread by a threadsafe RingBuff
+    - _onRead(...), should be called first in UvRecvThread in an edge-triggered manner like idiomatic "epoll" or "kqueue", then dispatch the received message to GameThread by a threadsafe RingBuff
     */
-    uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
-    wrapper->data = work;
-
-    uv_queue_work(loop, wrapper, _punchServerOnUvThread, _afterPunchServer);
+
+    uv_mutex_lock(&sendRingBuffLock);
+    sendRingBuff->put(bytes, bytesLen, &udpPunchingServerAddr);
+    sendRingBuff->put(udpTunnelBytes, udpTunnelBytesBytesLen, &udpTunnelAddr);
+    uv_mutex_unlock(&sendRingBuffLock);
+    uv_async_send(&uvSendLoopTriggerSig);
 
     return true;
 }
 
 bool DelayNoMore::UdpSession::upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList, int roomCapacity, int selfJoinIndex) {
+    // Call a timer for multiple sendings from JavaScript?
     CCLOG("upsertPeerUdpAddr called by js for roomCapacity=%d, selfJoinIndex=%d.", roomCapacity, selfJoinIndex);
-    // Punching between existing peer-pairs for Address/Port-restricted Cone NAT (not need for Full Cone NAT); UvThread never writes into "peerAddrList", so I assume that it's safe to skip locking for them
+    uv_mutex_lock(&sendRingBuffLock);
     for (int i = 0; i < roomCapacity; i++) {
         if (i == selfJoinIndex - 1) continue;
-        peerAddrList[i].sockAddrIn = (*(newPeerAddrList + i)).sockAddrIn;
-        peerAddrList[i].authKey = (*(newPeerAddrList + i)).authKey;
+        struct PeerAddr* cand = (newPeerAddrList + i);
+        if (NULL == cand || 0 == cand->sockAddrIn.sin_port) continue; // Not initialized
+        peerAddrList[i].sockAddrIn = cand->sockAddrIn;
+        peerAddrList[i].authKey = cand->authKey;
+        sendRingBuff->put("foobar", 6, &(peerAddrList[i])); // Content hardcoded for now
     }
-
-    PunchPeerWork* work = new PunchPeerWork(roomCapacity, selfJoinIndex);
-    uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
-    wrapper->data = work;
-    uv_queue_work(loop, wrapper, _startPunchPeerTimerOnUvThread, _afterPunchPeerTimerStarted);
+    uv_mutex_unlock(&sendRingBuffLock);
+    uv_async_send(&uvSendLoopTriggerSig);
 
     return true;
 }
 
 bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex) {
-    BroadcastInputFrameUpsyncWork* work = new BroadcastInputFrameUpsyncWork(bytes, bytesLen, roomCapacity, selfJoinIndex);
-    uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
-    wrapper->data = work;
-    uv_queue_work(loop, wrapper, _broadcastInputFrameUpsyncOnUvThread, _afterBroadcastInputFrameUpsync);
+    uv_mutex_lock(&sendRingBuffLock);
+    // Might want to send several times for a better arrival rate
+    for (int j = 0; j < broadcastUpsyncCnt; j++) {
+        // Send to the room udp tunnel in case of hole punching failure
+        sendRingBuff->put(bytes, bytesLen, &udpTunnelAddr);
+        for (int i = 0; i < roomCapacity; i++) {
+            if (i + 1 == selfJoinIndex) {
+                continue;
+            }
+            if (0 == peerAddrList[i].sockAddrIn.sin_port) {
+                // Peer addr not initialized
+                continue;
+            }
+
+            sendRingBuff->put(bytes, bytesLen, &(peerAddrList[i]));
+        }
+    }
+
+    uv_mutex_unlock(&sendRingBuffLock);
+    uv_async_send(&uvSendLoopTriggerSig);
 
     return true;
 }
diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp
index b0c6441..b073791 100644
--- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp
+++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.hpp
@@ -1,18 +1,9 @@
-#include "uv/uv.h"
-#define __SSIZE_T // Otherwise "ssize_t" would have conflicting macros error that stops compiling
-
 #ifndef udp_session_hpp
 #define udp_session_hpp
 
-typedef char BYTEC;
-typedef char const CHARC;
-int const maxUdpPayloadBytes = 128;
+#include "send_ring_buff.hpp"
 int const maxPeerCnt = 10;
-struct PeerAddr {
-    struct sockaddr_in sockAddrIn;
-    uint32_t authKey;
-};
 
 namespace DelayNoMore {
     class UdpSession {
diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp
index da214da..998fb49 100644
--- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp
+++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session_bridge.cpp
@@ -126,8 +126,7 @@ bool upsertPeerUdpAddr(se::State& s) {
 }
 SE_BIND_FUNC(upsertPeerUdpAddr)
 
-static bool udpSessionFinalize(se::State& s)
-{
+static bool udpSessionFinalize(se::State& s) {
     CCLOGINFO("jsbindings: finalizing JS object %p (DelayNoMore::UdpSession)", s.nativeThisObject());
     auto iter = se::NonRefNativePtrCreatedByCtorMap::find(s.nativeThisObject());
     if (iter != se::NonRefNativePtrCreatedByCtorMap::end()) {
@@ -141,8 +140,7 @@ SE_BIND_FINALIZE_FUNC(udpSessionFinalize)
 se::Object* __jsb_udp_session_proto = nullptr;
 se::Class* __jsb_udp_session_class = nullptr;
 
-bool registerUdpSession(se::Object* obj)
-{
+bool registerUdpSession(se::Object* obj) {
     // Get the ns
     se::Value nsVal;
     if (!obj->getProperty("DelayNoMore", &nsVal))
diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj b/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj
index 3256f14..dbadfd3 100644
--- a/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj
+++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj
@@ -193,6 +193,7 @@ copy "$(ProjectDir)..\..\..\project.json" "$(OutDir)\" /Y
+    <ClInclude Include="..\Classes\send_ring_buff.hpp" />
@@ -207,4 +208,4 @@ copy "$(ProjectDir)..\..\..\project.json" "$(OutDir)\" /Y
-</Project>
\ No newline at end of file
+</Project>
diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj.filters b/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj.filters
index af517e9..a09c1ea 100644
--- a/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj.filters
+++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/proj.win32/DelayNoMore.vcxproj.filters
@@ -37,6 +37,9 @@
       <Filter>win32</Filter>
+    <ClInclude Include="..\Classes\send_ring_buff.hpp">
+      <Filter>Classes</Filter>
+    </ClInclude>
      <Filter>Classes</Filter>
@@ -54,4 +57,4 @@
      <Filter>resource</Filter>
-</Project>
\ No newline at end of file
+</Project>
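
Note: below is a minimal, self-contained C++ sketch of the threading pattern this patch adopts -- producers on GameThread push into a mutex-guarded ring buffer and then wake the dedicated send-loop thread with uv_async_send(), the libuv call documented as safe to use from any thread, while uv_stop() is only ever called from inside the loop via an async handler. It is illustrative only and not code from this repository: the "demo*" names are hypothetical stand-ins for sendRingBuff / sendRingBuffLock / uvSendLoopTriggerSig / uvSendLoopStopSig, the payload handling is simplified, and the uv_walk/uv_close/uv_loop_close teardown done in startSendLoop above is omitted.

#include <uv.h>      // assumption: building standalone against system libuv; the repo itself includes "uv/uv.h"
#include <cstdio>
#include <cstring>

struct DemoWork { char bytes[128]; size_t bytesLen; };   // stand-in for SendWork

static DemoWork demoEles[512];                           // stand-in for SendRingBuffer::eles
static int demoSt = 0, demoEd = 0, demoCnt = 0;          // ring cursors, guarded by demoLock

static uv_loop_t  demoLoop;                              // stand-in for sendLoop
static uv_async_t demoTriggerSig, demoStopSig;           // stand-ins for uvSendLoopTriggerSig / uvSendLoopStopSig
static uv_mutex_t demoLock;                              // stand-in for sendRingBuffLock

// Consumer: runs on the loop thread (the role _onUvSthNewToSend plays above).
static void onDemoTrigger(uv_async_t* handle) {
    (void)handle;
    for (;;) {
        uv_mutex_lock(&demoLock);
        if (0 == demoCnt) { uv_mutex_unlock(&demoLock); return; }
        DemoWork* w = &demoEles[demoSt];
        demoSt = (demoSt + 1) % 512;
        demoCnt--;
        uv_mutex_unlock(&demoLock);                      // unlock before the potentially slow send
        printf("would send %zu bytes here\n", w->bytesLen); // the real code calls uv_udp_try_send(...) instead
    }
}

// Stop handler: uv_stop() is not thread safe, so it runs on the loop thread (like _onUvStopSig above).
static void onDemoStop(uv_async_t* handle) { uv_stop(handle->loop); }

// Producer: callable from any thread (the role punchToServer/broadcastInputFrameUpsync play above).
static void demoPut(char const* bytes, size_t n /* must be <= 128 in this sketch */) {
    uv_mutex_lock(&demoLock);
    memcpy(demoEles[demoEd].bytes, bytes, n);
    demoEles[demoEd].bytesLen = n;
    demoEd = (demoEd + 1) % 512;
    if (demoCnt < 512) demoCnt++;
    uv_mutex_unlock(&demoLock);
    uv_async_send(&demoTriggerSig);                      // thread-safe wakeup of the loop thread
}

static void demoLoopThread(void* arg) { uv_run((uv_loop_t*)arg, UV_RUN_DEFAULT); }

int main() {
    uv_loop_init(&demoLoop);
    uv_mutex_init(&demoLock);
    uv_async_init(&demoLoop, &demoTriggerSig, onDemoTrigger);
    uv_async_init(&demoLoop, &demoStopSig, onDemoStop);

    uv_thread_t tid;
    uv_thread_create(&tid, demoLoopThread, &demoLoop);

    demoPut("foobar", 6);                                // "GameThread" side
    uv_async_send(&demoStopSig);                         // ask the loop to stop, as closeUdpSession() does

    uv_thread_join(&tid);
    uv_mutex_destroy(&demoLock);
    // NOTE: handle/loop cleanup (uv_walk + uv_close + uv_loop_close) omitted for brevity.
    return 0;
}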