import os
import sys
import time
import base64
import asyncio
from typing import List, Dict

import gradio as gr
from dotenv import load_dotenv
from openai import OpenAI

from embedder import EmbeddingModel
from Reranker import Reranker

# On Windows, use the Proactor event loop policy so async networking works under Gradio.
if sys.platform.startswith("win"):
    try:
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
    except Exception:
        pass

# Environment configuration
load_dotenv()
APP_Name = os.getenv("APP_Name", "RAG chatbot in Ghaymah documentation")
APP_Version = os.getenv("APP_Version", "1.0.0")
API_KEY = os.getenv("API_KEY")
HOST = os.getenv("HOST")
Embed_Model_Name = os.getenv("EMBEDDING_MODEL_NAME")
Reranker_Model_Name = os.getenv("RERANKER_MODEL_NAME")
K = int(os.getenv("K", "8"))
TOP_N = int(os.getenv("TOP_N", "5"))
RPM_LIMIT = 20
MIN_SECONDS_BETWEEN = 3
N_DIM = 384

# OpenAI-compatible client pointed at the Ghaymah GenAI endpoint
client = None
if API_KEY:
    client = OpenAI(api_key=API_KEY, base_url="https://genai.ghaymah.systems")

CSS = """
.app-header{display:flex;align-items:center;gap:12px;justify-content:center;margin:6px 0 16px}
.app-header img{height:60px;border-radius:12px}
.app-title{font-weight:800;font-size:28px;line-height:1.1}
.app-sub{opacity:.7;font-size:14px}
"""

COMPANY_LOGO = "download.jpeg"
OWNER_NAME = "ENG. Ahmed Yasser El Sharkawy"


def safe_chat_complete(model: str, messages: List[Dict], max_tokens: int = 9000) -> str:
    """Call the chat completions API, retrying with backoff on rate-limit errors."""
    delays = [5, 10, 20]
    attempt = 0
    while True:
        try:
            resp = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=max_tokens,
                temperature=0.3,
                timeout=60,
            )
            return resp.choices[0].message.content
        except Exception as e:
            msg = str(e)
            if "429" in msg or "Rate Limit" in msg:
                if attempt < len(delays):
                    time.sleep(delays[attempt])
                    attempt += 1
                    continue
            raise


