Doc_chat / app.py
PRSHNTKUMR's picture
Create app.py
4a04295 verified
import os
import io
import json
import base64
from fastapi import FastAPI, HTTPException, Header, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional
import anthropic
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
# Browser origins that are allowed to call this API.
# NOTE(review): the trailing "*" entry presumably makes the explicit domains
# redundant, and it is combined with allow_credentials=True below - confirm
# against the CORSMiddleware documentation before deploying to production.
_CORS_ALLOWED_ORIGINS = [
    "https://your-frontend.netlify.app",
    "https://your-space.hf.space",
    "http://localhost:3000",
    "http://localhost:5173",
    "*",  # Remove in production, use specific domains
]

app = FastAPI(title="Dr. Gini DocRAG Service")

# CORS - Allow your frontend domains
app.add_middleware(
    CORSMiddleware,
    allow_origins=_CORS_ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Simple API key auth (optional but recommended).
# An unset or empty DOCRAG_API_KEY disables the check entirely.
API_KEY = os.environ.get("DOCRAG_API_KEY", "")


def verify_api_key(x_api_key: Optional[str] = Header(None, alias="X-API-Key")):
    """Verify the ``X-API-Key`` request header if an API key is configured.

    Args:
        x_api_key: Value of the ``X-API-Key`` header, or ``None`` when the
            header was not sent.

    Returns:
        ``True`` when no key is configured or the supplied key matches.

    Raises:
        HTTPException: 401 when a key is configured and the header is
            missing or does not match.
    """
    # Local import so this block does not depend on a top-of-file change.
    import hmac

    if not API_KEY:
        return True

    # Compare as bytes in constant time: a plain ``!=`` can leak how many
    # leading characters matched, and compare_digest rejects non-ASCII str.
    supplied = (x_api_key or "").encode("utf-8")
    expected = API_KEY.encode("utf-8")
    if not hmac.compare_digest(supplied, expected):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return True
# Module-wide Claude client; the key is read from ANTHROPIC_API_KEY once,
# when this module is imported.
_anthropic_api_key = os.environ.get("ANTHROPIC_API_KEY")
claude_client = anthropic.Anthropic(api_key=_anthropic_api_key)
# Google Drive Service Account
def get_drive_service():
    """Build a read-only Google Drive v3 client from a service account.

    The service-account key is read as a JSON object from the
    ``GOOGLE_SERVICE_ACCOUNT`` environment variable.

    Returns:
        The Drive v3 service object returned by ``build``.

    Raises:
        HTTPException: 500 when the variable is unset, blank, not valid
            JSON, or does not contain a non-empty JSON object.
    """
    raw_config = os.environ.get("GOOGLE_SERVICE_ACCOUNT", "").strip()
    if not raw_config:
        raise HTTPException(status_code=500, detail="Google Service Account not configured")

    try:
        service_account_info = json.loads(raw_config)
    except json.JSONDecodeError as exc:
        # Report a configuration problem instead of leaking a raw parser error.
        raise HTTPException(
            status_code=500,
            detail="GOOGLE_SERVICE_ACCOUNT is not valid JSON",
        ) from exc

    # "{}" (or any non-object JSON value) is treated the same as "not configured".
    if not isinstance(service_account_info, dict) or not service_account_info:
        raise HTTPException(status_code=500, detail="Google Service Account not configured")

    credentials = service_account.Credentials.from_service_account_info(
        service_account_info,
        scopes=['https://www.googleapis.com/auth/drive.readonly'],
    )
    return build('drive', 'v3', credentials=credentials)
# ============ Request/Response Models ============
class Document(BaseModel):
    """One Google Drive file selected as context for a chat request."""

    # Drive file ID; handed to the Drive API when the file is downloaded.
    driveFileId: str
    # Display name; used in log lines and echoed back in documentsUsed.
    fileName: str
    # MIME type; decides how the file's bytes are packaged for Claude.
    mimeType: str
class DocRAGRequest(BaseModel):
    """Request body accepted by the ``/docrag`` endpoint."""

    # Caller-supplied identifiers; the handler in this file does not read them.
    userId: str
    sessionId: str
    # The user's question; sent to Claude after the document blocks.
    query: str
    # Drive files to download and attach as context.
    selectedDocs: List[Document]
class DocRAGResponse(BaseModel):
    """Response body returned by the ``/docrag`` endpoint."""

    # True when Claude produced an answer; False on any failure.
    success: bool
    # The query from the request, echoed back.
    query: str
    # Claude's answer text; left as None on failure.
    answer: Optional[str] = None
    # File names reported as used for the answer.
    documentsUsed: List[str] = []
    # Human-readable failure description; left as None on success.
    error: Optional[str] = None
# ============ Helper Functions ============
def download_from_drive(drive_service, file_id: str, file_name: str) -> bytes:
    """Fetch the raw bytes of a Google Drive file.

    Args:
        drive_service: Drive service object used to issue the request.
        file_id: ID of the file to download.
        file_name: Name of the file; only used in the error message.

    Returns:
        The complete file content.

    Raises:
        HTTPException: 400 wrapping any error raised during the download.
    """
    try:
        media_request = drive_service.files().get_media(fileId=file_id)
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, media_request)

        # Pull chunks until the downloader reports completion.
        while True:
            _progress, finished = downloader.next_chunk()
            if finished:
                break

        return buffer.getvalue()
    except Exception as e:
        message = f"Failed to download {file_name}: {e!s}"
        raise HTTPException(status_code=400, detail=message)
