"""RFQ <-> product-master matcher service (FastAPI + difflib hybrid scoring)."""

import asyncio
import difflib
import io
import json
import re
from functools import lru_cache
from pathlib import Path
from typing import List, Optional, Dict, Any, Tuple

import pandas as pd
from fastapi import FastAPI, UploadFile, File, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse

app = FastAPI(title="RFQ ↔ Product Master Matcher (difflib hybrid - Optimized)")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # lock this down in prod
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ---------- Fixed Tender Template ----------
TEMPLATE_COLUMNS = [
    "id", "tender_id", "tender_code", "customer_id", "customer_name", "fy",
    "category", "code", "current_brand_description", "generic_name",
    "annual_volume_qty", "quotation Price", "dosage form"
]

# ---------- OPTIMIZED: Compile regex patterns once at module level ----------
# The pattern ends with (?!\w) rather than \b: for units that end in a letter
# the two are equivalent, but \b can never succeed after "%" or "." when a
# space or end-of-string follows, which made those alternatives unmatchable.
# "i.u." is accepted with or without a space after the first dot.
UNIT_PATTERN_COMPILED = re.compile(
    r'\b\d+(?:\.\d+)?\s*'
    r'(?:mg|mcg|μg|µg|gm?|kg|iu|i\.\s*u\.|kiu|miu|ml|l|dl|%|w/w|w/v|v/v'
    r'|microgram|milligram|gram|kilogram|liter|milliliter)'
    r'(?!\w)',
    re.IGNORECASE
)
FORMS_PATTERN_COMPILED = re.compile(
    r'\b(tablet|tablets|capsule|capsules|cap|caps|injection|injections|inj'
    r'|syrup|syrups|suspension|suspensions|cream|creams|ointment|ointments'
    r'|gel|gels|drop|drops|spray|sprays|powder|powders|inhaler|inhalers'
    r'|solution|solutions|ampule|ampules|amp|amps|vial|vials|via'
    r'|bottle|bottles|bot|bots|sachet|sachets|sac|sacs'
    r'|suppository|suppositories|sup|sups|patch|patches|pat|pats'
    r'|lotion|lotions|respule|respules|res|pfs|kit|kits|num|nums'
    r'|car|cars|pac|pacs|tub|tubs|box|boxes|for)\b',
    re.IGNORECASE
)
FRACTION_PATTERN = re.compile(r'\d+\s*/\s*\d+')
STANDALONE_NUM_PATTERN = re.compile(r'\b\d+(?:\.\d+)?\b')
WV_PATTERN = re.compile(r'\b[wv]\s*/\s*[wv]\b', re.IGNORECASE)
WHITESPACE_PATTERN = re.compile(r'\s+')
NON_WORD_PATTERN = re.compile(r'[^\w\s.%/+-]')

# Minimum score_header() value for the fuzzy header fallback to accept a column.
_HEADER_SCORE_THRESHOLD = 0.35


# ---------- Normalization ----------
# OPTIMIZED: Use lru_cache for frequently repeated strings
@lru_cache(maxsize=10000)
def norm_base(s: str) -> str:
    """Return a lower-cased, punctuation-light, single-spaced form of *s*.

    "+" and "/" become spaces, every character that is not a word character,
    whitespace, ".", "%" or "-" becomes a space, and runs of whitespace are
    collapsed. Falsy input (None, "", 0) yields "".
    """
    s = str(s or "")
    s = s.lower()
    s = s.replace("+", " ").replace("/", " ")
    s = NON_WORD_PATTERN.sub(" ", s)
    s = WHITESPACE_PATTERN.sub(" ", s).strip()
    return s


@lru_cache(maxsize=10000)
def extract_numbers(s: str) -> Tuple[str, ...]:
    """Return the sorted, de-duplicated numeric tokens found in *s*.

    Both "number + unit" matches and bare numbers are collected from the
    normalized text. Whitespace inside a match is removed so that "500 mg"
    and "500mg" produce the same token; otherwise the exact-set comparison
    used for the number bonus fails on spacing alone. A tuple is returned so
    the result is hashable and can be cached.
    """
    s2 = norm_base(s)
    found = UNIT_PATTERN_COMPILED.findall(s2) + STANDALONE_NUM_PATTERN.findall(s2)
    return tuple(sorted({WHITESPACE_PATTERN.sub("", x) for x in found}))


