gpt4free/g4f/gui/server/api.py
hlohaus ab04d1d894 Increase max token in HuggingfaceAPI
Restore browser instance on start up errors in nodriver
Restored instances can be used as usual or to stop the browser
Add demo modus to web ui for HuggingSpace
Add rate limit support to web ui. Simply install flask_limiter
Add home for demo with Access Token input and validation
Add stripped model list for demo
Add ignores for encoding error in web_search and file upload
2025-01-26 02:48:43 +01:00

244 lines
No EOL
10 KiB
Python

from __future__ import annotations
import logging
import os
import asyncio
from typing import Iterator
from flask import send_from_directory
from inspect import signature
from ...errors import VersionNotFoundError
from ...image import ImagePreview, ImageResponse, copy_images, ensure_images_dir, images_dir
from ...tools.run_tools import iter_run_tools
from ...Provider import ProviderUtils, __providers__
from ...providers.base_provider import ProviderModelMixin
from ...providers.retry_provider import IterListProvider
from ...providers.response import BaseConversation, JsonConversation, FinishReason, Usage, Reasoning
from ...providers.response import SynthesizeData, TitleGeneration, RequestLogin, Parameters
from ... import version, models
from ... import ChatCompletion, get_model_and_provider
from ... import debug
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# In-memory cache of conversation objects yielded by providers.
# Indexed first by the provider value taken from the request, then by
# conversation id: conversations[provider][conversation_id] -> conversation.
conversations: dict[str, dict[str, BaseConversation]] = {}
class Api:
    """Helpers behind the GUI server endpoints.

    Provides model/provider listings, version information, image serving
    and a generator that turns a chat completion stream into typed JSON
    events (dicts with a ``"type"`` key).
    """

    @staticmethod
    def get_models() -> list[dict]:
        """Return one entry per model registered in ``models.__models__``.

        Each entry carries the model name, whether it is an image and/or
        vision model, and the names of its working providers (a provider's
        ``parent`` attribute is used instead of its own name when present).
        """
        return [
            {
                "name": model.name,
                "image": isinstance(model, models.ImageModel),
                "vision": isinstance(model, models.VisionModel),
                "providers": [
                    getattr(provider, "parent", provider.__name__)
                    for provider in providers
                    if provider.working
                ],
            }
            for model, providers in models.__models__.values()
        ]

    @staticmethod
    def get_provider_models(provider: str, api_key: str = None, api_base: str = None):
        """List the models offered by a single provider.

        Args:
            provider: Provider name; looked up in ``ProviderUtils.convert``.
            api_key: Optional key, forwarded to ``get_models`` only when that
                method declares an ``api_key`` parameter.
            api_base: Optional base URL, forwarded together with ``api_key``.

        Returns:
            A list of dicts with the keys ``model``, ``default``, ``vision``
            and ``image``. The list is empty when the provider is unknown or
            is not a ``ProviderModelMixin`` subclass.
        """
        if provider not in ProviderUtils.convert:
            return []
        provider = ProviderUtils.convert[provider]
        if not issubclass(provider, ProviderModelMixin):
            return []

        # Named ``provider_models`` so the imported ``models`` module is not
        # shadowed inside this function.
        if api_key is not None and "api_key" in signature(provider.get_models).parameters:
            provider_models = provider.get_models(api_key=api_key, api_base=api_base)
        else:
            provider_models = provider.get_models()

        # Read optional capability attributes defensively, the same way
        # get_providers() does, so a missing or None attribute means "no".
        default_vision_model = getattr(provider, "default_vision_model", None)
        vision_models = getattr(provider, "vision_models", None) or []
        image_models = getattr(provider, "image_models", None)

        return [
            {
                "model": model,
                "default": model == provider.default_model,
                "vision": default_vision_model == model or model in vision_models,
                "image": False if image_models is None else model in image_models,
            }
            for model in provider_models
        ]

    @staticmethod
    def get_providers() -> list[dict]:
        """Return a description dict for every working provider.

        Each dict contains ``name``, ``label`` (falls back to the class
        name), ``parent``, ``image``, ``vision``, ``auth`` and ``login_url``.
        """
        return [
            {
                "name": provider.__name__,
                "label": provider.label if hasattr(provider, "label") else provider.__name__,
                "parent": getattr(provider, "parent", None),
                "image": bool(getattr(provider, "image_models", False)),
                "vision": getattr(provider, "default_vision_model", None) is not None,
                "auth": provider.needs_auth,
                "login_url": getattr(provider, "login_url", None),
            }
            for provider in __providers__
            if provider.working
        ]

    @staticmethod
    def get_version() -> dict:
        """Return the current and latest version.

        ``version`` is ``None`` when reading the current version raises
        ``VersionNotFoundError``. The latest version is read without a guard.
        """
        try:
            current_version = version.utils.current_version
        except VersionNotFoundError:
            current_version = None
        return {
            "version": current_version,
            "latest_version": version.utils.latest_version,
        }

    def serve_images(self, name):
        """Serve the file ``name`` from the images directory.

        ``ensure_images_dir()`` is called first, then the file is sent from
        the absolute path of ``images_dir`` via ``send_from_directory``.
        """
        ensure_images_dir()
        return send_from_directory(os.path.abspath(images_dir), name)

    def _prepare_conversation_kwargs(self, json_data: dict, kwargs: dict):
        """Build the keyword arguments for a chat completion request.

        Args:
            json_data: Request payload. Keys read here: ``model``,
                ``provider``, ``messages``, ``api_key``, ``api_base``,
                ``web_search``, ``action``, ``conversation``,
                ``conversation_id`` and ``ignored``.
            kwargs: Extra arguments collected by the caller. This dict is
                updated in place and then merged into the result.

        Returns:
            A dict with ``model``, ``provider``, ``messages`` and the fixed
            streaming flags, followed by everything in ``kwargs`` (so
            ``kwargs`` wins on key collisions).
        """
        model = json_data.get('model')
        provider = json_data.get('provider')
        messages = json_data.get('messages')

        api_key = json_data.get("api_key")
        if api_key:
            kwargs["api_key"] = api_key
        api_base = json_data.get("api_base")
        if api_base:
            kwargs["api_base"] = api_base

        # The bucket tool is always requested; any "tool_calls" entry the
        # caller put into kwargs is replaced here.
        kwargs["tool_calls"] = [{
            "function": {
                "name": "bucket_tool"
            },
            "type": "function"
        }]

        web_search = json_data.get('web_search')
        if web_search:
            kwargs["web_search"] = web_search

        action = json_data.get('action')
        if action == "continue":
            kwargs["tool_calls"].append({
                "function": {
                    "name": "continue_tool"
                },
                "type": "function"
            })

        conversation = json_data.get("conversation")
        if conversation is not None:
            # The client sent the conversation state itself.
            kwargs["conversation"] = JsonConversation(**conversation)
        else:
            # Otherwise fall back to a conversation cached on the server.
            conversation_id = json_data.get("conversation_id")
            if conversation_id and provider:
                if provider in conversations and conversation_id in conversations[provider]:
                    kwargs["conversation"] = conversations[provider][conversation_id]

        if json_data.get("ignored"):
            kwargs["ignored"] = json_data["ignored"]
        if action:
            kwargs["action"] = action

        return {
            "model": model,
            "provider": provider,
            "messages": messages,
            "stream": True,
            "ignore_stream": True,
            "return_conversation": True,
            **kwargs
        }

    def _create_response_stream(self, kwargs: dict, conversation_id: str, provider: str, download_images: bool = True) -> Iterator:
        """Run a chat completion and yield its output as typed JSON events.

        Args:
            kwargs: Arguments as produced by ``_prepare_conversation_kwargs``.
            conversation_id: Id under which a returned conversation is cached
                and which is echoed back to the client.
            provider: Kept for interface compatibility. The value actually
                used is ``kwargs.get("provider")``, which replaces this
                argument at the start of the method.
            download_images: When true, images of an ``ImageResponse`` are
                copied with ``copy_images`` before being sent.

        Yields:
            Dicts built by ``_format_json``: first ``parameters``, then
            ``provider`` (once, before the first chunk or at the end if no
            chunk arrived), then one event per chunk (``content``,
            ``conversation``, ``message``, ...), interleaved with ``log``
            events, and ``error`` if the request fails.
        """
        def decorated_log(text: str):
            # Collect log lines so they can be streamed to the client, and
            # still forward them to the regular handler when logging is on.
            debug.logs.append(text)
            if debug.logging:
                debug.log_handler(text)

        # NOTE: this replaces the module-wide debug.log hook.
        debug.log = decorated_log
        proxy = os.environ.get("G4F_PROXY")
        provider = kwargs.get("provider")

        try:
            model, provider_handler = get_model_and_provider(
                kwargs.get("model"), provider,
                stream=True,
                ignore_stream=True,
                logging=False,
                has_images="images" in kwargs,
            )
        except Exception as e:
            logger.exception(e)
            yield self._format_json('error', get_error_message(e))
            return

        params = {
            **(provider_handler.get_parameters(as_json=True) if hasattr(provider_handler, "get_parameters") else {}),
            "model": model,
            "messages": kwargs.get("messages"),
        }
        if isinstance(kwargs.get("conversation"), JsonConversation):
            params["conversation"] = kwargs.get("conversation").get_dict()
        else:
            params["conversation_id"] = conversation_id
        if kwargs.get("api_key") is not None:
            params["api_key"] = kwargs["api_key"]
        yield self._format_json("parameters", params)

        first = True
        try:
            result = iter_run_tools(ChatCompletion.create, **{**kwargs, "model": model, "provider": provider_handler})
            for chunk in result:
                if first:
                    # Announce the provider once, right before the first chunk.
                    first = False
                    yield self.handle_provider(provider_handler, model)
                if isinstance(chunk, BaseConversation):
                    if provider is not None:
                        conversations.setdefault(provider, {})[conversation_id] = chunk
                        if isinstance(chunk, JsonConversation):
                            yield self._format_json("conversation", {
                                provider.__name__ if isinstance(provider, type) else provider: chunk.get_dict()
                            })
                        else:
                            yield self._format_json("conversation_id", conversation_id)
                elif isinstance(chunk, Exception):
                    # The exception arrives as a yielded value, not inside an
                    # ``except`` block, so logger.exception() would record no
                    # traceback. Pass the exception explicitly instead.
                    logger.error(chunk, exc_info=chunk)
                    yield self._format_json("message", get_error_message(chunk))
                elif isinstance(chunk, ImagePreview):
                    yield self._format_json("preview", chunk.to_string())
                elif isinstance(chunk, ImageResponse):
                    images = chunk
                    if download_images:
                        images = asyncio.run(copy_images(chunk.get_list(), chunk.get("cookies"), proxy))
                        images = ImageResponse(images, chunk.alt)
                    yield self._format_json("content", str(images))
                elif isinstance(chunk, SynthesizeData):
                    yield self._format_json("synthesize", chunk.get_dict())
                elif isinstance(chunk, TitleGeneration):
                    yield self._format_json("title", chunk.title)
                elif isinstance(chunk, RequestLogin):
                    yield self._format_json("login", str(chunk))
                elif isinstance(chunk, Parameters):
                    yield self._format_json("parameters", chunk.get_dict())
                elif isinstance(chunk, FinishReason):
                    yield self._format_json("finish", chunk.get_dict())
                elif isinstance(chunk, Usage):
                    yield self._format_json("usage", chunk.get_dict())
                elif isinstance(chunk, Reasoning):
                    yield self._format_json("reasoning", token=chunk.token, status=chunk.status)
                else:
                    yield self._format_json("content", str(chunk))
                # Flush log lines collected while this chunk was produced.
                if debug.logs:
                    for log in debug.logs:
                        yield self._format_json("log", str(log))
                    debug.logs = []
        except Exception as e:
            logger.exception(e)
            yield self._format_json('error', get_error_message(e))
        if first:
            # No chunk was produced; still tell the client which provider ran.
            yield self.handle_provider(provider_handler, model)

    def _format_json(self, response_type: str, content = None, **kwargs):
        """Build one event dict.

        When ``content`` is not ``None`` the result is
        ``{"type": response_type, response_type: content}`` and ``kwargs``
        are ignored. Otherwise the result is ``{"type": response_type}``
        extended with ``kwargs``.
        """
        if content is not None:
            return {
                'type': response_type,
                response_type: content,
            }
        return {
            'type': response_type,
            **kwargs
        }

    def handle_provider(self, provider_handler, model):
        """Build the ``provider`` event for the handler that served the request.

        For an ``IterListProvider`` with a ``last_provider`` set, that
        provider is reported. If ``model`` is falsy, the handler's
        ``last_model`` is used when it is available and not ``None``.
        """
        if isinstance(provider_handler, IterListProvider) and provider_handler.last_provider is not None:
            provider_handler = provider_handler.last_provider
        if not model and getattr(provider_handler, "last_model", None) is not None:
            model = provider_handler.last_model
        return self._format_json("provider", {**provider_handler.get_dict(), "model": model})
def get_error_message(exception: Exception) -> str:
    """Format an exception as ``"<ExceptionClassName>: <message>"``."""
    return f"{type(exception).__name__}: {exception}"