Spaces:
Runtime error
Runtime error
| """Google Sheets API service for fetching form responses.""" | |
| from __future__ import annotations | |
| import re | |
| import csv | |
| from io import StringIO | |
| from typing import Optional | |
| from datetime import datetime | |
| import httpx | |
| from google.oauth2.credentials import Credentials | |
| from google.auth.transport.requests import Request | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| from .database import get_oauth_tokens, save_oauth_tokens | |
| from .config import get_settings, GOOGLE_SCOPES | |
| def extract_sheet_id(url: str) -> Optional[str]: | |
| """Extract the spreadsheet ID from a Google Sheets or Forms URL.""" | |
| # Google Sheets URL pattern | |
| sheets_pattern = r'/spreadsheets/d/([a-zA-Z0-9-_]+)' | |
| sheets_match = re.search(sheets_pattern, url) | |
| if sheets_match: | |
| return sheets_match.group(1) | |
| # Google Forms URL pattern - need to find linked response sheet | |
| forms_pattern = r'/forms/d/e?/?([a-zA-Z0-9-_]+)' | |
| forms_match = re.search(forms_pattern, url) | |
| if forms_match: | |
| # For forms, we need to handle this differently | |
| # The user should provide the response sheet URL directly | |
| return None | |
| return None | |
| async def fetch_public_sheet(sheet_id: str, answer_key: Optional[dict] = None) -> dict: | |
| """ | |
| Fetch data from a PUBLIC Google Sheet (no OAuth needed). | |
| The sheet must be shared as "Anyone with the link can view". | |
| Uses the CSV export endpoint which works for public sheets. | |
| """ | |
| # Google Sheets public CSV export URL | |
| csv_url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv" | |
| try: | |
| async with httpx.AsyncClient(follow_redirects=True) as client: | |
| response = await client.get(csv_url, timeout=30.0) | |
| if response.status_code == 200: | |
| csv_content = response.text | |
| # Check if we got actual CSV data | |
| if csv_content.startswith('<!DOCTYPE') or '<html' in csv_content[:100].lower(): | |
| return { | |
| "error": "This sheet is not public. Please make it 'Anyone with the link can view' or use CSV export.", | |
| "needs_auth": False | |
| } | |
| # Parse the CSV | |
| result = parse_csv_responses(csv_content, answer_key) | |
| if "error" not in result: | |
| result["exam_title"] = f"Google Sheet {sheet_id[:8]}..." | |
| return result | |
| elif response.status_code == 404: | |
| return { | |
| "error": "Sheet not found. Please check the URL.", | |
| "needs_auth": False | |
| } | |
| else: | |
| return { | |
| "error": f"Could not access sheet (status {response.status_code}). Make sure it's set to 'Anyone with the link can view'.", | |
| "needs_auth": False | |
| } | |
| except httpx.TimeoutException: | |
| return { | |
| "error": "Request timed out. Please try again.", | |
| "needs_auth": False | |
| } | |
| except Exception as e: | |
| return { | |
| "error": f"Error fetching sheet: {str(e)}", | |
| "needs_auth": False | |
| } | |
| async def get_google_credentials(teacher_email: str) -> Optional[Credentials]: | |
| """Get valid Google credentials for a teacher.""" | |
| tokens = await get_oauth_tokens(teacher_email) | |
| if not tokens: | |
| return None | |
| settings = get_settings() | |
| credentials = Credentials( | |
| token=tokens.get("access_token"), | |
| refresh_token=tokens.get("refresh_token"), | |
| token_uri="https://oauth2.googleapis.com/token", | |
| client_id=settings.google_client_id, | |
| client_secret=settings.google_client_secret, | |
| scopes=GOOGLE_SCOPES, | |
| ) | |
| # Refresh if expired | |
| if credentials.expired and credentials.refresh_token: | |
| try: | |
| credentials.refresh(Request()) | |
| # Save updated tokens | |
| new_tokens = { | |
| "access_token": credentials.token, | |
| "refresh_token": credentials.refresh_token, | |
| } | |
| await save_oauth_tokens(teacher_email, new_tokens) | |
| except Exception as e: | |
| print(f"Error refreshing credentials: {e}") | |
| return None | |
| return credentials | |
| def normalize_question_type(header: str, sample_answers: list[str]) -> str: | |
| """Infer question type from header and sample answers.""" | |
| header_lower = header.lower() | |
| # Check for common patterns | |
| if any(word in header_lower for word in ['multiple choice', 'mcq', 'select']): | |
| return 'mcq' | |
| # Check if answers are numeric | |
| numeric_count = 0 | |
| for answer in sample_answers[:5]: # Check first 5 answers | |
| if answer: | |
| try: | |
| float(answer.replace(',', '')) | |
| numeric_count += 1 | |
| except ValueError: | |
| pass | |
| if numeric_count >= len([a for a in sample_answers[:5] if a]) * 0.8: | |
| return 'numeric' | |
| # Check for short answers (likely MCQ) vs long answers (open) | |
| avg_length = sum(len(a) for a in sample_answers if a) / max(len([a for a in sample_answers if a]), 1) | |
| if avg_length < 20: | |
| return 'mcq' | |
| return 'open' | |
| def extract_question_text(header: str) -> str: | |
| """Extract clean question text from a header.""" | |
| # Remove common prefixes like "Q1:", "1.", etc. | |
| cleaned = re.sub(r'^[Q]?\d+[\.\:\)]\s*', '', header) | |
| # Remove parenthetical scoring info like "(2 points)" | |
| cleaned = re.sub(r'\s*\(\d+\s*(?:points?|pts?|marks?)\)\s*$', '', cleaned, flags=re.IGNORECASE) | |
| return cleaned.strip() or header | |
| async def fetch_google_form_responses( | |
| google_form_url: str, | |
| teacher_email: str, | |
| answer_key: Optional[dict] = None | |
| ) -> dict: | |
| """ | |
| Fetch responses from a Google Form's response spreadsheet. | |
| Returns normalized exam JSON format: | |
| { | |
| "exam_title": str, | |
| "questions": [{"question_id", "question_text", "type", "choices", "correct_answer"}], | |
| "responses": [{"student_id", "student_name", "answers": {...}}] | |
| } | |
| """ | |
| credentials = await get_google_credentials(teacher_email) | |
| if not credentials: | |
| return { | |
| "error": "Not authorized. Please connect your Google account first.", | |
| "needs_auth": True | |
| } | |
| sheet_id = extract_sheet_id(google_form_url) | |
| if not sheet_id: | |
| return { | |
| "error": "Could not extract spreadsheet ID from URL. Please provide a valid Google Sheets response URL.", | |
| "needs_auth": False | |
| } | |
| try: | |
| # Build the Sheets API service | |
| service = build('sheets', 'v4', credentials=credentials) | |
| # Get spreadsheet metadata | |
| spreadsheet = service.spreadsheets().get(spreadsheetId=sheet_id).execute() | |
| title = spreadsheet.get('properties', {}).get('title', 'Untitled Exam') | |
| # Try to find "Form Responses 1" sheet, or use first sheet | |
| sheet_name = None | |
| for sheet in spreadsheet.get('sheets', []): | |
| props = sheet.get('properties', {}) | |
| name = props.get('title', '') | |
| if 'response' in name.lower() or 'form' in name.lower(): | |
| sheet_name = name | |
| break | |
| if not sheet_name: | |
| sheet_name = spreadsheet['sheets'][0]['properties']['title'] | |
| # Fetch all values | |
| result = service.spreadsheets().values().get( | |
| spreadsheetId=sheet_id, | |
| range=f"'{sheet_name}'!A:ZZ" | |
| ).execute() | |
| values = result.get('values', []) | |
| if not values or len(values) < 2: | |
| return { | |
| "error": "No responses found in the spreadsheet.", | |
| "needs_auth": False | |
| } | |
| # First row is headers | |
| headers = values[0] | |
| responses_data = values[1:] | |
| # Identify columns | |
| # Typically: Timestamp, Email, Name, Q1, Q2, ... | |
| timestamp_col = None | |
| email_col = None | |
| name_col = None | |
| question_cols = [] | |
| for idx, header in enumerate(headers): | |
| header_lower = header.lower() | |
| if 'timestamp' in header_lower or 'time' in header_lower: | |
| timestamp_col = idx | |
| elif 'email' in header_lower and email_col is None: | |
| email_col = idx | |
| elif any(word in header_lower for word in ['name', 'student', 'your name']): | |
| name_col = idx | |
| else: | |
| # This is likely a question | |
| question_cols.append((idx, header)) | |
| # Build questions list | |
| questions = [] | |
| for q_idx, (col_idx, header) in enumerate(question_cols): | |
| question_id = f"Q{q_idx + 1}" | |
| # Sample answers for type detection | |
| sample_answers = [row[col_idx] if col_idx < len(row) else "" for row in responses_data[:10]] | |
| question = { | |
| "question_id": question_id, | |
| "question_text": extract_question_text(header), | |
| "type": normalize_question_type(header, sample_answers), | |
| "choices": [], # Would need to parse from form structure | |
| "correct_answer": "" | |
| } | |
| # Apply answer key if provided | |
| if answer_key and question_id in answer_key: | |
| question["correct_answer"] = answer_key[question_id] | |
| questions.append(question) | |
| # Build responses list | |
| responses = [] | |
| for row_idx, row in enumerate(responses_data): | |
| student_id = f"S{row_idx + 1:02d}" | |
| # Get student name | |
| if name_col is not None and name_col < len(row): | |
| student_name = row[name_col] | |
| elif email_col is not None and email_col < len(row): | |
| # Use email prefix as name | |
| email = row[email_col] | |
| student_name = email.split('@')[0] if '@' in email else email | |
| else: | |
| student_name = f"Student {row_idx + 1}" | |
| # Get answers | |
| answers = {} | |
| for q_idx, (col_idx, _) in enumerate(question_cols): | |
| question_id = f"Q{q_idx + 1}" | |
| answer = row[col_idx] if col_idx < len(row) else "" | |
| answers[question_id] = answer | |
| responses.append({ | |
| "student_id": student_id, | |
| "student_name": student_name, | |
| "answers": answers | |
| }) | |
| return { | |
| "exam_title": title, | |
| "questions": questions, | |
| "responses": responses | |
| } | |
| except HttpError as e: | |
| if e.resp.status == 403: | |
| return { | |
| "error": "Access denied. Please ensure the spreadsheet is shared with your Google account.", | |
| "needs_auth": False | |
| } | |
| elif e.resp.status == 404: | |
| return { | |
| "error": "Spreadsheet not found. Please check the URL.", | |
| "needs_auth": False | |
| } | |
| else: | |
| return { | |
| "error": f"Google API error: {str(e)}", | |
| "needs_auth": False | |
| } | |
| except Exception as e: | |
| return { | |
| "error": f"Error fetching responses: {str(e)}", | |
| "needs_auth": False | |
| } | |
| def parse_csv_responses(csv_content: str, answer_key: Optional[dict] = None) -> dict: | |
| """ | |
| Parse CSV content into normalized exam format. | |
| Fallback when OAuth is not available. | |
| """ | |
| import csv | |
| from io import StringIO | |
| reader = csv.reader(StringIO(csv_content)) | |
| rows = list(reader) | |
| if not rows or len(rows) < 2: | |
| return {"error": "CSV must have at least a header row and one response."} | |
| headers = rows[0] | |
| responses_data = rows[1:] | |
| # Same logic as Google Sheets parsing | |
| timestamp_col = None | |
| email_col = None | |
| name_col = None | |
| question_cols = [] | |
| for idx, header in enumerate(headers): | |
| header_lower = header.lower() | |
| if 'timestamp' in header_lower: | |
| timestamp_col = idx | |
| elif 'email' in header_lower and email_col is None: | |
| email_col = idx | |
| elif any(word in header_lower for word in ['name', 'student']): | |
| name_col = idx | |
| else: | |
| question_cols.append((idx, header)) | |
| # Build questions | |
| questions = [] | |
| for q_idx, (col_idx, header) in enumerate(question_cols): | |
| question_id = f"Q{q_idx + 1}" | |
| sample_answers = [row[col_idx] if col_idx < len(row) else "" for row in responses_data[:10]] | |
| question = { | |
| "question_id": question_id, | |
| "question_text": extract_question_text(header), | |
| "type": normalize_question_type(header, sample_answers), | |
| "choices": [], | |
| "correct_answer": answer_key.get(question_id, "") if answer_key else "" | |
| } | |
| questions.append(question) | |
| # Build responses | |
| responses = [] | |
| for row_idx, row in enumerate(responses_data): | |
| student_id = f"S{row_idx + 1:02d}" | |
| if name_col is not None and name_col < len(row): | |
| student_name = row[name_col] | |
| elif email_col is not None and email_col < len(row): | |
| email = row[email_col] | |
| student_name = email.split('@')[0] if '@' in email else email | |
| else: | |
| student_name = f"Student {row_idx + 1}" | |
| answers = {} | |
| for q_idx, (col_idx, _) in enumerate(question_cols): | |
| question_id = f"Q{q_idx + 1}" | |
| answer = row[col_idx] if col_idx < len(row) else "" | |
| answers[question_id] = answer | |
| responses.append({ | |
| "student_id": student_id, | |
| "student_name": student_name, | |
| "answers": answers | |
| }) | |
| return { | |
| "exam_title": "Uploaded Exam", | |
| "questions": questions, | |
| "responses": responses | |
| } | |