mirror of
https://github.com/xtekky/gpt4free.git
synced 2025-12-15 14:51:19 -08:00
- Rename DeepInfraChat to DeepInfra across all files - Move DeepInfra from needs_auth to main Provider directory - Rename LMArenaBeta to LMArena throughout codebase - Move search-related providers to new search subdirectory (GoogleSearch, SearXNG, YouTube) - Move deprecated providers to not_working directory (Free2GPT, LegacyLMArena, PenguinAI, ImageLabs, har) - Add new Mintlify provider with custom AI assistant implementation - Update Anthropic provider with Claude 4 models and Opus 4.1 parameter handling - Update Grok provider with Grok 4 models and improved streaming support - Update GithubCopilot with expanded model list including o3-mini, o4-mini, gpt-5 previews - Update LambdaChat default model from deepseek-r1 to deepseek-llama3.3-70b - Update TeachAnything default model from gemini-1.5-pro to gemma - Remove DeepInfra from needs_auth directory - Update all model_map references from DeepInfraChat to DeepInfra - Update all model_map references from LMArenaBeta to LMArena - Add beta_headers support to Anthropic for special features - Improve Mintlify provider with system prompt handling and streaming - Update model configurations in models.py to reflect provider changes
159 lines
5.6 KiB
Python
159 lines
5.6 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
try:
|
|
import yt_dlp
|
|
has_yt_dlp = True
|
|
except ImportError:
|
|
has_yt_dlp = False
|
|
|
|
from ...typing import AsyncResult, Messages
|
|
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
|
|
from ...providers.response import AudioResponse, VideoResponse, YouTubeResponse
|
|
from ...image.copy_images import get_media_dir
|
|
from ..helper import format_media_prompt
|
|
|
|
class YouTube(AsyncGeneratorProvider, ProviderModelMixin):
    """
    Provider that searches YouTube or downloads media from it.

    With the "search" model it yields a ``YouTubeResponse`` holding up to five
    video ids. With any other model it downloads the first remaining candidate
    through ``YouTubeProvider`` and yields an audio or video response followed
    by a markdown link to the video.
    """

    url = "https://youtube.com"
    # Usable only when the optional yt_dlp import at module level succeeded.
    working = has_yt_dlp

    default_model = "search"
    models = ["mp3", "1080p", "720p", "480p", "search"]

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        prompt: str = None,
        **kwargs
    ) -> AsyncResult:
        """
        Yield search results or downloaded media for the given prompt.

        :param model: One of ``models``; "search" lists ids, others download.
        :param messages: Conversation messages; used to build the prompt and
            to skip videos whose URL already appears in a message.
        :param prompt: Optional explicit prompt passed to ``format_media_prompt``.
        """
        prompt = format_media_prompt(messages, prompt)

        # Watch URLs given line-by-line in the prompt take precedence over a search.
        candidates = []
        for line in prompt.splitlines():
            if line.startswith("https://www.youtube.com/watch?v="):
                candidates.append({
                    "id": line.split("?v=")[-1].split("&")[0],
                    "url": line,
                })

        provider = YouTubeProvider()
        if not candidates:
            candidates = await provider.search(prompt, max_results=10)

        def mentioned_in_messages(url: str) -> bool:
            # A candidate counts as already handled when a string message
            # contains its URL and, for non-search models, the model name too.
            return any(
                isinstance(message.get("content"), str)
                and url in message["content"]
                and (model == "search" or model in message["content"])
                for message in messages
            )

        pending = [
            candidate for candidate in candidates
            if not mentioned_in_messages(candidate['url'])
        ]

        if model == "search":
            yield YouTubeResponse([item["id"] for item in pending[:5]], True)
        elif pending:
            video_url = pending[0]['url']
            path = await provider.download(video_url, model=model, output_dir=get_media_dir())
            media_url = f"/media/{os.path.basename(path)}"
            if path.endswith('.mp3'):
                yield AudioResponse(media_url)
            else:
                yield VideoResponse(media_url, prompt)
            yield f"\n\n[{video_url}]({video_url})\n\n"
