Lung-Cancer-AI-Advisor / core /github_storage.py
moazx's picture
Enhance API security and functionality by adding authentication middleware and session management. Updated app.py to include the new auth router and integrated authentication checks for protected endpoints. Modified requirements.txt to include necessary libraries for session handling. Updated .env.example to include authentication credentials. Improved retrieval functions with query expansion for better medical term matching and enriched context in responses.
ddc9c77
raw
history blame
14.2 kB
"""
GitHub Storage Utility for Medical RAG Advisor
Handles saving side effects reports and validation results to GitHub repository
"""
import os
import json
import csv
import io
import base64
from datetime import datetime
from typing import Dict, List, Any, Optional
import requests
from .config import logger
class GitHubStorage:
    """
    Utility class for storing medical data files in a GitHub repository.

    Files are read and written through the GitHub "repository contents" REST
    endpoint. Two files are managed:

    * ``medical_data/side_effects_reports.csv`` - one CSV row per report
    * ``medical_data/evaluation_results.json`` - a JSON list of evaluations

    Every public method is best-effort: failures are logged and reported
    through the return value instead of being raised.
    """

    # Column order of medical_data/side_effects_reports.csv.
    _REPORT_FIELDS = (
        'timestamp', 'drug_name', 'side_effects', 'patient_age',
        'patient_gender', 'dosage', 'duration', 'severity',
        'outcome', 'additional_details', 'reporter_info', 'raw_input',
    )

    # (name used in "average_scores", key read from "validation_report").
    _RATING_KEYS = (
        ("accuracy", "Accuracy_Rating"),
        ("coherence", "Coherence_Rating"),
        ("relevance", "Relevance_Rating"),
        ("completeness", "Completeness_Rating"),
        ("citations", "Citations_Attribution_Rating"),
        ("length", "Length_Rating"),
        ("overall", "Overall_Rating"),
    )

    def __init__(self, repo_url: str = "https://github.com/MoazEldsouky/cloud-data-store.git",
                 github_token: Optional[str] = None, timeout: float = 30.0):
        """
        Initialize GitHub storage with repository details.

        Args:
            repo_url: GitHub repository URL (``https://github.com/<owner>/<repo>[.git]``)
            github_token: GitHub personal access token. When omitted, the
                ``GITHUB_TOKEN`` environment variable is used.
            timeout: Seconds to wait for each GitHub API request

        Raises:
            ValueError: If the owner and repository cannot be extracted from ``repo_url``
        """
        self.repo_url = repo_url
        # SECURITY: the credential must come from the caller or the environment.
        # A token must never be embedded in source code; any token that was
        # ever committed has to be treated as compromised and revoked.
        self.github_token = github_token or os.getenv("GITHUB_TOKEN")
        self.timeout = timeout

        # Extract owner and repo name from URL
        self.owner, self.repo_name = self._parse_repo_url(repo_url)

        self.api_base = f"https://api.github.com/repos/{self.owner}/{self.repo_name}"
        self.headers = {
            "Accept": "application/vnd.github.v3+json",
            "Content-Type": "application/json"
        }
        if self.github_token:
            self.headers["Authorization"] = f"token {self.github_token}"
        else:
            # Sending "token None" would only produce confusing auth errors.
            logger.warning("GITHUB_TOKEN is not set; GitHub requests will be unauthenticated")
        logger.info(f"GitHub storage initialized for {self.owner}/{self.repo_name}")

    @staticmethod
    def _parse_repo_url(repo_url: str) -> tuple:
        """
        Split a GitHub repository URL into ``(owner, repo_name)``.

        Only a trailing ``.git`` is removed, so repository names that merely
        contain ".git" (for example ``site.github.io``) are kept intact.

        Args:
            repo_url: Repository URL containing ``github.com/<owner>/<repo>``

        Returns:
            Tuple ``(owner, repo_name)``

        Raises:
            ValueError: If the URL does not contain both an owner and a repository
        """
        error = ValueError("Invalid GitHub repository URL format")
        if not isinstance(repo_url, str):
            raise error
        host = "github.com"
        position = repo_url.find(host)
        if position == -1:
            raise error
        tail = repo_url[position + len(host):]
        # "/" follows the host in http(s) URLs, ":" in scp-style SSH URLs.
        if not tail or tail[0] not in "/:":
            raise error
        path = tail[1:].split("?", 1)[0].split("#", 1)[0].strip("/")
        parts = path.split("/")
        if len(parts) < 2:
            raise error
        owner, repo_name = parts[0], parts[1]
        if repo_name.endswith(".git"):
            repo_name = repo_name[:-len(".git")]
        if not owner or not repo_name:
            raise error
        return owner, repo_name

    def _fetch_payload(self, file_path: str, label: str) -> tuple:
        """
        Request the contents-API JSON document for a file.

        Args:
            file_path: Path to file in repository
            label: Short description used in log messages (e.g. "file SHA")

        Returns:
            Tuple ``(ok, payload)``. ``ok`` is False when the request failed,
            so the state of the remote file is unknown. ``payload`` is the
            decoded JSON object for an existing file and None when the API
            answered 404.
        """
        try:
            url = f"{self.api_base}/contents/{file_path}"
            response = requests.get(url, headers=self.headers, timeout=self.timeout)
            if response.status_code == 200:
                payload = response.json()
                if isinstance(payload, dict):
                    return True, payload
                # A JSON array is what the API returns for a directory listing.
                logger.error(f"Error getting {label}: {file_path} is not a regular file")
                return False, None
            if response.status_code == 404:
                return True, None
            logger.error(f"Error getting {label}: {response.status_code} - {response.text}")
            return False, None
        except Exception as e:
            logger.error(f"Exception getting {label}: {e}")
            return False, None

    @staticmethod
    def _decode_payload(payload: Dict[str, Any]) -> str:
        """
        Decode the base64 ``content`` field of a contents-API document.

        Args:
            payload: JSON object returned by the contents API for a file

        Returns:
            File content as a UTF-8 decoded string

        Raises:
            ValueError: If the encoding is not base64 or the data cannot be decoded
        """
        encoding = payload.get("encoding", "base64")
        if encoding != "base64":
            # Refuse rather than silently treating the file as empty.
            raise ValueError(f"unsupported content encoding: {encoding}")
        return base64.b64decode(payload.get("content") or "").decode('utf-8')

    def _load_for_update(self, file_path: str) -> tuple:
        """
        Read a file's content and SHA in one request, ahead of rewriting it.

        Taking both values from the same response keeps them consistent with
        each other, and the ``ok`` flag lets callers tell "file does not
        exist yet" apart from "could not read the file".

        Args:
            file_path: Path to file in repository

        Returns:
            Tuple ``(ok, content, sha)``. ``content`` and ``sha`` are None when
            the file does not exist; ``ok`` is False when the read failed.
        """
        ok, payload = self._fetch_payload(file_path, "file content")
        if not ok:
            return False, None, None
        if payload is None:
            return True, None, None
        try:
            content = self._decode_payload(payload)
        except (TypeError, ValueError) as e:
            logger.error(f"Exception getting file content: {e}")
            return False, None, None
        return True, content, payload.get("sha")

    def _get_file_sha(self, file_path: str) -> Optional[str]:
        """
        Get the SHA of an existing file in the repository.

        Args:
            file_path: Path to file in repository

        Returns:
            SHA string if file exists, None otherwise (including on errors)
        """
        _, payload = self._fetch_payload(file_path, "file SHA")
        return payload.get("sha") if payload else None

    def _upload_file(self, file_path: str, content: str, message: str, sha: Optional[str] = None) -> bool:
        """
        Upload or update a file in the GitHub repository.

        Args:
            file_path: Path where file should be stored in repo
            content: File content as string
            message: Commit message
            sha: SHA of existing file (for updates)

        Returns:
            True if successful, False otherwise
        """
        try:
            # The contents API expects the file body base64-encoded
            content_encoded = base64.b64encode(content.encode('utf-8')).decode('utf-8')
            data = {
                "message": message,
                "content": content_encoded
            }
            # Add SHA if updating existing file
            if sha:
                data["sha"] = sha
            url = f"{self.api_base}/contents/{file_path}"
            response = requests.put(url, headers=self.headers, json=data, timeout=self.timeout)
            if response.status_code in [200, 201]:
                logger.info(f"Successfully uploaded {file_path} to GitHub")
                return True
            logger.error(f"Failed to upload {file_path}: {response.status_code} - {response.text}")
            return False
        except Exception as e:
            logger.error(f"Exception uploading file to GitHub: {e}")
            return False

    def _get_file_content(self, file_path: str) -> Optional[str]:
        """
        Get the content of a file from the GitHub repository.

        Args:
            file_path: Path to file in repository

        Returns:
            File content as string if successful, None otherwise
        """
        _, payload = self._fetch_payload(file_path, "file content")
        if payload is None:
            return None
        try:
            return self._decode_payload(payload)
        except (TypeError, ValueError) as e:
            logger.error(f"Exception getting file content: {e}")
            return None

    @classmethod
    def _build_reports_csv(cls, existing_content: Optional[str], report_data: Dict[str, Any]) -> str:
        """
        Return the reports CSV text with ``report_data`` appended as a new row.

        Args:
            existing_content: Current CSV text, or None when the file does not exist
            report_data: Report values keyed by column name; keys outside the
                known columns are ignored and missing columns are left empty

        Returns:
            Complete CSV text to upload
        """
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=cls._REPORT_FIELDS, extrasaction='ignore')
        if existing_content is None or not existing_content.strip():
            # Missing or blank file: start it with the header row.
            writer.writeheader()
        else:
            buffer.write(existing_content)
            # Without a line break the new row would be glued onto the last one.
            if not existing_content.endswith("\n"):
                buffer.write("\r\n")
        writer.writerow(report_data)
        return buffer.getvalue()

    def save_side_effects_report(self, report_data: Dict[str, Any]) -> bool:
        """
        Save a side effects report to GitHub repository as CSV.

        Args:
            report_data: Dictionary containing side effects report data

        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = "medical_data/side_effects_reports.csv"
            ok, existing_content, sha = self._load_for_update(file_path)
            if not ok:
                # Uploading now would replace every stored report with just this row.
                logger.error("Could not read existing side effects reports; new report was not saved")
                return False
            unknown = [key for key in report_data if key not in self._REPORT_FIELDS]
            if unknown:
                logger.warning(f"Ignoring unknown side effects report fields: {unknown}")
            final_content = self._build_reports_csv(existing_content, report_data)
            commit_message = f"Add side effects report for {report_data.get('drug_name', 'unknown drug')} - {report_data.get('timestamp', 'unknown time')}"
            return self._upload_file(file_path, final_content, commit_message, sha)
        except Exception as e:
            logger.error(f"Error saving side effects report to GitHub: {e}")
            return False

    def save_validation_results(self, evaluation_data: Dict[str, Any]) -> bool:
        """
        Save validation results to GitHub repository as JSON.

        Args:
            evaluation_data: Dictionary containing evaluation data with interaction_id already set

        Returns:
            True if successful, False otherwise
        """
        try:
            file_path = "medical_data/evaluation_results.json"
            ok, existing_content, sha = self._load_for_update(file_path)
            if not ok:
                # Uploading now would replace the stored history with one entry.
                logger.error("Could not read existing evaluation results; new evaluation was not saved")
                return False
            # Parse existing data or create new list
            evaluations = []
            if existing_content:
                try:
                    parsed = json.loads(existing_content)
                    if isinstance(parsed, list):
                        evaluations = parsed
                except json.JSONDecodeError:
                    logger.warning("Failed to parse existing evaluation_results.json, starting fresh")
            logger.info(f"Loading existing evaluations: {len(evaluations)} found")
            logger.info(f"Adding new evaluation with ID: {evaluation_data.get('interaction_id', 'unknown')}")
            evaluations.append(evaluation_data)
            json_content = json.dumps(evaluations, indent=2, ensure_ascii=False)
            commit_message = f"Add validation results for interaction {evaluation_data.get('interaction_id', 'unknown')} - {evaluation_data.get('timestamp', 'unknown time')}"
            success = self._upload_file(file_path, json_content, commit_message, sha)
            if success:
                logger.info(f"Successfully saved evaluation. Total evaluations now: {len(evaluations)}")
            return success
        except Exception as e:
            logger.error(f"Error saving validation results to GitHub: {e}")
            return False

    def get_side_effects_reports(self) -> List[Dict[str, Any]]:
        """
        Get all side effects reports from GitHub repository.

        Returns:
            List of side effects reports as dictionaries (empty on any failure)
        """
        try:
            file_path = "medical_data/side_effects_reports.csv"
            content = self._get_file_content(file_path)
            if not content:
                return []
            return list(csv.DictReader(io.StringIO(content)))
        except Exception as e:
            logger.error(f"Error getting side effects reports from GitHub: {e}")
            return []

    @staticmethod
    def _to_int(value: Any) -> int:
        """
        Convert a rating to int; values that are not numeric count as 0.

        Numeric strings such as "4" or "4.7" are accepted and truncated the
        same way a float rating is.
        """
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return 0

    @classmethod
    def _summarize_evaluations(cls, evaluations: Any, limit: int) -> Dict[str, Any]:
        """
        Build the summary returned by ``get_validation_results``.

        Args:
            evaluations: Parsed content of the evaluations file; anything that
                is not a list is treated as an empty list
            limit: Maximum number of most recent evaluations to include;
                zero or a negative value selects none

        Returns:
            Dictionary with total count, recent count, average scores and the
            recent evaluations themselves
        """
        if not isinstance(evaluations, list):
            evaluations = []
        # evaluations[-0:] would be the whole list, so guard non-positive limits.
        recent = evaluations[-limit:] if limit > 0 else []
        averages = {}
        if recent:
            totals = {name: 0 for name, _ in cls._RATING_KEYS}
            for entry in recent:
                report = entry.get("validation_report") if isinstance(entry, dict) else None
                if not isinstance(report, dict):
                    # Malformed entries still count towards the divisor, as zeros.
                    report = {}
                for name, key in cls._RATING_KEYS:
                    totals[name] += cls._to_int(report.get(key, 0))
            averages = {name: round(total / len(recent), 1) for name, total in totals.items()}
        return {
            "total_evaluations": len(evaluations),
            "recent_count": len(recent),
            "average_scores": averages,
            "evaluations": recent
        }

    def get_validation_results(self, limit: int = 10) -> Dict[str, Any]:
        """
        Get validation results from GitHub repository.

        Args:
            limit: Maximum number of recent evaluations to return

        Returns:
            Dictionary containing evaluation summary and recent evaluations.
            When nothing is stored a "message" entry is returned instead, and
            on failure an "error" entry.
        """
        try:
            file_path = "medical_data/evaluation_results.json"
            content = self._get_file_content(file_path)
            if not content:
                return {"message": "No evaluations found", "evaluations": []}
            return self._summarize_evaluations(json.loads(content), limit)
        except Exception as e:
            logger.error(f"Error getting validation results from GitHub: {e}")
            return {"error": str(e), "evaluations": []}

    def get_drug_reports(self, drug_name: str) -> List[Dict[str, Any]]:
        """
        Get side effects reports for a specific drug from GitHub repository.

        Args:
            drug_name: Name of the drug to filter reports

        Returns:
            List of reports for the specified drug
        """
        try:
            wanted = (drug_name or '').lower()
            # Case-insensitive match. Short CSV rows yield None for missing
            # columns, so normalise before calling .lower().
            return [
                report for report in self.get_side_effects_reports()
                if (report.get('drug_name') or '').lower() == wanted
            ]
        except Exception as e:
            logger.error(f"Error getting drug reports from GitHub: {e}")
            return []
# Module-wide GitHubStorage singleton; remains None until first requested.
_github_storage = None


def get_github_storage() -> GitHubStorage:
    """Return the shared GitHubStorage, constructing it on the first call."""
    global _github_storage
    if _github_storage is not None:
        return _github_storage
    _github_storage = GitHubStorage()
    return _github_storage