Minor update.

This commit is contained in:
genxium 2023-01-28 12:55:22 +08:00
parent bbf07fe518
commit 5863f88435

View File

@ -6,6 +6,7 @@
uv_udp_t* udpSocket = NULL; uv_udp_t* udpSocket = NULL;
uv_thread_t recvTid; uv_thread_t recvTid;
uv_timer_t peerPunchTimer;
uv_async_t uvLoopStopSig; uv_async_t uvLoopStopSig;
uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC
@ -21,6 +22,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
free(buf->base); free(buf->base);
return; return;
} }
CCLOG("UDP received %d bytes", nread);
if (NULL != addr) { if (NULL != addr) {
// The null check for "addr" is necessary, on Android there'd be such mysterious call to "_onRead"! // The null check for "addr" is necessary, on Android there'd be such mysterious call to "_onRead"!
switch (addr->sa_family) { switch (addr->sa_family) {
@ -119,15 +121,31 @@ class PunchPeerWork {
public: public:
int roomCapacity; int roomCapacity;
int selfJoinIndex; int selfJoinIndex;
int naiveRefCnt;
PunchPeerWork(int newRoomCapacity, int newSelfJoinIndex) { PunchPeerWork(int newRoomCapacity, int newSelfJoinIndex) {
this->roomCapacity = newRoomCapacity; this->roomCapacity = newRoomCapacity;
this->selfJoinIndex = newSelfJoinIndex; this->selfJoinIndex = newSelfJoinIndex;
this->naiveRefCnt = 0;
}
void refInc() {
++this->naiveRefCnt;
}
void refDecAndDelIfZero() {
--this->naiveRefCnt;
if (0 >= this->naiveRefCnt) {
delete this;
}
}
virtual ~PunchPeerWork() {
CCLOG("PunchPeerWork instance deleted...");
} }
}; };
void _punchPeerOnUvThread(uv_work_t* wrapper) { void _punchPeerOnUvThreadDelayed(uv_timer_t* timer, int status) {
PunchPeerWork* work = (PunchPeerWork*)wrapper->data; //CCLOG("_punchPeerOnUvThreadDelayed started...");
PunchPeerWork* work = (PunchPeerWork*)timer->data;
int roomCapacity = work->roomCapacity; int roomCapacity = work->roomCapacity;
int selfJoinIndex = work->selfJoinIndex; int selfJoinIndex = work->selfJoinIndex;
for (int i = 0; i < roomCapacity; i++) { for (int i = 0; i < roomCapacity; i++) {
if (i + 1 == selfJoinIndex) { if (i + 1 == selfJoinIndex) {
continue; continue;
@ -136,21 +154,37 @@ void _punchPeerOnUvThread(uv_work_t* wrapper) {
// Peer addr not initialized // Peer addr not initialized
continue; continue;
} }
//CCLOG("UDP about to punch peer joinIndex:%d", i);
char peerIp[17] = { 0 }; char peerIp[17] = { 0 };
uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp); uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
for (int j = 0; j < 3; j++) {
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend); uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend);
CCLOG("UDP punched peer %s:%d by 6 bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), j); CCLOG("UDP punched peer %s:%d by 6 bytes", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port));
} }
uv_timer_stop(timer);
free(timer);
//CCLOG("_punchPeerOnUvThreadDelayed stopped...");
work->refDecAndDelIfZero();
} }
} int const punchPeerCnt = 10;
void _afterPunchPeer(uv_work_t* wrapper, int status) { void _startPunchPeerTimerOnUvThread(uv_work_t* wrapper) {
CCLOG("UDP send about to free PunchPeerWork for status:%d...", status);
PunchPeerWork* work = (PunchPeerWork*)wrapper->data; PunchPeerWork* work = (PunchPeerWork*)wrapper->data;
delete work; int roomCapacity = work->roomCapacity;
CCLOG("UDP send freed PunchPeerWork for status:%d...", status); int selfJoinIndex = work->selfJoinIndex;
for (int j = 0; j < punchPeerCnt; j++) {
work->refInc();
}
for (int j = 0; j < punchPeerCnt; j++) {
uv_timer_t* punchTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); // I don't think libuv timer is safe to be called from GameThread, thus calling it within UvThread here
uv_timer_init(loop, punchTimer);
punchTimer->data = work;
uv_timer_start(punchTimer, (uv_timer_cb)&_punchPeerOnUvThreadDelayed, j * 500, 0);
}
}
void _afterPunchPeerTimerStarted(uv_work_t* wrapper, int status) {
// RAM of PunchPeerWork handled by "naiveRefCnt"
} }
class BroadcastInputFrameUpsyncWork { class BroadcastInputFrameUpsyncWork {
@ -285,7 +319,7 @@ bool DelayNoMore::UdpSession::upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList
PunchPeerWork* work = new PunchPeerWork(roomCapacity, selfJoinIndex); PunchPeerWork* work = new PunchPeerWork(roomCapacity, selfJoinIndex);
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t)); uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
wrapper->data = work; wrapper->data = work;
uv_queue_work(loop, wrapper, _punchPeerOnUvThread, _afterPunchPeer); uv_queue_work(loop, wrapper, _startPunchPeerTimerOnUvThread, _afterPunchPeerTimerStarted);
return true; return true;
} }