Spaces:
Running
Running
| import os | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, Union | |
| import magic | |
| from docling.document_converter import DocumentConverter | |
| from datetime import datetime | |
| import shutil | |
| import tempfile | |
| from .types import ParsedDocument, DocumentMetadata | |
| from .exceptions import UnsupportedFormatError, ParseError | |
class DocumentParser:
    """
    A multiformat document parser using Docling.

    Input files are copied into a private temporary directory under a name
    whose extension matches the detected format, converted with Docling's
    ``DocumentConverter``, and returned as a ``ParsedDocument``.

    The temporary directory is removed by ``close()``; the parser can also
    be used as a context manager. ``__del__`` is kept as a best-effort
    fallback for callers that never call ``close()``.
    """

    # MIME type -> canonical short format name (also used as file extension).
    # NOTE(review): 'application/msword' is the legacy .doc type; it is mapped
    # to 'docx', so such files are copied with a .docx suffix -- confirm that
    # Docling can actually read them.
    SUPPORTED_FORMATS: Dict[str, str] = {
        'application/pdf': 'pdf',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
        'text/plain': 'txt',
        'text/html': 'html',
        'text/markdown': 'md',
        # Add common variations
        'application/x-pdf': 'pdf',
        'application/acrobat': 'pdf',
        'application/msword': 'docx',
        'text/x-markdown': 'md',
        'text/x-html': 'html',
    }

    # Lower-cased file extension -> canonical MIME type.
    EXTENSION_TO_MIME: Dict[str, str] = {
        '.pdf': 'application/pdf',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.htm': 'text/html',
        '.md': 'text/markdown',
        '.markdown': 'text/markdown',
    }

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Create a parser.

        Args:
            config: Optional settings dictionary; stored on the instance as
                ``self.config`` (an empty dict when omitted).
        """
        self.config = config or {}
        self.converter = DocumentConverter()
        # Create a temporary directory for processing files
        self.temp_dir = Path(tempfile.mkdtemp(prefix="dockling_"))

    def close(self) -> None:
        """Remove the temporary working directory. Safe to call repeatedly."""
        # getattr: __init__ may have failed before temp_dir was assigned.
        temp_dir = getattr(self, 'temp_dir', None)
        if temp_dir is not None and temp_dir.exists():
            shutil.rmtree(temp_dir, ignore_errors=True)

    def __enter__(self) -> "DocumentParser":
        """Return the parser itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Clean up the temporary directory when leaving a ``with`` block."""
        self.close()

    def __del__(self):
        """Cleanup temporary directory on object destruction (best effort)."""
        self.close()

    def _detect_mime_type(self, file_path: Path) -> str:
        """
        Return the MIME type of ``file_path``.

        The extension table is consulted first; libmagic content sniffing is
        only used when the extension is not recognized. Validation and
        metadata both go through this helper so they cannot disagree.
        """
        mime_type = self.EXTENSION_TO_MIME.get(file_path.suffix.lower())
        if mime_type is None:
            mime_type = magic.from_file(str(file_path), mime=True)
        return mime_type

    def _validate_and_copy_file(self, file_path: Union[str, Path]) -> Path:
        """
        Validate file and copy to temporary location with correct extension.

        Args:
            file_path: Path to the document to validate.

        Returns:
            Path of the copy inside the parser's temporary directory.

        Raises:
            FileNotFoundError: If ``file_path`` is not an existing regular file.
            UnsupportedFormatError: If the detected MIME type is not listed
                in ``SUPPORTED_FORMATS``.
        """
        file_path = Path(file_path)
        # is_file() rather than exists(): a directory cannot be parsed.
        if not file_path.is_file():
            raise FileNotFoundError(f"File not found: {file_path}")

        extension = file_path.suffix.lower()
        mime_type = self._detect_mime_type(file_path)
        if extension not in self.EXTENSION_TO_MIME:
            # Extension unknown: the type came from content sniffing, so
            # derive the extension for the temp copy from the MIME type.
            if mime_type not in self.SUPPORTED_FORMATS:
                # sorted() keeps the message stable between runs.
                supported = ', '.join(sorted(set(self.SUPPORTED_FORMATS.values())))
                raise UnsupportedFormatError(
                    f"Unsupported file format: {mime_type}. "
                    f"Supported formats are: {supported}"
                )
            extension = f".{self.SUPPORTED_FORMATS[mime_type]}"

        # The directory may have been removed since __init__ (close() called,
        # or an external temp cleaner); recreate it rather than fail.
        self.temp_dir.mkdir(parents=True, exist_ok=True)

        # Copy file to temp directory with correct extension
        temp_file = self.temp_dir / f"doc{extension}"
        shutil.copy2(file_path, temp_file)
        return temp_file

    def parse(self, file_path: Union[str, Path]) -> "ParsedDocument":
        """
        Parse a document file and return structured content.

        Args:
            file_path: Path to the document file

        Returns:
            ParsedDocument object containing parsed content and metadata

        Raises:
            UnsupportedFormatError: If the file format is not supported
            ParseError: If parsing fails (including a missing input file)
        """
        # Explicit sentinel so the finally-block knows whether a copy exists.
        temp_file: Optional[Path] = None
        try:
            # Validate and prepare file
            temp_file = self._validate_and_copy_file(file_path)
            source_path = Path(file_path)

            # Size and timestamps come from the original file, not the temp
            # copy. Note st_ctime is creation time on Windows but
            # metadata-change time on Unix.
            stats = source_path.stat()
            # The temp copy always carries a known extension, so this lookup
            # yields a key that exists in SUPPORTED_FORMATS.
            mime_type = self._detect_mime_type(temp_file)
            metadata = DocumentMetadata(
                filename=source_path.name,  # Use original filename
                file_type=self.SUPPORTED_FORMATS[mime_type],
                size_bytes=stats.st_size,
                created_at=datetime.fromtimestamp(stats.st_ctime),
                modified_at=datetime.fromtimestamp(stats.st_mtime),
                mime_type=mime_type,
            )
            return self._convert(temp_file, metadata)
        except (UnsupportedFormatError, ParseError):
            # Already the documented exception types: do not re-wrap.
            raise
        except Exception as exc:
            raise ParseError(str(exc)) from exc
        finally:
            # Cleanup temporary files
            if temp_file is not None:
                try:
                    temp_file.unlink()
                except OSError:
                    # Best effort: the copy may already be gone.
                    pass

    def _convert(self, temp_file: Path, metadata: "DocumentMetadata") -> "ParsedDocument":
        """Run Docling on ``temp_file`` and assemble the ParsedDocument."""
        try:
            # Parse document using Docling
            result = self.converter.convert(str(temp_file))
            doc = result.document

            try:
                content = doc.export_to_text()
            except Exception as exc:
                raise ParseError(f"Failed to extract text content: {exc}") from exc

            # Attributes are optional on the Docling document; fall back to
            # empty containers when they are absent.
            doc_metadata = getattr(doc, 'metadata', None)
            structured_content = {
                'sections': getattr(doc, 'sections', []),
                'paragraphs': getattr(doc, 'paragraphs', []),
                'entities': getattr(doc, 'entities', {}),
                'metadata': doc_metadata if hasattr(doc, 'metadata') else {},
            }

            # Get raw text if available.
            # NOTE(review): confirm export_to_text accepts include_layout; if
            # it does not, this always falls back to the plain content.
            try:
                raw_text = doc.export_to_text(include_layout=True)
            except Exception:
                raw_text = content

            # Update metadata with document-specific information
            if doc_metadata:
                metadata.title = doc_metadata.get('title')
                metadata.author = doc_metadata.get('author')
                metadata.pages = doc_metadata.get('pages')
                metadata.extra.update(doc_metadata)

            return ParsedDocument(
                content=content,
                metadata=metadata,
                raw_text=raw_text,
                structured_content=structured_content,
                confidence_score=getattr(doc, 'confidence', 1.0),
            )
        except ParseError:
            raise
        except Exception as exc:
            raise ParseError(f"Failed to parse document: {exc}") from exc

    def supports_format(self, mime_type: str) -> bool:
        """Check if a given MIME type is supported"""
        return mime_type in self.SUPPORTED_FORMATS