|
|
import asyncio
import base64
from typing import List, Optional
from urllib.parse import quote

import aiohttp
import requests
import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
|
|
|
|
|
# Application object that the route decorators below register against and
# that uvicorn serves when this file is run as a script.
app = FastAPI(
    title="Molecular Structure API",
    version="1.1.0",
    description="API for Dr. Gini - Fetch molecular structure images",
)
|
|
|
|
|
|
|
|
|
|
|
class CompoundRequest(BaseModel):
    """Request body for the single-compound endpoint."""

    # Identifier of the compound to look up, as supplied by the client.
    compound_name: str
|
|
|
|
|
class BatchCompoundRequest(BaseModel):
    """Request body for the batch endpoint."""

    # Identifiers of the compounds to look up, in the order the client sent them.
    compound_names: List[str]
|
|
|
|
|
class MolecularResponse(BaseModel):
    """Response body for the single-compound endpoint.

    Every field except ``success`` is optional: successful responses fill
    ``image_url``, ``metadata`` and ``size``; failed responses fill ``error``.
    The fields are declared ``Optional[...]`` so that the ``None`` defaults
    match their annotations, mirroring ``MolecularResult``.
    """

    # True when an image was found for the requested compound.
    success: bool
    # Base64 ``data:`` URL holding the image bytes.
    image_url: Optional[str] = None
    # Details about the lookup (compound name, source, and identifiers when known).
    metadata: Optional[dict] = None
    # Length in bytes of the raw (un-encoded) image.
    size: Optional[int] = None
    # Human-readable failure reason.
    error: Optional[str] = None
|
|
|
|
|
class MolecularResult(BaseModel):
    """Outcome of looking up one compound as part of a batch request."""

    # Compound name this result belongs to.
    compound: str
    # True when an image was found for the compound.
    success: bool

    # Populated on success -------------------------------------------------
    # Base64 ``data:`` URL holding the image bytes.
    image_url: Optional[str] = None
    # Details about the lookup (compound name, source, and identifiers when known).
    metadata: Optional[dict] = None
    # Length in bytes of the raw (un-encoded) image.
    size: Optional[int] = None

    # Populated on failure -------------------------------------------------
    # Human-readable failure reason.
    error: Optional[str] = None
|
|
|
|
|
class BatchMolecularResponse(BaseModel):
    """Response body for the batch endpoint."""

    # Overall success flag for the batch.
    success: bool
    # Number of entries in ``results``.
    count: int
    # One entry per compound that was processed.
    results: List[MolecularResult]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    """Return the API name together with the path of the docs page."""
    landing_payload = {
        "message": "Molecular Structure API",
        "docs": "/docs",
    }
    return landing_payload
|
|
|
|
|
|
|
|
def _sync_image_data_url(content, content_type):
    """Return ``content`` (raw image bytes) as a base64 ``data:`` URL.

    The MIME type comes from the upstream ``Content-Type`` header when that
    header names an image type, so the label matches what the server actually
    sent rather than always claiming PNG. When the header is missing or is not
    an image type, ``image/png`` is used.
    """
    mime_type = (content_type or "").split(";", 1)[0].strip().lower()
    if not mime_type.startswith("image/"):
        mime_type = "image/png"
    encoded = base64.b64encode(content).decode()
    return f"data:{mime_type};base64,{encoded}"


def _sync_fetch_from_pubchem(compound_name: str, encoded_name: str) -> Optional[MolecularResponse]:
    """Resolve the name to an InChIKey, then to a PubChem CID, then to an image.

    Returns a successful ``MolecularResponse`` or ``None`` when any step does
    not produce a usable answer. Network errors propagate to the caller.
    """
    inchikey_url = f"https://cactus.nci.nih.gov/chemical/structure/{encoded_name}/stdinchikey"
    response = requests.get(inchikey_url, timeout=10)
    key_tokens = response.text.split() if response.status_code == 200 else []
    if not key_tokens:
        return None

    # Use only the first whitespace-separated token so a multi-line reply
    # cannot put newlines into the PubChem URL built below.
    inchikey = key_tokens[0].replace("InChIKey=", "")
    print(f"Found InChIKey: {inchikey}")

    cid_url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/inchikey/{quote(inchikey)}/cids/JSON"
    cid_response = requests.get(cid_url, timeout=10)
    if cid_response.status_code != 200:
        return None

    identifier_list = cid_response.json().get("IdentifierList") or {}
    cids = identifier_list.get("CID") or []
    if not cids:
        # An empty list means "no match", not an error: let the caller fall back.
        return None
    cid = cids[0]
    print(f"Found CID: {cid}")

    image_url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/{cid}/PNG"
    img_response = requests.get(image_url, timeout=15)
    # Bodies of 1000 bytes or fewer are not accepted as an image.
    if img_response.status_code != 200 or len(img_response.content) <= 1000:
        return None

    return MolecularResponse(
        success=True,
        image_url=_sync_image_data_url(
            img_response.content, img_response.headers.get("Content-Type")
        ),
        metadata={
            "compound": compound_name,
            "cid": cid,
            "inchikey": inchikey,
            "source": "PubChem",
        },
        size=len(img_response.content),
    )


