Spaces:
Sleeping
Sleeping
| """Round 2 task generation and distribution script.""" | |
import json
from datetime import datetime, timezone
from typing import Optional

import httpx

from instructor.database import Database
from instructor.task_templates import TaskTemplateManager
from shared.config import settings
from shared.logger import setup_logger
from shared.models import Attachment, TaskRequest
from shared.utils import generate_nonce
# Module-wide logger, created by the project's setup_logger helper and named
# after this module.
logger = setup_logger(__name__)
class Round2TaskGenerator:
    """Generate and send round 2 tasks to students.

    For each round 1 repo submission this builds a round 2 ``TaskRequest``
    from the template of the matching round 1 task, POSTs it to the endpoint
    stored on that round 1 task, and saves a task record together with the
    HTTP status code of the delivery attempt.
    """

    def __init__(self) -> None:
        """Initialize task generator."""
        self.db = Database()
        self.template_manager = TaskTemplateManager()

    def get_round1_repos(self) -> list[dict]:
        """Get all round 1 repository submissions.

        Returns:
            List of repo dictionaries, one ``to_dict()`` result per repo row
            whose ``round`` is 1.
        """
        session = self.db.get_session()
        try:
            round1_repos = session.query(self.db.Repo).filter_by(round=1).all()
            # Convert to plain dicts before the session is closed.
            repos = [repo.to_dict() for repo in round1_repos]
        finally:
            session.close()

        logger.info(f"Found {len(repos)} round 1 repos")
        return repos

    def generate_round2_task(self, repo: dict) -> Optional[tuple[TaskRequest, dict]]:
        """Generate round 2 task for a repo.

        Args:
            repo: Round 1 repo submission; must contain ``"email"`` and
                ``"task"`` keys.

        Returns:
            Tuple of (task_request, submission_data), where submission_data
            holds the ``"endpoint"`` and ``"secret"`` of the round 1 task.
            ``None`` if a round 2 task already exists, if no round 1 task is
            found, or if generation fails (the error is logged).

        Raises:
            KeyError: If ``repo`` lacks the ``"email"`` or ``"task"`` key.
        """
        email = repo["email"]
        task_id = repo["task"]

        # Check if round 2 task already exists
        if self.db.task_exists(email, task_id, round=2):
            logger.info(f"Round 2 task already exists for {email}/{task_id}, skipping")
            return None

        # Get the round 1 task to find the template
        session = self.db.get_session()
        try:
            round1_task = (
                session.query(self.db.Task)
                .filter_by(email=email, task=task_id, round=1)
                .first()
            )
            if not round1_task:
                logger.warning(f"No round 1 task found for {email}/{task_id}")
                return None

            # Extract template ID from task ID: everything before the last
            # "-" (the whole ID when it contains no "-").
            template_id = task_id.rsplit("-", 1)[0]

            # Generate round 2 task from same template
            task_data = self.template_manager.generate_task(
                email, template_id=template_id, round_num=2
            )

            # Create task request
            nonce = generate_nonce()
            attachments = [Attachment(**att) for att in task_data["attachments"]]
            task_request = TaskRequest(
                email=email,
                secret=round1_task.secret,
                task=task_id,  # Keep same task ID
                round=2,
                nonce=nonce,
                brief=task_data["brief"],
                checks=task_data["checks"],
                evaluation_url=settings.evaluation_api_url,
                attachments=attachments,
            )
            submission_data = {
                "endpoint": round1_task.endpoint,
                "secret": round1_task.secret,
            }

            logger.info(f"Generated round 2 task for {email}: {task_id}")
            return task_request, submission_data
        except Exception as e:
            # Best-effort: one bad repo must not stop the batch. Keep the
            # traceback so the swallowed failure can still be diagnosed.
            logger.error(
                f"Error generating round 2 task for {email}/{task_id}: {e}",
                exc_info=True,
            )
            return None
        finally:
            session.close()

    async def send_task_request(
        self, task_request: TaskRequest, submission: dict
    ) -> int:
        """Send task request to student endpoint.

        Args:
            task_request: Task request to send
            submission: Submission data with endpoint

        Returns:
            HTTP status code of the response, or 0 if the request could not
            be completed (the error is logged, not raised).
        """
        endpoint = submission["endpoint"]
        logger.info(f"Sending round 2 task to {endpoint}")

        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    endpoint,
                    json=task_request.model_dump(),
                    headers={"Content-Type": "application/json"},
                )
                status_code = response.status_code
                # Only the first 200 characters of the response body are logged.
                logger.info(
                    f"Round 2 task sent to {endpoint}: "
                    f"status {status_code}, response: {response.text[:200]}"
                )
                return status_code
        except Exception as e:
            logger.error(f"Failed to send round 2 task to {endpoint}: {e}")
            return 0

    def save_task_record(
        self, task_request: TaskRequest, submission: dict, status_code: int
    ) -> None:
        """Save task record to database.

        Args:
            task_request: Task request
            submission: Submission data
            status_code: HTTP status code from response
        """
        task_data = {
            # Naive UTC timestamp, the same value datetime.utcnow() produced,
            # obtained without the API deprecated since Python 3.12.
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None),
            "email": task_request.email,
            "task": task_request.task,
            "round": task_request.round,
            "nonce": task_request.nonce,
            "brief": task_request.brief,
            "attachments": json.dumps(
                [att.model_dump() for att in task_request.attachments]
            ),
            "checks": json.dumps(task_request.checks),
            "evaluation_url": task_request.evaluation_url,
            "endpoint": submission["endpoint"],
            "statuscode": status_code,
            "secret": submission["secret"],
        }
        self.db.add_task(task_data)
        logger.info(f"Saved round 2 task record: {task_request.task}")

    async def process_repo(self, repo: dict) -> None:
        """Process a single repo for round 2.

        Generates the task, sends it, and saves a record of the attempt. The
        record is saved whatever the status code is. Any exception is logged
        with its traceback and not re-raised.

        Args:
            repo: Repo submission data
        """
        try:
            # Generate task
            result = self.generate_round2_task(repo)
            if result is None:
                return  # Already processed or error

            task_request, submission = result

            # Send task
            status_code = await self.send_task_request(task_request, submission)

            # Save record
            self.save_task_record(task_request, submission, status_code)

            if status_code == 200:
                logger.info(f"Successfully sent round 2 task to {repo['email']}")
            else:
                logger.warning(
                    f"Failed to send round 2 task to {repo['email']}: status {status_code}"
                )
        except Exception as e:
            # Use .get(): the failure may itself be a missing key, and a
            # KeyError raised here would escape and abort the caller's loop.
            logger.error(
                f"Error processing repo {repo.get('task')}: {e}", exc_info=True
            )

    async def run(self) -> None:
        """Run round 2 task generation.

        Loads the round 1 repos and processes them one at a time, in order.
        """
        logger.info("Starting round 2 task generation")

        # Get round 1 repos
        repos = self.get_round1_repos()
        if not repos:
            logger.error("No round 1 repos to process")
            return

        # Process each repo
        for repo in repos:
            await self.process_repo(repo)

        logger.info("Round 2 task generation complete")
async def main():
    """Create a ``Round2TaskGenerator`` and await its ``run()`` coroutine."""
    await Round2TaskGenerator().run()
# Script entry point: only when executed directly, run main() via asyncio.run.
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())