Optimize backend search and evaluation pipeline
build-push / docker (push) Has been cancelled

This commit is contained in:
AI Assistant
2026-03-16 16:13:43 +09:00
parent c5f6c611ec
commit 60fdd7842c
7 changed files with 371 additions and 10 deletions
+31 -2
View File
@@ -444,9 +444,25 @@ func (a *App) searchMedia(c *gin.Context) {
if extraErr == nil && len(extraResults) > 0 {
results = mergeSearchResults(results, extraResults)
scored = services.RankSearchResults(strings.Join(explorationQueries[:min(len(explorationQueries), 3)], " "), results)
recommended, geminiStats, geminiErr = services.EvaluateAllCandidatesWithGeminiWithDeadline(a.GeminiService, req.Query, scored, deadline.Add(-3*time.Second))
a.debug("search supplemental query variants", gin.H{"variants": explorationQueries, "variantCount": len(explorationQueries)})
a.debug("search gemini evaluation after supplemental search", geminiStats)
reviewedLinks := services.ReviewedRecommendationLinks(recommended)
supplementalCandidates := services.SelectUnevaluatedCandidates(scored, reviewedLinks, services.RemainingGeminiCapacity(recommended))
if len(supplementalCandidates) > 0 {
extraRecommended, extraStats, extraGeminiErr := services.EvaluateAllCandidatesWithGeminiWithDeadline(
a.GeminiService,
req.Query,
supplementalCandidates,
deadline.Add(-3*time.Second),
)
recommended = services.MergeUniqueRecommendations(recommended, extraRecommended)
geminiStats = services.MergeGeminiBatchStats(geminiStats, extraStats)
geminiStats.RecommendedCount = len(services.ReviewedRecommendationLinks(recommended))
geminiErr = combineSearchWarnings(geminiErr, extraGeminiErr)
a.debug("search gemini evaluation after supplemental search", gin.H{
"stats": geminiStats,
"supplementalCount": len(supplementalCandidates),
})
}
}
}
if geminiErr != nil && len(recommended) == 0 {
@@ -595,6 +611,19 @@ func mergeSearchResults(base, extra []services.SearchResult) []services.SearchRe
return merged
}
func combineSearchWarnings(base, extra error) error {
switch {
case base == nil:
return extra
case extra == nil:
return base
case base.Error() == extra.Error():
return base
default:
return fmt.Errorf("%s; %s", base.Error(), extra.Error())
}
}
func summarizeSearchResults(results []services.SearchResult, duration time.Duration, geminiCap int, warning string) searchDebugSummary {
bySource := map[string]int{}
withPreview := 0
+129 -2
View File
@@ -33,6 +33,19 @@ type SearchService struct {
Client *http.Client
collectors []searchCollector
Debug func(message string, data any)
cacheMu sync.Mutex
searchCache map[string]cachedSearchResults
fetchCache map[string]cachedFetchResult
}
// cachedSearchResults is a TTL cache entry holding the search results for one
// query signature. Entries are considered live until expiresAt; the get/set
// helpers store and return defensive clones of items.
type cachedSearchResults struct {
	items     []SearchResult
	expiresAt time.Time
}

// cachedFetchResult is a TTL cache entry holding one raw fetched response
// body (HTML or JSON text), live until expiresAt.
type cachedFetchResult struct {
	body      string
	expiresAt time.Time
}
func NewSearchService(baseURL, googleVideoEngine, webEngine string) *SearchService {
@@ -52,6 +65,8 @@ func NewSearchService(baseURL, googleVideoEngine, webEngine string) *SearchServi
artgridCollector{},
googleVideoCollector{},
},
searchCache: map[string]cachedSearchResults{},
fetchCache: map[string]cachedFetchResult{},
}
}
@@ -102,6 +117,7 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
}
searchQueries := collector.BuildQueries(base)
shuffleStrings(searchQueries)
searchQueries = limitCollectorQueries(collector.Name(), searchQueries, onlyMissing)
s.debug("search_service:collector_queries", map[string]any{
"collector": collector.Name(),
"base": base,
@@ -392,6 +408,24 @@ func (s *SearchService) enrichArtgrid(result SearchResult) SearchResult {
}
func (s *SearchService) search(query, categories, engine, source string) ([]SearchResult, error) {
cacheKey := strings.Join([]string{
s.BaseURL,
query,
categories,
engine,
source,
}, "\n")
if cached, ok := s.getCachedSearchResults(cacheKey); ok {
s.debug("search_service:searx_cache_hit", map[string]any{
"query": query,
"categories": categories,
"engine": engine,
"source": source,
"count": len(cached),
})
return cached, nil
}
values := url.Values{}
values.Set("q", query)
values.Set("format", "json")
@@ -458,9 +492,60 @@ func (s *SearchService) search(query, categories, engine, source string) ([]Sear
Source: normalizeSource(source, link, item.Engine),
})
}
s.setCachedSearchResults(cacheKey, results, 2*time.Minute)
return results, nil
}
// getCachedSearchResults returns a copy of the cached results for key when a
// live (unexpired) entry exists. Expired entries are evicted on access.
func (s *SearchService) getCachedSearchResults(key string) ([]SearchResult, bool) {
	s.cacheMu.Lock()
	defer s.cacheMu.Unlock()

	cached, found := s.searchCache[key]
	switch {
	case !found:
		return nil, false
	case time.Now().After(cached.expiresAt):
		// Lazy eviction: drop stale entries here instead of running a sweeper.
		delete(s.searchCache, key)
		return nil, false
	default:
		// Hand back a clone so callers cannot mutate the cached slice.
		return cloneSearchResults(cached.items), true
	}
}
// setCachedSearchResults stores a defensive copy of items under key, valid
// for the given time-to-live.
func (s *SearchService) setCachedSearchResults(key string, items []SearchResult, ttl time.Duration) {
	entry := cachedSearchResults{
		items:     cloneSearchResults(items),
		expiresAt: time.Now().Add(ttl),
	}

	s.cacheMu.Lock()
	s.searchCache[key] = entry
	s.cacheMu.Unlock()
}
// getCachedFetchResult returns the cached response body for key when a live
// (unexpired) entry exists. Expired entries are evicted on access.
func (s *SearchService) getCachedFetchResult(key string) (string, bool) {
	s.cacheMu.Lock()
	defer s.cacheMu.Unlock()

	cached, found := s.fetchCache[key]
	if !found {
		return "", false
	}
	if !time.Now().After(cached.expiresAt) {
		return cached.body, true
	}
	// Stale: remove it so the map does not accumulate dead entries.
	delete(s.fetchCache, key)
	return "", false
}
// setCachedFetchResult stores body under key, valid for the given
// time-to-live.
func (s *SearchService) setCachedFetchResult(key, body string, ttl time.Duration) {
	entry := cachedFetchResult{
		body:      body,
		expiresAt: time.Now().Add(ttl),
	}

	s.cacheMu.Lock()
	s.fetchCache[key] = entry
	s.cacheMu.Unlock()
}
func (s *SearchService) debug(message string, data any) {
if s != nil && s.Debug != nil {
s.Debug(message, data)
@@ -750,6 +835,12 @@ func pickVideoURL(urls []string) string {
}
func (s *SearchService) fetchText(target string) (string, error) {
cacheKey := "html\n" + target
if cached, ok := s.getCachedFetchResult(cacheKey); ok {
s.debug("search_service:fetch_cache_hit", map[string]any{"type": "html", "target": target, "bytes": len(cached)})
return cached, nil
}
req, err := newBrowserRequest(http.MethodGet, target, "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8")
if err != nil {
return "", err
@@ -772,10 +863,18 @@ func (s *SearchService) fetchText(target string) (string, error) {
if looksLikeCloudflareChallenge(string(data)) {
return fetchTextViaPython(target)
}
return string(data), nil
body := string(data)
s.setCachedFetchResult(cacheKey, body, 3*time.Minute)
return body, nil
}
func (s *SearchService) fetchJSONText(target string) (string, error) {
cacheKey := "json\n" + target
if cached, ok := s.getCachedFetchResult(cacheKey); ok {
s.debug("search_service:fetch_cache_hit", map[string]any{"type": "json", "target": target, "bytes": len(cached)})
return cached, nil
}
req, err := newBrowserRequest(http.MethodGet, target, "application/json, text/json, */*")
if err != nil {
return "", err
@@ -792,7 +891,9 @@ func (s *SearchService) fetchJSONText(target string) (string, error) {
if err != nil {
return "", err
}
return string(data), nil
body := string(data)
s.setCachedFetchResult(cacheKey, body, 3*time.Minute)
return body, nil
}
func firstNonEmpty(values ...string) string {
@@ -1086,6 +1187,32 @@ func limitQueries(queries []string, limit int) []string {
return filtered
}
// limitCollectorQueries caps how many query variants a collector may run.
// Envato and Artgrid get a slightly larger budget (3); every other collector
// gets 2. The supplemental "only missing" pass runs against a tighter
// deadline, so its budget is reduced by one — but never below a single query.
func limitCollectorQueries(collector string, queries []string, onlyMissing bool) []string {
	limit := 2
	// The original also listed "Google Video" explicitly with limit 2, which
	// is just the default; the redundant case is dropped.
	if collector == "Envato" || collector == "Artgrid" {
		limit = 3
	}
	if onlyMissing {
		limit--
	}
	return limitQueries(queries, max(limit, 1))
}
// cloneSearchResults returns a shallow copy of items so cache readers and
// writers never share a backing array. The result is always non-nil, even
// for empty input.
func cloneSearchResults(items []SearchResult) []SearchResult {
	out := make([]SearchResult, 0, len(items))
	return append(out, items...)
}
func shuffleStrings(values []string) {
if len(values) < 2 {
return
+28
View File
@@ -4,6 +4,7 @@ import (
"encoding/base64"
"strings"
"testing"
"time"
)
func TestExtractVideoPreviewURLFindsEnvatoPreview(t *testing.T) {
@@ -113,3 +114,30 @@ func TestGeminiCandidateLimitNeverExceedsCandidates(t *testing.T) {
t.Fatalf("expected Gemini limit to stay within candidate count, got %d", got)
}
}
// TestLimitCollectorQueriesUsesSmallerBudgetForMissingPass verifies both the
// reduced budget on the missing-only pass and the default collector budget.
func TestLimitCollectorQueriesUsesSmallerBudgetForMissingPass(t *testing.T) {
	queries := []string{"a", "b", "c", "d"}

	if got := limitCollectorQueries("Artgrid", queries, true); len(got) != 2 {
		t.Fatalf("expected 2 queries for missing-pass Artgrid collector, got %d", len(got))
	}
	if got := limitCollectorQueries("Google Video", queries, false); len(got) != 2 {
		t.Fatalf("expected 2 queries for Google Video collector, got %d", len(got))
	}
}
// TestSearchServiceFetchCacheRoundTrip stores a body in the fetch cache and
// confirms an immediate read returns it verbatim.
func TestSearchServiceFetchCacheRoundTrip(t *testing.T) {
	const (
		key  = "html\nhttps://example.com/item"
		html = "<html></html>"
	)

	service := NewSearchService("http://example.com", "", "")
	service.setCachedFetchResult(key, html, time.Minute)

	body, ok := service.getCachedFetchResult(key)
	if !ok {
		t.Fatal("expected cached fetch result")
	}
	if body != html {
		t.Fatalf("unexpected cached body: %q", body)
	}
}
+59 -4
View File
@@ -14,6 +14,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"
)
@@ -23,6 +24,14 @@ type GeminiService struct {
GenerateEndpoint string
TranslateEndpoint string
Debug func(message string, data any)
cacheMu sync.Mutex
visualCache map[string]cachedVisualData
}
// cachedVisualData is a TTL cache entry holding one inline visual payload for
// a search candidate — either a fetched thumbnail (base64-encoded by
// fetchImageAsInlineData) or an extracted video frame — plus its MIME type.
// Entries are considered live until expiresAt.
type cachedVisualData struct {
	data      string
	mimeType  string
	expiresAt time.Time
}
type AIRecommendation struct {
@@ -46,6 +55,7 @@ func NewGeminiService(apiKey string) *GeminiService {
Client: &http.Client{Timeout: 40 * time.Second},
GenerateEndpoint: "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent",
TranslateEndpoint: "https://translate.googleapis.com/translate_a/single",
visualCache: map[string]cachedVisualData{},
}
}
@@ -187,7 +197,7 @@ User query: ` + query,
maxImages := min(len(candidates), 10)
visualCount := 0
for idx := 0; idx < maxImages; idx++ {
img, mimeType, err := fetchCandidateVisualInlineData(g.Client, candidates[idx])
img, mimeType, err := g.fetchCandidateVisualInlineData(candidates[idx])
if err != nil {
g.debug("gemini:vision_candidate_visual_error", map[string]any{
"index": idx,
@@ -330,6 +340,32 @@ func fetchImageAsInlineData(client *http.Client, imageURL, referer string) (stri
return base64.StdEncoding.EncodeToString(data), mimeType, nil
}
// getCachedVisual returns the cached inline payload and MIME type for key
// when a live (unexpired) entry exists. Expired entries are evicted on
// access.
func (g *GeminiService) getCachedVisual(key string) (string, string, bool) {
	g.cacheMu.Lock()
	defer g.cacheMu.Unlock()

	switch cached, found := g.visualCache[key]; {
	case !found:
		return "", "", false
	case time.Now().After(cached.expiresAt):
		// Lazy eviction keeps the map tidy without a background sweeper.
		delete(g.visualCache, key)
		return "", "", false
	default:
		return cached.data, cached.mimeType, true
	}
}
// setCachedVisual stores an inline payload and its MIME type under key, valid
// for the given time-to-live.
func (g *GeminiService) setCachedVisual(key, data, mimeType string, ttl time.Duration) {
	entry := cachedVisualData{
		data:      data,
		mimeType:  mimeType,
		expiresAt: time.Now().Add(ttl),
	}

	g.cacheMu.Lock()
	g.visualCache[key] = entry
	g.cacheMu.Unlock()
}
func newBrowserStyleImageRequest(imageURL, referer string) (*http.Request, error) {
req, err := http.NewRequest(http.MethodGet, imageURL, nil)
if err != nil {
@@ -344,21 +380,40 @@ func newBrowserStyleImageRequest(imageURL, referer string) (*http.Request, error
return req, nil
}
func fetchCandidateVisualInlineData(client *http.Client, candidate SearchResult) (string, string, error) {
// fetchCandidateVisualInlineData resolves an inline visual payload and MIME
// type for one search candidate, trying in order:
//  1. an extracted frame from an Envato/Artgrid preview video,
//  2. the candidate's thumbnail image,
//  3. an extracted frame from any remaining preview video.
//
// Successful lookups go through the 10-minute visual cache so repeated Gemini
// passes do not re-download or re-decode the same media. Returns an error
// only when the candidate has no usable visual source at all (or the final
// frame extraction fails).
func (g *GeminiService) fetchCandidateVisualInlineData(candidate SearchResult) (string, string, error) {
	if candidate.PreviewVideoURL != "" && (candidate.Source == "Envato" || candidate.Source == "Artgrid") {
		// Best effort: on extraction failure fall through to the thumbnail.
		if data, mimeType, err := g.cachedFrameFromVideo(candidate.PreviewVideoURL); err == nil {
			return data, mimeType, nil
		}
	}
	if candidate.ThumbnailURL != "" {
		cacheKey := "image\n" + candidate.ThumbnailURL
		if data, mimeType, ok := g.getCachedVisual(cacheKey); ok {
			return data, mimeType, nil
		}
		// Best effort: on fetch failure fall through to the preview video.
		data, mimeType, err := fetchImageAsInlineData(g.Client, candidate.ThumbnailURL, candidate.Link)
		if err == nil {
			g.setCachedVisual(cacheKey, data, mimeType, 10*time.Minute)
			return data, mimeType, nil
		}
	}
	if candidate.PreviewVideoURL != "" {
		return g.cachedFrameFromVideo(candidate.PreviewVideoURL)
	}
	return "", "", fmt.Errorf("candidate has no thumbnail or preview video")
}

// cachedFrameFromVideo extracts a representative frame from videoURL, serving
// from and populating the visual cache ("frame" namespace) on success. The
// original inlined this cache+extract sequence twice; it is factored out here.
func (g *GeminiService) cachedFrameFromVideo(videoURL string) (string, string, error) {
	cacheKey := "frame\n" + videoURL
	if data, mimeType, ok := g.getCachedVisual(cacheKey); ok {
		return data, mimeType, nil
	}
	data, mimeType, err := extractFrameFromVideo(videoURL)
	if err != nil {
		return "", "", err
	}
	g.setCachedVisual(cacheKey, data, mimeType, 10*time.Minute)
	return data, mimeType, nil
}
+42
View File
@@ -46,3 +46,45 @@ func TestNormalizeKnownMediaPhrases(t *testing.T) {
t.Fatalf("expected cyberpunk city, got %q", translated)
}
}
// TestSelectUnevaluatedCandidatesSkipsReviewedLinks checks that already
// reviewed links are filtered out, ranking order is preserved, and the
// selection limit is honored.
func TestSelectUnevaluatedCandidatesSkipsReviewedLinks(t *testing.T) {
	ranked := []SearchResult{
		{Link: "https://a.example"},
		{Link: "https://b.example"},
		{Link: "https://c.example"},
	}
	reviewed := map[string]bool{"https://a.example": true}

	selected := SelectUnevaluatedCandidates(ranked, reviewed, 2)
	if len(selected) != 2 {
		t.Fatalf("expected 2 selected candidates, got %d", len(selected))
	}
	if selected[0].Link != "https://b.example" || selected[1].Link != "https://c.example" {
		t.Fatalf("unexpected selection order: %#v", selected)
	}
}
// TestRemainingGeminiCapacityShrinksWithReviewedItems confirms the remaining
// Gemini budget drops by one per distinct reviewed link (16 - 2 = 14).
func TestRemainingGeminiCapacityShrinksWithReviewedItems(t *testing.T) {
	reviewed := []AIRecommendation{
		{Link: "https://a.example"},
		{Link: "https://b.example"},
	}

	got := RemainingGeminiCapacity(reviewed)
	if got != 14 {
		t.Fatalf("expected 14 remaining slots, got %d", got)
	}
}
// TestGeminiVisualCacheRoundTrip stores a visual payload and confirms an
// immediate read returns the same data and MIME type.
func TestGeminiVisualCacheRoundTrip(t *testing.T) {
	const key = "image\nhttps://example.com/thumb.jpg"

	service := NewGeminiService("")
	service.setCachedVisual(key, "abc", "image/jpeg", time.Minute)

	data, mimeType, ok := service.getCachedVisual(key)
	if !ok {
		t.Fatal("expected visual cache hit")
	}
	if data != "abc" || mimeType != "image/jpeg" {
		t.Fatalf("unexpected cached visual data: %q %q", data, mimeType)
	}
}
+68 -2
View File
@@ -260,6 +260,72 @@ func RandomizeTopRecommendations(items []AIRecommendation, window int) []AIRecom
return shuffled
}
// ReviewedRecommendationLinks collects the set of non-empty links that have
// already been evaluated, as a map for O(1) membership checks.
func ReviewedRecommendationLinks(items []AIRecommendation) map[string]bool {
	reviewed := make(map[string]bool, len(items))
	for _, rec := range items {
		if rec.Link != "" {
			reviewed[rec.Link] = true
		}
	}
	return reviewed
}
// RemainingGeminiCapacity reports how many more candidates may still be sent
// to Gemini given the recommendations already reviewed. The overall budget is
// GeminiCandidateLimit(16); each distinct reviewed link consumes one slot.
// The result is never negative.
func RemainingGeminiCapacity(reviewed []AIRecommendation) int {
	used := len(ReviewedRecommendationLinks(reviewed))
	// Built-in max (Go 1.21) replaces the manual negative clamp; the file
	// already relies on the companion built-in min.
	return max(GeminiCandidateLimit(16)-used, 0)
}
// SelectUnevaluatedCandidates picks up to limit ranked results whose links
// are non-empty and not present in reviewedLinks, preserving ranking order.
// A non-positive limit yields an empty, non-nil slice.
func SelectUnevaluatedCandidates(ranked []SearchResult, reviewedLinks map[string]bool, limit int) []SearchResult {
	if limit <= 0 {
		return []SearchResult{}
	}

	picked := make([]SearchResult, 0, min(limit, len(ranked)))
	for _, candidate := range ranked {
		if candidate.Link == "" || reviewedLinks[candidate.Link] {
			continue
		}
		picked = append(picked, candidate)
		if len(picked) == limit {
			break
		}
	}
	return picked
}
// MergeUniqueRecommendations concatenates base and extra, dropping entries
// with empty links and any link already seen; base entries take precedence
// over extra entries with the same link.
func MergeUniqueRecommendations(base, extra []AIRecommendation) []AIRecommendation {
	merged := make([]AIRecommendation, 0, len(base)+len(extra))
	seen := make(map[string]bool, len(base)+len(extra))
	// Iterate the two slices separately. The original ranged over
	// append(base, extra...), which can write extra's elements into base's
	// backing array whenever base has spare capacity — silently mutating a
	// slice the caller still holds.
	for _, list := range [][]AIRecommendation{base, extra} {
		for _, item := range list {
			if item.Link == "" || seen[item.Link] {
				continue
			}
			seen[item.Link] = true
			merged = append(merged, item)
		}
	}
	return merged
}
// MergeGeminiBatchStats combines the counters of two evaluation passes.
// Numeric fields are summed onto a copy of base (any other fields keep
// base's values), error messages are concatenated base-first, and the merged
// error list is capped at five entries.
func MergeGeminiBatchStats(base, extra GeminiBatchStats) GeminiBatchStats {
	merged := base

	merged.CandidateCap += extra.CandidateCap
	merged.Requested += extra.Requested
	merged.Batches += extra.Batches
	merged.Succeeded += extra.Succeeded
	merged.Failed += extra.Failed
	merged.SequentialRetried += extra.SequentialRetried
	merged.RecommendedCount += extra.RecommendedCount

	// Build a fresh slice so neither input's Errors backing array is shared.
	errs := make([]string, 0, len(base.Errors)+len(extra.Errors))
	errs = append(errs, base.Errors...)
	errs = append(errs, extra.Errors...)
	if len(errs) > 5 {
		errs = errs[:5]
	}
	merged.Errors = errs

	return merged
}
func recoverGeminiBatchSequentially(service *GeminiService, query string, ranked []SearchResult, startIndex int) ([]AIRecommendation, []string) {
recovered := make([]AIRecommendation, 0, 8)
errs := make([]string, 0, 4)
@@ -270,11 +336,11 @@ func recoverGeminiBatchSequentially(service *GeminiService, query string, ranked
if len(errs) < 4 {
errs = append(errs, err.Error())
}
time.Sleep(350 * time.Millisecond)
time.Sleep(150 * time.Millisecond)
continue
}
recovered = append(recovered, recs...)
time.Sleep(350 * time.Millisecond)
time.Sleep(150 * time.Millisecond)
}
return recovered, errs
}