Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| process_company.py | |
| One-time processing script per company. | |
| Handles PDF-only companies and PDF+XBRL companies automatically. | |
| Usage: | |
| python3 process_company.py "AXIS" | |
| python3 process_company.py --all | |
| python3 process_company.py --list | |
| """ | |
| import sys | |
| import argparse | |
| import re | |
| from pathlib import Path | |
| from datetime import datetime | |
| sys.path.insert(0, str(Path(__file__).resolve().parent)) | |
| import config | |
| from db import init_db | |
| from app.services.cache_service import ( | |
| upsert_company, save_claims, save_timeseries, | |
| save_risk, save_analysis, get_all_companies, | |
| ) | |
| from pipelines.text.extract_text_data import run_text_extraction_pipeline | |
| from pipelines.text.split_sentences import run_sentence_splitting_pipeline | |
| from pipelines.text.claim_extractor import run_claim_extraction_pipeline | |
| from pipelines.ml.run_claim_model import load_claim_model, run_claim_model_pipeline | |
| from pipelines.ml.extract_attributes import run_attribute_extraction_pipeline | |
| from pipelines.finance.extract_xbrl_data import run_xbrl_extraction_pipeline | |
| from pipelines.finance.prepare_timeseries_data import run_timeseries_pipeline | |
| from pipelines.finance.verify_claims import run_claim_verification_pipeline | |
| from pipelines.risk.aggregate_risk import run_risk_aggregation_pipeline | |
| from pipelines.rag.build_vector_db import load_vector_db, build_vector_db | |
| from pipelines.rag.rag_explainer import load_groq_client, run_rag_explanation | |
| # ββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _log(msg: str): | |
| print(f"[{datetime.now().strftime('%H:%M:%S')}] {msg}") | |
| def _clean_display(folder_name: str) -> str: | |
| name = re.sub(r'\s*\d{4}\s*Quarterly\s*Data\s*', '', folder_name, flags=re.IGNORECASE) | |
| return name.strip() | |
def _find_folder(query: str):
    """Resolve a user-supplied company name to a folder under DATA_DIR/raw/companies.

    Matching is attempted from strictest to loosest, each case-insensitive and
    each considering both the raw folder name and its cleaned display name:
      1. exact match;
      2. substring match;
      3. fuzzy match via difflib (cutoff 0.5).

    Folders are examined in sorted order, so the result is deterministic.

    Returns:
        The matching folder Path, or None when the companies directory is
        missing, the query is blank, or nothing matches.
    """
    import difflib

    companies_dir = Path(config.DATA_DIR) / "raw" / "companies"
    if not companies_dir.exists():
        return None
    query_lower = (query or "").strip().lower()
    if not query_lower:
        # "" is a substring of every name; without this guard an arbitrary
        # folder would be returned by the substring pass.
        return None

    # Sorted so that ties don't depend on filesystem iteration order.
    folders = sorted(f for f in companies_dir.iterdir() if f.is_dir())
    keyed = [(f, f.name.lower(), _clean_display(f.name).lower()) for f in folders]

    for folder, raw, display in keyed:
        if query_lower in (raw, display):
            return folder
    for folder, raw, display in keyed:
        if query_lower in raw or query_lower in display:
            return folder

    # Fuzzy candidates include display names too, consistent with the passes
    # above; the first folder to claim a candidate string keeps it.
    candidates = {}
    for folder, raw, display in keyed:
        candidates.setdefault(raw, folder)
        candidates.setdefault(display, folder)
    matches = difflib.get_close_matches(query_lower, list(candidates), n=1, cutoff=0.5)
    return candidates[matches[0]] if matches else None
| def _has_xml(folder: Path) -> bool: | |
| """Check if company folder has any XML files (any depth).""" | |
| return any(folder.rglob("*.xml")) | |
def _match_records(records: list, folder_name: str) -> list:
    """Select the extraction records that belong to one company folder.

    Each record's ``company`` value is compared case-insensitively against the
    folder name and its cleaned display name. Exact matches win. If there are
    none, fall back to containment: the display name inside the record's
    company, or the record's company inside the folder name.

    Records with a missing, None or empty ``company`` never match. Otherwise
    ``"" in folder_name`` would attribute them to every company. An empty
    display name likewise never matches everything.
    """
    folder_lower = folder_name.lower()
    display_lower = _clean_display(folder_name).lower()

    # `r.get("company", "")` still yields None for an explicit None value.
    named = [(r, str(r.get("company") or "").lower()) for r in records]
    named = [(r, company) for r, company in named if company]

    exact = [r for r, company in named if company in (folder_lower, display_lower)]
    if exact:
        return exact
    return [r for r, company in named
            if (display_lower and display_lower in company)
            or company in folder_lower]
| def _signal_title(sentence: str) -> str: | |
| words = sentence.split() | |
| return " ".join(words[:8]).rstrip(",.;") + ("β¦" if len(words) > 8 else "") | |
| def _infer_verdict(pos: list, neg: list) -> str: | |
| if len(pos) > len(neg) * 1.5: | |
| return "Positive" | |
| if len(neg) > len(pos) * 1.5: | |
| return "Negative" | |
| return "Mixed" | |
| def _fmt(value, suffix="") -> str: | |
| if value is None: | |
| return "N/A" | |
| try: | |
| return f"{float(value):,.1f}{suffix}" | |
| except (TypeError, ValueError): | |
| return "N/A" | |
def _clean_quarter(q: str) -> str:
    """Cleans quarter strings and converts Calendar Years to Indian FY.

    Normalises a free-form quarter label to the form "YYYY-Qn". The input is
    first converted with str(), stripped and upper-cased. Patterns are then
    tried in order:

      1. "<n>Q ... <year>"  (quarter digit before the Q, e.g. "3Q23")
      2. "Q<n> ... <year>"  (quarter digit after the Q, e.g. "Q1 FY24")
      3. exactly "YYYY-Qn"

    In patterns 1 and 2, an optional "FY" or "20" directly before the year
    digits is skipped. A resulting year value below 100 gets 2000 added.

    If the label contains "FY", the parsed year is returned unchanged.
    Otherwise Q1-Q3 are moved to the following year and Q4 keeps its year.

    NOTE(review): a label already in "YYYY-Qn" form (no "FY") is also shifted
    for Q1-Q3. If callers pass already-normalised FY labels, they get shifted
    twice; confirm the format of upstream quarter values.
    NOTE(review): year-before-quarter labels such as "2024Q1" or "FY24 Q1"
    match none of the patterns and are returned as-is (upper-cased).

    Returns:
        "YYYY-Qn" on a successful parse, otherwise the cleaned input string.
    """
    q = str(q).strip().upper()
    year, quarter = None, None
    # Pattern 1: quarter digit precedes the Q.
    m = re.search(r"([1-4])[Qq].*?(?:FY|20)?(\d{2,4})", q)
    # Pattern 2: quarter digit follows the Q.
    if not m: m = re.search(r"[Qq]([1-4]).*?(?:FY|20)?(\d{2,4})", q)
    if not m:
        # Pattern 3: "YYYY-Qn" has no capture groups, so year/quarter are sliced out directly.
        m = re.match(r"^\d{4}-Q[1-4]$", q)
        if m: year, quarter = int(q[:4]), int(q[-1])
    if m and not year:
        # Patterns 1/2: group(1) is the quarter digit, group(2) the year digits.
        y_val = int(m.group(2))
        year = y_val + 2000 if y_val < 100 else y_val
        quarter = int(m.group(1))
    if year and quarter:
        if "FY" in q: return f"{year}-Q{quarter}"
        # Indian FY Math: Apr-Dec (Q1, Q2, Q3) -> Next Year. Jan-Mar (Q4) -> Same Year.
        fy_year = year + 1 if quarter in [1, 2, 3] else year
        return f"{fy_year}-Q{quarter}"
    return q
| def _trend(change) -> str: | |
| if change is None: | |
| return "flat" | |
| try: | |
| v = float(change) | |
| return "up" if v > 1 else ("down" if v < -1 else "flat") | |
| except (TypeError, ValueError): | |
| return "flat" | |
def _format_metrics(ts_records: list) -> list:
    """Build headline metric cards from the most recent timeseries row.

    The "latest" row is the last one after sorting by ``str(quarter)``. That is
    a lexicographic sort, and among equal keys the last record wins because
    sorted() is stable.

    One card is produced for each of revenue, net profit, operating profit and
    profit margin. Each card is a dict with keys label, value, sub (the QoQ
    change) and trend. A metric is skipped when its value is None or NaN;
    pandas uses NaN for missing values. Non-numeric values still produce a
    card whose value renders as "N/A".

    Returns:
        A list of card dicts; empty when *ts_records* is empty.
    """
    import math

    if not ts_records:
        return []
    latest = sorted(ts_records, key=lambda r: str(r.get("quarter", "")))[-1]
    quarter = latest.get("quarter", "")

    # Unicode escapes keep these literals safe from the mis-encoding that had
    # garbled them: \u20b9 = INDIAN RUPEE SIGN, \u00b7 = MIDDLE DOT.
    rupee_cr = "\u20b9 Cr"
    defs = [
        ("Revenue", "revenue", rupee_cr, "revenue_qoq_change"),
        ("Net Profit", "net_profit", rupee_cr, "net_profit_qoq_change"),
        ("Operating Profit", "operating_profit", rupee_cr, "operating_profit_qoq_change"),
        ("Profit Margin", "profit_margin", "%", "profit_margin_qoq_change"),
    ]
    out = []
    for label, key, unit, chg_key in defs:
        val = latest.get(key)
        if val is None:
            continue
        try:
            if math.isnan(float(val)):
                continue
        except (TypeError, ValueError):
            pass  # non-numeric: keep the card; _fmt renders it as "N/A"
        change = latest.get(chg_key)
        out.append({
            "label": f"{label} \u00b7 {quarter}",
            "value": _fmt(val, f" {unit}"),
            "sub": f"QoQ {_fmt(change, '%')}",
            "trend": _trend(change),
        })
    return out
def _format_bar_metrics(ts_records: list) -> list:
    """Build progress-bar entries for the QoQ changes of the latest timeseries row.

    The "latest" row is the last one after sorting by ``str(quarter)``.

    Each entry is a dict with these keys:
      - label
      - target_pct: abs(change) * 2, truncated to int and capped at 100
      - color: a CSS variable chosen from the trend
      - value: the formatted percentage

    A change is skipped when it is None, non-numeric, NaN or infinite.
    Non-finite values (for example a QoQ change against a zero or missing
    prior quarter) would otherwise make ``int()`` raise.

    Returns:
        A list of bar dicts; empty when *ts_records* is empty.
    """
    import math

    if not ts_records:
        return []
    latest = sorted(ts_records, key=lambda r: str(r.get("quarter", "")))[-1]
    color_map = {"up": "var(--accent-green)", "down": "var(--accent-red)", "flat": "var(--accent-blue)"}
    defs = [
        ("Revenue QoQ %", "revenue_qoq_change"),
        ("Net Profit QoQ %", "net_profit_qoq_change"),
        ("Operating Profit QoQ %", "operating_profit_qoq_change"),
        ("Profit Margin QoQ %", "profit_margin_qoq_change"),
    ]
    out = []
    for label, key in defs:
        val = latest.get(key)
        if val is None:
            continue
        try:
            pct = float(val)
        except (TypeError, ValueError):
            continue
        if not math.isfinite(pct):
            continue
        trend = _trend(pct)
        out.append({
            "label": label,
            "target_pct": min(int(abs(pct) * 2), 100),
            "color": color_map[trend],
            "value": _fmt(pct, "%"),
        })
    return out
| def _format_risk_from_claims(attributes: list) -> dict: | |
| """ | |
| Lightweight risk estimate from claim attributes alone (no XBRL needed). | |
| Uses direction distribution as a proxy for consistency. | |
| """ | |
| if not attributes: | |
| return {"level": "UNKNOWN", "consistency_score": None, "warning_flag": 0} | |
| directions = [a.get("direction") for a in attributes if a.get("direction")] | |
| if not directions: | |
| return {"level": "UNKNOWN", "consistency_score": None, "warning_flag": 0} | |
| increase_rate = directions.count("increase") / len(directions) | |
| decrease_rate = directions.count("decrease") / len(directions) | |
| missing_rate = sum(1 for a in attributes if a.get("direction_missing")) / len(attributes) | |
| # Simple heuristic score | |
| score = round(max(min(increase_rate - decrease_rate * 0.5 - missing_rate * 0.2, 1.0), 0.0), 2) | |
| level = "LOW" if score > 0.6 else ("MODERATE" if score > 0.3 else "HIGH") | |
| return { | |
| "level": level, | |
| "consistency_score": score, | |
| "warning_flag": 1 if score < 0.3 else 0, | |
| } | |
| def _format_risk_from_verified(risk_records: list) -> dict: | |
| if not risk_records: | |
| return {"level": "UNKNOWN", "consistency_score": None, "warning_flag": 0} | |
| latest = sorted(risk_records, key=lambda r: str(r.get("quarter", "")))[-1] | |
| score = latest.get("consistency_score", 0) | |
| warn = latest.get("warning_flag", 0) | |
| level = "HIGH" if (warn or score < 0.4) else ("MODERATE" if score < 0.65 else "LOW") | |
| return { | |
| "level": level, | |
| "consistency_score": score, | |
| "warning_flag": warn, | |
| "verification_rate": latest.get("verification_rate"), | |
| "failure_rate": latest.get("failure_rate"), | |
| } | |
# ── Core processing ──────────────────────────────────────────────────────────
def process_company(folder: Path) -> bool:
    """Run the full one-time pipeline for a single company folder and cache the results.

    Steps:
      1. Text: extract transcripts, split sentences, extract claims (BERT
         output preferred, spaCy as fallback) and claim attributes.
      2. Financials: if the folder holds XML files, build the quarterly
         timeseries, save it, verify claims against it and save risk records.
      3. Save claims: the verified claims if any, otherwise the raw attributes.
      4. Load every transcript sentence into the vector DB for RAG.
      5. Run the RAG explanation and save one cached analysis per mode
         ("full", "earnings", "financial").
      6. Mark the company "ready".

    The company row is upserted with status "processing" before the ``try``.
    A failure in that first upsert therefore propagates to the caller. Any
    other uncaught exception marks the company "error" with the message,
    prints a traceback and returns False.

    Args:
        folder: A company directory; its parent is the companies root.

    Returns:
        True on success, False if an exception escaped the pipeline.
    """
    folder_name = folder.name
    display_name = _clean_display(folder_name)
    _log(f"Processing: {display_name}")
    company_id = upsert_company(folder_name, status="processing")
    try:
        companies_parent = str(folder.parent)
        # Defaults so later steps can run when earlier ones produce nothing.
        attributes = []
        ts_records = []
        verified = []
        risk_records = []
        sentences = []
        # ── 1. Text pipeline (always runs) ────────────────────────────────
        _log(" β Extracting text from transcripts...")
        # The extractor is given the parent directory (all companies); the
        # records for this company are then picked out by _match_records.
        raw_records = run_text_extraction_pipeline(companies_parent)
        company_docs = _match_records(raw_records, folder_name)
        if company_docs:
            _log(f" β {len(company_docs)} file(s) found. Splitting sentences...")
            sentences = run_sentence_splitting_pipeline(company_docs)
            _log(f" β {len(sentences)} sentences. Running claim extraction...")
            # NOTE(review): the spaCy pass always runs, even though its output
            # is only used when the BERT pass returns nothing.
            spacy_claims = run_claim_extraction_pipeline(sentences)
            bert_claims = run_claim_model_pipeline(sentences)
            claims = bert_claims if bert_claims else spacy_claims
            _log(f" β {len(claims)} claims found. Extracting attributes...")
            attributes = run_attribute_extraction_pipeline(claims)
            _log(f" β {len(attributes)} attribute records extracted.")
        else:
            _log(" β No transcript files found.")
        # ── 2. Financial data via XBRL/XML ────────────────────────────────
        _log(" β Extracting quarterly financials from local XBRL/XML files...")
        has_xbrl = False
        if _has_xml(folder):
            # Errors in this step are logged and swallowed; processing
            # continues without financials.
            try:
                # Extract XBRL records (filtering for this specific company)
                # NOTE(review): this filter is an exact `company == folder_name`
                # match, unlike the text step, which uses _match_records.
                all_fin_records = run_xbrl_extraction_pipeline(companies_parent)
                fin_records = [r for r in all_fin_records if r.get("company") == folder_name]
                if fin_records:
                    ts_records = run_timeseries_pipeline(fin_records)
                    save_timeseries(company_id, ts_records)
                    _log(f" β {len(ts_records)} timeseries rows saved.")
                    has_xbrl = True
                    # Verification needs both claim attributes and financials.
                    if attributes:
                        _log(" β Verifying claims against financials...")
                        verified = run_claim_verification_pipeline(attributes, ts_records)
                        risk_records = run_risk_aggregation_pipeline(verified)
                        save_risk(company_id, risk_records)
                        _log(f" β {len(verified)} claims verified.")
                else:
                    _log(" β No financial data parsed from XMLs β skipping verification.")
            except Exception as e:
                _log(f" β Financials error: {e}")
        else:
            _log(" β No XML files found in folder β skipping verification.")
        # ── 3. Save claims to DB ──────────────────────────────────────────
        # Use verified claims if available, otherwise raw attributes
        claims_to_save = verified if verified else attributes
        if claims_to_save:
            save_claims(company_id, claims_to_save)
        # ── 4. Load ALL chunks into ChromaDB ─────────────────────────────
        _log(" β Loading ALL transcript chunks into vector DB...")
        if sentences:
            # We use 'sentences' instead of 'verified' so RAG has full context
            # The sentence dicts are mutated in place: keys that are missing
            # get placeholder "result"/"reason" values.
            for s in sentences:
                s.setdefault("result", "CONTEXT")
                s.setdefault("reason", "TRANSCRIPT_EXCERPT")
            status = build_vector_db(sentences, company=display_name)
            _log(f" β Vector DB: {status}")
        else:
            _log(" β No text chunks to load into vector DB.")
        # ── 5. RAG + cache all 3 modes ────────────────────────────────────
        _log(" β Generating RAG explanations...")
        query = f"{display_name} financial performance earnings"
        # Risk dict: verified-claim risk when available, else the
        # attribute-only heuristic.
        risk_dict = (
            _format_risk_from_verified(risk_records)
            if risk_records
            else _format_risk_from_claims(attributes)
        )
        # Display claims for UI (top 20) with clean quarter formats
        # NOTE(review): "confidence" is populated from "actual_change";
        # confirm this is the intended field.
        display_claims = [
            {
                "quarter": _clean_quarter(a.get("quarter", "")),
                "sentence": str(a.get("sentence", ""))[:140],
                "metric": a.get("metric", ""),
                "direction": a.get("direction") or "neutral",
                "result": a.get("result", "UNVERIFIED"),
                "confidence": a.get("actual_change") or 0.0,
            }
            for a in (verified or attributes)[:20]
        ]
        # Distinct normalised quarters across all claims, used in the source label.
        quarters = sorted({
            _clean_quarter(r.get("quarter", "")) for r in (verified or attributes)
            if r.get("quarter")
        })
        # NOTE(review): `query` and the call arguments are identical for every
        # mode; `mode` is not passed to run_rag_explanation. The three cached
        # analyses therefore differ only in their "mode" field, plus any
        # non-determinism in the RAG call.
        for mode in ("full", "earnings", "financial"):
            # A failure in one mode is logged and does not stop the others,
            # nor the final "ready" status.
            try:
                rag = run_rag_explanation(query, company=display_name, top_k=config.RAG_TOP_K)
                pos_raw = rag.get("positive_signals", [])
                neg_raw = rag.get("negative_signals", [])
                # NOTE(review): the "Β·" separator below looks like a
                # mis-encoded "·" (U+00B7); confirm the file's actual bytes.
                result = {
                    "company": display_name,
                    "mode": mode,
                    "matched_folder": folder_name,
                    "has_xbrl": has_xbrl,
                    "positive_signals": [
                        {"index": f"P{i+1}", "title": _signal_title(s), "body": s}
                        for i, s in enumerate(pos_raw)
                    ],
                    "negative_signals": [
                        {"index": f"N{i+1}", "title": _signal_title(s), "body": s}
                        for i, s in enumerate(neg_raw)
                    ],
                    "explanation": rag.get("explanation", ""),
                    "verdict": _infer_verdict(pos_raw, neg_raw),
                    "metrics": _format_metrics(ts_records),
                    "bar_metrics": _format_bar_metrics(ts_records),
                    "risk": risk_dict,
                    "claims": display_claims,
                    "source_label": (
                        f"{display_name} Β· {', '.join(quarters)}"
                        if quarters else display_name
                    ),
                }
                save_analysis(company_id, mode, result)
            except Exception as exc:
                _log(f" β Mode '{mode}' failed: {exc}")
        # ── 6. Mark ready ─────────────────────────────────────────────────
        upsert_company(folder_name, status="ready")
        _log(f" β {display_name} done.\n")
        return True
    except Exception as exc:
        upsert_company(folder_name, status="error", error_msg=str(exc))
        _log(f" β Failed: {exc}\n")
        import traceback; traceback.print_exc()
        return False
# ── CLI ──────────────────────────────────────────────────────────────────────
def cmd_list():
    """Print a table of every registered company with its status and processing time.

    Prints "No companies registered yet." when the registry is empty.
    """
    rows = get_all_companies()
    if not rows:
        print("No companies registered yet.")
        return
    header = f"\n{'Display Name':<30} {'Status':<12} {'Processed At'}"
    print(header)
    print("-" * 65)
    for row in rows:
        name, status, when = row['display_name'], row['status'], row['processed_at']
        print(f"{name:<30} {status:<12} {when or 'β'}")
    print()
def cmd_process_all():
    """Process every company folder under DATA_DIR/raw/companies, in sorted order.

    Prints "Not found: <dir>" and returns if the directory is absent.
    Otherwise logs the number of folders found and, at the end, how many
    succeeded and how many failed.
    """
    companies_dir = Path(config.DATA_DIR) / "raw" / "companies"
    if not companies_dir.exists():
        print(f"Not found: {companies_dir}")
        return
    folders = sorted(p for p in companies_dir.iterdir() if p.is_dir())
    _log(f"Found {len(folders)} company folders.")
    outcomes = [process_company(folder) for folder in folders]
    succeeded = sum(1 for outcome in outcomes if outcome)
    _log(f"Done. {succeeded} succeeded, {len(outcomes) - succeeded} failed.")
def main():
    """Command-line entry point.

    Usage:
        process_company.py "<name>"   process one company (fuzzy name match)
        process_company.py --all      process every company folder
        process_company.py --list     show registered companies and their status

    Arguments are parsed first, and a single-company query is resolved to a
    folder, before the claim model, vector DB and Groq client are loaded.
    So --help, --list, argument errors and unknown company names return
    quickly, without paying the model-loading cost. --list only initialises
    the database.

    Exits with status 1 when the company name matches no folder, or when
    processing that company fails.
    """
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("company", nargs="?", help="Company name")
    group.add_argument("--all", action="store_true")
    group.add_argument("--list", action="store_true")
    args = parser.parse_args()

    if args.list:
        # Listing only reads the company registry; no models are needed.
        init_db()
        cmd_list()
        return

    folder = None
    if not args.all:
        folder = _find_folder(args.company)
        if not folder:
            print(f"No matching folder for: '{args.company}'")
            companies_dir = Path(config.DATA_DIR) / "raw" / "companies"
            if companies_dir.exists():
                print("Available:")
                for f in sorted(companies_dir.iterdir()):
                    if f.is_dir():
                        print(f" {f.name}")
            sys.exit(1)

    _log("Loading models...")
    load_claim_model(config.MODEL_PATH)
    load_vector_db(config.CHROMA_DB_PATH)
    load_groq_client()
    init_db()
    _log("Ready.\n")

    if args.all:
        cmd_process_all()
        return
    # Propagate failure to the shell instead of always exiting 0.
    if not process_company(folder):
        sys.exit(1)


if __name__ == "__main__":
    main()