interns_manager / sheets.py
banao-tech's picture
Update sheets.py
3591b57 verified
"""
sheets.py — Google Sheets integration layer
All reads and writes to the 7-sheet data store go through this module.
"""
import os
import uuid
from datetime import datetime, date
from typing import Optional
import gspread
from google.oauth2.service_account import Credentials
# OAuth scopes passed to the service-account credentials built in get_client().
SCOPES = [
"https://www.googleapis.com/auth/spreadsheets",
"https://www.googleapis.com/auth/drive",
]
# Sheet tab names — must match exactly in your Google Sheet.
# These constants are the keys of SHEET_HEADERS and the tab names handed to
# get_sheet() by the read/write helpers in this module.
SHEET_CANDIDATES = "candidates"
SHEET_REPORTS = "reports_log"
SHEET_SIGNALS = "signals_log"
SHEET_WARNINGS = "warnings_log"
SHEET_COMMITMENTS = "commitments"
SHEET_EVALUATIONS = "evaluations"
SHEET_DECISIONS = "decisions_log"
# ─────────────────────────────────────────────
# SETUP — create all sheet headers on first run
# ─────────────────────────────────────────────
# Header row for every tab, keyed by tab name. setup_sheets() writes the
# matching list as row 1 of a tab that is missing or whose first row is empty.
# The order of each list is the column order: the log_* / create_* helpers in
# this module append rows positionally in this same order.
SHEET_HEADERS = {
# One row per candidate.
SHEET_CANDIDATES: [
"candidate_id", "slack_user_id", "name", "stage", "cohort_id",
"ft_slack_id", "hr_slack_id", "week0_start_date", "probation_start_date",
"status", "miss_count", "created_at", "department", "channel_id"
],
# One row per submitted report.
SHEET_REPORTS: [
"report_id", "candidate_id", "submitted_at", "stage", "day_number",
"report_number", "worked_on", "stuck_on", "tried", "current_status",
"format_valid", "quality_score", "quality_flags", "on_time"
],
# One row per logged signal.
SHEET_SIGNALS: [
"signal_id", "candidate_id", "day_number", "signal_name",
"ft_answer", "signal_value", "logged_at", "source"
],
# One row per warning issued.
SHEET_WARNINGS: [
"warning_id", "candidate_id", "warning_type", "stage",
"trigger_event", "message_sent", "issued_at", "hr_notified"
],
# One row per commitment; the last four columns start blank and are
# filled in later by update_commitment_outcome().
SHEET_COMMITMENTS: [
"commitment_id", "candidate_id", "description", "due_date",
"due_time", "logged_at", "delivered", "delivered_at",
"flagged_proactively", "outcome"
],
# One row per generated evaluation; up to six flattened signals.
SHEET_EVALUATIONS: [
"eval_id", "candidate_id", "eval_type",
"signal_1", "signal_2", "signal_3", "signal_4", "signal_5", "signal_6",
"summary_text", "recommendation", "recommendation_reasoning",
"generated_at", "hr_confirmed", "hr_confirmed_at", "outcome_communicated"
],
# One row per recorded decision.
SHEET_DECISIONS: [
"decision_id", "candidate_id", "decision_type", "recommended_by",
"confirmed_by", "message_posted", "decided_at", "stage_at_decision",
"overridden"
],
}
def setup_sheets() -> None:
    """
    Ensure every tab listed in SHEET_HEADERS exists and has a header row.

    A missing tab is created (1000 rows, one column per header) and its
    header row appended. An existing tab gets the header row inserted at
    row 1 only when its first row is empty; otherwise it is left untouched.
    Progress is reported with print(). Safe to call more than once.
    """
    import time

    gc = get_client()
    book = gc.open_by_key(os.environ["GOOGLE_SHEET_ID"])
    known_titles = {sheet.title for sheet in book.worksheets()}

    for title, columns in SHEET_HEADERS.items():
        time.sleep(1)  # pause per tab to avoid hitting the Sheets API rate limit

        if title in known_titles:
            sheet = book.worksheet(title)
            if sheet.row_values(1):
                print(f"Tab already set up: {title}")
            else:
                sheet.insert_row(columns, 1)
                print(f"Added headers to: {title}")
            continue

        sheet = book.add_worksheet(title=title, rows=1000, cols=len(columns))
        sheet.append_row(columns)
        print(f"Created tab: {title}")
