ddi / src /preprocessing /twosides_builder.py
github-actions[bot]
Deploy from GitHub Actions (fb28c05c54cf19184fc3f14f1bf3297ba5749ea2)
d29b763
from pathlib import Path
import pandas as pd
import numpy as np
from typing import Any
import re
from preprocessing.artifact_manager import manager
# Third ancestor directory of this file's resolved path
# (presumably the project root -- confirm against the repository layout).
BASE_DIR = Path(__file__).resolve().parents[2]
# Directory the raw input datasets are read from.
RAW_DIR = BASE_DIR / "data" / "raw"
# Directory the generated artifacts are written to.
PROCESSED_DIR = BASE_DIR / "data" / "processed"
# Number of buckets in the per-pair hashed feature vector.
TWOSIDES_HASH_DIM = 256
def _clean_cid(cid: Any) -> str:
cleaned = re.sub(r'[^0-9]', '', str(cid)).lstrip('0')
return cleaned if cleaned else '0'
def _canonical_pair_key(a: str, b: str) -> str:
return '||'.join(sorted([a, b]))
import json
import hashlib
def _hash_token(token: str, size: int) -> int:
digest = hashlib.sha1(token.encode('utf-8')).hexdigest()
return int(digest[:8], 16) % size
def _collect_twosides_cids(twosides_path, chunksize=250000):
    """Return the set of cleaned CIDs found in the first two CSV columns.

    Only the first two columns are loaded and the file is streamed in chunks,
    so the whole CSV never has to be held in memory at once.
    """
    unique_cids = set()
    reader = pd.read_csv(
        twosides_path, compression='gzip', chunksize=chunksize, usecols=[0, 1]
    )
    for chunk in reader:
        for column in chunk.columns[:2]:
            unique_cids.update(chunk[column].astype(str).map(_clean_cid))
    return unique_cids


def _resolve_twosides_cids(mapper, cids):
    """Map each cleaned CID to a canonical ID, or a ``stitch::`` fallback.

    ``mapper.resolve`` is called once per distinct CID; when it yields no
    canonical ID the CID is kept under the synthetic ``stitch::<cid>`` name,
    so every input CID ends up with a non-empty mapping.
    """
    cid_to_canonical = {}
    for cid in cids:
        res = mapper.resolve(f"cid:{cid}", source_hint="twosides")
        cid_to_canonical[cid] = res.canonical_id or f"stitch::{cid}"
    return cid_to_canonical


def _aggregate_twosides_pairs(twosides_path, cid_to_canonical, chunksize=250000):
    """Accumulate per-pair statistics over every row of the CSV.

    Returns a dict keyed by canonical pair key.  Each value holds the number
    of rows seen for the pair (``pair_count``), the distinct non-blank values
    of the third and fourth columns (``codes`` / ``names``), and a
    ``TWOSIDES_HASH_DIM``-long count vector of hashed ``code:``/``name:``
    tokens (``hash_vector``).
    """
    stats_dict = {}
    for chunk in pd.read_csv(twosides_path, compression='gzip', chunksize=chunksize):
        # Columns are addressed by position: two identifier columns followed
        # by a code column and a name column.
        col_1, col_2, col_3, col_4 = chunk.columns[:4]
        can_1 = chunk[col_1].astype(str).map(_clean_cid).map(
            lambda x: cid_to_canonical.get(x, '')
        )
        can_2 = chunk[col_2].astype(str).map(_clean_cid).map(
            lambda x: cid_to_canonical.get(x, '')
        )
        # Drop rows where either side has no mapping at all.
        valid = (can_1 != '') & (can_2 != '')
        pair_keys = [
            _canonical_pair_key(a, b) for a, b in zip(can_1[valid], can_2[valid])
        ]
        effects = chunk.loc[valid, [col_3, col_4]].itertuples(index=False, name=None)

        for p_key, (code, name) in zip(pair_keys, effects):
            st = stats_dict.get(p_key)
            if st is None:
                st = stats_dict[p_key] = {
                    "pair_count": 0,
                    "codes": set(),
                    "names": set(),
                    "hash_vector": np.zeros(TWOSIDES_HASH_DIM, dtype=np.float32),
                }
            st["pair_count"] += 1
            # Code and name are handled identically apart from the token
            # prefix and the set they are recorded in.
            for prefix, bucket, raw in (("code", "codes", code), ("name", "names", name)):
                if pd.isna(raw):
                    continue
                val = str(raw).strip()
                if not val:
                    continue
                st[bucket].add(val)
                st["hash_vector"][_hash_token(f'{prefix}:{val}', TWOSIDES_HASH_DIM)] += 1.0
    return stats_dict


def build_twosides_artifacts(force: bool = False):
    """Build ``twosides_stats.parquet`` from the Decagon polypharmacy CSV.

    The gzip CSV under ``RAW_DIR/twosides`` is streamed twice: once to gather
    the distinct compound IDs and resolve them through ``CanonicalDrugMapper``,
    and once to aggregate one row of statistics per unordered pair.  The
    result is written to ``PROCESSED_DIR`` and registered with the artifact
    manager under the name ``twosides_stats``.

    Args:
        force: Rebuild even when the output parquet file already exists.

    Returns:
        None.  The function returns early, without writing anything, when the
        input file is missing or the output exists and ``force`` is false.
    """
    twosides_path = RAW_DIR / 'twosides' / 'ChChSe-Decagon_polypharmacy.csv.gz'
    if not twosides_path.exists():
        print(f"Skipping Twosides build: File not found at {twosides_path}")
        return

    out_path = PROCESSED_DIR / "twosides_stats.parquet"
    if not force and out_path.exists():
        return

    # Imported lazily, as in the original, so that merely importing this
    # module does not pull in the training package.
    from training.canonical_drug_mapper import CanonicalDrugMapper
    mapper = CanonicalDrugMapper.from_structured_artifacts()

    cid_to_canonical = _resolve_twosides_cids(
        mapper, _collect_twosides_cids(twosides_path)
    )
    stats_dict = _aggregate_twosides_pairs(twosides_path, cid_to_canonical)

    # Explicit column list keeps the parquet schema stable even when no rows
    # were aggregated (an empty list alone would produce a column-less frame).
    columns = ["pair_key", "pair_count", "unique_codes", "unique_names", "hash_vector"]
    rows = [
        {
            "pair_key": pk,
            "pair_count": st["pair_count"],
            "unique_codes": len(st["codes"]),
            "unique_names": len(st["names"]),
            "hash_vector": st["hash_vector"].tolist(),
        }
        for pk, st in stats_dict.items()
    ]
    df = pd.DataFrame(rows, columns=columns)

    # The processed directory may not exist yet on a fresh checkout.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(out_path, index=False)
    manager.register_artifact("twosides_stats", df, out_path)
# When run as a script, rebuild the artifact even if the output already
# exists (the positional True is the ``force`` argument).
if __name__ == "__main__":
    build_twosides_artifacts(True)