import os, time, sys, asyncio
from typing import List, Dict
import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI
import base64
from embedder import EmbeddingModel
from Reranker import Reranker
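# On Windows, explicitly select the Proactor event loop policy (the Windows loop with asyncio subprocess support).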
if sys.platform.startswith("win"):
try:
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
except Exception:
pass
# Env
load_dotenv()
APP_Name = os.getenv("APP_Name", "RAG Chatbot for Ghaymah Documentation")
APP_Version = os.getenv("APP_Version", "1.0.0")
API_KEY = os.getenv("API_KEY")
HOST = os.getenv("HOST")
Embed_Model_Name = os.getenv("EMBEDDING_MODEL_NAME")
Reranker_Model_Name = os.getenv("RERANKER_MODEL_NAME")
K = int(os.getenv("K", "8"))
TOP_N = int(os.getenv("TOP_N", "5"))
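# Client-side rate-limiting settings and the embedding vector dimensionality.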
RPM_LIMIT = 20
MIN_SECONDS_BETWEEN = 3
N_DIM = 384
# OpenAI client
client = None
if API_KEY:
client = OpenAI(api_key=API_KEY, base_url="https://genai.ghaymah.systems")
CSS = """
.app-header{display:flex;align-items:center;gap:12px;justify-content:center;margin:6px 0 16px}
.app-header img{height:60px;border-radius:12px}
.app-title{font-weight:800;font-size:28px;line-height:1.1}
.app-sub{opacity:.7;font-size:14px}
"""
COMPANY_LOGO = "download.jpeg"
OWNER_NAME = "ENG. Ahmed Yasser El Sharkawy"
def safe_chat_complete(model: str, messages: List[Dict], max_tokens: int = 9000) -> str:
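    """Call the chat completions endpoint, retrying with increasing delays (5s, 10s, 20s) on rate-limit errors."""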
delays = [5, 10, 20]
attempt = 0
while True:
try:
resp = client.chat.completions.create(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=0.3,
timeout=60,
)
return resp.choices[0].message.content
except Exception as e:
msg = str(e)
if "429" in msg or "Rate Limit" in msg:
if attempt < len(delays):
time.sleep(delays[attempt]); attempt += 1
continue
raise
def logo_data_uri(path: str) -> str:
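    """Return the image at `path` as a base64 data URI, or an empty string if the file is missing."""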
if not os.path.exists(path):
return ""
ext = os.path.splitext(path)[1].lower()
mime = {
".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
".webp": "image/webp", ".gif": "image/gif"
}.get(ext, "image/png")
with open(path, "rb") as f:
b64 = base64.b64encode(f.read()).decode("utf-8")
return f"data:{mime};base64,{b64}"
def build_single_system_context(query: str, max_total_chars: int = 9000, k: int = 10) -> str:
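    """Retrieve the top-k chunks for `query` from the remote vector store, rerank them, and pack
    the best snippets (plus deduplicated sources) into one system prompt of at most `max_total_chars` characters."""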
Embedder = EmbeddingModel(model_name=Embed_Model_Name)
RankerModel = Reranker(model_name=Reranker_Model_Name)
results = Embedder.retrieve_top_k_remote_texts(query, k=k, HOST=HOST)
Top_sort_results = RankerModel.rerank_results(query, results, top_n=TOP_N)
snippets, sources = [], []
    for p in Top_sort_results:
        txt = (p.get("text") or "").strip()
        if not txt:
            continue
        src = p.get("source")
        if isinstance(src, str) and src:
            sources.append(src)
        snippets.append(txt)
if not snippets:
return ("You are ghaymah expert . follow instraction to be strict RAG assistant. No context was retrieved from the vector store for this query. "
"If the answer is not present, say do not mention in ghaymah documentation.")
header = ("You are ghaymah expert. follow instraction to be strict RAG assistant. Answer ONLY using the provided context snippets. "
"If the answer is not present, say do not mention in ghaymah documentation.")
body_budget = max_total_chars - len(header)
body_parts, used = [], 0
for snip in snippets:
piece = snip + "\n\n"
if used + len(piece) <= body_budget:
            body_parts.append(piece)
            used += len(piece)
else:
break
seen, uniq_sources = set(), []
for s in sources:
if s not in seen:
            uniq_sources.append(s)
            seen.add(s)
footer = "Sources:\n" + "\n".join(f"- {s}" for s in uniq_sources) + "\n" if uniq_sources else ""
return (header + "".join(body_parts) + footer).strip()
SYSTEM_SEED = "You are ghaymah expert. follow instraction to be strict RAG assistant. Answer ONLY using the provided context snippets."
def init_state():
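    """Create a fresh conversation state: the system seed message and a placeholder for the last API call time."""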
return {"messages": [{"role": "system", "content": SYSTEM_SEED}], "last_call_ts": None}
def can_call_now(state: dict) -> bool:
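    """Return True if at least MIN_SECONDS_BETWEEN seconds have passed since the last API call."""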
last = state.get("last_call_ts")
return True if last is None else (time.time() - last) >= MIN_SECONDS_BETWEEN
def record_call_time(state: dict):
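    """Record the current time as the timestamp of the most recent API call."""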
state["last_call_ts"] = time.time()
def respond(user_message: str, state: dict):
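    """Handle one user turn: validate configuration, enforce the local rate limit, build the RAG
    context for the query, call the chat model, and return the reply along with the updated state."""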
missing = []
if not API_KEY: missing.append("API_KEY")
if not HOST: missing.append("HOST")
if not Embed_Model_Name: missing.append("EMBEDDING_MODEL_NAME")
if not Reranker_Model_Name: missing.append("RERANKER_MODEL_NAME")
if missing:
return (f"Config missing: {', '.join(missing)}. Set them in your .env and restart."), state
state["messages"].append({"role": "user", "content": user_message})
if not can_call_now(state):
remaining = int(MIN_SECONDS_BETWEEN - (time.time() - (state.get("last_call_ts") or 0)))
remaining = max(1, remaining)
msg = f"Rate limit in effect. Please wait ~{remaining} seconds."
state["messages"].append({"role": "assistant", "content": msg})
return msg, state
rag_ctx = build_single_system_context(query=user_message, max_total_chars=5000, k=K)
msgs = [{"role": "system", "content": rag_ctx}]
msgs.extend([m for m in state["messages"] if m["role"] != "system"][-10:])
try:
reply = safe_chat_complete("DeepSeek-V3-0324", msgs, max_tokens=1000)
record_call_time(state)
except Exception as e:
reply = f"Request failed: {e}"
state["messages"].append({"role": "assistant", "content": reply})
return reply, state
# Gradio UI
with gr.Blocks(title=f"{APP_Name} v{APP_Version}", css=CSS) as demo:
header_logo_src = logo_data_uri(COMPANY_LOGO)
logo_html = f"" if header_logo_src else ""
gr.HTML(f"""