Spaces:
Running
Running
Enhance API security and functionality by adding authentication middleware and session management. Updated app.py to include the new auth router and integrated authentication checks for protected endpoints. Modified requirements.txt to include necessary libraries for session handling. Updated .env.example to include authentication credentials. Improved retrieval functions with query expansion for better medical term matching and enriched context in responses.
ddc9c77
| """ | |
| GitHub Storage Utility for Medical RAG Advisor | |
| Handles saving side effects reports and validation results to GitHub repository | |
| """ | |
| import os | |
| import json | |
| import csv | |
| import io | |
| import base64 | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional | |
| import requests | |
| from .config import logger | |
class GitHubStorage:
    """
    Utility class for storing medical data files in GitHub repository

    Files are read and written through the GitHub REST "contents" endpoint
    (``/repos/{owner}/{repo}/contents/{path}``). The save/get methods are
    best-effort: failures are logged and reported through the return value
    rather than raised. Only the constructor raises (``ValueError`` for a
    malformed repository URL).
    """

    # Seconds to wait on GitHub before giving up. Without an explicit
    # timeout `requests` may block forever on a stalled connection.
    REQUEST_TIMEOUT = 30

    # Repository paths of the two data files managed by this class.
    SIDE_EFFECTS_PATH = "medical_data/side_effects_reports.csv"
    EVALUATIONS_PATH = "medical_data/evaluation_results.json"

    # Column order of the side effects CSV.
    SIDE_EFFECTS_FIELDS = [
        'timestamp', 'drug_name', 'side_effects', 'patient_age',
        'patient_gender', 'dosage', 'duration', 'severity',
        'outcome', 'additional_details', 'reporter_info', 'raw_input'
    ]

    # Summary key -> key inside each evaluation's "validation_report".
    SCORE_FIELDS = {
        "accuracy": "Accuracy_Rating",
        "coherence": "Coherence_Rating",
        "relevance": "Relevance_Rating",
        "completeness": "Completeness_Rating",
        "citations": "Citations_Attribution_Rating",
        "length": "Length_Rating",
        "overall": "Overall_Rating",
    }

    def __init__(self, repo_url: str = "https://github.com/MoazEldsouky/cloud-data-store.git",
                 github_token: Optional[str] = None):
        """
        Initialize GitHub storage with repository details

        Args:
            repo_url: GitHub repository URL
            github_token: GitHub personal access token. Falls back to the
                GITHUB_TOKEN environment variable when omitted.

        Raises:
            ValueError: If repo_url does not contain "github.com/<owner>/<repo>".
        """
        self.repo_url = repo_url
        # SECURITY: the token must come from the caller or the environment.
        # A literal fallback token in source code is a leaked credential
        # (any token that was ever committed here should be revoked).
        self.github_token = github_token or os.getenv("GITHUB_TOKEN")
        self.owner, self.repo_name = self._parse_repo_url(repo_url)
        self.api_base = f"https://api.github.com/repos/{self.owner}/{self.repo_name}"
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json"
        }
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"
        else:
            # Do not send "token None"; proceed unauthenticated instead.
            logger.warning("GITHUB_TOKEN is not set; GitHub requests will be unauthenticated")
        logger.info(f"GitHub storage initialized for {self.owner}/{self.repo_name}")

    @staticmethod
    def _parse_repo_url(repo_url: str) -> tuple:
        """
        Split a GitHub repository URL into (owner, repo_name)

        Accepts any scheme (or none), a trailing slash, and an optional
        ".git" suffix. Only a *trailing* ".git" is removed, so repository
        names such as "user.github.io" are preserved.

        Raises:
            ValueError: If the URL has no "github.com/<owner>/<repo>" part.
        """
        marker = "github.com/"
        if not repo_url or marker not in repo_url:
            raise ValueError("Invalid GitHub repository URL format")
        path = repo_url.split(marker, 1)[1]
        parts = [segment for segment in path.split("/") if segment]
        if len(parts) < 2:
            raise ValueError("Invalid GitHub repository URL format")
        owner, repo_name = parts[0], parts[1]
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-len(".git")]
        if not repo_name:
            raise ValueError("Invalid GitHub repository URL format")
        return owner, repo_name

    def _get_contents_payload(self, file_path: str) -> Optional[Dict[str, Any]]:
        """
        Fetch the raw "contents" API response for a file

        Args:
            file_path: Path to file in repository

        Returns:
            Parsed JSON object, or None when GitHub answers 404.

        Raises:
            RuntimeError: On any other non-200 status, or when the response
                body is not a JSON object (e.g. the path is not a single file).
            requests.RequestException: On network failure or timeout.
        """
        url = f"{self.api_base}/contents/{file_path}"
        response = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 404:
            return None
        if response.status_code != 200:
            raise RuntimeError(
                f"GitHub API returned {response.status_code} for {file_path}: {response.text}"
            )
        payload = response.json()
        if not isinstance(payload, dict):
            raise RuntimeError(f"{file_path} is not a single file in the repository")
        return payload

    @staticmethod
    def _decode_payload(payload: Dict[str, Any]) -> str:
        """
        Decode the base64 "content" field of a contents API response

        Raises:
            ValueError: If the payload declares an encoding other than
                base64. NOTE(review): GitHub appears to report encoding
                "none" with empty content for files too large for this
                endpoint - confirm against the API docs. Refusing here
                keeps callers from mistaking such a file for an empty one.
        """
        encoding = payload.get("encoding", "base64")
        if encoding != "base64":
            raise ValueError(f"Unsupported content encoding from GitHub: {encoding}")
        return base64.b64decode(payload.get("content", "")).decode('utf-8')

    def _fetch_file(self, file_path: str) -> tuple:
        """
        Fetch a file's content and SHA from a single API response

        Unlike _get_file_content / _get_file_sha this does NOT swallow
        errors, so save operations can tell "file does not exist" apart
        from "could not read the file" and avoid overwriting stored data
        after a failed read.

        Returns:
            (content, sha), or (None, None) when the file does not exist.

        Raises:
            Whatever _get_contents_payload / _decode_payload raise.
        """
        payload = self._get_contents_payload(file_path)
        if payload is None:
            return None, None
        return self._decode_payload(payload), payload.get("sha")

    def _get_file_sha(self, file_path: str) -> Optional[str]:
        """
        Get the SHA of an existing file in the repository

        Args:
            file_path: Path to file in repository

        Returns:
            SHA string if file exists, None otherwise (including on error)
        """
        try:
            payload = self._get_contents_payload(file_path)
            return None if payload is None else payload.get("sha")
        except Exception as e:
            logger.error(f"Exception getting file SHA: {e}")
            return None

    def _upload_file(self, file_path: str, content: str, message: str, sha: Optional[str] = None) -> bool:
        """
        Upload or update a file in the GitHub repository

        Args:
            file_path: Path where file should be stored in repo
            content: File content as string
            message: Commit message
            sha: SHA of existing file (for updates)

        Returns:
            True if successful, False otherwise
        """
        try:
            # The request body carries the file content base64-encoded.
            content_encoded = base64.b64encode(content.encode('utf-8')).decode('utf-8')
            data = {
                "message": message,
                "content": content_encoded
            }
            # The SHA is only sent when updating an existing file.
            if sha:
                data["sha"] = sha
            url = f"{self.api_base}/contents/{file_path}"
            response = requests.put(url, headers=self.headers, json=data,
                                    timeout=self.REQUEST_TIMEOUT)
            if response.status_code in [200, 201]:
                logger.info(f"Successfully uploaded {file_path} to GitHub")
                return True
            logger.error(f"Failed to upload {file_path}: {response.status_code} - {response.text}")
            return False
        except Exception as e:
            logger.error(f"Exception uploading file to GitHub: {e}")
            return False

    def _get_file_content(self, file_path: str) -> Optional[str]:
        """
        Get the content of a file from the GitHub repository

        Args:
            file_path: Path to file in repository

        Returns:
            File content as string if successful, None otherwise
            (file missing or any error, which is logged)
        """
        try:
            payload = self._get_contents_payload(file_path)
            return None if payload is None else self._decode_payload(payload)
        except Exception as e:
            logger.error(f"Exception getting file content: {e}")
            return None

    @staticmethod
    def _append_csv_row(existing_content: Optional[str], fieldnames: List[str],
                        row: Dict[str, Any]) -> str:
        """
        Return CSV text consisting of existing_content plus one new row

        A header is written when there is no existing content (None, empty
        or whitespace only). When the existing text lacks a trailing line
        break one is inserted first, so the new row can never be glued onto
        the last stored record.

        Raises:
            ValueError: If row contains keys that are not in fieldnames
                (csv.DictWriter default behaviour).
        """
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        if existing_content is None or not existing_content.strip():
            writer.writeheader()
        else:
            output.write(existing_content)
            if not existing_content.endswith(("\n", "\r")):
                output.write("\r\n")
        writer.writerow(row)
        return output.getvalue()

    def save_side_effects_report(self, report_data: Dict[str, Any]) -> bool:
        """
        Save a side effects report to GitHub repository as CSV

        The current file is fetched, the report is appended as one row and
        the result is uploaded. If the current file cannot be read (for any
        reason other than "not found") nothing is uploaded, so a transient
        API error cannot wipe previously stored reports.

        Args:
            report_data: Dictionary containing side effects report data

        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = self.SIDE_EFFECTS_PATH
            # Content and SHA come from the same response so they describe
            # the same file revision.
            existing_content, sha = self._fetch_file(file_path)
            final_content = self._append_csv_row(
                existing_content, self.SIDE_EFFECTS_FIELDS, report_data
            )
            commit_message = f"Add side effects report for {report_data.get('drug_name', 'unknown drug')} - {report_data.get('timestamp', 'unknown time')}"
            return self._upload_file(file_path, final_content, commit_message, sha)
        except Exception as e:
            logger.error(f"Error saving side effects report to GitHub: {e}")
            return False

    def save_validation_results(self, evaluation_data: Dict[str, Any]) -> bool:
        """
        Save validation results to GitHub repository as JSON

        The stored file is a JSON list; the new evaluation is appended to
        it. If the current file cannot be read (for any reason other than
        "not found") nothing is uploaded. A file that is readable but is
        not valid JSON, or is not a list, is replaced by a fresh list.

        Args:
            evaluation_data: Dictionary containing evaluation data with interaction_id already set

        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = self.EVALUATIONS_PATH
            existing_content, sha = self._fetch_file(file_path)

            evaluations = []
            if existing_content:
                try:
                    loaded = json.loads(existing_content)
                except json.JSONDecodeError:
                    logger.warning("Failed to parse existing evaluation_results.json, starting fresh")
                else:
                    if isinstance(loaded, list):
                        evaluations = loaded

            logger.info(f"Loading existing evaluations: {len(evaluations)} found")
            logger.info(f"Adding new evaluation with ID: {evaluation_data.get('interaction_id', 'unknown')}")

            evaluations.append(evaluation_data)
            json_content = json.dumps(evaluations, indent=2, ensure_ascii=False)

            commit_message = f"Add validation results for interaction {evaluation_data.get('interaction_id', 'unknown')} - {evaluation_data.get('timestamp', 'unknown time')}"
            success = self._upload_file(file_path, json_content, commit_message, sha)
            if success:
                logger.info(f"Successfully saved evaluation. Total evaluations now: {len(evaluations)}")
            return success
        except Exception as e:
            logger.error(f"Error saving validation results to GitHub: {e}")
            return False

    def get_side_effects_reports(self) -> List[Dict[str, Any]]:
        """
        Get all side effects reports from GitHub repository

        Returns:
            List of side effects reports as dictionaries (empty when the
            file is missing, empty, or cannot be read)
        """
        try:
            content = self._get_file_content(self.SIDE_EFFECTS_PATH)
            if not content:
                return []
            return list(csv.DictReader(io.StringIO(content)))
        except Exception as e:
            logger.error(f"Error getting side effects reports from GitHub: {e}")
            return []

    @staticmethod
    def _to_score(value: Any) -> int:
        """
        Convert a stored rating to int, treating unusable values as 0

        Numeric strings such as "4.7" are truncated the same way int(4.7)
        is. None, non-numeric text, NaN and infinities count as 0 so one
        malformed record cannot break the whole summary.
        """
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return 0

    @classmethod
    def _summarize_evaluations(cls, evaluations: List[Any], limit: int) -> Dict[str, Any]:
        """
        Build the summary dictionary returned by get_validation_results

        Args:
            evaluations: All stored evaluations, oldest first
            limit: Maximum number of most recent evaluations to include;
                zero or a negative number selects none
        """
        # evaluations[-0:] would be the WHOLE list, so guard non-positive limits.
        recent_evaluations = evaluations[-limit:] if limit > 0 else []

        averages = {}
        if recent_evaluations:
            total_scores = dict.fromkeys(cls.SCORE_FIELDS, 0)
            for eval_data in recent_evaluations:
                report = eval_data.get("validation_report") if isinstance(eval_data, dict) else None
                if not isinstance(report, dict):
                    report = {}
                for key, report_field in cls.SCORE_FIELDS.items():
                    total_scores[key] += cls._to_score(report.get(report_field, 0))
            count = len(recent_evaluations)
            averages = {key: round(value / count, 1) for key, value in total_scores.items()}

        return {
            "total_evaluations": len(evaluations),
            "recent_count": len(recent_evaluations),
            "average_scores": averages,
            "evaluations": recent_evaluations
        }

    def get_validation_results(self, limit: int = 10) -> Dict[str, Any]:
        """
        Get validation results from GitHub repository

        Args:
            limit: Maximum number of recent evaluations to return

        Returns:
            Dictionary containing evaluation summary and recent evaluations.
            When no file content is available the dictionary holds a
            "message" key instead; on failure it holds an "error" key.
        """
        try:
            content = self._get_file_content(self.EVALUATIONS_PATH)
            if not content:
                return {"message": "No evaluations found", "evaluations": []}
            evaluations = json.loads(content)
            if not isinstance(evaluations, list):
                evaluations = []
            return self._summarize_evaluations(evaluations, limit)
        except Exception as e:
            logger.error(f"Error getting validation results from GitHub: {e}")
            return {"error": str(e), "evaluations": []}

    def get_drug_reports(self, drug_name: str) -> List[Dict[str, Any]]:
        """
        Get side effects reports for a specific drug from GitHub repository

        Args:
            drug_name: Name of the drug to filter reports

        Returns:
            List of reports for the specified drug (case-insensitive match)
        """
        try:
            wanted = drug_name.lower()
            # csv.DictReader yields None for columns missing from a short
            # row; treat that as an empty name instead of failing.
            return [
                report for report in self.get_side_effects_reports()
                if (report.get('drug_name') or '').lower() == wanted
            ]
        except Exception as e:
            logger.error(f"Error getting drug reports from GitHub: {e}")
            return []
# Process-wide GitHubStorage singleton; remains None until first requested.
_github_storage = None


def get_github_storage() -> GitHubStorage:
    """Return the shared GitHubStorage instance, building it on first use."""
    global _github_storage
    if _github_storage is not None:
        return _github_storage
    _github_storage = GitHubStorage()
    return _github_storage