# NOTE(review): removed three web-upload artifact lines ("Mentors4EDU's picture",
# "Upload 33 files", commit hash) that were not Python and broke parsing.
"""
GDC Data Portal Client
Download and parse cancer genomics data from GDC
"""
import os
import json
import requests
from typing import Dict, List, Optional, Any
from pathlib import Path
import yaml
from dataclasses import dataclass
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class GDCFile:
    """A single file record returned by the GDC Portal ``/files`` endpoint.

    Fields mirror the GDC API response fields requested by
    ``GDCClient.search_files``.
    """
    file_id: str
    file_name: str
    file_size: int
    data_type: str
    data_format: str
    experimental_strategy: str
    # A file may have no associated case in the API response; the client
    # passes None in that situation, so these must be Optional (the
    # original `str` annotations were inaccurate).
    case_id: Optional[str]
    project_id: Optional[str]
class GDCClient:
    """Client for interacting with the GDC Data Portal REST API."""

    def __init__(self, config_path: str = "config.yml"):
        """Load settings and prepare an HTTP session.

        Args:
            config_path: YAML file with a top-level ``gdc`` mapping that
                must contain ``api_url`` and ``download_dir``.

        Raises:
            KeyError / OSError: if the config file or required keys are missing.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['gdc']
        self.api_url = self.config['api_url']
        self.download_dir = Path(self.config['download_dir'])
        self.download_dir.mkdir(parents=True, exist_ok=True)
        # One shared session gives connection pooling across all calls.
        self.session = requests.Session()
        self.session.headers.update({
            'Content-Type': 'application/json'
        })

    def search_files(
        self,
        filters: Optional[Dict] = None,
        size: int = 100,
        fields: Optional[List[str]] = None
    ) -> List[GDCFile]:
        """
        Search for files in GDC.

        Args:
            filters: GDC filter query (serialized to JSON for the API)
            size: Number of results to return
            fields: Fields to include in response (defaults to the fields
                needed to populate :class:`GDCFile`)

        Returns:
            List of GDCFile records; empty list on any request error.
        """
        endpoint = f"{self.api_url}/files"
        if fields is None:
            fields = [
                'file_id', 'file_name', 'file_size', 'data_type',
                'data_format', 'experimental_strategy', 'cases.case_id',
                'cases.project.project_id'
            ]
        params = {
            'size': size,
            'fields': ','.join(fields)
        }
        if filters:
            params['filters'] = json.dumps(filters)
        try:
            response = self.session.get(endpoint, params=params)
            response.raise_for_status()
            data = response.json()
            files = []
            for hit in data.get('data', {}).get('hits', []):
                # A file may be linked to zero cases; use the first one
                # when present, otherwise leave case/project as None.
                first_case = hit['cases'][0] if hit.get('cases') else {}
                gdc_file = GDCFile(
                    file_id=hit.get('file_id'),
                    file_name=hit.get('file_name'),
                    file_size=hit.get('file_size', 0),
                    data_type=hit.get('data_type'),
                    data_format=hit.get('data_format'),
                    experimental_strategy=hit.get('experimental_strategy'),
                    case_id=first_case.get('case_id'),
                    project_id=first_case.get('project', {}).get('project_id')
                )
                files.append(gdc_file)
            logger.info(f"Found {len(files)} files")
            return files
        except Exception as e:
            # Best-effort API: log and return empty rather than propagate.
            logger.error(f"Error searching files: {e}")
            return []

    @staticmethod
    def _safe_filename(content_disposition: str, file_id: str) -> str:
        """Derive a safe local filename from a Content-Disposition header.

        The header value is untrusted input: strip any parameters after the
        filename (e.g. ``; size=...``) and any path components, so a value
        like ``filename=../../etc/x`` cannot escape the output directory.
        Falls back to the file UUID when no usable name is present.
        """
        if 'filename=' in content_disposition:
            raw = content_disposition.split('filename=')[1]
            raw = raw.split(';')[0].strip().strip('"')
            # Normalize backslashes so basename also strips Windows-style paths.
            safe = os.path.basename(raw.replace('\\', '/'))
            if safe:
                return safe
        return file_id

    def download_file(
        self,
        file_id: str,
        output_dir: Optional[Path] = None
    ) -> Optional[Path]:
        """
        Download a file from GDC.

        Args:
            file_id: GDC file UUID
            output_dir: Directory to save file (defaults to config download_dir)

        Returns:
            Path to downloaded file or None if failed
        """
        if output_dir is None:
            output_dir = self.download_dir
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        endpoint = f"{self.api_url}/data/{file_id}"
        try:
            logger.info(f"Downloading file {file_id}")
            # Context manager guarantees the streamed connection is released
            # even if writing to disk fails mid-transfer.
            with self.session.get(endpoint, stream=True) as response:
                response.raise_for_status()
                filename = self._safe_filename(
                    response.headers.get('content-disposition', ''), file_id
                )
                output_path = output_dir / filename
                with open(output_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:  # skip keep-alive empty chunks
                            f.write(chunk)
            logger.info(f"Downloaded to {output_path}")
            return output_path
        except Exception as e:
            logger.error(f"Error downloading file {file_id}: {e}")
            return None

    def get_project_files(
        self,
        project_id: str,
        data_type: Optional[str] = None,
        limit: int = 100
    ) -> List[GDCFile]:
        """
        Get files for a specific project.

        Args:
            project_id: GDC project ID (e.g., TCGA-BRCA)
            data_type: Filter by data type
            limit: Maximum number of files
        """
        filters = {
            "op": "and",
            "content": [
                {
                    "op": "in",
                    "content": {
                        "field": "cases.project.project_id",
                        "value": [project_id]
                    }
                }
            ]
        }
        if data_type:
            filters["content"].append({
                "op": "in",
                "content": {
                    "field": "data_type",
                    "value": [data_type]
                }
            })
        return self.search_files(filters=filters, size=limit)

    def get_mutation_data(self, project_id: str, limit: int = 100) -> List[GDCFile]:
        """Get mutation/variant calling files for a project"""
        return self.get_project_files(
            project_id=project_id,
            data_type="Simple Nucleotide Variation",
            limit=limit
        )

    def get_gene_expression_data(self, project_id: str, limit: int = 100) -> List[GDCFile]:
        """Get gene expression data for a project"""
        return self.get_project_files(
            project_id=project_id,
            data_type="Gene Expression Quantification",
            limit=limit
        )

    def search_cases(
        self,
        project_id: str,
        filters: Optional[Dict] = None,
        size: int = 100
    ) -> List[Dict]:
        """
        Search for cases (patients) in GDC.

        Args:
            project_id: GDC project ID
            filters: Additional filter criteria (AND-ed with the project filter)
            size: Number of results

        Returns:
            List of raw case dicts from the API; empty list on error.
        """
        endpoint = f"{self.api_url}/cases"
        base_filters = {
            "op": "in",
            "content": {
                "field": "project.project_id",
                "value": [project_id]
            }
        }
        if filters:
            filter_query = {
                "op": "and",
                "content": [base_filters, filters]
            }
        else:
            filter_query = base_filters
        params = {
            'size': size,
            'filters': json.dumps(filter_query),
            'fields': 'case_id,project.project_id,demographic,diagnoses'
        }
        try:
            response = self.session.get(endpoint, params=params)
            response.raise_for_status()
            data = response.json()
            cases = data.get('data', {}).get('hits', [])
            logger.info(f"Found {len(cases)} cases")
            return cases
        except Exception as e:
            logger.error(f"Error searching cases: {e}")
            return []
class GDCDataParser:
    """Parse downloaded GDC data files."""

    @staticmethod
    def parse_maf(file_path: Path) -> List[Dict]:
        """
        Parse MAF (Mutation Annotation Format) file.

        Lines starting with '#' are treated as comments; the first
        non-comment line is the tab-separated header. Rows whose column
        count does not match the header are skipped.

        Returns list of mutation records (dicts keyed by header column);
        empty list on error or if the file has no header line.
        """
        mutations = []
        try:
            with open(file_path, 'r') as f:
                # Find the header: first non-comment line. If the file
                # contains only comments, there is nothing to parse
                # (the original code raised NameError here).
                header_line = None
                for line in f:
                    if not line.startswith('#'):
                        header_line = line.strip()
                        break
                if header_line is None:
                    logger.info(f"Parsed 0 mutations from {file_path}")
                    return []
                headers = header_line.split('\t')
                for line in f:
                    if line.startswith('#'):
                        continue
                    values = line.strip().split('\t')
                    if len(values) == len(headers):
                        mutations.append(dict(zip(headers, values)))
            logger.info(f"Parsed {len(mutations)} mutations from {file_path}")
            return mutations
        except Exception as e:
            logger.error(f"Error parsing MAF file: {e}")
            return []

    @staticmethod
    def parse_vcf(file_path: Path) -> List[Dict]:
        """
        Parse VCF (Variant Call Format) file.

        Skips '##' meta lines, the '#CHROM' header, blank lines, and
        malformed rows with fewer than 8 columns (the original code crashed
        on such rows and discarded everything already parsed).

        Returns list of variant records.
        """
        variants = []
        try:
            with open(file_path, 'r') as f:
                for line in f:
                    if line.startswith('#'):
                        # Covers both '##' meta lines and the '#CHROM' header.
                        continue
                    values = line.strip().split('\t')
                    # A VCF data row has 8 mandatory fields; skip short/blank lines.
                    if len(values) < 8 or not values[0]:
                        continue
                    variants.append({
                        'chrom': values[0],
                        'pos': values[1],
                        'id': values[2],
                        'ref': values[3],
                        'alt': values[4],
                        'qual': values[5],
                        'filter': values[6],
                        'info': values[7]
                    })
            logger.info(f"Parsed {len(variants)} variants from {file_path}")
            return variants
        except Exception as e:
            logger.error(f"Error parsing VCF file: {e}")
            return []

    @staticmethod
    def parse_clinical_data(data: Dict) -> Dict:
        """Parse clinical data from a GDC case dict.

        Extracts case/project identifiers, selected demographic fields,
        and one record per diagnosis. Missing keys become None / empty.
        """
        clinical = {
            'case_id': data.get('case_id'),
            'project_id': data.get('project', {}).get('project_id'),
            'demographic': {},
            'diagnoses': []
        }
        # Parse demographic data
        demo = data.get('demographic', {})
        clinical['demographic'] = {
            'age_at_index': demo.get('age_at_index'),
            'gender': demo.get('gender'),
            'race': demo.get('race'),
            'ethnicity': demo.get('ethnicity')
        }
        # Parse diagnosis data
        for diag in data.get('diagnoses', []):
            clinical['diagnoses'].append({
                'diagnosis_id': diag.get('diagnosis_id'),
                'primary_diagnosis': diag.get('primary_diagnosis'),
                'tumor_stage': diag.get('tumor_stage'),
                'age_at_diagnosis': diag.get('age_at_diagnosis'),
                'vital_status': diag.get('vital_status')
            })
        return clinical