From beb09cb7c63a044d4e2cbe95daf8e4e9e9814c8f Mon Sep 17 00:00:00 2001 From: admin Date: Fri, 6 Feb 2026 15:41:01 +0700 Subject: [PATCH] first commit --- .dockerignore | 31 +++ .gitea/workflows/ci-cd.yaml | 86 ++++++ .gitignore | 31 +++ Dockerfile | 25 ++ README.md | 194 ++++++++++++++ docker-compose.yaml | 11 + go.mod | 18 ++ go.sum | 20 ++ index.html | 503 ++++++++++++++++++++++++++++++++++++ main.go | 297 +++++++++++++++++++++ 10 files changed, 1216 insertions(+) create mode 100644 .dockerignore create mode 100644 .gitea/workflows/ci-cd.yaml create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 docker-compose.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 index.html create mode 100644 main.go diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..6b30f97 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,31 @@ +# Git +.git +.gitignore +.gitea + +# IDE and editor +.idea +.vscode +*.swp +*.swo +*~ + +# Build artifacts +nats-ui +*.exe +*.test +*.out + +# Documentation (not needed in image) +README.md +*.md + +# Docker +Dockerfile +.dockerignore +docker-compose.yaml +docker-compose*.yml + +# OS +.DS_Store +Thumbs.db diff --git a/.gitea/workflows/ci-cd.yaml b/.gitea/workflows/ci-cd.yaml new file mode 100644 index 0000000..d404cfa --- /dev/null +++ b/.gitea/workflows/ci-cd.yaml @@ -0,0 +1,86 @@ +name: CI/CD + +on: + push: + branches: + - main + - develop + - 'release/**' + pull_request: + branches: + - main + - develop + +env: + GO_VERSION: '1.21' + IMAGE_NAME: 'nats-ui' + DOCKER_REGISTRY: hub.p42.ru + IMAGE_REPO: hub.p42.ru/redirsvr/nats-ui + +jobs: + test: + name: Lint & Test + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: ${{ env.GO_VERSION }} + + - name: Download dependencies + run: go mod download + + - name: Run linter + uses: golangci/golangci-lint-action@v3 + with: + version: v1.64.8 + args: --timeout=5m + + - name: Run tests + run: go test -v -race ./... + + build: + name: Build & Push Docker image (hub.p42.ru) + runs-on: ubuntu-latest + needs: test + if: github.event_name == 'push' + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Log in to hub.p42.ru + uses: docker/login-action@v2 + with: + registry: ${{ env.DOCKER_REGISTRY }} + username: ${{ secrets.HUB_USER }} + password: ${{ secrets.HUB_SECRET }} + + - name: Extract metadata + id: meta + uses: docker/metadata-action@v4 + with: + images: ${{ env.IMAGE_REPO }} + tags: | + type=sha + type=ref,event=branch + type=raw,value=latest,enable={{is_default_branch}} + + - name: Build and push Docker image + uses: docker/build-push-action@v4 + with: + context: . + file: ./Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + # hub.p42.ru ограничивает размер blob-uploads (413), поэтому registry-cache отключаем. + # Используем inline cache (без отдельного :buildcache тега). + cache-from: type=registry,ref=${{ env.IMAGE_REPO }}:latest + cache-to: type=inline + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8871315 --- /dev/null +++ b/.gitignore @@ -0,0 +1,31 @@ +# Binaries +nats-ui +config.yaml +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary +*.test + +# Output of the go coverage tool +*.out + +# Dependency directories +vendor/ + +# Go workspace file +go.work + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1af08da --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +# Build stage +FROM golang:1.21-alpine AS builder + +WORKDIR /app + +COPY go.mod go.sum ./ +RUN go mod download + +COPY main.go ./ +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o nats-ui main.go + +# Runtime stage +FROM alpine:3.19 + +RUN apk --no-cache add ca-certificates tzdata + +WORKDIR /app + +COPY --from=builder /app/nats-ui . +COPY index.html config.yaml ./ + +EXPOSE 8080 + +ENTRYPOINT ["./nats-ui"] +CMD ["config.yaml"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..d2aa02d --- /dev/null +++ b/README.md @@ -0,0 +1,194 @@ +# NATS Queue Visualizer + +Веб-приложение на Go для визуализации содержимого очередей NATS в реальном времени. + +## Возможности + +- 🔄 Подписка на NATS subjects/очереди в реальном времени +- 📊 Красивый веб-интерфейс для просмотра сообщений +- 🔍 Фильтрация по subject и поиск по содержимому +- 📈 Статистика: количество сообщений, subjects, общий размер +- 🌐 WebSocket для обновлений в реальном времени +- 💾 Хранение последних N сообщений в памяти + +## Требования + +- Go 1.21 или выше +- NATS сервер (локальный или удаленный) + +## Установка + +1. Клонируйте репозиторий или скачайте файлы +2. Установите зависимости: + +```bash +go mod download +``` + +## Конфигурация + +Все настройки приложения находятся в файле `config.yaml`. Создайте этот файл или отредактируйте существующий: + +```yaml +# URL NATS сервера +nats_url: "nats://localhost:4222" + +# Аутентификация NATS (используйте один из вариантов) +# Вариант 1: Токен +# token: "your-token-here" + +# Вариант 2: Имя пользователя и пароль +# username: "your-username" +# password: "your-password" + +# Список subjects для подписки (через запятую) +subjects: ">" + +# Порт HTTP сервера +port: "8080" + +# Максимальное количество сообщений для хранения в памяти +max_messages: 1000 +``` + +## Использование + +### Базовое использование + +```bash +go run main.go +``` + +Приложение автоматически загрузит конфигурацию из `config.yaml` в текущей директории. + +### Использование другого конфигурационного файла + +```bash +go run main.go /path/to/custom-config.yaml +``` + +Можно указать путь к другому конфигурационному файлу в качестве аргумента командной строки. + +### Примеры subjects + +- `>` - все subjects +- `orders.*` - все subjects, начинающиеся с `orders.` +- `events.>` - все subjects в пространстве имен `events` +- `orders.created,orders.updated` - конкретные subjects через запятую + +## Веб-интерфейс + +Откройте браузер и перейдите на `http://localhost:8080` + +### Функции интерфейса: + +1. **Фильтр по subject** - выберите конкретный subject из выпадающего списка +2. **Поиск** - введите текст для поиска по содержимому сообщений +3. **Очистить** - удалить все сообщения из памяти (не влияет на NATS) +4. **Обновить** - перезагрузить все сообщения с сервера + +### Статистика + +Интерфейс показывает: +- Общее количество полученных сообщений +- Количество уникальных subjects +- Общий размер всех сообщений в байтах + +## API Endpoints + +- `GET /` - веб-интерфейс +- `GET /api/messages?subject=...` - получить все сообщения (опционально отфильтрованные по subject) +- `GET /api/subjects` - получить список всех subjects +- `WS /ws` - WebSocket для получения сообщений в реальном времени + +## Примеры конфигурации + +### Аутентификация с токеном + +```yaml +nats_url: "nats://nats.example.com:4222" +token: "your-secret-token" +subjects: ">" +port: "8080" +max_messages: 1000 +``` + +### Аутентификация с username/password + +```yaml +nats_url: "nats://nats.example.com:4222" +username: "myuser" +password: "mypassword" +subjects: ">" +port: "8080" +max_messages: 1000 +``` + +### Подписка на конкретные subjects + +Отредактируйте `config.yaml`: +```yaml +nats_url: "nats://localhost:4222" +subjects: "orders.created,orders.updated,orders.deleted" +port: "8080" +max_messages: 1000 +``` + +### Подписка на все subjects в пространстве имен + +```yaml +subjects: "events.>" +``` + +### Подключение к удаленному NATS серверу + +```yaml +nats_url: "nats://nats.example.com:4222" +``` + +### Увеличение лимита сообщений + +```yaml +max_messages: 5000 +``` + +## Сборка + +Для создания исполняемого файла: + +```bash +go build -o nats-ui main.go +``` + +Затем запустите: + +```bash +./nats-ui +``` + +Или с указанием конфигурационного файла: + +```bash +./nats-ui /path/to/config.yaml +``` + +## Структура проекта + +``` +nats-ui/ +├── main.go # Основной код приложения +├── index.html # Веб-интерфейс +├── config.yaml # Конфигурационный файл +├── go.mod # Зависимости Go +└── README.md # Документация +``` + +## Зависимости + +- `github.com/nats-io/nats.go` - клиент NATS +- `github.com/gorilla/websocket` - WebSocket поддержка +- `gopkg.in/yaml.v3` - парсер YAML конфигурации + +## Лицензия + +MIT diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..1b8a9de --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,11 @@ +services: + nats-ui: + image: hub.p42.ru/redirsvr/nats-ui:latest + container_name: nats-ui + ports: + - 0.0.0.0:8080:8080 + volumes: + - ./config.yaml:/app/config.yaml:ro + restart: unless-stopped + environment: + - TZ=Asia/Bangkok diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cd84632 --- /dev/null +++ b/go.mod @@ -0,0 +1,18 @@ +module nats-ui + +go 1.21 + +require ( + github.com/gorilla/websocket v1.5.1 + github.com/nats-io/nats.go v1.31.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/klauspost/compress v1.17.2 // indirect + github.com/nats-io/nkeys v0.4.6 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..cd7eb21 --- /dev/null +++ b/go.sum @@ -0,0 +1,20 @@ +github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= +github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= +github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/index.html b/index.html new file mode 100644 index 0000000..a24cdde --- /dev/null +++ b/index.html @@ -0,0 +1,503 @@ + + + + + + NATS Queue Visualizer + + + +
+
+

