// Package streamer drives ffmpeg to push a playlist of videos to an RTMP
// endpoint. All playlist and process state is owned by a single actor
// goroutine; the exported methods only exchange messages with it.
package streamer

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"os/exec"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"

	"live-streamer/model"
)

// Streamer plays a list of videos one after another through ffmpeg.
// Create it with NewStreamer and release it with Close.
type Streamer struct {
	// mailbox carries requests from the public API to the actor goroutine.
	mailbox chan Message
	// ended carries "ffmpeg process finished" notifications from the process
	// watcher goroutines to the actor goroutine.
	ended chan streamEnd
	// state is read and written by the actor goroutine only.
	state *streamerState

	// output is owned by the handleOutput goroutine only.
	output      strings.Builder
	outputQueue chan string
	// outputReq serialises reads of output with the writes to it.
	outputReq chan chan string

	// wg tracks in-flight message handlers (except the close handler).
	wg sync.WaitGroup
	// close is closed at the start of shutdown; every goroutine and every
	// public method selects on it so nothing blocks on a dead actor.
	close chan any
	// closed is closed once shutdown has finished; it releases Close().
	closed chan any

	log        *zap.Logger
	option     *Option
	rtmpServer string
	streamKey  string
}

// Option holds the optional settings of a Streamer.
type Option struct {
	// FFmpegArgs are extra "flag: value" pairs placed between the input and
	// the output URL. String values may contain the placeholders
	// {{filepath}} and {{filename}}.
	FFmpegArgs map[string]any
	// ShowFFmpegOutput makes ffmpeg's stderr part of GetOutput().
	ShowFFmpegOutput bool
}

// streamerState is the mutable state owned by the actor goroutine.
type streamerState struct {
	videoList         []model.VideoItem
	currentVideoIndex int
	// pendingStart records that a start was requested while the playlist was
	// empty; the next added video starts the stream.
	pendingStart bool

	// The fields below describe the running ffmpeg process; they are all
	// zero when nothing is running.
	cmd         *exec.Cmd
	cancel      context.CancelFunc
	waitDone    chan any
	playingPath string
}

// streamEnd is reported by a process watcher once its ffmpeg process exited.
type streamEnd struct {
	cmd  *exec.Cmd
	path string
}

// GlobalStreamer is the most recently created Streamer, or nil once that
// instance has been closed.
var GlobalStreamer *Streamer

// NewStreamer creates a Streamer for the given RTMP server and stream key and
// starts its background goroutines. Streaming itself begins on Start().
// A nil log or option is replaced by a no-op logger / empty Option.
// It returns an error when rtmpServer or streamKey is empty.
func NewStreamer(rtmpServer string, streamKey string, videoList []model.VideoItem, log *zap.Logger, option *Option) (*Streamer, error) {
	if rtmpServer == "" || streamKey == "" {
		return nil, errors.New("lack of args")
	}
	if log == nil {
		log = zap.NewNop()
	}
	if option == nil {
		option = &Option{}
	}
	s := &Streamer{
		mailbox: make(chan Message, 100),
		ended:   make(chan streamEnd),
		state: &streamerState{
			// Copy so later playlist edits never write into the caller's slice.
			videoList: cloneVideoList(videoList),
		},
		outputQueue: make(chan string, 100),
		outputReq:   make(chan chan string),
		close:       make(chan any),
		closed:      make(chan any),
		option:      option,
		log:         log,
		rtmpServer:  rtmpServer,
		streamKey:   streamKey,
	}
	GlobalStreamer = s
	go s.actorLoop()
	go s.handleOutput()
	return s, nil
}

// cloneVideoList returns an independent copy of list, keeping nil as nil.
func cloneVideoList(list []model.VideoItem) []model.VideoItem {
	if list == nil {
		return nil
	}
	out := make([]model.VideoItem, len(list))
	copy(out, list)
	return out
}

// actorLoop is the single goroutine that owns s.state. It returns right
// after the close message has been handled.
func (s *Streamer) actorLoop() {
	for {
		select {
		case <-s.close:
			return
		case ev := <-s.ended:
			s.handleStreamEnd(ev)
		case msg := <-s.mailbox:
			if _, ok := msg.(CloseMessage); ok {
				s.handleMessage(msg)
				return
			}
			s.wg.Add(1)
			s.handleMessage(msg)
			s.wg.Done()
		}
	}
}

