# gpt4free/g4f/tools/fetch_and_scrape.py

from __future__ import annotations

import asyncio
import hashlib
from datetime import date
from pathlib import Path
from typing import Iterator, Optional
from urllib.parse import urlparse, quote_plus

from aiohttp import ClientSession, ClientError

try:
    from bs4 import BeautifulSoup
    has_requirements = True
except ImportError:
    has_requirements = False

from ..cookies import get_cookies_dir
from ..providers.response import format_link


def scrape_text(html: str, max_words: Optional[int] = None, add_source: bool = True, count_images: int = 2) -> Iterator[str]:
    """
    Parses the provided HTML and yields cleaned text fragments (headings,
    paragraphs, lists, tables) plus up to ``count_images`` Markdown image
    links. If ``max_words`` is given, output stops once the word budget is
    spent; if ``add_source`` is True, a trailing "Source:" line with the
    canonical URL is appended.
    """
    document = BeautifulSoup(html, "html.parser")
    # Narrow the tree to the main content container if a known one is present.
    soup = document
    for selector in [
        "main", ".main-content-wrapper", ".main-content", ".emt-container-inner",
        ".content-wrapper", "#content", "#mainContent",
    ]:
        selected = soup.select_one(selector)
        if selected:
            soup = selected
            break
    # Drop known boilerplate blocks (e.g. disclosure banners).
    for remove_selector in [".c-globalDisclosure"]:
        unwanted = soup.select_one(remove_selector)
        if unwanted:
            unwanted.extract()
    # Only images with alt text and an absolute URL; skip avatars and
    # explicitly sized images (usually icons or layout elements).
    image_selector = "img[alt][src^=http]:not([alt='']):not(.avatar):not([width])"
    image_link_selector = f"a:has({image_selector})"
    seen_texts = []
    for element in soup.select(f"h1, h2, h3, h4, h5, h6, p, pre, table:not(:has(p)), ul:not(:has(p)), {image_link_selector}"):
        if count_images > 0:
            image = element.select_one(image_selector)
            if image:
                title = str(element.get("title", element.text))
                if title:
                    # Emit the image as a Markdown image link and charge a
                    # flat 10 words against the budget.
                    yield f"!{format_link(image['src'], title)}\n"
                    if max_words is not None:
                        max_words -= 10
                    count_images -= 1
                continue
        for line in element.get_text(" ").splitlines():
            words = [word for word in line.split() if word]
            if not words:
                continue
            joined_line = " ".join(words)
            # Skip lines that were already emitted (menus, repeated teasers).
            if joined_line in seen_texts:
                continue
            if max_words is not None:
                max_words -= len(words)
                if max_words <= 0:
                    break
            yield joined_line + "\n"
            seen_texts.append(joined_line)
    if add_source:
        # The canonical <link> lives in <head>, so search the full document
        # rather than the narrowed content container.
        canonical_link = document.find("link", rel="canonical")
        if canonical_link and "href" in canonical_link.attrs:
            link = canonical_link["href"]
            domain = urlparse(link).netloc
            yield f"\nSource: [{domain}]({link})"
async def fetch_and_scrape(session: ClientSession, url: str, max_words: Optional[int] = None, add_source: bool = False, proxy: Optional[str] = None) -> str:
    """
    Fetches a URL and returns the scraped text, using a per-day on-disk cache
    to avoid redundant downloads. Returns an empty string on network errors
    or non-200 responses.
    """
    try:
        # Cache files are keyed by a readable slug of the URL, today's date,
        # and a short MD5 digest, so entries expire naturally each day.
        cache_dir: Path = Path(get_cookies_dir()) / ".scrape_cache" / "fetch_and_scrape"
        cache_dir.mkdir(parents=True, exist_ok=True)
        md5_hash = hashlib.md5(url.encode(errors="ignore")).hexdigest()
        cache_file = cache_dir / f"{quote_plus(url.split('?')[0].split('//')[1].replace('/', ' ')[:48])}.{date.today()}.{md5_hash[:16]}.cache"
        if cache_file.exists():
            return cache_file.read_text()
        async with session.get(url, proxy=proxy) as response:
            if response.status == 200:
                html = await response.text(errors="replace")
                scraped_text = "".join(scrape_text(html, max_words, add_source))
                with open(cache_file, "wb") as f:
                    f.write(scraped_text.encode(errors="replace"))
                return scraped_text
    except (ClientError, asyncio.TimeoutError):
        return ""
    return ""