Fixed libuv multithreading.

This commit is contained in:
genxium 2023-01-27 22:51:34 +08:00
parent e7bf6ec16b
commit 4097a8da75
7 changed files with 195 additions and 100 deletions

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 476 KiB

After

Width:  |  Height:  |  Size: 472 KiB

View File

@ -162,7 +162,15 @@ cc.Class({
authKey: Math.floor(Math.random() * 65535), authKey: Math.floor(Math.random() * 65535),
}).finish() }).finish()
const res2 = DelayNoMore.UdpSession.punchToServer("127.0.0.1", 3000, holePunchData); const res2 = DelayNoMore.UdpSession.punchToServer("127.0.0.1", 3000, holePunchData);
const res3 = DelayNoMore.UdpSession.upsertPeerUdpAddr(self.selfPlayerInfo.JoinIndex, "192.168.31.194", 6789, 123456, 2, self.selfPlayerInfo.JoinIndex); const res3 = DelayNoMore.UdpSession.upsertPeerUdpAddr([window.pb.protos.PeerUdpAddr.create({
ip: "192.168.31.194",
port: 6789,
authKey: 123456,
}), window.pb.protos.PeerUdpAddr.create({
ip: "192.168.1.101",
port: 8771,
authKey: 654321,
})], 2, self.selfPlayerInfo.JoinIndex);
//const res4 = DelayNoMore.UdpSession.closeUdpSession(); //const res4 = DelayNoMore.UdpSession.closeUdpSession();
} }
self.onRoomDownsyncFrame(startRdf); self.onRoomDownsyncFrame(startRdf);

View File

@ -222,8 +222,7 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) {
const peerJoinIndex = resp.peerJoinIndex; const peerJoinIndex = resp.peerJoinIndex;
const peerAddrList = resp.rdf.peerUdpAddrList; const peerAddrList = resp.rdf.peerUdpAddrList;
console.log(`Got DOWNSYNC_MSG_ACT_PEER_UDP_ADDR peerAddrList=${JSON.stringify(peerAddrList)}; boundRoomCapacity=${window.boundRoomCapacity}`); console.log(`Got DOWNSYNC_MSG_ACT_PEER_UDP_ADDR peerAddrList=${JSON.stringify(peerAddrList)}; boundRoomCapacity=${window.boundRoomCapacity}`);
const peerAddr = peerAddrList[peerJoinIndex - 1]; DelayNoMore.UdpSession.upsertPeerUdpAddr(peerAddrList, window.boundRoomCapacity, window.mapIns.selfPlayerInfo.JoinIndex); // In C++ impl it actually broadcasts the peer-punching message to all known peers within "window.boundRoomCapacity"
DelayNoMore.UdpSession.upsertPeerUdpAddr(peerJoinIndex, peerAddr.ip, peerAddr.port, peerAddr.authKey, window.boundRoomCapacity, window.mapIns.selfPlayerInfo.JoinIndex); // In C++ impl it actually broadcasts the peer-punching message to all known peers within "window.boundRoomCapacity"
} }
break; break;
default: default:

View File

