# Space4.O / app.py
# shubham2007's picture — Update app.py — 2721aac verified
# (Hugging Face web-page residue preserved as a comment so the file parses.)
"""
AI Teamwork Runner — Full Hugging Face Space
"""
import os
import re
import sys
import tempfile
import subprocess
from typing import List, Dict, Tuple, Optional
import requests
import gradio as gr
# -----------------------------
# Optional connectors
# -----------------------------
try:
from duckduckgo_search import ddg
except Exception:
ddg = None
try:
from googlesearch import search as google_search
except Exception:
google_search = None
try:
from serpapi import GoogleSearch as SerpAPIClient
except Exception:
SerpAPIClient = None
try:
from stackapi import StackAPI
except Exception:
StackAPI = None
try:
from llama_index import Document, GPTVectorStoreIndex
LLM_INDEX_AVAILABLE = True
except Exception:
LLM_INDEX_AVAILABLE = False
try:
import swe_bench
except Exception:
swe_bench = None
try:
import mypy.api
except Exception:
mypy = None
try:
from autogen import AutoGen
except Exception:
AutoGen = None
try:
from langgraph import LangGraph
except Exception:
LangGraph = None
try:
from memgpt import MemGPT
except Exception:
MemGPT = None
# -----------------------------
# Load secrets / env
# -----------------------------
# Secrets are read from the environment (set them in the Space settings).
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")    # required for LLM calls
SERPAPI_API_KEY = os.environ.get("SERPAPI_API_KEY")          # optional: SerpAPI Google search
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")                # optional: GitHub code search
STACKEXCHANGE_KEY = os.environ.get("STACKEXCHANGE_KEY")      # optional: Stack Exchange quota key
# OpenRouter chat-completions endpoint used by call_llm().
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# -----------------------------
# MemGPT memory agent
# -----------------------------
# Process-wide MemGPT memory agent; stays None when memgpt is not installed.
mem_agent = MemGPT(name="ai_teamwork_memory") if MemGPT else None


def mem_store(key: str, value: str):
    """Persist ``value`` under ``key``; silent no-op without a memory agent."""
    if not mem_agent:
        return
    mem_agent.store(key, value)


def mem_retrieve(key: str) -> Optional[str]:
    """Return the value previously stored under ``key``, or None when
    nothing is stored or no memory agent is available."""
    return mem_agent.load(key) if mem_agent else None
# -----------------------------
# LLM call (OpenRouter)
# -----------------------------
def call_llm(prompt: str,
             model: str = "meta-llama/llama-3-70b-instruct",
             temperature: float = 0.2,
             max_tokens: int = 1200) -> str:
    """Send a single-turn chat request to OpenRouter and return the reply text.

    Never raises: a missing API key, HTTP error, or malformed body is
    reported as a descriptive string instead.
    """
    if not OPENROUTER_API_KEY:
        return "[LLM] OPENROUTER_API_KEY not set. Please set it in Space settings."
    request_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    request_body = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    try:
        resp = requests.post(OPENROUTER_URL, headers=request_headers,
                             json=request_body, timeout=120)
        resp.raise_for_status()
        choices = resp.json().get("choices", [{}])
        return choices[0].get("message", {}).get("content", "")
    except Exception as exc:
        return f"[LLM ERROR] {exc}"
