Spaces:
Sleeping
Sleeping
| import html | |
| import json | |
| import os | |
| import re | |
| from glob import glob | |
| from typing import Any, Dict, List | |
| import gradio as gr | |
| # Optional LaTeX → MathML conversion (install: pip install latex2mathml) | |
| try: | |
| from latex2mathml.converter import convert as _latex_to_mathml | |
| except Exception: # pragma: no cover | |
| _latex_to_mathml = None | |
| # Optional Markdown renderer (install: pip install markdown) | |
| try: | |
| import markdown as _markdown | |
| except Exception: # pragma: no cover | |
| _markdown = None | |
# Precompile math pattern once to avoid recompilation on every render
# Captures four math forms with inner content groups preserved:
# 1) $$ ... $$ → group2
# 2) $ ... $ → group4
# 3) \[ ... \] → group6
# 4) \( ... \) → group8
# Odd-numbered groups hold the whole delimited span; the even-numbered group
# nested inside each one holds only the LaTeX between the delimiters.
# re.DOTALL lets the two block forms (".*?") span several lines, while both
# inline forms exclude "\n" in their character class and so stay on one line.
MATH_PATTERN = re.compile(
    r"(\$\$(.*?)\$\$)"  # $$block$$
    r"|(\$([^\$\n]+?)\$)"  # $inline$
    r"|(\\\[(.*?)\\\])"  # \[block\]
    r"|(\\\(([^\n]+?)\\\))",  # \(inline\)
    re.DOTALL,
)
def _normalize_tex_helpers(text: str, emph_replacement: str) -> str:
    r"""Rewrite the TeX-ish helpers that may appear outside math spans.

    ``\text{x}`` is unwrapped to ``x``, ``\emph{x}`` is rewritten with
    ``emph_replacement`` (a ``re.sub`` template that may use ``\1``) and
    ``\Bbb{x}`` becomes ``\mathbb{x}``. The order is significant for nested
    helpers, so it is kept as text -> emph -> Bbb.
    """
    text = re.sub(r"\\text\{([^}]*)\}", r"\1", text)
    text = re.sub(r"\\emph\{([^}]*)\}", emph_replacement, text)
    return re.sub(r"\\Bbb\{([^}]*)\}", r"\\mathbb{\1}", text)


def _plain_segment_to_html(segment: str) -> str:
    """Escape a non-math segment for the no-Markdown fallback path."""
    # Escape FIRST, then insert the <em> tags. Escaping afterwards would turn
    # the tags themselves into visible "&lt;em&gt;" text.
    escaped = html.escape(segment)
    return _normalize_tex_helpers(escaped, r"<em>\1</em>").replace("\n", "<br>")


def _extract_latex(match: "re.Match") -> tuple:
    """Return ``(latex_source, is_display)`` for a MATH_PATTERN match.

    Groups 2 and 6 are the block forms (``$$..$$`` and ``\\[..\\]``); groups 4
    and 8 are the inline forms. ``(None, False)`` means no inner group matched.
    """
    for group_index, is_display in ((2, True), (4, False), (6, True), (8, False)):
        latex_src = match.group(group_index)
        if latex_src is not None:
            return latex_src, is_display
    return None, False


def _force_block_display(mathml: str) -> str:
    """Make the root ``<math>`` element carry ``display="block"``.

    An existing ``display`` attribute on the opening tag is overwritten rather
    than trusted: the converter is called without a display argument, so any
    value it emits is its own default, not the author's intent.
    NOTE(review): recent latex2mathml releases appear to emit
    ``display="inline"`` by default - confirm against the installed version.
    """
    if not mathml.startswith("<math"):
        return mathml
    opening, closer, remainder = mathml.partition(">")
    if " display=" in opening:
        opening = re.sub(
            r'\sdisplay=(?:"[^"]*"|\'[^\']*\')', ' display="block"', opening, count=1
        )
    else:
        opening = opening.replace("<math", '<math display="block"', 1)
    return opening + closer + remainder


