diff --git a/main.go b/main.go index 8a9bd89..ab2f142 100644 --- a/main.go +++ b/main.go @@ -1,32 +1,55 @@ package main import ( + "bufio" + "fmt" "live-streamer/config" "live-streamer/server" "live-streamer/streamer" "live-streamer/utils" - "live-streamer/websocket" "log" + "os" "github.com/fsnotify/fsnotify" ) var GlobalStreamer *streamer.Streamer -var outputer websocket.Outputer func main() { server.NewServer(":8080", websocketRequestHandler) server.GlobalServer.Run() - outputer = server.GlobalServer if !utils.HasFFMPEG() { log.Fatal("ffmpeg not found") } - GlobalStreamer = streamer.NewStreamer(config.GlobalConfig.VideoList, outputer) + GlobalStreamer = streamer.NewStreamer(config.GlobalConfig.VideoList) go startWatcher() + go input() GlobalStreamer.Stream() GlobalStreamer.Close() } +func input() { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + line := scanner.Text() // 获取用户输入的内容 + switch line { + case "list": + fmt.Println(GlobalStreamer.GetVideoListPath()) + case "index": + fmt.Println(GlobalStreamer.GetCurrentIndex()) + case "next": + GlobalStreamer.Next() + case "prev": + GlobalStreamer.Prev() + case "quit": + GlobalStreamer.Close() + os.Exit(0) + case "current": + fmt.Println(GlobalStreamer.GetCurrentVideoPath()) + } + } +} + func startWatcher() { watcher, err := fsnotify.NewWatcher() if err != nil { @@ -56,11 +79,9 @@ func startWatcher() { if utils.IsSupportedVideo(event.Name) { log.Println("new video added:", event.Name) GlobalStreamer.Add(event.Name) - server.GlobalServer.Broadcast(websocket.MakeResponse(websocket.TypeAddVideo, true, event.Name, "")) } } if event.Op&fsnotify.Remove == fsnotify.Remove { - server.GlobalServer.Broadcast(websocket.MakeResponse(websocket.TypeRemoveVideo, true, event.Name, "")) log.Println("video removed:", event.Name) GlobalStreamer.Remove(event.Name) } diff --git a/server/server.go b/server/server.go index 1edbc80..a348aad 100644 --- a/server/server.go +++ b/server/server.go @@ -4,6 +4,7 @@ import ( "embed" "html/template" "live-streamer/config" + "live-streamer/streamer" mywebsocket "live-streamer/websocket" "log" "net/http" @@ -24,20 +25,20 @@ var upgrader = websocket.Upgrader{ }, } -type InputFunc func(mywebsocket.Response) +type InputFunc func(mywebsocket.RequestType) type Server struct { addr string dealInputFunc InputFunc clients map[string]*Client - historyOutput string mu sync.Mutex } type Client struct { - id string - conn *websocket.Conn - mu sync.Mutex + id string + conn *websocket.Conn + mu sync.Mutex + hasSentSize int } var GlobalServer *Server @@ -47,7 +48,6 @@ func NewServer(addr string, dealInputFunc InputFunc) { addr: addr, dealInputFunc: dealInputFunc, clients: make(map[string]*Client), - historyOutput: "", } } @@ -88,12 +88,10 @@ func (s *Server) handleWebSocket(c *gin.Context) { log.Printf("generating uuid error: %v", err) return } - client := &Client{id: id.String(), conn: ws} + client := &Client{id: id.String(), conn: ws, hasSentSize: 0} s.mu.Lock() s.clients[client.id] = client s.mu.Unlock() - // write history output - s.Single(client.id, mywebsocket.MakeOutput(s.historyOutput)) defer func() { client.mu.Lock() @@ -107,10 +105,24 @@ func (s *Server) handleWebSocket(c *gin.Context) { } }() + go func() { + ticker := time.NewTicker(1 * time.Second) + for range ticker.C { + streamer.GlobalStreamer.TruncateOutput() + currentVideoPath, _ := streamer.GlobalStreamer.GetCurrentVideoPath() + s.Broadcast(mywebsocket.Date{ + Timestamp: time.Now().UnixMilli(), + CurrentVideoPath: currentVideoPath, + VideoList: streamer.GlobalStreamer.GetVideoListPath(), + Output: streamer.GlobalStreamer.GetOutput(), + }) + } + }() + for { // recive message client.mu.Lock() - msg := mywebsocket.Response{} + msg := mywebsocket.Request{} err := ws.ReadJSON(&msg) client.mu.Unlock() if err != nil { @@ -119,7 +131,7 @@ func (s *Server) handleWebSocket(c *gin.Context) { } break } - s.dealInputFunc(msg) + s.dealInputFunc(msg.Type) } } @@ -134,13 +146,9 @@ func AuthMiddleware() gin.HandlerFunc { } } -func (s *Server) Broadcast(obj mywebsocket.Response) { +func (s *Server) Broadcast(obj mywebsocket.Date) { s.mu.Lock() - if obj.Type == mywebsocket.TypeOutput { - s.historyOutput += obj.Data.(string) - } for _, client := range s.clients { - obj.UserID = client.id obj.Timestamp = time.Now().UnixMilli() if err := client.conn.WriteJSON(obj); err != nil { log.Printf("websocket writing message error: %v", err) @@ -149,10 +157,9 @@ func (s *Server) Broadcast(obj mywebsocket.Response) { s.mu.Unlock() } -func (s *Server) Single(userID string, obj mywebsocket.Response) { +func (s *Server) Single(userID string, obj mywebsocket.Date) { s.mu.Lock() if client, ok := s.clients[userID]; ok { - obj.UserID = userID obj.Timestamp = time.Now().UnixMilli() if err := client.conn.WriteJSON(obj); err != nil { log.Printf("websocket writing message error: %v", err) diff --git a/server/static/index.html b/server/static/index.html index cecdfc8..bbe0540 100644 --- a/server/static/index.html +++ b/server/static/index.html @@ -367,117 +367,63 @@ diff --git a/streamer/streamer.go b/streamer/streamer.go index 7f42403..451a777 100644 --- a/streamer/streamer.go +++ b/streamer/streamer.go @@ -7,8 +7,8 @@ import ( "fmt" "io" "live-streamer/config" - "live-streamer/websocket" "log" + "math" "os/exec" "strings" "sync" @@ -21,23 +21,60 @@ type Streamer struct { cmd *exec.Cmd ctx context.Context cancel context.CancelFunc + output strings.Builder + manualControl bool mu sync.Mutex - outputer websocket.Outputer } var GlobalStreamer *Streamer -func NewStreamer(videoList []config.InputItem, outputer websocket.Outputer) *Streamer { +func NewStreamer(videoList []config.InputItem) *Streamer { GlobalStreamer = &Streamer{ videoList: videoList, currentVideoIndex: 0, cmd: nil, ctx: nil, - outputer: outputer, } return GlobalStreamer } +func (s *Streamer) start() { + s.mu.Lock() + s.ctx, s.cancel = context.WithCancel(context.Background()) + currentVideo := s.videoList[s.currentVideoIndex] + videoPath := currentVideo.Path + s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) + s.mu.Unlock() + s.writeOutput(fmt.Sprintln("start stream: ", videoPath)) + pipe, err := s.cmd.StderrPipe() + if err != nil { + log.Printf("failed to get pipe: %v", err) + return + } + + reader := bufio.NewReader(pipe) + + if err := s.cmd.Start(); err != nil { + s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err)) + return + } + + go s.log(reader) + + <-s.ctx.Done() + s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath)) + + if s.manualControl { + s.manualControl = false + } else { + // stream next video + s.currentVideoIndex++ + if s.currentVideoIndex >= len(s.videoList) { + s.currentVideoIndex = 0 + } + } +} + func (s *Streamer) Stream() { for { if len(s.videoList) == 0 { @@ -48,52 +85,17 @@ func (s *Streamer) Stream() { } } -func (s *Streamer) start() { - s.Stop() - - s.ctx, s.cancel = context.WithCancel(context.Background()) - - currentVideo := s.videoList[s.currentVideoIndex] - videoPath := currentVideo.Path - s.outputer.Broadcast(websocket.MakeOutput(fmt.Sprint("start stream: ", videoPath))) - - s.mu.Lock() - s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) - s.mu.Unlock() - - pipe, err := s.cmd.StderrPipe() - if err != nil { - log.Printf("failed to get pipe: %v", err) - return - } - - reader := bufio.NewReader(pipe) - - if err := s.cmd.Start(); err != nil { - s.outputer.Broadcast(websocket.MakeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err))) - return - } - - go s.log(reader) - - <-s.ctx.Done() - s.outputer.Broadcast(websocket.MakeOutput(fmt.Sprintf("stop stream: %s", videoPath))) - - // stream next video - s.currentVideoIndex++ - if s.currentVideoIndex >= len(s.videoList) { - s.currentVideoIndex = 0 - } -} - func (s *Streamer) Stop() { + s.mu.Lock() + defer s.mu.Unlock() if s.cancel != nil { stopped := make(chan error) go func() { - stopped <- s.cmd.Wait() + if s.cmd != nil { + stopped <- s.cmd.Wait() + } }() s.cancel() - s.mu.Lock() if s.cmd != nil && s.cmd.Process != nil { select { case <-stopped: @@ -104,15 +106,24 @@ func (s *Streamer) Stop() { } s.cmd = nil } - s.mu.Unlock() } } +func (s *Streamer) writeOutput(str string) { + s.mu.Lock() + defer s.mu.Unlock() + s.output.WriteString(str) +} + func (s *Streamer) Add(videoPath string) { + s.mu.Lock() + defer s.mu.Unlock() s.videoList = append(s.videoList, config.InputItem{Path: videoPath}) } func (s *Streamer) Remove(videoPath string) { + s.mu.Lock() + defer s.mu.Unlock() for i, item := range s.videoList { if item.Path == videoPath { s.videoList = append(s.videoList[:i], s.videoList[i+1:]...) @@ -128,18 +139,24 @@ func (s *Streamer) Remove(videoPath string) { } func (s *Streamer) Prev() { + s.mu.Lock() + s.manualControl = true s.currentVideoIndex-- if s.currentVideoIndex < 0 { s.currentVideoIndex = len(s.videoList) - 1 } + s.mu.Unlock() s.Stop() } func (s *Streamer) Next() { + s.mu.Lock() + s.manualControl = true s.currentVideoIndex++ if s.currentVideoIndex >= len(s.videoList) { s.currentVideoIndex = 0 } + s.mu.Unlock() s.Stop() } @@ -157,11 +174,11 @@ func (s *Streamer) log(reader *bufio.Reader) { if n > 0 { videoPath, _ := s.GetCurrentVideoPath() buf = append([]byte(videoPath), buf...) - s.outputer.Broadcast(websocket.MakeOutput(string(buf[:n+len(videoPath)]))) + s.writeOutput(string(buf[:n+len(videoPath)])) } if err != nil { if err != io.EOF { - s.outputer.Broadcast(websocket.MakeOutput(fmt.Sprintf("reading ffmpeg output error: %v\n", err))) + s.writeOutput(fmt.Sprintf("reading ffmpeg output error: %v\n", err)) } break } @@ -169,6 +186,60 @@ func (s *Streamer) log(reader *bufio.Reader) { } } +func (s *Streamer) GetCurrentVideoPath() (string, error) { + s.mu.Lock() + defer s.mu.Unlock() + if len(s.videoList) == 0 { + return "", errors.New("no video streaming") + } + return s.videoList[s.currentVideoIndex].Path, nil +} + +func (s *Streamer) GetVideoList() []config.InputItem { + s.mu.Lock() + defer s.mu.Unlock() + return s.videoList +} + +func (s *Streamer) GetVideoListPath() []string { + s.mu.Lock() + defer s.mu.Unlock() + var videoList []string + for _, item := range s.videoList { + videoList = append(videoList, item.Path) + } + return videoList +} + +func (s *Streamer) GetCurrentIndex() int { + s.mu.Lock() + defer s.mu.Unlock() + return s.currentVideoIndex +} + +func (s *Streamer) GetOutput() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.output.String() +} + +func (s *Streamer) TruncateOutput() int { + s.mu.Lock() + defer s.mu.Unlock() + currentSize := s.output.Len() + if currentSize > math.MaxInt { + newStart := currentSize - math.MaxInt + trimmedOutput := s.output.String()[newStart:] + s.output.Reset() + s.output.WriteString(trimmedOutput) + } + return currentSize +} + +func (s *Streamer) Close() { + s.Stop() +} + func (s *Streamer) buildFFmpegArgs(videoItem config.InputItem) []string { videoPath := videoItem.Path @@ -209,30 +280,3 @@ func (s *Streamer) buildFFmpegArgs(videoItem config.InputItem) []string { return args } - -func (s *Streamer) GetCurrentVideoPath() (string, error) { - if len(s.videoList) == 0 { - return "", errors.New("no video streaming") - } - return s.videoList[s.currentVideoIndex].Path, nil -} - -func (s *Streamer) GetVideoList() []config.InputItem { - return s.videoList -} - -func (s *Streamer) GetVideoListPath() []string { - var videoList []string - for _, item := range s.videoList { - videoList = append(videoList, item.Path) - } - return videoList -} - -func (s *Streamer) GetCurrentIndex() int { - return s.currentVideoIndex -} - -func (s *Streamer) Close() { - s.Stop() -} diff --git a/websocket/outputer.go b/websocket/outputer.go deleted file mode 100644 index 08fa228..0000000 --- a/websocket/outputer.go +++ /dev/null @@ -1,6 +0,0 @@ -package websocket - -type Outputer interface { - Broadcast(v Response) - Single(userID string, v Response) -} diff --git a/websocket/websocket.go b/websocket/websocket.go index 0517077..4866bd4 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -1,47 +1,20 @@ package websocket -type MessageType string +type RequestType string -var ( - TypeOutput MessageType = "Output" - TypeStreamNextVideo MessageType = "StreamNextVideo" - TypeStreamPrevVideo MessageType = "StreamPrevVideo" - TypeGetCurrentVideoPath MessageType = "GetCurrentVideoPath" - TypeGetVideoList MessageType = "GetVideoList" - TypeQuit MessageType = "Quit" - TypeRemoveVideo MessageType = "RemoveVideo" - TypeAddVideo MessageType = "AddVideo" +const ( + TypeStreamNextVideo RequestType = "StreamNextVideo" + TypeStreamPrevVideo RequestType = "StreamPrevVideo" + TypeQuit RequestType = "Quit" ) type Request struct { - Type MessageType `json:"type"` - Args []string `json:"args"` - UserID string `json:"user_id"` - Timestamp int64 `json:"timestamp"` + Type RequestType `json:"type"` } -type Response struct { - Type MessageType `json:"type"` - Success bool `json:"success"` - Data any `json:"data"` - Message string `json:"message"` - UserID string `json:"user_id"` - Timestamp int64 `json:"timestamp"` -} - -func MakeResponse(messageType MessageType, success bool, data any, message string) Response { - return Response{ - Type: messageType, - Success: success, - Data: data, - Message: message, - } -} - -func MakeOutput(output string) Response { - return Response{ - Success: true, - Type: TypeOutput, - Data: output, - } +type Date struct { + Timestamp int64 `json:"timestamp"` + CurrentVideoPath string `json:"currentVideoPath"` + VideoList []string `json:"videoList"` + Output string `json:"output"` } diff --git a/websocket_handler.go b/websocket_handler.go index 2829dc5..dbdf8f1 100644 --- a/websocket_handler.go +++ b/websocket_handler.go @@ -1,60 +1,18 @@ package main import ( - "live-streamer/server" "live-streamer/websocket" + "os" ) -func websocketRequestHandler(req websocket.Response) { - if req.UserID == "" { - return - } - var resp websocket.Response - switch websocket.MessageType(req.Type) { +func websocketRequestHandler(reqType websocket.RequestType) { + switch reqType { case websocket.TypeStreamNextVideo: GlobalStreamer.Next() - resp = websocket.Response{ - Type: websocket.TypeStreamNextVideo, - Success: true, - } - server.GlobalServer.Broadcast(resp) case websocket.TypeStreamPrevVideo: GlobalStreamer.Prev() - resp = websocket.Response{ - Type: websocket.TypeStreamPrevVideo, - Success: true, - } - server.GlobalServer.Broadcast(resp) - case websocket.TypeGetCurrentVideoPath: - videoPath, err := GlobalStreamer.GetCurrentVideoPath() - if err != nil { - resp = websocket.Response{ - Type: websocket.TypeGetCurrentVideoPath, - Success: false, - Message: err.Error(), - } - } else { - resp = websocket.Response{ - Type: websocket.TypeGetCurrentVideoPath, - Success: true, - Data: videoPath, - } - } - server.GlobalServer.Single(req.UserID, resp) - case websocket.TypeGetVideoList: - resp = websocket.Response{ - Type: websocket.TypeGetVideoList, - Success: true, - Data: GlobalStreamer.GetVideoListPath(), - } - server.GlobalServer.Single(req.UserID, resp) case websocket.TypeQuit: - server.GlobalServer.Close() GlobalStreamer.Close() - resp = websocket.Response{ - Type: websocket.TypeQuit, - Success: true, - } - server.GlobalServer.Broadcast(resp) + os.Exit(0) } }