live-streamer/server/server.go

187 lines
3.6 KiB
Go
Raw Permalink Normal View History

2024-10-23 10:47:37 -04:00
package server
import (
"embed"
"html/template"
2024-10-23 17:06:19 -04:00
"live-streamer/streamer"
2024-10-23 10:47:37 -04:00
"net/http"
"os"
2024-10-23 14:35:37 -04:00
"sync"
"time"
2024-10-23 10:47:37 -04:00
"github.com/gin-gonic/gin"
2024-10-23 14:35:37 -04:00
uuid "github.com/gofrs/uuid/v5"
2024-10-23 10:47:37 -04:00
"github.com/gorilla/websocket"
"go.uber.org/zap"
2024-10-23 10:47:37 -04:00
)
//go:embed static
var staticFiles embed.FS
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Server struct {
2024-10-29 05:11:14 -04:00
addr string
clients map[string]*Client
mu sync.Mutex
option *Option
log *zap.Logger
streamer *streamer.Streamer
}
type Option struct {
AuthToken string
2024-10-23 10:47:37 -04:00
}
type Client struct {
2024-10-23 17:06:19 -04:00
id string
conn *websocket.Conn
mu sync.Mutex
hasSentSize int
2024-10-23 10:47:37 -04:00
}
var (
GlobalServer *Server
2024-10-29 05:11:14 -04:00
log *zap.Logger
)
2024-10-29 05:11:14 -04:00
func NewServer(addr string, streamer *streamer.Streamer, log *zap.Logger, option *Option) {
2024-10-23 10:47:37 -04:00
GlobalServer = &Server{
2024-10-29 05:11:14 -04:00
addr: addr,
option: option,
clients: make(map[string]*Client),
log: log,
streamer: streamer,
2024-10-23 10:47:37 -04:00
}
}
func (s *Server) Run() {
gin.SetMode(gin.ReleaseMode)
2024-10-23 14:35:37 -04:00
router := gin.New()
2024-10-23 10:47:37 -04:00
tpl, err := template.ParseFS(staticFiles, "static/*")
if err != nil {
log.Fatal("parsing templates error", zap.Error(err))
2024-10-23 10:47:37 -04:00
}
router.SetHTMLTemplate(tpl)
2024-10-29 05:11:14 -04:00
router.GET("/ws", s.AuthMiddleware(), s.handleWebSocket)
2024-10-23 10:47:37 -04:00
router.GET(
"/", func(c *gin.Context) {
c.HTML(200, "index.html", nil)
},
)
go func() {
if err := router.Run(s.addr); err != nil {
log.Fatal("starting server error", zap.Error(err))
2024-10-23 10:47:37 -04:00
}
}()
}
func (s *Server) handleWebSocket(c *gin.Context) {
ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
if err != nil {
return
}
2024-10-23 14:35:37 -04:00
ws.SetCloseHandler(func(code int, text string) error {
return nil
})
id, err := uuid.NewV7()
if err != nil {
log.Error("generating uuid error", zap.Error(err))
2024-10-23 14:35:37 -04:00
return
}
2024-10-23 17:06:19 -04:00
client := &Client{id: id.String(), conn: ws, hasSentSize: 0}
2024-10-23 14:35:37 -04:00
s.mu.Lock()
s.clients[client.id] = client
s.mu.Unlock()
2024-10-23 10:47:37 -04:00
defer func() {
2024-10-23 14:35:37 -04:00
client.mu.Lock()
ws.Close()
client.mu.Unlock()
s.mu.Lock()
delete(s.clients, client.id)
s.mu.Unlock()
if r := recover(); r != nil {
log.Panic("webSocket handler panic", zap.Any("recover", r))
2024-10-23 10:47:37 -04:00
}
}()
2024-10-23 17:06:19 -04:00
go func() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
2024-10-29 05:11:14 -04:00
s.Broadcast(Data{
2024-10-23 17:06:19 -04:00
Timestamp: time.Now().UnixMilli(),
2024-10-29 05:11:14 -04:00
CurrentVideoPath: s.streamer.GetCurrentVideoPath(),
VideoList: s.streamer.GetVideoListPath(),
Output: s.streamer.GetOutput(),
2024-10-23 17:06:19 -04:00
})
}
}()
2024-10-23 10:47:37 -04:00
for {
// recive message
2024-10-23 14:35:37 -04:00
client.mu.Lock()
2024-10-29 05:11:14 -04:00
msg := Request{}
2024-10-23 14:35:37 -04:00
err := ws.ReadJSON(&msg)
client.mu.Unlock()
2024-10-23 10:47:37 -04:00
if err != nil {
break
}
2024-10-29 05:11:14 -04:00
s.RequestHandler(msg.Type)
2024-10-23 10:47:37 -04:00
}
}
2024-10-29 05:11:14 -04:00
func (s *Server) AuthMiddleware() gin.HandlerFunc {
2024-10-23 14:35:37 -04:00
return func(c *gin.Context) {
2024-10-29 05:11:14 -04:00
if s.option.AuthToken == "" ||
c.Query("token") == s.option.AuthToken {
2024-10-23 14:35:37 -04:00
c.Next()
} else {
c.AbortWithStatus(http.StatusUnauthorized)
}
}
2024-10-23 10:47:37 -04:00
}
2024-10-29 05:11:14 -04:00
func (s *Server) Broadcast(obj Data) {
2024-10-23 14:35:37 -04:00
s.mu.Lock()
for _, client := range s.clients {
obj.Timestamp = time.Now().UnixMilli()
if err := client.conn.WriteJSON(obj); err != nil {
log.Error("websocket writing message error", zap.Error(err))
2024-10-23 14:35:37 -04:00
}
}
s.mu.Unlock()
2024-10-23 10:47:37 -04:00
}
2024-10-29 05:11:14 -04:00
func (s *Server) Single(userID string, obj Data) {
2024-10-23 14:35:37 -04:00
s.mu.Lock()
if client, ok := s.clients[userID]; ok {
obj.Timestamp = time.Now().UnixMilli()
if err := client.conn.WriteJSON(obj); err != nil {
log.Error("websocket writing message error", zap.Error(err))
2024-10-23 14:35:37 -04:00
}
}
s.mu.Unlock()
2024-10-23 10:47:37 -04:00
}
2024-10-29 05:11:14 -04:00
func (s *Server) RequestHandler(reqType RequestType) {
switch reqType {
case TypeStreamNextVideo:
s.streamer.Next()
case TypeStreamPrevVideo:
s.streamer.Prev()
case TypeQuit:
s.streamer.Close()
os.Exit(0)
2024-10-29 05:11:14 -04:00
}
2024-10-23 10:47:37 -04:00
}