import asyncio from typing import Coroutine, Any from asyncio import Semaphore, Queue from datetime import datetime class QueueManager: """ Async queue manager for handling concurrent pipeline executions """ def __init__(self, max_concurrent: int = 3): """ Initialize queue manager Args: max_concurrent: Maximum number of concurrent tasks """ self.max_concurrent = max_concurrent self.semaphore = Semaphore(max_concurrent) self.queue: Queue = Queue() self.active_tasks = 0 self.total_processed = 0 async def add_task(self, coro: Coroutine) -> Any: """ Add a task to the queue and execute it Args: coro: Coroutine to execute Returns: Result from the coroutine """ async with self.semaphore: self.active_tasks += 1 try: result = await coro self.total_processed += 1 return result finally: self.active_tasks -= 1 def get_queue_status(self) -> dict: """ Get current queue status Returns: Dictionary with queue statistics """ return { "active_tasks": self.active_tasks, "max_concurrent": self.max_concurrent, "total_processed": self.total_processed, "available_slots": self.max_concurrent - self.active_tasks, "timestamp": datetime.now().isoformat() } async def wait_for_slot(self, timeout: float = 60.0) -> bool: """ Wait for an available slot in the queue Args: timeout: Maximum time to wait in seconds Returns: True if slot became available, False if timeout """ start_time = asyncio.get_event_loop().time() while self.active_tasks >= self.max_concurrent: if asyncio.get_event_loop().time() - start_time > timeout: return False await asyncio.sleep(0.5) return True