|
|
""" |
|
|
GitHub Storage Utility for Lung Cancer AI Advisor |
|
|
Handles saving side effects reports and validation results to GitHub repository |
|
|
""" |
|
|
import os |
|
|
import json |
|
|
import csv |
|
|
import io |
|
|
import base64 |
|
|
import time |
|
|
import traceback |
|
|
from datetime import datetime |
|
|
from typing import Dict, List, Any, Optional |
|
|
import requests |
|
|
from .config import logger |
|
|
|
|
|
|
|
|
class GitHubStorage: |
|
|
""" |
|
|
Utility class for storing medical data files in GitHub repository |
|
|
""" |
|
|
|
|
|
def __init__(self, repo_url: str = "https://github.com/MoazEldsouky/cloud-data-store.git",
             github_token: Optional[str] = None):
    """
    Initialize GitHub storage with repository details.

    Args:
        repo_url: GitHub repository URL (``https://github.com/owner/repo[.git]``)
        github_token: GitHub personal access token; falls back to the
            GITHUB_TOKEN environment variable when omitted

    Raises:
        ValueError: If repo_url is not a parseable github.com URL
    """
    self.repo_url = repo_url
    self.github_token = github_token or os.getenv("GITHUB_TOKEN")

    if self.github_token:
        # Log only a redacted preview so the token never lands in log files
        token_preview = (self.github_token[:7] + "..." + self.github_token[-4:]
                         if len(self.github_token) > 11 else "***")
        logger.info(f"GitHub token configured: {token_preview}")
    else:
        logger.warning("No GitHub token configured - uploads will fail!")

    # Parse "owner/repo" out of the URL (tolerates a trailing ".git")
    if "github.com/" in repo_url:
        parts = repo_url.replace("https://github.com/", "").replace(".git", "").split("/")
        if len(parts) < 2 or not parts[0] or not parts[1]:
            # Raise the documented error instead of an IndexError below
            raise ValueError("Invalid GitHub repository URL format")
        self.owner = parts[0]
        self.repo_name = parts[1]
    else:
        raise ValueError("Invalid GitHub repository URL format")

    self.api_base = f"https://api.github.com/repos/{self.owner}/{self.repo_name}"
    self.headers = {
        "Accept": "application/vnd.github.v3+json",
        "Content-Type": "application/json"
    }
    # Only attach credentials when a token is actually present; otherwise the
    # API would receive the literal string "token None" and reject even
    # requests that anonymous access to a public repo would have allowed.
    if self.github_token:
        self.headers["Authorization"] = f"token {self.github_token}"

    logger.info(f"GitHub storage initialized for {self.owner}/{self.repo_name}")
|
|
|
|
|
def _get_file_sha(self, file_path: str) -> Optional[str]:
    """
    Get the SHA of an existing file in the repository.

    Args:
        file_path: Path to file in repository

    Returns:
        SHA string if file exists, None otherwise (a missing file and a
        failed request are both reported as None)
    """
    try:
        url = f"{self.api_base}/contents/{file_path}"
        # Timeout matches _upload_file so a stalled connection cannot hang the caller
        response = requests.get(url, headers=self.headers, timeout=30)

        if response.status_code == 200:
            return response.json().get("sha")
        elif response.status_code == 404:
            # Missing file is a normal case (first ever write), not an error
            return None
        else:
            logger.error(f"Error getting file SHA: {response.status_code} - {response.text}")
            return None

    except Exception as e:
        logger.error(f"Exception getting file SHA: {e}")
        return None
