Spaces:
Running
Running
| """ | |
sheets.py — Google Sheets integration layer
All reads and writes to the 7-sheet data store go through this module.
| """ | |
| import os | |
| import uuid | |
| from datetime import datetime, date | |
| from typing import Optional | |
| import gspread | |
| from google.oauth2.service_account import Credentials | |
# OAuth scopes passed to the service-account credentials in get_client().
SCOPES = [
    "https://www.googleapis.com/auth/spreadsheets",
    "https://www.googleapis.com/auth/drive",
]

# Sheet tab names — must match exactly in your Google Sheet
SHEET_CANDIDATES = "candidates"
SHEET_REPORTS = "reports_log"
SHEET_SIGNALS = "signals_log"
SHEET_WARNINGS = "warnings_log"
SHEET_COMMITMENTS = "commitments"
SHEET_EVALUATIONS = "evaluations"
SHEET_DECISIONS = "decisions_log"
# ─────────────────────────────────────────────
# SETUP — create all sheet headers on first run
# ─────────────────────────────────────────────
# Header row for every tab, keyed by tab name. setup_sheets() writes each list
# as row 1 of its tab; the order of the names is the column order.
SHEET_HEADERS = {
    SHEET_CANDIDATES: [
        "candidate_id", "slack_user_id", "name", "stage", "cohort_id",
        "ft_slack_id", "hr_slack_id", "week0_start_date", "probation_start_date",
        "status", "miss_count", "created_at", "department", "channel_id"
    ],
    SHEET_REPORTS: [
        "report_id", "candidate_id", "submitted_at", "stage", "day_number",
        "report_number", "worked_on", "stuck_on", "tried", "current_status",
        "format_valid", "quality_score", "quality_flags", "on_time"
    ],
    SHEET_SIGNALS: [
        "signal_id", "candidate_id", "day_number", "signal_name",
        "ft_answer", "signal_value", "logged_at", "source"
    ],
    SHEET_WARNINGS: [
        "warning_id", "candidate_id", "warning_type", "stage",
        "trigger_event", "message_sent", "issued_at", "hr_notified"
    ],
    SHEET_COMMITMENTS: [
        "commitment_id", "candidate_id", "description", "due_date",
        "due_time", "logged_at", "delivered", "delivered_at",
        "flagged_proactively", "outcome"
    ],
    SHEET_EVALUATIONS: [
        "eval_id", "candidate_id", "eval_type",
        "signal_1", "signal_2", "signal_3", "signal_4", "signal_5", "signal_6",
        "summary_text", "recommendation", "recommendation_reasoning",
        "generated_at", "hr_confirmed", "hr_confirmed_at", "outcome_communicated"
    ],
    SHEET_DECISIONS: [
        "decision_id", "candidate_id", "decision_type", "recommended_by",
        "confirmed_by", "message_posted", "decided_at", "stage_at_decision",
        "overridden"
    ],
}
def setup_sheets() -> None:
    """
    Ensure every tab listed in SHEET_HEADERS exists and has a header row.

    A missing tab is created (1000 rows, one column per header) and its
    headers appended. An existing tab whose first row is empty gets the
    headers inserted as row 1. An existing tab with any content in row 1 is
    left untouched, so the function can be called repeatedly.
    """
    import time

    book = get_client().open_by_key(os.environ["GOOGLE_SHEET_ID"])
    known_titles = [sheet.title for sheet in book.worksheets()]

    for title, columns in SHEET_HEADERS.items():
        time.sleep(1)  # avoid hitting Sheets API rate limit

        if title not in known_titles:
            created = book.add_worksheet(title=title, rows=1000, cols=len(columns))
            created.append_row(columns)
            print(f"Created tab: {title}")
            continue

        sheet = book.worksheet(title)
        if sheet.row_values(1):
            # Row 1 already holds something; it is not compared to `columns`.
            print(f"Tab already set up: {title}")
            continue

        sheet.insert_row(columns, 1)
        print(f"Added headers to: {title}")
