gpt4free/g4f/requests/__init__.py
hlohaus 17976b93a7 ```
feat: add __call__ in field_serializer and update body wait logic

- In g4f/client/stubs.py, add a __call__ method to the field_serializer class that returns the first positional argument.
- In g4f/requests/__init__.py, replace the wait_for call with a while loop that repeatedly evaluates "document.querySelector('body:not(.no-js)')" and sleeps until the element is found.
```
2025-04-17 08:19:33 +02:00

192 lines
No EOL
6.5 KiB
Python

from __future__ import annotations
import os
import time
import random
from urllib.parse import urlparse
from typing import Iterator
from http.cookies import Morsel
from pathlib import Path
import asyncio
try:
from curl_cffi.requests import Session, Response
from .curl_cffi import StreamResponse, StreamSession, FormData
has_curl_cffi = True
except ImportError:
from typing import Type as Response
from .aiohttp import StreamResponse, StreamSession, FormData
has_curl_cffi = False
try:
import webview
has_webview = True
except ImportError:
has_webview = False
try:
import nodriver
from nodriver.cdp.network import CookieParam
from nodriver.core.config import find_chrome_executable
from nodriver import Browser, Tab, util
has_nodriver = True
except ImportError:
from typing import Type as Browser
from typing import Type as Tab
has_nodriver = False
try:
from platformdirs import user_config_dir
has_platformdirs = True
except ImportError:
has_platformdirs = False
from .. import debug
from .raise_for_status import raise_for_status
from ..errors import MissingRequirementsError
from ..typing import Cookies
from ..cookies import get_cookies_dir
from .defaults import DEFAULT_HEADERS, WEBVIEW_HAEDERS
# When curl_cffi is missing, install a stub so importing this module still
# works; only actually constructing a Session fails, with an actionable hint.
if not has_curl_cffi:
    class Session:
        # Stub replacement for curl_cffi.requests.Session.
        def __init__(self, **kwargs):
            raise MissingRequirementsError('Install "curl_cffi" package | pip install -U curl_cffi')
async def get_args_from_webview(url: str) -> dict:
    """Load *url* in a hidden webview window and collect request arguments.

    Polls until the page body drops its ``no-js`` class (i.e. the page's
    JavaScript has run), then reads user agent, language, referer and
    cookies from the window before destroying it.

    Args:
        url: The URL to open.

    Returns:
        A dict with "headers" and "cookies" keys usable for HTTP sessions.

    Raises:
        MissingRequirementsError: If the "webview" package is not installed.
    """
    if not has_webview:
        raise MissingRequirementsError('Install "webview" package')
    window = webview.create_window("", url, hidden=True)
    await asyncio.sleep(2)
    body = None
    while body is None:
        try:
            await asyncio.sleep(1)
            # The body keeps the "no-js" class until page JS has executed.
            body = window.dom.get_element("body:not(.no-js)")
        except Exception:
            # DOM may not be ready yet; keep polling.
            # (Was a bare `except:`, which also swallowed KeyboardInterrupt.)
            ...
    headers = {
        **WEBVIEW_HAEDERS,
        "User-Agent": window.evaluate_js("this.navigator.userAgent"),
        "Accept-Language": window.evaluate_js("this.navigator.language"),
        "Referer": window.real_url
    }
    # Flatten every (name, morsel) pair of every cookie. The previous
    # `list(*cookie.items())` raised TypeError for any cookie carrying
    # more than one morsel.
    cookies = {
        name: morsel.value
        for cookie in window.get_cookies()
        for name, morsel in cookie.items()
    }
    window.destroy()
    return {"headers": headers, "cookies": cookies}
def get_cookie_params_from_dict(cookies: Cookies, url: str = None, domain: str = None) -> list[CookieParam]:
    """Convert a plain name->value cookie mapping into CookieParam objects.

    Args:
        cookies: Mapping of cookie names to values.
        url: Optional URL attached to every cookie.
        domain: Optional domain attached to every cookie.

    Returns:
        A list of nodriver ``CookieParam`` instances, one per cookie.
    """
    # Bug fix: the comprehension's result was previously discarded, so the
    # function always returned None despite its annotated return type.
    return [CookieParam.from_json({
        "name": key,
        "value": value,
        "url": url,
        "domain": domain
    }) for key, value in cookies.items()]
