""" sheets.py — Google Sheets integration layer All reads and writes to the 7-sheet data store go through this module. """ import os import uuid from datetime import datetime, date from typing import Optional import gspread from google.oauth2.service_account import Credentials SCOPES = [ "https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive", ] # Sheet tab names — must match exactly in your Google Sheet SHEET_CANDIDATES = "candidates" SHEET_REPORTS = "reports_log" SHEET_SIGNALS = "signals_log" SHEET_WARNINGS = "warnings_log" SHEET_COMMITMENTS = "commitments" SHEET_EVALUATIONS = "evaluations" SHEET_DECISIONS = "decisions_log" # ───────────────────────────────────────────── # SETUP — create all sheet headers on first run # ───────────────────────────────────────────── SHEET_HEADERS = { SHEET_CANDIDATES: [ "candidate_id", "slack_user_id", "name", "stage", "cohort_id", "ft_slack_id", "hr_slack_id", "week0_start_date", "probation_start_date", "status", "miss_count", "created_at", "department", "channel_id" ], SHEET_REPORTS: [ "report_id", "candidate_id", "submitted_at", "stage", "day_number", "report_number", "worked_on", "stuck_on", "tried", "current_status", "format_valid", "quality_score", "quality_flags", "on_time" ], SHEET_SIGNALS: [ "signal_id", "candidate_id", "day_number", "signal_name", "ft_answer", "signal_value", "logged_at", "source" ], SHEET_WARNINGS: [ "warning_id", "candidate_id", "warning_type", "stage", "trigger_event", "message_sent", "issued_at", "hr_notified" ], SHEET_COMMITMENTS: [ "commitment_id", "candidate_id", "description", "due_date", "due_time", "logged_at", "delivered", "delivered_at", "flagged_proactively", "outcome" ], SHEET_EVALUATIONS: [ "eval_id", "candidate_id", "eval_type", "signal_1", "signal_2", "signal_3", "signal_4", "signal_5", "signal_6", "summary_text", "recommendation", "recommendation_reasoning", "generated_at", "hr_confirmed", "hr_confirmed_at", "outcome_communicated" ], SHEET_DECISIONS: [ "decision_id", "candidate_id", "decision_type", "recommended_by", "confirmed_by", "message_posted", "decided_at", "stage_at_decision", "overridden" ], } def setup_sheets() -> None: """ Creates all required tabs and adds headers if missing. Safe to call multiple times — skips tabs that already have headers. """ import time client = get_client() spreadsheet = client.open_by_key(os.environ["GOOGLE_SHEET_ID"]) existing_tabs = [ws.title for ws in spreadsheet.worksheets()] for tab_name, headers in SHEET_HEADERS.items(): time.sleep(1) # avoid hitting Sheets API rate limit if tab_name not in existing_tabs: ws = spreadsheet.add_worksheet(title=tab_name, rows=1000, cols=len(headers)) ws.append_row(headers) print(f"Created tab: {tab_name}") else: ws = spreadsheet.worksheet(tab_name) first_row = ws.row_values(1) if not first_row: ws.insert_row(headers, 1) print(f"Added headers to: {tab_name}") else: print(f"Tab already set up: {tab_name}") def get_client() -> gspread.Client: # Check multiple possible paths for HuggingFace Spaces compatibility possible_paths = [ os.environ.get("GOOGLE_SERVICE_ACCOUNT_PATH", ""), "/home/user/app/service_account.json", "/app/service_account.json", "service_account.json", ] path = next((p for p in possible_paths if p and os.path.exists(p)), None) if not path: raise FileNotFoundError( f"service_account.json not found. Checked: {possible_paths}" ) creds = Credentials.from_service_account_file(path, scopes=SCOPES) return gspread.authorize(creds) def get_sheet(tab_name: str) -> gspread.Worksheet: client = get_client() spreadsheet = client.open_by_key(os.environ["GOOGLE_SHEET_ID"]) return spreadsheet.worksheet(tab_name) def _now() -> str: """Returns current IST timestamp.""" from datetime import timezone, timedelta IST = timezone(timedelta(hours=5, minutes=30)) return datetime.now(IST).isoformat() def _gen_id(prefix: str) -> str: return f"{prefix}_{uuid.uuid4().hex[:6].upper()}" # ───────────────────────────────────────────── # CANDIDATES # ───────────────────────────────────────────── CANDIDATES_HEADERS = [ "candidate_id", "slack_user_id", "name", "stage", "cohort_id", "ft_slack_id", "hr_slack_id", "week0_start_date", "probation_start_date", "status", "miss_count", "created_at", "department", "channel_id" ] def _get_records(tab_name: str, headers: list) -> list[dict]: """Safe get_all_records — falls back gracefully if headers mismatch.""" ws = get_sheet(tab_name) try: return ws.get_all_records(expected_headers=headers) except Exception: try: return ws.get_all_records() except Exception: return [] def get_candidate_by_slack_id(slack_user_id: str) -> Optional[dict]: """Return candidate dict or None if not found.""" records = _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS) for row in records: if row["slack_user_id"] == slack_user_id: return row return None def get_candidate_by_id(candidate_id: str) -> Optional[dict]: """Return candidate dict by candidate_id or None if not found.""" records = _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS) for row in records: if str(row.get("candidate_id", "")).strip() == candidate_id.strip(): return row return None def get_active_candidates() -> list[dict]: records = _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS) return [r for r in records if str(r.get("status", "")) not in ("eliminated", "cleared")] def create_candidate( slack_user_id: str, name: str, cohort_id: str, ft_slack_id: str, hr_slack_id: str, week0_start_date: str, stage: str = "week0", department: str = "general", channel_id: str = "", ) -> dict: import re # Normalise date to YYYY-MM-DD regardless of input format date_str = week0_start_date.strip() # Try parsing common formats for fmt in ("%Y-%m-%d", "%d %b %Y", "%d-%m-%Y", "%d/%m/%Y", "%B %d %Y"): try: from datetime import datetime date_str = datetime.strptime(date_str, fmt).strftime("%Y-%m-%d") break except ValueError: continue ws = get_sheet(SHEET_CANDIDATES) candidate_id = _gen_id("CAND") row = [ candidate_id, slack_user_id, name, stage, cohort_id, ft_slack_id, hr_slack_id, date_str, "", # probation_start_date "active", # status 0, # miss_count _now(), # created_at department, channel_id, ] ws.append_row(row, value_input_option="RAW", insert_data_option="INSERT_ROWS", table_range="A1") # Return the constructed dict directly — avoids get_all_records rate limit issue return { "candidate_id": candidate_id, "slack_user_id": slack_user_id, "name": name, "stage": stage, "cohort_id": cohort_id, "ft_slack_id": ft_slack_id, "hr_slack_id": hr_slack_id, "week0_start_date": date_str, "probation_start_date": "", "status": "active", "miss_count": 0, "created_at": _now(), "department": department, "channel_id": channel_id, } def update_candidate_field(candidate_id: str, field: str, value) -> None: """Update a single field on a candidate row by candidate_id.""" ws = get_sheet(SHEET_CANDIDATES) records = ws.get_all_records(expected_headers=CANDIDATES_HEADERS) headers = ws.row_values(1) col_index = headers.index(field) + 1 # 1-based for i, row in enumerate(records, start=2): if row["candidate_id"] == candidate_id: ws.update_cell(i, col_index, value) return raise ValueError(f"candidate_id {candidate_id} not found") def increment_miss_count(candidate_id: str) -> int: ws = get_sheet(SHEET_CANDIDATES) records = ws.get_all_records(expected_headers=CANDIDATES_HEADERS) headers = ws.row_values(1) miss_col = headers.index("miss_count") + 1 for i, row in enumerate(records, start=2): if row["candidate_id"] == candidate_id: new_count = int(row["miss_count"]) + 1 ws.update_cell(i, miss_col, new_count) return new_count raise ValueError(f"candidate_id {candidate_id} not found") # ───────────────────────────────────────────── # REPORTS LOG # ───────────────────────────────────────────── def log_report( candidate_id: str, stage: str, day_number: int, report_number: int, worked_on: str, stuck_on: str, tried: str, current_status: str, format_valid: bool, quality_score: int, quality_flags: str, on_time: bool, ) -> str: ws = get_sheet(SHEET_REPORTS) report_id = _gen_id("RPT") row = [ report_id, candidate_id, _now(), stage, day_number, report_number, worked_on, stuck_on, tried, current_status, str(format_valid), quality_score, quality_flags, str(on_time), ] ws.append_row(row) return report_id def get_reports_for_candidate(candidate_id: str, stage: str = None) -> list[dict]: ws = get_sheet(SHEET_REPORTS) records = ws.get_all_records() results = [r for r in records if r["candidate_id"] == candidate_id] if stage: results = [r for r in results if r["stage"] == stage] return results def did_submit_in_window(candidate_id: str, window_start: str, window_end: str) -> bool: """Check if candidate submitted a valid report within the given time window.""" ws = get_sheet(SHEET_REPORTS) records = ws.get_all_records() for r in records: if r["candidate_id"] != candidate_id: continue if r["format_valid"] != "True": continue submitted = r["submitted_at"] if window_start <= submitted <= window_end: return True return False # ───────────────────────────────────────────── # SIGNALS LOG # ───────────────────────────────────────────── def log_signal( candidate_id: str, day_number: int, signal_name: str, ft_answer: str, signal_value: str, source: str = "ft_answer", ) -> str: ws = get_sheet(SHEET_SIGNALS) signal_id = _gen_id("SIG") row = [ signal_id, candidate_id, day_number, signal_name, ft_answer, signal_value, # strong | weak | flag _now(), source, ] ws.append_row(row) return signal_id def get_signals_for_candidate(candidate_id: str, week: str = None) -> list[dict]: """ week: 'w1' = days 1-5, 'w2' = days 6-10, None = all """ ws = get_sheet(SHEET_SIGNALS) records = ws.get_all_records() results = [r for r in records if r["candidate_id"] == candidate_id] if week == "w1": results = [r for r in results if int(r["day_number"]) <= 5] elif week == "w2": results = [r for r in results if int(r["day_number"]) >= 6] return results # ───────────────────────────────────────────── # WARNINGS LOG # ───────────────────────────────────────────── def log_warning( candidate_id: str, warning_type: str, stage: str, trigger_event: str, message_sent: str, hr_notified: bool, ) -> str: ws = get_sheet(SHEET_WARNINGS) warning_id = _gen_id("WARN") row = [ warning_id, candidate_id, warning_type, stage, trigger_event, message_sent, _now(), str(hr_notified), ] ws.append_row(row) return warning_id def get_warnings_for_candidate(candidate_id: str) -> list[dict]: ws = get_sheet(SHEET_WARNINGS) records = ws.get_all_records() return [r for r in records if r["candidate_id"] == candidate_id] # ───────────────────────────────────────────── # COMMITMENTS # ───────────────────────────────────────────── def log_commitment( candidate_id: str, description: str, due_date: str, due_time: str, ) -> str: ws = get_sheet(SHEET_COMMITMENTS) commitment_id = _gen_id("COM") row = [ commitment_id, candidate_id, description, due_date, due_time, _now(), # logged_at "", # delivered "", # delivered_at "", # flagged_proactively "", # outcome ] ws.append_row(row) return commitment_id def update_commitment_outcome( commitment_id: str, delivered: bool, delivered_at: str, flagged_proactively: bool, outcome: str, ) -> None: ws = get_sheet(SHEET_COMMITMENTS) records = ws.get_all_records() headers = ws.row_values(1) for i, row in enumerate(records, start=2): if row["commitment_id"] == commitment_id: updates = { "delivered": str(delivered), "delivered_at": delivered_at, "flagged_proactively": str(flagged_proactively), "outcome": outcome, } for field, value in updates.items(): col = headers.index(field) + 1 ws.update_cell(i, col, value) return def get_due_commitments(due_date: str) -> list[dict]: """Return all commitments due on a given date with no outcome yet.""" ws = get_sheet(SHEET_COMMITMENTS) records = ws.get_all_records() return [ r for r in records if r["due_date"] == due_date and r["outcome"] == "" ] def get_commitments_for_candidate(candidate_id: str) -> list[dict]: ws = get_sheet(SHEET_COMMITMENTS) records = ws.get_all_records() return [r for r in records if r["candidate_id"] == candidate_id] # ───────────────────────────────────────────── # EVALUATIONS # ───────────────────────────────────────────── def log_evaluation( candidate_id: str, eval_type: str, # week0 | probation_w1 | probation_w2 signals: dict, # {signal_name: {value, evidence}} summary_text: str, recommendation: str, recommendation_reasoning: str, ) -> str: ws = get_sheet(SHEET_EVALUATIONS) eval_id = _gen_id("EVAL") # Flatten signals into ordered columns signal_values = [ f"{k}: {v['value']} — {v['evidence']}" for k, v in signals.items() ] # Pad to 6 signal columns while len(signal_values) < 6: signal_values.append("") row = [ eval_id, candidate_id, eval_type, *signal_values[:6], summary_text, recommendation, recommendation_reasoning, _now(), # generated_at "", # hr_confirmed "", # hr_confirmed_at "", # outcome_communicated ] ws.append_row(row) return eval_id def get_latest_evaluation(candidate_id: str, eval_type: str) -> Optional[dict]: ws = get_sheet(SHEET_EVALUATIONS) records = ws.get_all_records() matches = [ r for r in records if r["candidate_id"] == candidate_id and r["eval_type"] == eval_type ] return matches[-1] if matches else None def confirm_evaluation(eval_id: str) -> None: ws = get_sheet(SHEET_EVALUATIONS) records = ws.get_all_records() headers = ws.row_values(1) for i, row in enumerate(records, start=2): if row["eval_id"] == eval_id: ws.update_cell(i, headers.index("hr_confirmed") + 1, "True") ws.update_cell(i, headers.index("hr_confirmed_at") + 1, _now()) return def mark_outcome_communicated(eval_id: str) -> None: ws = get_sheet(SHEET_EVALUATIONS) records = ws.get_all_records() headers = ws.row_values(1) for i, row in enumerate(records, start=2): if row["eval_id"] == eval_id: ws.update_cell(i, headers.index("outcome_communicated") + 1, "True") return # ───────────────────────────────────────────── # DECISIONS LOG # ───────────────────────────────────────────── def log_decision( candidate_id: str, decision_type: str, recommended_by: str, confirmed_by: str, message_posted: str, stage_at_decision: str, overridden: bool = False, ) -> str: ws = get_sheet(SHEET_DECISIONS) decision_id = _gen_id("DEC") row = [ decision_id, candidate_id, decision_type, recommended_by, confirmed_by, message_posted, _now(), stage_at_decision, str(overridden), ] ws.append_row(row) return decision_id def get_pending_recommendation(candidate_id: str) -> Optional[dict]: """ Returns the most recent evaluation that has been generated but not yet HR-confirmed — i.e. awaiting HR approval. """ ws = get_sheet(SHEET_EVALUATIONS) records = ws.get_all_records() pending = [ r for r in records if r["candidate_id"] == candidate_id and r["hr_confirmed"] == "" and r["recommendation"] != "" ] return pending[-1] if pending else None