import os import io import json import base64 from fastapi import FastAPI, HTTPException, Header, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from typing import List, Optional import anthropic from google.oauth2 import service_account from googleapiclient.discovery import build from googleapiclient.http import MediaIoBaseDownload app = FastAPI(title="Dr. Gini DocRAG Service") # CORS - Allow your frontend domains app.add_middleware( CORSMiddleware, allow_origins=[ "https://your-frontend.netlify.app", "https://your-space.hf.space", "http://localhost:3000", "http://localhost:5173", "*" # Remove in production, use specific domains ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Simple API key auth (optional but recommended) API_KEY = os.environ.get("DOCRAG_API_KEY", "") def verify_api_key(x_api_key: str = Header(None, alias="X-API-Key")): """Verify API key if configured""" if API_KEY and x_api_key != API_KEY: raise HTTPException(status_code=401, detail="Invalid API key") return True # Initialize Claude client claude_client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) # Google Drive Service Account def get_drive_service(): """Initialize Google Drive service with service account""" service_account_info = json.loads(os.environ.get("GOOGLE_SERVICE_ACCOUNT", "{}")) if not service_account_info: raise HTTPException(status_code=500, detail="Google Service Account not configured") credentials = service_account.Credentials.from_service_account_info( service_account_info, scopes=['https://www.googleapis.com/auth/drive.readonly'] ) return build('drive', 'v3', credentials=credentials) # ============ Request/Response Models ============ class Document(BaseModel): driveFileId: str fileName: str mimeType: str class DocRAGRequest(BaseModel): userId: str sessionId: str query: str selectedDocs: List[Document] class DocRAGResponse(BaseModel): success: bool query: str answer: Optional[str] = None documentsUsed: List[str] = [] error: Optional[str] = None # ============ Helper Functions ============ def download_from_drive(drive_service, file_id: str, file_name: str) -> bytes: """Download file from Google Drive using service account""" try: request = drive_service.files().get_media(fileId=file_id) file_buffer = io.BytesIO() downloader = MediaIoBaseDownload(file_buffer, request) done = False while not done: status, done = downloader.next_chunk() file_buffer.seek(0) return file_buffer.read() except Exception as e: raise HTTPException( status_code=400, detail=f"Failed to download {file_name}: {str(e)}" ) def get_claude_media_type(mime_type: str) -> tuple[str, str]: """Map MIME type to Claude's supported types""" if mime_type == "application/pdf": return "document", "application/pdf" if mime_type in ["image/jpeg", "image/png", "image/gif", "image/webp"]: return "image", mime_type if mime_type in ["text/plain", "text/csv", "text/html", "text/markdown", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"]: return "text", mime_type return "document", "application/pdf" # ============ API Endpoints ============ @app.post("/docrag", response_model=DocRAGResponse) async def chat_with_documents( request: DocRAGRequest, authenticated: bool = Depends(verify_api_key) ): """Chat with uploaded documents using Claude""" try: drive_service = get_drive_service() # Download all selected documents documents_content = [] documents_used = [] for doc in request.selectedDocs: try: file_bytes = download_from_drive( drive_service, doc.driveFileId, doc.fileName ) documents_content.append({ "fileName": doc.fileName, "content": file_bytes, "mimeType": doc.mimeType }) documents_used.append(doc.fileName) print(f"✓ Downloaded: {doc.fileName}") except Exception as e: print(f"✗ Error downloading {doc.fileName}: {e}") continue if not documents_content: return DocRAGResponse( success=False, query=request.query, error="Could not download any documents. Check if folder is shared with service account." ) # Build Claude message content = [] for doc in documents_content: content_type, media_type = get_claude_media_type(doc["mimeType"]) if content_type == "document": content.append({ "type": "document", "source": { "type": "base64", "media_type": media_type, "data": base64.b64encode(doc["content"]).decode("utf-8") } }) elif content_type == "image": content.append({ "type": "image", "source": { "type": "base64", "media_type": media_type, "data": base64.b64encode(doc["content"]).decode("utf-8") } }) else: try: text_content = doc["content"].decode("utf-8") content.append({ "type": "text", "text": f"=== Document: {doc['fileName']} ===\n\n{text_content}\n\n=== End ===" }) except UnicodeDecodeError: continue # Add query content.append({ "type": "text", "text": request.query }) # Call Claude response = claude_client.messages.create( model="claude-sonnet-4-20250514", max_tokens=4096, system="""You are Dr. Gini, a research copilot for drug discovery and pharmaceutical research. When answering: - Be precise and cite specific sections when relevant - If information is not in the documents, say so clearly - For multiple documents, compare and synthesize across them - Use scientific terminology appropriately - Highlight key findings, methods, and limitations""", messages=[{"role": "user", "content": content}] ) return DocRAGResponse( success=True, query=request.query, answer=response.content[0].text, documentsUsed=documents_used ) except anthropic.APIError as e: return DocRAGResponse( success=False, query=request.query, error=f"Claude API error: {str(e)}" ) except Exception as e: import traceback traceback.print_exc() return DocRAGResponse( success=False, query=request.query, error=f"Error: {str(e)}" ) @app.get("/health") async def health_check(): return {"status": "healthy", "service": "Dr. Gini DocRAG"} @app.get("/test-drive") async def test_drive_connection(): """Test Google Drive connection""" try: drive_service = get_drive_service() results = drive_service.files().list( pageSize=5, fields="files(id, name)" ).execute() files = results.get('files', []) return { "status": "connected", "files_visible": len(files), "sample_files": [f["name"] for f in files[:5]] } except Exception as e: return {"status": "error", "error": str(e)} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860)