"""
ZEN Dual-Engine AI — GPT-5 (OpenAI) + Nano-Banana (Gemini)
Gradio 5.49.1 Space with in-UI API keys, chat history, optional image (Gemini),
telemetry, starter prompts, and robust OpenAI param fallbacks.
Key robustness:
- Auto-retry OpenAI call with `max_completion_tokens` if model rejects `max_tokens`.
- Auto-retry OpenAI call without `temperature` if model only allows the default.
- Clean Gradio 5.49.1 queue usage (no deprecated args).
"""
import os
import time
import base64
from io import BytesIO
from typing import Any, Dict, List
import gradio as gr
# -----------------------------
# Constants & Defaults
# -----------------------------
APP_TITLE = "🔮 ZEN Dual-Engine AI — GPT-5 + Nano-Banana"
DEFAULT_OPENAI_MODEL = "gpt-5" # Adjust if your account uses a different label
DEFAULT_GEMINI_MODEL = "gemini-2.5-nano-banana"  # nickname label; if the API rejects it, "Nano-Banana" is usually exposed as gemini-2.5-flash-image-preview
SYSTEM_DEFAULT = (
"You are ZEN Assistant. Respond concisely, accurately, and helpfully. "
"If an image is provided, analyze it clearly. Avoid unsafe advice."
)
STARTER_PROMPTS: List[str] = [
"💡 Brainstorm 7 AI-powered product ideas that tackle youth education gaps.",
"📚 Draft a 4-week AI literacy micro-curriculum with hands-on labs.",
"🧪 Design an experiment to compare GPT-5 vs Nano-Banana on code generation.",
"🎨 Describe a museum exhibit that visualizes the history of AI in America.",
"🛠️ Generate a Python function that converts a PDF to clean Markdown.",
"🪐 Write a sci-fi scene about a student building an AI on the Moon.",
"🔍 Summarize the pros/cons of agentic workflows for startups.",
"📈 Propose a metrics dashboard for measuring AI program impact.",
]
# Very light guardrail against trivial injection/script pastes
BLOCKLIST = ["<script", "</script>", "{{", "}}"]
# -----------------------------
# Lazy Imports (boot even if SDKs are missing)
# -----------------------------
def _lazy_import_openai():
try:
from openai import OpenAI # openai>=1.0 interface
return OpenAI
except Exception as e:
raise RuntimeError(f"OpenAI SDK not available: {e}")
def _lazy_import_gemini():
try:
import google.generativeai as genai
return genai
except Exception as e:
raise RuntimeError(f"Google Generative AI SDK not available: {e}")
# -----------------------------
# Utilities
# -----------------------------
def is_blocked(text: str) -> bool:
if not text:
return False
low = text.lower()
return any(tok in low for tok in BLOCKLIST)
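# e.g., is_blocked("<script>alert(1)</script>") -> True; is_blocked("hello") -> False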
def pil_to_base64(image) -> str:
"""Convert PIL image to base64 JPEG (if you ever need raw bytes)."""
buffer = BytesIO()
image.convert("RGB").save(buffer, format="JPEG", quality=92)
return base64.b64encode(buffer.getvalue()).decode("utf-8")
def approx_tokens_from_chars(text: str) -> int:
    return int(len(text or "") / 4)  # rough heuristic: ~4 characters per token
def estimate_cost(provider_label: str, model: str, prompt: str, reply: str) -> float:
"""
    Very rough, illustrative per-million-token rates. Adjust to your billing reality.
"""
toks = approx_tokens_from_chars(prompt) + approx_tokens_from_chars(reply)
if provider_label.startswith("OpenAI"):
return round(toks / 1_000_000.0 * 7.5, 4) # illustrative
return round(toks / 1_000_000.0 * 5.0, 4) # illustrative
# -----------------------------
# Providers
# -----------------------------
def call_openai_chat(
api_key: str,
model: str,
system_prompt: str,
history_messages: List[Dict[str, str]],
user_message: str,
temperature: float,
max_tokens: int,
) -> str:
"""
Calls OpenAI Chat Completions with adaptive parameter handling:
- If model rejects `max_tokens`, retry with `max_completion_tokens`.
- If model rejects non-default `temperature`, retry omitting temperature (server default).
"""
OpenAI = _lazy_import_openai()
client = OpenAI(api_key=api_key)
messages = [{"role": "system", "content": (system_prompt.strip() or SYSTEM_DEFAULT)}]
messages.extend(history_messages or [])
messages.append({"role": "user", "content": user_message})
base_kwargs = dict(
model=(model.strip() or DEFAULT_OPENAI_MODEL),
messages=messages,
)
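    # Retry ladder (the substring checks below parse API error text, which can
    # change between SDK versions; treat them as heuristics):
    #   Try 1 : max_tokens + temperature (legacy params)
    #   Path A: max_completion_tokens + temperature
    #   Path B: max_tokens, temperature omitted
    #   Path C: max_completion_tokens, temperature omitted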
# Try #1: legacy `max_tokens` + provided temperature
try:
kwargs_try = dict(base_kwargs)
kwargs_try["temperature"] = float(temperature)
kwargs_try["max_tokens"] = int(max_tokens)
resp = client.chat.completions.create(**kwargs_try)
return resp.choices[0].message.content
except Exception as e1:
msg1 = str(e1)
# If model wants `max_completion_tokens`
needs_mct = ("max_tokens" in msg1 and "max_completion_tokens" in msg1) or "Unsupported parameter" in msg1
# If model wants default temperature only
temp_default_only = ("temperature" in msg1) and ("unsupported_value" in msg1 or "Only the default" in msg1)
# Path A: fix tokens first, keep temperature
if needs_mct and not temp_default_only:
try:
kwargs_try = dict(base_kwargs)
kwargs_try["temperature"] = float(temperature)
kwargs_try["max_completion_tokens"] = int(max_tokens)
resp = client.chat.completions.create(**kwargs_try)
return resp.choices[0].message.content
except Exception as e2:
msg2 = str(e2)
# If that new attempt also complains about temperature, handle below
temp_default_only = ("temperature" in msg2) and ("unsupported_value" in msg2 or "Only the default" in msg2)
# Path B: fix temperature only (omit it), keep legacy tokens
if temp_default_only and not needs_mct:
try:
kwargs_try = dict(base_kwargs)
kwargs_try["max_tokens"] = int(max_tokens)
resp = client.chat.completions.create(**kwargs_try)
return resp.choices[0].message.content
except Exception as e3:
msg3 = str(e3)
# If now it also wants max_completion_tokens, do both
needs_mct = ("max_tokens" in msg3 and "max_completion_tokens" in msg3) or "Unsupported parameter" in msg3
# Path C: needs both fixes (no temperature + max_completion_tokens)
if needs_mct and temp_default_only:
kwargs_try = dict(base_kwargs)
kwargs_try["max_completion_tokens"] = int(max_tokens)
resp = client.chat.completions.create(**kwargs_try) # omit temperature
return resp.choices[0].message.content
# If none matched, re-raise original error
raise
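# Example call (hypothetical key, for illustration only):
#   reply = call_openai_chat(api_key="sk-...", model=DEFAULT_OPENAI_MODEL,
#                            system_prompt="", history_messages=[],
#                            user_message="Say hi", temperature=0.5, max_tokens=256)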
def call_gemini_generate(
api_key: str,
model: str,
system_prompt: str,
user_message: str,
image=None,
temperature: float = 0.4,
) -> str:
"""
Calls Gemini (including Nano-Banana variants). Supports optional PIL image.
"""
genai = _lazy_import_gemini()
genai.configure(api_key=api_key)
# relaxed demo thresholds; adjust per policy as needed
safety_settings = [
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUAL_CONTENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
]
model_obj = genai.GenerativeModel(
model_name=(model.strip() or DEFAULT_GEMINI_MODEL),
system_instruction=(system_prompt.strip() or SYSTEM_DEFAULT),
safety_settings=safety_settings,
generation_config={"temperature": float(temperature)},
)
parts: List[Any] = [user_message or ""]
if image is not None:
parts.append(image) # PIL image supported directly
resp = model_obj.generate_content(parts)
if hasattr(resp, "text") and resp.text:
return resp.text
cand = getattr(resp, "candidates", None)
if cand and getattr(cand[0], "content", None):
parts = getattr(cand[0].content, "parts", None)
if parts and hasattr(parts[0], "text"):
return parts[0].text
return "(No response text returned.)"
# -----------------------------
# Orchestration
# -----------------------------
def to_openai_history(gradio_history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Normalize Gradio Chatbot history (type="messages": [{"role", "content"}, ...])
    into plain OpenAI role dicts, dropping empty or non-text entries.
    """
    oai: List[Dict[str, str]] = []
    for msg in gradio_history or []:
        role = msg.get("role")
        content = msg.get("content")
        if role in ("user", "assistant") and isinstance(content, str) and content:
            oai.append({"role": role, "content": content})
    return oai
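# Messages-format history example (what gr.Chatbot(type="messages") stores):
#   [{"role": "user", "content": "Hi"}, {"role": "assistant", "content": "Hello!"}]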
def infer(
provider_label: str,
openai_key: str,
google_key: str,
model_name: str,
system_prompt: str,
user_message: str,
image,
temperature: float,
max_tokens: int,
    history: List[Dict[str, str]],
):
"""
Main inference entry: routes to OpenAI or Gemini, measures latency, estimates cost,
and appends the turn to the chat history.
"""
if not (user_message and user_message.strip()):
raise gr.Error("Please enter a prompt (or pick a starter prompt).")
    if is_blocked(user_message):
        assistant = "Request blocked by safety policy. Please rephrase."
        history = history or []
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": assistant})
        return history, 0, 0.0
t0 = time.time()
history = history or []
if provider_label.startswith("OpenAI"):
api_key = (openai_key or "").strip()
if not api_key:
raise gr.Error("Enter your OpenAI API key in the Settings accordion.")
oai_history = to_openai_history(history)
reply = call_openai_chat(
api_key=api_key,
model=model_name or DEFAULT_OPENAI_MODEL,
system_prompt=system_prompt or SYSTEM_DEFAULT,
history_messages=oai_history,
user_message=user_message,
temperature=temperature,
max_tokens=max_tokens,
)
else:
api_key = (google_key or "").strip()
if not api_key:
raise gr.Error("Enter your Google Gemini API key in the Settings accordion.")
reply = call_gemini_generate(
api_key=api_key,
model=model_name or DEFAULT_GEMINI_MODEL,
system_prompt=system_prompt or SYSTEM_DEFAULT,
user_message=user_message,
image=image,
temperature=temperature,
)
latency_ms = int((time.time() - t0) * 1000)
cost_est = estimate_cost(provider_label, model_name, user_message, reply)
    history.append({"role": "user", "content": user_message})
    history.append({"role": "assistant", "content": reply})
return history, latency_ms, cost_est
# -----------------------------
# UI
# -----------------------------
with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), title="ZEN Dual-Engine AI") as demo:
gr.Markdown("# " + APP_TITLE)
gr.Markdown(
"Pick your engine, paste your API key(s), and start creating. "
"Keys entered here are **session-only**. For permanent use, set **Space Secrets**."
)
with gr.Row():
with gr.Column(scale=3, min_width=380):
provider = gr.Radio(
["OpenAI (GPT-5)", "Google (Nano-Banana)"],
value="OpenAI (GPT-5)",
label="Engine"
)
model_name = gr.Textbox(
label="Model name",
value=DEFAULT_OPENAI_MODEL,
placeholder=f"e.g., {DEFAULT_OPENAI_MODEL} or {DEFAULT_GEMINI_MODEL}"
)
system_prompt = gr.Textbox(
label="System prompt",
value=SYSTEM_DEFAULT,
lines=3,
info="Controls assistant behavior/persona."
)
with gr.Accordion("🔑 Settings • Bring Your Own Keys (session-only)", open=True):
openai_api_key = gr.Textbox(
label="OPENAI_API_KEY (for GPT-5 path)",
type="password",
placeholder="sk-..."
)
google_api_key = gr.Textbox(
label="GOOGLE_API_KEY (for Nano-Banana path)",
type="password",
placeholder="AIza..."
)
gr.Markdown(
"You can also add `OPENAI_API_KEY` and `GOOGLE_API_KEY` in the Space **Repository Secrets**."
)
user_message = gr.Textbox(
label="Your prompt",
placeholder="Ask anything… or pick a starter prompt below.",
lines=5
)
with gr.Row():
temperature = gr.Slider(
0.0, 1.0, value=0.5, step=0.05,
label="Temperature (some OpenAI models ignore non-default)"
)
max_tokens = gr.Slider(
128, 4096, value=1024, step=64,
label="Max completion tokens (OpenAI path)"
)
with gr.Row():
send = gr.Button("🚀 Generate", variant="primary")
clear = gr.Button("🧹 Clear chat")
with gr.Column(scale=4, min_width=480):
chat = gr.Chatbot(
label="Conversation",
height=440,
type="messages", # OpenAI-style roles internally
avatar_images=(None, None),
)
with gr.Row():
latency = gr.Number(label="Latency (ms)", interactive=False)
cost = gr.Number(label="Est. cost (USD)", interactive=False)
with gr.Accordion("🖼️ Optional: Image (Gemini path supports vision)", open=False):
image = gr.Image(
label="Upload image for analysis (used only on Google/Gemini path)",
type="pil"
)
with gr.Accordion("✨ Starter Prompts", open=True):
starters = gr.Dataset(
components=[gr.Textbox(visible=False)],
samples=[[p] for p in STARTER_PROMPTS],
type="index",
label="Click a row to load a starter prompt into the input."
)
gr.Markdown(
"- Try the same prompt on both engines and compare.\n"
"- Safety: blocks obvious injection/script patterns."
)
# -------------------------
# Events
# -------------------------
def on_starter_select(evt: gr.SelectData):
idx = evt.index
if isinstance(idx, (list, tuple)):
idx = idx[0]
try:
return STARTER_PROMPTS[int(idx)]
except Exception:
return STARTER_PROMPTS[0]
starters.select(on_starter_select, outputs=[user_message])
def on_send(provider, oai_key, g_key, model, sys, msg, img, temp, maxtok, hist):
return infer(provider, oai_key, g_key, model, sys, msg, img, float(temp), int(maxtok), hist)
send.click(
on_send,
inputs=[
provider, openai_api_key, google_api_key, model_name, system_prompt,
user_message, image, temperature, max_tokens, chat
],
outputs=[chat, latency, cost],
show_progress="minimal"
)
def on_clear():
return [], 0, 0.0, None, ""
clear.click(on_clear, outputs=[chat, latency, cost, image, user_message])
# -----------------------------
# Main
# -----------------------------
if __name__ == "__main__":
demo.queue(max_size=64).launch()