File size: 3,857 Bytes
d29b763
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from pathlib import Path
import pandas as pd
import numpy as np
from typing import Any
import re
from preprocessing.artifact_manager import manager

# Third ancestor of this file's resolved path, i.e. two directories above the
# directory that contains this module (presumably the project root).
BASE_DIR = Path(__file__).resolve().parents[2]
# Directory holding raw inputs; build_twosides_artifacts reads RAW_DIR / "twosides".
RAW_DIR = BASE_DIR / "data" / "raw"
# Directory that receives the generated "twosides_stats.parquet" file.
PROCESSED_DIR = BASE_DIR / "data" / "processed"

# Number of buckets (vector length) of the per-pair hashed feature vector.
TWOSIDES_HASH_DIM = 256

def _clean_cid(cid: Any) -> str:
    """Reduce *cid* to its ASCII digits with leading zeros removed.

    The value is stringified first, every character other than 0-9 is
    dropped, and leading zeros are stripped.  If nothing is left, ``'0'``
    is returned.
    """
    as_text = str(cid)
    digits_only = re.sub(r'[^0-9]', '', as_text)
    # An all-zero or digit-free input collapses to '' here; map that to '0'.
    return digits_only.lstrip('0') or '0'

def _canonical_pair_key(a: str, b: str) -> str:
    """Return an order-independent key for the pair (*a*, *b*).

    The two identifiers are placed in ascending order and joined with
    ``'||'``, so ``(a, b)`` and ``(b, a)`` produce the same key.
    """
    if b < a:
        lower, upper = b, a
    else:
        lower, upper = a, b
    return '||'.join((lower, upper))

import json
import hashlib

def _hash_token(token: str, size: int) -> int:
    """Map *token* to a bucket index in ``range(size)``.

    The index is the first 32 bits of the token's SHA-1 digest (UTF-8
    encoded input), read as a big-endian unsigned integer, modulo *size*.
    """
    raw_digest = hashlib.sha1(token.encode('utf-8')).digest()
    # The first four digest bytes are the first eight hex characters.
    leading_bits = int.from_bytes(raw_digest[:4], 'big')
    return leading_bits % size

def build_twosides_artifacts(force: bool = False):
    """Build the per-drug-pair TWOSIDES statistics table and register it.

    Reads ``RAW_DIR/twosides/ChChSe-Decagon_polypharmacy.csv.gz`` in chunks.
    The identifiers in the first two columns are cleaned with ``_clean_cid``
    and resolved through ``CanonicalDrugMapper``; identifiers the mapper
    cannot resolve fall back to ``"stitch::<cid>"``.  Every row is then
    aggregated under an order-independent pair key, using the third and
    fourth columns (handled here as a side-effect code and name).

    The resulting table has one row per pair with the columns
    ``pair_key``, ``pair_count`` (number of rows seen for the pair),
    ``unique_codes``, ``unique_names`` and ``hash_vector`` (a list of
    ``TWOSIDES_HASH_DIM`` floats counting hashed ``code:``/``name:`` tokens).
    It is written to ``PROCESSED_DIR/twosides_stats.parquet`` and handed to
    ``manager.register_artifact`` under the name ``"twosides_stats"``.

    Args:
        force: Rebuild even when the parquet file already exists.

    Returns:
        None.  Returns early without writing anything when the raw file is
        missing, or when the output already exists and *force* is false.
    """
    twosides_path = RAW_DIR / 'twosides' / 'ChChSe-Decagon_polypharmacy.csv.gz'
    if not twosides_path.exists():
        print(f"Skipping Twosides build: File not found at {twosides_path}")
        return

    out_path = PROCESSED_DIR / "twosides_stats.parquet"
    if not force and out_path.exists():
        return

    # Imported here rather than at module level, so importing this module
    # does not pull in the mapper.
    from training.canonical_drug_mapper import CanonicalDrugMapper
    mapper = CanonicalDrugMapper.from_structured_artifacts()

    # Pass 1: collect the distinct cleaned IDs so each one is resolved once.
    unique_cids = set()
    for chunk in pd.read_csv(twosides_path, compression='gzip', chunksize=250000, usecols=[0, 1]):
        col_1, col_2 = chunk.columns[:2]
        unique_cids.update(chunk[col_1].astype(str).map(_clean_cid))
        unique_cids.update(chunk[col_2].astype(str).map(_clean_cid))

    cid_to_canonical = {}
    for cid in unique_cids:
        res = mapper.resolve(f"cid:{cid}", source_hint="twosides")
        # Unresolved IDs keep a namespaced placeholder instead of being dropped.
        cid_to_canonical[cid] = res.canonical_id or f"stitch::{cid}"

    # Pass 2: aggregate rows per canonical pair.
    stats_dict = {}
    for chunk in pd.read_csv(twosides_path, compression='gzip', chunksize=250000):
        col_1, col_2, col_3, col_4 = chunk.columns[:4]

        can_1 = chunk[col_1].astype(str).map(_clean_cid).map(lambda x: cid_to_canonical.get(x, ''))
        can_2 = chunk[col_2].astype(str).map(_clean_cid).map(lambda x: cid_to_canonical.get(x, ''))

        # Defensive: skip rows whose IDs did not make it into the lookup.
        valid = (can_1 != '') & (can_2 != '')
        filtered = chunk[valid].copy()

        filtered['pair_key'] = [
            _canonical_pair_key(a, b) for a, b in zip(can_1[valid].values, can_2[valid].values)
        ]

        for p_key, code, name in filtered[['pair_key', col_3, col_4]].itertuples(index=False, name=None):
            st = stats_dict.get(p_key)
            if st is None:
                st = stats_dict[p_key] = {
                    "pair_count": 0,
                    "codes": set(),
                    "names": set(),
                    "hash_vector": np.zeros(TWOSIDES_HASH_DIM, dtype=np.float32),
                }
            st["pair_count"] += 1
            # Codes and names get the same treatment: skip missing/blank
            # values, remember the distinct value, and bump its hash bucket.
            for bucket, prefix, raw in (("codes", "code", code), ("names", "name", name)):
                if pd.isna(raw):
                    continue
                val = str(raw).strip()
                if not val:
                    continue
                st[bucket].add(val)
                st["hash_vector"][_hash_token(f'{prefix}:{val}', TWOSIDES_HASH_DIM)] += 1.0

    rows = [
        {
            "pair_key": pk,
            "pair_count": st["pair_count"],
            "unique_codes": len(st["codes"]),
            "unique_names": len(st["names"]),
            "hash_vector": st["hash_vector"].tolist(),
        }
        for pk, st in stats_dict.items()
    ]

    # Explicit columns keep the schema intact even when no rows were read.
    df = pd.DataFrame(
        rows,
        columns=["pair_key", "pair_count", "unique_codes", "unique_names", "hash_vector"],
    )
    # The processed directory may not exist yet on a fresh checkout.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(out_path, index=False)
    manager.register_artifact("twosides_stats", df, out_path)

if __name__ == "__main__":
    # Running this module as a script always rebuilds the artifact.
    build_twosides_artifacts(force=True)