Add random cookie generation and enhance authentication flow in Copilot

This commit is contained in:
hlohaus 2025-10-10 09:19:01 +02:00
parent 77afb46c50
commit cf4ab392b6

View file

@ -4,6 +4,9 @@ import os
import json
import asyncio
import base64
import random
import string
import urllib.parse
from typing import AsyncIterator
from urllib.parse import quote
@ -25,7 +28,7 @@ from ..typing import AsyncResult, Messages, MediaListType
from ..errors import MissingRequirementsError, NoValidHarFileError, MissingAuthError
from ..providers.response import *
from ..tools.media import merge_media
from ..requests import get_nodriver
from ..requests import get_nodriver, DEFAULT_HEADERS
from ..image import to_bytes, is_accepted_format
from .helper import get_last_user_message
from ..files import get_bucket_dir
@ -52,6 +55,24 @@ def extract_bucket_items(messages: Messages) -> list[dict]:
bucket_items = []
return bucket_items
def random_hex(length):
return ''.join(random.choices('0123456789ABCDEF', k=length))
def random_base64(length):
chars = string.ascii_letters + string.digits + '+/='
return ''.join(random.choices(chars, k=length))
def get_fake_cookie():
return {
"_C_ETH": "1",
"_C_Auth": "",
"MUID": random_hex(32),
"MUIDB": random_hex(32),
"_EDGE_S": f"F=1&SID={random_hex(32)}",
"_EDGE_V": "1",
"ak_bmsc": f"{random_hex(32)}~{'0'*48}~{urllib.parse.quote(random_base64(300))}"
}
class Copilot(AsyncAuthedProvider, ProviderModelMixin):
label = "Microsoft Copilot"
url = "https://copilot.microsoft.com"
@ -76,12 +97,8 @@ class Copilot(AsyncAuthedProvider, ProviderModelMixin):
conversation_url = f"{url}/c/api/conversations"
@classmethod
async def on_auth_async(cls, cookies: dict = None, api_key: str = None, proxy: str = None, **kwargs) -> AsyncIterator:
if api_key:
if not cookies:
cookies = {}
cookies[cls.anon_cookie_name] = api_key
elif cookies is None:
async def on_auth_async(cls, cookies: dict = None, proxy: str = None, **kwargs) -> AsyncIterator:
if cookies is None:
cookies = get_cookies(cls.cookie_domain, False, cache_result=False)
access_token = None
useridentitytype = None
@ -119,10 +136,12 @@ class Copilot(AsyncAuthedProvider, ProviderModelMixin):
raise MissingRequirementsError('Install or update "curl_cffi" package | pip install -U curl_cffi')
model = cls.get_model(model)
websocket_url = cls.websocket_url
headers = None
headers = DEFAULT_HEADERS.copy()
headers["origin"] = cls.url
headers["referer"] = cls.url + "/"
if getattr(auth_result, "access_token", None):
websocket_url = f"{websocket_url}&accessToken={quote(auth_result.access_token)}" + (f"&X-UserIdentityType={quote(auth_result.useridentitytype)}" if getattr(auth_result, "useridentitytype", None) else "")
headers = {"authorization": f"Bearer {auth_result.access_token}"}
headers["authorization"] = f"Bearer {auth_result.access_token}"
async with AsyncSession(
timeout=timeout,
@ -170,12 +189,14 @@ class Copilot(AsyncAuthedProvider, ProviderModelMixin):
if response.status_code == 401:
raise MissingAuthError("Status 401: Invalid session")
response.raise_for_status()
debug.log(f"Copilot: Update cookies: [{', '.join(key for key in response.cookies)}]")
auth_result.cookies.update({key: value for key, value in response.cookies.items()})
if not cls.needs_auth and cls.anon_cookie_name not in auth_result.cookies:
raise MissingAuthError(f"Missing cookie: {cls.anon_cookie_name}")
conversation = Conversation(response.json().get("currentConversationId"))
debug.log(f"Copilot: Created conversation: {conversation.conversation_id}")
else:
debug.log(f"Copilot: Use conversation: {conversation.conversation_id}")
if return_conversation:
yield conversation
# response = await session.get("https://copilot.microsoft.com/c/api/user?api-version=4", headers={"x-useridentitytype": useridentitytype} if cls._access_token else {})
# if response.status_code == 401:
@ -298,6 +319,8 @@ class Copilot(AsyncAuthedProvider, ProviderModelMixin):
debug.log(f"Copilot Message: {msg_txt[:100]}...")
if not done:
raise MissingAuthError(f"Invalid response: {last_msg}")
if return_conversation:
yield conversation
if sources:
yield Sources(sources.values())
if not wss.closed:
@ -338,6 +361,19 @@ async def get_access_token_and_cookies(url: str, proxy: str = None, needs_auth:
break
if not needs_auth:
break
if not needs_auth:
textarea = await page.select("textarea")
if textarea is not None:
await textarea.send_keys("Hello")
await asyncio.sleep(1)
button = await page.select("[data-testid=\"submit-button\"]")
if button:
await button.click()
turnstile = await page.select('#cf-turnstile', 300)
if turnstile:
debug.log("Found Element: 'cf-turnstile'")
await asyncio.sleep(3)
await click_trunstile(page)
cookies = {}
while Copilot.anon_cookie_name not in cookies:
await asyncio.sleep(2)
@ -369,7 +405,23 @@ def readHAR(url: str):
useridentitytype = v_headers["x-useridentitytype"]
if v['request']['cookies']:
cookies = {c['name']: c['value'] for c in v['request']['cookies']}
if api_key is None:
raise NoValidHarFileError("No access token found in .har files")
if not cookies:
raise NoValidHarFileError("No session found in .har files")
return api_key, useridentitytype, cookies
return api_key, useridentitytype, cookies
if has_nodriver:
async def click_trunstile(page: nodriver.Tab, element='document.getElementById("cf-turnstile")'):
for _ in range(3):
size = None
for idx in range(15):
size = await page.js_dumps(f'{element}?.getBoundingClientRect()||{{}}')
debug.log(f"Found size: {size.get('x'), size.get('y')}")
if "x" not in size:
break
await page.flash_point(size.get("x") + idx * 3, size.get("y") + idx * 3)
await page.mouse_click(size.get("x") + idx * 3, size.get("y") + idx * 3)
await asyncio.sleep(2)
if "x" not in size:
break
debug.log("Finished clicking trunstile.")