| | """
|
| | Google Drive Batch Processor for TB-Guard-XAI
|
| | Automatically processes chest X-rays uploaded to Google Drive
|
| | Uses live Hugging Face Space endpoint for analysis
|
| | """
|
| |
|
| | import os
|
| | import io
|
| | import time
|
| | import requests
|
| | from pathlib import Path
|
| | from datetime import datetime
|
| | from google.oauth2.credentials import Credentials
|
| | from google_auth_oauthlib.flow import InstalledAppFlow
|
| | from google.auth.transport.requests import Request
|
| | from googleapiclient.discovery import build
|
| | from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload
|
| | import pickle
|
| | from fpdf import FPDF
|
| |
|
| |
|
# Live Hugging Face Space that hosts the TB-Guard-XAI model API.
HF_SPACE_URL = "https://mistral-hackaton-2026-tb-guard-xai.hf.space"
API_ENDPOINT = f"{HF_SPACE_URL}/analyze"

# Full Drive scope is needed to create folders, upload reports and move
# processed files between folders.
SCOPES = ['https://www.googleapis.com/auth/drive']

# Names of the Google Drive working folders used by the pipeline.
INBOX_FOLDER = "TB_XRay_Inbox"      # incoming X-ray images land here
REPORTS_FOLDER = "TB_Reports"       # generated PDF reports are uploaded here
PROCESSED_FOLDER = "TB_Processed"   # images are moved here after analysis
|
| |
|
class GoogleDriveBatchProcessor:
    """Batch processor for Google Drive integration using HF Space API.

    Watches a Google Drive inbox folder for chest X-ray images, sends each
    image to the TB-Guard-XAI Hugging Face Space for analysis, generates a
    PDF report, uploads it to a reports folder, and moves the original
    image to a processed folder.
    """

    def __init__(self, hf_space_url=HF_SPACE_URL):
        """Authenticate with Drive, probe the HF Space, prepare folders.

        Args:
            hf_space_url: Base URL of the Hugging Face Space exposing the
                /analyze and /status endpoints.
        """
        self.service = self.authenticate()
        self.api_endpoint = f"{hf_space_url}/analyze"
        # Drive file IDs already handled in this session (watch loop only).
        self.processed_files = set()

        # Quick health probe so the user knows immediately whether the
        # Space is reachable; failures here are deliberately non-fatal.
        print("🔌 Testing connection to Hugging Face Space...")
        print(f"   URL: {hf_space_url}")
        try:
            response = requests.get(f"{hf_space_url}/status", timeout=10)
            if response.status_code == 200:
                print("   ✅ API is online and ready!")
            else:
                print(f"   ⚠️ API returned status {response.status_code}")
        except Exception as e:
            print(f"   ⚠️ Could not connect to API: {e}")
            print("   💡 Make sure your Hugging Face Space is running")

        # Resolve (or create) the three working folders on Drive.
        self.inbox_id = self.get_or_create_folder(INBOX_FOLDER)
        self.reports_id = self.get_or_create_folder(REPORTS_FOLDER)
        self.processed_id = self.get_or_create_folder(PROCESSED_FOLDER)

        print("\n✅ Google Drive folders ready:")
        print(f"   📥 Inbox: {INBOX_FOLDER}")
        print(f"   📄 Reports: {REPORTS_FOLDER}")
        print(f"   ✅ Processed: {PROCESSED_FOLDER}")

    def authenticate(self):
        """Authenticate with the Google Drive API.

        Reuses a cached token from ``token.pickle`` when available,
        refreshing it silently if expired; otherwise runs the interactive
        OAuth flow from ``credentials.json``.

        Returns:
            An authorized Drive v3 service object.

        Raises:
            FileNotFoundError: If no valid cached token exists and
                ``credentials.json`` is missing.
        """
        creds = None

        # NOTE(review): pickle is acceptable here only because token.pickle
        # is written by this script itself; never load untrusted pickles.
        if os.path.exists('token.pickle'):
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                # Silent refresh using the stored refresh token.
                creds.refresh(Request())
            else:
                if not os.path.exists('credentials.json'):
                    print("❌ ERROR: credentials.json not found!")
                    print("\n📋 Setup Instructions:")
                    print("1. Go to https://console.cloud.google.com/")
                    print("2. Create a new project or select existing")
                    print("3. Enable Google Drive API")
                    print("4. Create OAuth 2.0 credentials (Desktop app)")
                    print("5. Download credentials.json to this folder")
                    print("6. Run this script again")
                    raise FileNotFoundError("credentials.json not found")

                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)

            # Cache the (possibly refreshed) credentials for the next run.
            with open('token.pickle', 'wb') as token:
                pickle.dump(creds, token)

        return build('drive', 'v3', credentials=creds)

    def get_or_create_folder(self, folder_name):
        """Return the Drive ID of *folder_name*, creating it if missing."""
        # NOTE(review): folder_name is interpolated into the Drive query;
        # a name containing a single quote would break the query string.
        query = (f"name='{folder_name}' "
                 "and mimeType='application/vnd.google-apps.folder' "
                 "and trashed=false")
        results = self.service.files().list(q=query, fields="files(id, name)").execute()
        folders = results.get('files', [])

        if folders:
            # Reuse the first match rather than creating a duplicate.
            return folders[0]['id']

        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        folder = self.service.files().create(body=file_metadata, fields='id').execute()
        print(f"📁 Created folder: {folder_name}")
        return folder.get('id')

    def list_inbox_files(self):
        """List all PNG/JPEG image files in the inbox folder."""
        query = (f"'{self.inbox_id}' in parents and trashed=false "
                 "and (mimeType='image/png' or mimeType='image/jpeg')")
        results = self.service.files().list(
            q=query,
            fields="files(id, name, createdTime)"
        ).execute()
        return results.get('files', [])

    def download_file(self, file_id, file_name):
        """Download a Drive file into the local temp_gdrive directory.

        Args:
            file_id: Drive file ID to download.
            file_name: Name to give the local copy.

        Returns:
            Path to the downloaded temporary file.
        """
        request = self.service.files().get_media(fileId=file_id)

        temp_path = Path("temp_gdrive") / file_name
        temp_path.parent.mkdir(exist_ok=True)

        # Context manager guarantees the handle is closed even if a chunk
        # download raises mid-transfer.
        with io.FileIO(str(temp_path), 'wb') as fh:
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()

        return temp_path

    def upload_file(self, file_path, folder_id, file_name=None):
        """Upload a local file into the given Drive folder.

        Args:
            file_path: Local path of the file to upload.
            folder_id: Destination Drive folder ID.
            file_name: Optional name on Drive; defaults to the local name.

        Returns:
            The Drive ID of the uploaded file.
        """
        if file_name is None:
            file_name = Path(file_path).name

        file_metadata = {
            'name': file_name,
            'parents': [folder_id]
        }

        media = MediaFileUpload(str(file_path), resumable=True)
        file = self.service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()

        return file.get('id')

    def move_file(self, file_id, new_folder_id):
        """Move a Drive file into a different folder."""
        # Drive v3 moves files by swapping parents on update.
        file = self.service.files().get(fileId=file_id, fields='parents').execute()
        previous_parents = ",".join(file.get('parents'))

        self.service.files().update(
            fileId=file_id,
            addParents=new_folder_id,
            removeParents=previous_parents,
            fields='id, parents'
        ).execute()

    def generate_pdf_report(self, file_name, analysis_result, output_path):
        """Generate a PDF clinical report from API analysis results.

        Args:
            file_name: Original X-ray file name (shown in the header).
            analysis_result: Parsed JSON dict returned by /analyze.
            output_path: Destination path for the PDF.
        """
        pdf = FPDF()
        pdf.add_page()

        # Title
        pdf.set_font('Arial', 'B', 16)
        pdf.cell(0, 10, 'TB-Guard-XAI Clinical Report', 0, 1, 'C')
        pdf.ln(5)

        # Metadata header
        pdf.set_font('Arial', '', 10)
        pdf.cell(0, 6, f'X-Ray File: {file_name}', 0, 1)
        pdf.cell(0, 6, f'Analysis Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', 0, 1)
        pdf.cell(0, 6, f'System: TB-Guard-XAI v2.0 (Offline Mode: {analysis_result.get("mode", "unknown")})', 0, 1)
        pdf.ln(5)

        # Results — .get() with defaults so a partial API response still
        # yields a report instead of raising KeyError.
        pdf.set_font('Arial', 'B', 12)
        pdf.cell(0, 8, 'Analysis Results:', 0, 1)

        pdf.set_font('Arial', '', 10)
        pdf.cell(0, 6, f'Prediction: {analysis_result.get("prediction", "Unknown")}', 0, 1)
        pdf.cell(0, 6, f'TB Probability: {analysis_result.get("probability", 0) * 100:.1f}%', 0, 1)
        pdf.cell(0, 6, f'Uncertainty: {analysis_result.get("uncertainty", "Unknown")} (std: {analysis_result.get("uncertainty_std", 0.0):.4f})', 0, 1)
        pdf.cell(0, 6, f'Attention Region: {analysis_result.get("gradcam_region", "N/A")}', 0, 1)
        pdf.ln(5)

        # Clinical synthesis — strip markdown markers that FPDF would
        # otherwise render literally.
        pdf.set_font('Arial', 'B', 12)
        pdf.cell(0, 8, 'Clinical Synthesis:', 0, 1)

        pdf.set_font('Arial', '', 9)
        synthesis = analysis_result.get("explanation", "No synthesis available")
        synthesis = synthesis.replace('#', '').replace('*', '').replace('`', '')

        for line in synthesis.split('\n'):
            line = line.strip()
            if line:
                pdf.multi_cell(0, 5, line)

        pdf.ln(5)

        # Mandatory disclaimer footer.
        pdf.set_font('Arial', 'I', 8)
        pdf.multi_cell(0, 4, 'DISCLAIMER: This is a screening tool, not a diagnostic tool. All findings must be confirmed by qualified healthcare professionals and appropriate diagnostic tests.')

        pdf.output(str(output_path))

    def analyze_xray_via_api(self, image_path):
        """Analyze an X-ray image via the Hugging Face Space API.

        Args:
            image_path: Path to a local PNG/JPEG file.

        Returns:
            The parsed JSON response dict on success, or None on any
            failure (non-200 status, timeout, connection error).
        """
        try:
            # Keep the file handle open for the whole upload so requests
            # can stream it from disk.
            with open(image_path, 'rb') as f:
                files = {'file': (Path(image_path).name, f, 'image/png')}
                data = {
                    'symptoms': '',
                    'age_group': 'Adult (18-64)',
                    'threshold': 0.5
                }
                response = requests.post(
                    self.api_endpoint,
                    files=files,
                    data=data,
                    timeout=60
                )

            if response.status_code == 200:
                return response.json()

            print(f"   ⚠️ API error: {response.status_code}")
            print(f"   Response: {response.text[:200]}")
            return None

        except requests.exceptions.Timeout:
            print("   ⚠️ API timeout (>60s)")
            return None
        except Exception as e:
            print(f"   ⚠️ API call failed: {e}")
            return None

    def process_file(self, file_info):
        """Process a single X-ray file end to end via the HF Space API.

        Downloads the image, analyzes it, generates and uploads a PDF
        report, then moves the original image to the processed folder.

        Args:
            file_info: Drive file resource dict with 'id' and 'name'.

        Returns:
            True on success, False on any failure.
        """
        file_id = file_info['id']
        file_name = file_info['name']

        print(f"\n📋 Processing: {file_name}")

        local_path = None
        report_path = None
        try:
            print("   📥 Downloading from Google Drive...")
            local_path = self.download_file(file_id, file_name)

            print("   🧠 Sending to Hugging Face Space for analysis...")
            result = self.analyze_xray_via_api(local_path)

            if result is None:
                print(f"   ❌ Analysis failed for {file_name}")
                return False

            if 'error' in result:
                print(f"   ❌ API error: {result['error']}")
                return False

            mode = result.get('mode', 'unknown')
            prob = result.get('probability', 0)
            uncertainty = result.get('uncertainty', 'Unknown')
            print(f"   📊 Results: {result.get('prediction', 'Unknown')}")
            print(f"      • Probability: {prob*100:.1f}%")
            print(f"      • Uncertainty: {uncertainty}")
            print(f"      • Mode: {mode.upper()}")

            print("   📄 Generating PDF report...")
            report_name = Path(file_name).stem + "_report.pdf"
            report_path = Path("temp_gdrive") / report_name
            self.generate_pdf_report(file_name, result, report_path)

            print("   📤 Uploading report to Google Drive...")
            self.upload_file(report_path, self.reports_id, report_name)

            print("   ✅ Moving to processed folder...")
            self.move_file(file_id, self.processed_id)

            print(f"   ✅ Complete: {file_name} -> {report_name}")
            return True

        except Exception as e:
            print(f"   ❌ Error processing {file_name}: {e}")
            import traceback
            traceback.print_exc()
            return False
        finally:
            # Always remove local temp files, even on failure — the
            # original leaked them when an exception fired mid-way.
            for path in (local_path, report_path):
                if path is not None and path.exists():
                    path.unlink()

    def watch_and_process(self, interval=30):
        """Poll the inbox folder forever and process new files.

        Args:
            interval: Seconds to sleep between inbox checks.
        """
        print("\n" + "="*60)
        print("🏥 TB-Guard-XAI Google Drive Batch Processor")
        print("="*60)
        print(f"\n👀 Watching folder: {INBOX_FOLDER}")
        print(f"⏱️ Check interval: {interval} seconds")
        print(f"📄 Reports will be saved to: {REPORTS_FOLDER}")
        # BUG FIX: the original line was missing the f prefix, so the
        # literal text '{INBOX_FOLDER}' was printed instead of the name.
        print(f"\n💡 Upload X-ray images to '{INBOX_FOLDER}' folder in Google Drive")
        print("🛑 Press Ctrl+C to stop\n")

        try:
            while True:
                files = self.list_inbox_files()

                # Skip anything already handled during this session.
                new_files = [f for f in files if f['id'] not in self.processed_files]

                if new_files:
                    print(f"\n🆕 Found {len(new_files)} new file(s)")
                    for file_info in new_files:
                        success = self.process_file(file_info)
                        if success:
                            self.processed_files.add(file_info['id'])
                else:
                    print(f"⏳ {datetime.now().strftime('%H:%M:%S')} - No new files. Waiting...")

                time.sleep(interval)

        except KeyboardInterrupt:
            print("\n\n🛑 Stopping batch processor...")
            print("✅ Processed files will remain in Google Drive")
