// Package streamer runs ffmpeg over a playlist of videos. Playlist and process
// state is owned by one actor goroutine that consumes messages from a mailbox.
package streamer

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"

	"live-streamer/config"
)

// Streamer plays the entries of a playlist by launching ffmpeg for each one.
// The exported methods post messages to mailbox; actorLoop handles them one at
// a time.
type Streamer struct {
	// mailbox carries the requests consumed by actorLoop.
	mailbox chan Message
	// state is read and written by the message handlers.
	state *streamerState
	// output accumulates the text passed to writeOutput; only handleOutput
	// touches it.
	output strings.Builder
	// outputQueue carries text waiting to be appended to output.
	outputQueue chan string
	// outputReq carries snapshot requests, so reads of output are serialised
	// with writes inside handleOutput.
	outputReq chan chan string
	// wg counts handlers in flight (all except the close handler) so that
	// handleClose can wait for them.
	wg sync.WaitGroup
}

// streamerState is the mutable state behind a Streamer.
type streamerState struct {
	// videoList is the playlist.
	videoList []config.InputItem
	// currentVideoIndex is the index into videoList of the selected entry.
	currentVideoIndex int
	// manualControl is set by handlers that queue their own Stop+Start, which
	// tells the stream watcher not to queue a NextVideoMessage as well.
	manualControl bool
	// cmd, ctx, cancel and waitDone belong to the most recently started
	// stream; waitDone is closed when that stream's watcher has finished.
	cmd      *exec.Cmd
	ctx      context.Context
	cancel   context.CancelFunc
	waitDone chan any
}

// GlobalStreamer is the Streamer most recently returned by NewStreamer.
var GlobalStreamer *Streamer

// cloneInputItems returns a shallow copy of items, keeping nil as nil.
func cloneInputItems(items []config.InputItem) []config.InputItem {
	if items == nil {
		return nil
	}
	out := make([]config.InputItem, len(items))
	copy(out, items)
	return out
}

// NewStreamer creates a Streamer for videoList, stores it in GlobalStreamer
// and starts its actor and output goroutines. The list is copied, so later
// playlist edits do not modify the caller's slice.
func NewStreamer(videoList []config.InputItem) *Streamer {
	s := &Streamer{
		mailbox:     make(chan Message, 100),
		state:       &streamerState{videoList: cloneInputItems(videoList)},
		outputQueue: make(chan string, 100),
		outputReq:   make(chan chan string),
	}
	GlobalStreamer = s
	go s.actorLoop()
	go s.handleOutput()
	return s
}

// actorLoop handles mailbox messages sequentially. Every message except the
// close message is counted in wg while its handler runs.
func (s *Streamer) actorLoop() {
	closeType := CloseMessage{}.messageType()
	for msg := range s.mailbox {
		if msg.messageType() == closeType {
			s.handleMessage(msg)
			continue
		}
		s.wg.Add(1)
		s.handleMessage(msg)
		s.wg.Done()
	}
}

// handleMessage dispatches msg to the handler for its concrete type. Messages
// of any other type are ignored.
func (s *Streamer) handleMessage(msg Message) {
	switch m := msg.(type) {
	case StartMessage:
		s.handleStart()
	case StopMessage:
		s.handleStop()
	case AddVideoMessage:
		s.handleAdd(m.Path)
	case RemoveVideoMessage:
		s.handleRemove(m.Path)
	case NextVideoMessage:
		s.handleNext()
	case PrevVideoMessage:
		s.handlePrev()
	case GetCurrentVideoMessage:
		s.handleGetCurrentVideo(m.Response)
	case GetVideoListMessage:
		s.handleGetVideoList(m.Response)
	case GetVideoListPathMessage:
		s.handleGetVideoListPath(m.Response)
	case GetCurrentIndexMessage:
		s.handleGetCurrentIndex(m.Response)
	case CloseMessage:
		s.handleClose()
	}
}

