Spaces:
Sleeping
Sleeping
| import os | |
| import gradio as gr | |
| import requests | |
| import inspect | |
| import pandas as pd | |
| from langgraph.graph import StateGraph, END | |
| from typing import TypedDict | |
| import string | |
| from transformers import pipeline | |
| import re | |
| import wikipedia | |
| import wikipediaapi | |
| # (Keep Constants as is) | |
| # --- Constants --- | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
| # --- Basic Agent Definition --- | |
| # ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------ | |
| # class BasicAgent: | |
| # def __init__(self): | |
| # print("BasicAgent initialized.") | |
| # def __call__(self, question: str) -> str: | |
| # print(f"Agent received question (first 50 chars): {question[:50]}...") | |
| # fixed_answer = "This is a default answer." | |
| # print(f"Agent returning fixed answer: {fixed_answer}") | |
| # return fixed_answer | |
| # --- Constants --- | |
| DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space" | |
class SuperSmartAgent:
    """Rule-based question-answering agent driven by a LangGraph state machine.

    A question flows through these nodes (see ``_build_graph``):

    1. ``check_reversed`` / ``fix_question`` - un-reverse text written backwards.
    2. ``check_riddle_or_trick`` -> ``solve_riddle`` for keyword-detected riddles.
    3. ``check_wikipedia_suitability`` -> ``search_wikipedia`` (page summary).
    4. ``check_reasoning_needed`` -> ``general_reasoning_qa`` (context search,
       table lookup and sentence scoring over a Wikipedia page).
    5. ``check_python_suitability`` -> ``generate_code`` or ``fallback``.

    The node callables are private methods so that they can share the text
    helpers (``extract_answer``, ``find_answer_in_tables``, ...) through
    ``self``; the helpers keep the names the node code calls them by.
    """

    # Words ignored when extracting keywords from a question.
    _STOP_WORDS = frozenset({
        'the', 'a', 'an', 'is', 'are', 'was', 'were',
        'how', 'what', 'when', 'where', 'who', 'which',
    })

    # Substrings suggesting that a section holds statistics or a table.
    _STATISTICS_INDICATORS = (
        'statistics', 'stats', 'season', 'player',
        'year', 'at bat', 'walk', 'home run', 'rbi',
        'era', '| Year', '| Player', '| AB', '| W',
    )

    # Word -> opposite, checked in this order by the riddle solver.
    _OPPOSITES = {"left": "right", "up": "down", "hot": "cold"}

    # A run of digits that may contain thousands separators.
    _NUMBER_PATTERN = r'(\d[\d,]*\d*)'

    # Date shapes tried in order, most specific first. The month alternation is
    # non-capturing so that the whole date (not just the month) is the match.
    _DATE_PATTERNS = (
        r'\b\d{1,2}\s+(?:January|February|March|April|May|June|July|August|September|October|November|December)[\s,]\s*\d{4}\b',
        r'\b\d{1,2}/\d{1,2}/\d{4}\b',
        r'\b\d{1,2}-\d{1,2}-\d{4}\b',
        r'\b\d{4}\b',
    )

    def __init__(self):
        self.graph = self._build_graph()
        self.wiki_wiki = wikipediaapi.Wikipedia(
            language='en',
            extract_format=wikipediaapi.ExtractFormat.WIKI,
            user_agent='SelimResearchAgent/1.0'
        )

    def __call__(self, question: str) -> str:
        """Run *question* through the graph and return the ``response`` field."""
        state = {"question": question}
        result = self.graph.invoke(state)
        return result.get("response", "No answer generated.")

    # ------------------------------------------------------------------
    # Graph construction
    # ------------------------------------------------------------------
    def _build_graph(self):
        """Assemble and compile the LangGraph state machine."""

        class AgentState(TypedDict, total=False):
            question: str
            is_reversed: bool
            is_python: bool
            is_riddle: bool
            is_wiki: bool  # Added for Wikipedia suitability check
            needs_reasoning: bool  # Added for reasoning check
            response: str
            use_tool: str  # Keep this if it's being used elsewhere

        builder = StateGraph(AgentState)

        nodes = {
            "check_reversed": self._check_reversed,
            "fix_question": self._fix_question,
            "check_riddle_or_trick": self._check_riddle_or_trick,
            "solve_riddle": self._solve_riddle,
            "check_wikipedia_suitability": self._check_wikipedia_suitability,
            "check_reasoning_needed": self._check_reasoning_needed,
            "general_reasoning_qa": self._general_reasoning_qa,
            "search_wikipedia": self._search_wikipedia,
            "check_python_suitability": self._check_python_suitability,
            "generate_code": self._generate_code,
            "fallback": self._fallback,
        }
        for node_name, node_fn in nodes.items():
            builder.add_node(node_name, node_fn)

        # Entry point and unconditional edges.
        builder.set_entry_point("check_reversed")
        builder.add_edge("check_reversed", "fix_question")
        builder.add_edge("fix_question", "check_riddle_or_trick")

        # Each router returns the name of the next node.
        builder.add_conditional_edges(
            "check_riddle_or_trick",
            lambda s: "solve_riddle" if s.get("is_riddle") else "check_wikipedia_suitability"
        )
        builder.add_conditional_edges(
            "check_wikipedia_suitability",
            lambda s: "search_wikipedia" if s.get("is_wiki") else "check_reasoning_needed"
        )
        builder.add_conditional_edges(
            "check_reasoning_needed",
            lambda s: "general_reasoning_qa" if s.get("needs_reasoning") else "check_python_suitability"
        )
        builder.add_conditional_edges(
            "check_python_suitability",
            lambda s: "generate_code" if s.get("is_python") else "fallback"
        )

        # Terminal nodes.
        for terminal in ("solve_riddle", "search_wikipedia", "general_reasoning_qa",
                         "generate_code", "fallback"):
            builder.add_edge(terminal, END)

        return builder.compile()

    # ------------------------------------------------------------------
    # Graph nodes (each takes the state dict, updates it, and returns it)
    # ------------------------------------------------------------------
    @staticmethod
    def _score_text(text):
        """Score how much *text* looks like a forward-written sentence.

        Alphanumerics and spaces count one point each; ending in ``.``, ``!``
        or ``?`` adds five. An empty string scores 0 (indexing ``text[-1]``
        would raise on it).
        """
        if not text:
            return 0
        score = sum(ch.isalnum() for ch in text) + text.count(' ')
        if text.endswith(('.', '!', '?')):
            score += 5
        return score

    def _check_reversed(self, state):
        """Set ``is_reversed`` when the reversed question scores higher."""
        question = state["question"]
        # Character counts are identical in both directions, so in practice the
        # comparison is decided by the sentence-ending punctuation bonus.
        state["is_reversed"] = self._score_text(question[::-1]) > self._score_text(question)
        return state

    @staticmethod
    def _fix_question(state):
        """Reverse the question text in place when it was flagged as reversed."""
        if state.get("is_reversed", False):
            state["question"] = state["question"][::-1]
        return state

    @staticmethod
    def _check_riddle_or_trick(state):
        """Set ``is_riddle`` when the question contains a riddle keyword."""
        q = state["question"].lower()
        keywords = ["opposite of", "if you understand", "riddle", "trick question", "what comes next", "i speak without"]
        state["is_riddle"] = any(kw in q for kw in keywords)
        return state

    def _solve_riddle(self, state):
        """Answer "opposite of the word X" riddles for the known words."""
        q = state["question"].lower()
        if "opposite of the word" not in q:
            state["response"] = "Could not solve riddle."
            return state
        for word, opposite in self._OPPOSITES.items():
            # Whole-word match: a bare substring test would find "up" inside
            # unrelated words such as "cup".
            if re.search(rf'\b{re.escape(word)}\b', q):
                state["response"] = opposite
                break
        else:
            state["response"] = "Unknown opposite."
        return state

    @staticmethod
    def _check_python_suitability(state):
        """Set ``is_python`` when the question mentions a code-like task."""
        question = state["question"].lower()
        patterns = ["sum", "average", "count", "sort", "generate", "regex", "convert"]
        state["is_python"] = any(word in question for word in patterns)
        return state

    @staticmethod
    def _generate_code(state):
        """Store a canned Python snippet matching the first recognised task."""
        q = state["question"].lower()
        if "sum" in q:
            state["response"] = "numbers = [1, 2, 3]\nprint(sum(numbers))"
        elif "average" in q:
            state["response"] = "numbers = [1, 2, 3]\nprint(sum(numbers) / len(numbers))"
        elif "sort" in q:
            state["response"] = "data = [3, 1, 2]\ndata.sort()\nprint(data)"
        else:
            state["response"] = "# Code generation not implemented for this case."
        return state

    @staticmethod
    def _fallback(state):
        """Store the default response used when no other route applies."""
        state["response"] = "This question doesn't require Python or is unclear."
        return state

    @staticmethod
    def _check_reasoning_needed(state):
        """Set ``needs_reasoning`` when the question contains a qualifier word."""
        q = state["question"].lower()
        qualifiers = ["whose", "only", "first", "after", "before", "no longer", "not", "but", "except"]
        state["needs_reasoning"] = any(word in q for word in qualifiers)
        return state

    @staticmethod
    def _check_wikipedia_suitability(state):
        """Set ``is_wiki`` when the question contains a lookup-style phrase."""
        q = state["question"].lower()
        triggers = [
            "wikipedia", "who is", "what is", "when did", "where is",
            "tell me about", "how many", "how much", "what was the",
            "describe", "explain", "information about", "details about"
        ]
        state["is_wiki"] = any(trigger in q for trigger in triggers)
        return state

    @staticmethod
    def _search_wikipedia(state):
        """Store the summary of the top Wikipedia search hit as the response."""
        question = state["question"]
        try:
            page_titles = wikipedia.search(question)
            if not page_titles:
                state["response"] = "No relevant Wikipedia article found."
                return state
            state["response"] = wikipedia.page(page_titles[0]).summary
        except Exception as e:
            # Best-effort node: report the failure as the answer, do not crash.
            state["response"] = f"Error fetching Wikipedia content: {e}"
        return state

    def _general_reasoning_qa(self, state):
        """Answer from Wikipedia page content using the extraction helpers.

        Tries, in order: ``extract_answer``, ``find_answer_in_tables``, the
        best keyword-matching sentence, and finally a truncated page summary.
        Any exception is reported in the response instead of being raised.
        """
        question = state["question"]
        question_lower = question.lower()
        try:
            # Search Wikipedia for relevant pages
            search_results = wikipedia.search(question, results=3)
            if not search_results:
                state["response"] = "Sorry, I couldn't find relevant information."
                return state

            raw_context = self.get_relevant_context(question, search_results)
            if not raw_context:
                state["response"] = "Sorry, I couldn't find relevant information."
                return state

            # Tables must be extracted BEFORE preprocessing, because
            # preprocess_context removes every {| ... |} block.
            tables = self.extract_tables_from_wikipedia(raw_context)
            context = self.preprocess_context(raw_context)

            answer = self.extract_answer(question, context, tables)
            if answer:
                state["response"] = answer
                return state

            if tables:
                table_answer = self.find_answer_in_tables(question, tables)
                if table_answer:
                    state["response"] = table_answer
                    return state

            wants_number = any(
                kw in question_lower for kw in ["how many", "how much", "what was the"]
            )
            sentence_answer = self._answer_from_best_sentence(question, context, wants_number)
            if sentence_answer:
                state["response"] = sentence_answer
                return state

            # Nothing specific found - fall back to a page summary.
            try:
                first_page = self.wiki_wiki.page(search_results[0])
                if first_page.exists():
                    summary = first_page.summary[:500] + "..."  # Limit summary length
                    state["response"] = f"I couldn't find a specific answer, but here's some relevant information: {summary}"
                else:
                    state["response"] = "No relevant information found."
            except Exception:
                state["response"] = "I couldn't find a specific answer in the available information."
        except Exception as e:
            state["response"] = f"An error occurred while searching for information: {str(e)}"
        return state

    # ------------------------------------------------------------------
    # Context retrieval and cleaning
    # ------------------------------------------------------------------
    def get_relevant_context(self, question, search_results):
        """Return keyword-matching sections of the first search result's page.

        Sections are the blank-line-separated chunks of the page text. Those
        containing a question keyword are kept, statistics-looking ones first.
        If none match, the first 10000 characters of the page are returned.
        Returns ``""`` when there are no results, the page does not exist, or
        fetching fails.
        """
        if not search_results:
            return ""
        try:
            page = self.wiki_wiki.page(search_results[0])
            if not page.exists():
                return ""
            full_content = page.text
            key_phrases = self.extract_key_phrases(question)
            relevant_sections = []
            for section in re.split(r'\n\s*\n', full_content):
                section_lower = section.lower()
                if not any(phrase in section_lower for phrase in key_phrases):
                    continue
                if self.section_contains_statistics(section):
                    relevant_sections.insert(0, section)  # Put more likely sections first
                else:
                    relevant_sections.append(section)
            if relevant_sections:
                return "\n\n".join(relevant_sections)
            return full_content[:10000]  # Limit context size
        except Exception as e:
            print(f"Error processing page: {e}")
            return ""

    def section_contains_statistics(self, section):
        """Determine if a section likely contains statistics."""
        section_lower = section.lower()
        return any(indicator.lower() in section_lower
                   for indicator in self._STATISTICS_INDICATORS)

    def preprocess_context(self, context):
        """Strip ``[n]`` citation markers, collapse whitespace, drop wiki tables."""
        context = re.sub(r'\[\d+\]', '', context)
        context = re.sub(r'\s+', ' ', context).strip()
        context = re.sub(r'\{\|.*?\|\}', '', context, flags=re.DOTALL)
        return context

    @staticmethod
    def extract_key_phrases(question):
        """Return the lower-cased words of *question* that are not stop words
        and are longer than two characters, in order of appearance."""
        words = re.findall(r'\b\w+\b', question.lower())
        return [word for word in words
                if word not in SuperSmartAgent._STOP_WORDS and len(word) > 2]

    @staticmethod
    def validate_answer(question, answer):
        """Return False for a "how many" question whose answer has no digit."""
        if "how many" in question.lower() and not re.search(r'\d+', answer):
            return False
        return True

    # ------------------------------------------------------------------
    # Table extraction
    # ------------------------------------------------------------------
    @staticmethod
    def _clean_wiki_cell(cell):
        """Reduce one wiki-markup cell to plain text."""
        cell = re.sub(r'\[\[([^|\]]+)(?:\|[^\]]+)?\]\]', r'\1', cell)  # [[Target|Label]] -> Target
        cell = re.sub(r'<[^>]+>', '', cell)                            # HTML tags
        cell = re.sub(r'{{\s*[^{}]+\s*}}', '', cell)                   # {{templates}}
        cell = re.sub(r'\s+', ' ', cell).strip()
        # Drop the leading cell marker ("|" or "!") left over from row syntax.
        return cell.lstrip('|!').strip()

    def extract_tables_from_wikipedia(self, content):
        """Extract tables from Wikipedia content.

        Handles ``{| ... |}`` wiki markup (rows split on ``|-``, cells on
        ``||``) and HTML ``<table>`` elements. Returns a list of tables, each a
        list of rows, each a list of cell strings. Rows whose cells are all
        empty are skipped.
        """
        tables = []

        # Wiki markup tables. The flags go to re.findall, not into the pattern.
        for table_body in re.findall(r'\{\|(.*?)\|\}', content, re.DOTALL):
            clean_rows = []
            for row in re.split(r'\|\-', table_body):
                clean_cells = [self._clean_wiki_cell(cell) for cell in re.split(r'\|\|', row)]
                if any(clean_cells):
                    clean_rows.append(clean_cells)
            if clean_rows:
                tables.append(clean_rows)

        # HTML tables.
        html_flags = re.DOTALL | re.IGNORECASE
        for table_html in re.findall(r'<table.*?</table>', content, html_flags):
            clean_rows = []
            for row in re.findall(r'<tr.*?</tr>', table_html, html_flags):
                clean_cells = []
                for cell in re.findall(r'<t[dh].*?</t[dh]>', row, html_flags):
                    cell = re.sub(r'<.*?>', '', cell)
                    clean_cells.append(re.sub(r'\s+', ' ', cell).strip())
                if clean_cells:
                    clean_rows.append(clean_cells)
            if clean_rows:
                tables.append(clean_rows)
        return tables

    def detect_column_types(self, table):
        """Classify each column of *table* as ``'number'``, ``'name'`` or ``'text'``.

        NOTE(review): the original file called this method without defining
        it; this heuristic is supplied so the table search can run. The first
        row is treated as a header and only the remaining rows are inspected.
        A column is ``'number'`` when most of its non-empty cells are numeric
        and ``'name'`` when ``column_looks_like_names`` accepts them.
        """
        if not table:
            return []
        width = max(len(row) for row in table)
        column_types = []
        for idx in range(width):
            values = [row[idx].strip() for row in table[1:]
                      if idx < len(row) and row[idx].strip()]
            if not values:
                column_types.append('text')
                continue
            numeric = sum(
                1 for value in values
                if re.fullmatch(r'[-+]?\d[\d,]*(?:\.\d+)?%?', value)
            )
            if numeric * 2 > len(values):
                column_types.append('number')
            elif self.column_looks_like_names(values):
                column_types.append('name')
            else:
                column_types.append('text')
        return column_types

    def column_looks_like_names(self, values):
        """Return True when most non-empty *values* look like proper names.

        NOTE(review): also missing from the original file; heuristic supplied
        here. A value counts as a name when it is one to four capitalised
        words and contains no digit.
        """
        candidates = [value.strip() for value in values if value and value.strip()]
        if not candidates:
            return False
        name_shape = r"[A-Z][\w.'\-]*(?:\s+[A-Z][\w.'\-]*){0,3}"
        name_like = sum(
            1 for value in candidates
            if re.fullmatch(name_shape, value) and not any(ch.isdigit() for ch in value)
        )
        return name_like * 2 > len(candidates)

    @staticmethod
    def _rows_matching(table, key_phrases):
        """Return the data rows (header excluded) containing any key phrase."""
        return [row for row in table[1:]
                if any(phrase.lower() in ' '.join(row).lower() for phrase in key_phrases)]

    def find_answer_in_tables(self, question, tables):
        """Search through extracted tables to find an answer to the question.

        For "how many" / "what was the" questions, returns the first number in
        a numeric column of a keyword-matching row. For "who" / "which person"
        questions, returns the first non-empty name-column cell of such a row.
        Returns ``None`` when no table yields an answer.
        """
        if not tables:
            return None
        key_phrases = self.extract_key_phrases(question)
        question_lower = question.lower()

        for table in tables:
            # Relevance check over the header and body text.
            all_text = []
            if len(table) > 0:
                all_text.extend(table[0])
            if len(table) > 1:
                body_text = ' '.join(' '.join(row) for row in table[1:])
                all_text.extend(body_text.split())
            all_text_lower = ' '.join(all_text).lower()
            if not any(phrase.lower() in all_text_lower for phrase in key_phrases):
                continue

            column_types = self.detect_column_types(table)

            if "how many" in question_lower or "what was the" in question_lower:
                numeric_columns = [i for i, col_type in enumerate(column_types)
                                   if col_type == 'number']
                if not numeric_columns or len(table) <= 1:
                    continue
                for row in self._rows_matching(table, key_phrases):
                    for col_idx in numeric_columns:
                        if col_idx >= len(row):
                            continue
                        for num in re.findall(r'\d[\d,]*\d*', row[col_idx]):
                            num_clean = num.replace(',', '')
                            if num_clean.isdigit():
                                # First number found in a relevant row wins.
                                return f"The answer is {int(num_clean)}."

            elif "who" in question_lower or "which person" in question_lower:
                name_columns = []
                for i, col_type in enumerate(column_types):
                    if col_type == 'name' and len(table) > 1:
                        # Guard the index: rows may be shorter than the header.
                        sample_values = [row[i] for row in table[1:min(5, len(table))]
                                         if i < len(row)]
                        if self.column_looks_like_names(sample_values):
                            name_columns.append(i)
                if not name_columns:
                    continue
                for row in self._rows_matching(table, key_phrases):
                    for col_idx in name_columns:
                        if col_idx < len(row) and row[col_idx].strip():
                            return f"The answer is {row[col_idx]}."
        return None

    # ------------------------------------------------------------------
    # Text answer extraction
    # ------------------------------------------------------------------
    def _answer_from_best_sentence(self, question, context, concise_number=False):
        """Return the sentence of *context* sharing the most keywords with *question*.

        Ties are broken in favour of the longer sentence. With
        ``concise_number`` set and a number present, only the number plus up to
        30 characters either side is returned. The result always ends with a
        period. Returns ``None`` when no sentence matches any keyword.
        """
        keywords = self.extract_key_phrases(question)
        if not keywords:
            return None
        scored_sentences = []
        for sentence in re.split(r'[.!?]', context):
            sentence = sentence.strip()
            if not sentence:
                continue
            lowered = sentence.lower()
            score = sum(1 for keyword in keywords if keyword in lowered)
            if score > 0:
                scored_sentences.append((score, sentence))
        if not scored_sentences:
            return None
        # Sort by score descending, then by length descending
        scored_sentences.sort(key=lambda item: (-item[0], -len(item[1])))
        best = scored_sentences[0][1]
        if concise_number:
            number_match = re.search(self._NUMBER_PATTERN, best)
            if number_match:
                start_idx = max(0, number_match.start() - 30)
                end_idx = min(len(best), number_match.end() + 30)
                best = best[start_idx:end_idx].strip()
        return best if best.endswith('.') else best + "."

    def extract_answer(self, question, context, tables=None):
        """Enhanced general purpose answer extraction from text context.

        Dispatches on ``detect_question_type``: counts use the best-scoring
        number (then tables), persons use the most relevant named entity, and
        dates use the first matching date pattern. Every type falls back to the
        best keyword-matching sentence. Returns ``None`` when nothing matches.
        """
        if tables is None:
            tables = []
        question_lower = question.lower()
        context_lower = context.lower()
        question_type = self.detect_question_type(question_lower)

        if question_type == "count":
            # Each number is paired with 50 characters of text either side.
            number_contexts = []
            for match in re.finditer(self._NUMBER_PATTERN, context):
                start_pos = max(0, match.start() - 50)
                end_pos = min(len(context), match.end() + 50)
                number_contexts.append(
                    (match.group(1).replace(',', ''), context[start_pos:end_pos])
                )
            best_match = self.find_best_number_match(question_lower, number_contexts)
            if best_match:
                number, _ = best_match
                return f"The answer is {number}."
            if tables:
                table_answer = self.find_answer_in_tables(question, tables)
                if table_answer:
                    return table_answer
        elif question_type == "person":
            named_entities = self.extract_named_entities(context)
            if named_entities:
                relevant_name = self.find_relevant_person(question_lower, context_lower, named_entities)
                if relevant_name:
                    return f"The answer is {relevant_name}."
        elif question_type == "date":
            for pattern in self._DATE_PATTERNS:
                match = re.search(pattern, context)
                if match:
                    # group(0) is the full date text.
                    return f"The answer is {match.group(0)}."

        return self._answer_from_best_sentence(
            question, context, concise_number=(question_type == "count")
        )

    def detect_question_type(self, question):
        """Classify the type of question for general processing.

        Expects lower-cased text. Returns one of ``"count"``, ``"person"``,
        ``"date"``, ``"definition"``, ``"location"``, ``"list"`` or
        ``"general"``; the first matching rule wins.
        """
        rules = (
            (r'\bhow many\b|\bhow much\b|\bwhat was the\s+\w+\s+of\b', "count"),
            (r'\bwho is\b|\bwho was\b|\bwhich person\b|\bwhich player\b', "person"),
            (r'\bwhen did\b|\bwhen was\b|\bwhat year\b|\bwhat date\b', "date"),
            (r'\bwhat is\b|\bwhat was\b|\bwhat are\b|\bwhat were\b', "definition"),
            (r'\bwhere is\b|\bwhere was\b|\bwhat location\b', "location"),
            (r'\blist of\b|\blist the\b|\bgive me a list of\b', "list"),
        )
        for pattern, label in rules:
            if re.search(pattern, question):
                return label
        return "general"

    def find_best_number_match(self, question, number_contexts):
        """Find the number from context that best matches the question.

        *number_contexts* is a list of ``(number, surrounding_text)`` pairs.
        Returns the best ``(number, surrounding_text)`` pair, or ``None`` for
        an empty list. Equal scores keep their input order.
        """
        if not number_contexts:
            return None
        question_keywords = self.extract_key_phrases(question)
        scored_numbers = []
        for number, context in number_contexts:
            context_lower = context.lower()
            # One point per question keyword present in the snippet.
            score = sum(1 for keyword in question_keywords if keyword in context_lower)
            # Up to ten extra points per keyword occurrence, decaying with its
            # character distance from the number.
            number_pos = context_lower.find(number.lower())
            if number_pos != -1:
                for keyword in question_keywords:
                    for m in re.finditer(re.escape(keyword), context_lower):
                        score += max(0, 10 - abs(number_pos - m.start()) / 10)
            # Fractional tie-breaker that favours shorter snippets.
            score += (10000 - len(context)) / 10000
            scored_numbers.append((score, number, context))
        scored_numbers.sort(reverse=True, key=lambda item: item[0])
        return (scored_numbers[0][1], scored_numbers[0][2])

    def extract_named_entities(self, text):
        """Extract named entities (people, places, etc.) from text.

        Collects runs of capitalised words (ignoring a capitalised
        sentence-initial word) that have at least two words or more than ten
        characters, plus names introduced by a title such as "Dr." or "Capt.".
        Returns a sorted list so the result is deterministic.
        """
        entities = set()
        for sentence in re.split(r'[.!?]', text):
            tokens = re.findall(r'\b\w+\b', sentence)
            # Skip first word if capitalized (likely start of sentence)
            if tokens and tokens[0][0].isupper():
                tokens = tokens[1:]
            i = 0
            while i < len(tokens):
                if not tokens[i][0].isupper():
                    i += 1
                    continue
                start = i
                while i < len(tokens) and tokens[i][0].isupper():
                    i += 1
                entity = ' '.join(tokens[start:i])
                if len(entity.split()) >= 2 or len(entity) > 10:
                    entities.add(entity)

        # Titles followed by a capitalised name. Matching is case-sensitive:
        # with re.IGNORECASE the [A-Z] class would accept any letter.
        title_pattern = r'\b(Dr|Mr|Ms|Mrs|Prof|Sr|Jr|Rev|Gen|Col|Maj|Lt|Sgt|Capt)\.\s+[A-Z][a-z]+'
        for match in re.finditer(title_pattern, text):
            full_entity = match.group(0)
            # Extend with any capitalised words that follow directly.
            for word in re.findall(r'\b\w+\b', text[match.end():]):
                if not word[0].isupper():
                    break
                full_entity += ' ' + word
            entities.add(full_entity.replace('. ', ' ').strip())
        return sorted(entities)

    def find_relevant_person(self, question, context, entities):
        """Find the most relevant person entity based on question context.

        An entity scores one point per question keyword found within 50
        characters of each of its occurrences in *context*. Returns the
        highest-scoring entity, or ``None`` when no entity scores above zero.
        """
        if not entities:
            return None
        question_keywords = self.extract_key_phrases(question)
        best_score = 0
        best_entity = None
        for entity in entities:
            score = 0
            for m in re.finditer(re.escape(entity), context, re.IGNORECASE):
                window_start = max(0, m.start() - 50)
                window_end = min(len(context), m.start() + len(entity) + 50)
                window_text = context[window_start:window_end].lower()
                score += sum(1 for keyword in question_keywords if keyword in window_text)
            if score > best_score:
                best_score = score
                best_entity = entity
        return best_entity