# Image MIME types that are forwarded to Claude as image blocks.
_CLAUDE_IMAGE_TYPES = frozenset({"image/jpeg", "image/png", "image/gif", "image/webp"})

# Non-"text/*" types that are still routed through the text branch.
# NOTE(review): .docx is kept here for backward compatibility; it is
# presumably a binary container that will not decode as UTF-8 - confirm.
_CLAUDE_EXTRA_TEXT_TYPES = frozenset({
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
})


def get_claude_media_type(mime_type: str) -> tuple[str, str]:
    """Map a file's MIME type to a Claude content kind and media type.

    The input is normalised first: parameters such as ``; charset=utf-8``
    are dropped, surrounding whitespace is stripped and the value is
    lower-cased, so ``"IMAGE/PNG"`` and ``"text/plain; charset=utf-8"`` are
    recognised instead of falling through to the PDF fallback.

    Args:
        mime_type: MIME type reported for the file; may be empty.

    Returns:
        A ``(content_kind, media_type)`` pair where ``content_kind`` is one
        of ``"document"``, ``"image"`` or ``"text"``. Any ``text/*`` subtype
        is treated as text. Unrecognised types keep the original fallback of
        ``("document", "application/pdf")``.
    """
    normalized = (mime_type or "").split(";", 1)[0].strip().lower()

    if normalized == "application/pdf":
        return "document", "application/pdf"
    if normalized in _CLAUDE_IMAGE_TYPES:
        return "image", normalized
    if normalized.startswith("text/") or normalized in _CLAUDE_EXTRA_TEXT_TYPES:
        return "text", normalized
    # Fallback kept from the original mapping so callers see the same kinds.
    return "document", "application/pdf"
# ============ API Endpoints ============
# System prompt sent with every /docrag request.
_DOCRAG_SYSTEM_PROMPT = """You are Dr. Gini, a research copilot for drug discovery and pharmaceutical research.
When answering:
- Be precise and cite specific sections when relevant
- If information is not in the documents, say so clearly
- For multiple documents, compare and synthesize across them
- Use scientific terminology appropriately
- Highlight key findings, methods, and limitations"""


def _download_selected_docs(drive_service, selected_docs):
    """Download each selected document, logging and skipping any that fail."""
    downloaded = []
    for doc in selected_docs:
        try:
            file_bytes = download_from_drive(
                drive_service,
                doc.driveFileId,
                doc.fileName,
            )
        except Exception as e:
            # Best effort: one unreadable file must not fail the whole request.
            print(f"✗ Error downloading {doc.fileName}: {e}")
            continue
        downloaded.append({
            "fileName": doc.fileName,
            "content": file_bytes,
            "mimeType": doc.mimeType,
        })
        print(f"✓ Downloaded: {doc.fileName}")
    return downloaded


