EastSync-AI / app.py
StanSava's picture
switch sdk from gradio to docker
617beaf unverified
from __future__ import annotations
import threading
from typing import Any
import os
from dotenv import load_dotenv
from UI import EastSyncInterface
from get_final_text import get_final_text
load_dotenv()
ui = EastSyncInterface()
current_thread: threading.Thread | None = None
is_run_cancelled = False
should_narrate_events = os.getenv("SHOULD_NARRATE_EVENTS", "False").lower() == "true"
try: # pragma: no cover - fallback when providers misconfigured
from agents.orchestrator_agent import OrchestratorAgent
except Exception as exc: # pylint: disable=broad-except
OrchestratorAgent = None # type: ignore
ORCHESTRATOR_IMPORT_ERROR = str(exc)
else:
ORCHESTRATOR_IMPORT_ERROR = None
try:
from agents.narrator_agent import NarratorAgent
except Exception as exc:
NarratorAgent = None
print(f"Narrator Agent unavailable: {exc}")
if OrchestratorAgent is not None:
try:
orchestrator_agent: OrchestratorAgent | None = OrchestratorAgent()
orchestrator_error: str | None = None
except Exception as exc: # pragma: no cover - best-effort graceful fallback
orchestrator_agent = None
orchestrator_error = str(exc)
else:
orchestrator_agent = None
orchestrator_error = ORCHESTRATOR_IMPORT_ERROR or "Provider unavailable"
if NarratorAgent:
narrator_agent = NarratorAgent()
else:
narrator_agent = None
def cancel_run():
global is_run_cancelled, current_thread
is_run_cancelled = True
if current_thread and current_thread.is_alive():
current_thread.join(timeout=1)
# Run the agent using a background task, so it can be cancelled
def analyze_and_plan_interface(user_prompt: str):
global current_thread, is_run_cancelled
# Reset cancel flag for new run
is_run_cancelled = False
if orchestrator_agent is None:
message = orchestrator_error or "Agent unavailable"
ui.register_agent_action("System Offline", {"reason": message, "prompt": user_prompt})
return ui.render_error_state(message)
# Cancel any existing run, if any
if current_thread and current_thread.is_alive() and current_thread != threading.current_thread():
is_run_cancelled = True
current_thread = threading.current_thread()
def run_agent():
try:
result: Any = orchestrator_agent.analyze_and_plan(
user_prompt,
ui.register_agent_action,
get_is_run_cancelled_flag=lambda: is_run_cancelled
)
if result is not None: # agent didn't get cancelled, keep running
ui.set_analysis_result(result)
# Queue the final corny summary to the narrator stream
if should_narrate_events and narrator_agent:
corny_summary = result.get('corny_summary', '')
final_text = get_final_text(corny_summary)
if corny_summary:
narrator_agent.queue_final_summary(final_text)
except Exception as exc:
ui.set_analysis_error(str(exc))
finally:
# Stop processing when agent thread completes
ui.stop_processing()
# Start a brand new thread for the new request
# Note: start_processing() was already called by the button handler
current_thread = threading.Thread(target=run_agent)
current_thread.start()
# Return processing state - timer will poll for updates
return ui.render_project_processing_state()
def start_audio_stream():
"""
Starts the background narrator thread and returns the audio generator.
"""
if should_narrate_events:
yield from narrator_agent.narrate_event_streaming(ui)
def main():
demo = ui.build_interface(analyze_and_plan_interface, cancel_run, start_audio_stream)
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)
if __name__ == "__main__":
main()