@lru_cache(maxsize=10000)
def token_set(s: str) -> Tuple[str, ...]:
    """Return the non-empty space-separated tokens of the normalized *s*.

    Despite the name, the result is an ordered tuple that may contain
    duplicates; callers wrap it in ``set`` where set semantics are needed.
    """
    return tuple(t for t in norm_base(s).split(" ") if t)


# ---------- Synonyms / detection ----------
SYNONYMS: Dict[str, List[str]] = {
    "generic_name": [
        "generic name", "generic", "molecule", "molecule name",
        "molecule with strength", "composition", "salt", "api",
        "active ingredient"
    ],
    "current_brand_description": ["brand name", "brand", "trade name", "product", "product name", "item", "item name", "drug name"],
    "annual_volume_qty": ["potential annual volume", "annual volume qty", "annual qty", "annual volume", "qty", "quantity", "rfq qty", "order qty", "excepted annual consumption qty_total", "annual consumption"],
    "quotation Price": ["offer price(unit wise) without taxes in rs", "offer price", "unit price", "quoted rate", "rate", "basic rate", "price per unit", "price"],
    "code": ["item code", "product code", "sku", "catalogue no", "catalog no", "catalog number", "code"],
    "customer_name": ["customer name", "hospital name", "hospital", "buyer", "consignee", "institution", "institute", "organisation", "organization"],
    "fy": ["fy", "financial year", "f.y.", "year"],
    "id": ["s no", "sr no", "serial", "s.no", "line id", "id"],
    "tender_id": ["tender id", "rfq id", "enquiry id"],
    "tender_code": ["tender code", "rfq code", "enquiry code", "tender no", "tender number", "rfq no", "rfq number"],
    "category": ["category", "schedule", "section", "chapter", "dept"],
    "dosage form": ["dosage form", "form", "drug form", "pharmaceutical form", "presentation", "type", "medicine type"],
    "__product_master_molecule__": ["molecule", "molecule name", "generic", "generic name", "api", "active ingredient", "composition", "salt"],
    "__product_master_brand_id__": ["brand id", "brand_id", "id", "bid", "brand code", "brand_code", "brandcode"],
    "__product_master_brand_name__": ["brand name", "brand", "product", "trade name", "brand_name", "brandname", "product name"],
}


# ---------- Header mapping ----------
def score_header(tcol: str, scol: str) -> float:
    """Score how well source header *scol* matches target name *tcol* (0..1).

    Weighted blend of token Jaccard (0.60), substring containment in either
    direction (0.25) and difflib sequence ratio (0.15) on normalized names.
    """
    tn, sn = norm_base(tcol), norm_base(scol)
    tset, sset = set(tn.split()), set(sn.split())
    jacc = (len(tset & sset) / len(tset | sset)) if (tset and sset) else 0.0
    contains = 1.0 if (tn in sn or sn in tn) else 0.0
    fuzzy = difflib.SequenceMatcher(None, tn, sn).ratio()
    return 0.60 * jacc + 0.25 * contains + 0.15 * fuzzy


def _find_by_synonym(aliases: List[str], norm_map: Dict[str, str]) -> Optional[str]:
    """Return the original column whose normalized name matches an alias.

    Exact matches over all aliases are tried first, then substring matches in
    either direction. Empty normalized names are skipped in the substring
    pass because "" is a substring of every alias and would match anything.
    """
    normalized = [norm_base(a) for a in aliases]
    for n in normalized:
        if n in norm_map:
            return norm_map[n]
    for n in normalized:
        if not n:
            continue
        for nn, orig in norm_map.items():
            if nn and (n in nn or nn in n):
                return orig
    return None


def _best_scoring_column(target: str, cols: List[str]) -> Optional[str]:
    """Return the column with the highest score_header(), or None if below threshold."""
    best_col, best_score = None, -1.0
    for c in cols:
        sc = score_header(target, c)
        if sc > best_score:
            best_score, best_col = sc, c
    return best_col if best_score >= _HEADER_SCORE_THRESHOLD else None


def map_headers_auto(src_cols: List[str], target_cols: List[str]) -> Dict[str, Optional[str]]:
    """Map each target column to a source column name, or None if none fits.

    Resolution order per target: exact synonym, substring synonym, then the
    best fuzzy score. The same source column may be chosen for several
    targets; no exclusivity is enforced.
    """
    src_cols = [str(c) for c in src_cols]
    src_norm_map = {norm_base(c): c for c in src_cols}
    mapping: Dict[str, Optional[str]] = {}
    for tcol in target_cols:
        hit = _find_by_synonym(SYNONYMS.get(tcol, []), src_norm_map)
        mapping[tcol] = hit if hit else _best_scoring_column(tcol, src_cols)
    return mapping