# -----------------------------
# Search connectors
# -----------------------------
def search_duckduckgo(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Query DuckDuckGo and normalize hits to {title, link, snippet} dicts.

    Returns a one-element diagnostic list when the library is missing or
    the search fails, so callers can always iterate the result.
    """
    if ddg is None:
        return [{"title": "duckduckgo_search missing", "link": "", "snippet": "Install duckduckgo_search"}]
    try:
        normalized = []
        for hit in ddg(query, max_results=max_results):
            normalized.append({
                "title": hit.get("title", ""),
                # older/newer library versions disagree on the URL key
                "link": hit.get("href") or hit.get("url", ""),
                "snippet": hit.get("body", ""),
            })
        return normalized
    except Exception as exc:
        return [{"title": "error", "link": "", "snippet": str(exc)}]
def google_scrape(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Google search with graceful degradation.

    Tries SerpAPI (when key + client are present), then the googlesearch
    scraper, and finally falls back to DuckDuckGo.
    """
    if SERPAPI_API_KEY and SerpAPIClient:
        try:
            serp = SerpAPIClient({"api_key": SERPAPI_API_KEY, "q": query,
                                  "engine": "google", "num": max_results})
            organic = serp.get_dict().get("organic_results", [])
            return [{"title": entry.get("title"),
                     "link": entry.get("link"),
                     "snippet": entry.get("snippet", "")}
                    for entry in organic]
        except Exception:
            pass  # fall through to the next backend
    if google_search:
        try:
            urls = google_search(query, num_results=max_results)
            return [{"title": "", "link": u, "snippet": ""} for u in urls]
        except Exception:
            pass  # fall through to DuckDuckGo
    return search_duckduckgo(query, max_results=max_results)
def stackoverflow_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Relevance-ordered Stack Overflow search via StackAPI.

    Returns diagnostic dicts when the library is missing or the call fails.
    """
    if StackAPI is None:
        return [{"title": "stackapi missing", "link": "", "snippet": "Install stackapi"}]
    try:
        if STACKEXCHANGE_KEY:
            so = StackAPI('stackoverflow', key=STACKEXCHANGE_KEY)
        else:
            so = StackAPI('stackoverflow')
        response = so.fetch('search/advanced', order='desc', sort='relevance',
                            q=query, page=1, pagesize=max_results)
        return [{"title": item.get('title'),
                 "link": item.get('link'),
                 "snippet": item.get('excerpt', '')}
                for item in response.get('items', [])]
    except Exception as exc:
        return [{"title": "error", "link": "", "snippet": str(exc)}]
def github_code_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Search GitHub code via the REST API; requires GITHUB_TOKEN.

    The snippet field carries the matched file's repo path.
    """
    if not GITHUB_TOKEN:
        return [{"title": "github_token_missing", "link": "", "snippet": "Set GITHUB_TOKEN"}]
    auth_headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Accept": "application/vnd.github+json",
    }
    try:
        resp = requests.get(
            "https://api.github.com/search/code",
            headers=auth_headers,
            params={"q": f"{query} in:file", "per_page": max_results},
            timeout=30,
        )
        resp.raise_for_status()
        return [{"title": item.get('name'),
                 "link": item.get('html_url'),
                 "snippet": item.get('path')}
                for item in resp.json().get('items', [])]
    except Exception as exc:
        return [{"title": "error", "link": "", "snippet": str(exc)}]
# -----------------------------
# RAG helpers
# -----------------------------
def build_rag_index(docs: List[Dict[str, str]]):
    """Build a llama_index vector index over search-hit dicts.

    Returns None when llama_index is unavailable or construction fails.
    Connector results may carry ``None`` for title/link/snippet (e.g.
    SerpAPI entries without a snippet, googlesearch hits with empty
    titles); coerce those to "" so ``str.join`` cannot raise TypeError
    and silently disable RAG for the whole batch.
    """
    if not LLM_INDEX_AVAILABLE:
        return None
    try:
        li_docs = [
            Document("\n".join((
                d.get('title') or '',
                d.get('link') or '',
                d.get('snippet') or '',
            )))
            for d in docs
        ]
        return GPTVectorStoreIndex.from_documents(li_docs)
    except Exception as e:
        print("[RAG BUILD ERROR]", e)
        return None
def query_rag_index(index, query: str) -> str:
    """Run ``query`` against a llama_index index.

    Returns "" when no index was built, and an error string (rather than
    raising) when the query engine fails.
    """
    if index is None:
        return ""
    try:
        engine = index.as_query_engine()
        response = engine.query(query)
        return str(response)
    except Exception as e:
        return f"[RAG QUERY ERROR] {e}"
