Add YouTube provider

This commit is contained in:
hlohaus 2025-07-14 08:49:12 +02:00
parent 89a45f38b6
commit c718011cb8
5 changed files with 138 additions and 1 deletions

126
g4f/Provider/YouTube.py Normal file
View file

@ -0,0 +1,126 @@
from __future__ import annotations
import os
try:
import yt_dlp
has_yt_dlp = True
except ImportError:
has_yt_dlp = False
from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..providers.response import AudioResponse, VideoResponse
from ..image.copy_images import get_media_dir
from .helper import format_media_prompt
class YouTube(AsyncGeneratorProvider, ProviderModelMixin):
    """Provider that answers a prompt with media from the first YouTube search hit."""

    url = "https://youtube.com"
    # Only usable when the optional yt_dlp import succeeded.
    working = has_yt_dlp
    use_nodriver = True

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        prompt: str = None,
        **kwargs
    ) -> AsyncResult:
        """
        Search YouTube for the prompt and yield the downloaded first result.

        :param model: Requested model name.
            NOTE(review): it is not forwarded; "mp3" is always requested below - confirm intent.
        :param messages: Conversation used to derive the search text
        :param prompt: Optional explicit prompt passed to format_media_prompt
        :return: Yields an AudioResponse for ".mp3" files, otherwise a VideoResponse;
            yields nothing when the search has no results
        """
        query = format_media_prompt(messages, prompt)
        youtube = YouTubeProvider()
        hits = await youtube.search(query, max_results=1)
        if not hits:
            return
        first_url = hits[0]['url']
        saved_path = await youtube.download(first_url, model="mp3", output_dir=get_media_dir())
        # Files are referenced by basename under the /media/ route.
        media_url = f"/media/{os.path.basename(saved_path)}"
        if not saved_path.endswith('.mp3'):
            yield VideoResponse(media_url, query)
            return
        yield AudioResponse(media_url)
class YouTubeProvider:
    """
    Search and download YouTube videos through yt_dlp.

    model: "mp3" for audio only, or "high-definition" for best video
    """

    def __init__(self):
        pass

    async def search(self, query: str, max_results: int = 5) -> list[dict]:
        """
        Search YouTube for videos matching the query.

        The yt_dlp call is blocking, so it is executed in the event loop's
        default executor rather than directly inside the coroutine.

        :param query: Free-text search terms
        :param max_results: Number of results requested from the search
        :return: A list of dicts with keys: title, url, id, duration
        """
        import asyncio

        ydl_opts = {
            'quiet': True,
            'extract_flat': True,
            'skip_download': True,
        }
        search_url = f"ytsearch{max_results}:{query}"

        def run_search() -> list:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(search_url, download=False)
            # Materialize inside the worker so no yt_dlp work leaks back
            # onto the event loop thread.
            return list((info or {}).get('entries') or [])

        entries = await asyncio.get_running_loop().run_in_executor(None, run_search)
        return [
            {
                'title': entry.get('title'),
                'url': f"https://www.youtube.com/watch?v={entry.get('id')}",
                'id': entry.get('id'),
                'duration': entry.get('duration'),
            }
            for entry in entries
            # Defensive: an entry without an id cannot form a watch URL.
            if entry and entry.get('id')
        ]

    async def download(self, video_url: str, model: str = "high-definition", output_dir: str = ".") -> str:
        """
        Download a YouTube video.

        The media is fetched exactly once; the blocking yt_dlp call runs in
        the event loop's default executor.

        :param video_url: The video URL or video id
        :param model: "mp3" for audio, "high-definition" for best video
        :param output_dir: Download location
        :return: The path to the downloaded file
        :raises ValueError: If model is neither "mp3" nor "high-definition"
        """
        import asyncio

        ydl_opts = {
            'outtmpl': os.path.join(output_dir, "%(title)s.%(ext)s"),
            'quiet': True,
        }
        if model == "mp3":
            # Audio only, best quality
            ydl_opts.update({
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192'
                }]
            })
        elif model == "high-definition":
            # Best video+audio
            ydl_opts.update({
                'format': 'bestvideo+bestaudio/best',
                'merge_output_format': 'mp4',
            })
        else:
            raise ValueError("model must be 'mp3' or 'high-definition'")

        def run_download() -> tuple:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # A single extract_info(download=True) both downloads the
                # media and returns the metadata needed to locate the file.
                info = ydl.extract_info(video_url, download=True)
                return info, ydl.prepare_filename(info)

        info, prepared = await asyncio.get_running_loop().run_in_executor(None, run_download)
        return self._final_path(info, prepared, model)

    @staticmethod
    def _final_path(info: dict, prepared: str, model: str) -> str:
        """
        Work out where the finished file was written.

        :param info: Metadata dict returned by extract_info
        :param prepared: Path from prepare_filename (pre-conversion extension)
        :param model: The model passed to download
        :return: The first "filepath" listed under "requested_downloads" when
            present; otherwise the prepared path, with its extension swapped
            to ".mp3" for the audio model
        """
        for item in (info or {}).get('requested_downloads') or []:
            filepath = item.get('filepath')
            if filepath:
                return filepath
        if model == "mp3":
            # Swap only the real extension; the source container may be
            # m4a/opus/webm and the title itself may contain dots.
            return os.path.splitext(prepared)[0] + ".mp3"
        return prepared
# Example usage (async coroutine for manual testing)
async def demo():
    """Search for two videos, print the hits, then download the first one as audio."""
    youtube = YouTubeProvider()
    found = await youtube.search("Python programming", max_results=2)
    print("Search results:", found)
    if not found:
        return
    first_url = found[0]['url']
    saved_to = await youtube.download(first_url, model="mp3")
    print("Downloaded to:", saved_to)
# To actually run demo() (requires `import asyncio`):
# asyncio.run(demo())

View file

@ -53,6 +53,7 @@ from .Startnest import Startnest
from .TeachAnything import TeachAnything
from .Together import Together
from .WeWordle import WeWordle
from .YouTube import YouTube
from .Yqcloud import Yqcloud
import sys

View file

@ -131,7 +131,15 @@ class Video(AsyncGeneratorProvider, ProviderModelMixin):
for _, urls in RequestConfig.urls.items():
if event.request.url in urls:
return
debug.log(f"Adding URL: {event.request.url}")
RequestConfig.urls[prompt].append(event.request.url)
for idx in range(300):
button = await page.find("User menu")
if button:
break
if idx == 299:
stop_browser()
raise RuntimeError("Failed to wait for user menu.")
if model == "search" and page is not None:
await page.send(nodriver.cdp.network.enable())
page.add_handler(nodriver.cdp.network.RequestWillBeSent, on_request)

View file

@ -758,7 +758,7 @@ class Api:
if m:
return int(m.group(0))
else:
raise ValueError("No timestamp found in filename")
return 0
target = os.path.join(get_media_dir(), os.path.basename(filename))
if thumbnail and has_pillow:
thumbnail_dir = os.path.join(get_media_dir(), "thumbnails")

View file

@ -208,6 +208,8 @@ async def get_nodriver(
try:
if browser.connection:
browser.stop()
except:
pass
finally:
lock_file.unlink(missing_ok=True)
BrowserConfig.stop_browser = on_stop