package main import ( "encoding/json" "fmt" "log" "net/http" "os" "path/filepath" "sync" "time" "github.com/gorilla/websocket" "github.com/nats-io/nats.go" "gopkg.in/yaml.v3" ) var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } type Message struct { ID string `json:"id"` Subject string `json:"subject"` Data string `json:"data"` Timestamp time.Time `json:"timestamp"` Size int `json:"size"` } type MessageStore struct { mu sync.RWMutex messages []Message maxSize int } func NewMessageStore(maxSize int) *MessageStore { return &MessageStore{messages: make([]Message, 0), maxSize: maxSize} } func (ms *MessageStore) Add(msg Message) { ms.mu.Lock() defer ms.mu.Unlock() ms.messages = append(ms.messages, msg) if len(ms.messages) > ms.maxSize { ms.messages = ms.messages[1:] } } func (ms *MessageStore) GetAll() []Message { ms.mu.RLock() defer ms.mu.RUnlock() result := make([]Message, len(ms.messages)) copy(result, ms.messages) return result } func (ms *MessageStore) GetBySubject(subject string) []Message { ms.mu.RLock() defer ms.mu.RUnlock() var result []Message for _, msg := range ms.messages { if msg.Subject == subject { result = append(result, msg) } } return result } func (ms *MessageStore) GetSubjects() []string { ms.mu.RLock() defer ms.mu.RUnlock() subjectMap := make(map[string]bool) for _, msg := range ms.messages { subjectMap[msg.Subject] = true } subjects := make([]string, 0, len(subjectMap)) for subject := range subjectMap { subjects = append(subjects, subject) } return subjects } type Hub struct { clients map[*websocket.Conn]bool broadcast chan Message register chan *websocket.Conn unregister chan *websocket.Conn } func NewHub() *Hub { return &Hub{ clients: make(map[*websocket.Conn]bool), broadcast: make(chan Message, 256), register: make(chan *websocket.Conn), unregister: make(chan *websocket.Conn), } } func (h *Hub) Run() { for { select { case conn := <-h.register: h.clients[conn] = true case conn := <-h.unregister: if _, ok := h.clients[conn]; ok { delete(h.clients, conn) conn.Close() } case message := <-h.broadcast: for conn := range h.clients { err := conn.WriteJSON(message) if err != nil { log.Printf("WebSocket error: %v", err) delete(h.clients, conn) conn.Close() } } } } } type Config struct { NatsURL string `yaml:"nats_url"` Subjects string `yaml:"subjects"` Port string `yaml:"port"` Token string `yaml:"token"` Username string `yaml:"username"` Password string `yaml:"password"` MaxMessages int `yaml:"max_messages"` } func loadConfig(filename string) (*Config, error) { data, err := os.ReadFile(filename) if err != nil { return nil, fmt.Errorf("failed to read config file: %w", err) } var config Config if err := yaml.Unmarshal(data, &config); err != nil { return nil, fmt.Errorf("failed to parse config file: %w", err) } if config.NatsURL == "" { config.NatsURL = "nats://localhost:4222" } if config.Subjects == "" { config.Subjects = ">" } if config.Port == "" { config.Port = "8080" } if config.MaxMessages == 0 { config.MaxMessages = 1000 } return &config, nil } func main() { configFile := "config.yaml" if len(os.Args) > 1 { configFile = os.Args[1] } config, err := loadConfig(configFile) if err != nil { log.Fatalf("Failed to load config: %v", err) } log.Printf("Loaded configuration from %s", configFile) store := NewMessageStore(config.MaxMessages) hub := NewHub() go hub.Run() var opts []nats.Option if config.Token != "" { opts = append(opts, nats.Token(config.Token)) log.Printf("Using token authentication") } else if config.Username != "" || config.Password != "" { opts = append(opts, nats.UserInfo(config.Username, config.Password)) log.Printf("Using username/password authentication") } nc, err := nats.Connect(config.NatsURL, opts...) if err != nil { log.Fatalf("Failed to connect to NATS: %v", err) } defer nc.Close() log.Printf("Connected to NATS at %s", config.NatsURL) subjectsList := parseSubjects(config.Subjects) for _, subject := range subjectsList { sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { message := Message{ ID: fmt.Sprintf("%d", time.Now().UnixNano()), Subject: msg.Subject, Data: string(msg.Data), Timestamp: time.Now(), Size: len(msg.Data), } store.Add(message) hub.broadcast <- message log.Printf("Received message on %s: %d bytes", msg.Subject, len(msg.Data)) }) if err != nil { log.Fatalf("Failed to subscribe to %s: %v", subject, err) } log.Printf("Subscribed to: %s", subject) defer sub.Unsubscribe() } // Определяем путь к index.html indexPath := "index.html" if _, err := os.Stat(indexPath); os.IsNotExist(err) { // Если не найден в текущей директории, пробуем рядом с исполняемым файлом exePath, err := os.Executable() if err == nil { exeDir := filepath.Dir(exePath) indexPath = filepath.Join(exeDir, "index.html") } } http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.URL.Path != "/" { http.NotFound(w, r) return } http.ServeFile(w, r, indexPath) }) http.HandleFunc("/api/messages", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") subject := r.URL.Query().Get("subject") var messages []Message if subject != "" { messages = store.GetBySubject(subject) } else { messages = store.GetAll() } json.NewEncoder(w).Encode(messages) }) http.HandleFunc("/api/subjects", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(store.GetSubjects()) }) http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket upgrade error: %v", err) return } hub.register <- conn // Отправляем все существующие сообщения одним массивом при подключении go func() { messages := store.GetAll() // Отправляем массив всех сообщений if err := conn.WriteJSON(messages); err != nil { log.Printf("Error sending initial messages: %v", err) return } }() // Читаем сообщения от клиента (для keep-alive) go func() { for { _, _, err := conn.ReadMessage() if err != nil { hub.unregister <- conn break } } }() }) log.Printf("Starting HTTP server on port %s", config.Port) log.Printf("Open http://localhost:%s in your browser", config.Port) addr := ":" + config.Port log.Fatal(http.ListenAndServe(addr, nil)) } func parseSubjects(subjectsStr string) []string { if subjectsStr == ">" { return []string{">"} } var result []string subjects := []rune(subjectsStr) current := "" for i, r := range subjects { if r == ',' { if current != "" { result = append(result, current) current = "" } } else { current += string(r) } if i == len(subjects)-1 && current != "" { result = append(result, current) } } if len(result) == 0 { result = []string{">"} } return result }