mirror of https://github.com/xtekky/gpt4free.git
synced 2025-12-06 02:30:41 -08:00
~ | new folder including ./tool and ./testing
This commit is contained in:
parent 796e4d7b55
commit fdd8ef1fc3
12 changed files with 847 additions and 0 deletions
163  etc/interference/app.py  Normal file
@@ -0,0 +1,163 @@
import json
import time
import random
import string
import requests

from typing import Any
from flask import Flask, request
from flask_cors import CORS
from transformers import AutoTokenizer
from g4f import ChatCompletion

app = Flask(__name__)
CORS(app)


@app.route('/chat/completions', methods=['POST'])
def chat_completions():
    model = request.get_json().get('model', 'gpt-3.5-turbo')
    stream = request.get_json().get('stream', False)
    messages = request.get_json().get('messages')

    response = ChatCompletion.create(model=model, stream=stream, messages=messages)

    completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
    completion_timestamp = int(time.time())

    if not stream:
        return {
            'id': f'chatcmpl-{completion_id}',
            'object': 'chat.completion',
            'created': completion_timestamp,
            'model': model,
            'choices': [
                {
                    'index': 0,
                    'message': {
                        'role': 'assistant',
                        'content': response,
                    },
                    'finish_reason': 'stop',
                }
            ],
            'usage': {
                'prompt_tokens': None,
                'completion_tokens': None,
                'total_tokens': None,
            },
        }

    def streaming():
        for chunk in response:
            completion_data = {
                'id': f'chatcmpl-{completion_id}',
                'object': 'chat.completion.chunk',
                'created': completion_timestamp,
                'model': model,
                'choices': [
                    {
                        'index': 0,
                        'delta': {
                            'content': chunk,
                        },
                        'finish_reason': None,
                    }
                ],
            }

            content = json.dumps(completion_data, separators=(',', ':'))
            yield f'data: {content}\n\n'
            time.sleep(0.1)

        end_completion_data: dict[str, Any] = {
            'id': f'chatcmpl-{completion_id}',
            'object': 'chat.completion.chunk',
            'created': completion_timestamp,
            'model': model,
            'choices': [
                {
                    'index': 0,
                    'delta': {},
                    'finish_reason': 'stop',
                }
            ],
        }
        content = json.dumps(end_completion_data, separators=(',', ':'))
        yield f'data: {content}\n\n'

    return app.response_class(streaming(), mimetype='text/event-stream')


# Get the embedding from the Hugging Face inference API
def get_embedding(input_text, token):
    huggingface_token = token
    embedding_model = 'sentence-transformers/all-mpnet-base-v2'
    max_token_length = 500

    # Load the tokenizer for the 'all-mpnet-base-v2' model
    tokenizer = AutoTokenizer.from_pretrained(embedding_model)
    # Tokenize the text and split the tokens into chunks of 500 tokens each
    tokens = tokenizer.tokenize(input_text)
    token_chunks = [tokens[i:i + max_token_length]
                    for i in range(0, len(tokens), max_token_length)]

    # Initialize an empty list
    embeddings = []

    # Create embeddings for each chunk
    for chunk in token_chunks:
        # Convert the chunk tokens back to text
        chunk_text = tokenizer.convert_tokens_to_string(chunk)

        # Use the Hugging Face API to get embeddings for the chunk
        api_url = f'https://api-inference.huggingface.co/pipeline/feature-extraction/{embedding_model}'
        headers = {'Authorization': f'Bearer {huggingface_token}'}
        chunk_text = chunk_text.replace('\n', ' ')

        # Make a POST request to get the chunk's embedding
        response = requests.post(api_url, headers=headers, json={
            'inputs': chunk_text, 'options': {'wait_for_model': True}})

        # Parse the response and extract the embedding
        chunk_embedding = response.json()
        # Append the embedding to the list
        embeddings.append(chunk_embedding)

    # Average all the chunk embeddings element-wise.
    # This is a crude way to combine them; better ideas are welcome.
    num_embeddings = len(embeddings)
    average_embedding = [sum(x) / num_embeddings for x in zip(*embeddings)]
    embedding = average_embedding
    return embedding


@app.route('/embeddings', methods=['POST'])
def embeddings():
    input_text_list = request.get_json().get('input')
    input_text = ' '.join(map(str, input_text_list))
    token = request.headers.get('Authorization').replace('Bearer ', '')
    embedding = get_embedding(input_text, token)

    return {
        'data': [
            {
                'embedding': embedding,
                'index': 0,
                'object': 'embedding'
            }
        ],
        'model': 'text-embedding-ada-002',
        'object': 'list',
        'usage': {
            'prompt_tokens': None,
            'total_tokens': None
        }
    }


def main():
    app.run(host='0.0.0.0', port=1337, debug=True)


if __name__ == '__main__':
    main()
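As a usage sketch (not part of this commit): once app.py above is running, the /embeddings route can be exercised as below. The bearer token and input text are placeholders.

import requests

# Assumes app.py is serving on localhost:1337 (see main() above);
# the token is a placeholder for a real Hugging Face token.
resp = requests.post(
    "http://localhost:1337/embeddings",
    headers={"Authorization": "Bearer <huggingface-token>"},
    json={"input": ["hello world"]},
)
print(resp.json()["model"])      # -> text-embedding-ada-002
print(len(resp.json()["data"]))  # -> 1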
5  etc/interference/requirements.txt  Normal file
@@ -0,0 +1,5 @@
flask_cors
watchdog~=3.0.0
transformers
tensorflow
torch
25  etc/testing/log_time.py  Normal file
@@ -0,0 +1,25 @@
from time import time


async def log_time_async(method: callable, **kwargs):
    start = time()
    result = await method(**kwargs)
    secs = f"{round(time() - start, 2)} secs"
    if result:
        return " ".join([result, secs])
    return secs


def log_time_yield(method: callable, **kwargs):
    start = time()
    result = yield from method(**kwargs)
    yield f" {round(time() - start, 2)} secs"


def log_time(method: callable, **kwargs):
    start = time()
    result = method(**kwargs)
    secs = f"{round(time() - start, 2)} secs"
    if result:
        return " ".join([result, secs])
    return secs
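For context, a small sketch of how these helpers compose. It assumes log_time.py is importable from the current directory; slow_greeting and counter are made-up helpers that only give the timers something to measure.

from time import sleep
from log_time import log_time, log_time_yield

def slow_greeting(name: str):
    # made-up helper so log_time has elapsed time to report
    sleep(0.5)
    return f"Hello, {name}!"

# log_time calls the function and appends the elapsed time to a truthy result:
print(log_time(slow_greeting, name="World"))  # -> "Hello, World! 0.5 secs"

def counter():
    # made-up generator for log_time_yield
    yield "1 2 3"

# log_time_yield re-yields everything, then yields the elapsed time last:
for part in log_time_yield(counter):
    print(part, end="")                       # -> "1 2 3 0.0 secs"
print()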
30  etc/testing/test_async.py  Normal file
@@ -0,0 +1,30 @@
import sys
from pathlib import Path
import asyncio

sys.path.append(str(Path(__file__).parent.parent))

import g4f
from testing.test_providers import get_providers
from testing.log_time import log_time_async


async def create_async(provider):
    try:
        response = await log_time_async(
            provider.create_async,
            model=g4f.models.default.name,
            messages=[{"role": "user", "content": "Hello, are you GPT 3.5?"}]
        )
        print(f"{provider.__name__}:", response)
    except Exception as e:
        print(f"{provider.__name__}: {e.__class__.__name__}: {e}")


async def run_async():
    responses: list = [
        create_async(provider)
        for provider in get_providers()
        if provider.working
    ]
    await asyncio.gather(*responses)


print("Total:", asyncio.run(log_time_async(run_async)))
27  etc/testing/test_chat_completion.py  Normal file
@@ -0,0 +1,27 @@
import sys
from pathlib import Path

sys.path.append(str(Path(__file__).parent.parent))

import g4f, asyncio

print("create:", end=" ", flush=True)
for response in g4f.ChatCompletion.create(
    model=g4f.models.gpt_4_32k_0613,
    provider=g4f.Provider.Aivvm,
    messages=[{"role": "user", "content": "send a bunch of emojis. i want to test something"}],
    temperature=0.0,
    stream=True
):
    print(response, end="", flush=True)
print()


async def run_async():
    response = await g4f.ChatCompletion.create_async(
        model=g4f.models.gpt_35_turbo_16k_0613,
        provider=g4f.Provider.Aivvm,
        messages=[{"role": "user", "content": "hello!"}],
    )
    print("create_async:", response)


# asyncio.run(run_async())
27  etc/testing/test_interference.py  Normal file
@@ -0,0 +1,27 @@
# type: ignore
import openai

openai.api_key = ""
openai.api_base = "http://localhost:1337"


def main():
    chat_completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "write a poem about a tree"}],
        stream=True,
    )

    if isinstance(chat_completion, dict):
        # not streamed
        print(chat_completion.choices[0].message.content)
    else:
        # streamed
        for token in chat_completion:
            content = token["choices"][0]["delta"].get("content")
            if content is not None:
                print(content, end="", flush=True)


if __name__ == "__main__":
    main()
96  etc/testing/test_needs_auth.py  Normal file
@@ -0,0 +1,96 @@
import sys
from pathlib import Path
import asyncio

sys.path.append(str(Path(__file__).parent.parent))

import g4f
from testing.log_time import log_time, log_time_async, log_time_yield


_providers = [
    g4f.Provider.H2o,
    g4f.Provider.You,
    g4f.Provider.HuggingChat,
    g4f.Provider.OpenAssistant,
    g4f.Provider.Bing,
    g4f.Provider.Bard
]

_instruct = "Hello, are you GPT 4?"

_example = """
OpenaiChat: Hello! How can I assist you today? 2.0 secs
Bard: Hello! How can I help you today? 3.44 secs
Bing: Hello, this is Bing. How can I help? 😊 4.14 secs
Async Total: 4.25 secs

OpenaiChat: Hello! How can I assist you today? 1.85 secs
Bard: Hello! How can I help you today? 3.38 secs
Bing: Hello, this is Bing. How can I help? 😊 6.14 secs
Stream Total: 11.37 secs

OpenaiChat: Hello! How can I help you today? 3.28 secs
Bard: Hello there! How can I help you today? 3.58 secs
Bing: Hello! How can I help you today? 3.28 secs
No Stream Total: 10.14 secs
"""

print("Bing: ", end="")
for response in log_time_yield(
    g4f.ChatCompletion.create,
    model=g4f.models.default,
    messages=[{"role": "user", "content": _instruct}],
    provider=g4f.Provider.Bing,
    # cookies=g4f.get_cookies(".huggingface.co"),
    stream=True,
    auth=True
):
    print(response, end="", flush=True)
print()
print()


async def run_async():
    responses = [
        log_time_async(
            provider.create_async,
            model=None,
            messages=[{"role": "user", "content": _instruct}],
        )
        for provider in _providers
    ]
    responses = await asyncio.gather(*responses)
    for idx, provider in enumerate(_providers):
        print(f"{provider.__name__}:", responses[idx])
print("Async Total:", asyncio.run(log_time_async(run_async)))
print()


def run_stream():
    for provider in _providers:
        print(f"{provider.__name__}: ", end="")
        for response in log_time_yield(
            provider.create_completion,
            model=None,
            messages=[{"role": "user", "content": _instruct}],
        ):
            print(response, end="", flush=True)
        print()
print("Stream Total:", log_time(run_stream))
print()


def create_no_stream():
    for provider in _providers:
        print(f"{provider.__name__}:", end=" ")
        for response in log_time_yield(
            provider.create_completion,
            model=None,
            messages=[{"role": "user", "content": _instruct}],
            stream=False
        ):
            print(response, end="")
        print()
print("No Stream Total:", log_time(create_no_stream))
print()
66  etc/testing/test_providers.py  Normal file
@@ -0,0 +1,66 @@
import sys
from pathlib import Path
from colorama import Fore, Style

sys.path.append(str(Path(__file__).parent.parent))

from g4f import BaseProvider, models, Provider

logging = False


def main():
    providers = get_providers()
    failed_providers = []

    for _provider in providers:
        if _provider.needs_auth:
            continue
        print("Provider:", _provider.__name__)
        result = test(_provider)
        print("Result:", result)
        if _provider.working and not result:
            failed_providers.append(_provider)

    print()

    if failed_providers:
        print(f"{Fore.RED + Style.BRIGHT}Failed providers:{Style.RESET_ALL}")
        for _provider in failed_providers:
            print(f"{Fore.RED}{_provider.__name__}")
    else:
        print(f"{Fore.GREEN + Style.BRIGHT}All providers are working")


def get_providers() -> list[type[BaseProvider]]:
    providers = dir(Provider)
    providers = [getattr(Provider, provider) for provider in providers if provider != "RetryProvider"]
    providers = [provider for provider in providers if isinstance(provider, type)]
    return [provider for provider in providers if issubclass(provider, BaseProvider)]


def create_response(_provider: type[BaseProvider]) -> str:
    model = models.gpt_35_turbo.name if _provider.supports_gpt_35_turbo else models.default.name
    response = _provider.create_completion(
        model=model,
        messages=[{"role": "user", "content": "Hello, who are you? Answer in as much detail as possible."}],
        stream=False,
    )
    return "".join(response)


def test(_provider: type[BaseProvider]) -> bool | str:
    try:
        response = create_response(_provider)
        assert type(response) is str
        assert len(response) > 0
        return response
    except Exception as e:
        if logging:
            print(e)
        return False


if __name__ == "__main__":
    main()
114  etc/tool/create_provider.py  Normal file
@@ -0,0 +1,114 @@
import sys, re
from pathlib import Path
from os import path

sys.path.append(str(Path(__file__).parent.parent))

import g4f


def read_code(text):
    match = re.search(r"```(python|py|)\n(?P<code>[\S\s]+?)\n```", text)
    if match:
        return match.group("code")


def read_result(result):
    lines = []
    for line in result.split("\n"):
        if line.startswith("```"):
            break
        if line:
            lines.append(line)
    explanation = "\n".join(lines) if lines else ""
    return explanation, read_code(result)


def input_command():
    print("Enter/Paste the cURL command. Ctrl-D or Ctrl-Z (Windows) to save it.")
    contents = []
    while True:
        try:
            line = input()
        except EOFError:
            break
        contents.append(line)
    return "\n".join(contents)


name = input("Name: ")
provider_path = f"g4f/Provider/{name}.py"

example = """
from __future__ import annotations

from aiohttp import ClientSession

from ..typing import AsyncGenerator
from .base_provider import AsyncGeneratorProvider
from .helper import format_prompt


class ChatgptDuo(AsyncGeneratorProvider):
    url = "https://chat-gpt.com"
    supports_gpt_35_turbo = True
    working = True

    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: list[dict[str, str]],
        **kwargs
    ) -> AsyncGenerator:
        headers = {
            "authority": "chat-gpt.com",
            "accept": "application/json",
            "origin": cls.url,
            "referer": f"{cls.url}/chat",
        }
        async with ClientSession(headers=headers) as session:
            prompt = format_prompt(messages)
            data = {
                "prompt": prompt,
                "purpose": "ask",
            }
            async with session.post(cls.url + "/api/chat", json=data) as response:
                response.raise_for_status()
                async for stream in response.content:
                    if stream:
                        yield stream.decode()
"""

if not path.isfile(provider_path):
    command = input_command()

    prompt = f"""
Create a provider from a cURL command. The command is:
```bash
{command}
```
An example for a provider:
```py
{example}
```
The name for the provider class:
{name}
Replace "hello" with `format_prompt(messages)`.
And replace "gpt-3.5-turbo" with `model`.
"""

    print("Create code...")
    response = g4f.ChatCompletion.create(
        model=g4f.models.gpt_35_long,
        messages=[{"role": "user", "content": prompt}],
        auth=True,
        timeout=120,
    )
    print(response)
    explanation, code = read_result(response)
    if code:
        with open(provider_path, "w") as file:
            file.write(code)
        with open("g4f/Provider/__init__.py", "a") as file:
            file.write(f"\nfrom .{name} import {name}")
else:
    with open(provider_path, "r") as file:
        code = file.read()
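For reference, a quick sketch of what read_result and read_code above extract from a model reply; the sample text is invented.

# With the helpers above in scope:
sample = "Here is the provider:\n```py\nprint('hi')\n```"
explanation, code = read_result(sample)
print(explanation)  # -> Here is the provider:
print(code)         # -> print('hi')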
33  etc/tool/provider_init.py  Normal file
@@ -0,0 +1,33 @@
from pathlib import Path


def main():
    content = create_content()
    with open("g4f/provider/__init__.py", "w", encoding="utf-8") as f:
        f.write(content)


def create_content():
    path = Path()
    paths = path.glob("g4f/provider/*.py")
    paths = [p for p in paths if p.name not in ["__init__.py", "base_provider.py"]]
    classnames = [p.stem for p in paths]

    import_lines = [f"from .{name} import {name}" for name in classnames]
    import_content = "\n".join(import_lines)

    classnames.insert(0, "BaseProvider")
    all_content = [f'    "{name}"' for name in classnames]
    all_content = ",\n".join(all_content)
    all_content = f"__all__ = [\n{all_content},\n]"

    return f"""from .base_provider import BaseProvider
{import_content}


{all_content}
"""


if __name__ == "__main__":
    main()
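For illustration, if g4f/provider/ contained only Bar.py and Foo.py (hypothetical module names), the __init__.py written by main() above would come out roughly as:

from .base_provider import BaseProvider
from .Bar import Bar
from .Foo import Foo


__all__ = [
    "BaseProvider",
    "Bar",
    "Foo",
]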
158  etc/tool/readme_table.py  Normal file
@@ -0,0 +1,158 @@
import re
import sys
from pathlib import Path
from urllib.parse import urlparse

sys.path.append(str(Path(__file__).parent.parent))

import asyncio
from g4f import models
from g4f.Provider.base_provider import AsyncProvider, BaseProvider
from g4f.Provider.retry_provider import RetryProvider
from testing.test_providers import get_providers

logging = False


def print_imports():
    print("##### Providers:")
    print("```py")
    print("from g4f.Provider import (")
    for _provider in get_providers():
        if _provider.working:
            print(f"    {_provider.__name__},")

    print(")")
    print("# Usage:")
    print("response = g4f.ChatCompletion.create(..., provider=ProviderName)")
    print("```")
    print()
    print()


def print_async():
    print("##### Async support:")
    print("```py")
    print("_providers = [")
    for _provider in get_providers():
        if _provider.working and issubclass(_provider, AsyncProvider):
            print(f"    g4f.Provider.{_provider.__name__},")
    print("]")
    print("```")
    print()
    print()


async def test_async(provider: type[BaseProvider]):
    if not provider.working:
        return False
    model = models.gpt_35_turbo.name if provider.supports_gpt_35_turbo else models.default.name
    messages = [{"role": "user", "content": "Hello Assistant!"}]
    try:
        if issubclass(provider, AsyncProvider):
            response = await provider.create_async(model=model, messages=messages)
        else:
            response = provider.create_completion(model=model, messages=messages, stream=False)
        return True if response else False
    except Exception as e:
        if logging:
            print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
        return False


async def test_async_list(providers: list[type[BaseProvider]]):
    responses: list = [
        test_async(_provider)
        for _provider in providers
    ]
    return await asyncio.gather(*responses)


def print_providers():
    lines = [
        "| Website | Provider | gpt-3.5 | gpt-4 | Streaming | Async | Status | Auth |",
        "| ------- | -------- | ------- | ----- | --------- | ----- | ------ | ---- |",
    ]

    providers = get_providers()
    responses = asyncio.run(test_async_list(providers))

    for is_working in (True, False):
        for idx, _provider in enumerate(providers):
            if is_working != _provider.working:
                continue
            if _provider == RetryProvider:
                continue

            netloc = urlparse(_provider.url).netloc
            website = f"[{netloc}]({_provider.url})"

            provider_name = f"`g4f.Provider.{_provider.__name__}`"

            has_gpt_35 = "✔️" if _provider.supports_gpt_35_turbo else "❌"
            has_gpt_4 = "✔️" if _provider.supports_gpt_4 else "❌"
            stream = "✔️" if _provider.supports_stream else "❌"
            can_async = "✔️" if issubclass(_provider, AsyncProvider) else "❌"
            # The image markdown was stripped from these four assignments in
            # this view; the shields.io badges below are a reconstruction.
            if _provider.working:
                status = '![Active](https://img.shields.io/badge/Active-brightgreen)'
                if responses[idx]:
                    status = '![Active](https://img.shields.io/badge/Active-brightgreen)'
                else:
                    status = '![Unknown](https://img.shields.io/badge/Unknown-grey)'
            else:
                status = '![Inactive](https://img.shields.io/badge/Inactive-red)'
            auth = "✔️" if _provider.needs_auth else "❌"

            lines.append(
                f"| {website} | {provider_name} | {has_gpt_35} | {has_gpt_4} | {stream} | {can_async} | {status} | {auth} |"
            )
    print("\n".join(lines))


def print_models():
    base_provider_names = {
        "cohere": "Cohere",
        "google": "Google",
        "openai": "OpenAI",
        "anthropic": "Anthropic",
        "replicate": "Replicate",
        "huggingface": "Huggingface",
    }
    provider_urls = {
        "Bard": "https://bard.google.com/",
        "H2o": "https://www.h2o.ai/",
        "Vercel": "https://sdk.vercel.ai/",
    }

    lines = [
        "| Model | Base Provider | Provider | Website |",
        "| ----- | ------------- | -------- | ------- |",
    ]

    _models = get_models()
    for model in _models:
        if not model.best_provider or model.best_provider.__name__ not in provider_urls:
            continue

        name = re.split(r":|/", model.name)[-1]
        base_provider = base_provider_names[model.base_provider]
        provider_name = f"g4f.provider.{model.best_provider.__name__}"
        provider_url = provider_urls[model.best_provider.__name__]
        netloc = urlparse(provider_url).netloc
        website = f"[{netloc}]({provider_url})"

        lines.append(f"| {name} | {base_provider} | {provider_name} | {website} |")

    print("\n".join(lines))


def get_models():
    _models = [item[1] for item in models.__dict__.items()]
    _models = [model for model in _models if type(model) is models.Model]
    return [model for model in _models if model.name not in ["gpt-3.5-turbo", "gpt-4"]]


if __name__ == "__main__":
    print_imports()
    print_async()
    print_providers()
    print("\n", "-" * 50, "\n")
    print_models()
103  etc/tool/vercel.py  Normal file
@@ -0,0 +1,103 @@
import json
import re
from typing import Any

import quickjs
from curl_cffi import requests

session = requests.Session(impersonate="chrome107")


def get_model_info() -> dict[str, Any]:
    url = "https://sdk.vercel.ai"
    response = session.get(url)
    html = response.text
    paths_regex = r"static\/chunks.+?\.js"
    separator_regex = r'"\]\)<\/script><script>self\.__next_f\.push\(\[.,"'

    paths = re.findall(paths_regex, html)
    paths = [re.sub(separator_regex, "", path) for path in paths]
    paths = list(set(paths))

    urls = [f"{url}/_next/{path}" for path in paths]
    scripts = [session.get(url).text for url in urls]

    for script in scripts:
        models_regex = r'let .="\\n\\nHuman:\",r=(.+?),.='
        matches = re.findall(models_regex, script)

        if matches:
            models_str = matches[0]
            stop_sequences_regex = r"(?<=stopSequences:{value:\[)\D(?<!\])"
            models_str = re.sub(
                stop_sequences_regex, re.escape('"\\n\\nHuman:"'), models_str
            )

            context = quickjs.Context()  # type: ignore
            json_str: str = context.eval(f"({models_str})").json()  # type: ignore
            return json.loads(json_str)  # type: ignore

    return {}


def convert_model_info(models: dict[str, Any]) -> dict[str, Any]:
    model_info: dict[str, Any] = {}
    for model_name, params in models.items():
        default_params = params_to_default_params(params["parameters"])
        model_info[model_name] = {"id": params["id"], "default_params": default_params}
    return model_info


def params_to_default_params(parameters: dict[str, Any]):
    defaults: dict[str, Any] = {}
    for key, parameter in parameters.items():
        if key == "maximumLength":
            key = "maxTokens"
        defaults[key] = parameter["value"]
    return defaults


def get_model_names(model_info: dict[str, Any]):
    model_names = model_info.keys()
    model_names = [
        name
        for name in model_names
        if name not in ["openai:gpt-4", "openai:gpt-3.5-turbo"]
    ]
    model_names.sort()
    return model_names


def print_providers(model_names: list[str]):
    for name in model_names:
        split_name = re.split(r":|/", name)
        base_provider = split_name[0]
        variable_name = split_name[-1].replace("-", "_").replace(".", "")
        line = f'{variable_name} = Model(name="{name}", base_provider="{base_provider}", best_provider=Vercel,)\n'
        print(line)


def print_convert(model_names: list[str]):
    for name in model_names:
        split_name = re.split(r":|/", name)
        key = split_name[-1]
        variable_name = split_name[-1].replace("-", "_").replace(".", "")
        # "claude-instant-v1": claude_instant_v1,
        line = f'    "{key}": {variable_name},'
        print(line)


def main():
    model_info = get_model_info()
    model_info = convert_model_info(model_info)
    print(json.dumps(model_info, indent=2))

    model_names = get_model_names(model_info)
    print("-------" * 40)
    print_providers(model_names)
    print("-------" * 40)
    print_convert(model_names)


if __name__ == "__main__":
    main()
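A quick sketch of the parameter conversion above; the input mirrors the shape of Vercel's model JSON as the code reads it, but the values here are invented.

# With params_to_default_params above in scope:
parameters = {
    "temperature": {"value": 0.7, "range": [0, 1]},
    "maximumLength": {"value": 1024, "range": [50, 1024]},  # renamed to maxTokens
}
print(params_to_default_params(parameters))
# -> {'temperature': 0.7, 'maxTokens': 1024}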