重构控制连接相关代码,将EstablishControlConnection重命名为Connect,并更新相关接口和实现,改进了连接状态管理,使用TCPConnectionState替代ConnectionState,优化了代理管理器的结构。
This commit is contained in:
parent
5873c0f3c8
commit
d1997b6e1f
@ -22,7 +22,7 @@ type Client struct {
|
||||
serverPort int
|
||||
conn *grpc.ClientConn
|
||||
client pb.TunnelServiceClient
|
||||
stream pb.TunnelService_EstablishControlConnectionClient
|
||||
stream pb.TunnelService_ConnectClient
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
proxies *pb.ProxyConfigs
|
||||
@ -83,7 +83,7 @@ func (c *Client) Start() error {
|
||||
func (c *Client) Stop() {
|
||||
// 关闭所有连接
|
||||
c.connections.Range(func(key, value interface{}) bool {
|
||||
connState := value.(*ConnectionState)
|
||||
connState := value.(*TCPConnectionState)
|
||||
connState.Close()
|
||||
c.connections.Delete(key)
|
||||
return true
|
||||
@ -98,7 +98,7 @@ func (c *Client) Stop() {
|
||||
|
||||
// establishControlConnection 建立控制连接
|
||||
func (c *Client) establishControlConnection() error {
|
||||
stream, err := c.client.EstablishControlConnection(c.ctx)
|
||||
stream, err := c.client.Connect(c.ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to establish control connection: %v", err)
|
||||
}
|
||||
@ -155,9 +155,9 @@ func (c *Client) handleProxyData(msg *pb.Message) {
|
||||
hostPort := net.JoinHostPort(proxyData.ProxyConfig.LocalIp, fmt.Sprintf("%d", proxyData.ProxyConfig.LocalPort))
|
||||
|
||||
existingConn, ok := c.connections.Load(proxyData.ConnId)
|
||||
var connState *ConnectionState
|
||||
var connState *TCPConnectionState
|
||||
if ok {
|
||||
connState = existingConn.(*ConnectionState)
|
||||
connState = existingConn.(*TCPConnectionState)
|
||||
} else {
|
||||
conn, err := net.Dial(strings.ToLower(proxyData.ProxyConfig.Type.String()), hostPort)
|
||||
if err != nil {
|
||||
@ -172,7 +172,7 @@ func (c *Client) handleProxyData(msg *pb.Message) {
|
||||
_ = tcpConn.SetNoDelay(true)
|
||||
}
|
||||
}
|
||||
connState = NewConnectionState(conn, proxyData.ConnId, proxyData.ProxyConfig, c.stream)
|
||||
connState = NewTCPConnectionState(conn, proxyData.ConnId, proxyData.ProxyConfig, c.stream)
|
||||
c.connections.Store(proxyData.ConnId, connState)
|
||||
}
|
||||
|
||||
@ -203,7 +203,7 @@ func (c *Client) handleProxyError(msg *pb.Message) {
|
||||
proxyError := msg.GetProxyError()
|
||||
connState, ok := c.connections.Load(proxyError.ConnId)
|
||||
if ok {
|
||||
connState.(*ConnectionState).Close()
|
||||
connState.(*TCPConnectionState).Close()
|
||||
c.connections.Delete(proxyError.ConnId)
|
||||
}
|
||||
}
|
||||
|
@ -1,117 +1,8 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
pb "net-tunnel/pkg/proto"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ConnectionState struct {
|
||||
conn net.Conn
|
||||
connID string
|
||||
config *pb.ProxyConfig
|
||||
stream pb.TunnelService_EstablishControlConnectionClient
|
||||
closed bool
|
||||
closedMutex sync.Mutex
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func NewConnectionState(conn net.Conn, connID string, config *pb.ProxyConfig, stream pb.TunnelService_EstablishControlConnectionClient) *ConnectionState {
|
||||
return &ConnectionState{
|
||||
conn: conn,
|
||||
connID: connID,
|
||||
config: config,
|
||||
stream: stream,
|
||||
closed: false,
|
||||
closedMutex: sync.Mutex{},
|
||||
closeOnce: sync.Once{},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConnectionState) Close() {
|
||||
c.closeOnce.Do(func() {
|
||||
c.closedMutex.Lock()
|
||||
defer c.closedMutex.Unlock()
|
||||
|
||||
if !c.closed {
|
||||
c.closed = true
|
||||
c.conn.Close()
|
||||
log.Printf("Connection %s closed", c.connID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (c *ConnectionState) IsClosed() bool {
|
||||
c.closedMutex.Lock()
|
||||
defer c.closedMutex.Unlock()
|
||||
return c.closed
|
||||
}
|
||||
|
||||
func (c *ConnectionState) WriteData(data []byte) error {
|
||||
c.closedMutex.Lock()
|
||||
defer c.closedMutex.Unlock()
|
||||
|
||||
if c.closed {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
_, err := c.conn.Write(data)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ConnectionState) StartReading() {
|
||||
go func() {
|
||||
buffer := make([]byte, 4096)
|
||||
defer c.Close()
|
||||
|
||||
for {
|
||||
if c.IsClosed() {
|
||||
return
|
||||
}
|
||||
|
||||
if c.config.Type == pb.ProxyType_TCP {
|
||||
if tcpConn, ok := c.conn.(*net.TCPConn); ok {
|
||||
_ = tcpConn.SetReadDeadline(time.Now().Add(time.Minute * 3))
|
||||
}
|
||||
}
|
||||
|
||||
n, err := c.conn.Read(buffer)
|
||||
if err != nil {
|
||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||
continue
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
log.Printf("Connection %s closed by remote", c.connID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if n > 0 {
|
||||
if c.IsClosed() {
|
||||
return
|
||||
}
|
||||
if err := c.stream.Send(&pb.Message{
|
||||
Content: &pb.Message_ProxyData{
|
||||
ProxyData: &pb.ProxyData{
|
||||
ConnId: c.connID,
|
||||
Data: buffer[:n],
|
||||
ProxyConfig: c.config,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
log.Printf("Failed to send proxy data: %v", err)
|
||||
c.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
type ConnectionState interface {
|
||||
Close()
|
||||
IsClosed() bool
|
||||
WriteData(data []byte) error
|
||||
StartReading()
|
||||
}
|
||||
|
115
internal/client/tcp_connection_state.go
Normal file
115
internal/client/tcp_connection_state.go
Normal file
@ -0,0 +1,115 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
pb "net-tunnel/pkg/proto"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TCPConnectionState struct {
|
||||
conn net.Conn
|
||||
connID string
|
||||
config *pb.ProxyConfig
|
||||
stream pb.TunnelService_ConnectClient
|
||||
closed bool
|
||||
closedMutex sync.Mutex
|
||||
closeOnce sync.Once
|
||||
}
|
||||
|
||||
func NewTCPConnectionState(conn net.Conn, connID string, config *pb.ProxyConfig, stream pb.TunnelService_ConnectClient) *TCPConnectionState {
|
||||
return &TCPConnectionState{
|
||||
conn: conn,
|
||||
connID: connID,
|
||||
config: config,
|
||||
stream: stream,
|
||||
closed: false,
|
||||
closedMutex: sync.Mutex{},
|
||||
closeOnce: sync.Once{},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TCPConnectionState) Close() {
|
||||
c.closeOnce.Do(func() {
|
||||
c.closedMutex.Lock()
|
||||
defer c.closedMutex.Unlock()
|
||||
|
||||
if !c.closed {
|
||||
c.closed = true
|
||||
c.conn.Close()
|
||||
log.Printf("Connection %s closed", c.connID)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (c *TCPConnectionState) IsClosed() bool {
|
||||
c.closedMutex.Lock()
|
||||
defer c.closedMutex.Unlock()
|
||||
return c.closed
|
||||
}
|
||||
|
||||
func (c *TCPConnectionState) WriteData(data []byte) error {
|
||||
c.closedMutex.Lock()
|
||||
defer c.closedMutex.Unlock()
|
||||
|
||||
if c.closed {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
_, err := c.conn.Write(data)
|
||||
if err != nil {
|
||||
c.Close()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TCPConnectionState) StartReading() {
|
||||
go func() {
|
||||
buffer := make([]byte, 4096)
|
||||
defer c.Close()
|
||||
|
||||
for {
|
||||
if c.IsClosed() {
|
||||
return
|
||||
}
|
||||
|
||||
if tcpConn, ok := c.conn.(*net.TCPConn); ok {
|
||||
_ = tcpConn.SetReadDeadline(time.Now().Add(time.Minute * 3))
|
||||
}
|
||||
|
||||
n, err := c.conn.Read(buffer)
|
||||
if err != nil {
|
||||
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
|
||||
continue
|
||||
}
|
||||
|
||||
if err == io.EOF {
|
||||
log.Printf("Connection %s closed by remote", c.connID)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if n > 0 {
|
||||
if c.IsClosed() {
|
||||
return
|
||||
}
|
||||
if err := c.stream.Send(&pb.Message{
|
||||
Content: &pb.Message_ProxyData{
|
||||
ProxyData: &pb.ProxyData{
|
||||
ConnId: c.connID,
|
||||
Data: buffer[:n],
|
||||
ProxyConfig: c.config,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
log.Printf("Failed to send proxy data: %v", err)
|
||||
c.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
@ -7,17 +7,17 @@ import (
|
||||
"net-tunnel/pkg/proto"
|
||||
)
|
||||
|
||||
// ControlConnection 表示客户端和服务端之间的控制连接
|
||||
type ControlConnection struct {
|
||||
stream proto.TunnelService_EstablishControlConnectionServer
|
||||
// ClientConnection 表示客户端和服务端之间的控制连接
|
||||
type ClientConnection struct {
|
||||
stream proto.TunnelService_ConnectServer
|
||||
cancel context.CancelFunc
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// NewControlConnection 创建新的控制连接
|
||||
func NewControlConnection(stream proto.TunnelService_EstablishControlConnectionServer) *ControlConnection {
|
||||
// NewClientConnection 创建新的控制连接
|
||||
func NewClientConnection(stream proto.TunnelService_ConnectServer) *ClientConnection {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &ControlConnection{
|
||||
return &ClientConnection{
|
||||
stream: stream,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
@ -25,42 +25,42 @@ func NewControlConnection(stream proto.TunnelService_EstablishControlConnectionS
|
||||
}
|
||||
|
||||
// Send 发送消息到客户端
|
||||
func (c *ControlConnection) Send(msg *proto.Message) error {
|
||||
func (c *ClientConnection) Send(msg *proto.Message) error {
|
||||
return c.stream.Send(msg)
|
||||
}
|
||||
|
||||
// Close 关闭连接
|
||||
func (c *ControlConnection) Close() {
|
||||
func (c *ClientConnection) Close() {
|
||||
c.cancel()
|
||||
}
|
||||
|
||||
// ConnectionManager 管理所有客户端的控制连接
|
||||
type ConnectionManager struct {
|
||||
connections sync.Map // map[clientID]*ControlConnection
|
||||
// ClientManager 管理所有客户端的控制连接
|
||||
type ClientManager struct {
|
||||
clients sync.Map // map[clientID]*ControlConnection
|
||||
}
|
||||
|
||||
// NewConnectionManager 创建新的连接管理器
|
||||
func NewConnectionManager() *ConnectionManager {
|
||||
return &ConnectionManager{
|
||||
connections: sync.Map{},
|
||||
// NewClientManager 创建新的连接管理器
|
||||
func NewClientManager() *ClientManager {
|
||||
return &ClientManager{
|
||||
clients: sync.Map{},
|
||||
}
|
||||
}
|
||||
|
||||
// AddConnection 添加新的控制连接
|
||||
func (m *ConnectionManager) AddConnection(id string, conn *ControlConnection) {
|
||||
m.connections.Store(id, conn)
|
||||
// Add 添加新的控制连接
|
||||
func (m *ClientManager) Add(id string, conn *ClientConnection) {
|
||||
m.clients.Store(id, conn)
|
||||
}
|
||||
|
||||
// RemoveConnection 移除控制连接
|
||||
func (m *ConnectionManager) RemoveConnection(id string) {
|
||||
m.connections.Delete(id)
|
||||
// Remove 移除控制连接
|
||||
func (m *ClientManager) Remove(id string) {
|
||||
m.clients.Delete(id)
|
||||
}
|
||||
|
||||
// GetConnection 获取指定客户端的控制连接
|
||||
func (m *ConnectionManager) GetConnection(id string) (*ControlConnection, bool) {
|
||||
conn, ok := m.connections.Load(id)
|
||||
// Get 获取指定客户端的控制连接
|
||||
func (m *ClientManager) Get(id string) (*ClientConnection, bool) {
|
||||
conn, ok := m.clients.Load(id)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return conn.(*ControlConnection), true
|
||||
return conn.(*ClientConnection), true
|
||||
}
|
||||
|
@ -15,22 +15,22 @@ import (
|
||||
type ProxyEntry struct {
|
||||
Config *proto.ProxyConfig
|
||||
ClientID string
|
||||
TCPListener net.Listener // 用于 TCP 代理
|
||||
TCPConns sync.Map // map[remoteAddr]net.Conn 用于 TCP 代理
|
||||
UDPConn *net.UDPConn // 用于 UDP 代理
|
||||
TCPListener net.Listener
|
||||
TCPConns sync.Map
|
||||
UDPConn *net.UDPConn
|
||||
}
|
||||
|
||||
// ProxyManager 管理所有代理
|
||||
type ProxyManager struct {
|
||||
proxies sync.Map // map[proxyName]*ProxyEntry
|
||||
connManager *ConnectionManager
|
||||
proxies sync.Map // map[proxyName]*ProxyEntry
|
||||
clientManager *ClientManager
|
||||
}
|
||||
|
||||
// NewProxyManager 创建新的代理管理器
|
||||
func NewProxyManager(connManager *ConnectionManager) *ProxyManager {
|
||||
func NewProxyManager(connManager *ClientManager) *ProxyManager {
|
||||
return &ProxyManager{
|
||||
proxies: sync.Map{},
|
||||
connManager: connManager,
|
||||
proxies: sync.Map{},
|
||||
clientManager: connManager,
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,22 +68,22 @@ func (m *ProxyManager) RegisterProxy(clientID string, config *proto.ProxyConfig)
|
||||
|
||||
// UnregisterProxy 注销一个代理
|
||||
func (m *ProxyManager) UnregisterProxy(clientID, proxyName string) {
|
||||
m.closeProxyLocked(proxyName)
|
||||
m.closeProxy(proxyName)
|
||||
}
|
||||
|
||||
// UnregisterAllProxies 注销客户端的所有代理
|
||||
func (m *ProxyManager) UnregisterAllProxies(clientID string) {
|
||||
m.proxies.Range(func(key, value interface{}) bool {
|
||||
m.proxies.Range(func(key, value any) bool {
|
||||
if entry, ok := value.(*ProxyEntry); ok && entry.ClientID == clientID {
|
||||
m.closeProxyLocked(key.(string))
|
||||
m.closeProxy(key.(string))
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
// closeProxyLocked 关闭代理(在持有锁的情况下调用)
|
||||
func (m *ProxyManager) closeProxyLocked(proxyID string) {
|
||||
if entry, exists := m.proxies.Load(proxyID); exists {
|
||||
// closeProxy 关闭代理
|
||||
func (m *ProxyManager) closeProxy(proxyName string) {
|
||||
if entry, exists := m.proxies.Load(proxyName); exists {
|
||||
entry := entry.(*ProxyEntry)
|
||||
if entry.TCPListener != nil {
|
||||
entry.TCPListener.Close()
|
||||
@ -91,7 +91,12 @@ func (m *ProxyManager) closeProxyLocked(proxyID string) {
|
||||
if entry.UDPConn != nil {
|
||||
entry.UDPConn.Close()
|
||||
}
|
||||
m.proxies.Delete(proxyID)
|
||||
entry.TCPConns.Range(func(key, value any) bool {
|
||||
value.(net.Conn).Close()
|
||||
entry.TCPConns.Delete(key)
|
||||
return true
|
||||
})
|
||||
m.proxies.Delete(proxyName)
|
||||
log.Printf("Unregistered proxy: %s", entry.Config.Name)
|
||||
}
|
||||
}
|
||||
@ -149,24 +154,24 @@ func (m *ProxyManager) handleTCPConnections(entry *ProxyEntry) {
|
||||
|
||||
// handleUDPPackets 处理传入的 UDP 数据包
|
||||
func (m *ProxyManager) handleUDPPackets(entry *ProxyEntry) {
|
||||
buffer := make([]byte, 4096)
|
||||
for {
|
||||
n, addr, err := entry.UDPConn.ReadFromUDP(buffer)
|
||||
if err != nil {
|
||||
// 连接可能已关闭
|
||||
log.Printf("UDP connection for %s closed: %v", entry.Config.Name, err)
|
||||
break
|
||||
}
|
||||
// buffer := make([]byte, 4096)
|
||||
// for {
|
||||
// n, addr, err := entry.UDPConn.ReadFromUDP(buffer)
|
||||
// if err != nil {
|
||||
// // 连接可能已关闭
|
||||
// log.Printf("UDP connection for %s closed: %v", entry.Config.Name, err)
|
||||
// break
|
||||
// }
|
||||
|
||||
go m.handleUDPPacket(entry, buffer[:n], addr)
|
||||
}
|
||||
// go m.handleUDPPacket(entry, buffer[:n], addr)
|
||||
// }
|
||||
}
|
||||
|
||||
func (m *ProxyManager) handleTCPConnection(entry *ProxyEntry, conn net.Conn) {
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
_ = tcpConn.SetKeepAlive(true)
|
||||
_ = tcpConn.SetKeepAlivePeriod(30 * time.Second)
|
||||
_ = tcpConn.SetNoDelay(true) // 对SSH协议很重要,避免数据包延迟
|
||||
_ = tcpConn.SetNoDelay(true)
|
||||
}
|
||||
connID := fmt.Sprintf("%s_%d", conn.RemoteAddr().String(), time.Now().UnixNano())
|
||||
|
||||
@ -177,7 +182,7 @@ func (m *ProxyManager) handleTCPConnection(entry *ProxyEntry, conn net.Conn) {
|
||||
}()
|
||||
|
||||
// 获取客户端的控制连接
|
||||
controlConn, ok := m.connManager.GetConnection(entry.ClientID)
|
||||
controlConn, ok := m.clientManager.Get(entry.ClientID)
|
||||
if !ok {
|
||||
log.Printf("Control connection not found for client %s", entry.ClientID)
|
||||
return
|
||||
@ -223,4 +228,5 @@ func (m *ProxyManager) handleTCPConnection(entry *ProxyEntry, conn net.Conn) {
|
||||
}
|
||||
|
||||
func (m *ProxyManager) handleUDPPacket(entry *ProxyEntry, data []byte, addr *net.UDPAddr) {
|
||||
// entry.UDPConn.WriteToUDP(data, addr)
|
||||
}
|
||||
|
@ -14,21 +14,21 @@ import (
|
||||
// Server 表示服务端
|
||||
type Server struct {
|
||||
pb.UnimplementedTunnelServiceServer
|
||||
connManager *ConnectionManager
|
||||
proxyManager *ProxyManager
|
||||
grpcServer *grpc.Server
|
||||
bindAddr string
|
||||
bindPort int
|
||||
clientManager *ClientManager
|
||||
proxyManager *ProxyManager
|
||||
grpcServer *grpc.Server
|
||||
bindAddr string
|
||||
bindPort int
|
||||
}
|
||||
|
||||
// NewServer 创建一个新的服务端
|
||||
func NewServer(bindAddr string, bindPort int) *Server {
|
||||
connManager := NewConnectionManager()
|
||||
clientManager := NewClientManager()
|
||||
return &Server{
|
||||
connManager: connManager,
|
||||
proxyManager: NewProxyManager(connManager),
|
||||
bindAddr: bindAddr,
|
||||
bindPort: bindPort,
|
||||
clientManager: clientManager,
|
||||
proxyManager: NewProxyManager(clientManager),
|
||||
bindAddr: bindAddr,
|
||||
bindPort: bindPort,
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,14 +54,14 @@ func (s *Server) Stop() {
|
||||
}
|
||||
}
|
||||
|
||||
// EstablishControlConnection 实现 gRPC 服务方法
|
||||
func (s *Server) EstablishControlConnection(stream pb.TunnelService_EstablishControlConnectionServer) error {
|
||||
// Connect 实现 gRPC 服务方法
|
||||
func (s *Server) Connect(stream pb.TunnelService_ConnectServer) error {
|
||||
// 创建控制连接
|
||||
clientID := "client_" + fmt.Sprint(time.Now().UnixNano())
|
||||
conn := NewControlConnection(stream)
|
||||
conn := NewClientConnection(stream)
|
||||
|
||||
s.connManager.AddConnection(clientID, conn)
|
||||
defer s.connManager.RemoveConnection(clientID)
|
||||
s.clientManager.Add(clientID, conn)
|
||||
defer s.clientManager.Remove(clientID)
|
||||
|
||||
log.Printf("New control connection established: %s", clientID)
|
||||
|
||||
@ -91,7 +91,7 @@ func (s *Server) EstablishControlConnection(stream pb.TunnelService_EstablishCon
|
||||
func (s *Server) handleProxyRegister(clientID string, msg *pb.Message_RegisterConfigs) {
|
||||
hasError := false
|
||||
|
||||
conn, ok := s.connManager.GetConnection(clientID)
|
||||
conn, ok := s.clientManager.Get(clientID)
|
||||
if !ok {
|
||||
log.Printf("Control connection not found for client %s", clientID)
|
||||
return
|
||||
@ -126,6 +126,19 @@ func (s *Server) handleProxyRegister(clientID string, msg *pb.Message_RegisterCo
|
||||
}
|
||||
|
||||
func (s *Server) handleProxyData(clientID string, msg *pb.Message_ProxyData) {
|
||||
switch msg.ProxyData.ProxyConfig.Type {
|
||||
case pb.ProxyType_TCP:
|
||||
s.handleTCPProxyData(clientID, msg)
|
||||
case pb.ProxyType_UDP:
|
||||
s.handleUDPProxyData(clientID, msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleUDPProxyData(clientID string, msg *pb.Message_ProxyData) {
|
||||
|
||||
}
|
||||
|
||||
func (s *Server) handleTCPProxyData(clientID string, msg *pb.Message_ProxyData) {
|
||||
proxyEntry, ok := s.proxyManager.proxies.Load(msg.ProxyData.ProxyConfig.Name)
|
||||
if !ok {
|
||||
log.Printf("Proxy %s not found", msg.ProxyData.ProxyConfig.Name)
|
||||
@ -137,7 +150,7 @@ func (s *Server) handleProxyData(clientID string, msg *pb.Message_ProxyData) {
|
||||
conn, ok := entry.TCPConns.Load(msg.ProxyData.ConnId)
|
||||
if !ok {
|
||||
log.Printf("TCP connection %s not found", msg.ProxyData.ConnId)
|
||||
controlConn, ok := s.connManager.GetConnection(clientID)
|
||||
controlConn, ok := s.clientManager.Get(clientID)
|
||||
if ok {
|
||||
if err := controlConn.Send(&pb.Message{
|
||||
Content: &pb.Message_ProxyError{
|
||||
@ -158,7 +171,7 @@ func (s *Server) handleProxyData(clientID string, msg *pb.Message_ProxyData) {
|
||||
log.Printf("Failed to write data to TCP connection: %v", err)
|
||||
conn.(net.Conn).Close()
|
||||
entry.TCPConns.Delete(msg.ProxyData.ConnId)
|
||||
controlConn, ok := s.connManager.GetConnection(clientID)
|
||||
controlConn, ok := s.clientManager.Get(clientID)
|
||||
if ok {
|
||||
if err := controlConn.Send(&pb.Message{
|
||||
Content: &pb.Message_ProxyError{
|
||||
|
@ -540,13 +540,12 @@ var file_pkg_proto_tunnel_proto_rawDesc = string([]byte{
|
||||
0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x1d, 0x0a, 0x09, 0x50, 0x72, 0x6f, 0x78, 0x79,
|
||||
0x54, 0x79, 0x70, 0x65, 0x12, 0x07, 0x0a, 0x03, 0x54, 0x43, 0x50, 0x10, 0x00, 0x12, 0x07, 0x0a,
|
||||
0x03, 0x55, 0x44, 0x50, 0x10, 0x01, 0x32, 0x4f, 0x0a, 0x0d, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c,
|
||||
0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3e, 0x0a, 0x1a, 0x45, 0x73, 0x74, 0x61, 0x62,
|
||||
0x6c, 0x69, 0x73, 0x68, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x43, 0x6f, 0x6e, 0x6e, 0x65,
|
||||
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x0d, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x4d, 0x65, 0x73,
|
||||
0x73, 0x61, 0x67, 0x65, 0x1a, 0x0d, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x4d, 0x65, 0x73, 0x73,
|
||||
0x61, 0x67, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, 0x2f, 0x70, 0x6b, 0x67,
|
||||
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x03, 0x55, 0x44, 0x50, 0x10, 0x01, 0x32, 0x3c, 0x0a, 0x0d, 0x54, 0x75, 0x6e, 0x6e, 0x65, 0x6c,
|
||||
0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x2b, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65,
|
||||
0x63, 0x74, 0x12, 0x0d, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
|
||||
0x65, 0x1a, 0x0d, 0x2e, 0x6e, 0x77, 0x63, 0x74, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
|
||||
0x28, 0x01, 0x30, 0x01, 0x42, 0x0d, 0x5a, 0x0b, 0x2e, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
})
|
||||
|
||||
var (
|
||||
@ -584,8 +583,8 @@ var file_pkg_proto_tunnel_proto_depIdxs = []int32{
|
||||
3, // 7: nwct.RegisterProxiesError.proxy_config:type_name -> nwct.ProxyConfig
|
||||
3, // 8: nwct.ProxyError.proxy_config:type_name -> nwct.ProxyConfig
|
||||
3, // 9: nwct.ProxyConfigs.ConfigsEntry.value:type_name -> nwct.ProxyConfig
|
||||
1, // 10: nwct.TunnelService.EstablishControlConnection:input_type -> nwct.Message
|
||||
1, // 11: nwct.TunnelService.EstablishControlConnection:output_type -> nwct.Message
|
||||
1, // 10: nwct.TunnelService.Connect:input_type -> nwct.Message
|
||||
1, // 11: nwct.TunnelService.Connect:output_type -> nwct.Message
|
||||
11, // [11:12] is the sub-list for method output_type
|
||||
10, // [10:11] is the sub-list for method input_type
|
||||
10, // [10:10] is the sub-list for extension type_name
|
||||
|
@ -6,7 +6,7 @@ option go_package = "./pkg/proto";
|
||||
|
||||
service TunnelService {
|
||||
// 建立控制连接
|
||||
rpc EstablishControlConnection(stream Message) returns (stream Message);
|
||||
rpc Connect(stream Message) returns (stream Message);
|
||||
}
|
||||
|
||||
// 通用消息格式
|
||||
|
@ -19,7 +19,7 @@ import (
|
||||
const _ = grpc.SupportPackageIsVersion9
|
||||
|
||||
const (
|
||||
TunnelService_EstablishControlConnection_FullMethodName = "/nwct.TunnelService/EstablishControlConnection"
|
||||
TunnelService_Connect_FullMethodName = "/nwct.TunnelService/Connect"
|
||||
)
|
||||
|
||||
// TunnelServiceClient is the client API for TunnelService service.
|
||||
@ -27,7 +27,7 @@ const (
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type TunnelServiceClient interface {
|
||||
// 建立控制连接
|
||||
EstablishControlConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Message, Message], error)
|
||||
Connect(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Message, Message], error)
|
||||
}
|
||||
|
||||
type tunnelServiceClient struct {
|
||||
@ -38,9 +38,9 @@ func NewTunnelServiceClient(cc grpc.ClientConnInterface) TunnelServiceClient {
|
||||
return &tunnelServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *tunnelServiceClient) EstablishControlConnection(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Message, Message], error) {
|
||||
func (c *tunnelServiceClient) Connect(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[Message, Message], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &TunnelService_ServiceDesc.Streams[0], TunnelService_EstablishControlConnection_FullMethodName, cOpts...)
|
||||
stream, err := c.cc.NewStream(ctx, &TunnelService_ServiceDesc.Streams[0], TunnelService_Connect_FullMethodName, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -49,14 +49,14 @@ func (c *tunnelServiceClient) EstablishControlConnection(ctx context.Context, op
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type TunnelService_EstablishControlConnectionClient = grpc.BidiStreamingClient[Message, Message]
|
||||
type TunnelService_ConnectClient = grpc.BidiStreamingClient[Message, Message]
|
||||
|
||||
// TunnelServiceServer is the server API for TunnelService service.
|
||||
// All implementations must embed UnimplementedTunnelServiceServer
|
||||
// for forward compatibility.
|
||||
type TunnelServiceServer interface {
|
||||
// 建立控制连接
|
||||
EstablishControlConnection(grpc.BidiStreamingServer[Message, Message]) error
|
||||
Connect(grpc.BidiStreamingServer[Message, Message]) error
|
||||
mustEmbedUnimplementedTunnelServiceServer()
|
||||
}
|
||||
|
||||
@ -67,8 +67,8 @@ type TunnelServiceServer interface {
|
||||
// pointer dereference when methods are called.
|
||||
type UnimplementedTunnelServiceServer struct{}
|
||||
|
||||
func (UnimplementedTunnelServiceServer) EstablishControlConnection(grpc.BidiStreamingServer[Message, Message]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method EstablishControlConnection not implemented")
|
||||
func (UnimplementedTunnelServiceServer) Connect(grpc.BidiStreamingServer[Message, Message]) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Connect not implemented")
|
||||
}
|
||||
func (UnimplementedTunnelServiceServer) mustEmbedUnimplementedTunnelServiceServer() {}
|
||||
func (UnimplementedTunnelServiceServer) testEmbeddedByValue() {}
|
||||
@ -91,12 +91,12 @@ func RegisterTunnelServiceServer(s grpc.ServiceRegistrar, srv TunnelServiceServe
|
||||
s.RegisterService(&TunnelService_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _TunnelService_EstablishControlConnection_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(TunnelServiceServer).EstablishControlConnection(&grpc.GenericServerStream[Message, Message]{ServerStream: stream})
|
||||
func _TunnelService_Connect_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(TunnelServiceServer).Connect(&grpc.GenericServerStream[Message, Message]{ServerStream: stream})
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type TunnelService_EstablishControlConnectionServer = grpc.BidiStreamingServer[Message, Message]
|
||||
type TunnelService_ConnectServer = grpc.BidiStreamingServer[Message, Message]
|
||||
|
||||
// TunnelService_ServiceDesc is the grpc.ServiceDesc for TunnelService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
@ -107,8 +107,8 @@ var TunnelService_ServiceDesc = grpc.ServiceDesc{
|
||||
Methods: []grpc.MethodDesc{},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "EstablishControlConnection",
|
||||
Handler: _TunnelService_EstablishControlConnection_Handler,
|
||||
StreamName: "Connect",
|
||||
Handler: _TunnelService_Connect_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
|
Loading…
x
Reference in New Issue
Block a user