"""Archaeological Query Engine.

A Gradio application that (1) searches per-city CSV datasets found in the
current directory (files named ``<city>_sorted.csv`` with ``url``, ``query``
and ``context`` columns) and (2) answers free-form questions about a city by
sending the best-matching dataset rows to a chat model through the Groq API.
"""

import html
import os
import re
import sys
import time
import traceback
from pathlib import Path

import pandas as pd

# The UI and LLM client packages are optional at import time so that the
# search helpers below stay importable (e.g. for scripting) without them.
try:
    import gradio as gr
except ImportError:
    gr = None
    print("Gradio not installed. Run: pip install gradio")

# Import Groq API client
try:
    from groq import Groq
except ImportError:
    Groq = None
    print("Groq API not installed. Run: pip install groq")


# Chat model requested from the Groq API.
GROQ_MODEL = "moonshotai/kimi-k2-instruct-0905"

# Maximum number of context characters shown per result card.
PREVIEW_CHARS = 300

# Environment variables temporarily removed while retrying client creation.
PROXY_ENV_VARS = (
    'http_proxy', 'https_proxy', 'HTTP_PROXY', 'HTTPS_PROXY',
    'no_proxy', 'NO_PROXY',
)

# Keywords that indicate modern tourism/hotel content to deprioritize.
TOURISM_KEYWORDS = (
    'hotel', 'vacation', 'booking', 'resort', 'accommodation',
    'travel package', 'tourism', 'tourist', 'reservation', 'stay',
    'room', 'suite', 'spa', 'restaurant',
)

# NOTE(review): the HTML markup emitted by the renderers below and these CSS
# rules are a minimal reconstruction; confirm the intended visual styling.
APP_CSS = """
<style>
.results-output { max-height: 800px; overflow-y: auto; }
.result-card, .answer-section, .thinking-section, .sources-section {
    border: 1px solid #d0d0d0; border-radius: 8px;
    padding: 12px 16px; margin-bottom: 12px;
}
.result-card h3, .answer-section h3 { margin-top: 0; }
.thinking-section summary { cursor: pointer; font-weight: bold; }
.source-citation { font-weight: bold; }
.generation-note { font-size: 0.85em; font-style: italic; }
</style>
"""


def load_csv_files():
    """Load every ``*_sorted.csv`` file in the current directory.

    Returns:
        dict mapping a display city name (file stem without ``_sorted``,
        underscores replaced by spaces, title-cased) to its DataFrame.
        NaN cells are replaced by empty strings. Files that fail to load
        are reported on stdout and skipped.
    """
    csv_files = {}
    for file in Path(".").glob("*_sorted.csv"):
        try:
            df = pd.read_csv(file, encoding='utf-8')
            # Fill NaN values with empty strings to avoid issues downstream.
            df = df.fillna("")
            city_name = file.stem.replace('_sorted', '')
            city_name = city_name.replace('_', ' ').title()
            csv_files[city_name] = df
        except Exception as e:
            # Best effort: one unreadable file must not prevent startup.
            print(f"Error loading {file}: {e}")
    return csv_files


def get_queries_for_city(city):
    """Return the sorted, non-blank unique ``query`` values for *city*.

    Returns an empty list when *city* is not a loaded dataset.
    """
    if city not in all_data:
        return []
    queries = all_data[city]['query'].dropna().unique().tolist()
    return sorted(str(q) for q in queries if q and str(q).strip())


def _cell(row, column):
    """Return ``row[column]`` as a string, mapping NaN/None to ``""``."""
    value = row[column]
    return "" if pd.isna(value) else str(value)


def _collect_rows(data, keep):
    """Return result dicts for the rows where ``keep(context, query)`` is true."""
    results = []
    for i, row in data.iterrows():
        context = _cell(row, 'context')
        query = _cell(row, 'query')
        if keep(context, query):
            results.append({
                'url': _cell(row, 'url'),
                'context': context,
                'query': query,
                'original_index': i,  # Store the original row index
            })
    return results


def _link_html(url):
    """Return *url* escaped for HTML, as a clickable link when it is http(s)."""
    safe = html.escape(url, quote=True)
    # Only http(s) URLs become links so that dataset content cannot inject
    # e.g. ``javascript:`` targets.
    if url.lower().startswith(("http://", "https://")):
        return (f"<a href='{safe}' target='_blank' "
                f"rel='noopener noreferrer'>{safe}</a>")
    return safe


