Spaces:
Build error
Build error
| print("Application script started.") # Debugging print statement | |
| import os | |
| import gradio as gr | |
| import requests | |
| import inspect | |
| import pandas as pd | |
| import cv2 # Import opencv-python for video processing | |
| import speech_recognition as sr # Import SpeechRecognition for audio processing | |
| from pydub import AudioSegment # Import pydub for audio manipulation | |
| import tempfile # Import tempfile for temporary file handling | |
| import numpy as np # Import numpy for image processing | |
| print("All libraries imported successfully.") # Debugging print | |
| # Import libraries for SerpAPI | |
| # Corrected import: Import GoogleSearch from google_search_results | |
| from google_search_results import GoogleSearch | |
| import google.generativeai as genai # Keep the import as the user might add LLM functionality back later | |
| print("SerpAPI and GenAI libraries imported successfully.") # Debugging print | |
| # Removed the import of google.colab.userdata as it's not available outside Colab | |
| # from google.colab import userdata # To access the API key from secrets | |
# --- API keys (read from environment variables) ---
# SERPAPI_API_KEY and GOOGLE_API_KEY should be set as secrets in your
# Hugging Face Space (or exported in the environment when running elsewhere).
SERPAPI_API_KEY = os.getenv('SERPAPI_API_KEY')
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
# Only report whether each key is present; never echo any part of a secret
# into the application logs.
print(f"SERPAPI_API_KEY configured: {bool(SERPAPI_API_KEY)}")
print(f"GOOGLE_API_KEY configured: {bool(GOOGLE_API_KEY)}")
print("API keys retrieved (or attempted).")

# --- Default scoring API URL ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
print(f"DEFAULT_API_URL set to: {DEFAULT_API_URL}")

# --- Google Generative AI LLM initialization ---
# gemini_model stays None whenever the key is missing or initialization
# fails; code that uses the model must check for None first.
print("Attempting to initialize Google Generative AI model...")
gemini_model = None
if not GOOGLE_API_KEY:
    print("Warning: GOOGLE_API_KEY environment variable not set. LLM will not be available.")
else:
    try:
        genai.configure(api_key=GOOGLE_API_KEY)
        print("Google Generative AI configured.")
        # 'gemini-1.5-flash' is the model chosen here; another model name
        # (e.g. 'gemini-1.5-pro') can be substituted.
        gemini_model = genai.GenerativeModel('gemini-1.5-flash')
        print("Gemini model initialized successfully.")
    except Exception as e:
        # Best effort: the app keeps running without an LLM.
        print(f"An error occurred during Google Generative AI initialization: {e}")
        gemini_model = None
print("LLM initialization attempted.")
# --- Web Search Function (using SerpAPI) ---
print("Defining web_search function...")


def web_search(query: str) -> list[dict]:
    """
    Perform a web search using SerpAPI and return the organic results.

    Args:
        query: The search query string.

    Returns:
        A list of dictionaries, one per organic result, with keys 'title',
        'snippet' and 'url'. An empty list is returned when the API key is
        missing, when the response contains no usable organic results, or
        when the request raises an error. The function never returns
        anything other than a list.
    """
    print(f"web_search called with query: {query[:50]}...")
    if not SERPAPI_API_KEY:
        print("SerpAPI key not found in environment variables.")
        return []

    params = {
        "q": query,
        "api_key": SERPAPI_API_KEY,
        "engine": "google",  # Use Google search engine
        "num": 5,  # Number of results to fetch
    }

    try:
        search_results_dict = GoogleSearch(params).get_dict()
    except Exception as e:
        # Honour the documented contract: errors yield an empty list, not a
        # message string that callers would mistake for results.
        print(f"An error occurred during SerpAPI web search: {e}")
        return []

    is_dict = isinstance(search_results_dict, dict)
    print(f"SerpAPI raw response keys: {search_results_dict.keys() if is_dict else 'Response is not a dictionary'}")

    organic_results = search_results_dict.get("organic_results") if is_dict else None
    search_results = []
    if not isinstance(organic_results, list) or not organic_results:
        # Log the full response so a missing/empty result set can be diagnosed.
        print(f"SerpAPI response did not contain organic results or had invalid format. Response: {search_results_dict}")
    else:
        print(f"Found {len(organic_results)} organic results.")
        for result in organic_results:
            if not isinstance(result, dict):
                print(f"Skipping invalid search result item: {result}")
                continue
            search_results.append({
                'title': result.get('title'),
                'url': result.get('link'),
                'snippet': result.get('snippet', 'No snippet available'),
            })

    print(f"web_search returning {len(search_results)} results.")
    return search_results
