File size: 2,629 Bytes
f2200ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import time
from collections import defaultdict, deque
from threading import Lock
from typing import Dict

class RateLimiter:
    """
    Thread-safe rate limiter for API requests
    """
    
    def __init__(self, max_requests_per_minute: int = 10):
        """
        Initialize rate limiter
        
        Args:
            max_requests_per_minute: Maximum requests allowed per minute
        """
        self.max_requests = max_requests_per_minute
        self.window_seconds = 60
        self.requests: Dict[str, deque] = defaultdict(deque)
        self.lock = Lock()
    
    def _clean_old_requests(self, client_id: str):
        """Remove requests older than the time window"""
        current_time = time.time()
        cutoff_time = current_time - self.window_seconds
        
        while self.requests[client_id] and self.requests[client_id][0] < cutoff_time:
            self.requests[client_id].popleft()
    
    def allow_request(self, client_id: str = "default") -> bool:
        """
        Check if a request is allowed
        
        Args:
            client_id: Identifier for the client (e.g., IP address)
        
        Returns:
            True if request is allowed, False otherwise
        """
        with self.lock:
            self._clean_old_requests(client_id)
            
            if len(self.requests[client_id]) >= self.max_requests:
                return False
            
            self.requests[client_id].append(time.time())
            return True
    
    def get_remaining_requests(self, client_id: str = "default") -> int:
        """
        Get number of remaining requests in current window
        
        Args:
            client_id: Identifier for the client
        
        Returns:
            Number of remaining requests
        """
        with self.lock:
            self._clean_old_requests(client_id)
            return max(0, self.max_requests - len(self.requests[client_id]))
    
    def get_reset_time(self, client_id: str = "default") -> float:
        """
        Get time until rate limit resets
        
        Args:
            client_id: Identifier for the client
        
        Returns:
            Seconds until oldest request expires
        """
        with self.lock:
            self._clean_old_requests(client_id)
            
            if not self.requests[client_id]:
                return 0
            
            oldest_request = self.requests[client_id][0]
            current_time = time.time()
            reset_time = oldest_request + self.window_seconds
            
            return max(0, reset_time - current_time)