GraphResearcher / scripts /phase23_product_database_foundation.py
yugbirla's picture
Add product database foundation
b33072f
Raw
History Blame Contribute Delete
16.2 kB
from pathlib import Path
# Strip byte-order marks (U+FEFF) from every Python file under app/.
for path in Path("app").rglob("*.py"):
    # Decode as plain UTF-8 so a leading BOM shows up as U+FEFF and can be
    # detected; utf-8-sig would hide it and force an unconditional rewrite.
    original = path.read_text(encoding="utf-8")
    cleaned = original.replace("\ufeff", "")
    # Only touch files that really contained a BOM, so unrelated files keep
    # their modification time and line endings.
    if cleaned != original:
        path.write_text(cleaned, encoding="utf-8")
print("BOM cleanup completed.")

# The module files written below live in app/product; create the directory
# up front so write_text() cannot fail with FileNotFoundError on a checkout
# that does not have it yet.
Path("app/product").mkdir(parents=True, exist_ok=True)
# =====================================================
# 1. Product DB module
# =====================================================
Path("app/product/product_db.py").write_text(r'''
import os
import sqlite3
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, List, Optional
from app.core.config import settings
def utc_now() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
def get_database_path() -> Path:
    """
    Resolve the SQLite file location and make sure its directory exists.

    A non-empty APP_DATABASE_PATH environment variable takes precedence.
    Otherwise the file "product_app.sqlite3" is placed in the parent of
    settings.PROCESSED_DIR.
    On Hugging Face free Spaces, this may be temporary unless persistent
    storage is attached.
    """
    configured = os.getenv("APP_DATABASE_PATH")
    target = (
        Path(configured)
        if configured
        else settings.PROCESSED_DIR.parent / "product_app.sqlite3"
    )
    target.parent.mkdir(parents=True, exist_ok=True)
    return target
def get_connection():
    """Open a connection to the product database whose rows are sqlite3.Row objects."""
    connection = sqlite3.connect(str(get_database_path()))
    # sqlite3.Row lets callers read columns by name and convert rows with dict().
    connection.row_factory = sqlite3.Row
    return connection
def row_to_dict(row) -> Dict[str, Any]:
    """Convert a single row to a plain dict; a missing row (None) becomes {}."""
    return dict(row) if row is not None else {}
def rows_to_dicts(rows) -> List[Dict[str, Any]]:
    """Convert an iterable of rows to a list of plain dicts, preserving order."""
    return list(map(dict, rows))
def init_product_database() -> Dict[str, Any]:
    """
    Create the product tables if they do not exist yet.

    Tables: users, user_documents, conversations, messages, admin_logs and
    app_settings. Every statement uses CREATE TABLE IF NOT EXISTS, so calling
    this repeatedly leaves existing tables and rows untouched.

    Returns a dict with "status", "message", "database_path" and "tables"
    (row count per table). The connection is closed even when a statement
    fails.
    """
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS users (
            user_id TEXT PRIMARY KEY,
            email TEXT UNIQUE NOT NULL,
            name TEXT,
            role TEXT NOT NULL DEFAULT 'user',
            auth_provider TEXT DEFAULT 'local',
            avatar_url TEXT,
            is_active INTEGER NOT NULL DEFAULT 1,
            created_at TEXT NOT NULL,
            last_login_at TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS user_documents (
            document_id TEXT PRIMARY KEY,
            owner_user_id TEXT,
            source_file_name TEXT,
            upload_status TEXT DEFAULT 'uploaded',
            index_status TEXT DEFAULT 'not_indexed',
            graph_status TEXT DEFAULT 'not_built',
            chunk_count INTEGER DEFAULT 0,
            entity_count INTEGER DEFAULT 0,
            relation_count INTEGER DEFAULT 0,
            created_at TEXT NOT NULL,
            indexed_at TEXT,
            graph_built_at TEXT,
            metadata_json TEXT,
            FOREIGN KEY(owner_user_id) REFERENCES users(user_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS conversations (
            conversation_id TEXT PRIMARY KEY,
            owner_user_id TEXT,
            document_id TEXT,
            title TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            FOREIGN KEY(owner_user_id) REFERENCES users(user_id),
            FOREIGN KEY(document_id) REFERENCES user_documents(document_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS messages (
            message_id TEXT PRIMARY KEY,
            conversation_id TEXT NOT NULL,
            role TEXT NOT NULL,
            content TEXT NOT NULL,
            created_at TEXT NOT NULL,
            metadata_json TEXT,
            FOREIGN KEY(conversation_id) REFERENCES conversations(conversation_id)
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS admin_logs (
            log_id TEXT PRIMARY KEY,
            actor_user_id TEXT,
            action TEXT NOT NULL,
            target_type TEXT,
            target_id TEXT,
            created_at TEXT NOT NULL,
            metadata_json TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS app_settings (
            key TEXT PRIMARY KEY,
            value TEXT,
            updated_at TEXT NOT NULL
        )
        """,
    )

    conn = get_connection()
    try:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
        tables = get_table_counts(conn)
    finally:
        # Release the handle even if a DDL statement or the count query raised.
        conn.close()

    return {
        "status": "success",
        "message": "Product database initialized.",
        "database_path": str(get_database_path()),
        "tables": tables
    }
def get_table_counts(conn=None) -> Dict[str, int]:
    """
    Return the number of rows in each product table.

    conn: an open SQLite connection to reuse. When omitted, a connection is
    opened via get_connection() and closed before returning, including when
    a query fails. A connection supplied by the caller is never closed here.
    """
    owns_connection = conn is None
    if owns_connection:
        conn = get_connection()

    table_names = (
        "users",
        "user_documents",
        "conversations",
        "messages",
        "admin_logs",
        "app_settings",
    )

    try:
        cursor = conn.cursor()
        counts = {}
        for table in table_names:
            # Table names come from the fixed tuple above, never from callers,
            # so interpolating them into the statement is safe.
            cursor.execute(f"SELECT COUNT(*) AS count FROM {table}")
            # Positional access works for both sqlite3.Row and plain tuples,
            # so a caller-supplied connection needs no particular row_factory.
            counts[table] = int(cursor.fetchone()[0])
        return counts
    finally:
        if owns_connection:
            conn.close()
def get_database_status() -> Dict[str, Any]:
    """
    Ensure the schema exists and report the database location and row counts.

    Returns a dict with "status", "database_path", "table_counts" and
    "storage_note".
    """
    # init_product_database() already counts every table after creating the
    # schema, so its result is reused instead of opening a second connection
    # just to run the same COUNT queries again.
    init_result = init_product_database()

    return {
        "status": "healthy",
        "database_path": str(get_database_path()),
        "table_counts": init_result["tables"],
        "storage_note": "On free Hugging Face Spaces, SQLite data may reset unless persistent storage is enabled."
    }
def upsert_user(
    user_id: str,
    email: str,
    name: Optional[str] = None,
    role: str = "user",
    auth_provider: str = "local",
    avatar_url: Optional[str] = None
) -> Dict[str, Any]:
    """
    Insert a user, or refresh the existing row that has the same email.

    On an email conflict the stored name, role, auth_provider, avatar_url and
    last_login_at are overwritten; user_id, is_active and created_at keep
    their stored values. A new row gets is_active = 1 and the current UTC
    time for both created_at and last_login_at.

    Returns the row for the given email as a dict. The connection is closed
    even when a statement raises (for example on a user_id collision).
    """
    init_product_database()
    now = utc_now()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO users (
                user_id, email, name, role, auth_provider, avatar_url,
                is_active, created_at, last_login_at
            )
            VALUES (?, ?, ?, ?, ?, ?, 1, ?, ?)
            ON CONFLICT(email) DO UPDATE SET
                name = excluded.name,
                role = excluded.role,
                auth_provider = excluded.auth_provider,
                avatar_url = excluded.avatar_url,
                last_login_at = excluded.last_login_at
        """, (
            user_id,
            email,
            name,
            role,
            auth_provider,
            avatar_url,
            now,
            now
        ))
        conn.commit()

        cursor.execute("SELECT * FROM users WHERE email = ?", (email,))
        return row_to_dict(cursor.fetchone())
    finally:
        conn.close()
def list_users(limit: int = 100) -> List[Dict[str, Any]]:
    """
    Return up to `limit` users, most recently created first.

    avatar_url is not part of the selected columns. The connection is closed
    even when the query raises.
    """
    init_product_database()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT user_id, email, name, role, auth_provider, is_active, created_at, last_login_at
            FROM users
            ORDER BY created_at DESC
            LIMIT ?
        """, (limit,))
        return rows_to_dicts(cursor.fetchall())
    finally:
        conn.close()
def register_document_record(
    document_id: str,
    source_file_name: Optional[str] = None,
    owner_user_id: Optional[str] = None,
    upload_status: str = "uploaded"
) -> Dict[str, Any]:
    """
    Insert a document record, or update the one with the same document_id.

    A new row starts with index_status 'not_indexed' and graph_status
    'not_built'. On conflict, upload_status is always overwritten, while
    source_file_name and owner_user_id are only replaced when the new value
    is not None (COALESCE keeps the stored value otherwise).

    Returns the stored row as a dict. The connection is closed even when a
    statement raises.
    """
    init_product_database()
    now = utc_now()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO user_documents (
                document_id, owner_user_id, source_file_name, upload_status,
                index_status, graph_status, created_at
            )
            VALUES (?, ?, ?, ?, 'not_indexed', 'not_built', ?)
            ON CONFLICT(document_id) DO UPDATE SET
                source_file_name = COALESCE(excluded.source_file_name, user_documents.source_file_name),
                owner_user_id = COALESCE(excluded.owner_user_id, user_documents.owner_user_id),
                upload_status = excluded.upload_status
        """, (
            document_id,
            owner_user_id,
            source_file_name,
            upload_status,
            now
        ))
        conn.commit()

        cursor.execute("SELECT * FROM user_documents WHERE document_id = ?", (document_id,))
        return row_to_dict(cursor.fetchone())
    finally:
        conn.close()
def update_document_index_status(
    document_id: str,
    index_status: str = "indexed",
    chunk_count: int = 0
) -> Dict[str, Any]:
    """
    Set index_status and chunk_count for a document and stamp indexed_at.

    Returns the updated row as a dict, or {} when no document has the given
    id (the UPDATE then matches nothing). The connection is closed even when
    a statement raises.
    """
    init_product_database()
    now = utc_now()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE user_documents
            SET index_status = ?, chunk_count = ?, indexed_at = ?
            WHERE document_id = ?
        """, (
            index_status,
            chunk_count,
            now,
            document_id
        ))
        conn.commit()

        cursor.execute("SELECT * FROM user_documents WHERE document_id = ?", (document_id,))
        return row_to_dict(cursor.fetchone())
    finally:
        conn.close()
def update_document_graph_status(
    document_id: str,
    graph_status: str = "built",
    entity_count: int = 0,
    relation_count: int = 0
) -> Dict[str, Any]:
    """
    Set graph_status, entity_count and relation_count for a document and
    stamp graph_built_at.

    Returns the updated row as a dict, or {} when no document has the given
    id (the UPDATE then matches nothing). The connection is closed even when
    a statement raises.
    """
    init_product_database()
    now = utc_now()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE user_documents
            SET graph_status = ?, entity_count = ?, relation_count = ?, graph_built_at = ?
            WHERE document_id = ?
        """, (
            graph_status,
            entity_count,
            relation_count,
            now,
            document_id
        ))
        conn.commit()

        cursor.execute("SELECT * FROM user_documents WHERE document_id = ?", (document_id,))
        return row_to_dict(cursor.fetchone())
    finally:
        conn.close()
def list_documents(limit: int = 100) -> List[Dict[str, Any]]:
    """
    Return up to `limit` document records, most recently created first.

    The connection is closed even when the query raises.
    """
    init_product_database()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT *
            FROM user_documents
            ORDER BY created_at DESC
            LIMIT ?
        """, (limit,))
        return rows_to_dicts(cursor.fetchall())
    finally:
        conn.close()
def create_conversation(
    conversation_id: str,
    owner_user_id: Optional[str],
    document_id: Optional[str],
    title: str
) -> Dict[str, Any]:
    """
    Insert a new conversation with created_at and updated_at set to now.

    Returns the stored row as a dict. There is no upsert clause, so reusing
    an existing conversation_id raises from the INSERT; the connection is
    closed in that case as well.
    """
    init_product_database()
    now = utc_now()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO conversations (
                conversation_id, owner_user_id, document_id, title, created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            conversation_id,
            owner_user_id,
            document_id,
            title,
            now,
            now
        ))
        conn.commit()

        cursor.execute("SELECT * FROM conversations WHERE conversation_id = ?", (conversation_id,))
        return row_to_dict(cursor.fetchone())
    finally:
        conn.close()
def list_conversations(limit: int = 100) -> List[Dict[str, Any]]:
    """
    Return up to `limit` conversations, most recently updated first.

    The connection is closed even when the query raises.
    """
    init_product_database()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT *
            FROM conversations
            ORDER BY updated_at DESC
            LIMIT ?
        """, (limit,))
        return rows_to_dicts(cursor.fetchall())
    finally:
        conn.close()
def add_message(
    message_id: str,
    conversation_id: str,
    role: str,
    content: str,
    metadata_json: Optional[str] = None
) -> Dict[str, Any]:
    """
    Store a message and bump the parent conversation's updated_at.

    Both statements are committed together, and the message and the
    conversation receive the same timestamp. Returns the stored message row
    as a dict. The connection is closed even when a statement raises (for
    example on a duplicate message_id), which also discards the uncommitted
    work.
    """
    init_product_database()
    now = utc_now()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO messages (
                message_id, conversation_id, role, content, created_at, metadata_json
            )
            VALUES (?, ?, ?, ?, ?, ?)
        """, (
            message_id,
            conversation_id,
            role,
            content,
            now,
            metadata_json
        ))

        cursor.execute("""
            UPDATE conversations
            SET updated_at = ?
            WHERE conversation_id = ?
        """, (
            now,
            conversation_id
        ))
        conn.commit()

        cursor.execute("SELECT * FROM messages WHERE message_id = ?", (message_id,))
        return row_to_dict(cursor.fetchone())
    finally:
        conn.close()
def list_messages(conversation_id: str) -> List[Dict[str, Any]]:
    """
    Return every message of a conversation, oldest first.

    An unknown conversation_id yields an empty list. The connection is
    closed even when the query raises.
    """
    init_product_database()

    conn = get_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT *
            FROM messages
            WHERE conversation_id = ?
            ORDER BY created_at ASC
        """, (conversation_id,))
        return rows_to_dicts(cursor.fetchall())
    finally:
        conn.close()
''', encoding="utf-8")
# =====================================================
# 2. Product schemas
# =====================================================
Path("app/product/product_schema.py").write_text(r'''
from pydantic import BaseModel, Field
from typing import Optional
class CreateLocalUserRequest(BaseModel):
    # Request body for POST /product/users/local.
    # email is required; the endpoint derives the local user_id from it.
    email: str
    # Optional display name.
    name: Optional[str] = None
    # Role stored on the user row; defaults to "user".
    # NOTE(review): any string is accepted here - confirm whether callers
    # should be allowed to choose their own role.
    role: str = Field(default="user")
class RegisterDocumentRequest(BaseModel):
    # Request body for POST /product/documents/register.
    # document_id is required and is the primary key of user_documents.
    document_id: str
    # Optional original file name of the document.
    source_file_name: Optional[str] = None
    # Optional user_id of the owning user.
    owner_user_id: Optional[str] = None
class CreateConversationRequest(BaseModel):
    # Request body for POST /product/conversations; every field is optional.
    # Optional user_id of the conversation owner.
    owner_user_id: Optional[str] = None
    # Optional document the conversation refers to.
    document_id: Optional[str] = None
    # Title of the conversation; defaults to "New chat".
    title: str = "New chat"
class AddMessageRequest(BaseModel):
    # Request body for POST /product/messages.
    # Conversation the message is attached to.
    conversation_id: str
    # Required; must be exactly "user", "assistant" or "system".
    role: str = Field(..., pattern="^(user|assistant|system)$")
    # Message text.
    content: str
''', encoding="utf-8")
# =====================================================
# 3. Patch main.py
# =====================================================
main_path = Path("app/main.py")

