Added front-to-back UDP channel punching.

This commit is contained in:
genxium
2023-01-29 17:41:17 +08:00
parent b5530b352b
commit c65c122f45
10 changed files with 396 additions and 296 deletions

View File

@@ -14,6 +14,7 @@ struct PeerAddr peerAddrList[maxPeerCnt];
char SRV_IP[256];
int SRV_PORT = 0;
int UDP_TUNNEL_SRV_PORT = 0;
void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) {
if (nread < 0) {
@@ -46,8 +47,8 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
if (6 == nread) {
// holepunching
} else if (0 < nread) {
// Non-holepunching
uint8_t* const ui8Arr = (uint8_t*)malloc(256*sizeof(uint8_t));
// Non-holepunching; it might be more effective in RAM usage to use a threadsafe RingBuff to pass msg to GameThread here, but as long as it's not a performance blocker don't bother optimize here...
uint8_t* const ui8Arr = (uint8_t*)malloc(maxUdpPayloadBytes*sizeof(uint8_t));
memset(ui8Arr, 0, sizeof ui8Arr);
for (int i = 0; i < nread; i++) {
*(ui8Arr+i) = *(buf->base + i);
@@ -58,9 +59,9 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
se::ScriptEngine::getInstance()->getGlobalObject()->getProperty("onUdpMessage", &onUdpMessageCb);
// [WARNING] Declaring "AutoHandleScope" is critical here, otherwise "onUdpMessageCb.toObject()" wouldn't be recognized as a function of the ScriptEngine!
se::AutoHandleScope hs;
CCLOG("UDP received %d bytes upsync -- 1", nread);
//CCLOG("UDP received %d bytes upsync -- 1", nread);
se::Object* const gameThreadMsg = se::Object::createTypedArray(se::Object::TypedArrayType::UINT8, ui8Arr, nread);
CCLOG("UDP received %d bytes upsync -- 2", nread);
//CCLOG("UDP received %d bytes upsync -- 2", nread);
se::ValueArray args = { se::Value(gameThreadMsg) };
if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) {
// Temporarily assume that the "this" ptr within callback is NULL.
@@ -69,9 +70,9 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
se::ScriptEngine::getInstance()->clearException();
}
}
CCLOG("UDP received %d bytes upsync -- 3", nread);
//CCLOG("UDP received %d bytes upsync -- 3", nread);
gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject
CCLOG("UDP received %d bytes upsync -- 4", nread);
//CCLOG("UDP received %d bytes upsync -- 4", nread);
free(ui8Arr);
CCLOG("UDP received %d bytes upsync -- 5", nread);
@@ -110,25 +111,42 @@ void _onUvTimerClosed(uv_handle_t* timer) {
free(timer);
}
int const punchServerCnt = 3;
class PunchServerWork {
public:
BYTEC bytes[128]; // Wasting some RAM here thus no need for explicit recursive destruction
BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
size_t bytesLen;
PunchServerWork(BYTEC* const newBytes, size_t newBytesLen) {
BYTEC udpTunnelBytes[maxUdpPayloadBytes];
size_t udpTunnelBytesLen;
PunchServerWork(BYTEC* const newBytes, size_t newBytesLen, BYTEC* const newUdpTunnelBytes, size_t newUdpTunnelBytesLen) {
memset(this->bytes, 0, sizeof(this->bytes));
memcpy(this->bytes, newBytes, newBytesLen);
this->bytesLen = newBytesLen;
memset(this->udpTunnelBytes, 0, sizeof(this->udpTunnelBytes));
memcpy(this->udpTunnelBytes, newUdpTunnelBytes, newUdpTunnelBytesLen);
this->udpTunnelBytesLen = newUdpTunnelBytesLen;
}
};
void _punchServerOnUvThread(uv_work_t* wrapper) {
PunchServerWork* work = (PunchServerWork*)wrapper->data;
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
struct sockaddr_in destAddr;
uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr);
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
for (int i = 0; i < punchServerCnt; i++) {
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
struct sockaddr_in destAddr;
uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr);
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
uv_udp_send_t* udpTunnelReq = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
uv_buf_t udpTunnelSendBuffer = uv_buf_init(work->udpTunnelBytes, work->udpTunnelBytesLen);
struct sockaddr_in udpTunnelDestAddr;
uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr);
uv_udp_send(udpTunnelReq, udpSocket, &udpTunnelSendBuffer, 1, (struct sockaddr const*)&udpTunnelDestAddr, _onSend);
}
}
void _afterPunchServer(uv_work_t* wrapper, int status) {
CCLOG("UDP send about to free PunchServerWork for status:%d...", status);
@@ -177,8 +195,8 @@ void _punchPeerOnUvThreadDelayed(uv_timer_t* timer, int status) {
//CCLOG("UDP about to punch peer joinIndex:%d", i);
char peerIp[17] = { 0 };
uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
int peerPortSt = ntohs(peerAddrList[i].sockAddrIn.sin_port) - 3;
int peerPortEd = ntohs(peerAddrList[i].sockAddrIn.sin_port) + 3;
int peerPortSt = ntohs(peerAddrList[i].sockAddrIn.sin_port);
int peerPortEd = ntohs(peerAddrList[i].sockAddrIn.sin_port) + 1; // Use tunnel of backend instead of sweeping ports blindly!
for (int peerPort = peerPortSt; peerPort < peerPortEd; peerPort++) {
if (0 > peerPort) continue;
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
@@ -216,7 +234,7 @@ void _afterPunchPeerTimerStarted(uv_work_t* wrapper, int status) {
class BroadcastInputFrameUpsyncWork {
public:
BYTEC bytes[128]; // Wasting some RAM here thus no need for explicit recursive destruction
BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
size_t bytesLen;
int roomCapacity;
int selfJoinIndex;
@@ -323,7 +341,7 @@ bool DelayNoMore::UdpSession::closeUdpSession() {
return true;
}
bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen) {
bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen) {
/*
[WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller which runs on the GameThread. Actual sending will be made on UvThread.
@@ -332,7 +350,8 @@ bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPor
memset(SRV_IP, 0, sizeof SRV_IP);
memcpy(SRV_IP, srvIp, strlen(srvIp));
SRV_PORT = srvPort;
PunchServerWork* work = new PunchServerWork(bytes, bytesLen);
UDP_TUNNEL_SRV_PORT = udpTunnelSrvPort;
PunchServerWork* work = new PunchServerWork(bytes, bytesLen, udpTunnelBytes, udpTunnelBytesBytesLen);
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
wrapper->data = work;
uv_queue_work(loop, wrapper, _punchServerOnUvThread, _afterPunchServer);

View File

@@ -6,6 +6,7 @@
typedef char BYTEC;
typedef char const CHARC;
int const maxUdpPayloadBytes = 128;
int const maxPeerCnt = 10;
struct PeerAddr {
@@ -20,7 +21,7 @@ namespace DelayNoMore {
static bool closeUdpSession();
static bool upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList, int roomCapacity, int selfJoinIndex);
//static bool clearPeerUDPAddrList();
static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen);
static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen);
static bool broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex);
};
}

View File

@@ -22,7 +22,9 @@ bool punchToServer(se::State& s) {
const auto& args = s.args();
size_t argc = args.size();
CC_UNUSED bool ok = true;
if (3 == argc && args[0].isString() && args[1].isNumber() && args[2].isObject() && args[2].toObject()->isTypedArray()) {
if (5 == argc && args[0].isString() && args[1].isNumber() && args[2].isObject() && args[2].toObject()->isTypedArray()
&& args[3].isNumber() && args[4].isObject() && args[4].toObject()->isTypedArray()
) {
SE_PRECONDITION2(ok, false, "punchToServer: Error processing arguments");
CHARC* srvIp = args[0].toString().c_str();
int srvPort = args[1].toInt32();
@@ -35,10 +37,22 @@ bool punchToServer(se::State& s) {
for (size_t i = 0; i < sz; i++) {
bytes[i] = (char)(*(ptr + i));
}
CCLOG("Should punch %s:%d by %d bytes.", srvIp, srvPort, sz);
return DelayNoMore::UdpSession::punchToServer(srvIp, srvPort, bytes, sz);
int udpTunnelSrvPort = args[3].toInt32();
BYTEC udpTunnelBytes[1024];
memset(udpTunnelBytes, 0, sizeof udpTunnelBytes);
se::Object* udpTunnelObj = args[4].toObject();
size_t udpTunnelSz = 0;
uint8_t* udpTunnelPtr = NULL;
obj->getTypedArrayData(&udpTunnelPtr, &udpTunnelSz);
for (size_t i = 0; i < udpTunnelSz; i++) {
udpTunnelBytes[i] = (char)(*(udpTunnelPtr + i));
}
CCLOG("Should punch %s:%d by %d bytes; should punch udp tunnel %s:%d by %d bytes.", srvIp, srvPort, sz, srvIp, udpTunnelSrvPort, udpTunnelSz);
return DelayNoMore::UdpSession::punchToServer(srvIp, srvPort, bytes, sz, udpTunnelSrvPort, udpTunnelBytes, udpTunnelSz);
}
SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 3);
SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 5);
return false;
}
SE_BIND_FUNC(punchToServer)