def detect_single_column(df: pd.DataFrame, logical_name: str) -> Optional[str]:
    """Return the column of *df* that best represents *logical_name*, or None.

    Uses the same exact -> substring -> fuzzy-score cascade as
    map_headers_auto(), driven by SYNONYMS[logical_name].
    """
    cols = [str(c) for c in df.columns]
    norm_map = {norm_base(c): c for c in cols}
    hit = _find_by_synonym(SYNONYMS.get(logical_name, []), norm_map)
    return hit if hit else _best_scoring_column(logical_name, cols)


# ---------- File reading ----------
def guess_delimiter(sample: str) -> str:
    """Guess the delimiter of delimited text from its first non-blank line.

    The candidate (tab, ";", "|", ",") occurring most often on that line wins;
    ties go to the earlier candidate in that order, and "," is returned when
    none occurs. Counting on the header line avoids picking ";" or "|" just
    because one appears somewhere inside a comma-separated file.
    """
    header = next((ln for ln in sample.splitlines() if ln.strip()), "")
    best, best_count = ",", 0
    for d in ("\t", ";", "|", ","):
        count = header.count(d)
        if count > best_count:
            best, best_count = d, count
    return best


def drop_unnamed_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* without columns whose name starts with "Unnamed"."""
    keep = [c for c in df.columns if not str(c).startswith("Unnamed")]
    return df.loc[:, keep]


def ensure_str_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Convert all column labels of *df* to str (in place) and return *df*."""
    df.columns = [str(c) for c in df.columns]
    return df


def choose_best_sheet_and_header(xl: pd.ExcelFile, max_header_rows: int = 30):
    """Pick the sheet/header-row combination that maps the most template columns.

    Every sheet is read with header rows 0..max_header_rows; a candidate's
    score is the number of TEMPLATE_COLUMNS that map_headers_auto() resolves.
    Returns a dict with keys "score", "df", "sheet", "header", "mapping".

    Raises:
        ValueError: if no sheet yields a non-empty table.
    """
    best = {"score": -1, "df": None, "sheet": None, "header": None, "mapping": None}
    for sheet in xl.sheet_names:
        for header in range(max_header_rows + 1):
            try:
                df = pd.read_excel(xl, sheet_name=sheet, header=header)
                df = drop_unnamed_columns(df)
                if df.dropna(how="all").empty:
                    continue
                df = ensure_str_columns(df)
                m = map_headers_auto(df.columns.tolist(), TEMPLATE_COLUMNS)
                score = sum(1 for v in m.values() if v is not None)
                if score > best["score"]:
                    best = {"score": score, "df": df, "sheet": sheet, "header": header, "mapping": m}
            except Exception:
                # Best effort: an unreadable header offset is skipped. A bare
                # "except:" here would also swallow KeyboardInterrupt/SystemExit.
                continue
    if best["df"] is None:
        raise ValueError("No readable tables found in the Excel workbook.")
    return best


def dataframe_from_upload_bytes(filename: str, data: bytes) -> pd.DataFrame:
    """Parse uploaded bytes into a DataFrame based on the file extension.

    Supports Excel-like workbooks (.xlsx/.xls/.xlsm/.ods), delimited text
    (.csv/.tsv) and JSON (a list of objects, or an object with a "data" list).
    Text is decoded as UTF-8 with undecodable bytes dropped; "utf-8-sig" is
    used so a leading BOM does not break json.loads or leak into the first
    header.

    Raises:
        ValueError: for an unsupported extension or an unexpected JSON shape.
    """
    ext = Path(filename or "").suffix.lower()
    if ext in [".xlsx", ".xls", ".xlsm", ".ods"]:
        xl = pd.ExcelFile(io.BytesIO(data))
        best = choose_best_sheet_and_header(xl)
        return best["df"]
    if ext in [".csv", ".tsv"]:
        text = data.decode("utf-8-sig", errors="ignore")
        delim = guess_delimiter(text[:4096])
        return pd.read_csv(io.StringIO(text), sep=delim, engine="python")
    if ext == ".json":
        js = json.loads(data.decode("utf-8-sig", errors="ignore"))
        if isinstance(js, list):
            return pd.DataFrame(js)
        if isinstance(js, dict) and "data" in js and isinstance(js["data"], list):
            return pd.json_normalize(js["data"])
        raise ValueError(
            "Product master JSON must be a list of objects or an object with a 'data' array.")
    raise ValueError(f"Unsupported file type: {ext}")


