Fixes for frontend hole punching compatibility.

This commit is contained in:
genxium 2023-01-25 11:57:59 +08:00
parent 6bc3feab58
commit 5df545e168
7 changed files with 91 additions and 41 deletions

File diff suppressed because one or more lines are too long

Binary file not shown.

Before

Width:  |  Height:  |  Size: 417 KiB

After

Width:  |  Height:  |  Size: 476 KiB

View File

@ -155,7 +155,6 @@ cc.Class({
const echoed = window.pb.protos.HolePunchUpsync.decode(ui8Arr); const echoed = window.pb.protos.HolePunchUpsync.decode(ui8Arr);
cc.log(`#2 Js called back by CPP: onUdpMessage: ${JSON.stringify(echoed)}`); cc.log(`#2 Js called back by CPP: onUdpMessage: ${JSON.stringify(echoed)}`);
}; };
DelayNoMore.UdpSession.upsertPeerUdpAddr(self.selfPlayerInfo.JoinIndex, "192.168.31.194", 6789, 123456);
const res1 = DelayNoMore.UdpSession.openUdpSession(8888 + self.selfPlayerInfo.JoinIndex); const res1 = DelayNoMore.UdpSession.openUdpSession(8888 + self.selfPlayerInfo.JoinIndex);
const holePunchDate = window.pb.protos.HolePunchUpsync.encode({ const holePunchDate = window.pb.protos.HolePunchUpsync.encode({
joinIndex: self.selfPlayerInfo.JoinIndex, joinIndex: self.selfPlayerInfo.JoinIndex,
@ -163,8 +162,9 @@ cc.Class({
intAuthToken: "foobar", intAuthToken: "foobar",
authKey: Math.floor(Math.random() * 65535), authKey: Math.floor(Math.random() * 65535),
}).finish() }).finish()
const res2 = DelayNoMore.UdpSession.punchToServer(holePunchDate); const res2 = DelayNoMore.UdpSession.punchToServer("127.0.0.1", 3000, holePunchDate);
//const res3 = DelayNoMore.UdpSession.closeUdpSession(); const res3 = DelayNoMore.UdpSession.upsertPeerUdpAddr(self.selfPlayerInfo.JoinIndex, "192.168.31.194", 6789, 123456, 2, self.selfPlayerInfo.JoinIndex);
//const res4 = DelayNoMore.UdpSession.closeUdpSession();
} }
self.onRoomDownsyncFrame(startRdf); self.onRoomDownsyncFrame(startRdf);

View File