def _render_results(results, title_prefix, show_query):
    """Render result dicts as HTML cards.

    Args:
        results: dicts with ``url``, ``context``, ``query``, ``original_index``.
        title_prefix: text placed before the 1-based result number.
        show_query: whether each card includes the row's query.
    """
    parts = ["<div class='results-list'>"]
    for position, result in enumerate(results, 1):
        row_number = result['original_index'] + 1  # 1-based for display
        context = result['context']
        preview = context[:PREVIEW_CHARS]
        if len(context) > PREVIEW_CHARS:
            preview += '...'
        parts.append("<div class='result-card'>")
        parts.append(
            f"<h3>{title_prefix}{position} (Dataset Row: {row_number})</h3>"
        )
        parts.append(f"<p><strong>URL:</strong> {_link_html(result['url'])}</p>")
        if show_query:
            parts.append(
                f"<p><strong>Query:</strong> "
                f"{html.escape(str(result['query']))}</p>"
            )
        parts.append(f"<p><strong>Context:</strong> {html.escape(preview)}</p>")
        parts.append("</div>")
    parts.append("</div>")
    return "".join(parts)


def find_empty_queries(city, preserve_order=True):
    """Return HTML listing the rows of *city* whose query is empty or missing.

    Returns a plain message string when the city is unknown or no such rows
    exist. When *preserve_order* is true, results are sorted by row index.
    """
    data = all_data.get(city)
    if data is None:
        return "City data not found"

    results = _collect_rows(data, lambda context, query: query.strip() == "")
    if not results:
        return "No entries without queries found"

    if preserve_order:
        results.sort(key=lambda x: x['original_index'])
    return _render_results(results, "Entry Without Query #", show_query=False)


def search_data(city, search_type, search_query, case_sensitive=False,
                preserve_order=True):
    """Search the ``context`` and ``query`` columns of a city's dataset.

    Args:
        city: name of a loaded dataset.
        search_type: ``"Simple Text Search"`` (substring match) or
            ``"Regular Expression Search"``; any other value matches nothing.
        search_query: text or regular expression to look for.
        case_sensitive: whether matching distinguishes letter case.
        preserve_order: sort results by their original row index.

    Returns:
        HTML for the matching rows, or a plain message string when the city
        is unknown, the query is blank, the regular expression is invalid,
        or nothing matches.
    """
    data = all_data.get(city)
    if data is None:
        return "City data not found"

    if not search_query or str(search_query).strip() == "":
        return "Please enter a search query"
    search_query = str(search_query)

    results = []
    if search_type == "Simple Text Search":
        needle = search_query if case_sensitive else search_query.lower()

        def keep(context, query):
            if not case_sensitive:
                context, query = context.lower(), query.lower()
            return needle in context or needle in query

        results = _collect_rows(data, keep)
    elif search_type == "Regular Expression Search":
        # The pattern must NOT be lowercased: that would turn escapes such as
        # \D or \S into \d or \s. re.IGNORECASE handles case folding.
        try:
            pattern = re.compile(
                search_query, flags=0 if case_sensitive else re.IGNORECASE
            )
        except re.error as e:
            return f"Regular expression error: {str(e)}"
        results = _collect_rows(
            data,
            lambda context, query: bool(
                pattern.search(context) or pattern.search(query)
            ),
        )

    if not results:
        return "No matching results found"

    if preserve_order:
        results.sort(key=lambda x: x['original_index'])
    return _render_results(results, "Result ", show_query=True)


