diff --git a/README.md b/README.md index d2aa02d..def5cf7 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ ## Возможности - 🔄 Подписка на NATS subjects/очереди в реальном времени -- 📊 Красивый веб-интерфейс для просмотра сообщений +- 📊 Веб-интерфейс: продюсер/потребитель по сообщениям, активные подключения к NATS - 🔍 Фильтрация по subject и поиск по содержимому - 📈 Статистика: количество сообщений, subjects, общий размер - 🌐 WebSocket для обновлений в реальном времени @@ -41,6 +41,9 @@ nats_url: "nats://localhost:4222" # username: "your-username" # password: "your-password" +# URL HTTP-мониторинга NATS (порт 8222) — для отображения активных подключений +# nats_monitor_url: "http://localhost:8222" + # Список subjects для подписки (через запятую) subjects: ">" diff --git a/index.html b/index.html index a24cdde..de38476 100644 --- a/index.html +++ b/index.html @@ -228,6 +228,64 @@ color: #1976d2; } + .message-producer-consumer { + display: flex; + gap: 20px; + margin-top: 8px; + font-size: 0.85em; + color: #555; + } + + .message-producer-consumer span { + display: flex; + align-items: center; + gap: 6px; + } + + .message-producer-consumer strong { + color: #333; + } + + .connections-section { + padding: 20px 30px; + border-bottom: 1px solid #eee; + } + + .connections-section h2 { + font-size: 1.3em; + margin-bottom: 15px; + color: #333; + } + + .connections-table { + width: 100%; + border-collapse: collapse; + font-size: 0.9em; + } + + .connections-table th, + .connections-table td { + padding: 10px 12px; + text-align: left; + border-bottom: 1px solid #eee; + } + + .connections-table th { + background: #f5f5f5; + font-weight: 600; + color: #555; + } + + .connections-table tr:hover { + background: #fafafa; + } + + .connections-empty { + color: #999; + padding: 20px; + text-align: center; + } + .empty-state { text-align: center; padding: 60px 20px; @@ -300,6 +358,13 @@ +
+

Активные подключения к NATS

