diff --git a/streamer/streamer.go b/streamer/streamer.go index 07ba1a0..b018a41 100644 --- a/streamer/streamer.go +++ b/streamer/streamer.go @@ -2,6 +2,7 @@ package streamer import ( "bufio" + "context" "fmt" "io" "live-streamer/config" @@ -10,22 +11,19 @@ import ( "os/exec" "path/filepath" "strings" - "sync" "time" ) type Streamer struct { - playlist []config.InputItem + videoList []config.InputItem currentVideoIndex int cmd *exec.Cmd - stopped bool logFile *os.File - doneCh chan bool - mu sync.Mutex - manualNext bool + ctx context.Context + cancel context.CancelFunc } -func NewStreamer(playList []config.InputItem) *Streamer { +func NewStreamer(videoList []config.InputItem) *Streamer { logDir := "logs" if err := os.MkdirAll(logDir, 0755); err != nil { log.Printf("Error creating log directory: %v\n", err) @@ -36,23 +34,23 @@ func NewStreamer(playList []config.InputItem) *Streamer { log.Fatalf("Error opening log file: %v\n", err) } return &Streamer{ - playlist: playList, + videoList: videoList, currentVideoIndex: 0, cmd: nil, logFile: logFile, - doneCh: make(chan bool, 1), + ctx: nil, } } func (s *Streamer) Add(videoPath string) { - s.playlist = append(s.playlist, config.InputItem{Path: videoPath}) + s.videoList = append(s.videoList, config.InputItem{Path: videoPath}) } func (s *Streamer) Remove(videoPath string) { - for i, item := range s.playlist { + for i, item := range s.videoList { if item.Path == videoPath { - s.playlist = append(s.playlist[:i], s.playlist[i+1:]...) - if s.currentVideoIndex >= len(s.playlist) { + s.videoList = append(s.videoList[:i], s.videoList[i+1:]...) + if s.currentVideoIndex >= len(s.videoList) { s.currentVideoIndex = 0 } if s.currentVideoIndex == i { @@ -66,15 +64,14 @@ func (s *Streamer) Remove(videoPath string) { func (s *Streamer) Prev() { s.currentVideoIndex-- if s.currentVideoIndex < 0 { - s.currentVideoIndex = len(s.playlist) - 1 + s.currentVideoIndex = len(s.videoList) - 1 } s.start() } func (s *Streamer) Next() { - s.manualNext = true s.currentVideoIndex++ - if s.currentVideoIndex >= len(s.playlist) { + if s.currentVideoIndex >= len(s.videoList) { s.currentVideoIndex = 0 } s.start() @@ -82,11 +79,11 @@ func (s *Streamer) Next() { func (s *Streamer) Stream() { for { - if len(s.playlist) == 0 { + if len(s.videoList) == 0 { time.Sleep(time.Second) continue } - if s.currentVideoIndex >= len(s.playlist) { + if s.currentVideoIndex >= len(s.videoList) { s.currentVideoIndex = 0 } s.start() @@ -135,21 +132,15 @@ func (s *Streamer) buildFFmpegArgs(videoItem config.InputItem) []string { } func (s *Streamer) start() { - defer func() { - s.cmd = nil - }() - if s.cmd != nil && !s.stopped { - s.Stop() - } - s.mu.Lock() - s.stopped = false - s.mu.Unlock() + s.Stop() - currentVideo := s.playlist[s.currentVideoIndex] + s.ctx, s.cancel = context.WithCancel(context.Background()) + + currentVideo := s.videoList[s.currentVideoIndex] videoPath := currentVideo.Path log.Println("start stream: ", videoPath) - s.cmd = exec.Command("ffmpeg", s.buildFFmpegArgs(currentVideo)...) + s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) pipe, err := s.cmd.StderrPipe() if err != nil { @@ -165,72 +156,35 @@ func (s *Streamer) start() { return } - go func() { - defer func() { - if s.logFile != nil { - if err := s.logFile.Sync(); err != nil { - log.Printf("syncing log file error: %v\n", err) - } - } - }() - buf := make([]byte, 1024) - for { - n, err := reader.Read(buf) - if err != nil { - if err != io.EOF && !s.stopped { - log.Printf("reading ffmpeg error: %v\n", err) - } - break - } - if n > 0 { - timestamp := time.Now().Format("2006-01-02 15:04:05") - logLine := fmt.Sprintf("[%s] %s", timestamp, string(buf[:n])) - if s.logFile != nil { - if _, err := writer.WriteString(logLine); err != nil { - log.Printf("writing to log file error: %v\n", err) - } - if err := writer.Flush(); err != nil { - log.Printf("flushing writer error: %v\n", err) - } - } - } - } - }() + go s.log(reader, writer) - go func() { - err = s.cmd.Wait() - s.doneCh <- true - s.mu.Lock() - defer s.mu.Unlock() - if err != nil && !s.stopped { - log.Printf("ffmpeg streaming error: %v\nStart streaming next video\n", err) - s.Next() - } else { - log.Println("ffmpeg streaming stopped") - if !s.manualNext { - s.Next() - } + select { + case <-s.ctx.Done(): + log.Println("case <-s.ctx.Done") + if s.cmd != nil && s.cmd.Process != nil { + _ = s.cmd.Process.Kill() } - }() - - <-s.doneCh + case err := <-waitCmd(s.cmd): + log.Println("case err := <-waitCmd(s.cmd)") + if err != nil { + log.Printf("ffmpeg exited with error: %v", err) + } + s.Next() + } } func (s *Streamer) Stop() { - if s.cmd != nil && s.cmd.Process != nil { - s.stopped = true - _ = s.cmd.Process.Kill() - <-s.doneCh - s.cmd = nil + if s.cancel != nil { + s.cancel() } } func (s *Streamer) GetCurrentVideo() string { - return s.playlist[s.currentVideoIndex].Path + return s.videoList[s.currentVideoIndex].Path } func (s *Streamer) GetPlaylist() []config.InputItem { - return s.playlist + return s.videoList } func (s *Streamer) Close() { @@ -239,3 +193,43 @@ func (s *Streamer) Close() { s.logFile = nil } } + +func waitCmd(cmd *exec.Cmd) <-chan error { + ch := make(chan error, 1) + go func() { + ch <- cmd.Wait() + }() + return ch +} + +func (s *Streamer) log(reader *bufio.Reader, writer *bufio.Writer) { + defer func() { + if s.logFile != nil { + if err := s.logFile.Sync(); err != nil { + log.Printf("syncing log file error: %v\n", err) + } + } + }() + buf := make([]byte, 1024) + for { + n, err := reader.Read(buf) + if err != nil { + if err != io.EOF { + log.Printf("reading ffmpeg error: %v\n", err) + } + break + } + if n > 0 { + timestamp := time.Now().Format("2006-01-02 15:04:05") + logLine := fmt.Sprintf("[%s] %s", timestamp, string(buf[:n])) + if s.logFile != nil { + if _, err := writer.WriteString(logLine); err != nil { + log.Printf("writing to log file error: %v\n", err) + } + if err := writer.Flush(); err != nil { + log.Printf("flushing writer error: %v\n", err) + } + } + } + } +}