Spaces:
Sleeping
Sleeping
| import os, json, re, random | |
| import uuid | |
| import time | |
| import logging | |
| from typing import Literal, List, Dict, Any, Optional | |
| from pydantic import BaseModel, Field, ValidationError | |
| from crewai import Agent, Task, Crew, Process | |
| from crewai.tools import tool | |
| from crewai.llm import LLM | |
| import dotenv | |
| dotenv.load_dotenv("api_key.env") | |
| # ============================================================ | |
| # Guardrails: logging, retries, deterministic config | |
| # ============================================================ | |
# Root logging setup shared by every tool call in this module.
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("smart_tutor_guardrails")

# Both knobs can be overridden through environment variables of the same name.
# DETERMINISTIC_TEMPERATURE is handed to the LLM; TOOL_MAX_RETRIES is the
# number of extra attempts safe_tool_call makes after the first one fails.
DETERMINISTIC_TEMPERATURE = float(os.environ.get("DETERMINISTIC_TEMPERATURE", "0.1"))
TOOL_MAX_RETRIES = int(os.environ.get("TOOL_MAX_RETRIES", "2"))
| # ============================================================ | |
| # Guardrails: rate limits / timeouts / policies | |
| # ============================================================ | |
# Resource limits enforced by extract_text; each is overridable via an
# environment variable of the same name.
MAX_FILE_SIZE_MB = int(os.environ.get("MAX_FILE_SIZE_MB", "500"))
MAX_PDF_PAGES = int(os.environ.get("MAX_PDF_PAGES", "2000"))
PDF_EXTRACTION_TIMEOUT = float(os.environ.get("PDF_EXTRACTION_TIMEOUT", "200"))  # seconds

# Only these tool names may be executed through safe_tool_call.
ALLOWED_TOOLS = {
    "process_file",
    "store_quiz",
    "grade_quiz",
}

# Lower-case substrings that detect_prompt_injection looks for in document text.
PROMPT_INJECTION_PATTERNS = [
    "ignore previous instructions",
    "ignore all previous instructions",
    "system:",
    "assistant:",
    "developer:",
    "act as",
    "you must",
    "follow these instructions",
    "override",
]
| # ============================================================ | |
| # Helpers | |
| # ============================================================ | |
def clean_text(text: str) -> str:
    """Normalize whitespace in raw extracted text.

    NUL characters become spaces, runs of spaces/tabs collapse to one space,
    three or more consecutive newlines shrink to exactly two, and the result
    is stripped at both ends.
    """
    without_nul = text.replace("\x00", " ")
    single_spaced = re.sub(r"[ \t]+", " ", without_nul)
    return re.sub(r"\n{3,}", "\n\n", single_spaced).strip()
def detect_prompt_injection(text: str) -> bool:
    """Return True when *text* contains any known injection phrase.

    Matching is a case-insensitive substring test against
    PROMPT_INJECTION_PATTERNS.
    """
    haystack = text.lower()
    for pattern in PROMPT_INJECTION_PATTERNS:
        if pattern in haystack:
            return True
    return False
def chunk_text(text: str, max_chars: int = 1200, overlap: int = 150) -> List[str]:
    """Split *text* into overlapping character windows.

    The text is normalized with clean_text first. Each chunk holds at most
    ``max_chars`` characters and starts ``overlap`` characters before the end
    of the previous chunk. Chunks that are empty after stripping are dropped.

    Args:
        text: Raw text to split.
        max_chars: Maximum window size in characters; must be positive.
        overlap: Characters shared between consecutive windows; must be
            smaller than ``max_chars``.

    Returns:
        The list of non-empty chunks, in document order.

    Raises:
        ValueError: If the window parameters cannot make forward progress.
    """
    # Without these guards the window start never advances and the loop
    # below would append chunks forever on any text longer than max_chars.
    if max_chars <= 0:
        raise ValueError("max_chars must be a positive integer")
    if overlap >= max_chars:
        raise ValueError("overlap must be smaller than max_chars")

    text = clean_text(text)
    if not text:
        return []

    chunks = []
    start = 0
    n = len(text)
    while start < n:
        end = min(start + max_chars, n)
        part = text[start:end].strip()
        if part:
            chunks.append(part)
        if end == n:
            break
        # Step back by `overlap` so neighbouring chunks share some context.
        start = max(0, end - overlap)
    return chunks
