commit 60fca41864f58af9a124943c07ed24982ab2d758 Author: nite07 Date: Wed Oct 23 04:39:10 2024 +0800 Init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..08bdb1b --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.vscode +video +config.json +logs diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..6d7857d --- /dev/null +++ b/config/config.go @@ -0,0 +1,213 @@ +package config + +import ( + "encoding/json" + "errors" + "fmt" + "live-streamer/utils" + "log" + "os" + "path/filepath" + "reflect" + "strings" +) + +type OutputConfig struct { + RTMPServer string `json:"rtmp_server"` + StreamKey string `json:"stream_key"` +} + +type InputItem struct { + Path string `json:"path"` + Start string `json:"start"` + End string `json:"end"` + ItemType string `json:"-"` +} + +type PlayConfig struct { + VideoCodec string `json:"video_codec"` + Preset string `json:"preset"` + CRF int `json:"crf"` + MaxRate string `json:"max_rate"` + BufSize string `json:"buf_size"` + Scale string `json:"scale"` + FrameRate int `json:"frame_rate"` + AudioCodec string `json:"audio_codec"` + AudioBitrate string `json:"audio_bitrate"` + AudioSampleRate int `json:"audio_sample_rate"` + OutputFormat string `json:"output_format"` + CustomArgs string `json:"custom_args"` +} + +type Config struct { + Input []any `json:"input"` + inputItems []InputItem `json:"-"` // contains video file or dir + PlayList []InputItem `json:"-"` // only contains video file + Play PlayConfig `json:"play"` + Output OutputConfig `json:"output"` +} + +var GlobalConfig Config + +func init() { + GlobalConfig = Config{} + err := readConfig("config.json") + for i, item := range GlobalConfig.inputItems { + if item.ItemType == "file" { + GlobalConfig.PlayList = append(GlobalConfig.PlayList, item) + } else if item.ItemType == "dir" { + videos, err := getAllVideos(item.Path) + if err != nil { + log.Fatalf("input[%v] walk error: %v", i, err) + } + GlobalConfig.PlayList = append(GlobalConfig.PlayList, videos...) + } + } + if len(GlobalConfig.PlayList) == 0 { + log.Fatal("No input video found") + } + if err != nil { + if os.IsNotExist(err) { + log.Fatal("Config not exists") + } else { + log.Fatal(err) + } + } +} + +func readConfig(configPath string) error { + stat, err := os.Stat(configPath) + if err != nil { + return fmt.Errorf("config read failed: %v", err) + } + if stat.IsDir() { + return os.ErrNotExist + } + databytes, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("Config read failed: %v", err) + } + if err = json.Unmarshal(databytes, &GlobalConfig); err != nil { + return fmt.Errorf("config unmarshal failed: %v", err) + } + err = validateConfig() + if err != nil { + return fmt.Errorf("config validate failed: %v", err) + } + return nil +} + +func validateInputConfig() error { + if GlobalConfig.Input == nil { + return errors.New("video_path is nil") + } else { + for i, item := range GlobalConfig.Input { + typeOf := reflect.TypeOf(item) + var inputItem InputItem + if typeOf.Kind() == reflect.String { + inputItem = InputItem{Path: item.(string)} + } + if inputItem.Path == "" { + return fmt.Errorf("video_path[%v] is empty", i) + } + stat, err := os.Stat(inputItem.Path) + if err != nil { + return fmt.Errorf("video_path[%v] stat failed: %v", i, err) + } + if stat.IsDir() { + inputItem.ItemType = "dir" + } else { + inputItem.ItemType = "file" + if !utils.IsSupportedVideo(inputItem.Path) { + return fmt.Errorf("video_path[%v] is not supported", i) + } + } + GlobalConfig.inputItems = append(GlobalConfig.inputItems, inputItem) + } + } + return nil +} + +func validateOutputConfig() error { + if GlobalConfig.Output.RTMPServer == "" { + return errors.New("rtmp_server is empty") + } else if !strings.HasPrefix(GlobalConfig.Output.RTMPServer, "rtmp://") && + !strings.HasPrefix(GlobalConfig.Output.RTMPServer, "rtmps://") { + return errors.New("rtmp_server is not a valid rtmp server") + } else { + GlobalConfig.Output.RTMPServer = strings.TrimSuffix(GlobalConfig.Output.RTMPServer, "/") + } + if GlobalConfig.Output.StreamKey == "" { + return errors.New("stream_key is empty") + } else { + GlobalConfig.Output.StreamKey = strings.TrimPrefix(GlobalConfig.Output.StreamKey, "/") + } + return nil +} + +func validatePlayConfig() error { + if GlobalConfig.Play.VideoCodec == "" { + GlobalConfig.Play.VideoCodec = "libx264" + } + if GlobalConfig.Play.Preset == "" { + GlobalConfig.Play.Preset = "fast" + } + if GlobalConfig.Play.CRF == 0 { + GlobalConfig.Play.CRF = 23 + } + if GlobalConfig.Play.MaxRate == "" { + GlobalConfig.Play.MaxRate = "8000k" + } + if GlobalConfig.Play.BufSize == "" { + GlobalConfig.Play.BufSize = "12000k" + } + if GlobalConfig.Play.Scale == "" { + GlobalConfig.Play.Scale = "1920:1080" + } + if GlobalConfig.Play.FrameRate == 0 { + GlobalConfig.Play.FrameRate = 30 + } + if GlobalConfig.Play.AudioCodec == "" { + GlobalConfig.Play.AudioCodec = "aac" + } + if GlobalConfig.Play.AudioBitrate == "" { + GlobalConfig.Play.AudioBitrate = "192k" + } + if GlobalConfig.Play.AudioSampleRate == 0 { + GlobalConfig.Play.AudioSampleRate = 48000 + } + if GlobalConfig.Play.OutputFormat == "" { + GlobalConfig.Play.OutputFormat = "flv" + } + return nil +} + +func validateConfig() error { + if err := validateInputConfig(); err != nil { + return err + } + if err := validateOutputConfig(); err != nil { + return err + } + if err := validatePlayConfig(); err != nil { + return err + } + return nil +} + +func getAllVideos(dirPath string) ([]InputItem, error) { + res := []InputItem{} + err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() && utils.IsSupportedVideo(path) { + res = append(res, InputItem{Path: path}) + } + return nil + }) + if err != nil { + return nil, err + } + return res, nil +} diff --git a/constant/constant.go b/constant/constant.go new file mode 100644 index 0000000..7e6726e --- /dev/null +++ b/constant/constant.go @@ -0,0 +1,17 @@ +package constant + +var SupportedStreamingFormats = []string{ + "mp4", + "flv", + "mkv", + "ts", + "webm", + "avi", + "mov", + "wmv", + "ogg", + "m3u8", + "mpd", + "rtmp", + "srt", +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..eaf067e --- /dev/null +++ b/go.mod @@ -0,0 +1,7 @@ +module live-streamer + +go 1.23.2 + +require github.com/fsnotify/fsnotify v1.7.0 + +require golang.org/x/sys v0.4.0 // indirect diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ccd7ce9 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= +github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go new file mode 100644 index 0000000..d62d6c4 --- /dev/null +++ b/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "bufio" + "live-streamer/config" + "live-streamer/streamer" + "live-streamer/utils" + "log" + "os" + + "github.com/fsnotify/fsnotify" +) + +var GlobalStreamer *streamer.Streamer + +func main() { + if !utils.HasFFMPEG() { + log.Fatal("ffmpeg not found") + } + GlobalStreamer = streamer.NewStreamer(config.GlobalConfig.PlayList) + go input() + go startWatcher() + GlobalStreamer.Stream() + GlobalStreamer.Close() +} + +func input() { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + switch scanner.Text() { + case "prev": + GlobalStreamer.Prev() + case "next": + GlobalStreamer.Next() + default: + log.Println("unknown command") + } + } +} + +func startWatcher() { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Fatalf("failed to create watcher: %v", err) + } + defer watcher.Close() + + for _, item := range config.GlobalConfig.PlayList { + if item.ItemType == "dir" { + err = watcher.Add(item.Path) + if err != nil { + log.Fatalf("failed to add dir to watcher: %v", err) + } + log.Println("watching dir:", item.Path) + } + } + if err != nil { + log.Fatalf("failed to start watcher: %v", err) + } + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + if event.Op&fsnotify.Create == fsnotify.Create { + if utils.IsSupportedVideo(event.Name) { + GlobalStreamer.Add(event.Name) + } + } + if event.Op&fsnotify.Remove == fsnotify.Remove { + GlobalStreamer.Remove(event.Name) + } + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Println("watcher error:", err) + } + } +} diff --git a/streamer/streamer.go b/streamer/streamer.go new file mode 100644 index 0000000..07ba1a0 --- /dev/null +++ b/streamer/streamer.go @@ -0,0 +1,241 @@ +package streamer + +import ( + "bufio" + "fmt" + "io" + "live-streamer/config" + "log" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" +) + +type Streamer struct { + playlist []config.InputItem + currentVideoIndex int + cmd *exec.Cmd + stopped bool + logFile *os.File + doneCh chan bool + mu sync.Mutex + manualNext bool +} + +func NewStreamer(playList []config.InputItem) *Streamer { + logDir := "logs" + if err := os.MkdirAll(logDir, 0755); err != nil { + log.Printf("Error creating log directory: %v\n", err) + } + logPath := filepath.Join(logDir, fmt.Sprintf("ffmpeg_%s.log", time.Now().Format("2006-01-02_15-04-05"))) + logFile, err := os.OpenFile(logPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + log.Fatalf("Error opening log file: %v\n", err) + } + return &Streamer{ + playlist: playList, + currentVideoIndex: 0, + cmd: nil, + logFile: logFile, + doneCh: make(chan bool, 1), + } +} + +func (s *Streamer) Add(videoPath string) { + s.playlist = append(s.playlist, config.InputItem{Path: videoPath}) +} + +func (s *Streamer) Remove(videoPath string) { + for i, item := range s.playlist { + if item.Path == videoPath { + s.playlist = append(s.playlist[:i], s.playlist[i+1:]...) + if s.currentVideoIndex >= len(s.playlist) { + s.currentVideoIndex = 0 + } + if s.currentVideoIndex == i { + s.start() + } + break + } + } +} + +func (s *Streamer) Prev() { + s.currentVideoIndex-- + if s.currentVideoIndex < 0 { + s.currentVideoIndex = len(s.playlist) - 1 + } + s.start() +} + +func (s *Streamer) Next() { + s.manualNext = true + s.currentVideoIndex++ + if s.currentVideoIndex >= len(s.playlist) { + s.currentVideoIndex = 0 + } + s.start() +} + +func (s *Streamer) Stream() { + for { + if len(s.playlist) == 0 { + time.Sleep(time.Second) + continue + } + if s.currentVideoIndex >= len(s.playlist) { + s.currentVideoIndex = 0 + } + s.start() + } +} + +func (s *Streamer) buildFFmpegArgs(videoItem config.InputItem) []string { + videoPath := videoItem.Path + + args := []string{"-re"} + if videoItem.Start != "" { + args = append(args, "-ss", videoItem.Start) + } + + args = append(args, "-i", videoPath) + + if videoItem.End != "" { + args = append(args, "-to", videoItem.End) + } + + args = append(args, + "-c:v", config.GlobalConfig.Play.VideoCodec, + "-preset", config.GlobalConfig.Play.Preset, + "-crf", fmt.Sprintf("%d", config.GlobalConfig.Play.CRF), + "-maxrate", config.GlobalConfig.Play.MaxRate, + "-bufsize", config.GlobalConfig.Play.BufSize, + "-vf", fmt.Sprintf("scale=%s", config.GlobalConfig.Play.Scale), + "-r", fmt.Sprintf("%d", config.GlobalConfig.Play.FrameRate), + "-c:a", config.GlobalConfig.Play.AudioCodec, + "-b:a", config.GlobalConfig.Play.AudioBitrate, + "-ar", fmt.Sprintf("%d", config.GlobalConfig.Play.AudioSampleRate), + "-f", config.GlobalConfig.Play.OutputFormat, + "-stats", "-loglevel", "info", + ) + + if config.GlobalConfig.Play.CustomArgs != "" { + customArgs := strings.Fields(config.GlobalConfig.Play.CustomArgs) + args = append(args, customArgs...) + } + + args = append(args, fmt.Sprintf("%s/%s", config.GlobalConfig.Output.RTMPServer, config.GlobalConfig.Output.StreamKey)) + + // log.Println("ffmpeg args: ", args) + + return args +} + +func (s *Streamer) start() { + defer func() { + s.cmd = nil + }() + if s.cmd != nil && !s.stopped { + s.Stop() + } + s.mu.Lock() + s.stopped = false + s.mu.Unlock() + + currentVideo := s.playlist[s.currentVideoIndex] + videoPath := currentVideo.Path + log.Println("start stream: ", videoPath) + + s.cmd = exec.Command("ffmpeg", s.buildFFmpegArgs(currentVideo)...) + + pipe, err := s.cmd.StderrPipe() + if err != nil { + log.Printf("failed to get pipe: %v", err) + return + } + + reader := bufio.NewReader(pipe) + writer := bufio.NewWriter(s.logFile) + + if err := s.cmd.Start(); err != nil { + log.Printf("starting ffmpeg error: %v\n", err) + return + } + + go func() { + defer func() { + if s.logFile != nil { + if err := s.logFile.Sync(); err != nil { + log.Printf("syncing log file error: %v\n", err) + } + } + }() + buf := make([]byte, 1024) + for { + n, err := reader.Read(buf) + if err != nil { + if err != io.EOF && !s.stopped { + log.Printf("reading ffmpeg error: %v\n", err) + } + break + } + if n > 0 { + timestamp := time.Now().Format("2006-01-02 15:04:05") + logLine := fmt.Sprintf("[%s] %s", timestamp, string(buf[:n])) + if s.logFile != nil { + if _, err := writer.WriteString(logLine); err != nil { + log.Printf("writing to log file error: %v\n", err) + } + if err := writer.Flush(); err != nil { + log.Printf("flushing writer error: %v\n", err) + } + } + } + } + }() + + go func() { + err = s.cmd.Wait() + s.doneCh <- true + s.mu.Lock() + defer s.mu.Unlock() + if err != nil && !s.stopped { + log.Printf("ffmpeg streaming error: %v\nStart streaming next video\n", err) + s.Next() + } else { + log.Println("ffmpeg streaming stopped") + if !s.manualNext { + s.Next() + } + } + }() + + <-s.doneCh +} + +func (s *Streamer) Stop() { + if s.cmd != nil && s.cmd.Process != nil { + s.stopped = true + _ = s.cmd.Process.Kill() + <-s.doneCh + s.cmd = nil + } +} + +func (s *Streamer) GetCurrentVideo() string { + return s.playlist[s.currentVideoIndex].Path +} + +func (s *Streamer) GetPlaylist() []config.InputItem { + return s.playlist +} + +func (s *Streamer) Close() { + if s.logFile != nil { + s.logFile.Close() + s.logFile = nil + } +} diff --git a/utils/has_ffmpeg.go b/utils/has_ffmpeg.go new file mode 100644 index 0000000..f080312 --- /dev/null +++ b/utils/has_ffmpeg.go @@ -0,0 +1,8 @@ +package utils + +import "os/exec" + +func HasFFMPEG() bool { + _, err := exec.LookPath("ffmpeg") + return err == nil +} diff --git a/utils/is_supported_video.go b/utils/is_supported_video.go new file mode 100644 index 0000000..e30a813 --- /dev/null +++ b/utils/is_supported_video.go @@ -0,0 +1,13 @@ +package utils + +import ( + "live-streamer/constant" + "path/filepath" + "slices" + "strings" +) + +func IsSupportedVideo(filename string) bool { + ext := strings.ToLower(filepath.Ext(filename)) + return slices.Contains(constant.SupportedStreamingFormats, strings.TrimPrefix(ext, ".")) +}