def get_client() -> gspread.Client:
    """
    Build an authorised gspread client from a service-account key file.

    The key file is searched for, in order, at the path held in the
    GOOGLE_SERVICE_ACCOUNT_PATH environment variable and then at a few
    fixed fallback locations (kept for HuggingFace Spaces compatibility).
    The first path that exists on disk is used.

    Raises:
        FileNotFoundError: if none of the candidate paths exists.
    """
    search_paths = [
        os.environ.get("GOOGLE_SERVICE_ACCOUNT_PATH", ""),
        "/home/user/app/service_account.json",
        "/app/service_account.json",
        "service_account.json",
    ]

    for candidate in search_paths:
        # The env-var entry is "" when the variable is unset; skip it.
        if candidate and os.path.exists(candidate):
            key_file = candidate
            break
    else:
        raise FileNotFoundError(
            f"service_account.json not found. Checked: {search_paths}"
        )

    credentials = Credentials.from_service_account_file(key_file, scopes=SCOPES)
    return gspread.authorize(credentials)
def get_sheet(tab_name: str) -> gspread.Worksheet:
    """
    Return the worksheet called *tab_name* from the spreadsheet whose key is
    held in the GOOGLE_SHEET_ID environment variable.

    A fresh client is obtained through get_client() on every call.
    """
    return (
        get_client()
        .open_by_key(os.environ["GOOGLE_SHEET_ID"])
        .worksheet(tab_name)
    )
def _now() -> str:
    """Return the current time as an ISO-8601 string in IST (UTC+05:30)."""
    from datetime import timedelta, timezone

    ist_offset = timedelta(hours=5, minutes=30)
    current = datetime.now(timezone(ist_offset))
    return current.isoformat()
def _gen_id(prefix: str) -> str:
    """
    Return an identifier of the form "<prefix>_<TOKEN>".

    TOKEN is the first six hex digits of a random UUID4, upper-cased.
    """
    token = uuid.uuid4().hex[:6].upper()
    return "{}_{}".format(prefix, token)
# ─────────────────────────────────────────────
# CANDIDATES
# ─────────────────────────────────────────────
# Column order of the candidates tab. Derived from SHEET_HEADERS (instead of
# being a second hand-written copy) so the schema lives in exactly one place
# and the two lists cannot drift apart. list() keeps this a separate object.
CANDIDATES_HEADERS = list(SHEET_HEADERS[SHEET_CANDIDATES])
def _get_records(tab_name: str, headers: list) -> list[dict]:
    """
    Read every data row of a tab as a list of dicts, degrading gracefully.

    First tries ``get_all_records(expected_headers=headers)``. If that raises,
    retries without ``expected_headers``; if that raises too, returns an
    empty list. Each swallowed exception is logged so a failure is visible
    rather than looking like an empty sheet.

    Args:
        tab_name: Name of the worksheet tab to read.
        headers: Header names passed as ``expected_headers`` on the first try.

    Returns:
        One dict per data row, or ``[]`` when both read attempts fail.
        Errors raised by get_sheet() itself are not caught here.
    """
    import logging

    log = logging.getLogger(__name__)
    ws = get_sheet(tab_name)

    try:
        return ws.get_all_records(expected_headers=headers)
    except Exception:
        # Best-effort by design: fall through to the plain read below.
        log.warning(
            "get_all_records(expected_headers=...) failed for tab %r; "
            "retrying without expected headers",
            tab_name,
            exc_info=True,
        )

    try:
        return ws.get_all_records()
    except Exception:
        log.error(
            "get_all_records() failed for tab %r; returning no records",
            tab_name,
            exc_info=True,
        )
        return []
