411 lines
13 KiB
Go
411 lines
13 KiB
Go
package main
|
|
|
|
import (
|
|
"embed"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/nats-io/nats.go"
|
|
"gopkg.in/yaml.v3"
|
|
)
|
|
|
|
//go:embed index.html favicon.ico
|
|
var staticFS embed.FS
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
}
|
|
|
|
type Message struct {
|
|
ID string `json:"id"`
|
|
Subject string `json:"subject"`
|
|
Data string `json:"data"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Size int `json:"size"`
|
|
Producer string `json:"producer,omitempty"`
|
|
Consumer string `json:"consumer,omitempty"`
|
|
}
|
|
|
|
type MessageStore struct {
|
|
mu sync.RWMutex
|
|
messages []Message
|
|
maxSize int
|
|
}
|
|
|
|
func NewMessageStore(maxSize int) *MessageStore {
|
|
return &MessageStore{messages: make([]Message, 0), maxSize: maxSize}
|
|
}
|
|
|
|
func (ms *MessageStore) Add(msg Message) {
|
|
ms.mu.Lock()
|
|
defer ms.mu.Unlock()
|
|
ms.messages = append(ms.messages, msg)
|
|
if len(ms.messages) > ms.maxSize {
|
|
ms.messages = ms.messages[1:]
|
|
}
|
|
}
|
|
|
|
func (ms *MessageStore) GetAll() []Message {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
result := make([]Message, len(ms.messages))
|
|
copy(result, ms.messages)
|
|
return result
|
|
}
|
|
|
|
func (ms *MessageStore) GetBySubject(subject string) []Message {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
var result []Message
|
|
for _, msg := range ms.messages {
|
|
if msg.Subject == subject {
|
|
result = append(result, msg)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (ms *MessageStore) GetSubjects() []string {
|
|
ms.mu.RLock()
|
|
defer ms.mu.RUnlock()
|
|
subjectMap := make(map[string]bool)
|
|
for _, msg := range ms.messages {
|
|
subjectMap[msg.Subject] = true
|
|
}
|
|
subjects := make([]string, 0, len(subjectMap))
|
|
for subject := range subjectMap {
|
|
subjects = append(subjects, subject)
|
|
}
|
|
return subjects
|
|
}
|
|
|
|
type Hub struct {
|
|
clients map[*websocket.Conn]bool
|
|
broadcast chan Message
|
|
register chan *websocket.Conn
|
|
unregister chan *websocket.Conn
|
|
}
|
|
|
|
func NewHub() *Hub {
|
|
return &Hub{
|
|
clients: make(map[*websocket.Conn]bool),
|
|
broadcast: make(chan Message, 256),
|
|
register: make(chan *websocket.Conn),
|
|
unregister: make(chan *websocket.Conn),
|
|
}
|
|
}
|
|
|
|
func (h *Hub) Run() {
|
|
for {
|
|
select {
|
|
case conn := <-h.register:
|
|
h.clients[conn] = true
|
|
case conn := <-h.unregister:
|
|
if _, ok := h.clients[conn]; ok {
|
|
delete(h.clients, conn)
|
|
if err := conn.Close(); err != nil {
|
|
log.Printf("WebSocket close error: %v", err)
|
|
}
|
|
}
|
|
case message := <-h.broadcast:
|
|
for conn := range h.clients {
|
|
err := conn.WriteJSON(message)
|
|
if err != nil {
|
|
log.Printf("WebSocket error: %v", err)
|
|
delete(h.clients, conn)
|
|
if closeErr := conn.Close(); closeErr != nil {
|
|
log.Printf("WebSocket close error: %v", closeErr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
type Config struct {
|
|
NatsURL string `yaml:"nats_url"`
|
|
NatsMonitorURL string `yaml:"nats_monitor_url"`
|
|
Subjects string `yaml:"subjects"`
|
|
Port string `yaml:"port"`
|
|
Token string `yaml:"token"`
|
|
Username string `yaml:"username"`
|
|
Password string `yaml:"password"`
|
|
MaxMessages int `yaml:"max_messages"`
|
|
}
|
|
|
|
type connzResponse struct {
|
|
Connections []connInfo `json:"connections"`
|
|
}
|
|
|
|
type connInfo struct {
|
|
CID int `json:"cid"`
|
|
IP string `json:"ip"`
|
|
Port int `json:"port"`
|
|
Name string `json:"name"`
|
|
Subscriptions int `json:"subscriptions"`
|
|
InMsgs int64 `json:"in_msgs"`
|
|
OutMsgs int64 `json:"out_msgs"`
|
|
InBytes int64 `json:"in_bytes"`
|
|
OutBytes int64 `json:"out_bytes"`
|
|
Lang string `json:"lang"`
|
|
Version string `json:"version"`
|
|
}
|
|
|
|
func connectionsHandler(w http.ResponseWriter, _ *http.Request, cfg *Config) {
|
|
if cfg.NatsMonitorURL == "" {
|
|
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
|
log.Printf("Failed to encode connections: %v", err)
|
|
}
|
|
return
|
|
}
|
|
url := strings.TrimSuffix(cfg.NatsMonitorURL, "/") + "/connz"
|
|
req, err := http.NewRequest(http.MethodGet, url, nil)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if cfg.Username != "" || cfg.Password != "" {
|
|
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(cfg.Username+":"+cfg.Password)))
|
|
}
|
|
client := &http.Client{Timeout: 5 * time.Second}
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
log.Printf("Failed to fetch NATS connz: %v", err)
|
|
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
|
log.Printf("Failed to encode connections: %v", err)
|
|
}
|
|
return
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
log.Printf("NATS connz returned status %d", resp.StatusCode)
|
|
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
|
log.Printf("Failed to encode connections: %v", err)
|
|
}
|
|
return
|
|
}
|
|
var connz connzResponse
|
|
if err := json.Unmarshal(body, &connz); err != nil {
|
|
log.Printf("Failed to parse connz: %v", err)
|
|
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
|
log.Printf("Failed to encode connections: %v", err)
|
|
}
|
|
return
|
|
}
|
|
if err := json.NewEncoder(w).Encode(connz); err != nil {
|
|
log.Printf("Failed to encode connections: %v", err)
|
|
}
|
|
}
|
|
|
|
func extractProducerFromPayload(data []byte) string {
|
|
var m map[string]interface{}
|
|
if err := json.Unmarshal(data, &m); err != nil {
|
|
return ""
|
|
}
|
|
for _, key := range []string{"producer", "source", "publisher", "from", "client_id"} {
|
|
if v, ok := m[key]; ok {
|
|
if s, ok := v.(string); ok && s != "" {
|
|
return s
|
|
}
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func loadConfig(filename string) (*Config, error) {
|
|
data, err := os.ReadFile(filename)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read config file: %w", err)
|
|
}
|
|
var config Config
|
|
if err := yaml.Unmarshal(data, &config); err != nil {
|
|
return nil, fmt.Errorf("failed to parse config file: %w", err)
|
|
}
|
|
if config.NatsURL == "" {
|
|
config.NatsURL = "nats://localhost:4222"
|
|
}
|
|
if config.Subjects == "" {
|
|
config.Subjects = ">"
|
|
}
|
|
if config.Port == "" {
|
|
config.Port = "8080"
|
|
}
|
|
if config.MaxMessages == 0 {
|
|
config.MaxMessages = 1000
|
|
}
|
|
return &config, nil
|
|
}
|
|
|
|
func main() {
|
|
configFile := "config.yaml"
|
|
if len(os.Args) > 1 {
|
|
configFile = os.Args[1]
|
|
}
|
|
config, err := loadConfig(configFile)
|
|
if err != nil {
|
|
log.Fatalf("Failed to load config: %v", err)
|
|
}
|
|
log.Printf("Loaded configuration from %s", configFile)
|
|
store := NewMessageStore(config.MaxMessages)
|
|
hub := NewHub()
|
|
go hub.Run()
|
|
var opts []nats.Option
|
|
opts = append(opts, nats.Name("nats-ui"))
|
|
if config.Token != "" {
|
|
opts = append(opts, nats.Token(config.Token))
|
|
log.Printf("Using token authentication")
|
|
} else if config.Username != "" || config.Password != "" {
|
|
opts = append(opts, nats.UserInfo(config.Username, config.Password))
|
|
log.Printf("Using username/password authentication")
|
|
}
|
|
nc, err := nats.Connect(config.NatsURL, opts...)
|
|
if err != nil {
|
|
log.Fatalf("Failed to connect to NATS: %v", err)
|
|
}
|
|
defer nc.Close()
|
|
log.Printf("Connected to NATS at %s", config.NatsURL)
|
|
subjectsList := parseSubjects(config.Subjects)
|
|
for _, subject := range subjectsList {
|
|
subj := subject
|
|
sub, err := nc.Subscribe(subj, func(msg *nats.Msg) {
|
|
producer := extractProducerFromPayload(msg.Data)
|
|
message := Message{
|
|
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
|
|
Subject: msg.Subject,
|
|
Data: string(msg.Data),
|
|
Timestamp: time.Now(),
|
|
Size: len(msg.Data),
|
|
Producer: producer,
|
|
Consumer: "nats-ui",
|
|
}
|
|
store.Add(message)
|
|
hub.broadcast <- message
|
|
log.Printf("Received message on %s: %d bytes", msg.Subject, len(msg.Data))
|
|
})
|
|
if err != nil {
|
|
log.Fatalf("Failed to subscribe to %s: %v", subj, err)
|
|
}
|
|
log.Printf("Subscribed to: %s", subj)
|
|
defer func() {
|
|
if err := sub.Unsubscribe(); err != nil {
|
|
log.Printf("Failed to unsubscribe from %s: %v", subj, err)
|
|
}
|
|
}()
|
|
}
|
|
indexHTML, _ := staticFS.ReadFile("index.html")
|
|
faviconICO, _ := staticFS.ReadFile("favicon.ico")
|
|
|
|
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
|
switch r.URL.Path {
|
|
case "/", "/index.html":
|
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(indexHTML)
|
|
case "/favicon.ico":
|
|
w.Header().Set("Content-Type", "image/x-icon")
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write(faviconICO)
|
|
default:
|
|
http.NotFound(w, r)
|
|
}
|
|
})
|
|
http.HandleFunc("/api/messages", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
subject := r.URL.Query().Get("subject")
|
|
var messages []Message
|
|
if subject != "" {
|
|
messages = store.GetBySubject(subject)
|
|
} else {
|
|
messages = store.GetAll()
|
|
}
|
|
if err := json.NewEncoder(w).Encode(messages); err != nil {
|
|
log.Printf("Failed to encode messages: %v", err)
|
|
}
|
|
})
|
|
http.HandleFunc("/api/subjects", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
if err := json.NewEncoder(w).Encode(store.GetSubjects()); err != nil {
|
|
log.Printf("Failed to encode subjects: %v", err)
|
|
}
|
|
})
|
|
http.HandleFunc("/api/connections", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
connectionsHandler(w, r, config)
|
|
})
|
|
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Printf("WebSocket upgrade error: %v", err)
|
|
return
|
|
}
|
|
hub.register <- conn
|
|
|
|
// Отправляем все существующие сообщения одним массивом при подключении
|
|
go func() {
|
|
messages := store.GetAll()
|
|
// Отправляем массив всех сообщений
|
|
if err := conn.WriteJSON(messages); err != nil {
|
|
log.Printf("Error sending initial messages: %v", err)
|
|
return
|
|
}
|
|
}()
|
|
|
|
// Читаем сообщения от клиента (для keep-alive)
|
|
go func() {
|
|
for {
|
|
_, _, err := conn.ReadMessage()
|
|
if err != nil {
|
|
hub.unregister <- conn
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
})
|
|
log.Printf("Starting HTTP server on port %s", config.Port)
|
|
log.Printf("Open http://localhost:%s in your browser", config.Port)
|
|
addr := ":" + config.Port
|
|
log.Fatal(http.ListenAndServe(addr, nil))
|
|
}
|
|
|
|
func parseSubjects(subjectsStr string) []string {
|
|
if subjectsStr == ">" {
|
|
return []string{">"}
|
|
}
|
|
var result []string
|
|
subjects := []rune(subjectsStr)
|
|
current := ""
|
|
for i, r := range subjects {
|
|
if r == ',' {
|
|
if current != "" {
|
|
result = append(result, current)
|
|
current = ""
|
|
}
|
|
} else {
|
|
current += string(r)
|
|
}
|
|
if i == len(subjects)-1 && current != "" {
|
|
result = append(result, current)
|
|
}
|
|
}
|
|
if len(result) == 0 {
|
|
result = []string{">"}
|
|
}
|
|
return result
|
|
}
|