# app.py — handles images, PDFs, text/code, Excel, audio, etc.
import os, json, time, io, tempfile, mimetypes
from functools import lru_cache
import gradio as gr
import requests
import pandas as pd
from openai import OpenAI, RateLimitError, APIError
from duckduckgo_search import DDGS
from PyPDF2 import PdfReader

# Scoring server for the HF Agents-course Unit 4 evaluation.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Chat model used for every completion request.
OPENAI_MODEL = "gpt-4o-mini"
# Max characters of extracted file text forwarded to the model.
TEXT_LIMIT = 8_000
# Only the first N pages of a PDF are extracted.
PDF_PAGES = 3
AUDIO_SIZE_CAP = 16 * 1024 * 1024  # 16 MB
# ─────────────── helpers ───────────────
def duckduckgo_search(query: str, max_results: int = 5) -> str:
    """Run a DuckDuckGo text search and return a newline-joined hit list.

    Each hit is rendered as a bullet with title and link; when the search
    yields nothing, a fallback message is returned instead of an empty
    string.
    """
    lines = []
    with DDGS() as ddgs:
        for result in ddgs.text(query, max_results=max_results):
            lines.append(f"- {result['title']} β {result['href']}")
    return "\n".join(lines) or "No results found."
# Tool definition for OpenAI function calling. The chat-completions API
# requires each entry of `tools` to be wrapped as
# {"type": "function", "function": {...}}; the previous bare schema was
# rejected by the API when passed via tools=[DDG_SCHEMA].
DDG_SCHEMA = {
    "type": "function",
    "function": {
        "name": "duckduckgo_search",
        "description": "Search the web for up-to-date info.",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "max_results": {"type": "integer", "default": 5},
            },
            "required": ["query"],
        },
    },
}
def download_bytes(url: str, cap: int | None = None) -> bytes:
    """Download *url* and return its raw bytes.

    Args:
        url: HTTP(S) URL to fetch.
        cap: optional maximum payload size in bytes. Enforced while
            streaming, so an oversized file is aborted early instead of
            being fully downloaded before the check (the previous version
            checked the size only after buffering the whole body).

    Returns:
        The response body as bytes.

    Raises:
        requests.HTTPError: on a non-2xx response.
        ValueError: as soon as the downloaded size exceeds *cap*.
    """
    with requests.get(url, timeout=20, stream=True) as r:
        r.raise_for_status()
        if cap is None:
            return r.content
        chunks, total = [], 0
        for chunk in r.iter_content(chunk_size=65536):
            total += len(chunk)
            if total > cap:
                raise ValueError("File too large")
            chunks.append(chunk)
        return b"".join(chunks)
def extract_text_file(url: str) -> str:
    """Fetch a plain-text/code file and return at most TEXT_LIMIT characters.

    Undecodable bytes are replaced rather than raising; any fetch failure
    is reported inline as a bracketed message instead of propagating.
    """
    try:
        raw = download_bytes(url)
        return raw.decode(errors="replace")[:TEXT_LIMIT]
    except Exception as e:
        return f"[Could not fetch text file: {e}]"
def extract_pdf(url: str) -> str:
    """Download a PDF and return text from its first PDF_PAGES pages.

    The combined text is truncated to TEXT_LIMIT characters; any failure
    (network, parse error, encrypted file) is reported inline as a
    bracketed message.
    """
    try:
        document = PdfReader(io.BytesIO(download_bytes(url)))
        page_count = min(PDF_PAGES, len(document.pages))
        extracted = []
        for page_index in range(page_count):
            extracted.append(document.pages[page_index].extract_text() or "")
        return "\n\n".join(extracted)[:TEXT_LIMIT]
    except Exception as e:
        return f"[Could not read PDF: {e}]"
def extract_excel(url: str) -> str:
    """Download a spreadsheet and return its first 15 rows as CSV text.

    The agent routes both .xls and .xlsx files here, so the reader engine
    is left for pandas to infer from the file content (the previous
    hard-coded engine="openpyxl" failed on legacy .xls files). Output is
    truncated to TEXT_LIMIT characters; failures are reported inline.
    """
    try:
        buf = io.BytesIO(download_bytes(url))
        # engine=None: pandas inspects the bytes and picks openpyxl/xlrd.
        df = pd.read_excel(buf, nrows=15)
        return df.to_csv(index=False, header=True)[:TEXT_LIMIT]
    except Exception as e:
        return f"[Could not read Excel: {e}]"
