mirror of
https://github.com/genxium/DelayNoMore
synced 2025-01-13 14:31:36 +00:00
Enhanced UDP message callback handling.
This commit is contained in:
parent
7b0c807496
commit
b79e2dc935
File diff suppressed because one or more lines are too long
@ -33,31 +33,70 @@ SendWork* SendRingBuff::pop() {
|
|||||||
|
|
||||||
// Recving
|
// Recving
|
||||||
void RecvRingBuff::put(char* newBytes, size_t newBytesLen) {
|
void RecvRingBuff::put(char* newBytes, size_t newBytesLen) {
|
||||||
while (0 < cnt && cnt >= n) {
|
RecvWork* slotEle = (&eles[ed.load()]); // Save for later update
|
||||||
|
|
||||||
|
int oldCnt = cnt.load();
|
||||||
|
int oldSt = st.load(); // Used to guard against "cnt decremented in pop(...), but st not yet incremented and thus return value not yet copied to avoid contamination"
|
||||||
|
int tried = 0;
|
||||||
|
while (n <= oldCnt && !ed.compare_exchange_weak(oldSt, oldSt) && 3 > tried) {
|
||||||
// Make room for the new element
|
// Make room for the new element
|
||||||
this->pop();
|
this->pop(NULL);
|
||||||
|
oldCnt = cnt.load(); // If "pop()" above failed, it'd only be due to concurrent calls to "pop()", either way the updated "cnt" should be good to go
|
||||||
|
oldSt = st.load();
|
||||||
|
++tried;
|
||||||
}
|
}
|
||||||
eles[ed].bytesLen = newBytesLen;
|
if (n <= oldCnt && !ed.compare_exchange_weak(oldSt, oldSt) && 3 == tried) {
|
||||||
memset(eles[ed].ui8Arr, 0, sizeof eles[ed].ui8Arr);
|
// Failed silently, UDP packet can be dropped.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
slotEle->bytesLen = newBytesLen;
|
||||||
|
memset(slotEle->ui8Arr, 0, sizeof slotEle->ui8Arr);
|
||||||
for (int i = 0; i < newBytesLen; i++) {
|
for (int i = 0; i < newBytesLen; i++) {
|
||||||
*(eles[ed].ui8Arr + i) = *(newBytes + i);
|
*(slotEle->ui8Arr + i) = *(newBytes + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// No need to compare-and-swap, only "UvRecvThread" will access "RecvRingBuff.ed".
|
||||||
ed++;
|
ed++;
|
||||||
cnt++;
|
|
||||||
if (ed >= n) {
|
if (ed >= n) {
|
||||||
ed -= n; // Deliberately not using "%" operator for performance concern
|
ed -= n; // Deliberately not using "%" operator for performance concern
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Only increment cnt when the putting of new element is fully done.
|
||||||
|
cnt++;
|
||||||
}
|
}
|
||||||
|
|
||||||
RecvWork* RecvRingBuff::pop() {
|
bool RecvRingBuff::pop(RecvWork* out) {
|
||||||
if (0 == cnt) {
|
int oldCnt = std::atomic_fetch_sub(&cnt, 1);
|
||||||
|
/*
|
||||||
|
[WARNING]
|
||||||
|
|
||||||
|
After here, two cases should be taken care of.
|
||||||
|
1. If "n == oldCnt", we need guard against "put" to avoid contaminating "ret" by the "putting".
|
||||||
|
2. If "0 >= oldCnt", we need guard against another "pop" to avoid over-popping.
|
||||||
|
*/
|
||||||
|
if (0 >= oldCnt) {
|
||||||
|
// "pop" could be accessed by either "GameThread/pollUdpRecvRingBuff" or "UvRecvThread/put", thus we should be proactively guard against concurrent popping while "1 == cnt"
|
||||||
|
++cnt;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
RecvWork* ret = &(eles[st]);
|
|
||||||
cnt--;
|
// When concurrent "pop"s reach here, over-popping is definitely avoided.
|
||||||
st++;
|
int oldSt = st.load();
|
||||||
if (st >= n) {
|
if (out) {
|
||||||
st -= n;
|
RecvWork* src = (&eles[oldSt]);
|
||||||
|
memset(out->ui8Arr, 0, sizeof out->ui8Arr);
|
||||||
|
memcpy(out->ui8Arr, src, src->bytesLen);
|
||||||
|
out->bytesLen = src->bytesLen;
|
||||||
|
}
|
||||||
|
int newSt = oldSt + 1;
|
||||||
|
if (newSt >= n) {
|
||||||
|
newSt -= n;
|
||||||
|
}
|
||||||
|
if (st.compare_exchange_weak(oldSt, newSt)) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
// Failed concurrent access should recover the "cnt"
|
||||||
|
++cnt;
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
return ret;
|
|
||||||
}
|
}
|
@ -4,6 +4,8 @@
|
|||||||
#include "uv/uv.h"
|
#include "uv/uv.h"
|
||||||
#define __SSIZE_T // Otherwise "ssize_t" would have conflicting macros error that stops compiling
|
#define __SSIZE_T // Otherwise "ssize_t" would have conflicting macros error that stops compiling
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
|
||||||
int const RING_BUFF_CONSECUTIVE_SET = 0;
|
int const RING_BUFF_CONSECUTIVE_SET = 0;
|
||||||
int const RING_BUFF_NON_CONSECUTIVE_SET = 1;
|
int const RING_BUFF_NON_CONSECUTIVE_SET = 1;
|
||||||
int const RING_BUFF_FAILED_TO_SET = 2;
|
int const RING_BUFF_FAILED_TO_SET = 2;
|
||||||
@ -48,10 +50,17 @@ public:
|
|||||||
size_t bytesLen;
|
size_t bytesLen;
|
||||||
};
|
};
|
||||||
|
|
||||||
// [WARNING] This class is specific to "RecvWork"
|
/*
|
||||||
|
[WARNING] This class is specific to "RecvWork"; its "put" and "pop" methods are designed to be thread-safe & lock-free for our particular case, i.e. only concurrent access from "UvRecvThread" & "GameThread", in a sense more sophisticated than the Golang or JavaScript versions.
|
||||||
|
|
||||||
|
There's yet no plan to support thread-safe & lock-free "getByFrameId/setByFrameId" -- being thread-safe is easy by use of mutex, which is very SLOWWWWW when used in 60fps race-conditions.
|
||||||
|
|
||||||
|
The generic "thread-safe, lock-free ring buffer or circular buffer" is a big problem, widely discussed over the internet and in literatures, search "lock-free circular buffer" for more information.
|
||||||
|
*/
|
||||||
class RecvRingBuff {
|
class RecvRingBuff {
|
||||||
public:
|
public:
|
||||||
int ed, st, n, cnt;
|
int n;
|
||||||
|
std::atomic_int ed, st, cnt;
|
||||||
RecvWork eles[maxBuffedMsgs]; // preallocated on stack to save heap alloc/dealloc time
|
RecvWork eles[maxBuffedMsgs]; // preallocated on stack to save heap alloc/dealloc time
|
||||||
RecvRingBuff(int newN) {
|
RecvRingBuff(int newN) {
|
||||||
this->n = newN;
|
this->n = newN;
|
||||||
@ -60,6 +69,6 @@ public:
|
|||||||
|
|
||||||
void put(char* newBytes, size_t newBytesLen);
|
void put(char* newBytes, size_t newBytesLen);
|
||||||
|
|
||||||
RecvWork* pop();
|
bool pop(RecvWork* out);
|
||||||
};
|
};
|
||||||
#endif
|
#endif
|
||||||
|
@ -72,9 +72,9 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
|
|||||||
#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
|
#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
|
||||||
CCLOG("UDP received %u bytes inputFrameUpsync from %s:%d", nread, ip, port);
|
CCLOG("UDP received %u bytes inputFrameUpsync from %s:%d", nread, ip, port);
|
||||||
#endif
|
#endif
|
||||||
uv_mutex_lock(&recvRingBuffLock);
|
//uv_mutex_lock(&recvRingBuffLock);
|
||||||
recvRingBuff->put(buf->base, nread);
|
recvRingBuff->put(buf->base, nread);
|
||||||
uv_mutex_unlock(&recvRingBuffLock);
|
//uv_mutex_unlock(&recvRingBuffLock);
|
||||||
}
|
}
|
||||||
free(buf->base);
|
free(buf->base);
|
||||||
|
|
||||||
@ -352,15 +352,12 @@ bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size
|
|||||||
bool DelayNoMore::UdpSession::pollUdpRecvRingBuff() {
|
bool DelayNoMore::UdpSession::pollUdpRecvRingBuff() {
|
||||||
// This function is called by GameThread 60 fps.
|
// This function is called by GameThread 60 fps.
|
||||||
|
|
||||||
if (0 >= recvRingBuff->cnt) {
|
//uv_mutex_lock(&recvRingBuffLock);
|
||||||
// This check is NOT thread-safe, but as "pollUdpRecvRingBuff" is called by GameThread, we want it to lock as few as possible.
|
while (true) {
|
||||||
return true;
|
RecvWork f;
|
||||||
}
|
bool res = recvRingBuff->pop(&f);
|
||||||
|
if (!res) return false;
|
||||||
|
|
||||||
uv_mutex_lock(&recvRingBuffLock);
|
|
||||||
RecvWork* f = NULL;
|
|
||||||
while (0 < recvRingBuff->cnt) {
|
|
||||||
f = recvRingBuff->pop();
|
|
||||||
// [WARNING] Declaring "AutoHandleScope" is critical here, otherwise "onUdpMessageCb.toObject()" wouldn't be recognized as a function of the ScriptEngine!
|
// [WARNING] Declaring "AutoHandleScope" is critical here, otherwise "onUdpMessageCb.toObject()" wouldn't be recognized as a function of the ScriptEngine!
|
||||||
se::AutoHandleScope hs;
|
se::AutoHandleScope hs;
|
||||||
// [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"!
|
// [WARNING] Use of the "ScriptEngine" is only allowed in "GameThread a.k.a. CocosThread"!
|
||||||
@ -368,7 +365,7 @@ bool DelayNoMore::UdpSession::pollUdpRecvRingBuff() {
|
|||||||
se::ScriptEngine::getInstance()->getGlobalObject()->getProperty("onUdpMessage", &onUdpMessageCb);
|
se::ScriptEngine::getInstance()->getGlobalObject()->getProperty("onUdpMessage", &onUdpMessageCb);
|
||||||
if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) {
|
if (onUdpMessageCb.isObject() && onUdpMessageCb.toObject()->isFunction()) {
|
||||||
//CCLOG("UDP received %d bytes upsync -- 1", nread);
|
//CCLOG("UDP received %d bytes upsync -- 1", nread);
|
||||||
se::Object* const gameThreadMsg = se::Object::createTypedArray(se::Object::TypedArrayType::UINT8, f->ui8Arr, f->bytesLen);
|
se::Object* const gameThreadMsg = se::Object::createTypedArray(se::Object::TypedArrayType::UINT8, f.ui8Arr, f.bytesLen);
|
||||||
//CCLOG("UDP received %d bytes upsync -- 2", nread);
|
//CCLOG("UDP received %d bytes upsync -- 2", nread);
|
||||||
se::ValueArray args = { se::Value(gameThreadMsg) };
|
se::ValueArray args = { se::Value(gameThreadMsg) };
|
||||||
|
|
||||||
@ -382,6 +379,6 @@ bool DelayNoMore::UdpSession::pollUdpRecvRingBuff() {
|
|||||||
//CCLOG("UDP received %d bytes upsync -- 4", nread);
|
//CCLOG("UDP received %d bytes upsync -- 4", nread);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
uv_mutex_unlock(&recvRingBuffLock);
|
//uv_mutex_unlock(&recvRingBuffLock);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
@ -755,7 +755,7 @@ func ApplyInputFrameDownsyncDynamicsOnSingleRenderFrame(inputsBuffer *RingBuffer
|
|||||||
// Reset playerCollider position from the "virtual grid position"
|
// Reset playerCollider position from the "virtual grid position"
|
||||||
newVx, newVy := currPlayerDownsync.VirtualGridX+currPlayerDownsync.VelX, currPlayerDownsync.VirtualGridY+currPlayerDownsync.VelY
|
newVx, newVy := currPlayerDownsync.VirtualGridX+currPlayerDownsync.VelX, currPlayerDownsync.VirtualGridY+currPlayerDownsync.VelY
|
||||||
if 0 >= thatPlayerInNextFrame.Hp && 0 == thatPlayerInNextFrame.FramesToRecover {
|
if 0 >= thatPlayerInNextFrame.Hp && 0 == thatPlayerInNextFrame.FramesToRecover {
|
||||||
// Revive
|
// Revive from Dying
|
||||||
newVx, newVy = currPlayerDownsync.RevivalVirtualGridX, currPlayerDownsync.RevivalVirtualGridY
|
newVx, newVy = currPlayerDownsync.RevivalVirtualGridX, currPlayerDownsync.RevivalVirtualGridY
|
||||||
thatPlayerInNextFrame.CharacterState = ATK_CHARACTER_STATE_IDLE1
|
thatPlayerInNextFrame.CharacterState = ATK_CHARACTER_STATE_IDLE1
|
||||||
thatPlayerInNextFrame.Hp = currPlayerDownsync.MaxHp
|
thatPlayerInNextFrame.Hp = currPlayerDownsync.MaxHp
|
||||||
@ -945,7 +945,8 @@ func ApplyInputFrameDownsyncDynamicsOnSingleRenderFrame(inputsBuffer *RingBuffer
|
|||||||
if fallStopping {
|
if fallStopping {
|
||||||
thatPlayerInNextFrame.VelY = 0
|
thatPlayerInNextFrame.VelY = 0
|
||||||
thatPlayerInNextFrame.VelX = 0
|
thatPlayerInNextFrame.VelX = 0
|
||||||
if ATK_CHARACTER_STATE_BLOWN_UP1 == thatPlayerInNextFrame.CharacterState {
|
if ATK_CHARACTER_STATE_DYING == thatPlayerInNextFrame.CharacterState {
|
||||||
|
} else if ATK_CHARACTER_STATE_BLOWN_UP1 == thatPlayerInNextFrame.CharacterState {
|
||||||
thatPlayerInNextFrame.CharacterState = ATK_CHARACTER_STATE_LAY_DOWN1
|
thatPlayerInNextFrame.CharacterState = ATK_CHARACTER_STATE_LAY_DOWN1
|
||||||
thatPlayerInNextFrame.FramesToRecover = chConfig.LayDownFramesToRecover
|
thatPlayerInNextFrame.FramesToRecover = chConfig.LayDownFramesToRecover
|
||||||
} else {
|
} else {
|
||||||
@ -960,9 +961,11 @@ func ApplyInputFrameDownsyncDynamicsOnSingleRenderFrame(inputsBuffer *RingBuffer
|
|||||||
thatPlayerInNextFrame.FramesToRecover = 0
|
thatPlayerInNextFrame.FramesToRecover = 0
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// landedOnGravityPushback not fallStopping, could be in LayDown or GetUp
|
// landedOnGravityPushback not fallStopping, could be in LayDown or GetUp or Dying
|
||||||
if _, existent := nonAttackingSet[thatPlayerInNextFrame.CharacterState]; existent {
|
if _, existent := nonAttackingSet[thatPlayerInNextFrame.CharacterState]; existent {
|
||||||
if ATK_CHARACTER_STATE_LAY_DOWN1 == thatPlayerInNextFrame.CharacterState {
|
if ATK_CHARACTER_STATE_DYING == thatPlayerInNextFrame.CharacterState {
|
||||||
|
// No update needed for Dying
|
||||||
|
} else if ATK_CHARACTER_STATE_LAY_DOWN1 == thatPlayerInNextFrame.CharacterState {
|
||||||
if 0 == thatPlayerInNextFrame.FramesToRecover {
|
if 0 == thatPlayerInNextFrame.FramesToRecover {
|
||||||
thatPlayerInNextFrame.CharacterState = ATK_CHARACTER_STATE_GET_UP1
|
thatPlayerInNextFrame.CharacterState = ATK_CHARACTER_STATE_GET_UP1
|
||||||
thatPlayerInNextFrame.FramesToRecover = chConfig.GetUpFramesToRecover
|
thatPlayerInNextFrame.FramesToRecover = chConfig.GetUpFramesToRecover
|
||||||
|
Loading…
x
Reference in New Issue
Block a user