nats-ui/main.go
admin f4f3062352
Some checks failed
CI/CD / Lint & Test (push) Failing after 51s
CI/CD / Build & Push Docker image (hub.p42.ru) (push) Has been skipped
Update architecture
2026-02-09 17:31:21 +07:00

418 lines
13 KiB
Go

package main
import (
"embed"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/nats-io/nats.go"
"gopkg.in/yaml.v3"
)
//go:embed index.html favicon.ico
var staticFS embed.FS
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Message struct {
ID string `json:"id"`
Subject string `json:"subject"`
Data string `json:"data"`
Timestamp time.Time `json:"timestamp"`
Size int `json:"size"`
Producer string `json:"producer,omitempty"`
Consumer string `json:"consumer,omitempty"`
}
type MessageStore struct {
mu sync.RWMutex
messages []Message
maxSize int
}
func NewMessageStore(maxSize int) *MessageStore {
return &MessageStore{messages: make([]Message, 0), maxSize: maxSize}
}
func (ms *MessageStore) Add(msg Message) {
ms.mu.Lock()
defer ms.mu.Unlock()
ms.messages = append(ms.messages, msg)
if len(ms.messages) > ms.maxSize {
ms.messages = ms.messages[1:]
}
}
func (ms *MessageStore) GetAll() []Message {
ms.mu.RLock()
defer ms.mu.RUnlock()
result := make([]Message, len(ms.messages))
copy(result, ms.messages)
return result
}
func (ms *MessageStore) GetBySubject(subject string) []Message {
ms.mu.RLock()
defer ms.mu.RUnlock()
var result []Message
for _, msg := range ms.messages {
if msg.Subject == subject {
result = append(result, msg)
}
}
return result
}
func (ms *MessageStore) GetSubjects() []string {
ms.mu.RLock()
defer ms.mu.RUnlock()
subjectMap := make(map[string]bool)
for _, msg := range ms.messages {
subjectMap[msg.Subject] = true
}
subjects := make([]string, 0, len(subjectMap))
for subject := range subjectMap {
subjects = append(subjects, subject)
}
return subjects
}
type Hub struct {
clients map[*websocket.Conn]bool
broadcast chan Message
register chan *websocket.Conn
unregister chan *websocket.Conn
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*websocket.Conn]bool),
broadcast: make(chan Message, 256),
register: make(chan *websocket.Conn),
unregister: make(chan *websocket.Conn),
}
}
func (h *Hub) Run() {
for {
select {
case conn := <-h.register:
h.clients[conn] = true
case conn := <-h.unregister:
if _, ok := h.clients[conn]; ok {
delete(h.clients, conn)
if err := conn.Close(); err != nil {
log.Printf("WebSocket close error: %v", err)
}
}
case message := <-h.broadcast:
for conn := range h.clients {
err := conn.WriteJSON(message)
if err != nil {
log.Printf("WebSocket error: %v", err)
delete(h.clients, conn)
if closeErr := conn.Close(); closeErr != nil {
log.Printf("WebSocket close error: %v", closeErr)
}
}
}
}
}
}
type Config struct {
NatsURL string `yaml:"nats_url"`
NatsMonitorURL string `yaml:"nats_monitor_url"`
Subjects string `yaml:"subjects"`
Port string `yaml:"port"`
Token string `yaml:"token"`
Username string `yaml:"username"`
Password string `yaml:"password"`
MaxMessages int `yaml:"max_messages"`
}
type connzResponse struct {
Connections []connInfo `json:"connections"`
Conns []connInfo `json:"conns"`
}
type connInfo struct {
CID uint64 `json:"cid"`
IP string `json:"ip"`
Port int `json:"port"`
Name string `json:"name"`
Subscriptions int `json:"subscriptions"`
InMsgs int64 `json:"in_msgs"`
OutMsgs int64 `json:"out_msgs"`
InBytes int64 `json:"in_bytes"`
OutBytes int64 `json:"out_bytes"`
Lang string `json:"lang"`
Version string `json:"version"`
}
func connectionsHandler(w http.ResponseWriter, _ *http.Request, cfg *Config) {
if cfg.NatsMonitorURL == "" {
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
log.Printf("Failed to encode connections: %v", err)
}
return
}
url := strings.TrimSuffix(cfg.NatsMonitorURL, "/") + "/connz"
log.Printf("Fetching NATS connections from %s", url)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if cfg.Username != "" || cfg.Password != "" {
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(cfg.Username+":"+cfg.Password)))
}
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Do(req)
if err != nil {
log.Printf("Failed to fetch NATS connz from %s: %v", url, err)
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
log.Printf("Failed to encode connections: %v", err)
}
return
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if resp.StatusCode != http.StatusOK {
log.Printf("NATS connz %s returned status %d, body: %s", url, resp.StatusCode, truncate(string(body), 300))
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
log.Printf("Failed to encode connections: %v", err)
}
return
}
var connz connzResponse
if err := json.Unmarshal(body, &connz); err != nil {
log.Printf("Failed to parse NATS connz from %s: %v, body sample: %s", url, err, truncate(string(body), 200))
}
if len(connz.Connections) == 0 && len(connz.Conns) > 0 {
connz.Connections = connz.Conns
}
out := connzResponse{Connections: connz.Connections}
if err := json.NewEncoder(w).Encode(out); err != nil {
log.Printf("Failed to encode connections: %v", err)
}
}
func truncate(s string, max int) string {
if len(s) <= max {
return s
}
return s[:max] + "..."
}
func extractProducerFromPayload(data []byte) string {
var m map[string]interface{}
if err := json.Unmarshal(data, &m); err != nil {
return ""
}
for _, key := range []string{"producer", "source", "publisher", "from", "client_id"} {
if v, ok := m[key]; ok {
if s, ok := v.(string); ok && s != "" {
return s
}
}
}
return ""
}
func loadConfig(filename string) (*Config, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
var config Config
if err := yaml.Unmarshal(data, &config); err != nil {
return nil, fmt.Errorf("failed to parse config file: %w", err)
}
if config.NatsURL == "" {
config.NatsURL = "nats://localhost:4222"
}
if config.Subjects == "" {
config.Subjects = ">"
}
if config.Port == "" {
config.Port = "8080"
}
if config.MaxMessages == 0 {
config.MaxMessages = 1000
}
return &config, nil
}
func main() {
configFile := "config.yaml"
if len(os.Args) > 1 {
configFile = os.Args[1]
}
config, err := loadConfig(configFile)
if err != nil {
if os.IsNotExist(err) {
log.Fatalf("Config file not found: %s (config must be an external file). Create it or run: %s /path/to/config.yaml", configFile, os.Args[0])
}
log.Fatalf("Failed to load config: %v", err)
}
store := NewMessageStore(config.MaxMessages)
hub := NewHub()
go hub.Run()
var opts []nats.Option
opts = append(opts, nats.Name("nats-ui"))
if config.Token != "" {
opts = append(opts, nats.Token(config.Token))
} else if config.Username != "" || config.Password != "" {
opts = append(opts, nats.UserInfo(config.Username, config.Password))
}
nc, err := nats.Connect(config.NatsURL, opts...)
if err != nil {
log.Fatalf("Failed to connect to NATS: %v", err)
}
defer nc.Close()
subjectsList := parseSubjects(config.Subjects)
for _, subject := range subjectsList {
subj := subject
sub, err := nc.Subscribe(subj, func(msg *nats.Msg) {
producer := extractProducerFromPayload(msg.Data)
message := Message{
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
Subject: msg.Subject,
Data: string(msg.Data),
Timestamp: time.Now(),
Size: len(msg.Data),
Producer: producer,
Consumer: "nats-ui",
}
store.Add(message)
hub.broadcast <- message
})
if err != nil {
log.Fatalf("Failed to subscribe to %s: %v", subj, err)
}
defer func() {
if err := sub.Unsubscribe(); err != nil {
log.Printf("Failed to unsubscribe from %s: %v", subj, err)
}
}()
}
indexHTML, _ := staticFS.ReadFile("index.html")
faviconICO, _ := staticFS.ReadFile("favicon.ico")
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/", "/index.html":
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(indexHTML)
case "/favicon.ico":
w.Header().Set("Content-Type", "image/x-icon")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(faviconICO)
default:
http.NotFound(w, r)
}
})
http.HandleFunc("/api/messages", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
subject := r.URL.Query().Get("subject")
var messages []Message
if subject != "" {
messages = store.GetBySubject(subject)
} else {
messages = store.GetAll()
}
if err := json.NewEncoder(w).Encode(messages); err != nil {
log.Printf("Failed to encode messages: %v", err)
}
})
http.HandleFunc("/api/subjects", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(store.GetSubjects()); err != nil {
log.Printf("Failed to encode subjects: %v", err)
}
})
http.HandleFunc("/api/subscribed", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
list := parseSubjects(config.Subjects)
if err := json.NewEncoder(w).Encode(map[string]interface{}{"subscribed": list}); err != nil {
log.Printf("Failed to encode subscribed: %v", err)
}
})
http.HandleFunc("/api/connections", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
connectionsHandler(w, r, config)
})
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("WebSocket upgrade error: %v", err)
return
}
hub.register <- conn
go func() {
messages := store.GetAll()
if err := conn.WriteJSON(messages); err != nil {
log.Printf("Error sending initial messages: %v", err)
return
}
}()
go func() {
for {
_, _, err := conn.ReadMessage()
if err != nil {
hub.unregister <- conn
break
}
}
}()
})
log.Printf("NATS UI: http://0.0.0.0:%s", config.Port)
addr := ":" + config.Port
log.Fatal(http.ListenAndServe(addr, nil))
}
func parseSubjects(subjectsStr string) []string {
if subjectsStr == ">" {
return []string{">"}
}
var result []string
subjects := []rune(subjectsStr)
current := ""
for i, r := range subjects {
if r == ',' {
if current != "" {
result = append(result, current)
current = ""
}
} else {
current += string(r)
}
if i == len(subjects)-1 && current != "" {
result = append(result, current)
}
}
if len(result) == 0 {
result = []string{">"}
}
return result
}