def get_candidate_by_slack_id(slack_user_id: str) -> Optional[dict]:
    """
    Return the first candidate row whose slack_user_id matches, else None.

    Matching mirrors get_candidate_by_id(): both sides are converted to str
    and stripped. A row lacking the ``slack_user_id`` key (possible when
    _get_records() falls back to a read without expected headers) is
    treated as a non-match instead of raising KeyError.

    Args:
        slack_user_id: Slack user id to look up.

    Returns:
        The matching row dict, or None when there is no match or the id is
        empty.
    """
    wanted = str(slack_user_id or "").strip()
    if not wanted:
        # An empty id must never match a blank row in the sheet.
        return None

    for row in _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS):
        if str(row.get("slack_user_id", "")).strip() == wanted:
            return row
    return None
def get_candidate_by_id(candidate_id: str) -> Optional[dict]:
    """
    Look up a candidate row by candidate_id.

    Both the stored value and the argument are compared after stripping
    surrounding whitespace. Returns the first matching row, or None.
    """
    rows = _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS)
    return next(
        (
            entry
            for entry in rows
            if str(entry.get("candidate_id", "")).strip() == candidate_id.strip()
        ),
        None,
    )
def get_active_candidates() -> list[dict]:
    """Return every candidate row whose status is neither "eliminated" nor "cleared"."""
    inactive_statuses = ("eliminated", "cleared")
    active = []
    for candidate in _get_records(SHEET_CANDIDATES, CANDIDATES_HEADERS):
        if str(candidate.get("status", "")) in inactive_statuses:
            continue
        active.append(candidate)
    return active
def create_candidate(
    slack_user_id: str,
    name: str,
    cohort_id: str,
    ft_slack_id: str,
    hr_slack_id: str,
    week0_start_date: str,
    stage: str = "week0",
    department: str = "general",
    channel_id: str = "",
) -> dict:
    """
    Append a new candidate row to the candidates tab and return it as a dict.

    The start date is normalised to YYYY-MM-DD when it matches one of the
    recognised formats; otherwise the stripped input is stored unchanged.
    New candidates start with status "active", miss_count 0 and an empty
    probation_start_date.

    Args:
        slack_user_id: Slack id of the candidate.
        name: Candidate's name.
        cohort_id: Cohort identifier.
        ft_slack_id: Value stored in the ft_slack_id column.
        hr_slack_id: Value stored in the hr_slack_id column.
        week0_start_date: Start date in any of the supported formats.
        stage: Initial stage (default "week0").
        department: Department label (default "general").
        channel_id: Value stored in the channel_id column (default "").

    Returns:
        The candidate record keyed by column name. It is built locally and
        not re-read from the sheet, which avoids an extra read against the
        API rate limit. The appended row is derived from this same dict, so
        the two cannot disagree (including ``created_at``).
    """
    # Normalise date to YYYY-MM-DD regardless of input format.
    date_str = week0_start_date.strip()
    for fmt in ("%Y-%m-%d", "%d %b %Y", "%d-%m-%Y", "%d/%m/%Y", "%B %d %Y"):
        try:
            date_str = datetime.strptime(date_str, fmt).strftime("%Y-%m-%d")
            break
        except ValueError:
            continue

    candidate = {
        "candidate_id": _gen_id("CAND"),
        "slack_user_id": slack_user_id,
        "name": name,
        "stage": stage,
        "cohort_id": cohort_id,
        "ft_slack_id": ft_slack_id,
        "hr_slack_id": hr_slack_id,
        "week0_start_date": date_str,
        "probation_start_date": "",
        "status": "active",
        "miss_count": 0,
        "created_at": _now(),  # computed once so sheet and return value agree
        "department": department,
        "channel_id": channel_id,
    }

    ws = get_sheet(SHEET_CANDIDATES)
    # Order the cells by the tab's header list so the columns always line up.
    row = [candidate[column] for column in CANDIDATES_HEADERS]
    ws.append_row(
        row,
        value_input_option="RAW",
        insert_data_option="INSERT_ROWS",
        table_range="A1",
    )
    return candidate