def _render_math_match(match: "re.Match") -> str:
    """Convert one math span to MathML, or to its escaped literal source."""
    literal = html.escape(match.group(0)).replace("\n", "<br>")
    latex_src, is_display = _extract_latex(match)
    # No converter installed (or nothing captured): show the LaTeX verbatim.
    if latex_src is None or _latex_to_mathml is None:
        return literal
    try:
        mathml = _latex_to_mathml(latex_src)
        return _force_block_display(mathml) if is_display else mathml
    except Exception:
        # Deliberately broad: the third-party converter can fail in many ways
        # on malformed LaTeX, and a literal fallback beats a broken page.
        return literal


def format_step_content(content: str) -> str:
    """Render content to HTML with Markdown and LaTeX support.

    If the optional `markdown` package is available, LaTeX spans are replaced
    by placeholder tokens, the remaining text is rendered as Markdown so
    headings/lists/emphasis work, and the tokens are then substituted with
    MathML (via latex2mathml). Otherwise the non-math text is HTML-escaped
    and math spans are converted inline.

    NOTE(review): in the Markdown path the non-math text is handed to the
    Markdown renderer unescaped; Python-Markdown is understood to pass raw
    HTML through, so that path should only be fed trusted content.

    Args:
        content: Raw text content possibly containing LaTeX snippets.

    Returns:
        HTML string for display; "" for empty or missing content.
    """
    if not content:
        return ""

    if _markdown is None:
        # Fallback: escaped text interleaved with rendered math spans.
        pieces: list[str] = []
        cursor = 0
        for match in MATH_PATTERN.finditer(content):
            pieces.append(_plain_segment_to_html(content[cursor : match.start()]))
            pieces.append(_render_math_match(match))
            cursor = match.end()
        pieces.append(_plain_segment_to_html(content[cursor:]))
        return "".join(pieces)

    # Markdown path: hide math behind tokens so Markdown cannot mangle it.
    parts: list[str] = []
    rendered_math: list[str] = []
    cursor = 0
    for match in MATH_PATTERN.finditer(content):
        parts.append(_normalize_tex_helpers(content[cursor : match.start()], r"*\1*"))
        # Purely alphanumeric token: a bracketed token is itself Markdown link
        # syntax, so "$f$(x)" could be consumed as a link and the math lost.
        # The "END" suffix keeps token 1 from being a prefix of token 10.
        parts.append(f"MATHTOKEN{len(rendered_math)}END")
        rendered_math.append(_render_math_match(match))
        cursor = match.end()
    parts.append(_normalize_tex_helpers(content[cursor:], r"*\1*"))
    text_with_tokens = "".join(parts)

    try:
        html_out = _markdown.markdown(
            text_with_tokens, extensions=["extra", "sane_lists", "nl2br"]
        )
    except Exception:
        # Rendering is best-effort; fall back to plain escaped text.
        html_out = html.escape(text_with_tokens).replace("\n", "<br>")

    def restore(token_match: "re.Match") -> str:
        index = int(token_match.group(1))
        # Leave token-looking text that we did not generate untouched.
        if index < len(rendered_math):
            return rendered_math[index]
        return token_match.group(0)

    # Single pass over the output instead of one str.replace per token.
    return re.sub(r"MATHTOKEN(\d+)END", restore, html_out)