# -----------------------------
# Code cleaning & safe execution
# -----------------------------
# Fenced ```python blocks in an LLM reply; DOTALL so a body spans lines.
_FENCE_RE = re.compile(r"```(?:python)?[ \t]*\n(.*?)```", re.DOTALL)
# Line starts that are clearly Python statements. The previous keep-list
# missed pass/raise/break/continue/yield/del/assert/global/nonlocal,
# decorators and `async def`, so valid generated code was dropped.
_STMT_RE = re.compile(
    r'^(?:import |from |def |async |class |if |elif |else\s*:|for |while |'
    r'try\s*:|except\b|finally\s*:|with |return\b|yield\b|raise\b|pass\b|'
    r'break\b|continue\b|global |nonlocal |del |assert |print\(|@)'
)
# Plain or augmented assignment (the old regex rejected e.g. `x += 1`).
_ASSIGN_RE = re.compile(r'^[\w\[\]\.]+\s*(?:[-+*/%&|^@]|//|\*\*|>>|<<)?=')
# Bare call expression such as `main()`.
_CALL_RE = re.compile(r'^[\w\[\]\.]+\(.*\)')


def clean_code(code: str) -> str:
    """Extract runnable Python from an LLM reply.

    Prefer the contents of fenced ``` blocks when present — this keeps
    multi-line expressions and continuation lines intact instead of
    filtering them away. Otherwise strip stray fence markers and keep
    only blank lines, comments, statements, assignments and calls,
    dropping surrounding prose.
    """
    code = code.strip()
    fenced = _FENCE_RE.findall(code)
    if fenced:
        return "\n".join(part.strip("\n") for part in fenced)
    # No complete fences: remove stray markers, then filter prose lines.
    code = re.sub(r"^```(?:python)?", "", code, flags=re.MULTILINE)
    code = re.sub(r"```$", "", code, flags=re.MULTILINE)
    kept = []
    for line in code.splitlines():
        s = line.strip()
        if (not s or s.startswith("#") or s[0] in ")]}"
                or _STMT_RE.match(s) or _ASSIGN_RE.match(s) or _CALL_RE.match(s)):
            kept.append(line)
    return "\n".join(kept)
def run_python_code(code: str, timeout: int = 30) -> Tuple[str, str]:
    """Clean ``code`` and execute it with the current interpreter.

    Returns (stdout, stderr); a timeout or launch failure is reported
    through the stderr slot rather than raised.
    """
    code = clean_code(code)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".py", mode="w") as f:
        f.write(code)
        path = f.name
    try:
        proc = subprocess.run([sys.executable, path],
                              capture_output=True, text=True, timeout=timeout)
        return proc.stdout, proc.stderr
    except subprocess.TimeoutExpired:
        return "", "Execution Timeout"
    except Exception as e:
        return "", str(e)
    finally:
        # The file is created with delete=False, so remove it ourselves —
        # previously every call leaked a script in the temp directory.
        try:
            os.unlink(path)
        except OSError:
            pass
# -----------------------------
# Additional tools
# -----------------------------
def type_check_code(code: str) -> str:
    """Run mypy over a code string and return its report text.

    Falls back to a fixed message when mypy is unavailable or when the
    run produces no output at all.
    """
    if not mypy:
        return "mypy not installed"
    report, errors, _status = mypy.api.run(["-c", code])
    return report or errors or "No type errors"
def run_swe_bench(code: str) -> str:
    """Evaluate ``code`` with SWE-Bench when the package is available.

    Any exception from the benchmark is folded into the returned string.
    """
    if not swe_bench:
        return "SWE-Bench not installed"
    try:
        result = swe_bench.run_code(code)
    except Exception as e:
        return f"SWE-Bench error: {e}"
    return result
def autogen_generate(prompt: str) -> str:
    """Generate a completion through AutoGen, or a placeholder when absent."""
    if not AutoGen:
        return "[AutoGen not installed]"
    agent = AutoGen()
    return agent.run(prompt)
def visualize_task_graph(task_prompt: str):
    """Return a Graphviz DOT string for the fixed agent fan-out graph.

    The prompt is truncated to 30 characters for the root label (with an
    ellipsis only when actually truncated), and backslashes/quotes are
    escaped so user input cannot break out of the DOT quoted string —
    previously a prompt containing '"' produced invalid DOT.
    """
    label = task_prompt[:30] + ("..." if len(task_prompt) > 30 else "")
    label = label.replace("\\", "\\\\").replace('"', '\\"')
    lines = ["digraph G {", f'  root [label="{label}"];']
    agents = ["Coder", "Debugger", "Tester", "Reviewer", "Documenter"]
    for i, agent in enumerate(agents, start=1):
        lines.append(f'  n{i} [label="{agent}"];')
        lines.append(f'  root -> n{i};')
    lines.append("}")
    return "\n".join(lines)
