# NOTE: removed non-code residue ("Spaces: / Sleeping / Sleeping") left over
# from scraping the Hugging Face Space page header.
"""AI Teamwork Runner — Full Hugging Face Space.

Retrieves web/StackOverflow/GitHub context for a development task, optionally
builds a RAG index over the hits, asks an LLM (via OpenRouter) for Python
code, executes and auto-debugs it, and serves everything through a Gradio UI.
"""
import os
import re
import subprocess
import sys
import tempfile
from typing import Dict, List, Optional, Tuple

import gradio as gr
import requests
# -----------------------------
# Optional connectors
# -----------------------------
# Every connector below is optional: a failed import leaves a None sentinel
# (or a False flag for llama_index), and each call site checks it and
# degrades gracefully instead of crashing the Space at startup.
try:
    from duckduckgo_search import ddg
except Exception:
    ddg = None

try:
    from googlesearch import search as google_search
except Exception:
    google_search = None

try:
    from serpapi import GoogleSearch as SerpAPIClient
except Exception:
    SerpAPIClient = None

try:
    from stackapi import StackAPI
except Exception:
    StackAPI = None

# llama_index powers the optional RAG step; flag gates build_rag_index().
try:
    from llama_index import Document, GPTVectorStoreIndex
    LLM_INDEX_AVAILABLE = True
except Exception:
    LLM_INDEX_AVAILABLE = False

try:
    import swe_bench
except Exception:
    swe_bench = None

# On success `mypy` is bound to the package (mypy.api usable); on failure the
# name is explicitly set to None so type_check_code() can test it.
try:
    import mypy.api
except Exception:
    mypy = None

try:
    from autogen import AutoGen
except Exception:
    AutoGen = None

# NOTE(review): LangGraph is imported but never referenced elsewhere in this
# file — presumably reserved for future orchestration; confirm before removal.
try:
    from langgraph import LangGraph
except Exception:
    LangGraph = None

try:
    from memgpt import MemGPT
except Exception:
    MemGPT = None
# -----------------------------
# Load secrets / env
# -----------------------------
# Credentials come from HF Space secrets / environment variables; all are
# optional — connectors that lack their key fall back or return a stub result.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
SERPAPI_API_KEY = os.environ.get("SERPAPI_API_KEY")
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
STACKEXCHANGE_KEY = os.environ.get("STACKEXCHANGE_KEY")
OPENROUTER_URL = "https://openrouter.ai/api/v1/chat/completions"
# -----------------------------
# MemGPT memory agent
# -----------------------------
# Task-prompt -> generated-code memory; None when memgpt is not installed,
# in which case mem_store/mem_retrieve become no-ops.
mem_agent = MemGPT(name="ai_teamwork_memory") if MemGPT else None
def mem_store(key: str, value: str) -> None:
    """Persist *value* under *key* in the MemGPT agent; no-op without one."""
    if not mem_agent:
        return
    mem_agent.store(key, value)
def mem_retrieve(key: str) -> Optional[str]:
    """Fetch a previously stored value; None without an agent or entry."""
    return mem_agent.load(key) if mem_agent else None
| # ----------------------------- | |
| # LLM call (OpenRouter) | |
| # ----------------------------- | |
def call_llm(prompt: str,
             model: str = "meta-llama/llama-3-70b-instruct",
             temperature: float = 0.2,
             max_tokens: int = 1200) -> str:
    """Send a single-turn chat request to OpenRouter and return the reply text.

    Never raises: a missing API key or any request/parse failure is reported
    as a bracketed error string so the UI always gets something printable.
    """
    if not OPENROUTER_API_KEY:
        return "[LLM] OPENROUTER_API_KEY not set. Please set it in Space settings."
    request_headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
    }
    request_body = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    try:
        resp = requests.post(OPENROUTER_URL, headers=request_headers,
                             json=request_body, timeout=120)
        resp.raise_for_status()
        choices = resp.json().get("choices", [{}])
        return choices[0].get("message", {}).get("content", "")
    except Exception as exc:  # network, HTTP status, JSON, or empty choices
        return f"[LLM ERROR] {exc}"
