feat: log level config

feat: ffmpeg args placeholder
This commit is contained in:
Nite07 2024-10-29 03:35:21 +08:00
parent 2afcac48dc
commit dfdb6003ea
23 changed files with 212 additions and 71 deletions

0
.github/workflows/docker.yml vendored Normal file → Executable file
View File

0
.github/workflows/release.yml vendored Normal file → Executable file
View File

0
.gitignore vendored Normal file → Executable file
View File

0
.goreleaser.yaml Normal file → Executable file
View File

0
Dockerfile Normal file → Executable file
View File

0
LICENSE Normal file → Executable file
View File

21
README.md Normal file → Executable file
View File

@ -10,7 +10,9 @@
- 🎯 支持视频片段截取推流(指定开始和结束时间) - 🎯 支持视频片段截取推流(指定开始和结束时间)
- 🔄 支持手动切换当前推流视频 - 🔄 支持手动切换当前推流视频
## 示例配置 ## 配置
### 示例
除了 input 和 output 部分,其余都是可选的 除了 input 和 output 部分,其余都是可选的
@ -35,7 +37,7 @@
"-crf": 23, "-crf": 23,
"-maxrate": "1000k", "-maxrate": "1000k",
"-bufsize": "2000k", "-bufsize": "2000k",
"-vf": "1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2", "-vf": "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2",
"-r": 30, "-r": 30,
"-c:a": "aac", "-c:a": "aac",
"-b:a": "128k", "-b:a": "128k",
@ -55,3 +57,18 @@
} }
} }
``` ```
### play 参数占位符
`{{filename}}`: 视频文件名(不包含后缀)
`{{filepath}}`: 视频路径
示例:
```json
{
"play": {
"-vf": "drawtext=text='{{filename}}':x=5:y=5:fontsize=24:fontcolor=white:borderw=2:bordercolor=black"
}
}
```

12
config/config.go Normal file → Executable file
View File

