mirror of
https://github.com/genxium/DelayNoMore
synced 2025-01-14 23:11:25 +00:00
379 lines
15 KiB
Go
379 lines
15 KiB
Go
|
package ws
|
||
|
|
||
|
import (
|
||
|
"container/heap"
|
||
|
"fmt"
|
||
|
"github.com/gin-gonic/gin"
|
||
|
"github.com/golang/protobuf/proto"
|
||
|
"github.com/gorilla/websocket"
|
||
|
"go.uber.org/zap"
|
||
|
"net/http"
|
||
|
. "server/common"
|
||
|
"server/models"
|
||
|
pb "server/pb_output"
|
||
|
"strconv"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
READ_BUF_SIZE = 8 * 1024
|
||
|
WRITE_BUF_SIZE = 8 * 1024
|
||
|
)
|
||
|
|
||
|
var upgrader = websocket.Upgrader{
|
||
|
ReadBufferSize: READ_BUF_SIZE,
|
||
|
WriteBufferSize: WRITE_BUF_SIZE,
|
||
|
CheckOrigin: func(r *http.Request) bool {
|
||
|
Logger.Debug("origin", zap.Any("origin", r.Header.Get("Origin")))
|
||
|
return true
|
||
|
},
|
||
|
}
|
||
|
|
||
|
func startOrFeedHeartbeatWatchdog(conn *websocket.Conn) bool {
|
||
|
if nil == conn {
|
||
|
return false
|
||
|
}
|
||
|
conn.SetReadDeadline(time.Now().Add(time.Millisecond * (ConstVals.Ws.WillKickIfInactiveFor)))
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func Serve(c *gin.Context) {
|
||
|
token, ok := c.GetQuery("intAuthToken")
|
||
|
if !ok {
|
||
|
c.AbortWithStatus(http.StatusBadRequest)
|
||
|
return
|
||
|
}
|
||
|
Logger.Info("Finding PlayerLogin record for ws authentication:", zap.Any("intAuthToken", token))
|
||
|
boundRoomId := 0
|
||
|
expectRoomId := 0
|
||
|
var err error
|
||
|
if boundRoomIdStr, hasBoundRoomId := c.GetQuery("boundRoomId"); hasBoundRoomId {
|
||
|
boundRoomId, err = strconv.Atoi(boundRoomIdStr)
|
||
|
if err != nil {
|
||
|
// TODO: Abort with specific message.
|
||
|
c.AbortWithStatus(http.StatusBadRequest)
|
||
|
return
|
||
|
}
|
||
|
Logger.Info("Finding PlayerLogin record for ws authentication:", zap.Any("intAuthToken", token), zap.Any("boundRoomId", boundRoomId))
|
||
|
}
|
||
|
if expectRoomIdStr, hasExpectRoomId := c.GetQuery("expectedRoomId"); hasExpectRoomId {
|
||
|
expectRoomId, err = strconv.Atoi(expectRoomIdStr)
|
||
|
if err != nil {
|
||
|
c.AbortWithStatus(http.StatusBadRequest)
|
||
|
return
|
||
|
}
|
||
|
Logger.Info("Finding PlayerLogin record for ws authentication:", zap.Any("intAuthToken", token), zap.Any("expectedRoomId", expectRoomId))
|
||
|
}
|
||
|
|
||
|
// TODO: Wrap the following 2 stmts by sql transaction!
|
||
|
playerId, err := models.GetPlayerIdByToken(token)
|
||
|
if err != nil || playerId == 0 {
|
||
|
// TODO: Abort with specific message.
|
||
|
Logger.Info("PlayerLogin record not found for ws authentication:", zap.Any("intAuthToken", token))
|
||
|
c.AbortWithStatus(http.StatusBadRequest)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
Logger.Info("PlayerLogin record has been found for ws authentication:", zap.Any("playerId", playerId))
|
||
|
|
||
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||
|
if err != nil {
|
||
|
Logger.Error("upgrade:", zap.Error(err), zap.Any("playerId", playerId))
|
||
|
c.AbortWithStatus(http.StatusBadRequest)
|
||
|
return
|
||
|
}
|
||
|
Logger.Debug("ConstVals.Ws.WillKickIfInactiveFor", zap.Duration("v", ConstVals.Ws.WillKickIfInactiveFor))
|
||
|
/**
|
||
|
* WARNING: After successfully upgraded to use the "persistent connection" of http1.1/websocket protocol, you CANNOT overwrite the http1.0 resp status by `c.AbortWithStatus(...)` any more!
|
||
|
*/
|
||
|
|
||
|
connHasBeenSignaledToClose := int32(0)
|
||
|
pConnHasBeenSignaledToClose := &connHasBeenSignaledToClose
|
||
|
|
||
|
var pRoom *models.Room = nil
|
||
|
signalToCloseConnOfThisPlayer := func(customRetCode int, customRetMsg string) {
|
||
|
if swapped := atomic.CompareAndSwapInt32(pConnHasBeenSignaledToClose, 0, 1); !swapped {
|
||
|
return
|
||
|
}
|
||
|
Logger.Warn("signalToCloseConnOfThisPlayer:", zap.Any("playerId", playerId), zap.Any("customRetCode", customRetCode), zap.Any("customRetMsg", customRetMsg))
|
||
|
if nil != pRoom {
|
||
|
pRoom.OnPlayerDisconnected(int32(playerId))
|
||
|
}
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
Logger.Warn("Recovered from: ", zap.Any("panic", r))
|
||
|
}
|
||
|
}()
|
||
|
/**
|
||
|
* References
|
||
|
* - https://tools.ietf.org/html/rfc6455
|
||
|
* - https://godoc.org/github.com/gorilla/websocket#hdr-Control_Messages
|
||
|
* - https://godoc.org/github.com/gorilla/websocket#FormatCloseMessage
|
||
|
* - https://godoc.org/github.com/gorilla/websocket#Conn.WriteControl
|
||
|
* - https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
|
||
|
* - "The Close and WriteControl methods can be called concurrently with all other methods."
|
||
|
*/
|
||
|
|
||
|
/**
|
||
|
* References for the "WebsocketStdCloseCode"s. Note that we're using some "CustomCloseCode"s here as well.
|
||
|
*
|
||
|
* - https://tools.ietf.org/html/rfc6455#section-7.4
|
||
|
* - https://godoc.org/github.com/gorilla/websocket#pkg-constants.
|
||
|
*/
|
||
|
closeMessage := websocket.FormatCloseMessage(customRetCode, customRetMsg)
|
||
|
err := conn.WriteControl(websocket.CloseMessage, closeMessage, time.Now().Add(time.Millisecond*(ConstVals.Ws.WillKickIfInactiveFor)))
|
||
|
if err != nil {
|
||
|
Logger.Error("Unable to send the CloseFrame control message to player(client-side):", zap.Any("playerId", playerId), zap.Error(err))
|
||
|
}
|
||
|
|
||
|
time.AfterFunc(3*time.Second, func() {
|
||
|
// To actually terminates the underlying TCP connection which might be in `CLOSE_WAIT` state if inspected by `netstat`.
|
||
|
conn.Close()
|
||
|
})
|
||
|
}
|
||
|
|
||
|
onReceivedCloseMessageFromClient := func(code int, text string) error {
|
||
|
Logger.Warn("Triggered `onReceivedCloseMessageFromClient`:", zap.Any("code", code), zap.Any("playerId", playerId), zap.Any("message", text))
|
||
|
signalToCloseConnOfThisPlayer(code, text)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* - "SetCloseHandler sets the handler for close messages received from the peer."
|
||
|
*
|
||
|
* - "The default close handler sends a close message back to the peer."
|
||
|
*
|
||
|
* - "The connection read methods return a CloseError when a close message is received. Most applications should handle close messages as part of their normal error handling. Applications should only set a close handler when the application must perform some action before sending a close message back to the peer."
|
||
|
*
|
||
|
* from reference https://godoc.org/github.com/gorilla/websocket#Conn.SetCloseHandler.
|
||
|
*/
|
||
|
conn.SetCloseHandler(onReceivedCloseMessageFromClient)
|
||
|
|
||
|
pPlayer, err := models.GetPlayerById(playerId)
|
||
|
|
||
|
if nil != err || nil == pPlayer {
|
||
|
// TODO: Abort with specific message.
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.PlayerNotFound, "")
|
||
|
}
|
||
|
|
||
|
Logger.Info("Player has logged in and its profile is found from persistent storage:", zap.Any("playerId", playerId), zap.Any("play", pPlayer))
|
||
|
|
||
|
// Find a room to join.
|
||
|
Logger.Info("About to acquire RoomHeapMux for player:", zap.Any("playerId", playerId))
|
||
|
(*(models.RoomHeapMux)).Lock()
|
||
|
defer func() {
|
||
|
(*(models.RoomHeapMux)).Unlock()
|
||
|
Logger.Info("Released RoomHeapMux for player:", zap.Any("playerId", playerId))
|
||
|
}()
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
Logger.Error("Recovered from: ", zap.Any("panic", r))
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
||
|
}
|
||
|
}()
|
||
|
Logger.Info("Acquired RoomHeapMux for player:", zap.Any("playerId", playerId))
|
||
|
// Logger.Info("The RoomHeapManagerIns has:", zap.Any("addr", fmt.Sprintf("%p", models.RoomHeapManagerIns)), zap.Any("size", len(*(models.RoomHeapManagerIns))))
|
||
|
playerSuccessfullyAddedToRoom := false
|
||
|
if 0 < boundRoomId {
|
||
|
if tmpPRoom, existent := (*models.RoomMapManagerIns)[int32(boundRoomId)]; existent {
|
||
|
pRoom = tmpPRoom
|
||
|
Logger.Info("Successfully got:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forBoundRoomId", boundRoomId))
|
||
|
res := pRoom.ReAddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer)
|
||
|
if !res {
|
||
|
Logger.Warn("Failed to get:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forBoundRoomId", boundRoomId))
|
||
|
} else {
|
||
|
playerSuccessfullyAddedToRoom = true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if 0 < expectRoomId {
|
||
|
if tmpRoom, existent := (*models.RoomMapManagerIns)[int32(expectRoomId)]; existent {
|
||
|
pRoom = tmpRoom
|
||
|
Logger.Info("Successfully got:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forExpectedRoomId", expectRoomId))
|
||
|
|
||
|
if pRoom.ReAddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer) {
|
||
|
playerSuccessfullyAddedToRoom = true
|
||
|
} else if pRoom.AddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer) {
|
||
|
playerSuccessfullyAddedToRoom = true
|
||
|
} else {
|
||
|
Logger.Warn("Failed to get:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("forExpectedRoomId", expectRoomId))
|
||
|
playerSuccessfullyAddedToRoom = false
|
||
|
}
|
||
|
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if false == playerSuccessfullyAddedToRoom {
|
||
|
defer func() {
|
||
|
if pRoom != nil {
|
||
|
heap.Push(models.RoomHeapManagerIns, pRoom)
|
||
|
(models.RoomHeapManagerIns).Update(pRoom, pRoom.Score)
|
||
|
}
|
||
|
(models.RoomHeapManagerIns).PrintInOrder()
|
||
|
}()
|
||
|
tmpRoom, ok := heap.Pop(models.RoomHeapManagerIns).(*models.Room)
|
||
|
if !ok {
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.LocallyNoAvailableRoom, fmt.Sprintf("Cannot pop a (*Room) for playerId == %v!", playerId))
|
||
|
} else {
|
||
|
pRoom = tmpRoom
|
||
|
Logger.Info("Successfully popped:\n", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId))
|
||
|
res := pRoom.AddPlayerIfPossible(pPlayer, conn, signalToCloseConnOfThisPlayer)
|
||
|
if !res {
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.PlayerNotAddableToRoom, fmt.Sprintf("AddPlayerIfPossible returns false for roomId == %v, playerId == %v!", pRoom.Id, playerId))
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if swapped := atomic.CompareAndSwapInt32(pConnHasBeenSignaledToClose, 1, 1); swapped {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
if pThePlayer, ok := pRoom.Players[int32(playerId)]; ok && (models.PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState || models.PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState) {
|
||
|
defer func() {
|
||
|
timeoutSeconds := time.Duration(5) * time.Second
|
||
|
time.AfterFunc(timeoutSeconds, func() {
|
||
|
if models.PlayerBattleStateIns.ADDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState || models.PlayerBattleStateIns.READDED_PENDING_BATTLE_COLLIDER_ACK == pThePlayer.BattleState {
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, fmt.Sprintf("The expected Ack for BattleColliderInfo is not received in %s, for playerId == %v!", timeoutSeconds, playerId))
|
||
|
}
|
||
|
})
|
||
|
}()
|
||
|
|
||
|
playerBattleColliderInfo := models.ToPbStrToBattleColliderInfo(int32(Constants.Ws.IntervalToPing), int32(Constants.Ws.WillKickIfInactiveFor), pRoom.Id, pRoom.StageName, pRoom.RawBattleStrToVec2DListMap, pRoom.RawBattleStrToPolygon2DListMap, pRoom.StageDiscreteW, pRoom.StageDiscreteH, pRoom.StageTileW, pRoom.StageTileH)
|
||
|
|
||
|
resp := &pb.WsResp{
|
||
|
Ret: int32(Constants.RetCode.Ok),
|
||
|
EchoedMsgId: int32(0),
|
||
|
Act: models.DOWNSYNC_MSG_ACT_HB_REQ,
|
||
|
BciFrame: playerBattleColliderInfo,
|
||
|
}
|
||
|
|
||
|
// Logger.Info("Sending downsync HeartbeatRequirements:", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("resp", resp))
|
||
|
|
||
|
theBytes, marshalErr := proto.Marshal(resp)
|
||
|
if nil != marshalErr {
|
||
|
Logger.Error("Error marshalling HeartbeatRequirements:", zap.Any("the error", marshalErr), zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId))
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, fmt.Sprintf("Error marshalling HeartbeatRequirements, playerId == %v and roomId == %v!", playerId, pRoom.Id))
|
||
|
}
|
||
|
|
||
|
if err := conn.WriteMessage(websocket.BinaryMessage, theBytes); nil != err {
|
||
|
Logger.Error("HeartbeatRequirements resp not written:", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, fmt.Sprintf("HeartbeatRequirements resp not written to roomId=%v, playerId == %v!", pRoom.Id, playerId))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
TODO
|
||
|
|
||
|
Is there a way to EXPLICITLY make this "receivingLoopAgainstPlayer/conn.ReadXXX(...)" edge-triggered or yield/park otherwise? For example a C-style equivalent would be as follows.
|
||
|
|
||
|
```
|
||
|
receivingLoopAgainstPlayer := func() error {
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
Logger.Warn("Goroutine `receivingLoopAgainstPlayer`, recovery spot#1, recovered from: ", zap.Any("panic", r))
|
||
|
}
|
||
|
Logger.Info("Goroutine `receivingLoopAgainstPlayer` is stopped for:", zap.Any("playerId", playerId), zap.Any("roomId", pRoom.Id))
|
||
|
}()
|
||
|
|
||
|
// Set O_NONBLOCK on "fdOfThisConn".
|
||
|
int flags = fcntl(fdOfThisConn, F_GETFL, 0);
|
||
|
fcntl(fdOfThisConn, F_SETFL, flags | O_NONBLOCK);
|
||
|
|
||
|
int ep_fd = epoll_create1(0);
|
||
|
epoll_event ev;
|
||
|
ev.data.fd = fdOfThisConn;
|
||
|
ev.events = (EPOLLIN | EPOLLET | CUSTOM_SIGNAL_TO_CLOSE); // Is this possible?
|
||
|
epoll_ctl(ep_fd, EPOLL_CTL_ADD, fdOfThisConn, &ev);
|
||
|
epoll_event *evs = (epoll_event*)calloc(MAXEVENTS, sizeof(epoll_event));
|
||
|
|
||
|
bool localAwarenessOfSignaledToClose = false;
|
||
|
|
||
|
while(true) {
|
||
|
if (true == localAwarenessOfSignaledToClose) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
// Would yield the current KernelThread and park it to a "queue" for later being unparked from the same "queue", thus resumed running. See http://web.stanford.edu/~hhli/CS110Notes/CS110NotesCollection/Topic%204%20Networking%20(5).html for more information. However, multiple "goroutine"s might share a same KernelThread and could be an issue for yielding.
|
||
|
int n = epoll_wait(ep_fd, evs, MAXEVENTS, -1);
|
||
|
|
||
|
for (int i = 0; i < n; ++i) {
|
||
|
if (evs[i].data.fd == fdOfThisConn) {
|
||
|
if (
|
||
|
(evs[i].events & EPOLLERR) ||
|
||
|
(evs[i].events & EPOLLHUP) ||
|
||
|
(evs[i].events & CUSTOM_SIGNAL_TO_CLOSE)
|
||
|
) {
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
||
|
localAwarenessOfSignaledToClose = true;
|
||
|
break;
|
||
|
}
|
||
|
int nbytes = 0;
|
||
|
while(nbytes = recv(fdOfThisConn, buff, sizeof(buff)) && 0 < nbytes) {
|
||
|
...
|
||
|
}
|
||
|
// Now that "0 == nbytes" or "EWOULDBLOCK == nbytes" or other errors came up.
|
||
|
continue;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
```
|
||
|
-- YFLu, 2020-07-03
|
||
|
*/
|
||
|
|
||
|
// Starts the receiving loop against the client-side
|
||
|
receivingLoopAgainstPlayer := func() error {
|
||
|
defer func() {
|
||
|
if r := recover(); r != nil {
|
||
|
Logger.Warn("Goroutine `receivingLoopAgainstPlayer`, recovery spot#1, recovered from: ", zap.Any("panic", r))
|
||
|
}
|
||
|
Logger.Info("Goroutine `receivingLoopAgainstPlayer` is stopped for:", zap.Any("playerId", playerId), zap.Any("roomId", pRoom.Id))
|
||
|
}()
|
||
|
for {
|
||
|
if swapped := atomic.CompareAndSwapInt32(pConnHasBeenSignaledToClose, 1, 1); swapped {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Tries to receive from client-side in a non-blocking manner.
|
||
|
_, bytes, err := conn.ReadMessage()
|
||
|
if nil != err {
|
||
|
Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
pReq := new(pb.WsReq)
|
||
|
unmarshalErr := proto.Unmarshal(bytes, pReq)
|
||
|
if nil != unmarshalErr {
|
||
|
Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(unmarshalErr))
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
||
|
}
|
||
|
|
||
|
// Logger.Info("Received request message from client", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Any("pReq", pReq))
|
||
|
|
||
|
switch pReq.Act {
|
||
|
case models.UPSYNC_MSG_ACT_HB_PING:
|
||
|
startOrFeedHeartbeatWatchdog(conn)
|
||
|
case models.UPSYNC_MSG_ACT_PLAYER_CMD:
|
||
|
startOrFeedHeartbeatWatchdog(conn)
|
||
|
pRoom.OnBattleCmdReceived(pReq)
|
||
|
case models.UPSYNC_MSG_ACT_PLAYER_COLLIDER_ACK:
|
||
|
res := pRoom.OnPlayerBattleColliderAcked(int32(playerId))
|
||
|
if false == res {
|
||
|
Logger.Error("About to `signalToCloseConnOfThisPlayer`", zap.Any("roomId", pRoom.Id), zap.Any("playerId", playerId), zap.Error(err))
|
||
|
signalToCloseConnOfThisPlayer(Constants.RetCode.UnknownError, "")
|
||
|
return nil
|
||
|
}
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
startOrFeedHeartbeatWatchdog(conn)
|
||
|
go receivingLoopAgainstPlayer()
|
||
|
}
|