def keyword_retrieve(chunks: List[str], query: str, top_k: int) -> List[str]:
    """Rank *chunks* by how many query terms they contain and keep the best.

    Query terms are the ``\\w+`` tokens of the lower-cased query longer than
    two characters. A chunk scores one point per term found as a substring.
    Ties keep their original order. Empty chunks are removed after the
    ``top_k`` cut, so fewer than ``top_k`` items may be returned.
    """
    terms = [word for word in re.findall(r"\w+", query.lower()) if len(word) > 2]

    def hits(chunk: str) -> int:
        lowered = chunk.lower()
        return len([term for term in terms if term in lowered])

    # Ascending sort on the negated score is a stable "best first" ordering.
    best_first = sorted(chunks, key=lambda chunk: -hits(chunk))
    return [chunk for chunk in best_first[:top_k] if chunk]
| # ============================================================ | |
| # File extraction with limits + timeout | |
| # ============================================================ | |
def extract_text(file_path: str) -> str:
    """Return the text content of a ``.txt`` or ``.pdf`` file.

    Args:
        file_path: Path of the file to read.

    Returns:
        The raw text for ``.txt`` files, or the cleaned per-page text joined
        by blank lines for ``.pdf`` files.

    Raises:
        ValueError: If the file exceeds MAX_FILE_SIZE_MB, the PDF has more
            than MAX_PDF_PAGES pages, or the extension is not supported.
        TimeoutError: If PDF extraction runs longer than
            PDF_EXTRACTION_TIMEOUT seconds.
        OSError: If the file cannot be accessed.
    """
    if os.path.getsize(file_path) > MAX_FILE_SIZE_MB * 1024 * 1024:
        raise ValueError(f"File too large (> {MAX_FILE_SIZE_MB} MB)")

    ext = os.path.splitext(file_path)[1].lower()
    if ext == ".txt":
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()

    if ext == ".pdf":
        import fitz  # PyMuPDF

        # Monotonic clock: the timeout must not be skewed by wall-clock changes.
        started = time.monotonic()
        doc = fitz.open(file_path)
        try:
            if len(doc) > MAX_PDF_PAGES:
                raise ValueError(f"PDF exceeds max page limit ({MAX_PDF_PAGES})")
            parts = []
            for page in doc:
                if time.monotonic() - started > PDF_EXTRACTION_TIMEOUT:
                    raise TimeoutError("PDF extraction timeout")
                page_text = clean_text(page.get_text("text") or "")
                if page_text:
                    parts.append(page_text)
        finally:
            # Release the file handle on success and on every error path.
            doc.close()
        return "\n\n".join(parts).strip()

    raise ValueError("Unsupported file type (PDF/TXT only).")
| # ============================================================ | |
| # Schemas (Structured Inputs / Outputs) | |
| # ============================================================ | |
class ProcessArgs(BaseModel):
    """Validated arguments for the process_file tool."""

    file_path: str = Field(
        ...,
        description="Local path to PDF/TXT",
    )
    query: str = Field(
        ...,
        description="User question or instruction",
    )
    mode: Literal["summarize", "quiz", "explain"] = Field(
        ...,
        description="Task type",
    )
    # Number of retrieved chunks, bounded to 1..15.
    top_k: int = Field(
        default=6,
        ge=1,
        le=15,
        description="How many chunks to use as context",
    )
class QuizQuestion(BaseModel):
    """One multiple-choice question, including its answer key."""

    # Identifier used to match submitted answers to questions when grading.
    qid: str
    question: str
    # Option letter -> option text; only the letters A-D are accepted.
    options: Dict[Literal["A", "B", "C", "D"], str]
    # Letter of the correct option.
    correct: Literal["A", "B", "C", "D"]
    explanation: str = ""
    supporting_context: str = ""
class StoreQuizArgs(BaseModel):
    """Validated payload for the store_quiz tool."""

    file_path: str = Field(
        ...,
        description="The absolute file path of the document used",
    )
    # Full questions, answer keys included.
    questions: List[QuizQuestion]
class GradeQuizArgs(BaseModel):
    """Validated arguments for the grade_quiz tool."""

    quiz_id: str
    # Question id -> chosen option letter.
    answers: Dict[str, Literal["A", "B", "C", "D"]]
