package api

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"go-whisper-api/whisper"
)

// On-disk layout used by DiskCache: <root>/<phase>/<task id>/<file>.
const (
	cacheWaiting = "waiting" // phase directory tasks are created in by Enqueue
	cacheReady   = "ready"   // phase directory tasks are moved to by PromoteToReady

	fileParams    = "params.conf" // JSON-encoded TaskParams
	fileAudio     = "audio.wav"   // audio file moved into the task directory by Enqueue
	fileAudioJSON = "audio.json"  // JSON-encoded AudioJSON written by FinishWaiting
)

// TaskParams is the per-task record persisted as JSON in params.conf.
//
// Created is parsed by NextWaiting with the layout "2006-01-02 15:04:05" in
// the local time zone; Processed is written by FinishWaiting in the same
// layout. Status holds the string form of a taskStatus value.
type TaskParams struct {
	ID          string         `json:"id"`
	Created     string         `json:"created"`
	Processed   string         `json:"processed,omitempty"`
	Status      string         `json:"status"`
	Model       string         `json:"model"`
	Language    string         `json:"language,omitempty"`
	Punctuation bool           `json:"punctuation,omitempty"`
	Speakers    bool           `json:"speakers,omitempty"`
	NumClusters int            `json:"num_clusters,omitempty"`
	Text        string         `json:"text,omitempty"`
	Words       []whisper.Word `json:"words,omitempty"`
	Error       string         `json:"error,omitempty"`
}

// AudioJSON is the content of audio.json. FinishWaiting sets Buckets to
// len(Waveform).
type AudioJSON struct {
	Waveform []float64 `json:"waveform"`
	Buckets  int       `json:"buckets"`
}

// DiskCache stores tasks as directories below a root directory, split into a
// "waiting" and a "ready" phase.
type DiskCache struct {
	root string
}

// Root returns the absolute, cleaned root directory of the cache.
func (c *DiskCache) Root() string {
	return c.root
}

// resolveCacheRoot turns root into an absolute, cleaned path. An empty root
// defaults to "./cache".
func resolveCacheRoot(root string) (string, error) {
	if root == "" {
		root = "./cache"
	}
	abs, err := filepath.Abs(root)
	if err != nil {
		return "", fmt.Errorf("cache dir: %w", err)
	}
	return filepath.Clean(abs), nil
}

// NewDiskCache resolves root (see resolveCacheRoot) and creates the "waiting"
// and "ready" phase directories below it if they do not exist yet.
func NewDiskCache(root string) (*DiskCache, error) {
	abs, err := resolveCacheRoot(root)
	if err != nil {
		return nil, err
	}
	c := &DiskCache{root: abs}
	for _, sub := range []string{cacheWaiting, cacheReady} {
		if err := os.MkdirAll(filepath.Join(abs, sub), 0o755); err != nil {
			return nil, err
		}
	}
	return c, nil
}

// waitingDir returns the directory of task id in the waiting phase. It does
// not validate id; callers that take id from outside must check it first.
func (c *DiskCache) waitingDir(id string) string {
	return filepath.Join(c.root, cacheWaiting, id)
}

// readyDir returns the directory of task id in the ready phase. It does not
// validate id; callers that take id from outside must check it first.
func (c *DiskCache) readyDir(id string) string {
	return filepath.Join(c.root, cacheReady, id)
}

// locate finds the directory of task id, checking the ready phase before the
// waiting phase. It returns the directory, the phase name and true on
// success; ok is false when id is invalid or no such directory exists.
func (c *DiskCache) locate(id string) (dir, phase string, ok bool) {
	// Reject empty IDs and IDs with path components: "" would resolve to the
	// phase directory itself and ".." to the cache root.
	if !isValidTaskID(id) {
		return "", "", false
	}
	ready := c.readyDir(id)
	if st, err := os.Stat(ready); err == nil && st.IsDir() {
		return ready, cacheReady, true
	}
	waiting := c.waitingDir(id)
	if st, err := os.Stat(waiting); err == nil && st.IsDir() {
		return waiting, cacheWaiting, true
	}
	return "", "", false
}

// Enqueue creates the waiting directory for id, stores params (with ID forced
// to id) and moves the file at audioWavPath into it as audio.wav.
//
// The audio is renamed when possible; if the rename fails it is copied and
// the source is then removed on a best-effort basis. When storing the params
// or copying the audio fails, the task directory is removed again.
func (c *DiskCache) Enqueue(id string, params TaskParams, audioWavPath string) error {
	if !isValidTaskID(id) {
		return fmt.Errorf("invalid task id %q", id)
	}
	dir := c.waitingDir(id)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	params.ID = id
	if err := c.writeParams(dir, params); err != nil {
		_ = os.RemoveAll(dir) // best-effort cleanup; the write error is what matters
		return err
	}
	dst := filepath.Join(dir, fileAudio)
	if err := os.Rename(audioWavPath, dst); err != nil {
		// Rename can fail (for example across file systems); fall back to a copy.
		if err2 := copyFile(audioWavPath, dst); err2 != nil {
			_ = os.RemoveAll(dir) // best-effort cleanup; the copy error is what matters
			return fmt.Errorf("move audio to %s: %w", dst, err2)
		}
		_ = os.Remove(audioWavPath) // best-effort: the copy already succeeded
	}
	return nil
}