def get_client() -> gspread.Client:
    """
    Build an authorised gspread client from a service-account key file.

    The key file is looked up in order: the GOOGLE_SERVICE_ACCOUNT_PATH
    environment variable, two absolute app paths (kept for HuggingFace Spaces
    compatibility), then the current directory. The first existing path wins.

    Raises:
        FileNotFoundError: if none of the candidate paths exists.
    """
    search_order = [
        os.environ.get("GOOGLE_SERVICE_ACCOUNT_PATH", ""),
        "/home/user/app/service_account.json",
        "/app/service_account.json",
        "service_account.json",
    ]

    key_file = None
    for location in search_order:
        # An unset env var yields "", which is skipped here.
        if location and os.path.exists(location):
            key_file = location
            break

    if not key_file:
        raise FileNotFoundError(
            f"service_account.json not found. Checked: {search_order}"
        )

    credentials = Credentials.from_service_account_file(key_file, scopes=SCOPES)
    return gspread.authorize(credentials)
def get_sheet(tab_name: str) -> gspread.Worksheet:
    """
    Return the worksheet called *tab_name* from the spreadsheet whose key is
    in the GOOGLE_SHEET_ID environment variable.

    A new client is built and the spreadsheet re-opened on every call.
    """
    book = get_client().open_by_key(os.environ["GOOGLE_SHEET_ID"])
    return book.worksheet(tab_name)
def _now() -> str:
    """Return the current time as an ISO-8601 string with a fixed +05:30 (IST) offset."""
    from datetime import timedelta, timezone

    ist_offset = timedelta(hours=5, minutes=30)
    return datetime.now(timezone(ist_offset)).isoformat()
def _gen_id(prefix: str) -> str:
    """Return ``<prefix>_<SUFFIX>`` where SUFFIX is six upper-case hex digits from a random UUID4."""
    suffix = uuid.uuid4().hex.upper()[:6]
    return f"{prefix}_{suffix}"
# ─────────────────────────────────────────────
# CANDIDATES
# ─────────────────────────────────────────────
# Expected header row of the candidates tab, passed to get_all_records().
# Derived from SHEET_HEADERS so the column list is defined in one place only;
# copied so that mutating one list cannot silently change the other.
CANDIDATES_HEADERS = list(SHEET_HEADERS[SHEET_CANDIDATES])
def _get_records(tab_name: str, headers: list) -> list[dict]:
    """
    Best-effort read of every record in *tab_name*.

    First tries ``get_all_records(expected_headers=headers)``. If that raises,
    retries without the header constraint. If that also raises, returns an
    empty list. Both failures are logged so that a broken sheet is not
    mistaken for an empty one.

    Args:
        tab_name: Worksheet title to read.
        headers: Header names expected in row 1.

    Returns:
        One dict per data row, or ``[]`` when the sheet cannot be read.
        Errors from opening the worksheet itself are not caught.
    """
    import logging

    log = logging.getLogger(__name__)
    ws = get_sheet(tab_name)

    try:
        return ws.get_all_records(expected_headers=headers)
    except Exception:
        # Deliberately broad: this helper is a best-effort reader.
        log.warning(
            "get_all_records with expected headers failed for tab %r; "
            "retrying without header check",
            tab_name,
            exc_info=True,
        )

    try:
        return ws.get_all_records()
    except Exception:
        log.exception("Could not read any records from tab %r", tab_name)
        return []
def get_candidate_by_slack_id(slack_user_id: str) -> Optional[dict]:
    """
    Return the first candidate row whose ``slack_user_id`` matches, else None.

    Matching mirrors get_candidate_by_id: the cell value is converted to a
    string and surrounding whitespace is ignored on both sides. A row missing
    the ``slack_user_id`` key is skipped instead of raising KeyError, which
    can happen when the records were read without the expected headers.

    An empty or whitespace-only *slack_user_id* returns None, so it can never
    match a row whose Slack ID cell is blank.
    """
    wanted = slack_user_id.strip()
    if not wanted:
        return None

    for row in _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS):
        if str(row.get("slack_user_id", "")).strip() == wanted:
            return row
    return None
def get_candidate_by_id(candidate_id: str) -> Optional[dict]:
    """
    Look up a candidate row by ``candidate_id``.

    Both the stored value (converted to a string) and the argument are
    stripped of surrounding whitespace before comparison. Returns the first
    matching row, or None when there is no match.
    """
    rows = _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS)
    hits = (
        rec for rec in rows
        if str(rec.get("candidate_id", "")).strip() == candidate_id.strip()
    )
    return next(hits, None)
def get_active_candidates() -> list[dict]:
    """Return every candidate row whose status is neither "eliminated" nor "cleared"."""
    finished = ("eliminated", "cleared")
    still_active = []
    for rec in _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS):
        if str(rec.get("status", "")) in finished:
            continue
        still_active.append(rec)
    return still_active
