gpt4free/g4f/image/copy_images.py

from __future__ import annotations
import os
import time
import uuid
import asyncio
import hashlib
import re
from urllib.parse import quote, unquote
from aiohttp import ClientSession, ClientError
from ..typing import Optional, Cookies
from ..requests.aiohttp import get_connector
from ..Provider.template import BackendApi
from . import is_accepted_format, extract_data_uri
from .. import debug
# Directory for storing generated images
images_dir = "./generated_images"
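# Note: this path is relative to the process working directory, so the folder
# is created wherever the server is started from.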

def get_image_extension(image: str) -> str:
    """Extract the image extension from a URL or filename, defaulting to .jpg"""
    match = re.search(r"\.(jpe?g|png|webp)$", image, re.IGNORECASE)
    return f".{match.group(1).lower()}" if match else ".jpg"

def ensure_images_dir():
    """Create the images directory if it doesn't exist"""
    os.makedirs(images_dir, exist_ok=True)

def get_source_url(image: str, default: Optional[str] = None) -> Optional[str]:
    """Extract the original URL from an image parameter, if present"""
    if "url=" in image:
        decoded_url = unquote(image.split("url=", 1)[1])
        if decoded_url.startswith(("http://", "https://")):
            return decoded_url
    return default
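
# Example of the round-trip this helper enables (hypothetical local URL of the
# kind produced by copy_images below):
#
#     get_source_url("/images/123.jpg?url=https%3A%2F%2Fhost%2Fa.jpg")
#     # -> "https://host/a.jpg"
#     get_source_url("/images/123.jpg")  # -> None (no url= parameter)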

async def copy_images(
    images: list[str],
    cookies: Optional[Cookies] = None,
    headers: Optional[dict] = None,
    proxy: Optional[str] = None,
    alt: Optional[str] = None,
    add_url: bool = True,
    target: Optional[str] = None,
    ssl: Optional[bool] = None
) -> list[str]:
    """
    Download and store images locally with Unicode-safe filenames.

    Returns a list of relative image URLs.
    """
    if add_url:
        add_url = not cookies
    ensure_images_dir()
    async with ClientSession(
        connector=get_connector(proxy=proxy),
        cookies=cookies,
        headers=headers,
    ) as session:
        async def copy_image(image: str, target: Optional[str] = None) -> str:
            """Process an individual image and return its local URL"""
            target_path = target
            if target_path is None:
                # Generate filename components
                file_hash = hashlib.sha256(image.encode()).hexdigest()[:16]
                timestamp = int(time.time())

                # Sanitize alt text for filename (Unicode-safe)
                if alt:
                    clean_alt = re.sub(
                        r'[^\w\s.-]',  # keep word chars, whitespace, dots, hyphens
                        '_',
                        unquote(alt).strip(),
                        flags=re.UNICODE  # \w matches Unicode word characters
                    )
                    clean_alt = re.sub(r'[\s_]+', '_', clean_alt)[:100]
                else:
                    clean_alt = "image"

                # Build a safe filename with full Unicode support
                extension = get_image_extension(image)
                filename = (
                    f"{timestamp}_"
                    f"{clean_alt}_"
                    f"{file_hash}"
                    f"{extension}"
                )
                target_path = os.path.join(images_dir, filename)
            try:
                # Handle different image types
                if image.startswith("data:"):
                    with open(target_path, "wb") as f:
                        f.write(extract_data_uri(image))
                else:
                    # Apply BackendApi settings if needed
                    if BackendApi.working and image.startswith(BackendApi.url):
                        request_headers = BackendApi.headers if headers is None else headers
                        request_ssl = BackendApi.ssl
                    else:
                        request_headers = headers
                        request_ssl = ssl
                    async with session.get(image, ssl=request_ssl, headers=request_headers) as response:
                        response.raise_for_status()
                        with open(target_path, "wb") as f:
                            async for chunk in response.content.iter_chunked(4096):
                                f.write(chunk)

                # Verify file format
                if target is None and not os.path.splitext(target_path)[1]:
                    with open(target_path, "rb") as f:
                        file_header = f.read(12)
                    detected_type = is_accepted_format(file_header)
                    if detected_type:
                        new_ext = f".{detected_type.split('/')[-1]}"
                        os.rename(target_path, f"{target_path}{new_ext}")
                        target_path = f"{target_path}{new_ext}"

                # Build URL with safe encoding
                url_filename = quote(os.path.basename(target_path))
                return f"/images/{url_filename}" + (
                    ('?url=' + quote(image)) if add_url and not image.startswith('data:') else ''
                )
            except (ClientError, IOError, OSError) as e:
                debug.error(f"Image copying failed: {type(e).__name__}: {e}")
                if target_path and os.path.exists(target_path):
                    os.unlink(target_path)
                return get_source_url(image, image)

        return await asyncio.gather(*[copy_image(img, target) for img in images])
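
# Minimal usage sketch (hypothetical URL and alt text; assumes this module is
# imported from within the g4f package, since it uses relative imports):
#
#     import asyncio
#     from g4f.image.copy_images import copy_images
#
#     async def main():
#         local_urls = await copy_images(
#             ["https://example.com/picture.png"],
#             alt="example picture",
#         )
#         print(local_urls)  # e.g. ["/images/<timestamp>_example_picture_<hash>.png?url=..."]
#
#     asyncio.run(main())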