| ######################################## | |
def run_and_submit_all( profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the SuperSmartAgent on them, submits all answers,
    and displays the results.

    Args:
        profile: The logged-in Hugging Face profile, or None when the user is
            not logged in.

    Returns:
        A ``(status_message, results)`` tuple. ``results`` is a DataFrame of
        per-question answers, or None when the run stops before any question
        is processed.
    """
    # --- Determine HF Space Runtime URL and Repo URL ---
    # SPACE_ID is the environment variable NAME; passing a URL to os.getenv
    # always yields None.
    space_id = os.getenv("SPACE_ID")  # Get the SPACE_ID for sending link to the code

    if not profile:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None
    username = f"{profile.username}"
    print(f"User logged in: {username}")

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent ( modify this part to create your agent)
    try:
        agent = SuperSmartAgent()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # In the case of an app running as a Hugging Face space, this link points toward your codebase (useful for others, so please keep it public)
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    print(agent_code)

    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.JSONDecodeError as e:
        # Must precede RequestException: requests' JSONDecodeError is a
        # subclass of it, so the broader handler would otherwise win.
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None

    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question_text)
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": submitted_answer})
        except Exception as e:
            # A failing question is logged but does not abort the run.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question_text, "Submitted Answer": f"AGENT ERROR: {e}"})

    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)

    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        return final_status, pd.DataFrame(results_log)
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"

    # Every failure path reports the message together with the per-question log.
    print(status_message)
    return status_message, pd.DataFrame(results_log)
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")

    # Instruction text, assembled line by line; the empty first and last
    # entries reproduce the newlines that follow and precede the quotes of
    # the original triple-quoted literal.
    instruction_lines = [
        "",
        "**Instructions:**",
        "1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc ...",
        "2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.",
        "3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.",
        "---",
        "**Disclaimers:**",
        "Once clicking on the \"submit button, it can take quite some time ( this is the time for the agent to go through all the questions).",
        "This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance for the delay process of the submit button, a solution could be to cache the answers and submit in a seperate action or even to answer the questions in async.",
        "",
    ]
    gr.Markdown("\n".join(instruction_lines))

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")
    status_output = gr.Textbox(
        label="Run Status / Submission Result",
        lines=5,
        interactive=False,
    )
    # Removed max_rows=10 from DataFrame constructor
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    # The click handler's two return values fill the status box and the table.
    output_components = [status_output, results_table]
    run_button.click(fn=run_and_submit_all, outputs=output_components)
if __name__ == "__main__":
    # Startup banner: report which Space environment variables are set, then
    # launch the UI.
    banner_title = " App Starting "
    rule = "-" * 30
    print("\n" + rule + banner_title + rule)

    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")

    if not space_host_startup:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    else:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f" Runtime URL should be: https://{space_host_startup}.hf.space")

    if not space_id_startup:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    else:
        # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f" Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f" Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")

    print("-" * (60 + len(banner_title)) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)