Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| from elasticsearch import Elasticsearch, ConnectionError, AuthenticationException | |
| # Configure logging at the application level | |
| logger = logging.getLogger(__name__) | |
| logging.basicConfig(level=logging.INFO) | |
| # Load environment variables | |
| ES_CLIENT_URL = os.getenv("ELASTICSEARCH_HOSTS", "http://localhost:9200") | |
| class ElasticsearchClientError(Exception): | |
| """Custom exception for Elasticsearch client errors.""" | |
| pass | |
| def get_es_client() -> Elasticsearch: | |
| """ | |
| Establish connection to Elasticsearch and return the client instance. | |
| Raises ElasticsearchClientError if the connection cannot be established. | |
| """ | |
| try: | |
| print("es client", ES_CLIENT_URL) | |
| # Initialize Elasticsearch client | |
| es_client = Elasticsearch( | |
| hosts=[ES_CLIENT_URL], | |
| ) | |
| # Verify connection | |
| if not es_client.ping(): | |
| error_message = "Elasticsearch cluster is not reachable!" | |
| logger.error(error_message) | |
| raise ElasticsearchClientError(error_message) | |
| logger.info("Successfully connected to Elasticsearch") | |
| return es_client | |
| except (ConnectionError, AuthenticationException) as e: | |
| error_message = f"Elasticsearch connection error: {e}" | |
| logger.error(error_message) | |
| raise ElasticsearchClientError(error_message) from e |