| |
|
def main():
    """CLI entry point.

    Usage::

        python gdrive_batch.py [SPACE_URL] [once]

    The HF Space URL may be overridden via the ``HF_SPACE_URL``
    environment variable or a first CLI argument starting with ``http``.
    Passing ``once`` as the last argument drains the inbox a single time
    instead of watching continuously.
    """
    import sys

    print("🔧 Initializing TB-Guard-XAI Batch Processor...")
    print("🌐 Using Hugging Face Space API for analysis")

    # A CLI-supplied URL takes precedence over the environment variable.
    hf_url = os.getenv("HF_SPACE_URL", HF_SPACE_URL)
    if len(sys.argv) > 1 and sys.argv[1].startswith("http"):
        hf_url = sys.argv[1]
        print(f"🔗 Using custom URL: {hf_url}")

    try:
        processor = GoogleDriveBatchProcessor(hf_space_url=hf_url)

        if len(sys.argv) > 1 and sys.argv[-1] == "once":
            # One-shot mode: process whatever is in the inbox, then exit.
            files = processor.list_inbox_files()
            if files:
                print(f"\n🆕 Found {len(files)} file(s) to process")
                for file_info in files:
                    processor.process_file(file_info)
            else:
                print("\n📭 No files in inbox")
        else:
            # Default mode: poll the inbox every 30 seconds.
            processor.watch_and_process(interval=30)

    except FileNotFoundError as e:
        # Raised by authenticate() when credentials.json is missing.
        print(f"\n❌ {e}")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()
|