import html import json import os import re from glob import glob from typing import Any, Dict, List import gradio as gr # Optional LaTeX → MathML conversion (install: pip install latex2mathml) try: from latex2mathml.converter import convert as _latex_to_mathml except Exception: # pragma: no cover _latex_to_mathml = None # Optional Markdown renderer (install: pip install markdown) try: import markdown as _markdown except Exception: # pragma: no cover _markdown = None # Precompile math pattern once to avoid recompilation on every render # Captures four math forms with inner content groups preserved: # 1) $$ ... $$ → group2 # 2) $ ... $ → group4 # 3) \[ ... \] → group6 # 4) \( ... \) → group8 MATH_PATTERN = re.compile( r"(\$\$(.*?)\$\$)" # $$block$$ r"|(\$([^\$\n]+?)\$)" # $inline$ r"|(\\\[(.*?)\\\])" # \[block\] r"|(\\\(([^\n]+?)\\\))", # \(inline\) re.DOTALL, ) def format_step_content(content: str) -> str: """Render content to HTML with Markdown and LaTeX support. If the optional `markdown` package is available, we tokenize LaTeX spans, render Markdown so headings/lists/emphasis work, then substitute tokens with MathML (via latex2mathml). Otherwise, we fall back to escaping with inline MathML conversion. Args: content: Raw text content possibly containing LaTeX snippets. Returns: Safe HTML string for display. """ if not content: return "" # Use precompiled regex to capture block and inline math (see MATH_PATTERN above) if _markdown is not None: # Tokenize math, render Markdown, then substitute tokens with MathML parts: list[str] = [] token_to_html: dict[str, str] = {} last_idx = 0 token_index = 0 for m in MATH_PATTERN.finditer(content): # Non-math prefix: normalize light TeX-ish helpers for MD prefix = content[last_idx : m.start()] if prefix: prefix = re.sub(r"\\text\{([^}]*)\}", r"\1", prefix) prefix = re.sub(r"\\emph\{([^}]*)\}", r"*\1*", prefix) prefix = re.sub(r"\\Bbb\{([^}]*)\}", r"\\mathbb{\1}", prefix) parts.append(prefix) # Extract LaTeX latex_src = None display = False if m.group(2) is not None: latex_src = m.group(2) display = True elif m.group(4) is not None: latex_src = m.group(4) display = False elif m.group(6) is not None: latex_src = m.group(6) display = True elif m.group(8) is not None: latex_src = m.group(8) display = False token = f"[[[MATH_TOKEN_{token_index}]]]" token_index += 1 if latex_src is None: token_to_html[token] = html.escape(m.group(0)).replace("\n", "
") else: try: if _latex_to_mathml is not None: mathml = _latex_to_mathml(latex_src) if display and mathml.startswith("") except Exception: token_to_html[token] = html.escape(m.group(0)).replace("\n", "
") parts.append(token) last_idx = m.end() # Trailing non-math tail = content[last_idx:] if tail: tail = re.sub(r"\\text\{([^}]*)\}", r"\1", tail) tail = re.sub(r"\\emph\{([^}]*)\}", r"*\1*", tail) tail = re.sub(r"\\Bbb\{([^}]*)\}", r"\\mathbb{\1}", tail) parts.append(tail) text_with_tokens = "".join(parts) try: html_out = _markdown.markdown(text_with_tokens, extensions=["extra", "sane_lists", "nl2br"]) except Exception: html_out = html.escape(text_with_tokens).replace("\n", "
") for token, token_html in token_to_html.items(): html_out = html_out.replace(token, token_html) return html_out # Fallback: previous approach (safe HTML escaping + optional MathML) text = content rendered_parts: list[str] = [] last_idx = 0 for m in MATH_PATTERN.finditer(text): # Add preceding non-math segment (escaped, with mild TeX tweaks) prefix = text[last_idx : m.start()] if prefix: # In non-math, normalize a few TeX-ish helpers prefix = re.sub(r"\\text\{([^}]*)\}", r"\1", prefix) prefix = re.sub(r"\\emph\{([^}]*)\}", r"\1", prefix) prefix = re.sub(r"\\Bbb\{([^}]*)\}", r"\\mathbb{\1}", prefix) rendered_parts.append(html.escape(prefix).replace("\n", "
")) # Determine which group matched and extract LaTeX latex_src = None display = False if m.group(2) is not None: # $$ ... $$ latex_src = m.group(2) display = True elif m.group(4) is not None: # $ ... $ latex_src = m.group(4) display = False elif m.group(6) is not None: # \[ ... \] latex_src = m.group(6) display = True elif m.group(8) is not None: # \( ... \) latex_src = m.group(8) display = False if latex_src is None: # Should not happen; just append raw match safely rendered_parts.append(html.escape(m.group(0)).replace("\n", "
")) else: try: mathml = _latex_to_mathml(latex_src) # Ensure block math displays as block if display and mathml.startswith("") rendered_parts.append(fallback) last_idx = m.end() # Trailing non-math segment tail = text[last_idx:] if tail: tail = re.sub(r"\\text\{([^}]*)\}", r"\1", tail) tail = re.sub(r"\\emph\{([^}]*)\}", r"\1", tail) tail = re.sub(r"\\Bbb\{([^}]*)\}", r"\\mathbb{\1}", tail) rendered_parts.append(html.escape(tail).replace("\n", "
")) return "".join(rendered_parts) def create_step_html(step: Dict[str, Any], step_num: int, border_color: str) -> str: """Create HTML for a single step (no token counting). Args: step: The step dictionary to render. step_num: 1-based step index. border_color: CSS color for the step border. Returns: HTML string for the step. """ html_out = f"""
Step {step_num}
""" observation_text = str(step.get("observation", "")) if step.get("observation") else "None" html_out += f"""
Observation
{format_step_content(observation_text)}
""" thought_text = step.get("thought", "") if step.get("thought") else "None" html_out += f"""
Thought
{format_step_content(thought_text)}
""" action_text = "" action = step.get("action") if action: if isinstance(action, dict) and "action" in action: action_text = action["action"] else: action_text = str(action) html_out += f"""
Action:
{format_step_content(action_text)}
""" reward = step.get("reward", 0.0) html_out += f"""
Reward: {reward}
""" html_out += "
" return html_out def get_agent_color(agent_name: str) -> tuple[str, str]: """Assign a stable color pair to an agent name. Args: agent_name: Agent identifier string. Returns: Tuple of (background_color, border_color). """ # Simple stable hash -> hue hue = (abs(hash(agent_name)) % 360) bg_color = f"hsl({hue}, 40%, 92%)" border_color = f"hsl({hue}, 60%, 40%)" return bg_color, border_color def create_trajectory_html(agent_name: str, trajectory: Dict[str, Any]) -> str: """Create HTML for a trajectory (no tokenization or chat rendering).""" bg_color, border_color = get_agent_color(agent_name) agent_display = agent_name html_out = f"""
{agent_display}
""" steps = trajectory.get("steps", []) for i, step in enumerate(steps): html_out += create_step_html(step, i + 1, border_color) html_out += "
" return html_out def create_episode_view(episode: Dict[str, Any]) -> str: """Create HTML view for an episode (lightweight). Args: episode: Episode dictionary to render. Returns: HTML string. """ episode_id = episode.get("id", "") termination_reason = episode.get("termination_reason", "") html_out = f"""

