Faraz618's picture
Update core/database.py
580665c verified
Raw
History Blame Contribute Delete
8.42 kB
import json
import logging
import sqlite3
from datetime import datetime
from typing import Optional
import config
logger = logging.getLogger(__name__)
DB_PATH = config.SQLITE_DB_PATH
def init_db():
"""Initialize SQLite database."""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id TEXT UNIQUE NOT NULL,
filename TEXT,
file_extension TEXT,
document_type TEXT,
language TEXT,
is_arabic INTEGER DEFAULT 0,
total_pages INTEGER DEFAULT 0,
total_words INTEGER DEFAULT 0,
extraction_source TEXT,
confidence REAL DEFAULT 0.0,
full_text TEXT,
structured_json TEXT,
processed_at TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id TEXT,
entity_type TEXT,
entity_value TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (document_id) REFERENCES documents(document_id)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS benchmark_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
test_name TEXT,
language TEXT,
extraction_source TEXT,
word_count INTEGER,
char_count INTEGER,
confidence REAL,
processing_time_ms REAL,
arabic_char_ratio REAL,
run_at TEXT DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_doc_id ON documents(document_id)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_language ON documents(language)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_doc_type ON documents(document_type)"
)
conn.commit()
conn.close()
logger.info(f"Database initialized at {DB_PATH}")
def save_document(structured_doc: dict) -> bool:
"""Save a structured document to the database."""
try:
init_db()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
meta = structured_doc.get("metadata", {})
analysis = structured_doc.get("document_analysis", {})
content = structured_doc.get("content", {})
cursor.execute(
"""
INSERT OR REPLACE INTO documents (
document_id, filename, file_extension, document_type,
language, is_arabic, total_pages, total_words,
extraction_source, confidence, full_text,
structured_json, processed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
structured_doc.get("document_id"),
meta.get("filename"),
meta.get("file_extension"),
analysis.get("document_type"),
analysis.get("language"),
1 if analysis.get("is_arabic") else 0,
analysis.get("total_pages", 0),
analysis.get("total_words", 0),
meta.get("extraction_source"),
meta.get("extraction_confidence", 0.0),
content.get("full_text", ""),
json.dumps(structured_doc, ensure_ascii=False),
meta.get("processed_at"),
),
)
doc_id = structured_doc.get("document_id")
entities = structured_doc.get("extracted_entities", {})
for entity_type, values in entities.items():
for value in values if isinstance(values, list) else [values]:
if value:
cursor.execute(
"""
INSERT INTO entities (document_id, entity_type, entity_value)
VALUES (?, ?, ?)
""",
(doc_id, entity_type, str(value)),
)
conn.commit()
conn.close()
logger.info(f"Saved document: {doc_id}")
return True
except Exception as e:
logger.error(f"Failed to save document: {e}")
return False
def get_all_documents(limit: int = 50, offset: int = 0) -> list:
"""Retrieve all documents."""
try:
init_db()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
"""
SELECT document_id, filename, document_type, language,
is_arabic, total_pages, total_words, extraction_source,
confidence, processed_at, created_at
FROM documents
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
(limit, offset),
)
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
except Exception as e:
logger.error(f"Failed to retrieve documents: {e}")
return []
def get_document_by_id(document_id: str) -> Optional[dict]:
"""Retrieve a specific document by ID."""
try:
init_db()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
"SELECT * FROM documents WHERE document_id = ?",
(document_id,),
)
row = cursor.fetchone()
conn.close()
if row:
doc = dict(row)
if doc.get("structured_json"):
doc["structured_data"] = json.loads(doc["structured_json"])
return doc
return None
except Exception as e:
logger.error(f"Failed to get document {document_id}: {e}")
return None
def search_documents(query: str, language: str = None) -> list:
"""Search documents by text content."""
try:
init_db()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
if language:
cursor.execute(
"""
SELECT document_id, filename, document_type, language,
total_words, confidence, processed_at
FROM documents
WHERE full_text LIKE ? AND language = ?
ORDER BY created_at DESC LIMIT 20
""",
(f"%{query}%", language),
)
else:
cursor.execute(
"""
SELECT document_id, filename, document_type, language,
total_words, confidence, processed_at
FROM documents
WHERE full_text LIKE ?
ORDER BY created_at DESC LIMIT 20
""",
(f"%{query}%",),
)
rows = [dict(row) for row in cursor.fetchall()]
conn.close()
return rows
except Exception as e:
logger.error(f"Search failed: {e}")
return []
def get_stats() -> dict:
"""Get overall database statistics."""
try:
init_db()
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM documents")
total = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM documents WHERE is_arabic = 1")
arabic_count = cursor.fetchone()[0]
cursor.execute(
"SELECT document_type, COUNT(*) as cnt FROM documents GROUP BY document_type"
)
by_type = dict(cursor.fetchall())
cursor.execute("SELECT AVG(confidence) FROM documents WHERE confidence > 0")
avg_conf = cursor.fetchone()[0] or 0.0
cursor.execute("SELECT SUM(total_words) FROM documents")
total_words = cursor.fetchone()[0] or 0
conn.close()
return {
"total_documents": total,
"arabic_documents": arabic_count,
"english_documents": total - arabic_count,
"by_document_type": by_type,
"average_confidence": round(avg_conf, 4),
"total_words_processed": total_words,
}
except Exception as e:
logger.error(f"Stats failed: {e}")
return {}