""" GDC Data Portal Client Download and parse cancer genomics data from GDC """ import os import json import requests from typing import Dict, List, Optional, Any from pathlib import Path import yaml from dataclasses import dataclass import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class GDCFile: """Represents a file from GDC Portal""" file_id: str file_name: str file_size: int data_type: str data_format: str experimental_strategy: str case_id: str project_id: str class GDCClient: """Client for interacting with GDC Data Portal API""" def __init__(self, config_path: str = "config.yml"): with open(config_path, 'r') as f: self.config = yaml.safe_load(f)['gdc'] self.api_url = self.config['api_url'] self.download_dir = Path(self.config['download_dir']) self.download_dir.mkdir(parents=True, exist_ok=True) self.session = requests.Session() self.session.headers.update({ 'Content-Type': 'application/json' }) def search_files( self, filters: Optional[Dict] = None, size: int = 100, fields: Optional[List[str]] = None ) -> List[GDCFile]: """ Search for files in GDC Args: filters: GDC filter query size: Number of results to return fields: Fields to include in response """ endpoint = f"{self.api_url}/files" if fields is None: fields = [ 'file_id', 'file_name', 'file_size', 'data_type', 'data_format', 'experimental_strategy', 'cases.case_id', 'cases.project.project_id' ] params = { 'size': size, 'fields': ','.join(fields) } if filters: params['filters'] = json.dumps(filters) try: response = self.session.get(endpoint, params=params) response.raise_for_status() data = response.json() files = [] for hit in data.get('data', {}).get('hits', []): gdc_file = GDCFile( file_id=hit.get('file_id'), file_name=hit.get('file_name'), file_size=hit.get('file_size', 0), data_type=hit.get('data_type'), data_format=hit.get('data_format'), experimental_strategy=hit.get('experimental_strategy'), case_id=hit.get('cases', [{}])[0].get('case_id') if hit.get('cases') else None, project_id=hit.get('cases', [{}])[0].get('project', {}).get('project_id') if hit.get('cases') else None ) files.append(gdc_file) logger.info(f"Found {len(files)} files") return files except Exception as e: logger.error(f"Error searching files: {e}") return [] def download_file( self, file_id: str, output_dir: Optional[Path] = None ) -> Optional[Path]: """ Download a file from GDC Args: file_id: GDC file UUID output_dir: Directory to save file (defaults to config download_dir) Returns: Path to downloaded file or None if failed """ if output_dir is None: output_dir = self.download_dir output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) endpoint = f"{self.api_url}/data/{file_id}" try: logger.info(f"Downloading file {file_id}") response = self.session.get(endpoint, stream=True) response.raise_for_status() # Get filename from headers content_disposition = response.headers.get('content-disposition', '') if 'filename=' in content_disposition: filename = content_disposition.split('filename=')[1].strip('"') else: filename = file_id output_path = output_dir / filename with open(output_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) logger.info(f"Downloaded to {output_path}") return output_path except Exception as e: logger.error(f"Error downloading file {file_id}: {e}") return None def get_project_files( self, project_id: str, data_type: Optional[str] = None, limit: int = 100 ) -> List[GDCFile]: """ Get files for a specific project Args: project_id: GDC project ID (e.g., TCGA-BRCA) data_type: Filter by data type limit: Maximum number of files """ filters = { "op": "and", "content": [ { "op": "in", "content": { "field": "cases.project.project_id", "value": [project_id] } } ] } if data_type: filters["content"].append({ "op": "in", "content": { "field": "data_type", "value": [data_type] } }) return self.search_files(filters=filters, size=limit) def get_mutation_data(self, project_id: str, limit: int = 100) -> List[GDCFile]: """Get mutation/variant calling files for a project""" return self.get_project_files( project_id=project_id, data_type="Simple Nucleotide Variation", limit=limit ) def get_gene_expression_data(self, project_id: str, limit: int = 100) -> List[GDCFile]: """Get gene expression data for a project""" return self.get_project_files( project_id=project_id, data_type="Gene Expression Quantification", limit=limit ) def search_cases( self, project_id: str, filters: Optional[Dict] = None, size: int = 100 ) -> List[Dict]: """ Search for cases (patients) in GDC Args: project_id: GDC project ID filters: Additional filter criteria size: Number of results """ endpoint = f"{self.api_url}/cases" base_filters = { "op": "in", "content": { "field": "project.project_id", "value": [project_id] } } if filters: filter_query = { "op": "and", "content": [base_filters, filters] } else: filter_query = base_filters params = { 'size': size, 'filters': json.dumps(filter_query), 'fields': 'case_id,project.project_id,demographic,diagnoses' } try: response = self.session.get(endpoint, params=params) response.raise_for_status() data = response.json() cases = data.get('data', {}).get('hits', []) logger.info(f"Found {len(cases)} cases") return cases except Exception as e: logger.error(f"Error searching cases: {e}") return [] class GDCDataParser: """Parse downloaded GDC data files""" @staticmethod def parse_maf(file_path: Path) -> List[Dict]: """ Parse MAF (Mutation Annotation Format) file Returns list of mutation records """ mutations = [] try: with open(file_path, 'r') as f: # Skip comment lines for line in f: if not line.startswith('#'): header_line = line.strip() break headers = header_line.split('\t') for line in f: if line.startswith('#'): continue values = line.strip().split('\t') if len(values) == len(headers): mutation = dict(zip(headers, values)) mutations.append(mutation) logger.info(f"Parsed {len(mutations)} mutations from {file_path}") return mutations except Exception as e: logger.error(f"Error parsing MAF file: {e}") return [] @staticmethod def parse_vcf(file_path: Path) -> List[Dict]: """ Parse VCF (Variant Call Format) file Returns list of variant records """ variants = [] try: with open(file_path, 'r') as f: for line in f: if line.startswith('##'): continue if line.startswith('#CHROM'): headers = line.strip().split('\t') continue values = line.strip().split('\t') variant = { 'chrom': values[0], 'pos': values[1], 'id': values[2], 'ref': values[3], 'alt': values[4], 'qual': values[5], 'filter': values[6], 'info': values[7] } variants.append(variant) logger.info(f"Parsed {len(variants)} variants from {file_path}") return variants except Exception as e: logger.error(f"Error parsing VCF file: {e}") return [] @staticmethod def parse_clinical_data(data: Dict) -> Dict: """Parse clinical data from GDC case""" clinical = { 'case_id': data.get('case_id'), 'project_id': data.get('project', {}).get('project_id'), 'demographic': {}, 'diagnoses': [] } # Parse demographic data demo = data.get('demographic', {}) clinical['demographic'] = { 'age_at_index': demo.get('age_at_index'), 'gender': demo.get('gender'), 'race': demo.get('race'), 'ethnicity': demo.get('ethnicity') } # Parse diagnosis data for diag in data.get('diagnoses', []): diagnosis = { 'diagnosis_id': diag.get('diagnosis_id'), 'primary_diagnosis': diag.get('primary_diagnosis'), 'tumor_stage': diag.get('tumor_stage'), 'age_at_diagnosis': diag.get('age_at_diagnosis'), 'vital_status': diag.get('vital_status') } clinical['diagnoses'].append(diagnosis) return clinical