# utf-8-sig drops a leading BOM; replace() removes any U+FEFF left elsewhere.
text = main_path.read_text(encoding="utf-8-sig").replace("\ufeff", "")

# Import block that the appended product endpoints rely on.
imports = '''import uuid
from app.product.product_db import (
    init_product_database,
    get_database_status,
    upsert_user,
    list_users,
    register_document_record,
    list_documents,
    create_conversation,
    list_conversations,
    add_message,
    list_messages
)
from app.product.product_schema import (
    CreateLocalUserRequest,
    RegisterDocumentRequest,
    CreateConversationRequest,
    AddMessageRequest
)
'''

# Prepend the imports only once, so re-running the script stays idempotent.
if "from app.product.product_db import" not in text:
    # The endpoints appended further down use Query(...) for their limit
    # parameters. If main.py never mentions Query, importing the patched
    # module would fail with NameError, so bring it in alongside the rest.
    if "Query" not in text:
        imports = "from fastapi import Query\n" + imports
    text = imports + text

# Earlier phase labels that should now read as the Phase 23 label.
old_phases = [
    "Phase 22 - GraphRAG Demo UI Upgrade",
    "Phase 21 - Deployment Sync Verification",
    "Phase 20 - GraphRAG Batch Evaluation Report"
]

for old in old_phases:
    text = text.replace(old, "Phase 23 - Product Database Foundation")