def _create_groq_client(api_key):
    """Create a Groq client, retrying around proxy-related ``TypeError``s.

    Returns:
        ``(client, None)`` on success or ``(None, error_message)`` on failure.
    """
    if Groq is None:
        return None, ("Error initializing Groq client: the groq package is "
                      "not installed. Run: pip install groq")

    try:
        return Groq(api_key=api_key), None
    except TypeError as e:
        if "proxies" not in str(e):
            return None, f"Error initializing Groq client: {str(e)}"
        original_error = str(e)
    except Exception as e:
        return None, f"Error initializing Groq client: {str(e)}"

    # Retry with proxy-related environment variables removed; they are always
    # restored afterwards, whether or not a retry succeeds.
    saved_env = {
        var: os.environ.pop(var) for var in PROXY_ENV_VARS if var in os.environ
    }
    try:
        try:
            return Groq(api_key=api_key), None
        except Exception as e:
            first_fallback_error = str(e)

        try:
            # Re-execute the groq package in case it cached configuration.
            if 'groq' in sys.modules:
                import importlib
                importlib.reload(sys.modules['groq'])
            from groq import Groq as GroqClient
            return GroqClient(api_key=api_key), None
        except Exception as e:
            return None, (
                f"Error initializing Groq client after multiple attempts:\n"
                f"Original error: {original_error}\n"
                f"Fallback 1 failed: {first_fallback_error}\n"
                f"Fallback 2 failed: {str(e)}\n"
                f"This might be due to version incompatibility or environment "
                f"configuration on Hugging Face."
            )
    finally:
        os.environ.update(saved_env)


def _rank_entries(data, question, max_sources):
    """Return up to *max_sources* rows ranked by word overlap with *question*.

    The score is the number of distinct lowercase whitespace-separated words
    shared by the question and the row's context, minus 0.5 for every tourism
    keyword found in the context or URL. Only positive scores are kept.
    """
    question_words = set(question.lower().split())
    scored = []
    for i, row in data.iterrows():
        context = _cell(row, 'context')
        url = _cell(row, 'url')
        context_lower = context.lower()
        url_lower = url.lower()
        tourism_score = sum(
            1 for keyword in TOURISM_KEYWORDS
            if keyword in context_lower or keyword in url_lower
        )
        overlap = len(question_words.intersection(context_lower.split()))
        final_score = overlap - (tourism_score * 0.5)  # Penalize tourism content
        if final_score > 0:
            scored.append({
                'index': i,
                'score': final_score,
                'url': url,
                'context': context,
                'tourism_score': tourism_score,
            })
    scored.sort(key=lambda x: x['score'], reverse=True)
    # A UI slider may deliver a float; slicing needs an int.
    return scored[:int(max_sources)]


def _build_prompt(city, question, top_entries):
    """Build the user prompt containing the question and the ranked sources."""
    context_for_llm = f"Question about the ancient city of {city}: {question}\n\n"
    context_for_llm += "Information from dataset:\n\n"
    for i, entry in enumerate(top_entries, 1):
        context_for_llm += f"Source {i}: {entry['url']}\n"
        context_for_llm += f"Context: {entry['context'][:500]}...\n\n"

    return f"""You are an expert historian specializing in ancient cities. Use the following information to answer the question about the ancient city of {city}. Base your answer ONLY on the provided information and cite the sources. If you cannot find relevant information to answer the question, say so honestly.

IMPORTANT: Ignore any information about modern hotels, vacation packages, tourism accommodations, travel bookings, or contemporary tourism services. Focus only on historical, archaeological, and scholarly information about the ancient city.

{context_for_llm}

Answer the question in a comprehensive, detailed, and informative way. Provide as much relevant historical context as possible. Include proper citations to the sources using [Source X] notation.

Question: {question}

First, conduct a thorough analysis of each source - evaluate the information quality, relevance, and historical significance. Skip any sources that only contain information about hotels, vacations, or modern tourism. Then provide a detailed, well-structured answer with comprehensive explanations and proper citations focused on historical and archaeological content. Include relevant background information, context, and connections to broader historical themes when supported by the sources.

Answer with this structure:
[THINKING]
(Show your detailed analysis of the sources here, noting if any sources are skipped due to being about hotels/tourism. Explain how you're weighing the information and what historical connections you're making.)
[/THINKING]

[ANSWER]
(Your comprehensive, detailed answer with citations, focusing on historical content only. Provide thorough explanations, context, and analysis based on the available sources.)
[/ANSWER]"""