def update_candidate_field(candidate_id: str, field: str, value) -> None:
    """
    Write *value* into column *field* of the candidate row with *candidate_id*.

    Raises:
        ValueError: if *field* is not in the sheet's header row, or if no
            row carries the given candidate_id.
    """
    sheet = get_sheet(SHEET_CANDIDATES)
    rows = sheet.get_all_records(expected_headers=CANDIDATES_HEADERS)
    target_col = sheet.row_values(1).index(field) + 1  # sheet columns are 1-based

    # Row 1 holds the headers, so the first record sits on sheet row 2.
    row_number = 2
    for record in rows:
        if record["candidate_id"] == candidate_id:
            sheet.update_cell(row_number, target_col, value)
            return
        row_number += 1

    raise ValueError(f"candidate_id {candidate_id} not found")
def increment_miss_count(candidate_id: str) -> int:
    """
    Add one to a candidate's miss_count in the sheet and return the new value.

    A blank miss_count cell is treated as 0 instead of raising, so rows
    that never had the counter initialised can still be incremented.

    Args:
        candidate_id: Id of the candidate whose counter is bumped.

    Returns:
        The miss count after the increment.

    Raises:
        ValueError: if no row has the given candidate_id, if the header row
            has no "miss_count" column, or if the stored count is not numeric.
    """
    ws = get_sheet(SHEET_CANDIDATES)
    records = ws.get_all_records(expected_headers=CANDIDATES_HEADERS)
    miss_col = ws.row_values(1).index("miss_count") + 1  # 1-based column

    # Data rows start on sheet row 2 (row 1 is the header).
    for row_number, row in enumerate(records, start=2):
        if row["candidate_id"] != candidate_id:
            continue
        raw = str(row.get("miss_count", "")).strip()
        # Go through float() so values such as "3.0" are accepted as well.
        current = int(float(raw)) if raw else 0
        new_count = current + 1
        ws.update_cell(row_number, miss_col, new_count)
        return new_count

    raise ValueError(f"candidate_id {candidate_id} not found")
# ─────────────────────────────────────────────
# REPORTS LOG
# ─────────────────────────────────────────────
def log_report(
    candidate_id: str,
    stage: str,
    day_number: int,
    report_number: int,
    worked_on: str,
    stuck_on: str,
    tried: str,
    current_status: str,
    format_valid: bool,
    quality_score: int,
    quality_flags: str,
    on_time: bool,
) -> str:
    """
    Append one row to the reports log and return the generated report id.

    The submission time is taken from _now(). ``format_valid`` and
    ``on_time`` are stored as their str() form ("True" / "False").
    """
    sheet = get_sheet(SHEET_REPORTS)
    new_id = _gen_id("RPT")
    submitted_at = _now()

    # Cell order follows the reports_log header row.
    cells = (
        new_id, candidate_id, submitted_at, stage, day_number,
        report_number, worked_on, stuck_on, tried, current_status,
        str(format_valid), quality_score, quality_flags, str(on_time),
    )
    sheet.append_row(list(cells))
    return new_id
def get_reports_for_candidate(candidate_id: str, stage: str = None) -> list[dict]:
    """
    Return all report rows for *candidate_id*, in sheet order.

    When *stage* is truthy, only rows whose stage equals it are kept.
    """
    all_reports = get_sheet(SHEET_REPORTS).get_all_records()

    selected = []
    for report in all_reports:
        if report["candidate_id"] != candidate_id:
            continue
        if stage and report["stage"] != stage:
            continue
        selected.append(report)
    return selected