|
|
|
|
|
def _upload_file(self, file_path: str, content: str, message: str, sha: Optional[str] = None) -> bool:
    """
    Upload or update a file in the GitHub repository

    Uses the GitHub "contents" API, which creates the file when no SHA is
    supplied and updates the existing file when one is.

    Args:
        file_path: Path where file should be stored in repo
        content: File content as string
        message: Commit message
        sha: SHA of existing file (for updates)

    Returns:
        True if successful, False otherwise
    """
    try:
        # The contents API requires the payload to be base64-encoded
        content_encoded = base64.b64encode(content.encode('utf-8')).decode('utf-8')

        data = {
            "message": message,
            "content": content_encoded
        }

        # Including the current SHA turns the request from "create" into "update"
        if sha:
            data["sha"] = sha

        url = f"{self.api_base}/contents/{file_path}"
        logger.info(f"Uploading to GitHub: {file_path} (size: {len(content)} bytes)")
        response = requests.put(url, headers=self.headers, json=data, timeout=30)

        # 200 = updated existing file, 201 = created new file
        if response.status_code in [200, 201]:
            logger.info(f"✓ Successfully uploaded {file_path} to GitHub")
            return True
        elif response.status_code == 401:
            # Bad credentials — token invalid, revoked, or expired
            logger.error(f"❌ Authentication failed uploading {file_path}: Invalid or expired GitHub token")
            logger.error(f"Response: {response.text}")
            return False
        elif response.status_code == 403:
            # Valid token but insufficient scopes / repo permissions
            logger.error(f"❌ Permission denied uploading {file_path}: Token lacks required permissions")
            logger.error(f"Response: {response.text}")
            return False
        elif response.status_code == 404:
            logger.error(f"❌ Repository not found: {self.owner}/{self.repo_name}")
            logger.error(f"Response: {response.text}")
            return False
        elif response.status_code == 409:
            # SHA mismatch: someone else committed between our read and this write
            logger.error(f"Conflict error uploading {file_path}: File may have been modified. Status: {response.status_code}")
            logger.error(f"Response: {response.text[:500]}")
            return False
        else:
            logger.error(f"Failed to upload {file_path}. Status: {response.status_code}")
            logger.error(f"Response: {response.text}")
            return False

    except requests.exceptions.Timeout as e:
        logger.error(f"Timeout uploading file to GitHub: {e}")
        return False
    except requests.exceptions.RequestException as e:
        # Connection errors, DNS failures, etc.
        logger.error(f"Request exception uploading file to GitHub: {e}")
        return False
    except Exception as e:
        # Unexpected failure — keep the traceback for diagnosis
        logger.error(f"Unexpected exception uploading file to GitHub: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return False
|
|
|
|
|
def _get_file_content(self, file_path: str) -> Optional[str]:
    """
    Get the content of a file from the GitHub repository.

    Args:
        file_path: Path to file in repository

    Returns:
        File content decoded as UTF-8 if successful, None otherwise
        (missing file and request errors both yield None)
    """
    try:
        url = f"{self.api_base}/contents/{file_path}"
        # Timeout matches _upload_file so a stalled connection cannot hang the caller
        response = requests.get(url, headers=self.headers, timeout=30)

        if response.status_code == 200:
            # The contents API returns the payload base64-encoded
            content_encoded = response.json().get("content", "")
            content = base64.b64decode(content_encoded).decode('utf-8')
            return content
        elif response.status_code == 404:
            # Missing file is a normal case (nothing stored yet)
            return None
        else:
            logger.error(f"Error getting file content: {response.status_code} - {response.text}")
            return None

    except Exception as e:
        logger.error(f"Exception getting file content: {e}")
        return None
|
|
|
|
|
def save_side_effects_report(self, report_data: Dict[str, Any]) -> bool:
    """
    Save a side effects report to GitHub repository as CSV.

    Appends one row to ``medical_data/side_effects_reports.csv``, writing
    the header first when the file does not exist yet.

    Args:
        report_data: Dictionary containing side effects report data; keys
            must be a subset of the CSV schema below (missing keys become
            empty cells)

    Returns:
        True if successful, False otherwise
    """
    try:
        file_path = "medical_data/side_effects_reports.csv"

        # Fetch the current CSV (None when no report has ever been saved)
        existing_content = self._get_file_content(file_path)

        # Fixed CSV schema — keep column order stable across commits
        fieldnames = [
            'timestamp', 'drug_name', 'side_effects', 'patient_age',
            'patient_gender', 'dosage', 'duration', 'severity',
            'outcome', 'additional_details', 'reporter_info', 'raw_input'
        ]

        # Serialize the new row (plus the header when the file is new)
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        if existing_content is None:
            csv_content = ""
            writer.writeheader()
        else:
            csv_content = existing_content
            # Guard against an existing file without a trailing newline;
            # otherwise the new row would be fused onto the last line.
            if not csv_content.endswith(("\n", "\r")):
                csv_content += "\r\n"
        writer.writerow(report_data)

        final_content = csv_content + output.getvalue()

        # The GitHub API needs the current SHA to update an existing file
        sha = self._get_file_sha(file_path)

        commit_message = f"Add side effects report for {report_data.get('drug_name', 'unknown drug')} - {report_data.get('timestamp', 'unknown time')}"

        return self._upload_file(file_path, final_content, commit_message, sha)

    except Exception as e:
        logger.error(f"Error saving side effects report to GitHub: {e}")
        return False
|
|
|
|
|
def save_validation_results(self, evaluation_data: Dict[str, Any]) -> bool:
    """
    Save validation results to GitHub repository as JSON with robust append logic.

    Always loads existing data first, then appends new evaluation without overwriting.
    Retries up to three times on parse failures, SHA lookup failures, upload
    failures, and unexpected exceptions, sleeping 2s between attempts.

    Args:
        evaluation_data: Dictionary containing evaluation data with interaction_id already set

    Returns:
        True if successful, False otherwise
    """
    max_retries = 3
    retry_count = 0

    while retry_count < max_retries:
        try:
            file_path = "medical_data/evaluation_results.json"

            # Step 1: load the current evaluation list from the repo
            logger.info(f"Attempt {retry_count + 1}/{max_retries}: Loading existing evaluations from GitHub...")
            existing_content = self._get_file_content(file_path)

            evaluations = []
            if existing_content:
                try:
                    evaluations = json.loads(existing_content)
                    if not isinstance(evaluations, list):
                        # File exists but holds the wrong shape — start fresh
                        logger.warning("Existing content is not a list, creating new list")
                        evaluations = []
                    else:
                        logger.info(f"Successfully loaded {len(evaluations)} existing evaluations")
                except json.JSONDecodeError as e:
                    # Corrupt/partial JSON: retry in case we read mid-commit
                    logger.error(f"Failed to parse existing evaluation_results.json: {e}")
                    if retry_count < max_retries - 1:
                        retry_count += 1
                        logger.warning(f"Retrying due to JSON parse error...")
                        time.sleep(2)
                        continue
                    else:
                        logger.error("Max retries reached. Cannot parse existing data.")
                        return False
            else:
                logger.info("No existing file found, creating new evaluation list")

            new_interaction_id = evaluation_data.get('interaction_id', 'unknown')
            logger.info(f"Adding new evaluation with ID: {new_interaction_id}")

            # Step 2: idempotency — a re-submitted interaction is a success, not an append
            existing_ids = [e.get('interaction_id') for e in evaluations]
            if new_interaction_id in existing_ids:
                logger.warning(f"Evaluation with ID {new_interaction_id} already exists. Skipping duplicate.")
                return True

            evaluations.append(evaluation_data)
            logger.info(f"Appended new evaluation. Total count: {len(evaluations)}")

            json_content = json.dumps(evaluations, indent=2, ensure_ascii=False)

            # Step 3: fetch the SHA required by the GitHub API to update a file.
            # If the file had content but the SHA lookup failed, someone may have
            # changed the file between our two reads — retry rather than clobber.
            sha = self._get_file_sha(file_path)
            if existing_content and not sha:
                logger.error("File exists but SHA not found. Possible race condition.")
                if retry_count < max_retries - 1:
                    retry_count += 1
                    logger.warning("Retrying due to SHA retrieval failure...")
                    time.sleep(2)
                    continue
                else:
                    return False

            commit_message = f"Add validation results for interaction {new_interaction_id} - {evaluation_data.get('timestamp', 'unknown time')}"

            # Step 4: upload; on failure, loop again (the next attempt re-reads
            # the file, so a stale SHA gets refreshed automatically)
            success = self._upload_file(file_path, json_content, commit_message, sha)

            if success:
                logger.info(f"✓ Successfully saved evaluation {new_interaction_id}. Total evaluations now: {len(evaluations)}")
                return True
            else:
                logger.error(f"Failed to upload file (attempt {retry_count + 1}/{max_retries})")
                if retry_count < max_retries - 1:
                    retry_count += 1
                    logger.warning("Retrying upload...")
                    time.sleep(2)
                    continue
                else:
                    return False

        except Exception as e:
            logger.error(f"Error saving validation results to GitHub (attempt {retry_count + 1}/{max_retries}): {e}")
            if retry_count < max_retries - 1:
                retry_count += 1
                logger.warning("Retrying due to exception...")
                time.sleep(2)
                continue
            else:
                return False

    # Defensive: every path above returns or continues, but keep a terminal
    # failure result in case the loop ever exits normally.
    return False
|
|
|
|
|
def get_side_effects_reports(self) -> List[Dict[str, Any]]:
    """Fetch every stored side effects report from the repository CSV.

    Returns:
        One dictionary per CSV data row (keyed by the header columns);
        an empty list when the file is missing, empty, or unreadable.
    """
    try:
        raw_csv = self._get_file_content("medical_data/side_effects_reports.csv")
        if not raw_csv:
            return []

        # DictReader maps each data row onto the header columns
        return [row for row in csv.DictReader(io.StringIO(raw_csv))]

    except Exception as e:
        logger.error(f"Error getting side effects reports from GitHub: {e}")
        return []
|
|
|
|
|
def get_validation_results(self, limit: int = 10) -> Dict[str, Any]:
    """
    Get validation results from GitHub repository.

    Args:
        limit: Maximum number of recent evaluations to return; values
            <= 0 return no evaluations (and empty averages)

    Returns:
        Dictionary with the total evaluation count, the size of the recent
        window, per-metric average scores over that window, and the recent
        evaluations themselves; {"error": ..., "evaluations": []} on failure
    """
    try:
        file_path = "medical_data/evaluation_results.json"
        content = self._get_file_content(file_path)

        if not content:
            return {"message": "No evaluations found", "evaluations": []}

        evaluations = json.loads(content)
        if not isinstance(evaluations, list):
            evaluations = []

        # NB: evaluations[-0:] is the WHOLE list, so a non-positive limit
        # must be handled explicitly instead of relying on slicing.
        recent_evaluations = evaluations[-limit:] if limit > 0 else []

        if recent_evaluations:
            total_scores = {
                "accuracy": 0,
                "coherence": 0,
                "relevance": 0,
                "completeness": 0,
                "citations": 0,
                "length": 0,
                "overall": 0
            }

            count = len(recent_evaluations)
            for eval_data in recent_evaluations:
                report = eval_data.get("validation_report", {})
                # "or 0" guards against None / empty-string ratings, which
                # would otherwise crash int() and abort the whole summary
                total_scores["accuracy"] += int(report.get("Accuracy_Rating", 0) or 0)
                total_scores["coherence"] += int(report.get("Coherence_Rating", 0) or 0)
                total_scores["relevance"] += int(report.get("Relevance_Rating", 0) or 0)
                total_scores["completeness"] += int(report.get("Completeness_Rating", 0) or 0)
                total_scores["citations"] += int(report.get("Citations_Attribution_Rating", 0) or 0)
                total_scores["length"] += int(report.get("Length_Rating", 0) or 0)
                total_scores["overall"] += int(report.get("Overall_Rating", 0) or 0)

            averages = {key: round(value / count, 1) for key, value in total_scores.items()}
        else:
            averages = {}

        return {
            "total_evaluations": len(evaluations),
            "recent_count": len(recent_evaluations),
            "average_scores": averages,
            "evaluations": recent_evaluations
        }

    except Exception as e:
        logger.error(f"Error getting validation results from GitHub: {e}")
        return {"error": str(e), "evaluations": []}
|
|
|
|
|
def get_drug_reports(self, drug_name: str) -> List[Dict[str, Any]]:
    """Return the stored side effects reports for one specific drug.

    Args:
        drug_name: Name of the drug to filter reports (matched
            case-insensitively against each report's 'drug_name' field)

    Returns:
        Reports whose 'drug_name' equals drug_name ignoring case;
        an empty list when none match or retrieval fails.
    """
    try:
        # Normalize once so every comparison is case-insensitive
        wanted = drug_name.lower()
        return [
            entry for entry in self.get_side_effects_reports()
            if entry.get('drug_name', '').lower() == wanted
        ]

    except Exception as e:
        logger.error(f"Error getting drug reports from GitHub: {e}")
        return []
|
|
|
|
|
|
|
|
|
|
|
# Module-level cache backing the lazy singleton below
_github_storage = None


def get_github_storage() -> GitHubStorage:
    """Return the process-wide GitHubStorage instance, creating it on first use."""
    global _github_storage
    if _github_storage is not None:
        return _github_storage
    _github_storage = GitHubStorage()
    return _github_storage
|
|
|