def create_step_html(step: Dict[str, Any], step_num: int, border_color: str) -> str:
    """Create HTML for a single step (no token counting).

    Every field is coerced to text before rendering, so non-string payloads
    (dicts, lists, numbers) are displayed rather than breaking the math scan
    inside ``format_step_content``.

    Args:
        step: The step dictionary to render. Reads the "observation",
            "thought", "action" and "reward" keys; all are optional.
        step_num: 1-based step index.
        border_color: CSS color for the step border.

    Returns:
        HTML string for the step.
    """
    observation = step.get("observation")
    thought = step.get("thought")
    # Falsy values (missing, None, "", 0) are shown as the literal text "None".
    collapsible_sections = (
        ("Observation", str(observation) if observation else "None"),
        ("Thought", str(thought) if thought else "None"),
    )

    parts = [
        f"""
    <details style="margin: 8px 0;">
        <summary style="cursor: pointer; color: {border_color}; font-weight: bold;">
            Step {step_num}
        </summary>
        <div style="
            background-color: white;
            border-radius: 6px;
            padding: 12px;
            margin: 8px 0;
            border-left: 4px solid {border_color};
        ">
    """
    ]

    for title, text in collapsible_sections:
        parts.append(
            f"""
        <details style="margin: 8px 0;">
            <summary style="cursor: pointer; color: #666; font-size: 0.9em;">{title}</summary>
            <div style="background-color: #f8f9fa; padding: 8px; border-radius: 4px; margin-top: 4px; color: #333;">
                {format_step_content(text)}
            </div>
        </details>
    """
        )

    action = step.get("action")
    if action:
        # An action is either a dict wrapping the text under "action" or a
        # bare value that is stringified as a whole.
        if isinstance(action, dict) and "action" in action:
            inner = action["action"]
            action_text = "" if inner is None else str(inner)
        else:
            action_text = str(action)
        parts.append(
            f"""
        <div style="margin: 8px 0;">
            <strong style="color: #333;">Action:</strong>
            <div style="background-color: #f8f9fa; padding: 8px; border-radius: 4px; margin-top: 4px; color: #333;">
                {format_step_content(action_text)}
            </div>
        </div>
    """
        )

    # The reward is normally numeric but comes from the episode file, so it is
    # escaped like any other value placed into markup.
    reward = html.escape(str(step.get("reward", 0.0)))
    parts.append(
        f"""
        <div style="margin: 8px 0;">
            <strong style="color: #333;">Reward:</strong> <span style="color: #333;">{reward}</span>
        </div>
    """
    )
    parts.append("</div></details>")
    return "".join(parts)
def get_agent_color(agent_name: str) -> tuple[str, str]:
    """Assign a stable color pair to an agent name.

    The hue is derived from a CRC-32 of the name. The built-in ``hash()`` is
    salted per interpreter process for strings, so it would hand the same
    agent a different color on every restart.

    Args:
        agent_name: Agent identifier string.

    Returns:
        Tuple of (background_color, border_color) as CSS ``hsl()`` strings.
    """
    import zlib  # local import: not among the module-level imports

    # str() keeps non-string identifiers working, as hash() accepted them.
    hue = zlib.crc32(str(agent_name).encode("utf-8")) % 360
    bg_color = f"hsl({hue}, 40%, 92%)"
    border_color = f"hsl({hue}, 60%, 40%)"
    return bg_color, border_color
def create_trajectory_html(agent_name: str, trajectory: Dict[str, Any]) -> str:
    """Create HTML for a trajectory (no tokenization or chat rendering).

    Args:
        agent_name: Agent identifier; selects the color pair and is shown
            (HTML-escaped) as the card title.
        trajectory: Trajectory dictionary; its optional "steps" list is
            rendered in order with 1-based step numbers.

    Returns:
        HTML string for the trajectory card.
    """
    bg_color, border_color = get_agent_color(agent_name)
    # The name comes from the episode file, so escape it before embedding.
    agent_display = html.escape(str(agent_name))

    header = f"""
    <div style="
        background-color: {bg_color};
        border-radius: 8px;
        padding: 16px;
        margin: 10px 0;
        border-left: 4px solid {border_color};
    ">
        <div style="font-weight: bold; color: {border_color}; margin-bottom: 10px;">{agent_display}</div>
    """

    # "or []" also covers an explicit null under "steps".
    steps = trajectory.get("steps") or []
    step_blocks = [
        create_step_html(step, step_num, border_color)
        for step_num, step in enumerate(steps, start=1)
    ]
    return header + "".join(step_blocks) + "</div>"