Episode: {episode_id}

Termination: {termination_reason}
""" task = episode.get("task", {}) if task and "question" in task: html_out += f"""

Problem Statement

{format_step_content(task["question"])}
""" html_out += """

Trajectories

""" trajectories = episode.get("trajectories", []) for agent_name, trajectory in trajectories: html_out += create_trajectory_html(agent_name, trajectory) html_out += "
" return html_out def list_runs(data_dir: str) -> list[str]: """List subdirectories inside the data directory. Args: data_dir: Path to the parent data directory. Returns: Sorted list of subdirectory names (runs). """ if not data_dir or not os.path.isdir(data_dir): return [] entries = [name for name in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, name))] return sorted(entries) def list_episode_files(run_dir: str) -> list[str]: """List episode JSON files inside a run directory. Args: run_dir: Absolute path to a run directory. Returns: Sorted list of file basenames. """ if not run_dir or not os.path.isdir(run_dir): return [] files = sorted(glob(os.path.join(run_dir, "*.json"))) return [os.path.basename(p) for p in files] def load_episode_from_file(file_path: str) -> Dict[str, Any]: """Load a single-episode JSON file into a dictionary. Args: file_path: Absolute path to JSON file containing one episode dict. Returns: Episode dictionary. """ with open(file_path) as f: data = json.load(f) # If file accidentally contains a list, take first element if isinstance(data, list): data = data[0] if data else {} return data def create_gradio_interface(data_dir: str): """Create the lightweight viewer with two dropdowns. Args: data_dir: Parent directory that contains run subdirectories. Returns: Gradio Blocks app. """ runs = list_runs(data_dir) custom_css = """ /* ─── force global light theme & readable text ─────────────── */ :root, html, body, #root, .gradio-container{ background:#ffffff !important; color-scheme:light; color:#111 !important; } .gradio-container{ --body-background-fill:#ffffff; --background-fill-primary:#ffffff; --background-fill-secondary:#ffffff; --block-background-fill:#ffffff; --panel-background-fill:#ffffff; } /* ─── normalize MathML text color to match content ─────────── */ math, math *{ color:#333 !important; fill:#333 !important; stroke:#333 !important; } """ with gr.Blocks(title="Simple Episode Viewer", css=custom_css) as demo: with gr.Group(): with gr.Row(): run_dropdown = gr.Dropdown(choices=runs, value=None, label="Run (subdirectory)") episode_dropdown = gr.Dropdown(choices=[], value=None, label="Episode (file)") display_area = gr.HTML(label="Episode View", value="

Select a run and episode.

") def update_runs(dir_value: str): available_runs = list_runs(dir_value) # Start with no run selected by default return ( gr.Dropdown(choices=available_runs, value=None), gr.Dropdown(choices=[], value=None), "

Select a run and episode.

", ) def on_run_change(selected_run: str): run_path = os.path.join(data_dir, selected_run) if selected_run else None episodes = list_episode_files(run_path) if run_path else [] return gr.Dropdown(choices=episodes, value=(episodes[0] if episodes else None)), "

Select an episode.

" def on_episode_change(selected_episode: str, selected_run: str): if not selected_episode or not selected_run: return "

Select a run and episode.

" file_path = os.path.join(data_dir, selected_run, selected_episode) try: episode = load_episode_from_file(file_path) return create_episode_view(episode) except Exception as e: return f"

Error loading episode: {html.escape(str(e))}

" # Wire events run_dropdown.change(on_run_change, inputs=[run_dropdown], outputs=[episode_dropdown, display_area]) episode_dropdown.change(on_episode_change, inputs=[episode_dropdown, run_dropdown], outputs=[display_area]) return demo if __name__ == "__main__": demo = create_gradio_interface("./data") demo.launch()