GraphResearcher / scripts /phase35_document_storage_delete.py
yugbirla's picture
Add document storage status and backend delete
7be4ee1
Raw
History Blame Contribute Delete
8.54 kB
from pathlib import Path
# Normalise every Python source under app/ to BOM-free UTF-8.
for source_file in Path("app").rglob("*.py"):
    # "utf-8-sig" drops a leading BOM while decoding; replace() additionally
    # removes any U+FEFF characters that sit elsewhere in the file.
    cleaned = source_file.read_text(encoding="utf-8-sig").replace("\ufeff", "")
    source_file.write_text(cleaned, encoding="utf-8")

# Make sure the app/product package directory exists before writing into it.
Path("app/product").mkdir(parents=True, exist_ok=True)
Path("app/product/document_storage_manager.py").write_text(r'''
import shutil
from pathlib import Path
from typing import Dict, Any, List
from app.core.config import settings
def as_path(value, fallback: str) -> Path:
    """Return ``value`` as a ``Path``, or ``Path(fallback)`` when unusable.

    ``value`` is used only when it is truthy and ``Path`` accepts it. A falsy
    value, or any exception raised while testing or converting it, selects
    the fallback instead.
    """
    candidate = None
    try:
        candidate = Path(value) if value else None
    except Exception:
        # Best effort: an unconvertible value silently falls back.
        candidate = None
    return candidate if candidate is not None else Path(fallback)
def get_storage_paths() -> Dict[str, Path]:
    """Return the storage directories, keyed by role.

    Each entry is read from the matching attribute on ``settings`` and
    converted with ``as_path``; when the attribute is missing or unusable the
    hard-coded path under ``/tmp/graphrag`` is used instead.
    """
    # (result key, settings attribute, fallback directory)
    layout = (
        ("upload_dir", "UPLOAD_DIR", "/tmp/graphrag/uploads"),
        ("processed_dir", "PROCESSED_DIR", "/tmp/graphrag/processed"),
        ("qdrant_dir", "QDRANT_LOCAL_PATH", "/tmp/graphrag/qdrant"),
        ("evaluation_dir", "EVALUATION_DIR", "/tmp/graphrag/evaluation"),
    )
    return {
        key: as_path(getattr(settings, attribute, None), default)
        for key, attribute, default in layout
    }
def path_size_bytes(path: Path) -> int:
    """Return the size in bytes of a file, or of all files under a directory.

    A path that does not exist counts as 0, and so does any individual file
    whose ``stat()`` call fails.
    """

    def _safe_size(entry: Path) -> int:
        # A file that cannot be stat'ed contributes nothing to the total.
        try:
            return entry.stat().st_size
        except Exception:
            return 0

    if not path.exists():
        return 0
    if path.is_file():
        return _safe_size(path)
    return sum(_safe_size(entry) for entry in path.rglob("*") if entry.is_file())
def find_matching_items(root: Path, document_id: str) -> List[Dict[str, Any]]:
    """List the entries under ``root`` whose relative path contains ``document_id``.

    Args:
        root: Directory to search recursively.
        document_id: Substring to look for in each entry's path below ``root``.

    Returns:
        One dict per topmost match with the keys ``path`` (string), ``type``
        (``"directory"`` or ``"file"``) and ``size_bytes``. The list is empty
        when ``root`` does not exist or ``document_id`` is empty.
    """
    matches: List[Dict[str, Any]] = []
    # An empty id is a substring of every path; treat it as "no document"
    # instead of reporting the whole tree as belonging to it.
    if not document_id or not root.exists():
        return matches

    candidates: List[Path] = []
    for item in root.rglob("*"):
        try:
            # Compare against the path below root only, so that the
            # components of root itself can never produce a match.
            relative = str(item.relative_to(root))
        except ValueError:
            continue
        if document_id in relative:
            candidates.append(item)

    matched = set(candidates)
    for item in candidates:
        # Everything inside a matched directory is already covered by that
        # directory's entry (and its size); listing it again would make
        # callers that sum ``size_bytes`` count the same bytes twice.
        if any(parent in matched for parent in item.parents):
            continue
        matches.append({
            "path": str(item),
            "type": "directory" if item.is_dir() else "file",
            "size_bytes": path_size_bytes(item),
        })
    return matches
def get_document_storage_status(document_id: str) -> Dict[str, Any]:
    """Describe what is currently stored on disk for ``document_id``.

    The report covers the configured storage directories, the document's own
    directory under the processed area, and the entries found by
    ``find_matching_items`` in the upload, processed and evaluation areas.
    Each match list in the result is capped at 50 entries; the counts and
    size totals are computed from the full lists.
    """
    dirs = get_storage_paths()
    document_dir = dirs["processed_dir"] / document_id

    uploads = find_matching_items(dirs["upload_dir"], document_id)
    processed = find_matching_items(dirs["processed_dir"], document_id)
    evaluation = find_matching_items(dirs["evaluation_dir"], document_id)
    has_document_dir = document_dir.exists()

    path_report = {
        "upload_dir": str(dirs["upload_dir"]),
        "processed_dir": str(dirs["processed_dir"]),
        "processed_document_dir": str(document_dir),
        "qdrant_dir": str(dirs["qdrant_dir"]),
        "evaluation_dir": str(dirs["evaluation_dir"])
    }
    existence_report = {
        "processed_document_dir": has_document_dir,
        "upload_matches_found": len(uploads),
        "processed_matches_found": len(processed),
        "evaluation_matches_found": len(evaluation)
    }
    size_report = {
        "processed_document_size_bytes": path_size_bytes(document_dir),
        "upload_matches_size_bytes": sum(entry["size_bytes"] for entry in uploads),
        "evaluation_matches_size_bytes": sum(entry["size_bytes"] for entry in evaluation)
    }
    match_report = {
        "uploads": uploads[:50],
        "processed": processed[:50],
        "evaluation": evaluation[:50]
    }

    # Evaluation artefacts alone do not make a document count as available.
    if has_document_dir or uploads or processed:
        status = "available"
    else:
        status = "missing_or_runtime_reset"

    return {
        "document_id": document_id,
        "storage_type": "runtime_ephemeral_storage",
        "important_note": (
            "On free/basic Hugging Face Spaces, files stored under /tmp can disappear "
            "after rebuild, restart, or runtime reset unless persistent storage is configured."
        ),
        "paths": path_report,
        "exists": existence_report,
        "sizes": size_report,
        "matches": match_report,
        "status": status,
        "recommendation": (
            "If this says missing_or_runtime_reset but the UI still shows the document, "
            "clear workspace cache and re-upload the document."
        )
    }
def remove_path(path: Path) -> Dict[str, Any]:
    """Delete a file, symlink or directory tree and report what happened.

    Args:
        path: Filesystem entry to remove.

    Returns:
        A dict with ``path`` (string), ``existed`` (whether there was an
        entry to remove), ``deleted`` (whether removal succeeded) and
        ``error`` (the failure message, or ``None``). Failures are reported
        in the dict rather than raised.
    """
    # exists() follows symlinks, so a dangling link would look absent and be
    # left behind; check for the link itself as well.
    is_link = path.is_symlink()
    existed = is_link or path.exists()
    result = {
        "path": str(path),
        "existed": existed,
        "deleted": False,
        "error": None
    }
    if not existed:
        return result
    try:
        # shutil.rmtree refuses to operate on a symlink, and following the
        # link would delete the target's contents; remove only the link.
        if path.is_dir() and not is_link:
            shutil.rmtree(path)
        else:
            path.unlink()
        result["deleted"] = True
    except OSError as exc:
        result["error"] = str(exc)
    return result
def delete_product_db_records(document_id: str) -> Dict[str, Any]:
    """Delete the product-database rows that reference ``document_id``.

    Removes, in order, the messages of the document's conversations, the
    conversations themselves, and the ``user_documents`` rows, then commits.

    Args:
        document_id: Identifier whose rows should be removed.

    Returns:
        A dict with ``attempted`` (always ``True``), ``deleted_records`` (row
        count per table for the statements that ran) and ``error`` (the
        failure message, or ``None``). Failures are reported, not raised.
    """
    result = {
        "attempted": True,
        "deleted_records": {},
        "error": None
    }
    conn = None
    try:
        # Imported lazily so that an unavailable product DB module is
        # reported through ``error`` instead of breaking module import.
        from app.product.product_db import get_connection, init_product_database

        init_product_database()
        conn = get_connection()
        cur = conn.cursor()

        # Messages are linked to the document only through their
        # conversation, so they must go before the conversations do. A
        # subquery avoids reading rows back and indexing them by column name.
        # NOTE(review): assumes a DB-API driver with "?" placeholders, as
        # the statements below already do - confirm against product_db.
        cur.execute(
            "DELETE FROM messages WHERE conversation_id IN "
            "(SELECT conversation_id FROM conversations WHERE document_id = ?)",
            (document_id,),
        )
        result["deleted_records"]["messages"] = cur.rowcount

        cur.execute("DELETE FROM conversations WHERE document_id = ?", (document_id,))
        result["deleted_records"]["conversations"] = cur.rowcount

        cur.execute("DELETE FROM user_documents WHERE document_id = ?", (document_id,))
        result["deleted_records"]["user_documents"] = cur.rowcount

        conn.commit()
    except Exception as exc:
        result["error"] = str(exc)
    finally:
        # Close the connection on the failure path too, so that it is not
        # leaked when a statement raises.
        if conn is not None:
            try:
                conn.close()
            except Exception as exc:
                if result["error"] is None:
                    result["error"] = str(exc)
    return result
def delete_document_storage(document_id: str) -> Dict[str, Any]:
    """Delete the runtime files and database rows of one document.

    Removes the document's directory under the processed area, every entry
    that ``find_matching_items`` reports in the upload and evaluation areas,
    and the product-database rows, then reports the storage status from
    before and after the attempt.

    Args:
        document_id: Identifier of the document. It must be a single path
            component; anything else is rejected without touching disk or
            the database.

    Returns:
        A dict with ``document_id``, ``status``, ``before``,
        ``deleted_items``, ``database_cleanup``, ``after`` and
        ``qdrant_note``. A rejected id additionally carries ``error`` and
        has ``status`` set to ``"rejected_unsafe_document_id"``.
    """
    qdrant_note = (
        "This endpoint removes runtime files and DB records. "
        "Vector DB point-level deletion is not attempted here unless your existing vector store exposes a safe delete method. "
        "If stale vector results appear, rebuild/restart or add vector deletion in a later phase."
    )

    # The id is joined onto the processed directory and the result is removed
    # recursively. An empty id, "." or ".." or anything containing a path
    # separator would make that join point at the storage root, its parent,
    # or an arbitrary absolute path, so refuse it up front.
    unsafe = (
        not document_id
        or document_id in (".", "..")
        or any(token in document_id for token in ("/", "\\", "\0"))
    )
    if unsafe:
        return {
            "document_id": document_id,
            "status": "rejected_unsafe_document_id",
            "before": None,
            "deleted_items": [],
            "database_cleanup": {
                "attempted": False,
                "deleted_records": {},
                "error": None
            },
            "after": None,
            "qdrant_note": qdrant_note,
            "error": "document_id must be a single non-empty path component"
        }

    before = get_document_storage_status(document_id)
    paths = get_storage_paths()

    # Collect every target before deleting anything. The match lists inside
    # ``before`` are capped for display, so they are not used here; a fresh
    # scan makes sure entries beyond that cap are removed as well.
    targets = [paths["processed_dir"] / document_id]
    for area in ("upload_dir", "evaluation_dir"):
        targets.extend(
            Path(match["path"])
            for match in find_matching_items(paths[area], document_id)
        )
    deleted_items = [remove_path(target) for target in targets]

    db_result = delete_product_db_records(document_id)
    after = get_document_storage_status(document_id)
    return {
        "document_id": document_id,
        "status": "delete_attempt_complete",
        "before": before,
        "deleted_items": deleted_items,
        "database_cleanup": db_result,
        "after": after,
        "qdrant_note": qdrant_note
    }
''', encoding="utf-8")
# Patch app/main.py: import the storage helpers and register the endpoints.
main_path = Path("app/main.py")
text = main_path.read_text(encoding="utf-8-sig").replace("\ufeff", "")

# Both insertions are guarded by a marker check, so running this script again
# does not add the import or the endpoints a second time.
if "from app.product.document_storage_manager import" not in text:
    text = (
        "from app.product.document_storage_manager import get_document_storage_status, delete_document_storage\n"
        + text
    )

if "# Document storage status and delete endpoints" not in text:
    # The function bodies must be indented under their ``def`` lines,
    # otherwise the patched main.py no longer parses.
    text += '''
# Document storage status and delete endpoints
@app.get("/documents/{document_id}/storage")
def document_storage_status(document_id: str):
    return get_document_storage_status(document_id)


@app.delete("/documents/{document_id}/delete")
def delete_document_runtime_storage(document_id: str):
    return delete_document_storage(document_id)
'''

main_path.write_text(text, encoding="utf-8")
print("Phase 35 document storage status and backend delete added.")