Spaces:
Build error
Build error
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Jerry (TA) homework-intake Space.

Reads a .docx or an image of a depreciation homework, echoes the detected
parameters and table, and checks the table against a straight-line schedule.
"""
import re, io, os
import gradio as gr
import pandas as pd
# docx — optional dependency; DOCX_OK gates the .docx reading path.
try:
    from docx import Document
    DOCX_OK = True
except Exception:
    DOCX_OK = False
# ocr — optional dependency; OCR_OK gates the image/OCR path.
try:
    from PIL import Image, ImageOps
    import pytesseract
    OCR_OK = True
except Exception:
    OCR_OK = False
| # ---------- helpers ---------- | |
| def _norm_name(s: str) -> str: | |
| import re | |
| return re.sub(r"[^a-z0-9]", "", str(s).lower()) | |
def _pick_col(cols, *cands):
    """Return the first column whose normalized name contains any candidate.

    Candidates are tried in order; for each one, columns are scanned in
    their original order.  Returns None when nothing matches.
    """
    normalized = {_norm_name(col): col for col in cols}
    for candidate in cands:
        hit = next((orig for key, orig in normalized.items() if candidate in key), None)
        if hit is not None:
            return hit
    return None
| def _coerce_numeric(x): | |
| if pd.isna(x): return x | |
| if isinstance(x, (int,float)): return float(x) | |
| s = str(x).replace(",","").replace("$","").strip() | |
| try: return float(s) | |
| except: return pd.NA | |
| PARAM_PATTERNS = { | |
| "cost": r"cost\s*[:=]\s*\$?\s*([\d,]+(?:\.\d+)?)", | |
| "salvage": r"salvage\s*[:=]\s*\$?\s*([\d,]+(?:\.\d+)?)", | |
| "life": r"(?:life|useful\s*life)\s*[:=]\s*([\d,]+)", | |
| "start_year": r"(?:start\s*year|start)\s*[:=]\s*([12]\d{3})", | |
| } | |
| def _extract_params(text: str): | |
| vals = {} | |
| low = (text or "").lower() | |
| for k, pat in PARAM_PATTERNS.items(): | |
| m = re.search(pat, low, flags=re.I) | |
| if m: | |
| raw = m.group(1).replace(",", "") | |
| vals[k] = float(raw) if k in ("cost","salvage") else int(float(raw)) | |
| return vals | |
def _docx_to_table_and_text(fileobj) -> tuple[pd.DataFrame | None, str]:
    """Open a .docx and return (depreciation-looking table or None, all paragraph text).

    A table qualifies when its first row has >= 4 cells and one header
    mentions "year".  Fully blank rows are dropped; the first non-empty
    qualifying table wins.
    """
    if not DOCX_OK:
        return None, "(python-docx not available)"
    try:
        doc = Document(fileobj)
    except Exception as e:
        return None, f"[docx open failed] {e}"
    # Paragraph text is returned regardless, for parameter scraping.
    all_text = "\n".join(p.text for p in doc.paragraphs)
    for table in doc.tables:
        rows = [[cell.text.strip() for cell in row.cells] for row in table.rows]
        if not rows:
            continue
        header = rows[0]
        if len(header) < 4 or not any("year" in _norm_name(h) for h in header):
            continue
        df = pd.DataFrame(rows[1:], columns=header)
        blank = df.astype(str).apply(lambda r: "".join(r), axis=1).str.strip() == ""
        df = df[~blank]
        if not df.empty:
            return df, all_text
    return None, all_text
def _image_to_text(img: Image.Image) -> str:
    """OCR *img* with pytesseract after EXIF rotation and grayscale conversion.

    Returns the recognized text, or a bracketed error/availability note.
    """
    if not OCR_OK:
        return "(pytesseract not available)"
    try:
        upright = ImageOps.exif_transpose(img)
        return pytesseract.image_to_string(ImageOps.grayscale(upright))
    except Exception as e:
        return f"[ocr failed] {e}"
| def _table_from_ocr_text(text: str) -> pd.DataFrame|None: | |
| if not text or not text.strip(): | |
| return None | |
| lines = [ln.strip() for ln in text.splitlines() if ln.strip()] | |
| hdr_i = -1 | |
| for i, ln in enumerate(lines): | |
| low = ln.lower() | |
| if ("year" in low and "begin" in low and "dep" in low and "end" in low): | |
| hdr_i = i | |
| break | |
| if hdr_i == -1: | |
| for i, ln in enumerate(lines): | |
| parts = re.split(r"\s{2,}|\t+", ln) | |
| low = ln.lower() | |
| if len([p for p in parts if p.strip()]) >= 4 and any(k in low for k in ["year","begin","dep","end"]): | |
| hdr_i = i | |
| break | |
| if hdr_i == -1: | |
| return None | |
| header = [h.strip() for h in re.split(r"\s{2,}|\t+", lines[hdr_i]) if h.strip()] | |
| data = [] | |
| for ln in lines[hdr_i+1:]: | |
| parts = [p.strip() for p in re.split(r"\s{2,}|\t+", ln) if p.strip()] | |
| if len(parts) == len(header): | |
| data.append(parts) | |
| else: | |
| if len(data) >= 1: | |
| break | |
| if not data: | |
| return None | |
| return pd.DataFrame(data, columns=header) | |
def _normalize_depr_columns(df_in: pd.DataFrame) -> pd.DataFrame:
    """Map arbitrary schedule headers onto the canonical five columns.

    Output columns: Year, Begin BV, Depreciation, Accum Dep, End BV.
    Values are coerced to numeric and rows where all four value columns
    are missing are dropped.
    """
    df = df_in.copy()
    # Build on df's index.  With a bare pd.DataFrame(), a scalar pd.NA
    # first assignment fixes an empty index, and every later Series
    # assignment is then aligned away to zero rows.
    out = pd.DataFrame(index=df.index)
    c_year = _pick_col(df.columns, "year")
    c_beg = _pick_col(df.columns, "beginbv", "beginningbv", "beginbook", "begin", "beginningvalue")
    c_dep = _pick_col(df.columns, "depreciation", "dep")
    c_acc = _pick_col(df.columns, "accumdep", "accumulateddep", "accum", "accdep")
    c_end = _pick_col(df.columns, "endbv", "endingbv", "endbook", "end", "endingvalue")
    out["Year"] = df[c_year] if c_year is not None else pd.NA
    out["Begin BV"] = df[c_beg] if c_beg is not None else pd.NA
    out["Depreciation"] = df[c_dep] if c_dep is not None else pd.NA
    out["Accum Dep"] = df[c_acc] if c_acc is not None else pd.NA
    out["End BV"] = df[c_end] if c_end is not None else pd.NA
    out["Year"] = pd.to_numeric(out["Year"], errors="coerce")
    value_cols = ["Begin BV", "Depreciation", "Accum Dep", "End BV"]
    for col in value_cols:
        out[col] = out[col].map(_coerce_numeric)
    out = out[~out[value_cols].isna().all(axis=1)].reset_index(drop=True)
    return out
# ---------- straight-line schedule helpers ----------
def build_sl_schedule(cost: float, salvage: float, life: int, start_year: int):
    """Build the expected straight-line depreciation schedule.

    Annual depreciation is (cost - salvage) / life; book value declines
    linearly from *cost* toward *salvage* over *life* years beginning at
    *start_year*.  Returns a DataFrame with columns Year, Begin BV,
    Depreciation, Accum Dep, End BV.
    """
    annual_dep = (cost - salvage) / life
    rows = []
    book_value = cost
    accumulated = 0.0
    for offset in range(life):
        accumulated += annual_dep
        rows.append(
            {
                "Year": start_year + offset,
                "Begin BV": book_value,
                "Depreciation": annual_dep,
                "Accum Dep": accumulated,
                "End BV": book_value - annual_dep,
            }
        )
        book_value -= annual_dep
    return pd.DataFrame(rows)
def audit_against_expected(expected: pd.DataFrame, actual: pd.DataFrame):
    """Diff a student's schedule against the expected one, year by year.

    Returns (deltas, message): one "<col> Δ" column per value column with
    actual minus expected, computed only on years present in both tables,
    plus a coaching message pointing at the first mismatching year.
    """
    if actual is None or actual.empty:
        return pd.DataFrame(), "No student table found to check."
    value_cols = ["Begin BV", "Depreciation", "Accum Dep", "End BV"]
    merged = expected.merge(
        actual[["Year"] + value_cols],
        on="Year", how="inner", suffixes=("_exp", "_act"),
    )
    if merged.empty:
        return pd.DataFrame(), "No matching years between expected and uploaded table."
    deltas = pd.DataFrame({"Year": merged["Year"]})
    for col in value_cols:
        deltas[col + " Δ"] = merged[f"{col}_act"] - merged[f"{col}_exp"]
    delta_cols = [c for c in deltas.columns if c.endswith("Δ")]
    first_bad = None
    for _, row in deltas.iterrows():
        if any(abs(row[c]) > 1e-6 for c in delta_cols):
            first_bad = int(row["Year"])
            break
    if first_bad is None:
        msg = "All good 🎉 Straight‑line matches your table."
    else:
        msg = f"First mismatch at year {first_bad}. Remember: Dep=(Cost−Salvage)/Life and Accum_t=Accum_(t−1)+Dep."
    return deltas, msg
| # ---------- Gradio callbacks ---------- | |
| def _params_tuple(p): | |
| p = p or {} | |
| return ( | |
| float(p.get("cost", 0.0)), | |
| float(p.get("salvage", 0.0)), | |
| int(p.get("life", 10)), | |
| int(p.get("start_year", pd.Timestamp.now().year)), | |
| ) | |
def handle_docx(file):
    """Gradio callback for the .docx tab.

    Returns (header text, params, normalized table, cost, salvage, life,
    start year, params state, table state) for the wired output components.
    """
    if file is None:
        return "(no file)", {}, pd.DataFrame(), 0.0, 0.0, 10, pd.Timestamp.now().year, {}, pd.DataFrame()
    path = file.name if hasattr(file, "name") else file
    df_raw, header = _docx_to_table_and_text(path)
    params = _extract_params(header or "")
    df_norm = _normalize_depr_columns(df_raw) if df_raw is not None else None
    table = df_norm if df_norm is not None else pd.DataFrame()
    cost, salv, life, year = _params_tuple(params)
    return (
        header or "(no text found)",
        params,
        table,
        cost, salv, life, year,
        params,
        table,
    )
def handle_image(img):
    """Gradio callback for the image/OCR tab.

    Mirrors handle_docx: OCR the image, scrape parameters, reconstruct the
    table, and store the *normalized* table in the last_table state so the
    checker sees the same shape from either upload path.
    """
    if img is None:
        return "(no image)", {}, pd.DataFrame(), pd.DataFrame(), 0.0, 0.0, 10, pd.Timestamp.now().year, {}, pd.DataFrame()
    from PIL import Image as PILImage
    pil = img if isinstance(img, PILImage.Image) else PILImage.fromarray(img)
    ocr_text = _image_to_text(pil)
    params = _extract_params(ocr_text or "")
    df_raw = _table_from_ocr_text(ocr_text or "")
    df_norm = _normalize_depr_columns(df_raw) if df_raw is not None else pd.DataFrame()
    cost, salv, life, year = _params_tuple(params)
    return (
        ocr_text or "(empty OCR)",
        params,
        df_raw,                  # raw table guess shown in OCR tab
        df_norm,                 # normalized table shown in OCR tab
        cost, salv, life, year,  # auto-fill numbers
        params,                  # save params state
        df_norm,                 # save NORMALIZED table (was df_raw, inconsistent with the docx path)
    )
def fill_from_state(p):
    """Gradio callback: copy stored params into the number inputs.

    The unpacking is identical to _params_tuple; delegate so the default
    values live in exactly one place.
    """
    return _params_tuple(p)
def build_cb(cost, salv, life, year):
    """Gradio callback: build the expected SL schedule.

    Any error (e.g. life == 0) is surfaced as a one-row error table rather
    than crashing the UI.
    """
    try:
        return build_sl_schedule(float(cost), float(salv), int(life), int(year))
    except Exception as e:
        return pd.DataFrame([{"error": str(e)}])
def check_cb(cost, salv, life, year, table_state):
    """Gradio callback: diff the stored student table against the expected schedule.

    Gradio may hand the state back as a DataFrame or a list of rows, and
    cell values round-trip as strings — so the table is re-normalized and
    numerically coerced on every check before auditing.
    (The superseded commented-out draft of this callback was removed.)
    """
    exp = build_sl_schedule(float(cost), float(salv), int(life), int(year))
    exp["Year"] = pd.to_numeric(exp["Year"], errors="coerce")
    actual = table_state
    if isinstance(actual, list):
        # Best-effort columns; _normalize_depr_columns sorts out the names.
        actual = pd.DataFrame(actual)
    elif not isinstance(actual, pd.DataFrame):
        return pd.DataFrame(), "No student table found to check."
    actual = _normalize_depr_columns(actual)
    for c in ["Year", "Begin BV", "Depreciation", "Accum Dep", "End BV"]:
        actual[c] = pd.to_numeric(actual[c], errors="coerce")
    actual = actual.dropna(subset=["Year"]).reset_index(drop=True)
    deltas, msg = audit_against_expected(exp, actual)
    return deltas, msg
# --- Debug utilities ---
def debug_dump(ocr_text, params, raw_tbl, norm_tbl, last_tbl, image):
    """Summarize the OCR-related state for the Debug tab.

    Returns a markdown report plus echoes of the three DataFrames and the
    image for visual confirmation.  (The redundant function-local
    ``import pandas as pd, io`` was removed — pd is a module-level import
    and io was unused.)
    """
    def df_summary(name, df):
        # One-line shape/columns summary with a 5-row preview.
        if isinstance(df, pd.DataFrame) and not df.empty:
            head = df.head(5).to_string(index=False)
            return f"**{name}**: shape={df.shape}, cols={list(df.columns)}\n```\n{head}\n```"
        return f"**{name}**: {type(df).__name__} (empty or not a DataFrame)"

    lines = [
        f"**OCR text length**: {len(ocr_text or '')}",
        f"**Params keys**: {sorted(list((params or {}).keys()))}",
        df_summary("raw_df (Tab 2)", raw_tbl),
        df_summary("norm_df (Tab 2)", norm_tbl),
        df_summary("last_table (State)", last_tbl),
    ]
    report = "\n\n".join(lines)

    def ensure_df(x):
        # Dataframe output components need DataFrames; substitute empties.
        return x if isinstance(x, pd.DataFrame) else pd.DataFrame()

    return (
        report,
        ensure_df(raw_tbl),
        ensure_df(norm_tbl),
        ensure_df(last_tbl),
        image,  # echo the image unchanged
    )
# ---------- UI ----------
with gr.Blocks(title="Jerry • HW Intake (Echo)") as demo:
    # Cross-tab state: last detected params dict and last uploaded table.
    last_params = gr.State({})
    last_table = gr.State(pd.DataFrame())
    gr.Markdown("## Jerry (TA) – Homework Intake\nThis Space **only reads and echoes** your files.\nNext step will add solving & coaching.")
    # --- Tab 1: DOCX ---
    with gr.Tab("Upload .docx"):
        docx_in = gr.File(file_types=[".docx"], label="Homework .docx")
        btn1 = gr.Button("Read")
        header_txt = gr.Textbox(label="Header/Text (for params)", lines=8)
        params_json = gr.JSON(label="Detected parameters")
        table_df = gr.Dataframe(label="Detected table (normalized)", interactive=False)
    # --- Tab 2: Image ---
    with gr.Tab("Upload Image (.png/.jpg)"):
        img_in = gr.Image(type="pil", label="Photo or screenshot of your table")
        btn2 = gr.Button("OCR")
        ocr_txt = gr.Textbox(label="Raw OCR text", lines=12)
        params_json2 = gr.JSON(label="Detected parameters")
        raw_df = gr.Dataframe(label="Raw table guess", interactive=False)
        norm_df = gr.Dataframe(label="Detected table (normalized)", interactive=False)
    # --- Tab 3: Solve & Check ---
    with gr.Tab("Straight-Line • Solve & Check"):
        gr.Markdown("Enter params (auto-filled if detected) → build the correct SL schedule → compare to your uploaded table.")
        with gr.Row():
            in_cost = gr.Number(label="Cost", value=0.0)
            in_salv = gr.Number(label="Salvage", value=0.0)
            in_life = gr.Number(label="Life (years)", value=10, precision=0)
            in_year = gr.Number(label="Start year", value=2025, precision=0)
        btn_use = gr.Button("Use detected params")
        btn_build = gr.Button("Build expected schedule")
        expected_df = gr.Dataframe(label="Expected (SL) schedule", interactive=False)
        btn_check = gr.Button("Check against uploaded table")
        deltas_df = gr.Dataframe(label="Differences (student − expected)", interactive=False)
        coach_txt = gr.Markdown()
    # --- Tab 4: Debug ---
    with gr.Tab("Debug"):
        dbg_btn = gr.Button("Dump OCR state")
        dbg_md = gr.Markdown()
        dbg_raw = gr.Dataframe(label="raw_df echo", interactive=False)
        dbg_norm = gr.Dataframe(label="norm_df echo", interactive=False)
        dbg_last = gr.Dataframe(label="last_table (State) echo", interactive=False)
        dbg_img = gr.Image(label="Image echo")
    # ---------- Wire events AFTER all components exist ----------
    btn1.click(
        handle_docx,
        inputs=docx_in,
        outputs=[
            header_txt,   # text
            params_json,  # json
            table_df,     # normalized table (tab 1)
            in_cost, in_salv, in_life, in_year,  # autofill inputs
            last_params,  # state
            last_table,   # state
        ],
    )
    btn2.click(
        handle_image,
        inputs=img_in,
        outputs=[
            ocr_txt,       # raw OCR text
            params_json2,  # json
            raw_df,        # raw table
            norm_df,       # normalized table (tab 2)
            in_cost, in_salv, in_life, in_year,  # autofill inputs
            last_params,   # state
            last_table,    # state
        ],
    )
    # FIX: btn_use existed but was never wired — "Use detected params" did nothing.
    btn_use.click(
        fill_from_state,
        inputs=last_params,
        outputs=[in_cost, in_salv, in_life, in_year],
    )
    btn_build.click(build_cb, [in_cost, in_salv, in_life, in_year], [expected_df])
    btn_check.click(check_cb, [in_cost, in_salv, in_life, in_year, last_table], [deltas_df, coach_txt])
    dbg_btn.click(
        debug_dump,
        # inputs come from the components/state already populated by handle_image
        inputs=[ocr_txt, params_json2, raw_df, norm_df, last_table, img_in],
        outputs=[dbg_md, dbg_raw, dbg_norm, dbg_last, dbg_img],
    )
    gr.Markdown("— Echo mode finished. When this looks good, we’ll plug in the SL solver + coaching.")

if __name__ == "__main__":
    demo.launch()