def build_mapped_rfq(src_df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Optional[str]]]:
    """Project *src_df* onto TEMPLATE_COLUMNS.

    Returns (mapped_df, mapping). mapped_df always has every template column;
    a column with no source match is filled with pd.NA.
    """
    src_df = ensure_str_columns(drop_unnamed_columns(src_df))
    mapping = map_headers_auto(src_df.columns.tolist(), TEMPLATE_COLUMNS)
    out = pd.DataFrame(index=src_df.index)
    for tcol in TEMPLATE_COLUMNS:
        src = mapping.get(tcol)
        out[tcol] = src_df[str(src)] if src else pd.Series(
            [pd.NA] * len(src_df), index=src_df.index)
    return out, mapping


def _nonempty_text_mask(series: pd.Series) -> pd.Series:
    """Boolean mask of entries that are not NA and not blank after stripping."""
    return series.notna() & series.astype(str).str.strip().ne("")


# ---------- OPTIMIZED: Molecule extraction with caching ----------
@lru_cache(maxsize=10000)
def extract_molecule_base(s: str) -> str:
    """Extract core molecule name by removing dosages, units, and forms."""
    s_norm = norm_base(s)
    # Step 1: Remove dosage forms FIRST
    s_norm = FORMS_PATTERN_COMPILED.sub(' ', s_norm)
    # Step 2: Remove number+unit patterns
    s_norm = UNIT_PATTERN_COMPILED.sub(' ', s_norm)
    # Step 3: Remove fractions and ratios
    # NOTE: norm_base() has already replaced "/" with a space, so steps 3 and
    # 5 cannot match here; they are kept as harmless safeguards.
    s_norm = FRACTION_PATTERN.sub(' ', s_norm)
    # Step 4: Remove standalone numbers
    s_norm = STANDALONE_NUM_PATTERN.sub(' ', s_norm)
    # Step 5: Remove w/w, w/v, v/v
    s_norm = WV_PATTERN.sub(' ', s_norm)
    # Step 6: Clean up spaces
    s_norm = WHITESPACE_PATTERN.sub(' ', s_norm).strip()
    return s_norm


# ---------- OPTIMIZED: Pre-computed product master ----------
class PrecomputedProductMaster:
    """Pre-compute all expensive operations once for the product master.

    Rows with a missing molecule value are dropped. All attributes are
    parallel lists indexed by position: raw molecule text, brand ids/names
    (None when the column is absent), original DataFrame index labels, and
    the normalized text, molecule base, base tokens and number tokens.
    """

    def __init__(self, pm_df: pd.DataFrame, molecule_col: str,
                 brand_id_col: Optional[str], brand_name_col: Optional[str]):
        subset = pm_df.dropna(subset=[molecule_col]).copy()

        # Store original data
        self.molecule_col = molecule_col
        self.mol_raw = subset[molecule_col].astype(str).tolist()
        self.brand_ids = subset[brand_id_col].astype(str).tolist() \
            if brand_id_col and brand_id_col in subset.columns else [None] * len(subset)
        self.brand_names = subset[brand_name_col].astype(str).tolist() \
            if brand_name_col and brand_name_col in subset.columns else [None] * len(subset)
        self.idxs = subset.index.tolist()

        # Pre-compute normalized forms
        print(f"Pre-computing {len(self.mol_raw)} product master entries...")
        self.mol_norm = [norm_base(m) for m in self.mol_raw]
        self.mol_base = [extract_molecule_base(m) for m in self.mol_raw]
        self.mol_tokens = [set(token_set(mb)) for mb in self.mol_base]
        self.mol_numbers = [set(extract_numbers(m)) for m in self.mol_raw]
        print("Pre-computation complete!")

    def __len__(self):
        return len(self.mol_raw)


# ---------- OPTIMIZED: Fast pre-filter ----------
def quick_filter(g_tokens: set, pm_tokens: set, threshold: float = 0.15) -> bool:
    """Fast token overlap check to skip obvious non-matches.

    Returns True when the Jaccard overlap of the two token sets is at least
    *threshold*; False when either set is empty.
    """
    if not g_tokens or not pm_tokens:
        return False
    overlap = len(g_tokens & pm_tokens) / len(g_tokens | pm_tokens)
    return overlap >= threshold


