- """
- JSON to SQL conversion module for structured document data.
- This module provides functionality to convert structured JSON data extracted from
- documents into SQLite database entries and vector embeddings for semantic search.
- """
- import json
- import logging
- import os
- import sqlite3
- from typing import Any, Dict, List, Optional, Tuple
- import chromadb
- import pandas as pd
- from chromadb.config import Settings
- from utils import config, InferenceUtils, JSONUtils
def setup_logger(logfile, verbose=False):
    # Create a logger
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # Create a file handler
    file_handler = logging.FileHandler(logfile)
    file_handler.setLevel(logging.DEBUG)

    # Create a formatter and set it for the file handler
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    file_handler.setFormatter(formatter)

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    # If verbose, also add a console handler
    if verbose:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.DEBUG)
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    return logger


logger = logging.getLogger(__name__)
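
# A minimal usage sketch for setup_logger (the log path is hypothetical): it
# returns the module logger configured with a DEBUG-level file handler, plus a
# console handler when verbose=True.
#
#     logger = setup_logger("structured_parser.log", verbose=True)
#     logger.info("pipeline started")
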
class DatabaseManager:
    """Manager for database operations."""

    @staticmethod
    def validate_path(path: str) -> str:
        """
        Validate that a file path exists or can be created.

        Args:
            path: Path to validate

        Returns:
            Validated path

        Raises:
            ValueError: If path is invalid
        """
        if not path:
            raise ValueError("Database path cannot be empty")

        # Ensure the parent directory exists, creating it if necessary
        directory = os.path.dirname(path)
        if directory and not os.path.exists(directory):
            try:
                os.makedirs(directory, exist_ok=True)
            except OSError as e:
                raise ValueError(f"Cannot create directory for database: {e}")
        return path

    @staticmethod
    def create_artifact_table(sql_db_path: str) -> None:
        """
        Create the SQL table schema for storing document artifacts.

        Note: any existing document_artifacts table is dropped first.

        Args:
            sql_db_path: Path to the SQLite database file

        Raises:
            sqlite3.Error: If there's an error creating the table
        """
        sql_db_path = DatabaseManager.validate_path(sql_db_path)
        try:
            with sqlite3.connect(sql_db_path) as conn:
                cursor = conn.cursor()

                # Drop table if it exists (destructive: existing rows are lost)
                cursor.execute("DROP TABLE IF EXISTS document_artifacts")

                # Create table with schema
                cursor.execute(
                    """
                    CREATE TABLE IF NOT EXISTS document_artifacts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        doc_path TEXT,
                        page_num INTEGER,
                        artifact_type TEXT,  -- 'table', 'text', or 'image'

                        -- Common metadata
                        content_json TEXT,   -- JSON string of the artifact content

                        -- Table-specific fields
                        table_info TEXT,

                        -- Text-specific fields
                        text_content TEXT,
                        text_notes TEXT,

                        -- Image-specific fields
                        image_position_top TEXT,
                        image_position_left TEXT,
                        image_description TEXT,
                        image_caption TEXT,
                        image_type TEXT
                    )
                    """
                )

                # Create indexes for common queries
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS idx_artifact_type ON document_artifacts(artifact_type)"
                )
                cursor.execute(
                    "CREATE INDEX IF NOT EXISTS idx_doc_path ON document_artifacts(doc_path)"
                )
                conn.commit()
        except sqlite3.Error as e:
            logger.error(f"Database error creating table: {e}")
            raise
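
    # Illustrative setup call for create_artifact_table (hypothetical path).
    # Because of the DROP TABLE above, this is a destructive reset:
    #
    #     DatabaseManager.create_artifact_table("./artifacts.db")
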
    @staticmethod
    def sql_query(db_path: str, query: str) -> pd.DataFrame:
        """
        Query the document artifacts table and return results as a DataFrame.

        Args:
            db_path: Path to the SQLite database file
            query: SQL query to execute

        Returns:
            DataFrame containing query results

        Raises:
            ValueError: If query is empty or invalid
            FileNotFoundError: If the database file doesn't exist
            sqlite3.Error: If there's a database error
        """
        if not query or not query.strip():
            raise ValueError("Query cannot be empty")
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"Database file not found: {db_path}")
        try:
            with sqlite3.connect(db_path) as conn:
                df = pd.read_sql_query(query, conn)
                return df
        except (sqlite3.Error, pd.io.sql.DatabaseError) as e:
            logger.error(f"Query error: {e}")
            raise
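
    # Illustrative sql_query call (hypothetical path): fetch all table
    # artifacts as a DataFrame. Only SELECT-style statements are meaningful
    # here, since results are materialized through pandas:
    #
    #     df = DatabaseManager.sql_query(
    #         "./artifacts.db",
    #         "SELECT id, page_num, table_info FROM document_artifacts "
    #         "WHERE artifact_type = 'table'",
    #     )
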
    @staticmethod
    def export_db(db_path: str, export_path: str) -> None:
        """
        Export the document artifacts table to a CSV file.

        Args:
            db_path: Path to the SQLite database file
            export_path: Path to the CSV file to export

        Raises:
            ValueError: If export path is invalid
            FileNotFoundError: If the database file doesn't exist
            sqlite3.Error: If there's a database error
        """
        if not export_path or not export_path.strip():
            raise ValueError("Export path cannot be empty")
        if not os.path.exists(db_path):
            raise FileNotFoundError(f"Database file not found: {db_path}")

        df = DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts")
        df.to_csv(export_path, index=False)


class VectorIndexManager:
    """Manager for vector index operations."""

    @staticmethod
    def write_to_index(
        vector_db_path: str, document_ids: List[str], document_contents: List[str]
    ) -> None:
        """
        Write document contents to a vector index for semantic search.

        Args:
            vector_db_path: Path to the vector database
            document_ids: List of document IDs
            document_contents: List of document contents to index

        Raises:
            ValueError: If inputs are invalid
            RuntimeError: If there's an error writing to the index
        """
        if not document_ids or not document_contents:
            logger.warning("No documents to index")
            return
        if len(document_ids) != len(document_contents):
            raise ValueError(
                "document_ids and document_contents must have the same length"
            )
        try:
            client = chromadb.PersistentClient(
                path=vector_db_path, settings=Settings(anonymized_telemetry=False)
            )
            collection = client.get_or_create_collection(name="structured_parser")
            collection.add(
                documents=document_contents,
                ids=document_ids,
            )
            logger.info(f"Added {len(document_ids)} documents to vector index")
        except Exception as e:
            logger.error(f"Error writing to vector index: {e}")
            raise RuntimeError(f"Failed to write to vector index: {e}") from e
    @staticmethod
    def knn_query(
        query_text: str, vector_db_path: str, n_results: int = 10
    ) -> pd.DataFrame:
        """
        Perform a semantic search query on the vector index.

        Args:
            query_text: Text to search for
            vector_db_path: Path to the vector database
            n_results: Number of results to return

        Returns:
            DataFrame containing query results

        Raises:
            ValueError: If query is empty
            FileNotFoundError: If vector database doesn't exist
            RuntimeError: If there's an error querying the index
        """
        if not query_text or not query_text.strip():
            raise ValueError("Query text cannot be empty")
        if not os.path.exists(vector_db_path):
            raise FileNotFoundError(f"Vector database not found: {vector_db_path}")
        try:
            client = chromadb.PersistentClient(
                path=vector_db_path, settings=Settings(anonymized_telemetry=False)
            )
            collection = client.get_collection(name="structured_parser")
            results = collection.query(query_texts=[query_text], n_results=n_results)

            # Flatten the first (and only) query's results into a DataFrame
            df = pd.DataFrame(
                {k: results[k][0] for k in ["ids", "distances", "documents"]}
            )
            # Ids were stored as stringified SQLite row ids; cast back to int
            df["ids"] = df["ids"].apply(int)
            return df
        except Exception as e:
            logger.error(f"Vector query error: {e}")
            raise RuntimeError(f"Failed to query vector index: {e}") from e

class SQLProcessor:
    """Processor for document data."""

    @staticmethod
    def process_text_artifact(
        cursor: sqlite3.Cursor, doc_path: str, page_num: int, text: Dict[str, Any]
    ) -> Optional[Tuple[str, str]]:
        """
        Process a text artifact and insert it into the database.

        Args:
            cursor: Database cursor
            doc_path: Document path
            page_num: Page number
            text: Text artifact data

        Returns:
            Tuple of (document_id, indexable_content) or None if no content
        """
        if not text or not text.get("content"):
            return None

        cursor.execute(
            """
            INSERT INTO document_artifacts (
                doc_path, page_num, artifact_type,
                text_content, text_notes
            ) VALUES (?, ?, ?, ?, ?)
            """,
            (
                doc_path,
                page_num,
                "text",
                text.get("content", ""),
                text.get("notes", ""),
            ),
        )

        # Prepare for vector indexing
        indexable_content = text.get("content", "") + " | " + text.get("notes", "")
        return str(cursor.lastrowid), indexable_content

    @staticmethod
    def process_image_artifact(
        cursor: sqlite3.Cursor, doc_path: str, page_num: int, image: Dict[str, Any]
    ) -> Optional[Tuple[str, str]]:
        """
        Process an image artifact and insert it into the database.

        Args:
            cursor: Database cursor
            doc_path: Document path
            page_num: Page number
            image: Image artifact data

        Returns:
            Tuple of (document_id, indexable_content) or None if the artifact
            is malformed
        """
        # Skip malformed image entries
        if not isinstance(image, dict):
            return None

        cursor.execute(
            """
            INSERT INTO document_artifacts (
                doc_path, page_num, artifact_type,
                image_position_top, image_position_left,
                image_description, image_caption, image_type
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                doc_path,
                page_num,
                "image",
                image.get("position_top", ""),
                image.get("position_left", ""),
                image.get("description", ""),
                image.get("caption", ""),
                image.get("image_type", ""),
            ),
        )

        # Prepare for vector indexing
        indexable_content = (
            image.get("image_type", "")
            + " | "
            + image.get("description", "")
            + " | "
            + image.get("caption", "")
        )
        return str(cursor.lastrowid), indexable_content

    @staticmethod
    def process_table_artifact(
        cursor: sqlite3.Cursor, doc_path: str, page_num: int, table: Dict[str, Any]
    ) -> Optional[Tuple[str, str]]:
        """
        Process a table artifact and insert it into the database.

        Args:
            cursor: Database cursor
            doc_path: Document path
            page_num: Page number
            table: Table artifact data

        Returns:
            Tuple of (document_id, indexable_content) or None if empty table
        """
        # Skip empty or malformed tables
        if not isinstance(table, dict):
            return None
        if not table.get("table_contents") and not table.get("table_info"):
            return None

        cursor.execute(
            """
            INSERT INTO document_artifacts (
                doc_path, page_num, artifact_type,
                content_json, table_info
            ) VALUES (?, ?, ?, ?, ?)
            """,
            (
                doc_path,
                page_num,
                "table",
                json.dumps(table.get("table_contents", {})),
                table.get("table_info", ""),
            ),
        )

        # Prepare for vector indexing: table description plus raw JSON contents
        indexable_content = (
            f'{table.get("table_info", "")}\n\n'
            f'```json {json.dumps(table.get("table_contents", {}))}```'
        )
        return str(cursor.lastrowid), indexable_content
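
# For reference, the processors above expect dict-shaped artifacts roughly
# like the following (field names taken from the .get() calls; a sketch of
# the expected input, not a guaranteed schema):
#
#     {"content": "body text...", "notes": "footnote"}                  # text
#     {"description": "...", "caption": "...", "image_type": "chart"}   # image
#     {"table_info": "Q3 revenue by region", "table_contents": {...}}   # table
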

def flatten_json_to_sql(
    json_path: str, sql_db_path: str, vector_db_path: Optional[str] = None
) -> Dict[str, int]:
    """
    Convert structured JSON data to SQL database entries and optionally index
    them in a vector database.

    Args:
        json_path: Path to the JSON file
        sql_db_path: Path to the SQLite database
        vector_db_path: Optional path to the vector database

    Returns:
        Dict mapping artifact type ('text', 'image', 'table') to the number of
        artifacts processed

    Raises:
        FileNotFoundError: If JSON file doesn't exist
        ValueError: If paths are invalid
        sqlite3.Error: If there's a database error
    """
    # Validate inputs
    if not os.path.exists(json_path):
        raise FileNotFoundError(f"JSON file not found: {json_path}")

    # (Re)create the SQL table; note this drops any existing artifacts
    DatabaseManager.create_artifact_table(sql_db_path)

    # Initialize buffers for vector indexing
    document_ids = []
    document_contents = []

    # Counts for logging
    counts = {}

    # Load JSON data
    try:
        with open(json_path, "r") as f:
            data = json.load(f)
    except json.JSONDecodeError as e:
        logger.error(f"Invalid JSON file: {e}")
        raise ValueError(f"Invalid JSON file: {e}") from e

    # Connect to the database
    try:
        with sqlite3.connect(sql_db_path) as conn:
            cursor = conn.cursor()

            # Process each page in the document
            for page in data:
                doc_path = page.get("doc_path", "")
                page_num = page.get("page_num", 0)
                artifacts = page.get("artifacts", {})

                # Process text
                for text in artifacts.get("text", []):
                    result = SQLProcessor.process_text_artifact(
                        cursor, doc_path, page_num, text
                    )
                    if result:
                        document_ids.append(result[0])
                        document_contents.append(result[1])
                        counts["text"] = counts.get("text", 0) + 1

                # Process images
                for image in artifacts.get("images", []):
                    result = SQLProcessor.process_image_artifact(
                        cursor, doc_path, page_num, image
                    )
                    if result:
                        document_ids.append(result[0])
                        document_contents.append(result[1])
                        counts["image"] = counts.get("image", 0) + 1

                # Process tables
                for table in artifacts.get("tables", []):
                    result = SQLProcessor.process_table_artifact(
                        cursor, doc_path, page_num, table
                    )
                    if result:
                        document_ids.append(result[0])
                        document_contents.append(result[1])
                        counts["table"] = counts.get("table", 0) + 1

            conn.commit()
    except sqlite3.Error as e:
        logger.error(f"Database error: {e}")
        raise

    # Write to vector index
    if vector_db_path and document_ids:
        VectorIndexManager.write_to_index(
            vector_db_path, document_ids, document_contents
        )

    return counts
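
# Illustrative end-to-end call (hypothetical paths): load extracted pages from
# JSON, populate SQLite, and index the same artifacts in Chroma:
#
#     counts = flatten_json_to_sql("./parsed.json", "./artifacts.db", "./vector_db")
#     print(counts)  # e.g. {"text": 12, "image": 3, "table": 5}
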

def json_to_csv(data: dict, info: str = "") -> Tuple[str, str]:
    """
    Convert JSON table data to a CSV string and a suggested filename via LLM
    inference.

    Args:
        data: JSON-formatted table data
        info: Optional string describing the contents of the table

    Returns:
        Tuple of (csv_table, filename)
    """
    system_prompt = """You are an expert at converting JSON data to flat csv tables.
You will receive 2 inputs:
1. JSON-formatted data of a table
2. A string describing the contents of the table.

I require 2 things from you:
1. A CSV string representation of the table
2. A succinct filename for this table based on the data contents.

You should only respond with a JSON, no preamble required. Your JSON response should follow this format:
{"csv_table": <str of table>, "filename": <filename to save table>}"""

    user_prompt = f"data:\n{json.dumps(data)}"
    if info:
        user_prompt += f"\n\ninfo:\n{info}"

    request = InferenceUtils.request_builder(
        user_prompt=user_prompt,
        system_prompt=system_prompt,
        temperature=0.4,
        max_completion_tokens=2048,
        top_p=0.9,
        seed=42,
    )

    backend = config["model"].get("backend")
    if backend == "offline-vllm":
        vllm_request_batch = InferenceUtils.make_vllm_batch(request)
        raw_response = InferenceUtils.run_vllm_inference(vllm_request_batch)[0]
    elif backend == "openai-compat":
        raw_response = InferenceUtils.run_openai_inference(request)
    else:
        # Guard against an unbound raw_response on unrecognized backends
        raise ValueError(f"Unsupported model backend: {backend}")

    json_response = JSONUtils.extract_json_from_response(raw_response)
    return json_response["csv_table"], json_response["filename"]
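
# Illustrative json_to_csv usage (assumes `config` points at a reachable model
# backend; the table data is a toy placeholder):
#
#     csv_table, filename = json_to_csv({"rows": [[1, 2]]}, info="toy table")
#     with open(filename, "w") as f:
#         f.write(csv_table)
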

def main(json_path, db_path, vector_db_path):
    """
    Example usage of the functions in this module.
    """
    try:
        # Process JSON and store in SQL
        flatten_json_to_sql(json_path, db_path, vector_db_path)

        # Example SQL queries
        print("All artifacts:")
        print(DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts"))

        print("\nTables only:")
        print(
            DatabaseManager.sql_query(
                db_path,
                "SELECT * FROM document_artifacts WHERE artifact_type = 'table'",
            )
        )

        # Example KNN query
        query = "What is the average revenue per day for Meta?"
        print("\nVector index query: ", query)
        vector_query = VectorIndexManager.knn_query(query, vector_db_path)
        print(vector_query)

        # KNN + SQL: join vector hits back to their SQL rows
        df = DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts")
        df = vector_query.merge(df, left_on="ids", right_on="id", how="inner")
        print("\nJoined results:")
        print(df)
    except Exception as e:
        print(f"Error in main: {e}")

if __name__ == "__main__":
    # main() requires three paths, so parse them from the command line rather
    # than calling it bare (which would raise a TypeError)
    import argparse

    parser = argparse.ArgumentParser(
        description="Convert structured JSON document data to SQL and vector indexes"
    )
    parser.add_argument("json_path", help="Path to the structured JSON file")
    parser.add_argument("db_path", help="Path to the SQLite database file")
    parser.add_argument("vector_db_path", help="Path to the vector database directory")
    args = parser.parse_args()

    main(args.json_path, args.db_path, args.vector_db_path)