Files
ai-media-hub/worker/downloader.py
AI Assistant d7506c041a
Some checks failed
build-push / docker (push) Has been cancelled
Initial AI media hub implementation
2026-03-12 15:01:18 +09:00

83 lines
2.1 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import os
import subprocess
import sys
import tempfile
def emit(status, progress, message="", output=""):
    """Print one JSON progress record on stdout and flush it immediately.

    The record always contains the ``status``, ``progress`` and ``message``
    keys, in that order. An ``output`` key is appended only when *output*
    is truthy.
    """
    record = dict(status=status, progress=progress, message=message)
    if output:
        record.update(output=output)
    line = json.dumps(record)
    # Flush on every record so a reader of stdout sees it without waiting
    # for the buffer to fill.
    print(line, flush=True)
def run(cmd):
    """Execute *cmd* and return its ``CompletedProcess`` on a zero exit code.

    stdout and stderr are captured as text.

    Raises:
        RuntimeError: when the command exits non-zero. The message is the
            stripped stderr, or the stripped stdout if stderr is empty, or
            the literal ``"command failed"`` if both are empty.
    """
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        return result

    detail = result.stderr.strip()
    if not detail:
        detail = result.stdout.strip()
    if not detail:
        detail = "command failed"
    raise RuntimeError(detail)
def main():
    """Download a media URL with yt-dlp and cut one segment out with ffmpeg.

    Command-line arguments:
        --url     (required) media URL handed to yt-dlp.
        --start   value passed to ffmpeg ``-ss`` (default ``00:00:00``).
        --end     value passed to ffmpeg ``-to`` (default ``00:00:10``).
        --output  (required) path of the file ffmpeg writes.

    Progress is reported through ``emit`` as the records ``starting``,
    ``downloaded``, ``cropping`` and ``completed``; the last one carries
    the ``--output`` value exactly as it was given.

    Raises:
        RuntimeError: if yt-dlp leaves no file in the temporary directory,
            or (through ``run``) if yt-dlp or ffmpeg exits non-zero.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", required=True)
    parser.add_argument("--start", default="00:00:00")
    parser.add_argument("--end", default="00:00:10")
    parser.add_argument("--output", required=True)
    args = parser.parse_args()

    # os.path.dirname() returns "" for a bare filename such as "clip.mp4",
    # and os.makedirs("") raises FileNotFoundError. Only create a directory
    # when the output path actually names one.
    output_dir = os.path.dirname(args.output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    emit("starting", 5, "Resolving media stream")

    with tempfile.TemporaryDirectory(prefix="aihub-") as tmpdir:
        # yt-dlp expands %(ext)s itself, so the final filename is not known
        # in advance and is found by listing the directory afterwards.
        source_path = os.path.join(tmpdir, "source.%(ext)s")
        download_cmd = [
            "yt-dlp",
            "--no-playlist",
            "-f",
            "mp4/bestvideo*+bestaudio/best",
            "-o",
            source_path,
            # End of options: a URL beginning with "-" must be treated as a
            # positional argument, never parsed as a yt-dlp option.
            "--",
            args.url,
        ]
        run(download_cmd)
        emit("downloaded", 55, "Source downloaded")

        files = [os.path.join(tmpdir, name) for name in os.listdir(tmpdir)]
        if not files:
            raise RuntimeError("yt-dlp did not produce an output file")
        # Lexicographically first entry, same choice as sorted(files)[0].
        source_file = min(files)

        ffmpeg_cmd = [
            "ffmpeg",
            "-y",
            "-ss",
            args.start,
            "-to",
            args.end,
            "-i",
            source_file,
            # Stream copy: the segment is cut without re-encoding.
            "-c",
            "copy",
            args.output,
        ]
        emit("cropping", 75, "Cropping requested segment")
        run(ffmpeg_cmd)

    emit("completed", 100, "Download complete", args.output)
if __name__ == "__main__":
    # Script entry point. Any Exception escaping main() is reported as a
    # final "error" record carrying the exception text, and the process
    # then exits with status 1.
    try:
        main()
    except Exception as failure:
        emit("error", 100, str(failure))
        raise SystemExit(1)