Add unitests for the client

Fix: async generator ignored GeneratorExit
Fix: ResourceWarning: unclosed event loop
This commit is contained in:
Heiner Lohaus 2024-02-14 09:21:57 +01:00
parent 151f8b8b0e
commit e1a0b3ffa2
8 changed files with 172 additions and 132 deletions

View file

@ -8,6 +8,7 @@ import unittest
import g4f
from g4f import ChatCompletion
from g4f.client import Client
from .mocks import ProviderMock, AsyncProviderMock, AsyncGeneratorProviderMock
DEFAULT_MESSAGES = [{'role': 'user', 'content': 'Hello'}]
@ -24,11 +25,16 @@ class TestChatCompletion(unittest.TestCase):
def test_create(self):
result = ChatCompletion.create(g4f.models.default, DEFAULT_MESSAGES, AsyncProviderMock)
self.assertEqual("Mock",result)
self.assertEqual("Mock", result)
def test_create_generator(self):
result = ChatCompletion.create(g4f.models.default, DEFAULT_MESSAGES, AsyncGeneratorProviderMock)
self.assertEqual("Mock",result)
self.assertEqual("Mock", result)
def test_await_callback(self):
client = Client(provider=AsyncGeneratorProviderMock)
response = client.chat.completions.create(DEFAULT_MESSAGES, "", max_tokens=0)
self.assertEqual("Mock", response.choices[0].message.content)
class TestChatCompletionAsync(unittest.IsolatedAsyncioTestCase):

54
etc/unittest/client.py Normal file
View file

@ -0,0 +1,54 @@
import unittest
from g4f.client import Client, ChatCompletion, ChatCompletionChunk
from .mocks import AsyncGeneratorProviderMock, ModelProviderMock, YieldProviderMock
DEFAULT_MESSAGES = [{'role': 'user', 'content': 'Hello'}]
class TestPassModel(unittest.TestCase):
def test_response(self):
client = Client(provider=AsyncGeneratorProviderMock)
response = client.chat.completions.create(DEFAULT_MESSAGES, "")
self.assertIsInstance(response, ChatCompletion)
self.assertEqual("Mock", response.choices[0].message.content)
def test_pass_model(self):
client = Client(provider=ModelProviderMock)
response = client.chat.completions.create(DEFAULT_MESSAGES, "Hello")
self.assertIsInstance(response, ChatCompletion)
self.assertEqual("Hello", response.choices[0].message.content)
def test_max_tokens(self):
client = Client(provider=YieldProviderMock)
messages = [{'role': 'user', 'content': chunk} for chunk in ["How ", "are ", "you", "?"]]
response = client.chat.completions.create(messages, "Hello", max_tokens=1)
self.assertIsInstance(response, ChatCompletion)
self.assertEqual("How ", response.choices[0].message.content)
response = client.chat.completions.create(messages, "Hello", max_tokens=2)
self.assertIsInstance(response, ChatCompletion)
self.assertEqual("How are ", response.choices[0].message.content)
def test_max_stream(self):
client = Client(provider=YieldProviderMock)
messages = [{'role': 'user', 'content': chunk} for chunk in ["How ", "are ", "you", "?"]]
response = client.chat.completions.create(messages, "Hello", stream=True)
for chunk in response:
self.assertIsInstance(chunk, ChatCompletionChunk)
self.assertIsInstance(chunk.choices[0].delta.content, str)
messages = [{'role': 'user', 'content': chunk} for chunk in ["You ", "You ", "Other", "?"]]
response = client.chat.completions.create(messages, "Hello", stream=True, max_tokens=2)
response = list(response)
self.assertEqual(len(response), 2)
for chunk in response:
self.assertEqual(chunk.choices[0].delta.content, "You ")
def no_test_stop(self):
client = Client(provider=YieldProviderMock)
messages = [{'role': 'user', 'content': chunk} for chunk in ["How ", "are ", "you", "?"]]
response = client.chat.completions.create(messages, "Hello", stop=["and"])
self.assertIsInstance(response, ChatCompletion)
self.assertEqual("How are you?", response.choices[0].message.content)
if __name__ == '__main__':
unittest.main()

View file

@ -7,10 +7,10 @@ class ProviderMock(AbstractProvider):
model, messages, stream, **kwargs
):
yield "Mock"
class AsyncProviderMock(AsyncProvider):
working = True
async def create_async(
model, messages, **kwargs
):
@ -18,16 +18,25 @@ class AsyncProviderMock(AsyncProvider):
class AsyncGeneratorProviderMock(AsyncGeneratorProvider):
working = True
async def create_async_generator(
model, messages, stream, **kwargs
):
yield "Mock"
class ModelProviderMock(AbstractProvider):
working = True
def create_completion(
model, messages, stream, **kwargs
):
yield model
yield model
class YieldProviderMock(AsyncGeneratorProvider):
working = True
async def create_async_generator(
model, messages, stream, **kwargs
):
for message in messages:
yield message["content"]