@ -3,23 +3,15 @@
#include "cocos/platform/CCApplication.h" #include "cocos/platform/CCApplication.h"
#include "cocos/base/CCScheduler.h" #include "cocos/base/CCScheduler.h"
#include "cocos/scripting/js-bindings/jswrapper/SeApi.h" #include "cocos/scripting/js-bindings/jswrapper/SeApi.h"
#include "uv/uv.h"
uv_udp_t* udpSocket = NULL; uv_udp_t* udpSocket = NULL;
uv_thread_t recvTid; uv_thread_t recvTid;
uv_async_t uvLoopStopSig; uv_async_t uvLoopStopSig;
uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC
int const maxPeerCnt = 10;
struct PeerAddr {
struct sockaddr_in sockAddrIn;
uint32_t authKey;
};
struct PeerAddr peerAddrList[maxPeerCnt]; struct PeerAddr peerAddrList[maxPeerCnt];
uv_mutex_t sendLock, recvLock; char SRV_IP[256];
CHARC * SRV_IP = NULL;
int SRV_PORT = 0; int SRV_PORT = 0;
void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) {
@ -31,9 +23,9 @@ void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct soc
} }
struct sockaddr_in* sockAddr = (struct sockaddr_in*)addr; struct sockaddr_in* sockAddr = (struct sockaddr_in*)addr;
char ip[64] = { 0 }; char ip[17] = { 0 };
uv_ip4_name(sockAddr, ip, sizeof ip); uv_ip4_name(sockAddr, ip, sizeof ip);
int port = sockAddr->sin_port; int port = ntohs(sockAddr->sin_port);
int const gameThreadMsgSize = 256; int const gameThreadMsgSize = 256;
char* const gameThreadMsg = (char* const)malloc(gameThreadMsgSize); char* const gameThreadMsg = (char* const)malloc(gameThreadMsgSize);
@ -64,24 +56,127 @@ void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct soc
static void _allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void _allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
(void)handle; (void)handle;
buf->base = (char *)malloc(suggested_size); buf->base = (char*)malloc(suggested_size);
buf->len = suggested_size; buf->len = suggested_size;
} }
void diep(char* s) {
perror(s);
exit(1);
}
typedef struct client {
int host;
short port;
};
void _onUvStopSig(uv_async_t* handle) { void _onUvStopSig(uv_async_t* handle) {
uv_stop(loop); uv_stop(loop);
CCLOG("UDP recv loop is signaled to stop in UvThread"); CCLOG("UDP recv loop is signaled to stop in UvThread");
} }
void _onSend(uv_udp_send_t* req, int status) {
free(req); // No need to free "req->base", it'll be handled in each "_afterXxx" callback
if (status) {
CCLOGERROR("uv_udp_send_cb error: %s\n", uv_strerror(status));
}
}
class PunchServerWork {
public:
BYTEC bytes[128]; // Wasting some RAM here thus no need for explicit recursive destruction
size_t bytesLen;
PunchServerWork(BYTEC* const newBytes, size_t newBytesLen) {
memset(this->bytes, 0, sizeof(this->bytes));
memcpy(this->bytes, newBytes, newBytesLen);
this->bytesLen = newBytesLen;
}
};
void _punchServerOnUvThread(uv_work_t* wrapper) {
PunchServerWork* work = (PunchServerWork*)wrapper->data;
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
struct sockaddr_in destAddr;
uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr);
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
}
void _afterPunchServer(uv_work_t* wrapper, int status) {
PunchServerWork* work = (PunchServerWork*)wrapper->data;
delete work;
}
class PunchPeerWork {
public:
int roomCapacity;
int selfJoinIndex;
PunchPeerWork(int newRoomCapacity, int newSelfJoinIndex) {
this->roomCapacity = newRoomCapacity;
this->selfJoinIndex = newSelfJoinIndex;
}
};
void _punchPeerOnUvThread(uv_work_t* wrapper) {
PunchPeerWork* work = (PunchPeerWork*)wrapper->data;
int roomCapacity = work->roomCapacity;
int selfJoinIndex = work->selfJoinIndex;
for (int i = 0; i < roomCapacity; i++) {
if (i + 1 == selfJoinIndex) {
continue;
}
if (0 == peerAddrList[i].sockAddrIn.sin_port) {
// Peer addr not initialized
continue;
}
char peerIp[17] = { 0 };
uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
for (int j = 0; j < 3; j++) {
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend);
CCLOG("UDP punched peer %s:%d by 6 bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), j);
}
}
}
void _afterPunchPeer(uv_work_t* wrapper, int status) {
PunchPeerWork* work = (PunchPeerWork*)wrapper->data;
delete work;
}
class BroadcastInputFrameUpsyncWork {
public:
BYTEC bytes[128]; // Wasting some RAM here thus no need for explicit recursive destruction
size_t bytesLen;
int roomCapacity;
int selfJoinIndex;
BroadcastInputFrameUpsyncWork(BYTEC* const newBytes, size_t newBytesLen, int newRoomCapacity, int newSelfJoinIndex) {
memset(this->bytes, 0, sizeof(this->bytes));
memcpy(this->bytes, newBytes, newBytesLen);
this->bytesLen = newBytesLen;
this->roomCapacity = newRoomCapacity;
this->selfJoinIndex = newSelfJoinIndex;
}
};
void _broadcastInputFrameUpsyncOnUvThread(uv_work_t* wrapper) {
BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
int roomCapacity = work->roomCapacity;
int selfJoinIndex = work->selfJoinIndex;
for (int i = 0; i < roomCapacity; i++) {
if (i + 1 == selfJoinIndex) {
continue;
}
if (0 == peerAddrList[i].sockAddrIn.sin_port) {
// Peer addr not initialized
continue;
}
char peerIp[17] = { 0 };
uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
// Might want to send several times for better arrival rate
for (int j = 0; j < 1; j++) {
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend);
CCLOG("UDP broadcasted upsync to peer %s:%d by %u bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), work->bytesLen, j);
}
}
}
void _afterBroadcastInputFrameUpsync(uv_work_t* wrapper, int status) {
BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
delete work;
}
void _onWalkCleanup(uv_handle_t* handle, void* data) { void _onWalkCleanup(uv_handle_t* handle, void* data) {
(void)data; (void)data;
uv_close(handle, NULL); uv_close(handle, NULL);
@ -100,9 +195,6 @@ void startRecvLoop(void* arg) {
bool DelayNoMore::UdpSession::openUdpSession(int port) { bool DelayNoMore::UdpSession::openUdpSession(int port) {
uv_mutex_init(&sendLock);
uv_mutex_init(&recvLock);
udpSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t)); udpSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
struct sockaddr_in recv_addr; struct sockaddr_in recv_addr;
uv_ip4_addr("0.0.0.0", port, &recv_addr); uv_ip4_addr("0.0.0.0", port, &recv_addr);
@ -112,6 +204,7 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) {
loop = uv_loop_new(); loop = uv_loop_new();
uv_udp_init(loop, udpSocket); uv_udp_init(loop, udpSocket);
uv_async_init(loop, &uvLoopStopSig, _onUvStopSig); uv_async_init(loop, &uvLoopStopSig, _onUvStopSig);
uv_udp_recv_start(udpSocket, _allocBuffer, _onRead); uv_udp_recv_start(udpSocket, _allocBuffer, _onRead);
uv_thread_create(&recvTid, startRecvLoop, loop); uv_thread_create(&recvTid, startRecvLoop, loop);
@ -136,73 +229,51 @@ bool DelayNoMore::UdpSession::closeUdpSession() {
free(udpSocket); free(udpSocket);
free(loop); free(loop);
uv_mutex_destroy(&sendLock);
uv_mutex_destroy(&recvLock);
CCLOG("Closed udp session and dealloc all resources in GameThread..."); CCLOG("Closed udp session and dealloc all resources in GameThread...");
return true; return true;
} }
void _onSend(uv_udp_send_t* req, int status) {
free(req);
if (status) {
fprintf(stderr, "uv_udp_send_cb error: %s\n", uv_strerror(status));
}
}
bool DelayNoMore::UdpSession::upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey, int roomCapacity, int selfJoinIndex) {
CCLOG("upsertPeerUdpAddr called by js for joinIndex=%d, ip=%s, port=%d, authKey=%lu; roomCapacity=%d, selfJoinIndex=%d.", joinIndex, ip, port, authKey, roomCapacity, selfJoinIndex);
// Punching between existing peer-pairs for Address/Port-restricted Cone NAT (not need for Full Cone NAT)
uv_mutex_lock(&sendLock);
for (int i = 0; i < roomCapacity; i++) {
if (i == selfJoinIndex - 1) continue;
uv_ip4_addr(ip, port, &(peerAddrList[i].sockAddrIn));
peerAddrList[i].authKey = authKey;
for (int j = 0; j < 10; j++) {
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend);
}
}
uv_mutex_unlock(&sendLock);
return true;
}
bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen) { bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen) {
/* /*
[WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller. [WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller which runs on the GameThread. Actual sending will be made on UvThread.
Moreover, there's no need to lock on "bytes". Only "udpSocket" is possibly accessed by multiple threads. Therefore we make a copy of this message before dispatching it "GameThread -> UvThread".
*/ */
SRV_IP = srvIp; memset(SRV_IP, 0, sizeof SRV_IP);
memcpy(SRV_IP, srvIp, strlen(srvIp));
SRV_PORT = srvPort; SRV_PORT = srvPort;
PunchServerWork* work = new PunchServerWork(bytes, bytesLen);
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
wrapper->data = work;
uv_queue_work(loop, wrapper, _punchServerOnUvThread, _afterPunchServer);
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); return true;
uv_buf_t sendBuffer = uv_buf_init(bytes, bytesLen); }
struct sockaddr_in destAddr;
uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr); bool DelayNoMore::UdpSession::upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList, int roomCapacity, int selfJoinIndex) {
uv_mutex_lock(&sendLock); CCLOG("upsertPeerUdpAddr called by js for roomCapacity=%d, selfJoinIndex=%d.", roomCapacity, selfJoinIndex);
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
uv_mutex_unlock(&sendLock); // Punching between existing peer-pairs for Address/Port-restricted Cone NAT (not need for Full Cone NAT); UvThread never writes into "peerAddrList", so I assume that it's safe to skip locking for them
for (int i = 0; i < roomCapacity; i++) {
if (i == selfJoinIndex - 1) continue;
peerAddrList[i].sockAddrIn = (*(newPeerAddrList + i)).sockAddrIn;
peerAddrList[i].authKey = (*(newPeerAddrList + i)).authKey;
}
PunchPeerWork* work = new PunchPeerWork(roomCapacity, selfJoinIndex);
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
wrapper->data = work;
uv_queue_work(loop, wrapper, _punchPeerOnUvThread, _afterPunchPeer);
return true; return true;
} }
bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex) { bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex) {
uv_mutex_lock(&sendLock); BroadcastInputFrameUpsyncWork* work = new BroadcastInputFrameUpsyncWork(bytes, bytesLen, roomCapacity, selfJoinIndex);
for (int i = 0; i < roomCapacity; i++) { uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
if (i == selfJoinIndex - 1) continue; wrapper->data = work;
for (int j = 0; j < 10; j++) { uv_queue_work(loop, wrapper, _broadcastInputFrameUpsyncOnUvThread, _afterBroadcastInputFrameUpsync);
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init(bytes, bytesLen);
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend);
}
}
uv_mutex_unlock(&sendLock);
return true; return true;
} }

