This commit is contained in:
Nite07 2024-10-23 16:37:43 +08:00
parent 1b0fb30743
commit 4b107ea59f
2 changed files with 50 additions and 29 deletions

View File

@ -7,6 +7,7 @@ import (
"live-streamer/utils" "live-streamer/utils"
"log" "log"
"os" "os"
"strings"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
) )
@ -32,6 +33,12 @@ func input() {
GlobalStreamer.Prev() GlobalStreamer.Prev()
case "next": case "next":
GlobalStreamer.Next() GlobalStreamer.Next()
case "quit":
GlobalStreamer.Close()
os.Exit(0)
case "list":
list := GlobalStreamer.GetVideoListPath()
log.Println(strings.Join(list, "\n"))
default: default:
log.Println("unknown command") log.Println("unknown command")
} }

View File

@ -11,6 +11,7 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
) )
@ -21,6 +22,7 @@ type Streamer struct {
logFile *os.File logFile *os.File
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
mu sync.Mutex
} }
func NewStreamer(videoList []config.InputItem) *Streamer { func NewStreamer(videoList []config.InputItem) *Streamer {
@ -54,7 +56,7 @@ func (s *Streamer) Remove(videoPath string) {
s.currentVideoIndex = 0 s.currentVideoIndex = 0
} }
if s.currentVideoIndex == i { if s.currentVideoIndex == i {
s.start() s.Stop()
} }
break break
} }
@ -66,7 +68,7 @@ func (s *Streamer) Prev() {
if s.currentVideoIndex < 0 { if s.currentVideoIndex < 0 {
s.currentVideoIndex = len(s.videoList) - 1 s.currentVideoIndex = len(s.videoList) - 1
} }
s.start() s.Stop()
} }
func (s *Streamer) Next() { func (s *Streamer) Next() {
@ -74,7 +76,7 @@ func (s *Streamer) Next() {
if s.currentVideoIndex >= len(s.videoList) { if s.currentVideoIndex >= len(s.videoList) {
s.currentVideoIndex = 0 s.currentVideoIndex = 0
} }
s.start() s.Stop()
} }
func (s *Streamer) Stream() { func (s *Streamer) Stream() {
@ -83,9 +85,6 @@ func (s *Streamer) Stream() {
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue
} }
if s.currentVideoIndex >= len(s.videoList) {
s.currentVideoIndex = 0
}
s.start() s.start()
} }
} }
@ -140,7 +139,9 @@ func (s *Streamer) start() {
videoPath := currentVideo.Path videoPath := currentVideo.Path
log.Println("start stream: ", videoPath) log.Println("start stream: ", videoPath)
s.mu.Lock()
s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...)
s.mu.Unlock()
pipe, err := s.cmd.StderrPipe() pipe, err := s.cmd.StderrPipe()
if err != nil { if err != nil {
@ -158,25 +159,33 @@ func (s *Streamer) start() {
go s.log(reader, writer) go s.log(reader, writer)
select { <-s.ctx.Done()
case <-s.ctx.Done(): log.Printf("stop stream: %s", videoPath)
log.Println("case <-s.ctx.Done")
if s.cmd != nil && s.cmd.Process != nil {
_ = s.cmd.Process.Kill()
s.cmd = nil
}
case err := <-waitCmd(s.cmd):
log.Println("case err := <-waitCmd(s.cmd)")
if err != nil {
log.Printf("ffmpeg exited with error: %v", err)
}
s.Next()
}
} }
func (s *Streamer) Stop() { func (s *Streamer) Stop() {
if s.cancel != nil { if s.cancel != nil {
done := make(chan error)
go func() {
done <- s.cmd.Wait()
}()
s.cancel() s.cancel()
s.mu.Lock()
if s.cmd != nil && s.cmd.Process != nil {
select {
case <-done:
break
case <-time.After(3 * time.Second):
// log.Printf("ffmpeg process is still running, killing it...\n")
if !s.cmd.ProcessState.Exited() {
_ = s.cmd.Process.Kill()
}
break
}
s.cmd = nil
}
s.mu.Unlock()
} }
} }
@ -184,23 +193,28 @@ func (s *Streamer) GetCurrentVideo() string {
return s.videoList[s.currentVideoIndex].Path return s.videoList[s.currentVideoIndex].Path
} }
func (s *Streamer) GetPlaylist() []config.InputItem { func (s *Streamer) GetVideoList() []config.InputItem {
return s.videoList return s.videoList
} }
func (s *Streamer) GetVideoListPath() []string {
var videoList []string
for _, item := range s.videoList {
videoList = append(videoList, item.Path)
}
return videoList
}
func (s *Streamer) GetCurrentIndex() int {
return s.currentVideoIndex
}
func (s *Streamer) Close() { func (s *Streamer) Close() {
if s.logFile != nil { if s.logFile != nil {
s.logFile.Close() s.logFile.Close()
s.logFile = nil s.logFile = nil
} }
} s.Stop()
func waitCmd(cmd *exec.Cmd) <-chan error {
ch := make(chan error, 1)
go func() {
ch <- cmd.Wait()
}()
return ch
} }
func (s *Streamer) log(reader *bufio.Reader, writer *bufio.Writer) { func (s *Streamer) log(reader *bufio.Reader, writer *bufio.Writer) {