def _split_thinking_and_answer(full_response):
    """Split a model response into ``(thinking, answer)`` strings.

    Explicit ``[THINKING]``/``[ANSWER]`` markers are preferred; otherwise a
    series of textual heuristics is tried. If no strategy yields a non-blank
    answer, the whole response is returned as the answer.
    """
    thinking = ""
    answer = ""

    thinking_match = re.search(
        r'\[THINKING\](.*?)\[/THINKING\]', full_response, re.DOTALL
    )
    answer_match = re.search(
        r'\[ANSWER\](.*?)\[/ANSWER\]', full_response, re.DOTALL
    )

    if thinking_match and answer_match:
        # Case 1: Both markers exist
        thinking = thinking_match.group(1).strip()
        answer = answer_match.group(1).strip()
    elif "Final Answer:" in full_response:
        # Case 2: There's a "Final Answer:" heading
        before, after = full_response.split("Final Answer:", 1)
        thinking = before.strip()
        answer = after.strip()
    elif ("**Analysis of Sources:**" in full_response
          and "**Conclusion:**" in full_response):
        # Case 3: split at whichever of the two headings comes first
        split_point = min(
            full_response.find("**Analysis of Sources:**"),
            full_response.find("**Conclusion:**"),
        )
        thinking = full_response[:split_point].strip()
        answer = full_response[split_point:].strip()
    elif "Thus," in full_response and "Therefore," in full_response:
        # Case 4: split at the last natural-language transition
        thinking_end = max(
            full_response.rfind("Thus,"), full_response.rfind("Therefore,")
        )
        if thinking_end > 0:
            thinking = full_response[:thinking_end].strip()
            answer = full_response[thinking_end:].strip()
    elif "Starting with Source" in full_response or "Source 1" in full_response:
        # Case 5: source-by-source analysis followed by a final answer
        patterns = [
            r"\n\n(?:To address|Based on|In conclusion|The answer|Therefore,|Thus,)",
            r"\n\n\*\*.*?\*\*",  # bold headings that might start the answer
            r"\n\nGiven the",
            r"\n\nFrom the",
        ]
        split_point = -1
        for pattern in patterns:
            matches = list(re.finditer(pattern, full_response, re.IGNORECASE))
            if matches:
                # The last match is the most likely start of the final answer.
                split_point = matches[-1].start()
                break

        if split_point > 0:
            thinking = full_response[:split_point].strip()
            answer = full_response[split_point:].strip()
        else:
            # Fallback: split at a paragraph that doesn't start with "Source"
            parts = re.split(r'\n\n(?![Ss]ource)', full_response, maxsplit=1)
            if len(parts) > 1 and len(parts[1]) > 100:
                thinking = parts[0].strip()
                answer = parts[1].strip()
            else:
                thinking = "Source analysis integrated with response."
                answer = full_response
    else:
        # Case 6: split at a paragraph that doesn't start with "Source"
        parts = re.split(r'\n\n(?![Ss]ource)', full_response, maxsplit=1)
        if len(parts) > 1 and len(parts[1]) > 50:
            thinking = parts[0].strip()
            answer = parts[1].strip()

    if not answer.strip():
        # Case 7: Default - use the whole response and note no clear division
        thinking = "Analysis not clearly separated in the model's response."
        answer = full_response
    return thinking, answer


def _paragraphs_html(text):
    """Wrap already-escaped *text* in ``<p>`` tags, one per blank-line block."""
    return "<p>" + text.replace("\n\n", "</p><p>") + "</p>"


def _render_answer_html(thinking, answer, top_entries, completion_time):
    """Render the answer, collapsible analysis, sources and timing as HTML."""
    # Model output is untrusted text: escape before adding our own markup.
    formatted_answer = re.sub(
        r'\[Source (\d+)\]',
        r"<span class='source-citation'>[Source \1]</span>",
        html.escape(answer, quote=False),
    )
    formatted_thinking = re.sub(
        r'Source (\d+):',
        r"<strong>Source \1:</strong>",
        html.escape(thinking, quote=False),
    )
    source_items = "".join(
        f"<li>{_link_html(entry['url'])}</li>" for entry in top_entries
    )

    return "".join([
        "<div class='ai-answer'>",
        # Main answer first (most prominent)
        "<div class='answer-section'>",
        "<h3>Answer:</h3>",
        f"<div class='answer-text'>{_paragraphs_html(formatted_answer)}</div>",
        "</div>",
        # Collapsible analysis section
        "<details class='thinking-section'>",
        "<summary>🔍 Show Analysis Process</summary>",
        f"<div class='thinking-text'>{_paragraphs_html(formatted_thinking)}</div>",
        "</details>",
        # Source references; list position matches the [Source X] numbers
        "<div class='sources-section'>",
        "<h4>Sources:</h4>",
        f"<ol>{source_items}</ol>",
        "</div>",
        f"<p class='generation-note'>Generated using {GROQ_MODEL} "
        f"in {completion_time:.2f} seconds</p>",
        "</div>",
    ])