def did_submit_in_window(candidate_id: str, window_start: str, window_end: str) -> bool:
    """
    Check whether the candidate submitted a valid report inside a time window.

    A report counts when it belongs to *candidate_id*, its ``format_valid``
    cell reads "true" (case-insensitive, so both "True" and a sheet-formatted
    "TRUE" match), and its ``submitted_at`` lies within the inclusive window.

    Timestamps are compared as datetimes when all three values parse as
    ISO-8601 and are mutually comparable, so differing UTC offsets are
    handled correctly. Otherwise the comparison falls back to plain string
    ordering.

    Args:
        candidate_id: Candidate whose reports are inspected.
        window_start: Inclusive lower bound, ISO-8601 string.
        window_end: Inclusive upper bound, ISO-8601 string.

    Returns:
        True if at least one qualifying report exists, else False.
    """

    def _parse(stamp):
        """Return *stamp* parsed as a datetime, or None if it is not ISO-8601."""
        try:
            return datetime.fromisoformat(str(stamp).strip())
        except ValueError:
            return None

    def _in_window(submitted) -> bool:
        """Inclusive window test with a string-ordering fallback."""
        submitted_dt = _parse(submitted)
        if start_dt is not None and end_dt is not None and submitted_dt is not None:
            try:
                return start_dt <= submitted_dt <= end_dt
            except TypeError:
                # Mix of naive and aware datetimes: not comparable as
                # datetimes, fall through to string ordering.
                pass
        return window_start <= str(submitted) <= window_end

    start_dt = _parse(window_start)
    end_dt = _parse(window_end)

    for r in get_sheet(SHEET_REPORTS).get_all_records():
        if r["candidate_id"] != candidate_id:
            continue
        if str(r["format_valid"]).strip().lower() != "true":
            continue
        if _in_window(r["submitted_at"]):
            return True
    return False
# ─────────────────────────────────────────────
# SIGNALS LOG
# ─────────────────────────────────────────────
def log_signal(
    candidate_id: str,
    day_number: int,
    signal_name: str,
    ft_answer: str,
    signal_value: str,
    source: str = "ft_answer",
) -> str:
    """
    Append one row to the signals log and return the generated signal id.

    ``signal_value`` is expected to be one of: strong | weak | flag.
    The logged_at column is filled from _now().
    """
    sheet = get_sheet(SHEET_SIGNALS)
    new_id = _gen_id("SIG")
    logged_at = _now()

    # Cell order follows the signals_log header row.
    cells = (
        new_id, candidate_id, day_number, signal_name,
        ft_answer, signal_value, logged_at, source,
    )
    sheet.append_row(list(cells))
    return new_id
def get_signals_for_candidate(candidate_id: str, week: str = None) -> list[dict]:
    """
    Return the signal rows logged for *candidate_id*, optionally by week.

    week: 'w1' keeps rows with day_number <= 5; 'w2' keeps rows with
    day_number >= 6 (no upper bound is applied); any other value, including
    None, returns all of the candidate's rows.
    """
    mine = []
    for signal in get_sheet(SHEET_SIGNALS).get_all_records():
        if signal["candidate_id"] == candidate_id:
            mine.append(signal)

    if week == "w1":
        return [s for s in mine if int(s["day_number"]) <= 5]
    if week == "w2":
        return [s for s in mine if int(s["day_number"]) >= 6]
    return mine
# ─────────────────────────────────────────────
# WARNINGS LOG
# ─────────────────────────────────────────────
def log_warning(
    candidate_id: str,
    warning_type: str,
    stage: str,
    trigger_event: str,
    message_sent: str,
    hr_notified: bool,
) -> str:
    """
    Append one row to the warnings log and return the generated warning id.

    The issued_at column is filled from _now(); ``hr_notified`` is stored as
    its str() form.
    """
    sheet = get_sheet(SHEET_WARNINGS)
    new_id = _gen_id("WARN")
    issued_at = _now()

    # Cell order follows the warnings_log header row.
    cells = (
        new_id, candidate_id, warning_type, stage,
        trigger_event, message_sent, issued_at, str(hr_notified),
    )
    sheet.append_row(list(cells))
    return new_id
def get_warnings_for_candidate(candidate_id: str) -> list[dict]:
    """Return every warnings-log row whose candidate_id equals *candidate_id*."""
    found = []
    for warning in get_sheet(SHEET_WARNINGS).get_all_records():
        if warning["candidate_id"] == candidate_id:
            found.append(warning)
    return found