+
+
Загрузка…
+
+
+
@@ -413,6 +478,10 @@ ${formatBytes(msg.size)}
+
+ Продюсер: ${escapeHtml(msg.producer || '—')} + Потребитель: ${escapeHtml(msg.consumer || 'nats-ui')} +
${escapeHtml(msg.data)}
`).join(''); @@ -496,8 +565,56 @@ updateMessages(); }); + function loadConnections() { + fetch('/api/connections') + .then(res => res.json()) + .then(data => { + const container = document.getElementById('connectionsContainer'); + const conns = data.connections || []; + if (conns.length === 0) { + container.innerHTML = '
Нет данных о подключениях. Укажите nats_monitor_url в конфигурации (HTTP мониторинг NATS, порт 8222).
'; + return; + } + container.innerHTML = ` + + + + + + + + + + + + + + ${conns.map(c => ` + + + + + + + + + + `).join('')} + +
CIDИмяIP:ПортПодписокВходящиеИсходящиеЯзык
${c.cid}${escapeHtml(c.name || '—')}${escapeHtml(c.ip || '')}:${c.port || ''}${c.subscriptions ?? '—'}${c.in_msgs ?? '—'}${c.out_msgs ?? '—'}${escapeHtml(c.lang || '—')}
+ `; + }) + .catch(err => { + document.getElementById('connectionsContainer').innerHTML = + '
Не удалось загрузить список подключений.
'; + console.error('Connections load error:', err); + }); + } + // Инициализация - только WebSocket, все сообщения придут автоматически connectWebSocket(); + loadConnections(); + setInterval(loadConnections, 5000); diff --git a/main.go b/main.go index 6af38de..aa97530 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,15 @@ package main import ( + "encoding/base64" "encoding/json" "fmt" + "io" "log" "net/http" "os" "path/filepath" + "strings" "sync" "time" @@ -27,6 +30,8 @@ type Message struct { Data string `json:"data"` Timestamp time.Time `json:"timestamp"` Size int `json:"size"` + Producer string `json:"producer,omitempty"` + Consumer string `json:"consumer,omitempty"` } type MessageStore struct { @@ -126,13 +131,98 @@ func (h *Hub) Run() { } type Config struct { - NatsURL string `yaml:"nats_url"` - Subjects string `yaml:"subjects"` - Port string `yaml:"port"` - Token string `yaml:"token"` - Username string `yaml:"username"` - Password string `yaml:"password"` - MaxMessages int `yaml:"max_messages"` + NatsURL string `yaml:"nats_url"` + NatsMonitorURL string `yaml:"nats_monitor_url"` + Subjects string `yaml:"subjects"` + Port string `yaml:"port"` + Token string `yaml:"token"` + Username string `yaml:"username"` + Password string `yaml:"password"` + MaxMessages int `yaml:"max_messages"` +} + +type connzResponse struct { + Connections []connInfo `json:"connections"` +} + +type connInfo struct { + CID int `json:"cid"` + IP string `json:"ip"` + Port int `json:"port"` + Name string `json:"name"` + Subscriptions int `json:"subscriptions"` + InMsgs int64 `json:"in_msgs"` + OutMsgs int64 `json:"out_msgs"` + InBytes int64 `json:"in_bytes"` + OutBytes int64 `json:"out_bytes"` + Lang string `json:"lang"` + Version string `json:"version"` +} + +func connectionsHandler(w http.ResponseWriter, _ *http.Request, cfg *Config) { + if cfg.NatsMonitorURL == "" { + if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil { + log.Printf("Failed to encode connections: %v", err) + } + return + } + url := strings.TrimSuffix(cfg.NatsMonitorURL, "/") + "/connz" + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if cfg.Username != "" || cfg.Password != "" { + req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(cfg.Username+":"+cfg.Password))) + } + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Do(req) + if err != nil { + log.Printf("Failed to fetch NATS connz: %v", err) + if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil { + log.Printf("Failed to encode connections: %v", err) + } + return + } + defer func() { _ = resp.Body.Close() }() + body, err := io.ReadAll(resp.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.StatusCode != http.StatusOK { + log.Printf("NATS connz returned status %d", resp.StatusCode) + if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil { + log.Printf("Failed to encode connections: %v", err) + } + return + } + var connz connzResponse + if err := json.Unmarshal(body, &connz); err != nil { + log.Printf("Failed to parse connz: %v", err) + if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil { + log.Printf("Failed to encode connections: %v", err) + } + return + } + if err := json.NewEncoder(w).Encode(connz); err != nil { + log.Printf("Failed to encode connections: %v", err) + } +} + +func extractProducerFromPayload(data []byte) string { + var m map[string]interface{} + if err := json.Unmarshal(data, &m); err != nil { + return "" + } + for _, key := range []string{"producer", "source", "publisher", "from", "client_id"} { + if v, ok := m[key]; ok { + if s, ok := v.(string); ok && s != "" { + return s + } + } + } + return "" } func loadConfig(filename string) (*Config, error) { @@ -173,6 +263,7 @@ func main() { hub := NewHub() go hub.Run() var opts []nats.Option + opts = append(opts, nats.Name("nats-ui")) if config.Token != "" { opts = append(opts, nats.Token(config.Token)) log.Printf("Using token authentication") @@ -190,12 +281,15 @@ func main() { for _, subject := range subjectsList { subj := subject sub, err := nc.Subscribe(subj, func(msg *nats.Msg) { + producer := extractProducerFromPayload(msg.Data) message := Message{ ID: fmt.Sprintf("%d", time.Now().UnixNano()), Subject: msg.Subject, Data: string(msg.Data), Timestamp: time.Now(), Size: len(msg.Data), + Producer: producer, + Consumer: "nats-ui", } store.Add(message) hub.broadcast <- message @@ -248,6 +342,10 @@ func main() { log.Printf("Failed to encode subjects: %v", err) } }) + http.HandleFunc("/api/connections", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + connectionsHandler(w, r, config) + }) http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil {