Compare commits


2 Commits

Author    SHA1          Message                                               Date
genxium   8de2d6e4e7    Enhancement for type#1 force-confirmation trigger.    2023-01-31 09:57:37 +08:00
genxium   ba2dd0b22e    Added thread-safety comments for libuv codes.         2023-01-30 23:41:22 +08:00
3 changed files with 22 additions and 11 deletions

View File

@@ -574,7 +574,7 @@ func (pR *Room) StartBattle() {
})
}
func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq) {
func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq, fromUDP bool) {
/*
[WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads!
@@ -619,7 +619,7 @@ func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq) {
//Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-InputsBufferLock unlocked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
}()
inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player)
inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player, fromUDP)
if nil != inputsBufferSnapshot {
pR.downsyncToAllPlayers(inputsBufferSnapshot)
} /*else {
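
For context on the [WARNING] above: the room's inputs buffer is shared state, mutated by whichever goroutine happens to deliver an upsync batch (several ws sessions, and after this change the UDP tunnel as well), so every entry into this code path takes "pR.InputsBufferLock" first -- the deferred unlock is visible in the hunk above. The following is a minimal, self-contained Go sketch of that discipline; the trimmed-down types are illustrative stand-ins, not the repository's actual Room definition.

package main

import "sync"

// room is a stand-in for the real Room: one mutex guarding one shared inputs buffer.
type room struct {
	inputsBufferLock sync.Mutex
	inputsBuffer     map[int32]uint64 // inputFrameId -> encoded inputs, highly simplified
}

func (r *room) onBattleCmdReceived(inputFrameId int32, encoded uint64, fromUDP bool) {
	r.inputsBufferLock.Lock()
	defer r.inputsBufferLock.Unlock() // mirrors the deferred unlock shown in the diff
	r.inputsBuffer[inputFrameId] = encoded
	_ = fromUDP // the real method also branches on the upsync source, as shown above
}

func main() {
	r := &room{inputsBuffer: make(map[int32]uint64)}
	var wg sync.WaitGroup
	for i := int32(0); i < 4; i++ {
		wg.Add(1)
		go func(id int32) { // simulate ws sessions and the UDP tunnel calling concurrently
			defer wg.Done()
			r.onBattleCmdReceived(id, uint64(id), id%2 == 0)
		}(i)
	}
	wg.Wait()
}
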
@@ -1159,7 +1159,7 @@ func (pR *Room) getOrPrefabInputFrameDownsync(inputFrameId int32) *battle.InputF
return currInputFrameDownsync
}
func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*pb.InputFrameUpsync, playerId int32, player *Player) *pb.InputsBufferSnapshot {
func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*pb.InputFrameUpsync, playerId int32, player *Player, fromUDP bool) *pb.InputsBufferSnapshot {
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
// Step#1, put the received "inputFrameUpsyncBatch" into "pR.InputsBuffer"
for _, inputFrameUpsync := range inputFrameUpsyncBatch {
@@ -1182,11 +1182,14 @@ func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*pb.InputFr
targetInputFrameDownsync.InputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
targetInputFrameDownsync.ConfirmedList |= uint64(1 << uint32(player.JoinIndex-1))
player.LastReceivedInputFrameId = clientInputFrameId
pR.LastIndividuallyConfirmedInputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
if clientInputFrameId > pR.LatestPlayerUpsyncedInputFrameId {
pR.LatestPlayerUpsyncedInputFrameId = clientInputFrameId
if false == fromUDP {
// [WARNING] We have to distinguish whether or not the incoming batch is from UDP here, otherwise "pR.LatestPlayerUpsyncedInputFrameId - pR.LastAllConfirmedInputFrameId" might become unexpectedly large in case of "UDP packet loss + slow ws session"!
player.LastReceivedInputFrameId = clientInputFrameId
if clientInputFrameId > pR.LatestPlayerUpsyncedInputFrameId {
pR.LatestPlayerUpsyncedInputFrameId = clientInputFrameId
}
// It's safe (in terms of getting an eventually correct "RenderFrameBuffer") to put the following update of "pR.LastIndividuallyConfirmedInputList" which is ONLY used for prediction in "InputsBuffer" out of "false == fromUDP" block, but I'm still putting it in for convenient debugging.
pR.LastIndividuallyConfirmedInputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
}
}
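
The [WARNING] inside the new "false == fromUDP" block is about the force-confirmation trigger named in the commit message. The sketch below illustrates the kind of "type#1" check being protected; the threshold field and the exact comparison are assumptions for illustration, and only the two watermark field names are taken from the diff. If lossy UDP upsync were allowed to advance "LatestPlayerUpsyncedInputFrameId" while the slower ws session lags behind in backfilling confirmations, the gap would inflate and a trigger of this shape would fire far more often than intended.

package main

import "fmt"

// Hypothetical, trimmed-down view of the fields involved; the tolerance field is an
// assumption for illustration only.
type roomView struct {
	LatestPlayerUpsyncedInputFrameId int32 // advanced ONLY by ws-session upsync after this change
	LastAllConfirmedInputFrameId     int32 // advanced once every player's input for a frame is confirmed
	UpsyncDelayToleranceFrames       int32 // assumed trigger threshold, not from the repo
}

// shouldForceConfirmType1 sketches the gap check the warning comment protects.
func (r roomView) shouldForceConfirmType1() bool {
	return r.LatestPlayerUpsyncedInputFrameId-r.LastAllConfirmedInputFrameId > r.UpsyncDelayToleranceFrames
}

func main() {
	r := roomView{LatestPlayerUpsyncedInputFrameId: 120, LastAllConfirmedInputFrameId: 100, UpsyncDelayToleranceFrames: 8}
	fmt.Println(r.shouldForceConfirmType1()) // true: the gap (20) exceeds the tolerance (8)
}
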
@@ -1760,7 +1763,7 @@ func (pR *Room) startBattleUdpTunnel() {
Logger.Warn(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d failed to forward upsync from (playerId:%d, joinIndex:%d, addr:%s) to (otherPlayerId:%d, otherPlayerJoinIndex:%d, otherPlayerAddr:%s)\n", pR.Id, playerId, peerJoinIndex, remote, otherPlayer.Id, otherPlayer.JoinIndex, otherPlayer.BattleUdpTunnelAddr))
}
}
pR.OnBattleCmdReceived(pReq) // To help advance "pR.LastAllConfirmedInputFrameId" asap
pR.OnBattleCmdReceived(pReq, true) // To help advance "pR.LastAllConfirmedInputFrameId" asap, and even if "pR.LastAllConfirmedInputFrameId" is not advanced due to packet loss, these UDP packets would help prefill the "InputsBuffer" with correct player "future inputs (compared to ws session)" such that when "forceConfirmation" occurs we have as many correct predictions as possible
}
}
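
The comment on the "OnBattleCmdReceived(pReq, true)" call explains why UDP upsync is still worth feeding into the room even though it no longer advances the upsync watermark: it keeps "LastIndividuallyConfirmedInputList" fresh, which appears to be what prediction falls back on when a frame is force-confirmed before every player's real input has arrived (cf. "getOrPrefabInputFrameDownsync" above). Below is a hedged Go sketch of that fallback; the function and local names are illustrative, and only the field names mirror the diff (ConfirmedList, InputList, LastIndividuallyConfirmedInputList).

package main

import "fmt"

// predictMissingInputs fills the slots of players whose real input has not been
// confirmed for this frame, reusing the freshest input previously seen from each of
// them -- which the UDP tunnel may have delivered ahead of the ws session.
func predictMissingInputs(inputList []uint64, confirmedList uint64, lastIndividuallyConfirmed []uint64) {
	for joinIndex := 1; joinIndex <= len(inputList); joinIndex++ {
		if confirmedList&uint64(1<<uint32(joinIndex-1)) == 0 {
			inputList[joinIndex-1] = lastIndividuallyConfirmed[joinIndex-1]
		}
	}
}

func main() {
	inputList := []uint64{3, 0} // player 2's slot is still empty
	confirmedList := uint64(1)  // only player 1 (bit 0) has confirmed
	lastSeen := []uint64{3, 5}  // player 2 last sent "5", possibly via UDP
	predictMissingInputs(inputList, confirmedList, lastSeen)
	fmt.Println(inputList) // [3 5]
}
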

View File

@@ -388,7 +388,7 @@ func Serve(c *gin.Context) {
startOrFeedHeartbeatWatchdog(conn)
case models.UPSYNC_MSG_ACT_PLAYER_CMD:
startOrFeedHeartbeatWatchdog(conn)
pRoom.OnBattleCmdReceived(pReq)
pRoom.OnBattleCmdReceived(pReq, false)
case models.UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK:
res := pRoom.OnPlayerBattleColliderAcked(int32(playerId))
if false == res {

View File

@@ -35,7 +35,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
struct sockaddr_in const* sockAddr = (struct sockaddr_in const*)addr;
uv_inet_ntop(sockAddr->sin_family, &(sockAddr->sin_addr), ip, INET_ADDRSTRLEN);
port = ntohs(sockAddr->sin_port);
//CCLOG("UDP received %d bytes from %s:%d", nread, ip, port);
CCLOG("UDP received %d bytes from %s:%d", nread, ip, port);
break;
}
default:
@@ -325,6 +325,7 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) {
uv_udp_recv_start(udpSocket, _allocBuffer, _onRead);
// TODO: Currently "sending" is also done in the "receiving loop thread", shall I segregate it to another dedicated thread?
uv_thread_create(&recvTid, startRecvLoop, loop);
CCLOG("Finished opening UDP session at port=%d", port);
@@ -363,8 +364,15 @@ bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPor
SRV_PORT = srvPort;
UDP_TUNNEL_SRV_PORT = udpTunnelSrvPort;
PunchServerWork* work = new PunchServerWork(bytes, bytesLen, udpTunnelBytes, udpTunnelBytesBytesLen);
/*
TODO: Libuv is really inconvenient here, neither "uv_queue_work" nor "uv_async_init" is threadsafe (http://docs.libuv.org/en/v1.x/threadpool.html#c.uv_queue_work)! What's the point of such a queue? It's even more difficult than writing my own implementation -- again a threadsafe RingBuff could come to the rescue, yet I'd like to investigate more into how to make the following threadsafe APIs with minimal cross-platform C++ code
- _sendMessage(...), should be both non-blocking & threadsafe, called from GameThread
- _onRead(...), should be called first in UvThread in an edge-triggered manner like idiomatic "epoll" or "kqueue", then dispatch the received message to GameThread by a threadsafe RingBuff
*/
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
wrapper->data = work;
uv_queue_work(loop, wrapper, _punchServerOnUvThread, _afterPunchServer);
return true;
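
The TODO block above is effectively asking for a specific threading shape: the game thread hands outbound packets to a non-blocking, thread-safe queue, and inbound packets reach the game thread through another thread-safe queue instead of being processed on the network thread. The sketch below shows that shape in Go rather than libuv/C++ (a buffered channel standing in for the "threadsafe RingBuff" the comment mentions), purely to illustrate the pattern; it is not a proposal for the C++ client code.

package main

import "fmt"

// udpSession holds the two queues: one drained by the network loop, one drained by the
// game thread once per frame.
type udpSession struct {
	sendQ chan []byte // filled by the game thread, drained by the network loop
	recvQ chan []byte // filled by the network loop, drained by the game thread
}

// sendMessage is the non-blocking, thread-safe hand-off the comment wants for the game
// thread: if the queue is full the packet is dropped rather than stalling the frame.
func (s *udpSession) sendMessage(b []byte) bool {
	select {
	case s.sendQ <- b:
		return true
	default:
		return false // queue full; UDP semantics tolerate the drop
	}
}

// onRead is what the network loop would call per received datagram; it only enqueues,
// leaving all game-state mutation to the game thread.
func (s *udpSession) onRead(b []byte) {
	select {
	case s.recvQ <- b:
	default: // drop when the game thread falls behind
	}
}

// pollRecv is called once per game-thread frame to drain whatever has arrived so far.
func (s *udpSession) pollRecv() [][]byte {
	var out [][]byte
	for {
		select {
		case b := <-s.recvQ:
			out = append(out, b)
		default:
			return out
		}
	}
}

func main() {
	s := &udpSession{sendQ: make(chan []byte, 512), recvQ: make(chan []byte, 512)}
	s.onRead([]byte("upsync-from-peer"))
	fmt.Println(s.sendMessage([]byte("punch")), len(s.pollRecv())) // true 1
}
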