# ─────────────────────────────────────────────
# COMMITMENTS
# ─────────────────────────────────────────────
def log_commitment(
    candidate_id: str,
    description: str,
    due_date: str,
    due_time: str,
) -> str:
    """
    Append a new commitment row and return the generated commitment id.

    The logged_at column is filled from _now(). The delivered, delivered_at,
    flagged_proactively and outcome columns are written as empty strings.
    """
    sheet = get_sheet(SHEET_COMMITMENTS)
    new_id = _gen_id("COM")
    logged_at = _now()

    known_cells = [new_id, candidate_id, description, due_date, due_time, logged_at]
    # delivered, delivered_at, flagged_proactively, outcome start out blank
    pending_cells = ["", "", "", ""]
    sheet.append_row(known_cells + pending_cells)
    return new_id
def update_commitment_outcome(
    commitment_id: str,
    delivered: bool,
    delivered_at: str,
    flagged_proactively: bool,
    outcome: str,
) -> None:
    """
    Fill in the outcome columns of the commitment row with *commitment_id*.

    Writes delivered, delivered_at, flagged_proactively and outcome, one
    cell at a time, on the first matching row; the two booleans are stored
    as their str() form. Does nothing when no row matches.
    """
    sheet = get_sheet(SHEET_COMMITMENTS)
    rows = sheet.get_all_records()
    header_row = sheet.row_values(1)

    # Data rows start on sheet row 2 (row 1 is the header).
    row_number = next(
        (
            number
            for number, record in enumerate(rows, start=2)
            if record["commitment_id"] == commitment_id
        ),
        None,
    )
    if row_number is None:
        return

    new_values = (
        ("delivered", str(delivered)),
        ("delivered_at", delivered_at),
        ("flagged_proactively", str(flagged_proactively)),
        ("outcome", outcome),
    )
    for column_name, cell_value in new_values:
        sheet.update_cell(row_number, header_row.index(column_name) + 1, cell_value)
def get_due_commitments(due_date: str) -> list[dict]:
    """Return the commitment rows whose due_date equals *due_date* and whose outcome is still empty."""
    due = []
    for commitment in get_sheet(SHEET_COMMITMENTS).get_all_records():
        if commitment["due_date"] != due_date:
            continue
        if commitment["outcome"] == "":
            due.append(commitment)
    return due
def get_commitments_for_candidate(candidate_id: str) -> list[dict]:
    """Return every commitments row whose candidate_id equals *candidate_id*."""
    found = []
    for commitment in get_sheet(SHEET_COMMITMENTS).get_all_records():
        if commitment["candidate_id"] == candidate_id:
            found.append(commitment)
    return found
# ─────────────────────────────────────────────
# EVALUATIONS
# ─────────────────────────────────────────────
def log_evaluation(
    candidate_id: str,
    eval_type: str,  # week0 | probation_w1 | probation_w2
    signals: dict,  # {signal_name: {value, evidence}}
    summary_text: str,
    recommendation: str,
    recommendation_reasoning: str,
) -> str:
    """
    Append one evaluation row and return the generated evaluation id.

    Each signal is flattened to "<name>: <value> — <evidence>" and written
    to the signal_1..signal_6 columns in the dict's iteration order. Fewer
    than six signals are padded with empty cells; signals beyond the sixth
    are not stored.

    Args:
        candidate_id: Candidate being evaluated.
        eval_type: One of week0 | probation_w1 | probation_w2.
        signals: Mapping of signal name to a dict with "value" and
            "evidence" keys.
        summary_text: Text for the summary_text column.
        recommendation: Text for the recommendation column.
        recommendation_reasoning: Text for the recommendation_reasoning column.

    Returns:
        The new evaluation id.

    Raises:
        KeyError: if a signal entry lacks "value" or "evidence".
    """
    max_signals = 6  # the sheet has exactly signal_1 .. signal_6 columns

    ws = get_sheet(SHEET_EVALUATIONS)
    eval_id = _gen_id("EVAL")

    # Flatten signals into ordered columns. The separator is a real em dash;
    # it had been corrupted into mojibake and would have been written into
    # the sheet verbatim.
    signal_values = [
        f"{name}: {detail['value']} — {detail['evidence']}"
        for name, detail in signals.items()
    ][:max_signals]
    # Pad so the columns after the signals stay aligned with their headers.
    signal_values += [""] * (max_signals - len(signal_values))

    row = [
        eval_id,
        candidate_id,
        eval_type,
        *signal_values,
        summary_text,
        recommendation,
        recommendation_reasoning,
        _now(),  # generated_at
        "",  # hr_confirmed
        "",  # hr_confirmed_at
        "",  # outcome_communicated
    ]
    ws.append_row(row)
    return eval_id
