diff --git a/main.go b/main.go index d62d6c4..a69a599 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "live-streamer/utils" "log" "os" + "strings" "github.com/fsnotify/fsnotify" ) @@ -32,6 +33,12 @@ func input() { GlobalStreamer.Prev() case "next": GlobalStreamer.Next() + case "quit": + GlobalStreamer.Close() + os.Exit(0) + case "list": + list := GlobalStreamer.GetVideoListPath() + log.Println(strings.Join(list, "\n")) default: log.Println("unknown command") } diff --git a/streamer/streamer.go b/streamer/streamer.go index 6409221..3613119 100644 --- a/streamer/streamer.go +++ b/streamer/streamer.go @@ -11,6 +11,7 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "time" ) @@ -21,6 +22,7 @@ type Streamer struct { logFile *os.File ctx context.Context cancel context.CancelFunc + mu sync.Mutex } func NewStreamer(videoList []config.InputItem) *Streamer { @@ -54,7 +56,7 @@ func (s *Streamer) Remove(videoPath string) { s.currentVideoIndex = 0 } if s.currentVideoIndex == i { - s.start() + s.Stop() } break } @@ -66,7 +68,7 @@ func (s *Streamer) Prev() { if s.currentVideoIndex < 0 { s.currentVideoIndex = len(s.videoList) - 1 } - s.start() + s.Stop() } func (s *Streamer) Next() { @@ -74,7 +76,7 @@ func (s *Streamer) Next() { if s.currentVideoIndex >= len(s.videoList) { s.currentVideoIndex = 0 } - s.start() + s.Stop() } func (s *Streamer) Stream() { @@ -83,9 +85,6 @@ func (s *Streamer) Stream() { time.Sleep(time.Second) continue } - if s.currentVideoIndex >= len(s.videoList) { - s.currentVideoIndex = 0 - } s.start() } } @@ -140,7 +139,9 @@ func (s *Streamer) start() { videoPath := currentVideo.Path log.Println("start stream: ", videoPath) + s.mu.Lock() s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) + s.mu.Unlock() pipe, err := s.cmd.StderrPipe() if err != nil { @@ -158,25 +159,33 @@ func (s *Streamer) start() { go s.log(reader, writer) - select { - case <-s.ctx.Done(): - log.Println("case <-s.ctx.Done") - if s.cmd != nil && s.cmd.Process != nil { - _ = s.cmd.Process.Kill() - s.cmd = nil - } - case err := <-waitCmd(s.cmd): - log.Println("case err := <-waitCmd(s.cmd)") - if err != nil { - log.Printf("ffmpeg exited with error: %v", err) - } - s.Next() - } + <-s.ctx.Done() + log.Printf("stop stream: %s", videoPath) } func (s *Streamer) Stop() { if s.cancel != nil { + done := make(chan error) + go func() { + done <- s.cmd.Wait() + }() + s.cancel() + s.mu.Lock() + if s.cmd != nil && s.cmd.Process != nil { + select { + case <-done: + break + case <-time.After(3 * time.Second): + // log.Printf("ffmpeg process is still running, killing it...\n") + if !s.cmd.ProcessState.Exited() { + _ = s.cmd.Process.Kill() + } + break + } + s.cmd = nil + } + s.mu.Unlock() } } @@ -184,23 +193,28 @@ func (s *Streamer) GetCurrentVideo() string { return s.videoList[s.currentVideoIndex].Path } -func (s *Streamer) GetPlaylist() []config.InputItem { +func (s *Streamer) GetVideoList() []config.InputItem { return s.videoList } +func (s *Streamer) GetVideoListPath() []string { + var videoList []string + for _, item := range s.videoList { + videoList = append(videoList, item.Path) + } + return videoList +} + +func (s *Streamer) GetCurrentIndex() int { + return s.currentVideoIndex +} + func (s *Streamer) Close() { if s.logFile != nil { s.logFile.Close() s.logFile = nil } -} - -func waitCmd(cmd *exec.Cmd) <-chan error { - ch := make(chan error, 1) - go func() { - ch <- cmd.Wait() - }() - return ch + s.Stop() } func (s *Streamer) log(reader *bufio.Reader, writer *bufio.Writer) {