# ---------- OPTIMIZED: Hybrid similarity with pre-computed data ----------
def hybrid_similarity_optimized(
    g_norm: str, g_base: str, g_tokens: set, g_numbers: set,
    pm_norm: str, pm_base: str, pm_tokens: set, pm_numbers: set
) -> Dict[str, float]:
    """
    Enhanced similarity using pre-computed normalized forms.

    Returns a dict with the component scores "diff", "jacc", "num",
    "mol_base" and the combined "score", all on a 0-100 scale and rounded to
    two decimals (identical normalized strings short-circuit to 100 for all).
    """
    # Exact match = perfect score
    if g_norm == pm_norm:
        return {"diff": 100.0, "jacc": 100.0, "num": 100.0, "mol_base": 100.0, "score": 100.0}

    # 1. Full text difflib similarity
    diff = difflib.SequenceMatcher(None, g_norm, pm_norm).ratio() * 100.0

    # 2. Token Jaccard similarity
    jacc = (len(g_tokens & pm_tokens) / len(g_tokens | pm_tokens) * 100.0) if (g_tokens and pm_tokens) else 0.0

    # 3. Number matching (bonus only)
    num_match = 100.0 if (g_numbers and pm_numbers and g_numbers == pm_numbers) else 0.0

    # 4. Molecule base matching
    mol_base_score = 0.0
    if g_base and pm_base:
        if g_base == pm_base:
            mol_base_score = 100.0
        else:
            mol_base_diff = difflib.SequenceMatcher(None, g_base, pm_base).ratio() * 100.0
            base_tokens_g = set(g_base.split())
            base_tokens_pm = set(pm_base.split())
            if base_tokens_g and base_tokens_pm:
                base_jacc = len(base_tokens_g & base_tokens_pm) / len(base_tokens_g | base_tokens_pm) * 100.0
                mol_base_score = 0.40 * mol_base_diff + 0.60 * base_jacc
            else:
                mol_base_score = mol_base_diff

    # 5. Scoring formula: a near-identical molecule base gets more weight
    if mol_base_score >= 95:
        score = (0.60 * mol_base_score + 0.20 * diff + 0.15 * jacc + 0.05 * num_match)
    else:
        score = (0.50 * mol_base_score + 0.25 * diff + 0.20 * jacc + 0.05 * num_match)

    return {
        "diff": round(diff, 2),
        "jacc": round(jacc, 2),
        "num": round(num_match, 2),
        "mol_base": round(mol_base_score, 2),
        "score": round(score, 2)
    }


# ---------- OPTIMIZED: Batch matching ----------
def match_generic_to_product_master_optimized(
    generic_list: List[str],
    pm: PrecomputedProductMaster,
    min_score: float = 60.0,
    return_all: bool = False,
    batch_size: int = 100
) -> List[Dict[str, Any]]:
    """Optimized matching using pre-computed product master.

    For each non-blank entry of *generic_list* the single best-scoring product
    master row (among those passing quick_filter) is reported. "row_index" is
    the position within *generic_list*, not a row of the original upload.
    With return_all=True the best candidate is returned regardless of
    *min_score* and carries a "_debug" score breakdown; entries with no
    candidate at all are omitted in both modes.
    """
    results = []
    total = len(generic_list)

    for batch_start in range(0, total, batch_size):
        batch_end = min(batch_start + batch_size, total)
        batch = generic_list[batch_start:batch_end]

        if batch_start % 500 == 0:
            print(f"Processing RFQ rows {batch_start}-{batch_end} of {total}...")

        for i_in_batch, g in enumerate(batch):
            i = batch_start + i_in_batch
            g_str = str(g or "").strip()
            if not g_str:
                continue

            # Pre-compute for this generic
            g_norm = norm_base(g_str)
            g_base = extract_molecule_base(g_str)
            g_tokens = set(token_set(g_base))
            g_numbers = set(extract_numbers(g_str))

            best_score, best_pos, best_parts = -1.0, None, None

            for pos in range(len(pm)):
                # Quick filter to skip obvious non-matches
                if not quick_filter(g_tokens, pm.mol_tokens[pos]):
                    continue

                # Full similarity calculation only for candidates
                parts = hybrid_similarity_optimized(
                    g_norm, g_base, g_tokens, g_numbers,
                    pm.mol_norm[pos], pm.mol_base[pos],
                    pm.mol_tokens[pos], pm.mol_numbers[pos]
                )

                if parts["score"] > best_score:
                    best_score, best_pos, best_parts = parts["score"], pos, parts

            if best_pos is None:
                continue

            item = {
                "row_index": i,
                "generic_name": g_str,
                "matched_name": pm.mol_raw[best_pos],
                "matched_brand_name": pm.brand_names[best_pos],
                "match_percent": round(best_score, 2),
                "brand_id": pm.brand_ids[best_pos],
                "brand_name": pm.brand_names[best_pos],
                "master_row_index": int(pm.idxs[best_pos]),
            }

            if return_all:
                item["_debug"] = best_parts
                results.append(item)
            else:
                if best_score >= min_score:
                    results.append(item)

    return results


