go-whisper-api/api/queue_worker.go
admin b5c083e06f
Some checks failed
CodeQL / Analyze (go) (push) Successful in 6m28s
Docker Image / build-docker (push) Failing after 13m26s
Lint and Testing / lint (push) Successful in 11m17s
Lint and Testing / test (push) Successful in 11m17s
Lint and Testing / golangci (push) Successful in 2m40s
first commit
2026-06-04 18:10:52 +07:00

99 lines
2.6 KiB
Go

package api
import (
"context"
"time"
"go-whisper-api/whisper"
"github.com/rs/zerolog/log"
)
func (s *Server) StartWorker(ctx context.Context) {
go s.queueWorker(ctx)
}
func (s *Server) queueWorker(ctx context.Context) {
const idlePoll = 2 * time.Second
for {
if ctx.Err() != nil {
return
}
id, ok, err := s.cache.NextWaiting()
if err != nil {
log.Error().Err(err).Msg("cache queue scan")
if !sleepOrWake(ctx, s.queueWake, time.Second) {
return
}
continue
}
if ok {
s.processCacheTask(ctx, id)
continue
}
if !sleepOrWake(ctx, s.queueWake, idlePoll) {
return
}
}
}
func sleepOrWake(ctx context.Context, wake <-chan struct{}, d time.Duration) bool {
timer := time.NewTimer(d)
defer timer.Stop()
select {
case <-ctx.Done():
return false
case <-wake:
return true
case <-timer.C:
return true
}
}
func (s *Server) processCacheTask(ctx context.Context, id string) {
params, _, err := s.cache.LoadParams(id)
if err != nil {
return
}
modelPath, err := s.models.Path(params.Model)
if err != nil {
s.completeCacheTask(id, whisper.TranscriptResult{}, err.Error())
return
}
if err := s.cache.SetStatus(id, statusProcessing, nil); err != nil {
log.Error().Err(err).Str("task", id).Msg("set processing")
return
}
audioPath, ok := s.cache.AudioPath(id)
if !ok {
s.completeCacheTask(id, whisper.TranscriptResult{}, "audio file missing")
return
}
stt := sttOptions{
language: params.Language,
punctuate: params.Punctuation,
speakers: params.Speakers,
numClusters: params.NumClusters,
}
if stt.language == "" {
stt.language = s.cfg.Language
}
result, err := s.transcribe(ctx, modelPath, audioPath, stt)
if err != nil {
s.completeCacheTask(id, result, err.Error())
log.Error().Err(err).Str("task", id).Msg("async transcribe")
return
}
s.completeCacheTask(id, result, "")
}
func (s *Server) completeCacheTask(id string, result whisper.TranscriptResult, errMsg string) {
if err := s.cache.FinishWaiting(id, result, errMsg, nil); err != nil {
log.Error().Err(err).Str("task", id).Str("cache", s.cache.Root()).Msg("finish task")
return
}
if err := s.cache.PromoteToReady(id); err != nil {
log.Error().Err(err).Str("task", id).Str("cache", s.cache.Root()).Msg("promote to ready")
}
}