From d991c7cac6bd80a8a14e547165c9cd55a709e708 Mon Sep 17 00:00:00 2001 From: nite07 Date: Tue, 29 Oct 2024 17:11:14 +0800 Subject: [PATCH] adjust: code structure --- Dockerfile | 15 +++- config/config.go | 53 ++++++------- logger/logger.go | 15 +--- main.go | 30 +++++-- model/videoItem.go | 8 ++ server/server.go | 75 +++++++++--------- server/static/index.html | 37 ++++++++- {websocket => server}/websocket.go | 19 +---- streamer/helper.go | 45 ----------- streamer/message.go | 6 +- streamer/streamer.go | 121 ++++++++++++++++++++--------- utils/has_ffmpeg.go | 2 +- 12 files changed, 234 insertions(+), 192 deletions(-) create mode 100644 model/videoItem.go rename {websocket => server}/websocket.go (58%) delete mode 100755 streamer/helper.go diff --git a/Dockerfile b/Dockerfile index 994a799..d1f8620 100755 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.23 as builder +FROM golang:1.23 AS builder LABEL authors="nite07" WORKDIR /app @@ -8,6 +8,19 @@ ARG version RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w -X live-streamer/constant.Version=${version}" -o live-streamer . FROM alpine:latest +WORKDIR /app + +RUN apk update && \ + apk add --no-cache \ + ffmpeg \ + fontconfig \ + ttf-dejavu \ + ttf-liberation \ + font-noto \ + font-noto-emoji \ + wqy-zenhei \ + && fc-cache -f + COPY --from=builder /app/live-streamer /app/live-streamer EXPOSE 8080 VOLUME [ "/app/config.json" ] diff --git a/config/config.go b/config/config.go index f2d01a5..e97dc02 100755 --- a/config/config.go +++ b/config/config.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "live-streamer/model" "live-streamer/utils" "log" "os" @@ -16,16 +17,9 @@ type OutputConfig struct { StreamKey string `json:"stream_key"` } -type InputItem struct { - Path string `json:"path"` - Start string `json:"start"` - End string `json:"end"` - ItemType string `json:"-"` -} - type LogConfig struct { - Level string `json:"level"` - PlayState bool `json:"play_state"` + Level string `json:"level"` + ShowFFmpegOutput bool `json:"show_ffmpeg_output"` } type ServerConfig struct { @@ -34,23 +28,20 @@ type ServerConfig struct { } type Config struct { - Input []any `json:"input"` - InputItems []InputItem `json:"-"` // contains video file or dir - VideoList []InputItem `json:"-"` // only contains video file - Play map[string]any `json:"play"` - Output OutputConfig `json:"output"` - Log LogConfig `json:"log"` - Server ServerConfig `json:"server"` + Input []any `json:"input"` + InputItems []model.VideoItem `json:"-"` // contains video file or dir + VideoList []model.VideoItem `json:"-"` // only contains video file + Play map[string]any `json:"play"` + Output OutputConfig `json:"output"` + Log LogConfig `json:"log"` + Server ServerConfig `json:"server"` } var GlobalConfig *Config func init() { GlobalConfig = &Config{} - err := readConfig("config.json") - if len(GlobalConfig.Input) == 0 { - log.Fatal("No input video found") - } + err := readConfig("./config.json") if err != nil { if os.IsNotExist(err) { log.Fatal("Config not exists") @@ -58,6 +49,10 @@ func init() { log.Fatal(err) } } + + if len(GlobalConfig.Input) == 0 { + log.Fatal("no input video found") + } } func readConfig(configPath string) error { @@ -70,9 +65,9 @@ func readConfig(configPath string) error { } databytes, err := os.ReadFile(configPath) if err != nil { - return fmt.Errorf("Config read failed: %v", err) + return fmt.Errorf("config read failed: %v", err) } - if err = json.Unmarshal(databytes, &GlobalConfig); err != nil { + if err = json.Unmarshal(databytes, GlobalConfig); err != nil { return fmt.Errorf("config unmarshal failed: %v", err) } err = validateConfig() @@ -107,15 +102,15 @@ func validateInputConfig() error { return errors.New("video_path is nil") } - GlobalConfig.InputItems = make([]InputItem, 0, len(GlobalConfig.Input)) - GlobalConfig.VideoList = []InputItem{} + GlobalConfig.InputItems = make([]model.VideoItem, 0, len(GlobalConfig.Input)) + GlobalConfig.VideoList = []model.VideoItem{} for i, item := range GlobalConfig.Input { - var inputItem InputItem + var inputItem model.VideoItem switch v := item.(type) { case string: - inputItem = InputItem{Path: v} + inputItem = model.VideoItem{Path: v} case map[string]any: data, err := json.Marshal(v) if err != nil { @@ -192,14 +187,14 @@ func validateServerConfig() error { return nil } -func getAllVideos(dirPath string) ([]InputItem, error) { - res := []InputItem{} +func getAllVideos(dirPath string) ([]model.VideoItem, error) { + res := []model.VideoItem{} err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() && utils.IsSupportedVideo(path) { - res = append(res, InputItem{Path: path}) + res = append(res, model.VideoItem{Path: path}) } return nil }) diff --git a/logger/logger.go b/logger/logger.go index 84f8708..9fa7262 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -1,7 +1,6 @@ package logger import ( - c "live-streamer/config" "os" "strings" @@ -9,16 +8,9 @@ import ( "go.uber.org/zap/zapcore" ) -var ( - GlobalLogger *zap.Logger - config *c.Config -) - -func init() { - config = c.GlobalConfig - +func NewLogger(level string) *zap.Logger { var logLevel zapcore.Level - switch strings.ToLower(config.Log.Level) { + switch strings.ToLower(level) { case "info": logLevel = zap.InfoLevel case "error": @@ -37,5 +29,6 @@ func init() { consoleWriter := zapcore.AddSync(os.Stdout) consoleCore := zapcore.NewCore(zapcore.NewConsoleEncoder(encoderConfig), consoleWriter, logLevel) - GlobalLogger = zap.New(consoleCore) + logger := zap.New(consoleCore) + return logger } diff --git a/main.go b/main.go index ec91297..f4e175e 100755 --- a/main.go +++ b/main.go @@ -5,12 +5,10 @@ import ( "fmt" c "live-streamer/config" - "live-streamer/constant" "live-streamer/logger" "live-streamer/server" "live-streamer/streamer" "live-streamer/utils" - "live-streamer/websocket" "os" @@ -26,24 +24,40 @@ var ( func init() { config = c.GlobalConfig - log = logger.GlobalLogger } func main() { - fmt.Println("Version: " + constant.Version) + // new logger + log = logger.NewLogger(config.Log.Level) - if !utils.HasFFMPEG() { + log.Debug("config", zap.Reflect("config", config)) + + // ffmpeg exist + if !utils.HasFFmpeg() { log.Fatal("ffmpeg not found") } - server.NewServer(config.Server.Addr, websocket.RequestHandler) + // new streamer + var err error + GlobalStreamer, err = streamer.NewStreamer(config.Output.RTMPServer, config.Output.StreamKey, config.VideoList, log, &streamer.Option{ + FFmpegArgs: config.Play, + ShowFFmpegOutput: config.Log.ShowFFmpegOutput, + }) + if err != nil { + log.Fatal("new streamer error", zap.Error(err)) + } + + // new server + server.NewServer(config.Server.Addr, GlobalStreamer, log, &server.Option{ + AuthToken: config.Server.Token, + }) server.GlobalServer.Run() - GlobalStreamer = streamer.NewStreamer(config.VideoList) - + // coroutine go startWatcher() go inputHandler() + // start streamer GlobalStreamer.Start() select {} } diff --git a/model/videoItem.go b/model/videoItem.go new file mode 100644 index 0000000..0cc63c8 --- /dev/null +++ b/model/videoItem.go @@ -0,0 +1,8 @@ +package model + +type VideoItem struct { + Path string `json:"path"` + Start string `json:"start"` + End string `json:"end"` + ItemType string `json:"-"` +} diff --git a/server/server.go b/server/server.go index 5dc2c17..a79fe90 100755 --- a/server/server.go +++ b/server/server.go @@ -3,10 +3,7 @@ package server import ( "embed" "html/template" - c "live-streamer/config" - "live-streamer/logger" "live-streamer/streamer" - mywebsocket "live-streamer/websocket" "net/http" "sync" "time" @@ -26,13 +23,17 @@ var upgrader = websocket.Upgrader{ }, } -type InputFunc func(mywebsocket.RequestType) - type Server struct { - addr string - dealInputFunc InputFunc - clients map[string]*Client - mu sync.Mutex + addr string + clients map[string]*Client + mu sync.Mutex + option *Option + log *zap.Logger + streamer *streamer.Streamer +} + +type Option struct { + AuthToken string } type Client struct { @@ -45,20 +46,16 @@ type Client struct { var ( GlobalServer *Server - config *c.Config - log *zap.Logger + log *zap.Logger ) -func init() { - config = c.GlobalConfig - log = logger.GlobalLogger -} - -func NewServer(addr string, dealInputFunc InputFunc) { +func NewServer(addr string, streamer *streamer.Streamer, log *zap.Logger, option *Option) { GlobalServer = &Server{ - addr: addr, - dealInputFunc: dealInputFunc, - clients: make(map[string]*Client), + addr: addr, + option: option, + clients: make(map[string]*Client), + log: log, + streamer: streamer, } } @@ -71,7 +68,7 @@ func (s *Server) Run() { } router.SetHTMLTemplate(tpl) - router.GET("/ws", AuthMiddleware(), s.handleWebSocket) + router.GET("/ws", s.AuthMiddleware(), s.handleWebSocket) router.GET( "/", func(c *gin.Context) { c.HTML(200, "index.html", nil) @@ -120,11 +117,11 @@ func (s *Server) handleWebSocket(c *gin.Context) { go func() { ticker := time.NewTicker(1 * time.Second) for range ticker.C { - s.Broadcast(mywebsocket.Date{ + s.Broadcast(Data{ Timestamp: time.Now().UnixMilli(), - CurrentVideoPath: streamer.GlobalStreamer.GetCurrentVideoPath(), - VideoList: streamer.GlobalStreamer.GetVideoListPath(), - Output: streamer.GlobalStreamer.GetOutput(), + CurrentVideoPath: s.streamer.GetCurrentVideoPath(), + VideoList: s.streamer.GetVideoListPath(), + Output: s.streamer.GetOutput(), }) } }() @@ -132,23 +129,20 @@ func (s *Server) handleWebSocket(c *gin.Context) { for { // recive message client.mu.Lock() - msg := mywebsocket.Request{} + msg := Request{} err := ws.ReadJSON(&msg) client.mu.Unlock() if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Error("websocket error", zap.Error(err)) - } break } - s.dealInputFunc(msg.Type) + s.RequestHandler(msg.Type) } } -func AuthMiddleware() gin.HandlerFunc { +func (s *Server) AuthMiddleware() gin.HandlerFunc { return func(c *gin.Context) { - if config.Server.Token == "" || - c.Query("token") == config.Server.Token { + if s.option.AuthToken == "" || + c.Query("token") == s.option.AuthToken { c.Next() } else { c.AbortWithStatus(http.StatusUnauthorized) @@ -156,7 +150,7 @@ func AuthMiddleware() gin.HandlerFunc { } } -func (s *Server) Broadcast(obj mywebsocket.Date) { +func (s *Server) Broadcast(obj Data) { s.mu.Lock() for _, client := range s.clients { obj.Timestamp = time.Now().UnixMilli() @@ -167,7 +161,7 @@ func (s *Server) Broadcast(obj mywebsocket.Date) { s.mu.Unlock() } -func (s *Server) Single(userID string, obj mywebsocket.Date) { +func (s *Server) Single(userID string, obj Data) { s.mu.Lock() if client, ok := s.clients[userID]; ok { obj.Timestamp = time.Now().UnixMilli() @@ -178,6 +172,13 @@ func (s *Server) Single(userID string, obj mywebsocket.Date) { s.mu.Unlock() } -func (s *Server) Close() { - +func (s *Server) RequestHandler(reqType RequestType) { + switch reqType { + case TypeStreamNextVideo: + s.streamer.Next() + case TypeStreamPrevVideo: + s.streamer.Prev() + case TypeQuit: + s.streamer.Close() + } } diff --git a/server/static/index.html b/server/static/index.html index 54ef269..2e7e6b3 100755 --- a/server/static/index.html +++ b/server/static/index.html @@ -368,8 +368,31 @@