409 lines
13 KiB
Go
409 lines
13 KiB
Go
package main
|
||
|
||
import (
|
||
"encoding/base64"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/gorilla/websocket"
|
||
"github.com/nats-io/nats.go"
|
||
"gopkg.in/yaml.v3"
|
||
)
|
||
|
||
var upgrader = websocket.Upgrader{
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
return true
|
||
},
|
||
}
|
||
|
||
// Message is one captured NATS message in the JSON shape served to the
// browser by the REST endpoints and the WebSocket feed.
type Message struct {
	ID        string    `json:"id"`
	Subject   string    `json:"subject"`
	Data      string    `json:"data"`
	Timestamp time.Time `json:"timestamp"`
	Size      int       `json:"size"`
	Producer  string    `json:"producer,omitempty"`
	Consumer  string    `json:"consumer,omitempty"`
}

// MessageStore is a bounded in-memory history of messages in arrival order.
// All methods take the store's lock, so it may be shared between goroutines.
type MessageStore struct {
	mu       sync.RWMutex
	messages []Message
	maxSize  int
}

// NewMessageStore returns an empty store that keeps at most maxSize messages.
func NewMessageStore(maxSize int) *MessageStore {
	return &MessageStore{messages: make([]Message, 0), maxSize: maxSize}
}

// Add appends msg and, when the store then exceeds maxSize, evicts the oldest
// message.
func (ms *MessageStore) Add(msg Message) {
	ms.mu.Lock()
	defer ms.mu.Unlock()
	ms.messages = append(ms.messages, msg)
	if len(ms.messages) > ms.maxSize {
		// Zero the evicted slot before re-slicing: otherwise the backing array
		// keeps the dropped payload strings reachable until it is reallocated.
		ms.messages[0] = Message{}
		ms.messages = ms.messages[1:]
	}
}

// GetAll returns a copy of every stored message, oldest first. The result is
// never nil and can be modified freely by the caller.
func (ms *MessageStore) GetAll() []Message {
	ms.mu.RLock()
	defer ms.mu.RUnlock()
	result := make([]Message, len(ms.messages))
	copy(result, ms.messages)
	return result
}

// GetBySubject returns the stored messages whose Subject equals subject
// exactly, oldest first. The result is never nil, so it encodes to the JSON
// array [] rather than null when nothing matches.
func (ms *MessageStore) GetBySubject(subject string) []Message {
	ms.mu.RLock()
	defer ms.mu.RUnlock()
	result := make([]Message, 0)
	for i := range ms.messages {
		if ms.messages[i].Subject == subject {
			result = append(result, ms.messages[i])
		}
	}
	return result
}

