diff --git a/server/server.go b/server/server.go index a348aad..a04af28 100644 --- a/server/server.go +++ b/server/server.go @@ -108,7 +108,6 @@ func (s *Server) handleWebSocket(c *gin.Context) { go func() { ticker := time.NewTicker(1 * time.Second) for range ticker.C { - streamer.GlobalStreamer.TruncateOutput() currentVideoPath, _ := streamer.GlobalStreamer.GetCurrentVideoPath() s.Broadcast(mywebsocket.Date{ Timestamp: time.Now().UnixMilli(), diff --git a/streamer/streamer.go b/streamer/streamer.go index 451a777..2033bf7 100644 --- a/streamer/streamer.go +++ b/streamer/streamer.go @@ -8,45 +8,55 @@ import ( "io" "live-streamer/config" "log" - "math" "os/exec" "strings" "sync" "time" ) -type Streamer struct { - videoList []config.InputItem +type playState struct { currentVideoIndex int + manualControl bool cmd *exec.Cmd ctx context.Context cancel context.CancelFunc - output strings.Builder - manualControl bool - mu sync.Mutex +} + +type Streamer struct { + playStateMu sync.RWMutex + playState playState + + videoMu sync.RWMutex + videoList []config.InputItem + + outputMu sync.RWMutex + output strings.Builder } var GlobalStreamer *Streamer func NewStreamer(videoList []config.InputItem) *Streamer { GlobalStreamer = &Streamer{ - videoList: videoList, - currentVideoIndex: 0, - cmd: nil, - ctx: nil, + videoList: videoList, + playState: playState{}, + output: strings.Builder{}, } return GlobalStreamer } func (s *Streamer) start() { - s.mu.Lock() - s.ctx, s.cancel = context.WithCancel(context.Background()) - currentVideo := s.videoList[s.currentVideoIndex] + s.playStateMu.Lock() + s.playState.ctx, s.playState.cancel = context.WithCancel(context.Background()) + currentVideo := s.videoList[s.playState.currentVideoIndex] videoPath := currentVideo.Path - s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) - s.mu.Unlock() + s.playState.cmd = exec.CommandContext(s.playState.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) + cmd := s.playState.cmd + ctx := s.playState.ctx + s.playStateMu.Unlock() + s.writeOutput(fmt.Sprintln("start stream: ", videoPath)) - pipe, err := s.cmd.StderrPipe() + + pipe, err := cmd.StderrPipe() if err != nil { log.Printf("failed to get pipe: %v", err) return @@ -54,25 +64,29 @@ func (s *Streamer) start() { reader := bufio.NewReader(pipe) - if err := s.cmd.Start(); err != nil { + if err := cmd.Start(); err != nil { s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err)) return } go s.log(reader) - <-s.ctx.Done() + <-ctx.Done() s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath)) - if s.manualControl { - s.manualControl = false + s.playStateMu.Lock() + if s.playState.manualControl { + // manualing change video, don't increase currentVideoIndex + s.playState.manualControl = false } else { - // stream next video - s.currentVideoIndex++ - if s.currentVideoIndex >= len(s.videoList) { - s.currentVideoIndex = 0 + s.playState.currentVideoIndex++ + s.videoMu.RLock() + if s.playState.currentVideoIndex >= len(s.videoList) { + s.playState.currentVideoIndex = 0 } + s.videoMu.RUnlock() } + s.playStateMu.Unlock() } func (s *Streamer) Stream() { @@ -86,83 +100,123 @@ func (s *Streamer) Stream() { } func (s *Streamer) Stop() { - s.mu.Lock() - defer s.mu.Unlock() - if s.cancel != nil { - stopped := make(chan error) - go func() { - if s.cmd != nil { - stopped <- s.cmd.Wait() - } - }() - s.cancel() - if s.cmd != nil && s.cmd.Process != nil { - select { - case <-stopped: - break - case <-time.After(3 * time.Second): - _ = s.cmd.Process.Kill() - break - } - s.cmd = nil + s.playStateMu.Lock() + cancel := s.playState.cancel + cmd := s.playState.cmd + s.playState.cancel = nil + s.playState.cmd = nil + s.playStateMu.Unlock() + + if cancel == nil || cmd == nil { + return + } + + stopped := make(chan error, 1) + go func() { + if cmd.Process != nil { + stopped <- cmd.Wait() + } else { + stopped <- nil } + }() + + cancel() + + if cmd.Process != nil { + select { + case <-stopped: + case <-time.After(3 * time.Second): + _ = cmd.Process.Kill() + } + close(stopped) } } -func (s *Streamer) writeOutput(str string) { - s.mu.Lock() - defer s.mu.Unlock() - s.output.WriteString(str) -} - func (s *Streamer) Add(videoPath string) { - s.mu.Lock() - defer s.mu.Unlock() + s.videoMu.Lock() + defer s.videoMu.Unlock() s.videoList = append(s.videoList, config.InputItem{Path: videoPath}) } func (s *Streamer) Remove(videoPath string) { - s.mu.Lock() - defer s.mu.Unlock() + var needStop bool // removed video is current playing + var removeIndex int = -1 + + s.videoMu.Lock() for i, item := range s.videoList { if item.Path == videoPath { - s.videoList = append(s.videoList[:i], s.videoList[i+1:]...) - if s.currentVideoIndex >= len(s.videoList) { - s.currentVideoIndex = 0 - } - if s.currentVideoIndex == i { - s.Stop() - } + removeIndex = i + + s.playStateMu.RLock() + needStop = (s.playState.currentVideoIndex == i) + s.playStateMu.RUnlock() + break } } + + if removeIndex >= 0 && removeIndex < len(s.videoList) { + oldLen := len(s.videoList) + s.videoList = append(s.videoList[:removeIndex], s.videoList[removeIndex+1:]...) + + s.playStateMu.Lock() + if s.playState.currentVideoIndex >= oldLen-1 { + s.playState.currentVideoIndex = 0 + } + s.playStateMu.Unlock() + } + s.videoMu.Unlock() + + if needStop { + s.Stop() + } } func (s *Streamer) Prev() { - s.mu.Lock() - s.manualControl = true - s.currentVideoIndex-- - if s.currentVideoIndex < 0 { - s.currentVideoIndex = len(s.videoList) - 1 + s.videoMu.RLock() + videoLen := len(s.videoList) + if videoLen == 0 { + return } - s.mu.Unlock() + s.videoMu.RUnlock() + + s.playStateMu.Lock() + s.playState.manualControl = true + s.playState.currentVideoIndex-- + if s.playState.currentVideoIndex < 0 { + s.playState.currentVideoIndex = videoLen - 1 + } + s.playStateMu.Unlock() + s.Stop() } func (s *Streamer) Next() { - s.mu.Lock() - s.manualControl = true - s.currentVideoIndex++ - if s.currentVideoIndex >= len(s.videoList) { - s.currentVideoIndex = 0 + s.videoMu.RLock() + videoLen := len(s.videoList) + if videoLen == 0 { + return } - s.mu.Unlock() + s.videoMu.RUnlock() + + s.playStateMu.Lock() + s.playState.manualControl = true + s.playState.currentVideoIndex++ + if s.playState.currentVideoIndex >= videoLen { + s.playState.currentVideoIndex = 0 + } + s.playStateMu.Unlock() + s.Stop() } func (s *Streamer) log(reader *bufio.Reader) { + s.playStateMu.RLock() + ctx := s.playState.ctx + s.playStateMu.RUnlock() + select { - case <-s.ctx.Done(): + case <-ctx.Done(): return default: if !config.GlobalConfig.Log.PlayState { @@ -187,23 +241,23 @@ func (s *Streamer) log(reader *bufio.Reader) { } func (s *Streamer) GetCurrentVideoPath() (string, error) { - s.mu.Lock() - defer s.mu.Unlock() + s.videoMu.RLock() + defer s.videoMu.RUnlock() if len(s.videoList) == 0 { return "", errors.New("no video streaming") } - return s.videoList[s.currentVideoIndex].Path, nil + return s.videoList[s.GetCurrentIndex()].Path, nil } func (s *Streamer) GetVideoList() []config.InputItem { - s.mu.Lock() - defer s.mu.Unlock() + s.videoMu.RLock() + defer s.videoMu.RUnlock() return s.videoList } func (s *Streamer) GetVideoListPath() []string { - s.mu.Lock() - defer s.mu.Unlock() + s.videoMu.RLock() + defer s.videoMu.RUnlock() var videoList []string for _, item := range s.videoList { videoList = append(videoList, item.Path) @@ -212,30 +266,23 @@ func (s *Streamer) GetVideoListPath() []string { } func (s *Streamer) GetCurrentIndex() int { - s.mu.Lock() - defer s.mu.Unlock() - return s.currentVideoIndex + s.playStateMu.RLock() + defer s.playStateMu.RUnlock() + return s.playState.currentVideoIndex +} + +func (s *Streamer) writeOutput(str string) { + s.outputMu.Lock() + defer s.outputMu.Unlock() + s.output.WriteString(str) } func (s *Streamer) GetOutput() string { - s.mu.Lock() - defer s.mu.Unlock() + s.outputMu.RLock() + defer s.outputMu.RUnlock() return s.output.String() } -func (s *Streamer) TruncateOutput() int { - s.mu.Lock() - defer s.mu.Unlock() - currentSize := s.output.Len() - if currentSize > math.MaxInt { - newStart := currentSize - math.MaxInt - trimmedOutput := s.output.String()[newStart:] - s.output.Reset() - s.output.WriteString(trimmedOutput) - } - return currentSize -} - func (s *Streamer) Close() { s.Stop() }