Files
AI Assistant 6f3149a443
build-push / docker (push) Successful in 4m15s
Add local self-test flow and fix fallback regressions
2026-03-13 18:09:32 +09:00

82 lines
2.6 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import json
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse
def build_results(engine: str, query: str):
    """Return the canned result list for one search request.

    The engine name is matched case-insensitively. When it contains both
    "google" and "video", a single YouTube-style hit is returned; any other
    engine value (including an empty or ``None`` one) yields two stock-footage
    hits. The ``engine`` argument is echoed back unchanged in every hit.

    Args:
        engine: Engine name as supplied by the caller; may be empty or None.
        query: Search text, interpolated into each hit's title and content.

    Returns:
        A list of result dicts with ``title``, ``url``, ``content``,
        ``thumbnail`` and ``engine`` keys; the stock-footage hits also carry
        ``thumbnail_src`` (and ``img_src`` for the second one).
    """
    engine_key = engine.lower() if engine else ""
    wants_video = all(word in engine_key for word in ("google", "video"))

    if not wants_video:
        envato_thumb = "https://images.example.com/envato-city-rain.jpg"
        artgrid_thumb = "https://images.example.com/artgrid-city-rain.jpg"

        envato_hit = {
            "title": f"{query.title()} envato clip",
            "url": "https://elements.envato.com/city-rain-b-roll-AB12CD",
            "content": f"{query} stock footage cinematic scene",
            "thumbnail": envato_thumb,
            "thumbnail_src": envato_thumb,
            "engine": engine,
        }
        # The second hit additionally exposes the image under "img_src".
        artgrid_hit = {
            "title": f"{query.title()} artgrid clip",
            "url": "https://artgrid.io/clip/123456/city-rain-night",
            "content": f"{query} editorial footage establishing shot",
            "thumbnail": artgrid_thumb,
            "thumbnail_src": artgrid_thumb,
            "img_src": artgrid_thumb,
            "engine": engine,
        }
        return [envato_hit, artgrid_hit]

    video_hit = {
        "title": f"{query.title()} cinematic b-roll",
        "url": "https://www.youtube.com/watch?v=abcdefghijk",
        "content": f"{query} stock footage edit reference",
        "thumbnail": "https://i.ytimg.com/vi/abcdefghijk/hqdefault.jpg",
        "engine": engine,
    }
    return [video_hit]
class Handler(BaseHTTPRequestHandler):
    """Request handler that answers ``GET /search`` with canned JSON results."""

    def do_GET(self):
        """Serve ``/search``; reply with an empty 404 for any other path.

        The ``q`` query parameter supplies the search text (default
        ``"city rain"``) and ``engines`` supplies the engine name (default
        empty). Only the first value of each parameter is used.
        """
        target = urlparse(self.path)
        if target.path == "/search":
            fields = parse_qs(target.query)
            search_text = fields.get("q", ["city rain"])[0]
            engine_name = fields.get("engines", [""])[0]
            self._send_json({"results": build_results(engine_name, search_text)})
        else:
            self.send_response(404)
            self.end_headers()

    def _send_json(self, document):
        """Serialize *document* and write it as a 200 JSON response."""
        # Encode before sending any header so Content-Length is the byte size.
        raw = json.dumps(document).encode("utf-8")
        self.send_response(200)
        for header_name, header_value in (
            ("Content-Type", "application/json"),
            ("Content-Length", str(len(raw))),
        ):
            self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(raw)

    def log_message(self, format, *args):
        """Discard the base class's per-request log output."""
def main():
    """Parse ``--port`` and serve on 127.0.0.1 until interrupted.

    The port defaults to 18080. A keyboard interrupt stops the server
    quietly, and the listening socket is closed on the way out regardless of
    how ``serve_forever`` ends.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--port", type=int, default=18080)
    port = cli.parse_args().port

    # Leaving the ``with`` block calls server_close() on the server.
    with ThreadingHTTPServer(("127.0.0.1", port), Handler) as httpd:
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl+C is the expected way to stop; exit without a traceback.
            pass
# Start the server only when run as a script, so importing this module
# (e.g. to reuse build_results) has no side effects.
if __name__ == "__main__":
    main()