View File

@ -1,17 +1,24 @@
#include "uv/uv.h"
#define __SSIZE_T // Otherwise "ssize_t" would have conflicting macros error that stops compiling
#ifndef udp_session_hpp #ifndef udp_session_hpp
#define udp_session_hpp #define udp_session_hpp
#include "cocos/scripting/js-bindings/jswrapper/SeApi.h"
typedef char BYTEC; typedef char BYTEC;
typedef char const CHARC; typedef char const CHARC;
int const maxPeerCnt = 10;
struct PeerAddr {
struct sockaddr_in sockAddrIn;
uint32_t authKey;
};
namespace DelayNoMore { namespace DelayNoMore {
class UdpSession { class UdpSession {
public: public:
static bool openUdpSession(int port); static bool openUdpSession(int port);
static bool closeUdpSession(); static bool closeUdpSession();
static bool upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey, int roomCapacity, int selfJoinIndex); static bool upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList, int roomCapacity, int selfJoinIndex);
//static bool clearPeerUDPAddrList(); //static bool clearPeerUDPAddrList();
static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen); static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen);
static bool broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex); static bool broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex);

View File

@ -82,21 +82,31 @@ bool closeUdpSession(se::State& s) {
} }
SE_BIND_FUNC(closeUdpSession) SE_BIND_FUNC(closeUdpSession)
struct PeerAddr newPeerAddrList[maxPeerCnt];
bool upsertPeerUdpAddr(se::State& s) { bool upsertPeerUdpAddr(se::State& s) {
const auto& args = s.args(); const auto& args = s.args();
size_t argc = args.size(); size_t argc = args.size();
CC_UNUSED bool ok = true; CC_UNUSED bool ok = true;
if (6 == argc && args[0].isNumber() && args[1].isString() && args[2].isNumber() && args[3].isNumber() && args[4].isNumber() && args[5].isNumber()) { if (3 == argc && args[0].isObject() && args[0].toObject()->isArray() && args[1].isNumber() && args[2].isNumber()) {
SE_PRECONDITION2(ok, false, "upsertPeerUdpAddr: Error processing arguments"); SE_PRECONDITION2(ok, false, "upsertPeerUdpAddr: Error processing arguments");
int joinIndex = args[0].toInt32(); int roomCapacity = args[1].toInt32();
CHARC* ip = args[1].toString().c_str(); int selfJoinIndex = args[2].toInt32();
int port = args[2].toInt32(); se::Object* newPeerAddrValArr = args[0].toObject();
uint32_t authKey = args[3].toUint32(); for (int i = 0; i < roomCapacity; i++) {
int roomCapacity = args[4].toInt32(); se::Value newPeerAddrVal;
int selfJoinIndex = args[5].toInt32(); newPeerAddrValArr->getArrayElement(i, &newPeerAddrVal);
return DelayNoMore::UdpSession::upsertPeerUdpAddr(joinIndex, ip, port, authKey, roomCapacity, selfJoinIndex); se::Object* newPeerAddrObj = newPeerAddrVal.toObject();
se::Value newIp, newPort, newAuthKey;
newPeerAddrObj->getProperty("ip", &newIp);
newPeerAddrObj->getProperty("port", &newPort);
newPeerAddrObj->getProperty("authKey", &newAuthKey);
uv_ip4_addr(newIp.toString().c_str(), newPort.toInt32(), &(newPeerAddrList[i].sockAddrIn));
newPeerAddrList[i].authKey = newAuthKey.toInt32();
}
return DelayNoMore::UdpSession::upsertPeerUdpAddr(newPeerAddrList, roomCapacity, selfJoinIndex);
} }
SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 6); SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 3);
return false; return false;
} }