diff --git a/g4f/Provider/needs_auth/GeminiCLI.py b/g4f/Provider/needs_auth/GeminiCLI.py
index 46fbf254..4496cbd4 100644
--- a/g4f/Provider/needs_auth/GeminiCLI.py
+++ b/g4f/Provider/needs_auth/GeminiCLI.py
@@ -1,9 +1,17 @@
import os
+import sys
import json
import base64
import time
+import secrets
+import hashlib
+import asyncio
+import webbrowser
+import threading
from pathlib import Path
-from typing import Any, AsyncGenerator, Dict, List, Optional, Union
+from typing import Any, AsyncGenerator, Dict, List, Optional, Union, Tuple
+from urllib.parse import urlencode, parse_qs, urlparse
+from http.server import HTTPServer, BaseHTTPRequestHandler
import aiohttp
from aiohttp import ClientSession, ClientTimeout
@@ -17,9 +25,213 @@ from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin, AuthFile
from ..helper import get_connector, get_system_prompt, format_media_prompt
from ... import debug
+
def get_oauth_creds_path():
return Path.home() / ".gemini" / "oauth_creds.json"
+
+# OAuth configuration for GeminiCLI
+GEMINICLI_REDIRECT_URI = "http://localhost:51122/oauthcallback"
+GEMINICLI_SCOPES = [
+ "https://www.googleapis.com/auth/cloud-platform",
+ "https://www.googleapis.com/auth/userinfo.email",
+ "https://www.googleapis.com/auth/userinfo.profile",
+]
+GEMINICLI_OAUTH_CALLBACK_PORT = 51122
+GEMINICLI_OAUTH_CALLBACK_PATH = "/oauthcallback"
+
+
+def generate_pkce_pair() -> Tuple[str, str]:
+ """Generate a PKCE verifier and challenge pair."""
+ verifier = secrets.token_urlsafe(32)
+ digest = hashlib.sha256(verifier.encode('ascii')).digest()
+ challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode('ascii')
+ return verifier, challenge
+
+
+def encode_oauth_state(verifier: str) -> str:
+ """Encode OAuth state parameter with PKCE verifier."""
+ payload = {"verifier": verifier}
+ return base64.urlsafe_b64encode(json.dumps(payload).encode()).decode().rstrip('=')
+
+
+def decode_oauth_state(state: str) -> Dict[str, str]:
+ """Decode OAuth state parameter back to verifier."""
+ padded = state + '=' * (4 - len(state) % 4) if len(state) % 4 else state
+ normalized = padded.replace('-', '+').replace('_', '/')
+ try:
+ decoded = base64.b64decode(normalized).decode('utf-8')
+ parsed = json.loads(decoded)
+ return {"verifier": parsed.get("verifier", "")}
+ except Exception:
+ return {"verifier": ""}
+
+
+class GeminiCLIOAuthCallbackHandler(BaseHTTPRequestHandler):
+ """HTTP request handler for OAuth callback."""
+
+ callback_result: Optional[Dict[str, str]] = None
+ callback_error: Optional[str] = None
+
+ def log_message(self, format, *args):
+ """Suppress default logging."""
+ pass
+
+ def do_GET(self):
+ """Handle GET request for OAuth callback."""
+ parsed = urlparse(self.path)
+
+ if parsed.path != GEMINICLI_OAUTH_CALLBACK_PATH:
+ self.send_error(404, "Not Found")
+ return
+
+ params = parse_qs(parsed.query)
+ code = params.get("code", [None])[0]
+ state = params.get("state", [None])[0]
+ error = params.get("error", [None])[0]
+
+ if error:
+ GeminiCLIOAuthCallbackHandler.callback_error = error
+ self._send_error_response(error)
+ elif code and state:
+ GeminiCLIOAuthCallbackHandler.callback_result = {"code": code, "state": state}
+ self._send_success_response()
+ else:
+ GeminiCLIOAuthCallbackHandler.callback_error = "Missing code or state parameter"
+ self._send_error_response("Missing parameters")
+
+ def _send_success_response(self):
+ """Send success HTML response."""
+ html = """
+
+
+
+ Authentication Successful
+
+
+
+
+
✅
+
Authentication Successful!
+
You have successfully authenticated with Google GeminiCLI.
You can close this window and return to your terminal.
+
+
+"""
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html; charset=utf-8")
+ self.send_header("Content-Length", len(html.encode()))
+ self.end_headers()
+ self.wfile.write(html.encode())
+
+ def _send_error_response(self, error: str):
+ """Send error HTML response."""
+ html = f"""
+
+
+
+ Authentication Failed
+
+
+
+
+
❌ Authentication Failed
+
Error: {error}
+
Please try again.
+
+
+"""
+ self.send_response(400)
+ self.send_header("Content-Type", "text/html; charset=utf-8")
+ self.send_header("Content-Length", len(html.encode()))
+ self.end_headers()
+ self.wfile.write(html.encode())
+
+
+class GeminiCLIOAuthCallbackServer:
+ """Local HTTP server to capture OAuth callback."""
+
+ def __init__(self, port: int = GEMINICLI_OAUTH_CALLBACK_PORT, timeout: float = 300.0):
+ self.port = port
+ self.timeout = timeout
+ self.server: Optional[HTTPServer] = None
+ self._thread: Optional[threading.Thread] = None
+ self._stop_flag = False
+
+ def start(self) -> bool:
+ """Start the callback server. Returns True if successful."""
+ try:
+ GeminiCLIOAuthCallbackHandler.callback_result = None
+ GeminiCLIOAuthCallbackHandler.callback_error = None
+ self._stop_flag = False
+
+ self.server = HTTPServer(("localhost", self.port), GeminiCLIOAuthCallbackHandler)
+ self.server.timeout = 0.5
+
+ self._thread = threading.Thread(target=self._serve, daemon=True)
+ self._thread.start()
+ return True
+ except OSError as e:
+ debug.log(f"Failed to start OAuth callback server: {e}")
+ return False
+
+ def _serve(self):
+ """Serve requests until shutdown or result received."""
+ start_time = time.time()
+ while not self._stop_flag and self.server:
+ if time.time() - start_time > self.timeout:
+ break
+ if GeminiCLIOAuthCallbackHandler.callback_result or GeminiCLIOAuthCallbackHandler.callback_error:
+ time.sleep(0.3)
+ break
+ try:
+ self.server.handle_request()
+ except Exception:
+ break
+
+ def wait_for_callback(self) -> Optional[Dict[str, str]]:
+ """Wait for OAuth callback and return result."""
+ start_time = time.time()
+ while time.time() - start_time < self.timeout:
+ if GeminiCLIOAuthCallbackHandler.callback_result or GeminiCLIOAuthCallbackHandler.callback_error:
+ break
+ time.sleep(0.1)
+
+ self._stop_flag = True
+
+ if self._thread:
+ self._thread.join(timeout=2.0)
+
+ if GeminiCLIOAuthCallbackHandler.callback_error:
+ raise RuntimeError(f"OAuth error: {GeminiCLIOAuthCallbackHandler.callback_error}")
+
+ return GeminiCLIOAuthCallbackHandler.callback_result
+
+ def stop(self):
+ """Stop the callback server."""
+ self._stop_flag = True
+ if self.server:
+ try:
+ self.server.server_close()
+ except Exception:
+ pass
+ self.server = None
+
class AuthManager(AuthFileMixin):
"""
Handles OAuth2 authentication and Google Code Assist API communication.
@@ -567,4 +779,363 @@ class GeminiCLI(AsyncGeneratorProvider, ProviderModelMixin):
tools=tools,
**kwargs
):
- yield chunk
\ No newline at end of file
+ yield chunk
+
+ @classmethod
+ def build_authorization_url(cls) -> Tuple[str, str, str]:
+ """Build OAuth authorization URL with PKCE."""
+ verifier, challenge = generate_pkce_pair()
+ state = encode_oauth_state(verifier)
+
+ params = {
+ "client_id": AuthManager.OAUTH_CLIENT_ID,
+ "response_type": "code",
+ "redirect_uri": GEMINICLI_REDIRECT_URI,
+ "scope": " ".join(GEMINICLI_SCOPES),
+ "code_challenge": challenge,
+ "code_challenge_method": "S256",
+ "state": state,
+ "access_type": "offline",
+ "prompt": "consent",
+ }
+
+ url = f"https://accounts.google.com/o/oauth2/v2/auth?{urlencode(params)}"
+ return url, verifier, state
+
+ @classmethod
+ async def exchange_code_for_tokens(cls, code: str, state: str) -> Dict[str, Any]:
+ """Exchange authorization code for access and refresh tokens."""
+ decoded_state = decode_oauth_state(state)
+ verifier = decoded_state.get("verifier", "")
+
+ if not verifier:
+ raise RuntimeError("Missing PKCE verifier in state parameter")
+
+ start_time = time.time()
+
+ async with aiohttp.ClientSession() as session:
+ token_data = {
+ "client_id": AuthManager.OAUTH_CLIENT_ID,
+ "client_secret": AuthManager.OAUTH_CLIENT_SECRET,
+ "code": code,
+ "grant_type": "authorization_code",
+ "redirect_uri": GEMINICLI_REDIRECT_URI,
+ "code_verifier": verifier,
+ }
+
+ async with session.post(
+ "https://oauth2.googleapis.com/token",
+ data=token_data,
+ headers={"Content-Type": "application/x-www-form-urlencoded"}
+ ) as resp:
+ if not resp.ok:
+ error_text = await resp.text()
+ raise RuntimeError(f"Token exchange failed: {error_text}")
+
+ token_response = await resp.json()
+
+ access_token = token_response.get("access_token")
+ refresh_token = token_response.get("refresh_token")
+ expires_in = token_response.get("expires_in", 3600)
+
+ if not access_token or not refresh_token:
+ raise RuntimeError("Missing tokens in response")
+
+ # Get user info
+ email = None
+ async with session.get(
+ "https://www.googleapis.com/oauth2/v1/userinfo?alt=json",
+ headers={"Authorization": f"Bearer {access_token}"}
+ ) as resp:
+ if resp.ok:
+ user_info = await resp.json()
+ email = user_info.get("email")
+
+ expires_at = int((start_time + expires_in) * 1000) # milliseconds
+
+ return {
+ "access_token": access_token,
+ "refresh_token": refresh_token,
+ "expiry_date": expires_at,
+ "email": email,
+ }
+
+ @classmethod
+ async def interactive_login(
+ cls,
+ no_browser: bool = False,
+ timeout: float = 300.0,
+ ) -> Dict[str, Any]:
+ """Perform interactive OAuth login flow."""
+ auth_url, verifier, state = cls.build_authorization_url()
+
+ print("\n" + "=" * 60)
+ print("GeminiCLI OAuth Login")
+ print("=" * 60)
+
+ callback_server = GeminiCLIOAuthCallbackServer(timeout=timeout)
+ server_started = callback_server.start()
+
+ if server_started and not no_browser:
+ print(f"\nOpening browser for authentication...")
+ print(f"If browser doesn't open, visit this URL:\n")
+ print(f"{auth_url}\n")
+
+ try:
+ webbrowser.open(auth_url)
+ except Exception as e:
+ print(f"Could not open browser automatically: {e}")
+ print("Please open the URL above manually.\n")
+ else:
+ if not server_started:
+ print(f"\nCould not start local callback server on port {GEMINICLI_OAUTH_CALLBACK_PORT}.")
+ print("You may need to close any application using that port.\n")
+
+ print(f"\nPlease open this URL in your browser:\n")
+ print(f"{auth_url}\n")
+
+ if server_started:
+ print("Waiting for authentication callback...")
+
+ try:
+ callback_result = callback_server.wait_for_callback()
+
+ if not callback_result:
+ raise RuntimeError("OAuth callback timed out")
+
+ code = callback_result.get("code")
+ callback_state = callback_result.get("state")
+
+ if not code:
+ raise RuntimeError("No authorization code received")
+
+ print("\n✓ Authorization code received. Exchanging for tokens...")
+
+ tokens = await cls.exchange_code_for_tokens(code, callback_state or state)
+
+ print(f"✓ Authentication successful!")
+ if tokens.get("email"):
+ print(f" Logged in as: {tokens['email']}")
+
+ return tokens
+
+ finally:
+ callback_server.stop()
+ else:
+ print("\nAfter completing authentication, you'll be redirected to a localhost URL.")
+ print("Copy and paste the full redirect URL or just the authorization code below:\n")
+
+ user_input = input("Paste redirect URL or code: ").strip()
+
+ if not user_input:
+ raise RuntimeError("No input provided")
+
+ if user_input.startswith("http"):
+ parsed = urlparse(user_input)
+ params = parse_qs(parsed.query)
+ code = params.get("code", [None])[0]
+ callback_state = params.get("state", [state])[0]
+ else:
+ code = user_input
+ callback_state = state
+
+ if not code:
+ raise RuntimeError("Could not extract authorization code")
+
+ print("\nExchanging code for tokens...")
+ tokens = await cls.exchange_code_for_tokens(code, callback_state)
+
+ print(f"✓ Authentication successful!")
+ if tokens.get("email"):
+ print(f" Logged in as: {tokens['email']}")
+
+ return tokens
+
+ @classmethod
+ async def login(
+ cls,
+ no_browser: bool = False,
+ credentials_path: Optional[Path] = None,
+ ) -> "AuthManager":
+ """
+ Perform interactive OAuth login and save credentials.
+
+ Args:
+ no_browser: If True, don't auto-open browser
+ credentials_path: Path to save credentials
+
+ Returns:
+ AuthManager with active credentials
+
+ Example:
+ >>> import asyncio
+ >>> from g4f.Provider.needs_auth import GeminiCLI
+ >>> asyncio.run(GeminiCLI.login())
+ """
+ tokens = await cls.interactive_login(no_browser=no_browser)
+
+ creds = {
+ "access_token": tokens["access_token"],
+ "refresh_token": tokens["refresh_token"],
+ "expiry_date": tokens["expiry_date"],
+ "email": tokens.get("email"),
+ "client_id": AuthManager.OAUTH_CLIENT_ID,
+ "client_secret": AuthManager.OAUTH_CLIENT_SECRET,
+ }
+
+ if credentials_path:
+ path = credentials_path
+ else:
+ path = AuthManager.get_cache_file()
+
+ path.parent.mkdir(parents=True, exist_ok=True)
+
+ with path.open("w") as f:
+ json.dump(creds, f, indent=2)
+
+ try:
+ path.chmod(0o600)
+ except Exception:
+ pass
+
+ print(f"\n✓ Credentials saved to: {path}")
+ print("=" * 60 + "\n")
+
+ auth_manager = AuthManager(env=os.environ)
+ auth_manager._access_token = tokens["access_token"]
+ auth_manager._expiry = tokens["expiry_date"] / 1000
+ cls.auth_manager = auth_manager
+
+ return auth_manager
+
+ @classmethod
+ def has_credentials(cls) -> bool:
+ """Check if valid credentials exist."""
+ cache_path = AuthManager.get_cache_file()
+ if cache_path.exists():
+ return True
+ default_path = get_oauth_creds_path()
+ return default_path.exists()
+
+ @classmethod
+ def get_credentials_path(cls) -> Optional[Path]:
+ """Get path to credentials file if it exists."""
+ cache_path = AuthManager.get_cache_file()
+ if cache_path.exists():
+ return cache_path
+ default_path = get_oauth_creds_path()
+ if default_path.exists():
+ return default_path
+ return None
+
+
+async def main():
+ """CLI entry point for GeminiCLI authentication."""
+ import argparse
+
+ parser = argparse.ArgumentParser(
+ description="GeminiCLI OAuth Authentication for gpt4free",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Examples:
+ %(prog)s login # Interactive login with browser
+ %(prog)s login --no-browser # Manual login (paste URL)
+ %(prog)s status # Check authentication status
+ %(prog)s logout # Remove saved credentials
+"""
+ )
+
+ subparsers = parser.add_subparsers(dest="command", help="Commands")
+
+ # Login command
+ login_parser = subparsers.add_parser("login", help="Authenticate with Google")
+ login_parser.add_argument(
+ "--no-browser", "-n",
+ action="store_true",
+ help="Don't auto-open browser, print URL instead"
+ )
+
+ # Status command
+ subparsers.add_parser("status", help="Check authentication status")
+
+ # Logout command
+ subparsers.add_parser("logout", help="Remove saved credentials")
+
+ args = parser.parse_args()
+
+ if args.command == "login":
+ try:
+ await GeminiCLI.login(no_browser=args.no_browser)
+ except KeyboardInterrupt:
+ print("\n\nLogin cancelled.")
+ sys.exit(1)
+ except Exception as e:
+ print(f"\n❌ Login failed: {e}")
+ sys.exit(1)
+
+ elif args.command == "status":
+ print("\nGeminiCLI Authentication Status")
+ print("=" * 40)
+
+ if GeminiCLI.has_credentials():
+ creds_path = GeminiCLI.get_credentials_path()
+ print(f"✓ Credentials found at: {creds_path}")
+
+ try:
+ with creds_path.open() as f:
+ creds = json.load(f)
+
+ if creds.get("email"):
+ print(f" Email: {creds['email']}")
+
+ expiry = creds.get("expiry_date")
+ if expiry:
+ expiry_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(expiry / 1000))
+ if expiry / 1000 > time.time():
+ print(f" Token expires: {expiry_time}")
+ else:
+ print(f" Token expired: {expiry_time} (will auto-refresh)")
+ except Exception as e:
+ print(f" (Could not read credential details: {e})")
+ else:
+ print("✗ No credentials found")
+ print(f"\nRun 'g4f-geminicli login' to authenticate.")
+
+ print()
+
+ elif args.command == "logout":
+ print("\nGeminiCLI Logout")
+ print("=" * 40)
+
+ removed = False
+
+ cache_path = AuthManager.get_cache_file()
+ if cache_path.exists():
+ cache_path.unlink()
+ print(f"✓ Removed: {cache_path}")
+ removed = True
+
+ default_path = get_oauth_creds_path()
+ if default_path.exists():
+ default_path.unlink()
+ print(f"✓ Removed: {default_path}")
+ removed = True
+
+ if removed:
+ print("\n✓ Credentials removed successfully.")
+ else:
+ print("No credentials found to remove.")
+
+ print()
+
+ else:
+ parser.print_help()
+
+
+def cli_main():
+ """Synchronous CLI entry point for setup.py console_scripts."""
+ asyncio.run(main())
+
+
+if __name__ == "__main__":
+ cli_main()
\ No newline at end of file
diff --git a/g4f/Provider/qwen/QwenCode.py b/g4f/Provider/qwen/QwenCode.py
index 939325df..f8369ffb 100644
--- a/g4f/Provider/qwen/QwenCode.py
+++ b/g4f/Provider/qwen/QwenCode.py
@@ -1,10 +1,18 @@
from __future__ import annotations
+import sys
+import json
+import time
+import asyncio
+from pathlib import Path
+from typing import Optional
+
from ...typing import Messages, AsyncResult
from ..template import OpenaiTemplate
from .qwenContentGenerator import QwenContentGenerator
from .qwenOAuth2 import QwenOAuth2Client
-from .sharedTokenManager import TokenManagerError
+from .sharedTokenManager import TokenManagerError, SharedTokenManager
+from .oauthFlow import launch_browser_for_oauth
class QwenCode(OpenaiTemplate):
label = "Qwen Code 🤖"
@@ -70,4 +78,156 @@ class QwenCode(OpenaiTemplate):
else:
yield chunk
except:
- raise
\ No newline at end of file
+ raise
+
+ @classmethod
+ async def login(cls, credentials_path: Optional[Path] = None) -> SharedTokenManager:
+ """
+ Perform interactive OAuth login and save credentials.
+
+ Args:
+ credentials_path: Path to save credentials (default: g4f cache)
+
+ Returns:
+ SharedTokenManager with active credentials
+
+ Example:
+ >>> import asyncio
+ >>> from g4f.Provider.qwen import QwenCode
+ >>> asyncio.run(QwenCode.login())
+ """
+ print("\n" + "=" * 60)
+ print("QwenCode OAuth Login")
+ print("=" * 60)
+
+ await launch_browser_for_oauth()
+
+ shared_manager = SharedTokenManager.getInstance()
+ print("=" * 60 + "\n")
+
+ return shared_manager
+
+ @classmethod
+ def has_credentials(cls) -> bool:
+ """Check if valid credentials exist."""
+ shared_manager = SharedTokenManager.getInstance()
+ path = shared_manager.getCredentialFilePath()
+ return path.exists()
+
+ @classmethod
+ def get_credentials_path(cls) -> Optional[Path]:
+ """Get path to credentials file if it exists."""
+ shared_manager = SharedTokenManager.getInstance()
+ path = shared_manager.getCredentialFilePath()
+ if path.exists():
+ return path
+ return None
+
+
+async def main():
+ """CLI entry point for QwenCode authentication."""
+ import argparse
+
+ parser = argparse.ArgumentParser(
+ description="QwenCode OAuth Authentication for gpt4free",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Examples:
+ %(prog)s login # Interactive device code login
+ %(prog)s status # Check authentication status
+ %(prog)s logout # Remove saved credentials
+"""
+ )
+
+ subparsers = parser.add_subparsers(dest="command", help="Commands")
+
+ # Login command
+ subparsers.add_parser("login", help="Authenticate with Qwen")
+
+ # Status command
+ subparsers.add_parser("status", help="Check authentication status")
+
+ # Logout command
+ subparsers.add_parser("logout", help="Remove saved credentials")
+
+ args = parser.parse_args()
+
+ if args.command == "login":
+ try:
+ await QwenCode.login()
+ except KeyboardInterrupt:
+ print("\n\nLogin cancelled.")
+ sys.exit(1)
+ except Exception as e:
+ print(f"\n❌ Login failed: {e}")
+ sys.exit(1)
+
+ elif args.command == "status":
+ print("\nQwenCode Authentication Status")
+ print("=" * 40)
+
+ if QwenCode.has_credentials():
+ creds_path = QwenCode.get_credentials_path()
+ print(f"✓ Credentials found at: {creds_path}")
+
+ try:
+ with creds_path.open() as f:
+ creds = json.load(f)
+
+ expiry = creds.get("expiry_date")
+ if expiry:
+ expiry_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(expiry / 1000))
+ if expiry / 1000 > time.time():
+ print(f" Token expires: {expiry_time}")
+ else:
+ print(f" Token expired: {expiry_time} (will auto-refresh)")
+
+ if creds.get("resource_url"):
+ print(f" Endpoint: {creds['resource_url']}")
+ except Exception as e:
+ print(f" (Could not read credential details: {e})")
+ else:
+ print("✗ No credentials found")
+ print(f"\nRun 'g4f-qwencode login' to authenticate.")
+
+ print()
+
+ elif args.command == "logout":
+ print("\nQwenCode Logout")
+ print("=" * 40)
+
+ removed = False
+
+ shared_manager = SharedTokenManager.getInstance()
+ path = shared_manager.getCredentialFilePath()
+
+ if path.exists():
+ path.unlink()
+ print(f"✓ Removed: {path}")
+ removed = True
+
+ # Also try the default location
+ default_path = Path.home() / ".qwen" / "oauth_creds.json"
+ if default_path.exists() and default_path != path:
+ default_path.unlink()
+ print(f"✓ Removed: {default_path}")
+ removed = True
+
+ if removed:
+ print("\n✓ Credentials removed successfully.")
+ else:
+ print("No credentials found to remove.")
+
+ print()
+
+ else:
+ parser.print_help()
+
+
+def cli_main():
+ """Synchronous CLI entry point for setup.py console_scripts."""
+ asyncio.run(main())
+
+
+if __name__ == "__main__":
+ cli_main()
\ No newline at end of file
diff --git a/setup.py b/setup.py
index bc03deaa..5f9b1ff7 100644
--- a/setup.py
+++ b/setup.py
@@ -120,6 +120,8 @@ setup(
'g4f=g4f.cli:main',
'g4f-mcp=g4f.mcp.server:main',
'g4f-antigravity=g4f.Provider.needs_auth.Antigravity:cli_main',
+ 'g4f-geminicli=g4f.Provider.needs_auth.GeminiCLI:cli_main',
+ 'g4f-qwencode=g4f.Provider.qwen.QwenCode:cli_main',
],
},
url='https://github.com/xtekky/gpt4free', # Link to your GitHub repository