diff --git a/g4f/__init__.py b/g4f/__init__.py index 6cd2606a..b3d7d2bb 100644 --- a/g4f/__init__.py +++ b/g4f/__init__.py @@ -13,35 +13,38 @@ from .providers.types import ProviderType from .providers.helper import concat_chunks, async_concat_chunks from .client.service import get_model_and_provider -#Configure "g4f" logger -logger = logging.getLogger(__name__) -log_handler = logging.StreamHandler() -log_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) -logger.addHandler(log_handler) - +# Configure logger +logger = logging.getLogger("g4f") +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT)) +logger.addHandler(handler) logger.setLevel(logging.ERROR) + class ChatCompletion: @staticmethod - def create(model : Union[Model, str], - messages : Messages, - provider : Union[ProviderType, str, None] = None, - stream : bool = False, - image : ImageType = None, - image_name: Optional[str] = None, - ignore_working: bool = False, - ignore_stream: bool = False, - **kwargs) -> Union[CreateResult, str]: + def _prepare_request(model: Union[Model, str], + messages: Messages, + provider: Union[ProviderType, str, None], + stream: bool, + image: ImageType, + image_name: Optional[str], + ignore_working: bool, + ignore_stream: bool, + **kwargs): + """Shared pre-processing for sync/async create methods.""" if image is not None: kwargs["media"] = [(image, image_name)] elif "images" in kwargs: kwargs["media"] = kwargs.pop("images") + model, provider = get_model_and_provider( model, provider, stream, ignore_working, ignore_stream, has_images="media" in kwargs, ) + if "proxy" not in kwargs: proxy = os.environ.get("G4F_PROXY") if proxy: @@ -49,36 +52,40 @@ class ChatCompletion: if ignore_stream: kwargs["ignore_stream"] = True - result = provider.create_function(model, messages, stream=stream, **kwargs) + return model, provider, kwargs + @staticmethod + def create(model: Union[Model, str], + messages: Messages, + provider: Union[ProviderType, str, None] = None, + stream: bool = False, + image: ImageType = None, + image_name: Optional[str] = None, + ignore_working: bool = False, + ignore_stream: bool = False, + **kwargs) -> Union[CreateResult, str]: + model, provider, kwargs = ChatCompletion._prepare_request( + model, messages, provider, stream, image, image_name, + ignore_working, ignore_stream, **kwargs + ) + result = provider.create_function(model, messages, stream=stream, **kwargs) return result if stream or ignore_stream else concat_chunks(result) @staticmethod - def create_async(model : Union[Model, str], - messages : Messages, - provider : Union[ProviderType, str, None] = None, - stream : bool = False, - image : ImageType = None, + def create_async(model: Union[Model, str], + messages: Messages, + provider: Union[ProviderType, str, None] = None, + stream: bool = False, + image: ImageType = None, image_name: Optional[str] = None, - ignore_stream: bool = False, ignore_working: bool = False, + ignore_stream: bool = False, **kwargs) -> Union[AsyncResult, Coroutine[str]]: - if image is not None: - kwargs["media"] = [(image, image_name)] - elif "images" in kwargs: - kwargs["media"] = kwargs.pop("images") - model, provider = get_model_and_provider(model, provider, False, ignore_working, has_images="media" in kwargs) - if "proxy" not in kwargs: - proxy = os.environ.get("G4F_PROXY") - if proxy: - kwargs["proxy"] = proxy - if ignore_stream: - kwargs["ignore_stream"] = True - + model, provider, kwargs = ChatCompletion._prepare_request( + model, messages, provider, stream, image, image_name, + ignore_working, ignore_stream, **kwargs + ) result = provider.async_create_function(model, messages, stream=stream, **kwargs) - - if not stream and not ignore_stream: - if hasattr(result, "__aiter__"): - result = async_concat_chunks(result) - + if not stream and not ignore_stream and hasattr(result, "__aiter__"): + result = async_concat_chunks(result) return result \ No newline at end of file diff --git a/g4f/config.py b/g4f/config.py index e3af2ac3..30503b88 100644 --- a/g4f/config.py +++ b/g4f/config.py @@ -1,16 +1,19 @@ +from __future__ import annotations + import os import sys from pathlib import Path +from functools import lru_cache -# Platform-appropriate directories +@lru_cache(maxsize=1) def get_config_dir() -> Path: """Get platform-appropriate config directory.""" if sys.platform == "win32": return Path(os.environ.get("APPDATA", Path.home() / "AppData" / "Roaming")) elif sys.platform == "darwin": return Path.home() / "Library" / "Application Support" - else: # Linux and other UNIX-like - return Path.home() / ".config" + return Path.home() / ".config" + PACKAGE_NAME = "g4f" CONFIG_DIR = get_config_dir() / PACKAGE_NAME diff --git a/g4f/cookies.py b/g4f/cookies.py index 87716543..586975d5 100644 --- a/g4f/cookies.py +++ b/g4f/cookies.py @@ -3,12 +3,14 @@ from __future__ import annotations import os import time import json +from typing import Optional, List try: from platformdirs import user_config_dir has_platformdirs = True except ImportError: has_platformdirs = False + try: from browser_cookie3 import ( chrome, chromium, opera, opera_gx, @@ -19,12 +21,6 @@ try: def g4f(domain_name: str) -> list: """ Load cookies from the 'g4f' browser (if exists). - - Args: - domain_name (str): The domain for which to load cookies. - - Returns: - list: List of cookies. """ if not has_platformdirs: return [] @@ -32,7 +28,7 @@ try: cookie_file = os.path.join(user_data_dir, "Default", "Cookies") return [] if not os.path.exists(cookie_file) else chrome(cookie_file, domain_name) - browsers = [ + BROWSERS = [ g4f, firefox, chrome, chromium, opera, opera_gx, brave, edge, vivaldi, @@ -40,43 +36,38 @@ try: has_browser_cookie3 = True except ImportError: has_browser_cookie3 = False - browsers = [] + BROWSERS: List = [] from .typing import Dict, Cookies from .errors import MissingRequirementsError from .config import COOKIES_DIR, CUSTOM_COOKIES_DIR from . import debug -class CookiesConfig(): +class CookiesConfig: cookies: Dict[str, Cookies] = {} cookies_dir: str = CUSTOM_COOKIES_DIR if os.path.exists(CUSTOM_COOKIES_DIR) else str(COOKIES_DIR) -DOMAINS = [ + +DOMAINS = ( ".bing.com", ".meta.ai", ".google.com", "www.whiterabbitneo.com", "huggingface.co", - ".huggingface.co" + ".huggingface.co", "chat.reka.ai", "chatgpt.com", ".cerebras.ai", "github.com", -] +) -if has_browser_cookie3 and os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null": +if has_browser_cookie3 and os.environ.get("DBUS_SESSION_BUS_ADDRESS") == "/dev/null": _LinuxPasswordManager.get_password = lambda a, b: b"secret" -def get_cookies(domain_name: str, raise_requirements_error: bool = True, single_browser: bool = False, cache_result: bool = True) -> Dict[str, str]: - """ - Load cookies for a given domain from all supported browsers and cache the results. - Args: - domain_name (str): The domain for which to load cookies. - - Returns: - Dict[str, str]: A dictionary of cookie names and values. - """ +def get_cookies(domain_name: str, raise_requirements_error: bool = True, + single_browser: bool = False, cache_result: bool = True) -> Dict[str, str]: + """Load cookies for a given domain from all supported browsers.""" if domain_name in CookiesConfig.cookies: return CookiesConfig.cookies[domain_name] @@ -85,120 +76,134 @@ def get_cookies(domain_name: str, raise_requirements_error: bool = True, single_ CookiesConfig.cookies[domain_name] = cookies return cookies + def set_cookies(domain_name: str, cookies: Cookies = None) -> None: + """Set or remove cookies for a given domain in the cache.""" if cookies: CookiesConfig.cookies[domain_name] = cookies - elif domain_name in CookiesConfig.cookies: - CookiesConfig.cookies.pop(domain_name) + else: + CookiesConfig.cookies.pop(domain_name, None) -def load_cookies_from_browsers(domain_name: str, raise_requirements_error: bool = True, single_browser: bool = False) -> Cookies: - """ - Helper function to load cookies from various browsers. - Args: - domain_name (str): The domain for which to load cookies. - - Returns: - Dict[str, str]: A dictionary of cookie names and values. - """ +def load_cookies_from_browsers(domain_name: str, + raise_requirements_error: bool = True, + single_browser: bool = False) -> Cookies: + """Helper to load cookies from all supported browsers.""" if not has_browser_cookie3: if raise_requirements_error: raise MissingRequirementsError('Install "browser_cookie3" package') return {} + cookies = {} - for cookie_fn in browsers: + for cookie_fn in BROWSERS: try: cookie_jar = cookie_fn(domain_name=domain_name) - if len(cookie_jar): + if cookie_jar: debug.log(f"Read cookies from {cookie_fn.__name__} for {domain_name}") for cookie in cookie_jar: - if cookie.name not in cookies: - if not cookie.expires or cookie.expires > time.time(): - cookies[cookie.name] = cookie.value - if single_browser and len(cookie_jar): + if cookie.name not in cookies and (not cookie.expires or cookie.expires > time.time()): + cookies[cookie.name] = cookie.value + if single_browser and cookie_jar: break except BrowserCookieError: pass + except KeyboardInterrupt: + debug.error("Cookie loading interrupted by user.") + break except Exception as e: debug.error(f"Error reading cookies from {cookie_fn.__name__} for {domain_name}: {e}") return cookies -def set_cookies_dir(dir: str) -> None: - CookiesConfig.cookies_dir = dir + +def set_cookies_dir(dir_path: str) -> None: + CookiesConfig.cookies_dir = dir_path + def get_cookies_dir() -> str: return CookiesConfig.cookies_dir -def read_cookie_files(dirPath: str = None): - dirPath = CookiesConfig.cookies_dir if dirPath is None else dirPath - if not os.access(dirPath, os.R_OK): - debug.log(f"Read cookies: {dirPath} dir is not readable") + +def _parse_har_file(path: str) -> Dict[str, Dict[str, str]]: + """Parse a HAR file and return cookies by domain.""" + cookies_by_domain = {} + try: + with open(path, "rb") as file: + har_file = json.load(file) + debug.log(f"Read .har file: {path}") + + def get_domain(entry: dict) -> Optional[str]: + headers = entry["request"].get("headers", []) + host_values = [h["value"] for h in headers if h["name"].lower() in ("host", ":authority")] + if not host_values: + return None + host = host_values.pop() + return next((d for d in DOMAINS if d in host), None) + + for entry in har_file.get("log", {}).get("entries", []): + domain = get_domain(entry) + if domain: + v_cookies = {c["name"]: c["value"] for c in entry["request"].get("cookies", [])} + if v_cookies: + cookies_by_domain[domain] = v_cookies + except (json.JSONDecodeError, FileNotFoundError): + pass + return cookies_by_domain + + +def _parse_json_cookie_file(path: str) -> Dict[str, Dict[str, str]]: + """Parse a JSON cookie export file.""" + cookies_by_domain = {} + try: + with open(path, "rb") as file: + cookie_file = json.load(file) + if not isinstance(cookie_file, list): + return {} + debug.log(f"Read cookie file: {path}") + for c in cookie_file: + if isinstance(c, dict) and "domain" in c: + cookies_by_domain.setdefault(c["domain"], {})[c["name"]] = c["value"] + except (json.JSONDecodeError, FileNotFoundError): + pass + return cookies_by_domain + + +def read_cookie_files(dir_path: Optional[str] = None, domains_filter: Optional[List[str]] = None) -> None: + """ + Load cookies from .har and .json files in a directory. + """ + dir_path = dir_path or CookiesConfig.cookies_dir + if not os.access(dir_path, os.R_OK): + debug.log(f"Read cookies: {dir_path} dir is not readable") return + # Optionally load environment variables try: from dotenv import load_dotenv - load_dotenv(os.path.join(dirPath, ".env"), override=True) - debug.log(f"Read cookies: Loaded environment variables from {dirPath}/.env") + load_dotenv(os.path.join(dir_path, ".env"), override=True) + debug.log(f"Read cookies: Loaded env vars from {dir_path}/.env") except ImportError: - debug.error("Warning: 'python-dotenv' is not installed. Environment variables will not be loaded.") + debug.error("Warning: 'python-dotenv' is not installed. Env vars not loaded.") - def get_domain(v: dict) -> str: - host = [h["value"] for h in v['request']['headers'] if h["name"].lower() in ("host", ":authority")] - if not host: - return - host = host.pop() - for d in DOMAINS: - if d in host: - return d - - harFiles = [] - cookieFiles = [] - for root, _, files in os.walk(dirPath): + har_files, json_files = [], [] + for root, _, files in os.walk(dir_path): for file in files: if file.endswith(".har"): - harFiles.append(os.path.join(root, file)) + har_files.append(os.path.join(root, file)) elif file.endswith(".json"): - cookieFiles.append(os.path.join(root, file)) - break + json_files.append(os.path.join(root, file)) + break # Do not recurse - CookiesConfig.cookies = {} - for path in harFiles: - with open(path, 'rb') as file: - try: - harFile = json.load(file) - except json.JSONDecodeError: - # Error: not a HAR file! - continue - debug.log(f"Read .har file: {path}") - new_cookies = {} - for v in harFile['log']['entries']: - domain = get_domain(v) - if domain is None: - continue - v_cookies = {} - for c in v['request']['cookies']: - v_cookies[c['name']] = c['value'] - if len(v_cookies) > 0: - CookiesConfig.cookies[domain] = v_cookies - new_cookies[domain] = len(v_cookies) - for domain, new_values in new_cookies.items(): - debug.log(f"Cookies added: {new_values} from {domain}") - for path in cookieFiles: - with open(path, 'rb') as file: - try: - cookieFile = json.load(file) - except json.JSONDecodeError: - # Error: not a json file! - continue - if not isinstance(cookieFile, list) or not isinstance(cookieFile[0], dict) or "domain" not in cookieFile[0]: - continue - debug.log(f"Read cookie file: {path}") - new_cookies = {} - for c in cookieFile: - if isinstance(c, dict) and "domain" in c: - if c["domain"] not in new_cookies: - new_cookies[c["domain"]] = {} - new_cookies[c["domain"]][c["name"]] = c["value"] - for domain, new_values in new_cookies.items(): - CookiesConfig.cookies[domain] = new_values - debug.log(f"Cookies added: {len(new_values)} from {domain}") \ No newline at end of file + CookiesConfig.cookies.clear() + + # Load cookies from files + for path in har_files: + for domain, cookies in _parse_har_file(path).items(): + if not domains_filter or domain in domains_filter: + CookiesConfig.cookies[domain] = cookies + debug.log(f"Cookies added: {len(cookies)} from {domain}") + + for path in json_files: + for domain, cookies in _parse_json_cookie_file(path).items(): + if not domains_filter or domain in domains_filter: + CookiesConfig.cookies[domain] = cookies + debug.log(f"Cookies added: {len(cookies)} from {domain}") \ No newline at end of file diff --git a/g4f/debug.py b/g4f/debug.py index e7a5691d..f31bad44 100644 --- a/g4f/debug.py +++ b/g4f/debug.py @@ -4,15 +4,35 @@ from typing import Callable, List, Optional, Any logging: bool = False version_check: bool = True version: Optional[str] = None -log_handler: Callable = print # More specifically: Callable[[Any, Optional[Any]], None] +log_handler: Callable[..., None] = print logs: List[str] = [] + +def enable_logging(handler: Callable[..., None] = print) -> None: + """Enable debug logging with optional handler.""" + global logging, log_handler + logging = True + log_handler = handler + + +def disable_logging() -> None: + """Disable debug logging.""" + global logging + logging = False + + def log(*text: Any, file: Optional[Any] = None) -> None: """Log a message if logging is enabled.""" if logging: + message = " ".join(map(str, text)) + logs.append(message) log_handler(*text, file=file) -def error(*error: Any, name: Optional[str] = None) -> None: + +def error(*error_args: Any, name: Optional[str] = None) -> None: """Log an error message to stderr.""" - error = [e if isinstance(e, str) else f"{type(e).__name__ if name is None else name}: {e}" for e in error] - log(*error, file=sys.stderr) + formatted_errors = [ + e if isinstance(e, str) else f"{name or type(e).__name__}: {e}" + for e in error_args + ] + log(*formatted_errors, file=sys.stderr) \ No newline at end of file diff --git a/g4f/errors.py b/g4f/errors.py index fea89fb4..b14d8774 100644 --- a/g4f/errors.py +++ b/g4f/errors.py @@ -1,59 +1,103 @@ -class ProviderNotFoundError(Exception): - ... +class G4FError(Exception): + """Base exception for all g4f-related errors.""" + pass -class ProviderNotWorkingError(Exception): - ... -class StreamNotSupportedError(Exception): - ... +class ProviderNotFoundError(G4FError): + """Raised when a provider is not found.""" + pass -class ModelNotFoundError(Exception): - ... -class ModelNotAllowedError(Exception): - ... +class ProviderNotWorkingError(G4FError): + """Raised when the provider is unavailable or failing.""" + pass -class RetryProviderError(Exception): - ... -class RetryNoProviderError(Exception): - ... +class StreamNotSupportedError(G4FError): + """Raised when the requested provider does not support streaming.""" + pass -class VersionNotFoundError(Exception): - ... -class MissingRequirementsError(Exception): - ... +class ModelNotFoundError(G4FError): + """Raised when a model is not found.""" + pass + + +class ModelNotAllowedError(G4FError): + """Raised when a model is not allowed by configuration or policy.""" + pass + + +class RetryProviderError(G4FError): + """Raised to retry with another provider.""" + pass + + +class RetryNoProviderError(G4FError): + """Raised when there are no providers left to retry.""" + pass + + +class VersionNotFoundError(G4FError): + """Raised when the version could not be determined.""" + pass + + +class MissingRequirementsError(G4FError): + """Raised when a required dependency is missing.""" + pass + class NestAsyncioError(MissingRequirementsError): - ... + """Raised when 'nest_asyncio' is missing.""" + pass -class MissingAuthError(Exception): - ... -class PaymentRequiredError(Exception): - ... +class MissingAuthError(G4FError): + """Raised when authentication details are missing.""" + pass -class NoMediaResponseError(Exception): - ... -class ResponseError(Exception): - ... +class PaymentRequiredError(G4FError): + """Raised when a provider requires payment before access.""" + pass + + +class NoMediaResponseError(G4FError): + """Raised when a media request returns no response.""" + pass + + +class ResponseError(G4FError): + """Base class for response-related errors.""" + pass + + +class ResponseStatusError(ResponseError): + """Raised when an HTTP response returns a non-success status code.""" + pass -class ResponseStatusError(Exception): - ... class CloudflareError(ResponseStatusError): - ... + """Raised when a request is blocked by Cloudflare.""" + pass + class RateLimitError(ResponseStatusError): - ... + """Raised when the provider's rate limit has been exceeded.""" + pass -class NoValidHarFileError(Exception): - ... -class TimeoutError(Exception): +class NoValidHarFileError(G4FError): + """Raised when no valid HAR file is found.""" + pass + + +class TimeoutError(G4FError): """Raised for timeout errors during API requests.""" + pass -class ConversationLimitError(Exception): - """Raised for conversation limit during API requests to AI endpoint.""" \ No newline at end of file + +class ConversationLimitError(G4FError): + """Raised when a conversation limit is reached on the provider.""" + pass \ No newline at end of file diff --git a/g4f/files.py b/g4f/files.py index 840ceb9c..0f0ee0c9 100644 --- a/g4f/files.py +++ b/g4f/files.py @@ -1,26 +1,34 @@ from __future__ import annotations import re -from urllib.parse import unquote import os +from urllib.parse import unquote from .cookies import get_cookies_dir -def secure_filename(filename: str) -> str: + +def secure_filename(filename: str, max_length: int = 100) -> str: + """Sanitize a filename for safe filesystem storage.""" if filename is None: return None - # Keep letters, numbers, basic punctuation and all Unicode chars + + # Keep letters, numbers, basic punctuation, underscores filename = re.sub( - r'[^\w.,_+-]+', - '_', + r"[^\w.,_+\-]+", + "_", unquote(filename).strip(), flags=re.UNICODE ) - encoding = 'utf-8' - max_length = 100 + encoding = "utf-8" encoded = filename.encode(encoding)[:max_length] - decoded = encoded.decode(encoding, 'ignore') + decoded = encoded.decode(encoding, "ignore") return decoded.strip(".,_+-") -def get_bucket_dir(*parts): - return os.path.join(get_cookies_dir(), "buckets", *[secure_filename(part) for part in parts if part]) + +def get_bucket_dir(*parts: str) -> str: + """Return a path under the cookies 'buckets' directory with sanitized parts.""" + return os.path.join( + get_cookies_dir(), + "buckets", + *[secure_filename(part) for part in parts if part] + ) \ No newline at end of file diff --git a/g4f/typing.py b/g4f/typing.py index 1300543f..86c67c20 100644 --- a/g4f/typing.py +++ b/g4f/typing.py @@ -1,42 +1,89 @@ -import os -from typing import Any, AsyncGenerator, Generator, AsyncIterator, Iterator, NewType, Tuple, Union, List, Dict, Type, IO, Optional, TypedDict +from __future__ import annotations -try: - from PIL.Image import Image -except ImportError: - class Image: +import os +from typing import ( + Any, + AsyncGenerator, + Generator, + AsyncIterator, + Iterator, + NewType, + Tuple, + Union, + List, + Dict, + Type, + IO, + Optional, + TypedDict, + TYPE_CHECKING, +) + +# Only import PIL for type-checkers; no runtime dependency required. +if TYPE_CHECKING: + from PIL.Image import Image as PILImage +else: + class PILImage: # minimal placeholder to avoid runtime import errors pass +# Response chunk type from providers from .providers.response import ResponseType -SHA256 = NewType('sha_256_hash', str) +# ---- Hashes & cookie aliases ------------------------------------------------- + +SHA256 = NewType("SHA256", str) +Cookies = Dict[str, str] + +# ---- Streaming result types -------------------------------------------------- + CreateResult = Iterator[Union[str, ResponseType]] AsyncResult = AsyncIterator[Union[str, ResponseType]] -Messages = List[Dict[str, Union[str, List[Dict[str, Union[str, Dict[str, str]]]]]]] -Cookies = Dict[str, str] -ImageType = Union[str, bytes, IO, Image, os.PathLike] + +# ---- Message schema ---------------------------------------------------------- +# Typical message structure: +# {"role": "user" | "assistant" | "system" | "tool", "content": str | [ContentPart, ...]} +# where content parts can be text or (optionally) structured pieces like images. + +class ContentPart(TypedDict, total=False): + type: str # e.g., "text", "image_url", etc. + text: str # present when type == "text" + image_url: Dict[str, str] # present when type == "image_url" + +class Message(TypedDict): + role: str + content: Union[str, List[ContentPart]] + +Messages = List[Message] + +# ---- Media inputs ------------------------------------------------------------ + +# Paths, raw bytes, file-like objects, or PIL Image objects are accepted. +ImageType = Union[str, bytes, IO[bytes], PILImage, os.PathLike] MediaListType = List[Tuple[ImageType, Optional[str]]] __all__ = [ - 'Any', - 'AsyncGenerator', - 'Generator', - 'AsyncIterator', - 'Iterator' - 'Tuple', - 'Union', - 'List', - 'Dict', - 'Type', - 'IO', - 'Optional', - 'TypedDict', - 'SHA256', - 'CreateResult', - 'AsyncResult', - 'Messages', - 'Cookies', - 'Image', - 'ImageType', - 'MediaListType' -] + "Any", + "AsyncGenerator", + "Generator", + "AsyncIterator", + "Iterator", + "Tuple", + "Union", + "List", + "Dict", + "Type", + "IO", + "Optional", + "TypedDict", + "SHA256", + "CreateResult", + "AsyncResult", + "Messages", + "Message", + "ContentPart", + "Cookies", + "Image", + "ImageType", + "MediaListType", + "ResponseType", +] \ No newline at end of file diff --git a/g4f/version.py b/g4f/version.py index bc97421a..7d0a6d1d 100644 --- a/g4f/version.py +++ b/g4f/version.py @@ -1,102 +1,114 @@ from __future__ import annotations -from os import environ import requests -from functools import cached_property +from os import environ +from functools import cached_property, lru_cache from importlib.metadata import version as get_package_version, PackageNotFoundError from subprocess import check_output, CalledProcessError, PIPE + from .errors import VersionNotFoundError from .config import PACKAGE_NAME, GITHUB_REPOSITORY from . import debug +# Default request timeout (seconds) +REQUEST_TIMEOUT = 5 + + +@lru_cache(maxsize=1) def get_pypi_version(package_name: str) -> str: """ Retrieves the latest version of a package from PyPI. - Args: - package_name (str): The name of the package for which to retrieve the version. - - Returns: - str: The latest version of the specified package from PyPI. - Raises: - VersionNotFoundError: If there is an error in fetching the version from PyPI. + VersionNotFoundError: If there is a network or parsing error. """ try: - response = requests.get(f"https://pypi.org/pypi/{package_name}/json").json() - return response["info"]["version"] + response = requests.get( + f"https://pypi.org/pypi/{package_name}/json", + timeout=REQUEST_TIMEOUT + ) + response.raise_for_status() + return response.json()["info"]["version"] except requests.RequestException as e: - raise VersionNotFoundError(f"Failed to get PyPI version: {e}") + raise VersionNotFoundError( + f"Failed to get PyPI version for '{package_name}'" + ) from e + +@lru_cache(maxsize=1) def get_github_version(repo: str) -> str: """ Retrieves the latest release version from a GitHub repository. - Args: - repo (str): The name of the GitHub repository. - - Returns: - str: The latest release version from the specified GitHub repository. - Raises: - VersionNotFoundError: If there is an error in fetching the version from GitHub. + VersionNotFoundError: If there is a network or parsing error. """ try: - response = requests.get(f"https://api.github.com/repos/{repo}/releases/latest") + response = requests.get( + f"https://api.github.com/repos/{repo}/releases/latest", + timeout=REQUEST_TIMEOUT + ) response.raise_for_status() - return response.json()["tag_name"] + data = response.json() + if "tag_name" not in data: + raise VersionNotFoundError(f"No tag_name found in latest GitHub release for '{repo}'") + return data["tag_name"] except requests.RequestException as e: - raise VersionNotFoundError(f"Failed to get GitHub release version: {e}") + raise VersionNotFoundError( + f"Failed to get GitHub release version for '{repo}'" + ) from e -def get_git_version() -> str: - # Read from git repository + +def get_git_version() -> str | None: + """Return latest Git tag if available, else None.""" try: - command = ["git", "describe", "--tags", "--abbrev=0"] - return check_output(command, text=True, stderr=PIPE).strip() + return check_output( + ["git", "describe", "--tags", "--abbrev=0"], + text=True, + stderr=PIPE + ).strip() except CalledProcessError: return None + class VersionUtils: """ Utility class for managing and comparing package versions of 'g4f'. """ + @cached_property def current_version(self) -> str: """ - Retrieves the current version of the 'g4f' package. - - Returns: - str: The current version of 'g4f'. - - Raises: - VersionNotFoundError: If the version cannot be determined from the package manager, - Docker environment, or git repository. + Returns the current installed version of g4f from: + - debug override + - package metadata + - environment variable (Docker) + - git tags """ if debug.version: return debug.version - # Read from package manager try: return get_package_version(PACKAGE_NAME) except PackageNotFoundError: pass - # Read from docker environment - version = environ.get("G4F_VERSION") - if version: - return version + version_env = environ.get("G4F_VERSION") + if version_env: + return version_env - return get_git_version() + git_version = get_git_version() + if git_version: + return git_version + + raise VersionNotFoundError("Could not determine current g4f version.") @property def latest_version(self) -> str: """ - Retrieves the latest version of the 'g4f' package. - - Returns: - str: The latest version of 'g4f'. + Returns the latest available version of g4f. + If not installed via PyPI, falls back to GitHub releases. """ - # Is installed via package manager? try: get_package_version(PACKAGE_NAME) except PackageNotFoundError: @@ -107,17 +119,30 @@ class VersionUtils: def latest_version_cached(self) -> str: return self.latest_version - def check_version(self) -> None: + def check_version(self, silent: bool = False) -> bool: """ - Checks if the current version of 'g4f' is up to date with the latest version. - - Note: - If a newer version is available, it prints a message with the new version and update instructions. + Checks if the current version is up-to-date. + Returns: + bool: True if current version is the latest, False otherwise. """ try: - if self.current_version != self.latest_version: - print(f'New g4f version: {self.latest_version} (current: {self.current_version}) | pip install -U g4f') + current = self.current_version + latest = self.latest_version + up_to_date = current == latest + if not silent: + if up_to_date: + print(f"g4f is up-to-date (version {current}).") + else: + print( + f"New g4f version available: {latest} " + f"(current: {current}) | pip install -U g4f" + ) + return up_to_date except Exception as e: - print(f'Failed to check g4f version: {e}') + if not silent: + print(f"Failed to check g4f version: {e}") + return True # Assume up-to-date if check fails -utils = VersionUtils() + +# Singleton instance +utils = VersionUtils() \ No newline at end of file