from __future__ import annotations import json from typing import Union from aiohttp import ClientResponse from requests import Response as RequestsResponse from ..errors import ResponseStatusError, RateLimitError, MissingAuthError, CloudflareError from . import Response, StreamResponse def is_cloudflare(text: str) -> bool: if "Generated by cloudfront" in text or '
' in text: return True elif "
Unable to load site
" in text or 'id="challenge-error-text"' in text async def raise_for_status_async(response: Union[StreamResponse, ClientResponse], message: str = None): if response.ok: return is_html = False if message is None: content_type = response.headers.get("content-type", "") if content_type.startswith("application/json"): try: message = await response.json() error = message.get("error") if isinstance(error, dict): message = error.get("message") else: message = message.get("message", message) if isinstance(error, str): message = f"{error}: {message}" except json.JSONDecodeError: message = await response.text() else: message = (await response.text()).strip() is_html = content_type.startswith("text/html") or message.startswith("