# ---------- OPTIMIZED: Grouped matcher ----------
def match_generic_to_product_master_grouped_for_row_optimized(
    generic_value: str,
    pm: PrecomputedProductMaster,
    min_score: float = 60.0,
    top_n: int = 3
) -> List[Dict[str, Any]]:
    """Optimized grouped matching for a single row.

    Returns up to *top_n* product master candidates scoring at least
    *min_score*, best first; an empty list for blank input.
    """
    g_str = str(generic_value or "").strip()
    if not g_str:
        return []

    # Pre-compute for this generic
    g_norm = norm_base(g_str)
    g_base = extract_molecule_base(g_str)
    g_tokens = set(token_set(g_base))
    g_numbers = set(extract_numbers(g_str))

    scored = []

    for idx in range(len(pm)):
        # Quick filter
        if not quick_filter(g_tokens, pm.mol_tokens[idx]):
            continue

        # Full calculation
        parts = hybrid_similarity_optimized(
            g_norm, g_base, g_tokens, g_numbers,
            pm.mol_norm[idx], pm.mol_base[idx],
            pm.mol_tokens[idx], pm.mol_numbers[idx]
        )

        score = parts["score"]
        if score >= min_score:
            scored.append({
                "matched_name": pm.mol_raw[idx],
                "brand_name": pm.brand_names[idx],
                "brand_id": pm.brand_ids[idx],
                "match_percent": round(score, 2),
                "_debug": parts
            })

    scored.sort(key=lambda x: x["match_percent"], reverse=True)
    return scored[:top_n]


# ---------- OPTIMIZED Endpoints ----------
@app.post("/match-difflib")
async def match_with_difflib(
    rfq_file: UploadFile = File(...),
    product_master_json: UploadFile = File(...),
    min_score: float = Query(60.0, description="Minimum composite score (0-100)")
):
    """Return the best product-master match for each non-empty RFQ generic name."""
    try:
        # RFQ
        rfq_bytes = await rfq_file.read()
        rfq_df = dataframe_from_upload_bytes(rfq_file.filename, rfq_bytes)
        mapped, mapping = build_mapped_rfq(rfq_df)

        # build_mapped_rfq() always creates every template column, so the
        # meaningful test is whether a source header was actually mapped.
        if "generic_name" not in mapped.columns or mapping.get("generic_name") is None:
            raise HTTPException(
                status_code=400, detail="No 'generic_name' column found after mapping RFQ.")

        gen_series = mapped["generic_name"]
        nonempty_mask = _nonempty_text_mask(gen_series)
        generic_list = gen_series[nonempty_mask].astype(str).tolist()

        # Product master
        pm_bytes = await product_master_json.read()
        pm_df = dataframe_from_upload_bytes("product_master.json", pm_bytes)
        pm_df = ensure_str_columns(drop_unnamed_columns(pm_df))

        molecule_col = detect_single_column(pm_df, "__product_master_molecule__")
        brand_id_col = detect_single_column(pm_df, "__product_master_brand_id__")
        brand_name_col = detect_single_column(pm_df, "__product_master_brand_name__")

        if not molecule_col:
            raise HTTPException(
                status_code=400, detail="Could not detect molecule column in product master JSON.")

        # OPTIMIZED: Pre-compute product master
        pm = PrecomputedProductMaster(pm_df, molecule_col, brand_id_col, brand_name_col)

        # OPTIMIZED: Use optimized matching
        matches = match_generic_to_product_master_optimized(
            generic_list, pm, min_score=min_score, return_all=False
        )

        return JSONResponse({
            "rfq_rows": int(nonempty_mask.sum()),
            "product_master_detected": {
                "molecule_col": molecule_col,
                "brand_id_col": brand_id_col,
                "brand_name_col": brand_name_col
            },
            "product_master_size": len(pm),
            "matches_returned": len(matches),
            "data": matches
        })

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/test-extract-base")
def test_extract_base(text: str):
    """Test molecule base extraction"""
    normalized = norm_base(text)
    mol_base = extract_molecule_base(text)
    return {
        "original": text,
        "normalized": normalized,
        "molecule_base": mol_base,
        "numbers_extracted": list(extract_numbers(text)),
        "tokens": list(token_set(text))
    }


