MetaSearch / utils /queue_manager.py
Tirath5504's picture
Initial upload
f2200ab verified
import asyncio
from typing import Coroutine, Any
from asyncio import Semaphore, Queue
from datetime import datetime
class QueueManager:
"""
Async queue manager for handling concurrent pipeline executions
"""
def __init__(self, max_concurrent: int = 3):
"""
Initialize queue manager
Args:
max_concurrent: Maximum number of concurrent tasks
"""
self.max_concurrent = max_concurrent
self.semaphore = Semaphore(max_concurrent)
self.queue: Queue = Queue()
self.active_tasks = 0
self.total_processed = 0
async def add_task(self, coro: Coroutine) -> Any:
"""
Add a task to the queue and execute it
Args:
coro: Coroutine to execute
Returns:
Result from the coroutine
"""
async with self.semaphore:
self.active_tasks += 1
try:
result = await coro
self.total_processed += 1
return result
finally:
self.active_tasks -= 1
def get_queue_status(self) -> dict:
"""
Get current queue status
Returns:
Dictionary with queue statistics
"""
return {
"active_tasks": self.active_tasks,
"max_concurrent": self.max_concurrent,
"total_processed": self.total_processed,
"available_slots": self.max_concurrent - self.active_tasks,
"timestamp": datetime.now().isoformat()
}
async def wait_for_slot(self, timeout: float = 60.0) -> bool:
"""
Wait for an available slot in the queue
Args:
timeout: Maximum time to wait in seconds
Returns:
True if slot became available, False if timeout
"""
start_time = asyncio.get_event_loop().time()
while self.active_tasks >= self.max_concurrent:
if asyncio.get_event_loop().time() - start_time > timeout:
return False
await asyncio.sleep(0.5)
return True