Spaces:
Running
Running
| from pathlib import Path | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Any | |
| import re | |
| from preprocessing.artifact_manager import manager | |
# Base directory: two levels above the directory that contains this file.
BASE_DIR = Path(__file__).resolve().parents[2]

# Input (raw) and output (processed) data locations under the base directory.
RAW_DIR = BASE_DIR.joinpath("data", "raw")
PROCESSED_DIR = BASE_DIR.joinpath("data", "processed")

# Length of the hashed feature vector kept for every drug pair.
TWOSIDES_HASH_DIM = 256
def _clean_cid(cid: Any) -> str:
    """Reduce a compound identifier to its bare numeric form.

    The value is converted with ``str()``, every character other than the
    ASCII digits 0-9 is dropped, and leading zeros are stripped. When nothing
    is left (no digits at all, or only zeros) the string ``'0'`` is returned.
    """
    digits_only = "".join(ch for ch in str(cid) if ch in "0123456789")
    significant = digits_only.lstrip("0")
    if not significant:
        return "0"
    return significant
def _canonical_pair_key(a: str, b: str) -> str:
    """Build an order-independent key for a pair of identifiers.

    The two identifiers are arranged in ascending string order and joined
    with ``'||'``, so ``(a, b)`` and ``(b, a)`` yield the same key.
    """
    if a <= b:
        ordered = (a, b)
    else:
        ordered = (b, a)
    return "||".join(ordered)
| import json | |
| import hashlib | |
def _hash_token(token: str, size: int) -> int:
    """Map a token to a bucket index by hashing.

    The token is UTF-8 encoded and hashed with SHA-1. The first four digest
    bytes (equivalently, the first eight hex characters) are read as an
    unsigned big-endian integer and reduced modulo ``size``.
    """
    leading_bytes = hashlib.sha1(token.encode("utf-8")).digest()[:4]
    return int.from_bytes(leading_bytes, "big") % size
def build_twosides_artifacts(force: bool = False):
    """Aggregate the Decagon polypharmacy CSV into per-drug-pair statistics.

    Reads ``ChChSe-Decagon_polypharmacy.csv.gz`` from ``RAW_DIR / 'twosides'``
    in chunks, maps the two compound-id columns to canonical drug ids and
    writes one row per unordered drug pair to
    ``PROCESSED_DIR / 'twosides_stats.parquet'`` with these columns:

    * ``pair_key``     -- both canonical ids, sorted and joined by ``'||'``
    * ``pair_count``   -- number of CSV rows seen for the pair
    * ``unique_codes`` -- distinct non-empty values of the third CSV column
    * ``unique_names`` -- distinct non-empty values of the fourth CSV column
    * ``hash_vector``  -- list of ``TWOSIDES_HASH_DIM`` floats; every
      non-empty code/name occurrence adds 1.0 to its hashed bucket

    The resulting frame is also registered with the artifact manager under
    the name ``"twosides_stats"``.

    Args:
        force: Rebuild even if the output parquet file already exists.

    Returns:
        None. Returns early without writing anything when the raw file is
        missing (a message is printed) or when the output already exists and
        ``force`` is false.
    """
    twosides_path = RAW_DIR / 'twosides' / 'ChChSe-Decagon_polypharmacy.csv.gz'
    if not twosides_path.exists():
        print(f"Skipping Twosides build: File not found at {twosides_path}")
        return

    out_path = PROCESSED_DIR / "twosides_stats.parquet"
    if not force and out_path.exists():
        return

    # Imported here rather than at module level, so the training package is
    # only loaded when a build actually runs.
    from training.canonical_drug_mapper import CanonicalDrugMapper

    mapper = CanonicalDrugMapper.from_structured_artifacts()

    # Pass 1: collect every distinct cleaned CID from the first two columns so
    # that each one is resolved through the mapper exactly once.
    unique_cids = set()
    for chunk in pd.read_csv(twosides_path, compression='gzip', chunksize=250000, usecols=[0, 1]):
        col_1, col_2 = chunk.columns[:2]
        unique_cids.update(chunk[col_1].astype(str).map(_clean_cid))
        unique_cids.update(chunk[col_2].astype(str).map(_clean_cid))

    # CIDs the mapper cannot resolve fall back to a "stitch::<cid>" id.
    cid_to_canonical = {}
    for cid in unique_cids:
        res = mapper.resolve(f"cid:{cid}", source_hint="twosides")
        cid_to_canonical[cid] = res.canonical_id if res.canonical_id else f"stitch::{cid}"

    # Pass 2: accumulate per-pair counts, distinct codes/names and the hashed
    # bag-of-tokens vector.
    stats_dict = {}
    for chunk in pd.read_csv(twosides_path, compression='gzip', chunksize=250000):
        col_1, col_2, col_3, col_4 = chunk.columns[:4]
        can_1 = chunk[col_1].astype(str).map(_clean_cid).map(lambda x: cid_to_canonical.get(x, ''))
        can_2 = chunk[col_2].astype(str).map(_clean_cid).map(lambda x: cid_to_canonical.get(x, ''))

        # Drop rows whose ids are missing from the lookup built in pass 1.
        valid = (can_1 != '') & (can_2 != '')
        filtered = chunk[valid].copy()
        filtered['pair_key'] = [
            _canonical_pair_key(a, b) for a, b in zip(can_1[valid].values, can_2[valid].values)
        ]

        for p_key, code, name in filtered[['pair_key', col_3, col_4]].itertuples(index=False, name=None):
            st = stats_dict.get(p_key)
            if st is None:
                st = stats_dict[p_key] = {
                    "pair_count": 0,
                    "codes": set(),
                    "names": set(),
                    "hash_vector": np.zeros(TWOSIDES_HASH_DIM, dtype=np.float32),
                }
            st["pair_count"] += 1

            # Codes and names get identical treatment; the "code:"/"name:"
            # prefix keeps their tokens distinct before hashing.
            for prefix, seen, raw in (("code", st["codes"], code), ("name", st["names"], name)):
                if pd.isna(raw):
                    continue
                val = str(raw).strip()
                if val:
                    seen.add(val)
                    st["hash_vector"][_hash_token(f'{prefix}:{val}', TWOSIDES_HASH_DIM)] += 1.0

    rows = [
        {
            "pair_key": pk,
            "pair_count": st["pair_count"],
            "unique_codes": len(st["codes"]),
            "unique_names": len(st["names"]),
            "hash_vector": st["hash_vector"].tolist(),
        }
        for pk, st in stats_dict.items()
    ]

    # Explicit columns keep the output schema stable even when no rows were
    # aggregated (an empty list alone would produce a column-less frame).
    output_columns = ["pair_key", "pair_count", "unique_codes", "unique_names", "hash_vector"]
    df = pd.DataFrame(rows, columns=output_columns)

    # to_parquet does not create missing directories, so make sure the
    # processed-data folder exists before writing.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(out_path, index=False)
    manager.register_artifact("twosides_stats", df, out_path)
# Script entry point: always rebuild, even when the output file already exists.
if __name__ == "__main__":
    build_twosides_artifacts(force=True)