|
|
|
|
class YouTubeProvider:
    """
    Search and download YouTube videos through ``yt_dlp``.

    Supported ``model`` values for :meth:`download` are "mp3" (audio only)
    and "1080p", "720p", "480p" (video capped at that height).
    """

    # yt-dlp format selectors per video model. Every entry caps the height
    # with "<=" so a video lacking the exact resolution still downloads.
    _VIDEO_FORMATS = {
        "1080p": "bestvideo[height<=1080]+bestaudio/best[height<=1080]",
        "720p": "bestvideo[height<=720]+bestaudio/best[height<=720]",
        "480p": "bestvideo[height<=480]+bestaudio/best[height<=480]",
    }

    def __init__(self):
        pass

    async def search(self, query: str, max_results: int = 5) -> list[dict]:
        """
        Search YouTube for videos matching the query.

        :param query: Free-text search terms
        :param max_results: Maximum number of results requested from yt_dlp
        :return: A list of dicts with keys: title, url, id, duration
        """
        import asyncio

        ydl_opts = {
            'quiet': True,
            'extract_flat': True,
            'skip_download': True,
        }
        search_url = f"ytsearch{max_results}:{query}"

        def _extract():
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                return ydl.extract_info(search_url, download=False)

        # yt_dlp is blocking; run it in a worker thread so the event loop
        # keeps serving other tasks while the search is in flight.
        info = await asyncio.get_running_loop().run_in_executor(None, _extract)

        results = []
        for entry in (info or {}).get('entries') or []:
            # Skip empty entries and entries without an id: no valid URL
            # can be built for them.
            if not entry or not entry.get('id'):
                continue
            results.append({
                'title': entry.get('title'),
                'url': f"https://www.youtube.com/watch?v={entry.get('id')}",
                'id': entry.get('id'),
                'duration': entry.get('duration'),
            })
        return results

    async def download(self, video_url: str, model: str = "720p", output_dir: str = ".") -> str:
        """
        Download a YouTube video.

        :param video_url: The video URL or video id
        :param model: "mp3" for audio, or "1080p" / "720p" / "480p" for video
        :param output_dir: Download location
        :return: The path to the downloaded file
        :raises ValueError: If ``model`` is not one of the supported values
        """
        import asyncio

        # Validate and build options first so a bad model fails before any
        # network access.
        ydl_opts = self._download_options(model, output_dir)

        def _fetch() -> str:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                # A single extract_info(download=True) both downloads and
                # returns the metadata needed to compute the file name.
                info = ydl.extract_info(video_url, download=True)
                return ydl.prepare_filename(info)

        # Blocking download runs in a worker thread, off the event loop.
        prepared_path = await asyncio.get_running_loop().run_in_executor(None, _fetch)
        return self._final_path(prepared_path, model)

    @classmethod
    def _download_options(cls, model: str, output_dir: str) -> dict:
        """Build the yt_dlp options for ``model``; raise ValueError if unsupported."""
        if model != "mp3" and model not in cls._VIDEO_FORMATS:
            raise ValueError(
                f"model must be one of 'mp3', '1080p', '720p', '480p'; got {model!r}"
            )
        # Video files carry the resolution in their name so different
        # qualities of the same title do not overwrite each other.
        suffix = '' if model == 'mp3' else (' ' + model)
        options = {
            'outtmpl': f"{output_dir}/%(title)s{suffix}.%(ext)s",
            'quiet': True,
        }
        if model == "mp3":
            # Audio only, best quality, converted to mp3 after download.
            options.update({
                'format': 'bestaudio/best',
                'postprocessors': [{
                    'key': 'FFmpegExtractAudio',
                    'preferredcodec': 'mp3',
                    'preferredquality': '192'
                }]
            })
        else:
            options.update({
                'format': cls._VIDEO_FORMATS[model],
                'merge_output_format': 'mp4',
            })
        return options

    @staticmethod
    def _final_path(prepared_path: str, model: str) -> str:
        """Map the pre-postprocessing file name to the file left on disk."""
        if model != "mp3":
            return prepared_path
        # The audio extractor replaces the source container extension
        # (webm, m4a, ...) with .mp3, whatever the original extension was.
        return os.path.splitext(prepared_path)[0] + ".mp3"
|
|
|
|
# Example usage (async function to test)
|
|
|
|
async def demo():
    """Search for "Python programming" and download the first hit as mp3."""
    youtube = YouTubeProvider()
    found = await youtube.search("Python programming", max_results=2)
    print("Search results:", found)
    if not found:
        # Nothing to download when the search came back empty.
        return
    first_url = found[0]['url']
    saved_path = await youtube.download(first_url, model="mp3")
    print("Downloaded to:", saved_path)
|
|
|
|
# To actually run demo()
|
|
# asyncio.run(demo())
|