# json_to_sql.py
"""
JSON to SQL conversion module for structured document data.

This module provides functionality to convert structured JSON data extracted from
documents into SQLite database entries and vector embeddings for semantic search.
"""
import json
import logging
import os
import sqlite3
import sys
from typing import Any, Dict, List, Optional, Tuple

import chromadb
import pandas as pd
from chromadb.config import Settings

from utils import config, InferenceUtils, JSONUtils
  15. def setup_logger(logfile, verbose=False):
  16. # Create a logger
  17. logger = logging.getLogger(__name__)
  18. logger.setLevel(logging.DEBUG)
  19. # Create a file handler
  20. file_handler = logging.FileHandler(logfile)
  21. file_handler.setLevel(logging.DEBUG)
  22. # Create a formatter and set it for the file handler
  23. formatter = logging.Formatter(
  24. "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
  25. )
  26. file_handler.setFormatter(formatter)
  27. # Add the file handler to the logger
  28. logger.addHandler(file_handler)
  29. # If verbose, also add a console handler
  30. if verbose:
  31. console_handler = logging.StreamHandler()
  32. console_handler.setLevel(logging.DEBUG)
  33. console_handler.setFormatter(formatter)
  34. logger.addHandler(console_handler)
  35. return logger
# Module-level logger; handlers are attached by setup_logger() when the
# application configures logging.
logger = logging.getLogger(__name__)
  37. class DatabaseManager:
  38. """Manager for database operations."""
  39. @staticmethod
  40. def validate_path(path: str) -> str:
  41. """
  42. Validate that a file path exists or can be created.
  43. Args:
  44. path: Path to validate
  45. Returns:
  46. Validated path
  47. Raises:
  48. ValueError: If path is invalid
  49. """
  50. if not path:
  51. raise ValueError("Database path cannot be empty")
  52. # Ensure directory exists
  53. directory = os.path.dirname(path)
  54. if directory and not os.path.exists(directory):
  55. try:
  56. os.makedirs(directory, exist_ok=True)
  57. except OSError as e:
  58. raise ValueError(f"Cannot create directory for database: {e}")
  59. return path
  60. @staticmethod
  61. def create_artifact_table(sql_db_path: str) -> None:
  62. """
  63. Create the SQL table schema for storing document artifacts.
  64. Args:
  65. sql_db_path: Path to the SQLite database file
  66. Raises:
  67. sqlite3.Error: If there's an error creating the table
  68. """
  69. sql_db_path = DatabaseManager.validate_path(sql_db_path)
  70. try:
  71. with sqlite3.connect(sql_db_path) as conn:
  72. cursor = conn.cursor()
  73. # Drop table if it exists
  74. cursor.execute("DROP TABLE IF EXISTS document_artifacts")
  75. # Create table with schema
  76. cursor.execute(
  77. """
  78. CREATE TABLE IF NOT EXISTS document_artifacts (
  79. id INTEGER PRIMARY KEY AUTOINCREMENT,
  80. doc_path TEXT,
  81. page_num INTEGER,
  82. artifact_type TEXT, -- 'table', 'text', or 'image'
  83. -- Common metadata
  84. content_json TEXT, -- JSON string of the artifact content
  85. -- Table specific fields
  86. table_info TEXT,
  87. -- Text specific fields
  88. text_content TEXT,
  89. text_notes TEXT,
  90. -- Image specific fields
  91. image_position_top TEXT,
  92. image_position_left TEXT,
  93. image_description TEXT,
  94. image_caption TEXT,
  95. image_type TEXT
  96. )
  97. """
  98. )
  99. # Create indexes for common queries
  100. cursor.execute(
  101. "CREATE INDEX IF NOT EXISTS idx_artifact_type ON document_artifacts(artifact_type)"
  102. )
  103. cursor.execute(
  104. "CREATE INDEX IF NOT EXISTS idx_doc_path ON document_artifacts(doc_path)"
  105. )
  106. conn.commit()
  107. except sqlite3.Error as e:
  108. print(f"Database error creating table: {e}")
  109. raise
  110. @staticmethod
  111. def sql_query(db_path: str, query: str) -> pd.DataFrame:
  112. """
  113. Query the document artifacts table and return results as a DataFrame.
  114. Args:
  115. db_path: Path to the SQLite database file
  116. query: SQL query to execute
  117. Returns:
  118. DataFrame containing query results
  119. Raises:
  120. ValueError: If query is empty or invalid
  121. sqlite3.Error: If there's a database error
  122. """
  123. if not query or not query.strip():
  124. raise ValueError("Query cannot be empty")
  125. if not os.path.exists(db_path):
  126. raise FileNotFoundError(f"Database file not found: {db_path}")
  127. try:
  128. with sqlite3.connect(db_path) as conn:
  129. df = pd.read_sql_query(query, conn)
  130. return df
  131. except (sqlite3.Error, pd.io.sql.DatabaseError) as e:
  132. print(f"Query error: {e}")
  133. raise
  134. @staticmethod
  135. def export_db(db_path: str, export_path: str) -> None:
  136. """
  137. Export the document artifacts table to a CSV file.
  138. Args:
  139. db_path: Path to the SQLite database file
  140. export_path: Path to the CSV file to export
  141. Raises:
  142. ValueError: If export path is invalid
  143. sqlite3.Error: If there's a database error
  144. """
  145. if not export_path or not export_path.strip():
  146. raise ValueError("Export path cannot be empty")
  147. if not os.path.exists(db_path):
  148. raise FileNotFoundError(f"Database file not found: {db_path}")
  149. df = DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts")
  150. df.to_csv(export_path, index=False)
  151. class VectorIndexManager:
  152. """Manager for vector index operations."""
  153. @staticmethod
  154. def write_to_index(
  155. vector_db_path: str, document_ids: List[str], document_contents: List[str]
  156. ) -> None:
  157. """
  158. Write document contents to a vector index for semantic search.
  159. Args:
  160. vector_db_path: Path to the vector database
  161. document_ids: List of document IDs
  162. document_contents: List of document contents to index
  163. Raises:
  164. ValueError: If inputs are invalid
  165. RuntimeError: If there's an error writing to the index
  166. """
  167. if not document_ids or not document_contents:
  168. print("No documents to index")
  169. return
  170. if len(document_ids) != len(document_contents):
  171. raise ValueError(
  172. "document_ids and document_contents must have the same length"
  173. )
  174. try:
  175. client = chromadb.PersistentClient(
  176. path=vector_db_path, settings=Settings(anonymized_telemetry=False)
  177. )
  178. collection = client.get_or_create_collection(name="structured_parser")
  179. collection.add(
  180. documents=document_contents,
  181. ids=document_ids,
  182. )
  183. logger.info(f"Added {len(document_ids)} documents to vector index")
  184. except Exception as e:
  185. print(f"Error writing to vector index: {e}")
  186. raise RuntimeError(f"Failed to write to vector index: {e}")
  187. @staticmethod
  188. def knn_query(
  189. query_text: str, vector_db_path: str, n_results: int = 10
  190. ) -> pd.DataFrame:
  191. """
  192. Perform a semantic search query on the vector index.
  193. Args:
  194. query_text: Text to search for
  195. vector_db_path: Path to the vector database
  196. n_results: Number of results to return
  197. Returns:
  198. DataFrame containing query results
  199. Raises:
  200. ValueError: If query is empty
  201. FileNotFoundError: If vector database doesn't exist
  202. RuntimeError: If there's an error querying the index
  203. """
  204. if not query_text or not query_text.strip():
  205. raise ValueError("Query text cannot be empty")
  206. if not os.path.exists(vector_db_path):
  207. raise FileNotFoundError(f"Vector database not found: {vector_db_path}")
  208. try:
  209. client = chromadb.PersistentClient(
  210. path=vector_db_path, settings=Settings(anonymized_telemetry=False)
  211. )
  212. collection = client.get_collection(name="structured_parser")
  213. results = collection.query(query_texts=[query_text], n_results=n_results)
  214. df = pd.DataFrame(
  215. {k: results[k][0] for k in ["ids", "distances", "documents"]}
  216. )
  217. df["ids"] = df["ids"].apply(int)
  218. return df
  219. except Exception as e:
  220. print(f"Vector query error: {e}")
  221. raise RuntimeError(f"Failed to query vector index: {e}")
  222. class SQLProcessor:
  223. """Processor for document data."""
  224. @staticmethod
  225. def process_text_artifact(
  226. cursor: sqlite3.Cursor, doc_path: str, page_num: int, text: Dict[str, Any]
  227. ) -> Optional[Tuple[str, str]]:
  228. """
  229. Process a text artifact and insert it into the database.
  230. Args:
  231. cursor: Database cursor
  232. doc_path: Document path
  233. page_num: Page number
  234. text: Text artifact data
  235. Returns:
  236. Tuple of (document_id, indexable_content) or None if no content
  237. """
  238. if not text or not text.get("content"):
  239. return None
  240. cursor.execute(
  241. """
  242. INSERT INTO document_artifacts (
  243. doc_path, page_num, artifact_type,
  244. text_content, text_notes
  245. ) VALUES (?, ?, ?, ?, ?)
  246. """,
  247. (
  248. doc_path,
  249. page_num,
  250. "text",
  251. text.get("content", ""),
  252. text.get("notes", ""),
  253. ),
  254. )
  255. # Prepare for vector indexing
  256. indexable_content = text.get("content", "") + " | " + text.get("notes", "")
  257. return str(cursor.lastrowid), indexable_content
  258. @staticmethod
  259. def process_image_artifact(
  260. cursor: sqlite3.Cursor, doc_path: str, page_num: int, image: Dict[str, Any]
  261. ) -> Tuple[str, str]:
  262. """
  263. Process an image artifact and insert it into the database.
  264. Args:
  265. cursor: Database cursor
  266. doc_path: Document path
  267. page_num: Page number
  268. image: Image artifact data
  269. Returns:
  270. Tuple of (document_id, indexable_content)
  271. """
  272. # Skip empty tables
  273. if not isinstance(image, dict):
  274. return None
  275. cursor.execute(
  276. """
  277. INSERT INTO document_artifacts (
  278. doc_path, page_num, artifact_type,
  279. image_position_top, image_position_left,
  280. image_description, image_caption, image_type
  281. ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
  282. """,
  283. (
  284. doc_path,
  285. page_num,
  286. "image",
  287. image.get("position_top", ""),
  288. image.get("position_left", ""),
  289. image.get("description", ""),
  290. image.get("caption", ""),
  291. image.get("image_type", ""),
  292. ),
  293. )
  294. # Prepare for vector indexing
  295. indexable_content = (
  296. image.get("image_type", "")
  297. + " | "
  298. + image.get("description", "")
  299. + " | "
  300. + image.get("caption", "")
  301. )
  302. return str(cursor.lastrowid), indexable_content
  303. @staticmethod
  304. def process_table_artifact(
  305. cursor: sqlite3.Cursor, doc_path: str, page_num: int, table: Dict[str, Any]
  306. ) -> Optional[Tuple[str, str]]:
  307. """
  308. Process a table artifact and insert it into the database.
  309. Args:
  310. cursor: Database cursor
  311. doc_path: Document path
  312. page_num: Page number
  313. table: Table artifact data
  314. Returns:
  315. Tuple of (document_id, indexable_content) or None if empty table
  316. """
  317. # Skip empty tables
  318. if not isinstance(table, dict):
  319. return None
  320. if not table.get("table_contents") and not table.get("table_info"):
  321. return None
  322. cursor.execute(
  323. """
  324. INSERT INTO document_artifacts (
  325. doc_path, page_num, artifact_type,
  326. content_json, table_info
  327. ) VALUES (?, ?, ?, ?, ?)
  328. """,
  329. (
  330. doc_path,
  331. page_num,
  332. "table",
  333. json.dumps(table.get("table_contents", {})),
  334. table.get("table_info", ""),
  335. ),
  336. )
  337. # Prepare for vector indexing
  338. indexable_content = f"""{table.get("table_info", "")}\n\n```json {json.dumps(table.get("table_contents", {}))}```
  339. """
  340. return str(cursor.lastrowid), indexable_content
  341. def flatten_json_to_sql(
  342. json_path: str, sql_db_path: str, vector_db_path: Optional[str] = None
  343. ) -> None:
  344. """
  345. Convert structured JSON data to SQL database entries and optionally index in a vector database.
  346. Args:
  347. json_path: Path to the JSON file
  348. sql_db_path: Path to the SQLite database
  349. vector_db_path: Optional path to the vector database
  350. Raises:
  351. FileNotFoundError: If JSON file doesn't exist
  352. ValueError: If paths are invalid
  353. sqlite3.Error: If there's a database error
  354. """
  355. # Validate inputs
  356. if not os.path.exists(json_path):
  357. raise FileNotFoundError(f"JSON file not found: {json_path}")
  358. # Create the SQL table if it doesn't exist
  359. DatabaseManager.create_artifact_table(sql_db_path)
  360. # Initialize buffers for vector indexing
  361. document_ids = []
  362. document_contents = []
  363. # Counts for logging
  364. counts = {}
  365. # Load JSON data
  366. try:
  367. with open(json_path, "r") as f:
  368. data = json.load(f)
  369. except json.JSONDecodeError as e:
  370. print(f"Invalid JSON file: {e}")
  371. raise ValueError(f"Invalid JSON file: {e}")
  372. # Connect to the database
  373. try:
  374. with sqlite3.connect(sql_db_path) as conn:
  375. cursor = conn.cursor()
  376. # Process each page in the document
  377. for page in data:
  378. doc_path = page.get("doc_path", "")
  379. page_num = page.get("page_num", 0)
  380. artifacts = page.get("artifacts", {})
  381. # Process text
  382. for text in artifacts.get("text", []):
  383. result = SQLProcessor.process_text_artifact(
  384. cursor, doc_path, page_num, text
  385. )
  386. if result:
  387. document_ids.append(result[0])
  388. document_contents.append(result[1])
  389. counts["text"] = counts.get("text", 0) + 1
  390. # Process images
  391. for image in artifacts.get("images", []):
  392. result = SQLProcessor.process_image_artifact(
  393. cursor, doc_path, page_num, image
  394. )
  395. if result:
  396. document_ids.append(result[0])
  397. document_contents.append(result[1])
  398. counts["image"] = counts.get("image", 0) + 1
  399. # Process tables
  400. for table in artifacts.get("tables", []):
  401. result = SQLProcessor.process_table_artifact(
  402. cursor, doc_path, page_num, table
  403. )
  404. if result:
  405. document_ids.append(result[0])
  406. document_contents.append(result[1])
  407. counts["table"] = counts.get("table", 0) + 1
  408. conn.commit()
  409. except sqlite3.Error as e:
  410. print(f"Database error: {e}")
  411. raise
  412. # Write to vector index
  413. if vector_db_path and document_ids:
  414. VectorIndexManager.write_to_index(
  415. vector_db_path, document_ids, document_contents
  416. )
  417. return counts
  418. def json_to_csv(data: dict, info: str = "") -> Tuple[str, str]:
  419. system_prompt = """You are an expert at converting JSON data to flat csv tables.
  420. You will receive 2 inputs:
  421. 1. JSON-formatted data of a table
  422. 2. A string describing the contents of the table.
  423. I require 2 things from you:
  424. 1. A CSV string representation of the table
  425. 2. A succinct filename for this table based on the data contents.
  426. You should only respond with a JSON, no preamble required. Your JSON response should follow this format:
  427. {"csv_table": <str of table>, "filename": <filename to save table>}"""
  428. user_prompt = f"data:\n{json.dumps(data)}"
  429. if info:
  430. user_prompt += f"\n\ninfo:\n{info}"
  431. request = InferenceUtils.request_builder(
  432. user_prompt=user_prompt,
  433. system_prompt=system_prompt,
  434. temperature=0.4,
  435. max_completion_tokens=2048,
  436. top_p=0.9,
  437. seed=42,
  438. )
  439. backend = config["model"].get("backend")
  440. if backend == "offline-vllm":
  441. vllm_request_batch = InferenceUtils.make_vllm_batch(request)
  442. raw_response = InferenceUtils.run_vllm_inference(vllm_request_batch)[0]
  443. elif backend == "openai-compat":
  444. raw_response = InferenceUtils.run_openai_inference(request)
  445. json_response = JSONUtils.extract_json_from_response(raw_response)
  446. return json_response["csv_table"], json_response["filename"]
  447. def main(json_path, db_path, vector_db_path):
  448. """
  449. Example usage of the functions.
  450. """
  451. try:
  452. # Process JSON and store in SQL
  453. flatten_json_to_sql(json_path, db_path, vector_db_path)
  454. # Example SQL queries
  455. print("All artifacts:")
  456. print(DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts"))
  457. print("\nTables only:")
  458. print(
  459. DatabaseManager.sql_query(
  460. db_path,
  461. "SELECT * FROM document_artifacts WHERE artifact_type = 'table'",
  462. )
  463. )
  464. # Example KNN queries
  465. query = "What is the average revenue per day for Meta?"
  466. print("\nVector index query: ", query)
  467. vector_query = VectorIndexManager.knn_query(query, vector_db_path)
  468. print(vector_query)
  469. # KNN + SQL
  470. df = DatabaseManager.sql_query(db_path, "SELECT * FROM document_artifacts")
  471. df = vector_query.merge(df, left_on="ids", right_on="id", how="inner")
  472. print("\nJoined results:")
  473. print(df)
  474. except Exception as e:
  475. print(f"Error in main: {e}")
  476. if __name__ == "__main__":
  477. main()