// handleMessage dispatches one mailbox message to its handler.
func (s *Streamer) handleMessage(msg Message) {
	switch m := msg.(type) {
	case StartMessage:
		s.handleStart()
	case StopMessage:
		s.handleStop()
	case AddVideoMessage:
		s.handleAdd(m.Path)
	case RemoveVideoMessage:
		s.handleRemove(m.Path)
	case NextVideoMessage:
		s.handleNext()
	case PrevVideoMessage:
		s.handlePrev()
	case GetCurrentVideoMessage:
		s.handleGetCurrentVideo(m.Response)
	case GetVideoListMessage:
		s.handleGetVideoList(m.Response)
	case GetVideoListPathMessage:
		s.handleGetVideoListPath(m.Response)
	case GetCurrentIndexMessage:
		s.handleGetCurrentIndex(m.Response)
	case CloseMessage:
		s.handleClose()
	}
}

// handleStart launches ffmpeg for the current video. It does nothing when a
// process is already running, and defers the start until a video is added
// when the playlist is empty.
func (s *Streamer) handleStart() {
	if s.state.cmd != nil {
		// A second process would stream to the same endpoint concurrently.
		s.log.Debug("stream already running, start ignored", zap.String("path", s.state.playingPath))
		return
	}
	if len(s.state.videoList) == 0 {
		s.state.pendingStart = true
		return
	}
	s.state.pendingStart = false
	if s.state.currentVideoIndex < 0 || s.state.currentVideoIndex >= len(s.state.videoList) {
		s.state.currentVideoIndex = 0
	}

	currentVideo := s.state.videoList[s.state.currentVideoIndex]
	videoPath := currentVideo.Path

	ctx, cancel := context.WithCancel(context.Background())
	cmd := exec.CommandContext(ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...)
	pipe, err := cmd.StderrPipe() // ffmpeg sends all its messages to stderr
	if err != nil {
		cancel()
		s.log.Error("failed to get pipe", zap.Error(err))
		return
	}

	s.log.Info("start stream", zap.String("path", videoPath))
	s.writeOutput(fmt.Sprintln("start stream: ", videoPath))

	if err := cmd.Start(); err != nil {
		cancel()
		s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err))
		return
	}

	waitDone := make(chan any)
	s.state.cmd = cmd
	s.state.cancel = cancel
	s.state.waitDone = waitDone
	s.state.playingPath = videoPath

	// The watcher only touches the values passed to it, never s.state.
	go s.watchProcess(cmd, cancel, waitDone, bufio.NewReader(pipe), videoPath)
}

// watchProcess forwards ffmpeg's stderr, waits for the process to exit,
// closes waitDone and finally reports the end of the stream to the actor.
func (s *Streamer) watchProcess(cmd *exec.Cmd, cancel context.CancelFunc, waitDone chan any, stderr *bufio.Reader, videoPath string) {
	// Wait closes the stderr pipe, so all reads must complete before it.
	s.ffmpegLog(stderr)

	s.log.Debug("wait stream end", zap.String("path", videoPath))
	// The exit status is irrelevant here: a killed process and a finished
	// video are told apart by the actor, not by the error.
	_ = cmd.Wait()
	s.log.Debug("process stop", zap.String("path", videoPath))
	cancel()
	s.log.Debug("context cancel", zap.String("path", videoPath))
	s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath))

	// Release a handleStop waiting in the actor before notifying the actor,
	// otherwise the two would wait for each other.
	close(waitDone)

	select {
	case s.ended <- streamEnd{cmd: cmd, path: videoPath}:
	case <-s.close:
	}
}

// handleStreamEnd reacts to an exited ffmpeg process. When the process is
// still the current one the video ended on its own and the next one is
// started; otherwise it was stopped or replaced by a handler and is ignored.
func (s *Streamer) handleStreamEnd(ev streamEnd) {
	if ev.cmd != s.state.cmd {
		s.log.Debug("manually end", zap.String("path", ev.path))
		return
	}
	s.log.Debug("video end", zap.String("path", ev.path))

	s.state.cmd = nil
	s.state.cancel = nil
	s.state.waitDone = nil
	s.state.playingPath = ""

	if n := len(s.state.videoList); n > 0 {
		s.state.currentVideoIndex = (s.state.currentVideoIndex + 1) % n
	}
	s.handleStart()
}

// handleStop terminates the running ffmpeg process, if any, waiting up to
// three seconds for it to exit, and cancels a deferred start.
func (s *Streamer) handleStop() {
	s.state.pendingStart = false
	if s.state.cancel == nil || s.state.cmd == nil {
		return
	}
	videoPath := s.state.playingPath

	s.log.Debug("wait context to be cancelled", zap.String("path", videoPath))
	s.state.cancel()
	s.log.Debug("context has been cancelled", zap.String("path", videoPath))

	if s.state.cmd.Process != nil {
		s.log.Debug("wait ffmpeg process stop", zap.String("path", videoPath))
		select {
		case <-s.state.waitDone:
		case <-time.After(3 * time.Second):
			// Best effort: the process may already be gone.
			_ = s.state.cmd.Process.Kill()
		}
		s.log.Debug("ffmpeg process has stopped", zap.String("path", videoPath))
	}

	// Clearing cmd marks the pending end notification as stale, so the
	// stream is not advanced automatically after a requested stop.
	s.state.cmd = nil
	s.state.cancel = nil
	s.state.waitDone = nil
	s.state.playingPath = ""
}