// writeParams serializes params as JSON into params.conf inside dir.
func (c *DiskCache) writeParams(dir string, params TaskParams) error {
	data, err := json.Marshal(params)
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, fileParams), data, 0o644)
}

// LoadParams returns the stored params of task id together with the phase
// ("ready" or "waiting") the task was found in.
func (c *DiskCache) LoadParams(id string) (TaskParams, string, error) {
	dir, phase, ok := c.locate(id)
	if !ok {
		return TaskParams{}, "", fmt.Errorf("task not found")
	}
	params, err := c.readParams(dir)
	if err != nil {
		return TaskParams{}, "", err
	}
	return params, phase, nil
}

// readParams reads and decodes params.conf from dir.
func (c *DiskCache) readParams(dir string) (TaskParams, error) {
	path := filepath.Join(dir, fileParams)
	data, err := os.ReadFile(path)
	if err != nil {
		return TaskParams{}, fmt.Errorf("read %s: %w", path, err)
	}
	var params TaskParams
	if err := json.Unmarshal(data, &params); err != nil {
		return TaskParams{}, err
	}
	return params, nil
}

// List returns, keyed by task ID, the "created" and "status" values of every
// task whose params can be read. Tasks with unreadable params are skipped.
// The waiting phase is scanned before the ready phase, so for an ID present
// in both the ready entry wins.
func (c *DiskCache) List() (map[string]map[string]string, error) {
	out := make(map[string]map[string]string)
	for _, phase := range []string{cacheWaiting, cacheReady} {
		base := filepath.Join(c.root, phase)
		entries, err := os.ReadDir(base)
		if err != nil {
			if os.IsNotExist(err) {
				continue
			}
			return nil, err
		}
		for _, e := range entries {
			if !e.IsDir() {
				continue
			}
			id := e.Name()
			params, err := c.readParams(filepath.Join(base, id))
			if err != nil {
				continue
			}
			out[id] = map[string]string{
				"created": params.Created,
				"status":  params.Status,
			}
		}
	}
	return out, nil
}

// NextWaiting returns the ID of the waiting task with the earliest Created
// timestamp whose status is statusWaiting. The boolean is false when there is
// no such task (including when the waiting directory does not exist).
//
// A Created value that cannot be parsed yields the zero time and therefore
// sorts first. Tasks with equal timestamps keep directory-listing order.
func (c *DiskCache) NextWaiting() (string, bool, error) {
	base := filepath.Join(c.root, cacheWaiting)
	entries, err := os.ReadDir(base)
	if err != nil {
		if os.IsNotExist(err) {
			return "", false, nil
		}
		return "", false, err
	}

	type item struct {
		id      string
		created time.Time
	}
	var pending []item
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		params, err := c.readParams(filepath.Join(base, e.Name()))
		if err != nil || params.Status != string(statusWaiting) {
			continue
		}
		// Parse error deliberately ignored: t is then the zero time.
		t, _ := time.ParseInLocation("2006-01-02 15:04:05", params.Created, time.Local)
		pending = append(pending, item{id: e.Name(), created: t})
	}
	if len(pending) == 0 {
		return "", false, nil
	}
	// Stable sort: timestamps only have second resolution, so ties are common
	// and must resolve deterministically.
	sort.SliceStable(pending, func(i, j int) bool {
		return pending[i].created.Before(pending[j].created)
	})
	return pending[0].id, true, nil
}

// SetStatus rewrites the params of waiting task id with Status set to status.
// If mutate is non-nil it is called on the params after Status has been set
// and before they are written, so it may adjust any field.
func (c *DiskCache) SetStatus(id string, status taskStatus, mutate func(*TaskParams)) error {
	if !isValidTaskID(id) {
		return fmt.Errorf("invalid task id %q", id)
	}
	dir := c.waitingDir(id)
	if _, err := os.Stat(dir); err != nil {
		return fmt.Errorf("task %s not in waiting", id)
	}
	params, err := c.readParams(dir)
	if err != nil {
		return err
	}
	params.Status = string(status)
	if mutate != nil {
		mutate(&params)
	}
	return c.writeParams(dir, params)
}

