u
This commit is contained in:
		| @@ -108,7 +108,6 @@ func (s *Server) handleWebSocket(c *gin.Context) { | ||||
| 	go func() { | ||||
| 		ticker := time.NewTicker(1 * time.Second) | ||||
| 		for range ticker.C { | ||||
| 			streamer.GlobalStreamer.TruncateOutput() | ||||
| 			currentVideoPath, _ := streamer.GlobalStreamer.GetCurrentVideoPath() | ||||
| 			s.Broadcast(mywebsocket.Date{ | ||||
| 				Timestamp:        time.Now().UnixMilli(), | ||||
|   | ||||
| @@ -8,45 +8,55 @@ import ( | ||||
| 	"io" | ||||
| 	"live-streamer/config" | ||||
| 	"log" | ||||
| 	"math" | ||||
| 	"os/exec" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
| ) | ||||
|  | ||||
| type Streamer struct { | ||||
| 	videoList         []config.InputItem | ||||
| type playState struct { | ||||
| 	currentVideoIndex int | ||||
| 	manualControl     bool | ||||
| 	cmd               *exec.Cmd | ||||
| 	ctx               context.Context | ||||
| 	cancel            context.CancelFunc | ||||
| 	output            strings.Builder | ||||
| 	manualControl     bool | ||||
| 	mu                sync.Mutex | ||||
| } | ||||
|  | ||||
| type Streamer struct { | ||||
| 	playStateMu sync.RWMutex | ||||
| 	playState   playState | ||||
|  | ||||
| 	videoMu   sync.RWMutex | ||||
| 	videoList []config.InputItem | ||||
|  | ||||
| 	outputMu sync.RWMutex | ||||
| 	output   strings.Builder | ||||
| } | ||||
|  | ||||
| var GlobalStreamer *Streamer | ||||
|  | ||||
| func NewStreamer(videoList []config.InputItem) *Streamer { | ||||
| 	GlobalStreamer = &Streamer{ | ||||
| 		videoList:         videoList, | ||||
| 		currentVideoIndex: 0, | ||||
| 		cmd:               nil, | ||||
| 		ctx:               nil, | ||||
| 		videoList: videoList, | ||||
| 		playState: playState{}, | ||||
| 		output:    strings.Builder{}, | ||||
| 	} | ||||
| 	return GlobalStreamer | ||||
| } | ||||
|  | ||||
| func (s *Streamer) start() { | ||||
| 	s.mu.Lock() | ||||
| 	s.ctx, s.cancel = context.WithCancel(context.Background()) | ||||
| 	currentVideo := s.videoList[s.currentVideoIndex] | ||||
| 	s.playStateMu.Lock() | ||||
| 	s.playState.ctx, s.playState.cancel = context.WithCancel(context.Background()) | ||||
| 	currentVideo := s.videoList[s.playState.currentVideoIndex] | ||||
| 	videoPath := currentVideo.Path | ||||
| 	s.cmd = exec.CommandContext(s.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) | ||||
| 	s.mu.Unlock() | ||||
| 	s.playState.cmd = exec.CommandContext(s.playState.ctx, "ffmpeg", s.buildFFmpegArgs(currentVideo)...) | ||||
| 	cmd := s.playState.cmd | ||||
| 	ctx := s.playState.ctx | ||||
| 	s.playStateMu.Unlock() | ||||
|  | ||||
| 	s.writeOutput(fmt.Sprintln("start stream: ", videoPath)) | ||||
| 	pipe, err := s.cmd.StderrPipe() | ||||
|  | ||||
| 	pipe, err := cmd.StderrPipe() | ||||
| 	if err != nil { | ||||
| 		log.Printf("failed to get pipe: %v", err) | ||||
| 		return | ||||
| @@ -54,25 +64,29 @@ func (s *Streamer) start() { | ||||
|  | ||||
| 	reader := bufio.NewReader(pipe) | ||||
|  | ||||
| 	if err := s.cmd.Start(); err != nil { | ||||
| 	if err := cmd.Start(); err != nil { | ||||
| 		s.writeOutput(fmt.Sprintf("starting ffmpeg error: %v\n", err)) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	go s.log(reader) | ||||
|  | ||||
| 	<-s.ctx.Done() | ||||
| 	<-ctx.Done() | ||||
| 	s.writeOutput(fmt.Sprintf("stop stream: %s\n", videoPath)) | ||||
|  | ||||
| 	if s.manualControl { | ||||
| 		s.manualControl = false | ||||
| 	s.playStateMu.Lock() | ||||
| 	if s.playState.manualControl { | ||||
| 		// manualing change video, don't increase currentVideoIndex | ||||
| 		s.playState.manualControl = false | ||||
| 	} else { | ||||
| 		// stream next video | ||||
| 		s.currentVideoIndex++ | ||||
| 		if s.currentVideoIndex >= len(s.videoList) { | ||||
| 			s.currentVideoIndex = 0 | ||||
| 		s.playState.currentVideoIndex++ | ||||
| 		s.videoMu.RLock() | ||||
| 		if s.playState.currentVideoIndex >= len(s.videoList) { | ||||
| 			s.playState.currentVideoIndex = 0 | ||||
| 		} | ||||
| 		s.videoMu.RUnlock() | ||||
| 	} | ||||
| 	s.playStateMu.Unlock() | ||||
| } | ||||
|  | ||||
| func (s *Streamer) Stream() { | ||||
| @@ -86,83 +100,123 @@ func (s *Streamer) Stream() { | ||||
| } | ||||
|  | ||||
| func (s *Streamer) Stop() { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	if s.cancel != nil { | ||||
| 		stopped := make(chan error) | ||||
| 		go func() { | ||||
| 			if s.cmd != nil { | ||||
| 				stopped <- s.cmd.Wait() | ||||
| 			} | ||||
| 		}() | ||||
| 		s.cancel() | ||||
| 		if s.cmd != nil && s.cmd.Process != nil { | ||||
| 			select { | ||||
| 			case <-stopped: | ||||
| 				break | ||||
| 			case <-time.After(3 * time.Second): | ||||
| 				_ = s.cmd.Process.Kill() | ||||
| 				break | ||||
| 			} | ||||
| 			s.cmd = nil | ||||
| 	s.playStateMu.Lock() | ||||
| 	cancel := s.playState.cancel | ||||
| 	cmd := s.playState.cmd | ||||
| 	s.playState.cancel = nil | ||||
| 	s.playState.cmd = nil | ||||
| 	s.playStateMu.Unlock() | ||||
|  | ||||
| 	if cancel == nil || cmd == nil { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	stopped := make(chan error, 1) | ||||
| 	go func() { | ||||
| 		if cmd.Process != nil { | ||||
| 			stopped <- cmd.Wait() | ||||
| 		} else { | ||||
| 			stopped <- nil | ||||
| 		} | ||||
| 	}() | ||||
|  | ||||
| 	cancel() | ||||
|  | ||||
| 	if cmd.Process != nil { | ||||
| 		select { | ||||
| 		case <-stopped: | ||||
| 		case <-time.After(3 * time.Second): | ||||
| 			_ = cmd.Process.Kill() | ||||
| 		} | ||||
| 		close(stopped) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *Streamer) writeOutput(str string) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.output.WriteString(str) | ||||
| } | ||||
|  | ||||
| func (s *Streamer) Add(videoPath string) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.videoMu.Lock() | ||||
| 	defer s.videoMu.Unlock() | ||||
| 	s.videoList = append(s.videoList, config.InputItem{Path: videoPath}) | ||||
| } | ||||
|  | ||||
| func (s *Streamer) Remove(videoPath string) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	var needStop bool // removed video is current playing | ||||
| 	var removeIndex int = -1 | ||||
|  | ||||
| 	s.videoMu.Lock() | ||||
| 	for i, item := range s.videoList { | ||||
| 		if item.Path == videoPath { | ||||
| 			s.videoList = append(s.videoList[:i], s.videoList[i+1:]...) | ||||
| 			if s.currentVideoIndex >= len(s.videoList) { | ||||
| 				s.currentVideoIndex = 0 | ||||
| 			} | ||||
| 			if s.currentVideoIndex == i { | ||||
| 				s.Stop() | ||||
| 			} | ||||
| 			removeIndex = i | ||||
|  | ||||
| 			s.playStateMu.RLock() | ||||
| 			needStop = (s.playState.currentVideoIndex == i) | ||||
| 			s.playStateMu.RUnlock() | ||||
|  | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	if removeIndex >= 0 && removeIndex < len(s.videoList) { | ||||
| 		oldLen := len(s.videoList) | ||||
| 		s.videoList = append(s.videoList[:removeIndex], s.videoList[removeIndex+1:]...) | ||||
|  | ||||
| 		s.playStateMu.Lock() | ||||
| 		if s.playState.currentVideoIndex >= oldLen-1 { | ||||
| 			s.playState.currentVideoIndex = 0 | ||||
| 		} | ||||
| 		s.playStateMu.Unlock() | ||||
| 	} | ||||
| 	s.videoMu.Unlock() | ||||
|  | ||||
| 	if needStop { | ||||
| 		s.Stop() | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *Streamer) Prev() { | ||||
| 	s.mu.Lock() | ||||
| 	s.manualControl = true | ||||
| 	s.currentVideoIndex-- | ||||
| 	if s.currentVideoIndex < 0 { | ||||
| 		s.currentVideoIndex = len(s.videoList) - 1 | ||||
| 	s.videoMu.RLock() | ||||
| 	videoLen := len(s.videoList) | ||||
| 	if videoLen == 0 { | ||||
| 		return | ||||
| 	} | ||||
| 	s.mu.Unlock() | ||||
| 	s.videoMu.RUnlock() | ||||
|  | ||||
| 	s.playStateMu.Lock() | ||||
| 	s.playState.manualControl = true | ||||
| 	s.playState.currentVideoIndex-- | ||||
| 	if s.playState.currentVideoIndex < 0 { | ||||
| 		s.playState.currentVideoIndex = videoLen - 1 | ||||
| 	} | ||||
| 	s.playStateMu.Unlock() | ||||
|  | ||||
| 	s.Stop() | ||||
| } | ||||
|  | ||||
| func (s *Streamer) Next() { | ||||
| 	s.mu.Lock() | ||||
| 	s.manualControl = true | ||||
| 	s.currentVideoIndex++ | ||||
| 	if s.currentVideoIndex >= len(s.videoList) { | ||||
| 		s.currentVideoIndex = 0 | ||||
| 	s.videoMu.RLock() | ||||
| 	videoLen := len(s.videoList) | ||||
| 	if videoLen == 0 { | ||||
| 		return | ||||
| 	} | ||||
| 	s.mu.Unlock() | ||||
| 	s.videoMu.RUnlock() | ||||
|  | ||||
| 	s.playStateMu.Lock() | ||||
| 	s.playState.manualControl = true | ||||
| 	s.playState.currentVideoIndex++ | ||||
| 	if s.playState.currentVideoIndex >= videoLen { | ||||
| 		s.playState.currentVideoIndex = 0 | ||||
| 	} | ||||
| 	s.playStateMu.Unlock() | ||||
|  | ||||
| 	s.Stop() | ||||
| } | ||||
|  | ||||
| func (s *Streamer) log(reader *bufio.Reader) { | ||||
| 	s.playStateMu.RLock() | ||||
| 	ctx := s.playState.ctx | ||||
| 	s.playStateMu.RUnlock() | ||||
|  | ||||
| 	select { | ||||
| 	case <-s.ctx.Done(): | ||||
| 	case <-ctx.Done(): | ||||
| 		return | ||||
| 	default: | ||||
| 		if !config.GlobalConfig.Log.PlayState { | ||||
| @@ -187,23 +241,23 @@ func (s *Streamer) log(reader *bufio.Reader) { | ||||
| } | ||||
|  | ||||
| func (s *Streamer) GetCurrentVideoPath() (string, error) { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.videoMu.RLock() | ||||
| 	defer s.videoMu.RUnlock() | ||||
| 	if len(s.videoList) == 0 { | ||||
| 		return "", errors.New("no video streaming") | ||||
| 	} | ||||
| 	return s.videoList[s.currentVideoIndex].Path, nil | ||||
| 	return s.videoList[s.GetCurrentIndex()].Path, nil | ||||
| } | ||||
|  | ||||
| func (s *Streamer) GetVideoList() []config.InputItem { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.videoMu.RLock() | ||||
| 	defer s.videoMu.RUnlock() | ||||
| 	return s.videoList | ||||
| } | ||||
|  | ||||
| func (s *Streamer) GetVideoListPath() []string { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.videoMu.RLock() | ||||
| 	defer s.videoMu.RUnlock() | ||||
| 	var videoList []string | ||||
| 	for _, item := range s.videoList { | ||||
| 		videoList = append(videoList, item.Path) | ||||
| @@ -212,30 +266,23 @@ func (s *Streamer) GetVideoListPath() []string { | ||||
| } | ||||
|  | ||||
| func (s *Streamer) GetCurrentIndex() int { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	return s.currentVideoIndex | ||||
| 	s.playStateMu.RLock() | ||||
| 	defer s.playStateMu.RUnlock() | ||||
| 	return s.playState.currentVideoIndex | ||||
| } | ||||
|  | ||||
| func (s *Streamer) writeOutput(str string) { | ||||
| 	s.outputMu.Lock() | ||||
| 	defer s.outputMu.Unlock() | ||||
| 	s.output.WriteString(str) | ||||
| } | ||||
|  | ||||
| func (s *Streamer) GetOutput() string { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	s.outputMu.RLock() | ||||
| 	defer s.outputMu.RUnlock() | ||||
| 	return s.output.String() | ||||
| } | ||||
|  | ||||
| func (s *Streamer) TruncateOutput() int { | ||||
| 	s.mu.Lock() | ||||
| 	defer s.mu.Unlock() | ||||
| 	currentSize := s.output.Len() | ||||
| 	if currentSize > math.MaxInt { | ||||
| 		newStart := currentSize - math.MaxInt | ||||
| 		trimmedOutput := s.output.String()[newStart:] | ||||
| 		s.output.Reset() | ||||
| 		s.output.WriteString(trimmedOutput) | ||||
| 	} | ||||
| 	return currentSize | ||||
| } | ||||
|  | ||||
| func (s *Streamer) Close() { | ||||
| 	s.Stop() | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user