| # ----------------------------- | |
| # Search connectors | |
| # ----------------------------- | |
def search_duckduckgo(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Query DuckDuckGo; each hit becomes a {title, link, snippet} dict.

    Returns a one-element stub list when the library is missing or on error.
    """
    if ddg is None:
        return [{"title": "duckduckgo_search missing", "link": "", "snippet": "Install duckduckgo_search"}]
    try:
        results = []
        for hit in ddg(query, max_results=max_results):
            results.append({
                "title": hit.get("title", ""),
                # older/newer library versions use "href" vs "url"
                "link": hit.get("href") or hit.get("url", ""),
                "snippet": hit.get("body", ""),
            })
        return results
    except Exception as exc:
        return [{"title": "error", "link": "", "snippet": str(exc)}]
def google_scrape(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Google search with tiered fallbacks: SerpAPI -> googlesearch -> DDG."""
    # Tier 1: SerpAPI, only when both the key and the client are available.
    if SERPAPI_API_KEY and SerpAPIClient:
        try:
            params = {"api_key": SERPAPI_API_KEY, "q": query,
                      "engine": "google", "num": max_results}
            organic = SerpAPIClient(params).get_dict().get("organic_results", [])
            return [{"title": entry.get("title"),
                     "link": entry.get("link"),
                     "snippet": entry.get("snippet", "")}
                    for entry in organic]
        except Exception:
            pass  # fall through to the next tier
    # Tier 2: plain googlesearch scraping (links only, no titles/snippets).
    if google_search:
        try:
            return [{"title": "", "link": hit, "snippet": ""}
                    for hit in google_search(query, num_results=max_results)]
        except Exception:
            pass
    # Tier 3: DuckDuckGo as the always-available fallback.
    return search_duckduckgo(query, max_results=max_results)
def stackoverflow_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Search StackOverflow via StackAPI; stub list when unavailable/on error."""
    if StackAPI is None:
        return [{"title": "stackapi missing", "link": "", "snippet": "Install stackapi"}]
    try:
        # An API key raises the request quota but is optional.
        if STACKEXCHANGE_KEY:
            site = StackAPI('stackoverflow', key=STACKEXCHANGE_KEY)
        else:
            site = StackAPI('stackoverflow')
        response = site.fetch('search/advanced', order='desc', sort='relevance',
                              q=query, page=1, pagesize=max_results)
        hits = []
        for item in response.get('items', []):
            hits.append({"title": item.get('title'),
                         "link": item.get('link'),
                         "snippet": item.get('excerpt', '')})
        return hits
    except Exception as exc:
        return [{"title": "error", "link": "", "snippet": str(exc)}]
def github_code_search(query: str, max_results: int = 5) -> List[Dict[str, str]]:
    """Search GitHub code; requires GITHUB_TOKEN (code search is auth-only)."""
    if not GITHUB_TOKEN:
        return [{"title": "github_token_missing", "link": "", "snippet": "Set GITHUB_TOKEN"}]
    try:
        response = requests.get(
            "https://api.github.com/search/code",
            headers={"Authorization": f"Bearer {GITHUB_TOKEN}",
                     "Accept": "application/vnd.github+json"},
            params={"q": f"{query} in:file", "per_page": max_results},
            timeout=30,
        )
        response.raise_for_status()
        # "snippet" carries the file path — code search doesn't return excerpts.
        return [{"title": item.get('name'),
                 "link": item.get('html_url'),
                 "snippet": item.get('path')}
                for item in response.json().get('items', [])]
    except Exception as exc:
        return [{"title": "error", "link": "", "snippet": str(exc)}]
| # ----------------------------- | |
| # RAG helpers | |
| # ----------------------------- | |
def build_rag_index(docs: List[Dict[str, str]]):
    """Build a GPTVectorStoreIndex over search hits; None when unavailable.

    Each hit is flattened to "title\\nlink\\nsnippet" text. Build failures are
    logged and swallowed so retrieval problems never break the pipeline.
    """
    if not LLM_INDEX_AVAILABLE:
        return None
    try:
        corpus = []
        for doc in docs:
            text = "\n".join([doc.get('title', ''), doc.get('link', ''), doc.get('snippet', '')])
            corpus.append(Document(text))
        return GPTVectorStoreIndex.from_documents(corpus)
    except Exception as exc:
        print("[RAG BUILD ERROR]", exc)
        return None
def query_rag_index(index, query: str) -> str:
    """Answer *query* from the index; "" when index is None, error text on failure."""
    if index is None:
        return ""
    try:
        engine = index.as_query_engine()
        answer = engine.query(query)
        return str(answer)
    except Exception as exc:
        return f"[RAG QUERY ERROR] {exc}"
| # ----------------------------- | |
| # Code cleaning & safe execution | |
| # ----------------------------- | |
def clean_code(code: str) -> str:
    """Strip markdown fences from LLM output and keep only plausible Python lines.

    The model is asked for "only code", but replies often include prose and
    ``` fences. Blank lines, comments, and lines that look like Python
    statements are kept; anything else (prose) is dropped.

    Fix over the previous version: `raise`, `assert`, `pass`, `break`,
    `continue`, `yield`, `del`, `global`, `nonlocal`, `finally`, `lambda`,
    decorators (`@...`), annotated assignments (`x: int = 1`) and augmented
    assignments (`x += 1`) were all silently dropped, turning valid generated
    code into broken programs. This remains a heuristic: continuation lines
    of multi-line literals may still be dropped.
    """
    code = code.strip()
    # Remove opening/closing markdown code fences anywhere in the text.
    code = re.sub(r"^```(?:python)?", "", code, flags=re.MULTILINE)
    code = re.sub(r"```$", "", code, flags=re.MULTILINE)
    kept = []
    for line in code.splitlines():
        s = line.strip()
        if (not s
                or s.startswith("#")
                or s.startswith("@")  # decorators
                # Statement keywords (\b so "iffy"/"forward" don't match).
                or re.match(r'^(import|from|def|class|if|elif|else|for|while'
                            r'|try|except|finally|with|return|yield|raise'
                            r'|assert|pass|break|continue|del|global|nonlocal'
                            r'|lambda|print)\b', s)
                # Assignments: plain, annotated (x: int = 1), and augmented
                # (x += 1, x //= 2, ...). Superset of the old `name = ...`.
                or re.match(r'^[\w\[\]\.]+\s*(:[^=]+)?'
                            r'([-+*/%&|^@]|//|\*\*|>>|<<)?=', s)
                # Bare call expressions like foo.bar(args).
                or re.match(r'^[\w\[\]\.]+\(.+\)', s)):
            kept.append(line)
    return "\n".join(kept)
def run_python_code(code: str, timeout: int = 30) -> Tuple[str, str]:
    """Clean LLM-emitted code and execute it in a fresh interpreter subprocess.

    Returns (stdout, stderr); stderr is "Execution Timeout" when the subprocess
    exceeds *timeout* seconds, or the exception text for any other failure.

    Fix over the previous version: the NamedTemporaryFile was created with
    delete=False and never unlinked, leaking one temp file per call; it is now
    always removed in a finally block.
    """
    code = clean_code(code)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".py", mode="w") as f:
        f.write(code)
        path = f.name
    try:
        proc = subprocess.run([sys.executable, path],
                              capture_output=True, text=True, timeout=timeout)
        return proc.stdout, proc.stderr
    except subprocess.TimeoutExpired:
        return "", "Execution Timeout"
    except Exception as e:
        return "", str(e)
    finally:
        try:
            os.unlink(path)
        except OSError:
            pass  # best-effort cleanup; never mask the real result
