package server import ( "embed" "html/template" "live-streamer/streamer" "net/http" "os" "sync" "time" "github.com/gin-gonic/gin" uuid "github.com/gofrs/uuid/v5" "github.com/gorilla/websocket" "go.uber.org/zap" ) //go:embed static var staticFiles embed.FS var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } type Server struct { addr string clients map[string]*Client mu sync.Mutex option *Option log *zap.Logger streamer *streamer.Streamer } type Option struct { AuthToken string } type Client struct { id string conn *websocket.Conn mu sync.Mutex hasSentSize int } var ( GlobalServer *Server log *zap.Logger ) func NewServer(addr string, streamer *streamer.Streamer, log *zap.Logger, option *Option) { GlobalServer = &Server{ addr: addr, option: option, clients: make(map[string]*Client), log: log, streamer: streamer, } } func (s *Server) Run() { gin.SetMode(gin.ReleaseMode) router := gin.New() tpl, err := template.ParseFS(staticFiles, "static/*") if err != nil { log.Fatal("parsing templates error", zap.Error(err)) } router.SetHTMLTemplate(tpl) router.GET("/ws", s.AuthMiddleware(), s.handleWebSocket) router.GET( "/", func(c *gin.Context) { c.HTML(200, "index.html", nil) }, ) go func() { if err := router.Run(s.addr); err != nil { log.Fatal("starting server error", zap.Error(err)) } }() } func (s *Server) handleWebSocket(c *gin.Context) { ws, err := upgrader.Upgrade(c.Writer, c.Request, nil) if err != nil { return } ws.SetCloseHandler(func(code int, text string) error { return nil }) id, err := uuid.NewV7() if err != nil { log.Error("generating uuid error", zap.Error(err)) return } client := &Client{id: id.String(), conn: ws, hasSentSize: 0} s.mu.Lock() s.clients[client.id] = client s.mu.Unlock() defer func() { client.mu.Lock() ws.Close() client.mu.Unlock() s.mu.Lock() delete(s.clients, client.id) s.mu.Unlock() if r := recover(); r != nil { log.Panic("webSocket handler panic", zap.Any("recover", r)) } }() go func() { ticker := time.NewTicker(1 * time.Second) for range ticker.C { s.Broadcast(Data{ Timestamp: time.Now().UnixMilli(), CurrentVideoPath: s.streamer.GetCurrentVideoPath(), VideoList: s.streamer.GetVideoListPath(), Output: s.streamer.GetOutput(), }) } }() for { // recive message client.mu.Lock() msg := Request{} err := ws.ReadJSON(&msg) client.mu.Unlock() if err != nil { break } s.RequestHandler(msg.Type) } } func (s *Server) AuthMiddleware() gin.HandlerFunc { return func(c *gin.Context) { if s.option.AuthToken == "" || c.Query("token") == s.option.AuthToken { c.Next() } else { c.AbortWithStatus(http.StatusUnauthorized) } } } func (s *Server) Broadcast(obj Data) { s.mu.Lock() for _, client := range s.clients { obj.Timestamp = time.Now().UnixMilli() if err := client.conn.WriteJSON(obj); err != nil { log.Error("websocket writing message error", zap.Error(err)) } } s.mu.Unlock() } func (s *Server) Single(userID string, obj Data) { s.mu.Lock() if client, ok := s.clients[userID]; ok { obj.Timestamp = time.Now().UnixMilli() if err := client.conn.WriteJSON(obj); err != nil { log.Error("websocket writing message error", zap.Error(err)) } } s.mu.Unlock() } func (s *Server) RequestHandler(reqType RequestType) { switch reqType { case TypeStreamNextVideo: s.streamer.Next() case TypeStreamPrevVideo: s.streamer.Prev() case TypeQuit: s.streamer.Close() os.Exit(0) } }