adjust: code structure

This commit is contained in:
Nite07 2024-10-29 17:11:14 +08:00
parent dfdb6003ea
commit d991c7cac6
12 changed files with 234 additions and 192 deletions

View File

@ -1,4 +1,4 @@
FROM golang:1.23 as builder
FROM golang:1.23 AS builder
LABEL authors="nite07"
WORKDIR /app
@ -8,6 +8,19 @@ ARG version
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w -X live-streamer/constant.Version=${version}" -o live-streamer .
FROM alpine:latest
WORKDIR /app
RUN apk update && \
apk add --no-cache \
ffmpeg \
fontconfig \
ttf-dejavu \
ttf-liberation \
font-noto \
font-noto-emoji \
wqy-zenhei \
&& fc-cache -f
COPY --from=builder /app/live-streamer /app/live-streamer
EXPOSE 8080
VOLUME [ "/app/config.json" ]

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"live-streamer/model"
"live-streamer/utils"
"log"
"os"
@ -16,16 +17,9 @@ type OutputConfig struct {
StreamKey string `json:"stream_key"`
}
type InputItem struct {
Path string `json:"path"`
Start string `json:"start"`
End string `json:"end"`
ItemType string `json:"-"`
}
type LogConfig struct {
Level string `json:"level"`
PlayState bool `json:"play_state"`
Level string `json:"level"`
ShowFFmpegOutput bool `json:"show_ffmpeg_output"`
}
type ServerConfig struct {
@ -34,23 +28,20 @@ type ServerConfig struct {
}
type Config struct {
Input []any `json:"input"`
InputItems []InputItem `json:"-"` // contains video file or dir
VideoList []InputItem `json:"-"` // only contains video file
Play map[string]any `json:"play"`
Output OutputConfig `json:"output"`
Log LogConfig `json:"log"`
Server ServerConfig `json:"server"`
Input []any `json:"input"`
InputItems []model.VideoItem `json:"-"` // contains video file or dir
VideoList []model.VideoItem `json:"-"` // only contains video file
Play map[string]any `json:"play"`
Output OutputConfig `json:"output"`
Log LogConfig `json:"log"`
Server ServerConfig `json:"server"`
}
var GlobalConfig *Config
func init() {
GlobalConfig = &Config{}
err := readConfig("config.json")
if len(GlobalConfig.Input) == 0 {
log.Fatal("No input video found")
}
err := readConfig("./config.json")
if err != nil {
if os.IsNotExist(err) {
log.Fatal("Config not exists")
@ -58,6 +49,10 @@ func init() {
log.Fatal(err)
}
}
if len(GlobalConfig.Input) == 0 {
log.Fatal("no input video found")
}
}
func readConfig(configPath string) error {
@ -70,9 +65,9 @@ func readConfig(configPath string) error {
}
databytes, err := os.ReadFile(configPath)
if err != nil {
return fmt.Errorf("Config read failed: %v", err)
return fmt.Errorf("config read failed: %v", err)
}
if err = json.Unmarshal(databytes, &GlobalConfig); err != nil {
if err = json.Unmarshal(databytes, GlobalConfig); err != nil {
return fmt.Errorf("config unmarshal failed: %v", err)
}
err = validateConfig()
@ -107,15 +102,15 @@ func validateInputConfig() error {
return errors.New("video_path is nil")
}
GlobalConfig.InputItems = make([]InputItem, 0, len(GlobalConfig.Input))
GlobalConfig.VideoList = []InputItem{}
GlobalConfig.InputItems = make([]model.VideoItem, 0, len(GlobalConfig.Input))
GlobalConfig.VideoList = []model.VideoItem{}
for i, item := range GlobalConfig.Input {
var inputItem InputItem
var inputItem model.VideoItem
switch v := item.(type) {
case string:
inputItem = InputItem{Path: v}
inputItem = model.VideoItem{Path: v}
case map[string]any:
data, err := json.Marshal(v)
if err != nil {
@ -192,14 +187,14 @@ func validateServerConfig() error {
return nil
}
func getAllVideos(dirPath string) ([]InputItem, error) {
res := []InputItem{}
func getAllVideos(dirPath string) ([]model.VideoItem, error) {
res := []model.VideoItem{}
err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() && utils.IsSupportedVideo(path) {
res = append(res, InputItem{Path: path})
res = append(res, model.VideoItem{Path: path})
}
return nil
})

View File

@ -1,7 +1,6 @@
package logger
import (
c "live-streamer/config"
"os"
"strings"
@ -9,16 +8,9 @@ import (
"go.uber.org/zap/zapcore"
)
var (
GlobalLogger *zap.Logger
config *c.Config
)
func init() {
config = c.GlobalConfig
func NewLogger(level string) *zap.Logger {
var logLevel zapcore.Level
switch strings.ToLower(config.Log.Level) {
switch strings.ToLower(level) {
case "info":
logLevel = zap.InfoLevel
case "error":
@ -37,5 +29,6 @@ func init() {
consoleWriter := zapcore.AddSync(os.Stdout)
consoleCore := zapcore.NewCore(zapcore.NewConsoleEncoder(encoderConfig), consoleWriter, logLevel)
GlobalLogger = zap.New(consoleCore)
logger := zap.New(consoleCore)
return logger
}

30
main.go
View File

@ -5,12 +5,10 @@ import (
"fmt"
c "live-streamer/config"
"live-streamer/constant"
"live-streamer/logger"
"live-streamer/server"
"live-streamer/streamer"
"live-streamer/utils"
"live-streamer/websocket"
"os"
@ -26,24 +24,40 @@ var (
func init() {
config = c.GlobalConfig
log = logger.GlobalLogger
}
func main() {
fmt.Println("Version: " + constant.Version)
// new logger
log = logger.NewLogger(config.Log.Level)
if !utils.HasFFMPEG() {
log.Debug("config", zap.Reflect("config", config))
// ffmpeg exist
if !utils.HasFFmpeg() {
log.Fatal("ffmpeg not found")
}
server.NewServer(config.Server.Addr, websocket.RequestHandler)
// new streamer
var err error
GlobalStreamer, err = streamer.NewStreamer(config.Output.RTMPServer, config.Output.StreamKey, config.VideoList, log, &streamer.Option{
FFmpegArgs: config.Play,
ShowFFmpegOutput: config.Log.ShowFFmpegOutput,
})
if err != nil {
log.Fatal("new streamer error", zap.Error(err))
}
// new server
server.NewServer(config.Server.Addr, GlobalStreamer, log, &server.Option{
AuthToken: config.Server.Token,
})
server.GlobalServer.Run()
GlobalStreamer = streamer.NewStreamer(config.VideoList)
// coroutine
go startWatcher()
go inputHandler()
// start streamer
GlobalStreamer.Start()
select {}
}

8
model/videoItem.go Normal file
View File

@ -0,0 +1,8 @@
package model
type VideoItem struct {
Path string `json:"path"`
Start string `json:"start"`
End string `json:"end"`
ItemType string `json:"-"`
}

View File

@ -3,10 +3,7 @@ package server
import (
"embed"
"html/template"
c "live-streamer/config"
"live-streamer/logger"
"live-streamer/streamer"
mywebsocket "live-streamer/websocket"
"net/http"
"sync"
"time"
@ -26,13 +23,17 @@ var upgrader = websocket.Upgrader{
},
}
type InputFunc func(mywebsocket.RequestType)
type Server struct {
addr string
dealInputFunc InputFunc
clients map[string]*Client
mu sync.Mutex
addr string
clients map[string]*Client
mu sync.Mutex
option *Option
log *zap.Logger
streamer *streamer.Streamer
}
type Option struct {
AuthToken string
}
type Client struct {
@ -45,20 +46,16 @@ type Client struct {
var (
GlobalServer *Server
config *c.Config
log *zap.Logger
log *zap.Logger
)
func init() {
config = c.GlobalConfig
log = logger.GlobalLogger
}
func NewServer(addr string, dealInputFunc InputFunc) {
func NewServer(addr string, streamer *streamer.Streamer, log *zap.Logger, option *Option) {
GlobalServer = &Server{
addr: addr,
dealInputFunc: dealInputFunc,
clients: make(map[string]*Client),
addr: addr,
option: option,
clients: make(map[string]*Client),
log: log,
streamer: streamer,
}
}
@ -71,7 +68,7 @@ func (s *Server) Run() {
}
router.SetHTMLTemplate(tpl)
router.GET("/ws", AuthMiddleware(), s.handleWebSocket)
router.GET("/ws", s.AuthMiddleware(), s.handleWebSocket)
router.GET(
"/", func(c *gin.Context) {
c.HTML(200, "index.html", nil)
@ -120,11 +117,11 @@ func (s *Server) handleWebSocket(c *gin.Context) {
go func() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
s.Broadcast(mywebsocket.Date{
s.Broadcast(Data{
Timestamp: time.Now().UnixMilli(),
CurrentVideoPath: streamer.GlobalStreamer.GetCurrentVideoPath(),
VideoList: streamer.GlobalStreamer.GetVideoListPath(),
Output: streamer.GlobalStreamer.GetOutput(),
CurrentVideoPath: s.streamer.GetCurrentVideoPath(),
VideoList: s.streamer.GetVideoListPath(),
Output: s.streamer.GetOutput(),
})
}
}()
@ -132,23 +129,20 @@ func (s *Server) handleWebSocket(c *gin.Context) {
for {
// recive message
client.mu.Lock()
msg := mywebsocket.Request{}
msg := Request{}
err := ws.ReadJSON(&msg)
client.mu.Unlock()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Error("websocket error", zap.Error(err))
}
break
}
s.dealInputFunc(msg.Type)
s.RequestHandler(msg.Type)
}
}
func AuthMiddleware() gin.HandlerFunc {
func (s *Server) AuthMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
if config.Server.Token == "" ||
c.Query("token") == config.Server.Token {
if s.option.AuthToken == "" ||
c.Query("token") == s.option.AuthToken {
c.Next()
} else {
c.AbortWithStatus(http.StatusUnauthorized)
@ -156,7 +150,7 @@ func AuthMiddleware() gin.HandlerFunc {
}
}
func (s *Server) Broadcast(obj mywebsocket.Date) {
func (s *Server) Broadcast(obj Data) {
s.mu.Lock()
for _, client := range s.clients {
obj.Timestamp = time.Now().UnixMilli()
@ -167,7 +161,7 @@ func (s *Server) Broadcast(obj mywebsocket.Date) {
s.mu.Unlock()
}
func (s *Server) Single(userID string, obj mywebsocket.Date) {
func (s *Server) Single(userID string, obj Data) {
s.mu.Lock()
if client, ok := s.clients[userID]; ok {
obj.Timestamp = time.Now().UnixMilli()
@ -178,6 +172,13 @@ func (s *Server) Single(userID string, obj mywebsocket.Date) {
s.mu.Unlock()
}
func (s *Server) Close() {
func (s *Server) RequestHandler(reqType RequestType) {
switch reqType {
case TypeStreamNextVideo:
s.streamer.Next()
case TypeStreamPrevVideo:
s.streamer.Prev()
case TypeQuit:
s.streamer.Close()
}
}

View File

@ -368,8 +368,31 @@
<script>
let ws;
let shouldAutoScroll = true;
let isConnecting = false;
let reconnectTimer = null;
function connectWebSocket() {
if (isConnecting || (ws && ws.readyState === WebSocket.CONNECTING)) {
console.log("WebSocket connection already in progress");
return;
}
if (ws && ws.readyState === WebSocket.OPEN) {
console.log("WebSocket already connected");
return;
}
if (ws) {
ws.close();
ws = null;
}
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
isConnecting = true;
const token = document.getElementById("token-input").value;
const wsProtocol =
window.location.protocol === "https:" ? "wss:" : "ws:";
@ -378,6 +401,7 @@
ws.onopen = function () {
console.log("Connected to WebSocket");
isConnecting = false;
setStoredToken(token);
document.getElementById("token-screen").style.display = "none";
document.querySelector(".container-fluid").style.display = "flex";
@ -404,15 +428,21 @@
};
ws.onerror = function () {
isConnecting = false;
document.getElementById("token-error").style.display = "block";
};
ws.onclose = function () {
isConnecting = false;
console.log("Disconnected from WebSocket");
document.getElementById("status").textContent =
"WebSocket Status: Disconnected";
document.getElementById("status").classList.remove("connected");
setTimeout(connectWebSocket, 3000);
if (reconnectTimer) {
clearTimeout(reconnectTimer);
}
reconnectTimer = setTimeout(connectWebSocket, 3000);
};
}
@ -480,6 +510,11 @@
sendWs("Quit");
if (ws) {
ws.close();
ws = null;
}
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
}
};

View File

@ -1,8 +1,4 @@
package websocket
import (
"live-streamer/streamer"
)
package server
type RequestType string
@ -16,20 +12,9 @@ type Request struct {
Type RequestType `json:"type"`
}
type Date struct {
type Data struct {
Timestamp int64 `json:"timestamp"`
CurrentVideoPath string `json:"currentVideoPath"`
VideoList []string `json:"videoList"`
Output string `json:"output"`
}
func RequestHandler(reqType RequestType) {
switch reqType {
case TypeStreamNextVideo:
streamer.GlobalStreamer.Next()
case TypeStreamPrevVideo:
streamer.GlobalStreamer.Prev()
case TypeQuit:
streamer.GlobalStreamer.Close()
}
}

View File

@ -1,45 +0,0 @@
package streamer
import (
"fmt"
c "live-streamer/config"
"path/filepath"
"strings"
"go.uber.org/zap"
)
func buildFFmpegArgs(videoItem c.InputItem) []string {
videoPath := videoItem.Path
args := []string{"-re"}
if videoItem.Start != "" {
args = append(args, "-ss", videoItem.Start)
}
if videoItem.End != "" {
args = append(args, "-to", videoItem.End)
}
args = append(args,
"-i", videoPath,
)
for k, v := range config.Play {
args = append(args, k)
if str, ok := v.(string); ok {
filename := strings.TrimSuffix(filepath.Base(videoPath), filepath.Ext(videoPath))
str = strings.ReplaceAll(str, "{{filepath}}", videoPath)
str = strings.ReplaceAll(str, "{{filename}}", filename)
args = append(args, str)
} else {
args = append(args, fmt.Sprint(v))
}
}
args = append(args, fmt.Sprintf("%s/%s", config.Output.RTMPServer, config.Output.StreamKey))
log.Debug("build ffmpeg", zap.Strings("args", args))
return args
}

View File

@ -1,8 +1,6 @@
package streamer
import (
c "live-streamer/config"
)
import "live-streamer/model"
type Message interface {
messageType() string
@ -22,7 +20,7 @@ type GetCurrentVideoMessage struct {
Response chan string
}
type GetVideoListMessage struct {
Response chan []c.InputItem
Response chan []model.VideoItem
}
type GetVideoListPathMessage struct {
Response chan []string

View File

@ -3,10 +3,11 @@ package streamer
import (
"bufio"
"context"
"errors"
"fmt"
"io"
c "live-streamer/config"
"live-streamer/logger"
"live-streamer/model"
"path/filepath"
"os"
"os/exec"
@ -27,10 +28,21 @@ type Streamer struct {
wg sync.WaitGroup // wait all handlers(except closehandler) to finish before closure
close chan any
log *zap.Logger
option *Option
rtmpServer string
streamKey string
}
type Option struct {
FFmpegArgs map[string]any
ShowFFmpegOutput bool
}
type streamerState struct {
videoList []c.InputItem
videoList []model.VideoItem
currentVideoIndex int
manualControl bool
cmd *exec.Cmd
@ -41,17 +53,10 @@ type streamerState struct {
var GlobalStreamer *Streamer
var (
config *c.Config
log *zap.Logger
)
func init() {
config = c.GlobalConfig
log = logger.GlobalLogger
}
func NewStreamer(videoList []c.InputItem) *Streamer {
func NewStreamer(rtmpServer string, streamKey string, videoList []model.VideoItem, log *zap.Logger, option *Option) (*Streamer, error) {
if rtmpServer == "" || streamKey == "" {
return nil, errors.New("lack of args")
}
s := &Streamer{
mailbox: make(chan Message, 100),
state: &streamerState{
@ -61,11 +66,15 @@ func NewStreamer(videoList []c.InputItem) *Streamer {
outputQueue: make(chan string, 100),
outputReq: make(chan chan string),
close: make(chan any),
option: option,
log: log,
rtmpServer: rtmpServer,
streamKey: streamKey,
}
GlobalStreamer = s
go s.actorLoop()
go s.handleOutput()
return s
return s, nil
}
func (s *Streamer) actorLoop() {
@ -122,18 +131,18 @@ func (s *Streamer) handleStart() {
s.state.ctx, s.state.cancel = context.WithCancel(context.Background())
currentVideo := s.state.videoList[s.state.currentVideoIndex]
videoPath := currentVideo.Path
s.state.cmd = exec.CommandContext(s.state.ctx, "ffmpeg", buildFFmpegArgs(currentVideo)...)
s.state.cmd = exec.CommandContext(s.state.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...)
s.state.waitDone = make(chan any)
pipe, err := s.state.cmd.StderrPipe() // ffmpeg send all messages to stderr
if err != nil {
log.Error("failed to get pipe", zap.Error(err))
s.log.Error("failed to get pipe", zap.Error(err))
return
}
reader := bufio.NewReader(pipe)
log.Info("start stream", zap.String("path", videoPath))
s.log.Info("start stream", zap.String("path", videoPath))
s.writeOutput(fmt.Sprintln("start stream: ", videoPath))
if err := s.state.cmd.Start(); err != nil {
s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err))
@ -141,30 +150,30 @@ func (s *Streamer) handleStart() {
}
go func() {
log.Debug("wait stream end", zap.String("path", videoPath))
s.log.Debug("wait stream end", zap.String("path", videoPath))
_ = s.state.cmd.Wait()
log.Debug("process stop", zap.String("path", videoPath))
s.log.Debug("process stop", zap.String("path", videoPath))
s.state.cancel()
log.Debug("context cancel", zap.String("path", videoPath))
s.log.Debug("context cancel", zap.String("path", videoPath))
s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath))
if !s.state.manualControl {
log.Debug("video end", zap.String("path", videoPath))
s.log.Debug("video end", zap.String("path", videoPath))
s.state.currentVideoIndex++
if s.state.currentVideoIndex >= len(s.state.videoList) {
s.state.currentVideoIndex = 0
}
s.mailbox <- StartMessage{}
} else {
log.Debug("manually end", zap.String("path", videoPath))
s.log.Debug("manually end", zap.String("path", videoPath))
s.state.manualControl = false
}
close(s.state.waitDone)
}()
go s.log(reader)
go s.ffmpegLog(reader)
}
func (s *Streamer) handleStop() {
@ -174,18 +183,18 @@ func (s *Streamer) handleStop() {
videoPath := s.state.videoList[s.state.currentVideoIndex].Path
log.Debug("wait context to be cancelled", zap.String("path", videoPath))
s.log.Debug("wait context to be cancelled", zap.String("path", videoPath))
s.state.cancel()
log.Debug("context has been cancelled", zap.String("path", videoPath))
s.log.Debug("context has been cancelled", zap.String("path", videoPath))
if s.state.cmd.Process != nil {
log.Debug("wait ffmpeg process stop", zap.String("path", videoPath))
s.log.Debug("wait ffmpeg process stop", zap.String("path", videoPath))
select {
case <-s.state.waitDone:
case <-time.After(3 * time.Second):
_ = s.state.cmd.Process.Kill()
}
log.Debug("ffmpeg process has stopped", zap.String("path", videoPath))
s.log.Debug("ffmpeg process has stopped", zap.String("path", videoPath))
}
s.state.cancel = nil
@ -193,7 +202,7 @@ func (s *Streamer) handleStop() {
}
func (s *Streamer) handleAdd(path string) {
s.state.videoList = append(s.state.videoList, c.InputItem{Path: path})
s.state.videoList = append(s.state.videoList, model.VideoItem{Path: path})
}
func (s *Streamer) handleRemove(path string) {
@ -260,7 +269,7 @@ func (s *Streamer) handleGetCurrentVideo(response chan string) {
response <- s.state.videoList[s.state.currentVideoIndex].Path
}
func (s *Streamer) handleGetVideoList(response chan []c.InputItem) {
func (s *Streamer) handleGetVideoList(response chan []model.VideoItem) {
response <- s.state.videoList
}
@ -314,8 +323,8 @@ func (s *Streamer) GetCurrentVideoPath() string {
return <-response
}
func (s *Streamer) GetVideoList() []c.InputItem {
response := make(chan []c.InputItem)
func (s *Streamer) GetVideoList() []model.VideoItem {
response := make(chan []model.VideoItem)
s.mailbox <- GetVideoListMessage{Response: response}
return <-response
}
@ -356,20 +365,21 @@ func (s *Streamer) writeOutput(str string) {
s.outputQueue <- str
}
func (s *Streamer) log(reader *bufio.Reader) {
func (s *Streamer) ffmpegLog(reader *bufio.Reader) {
select {
case <-s.state.ctx.Done():
return
default:
if !config.Log.PlayState {
return
}
buf := make([]byte, 1024)
for {
n, err := reader.Read(buf)
if n > 0 {
log.Debug("ffmpeg output", zap.String("msg", strings.TrimSpace(string(buf[:n]))))
s.writeOutput(string(buf[:n]))
if s.option.ShowFFmpegOutput {
s.writeOutput(string(buf[:n]))
}
if s.log.Level() == zap.DebugLevel {
fmt.Print(string(buf[:n]))
}
}
if err != nil {
if err != io.EOF {
@ -380,3 +390,38 @@ func (s *Streamer) log(reader *bufio.Reader) {
}
}
}
func (s *Streamer) buildFFmpegArgs(videoItem model.VideoItem) []string {
videoPath := videoItem.Path
args := []string{"-re"}
if videoItem.Start != "" {
args = append(args, "-ss", videoItem.Start)
}
if videoItem.End != "" {
args = append(args, "-to", videoItem.End)
}
args = append(args,
"-i", videoPath,
)
for k, v := range s.option.FFmpegArgs {
args = append(args, k)
if str, ok := v.(string); ok {
filename := strings.TrimSuffix(filepath.Base(videoPath), filepath.Ext(videoPath))
str = strings.ReplaceAll(str, "{{filepath}}", videoPath)
str = strings.ReplaceAll(str, "{{filename}}", filename)
args = append(args, str)
} else {
args = append(args, fmt.Sprint(v))
}
}
args = append(args, fmt.Sprintf("%s/%s", s.rtmpServer, s.streamKey))
s.log.Debug("build ffmpeg", zap.Strings("args", args))
return args
}

View File

@ -2,7 +2,7 @@ package utils
import "os/exec"
func HasFFMPEG() bool {
func HasFFmpeg() bool {
_, err := exec.LookPath("ffmpeg")
return err == nil
}