"""
ZEN Dual-Engine AI — GPT-5 (OpenAI) + Nano-Banana (Gemini)

Gradio 5.49.1 Space with in-UI API keys, chat history, optional image (Gemini),
telemetry, starter prompts, and robust OpenAI param fallbacks.

Key robustness:
- Auto-retry OpenAI call with `max_completion_tokens` if the model rejects `max_tokens`.
- Auto-retry OpenAI call without `temperature` if the model only allows the default.
- Clean Gradio 5.49.1 queue usage (no deprecated args).
"""

import base64
import os
import time
from io import BytesIO
from typing import Any, Dict, List

import gradio as gr


APP_TITLE = "🔮 ZEN Dual-Engine AI — GPT-5 + Nano-Banana"

# Default model IDs. "Nano-Banana" is a nickname; substitute whatever model IDs
# your keys actually expose.
DEFAULT_OPENAI_MODEL = "gpt-5"
DEFAULT_GEMINI_MODEL = "gemini-2.5-nano-banana"

SYSTEM_DEFAULT = (
    "You are ZEN Assistant. Respond concisely, accurately, and helpfully. "
    "If an image is provided, analyze it clearly. Avoid unsafe advice."
)

STARTER_PROMPTS: List[str] = [
    "💡 Brainstorm 7 AI-powered product ideas that tackle youth education gaps.",
    "📚 Draft a 4-week AI literacy micro-curriculum with hands-on labs.",
    "🧪 Design an experiment to compare GPT-5 vs Nano-Banana on code generation.",
    "🎨 Describe a museum exhibit that visualizes the history of AI in America.",
    "🛠️ Generate a Python function that converts a PDF to clean Markdown.",
    "🪐 Write a sci-fi scene about a student building an AI on the Moon.",
    "🔍 Summarize the pros/cons of agentic workflows for startups.",
    "📈 Propose a metrics dashboard for measuring AI program impact.",
]

# Crude prompt-injection/script screen; see is_blocked() below.
BLOCKLIST = ["<script", "</script>", "{{", "}}"]


def _lazy_import_openai():
    try:
        from openai import OpenAI
        return OpenAI
    except Exception as e:
        raise RuntimeError(f"OpenAI SDK not available: {e}")


def _lazy_import_gemini():
    try:
        import google.generativeai as genai
        return genai
    except Exception as e:
        raise RuntimeError(f"Google Generative AI SDK not available: {e}")


def is_blocked(text: str) -> bool:
    """Return True if the text contains an obvious script/injection pattern."""
    if not text:
        return False
    low = text.lower()
    return any(tok in low for tok in BLOCKLIST)


def pil_to_base64(image) -> str:
    """Convert a PIL image to a base64-encoded JPEG (if you ever need raw bytes)."""
    buffer = BytesIO()
    image.convert("RGB").save(buffer, format="JPEG", quality=92)
    return base64.b64encode(buffer.getvalue()).decode("utf-8")
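
# Usage sketch (hypothetical file path): embed an upload as a data URL.
#
#   from PIL import Image
#   b64 = pil_to_base64(Image.open("upload.png"))
#   data_url = f"data:image/jpeg;base64,{b64}"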


def approx_tokens_from_chars(text: str) -> int:
    """Rough heuristic: ~4 characters per token."""
    return int(len(text or "") / 4)


def estimate_cost(provider_label: str, model: str, prompt: str, reply: str) -> float:
    """
    Super rough illustrative rates (USD per million tokens). Adjust to your
    billing reality.
    """
    toks = approx_tokens_from_chars(prompt) + approx_tokens_from_chars(reply)
    if provider_label.startswith("OpenAI"):
        return round(toks / 1_000_000.0 * 7.5, 4)
    return round(toks / 1_000_000.0 * 5.0, 4)
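
# Worked example of the estimate (illustrative rates only): a 2,000-char prompt
# plus a 2,000-char reply is ~1,000 tokens under the chars/4 heuristic, so the
# OpenAI path yields 1_000 / 1_000_000 * 7.5 ≈ $0.0075 for the turn.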


def call_openai_chat(
    api_key: str,
    model: str,
    system_prompt: str,
    history_messages: List[Dict[str, str]],
    user_message: str,
    temperature: float,
    max_tokens: int,
) -> str:
    """
    Calls OpenAI Chat Completions with adaptive parameter handling:
    - If the model rejects `max_tokens`, retry with `max_completion_tokens`.
    - If the model rejects a non-default `temperature`, retry omitting it
      (server default applies).
    """
    OpenAI = _lazy_import_openai()
    client = OpenAI(api_key=api_key)

    messages = [{"role": "system", "content": (system_prompt.strip() or SYSTEM_DEFAULT)}]
    messages.extend(history_messages or [])
    messages.append({"role": "user", "content": user_message})

    base_kwargs = dict(
        model=(model.strip() or DEFAULT_OPENAI_MODEL),
        messages=messages,
    )

    # First attempt: explicit temperature + classic `max_tokens`.
    try:
        kwargs_try = dict(base_kwargs)
        kwargs_try["temperature"] = float(temperature)
        kwargs_try["max_tokens"] = int(max_tokens)
        resp = client.chat.completions.create(**kwargs_try)
        return resp.choices[0].message.content
    except Exception as e1:
        msg1 = str(e1)

        # Sniff the error text to see which parameter the model rejected.
        needs_mct = ("max_tokens" in msg1 and "max_completion_tokens" in msg1) or "Unsupported parameter" in msg1
        temp_default_only = ("temperature" in msg1) and ("unsupported_value" in msg1 or "Only the default" in msg1)

        # Retry 1: swap in `max_completion_tokens`, keep temperature.
        if needs_mct and not temp_default_only:
            try:
                kwargs_try = dict(base_kwargs)
                kwargs_try["temperature"] = float(temperature)
                kwargs_try["max_completion_tokens"] = int(max_tokens)
                resp = client.chat.completions.create(**kwargs_try)
                return resp.choices[0].message.content
            except Exception as e2:
                msg2 = str(e2)
                temp_default_only = ("temperature" in msg2) and ("unsupported_value" in msg2 or "Only the default" in msg2)

        # Retry 2: drop temperature, keep classic `max_tokens`.
        if temp_default_only and not needs_mct:
            try:
                kwargs_try = dict(base_kwargs)
                kwargs_try["max_tokens"] = int(max_tokens)
                resp = client.chat.completions.create(**kwargs_try)
                return resp.choices[0].message.content
            except Exception as e3:
                msg3 = str(e3)
                needs_mct = ("max_tokens" in msg3 and "max_completion_tokens" in msg3) or "Unsupported parameter" in msg3

        # Retry 3: both quirks at once: drop temperature AND use
        # `max_completion_tokens`.
        if needs_mct and temp_default_only:
            kwargs_try = dict(base_kwargs)
            kwargs_try["max_completion_tokens"] = int(max_tokens)
            resp = client.chat.completions.create(**kwargs_try)
            return resp.choices[0].message.content

        # Unrecognized failure: re-raise the original error.
        raise
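
# Usage sketch (assumes a valid key; the model name is whatever your account
# exposes, and the fallbacks above absorb models that only accept
# `max_completion_tokens` or the default temperature):
#
#   reply = call_openai_chat(
#       api_key=os.environ["OPENAI_API_KEY"],
#       model="gpt-5",
#       system_prompt="",
#       history_messages=[],
#       user_message="Say hello in five words.",
#       temperature=0.5,
#       max_tokens=256,
#   )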


def call_gemini_generate(
    api_key: str,
    model: str,
    system_prompt: str,
    user_message: str,
    image=None,
    temperature: float = 0.4,
) -> str:
    """
    Calls Gemini (including Nano-Banana variants). Supports an optional PIL image.
    """
    genai = _lazy_import_gemini()
    genai.configure(api_key=api_key)

    safety_settings = [
        {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
        {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
    ]

    model_obj = genai.GenerativeModel(
        model_name=(model.strip() or DEFAULT_GEMINI_MODEL),
        system_instruction=(system_prompt.strip() or SYSTEM_DEFAULT),
        safety_settings=safety_settings,
        generation_config={"temperature": float(temperature)},
    )

    parts: List[Any] = [user_message or ""]
    if image is not None:
        parts.append(image)

    resp = model_obj.generate_content(parts)

    # Prefer the convenience accessor; fall back to digging through candidates.
    if hasattr(resp, "text") and resp.text:
        return resp.text
    cand = getattr(resp, "candidates", None)
    if cand and getattr(cand[0], "content", None):
        resp_parts = getattr(cand[0].content, "parts", None)
        if resp_parts and hasattr(resp_parts[0], "text"):
            return resp_parts[0].text
    return "(No response text returned.)"


def to_openai_history(gradio_history: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """
    Convert Gradio Chatbot "messages" history to plain OpenAI role/content dicts,
    dropping any extra keys (e.g., metadata) Gradio may attach.
    """
    oai: List[Dict[str, str]] = []
    for msg in gradio_history or []:
        role = msg.get("role")
        content = msg.get("content")
        if role in ("user", "assistant") and isinstance(content, str) and content:
            oai.append({"role": role, "content": content})
    return oai
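
# Example: messages-format history passes through with extra keys stripped.
#
#   to_openai_history([
#       {"role": "user", "content": "Hi", "metadata": None},
#       {"role": "assistant", "content": "Hello!"},
#   ])
#   # -> [{"role": "user", "content": "Hi"},
#   #     {"role": "assistant", "content": "Hello!"}]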


def infer(
    provider_label: str,
    openai_key: str,
    google_key: str,
    model_name: str,
    system_prompt: str,
    user_message: str,
    image,
    temperature: float,
    max_tokens: int,
    history: List[Dict[str, str]],
):
    """
    Main inference entry: routes to OpenAI or Gemini, measures latency, estimates
    cost, and appends the turn to the chat history.
    """
    if not (user_message and user_message.strip()):
        raise gr.Error("Please enter a prompt (or pick a starter prompt).")
    if is_blocked(user_message):
        assistant = "Request blocked by safety policy. Please rephrase."
        history = history or []
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": assistant})
        return history, 0, 0.0

    t0 = time.time()
    history = history or []

    if provider_label.startswith("OpenAI"):
        # UI key first, then the Space secret as a fallback.
        api_key = (openai_key or "").strip() or os.getenv("OPENAI_API_KEY", "")
        if not api_key:
            raise gr.Error("Enter your OpenAI API key in the Settings accordion.")
        oai_history = to_openai_history(history)
        reply = call_openai_chat(
            api_key=api_key,
            model=model_name or DEFAULT_OPENAI_MODEL,
            system_prompt=system_prompt or SYSTEM_DEFAULT,
            history_messages=oai_history,
            user_message=user_message,
            temperature=temperature,
            max_tokens=max_tokens,
        )
    else:
        api_key = (google_key or "").strip() or os.getenv("GOOGLE_API_KEY", "")
        if not api_key:
            raise gr.Error("Enter your Google Gemini API key in the Settings accordion.")
        reply = call_gemini_generate(
            api_key=api_key,
            model=model_name or DEFAULT_GEMINI_MODEL,
            system_prompt=system_prompt or SYSTEM_DEFAULT,
            user_message=user_message,
            image=image,
            temperature=temperature,
        )

    latency_ms = int((time.time() - t0) * 1000)
    cost_est = estimate_cost(provider_label, model_name, user_message, reply)

    history.append({"role": "user", "content": user_message})
    history.append({"role": "assistant", "content": reply})
    return history, latency_ms, cost_est


with gr.Blocks(fill_height=True, theme=gr.themes.Soft(), title="ZEN Dual-Engine AI") as demo:
    gr.Markdown("# " + APP_TITLE)
    gr.Markdown(
        "Pick your engine, paste your API key(s), and start creating. "
        "Keys entered here are **session-only**. For permanent use, set **Space Secrets**."
    )

    with gr.Row():
        with gr.Column(scale=3, min_width=380):
            provider = gr.Radio(
                ["OpenAI (GPT-5)", "Google (Nano-Banana)"],
                value="OpenAI (GPT-5)",
                label="Engine",
            )

            model_name = gr.Textbox(
                label="Model name",
                value=DEFAULT_OPENAI_MODEL,
                placeholder=f"e.g., {DEFAULT_OPENAI_MODEL} or {DEFAULT_GEMINI_MODEL}",
            )

            system_prompt = gr.Textbox(
                label="System prompt",
                value=SYSTEM_DEFAULT,
                lines=3,
                info="Controls assistant behavior/persona.",
            )

            with gr.Accordion("🔑 Settings • Bring Your Own Keys (session-only)", open=True):
                openai_api_key = gr.Textbox(
                    label="OPENAI_API_KEY (for GPT-5 path)",
                    type="password",
                    placeholder="sk-...",
                )
                google_api_key = gr.Textbox(
                    label="GOOGLE_API_KEY (for Nano-Banana path)",
                    type="password",
                    placeholder="AIza...",
                )
                gr.Markdown(
                    "You can also add `OPENAI_API_KEY` and `GOOGLE_API_KEY` in the Space **Repository Secrets**."
                )

            user_message = gr.Textbox(
                label="Your prompt",
                placeholder="Ask anything… or pick a starter prompt below.",
                lines=5,
            )

            with gr.Row():
                temperature = gr.Slider(
                    0.0, 1.0, value=0.5, step=0.05,
                    label="Temperature (some OpenAI models ignore non-default)",
                )
                max_tokens = gr.Slider(
                    128, 4096, value=1024, step=64,
                    label="Max completion tokens (OpenAI path)",
                )

            with gr.Row():
                send = gr.Button("🚀 Generate", variant="primary")
                clear = gr.Button("🧹 Clear chat")

        with gr.Column(scale=4, min_width=480):
            chat = gr.Chatbot(
                label="Conversation",
                height=440,
                type="messages",
                avatar_images=(None, None),
            )

            with gr.Row():
                latency = gr.Number(label="Latency (ms)", interactive=False)
                cost = gr.Number(label="Est. cost (USD)", interactive=False)

            with gr.Accordion("🖼️ Optional: Image (Gemini path supports vision)", open=False):
                image = gr.Image(
                    label="Upload image for analysis (used only on Google/Gemini path)",
                    type="pil",
                )

            with gr.Accordion("✨ Starter Prompts", open=True):
                starters = gr.Dataset(
                    components=[gr.Textbox(visible=False)],
                    samples=[[p] for p in STARTER_PROMPTS],
                    type="index",
                    label="Click a row to load a starter prompt into the input.",
                )
                gr.Markdown(
                    "- Try the same prompt on both engines and compare.\n"
                    "- Safety: blocks obvious injection/script patterns."
                )

    def on_starter_select(evt: gr.SelectData):
        idx = evt.index
        if isinstance(idx, (list, tuple)):
            idx = idx[0]
        try:
            return STARTER_PROMPTS[int(idx)]
        except Exception:
            return STARTER_PROMPTS[0]

    starters.select(on_starter_select, outputs=[user_message])

    def on_send(provider, oai_key, g_key, model, sys, msg, img, temp, maxtok, hist):
        return infer(provider, oai_key, g_key, model, sys, msg, img, float(temp), int(maxtok), hist)

    send.click(
        on_send,
        inputs=[
            provider, openai_api_key, google_api_key, model_name, system_prompt,
            user_message, image, temperature, max_tokens, chat,
        ],
        outputs=[chat, latency, cost],
        show_progress="minimal",
    )

    def on_clear():
        return [], 0, 0.0, None, ""

    clear.click(on_clear, outputs=[chat, latency, cost, image, user_message])


if __name__ == "__main__":
    # Gradio 5.x queue usage: no deprecated concurrency args, just cap the queue.
    demo.queue(max_size=64).launch()