class ToolError(BaseModel):
    """Structured error payload returned by the tools instead of raising."""

    # Short human-readable message.
    error: str
    # Optional extra information (validation errors, exception type name, ...).
    details: Optional[Any] = None
class ProcessFileResult(BaseModel):
    """Successful result of the process_file tool."""

    mode: str
    query: str
    # Retrieved chunks, best match first.
    context_chunks: List[str]
    # Extraction statistics (chunk count, character count, top_k used).
    stats: Dict[str, Any]
class StoreQuizResult(BaseModel):
    """Successful result of the store_quiz tool."""

    quiz_id: str
    # Masked questions: qid, question text and options, without the answer key.
    questions: List[Dict[str, Any]]
class GradeQuizResult(BaseModel):
    """Successful result of the grade_quiz tool."""

    quiz_id: str
    # Number of correctly answered questions.
    score: int
    # Number of questions in the stored quiz.
    total: int
    # score / total as a percentage, rounded to two decimals.
    percentage: float
    # Source document recorded when the quiz was stored, if any.
    file_path: Optional[str] = None
    # One entry per question with the submitted and the correct answer.
    details: List[Dict[str, Any]]
| # ============================================================ | |
| # Memory/State with Persistence | |
| # ============================================================ | |
QUIZ_FILE = "quizzes_db.json"