// FinishWaiting records the outcome of waiting task id in its params.
//
// Processed is set to the current local time. A non-empty errMsg marks the
// task as statusError and stores the message; otherwise the task becomes
// statusReady and result.Text / result.Words are stored. A non-empty waveform
// is additionally written to audio.json. The task stays in the waiting
// directory; moving it is done by PromoteToReady.
func (c *DiskCache) FinishWaiting(id string, result whisper.TranscriptResult, errMsg string, waveform []float64) error {
	if !isValidTaskID(id) {
		return fmt.Errorf("invalid task id %q", id)
	}
	dir := c.waitingDir(id)
	params, err := c.readParams(dir)
	if err != nil {
		return err
	}
	params.Processed = time.Now().Format("2006-01-02 15:04:05")
	if errMsg != "" {
		params.Status = string(statusError)
		params.Error = errMsg
	} else {
		params.Status = string(statusReady)
		params.Text = result.Text
		params.Words = result.Words
	}
	if err := c.writeParams(dir, params); err != nil {
		return fmt.Errorf("update %s: %w", filepath.Join(dir, fileParams), err)
	}
	if len(waveform) == 0 {
		return nil
	}
	data, err := json.Marshal(AudioJSON{Waveform: waveform, Buckets: len(waveform)})
	if err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, fileAudioJSON), data, 0o644)
}

// PromoteToReady moves task id from the waiting phase to the ready phase.
// It is a no-op when the task is already in the ready phase. If something
// other than a task directory already occupies the destination path, the
// waiting copy is removed instead of being moved.
func (c *DiskCache) PromoteToReady(id string) error {
	// Without this check an empty id would make src and dst the phase
	// directories themselves and the RemoveAll below would wipe every
	// waiting task; ".." would target the cache root.
	if !isValidTaskID(id) {
		return fmt.Errorf("invalid task id %q", id)
	}
	if _, phase, ok := c.locate(id); ok && phase == cacheReady {
		return nil
	}
	src := c.waitingDir(id)
	dst := c.readyDir(id)
	if _, err := os.Stat(src); err != nil {
		return fmt.Errorf("task %s not in waiting", id)
	}
	if _, err := os.Stat(dst); err == nil {
		return os.RemoveAll(src)
	}
	return os.Rename(src, dst)
}

// Delete removes the directory of task id from whichever phase it is in. It
// reports whether the task was found; removal itself is best-effort and its
// error is not reported.
func (c *DiskCache) Delete(id string) bool {
	dir, _, ok := c.locate(id)
	if !ok {
		return false
	}
	_ = os.RemoveAll(dir) // best-effort by design; the result only signals "found"
	return true
}

// AudioPath returns the path of the task's audio.wav and true when both the
// task and the file exist.
func (c *DiskCache) AudioPath(id string) (string, bool) {
	dir, _, ok := c.locate(id)
	if !ok {
		return "", false
	}
	p := filepath.Join(dir, fileAudio)
	if _, err := os.Stat(p); err != nil {
		return "", false
	}
	return p, true
}

// Waveform returns the waveform of task id. The cached audio.json is used
// when it can be read, decodes cleanly and holds a non-empty waveform;
// otherwise the waveform is computed from audio.wav via waveformFromWav.
func (c *DiskCache) Waveform(id string) ([]float64, error) {
	dir, _, ok := c.locate(id)
	if !ok {
		return nil, fmt.Errorf("task not found")
	}
	if data, err := os.ReadFile(filepath.Join(dir, fileAudioJSON)); err == nil {
		var aj AudioJSON
		if json.Unmarshal(data, &aj) == nil && len(aj.Waveform) > 0 {
			return aj.Waveform, nil
		}
	}
	// NOTE(review): 512 is presumably the number of waveform buckets —
	// confirm against waveformFromWav.
	return waveformFromWav(filepath.Join(dir, fileAudio), 512)
}

// RecoverInterrupted repairs the waiting phase, typically after a restart:
// tasks left in statusProcessing are reset to statusWaiting, and tasks that
// already reached statusReady or statusError are promoted to the ready phase.
// Tasks with unreadable params are skipped and per-task failures are ignored;
// only a failure to list the waiting directory is returned.
func (c *DiskCache) RecoverInterrupted() error {
	base := filepath.Join(c.root, cacheWaiting)
	entries, err := os.ReadDir(base)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return err
	}
	for _, e := range entries {
		if !e.IsDir() {
			continue
		}
		id := e.Name()
		dir := filepath.Join(base, id)
		params, err := c.readParams(dir)
		if err != nil {
			continue
		}
		switch params.Status {
		case string(statusProcessing):
			params.Status = string(statusWaiting)
			_ = c.writeParams(dir, params) // best-effort: one bad task must not block recovery
		case string(statusReady), string(statusError):
			_ = c.PromoteToReady(id) // best-effort, same reasoning as above
		}
	}
	return nil
}

// copyFile copies the content of src into dst, creating or truncating dst.
// An error from closing dst is reported, because buffered data may only fail
// to reach the disk at that point.
func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, in); err != nil {
		_ = out.Close() // the copy error takes precedence
		return err
	}
	return out.Close()
}

// isValidTaskID reports whether id is safe to use as a single directory name:
// non-empty, without "..", and without '/' or '\'.
func isValidTaskID(id string) bool {
	return id != "" && !strings.Contains(id, "..") && !strings.ContainsAny(id, `/\`)
}