def create_candidate(
    slack_user_id: str,
    name: str,
    cohort_id: str,
    ft_slack_id: str,
    hr_slack_id: str,
    week0_start_date: str,
    stage: str = "week0",
    department: str = "general",
    channel_id: str = "",
) -> dict:
    """
    Append a new candidate row to the candidates tab and return it as a dict.

    ``week0_start_date`` is normalised to ``YYYY-MM-DD`` when it matches one of
    the accepted formats (``2024-01-31``, ``31 Jan 2024``, ``31-01-2024``,
    ``31/01/2024``, ``January 31 2024``). Any other value is stored as given,
    after stripping whitespace.

    The new row starts with status ``"active"``, ``miss_count`` 0 and an empty
    ``probation_start_date``.

    Returns:
        The record exactly as written to the sheet, built locally rather than
        read back, to avoid an extra get_all_records call. ``created_at`` is
        the same value that was stored.
    """
    date_str = week0_start_date.strip()
    for fmt in ("%Y-%m-%d", "%d %b %Y", "%d-%m-%Y", "%d/%m/%Y", "%B %d %Y"):
        try:
            date_str = datetime.strptime(date_str, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
        break

    ws = get_sheet(SHEET_CANDIDATES)

    # Keys are listed in sheet column order; the row is derived from this dict
    # so the stored row and the returned record cannot disagree. In particular
    # the timestamp is taken once.
    candidate = {
        "candidate_id": _gen_id("CAND"),
        "slack_user_id": slack_user_id,
        "name": name,
        "stage": stage,
        "cohort_id": cohort_id,
        "ft_slack_id": ft_slack_id,
        "hr_slack_id": hr_slack_id,
        "week0_start_date": date_str,
        "probation_start_date": "",
        "status": "active",
        "miss_count": 0,
        "created_at": _now(),
        "department": department,
        "channel_id": channel_id,
    }

    ws.append_row(
        list(candidate.values()),
        value_input_option="RAW",
        insert_data_option="INSERT_ROWS",
        table_range="A1",
    )
    return candidate
def update_candidate_field(candidate_id: str, field: str, value) -> None:
    """
    Write *value* into column *field* of the row whose candidate_id matches.

    Raises:
        ValueError: if *field* is not in the sheet's header row, or if no row
            has the given candidate_id.
    """
    sheet = get_sheet(SHEET_CANDIDATES)
    rows = sheet.get_all_records(expected_headers=CANDIDATES_HEADERS)
    column = sheet.row_values(1).index(field) + 1  # sheet columns are 1-based

    row_number = 2  # row 1 is the header, so the first record is on row 2
    for rec in rows:
        if rec["candidate_id"] == candidate_id:
            sheet.update_cell(row_number, column, value)
            return
        row_number += 1

    raise ValueError(f"candidate_id {candidate_id} not found")
def increment_miss_count(candidate_id: str) -> int:
    """
    Add one to the candidate's ``miss_count`` cell and return the new value.

    A blank ``miss_count`` cell is treated as 0, so a hand-edited or partially
    filled row does not crash the increment.

    Raises:
        ValueError: if no row has the given candidate_id, if the sheet has no
            ``miss_count`` header, or if the cell holds a non-numeric value.
    """
    ws = get_sheet(SHEET_CANDIDATES)
    records = ws.get_all_records(expected_headers=CANDIDATES_HEADERS)
    miss_col = ws.row_values(1).index("miss_count") + 1  # 1-based column

    # Row 1 is the header, so the first record is on sheet row 2.
    for i, row in enumerate(records, start=2):
        if row["candidate_id"] == candidate_id:
            new_count = int(row["miss_count"] or 0) + 1
            ws.update_cell(i, miss_col, new_count)
            return new_count

    raise ValueError(f"candidate_id {candidate_id} not found")
# ─────────────────────────────────────────────
# REPORTS LOG
# ─────────────────────────────────────────────
def log_report(
    candidate_id: str,
    stage: str,
    day_number: int,
    report_number: int,
    worked_on: str,
    stuck_on: str,
    tried: str,
    current_status: str,
    format_valid: bool,
    quality_score: int,
    quality_flags: str,
    on_time: bool,
) -> str:
    """
    Append one report row to the reports tab and return its generated id.

    ``submitted_at`` is set to the current time. The two boolean arguments
    are stored as the strings "True" / "False".
    """
    sheet = get_sheet(SHEET_REPORTS)
    new_id = _gen_id("RPT")
    submitted_at = _now()

    # Grouped for readability; concatenated in sheet column order.
    identity = [new_id, candidate_id, submitted_at, stage, day_number, report_number]
    narrative = [worked_on, stuck_on, tried, current_status]
    assessment = [str(format_valid), quality_score, quality_flags, str(on_time)]

    sheet.append_row(identity + narrative + assessment)
    return new_id
def get_reports_for_candidate(candidate_id: str, stage: str = None) -> list[dict]:
    """
    Return the candidate's report rows, in sheet order.

    When *stage* is given (and non-empty) only rows for that stage are kept.
    """
    matched = []
    for rec in get_sheet(SHEET_REPORTS).get_all_records():
        if rec["candidate_id"] != candidate_id:
            continue
        if stage and rec["stage"] != stage:
            continue
        matched.append(rec)
    return matched
def did_submit_in_window(candidate_id: str, window_start: str, window_end: str) -> bool:
    """
    Report whether the candidate has a format-valid report inside the window.

    Args:
        candidate_id: Candidate whose reports are checked.
        window_start: Inclusive lower bound, as an ISO-8601 timestamp string.
        window_end: Inclusive upper bound, as an ISO-8601 timestamp string.

    Returns:
        True if at least one report row for the candidate has a true
        ``format_valid`` and a ``submitted_at`` within the bounds.

    Timestamps are compared as datetimes when all three values parse and are
    comparable, so differing UTC offsets or precision are handled correctly.
    Otherwise the comparison falls back to plain string ordering.
    """

    def _parse(stamp):
        """Return *stamp* as a datetime, or None when it is not ISO-8601."""
        try:
            return datetime.fromisoformat(str(stamp))
        except ValueError:
            return None

    start = _parse(window_start)
    end = _parse(window_end)

    for r in get_sheet(SHEET_REPORTS).get_all_records():
        if r["candidate_id"] != candidate_id:
            continue

        # Accept "True", "TRUE", "true": the flag is written as str(bool), but a
        # cell stored as a sheet boolean may read back upper-cased.
        # NOTE(review): confirm how the cell renders if edited by hand.
        if str(r["format_valid"]).strip().lower() != "true":
            continue

        submitted = r["submitted_at"]
        when = _parse(submitted)

        if start is not None and end is not None and when is not None:
            try:
                if start <= when <= end:
                    return True
                continue
            except TypeError:
                # Mix of naive and offset-aware values; use string ordering.
                pass

        if window_start <= str(submitted) <= window_end:
            return True

    return False
# ─────────────────────────────────────────────
# SIGNALS LOG
# ─────────────────────────────────────────────
def log_signal(
    candidate_id: str,
    day_number: int,
    signal_name: str,
    ft_answer: str,
    signal_value: str,
    source: str = "ft_answer",
) -> str:
    """
    Append one signal row to the signals tab and return its generated id.

    ``signal_value`` is one of: strong | weak | flag. ``logged_at`` is set to
    the current time.
    """
    sheet = get_sheet(SHEET_SIGNALS)
    new_id = _gen_id("SIG")
    logged_at = _now()
    sheet.append_row(
        [new_id, candidate_id, day_number, signal_name,
         ft_answer, signal_value, logged_at, source]
    )
    return new_id
def get_signals_for_candidate(candidate_id: str, week: str = None) -> list[dict]:
    """
    Return the candidate's signal rows, optionally limited to one week.

    week: 'w1' keeps day_number <= 5, 'w2' keeps day_number >= 6,
    any other value (including None) keeps everything.
    """
    all_rows = get_sheet(SHEET_SIGNALS).get_all_records()
    mine = [rec for rec in all_rows if rec["candidate_id"] == candidate_id]

    if week == "w1":
        return [rec for rec in mine if int(rec["day_number"]) <= 5]
    if week == "w2":
        return [rec for rec in mine if int(rec["day_number"]) >= 6]
    return mine
# ─────────────────────────────────────────────
# WARNINGS LOG
# ─────────────────────────────────────────────
def log_warning(
    candidate_id: str,
    warning_type: str,
    stage: str,
    trigger_event: str,
    message_sent: str,
    hr_notified: bool,
) -> str:
    """
    Append one warning row to the warnings tab and return its generated id.

    ``issued_at`` is set to the current time; ``hr_notified`` is stored as the
    string "True" or "False".
    """
    sheet = get_sheet(SHEET_WARNINGS)
    new_id = _gen_id("WARN")
    issued_at = _now()
    sheet.append_row(
        [new_id, candidate_id, warning_type, stage,
         trigger_event, message_sent, issued_at, str(hr_notified)]
    )
    return new_id
def get_warnings_for_candidate(candidate_id: str) -> list[dict]:
    """Return every row of the warnings tab that belongs to *candidate_id*, in sheet order."""
    mine = []
    for rec in get_sheet(SHEET_WARNINGS).get_all_records():
        if rec["candidate_id"] == candidate_id:
            mine.append(rec)
    return mine
# ─────────────────────────────────────────────
# COMMITMENTS
# ─────────────────────────────────────────────
def log_commitment(
    candidate_id: str,
    description: str,
    due_date: str,
    due_time: str,
) -> str:
    """
    Append a new commitment row and return its generated id.

    ``logged_at`` is set to the current time. The four outcome columns
    (delivered, delivered_at, flagged_proactively, outcome) start blank.
    """
    sheet = get_sheet(SHEET_COMMITMENTS)
    new_id = _gen_id("COM")
    filled = [new_id, candidate_id, description, due_date, due_time, _now()]
    blank_outcome_columns = [""] * 4
    sheet.append_row(filled + blank_outcome_columns)
    return new_id
def update_commitment_outcome(
    commitment_id: str,
    delivered: bool,
    delivered_at: str,
    flagged_proactively: bool,
    outcome: str,
) -> None:
    """
    Record the outcome of a commitment on its existing row.

    Writes ``delivered``, ``delivered_at``, ``flagged_proactively`` and
    ``outcome`` for the first row whose ``commitment_id`` matches. The two
    booleans are written as the strings "True" / "False". If no row matches,
    nothing is written and the function returns silently.

    All four cells go out in a single ``update_cells`` request instead of four
    ``update_cell`` requests, to stay clear of the Sheets API rate limit. The
    write happens only after every column has been located, so a missing
    header can no longer leave the row half-updated.

    Raises:
        ValueError: if one of the four columns is missing from the header row
            (only when a matching row is found).
    """
    ws = get_sheet(SHEET_COMMITMENTS)
    records = ws.get_all_records()
    headers = ws.row_values(1)

    updates = {
        "delivered": str(delivered),
        "delivered_at": delivered_at,
        "flagged_proactively": str(flagged_proactively),
        "outcome": outcome,
    }

    # Row 1 is the header, so the first record is on sheet row 2.
    for i, row in enumerate(records, start=2):
        if row["commitment_id"] != commitment_id:
            continue

        cells = [
            gspread.Cell(i, headers.index(field) + 1, value)
            for field, value in updates.items()
        ]
        # USER_ENTERED keeps the value parsing that update_cell applies
        # (per the gspread documentation).
        ws.update_cells(cells, value_input_option="USER_ENTERED")
        return
def get_due_commitments(due_date: str) -> list[dict]:
    """Return the commitments whose due_date equals *due_date* and whose outcome is still blank."""
    unresolved = []
    for rec in get_sheet(SHEET_COMMITMENTS).get_all_records():
        if rec["due_date"] == due_date and rec["outcome"] == "":
            unresolved.append(rec)
    return unresolved
def get_commitments_for_candidate(candidate_id: str) -> list[dict]:
    """Return every commitment row that belongs to *candidate_id*, in sheet order."""
    mine = []
    for rec in get_sheet(SHEET_COMMITMENTS).get_all_records():
        if rec["candidate_id"] == candidate_id:
            mine.append(rec)
    return mine
# ─────────────────────────────────────────────
# EVALUATIONS
# ─────────────────────────────────────────────
def log_evaluation(
    candidate_id: str,
    eval_type: str,  # week0 | probation_w1 | probation_w2
    signals: dict,  # {signal_name: {value, evidence}}
    summary_text: str,
    recommendation: str,
    recommendation_reasoning: str,
) -> str:
    """
    Append one evaluation row to the evaluations tab and return its id.

    Each entry of *signals* is flattened to ``"<name>: <value> — <evidence>"``
    and written to the ``signal_1`` … ``signal_6`` columns in dict order.
    Fewer than six signals are padded with blank cells; any beyond the sixth
    are dropped. ``generated_at`` is set to the current time and the three HR
    follow-up columns start blank.

    Raises:
        KeyError: if a signal entry lacks "value" or "evidence".
    """
    ws = get_sheet(SHEET_EVALUATIONS)
    eval_id = _gen_id("EVAL")

    # NOTE(review): the separator was a mis-encoded character in the source;
    # restored here as an em dash — confirm against the original file.
    signal_cells = [
        f"{name}: {detail['value']} — {detail['evidence']}"
        for name, detail in signals.items()
    ]
    # Exactly six signal columns exist: pad short lists, truncate long ones.
    signal_cells = (signal_cells + [""] * 6)[:6]

    row = [
        eval_id,
        candidate_id,
        eval_type,
        *signal_cells,
        summary_text,
        recommendation,
        recommendation_reasoning,
        _now(),  # generated_at
        "",  # hr_confirmed
        "",  # hr_confirmed_at
        "",  # outcome_communicated
    ]
    ws.append_row(row)
    return eval_id
def get_latest_evaluation(candidate_id: str, eval_type: str) -> Optional[dict]:
    """
    Return the last row in the evaluations tab that matches both
    *candidate_id* and *eval_type*, or None when there is no such row.
    """
    latest = None
    for rec in get_sheet(SHEET_EVALUATIONS).get_all_records():
        if rec["candidate_id"] == candidate_id and rec["eval_type"] == eval_type:
            latest = rec  # later rows overwrite earlier ones
    return latest
def confirm_evaluation(eval_id: str) -> None:
    """
    Mark an evaluation as HR-confirmed.

    Writes "True" to ``hr_confirmed`` and the current time to
    ``hr_confirmed_at`` on the first row whose ``eval_id`` matches. Does
    nothing when no row matches.
    """
    sheet = get_sheet(SHEET_EVALUATIONS)
    rows = sheet.get_all_records()
    header_row = sheet.row_values(1)

    for offset, rec in enumerate(rows):
        if rec["eval_id"] != eval_id:
            continue
        row_number = offset + 2  # row 1 is the header
        sheet.update_cell(row_number, header_row.index("hr_confirmed") + 1, "True")
        sheet.update_cell(row_number, header_row.index("hr_confirmed_at") + 1, _now())
        break
def mark_outcome_communicated(eval_id: str) -> None:
    """
    Write "True" to ``outcome_communicated`` on the first row whose
    ``eval_id`` matches. Does nothing when no row matches.
    """
    sheet = get_sheet(SHEET_EVALUATIONS)
    rows = sheet.get_all_records()
    header_row = sheet.row_values(1)

    for offset, rec in enumerate(rows):
        if rec["eval_id"] != eval_id:
            continue
        row_number = offset + 2  # row 1 is the header
        sheet.update_cell(row_number, header_row.index("outcome_communicated") + 1, "True")
        break
# ─────────────────────────────────────────────
# DECISIONS LOG
# ─────────────────────────────────────────────
def log_decision(
    candidate_id: str,
    decision_type: str,
    recommended_by: str,
    confirmed_by: str,
    message_posted: str,
    stage_at_decision: str,
    overridden: bool = False,
) -> str:
    """
    Append one decision row to the decisions tab and return its generated id.

    ``decided_at`` is set to the current time; ``overridden`` is stored as the
    string "True" or "False".
    """
    sheet = get_sheet(SHEET_DECISIONS)
    new_id = _gen_id("DEC")
    decided_at = _now()
    sheet.append_row(
        [new_id, candidate_id, decision_type, recommended_by, confirmed_by,
         message_posted, decided_at, stage_at_decision, str(overridden)]
    )
    return new_id
def get_pending_recommendation(candidate_id: str) -> Optional[dict]:
    """
    Return the candidate's most recent evaluation that is awaiting HR
    approval: a row with a non-empty ``recommendation`` and a blank
    ``hr_confirmed``. Returns None when there is no such row.
    """
    awaiting = None
    for rec in get_sheet(SHEET_EVALUATIONS).get_all_records():
        if (
            rec["candidate_id"] == candidate_id
            and rec["hr_confirmed"] == ""
            and rec["recommendation"] != ""
        ):
            awaiting = rec  # later rows overwrite earlier ones
    return awaiting