commit 5873c0f3c81cba718c15289f456d26e64eca4ac2 Author: nite Date: Fri Mar 21 22:16:43 2025 +1100 start diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..3e22480 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,22 @@ +{ + // 使用 IntelliSense 了解相关属性。 + // 悬停以查看现有属性的描述。 + // 欲了解更多信息,请访问: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch server", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/cmd/server/main.go" + }, + { + "name": "Launch client", + "type": "go", + "request": "launch", + "mode": "debug", + "program": "${workspaceFolder}/cmd/client/main.go" + } + ] +} diff --git a/cmd/client/main.go b/cmd/client/main.go new file mode 100644 index 0000000..d49c848 --- /dev/null +++ b/cmd/client/main.go @@ -0,0 +1,35 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + "net-tunnel/internal/client" + pb "net-tunnel/pkg/proto" +) + +func main() { + // 创建客户端 + cli := client.NewClient("localhost", 7000) + + // 添加 TCP 代理示例 + cli.AddProxy("ssh", pb.ProxyType_TCP, "127.0.0.1", 22, 2222) + + // 添加 UDP 代理示例 + // cli.AddProxy("dns", pb.ProxyType_UDP, "8.8.8.8", 53, 5353) + + // 启动客户端 + if err := cli.Start(); err != nil { + log.Fatalf("Client error: %v", err) + } + + // 等待中断信号 + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + + // 优雅关闭 + cli.Stop() +} diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..6e74af6 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + "net-tunnel/internal/server" +) + +func main() { + // 创建服务端 + srv := server.NewServer("0.0.0.0", 7000) + + // 在后台启动服务 + go func() { + if err := srv.Start(); err != nil { + log.Fatalf("Server error: %v", err) + } + }() + + // 等待中断信号 + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + + // 优雅关闭 + srv.Stop() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d821ecd --- /dev/null +++ b/go.mod @@ -0,0 +1,12 @@ +module net-tunnel + +go 1.24.1 + +require ( + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.21.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect + google.golang.org/grpc v1.71.0 // indirect + google.golang.org/protobuf v1.36.4 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..eefbbe0 --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f h1:OxYkA3wjPsZyBylwymxSHa7ViiW1Sml4ToBrncvFehI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= +google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/internal/client/client.go b/internal/client/client.go new file mode 100644 index 0000000..ae86c3a --- /dev/null +++ b/internal/client/client.go @@ -0,0 +1,209 @@ +package client + +import ( + "context" + "fmt" + "log" + "net" + "os" + "strings" + "sync" + "time" + + pb "net-tunnel/pkg/proto" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// Client 客户端 +type Client struct { + serverAddr string + serverPort int + conn *grpc.ClientConn + client pb.TunnelServiceClient + stream pb.TunnelService_EstablishControlConnectionClient + ctx context.Context + cancel context.CancelFunc + proxies *pb.ProxyConfigs + mu sync.Mutex + connections sync.Map // map[connID]*ConnectionState +} + +// NewClient 创建一个新的客户端 +func NewClient(serverAddr string, serverPort int) *Client { + ctx, cancel := context.WithCancel(context.Background()) + return &Client{ + serverAddr: serverAddr, + serverPort: serverPort, + ctx: ctx, + cancel: cancel, + proxies: &pb.ProxyConfigs{}, + connections: sync.Map{}, + } +} + +// AddProxy 添加一个代理配置 +func (c *Client) AddProxy(name string, proxyType pb.ProxyType, localIP string, localPort, remotePort int32) { + c.mu.Lock() + defer c.mu.Unlock() + if c.proxies.Configs == nil { + c.proxies.Configs = make(map[string]*pb.ProxyConfig) + } + c.proxies.Configs[name] = &pb.ProxyConfig{ + Name: name, + Type: proxyType, + LocalIp: localIP, + LocalPort: localPort, + RemotePort: remotePort, + } +} + +// Start 启动客户端 +func (c *Client) Start() error { + // 连接到服务端 + addr := fmt.Sprintf("%s:%d", c.serverAddr, c.serverPort) + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("failed to connect to server: %v", err) + } + c.conn = conn + + c.client = pb.NewTunnelServiceClient(conn) + + // 建立控制连接 + if err := c.establishControlConnection(); err != nil { + return err + } + + return nil +} + +// Stop 停止客户端 +func (c *Client) Stop() { + // 关闭所有连接 + c.connections.Range(func(key, value interface{}) bool { + connState := value.(*ConnectionState) + connState.Close() + c.connections.Delete(key) + return true + }) + + // 关闭控制连接 + c.cancel() + if c.conn != nil { + c.conn.Close() + } +} + +// establishControlConnection 建立控制连接 +func (c *Client) establishControlConnection() error { + stream, err := c.client.EstablishControlConnection(c.ctx) + if err != nil { + return fmt.Errorf("failed to establish control connection: %v", err) + } + c.stream = stream + + // 注册所有代理 + if err := stream.Send(&pb.Message{ + Content: &pb.Message_RegisterConfigs{ + RegisterConfigs: c.proxies, + }, + }); err != nil { + return fmt.Errorf("failed to send proxy config: %v", err) + } + + // 接收服务端消息 + go c.receiveMessages() + + return nil +} + +// receiveMessages 接收服务端消息 +func (c *Client) receiveMessages() { + for { + msg, err := c.stream.Recv() + if err != nil { + log.Printf("Control connection closed: %v", err) + // 尝试重连 + time.Sleep(5 * time.Second) + if err := c.establishControlConnection(); err != nil { + log.Printf("Failed to re-establish control connection: %v", err) + return + } + return + } + + switch msg.GetContent().(type) { + case *pb.Message_ProxyData: + c.handleProxyData(msg) + case *pb.Message_RegisterProxiesError: + c.handleRegisterProxiesError(msg) + case *pb.Message_RegisterConfigs: + c.handleRegisterConfigs(msg) + case *pb.Message_ProxyError: + c.handleProxyError(msg) + } + } +} + +func (c *Client) handleProxyData(msg *pb.Message) { + proxyData := msg.GetProxyData() + log.Printf("Received proxy data for connection: %v", proxyData.ConnId) + + // 处理代理数据 + hostPort := net.JoinHostPort(proxyData.ProxyConfig.LocalIp, fmt.Sprintf("%d", proxyData.ProxyConfig.LocalPort)) + + existingConn, ok := c.connections.Load(proxyData.ConnId) + var connState *ConnectionState + if ok { + connState = existingConn.(*ConnectionState) + } else { + conn, err := net.Dial(strings.ToLower(proxyData.ProxyConfig.Type.String()), hostPort) + if err != nil { + log.Printf("Failed to connect to proxy: %v", err) + return + } + switch strings.ToLower(proxyData.ProxyConfig.Type.String()) { + case "tcp": + if tcpConn, ok := conn.(*net.TCPConn); ok { + _ = tcpConn.SetKeepAlive(true) + _ = tcpConn.SetKeepAlivePeriod(30 * time.Second) + _ = tcpConn.SetNoDelay(true) + } + } + connState = NewConnectionState(conn, proxyData.ConnId, proxyData.ProxyConfig, c.stream) + c.connections.Store(proxyData.ConnId, connState) + } + + err := connState.WriteData(proxyData.Data) + if err != nil { + log.Printf("Failed to write data: %v", err) + connState.Close() + c.connections.Delete(proxyData.ConnId) + return + } + connState.StartReading() +} + +func (c *Client) handleRegisterProxiesError(msg *pb.Message) { + registerProxiesError := msg.GetRegisterProxiesError() + log.Printf("Register proxies error: %v", registerProxiesError.ProxyConfig.Name) + os.Exit(1) +} + +func (c *Client) handleRegisterConfigs(msg *pb.Message) { + registerConfigs := msg.GetRegisterConfigs() + for name := range registerConfigs.Configs { + log.Printf("Register config: %v", name) + } +} + +func (c *Client) handleProxyError(msg *pb.Message) { + proxyError := msg.GetProxyError() + connState, ok := c.connections.Load(proxyError.ConnId) + if ok { + connState.(*ConnectionState).Close() + c.connections.Delete(proxyError.ConnId) + } +} diff --git a/internal/client/connection_state.go b/internal/client/connection_state.go new file mode 100644 index 0000000..0f632c2 --- /dev/null +++ b/internal/client/connection_state.go @@ -0,0 +1,117 @@ +package client + +import ( + "io" + "log" + "net" + pb "net-tunnel/pkg/proto" + "sync" + "time" +) + +type ConnectionState struct { + conn net.Conn + connID string + config *pb.ProxyConfig + stream pb.TunnelService_EstablishControlConnectionClient + closed bool + closedMutex sync.Mutex + closeOnce sync.Once +} + +func NewConnectionState(conn net.Conn, connID string, config *pb.ProxyConfig, stream pb.TunnelService_EstablishControlConnectionClient) *ConnectionState { + return &ConnectionState{ + conn: conn, + connID: connID, + config: config, + stream: stream, + closed: false, + closedMutex: sync.Mutex{}, + closeOnce: sync.Once{}, + } +} + +func (c *ConnectionState) Close() { + c.closeOnce.Do(func() { + c.closedMutex.Lock() + defer c.closedMutex.Unlock() + + if !c.closed { + c.closed = true + c.conn.Close() + log.Printf("Connection %s closed", c.connID) + } + }) +} + +func (c *ConnectionState) IsClosed() bool { + c.closedMutex.Lock() + defer c.closedMutex.Unlock() + return c.closed +} + +func (c *ConnectionState) WriteData(data []byte) error { + c.closedMutex.Lock() + defer c.closedMutex.Unlock() + + if c.closed { + return io.EOF + } + + _, err := c.conn.Write(data) + if err != nil { + c.Close() + return err + } + return nil +} + +func (c *ConnectionState) StartReading() { + go func() { + buffer := make([]byte, 4096) + defer c.Close() + + for { + if c.IsClosed() { + return + } + + if c.config.Type == pb.ProxyType_TCP { + if tcpConn, ok := c.conn.(*net.TCPConn); ok { + _ = tcpConn.SetReadDeadline(time.Now().Add(time.Minute * 3)) + } + } + + n, err := c.conn.Read(buffer) + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + continue + } + + if err == io.EOF { + log.Printf("Connection %s closed by remote", c.connID) + } + return + } + + if n > 0 { + if c.IsClosed() { + return + } + if err := c.stream.Send(&pb.Message{ + Content: &pb.Message_ProxyData{ + ProxyData: &pb.ProxyData{ + ConnId: c.connID, + Data: buffer[:n], + ProxyConfig: c.config, + }, + }, + }); err != nil { + log.Printf("Failed to send proxy data: %v", err) + c.Close() + return + } + } + } + }() +} diff --git a/internal/server/connection.go b/internal/server/connection.go new file mode 100644 index 0000000..49fc027 --- /dev/null +++ b/internal/server/connection.go @@ -0,0 +1,66 @@ +package server + +import ( + "context" + "sync" + + "net-tunnel/pkg/proto" +) + +// ControlConnection 表示客户端和服务端之间的控制连接 +type ControlConnection struct { + stream proto.TunnelService_EstablishControlConnectionServer + cancel context.CancelFunc + ctx context.Context +} + +// NewControlConnection 创建新的控制连接 +func NewControlConnection(stream proto.TunnelService_EstablishControlConnectionServer) *ControlConnection { + ctx, cancel := context.WithCancel(context.Background()) + return &ControlConnection{ + stream: stream, + ctx: ctx, + cancel: cancel, + } +} + +// Send 发送消息到客户端 +func (c *ControlConnection) Send(msg *proto.Message) error { + return c.stream.Send(msg) +} + +// Close 关闭连接 +func (c *ControlConnection) Close() { + c.cancel() +} + +// ConnectionManager 管理所有客户端的控制连接 +type ConnectionManager struct { + connections sync.Map // map[clientID]*ControlConnection +} + +// NewConnectionManager 创建新的连接管理器 +func NewConnectionManager() *ConnectionManager { + return &ConnectionManager{ + connections: sync.Map{}, + } +} + +// AddConnection 添加新的控制连接 +func (m *ConnectionManager) AddConnection(id string, conn *ControlConnection) { + m.connections.Store(id, conn) +} + +// RemoveConnection 移除控制连接 +func (m *ConnectionManager) RemoveConnection(id string) { + m.connections.Delete(id) +} + +// GetConnection 获取指定客户端的控制连接 +func (m *ConnectionManager) GetConnection(id string) (*ControlConnection, bool) { + conn, ok := m.connections.Load(id) + if !ok { + return nil, false + } + return conn.(*ControlConnection), true +} diff --git a/internal/server/proxy.go b/internal/server/proxy.go new file mode 100644 index 0000000..9e8a749 --- /dev/null +++ b/internal/server/proxy.go @@ -0,0 +1,226 @@ +package server + +import ( + "fmt" + "io" + "log" + "net" + "sync" + "time" + + "net-tunnel/pkg/proto" +) + +// ProxyEntry 表示一个代理条目 +type ProxyEntry struct { + Config *proto.ProxyConfig + ClientID string + TCPListener net.Listener // 用于 TCP 代理 + TCPConns sync.Map // map[remoteAddr]net.Conn 用于 TCP 代理 + UDPConn *net.UDPConn // 用于 UDP 代理 +} + +// ProxyManager 管理所有代理 +type ProxyManager struct { + proxies sync.Map // map[proxyName]*ProxyEntry + connManager *ConnectionManager +} + +// NewProxyManager 创建新的代理管理器 +func NewProxyManager(connManager *ConnectionManager) *ProxyManager { + return &ProxyManager{ + proxies: sync.Map{}, + connManager: connManager, + } +} + +// RegisterProxy 注册一个新的代理 +func (m *ProxyManager) RegisterProxy(clientID string, config *proto.ProxyConfig) error { + if _, exists := m.proxies.Load(config.Name); exists { + return fmt.Errorf("proxy %s already registered", config.Name) + } + + entry := &ProxyEntry{ + Config: config, + ClientID: clientID, + } + + // 根据代理类型启动监听器 + var err error + if config.Type == proto.ProxyType_TCP { + err = m.startTCPProxy(entry) + } else if config.Type == proto.ProxyType_UDP { + err = m.startUDPProxy(entry) + } else { + return fmt.Errorf("unsupported proxy type: %v", config.Type) + } + + if err != nil { + return err + } + + m.proxies.Store(config.Name, entry) + log.Printf("Registered proxy: %s (type: %s, port: %d)", + config.Name, config.Type, config.RemotePort) + + return nil +} + +// UnregisterProxy 注销一个代理 +func (m *ProxyManager) UnregisterProxy(clientID, proxyName string) { + m.closeProxyLocked(proxyName) +} + +// UnregisterAllProxies 注销客户端的所有代理 +func (m *ProxyManager) UnregisterAllProxies(clientID string) { + m.proxies.Range(func(key, value interface{}) bool { + if entry, ok := value.(*ProxyEntry); ok && entry.ClientID == clientID { + m.closeProxyLocked(key.(string)) + } + return true + }) +} + +// closeProxyLocked 关闭代理(在持有锁的情况下调用) +func (m *ProxyManager) closeProxyLocked(proxyID string) { + if entry, exists := m.proxies.Load(proxyID); exists { + entry := entry.(*ProxyEntry) + if entry.TCPListener != nil { + entry.TCPListener.Close() + } + if entry.UDPConn != nil { + entry.UDPConn.Close() + } + m.proxies.Delete(proxyID) + log.Printf("Unregistered proxy: %s", entry.Config.Name) + } +} + +// startTCPProxy 启动一个 TCP 代理 +func (m *ProxyManager) startTCPProxy(entry *ProxyEntry) error { + addr := fmt.Sprintf(":%d", entry.Config.RemotePort) + listener, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to listen on port %d: %v", entry.Config.RemotePort, err) + } + + entry.TCPListener = listener + + // 启动协程接受连接 + go m.handleTCPConnections(entry) + + return nil +} + +// startUDPProxy 启动一个 UDP 代理 +func (m *ProxyManager) startUDPProxy(entry *ProxyEntry) error { + addr := fmt.Sprintf(":%d", entry.Config.RemotePort) + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return fmt.Errorf("failed to resolve address %s: %v", addr, err) + } + + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + return fmt.Errorf("failed to listen on UDP port %d: %v", entry.Config.RemotePort, err) + } + + entry.UDPConn = conn + + // 启动协程接收 UDP 数据包 + go m.handleUDPPackets(entry) + + return nil +} + +// handleTCPConnections 处理传入的 TCP 连接 +func (m *ProxyManager) handleTCPConnections(entry *ProxyEntry) { + for { + conn, err := entry.TCPListener.Accept() + if err != nil { + // 监听器可能已关闭 + log.Printf("TCP listener for %s closed: %v", entry.Config.Name, err) + break + } + + go m.handleTCPConnection(entry, conn) + } +} + +// handleUDPPackets 处理传入的 UDP 数据包 +func (m *ProxyManager) handleUDPPackets(entry *ProxyEntry) { + buffer := make([]byte, 4096) + for { + n, addr, err := entry.UDPConn.ReadFromUDP(buffer) + if err != nil { + // 连接可能已关闭 + log.Printf("UDP connection for %s closed: %v", entry.Config.Name, err) + break + } + + go m.handleUDPPacket(entry, buffer[:n], addr) + } +} + +func (m *ProxyManager) handleTCPConnection(entry *ProxyEntry, conn net.Conn) { + if tcpConn, ok := conn.(*net.TCPConn); ok { + _ = tcpConn.SetKeepAlive(true) + _ = tcpConn.SetKeepAlivePeriod(30 * time.Second) + _ = tcpConn.SetNoDelay(true) // 对SSH协议很重要,避免数据包延迟 + } + connID := fmt.Sprintf("%s_%d", conn.RemoteAddr().String(), time.Now().UnixNano()) + + defer func() { + conn.Close() + entry.TCPConns.Delete(connID) + log.Printf("TCP connection for %s closed", connID) + }() + + // 获取客户端的控制连接 + controlConn, ok := m.connManager.GetConnection(entry.ClientID) + if !ok { + log.Printf("Control connection not found for client %s", entry.ClientID) + return + } + + entry.TCPConns.Store(connID, conn) + log.Printf("TCP connection for %s accepted", connID) + + wg := sync.WaitGroup{} + wg.Add(1) + // 启动数据转发 + go func() { + defer wg.Done() + buffer := make([]byte, 4096) + for { + n, err := conn.Read(buffer) + if err != nil { + if err == io.EOF { + log.Printf("TCP connection for %s closed", connID) + } else { + log.Printf("Failed to read from TCP connection: %v", err) + } + return + } + + // 发送数据到客户端 + err = controlConn.Send(&proto.Message{ + Content: &proto.Message_ProxyData{ + ProxyData: &proto.ProxyData{ + ConnId: connID, + ProxyConfig: entry.Config, + Data: buffer[:n], + }, + }, + }) + if err != nil { + log.Printf("Failed to send proxy data: %v", err) + return + } + } + }() + wg.Wait() +} + +func (m *ProxyManager) handleUDPPacket(entry *ProxyEntry, data []byte, addr *net.UDPAddr) { +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..2124e91 --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,177 @@ +package server + +import ( + "fmt" + "log" + "net" + "time" + + pb "net-tunnel/pkg/proto" + + "google.golang.org/grpc" +) + +// Server 表示服务端 +type Server struct { + pb.UnimplementedTunnelServiceServer + connManager *ConnectionManager + proxyManager *ProxyManager + grpcServer *grpc.Server + bindAddr string + bindPort int +} + +// NewServer 创建一个新的服务端 +func NewServer(bindAddr string, bindPort int) *Server { + connManager := NewConnectionManager() + return &Server{ + connManager: connManager, + proxyManager: NewProxyManager(connManager), + bindAddr: bindAddr, + bindPort: bindPort, + } +} + +// Start 启动服务端 +func (s *Server) Start() error { + addr := fmt.Sprintf("%s:%d", s.bindAddr, s.bindPort) + lis, err := net.Listen("tcp", addr) + if err != nil { + return fmt.Errorf("failed to listen: %v", err) + } + + s.grpcServer = grpc.NewServer() + pb.RegisterTunnelServiceServer(s.grpcServer, s) + + log.Printf("Server started, listening on %s", addr) + return s.grpcServer.Serve(lis) +} + +// Stop 停止服务端 +func (s *Server) Stop() { + if s.grpcServer != nil { + s.grpcServer.GracefulStop() + } +} + +// EstablishControlConnection 实现 gRPC 服务方法 +func (s *Server) EstablishControlConnection(stream pb.TunnelService_EstablishControlConnectionServer) error { + // 创建控制连接 + clientID := "client_" + fmt.Sprint(time.Now().UnixNano()) + conn := NewControlConnection(stream) + + s.connManager.AddConnection(clientID, conn) + defer s.connManager.RemoveConnection(clientID) + + log.Printf("New control connection established: %s", clientID) + + // 接收客户端消息 + for { + msg, err := stream.Recv() + if err != nil { + log.Printf("Control connection closed: %v", err) + // 清理该客户端的代理 + s.proxyManager.UnregisterAllProxies(clientID) + return err + } + + // 处理不同类型的消息 + switch content := msg.GetContent().(type) { + case *pb.Message_RegisterConfigs: + // 处理代理配置注册 + s.handleProxyRegister(clientID, content) + case *pb.Message_ProxyData: + s.handleProxyData(clientID, content) + default: + log.Printf("收到未知类型的消息") + } + } +} + +func (s *Server) handleProxyRegister(clientID string, msg *pb.Message_RegisterConfigs) { + hasError := false + + conn, ok := s.connManager.GetConnection(clientID) + if !ok { + log.Printf("Control connection not found for client %s", clientID) + return + } + + for _, config := range msg.RegisterConfigs.GetConfigs() { + if err := s.proxyManager.RegisterProxy(clientID, config); err != nil { + log.Printf("Failed to register proxy %s: %v", config.Name, err) + msg := &pb.Message_RegisterProxiesError{ + RegisterProxiesError: &pb.RegisterProxiesError{ + ProxyConfig: config, + Error: err.Error(), + }, + } + + if err := conn.Send(&pb.Message{ + Content: msg, + }); err != nil { + log.Printf("Failed to send message to client %s: %v", clientID, err) + } + hasError = true + break + } + } + if !hasError { + if err := conn.Send(&pb.Message{ + Content: msg, + }); err != nil { + log.Printf("Failed to send message to client %s: %v", clientID, err) + } + } +} + +func (s *Server) handleProxyData(clientID string, msg *pb.Message_ProxyData) { + proxyEntry, ok := s.proxyManager.proxies.Load(msg.ProxyData.ProxyConfig.Name) + if !ok { + log.Printf("Proxy %s not found", msg.ProxyData.ProxyConfig.Name) + return + } + entry := proxyEntry.(*ProxyEntry) + switch entry.Config.Type { + case pb.ProxyType_TCP: + conn, ok := entry.TCPConns.Load(msg.ProxyData.ConnId) + if !ok { + log.Printf("TCP connection %s not found", msg.ProxyData.ConnId) + controlConn, ok := s.connManager.GetConnection(clientID) + if ok { + if err := controlConn.Send(&pb.Message{ + Content: &pb.Message_ProxyError{ + ProxyError: &pb.ProxyError{ + ProxyConfig: msg.ProxyData.ProxyConfig, + ConnId: msg.ProxyData.ConnId, + Error: "TCP connection not found", + }, + }, + }); err != nil { + log.Printf("Failed to send message to client %s: %v", clientID, err) + } + } + return + } + _, err := conn.(net.Conn).Write(msg.ProxyData.Data) + if err != nil { + log.Printf("Failed to write data to TCP connection: %v", err) + conn.(net.Conn).Close() + entry.TCPConns.Delete(msg.ProxyData.ConnId) + controlConn, ok := s.connManager.GetConnection(clientID) + if ok { + if err := controlConn.Send(&pb.Message{ + Content: &pb.Message_ProxyError{ + ProxyError: &pb.ProxyError{ + ProxyConfig: msg.ProxyData.ProxyConfig, + ConnId: msg.ProxyData.ConnId, + Error: "Failed to write data to TCP connection", + }, + }, + }); err != nil { + log.Printf("Failed to send message to client %s: %v", clientID, err) + } + } + } + } +} diff --git a/pkg/proto/tunnel.pb.go b/pkg/proto/tunnel.pb.go new file mode 100644 index 0000000..874fbce --- /dev/null +++ b/pkg/proto/tunnel.pb.go @@ -0,0 +1,625 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.5 +// protoc v6.30.0 +// source: pkg/proto/tunnel.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ProxyType int32 + +const ( + ProxyType_TCP ProxyType = 0 + ProxyType_UDP ProxyType = 1 +) + +// Enum value maps for ProxyType. +var ( + ProxyType_name = map[int32]string{ + 0: "TCP", + 1: "UDP", + } + ProxyType_value = map[string]int32{ + "TCP": 0, + "UDP": 1, + } +) + +func (x ProxyType) Enum() *ProxyType { + p := new(ProxyType) + *p = x + return p +} + +func (x ProxyType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ProxyType) Descriptor() protoreflect.EnumDescriptor { + return file_pkg_proto_tunnel_proto_enumTypes[0].Descriptor() +} + +func (ProxyType) Type() protoreflect.EnumType { + return &file_pkg_proto_tunnel_proto_enumTypes[0] +} + +func (x ProxyType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ProxyType.Descriptor instead. +func (ProxyType) EnumDescriptor() ([]byte, []int) { + return file_pkg_proto_tunnel_proto_rawDescGZIP(), []int{0} +} + +// 通用消息格式 +type Message struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Content: + // + // *Message_RegisterConfigs + // *Message_ProxyData + // *Message_RegisterProxiesError + // *Message_ProxyError + Content isMessage_Content `protobuf_oneof:"content"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *Message) Reset() { + *x = Message{} + mi := &file_pkg_proto_tunnel_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_pkg_proto_tunnel_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_pkg_proto_tunnel_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetContent() isMessage_Content { + if x != nil { + return x.Content + } + return nil +} + +func (x *Message) GetRegisterConfigs() *ProxyConfigs { + if x != nil { + if x, ok := x.Content.(*Message_RegisterConfigs); ok { + return x.RegisterConfigs + } + } + return nil +} + +func (x *Message) GetProxyData() *ProxyData { + if x != nil { + if x, ok := x.Content.(*Message_ProxyData); ok { + return x.ProxyData + } + } + return nil +} + +func (x *Message) GetRegisterProxiesError() *RegisterProxiesError { + if x != nil { + if x, ok := x.Content.(*Message_RegisterProxiesError); ok { + return x.RegisterProxiesError + } + } + return nil +} + +func (x *Message) GetProxyError() *ProxyError { + if x != nil { + if x, ok := x.Content.(*Message_ProxyError); ok { + return x.ProxyError + } + } + return nil +} + +type isMessage_Content interface { + isMessage_Content() +} + +type Message_RegisterConfigs struct { + RegisterConfigs *ProxyConfigs `protobuf:"bytes,1,opt,name=register_configs,json=registerConfigs,proto3,oneof"` +} + +type Message_ProxyData struct { + ProxyData *ProxyData `protobuf:"bytes,2,opt,name=proxy_data,json=proxyData,proto3,oneof"` +} + +type Message_RegisterProxiesError struct { + RegisterProxiesError *RegisterProxiesError `protobuf:"bytes,3,opt,name=register_proxies_error,json=registerProxiesError,proto3,oneof"` +} + +type Message_ProxyError struct { + ProxyError *ProxyError `protobuf:"bytes,4,opt,name=proxy_error,json=proxyError,proto3,oneof"` +} + +func (*Message_RegisterConfigs) isMessage_Content() {} + +func (*Message_ProxyData) isMessage_Content() {} + +func (*Message_RegisterProxiesError) isMessage_Content() {} + +func (*Message_ProxyError) isMessage_Content() {} + +// 代理配置 +type ProxyConfigs struct { + state protoimpl.MessageState `protogen:"open.v1"` + Configs map[string]*ProxyConfig `protobuf:"bytes,1,rep,name=configs,proto3" json:"configs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProxyConfigs) Reset() { + *x = ProxyConfigs{} + mi := &file_pkg_proto_tunnel_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProxyConfigs) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProxyConfigs) ProtoMessage() {} + +func (x *ProxyConfigs) ProtoReflect() protoreflect.Message { + mi := &file_pkg_proto_tunnel_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProxyConfigs.ProtoReflect.Descriptor instead. +func (*ProxyConfigs) Descriptor() ([]byte, []int) { + return file_pkg_proto_tunnel_proto_rawDescGZIP(), []int{1} +} + +func (x *ProxyConfigs) GetConfigs() map[string]*ProxyConfig { + if x != nil { + return x.Configs + } + return nil +} + +type ProxyConfig struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Type ProxyType `protobuf:"varint,2,opt,name=type,proto3,enum=nwct.ProxyType" json:"type,omitempty"` + LocalIp string `protobuf:"bytes,3,opt,name=local_ip,json=localIp,proto3" json:"local_ip,omitempty"` + LocalPort int32 `protobuf:"varint,4,opt,name=local_port,json=localPort,proto3" json:"local_port,omitempty"` + RemotePort int32 `protobuf:"varint,5,opt,name=remote_port,json=remotePort,proto3" json:"remote_port,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProxyConfig) Reset() { + *x = ProxyConfig{} + mi := &file_pkg_proto_tunnel_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProxyConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProxyConfig) ProtoMessage() {} + +func (x *ProxyConfig) ProtoReflect() protoreflect.Message { + mi := &file_pkg_proto_tunnel_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProxyConfig.ProtoReflect.Descriptor instead. +func (*ProxyConfig) Descriptor() ([]byte, []int) { + return file_pkg_proto_tunnel_proto_rawDescGZIP(), []int{2} +} + +func (x *ProxyConfig) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *ProxyConfig) GetType() ProxyType { + if x != nil { + return x.Type + } + return ProxyType_TCP +} + +func (x *ProxyConfig) GetLocalIp() string { + if x != nil { + return x.LocalIp + } + return "" +} + +func (x *ProxyConfig) GetLocalPort() int32 { + if x != nil { + return x.LocalPort + } + return 0 +} + +func (x *ProxyConfig) GetRemotePort() int32 { + if x != nil { + return x.RemotePort + } + return 0 +} + +// 代理数据 +type ProxyData struct { + state protoimpl.MessageState `protogen:"open.v1"` + ConnId string `protobuf:"bytes,1,opt,name=conn_id,json=connId,proto3" json:"conn_id,omitempty"` + ProxyConfig *ProxyConfig `protobuf:"bytes,2,opt,name=proxy_config,json=proxyConfig,proto3" json:"proxy_config,omitempty"` + Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProxyData) Reset() { + *x = ProxyData{} + mi := &file_pkg_proto_tunnel_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProxyData) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProxyData) ProtoMessage() {} + +func (x *ProxyData) ProtoReflect() protoreflect.Message { + mi := &file_pkg_proto_tunnel_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProxyData.ProtoReflect.Descriptor instead. +func (*ProxyData) Descriptor() ([]byte, []int) { + return file_pkg_proto_tunnel_proto_rawDescGZIP(), []int{3} +} + +func (x *ProxyData) GetConnId() string { + if x != nil { + return x.ConnId + } + return "" +} + +func (x *ProxyData) GetProxyConfig() *ProxyConfig { + if x != nil { + return x.ProxyConfig + } + return nil +} + +func (x *ProxyData) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +type RegisterProxiesError struct { + state protoimpl.MessageState `protogen:"open.v1"` + ProxyConfig *ProxyConfig `protobuf:"bytes,1,opt,name=proxy_config,json=proxyConfig,proto3" json:"proxy_config,omitempty"` + Error string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RegisterProxiesError) Reset() { + *x = RegisterProxiesError{} + mi := &file_pkg_proto_tunnel_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RegisterProxiesError) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterProxiesError) ProtoMessage() {} + +func (x *RegisterProxiesError) ProtoReflect() protoreflect.Message { + mi := &file_pkg_proto_tunnel_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterProxiesError.ProtoReflect.Descriptor instead. +func (*RegisterProxiesError) Descriptor() ([]byte, []int) { + return file_pkg_proto_tunnel_proto_rawDescGZIP(), []int{4} +} + +func (x *RegisterProxiesError) GetProxyConfig() *ProxyConfig { + if x != nil { + return x.ProxyConfig + } + return nil +} + +func (x *RegisterProxiesError) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +type ProxyError struct { + state protoimpl.MessageState `protogen:"open.v1"` + ProxyConfig *ProxyConfig `protobuf:"bytes,1,opt,name=proxy_config,json=proxyConfig,proto3" json:"proxy_config,omitempty"` + ConnId string `protobuf:"bytes,2,opt,name=conn_id,json=connId,proto3" json:"conn_id,omitempty"` + Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ProxyError) Reset() { + *x = ProxyError{} + mi := &file_pkg_proto_tunnel_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ProxyError) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ProxyError) ProtoMessage() {} + +func (x *ProxyError) ProtoReflect() protoreflect.Message { + mi := &file_pkg_proto_tunnel_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ProxyError.ProtoReflect.Descriptor instead. +func (*ProxyError) Descriptor() ([]byte, []int) { + return file_pkg_proto_tunnel_proto_rawDescGZIP(), []int{5} +} + +func (x *ProxyError) GetProxyConfig() *ProxyConfig { + if x != nil { + return x.ProxyConfig + } + return nil +} + +func (x *ProxyError) GetConnId() string { + if x != nil { + return x.ConnId + } + return "" +} + +func (x *ProxyError) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +var File_pkg_proto_tunnel_proto protoreflect.FileDescriptor + +var file_pkg_proto_tunnel_proto_rawDesc = string([]byte{ + 0x0a, 0x16, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x75, 0x6e, 0x6e, + 0x65, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x6e, 0x77, 0x63, 0x74, 0x22, 0x90, + 0x02, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3f, 0x0a, 0x10, 0x72, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, + 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x48, 0x00, 0x52, 0x0f, 0x72, 0x65, 0x67, 0x69, + 0x73, 0x74, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x30, 0x0a, 0x0a, 0x70, + 0x72, 0x6f, 0x78, 0x79, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x0f, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x44, 0x61, 0x74, 0x61, + 0x48, 0x00, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x44, 0x61, 0x74, 0x61, 0x12, 0x52, 0x0a, + 0x16, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x70, 0x72, 0x6f, 0x78, 0x69, 0x65, + 0x73, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x50, 0x72, 0x6f, + 0x78, 0x69, 0x65, 0x73, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x14, 0x72, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x78, 0x69, 0x65, 0x73, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x33, 0x0a, 0x0b, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, + 0x6f, 0x78, 0x79, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x0a, 0x70, 0x72, 0x6f, 0x78, + 0x79, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x09, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, + 0x74, 0x22, 0x98, 0x01, 0x0a, 0x0c, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x73, 0x12, 0x39, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x1a, 0x4d, 0x0a, + 0x0c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x27, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, + 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xa1, 0x01, 0x0a, + 0x0b, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x23, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, + 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x54, 0x79, 0x70, 0x65, 0x52, + 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x19, 0x0a, 0x08, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x69, + 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x49, 0x70, + 0x12, 0x1d, 0x0a, 0x0a, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x50, 0x6f, 0x72, 0x74, 0x12, + 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x50, 0x6f, 0x72, 0x74, + 0x22, 0x6e, 0x0a, 0x09, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x44, 0x61, 0x74, 0x61, 0x12, 0x17, 0x0a, + 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, + 0x63, 0x6f, 0x6e, 0x6e, 0x49, 0x64, 0x12, 0x34, 0x0a, 0x0c, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x5f, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6e, + 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, + 0x0b, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x22, 0x62, 0x0a, 0x14, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x50, 0x72, 0x6f, 0x78, + 0x69, 0x65, 0x73, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x34, 0x0a, 0x0c, 0x70, 0x72, 0x6f, 0x78, + 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, + 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x52, 0x0b, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x14, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x22, 0x71, 0x0a, 0x0a, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x45, 0x72, 0x72, + 0x6f, 0x72, 0x12, 0x34, 0x0a, 0x0c, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x5f, 0x63, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, + 0x50, 0x72, 0x6f, 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x70, 0x72, 0x6f, + 0x78, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x17, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x6e, + 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x6e, 0x49, + 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x1d, 0x0a, 0x09, 0x50, 0x72, 0x6f, 0x78, 0x79, + 0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x54, 0x43, 0x50, 0x10, 0x00, 0x12, 0x07, 0x0a, + 0x03, 0x55, 0x44, 0x50, 0x10, 0x01, 0x32, 0x4f, 0x0a, 0x0d, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3e, 0x0a, 0x1a, 0x45, 0x73, 0x74, 0x61, 0x62, + 0x6c, 0x69, 0x73, 0x68, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0d, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x4d, 0x65, 0x73, + 0x73, 0x61, 0x67, 0x65, 0x1a, 0x0d, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +}) + +var ( + file_pkg_proto_tunnel_proto_rawDescOnce sync.Once + file_pkg_proto_tunnel_proto_rawDescData []byte +) + +func file_pkg_proto_tunnel_proto_rawDescGZIP() []byte { + file_pkg_proto_tunnel_proto_rawDescOnce.Do(func() { + file_pkg_proto_tunnel_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_pkg_proto_tunnel_proto_rawDesc), len(file_pkg_proto_tunnel_proto_rawDesc))) + }) + return file_pkg_proto_tunnel_proto_rawDescData +} + +var file_pkg_proto_tunnel_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_pkg_proto_tunnel_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_pkg_proto_tunnel_proto_goTypes = []any{ + (ProxyType)(0), // 0: nwct.ProxyType + (*Message)(nil), // 1: nwct.Message + (*ProxyConfigs)(nil), // 2: nwct.ProxyConfigs + (*ProxyConfig)(nil), // 3: nwct.ProxyConfig + (*ProxyData)(nil), // 4: nwct.ProxyData + (*RegisterProxiesError)(nil), // 5: nwct.RegisterProxiesError + (*ProxyError)(nil), // 6: nwct.ProxyError + nil, // 7: nwct.ProxyConfigs.ConfigsEntry +} +var file_pkg_proto_tunnel_proto_depIdxs = []int32{ + 2, // 0: nwct.Message.register_configs:type_name -> nwct.ProxyConfigs + 4, // 1: nwct.Message.proxy_data:type_name -> nwct.ProxyData + 5, // 2: nwct.Message.register_proxies_error:type_name -> nwct.RegisterProxiesError + 6, // 3: nwct.Message.proxy_error:type_name -> nwct.ProxyError + 7, // 4: nwct.ProxyConfigs.configs:type_name -> nwct.ProxyConfigs.ConfigsEntry + 0, // 5: nwct.ProxyConfig.type:type_name -> nwct.ProxyType + 3, // 6: nwct.ProxyData.proxy_config:type_name -> nwct.ProxyConfig + 3, // 7: nwct.RegisterProxiesError.proxy_config:type_name -> nwct.ProxyConfig + 3, // 8: nwct.ProxyError.proxy_config:type_name -> nwct.ProxyConfig + 3, // 9: nwct.ProxyConfigs.ConfigsEntry.value:type_name -> nwct.ProxyConfig + 1, // 10: nwct.TunnelService.EstablishControlConnection:input_type -> nwct.Message + 1, // 11: nwct.TunnelService.EstablishControlConnection:output_type -> nwct.Message + 11, // [11:12] is the sub-list for method output_type + 10, // [10:11] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name +} + +func init() { file_pkg_proto_tunnel_proto_init() } +func file_pkg_proto_tunnel_proto_init() { + if File_pkg_proto_tunnel_proto != nil { + return + } + file_pkg_proto_tunnel_proto_msgTypes[0].OneofWrappers = []any{ + (*Message_RegisterConfigs)(nil), + (*Message_ProxyData)(nil), + (*Message_RegisterProxiesError)(nil), + (*Message_ProxyError)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_pkg_proto_tunnel_proto_rawDesc), len(file_pkg_proto_tunnel_proto_rawDesc)), + NumEnums: 1, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_proto_tunnel_proto_goTypes, + DependencyIndexes: file_pkg_proto_tunnel_proto_depIdxs, + EnumInfos: file_pkg_proto_tunnel_proto_enumTypes, + MessageInfos: file_pkg_proto_tunnel_proto_msgTypes, + }.Build() + File_pkg_proto_tunnel_proto = out.File + file_pkg_proto_tunnel_proto_goTypes = nil + file_pkg_proto_tunnel_proto_depIdxs = nil +} diff --git a/pkg/proto/tunnel.proto b/pkg/proto/tunnel.proto new file mode 100644 index 0000000..346c69c --- /dev/null +++ b/pkg/proto/tunnel.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +package nwct; + +option go_package = "./pkg/proto"; + +service TunnelService { + // 建立控制连接 + rpc EstablishControlConnection(stream Message) returns (stream Message); +} + +// 通用消息格式 +message Message { + oneof content { + ProxyConfigs register_configs = 1; + ProxyData proxy_data = 2; + RegisterProxiesError register_proxies_error = 3; + ProxyError proxy_error = 4; + } +} + +// 代理配置 +message ProxyConfigs { map configs = 1; } + +message ProxyConfig { + string name = 1; + ProxyType type = 2; + string local_ip = 3; + int32 local_port = 4; + int32 remote_port = 5; +} + +enum ProxyType { + TCP = 0; + UDP = 1; +} + +// 代理数据 +message ProxyData { + string conn_id = 1; + ProxyConfig proxy_config = 2; + bytes data = 3; +} + +message RegisterProxiesError { + ProxyConfig proxy_config = 1; + string error = 2; +} + +message ProxyError { + ProxyConfig proxy_config = 1; + string conn_id = 2; + string error = 3; +} \ No newline at end of file diff --git a/pkg/proto/tunnel_grpc.pb.go b/pkg/proto/tunnel_grpc.pb.go new file mode 100644 index 0000000..6bef5f6 --- /dev/null +++ b/pkg/proto/tunnel_grpc.pb.go @@ -0,0 +1,117 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v6.30.0 +// source: pkg/proto/tunnel.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + TunnelService_EstablishControlConnection_FullMethodName = "/nwct.TunnelService/EstablishControlConnection" +) + +// TunnelServiceClient is the client API for TunnelService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TunnelServiceClient interface { + // 建立控制连接 + EstablishControlConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Message, Message], error) +} + +type tunnelServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTunnelServiceClient(cc grpc.ClientConnInterface) TunnelServiceClient { + return &tunnelServiceClient{cc} +} + +func (c *tunnelServiceClient) EstablishControlConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Message, Message], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &TunnelService_ServiceDesc.Streams[0], TunnelService_EstablishControlConnection_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[Message, Message]{ClientStream: stream} + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TunnelService_EstablishControlConnectionClient = grpc.BidiStreamingClient[Message, Message] + +// TunnelServiceServer is the server API for TunnelService service. +// All implementations must embed UnimplementedTunnelServiceServer +// for forward compatibility. +type TunnelServiceServer interface { + // 建立控制连接 + EstablishControlConnection(grpc.BidiStreamingServer[Message, Message]) error + mustEmbedUnimplementedTunnelServiceServer() +} + +// UnimplementedTunnelServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedTunnelServiceServer struct{} + +func (UnimplementedTunnelServiceServer) EstablishControlConnection(grpc.BidiStreamingServer[Message, Message]) error { + return status.Errorf(codes.Unimplemented, "method EstablishControlConnection not implemented") +} +func (UnimplementedTunnelServiceServer) mustEmbedUnimplementedTunnelServiceServer() {} +func (UnimplementedTunnelServiceServer) testEmbeddedByValue() {} + +// UnsafeTunnelServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TunnelServiceServer will +// result in compilation errors. +type UnsafeTunnelServiceServer interface { + mustEmbedUnimplementedTunnelServiceServer() +} + +func RegisterTunnelServiceServer(s grpc.ServiceRegistrar, srv TunnelServiceServer) { + // If the following call pancis, it indicates UnimplementedTunnelServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&TunnelService_ServiceDesc, srv) +} + +func _TunnelService_EstablishControlConnection_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TunnelServiceServer).EstablishControlConnection(&grpc.GenericServerStream[Message, Message]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type TunnelService_EstablishControlConnectionServer = grpc.BidiStreamingServer[Message, Message] + +// TunnelService_ServiceDesc is the grpc.ServiceDesc for TunnelService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TunnelService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "nwct.TunnelService", + HandlerType: (*TunnelServiceServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "EstablishControlConnection", + Handler: _TunnelService_EstablishControlConnection_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "pkg/proto/tunnel.proto", +}