def get_latest_evaluation(candidate_id: str, eval_type: str) -> Optional[dict]:
    """
    Return the last evaluation row, in sheet order, that matches both
    *candidate_id* and *eval_type*, or None when there is none.
    """
    latest = None
    for evaluation in get_sheet(SHEET_EVALUATIONS).get_all_records():
        if evaluation["candidate_id"] == candidate_id and evaluation["eval_type"] == eval_type:
            latest = evaluation  # later rows replace earlier matches
    return latest
def confirm_evaluation(eval_id: str) -> None:
    """
    Mark the evaluation row with *eval_id* as HR-confirmed.

    Writes "True" to hr_confirmed and the current _now() timestamp to
    hr_confirmed_at on the first matching row. Does nothing when no row
    matches.
    """
    sheet = get_sheet(SHEET_EVALUATIONS)
    rows = sheet.get_all_records()
    header_row = sheet.row_values(1)

    # Data rows start on sheet row 2 (row 1 is the header).
    row_number = next(
        (
            number
            for number, record in enumerate(rows, start=2)
            if record["eval_id"] == eval_id
        ),
        None,
    )
    if row_number is None:
        return

    sheet.update_cell(row_number, header_row.index("hr_confirmed") + 1, "True")
    sheet.update_cell(row_number, header_row.index("hr_confirmed_at") + 1, _now())
def mark_outcome_communicated(eval_id: str) -> None:
    """
    Write "True" to the outcome_communicated column of the first evaluation
    row with *eval_id*. Does nothing when no row matches.
    """
    sheet = get_sheet(SHEET_EVALUATIONS)
    rows = sheet.get_all_records()
    header_row = sheet.row_values(1)

    # Data rows start on sheet row 2 (row 1 is the header).
    row_number = next(
        (
            number
            for number, record in enumerate(rows, start=2)
            if record["eval_id"] == eval_id
        ),
        None,
    )
    if row_number is None:
        return

    sheet.update_cell(row_number, header_row.index("outcome_communicated") + 1, "True")
# ─────────────────────────────────────────────
# DECISIONS LOG
# ─────────────────────────────────────────────
def log_decision(
    candidate_id: str,
    decision_type: str,
    recommended_by: str,
    confirmed_by: str,
    message_posted: str,
    stage_at_decision: str,
    overridden: bool = False,
) -> str:
    """
    Append one row to the decisions log and return the generated decision id.

    The decided_at column is filled from _now(); ``overridden`` is stored as
    its str() form.
    """
    sheet = get_sheet(SHEET_DECISIONS)
    new_id = _gen_id("DEC")
    decided_at = _now()

    # Cell order follows the decisions_log header row.
    cells = (
        new_id, candidate_id, decision_type, recommended_by,
        confirmed_by, message_posted, decided_at, stage_at_decision,
        str(overridden),
    )
    sheet.append_row(list(cells))
    return new_id
def get_pending_recommendation(candidate_id: str) -> Optional[dict]:
    """
    Return the candidate's most recent evaluation that is awaiting HR approval.

    A row qualifies when its hr_confirmed cell is empty and its
    recommendation cell is not. The last qualifying row in sheet order is
    returned, or None when there is none.
    """
    newest_pending = None
    for evaluation in get_sheet(SHEET_EVALUATIONS).get_all_records():
        if evaluation["candidate_id"] != candidate_id:
            continue
        if evaluation["hr_confirmed"] != "":
            continue  # already confirmed by HR
        if evaluation["recommendation"] == "":
            continue  # nothing has been recommended yet
        newest_pending = evaluation
    return newest_pending