mirror of
https://github.com/xtekky/gpt4free.git
synced 2025-12-06 02:30:41 -08:00
Add Yupp provider implementation with account management and model fetching
This commit is contained in:
parent
d309e0df1d
commit
d1df33ef4b
4 changed files with 775 additions and 0 deletions
477
g4f/Provider/Yupp.py
Normal file
477
g4f/Provider/Yupp.py
Normal file
|
|
@ -0,0 +1,477 @@
|
|||
import json
|
||||
import time
|
||||
import uuid
|
||||
import re
|
||||
import os
|
||||
from typing import Optional, Dict, Any, Generator, List
|
||||
import threading
|
||||
from ..providers.base_provider import AbstractProvider, ProviderModelMixin
|
||||
from ..providers.response import Reasoning, PlainTextResponse, PreviewResponse
|
||||
from ..errors import RateLimitError, ProviderException
|
||||
from ..cookies import get_cookies
|
||||
from .yupp.models import YuppModelManager
|
||||
from ..debug import log
|
||||
|
||||
# Global variables to manage Yupp accounts (should be set by your main application)
|
||||
YUPP_ACCOUNTS: List[Dict[str, Any]] = []
|
||||
YUPP_MODELS: List[Dict[str, Any]] = []
|
||||
account_rotation_lock = threading.Lock()
|
||||
|
||||
class YuppAccount:
|
||||
"""Yupp account representation"""
|
||||
def __init__(self, token: str, is_valid: bool = True, error_count: int = 0, last_used: float = 0):
|
||||
self.token = token
|
||||
self.is_valid = is_valid
|
||||
self.error_count = error_count
|
||||
self.last_used = last_used
|
||||
|
||||
def load_yupp_accounts(tokens_str: str):
|
||||
"""Load Yupp accounts from token string (compatible with your existing system)"""
|
||||
global YUPP_ACCOUNTS
|
||||
if not tokens_str:
|
||||
return
|
||||
|
||||
tokens = [token.strip() for token in tokens_str.split(',') if token.strip()]
|
||||
YUPP_ACCOUNTS = [
|
||||
{
|
||||
"token": token,
|
||||
"is_valid": True,
|
||||
"error_count": 0,
|
||||
"last_used": 0.0
|
||||
}
|
||||
for token in tokens
|
||||
]
|
||||
|
||||
def create_requests_session():
|
||||
"""Create a requests session with proper headers"""
|
||||
import requests
|
||||
session = requests.Session()
|
||||
session.headers.update({
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
|
||||
"Accept": "text/x-component, */*",
|
||||
"Accept-Encoding": "gzip, deflate, br, zstd",
|
||||
"Accept-Language": "en-US,en;q=0.9",
|
||||
"Sec-Fetch-Dest": "empty",
|
||||
"Sec-Fetch-Mode": "cors",
|
||||
"Sec-Fetch-Site": "same-origin",
|
||||
})
|
||||
return session
|
||||
|
||||
def get_best_yupp_account() -> Optional[Dict[str, Any]]:
|
||||
"""Get the best available Yupp account using a smart selection algorithm."""
|
||||
max_error_count = int(os.getenv("MAX_ERROR_COUNT", "3"))
|
||||
error_cooldown = int(os.getenv("ERROR_COOLDOWN", "300"))
|
||||
|
||||
with account_rotation_lock:
|
||||
now = time.time()
|
||||
valid_accounts = [
|
||||
acc
|
||||
for acc in YUPP_ACCOUNTS
|
||||
if acc["is_valid"]
|
||||
and (
|
||||
acc["error_count"] < max_error_count
|
||||
or now - acc["last_used"] > error_cooldown
|
||||
)
|
||||
]
|
||||
|
||||
if not valid_accounts:
|
||||
return None
|
||||
|
||||
# Reset error count for accounts that have been in cooldown
|
||||
for acc in valid_accounts:
|
||||
if (
|
||||
acc["error_count"] >= max_error_count
|
||||
and now - acc["last_used"] > error_cooldown
|
||||
):
|
||||
acc["error_count"] = 0
|
||||
|
||||
# Sort by last used (oldest first) and error count (lowest first)
|
||||
valid_accounts.sort(key=lambda x: (x["last_used"], x["error_count"]))
|
||||
account = valid_accounts[0]
|
||||
account["last_used"] = now
|
||||
return account
|
||||
|
||||
def format_messages_for_yupp(messages: List[Dict[str, str]]) -> str:
|
||||
"""Format multi-turn conversation for Yupp single-turn format"""
|
||||
formatted = []
|
||||
|
||||
# Handle system messages
|
||||
system_messages = [msg for msg in messages if msg.get("role") == "system"]
|
||||
if system_messages:
|
||||
for sys_msg in system_messages:
|
||||
content = sys_msg.get("content", "")
|
||||
formatted.append(content)
|
||||
|
||||
# Handle user and assistant messages
|
||||
user_assistant_msgs = [msg for msg in messages if msg.get("role") != "system"]
|
||||
for msg in user_assistant_msgs:
|
||||
role = "Human" if msg.get("role") == "user" else "Assistant"
|
||||
content = msg.get("content", "")
|
||||
formatted.append(f"\n\n{role}: {content}")
|
||||
|
||||
# Ensure it ends with Assistant:
|
||||
if not formatted or not formatted[-1].strip().startswith("Assistant:"):
|
||||
formatted.append("\n\nAssistant:")
|
||||
|
||||
result = "".join(formatted)
|
||||
# Remove leading \n\n if present
|
||||
if result.startswith("\n\n"):
|
||||
result = result[2:]
|
||||
|
||||
return result
|
||||
|
||||
def claim_yupp_reward(account: Dict[str, Any], reward_id: str):
|
||||
"""Claim Yupp reward synchronously"""
|
||||
try:
|
||||
import requests
|
||||
log_debug(f"Claiming reward {reward_id}...")
|
||||
url = "https://yupp.ai/api/trpc/reward.claim?batch=1"
|
||||
payload = {"0": {"json": {"rewardId": reward_id}}}
|
||||
headers = {
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36 Edg/137.0.0.0",
|
||||
"Content-Type": "application/json",
|
||||
"sec-fetch-site": "same-origin",
|
||||
"Cookie": f"__Secure-yupp.session-token={account['token']}",
|
||||
}
|
||||
session = create_requests_session()
|
||||
response = session.post(url, json=payload, headers=headers)
|
||||
response.raise_for_status()
|
||||
data = response.json()
|
||||
balance = data[0]["result"]["data"]["json"]["currentCreditBalance"]
|
||||
log_debug(f"Reward claimed successfully. New balance: {balance}")
|
||||
return balance
|
||||
except Exception as e:
|
||||
log_debug(f"Failed to claim reward {reward_id}. Error: {e}")
|
||||
return None
|
||||
|
||||
def log_debug(message: str):
|
||||
"""Debug logging (can be replaced with your logging system)"""
|
||||
if os.getenv("DEBUG_MODE", "false").lower() == "true":
|
||||
print(f"[DEBUG] {message}")
|
||||
else:
|
||||
log(f"[Yupp] {message}")
|
||||
|
||||
class Yupp(AbstractProvider, ProviderModelMixin):
|
||||
"""
|
||||
Yupp.ai Provider for g4f
|
||||
Uses multiple account rotation and smart error handling
|
||||
"""
|
||||
|
||||
working = True
|
||||
active_by_default = True
|
||||
|
||||
@classmethod
|
||||
def get_models(cls) -> List[Dict[str, Any]]:
|
||||
if not cls.models:
|
||||
manager = YuppModelManager()
|
||||
models = manager.client.fetch_models()
|
||||
cls.models = [model.get("name") for model in models]
|
||||
return cls.models
|
||||
|
||||
@classmethod
|
||||
def create_completion(
|
||||
cls,
|
||||
model: str,
|
||||
messages: List[Dict[str, str]] = None,
|
||||
stream: bool = False,
|
||||
api_key: Optional[str] = None,
|
||||
temperature: float = 0.7,
|
||||
max_tokens: int = 1000,
|
||||
**kwargs,
|
||||
) -> Generator[str, Any, None]:
|
||||
if not api_key:
|
||||
api_key = get_cookies("yupp.ai", False).get("__Secure-yupp.session-token")
|
||||
|
||||
# Initialize Yupp accounts and models
|
||||
if api_key:
|
||||
load_yupp_accounts(api_key)
|
||||
|
||||
log_debug(f"Yupp provider initialized with {len(YUPP_ACCOUNTS)} accounts")
|
||||
|
||||
"""
|
||||
Create completion using Yupp.ai API with account rotation
|
||||
"""
|
||||
if messages is None:
|
||||
messages = []
|
||||
|
||||
if not YUPP_ACCOUNTS:
|
||||
raise ProviderException("No Yupp accounts configured. Set YUPP_API_KEY environment variable.")
|
||||
|
||||
# Format messages
|
||||
question = format_messages_for_yupp(messages)
|
||||
log_debug(f"Formatted question length: {len(question)}")
|
||||
|
||||
# Try all accounts with rotation
|
||||
max_attempts = len(YUPP_ACCOUNTS)
|
||||
for attempt in range(max_attempts):
|
||||
account = get_best_yupp_account()
|
||||
if not account:
|
||||
raise ProviderException("No valid Yupp accounts available")
|
||||
|
||||
try:
|
||||
yield from cls._make_yupp_request(
|
||||
account, question, model, model, stream,
|
||||
temperature, max_tokens, **kwargs
|
||||
)
|
||||
return # Success, exit the loop
|
||||
|
||||
except RateLimitError:
|
||||
log_debug(f"Account ...{account['token'][-4:]} hit rate limit, rotating")
|
||||
with account_rotation_lock:
|
||||
account["error_count"] += 1
|
||||
continue
|
||||
except ProviderException as e:
|
||||
log_debug(f"Account ...{account['token'][-4:]} failed: {str(e)}")
|
||||
with account_rotation_lock:
|
||||
if "auth" in str(e).lower() or "401" in str(e) or "403" in str(e):
|
||||
account["is_valid"] = False
|
||||
else:
|
||||
account["error_count"] += 1
|
||||
continue
|
||||
except Exception as e:
|
||||
log_debug(f"Unexpected error with account ...{account['token'][-4:]}: {str(e)}")
|
||||
with account_rotation_lock:
|
||||
account["error_count"] += 1
|
||||
raise ProviderException(f"Yupp request failed: {str(e)}") from e
|
||||
|
||||
raise ProviderException("All Yupp accounts failed after rotation attempts")
|
||||
|
||||
@classmethod
|
||||
def _make_yupp_request(
|
||||
cls,
|
||||
account: Dict[str, Any],
|
||||
question: str,
|
||||
model_name: str,
|
||||
model_id: str,
|
||||
stream: bool,
|
||||
temperature: float,
|
||||
max_tokens: int,
|
||||
**kwargs
|
||||
) -> Generator[str, Any, None]:
|
||||
"""Make actual request to Yupp.ai"""
|
||||
|
||||
# Build request
|
||||
url_uuid = str(uuid.uuid4())
|
||||
url = f"https://yupp.ai/chat/{url_uuid}?stream=true"
|
||||
|
||||
headers = {
|
||||
"accept": "text/x-component",
|
||||
"accept-language": "de,en-US;q=0.9,en;q=0.8,zh-CN;q=0.7,zh;q=0.6",
|
||||
"cache-control": "no-cache",
|
||||
"content-type": "text/plain;charset=UTF-8",
|
||||
"next-action": "7f2a2308b5fc462a2c26df714cb2cccd02a9c10fbb",
|
||||
"pragma": "no-cache",
|
||||
"priority": "u=1, i",
|
||||
"sec-ch-ua": "\"Chromium\";v=\"140\", \"Not=A?Brand\";v=\"24\", \"Google Chrome\";v=\"140\"",
|
||||
"sec-ch-ua-mobile": "?0",
|
||||
"sec-ch-ua-platform": "\"Linux\"",
|
||||
"sec-fetch-dest": "empty",
|
||||
"sec-fetch-mode": "cors",
|
||||
"sec-fetch-site": "same-origin",
|
||||
"cookie": f"__Secure-yupp.session-token={account['token']}",
|
||||
}
|
||||
|
||||
log_debug(f"Sending request to Yupp.ai with account ...{account['token'][-4:]}")
|
||||
|
||||
payload = [
|
||||
url_uuid,
|
||||
str(uuid.uuid4()),
|
||||
question,
|
||||
"$undefined",
|
||||
"$undefined",
|
||||
[],
|
||||
"$undefined",
|
||||
[{"modelName": model_name, "promptModifierId": "$undefined"}] if model_name else "none",
|
||||
"text",
|
||||
False,
|
||||
"$undefined",
|
||||
]
|
||||
|
||||
# Send request
|
||||
session = create_requests_session()
|
||||
response = session.post(
|
||||
url,
|
||||
data=json.dumps(payload),
|
||||
headers=headers,
|
||||
stream=True,
|
||||
timeout=60
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
yield from cls._process_stream_response(
|
||||
response.iter_lines(), account
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _process_stream_response(
|
||||
cls,
|
||||
response_lines,
|
||||
account: Dict[str, Any]
|
||||
) -> Generator[str, Any, None]:
|
||||
"""Process Yupp stream response and convert to OpenAI format"""
|
||||
|
||||
line_pattern = re.compile(b"^([0-9a-fA-F]+):(.*)")
|
||||
chunks = {}
|
||||
target_stream_id = None
|
||||
reward_info = None
|
||||
is_thinking = False
|
||||
thinking_content = ""
|
||||
normal_content = ""
|
||||
select_stream = [None, None]
|
||||
processed_content = set()
|
||||
|
||||
def extract_ref_id(ref):
|
||||
"""Extract ID from reference string, e.g., from '$@123' extract '123'"""
|
||||
return ref[2:] if ref and isinstance(ref, str) and ref.startswith("$@") else None
|
||||
|
||||
def is_valid_content(content: str) -> bool:
|
||||
"""Check if content is valid, avoid over-filtering"""
|
||||
if not content or content in [None, "", "$undefined"]:
|
||||
return False
|
||||
|
||||
if content.startswith("\\n\\<streaming stopped") or content.startswith("\n\\<streaming stopped"):
|
||||
return False
|
||||
|
||||
if re.match(r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", content.strip()):
|
||||
return False
|
||||
|
||||
if len(content.strip()) == 0:
|
||||
return False
|
||||
|
||||
if content.strip() in ["$undefined", "undefined", "null", "NULL"]:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def process_content_chunk(content: str, chunk_id: str, line_count: int):
|
||||
"""Process single content chunk"""
|
||||
nonlocal is_thinking, thinking_content, normal_content
|
||||
|
||||
if not is_valid_content(content):
|
||||
return
|
||||
|
||||
# log_debug(f"Processing chunk #{line_count} with content: '{content[:50]}...'")
|
||||
|
||||
if is_thinking:
|
||||
yield Reasoning(content)
|
||||
else:
|
||||
normal_content += content
|
||||
yield content
|
||||
|
||||
try:
|
||||
# log_debug("Starting to process Yupp stream response...")
|
||||
line_count = 0
|
||||
quick_response_id = None
|
||||
|
||||
for line in response_lines:
|
||||
|
||||
line_count += 1
|
||||
|
||||
match = line_pattern.match(line)
|
||||
if not match:
|
||||
log_debug(f"Line {line_count}: No pattern match")
|
||||
continue
|
||||
|
||||
chunk_id, chunk_data = match.groups()
|
||||
chunk_id = chunk_id.decode()
|
||||
|
||||
try:
|
||||
data = json.loads(chunk_data) if chunk_data != b"{}" else {}
|
||||
chunks[chunk_id] = data
|
||||
except json.JSONDecodeError:
|
||||
log_debug(f"Failed to parse JSON for chunk {chunk_id}")
|
||||
continue
|
||||
|
||||
# Process reward info
|
||||
if chunk_id == "a":
|
||||
reward_info = data
|
||||
log_debug(f"Found reward info")
|
||||
|
||||
# Process initial setup
|
||||
elif chunk_id == "1":
|
||||
yield PlainTextResponse(line.decode(errors="ignore"))
|
||||
if isinstance(data, dict):
|
||||
left_stream = data.get("leftStream", {})
|
||||
right_stream = data.get("rightStream", {})
|
||||
quick_response_id = extract_ref_id(data.get("quickResponse", {}).get("stream", {}).get("next"))
|
||||
select_stream = [left_stream, right_stream]
|
||||
|
||||
elif chunk_id == "e":
|
||||
yield PlainTextResponse(line.decode(errors="ignore"))
|
||||
if isinstance(data, dict):
|
||||
for i, selection in enumerate(data.get("modelSelections", [])):
|
||||
if selection.get("selectionSource") == "USER_SELECTED":
|
||||
if i < len(select_stream) and isinstance(select_stream[i], dict):
|
||||
target_stream_id = extract_ref_id(select_stream[i].get("next"))
|
||||
log_debug(f"Found target stream ID: {target_stream_id}")
|
||||
break
|
||||
|
||||
# Process target stream content
|
||||
elif target_stream_id and chunk_id == target_stream_id:
|
||||
yield PlainTextResponse(line.decode(errors="ignore"))
|
||||
if isinstance(data, dict):
|
||||
target_stream_id = extract_ref_id(data.get("next"))
|
||||
content = data.get("curr", "")
|
||||
if content:
|
||||
yield from process_content_chunk(content, chunk_id, line_count)
|
||||
|
||||
elif quick_response_id and chunk_id == quick_response_id:
|
||||
yield PlainTextResponse(line.decode(errors="ignore"))
|
||||
if isinstance(data, dict):
|
||||
content = data.get("curr", "")
|
||||
if content:
|
||||
yield PreviewResponse(content)
|
||||
|
||||
# Fallback: process any chunk with "curr"
|
||||
elif isinstance(data, dict) and "curr" in data:
|
||||
content = data.get("curr", "")
|
||||
if content:
|
||||
pass #yield from process_content_chunk(content, chunk_id)
|
||||
|
||||
log_debug(f"Finished processing {line_count} lines")
|
||||
|
||||
except:
|
||||
raise
|
||||
|
||||
finally:
|
||||
# Claim reward in background
|
||||
if reward_info and "unclaimedRewardInfo" in reward_info:
|
||||
reward_id = reward_info["unclaimedRewardInfo"].get("rewardId")
|
||||
if reward_id:
|
||||
try:
|
||||
claim_yupp_reward(account, reward_id)
|
||||
except Exception as e:
|
||||
log_debug(f"Failed to claim reward: {e}")
|
||||
|
||||
# log_debug(f"Stream completed. Content length: {len(normal_content)}")
|
||||
|
||||
# Initialize the provider
|
||||
def init_yupp_provider():
|
||||
"""Initialize Yupp provider with environment configuration"""
|
||||
tokens = os.getenv("YUPP_TOKENS", "")
|
||||
if tokens:
|
||||
load_yupp_accounts(tokens)
|
||||
|
||||
log_debug(f"Yupp provider initialized: {len(YUPP_ACCOUNTS)} accounts, {len(YUPP_MODELS)} models")
|
||||
return Yupp
|
||||
|
||||
# Example usage and testing
|
||||
if __name__ == "__main__":
|
||||
# Set up environment for testing
|
||||
# os.environ["DEBUG_MODE"] = "true"
|
||||
|
||||
# Initialize provider
|
||||
provider = init_yupp_provider()
|
||||
|
||||
# Test stream completion
|
||||
try:
|
||||
print("\nTesting stream completion...")
|
||||
for chunk in provider.create_completion(
|
||||
model="claude-sonnet-4-5-20250929<>thinking",
|
||||
messages=[{"role": "user", "content": "What is Python?"}],
|
||||
stream=True
|
||||
):
|
||||
if isinstance(chunk, str) and chunk.strip():
|
||||
print(chunk, end="")
|
||||
except Exception as e:
|
||||
print(f"\nStream test failed: {e}")
|
||||
|
|
@ -61,6 +61,7 @@ from .StringableInference import StringableInference
|
|||
from .TeachAnything import TeachAnything
|
||||
from .WeWordle import WeWordle
|
||||
from .Yqcloud import Yqcloud
|
||||
from .Yupp import Yupp
|
||||
|
||||
import sys
|
||||
|
||||
|
|
|
|||
293
g4f/Provider/yupp/models.py
Normal file
293
g4f/Provider/yupp/models.py
Normal file
|
|
@ -0,0 +1,293 @@
|
|||
import json
|
||||
import os
|
||||
import requests
|
||||
from typing import Dict, List, Optional, Any
|
||||
from dataclasses import dataclass
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModelConfig:
|
||||
"""Configuration for API requests"""
|
||||
base_url: str = "https://yupp.ai"
|
||||
api_endpoint: str = "/api/trpc/model.getModelInfoList,scribble.getScribbleByLabel"
|
||||
timeout: int = 30
|
||||
fallback_file: str = "models.json"
|
||||
output_file: str = "model.json"
|
||||
|
||||
|
||||
class YuppAPIClient:
|
||||
"""Yupp API client for fetching model data"""
|
||||
|
||||
def __init__(self, config: ModelConfig = None):
|
||||
self.config = config or ModelConfig()
|
||||
self.session = requests.Session()
|
||||
self._setup_session()
|
||||
|
||||
def _setup_session(self) -> None:
|
||||
"""Setup session with headers and cookies"""
|
||||
self.session.headers.update(self._get_headers())
|
||||
self._set_cookies()
|
||||
|
||||
def _get_headers(self) -> Dict[str, str]:
|
||||
"""Get request headers"""
|
||||
return {
|
||||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
|
||||
"Accept": "application/json, text/plain, */*",
|
||||
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
|
||||
"Referer": f"{self.config.base_url}/",
|
||||
"Origin": self.config.base_url,
|
||||
"Sec-Fetch-Dest": "empty",
|
||||
"Sec-Fetch-Mode": "cors",
|
||||
"Sec-Fetch-Site": "same-origin",
|
||||
}
|
||||
|
||||
def _set_cookies(self) -> None:
|
||||
"""Set cookies from environment variable"""
|
||||
token = self._get_session_token()
|
||||
if token:
|
||||
self.session.cookies.set("__Secure-yupp.session-token", token)
|
||||
|
||||
def _get_session_token(self) -> Optional[str]:
|
||||
"""Get session token from environment variable"""
|
||||
env_tokens = os.getenv("YUPP_TOKENS")
|
||||
if not env_tokens:
|
||||
return None
|
||||
|
||||
try:
|
||||
tokens = [t.strip() for t in env_tokens.split(",") if t.strip()]
|
||||
return tokens[0] if tokens else None
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to parse YUPP_TOKENS: {e}")
|
||||
return None
|
||||
|
||||
def _build_api_url(self) -> str:
|
||||
"""Build the complete API URL"""
|
||||
params = "batch=1&input=%7B%220%22%3A%7B%22json%22%3Anull%2C%22meta%22%3A%7B%22values%22%3A%5B%22undefined%22%5D%7D%7D%2C%221%22%3A%7B%22json%22%3A%7B%22label%22%3A%22homepage_banner%22%7D%7D%7D"
|
||||
return f"{self.config.base_url}{self.config.api_endpoint}?{params}"
|
||||
|
||||
def fetch_models(self) -> Optional[List[Dict[str, Any]]]:
|
||||
"""Fetch model data from API"""
|
||||
url = self._build_api_url()
|
||||
|
||||
try:
|
||||
print(f"Fetching data from: {url}")
|
||||
response = self.session.get(url, timeout=self.config.timeout)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
print("Successfully fetched and parsed model data")
|
||||
|
||||
# Extract model list from response structure
|
||||
if data and isinstance(data, list) and len(data) > 0:
|
||||
return data[0]["result"]["data"]["json"]
|
||||
else:
|
||||
print("Unexpected response format")
|
||||
return None
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
print(f"Request failed: {e}")
|
||||
return None
|
||||
except (ValueError, json.JSONDecodeError) as e:
|
||||
print(f"JSON parsing failed: {e}")
|
||||
return None
|
||||
except KeyError as e:
|
||||
print(f"Data structure error - missing key: {e}")
|
||||
return None
|
||||
|
||||
|
||||
class ModelProcessor:
|
||||
"""Process and filter model data"""
|
||||
|
||||
SUPPORTED_FAMILIES = {
|
||||
"GPT", "Claude", "Gemini", "Qwen", "DeepSeek", "Perplexity", "Kimi"
|
||||
}
|
||||
|
||||
TAG_MAPPING = {
|
||||
"isPro": "☀️",
|
||||
"isMax": "🔥",
|
||||
"isNew": "🆕",
|
||||
"isLive": "🎤",
|
||||
"isAgent": "🤖",
|
||||
"isFast": "🚀",
|
||||
"isReasoning": "🧠",
|
||||
"isImageGeneration": "🎨",
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def generate_tags(cls, item: Dict[str, Any]) -> List[str]:
|
||||
"""Generate tags for model display"""
|
||||
tags = []
|
||||
|
||||
# Add emoji tags based on boolean flags
|
||||
for key, emoji in cls.TAG_MAPPING.items():
|
||||
if item.get(key, False):
|
||||
tags.append(emoji)
|
||||
|
||||
# Add attachment tag if supported
|
||||
if item.get("supportedAttachmentMimeTypes"):
|
||||
tags.append("📎")
|
||||
|
||||
return tags
|
||||
|
||||
@classmethod
|
||||
def should_include_model(cls, item: Dict[str, Any]) -> bool:
|
||||
"""Check if model should be included in output"""
|
||||
family = item.get("family")
|
||||
|
||||
# Include if in supported families or has special features
|
||||
return (
|
||||
family in cls.SUPPORTED_FAMILIES or
|
||||
item.get("isImageGeneration") or
|
||||
item.get("isAgent") or
|
||||
item.get("isLive")
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def process_model_item(cls, item: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Process individual model item"""
|
||||
tags = cls.generate_tags(item)
|
||||
label = item.get("label", "")
|
||||
|
||||
# Add tags to label if present
|
||||
if tags:
|
||||
label += "\n" + " | ".join(tags)
|
||||
|
||||
return {
|
||||
"id": item.get("id"),
|
||||
"name": item.get("name"),
|
||||
"label": label,
|
||||
"shortLabel": item.get("shortLabel"),
|
||||
"publisher": item.get("publisher"),
|
||||
"family": item.get("family"),
|
||||
"isPro": item.get("isPro", False),
|
||||
"isInternal": item.get("isInternal", False),
|
||||
"isMax": item.get("isMax", False),
|
||||
"isLive": item.get("isLive", False),
|
||||
"isNew": item.get("isNew", False),
|
||||
"isImageGeneration": item.get("isImageGeneration", False),
|
||||
"isAgent": item.get("isAgent", False),
|
||||
"isReasoning": item.get("isReasoning", False),
|
||||
"isFast": item.get("isFast", False),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def filter_and_process(cls, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Filter and process model data"""
|
||||
return [
|
||||
cls.process_model_item(item)
|
||||
for item in data
|
||||
if cls.should_include_model(item)
|
||||
]
|
||||
|
||||
|
||||
class DataManager:
|
||||
"""Handle data loading and saving operations"""
|
||||
|
||||
@staticmethod
|
||||
def load_fallback_data(filename: str) -> List[Dict[str, Any]]:
|
||||
"""Load fallback data from local file"""
|
||||
try:
|
||||
with open(filename, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
print(f"Fallback file not found: {filename}")
|
||||
return []
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Failed to parse fallback file: {e}")
|
||||
return []
|
||||
|
||||
@staticmethod
|
||||
def save_data(data: List[Dict[str, Any]], filename: str) -> bool:
|
||||
"""Save data to JSON file"""
|
||||
try:
|
||||
# Create directory if needed
|
||||
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else ".",
|
||||
exist_ok=True)
|
||||
|
||||
# Create file if it doesn't exist
|
||||
if not os.path.exists(filename):
|
||||
open(filename, "a", encoding="utf-8").close()
|
||||
|
||||
with open(filename, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=4, ensure_ascii=False)
|
||||
|
||||
print(f"Successfully saved {len(data)} models to {filename}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"Failed to save data: {e}")
|
||||
return False
|
||||
|
||||
|
||||
class YuppModelManager:
|
||||
"""Main manager class for Yupp model operations"""
|
||||
|
||||
def __init__(self, config: ModelConfig = None):
|
||||
self.config = config or ModelConfig()
|
||||
self.client = YuppAPIClient(config)
|
||||
self.processor = ModelProcessor()
|
||||
self.data_manager = DataManager()
|
||||
|
||||
def has_valid_token(self) -> bool:
|
||||
"""Check if valid token is available"""
|
||||
return self.client._get_session_token() is not None
|
||||
|
||||
def fetch_and_save_models(self, output_file: str = None) -> bool:
|
||||
"""Main method to fetch and save model data"""
|
||||
output_file = output_file or self.config.output_file
|
||||
|
||||
print("=== Yupp Model Data Fetcher ===")
|
||||
|
||||
if not self.has_valid_token():
|
||||
print("Warning: YUPP_TOKENS environment variable not set")
|
||||
return False
|
||||
|
||||
# Try to fetch from API
|
||||
data = self.client.fetch_models()
|
||||
|
||||
# Fallback to local data if API fails
|
||||
if not data:
|
||||
print("API request failed, trying fallback data...")
|
||||
data = self.data_manager.load_fallback_data(self.config.fallback_file)
|
||||
|
||||
if not data:
|
||||
print("No model data available")
|
||||
return False
|
||||
|
||||
print(f"Processing {len(data)} models...")
|
||||
processed_models = self.processor.filter_and_process(data)
|
||||
|
||||
return self.data_manager.save_data(processed_models, output_file)
|
||||
|
||||
def run_interactive(self) -> bool:
|
||||
"""Run in interactive mode (for CLI use)"""
|
||||
load_dotenv()
|
||||
|
||||
print("=== Yupp Model Data Tool ===")
|
||||
|
||||
if not self.has_valid_token():
|
||||
print("Error: YUPP_TOKENS environment variable not set")
|
||||
print("Please set YUPP_TOKENS environment variable, e.g.:")
|
||||
print("export YUPP_TOKENS='your_token_here'")
|
||||
return False
|
||||
|
||||
return self.fetch_and_save_models()
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point"""
|
||||
manager = YuppModelManager()
|
||||
success = manager.run_interactive()
|
||||
|
||||
if success:
|
||||
print("Operation completed successfully")
|
||||
else:
|
||||
print("Operation failed")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
exit(main())
|
||||
|
|
@ -100,4 +100,8 @@ class TimeoutError(G4FError):
|
|||
|
||||
class ConversationLimitError(G4FError):
|
||||
"""Raised when a conversation limit is reached on the provider."""
|
||||
pass
|
||||
|
||||
class ProviderException(G4FError):
|
||||
"""Raised for general provider-related exceptions."""
|
||||
pass
|
||||
Loading…
Add table
Add a link
Reference in a new issue