def _build_claude_content(documents):
    """Turn downloaded documents into Claude content blocks.

    Returns a ``(content_blocks, included_file_names)`` pair; documents that
    cannot be represented (text that is not valid UTF-8) are left out of both.
    """
    content = []
    included = []
    for doc in documents:
        content_type, media_type = get_claude_media_type(doc["mimeType"])
        if content_type in ("document", "image"):
            # Both kinds share the same base64 source layout.
            content.append({
                "type": content_type,
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": base64.b64encode(doc["content"]).decode("utf-8"),
                },
            })
        else:
            try:
                text_content = doc["content"].decode("utf-8")
            except UnicodeDecodeError:
                print(f"✗ Skipping {doc['fileName']}: content is not valid UTF-8 text")
                continue
            content.append({
                "type": "text",
                "text": f"=== Document: {doc['fileName']} ===\n\n{text_content}\n\n=== End ===",
            })
        included.append(doc["fileName"])
    return content, included


@app.post("/docrag", response_model=DocRAGResponse)
async def chat_with_documents(
    request: DocRAGRequest,
    authenticated: bool = Depends(verify_api_key)
):
    """Chat with uploaded documents using Claude.

    Downloads the selected Drive files, attaches every file that can be
    represented as a Claude content block, appends the user's query and
    returns Claude's answer.

    Failures are reported in the response body (``success=False`` plus
    ``error``) rather than raised. ``documentsUsed`` lists only the files
    that were actually sent to Claude.
    """
    # Local imports keep this block independent of the file's import header.
    import asyncio
    import traceback

    try:
        # The Drive and Anthropic clients are synchronous; run them in worker
        # threads so the event loop stays free to serve other requests.
        drive_service = await asyncio.to_thread(get_drive_service)
        documents_content = await asyncio.to_thread(
            _download_selected_docs, drive_service, request.selectedDocs
        )
        if not documents_content:
            return DocRAGResponse(
                success=False,
                query=request.query,
                error="Could not download any documents. Check if folder is shared with service account."
            )

        content, documents_used = _build_claude_content(documents_content)
        if not content:
            # Every download was skipped; do not ask Claude with no context.
            return DocRAGResponse(
                success=False,
                query=request.query,
                error="None of the downloaded documents could be read as PDF, image, or UTF-8 text."
            )

        # The query goes last, after all document blocks.
        content.append({
            "type": "text",
            "text": request.query
        })

        response = await asyncio.to_thread(
            claude_client.messages.create,
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system=_DOCRAG_SYSTEM_PROMPT,
            messages=[{"role": "user", "content": content}],
        )

        # Collect every text block instead of assuming content[0] is text.
        answer = "".join(
            block.text
            for block in response.content
            if getattr(block, "type", None) == "text"
        )
        if not answer:
            return DocRAGResponse(
                success=False,
                query=request.query,
                error="Claude returned no text content.",
                documentsUsed=documents_used
            )

        return DocRAGResponse(
            success=True,
            query=request.query,
            answer=answer,
            documentsUsed=documents_used
        )
    except anthropic.APIError as e:
        return DocRAGResponse(
            success=False,
            query=request.query,
            error=f"Claude API error: {str(e)}"
        )
    except Exception as e:
        traceback.print_exc()
        return DocRAGResponse(
            success=False,
            query=request.query,
            error=f"Error: {str(e)}"
        )
# Health endpoint: returns a fixed payload and touches no other service.
@app.get("/health")
async def health_check():
    payload = {"status": "healthy", "service": "Dr. Gini DocRAG"}
    return payload
@app.get("/test-drive")
async def test_drive_connection(authenticated: bool = Depends(verify_api_key)):
    """Test Google Drive connection.

    Lists up to five files visible to the service account. The endpoint is
    protected by the same API-key dependency as ``/docrag`` because the
    response reveals Drive file names.

    Returns:
        ``{"status": "connected", ...}`` with the visible file count and
        names, or ``{"status": "error", "error": ...}`` when the Drive call
        fails.
    """
    # Local import keeps this block independent of the file's import header.
    import asyncio

    def _list_sample_files():
        """Build the Drive client and fetch one small page of files."""
        drive_service = get_drive_service()
        return drive_service.files().list(
            pageSize=5,
            fields="files(id, name)"
        ).execute()

    try:
        # The Drive client is synchronous; keep it off the event loop.
        results = await asyncio.to_thread(_list_sample_files)
        files = results.get('files', [])
        return {
            "status": "connected",
            "files_visible": len(files),
            "sample_files": [f["name"] for f in files[:5]]
        }
    except Exception as e:
        return {"status": "error", "error": str(e)}
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn when run directly.
    import uvicorn

    bind_host, bind_port = "0.0.0.0", 7860
    uvicorn.run(app, host=bind_host, port=bind_port)