import time from collections import defaultdict, deque from threading import Lock from typing import Dict class RateLimiter: """ Thread-safe rate limiter for API requests """ def __init__(self, max_requests_per_minute: int = 10): """ Initialize rate limiter Args: max_requests_per_minute: Maximum requests allowed per minute """ self.max_requests = max_requests_per_minute self.window_seconds = 60 self.requests: Dict[str, deque] = defaultdict(deque) self.lock = Lock() def _clean_old_requests(self, client_id: str): """Remove requests older than the time window""" current_time = time.time() cutoff_time = current_time - self.window_seconds while self.requests[client_id] and self.requests[client_id][0] < cutoff_time: self.requests[client_id].popleft() def allow_request(self, client_id: str = "default") -> bool: """ Check if a request is allowed Args: client_id: Identifier for the client (e.g., IP address) Returns: True if request is allowed, False otherwise """ with self.lock: self._clean_old_requests(client_id) if len(self.requests[client_id]) >= self.max_requests: return False self.requests[client_id].append(time.time()) return True def get_remaining_requests(self, client_id: str = "default") -> int: """ Get number of remaining requests in current window Args: client_id: Identifier for the client Returns: Number of remaining requests """ with self.lock: self._clean_old_requests(client_id) return max(0, self.max_requests - len(self.requests[client_id])) def get_reset_time(self, client_id: str = "default") -> float: """ Get time until rate limit resets Args: client_id: Identifier for the client Returns: Seconds until oldest request expires """ with self.lock: self._clean_old_requests(client_id) if not self.requests[client_id]: return 0 oldest_request = self.requests[client_id][0] current_time = time.time() reset_time = oldest_request + self.window_seconds return max(0, reset_time - current_time)