| # ----------------------------- | |
| # Additional tools | |
| # ----------------------------- | |
def type_check_code(code: str) -> str:
    """Run mypy over a code string; returns the report or a fallback message."""
    if not mypy:
        return "mypy not installed"
    # mypy.api.run returns (normal report, error report, exit status).
    report, errors, _status = mypy.api.run(["-c", code])
    for text in (report, errors):
        if text:
            return text
    return "No type errors"
def run_swe_bench(code: str) -> str:
    """Score *code* with SWE-Bench when installed; message text otherwise."""
    if swe_bench is None:
        return "SWE-Bench not installed"
    try:
        return swe_bench.run_code(code)
    except Exception as exc:
        return f"SWE-Bench error: {exc}"
def autogen_generate(prompt: str) -> str:
    """Delegate *prompt* to AutoGen; placeholder text when not installed."""
    if AutoGen is None:
        return "[AutoGen not installed]"
    agent = AutoGen()
    return agent.run(prompt)
def visualize_task_graph(task_prompt: str):
    """Render the agent fan-out as a Graphviz DOT string (no Graphviz needed).

    The graph is a single root (truncated task prompt) with one child node
    per agent role.
    """
    agents = ["Coder", "Debugger", "Tester", "Reviewer", "Documenter"]
    dot = ["digraph G {", f' root [label="{task_prompt[:30]}..."];']
    index = 1
    for agent in agents:
        dot.append(f' n{index} [label="{agent}"];')
        dot.append(f' root -> n{index};')
        index += 1
    dot.append("}")
    return "\n".join(dot)