async def get_args_from_nodriver(
    url: str,
    proxy: str = None,
    timeout: int = 120,
    wait_for: str = None,
    callback: callable = None,
    cookies: Cookies = None,
    browser: Browser = None
) -> dict:
    """Drive a nodriver-controlled browser and capture request arguments.

    Opens *url*, waits until the page body loses its "no-js" class (page
    JS has run), then returns cookies, user agent and headers needed to
    impersonate the browser with a regular HTTP client.

    Args:
        url: Page to open.
        proxy: Optional proxy server forwarded to the browser.
        timeout: Seconds used for browser startup and for `wait_for`.
        wait_for: Optional selector to additionally wait for after load.
        callback: Optional async callable invoked with the loaded page.
        cookies: Optional cookies to pre-set on the browser before loading.
        browser: Optional already-running browser to reuse; when given,
            the caller keeps ownership and it is not stopped here.

    Returns:
        Dict with "impersonate", "cookies", "headers" and "proxy" keys.
    """
    if browser is None:
        browser, stop_browser = await get_nodriver(proxy=proxy, timeout=timeout)
    else:
        # Caller supplied the browser, so stopping is a no-op.
        def stop_browser():
            ...
    try:
        if debug.logging:
            print(f"Open nodriver with url: {url}")
        domain = urlparse(url).netloc
        if cookies is None:
            cookies = {}
        else:
            # Pre-set the provided cookies on the browser for this domain.
            await browser.cookies.set_all(get_cookie_params_from_dict(cookies, url=url, domain=domain))
        page = await browser.get(url)
        user_agent = await page.evaluate("window.navigator.userAgent", return_by_value=True)
        # Poll until the body drops its "no-js" class, i.e. page JS ran.
        # NOTE(review): this loop has no upper bound — `timeout` is not
        # applied here; a page that never sheds "no-js" waits forever.
        while not await page.evaluate("document.querySelector('body:not(.no-js)')"):
            await asyncio.sleep(1)
        if wait_for is not None:
            await page.wait_for(wait_for, timeout=timeout)
        if callback is not None:
            await callback(page)
        # Read back the cookies the page set; merged over any pre-set ones.
        for c in await page.send(nodriver.cdp.network.get_cookies([url])):
            cookies[c.name] = c.value
        await page.close()
        return {
            "impersonate": "chrome",
            "cookies": cookies,
            "headers": {
                **DEFAULT_HEADERS,
                "user-agent": user_agent,
                "referer": url,
            },
            "proxy": proxy,
        }
    finally:
        # Always release the browser we started (no-op when reusing one).
        stop_browser()
def merge_cookies(cookies: Cookies, response: Response) -> Cookies:
    """Merge the cookies set by *response* into the given cookie mapping.

    Args:
        cookies: Existing name -> value mapping, or None to start fresh.
            (Annotation fixed: this was declared ``Iterator[Morsel]`` but
            is used — and created — as a plain dict.)
        response: Response whose cookies are merged in. Supports both
            curl_cffi responses (cookies expose a ``.jar``) and
            aiohttp-style mapping cookies.

    Returns:
        The merged mapping (the same dict that was passed in, or a new one).
    """
    if cookies is None:
        cookies = {}
    # curl_cffi exposes a cookiejar attribute; aiohttp a plain mapping.
    if hasattr(response.cookies, "jar"):
        for cookie in response.cookies.jar:
            cookies[cookie.name] = cookie.value
    else:
        for key, value in response.cookies.items():
            cookies[key] = value
    return cookies
async def get_nodriver(
    proxy: str = None,
    user_data_dir = "nodriver",
    timeout: int = 120,
    browser_executable_path=None,
    **kwargs
) -> tuple[Browser, callable]:
    """Start (or reuse) a nodriver browser, guarded by a lock file.

    A lock file under the cookies directory marks a browser as in use;
    a new caller waits up to *timeout* seconds for it to be released
    before starting its own instance.

    Args:
        proxy: Optional proxy server passed via --proxy-server.
        user_data_dir: Profile name; resolved via platformdirs when available.
        timeout: Seconds to wait for an existing instance to release the lock.
        browser_executable_path: Explicit browser binary; auto-detected
            (Chrome, then a fixed Edge path on Windows) when None.
        **kwargs: Forwarded to ``nodriver.start``.

    Returns:
        Tuple of (browser, stop_callback). The callback stops the browser
        (if still connected) and always removes the lock file.

    Raises:
        MissingRequirementsError: If nodriver is not installed.
    """
    if not has_nodriver:
        raise MissingRequirementsError('Install "nodriver" and "platformdirs" package | pip install -U nodriver platformdirs')
    user_data_dir = user_config_dir(f"g4f-{user_data_dir}") if has_platformdirs else None
    if browser_executable_path is None:
        try:
            browser_executable_path = find_chrome_executable()
        except FileNotFoundError:
            # Default to Edge if Chrome is not available.
            browser_executable_path = "C:\\Program Files (x86)\\Microsoft\\Edge\\Application\\msedge.exe"
            if not os.path.exists(browser_executable_path):
                # Let nodriver perform its own discovery.
                browser_executable_path = None
    lock_file = Path(get_cookies_dir()) / ".nodriver_is_open"
    lock_file.parent.mkdir(exist_ok=True)
    # Implement a short delay (milliseconds) to prevent race conditions.
    await asyncio.sleep(0.1 * random.randint(0, 50))
    if lock_file.exists():
        try:
            opened_at = float(lock_file.read_text())
        except ValueError:
            # Corrupt lock file: treat it as stale instead of crashing.
            opened_at = 0.0
        time_open = time.time() - opened_at
        if timeout * 2 > time_open:
            debug.log(f"Nodriver: Browser is already in use since {time_open} secs.")
            # Wait (up to `timeout` seconds) for the other user to release.
            for _ in range(timeout):
                if lock_file.exists():
                    await asyncio.sleep(1)
                else:
                    break
    lock_file.write_text(str(time.time()))
    debug.log(f"Open nodriver with user_dir: {user_data_dir}")
    try:
        browser = await nodriver.start(
            user_data_dir=user_data_dir,
            browser_args=None if proxy is None else [f"--proxy-server={proxy}"],
            browser_executable_path=browser_executable_path,
            **kwargs
        )
    except Exception:
        # Startup can fail when another instance already registered itself;
        # fall back to reusing it. (Was a bare `except:`, which also
        # swallowed KeyboardInterrupt/SystemExit.)
        if util.get_registered_instances():
            browser = util.get_registered_instances().pop()
        else:
            raise
    def on_stop():
        # Stop the browser if still connected, and always release the lock.
        try:
            if browser.connection:
                browser.stop()
        finally:
            lock_file.unlink(missing_ok=True)
    return browser, on_stop