def transcribe_audio(url: str, client: OpenAI) -> str:
    """Download an audio file (capped at AUDIO_SIZE_CAP) and transcribe it.

    Uses OpenAI whisper-1. The temporary file is always removed (the
    previous version created it with delete=False and never cleaned up)
    and the read handle is closed via a context manager (previously the
    open(tmp.name, "rb") handle leaked).

    Returns:
        At most 2000 characters of transcript, or a bracketed error
        message on any failure.
    """
    tmp_path = None
    try:
        data = download_bytes(url, cap=AUDIO_SIZE_CAP)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".audio") as tmp:
            tmp.write(data)
            tmp_path = tmp.name
        with open(tmp_path, "rb") as fh:
            tr = client.audio.transcriptions.create(model="whisper-1", file=fh)
        return tr.text[:2000]
    except Exception as e:
        return f"[Could not transcribe audio: {e}]"
    finally:
        # Best-effort cleanup of the temp file on both success and failure.
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
# ─────────────── Agent ───────────────
| class GPT4oMiniAgent: | |
| def __init__(self, retries=3, backoff=2.0): | |
| key = os.getenv("OPENAI_API_KEY") | |
| if not key: | |
| raise EnvironmentError("Add OPENAI_API_KEY in Space Secrets") | |
| self.client, self.retries, self.backoff = OpenAI(api_key=key), retries, backoff | |
| self.system_prompt = ( | |
| "You are a concise, accurate assistant. If certain, answer directly; " | |
| "if not, call duckduckgo_search first." | |
| ) | |
| def __call__(self, question: str, file_url: str | None = None) -> str: | |
| user_parts = [{"type": "text", "text": question}] | |
| if file_url: | |
| ext = os.path.splitext(file_url.split("?")[0].split("#")[0])[1].lower() | |
| if ext in {".png", ".jpg", ".jpeg", ".gif", ".webp"}: | |
| user_parts.append({"type": "image_url", "image_url": {"url": file_url}}) | |
| elif ext in {".pdf"}: | |
| user_parts.append({"type": "text", "text": "(PDF extract)\n" + extract_pdf(file_url)}) | |
| elif ext in {".xls", ".xlsx"}: | |
| user_parts.append({"type": "text", "text": "(Excel preview)\n" + extract_excel(file_url)}) | |
| elif ext in {".txt", ".py", ".md", ".json", ".csv", ".html"}: | |
| user_parts.append({"type": "text", "text": "(File content)\n" + extract_text_file(file_url)}) | |
| elif ext in {".mp3", ".wav", ".m4a", ".flac", ".ogg"}: | |
| user_parts.append({"type": "text", "text": "(Audio transcript)\n" + transcribe_audio(file_url, self.client)}) | |
| else: | |
| user_parts.append({"type": "text", "text": f"[File available: {file_url}]"} ) | |
| msgs = [ | |
| {"role": "system", "content": self.system_prompt}, | |
| {"role": "user", "content": user_parts}, | |
| ] | |
| resp = self._chat(msgs, tools=[DDG_SCHEMA], tool_choice="auto") | |
| if resp.choices[0].message.tool_calls: | |
| for call in resp.choices[0].message.tool_calls: | |
| args = json.loads(call.function.arguments or "{}") | |
| search_out = duckduckgo_search(**args) | |
| msgs.append({"role": "tool", "tool_call_id": call.id, "name": call.function.name, "content": search_out}) | |
| resp = self._chat(msgs) | |
| return resp.choices[0].message.content.strip() | |
| def _chat(self, messages, **kw): | |
| for i in range(1, self.retries + 1): | |
| try: | |
| return self.client.chat.completions.create( | |
| model=OPENAI_MODEL, messages=messages, | |
| temperature=0.0, max_tokens=512, **kw | |
| ) | |
| except (RateLimitError, APIError): | |
| time.sleep(self.backoff * i) | |
| raise RuntimeError("OpenAI API failed after retries.") | |
# ─────────────── pipeline ───────────────
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Answer every question from the scoring API and submit the results.

    Args:
        profile: Gradio OAuth profile of the logged-in user, or None when
            not logged in (injected by gr.LoginButton).

    Returns:
        Tuple of (status message, DataFrame log of per-question answers);
        the DataFrame is None when the user is not logged in.
    """
    if not profile:
        return "Please log in β", None
    username = profile.username
    agent = GPT4oMiniAgent()
    space_id = os.getenv("SPACE_ID", "local")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"

    q_resp = requests.get(f"{DEFAULT_API_URL}/questions", timeout=15)
    # Fail loudly on a bad fetch instead of a cryptic JSON decode error.
    q_resp.raise_for_status()
    questions = q_resp.json()

    rows, answers = [], []
    for q in questions:
        qid = q["task_id"]
        qtext = q["question"]
        # NOTE(review): 'filename' may be a bare file name rather than a
        # fetchable URL — confirm against the scoring API's file endpoint.
        fileu = q.get("filename") or q.get("file_url")
        try:
            ans = agent(qtext, fileu)
        except Exception as e:
            # One failing question must not abort the whole evaluation run
            # (previously any exception lost all accumulated answers).
            ans = f"[Agent error: {e}]"
        answers.append({"task_id": qid, "submitted_answer": ans})
        rows.append({"Task ID": qid, "Question": qtext, "File": fileu or "", "Answer": ans})

    payload = {"username": username, "agent_code": agent_code, "answers": answers}
    res = requests.post(f"{DEFAULT_API_URL}/submit", json=payload, timeout=60).json()
    status = f"Score {res['score']} % ({res['correct_count']}/{res['total_attempted']})"
    return status, pd.DataFrame(rows)
# ─────────────── UI ───────────────
# Gradio UI: login button supplies the OAuth profile implicitly; the run
# button triggers the full evaluate-and-submit pipeline.
with gr.Blocks() as demo:
    gr.Markdown("# Unit-4 Agent β images, PDFs, Excel, audio, text, etc.")
    gr.LoginButton()
    run = gr.Button("Run Evaluation & Submit All Answers")
    out_status = gr.Textbox(label="Status", interactive=False)
    out_table = gr.DataFrame(label="Log", wrap=True)
    # No explicit inputs: gr.LoginButton injects the OAuthProfile argument.
    run.click(run_and_submit_all, outputs=[out_status, out_table])
if __name__ == "__main__":
    demo.launch(debug=True, share=False)