Enhancement for libuv thread safety.

genxium 2023-02-02 19:16:25 +08:00
parent b0f37d2237
commit 70a86c27b0
7 changed files with 237 additions and 252 deletions
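The gist of the change: libuv loops and handles are not thread safe, so this commit splits UDP work into a dedicated recv-loop thread and a dedicated send-loop thread, and the GameThread never touches either loop directly. Instead, it appends outgoing messages to a mutex-guarded ring buffer and wakes the send loop with uv_async_send(), the one libuv call documented as safe to invoke from any thread; the async callback then drains the buffer on the loop thread. Below is a minimal, self-contained sketch of that handoff pattern -- it is NOT code from this commit: the names (onWake, pending, runLoop) are illustrative, and a std::deque stands in for the commit's fixed-size SendRingBuffer.

#include <uv.h>
#include <cstdio>
#include <deque>
#include <string>

// Shared state: a mutex-guarded queue plus an async handle used purely as a cross-thread wake-up.
static uv_loop_t sendLoop;
static uv_async_t wakeSig;
static uv_mutex_t qLock;
static std::deque<std::string> pending;

// Runs on the loop thread: drain whatever the producers queued, holding the lock only around queue ops.
static void onWake(uv_async_t* handle) {
    for (;;) {
        uv_mutex_lock(&qLock);
        if (pending.empty()) {
            uv_mutex_unlock(&qLock);
            break;
        }
        std::string msg = pending.front();
        pending.pop_front();
        uv_mutex_unlock(&qLock);
        std::printf("send-loop thread got: %s\n", msg.c_str());
    }
    uv_close((uv_handle_t*)handle, NULL); // one-shot demo: closing the only handle lets uv_run() return
}

static void runLoop(void* arg) {
    uv_run((uv_loop_t*)arg, UV_RUN_DEFAULT);
}

int main() {
    uv_loop_init(&sendLoop);
    uv_mutex_init(&qLock);
    uv_async_init(&sendLoop, &wakeSig, onWake);

    uv_thread_t tid;
    uv_thread_create(&tid, runLoop, &sendLoop);

    // Producer side (the "GameThread"): enqueue under the lock, then signal the loop thread.
    uv_mutex_lock(&qLock);
    pending.push_back("hello");
    pending.push_back("world");
    uv_mutex_unlock(&qLock);
    uv_async_send(&wakeSig); // documented as safe to call from any thread

    uv_thread_join(&tid);
    uv_loop_close(&sendLoop);
    uv_mutex_destroy(&qLock);
    return 0;
}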

View File

@@ -17,6 +17,9 @@
 "append_file": [{
     "from": "cocos/scripting/js-bindings/manual/jsb_module_register.cpp",
     "to": "frameworks/runtime-src/Classes/jsb_module_register.cpp"
+}, {
+    "from": "frameworks/runtime-src/Classes/send_ring_buff.hpp",
+    "to": "frameworks/runtime-src/Classes/send_ring_buff.hpp"
 }, {
     "from": "frameworks/runtime-src/Classes/udp_session.hpp",
     "to": "frameworks/runtime-src/Classes/udp_session.hpp"

View File

@@ -0,0 +1,69 @@
#ifndef send_ring_buff_hpp
#define send_ring_buff_hpp

#include "uv/uv.h"
#define __SSIZE_T // Otherwise "ssize_t" would trigger a conflicting-macro error that stops compilation

int const RING_BUFF_CONSECUTIVE_SET = 0;
int const RING_BUFF_NON_CONSECUTIVE_SET = 1;
int const RING_BUFF_FAILED_TO_SET = 2;

typedef char BYTEC;
typedef char const CHARC;

int const maxUdpPayloadBytes = 128;
int const maxBuffedMsgs = 512;

struct PeerAddr {
    struct sockaddr_in sockAddrIn;
    uint32_t authKey;
};

class SendWork {
public:
    BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here, thus no need for explicit recursive destruction
    size_t bytesLen;
    PeerAddr peerAddr;
};

// [WARNING] This class is specific to "SendWork"; it's designed and implemented only for use in a multithreaded environment and to save heap alloc/dealloc time complexity. It's by no means comparable to the Golang or JavaScript versions!
class SendRingBuffer {
public:
    int ed, st, n, cnt;
    SendWork eles[maxBuffedMsgs]; // preallocated to save heap alloc/dealloc time

    SendRingBuffer(int newN) {
        this->n = newN;
        this->st = this->ed = this->cnt = 0;
    }

    void put(BYTEC* const newBytes, size_t newBytesLen, PeerAddr* pNewPeerAddr) {
        while (0 < cnt && cnt >= n) {
            // Make room for the new element
            this->pop();
        }
        eles[ed].bytesLen = newBytesLen;
        memset(eles[ed].bytes, 0, sizeof eles[ed].bytes);
        memcpy(eles[ed].bytes, newBytes, newBytesLen);
        eles[ed].peerAddr = *(pNewPeerAddr);
        ed++;
        cnt++;
        if (ed >= n) {
            ed -= n; // Deliberately not using the "%" operator for performance concerns
        }
    }

    // Sending is always sequential in UvSendThread, no need to return a copy of the "SendWork" instance
    SendWork* pop() {
        if (0 == cnt) {
            return NULL;
        }
        SendWork* ret = &(eles[st]);
        cnt--;
        st++;
        if (st >= n) {
            st -= n;
        }
        return ret;
    }
};

#endif
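A note on the design, with a hypothetical usage sketch (not part of the commit): put() never blocks and never allocates; when the buffer is full it silently evicts the oldest pending message, which is acceptable here because a newer input-frame upsync supersedes older ones. The snippet below assumes send_ring_buff.hpp (and the "uv/uv.h" include path it expects) is available; it runs single-threaded, so the uv_mutex_t that normally guards put()/pop() in udp_session.cpp is omitted.

#include "send_ring_buff.hpp"
#include <cstdio>
#include <cstring>

int main() {
    SendRingBuffer rb(4); // tiny capacity for the demo; the real code uses maxBuffedMsgs = 512

    PeerAddr addr;
    memset(&addr, 0, sizeof addr); // destination left blank; only the eviction behavior matters here

    char msg[16];
    for (int i = 0; i < 6; i++) { // 6 > capacity, so put() evicts the 2 oldest entries
        int n = std::snprintf(msg, sizeof msg, "msg-%d", i);
        rb.put(msg, (size_t)n, &addr);
    }

    // Prints msg-2, msg-3, msg-4, msg-5: msg-0 and msg-1 were overwritten by put().
    while (SendWork* w = rb.pop()) {
        std::printf("%.*s\n", (int)w->bytesLen, w->bytes);
    }
    return 0;
}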

View File

@@ -4,18 +4,23 @@
 #include "cocos/base/CCScheduler.h"
 #include "cocos/scripting/js-bindings/jswrapper/SeApi.h"
-uv_udp_t* udpSocket = NULL;
-uv_thread_t recvTid;
-uv_timer_t peerPunchTimer;
-uv_async_t uvLoopStopSig;
-uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC
-struct PeerAddr peerAddrList[maxPeerCnt];
+int const punchServerCnt = 3;
+int const punchPeerCnt = 3;
+int const broadcastUpsyncCnt = 3;
+uv_udp_t *udpRecvSocket = NULL, *udpSendSocket = NULL;
+uv_thread_t recvTid, sendTid;
+uv_async_t uvRecvLoopStopSig, uvSendLoopStopSig, uvSendLoopTriggerSig;
+uv_loop_t *recvLoop = NULL, *sendLoop = NULL;
+uv_mutex_t sendRingBuffLock; // used along with "uvSendLoopTriggerSig" as a "uv_cond_t"
+SendRingBuffer* sendRingBuff = NULL;
 char SRV_IP[256];
 int SRV_PORT = 0;
 int UDP_TUNNEL_SRV_PORT = 0;
-struct PeerAddr udpTunnelAddr;
+struct PeerAddr udpPunchingServerAddr, udpTunnelAddr;
+struct PeerAddr peerAddrList[maxPeerCnt];
 
 void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) {
     if (nread < 0) {
@@ -24,6 +29,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
         free(buf->base);
         return;
     }
+#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
     char ip[INET_ADDRSTRLEN];
     memset(ip, 0, sizeof ip);
     int port = 0;
@@ -44,6 +50,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
     } else {
         CCLOG("UDP received %d bytes from unknown sender", nread);
     }
+#endif
     if (6 == nread) {
         // holepunching
@@ -75,8 +82,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
         gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject
         //CCLOG("UDP received %d bytes upsync -- 4", nread);
         free(ui8Arr);
-        CCLOG("UDP received %d bytes upsync -- 5", nread);
+        //CCLOG("UDP received %d bytes upsync -- 5", nread);
     });
     }
     free(buf->base);
@@ -95,198 +101,57 @@ static void _allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
 }
 
 void _onUvStopSig(uv_async_t* handle) {
-    uv_stop(loop);
-    CCLOG("UDP recv loop is signaled to stop in UvThread");
+    if (!handle) return;
+    uv_stop(handle->loop);
+    CCLOG("UDP loop %p is signaled to stop in UvXxxxThread", handle->loop);
 }
 
-void _onSend(uv_udp_send_t* req, int status) {
-    CCLOG("UDP send about to free req for status:%d...", status);
-    free(req); // No need to free "req->base", it'll be handled in each "_afterXxx" callback
-    CCLOG("UDP send freed req for status:%d...", status);
+void _afterSend(uv_udp_send_t* req, int status) {
+    if (req) {
+        free(req);
+    }
     if (status) {
         CCLOGERROR("uv_udp_send_cb error: %s\n", uv_strerror(status));
     }
 }
 
-void _onUvTimerClosed(uv_handle_t* timer) {
-    free(timer);
-}
-
-int const punchServerCnt = 3;
-class PunchServerWork {
-public:
-    BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
-    size_t bytesLen;
-    BYTEC udpTunnelBytes[maxUdpPayloadBytes];
-    size_t udpTunnelBytesLen;
-    PunchServerWork(BYTEC* const newBytes, size_t newBytesLen, BYTEC* const newUdpTunnelBytes, size_t newUdpTunnelBytesLen) {
-        memset(this->bytes, 0, sizeof(this->bytes));
-        memcpy(this->bytes, newBytes, newBytesLen);
-        this->bytesLen = newBytesLen;
-        memset(this->udpTunnelBytes, 0, sizeof(this->udpTunnelBytes));
-        memcpy(this->udpTunnelBytes, newUdpTunnelBytes, newUdpTunnelBytesLen);
-        this->udpTunnelBytesLen = newUdpTunnelBytesLen;
-    }
-};
-
-void _punchServerOnUvThread(uv_work_t* wrapper) {
-    PunchServerWork* work = (PunchServerWork*)wrapper->data;
-    for (int i = 0; i < punchServerCnt; i++) {
-        uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-        uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
-        struct sockaddr_in destAddr;
-        uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr);
-        uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
-        uv_udp_send_t* udpTunnelReq = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-        uv_buf_t udpTunnelSendBuffer = uv_buf_init(work->udpTunnelBytes, work->udpTunnelBytesLen);
-        struct sockaddr_in udpTunnelDestAddr;
-        uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr);
-        udpTunnelAddr.sockAddrIn = udpTunnelDestAddr;
-        uv_udp_send(udpTunnelReq, udpSocket, &udpTunnelSendBuffer, 1, (struct sockaddr const*)&udpTunnelDestAddr, _onSend);
-    }
-}
-
-void _afterPunchServer(uv_work_t* wrapper, int status) {
-    CCLOG("UDP send about to free PunchServerWork for status:%d...", status);
-    PunchServerWork* work = (PunchServerWork*)wrapper->data;
-    delete work;
-    CCLOG("UDP freed PunchServerWork for status:%d...", status);
-}
-
-class PunchPeerWork {
-public:
-    int roomCapacity;
-    int selfJoinIndex;
-    int naiveRefCnt;
-    PunchPeerWork(int newRoomCapacity, int newSelfJoinIndex) {
-        this->roomCapacity = newRoomCapacity;
-        this->selfJoinIndex = newSelfJoinIndex;
-        this->naiveRefCnt = 0;
-    }
-    void refInc() {
-        ++this->naiveRefCnt;
-    }
-    void refDecAndDelIfZero() {
-        --this->naiveRefCnt;
-        if (0 >= this->naiveRefCnt) {
-            delete this;
-        }
-    }
-    virtual ~PunchPeerWork() {
-        CCLOG("PunchPeerWork instance deleted...");
-    }
-};
-
-void _punchPeerOnUvThreadDelayed(uv_timer_t* timer, int status) {
-    //CCLOG("_punchPeerOnUvThreadDelayed started...");
-    PunchPeerWork* work = (PunchPeerWork*)timer->data;
-    int roomCapacity = work->roomCapacity;
-    int selfJoinIndex = work->selfJoinIndex;
-    for (int i = 0; i < roomCapacity; i++) {
-        if (i + 1 == selfJoinIndex) {
-            continue;
-        }
-        if (0 == peerAddrList[i].sockAddrIn.sin_port) {
-            // Peer addr not initialized
-            continue;
-        }
-        //CCLOG("UDP about to punch peer joinIndex:%d", i);
-        char peerIp[17] = { 0 };
-        uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
-        int peerPortSt = ntohs(peerAddrList[i].sockAddrIn.sin_port);
-        int peerPortEd = ntohs(peerAddrList[i].sockAddrIn.sin_port) + 1; // Use tunnel of backend instead of sweeping ports blindly!
-        for (int peerPort = peerPortSt; peerPort < peerPortEd; peerPort++) {
-            if (0 > peerPort) continue;
-            uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-            uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now
-            struct sockaddr_in testPeerAddr;
-            uv_ip4_addr(peerIp, peerPort, &testPeerAddr);
-            uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&testPeerAddr, _onSend);
-            CCLOG("UDP punched peer %s:%d by 6 bytes", peerIp, peerPort);
-        }
-    }
-    uv_timer_stop(timer);
-    uv_close((uv_handle_t*)timer, _onUvTimerClosed);
-    //CCLOG("_punchPeerOnUvThreadDelayed stopped...");
-    work->refDecAndDelIfZero();
-}
-
-int const punchPeerCnt = 3;
-void _startPunchPeerTimerOnUvThread(uv_work_t* wrapper) {
-    PunchPeerWork* work = (PunchPeerWork*)wrapper->data;
-    int roomCapacity = work->roomCapacity;
-    int selfJoinIndex = work->selfJoinIndex;
-    for (int j = 0; j < punchPeerCnt; j++) {
-        work->refInc();
-    }
-    for (int j = 0; j < punchPeerCnt; j++) {
-        uv_timer_t* punchTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); // I don't think libuv timer is safe to be called from GameThread, thus calling it within UvThread here
-        uv_timer_init(loop, punchTimer);
-        punchTimer->data = work;
-        uv_timer_start(punchTimer, (uv_timer_cb)&_punchPeerOnUvThreadDelayed, j * 500, 0);
-    }
-}
-
-void _afterPunchPeerTimerStarted(uv_work_t* wrapper, int status) {
-    // RAM of PunchPeerWork handled by "naiveRefCnt"
-}
-
-class BroadcastInputFrameUpsyncWork {
-public:
-    BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
-    size_t bytesLen;
-    int roomCapacity;
-    int selfJoinIndex;
-    BroadcastInputFrameUpsyncWork(BYTEC* const newBytes, size_t newBytesLen, int newRoomCapacity, int newSelfJoinIndex) {
-        memset(this->bytes, 0, sizeof(this->bytes));
-        memcpy(this->bytes, newBytes, newBytesLen);
-        this->bytesLen = newBytesLen;
-        this->roomCapacity = newRoomCapacity;
-        this->selfJoinIndex = newSelfJoinIndex;
-    }
-};
-
-int const broadcastUpsyncCnt = 1;
-void _broadcastInputFrameUpsyncOnUvThread(uv_work_t* wrapper) {
-    BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
-    int roomCapacity = work->roomCapacity;
-    int selfJoinIndex = work->selfJoinIndex;
-    // Send to room udp tunnel in case of hole punching failure
-    for (int j = 0; j < broadcastUpsyncCnt; j++) {
-        uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-        uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
-        uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(udpTunnelAddr.sockAddrIn), _onSend);
-        CCLOG("UDP sent upsync to udp tunnel %s:%d by %u bytes round-%d", SRV_IP, UDP_TUNNEL_SRV_PORT, work->bytesLen, j);
-    }
-    for (int i = 0; i < roomCapacity; i++) {
-        if (i + 1 == selfJoinIndex) {
-            continue;
-        }
-        if (0 == peerAddrList[i].sockAddrIn.sin_port) {
-            // Peer addr not initialized
-            continue;
-        }
-        char peerIp[17] = { 0 };
-        uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
-        // Might want to send several times for better arrival rate
-        for (int j = 0; j < broadcastUpsyncCnt; j++) {
-            uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
-            uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
-            uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(peerAddrList[i].sockAddrIn), _onSend);
-            CCLOG("UDP broadcasted upsync to peer %s:%d by %u bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), work->bytesLen, j);
-        }
-    }
-}
-
-void _afterBroadcastInputFrameUpsync(uv_work_t* wrapper, int status) {
-    BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
-    delete work;
-}
+void _onUvSthNewToSend(uv_async_t* handle) {
+    bool hasNext = true;
+    while (NULL != handle && true == hasNext) {
+        SendWork* work = NULL;
+        uv_mutex_lock(&sendRingBuffLock);
+        work = sendRingBuff->pop();
+        if (NULL == work) {
+            hasNext = false;
+        }
+        /*
+        [WARNING] The following "uv_udp_try_send" might block I/O for a long time, hence unlock "as soon as possible" to avoid blocking the "GameThread" which is awaiting to acquire this mutex!
+
+        There's a very small chance where "sendRingBuff->put(...)" could contaminate the just popped "work" in "sendRingBuff->eles", thus "sendRingBuff->n" is made quite large to avoid that; moreover, in terms of protecting "work" we're also unlocking "as late as possible"!
+        */
+        uv_mutex_unlock(&sendRingBuffLock);
+        if (NULL != work) {
+            /*
+            // [WARNING] If "uv_udp_send" is to be used instead of "uv_udp_try_send", as UvSendThread will always be terminated from GameThread, it's a MUST to use the following heap-alloc form to initialize "uv_udp_send_t* req" such that "_afterSend" is guaranteed to be called, otherwise "int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);" for UvSendThread would block forever due to residual active handles.
+            uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof uv_udp_send_t);
+            uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
+            uv_udp_send(req, udpSendSocket, &sendBuffer, 1, (struct sockaddr const*)&(work->peerAddr.sockAddrIn), _afterSend);
+            */
+            uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
+            uv_udp_try_send(udpSendSocket, &sendBuffer, 1, (struct sockaddr const*)&(work->peerAddr.sockAddrIn));
+#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
+            char ip[INET_ADDRSTRLEN];
+            memset(ip, 0, sizeof ip);
+            uv_inet_ntop(work->peerAddr.sockAddrIn.sin_family, &(work->peerAddr.sockAddrIn.sin_addr), ip, INET_ADDRSTRLEN);
+            int port = ntohs(work->peerAddr.sockAddrIn.sin_port);
+            //CCLOG("UDP sent %d bytes to %s:%d", sendBuffer.len, ip, port);
+#endif
+        }
+    }
+}
 
 void _onWalkCleanup(uv_handle_t* handle, void* data) {
@@ -295,38 +160,67 @@ void _onWalkCleanup(uv_handle_t* handle, void* data)
 }
 
 void startRecvLoop(void* arg) {
+    uv_udp_recv_start(udpRecvSocket, _allocBuffer, _onRead);
     uv_loop_t* l = (uv_loop_t*)arg;
     int uvRunRet1 = uv_run(l, UV_RUN_DEFAULT);
-    CCLOG("UDP recv loop is ended in UvThread, uvRunRet1=%d", uvRunRet1);
+    CCLOG("UDP recv loop is ended in UvRecvThread, uvRunRet1=%d", uvRunRet1);
     uv_walk(l, _onWalkCleanup, NULL);
+    CCLOG("UDP recv loop is walked in UvRecvThread");
     int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);
+    CCLOG("UDP recv loop is run after walking in UvRecvThread, uvRunRet2=%d", uvRunRet2);
     int uvCloseRet = uv_loop_close(l);
-    CCLOG("UDP recv loop is closed in UvThread, uvRunRet2=%d, uvCloseRet=%d", uvRunRet2, uvCloseRet);
+    CCLOG("UDP recv loop is closed in UvRecvThread, uvCloseRet=%d", uvCloseRet);
 }
 
+void startSendLoop(void* arg) {
+    uv_loop_t* l = (uv_loop_t*)arg;
+    int uvRunRet1 = uv_run(l, UV_RUN_DEFAULT);
+    CCLOG("UDP send loop is ended in UvSendThread, uvRunRet1=%d", uvRunRet1);
+    uv_walk(l, _onWalkCleanup, NULL);
+    CCLOG("UDP send loop is walked in UvSendThread");
+    int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);
+    CCLOG("UDP send loop is run after walking in UvSendThread, uvRunRet2=%d", uvRunRet2);
+    int uvCloseRet = uv_loop_close(l);
+    CCLOG("UDP send loop is closed in UvSendThread, uvCloseRet=%d", uvCloseRet);
+    uv_mutex_destroy(&sendRingBuffLock);
+}
+
 bool DelayNoMore::UdpSession::openUdpSession(int port) {
-    loop = uv_loop_new();
-    udpSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
-    int sockInitRes = uv_udp_init(loop, udpSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
-    struct sockaddr_in recv_addr;
-    uv_ip4_addr("0.0.0.0", port, &recv_addr);
-    int bindRes = uv_udp_bind(udpSocket, (struct sockaddr const*)&recv_addr, UV_UDP_REUSEADDR);
-    if (0 != bindRes) {
-        CCLOGERROR("Failed to bind port=%d; bind result=%d, reason=%s", port, bindRes, uv_strerror(bindRes));
+    recvLoop = uv_loop_new();
+    udpRecvSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
+    int recvSockInitRes = uv_udp_init(recvLoop, udpRecvSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
+    struct sockaddr_in udpAddr;
+    uv_ip4_addr("0.0.0.0", port, &udpAddr);
+    int bindRes1 = uv_udp_bind(udpRecvSocket, (struct sockaddr const*)&udpAddr, UV_UDP_REUSEADDR);
+    if (0 != bindRes1) {
+        CCLOGERROR("Failed to bind recv on port=%d; result=%d, reason=%s", port, bindRes1, uv_strerror(bindRes1));
         exit(-1);
     }
-    uv_async_init(loop, &uvLoopStopSig, _onUvStopSig);
-    CCLOG("About to open UDP session at port=%d; bind result=%d, sock init result=%d...", port, bindRes, sockInitRes);
-    uv_udp_recv_start(udpSocket, _allocBuffer, _onRead);
-    // TODO: Currently "sending" is also done in the "receiving loop thread", shall I segregate it to another dedicated thread?
-    uv_thread_create(&recvTid, startRecvLoop, loop);
+    sendLoop = uv_loop_new();
+    udpSendSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
+    int sendSockInitRes = uv_udp_init(sendLoop, udpSendSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
+    int bindRes2 = uv_udp_bind(udpSendSocket, (struct sockaddr const*)&udpAddr, UV_UDP_REUSEADDR);
+    if (0 != bindRes2) {
+        CCLOGERROR("Failed to bind send on port=%d; result=%d, reason=%s", port, bindRes2, uv_strerror(bindRes2));
+        exit(-1);
+    }
+    uv_async_init(recvLoop, &uvRecvLoopStopSig, _onUvStopSig);
+    uv_mutex_init(&sendRingBuffLock);
+    sendRingBuff = new SendRingBuffer(maxBuffedMsgs);
+    uv_async_init(sendLoop, &uvSendLoopStopSig, _onUvStopSig);
+    uv_async_init(sendLoop, &uvSendLoopTriggerSig, _onUvSthNewToSend);
+    CCLOG("About to open UDP session at port=%d; bindRes1=%d, bindRes2=%d; recvSockInitRes=%d, sendSocketInitRes=%d; recvLoop=%p, sendLoop=%p...", port, bindRes1, bindRes2, recvSockInitRes, sendSockInitRes, recvLoop, sendLoop);
+    uv_thread_create(&recvTid, startRecvLoop, recvLoop);
+    uv_thread_create(&sendTid, startSendLoop, sendLoop);
     CCLOG("Finished opening UDP session at port=%d", port);
@@ -336,71 +230,97 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) {
 bool DelayNoMore::UdpSession::closeUdpSession() {
     CCLOG("About to close udp session and dealloc all resources...");
+    uv_async_send(&uvSendLoopStopSig);
+    CCLOG("Signaling UvSendThread to end in GameThread...");
+    uv_thread_join(&sendTid);
+    free(udpSendSocket);
+    free(sendLoop);
+    delete sendRingBuff;
+    uv_async_send(&uvRecvLoopStopSig); // The few if not only guaranteed thread safe utility of libuv :) See http://docs.libuv.org/en/v1.x/async.html#c.uv_async_send
+    CCLOG("Signaling UvRecvThread to end in GameThread...");
+    uv_thread_join(&recvTid);
+    free(udpRecvSocket);
+    free(recvLoop);
     for (int i = 0; i < maxPeerCnt; i++) {
         peerAddrList[i].authKey = -1; // hardcoded for now
         memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn));
     }
-    uv_async_send(&uvLoopStopSig); // The few if not only guaranteed thread safe utility of libuv :) See http://docs.libuv.org/en/v1.x/async.html#c.uv_async_send
-    CCLOG("Signaling UvThread to end in GameThread...");
-    uv_thread_join(&recvTid);
-    free(udpSocket);
-    free(loop);
     CCLOG("Closed udp session and dealloc all resources in GameThread...");
     return true;
 }
 
 bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen) {
-    /*
-    [WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocated and managed by the caller which runs on the GameThread. Actual sending will be made on UvThread.
-    Therefore we make a copy of this message before dispatching it "GameThread -> UvThread".
-    */
     memset(SRV_IP, 0, sizeof SRV_IP);
     memcpy(SRV_IP, srvIp, strlen(srvIp));
     SRV_PORT = srvPort;
     UDP_TUNNEL_SRV_PORT = udpTunnelSrvPort;
-    PunchServerWork* work = new PunchServerWork(bytes, bytesLen, udpTunnelBytes, udpTunnelBytesBytesLen);
+    struct sockaddr_in udpPunchingServerDestAddr;
+    uv_ip4_addr(SRV_IP, SRV_PORT, &udpPunchingServerDestAddr);
+    udpPunchingServerAddr.sockAddrIn = udpPunchingServerDestAddr;
+    struct sockaddr_in udpTunnelDestAddr;
+    uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr);
+    udpTunnelAddr.sockAddrIn = udpTunnelDestAddr;
     /*
-    TODO: Libuv is really inconvenient here, neither "uv_queue_work" nor "uv_async_init" is threadsafe (http://docs.libuv.org/en/v1.x/threadpool.html#c.uv_queue_work)! What's the point of such a queue? It's even more difficult than writing my own implementation -- again a threadsafe RingBuff could be used to the rescue, yet I'd like to investigate more into how to make the following threadsafe APIs with minimum cross-platform C++ codes
+    Libuv is really inconvenient here, neither "uv_queue_work" nor "uv_async_init" is threadsafe (http://docs.libuv.org/en/v1.x/threadpool.html#c.uv_queue_work)! What's the point of such a queue? It's even more difficult than writing my own implementation -- again a threadsafe RingBuff could be used to the rescue, yet I'd like to investigate more into how to make the following threadsafe APIs with minimum cross-platform C++ codes
     - _sendMessage(...), should be both non-blocking & threadsafe, called from GameThread
-    - _onRead(...), should be called first in UvThread in an edge-triggered manner like idiomatic "epoll" or "kqueue", then dispatch the received message to GameThread by a threadsafe RingBuff
+    - _onRead(...), should be called first in UvRecvThread in an edge-triggered manner like idiomatic "epoll" or "kqueue", then dispatch the received message to GameThread by a threadsafe RingBuff
     */
-    uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
-    wrapper->data = work;
-    uv_queue_work(loop, wrapper, _punchServerOnUvThread, _afterPunchServer);
+    uv_mutex_lock(&sendRingBuffLock);
+    sendRingBuff->put(bytes, bytesLen, &udpPunchingServerAddr);
+    sendRingBuff->put(udpTunnelBytes, udpTunnelBytesBytesLen, &udpTunnelAddr);
+    uv_mutex_unlock(&sendRingBuffLock);
+    uv_async_send(&uvSendLoopTriggerSig);
     return true;
 }
 
 bool DelayNoMore::UdpSession::upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList, int roomCapacity, int selfJoinIndex) {
-    // Call timer for multiple sendings from JavaScript?
     CCLOG("upsertPeerUdpAddr called by js for roomCapacity=%d, selfJoinIndex=%d.", roomCapacity, selfJoinIndex);
-    // Punching between existing peer-pairs for Address/Port-restricted Cone NAT (not need for Full Cone NAT); UvThread never writes into "peerAddrList", so I assume that it's safe to skip locking for them
+    uv_mutex_lock(&sendRingBuffLock);
     for (int i = 0; i < roomCapacity; i++) {
         if (i == selfJoinIndex - 1) continue;
-        peerAddrList[i].sockAddrIn = (*(newPeerAddrList + i)).sockAddrIn;
-        peerAddrList[i].authKey = (*(newPeerAddrList + i)).authKey;
+        struct PeerAddr* cand = (newPeerAddrList + i);
+        if (NULL == cand || 0 == cand->sockAddrIn.sin_port) continue; // Not initialized
+        peerAddrList[i].sockAddrIn = cand->sockAddrIn;
+        peerAddrList[i].authKey = cand->authKey;
+        sendRingBuff->put("foobar", 6, &(peerAddrList[i])); // Content hardcoded for now
     }
-    PunchPeerWork* work = new PunchPeerWork(roomCapacity, selfJoinIndex);
-    uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
-    wrapper->data = work;
-    uv_queue_work(loop, wrapper, _startPunchPeerTimerOnUvThread, _afterPunchPeerTimerStarted);
+    uv_mutex_unlock(&sendRingBuffLock);
+    uv_async_send(&uvSendLoopTriggerSig);
     return true;
 }
 
 bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex) {
-    BroadcastInputFrameUpsyncWork* work = new BroadcastInputFrameUpsyncWork(bytes, bytesLen, roomCapacity, selfJoinIndex);
-    uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
-    wrapper->data = work;
-    uv_queue_work(loop, wrapper, _broadcastInputFrameUpsyncOnUvThread, _afterBroadcastInputFrameUpsync);
+    uv_mutex_lock(&sendRingBuffLock);
+    // Might want to send several times for better arrival rate
+    for (int j = 0; j < broadcastUpsyncCnt; j++) {
+        // Send to room udp tunnel in case of hole punching failure
+        sendRingBuff->put(bytes, bytesLen, &udpTunnelAddr);
+        for (int i = 0; i < roomCapacity; i++) {
+            if (i + 1 == selfJoinIndex) {
+                continue;
+            }
+            if (0 == peerAddrList[i].sockAddrIn.sin_port) {
+                // Peer addr not initialized
+                continue;
+            }
+            sendRingBuff->put(bytes, bytesLen, &(peerAddrList[i])); // Content hardcoded for now
+        }
+    }
+    uv_mutex_unlock(&sendRingBuffLock);
+    uv_async_send(&uvSendLoopTriggerSig);
     return true;
 }
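For context, the shutdown handshake used above (closeUdpSession together with startRecvLoop/startSendLoop) follows a common libuv pattern: signal the loop thread via uv_async_send, call uv_stop from the async callback, uv_walk-close every remaining handle, run the loop once more so the close callbacks fire, and only then uv_loop_close and join the thread. The following is a standalone sketch of that sequence, not the commit's code: it uses uv_loop_init/uv_loop_close on a malloc'd loop rather than the uv_loop_new used above, and all names are illustrative.

#include <uv.h>
#include <cstdio>
#include <cstdlib>

static uv_async_t stopSig;

// Mirrors _onUvStopSig: uv_async_send is the thread-safe way to reach a foreign loop,
// and the callback runs on the loop thread, where uv_stop is legal.
static void onStopSig(uv_async_t* handle) {
    uv_stop(handle->loop);
}

// Mirrors _onWalkCleanup: close whatever handles are still open so the loop can shut down.
static void onWalkCleanup(uv_handle_t* handle, void* /*arg*/) {
    if (!uv_is_closing(handle)) {
        uv_close(handle, NULL);
    }
}

// Mirrors startRecvLoop/startSendLoop: run, then walk-close, then run again to flush close
// callbacks, then close the loop itself.
static void loopThread(void* arg) {
    uv_loop_t* l = (uv_loop_t*)arg;
    uv_run(l, UV_RUN_DEFAULT);       // returns after uv_stop() inside onStopSig
    uv_walk(l, onWalkCleanup, NULL); // request closure of all remaining handles
    uv_run(l, UV_RUN_DEFAULT);       // let the close callbacks fire
    int rc = uv_loop_close(l);       // should now return 0 since nothing is active
    std::printf("uv_loop_close -> %d\n", rc);
}

int main() {
    uv_loop_t* loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
    uv_loop_init(loop);
    uv_async_init(loop, &stopSig, onStopSig);

    uv_thread_t tid;
    uv_thread_create(&tid, loopThread, loop);

    // "GameThread" side of closeUdpSession(): signal, then join, then free.
    uv_async_send(&stopSig);
    uv_thread_join(&tid);
    free(loop);
    return 0;
}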

View File

@@ -1,18 +1,9 @@
-#include "uv/uv.h"
-#define __SSIZE_T // Otherwise "ssize_t" would have conflicting macros error that stops compiling
 #ifndef udp_session_hpp
 #define udp_session_hpp
-typedef char BYTEC;
-typedef char const CHARC;
-int const maxUdpPayloadBytes = 128;
+#include "send_ring_buff.hpp"
 int const maxPeerCnt = 10;
-struct PeerAddr {
-    struct sockaddr_in sockAddrIn;
-    uint32_t authKey;
-};
 namespace DelayNoMore {
     class UdpSession {

View File

@@ -126,8 +126,7 @@ bool upsertPeerUdpAddr(se::State& s) {
 }
 SE_BIND_FUNC(upsertPeerUdpAddr)
 
-static bool udpSessionFinalize(se::State& s)
-{
+static bool udpSessionFinalize(se::State& s) {
     CCLOGINFO("jsbindings: finalizing JS object %p (DelayNoMore::UdpSession)", s.nativeThisObject());
     auto iter = se::NonRefNativePtrCreatedByCtorMap::find(s.nativeThisObject());
     if (iter != se::NonRefNativePtrCreatedByCtorMap::end()) {

@@ -141,8 +140,7 @@ SE_BIND_FINALIZE_FUNC(udpSessionFinalize)
 se::Object* __jsb_udp_session_proto = nullptr;
 se::Class* __jsb_udp_session_class = nullptr;
 
-bool registerUdpSession(se::Object* obj)
-{
+bool registerUdpSession(se::Object* obj) {
     // Get the ns
     se::Value nsVal;
     if (!obj->getProperty("DelayNoMore", &nsVal))

View File

@@ -193,6 +193,7 @@ copy "$(ProjectDir)..\..\..\project.json" "$(OutDir)\" /Y</Command>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="main.h" />
+    <ClInclude Include="..\Classes\send_ring_buff.hpp" />
     <ClInclude Include="..\Classes\udp_session.hpp" />
     <ClInclude Include="..\Classes\udp_session_bridge.hpp" />
     <ClInclude Include="..\Classes\AppDelegate.h" />

View File

@@ -37,6 +37,9 @@
       <Filter>win32</Filter>
     </ClInclude>
     <ClInclude Include="resource.h" />
+    <ClInclude Include="..\Classes\send_ring_buff.hpp">
+      <Filter>Classes</Filter>
+    </ClInclude>
     <ClInclude Include="..\Classes\udp_session.hpp">
       <Filter>Classes</Filter>
     </ClInclude>