Drafted enhancement of UDP message callback.

This commit is contained in:
genxium 2023-02-09 10:18:23 +08:00
parent 5c611b626d
commit 7b0c807496
7 changed files with 156 additions and 40 deletions

View File

@ -1084,6 +1084,9 @@ fromUDP=${fromUDP}`);
} }
try { try {
let st = performance.now(); let st = performance.now();
if (cc.sys.isNative) {
DelayNoMore.UdpSession.pollUdpRecvRingBuff();
}
const noDelayInputFrameId = gopkgs.ConvertToNoDelayInputFrameId(self.renderFrameId); const noDelayInputFrameId = gopkgs.ConvertToNoDelayInputFrameId(self.renderFrameId);
let prevSelfInput = null, let prevSelfInput = null,
currSelfInput = null; currSelfInput = null;

View File

@ -1,6 +1,7 @@
#include <string.h> #include <string.h>
#include "send_ring_buff.hpp" #include "send_ring_buff.hpp"
// Sending
void SendRingBuff::put(BYTEC* const newBytes, size_t newBytesLen, PeerAddr* pNewPeerAddr) { void SendRingBuff::put(BYTEC* const newBytes, size_t newBytesLen, PeerAddr* pNewPeerAddr) {
while (0 < cnt && cnt >= n) { while (0 < cnt && cnt >= n) {
// Make room for the new element // Make room for the new element
@ -29,3 +30,34 @@ SendWork* SendRingBuff::pop() {
} }
return ret; return ret;
} }
// Recving
void RecvRingBuff::put(char* newBytes, size_t newBytesLen) {
while (0 < cnt && cnt >= n) {
// Make room for the new element
this->pop();
}
eles[ed].bytesLen = newBytesLen;
memset(eles[ed].ui8Arr, 0, sizeof eles[ed].ui8Arr);
for (int i = 0; i < newBytesLen; i++) {
*(eles[ed].ui8Arr + i) = *(newBytes + i);
}
ed++;
cnt++;
if (ed >= n) {
ed -= n; // Deliberately not using "%" operator for performance concern
}
}
RecvWork* RecvRingBuff::pop() {
if (0 == cnt) {
return NULL;
}
RecvWork* ret = &(eles[st]);
cnt--;
st++;
if (st >= n) {
st -= n;
}
return ret;
}

View File

@ -41,4 +41,25 @@ public:
SendWork* pop(); SendWork* pop();
}; };
// TODO: Move "RecvXxxx" to a dedicated class.
class RecvWork {
public:
uint8_t ui8Arr[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
size_t bytesLen;
};
// [WARNING] This class is specific to "RecvWork"
class RecvRingBuff {
public:
int ed, st, n, cnt;
RecvWork eles[maxBuffedMsgs]; // preallocated on stack to save heap alloc/dealloc time
RecvRingBuff(int newN) {
this->n = newN;
this->st = this->ed = this->cnt = 0;
}
void put(char* newBytes, size_t newBytesLen);
RecvWork* pop();
};
#endif #endif

View File

@ -16,11 +16,15 @@ uv_loop_t *recvLoop = NULL, *sendLoop = NULL;
uv_mutex_t sendRingBuffLock; // used along with "uvSendLoopTriggerSig" as a "uv_cond_t" uv_mutex_t sendRingBuffLock; // used along with "uvSendLoopTriggerSig" as a "uv_cond_t"
SendRingBuff* sendRingBuff = NULL; SendRingBuff* sendRingBuff = NULL;
uv_mutex_t recvRingBuffLock;
RecvRingBuff* recvRingBuff = NULL;
char SRV_IP[256]; char SRV_IP[256];
int SRV_PORT = 0; int SRV_PORT = 0;
int UDP_TUNNEL_SRV_PORT = 0; int UDP_TUNNEL_SRV_PORT = 0;
struct PeerAddr udpPunchingServerAddr, udpTunnelAddr; struct PeerAddr udpPunchingServerAddr, udpTunnelAddr;
struct PeerAddr peerAddrList[maxPeerCnt]; struct PeerAddr peerAddrList[maxPeerCnt];
bool peerPunchedMarks[maxPeerCnt];
void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) { void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) {
if (nread < 0) { if (nread < 0) {
@ -29,6 +33,8 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
free(buf->base); free(buf->base);
return; return;
} }
struct sockaddr_in const* sockAddr = (struct sockaddr_in const*)addr;
#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0) #if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
char ip[INET_ADDRSTRLEN]; char ip[INET_ADDRSTRLEN];
memset(ip, 0, sizeof ip); memset(ip, 0, sizeof ip);
@ -38,10 +44,8 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
// The null check for "addr" is necessary, on Android there'd be such mysterious call to "_onRead"! // The null check for "addr" is necessary, on Android there'd be such mysterious call to "_onRead"!
switch (addr->sa_family) { switch (addr->sa_family) {
case AF_INET: { case AF_INET: {
struct sockaddr_in const* sockAddr = (struct sockaddr_in const*)addr;
uv_inet_ntop(sockAddr->sin_family, &(sockAddr->sin_addr), ip, INET_ADDRSTRLEN); uv_inet_ntop(sockAddr->sin_family, &(sockAddr->sin_addr), ip, INET_ADDRSTRLEN);
port = ntohs(sockAddr->sin_port); port = ntohs(sockAddr->sin_port);
CCLOG("UDP received %u bytes from %s:%d", nread, ip, port);
break; break;
} }
default: default:
@ -53,37 +57,24 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
#endif #endif
if (6 == nread) { if (6 == nread) {
// holepunching // Peer holepunching
for (int i = 0; i < maxPeerCnt; i++) {
if (peerAddrList[i].sockAddrIn.sin_addr.s_addr != sockAddr->sin_addr.s_addr) continue;
if (peerAddrList[i].sockAddrIn.sin_port != sockAddr->sin_port) continue;
peerPunchedMarks[i] = true;
#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
CCLOG("UDP received peer-holepunching from %s:%d", ip, port);
#endif
break;
}
} else if (0 < nread) { } else if (0 < nread) {
// Non-holepunching; it might be more effective in RAM usage to use a threadsafe RingBuff to pass msg to GameThread here, but as long as it's not a performance blocker don't bother optimize here... // Non-holepunching; the previously used "cocos2d::Application::getInstance()->getScheduler()->performFunctionInCocosThread(...)" approach was so non-deterministic in terms of the lag till GameThread actually recognizes this latest received packet due to scheduler uncertainty -- and was also heavier in RAM due to lambda usage
uint8_t* const ui8Arr = (uint8_t*)malloc(maxUdpPayloadBytes*sizeof(uint8_t)); #if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
memset(ui8Arr, 0, sizeof(ui8Arr)); CCLOG("UDP received %u bytes inputFrameUpsync from %s:%d", nread, ip, port);
for (int i = 0; i < nread; i++) { #endif
*(ui8Arr+i) = *(buf->base + i); uv_mutex_lock(&recvRingBuffLock);
} recvRingBuff->put(buf->base, nread);
cocos2d::Application::getInstance()->getScheduler()->performFunctionInCocosThread([=]() { uv_mutex_unlock(&recvRingBuffLock);
// [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"!
se::Value onUdpMessageCb;
se::ScriptEngine::getInstance()->getGlobalObject()->getProperty("onUdpMessage", &onUdpMessageCb);
// [WARNING] Declaring "AutoHandleScope" is critical here, otherwise "onUdpMessageCb.toObject()" wouldn't be recognized as a function of the ScriptEngine!
se::AutoHandleScope hs;
//CCLOG("UDP received %d bytes upsync -- 1", nread);
se::Object* const gameThreadMsg = se::Object::createTypedArray(se::Object::TypedArrayType::UINT8, ui8Arr, nread);
//CCLOG("UDP received %d bytes upsync -- 2", nread);
se::ValueArray args = { se::Value(gameThreadMsg) };
if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) {
// Temporarily assume that the "this" ptr within callback is NULL.
bool ok = onUdpMessageCb.toObject()->call(args, NULL);
if (!ok) {
se::ScriptEngine::getInstance()->clearException();
}
}
//CCLOG("UDP received %d bytes upsync -- 3", nread);
gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject
//CCLOG("UDP received %d bytes upsync -- 4", nread);
free(ui8Arr);
//CCLOG("UDP received %d bytes upsync -- 5", nread);
});
} }
free(buf->base); free(buf->base);
@ -169,6 +160,7 @@ void startRecvLoop(void* arg) {
int uvCloseRet = uv_loop_close(l); int uvCloseRet = uv_loop_close(l);
CCLOG("UDP recv loop is closed in UvRecvThread, uvCloseRet=%d", uvCloseRet); CCLOG("UDP recv loop is closed in UvRecvThread, uvCloseRet=%d", uvCloseRet);
uv_mutex_destroy(&recvRingBuffLock);
} }
void startSendLoop(void* arg) { void startSendLoop(void* arg) {
@ -196,6 +188,10 @@ int initSendLoop(struct sockaddr const* pUdpAddr) {
} }
uv_mutex_init(&sendRingBuffLock); uv_mutex_init(&sendRingBuffLock);
sendRingBuff = new SendRingBuff(maxBuffedMsgs); sendRingBuff = new SendRingBuff(maxBuffedMsgs);
uv_mutex_init(&recvRingBuffLock);
recvRingBuff = new RecvRingBuff(maxBuffedMsgs);
uv_async_init(sendLoop, &uvSendLoopStopSig, _onUvStopSig); uv_async_init(sendLoop, &uvSendLoopStopSig, _onUvStopSig);
uv_async_init(sendLoop, &uvSendLoopTriggerSig, _onUvSthNewToSend); uv_async_init(sendLoop, &uvSendLoopTriggerSig, _onUvSthNewToSend);
@ -222,6 +218,12 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) {
struct sockaddr_in udpAddr; struct sockaddr_in udpAddr;
uv_ip4_addr("0.0.0.0", port, &udpAddr); uv_ip4_addr("0.0.0.0", port, &udpAddr);
struct sockaddr const* pUdpAddr = (struct sockaddr const*)&udpAddr; struct sockaddr const* pUdpAddr = (struct sockaddr const*)&udpAddr;
memset(peerPunchedMarks, false, sizeof(peerPunchedMarks));
for (int i = 0; i < maxPeerCnt; i++) {
peerAddrList[i].authKey = -1; // hardcoded for now
memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn));
}
/* /*
[WARNING] On Android, the libuv documentation of "UV_UDP_REUSEADDR" is true, i.e. only the socket that binds later on the same port will be triggered the recv callback; however on Windows, experiment shows that the exact reverse is true instead. [WARNING] On Android, the libuv documentation of "UV_UDP_REUSEADDR" is true, i.e. only the socket that binds later on the same port will be triggered the recv callback; however on Windows, experiment shows that the exact reverse is true instead.
@ -259,11 +261,8 @@ bool DelayNoMore::UdpSession::closeUdpSession() {
uv_thread_join(&recvTid); uv_thread_join(&recvTid);
free(udpRecvSocket); free(udpRecvSocket);
free(recvLoop); free(recvLoop);
delete recvRingBuff;
for (int i = 0; i < maxPeerCnt; i++) {
peerAddrList[i].authKey = -1; // hardcoded for now
memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn));
}
CCLOG("Closed udp session and dealloc all resources in GameThread..."); CCLOG("Closed udp session and dealloc all resources in GameThread...");
return true; return true;
@ -321,8 +320,7 @@ bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size
uv_mutex_lock(&sendRingBuffLock); uv_mutex_lock(&sendRingBuffLock);
// Might want to send several times for better arrival rate // Might want to send several times for better arrival rate
for (int j = 0; j < broadcastUpsyncCnt; j++) { for (int j = 0; j < broadcastUpsyncCnt; j++) {
// Send to room udp tunnel in case of hole punching failure int peerPunchedCnt = 0;
sendRingBuff->put(bytes, bytesLen, &udpTunnelAddr);
for (int i = 0; i < roomCapacity; i++) { for (int i = 0; i < roomCapacity; i++) {
if (i + 1 == selfJoinIndex) { if (i + 1 == selfJoinIndex) {
continue; continue;
@ -331,8 +329,17 @@ bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size
// Peer addr not initialized // Peer addr not initialized
continue; continue;
} }
if (false == peerPunchedMarks[i]) {
// Not punched yet, save some bandwidth
continue;
}
sendRingBuff->put(bytes, bytesLen, &(peerAddrList[i]));
++peerPunchedCnt;
}
sendRingBuff->put(bytes, bytesLen, &(peerAddrList[i])); // Content hardcoded for now if (peerPunchedCnt + 1 < roomCapacity) {
// Send to room udp tunnel in case of ANY hole punching failure
sendRingBuff->put(bytes, bytesLen, &udpTunnelAddr);
} }
} }
@ -341,3 +348,40 @@ bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size
return true; return true;
} }
bool DelayNoMore::UdpSession::pollUdpRecvRingBuff() {
// This function is called by GameThread 60 fps.
if (0 >= recvRingBuff->cnt) {
// This check is NOT thread-safe, but as "pollUdpRecvRingBuff" is called by GameThread, we want it to lock as few as possible.
return true;
}
uv_mutex_lock(&recvRingBuffLock);
RecvWork* f = NULL;
while (0 < recvRingBuff->cnt) {
f = recvRingBuff->pop();
// [WARNING] Declaring "AutoHandleScope" is critical here, otherwise "onUdpMessageCb.toObject()" wouldn't be recognized as a function of the ScriptEngine!
se::AutoHandleScope hs;
// [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"!
se::Value onUdpMessageCb;
se::ScriptEngine::getInstance()->getGlobalObject()->getProperty("onUdpMessage", &onUdpMessageCb);
if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) {
//CCLOG("UDP received %d bytes upsync -- 1", nread);
se::Object* const gameThreadMsg = se::Object::createTypedArray(se::Object::TypedArrayType::UINT8, f->ui8Arr, f->bytesLen);
//CCLOG("UDP received %d bytes upsync -- 2", nread);
se::ValueArray args = { se::Value(gameThreadMsg) };
// Temporarily assume that the "this" ptr within callback is NULL.
bool ok = onUdpMessageCb.toObject()->call(args, NULL);
if (!ok) {
se::ScriptEngine::getInstance()->clearException();
}
//CCLOG("UDP received %d bytes upsync -- 3", nread);
gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject
//CCLOG("UDP received %d bytes upsync -- 4", nread);
}
}
uv_mutex_unlock(&recvRingBuffLock);
return true;
}

View File

@ -14,6 +14,7 @@ namespace DelayNoMore {
//static bool clearPeerUDPAddrList(); //static bool clearPeerUDPAddrList();
static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen); static bool punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen);
static bool broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex); static bool broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex);
static bool pollUdpRecvRingBuff();
}; };
} }
#endif #endif

View File

@ -126,6 +126,20 @@ bool upsertPeerUdpAddr(se::State& s) {
} }
SE_BIND_FUNC(upsertPeerUdpAddr) SE_BIND_FUNC(upsertPeerUdpAddr)
bool pollUdpRecvRingBuff(se::State& s) {
const auto& args = s.args();
size_t argc = args.size();
CC_UNUSED bool ok = true;
if (0 == argc) {
SE_PRECONDITION2(ok, false, "upsertPeerUdpAddr: Error processing arguments");
return DelayNoMore::UdpSession::pollUdpRecvRingBuff();
}
SE_REPORT_ERROR("wrong number of arguments: %d, was expecting %d; or wrong arg type!", (int)argc, 0);
return false;
}
SE_BIND_FUNC(pollUdpRecvRingBuff)
static bool udpSessionFinalize(se::State& s) { static bool udpSessionFinalize(se::State& s) {
CCLOGINFO("jsbindings: finalizing JS object %p (DelayNoMore::UdpSession)", s.nativeThisObject()); CCLOGINFO("jsbindings: finalizing JS object %p (DelayNoMore::UdpSession)", s.nativeThisObject());
auto iter = se::NonRefNativePtrCreatedByCtorMap::find(s.nativeThisObject()); auto iter = se::NonRefNativePtrCreatedByCtorMap::find(s.nativeThisObject());
@ -158,6 +172,7 @@ bool registerUdpSession(se::Object* obj) {
cls->defineStaticFunction("broadcastInputFrameUpsync", _SE(broadcastInputFrameUpsync)); cls->defineStaticFunction("broadcastInputFrameUpsync", _SE(broadcastInputFrameUpsync));
cls->defineStaticFunction("closeUdpSession", _SE(closeUdpSession)); cls->defineStaticFunction("closeUdpSession", _SE(closeUdpSession));
cls->defineStaticFunction("upsertPeerUdpAddr", _SE(upsertPeerUdpAddr)); cls->defineStaticFunction("upsertPeerUdpAddr", _SE(upsertPeerUdpAddr));
cls->defineStaticFunction("pollUdpRecvRingBuff", _SE(pollUdpRecvRingBuff));
cls->defineFinalizeFunction(_SE(udpSessionFinalize)); cls->defineFinalizeFunction(_SE(udpSessionFinalize));
cls->install(); cls->install();

View File

@ -15,5 +15,5 @@ SE_DECLARE_FUNC(punchToServer);
SE_DECLARE_FUNC(broadcastInputFrameUpsync); SE_DECLARE_FUNC(broadcastInputFrameUpsync);
SE_DECLARE_FUNC(closeUdpSession); SE_DECLARE_FUNC(closeUdpSession);
SE_DECLARE_FUNC(upsertPeerUdpAddr); SE_DECLARE_FUNC(upsertPeerUdpAddr);
SE_DECLARE_FUNC(pollUdpRecvRingBuff);
#endif #endif