def generate_answer_with_groq(city, question, max_sources=3, api_key=None,
                              temperature=0.3):
    """Answer *question* about *city* using dataset rows and the Groq API.

    Args:
        city: name of a loaded dataset.
        question: the user's question.
        max_sources: maximum number of dataset rows sent to the model.
        api_key: Groq API key.
        temperature: sampling temperature passed to the model.

    Returns:
        HTML containing the answer, or a plain error/status message string.
        Exceptions raised while calling the API are reported in the returned
        string rather than propagated.
    """
    if not api_key or api_key.strip() == "":
        return ("Error: Groq API key not provided. "
                "Please enter your API key in the field above.")

    client, error = _create_groq_client(api_key)
    if error:
        return error
    if client is None:
        return "Failed to initialize Groq client after all attempts."

    data = all_data.get(city)
    if data is None:
        return "City data not found"

    # Simple word-overlap relevance ranking. For a production app, consider
    # using proper embedding and semantic search.
    top_entries = _rank_entries(data, question, max_sources)
    if not top_entries:
        return f"No relevant information found in the {city} dataset for this question."

    prompt = _build_prompt(city, question, top_entries)

    try:
        start_time = time.time()
        response = client.chat.completions.create(
            model=GROQ_MODEL,
            messages=[
                {"role": "system",
                 "content": "You are an expert historian specializing in ancient cities."},
                {"role": "user", "content": prompt},
            ],
            temperature=temperature,
            max_tokens=4000,  # Increased for longer, more comprehensive answers
            top_p=0.9,
        )
        completion_time = time.time() - start_time

        # content may be None; treat that as an empty response.
        full_response = response.choices[0].message.content or ""
        thinking, answer = _split_thinking_and_answer(full_response)
        return _render_answer_html(thinking, answer, top_entries, completion_time)
    except Exception as e:
        # UI boundary: report the failure in the output panel.
        return f"Error generating answer: {str(e)}"


# Load all CSV files on startup
all_data = load_csv_files()
city_names = list(all_data.keys())
if not city_names:
    # Placeholder so the dropdowns always have a selectable value.
    city_names = ["No data found"]


def update_queries(city):
    """Return a Dropdown update listing the queries available for *city*."""
    return gr.Dropdown(choices=get_queries_for_city(city))


def search_with_queries(city, search_type, query_from_dropdown, custom_query,
                        case_sensitive, show_empty_queries, preserve_order):
    """Handle the Search button.

    Shows rows without queries when *show_empty_queries* is set; otherwise
    searches with the custom query if one was typed, else the dropdown value.
    """
    if show_empty_queries:
        return find_empty_queries(city, preserve_order)
    final_query = (
        custom_query if custom_query and custom_query.strip()
        else query_from_dropdown
    )
    return search_data(city, search_type, final_query, case_sensitive,
                       preserve_order)


def on_generate_answer(city, question, max_sources, temperature):
    """Handle the Generate Answer button.

    The API key is read from the ``GROQ_API`` environment variable.
    """
    if not question or not question.strip():
        return "Please enter a question to generate an answer."

    groq_api_key = os.environ.get("GROQ_API")
    if not groq_api_key:
        return (
            "Error: GROQ_API environment variable not set. "
            "Please set your Groq API key in the environment."
        )

    try:
        return generate_answer_with_groq(
            city, question, max_sources, groq_api_key, temperature
        )
    except Exception as e:
        return f"Error: {str(e)}"


