GraphResearcher / app /main.py
yugbirla's picture
Add feedback collection with permanent dataset backup
e205a77
Raw
History Blame Contribute Delete
24.8 kB
from fastapi import Request, Query
from fastapi.responses import PlainTextResponse
from app.product.feedback_service import (
FeedbackRequest,
submit_feedback,
list_feedback,
export_feedback_jsonl,
feedback_status,
)
from fastapi import Request
from fastapi.responses import HTMLResponse, RedirectResponse
from app.product.secure_admin_ui import get_secure_admin_html
from app.product.auth_service import get_current_user_from_request
from app.product.admin_monitoring_service import (
get_admin_overview,
get_storage_only,
get_security_only,
get_routes_only,
)
from app.product.document_compare_service import CompareDocumentsRequest, compare_documents_with_existing_ask
from app.product.document_storage_manager import get_document_storage_status, delete_document_storage
import os
from fastapi import Request, Query
from fastapi.responses import HTMLResponse
from starlette.middleware.sessions import SessionMiddleware
from app.product.login_ui import get_login_html
from app.product.oauth_service import (
get_oauth_status,
start_google_login,
finish_google_login,
dev_session_login,
clear_session,
get_session_payload
)
from fastapi import Request, Query
from fastapi.responses import HTMLResponse
from app.product.auth_service import get_current_user_from_request, require_admin_user, dev_login_user
from app.product.admin_service import get_admin_status, get_admin_users, get_admin_documents, get_admin_conversations, get_admin_system_summary
from app.product.admin_ui import get_admin_panel_html
from app.product.source_viewer import get_source_details, get_source_html
from app.deployment.hf_status import get_home_html, get_product_app_html
from app.deployment.hf_status import get_product_app_html
import uuid
from app.product.product_db import (
init_product_database,
get_database_status,
upsert_user,
list_users,
register_document_record,
list_documents,
create_conversation,
list_conversations,
add_message,
list_messages
)
from app.product.product_schema import (
CreateLocalUserRequest,
RegisterDocumentRequest,
CreateConversationRequest,
AddMessageRequest
)
from app.evaluation.graphrag_batch_evaluator import run_graphrag_batch_evaluation
from app.evaluation.graph_fusion_evaluator import compare_graph_fusion_retrieval
from app.graph.graph_guided_retriever import graph_guided_retrieve
from app.graph.graph_context_service import build_graph_context_for_query
from app.graph.graph_visualization import get_graph_visualization_html
from app.graph.graph_builder import build_document_graph
from app.graph.graph_storage import read_document_graph
from app.graph.graph_query_service import (
list_entities,
search_entities,
get_entity_neighborhood
)
from typing import Optional
from fastapi import FastAPI, UploadFile, File, HTTPException, Query
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from app.core.config import settings
from app.schemas.query_schema import AskRequest
from app.schemas.evaluation_schema import (
RetrievalTestCaseCreate,
RetrievalEvaluationRunRequest,
AnswerTestCaseCreate,
AnswerEvaluationRunRequest
)
from app.ingestion.ingestion_service import process_uploaded_file
from app.ingestion.reprocessing_service import reprocess_document_by_id
from app.storage.status_storage import (
read_document_status,
list_document_statuses
)
from app.storage.processed_storage import (
read_processed_chunks,
read_processed_metadata
)
from app.storage.document_delete_service import delete_document_by_id
from app.retrieval.indexing_service import index_document_chunks
from app.retrieval.hybrid_search_service import retrieve_chunks
from app.generation.answer_service import answer_question
from app.generation.llm_service import get_llm_status, get_loaded_llm_info
from app.deployment.hf_status import (
get_deployment_health,
get_deployment_config,
get_demo_html, get_graphrag_demo_html
)
from app.evaluation.retrieval_eval_storage import (
load_retrieval_test_cases,
add_retrieval_test_case,
delete_retrieval_test_case
)
from app.evaluation.retrieval_evaluator import run_retrieval_evaluation
from app.evaluation.answer_eval_storage import (
load_answer_test_cases,
add_answer_test_case,
delete_answer_test_case
)
from app.evaluation.answer_evaluator import run_answer_evaluation
app = FastAPI(
title=settings.APP_NAME,
description="A production-grade multimodal GraphRAG research assistant",
version=settings.APP_VERSION
)
app.add_middleware(
SessionMiddleware,
secret_key=os.getenv('SESSION_SECRET_KEY', 'dev-change-this-session-secret'),
same_site='lax',
https_only=False
)
if settings.ENABLE_STATIC_ASSETS:
app.mount(
"/processed-assets",
StaticFiles(directory=str(settings.PROCESSED_DIR)),
name="processed-assets"
)
@app.get("/", response_class=HTMLResponse)
def home_page():
return get_home_html()
@app.get("/llm/status")
def llm_status():
return get_llm_status()
@app.get("/llm/load-test")
def llm_load_test():
return get_loaded_llm_info()
@app.post("/upload")
async def upload_document(file: UploadFile = File(...)):
return await process_uploaded_file(file)
@app.get("/documents")
def list_documents():
documents = list_document_statuses()
return {
"total_documents": len(documents),
"documents": documents
}
@app.get("/documents/{document_id}/status")
def get_document_status(document_id: str):
status = read_document_status(document_id)
if status is None:
raise HTTPException(
status_code=404,
detail="Document status not found."
)
return status
@app.get("/documents/{document_id}/chunks")
def get_document_chunks(
document_id: str,
limit: int = Query(20, ge=1, le=100),
offset: int = Query(0, ge=0),
content_type: Optional[str] = None
):
chunks = read_processed_chunks(document_id)
metadata = read_processed_metadata(document_id)
if chunks is None:
raise HTTPException(
status_code=404,
detail="Chunks not found for this document."
)
if content_type is not None:
chunks = [
chunk for chunk in chunks
if chunk.content_type == content_type
]
total_chunks = len(chunks)
paginated_chunks = chunks[offset: offset + limit]
return {
"document_id": document_id,
"metadata": metadata,
"total_chunks": total_chunks,
"returned_chunks": len(paginated_chunks),
"offset": offset,
"limit": limit,
"content_type_filter": content_type,
"chunks": paginated_chunks
}
@app.post("/documents/{document_id}/index")
def index_document(document_id: str):
result = index_document_chunks(document_id)
if result["status"] == "failed":
raise HTTPException(
status_code=400,
detail=result["message"]
)
return result
@app.get("/search")
def search_documents(
query: str = Query(..., min_length=1),
document_id: Optional[str] = None,
top_k: int = Query(5, ge=1, le=20),
retrieval_mode: str = Query("hybrid")
):
return retrieve_chunks(
query=query,
document_id=document_id,
top_k=top_k,
retrieval_mode=retrieval_mode
)
@app.post("/ask")
def ask_question(request: AskRequest):
return answer_question(
query=request.query,
document_id=request.document_id,
top_k=request.top_k,
retrieval_mode=request.retrieval_mode,
use_reranker=request.use_reranker,
use_llm=request.use_llm,
use_graph=request.use_graph,
graph_entity_limit=request.graph_entity_limit,
use_graph_retrieval=request.use_graph_retrieval,
graph_retrieval_top_k=request.graph_retrieval_top_k
)
@app.get("/evaluation/retrieval/test-cases")
def list_retrieval_test_cases():
test_cases = load_retrieval_test_cases()
return {
"total_test_cases": len(test_cases),
"test_cases": test_cases
}
@app.post("/evaluation/retrieval/test-cases")
def create_retrieval_test_case(test_case: RetrievalTestCaseCreate):
created_test_case = add_retrieval_test_case(test_case)
return {
"status": "success",
"message": "Retrieval test case created.",
"test_case": created_test_case
}
@app.delete("/evaluation/retrieval/test-cases/{test_case_id}")
def remove_retrieval_test_case(test_case_id: str):
deleted = delete_retrieval_test_case(test_case_id)
if not deleted:
raise HTTPException(
status_code=404,
detail="Retrieval test case not found."
)
return {
"status": "success",
"message": "Retrieval test case deleted.",
"test_case_id": test_case_id
}
@app.post("/evaluation/retrieval/run")
def run_retrieval_eval(request: RetrievalEvaluationRunRequest):
return run_retrieval_evaluation(request)
@app.get("/evaluation/answer/test-cases")
def list_answer_test_cases():
test_cases = load_answer_test_cases()
return {
"total_test_cases": len(test_cases),
"test_cases": test_cases
}
@app.post("/evaluation/answer/test-cases")
def create_answer_test_case(test_case: AnswerTestCaseCreate):
created_test_case = add_answer_test_case(test_case)
return {
"status": "success",
"message": "Answer test case created.",
"test_case": created_test_case
}
@app.delete("/evaluation/answer/test-cases/{test_case_id}")
def remove_answer_test_case(test_case_id: str):
deleted = delete_answer_test_case(test_case_id)
if not deleted:
raise HTTPException(
status_code=404,
detail="Answer test case not found."
)
return {
"status": "success",
"message": "Answer test case deleted.",
"test_case_id": test_case_id
}
@app.post("/evaluation/answer/run")
def run_answer_eval(request: AnswerEvaluationRunRequest):
return run_answer_evaluation(request)
@app.post("/documents/{document_id}/reprocess")
def reprocess_document(document_id: str):
try:
result = reprocess_document_by_id(document_id)
except FileNotFoundError as error:
raise HTTPException(
status_code=404,
detail=str(error)
)
except Exception as error:
raise HTTPException(
status_code=500,
detail=f"Document re-processing failed: {str(error)}"
)
if result is None:
raise HTTPException(
status_code=404,
detail="Document not found."
)
return result
@app.delete("/documents/{document_id}")
def delete_document(document_id: str):
deletion_result = delete_document_by_id(document_id)
if deletion_result is None:
raise HTTPException(
status_code=404,
detail="Document not found."
)
return {
"status": "success",
"message": "Document deleted successfully.",
"deletion_result": deletion_result
}
# Hugging Face deployment endpoints
@app.get("/deployment/health")
def deployment_health():
return get_deployment_health()
@app.get("/deployment/config")
def deployment_config():
return get_deployment_config()
@app.get("/demo", response_class=HTMLResponse)
def demo_page():
return get_demo_html()
# Graph foundation endpoints
@app.post("/documents/{document_id}/graph/build")
def build_graph_for_document(document_id: str):
result = build_document_graph(document_id)
if result.get("status") == "failed":
raise HTTPException(
status_code=400,
detail=result.get("message", "Graph build failed.")
)
return result
@app.get("/documents/{document_id}/graph")
def get_document_graph(document_id: str):
graph = read_document_graph(document_id)
if graph is None:
raise HTTPException(
status_code=404,
detail="Graph not found. Build the graph first."
)
return graph
@app.get("/documents/{document_id}/graph/entities")
def get_graph_entities(
document_id: str,
limit: int = Query(50, ge=1, le=500),
entity_type: Optional[str] = None
):
result = list_entities(
document_id=document_id,
limit=limit,
entity_type=entity_type
)
if result.get("status") == "failed":
raise HTTPException(
status_code=404,
detail=result.get("message")
)
return result
@app.get("/documents/{document_id}/graph/search")
def search_graph_entities(
document_id: str,
query: str = Query(..., min_length=1),
limit: int = Query(20, ge=1, le=100)
):
result = search_entities(
document_id=document_id,
query=query,
limit=limit
)
if result.get("status") == "failed":
raise HTTPException(
status_code=404,
detail=result.get("message")
)
return result
@app.get("/documents/{document_id}/graph/neighborhood")
def get_graph_neighborhood(
document_id: str,
entity: str = Query(..., min_length=1),
limit: int = Query(50, ge=1, le=200)
):
result = get_entity_neighborhood(
document_id=document_id,
entity=entity,
limit=limit
)
if result.get("status") == "failed":
raise HTTPException(
status_code=404,
detail=result.get("message")
)
return result
# Graph visualization endpoint
@app.get("/documents/{document_id}/graph/view", response_class=HTMLResponse)
def view_document_graph(document_id: str):
return get_graph_visualization_html(document_id)
# Graph context debug endpoint
@app.get("/documents/{document_id}/graph/context")
def get_graph_context_for_question(
document_id: str,
query: str = Query(..., min_length=1),
limit: int = Query(8, ge=1, le=30)
):
return build_graph_context_for_query(
document_id=document_id,
query=query,
limit=limit
)
# Graph-guided retrieval endpoint
@app.get("/documents/{document_id}/graph/retrieve")
def graph_guided_retrieval_endpoint(
document_id: str,
query: str = Query(..., min_length=1),
graph_entity_limit: int = Query(8, ge=1, le=30),
top_k: int = Query(5, ge=1, le=20)
):
result = graph_guided_retrieve(
document_id=document_id,
query=query,
graph_entity_limit=graph_entity_limit,
top_k=top_k
)
if result.get("status") == "failed":
raise HTTPException(
status_code=400,
detail=result
)
return result
# GraphRAG fusion evaluation endpoint
@app.get("/documents/{document_id}/evaluation/graph-fusion")
def evaluate_graph_fusion_for_document(
document_id: str,
query: str = Query(..., min_length=1),
top_k: int = Query(5, ge=1, le=20),
retrieval_mode: str = Query("hybrid"),
use_reranker: bool = True,
graph_entity_limit: int = Query(8, ge=1, le=30),
graph_retrieval_top_k: int = Query(5, ge=1, le=20)
):
return compare_graph_fusion_retrieval(
document_id=document_id,
query=query,
top_k=top_k,
retrieval_mode=retrieval_mode,
use_reranker=use_reranker,
graph_entity_limit=graph_entity_limit,
graph_retrieval_top_k=graph_retrieval_top_k
)
# GraphRAG batch evaluation endpoint
@app.get("/documents/{document_id}/evaluation/graph-fusion/batch")
def evaluate_graph_fusion_batch_for_document(
document_id: str,
custom_queries: Optional[str] = None,
top_k: int = Query(5, ge=1, le=20),
retrieval_mode: str = Query("hybrid"),
use_reranker: bool = True,
graph_entity_limit: int = Query(8, ge=1, le=30),
graph_retrieval_top_k: int = Query(5, ge=1, le=20),
compact: bool = True
):
return run_graphrag_batch_evaluation(
document_id=document_id,
custom_queries=custom_queries,
top_k=top_k,
retrieval_mode=retrieval_mode,
use_reranker=use_reranker,
graph_entity_limit=graph_entity_limit,
graph_retrieval_top_k=graph_retrieval_top_k,
compact=compact
)
# GraphRAG demo UI endpoint
@app.get("/graphrag-demo", response_class=HTMLResponse)
def graphrag_demo_page():
return get_graphrag_demo_html()
# Product database foundation endpoints
@app.on_event("startup")
def initialize_product_database_on_startup():
init_product_database()
@app.get("/product/db/status")
def product_database_status():
return get_database_status()
@app.post("/product/users/local")
def create_or_update_local_user(request: CreateLocalUserRequest):
user_id = "local_" + request.email.lower().replace("@", "_").replace(".", "_")
return upsert_user(
user_id=user_id,
email=request.email,
name=request.name,
role=request.role,
auth_provider="local"
)
@app.get("/product/users")
def get_product_users(limit: int = Query(100, ge=1, le=500)):
return {
"users": list_users(limit=limit)
}
@app.post("/product/documents/register")
def register_product_document(request: RegisterDocumentRequest):
return register_document_record(
document_id=request.document_id,
source_file_name=request.source_file_name,
owner_user_id=request.owner_user_id
)
@app.get("/product/documents")
def get_product_documents(limit: int = Query(100, ge=1, le=500)):
return {
"documents": list_documents(limit=limit)
}
@app.post("/product/conversations")
def create_product_conversation(request: CreateConversationRequest):
conversation_id = str(uuid.uuid4())
return create_conversation(
conversation_id=conversation_id,
owner_user_id=request.owner_user_id,
document_id=request.document_id,
title=request.title
)
@app.get("/product/conversations")
def get_product_conversations(limit: int = Query(100, ge=1, le=500)):
return {
"conversations": list_conversations(limit=limit)
}
@app.post("/product/messages")
def add_product_message(request: AddMessageRequest):
message_id = str(uuid.uuid4())
return add_message(
message_id=message_id,
conversation_id=request.conversation_id,
role=request.role,
content=request.content
)
@app.get("/product/conversations/{conversation_id}/messages")
def get_product_conversation_messages(conversation_id: str):
return {
"conversation_id": conversation_id,
"messages": list_messages(conversation_id=conversation_id)
}
# Product ChatGPT-style app UI endpoint
@app.get("/app", response_class=HTMLResponse)
def product_app_page():
return get_product_app_html()
# Improved product workspace app endpoint
@app.get("/app", response_class=HTMLResponse)
def product_workspace_app_page():
return get_product_app_html()
# Source viewer endpoints
@app.get("/documents/{document_id}/sources/{source_id}")
def document_source_details(
document_id: str,
source_id: str,
page: str = "",
chunk_id: str = ""
):
return get_source_details(
document_id=document_id,
source_id=source_id,
page=page,
chunk_id=chunk_id
)
@app.get("/documents/{document_id}/sources/{source_id}/view", response_class=HTMLResponse)
def document_source_view(
document_id: str,
source_id: str,
page: str = "",
chunk_id: str = ""
):
return get_source_html(
document_id=document_id,
source_id=source_id,
page=page,
chunk_id=chunk_id
)
# Auth foundation endpoints
@app.get("/auth/me")
def auth_me(request: Request):
return get_current_user_from_request(request)
@app.get("/auth/dev-login")
def auth_dev_login(
email: str = Query(..., min_length=3),
name: str = Query(None)
):
return dev_login_user(email=email, name=name)
# Hidden admin panel UI
@app.get("/admin", response_class=HTMLResponse)
def admin_panel_page():
return get_admin_panel_html()
# Admin API endpoints
@app.get("/admin/status")
def admin_status(request: Request):
current_admin = require_admin_user(request)
return get_admin_status(current_admin=current_admin)
@app.get("/admin/users")
def admin_users(
request: Request,
limit: int = Query(100, ge=1, le=500)
):
require_admin_user(request)
return get_admin_users(limit=limit)
@app.get("/admin/documents")
def admin_documents(
request: Request,
limit: int = Query(100, ge=1, le=500)
):
require_admin_user(request)
return get_admin_documents(limit=limit)
@app.get("/admin/conversations")
def admin_conversations(
request: Request,
limit: int = Query(100, ge=1, le=500)
):
require_admin_user(request)
return get_admin_conversations(limit=limit)
@app.get("/admin/system")
def admin_system(request: Request):
require_admin_user(request)
return get_admin_system_summary()
# OAuth login endpoints
@app.get("/login", response_class=HTMLResponse)
def login_page():
return get_login_html()
@app.get("/auth/oauth-status")
def auth_oauth_status():
return get_oauth_status()
@app.get("/auth/session")
def auth_session(request: Request):
return get_session_payload(request)
@app.get("/auth/google/login")
async def auth_google_login(request: Request):
return await start_google_login(request)
@app.get("/auth/google/callback")
async def auth_google_callback(request: Request):
return await finish_google_login(request)
@app.get("/auth/dev-session")
def auth_dev_session(
request: Request,
email: str = Query(..., min_length=3),
name: str = Query(None)
):
return dev_session_login(
request=request,
email=email,
name=name
)
@app.get("/auth/logout")
def auth_logout(request: Request):
return clear_session(request)
# Document storage status and delete endpoints
@app.get("/documents/{document_id}/storage")
def document_storage_status(document_id: str):
return get_document_storage_status(document_id)
@app.delete("/documents/{document_id}/delete")
def delete_document_runtime_storage(document_id: str):
return delete_document_storage(document_id)
# Backend document comparison endpoint
@app.post("/documents/compare")
async def compare_two_documents(request: CompareDocumentsRequest):
return await compare_documents_with_existing_ask(
app=app,
request=request
)
# Secure admin monitoring dashboard
@app.get("/admin/secure", response_class=HTMLResponse)
def secure_admin_dashboard(request: Request):
user = get_current_user_from_request(request)
if not user.get("authenticated") or user.get("role") != "admin":
return RedirectResponse(url="/login", status_code=302)
return get_secure_admin_html()
@app.get("/admin/api/monitor/overview")
def admin_monitor_overview(request: Request):
return get_admin_overview(request=request, app=app)
@app.get("/admin/api/monitor/storage")
def admin_monitor_storage(request: Request):
return get_storage_only(request=request)
@app.get("/admin/api/monitor/security")
def admin_monitor_security(request: Request):
return get_security_only(request=request)
@app.get("/admin/api/monitor/routes")
def admin_monitor_routes(request: Request):
return get_routes_only(request=request, app=app)
# Feedback endpoints
@app.post("/feedback")
def submit_user_feedback(payload: FeedbackRequest, request: Request):
return submit_feedback(payload=payload, request=request)
@app.get("/feedback/status")
def get_feedback_status():
return feedback_status()
@app.get("/admin/api/feedback")
def admin_list_feedback(request: Request, limit: int = Query(100, ge=1, le=500)):
try:
from app.product.admin_monitoring_service import require_secure_admin
require_secure_admin(request)
except Exception:
try:
from app.product.auth_service import require_admin_user
require_admin_user(request)
except Exception as exc:
raise exc
return list_feedback(limit=limit)
@app.get("/admin/api/feedback/export", response_class=PlainTextResponse)
def admin_export_feedback(request: Request):
try:
from app.product.admin_monitoring_service import require_secure_admin
require_secure_admin(request)
except Exception:
try:
from app.product.auth_service import require_admin_user
require_admin_user(request)
except Exception as exc:
raise exc
return export_feedback_jsonl()