nats-ui/main.go
2026-02-06 15:50:12 +07:00

311 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/nats-io/nats.go"
	"gopkg.in/yaml.v3"
)
// upgrader turns incoming HTTP requests into WebSocket connections.
//
// Its CheckOrigin hook always returns true, so the Origin header is never
// validated and handshakes are accepted from every origin.
// NOTE(review): confirm this is intended if the server is reachable from
// anything other than a trusted network.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// Message is the JSON shape of one captured NATS message, as kept in the
// message store and delivered to HTTP and WebSocket clients.
type Message struct {
	// ID is the identifier assigned to the message when it is received.
	ID string `json:"id"`

	// Subject is the NATS subject the message arrived on.
	Subject string `json:"subject"`

	// Data is the message payload converted to a string.
	Data string `json:"data"`

	// Timestamp is the time at which the message was received.
	Timestamp time.Time `json:"timestamp"`

	// Size is the payload length in bytes.
	Size int `json:"size"`
}
// MessageStore is an in-memory, size-limited list of messages that is safe for
// concurrent use: every method takes mu before touching messages.
type MessageStore struct {
	mu       sync.RWMutex // guards messages
	messages []Message    // retained messages, oldest first
	maxSize  int          // limit enforced by Add after each append
}
// NewMessageStore returns an empty store that uses maxSize as the retention
// limit applied by Add.
func NewMessageStore(maxSize int) *MessageStore {
	store := &MessageStore{maxSize: maxSize}
	store.messages = []Message{}
	return store
}
// Add appends msg to the store while holding the write lock. If the store then
// holds more than maxSize messages, the oldest one (index 0) is dropped.
func (ms *MessageStore) Add(msg Message) {
	ms.mu.Lock()
	defer ms.mu.Unlock()

	grown := append(ms.messages, msg)
	if len(grown) > ms.maxSize {
		// Only one message is added per call, so dropping one restores the limit.
		grown = grown[1:]
	}
	ms.messages = grown
}
// GetAll returns a copy of every stored message, oldest first. The returned
// slice is never nil and does not share storage with the store, so callers may
// use it after the read lock is released.
func (ms *MessageStore) GetAll() []Message {
	ms.mu.RLock()
	defer ms.mu.RUnlock()

	snapshot := make([]Message, 0, len(ms.messages))
	return append(snapshot, ms.messages...)
}
// GetBySubject returns copies of the stored messages whose Subject equals
// subject exactly (no wildcard matching), in stored order.
//
// The result is always non-nil: when nothing matches, an empty slice is
// returned so that JSON encoding yields [] rather than null, matching what
// GetAll returns for an empty store.
func (ms *MessageStore) GetBySubject(subject string) []Message {
	ms.mu.RLock()
	defer ms.mu.RUnlock()

	// encoding/json renders a nil slice as null, so start from an empty one.
	result := []Message{}
	for _, msg := range ms.messages {
		if msg.Subject == subject {
			result = append(result, msg)
		}
	}
	return result
}
// GetSubjects returns the distinct subjects of the stored messages.
//
// Subjects are listed in the order they first appear in the store, so repeated
// calls on an unchanged store return the same sequence (ranging over a map
// would give a different order on each call). The result is never nil.
func (ms *MessageStore) GetSubjects() []string {
	ms.mu.RLock()
	defer ms.mu.RUnlock()

	seen := make(map[string]struct{})
	subjects := make([]string, 0)
	for _, msg := range ms.messages {
		if _, dup := seen[msg.Subject]; dup {
			continue
		}
		seen[msg.Subject] = struct{}{}
		subjects = append(subjects, msg.Subject)
	}
	return subjects
}
// Hub keeps track of connected WebSocket clients and delivers messages to
// them. The clients map is only touched by Run; other goroutines communicate
// with the hub by sending on its channels.
type Hub struct {
	// clients is the set of currently registered connections.
	clients map[*websocket.Conn]bool

	// broadcast carries messages that Run writes to every registered client.
	broadcast chan Message

	// register carries connections that Run should start delivering to.
	register chan *websocket.Conn

	// unregister carries connections that Run should remove and close.
	unregister chan *websocket.Conn
}
// NewHub returns a Hub with no clients. The broadcast channel is buffered for
// 256 messages; register and unregister are unbuffered. Run must be started
// separately to service the channels.
func NewHub() *Hub {
	hub := new(Hub)
	hub.clients = map[*websocket.Conn]bool{}
	hub.broadcast = make(chan Message, 256)
	hub.register = make(chan *websocket.Conn)
	hub.unregister = make(chan *websocket.Conn)
	return hub
}
// Run is the hub's event loop; it never returns and is meant to run in its own
// goroutine. It handles three kinds of events:
//
//   - register: the connection is added to the client set;
//   - unregister: the connection, if still registered, is removed and closed;
//   - broadcast: the message is written as JSON to every registered client, and
//     any client whose write fails is removed and closed.
//
// Each broadcast write is bounded by a deadline so that one stalled client
// cannot block the loop (and with it registration and all other clients)
// indefinitely.
func (h *Hub) Run() {
	// writeWait is the maximum time allowed for a single write to one client.
	const writeWait = 10 * time.Second

	// drop removes conn from the client set and closes it.
	drop := func(conn *websocket.Conn) {
		delete(h.clients, conn)
		if err := conn.Close(); err != nil {
			log.Printf("WebSocket close error: %v", err)
		}
	}

	for {
		select {
		case conn := <-h.register:
			h.clients[conn] = true

		case conn := <-h.unregister:
			// The connection may already have been dropped after a failed
			// write; only close connections that are still registered.
			if _, ok := h.clients[conn]; ok {
				drop(conn)
			}

		case message := <-h.broadcast:
			// Deleting entries while ranging over a map is safe in Go.
			for conn := range h.clients {
				if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
					log.Printf("WebSocket error: %v", err)
					drop(conn)
					continue
				}
				if err := conn.WriteJSON(message); err != nil {
					log.Printf("WebSocket error: %v", err)
					drop(conn)
				}
			}
		}
	}
}
// Config holds the settings read from the YAML configuration file.
type Config struct {
	// NatsURL is the address of the NATS server to connect to.
	NatsURL string `yaml:"nats_url"`

	// Subjects is a comma-separated list of subjects to subscribe to.
	Subjects string `yaml:"subjects"`

	// Port is the TCP port the HTTP server listens on.
	Port string `yaml:"port"`

	// Token, when non-empty, selects token authentication.
	Token string `yaml:"token"`

	// Username and Password are used for user/password authentication when
	// no token is configured.
	Username string `yaml:"username"`
	Password string `yaml:"password"`

	// MaxMessages is the retention limit passed to the message store.
	MaxMessages int `yaml:"max_messages"`
}
// loadConfig reads the YAML file at filename into a Config and fills in
// defaults for settings that are missing:
//
//   - NatsURL:     "nats://localhost:4222"
//   - Subjects:    ">"
//   - Port:        "8080"
//   - MaxMessages: 1000 (also applied to negative values, which would
//     otherwise make the message store discard every message)
//
// It returns an error if the file cannot be read or is not valid YAML.
func loadConfig(filename string) (*Config, error) {
	data, err := os.ReadFile(filename)
	if err != nil {
		return nil, fmt.Errorf("failed to read config file: %w", err)
	}

	var config Config
	if err := yaml.Unmarshal(data, &config); err != nil {
		return nil, fmt.Errorf("failed to parse config file: %w", err)
	}

	if config.NatsURL == "" {
		config.NatsURL = "nats://localhost:4222"
	}
	if config.Subjects == "" {
		config.Subjects = ">"
	}
	if config.Port == "" {
		config.Port = "8080"
	}
	if config.MaxMessages <= 0 {
		config.MaxMessages = 1000
	}
	return &config, nil
}
// main loads the configuration, connects to NATS, subscribes to the configured
// subjects and serves the web UI, the JSON API and the WebSocket feed.
//
// The configuration file defaults to "config.yaml"; a different path can be
// passed as the first command-line argument.
//
// HTTP endpoints:
//
//   - /              serves index.html
//   - /api/messages  stored messages as JSON, optionally filtered by ?subject=
//   - /api/subjects  distinct subjects of the stored messages as JSON
//   - /ws            WebSocket: one JSON array with the stored messages,
//     followed by one JSON object per newly received message
func main() {
	configFile := "config.yaml"
	if len(os.Args) > 1 {
		configFile = os.Args[1]
	}
	config, err := loadConfig(configFile)
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}
	log.Printf("Loaded configuration from %s", configFile)

	store := NewMessageStore(config.MaxMessages)
	hub := NewHub()
	go hub.Run()

	// Token authentication takes precedence over username/password.
	var opts []nats.Option
	if config.Token != "" {
		opts = append(opts, nats.Token(config.Token))
		log.Printf("Using token authentication")
	} else if config.Username != "" || config.Password != "" {
		opts = append(opts, nats.UserInfo(config.Username, config.Password))
		log.Printf("Using username/password authentication")
	}

	nc, err := nats.Connect(config.NatsURL, opts...)
	if err != nil {
		log.Fatalf("Failed to connect to NATS: %v", err)
	}
	defer nc.Close()
	log.Printf("Connected to NATS at %s", config.NatsURL)

	subjectsList := parseSubjects(config.Subjects)
	for _, subject := range subjectsList {
		subj := subject // per-iteration copy captured by the deferred closure below
		sub, err := nc.Subscribe(subj, func(msg *nats.Msg) {
			// Take the clock once so the ID and the timestamp agree.
			now := time.Now()
			message := Message{
				ID:        fmt.Sprintf("%d", now.UnixNano()),
				Subject:   msg.Subject,
				Data:      string(msg.Data),
				Timestamp: now,
				Size:      len(msg.Data),
			}
			store.Add(message)
			hub.broadcast <- message
			log.Printf("Received message on %s: %d bytes", msg.Subject, len(msg.Data))
		})
		if err != nil {
			log.Fatalf("Failed to subscribe to %s: %v", subj, err)
		}
		log.Printf("Subscribed to: %s", subj)
		defer func() {
			if err := sub.Unsubscribe(); err != nil {
				log.Printf("Failed to unsubscribe from %s: %v", subj, err)
			}
		}()
	}

	// Locate index.html: prefer the current directory and fall back to the
	// directory that contains the executable.
	indexPath := "index.html"
	if _, err := os.Stat(indexPath); os.IsNotExist(err) {
		exePath, err := os.Executable()
		if err == nil {
			exeDir := filepath.Dir(exePath)
			indexPath = filepath.Join(exeDir, "index.html")
		}
	}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// The "/" pattern matches every path; only the root serves the page.
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}
		http.ServeFile(w, r, indexPath)
	})

	http.HandleFunc("/api/messages", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		subject := r.URL.Query().Get("subject")
		var messages []Message
		if subject != "" {
			messages = store.GetBySubject(subject)
		} else {
			messages = store.GetAll()
		}
		if err := json.NewEncoder(w).Encode(messages); err != nil {
			log.Printf("Failed to encode messages: %v", err)
		}
	})

	http.HandleFunc("/api/subjects", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(store.GetSubjects()); err != nil {
			log.Printf("Failed to encode subjects: %v", err)
		}
	})

	http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Printf("WebSocket upgrade error: %v", err)
			return
		}

		// Send all existing messages as a single array BEFORE registering with
		// the hub. A gorilla/websocket connection supports only one concurrent
		// writer; once registered, the hub goroutine writes broadcasts to this
		// connection, so writing the snapshot afterwards from another goroutine
		// would race with it. Messages that arrive between the snapshot and the
		// registration are not delivered to this client over the socket.
		if err := conn.WriteJSON(store.GetAll()); err != nil {
			log.Printf("Error sending initial messages: %v", err)
			if closeErr := conn.Close(); closeErr != nil {
				log.Printf("WebSocket close error: %v", closeErr)
			}
			return
		}
		hub.register <- conn

		// Read from the client so that a closed or failed connection is
		// detected; incoming payloads are discarded.
		go func() {
			for {
				if _, _, err := conn.ReadMessage(); err != nil {
					hub.unregister <- conn
					return
				}
			}
		}()
	})

	log.Printf("Starting HTTP server on port %s", config.Port)
	log.Printf("Open http://localhost:%s in your browser", config.Port)
	addr := ":" + config.Port
	// NOTE: log.Fatal exits the process via os.Exit, so the deferred
	// Unsubscribe and Close calls above do not run on this path.
	log.Fatal(http.ListenAndServe(addr, nil))
}
// parseSubjects splits a comma-separated list of NATS subjects into its
// entries. Whitespace around each entry is trimmed and empty entries are
// skipped, so "a, b" and "a,,b" both yield ["a", "b"]. If no subject remains
// (for example for "" or ","), the catch-all wildcard ">" is returned.
func parseSubjects(subjectsStr string) []string {
	parts := strings.Split(subjectsStr, ",")
	result := make([]string, 0, len(parts))
	for _, part := range parts {
		if subject := strings.TrimSpace(part); subject != "" {
			result = append(result, subject)
		}
	}
	if len(result) == 0 {
		return []string{">"}
	}
	return result
}