def build_app():
    """Build and return the Gradio Blocks interface (requires gradio)."""
    with gr.Blocks(title="Archaeological Query Engine") as blocks:
        # Make sure there's only one top-level Tabs component.
        with gr.Tabs():
            with gr.TabItem("Search Dataset"):
                gr.Markdown(
                    "Search through information about ancient cities from CSV files."
                )

                with gr.Row():
                    with gr.Column():
                        city_dropdown = gr.Dropdown(
                            choices=city_names,
                            value=city_names[0],
                            label="Select City",
                        )
                        # Dropdown for queries based on the selected city
                        query_dropdown = gr.Dropdown(
                            choices=get_queries_for_city(city_names[0]),
                            label="Select a Query",
                            allow_custom_value=True,
                        )
                        search_type = gr.Radio(
                            choices=["Simple Text Search",
                                     "Regular Expression Search"],
                            value="Simple Text Search",
                            label="Search Type",
                        )
                        # Keep a text box for custom queries
                        search_query = gr.Textbox(
                            label="Custom Search Query (optional)",
                            placeholder="Enter custom text to search for...",
                        )
                        case_sensitive = gr.Checkbox(
                            label="Case Sensitive",
                            value=False,
                        )
                        show_empty_queries = gr.Checkbox(
                            label="Show Entries Without Queries",
                            value=False,
                            info="Check this to display entries that have empty or missing queries",
                        )
                        preserve_order = gr.Checkbox(
                            label="Preserve Original Dataset Order",
                            value=True,
                            info="When checked, results will be displayed in their original order from the dataset. When unchecked, results will be displayed in the order they are found.",
                        )
                        search_button = gr.Button("Search")

                    with gr.Column():
                        results_text = gr.HTML(
                            label="Search Results",
                            value="",
                            elem_classes=["results-output"],
                        )
                        gr.Textbox(
                            label="Dataset Statistics",
                            # Count real datasets, not the placeholder entry.
                            value=f"Total cities loaded: {len(all_data)}\nCities: {', '.join(city_names)}",
                        )

                # Update the query dropdown when the city changes
                city_dropdown.change(
                    fn=update_queries,
                    inputs=city_dropdown,
                    outputs=query_dropdown,
                )
                search_button.click(
                    fn=search_with_queries,
                    inputs=[city_dropdown, search_type, query_dropdown,
                            search_query, case_sensitive, show_empty_queries,
                            preserve_order],
                    outputs=results_text,
                )

            # Tab for AI-generated answers using the Groq API
            with gr.TabItem("AI Answers (Groq API)"):
                gr.Markdown(
                    f"Ask questions about the dataset and get AI-generated answers using the Groq API with the {GROQ_MODEL} model."
                )

                with gr.Row():
                    with gr.Column():
                        # The API key is read from the GROQ_API environment
                        # variable, so there is no key input field here.
                        ai_city_dropdown = gr.Dropdown(
                            choices=city_names,
                            value=city_names[0],
                            label="Select City",
                        )
                        question_input = gr.Textbox(
                            label="Ask a Question",
                            placeholder="E.g., What was the historical significance of this ancient city?",
                            lines=3,
                        )
                        max_sources_slider = gr.Slider(
                            minimum=1,
                            maximum=10,
                            value=3,
                            step=1,
                            label="Maximum Number of Sources to Consider",
                            info="Higher values may provide more comprehensive answers but will take longer",
                        )
                        temperature_slider = gr.Slider(
                            minimum=0.0,
                            maximum=1.0,
                            value=0.3,
                            step=0.1,
                            label="Temperature",
                            info="Lower values create more focused answers, higher values create more creative ones",
                        )
                        generate_button = gr.Button("Generate Answer")

                    with gr.Column():
                        answer_output = gr.HTML(
                            label="AI-Generated Answer",
                            value="",
                            elem_classes=["results-output"],
                        )

                generate_button.click(
                    fn=on_generate_answer,
                    inputs=[ai_city_dropdown, question_input,
                            max_sources_slider, temperature_slider],
                    outputs=answer_output,
                )

        # Add CSS styling
        gr.HTML(APP_CSS)
    return blocks


# Create the Gradio interface (None when gradio is unavailable).
app = build_app() if gr is not None else None


# Launch the app
if __name__ == "__main__":
    if app is None:
        sys.exit("Gradio is required to launch the UI. Run: pip install gradio")
    try:
        print(f"Loaded {len(all_data)} cities: {', '.join(city_names)}")
        app.launch(show_error=True)
    except Exception as e:
        print(f"Error starting application: {e}")
        traceback.print_exc()