def create_episode_view(episode: Dict[str, Any]) -> str:
    """Create HTML view for an episode (lightweight).

    Args:
        episode: Episode dictionary to render. Reads the optional keys "id",
            "termination_reason", "task" (a dict with a "question") and
            "trajectories" (either a list of ``[agent_name, trajectory]``
            pairs or a mapping from agent name to trajectory).

    Returns:
        HTML string.
    """
    # Values from the episode file are escaped before being placed in markup.
    episode_id = html.escape(str(episode.get("id", "")))
    termination_reason = html.escape(str(episode.get("termination_reason", "")))

    parts = [
        f"""
    <div style="max-width: 1200px; margin: 0 auto; font-family: Arial, sans-serif;">
        <div style="
            background-color: #f8f9fa;
            border-radius: 8px;
            padding: 20px;
            margin-bottom: 20px;
            border-left: 4px solid #007bff;
        ">
            <h2 style="margin: 0 0 12px 0; color: #333;">
                Episode: {episode_id}
            </h2>
            <div style="color:#333; margin-bottom: 16px;">
                <strong>Termination:</strong> <span>{termination_reason}</span>
            </div>
        </div>
    """
    ]

    task = episode.get("task")
    # Only a dict can carry a question; any other "task" payload is skipped.
    if isinstance(task, dict) and "question" in task:
        question = task["question"]
        question_text = "" if question is None else str(question)
        parts.append(
            f"""
        <div style="
            background-color: #e3f2fd;
            border-radius: 8px;
            padding: 16px;
            margin-bottom: 20px;
            border-left: 4px solid #1976d2;
        ">
            <h3 style="margin: 0 0 12px 0; color: #1976d2;">Problem Statement</h3>
            <div style="background-color: white; padding: 12px; border-radius: 4px; color: #333;">
                {format_step_content(question_text)}
            </div>
        </div>
    """
        )

    parts.append(
        """
    <div style="
        border: 2px solid #ddd;
        border-radius: 8px;
        padding: 20px;
        background-color: white;
    ">
        <h3 style="margin: 0 0 16px 0; color: #333;">Trajectories</h3>
    """
    )

    trajectories = episode.get("trajectories") or []
    # Iterating a dict yields bare keys, which cannot be unpacked into
    # (name, trajectory); go through .items() for the mapping form.
    if isinstance(trajectories, dict):
        trajectory_pairs = trajectories.items()
    else:
        trajectory_pairs = trajectories
    for agent_name, trajectory in trajectory_pairs:
        parts.append(create_trajectory_html(agent_name, trajectory))

    parts.append("</div></div>")
    return "".join(parts)
def list_runs(data_dir: str) -> list[str]:
    """Return the names of the run subdirectories found in ``data_dir``.

    Args:
        data_dir: Path to the parent data directory.

    Returns:
        Alphabetically sorted subdirectory names; an empty list when
        ``data_dir`` is empty or is not an existing directory.
    """
    if not (data_dir and os.path.isdir(data_dir)):
        return []

    run_names = []
    for candidate in os.listdir(data_dir):
        # Plain files sitting next to the run folders are ignored.
        if os.path.isdir(os.path.join(data_dir, candidate)):
            run_names.append(candidate)
    run_names.sort()
    return run_names
def list_episode_files(run_dir: str) -> list[str]:
    """List episode JSON files inside a run directory.

    Args:
        run_dir: Absolute path to a run directory.

    Returns:
        Sorted list of file basenames matching ``*.json``; an empty list
        when ``run_dir`` is empty or is not an existing directory.
    """
    from glob import escape as glob_escape  # module level only imports glob.glob

    if not run_dir or not os.path.isdir(run_dir):
        return []
    # Escape the directory part: a run named e.g. "run[1]" would otherwise be
    # read as a character class and match nothing.
    pattern = os.path.join(glob_escape(run_dir), "*.json")
    return [os.path.basename(path) for path in sorted(glob(pattern))]
