Integrated basic holepunching for multiplayer codebase.

This commit is contained in:
genxium
2023-01-25 18:26:13 +08:00
parent 5df545e168
commit 8536521136
16 changed files with 1522 additions and 981 deletions

View File

@@ -37,6 +37,8 @@ type mysqlConf struct {
type sioConf struct {
HostAndPort string `json:"hostAndPort"`
UdpHost string `json:"udpHost"`
UdpPort int `json:"udpPort"`
}
type botServerConf struct {

View File

@@ -1,3 +1,5 @@
{
"hostAndPort": "0.0.0.0:9992"
"hostAndPort": "0.0.0.0:9992",
"udpHost": "0.0.0.0",
"udpPort": 3000
}

View File

@@ -23,6 +23,8 @@ import (
"github.com/gin-gonic/gin"
"github.com/robfig/cron"
"go.uber.org/zap"
"net"
)
func main() {
@@ -34,7 +36,7 @@ func main() {
env_tools.MergeTestPlayerAccounts()
}
models.InitRoomHeapManager()
startScheduler()
// startScheduler()
router := gin.Default()
setRouter(router)
@@ -54,6 +56,7 @@ func main() {
}
Logger.Info("Listening and serving HTTP on", zap.Any("Conf.Sio.HostAndPort", Conf.Sio.HostAndPort))
}()
go startUdpServer()
var gracefulStop = make(chan os.Signal)
signal.Notify(gracefulStop, syscall.SIGTERM)
signal.Notify(gracefulStop, syscall.SIGINT)
@@ -114,3 +117,26 @@ func startScheduler() {
//c.AddFunc("*/1 * * * * *", FuncName)
c.Start()
}
func startUdpServer() {
conn, err := net.ListenUDP("udp", &net.UDPAddr{
Port: Conf.Sio.UdpPort,
IP: net.ParseIP(Conf.Sio.UdpHost),
})
if err != nil {
panic(err)
}
defer conn.Close()
Logger.Info(fmt.Sprintf("Udp server started at %s", conn.LocalAddr().String()))
for {
message := make([]byte, 2046)
rlen, remote, err := conn.ReadFromUDP(message[:])
if err != nil {
panic(err)
}
Logger.Info(fmt.Sprintf("received: %d bytes from %s\n", rlen, remote))
ws.HandleUdpHolePunchingForPlayer(message[0:rlen], remote)
}
}

View File

@@ -50,6 +50,8 @@ type Player struct {
LastSentInputFrameId int32
AckingFrameId int32
AckingInputFrameId int32
UdpAddr *PeerUdpAddr
}
func ExistPlayerByName(name string) (bool, error) {

View File

@@ -13,6 +13,7 @@ import (
"io/ioutil"
"jsexport/battle"
"math/rand"
"net"
"os"
"path/filepath"
"resolv"
@@ -32,6 +33,7 @@ const (
DOWNSYNC_MSG_ACT_BATTLE_STOPPED = int32(3)
DOWNSYNC_MSG_ACT_FORCED_RESYNC = int32(4)
DOWNSYNC_MSG_ACT_PEER_INPUT_BATCH = int32(5)
DOWNSYNC_MSG_ACT_PEER_UDP_ADDR = int32(6)
DOWNSYNC_MSG_ACT_BATTLE_READY_TO_START = int32(-1)
DOWNSYNC_MSG_ACT_BATTLE_START = int32(0)
@@ -176,6 +178,7 @@ func (pR *Room) AddPlayerIfPossible(pPlayerFromDbInit *Player, session *websocke
defer pR.onPlayerAdded(playerId)
pPlayerFromDbInit.UdpAddr = nil
pPlayerFromDbInit.AckingFrameId = -1
pPlayerFromDbInit.AckingInputFrameId = -1
pPlayerFromDbInit.LastSentInputFrameId = MAGIC_LAST_SENT_INPUT_FRAME_ID_NORMAL_ADDED
@@ -215,6 +218,7 @@ func (pR *Room) ReAddPlayerIfPossible(pTmpPlayerInstance *Player, session *webso
*/
defer pR.onPlayerReAdded(playerId)
pEffectiveInRoomPlayerInstance := pR.Players[playerId]
pEffectiveInRoomPlayerInstance.UdpAddr = nil
pEffectiveInRoomPlayerInstance.AckingFrameId = -1
pEffectiveInRoomPlayerInstance.AckingInputFrameId = -1
pEffectiveInRoomPlayerInstance.LastSentInputFrameId = MAGIC_LAST_SENT_INPUT_FRAME_ID_READDED
@@ -1068,7 +1072,9 @@ func (pR *Room) sendSafely(roomDownsyncFrame *pb.RoomDownsyncFrame, toSendInputF
panic(fmt.Sprintf("Error marshaling downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount))
}
if MAGIC_JOIN_INDEX_DEFAULT == peerJoinIndex {
shouldUseSecondaryWsSession := (MAGIC_JOIN_INDEX_DEFAULT != peerJoinIndex && DOWNSYNC_MSG_ACT_INPUT_BATCH == act) // FIXME: Simplify the condition
//Logger.Info(fmt.Sprintf("shouldUseSecondaryWsSession=%v: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v", shouldUseSecondaryWsSession, pR.Id, playerId, pR.State, pR.EffectivePlayerCount))
if !shouldUseSecondaryWsSession {
if playerDownsyncSession, existent := pR.PlayerDownsyncSessionDict[playerId]; existent {
if err := playerDownsyncSession.WriteMessage(websocket.BinaryMessage, theBytes); nil != err {
panic(fmt.Sprintf("Error sending primary downsync message: roomId=%v, playerId=%v, roomState=%v, roomEffectivePlayerCount=%v, err=%v", pR.Id, playerId, pR.State, pR.EffectivePlayerCount, err))
@@ -1607,3 +1613,53 @@ func (pR *Room) SetSecondarySession(playerId int32, session *websocket.Conn, sig
}
}
}
func (pR *Room) UpdatePeerUdpAddrList(playerId int32, peerAddr *net.UDPAddr, pReq *pb.HolePunchUpsync) {
// TODO: There's a chance that by now "player.JoinIndex" is not yet determined, use a lock to sync
if player, ok := pR.Players[playerId]; ok && MAGIC_JOIN_INDEX_DEFAULT != player.JoinIndex {
playerBattleState := atomic.LoadInt32(&(player.BattleState))
switch playerBattleState {
case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL:
// Kindly note that "PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK, PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK" are allowed
return
}
if _, existent := pR.PlayerDownsyncSessionDict[playerId]; existent {
player.UdpAddr = &pb.PeerUdpAddr{
Ip: peerAddr.IP.String(),
Port: int32(peerAddr.Port),
AuthKey: pReq.AuthKey,
}
Logger.Info(fmt.Sprintf("UpdatePeerUdpAddrList done for roomId=%v, playerId=%d, peerAddr=%s", pR.Id, playerId, peerAddr))
peerJoinIndex := player.JoinIndex
peerUdpAddrList := make([]*pb.PeerUdpAddr, pR.Capacity, pR.Capacity)
for _, otherPlayer := range pR.Players {
if MAGIC_JOIN_INDEX_DEFAULT == otherPlayer.JoinIndex {
// TODO: Again this shouldn't happen, apply proper locking
continue
}
// In case of highly concurrent update that might occur while later marshalling, use the ptr of a copy
peerUdpAddrList[otherPlayer.JoinIndex-1] = &pb.PeerUdpAddr{
Ip: otherPlayer.UdpAddr.Ip,
Port: otherPlayer.UdpAddr.Port,
AuthKey: otherPlayer.UdpAddr.AuthKey,
}
}
// Broadcast this new UDP addr to all the existing players
for otherPlayerId, otherPlayer := range pR.Players {
otherPlayerBattleState := atomic.LoadInt32(&(otherPlayer.BattleState))
switch otherPlayerBattleState {
case PlayerBattleStateIns.DISCONNECTED, PlayerBattleStateIns.LOST, PlayerBattleStateIns.EXPELLED_DURING_GAME, PlayerBattleStateIns.EXPELLED_IN_DISMISSAL:
continue
}
Logger.Info(fmt.Sprintf("Downsyncing peerUdpAddrList for roomId=%v, playerId=%d", pR.Id, otherPlayerId))
pR.sendSafely(&pb.RoomDownsyncFrame{
PeerUdpAddrList: peerUdpAddrList,
}, nil, DOWNSYNC_MSG_ACT_PEER_UDP_ADDR, otherPlayerId, false, peerJoinIndex)
}
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -10,6 +10,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"go.uber.org/zap"
"net"
"net/http"
"strconv"
"sync/atomic"
@@ -256,17 +257,19 @@ func Serve(c *gin.Context) {
SpaceOffsetX: pRoom.SpaceOffsetX,
SpaceOffsetY: pRoom.SpaceOffsetY,
RenderCacheSize: pRoom.RenderCacheSize,
CollisionMinStep: pRoom.CollisionMinStep,
RenderCacheSize: pRoom.RenderCacheSize,
CollisionMinStep: pRoom.CollisionMinStep,
BoundRoomCapacity: int32(pRoom.Capacity),
FrameDataLoggingEnabled: pRoom.FrameDataLoggingEnabled,
}
resp := &pb.WsResp{
Ret: int32(Constants.RetCode.Ok),
EchoedMsgId: int32(0),
Act: models.DOWNSYNC_MSG_ACT_HB_REQ,
BciFrame: bciFrame,
Ret: int32(Constants.RetCode.Ok),
EchoedMsgId: int32(0),
Act: models.DOWNSYNC_MSG_ACT_HB_REQ,
BciFrame: bciFrame,
PeerJoinIndex: pThePlayer.JoinIndex,
}
Logger.Debug("Sending downsync HeartbeatRequirements:", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("resp", resp))
@@ -432,12 +435,12 @@ func HandleSecondaryWsSessionForPlayer(c *gin.Context) {
playerId, err := models.GetPlayerIdByToken(token)
if err != nil || playerId == 0 {
// TODO: Abort with specific message.
Logger.Warn("Secondary ws session playerLogin record not found for ws authentication:", zap.Any("intAuthToken", token))
Logger.Warn("Secondary ws session playerLogin record not found:", zap.Any("intAuthToken", token))
c.AbortWithStatus(http.StatusBadRequest)
return
}
Logger.Info("Secondary ws session playerLogin record has been found for ws authentication:", zap.Any("playerId", playerId), zap.Any("intAuthToken", token), zap.Any("boundRoomId", boundRoomId))
Logger.Info("Secondary ws session playerLogin record has been found:", zap.Any("playerId", playerId), zap.Any("intAuthToken", token), zap.Any("boundRoomId", boundRoomId))
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
@@ -482,3 +485,32 @@ func HandleSecondaryWsSessionForPlayer(c *gin.Context) {
pRoom.SetSecondarySession(int32(playerId), conn, signalToCloseConnOfThisPlayer)
}
func HandleUdpHolePunchingForPlayer(message []byte, peerAddr *net.UDPAddr) {
pReq := new(pb.HolePunchUpsync)
if unmarshalErr := proto.Unmarshal(message, pReq); nil != unmarshalErr {
Logger.Error("Udp session failed to unmarshal", zap.Error(unmarshalErr))
return
}
token := pReq.IntAuthToken
boundRoomId := pReq.BoundRoomId
pRoom, existent := (*models.RoomMapManagerIns)[int32(boundRoomId)]
// Deliberately querying playerId after querying room, because the former is against persistent storage and could be slow!
if !existent {
Logger.Warn("Udp session failed to get:\n", zap.Any("intAuthToken", token), zap.Any("forBoundRoomId", boundRoomId))
return
}
// TODO: Wrap the following 2 stmts by sql transaction!
playerId, err := models.GetPlayerIdByToken(token)
if err != nil || playerId == 0 {
// TODO: Abort with specific message.
Logger.Warn("Udp session playerLogin record not found for:", zap.Any("intAuthToken", token))
return
}
Logger.Info("Udp session playerLogin record has been found:", zap.Any("playerId", playerId), zap.Any("intAuthToken", token), zap.Any("boundRoomId", boundRoomId), zap.Any("peerAddr", peerAddr))
pRoom.UpdatePeerUdpAddrList(int32(playerId), peerAddr, pReq)
}