// handleStart launches ffmpeg for the current playlist entry and starts a
// watcher goroutine that forwards its stderr, waits for it to exit and then
// either queues the next video or clears manualControl.
//
// With an empty playlist it schedules another StartMessage one second later.
func (s *Streamer) handleStart() {
	if len(s.state.videoList) == 0 {
		// Retry from a timer instead of sleeping here, so the actor keeps
		// processing other messages (such as Add) in the meantime.
		time.AfterFunc(time.Second, func() { s.mailbox <- StartMessage{} })
		return
	}

	ctx, cancel := context.WithCancel(context.Background())
	currentVideo := s.state.videoList[s.state.currentVideoIndex]
	videoPath := currentVideo.Path
	cmd := exec.CommandContext(ctx, "ffmpeg", buildFFmpegArgs(currentVideo)...)
	done := make(chan any)

	s.state.ctx, s.state.cancel = ctx, cancel
	s.state.cmd = cmd
	s.state.waitDone = done

	s.writeOutput(fmt.Sprintln("start stream: ", videoPath))

	pipe, err := cmd.StderrPipe() // ffmpeg sends all messages to stderr
	if err != nil {
		log.Printf("failed to get pipe: %v", err)
		cancel()
		close(done)
		return
	}
	reader := bufio.NewReader(pipe)

	if err := cmd.Start(); err != nil {
		s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err))
		cancel()
		close(done)
		return
	}

	// A stream that has just started has not been interrupted yet. Clearing
	// the flag here stops a value left over from an auto-advance (which goes
	// through handleNext) from suppressing the next auto-advance.
	s.state.manualControl = false

	// The watcher uses only this stream's cmd, cancel and done, so a watcher
	// that outlives a stop timeout cannot cancel or signal a newer stream.
	go func() {
		defer close(done)
		// Reads from the pipe must finish before Wait, which closes it.
		s.log(reader)
		// The exit status is ignored: every termination is treated the same.
		_ = cmd.Wait()
		cancel()
		s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath))
		if !s.state.manualControl {
			s.mailbox <- NextVideoMessage{}
		} else {
			s.state.manualControl = false
		}
	}()
}

// handleStop cancels the most recent stream and waits up to three seconds for
// its watcher to finish, killing the process if it has not finished by then.
//
// NOTE(review): handleStop does not set manualControl, so after a plain Stop
// the watcher still queues a NextVideoMessage. Confirm that this is intended.
func (s *Streamer) handleStop() {
	if s.state.cancel == nil || s.state.cmd == nil {
		return
	}
	s.state.cancel()
	if s.state.cmd.Process == nil {
		return
	}
	select {
	case <-s.state.waitDone:
	case <-time.After(3 * time.Second):
		// Best effort: the process may already have exited.
		_ = s.state.cmd.Process.Kill()
	}
}

// handleAdd appends a playlist entry for path.
func (s *Streamer) handleAdd(path string) {
	s.state.videoList = append(s.state.videoList, config.InputItem{Path: path})
}

// handleRemove deletes the first playlist entry whose Path equals path and
// keeps currentVideoIndex pointing at the same video where possible. If the
// removed entry was the current one, the stream is restarted on the entry that
// took its place, or on the first entry when the last one was removed.
func (s *Streamer) handleRemove(path string) {
	removeIndex := -1
	for i, item := range s.state.videoList {
		if item.Path == path {
			removeIndex = i
			break
		}
	}
	if removeIndex < 0 {
		return
	}

	removedCurrent := removeIndex == s.state.currentVideoIndex
	list := s.state.videoList
	s.state.videoList = append(list[:removeIndex], list[removeIndex+1:]...)

	switch {
	case removeIndex < s.state.currentVideoIndex:
		// Everything after the removed entry moved down by one.
		s.state.currentVideoIndex--
	case s.state.currentVideoIndex >= len(s.state.videoList):
		s.state.currentVideoIndex = 0
	}

	if removedCurrent {
		// As in handleNext and handlePrev: the restart is queued here, so the
		// watcher must not queue a NextVideoMessage on top of it.
		s.state.manualControl = true
		s.mailbox <- StopMessage{}
		s.mailbox <- StartMessage{}
	}
}

// handleNext selects the following playlist entry, wrapping to the first, and
// queues a stop and a start. It does nothing when the playlist is empty.
//
// The two sends run on the actor goroutine and block while the mailbox is
// full.
func (s *Streamer) handleNext() {
	if len(s.state.videoList) == 0 {
		return
	}
	s.state.manualControl = true
	s.state.currentVideoIndex++
	if s.state.currentVideoIndex >= len(s.state.videoList) {
		s.state.currentVideoIndex = 0
	}
	s.mailbox <- StopMessage{}
	s.mailbox <- StartMessage{}
}

// handlePrev selects the preceding playlist entry, wrapping to the last, and
// queues a stop and a start. It does nothing when the playlist is empty.
func (s *Streamer) handlePrev() {
	if len(s.state.videoList) == 0 {
		return
	}
	s.state.manualControl = true
	s.state.currentVideoIndex--
	if s.state.currentVideoIndex < 0 {
		s.state.currentVideoIndex = len(s.state.videoList) - 1
	}
	s.mailbox <- StopMessage{}
	s.mailbox <- StartMessage{}
}

// handleGetCurrentVideo sends the Path of the current playlist entry on
// response, or "" when the playlist is empty.
func (s *Streamer) handleGetCurrentVideo(response chan string) {
	if len(s.state.videoList) == 0 {
		response <- ""
		return
	}
	response <- s.state.videoList[s.state.currentVideoIndex].Path
}

