""" AI Teamwork Runner — Full Hugging Face Space """ import os import re import sys import tempfile import subprocess from typing import List, Dict, Tuple, Optional import requests import gradio as gr # ----------------------------- # Optional connectors # ----------------------------- try: from duckduckgo_search import ddg except Exception: ddg = None try: from googlesearch import search as google_search except Exception: google_search = None try: from serpapi import GoogleSearch as SerpAPIClient except Exception: SerpAPIClient = None try: from stackapi import StackAPI except Exception: StackAPI = None try: from llama_index import Document, GPTVectorStoreIndex LLM_INDEX_AVAILABLE = True except Exception: LLM_INDEX_AVAILABLE = False try: import swe_bench except Exception: swe_bench = None try: import mypy.api except Exception: mypy = None try: from autogen import AutoGen except Exception: AutoGen = None try: from langgraph import LangGraph except Exception: LangGraph = None try: from memgpt import MemGPT except Exception: MemGPT = None # ----------------------------- # Load secrets / env # ----------------------------- OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") SERPAPI_API_KEY = os.environ.get("SERPAPI_API_KEY") GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN") STACKEXCHANGE_KEY = os.environ.get("STACKEXCHANGE_KEY") OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions" # ----------------------------- # MemGPT memory agent # ----------------------------- mem_agent = MemGPT(name="ai_teamwork_memory") if MemGPT else None def mem_store(key: str, value: str): if mem_agent: mem_agent.store(key, value) def mem_retrieve(key: str) -> Optional[str]: if mem_agent: return mem_agent.load(key) return None # ----------------------------- # LLM call (OpenRouter) # ----------------------------- def call_llm(prompt: str, model: str = "meta-llama/llama-3-70b-instruct", temperature: float = 0.2, max_tokens: int = 1200) -> str: if not OPENROUTER_API_KEY: return "[LLM] OPENROUTER_API_KEY not set. Please set it in Space settings." 
# -----------------------------
# MemGPT memory agent
# -----------------------------
# NOTE: the constructor kwarg and the `store`/`load` calls below are the
# interface this script assumes from the installed memgpt build; when the
# import fails, mem_agent stays None and both helpers become no-ops.
mem_agent = MemGPT(name="ai_teamwork_memory") if MemGPT else None


def mem_store(key: str, value: str):
    if mem_agent:
        mem_agent.store(key, value)


def mem_retrieve(key: str) -> Optional[str]:
    if mem_agent:
        return mem_agent.load(key)
    return None


# -----------------------------
# LLM call (OpenRouter)
# -----------------------------
def call_llm(prompt: str,
             model: str = "meta-llama/llama-3-70b-instruct",
             temperature: float = 0.2,
             max_tokens: int = 1200) -> str:
    if not OPENROUTER_API_KEY:
        return "[LLM] OPENROUTER_API_KEY not set. Please set it in Space settings."
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    try:
        r = requests.post(OPENROUTER_URL, headers=headers, json=payload, timeout=120)
        r.raise_for_status()
        body = r.json()
        return body.get("choices", [{}])[0].get("message", {}).get("content", "")
    except Exception as e:
        return f"[LLM ERROR] {e}"


# -----------------------------
# Search connectors
# -----------------------------
def search_duckduckgo(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    if ddg is None:
        return [{"title": "duckduckgo_search missing", "link": "", "snippet": "Install duckduckgo_search"}]
    try:
        hits = ddg(query, max_results=max_results)
        return [{
            "title": h.get("title", ""),
            "link": h.get("href") or h.get("url", ""),
            "snippet": h.get("body", ""),
        } for h in hits]
    except Exception as e:
        return [{"title": "error", "link": "", "snippet": str(e)}]


def google_scrape(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    # Prefer SerpAPI when a key is set, then the googlesearch scraper,
    # then fall back to DuckDuckGo.
    if SERPAPI_API_KEY and SerpAPIClient:
        try:
            client = SerpAPIClient({"api_key": SERPAPI_API_KEY, "q": query,
                                    "engine": "google", "num": max_results})
            data = client.get_dict()
            return [{
                "title": o.get("title"),
                "link": o.get("link"),
                "snippet": o.get("snippet", ""),
            } for o in data.get("organic_results", [])]
        except Exception:
            pass
    if google_search:
        try:
            return [{"title": "", "link": url, "snippet": ""}
                    for url in google_search(query, num_results=max_results)]
        except Exception:
            pass
    return search_duckduckgo(query, max_results=max_results)


def stackoverflow_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    if StackAPI is None:
        return [{"title": "stackapi missing", "link": "", "snippet": "Install stackapi"}]
    try:
        site = StackAPI('stackoverflow', key=STACKEXCHANGE_KEY) if STACKEXCHANGE_KEY else StackAPI('stackoverflow')
        items = site.fetch('search/advanced', order='desc', sort='relevance',
                           q=query, page=1, pagesize=max_results).get('items', [])
        return [{"title": it.get('title'), "link": it.get('link'), "snippet": it.get('excerpt', '')}
                for it in items]
    except Exception as e:
        return [{"title": "error", "link": "", "snippet": str(e)}]


def github_code_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    if not GITHUB_TOKEN:
        return [{"title": "github_token_missing", "link": "", "snippet": "Set GITHUB_TOKEN"}]
    headers = {
        "Authorization": f"Bearer {GITHUB_TOKEN}",
        "Accept": "application/vnd.github+json",
    }
    params = {"q": f"{query} in:file", "per_page": max_results}
    try:
        r = requests.get("https://api.github.com/search/code",
                         headers=headers, params=params, timeout=30)
        r.raise_for_status()
        return [{
            "title": it.get('name'),
            "link": it.get('html_url'),
            "snippet": it.get('path'),
        } for it in r.json().get('items', [])]
    except Exception as e:
        return [{"title": "error", "link": "", "snippet": str(e)}]


# -----------------------------
# RAG helpers
# -----------------------------
def build_rag_index(docs: List[Dict[str, str]]):
    if not LLM_INDEX_AVAILABLE:
        return None
    try:
        # `text=` keyword targets the legacy llama_index Document API that
        # matches the `from llama_index import ...` imports above.
        li_docs = [Document(text="\n".join([d.get('title', ''), d.get('link', ''), d.get('snippet', '')]))
                   for d in docs]
        return GPTVectorStoreIndex.from_documents(li_docs)
    except Exception as e:
        print("[RAG BUILD ERROR]", e)
        return None


def query_rag_index(index, query: str) -> str:
    if index is None:
        return ""
    try:
        qe = index.as_query_engine()
        return str(qe.query(query))
    except Exception as e:
        return f"[RAG QUERY ERROR] {e}"
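# Hedged usage sketch: wiring two connectors into the RAG helpers above. The
# query string is illustrative; every connector degrades to a stub entry when
# its dependency or key is missing, and query_rag_index returns "" when
# llama_index is unavailable, so this runs without credentials.
def _demo_rag_roundtrip(query: str = "merge csv files in python") -> str:
    docs = search_duckduckgo(query) + github_code_search(query)
    index = build_rag_index(docs)
    return query_rag_index(index, query)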
# -----------------------------
# Code cleaning & safe execution
# -----------------------------
def clean_code(code: str) -> str:
    # Strip markdown fences, then keep only lines that look like Python.
    # NOTE: this heuristic is aggressive; it can drop valid continuation
    # lines of multi-line expressions. It trades completeness for safety.
    code = code.strip()
    code = re.sub(r"^```(?:python)?", "", code, flags=re.MULTILINE)
    code = re.sub(r"```$", "", code, flags=re.MULTILINE)
    kept = []
    for line in code.splitlines():
        s = line.strip()
        if (not s
                or s.startswith("#")
                or re.match(r'^(import |from |def |class |if |elif |else:|for |while |try:|except |with |return |print\()', s)
                or re.match(r'^[\w\[\]\.]+\s*=.*', s)
                or re.match(r'^[\w\[\]\.]+\(.+\)', s)):
            kept.append(line)
    return "\n".join(kept)


def run_python_code(code: str, timeout: int = 30) -> Tuple[str, str]:
    code = clean_code(code)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".py", mode="w") as f:
        f.write(code)
        path = f.name
    try:
        proc = subprocess.run([sys.executable, path],
                              capture_output=True, text=True, timeout=timeout)
        return proc.stdout, proc.stderr
    except subprocess.TimeoutExpired:
        return "", "Execution Timeout"
    except Exception as e:
        return "", str(e)
    finally:
        # Don't leak temp files across runs.
        try:
            os.unlink(path)
        except OSError:
            pass


# -----------------------------
# Additional tools
# -----------------------------
def type_check_code(code: str) -> str:
    if not mypy:
        return "mypy not installed"
    stdout, stderr, _ = mypy.api.run(["-c", code])
    return stdout or stderr or "No type errors"


def run_swe_bench(code: str) -> str:
    # `run_code` is the interface this script assumes from the installed
    # swe_bench package; the call is guarded so a mismatch degrades gracefully.
    if not swe_bench:
        return "SWE-Bench not installed"
    try:
        return swe_bench.run_code(code)
    except Exception as e:
        return f"SWE-Bench error: {e}"


def autogen_generate(prompt: str) -> str:
    # `AutoGen().run(...)` is likewise an assumed interface, guarded at import.
    if not AutoGen:
        return "[AutoGen not installed]"
    return AutoGen().run(prompt)


def visualize_task_graph(task_prompt: str) -> str:
    # Returns a DOT string; Graphviz is not required to display it.
    lines = ["digraph G {", f'  root [label="{task_prompt[:30]}..."];']
    agents = ["Coder", "Debugger", "Tester", "Reviewer", "Documenter"]
    for i, agent in enumerate(agents, start=1):
        lines.append(f'  n{i} [label="{agent}"];')
        lines.append(f'  root -> n{i};')
    lines.append("}")
    return "\n".join(lines)
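# Hedged usage sketch: exercising clean_code/run_python_code end to end with a
# fenced snippet, the shape the LLM typically returns. Runs locally in a
# subprocess; the expected result is ("hello from sandbox\n", "").
def _demo_safe_exec() -> Tuple[str, str]:
    snippet = "```python\nprint('hello from sandbox')\n```"
    return run_python_code(snippet)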
# -----------------------------
# AI Teamwork Runner
# -----------------------------
def ai_teamwork_runner(task_prompt: str, enable_rag: bool = True, max_results: int = 5) -> str:
    if not task_prompt:
        return "Please provide a task prompt."

    previous_run = mem_retrieve(task_prompt)
    if previous_run:
        return f"[Memory] Retrieved previous run:\n{previous_run}"

    # Retrieval
    combined = []
    for src in (
        search_duckduckgo(task_prompt, max_results),
        google_scrape(task_prompt, max_results),
        stackoverflow_search(task_prompt, max_results),
        github_code_search(task_prompt, max_results),
    ):
        if isinstance(src, list):
            combined.extend(src)

    # RAG context
    rag_index = build_rag_index(combined) if enable_rag else None
    rag_context = query_rag_index(rag_index, task_prompt) if rag_index else ""

    teamwork_prompt = f"""
You are AI agents: Coder, Debugger, Tester, Reviewer, Documenter.
Task: {task_prompt}
Context: {rag_context}
Return only working Python code with comments.
"""

    llm_response = call_llm(teamwork_prompt)
    type_msg = type_check_code(llm_response)
    stdout, stderr = run_python_code(llm_response)
    bench_result = run_swe_bench(llm_response)

    if stderr:
        # Give the model both the failing code and the error, not the error
        # alone, so it has enough context to produce a fix.
        debug_prompt = (
            f"The following code failed:\n{llm_response}\n"
            f"Error:\n{stderr}\n"
            "Fix it and return working code."
        )
        llm_debug = call_llm(debug_prompt)
        stdout2, stderr2 = run_python_code(llm_debug)
        if stderr2:
            return f"### Failed after Debug\nError:\n{stderr2}\nCode:\n{llm_debug}"
        llm_response = llm_debug
        stdout = stdout2

    # Store the final (possibly debugged) code, not the first draft.
    mem_store(task_prompt, llm_response)

    return (
        f"### Type Check:\n{type_msg}\n\n"
        f"### Executed Output:\n{stdout}\n\n"
        f"### SWE-Bench Result:\n{bench_result}\n\n"
        f"### Final Code:\n{llm_response}"
    )


# -----------------------------
# Gradio UI
# -----------------------------
def build_description() -> str:
    return (
        "### AI Teamwork Runner — HF Space\n"
        "Enter a development task. The system retrieves docs, optionally runs RAG, "
        "generates Python code, executes it, auto-debugs once, type-checks, "
        "benchmarks the result, and stores it in memory."
    )


with gr.Blocks(title="AI Teamwork Runner") as demo:
    gr.Markdown(build_description())
    inp = gr.Textbox(label="Task Prompt", placeholder="Example: CSV merge utility", lines=3)
    rag = gr.Checkbox(label="Enable RAG", value=True)
    maxres = gr.Slider(minimum=1, maximum=10, value=5, step=1, label="Max results per connector")
    btn = gr.Button("Run AI Teamwork")
    out = gr.Textbox(label="Output + Code + Test Summary", lines=30)
    graph_dot = gr.Textbox(label="Task Graph (DOT)", lines=10)
    btn.click(fn=ai_teamwork_runner, inputs=[inp, rag, maxres], outputs=out)
    graph_btn = gr.Button("Show Task Graph")
    graph_btn.click(fn=visualize_task_graph, inputs=[inp], outputs=graph_dot)

if __name__ == "__main__":
    demo.launch()
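# Suggested requirements.txt for this Space (package names are assumptions;
# pin versions as needed and drop any optional rows you don't use):
#   gradio
#   requests
#   duckduckgo-search
#   googlesearch-python
#   google-search-results   # serpapi client
#   StackAPI
#   llama-index
#   mypy
#   pyautogen
#   langgraph
#   pymemgpt
# (No standard PyPI package provides `swe_bench`; that import stays optional.)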