|
|
"""
|
|
|
GDC Data Portal Client
|
|
|
Download and parse cancer genomics data from GDC
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import json
|
|
|
import requests
|
|
|
from typing import Dict, List, Optional, Any
|
|
|
from pathlib import Path
|
|
|
import yaml
|
|
|
from dataclasses import dataclass
|
|
|
import logging
|
|
|
|
|
|
# Configure root logging once at import time so INFO-level messages are visible.
logging.basicConfig(level=logging.INFO)

# Module-level logger shared by every class in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
@dataclass
class GDCFile:
    """Represents a file from GDC Portal"""

    file_id: str                # GDC file UUID
    file_name: str              # File name as reported by the portal
    file_size: int              # Size in bytes (0 when not reported by the API)
    data_type: str              # e.g. "Gene Expression Quantification", "Simple Nucleotide Variation"
    data_format: str            # Format label from the API response
    experimental_strategy: str  # Assay/sequencing strategy label from the API response
    # search_files() fills the two fields below from the FIRST associated case
    # and passes None when a file has no cases, hence Optional.
    case_id: Optional[str]      # case_id of the first associated case, or None
    project_id: Optional[str]   # Project of the first case (e.g. "TCGA-BRCA"), or None
|
|
|
|
|
|
|
|
|
class GDCClient:
    """Client for interacting with GDC Data Portal API"""

    #: Fallback timeout (seconds) for every HTTP request. Without an explicit
    #: timeout, `requests` can block forever on a stalled connection.
    DEFAULT_TIMEOUT = 60

    def __init__(self, config_path: str = "config.yml"):
        """
        Initialize the client from a YAML configuration file.

        Args:
            config_path: Path to a YAML file whose top-level 'gdc' mapping
                provides 'api_url', 'download_dir' and (optionally) 'timeout'.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['gdc']

        self.api_url = self.config['api_url']
        self.download_dir = Path(self.config['download_dir'])
        self.download_dir.mkdir(parents=True, exist_ok=True)

        # Optional per-deployment override of the request timeout.
        self.timeout = self.config.get('timeout', self.DEFAULT_TIMEOUT)

        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json'
        })

    @staticmethod
    def _hit_to_file(hit: Dict[str, Any]) -> GDCFile:
        """Convert one raw /files API hit into a GDCFile record."""
        cases = hit.get('cases') or []
        first_case = cases[0] if cases else {}
        return GDCFile(
            file_id=hit.get('file_id'),
            file_name=hit.get('file_name'),
            file_size=hit.get('file_size', 0),
            data_type=hit.get('data_type'),
            data_format=hit.get('data_format'),
            experimental_strategy=hit.get('experimental_strategy'),
            # A file may belong to several cases; only the first is reported.
            case_id=first_case.get('case_id') if cases else None,
            project_id=first_case.get('project', {}).get('project_id') if cases else None,
        )

    def search_files(
        self,
        filters: Optional[Dict] = None,
        size: int = 100,
        fields: Optional[List[str]] = None
    ) -> List[GDCFile]:
        """
        Search for files in GDC

        Args:
            filters: GDC filter query (serialized to JSON for the API)
            size: Number of results to return
            fields: Fields to include in response

        Returns:
            List of GDCFile records; empty list on any request/parse error.
        """
        endpoint = f"{self.api_url}/files"

        if fields is None:
            fields = [
                'file_id', 'file_name', 'file_size', 'data_type',
                'data_format', 'experimental_strategy', 'cases.case_id',
                'cases.project.project_id'
            ]

        params = {
            'size': size,
            'fields': ','.join(fields)
        }

        if filters:
            # The GDC API expects the filter tree as a JSON-encoded string.
            params['filters'] = json.dumps(filters)

        try:
            response = self.session.get(endpoint, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            files = [
                self._hit_to_file(hit)
                for hit in data.get('data', {}).get('hits', [])
            ]

            logger.info(f"Found {len(files)} files")
            return files

        except Exception as e:
            # Best-effort API: log and return an empty result rather than raise.
            logger.error(f"Error searching files: {e}")
            return []

    def download_file(
        self,
        file_id: str,
        output_dir: Optional[Path] = None
    ) -> Optional[Path]:
        """
        Download a file from GDC

        Args:
            file_id: GDC file UUID
            output_dir: Directory to save file (defaults to config download_dir)

        Returns:
            Path to downloaded file or None if failed
        """
        if output_dir is None:
            output_dir = self.download_dir

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        endpoint = f"{self.api_url}/data/{file_id}"

        try:
            logger.info(f"Downloading file {file_id}")
            # Stream inside a context manager so the connection is released
            # even if writing to disk fails part-way through.
            with self.session.get(endpoint, stream=True, timeout=self.timeout) as response:
                response.raise_for_status()

                # Prefer the server-supplied name; fall back to the UUID.
                content_disposition = response.headers.get('content-disposition', '')
                if 'filename=' in content_disposition:
                    # Drop trailing header parameters (e.g. "; charset=...") and
                    # surrounding quotes, then strip any directory components so
                    # a hostile header cannot write outside output_dir.
                    raw_name = content_disposition.split('filename=')[1].split(';')[0].strip().strip('"')
                    filename = os.path.basename(raw_name) or file_id
                else:
                    filename = file_id

                output_path = output_dir / filename

                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # skip keep-alive chunks
                            f.write(chunk)

            logger.info(f"Downloaded to {output_path}")
            return output_path

        except Exception as e:
            logger.error(f"Error downloading file {file_id}: {e}")
            return None

    def get_project_files(
        self,
        project_id: str,
        data_type: Optional[str] = None,
        limit: int = 100
    ) -> List[GDCFile]:
        """
        Get files for a specific project

        Args:
            project_id: GDC project ID (e.g., TCGA-BRCA)
            data_type: Filter by data type
            limit: Maximum number of files
        """
        filters = {
            "op": "and",
            "content": [
                {
                    "op": "in",
                    "content": {
                        "field": "cases.project.project_id",
                        "value": [project_id]
                    }
                }
            ]
        }

        if data_type:
            filters["content"].append({
                "op": "in",
                "content": {
                    "field": "data_type",
                    "value": [data_type]
                }
            })

        return self.search_files(filters=filters, size=limit)

    def get_mutation_data(self, project_id: str, limit: int = 100) -> List[GDCFile]:
        """Get mutation/variant calling files for a project"""
        return self.get_project_files(
            project_id=project_id,
            data_type="Simple Nucleotide Variation",
            limit=limit
        )

    def get_gene_expression_data(self, project_id: str, limit: int = 100) -> List[GDCFile]:
        """Get gene expression data for a project"""
        return self.get_project_files(
            project_id=project_id,
            data_type="Gene Expression Quantification",
            limit=limit
        )

    def search_cases(
        self,
        project_id: str,
        filters: Optional[Dict] = None,
        size: int = 100
    ) -> List[Dict]:
        """
        Search for cases (patients) in GDC

        Args:
            project_id: GDC project ID
            filters: Additional filter criteria (AND-ed with the project filter)
            size: Number of results

        Returns:
            List of raw case dicts from the API; empty list on error.
        """
        endpoint = f"{self.api_url}/cases"

        base_filters = {
            "op": "in",
            "content": {
                "field": "project.project_id",
                "value": [project_id]
            }
        }

        if filters:
            filter_query = {
                "op": "and",
                "content": [base_filters, filters]
            }
        else:
            filter_query = base_filters

        params = {
            'size': size,
            'filters': json.dumps(filter_query),
            'fields': 'case_id,project.project_id,demographic,diagnoses'
        }

        try:
            response = self.session.get(endpoint, params=params, timeout=self.timeout)
            response.raise_for_status()
            data = response.json()

            cases = data.get('data', {}).get('hits', [])
            logger.info(f"Found {len(cases)} cases")
            return cases

        except Exception as e:
            logger.error(f"Error searching cases: {e}")
            return []
|
|
|
|
|
|
|
|
|
class GDCDataParser:
    """Parse downloaded GDC data files"""

    @staticmethod
    def parse_maf(file_path: Path) -> List[Dict]:
        """
        Parse MAF (Mutation Annotation Format) file

        The header is the first non-comment ('#'-prefixed) line; each
        subsequent data row becomes a dict keyed by the header columns.

        Returns list of mutation records; empty list on error or when the
        file contains no header line.
        """
        mutations = []

        try:
            with open(file_path, 'r') as f:
                # Locate the header: first line that is not a '#' comment.
                header_line = None
                for line in f:
                    if not line.startswith('#'):
                        header_line = line.strip()
                        break

                if header_line is None:
                    # File was empty or contained only comments; previously
                    # this raised an unbound-variable NameError.
                    logger.error(f"Error parsing MAF file: no header line in {file_path}")
                    return []

                headers = header_line.split('\t')

                for line in f:
                    # Skip interleaved comments and blank lines.
                    if line.startswith('#') or not line.strip():
                        continue

                    values = line.strip().split('\t')
                    # Only keep rows whose column count matches the header,
                    # so values never mis-align with column names.
                    if len(values) == len(headers):
                        mutations.append(dict(zip(headers, values)))

            logger.info(f"Parsed {len(mutations)} mutations from {file_path}")
            return mutations

        except Exception as e:
            logger.error(f"Error parsing MAF file: {e}")
            return []

    @staticmethod
    def parse_vcf(file_path: Path) -> List[Dict]:
        """
        Parse VCF (Variant Call Format) file

        Returns list of variant records containing the eight mandatory VCF
        columns (CHROM..INFO); empty list on error.
        """
        variants = []

        try:
            with open(file_path, 'r') as f:
                for line in f:
                    # '##' meta-information lines and the '#CHROM' column
                    # header carry no variant data.
                    if line.startswith('#'):
                        continue

                    values = line.strip().split('\t')
                    # Guard against blank or truncated lines: an IndexError
                    # here would previously be caught by the outer except and
                    # discard every variant already parsed.
                    if len(values) < 8:
                        continue

                    variants.append({
                        'chrom': values[0],
                        'pos': values[1],
                        'id': values[2],
                        'ref': values[3],
                        'alt': values[4],
                        'qual': values[5],
                        'filter': values[6],
                        'info': values[7]
                    })

            logger.info(f"Parsed {len(variants)} variants from {file_path}")
            return variants

        except Exception as e:
            logger.error(f"Error parsing VCF file: {e}")
            return []

    @staticmethod
    def parse_clinical_data(data: Dict) -> Dict:
        """
        Parse clinical data from GDC case

        Flattens a raw case dict (as returned by GDCClient.search_cases)
        into a stable shape with 'case_id', 'project_id', 'demographic'
        and 'diagnoses' keys; missing source fields become None/empty.
        """
        clinical = {
            'case_id': data.get('case_id'),
            'project_id': data.get('project', {}).get('project_id'),
            'demographic': {},
            'diagnoses': []
        }

        # Demographic sub-record: pick the fields downstream code relies on.
        demo = data.get('demographic', {})
        clinical['demographic'] = {
            'age_at_index': demo.get('age_at_index'),
            'gender': demo.get('gender'),
            'race': demo.get('race'),
            'ethnicity': demo.get('ethnicity')
        }

        # A case may carry multiple diagnoses; keep them all.
        for diag in data.get('diagnoses', []):
            clinical['diagnoses'].append({
                'diagnosis_id': diag.get('diagnosis_id'),
                'primary_diagnosis': diag.get('primary_diagnosis'),
                'tumor_stage': diag.get('tumor_stage'),
                'age_at_diagnosis': diag.get('age_at_diagnosis'),
                'vital_status': diag.get('vital_status')
            })

        return clinical
|
|
|
|