From 5863f88435c5abe2ea10977546d879cc95ea57e1 Mon Sep 17 00:00:00 2001 From: genxium Date: Sat, 28 Jan 2023 12:55:22 +0800 Subject: [PATCH] Minor update. --- .../runtime-src/Classes/udp_session.cpp | 62 ++++++++++++++----- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp index 5865a74..26b2081 100644 --- a/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp +++ b/frontend/build-templates/jsb-link/frameworks/runtime-src/Classes/udp_session.cpp @@ -6,6 +6,7 @@ uv_udp_t* udpSocket = NULL; uv_thread_t recvTid; +uv_timer_t peerPunchTimer; uv_async_t uvLoopStopSig; uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC @@ -21,6 +22,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr free(buf->base); return; } + CCLOG("UDP received %d bytes", nread); if (NULL != addr) { // The null check for "addr" is necessary, on Android there'd be such mysterious call to "_onRead"! switch (addr->sa_family) { @@ -119,15 +121,31 @@ class PunchPeerWork { public: int roomCapacity; int selfJoinIndex; + int naiveRefCnt; PunchPeerWork(int newRoomCapacity, int newSelfJoinIndex) { this->roomCapacity = newRoomCapacity; this->selfJoinIndex = newSelfJoinIndex; + this->naiveRefCnt = 0; + } + void refInc() { + ++this->naiveRefCnt; + } + void refDecAndDelIfZero() { + --this->naiveRefCnt; + if (0 >= this->naiveRefCnt) { + delete this; + } + } + virtual ~PunchPeerWork() { + CCLOG("PunchPeerWork instance deleted..."); } }; -void _punchPeerOnUvThread(uv_work_t* wrapper) { - PunchPeerWork* work = (PunchPeerWork*)wrapper->data; +void _punchPeerOnUvThreadDelayed(uv_timer_t* timer, int status) { + //CCLOG("_punchPeerOnUvThreadDelayed started..."); + PunchPeerWork* work = (PunchPeerWork*)timer->data; int roomCapacity = work->roomCapacity; int selfJoinIndex = work->selfJoinIndex; + for (int i = 0; i < roomCapacity; i++) { if (i + 1 == selfJoinIndex) { continue; @@ -136,21 +154,37 @@ void _punchPeerOnUvThread(uv_work_t* wrapper) { // Peer addr not initialized continue; } + //CCLOG("UDP about to punch peer joinIndex:%d", i); char peerIp[17] = { 0 }; uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp); - for (int j = 0; j < 3; j++) { - uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); - uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now - uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend); - CCLOG("UDP punched peer %s:%d by 6 bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), j); - } + uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); + uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now + uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend); + CCLOG("UDP punched peer %s:%d by 6 bytes", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port)); + } + uv_timer_stop(timer); + free(timer); + //CCLOG("_punchPeerOnUvThreadDelayed stopped..."); + work->refDecAndDelIfZero(); +} +int const punchPeerCnt = 10; +void _startPunchPeerTimerOnUvThread(uv_work_t* wrapper) { + PunchPeerWork* work = (PunchPeerWork*)wrapper->data; + int roomCapacity = work->roomCapacity; + int selfJoinIndex = work->selfJoinIndex; + + for (int j = 0; j < punchPeerCnt; j++) { + work->refInc(); + } + for (int j = 0; j < punchPeerCnt; j++) { + uv_timer_t* punchTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); // I don't think libuv timer is safe to be called from GameThread, thus calling it within UvThread here + uv_timer_init(loop, punchTimer); + punchTimer->data = work; + uv_timer_start(punchTimer, (uv_timer_cb)&_punchPeerOnUvThreadDelayed, j * 500, 0); } } -void _afterPunchPeer(uv_work_t* wrapper, int status) { - CCLOG("UDP send about to free PunchPeerWork for status:%d...", status); - PunchPeerWork* work = (PunchPeerWork*)wrapper->data; - delete work; - CCLOG("UDP send freed PunchPeerWork for status:%d...", status); +void _afterPunchPeerTimerStarted(uv_work_t* wrapper, int status) { + // RAM of PunchPeerWork handled by "naiveRefCnt" } class BroadcastInputFrameUpsyncWork { @@ -285,7 +319,7 @@ bool DelayNoMore::UdpSession::upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList PunchPeerWork* work = new PunchPeerWork(roomCapacity, selfJoinIndex); uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t)); wrapper->data = work; - uv_queue_work(loop, wrapper, _punchPeerOnUvThread, _afterPunchPeer); + uv_queue_work(loop, wrapper, _startPunchPeerTimerOnUvThread, _afterPunchPeerTimerStarted); return true; }