@ -9,9 +9,18 @@ uv_udp_t* udpSocket = NULL;
uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC
int const sendBufferLen = 1024; int const maxPeerCnt = 10;
struct PeerAddr {
SOCKADDR_IN sockAddrIn;
uint32_t authKey;
};
struct PeerAddr peerAddrList[maxPeerCnt];
uv_mutex_t sendLock, recvLock; uv_mutex_t sendLock, recvLock;
CHARC * SRV_IP = NULL;
int SRV_PORT = 0;
void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) { void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned flags) {
if (nread < 0) { if (nread < 0) {
CCLOGERROR("Read error %s", uv_err_name(nread)); CCLOGERROR("Read error %s", uv_err_name(nread));
@ -20,19 +29,20 @@ void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct soc
return; return;
} }
char senderAddrStr[64] = { 0 }; SOCKADDR_IN* sockAddr = (SOCKADDR_IN*)addr;
uv_ip4_name((const struct sockaddr_in*)addr, senderAddrStr, 16); char ip[64] = { 0 };
uv_ip4_name(sockAddr, ip, sizeof ip);
//uv_udp_recv_stop(req); int port = sockAddr->sin_port;
int const gameThreadMsgSize = 256; int const gameThreadMsgSize = 256;
char* const gameThreadMsg = (char* const)malloc(gameThreadMsgSize); char* const gameThreadMsg = (char* const)malloc(gameThreadMsgSize);
memset(gameThreadMsg, 0, gameThreadMsgSize); memset(gameThreadMsg, 0, gameThreadMsgSize);
memcpy(gameThreadMsg, buf->base, nread); memcpy(gameThreadMsg, buf->base, nread);
CCLOG("Recv %d bytes from %s, converted to %d bytes for the JS callback", nread, senderAddrStr, strlen(gameThreadMsg)); CCLOG("Recv %d bytes from %s:%d, converted to %d bytes for the JS callback", nread, ip, port, strlen(gameThreadMsg));
free(buf->base); free(buf->base);
//uv_udp_recv_stop(req);
cocos2d::Application::getInstance()->getScheduler()->performFunctionInCocosThread([=]() { cocos2d::Application::getInstance()->getScheduler()->performFunctionInCocosThread([=]() {
// [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"! // [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"!
se::Value onUdpMessageCb; se::Value onUdpMessageCb;
@ -41,7 +51,8 @@ void _onRead(uv_udp_t* req, ssize_t nread, const uv_buf_t* buf, const struct soc
se::AutoHandleScope hs; se::AutoHandleScope hs;
se::ValueArray args = { se::Value(gameThreadMsg) }; se::ValueArray args = { se::Value(gameThreadMsg) };
if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) { if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) {
bool ok = onUdpMessageCb.toObject()->call(args, NULL /* Temporarily assume that the "this" ptr within callback is NULL. */); // Temporarily assume that the "this" ptr within callback is NULL.
bool ok = onUdpMessageCb.toObject()->call(args, NULL);
if (!ok) { if (!ok) {
se::ScriptEngine::getInstance()->clearException(); se::ScriptEngine::getInstance()->clearException();
} }
@ -56,10 +67,20 @@ static void _allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
buf->len = suggested_size; buf->len = suggested_size;
} }
void diep(char* s) {
perror(s);
exit(1);
}
typedef struct client {
int host;
short port;
};
void startRecvLoop(void* arg) { void startRecvLoop(void* arg) {
uv_loop_t* loop = (uv_loop_t*)arg; uv_loop_t* l = (uv_loop_t*)arg;
uv_run(loop, UV_RUN_DEFAULT); uv_run(l, UV_RUN_DEFAULT);
CCLOG("UDP session is ended!");
CCLOG("UDP recv loop is ended!");
} }
bool DelayNoMore::UdpSession::openUdpSession(int port) { bool DelayNoMore::UdpSession::openUdpSession(int port) {
@ -67,17 +88,16 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) {
uv_mutex_init(&sendLock); uv_mutex_init(&sendLock);
uv_mutex_init(&recvLock); uv_mutex_init(&recvLock);
CCLOG("About to open UDP session at port=%d...", port);
loop = uv_loop_new(); // Only the default loop is used for this simple PoC
udpSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t)); udpSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
SOCKADDR_IN recv_addr; SOCKADDR_IN recv_addr;
uv_ip4_addr("0.0.0.0", port, &recv_addr); uv_ip4_addr("0.0.0.0", port, &recv_addr);
uv_udp_init(loop, udpSocket);
uv_udp_bind(udpSocket, (struct sockaddr const*)&recv_addr, UV_UDP_REUSEADDR); uv_udp_bind(udpSocket, (struct sockaddr const*)&recv_addr, UV_UDP_REUSEADDR);
uv_udp_recv_start(udpSocket, _allocBuffer, _onRead);
CCLOG("About to open UDP session at port=%d...", port);
loop = uv_loop_new();
uv_udp_init(loop, udpSocket);
uv_udp_recv_start(udpSocket, _allocBuffer, _onRead);
uv_thread_t recvTid; uv_thread_t recvTid;
uv_thread_create(&recvTid, startRecvLoop, loop); uv_thread_create(&recvTid, startRecvLoop, loop);
@ -93,6 +113,12 @@ static void _onWalkCleanup(uv_handle_t* handle, void* data) {
bool DelayNoMore::UdpSession::closeUdpSession() { bool DelayNoMore::UdpSession::closeUdpSession() {
CCLOG("About to close udp session and dealloc all resources..."); CCLOG("About to close udp session and dealloc all resources...");
for (int i = 0; i < maxPeerCnt; i++) {
peerAddrList[i].authKey = -1; // hardcoded for now
memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn));
}
uv_stop(loop); uv_stop(loop);
uv_walk(loop, _onWalkCleanup, NULL); uv_walk(loop, _onWalkCleanup, NULL);
uv_loop_close(loop); uv_loop_close(loop);
@ -101,16 +127,12 @@ bool DelayNoMore::UdpSession::closeUdpSession() {
uv_mutex_destroy(&sendLock); uv_mutex_destroy(&sendLock);
uv_mutex_destroy(&recvLock); uv_mutex_destroy(&recvLock);
CCLOG("Closed udp session and dealloc all resources..."); CCLOG("Closed udp session and dealloc all resources...");
return true; return true;
} }
bool DelayNoMore::UdpSession::upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey) {
CCLOG("Called by js for joinIndex=%d, ip=%s, port=%d, authKey=%lu.", joinIndex, ip, port, authKey);
return true;
}
void _onSend(uv_udp_send_t* req, int status) { void _onSend(uv_udp_send_t* req, int status) {
free(req); free(req);
if (status) { if (status) {
@ -118,19 +140,43 @@ void _onSend(uv_udp_send_t* req, int status) {
} }
} }
bool DelayNoMore::UdpSession::punchToServer(BYTEC* const bytes) { bool DelayNoMore::UdpSession::upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey, int roomCapacity, int selfJoinIndex) {
CCLOG("Called by js for joinIndex=%d, ip=%s, port=%d, authKey=%lu; roomCapacity=%d, selfJoinIndex=%d.", joinIndex, ip, port, authKey, roomCapacity, selfJoinIndex);
uv_ip4_addr(ip, port, &(peerAddrList[joinIndex - 1].sockAddrIn));
peerAddrList[joinIndex - 1].authKey = authKey;
// Punching between existing peer-pairs for Address/Port-restricted Cone NAT (not need for Full Cone NAT)
uv_mutex_lock(&sendLock);
for (int i = 0; i < roomCapacity; i++) {
if (i == selfJoinIndex - 1) continue;
for (int j = 0; j < 10; j++) {
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&peerAddrList[i], _onSend);
}
}
uv_mutex_unlock(&sendLock);
return true;
}
bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes) {
/* /*
[WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller. [WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller.
Moreover, there's no need to lock on "bytes". Only "udpSocket" is possibly accessed by multiple threads. Moreover, there's no need to lock on "bytes". Only "udpSocket" is possibly accessed by multiple threads.
*/ */
SRV_IP = srvIp;
SRV_PORT = srvPort;
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t)); uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init(bytes, strlen(bytes)); uv_buf_t sendBuffer = uv_buf_init(bytes, strlen(bytes));
SOCKADDR_IN destAddr; SOCKADDR_IN destAddr;
uv_ip4_addr("127.0.0.1", 3000, &destAddr);
uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr);
uv_mutex_lock(&sendLock); uv_mutex_lock(&sendLock);
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend); uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
uv_mutex_unlock(&sendLock); uv_mutex_unlock(&sendLock);
return true; return true;
} }

