This commit is contained in:
+32
-6
@@ -53,6 +53,8 @@ type SearchExecutionMeta struct {
|
||||
PartialDueToDeadline bool `json:"partialDueToDeadline"`
|
||||
}
|
||||
|
||||
const searchEnrichmentReserve = 4 * time.Second
|
||||
|
||||
func NewSearchService(baseURL, googleVideoEngine, webEngine string) *SearchService {
|
||||
if googleVideoEngine == "" {
|
||||
googleVideoEngine = "google videos"
|
||||
@@ -84,6 +86,7 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
if s.BaseURL == "" {
|
||||
return nil, meta, fmt.Errorf("searxng base url is not configured")
|
||||
}
|
||||
collectionDeadline, enrichmentDeadline := splitSearchDeadlines(deadline)
|
||||
s.debug("search_service:start", map[string]any{
|
||||
"queries": queries,
|
||||
"enabledPlatforms": enabledPlatforms,
|
||||
@@ -93,13 +96,13 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
sourceCounts := map[string]int{}
|
||||
results := make([]SearchResult, 0, 90)
|
||||
var lastErr error
|
||||
collectorZeroStreak := map[string]int{}
|
||||
|
||||
baseQueries := limitQueries(queries, 8)
|
||||
shuffleStrings(baseQueries)
|
||||
primaryQueries := baseQueries[:minInt(len(baseQueries), 3)]
|
||||
runSearchPass := func(bases []string, onlyMissing bool) {
|
||||
for _, base := range bases {
|
||||
if !deadline.IsZero() && time.Now().After(deadline) {
|
||||
if !collectionDeadline.IsZero() && time.Now().After(collectionDeadline) {
|
||||
meta.PartialDueToDeadline = true
|
||||
s.debug("search_service:deadline_reached", map[string]any{"stage": "runSearchPass", "base": base})
|
||||
return
|
||||
@@ -109,7 +112,7 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
continue
|
||||
}
|
||||
for _, collector := range s.collectors {
|
||||
if !deadline.IsZero() && time.Now().After(deadline) {
|
||||
if !collectionDeadline.IsZero() && time.Now().After(collectionDeadline) {
|
||||
meta.PartialDueToDeadline = true
|
||||
s.debug("search_service:deadline_reached", map[string]any{"stage": "collectorLoop", "collector": collector.Name()})
|
||||
return
|
||||
@@ -124,7 +127,6 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
continue
|
||||
}
|
||||
searchQueries := collector.BuildQueries(base)
|
||||
shuffleStrings(searchQueries)
|
||||
searchQueries = limitCollectorQueries(collector.Name(), searchQueries, onlyMissing)
|
||||
s.debug("search_service:collector_queries", map[string]any{
|
||||
"collector": collector.Name(),
|
||||
@@ -133,7 +135,7 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
"searchQueries": searchQueries,
|
||||
})
|
||||
for _, searchQuery := range searchQueries {
|
||||
if !deadline.IsZero() && time.Now().After(deadline) {
|
||||
if !collectionDeadline.IsZero() && time.Now().After(collectionDeadline) {
|
||||
meta.PartialDueToDeadline = true
|
||||
s.debug("search_service:deadline_reached", map[string]any{"stage": "queryLoop", "collector": collector.Name(), "query": searchQuery})
|
||||
return
|
||||
@@ -157,6 +159,11 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
"rawCount": len(items),
|
||||
"sourceCount": sourceCounts[collector.Name()],
|
||||
})
|
||||
if len(items) == 0 && sourceCounts[collector.Name()] == 0 {
|
||||
collectorZeroStreak[collector.Name()]++
|
||||
} else {
|
||||
collectorZeroStreak[collector.Name()] = 0
|
||||
}
|
||||
for _, item := range items {
|
||||
item = normalizeResultForCollector(collector.Name(), item)
|
||||
if item.Link == "" || seen[item.Link] || !collector.Accept(item) {
|
||||
@@ -169,6 +176,14 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
break
|
||||
}
|
||||
}
|
||||
if collectorZeroStreak[collector.Name()] >= 2 && sourceCounts[collector.Name()] == 0 {
|
||||
s.debug("search_service:collector_skip_after_zero_streak", map[string]any{
|
||||
"collector": collector.Name(),
|
||||
"base": base,
|
||||
"streak": collectorZeroStreak[collector.Name()],
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -192,11 +207,22 @@ func (s *SearchService) SearchMediaWithDeadline(queries []string, enabledPlatfor
|
||||
"hadError": lastErr != nil,
|
||||
"partialDueToDeadline": meta.PartialDueToDeadline,
|
||||
})
|
||||
enriched, enrichMeta := s.EnrichResultsWithDeadline(results, deadline)
|
||||
enriched, enrichMeta := s.EnrichResultsWithDeadline(results, enrichmentDeadline)
|
||||
meta.PartialDueToDeadline = meta.PartialDueToDeadline || enrichMeta.PartialDueToDeadline
|
||||
return enriched, meta, nil
|
||||
}
|
||||
|
||||
func splitSearchDeadlines(deadline time.Time) (time.Time, time.Time) {
|
||||
if deadline.IsZero() {
|
||||
return time.Time{}, time.Time{}
|
||||
}
|
||||
remaining := time.Until(deadline)
|
||||
if remaining <= searchEnrichmentReserve {
|
||||
return deadline, deadline
|
||||
}
|
||||
return deadline.Add(-searchEnrichmentReserve), deadline
|
||||
}
|
||||
|
||||
func (s *SearchService) EnrichResults(results []SearchResult) []SearchResult {
|
||||
enriched, _ := s.EnrichResultsWithDeadline(results, time.Time{})
|
||||
return enriched
|
||||
|
||||
@@ -182,6 +182,27 @@ func TestSearchServiceFetchCacheRoundTrip(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitSearchDeadlinesReservesEnrichmentWindow(t *testing.T) {
|
||||
deadline := time.Now().Add(20 * time.Second)
|
||||
collectionDeadline, enrichmentDeadline := splitSearchDeadlines(deadline)
|
||||
|
||||
if enrichmentDeadline.IsZero() {
|
||||
t.Fatal("expected enrichment deadline to be preserved")
|
||||
}
|
||||
if !collectionDeadline.Before(enrichmentDeadline) {
|
||||
t.Fatalf("expected collection deadline before enrichment deadline, got %v >= %v", collectionDeadline, enrichmentDeadline)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitSearchDeadlinesDoesNotReserveWhenBudgetTooSmall(t *testing.T) {
|
||||
deadline := time.Now().Add(2 * time.Second)
|
||||
collectionDeadline, enrichmentDeadline := splitSearchDeadlines(deadline)
|
||||
|
||||
if !collectionDeadline.Equal(enrichmentDeadline) {
|
||||
t.Fatalf("expected identical deadlines, got %v and %v", collectionDeadline, enrichmentDeadline)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSearchServiceSkipsArtgridAPIAfter403(t *testing.T) {
|
||||
var apiRequests atomic.Int32
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@@ -47,7 +47,7 @@ func (artgridCollector) Enrich(searcher *SearchService, result SearchResult) Sea
|
||||
type googleVideoCollector struct{}
|
||||
|
||||
func (googleVideoCollector) Name() string { return "Google Video" }
|
||||
func (googleVideoCollector) MaxResults() int { return 8 }
|
||||
func (googleVideoCollector) MaxResults() int { return 12 }
|
||||
func (googleVideoCollector) Enabled(enabledPlatforms map[string]bool) bool {
|
||||
return len(enabledPlatforms) == 0 || enabledPlatforms["google video"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user