From 6472b47739dfd96fda3461e7ac85343e6df69475 Mon Sep 17 00:00:00 2001 From: hlohaus <983577+hlohaus@users.noreply.github.com> Date: Fri, 6 Feb 2026 11:25:35 +0100 Subject: [PATCH] feat: add CLI entry points for GeminiCLI and QwenCode providers --- g4f/Provider/needs_auth/GeminiCLI.py | 575 ++++++++++++++++++++++++++- g4f/Provider/qwen/QwenCode.py | 164 +++++++- setup.py | 2 + 3 files changed, 737 insertions(+), 4 deletions(-) diff --git a/g4f/Provider/needs_auth/GeminiCLI.py b/g4f/Provider/needs_auth/GeminiCLI.py index 46fbf254..4496cbd4 100644 --- a/g4f/Provider/needs_auth/GeminiCLI.py +++ b/g4f/Provider/needs_auth/GeminiCLI.py @@ -1,9 +1,17 @@ import os +import sys import json import base64 import time +import secrets +import hashlib +import asyncio +import webbrowser +import threading from pathlib import Path -from typing import Any, AsyncGenerator, Dict, List, Optional, Union +from typing import Any, AsyncGenerator, Dict, List, Optional, Union, Tuple +from urllib.parse import urlencode, parse_qs, urlparse +from http.server import HTTPServer, BaseHTTPRequestHandler import aiohttp from aiohttp import ClientSession, ClientTimeout @@ -17,9 +25,213 @@ from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin, AuthFile from ..helper import get_connector, get_system_prompt, format_media_prompt from ... import debug + def get_oauth_creds_path(): return Path.home() / ".gemini" / "oauth_creds.json" + +# OAuth configuration for GeminiCLI +GEMINICLI_REDIRECT_URI = "http://localhost:51122/oauthcallback" +GEMINICLI_SCOPES = [ + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/userinfo.email", + "https://www.googleapis.com/auth/userinfo.profile", +] +GEMINICLI_OAUTH_CALLBACK_PORT = 51122 +GEMINICLI_OAUTH_CALLBACK_PATH = "/oauthcallback" + + +def generate_pkce_pair() -> Tuple[str, str]: + """Generate a PKCE verifier and challenge pair.""" + verifier = secrets.token_urlsafe(32) + digest = hashlib.sha256(verifier.encode('ascii')).digest() + challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii') + return verifier, challenge + + +def encode_oauth_state(verifier: str) -> str: + """Encode OAuth state parameter with PKCE verifier.""" + payload = {"verifier": verifier} + return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=') + + +def decode_oauth_state(state: str) -> Dict[str, str]: + """Decode OAuth state parameter back to verifier.""" + padded = state + '=' * (4 - len(state) % 4) if len(state) % 4 else state + normalized = padded.replace('-', '+').replace('_', '/') + try: + decoded = base64.b64decode(normalized).decode('utf-8') + parsed = json.loads(decoded) + return {"verifier": parsed.get("verifier", "")} + except Exception: + return {"verifier": ""} + + +class GeminiCLIOAuthCallbackHandler(BaseHTTPRequestHandler): + """HTTP request handler for OAuth callback.""" + + callback_result: Optional[Dict[str, str]] = None + callback_error: Optional[str] = None + + def log_message(self, format, *args): + """Suppress default logging.""" + pass + + def do_GET(self): + """Handle GET request for OAuth callback.""" + parsed = urlparse(self.path) + + if parsed.path != GEMINICLI_OAUTH_CALLBACK_PATH: + self.send_error(404, "Not Found") + return + + params = parse_qs(parsed.query) + code = params.get("code", [None])[0] + state = params.get("state", [None])[0] + error = params.get("error", [None])[0] + + if error: + GeminiCLIOAuthCallbackHandler.callback_error = error + self._send_error_response(error) + elif code and state: + GeminiCLIOAuthCallbackHandler.callback_result = {"code": code, "state": state} + self._send_success_response() + else: + GeminiCLIOAuthCallbackHandler.callback_error = "Missing code or state parameter" + self._send_error_response("Missing parameters") + + def _send_success_response(self): + """Send success HTML response.""" + html = """ + + + + Authentication Successful + + + +
+
+

Authentication Successful!

+

You have successfully authenticated with Google GeminiCLI.
You can close this window and return to your terminal.