if "# Product database foundation endpoints" not in text:
text += '''
# Product database foundation endpoints
@app.on_event("startup")
def initialize_product_database_on_startup():
    # Create the product tables as soon as the app starts; the summary dict
    # returned by init_product_database() is intentionally not used here.
    _ = init_product_database()
@app.get("/product/db/status")
def product_database_status():
    # Report the database path and per-table row counts.
    status = get_database_status()
    return status
@app.post("/product/users/local")
def create_or_update_local_user(request: CreateLocalUserRequest):
    # Normalize once and use the same value for both the derived id and the
    # stored address. Otherwise "A@x.com" and "a@x.com" produce the same
    # user_id but different email values: the upsert only resolves conflicts
    # on email, so the second request fails on the user_id primary key.
    email = request.email.strip().lower()

    # NOTE(review): distinct addresses can still collapse to one id because
    # "@" and "." both become "_" (e.g. a.b@x.com vs a_b@x.com).
    user_id = "local_" + email.replace("@", "_").replace(".", "_")

    return upsert_user(
        user_id=user_id,
        email=email,
        name=request.name,
        role=request.role,
        auth_provider="local"
    )
@app.get("/product/users")
def get_product_users(limit: int = Query(100, ge=1, le=500)):
    # limit is validated to the range 1..500 and defaults to 100.
    users = list_users(limit=limit)
    return {"users": users}
@app.post("/product/documents/register")
def register_product_document(request: RegisterDocumentRequest):
    # Forward the request fields unchanged; upload_status keeps the default
    # of register_document_record().
    record = register_document_record(
        document_id=request.document_id,
        source_file_name=request.source_file_name,
        owner_user_id=request.owner_user_id
    )
    return record
@app.get("/product/documents")
def get_product_documents(limit: int = Query(100, ge=1, le=500)):
    # limit is validated to the range 1..500 and defaults to 100.
    documents = list_documents(limit=limit)
    return {"documents": documents}
@app.post("/product/conversations")
def create_product_conversation(request: CreateConversationRequest):
    # The server picks the id: a random UUID4 rendered as a string.
    new_id = str(uuid.uuid4())
    conversation = create_conversation(
        conversation_id=new_id,
        owner_user_id=request.owner_user_id,
        document_id=request.document_id,
        title=request.title
    )
    return conversation
@app.get("/product/conversations")
def get_product_conversations(limit: int = Query(100, ge=1, le=500)):
    # limit is validated to the range 1..500 and defaults to 100.
    conversations = list_conversations(limit=limit)
    return {"conversations": conversations}
@app.post("/product/messages")
def add_product_message(request: AddMessageRequest):
    # The server picks the id: a random UUID4 rendered as a string.
    new_id = str(uuid.uuid4())
    message = add_message(
        message_id=new_id,
        conversation_id=request.conversation_id,
        role=request.role,
        content=request.content
    )
    return message
@app.get("/product/conversations/{conversation_id}/messages")
def get_product_conversation_messages(conversation_id: str):
    # Echo the requested id next to its messages.
    messages = list_messages(conversation_id=conversation_id)
    return {
        "conversation_id": conversation_id,
        "messages": messages
    }
'''
# Persist the patched source back to app/main.py, encoded as plain UTF-8.
main_path.write_text(text, encoding="utf-8")
# Final progress message for whoever runs this script.
print("Phase 23 product database foundation added.")