def _sync_fetch_from_nih(compound_name: str, encoded_name: str) -> Optional[MolecularResponse]:
    """Fetch the image directly from the NIH resolver; ``None`` if unusable."""
    nih_url = f"https://cactus.nci.nih.gov/chemical/structure/{encoded_name}/image"
    nih_response = requests.get(nih_url, timeout=15)
    # Bodies of 1000 bytes or fewer are not accepted as an image.
    if nih_response.status_code != 200 or len(nih_response.content) <= 1000:
        return None

    return MolecularResponse(
        success=True,
        image_url=_sync_image_data_url(
            nih_response.content, nih_response.headers.get("Content-Type")
        ),
        metadata={
            "compound": compound_name,
            "source": "NIH Direct",
        },
        size=len(nih_response.content),
    )


def _lookup_structure_blocking(compound_name: str) -> MolecularResponse:
    """Run the PubChem lookup and the NIH fallback with blocking HTTP calls.

    Any exception is reported as a failed ``MolecularResponse`` whose
    ``error`` is the exception text, so this function does not raise.
    """
    try:
        print(f"Processing: {compound_name}")

        # Percent-encode the user-supplied name so characters such as '#' or
        # '?' stay part of the path instead of starting a fragment or query.
        # '/' is left as-is, which is how the name was sent before.
        encoded_name = quote(compound_name)

        found = _sync_fetch_from_pubchem(compound_name, encoded_name)
        if found is None:
            print("Trying NIH fallback...")
            found = _sync_fetch_from_nih(compound_name, encoded_name)

        if found is None:
            return MolecularResponse(
                success=False,
                error="No molecular structure found",
            )
        return found

    except Exception as e:
        print(f"Error: {str(e)}")
        return MolecularResponse(
            success=False,
            error=str(e),
        )


@app.post("/molecular-structure", response_model=MolecularResponse)
async def get_molecular_structure(request: CompoundRequest):
    """Get molecular structure image for a chemical compound.

    The lookup uses the blocking ``requests`` library, so it is run in the
    event loop's default executor; other requests keep being served while
    the upstream calls are in flight.

    Args:
        request: Body carrying the compound name.

    Returns:
        A ``MolecularResponse``. On success it holds the image as a data URL;
        on failure ``success`` is False and ``error`` explains why.

    Raises:
        HTTPException: 400 when the compound name is empty after stripping.
    """
    compound_name = request.compound_name.strip()
    if not compound_name:
        raise HTTPException(status_code=400, detail="Compound name is required")

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _lookup_structure_blocking, compound_name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _async_image_data_url(content, content_type):
    """Return ``content`` (raw image bytes) as a base64 ``data:`` URL.

    The MIME type comes from the upstream ``Content-Type`` header when that
    header names an image type, so the label matches what the server actually
    sent rather than always claiming PNG. When the header is missing or is not
    an image type, ``image/png`` is used.
    """
    mime_type = (content_type or "").split(";", 1)[0].strip().lower()
    if not mime_type.startswith("image/"):
        mime_type = "image/png"
    encoded = base64.b64encode(content).decode()
    return f"data:{mime_type};base64,{encoded}"


