mirror of
https://github.com/genxium/DelayNoMore
synced 2025-10-16 20:16:37 +00:00
Compare commits
15 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
70ae4a4c92 | ||
|
6f561bea87 | ||
|
70a86c27b0 | ||
|
b0f37d2237 | ||
|
09376b827d | ||
|
d560392c79 | ||
|
c75f642011 | ||
|
5cfcac6cf6 | ||
|
d37ebd4c33 | ||
|
1d138b17c3 | ||
|
851678e2f3 | ||
|
2fb6fd6bea | ||
|
e3440a2a06 | ||
|
8de2d6e4e7 | ||
|
ba2dd0b22e |
@@ -4,7 +4,7 @@ This project is a demo for a websocket-based rollback netcode inspired by [GGPO]
|
||||
|
||||
As lots of feedbacks ask for a discussion on using UDP instead, I tried to summarize my personal opinion about it in [ConcerningEdgeCases](./ConcerningEdgeCases.md) -- **since v0.9.25, the project is actually equipped with UDP capabilities as follows**.
|
||||
- When using the so called `native apps` on `Android` and `Windows` (I'm working casually hard to support `iOS` next), the frontends will try to use UDP hole-punching w/ the help of backend as a registry. If UDP hole-punching is working, the rollback is often less than `turn-around frames to recover` and thus not noticeable, being much better than using websocket alone.
|
||||
- If UDP hole-punching is not working, e.g. for Symmetric NAT like in 4G/5G cellular network, the frontends will use backend as a UDP tunnel (or relay, whatever you like to call it). This video shows how the UDP tunnel performs for a [Phone-4G v.s. PC-Wifi (viewed by PC side)](https://pan.baidu.com/s/1wdUTvRiyrTLWy7mF6G7uyQ?pwd=icmp).
|
||||
- If UDP hole-punching is not working, e.g. for Symmetric NAT like in 4G/5G cellular network, the frontends will use backend as a UDP tunnel (or relay, whatever you like to call it). This video shows how the UDP tunnel performs for a [Phone-4G v.s. PC-Wifi (viewed by PC side)](https://pan.baidu.com/s/1IZVa5wVgAdeH6D-xsZYFUw?pwd=dgkj).
|
||||
- Browser vs `native app` is possible but in that case only websocket is used.
|
||||
|
||||
The following video is recorded over INTERNET using an input delay of 4 frames and it feels SMOOTH when playing! Please also checkout these demo videos
|
||||
@@ -27,7 +27,7 @@ _(how input delay roughly works)_
|
||||
|
||||

|
||||
|
||||
_(how rollback-and-chase in this project roughly works, kindly note that by the current implementation, each frontend only maintains a `lastAllConfirmedInputFrameId` for all the other peers, because the backend only downsyncs all-confirmed inputFrames, see [markConfirmationIfApplicable](https://github.com/genxium/DelayNoMore/blob/v0.9.14/battle_srv/models/room.go#L1085) for more information -- if a serverless peer-to-peer communication is seriously needed here, consider porting [markConfirmationIfApplicable](https://github.com/genxium/DelayNoMore/blob/v0.9.14/battle_srv/models/room.go#L1085) into frontend for maintaining `lastAllConfirmedInputFrameId` under chaotic reception order of inputFrames from peers)_
|
||||
_(how rollback-and-chase in this project roughly works)_
|
||||
|
||||

|
||||

|
||||
|
@@ -47,10 +47,11 @@ type Player struct {
|
||||
TutorialStage int `db:"tutorial_stage"`
|
||||
|
||||
// other in-battle info fields
|
||||
LastReceivedInputFrameId int32
|
||||
LastSentInputFrameId int32
|
||||
AckingFrameId int32
|
||||
AckingInputFrameId int32
|
||||
LastReceivedInputFrameId int32
|
||||
LastUdpReceivedInputFrameId int32
|
||||
LastSentInputFrameId int32
|
||||
AckingFrameId int32
|
||||
AckingInputFrameId int32
|
||||
|
||||
UdpAddr *PeerUdpAddr
|
||||
BattleUdpTunnelAddr *net.UDPAddr // This addr is used by backend only, not visible to frontend
|
||||
|
@@ -136,7 +136,7 @@ type Room struct {
|
||||
EffectivePlayerCount int32
|
||||
DismissalWaitGroup sync.WaitGroup
|
||||
InputsBuffer *battle.RingBuffer // Indices are STRICTLY consecutive
|
||||
InputsBufferLock sync.Mutex // Guards [InputsBuffer, LatestPlayerUpsyncedInputFrameId, LastAllConfirmedInputFrameId, LastAllConfirmedInputList, LastAllConfirmedInputFrameIdWithChange, LastIndividuallyConfirmedInputList, player.LastReceivedInputFrameId]
|
||||
InputsBufferLock sync.Mutex // Guards [InputsBuffer, LatestPlayerUpsyncedInputFrameId, LastAllConfirmedInputFrameId, LastAllConfirmedInputList, LastAllConfirmedInputFrameIdWithChange, LastIndividuallyConfirmedInputList, player.LastReceivedInputFrameId, player.LastUdpReceivedInputFrameId]
|
||||
RenderFrameBuffer *battle.RingBuffer // Indices are STRICTLY consecutive
|
||||
LatestPlayerUpsyncedInputFrameId int32
|
||||
LastAllConfirmedInputFrameId int32
|
||||
@@ -189,6 +189,7 @@ func (pR *Room) AddPlayerIfPossible(pPlayerFromDbInit *Player, session *websocke
|
||||
pPlayerFromDbInit.AckingInputFrameId = -1
|
||||
pPlayerFromDbInit.LastSentInputFrameId = MAGIC_LAST_SENT_INPUT_FRAME_ID_NORMAL_ADDED
|
||||
pPlayerFromDbInit.LastReceivedInputFrameId = MAGIC_LAST_SENT_INPUT_FRAME_ID_NORMAL_ADDED
|
||||
pPlayerFromDbInit.LastUdpReceivedInputFrameId = MAGIC_LAST_SENT_INPUT_FRAME_ID_NORMAL_ADDED
|
||||
pPlayerFromDbInit.BattleState = PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK
|
||||
|
||||
pPlayerFromDbInit.ColliderRadius = DEFAULT_PLAYER_RADIUS // Hardcoded
|
||||
@@ -230,6 +231,7 @@ func (pR *Room) ReAddPlayerIfPossible(pTmpPlayerInstance *Player, session *webso
|
||||
pEffectiveInRoomPlayerInstance.AckingFrameId = -1
|
||||
pEffectiveInRoomPlayerInstance.AckingInputFrameId = -1
|
||||
pEffectiveInRoomPlayerInstance.LastSentInputFrameId = MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED
|
||||
// [WARNING] DON'T reset "player.LastReceivedInputFrameId" & "player.LastUdpReceivedInputFrameId" upon reconnection!
|
||||
pEffectiveInRoomPlayerInstance.BattleState = PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK
|
||||
|
||||
pEffectiveInRoomPlayerInstance.ColliderRadius = DEFAULT_PLAYER_RADIUS // Hardcoded
|
||||
@@ -574,7 +576,7 @@ func (pR *Room) StartBattle() {
|
||||
})
|
||||
}
|
||||
|
||||
func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq) {
|
||||
func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq, fromUDP bool) {
|
||||
/*
|
||||
[WARNING] This function "OnBattleCmdReceived" could be called by different ws sessions and thus from different threads!
|
||||
|
||||
@@ -619,7 +621,7 @@ func (pR *Room) OnBattleCmdReceived(pReq *pb.WsReq) {
|
||||
//Logger.Debug(fmt.Sprintf("OnBattleCmdReceived-InputsBufferLock unlocked: roomId=%v, fromPlayerId=%v", pR.Id, playerId))
|
||||
}()
|
||||
|
||||
inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player)
|
||||
inputsBufferSnapshot := pR.markConfirmationIfApplicable(inputFrameUpsyncBatch, playerId, player, fromUDP)
|
||||
if nil != inputsBufferSnapshot {
|
||||
pR.downsyncToAllPlayers(inputsBufferSnapshot)
|
||||
} /*else {
|
||||
@@ -815,7 +817,7 @@ func (pR *Room) OnDismissed() {
|
||||
|
||||
pR.RenderFrameId = 0
|
||||
pR.CurDynamicsRenderFrameId = 0
|
||||
pR.NstDelayFrames = 16
|
||||
pR.NstDelayFrames = 24
|
||||
|
||||
serverFps := 60
|
||||
pR.RollbackEstimatedDtMillis = 16.667 // Use fixed-and-low-precision to mitigate the inconsistent floating-point-number issue between Golang and JavaScript
|
||||
@@ -823,6 +825,7 @@ func (pR *Room) OnDismissed() {
|
||||
dilutedServerFps := float64(58.0) // Don't set this value too small, otherwise we might miss force confirmation needs for slow tickers!
|
||||
pR.dilutedRollbackEstimatedDtNanos = int64(float64(pR.RollbackEstimatedDtNanos) * float64(serverFps) / dilutedServerFps)
|
||||
pR.BattleDurationFrames = int32(60 * serverFps)
|
||||
//pR.BattleDurationFrames = int32(20 * serverFps)
|
||||
pR.BattleDurationNanos = int64(pR.BattleDurationFrames) * (pR.RollbackEstimatedDtNanos + 1)
|
||||
pR.InputFrameUpsyncDelayTolerance = battle.ConvertToNoDelayInputFrameId(pR.NstDelayFrames) - 1 // this value should be strictly smaller than (NstDelayFrames >> InputScaleFrames), otherwise "type#1 forceConfirmation" might become a lag avalanche
|
||||
pR.MaxChasingRenderFramesPerUpdate = 9 // Don't set this value too high to avoid exhausting frontend CPU within a single frame, roughly as the "turn-around frames to recover" is empirically OK
|
||||
@@ -1159,7 +1162,7 @@ func (pR *Room) getOrPrefabInputFrameDownsync(inputFrameId int32) *battle.InputF
|
||||
return currInputFrameDownsync
|
||||
}
|
||||
|
||||
func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*pb.InputFrameUpsync, playerId int32, player *Player) *pb.InputsBufferSnapshot {
|
||||
func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*pb.InputFrameUpsync, playerId int32, player *Player, fromUDP bool) *pb.InputsBufferSnapshot {
|
||||
// [WARNING] This function MUST BE called while "pR.InputsBufferLock" is locked!
|
||||
// Step#1, put the received "inputFrameUpsyncBatch" into "pR.InputsBuffer"
|
||||
for _, inputFrameUpsync := range inputFrameUpsyncBatch {
|
||||
@@ -1170,6 +1173,7 @@ func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*pb.InputFr
|
||||
continue
|
||||
}
|
||||
if clientInputFrameId < player.LastReceivedInputFrameId {
|
||||
// [WARNING] It's important for correctness that we use "player.LastReceivedInputFrameId" instead of "player.LastUdpReceivedInputFrameId" here!
|
||||
Logger.Debug(fmt.Sprintf("Omitting obsolete inputFrameUpsync#2: roomId=%v, playerId=%v, clientInputFrameId=%v, playerLastReceivedInputFrameId=%v, InputsBuffer=%v", pR.Id, playerId, clientInputFrameId, player.LastReceivedInputFrameId, pR.InputsBufferString(false)))
|
||||
continue
|
||||
}
|
||||
@@ -1182,11 +1186,25 @@ func (pR *Room) markConfirmationIfApplicable(inputFrameUpsyncBatch []*pb.InputFr
|
||||
targetInputFrameDownsync.InputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
|
||||
targetInputFrameDownsync.ConfirmedList |= uint64(1 << uint32(player.JoinIndex-1))
|
||||
|
||||
player.LastReceivedInputFrameId = clientInputFrameId
|
||||
pR.LastIndividuallyConfirmedInputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
|
||||
if false == fromUDP {
|
||||
/*
|
||||
[WARNING] We have to distinguish whether or not the incoming batch is from UDP here, otherwise "pR.LatestPlayerUpsyncedInputFrameId - pR.LastAllConfirmedInputFrameId" might become unexpectedly large in case of "UDP packet loss + slow ws session"!
|
||||
|
||||
if clientInputFrameId > pR.LatestPlayerUpsyncedInputFrameId {
|
||||
pR.LatestPlayerUpsyncedInputFrameId = clientInputFrameId
|
||||
Moreover, only ws session upsyncs should advance "player.LastReceivedInputFrameId" & "pR.LatestPlayerUpsyncedInputFrameId".
|
||||
|
||||
Kindly note that the updates of "player.LastReceivedInputFrameId" could be discrete before and after reconnection.
|
||||
*/
|
||||
player.LastReceivedInputFrameId = clientInputFrameId
|
||||
if clientInputFrameId > pR.LatestPlayerUpsyncedInputFrameId {
|
||||
pR.LatestPlayerUpsyncedInputFrameId = clientInputFrameId
|
||||
}
|
||||
}
|
||||
|
||||
if clientInputFrameId > player.LastUdpReceivedInputFrameId {
|
||||
// No need to update "player.LastUdpReceivedInputFrameId" only when "true == fromUDP", we should keep "player.LastUdpReceivedInputFrameId >= player.LastReceivedInputFrameId" at any moment.
|
||||
player.LastUdpReceivedInputFrameId = clientInputFrameId
|
||||
// It's safe (in terms of getting an eventually correct "RenderFrameBuffer") to put the following update of "pR.LastIndividuallyConfirmedInputList" which is ONLY used for prediction in "InputsBuffer" out of "false == fromUDP" block.
|
||||
pR.LastIndividuallyConfirmedInputList[player.JoinIndex-1] = inputFrameUpsync.Encoded
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1253,6 +1271,7 @@ func (pR *Room) forceConfirmationIfApplicable(prevRenderFrameId int32) uint64 {
|
||||
totPlayerCnt := uint32(pR.Capacity)
|
||||
allConfirmedMask := uint64((1 << totPlayerCnt) - 1)
|
||||
unconfirmedMask := uint64(0)
|
||||
// As "pR.LastAllConfirmedInputFrameId" can be advanced by UDP but "pR.LatestPlayerUpsyncedInputFrameId" could only be advanced by ws session, when the following condition is met we know that the slow ticker is really in trouble!
|
||||
if pR.LatestPlayerUpsyncedInputFrameId > (pR.LastAllConfirmedInputFrameId + pR.InputFrameUpsyncDelayTolerance + 1) {
|
||||
// Type#1 check whether there's a significantly slow ticker among players
|
||||
oldLastAllConfirmedInputFrameId := pR.LastAllConfirmedInputFrameId
|
||||
@@ -1730,7 +1749,7 @@ func (pR *Room) startBattleUdpTunnel() {
|
||||
pReq := new(pb.WsReq)
|
||||
bytes := message[0:rlen]
|
||||
if unmarshalErr := proto.Unmarshal(bytes, pReq); nil != unmarshalErr {
|
||||
Logger.Warn("`BattleUdpTunnel` for roomId=%d failed to unmarshal", zap.Error(unmarshalErr))
|
||||
Logger.Warn(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d failed to unmarshal %d bytes", pR.Id, rlen), zap.Error(unmarshalErr))
|
||||
continue
|
||||
}
|
||||
playerId := pReq.PlayerId
|
||||
@@ -1760,7 +1779,7 @@ func (pR *Room) startBattleUdpTunnel() {
|
||||
Logger.Warn(fmt.Sprintf("`BattleUdpTunnel` for roomId=%d failed to forward upsync from (playerId:%d, joinIndex:%d, addr:%s) to (otherPlayerId:%d, otherPlayerJoinIndex:%d, otherPlayerAddr:%s)\n", pR.Id, playerId, peerJoinIndex, remote, otherPlayer.Id, otherPlayer.JoinIndex, otherPlayer.BattleUdpTunnelAddr))
|
||||
}
|
||||
}
|
||||
pR.OnBattleCmdReceived(pReq) // To help advance "pR.LastAllConfirmedInputFrameId" asap
|
||||
pR.OnBattleCmdReceived(pReq, true) // To help advance "pR.LastAllConfirmedInputFrameId" asap, and even if "pR.LastAllConfirmedInputFrameId" is not advanced due to packet loss, these UDP packets would help prefill the "InputsBuffer" with correct player "future inputs (compared to ws session)" such that when "forceConfirmation" occurs we have as many correct predictions as possible
|
||||
}
|
||||
|
||||
}
|
||||
|
@@ -388,7 +388,7 @@ func Serve(c *gin.Context) {
|
||||
startOrFeedHeartbeatWatchdog(conn)
|
||||
case models.UPSYNC_MSG_ACT_PLAYER_CMD:
|
||||
startOrFeedHeartbeatWatchdog(conn)
|
||||
pRoom.OnBattleCmdReceived(pReq)
|
||||
pRoom.OnBattleCmdReceived(pReq, false)
|
||||
case models.UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK:
|
||||
res := pRoom.OnPlayerBattleColliderAcked(int32(playerId))
|
||||
if false == res {
|
||||
|
@@ -547,7 +547,7 @@
|
||||
"array": [
|
||||
0,
|
||||
0,
|
||||
210.53572189052173,
|
||||
209.73151519075364,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
|
@@ -29,6 +29,15 @@ cc.Class({
|
||||
if (!selfPlayerRichInfo) return;
|
||||
const selfPlayerNode = selfPlayerRichInfo.node;
|
||||
if (!selfPlayerNode) return;
|
||||
self.mapNode.setPosition(cc.v2().sub(selfPlayerNode.position));
|
||||
const dst = cc.v2().sub(selfPlayerNode.position);
|
||||
const pDiff = dst.sub(self.mapNode.position);
|
||||
const stepLength = dt * self.speed;
|
||||
if (stepLength > pDiff.mag()) {
|
||||
self.mapNode.setPosition(dst);
|
||||
} else {
|
||||
pDiff.normalizeSelf();
|
||||
const newMapPos = self.mapNode.position.add(pDiff.mul(dt * self.speed));
|
||||
self.mapNode.setPosition(newMapPos);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@@ -43,10 +43,10 @@ window.onUdpMessage = (args) => {
|
||||
//cc.log(`#2 Js called back by CPP for upsync: onUdpMessage: ${JSON.stringify(req)}`);
|
||||
if (req.act && window.UPSYNC_MSG_ACT_PLAYER_CMD == req.act) {
|
||||
let effCnt = 0;
|
||||
const renderedInputFrameIdUpper = gopkgs.ConvertToDelayedInputFrameId(self.renderFrameId);
|
||||
const peerJoinIndex = req.joinIndex;
|
||||
if (peerJoinIndex == self.selfPlayerInfo.JoinIndex) return;
|
||||
const batch = req.inputFrameUpsyncBatch;
|
||||
self.onPeerInputFrameUpsync(peerJoinIndex, batch);
|
||||
self.onPeerInputFrameUpsync(peerJoinIndex, batch, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -144,7 +144,7 @@ cc.Class({
|
||||
return (confirmedList + 1) == (1 << this.playerRichInfoDict.size);
|
||||
},
|
||||
|
||||
getOrPrefabInputFrameUpsync(inputFrameId) {
|
||||
getOrPrefabInputFrameUpsync(inputFrameId, canConfirmSelf) {
|
||||
// TODO: find some kind of synchronization mechanism against "onInputFrameDownsyncBatch"!
|
||||
const self = this;
|
||||
if (
|
||||
@@ -157,33 +157,48 @@ cc.Class({
|
||||
let previousSelfInput = null,
|
||||
currSelfInput = null;
|
||||
const joinIndex = self.selfPlayerInfo.JoinIndex;
|
||||
const selfJoinIndexMask = (1 << (joinIndex - 1));
|
||||
const existingInputFrame = self.recentInputCache.GetByFrameId(inputFrameId);
|
||||
const previousInputFrameDownsync = self.recentInputCache.GetByFrameId(inputFrameId - 1);
|
||||
previousSelfInput = (null == previousInputFrameDownsync ? null : previousInputFrameDownsync.InputList[joinIndex - 1]);
|
||||
if (null != existingInputFrame) {
|
||||
if (
|
||||
null != existingInputFrame
|
||||
&&
|
||||
(true != canConfirmSelf)
|
||||
) {
|
||||
// This could happen upon either [type#1] or [type#2] forceConfirmation, where "refRenderFrame" is accompanied by some "inputFrameDownsyncs". The check here also guarantees that we don't override history
|
||||
//console.log(`noDelayInputFrameId=${inputFrameId} already exists in recentInputCache: recentInputCache=${self._stringifyRecentInputCache(false)}`);
|
||||
return [previousSelfInput, existingInputFrame.InputList[joinIndex - 1]];
|
||||
}
|
||||
|
||||
const lastAllConfirmedInputFrame = self.recentInputCache.GetByFrameId(self.lastAllConfirmedInputFrameId);
|
||||
const prefabbedInputList = new Array(self.playerRichInfoDict.size).fill(0);
|
||||
// the returned "gopkgs.NewInputFrameDownsync.InputList" is immutable, thus we can only modify the values in "prefabbedInputList"
|
||||
for (let k in prefabbedInputList) {
|
||||
if (null != previousInputFrameDownsync) {
|
||||
for (let k = 0; k < window.boundRoomCapacity; ++k) {
|
||||
if (null != existingInputFrame) {
|
||||
// When "null != existingInputFrame", it implies that "true == canConfirmSelf" here, we just have to assign "prefabbedInputList[(joinIndex-1)]" specifically and copy all others
|
||||
prefabbedInputList[k] = existingInputFrame.InputList[k];
|
||||
} else if (self.lastIndividuallyConfirmedInputFrameId[k] <= inputFrameId) {
|
||||
prefabbedInputList[k] = self.lastIndividuallyConfirmedInputList[k];
|
||||
// Don't predict "btnA & btnB"!
|
||||
prefabbedInputList[k] = (prefabbedInputList[k] & 15);
|
||||
} else if (null != previousInputFrameDownsync) {
|
||||
// When "self.lastIndividuallyConfirmedInputFrameId[k] > inputFrameId", don't use it to predict a historical input!
|
||||
prefabbedInputList[k] = previousInputFrameDownsync.InputList[k];
|
||||
// Don't predict "btnA & btnB"!
|
||||
prefabbedInputList[k] = (prefabbedInputList[k] & 15);
|
||||
}
|
||||
if (0 <= self.lastAllConfirmedInputFrameId && inputFrameId - 1 > self.lastAllConfirmedInputFrameId) {
|
||||
prefabbedInputList[k] = lastAllConfirmedInputFrame.InputList[k];
|
||||
}
|
||||
// Don't predict "btnA & btnB"!
|
||||
prefabbedInputList[k] = (prefabbedInputList[k] & 15);
|
||||
}
|
||||
let initConfirmedList = 0;
|
||||
if (null != existingInputFrame) {
|
||||
// When "null != existingInputFrame", it implies that "true == canConfirmSelf" here
|
||||
initConfirmedList = (existingInputFrame.ConfirmedList | selfJoinIndexMask);
|
||||
}
|
||||
currSelfInput = self.ctrl.getEncodedInput(); // When "null == existingInputFrame", it'd be safe to say that the realtime "self.ctrl.getEncodedInput()" is for the requested "inputFrameId"
|
||||
prefabbedInputList[(joinIndex - 1)] = currSelfInput;
|
||||
while (self.recentInputCache.EdFrameId <= inputFrameId) {
|
||||
// Fill the gap
|
||||
const prefabbedInputFrameDownsync = gopkgs.NewInputFrameDownsync(self.recentInputCache.EdFrameId, prefabbedInputList.slice(), (1 << (joinIndex - 1)));
|
||||
// [WARNING] Do not blindly use "selfJoinIndexMask" here, as the "actuallyUsedInput for self" couldn't be confirmed while prefabbing, otherwise we'd have confirmed a wrong self input by "_markConfirmationIfApplicable()"!
|
||||
const prefabbedInputFrameDownsync = gopkgs.NewInputFrameDownsync(self.recentInputCache.EdFrameId, prefabbedInputList.slice(), initConfirmedList);
|
||||
// console.log(`Prefabbed inputFrameId=${prefabbedInputFrameDownsync.InputFrameId}`);
|
||||
self.recentInputCache.Put(prefabbedInputFrameDownsync);
|
||||
}
|
||||
@@ -355,6 +370,8 @@ cc.Class({
|
||||
self.lastUpsyncInputFrameId = -1;
|
||||
self.chaserRenderFrameId = -1; // at any moment, "chaserRenderFrameId <= renderFrameId", but "chaserRenderFrameId" would fluctuate according to "onInputFrameDownsyncBatch"
|
||||
|
||||
self.lastIndividuallyConfirmedInputFrameId = new Array(window.boundRoomCapacity).fill(-1);
|
||||
self.lastIndividuallyConfirmedInputList = new Array(window.boundRoomCapacity).fill(0);
|
||||
self.recentRenderCache = new RingBuffer(self.renderCacheSize);
|
||||
|
||||
self.recentInputCache = gopkgs.NewRingBufferJs((self.renderCacheSize >> 1) + 1);
|
||||
@@ -814,6 +831,19 @@ cc.Class({
|
||||
return true;
|
||||
},
|
||||
|
||||
_markConfirmationIfApplicable() {
|
||||
const self = this;
|
||||
let newAllConfirmedCnt = 0;
|
||||
while (self.recentInputCache.StFrameId <= self.lastAllConfirmedInputFrameId && self.lastAllConfirmedInputFrameId < self.recentInputCache.EdFrameId) {
|
||||
const inputFrameDownsync = self.recentInputCache.GetByFrameId(self.lastAllConfirmedInputFrameId);
|
||||
if (null == inputFrameDownsync) break;
|
||||
if (self._allConfirmed(inputFrameDownsync.ConfirmedList)) break;
|
||||
++self.lastAllConfirmedInputFrameId;
|
||||
++newAllConfirmedCnt;
|
||||
}
|
||||
return newAllConfirmedCnt;
|
||||
},
|
||||
|
||||
onInputFrameDownsyncBatch(batch /* []*pb.InputFrameDownsync */ ) {
|
||||
// TODO: find some kind of synchronization mechanism against "getOrPrefabInputFrameUpsync"!
|
||||
if (null == batch) {
|
||||
@@ -835,8 +865,9 @@ cc.Class({
|
||||
if (inputFrameDownsyncId <= self.lastAllConfirmedInputFrameId) {
|
||||
continue;
|
||||
}
|
||||
// [WARNING] Take all "inputFrameDownsync" from backend as all-confirmed, it'll be later checked by "rollbackAndChase".
|
||||
// [WARNING] Now that "inputFrameDownsyncId > self.lastAllConfirmedInputFrameId", we should make an update immediately because unlike its backend counterpart "Room.LastAllConfirmedInputFrameId", the frontend "mapIns.lastAllConfirmedInputFrameId" might inevitably get gaps among discrete values due to "either type#1 or type#2 forceConfirmation" -- and only "onInputFrameDownsyncBatch" can catch this!
|
||||
self.lastAllConfirmedInputFrameId = inputFrameDownsyncId;
|
||||
|
||||
const localInputFrame = self.recentInputCache.GetByFrameId(inputFrameDownsyncId);
|
||||
if (null != localInputFrame
|
||||
&&
|
||||
@@ -846,16 +877,29 @@ cc.Class({
|
||||
) {
|
||||
firstPredictedYetIncorrectInputFrameId = inputFrameDownsyncId;
|
||||
}
|
||||
// [WARNING] Take all "inputFrameDownsync" from backend as all-confirmed, it'll be later checked by "rollbackAndChase".
|
||||
inputFrameDownsync.confirmedList = (1 << self.playerRichInfoDict.size) - 1;
|
||||
const inputFrameDownsyncLocal = gopkgs.NewInputFrameDownsync(inputFrameDownsync.inputFrameId, inputFrameDownsync.inputList, inputFrameDownsync.confirmedList); // "battle.InputFrameDownsync" in "jsexport"
|
||||
for (let j in self.playerRichInfoArr) {
|
||||
const jj = parseInt(j);
|
||||
if (inputFrameDownsync.inputFrameId > self.lastIndividuallyConfirmedInputFrameId[jj]) {
|
||||
self.lastIndividuallyConfirmedInputFrameId[jj] = inputFrameDownsync.inputFrameId;
|
||||
self.lastIndividuallyConfirmedInputList[jj] = inputFrameDownsync.inputList[jj];
|
||||
}
|
||||
}
|
||||
//console.log(`Confirmed inputFrameId=${inputFrameDownsync.inputFrameId}`);
|
||||
const [ret, oldStFrameId, oldEdFrameId] = self.recentInputCache.SetByFrameId(inputFrameDownsyncLocal, inputFrameDownsync.inputFrameId);
|
||||
if (window.RING_BUFF_FAILED_TO_SET == ret) {
|
||||
throw `Failed to dump input cache (maybe recentInputCache too small)! inputFrameDownsync.inputFrameId=${inputFrameDownsync.inputFrameId}, lastAllConfirmedInputFrameId=${self.lastAllConfirmedInputFrameId}; recentRenderCache=${self._stringifyRecentRenderCache(false)}, recentInputCache=${self._stringifyRecentInputCache(false)}`;
|
||||
}
|
||||
}
|
||||
self._markConfirmationIfApplicable();
|
||||
self._handleIncorrectlyRenderedPrediction(firstPredictedYetIncorrectInputFrameId, batch, false);
|
||||
},
|
||||
|
||||
_handleIncorrectlyRenderedPrediction(firstPredictedYetIncorrectInputFrameId, batch, fromUDP) {
|
||||
if (null == firstPredictedYetIncorrectInputFrameId) return;
|
||||
const self = this;
|
||||
const renderFrameId1 = gopkgs.ConvertToFirstUsedRenderFrameId(firstPredictedYetIncorrectInputFrameId) - 1;
|
||||
if (renderFrameId1 >= self.chaserRenderFrameId) return;
|
||||
|
||||
@@ -871,15 +915,20 @@ cc.Class({
|
||||
--------------------------------------------------------
|
||||
*/
|
||||
// The actual rollback-and-chase would later be executed in update(dt).
|
||||
console.log(`Mismatched input detected, resetting chaserRenderFrameId: ${self.chaserRenderFrameId}->${renderFrameId1} by firstPredictedYetIncorrectInputFrameId: ${firstPredictedYetIncorrectInputFrameId}
|
||||
console.log(`Mismatched input detected, resetting chaserRenderFrameId: ${self.chaserRenderFrameId}->${renderFrameId1} by
|
||||
firstPredictedYetIncorrectInputFrameId: ${firstPredictedYetIncorrectInputFrameId}
|
||||
lastAllConfirmedInputFrameId=${self.lastAllConfirmedInputFrameId}
|
||||
recentInputCache=${self._stringifyRecentInputCache(false)}
|
||||
batchInputFrameIdRange=[${batch[0].inputFrameId}, ${batch[batch.length - 1].inputFrameId}]`);
|
||||
batchInputFrameIdRange=[${batch[0].inputFrameId}, ${batch[batch.length - 1].inputFrameId}]
|
||||
fromUDP=${fromUDP}`);
|
||||
self.chaserRenderFrameId = renderFrameId1;
|
||||
self.networkDoctor.logRollbackFrames(self.renderFrameId - self.chaserRenderFrameId);
|
||||
let rollbackFrames = (self.renderFrameId - self.chaserRenderFrameId);
|
||||
if (0 > rollbackFrames)
|
||||
rollbackFrames = 0;
|
||||
self.networkDoctor.logRollbackFrames(rollbackFrames);
|
||||
},
|
||||
|
||||
onPeerInputFrameUpsync(peerJoinIndex, batch /* []*pb.InputFrameDownsync */ ) {
|
||||
onPeerInputFrameUpsync(peerJoinIndex, batch, fromUDP) {
|
||||
// TODO: find some kind of synchronization mechanism against "getOrPrefabInputFrameUpsync"!
|
||||
// See `<proj-root>/ConcerningEdgeCases.md` for why this method exists.
|
||||
if (null == batch) {
|
||||
@@ -895,36 +944,54 @@ batchInputFrameIdRange=[${batch[0].inputFrameId}, ${batch[batch.length - 1].inpu
|
||||
|
||||
let effCnt = 0;
|
||||
//console.log(`Received peer inputFrameUpsync batch w/ inputFrameId in [${batch[0].inputFrameId}, ${batch[batch.length - 1].inputFrameId}] for prediction assistance`);
|
||||
let firstPredictedYetIncorrectInputFrameId = null;
|
||||
const renderedInputFrameIdUpper = gopkgs.ConvertToDelayedInputFrameId(self.renderFrameId);
|
||||
for (let k in batch) {
|
||||
const inputFrameDownsync = batch[k];
|
||||
const inputFrameDownsyncId = inputFrameDownsync.inputFrameId;
|
||||
if (inputFrameDownsyncId < renderedInputFrameIdUpper) {
|
||||
// Avoid obfuscating already rendered history
|
||||
const inputFrame = batch[k]; // could be either "pb.InputFrameDownsync" or "pb.InputFrameUpsync", depending on "fromUDP"
|
||||
const inputFrameId = inputFrame.inputFrameId;
|
||||
const peerEncodedInput = (true == fromUDP ? inputFrame.encoded : inputFrame.inputList[peerJoinIndex - 1]);
|
||||
if (inputFrameId <= renderedInputFrameIdUpper) {
|
||||
// [WARNING] Avoid obfuscating already rendered history, even at "inputFrameId == renderedInputFrameIdUpper", due to the use of "INPUT_SCALE_FRAMES" some previous render frames might already be rendered with "inputFrameId"!
|
||||
// TODO: Shall we update the "chaserRenderFrameId" if the rendered history was wrong? It doesn't seem to impact eventual correctness if we allow the update of "chaserRenderFrameId" upon "inputFrameId <= renderedInputFrameIdUpper" here, however UDP upsync doesn't reserve order from a same sender and there might be multiple other senders, hence it might result in unnecessarily frequent chasing.
|
||||
const localInputFrame = self.recentInputCache.GetByFrameId(inputFrameId);
|
||||
if (null != localInputFrame
|
||||
&&
|
||||
null == firstPredictedYetIncorrectInputFrameId
|
||||
&&
|
||||
localInputFrame.InputList[peerJoinIndex - 1] != peerEncodedInput
|
||||
) {
|
||||
firstPredictedYetIncorrectInputFrameId = inputFrameId;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (inputFrameDownsyncId <= self.lastAllConfirmedInputFrameId) {
|
||||
if (inputFrameId <= self.lastAllConfirmedInputFrameId) {
|
||||
// [WARNING] Don't reject it by "inputFrameId <= self.lastIndividuallyConfirmedInputFrameId[peerJoinIndex-1]", the arrival of UDP packets might not reserve their sending order!
|
||||
continue;
|
||||
}
|
||||
self.getOrPrefabInputFrameUpsync(inputFrameDownsyncId); // Make sure that inputFrame exists locally
|
||||
const existingInputFrame = self.recentInputCache.GetByFrameId(inputFrameDownsyncId);
|
||||
if (0 < (existingInputFrame.ConfirmedList & (1 << (peerJoinIndex - 1)))) {
|
||||
const peerJoinIndexMask = (1 << (peerJoinIndex - 1));
|
||||
self.getOrPrefabInputFrameUpsync(inputFrameId, false); // Make sure that inputFrame exists locally
|
||||
const existingInputFrame = self.recentInputCache.GetByFrameId(inputFrameId);
|
||||
if (0 < (existingInputFrame.ConfirmedList & peerJoinIndexMask)) {
|
||||
continue;
|
||||
}
|
||||
if (inputFrameId > self.lastIndividuallyConfirmedInputFrameId[peerJoinIndex - 1]) {
|
||||
self.lastIndividuallyConfirmedInputFrameId[peerJoinIndex - 1] = inputFrameId;
|
||||
self.lastIndividuallyConfirmedInputList[peerJoinIndex - 1] = peerEncodedInput;
|
||||
}
|
||||
effCnt += 1;
|
||||
// the returned "gopkgs.NewInputFrameDownsync.InputList" is immutable, thus we can only modify the values in "newInputList" and "newConfirmedList"!
|
||||
let newInputList = new Array(self.playerRichInfoDict.size).fill(0);
|
||||
for (let i in existingInputFrame.InputList) {
|
||||
newInputList[i] = existingInputFrame.InputList[i];
|
||||
}
|
||||
let newConfirmedList = (existingInputFrame.confirmedList | (1 << (peerJoinIndex - 1)));
|
||||
// No need to change "lastAllConfirmedInputFrameId", leave it to "onInputFrameDownsyncBatch" -- we're just helping prediction here
|
||||
const newInputFrameDownsyncLocal = gopkgs.NewInputFrameDownsync(inputFrameDownsyncId, newInputList, newConfirmedList);
|
||||
self.recentInputCache.SetByFrameId(newInputFrameDownsyncLocal, inputFrameDownsyncId);
|
||||
let newInputList = existingInputFrame.InputList.slice();
|
||||
newInputList[peerJoinIndex - 1] = peerEncodedInput;
|
||||
let newConfirmedList = (existingInputFrame.ConfirmedList | peerJoinIndex);
|
||||
const newInputFrameDownsyncLocal = gopkgs.NewInputFrameDownsync(inputFrameId, newInputList, newConfirmedList);
|
||||
//console.log(`Updated encoded input of peerJoinIndex=${peerJoinIndex} to ${peerEncodedInput} for inputFrameId=${inputFrameId}/renderedInputFrameIdUpper=${renderedInputFrameIdUpper} from ${JSON.stringify(inputFrame)}; newInputFrameDownsyncLocal=${self.gopkgsInputFrameDownsyncStr(newInputFrameDownsyncLocal)}; existingInputFrame=${self.gopkgsInputFrameDownsyncStr(existingInputFrame)}`);
|
||||
self.recentInputCache.SetByFrameId(newInputFrameDownsyncLocal, inputFrameId);
|
||||
}
|
||||
if (0 < effCnt) {
|
||||
//self._markConfirmationIfApplicable();
|
||||
self.networkDoctor.logPeerInputFrameUpsync(batch[0].inputFrameId, batch[batch.length - 1].inputFrameId);
|
||||
}
|
||||
self._handleIncorrectlyRenderedPrediction(firstPredictedYetIncorrectInputFrameId, batch, fromUDP);
|
||||
},
|
||||
|
||||
onPlayerAdded(rdf /* pb.RoomDownsyncFrame */ ) {
|
||||
@@ -942,7 +1009,6 @@ batchInputFrameIdRange=[${batch[0].inputFrameId}, ${batch[batch.length - 1].inpu
|
||||
if (ALL_BATTLE_STATES.IN_BATTLE != self.battleState) {
|
||||
return;
|
||||
}
|
||||
self._stringifyRdfIdToActuallyUsedInput();
|
||||
window.closeWSConnection(constants.RET_CODE.BATTLE_STOPPED, "");
|
||||
self.battleState = ALL_BATTLE_STATES.IN_SETTLEMENT;
|
||||
self.countdownNanos = null;
|
||||
@@ -1009,13 +1075,13 @@ batchInputFrameIdRange=[${batch[0].inputFrameId}, ${batch[batch.length - 1].inpu
|
||||
let prevSelfInput = null,
|
||||
currSelfInput = null;
|
||||
if (gopkgs.ShouldGenerateInputFrameUpsync(self.renderFrameId)) {
|
||||
[prevSelfInput, currSelfInput] = self.getOrPrefabInputFrameUpsync(noDelayInputFrameId);
|
||||
[prevSelfInput, currSelfInput] = self.getOrPrefabInputFrameUpsync(noDelayInputFrameId, true);
|
||||
}
|
||||
|
||||
const delayedInputFrameId = gopkgs.ConvertToDelayedInputFrameId(self.renderFrameId);
|
||||
if (null == self.recentInputCache.GetByFrameId(delayedInputFrameId)) {
|
||||
// Possible edge case after resync, kindly note that it's OK to prefab a "future inputFrame" here, because "sendInputFrameUpsyncBatch" would be capped by "noDelayInputFrameId from self.renderFrameId".
|
||||
self.getOrPrefabInputFrameUpsync(delayedInputFrameId);
|
||||
self.getOrPrefabInputFrameUpsync(delayedInputFrameId, false);
|
||||
}
|
||||
|
||||
let t0 = performance.now();
|
||||
@@ -1070,7 +1136,7 @@ othersForcedDownsyncRenderFrame=${JSON.stringify(othersForcedDownsyncRenderFrame
|
||||
++self.renderFrameId; // [WARNING] It's important to increment the renderFrameId AFTER all the operations above!!!
|
||||
self.lastRenderFrameIdTriggeredAt = performance.now();
|
||||
let t3 = performance.now();
|
||||
self.skipRenderFrameFlag = self.networkDoctor.isTooFast();
|
||||
self.skipRenderFrameFlag = self.networkDoctor.isTooFast(self);
|
||||
} catch (err) {
|
||||
console.error("Error during Map.update", err);
|
||||
self.onBattleStopped(); // TODO: Popup to ask player to refresh browser
|
||||
@@ -1419,6 +1485,21 @@ othersForcedDownsyncRenderFrame=${JSON.stringify(othersForcedDownsyncRenderFrame
|
||||
return s.join(',');
|
||||
},
|
||||
|
||||
gopkgsInputFrameDownsyncStr(inputFrameDownsync) {
|
||||
if (null == inputFrameDownsync) return "{}";
|
||||
const self = this;
|
||||
let s = [];
|
||||
s.push(`InputFrameId:${inputFrameDownsync.InputFrameId}`);
|
||||
let ss = [];
|
||||
for (let k = 0; k < window.boundRoomCapacity; ++k) {
|
||||
ss.push(`"${inputFrameDownsync.InputList[k]}"`);
|
||||
}
|
||||
s.push(`InputList:[${ss.join(',')}]`);
|
||||
s.push(`ConfirmedList:${inputFrameDownsync.ConfirmedList}`);
|
||||
|
||||
return `{${s.join(',')}}`;
|
||||
},
|
||||
|
||||
_stringifyRdfIdToActuallyUsedInput() {
|
||||
const self = this;
|
||||
let s = [];
|
||||
|
@@ -79,17 +79,29 @@ NetworkDoctor.prototype.logSkippedRenderFrameCnt = function() {
|
||||
this.skippedRenderFrameCnt += 1;
|
||||
}
|
||||
|
||||
NetworkDoctor.prototype.isTooFast = function() {
|
||||
return false;
|
||||
NetworkDoctor.prototype.isTooFast = function(mapIns) {
|
||||
const [sendingFps, srvDownsyncFps, peerUpsyncFps, rollbackFrames, skippedRenderFrameCnt] = this.stats();
|
||||
if (sendingFps >= this.inputRateThreshold + 2) {
|
||||
if (sendingFps >= this.inputRateThreshold + 3) {
|
||||
// Don't send too fast
|
||||
console.log(`Sending too fast, sendingFps=${sendingFps}`);
|
||||
return true;
|
||||
} else if (sendingFps >= this.inputRateThreshold && srvDownsyncFps >= this.inputRateThreshold) {
|
||||
} else {
|
||||
const sendingFpsNormal = (sendingFps >= this.inputRateThreshold);
|
||||
// An outstanding lag within the "inputFrameDownsyncQ" will reduce "srvDownsyncFps", HOWEVER, a constant lag wouldn't impact "srvDownsyncFps"! In native platforms we might use PING value might help as a supplement information to confirm that the "selfPlayer" is not lagged within the time accounted by "inputFrameDownsyncQ".
|
||||
if (rollbackFrames >= this.rollbackFramesThreshold) {
|
||||
// I got many frames rolled back while none of my peers effectively helped my preciction. Deliberately not using "peerUpsyncThreshold" here because when using UDP p2p upsync broadcasting, we expect to receive effective p2p upsyncs from every other player.
|
||||
return true;
|
||||
const recvFpsNormal = (srvDownsyncFps >= this.inputRateThreshold || peerUpsyncFps >= this.inputRateThreshold * (window.boundRoomCapacity - 1));
|
||||
if (sendingFpsNormal && recvFpsNormal) {
|
||||
let selfInputFrameIdFront = gopkgs.ConvertToNoDelayInputFrameId(mapIns.renderFrameId);
|
||||
let minInputFrameIdFront = Number.MAX_VALUE;
|
||||
for (let k = 0; k < window.boundRoomCapacity; ++k) {
|
||||
if (k + 1 == mapIns.selfPlayerInfo.JoinIndex) continue;
|
||||
if (mapIns.lastIndividuallyConfirmedInputFrameId[k] >= minInputFrameIdFront) continue;
|
||||
minInputFrameIdFront = mapIns.lastIndividuallyConfirmedInputFrameId[k];
|
||||
}
|
||||
if ((selfInputFrameIdFront > minInputFrameIdFront) && ((selfInputFrameIdFront - minInputFrameIdFront) > (mapIns.inputFrameUpsyncDelayTolerance + 1))) {
|
||||
// first comparison condition is to avoid numeric overflow
|
||||
console.log(`Game logic ticking too fast, selfInputFrameIdFront=${selfInputFrameIdFront}, minInputFrameIdFront=${minInputFrameIdFront}, inputFrameUpsyncDelayTolerance=${mapIns.inputFrameUpsyncDelayTolerance}`);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
|
@@ -189,7 +189,7 @@ cc.Class({
|
||||
currSelfInput = null;
|
||||
const noDelayInputFrameId = gopkgs.ConvertToNoDelayInputFrameId(self.renderFrameId); // It's important that "inputDelayFrames == 0" here
|
||||
if (gopkgs.ShouldGenerateInputFrameUpsync(self.renderFrameId)) {
|
||||
const prevAndCurrInputs = self.getOrPrefabInputFrameUpsync(noDelayInputFrameId);
|
||||
const prevAndCurrInputs = self.getOrPrefabInputFrameUpsync(noDelayInputFrameId, true);
|
||||
prevSelfInput = prevAndCurrInputs[0];
|
||||
currSelfInput = prevAndCurrInputs[1];
|
||||
}
|
||||
|
@@ -229,7 +229,11 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) {
|
||||
const peerJoinIndex = resp.peerJoinIndex;
|
||||
const peerAddrList = resp.rdf.peerUdpAddrList;
|
||||
console.log(`Got DOWNSYNC_MSG_ACT_PEER_UDP_ADDR peerAddrList=${JSON.stringify(peerAddrList)}; boundRoomCapacity=${window.boundRoomCapacity}`);
|
||||
DelayNoMore.UdpSession.upsertPeerUdpAddr(peerAddrList, window.boundRoomCapacity, window.mapIns.selfPlayerInfo.JoinIndex); // In C++ impl it actually broadcasts the peer-punching message to all known peers within "window.boundRoomCapacity"
|
||||
for (let j = 0; j < 3; ++j) {
|
||||
setTimeout(()=> {
|
||||
DelayNoMore.UdpSession.upsertPeerUdpAddr(peerAddrList, window.boundRoomCapacity, window.mapIns.selfPlayerInfo.JoinIndex); // In C++ impl it actually broadcasts the peer-punching message to all known peers within "window.boundRoomCapacity"
|
||||
}, j*500);
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@@ -250,7 +254,8 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) {
|
||||
console.warn(`The WS clientSession is closed: evt=${JSON.stringify(evt)}, evt.code=${evt.code}`);
|
||||
if (cc.sys.isNative) {
|
||||
if (mapIns.frameDataLoggingEnabled) {
|
||||
console.warn(`${mapIns._stringifyRdfIdToActuallyUsedInput()}`);
|
||||
console.warn(`${mapIns._stringifyRdfIdToActuallyUsedInput()}
|
||||
`);
|
||||
}
|
||||
DelayNoMore.UdpSession.closeUdpSession();
|
||||
}
|
||||
@@ -260,7 +265,8 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) {
|
||||
case constants.RET_CODE.BATTLE_STOPPED:
|
||||
// deliberately do nothing
|
||||
if (mapIns.frameDataLoggingEnabled) {
|
||||
console.warn(`${mapIns._stringifyRdfIdToActuallyUsedInput()}`);
|
||||
console.warn(`${mapIns._stringifyRdfIdToActuallyUsedInput()}
|
||||
`);
|
||||
}
|
||||
break;
|
||||
case constants.RET_CODE.PLAYER_NOT_ADDABLE_TO_ROOM:
|
||||
@@ -277,7 +283,8 @@ window.initPersistentSessionClient = function(onopenCb, expectedRoomId) {
|
||||
case constants.RET_CODE.PLAYER_CHEATING:
|
||||
case 1006: // Peer(i.e. the backend) gone unexpectedly, but not working for "cc.sys.isNative"
|
||||
if (mapIns.frameDataLoggingEnabled) {
|
||||
console.warn(`${mapIns._stringifyRdfIdToActuallyUsedInput()}`);
|
||||
console.warn(`${mapIns._stringifyRdfIdToActuallyUsedInput()}
|
||||
`);
|
||||
}
|
||||
window.clearLocalStorageAndBackToLoginScene(true);
|
||||
break;
|
||||
@@ -335,7 +342,7 @@ window.initSecondarySession = function(onopenCb, boundRoomId) {
|
||||
//console.log(`Got non-empty onmessage decoded: resp.act=${resp.act}`);
|
||||
switch (resp.act) {
|
||||
case window.DOWNSYNC_MSG_ACT_PEER_INPUT_BATCH:
|
||||
mapIns.onPeerInputFrameUpsync(resp.peerJoinIndex, resp.inputFrameDownsyncBatch);
|
||||
mapIns.onPeerInputFrameUpsync(resp.peerJoinIndex, resp.inputFrameDownsyncBatch, false);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
@@ -17,6 +17,12 @@
|
||||
"append_file": [{
|
||||
"from": "cocos/scripting/js-bindings/manual/jsb_module_register.cpp",
|
||||
"to": "frameworks/runtime-src/Classes/jsb_module_register.cpp"
|
||||
}, {
|
||||
"from": "frameworks/runtime-src/Classes/send_ring_buff.hpp",
|
||||
"to": "frameworks/runtime-src/Classes/send_ring_buff.hpp"
|
||||
}, {
|
||||
"from": "frameworks/runtime-src/Classes/send_ring_buff.cpp",
|
||||
"to": "frameworks/runtime-src/Classes/send_ring_buff.cpp"
|
||||
}, {
|
||||
"from": "frameworks/runtime-src/Classes/udp_session.hpp",
|
||||
"to": "frameworks/runtime-src/Classes/udp_session.hpp"
|
||||
|
@@ -0,0 +1,31 @@
|
||||
#include <string.h>
|
||||
#include "send_ring_buff.hpp"
|
||||
|
||||
void SendRingBuff::put(BYTEC* const newBytes, size_t newBytesLen, PeerAddr* pNewPeerAddr) {
|
||||
while (0 < cnt && cnt >= n) {
|
||||
// Make room for the new element
|
||||
this->pop();
|
||||
}
|
||||
eles[ed].bytesLen = newBytesLen;
|
||||
memset(eles[ed].bytes, 0, sizeof eles[ed].bytes);
|
||||
memcpy(eles[ed].bytes, newBytes, newBytesLen);
|
||||
eles[ed].peerAddr = *(pNewPeerAddr);
|
||||
ed++;
|
||||
cnt++;
|
||||
if (ed >= n) {
|
||||
ed -= n; // Deliberately not using "%" operator for performance concern
|
||||
}
|
||||
}
|
||||
|
||||
SendWork* SendRingBuff::pop() {
|
||||
if (0 == cnt) {
|
||||
return NULL;
|
||||
}
|
||||
SendWork* ret = &(eles[st]);
|
||||
cnt--;
|
||||
st++;
|
||||
if (st >= n) {
|
||||
st -= n;
|
||||
}
|
||||
return ret;
|
||||
}
|
@@ -0,0 +1,44 @@
|
||||
#ifndef send_ring_buff_hpp
|
||||
#define send_ring_buff_hpp
|
||||
|
||||
#include "uv/uv.h"
|
||||
#define __SSIZE_T // Otherwise "ssize_t" would have conflicting macros error that stops compiling
|
||||
|
||||
int const RING_BUFF_CONSECUTIVE_SET = 0;
|
||||
int const RING_BUFF_NON_CONSECUTIVE_SET = 1;
|
||||
int const RING_BUFF_FAILED_TO_SET = 2;
|
||||
|
||||
typedef char BYTEC;
|
||||
typedef char const CHARC;
|
||||
int const maxUdpPayloadBytes = 128;
|
||||
int const maxBuffedMsgs = 512;
|
||||
|
||||
struct PeerAddr {
|
||||
struct sockaddr_in sockAddrIn;
|
||||
uint32_t authKey;
|
||||
};
|
||||
|
||||
class SendWork {
|
||||
public:
|
||||
BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
|
||||
size_t bytesLen;
|
||||
PeerAddr peerAddr;
|
||||
};
|
||||
|
||||
// [WARNING] This class is specific to "SendWork", designed and implemented only to use in multithreading env and save heap alloc/dealloc timecomplexity, it's by no means comparable to the Golang or JavaScript versions!
|
||||
class SendRingBuff {
|
||||
public:
|
||||
int ed, st, n, cnt;
|
||||
SendWork eles[maxBuffedMsgs]; // preallocated on stack to save heap alloc/dealloc time
|
||||
SendRingBuff(int newN) {
|
||||
this->n = newN;
|
||||
this->st = this->ed = this->cnt = 0;
|
||||
}
|
||||
|
||||
void put(BYTEC* const newBytes, size_t newBytesLen, PeerAddr* pNewPeerAddr);
|
||||
|
||||
// Sending is always sequential in UvSendThread, no need to return a copy of "SendWork" instance
|
||||
SendWork* pop();
|
||||
};
|
||||
|
||||
#endif
|
@@ -4,18 +4,23 @@
|
||||
#include "cocos/base/CCScheduler.h"
|
||||
#include "cocos/scripting/js-bindings/jswrapper/SeApi.h"
|
||||
|
||||
uv_udp_t* udpSocket = NULL;
|
||||
uv_thread_t recvTid;
|
||||
uv_timer_t peerPunchTimer;
|
||||
uv_async_t uvLoopStopSig;
|
||||
uv_loop_t* loop = NULL; // Only this loop is used for this simple PoC
|
||||
int const punchServerCnt = 3;
|
||||
int const punchPeerCnt = 3;
|
||||
int const broadcastUpsyncCnt = 1;
|
||||
|
||||
struct PeerAddr peerAddrList[maxPeerCnt];
|
||||
uv_udp_t *udpRecvSocket = NULL, *udpSendSocket = NULL;
|
||||
uv_thread_t recvTid, sendTid;
|
||||
uv_async_t uvRecvLoopStopSig, uvSendLoopStopSig, uvSendLoopTriggerSig;
|
||||
uv_loop_t *recvLoop = NULL, *sendLoop = NULL;
|
||||
|
||||
uv_mutex_t sendRingBuffLock; // used along with "uvSendLoopTriggerSig" as a "uv_cond_t"
|
||||
SendRingBuff* sendRingBuff = NULL;
|
||||
|
||||
char SRV_IP[256];
|
||||
int SRV_PORT = 0;
|
||||
int UDP_TUNNEL_SRV_PORT = 0;
|
||||
struct PeerAddr udpTunnelAddr;
|
||||
struct PeerAddr udpPunchingServerAddr, udpTunnelAddr;
|
||||
struct PeerAddr peerAddrList[maxPeerCnt];
|
||||
|
||||
void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr const* addr, unsigned flags) {
|
||||
if (nread < 0) {
|
||||
@@ -24,6 +29,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
|
||||
free(buf->base);
|
||||
return;
|
||||
}
|
||||
#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
|
||||
char ip[INET_ADDRSTRLEN];
|
||||
memset(ip, 0, sizeof ip);
|
||||
int port = 0;
|
||||
@@ -35,22 +41,23 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
|
||||
struct sockaddr_in const* sockAddr = (struct sockaddr_in const*)addr;
|
||||
uv_inet_ntop(sockAddr->sin_family, &(sockAddr->sin_addr), ip, INET_ADDRSTRLEN);
|
||||
port = ntohs(sockAddr->sin_port);
|
||||
//CCLOG("UDP received %d bytes from %s:%d", nread, ip, port);
|
||||
CCLOG("UDP received %u bytes from %s:%d", nread, ip, port);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
CCLOG("UDP received %d bytes from unknown sender", nread);
|
||||
CCLOG("UDP received %u bytes from unknown sender", nread);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (6 == nread) {
|
||||
// holepunching
|
||||
} else if (0 < nread) {
|
||||
// Non-holepunching; it might be more effective in RAM usage to use a threadsafe RingBuff to pass msg to GameThread here, but as long as it's not a performance blocker don't bother optimize here...
|
||||
uint8_t* const ui8Arr = (uint8_t*)malloc(maxUdpPayloadBytes*sizeof(uint8_t));
|
||||
memset(ui8Arr, 0, sizeof ui8Arr);
|
||||
memset(ui8Arr, 0, sizeof(ui8Arr));
|
||||
for (int i = 0; i < nread; i++) {
|
||||
*(ui8Arr+i) = *(buf->base + i);
|
||||
}
|
||||
@@ -75,8 +82,7 @@ void _onRead(uv_udp_t* req, ssize_t nread, uv_buf_t const* buf, struct sockaddr
|
||||
gameThreadMsg->decRef(); // Reference http://docs.cocos.com/creator/2.2/manual/en/advanced-topics/JSB2.0-learning.html#seobject
|
||||
//CCLOG("UDP received %d bytes upsync -- 4", nread);
|
||||
free(ui8Arr);
|
||||
CCLOG("UDP received %d bytes upsync -- 5", nread);
|
||||
|
||||
//CCLOG("UDP received %d bytes upsync -- 5", nread);
|
||||
});
|
||||
}
|
||||
free(buf->base);
|
||||
@@ -95,198 +101,56 @@ static void _allocBuffer(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
|
||||
}
|
||||
|
||||
void _onUvStopSig(uv_async_t* handle) {
|
||||
uv_stop(loop);
|
||||
CCLOG("UDP recv loop is signaled to stop in UvThread");
|
||||
if (!handle) return;
|
||||
uv_stop(handle->loop);
|
||||
CCLOG("UDP loop %p is signaled to stop in UvXxxxThread", handle->loop);
|
||||
}
|
||||
|
||||
void _onSend(uv_udp_send_t* req, int status) {
|
||||
CCLOG("UDP send about to free req for status:%d...", status);
|
||||
free(req); // No need to free "req->base", it'll be handled in each "_afterXxx" callback
|
||||
CCLOG("UDP send freed req for status:%d...", status);
|
||||
void _afterSend(uv_udp_send_t* req, int status) {
|
||||
if (req) {
|
||||
free(req);
|
||||
}
|
||||
if (status) {
|
||||
CCLOGERROR("uv_udp_send_cb error: %s\n", uv_strerror(status));
|
||||
}
|
||||
}
|
||||
|
||||
void _onUvTimerClosed(uv_handle_t* timer) {
|
||||
free(timer);
|
||||
}
|
||||
|
||||
int const punchServerCnt = 3;
|
||||
class PunchServerWork {
|
||||
public:
|
||||
BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
|
||||
size_t bytesLen;
|
||||
|
||||
BYTEC udpTunnelBytes[maxUdpPayloadBytes];
|
||||
size_t udpTunnelBytesLen;
|
||||
|
||||
PunchServerWork(BYTEC* const newBytes, size_t newBytesLen, BYTEC* const newUdpTunnelBytes, size_t newUdpTunnelBytesLen) {
|
||||
memset(this->bytes, 0, sizeof(this->bytes));
|
||||
memcpy(this->bytes, newBytes, newBytesLen);
|
||||
|
||||
this->bytesLen = newBytesLen;
|
||||
|
||||
memset(this->udpTunnelBytes, 0, sizeof(this->udpTunnelBytes));
|
||||
memcpy(this->udpTunnelBytes, newUdpTunnelBytes, newUdpTunnelBytesLen);
|
||||
|
||||
this->udpTunnelBytesLen = newUdpTunnelBytesLen;
|
||||
}
|
||||
};
|
||||
void _punchServerOnUvThread(uv_work_t* wrapper) {
|
||||
PunchServerWork* work = (PunchServerWork*)wrapper->data;
|
||||
for (int i = 0; i < punchServerCnt; i++) {
|
||||
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
|
||||
uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
|
||||
struct sockaddr_in destAddr;
|
||||
uv_ip4_addr(SRV_IP, SRV_PORT, &destAddr);
|
||||
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&destAddr, _onSend);
|
||||
|
||||
uv_udp_send_t* udpTunnelReq = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
|
||||
uv_buf_t udpTunnelSendBuffer = uv_buf_init(work->udpTunnelBytes, work->udpTunnelBytesLen);
|
||||
struct sockaddr_in udpTunnelDestAddr;
|
||||
uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr);
|
||||
udpTunnelAddr.sockAddrIn = udpTunnelDestAddr;
|
||||
uv_udp_send(udpTunnelReq, udpSocket, &udpTunnelSendBuffer, 1, (struct sockaddr const*)&udpTunnelDestAddr, _onSend);
|
||||
}
|
||||
}
|
||||
void _afterPunchServer(uv_work_t* wrapper, int status) {
|
||||
CCLOG("UDP send about to free PunchServerWork for status:%d...", status);
|
||||
PunchServerWork* work = (PunchServerWork*)wrapper->data;
|
||||
delete work;
|
||||
CCLOG("UDP freed PunchServerWork for status:%d...", status);
|
||||
}
|
||||
|
||||
class PunchPeerWork {
|
||||
public:
|
||||
int roomCapacity;
|
||||
int selfJoinIndex;
|
||||
int naiveRefCnt;
|
||||
PunchPeerWork(int newRoomCapacity, int newSelfJoinIndex) {
|
||||
this->roomCapacity = newRoomCapacity;
|
||||
this->selfJoinIndex = newSelfJoinIndex;
|
||||
this->naiveRefCnt = 0;
|
||||
}
|
||||
void refInc() {
|
||||
++this->naiveRefCnt;
|
||||
}
|
||||
void refDecAndDelIfZero() {
|
||||
--this->naiveRefCnt;
|
||||
if (0 >= this->naiveRefCnt) {
|
||||
delete this;
|
||||
}
|
||||
}
|
||||
virtual ~PunchPeerWork() {
|
||||
CCLOG("PunchPeerWork instance deleted...");
|
||||
}
|
||||
};
|
||||
void _punchPeerOnUvThreadDelayed(uv_timer_t* timer, int status) {
|
||||
//CCLOG("_punchPeerOnUvThreadDelayed started...");
|
||||
PunchPeerWork* work = (PunchPeerWork*)timer->data;
|
||||
int roomCapacity = work->roomCapacity;
|
||||
int selfJoinIndex = work->selfJoinIndex;
|
||||
|
||||
for (int i = 0; i < roomCapacity; i++) {
|
||||
if (i + 1 == selfJoinIndex) {
|
||||
continue;
|
||||
}
|
||||
if (0 == peerAddrList[i].sockAddrIn.sin_port) {
|
||||
// Peer addr not initialized
|
||||
continue;
|
||||
}
|
||||
//CCLOG("UDP about to punch peer joinIndex:%d", i);
|
||||
char peerIp[17] = { 0 };
|
||||
uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
|
||||
int peerPortSt = ntohs(peerAddrList[i].sockAddrIn.sin_port);
|
||||
int peerPortEd = ntohs(peerAddrList[i].sockAddrIn.sin_port) + 1; // Use tunnel of backend instead of sweeping ports blindly!
|
||||
for (int peerPort = peerPortSt; peerPort < peerPortEd; peerPort++) {
|
||||
if (0 > peerPort) continue;
|
||||
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
|
||||
uv_buf_t sendBuffer = uv_buf_init("foobar", 6); // hardcoded for now
|
||||
struct sockaddr_in testPeerAddr;
|
||||
uv_ip4_addr(peerIp, peerPort, &testPeerAddr);
|
||||
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&testPeerAddr, _onSend);
|
||||
CCLOG("UDP punched peer %s:%d by 6 bytes", peerIp, peerPort);
|
||||
}
|
||||
}
|
||||
uv_timer_stop(timer);
|
||||
uv_close((uv_handle_t*)timer, _onUvTimerClosed);
|
||||
//CCLOG("_punchPeerOnUvThreadDelayed stopped...");
|
||||
work->refDecAndDelIfZero();
|
||||
}
|
||||
int const punchPeerCnt = 3;
|
||||
void _startPunchPeerTimerOnUvThread(uv_work_t* wrapper) {
|
||||
PunchPeerWork* work = (PunchPeerWork*)wrapper->data;
|
||||
int roomCapacity = work->roomCapacity;
|
||||
int selfJoinIndex = work->selfJoinIndex;
|
||||
|
||||
for (int j = 0; j < punchPeerCnt; j++) {
|
||||
work->refInc();
|
||||
}
|
||||
for (int j = 0; j < punchPeerCnt; j++) {
|
||||
uv_timer_t* punchTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); // I don't think libuv timer is safe to be called from GameThread, thus calling it within UvThread here
|
||||
uv_timer_init(loop, punchTimer);
|
||||
punchTimer->data = work;
|
||||
uv_timer_start(punchTimer, (uv_timer_cb)&_punchPeerOnUvThreadDelayed, j * 500, 0);
|
||||
}
|
||||
}
|
||||
void _afterPunchPeerTimerStarted(uv_work_t* wrapper, int status) {
|
||||
// RAM of PunchPeerWork handled by "naiveRefCnt"
|
||||
}
|
||||
|
||||
class BroadcastInputFrameUpsyncWork {
|
||||
public:
|
||||
BYTEC bytes[maxUdpPayloadBytes]; // Wasting some RAM here thus no need for explicit recursive destruction
|
||||
size_t bytesLen;
|
||||
int roomCapacity;
|
||||
int selfJoinIndex;
|
||||
|
||||
BroadcastInputFrameUpsyncWork(BYTEC* const newBytes, size_t newBytesLen, int newRoomCapacity, int newSelfJoinIndex) {
|
||||
memset(this->bytes, 0, sizeof(this->bytes));
|
||||
memcpy(this->bytes, newBytes, newBytesLen);
|
||||
|
||||
this->bytesLen = newBytesLen;
|
||||
|
||||
this->roomCapacity = newRoomCapacity;
|
||||
this->selfJoinIndex = newSelfJoinIndex;
|
||||
}
|
||||
};
|
||||
int const broadcastUpsyncCnt = 1;
|
||||
void _broadcastInputFrameUpsyncOnUvThread(uv_work_t* wrapper) {
|
||||
BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
|
||||
int roomCapacity = work->roomCapacity;
|
||||
int selfJoinIndex = work->selfJoinIndex;
|
||||
// Send to room udp tunnel in case of hole punching failure
|
||||
for (int j = 0; j < broadcastUpsyncCnt; j++) {
|
||||
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
|
||||
uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
|
||||
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(udpTunnelAddr.sockAddrIn), _onSend);
|
||||
CCLOG("UDP sent upsync to udp tunnel %s:%d by %u bytes round-%d", SRV_IP, UDP_TUNNEL_SRV_PORT, work->bytesLen, j);
|
||||
}
|
||||
void _onUvSthNewToSend(uv_async_t* handle) {
|
||||
|
||||
for (int i = 0; i < roomCapacity; i++) {
|
||||
if (i + 1 == selfJoinIndex) {
|
||||
continue;
|
||||
bool hasNext = true;
|
||||
while (NULL != handle && true == hasNext) {
|
||||
SendWork* work = NULL;
|
||||
uv_mutex_lock(&sendRingBuffLock);
|
||||
work = sendRingBuff->pop();
|
||||
|
||||
if (NULL == work) {
|
||||
hasNext = false;
|
||||
}
|
||||
if (0 == peerAddrList[i].sockAddrIn.sin_port) {
|
||||
// Peer addr not initialized
|
||||
continue;
|
||||
}
|
||||
char peerIp[17] = { 0 };
|
||||
uv_ip4_name((struct sockaddr_in*)&(peerAddrList[i].sockAddrIn), peerIp, sizeof peerIp);
|
||||
// Might want to send several times for better arrival rate
|
||||
for (int j = 0; j < broadcastUpsyncCnt; j++) {
|
||||
/*
|
||||
[WARNING] The following "uv_udp_try_send" might block I / O for a long time, hence unlock "as soon as possible" to avoid blocking the "GameThread" which is awaiting to acquire this mutex!
|
||||
|
||||
There's a very small chance where "sendRingBuff->put(...)" could contaminate the just popped "work" in "sendRingBuff->eles", thus "sendRingBuff->n" is made quite large to avoid that, moreover in terms of protecting "work" we're also unlocking "as late as possible"!
|
||||
*/
|
||||
uv_mutex_unlock(&sendRingBuffLock);
|
||||
if (NULL != work) {
|
||||
|
||||
// [WARNING] If "uv_udp_send" is to be used instead of "uv_udp_try_send", as UvSendThread will always be terminated from GameThread, it's a MUST to use the following heap-alloc form to initialize "uv_udp_send_t* req" such that "_afterSend" is guaranteed to be called, otherwise "int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);" for UvSendThread would block forever due to residual active handles.
|
||||
|
||||
uv_udp_send_t* req = (uv_udp_send_t*)malloc(sizeof(uv_udp_send_t));
|
||||
uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
|
||||
uv_udp_send(req, udpSocket, &sendBuffer, 1, (struct sockaddr const*)&(peerAddrList[i].sockAddrIn), _onSend);
|
||||
CCLOG("UDP broadcasted upsync to peer %s:%d by %u bytes round-%d", peerIp, ntohs(peerAddrList[i].sockAddrIn.sin_port), work->bytesLen, j);
|
||||
uv_udp_send(req, udpSendSocket, &sendBuffer, 1, (struct sockaddr const*)&(work->peerAddr.sockAddrIn), _afterSend);
|
||||
|
||||
//uv_buf_t sendBuffer = uv_buf_init(work->bytes, work->bytesLen);
|
||||
//uv_udp_try_send(udpSendSocket, &sendBuffer, 1, (struct sockaddr const*)&(work->peerAddr.sockAddrIn));
|
||||
#if defined(COCOS2D_DEBUG) && (COCOS2D_DEBUG > 0)
|
||||
char ip[INET_ADDRSTRLEN];
|
||||
memset(ip, 0, sizeof ip);
|
||||
uv_inet_ntop(work->peerAddr.sockAddrIn.sin_family, &(work->peerAddr.sockAddrIn.sin_addr), ip, INET_ADDRSTRLEN);
|
||||
int port = ntohs(work->peerAddr.sockAddrIn.sin_port);
|
||||
CCLOG("UDP sent %d bytes to %s:%d", sendBuffer.len, ip, port);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
void _afterBroadcastInputFrameUpsync(uv_work_t* wrapper, int status) {
|
||||
BroadcastInputFrameUpsyncWork* work = (BroadcastInputFrameUpsyncWork*)wrapper->data;
|
||||
delete work;
|
||||
}
|
||||
|
||||
void _onWalkCleanup(uv_handle_t* handle, void* data) {
|
||||
@@ -297,35 +161,83 @@ void _onWalkCleanup(uv_handle_t* handle, void* data) {
|
||||
void startRecvLoop(void* arg) {
|
||||
uv_loop_t* l = (uv_loop_t*)arg;
|
||||
int uvRunRet1 = uv_run(l, UV_RUN_DEFAULT);
|
||||
CCLOG("UDP recv loop is ended in UvThread, uvRunRet1=%d", uvRunRet1);
|
||||
CCLOG("UDP recv loop is ended in UvRecvThread, uvRunRet1=%d", uvRunRet1);
|
||||
uv_walk(l, _onWalkCleanup, NULL);
|
||||
CCLOG("UDP recv loop is walked in UvRecvThread");
|
||||
int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);
|
||||
CCLOG("UDP recv loop is run after walking in UvRecvThread, uvRunRet2=%d", uvRunRet2);
|
||||
|
||||
int uvCloseRet = uv_loop_close(l);
|
||||
CCLOG("UDP recv loop is closed in UvThread, uvRunRet2=%d, uvCloseRet=%d", uvRunRet2, uvCloseRet);
|
||||
CCLOG("UDP recv loop is closed in UvRecvThread, uvCloseRet=%d", uvCloseRet);
|
||||
}
|
||||
|
||||
void startSendLoop(void* arg) {
|
||||
uv_loop_t* l = (uv_loop_t*)arg;
|
||||
int uvRunRet1 = uv_run(l, UV_RUN_DEFAULT);
|
||||
CCLOG("UDP send loop is ended in UvSendThread, uvRunRet1=%d", uvRunRet1);
|
||||
uv_walk(l, _onWalkCleanup, NULL);
|
||||
CCLOG("UDP send loop is walked in UvSendThread");
|
||||
int uvRunRet2 = uv_run(l, UV_RUN_DEFAULT);
|
||||
CCLOG("UDP send loop is run after walking in UvSendThread, uvRunRet2=%d", uvRunRet2);
|
||||
|
||||
int uvCloseRet = uv_loop_close(l);
|
||||
CCLOG("UDP send loop is closed in UvSendThread, uvCloseRet=%d", uvCloseRet);
|
||||
uv_mutex_destroy(&sendRingBuffLock);
|
||||
}
|
||||
|
||||
int initSendLoop(struct sockaddr const* pUdpAddr) {
|
||||
sendLoop = uv_loop_new();
|
||||
udpSendSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
|
||||
int sendSockInitRes = uv_udp_init(sendLoop, udpSendSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
|
||||
int sendBindRes = uv_udp_bind(udpSendSocket, pUdpAddr, UV_UDP_REUSEADDR);
|
||||
if (0 != sendBindRes) {
|
||||
CCLOGERROR("Failed to bind send; sendSockInitRes=%d, sendBindRes=%d, reason=%s", sendSockInitRes, sendBindRes, uv_strerror(sendBindRes));
|
||||
exit(-1);
|
||||
}
|
||||
uv_mutex_init(&sendRingBuffLock);
|
||||
sendRingBuff = new SendRingBuff(maxBuffedMsgs);
|
||||
uv_async_init(sendLoop, &uvSendLoopStopSig, _onUvStopSig);
|
||||
uv_async_init(sendLoop, &uvSendLoopTriggerSig, _onUvSthNewToSend);
|
||||
|
||||
return sendBindRes;
|
||||
}
|
||||
|
||||
bool initRecvLoop(struct sockaddr const* pUdpAddr) {
|
||||
recvLoop = uv_loop_new();
|
||||
udpRecvSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
|
||||
|
||||
int recvSockInitRes = uv_udp_init(recvLoop, udpRecvSocket);
|
||||
int recvbindRes = uv_udp_bind(udpRecvSocket, pUdpAddr, UV_UDP_REUSEADDR);
|
||||
if (0 != recvbindRes) {
|
||||
CCLOGERROR("Failed to bind recv; recvSockInitRes=%d, recvbindRes=%d, reason=%s", recvSockInitRes, recvbindRes, uv_strerror(recvbindRes));
|
||||
exit(-1);
|
||||
}
|
||||
uv_udp_recv_start(udpRecvSocket, _allocBuffer, _onRead);
|
||||
uv_async_init(recvLoop, &uvRecvLoopStopSig, _onUvStopSig);
|
||||
|
||||
return recvbindRes;
|
||||
}
|
||||
|
||||
bool DelayNoMore::UdpSession::openUdpSession(int port) {
|
||||
loop = uv_loop_new();
|
||||
udpSocket = (uv_udp_t*)malloc(sizeof(uv_udp_t));
|
||||
struct sockaddr_in udpAddr;
|
||||
uv_ip4_addr("0.0.0.0", port, &udpAddr);
|
||||
struct sockaddr const* pUdpAddr = (struct sockaddr const*)&udpAddr;
|
||||
/*
|
||||
[WARNING] On Android, the libuv documentation of "UV_UDP_REUSEADDR" is true, i.e. only the socket that binds later on the same port will be triggered the recv callback; however on Windows, experiment shows that the exact reverse is true instead.
|
||||
|
||||
int sockInitRes = uv_udp_init(loop, udpSocket); // "uv_udp_init" must precede that of "uv_udp_bind" for successful binding!
|
||||
It's feasible to use a same socket instance for both receiving and sending in different threads, however not knowing the exact thread-safety concerns for "uv_udp_send/uv_udp_try_send" & "uv recv callback" stops me from doing so, I'd prefer to stick to using different socket instances in different threads.
|
||||
*/
|
||||
#if (CC_TARGET_PLATFORM == CC_PLATFORM_ANDROID)
|
||||
initSendLoop(pUdpAddr);
|
||||
initRecvLoop(pUdpAddr);
|
||||
#else
|
||||
initRecvLoop(pUdpAddr);
|
||||
initSendLoop(pUdpAddr);
|
||||
#endif
|
||||
CCLOG("About to open UDP session at port=%d; recvLoop=%p, sendLoop=%p...", port, recvLoop, sendLoop);
|
||||
|
||||
struct sockaddr_in recv_addr;
|
||||
uv_ip4_addr("0.0.0.0", port, &recv_addr);
|
||||
int bindRes = uv_udp_bind(udpSocket, (struct sockaddr const*)&recv_addr, UV_UDP_REUSEADDR);
|
||||
if (0 != bindRes) {
|
||||
CCLOGERROR("Failed to bind port=%d; bind result=%d, reason=%s", port, bindRes, uv_strerror(bindRes));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
uv_async_init(loop, &uvLoopStopSig, _onUvStopSig);
|
||||
|
||||
CCLOG("About to open UDP session at port=%d; bind result=%d, sock init result=%d...", port, bindRes, sockInitRes);
|
||||
|
||||
uv_udp_recv_start(udpSocket, _allocBuffer, _onRead);
|
||||
|
||||
uv_thread_create(&recvTid, startRecvLoop, loop);
|
||||
uv_thread_create(&recvTid, startRecvLoop, recvLoop);
|
||||
uv_thread_create(&sendTid, startSendLoop, sendLoop);
|
||||
|
||||
CCLOG("Finished opening UDP session at port=%d", port);
|
||||
|
||||
@@ -335,64 +247,97 @@ bool DelayNoMore::UdpSession::openUdpSession(int port) {
|
||||
bool DelayNoMore::UdpSession::closeUdpSession() {
|
||||
CCLOG("About to close udp session and dealloc all resources...");
|
||||
|
||||
uv_async_send(&uvSendLoopStopSig);
|
||||
CCLOG("Signaling UvSendThread to end in GameThread...");
|
||||
uv_thread_join(&sendTid);
|
||||
free(udpSendSocket);
|
||||
free(sendLoop);
|
||||
delete sendRingBuff;
|
||||
|
||||
uv_async_send(&uvRecvLoopStopSig); // The few if not only guaranteed thread safe utility of libuv :) See http://docs.libuv.org/en/v1.x/async.html#c.uv_async_send
|
||||
CCLOG("Signaling UvRecvThread to end in GameThread...");
|
||||
uv_thread_join(&recvTid);
|
||||
free(udpRecvSocket);
|
||||
free(recvLoop);
|
||||
|
||||
for (int i = 0; i < maxPeerCnt; i++) {
|
||||
peerAddrList[i].authKey = -1; // hardcoded for now
|
||||
memset((char*)&peerAddrList[i].sockAddrIn, 0, sizeof(peerAddrList[i].sockAddrIn));
|
||||
}
|
||||
uv_async_send(&uvLoopStopSig); // The few if not only guaranteed thread safe utility of libuv :) See http://docs.libuv.org/en/v1.x/async.html#c.uv_async_send
|
||||
CCLOG("Signaling UvThread to end in GameThread...");
|
||||
|
||||
uv_thread_join(&recvTid);
|
||||
|
||||
free(udpSocket);
|
||||
free(loop);
|
||||
|
||||
CCLOG("Closed udp session and dealloc all resources in GameThread...");
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DelayNoMore::UdpSession::punchToServer(CHARC* const srvIp, int const srvPort, BYTEC* const bytes, size_t bytesLen, int const udpTunnelSrvPort, BYTEC* const udpTunnelBytes, size_t udpTunnelBytesBytesLen) {
|
||||
/*
|
||||
[WARNING] The RAM space used for "bytes", either on stack or in heap, is preallocatedand managed by the caller which runs on the GameThread. Actual sending will be made on UvThread.
|
||||
|
||||
Therefore we make a copy of this message before dispatching it "GameThread -> UvThread".
|
||||
*/
|
||||
memset(SRV_IP, 0, sizeof SRV_IP);
|
||||
memcpy(SRV_IP, srvIp, strlen(srvIp));
|
||||
SRV_PORT = srvPort;
|
||||
UDP_TUNNEL_SRV_PORT = udpTunnelSrvPort;
|
||||
PunchServerWork* work = new PunchServerWork(bytes, bytesLen, udpTunnelBytes, udpTunnelBytesBytesLen);
|
||||
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
|
||||
wrapper->data = work;
|
||||
uv_queue_work(loop, wrapper, _punchServerOnUvThread, _afterPunchServer);
|
||||
|
||||
struct sockaddr_in udpPunchingServerDestAddr;
|
||||
uv_ip4_addr(SRV_IP, SRV_PORT, &udpPunchingServerDestAddr);
|
||||
udpPunchingServerAddr.sockAddrIn = udpPunchingServerDestAddr;
|
||||
|
||||
struct sockaddr_in udpTunnelDestAddr;
|
||||
uv_ip4_addr(SRV_IP, UDP_TUNNEL_SRV_PORT, &udpTunnelDestAddr);
|
||||
udpTunnelAddr.sockAddrIn = udpTunnelDestAddr;
|
||||
|
||||
/*
|
||||
Libuv is really inconvenient here, neither "uv_queue_work" nor "uv_async_init" is threadsafe(http ://docs.libuv.org/en/v1.x/threadpool.html#c.uv_queue_work)! What's the point of such a queue? It's even more difficult than writing my own implementation -- again a threadsafe RingBuff could be used to the rescue, yet I'd like to investigate more into how to make the following threadsafe APIs with minimum cross-platform C++ codes
|
||||
- _sendMessage(...), should be both non-blocking & threadsafe, called from GameThread
|
||||
- _onRead(...), should be called first in UvRecvThread in an edge-triggered manner like idiomatic "epoll" or "kqueue", then dispatch the received message to GameThread by a threadsafe RingBuff
|
||||
*/
|
||||
|
||||
uv_mutex_lock(&sendRingBuffLock);
|
||||
sendRingBuff->put(bytes, bytesLen, &udpPunchingServerAddr);
|
||||
sendRingBuff->put(udpTunnelBytes, udpTunnelBytesBytesLen, &udpTunnelAddr);
|
||||
uv_mutex_unlock(&sendRingBuffLock);
|
||||
uv_async_send(&uvSendLoopTriggerSig);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DelayNoMore::UdpSession::upsertPeerUdpAddr(struct PeerAddr* newPeerAddrList, int roomCapacity, int selfJoinIndex) {
|
||||
// Call timer for multiple sendings from JavaScript?
|
||||
CCLOG("upsertPeerUdpAddr called by js for roomCapacity=%d, selfJoinIndex=%d.", roomCapacity, selfJoinIndex);
|
||||
|
||||
// Punching between existing peer-pairs for Address/Port-restricted Cone NAT (not need for Full Cone NAT); UvThread never writes into "peerAddrList", so I assume that it's safe to skip locking for them
|
||||
uv_mutex_lock(&sendRingBuffLock);
|
||||
for (int i = 0; i < roomCapacity; i++) {
|
||||
if (i == selfJoinIndex - 1) continue;
|
||||
peerAddrList[i].sockAddrIn = (*(newPeerAddrList + i)).sockAddrIn;
|
||||
peerAddrList[i].authKey = (*(newPeerAddrList + i)).authKey;
|
||||
struct PeerAddr* cand = (newPeerAddrList + i);
|
||||
if (NULL == cand || 0 == cand->sockAddrIn.sin_port) continue; // Not initialized
|
||||
peerAddrList[i].sockAddrIn = cand->sockAddrIn;
|
||||
peerAddrList[i].authKey = cand->authKey;
|
||||
sendRingBuff->put("foobar", 6, &(peerAddrList[i])); // Content hardcoded for now
|
||||
}
|
||||
|
||||
PunchPeerWork* work = new PunchPeerWork(roomCapacity, selfJoinIndex);
|
||||
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
|
||||
wrapper->data = work;
|
||||
uv_queue_work(loop, wrapper, _startPunchPeerTimerOnUvThread, _afterPunchPeerTimerStarted);
|
||||
uv_mutex_unlock(&sendRingBuffLock);
|
||||
uv_async_send(&uvSendLoopTriggerSig);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool DelayNoMore::UdpSession::broadcastInputFrameUpsync(BYTEC* const bytes, size_t bytesLen, int roomCapacity, int selfJoinIndex) {
|
||||
BroadcastInputFrameUpsyncWork* work = new BroadcastInputFrameUpsyncWork(bytes, bytesLen, roomCapacity, selfJoinIndex);
|
||||
uv_work_t* wrapper = (uv_work_t*)malloc(sizeof(uv_work_t));
|
||||
wrapper->data = work;
|
||||
uv_queue_work(loop, wrapper, _broadcastInputFrameUpsyncOnUvThread, _afterBroadcastInputFrameUpsync);
|
||||
uv_mutex_lock(&sendRingBuffLock);
|
||||
// Might want to send several times for better arrival rate
|
||||
for (int j = 0; j < broadcastUpsyncCnt; j++) {
|
||||
// Send to room udp tunnel in case of hole punching failure
|
||||
sendRingBuff->put(bytes, bytesLen, &udpTunnelAddr);
|
||||
for (int i = 0; i < roomCapacity; i++) {
|
||||
if (i + 1 == selfJoinIndex) {
|
||||
continue;
|
||||
}
|
||||
if (0 == peerAddrList[i].sockAddrIn.sin_port) {
|
||||
// Peer addr not initialized
|
||||
continue;
|
||||
}
|
||||
|
||||
sendRingBuff->put(bytes, bytesLen, &(peerAddrList[i])); // Content hardcoded for now
|
||||
}
|
||||
}
|
||||
|
||||
uv_mutex_unlock(&sendRingBuffLock);
|
||||
uv_async_send(&uvSendLoopTriggerSig);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@@ -1,18 +1,9 @@
|
||||
#include "uv/uv.h"
|
||||
#define __SSIZE_T // Otherwise "ssize_t" would have conflicting macros error that stops compiling
|
||||
|
||||
#ifndef udp_session_hpp
|
||||
#define udp_session_hpp
|
||||
|
||||
typedef char BYTEC;
|
||||
typedef char const CHARC;
|
||||
int const maxUdpPayloadBytes = 128;
|
||||
#include "send_ring_buff.hpp"
|
||||
|
||||
int const maxPeerCnt = 10;
|
||||
struct PeerAddr {
|
||||
struct sockaddr_in sockAddrIn;
|
||||
uint32_t authKey;
|
||||
};
|
||||
|
||||
namespace DelayNoMore {
|
||||
class UdpSession {
|
||||
|
@@ -126,8 +126,7 @@ bool upsertPeerUdpAddr(se::State& s) {
|
||||
}
|
||||
SE_BIND_FUNC(upsertPeerUdpAddr)
|
||||
|
||||
static bool udpSessionFinalize(se::State& s)
|
||||
{
|
||||
static bool udpSessionFinalize(se::State& s) {
|
||||
CCLOGINFO("jsbindings: finalizing JS object %p (DelayNoMore::UdpSession)", s.nativeThisObject());
|
||||
auto iter = se::NonRefNativePtrCreatedByCtorMap::find(s.nativeThisObject());
|
||||
if (iter != se::NonRefNativePtrCreatedByCtorMap::end()) {
|
||||
@@ -141,8 +140,7 @@ SE_BIND_FINALIZE_FUNC(udpSessionFinalize)
|
||||
|
||||
se::Object* __jsb_udp_session_proto = nullptr;
|
||||
se::Class* __jsb_udp_session_class = nullptr;
|
||||
bool registerUdpSession(se::Object* obj)
|
||||
{
|
||||
bool registerUdpSession(se::Object* obj) {
|
||||
// Get the ns
|
||||
se::Value nsVal;
|
||||
if (!obj->getProperty("DelayNoMore", &nsVal))
|
||||
|
@@ -15,6 +15,7 @@ LOCAL_SRC_FILES := hellojavascript/main.cpp \
|
||||
../../../Classes/jsb_module_register.cpp \
|
||||
../../../Classes/udp_session.cpp \
|
||||
../../../Classes/udp_session_bridge.cpp \
|
||||
../../../Classes/send_ring_buff.cpp
|
||||
|
||||
LOCAL_C_INCLUDES := $(LOCAL_PATH)/../../../Classes
|
||||
|
||||
|
@@ -190,9 +190,11 @@ copy "$(ProjectDir)..\..\..\project.json" "$(OutDir)\" /Y</Command>
|
||||
<ClCompile Include="..\Classes\AppDelegate.cpp" />
|
||||
<ClCompile Include="..\Classes\udp_session.cpp" />
|
||||
<ClCompile Include="..\Classes\udp_session_bridge.cpp" />
|
||||
<ClCompile Include="..\Classes\send_ring_buff.cpp" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<ClInclude Include="main.h" />
|
||||
<ClInclude Include="..\Classes\send_ring_buff.hpp" />
|
||||
<ClInclude Include="..\Classes\udp_session.hpp" />
|
||||
<ClInclude Include="..\Classes\udp_session_bridge.hpp" />
|
||||
<ClInclude Include="..\Classes\AppDelegate.h" />
|
||||
@@ -207,4 +209,4 @@ copy "$(ProjectDir)..\..\..\project.json" "$(OutDir)\" /Y</Command>
|
||||
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
|
||||
<ImportGroup Label="ExtensionTargets">
|
||||
</ImportGroup>
|
||||
</Project>
|
||||
</Project>
|
||||
|
@@ -22,6 +22,9 @@
|
||||
<ClCompile Include="..\Classes\jsb_module_register.cpp">
|
||||
<Filter>Classes</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="..\Classes\send_ring_buff.cpp">
|
||||
<Filter>Classes</Filter>
|
||||
</ClCompile>
|
||||
<ClCompile Include="..\Classes\udp_session.cpp">
|
||||
<Filter>Classes</Filter>
|
||||
</ClCompile>
|
||||
@@ -37,6 +40,9 @@
|
||||
<Filter>win32</Filter>
|
||||
</ClInclude>
|
||||
<ClInclude Include="resource.h" />
|
||||
<ClInclude Include="..\Classes\send_ring_buff.hpp">
|
||||
<Filter>Classes</Filter>
|
||||
</ClInclude>
|
||||
<ClInclude Include="..\Classes\udp_session.hpp">
|
||||
<Filter>Classes</Filter>
|
||||
</ClInclude>
|
||||
@@ -54,4 +60,4 @@
|
||||
<Filter>resource</Filter>
|
||||
</Image>
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
</Project>
|
||||
|
@@ -71,7 +71,7 @@
|
||||
"shelter_z_reducer",
|
||||
"shelter"
|
||||
],
|
||||
"last-module-event-record-time": 1674632533161,
|
||||
"last-module-event-record-time": 1675240036576,
|
||||
"simulator-orientation": false,
|
||||
"simulator-resolution": {
|
||||
"height": 640,
|
||||
|
@@ -24,7 +24,6 @@ const (
|
||||
|
||||
INPUT_DELAY_FRAMES = int32(6) // in the count of render frames
|
||||
INPUT_SCALE_FRAMES = uint32(2) // inputDelayedAndScaledFrameId = ((originalFrameId - InputDelayFrames) >> InputScaleFrames)
|
||||
NST_DELAY_FRAMES = int32(16) // network-single-trip delay in the count of render frames, proposed to be (InputDelayFrames >> 1) because we expect a round-trip delay to be exactly "InputDelayFrames"
|
||||
|
||||
SP_ATK_LOOKUP_FRAMES = int32(5)
|
||||
|
||||
|
Reference in New Issue
Block a user