🚀 NATS Queue Visualizer

+
Отключено
+
+ +
+
+ + +
+ + +
+ +
+
+
0
+
Всего сообщений
+
+
+
0
+
Уникальных subjects
+
+
+
0
+
Всего байт
+
+
+ +
+
+ + + +

Ожидание сообщений из NATS...

+
+
+
+ + + + diff --git a/main.go b/main.go new file mode 100644 index 0000000..2346705 --- /dev/null +++ b/main.go @@ -0,0 +1,297 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "path/filepath" + "sync" + "time" + + "github.com/gorilla/websocket" + "github.com/nats-io/nats.go" + "gopkg.in/yaml.v3" +) + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +type Message struct { + ID string `json:"id"` + Subject string `json:"subject"` + Data string `json:"data"` + Timestamp time.Time `json:"timestamp"` + Size int `json:"size"` +} + +type MessageStore struct { + mu sync.RWMutex + messages []Message + maxSize int +} + +func NewMessageStore(maxSize int) *MessageStore { + return &MessageStore{messages: make([]Message, 0), maxSize: maxSize} +} + +func (ms *MessageStore) Add(msg Message) { + ms.mu.Lock() + defer ms.mu.Unlock() + ms.messages = append(ms.messages, msg) + if len(ms.messages) > ms.maxSize { + ms.messages = ms.messages[1:] + } +} + +func (ms *MessageStore) GetAll() []Message { + ms.mu.RLock() + defer ms.mu.RUnlock() + result := make([]Message, len(ms.messages)) + copy(result, ms.messages) + return result +} + +func (ms *MessageStore) GetBySubject(subject string) []Message { + ms.mu.RLock() + defer ms.mu.RUnlock() + var result []Message + for _, msg := range ms.messages { + if msg.Subject == subject { + result = append(result, msg) + } + } + return result +} + +func (ms *MessageStore) GetSubjects() []string { + ms.mu.RLock() + defer ms.mu.RUnlock() + subjectMap := make(map[string]bool) + for _, msg := range ms.messages { + subjectMap[msg.Subject] = true + } + subjects := make([]string, 0, len(subjectMap)) + for subject := range subjectMap { + subjects = append(subjects, subject) + } + return subjects +} + +type Hub struct { + clients map[*websocket.Conn]bool + broadcast chan Message + register chan *websocket.Conn + unregister chan *websocket.Conn +} + +func NewHub() *Hub { + return &Hub{ + clients: make(map[*websocket.Conn]bool), + broadcast: make(chan Message, 256), + register: make(chan *websocket.Conn), + unregister: make(chan *websocket.Conn), + } +} + +func (h *Hub) Run() { + for { + select { + case conn := <-h.register: + h.clients[conn] = true + case conn := <-h.unregister: + if _, ok := h.clients[conn]; ok { + delete(h.clients, conn) + conn.Close() + } + case message := <-h.broadcast: + for conn := range h.clients { + err := conn.WriteJSON(message) + if err != nil { + log.Printf("WebSocket error: %v", err) + delete(h.clients, conn) + conn.Close() + } + } + } + } +} + +type Config struct { + NatsURL string `yaml:"nats_url"` + Subjects string `yaml:"subjects"` + Port string `yaml:"port"` + Token string `yaml:"token"` + Username string `yaml:"username"` + Password string `yaml:"password"` + MaxMessages int `yaml:"max_messages"` +} + +func loadConfig(filename string) (*Config, error) { + data, err := os.ReadFile(filename) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + var config Config + if err := yaml.Unmarshal(data, &config); err != nil { + return nil, fmt.Errorf("failed to parse config file: %w", err) + } + if config.NatsURL == "" { + config.NatsURL = "nats://localhost:4222" + } + if config.Subjects == "" { + config.Subjects = ">" + } + if config.Port == "" { + config.Port = "8080" + } + if config.MaxMessages == 0 { + config.MaxMessages = 1000 + } + return &config, nil +} + +func main() { + configFile := "config.yaml" + if len(os.Args) > 1 { + configFile = os.Args[1] + } + config, err := loadConfig(configFile) + if err != nil { + log.Fatalf("Failed to load config: %v", err) + } + log.Printf("Loaded configuration from %s", configFile) + store := NewMessageStore(config.MaxMessages) + hub := NewHub() + go hub.Run() + var opts []nats.Option + if config.Token != "" { + opts = append(opts, nats.Token(config.Token)) + log.Printf("Using token authentication") + } else if config.Username != "" || config.Password != "" { + opts = append(opts, nats.UserInfo(config.Username, config.Password)) + log.Printf("Using username/password authentication") + } + nc, err := nats.Connect(config.NatsURL, opts...) + if err != nil { + log.Fatalf("Failed to connect to NATS: %v", err) + } + defer nc.Close() + log.Printf("Connected to NATS at %s", config.NatsURL) + subjectsList := parseSubjects(config.Subjects) + for _, subject := range subjectsList { + sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { + message := Message{ + ID: fmt.Sprintf("%d", time.Now().UnixNano()), + Subject: msg.Subject, + Data: string(msg.Data), + Timestamp: time.Now(), + Size: len(msg.Data), + } + store.Add(message) + hub.broadcast <- message + log.Printf("Received message on %s: %d bytes", msg.Subject, len(msg.Data)) + }) + if err != nil { + log.Fatalf("Failed to subscribe to %s: %v", subject, err) + } + log.Printf("Subscribed to: %s", subject) + defer sub.Unsubscribe() + } + // Определяем путь к index.html + indexPath := "index.html" + if _, err := os.Stat(indexPath); os.IsNotExist(err) { + // Если не найден в текущей директории, пробуем рядом с исполняемым файлом + exePath, err := os.Executable() + if err == nil { + exeDir := filepath.Dir(exePath) + indexPath = filepath.Join(exeDir, "index.html") + } + } + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + http.NotFound(w, r) + return + } + http.ServeFile(w, r, indexPath) + }) + http.HandleFunc("/api/messages", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + subject := r.URL.Query().Get("subject") + var messages []Message + if subject != "" { + messages = store.GetBySubject(subject) + } else { + messages = store.GetAll() + } + json.NewEncoder(w).Encode(messages) + }) + http.HandleFunc("/api/subjects", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(store.GetSubjects()) + }) + http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("WebSocket upgrade error: %v", err) + return + } + hub.register <- conn + + // Отправляем все существующие сообщения одним массивом при подключении + go func() { + messages := store.GetAll() + // Отправляем массив всех сообщений + if err := conn.WriteJSON(messages); err != nil { + log.Printf("Error sending initial messages: %v", err) + return + } + }() + + // Читаем сообщения от клиента (для keep-alive) + go func() { + for { + _, _, err := conn.ReadMessage() + if err != nil { + hub.unregister <- conn + break + } + } + }() + }) + log.Printf("Starting HTTP server on port %s", config.Port) + log.Printf("Open http://localhost:%s in your browser", config.Port) + addr := ":" + config.Port + log.Fatal(http.ListenAndServe(addr, nil)) +} + +func parseSubjects(subjectsStr string) []string { + if subjectsStr == ">" { + return []string{">"} + } + var result []string + subjects := []rune(subjectsStr) + current := "" + for i, r := range subjects { + if r == ',' { + if current != "" { + result = append(result, current) + current = "" + } + } else { + current += string(r) + } + if i == len(subjects)-1 && current != "" { + result = append(result, current) + } + } + if len(result) == 0 { + result = []string{">"} + } + return result +}