async def _async_fetch_from_pubchem(
    session: aiohttp.ClientSession, compound_name: str, encoded_name: str
) -> Optional[MolecularResult]:
    """Resolve the name to an InChIKey, then to a PubChem CID, then to an image.

    Returns a successful ``MolecularResult`` or ``None`` when any step does
    not produce a usable answer. Network errors propagate to the caller.
    """
    inchikey_url = f"https://cactus.nci.nih.gov/chemical/structure/{encoded_name}/stdinchikey"
    async with session.get(inchikey_url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        if response.status != 200:
            return None
        key_tokens = (await response.text()).split()
    if not key_tokens:
        return None

    # Use only the first whitespace-separated token so a multi-line reply
    # cannot put newlines into the PubChem URL built below.
    inchikey = key_tokens[0].replace("InChIKey=", "")
    print(f"[Batch] Found InChIKey for {compound_name}: {inchikey}")

    cid_url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/inchikey/{quote(inchikey)}/cids/JSON"
    async with session.get(cid_url, timeout=aiohttp.ClientTimeout(total=10)) as cid_response:
        if cid_response.status != 200:
            return None
        cid_data = await cid_response.json()

    identifier_list = cid_data.get("IdentifierList") or {}
    cids = identifier_list.get("CID") or []
    if not cids:
        # An empty list means "no match", not an error: let the caller fall back.
        return None
    cid = cids[0]
    print(f"[Batch] Found CID for {compound_name}: {cid}")

    image_url = f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/cid/{cid}/PNG"
    async with session.get(image_url, timeout=aiohttp.ClientTimeout(total=15)) as img_response:
        if img_response.status != 200:
            return None
        content = await img_response.read()
        content_type = img_response.headers.get("Content-Type")

    # Bodies of 1000 bytes or fewer are not accepted as an image.
    if len(content) <= 1000:
        return None

    return MolecularResult(
        compound=compound_name,
        success=True,
        image_url=_async_image_data_url(content, content_type),
        metadata={
            "compound": compound_name,
            "cid": cid,
            "inchikey": inchikey,
            "source": "PubChem",
        },
        size=len(content),
    )


async def _async_fetch_from_nih(
    session: aiohttp.ClientSession, compound_name: str, encoded_name: str
) -> Optional[MolecularResult]:
    """Fetch the image directly from the NIH resolver; ``None`` if unusable."""
    nih_url = f"https://cactus.nci.nih.gov/chemical/structure/{encoded_name}/image"
    async with session.get(nih_url, timeout=aiohttp.ClientTimeout(total=15)) as nih_response:
        if nih_response.status != 200:
            return None
        content = await nih_response.read()
        content_type = nih_response.headers.get("Content-Type")

    # Bodies of 1000 bytes or fewer are not accepted as an image.
    if len(content) <= 1000:
        return None

    return MolecularResult(
        compound=compound_name,
        success=True,
        image_url=_async_image_data_url(content, content_type),
        metadata={
            "compound": compound_name,
            "source": "NIH Direct",
        },
        size=len(content),
    )


async def fetch_structure_async(session: aiohttp.ClientSession, compound_name: str) -> MolecularResult:
    """Async function to fetch a single compound's structure (for batch processing).

    Tries the InChIKey -> PubChem CID -> PubChem PNG route first and falls
    back to the NIH resolver's image endpoint when that yields nothing.

    Args:
        session: Open aiohttp session used for every upstream request.
        compound_name: Compound identifier; surrounding whitespace is stripped.

    Returns:
        A ``MolecularResult``. This coroutine does not raise for upstream
        failures: timeouts and other exceptions are reported through
        ``success=False`` and ``error``.
    """
    compound_name = compound_name.strip()
    if not compound_name:
        return MolecularResult(compound=compound_name, success=False, error="Empty compound name")

    try:
        print(f"[Batch] Processing: {compound_name}")

        # Percent-encode the user-supplied name so characters such as '#' or
        # '?' stay part of the path instead of starting a fragment or query.
        # '/' is left as-is, which is how the name was sent before.
        encoded_name = quote(compound_name)

        result = await _async_fetch_from_pubchem(session, compound_name, encoded_name)
        if result is None:
            print(f"[Batch] Trying NIH fallback for {compound_name}...")
            result = await _async_fetch_from_nih(session, compound_name, encoded_name)

        if result is None:
            return MolecularResult(
                compound=compound_name,
                success=False,
                error=f"No molecular structure found for '{compound_name}'",
            )
        return result

    except asyncio.TimeoutError:
        print(f"[Batch] Timeout for {compound_name}")
        return MolecularResult(compound=compound_name, success=False, error="Request timeout")
    except Exception as e:
        print(f"[Batch] Error for {compound_name}: {str(e)}")
        return MolecularResult(compound=compound_name, success=False, error=str(e))
|
|
|
|
|
|
|
|
@app.post("/molecular-structure/batch", response_model=BatchMolecularResponse)
async def get_batch_molecular_structures(request: BatchCompoundRequest):
    """Look up several compounds concurrently and return one result per name.

    Duplicate names are collapsed (first occurrence wins, order preserved)
    before the lookups start. The size limit is checked against the list as
    sent, before duplicates are removed.

    Raises:
        HTTPException: 400 when the list is empty or has more than 10 names.
    """
    requested_names = request.compound_names

    if not requested_names:
        raise HTTPException(status_code=400, detail="At least one compound name required")
    if len(requested_names) > 10:
        raise HTTPException(status_code=400, detail="Maximum 10 compounds per request")

    # Order-preserving de-duplication.
    unique_compounds = []
    already_seen = set()
    for name in requested_names:
        if name not in already_seen:
            already_seen.add(name)
            unique_compounds.append(name)
    print(f"[Batch] Processing {len(unique_compounds)} compounds: {unique_compounds}")

    # One shared HTTP session for all lookups; they run concurrently and the
    # results come back in the same order as ``unique_compounds``.
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch_structure_async(session, name) for name in unique_compounds)
        )

    succeeded = len([outcome for outcome in results if outcome.success])
    print(f"[Batch] Completed. Success: {succeeded}/{len(results)}")

    # The batch counts as successful when at least one lookup succeeded.
    return BatchMolecularResponse(
        success=succeeded > 0,
        count=len(results),
        results=list(results),
    )
|
|
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Return a fixed payload reporting the service as healthy."""
    health_payload = {"status": "healthy"}
    return health_payload
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the app under uvicorn when this file is executed as a script,
    # binding to all interfaces on port 7860.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )