Spaces:
Runtime error
Runtime error
| import requests | |
| import json | |
| import os | |
| import argparse | |
| import pandas as pd | |
| from tqdm import tqdm | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
def get_metadata_from_rcsb(pdb, timeout=30):
    """Fetch the RCSB GraphQL metadata record for a single PDB entry.

    The GraphQL query text is read from ``download/rcsb_query_template.txt``
    (resolved against the current working directory) and posted to
    ``https://data.rcsb.org/graphql`` with ``pdb`` bound to the ``id``
    query variable.

    Args:
        pdb: Entry identifier sent as the ``id`` query variable.
        timeout: Seconds handed to ``requests.post`` as its ``timeout`` so a
            stalled connection cannot block the calling thread forever.

    Returns:
        A ``(result, message)`` tuple. On success ``result`` is the decoded
        JSON response and ``message`` is ``"<pdb> successfully downloaded"``.
        On any failure (transport error, non-200 status, undecodable body,
        missing or empty ``data.entry``) ``result`` is ``None`` and
        ``message`` is ``"<pdb> failed to download"``.

    Raises:
        OSError: If the query template file cannot be opened or read.
    """
    template_file_path = "download/rcsb_query_template.txt"
    with open(template_file_path, 'r') as file:
        query_template = file.read()

    # Callers detect failure through ``result is None`` and the word "failed"
    # in the message, so every failure path returns this exact pair.
    failure = (None, f"{pdb} failed to download")

    url = "https://data.rcsb.org/graphql"
    payload = {'query': query_template, 'variables': {"id": pdb}}
    try:
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.RequestException:
        # Connection errors and timeouts are reported like any other failed
        # download instead of propagating into the caller.
        return failure

    if response.status_code != 200:
        return failure

    try:
        result = response.json()
    except ValueError:
        # A 200 response whose body is not valid JSON.
        return failure

    # A GraphQL error response may omit "data" or set it to null; treat that
    # the same as an unknown entry rather than raising KeyError/TypeError.
    data = result.get("data") if isinstance(result, dict) else None
    entry = data.get("entry") if isinstance(data, dict) else None
    if not entry:
        return failure

    return result, f"{pdb} successfully downloaded"
def download_single_pdb(pdb_id, out_dir):
    """Download the metadata of one PDB entry into ``<out_dir>/<pdb_id>.json``.

    Args:
        pdb_id: Entry identifier; also used as the output file's base name.
        out_dir: Destination directory, created if it does not exist.

    Returns:
        A status message: ``"Skipping <pdb_id>, already exists"`` when the
        output file is already present, otherwise the message produced by
        ``get_metadata_from_rcsb`` (which contains ``"failed"`` when nothing
        was written).
    """
    os.makedirs(out_dir, exist_ok=True)
    output_file = os.path.join(out_dir, f"{pdb_id}.json")
    if os.path.exists(output_file):
        return f"Skipping {pdb_id}, already exists"

    result, message = get_metadata_from_rcsb(pdb_id)
    if result is None:
        return message

    # An existing output file is treated as "already downloaded", so the final
    # path must never hold a half-written document. Write next to it and
    # rename into place only once the dump has completed.
    tmp_file = f"{output_file}.tmp"
    try:
        with open(tmp_file, 'w') as f:
            json.dump(result, f)
        os.replace(tmp_file, output_file)
    finally:
        # After a successful replace the temp file no longer exists; if the
        # write was interrupted, remove the partial file.
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
    return message
def _read_pdb_ids(pdb_id_file):
    """Return the PDB IDs listed one per line in ``pdb_id_file``.

    Surrounding whitespace is stripped, blank lines are dropped and repeated
    IDs are collapsed (first occurrence kept) so no empty or duplicate
    download is submitted.
    """
    with open(pdb_id_file, 'r') as handle:
        stripped = (line.strip() for line in handle)
        # dict.fromkeys de-duplicates while preserving the file's order.
        return list(dict.fromkeys(pdb_id for pdb_id in stripped if pdb_id))


def _download_many(pdb_ids, out_dir, num_workers):
    """Download metadata for ``pdb_ids`` on a thread pool.

    Returns ``(failed_ids, failure_messages)`` as two parallel lists.
    """
    failed_ids = []
    failure_messages = []
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        future_to_pdb = {
            executor.submit(download_single_pdb, pdb_id, out_dir): pdb_id
            for pdb_id in pdb_ids
        }
        with tqdm(total=len(pdb_ids), desc="Downloading PDB Metadata") as bar:
            for future in as_completed(future_to_pdb):
                pdb_id = future_to_pdb[future]
                try:
                    message = future.result()
                except Exception as exc:
                    # Top-level boundary: one broken download must not abort
                    # the batch or lose the failures collected so far.
                    message = f"{pdb_id} failed to download: {exc}"
                bar.set_description(message)
                if "failed" in message:
                    failed_ids.append(pdb_id)
                    failure_messages.append(message)
                bar.update(1)
    return failed_ids, failure_messages


def _write_error_report(error_file, proteins, messages):
    """Write the failed IDs and their messages to ``error_file`` as CSV."""
    error_file_dir = os.path.dirname(error_file)
    # dirname is "" for a bare filename and os.makedirs("") raises, so only
    # create the directory when the path actually names one.
    if error_file_dir:
        os.makedirs(error_file_dir, exist_ok=True)
    error_dict = {"protein": proteins, "error": messages}
    pd.DataFrame(error_dict).to_csv(error_file, index=False)


def main():
    """Parse the command line and download RCSB metadata.

    With ``--pdb_id_file`` every ID in the file is downloaded in parallel;
    otherwise the single ``--pdb_id`` is downloaded. When ``--error_file`` is
    given and at least one download failed, the failures are written there
    as a CSV with ``protein`` and ``error`` columns.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--pdb_id_file", type=str, default=None)
    parser.add_argument("--pdb_id", type=str, default=None)
    parser.add_argument("--error_file", type=str, default=None)
    parser.add_argument("--out_dir", type=str, required=True)
    parser.add_argument("--num_workers", type=int, default=12)
    args = parser.parse_args()

    if not args.pdb_id and not args.pdb_id_file:
        print("Error: Must provide either pdb_id or pdb_id_file")
        # Same exit status as before, without relying on the site-provided
        # ``exit`` helper.
        raise SystemExit(1)

    os.makedirs(args.out_dir, exist_ok=True)

    error_proteins = []
    error_messages = []

    if args.pdb_id_file:
        pdbs = _read_pdb_ids(args.pdb_id_file)
        error_proteins, error_messages = _download_many(
            pdbs, args.out_dir, args.num_workers
        )
    elif args.pdb_id:
        message = download_single_pdb(args.pdb_id, args.out_dir)
        print(message)
        if "failed" in message:
            error_proteins.append(args.pdb_id)
            error_messages.append(message)

    if error_proteins and args.error_file:
        _write_error_report(args.error_file, error_proteins, error_messages)


if __name__ == "__main__":
    main()