feat: add CLI entry points for GeminiCLI and QwenCode providers

This commit is contained in:
hlohaus 2026-02-06 11:25:35 +01:00
parent e4125ecae4
commit 6472b47739
3 changed files with 737 additions and 4 deletions

View file

@ -1,9 +1,17 @@
import os
import sys
import json
import base64
import time
import secrets
import hashlib
import asyncio
import webbrowser
import threading
from pathlib import Path
from typing import Any, AsyncGenerator, Dict, List, Optional, Union
from typing import Any, AsyncGenerator, Dict, List, Optional, Union, Tuple
from urllib.parse import urlencode, parse_qs, urlparse
from http.server import HTTPServer, BaseHTTPRequestHandler
import aiohttp
from aiohttp import ClientSession, ClientTimeout
@ -17,9 +25,213 @@ from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin, AuthFile
from ..helper import get_connector, get_system_prompt, format_media_prompt
from ... import debug
def get_oauth_creds_path():
return Path.home() / ".gemini" / "oauth_creds.json"
# OAuth configuration for GeminiCLI
GEMINICLI_REDIRECT_URI = "http://localhost:51122/oauthcallback"
GEMINICLI_SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
]
GEMINICLI_OAUTH_CALLBACK_PORT = 51122
GEMINICLI_OAUTH_CALLBACK_PATH = "/oauthcallback"
def generate_pkce_pair() -> Tuple[str, str]:
"""Generate a PKCE verifier and challenge pair."""
verifier = secrets.token_urlsafe(32)
digest = hashlib.sha256(verifier.encode('ascii')).digest()
challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
return verifier, challenge
def encode_oauth_state(verifier: str) -> str:
"""Encode OAuth state parameter with PKCE verifier."""
payload = {"verifier": verifier}
return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
def decode_oauth_state(state: str) -> Dict[str, str]:
"""Decode OAuth state parameter back to verifier."""
padded = state + '=' * (4 - len(state) % 4) if len(state) % 4 else state
normalized = padded.replace('-', '+').replace('_', '/')
try:
decoded = base64.b64decode(normalized).decode('utf-8')
parsed = json.loads(decoded)
return {"verifier": parsed.get("verifier", "")}
except Exception:
return {"verifier": ""}
class GeminiCLIOAuthCallbackHandler(BaseHTTPRequestHandler):
"""HTTP request handler for OAuth callback."""
callback_result: Optional[Dict[str, str]] = None
callback_error: Optional[str] = None
def log_message(self, format, *args):
"""Suppress default logging."""
pass
def do_GET(self):
"""Handle GET request for OAuth callback."""
parsed = urlparse(self.path)
if parsed.path != GEMINICLI_OAUTH_CALLBACK_PATH:
self.send_error(404, "Not Found")
return
params = parse_qs(parsed.query)
code = params.get("code", [None])[0]
state = params.get("state", [None])[0]
error = params.get("error", [None])[0]
if error:
GeminiCLIOAuthCallbackHandler.callback_error = error
self._send_error_response(error)
elif code and state:
GeminiCLIOAuthCallbackHandler.callback_result = {"code": code, "state": state}
self._send_success_response()
else:
GeminiCLIOAuthCallbackHandler.callback_error = "Missing code or state parameter"
self._send_error_response("Missing parameters")
def _send_success_response(self):
"""Send success HTML response."""
html = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Authentication Successful</title>
<style>
body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
display: flex; justify-content: center; align-items: center; height: 100vh;
margin: 0; background: linear-gradient(135deg, #4285f4 0%, #34a853 100%); }
.container { background: white; padding: 3rem; border-radius: 1rem;
box-shadow: 0 20px 60px rgba(0,0,0,0.3); text-align: center; max-width: 400px; }
h1 { color: #10B981; margin-bottom: 1rem; }
p { color: #6B7280; line-height: 1.6; }
</style>
</head>
<body>
<div class="container">
<div style="font-size: 4rem; margin-bottom: 1rem;"></div>
<h1>Authentication Successful!</h1>
<p>You have successfully authenticated with Google GeminiCLI.<br>You can close this window and return to your terminal.</p>
</div>
</body>
</html>"""
self.send_response(200)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", len(html.encode()))
self.end_headers()
self.wfile.write(html.encode())
def _send_error_response(self, error: str):
"""Send error HTML response."""
html = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Authentication Failed</title>
<style>
body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
display: flex; justify-content: center; align-items: center; height: 100vh;
margin: 0; background: #FEE2E2; }}
.container {{ background: white; padding: 3rem; border-radius: 1rem;
box-shadow: 0 10px 40px rgba(0,0,0,0.1); text-align: center; }}
h1 {{ color: #EF4444; }}
p {{ color: #6B7280; }}
</style>
</head>
<body>
<div class="container">
<h1> Authentication Failed</h1>
<p>Error: {error}</p>
<p>Please try again.</p>
</div>
</body>
</html>"""
self.send_response(400)
self.send_header("Content-Type", "text/html; charset=utf-8")
self.send_header("Content-Length", len(html.encode()))
self.end_headers()
self.wfile.write(html.encode())
class GeminiCLIOAuthCallbackServer:
"""Local HTTP server to capture OAuth callback."""
def __init__(self, port: int = GEMINICLI_OAUTH_CALLBACK_PORT, timeout: float = 300.0):
self.port = port
self.timeout = timeout
self.server: Optional[HTTPServer] = None
self._thread: Optional[threading.Thread] = None
self._stop_flag = False
def start(self) -> bool:
"""Start the callback server. Returns True if successful."""
try:
GeminiCLIOAuthCallbackHandler.callback_result = None
GeminiCLIOAuthCallbackHandler.callback_error = None
self._stop_flag = False
self.server = HTTPServer(("localhost", self.port), GeminiCLIOAuthCallbackHandler)
self.server.timeout = 0.5
self._thread = threading.Thread(target=self._serve, daemon=True)
self._thread.start()
return True
except OSError as e:
debug.log(f"Failed to start OAuth callback server: {e}")
return False
def _serve(self):
"""Serve requests until shutdown or result received."""
start_time = time.time()
while not self._stop_flag and self.server:
if time.time() - start_time > self.timeout:
break
if GeminiCLIOAuthCallbackHandler.callback_result or GeminiCLIOAuthCallbackHandler.callback_error:
time.sleep(0.3)
break
try:
self.server.handle_request()
except Exception:
break
def wait_for_callback(self) -> Optional[Dict[str, str]]:
"""Wait for OAuth callback and return result."""
start_time = time.time()
while time.time() - start_time < self.timeout:
if GeminiCLIOAuthCallbackHandler.callback_result or GeminiCLIOAuthCallbackHandler.callback_error:
break
time.sleep(0.1)
self._stop_flag = True
if self._thread:
self._thread.join(timeout=2.0)
if GeminiCLIOAuthCallbackHandler.callback_error:
raise RuntimeError(f"OAuth error: {GeminiCLIOAuthCallbackHandler.callback_error}")
return GeminiCLIOAuthCallbackHandler.callback_result
def stop(self):
"""Stop the callback server."""
self._stop_flag = True
if self.server:
try:
self.server.server_close()
except Exception:
pass
self.server = None
class AuthManager(AuthFileMixin):
"""
Handles OAuth2 authentication and Google Code Assist API communication.
@ -567,4 +779,363 @@ class GeminiCLI(AsyncGeneratorProvider, ProviderModelMixin):
tools=tools,
**kwargs
):
yield chunk
yield chunk
@classmethod
def build_authorization_url(cls) -> Tuple[str, str, str]:
"""Build OAuth authorization URL with PKCE."""
verifier, challenge = generate_pkce_pair()
state = encode_oauth_state(verifier)
params = {
"client_id": AuthManager.OAUTH_CLIENT_ID,
"response_type": "code",
"redirect_uri": GEMINICLI_REDIRECT_URI,
"scope": " ".join(GEMINICLI_SCOPES),
"code_challenge": challenge,
"code_challenge_method": "S256",
"state": state,
"access_type": "offline",
"prompt": "consent",
}
url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}"
return url, verifier, state
@classmethod
async def exchange_code_for_tokens(cls, code: str, state: str) -> Dict[str, Any]:
"""Exchange authorization code for access and refresh tokens."""
decoded_state = decode_oauth_state(state)
verifier = decoded_state.get("verifier", "")
if not verifier:
raise RuntimeError("Missing PKCE verifier in state parameter")
start_time = time.time()
async with aiohttp.ClientSession() as session:
token_data = {
"client_id": AuthManager.OAUTH_CLIENT_ID,
"client_secret": AuthManager.OAUTH_CLIENT_SECRET,
"code": code,
"grant_type": "authorization_code",
"redirect_uri": GEMINICLI_REDIRECT_URI,
"code_verifier": verifier,
}
async with session.post(
"https://oauth2.googleapis.com/token",
data=token_data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
) as resp:
if not resp.ok:
error_text = await resp.text()
raise RuntimeError(f"Token exchange failed: {error_text}")
token_response = await resp.json()
access_token = token_response.get("access_token")
refresh_token = token_response.get("refresh_token")
expires_in = token_response.get("expires_in", 3600)
if not access_token or not refresh_token:
raise RuntimeError("Missing tokens in response")
# Get user info
email = None
async with session.get(
"https://www.googleapis.com/oauth2/v1/userinfo?alt=json",
headers={"Authorization": f"Bearer {access_token}"}
) as resp:
if resp.ok:
user_info = await resp.json()
email = user_info.get("email")
expires_at = int((start_time + expires_in) * 1000) # milliseconds
return {
"access_token": access_token,
"refresh_token": refresh_token,
"expiry_date": expires_at,
"email": email,
}
@classmethod
async def interactive_login(
cls,
no_browser: bool = False,
timeout: float = 300.0,
) -> Dict[str, Any]:
"""Perform interactive OAuth login flow."""
auth_url, verifier, state = cls.build_authorization_url()
print("\n" + "=" * 60)
print("GeminiCLI OAuth Login")
print("=" * 60)
callback_server = GeminiCLIOAuthCallbackServer(timeout=timeout)
server_started = callback_server.start()
if server_started and not no_browser:
print(f"\nOpening browser for authentication...")
print(f"If browser doesn't open, visit this URL:\n")
print(f"{auth_url}\n")
try:
webbrowser.open(auth_url)
except Exception as e:
print(f"Could not open browser automatically: {e}")
print("Please open the URL above manually.\n")
else:
if not server_started:
print(f"\nCould not start local callback server on port {GEMINICLI_OAUTH_CALLBACK_PORT}.")
print("You may need to close any application using that port.\n")
print(f"\nPlease open this URL in your browser:\n")
print(f"{auth_url}\n")
if server_started:
print("Waiting for authentication callback...")
try:
callback_result = callback_server.wait_for_callback()
if not callback_result:
raise RuntimeError("OAuth callback timed out")
code = callback_result.get("code")
callback_state = callback_result.get("state")
if not code:
raise RuntimeError("No authorization code received")
print("\n✓ Authorization code received. Exchanging for tokens...")
tokens = await cls.exchange_code_for_tokens(code, callback_state or state)
print(f"✓ Authentication successful!")
if tokens.get("email"):
print(f" Logged in as: {tokens['email']}")
return tokens
finally:
callback_server.stop()
else:
print("\nAfter completing authentication, you'll be redirected to a localhost URL.")
print("Copy and paste the full redirect URL or just the authorization code below:\n")
user_input = input("Paste redirect URL or code: ").strip()
if not user_input:
raise RuntimeError("No input provided")
if user_input.startswith("http"):
parsed = urlparse(user_input)
params = parse_qs(parsed.query)
code = params.get("code", [None])[0]
callback_state = params.get("state", [state])[0]
else:
code = user_input
callback_state = state
if not code:
raise RuntimeError("Could not extract authorization code")
print("\nExchanging code for tokens...")
tokens = await cls.exchange_code_for_tokens(code, callback_state)
print(f"✓ Authentication successful!")
if tokens.get("email"):
print(f" Logged in as: {tokens['email']}")
return tokens
@classmethod
async def login(
cls,
no_browser: bool = False,
credentials_path: Optional[Path] = None,
) -> "AuthManager":
"""
Perform interactive OAuth login and save credentials.
Args:
no_browser: If True, don't auto-open browser
credentials_path: Path to save credentials
Returns:
AuthManager with active credentials
Example:
>>> import asyncio
>>> from g4f.Provider.needs_auth import GeminiCLI
>>> asyncio.run(GeminiCLI.login())
"""
tokens = await cls.interactive_login(no_browser=no_browser)
creds = {
"access_token": tokens["access_token"],
"refresh_token": tokens["refresh_token"],
"expiry_date": tokens["expiry_date"],
"email": tokens.get("email"),
"client_id": AuthManager.OAUTH_CLIENT_ID,
"client_secret": AuthManager.OAUTH_CLIENT_SECRET,
}
if credentials_path:
path = credentials_path
else:
path = AuthManager.get_cache_file()
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
json.dump(creds, f, indent=2)
try:
path.chmod(0o600)
except Exception:
pass
print(f"\n✓ Credentials saved to: {path}")
print("=" * 60 + "\n")
auth_manager = AuthManager(env=os.environ)
auth_manager._access_token = tokens["access_token"]
auth_manager._expiry = tokens["expiry_date"] / 1000
cls.auth_manager = auth_manager
return auth_manager
@classmethod
def has_credentials(cls) -> bool:
"""Check if valid credentials exist."""
cache_path = AuthManager.get_cache_file()
if cache_path.exists():
return True
default_path = get_oauth_creds_path()
return default_path.exists()
@classmethod
def get_credentials_path(cls) -> Optional[Path]:
"""Get path to credentials file if it exists."""
cache_path = AuthManager.get_cache_file()
if cache_path.exists():
return cache_path
default_path = get_oauth_creds_path()
if default_path.exists():
return default_path
return None
async def main():
"""CLI entry point for GeminiCLI authentication."""
import argparse
parser = argparse.ArgumentParser(
description="GeminiCLI OAuth Authentication for gpt4free",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s login # Interactive login with browser
%(prog)s login --no-browser # Manual login (paste URL)
%(prog)s status # Check authentication status
%(prog)s logout # Remove saved credentials
"""
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Login command
login_parser = subparsers.add_parser("login", help="Authenticate with Google")
login_parser.add_argument(
"--no-browser", "-n",
action="store_true",
help="Don't auto-open browser, print URL instead"
)
# Status command
subparsers.add_parser("status", help="Check authentication status")
# Logout command
subparsers.add_parser("logout", help="Remove saved credentials")
args = parser.parse_args()
if args.command == "login":
try:
await GeminiCLI.login(no_browser=args.no_browser)
except KeyboardInterrupt:
print("\n\nLogin cancelled.")
sys.exit(1)
except Exception as e:
print(f"\n❌ Login failed: {e}")
sys.exit(1)
elif args.command == "status":
print("\nGeminiCLI Authentication Status")
print("=" * 40)
if GeminiCLI.has_credentials():
creds_path = GeminiCLI.get_credentials_path()
print(f"✓ Credentials found at: {creds_path}")
try:
with creds_path.open() as f:
creds = json.load(f)
if creds.get("email"):
print(f" Email: {creds['email']}")
expiry = creds.get("expiry_date")
if expiry:
expiry_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(expiry / 1000))
if expiry / 1000 > time.time():
print(f" Token expires: {expiry_time}")
else:
print(f" Token expired: {expiry_time} (will auto-refresh)")
except Exception as e:
print(f" (Could not read credential details: {e})")
else:
print("✗ No credentials found")
print(f"\nRun 'g4f-geminicli login' to authenticate.")
print()
elif args.command == "logout":
print("\nGeminiCLI Logout")
print("=" * 40)
removed = False
cache_path = AuthManager.get_cache_file()
if cache_path.exists():
cache_path.unlink()
print(f"✓ Removed: {cache_path}")
removed = True
default_path = get_oauth_creds_path()
if default_path.exists():
default_path.unlink()
print(f"✓ Removed: {default_path}")
removed = True
if removed:
print("\n✓ Credentials removed successfully.")
else:
print("No credentials found to remove.")
print()
else:
parser.print_help()
def cli_main():
"""Synchronous CLI entry point for setup.py console_scripts."""
asyncio.run(main())
if __name__ == "__main__":
cli_main()

View file

@ -1,10 +1,18 @@
from __future__ import annotations
import sys
import json
import time
import asyncio
from pathlib import Path
from typing import Optional
from ...typing import Messages, AsyncResult
from ..template import OpenaiTemplate
from .qwenContentGenerator import QwenContentGenerator
from .qwenOAuth2 import QwenOAuth2Client
from .sharedTokenManager import TokenManagerError
from .sharedTokenManager import TokenManagerError, SharedTokenManager
from .oauthFlow import launch_browser_for_oauth
class QwenCode(OpenaiTemplate):
label = "Qwen Code 🤖"
@ -70,4 +78,156 @@ class QwenCode(OpenaiTemplate):
else:
yield chunk
except:
raise
raise
@classmethod
async def login(cls, credentials_path: Optional[Path] = None) -> SharedTokenManager:
"""
Perform interactive OAuth login and save credentials.
Args:
credentials_path: Path to save credentials (default: g4f cache)
Returns:
SharedTokenManager with active credentials
Example:
>>> import asyncio
>>> from g4f.Provider.qwen import QwenCode
>>> asyncio.run(QwenCode.login())
"""
print("\n" + "=" * 60)
print("QwenCode OAuth Login")
print("=" * 60)
await launch_browser_for_oauth()
shared_manager = SharedTokenManager.getInstance()
print("=" * 60 + "\n")
return shared_manager
@classmethod
def has_credentials(cls) -> bool:
"""Check if valid credentials exist."""
shared_manager = SharedTokenManager.getInstance()
path = shared_manager.getCredentialFilePath()
return path.exists()
@classmethod
def get_credentials_path(cls) -> Optional[Path]:
"""Get path to credentials file if it exists."""
shared_manager = SharedTokenManager.getInstance()
path = shared_manager.getCredentialFilePath()
if path.exists():
return path
return None
async def main():
"""CLI entry point for QwenCode authentication."""
import argparse
parser = argparse.ArgumentParser(
description="QwenCode OAuth Authentication for gpt4free",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s login # Interactive device code login
%(prog)s status # Check authentication status
%(prog)s logout # Remove saved credentials
"""
)
subparsers = parser.add_subparsers(dest="command", help="Commands")
# Login command
subparsers.add_parser("login", help="Authenticate with Qwen")
# Status command
subparsers.add_parser("status", help="Check authentication status")
# Logout command
subparsers.add_parser("logout", help="Remove saved credentials")
args = parser.parse_args()
if args.command == "login":
try:
await QwenCode.login()
except KeyboardInterrupt:
print("\n\nLogin cancelled.")
sys.exit(1)
except Exception as e:
print(f"\n❌ Login failed: {e}")
sys.exit(1)
elif args.command == "status":
print("\nQwenCode Authentication Status")
print("=" * 40)
if QwenCode.has_credentials():
creds_path = QwenCode.get_credentials_path()
print(f"✓ Credentials found at: {creds_path}")
try:
with creds_path.open() as f:
creds = json.load(f)
expiry = creds.get("expiry_date")
if expiry:
expiry_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(expiry / 1000))
if expiry / 1000 > time.time():
print(f" Token expires: {expiry_time}")
else:
print(f" Token expired: {expiry_time} (will auto-refresh)")
if creds.get("resource_url"):
print(f" Endpoint: {creds['resource_url']}")
except Exception as e:
print(f" (Could not read credential details: {e})")
else:
print("✗ No credentials found")
print(f"\nRun 'g4f-qwencode login' to authenticate.")
print()
elif args.command == "logout":
print("\nQwenCode Logout")
print("=" * 40)
removed = False
shared_manager = SharedTokenManager.getInstance()
path = shared_manager.getCredentialFilePath()
if path.exists():
path.unlink()
print(f"✓ Removed: {path}")
removed = True
# Also try the default location
default_path = Path.home() / ".qwen" / "oauth_creds.json"
if default_path.exists() and default_path != path:
default_path.unlink()
print(f"✓ Removed: {default_path}")
removed = True
if removed:
print("\n✓ Credentials removed successfully.")
else:
print("No credentials found to remove.")
print()
else:
parser.print_help()
def cli_main():
"""Synchronous CLI entry point for setup.py console_scripts."""
asyncio.run(main())
if __name__ == "__main__":
cli_main()

View file

@ -120,6 +120,8 @@ setup(
'g4f=g4f.cli:main',
'g4f-mcp=g4f.mcp.server:main',
'g4f-antigravity=g4f.Provider.needs_auth.Antigravity:cli_main',
'g4f-geminicli=g4f.Provider.needs_auth.GeminiCLI:cli_main',
'g4f-qwencode=g4f.Provider.qwen.QwenCode:cli_main',
],
},
url='https://github.com/xtekky/gpt4free', # Link to your GitHub repository