// GetSubjects returns the distinct subjects of the stored messages, ordered
// by the first stored message that carries each one. The order is therefore
// stable between calls as long as the store does not change.
func (ms *MessageStore) GetSubjects() []string {
	ms.mu.RLock()
	defer ms.mu.RUnlock()
	seen := make(map[string]struct{})
	subjects := make([]string, 0)
	for i := range ms.messages {
		subject := ms.messages[i].Subject
		if _, dup := seen[subject]; dup {
			continue
		}
		seen[subject] = struct{}{}
		subjects = append(subjects, subject)
	}
	return subjects
}
|
||
|
||
type Hub struct {
|
||
clients map[*websocket.Conn]bool
|
||
broadcast chan Message
|
||
register chan *websocket.Conn
|
||
unregister chan *websocket.Conn
|
||
}
|
||
|
||
func NewHub() *Hub {
|
||
return &Hub{
|
||
clients: make(map[*websocket.Conn]bool),
|
||
broadcast: make(chan Message, 256),
|
||
register: make(chan *websocket.Conn),
|
||
unregister: make(chan *websocket.Conn),
|
||
}
|
||
}
|
||
|
||
func (h *Hub) Run() {
|
||
for {
|
||
select {
|
||
case conn := <-h.register:
|
||
h.clients[conn] = true
|
||
case conn := <-h.unregister:
|
||
if _, ok := h.clients[conn]; ok {
|
||
delete(h.clients, conn)
|
||
if err := conn.Close(); err != nil {
|
||
log.Printf("WebSocket close error: %v", err)
|
||
}
|
||
}
|
||
case message := <-h.broadcast:
|
||
for conn := range h.clients {
|
||
err := conn.WriteJSON(message)
|
||
if err != nil {
|
||
log.Printf("WebSocket error: %v", err)
|
||
delete(h.clients, conn)
|
||
if closeErr := conn.Close(); closeErr != nil {
|
||
log.Printf("WebSocket close error: %v", closeErr)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Config holds the settings read from the YAML configuration file.
type Config struct {
	// NatsURL is the server address passed to nats.Connect. loadConfig
	// defaults it to nats://localhost:4222.
	NatsURL string `yaml:"nats_url"`

	// NatsMonitorURL is the base URL to which connectionsHandler appends
	// /connz. When empty, the handler serves an empty connection list.
	NatsMonitorURL string `yaml:"nats_monitor_url"`

	// Subjects is a comma-separated subject list split by parseSubjects.
	// loadConfig defaults it to ">".
	Subjects string `yaml:"subjects"`

	// Port is the TCP port the HTTP server listens on. loadConfig defaults it
	// to "8080".
	Port string `yaml:"port"`

	// Token, when non-empty, is used for NATS token authentication and is
	// checked before Username/Password.
	Token string `yaml:"token"`

	// Username and Password are used for NATS user authentication when no
	// token is set, and for the Basic Authorization header of the /connz
	// request.
	Username string `yaml:"username"`

	Password string `yaml:"password"`

	// MaxMessages is the capacity handed to NewMessageStore. loadConfig
	// defaults it to 1000 when it is zero.
	MaxMessages int `yaml:"max_messages"`
}
|
||
|
||
// connzResponse is the part of the monitor's /connz JSON document that
// connectionsHandler decodes and then re-serves to the browser; any other
// fields in the upstream reply are ignored.
type connzResponse struct {
	Connections []connInfo `json:"connections"`
}
|
||
|
||
// connInfo is one entry of the "connections" array in a /connz reply. The
// values are passed through to the browser unchanged.
//
// NOTE(review): field meanings are presumed to follow the NATS monitoring
// /connz schema (client id, peer address, traffic counters, client library);
// confirm against the NATS server documentation.
type connInfo struct {
	CID           int    `json:"cid"`
	IP            string `json:"ip"`
	Port          int    `json:"port"`
	Name          string `json:"name"`
	Subscriptions int    `json:"subscriptions"`
	InMsgs        int64  `json:"in_msgs"`
	OutMsgs       int64  `json:"out_msgs"`
	InBytes       int64  `json:"in_bytes"`
	OutBytes      int64  `json:"out_bytes"`
	Lang          string `json:"lang"`
	Version       string `json:"version"`
}
|
||
|
||
func connectionsHandler(w http.ResponseWriter, _ *http.Request, cfg *Config) {
|
||
if cfg.NatsMonitorURL == "" {
|
||
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
||
log.Printf("Failed to encode connections: %v", err)
|
||
}
|
||
return
|
||
}
|
||
url := strings.TrimSuffix(cfg.NatsMonitorURL, "/") + "/connz"
|
||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||
if err != nil {
|
||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
return
|
||
}
|
||
if cfg.Username != "" || cfg.Password != "" {
|
||
req.Header.Set("Authorization", "Basic "+base64.StdEncoding.EncodeToString([]byte(cfg.Username+":"+cfg.Password)))
|
||
}
|
||
client := &http.Client{Timeout: 5 * time.Second}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
log.Printf("Failed to fetch NATS connz: %v", err)
|
||
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
||
log.Printf("Failed to encode connections: %v", err)
|
||
}
|
||
return
|
||
}
|
||
defer func() { _ = resp.Body.Close() }()
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||
return
|
||
}
|
||
if resp.StatusCode != http.StatusOK {
|
||
log.Printf("NATS connz returned status %d", resp.StatusCode)
|
||
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
||
log.Printf("Failed to encode connections: %v", err)
|
||
}
|
||
return
|
||
}
|
||
var connz connzResponse
|
||
if err := json.Unmarshal(body, &connz); err != nil {
|
||
log.Printf("Failed to parse connz: %v", err)
|
||
if err := json.NewEncoder(w).Encode(connzResponse{Connections: []connInfo{}}); err != nil {
|
||
log.Printf("Failed to encode connections: %v", err)
|
||
}
|
||
return
|
||
}
|
||
if err := json.NewEncoder(w).Encode(connz); err != nil {
|
||
log.Printf("Failed to encode connections: %v", err)
|
||
}
|
||
}
|
||
|
||
// extractProducerFromPayload looks for the sender's name inside a JSON object
// payload. It checks the keys "producer", "source", "publisher", "from" and
// "client_id" in that order and returns the first value that is a non-empty
// string. Payloads that are not a JSON object, or that carry no such value,
// yield "".
func extractProducerFromPayload(data []byte) string {
	fields := map[string]interface{}{}
	if json.Unmarshal(data, &fields) != nil {
		return ""
	}

	candidates := [...]string{"producer", "source", "publisher", "from", "client_id"}
	for i := 0; i < len(candidates); i++ {
		// A missing key yields a nil interface, which fails the assertion just
		// like a non-string value does.
		name, isString := fields[candidates[i]].(string)
		if !isString || name == "" {
			continue
		}
		return name
	}
	return ""
}
|
||
|
||
func loadConfig(filename string) (*Config, error) {
|
||
data, err := os.ReadFile(filename)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to read config file: %w", err)
|
||
}
|
||
var config Config
|
||
if err := yaml.Unmarshal(data, &config); err != nil {
|
||
return nil, fmt.Errorf("failed to parse config file: %w", err)
|
||
}
|
||
if config.NatsURL == "" {
|
||
config.NatsURL = "nats://localhost:4222"
|
||
}
|
||
if config.Subjects == "" {
|
||
config.Subjects = ">"
|
||
}
|
||
if config.Port == "" {
|
||
config.Port = "8080"
|
||
}
|
||
if config.MaxMessages == 0 {
|
||
config.MaxMessages = 1000
|
||
}
|
||
return &config, nil
|
||
}
|
||
|
||
func main() {
|
||
configFile := "config.yaml"
|
||
if len(os.Args) > 1 {
|
||
configFile = os.Args[1]
|
||
}
|
||
config, err := loadConfig(configFile)
|
||
if err != nil {
|
||
log.Fatalf("Failed to load config: %v", err)
|
||
}
|
||
log.Printf("Loaded configuration from %s", configFile)
|
||
store := NewMessageStore(config.MaxMessages)
|
||
hub := NewHub()
|
||
go hub.Run()
|
||
var opts []nats.Option
|
||
opts = append(opts, nats.Name("nats-ui"))
|
||
if config.Token != "" {
|
||
opts = append(opts, nats.Token(config.Token))
|
||
log.Printf("Using token authentication")
|
||
} else if config.Username != "" || config.Password != "" {
|
||
opts = append(opts, nats.UserInfo(config.Username, config.Password))
|
||
log.Printf("Using username/password authentication")
|
||
}
|
||
nc, err := nats.Connect(config.NatsURL, opts...)
|
||
if err != nil {
|
||
log.Fatalf("Failed to connect to NATS: %v", err)
|
||
}
|
||
defer nc.Close()
|
||
log.Printf("Connected to NATS at %s", config.NatsURL)
|
||
subjectsList := parseSubjects(config.Subjects)
|
||
for _, subject := range subjectsList {
|
||
subj := subject
|
||
sub, err := nc.Subscribe(subj, func(msg *nats.Msg) {
|
||
producer := extractProducerFromPayload(msg.Data)
|
||
message := Message{
|
||
ID: fmt.Sprintf("%d", time.Now().UnixNano()),
|
||
Subject: msg.Subject,
|
||
Data: string(msg.Data),
|
||
Timestamp: time.Now(),
|
||
Size: len(msg.Data),
|
||
Producer: producer,
|
||
Consumer: "nats-ui",
|
||
}
|
||
store.Add(message)
|
||
hub.broadcast <- message
|
||
log.Printf("Received message on %s: %d bytes", msg.Subject, len(msg.Data))
|
||
})
|
||
if err != nil {
|
||
log.Fatalf("Failed to subscribe to %s: %v", subj, err)
|
||
}
|
||
log.Printf("Subscribed to: %s", subj)
|
||
defer func() {
|
||
if err := sub.Unsubscribe(); err != nil {
|
||
log.Printf("Failed to unsubscribe from %s: %v", subj, err)
|
||
}
|
||
}()
|
||
}
|
||
// Определяем путь к index.html
|
||
indexPath := "index.html"
|
||
if _, err := os.Stat(indexPath); os.IsNotExist(err) {
|
||
// Если не найден в текущей директории, пробуем рядом с исполняемым файлом
|
||
exePath, err := os.Executable()
|
||
if err == nil {
|
||
exeDir := filepath.Dir(exePath)
|
||
indexPath = filepath.Join(exeDir, "index.html")
|
||
}
|
||
}
|
||
|
||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.URL.Path != "/" {
|
||
http.NotFound(w, r)
|
||
return
|
||
}
|
||
http.ServeFile(w, r, indexPath)
|
||
})
|
||
http.HandleFunc("/api/messages", func(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Content-Type", "application/json")
|
||
subject := r.URL.Query().Get("subject")
|
||
var messages []Message
|
||
if subject != "" {
|
||
messages = store.GetBySubject(subject)
|
||
} else {
|
||
messages = store.GetAll()
|
||
}
|
||
if err := json.NewEncoder(w).Encode(messages); err != nil {
|
||
log.Printf("Failed to encode messages: %v", err)
|
||
}
|
||
})
|
||
http.HandleFunc("/api/subjects", func(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Content-Type", "application/json")
|
||
if err := json.NewEncoder(w).Encode(store.GetSubjects()); err != nil {
|
||
log.Printf("Failed to encode subjects: %v", err)
|
||
}
|
||
})
|
||
http.HandleFunc("/api/connections", func(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Content-Type", "application/json")
|
||
connectionsHandler(w, r, config)
|
||
})
|
||
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
|
||
conn, err := upgrader.Upgrade(w, r, nil)
|
||
if err != nil {
|
||
log.Printf("WebSocket upgrade error: %v", err)
|
||
return
|
||
}
|
||
hub.register <- conn
|
||
|
||
// Отправляем все существующие сообщения одним массивом при подключении
|
||
go func() {
|
||
messages := store.GetAll()
|
||
// Отправляем массив всех сообщений
|
||
if err := conn.WriteJSON(messages); err != nil {
|
||
log.Printf("Error sending initial messages: %v", err)
|
||
return
|
||
}
|
||
}()
|
||
|
||
// Читаем сообщения от клиента (для keep-alive)
|
||
go func() {
|
||
for {
|
||
_, _, err := conn.ReadMessage()
|
||
if err != nil {
|
||
hub.unregister <- conn
|
||
break
|
||
}
|
||
}
|
||
}()
|
||
})
|
||
log.Printf("Starting HTTP server on port %s", config.Port)
|
||
log.Printf("Open http://localhost:%s in your browser", config.Port)
|
||
addr := ":" + config.Port
|
||
log.Fatal(http.ListenAndServe(addr, nil))
|
||
}
|
||
|
||
// parseSubjects splits a comma-separated subject list into its entries.
// Whitespace around each entry is removed and empty entries are skipped, so
// "orders.*, payments.>" yields ["orders.*", "payments.>"]. When no entry
// remains (for example for "" or ",,") the catch-all subject ">" is returned.
func parseSubjects(subjectsStr string) []string {
	var result []string
	for _, part := range strings.Split(subjectsStr, ",") {
		if subject := strings.TrimSpace(part); subject != "" {
			result = append(result, subject)
		}
	}
	if len(result) == 0 {
		return []string{">"}
	}
	return result
}
|