"""Build per-pair summary statistics from the raw Twosides CSV dump.

Running this module as a script forces a rebuild of
``data/processed/twosides_stats.parquet``.
"""

import hashlib
import json
import re
from pathlib import Path
from typing import Any

import numpy as np
import pandas as pd

from preprocessing.artifact_manager import manager

BASE_DIR = Path(__file__).resolve().parents[2]
RAW_DIR = BASE_DIR / "data" / "raw"
PROCESSED_DIR = BASE_DIR / "data" / "processed"
TWOSIDES_HASH_DIM = 256

# Rows loaded per CSV chunk; the file is streamed rather than read whole.
_CHUNK_ROWS = 250000

# Output schema, pinned so that an empty input still yields a well-formed table.
_STATS_COLUMNS = [
    "pair_key",
    "pair_count",
    "unique_codes",
    "unique_names",
    "hash_vector",
]

# Compiled once: _clean_cid is applied to every identifier in the file.
_NON_DIGITS = re.compile(r'[^0-9]')


def _clean_cid(cid: Any) -> str:
    """Reduce *cid* to its digits with leading zeros removed.

    The value is stringified, every non-digit character is dropped and
    leading zeros are stripped. If nothing remains, ``'0'`` is returned.
    """
    cleaned = _NON_DIGITS.sub('', str(cid)).lstrip('0')
    return cleaned if cleaned else '0'


def _canonical_pair_key(a: str, b: str) -> str:
    """Return an order-independent key for the pair ``(a, b)``.

    The two identifiers are sorted and joined with ``'||'``, so
    ``(a, b)`` and ``(b, a)`` produce the same key.
    """
    return '||'.join(sorted([a, b]))


def _hash_token(token: str, size: int) -> int:
    """Map *token* to a bucket index in ``range(size)``.

    Uses the first 8 hex digits of the token's SHA-1 digest, so the
    index is stable across processes and runs.
    """
    digest = hashlib.sha1(token.encode('utf-8')).hexdigest()
    return int(digest[:8], 16) % size


def _record_term(stats: dict, bucket: str, prefix: str, raw: Any) -> None:
    """Add one code/name value to a pair's statistics entry.

    Missing (NaN/None) values and values that are blank after stripping
    are ignored. Otherwise the stripped text is added to ``stats[bucket]``
    and the hash-vector slot for ``"<prefix>:<text>"`` is incremented.
    """
    if pd.isna(raw):
        return
    text = str(raw).strip()
    if not text:
        return
    stats[bucket].add(text)
    slot = _hash_token(f'{prefix}:{text}', TWOSIDES_HASH_DIM)
    stats["hash_vector"][slot] += 1.0


def build_twosides_artifacts(force: bool = False) -> None:
    """Aggregate the Twosides CSV into per-pair statistics and save them.

    The CSV is read by column position: the first two columns are treated
    as drug identifiers and the third and fourth as a code and a name
    (NOTE(review): presumably the side-effect code/name -- confirm against
    the raw file). Each identifier is cleaned with ``_clean_cid`` and
    resolved through ``CanonicalDrugMapper``; identifiers the mapper cannot
    resolve fall back to ``"stitch::<cid>"``.

    For every unordered pair the output holds the row count, the number of
    distinct codes and names, and a ``TWOSIDES_HASH_DIM``-long hashed count
    vector over ``code:``/``name:`` tokens. The table is written to
    ``PROCESSED_DIR / "twosides_stats.parquet"`` and registered with the
    artifact manager under ``"twosides_stats"``.

    Args:
        force: Rebuild even if the output parquet file already exists.

    Returns:
        None. If the raw file is missing a message is printed and nothing
        is built; if the output exists and *force* is false the call is a
        no-op.
    """
    twosides_path = RAW_DIR / 'twosides' / 'ChChSe-Decagon_polypharmacy.csv.gz'
    if not twosides_path.exists():
        print(f"Skipping Twosides build: File not found at {twosides_path}")
        return

    out_path = PROCESSED_DIR / "twosides_stats.parquet"
    if not force and out_path.exists():
        return

    # Imported only when a build actually runs.
    # NOTE(review): presumably kept local to avoid an import cycle or a
    # heavy import at module load -- confirm.
    from training.canonical_drug_mapper import CanonicalDrugMapper

    mapper = CanonicalDrugMapper.from_structured_artifacts()

    # Pass 1: collect distinct cleaned identifiers so that each one is
    # resolved through the mapper once rather than once per row.
    unique_cids = set()
    id_chunks = pd.read_csv(
        twosides_path,
        compression='gzip',
        chunksize=_CHUNK_ROWS,
        usecols=[0, 1],
    )
    for chunk in id_chunks:
        col_1, col_2 = chunk.columns[:2]
        unique_cids.update(chunk[col_1].astype(str).map(_clean_cid))
        unique_cids.update(chunk[col_2].astype(str).map(_clean_cid))

    cid_to_canonical = {}
    for cid in unique_cids:
        res = mapper.resolve(f"cid:{cid}", source_hint="twosides")
        # Unresolved identifiers keep a namespaced placeholder.
        cid_to_canonical[cid] = res.canonical_id or f"stitch::{cid}"

    # Pass 2: accumulate per-pair statistics row by row.
    stats_dict = {}
    for chunk in pd.read_csv(twosides_path, compression='gzip', chunksize=_CHUNK_ROWS):
        col_1, col_2, col_3, col_4 = chunk.columns[:4]
        can_1 = chunk[col_1].astype(str).map(_clean_cid).map(
            lambda x: cid_to_canonical.get(x, '')
        )
        can_2 = chunk[col_2].astype(str).map(_clean_cid).map(
            lambda x: cid_to_canonical.get(x, '')
        )
        valid = (can_1 != '') & (can_2 != '')

        filtered = chunk[valid].copy()
        filtered['pair_key'] = [
            _canonical_pair_key(a, b)
            for a, b in zip(can_1[valid].values, can_2[valid].values)
        ]

        triples = filtered[['pair_key', col_3, col_4]].itertuples(index=False, name=None)
        for p_key, code, name in triples:
            st = stats_dict.get(p_key)
            if st is None:
                st = {
                    "pair_count": 0,
                    "codes": set(),
                    "names": set(),
                    "hash_vector": np.zeros(TWOSIDES_HASH_DIM, dtype=np.float32),
                }
                stats_dict[p_key] = st
            st["pair_count"] += 1
            _record_term(st, "codes", "code", code)
            _record_term(st, "names", "name", name)

    rows = [
        {
            "pair_key": pk,
            "pair_count": st["pair_count"],
            "unique_codes": len(st["codes"]),
            "unique_names": len(st["names"]),
            "hash_vector": st["hash_vector"].tolist(),
        }
        for pk, st in stats_dict.items()
    ]

    # Explicit columns keep the schema intact when no rows were produced.
    df = pd.DataFrame(rows, columns=_STATS_COLUMNS)

    # The processed directory may not exist on a fresh checkout.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(out_path, index=False)
    manager.register_artifact("twosides_stats", df, out_path)


if __name__ == "__main__":
    build_twosides_artifacts(True)