View File

@ -11,9 +11,9 @@ namespace DelayNoMore {
public: public:
static bool openUdpSession(int port); static bool openUdpSession(int port);
static bool closeUdpSession(); static bool closeUdpSession();
static bool upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey); static bool upsertPeerUdpAddr(int joinIndex, CHARC* const ip, int port, uint32_t authKey, int roomCapacity, int selfJoinIndex);
//static bool clearPeerUDPAddrList(); //static bool clearPeerUDPAddrList();
static bool punchToServer(BYTEC* const bytes); static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes);
}; };
} }
#endif #endif

View File

@ -22,19 +22,21 @@ bool punchToServer(se::State& s) {
const auto& args = s.args(); const auto& args = s.args();
size_t argc = args.size(); size_t argc = args.size();
CC_UNUSED bool ok = true; CC_UNUSED bool ok = true;
if (1 == argc && args[0].isObject() && args[0].toObject()->isTypedArray()) { if (3 == argc && args[0].isString() && args[1].isNumber() && args[2].isObject() && args[2].toObject()->isTypedArray()) {
SE_PRECONDITION2(ok, false, "punchToServer: Error processing arguments"); SE_PRECONDITION2(ok, false, "punchToServer: Error processing arguments");
CHARC* srvIp = args[0].toString().c_str();
int srvPort = args[1].toInt32();
BYTEC bytes[1024]; BYTEC bytes[1024];
memset(bytes, 0, sizeof bytes); memset(bytes, 0, sizeof bytes);
se::Object* obj = args[0].toObject(); se::Object* obj = args[2].toObject();
size_t sz = 0; size_t sz = 0;
uint8_t* ptr; uint8_t* ptr;
obj->getTypedArrayData(&ptr, &sz); obj->getTypedArrayData(&ptr, &sz);
memcpy(bytes, ptr, sz); memcpy(bytes, ptr, sz);
CCLOG("Should punch by %d bytes v.s. strlen(bytes)=%u.", sz, strlen(bytes)); CCLOG("Should punch %s:%d by %d bytes v.s. strlen(bytes)=%u.", srvIp, srvPort, sz, strlen(bytes));
return DelayNoMore::UdpSession::punchToServer(bytes); return DelayNoMore::UdpSession::punchToServer(srvIp, srvPort, bytes);
} }
SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 1); SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 3);
return false; return false;
} }
SE_BIND_FUNC(punchToServer) SE_BIND_FUNC(punchToServer)
@ -57,15 +59,17 @@ bool upsertPeerUdpAddr(se::State& s) {
const auto& args = s.args(); const auto& args = s.args();
size_t argc = args.size(); size_t argc = args.size();
CC_UNUSED bool ok = true; CC_UNUSED bool ok = true;
if (4 == argc && args[0].isNumber() && args[1].isString() && args[2].isNumber() && args[3].isNumber()) { if (6 == argc && args[0].isNumber() && args[1].isString() && args[2].isNumber() && args[3].isNumber() && args[4].isNumber() && args[5].isNumber()) {
SE_PRECONDITION2(ok, false, "upsertPeerUdpAddr: Error processing arguments"); SE_PRECONDITION2(ok, false, "upsertPeerUdpAddr: Error processing arguments");
int joinIndex = args[0].toInt32(); int joinIndex = args[0].toInt32();
CHARC* ip = args[1].toString().c_str(); CHARC* ip = args[1].toString().c_str();
int port = args[2].toInt32(); int port = args[2].toInt32();
uint32_t authKey = args[3].toUint32(); uint32_t authKey = args[3].toUint32();
return DelayNoMore::UdpSession::upsertPeerUdpAddr(joinIndex, ip, port, authKey); int roomCapacity = args[4].toInt32();
int selfJoinIndex = args[5].toInt32();
return DelayNoMore::UdpSession::upsertPeerUdpAddr(joinIndex, ip, port, authKey, roomCapacity, selfJoinIndex);
} }
SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 4); SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 6);
return false; return false;
} }

View File

@ -9,7 +9,7 @@ import (
func main() { func main() {
conn, err := net.ListenUDP("udp", &net.UDPAddr{ conn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: 3000, Port: 3000,
IP: net.ParseIP("127.0.0.1"), IP: net.ParseIP("0.0.0.0"),
}) })
if err != nil { if err != nil {
panic(err) panic(err)