+
+ +""" + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", len(html.encode())) + self.end_headers() + self.wfile.write(html.encode()) + + def _send_error_response(self, error: str): + """Send error HTML response.""" + html = f""" + + + + Authentication Failed + + + +
+

❌ Authentication Failed

+

Error: {error}

+

Please try again.

+
+ +""" + self.send_response(400) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", len(html.encode())) + self.end_headers() + self.wfile.write(html.encode()) + + +class GeminiCLIOAuthCallbackServer: + """Local HTTP server to capture OAuth callback.""" + + def __init__(self, port: int = GEMINICLI_OAUTH_CALLBACK_PORT, timeout: float = 300.0): + self.port = port + self.timeout = timeout + self.server: Optional[HTTPServer] = None + self._thread: Optional[threading.Thread] = None + self._stop_flag = False + + def start(self) -> bool: + """Start the callback server. Returns True if successful.""" + try: + GeminiCLIOAuthCallbackHandler.callback_result = None + GeminiCLIOAuthCallbackHandler.callback_error = None + self._stop_flag = False + + self.server = HTTPServer(("localhost", self.port), GeminiCLIOAuthCallbackHandler) + self.server.timeout = 0.5 + + self._thread = threading.Thread(target=self._serve, daemon=True) + self._thread.start() + return True + except OSError as e: + debug.log(f"Failed to start OAuth callback server: {e}") + return False + + def _serve(self): + """Serve requests until shutdown or result received.""" + start_time = time.time() + while not self._stop_flag and self.server: + if time.time() - start_time > self.timeout: + break + if GeminiCLIOAuthCallbackHandler.callback_result or GeminiCLIOAuthCallbackHandler.callback_error: + time.sleep(0.3) + break + try: + self.server.handle_request() + except Exception: + break + + def wait_for_callback(self) -> Optional[Dict[str, str]]: + """Wait for OAuth callback and return result.""" + start_time = time.time() + while time.time() - start_time < self.timeout: + if GeminiCLIOAuthCallbackHandler.callback_result or GeminiCLIOAuthCallbackHandler.callback_error: + break + time.sleep(0.1) + + self._stop_flag = True + + if self._thread: + self._thread.join(timeout=2.0) + + if GeminiCLIOAuthCallbackHandler.callback_error: + raise RuntimeError(f"OAuth error: {GeminiCLIOAuthCallbackHandler.callback_error}") + + return GeminiCLIOAuthCallbackHandler.callback_result + + def stop(self): + """Stop the callback server.""" + self._stop_flag = True + if self.server: + try: + self.server.server_close() + except Exception: + pass + self.server = None + class AuthManager(AuthFileMixin): """ Handles OAuth2 authentication and Google Code Assist API communication. @@ -567,4 +779,363 @@ class GeminiCLI(AsyncGeneratorProvider, ProviderModelMixin): tools=tools, **kwargs ): - yield chunk \ No newline at end of file + yield chunk + + @classmethod + def build_authorization_url(cls) -> Tuple[str, str, str]: + """Build OAuth authorization URL with PKCE.""" + verifier, challenge = generate_pkce_pair() + state = encode_oauth_state(verifier) + + params = { + "client_id": AuthManager.OAUTH_CLIENT_ID, + "response_type": "code", + "redirect_uri": GEMINICLI_REDIRECT_URI, + "scope": " ".join(GEMINICLI_SCOPES), + "code_challenge": challenge, + "code_challenge_method": "S256", + "state": state, + "access_type": "offline", + "prompt": "consent", + } + + url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}" + return url, verifier, state + + @classmethod + async def exchange_code_for_tokens(cls, code: str, state: str) -> Dict[str, Any]: + """Exchange authorization code for access and refresh tokens.""" + decoded_state = decode_oauth_state(state) + verifier = decoded_state.get("verifier", "") + + if not verifier: + raise RuntimeError("Missing PKCE verifier in state parameter") + + start_time = time.time() + + async with aiohttp.ClientSession() as session: + token_data = { + "client_id": AuthManager.OAUTH_CLIENT_ID, + "client_secret": AuthManager.OAUTH_CLIENT_SECRET, + "code": code, + "grant_type": "authorization_code", + "redirect_uri": GEMINICLI_REDIRECT_URI, + "code_verifier": verifier, + } + + async with session.post( + "https://oauth2.googleapis.com/token", + data=token_data, + headers={"Content-Type": "application/x-www-form-urlencoded"} + ) as resp: + if not resp.ok: + error_text = await resp.text() + raise RuntimeError(f"Token exchange failed: {error_text}") + + token_response = await resp.json() + + access_token = token_response.get("access_token") + refresh_token = token_response.get("refresh_token") + expires_in = token_response.get("expires_in", 3600) + + if not access_token or not refresh_token: + raise RuntimeError("Missing tokens in response") + + # Get user info + email = None + async with session.get( + "https://www.googleapis.com/oauth2/v1/userinfo?alt=json", + headers={"Authorization": f"Bearer {access_token}"} + ) as resp: + if resp.ok: + user_info = await resp.json() + email = user_info.get("email") + + expires_at = int((start_time + expires_in) * 1000) # milliseconds + + return { + "access_token": access_token, + "refresh_token": refresh_token, + "expiry_date": expires_at, + "email": email, + } + + @classmethod + async def interactive_login( + cls, + no_browser: bool = False, + timeout: float = 300.0, + ) -> Dict[str, Any]: + """Perform interactive OAuth login flow.""" + auth_url, verifier, state = cls.build_authorization_url() + + print("\n" + "=" * 60) + print("GeminiCLI OAuth Login") + print("=" * 60) + + callback_server = GeminiCLIOAuthCallbackServer(timeout=timeout) + server_started = callback_server.start() + + if server_started and not no_browser: + print(f"\nOpening browser for authentication...") + print(f"If browser doesn't open, visit this URL:\n") + print(f"{auth_url}\n") + + try: + webbrowser.open(auth_url) + except Exception as e: + print(f"Could not open browser automatically: {e}") + print("Please open the URL above manually.\n") + else: + if not server_started: + print(f"\nCould not start local callback server on port {GEMINICLI_OAUTH_CALLBACK_PORT}.") + print("You may need to close any application using that port.\n") + + print(f"\nPlease open this URL in your browser:\n") + print(f"{auth_url}\n") + + if server_started: + print("Waiting for authentication callback...") + + try: + callback_result = callback_server.wait_for_callback() + + if not callback_result: + raise RuntimeError("OAuth callback timed out") + + code = callback_result.get("code") + callback_state = callback_result.get("state") + + if not code: + raise RuntimeError("No authorization code received") + + print("\n✓ Authorization code received. Exchanging for tokens...") + + tokens = await cls.exchange_code_for_tokens(code, callback_state or state) + + print(f"✓ Authentication successful!") + if tokens.get("email"): + print(f" Logged in as: {tokens['email']}") + + return tokens + + finally: + callback_server.stop() + else: + print("\nAfter completing authentication, you'll be redirected to a localhost URL.") + print("Copy and paste the full redirect URL or just the authorization code below:\n") + + user_input = input("Paste redirect URL or code: ").strip() + + if not user_input: + raise RuntimeError("No input provided") + + if user_input.startswith("http"): + parsed = urlparse(user_input) + params = parse_qs(parsed.query) + code = params.get("code", [None])[0] + callback_state = params.get("state", [state])[0] + else: + code = user_input + callback_state = state + + if not code: + raise RuntimeError("Could not extract authorization code") + + print("\nExchanging code for tokens...") + tokens = await cls.exchange_code_for_tokens(code, callback_state) + + print(f"✓ Authentication successful!") + if tokens.get("email"): + print(f" Logged in as: {tokens['email']}") + + return tokens + + @classmethod + async def login( + cls, + no_browser: bool = False, + credentials_path: Optional[Path] = None, + ) -> "AuthManager": + """ + Perform interactive OAuth login and save credentials. + + Args: + no_browser: If True, don't auto-open browser + credentials_path: Path to save credentials + + Returns: + AuthManager with active credentials + + Example: + >>> import asyncio + >>> from g4f.Provider.needs_auth import GeminiCLI + >>> asyncio.run(GeminiCLI.login()) + """ + tokens = await cls.interactive_login(no_browser=no_browser) + + creds = { + "access_token": tokens["access_token"], + "refresh_token": tokens["refresh_token"], + "expiry_date": tokens["expiry_date"], + "email": tokens.get("email"), + "client_id": AuthManager.OAUTH_CLIENT_ID, + "client_secret": AuthManager.OAUTH_CLIENT_SECRET, + } + + if credentials_path: + path = credentials_path + else: + path = AuthManager.get_cache_file() + + path.parent.mkdir(parents=True, exist_ok=True) + + with path.open("w") as f: + json.dump(creds, f, indent=2) + + try: + path.chmod(0o600) + except Exception: + pass + + print(f"\n✓ Credentials saved to: {path}") + print("=" * 60 + "\n") + + auth_manager = AuthManager(env=os.environ) + auth_manager._access_token = tokens["access_token"] + auth_manager._expiry = tokens["expiry_date"] / 1000 + cls.auth_manager = auth_manager + + return auth_manager + + @classmethod + def has_credentials(cls) -> bool: + """Check if valid credentials exist.""" + cache_path = AuthManager.get_cache_file() + if cache_path.exists(): + return True + default_path = get_oauth_creds_path() + return default_path.exists() + + @classmethod + def get_credentials_path(cls) -> Optional[Path]: + """Get path to credentials file if it exists.""" + cache_path = AuthManager.get_cache_file() + if cache_path.exists(): + return cache_path + default_path = get_oauth_creds_path() + if default_path.exists(): + return default_path + return None + + +async def main(): + """CLI entry point for GeminiCLI authentication.""" + import argparse + + parser = argparse.ArgumentParser( + description="GeminiCLI OAuth Authentication for gpt4free", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s login # Interactive login with browser + %(prog)s login --no-browser # Manual login (paste URL) + %(prog)s status # Check authentication status + %(prog)s logout # Remove saved credentials +""" + ) + + subparsers = parser.add_subparsers(dest="command", help="Commands") + + # Login command + login_parser = subparsers.add_parser("login", help="Authenticate with Google") + login_parser.add_argument( + "--no-browser", "-n", + action="store_true", + help="Don't auto-open browser, print URL instead" + ) + + # Status command + subparsers.add_parser("status", help="Check authentication status") + + # Logout command + subparsers.add_parser("logout", help="Remove saved credentials") + + args = parser.parse_args() + + if args.command == "login": + try: + await GeminiCLI.login(no_browser=args.no_browser) + except KeyboardInterrupt: + print("\n\nLogin cancelled.") + sys.exit(1) + except Exception as e: + print(f"\n❌ Login failed: {e}") + sys.exit(1) + + elif args.command == "status": + print("\nGeminiCLI Authentication Status") + print("=" * 40) + + if GeminiCLI.has_credentials(): + creds_path = GeminiCLI.get_credentials_path() + print(f"✓ Credentials found at: {creds_path}") + + try: + with creds_path.open() as f: + creds = json.load(f) + + if creds.get("email"): + print(f" Email: {creds['email']}") + + expiry = creds.get("expiry_date") + if expiry: + expiry_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(expiry / 1000)) + if expiry / 1000 > time.time(): + print(f" Token expires: {expiry_time}") + else: + print(f" Token expired: {expiry_time} (will auto-refresh)") + except Exception as e: + print(f" (Could not read credential details: {e})") + else: + print("✗ No credentials found") + print(f"\nRun 'g4f-geminicli login' to authenticate.") + + print() + + elif args.command == "logout": + print("\nGeminiCLI Logout") + print("=" * 40) + + removed = False + + cache_path = AuthManager.get_cache_file() + if cache_path.exists(): + cache_path.unlink() + print(f"✓ Removed: {cache_path}") + removed = True + + default_path = get_oauth_creds_path() + if default_path.exists(): + default_path.unlink() + print(f"✓ Removed: {default_path}") + removed = True + + if removed: + print("\n✓ Credentials removed successfully.") + else: + print("No credentials found to remove.") + + print() + + else: + parser.print_help() + + +def cli_main(): + """Synchronous CLI entry point for setup.py console_scripts.""" + asyncio.run(main()) + + +if __name__ == "__main__": + cli_main() \ No newline at end of file diff --git a/g4f/Provider/qwen/QwenCode.py b/g4f/Provider/qwen/QwenCode.py index 939325df..f8369ffb 100644 --- a/g4f/Provider/qwen/QwenCode.py +++ b/g4f/Provider/qwen/QwenCode.py @@ -1,10 +1,18 @@ from __future__ import annotations +import sys +import json +import time +import asyncio +from pathlib import Path +from typing import Optional + from ...typing import Messages, AsyncResult from ..template import OpenaiTemplate from .qwenContentGenerator import QwenContentGenerator from .qwenOAuth2 import QwenOAuth2Client -from .sharedTokenManager import TokenManagerError +from .sharedTokenManager import TokenManagerError, SharedTokenManager +from .oauthFlow import launch_browser_for_oauth class QwenCode(OpenaiTemplate): label = "Qwen Code 🤖" @@ -70,4 +78,156 @@ class QwenCode(OpenaiTemplate): else: yield chunk except: - raise \ No newline at end of file + raise + + @classmethod + async def login(cls, credentials_path: Optional[Path] = None) -> SharedTokenManager: + """ + Perform interactive OAuth login and save credentials. + + Args: + credentials_path: Path to save credentials (default: g4f cache) + + Returns: + SharedTokenManager with active credentials + + Example: + >>> import asyncio + >>> from g4f.Provider.qwen import QwenCode + >>> asyncio.run(QwenCode.login()) + """ + print("\n" + "=" * 60) + print("QwenCode OAuth Login") + print("=" * 60) + + await launch_browser_for_oauth() + + shared_manager = SharedTokenManager.getInstance() + print("=" * 60 + "\n") + + return shared_manager + + @classmethod + def has_credentials(cls) -> bool: + """Check if valid credentials exist.""" + shared_manager = SharedTokenManager.getInstance() + path = shared_manager.getCredentialFilePath() + return path.exists() + + @classmethod + def get_credentials_path(cls) -> Optional[Path]: + """Get path to credentials file if it exists.""" + shared_manager = SharedTokenManager.getInstance() + path = shared_manager.getCredentialFilePath() + if path.exists(): + return path + return None + + +async def main(): + """CLI entry point for QwenCode authentication.""" + import argparse + + parser = argparse.ArgumentParser( + description="QwenCode OAuth Authentication for gpt4free", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + %(prog)s login # Interactive device code login + %(prog)s status # Check authentication status + %(prog)s logout # Remove saved credentials +""" + ) + + subparsers = parser.add_subparsers(dest="command", help="Commands") + + # Login command + subparsers.add_parser("login", help="Authenticate with Qwen") + + # Status command + subparsers.add_parser("status", help="Check authentication status") + + # Logout command + subparsers.add_parser("logout", help="Remove saved credentials") + + args = parser.parse_args() + + if args.command == "login": + try: + await QwenCode.login() + except KeyboardInterrupt: + print("\n\nLogin cancelled.") + sys.exit(1) + except Exception as e: + print(f"\n❌ Login failed: {e}") + sys.exit(1) + + elif args.command == "status": + print("\nQwenCode Authentication Status") + print("=" * 40) + + if QwenCode.has_credentials(): + creds_path = QwenCode.get_credentials_path() + print(f"✓ Credentials found at: {creds_path}") + + try: + with creds_path.open() as f: + creds = json.load(f) + + expiry = creds.get("expiry_date") + if expiry: + expiry_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(expiry / 1000)) + if expiry / 1000 > time.time(): + print(f" Token expires: {expiry_time}") + else: + print(f" Token expired: {expiry_time} (will auto-refresh)") + + if creds.get("resource_url"): + print(f" Endpoint: {creds['resource_url']}") + except Exception as e: + print(f" (Could not read credential details: {e})") + else: + print("✗ No credentials found") + print(f"\nRun 'g4f-qwencode login' to authenticate.") + + print() + + elif args.command == "logout": + print("\nQwenCode Logout") + print("=" * 40) + + removed = False + + shared_manager = SharedTokenManager.getInstance() + path = shared_manager.getCredentialFilePath() + + if path.exists(): + path.unlink() + print(f"✓ Removed: {path}") + removed = True + + # Also try the default location + default_path = Path.home() / ".qwen" / "oauth_creds.json" + if default_path.exists() and default_path != path: + default_path.unlink() + print(f"✓ Removed: {default_path}") + removed = True + + if removed: + print("\n✓ Credentials removed successfully.") + else: + print("No credentials found to remove.") + + print() + + else: + parser.print_help() + + +def cli_main(): + """Synchronous CLI entry point for setup.py console_scripts.""" + asyncio.run(main()) + + +if __name__ == "__main__": + cli_main() \ No newline at end of file diff --git a/setup.py b/setup.py index bc03deaa..5f9b1ff7 100644 --- a/setup.py +++ b/setup.py @@ -120,6 +120,8 @@ setup( 'g4f=g4f.cli:main', 'g4f-mcp=g4f.mcp.server:main', 'g4f-antigravity=g4f.Provider.needs_auth.Antigravity:cli_main', + 'g4f-geminicli=g4f.Provider.needs_auth.GeminiCLI:cli_main', + 'g4f-qwencode=g4f.Provider.qwen.QwenCode:cli_main', ], }, url='https://github.com/xtekky/gpt4free', # Link to your GitHub repository