mirror of
https://github.com/xtekky/gpt4free.git
synced 2025-12-05 18:20:35 -08:00
* IterListProvider support for generating images * Add missing get_har_files import in Copilot * Fix typo in dall-e-3 model name * Add image client unittests * Add MicrosoftDesigner provider * Import MicrosoftDesigner and add it to the model list
167 lines
No EOL
6.8 KiB
Python
167 lines
No EOL
6.8 KiB
Python
from __future__ import annotations
|
|
|
|
import uuid
|
|
import aiohttp
|
|
import random
|
|
import asyncio
|
|
import json
|
|
|
|
from ...image import ImageResponse
|
|
from ...errors import MissingRequirementsError, NoValidHarFileError
|
|
from ...typing import AsyncResult, Messages
|
|
from ...requests.raise_for_status import raise_for_status
|
|
from ...requests.aiohttp import get_connector
|
|
from ...requests import get_nodriver
|
|
from ..Copilot import get_headers, get_har_files
|
|
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
|
|
from ..helper import get_random_hex
|
|
from ... import debug
|
|
|
|
class MicrosoftDesigner(AsyncGeneratorProvider, ProviderModelMixin):
|
|
label = "Microsoft Designer"
|
|
url = "https://designer.microsoft.com"
|
|
working = True
|
|
needs_auth = True
|
|
default_image_model = "dall-e-3"
|
|
image_models = [default_image_model, "1024x1024", "1024x1792", "1792x1024"]
|
|
models = image_models
|
|
|
|
@classmethod
|
|
async def create_async_generator(
|
|
cls,
|
|
model: str,
|
|
messages: Messages,
|
|
prompt: str = None,
|
|
proxy: str = None,
|
|
**kwargs
|
|
) -> AsyncResult:
|
|
image_size = "1024x1024"
|
|
if model != cls.default_image_model and model in cls.image_models:
|
|
image_size = model
|
|
yield await cls.generate(messages[-1]["content"] if prompt is None else prompt, image_size, proxy)
|
|
|
|
@classmethod
|
|
async def generate(cls, prompt: str, image_size: str, proxy: str = None) -> ImageResponse:
|
|
try:
|
|
access_token, user_agent = readHAR("https://designerapp.officeapps.live.com")
|
|
except NoValidHarFileError as h:
|
|
debug.log(f"{cls.__name__}: {h}")
|
|
try:
|
|
access_token, user_agent = await get_access_token_and_user_agent(cls.url, proxy)
|
|
except MissingRequirementsError:
|
|
raise h
|
|
images = await create_images(prompt, access_token, user_agent, image_size, proxy)
|
|
return ImageResponse(images, prompt)
|
|
|
|
async def create_images(prompt: str, access_token: str, user_agent: str, image_size: str, proxy: str = None, seed: int = None):
|
|
url = 'https://designerapp.officeapps.live.com/designerapp/DallE.ashx?action=GetDallEImagesCogSci'
|
|
if seed is None:
|
|
seed = random.randint(0, 10000)
|
|
|
|
headers = {
|
|
"User-Agent": user_agent,
|
|
"Accept": "application/json, text/plain, */*",
|
|
"Accept-Language": "en-US",
|
|
'Authorization': f'Bearer {access_token}',
|
|
"AudienceGroup": "Production",
|
|
"Caller": "DesignerApp",
|
|
"ClientId": "b5c2664a-7e9b-4a7a-8c9a-cd2c52dcf621",
|
|
"SessionId": str(uuid.uuid4()),
|
|
"UserId": get_random_hex(16),
|
|
"ContainerId": "1e2843a7-2a98-4a6c-93f2-42002de5c478",
|
|
"FileToken": "9f1a4cb7-37e7-4c90-b44d-cb61cfda4bb8",
|
|
"x-upload-to-storage-das": "1",
|
|
"traceparent": "",
|
|
"X-DC-Hint": "FranceCentral",
|
|
"Platform": "Web",
|
|
"HostApp": "DesignerApp",
|
|
"ReleaseChannel": "",
|
|
"IsSignedInUser": "true",
|
|
"Locale": "de-DE",
|
|
"UserType": "MSA",
|
|
"x-req-start": "2615401",
|
|
"ClientBuild": "1.0.20241120.9",
|
|
"ClientName": "DesignerApp",
|
|
"Sec-Fetch-Dest": "empty",
|
|
"Sec-Fetch-Mode": "cors",
|
|
"Sec-Fetch-Site": "cross-site",
|
|
"Pragma": "no-cache",
|
|
"Cache-Control": "no-cache",
|
|
"Referer": "https://designer.microsoft.com/"
|
|
}
|
|
|
|
form_data = aiohttp.FormData()
|
|
form_data.add_field('dalle-caption', prompt)
|
|
form_data.add_field('dalle-scenario-name', 'TextToImage')
|
|
form_data.add_field('dalle-batch-size', '4')
|
|
form_data.add_field('dalle-image-response-format', 'UrlWithBase64Thumbnail')
|
|
form_data.add_field('dalle-seed', seed)
|
|
form_data.add_field('ClientFlights', 'EnableBICForDALLEFlight')
|
|
form_data.add_field('dalle-hear-back-in-ms', 1000)
|
|
form_data.add_field('dalle-include-b64-thumbnails', 'true')
|
|
form_data.add_field('dalle-aspect-ratio-scaling-factor-b64-thumbnails', 0.3)
|
|
form_data.add_field('dalle-image-size', image_size)
|
|
|
|
async with aiohttp.ClientSession(connector=get_connector(proxy=proxy)) as session:
|
|
async with session.post(url, headers=headers, data=form_data) as response:
|
|
await raise_for_status(response)
|
|
response_data = await response.json()
|
|
form_data.add_field('dalle-boost-count', response_data.get('dalle-boost-count', 0))
|
|
polling_meta_data = response_data.get('polling_response', {}).get('polling_meta_data', {})
|
|
form_data.add_field('dalle-poll-url', polling_meta_data.get('poll_url', ''))
|
|
|
|
while True:
|
|
await asyncio.sleep(polling_meta_data.get('poll_interval', 1000) / 1000)
|
|
async with session.post(url, headers=headers, data=form_data) as response:
|
|
await raise_for_status(response)
|
|
response_data = await response.json()
|
|
images = [image["ImageUrl"] for image in response_data.get('image_urls_thumbnail', [])]
|
|
if images:
|
|
return images
|
|
|
|
def readHAR(url: str) -> tuple[str, str]:
|
|
api_key = None
|
|
user_agent = None
|
|
for path in get_har_files():
|
|
with open(path, 'rb') as file:
|
|
try:
|
|
harFile = json.loads(file.read())
|
|
except json.JSONDecodeError:
|
|
# Error: not a HAR file!
|
|
continue
|
|
for v in harFile['log']['entries']:
|
|
if v['request']['url'].startswith(url):
|
|
v_headers = get_headers(v)
|
|
if "authorization" in v_headers:
|
|
api_key = v_headers["authorization"].split(maxsplit=1).pop()
|
|
if "user-agent" in v_headers:
|
|
user_agent = v_headers["user-agent"]
|
|
if api_key is None:
|
|
raise NoValidHarFileError("No access token found in .har files")
|
|
|
|
return api_key, user_agent
|
|
|
|
async def get_access_token_and_user_agent(url: str, proxy: str = None):
|
|
browser = await get_nodriver(proxy=proxy)
|
|
page = await browser.get(url)
|
|
user_agent = await page.evaluate("navigator.userAgent")
|
|
access_token = None
|
|
while access_token is None:
|
|
access_token = await page.evaluate("""
|
|
(() => {
|
|
for (var i = 0; i < localStorage.length; i++) {
|
|
try {
|
|
item = JSON.parse(localStorage.getItem(localStorage.key(i)));
|
|
if (item.credentialType == "AccessToken"
|
|
&& item.expiresOn > Math.floor(Date.now() / 1000)
|
|
&& item.target.includes("designerappservice")) {
|
|
return item.secret;
|
|
}
|
|
} catch(e) {}
|
|
}
|
|
})()
|
|
""")
|
|
if access_token is None:
|
|
await asyncio.sleep(1)
|
|
await page.close()
|
|
return access_token, user_agent |