def load_quizzes():
    """Load the persisted quiz store from QUIZ_FILE.

    Returns:
        The decoded JSON object. An empty dict is returned when the file
        does not exist, cannot be read, is not valid JSON, or does not hold
        a JSON object. Every case except a missing file is logged, so a
        corrupt store is not silently mistaken for an empty one.
    """
    try:
        with open(QUIZ_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing has been stored yet; this is the normal first-run case.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(f"Could not load quizzes from {QUIZ_FILE}: {e}")
        return {}
    if not isinstance(data, dict):
        logger.warning(f"Ignoring {QUIZ_FILE}: top-level JSON value is not an object")
        return {}
    return data
def save_quizzes(data):
    """Persist *data* to QUIZ_FILE as pretty-printed UTF-8 JSON.

    The JSON is written to a sibling temporary file and then moved over
    QUIZ_FILE, so an interrupted write cannot leave a truncated store behind.
    Saving is best-effort: failures are logged, never raised.

    Args:
        data: JSON-serializable quiz store.
    """
    tmp_path = f"{QUIZ_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, QUIZ_FILE)
    except Exception as e:
        logger.error(f"Failed to save quizzes: {e}")
        # Do not leave a half-written temporary file lying around.
        try:
            os.remove(tmp_path)
        except OSError:
            pass


# In-memory quiz store, seeded from disk at import time.
QUIZ_STORE: Dict[str, Dict[str, Any]] = load_quizzes()
| # ============================================================ | |
| # Tool wrapper: retries + logs + redaction | |
| # ============================================================ | |
def _redact(obj: Any) -> Any:
    """Return a copy of *obj* that is safe to log.

    Dict values stored under credential-like keys or under the quiz answer
    key ``correct`` are replaced by ``"***"``; dicts and lists are processed
    recursively; occurrences of the OPENAI_API_KEY value inside strings are
    masked. Any other value is returned unchanged. If redaction itself fails
    the placeholder ``"<redacted>"`` is returned.
    """
    secret_keys = {"openai_api_key", "api_key", "authorization", "x-api-key"}
    try:
        if isinstance(obj, dict):
            cleaned = {}
            for name, value in obj.items():
                lowered = str(name).lower()
                hidden = lowered in secret_keys or lowered == "correct"
                cleaned[name] = "***" if hidden else _redact(value)
            return cleaned
        if isinstance(obj, list):
            return list(map(_redact, obj))
        if isinstance(obj, str):
            api_key = os.getenv("OPENAI_API_KEY") or ""
            return obj.replace(api_key, "***") if api_key and api_key in obj else obj
        return obj
    except Exception:
        return "<redacted>"
def safe_tool_call(tool_name: str, fn):
    """Run ``fn()`` under the tool policy with retries and redacted logging.

    Args:
        tool_name: Name of the tool; must be listed in ALLOWED_TOOLS.
        fn: Zero-argument callable that performs the tool's work.

    Returns:
        Whatever ``fn()`` returns on the first successful attempt.

    Raises:
        RuntimeError: If ``tool_name`` is not allowed by policy.
        Exception: The error of the last attempt when every attempt fails.
    """
    if tool_name not in ALLOWED_TOOLS:
        raise RuntimeError("Tool not allowed by policy")

    # Always try at least once, even if the retry setting is negative.
    total_attempts = max(1, TOOL_MAX_RETRIES + 1)
    for attempt in range(1, total_attempts + 1):
        logger.info(f"[TOOL_CALL] {tool_name} attempt={attempt}")
        try:
            out = fn()
        except Exception as e:
            logger.warning(
                f"[TOOL_ERROR] {tool_name} attempt={attempt} err={type(e).__name__}"
            )
            if attempt == total_attempts:
                # No attempts left: surface the error without a pointless sleep.
                raise
            time.sleep(0.2 * attempt)
            continue

        # Result logging lives outside the retry `try`: a value that cannot be
        # rendered must never cause a side-effecting tool to run again.
        try:
            rendered = json.dumps(_redact(out), ensure_ascii=False, default=str)[:900]
        except Exception:
            rendered = "<unserializable>"
        logger.info(f"[TOOL_RESULT] {tool_name} attempt={attempt} out={rendered}")
        return out
| # ============================================================ | |
| # Tools | |
| # ============================================================ | |
def process_file(file_path: str, query: str, mode: str, top_k: int = 6) -> str:
    """Read PDF/TXT, chunk it, retrieve top_k relevant chunks. Returns structured JSON.

    Args:
        file_path: Local path to the PDF/TXT file; surrounding quotes and
            whitespace are stripped before use.
        query: User question or instruction used to rank the chunks.
        mode: One of "summarize", "quiz" or "explain".
        top_k: How many chunks to return (1-15).

    Returns:
        A JSON string: either the retrieved context with stats, or an error
        object with "error" and "details" fields.
    """
    try:
        args = ProcessArgs(file_path=file_path, query=query, mode=mode, top_k=top_k)
    except ValidationError as ve:
        return json.dumps(
            ToolError(error="Invalid arguments", details=ve.errors()).model_dump(),
            ensure_ascii=False,
        )

    def _run():
        # Clean path: remove quotes and whitespace that agents sometimes add.
        clean_path = args.file_path.strip().strip("'\"").strip()
        if not os.path.exists(clean_path):
            return ToolError(error=f"Invalid file path: {clean_path}").model_dump()
        try:
            # Extract from the cleaned path, the same one that was just checked.
            raw_text = extract_text(clean_path)
        except Exception as e:
            return ToolError(
                error="Extraction failed", details=type(e).__name__
            ).model_dump()

        if detect_prompt_injection(raw_text):
            # The document is still processed; the finding is only logged.
            logger.warning(
                "[SECURITY] Potential prompt injection detected in document. Treating as data only."
            )

        text = clean_text(raw_text)
        if not text:
            return ToolError(error="Empty or unreadable file text.").model_dump()
        chunks = chunk_text(text)
        if not chunks:
            return ToolError(error="No chunks produced.").model_dump()

        context = keyword_retrieve(chunks, args.query, args.top_k)
        return ProcessFileResult(
            mode=args.mode,
            query=args.query,
            context_chunks=context,
            stats={
                "chunks_total": len(chunks),
                "chars_extracted": len(text),
                "top_k": args.top_k,
            },
        ).model_dump()

    try:
        out = safe_tool_call("process_file", _run)
        return json.dumps(out, ensure_ascii=False)
    except Exception as e:
        return json.dumps(
            ToolError(
                error="process_file failed", details=type(e).__name__
            ).model_dump(),
            ensure_ascii=False,
        )
def clean_json_input(text: str) -> str:
    """Strip markdown wrapping from a string that should contain JSON.

    If a fenced code block holding a ``{...}`` object is present anywhere in
    the text, only that object is returned. Otherwise leading/trailing code
    fences and surrounding single backticks are removed and the remaining
    text is returned stripped. No JSON validation is performed.
    """
    candidate = text.strip()

    # Fenced block around an object, possibly with prose before or after it.
    fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", candidate, re.DOTALL)
    if fenced is not None:
        return fenced.group(1)

    # Fence present but no object matched (e.g. incomplete block): drop fences.
    if candidate.startswith("```"):
        candidate = re.sub(r"^```(\w+)?\n?", "", candidate)
        candidate = re.sub(r"\n?```$", "", candidate)

    # Inline-code style wrapping with single backticks.
    if candidate.startswith("`") and candidate.endswith("`"):
        candidate = candidate.strip("`")

    return candidate.strip()
def store_quiz(quiz_package_json: str) -> str:
    """Store quiz with hidden answers; return masked quiz (no correct answers).

    Args:
        quiz_package_json: JSON object with "file_path" and "questions"; it
            may be wrapped in markdown code fences or surrounding prose.

    Returns:
        A JSON string: either the new quiz_id with the masked questions, or
        an error object with "error" and "details" fields.
    """

    def _run():
        try:
            # First try: parse after stripping markdown wrapping.
            pkg_raw = json.loads(clean_json_input(quiz_package_json))
        except json.JSONDecodeError:
            # Second try: greedy, dot-all match from first "{" to last "}".
            match = re.search(r"(\{.*\})", quiz_package_json, re.DOTALL)
            if not match:
                return ToolError(
                    error="quiz_package_json is not valid JSON (no braces found)",
                    details=f"Input fragment: {quiz_package_json[:200]}...",
                ).model_dump()
            try:
                pkg_raw = json.loads(match.group(1))
            except json.JSONDecodeError as e:
                return ToolError(
                    error=f"quiz_package_json is not valid JSON. Parse error: {str(e)}",
                    details=f"Input fragment: {quiz_package_json[:200]}...",
                ).model_dump()

        # Valid JSON that is not an object (list, string, number) cannot be
        # unpacked into the schema; report it instead of raising TypeError.
        if not isinstance(pkg_raw, dict):
            return ToolError(
                error="Invalid quiz_package_json",
                details="Top-level JSON value must be an object",
            ).model_dump()

        try:
            pkg = StoreQuizArgs(**pkg_raw)
        except ValidationError as ve:
            return ToolError(
                error="Invalid quiz_package_json", details=ve.errors()
            ).model_dump()

        # Reject an answer key that points at a missing option before anything
        # is stored; the schema alone does not guarantee this.
        for q in pkg.questions:
            if q.correct not in q.options:
                return ToolError(
                    error="Invalid quiz_package_json",
                    details=(
                        f"Question {q.qid!r}: correct answer {q.correct!r} "
                        "is not one of its options"
                    ),
                ).model_dump()

        quiz_id = str(uuid.uuid4())
        keys = ["A", "B", "C", "D"]
        final_questions = []
        for q in pkg.questions:
            correct_text = q.options[q.correct]

            # Shuffle the option texts, then re-letter them from "A" onwards.
            option_texts = list(q.options.values())
            random.shuffle(option_texts)
            new_options = dict(zip(keys, option_texts))
            new_correct_key = next(
                key for key, text in new_options.items() if text == correct_text
            )

            q_dump = q.model_dump()
            q_dump["options"] = new_options
            q_dump["correct"] = new_correct_key
            final_questions.append(q_dump)

        QUIZ_STORE[quiz_id] = {
            "file_path": pkg.file_path,
            "questions": final_questions,
        }
        save_quizzes(QUIZ_STORE)

        # The caller only ever sees the masked form, without the answer key.
        masked = [
            {"qid": q["qid"], "question": q["question"], "options": q["options"]}
            for q in final_questions
        ]
        return StoreQuizResult(quiz_id=quiz_id, questions=masked).model_dump()

    try:
        out = safe_tool_call("store_quiz", _run)
        return json.dumps(out, ensure_ascii=False)
    except Exception as e:
        return json.dumps(
            ToolError(error="store_quiz failed", details=type(e).__name__).model_dump(),
            ensure_ascii=False,
        )
def grade_quiz(quiz_id: str, answers_json: str) -> str:
    """Grade quiz answers by quiz_id and answers_json. Returns score + details as structured JSON.
    Also returns 'file_path' and 'question' text for further processing.

    Args:
        quiz_id: Identifier returned by store_quiz.
        answers_json: JSON object mapping question id to the chosen letter
            (A-D); letters may be lower-case or padded with whitespace.

    Returns:
        A JSON string: either the grading result, or an error object with
        "error" and "details" fields.
    """

    def _run():
        if quiz_id not in QUIZ_STORE:
            return ToolError(error="Unknown quiz_id.").model_dump()

        try:
            submitted_raw = json.loads(clean_json_input(answers_json))
        except json.JSONDecodeError:
            # Fallback: greedy, dot-all match from first "{" to last "}".
            match = re.search(r"(\{.*\})", answers_json, re.DOTALL)
            if not match:
                return ToolError(error="answers_json is not valid JSON").model_dump()
            try:
                submitted_raw = json.loads(match.group(1))
            except json.JSONDecodeError:
                return ToolError(error="answers_json is not valid JSON").model_dump()

        # Normalize before validation: the schema only accepts the exact
        # letters A-D, so "a" or " B " would otherwise be rejected outright.
        if isinstance(submitted_raw, dict):
            submitted_raw = {
                str(key).strip(): value.strip().upper() if isinstance(value, str) else value
                for key, value in submitted_raw.items()
            }

        try:
            args = GradeQuizArgs(quiz_id=quiz_id, answers=submitted_raw)
        except ValidationError as ve:
            return ToolError(
                error="Invalid answers_json", details=ve.errors()
            ).model_dump()

        stored_data = QUIZ_STORE[args.quiz_id]
        questions = stored_data["questions"]
        file_path = stored_data.get("file_path")

        total = len(questions)
        score = 0
        details = []
        for q in questions:
            qid = q["qid"]
            correct = q["correct"]
            # Unanswered questions are graded as an empty, incorrect answer.
            your = args.answers.get(qid) or ""
            is_correct = your == correct
            score += 1 if is_correct else 0
            details.append(
                {
                    "qid": qid,
                    "question": q.get("question", ""),  # context for the agent
                    "is_correct": is_correct,
                    "your_answer": your,
                    "correct_answer": correct,  # NOTE: returned to tutor; OK for feedback
                    "explanation": q.get("explanation", "") or "",
                    "supporting_context": q.get("supporting_context", "") or "",
                }
            )

        percentage = round((score / total) * 100, 2) if total else 0.0
        return GradeQuizResult(
            quiz_id=args.quiz_id,
            score=score,
            total=total,
            percentage=percentage,
            file_path=file_path,
            details=details,
        ).model_dump()

    try:
        out = safe_tool_call("grade_quiz", _run)
        return json.dumps(out, ensure_ascii=False)
    except Exception as e:
        return json.dumps(
            ToolError(error="grade_quiz failed", details=type(e).__name__).model_dump(),
            ensure_ascii=False,
        )
| # ============================================================ | |
| # CrewAI setup | |
| # ============================================================ | |
# Single LLM handle shared by every agent below. The API key is read from the
# environment and the temperature comes from the guardrail configuration.
llm = LLM(
    model="gpt-4o-mini",
    temperature=DETERMINISTIC_TEMPERATURE,
    api_key=os.getenv("OPENAI_API_KEY"),
)
# Routing agent: owns no tools and is told to delegate every request.
manager = Agent(
    role="Manager (Router)",
    goal=(
        "Route user request to the correct specialist co-worker."
        " Pass ALL user constraints (line count, "
        "paragraph count, language, etc.) to the specialist."
    ),
    backstory=(
        "You are a routing agent. You HAVE specialist co-workers: "
        "Summarizer, Quiz Maker, and Tutor. "
        "Your ONLY job is to delegate the task to the right co-worker "
        "using your delegation tool. "
        "NEVER answer the user yourself. NEVER use internal knowledge. "
        "Always forward the FULL user request including any constraints."
    ),
    llm=llm,
    allow_delegation=True,
    verbose=True,
)
# Summarization specialist, grounded in the chunks returned by process_file.
summarizer = Agent(
    role="Summarizer",
    goal=(
        "Produce a summary grounded strictly in "
        "context_chunks from process_file. STRICTLY "
        "follow any user constraints on length, "
        "number of lines, paragraphs, or format."
    ),
    backstory=(
        "Call process_file(mode=summarize) first. "
        "Summarize ONLY from context_chunks. "
        "If the user specifies constraints like "
        "'3 lines', '2 paragraphs', 'short', or "
        "'detailed', you MUST follow them exactly. "
        "Use bullet points (- or *) for lists instead of numbering. "
        "No outside knowledge."
    ),
    # CrewAI agents take tool objects rather than bare functions, so the
    # function is wrapped here with the imported `tool` decorator. Wrapping at
    # the call site keeps process_file itself directly callable.
    tools=[tool(process_file)],
    llm=llm,
    verbose=True,
)
# Quiz-generation specialist: retrieves context, writes MCQs, stores them.
quizzer = Agent(
    role="Quiz Maker",
    goal="Generate EXACTLY the number of multiple-choice questions requested by the user, grounded strictly in process_file context.",
    backstory=(
        "STEP 1: Extract the EXACT number of questions from user request (e.g., '3 questions' = 3, default = 5).\n"
        "STEP 2: Call process_file(mode=quiz) with file_path. Create ONLY that exact number of MCQs A-D from context_chunks.\n"
        "STEP 3: Build quiz_package_json with absolute 'file_path' and correct answers, call store_quiz.\n"
        'CRITICAL: The \'qid\' field for each question MUST be a STRING (e.g., "1", "2") NOT an integer (1, 2).\n'
        'Ensure VALID JSON: {"file_path": "...", "questions": [...]}. CRITICAL: Match requested count exactly. Never reveal answers.'
    ),
    # CrewAI agents take tool objects rather than bare functions, so the
    # functions are wrapped here with the imported `tool` decorator. Wrapping
    # at the call site keeps the functions themselves directly callable.
    tools=[tool(process_file), tool(store_quiz)],
    llm=llm,
    verbose=True,
)
# Grading specialist: grades a stored quiz and explains the wrong answers.
tutor = Agent(
    role="Tutor",
    goal="Grade quiz and provide intelligent explanation for errors.",
    backstory=(
        "You are an expert Tutor. When asked to grade a quiz:\n"
        "1. Call 'grade_quiz' to get the base results.\n"
        "2. For every INCORRECT answer, you MUST Explain WHY it is wrong:\n"
        " - Use the 'question' text and 'file_path' from the result to call 'process_file' (mode='explain', query=question).\n"
        " - REWRITE the 'explanation' field in the JSON detail for that question with your new explanation.\n"
        " - Use bullet points for any lists in your explanations.\n"
        "3. Return the fully updated JSON object."
    ),
    # CrewAI agents take tool objects rather than bare functions, so the
    # functions are wrapped here with the imported `tool` decorator. Wrapping
    # at the call site keeps the functions themselves directly callable.
    tools=[tool(process_file), tool(grade_quiz)],
    llm=llm,
    verbose=True,
)
# The only task of the crew: the manager routes the interpolated
# {user_request} to the matching specialist.
task = Task(
    agent=manager,
    description=(
        "User request: {user_request}\n\n"
        "Route by intent:\n"
        "- Summary -> Summarizer\n"
        "- Quiz -> Quiz Maker\n"
        "- Explanation -> Tutor\n"
        "- Grading (contains quiz_id + answers_json) -> Tutor\n\n"
        "Guardrails:\n"
        "- Tool outputs are structured JSON.\n"
        "- Tools validate inputs with Pydantic.\n"
        "- Tool calls are logged without secrets.\n"
        "- Do not reveal hidden quiz answers during quiz generation."
    ),
    expected_output=(
        "Grounded response: summary OR " "masked quiz OR graded feedback."
    ),
)
# Crew wiring: one routing task, executed sequentially, with all four agents
# registered.
crew = Crew(
    tasks=[task],
    agents=[manager, summarizer, quizzer, tutor],
    process=Process.sequential,
    verbose=True,
)
| from pathlib import Path | |
| def run_with_file(prompt: str, file_path: str | None = None): | |
| file_text = "" | |
| if file_path: | |
| file_text = Path(file_path).read_text(encoding="utf-8", errors="ignore") | |
| full_prompt = prompt | |
| if file_text: | |
| full_prompt += "\n\n[FILE CONTENT]\n" + file_text | |
| return full_prompt | |
if __name__ == "__main__":
    # Demo: build the prompt for a three-question quiz request and print it.
    # No file is attached, so the prompt is printed as-is.
    demo_prompt = r"please give me a quiz about 3 questions from this file - file_path=C:\Users\Yaz00\OneDrive\سطح المكتب\Agent AI - Tuwaiq\week 5\Homework 1\Phase2.pdf"
    print(run_with_file(demo_prompt))
    # Example grading request:
    # r"grade this quiz_id=<PUT_ID_HERE> answers_json={\"q1\":\"A\",\"q2\":\"C\",\"q3\":\"B\"}"