diff --git a/main.go b/main.go index f4e175e..0886ad7 100755 --- a/main.go +++ b/main.go @@ -77,6 +77,7 @@ func inputHandler() { GlobalStreamer.Prev() case "quit": GlobalStreamer.Close() + os.Exit(0) case "current": fmt.Println(GlobalStreamer.GetCurrentVideoPath()) } diff --git a/server/server.go b/server/server.go index a79fe90..978a091 100755 --- a/server/server.go +++ b/server/server.go @@ -5,6 +5,7 @@ import ( "html/template" "live-streamer/streamer" "net/http" + "os" "sync" "time" @@ -180,5 +181,6 @@ func (s *Server) RequestHandler(reqType RequestType) { s.streamer.Prev() case TypeQuit: s.streamer.Close() + os.Exit(0) } } diff --git a/streamer/streamer.go b/streamer/streamer.go index 5f6549b..5ba1a64 100755 --- a/streamer/streamer.go +++ b/streamer/streamer.go @@ -9,7 +9,6 @@ import ( "live-streamer/model" "path/filepath" - "os" "os/exec" "strings" "sync" @@ -26,8 +25,9 @@ type Streamer struct { outputQueue chan string outputReq chan chan string // address output concurrency security issue - wg sync.WaitGroup // wait all handlers(except closehandler) to finish before closure - close chan any + wg sync.WaitGroup // wait all handlers(except closehandler) to finish before closure + close chan any // inform actorLoop() to return + closed chan any // inform Close() to return log *zap.Logger option *Option @@ -70,6 +70,7 @@ func NewStreamer(rtmpServer string, streamKey string, videoList []model.VideoIte log: log, rtmpServer: rtmpServer, streamKey: streamKey, + closed: make(chan any), } GlobalStreamer = s go s.actorLoop() @@ -237,10 +238,7 @@ func (s *Streamer) handleNext() { } s.state.manualControl = true - s.state.currentVideoIndex++ - if s.state.currentVideoIndex >= len(s.state.videoList) { - s.state.currentVideoIndex = 0 - } + s.state.currentVideoIndex = (s.state.currentVideoIndex + 1) % len(s.state.videoList) s.mailbox <- StopMessage{} s.mailbox <- StartMessage{} @@ -252,10 +250,7 @@ func (s *Streamer) handlePrev() { } s.state.manualControl = true - s.state.currentVideoIndex-- - if s.state.currentVideoIndex < 0 { - s.state.currentVideoIndex = len(s.state.videoList) - 1 - } + s.state.currentVideoIndex = (s.state.currentVideoIndex - 1 + len(s.state.videoList)) % len(s.state.videoList) s.mailbox <- StopMessage{} s.mailbox <- StartMessage{} @@ -289,7 +284,12 @@ func (s *Streamer) handleClose() { close(s.close) s.handleStop() s.wg.Wait() - os.Exit(0) + close(s.mailbox) + close(s.outputQueue) + close(s.outputReq) + s.output.Reset() + GlobalStreamer = nil + close(s.closed) } // Public methods that send messages to the actor @@ -343,6 +343,7 @@ func (s *Streamer) GetCurrentIndex() int { func (s *Streamer) Close() { s.mailbox <- CloseMessage{} + <-s.closed } func (s *Streamer) handleOutput() {