// handleAdd appends a video to the playlist and starts streaming when a
// start was requested while the playlist was empty.
func (s *Streamer) handleAdd(path string) {
	s.state.videoList = append(s.state.videoList, model.VideoItem{Path: path})
	if s.state.pendingStart {
		s.handleStart()
	}
}

// handleRemove deletes the first playlist entry with the given path. The
// current index keeps pointing at the same video; when the current video
// itself is removed the stream restarts on the entry that took its place.
func (s *Streamer) handleRemove(path string) {
	removeIndex := -1
	for i := range s.state.videoList {
		if s.state.videoList[i].Path == path {
			removeIndex = i
			break
		}
	}
	if removeIndex < 0 {
		return
	}

	wasCurrent := removeIndex == s.state.currentVideoIndex
	s.state.videoList = append(s.state.videoList[:removeIndex], s.state.videoList[removeIndex+1:]...)

	switch {
	case removeIndex < s.state.currentVideoIndex:
		// Everything after the removed entry moved one slot down.
		s.state.currentVideoIndex--
	case s.state.currentVideoIndex >= len(s.state.videoList):
		// The last entry was current and is gone: wrap around.
		s.state.currentVideoIndex = 0
	}

	if wasCurrent {
		s.handleStop()
		s.handleStart()
	}
}

// handleNext switches the stream to the following video, wrapping around.
func (s *Streamer) handleNext() {
	n := len(s.state.videoList)
	if n == 0 {
		return
	}
	s.state.currentVideoIndex = (s.state.currentVideoIndex + 1) % n
	s.handleStop()
	s.handleStart()
}

// handlePrev switches the stream to the preceding video, wrapping around.
func (s *Streamer) handlePrev() {
	n := len(s.state.videoList)
	if n == 0 {
		return
	}
	s.state.currentVideoIndex = (s.state.currentVideoIndex - 1 + n) % n
	s.handleStop()
	s.handleStart()
}

// handleGetCurrentVideo replies with the path of the current video, or ""
// when there is none.
func (s *Streamer) handleGetCurrentVideo(response chan string) {
	idx := s.state.currentVideoIndex
	if idx < 0 || idx >= len(s.state.videoList) {
		response <- ""
		return
	}
	response <- s.state.videoList[idx].Path
}

// handleGetVideoList replies with a copy of the playlist so the receiver
// never shares memory with the actor.
func (s *Streamer) handleGetVideoList(response chan []model.VideoItem) {
	response <- cloneVideoList(s.state.videoList)
}

// handleGetVideoListPath replies with the paths of all playlist entries
// (nil for an empty playlist).
func (s *Streamer) handleGetVideoListPath(response chan []string) {
	var paths []string
	for _, item := range s.state.videoList {
		paths = append(paths, item.Path)
	}
	response <- paths
}

// handleGetCurrentIndex replies with the index of the current video.
func (s *Streamer) handleGetCurrentIndex(response chan int) {
	response <- s.state.currentVideoIndex
}

// handleClose shuts the streamer down. The channels that other goroutines
// send on are deliberately left open: closing s.close is the shutdown
// signal, and closing a channel with live senders would make them panic.
func (s *Streamer) handleClose() {
	close(s.close)
	s.handleStop()
	// Handlers run on this goroutine, so this normally returns at once.
	s.wg.Wait()
	if GlobalStreamer == s {
		GlobalStreamer = nil
	}
	close(s.closed)
}

// send delivers msg to the actor. It reports false, without blocking, when
// the streamer is shutting down or already closed.
func (s *Streamer) send(msg Message) bool {
	select {
	case <-s.close:
		return false
	default:
	}
	select {
	case s.mailbox <- msg:
		return true
	case <-s.close:
		return false
	}
}

// Public methods that send messages to the actor. After Close they are
// no-ops and the getters return zero values.

// Start begins streaming the current video. With an empty playlist the
// stream starts as soon as a video is added.
func (s *Streamer) Start() {
	s.send(StartMessage{})
}

// Stop terminates the running stream without advancing the playlist.
func (s *Streamer) Stop() {
	s.send(StopMessage{})
}

// Add appends the video at path to the playlist.
func (s *Streamer) Add(path string) {
	s.send(AddVideoMessage{Path: path})
}

// Remove deletes the first playlist entry whose path equals path.
func (s *Streamer) Remove(path string) {
	s.send(RemoveVideoMessage{Path: path})
}

// Next switches the stream to the next video in the playlist.
func (s *Streamer) Next() {
	s.send(NextVideoMessage{})
}