@ -24,6 +24,7 @@ type InputItem struct {
} }
type LogConfig struct { type LogConfig struct {
Level string `json:"level"`
PlayState bool `json:"play_state"` PlayState bool `json:"play_state"`
} }
@ -42,10 +43,10 @@ type Config struct {
Server ServerConfig `json:"server"` Server ServerConfig `json:"server"`
} }
var GlobalConfig Config var GlobalConfig *Config
func init() { func init() {
GlobalConfig = Config{} GlobalConfig = &Config{}
err := readConfig("config.json") err := readConfig("config.json")
if len(GlobalConfig.Input) == 0 { if len(GlobalConfig.Input) == 0 {
log.Fatal("No input video found") log.Fatal("No input video found")
@ -91,9 +92,16 @@ func validateConfig() error {
if err := validateServerConfig(); err != nil { if err := validateServerConfig(); err != nil {
return err return err
} }
validateLogConfig()
return nil return nil
} }
func validateLogConfig() {
if GlobalConfig.Log.Level == "" {
GlobalConfig.Log.Level = "info"
}
}
func validateInputConfig() error { func validateInputConfig() error {
if GlobalConfig.Input == nil { if GlobalConfig.Input == nil {
return errors.New("video_path is nil") return errors.New("video_path is nil")

0
constant/constant.go Normal file → Executable file
View File

0
constant/version.go Normal file → Executable file
View File

0
docker-compose.yaml Normal file → Executable file
View File

2
go.mod Normal file → Executable file
View File

@ -30,6 +30,8 @@ require (
github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect github.com/ugorji/go/codec v1.2.12 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/arch v0.8.0 // indirect golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect golang.org/x/net v0.25.0 // indirect

7
go.sum Normal file → Executable file
View File

@ -58,6 +58,8 @@ github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@ -74,6 +76,10 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE=
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc= golang.org/x/arch v0.8.0 h1:3wRIsP3pM4yUptoR96otTUOXI367OS0+c9eeRi9doIc=
golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= golang.org/x/arch v0.8.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
@ -81,6 +87,7 @@ golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=

41
logger/logger.go Normal file
View File

@ -0,0 +1,41 @@
package logger
import (
c "live-streamer/config"
"os"
"strings"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
GlobalLogger *zap.Logger
config *c.Config
)
func init() {
config = c.GlobalConfig
var logLevel zapcore.Level
switch strings.ToLower(config.Log.Level) {
case "info":
logLevel = zap.InfoLevel
case "error":
logLevel = zap.ErrorLevel
case "warn":
logLevel = zap.WarnLevel
case "debug":
logLevel = zap.DebugLevel
case "panic":
logLevel = zap.PanicLevel
}
encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
consoleWriter := zapcore.AddSync(os.Stdout)
consoleCore := zapcore.NewCore(zapcore.NewConsoleEncoder(encoderConfig), consoleWriter, logLevel)
GlobalLogger = zap.New(consoleCore)
}

48
main.go Normal file → Executable file
View File

@ -4,35 +4,51 @@ import (
"bufio" "bufio"
"fmt" "fmt"
"live-streamer/config" c "live-streamer/config"
"live-streamer/constant" "live-streamer/constant"
"live-streamer/logger"
"live-streamer/server" "live-streamer/server"
"live-streamer/streamer" "live-streamer/streamer"
"live-streamer/utils" "live-streamer/utils"
"live-streamer/websocket" "live-streamer/websocket"
"log"
"os" "os"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"go.uber.org/zap"
) )
var GlobalStreamer *streamer.Streamer var (
GlobalStreamer *streamer.Streamer
log *zap.Logger
config *c.Config
)
func init() {
config = c.GlobalConfig
log = logger.GlobalLogger
}
func main() { func main() {
fmt.Println("Version: " + constant.Version) fmt.Println("Version: " + constant.Version)
server.NewServer(config.GlobalConfig.Server.Addr, websocket.RequestHandler)
server.GlobalServer.Run()
if !utils.HasFFMPEG() { if !utils.HasFFMPEG() {
log.Fatal("ffmpeg not found") log.Fatal("ffmpeg not found")
} }
GlobalStreamer = streamer.NewStreamer(config.GlobalConfig.VideoList)
server.NewServer(config.Server.Addr, websocket.RequestHandler)
server.GlobalServer.Run()
GlobalStreamer = streamer.NewStreamer(config.VideoList)
go startWatcher() go startWatcher()
go input() go inputHandler()
GlobalStreamer.Start() GlobalStreamer.Start()
select {} select {}
} }
func input() { func inputHandler() {
scanner := bufio.NewScanner(os.Stdin) scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() { for scanner.Scan() {
line := scanner.Text() line := scanner.Text()
@ -56,20 +72,20 @@ func input() {
func startWatcher() { func startWatcher() {
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
log.Fatalf("failed to create watcher: %v", err) log.Fatal("failed to create watcher", zap.Error(err))
} }
defer watcher.Close() defer watcher.Close()
for _, item := range config.GlobalConfig.InputItems { for _, item := range config.InputItems {
if item.ItemType == "dir" { if item.ItemType == "dir" {
err = watcher.Add(item.Path) err = watcher.Add(item.Path)
if err != nil { if err != nil {
log.Fatalf("failed to add dir to watcher: %v", err) log.Fatal("failed to add dir to watcher", zap.Error(err))
} }
log.Println("watching dir:", item.Path) log.Info("watching dir", zap.String("path", item.Path))
} }
} }
if err != nil { if err != nil {
log.Fatalf("failed to start watcher: %v", err) log.Fatal("failed to start watcher", zap.Error(err))
} }
for { for {
@ -80,19 +96,19 @@ func startWatcher() {
} }
if event.Op&fsnotify.Create == fsnotify.Create { if event.Op&fsnotify.Create == fsnotify.Create {
if utils.IsSupportedVideo(event.Name) { if utils.IsSupportedVideo(event.Name) {
log.Println("new video added:", event.Name) log.Info("new video added", zap.String("path", event.Name))
GlobalStreamer.Add(event.Name) GlobalStreamer.Add(event.Name)
} }
} }
if event.Op&fsnotify.Remove == fsnotify.Remove { if event.Op&fsnotify.Remove == fsnotify.Remove {
log.Println("video removed:", event.Name) log.Info("video removed", zap.String("path", event.Name))
GlobalStreamer.Remove(event.Name) GlobalStreamer.Remove(event.Name)
} }
case err, ok := <-watcher.Errors: case err, ok := <-watcher.Errors:
if !ok { if !ok {
return return
} }
log.Println("watcher error:", err) log.Error("watcher error", zap.Error(err))
} }
} }
} }

35
server/server.go Normal file → Executable file
View File

@ -3,10 +3,10 @@ package server
import ( import (
"embed" "embed"
"html/template" "html/template"
"live-streamer/config" c "live-streamer/config"
"live-streamer/logger"
"live-streamer/streamer" "live-streamer/streamer"
mywebsocket "live-streamer/websocket" mywebsocket "live-streamer/websocket"
"log"
"net/http" "net/http"
"sync" "sync"
"time" "time"
@ -14,6 +14,7 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
uuid "github.com/gofrs/uuid/v5" uuid "github.com/gofrs/uuid/v5"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"go.uber.org/zap"
) )
//go:embed static //go:embed static
@ -41,7 +42,17 @@ type Client struct {
hasSentSize int hasSentSize int
} }
var GlobalServer *Server var (
GlobalServer *Server
config *c.Config
log *zap.Logger
)
func init() {
config = c.GlobalConfig
log = logger.GlobalLogger
}
func NewServer(addr string, dealInputFunc InputFunc) { func NewServer(addr string, dealInputFunc InputFunc) {
GlobalServer = &Server{ GlobalServer = &Server{
@ -56,7 +67,7 @@ func (s *Server) Run() {
router := gin.New() router := gin.New()
tpl, err := template.ParseFS(staticFiles, "static/*") tpl, err := template.ParseFS(staticFiles, "static/*")
if err != nil { if err != nil {
log.Fatalf("Error parsing templates: %v", err) log.Fatal("parsing templates error", zap.Error(err))
} }
router.SetHTMLTemplate(tpl) router.SetHTMLTemplate(tpl)
@ -69,7 +80,7 @@ func (s *Server) Run() {
go func() { go func() {
if err := router.Run(s.addr); err != nil { if err := router.Run(s.addr); err != nil {
log.Fatalf("Error starting server: %v", err) log.Fatal("starting server error", zap.Error(err))
} }
}() }()
} }
@ -86,7 +97,7 @@ func (s *Server) handleWebSocket(c *gin.Context) {
id, err := uuid.NewV7() id, err := uuid.NewV7()
if err != nil { if err != nil {
log.Printf("generating uuid error: %v", err) log.Error("generating uuid error", zap.Error(err))
return return
} }
client := &Client{id: id.String(), conn: ws, hasSentSize: 0} client := &Client{id: id.String(), conn: ws, hasSentSize: 0}
@ -102,7 +113,7 @@ func (s *Server) handleWebSocket(c *gin.Context) {
delete(s.clients, client.id) delete(s.clients, client.id)
s.mu.Unlock() s.mu.Unlock()
if r := recover(); r != nil { if r := recover(); r != nil {
log.Printf("webSocket handler panic: %v", r) log.Panic("webSocket handler panic", zap.Any("recover", r))
} }
}() }()
@ -126,7 +137,7 @@ func (s *Server) handleWebSocket(c *gin.Context) {
client.mu.Unlock() client.mu.Unlock()
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("websocket error: %v", err) log.Error("websocket error", zap.Error(err))
} }
break break
} }
@ -136,8 +147,8 @@ func (s *Server) handleWebSocket(c *gin.Context) {
func AuthMiddleware() gin.HandlerFunc { func AuthMiddleware() gin.HandlerFunc {
return func(c *gin.Context) { return func(c *gin.Context) {
if config.GlobalConfig.Server.Token == "" || if config.Server.Token == "" ||
c.Query("token") == config.GlobalConfig.Server.Token { c.Query("token") == config.Server.Token {
c.Next() c.Next()
} else { } else {
c.AbortWithStatus(http.StatusUnauthorized) c.AbortWithStatus(http.StatusUnauthorized)
@ -150,7 +161,7 @@ func (s *Server) Broadcast(obj mywebsocket.Date) {
for _, client := range s.clients { for _, client := range s.clients {
obj.Timestamp = time.Now().UnixMilli() obj.Timestamp = time.Now().UnixMilli()
if err := client.conn.WriteJSON(obj); err != nil { if err := client.conn.WriteJSON(obj); err != nil {
log.Printf("websocket writing message error: %v", err) log.Error("websocket writing message error", zap.Error(err))
} }
} }
s.mu.Unlock() s.mu.Unlock()
@ -161,7 +172,7 @@ func (s *Server) Single(userID string, obj mywebsocket.Date) {
if client, ok := s.clients[userID]; ok { if client, ok := s.clients[userID]; ok {
obj.Timestamp = time.Now().UnixMilli() obj.Timestamp = time.Now().UnixMilli()
if err := client.conn.WriteJSON(obj); err != nil { if err := client.conn.WriteJSON(obj); err != nil {
log.Printf("websocket writing message error: %v", err) log.Error("websocket writing message error", zap.Error(err))
} }
} }
s.mu.Unlock() s.mu.Unlock()

0
server/static/index.html Normal file → Executable file
View File

22
streamer/helper.go Normal file → Executable file
View File

@ -2,11 +2,14 @@ package streamer
import ( import (
"fmt" "fmt"
"live-streamer/config" c "live-streamer/config"
"log" "path/filepath"
"strings"
"go.uber.org/zap"
) )
func buildFFmpegArgs(videoItem config.InputItem) []string { func buildFFmpegArgs(videoItem c.InputItem) []string {
videoPath := videoItem.Path videoPath := videoItem.Path
args := []string{"-re"} args := []string{"-re"}
@ -22,14 +25,21 @@ func buildFFmpegArgs(videoItem config.InputItem) []string {
"-i", videoPath, "-i", videoPath,
) )
for k, v := range config.GlobalConfig.Play { for k, v := range config.Play {
args = append(args, k) args = append(args, k)
if str, ok := v.(string); ok {
filename := strings.TrimSuffix(filepath.Base(videoPath), filepath.Ext(videoPath))
str = strings.ReplaceAll(str, "{{filepath}}", videoPath)
str = strings.ReplaceAll(str, "{{filename}}", filename)
args = append(args, str)
} else {
args = append(args, fmt.Sprint(v)) args = append(args, fmt.Sprint(v))
} }
}
args = append(args, fmt.Sprintf("%s/%s", config.GlobalConfig.Output.RTMPServer, config.GlobalConfig.Output.StreamKey)) args = append(args, fmt.Sprintf("%s/%s", config.Output.RTMPServer, config.Output.StreamKey))
log.Println("ffmpeg args: ", args) log.Debug("build ffmpeg", zap.Strings("args", args))
return args return args
} }

6
streamer/message.go Normal file → Executable file
View File

@ -1,6 +1,8 @@
package streamer package streamer
import "live-streamer/config" import (
c "live-streamer/config"
)
type Message interface { type Message interface {
messageType() string messageType() string
@ -20,7 +22,7 @@ type GetCurrentVideoMessage struct {
Response chan string Response chan string
} }
type GetVideoListMessage struct { type GetVideoListMessage struct {
Response chan []config.InputItem Response chan []c.InputItem
} }
type GetVideoListPathMessage struct { type GetVideoListPathMessage struct {
Response chan []string Response chan []string

73
streamer/streamer.go Normal file → Executable file
View File

@ -5,13 +5,16 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"live-streamer/config" c "live-streamer/config"
"log" "live-streamer/logger"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"sync" "sync"
"time" "time"
"go.uber.org/zap"
) )
type Streamer struct { type Streamer struct {
@ -23,10 +26,11 @@ type Streamer struct {
outputReq chan chan string // address output concurrency security issue outputReq chan chan string // address output concurrency security issue
wg sync.WaitGroup // wait all handlers(except closehandler) to finish before closure wg sync.WaitGroup // wait all handlers(except closehandler) to finish before closure
close chan any
} }
type streamerState struct { type streamerState struct {
videoList []config.InputItem videoList []c.InputItem
currentVideoIndex int currentVideoIndex int
manualControl bool manualControl bool
cmd *exec.Cmd cmd *exec.Cmd
@ -37,7 +41,17 @@ type streamerState struct {
var GlobalStreamer *Streamer var GlobalStreamer *Streamer
func NewStreamer(videoList []config.InputItem) *Streamer { var (
config *c.Config
log *zap.Logger
)
func init() {
config = c.GlobalConfig
log = logger.GlobalLogger
}
func NewStreamer(videoList []c.InputItem) *Streamer {
s := &Streamer{ s := &Streamer{
mailbox: make(chan Message, 100), mailbox: make(chan Message, 100),
state: &streamerState{ state: &streamerState{
@ -46,6 +60,7 @@ func NewStreamer(videoList []config.InputItem) *Streamer {
output: strings.Builder{}, output: strings.Builder{},
outputQueue: make(chan string, 100), outputQueue: make(chan string, 100),
outputReq: make(chan chan string), outputReq: make(chan chan string),
close: make(chan any),
} }
GlobalStreamer = s GlobalStreamer = s
go s.actorLoop() go s.actorLoop()
@ -54,8 +69,12 @@ func NewStreamer(videoList []config.InputItem) *Streamer {
} }
func (s *Streamer) actorLoop() { func (s *Streamer) actorLoop() {
for msg := range s.mailbox { for {
if msg.messageType() != CloseMessage.messageType(CloseMessage{}) { select {
case <-s.close:
return
case msg := <-s.mailbox:
if _, ok := msg.(CloseMessage); !ok {
s.wg.Add(1) s.wg.Add(1)
s.handleMessage(msg) s.handleMessage(msg)
s.wg.Done() s.wg.Done()
@ -64,6 +83,7 @@ func (s *Streamer) actorLoop() {
} }
} }
} }
}
func (s *Streamer) handleMessage(msg Message) { func (s *Streamer) handleMessage(msg Message) {
switch m := msg.(type) { switch m := msg.(type) {
@ -105,43 +125,46 @@ func (s *Streamer) handleStart() {
s.state.cmd = exec.CommandContext(s.state.ctx, "ffmpeg", buildFFmpegArgs(currentVideo)...) s.state.cmd = exec.CommandContext(s.state.ctx, "ffmpeg", buildFFmpegArgs(currentVideo)...)
s.state.waitDone = make(chan any) s.state.waitDone = make(chan any)
s.writeOutput(fmt.Sprintln("start stream: ", videoPath))
pipe, err := s.state.cmd.StderrPipe() // ffmpeg send all messages to stderr pipe, err := s.state.cmd.StderrPipe() // ffmpeg send all messages to stderr
if err != nil { if err != nil {
log.Printf("failed to get pipe: %v", err) log.Error("failed to get pipe", zap.Error(err))
return return
} }
reader := bufio.NewReader(pipe) reader := bufio.NewReader(pipe)
log.Info("start stream", zap.String("path", videoPath))
s.writeOutput(fmt.Sprintln("start stream: ", videoPath))
if err := s.state.cmd.Start(); err != nil { if err := s.state.cmd.Start(); err != nil {
s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err)) s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err))
return return
} }
go s.log(reader)
go func() { go func() {
log.Debug("wait stream end", zap.String("path", videoPath))
_ = s.state.cmd.Wait() _ = s.state.cmd.Wait()
log.Debug("process stop", zap.String("path", videoPath))
s.state.cancel() s.state.cancel()
log.Debug("context cancel", zap.String("path", videoPath))
s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath)) s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath))
if !s.state.manualControl { if !s.state.manualControl {
log.Println("ready to stream next video") log.Debug("video end", zap.String("path", videoPath))
s.state.currentVideoIndex++ s.state.currentVideoIndex++
if s.state.currentVideoIndex >= len(s.state.videoList) { if s.state.currentVideoIndex >= len(s.state.videoList) {
s.state.currentVideoIndex = 0 s.state.currentVideoIndex = 0
} }
s.mailbox <- StartMessage{} s.mailbox <- StartMessage{}
} else { } else {
log.Println("manually control") log.Debug("manually end", zap.String("path", videoPath))
s.state.manualControl = false s.state.manualControl = false
} }
close(s.state.waitDone) close(s.state.waitDone)
}() }()
go s.log(reader)
} }
func (s *Streamer) handleStop() { func (s *Streamer) handleStop() {
@ -149,18 +172,20 @@ func (s *Streamer) handleStop() {
return return
} }
log.Println("wait context to be cancelled") videoPath := s.state.videoList[s.state.currentVideoIndex].Path
log.Debug("wait context to be cancelled", zap.String("path", videoPath))
s.state.cancel() s.state.cancel()
log.Println("context has been cancelled") log.Debug("context has been cancelled", zap.String("path", videoPath))
if s.state.cmd.Process != nil { if s.state.cmd.Process != nil {
log.Println("wait ffmpeg process stop") log.Debug("wait ffmpeg process stop", zap.String("path", videoPath))
select { select {
case <-s.state.waitDone: case <-s.state.waitDone:
case <-time.After(3 * time.Second): case <-time.After(3 * time.Second):
_ = s.state.cmd.Process.Kill() _ = s.state.cmd.Process.Kill()
} }
log.Println("ffmpeg process has stopped") log.Debug("ffmpeg process has stopped", zap.String("path", videoPath))
} }
s.state.cancel = nil s.state.cancel = nil
@ -168,7 +193,7 @@ func (s *Streamer) handleStop() {
} }
func (s *Streamer) handleAdd(path string) { func (s *Streamer) handleAdd(path string) {
s.state.videoList = append(s.state.videoList, config.InputItem{Path: path}) s.state.videoList = append(s.state.videoList, c.InputItem{Path: path})
} }
func (s *Streamer) handleRemove(path string) { func (s *Streamer) handleRemove(path string) {
@ -235,7 +260,7 @@ func (s *Streamer) handleGetCurrentVideo(response chan string) {
response <- s.state.videoList[s.state.currentVideoIndex].Path response <- s.state.videoList[s.state.currentVideoIndex].Path
} }
func (s *Streamer) handleGetVideoList(response chan []config.InputItem) { func (s *Streamer) handleGetVideoList(response chan []c.InputItem) {
response <- s.state.videoList response <- s.state.videoList
} }
@ -252,6 +277,8 @@ func (s *Streamer) handleGetCurrentIndex(response chan int) {
} }
func (s *Streamer) handleClose() { func (s *Streamer) handleClose() {
close(s.close)
s.handleStop()
s.wg.Wait() s.wg.Wait()
os.Exit(0) os.Exit(0)
} }
@ -287,8 +314,8 @@ func (s *Streamer) GetCurrentVideoPath() string {
return <-response return <-response
} }
func (s *Streamer) GetVideoList() []config.InputItem { func (s *Streamer) GetVideoList() []c.InputItem {
response := make(chan []config.InputItem) response := make(chan []c.InputItem)
s.mailbox <- GetVideoListMessage{Response: response} s.mailbox <- GetVideoListMessage{Response: response}
return <-response return <-response
} }
@ -306,7 +333,6 @@ func (s *Streamer) GetCurrentIndex() int {
} }
func (s *Streamer) Close() { func (s *Streamer) Close() {
s.mailbox <- StopMessage{}
s.mailbox <- CloseMessage{} s.mailbox <- CloseMessage{}
} }
@ -335,13 +361,14 @@ func (s *Streamer) log(reader *bufio.Reader) {
case <-s.state.ctx.Done(): case <-s.state.ctx.Done():
return return
default: default:
if !config.GlobalConfig.Log.PlayState { if !config.Log.PlayState {
return return
} }
buf := make([]byte, 1024) buf := make([]byte, 1024)
for { for {
n, err := reader.Read(buf) n, err := reader.Read(buf)
if n > 0 { if n > 0 {
log.Debug("ffmpeg output", zap.String("msg", strings.TrimSpace(string(buf[:n]))))
s.writeOutput(string(buf[:n])) s.writeOutput(string(buf[:n]))
} }
if err != nil { if err != nil {

0
utils/has_ffmpeg.go Normal file → Executable file
View File

0
utils/is_supported_video.go Normal file → Executable file
View File

0
websocket/websocket.go Normal file → Executable file
View File