165 lines
4.5 KiB
Python
165 lines
4.5 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from typing import List
|
|
|
|
|
|
def emit(status, progress, message="", output=""):
    """Print one progress event as a single line of JSON on stdout.

    Args:
        status: Value stored under the ``"status"`` key.
        progress: Value stored under the ``"progress"`` key.
        message: Value stored under the ``"message"`` key.
        output: Stored under the ``"output"`` key only when truthy;
            otherwise the key is left out of the event entirely.
    """
    event = dict(status=status, progress=progress, message=message)
    if output:
        event.update(output=output)
    # Flush right away so the event is written out before the next step runs.
    print(json.dumps(event), flush=True)
|
|
|
|
|
|
def run(cmd):
    """Run ``cmd`` to completion, capturing its stdout and stderr as text.

    Args:
        cmd: Argument list handed to ``subprocess.run``.

    Returns:
        The finished process object when the exit code is zero.

    Raises:
        RuntimeError: On a non-zero exit code. The message is the stripped
            stderr, else the stripped stdout, else ``"command failed"``.
    """
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode == 0:
        return result
    detail = result.stderr.strip() or result.stdout.strip() or "command failed"
    raise RuntimeError(detail)
|
|
|
|
|
|
def parse_duration(value):
    """Format a duration given in seconds as a zero-padded ``HH:MM:SS`` string.

    Fractional seconds are truncated. Hours are not wrapped, so durations of
    100 hours or more simply use more than two hour digits.

    Args:
        value: Number of seconds as an int, float, or numeric string, or
            ``None`` when the duration is unknown.

    Returns:
        The formatted timestamp. ``"00:00:00"`` is returned for ``None``, for
        values that cannot be read as a finite number, and for negative
        values.
    """
    if value is None:
        return "00:00:00"
    try:
        total = int(float(value))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric input, NaN (ValueError) or infinity (OverflowError):
        # treat it like an unknown duration instead of raising.
        return "00:00:00"
    # Floor division on a negative total would yield output like "-1:59:59".
    total = max(total, 0)
    hours, remainder = divmod(total, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
|
|
|
|
|
def format_label(height, ext):
    """Build a display label from a video height and a file extension.

    Args:
        height: Video height; any falsy value (``None``, ``0``) selects the
            generic ``"Best"`` prefix instead of ``"<height>p"``.
        ext: Extension shown in parentheses after the prefix.

    Returns:
        ``"<height>p (<ext>)"`` or ``"Best (<ext>)"``.
    """
    prefix = f"{height}p" if height else "Best"
    return f"{prefix} ({ext})"
|
|
|
|
|
|
def build_quality_options(formats: List[dict]):
    """Derive the list of selectable quality options from format entries.

    Args:
        formats: Dicts that may carry a ``"height"`` key. Only positive
            integer heights are considered; everything else is ignored.

    Returns:
        A list of ``{"value", "label"}`` dicts. The first entry is always the
        ``"best"`` option, followed by one height-capped selector per
        distinct height in ascending order.
    """
    distinct_heights = {
        h
        for h in (entry.get("height") for entry in formats)
        if isinstance(h, int) and h > 0
    }
    capped = [
        {
            "value": f"bestvideo[height<={h}]+bestaudio/best[height<={h}]",
            "label": f"Up to {h}p",
        }
        for h in sorted(distinct_heights)
    ]
    return [{"value": "best", "label": "Best available"}, *capped]
|
|
|
|
|
|
def preview_stream_url(url):
    """Ask yt-dlp for a stream URL that can be used to preview ``url``.

    Each format selector is tried in order by invoking
    ``yt-dlp -g --no-playlist -f <selector> <url>``. The first non-blank
    line of stdout from the first successful invocation wins.

    Args:
        url: Media page URL handed to yt-dlp.

    Returns:
        The stripped first non-blank output line, or ``""`` when every
        selector fails or prints nothing.
    """
    for selector in ("best[ext=mp4]/best", "best"):
        result = subprocess.run(
            ["yt-dlp", "-g", "--no-playlist", "-f", selector, url],
            capture_output=True,
            text=True,
        )
        if result.returncode != 0:
            # This selector did not resolve; fall through to the next one.
            continue
        for raw_line in result.stdout.splitlines():
            candidate = raw_line.strip()
            if candidate:
                return candidate
    return ""
|
|
|
|
|
|
def probe(url):
    """Fetch metadata for ``url`` via yt-dlp and print a preview as JSON.

    Runs ``yt-dlp --dump-single-json --no-playlist`` and prints a single
    JSON object on stdout with the keys ``title``, ``thumbnail``,
    ``previewStreamUrl``, ``durationSeconds``, ``duration``,
    ``startDefault``, ``endDefault`` and ``qualities``.

    Args:
        url: Media page URL to inspect.

    Raises:
        RuntimeError: Propagated from ``run`` when yt-dlp exits non-zero.
    """
    info = json.loads(
        run(["yt-dlp", "--dump-single-json", "--no-playlist", url]).stdout
    )
    duration = info.get("duration")

    # Resolve the stream URL before formatting so the external call happens
    # at the same point in the sequence as the other lookups.
    stream_url = preview_stream_url(url)
    duration_label = parse_duration(duration)
    qualities = build_quality_options(info.get("formats") or [])

    summary = {
        "title": info.get("title") or "Untitled",
        "thumbnail": info.get("thumbnail") or "",
        "previewStreamUrl": stream_url,
        "durationSeconds": duration or 0,
        "duration": duration_label,
        "startDefault": "00:00:00",
        # The default clip end is the full duration of the media.
        "endDefault": duration_label,
        "qualities": qualities,
    }
    print(json.dumps(summary), flush=True)
|
|
|
|
|
|
def main():
    """Command-line entry point: probe a URL or download and crop a clip.

    ``--mode probe`` prints preview metadata for ``--url`` and returns.
    ``--mode download`` (the default) downloads the media with yt-dlp into a
    temporary directory, cuts the ``--start``/``--end`` range out of it with
    ffmpeg using stream copy, writes the result to ``--output``, and reports
    progress through ``emit``.

    Raises:
        RuntimeError: If ``--output`` is missing in download mode, if yt-dlp
            leaves no file behind, or if an external command fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["probe", "download"], default="download")
    parser.add_argument("--url", required=True)
    parser.add_argument("--start", default="00:00:00")
    parser.add_argument("--end", default="00:00:00")
    parser.add_argument("--output", default="")
    parser.add_argument("--quality", default="best")
    args = parser.parse_args()

    if args.mode == "probe":
        probe(args.url)
        return

    if not args.output:
        raise RuntimeError("output path is required for download mode")

    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    output_dir = os.path.dirname(args.output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    emit("starting", 5, "Resolving media stream")

    with tempfile.TemporaryDirectory(prefix="aihub-") as tmpdir:
        # yt-dlp substitutes %(ext)s itself, so the real filename is only
        # known after the download finishes.
        source_template = os.path.join(tmpdir, "source.%(ext)s")
        run(
            [
                "yt-dlp",
                "--no-playlist",
                "-f",
                args.quality,
                "-o",
                source_template,
                args.url,
            ]
        )
        emit("downloaded", 55, "Source downloaded")

        files = sorted(os.path.join(tmpdir, name) for name in os.listdir(tmpdir))
        if not files:
            raise RuntimeError("yt-dlp did not produce an output file")
        source_file = files[0]

        ffmpeg_cmd = ["ffmpeg", "-y", "-ss", args.start]
        # An end of "00:00:00" (the parser default) can never lie after the
        # start, so treat it as "no end bound" and keep everything from
        # --start onward instead of requesting an empty or invalid range.
        # NOTE(review): ffmpeg is expected to reject -to <= -ss; confirm.
        if args.end and args.end != "00:00:00":
            ffmpeg_cmd += ["-to", args.end]
        ffmpeg_cmd += ["-i", source_file, "-c", "copy", args.output]

        emit("cropping", 75, "Cropping requested segment")
        run(ffmpeg_cmd)

    emit("completed", 100, "Download complete", args.output)
|
|
|
|
|
|
if __name__ == "__main__":
    # Top-level boundary: report any failure as a final "error" event on
    # stdout, then terminate with exit status 1.
    try:
        main()
    except Exception as failure:
        emit("error", 100, str(failure))
        raise SystemExit(1)
|