
[recipe] Add DocumentLens Recipe for structured parsing of rich documents (#980)

Suraj Subramanian 3 weeks ago
parent
commit
58bd2b51b4

+ 8 - 0
end-to-end-use-cases/structured_parser/.gitignore

@@ -0,0 +1,8 @@
+
+*.png
+*.log
+src_v2
+*.pdf
+*.json
+*.db
+*.csv

+ 134 - 0
end-to-end-use-cases/structured_parser/CONTRIBUTING.md

@@ -0,0 +1,134 @@
+# Contributing to Structured Document Parser
+
+Thank you for your interest in contributing to the Structured Document Parser! This document provides guidelines and instructions for contributors.
+
+## Development Setup
+
+1. Fork the repository and clone it locally.
+2. Install dependencies:
+   ```bash
+   pip install -r requirements.txt
+   ```
+3. Install development dependencies:
+   ```bash
+   pip install pytest black flake8 mypy
+   ```
+
+## Project Structure
+
+```
+structured_parser/
+├── src/
+│   ├── structured_extraction.py  # Main entry point
+│   ├── utils.py                  # Utility functions
+│   ├── typedicts.py              # Type definitions
+│   ├── json_to_sql.py            # Database integration
+│   └── config.yaml               # Configuration
+├── tests/                        # Test cases
+├── README.md                     # Project overview
+└── requirements.txt              # Dependencies
+```
+
+## Development Workflow
+
+### Code Style
+
+We follow the PEP 8 style guide for Python code. Please use `black` for formatting:
+
+```bash
+black src/
+```
+
+### Type Checking
+
+We use type hints and `mypy` for type checking:
+
+```bash
+mypy src/
+```
+
+### Testing
+
+Please add tests for new features and ensure all tests pass:
+
+```bash
+pytest tests/
+```
+
+## Areas for Contribution
+
+Here are some areas where contributions are especially welcome:
+
+### 1. Artifact Extraction Improvements
+
+- Adding support for new artifact types
+- Improving extraction accuracy for existing types
+- Optimizing prompts for better results
+
+### 2. Performance Optimization
+
+- Improving inference speed
+- Reducing memory usage
+- Implementing efficient batching strategies
+
+### 3. New Features
+
+- Supporting additional document types (beyond PDF)
+- Adding new output formats
+- Implementing document comparison functionality
+- Enhancing vector search capabilities
+
+### 4. Documentation and Examples
+
+- Improving documentation
+- Adding usage examples
+- Creating tutorials or guides
+
+## Submitting Changes
+
+1. Create a new branch for your changes
+2. Make your changes and commit with clear commit messages
+3. Push your branch and submit a pull request
+4. Ensure CI tests pass
+
+## Pull Request Guidelines
+
+- Provide a clear description of the problem and solution
+- Include any relevant issue numbers
+- Add tests for new functionality
+- Update documentation as needed
+- Keep pull requests focused on a single topic
+
+## Prompt Engineering Guidelines
+
+When modifying prompts in `config.yaml`, consider:
+
+1. **Clarity**: Provide clear and specific instructions
+2. **Examples**: Include examples where helpful
+3. **Structure**: Use structured formatting to guide the model
+4. **Schema alignment**: Ensure prompts align with output schemas
+5. **Testing**: Test prompts with diverse document types
+
+## Output Schema Guidelines
+
+When defining output schemas (an illustrative sketch follows this list):
+
+1. Keep properties focused and well-defined
+2. Use descriptive field names
+3. Include descriptions for complex fields
+4. Consider required vs. optional fields carefully
+5. Test schemas with different document layouts
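+
+As an illustrative (not shipped) example, the sketch below shows what a schema following these guidelines might look like, written as the Python dict that `RequestBuilder` ultimately passes to the model as a JSON schema. The concrete field names are hypothetical; the real schemas live in `config.yaml`.
+
+```python
+# Hypothetical table schema illustrating the guidelines above; not the shipped config.yaml schema.
+example_table_schema = {
+    "type": "object",
+    "properties": {
+        "tables": {
+            "type": "array",
+            "description": "All tables detected on the page",
+            "items": {
+                "type": "object",
+                "properties": {
+                    "table_contents": {
+                        "type": "object",
+                        "description": "Table data, keyed by column header",
+                    },
+                    "table_info": {
+                        "type": "string",
+                        "description": "One-sentence summary of what the table shows",
+                    },
+                },
+                "required": ["table_contents"],
+            },
+        }
+    },
+    "required": ["tables"],
+}
+```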
+
+## License
+
+By contributing, you agree that your contributions will be licensed under the project's license.
+
+## Questions or Issues?
+
+If you have questions or encounter issues, please:
+
+1. Check existing issues to see if it's been addressed
+2. Open a new issue with a clear description and steps to reproduce
+3. Tag relevant project maintainers
+
+Thank you for your contributions!

+ 232 - 0
end-to-end-use-cases/structured_parser/GETTING_STARTED.md

@@ -0,0 +1,232 @@
+# Getting Started with Structured Document Parser
+
+This guide walks you through setting up and using the Structured Document Parser tool to extract text, tables, and images from PDF documents.
+
+## Setup
+
+### 1. Install Dependencies
+
+```bash
+pip install -r requirements.txt
+```
+
+### 2. Configure the Tool
+
+Edit the `src/config.yaml` file to configure the tool:
+
+```yaml
+# Choose your inference backend
+model:
+  backend: openai-compat  # Use "offline-vllm" for local inference
+
+  # If using openai-compat
+  base_url: "https://api.llama.com/compat/v1"
+  api_key: "YOUR_API_KEY"
+  model_id: "Llama-4-Maverick-17B-128E-Instruct-FP8"  # Or your preferred model
+```
+
+## Basic Usage Examples
+
+### Extract Text from a PDF
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --text
+```
+
+This will:
+1. Convert each PDF page to an image
+2. Run LLM inference to extract text
+3. Save extracted text as JSON in the `extracted` directory
+
+### Extract Text and Tables
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --text --tables
+```
+
+### Extract All Types of Content
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --text --tables --images
+```
+
+### Process Multiple PDFs
+
+```bash
+python src/structured_extraction.py path/to/pdf_directory --text --tables
+```
+
+## Working with Extraction Results
+
+### Export Tables to CSV
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --tables --save_tables_as_csv
+```
+
+Tables will be saved as individual CSV files in `extracted/tables_TIMESTAMP/`.
+
+### Export Tables to Excel
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --tables --export_excel
+```
+
+Tables will be combined into a single Excel file with multiple sheets.
+
+### Save to Database
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --text --tables --save_to_db
+```
+
+Extracted content will be stored in an SQLite database for structured querying.
+
+## Python API Examples
+
+### Extract Content Programmatically
+
+```python
+from src.structured_extraction import ArtifactExtractor
+from src.utils import PDFUtils
+
+# Extract pages from a PDF
+pdf_pages = PDFUtils.extract_pages("document.pdf")
+
+# Process each page
+for page in pdf_pages:
+    # Extract text
+    text_artifacts = ArtifactExtractor.from_image(
+        page["image_path"], ["text"]
+    )
+
+    # Or extract multiple artifact types
+    all_artifacts = ArtifactExtractor.from_image(
+        page["image_path"], ["text", "tables", "images"]
+    )
+
+    # Process the extracted artifacts
+    print(all_artifacts)
+```
+
+### Query the Database
+
+```python
+from src.json_to_sql import DatabaseManager
+
+# Query all text artifacts
+text_df = DatabaseManager.sql_query(
+    "sqlite3.db",
+    "SELECT * FROM document_artifacts WHERE artifact_type = 'text'"
+)
+
+# Query tables containing specific content
+revenue_tables = DatabaseManager.sql_query(
+    "sqlite3.db",
+    "SELECT * FROM document_artifacts WHERE artifact_type = 'table' AND table_info LIKE '%revenue%'"
+)
+```
+
+### Semantic Search
+
+```python
+from src.json_to_sql import VectorIndexManager
+
+# Search for relevant content
+results = VectorIndexManager.knn_query(
+    "What is the revenue growth for Q2?",
+    "chroma.db",
+    n_results=5
+)
+
+# Display results
+for i, (doc_id, distance, content) in enumerate(zip(
+    results['ids'], results['distances'], results['documents']
+)):
+    print(f"Result {i+1} (similarity: {1-distance:.2f}):")
+    print(content[:200] + "...\n")
+```
+
+## Customizing Extraction
+
+### Modify Prompts
+
+Edit the prompts in `src/config.yaml` to improve extraction for your specific document types:
+
+```yaml
+artifacts:
+  text:
+    prompts:
+      system: "You are an OCR expert. Your task is to extract all text sections..."
+      user: "TARGET SCHEMA:\n```json\n{schema}\n```"
+```
+
+### Add a Custom Artifact Type
+
+1. Add configuration to `src/config.yaml`:
+
+```yaml
+artifacts:
+  my_custom_type:
+    prompts:
+      system: "Your custom system prompt..."
+      user: "Your custom user prompt with {schema} placeholder..."
+    output_schema: {
+      # Your schema definition here
+    }
+    use_json_decoding: true
+```
+
+2. Update the CLI in `src/structured_extraction.py`:
+
+```python
+def main(
+    target_path: str,
+    text: bool = True,
+    tables: bool = False,
+    images: bool = False,
+    my_custom_type: bool = False,  # Add your type here
+    save_to_db: bool = False,
+    ...
+):
+    # Update artifact types logic
+    to_extract = []
+    if text:
+        to_extract.append("text")
+    if tables:
+        to_extract.append("tables")
+    if images:
+        to_extract.append("images")
+    if my_custom_type:
+        to_extract.append("my_custom_type")  # Add your type here
+```
+
+## Troubleshooting
+
+### LLM Response Format Issues
+
+If the LLM responses aren't being parsed correctly, inspect a raw response (see the sketch below) and check:
+1. Your output schema in `config.yaml`
+2. The `use_json_decoding` setting (set it to `true` for more reliable parsing)
+3. Whether a larger model, or a simpler extraction task, produces parseable output
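+
+To see what the model actually returned before changing the schema, a minimal sketch using the same helper the pipeline uses to pull JSON out of responses (the `raw_response` string below is a made-up example):
+
+```python
+from src.utils import JSONUtils
+
+# A made-up raw model response; substitute the string your backend actually returned
+raw_response = '{"extracted_data": {"text": [{"content": "Quarterly results...", "notes": ""}]}}'
+
+parsed = JSONUtils.extract_json_from_response(raw_response)
+print(parsed)
+```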
+
+### Database Issues
+
+If you encounter database errors:
+1. Ensure your Python installation includes the built-in `sqlite3` module
+2. Check database file permissions
+3. Use `DatabaseManager.create_artifact_table()` to reinitialize the table schema, as shown below
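+
+For item 3, a minimal sketch (this drops and recreates the `document_artifacts` table, so existing rows are lost; the database path is illustrative):
+
+```python
+from src.json_to_sql import DatabaseManager
+
+# Drops the existing document_artifacts table (if any) and recreates the schema
+DatabaseManager.create_artifact_table("sqlite3.db")
+```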
+
+### PDF Rendering Issues
+
+If PDF extraction quality is poor:
+1. Try adjusting the DPI setting in `PDFUtils.extract_pages()` (see the sketch below)
+2. For complex layouts, split extraction into smaller chunks (per section)
+3. Consider pre-processing PDFs with OCR tools for better text layer quality
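+
+For item 1, a sketch of the DPI adjustment; note that the `dpi` keyword below is an assumption about `PDFUtils.extract_pages()`, so confirm the actual parameter name in `src/utils.py` first:
+
+```python
+from src.utils import PDFUtils
+
+# Render pages at a higher resolution; `dpi=300` is a hypothetical keyword argument --
+# check the signature of PDFUtils.extract_pages in src/utils.py before relying on it.
+pages = PDFUtils.extract_pages("document.pdf", dpi=300)
+```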
+
+## Next Steps
+
+- Try extracting from different types of documents
+- Adjust prompts and schemas for your specific use cases
+- Explore the vector search capabilities for semantic document queries
+- Integrate with your existing document processing workflows

+ 221 - 0
end-to-end-use-cases/structured_parser/README.md

@@ -0,0 +1,221 @@
+# DocumentLens: Rich Document Parsing with LLMs
+
+An LLM-based tool for extracting structured data from rich documents (PDFs) using Llama models.
+
+## Overview
+
+This tool uses Llama models to extract text, tables, and images from PDFs, converting unstructured document data into structured, machine-readable formats. It supports:
+
+- **Text extraction**: Extract and structure main text, titles, captions, etc.
+- **Table extraction**: Convert complex tables into structured data formats
+- **Image extraction**: Extract images with contextual descriptions and captions
+- **Multiple output formats**: JSON, CSV, Excel, and SQL database storage
+- **Vector search capabilities**: Semantic search across extracted content
+
+The tool is designed to handle complex documents with high accuracy and provides flexible configuration options to tailor extraction tasks to specific needs.
+
+## Installation
+
+### Prerequisites
+
+- Python 3.9+
+- [Optional] Local GPU for offline inference
+
+### Setup
+
+1. Clone the repository
+2. Install dependencies:
+
+```bash
+pip install -r requirements.txt
+```
+
+3. Configure the tool (see Configuration section)
+
+## Quick Start
+
+Extract text from a PDF:
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --text
+```
+
+Extract text and tables, and save tables as CSV files:
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --text --tables --save_tables_as_csv
+```
+
+Process a directory of PDFs and export tables to Excel:
+
+```bash
+python src/structured_extraction.py path/to/pdf_directory --text --tables --export_excel
+```
+
+## Configuration
+
+The tool is configured via `config.yaml`. Key configuration options include:
+
+### Model Configuration
+
+```yaml
+model:
+  backend: openai-compat  # [offline-vllm, openai-compat]
+
+  # For openai-compat
+  base_url: "https://api.llama.com/compat/v1"
+  api_key: "YOUR_API_KEY"
+  model_id: "Llama-4-Maverick-17B-128E-Instruct-FP8"
+
+  # For offline-vllm
+  path: "/path/to/checkpoint"
+  tensor_parallel_size: 4
+  max_model_len: 32000
+  max_num_seqs: 32
+```
+
+### Inference Parameters
+
+```yaml
+extraction_inference:
+  temperature: 0.2
+  top_p: 0.9
+  max_completion_tokens: 17000
+  seed: 42
+```
+
+### Artifact Configuration
+
+The tool includes configurable prompts and output schemas for each artifact type (text, tables, images). These can be modified in the `config.yaml` file.
+
+## Architecture
+
+### Core Components
+
+1. **RequestBuilder**: Builds inference requests for LLMs
+2. **ArtifactExtractor**: Extracts structured data from documents
+3. **DatabaseManager**: Manages SQL database operations
+4. **VectorIndexManager**: Handles vector indexing and search
+
+### Data Flow
+
+1. PDFs are converted to images (one per page)
+2. Images are processed by the LLM to extract structured data
+3. Structured data is saved in various formats (JSON, CSV, SQL, etc.)
+4. Optional vector indexing for semantic search capabilities (an end-to-end sketch follows)
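+
+A minimal end-to-end sketch of this flow, using the components above (file names are illustrative, and the vector database path is optional):
+
+```python
+import json
+
+from src.json_to_sql import flatten_json_to_sql, VectorIndexManager
+from src.structured_extraction import ArtifactExtractor
+
+# Steps 1-2: render pages to images and extract structured artifacts with the LLM
+pages = ArtifactExtractor.from_pdf("report.pdf", ["text", "tables"])
+
+# Step 3: persist the structured data as JSON
+with open("artifacts.json", "w") as f:
+    json.dump(pages, f, indent=2)
+
+# Steps 3-4: flatten into SQLite and index for semantic search
+flatten_json_to_sql("artifacts.json", "sqlite3.db", vector_db_path="chroma.db")
+print(VectorIndexManager.knn_query("revenue by segment", "chroma.db"))
+```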
+
+## Extending the Tool
+
+### Adding New Artifact Types
+
+1. Add a new artifact type configuration in `config.yaml`:
+
+```yaml
+artifacts:
+  my_new_artifact:
+    prompts:
+      system: "Your system prompt here..."
+      user: "Your user prompt with {schema} placeholder..."
+    output_schema: {
+      # Your JSON schema here
+    }
+    use_json_decoding: true
+```
+
+2. Update the command-line interface in `structured_extraction.py` to include your new artifact type.
+
+### Customizing Extraction Logic
+
+The extraction logic is modular and can be customized by:
+
+1. Modifying prompts in the `config.yaml` file
+2. Adjusting output schemas to capture different data structures
+3. Extending the `ArtifactExtractor` class for specialized extraction needs, as sketched below
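+
+For the third point, a hedged sketch of a specialized extractor; the subclass and its post-processing step are hypothetical, not part of the shipped code:
+
+```python
+from src.structured_extraction import ArtifactExtractor
+
+class FinancialReportExtractor(ArtifactExtractor):
+    """Hypothetical extractor that post-processes table artifacts."""
+
+    @staticmethod
+    def from_image(img_path, artifact_types):
+        artifacts = ArtifactExtractor.from_image(img_path, artifact_types)
+        # Example post-processing hook: tag each extracted table with its source image
+        for table in artifacts.get("tables", []):
+            table["source_image"] = img_path
+        return artifacts
+```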
+
+### Using Different Models
+
+The tool supports two backends:
+
+1. **openai-compat**: Any API compatible with the OpenAI API format (including Llama API)
+2. **offline-vllm**: Local inference using VLLM for self-hosted deployments
+
+## Database Integration
+
+### SQL Database
+
+The tool can store extracted data in an SQLite database:
+
+```bash
+python src/structured_extraction.py path/to/document.pdf --text --tables --save_to_db
+```
+
+### Vector Search
+
+When `save_to_db` is enabled and a vector database path is configured, the tool also indexes extracted content for semantic search:
+
+```python
+from src.json_to_sql import VectorIndexManager
+
+# Search for relevant content
+results = VectorIndexManager.knn_query("What is the revenue growth?", "chroma.db")
+```
+
+## Best Practices
+
+1. **Model Selection**: Use larger models for complex documents or when high accuracy is required
+2. **Prompt Engineering**: Adjust prompts in `config.yaml` for your specific document types
+3. **Output Schema**: Define precise schemas to guide the model's extraction process
+4. **Batch Processing**: Use directory processing for efficiently handling multiple documents
+5. **Performance Tuning**: Adjust inference parameters based on your accuracy vs. speed requirements
+
+## Limitations
+
+- PDF rendering quality affects extraction accuracy
+- Complex multi-column layouts may require specialized prompts
+- Very large tables might be truncated due to token limitations
+
+## Advanced Use Cases
+
+### Custom Processing Pipelines
+
+The tool's components can be used programmatically for custom pipelines:
+
+```python
+from src.structured_extraction import ArtifactExtractor
+from src.utils import PDFUtils
+
+# Extract pages from PDF
+pages = PDFUtils.extract_pages("document.pdf")
+
+# Process specific pages
+for page in pages[10:20]:  # Process the pages at indices 10-19 (the 11th through 20th pages)
+    artifacts = ArtifactExtractor.from_image(page["image_path"], ["text", "tables"])
+    # Custom processing of artifacts...
+```
+
+### Export to Other Systems
+
+Extracted data can be exported to various systems (a usage sketch follows the list):
+
+- **SQL databases**: Using `flatten_json_to_sql`
+- **CSV files**: Using `json_to_csv`
+- **Excel workbooks**: Using `export_csvs_to_excel_tabs`
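+
+A minimal usage sketch of these helpers (paths and data are illustrative; `json_to_csv` calls the configured LLM to flatten each table, and the `export_csvs_to_excel_tabs` signature is inferred from its use in `structured_extraction.py`):
+
+```python
+from src.json_to_sql import flatten_json_to_sql, json_to_csv
+from src.utils import export_csvs_to_excel_tabs
+
+# Load a previously saved artifacts JSON into SQLite (and, optionally, a vector index)
+flatten_json_to_sql("extracted/artifacts.json", "sqlite3.db", vector_db_path="chroma.db")
+
+# Flatten one extracted table (a dict) to a CSV string via the LLM and write it out
+csv_string, filename = json_to_csv(
+    {"Revenue": {"Q1": "10", "Q2": "12"}}, info="Quarterly revenue by segment"
+)
+with open(filename, "w") as f:
+    f.write(csv_string)
+
+# Combine a directory of table CSVs into a single Excel workbook
+export_csvs_to_excel_tabs("extracted/tables", "tables.xlsx")
+```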
+
+## Troubleshooting
+
+- **Model capacity errors**: Reduce `max_completion_tokens` or use a model with a longer context window
+- **Extraction quality issues**: Adjust prompts or output schemas
+- **Performance issues**: Use batch processing or adjust tensor parallelism
+
+## Contributing
+
+Contributions to improve the tool are welcome! Areas for improvement include:
+
+- Additional output formats
+- Improved table extraction for complex layouts
+- Support for more document types beyond PDFs
+- Optimization for specific document domains
+
+## License
+
+[License information here]

+ 23 - 0
end-to-end-use-cases/structured_parser/requirements.txt

@@ -0,0 +1,23 @@
+# Core dependencies
+pandas>=1.5.0
+numpy>=1.22.0
+pyyaml>=6.0
+tqdm>=4.64.0
+fire>=0.5.0
+
+# PDF and image processing
+pymupdf>=1.21.0
+Pillow>=9.3.0
+
+# LLM inference
+vllm>=0.2.0
+openai>=1.0.0
+
+# Database and vector search
+# (sqlite3 ships with the Python standard library; no pip install required)
+chromadb>=0.4.0
+sqlalchemy>=2.0.0
+
+# Output processing
+xlsxwriter>=3.0.0
+openpyxl>=3.0.10

+ 38 - 0
end-to-end-use-cases/structured_parser/src/__init__.py

@@ -0,0 +1,38 @@
+"""
+Structured Parser Package
+
+This package provides tools for extracting structured data from documents,
+particularly PDFs, using LLMs. It includes functionality for:
+
+1. Extracting text, tables, and images from PDFs
+2. Converting extracted data to SQL database entries
+3. Creating vector embeddings for semantic search
+"""
+
+from .json_to_sql import DatabaseManager, flatten_json_to_sql, VectorIndexManager
+from .structured_extraction import (
+    ArtifactExtractor,
+    main as extract_artifacts,
+    RequestBuilder,
+)
+from .utils import config, ImageUtils, InferenceUtils, JSONUtils, load_config, PDFUtils
+
+__all__ = [
+    # Main extraction functionality
+    "ArtifactExtractor",
+    "RequestBuilder",
+    "extract_artifacts",
+    # Database functionality
+    "DatabaseManager",
+    "VectorIndexManager",
+    "flatten_json_to_sql",
+    "sql_query",
+    # Utility classes
+    "ImageUtils",
+    "JSONUtils",
+    "PDFUtils",
+    "InferenceUtils",
+    # Configuration
+    "config",
+    "load_config",
+]

File diff suppressed because it is too large
+ 128 - 0
end-to-end-use-cases/structured_parser/src/config.yaml


+ 584 - 0
end-to-end-use-cases/structured_parser/src/json_to_sql.py

@@ -0,0 +1,584 @@
+"""
+JSON to SQL conversion module for structured document data.
+
+This module provides functionality to convert structured JSON data extracted from
+documents into SQLite database entries and vector embeddings for semantic search.
+"""
+
+import json
+import logging
+import os
+import sqlite3
+from typing import Any, Dict, List, Optional, Tuple
+
+import chromadb
+import fire
+import pandas as pd
+from chromadb.config import Settings
+
+from utils import config, InferenceUtils, JSONUtils
+
+
+def setup_logger(logfile, verbose=False):
+    # Create a logger
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging.DEBUG)
+
+    # Create a file handler
+    file_handler = logging.FileHandler(logfile)
+    file_handler.setLevel(logging.DEBUG)
+
+    # Create a formatter and set it for the file handler
+    formatter = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    )
+    file_handler.setFormatter(formatter)
+
+    # Add the file handler to the logger
+    logger.addHandler(file_handler)
+
+    # If verbose, also add a console handler
+    if verbose:
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.DEBUG)
+        console_handler.setFormatter(formatter)
+        logger.addHandler(console_handler)
+
+    return logger
+
+
+logger = logging.getLogger(__name__)
+
+
+class DatabaseManager:
+    """Manager for database operations."""
+
+    @staticmethod
+    def validate_path(path: str) -> str:
+        """
+        Validate that a file path exists or can be created.
+
+        Args:
+            path: Path to validate
+
+        Returns:
+            Validated path
+
+        Raises:
+            ValueError: If path is invalid
+        """
+        if not path:
+            raise ValueError("Database path cannot be empty")
+
+        # Ensure directory exists
+        directory = os.path.dirname(path)
+        if directory and not os.path.exists(directory):
+            try:
+                os.makedirs(directory, exist_ok=True)
+            except OSError as e:
+                raise ValueError(f"Cannot create directory for database: {e}")
+
+        return path
+
+    @staticmethod
+    def create_artifact_table(sql_db_path: str) -> None:
+        """
+        Create the SQL table schema for storing document artifacts.
+
+        Args:
+            sql_db_path: Path to the SQLite database file
+
+        Raises:
+            sqlite3.Error: If there's an error creating the table
+        """
+        sql_db_path = DatabaseManager.validate_path(sql_db_path)
+
+        try:
+            with sqlite3.connect(sql_db_path) as conn:
+                cursor = conn.cursor()
+
+                # Drop table if it exists
+                cursor.execute("DROP TABLE IF EXISTS document_artifacts")
+
+                # Create table with schema
+                cursor.execute("""
+                CREATE TABLE IF NOT EXISTS document_artifacts (
+                    id INTEGER PRIMARY KEY AUTOINCREMENT,
+                    doc_path TEXT,
+                    page_num INTEGER,
+                    artifact_type TEXT,  -- 'table', 'text', or 'image'
+
+                    -- Common metadata
+                    content_json TEXT,   -- JSON string of the artifact content
+
+                    -- Table specific fields
+                    table_info TEXT,
+
+                    -- Text specific fields
+                    text_content TEXT,
+                    text_notes TEXT,
+
+                    -- Image specific fields
+                    image_position_top TEXT,
+                    image_position_left TEXT,
+                    image_description TEXT,
+                    image_caption TEXT,
+                    image_type TEXT
+                )
+                """)
+
+                # Create indexes for common queries
+                cursor.execute(
+                    "CREATE INDEX IF NOT EXISTS idx_artifact_type ON document_artifacts(artifact_type)"
+                )
+                cursor.execute(
+                    "CREATE INDEX IF NOT EXISTS idx_doc_path ON document_artifacts(doc_path)"
+                )
+
+                conn.commit()
+        except sqlite3.Error as e:
+            print(f"Database error creating table: {e}")
+            raise
+
+    @staticmethod
+    def sql_query(db_path: str, query: str) -> pd.DataFrame:
+        """
+        Query the document artifacts table and return results as a DataFrame.
+
+        Args:
+            db_path: Path to the SQLite database file
+            query: SQL query to execute
+
+        Returns:
+            DataFrame containing query results
+
+        Raises:
+            ValueError: If query is empty or invalid
+            sqlite3.Error: If there's a database error
+        """
+        if not query or not query.strip():
+            raise ValueError("Query cannot be empty")
+
+        if not os.path.exists(db_path):
+            raise FileNotFoundError(f"Database file not found: {db_path}")
+
+        try:
+            with sqlite3.connect(db_path) as conn:
+                df = pd.read_sql_query(query, conn)
+            return df
+        except (sqlite3.Error, pd.io.sql.DatabaseError) as e:
+            print(f"Query error: {e}")
+            raise
+
+    @staticmethod
+    def export_db(db_path: str, export_path: str) -> None:
+        """
+        Export the document artifacts table to a CSV file.
+
+        Args:
+            db_path: Path to the SQLite database file
+            export_path: Path to the CSV file to export
+
+        Raises:
+            ValueError: If export path is invalid
+            sqlite3.Error: If there's a database error
+        """
+        if not export_path or not export_path.strip():
+            raise ValueError("Export path cannot be empty")
+
+        if not os.path.exists(db_path):
+            raise FileNotFoundError(f"Database file not found: {db_path}")
+
+        df = DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts")
+        df.to_csv(export_path, index=False)
+
+
+class VectorIndexManager:
+    """Manager for vector index operations."""
+
+    @staticmethod
+    def write_to_index(
+        vector_db_path: str, document_ids: List[str], document_contents: List[str]
+    ) -> None:
+        """
+        Write document contents to a vector index for semantic search.
+
+        Args:
+            vector_db_path: Path to the vector database
+            document_ids: List of document IDs
+            document_contents: List of document contents to index
+
+        Raises:
+            ValueError: If inputs are invalid
+            RuntimeError: If there's an error writing to the index
+        """
+        if not document_ids or not document_contents:
+            print("No documents to index")
+            return
+
+        if len(document_ids) != len(document_contents):
+            raise ValueError(
+                "document_ids and document_contents must have the same length"
+            )
+
+        try:
+            client = chromadb.PersistentClient(
+                path=vector_db_path, settings=Settings(anonymized_telemetry=False)
+            )
+            collection = client.get_or_create_collection(name="structured_parser")
+            collection.add(
+                documents=document_contents,
+                ids=document_ids,
+            )
+            logger.info(f"Added {len(document_ids)} documents to vector index")
+        except Exception as e:
+            print(f"Error writing to vector index: {e}")
+            raise RuntimeError(f"Failed to write to vector index: {e}")
+
+    @staticmethod
+    def knn_query(
+        query_text: str, vector_db_path: str, n_results: int = 10
+    ) -> pd.DataFrame:
+        """
+        Perform a semantic search query on the vector index.
+
+        Args:
+            query_text: Text to search for
+            vector_db_path: Path to the vector database
+            n_results: Number of results to return
+
+        Returns:
+            DataFrame containing query results
+
+        Raises:
+            ValueError: If query is empty
+            FileNotFoundError: If vector database doesn't exist
+            RuntimeError: If there's an error querying the index
+        """
+        if not query_text or not query_text.strip():
+            raise ValueError("Query text cannot be empty")
+
+        if not os.path.exists(vector_db_path):
+            raise FileNotFoundError(f"Vector database not found: {vector_db_path}")
+
+        try:
+            client = chromadb.PersistentClient(
+                path=vector_db_path, settings=Settings(anonymized_telemetry=False)
+            )
+            collection = client.get_collection(name="structured_parser")
+
+            results = collection.query(query_texts=[query_text], n_results=n_results)
+            df = pd.DataFrame(
+                {k: results[k][0] for k in ["ids", "distances", "documents"]}
+            )
+            df["ids"] = df["ids"].apply(int)
+            return df
+        except Exception as e:
+            print(f"Vector query error: {e}")
+            raise RuntimeError(f"Failed to query vector index: {e}")
+
+
+class SQLProcessor:
+    """Processor for document data."""
+
+    @staticmethod
+    def process_text_artifact(
+        cursor: sqlite3.Cursor, doc_path: str, page_num: int, text: Dict[str, Any]
+    ) -> Optional[Tuple[str, str]]:
+        """
+        Process a text artifact and insert it into the database.
+
+        Args:
+            cursor: Database cursor
+            doc_path: Document path
+            page_num: Page number
+            text: Text artifact data
+
+        Returns:
+            Tuple of (document_id, indexable_content) or None if no content
+        """
+        if not text or not text.get("content"):
+            return None
+
+        cursor.execute(
+            """
+            INSERT INTO document_artifacts (
+                doc_path, page_num, artifact_type,
+                text_content, text_notes
+            ) VALUES (?, ?, ?, ?, ?)
+            """,
+            (
+                doc_path,
+                page_num,
+                "text",
+                text.get("content", ""),
+                text.get("notes", ""),
+            ),
+        )
+
+        # Prepare for vector indexing
+        indexable_content = text.get("content", "") + " | " + text.get("notes", "")
+        return str(cursor.lastrowid), indexable_content
+
+    @staticmethod
+    def process_image_artifact(
+        cursor: sqlite3.Cursor, doc_path: str, page_num: int, image: Dict[str, Any]
+    ) -> Optional[Tuple[str, str]]:
+        """
+        Process an image artifact and insert it into the database.
+
+        Args:
+            cursor: Database cursor
+            doc_path: Document path
+            page_num: Page number
+            image: Image artifact data
+
+        Returns:
+            Tuple of (document_id, indexable_content), or None if the entry is not a valid image dict
+        """
+        # Skip entries that are not valid image dicts
+        if not isinstance(image, dict):
+            return None
+
+        cursor.execute(
+            """
+            INSERT INTO document_artifacts (
+                doc_path, page_num, artifact_type,
+                image_position_top, image_position_left,
+                image_description, image_caption, image_type
+            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+            """,
+            (
+                doc_path,
+                page_num,
+                "image",
+                image.get("position_top", ""),
+                image.get("position_left", ""),
+                image.get("description", ""),
+                image.get("caption", ""),
+                image.get("image_type", ""),
+            ),
+        )
+
+        # Prepare for vector indexing
+        indexable_content = (
+            image.get("image_type", "")
+            + " | "
+            + image.get("description", "")
+            + " | "
+            + image.get("caption", "")
+        )
+        return str(cursor.lastrowid), indexable_content
+
+    @staticmethod
+    def process_table_artifact(
+        cursor: sqlite3.Cursor, doc_path: str, page_num: int, table: Dict[str, Any]
+    ) -> Optional[Tuple[str, str]]:
+        """
+        Process a table artifact and insert it into the database.
+
+        Args:
+            cursor: Database cursor
+            doc_path: Document path
+            page_num: Page number
+            table: Table artifact data
+
+        Returns:
+            Tuple of (document_id, indexable_content) or None if empty table
+        """
+        # Skip empty tables
+        if not isinstance(table, dict):
+            return None
+        if not table.get("table_contents") and not table.get("table_info"):
+            return None
+
+        cursor.execute(
+            """
+            INSERT INTO document_artifacts (
+                doc_path, page_num, artifact_type,
+                content_json, table_info
+            ) VALUES (?, ?, ?, ?, ?)
+            """,
+            (
+                doc_path,
+                page_num,
+                "table",
+                json.dumps(table.get("table_contents", {})),
+                table.get("table_info", ""),
+            ),
+        )
+
+        # Prepare for vector indexing
+        indexable_content = f"""{table.get("table_info", "")}\n\n```json {json.dumps(table.get("table_contents", {}))}```
+        """
+        return str(cursor.lastrowid), indexable_content
+
+
+def flatten_json_to_sql(
+    json_path: str, sql_db_path: str, vector_db_path: Optional[str] = None
+) -> Dict[str, int]:
+    """
+    Convert structured JSON data to SQL database entries and optionally index in a vector database.
+
+    Args:
+        json_path: Path to the JSON file
+        sql_db_path: Path to the SQLite database
+        vector_db_path: Optional path to the vector database
+
+    Returns:
+        Dict mapping artifact type ('text', 'image', 'table') to the number of records inserted
+
+    Raises:
+        FileNotFoundError: If JSON file doesn't exist
+        ValueError: If paths are invalid
+        sqlite3.Error: If there's a database error
+    """
+    # Validate inputs
+    if not os.path.exists(json_path):
+        raise FileNotFoundError(f"JSON file not found: {json_path}")
+
+    # Create the SQL table if it doesn't exist
+    DatabaseManager.create_artifact_table(sql_db_path)
+
+    # Initialize buffers for vector indexing
+    document_ids = []
+    document_contents = []
+
+    # Counts for logging
+    counts = {}
+
+    # Load JSON data
+    try:
+        with open(json_path, "r") as f:
+            data = json.load(f)
+    except json.JSONDecodeError as e:
+        print(f"Invalid JSON file: {e}")
+        raise ValueError(f"Invalid JSON file: {e}")
+
+    # Connect to the database
+    try:
+        with sqlite3.connect(sql_db_path) as conn:
+            cursor = conn.cursor()
+
+            # Process each page in the document
+            for page in data:
+                doc_path = page.get("doc_path", "")
+                page_num = page.get("page_num", 0)
+                artifacts = page.get("artifacts", {})
+
+                # Process text
+                for text in artifacts.get("text", []):
+                    result = SQLProcessor.process_text_artifact(
+                        cursor, doc_path, page_num, text
+                    )
+                    if result:
+                        document_ids.append(result[0])
+                        document_contents.append(result[1])
+                        counts["text"] = counts.get("text", 0) + 1
+
+                # Process images
+                for image in artifacts.get("images", []):
+                    result = SQLProcessor.process_image_artifact(
+                        cursor, doc_path, page_num, image
+                    )
+                    if result:
+                        document_ids.append(result[0])
+                        document_contents.append(result[1])
+                        counts["image"] = counts.get("image", 0) + 1
+
+                # Process tables
+                for table in artifacts.get("tables", []):
+                    result = SQLProcessor.process_table_artifact(
+                        cursor, doc_path, page_num, table
+                    )
+                    if result:
+                        document_ids.append(result[0])
+                        document_contents.append(result[1])
+                        counts["table"] = counts.get("table", 0) + 1
+
+            conn.commit()
+    except sqlite3.Error as e:
+        print(f"Database error: {e}")
+        raise
+
+    # Write to vector index
+    if vector_db_path and document_ids:
+        VectorIndexManager.write_to_index(
+            vector_db_path, document_ids, document_contents
+        )
+
+    return counts
+
+
+def json_to_csv(data: dict, info: str = "") -> Tuple[str, str]:
+    system_prompt = """You are an expert at converting JSON data to flat csv tables.
+
+You will receive 2 inputs:
+1. JSON-formatted data of a table
+2. A string describing the contents of the table.
+
+I require 2 things from you:
+1. A CSV string representation of the table
+2. A succinct filename for this table based on the data contents.
+
+You should only respond with a JSON, no preamble required. Your JSON response should follow this format:
+{"csv_table": <str of table>, "filename": <filename to save table>}"""
+
+    user_prompt = f"data:\n{json.dumps(data)}"
+    if info:
+        user_prompt += f"\n\ninfo:\n{info}"
+
+    request = InferenceUtils.request_builder(
+        user_prompt=user_prompt,
+        system_prompt=system_prompt,
+        temperature=0.4,
+        max_completion_tokens=2048,
+        top_p=0.9,
+        seed=42,
+    )
+    backend = config["model"].get("backend")
+    if backend == "offline-vllm":
+        vllm_request_batch = InferenceUtils.make_vllm_batch(request)
+        raw_response = InferenceUtils.run_vllm_inference(vllm_request_batch)[0]
+    elif backend == "openai-compat":
+        raw_response = InferenceUtils.run_openai_inference(request)
+
+    json_response = JSONUtils.extract_json_from_response(raw_response)
+
+    return json_response["csv_table"], json_response["filename"]
+
+
+def main(json_path, db_path, vector_db_path):
+    """
+    Example usage of the functions.
+    """
+
+    try:
+        # Process JSON and store in SQL
+        flatten_json_to_sql(json_path, db_path, vector_db_path)
+
+        # Example SQL queries
+        print("All artifacts:")
+        print(DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts"))
+
+        print("\nTables only:")
+        print(
+            DatabaseManager.sql_query(
+                db_path,
+                "SELECT * FROM document_artifacts WHERE artifact_type = 'table'",
+            )
+        )
+
+        # Example KNN queries
+        query = "What is the average revenue per day for Meta?"
+        print("\nVector index query: ", query)
+        vector_query = VectorIndexManager.knn_query(query, vector_db_path)
+        print(vector_query)
+
+        # KNN + SQL
+        df = DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts")
+        df = vector_query.merge(df, left_on="ids", right_on="id", how="inner")
+        print("\nJoined results:")
+        print(df)
+    except Exception as e:
+        print(f"Error in main: {e}")
+
+
+if __name__ == "__main__":
+    fire.Fire(main)

+ 577 - 0
end-to-end-use-cases/structured_parser/src/structured_extraction.py

@@ -0,0 +1,577 @@
+"""
+Structured data extraction module for processing images with LLMs.
+
+This module provides functionality to extract structured data from images using
+local or API-based LLMs. It handles the preparation of requests, batching for
+efficient inference, and parsing of responses into structured formats.
+"""
+
+import json
+import logging
+import os
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+import fire
+
+from json_to_sql import flatten_json_to_sql, json_to_csv
+from tqdm import tqdm
+from typedicts import ArtifactCollection, ExtractedPage, InferenceRequest
+
+from utils import (
+    config,
+    export_csvs_to_excel_tabs,
+    ImageUtils,
+    InferenceUtils,
+    JSONUtils,
+    PDFUtils,
+)
+
+
+# Constants
+EXTRACTED_DATA_KEY = "extracted_data"
+SUPPORTED_BACKENDS = ["offline-vllm", "openai-compat"]
+SUPPORTED_FILE_TYPES = [".pdf"]
+
+
+def setup_logger(logfile, verbose=False):
+    # Create a logger
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging.DEBUG)
+
+    # Create a file handler
+    file_handler = logging.FileHandler(logfile)
+    file_handler.setLevel(logging.DEBUG)
+
+    # Create a formatter and set it for the file handler
+    formatter = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    )
+    file_handler.setFormatter(formatter)
+
+    # Add the file handler to the logger
+    logger.addHandler(file_handler)
+
+    # If verbose, also add a console handler
+    if verbose:
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.DEBUG)
+        console_handler.setFormatter(formatter)
+        logger.addHandler(console_handler)
+
+    return logger
+
+
+logger = setup_logger("app.log", verbose=False)
+
+
+class RequestBuilder:
+    """Builder for LLM inference requests."""
+
+    @staticmethod
+    def build(
+        img_path: str,
+        system_prompt: str,
+        user_prompt: str,
+        output_schema: Dict[str, Any],
+        use_json_decoding: bool = False,
+        model: Optional[str] = None,
+    ) -> InferenceRequest:
+        """
+        Build an inference request for an image.
+
+        Args:
+            img_path: Path to the image file
+            system_prompt: System prompt for the LLM
+            user_prompt: User prompt for the LLM
+            output_schema: JSON schema for the output
+            use_json_decoding: Whether to use JSON-guided decoding
+            model: Optional model override
+
+        Returns:
+            InferenceRequest: Formatted request for the LLM
+
+        Raises:
+            FileNotFoundError: If the image file doesn't exist
+        """
+        if not os.path.exists(img_path):
+            raise FileNotFoundError(f"Image file not found: {img_path}")
+
+        img_b64 = ImageUtils.encode_image(img_path)
+
+        # Create a copy of the inference config to avoid modifying the original
+        request_params = dict(config["extraction_inference"])
+        request_params["messages"] = [
+            {"role": "system", "content": system_prompt},
+            {
+                "role": "user",
+                "content": [
+                    {
+                        "type": "image_url",
+                        "image_url": {"url": f"data:image/png;base64,{img_b64}"},
+                    },
+                    {"type": "text", "text": user_prompt},
+                ],
+            },
+        ]
+
+        if use_json_decoding:
+            request_params["response_format"] = {
+                "type": "json_schema",
+                "json_schema": {"name": "OutputSchema", "schema": output_schema},
+            }
+
+        if model:
+            request_params["model"] = model
+
+        return request_params
+
+
+class ArtifactExtractor:
+    """Extractor for document artifacts."""
+
+    @staticmethod
+    def _prepare_inference_requests(
+        img_path: str, artifact_types: List[str]
+    ) -> List[Tuple[str, InferenceRequest]]:
+        """
+        Prepare inference requests for each artifact type.
+
+        Args:
+            img_path: Path to the image file
+            artifact_types: Types of artifacts to extract
+
+        Returns:
+            List of tuples containing (artifact_type, inference_request)
+        """
+        requests = []
+        for artifact in artifact_types:
+            art_config = config["artifacts"].get(artifact)
+            if not art_config:
+                logger.warning(f"No configuration found for artifact type: {artifact}")
+                continue
+
+            system_prompt = art_config["prompts"].get("system", "")
+            user_prompt = art_config["prompts"].get("user", "")
+            output_schema = art_config.get("output_schema", None)
+            use_json_decoding = art_config.get("use_json_decoding", False)
+
+            if user_prompt and output_schema is not None:
+                user_prompt = user_prompt.format(schema=json.dumps(output_schema))
+
+            request = RequestBuilder.build(
+                img_path,
+                system_prompt,
+                user_prompt,
+                output_schema,
+                use_json_decoding,
+            )
+            requests.append((artifact, request))
+
+        return requests
+
+    @staticmethod
+    def _run_inference(
+        requests: List[Tuple[str, InferenceRequest]],
+    ) -> List[Tuple[str, str]]:
+        """
+        Run inference for all requests.
+
+        Args:
+            requests: List of tuples containing (artifact_type, inference_request)
+
+        Returns:
+            List of tuples containing (artifact_type, response)
+
+        Raises:
+            ValueError: If the backend is not supported
+        """
+        backend = config["model"].get("backend")
+        if backend not in SUPPORTED_BACKENDS:
+            raise ValueError(
+                f"Allowed config.model.backend: {SUPPORTED_BACKENDS}, got unknown value: {backend}"
+            )
+
+        artifact_types = [r[0] for r in requests]
+        inference_requests = [r[1] for r in requests]
+
+        if backend == "offline-vllm":
+            request_batch = InferenceUtils.make_vllm_batch(inference_requests)
+            response_batch = InferenceUtils.run_vllm_inference(request_batch)
+        elif backend == "openai-compat":
+            response_batch = [
+                InferenceUtils.run_openai_inference(request)
+                for request in inference_requests
+            ]
+
+        return list(zip(artifact_types, response_batch))
+
+    @staticmethod
+    def _process_responses(responses: List[Tuple[str, str]]) -> ArtifactCollection:
+        """
+        Process responses into a structured artifact collection.
+
+        Args:
+            responses: List of tuples containing (artifact_type, response)
+
+        Returns:
+            ArtifactCollection: Extracted artifacts
+        """
+        extracted = {}
+        for artifact_type, raw_response in responses:
+            try:
+                json_response = JSONUtils.extract_json_from_response(raw_response)
+
+                if EXTRACTED_DATA_KEY in json_response:
+                    json_response = json_response[EXTRACTED_DATA_KEY]
+
+                extracted.update(json_response)
+            except Exception as e:
+                logger.error(f"Failed to process response for {artifact_type}: {e}")
+                extracted.update({artifact_type: {"error": str(e)}})
+
+        return extracted
+
+    @staticmethod
+    def from_image(
+        img_path: str,
+        artifact_types: Union[List[str], str],
+    ) -> ArtifactCollection:
+        """
+        Extract artifacts from an image.
+
+        Args:
+            img_path: Path to the image file
+            artifact_types: Type(s) of artifacts to extract
+
+        Returns:
+            ArtifactCollection: Extracted artifacts
+
+        Raises:
+            ValueError: If the backend is not supported
+            FileNotFoundError: If the image file doesn't exist
+        """
+        if not os.path.exists(img_path):
+            raise FileNotFoundError(f"Image file not found: {img_path}")
+
+        if isinstance(artifact_types, str):
+            artifact_types = [artifact_types]
+
+        # Prepare inference requests
+        requests = ArtifactExtractor._prepare_inference_requests(
+            img_path, artifact_types
+        )
+
+        # Run inference
+        responses = ArtifactExtractor._run_inference(requests)
+
+        # Process responses
+        return ArtifactExtractor._process_responses(responses)
+
+    @staticmethod
+    def from_pdf(pdf_path: str, artifact_types: List[str]) -> List[ExtractedPage]:
+        """
+        Extract artifacts from all pages in a PDF.
+
+        Args:
+            pdf_path: Path to the PDF file
+            artifact_types: Types of artifacts to extract
+
+        Returns:
+            List[ExtractedPage]: Extracted pages with artifacts
+
+        Raises:
+            FileNotFoundError: If the PDF file doesn't exist
+        """
+        if not os.path.exists(pdf_path):
+            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
+
+        pdf_pages = PDFUtils.extract_pages(pdf_path)
+        logger.info(f"Processing {len(pdf_pages)} pages from {pdf_path}")
+        for page in tqdm(pdf_pages, desc="Processing PDF pages"):
+            try:
+                page_artifacts = ArtifactExtractor.from_image(
+                    page["image_path"], artifact_types
+                )
+                # Round-trip through JSON to ensure artifacts are plain, serializable data
+                page_artifacts = json.loads(json.dumps(page_artifacts))
+                page["artifacts"] = page_artifacts
+            except Exception as e:
+                logger.error(
+                    f"Error processing page {page['page_num']} in {pdf_path}: {e}"
+                )
+                page["artifacts"] = {"error": f"Error {e} in artifact extraction"}
+
+        return pdf_pages
+
+    # @staticmethod
+    # async def _run_inference_async(
+    #     requests: List[Tuple[str, InferenceRequest]],
+    # ) -> List[Tuple[str, str]]:
+    #     """
+    #     Run inference asynchronously for all requests.
+
+    #     Args:
+    #         requests: List of tuples containing (artifact_type, inference_request)
+
+    #     Returns:
+    #         List of tuples containing (artifact_type, response)
+
+    #     Raises:
+    #         ValueError: If the backend is not supported
+    #     """
+    #     backend = config["model"].get("backend")
+    #     if backend not in SUPPORTED_BACKENDS:
+    #         raise ValueError(
+    #             f"Allowed config.model.backend: {SUPPORTED_BACKENDS}, got unknown value: {backend}"
+    #         )
+
+    #     artifact_types = [r[0] for r in requests]
+    #     inference_requests = [r[1] for r in requests]
+
+    #     if backend == "offline-vllm":
+    #         request_batch = InferenceUtils.make_vllm_batch(inference_requests)
+    #         response_batch = InferenceUtils.run_vllm_inference(request_batch)
+    #     elif backend == "openai-compat":
+    #         tasks = [
+    #             InferenceUtils.async_run_openai_inference(request)
+    #             for request in inference_requests
+    #         ]
+    #         response_batch = await asyncio.gather(*tasks)
+
+    #     return list(zip(artifact_types, response_batch))
+
+    # @staticmethod
+    # async def from_image_async(
+    #     img_path: str,
+    #     artifact_types: Union[List[str], str],
+    # ) -> ArtifactCollection:
+    #     """
+    #     Extract artifacts from an image asynchronously.
+
+    #     Args:
+    #         img_path: Path to the image file
+    #         artifact_types: Type(s) of artifacts to extract
+
+    #     Returns:
+    #         ArtifactCollection: Extracted artifacts
+
+    #     Raises:
+    #         ValueError: If the backend is not supported
+    #         FileNotFoundError: If the image file doesn't exist
+    #     """
+    #     if not os.path.exists(img_path):
+    #         raise FileNotFoundError(f"Image file not found: {img_path}")
+
+    #     if isinstance(artifact_types, str):
+    #         artifact_types = [artifact_types]
+
+    #     # Prepare inference requests
+    #     requests = ArtifactExtractor._prepare_inference_requests(
+    #         img_path, artifact_types
+    #     )
+
+    #     # Run inference asynchronously
+    #     responses = await ArtifactExtractor._run_inference_async(requests)
+
+    #     # Process responses
+    #     return ArtifactExtractor._process_responses(responses)
+
+
+def get_artifact_types(text: bool, tables: bool, images: bool) -> List[str]:
+    """
+    Determine which artifact types to extract based on flags.
+
+    Args:
+        text: Whether to extract text
+        tables: Whether to extract tables
+        images: Whether to extract images
+
+    Returns:
+        List of artifact types to extract
+
+    Raises:
+        ValueError: If no artifact types are specified
+    """
+    to_extract = []
+    if text:
+        to_extract.append("text")
+    if tables:
+        to_extract.append("tables")
+    if images:
+        to_extract.append("images")
+    if not to_extract:
+        raise ValueError("No artifact types specified for extraction.")
+    return to_extract
+
+
+def get_target_files(target_path: str) -> List[Path]:
+    """
+    Get list of files to process.
+
+    Args:
+        target_path: Path to a file or directory
+
+    Returns:
+        List of Path objects to process
+
+    Raises:
+        FileNotFoundError: If the target path doesn't exist
+        ValueError: If the file type is unsupported
+    """
+    if not os.path.exists(target_path):
+        raise FileNotFoundError(f"Target path not found: {target_path}")
+
+    target_path = Path(target_path)
+    if target_path.is_file() and target_path.suffix not in SUPPORTED_FILE_TYPES:
+        raise ValueError(
+            f"Unsupported file type: {target_path.suffix}. Supported types: {SUPPORTED_FILE_TYPES}"
+        )
+
+    targets = (
+        [target_path]
+        if target_path.is_file()
+        else [f for f in target_path.iterdir() if f.suffix in SUPPORTED_FILE_TYPES]
+    )
+    logger.debug(f"Processing {len(targets)} files")
+    if not targets:
+        logger.warning(f"No supported files found in {target_path}")
+
+    return targets
+
+
+def process_files(
+    targets: List[Path], artifact_types: List[str]
+) -> List[Dict[str, Any]]:
+    """
+    Process files and extract artifacts.
+
+    Args:
+        targets: List of files to process
+        artifact_types: Types of artifacts to extract
+
+    Returns:
+        List of extracted artifacts
+    """
+    out_json = []
+    for target in targets:
+        try:
+            artifacts = ArtifactExtractor.from_pdf(target, artifact_types)
+            out_json.extend(artifacts)
+        except Exception as e:
+            logger.error(f"Failed to process {target}: {e}")
+    return out_json
+
+
+def save_results(
+    output_dir: Path,
+    data: List[Dict[str, Any]],
+    save_to_db: bool = False,
+    save_tables_as_csv: bool = False,
+    export_excel: bool = False,
+) -> None:
+    """
+    Save extraction results to a file and optionally to SQL and vector databases.
+
+    Args:
+        output_dir: Directory in which to write the JSON results and table exports
+        data: Extracted page data to save
+        save_to_db: Whether to save artifacts to the SQL database (and vector index, if configured)
+        save_tables_as_csv: Whether to save each extracted table as a CSV file
+        export_excel: Whether to combine the table CSVs into a single Excel workbook
+    """
+    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # Save to JSON file
+    try:
+        output_path = output_dir / f"artifacts_{timestamp}.json"
+        json_content = json.dumps(data, indent=2)
+        output_path.write_text(json_content)
+        logger.info(f"Extracted artifacts written to {output_path}")
+    except Exception as e:
+        logger.error(f"Failed to write output file: {e}")
+
+    if save_tables_as_csv or export_excel:
+        tables = sum((x.get("artifacts", {}).get("tables", []) for x in data), [])
+        for tab in tables:
+            # llm: convert each table to a csv string
+            csv_string, filename = json_to_csv(tab)
+            outfile = output_dir / f"tables_{timestamp}" / filename
+            outfile.parent.mkdir(parents=True, exist_ok=True)
+            outfile.write_text(csv_string)
+            logger.info(f"Extracted table written to {outfile}")
+
+        if export_excel:
+            excel_path = output_dir / f"tables_{timestamp}.xlsx"
+            export_csvs_to_excel_tabs(output_dir / f"tables_{timestamp}", excel_path)
+
+    # Save to SQL and vector databases
+    if save_to_db:
+        # Get database paths from config
+        sql_db_path = config.get("database", {}).get("sql_db_path", None)
+        vector_db_path = config.get("database", {}).get("vector_db_path", None)
+        assert (
+            sql_db_path is not None
+        ), "Save to SQL failed; SQL database path not found in config"
+
+        # Save to SQL and optionally to vector database
+        counts = flatten_json_to_sql(str(output_path), sql_db_path, vector_db_path)
+        logger.info(
+            f"Extracted {counts.get('text', 0)} text artifacts, {counts.get('image', 0)} image artifacts, and {counts.get('table', 0)} table artifacts from {len(data)} pages."
+        )
+        logger.info(f"Extracted artifacts saved to SQL database: {sql_db_path}")
+        if vector_db_path:
+            logger.info(
+                f"Extracted artifacts indexed in vector database: {vector_db_path}"
+            )
+
+
+def main(
+    target_path: str,
+    text: bool = True,
+    tables: bool = False,
+    images: bool = False,
+    save_to_db: bool = False,
+    save_tables_as_csv: bool = False,
+    export_excel: bool = False,
+) -> None:
+    """
+    Extract artifacts from PDF files and optionally save to SQL and vector databases.
+
+    Args:
+        target_path: Path to a PDF file or directory containing PDF files
+        text: Whether to extract text
+        tables: Whether to extract tables
+        images: Whether to extract images
+        save_to_db: Whether to save extracted artifacts to the SQL database (and vector index, if configured)
+        save_tables_as_csv: Whether to save extracted tables as CSV files
+        export_excel: Whether to combine the table CSVs into a single Excel workbook
+
+    Raises:
+        ValueError: If no artifact types are specified or the file type is unsupported
+        FileNotFoundError: If the target path doesn't exist
+    """
+    # Get artifact types to extract
+    artifact_types = get_artifact_types(text, tables, images)
+
+    # Get files to process
+    targets = get_target_files(target_path)
+    if not targets:
+        return
+
+    # Process files
+    results = process_files(targets, artifact_types)
+
+    # Save results
+    target_path = Path(target_path)
+    output_dir = target_path.parent / "extracted"
+    save_results(
+        output_dir,
+        results,
+        save_to_db=save_to_db,
+        save_tables_as_csv=save_tables_as_csv,
+        export_excel=export_excel,
+    )
+
+
+if __name__ == "__main__":
+    fire.Fire(main)
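Since the CLI is generated by `fire.Fire(main)`, each keyword argument above doubles as a command-line flag. A minimal usage sketch, assuming the module is named `structured_extraction.py` and that a sample `reports/q3_report.pdf` exists (both are placeholders):

```python
# Hypothetical direct call; the CLI equivalent would be roughly:
#   python structured_extraction.py reports/q3_report.pdf --tables --save_to_db
from structured_extraction import main  # assumed module name

main(
    target_path="reports/q3_report.pdf",  # placeholder input
    text=True,
    tables=True,
    images=False,
    save_to_db=True,  # also indexes in the vector DB if configured
    save_tables_as_csv=True,
    export_excel=False,
)
```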

+ 68 - 0
end-to-end-use-cases/structured_parser/src/typedicts.py

@@ -0,0 +1,68 @@
+from typing import Any, Dict, List, Optional, TypedDict, Union
+
+from vllm import SamplingParams
+
+
+class MessageContent(TypedDict, total=False):
+    """Type definition for message content in LLM requests."""
+
+    type: str
+    text: Optional[str]
+    image_url: Optional[Dict[str, str]]
+
+
+class Message(TypedDict):
+    """Type definition for a message in a LLM inference request."""
+
+    role: str
+    content: Union[str, List[MessageContent]]
+
+
+class InferenceRequest(TypedDict, total=False):
+    """Type definition for LLM inference request."""
+
+    model: str
+    messages: List[Message]
+    temperature: float
+    top_p: float
+    max_completion_tokens: int
+    seed: int
+    response_format: Optional[Dict[str, Any]]
+
+
+class VLLMInferenceRequest(TypedDict):
+    """Type definition for VLLM inference request format."""
+
+    messages: List[List[Message]]
+    sampling_params: Union[SamplingParams, List[SamplingParams]]
+
+
+class TextArtifact(TypedDict, total=False):
+    content: str
+    notes: Optional[str]
+
+
+class ImageArtifact(TypedDict, total=False):
+    description: str
+    caption: str
+    image_type: str
+    position_top: Optional[str]
+    position_left: Optional[str]
+
+
+class TableArtifact(TypedDict, total=False):
+    table_contents: Dict[str, Any]
+    table_info: str
+
+
+class ArtifactCollection(TypedDict, total=False):
+    text: TextArtifact
+    images: List[ImageArtifact]
+    tables: List[TableArtifact]
+
+
+class ExtractedPage(TypedDict):
+    doc_path: str
+    image_path: str
+    page_num: int
+    artifacts: ArtifactCollection
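To illustrate how these types compose, here is a hedged sketch of a single multimodal request built by hand; the base64 payload and sampling values are placeholders:

```python
# Illustrative only: one user turn carrying an image plus a text instruction.
from typedicts import InferenceRequest, Message, MessageContent

user_turn: Message = {
    "role": "user",
    "content": [
        MessageContent(
            type="image_url",
            image_url={"url": "data:image/png;base64,<...>"},  # placeholder payload
        ),
        MessageContent(type="text", text="List every table on this page."),
    ],
}

request: InferenceRequest = {
    "messages": [user_turn],
    "temperature": 0.0,
    "top_p": 1.0,
    "max_completion_tokens": 1024,
    "seed": 42,
}
```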

+ 527 - 0
end-to-end-use-cases/structured_parser/src/utils.py

@@ -0,0 +1,527 @@
+"""
+Utility functions for structured data extraction.
+
+This module provides helper functions for working with JSON schemas, encoding images,
+extracting structured data from LLM responses, and logging.
+"""
+
+import ast
+import base64
+import json
+import logging
+import os
+import re
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Union
+
+import pandas as pd
+import pymupdf
+import yaml
+from openai import OpenAI
+
+from typedicts import InferenceRequest, VLLMInferenceRequest
+
+from vllm import LLM, SamplingParams
+from vllm.sampling_params import GuidedDecodingParams
+
+
+def setup_logger(logfile, verbose=False):
+    """
+    Configure a logger that writes DEBUG-level logs to `logfile`;
+    if `verbose` is True, logs are also echoed to the console.
+    """
+    # Create a logger
+    logger = logging.getLogger(__name__)
+    logger.setLevel(logging.DEBUG)
+
+    # Create a file handler
+    file_handler = logging.FileHandler(logfile)
+    file_handler.setLevel(logging.DEBUG)
+
+    # Create a formatter and set it for the file handler
+    formatter = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+    )
+    file_handler.setFormatter(formatter)
+
+    # Add the file handler to the logger
+    logger.addHandler(file_handler)
+
+    # If verbose, also add a console handler
+    if verbose:
+        console_handler = logging.StreamHandler()
+        console_handler.setLevel(logging.DEBUG)
+        console_handler.setFormatter(formatter)
+        logger.addHandler(console_handler)
+
+    return logger
+
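A short usage sketch (the log file name is arbitrary):

```python
# Illustrative: DEBUG-level logs go to the file; verbose=True mirrors them to the console.
from utils import setup_logger

run_logger = setup_logger("structured_parser.log", verbose=True)
run_logger.info("Starting extraction run")
```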
+
+logger = logging.getLogger(__name__)
+
+# Compile regex patterns once for better performance
+JSON_BLOCK_OPEN = re.compile(r"```json")
+JSON_BLOCK_CLOSE = re.compile(r"}\s*```")
+
+
+# Configuration management
+def load_config(config_path: Optional[str] = None) -> Dict[str, Any]:
+    """
+    Load configuration from YAML file.
+
+    Args:
+        config_path: Path to the configuration file. If None, uses default path.
+
+    Returns:
+        Dict containing configuration values
+
+    Raises:
+        FileNotFoundError: If the configuration file doesn't exist
+        yaml.YAMLError: If the configuration file is invalid
+    """
+    if config_path is None:
+        config_path = os.path.join(os.path.dirname(__file__), "config.yaml")
+
+    try:
+        with open(config_path, "r") as f:
+            return yaml.safe_load(f)
+    except FileNotFoundError:
+        logger.error(f"Configuration file not found: {config_path}")
+        raise
+    except yaml.YAMLError as e:
+        logger.error(f"Invalid YAML in configuration file: {e}")
+        raise
+
+
+# Load configuration
+config = load_config()
+
+
+# LLM Singleton
+class LLMSingleton:
+    """Singleton class for managing LLM instances."""
+
+    _instance = None
+
+    @classmethod
+    def get_instance(cls) -> LLM:
+        """
+        Get or create the LLM instance.
+
+        Returns:
+            LLM: An initialized VLLM model instance
+        """
+        if cls._instance is None:
+            try:
+                cls._instance = LLM(
+                    config["model"]["path"],
+                    tensor_parallel_size=config["model"]["tensor_parallel_size"],
+                    max_model_len=config["model"]["max_model_len"],
+                    max_num_seqs=config["model"]["max_num_seqs"],
+                )
+                logger.info(f"Initialized LLM with model: {config['model']['path']}")
+            except Exception as e:
+                logger.error(f"Failed to initialize LLM: {e}")
+                raise
+        return cls._instance
+
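The singleton is what keeps repeated extraction calls from re-loading model weights; a minimal sketch of the expected behaviour:

```python
# Illustrative: both calls return the same vLLM engine; weights load only once.
from utils import LLMSingleton

llm_a = LLMSingleton.get_instance()
llm_b = LLMSingleton.get_instance()
assert llm_a is llm_b
```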
+
+class ImageUtils:
+    """Utility functions for working with images."""
+
+    @staticmethod
+    def encode_image(image_path: Union[Path, str]) -> str:
+        """
+        Encode an image to base64.
+
+        Args:
+            image_path: Path to the image file
+
+        Returns:
+            Base64-encoded string representation of the image
+
+        Raises:
+            FileNotFoundError: If the image file doesn't exist
+        """
+        if isinstance(image_path, str):
+            image_path = Path(image_path)
+        try:
+            return base64.b64encode(image_path.read_bytes()).decode("utf-8")
+        except FileNotFoundError:
+            logger.error(f"Image file not found: {image_path}")
+            raise
+
+
+class JSONUtils:
+    """Utility functions for working with JSON data."""
+
+    @staticmethod
+    def extract_json_blocks(content: str) -> List[str]:
+        """
+        Extract JSON code blocks from markdown-formatted text.
+
+        Parses a string containing markdown-formatted text and extracts all JSON blocks
+        that are enclosed in ```json ... ``` code blocks. This is useful for extracting
+        structured data from LLM responses.
+
+        Args:
+            content: The markdown-formatted text containing JSON code blocks
+
+        Returns:
+            List[str]: A list of extracted JSON strings (without the markdown delimiters)
+        """
+        block_spans = []
+        str_ptr = 0
+
+        while str_ptr < len(content):
+            start_ix = content.find("```json", str_ptr)
+            if start_ix == -1:
+                break
+            start_ix += len("```json")
+            end_match = JSON_BLOCK_CLOSE.search(content[start_ix:])
+            if end_match:
+                end_ix = start_ix + end_match.start() + 1
+            else:
+                end_ix = len(content)  # no closing tag, take the rest of the string
+            block_spans.append((start_ix, end_ix))
+            str_ptr = end_ix + 1
+
+        return [content[start:end].strip() for start, end in block_spans]
+
+    @staticmethod
+    def load_json_from_str(json_str: str) -> Dict[str, Any]:
+        """
+        Parse a JSON string into a Python dictionary.
+
+        Attempts to parse a string as JSON using multiple methods. First tries standard
+        json.loads(), then falls back to ast.literal_eval() if that fails. This provides
+        more robust JSON parsing for LLM outputs that might not be perfectly formatted.
+
+        Args:
+            json_str: The JSON string to parse
+
+        Returns:
+            Dict[str, Any]: The parsed JSON as a dictionary
+
+        Raises:
+            ValueError: If parsing fails
+        """
+        if not isinstance(json_str, str):
+            return json_str
+
+        try:
+            return json.loads(json_str)
+        except json.decoder.JSONDecodeError:
+            # Fall back to Python-literal parsing (handles single quotes, None, etc.)
+            json_str = json_str.replace("null", "None")
+            try:
+                return ast.literal_eval(json_str)
+            except (ValueError, SyntaxError):
+                raise ValueError(f"Failed to load valid JSON from string: {json_str}")
+
+    @staticmethod
+    def extract_json_from_response(content: str) -> Dict[str, Any]:
+        """
+        Extract and parse JSON from an LLM response.
+
+        Processes a response from an LLM that may contain JSON in a markdown code block.
+        First checks if the response contains markdown-formatted JSON blocks and extracts them,
+        then parses the JSON string into a Python dictionary.
+
+        Args:
+            content: The LLM response text that may contain JSON
+
+        Returns:
+            Dict[str, Any]: The parsed JSON as a dictionary
+
+        Raises:
+            ValueError: If extraction or parsing fails
+        """
+        try:
+            if "```json" in content:
+                json_blocks = JSONUtils.extract_json_blocks(content)
+                if not json_blocks:
+                    raise ValueError("No JSON blocks found in response")
+                content = json_blocks[-1]
+
+            return JSONUtils.load_json_from_str(content)
+        except Exception as e:
+            raise ValueError(f"Failed to extract JSON from response: {str(e)}")
+
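For example, a fenced reply like the one below parses straight into a dictionary (the response text is invented for illustration; the fence is assembled at runtime only to avoid nesting backticks in this snippet):

```python
# Illustrative: recover the JSON object from a markdown-fenced LLM reply.
from utils import JSONUtils

fence = "`" * 3
reply = (
    "Here is the extraction:\n"
    + fence + "json\n"
    + '{"title": "Quarterly results", "page": 3}\n'
    + fence
)
parsed = JSONUtils.extract_json_from_response(reply)
assert parsed == {"title": "Quarterly results", "page": 3}
```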
+    @staticmethod
+    def make_all_fields_required(schema: Dict[str, Any]) -> None:
+        """
+        Make all fields in a JSON schema required.
+
+        Recursively modifies the JSON schema in-place, so that every property in each 'properties'
+        is added to the 'required' list at that schema level. This ensures that the LLM will
+        attempt to extract all fields defined in the schema.
+
+        Args:
+            schema: The JSON schema to modify
+        """
+
+        def _process_schema_node(subschema):
+            """Process a single node in the schema."""
+            if not isinstance(subschema, dict):
+                return
+
+            schema_type = subschema.get("type")
+            if schema_type == "object" or (
+                isinstance(schema_type, list) and "object" in schema_type
+            ):
+                props = subschema.get("properties")
+                if isinstance(props, dict):
+                    subschema["required"] = list(props.keys())
+
+            # Recurse into sub-schemas
+            for key in ("properties", "definitions", "patternProperties"):
+                children = subschema.get(key)
+                if isinstance(children, dict):
+                    for v in children.values():
+                        _process_schema_node(v)
+
+            # Recurse into schema arrays
+            for key in ("allOf", "anyOf", "oneOf"):
+                children = subschema.get(key)
+                if isinstance(children, list):
+                    for v in children:
+                        _process_schema_node(v)
+
+            # 'items' can be a schema or list of schemas
+            items = subschema.get("items")
+            if isinstance(items, dict):
+                _process_schema_node(items)
+            elif isinstance(items, list):
+                for v in items:
+                    _process_schema_node(v)
+
+            # Extras: 'not', 'if', 'then', 'else'
+            for key in ["not", "if", "then", "else"]:
+                sub = subschema.get(key)
+                if isinstance(sub, dict):
+                    _process_schema_node(sub)
+
+        _process_schema_node(schema)
+
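A before-and-after sketch on a toy schema, showing that nested objects and array items are also made required:

```python
# Illustrative: every 'properties' block gains a matching 'required' list in place.
from utils import JSONUtils

schema = {
    "type": "object",
    "properties": {
        "title": {"type": "string"},
        "rows": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "label": {"type": "string"},
                    "value": {"type": "number"},
                },
            },
        },
    },
}
JSONUtils.make_all_fields_required(schema)
assert schema["required"] == ["title", "rows"]
assert schema["properties"]["rows"]["items"]["required"] == ["label", "value"]
```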
+
+class PDFUtils:
+    """Utility functions for working with PDF files."""
+
+    @staticmethod
+    def extract_pages(
+        pdf_path: Union[str, Path], output_dir: Optional[Union[str, Path]] = None
+    ) -> List[Dict[str, Any]]:
+        """
+        Extract pages from a PDF file as images to disk.
+
+        Args:
+            pdf_path: Path to the PDF file
+            output_dir: Directory to save extracted images (defaults to /tmp/pdf_images)
+
+        Returns:
+            List of dictionaries containing doc_path, image_path, and page_num
+
+        Raises:
+            FileNotFoundError: If the PDF file doesn't exist
+        """
+        if isinstance(pdf_path, str):
+            pdf_path = Path(pdf_path)
+
+        if not pdf_path.exists():
+            logger.error(f"PDF file not found: {pdf_path}")
+            raise FileNotFoundError(f"PDF file not found: {pdf_path}")
+
+        stem = pdf_path.stem
+        if output_dir is None:
+            output_dir = Path("/tmp/pdf_images")
+        elif isinstance(output_dir, str):
+            output_dir = Path(output_dir)
+
+        output_dir.mkdir(exist_ok=True, parents=True)
+        pages = []
+
+        try:
+            pdf_document = pymupdf.open(pdf_path)
+            for page_num, page in enumerate(pdf_document):
+                image_path = output_dir / f"{stem}_{page_num}.png"
+                pix = page.get_pixmap(dpi=100)
+                pix.save(str(image_path))
+
+                pages.append(
+                    {
+                        "doc_path": str(pdf_path),
+                        "image_path": str(image_path),
+                        "page_num": page_num,
+                    }
+                )
+            return pages
+        except Exception as e:
+            logger.error(f"Failed to extract pages from PDF: {e}")
+            raise
+
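For instance, given a hypothetical `report.pdf`, this renders each page to `/tmp/pdf_images` and returns one record per page:

```python
# Illustrative: each record carries the source PDF, the rendered PNG, and the page index.
from utils import PDFUtils

pages = PDFUtils.extract_pages("report.pdf")  # placeholder path
for page in pages:
    print(page["page_num"], page["image_path"])
```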
+
+class InferenceUtils:
+    """Utility functions for running inference with LLMs."""
+
+    @staticmethod
+    def get_offline_llm() -> LLM:
+        """
+        Initialize and return a local LLM instance using the singleton pattern.
+
+        Returns:
+            LLM: An initialized VLLM model instance
+        """
+        return LLMSingleton.get_instance()
+
+    @staticmethod
+    def make_vllm_batch(
+        request_params_batch: Union[InferenceRequest, List[InferenceRequest]],
+    ) -> VLLMInferenceRequest:
+        """
+        Convert one or more inference requests to VLLM batch format.
+
+        Args:
+            request_params_batch: Single request parameters or a list of request parameters
+
+        Returns:
+            VLLMInferenceRequest: Formatted request for VLLM
+        """
+        if isinstance(request_params_batch, dict):
+            request_params_batch = [request_params_batch]
+
+        sampling_params = []
+        messages = []
+        for req in request_params_batch:
+            params = {
+                "top_p": req["top_p"],
+                "temperature": req["temperature"],
+                "max_tokens": req["max_completion_tokens"],
+                "seed": req["seed"],
+            }
+            if "response_format" in req:
+                gd_params = GuidedDecodingParams(
+                    json=req["response_format"]["json_schema"]["schema"]
+                )
+                sampling_params.append(
+                    SamplingParams(guided_decoding=gd_params, **params)
+                )
+            else:
+                sampling_params.append(SamplingParams(**params))
+            messages.append(req["messages"])
+
+        return {"messages": messages, "sampling_params": sampling_params}
+
+    @staticmethod
+    def run_vllm_inference(
+        vllm_request: VLLMInferenceRequest,
+    ) -> List[str]:
+        """
+        Run inference on a batch of requests using the local LLM.
+
+        This function processes one or more requests through the local LLM,
+        handling the conversion to VLLM format and extracting the raw text
+        responses.
+
+        Args:
+            vllm_request: Formatted request for VLLM
+
+        Returns:
+            List[str]: Raw text responses from the LLM for each request in the batch
+        """
+        try:
+            local_llm = InferenceUtils.get_offline_llm()
+            out = local_llm.chat(
+                vllm_request["messages"], vllm_request["sampling_params"], use_tqdm=True
+            )
+            raw_responses = [r.outputs[0].text for r in out]
+            return raw_responses
+        except Exception as e:
+            logger.error(f"VLLM inference failed: {e}")
+            raise
+
+    @staticmethod
+    def run_openai_inference(request: InferenceRequest) -> str:
+        """
+        Run inference using OpenAI-compatible API.
+
+        Args:
+            request: Inference request parameters
+
+        Returns:
+            str: Model response text
+        """
+        try:
+            client = OpenAI(
+                base_url=config["model"]["base_url"], api_key=config["model"]["api_key"]
+            )
+            model_id = (
+                config["model"].get("model_id") or client.models.list().data[0].id
+            )
+            r = client.chat.completions.create(model=model_id, **request)
+            return r.choices[0].message.content
+        except Exception as e:
+            logger.error(f"OpenAI inference failed: {e}")
+            raise
+
+    @staticmethod
+    def request_builder(
+        user_prompt: str,
+        system_prompt: Optional[str] = None,
+        img_path: Optional[str] = None,
+        use_json_decoding: bool = False,
+        output_schema: Optional[Dict[str, Any]] = None,
+        **kwargs,
+    ) -> InferenceRequest:
+        """
+        Build an InferenceRequest from a prompt, an optional image, and sampling kwargs.
+
+        When `use_json_decoding` is True, `output_schema` is attached as the
+        guided-decoding response format. Extra kwargs (temperature, top_p,
+        max_completion_tokens, seed, ...) are passed through unchanged.
+        """
+        request = kwargs
+
+        msgs = []
+        if system_prompt:
+            msgs.append({"role": "system", "content": system_prompt})
+
+        user_content = []
+        if img_path:
+            if not os.path.exists(img_path):
+                raise FileNotFoundError(f"Image file not found: {img_path}")
+            img_b64 = ImageUtils.encode_image(img_path)
+            user_content.append(
+                {
+                    "type": "image_url",
+                    "image_url": {"url": f"data:image/png;base64,{img_b64}"},
+                }
+            )
+        user_content.append({"type": "text", "text": user_prompt})
+        msgs.append({"role": "user", "content": user_content})
+        request["messages"] = msgs
+
+        if use_json_decoding:
+            request["response_format"] = {
+                "type": "json_schema",
+                "json_schema": {"name": "OutputSchema", "schema": output_schema},
+            }
+
+        return request
+
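Putting the pieces together, a hedged sketch of one schema-guided call through the local vLLM path; the page image, schema, and sampling values are placeholders:

```python
# Illustrative end-to-end call: build a request, batch it, run it, parse the output.
from utils import InferenceUtils, JSONUtils

output_schema = {
    "type": "object",
    "properties": {"summary": {"type": "string"}},
}
JSONUtils.make_all_fields_required(output_schema)

request = InferenceUtils.request_builder(
    user_prompt="Summarise this page in one sentence.",
    system_prompt="You extract structured data from document pages.",
    img_path="/tmp/pdf_images/report_0.png",  # e.g. produced by PDFUtils.extract_pages
    use_json_decoding=True,
    output_schema=output_schema,
    temperature=0.0,
    top_p=1.0,
    max_completion_tokens=512,
    seed=42,
)
batch = InferenceUtils.make_vllm_batch(request)
raw_response = InferenceUtils.run_vllm_inference(batch)[0]
summary = JSONUtils.extract_json_from_response(raw_response)
```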
+
+def export_csvs_to_excel_tabs(csv_folder_path: str, output_excel_path: str) -> None:
+    """
+    Exports multiple CSV files from a specified folder into a single Excel
+    workbook, with each CSV appearing as a separate tab (sheet).
+
+    Args:
+        csv_folder_path (str): The path to the folder containing the CSV files.
+        output_excel_path (str): The desired path for the output Excel file.
+    """
+    try:
+        # Create an ExcelWriter object
+        with pd.ExcelWriter(output_excel_path, engine="xlsxwriter") as writer:
+            # Iterate through the CSV files in a stable (sorted) order
+            for filename in sorted(os.listdir(csv_folder_path)):
+                if filename.endswith(".csv"):
+                    csv_file_path = os.path.join(csv_folder_path, filename)
+                    # Excel sheet names are limited to 31 characters
+                    sheet_name = os.path.splitext(filename)[0][:31]
+
+                    # Read the CSV file into a pandas DataFrame
+                    df = pd.read_csv(csv_file_path)
+
+                    # Write the DataFrame to a new sheet in the Excel file
+                    df.to_excel(writer, sheet_name=sheet_name, index=False)
+
+        logger.info(f"Successfully exported CSV files to '{output_excel_path}'")
+
+    except Exception as e:
+        logger.error(f"Failed to export CSV files to Excel: {e}")