Add -e option to CLI client

This commit is contained in:
hlohaus 2025-08-09 08:00:47 +02:00
parent 8e6ddfbf47
commit adb756ef45

View file

@ -7,6 +7,7 @@ import json
import argparse
import traceback
import requests
from pathlib import Path
from typing import Optional, List, Dict
from g4f.client import AsyncClient
@ -17,96 +18,99 @@ from g4f.image import extract_data_uri, is_accepted_format
from g4f.image.copy_images import get_media_dir
from g4f.client.helper import filter_markdown
from g4f.errors import MissingRequirementsError
try:
from g4f.integration.markitdown import MarkItDown
has_markitdown = True
except ImportError:
has_markitdown = False
from g4f.config import CONFIG_DIR, COOKIES_DIR
from g4f import debug
CONVERSATION_FILE = CONFIG_DIR / "conversation.json"
class ConversationManager:
"""Manages conversation history and state."""
def __init__(self, file_path: Optional[Path] = None, model: Optional[str] = None, provider: Optional[str] = None) -> None:
self.file_path: Optional[Path] = file_path
self.model: Optional[str] = model
self.provider: Optional[str] = provider
self.conversation = None
def __init__(
self,
file_path: Optional[Path] = None,
model: Optional[str] = None,
provider: Optional[str] = None,
max_messages: int = 5
) -> None:
self.file_path = file_path
self.model = model
self.provider = provider
self.max_messages = max_messages
self.conversation: Optional[JsonConversation] = None
self.history: List[Dict[str, str]] = []
self.data: Dict = {}
self._load()
def _load(self) -> None:
"""Load conversation from file."""
if self.file_path is None or not self.file_path.is_file():
if not self.file_path or not self.file_path.is_file():
return
try:
with open(self.file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self.model = data.get("model") if self.model is None and self.provider is None else self.model
self.provider = data.get("provider") if self.provider is None else self.provider
if not self.provider:
self.provider = None
self.data = data.get("data", {})
if self.provider and self.data.get(self.provider):
self.conversation = JsonConversation(**self.data.get(self.provider))
elif not self.provider and self.data:
self.conversation = JsonConversation(**self.data)
self.history = data.get("items", [])
except (json.JSONDecodeError, KeyError) as e:
print(f"Error loading conversation: {e}", file=sys.stderr)
if self.model is None:
self.model = data.get("model")
if self.provider is None:
self.provider = data.get("provider")
self.data = data.get("data", {})
if self.provider and self.data.get(self.provider):
self.conversation = JsonConversation(**self.data[self.provider])
elif not self.provider and self.data:
self.conversation = JsonConversation(**self.data)
self.history = data.get("items", [])
except Exception as e:
print(f"Unexpected error loading conversation: {e}", file=sys.stderr)
print(f"Error loading conversation: {e}", file=sys.stderr)
def save(self) -> None:
"""Save conversation to file."""
if self.file_path is None:
if not self.file_path:
return
try:
if self.conversation and self.provider:
self.data[self.provider] = self.conversation.get_dict()
elif self.conversation:
self.data.update(self.conversation.get_dict())
payload = {
"model": self.model,
"provider": self.provider,
"data": self.data,
"items": self.history
}
with open(self.file_path, 'w', encoding='utf-8') as f:
if self.conversation and self.provider:
self.data[self.provider] = self.conversation.get_dict()
else:
self.data = {**self.data, **(self.conversation.get_dict() if self.conversation else {})}
json.dump({
"model": self.model,
"provider": self.provider,
"data": self.data,
"items": self.history
}, f, indent=2, ensure_ascii=False)
json.dump(payload, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f"Error saving conversation: {e}", file=sys.stderr)
def add_message(self, role: str, content: str) -> None:
"""Add a message to the conversation."""
self.history.append({"role": role, "content": content})
def get_messages(self) -> List[Dict[str, str]]:
"""Get all messages in the conversation."""
return self.history
result = []
for item in self.history[-self.max_messages:]:
if item.get("role") in ["user", "system"] or result:
result.append(item)
return result
async def stream_response(
client: AsyncClient,
input_text: str,
input_text,
conversation: ConversationManager,
output_file: Optional[Path] = None,
instructions: Optional[str] = None
) -> None:
"""Stream the response from the API and update conversation."""
media = None
if isinstance(input_text, tuple):
media, input_text = input_text
if instructions:
# Add system instructions to conversation if provided
conversation.add_message("system", instructions)
# Add user message to conversation
conversation.add_message("user", input_text)
create_args = {
@ -117,214 +121,208 @@ async def stream_response(
"conversation": conversation.conversation,
}
response_content = []
response_tokens = []
last_chunk = None
async for chunk in client.chat.completions.create(**create_args):
last_chunk = chunk
token = chunk.choices[0].delta.content
if not token:
delta = chunk.choices[0].delta.content
if not delta:
continue
if is_content(token):
response_content.append(token)
try:
print(token, end="", flush=True)
except (IOError, BrokenPipeError) as e:
print(f"\nError writing to stdout: {e}", file=sys.stderr)
break
print("\n", end="")
if is_content(delta):
response_tokens.append(delta)
print(delta, end="", flush=True)
print()
conversation.conversation = getattr(last_chunk, "conversation", conversation.conversation)
media_content = next(iter([chunk for chunk in response_content if isinstance(chunk, MediaResponse)]), None)
response_content = response_content[0] if len(response_content) == 1 else "".join([str(chunk) for chunk in response_content])
if output_file:
if save_content(response_content, media_content, output_file):
print(f"\nResponse saved to {output_file}")
if last_chunk and hasattr(last_chunk, "conversation"):
conversation.conversation = last_chunk.conversation
if response_content:
# Add assistant message to conversation
conversation.add_message("assistant", str(response_content))
media_chunk = next((t for t in response_tokens if isinstance(t, MediaResponse)), None)
text_response = ""
if media_chunk:
text_response = response_tokens[0] if len(response_tokens) == 1 else "".join(str(t) for t in response_tokens)
else:
raise RuntimeError("No response received from the API")
text_response = "".join(str(t) for t in response_tokens)
def save_content(content, media_content: Optional[MediaResponse], filepath: str, allowed_types = None):
if media_content is not None:
for url in media_content.urls:
if url.startswith("http://") or url.startswith("https://"):
if output_file:
if save_content(text_response, media_chunk, str(output_file)):
print(f"\n→ Response saved to '{output_file}'")
if text_response:
conversation.add_message("assistant", text_response)
else:
raise RuntimeError("No response received")
def save_content(content, media: Optional[MediaResponse], filepath: str, allowed_types=None) -> bool:
if media:
for url in media.urls:
if url.startswith(("http://", "https://")):
try:
response = requests.get(url, cookies=media_content.get("cookies"), headers=media_content.get("headers"))
if response.status_code == 200:
resp = requests.get(url, cookies=media.get("cookies"), headers=media.get("headers"))
if resp.status_code == 200:
with open(filepath, "wb") as f:
f.write(response.content)
f.write(resp.content)
return True
except requests.RequestException as e:
print(f"Error downloading {url}: {e}", file=sys.stderr)
except Exception as e:
print(f"Error fetching media '{url}': {e}", file=sys.stderr)
return False
else:
content = url
break
elif hasattr(content, "data"):
if hasattr(content, "data"):
content = content.data
if not content:
print("\nNo content to save.", file=sys.stderr)
return False
if content.startswith("/media/"):
os.rename(content.replace("/media", get_media_dir()).split("?")[0], filepath)
return True
elif content.startswith("data:"):
if content.startswith("data:"):
with open(filepath, "wb") as f:
f.write(extract_data_uri(content))
return True
content = filter_markdown(content, allowed_types)
if content:
with open(filepath, "w") as f:
f.write(content)
return True
else:
print("\nNo valid content to save.", file=sys.stderr)
return False
if content.startswith("/media/"):
src = content.replace("/media", get_media_dir()).split("?")[0]
os.rename(src, filepath)
return True
filtered = filter_markdown(content, allowed_types)
if filtered:
with open(filepath, "w", encoding="utf-8") as f:
f.write(filtered)
return True
print("\nUnable to save content.", file=sys.stderr)
return False
def get_parser():
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
description="G4F CLI client with conversation history",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("--debug", "-d", action="store_true", help="Enable verbose logging.")
parser.add_argument('-d', '--debug', action='store_true', help="Verbose debug")
parser.add_argument('-p', '--provider', default=None,
help=f"Provider to use: {', '.join(k for k,v in ProviderUtils.convert.items() if v.working)}")
parser.add_argument('-m', '--model', help="Model name")
parser.add_argument('-O', '--output', type=Path,
help="Save assistant output to FILE (text or media)")
parser.add_argument('-i', '--instructions', help="System instructions")
parser.add_argument('-c', '--cookies-dir', type=Path, default=COOKIES_DIR,
help="Cookies/HAR directory")
parser.add_argument('--conversation-file', type=Path, default=CONVERSATION_FILE,
help="Conversation JSON")
parser.add_argument('-C', '--clear-history', action='store_true', help="Wipe history")
parser.add_argument('-N', '--no-config', action='store_true', help="Skip loading history")
# <-- updated -e/--edit to take an optional filename
parser.add_argument(
'-p', '--provider',
default=None,
help=f"Provider to use. Available: {', '.join([key for key, provider in ProviderUtils.convert.items() if provider.working])}."
)
parser.add_argument(
'-m', '--model',
help="Model to use (provider-specific)"
)
parser.add_argument(
'-O', '--output',
default=None,
'-e', '--edit',
type=Path,
metavar='FILE',
help="Output file to save the response file."
help="If FILE given: send its contents and overwrite it with AI's reply."
)
parser.add_argument(
'-i', '--instructions',
default=None,
help="Add custom system instructions."
)
parser.add_argument(
'-c', '--cookies-dir',
type=Path,
default=COOKIES_DIR,
help="Directory containing cookies for authenticated providers"
)
parser.add_argument(
'--conversation-file',
type=Path,
metavar='FILE',
default=CONVERSATION_FILE,
help="File to store/load conversation state"
)
parser.add_argument(
'-C', '--clear-history',
action='store_true',
help="Clear conversation history before starting"
)
parser.add_argument(
'-N', '--no-config',
action='store_true',
help="Do not load configuration from conversation file"
)
parser.add_argument(
'input',
nargs='*',
help="Input urls, files and text (or read from stdin)"
)
parser.add_argument('--max-messages', type=int, default=5,
help="Max user+assistant turns in context")
parser.add_argument('input', nargs='*',
help="URLs, image paths or plain text")
return parser
async def run_args(input_text: str, args):
async def run_args(input_val, args):
try:
# Ensure directories exist
# ensure dirs
if args.output:
args.output.parent.mkdir(parents=True, exist_ok=True)
args.conversation_file.parent.mkdir(parents=True, exist_ok=True)
if args.conversation_file:
args.conversation_file.parent.mkdir(parents=True, exist_ok=True)
args.cookies_dir.mkdir(parents=True, exist_ok=True)
if args.debug:
debug.logging = True
# Initialize conversation manager
conversation = ConversationManager(None if args.no_config else args.conversation_file, args.model, args.provider)
conv = ConversationManager(
None if args.no_config else args.conversation_file,
model=args.model,
provider=args.provider,
max_messages=args.max_messages
)
if args.clear_history:
conversation.history = []
conversation.conversation = None
conv.history = []
conv.conversation = None
# Set cookies directory if specified
set_cookies_dir(str(args.cookies_dir))
read_cookie_files()
# Initialize client with selected provider
client = AsyncClient(provider=conversation.provider)
client = AsyncClient(provider=conv.provider)
# Stream response and update conversation
await stream_response(client, input_text, conversation, args.output, args.instructions)
if isinstance(args.edit, Path):
file_to_edit = args.edit
if not file_to_edit.exists():
print(f"ERROR: file not found: {file_to_edit}", file=sys.stderr)
sys.exit(1)
text = file_to_edit.read_text(encoding="utf-8")
# we will both send and overwrite this file
input_val = f"```file: {file_to_edit}\n{text}\n```\n" + (input_val[1] if isinstance(input_val, tuple) else input_val)
output_target = file_to_edit
else:
# normal, non-edit mode
output_target = args.output
# Save conversation state
conversation.save()
except:
await stream_response(client, input_val, conv, output_target, args.instructions)
conv.save()
except Exception:
print(traceback.format_exc(), file=sys.stderr)
sys.exit(1)
def run_client_args(args):
input_text = ""
input_txt = ""
media = []
rest = 0
for idx, input_value in enumerate(args.input):
if input_value.startswith("http://") or input_value.startswith("https://"):
response = requests.head(input_value)
if not response.ok:
print(f"Error accessing URL {input_value}: {response.status_code}", file=sys.stderr)
break
if response.headers.get('Content-Type', '').startswith('image/'):
media.append(input_value)
for idx, tok in enumerate(args.input):
if tok.startswith(("http://","https://")):
# same URL logic...
resp = requests.head(tok, allow_redirects=True)
if resp.ok and resp.headers.get("Content-Type","").startswith("image"):
media.append(tok)
else:
try:
if not has_markitdown:
raise MissingRequirementsError("MarkItDown is not installed. Install it with `pip install -U markitdown`.")
md = MarkItDown()
text_content = md.convert_url(input_value).text_content
input_text += f"\n```\n{text_content}\n\nSource: {input_value}\n```\n"
except Exception as e:
print(f"Error processing URL {input_value}: {type(e).__name__}: {e}", file=sys.stderr)
break
elif os.path.isfile(input_value):
if not has_markitdown:
raise MissingRequirementsError("Install markitdown")
md = MarkItDown()
txt = md.convert_url(tok).text_content
input_txt += f"\n```source: {tok}\n{txt}\n```\n"
elif os.path.isfile(tok):
head = Path(tok).read_bytes()[:12]
try:
with open(input_value, 'rb') as f:
if is_accepted_format(f.read(12)):
media.append(Path(input_value))
if is_accepted_format(head):
media.append(Path(tok))
is_img = True
else:
is_img = False
except ValueError:
# If not a valid image, read as text
try:
with open(input_value, 'r', encoding='utf-8') as f:
file_content = f.read().strip()
except UnicodeDecodeError:
print(f"Error reading file {input_value} as text. Ensure it is a valid text file.", file=sys.stderr)
break
input_text += f"\n```{input_value}\n{file_content}\n```\n"
is_img = False
if not is_img:
txt = Path(tok).read_text(encoding="utf-8")
input_txt += f"\n```file: {tok}\n{txt}\n```\n"
else:
rest = idx
break
rest = idx + 1
input_text = (" ".join(args.input[rest:])).strip() + input_text
tail = args.input[rest:]
if tail:
input_txt = " ".join(tail) + "\n" + input_txt
if not sys.stdin.isatty() and not input_txt:
input_txt = sys.stdin.read()
if media:
input_text = (media, input_text)
if not sys.stdin.isatty() and not input_text:
input_text = sys.stdin.read().strip()
if not input_text:
print("No input provided. Use -h for help.", file=sys.stderr)
val = (media, input_txt)
else:
val = input_txt.strip()
if not val:
print("No input provided. Use -h.", file=sys.stderr)
sys.exit(1)
# Run the client with provided arguments
asyncio.run(run_args(input_text, args))
asyncio.run(run_args(val, args))
if __name__ == "__main__":
# Run the client with command line arguments
run_client_args(get_parser().parse_args())