@app.post("/match-difflib-debug")
async def match_with_difflib_debug(
    rfq_file: UploadFile = File(...),
    product_master_json: UploadFile = File(...),
    sample: int = Query(5, ge=1, le=200),
    min_score: float = Query(60.0),
    sample_contains: str = Query("", description="Filter RFQ rows by substring (case-insensitive)")
):
    """
    Diagnostics: return BEST match (+%) for the first N RFQ rows, optionally filtered by text.
    """
    try:
        # RFQ
        rfq_bytes = await rfq_file.read()
        rfq_df = dataframe_from_upload_bytes(rfq_file.filename, rfq_bytes)
        mapped, mapping = build_mapped_rfq(rfq_df)

        gen_series = mapped.get("generic_name", pd.Series([], dtype=object))
        nonempty_mask = _nonempty_text_mask(gen_series)
        generic_list_all = gen_series[nonempty_mask].astype(str)

        if sample_contains:
            # regex=False: the parameter is documented as a plain substring, and
            # user text such as "(" would otherwise raise a regex error.
            flt = generic_list_all.str.contains(
                sample_contains, case=False, na=False, regex=False)
            generic_list = generic_list_all[flt].tolist()[:sample]
        else:
            generic_list = generic_list_all.tolist()[:sample]

        # Product master
        pm_bytes = await product_master_json.read()
        pm_df = dataframe_from_upload_bytes("product_master.json", pm_bytes)
        pm_df = ensure_str_columns(drop_unnamed_columns(pm_df))

        molecule_col = detect_single_column(pm_df, "__product_master_molecule__")
        brand_id_col = detect_single_column(pm_df, "__product_master_brand_id__")
        brand_name_col = detect_single_column(pm_df, "__product_master_brand_name__")

        # Same guard as the other matching endpoints; without it a missing
        # molecule column surfaced as an opaque 500 from the pre-computation.
        if not molecule_col:
            raise HTTPException(
                status_code=400, detail="Could not detect molecule column in product master JSON.")

        # OPTIMIZED: Pre-compute
        pm = PrecomputedProductMaster(pm_df, molecule_col, brand_id_col, brand_name_col)

        demo_matches = match_generic_to_product_master_optimized(
            generic_list, pm, min_score=min_score, return_all=True
        )

        return JSONResponse({
            "rfq_detected_headers": list(map(str, rfq_df.columns)),
            "template_mapping": mapping,
            "nonempty_generic_count": int(nonempty_mask.sum()),
            "product_master_detected": {
                "molecule_col": molecule_col,
                "brand_id_col": brand_id_col,
                "brand_name_col": brand_name_col
            },
            "product_master_size": len(pm),
            "filter": sample_contains or None,
            "examples": demo_matches
        })

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.post("/match-difflib-grouped")
async def match_with_difflib_grouped(
    rfq_file: UploadFile = File(...),
    product_master_json: UploadFile = File(...),
    min_score: float = Query(60.0, description="Minimum score to include"),
    # ge=0: a negative value would slice from the end and silently drop results.
    top_n: int = Query(3, ge=0, description="Max number of matches per RFQ row")
):
    """
    Return ALL extracted RFQ rows with matches array.
    OPTIMIZED version with pre-computation and batching.

    Every RFQ row is returned (template fields as strings or None) together
    with up to top_n product-master candidates scoring at least min_score.
    NOTE: the matching loop is synchronous CPU work inside an async handler,
    so other requests are not served while it runs.
    """
    try:
        # RFQ
        rfq_bytes = await rfq_file.read()
        rfq_df = dataframe_from_upload_bytes(rfq_file.filename, rfq_bytes)
        mapped, mapping = build_mapped_rfq(rfq_df)

        # Defensive: guarantee every template column exists before row access.
        for col in TEMPLATE_COLUMNS:
            if col not in mapped.columns:
                mapped[col] = pd.NA

        # Product master
        pm_bytes = await product_master_json.read()
        pm_df = dataframe_from_upload_bytes("product_master.json", pm_bytes)
        pm_df = ensure_str_columns(drop_unnamed_columns(pm_df))

        molecule_col = detect_single_column(pm_df, "__product_master_molecule__")
        brand_id_col = detect_single_column(pm_df, "__product_master_brand_id__")
        brand_name_col = detect_single_column(pm_df, "__product_master_brand_name__")

        if not molecule_col:
            raise HTTPException(
                status_code=400, detail="Could not detect molecule column in product master JSON.")

        # OPTIMIZED: Pre-compute product master
        pm = PrecomputedProductMaster(pm_df, molecule_col, brand_id_col, brand_name_col)

        # Build response data
        data_out = []
        match_rows_with_any = 0
        total = len(mapped)

        print(f"Processing {total} RFQ rows against {len(pm)} products...")

        # Progress is reported by row position rather than by index label, so
        # it does not depend on the DataFrame index being integer-valued.
        for pos, (idx, row) in enumerate(mapped.iterrows()):
            if pos % 100 == 0:
                print(f"Processing RFQ row {pos}/{total}...")

            rfq_record = {}
            for col in TEMPLATE_COLUMNS:
                value = row.get(col)
                rfq_record[col] = None if pd.isna(value) else str(value)

            g_val = rfq_record.get("generic_name") or ""

            # OPTIMIZED: Use optimized matching
            matches = match_generic_to_product_master_grouped_for_row_optimized(
                generic_value=g_val,
                pm=pm,
                min_score=min_score,
                top_n=top_n
            )

            if matches:
                match_rows_with_any += 1

            data_out.append({
                "row_index": int(idx),
                "rfq": rfq_record,
                "matches": matches
            })

        print(f"Completed! {match_rows_with_any}/{total} rows had matches.")

        return {
            "rfq_rows": int(len(mapped)),
            "product_master_detected": {
                "molecule_col": molecule_col,
                "brand_id_col": brand_id_col,
                "brand_name_col": brand_name_col
            },
            "product_master_size": len(pm),
            "rows_with_matches": match_rows_with_any,
            "data": data_out
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/debug-score")
def debug_score(a: str, b: str):
    """Quick check for two strings.

    Returns the normalized forms, molecule bases and tokens of both inputs,
    whether the pair passes quick_filter(), and the full score breakdown.
    """
    # Pre-compute both sides
    a_norm = norm_base(a)
    a_base = extract_molecule_base(a)
    a_tokens = set(token_set(a_base))
    a_numbers = set(extract_numbers(a))

    b_norm = norm_base(b)
    b_base = extract_molecule_base(b)
    b_tokens = set(token_set(b_base))
    b_numbers = set(extract_numbers(b))

    result = hybrid_similarity_optimized(
        a_norm, a_base, a_tokens, a_numbers,
        b_norm, b_base, b_tokens, b_numbers
    )

    return {
        "a": a,
        "b": b,
        "a_normalized": a_norm,
        "b_normalized": b_norm,
        "a_base": a_base,
        "b_base": b_base,
        "a_tokens": list(a_tokens),
        "b_tokens": list(b_tokens),
        "quick_filter_pass": quick_filter(a_tokens, b_tokens),
        "similarity": result
    }


@app.get("/")
def root():
    """Health/status endpoint listing the available routes."""
    return {
        "status": "ok",
        "message": "OPTIMIZED version with pre-computation and batching",
        "endpoints": {
            "/match-difflib": "Standard matching",
            "/match-difflib-grouped": "Grouped matching (recommended)",
            "/match-difflib-debug": "Debug mode",
            "/debug-score": "Test two strings",
            "/test-extract-base": "Test molecule extraction"
        }
    }


if __name__ == "__main__":
    import uvicorn

    # Keep idle keep-alive connections open for 10 minutes (600 seconds).
    # NOTE: timeout_keep_alive governs idle connections between requests; it
    # is not a limit on how long a single request may take to process.
    uvicorn.run(app, host="0.0.0.0", port=7860, timeout_keep_alive=600)