From b933cbe9ee24ea9326fe24405c095534b99bb588 Mon Sep 17 00:00:00 2001 From: admin Date: Thu, 4 Jun 2026 17:32:11 +0700 Subject: [PATCH] fixing some functions --- Makefile | 63 +++++ config/config.go | 8 + example-config.toml | 37 +++ main.go | 27 +- proxymap/proxymap.go | 129 +++++++-- streams/stream.go | 603 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 847 insertions(+), 20 deletions(-) create mode 100644 Makefile create mode 100644 streams/stream.go diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..270ea56 --- /dev/null +++ b/Makefile @@ -0,0 +1,63 @@ +BINARY := acme-reverseproxy +PKG := . +GO := go +GOFLAGS := -mod=vendor +LDFLAGS := -s -w + +BIN_DIR := bin +BIN := $(BIN_DIR)/$(BINARY) + +.PHONY: all build install run test fmt vet mod tidy clean docker gen-config help + +all: build + +help: + @echo "Targets:" + @echo " build compile $(BINARY) to $(BIN)" + @echo " install install to \$$GOPATH/bin or \$$GOBIN" + @echo " run run server with example-config.toml" + @echo " test run tests" + @echo " fmt go fmt ./..." + @echo " vet go vet ./..." + @echo " mod download and verify modules" + @echo " tidy go mod tidy" + @echo " gen-config generate sample config to stdout" + @echo " docker build Docker image" + @echo " clean remove build artifacts" + +build: $(BIN) + +$(BIN): + @mkdir -p $(BIN_DIR) + $(GO) build $(GOFLAGS) -ldflags "$(LDFLAGS)" -o $(BIN) $(PKG) + +install: + $(GO) install $(GOFLAGS) -ldflags "$(LDFLAGS)" $(PKG) + +run: build + $(BIN) srv --config example-config.toml + +test: + $(GO) test $(GOFLAGS) ./... + +fmt: + $(GO) fmt ./... + +vet: + $(GO) vet $(GOFLAGS) ./... + +mod: + $(GO) mod download + $(GO) mod verify + +tidy: + $(GO) mod tidy + +gen-config: build + $(BIN) gen config + +docker: + docker build -t $(BINARY) . + +clean: + rm -rf $(BIN_DIR) diff --git a/config/config.go b/config/config.go index 357d3e4..946851a 100644 --- a/config/config.go +++ b/config/config.go @@ -5,6 +5,7 @@ type Config struct { Mapping map[string]string WellKnownDir string NFS NFSConfig + Streams map[string]StreamConfig } type CA struct { @@ -19,3 +20,10 @@ type NFSConfig struct { MountPoint string `toml:"mount_point"` Options string `toml:"options"` } + +// StreamConfig описывает конфигурацию UDP/TCP стрима +type StreamConfig struct { + Protocol string `toml:"protocol"` // "tcp" или "udp" + Target string `toml:"target"` // целевой адрес в формате "host:port" + Timeout int `toml:"timeout"` // таймаут в секундах (опционально, по умолчанию 300) +} diff --git a/example-config.toml b/example-config.toml index 40f4c90..ab8d98b 100644 --- a/example-config.toml +++ b/example-config.toml @@ -23,3 +23,40 @@ WellKnownDir = "/tmp/.well-known" "example.com" = "http://localhost:8080" "secure.example.com" = "https://localhost:8443" "api.example.com" = "http://192.168.1.10:3000" + +# UDP/TCP stream configurations +# Each stream is defined as a separate table with the domain name +# +# PORTS: +# - Shared mode: TCP/UDP on port 10000 (requires domain in header) +# - Transparent mode: Each domain gets its own port +# * TCP streams: ports 10001, 10002, 10003, ... +# * UDP streams: ports 11001, 11002, 11003, ... +# +# FORMAT: +# [Streams."domain.com"] +# protocol = "tcp"|"udp" +# target = "host:port" +# timeout = seconds (optional, default 300) + +# TCP stream examples +[Streams."tcp.example.com"] + protocol = "tcp" + target = "localhost:22" + timeout = 300 + +[Streams."ssh.example.com"] + protocol = "tcp" + target = "192.168.1.100:2222" + timeout = 600 + +# UDP stream examples +[Streams."dns.example.com"] + protocol = "udp" + target = "8.8.8.8:53" + timeout = 30 + +[Streams."ntp.example.com"] + protocol = "udp" + target = "pool.ntp.org:123" + timeout = 60 diff --git a/main.go b/main.go index 60ab372..d16e248 100644 --- a/main.go +++ b/main.go @@ -13,6 +13,7 @@ import ( "acme-reverseproxy/config" "acme-reverseproxy/nfs" "acme-reverseproxy/proxymap" + "acme-reverseproxy/streams" "github.com/BurntSushi/toml" "github.com/sirupsen/logrus" @@ -86,6 +87,10 @@ func BeforeAction(c *cli.Context) error { for host, target := range cfg.Mapping { logrus.Infof(" %s -> %s", host, target) } + logrus.Infof("Configured streams: %d", len(cfg.Streams)) + for domain, stream := range cfg.Streams { + logrus.Infof(" %s -> %s (%s)", domain, stream.Target, stream.Protocol) + } return nil } @@ -134,6 +139,13 @@ func SrvCommand(c *cli.Context) error { } } + // Инициализируем менеджер стримов + streamManager := streams.NewStreamManager(cfg.Streams) + if err := streamManager.Start(); err != nil { + logrus.Errorf("Failed to start stream manager: %v", err) + return cli.NewExitError(err, 2) + } + list := []string{} for key := range cfg.Mapping { if key != "" { @@ -178,7 +190,14 @@ func SrvCommand(c *cli.Context) error { httpsServer := &http.Server{ Handler: rph, } - httpHandler := m.HTTPHandler(rph) + acmeHTTPHandler := m.HTTPHandler(rph) + httpHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if proxymap.IsAcmeWellKnownPath(r.URL.Path) { + rph.ServeHTTP(w, r) + return + } + acmeHTTPHandler.ServeHTTP(w, r) + }) httpServer := &http.Server{Addr: ":80", Handler: httpHandler} logrus.Info(strings.Repeat("=", 61)) logrus.Info("Starting reverse proxy servers...") @@ -186,6 +205,8 @@ func SrvCommand(c *cli.Context) error { var wg sync.WaitGroup var firstError error var mu sync.Mutex + + // Запускаем HTTP/HTTPS серверы wg.Add(2) go func() { defer wg.Done() @@ -219,6 +240,10 @@ func SrvCommand(c *cli.Context) error { logrus.Info("Servers started successfully. Waiting for requests...") logrus.Info("Press Ctrl+C to stop") wg.Wait() + + // При остановке останавливаем стримы + streamManager.Stop() + // При остановке размонтируем NFS если он был смонтирован if cfg.NFS.Enabled { if err := nfs.DefaultManager.Unmount(); err != nil { diff --git a/proxymap/proxymap.go b/proxymap/proxymap.go index 5470168..a2d0b23 100644 --- a/proxymap/proxymap.go +++ b/proxymap/proxymap.go @@ -16,6 +16,47 @@ import ( "github.com/sirupsen/logrus" ) +const ( + acmeChallengePrefix = "/.well-known/acme-challenge/" + acmeWellknownPrefix = "/.well-known/acme-wellknown/" +) + +// IsAcmeWellKnownPath reports whether path is an ACME HTTP-01 challenge URL. +func IsAcmeWellKnownPath(path string) bool { + return strings.HasPrefix(path, acmeChallengePrefix) || strings.HasPrefix(path, acmeWellknownPrefix) +} + +func acmeTokenFromPath(path string) (token string, layout string, ok bool) { + switch { + case strings.HasPrefix(path, acmeChallengePrefix): + layout = acmeChallengePrefix + token = strings.TrimPrefix(path, acmeChallengePrefix) + case strings.HasPrefix(path, acmeWellknownPrefix): + layout = acmeWellknownPrefix + token = strings.TrimPrefix(path, acmeWellknownPrefix) + default: + return "", "", false + } + if token == "" || strings.Contains(token, "..") { + return "", "", false + } + return token, layout, true +} + +func acmeFilePaths(wellKnownDir, token string) []string { + return []string{ + filepath.Join(wellKnownDir, "acme-challenge", token), + filepath.Join(wellKnownDir, token), + } +} + +func defaultAcmeWritePath(wellKnownDir, token, layout string) string { + if layout == acmeWellknownPrefix { + return filepath.Join(wellKnownDir, token) + } + return filepath.Join(wellKnownDir, "acme-challenge", token) +} + // stripHostPort возвращает только имя хоста, если в строке был порт. func stripHostPort(host string) string { h, _, err := net.SplitHostPort(host) @@ -143,9 +184,15 @@ type reverseProxiesHandler struct { func (rph reverseProxiesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { logrus.Infof("Incoming request: %s %s from %s (Host: %s)", r.Method, r.URL.Path, r.RemoteAddr, r.Host) - if strings.HasPrefix(r.URL.Path, "/.well-known/acme-wellknown/") { - logrus.Infof("Handling .well-known/acme-wellknown request: %s", r.URL.Path) - rph.handleWellKnown(w, r) + if strings.HasPrefix(r.URL.Path, acmeChallengePrefix) || strings.HasPrefix(r.URL.Path, acmeWellknownPrefix) { + token, layout, ok := acmeTokenFromPath(r.URL.Path) + if !ok { + logrus.Warnf("Invalid .well-known path: %s", r.URL.Path) + http.Error(w, "Invalid path", http.StatusBadRequest) + return + } + logrus.Infof("Handling ACME well-known request: %s (layout=%s)", r.URL.Path, layout) + rph.handleWellKnown(w, r, token, layout) return } @@ -194,7 +241,7 @@ func (rph reverseProxiesHandler) getHostList() []string { return hosts } -func (rph reverseProxiesHandler) handleWellKnown(w http.ResponseWriter, r *http.Request) { +func (rph reverseProxiesHandler) handleWellKnown(w http.ResponseWriter, r *http.Request, token, layout string) { // Проверяем и при необходимости монтируем NFS перед обработкой if err := nfs.EnsureMounted(); err != nil { logrus.Errorf("Failed to ensure NFS is mounted: %v", err) @@ -202,23 +249,18 @@ func (rph reverseProxiesHandler) handleWellKnown(w http.ResponseWriter, r *http. return } - path := strings.TrimPrefix(r.URL.Path, "/.well-known/acme-wellknown/") - if path == "" || strings.Contains(path, "..") { - logrus.Warnf("Invalid .well-known path: %s", r.URL.Path) - http.Error(w, "Invalid path", http.StatusBadRequest) - return - } - fullPath := filepath.Join(rph.WellKnownDir, path) - logrus.Infof("Well-known request: method=%s, path=%s, fullPath=%s", r.Method, path, fullPath) + paths := acmeFilePaths(rph.WellKnownDir, token) + logrus.Infof("Well-known request: method=%s, token=%s, layout=%s, paths=%v", + r.Method, token, layout, paths) switch r.Method { case "GET", "HEAD": - data, err := ioutil.ReadFile(fullPath) + data, _, err := readFirstExisting(paths) if err != nil { if os.IsNotExist(err) { http.NotFound(w, r) return } - logrus.Errorf("Error reading file %s: %v", fullPath, err) + logrus.Errorf("Error reading ACME challenge files %v: %v", paths, err) http.Error(w, "Internal server error", http.StatusInternalServerError) return } @@ -234,14 +276,18 @@ func (rph reverseProxiesHandler) handleWellKnown(w http.ResponseWriter, r *http. http.Error(w, "Bad request", http.StatusBadRequest) return } - dir := filepath.Dir(fullPath) + writePath := existingAcmePath(paths) + if writePath == "" { + writePath = defaultAcmeWritePath(rph.WellKnownDir, token, layout) + } + dir := filepath.Dir(writePath) if err := os.MkdirAll(dir, 0755); err != nil { logrus.Errorf("Error creating directory %s: %v", dir, err) http.Error(w, "Internal server error", http.StatusInternalServerError) return } - if err := ioutil.WriteFile(fullPath, data, 0644); err != nil { - logrus.Errorf("Error writing file %s: %v", fullPath, err) + if err := ioutil.WriteFile(writePath, data, 0644); err != nil { + logrus.Errorf("Error writing file %s: %v", writePath, err) http.Error(w, "Internal server error", http.StatusInternalServerError) return } @@ -252,12 +298,12 @@ func (rph reverseProxiesHandler) handleWellKnown(w http.ResponseWriter, r *http. w.Write([]byte("File created successfully")) } case "DELETE": - if err := os.Remove(fullPath); err != nil { + if err := removeFirstExisting(paths); err != nil { if os.IsNotExist(err) { http.NotFound(w, r) return } - logrus.Errorf("Error deleting file %s: %v", fullPath, err) + logrus.Errorf("Error deleting ACME challenge files %v: %v", paths, err) http.Error(w, "Internal server error", http.StatusInternalServerError) return } @@ -267,3 +313,48 @@ func (rph reverseProxiesHandler) handleWellKnown(w http.ResponseWriter, r *http. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } } + +func readFirstExisting(paths []string) ([]byte, string, error) { + var lastErr error + for _, path := range paths { + data, err := ioutil.ReadFile(path) + if err == nil { + return data, path, nil + } + if !os.IsNotExist(err) { + return nil, "", err + } + lastErr = err + } + if lastErr == nil { + lastErr = os.ErrNotExist + } + return nil, "", lastErr +} + +func existingAcmePath(paths []string) string { + for _, path := range paths { + if _, err := os.Stat(path); err == nil { + return path + } + } + return "" +} + +func removeFirstExisting(paths []string) error { + var lastErr error + for _, path := range paths { + err := os.Remove(path) + if err == nil { + return nil + } + if !os.IsNotExist(err) { + return err + } + lastErr = err + } + if lastErr == nil { + lastErr = os.ErrNotExist + } + return lastErr +} diff --git a/streams/stream.go b/streams/stream.go new file mode 100644 index 0000000..345ace1 --- /dev/null +++ b/streams/stream.go @@ -0,0 +1,603 @@ +package streams + +import ( + "context" + "fmt" + "io" + "net" + "sync" + "time" + + "acme-reverseproxy/config" + "github.com/sirupsen/logrus" +) + +// StreamManager управляет TCP/UDP стримами +type StreamManager struct { + streams map[string]config.StreamConfig + tcpListener net.Listener + udpConn *net.UDPConn + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + // Поддержка отдельных портов для каждого домена + tcpListeners map[string]net.Listener + udpConns map[string]*net.UDPConn +} + +// NewStreamManager создает новый менеджер стримов +func NewStreamManager(streams map[string]config.StreamConfig) *StreamManager { + ctx, cancel := context.WithCancel(context.Background()) + return &StreamManager{ + streams: streams, + ctx: ctx, + cancel: cancel, + tcpListeners: make(map[string]net.Listener), + udpConns: make(map[string]*net.UDPConn), + } +} + +// Start запускает TCP и UDP серверы +func (sm *StreamManager) Start() error { + if len(sm.streams) == 0 { + logrus.Info("No streams configured, skipping stream manager initialization") + return nil + } + + logrus.Infof("Starting stream manager with %d streams", len(sm.streams)) + + // Запускаем TCP сервер + if err := sm.startTCP(); err != nil { + return fmt.Errorf("failed to start TCP server: %w", err) + } + + // Запускаем UDP сервер + if err := sm.startUDP(); err != nil { + sm.tcpListener.Close() + return fmt.Errorf("failed to start UDP server: %w", err) + } + + return nil +} + +// startTCP запускает TCP сервер на общем порту 10000 +// и отдельные порты для каждого домена (10001, 10002, ...) +func (sm *StreamManager) startTCP() error { + // Общий сервер на порту 10000 (требует указания домена в заголовке) + listener, err := net.Listen("tcp", ":10000") + if err != nil { + return fmt.Errorf("failed to listen on port 10000: %w", err) + } + + sm.tcpListener = listener + logrus.Info("TCP stream server started on port 10000 (shared mode)") + + sm.wg.Add(1) + go func() { + defer sm.wg.Done() + for { + select { + case <-sm.ctx.Done(): + logrus.Info("TCP stream server shutting down") + return + default: + } + + conn, err := listener.Accept() + if err != nil { + select { + case <-sm.ctx.Done(): + return + default: + logrus.Errorf("Failed to accept TCP connection: %v", err) + } + continue + } + + sm.wg.Add(1) + go func(c net.Conn) { + defer sm.wg.Done() + defer c.Close() + sm.handleTCPConnection(c, "") + }(conn) + } + }() + + // Отдельные порты для каждого домена (прозрачный режим) + port := 10001 + for domain, cfg := range sm.streams { + if cfg.Protocol != "tcp" { + continue + } + addr := fmt.Sprintf(":%d", port) + listener, err := net.Listen("tcp", addr) + if err != nil { + logrus.Warnf("Failed to listen on port %d for domain %s: %v", port, domain, err) + port++ + continue + } + sm.tcpListeners[domain] = listener + logrus.Infof("TCP stream server for %s started on port %d (transparent mode)", domain, port) + + sm.wg.Add(1) + go func(d string, l net.Listener) { + defer sm.wg.Done() + for { + select { + case <-sm.ctx.Done(): + logrus.Infof("TCP stream server for %s shutting down", d) + return + default: + } + + conn, err := l.Accept() + if err != nil { + select { + case <-sm.ctx.Done(): + return + default: + logrus.Errorf("Failed to accept TCP connection for %s: %v", d, err) + } + continue + } + + sm.wg.Add(1) + go func(c net.Conn, domain string) { + defer sm.wg.Done() + defer c.Close() + sm.handleTCPConnection(c, domain) + }(conn, d) + } + }(domain, listener) + port++ + } + + return nil +} + +// startUDP запускает UDP сервер на общем порту 10000 +// и отдельные порты для каждого домена (11001, 11002, ...) +func (sm *StreamManager) startUDP() error { + // Общий сервер на порту 10000 (требует указания домена в заголовке) + addr, err := net.ResolveUDPAddr("udp", ":10000") + if err != nil { + return fmt.Errorf("failed to resolve UDP address: %w", err) + } + + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return fmt.Errorf("failed to listen on UDP port 10000: %w", err) + } + + sm.udpConn = conn + logrus.Info("UDP stream server started on port 10000 (shared mode)") + + sm.wg.Add(1) + go func() { + defer sm.wg.Done() + buf := make([]byte, 65536) + for { + select { + case <-sm.ctx.Done(): + logrus.Info("UDP stream server shutting down") + return + default: + } + + n, addr, err := conn.ReadFromUDP(buf) + if err != nil { + select { + case <-sm.ctx.Done(): + return + default: + logrus.Errorf("Failed to read UDP packet: %v", err) + } + continue + } + + sm.wg.Add(1) + go func(data []byte, a *net.UDPAddr) { + defer sm.wg.Done() + sm.handleUDPConnection(data, a, "") + }(buf[:n], addr) + } + }() + + // Отдельные порты для каждого домена (прозрачный режим) + port := 11001 + for domain, cfg := range sm.streams { + if cfg.Protocol != "udp" { + continue + } + addr := fmt.Sprintf(":%d", port) + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + logrus.Warnf("Failed to resolve UDP address %s for domain %s: %v", addr, domain, err) + port++ + continue + } + + conn, err := net.ListenUDP("udp", udpAddr) + if err != nil { + logrus.Warnf("Failed to listen on UDP port %d for domain %s: %v", port, domain, err) + port++ + continue + } + + sm.udpConns[domain] = conn + logrus.Infof("UDP stream server for %s started on port %d (transparent mode)", domain, port) + + sm.wg.Add(1) + go func(d string, c *net.UDPConn) { + defer sm.wg.Done() + buf := make([]byte, 65536) + for { + select { + case <-sm.ctx.Done(): + logrus.Infof("UDP stream server for %s shutting down", d) + return + default: + } + + n, addr, err := c.ReadFromUDP(buf) + if err != nil { + select { + case <-sm.ctx.Done(): + return + default: + logrus.Errorf("Failed to read UDP packet for %s: %v", d, err) + } + continue + } + + sm.wg.Add(1) + go func(data []byte, a *net.UDPAddr, domain string) { + defer sm.wg.Done() + sm.handleUDPConnection(data, a, domain) + }(buf[:n], addr, d) + } + }(domain, conn) + port++ + } + + return nil +} + +// handleTCPConnection обрабатывает входящее TCP соединение +// Если domain указан, используется прозрачный режим +// Если domain пуст, требуется указание домена в заголовке +func (sm *StreamManager) handleTCPConnection(conn net.Conn, domain string) { + var targetDomain string + var initialData []byte + + if domain != "" { + // Прозрачный режим - домен уже известен + targetDomain = domain + logrus.Infof("TCP connection from %s for domain: %s (transparent mode)", conn.RemoteAddr(), targetDomain) + } else { + // Общий режим - читаем заголовок для определения домена + headerBuf := make([]byte, 1024) + conn.SetReadDeadline(time.Now().Add(5 * time.Second)) + n, err := io.ReadFull(conn, headerBuf) + if err != nil { + logrus.Errorf("Failed to read TCP header: %v", err) + return + } + conn.SetReadDeadline(time.Time{}) + + targetDomain = extractDomainFromHeader(headerBuf[:n]) + if targetDomain == "" { + logrus.Warn("No target domain found in TCP connection header") + conn.Write([]byte("HTTP/1.1 400 Bad Request\r\n\r\nDomain header required\n")) + return + } + initialData = headerBuf[:n] + logrus.Infof("TCP connection from %s targeting domain: %s", conn.RemoteAddr(), targetDomain) + } + + // Находим конфигурацию стрима для этого домена + sm.mu.RLock() + streamConfig, exists := sm.streams[targetDomain] + sm.mu.RUnlock() + + if !exists { + logrus.Warnf("No stream configuration found for domain: %s", targetDomain) + conn.Write([]byte("HTTP/1.1 404 Not Found\r\n\r\nNo stream configured for this domain\n")) + return + } + + if streamConfig.Protocol != "tcp" { + logrus.Warnf("Protocol mismatch: domain %s configured for %s, not TCP", targetDomain, streamConfig.Protocol) + conn.Write([]byte("HTTP/1.1 500 Internal Server Error\r\n\r\nProtocol mismatch\n")) + return + } + + // Проксируем соединение + sm.proxyTCP(conn, streamConfig.Target, initialData) +} + +// handleUDPConnection обрабатывает входящее UDP соединение +// Если domain указан, используется прозрачный режим +// Если domain пуст, требуется указание домена в заголовке +func (sm *StreamManager) handleUDPConnection(data []byte, addr *net.UDPAddr, domain string) { + var targetDomain string + + if domain != "" { + // Прозрачный режим - домен уже известен + targetDomain = domain + logrus.Infof("UDP packet from %s for domain: %s (transparent mode)", addr, targetDomain) + } else { + // Общий режим - извлекаем домен из данных + targetDomain = extractDomainFromHeader(data) + if targetDomain == "" { + logrus.Warn("No target domain found in UDP packet") + return + } + logrus.Infof("UDP packet from %s targeting domain: %s", addr, targetDomain) + } + + // Находим конфигурацию стрима для этого домена + sm.mu.RLock() + streamConfig, exists := sm.streams[targetDomain] + sm.mu.RUnlock() + + if !exists { + logrus.Warnf("No stream configuration found for domain: %s", targetDomain) + return + } + + if streamConfig.Protocol != "udp" { + logrus.Warnf("Protocol mismatch: domain %s configured for %s, not UDP", targetDomain, streamConfig.Protocol) + return + } + + // Проксируем UDP пакет + sm.proxyUDP(data, addr, streamConfig.Target) +} + +// proxyTCP проксирует TCP соединение +func (sm *StreamManager) proxyTCP(conn net.Conn, target string, initialData []byte) { + // Получаем таймаут из конфигурации (по умолчанию 300 секунд) + timeout := 300 * time.Second + if targetConfig, exists := sm.streams[conn.LocalAddr().String()]; exists { + if targetConfig.Timeout > 0 { + timeout = time.Duration(targetConfig.Timeout) * time.Second + } + } + + // Разрешаем целевой адрес + targetAddr := target + if _, _, err := net.SplitHostPort(target); err != nil { + // Если порта нет, добавляем дефолтный + targetAddr = net.JoinHostPort(target, "80") + } + + backendConn, err := net.DialTimeout("tcp", targetAddr, time.Duration(30)*time.Second) + if err != nil { + logrus.Errorf("Failed to connect to backend %s: %v", targetAddr, err) + conn.Write([]byte("HTTP/1.1 502 Bad Gateway\r\n\r\nFailed to connect to backend\n")) + return + } + defer backendConn.Close() + + // Устанавливаем таймаут + conn.SetDeadline(time.Now().Add(timeout)) + backendConn.SetDeadline(time.Now().Add(timeout)) + + // Отправляем начальные данные на бэкенд + if len(initialData) > 0 { + if _, err := backendConn.Write(initialData); err != nil { + logrus.Errorf("Failed to write initial data to backend: %v", err) + return + } + } + + // Запускаем двустороннюю передачу данных + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + logrus.Errorf("Panic in TCP forward (client->backend): %v", r) + } + }() + io.Copy(backendConn, conn) + }() + + go func() { + defer wg.Done() + defer func() { + if r := recover(); r != nil { + logrus.Errorf("Panic in TCP forward (backend->client): %v", r) + } + }() + io.Copy(conn, backendConn) + }() + + wg.Wait() + logrus.Debugf("TCP proxy connection closed: %s -> %s", conn.RemoteAddr(), targetAddr) +} + +// proxyUDP проксирует UDP пакет +func (sm *StreamManager) proxyUDP(data []byte, clientAddr *net.UDPAddr, target string) { + // Разрешаем целевой адрес + targetAddr := target + if _, _, err := net.SplitHostPort(target); err != nil { + targetAddr = net.JoinHostPort(target, "80") + } + + backendAddr, err := net.ResolveUDPAddr("udp", targetAddr) + if err != nil { + logrus.Errorf("Failed to resolve backend address %s: %v", targetAddr, err) + return + } + + backendConn, err := net.DialUDP("udp", nil, backendAddr) + if err != nil { + logrus.Errorf("Failed to connect to backend %s: %v", targetAddr, err) + return + } + defer backendConn.Close() + + // Устанавливаем таймаут + backendConn.SetDeadline(time.Now().Add(30 * time.Second)) + + // Отправляем данные на бэкенд + if _, err := backendConn.Write(data); err != nil { + logrus.Errorf("Failed to send data to backend: %v", err) + return + } + + // Читаем ответ от бэкенда + responseBuf := make([]byte, 65536) + n, err := backendConn.Read(responseBuf) + if err != nil { + logrus.Errorf("Failed to read response from backend: %v", err) + return + } + + // Отправляем ответ клиенту + _, err = sm.udpConn.WriteToUDP(responseBuf[:n], clientAddr) + if err != nil { + logrus.Errorf("Failed to send response to client: %v", err) + return + } + + logrus.Debugf("UDP proxy packet forwarded: %s -> %s -> %s", clientAddr, targetAddr, clientAddr) +} + +// extractDomainFromHeader извлекает домен из заголовка +func extractDomainFromHeader(data []byte) string { + // Ищем первую строку, которая может содержать домен + // Поддерживаем форматы: + // - "DOMAIN example.com" (простой текстовый формат) + // - "CONNECT example.com:443 HTTP/1.1" (HTTP CONNECT) + // - "GET http://example.com/path HTTP/1.1" (HTTP GET) + + lines := splitLines(data) + if len(lines) == 0 { + return "" + } + + firstLine := string(lines[0]) + + // Проверяем формат CONNECT + if len(firstLine) > 8 && firstLine[:7] == "CONNECT" { + parts := splitString(firstLine[8:], ' ') + if len(parts) > 0 { + host := parts[0] + // Убираем порт если есть + if h, _, err := net.SplitHostPort(host); err == nil { + return h + } + return host + } + } + + // Проверяем формат GET/POST с полным URL + if (len(firstLine) > 3 && (firstLine[:3] == "GET" || firstLine[:3] == "POST")) { + parts := splitString(firstLine, ' ') + if len(parts) > 1 { + urlStr := parts[1] + // Пытаемся извлечь домен из URL + if urlStr[:7] == "http://" || urlStr[:8] == "https://" { + // Убираем схему + urlStr = urlStr[7:] + if urlStr[:8] == "https://" { + urlStr = urlStr[8:] + } + // Берем домен до первого слэша + for i, c := range urlStr { + if c == '/' || c == ':' { + urlStr = urlStr[:i] + break + } + } + return urlStr + } + } + } + + // Проверяем простой текстовый формат "DOMAIN " + if len(firstLine) > 7 && firstLine[:6] == "DOMAIN" { + domain := trimString(firstLine[7:]) + return domain + } + + return "" +} + +// splitLines разбивает данные на строки +func splitLines(data []byte) [][]byte { + var lines [][]byte + start := 0 + for i, b := range data { + if b == '\n' || b == '\r' { + lines = append(lines, data[start:i]) + start = i + 1 + } + } + if start < len(data) { + lines = append(lines, data[start:]) + } + return lines +} + +// splitString разбивает строку по разделителю +func splitString(s string, sep rune) []string { + var parts []string + start := 0 + for i, c := range s { + if c == sep { + parts = append(parts, s[start:i]) + start = i + 1 + } + } + parts = append(parts, s[start:]) + return parts +} + +// trimString убирает пробелы и переносы строк +func trimString(s string) string { + start := 0 + end := len(s) + for start < end && (s[start] == ' ' || s[start] == '\t' || s[start] == '\n' || s[start] == '\r') { + start++ + } + for end > start && (s[end-1] == ' ' || s[end-1] == '\t' || s[end-1] == '\n' || s[end-1] == '\r') { + end-- + } + return s[start:end] +} + +// Stop останавливает все серверы +func (sm *StreamManager) Stop() { + logrus.Info("Stopping stream manager...") + sm.cancel() + if sm.tcpListener != nil { + sm.tcpListener.Close() + } + if sm.udpConn != nil { + sm.udpConn.Close() + } + for _, listener := range sm.tcpListeners { + listener.Close() + } + for _, conn := range sm.udpConns { + conn.Close() + } + done := make(chan struct{}) + go func() { + sm.wg.Wait() + close(done) + }() + select { + case <-done: + logrus.Info("Stream manager stopped") + case <-time.After(10 * time.Second): + logrus.Warn("Stream manager stop timeout") + } +}