def load_episode_from_file(file_path: str) -> Dict[str, Any]:
    """Load a single-episode JSON file into a dictionary.

    Args:
        file_path: Absolute path to JSON file containing one episode dict.

    Returns:
        Episode dictionary. If the file holds a list, its first element is
        used; an empty list yields an empty dict.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the content is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError``) or does not resolve to a JSON object.
    """
    # Read as UTF-8 explicitly; the platform default encoding may differ and
    # would garble or reject non-ASCII text.
    with open(file_path, encoding="utf-8") as f:
        data = json.load(f)
    # If file accidentally contains a list, take first element
    if isinstance(data, list):
        data = data[0] if data else {}
    if not isinstance(data, dict):
        # Fail here with a clear message instead of on a later .get() call.
        raise ValueError(
            f"Expected a JSON object for the episode, got {type(data).__name__}"
        )
    return data
def create_gradio_interface(data_dir: str):
    """Create the lightweight viewer with two dropdowns.

    The first dropdown selects a run (a subdirectory of ``data_dir``), the
    second an episode file inside that run. The selected episode is rendered
    into an HTML panel.

    Args:
        data_dir: Parent directory that contains run subdirectories.

    Returns:
        Gradio Blocks app.
    """
    runs = list_runs(data_dir)
    custom_css = """
    /* βββ force global light theme & readable text βββββββββββββββ */
    :root, html, body, #root, .gradio-container{
        background:#ffffff !important;
        color-scheme:light;
        color:#111 !important;
    }
    .gradio-container{
        --body-background-fill:#ffffff;
        --background-fill-primary:#ffffff;
        --background-fill-secondary:#ffffff;
        --block-background-fill:#ffffff;
        --panel-background-fill:#ffffff;
    }
    /* βββ normalize MathML text color to match content βββββββββββ */
    math, math *{
        color:#333 !important;
        fill:#333 !important;
        stroke:#333 !important;
    }
    """

    def render_episode(selected_run: str, selected_episode: str) -> str:
        """Render one episode file, or a short HTML message on failure."""
        if not selected_episode or not selected_run:
            return "<p>Select a run and episode.</p>"
        file_path = os.path.join(data_dir, selected_run, selected_episode)
        try:
            episode = load_episode_from_file(file_path)
            return create_episode_view(episode)
        except Exception as e:
            # UI boundary: report the problem in the panel rather than
            # surfacing an exception to the Gradio event loop.
            return f"<p>Error loading episode: {html.escape(str(e))}</p>"

    def on_run_change(selected_run: str):
        """Repopulate the episode dropdown and show the run's first episode."""
        run_path = os.path.join(data_dir, selected_run) if selected_run else None
        episodes = list_episode_files(run_path) if run_path else []
        if not episodes:
            return gr.Dropdown(choices=[], value=None), "<p>Select an episode.</p>"
        first_episode = episodes[0]
        # Render the preselected episode here instead of relying on the episode
        # dropdown's change event: when the new run's first file has the same
        # name as the previous selection the value does not change, so that
        # event presumably never fires and the panel would keep a placeholder.
        # NOTE(review): confirm against the Gradio version in use.
        return (
            gr.Dropdown(choices=episodes, value=first_episode),
            render_episode(selected_run, first_episode),
        )

    def on_episode_change(selected_episode: str, selected_run: str):
        """Render the episode picked in the second dropdown."""
        return render_episode(selected_run, selected_episode)

    with gr.Blocks(title="Simple Episode Viewer", css=custom_css) as demo:
        with gr.Group():
            with gr.Row():
                run_dropdown = gr.Dropdown(choices=runs, value=None, label="Run (subdirectory)")
                episode_dropdown = gr.Dropdown(choices=[], value=None, label="Episode (file)")
        display_area = gr.HTML(label="Episode View", value="<p>Select a run and episode.</p>")

        # Wire events
        run_dropdown.change(on_run_change, inputs=[run_dropdown], outputs=[episode_dropdown, display_area])
        episode_dropdown.change(on_episode_change, inputs=[episode_dropdown, run_dropdown], outputs=[display_area])
    return demo
# Script entry point: build the viewer over the local "./data" directory and
# start the Gradio server. Nothing is launched when the module is imported.
if __name__ == "__main__":
    demo = create_gradio_interface("./data")
    demo.launch()