// handleGetVideoList sends a copy of the playlist on response. A copy is sent
// because handleRemove shifts elements of the internal slice in place.
func (s *Streamer) handleGetVideoList(response chan []config.InputItem) {
	response <- cloneInputItems(s.state.videoList)
}

// handleGetVideoListPath sends the Path of every playlist entry, in order, on
// response. The slice sent is nil when the playlist is empty.
func (s *Streamer) handleGetVideoListPath(response chan []string) {
	var paths []string
	for _, item := range s.state.videoList {
		paths = append(paths, item.Path)
	}
	response <- paths
}

// handleGetCurrentIndex sends currentVideoIndex on response.
func (s *Streamer) handleGetCurrentIndex(response chan int) {
	response <- s.state.currentVideoIndex
}

// handleClose waits for wg and then terminates the program with exit code 0.
func (s *Streamer) handleClose() {
	s.wg.Wait()
	os.Exit(0)
}

// Public methods that send messages to the actor.

// Start queues a request to start streaming the current playlist entry.
func (s *Streamer) Start() {
	s.mailbox <- StartMessage{}
}

// Stop queues a request to stop the running stream.
func (s *Streamer) Stop() {
	s.mailbox <- StopMessage{}
}

// Add queues a request to append path to the playlist.
func (s *Streamer) Add(path string) {
	s.mailbox <- AddVideoMessage{Path: path}
}

// Remove queues a request to delete the first playlist entry matching path.
func (s *Streamer) Remove(path string) {
	s.mailbox <- RemoveVideoMessage{Path: path}
}

// Next queues a request to switch to the following playlist entry.
func (s *Streamer) Next() {
	s.mailbox <- NextVideoMessage{}
}

// Prev queues a request to switch to the preceding playlist entry.
func (s *Streamer) Prev() {
	s.mailbox <- PrevVideoMessage{}
}

// GetCurrentVideoPath returns the Path of the current playlist entry, or ""
// when the playlist is empty. It blocks until the actor has answered.
func (s *Streamer) GetCurrentVideoPath() string {
	response := make(chan string)
	s.mailbox <- GetCurrentVideoMessage{Response: response}
	return <-response
}

// GetVideoList returns a copy of the playlist. It blocks until the actor has
// answered.
func (s *Streamer) GetVideoList() []config.InputItem {
	response := make(chan []config.InputItem)
	s.mailbox <- GetVideoListMessage{Response: response}
	return <-response
}

// GetVideoListPath returns the Path of every playlist entry, in order. It
// blocks until the actor has answered.
func (s *Streamer) GetVideoListPath() []string {
	response := make(chan []string)
	s.mailbox <- GetVideoListPathMessage{Response: response}
	return <-response
}

// GetCurrentIndex returns the index of the current playlist entry. It blocks
// until the actor has answered.
func (s *Streamer) GetCurrentIndex() int {
	response := make(chan int)
	s.mailbox <- GetCurrentIndexMessage{Response: response}
	return <-response
}

// Close queues a stop request followed by a close request; handling the close
// request terminates the program.
func (s *Streamer) Close() {
	s.mailbox <- StopMessage{}
	s.mailbox <- CloseMessage{}
}

// handleOutput owns s.output: it appends queued text and answers snapshot
// requests, so the builder is only ever used from this goroutine.
func (s *Streamer) handleOutput() {
	for {
		select {
		case text := <-s.outputQueue:
			s.output.WriteString(text)
		case reply := <-s.outputReq:
			reply <- s.output.String()
		}
	}
}

// GetOutput returns everything written through writeOutput so far.
func (s *Streamer) GetOutput() string {
	reply := make(chan string)
	s.outputReq <- reply
	return <-reply
}

// writeOutput queues str to be appended to the accumulated output.
func (s *Streamer) writeOutput(str string) {
	s.outputQueue <- str
}

// log reads reader until it returns an error. When play-state logging is
// enabled each chunk is forwarded to writeOutput and a non-EOF error is
// reported there; otherwise the data is discarded. The stream is drained in
// both cases so that a full stderr pipe cannot block ffmpeg.
func (s *Streamer) log(reader *bufio.Reader) {
	if !config.GlobalConfig.Log.PlayState {
		// Only draining matters here; a read error just ends the drain.
		_, _ = io.Copy(io.Discard, reader)
		return
	}
	buf := make([]byte, 1024)
	for {
		n, err := reader.Read(buf)
		if n > 0 {
			s.writeOutput(string(buf[:n]))
		}
		if err != nil {
			if err != io.EOF {
				s.writeOutput(fmt.Sprintf("reading ffmpeg output error: %v\n", err))
			}
			return
		}
	}
}