refactor: Streamer

breaking change: change PlayConfig.CustomArgs type
This commit is contained in:
Nite07 2024-10-25 12:51:49 +08:00
parent 1c1028d857
commit 048dfecbae
7 changed files with 358 additions and 230 deletions

View File

@ -41,7 +41,7 @@
"audio_bitrate": "128k", "audio_bitrate": "128k",
"audio_sample_rate": 44100, "audio_sample_rate": 44100,
"output_format": "flv", "output_format": "flv",
"custom_args": "" "custom_args": []
}, },
"output": { "output": {
"rtmp_server": "rtmp://live-push.example.com/live", "rtmp_server": "rtmp://live-push.example.com/live",

View File

@ -35,7 +35,7 @@ type PlayConfig struct {
AudioBitrate string `json:"audio_bitrate"` AudioBitrate string `json:"audio_bitrate"`
AudioSampleRate int `json:"audio_sample_rate"` AudioSampleRate int `json:"audio_sample_rate"`
OutputFormat string `json:"output_format"` OutputFormat string `json:"output_format"`
CustomArgs string `json:"custom_args"` CustomArgs []string `json:"custom_args"`
} }
type LogConfig struct { type LogConfig struct {

View File

@ -28,8 +28,8 @@ func main() {
GlobalStreamer = streamer.NewStreamer(config.GlobalConfig.VideoList) GlobalStreamer = streamer.NewStreamer(config.GlobalConfig.VideoList)
go startWatcher() go startWatcher()
go input() go input()
GlobalStreamer.Stream() GlobalStreamer.Start()
GlobalStreamer.Close() select {}
} }
func input() { func input() {

View File

@ -367,6 +367,7 @@
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/js/bootstrap.bundle.min.js"></script> <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/js/bootstrap.bundle.min.js"></script>
<script> <script>
let ws; let ws;
let shouldAutoScroll = true;
function connectWebSocket() { function connectWebSocket() {
const token = document.getElementById("token-input").value; const token = document.getElementById("token-input").value;
@ -388,7 +389,9 @@
ws.onmessage = function (evt) { ws.onmessage = function (evt) {
let obj = JSON.parse(evt.data); let obj = JSON.parse(evt.data);
messagesArea.value = obj.output; messagesArea.value = obj.output;
// messagesArea.scrollTop = messagesArea.scrollHeight; if (shouldAutoScroll) {
messagesArea.scrollTop = messagesArea.scrollHeight;
}
document.querySelector("#current-video>span").innerHTML = document.querySelector("#current-video>span").innerHTML =
obj.currentVideoPath; obj.currentVideoPath;
const listContainer = document.querySelector( const listContainer = document.querySelector(
@ -425,15 +428,32 @@
function validateToken() { function validateToken() {
const tokenInput = document.getElementById("token-input"); const tokenInput = document.getElementById("token-input");
const token = tokenInput.value || getStoredToken(); const token = tokenInput.value || getStoredToken();
if (token) { if (token) {
tokenInput.value = token; tokenInput.value = token;
connectWebSocket(); connectWebSocket();
} }
} }
document
.getElementById("token-input")
.addEventListener("keydown", function (event) {
if (event.key === "Enter") {
validateToken();
}
});
const messagesArea = document.getElementById("messages"); const messagesArea = document.getElementById("messages");
messagesArea.addEventListener("scroll", function () {
const maxScrollTop =
messagesArea.scrollHeight - messagesArea.clientHeight;
if (messagesArea.scrollTop === maxScrollTop) {
shouldAutoScroll = true;
} else {
shouldAutoScroll = false;
}
});
function sendWs(type) { function sendWs(type) {
if (ws && ws.readyState === WebSocket.OPEN) { if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(`{ "type": "${type}" }`); ws.send(`{ "type": "${type}" }`);

45
streamer/helper.go Normal file
View File

@ -0,0 +1,45 @@
package streamer
import (
"fmt"
"live-streamer/config"
"log"
)
func buildFFmpegArgs(videoItem config.InputItem) []string {
videoPath := videoItem.Path
args := []string{"-re"}
if videoItem.Start != "" {
args = append(args, "-ss", videoItem.Start)
}
if videoItem.End != "" {
args = append(args, "-to", videoItem.End)
}
args = append(args,
"-i", videoPath,
"-c:v", config.GlobalConfig.Play.VideoCodec,
"-preset", config.GlobalConfig.Play.Preset,
"-crf", fmt.Sprintf("%d", config.GlobalConfig.Play.CRF),
"-maxrate", config.GlobalConfig.Play.MaxRate,
"-bufsize", config.GlobalConfig.Play.BufSize,
"-vf", fmt.Sprintf("scale=%s", config.GlobalConfig.Play.Scale),
"-r", fmt.Sprintf("%d", config.GlobalConfig.Play.FrameRate),
"-c:a", config.GlobalConfig.Play.AudioCodec,
"-b:a", config.GlobalConfig.Play.AudioBitrate,
"-ar", fmt.Sprintf("%d", config.GlobalConfig.Play.AudioSampleRate),
"-f", config.GlobalConfig.Play.OutputFormat,
)
if len(config.GlobalConfig.Play.CustomArgs) != 0 {
args = append(args, config.GlobalConfig.Play.CustomArgs...)
}
args = append(args, fmt.Sprintf("%s/%s", config.GlobalConfig.Output.RTMPServer, config.GlobalConfig.Output.StreamKey))
log.Println("ffmpeg args: ", args)
return args
}

43
streamer/message.go Normal file
View File

@ -0,0 +1,43 @@
package streamer
import "live-streamer/config"
type Message interface {
messageType() string
}
type StartMessage struct{}
type StopMessage struct{}
type AddVideoMessage struct {
Path string
}
type RemoveVideoMessage struct {
Path string
}
type NextVideoMessage struct{}
type PrevVideoMessage struct{}
type GetCurrentVideoMessage struct {
Response chan string
}
type GetVideoListMessage struct {
Response chan []config.InputItem
}
type GetVideoListPathMessage struct {
Response chan []string
}
type GetCurrentIndexMessage struct {
Response chan int
}
type CloseMessage struct{}
func (m StartMessage) messageType() string { return "start" }
func (m StopMessage) messageType() string { return "stop" }
func (m AddVideoMessage) messageType() string { return "add" }
func (m RemoveVideoMessage) messageType() string { return "remove" }
func (m NextVideoMessage) messageType() string { return "next" }
func (m PrevVideoMessage) messageType() string { return "prev" }
func (m GetCurrentVideoMessage) messageType() string { return "getCurrentVideo" }
func (m GetVideoListMessage) messageType() string { return "getVideoList" }
func (m GetVideoListPathMessage) messageType() string { return "getVideoListPath" }
func (m GetCurrentIndexMessage) messageType() string { return "getCurrentIndex" }
func (m CloseMessage) messageType() string { return "close" }

View File

@ -14,7 +14,19 @@ import (
"time" "time"
) )
type playState struct { type Streamer struct {
mailbox chan Message
state *streamerState
output strings.Builder
outputQueue chan string
outputReq chan chan string // address output concurrency security issue
wg sync.WaitGroup // wait all handlers(except closehandler) to finish before closure
}
type streamerState struct {
videoList []config.InputItem
currentVideoIndex int currentVideoIndex int
manualControl bool manualControl bool
cmd *exec.Cmd cmd *exec.Cmd
@ -23,42 +35,81 @@ type playState struct {
waitDone chan any waitDone chan any
} }
type Streamer struct {
playStateMu sync.RWMutex
playState playState
videoMu sync.RWMutex
videoList []config.InputItem
outputMu sync.RWMutex
output strings.Builder
}
var GlobalStreamer *Streamer var GlobalStreamer *Streamer
func NewStreamer(videoList []config.InputItem) *Streamer { func NewStreamer(videoList []config.InputItem) *Streamer {
GlobalStreamer = &Streamer{ s := &Streamer{
mailbox: make(chan Message, 100),
state: &streamerState{
videoList: videoList, videoList: videoList,
playState: playState{}, },
output: strings.Builder{}, output: strings.Builder{},
outputQueue: make(chan string, 100),
outputReq: make(chan chan string),
} }
return GlobalStreamer GlobalStreamer = s
go s.actorLoop()
go s.handleOutput()
return s
} }
func (s *Streamer) start() { func (s *Streamer) actorLoop() {
s.playStateMu.Lock() for msg := range s.mailbox {
s.playState.ctx, s.playState.cancel = context.WithCancel(context.Background()) if msg.messageType() != CloseMessage.messageType(CloseMessage{}) {
cancel := s.playState.cancel log.Printf("handle %s start\n", msg.messageType())
currentVideo := s.videoList[s.playState.currentVideoIndex] s.wg.Add(1)
s.handleMessage(msg)
s.wg.Done()
log.Printf("handle %s end\n", msg.messageType())
} else {
s.handleMessage(msg)
}
}
}
func (s *Streamer) handleMessage(msg Message) {
switch m := msg.(type) {
case StartMessage:
s.handleStart()
case StopMessage:
s.handleStop()
case AddVideoMessage:
s.handleAdd(m.Path)
case RemoveVideoMessage:
s.handleRemove(m.Path)
case NextVideoMessage:
s.handleNext()
case PrevVideoMessage:
s.handlePrev()
case GetCurrentVideoMessage:
s.handleGetCurrentVideo(m.Response)
case GetVideoListMessage:
s.handleGetVideoList(m.Response)
case GetVideoListPathMessage:
s.handleGetVideoListPath(m.Response)
case GetCurrentIndexMessage:
s.handleGetCurrentIndex(m.Response)
case CloseMessage:
s.handleClose()
}
}
func (s *Streamer) handleStart() {
if len(s.state.videoList) == 0 {
time.Sleep(time.Second)
s.mailbox <- StartMessage{}
return
}
s.state.ctx, s.state.cancel = context.WithCancel(context.Background())
currentVideo := s.state.videoList[s.state.currentVideoIndex]
videoPath := currentVideo.Path videoPath := currentVideo.Path
s.playState.cmd = exec.CommandContext(s.playState.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) s.state.cmd = exec.CommandContext(s.state.ctx, "ffmpeg", buildFFmpegArgs(currentVideo)...)
s.playState.waitDone = make(chan any) s.state.waitDone = make(chan any)
cmd := s.playState.cmd
s.playStateMu.Unlock()
s.writeOutput(fmt.Sprintln("start stream: ", videoPath)) s.writeOutput(fmt.Sprintln("start stream: ", videoPath))
pipe, err := cmd.StderrPipe() pipe, err := s.state.cmd.StderrPipe() // ffmpeg send all messages to stderr
if err != nil { if err != nil {
log.Printf("failed to get pipe: %v", err) log.Printf("failed to get pipe: %v", err)
return return
@ -66,153 +117,211 @@ func (s *Streamer) start() {
reader := bufio.NewReader(pipe) reader := bufio.NewReader(pipe)
if err := cmd.Start(); err != nil { if err := s.state.cmd.Start(); err != nil {
s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err)) s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err))
return return
} }
go s.log(reader) go s.log(reader)
_ = cmd.Wait() go func() {
cancel() _ = s.state.cmd.Wait()
s.state.cancel()
s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath)) s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath))
s.playStateMu.Lock() if !s.state.manualControl {
if s.playState.manualControl { s.mailbox <- NextVideoMessage{}
// manualing change video, don't increase currentVideoIndex
s.playState.manualControl = false
} else { } else {
s.playState.currentVideoIndex++ s.state.manualControl = false
s.videoMu.RLock()
if s.playState.currentVideoIndex >= len(s.videoList) {
s.playState.currentVideoIndex = 0
}
s.videoMu.RUnlock()
}
close(s.playState.waitDone)
s.playStateMu.Unlock()
} }
func (s *Streamer) Stream() { close(s.state.waitDone)
for { }()
if len(s.videoList) == 0 {
time.Sleep(time.Second)
continue
}
s.start()
}
} }
func (s *Streamer) Stop() { func (s *Streamer) handleStop() {
s.playStateMu.Lock() if s.state.cancel == nil || s.state.cmd == nil {
cancel := s.playState.cancel
s.playState.cancel = nil
cmd := s.playState.cmd
s.playState.cmd = nil
done := s.playState.waitDone
s.playStateMu.Unlock()
if cancel == nil || cmd == nil {
return return
} }
cancel() s.state.cancel()
if cmd.Process != nil { if s.state.cmd.Process != nil {
select { select {
case <-done: case <-s.state.waitDone:
case <-time.After(3 * time.Second): case <-time.After(3 * time.Second):
_ = cmd.Process.Kill() _ = s.state.cmd.Process.Kill()
} }
} }
} }
func (s *Streamer) Add(videoPath string) { func (s *Streamer) handleAdd(path string) {
s.videoMu.Lock() s.state.videoList = append(s.state.videoList, config.InputItem{Path: path})
defer s.videoMu.Unlock()
s.videoList = append(s.videoList, config.InputItem{Path: videoPath})
} }
func (s *Streamer) Remove(videoPath string) { func (s *Streamer) handleRemove(path string) {
var needStop bool // removed video is current playing var needStop bool
var removeIndex int = -1 var removeIndex int = -1
s.videoMu.Lock() for i, item := range s.state.videoList {
for i, item := range s.videoList { if item.Path == path {
if item.Path == videoPath {
removeIndex = i removeIndex = i
needStop = (s.state.currentVideoIndex == i)
s.playStateMu.RLock()
needStop = (s.playState.currentVideoIndex == i)
s.playStateMu.RUnlock()
break break
} }
} }
if removeIndex >= 0 && removeIndex < len(s.videoList) { if removeIndex >= 0 && removeIndex < len(s.state.videoList) {
oldLen := len(s.videoList) oldLen := len(s.state.videoList)
s.videoList = append(s.videoList[:removeIndex], s.videoList[removeIndex+1:]...) s.state.videoList = append(s.state.videoList[:removeIndex], s.state.videoList[removeIndex+1:]...)
if s.state.currentVideoIndex >= oldLen-1 {
s.playStateMu.Lock() s.state.currentVideoIndex = 0
if s.playState.currentVideoIndex >= oldLen-1 {
s.playState.currentVideoIndex = 0
} }
s.playStateMu.Unlock()
} }
s.videoMu.Unlock()
if needStop { if needStop {
s.Stop() s.mailbox <- StopMessage{}
s.mailbox <- StartMessage{}
} }
} }
func (s *Streamer) Prev() { func (s *Streamer) handleNext() {
s.videoMu.RLock() if len(s.state.videoList) == 0 {
videoLen := len(s.videoList)
if videoLen == 0 {
return return
} }
s.videoMu.RUnlock()
s.playStateMu.Lock() s.state.manualControl = true
s.playState.manualControl = true s.state.currentVideoIndex++
s.playState.currentVideoIndex-- if s.state.currentVideoIndex >= len(s.state.videoList) {
if s.playState.currentVideoIndex < 0 { s.state.currentVideoIndex = 0
s.playState.currentVideoIndex = videoLen - 1
} }
s.playStateMu.Unlock()
s.Stop() s.mailbox <- StopMessage{}
s.mailbox <- StartMessage{}
}
func (s *Streamer) handlePrev() {
if len(s.state.videoList) == 0 {
return
}
s.state.manualControl = true
s.state.currentVideoIndex--
if s.state.currentVideoIndex < 0 {
s.state.currentVideoIndex = len(s.state.videoList) - 1
}
s.mailbox <- StopMessage{}
s.mailbox <- StartMessage{}
}
func (s *Streamer) handleGetCurrentVideo(response chan string) {
if len(s.state.videoList) == 0 {
response <- ""
return
}
response <- s.state.videoList[s.state.currentVideoIndex].Path
}
func (s *Streamer) handleGetVideoList(response chan []config.InputItem) {
response <- s.state.videoList
}
func (s *Streamer) handleGetVideoListPath(response chan []string) {
var paths []string
for _, item := range s.state.videoList {
paths = append(paths, item.Path)
}
response <- paths
}
func (s *Streamer) handleGetCurrentIndex(response chan int) {
response <- s.state.currentVideoIndex
}
func (s *Streamer) handleClose() {
s.wg.Wait()
os.Exit(0)
}
// Public methods that send messages to the actor
func (s *Streamer) Start() {
s.mailbox <- StartMessage{}
}
func (s *Streamer) Stop() {
s.mailbox <- StopMessage{}
}
func (s *Streamer) Add(path string) {
s.mailbox <- AddVideoMessage{Path: path}
}
func (s *Streamer) Remove(path string) {
s.mailbox <- RemoveVideoMessage{Path: path}
} }
func (s *Streamer) Next() { func (s *Streamer) Next() {
s.videoMu.RLock() s.mailbox <- NextVideoMessage{}
videoLen := len(s.videoList)
if videoLen == 0 {
return
} }
s.videoMu.RUnlock()
s.playStateMu.Lock() func (s *Streamer) Prev() {
s.playState.manualControl = true s.mailbox <- PrevVideoMessage{}
s.playState.currentVideoIndex++
if s.playState.currentVideoIndex >= videoLen {
s.playState.currentVideoIndex = 0
} }
s.playStateMu.Unlock()
s.Stop() func (s *Streamer) GetCurrentVideoPath() string {
response := make(chan string)
s.mailbox <- GetCurrentVideoMessage{Response: response}
return <-response
}
func (s *Streamer) GetVideoList() []config.InputItem {
response := make(chan []config.InputItem)
s.mailbox <- GetVideoListMessage{Response: response}
return <-response
}
func (s *Streamer) GetVideoListPath() []string {
response := make(chan []string)
s.mailbox <- GetVideoListPathMessage{Response: response}
return <-response
}
func (s *Streamer) GetCurrentIndex() int {
response := make(chan int)
s.mailbox <- GetCurrentIndexMessage{Response: response}
return <-response
}
func (s *Streamer) Close() {
s.mailbox <- StopMessage{}
s.mailbox <- CloseMessage{}
}
func (s *Streamer) handleOutput() {
for {
select {
case o := <-s.outputQueue:
s.output.WriteString(o)
case c := <-s.outputReq:
c <- s.output.String()
}
}
}
func (s *Streamer) GetOutput() string {
o := make(chan string)
s.outputReq <- o
return <-o
}
func (s *Streamer) writeOutput(str string) {
s.outputQueue <- str
} }
func (s *Streamer) log(reader *bufio.Reader) { func (s *Streamer) log(reader *bufio.Reader) {
s.playStateMu.RLock()
ctx := s.playState.ctx
s.playStateMu.RUnlock()
select { select {
case <-ctx.Done(): case <-s.state.ctx.Done():
return return
default: default:
if !config.GlobalConfig.Log.PlayState { if !config.GlobalConfig.Log.PlayState {
@ -222,9 +331,7 @@ func (s *Streamer) log(reader *bufio.Reader) {
for { for {
n, err := reader.Read(buf) n, err := reader.Read(buf)
if n > 0 { if n > 0 {
videoPath := s.GetCurrentVideoPath() s.writeOutput(string(buf[:n]))
buf = append([]byte(videoPath), buf...)
s.writeOutput(string(buf[:n+len(videoPath)]))
} }
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
@ -235,90 +342,3 @@ func (s *Streamer) log(reader *bufio.Reader) {
} }
} }
} }
func (s *Streamer) GetCurrentVideoPath() string {
s.videoMu.RLock()
defer s.videoMu.RUnlock()
if len(s.videoList) == 0 {
return ""
}
return s.videoList[s.GetCurrentIndex()].Path
}
func (s *Streamer) GetVideoList() []config.InputItem {
s.videoMu.RLock()
defer s.videoMu.RUnlock()
return s.videoList
}
func (s *Streamer) GetVideoListPath() []string {
s.videoMu.RLock()
defer s.videoMu.RUnlock()
var videoList []string
for _, item := range s.videoList {
videoList = append(videoList, item.Path)
}
return videoList
}
func (s *Streamer) GetCurrentIndex() int {
s.playStateMu.RLock()
defer s.playStateMu.RUnlock()
return s.playState.currentVideoIndex
}
func (s *Streamer) writeOutput(str string) {
s.outputMu.Lock()
defer s.outputMu.Unlock()
s.output.WriteString(str)
}
func (s *Streamer) GetOutput() string {
s.outputMu.RLock()
defer s.outputMu.RUnlock()
return s.output.String()
}
func (s *Streamer) Close() {
s.Stop()
os.Exit(0)
}
func (s *Streamer) buildFFmpegArgs(videoItem config.InputItem) []string {
videoPath := videoItem.Path
args := []string{"-re"}
if videoItem.Start != "" {
args = append(args, "-ss", videoItem.Start)
}
if videoItem.End != "" {
args = append(args, "-to", videoItem.End)
}
args = append(args,
"-i", videoPath,
"-c:v", config.GlobalConfig.Play.VideoCodec,
"-preset", config.GlobalConfig.Play.Preset,
"-crf", fmt.Sprintf("%d", config.GlobalConfig.Play.CRF),
"-maxrate", config.GlobalConfig.Play.MaxRate,
"-bufsize", config.GlobalConfig.Play.BufSize,
"-vf", fmt.Sprintf("scale=%s", config.GlobalConfig.Play.Scale),
"-r", fmt.Sprintf("%d", config.GlobalConfig.Play.FrameRate),
"-c:a", config.GlobalConfig.Play.AudioCodec,
"-b:a", config.GlobalConfig.Play.AudioBitrate,
"-ar", fmt.Sprintf("%d", config.GlobalConfig.Play.AudioSampleRate),
"-f", config.GlobalConfig.Play.OutputFormat,
)
if config.GlobalConfig.Play.CustomArgs != "" {
customArgs := strings.Fields(config.GlobalConfig.Play.CustomArgs)
args = append(args, customArgs...)
}
args = append(args, fmt.Sprintf("%s/%s", config.GlobalConfig.Output.RTMPServer, config.GlobalConfig.Output.StreamKey))
log.Println("ffmpeg args: ", args)
return args
}