Spaces:
Sleeping
Sleeping
| import asyncio | |
| from typing import Coroutine, Any | |
| from asyncio import Semaphore, Queue | |
| from datetime import datetime | |
class QueueManager:
    """
    Async queue manager for handling concurrent pipeline executions.

    Concurrency is bounded by an ``asyncio.Semaphore``: at most
    ``max_concurrent`` coroutines submitted through :meth:`add_task` run
    at the same time; further callers wait until a slot is released.
    """

    # Upper bound, in seconds, on a single sleep while polling for a slot.
    _POLL_INTERVAL = 0.5

    def __init__(self, max_concurrent: int = 3):
        """
        Initialize queue manager.

        Args:
            max_concurrent: Maximum number of concurrent tasks
        """
        self.max_concurrent = max_concurrent
        self.semaphore = Semaphore(max_concurrent)
        # Created for callers; none of the methods in this class use it.
        self.queue: Queue = Queue()
        # Number of coroutines currently running inside add_task().
        self.active_tasks = 0
        # Number of coroutines that finished without raising.
        self.total_processed = 0

    async def add_task(self, coro: Coroutine) -> Any:
        """
        Run a coroutine once a concurrency slot is available.

        The call waits on the semaphore, awaits ``coro`` and returns its
        result. Exceptions raised by ``coro`` propagate to the caller.

        Args:
            coro: Coroutine to execute

        Returns:
            Result from the coroutine
        """
        async with self.semaphore:
            self.active_tasks += 1
            try:
                result = await coro
                # Only reached on success; failures are not counted.
                self.total_processed += 1
                return result
            finally:
                # Always release the bookkeeping slot, even on error/cancel.
                self.active_tasks -= 1

    def get_queue_status(self) -> dict:
        """
        Get current queue status.

        Returns:
            Dictionary with queue statistics: ``active_tasks``,
            ``max_concurrent``, ``total_processed``, ``available_slots``
            and ``timestamp`` (local time in ISO 8601 format).
        """
        return {
            "active_tasks": self.active_tasks,
            "max_concurrent": self.max_concurrent,
            "total_processed": self.total_processed,
            "available_slots": self.max_concurrent - self.active_tasks,
            "timestamp": datetime.now().isoformat(),
        }

    async def wait_for_slot(self, timeout: float = 60.0) -> bool:
        """
        Wait for an available slot in the queue.

        This polls ``active_tasks``; it does not reserve the slot, so
        another caller may take it before a subsequent :meth:`add_task`.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if slot became available, False if timeout
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while self.active_tasks >= self.max_concurrent:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return False
            # Never sleep past the deadline, so short timeouts are honoured
            # instead of being rounded up to a full poll interval.
            await asyncio.sleep(min(self._POLL_INTERVAL, remaining))
        return True