# -----------------------------
# AI Teamwork Runner
# -----------------------------
def ai_teamwork_runner(task_prompt: str, enable_rag: bool = True, max_results: int = 5) -> str:
    """End-to-end pipeline: retrieve references, optionally build RAG
    context, ask the LLM for code, execute it, auto-debug once, and
    return a Markdown report (type check, execution output, SWE-Bench
    result, final code).
    """
    if not task_prompt:
        return "Please provide a task prompt."
    # Serve the cached answer when this exact prompt was run before.
    previous_run = mem_retrieve(task_prompt)
    if previous_run:
        return f"[Memory] Retrieved previous run:\n{previous_run}"
    # Retrieval
    combined = []
    for src in (
        search_duckduckgo(task_prompt, max_results),
        google_scrape(task_prompt, max_results),
        stackoverflow_search(task_prompt, max_results),
        github_code_search(task_prompt, max_results),
    ):
        # Connectors return lists even on failure; guard regardless.
        if isinstance(src, list):
            combined.extend(src)
    # RAG context
    rag_index = build_rag_index(combined) if enable_rag else None
    rag_context = query_rag_index(rag_index, task_prompt) if rag_index else ""
    teamwork_prompt = f"""
You are AI agents: Coder, Debugger, Tester, Reviewer, Documenter.
Task: {task_prompt}
Context:
{rag_context}
Return only working Python code with comments.
"""
    llm_response = call_llm(teamwork_prompt)
    # NOTE(review): type check and benchmark run on the FIRST response only;
    # the debugged rewrite below is executed but not re-checked/benchmarked.
    type_msg = type_check_code(llm_response)
    stdout, stderr = run_python_code(llm_response)
    bench_result = run_swe_bench(llm_response)
    # Memory stores the first response, even if the debug pass replaces it.
    mem_store(task_prompt, llm_response)
    if stderr:
        # Single automatic debug round-trip; give up after one retry.
        debug_prompt = f"The following code failed:\n{stderr}\nFix it and return working code."
        llm_debug = call_llm(debug_prompt)
        stdout2, stderr2 = run_python_code(llm_debug)
        if stderr2:
            return f"### Failed after Debug\nError:\n{stderr2}\nCode:\n{llm_debug}"
        llm_response = llm_debug
        stdout = stdout2
    return (
        f"### Type Check:\n{type_msg}\n\n"
        f"### Executed Output:\n{stdout}\n\n"
        f"### SWE-Bench Result:\n{bench_result}\n\n"
        f"### Final Code:\n{llm_response}"
    )
# -----------------------------
# Gradio UI
# -----------------------------
def build_description() -> str:
    """Return the Markdown blurb shown at the top of the Gradio UI."""
    header = "### AI Teamwork Runner — HF Space"
    body = (
        "Enter a development task. System retrieves docs, optionally runs RAG, "
        "generates Python code, executes it, auto‑debugs once, type‑checks, "
        "benchmarks, and stores memory."
    )
    return f"{header}\n{body}"
# Assemble the Gradio interface; `demo` is launched by the main guard below.
with gr.Blocks(title="AI Teamwork Runner") as demo:
    gr.Markdown(build_description())
    # Inputs: the task prompt plus retrieval/RAG knobs.
    inp = gr.Textbox(label="Task Prompt", placeholder="Example: CSV merge utility", lines=3)
    btn = gr.Button("Run AI Teamwork")
    rag = gr.Checkbox(label="Enable RAG", value=True)
    maxres = gr.Slider(minimum=1, maximum=10, value=5, step=1, label="Max results per connector")
    # Outputs: the combined Markdown report and a DOT view of the agent graph.
    out = gr.Textbox(label="Output + Code + Test Summary", lines=30)
    graph_dot = gr.Textbox(label="Task Graph (DOT)", lines=10)
    btn.click(fn=ai_teamwork_runner, inputs=[inp, rag, maxres], outputs=out)
    graph_btn = gr.Button("Show Task Graph")
    graph_btn.click(fn=visualize_task_graph, inputs=[inp], outputs=graph_dot)
# Launch only when run as a script (Hugging Face Spaces execute this module).
if __name__ == "__main__":
    demo.launch()