| # ----------------------------- | |
| # AI Teamwork Runner | |
| # ----------------------------- | |
def ai_teamwork_runner(task_prompt: str, enable_rag: bool = True, max_results: int = 5) -> str:
    """End-to-end pipeline: retrieve context, generate code, run, debug, report.

    Flow: memory lookup -> 4-connector retrieval -> optional RAG context ->
    LLM code generation -> type check + execution + benchmark -> one automatic
    debug round on failure -> memory store -> markdown report.

    Fixes over the previous version:
    - the auto-debug prompt now includes the failing code (it previously sent
      only the stderr, so the LLM had nothing to fix);
    - memory is stored only after a clean run and holds the final code, so a
      broken first draft is no longer cached and replayed forever.
    """
    if not task_prompt:
        return "Please provide a task prompt."
    previous_run = mem_retrieve(task_prompt)
    if previous_run:
        return f"[Memory] Retrieved previous run:\n{previous_run}"
    # Retrieval: every connector returns a list of {title, link, snippet}.
    combined: List[Dict[str, str]] = []
    for src in (
        search_duckduckgo(task_prompt, max_results),
        google_scrape(task_prompt, max_results),
        stackoverflow_search(task_prompt, max_results),
        github_code_search(task_prompt, max_results),
    ):
        if isinstance(src, list):
            combined.extend(src)
    # Optional RAG context distilled from the retrieved documents.
    rag_index = build_rag_index(combined) if enable_rag else None
    rag_context = query_rag_index(rag_index, task_prompt) if rag_index else ""
    teamwork_prompt = f"""
You are AI agents: Coder, Debugger, Tester, Reviewer, Documenter.
Task: {task_prompt}
Context:
{rag_context}
Return only working Python code with comments.
"""
    llm_response = call_llm(teamwork_prompt)
    type_msg = type_check_code(llm_response)
    stdout, stderr = run_python_code(llm_response)
    bench_result = run_swe_bench(llm_response)
    if stderr:
        # One automatic repair round: give the model BOTH the code and error.
        debug_prompt = (
            f"The following code failed:\n{llm_response}\n"
            f"Error:\n{stderr}\nFix it and return working code."
        )
        llm_debug = call_llm(debug_prompt)
        stdout2, stderr2 = run_python_code(llm_debug)
        if stderr2:
            return f"### Failed after Debug\nError:\n{stderr2}\nCode:\n{llm_debug}"
        llm_response = llm_debug
        stdout = stdout2
    # Cache only code that executed cleanly, so memory replays are useful.
    mem_store(task_prompt, llm_response)
    return (
        f"### Type Check:\n{type_msg}\n\n"
        f"### Executed Output:\n{stdout}\n\n"
        f"### SWE-Bench Result:\n{bench_result}\n\n"
        f"### Final Code:\n{llm_response}"
    )
| # ----------------------------- | |
| # Gradio UI | |
| # ----------------------------- | |
def build_description() -> str:
    """Return the markdown blurb shown at the top of the Gradio UI."""
    header = "### AI Teamwork Runner — HF Space\n"
    body = (
        "Enter a development task. System retrieves docs, optionally runs RAG, "
        "generates Python code, executes it, auto‑debugs once, type‑checks, "
        "benchmarks, and stores memory."
    )
    return header + body
# `demo` must keep this name: Hugging Face Spaces looks it up at import time.
with gr.Blocks(title="AI Teamwork Runner") as demo:
    gr.Markdown(build_description())
    # Inputs: the task text plus two retrieval knobs.
    task_box = gr.Textbox(label="Task Prompt",
                          placeholder="Example: CSV merge utility", lines=3)
    run_button = gr.Button("Run AI Teamwork")
    rag_toggle = gr.Checkbox(label="Enable RAG", value=True)
    results_slider = gr.Slider(minimum=1, maximum=10, value=5, step=1,
                               label="Max results per connector")
    # Outputs: the combined report and an optional DOT task graph.
    report_box = gr.Textbox(label="Output + Code + Test Summary", lines=30)
    dot_box = gr.Textbox(label="Task Graph (DOT)", lines=10)
    run_button.click(fn=ai_teamwork_runner,
                     inputs=[task_box, rag_toggle, results_slider],
                     outputs=report_box)
    graph_button = gr.Button("Show Task Graph")
    graph_button.click(fn=visualize_task_graph, inputs=[task_box],
                       outputs=dot_box)

if __name__ == "__main__":
    demo.launch()