mirror of
https://github.com/genxium/DelayNoMore
synced 2025-01-14 23:11:25 +00:00
405 lines
16 KiB
Go
405 lines
16 KiB
Go
package ws
|
|
|
|
import (
|
|
"container/heap"
|
|
"fmt"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/gorilla/websocket"
|
|
"go.uber.org/zap"
|
|
"net/http"
|
|
. "server/common"
|
|
"server/models"
|
|
pb "server/pb_output"
|
|
"strconv"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
. "dnmshared"
|
|
)
|
|
|
|
// Sizes, in bytes, of the read and write buffers configured on the websocket
// upgrader declared in this file.
const (
	READ_BUF_SIZE  = 8 << 10 // 8 KiB
	WRITE_BUF_SIZE = 8 << 10 // 8 KiB
)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
ReadBufferSize: READ_BUF_SIZE,
|
|
WriteBufferSize: WRITE_BUF_SIZE,
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
Logger.Debug("origin", zap.Any("origin", r.Header.Get("Origin")))
|
|
return true
|
|
},
|
|
}
|
|
|
|
func startOrFeedHeartbeatWatchdog(conn *websocket.Conn) bool {
|
|
if nil == conn {
|
|
return false
|
|
}
|
|
conn.SetReadDeadline(time.Now().Add(time.Millisecond * (ConstVals.Ws.WillKickIfInactiveFor)))
|
|
return true
|
|
}
|
|
|
|
func Serve(c *gin.Context) {
|
|
token, ok := c.GetQuery("intAuthToken")
|
|
if !ok {
|
|
c.AbortWithStatus(http.StatusBadRequest)
|
|
return
|
|
}
|
|
Logger.Info("Finding PlayerLogin record for ws authentication:", zap.Any("intAuthToken", token))
|
|
boundRoomId := 0
|
|
expectRoomId := 0
|
|
var err error
|
|
if boundRoomIdStr, hasBoundRoomId := c.GetQuery("boundRoomId"); hasBoundRoomId {
|
|
boundRoomId, err = strconv.Atoi(boundRoomIdStr)
|
|
if err != nil {
|
|
// TODO: Abort with specific message.
|
|
c.AbortWithStatus(http.StatusBadRequest)
|
|
return
|
|
}
|
|
Logger.Info("Finding PlayerLogin record for ws authentication:", zap.Any("intAuthToken", token), zap.Any("boundRoomId", boundRoomId))
|
|
}
|
|
if expectRoomIdStr, hasExpectRoomId := c.GetQuery("expectedRoomId"); hasExpectRoomId {
|
|
expectRoomId, err = strconv.Atoi(expectRoomIdStr)
|
|
if err != nil {
|
|
c.AbortWithStatus(http.StatusBadRequest)
|
|
return
|
|
}
|
|
Logger.Info("Finding PlayerLogin record for ws authentication:", zap.Any("intAuthToken", token), zap.Any("expectedRoomId", expectRoomId))
|
|
}
|
|
|
|
// TODO: Wrap the following 2 stmts by sql transaction!
|
|
playerId, err := models.GetPlayerIdByToken(token)
|
|
if err != nil || playerId == 0 {
|
|
// TODO: Abort with specific message.
|
|
Logger.Info("PlayerLogin record not found for ws authentication:", zap.Any("intAuthToken", token))
|
|
c.AbortWithStatus(http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
Logger.Info("PlayerLogin record has been found for ws authentication:", zap.Any("playerId", playerId))
|
|
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
Logger.Error("upgrade:", zap.Error(err), zap.Any("playerId", playerId))
|
|
c.AbortWithStatus(http.StatusBadRequest)
|
|
return
|
|
}
|
|
Logger.Debug("ConstVals.Ws.WillKickIfInactiveFor", zap.Duration("v", ConstVals.Ws.WillKickIfInactiveFor))
|
|
/**
|
|
* WARNING: After successfully upgraded to use the "persistent connection" of http1.1/websocket protocol, you CANNOT overwrite the http1.0 resp status by `c.AbortWithStatus(...)` any more!
|
|
*/
|
|
|
|
connHasBeenSignaledToClose := int32(0)
|
|
pConnHasBeenSignaledToClose := &connHasBeenSignaledToClose
|
|
|
|
var pRoom *models.Room = nil
|
|
signalToCloseConnOfThisPlayer := func(customRetCode int, customRetMsg string) {
|
|
if swapped := atomic.CompareAndSwapInt32(pConnHasBeenSignaledToClose, 0, 1); !swapped {
|
|
return
|
|
}
|
|
Logger.Warn("signalToCloseConnOfThisPlayer:", zap.Any("playerId", playerId), zap.Any("customRetCode", customRetCode), zap.Any("customRetMsg", customRetMsg))
|
|
if nil != pRoom {
|
|
pRoom.OnPlayerDisconnected(int32(playerId))
|
|
}
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
Logger.Warn("Recovered from: ", zap.Any("panic", r))
|
|
}
|
|
}()
|
|
/**
|
|
* References
|
|
* - https://tools.ietf.org/html/rfc6455
|
|
* - https://godoc.org/github.com/gorilla/websocket#hdr-Control_Messages
|
|
* - https://godoc.org/github.com/gorilla/websocket#FormatCloseMessage
|
|
* - https://godoc.org/github.com/gorilla/websocket#Conn.WriteControl
|
|
* - https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
|
|
* - "The Close and WriteControl methods can be called concurrently with all other methods."
|
|
*/
|
|
|
|
/**
|
|
* References for the "WebsocketStdCloseCode"s. Note that we're using some "CustomCloseCode"s here as well.
|
|
*
|
|
* - https://tools.ietf.org/html/rfc6455#section-7.4
|
|
* - https://godoc.org/github.com/gorilla/websocket#pkg-constants.
|
|
*/
|
|
closeMessage := websocket.FormatCloseMessage(customRetCode, customRetMsg)
|
|
err := conn.WriteControl(websocket.CloseMessage, closeMessage, time.Now().Add(time.Millisecond*(ConstVals.Ws.WillKickIfInactiveFor)))
|
|
if err != nil {
|
|
Logger.Error("Unable to send the CloseFrame control message to player(client-side):", zap.Any("playerId", playerId), zap.Error(err))
|
|
}
|
|
|
|
time.AfterFunc(3*time.Second, func() {
|
|
// To actually terminates the underlying TCP connection which might be in `CLOSE_WAIT` state if inspected by `netstat`.
|
|
conn.Close()
|
|
})
|
|
}
|
|
|
|
onReceivedCloseMessageFromClient := func(code int, text string) error {
|
|
Logger.Warn("Triggered `onReceivedCloseMessageFromClient`:", zap.Any("code", code), zap.Any("playerId", playerId), zap.Any("message", text))
|
|
signalToCloseConnOfThisPlayer(code, text)
|
|
return nil
|
|
}
|
|
|
|
/**
|
|
* - "SetCloseHandler sets the handler for close messages received from the peer."
|
|
*
|
|
* - "The default close handler sends a close message back to the peer."
|
|
*
|
|
* - "The connection read methods return a CloseError when a close message is received. Most applications should handle close messages as part of their normal error handling. Applications should only set a close handler when the application must perform some action before sending a close message back to the peer."
|
|
*
|
|
* from reference https://godoc.org/github.com/gorilla/websocket#Conn.SetCloseHandler.
|
|
*/
|
|
conn.SetCloseHandler(onReceivedCloseMessageFromClient)
|
|
|
|
pPlayer, err := models.GetPlayerById(playerId)
|
|
|
|
if nil != err || nil == pPlayer {
|
|
// TODO: Abort with specific message.
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.PlayerNotFound, "")
|
|
}
|
|
|
|
Logger.Info("Player has logged in and its profile is found from persistent storage:", zap.Any("playerId", playerId), zap.Any("play", pPlayer))
|
|
|
|
// Find a room to join.
|
|
Logger.Info("About to acquire RoomHeapMux for player:", zap.Any("playerId", playerId))
|
|
(*(models.RoomHeapMux)).Lock()
|
|
defer func() {
|
|
(*(models.RoomHeapMux)).Unlock()
|
|
Logger.Info("Released RoomHeapMux for player:", zap.Any("playerId", playerId))
|
|
}()
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
Logger.Error("Recovered from: ", zap.Any("panic", r))
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
|
}
|
|
}()
|
|
Logger.Info("Acquired RoomHeapMux for player:", zap.Any("playerId", playerId))
|
|
// Logger.Info("The RoomHeapManagerIns has:", zap.Any("addr", fmt.Sprintf("%p", models.RoomHeapManagerIns)), zap.Any("size", len(*(models.RoomHeapManagerIns))))
|
|
playerSuccessfullyAddedToRoom := false
|
|
if 0 < boundRoomId {
|
|
if tmpPRoom, existent := (*models.RoomMapManagerIns)[int32(boundRoomId)]; existent {
|
|
pRoom = tmpPRoom
|
|
Logger.Info("Successfully got:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forBoundRoomId", boundRoomId))
|
|
res := pRoom.ReAddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer)
|
|
if !res {
|
|
Logger.Warn("Failed to get:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forBoundRoomId", boundRoomId))
|
|
} else {
|
|
playerSuccessfullyAddedToRoom = true
|
|
}
|
|
}
|
|
}
|
|
|
|
if 0 < expectRoomId {
|
|
if tmpRoom, existent := (*models.RoomMapManagerIns)[int32(expectRoomId)]; existent {
|
|
pRoom = tmpRoom
|
|
Logger.Info("Successfully got:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forExpectedRoomId", expectRoomId))
|
|
|
|
if pRoom.ReAddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer) {
|
|
playerSuccessfullyAddedToRoom = true
|
|
} else if pRoom.AddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer) {
|
|
playerSuccessfullyAddedToRoom = true
|
|
} else {
|
|
Logger.Warn("Failed to get:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forExpectedRoomId", expectRoomId))
|
|
playerSuccessfullyAddedToRoom = false
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
if false == playerSuccessfullyAddedToRoom {
|
|
defer func() {
|
|
if pRoom != nil {
|
|
heap.Push(models.RoomHeapManagerIns, pRoom)
|
|
(models.RoomHeapManagerIns).Update(pRoom, pRoom.Score)
|
|
}
|
|
(models.RoomHeapManagerIns).PrintInOrder()
|
|
}()
|
|
tmpRoom, ok := heap.Pop(models.RoomHeapManagerIns).(*models.Room)
|
|
if !ok {
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.LocallyNoAvailableRoom, fmt.Sprintf("Cannot pop a (*Room) for playerId == %v!", playerId))
|
|
} else {
|
|
pRoom = tmpRoom
|
|
Logger.Info("Successfully popped:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId))
|
|
res := pRoom.AddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer)
|
|
if !res {
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.PlayerNotAddableToRoom, fmt.Sprintf("AddPlayerIfPossible returns false for roomId == %v, playerId == %v!", pRoom.Id, playerId))
|
|
}
|
|
}
|
|
}
|
|
|
|
if swapped := atomic.CompareAndSwapInt32(pConnHasBeenSignaledToClose, 1, 1); swapped {
|
|
return
|
|
}
|
|
|
|
if pThePlayer, ok := pRoom.Players[int32(playerId)]; ok && (models.PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState || models.PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState) {
|
|
defer func() {
|
|
timeoutSeconds := time.Duration(5) * time.Second
|
|
time.AfterFunc(timeoutSeconds, func() {
|
|
if models.PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState || models.PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState {
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, fmt.Sprintf("The expected Ack for BattleColliderInfo is not received in %s, for playerId == %v!", timeoutSeconds, playerId))
|
|
}
|
|
})
|
|
}()
|
|
|
|
// Construct "battleColliderInfo" to downsync
|
|
bciFrame := &pb.BattleColliderInfo{
|
|
BoundRoomId: pRoom.Id,
|
|
StageName: pRoom.StageName,
|
|
StrToVec2DListMap: models.ToPbVec2DListMap(pRoom.RawBattleStrToVec2DListMap),
|
|
StrToPolygon2DListMap: models.ToPbPolygon2DListMap(pRoom.RawBattleStrToPolygon2DListMap),
|
|
StageDiscreteW: pRoom.StageDiscreteW,
|
|
StageDiscreteH: pRoom.StageDiscreteH,
|
|
StageTileW: pRoom.StageTileW,
|
|
StageTileH: pRoom.StageTileH,
|
|
|
|
IntervalToPing: int32(Constants.Ws.IntervalToPing),
|
|
WillKickIfInactiveFor: int32(Constants.Ws.WillKickIfInactiveFor),
|
|
BattleDurationNanos: pRoom.BattleDurationNanos,
|
|
ServerFps: pRoom.ServerFps,
|
|
InputDelayFrames: pRoom.InputDelayFrames,
|
|
InputScaleFrames: pRoom.InputScaleFrames,
|
|
NstDelayFrames: pRoom.NstDelayFrames,
|
|
InputFrameUpsyncDelayTolerance: pRoom.InputFrameUpsyncDelayTolerance,
|
|
MaxChasingRenderFramesPerUpdate: pRoom.MaxChasingRenderFramesPerUpdate,
|
|
PlayerBattleState: pThePlayer.BattleState, // For frontend to know whether it's rejoining
|
|
RollbackEstimatedDt: pRoom.RollbackEstimatedDt,
|
|
RollbackEstimatedDtMillis: pRoom.RollbackEstimatedDtMillis,
|
|
RollbackEstimatedDtNanos: pRoom.RollbackEstimatedDtNanos,
|
|
}
|
|
|
|
resp := &pb.WsResp{
|
|
Ret: int32(Constants.RetCode.Ok),
|
|
EchoedMsgId: int32(0),
|
|
Act: models.DOWNSYNC_MSG_ACT_HB_REQ,
|
|
BciFrame: bciFrame,
|
|
}
|
|
|
|
Logger.Debug("Sending downsync HeartbeatRequirements:", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("resp", resp))
|
|
|
|
theBytes, marshalErr := proto.Marshal(resp)
|
|
if nil != marshalErr {
|
|
Logger.Error("Error marshalling HeartbeatRequirements:", zap.Any("the error", marshalErr), zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId))
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, fmt.Sprintf("Error marshalling HeartbeatRequirements, playerId == %v and roomId == %v!", playerId, pRoom.Id))
|
|
}
|
|
|
|
if err := conn.WriteMessage(websocket.BinaryMessage, theBytes); nil != err {
|
|
Logger.Error("HeartbeatRequirements resp not written:", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, fmt.Sprintf("HeartbeatRequirements resp not written to roomId=%v, playerId == %v!", pRoom.Id, playerId))
|
|
}
|
|
}
|
|
|
|
/*
|
|
TODO
|
|
|
|
Is there a way to EXPLICITLY make this "receivingLoopAgainstPlayer/conn.ReadXXX(...)" edge-triggered or yield/park otherwise? For example a C-style equivalent would be as follows.
|
|
|
|
```
|
|
receivingLoopAgainstPlayer := func() error {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
Logger.Warn("Goroutine `receivingLoopAgainstPlayer`, recovery spot#1, recovered from: ", zap.Any("panic", r))
|
|
}
|
|
Logger.Info("Goroutine `receivingLoopAgainstPlayer` is stopped for:", zap.Any("playerId", playerId), zap.Any("roomId", pRoom.Id))
|
|
}()
|
|
|
|
// Set O_NONBLOCK on "fdOfThisConn".
|
|
int flags = fcntl(fdOfThisConn, F_GETFL, 0);
|
|
fcntl(fdOfThisConn, F_SETFL, flags | O_NONBLOCK);
|
|
|
|
int ep_fd = epoll_create1(0);
|
|
epoll_event ev;
|
|
ev.data.fd = fdOfThisConn;
|
|
ev.events = (EPOLLIN | EPOLLET | CUSTOM_SIGNAL_TO_CLOSE); // Is this possible?
|
|
epoll_ctl(ep_fd, EPOLL_CTL_ADD, fdOfThisConn, &ev);
|
|
epoll_event *evs = (epoll_event*)calloc(MAXEVENTS, sizeof(epoll_event));
|
|
|
|
bool localAwarenessOfSignaledToClose = false;
|
|
|
|
while(true) {
|
|
if (true == localAwarenessOfSignaledToClose) {
|
|
return;
|
|
}
|
|
|
|
// Would yield the current KernelThread and park it to a "queue" for later being unparked from the same "queue", thus resumed running. See http://web.stanford.edu/~hhli/CS110Notes/CS110NotesCollection/Topic%204%20Networking%20(5).html for more information. However, multiple "goroutine"s might share a same KernelThread and could be an issue for yielding.
|
|
int n = epoll_wait(ep_fd, evs, MAXEVENTS, -1);
|
|
|
|
for (int i = 0; i < n; ++i) {
|
|
if (evs[i].data.fd == fdOfThisConn) {
|
|
if (
|
|
(evs[i].events & EPOLLERR) ||
|
|
(evs[i].events & EPOLLHUP) ||
|
|
(evs[i].events & CUSTOM_SIGNAL_TO_CLOSE)
|
|
) {
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
|
localAwarenessOfSignaledToClose = true;
|
|
break;
|
|
}
|
|
int nbytes = 0;
|
|
while(nbytes = recv(fdOfThisConn, buff, sizeof(buff)) && 0 < nbytes) {
|
|
...
|
|
}
|
|
// Now that "0 == nbytes" or "EWOULDBLOCK == nbytes" or other errors came up.
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
```
|
|
-- YFLu, 2020-07-03
|
|
*/
|
|
|
|
// Starts the receiving loop against the client-side
|
|
receivingLoopAgainstPlayer := func() error {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
Logger.Warn("Goroutine `receivingLoopAgainstPlayer`, recovery spot#1, recovered from: ", zap.Any("panic", r))
|
|
}
|
|
Logger.Info("Goroutine `receivingLoopAgainstPlayer` is stopped for:", zap.Any("playerId", playerId), zap.Any("roomId", pRoom.Id))
|
|
}()
|
|
for {
|
|
if swapped := atomic.CompareAndSwapInt32(pConnHasBeenSignaledToClose, 1, 1); swapped {
|
|
return nil
|
|
}
|
|
|
|
// Tries to receive from client-side in a non-blocking manner.
|
|
_, bytes, err := conn.ReadMessage()
|
|
if nil != err {
|
|
Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
|
return nil
|
|
}
|
|
|
|
pReq := new(pb.WsReq)
|
|
unmarshalErr := proto.Unmarshal(bytes, pReq)
|
|
if nil != unmarshalErr {
|
|
Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(unmarshalErr))
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
|
}
|
|
|
|
// Logger.Info("Received request message from client", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("pReq", pReq))
|
|
|
|
switch pReq.Act {
|
|
case models.UPSYNC_MSG_ACT_HB_PING:
|
|
startOrFeedHeartbeatWatchdog(conn)
|
|
case models.UPSYNC_MSG_ACT_PLAYER_CMD:
|
|
startOrFeedHeartbeatWatchdog(conn)
|
|
pRoom.OnBattleCmdReceived(pReq)
|
|
case models.UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK:
|
|
res := pRoom.OnPlayerBattleColliderAcked(int32(playerId))
|
|
if false == res {
|
|
Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))
|
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
|
return nil
|
|
}
|
|
default:
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
startOrFeedHeartbeatWatchdog(conn)
|
|
go receivingLoopAgainstPlayer()
|
|
}
|