| # --- Basic Agent Definition (Modified to remove LLM dependency for now) --- | |
| print("Defining BasicAgent class...") # Debugging print | |
| class BasicAgent: | |
| def __init__(self): | |
| print("BasicAgent initialized.") # Debugging print before init | |
| # Removed global gemini_model declaration as it's not used here | |
| # global gemini_model # Access global variable | |
| # if gemini_model is None: | |
| # print("Warning: Google Generative AI model not successfully loaded before agent initialization.") | |
| # else: | |
| # print("Google Generative AI model found and ready.") # Debugging print after successful init | |
| def process_video(self, video_source: str) -> str: | |
| """ | |
| Processes a video source (file path or URL), extracts frames, and | |
| performs placeholder visual analysis. | |
| Args: | |
| video_source: Path to the video file or a video URL. | |
| Returns: | |
| A string summarizing the video processing result or an error message. | |
| """ | |
| print(f"Processing video source: {video_source}") | |
| cap = None | |
| try: | |
| # Attempt to open the video source | |
| # Using cv2.CAP_FFMPEG might help with URLs, but requires FFmpeg | |
| # cap = cv2.VideoCapture(video_source, cv2.CAP_FFMPEG) | |
| cap = cv2.VideoCapture(video_source) | |
| # Check if the video was opened successfully | |
| if not cap.isOpened(): | |
| print(f"Error: Could not open video source {video_source}") | |
| return f"Error: Could not open video source {video_source}" | |
| frame_count = 0 | |
| while True: | |
| # Read a frame from the video | |
| ret, frame = cap.read() | |
| # If frame was not read successfully, we've reached the end of the video | |
| if not ret: | |
| print("End of video stream.") | |
| break | |
| frame_count += 1 | |
| # --- Placeholder for visual analysis --- | |
| # In a real application, you would perform analysis on the 'frame' object here. | |
| # This could involve object detection, scene recognition, etc. | |
| # Example placeholder: | |
| # gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
| # Perform analysis on gray_frame | |
| if frame_count % 100 == 0: # Print progress every 100 frames | |
| print(f"Processed {frame_count} frames.") | |
| print(f"Finished processing video. Total frames extracted: {frame_count}") | |
| return f"Successfully processed video. Extracted {frame_count} frames." | |
| except Exception as e: | |
| print(f"An error occurred during video processing: {e}") | |
| return f"An error occurred during video processing: {e}" | |
| finally: | |
| # Release the video capture object | |
| if cap: | |
| cap.release() | |
| print("Video capture released.") | |
| def process_audio(self, audio_source: str) -> str: | |
| """ | |
| Processes an audio source (file path), extracts speech, and performs | |
| placeholder audio analysis. | |
| Args: | |
| audio_source: Path to the audio file. | |
| Returns: | |
| A string summarizing the audio processing result or an error message. | |
| """ | |
| print(f"Processing audio source: {audio_source}") | |
| recognizer = sr.Recognizer() | |
| try: | |
| # Load the audio file | |
| audio = AudioSegment.from_file(audio_source) | |
| print(f"Audio loaded. Duration: {len(audio)} ms") | |
| # Export to a format SpeechRecognition can handle (e.g., WAV) | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as fp: | |
| audio.export(fp.name, format="wav") | |
| temp_wav_file = fp.name | |
| print(f"Audio exported to temporary WAV: {temp_wav_file}") | |
| # Use SpeechRecognition to transcribe the audio | |
| with sr.AudioFile(temp_wav_file) as source: | |
| print("Reading audio file for transcription...") | |
| audio_data = recognizer.record(source) # read the entire audio file | |
| print("Audio data recorded.") | |
| # Attempt to recognize speech | |
| try: | |
| print("Attempting speech recognition...") | |
| text = recognizer.recognize_google(audio_data) # Using Google Web Speech API | |
| print(f"Transcription result: {text}") | |
| return f"Audio processed. Transcription: '{text}'" | |
| except sr.UnknownValueError: | |
| print("Speech Recognition could not understand audio") | |
| return "Audio processed, but could not understand speech." | |
| except sr.RequestError as e: | |
| print(f"Could not request results from Google Speech Recognition service; {e}") | |
| return f"Audio processed, but speech recognition service failed: {e}" | |
| except Exception as e: | |
| print(f"An unexpected error occurred during speech recognition: {e}") | |
| return f"An unexpected error occurred during speech recognition: {e}" | |
| except Exception as e: | |
| print(f"An error occurred during audio processing: {e}") | |
| return f"An error occurred during audio processing: {e}" | |
| finally: | |
| # Clean up the temporary WAV file | |
| if 'temp_wav_file' in locals() and os.path.exists(temp_wav_file): | |
| os.remove(temp_wav_file) | |
| print(f"Temporary WAV file removed: {temp_wav_file}") | |
| def __call__(self, question: str, video_source: str | None = None, audio_source: str | None = None) -> str: | |
| # Removed global gemini_model declaration as it's not used here | |
| print(f"Agent received question (first 50 chars): {question[:50]}...") | |
| print(f"Video source provided: {video_source}") | |
| print(f"Audio source provided: {audio_source}") | |
| # --- Check for media processing tasks --- | |
| media_processing_results = [] | |
| if video_source: | |
| print("Video source provided. Attempting video processing.") | |
| video_processing_result = self.process_video(video_source) | |
| media_processing_results.append(f"Video processing result: {video_processing_result}") | |
| if audio_source: | |
| print("Audio source provided. Attempting audio processing.") | |
| audio_processing_result = self.process_audio(audio_source) | |
| media_processing_results.append(f"Audio processing result: {audio_processing_result}") | |
| # If media was processed, return the results for now | |
| if media_processing_results: | |
| return "\n".join(media_processing_results) | |
| # Simple logic to determine if a web search is needed (only if no media source) | |
| question_lower = question.lower() | |
| search_keywords = ["what is", "how to", "where is", "who is", "when did", "define", "explain", "tell me about"] | |
| needs_search = any(keyword in question_lower for keyword in search_keywords) or "?" in question | |
| print(f"Needs search: {needs_search}") # Debugging search decision | |
| # --- Analyze question and refine search query --- | |
| # Simplified search query generation - removed LLM query generation | |
| search_query = question # Default search query is the original question | |
| if needs_search: | |
| print("Analyzing question for keywords and refining search query...") | |
| # Basic keyword extraction: split by common question words and take the rest | |
| parts = question_lower.split("what is", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| parts = question_lower.split("how to", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| parts = question_lower.split("where is", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| parts = question_lower.split("who is", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| parts = question_lower.split("when did", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| parts = question_lower.split("define", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| parts = question_lower.split("explain", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| parts = question_lower.split("tell me about", 1) | |
| if len(parts) > 1: | |
| search_query = parts[1].strip() | |
| else: | |
| # If no specific question keyword found, use the whole question | |
| search_query = question_lower.strip() | |
| # Optional: Add quotation marks for multi-word phrases if identified | |
| # This simple approach just uses the extracted part as is. | |
| # A more complex approach would identify multi-word entities (e.g., "New York City") | |
| # and wrap them in quotes. | |
| # Optional: Add contextual terms | |
| # Example: If "musician" or "band" is in the question, add "discography" | |
| if any(word in question_lower for word in ["musician", "band", "artist", "singer"]): | |
| search_query += " discography" | |
| elif any(word in question_lower for word in ["movie", "film", "actor", "actress"]): | |
| search_query += " plot summary" | |
| elif any(word in question_lower for word in ["book", "author", "novel"]): | |
| search_query += " plot summary" | |
| print(f"Final search query used: {search_query}") # Debugging final query | |
| search_results = [] # Initialize search_results to an empty list before the try block | |
| if needs_search: | |
| print(f"Question likely requires search. Searching for: {search_query}") | |
| try: | |
| params = { # Define params here, before calling GoogleSearch | |
| "q": search_query, | |
| "api_key": SERPAPI_API_KEY, | |
| "engine": "google", # Use Google search engine | |
| "num": 5 # Number of results to fetch | |
| } | |
| search = GoogleSearch(params) # Use GoogleSearch from the correct package | |
| search_results_dict = search.get_dict() # Get results as a dictionary | |
| print(f"SerpAPI raw response keys: {search_results_dict.keys() if isinstance(search_results_dict, dict) else 'Response is not a dictionary'}") # Debugging response keys | |
| # Log the full SerpAPI response for debugging if organic_results is missing or empty | |
| if not isinstance(search_results_dict, dict) or "organic_results" not in search_results_dict or not isinstance(search_results_dict["organic_results"], list) or not search_results_dict["organic_results"]: | |
| print(f"SerpAPI response did not contain organic results or had invalid format. Response: {search_results_dict}") | |
| search_results = [] # Ensure search_results is empty if no organic results | |
| # Extract organic results | |
| # Add check that search_results_dict and organic_results are valid | |
| if isinstance(search_results_dict, dict) and "organic_results" in search_results_dict and isinstance(search_results_dict["organic_results"], list): | |
| print(f"Found {len(search_results_dict['organic_results'])} organic results.") # Debugging result count | |
| for result in search_results_dict["organic_results"]: | |
| # Add check for None or non-dict result item | |
| if result is None or not isinstance(result, dict): | |
| print(f"Skipping invalid search result item: {result}") | |
| continue | |
| item = { | |
| 'title': result.get('title'), | |
| 'url': result.get('link'), | |
| 'snippet': result.get('snippet', 'No snippet available') | |
| } | |
| search_results.append(item) # Append to search_results | |
| except Exception as e: | |
| print(f"An error occurred during SerpAPI web search: {e}") | |
| return f"An error occurred during web search: {e}" | |
| print(f"web_search returning {len(search_results)} results.") # Debugging return count | |
| # --- Use LLM to process search results if available (Removed LLM Synthesis) --- | |
| # Check that search_results is a list and is not empty | |
| if isinstance(search_results, list) and search_results and gemini_model is not None: | |
| print("Using Google LLM to process search results.") # Debugging print before LLM call | |
| # Format search results for the LLM | |
| context = "" | |
| for i, result in enumerate(search_results[:5]): # Use top 5 results for context | |
| # Add check for None or non-dict result item before accessing keys | |
| if result is None or not isinstance(result, dict): | |
| print(f"Skipping invalid result at index {i} in LLM context formatting: {result}") | |
| continue | |
| context += f"Source {i+1}:\n" | |
| if result.get('title'): | |
| context += f"Title: {result['title']}\n" | |
| if result.get('snippet'): | |
| context += f"Snippet: {result['snippet']}\n" | |
| if result.get('url'): | |
| context += f"URL: {result['url']}\n" | |
| context += "---\n" # Separator | |
| # Refined prompt for the LLM | |
| prompt = f"""Carefully read the following search results and answer the user's question based *only* on the information provided in these results. | |
| If the search results do not contain sufficient information to fully answer the question, explicitly state that you could not find enough information in the provided results. | |
| Do not use any outside knowledge. Structure your answer clearly and concisely. | |
| Question: {question} | |
| Search Results: | |
| {context} | |
| Answer:""" | |
| print(f"LLM Prompt (first 500 chars):\n{prompt[:500]}...") # Debugging prompt | |
| try: | |
| # Generate content using the Gemini model | |
| response = gemini_model.generate_content(prompt) | |
| generated_text = response.text # Get the generated text | |
| # Add check for empty or whitespace generated text | |
| if generated_text and generated_text.strip(): | |
| llm_answer = generated_text.strip() | |
| print(f"LLM generated text (first 100 chars): {generated_text[:100]}...") # Debugging raw output | |
| print(f"Agent returning LLM-based answer (first 100 chars): {llm_answer[:100]}...") # Debugging final answer | |
| return llm_answer | |
| else: | |
| print("LLM generated empty or whitespace answer.") | |
| return "I couldn't generate a specific answer based on the search results." | |
| except Exception as llm_e: | |
| print(f"An error occurred during LLM generation: {llm_e}") | |
| return f"An error occurred while processing search results with the LLM: {llm_e}" | |
| # Fallback if search results are empty or not a list, or LLM is None | |
| elif isinstance(search_results, list) and search_results: # Search results exist and is a list, but LLM is not available or failed | |
| print("Google Generative AI model not loaded or search results empty or LLM failed. Cannot use LLM for synthesis.") | |
| # Return the old style answer if LLM is not available, but only if search results exist | |
| print("Returning basic answer based on search results (LLM not available).") | |
| answer_parts = [] | |
| for i, result in enumerate(search_results[:3]): | |
| # Add check for None or non-dict result item before accessing keys | |
| if result is None or not isinstance(result, dict): | |
| print(f"Skipping invalid result at index {i} in basic answer formatting: {result}") | |
| continue | |
| if result.get('snippet'): | |
| # Limit snippet length to avoid overly long responses | |
| snippet = result['snippet'] | |
| if len(snippet) > 200: | |
| snippet = snippet[:200] + "..." | |
| answer_parts.append(f"Snippet {i+1}: {snippet}") | |
| elif result.get('title'): | |
| answer_parts.append(f"Result {i+1} Title: {result['title']}") | |
| if answer_parts: | |
| return "Based on web search (LLM not available):\n" + "\n".join(answer_parts) | |
| else: | |
| # Fallback if no useful snippets/titles found in search results | |
| print("No useful snippets/titles found in search results.") | |
| return "I couldn't find useful information in the search results (LLM not available)." | |
| else: # search_results is None or not a list, or empty | |
| print(f"Web search returned no results or results in invalid format. Type: {type(search_results)}") | |
| return "I couldn't find any relevant information on the web for your question." | |
| else: # needs_search is True but no search results were returned (this case is now covered by the try-except around web_search) | |
| # This else block should ideally not be reached if needs_search is True and web_search is called | |
| print("Question required search, but no search was performed or it failed.") | |
| return "I couldn't perform a web search for your question." | |
| else: | |
| # If no search is needed, return a default or simple response | |
| print("Question does not appear to require search. Returning fixed answer.") | |
| fixed_answer = "How can I help you?" | |
| return fixed_answer | |
def run_and_submit_all(profile: gr.OAuthProfile | None, other_arg=None):
    """
    Fetch all questions, run the BasicAgent on them, submit the answers,
    and report the outcome.

    Args:
        profile: OAuth profile of the logged-in Hugging Face user, or None
            when nobody is logged in.
        other_arg: Unused. Accepted so the callback tolerates a second
            positional value from the UI wiring.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a pandas
        DataFrame of the attempted tasks, or None when the run stopped
        before any question was processed.
    """
    print("run_and_submit_all function started.")
    # SPACE_ID is used to build the public link to this Space's code.
    space_id = os.getenv("SPACE_ID")

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    print("Attempting to instantiate BasicAgent...")
    try:
        agent = BasicAgent()
        print("BasicAgent instantiated successfully.")
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None

    # When running as a Hugging Face Space this link points to your codebase
    # (useful for others, so please keep it public).
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    response = None
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
    except requests.exceptions.JSONDecodeError as e:
        # Must come before RequestException: requests' JSONDecodeError is a
        # subclass of it and would otherwise never reach this handler.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500] if response is not None else 'No response object'}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    if not isinstance(questions_data, list) or not questions_data:
        print(f"Fetched questions_data is empty or not a list. Type: {type(questions_data)}")
        return "Fetched questions list is empty or invalid format.", None
    print(f"Fetched {len(questions_data)} questions.")

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        if not isinstance(item, dict):
            print(f"Skipping invalid item in questions_data: {item}")
            continue
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or not isinstance(task_id, (str, int)) or not question_text or not isinstance(question_text, str):
            print(f"Skipping item with missing or invalid task_id or question: {item}")
            continue
        print(f"Processing Task ID: {task_id}")
        try:
            # Only the question text is passed; media sources are not
            # supplied by this function.
            submitted_answer = agent(question_text)
            print(f"Agent returned answer for {task_id}: {submitted_answer[:50]}...")
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # A failing task is logged for display but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    results_df = pd.DataFrame(results_log)
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", results_df

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    # Every failure path still returns results_df so the user can see what
    # was attempted.
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            # The error body is not guaranteed to be a JSON object.
            if isinstance(error_json, dict):
                detail = error_json.get('detail', e.response.text)
            else:
                detail = e.response.text
            error_detail += f" Detail: {detail}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.JSONDecodeError as e:
        # Listed before RequestException so a malformed success response is
        # not misreported as a network error.
        status_message = f"Submission Failed: Could not decode server response - {e}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    print(status_message)
    return status_message, results_df
# Manual entry point wired to the UI for exercising process_video directly.
def test_video_processing(video_source: str) -> str:
    """
    Run BasicAgent.process_video on the given source.

    Args:
        video_source: Video path or URL forwarded to process_video.

    Returns:
        The string returned by process_video, or an error message if
        creating the agent or processing raised an exception.
    """
    print(f"Testing video processing with source: {video_source}")
    try:
        outcome = BasicAgent().process_video(video_source)
    except Exception as exc:
        outcome = f"Error during video processing test: {exc}"
    return outcome
# Manual entry point wired to the UI for exercising process_audio directly.
def test_audio_processing(audio_source: str) -> str:
    """
    Run BasicAgent.process_audio on the given source.

    Args:
        audio_source: Audio file path forwarded to process_audio.

    Returns:
        The string returned by process_audio, or an error message if
        creating the agent or processing raised an exception.
    """
    print(f"Testing audio processing with source: {audio_source}")
    try:
        outcome = BasicAgent().process_audio(audio_source)
    except Exception as exc:
        outcome = f"Error during audio processing test: {exc}"
    return outcome
# --- Gradio interface (defined at module level so `demo` is importable) ---
print("Defining Gradio interface...")
with gr.Blocks(theme=gr.themes.Soft(), title="Basic Agent Evaluation Runner") as demo:
    gr.Markdown(
        """
        # Basic Agent Evaluation Runner
        This application fetches a set of questions from a scoring API,
        runs your custom agent against each question, and submits the answers for scoring.
        **Instructions:**
        1. Ensure your agent logic is defined in the `BasicAgent` class above.
        2. **Get a SerpAPI key and a Google AI API key and add them as environment variables in your runtime environment (e.g., as secrets in your Hugging Face Space settings).**
        3. Log in to Hugging Face using the button below.
        4. Click the "Run Evaluation & Submit All Answers" button to run on predefined questions.
        5. Use the "Test Video Processing" and "Test Audio Processing" sections to test media analysis.
        """
    )

    # --- Evaluation run ---
    login_btn = gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers", interactive=True)
    status_output = gr.Textbox(label="Run Status", interactive=False, lines=5)
    results_output = gr.DataFrame(label="Evaluation Results")
    # NOTE(review): the login button is passed as an input and the callback
    # accepts an extra `other_arg`; confirm how the installed Gradio version
    # delivers the OAuth profile before simplifying this wiring.
    run_button.click(
        run_and_submit_all,
        inputs=[login_btn],
        outputs=[status_output, results_output],
    )

    gr.Markdown("---")  # Separator
    gr.Markdown("## Test Media Processing")

    # --- Video test ---
    video_test_input = gr.Video(label="Upload Video or Paste URL")
    video_test_button = gr.Button("Test Video Processing")
    video_test_output = gr.Textbox(label="Video Processing Result", interactive=False)
    video_test_button.click(
        test_video_processing,
        inputs=[video_test_input],
        outputs=[video_test_output],
    )

    # --- Audio test ---
    # type="filepath" makes the component hand the callback a file path,
    # which is what process_audio loads; the default type passes sample
    # data rather than a path.
    audio_test_input = gr.Audio(label="Upload Audio or Paste URL", type="filepath")
    audio_test_button = gr.Button("Test Audio Processing")
    audio_test_output = gr.Textbox(label="Audio Processing Result", interactive=False)
    audio_test_button.click(
        test_audio_processing,
        inputs=[audio_test_input],
        outputs=[audio_test_output],
    )
print("Gradio interface defined.")

# Launch the app only when the script is executed directly.
print("Checking if script is run directly...")
if __name__ == "__main__":
    print("Launching Gradio demo...")
    demo.launch(server_name="0.0.0.0")  # Bind to all interfaces
print("Script finished.")