// Prev switches the stream to the previous video in the playlist.
func (s *Streamer) Prev() {
	s.send(PrevVideoMessage{})
}

// GetCurrentVideoPath returns the path of the current video, or "".
func (s *Streamer) GetCurrentVideoPath() string {
	response := make(chan string)
	if !s.send(GetCurrentVideoMessage{Response: response}) {
		return ""
	}
	select {
	case path := <-response:
		return path
	case <-s.close:
		return ""
	}
}

// GetVideoList returns a copy of the playlist.
func (s *Streamer) GetVideoList() []model.VideoItem {
	response := make(chan []model.VideoItem)
	if !s.send(GetVideoListMessage{Response: response}) {
		return nil
	}
	select {
	case list := <-response:
		return list
	case <-s.close:
		return nil
	}
}

// GetVideoListPath returns the paths of all playlist entries.
func (s *Streamer) GetVideoListPath() []string {
	response := make(chan []string)
	if !s.send(GetVideoListPathMessage{Response: response}) {
		return nil
	}
	select {
	case paths := <-response:
		return paths
	case <-s.close:
		return nil
	}
}

// GetCurrentIndex returns the playlist index of the current video.
func (s *Streamer) GetCurrentIndex() int {
	response := make(chan int)
	if !s.send(GetCurrentIndexMessage{Response: response}) {
		return 0
	}
	select {
	case idx := <-response:
		return idx
	case <-s.close:
		return 0
	}
}

// Close stops the stream and the background goroutines and returns once the
// shutdown has finished. It is safe to call more than once.
func (s *Streamer) Close() {
	s.send(CloseMessage{})
	<-s.closed
}

// handleOutput is the only goroutine touching s.output: it appends queued
// text, answers read requests, and drops the buffer on shutdown.
func (s *Streamer) handleOutput() {
	for {
		select {
		case text := <-s.outputQueue:
			s.output.WriteString(text)
		case reply := <-s.outputReq:
			reply <- s.output.String()
		case <-s.close:
			s.output.Reset()
			return
		}
	}
}

// GetOutput returns everything written to the output buffer so far, or ""
// once the streamer is closed.
func (s *Streamer) GetOutput() string {
	// Buffered so the output goroutine can never block on the reply.
	reply := make(chan string, 1)
	select {
	case s.outputReq <- reply:
		return <-reply
	case <-s.close:
		return ""
	}
}

// writeOutput queues str for the output buffer; the text is dropped when
// the streamer is shutting down.
func (s *Streamer) writeOutput(str string) {
	select {
	case s.outputQueue <- str:
	case <-s.close:
	}
}

// ffmpegLog copies ffmpeg's stderr until the pipe reports EOF or an error.
// Chunks go to the output buffer when ShowFFmpegOutput is set and to stdout
// when the logger runs at debug level.
func (s *Streamer) ffmpegLog(reader *bufio.Reader) {
	buf := make([]byte, 1024)
	for {
		n, err := reader.Read(buf)
		if n > 0 {
			chunk := string(buf[:n])
			if s.option.ShowFFmpegOutput {
				s.writeOutput(chunk)
			}
			if s.log.Level() == zap.DebugLevel {
				fmt.Print(chunk)
			}
		}
		if err != nil {
			if !errors.Is(err, io.EOF) {
				s.writeOutput(fmt.Sprintf("reading ffmpeg output error: %v\n", err))
			}
			return
		}
	}
}

// buildFFmpegArgs assembles the ffmpeg command line for videoItem:
// "-re", the optional -ss/-to range, the input, the configured extra
// arguments (in sorted key order, so the command line is reproducible) and
// finally the "<rtmpServer>/<streamKey>" output URL.
func (s *Streamer) buildFFmpegArgs(videoItem model.VideoItem) []string {
	videoPath := videoItem.Path
	filename := strings.TrimSuffix(filepath.Base(videoPath), filepath.Ext(videoPath))

	args := []string{"-re"}
	if videoItem.Start != "" {
		args = append(args, "-ss", videoItem.Start)
	}
	if videoItem.End != "" {
		args = append(args, "-to", videoItem.End)
	}
	args = append(args, "-i", videoPath)

	keys := make([]string, 0, len(s.option.FFmpegArgs))
	for k := range s.option.FFmpegArgs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	for _, k := range keys {
		args = append(args, k)
		v := s.option.FFmpegArgs[k]
		if str, ok := v.(string); ok {
			str = strings.ReplaceAll(str, "{{filepath}}", videoPath)
			str = strings.ReplaceAll(str, "{{filename}}", filename)
			args = append(args, str)
		} else {
			args = append(args, fmt.Sprint(v))
		}
	}

	args = append(args, fmt.Sprintf("%s/%s", s.rtmpServer, s.streamKey))
	s.log.Debug("build ffmpeg", zap.Strings("args", args))
	return args
}