def logo_data_uri(path: str) -> str:
    """Return the image at `path` as a base64 data URI, or "" if the file is missing."""
    if not os.path.exists(path):
        return ""
    ext = os.path.splitext(path)[1].lower()
    mime = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".webp": "image/webp",
        ".gif": "image/gif",
    }.get(ext, "image/png")
    with open(path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    return f"data:{mime};base64,{b64}"


def build_single_system_context(query: str, max_total_chars: int = 9000, k: int = 10) -> str:
    """Retrieve, rerank, and pack the top snippets into a single system prompt."""
    Embedder = EmbeddingModel(model_name=Embed_Model_Name)
    RankerModel = Reranker(model_name=Reranker_Model_Name)
    results = Embedder.retrieve_top_k_remote_texts(query, k=k, HOST=HOST)
    Top_sort_results = RankerModel.rerank_results(query, results, top_n=TOP_N)

    snippets, sources = [], []
    for p in Top_sort_results:
        txt = (p.get("text") or "").strip()
        if not txt:
            continue
        src = p.get("source")
        if isinstance(src, str) and src:
            sources.append(src)
        snippets.append(txt)

    if not snippets:
        return (
            "You are a Ghaymah expert acting as a strict RAG assistant. "
            "No context was retrieved from the vector store for this query. "
            "If the answer is not present, say it is not mentioned in the Ghaymah documentation."
        )
" "If the answer is not present, say do not mention in ghaymah documentation.") body_budget = max_total_chars - len(header) body_parts, used = [], 0 for snip in snippets: piece = snip + "\n\n" if used + len(piece) <= body_budget: body_parts.append(piece); used += len(piece) else: break seen, uniq_sources = set(), [] for s in sources: if s not in seen: uniq_sources.append(s); seen.add(s) footer = "Sources:\n" + "\n".join(f"- {s}" for s in uniq_sources) + "\n" if uniq_sources else "" return (header + "".join(body_parts) + footer).strip() SYSTEM_SEED = "You are ghaymah expert. follow instraction to be strict RAG assistant. Answer ONLY using the provided context snippets." def init_state(): return {"messages": [{"role": "system", "content": SYSTEM_SEED}], "last_call_ts": None} def can_call_now(state: dict) -> bool: last = state.get("last_call_ts") return True if last is None else (time.time() - last) >= MIN_SECONDS_BETWEEN def record_call_time(state: dict): state["last_call_ts"] = time.time() def respond(user_message: str, state: dict): missing = [] if not API_KEY: missing.append("API_KEY") if not HOST: missing.append("HOST") if not Embed_Model_Name: missing.append("EMBEDDING_MODEL_NAME") if not Reranker_Model_Name: missing.append("RERANKER_MODEL_NAME") if missing: return (f"Config missing: {', '.join(missing)}. Set them in your .env and restart."), state state["messages"].append({"role": "user", "content": user_message}) if not can_call_now(state): remaining = int(MIN_SECONDS_BETWEEN - (time.time() - (state.get("last_call_ts") or 0))) remaining = max(1, remaining) msg = f"Rate limit in effect. Please wait ~{remaining} seconds." state["messages"].append({"role": "assistant", "content": msg}) return msg, state rag_ctx = build_single_system_context(query=user_message, max_total_chars=5000, k=K) msgs = [{"role": "system", "content": rag_ctx}] msgs.extend([m for m in state["messages"] if m["role"] != "system"][-10:]) try: reply = safe_chat_complete("DeepSeek-V3-0324", msgs, max_tokens=1000) record_call_time(state) except Exception as e: reply = f"Request failed: {e}" state["messages"].append({"role": "assistant", "content": reply}) return reply, state # Gradio UI with gr.Blocks(title=f"{APP_Name} v{APP_Version}", css=CSS) as demo: header_logo_src = logo_data_uri(COMPANY_LOGO) logo_html = f"logo" if header_logo_src else "" gr.HTML(f"""
        <div class="app-header">
            {logo_html}
            <div>
                <div class="app-title">{APP_Name}</div>
                <div class="app-sub">v{APP_Version} • {OWNER_NAME}</div>
            </div>
        </div>
""") state = gr.State(init_state()) with gr.Row(): # LEFT: chat + input with gr.Column(scale=3): chatbot = gr.Chatbot(label="Chat", height=520, type="messages", value=[]) txt = gr.Textbox( placeholder="Ask anything about the Ghaymah documentation…", label="Your message", lines=2, autofocus=True, ) with gr.Row(): send_btn = gr.Button("Send", variant="primary") clear_btn = gr.Button("Clear") # RIGHT with gr.Column(scale=1, min_width=300): gr.Image( value="download.jpeg", interactive=False, show_label=False, container=False, show_fullscreen_button=False, ) gr.Markdown( "Vector store: **Connected** \n" f"Embedder: `{Embed_Model_Name or 'unset'}` \n" f"RPM limit: **{RPM_LIMIT}** \n" ) def _on_user_submit(user_input, chat_messages): try: if not user_input: return "", (chat_messages or []) chat_messages = chat_messages or [] updated = chat_messages + [{"role": "user", "content": user_input}] # print("[on_submit] user:", user_input) return "", updated except Exception as e: print("[on_submit][ERROR]", repr(e)) return user_input, (chat_messages or []) txt.submit(_on_user_submit, [txt, chatbot], [txt, chatbot]) send_btn.click(_on_user_submit, [txt, chatbot], [txt, chatbot]) def _bot_step(chat_messages, state): try: chat_messages = chat_messages or [] last_user = None for msg in reversed(chat_messages): if msg.get("role") == "user" and isinstance(msg.get("content"), str): last_user = msg["content"] break if last_user is None: print("[bot_step] no user message found") return chat_messages, state # print("[bot_step] responding to:", last_user) bot_reply, new_state = respond(last_user, state) updated = chat_messages + [{"role": "assistant", "content": bot_reply}] return updated, new_state except Exception as e: # print("[bot_step][ERROR]", repr(e)) updated = (chat_messages or []) + [ {"role": "assistant", "content": f"⚠️ Internal error: {e}"} ] return updated, state txt.submit(_on_user_submit, [txt, chatbot], [txt, chatbot])\ .then(_bot_step, [chatbot, state], [chatbot, state]) send_btn.click(_on_user_submit, [txt, chatbot], [txt, chatbot])\ .then(_bot_step, [chatbot, state], [chatbot, state]) def _clear(): print("[clear] resetting state and chat") return [], init_state() clear_btn.click(_clear, outputs=[chatbot, state]) if __name__ == "__main__": demo.queue() demo.launch(debug=True,server_name="0.0.0.0" ,server_port=7860)