Source code for docviz.lib.functions

import asyncio
import concurrent.futures
import tempfile
from collections.abc import AsyncIterator, Callable, Iterator
from pathlib import Path
from typing import TYPE_CHECKING, Any

from docviz.config_cache import (
    get_default_detection_config,
    get_default_extraction_config,
    get_default_llm_config,
    get_default_ocr_config,
)
from docviz.lib.extraction import pipeline, pipeline_streaming
from docviz.logging import get_logger
from docviz.types import (
    DetectionConfig,
    ExtractionConfig,
    ExtractionEntry,
    ExtractionResult,
    ExtractionType,
    LLMConfig,
    OCRConfig,
)

if TYPE_CHECKING:
    from .document import Document


logger = get_logger(__name__)


def _convert_pipeline_results_to_extraction_result(
    pipeline_results: list[dict[str, Any]],
) -> ExtractionResult:
    """Convert pipeline results to ExtractionResult format.

    This function transforms the raw pipeline output into a standardized
    ExtractionResult format. It handles type mapping, coordinate conversion,
    and text content extraction for different element types.

    Args:
        pipeline_results: List of page results from the pipeline function. Each page
            result should contain a "page_number" key and an "elements" list. Each
            element should have "type", "bbox", "text", "confidence", and optionally
            "summary" keys.

    Returns:
        ExtractionResult: Standardized extraction result containing all converted
            entries with proper type annotations and coordinate formats.

    Example:
        >>> pipeline_results = [{
        ...     "page_number": 1,
        ...     "elements": [{
        ...         "type": "chart",
        ...         "bbox": [100, 200, 300, 400],
        ...         "text": "Chart title",
        ...         "summary": "Bar chart showing sales data",
        ...         "confidence": 0.95
        ...     }]
        ... }]
        >>> result = _convert_pipeline_results_to_extraction_result(pipeline_results)
        >>> result.entries[0].class_ == "figure"
        True
    """
    entries = []
    page_number = 1  # Default page number if no results

    for page_result in pipeline_results:
        page_number = page_result.get("page_number", 1)
        elements = page_result.get("elements", [])

        for element in elements:
            # Map element types to canonical names
            element_type = element.get("type", "other")
            if element_type == "chart":
                element_type = "figure"
            elif element_type == "formula":
                element_type = "equation"

            # Extract bbox and ensure it's a list
            bbox = element.get("bbox", [])
            if isinstance(bbox, tuple):
                bbox = list(bbox)

            # For chart elements, use summary as text
            text_content = element.get("text", "")
            if element_type == "figure" and "summary" in element:
                text_content = element.get("summary", "")

            entry = ExtractionEntry(
                text=text_content,
                class_=element_type,
                confidence=element.get("confidence", 1.0),
                bbox=bbox,
                page_number=page_number,
            )
            entries.append(entry)

    return ExtractionResult(entries=entries, page_number=page_number)
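
# Illustrative sketch (editor's addition, not part of the original module): besides
# the chart -> figure mapping shown in the docstring, the converter also maps
# "formula" elements to the "equation" class and normalizes tuple bounding boxes
# to lists, e.g.:
#
#     page = {"page_number": 2, "elements": [
#         {"type": "formula", "bbox": (10, 20, 30, 40), "text": "E = mc^2", "confidence": 0.9},
#     ]}
#     result = _convert_pipeline_results_to_extraction_result([page])
#     assert result.entries[0].class_ == "equation"
#     assert result.entries[0].bbox == [10, 20, 30, 40]
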
def batch_extract(
    documents: list["Document"],
    extraction_config: ExtractionConfig | None = None,
    detection_config: DetectionConfig | None = None,
    includes: list[ExtractionType] | None = None,
    progress_callback: Callable[[int], None] | None = None,
) -> list[ExtractionResult]:
    """Extract content from multiple documents in batch.

    This function processes multiple documents sequentially using the same
    configuration settings. It's designed for bulk document processing scenarios
    where you need to extract content from a collection of documents with
    consistent settings.

    Performance considerations:
        - Documents are processed sequentially, not in parallel
        - Memory usage scales with the number of documents and their sizes
        - Progress tracking is available for long-running batch operations
        - Each document is processed independently, so failures don't affect other documents

    Args:
        documents: List of Document objects to process. Each document should be
            a valid Document instance with an accessible file path.
        extraction_config: Configuration for extraction. If None, default settings
            will be used for all documents.
        detection_config: Configuration for detection. If None, default settings
            will be used for all documents.
        includes: Types of content to include in extraction. If None, all content
            types will be extracted. Use ExtractionType.ALL for all types or specify
            individual types like [ExtractionType.TABLE, ExtractionType.TEXT].
        progress_callback: Optional callback function for progress tracking. The
            callback receives the current document index (1-based) as its argument.
            Useful for updating progress bars or logging in user interfaces.

    Returns:
        list[ExtractionResult]: List of extraction results, one for each input
            document. The order of results matches the order of input documents.
            Each result contains all extracted content for that document.

    Example:
        >>> docs = [Document("doc1.pdf"), Document("doc2.pdf")]
        >>> results = batch_extract(
        ...     documents=docs,
        ...     includes=[ExtractionType.TABLE, ExtractionType.TEXT],
        ...     progress_callback=lambda i: print(f"Processing document {i}")
        ... )
        >>> len(results) == 2
        True
    """
    results = []
    for i, document in enumerate(documents):
        result = extract_content_sync(
            document, extraction_config, detection_config, includes, progress_callback
        )
        results.append(result)
        if progress_callback:
            progress_callback(i + 1)
    return results
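
# Illustrative usage sketch (editor's addition, not part of the original module):
# batch-extracting tables from several files. The file paths are hypothetical, and
# the Document import path is assumed from this module's relative import of
# .document.
#
#     from docviz.lib.document import Document
#     from docviz.types import ExtractionType
#
#     docs = [Document(p) for p in ["q1.pdf", "q2.pdf", "q3.pdf"]]
#     results = batch_extract(documents=docs, includes=[ExtractionType.TABLE])
#     for doc, res in zip(docs, results):
#         print(doc.file_path, len(res.entries), "entries")
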
async def extract_content(
    document: "Document",
    extraction_config: ExtractionConfig | None = None,
    detection_config: DetectionConfig | None = None,
    includes: list[ExtractionType] | None = None,
    progress_callback: Callable[[int], None] | None = None,
    ocr_config: OCRConfig | None = None,
    llm_config: LLMConfig | None = None,
) -> ExtractionResult:
    """Extract content from a document asynchronously.

    This is the primary async function for document content extraction. It processes
    the entire document and returns all extracted content in a single result. The
    function runs the synchronous extraction pipeline in a thread pool to provide
    async behavior while maintaining compatibility with the underlying processing
    pipeline.

    The function automatically sets up default configurations if none are provided:
        - DetectionConfig: Uses CPU device with 1024 image size and 0.5 confidence threshold
        - OCRConfig: English language with chart labels for pictures, tables, and formulas
        - LLMConfig: Uses Gemma3 model with local Ollama server
        - ExtractionConfig: Uses default extraction settings

    Processing workflow:
        1. Validates and sets up default configurations
        2. Creates a temporary directory for processing artifacts
        3. Runs the extraction pipeline in a thread pool
        4. Converts pipeline results to standardized format
        5. Cleans up temporary files automatically

    Args:
        document: Document object to extract content from. Must have a valid
            file path accessible to the current process.
        extraction_config: Configuration for extraction process. If None, uses
            default settings optimized for general document processing.
        detection_config: Configuration for layout detection. If None, uses
            CPU-based detection with balanced speed/accuracy settings.
        includes: List of content types to extract. If None, extracts all
            available content types. Use ExtractionType.ALL for all types or
            specify individual types like [ExtractionType.TABLE, ExtractionType.TEXT].
        progress_callback: Optional callback for progress tracking. Called with
            the current page number during processing. Useful for UI progress updates.
        ocr_config: Configuration for OCR processing. If None, uses English
            language with optimized settings for document analysis.
        llm_config: Configuration for LLM-based content analysis. If None,
            uses local Gemma3 model via Ollama server.

    Returns:
        ExtractionResult: Complete extraction result containing all extracted
            content from the document, organized by page and content type.

    Raises:
        Exception: If document processing fails, file access issues, or
            pipeline errors occur. The specific exception depends on the failure point.

    Example:
        >>> doc = Document("document.pdf")
        >>> result = await extract_content(
        ...     document=doc,
        ...     includes=[ExtractionType.TABLE, ExtractionType.TEXT],
        ...     progress_callback=lambda page: print(f"Processing page {page}")
        ... )
        >>> print(f"Extracted {len(result.entries)} elements")
    """
    if extraction_config is None:
        extraction_config = get_default_extraction_config()
    if detection_config is None:
        detection_config = get_default_detection_config()
    if ocr_config is None:
        ocr_config = get_default_ocr_config(include_formulas=True)
    if llm_config is None:
        llm_config = get_default_llm_config()
    if includes is None:
        includes = ExtractionType.get_all()

    # Handle ExtractionType.ALL
    if ExtractionType.ALL in includes:
        includes = ExtractionType.get_all()

    # Run the sync pipeline in an executor for async behavior
    def _run_sync_pipeline():
        return extract_content_sync(
            document,
            extraction_config,
            detection_config,
            includes,
            progress_callback,
            ocr_config,
            llm_config,
        )

    loop = asyncio.get_event_loop()
    with concurrent.futures.ThreadPoolExecutor() as executor:
        return await loop.run_in_executor(executor, _run_sync_pipeline)
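
# Illustrative usage sketch (editor's addition, not part of the original module):
# driving the async extractor from a synchronous entry point with asyncio.run.
# "report.pdf" is a hypothetical path; the Document import path is assumed from
# this module's relative import of .document.
#
#     import asyncio
#     from docviz.lib.document import Document
#     from docviz.types import ExtractionType
#
#     async def main() -> None:
#         doc = Document("report.pdf")
#         result = await extract_content(
#             doc, includes=[ExtractionType.TABLE, ExtractionType.TEXT]
#         )
#         print(f"Extracted {len(result.entries)} elements")
#
#     asyncio.run(main())
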
def extract_content_sync(
    document: "Document",
    extraction_config: ExtractionConfig | None = None,
    detection_config: DetectionConfig | None = None,
    includes: list[ExtractionType] | None = None,
    progress_callback: Callable[[int], None] | None = None,
    ocr_config: OCRConfig | None = None,
    llm_config: LLMConfig | None = None,
) -> ExtractionResult:
    """Extract content from a document synchronously.

    This is the core synchronous function for document content extraction. It
    processes the entire document in the current thread and returns all extracted
    content in a single result. This function is the foundation for both sync and
    async extraction workflows.

    The function automatically sets up default configurations if none are provided:
        - DetectionConfig: Uses CPU device with 1024 image size and 0.5 confidence threshold
        - OCRConfig: English language with chart labels for pictures, tables, and formulas
        - LLMConfig: Uses Gemma3 model with local Ollama server
        - ExtractionConfig: Uses default extraction settings

    Processing workflow:
        1. Validates and sets up default configurations
        2. Creates a temporary directory for processing artifacts
        3. Runs the extraction pipeline synchronously
        4. Converts pipeline results to standardized format
        5. Cleans up temporary files automatically

    Memory and performance considerations:
        - Processes the entire document in memory
        - Uses temporary files for intermediate processing steps
        - Automatically cleans up temporary files on completion
        - Suitable for documents up to several hundred pages

    Args:
        document: Document object to extract content from. Must have a valid
            file path accessible to the current process.
        extraction_config: Configuration for extraction process. If None, uses
            default settings optimized for general document processing.
        detection_config: Configuration for layout detection. If None, uses
            CPU-based detection with balanced speed/accuracy settings.
        includes: List of content types to extract. If None, extracts all
            available content types. Use ExtractionType.ALL for all types or
            specify individual types like [ExtractionType.TABLE, ExtractionType.TEXT].
        progress_callback: Optional callback for progress tracking. Called with
            the current page number during processing. Useful for UI progress updates.
        ocr_config: Configuration for OCR processing. If None, uses English
            language with optimized settings for document analysis.
        llm_config: Configuration for LLM-based content analysis. If None,
            uses local Gemma3 model via Ollama server.

    Returns:
        ExtractionResult: Complete extraction result containing all extracted
            content from the document, organized by page and content type.

    Raises:
        Exception: If document processing fails, file access issues, or
            pipeline errors occur. The specific exception depends on the failure point.

    Example:
        >>> doc = Document("document.pdf")
        >>> result = extract_content_sync(
        ...     document=doc,
        ...     includes=[ExtractionType.TABLE, ExtractionType.TEXT],
        ...     progress_callback=lambda page: print(f"Processing page {page}")
        ... )
        >>> print(f"Extracted {len(result.entries)} elements")
    """
    if extraction_config is None:
        extraction_config = get_default_extraction_config()
    if detection_config is None:
        detection_config = get_default_detection_config()
    if ocr_config is None:
        ocr_config = get_default_ocr_config(include_formulas=False)
    if llm_config is None:
        llm_config = get_default_llm_config()
    if includes is None:
        includes = ExtractionType.get_all()

    # Handle ExtractionType.ALL
    if ExtractionType.ALL in includes:
        includes = ExtractionType.get_all()

    try:
        # Create temporary output directory
        with tempfile.TemporaryDirectory() as temp_dir:
            pipeline_results = pipeline(
                document_path=document.file_path,
                output_dir=Path(temp_dir),
                detection_config=detection_config,
                extraction_config=extraction_config,
                ocr_config=ocr_config,
                llm_config=llm_config,
                includes=includes,
                progress_callback=progress_callback,
            )

            # Convert pipeline results to ExtractionResult
            return _convert_pipeline_results_to_extraction_result(pipeline_results)
    except Exception as e:
        # Log error and return empty result
        logger.error(f"Pipeline execution failed: {e}")
        return ExtractionResult(entries=[], page_number=0)
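
# Illustrative usage sketch (editor's addition, not part of the original module):
# grouping a synchronous extraction result by element class via the fields set on
# ExtractionEntry above (class_, page_number, bbox, text). "report.pdf" is a
# hypothetical path.
#
#     from collections import Counter
#     from docviz.lib.document import Document
#
#     result = extract_content_sync(Document("report.pdf"))
#     counts = Counter(entry.class_ for entry in result.entries)
#     print(counts)  # e.g. Counter({"text": 42, "table": 3, "figure": 2})
#     for entry in result.entries:
#         if entry.class_ == "table":
#             print(entry.page_number, entry.bbox, entry.text[:80])
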
async def extract_content_streaming(
    document: "Document",
    extraction_config: ExtractionConfig | None = None,
    detection_config: DetectionConfig | None = None,
    includes: list[ExtractionType] | None = None,
    progress_callback: Callable[[int], None] | None = None,
    ocr_config: OCRConfig | None = None,
    llm_config: LLMConfig | None = None,
) -> AsyncIterator[ExtractionResult]:
    """Extract content from a document asynchronously with streaming results.

    This function provides memory-efficient streaming extraction by yielding
    results page by page as they are processed. It's ideal for large documents
    where loading all content into memory at once would be problematic.

    The function runs the synchronous streaming pipeline in a thread pool to
    provide async behavior while maintaining the memory efficiency of streaming
    processing. Each yielded result contains the extracted content for a single page.

    Key benefits:
        - Memory efficient: Only one page is processed at a time
        - Real-time results: Pages are yielded as soon as they're processed
        - Progress tracking: Can track progress on a per-page basis
        - Scalable: Suitable for documents of any size

    Processing workflow:
        1. Sets up default configurations if none provided
        2. Creates temporary directory for processing artifacts
        3. Runs streaming pipeline in thread pool
        4. Yields page results as they become available
        5. Cleans up temporary files on completion

    Args:
        document: Document object to extract content from. Must have a valid
            file path accessible to the current process.
        extraction_config: Configuration for extraction process. If None, uses
            default settings optimized for general document processing.
        detection_config: Configuration for layout detection. If None, uses
            CPU-based detection with balanced speed/accuracy settings.
        includes: List of content types to extract. If None, extracts all
            available content types. Use ExtractionType.ALL for all types or
            specify individual types like [ExtractionType.TABLE, ExtractionType.TEXT].
        progress_callback: Optional callback for progress tracking. Called with
            the current page number during processing. Useful for UI progress updates.
        ocr_config: Configuration for OCR processing. If None, uses English
            language with optimized settings for document analysis.
        llm_config: Configuration for LLM-based content analysis. If None,
            uses local Gemma3 model via Ollama server.

    Yields:
        ExtractionResult: Extraction result for each processed page. Each result
            contains all extracted content for that specific page.

    Raises:
        Exception: If document processing fails, file access issues, or
            pipeline errors occur. The specific exception depends on the failure point.

    Example:
        >>> doc = Document("large_document.pdf")
        >>> async for page_result in extract_content_streaming(doc):
        ...     print(f"Page {page_result.page_number}: {len(page_result.entries)} elements")
        ...     # Process each page as it becomes available
    """
    # Run the sync version in a separate thread
    loop = asyncio.get_event_loop()

    def _get_streaming_sync_generator():
        return extract_content_streaming_sync(
            document,
            extraction_config,
            detection_config,
            includes,
            progress_callback,
            ocr_config,
            llm_config,
        )

    # Get the generator from executor
    with concurrent.futures.ThreadPoolExecutor() as executor:
        generator = await loop.run_in_executor(executor, _get_streaming_sync_generator)

        # Yield each result from the generator
        while True:
            try:
                # Get next result from generator in executor
                result = await loop.run_in_executor(executor, next, generator, None)
                if result is None:
                    break
                yield result
            except StopIteration:
                break
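
# Illustrative usage sketch (editor's addition, not part of the original module):
# consuming the async stream page by page and writing entries to disk
# incrementally, which is where the streaming API's memory efficiency pays off.
# "large_document.pdf" and "entries.jsonl" are hypothetical paths.
#
#     import asyncio
#     import json
#     from docviz.lib.document import Document
#
#     async def main() -> None:
#         doc = Document("large_document.pdf")
#         with open("entries.jsonl", "w") as fh:
#             async for page in extract_content_streaming(doc):
#                 for e in page.entries:
#                     record = {"page": e.page_number, "class": e.class_, "text": e.text}
#                     fh.write(json.dumps(record) + "\n")
#
#     asyncio.run(main())
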
def extract_content_streaming_sync(
    document: "Document",
    extraction_config: ExtractionConfig | None = None,
    detection_config: DetectionConfig | None = None,
    includes: list[ExtractionType] | None = None,
    progress_callback: Callable[[int], None] | None = None,
    ocr_config: OCRConfig | None = None,
    llm_config: LLMConfig | None = None,
) -> Iterator[ExtractionResult]:
    """Extract content from a document synchronously with streaming results.

    This function provides memory-efficient streaming extraction by yielding
    results page by page as they are processed. It's the core synchronous
    implementation that powers both sync and async streaming workflows.

    The function processes pages one at a time and yields results immediately
    upon completion of each page. This approach is ideal for large documents
    where loading all content into memory at once would be problematic, or when
    you need to start processing results before the entire document is complete.

    Key benefits:
        - Memory efficient: Only one page is processed at a time
        - Real-time results: Pages are yielded as soon as they're processed
        - Progress tracking: Can track progress on a per-page basis
        - Scalable: Suitable for documents of any size
        - Synchronous: No async/await complexity for simple use cases

    Processing workflow:
        1. Sets up default configurations if none provided
        2. Creates temporary directory for processing artifacts
        3. Runs streaming pipeline synchronously
        4. Yields page results as they become available
        5. Cleans up temporary files on completion

    Args:
        document: Document object to extract content from. Must have a valid
            file path accessible to the current process.
        extraction_config: Configuration for extraction process. If None, uses
            default settings optimized for general document processing.
        detection_config: Configuration for layout detection. If None, uses
            CPU-based detection with balanced speed/accuracy settings.
        includes: List of content types to extract. If None, extracts all
            available content types. Use ExtractionType.ALL for all types or
            specify individual types like [ExtractionType.TABLE, ExtractionType.TEXT].
        progress_callback: Optional callback for progress tracking. Called with
            the current page number during processing. Useful for UI progress updates.
        ocr_config: Configuration for OCR processing. If None, uses English
            language with optimized settings for document analysis.
        llm_config: Configuration for LLM-based content analysis. If None,
            uses local Gemma3 model via Ollama server.

    Yields:
        ExtractionResult: Extraction result for each processed page. Each result
            contains all extracted content for that specific page.

    Raises:
        Exception: If document processing fails, file access issues, or
            pipeline errors occur. The specific exception depends on the failure point.

    Example:
        >>> doc = Document("large_document.pdf")
        >>> for page_result in extract_content_streaming_sync(doc):
        ...     print(f"Page {page_result.page_number}: {len(page_result.entries)} elements")
        ...     # Process each page as it becomes available
    """
    if extraction_config is None:
        extraction_config = get_default_extraction_config()
    if detection_config is None:
        detection_config = get_default_detection_config()
    if ocr_config is None:
        ocr_config = get_default_ocr_config(include_formulas=False)
    if llm_config is None:
        llm_config = get_default_llm_config()
    if includes is None:
        includes = ExtractionType.get_all()

    # Handle ExtractionType.ALL
    if ExtractionType.ALL in includes:
        includes = ExtractionType.get_all()

    try:
        # Create temporary output directory
        with tempfile.TemporaryDirectory() as temp_dir:
            # Use the streaming pipeline generator
            pipeline_generator = pipeline_streaming(
                document_path=document.file_path,
                output_dir=Path(temp_dir),
                detection_config=detection_config,
                extraction_config=extraction_config,
                ocr_config=ocr_config,
                llm_config=llm_config,
                includes=includes,
                progress_callback=progress_callback,
            )

            # Yield converted results one by one
            for page_result in pipeline_generator:
                # Convert single page result to ExtractionResult
                extraction_result = _convert_pipeline_results_to_extraction_result([page_result])
                yield extraction_result
    except Exception as e:
        # Log error and yield an empty result
        logger.error(f"Streaming pipeline execution failed: {e}")
        yield ExtractionResult(entries=[], page_number=0)
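
# Illustrative usage sketch (editor's addition, not part of the original module):
# because this is a plain generator, a caller can stop early and the remaining
# pages are never processed, e.g. grabbing only the first page that contains a
# figure. "large_document.pdf" is a hypothetical path.
#
#     from docviz.lib.document import Document
#
#     doc = Document("large_document.pdf")
#     for page_result in extract_content_streaming_sync(doc):
#         figures = [e for e in page_result.entries if e.class_ == "figure"]
#         if figures:
#             print("First figure found on page", page_result.page_number)
#             break
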