from flask import Flask, render_template, jsonify, request import json import os import requests import time import random from functools import wraps from datetime import datetime # Try to import vector search with error handling try: from vector_search import SpaceKnowledgeBase VECTOR_SEARCH_AVAILABLE = True except ImportError as e: print(f"Warning: Vector search not available due to import error: {e}") print("Application will run without advanced vector search features") VECTOR_SEARCH_AVAILABLE = False SpaceKnowledgeBase = None app = Flask(__name__) # Initialize the knowledge base for vector search knowledge_base = None def get_knowledge_base(): """Get or initialize the knowledge base""" global knowledge_base if not VECTOR_SEARCH_AVAILABLE: print("Vector search not available, skipping knowledge base initialization") return None if knowledge_base is None: try: print("Initializing Space Knowledge Base...") knowledge_base = SpaceKnowledgeBase(data_dir="data") print("Knowledge base initialized successfully!") except Exception as e: print(f"Error initializing knowledge base: {e}") return None return knowledge_base # Mistral AI configuration MISTRAL_API_KEY = os.environ.get("MISTRAL_API_KEY", "") MISTRAL_API_URL = "https://api.mistral.ai/v1/chat/completions" # Rate limiting tracking last_api_call_time = 0 MIN_REQUEST_INTERVAL = 2.0 # Minimum 1 second between requests import time import random from functools import wraps def rate_limit_handler(max_retries=3, base_delay=1): """Decorator to handle rate limiting with exponential backoff""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_retries): try: return func(*args, **kwargs) except requests.exceptions.HTTPError as e: if e.response.status_code == 429: # Too Many Requests if attempt < max_retries - 1: # Exponential backoff with jitter delay = base_delay * (2 ** attempt) + random.uniform(0, 1) print(f"Rate limit hit. Retrying in {delay:.2f} seconds... (Attempt {attempt + 1}/{max_retries})") time.sleep(delay) continue else: print("Max retries reached. Rate limit exceeded.") raise e else: raise e except Exception as e: raise e return None return wrapper return decorator @rate_limit_handler(max_retries=3, base_delay=2) def call_mistral_api(message): """Call Mistral API with rate limiting and retry logic""" global last_api_call_time try: # Ensure minimum interval between requests current_time = time.time() time_since_last_call = current_time - last_api_call_time if time_since_last_call < MIN_REQUEST_INTERVAL: sleep_time = MIN_REQUEST_INTERVAL - time_since_last_call print(f"Throttling request. Waiting {sleep_time:.2f} seconds...") time.sleep(sleep_time) headers = { "Content-Type": "application/json", "Authorization": f"Bearer {MISTRAL_API_KEY}" } data = { "model": "mistral-large-latest", "messages": [ { "role": "system", "content": "You are a knowledgeable space exploration assistant. You help users learn about space, astronomy, rockets, astronauts, planets, telescopes, and space agencies. Use the provided context to give accurate and informative answers. Always be helpful and informative about space-related topics. If you don't know something, say so rather than making up information. Be enthusiastic about space exploration and discovery." }, { "role": "user", "content": message } ], "temperature": 0.7, "max_tokens": 3000 } # Record the time of this API call last_api_call_time = time.time() response = requests.post(MISTRAL_API_URL, headers=headers, json=data, timeout=30) response.raise_for_status() result = response.json() return result["choices"][0]["message"]["content"] except requests.RequestException as e: print(f"Mistral API Error: {e}") if hasattr(e, 'response') and e.response is not None: if e.response.status_code == 429: raise e # Let the rate_limit_handler deal with 429 errors else: print(f"HTTP Error {e.response.status_code}: {e.response.text}") raise Exception(f"Failed to get response from Mistral: {str(e)}") # NASA Images API configuration NASA_IMAGES_API_BASE = "https://images-api.nasa.gov" def search_nasa_images(query="", year="", page=1, page_size=20): """Search NASA Images API""" try: params = { 'q': query if query else 'space', 'media_type': 'image', 'page': page, 'page_size': page_size } if year: params['year_start'] = year params['year_end'] = year response = requests.get(f"{NASA_IMAGES_API_BASE}/search", params=params, timeout=10) response.raise_for_status() data = response.json() items = [] if 'collection' in data and 'items' in data['collection']: for item in data['collection']['items']: if 'data' in item and len(item['data']) > 0: item_data = item['data'][0] image_info = { 'nasa_id': item_data.get('nasa_id', ''), 'title': item_data.get('title', 'Untitled'), 'description': item_data.get('description', ''), 'date_created': item_data.get('date_created', ''), 'center': item_data.get('center', ''), 'keywords': item_data.get('keywords', []), 'photographer': item_data.get('photographer', ''), 'location': item_data.get('location', ''), 'href': item.get('href', ''), 'thumbnail': '' } # Get thumbnail URL if 'links' in item: for link in item['links']: if link.get('rel') == 'preview': image_info['thumbnail'] = link.get('href', '') break items.append(image_info) return { 'items': items, 'total': data.get('collection', {}).get('metadata', {}).get('total_hits', 0) } except requests.RequestException as e: print(f"Error fetching NASA images: {e}") return {'items': [], 'total': 0} # Load space terminology data def load_space_terms(): try: with open('data/space_terminology.json', 'r', encoding='utf-8') as f: data = json.load(f) return data['space_terms'] except FileNotFoundError: return [] # Load space agencies data def load_space_agencies(): try: with open('data/space_agencies.json', 'r', encoding='utf-8') as f: data = json.load(f) return data['space_agencies'] except FileNotFoundError: return [] # Load planets data def load_planets(): try: with open('data/planets.json', 'r', encoding='utf-8') as f: data = json.load(f) return data['planets'] except FileNotFoundError: return [] # Load rockets data def load_rockets(): try: with open('data/rockets.json', 'r', encoding='utf-8') as f: data = json.load(f) return data['rockets'] except FileNotFoundError: return [] # Load astronauts data def load_astronauts(): try: with open('data/astronauts.json', 'r', encoding='utf-8') as f: data = json.load(f) return data['astronauts'] except FileNotFoundError: return [] # Load telescopes data def load_telescopes(): try: with open('data/telescopes.json', 'r', encoding='utf-8') as f: data = json.load(f) return data['telescopes'] except FileNotFoundError: return [] # Load notable people data def load_notable_people(): try: with open('data/notable_peoples.json', 'r', encoding='utf-8-sig') as f: data = json.load(f) return data['notable_space_contributors'] except FileNotFoundError: return [] # Load space museums data def load_space_museums(): try: with open('data/space_museams.json', 'r', encoding='utf-8') as f: data = json.load(f) return data['space_museums'] except FileNotFoundError: return [] @app.route('/') def index(): """Main page with feature cards""" return render_template('index.html') @app.route('/terminologies') def terminologies(): """Space terminologies explorer page""" return render_template('terminologies.html') @app.route('/agencies') def agencies(): """Space agencies explorer page""" return render_template('agencies.html') @app.route('/planets') def planets(): """Planets explorer page""" return render_template('planets.html') @app.route('/rockets') def rockets(): """Rockets explorer page""" return render_template('rockets.html') @app.route('/astronauts') def astronauts(): """Astronauts explorer page""" return render_template('astronauts.html') @app.route('/telescopes') def telescopes(): """Telescopes explorer page""" return render_template('telescopes.html') @app.route('/museums') def museums(): """Space museums explorer page""" return render_template('museums.html') @app.route('/chat') def chat(): """Space exploration chat interface""" return render_template('chat.html') @app.route('/images') def images(): """Space images explorer page""" return render_template('images.html') @app.route('/notable-people') def notable_people(): """Notable people in space exploration""" return render_template('notable_people.html') @app.route('/api/terms') def get_terms(): """API endpoint to get all space terms""" terms = load_space_terms() # Filter by letter if provided letter = request.args.get('letter', '').upper() if letter: terms = [term for term in terms if term['term'].upper().startswith(letter)] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: terms = [term for term in terms if search_query in term['term'].lower() or search_query in term['short_description'].lower() or search_query in term['category'].lower()] # Sort alphabetically terms.sort(key=lambda x: x['term'].lower()) return jsonify(terms) @app.route('/api/terms/') def get_term_details(term_id): """API endpoint to get details of a specific term""" terms = load_space_terms() term = next((t for t in terms if t['id'] == term_id), None) if term: return jsonify(term) else: return jsonify({'error': 'Term not found'}), 404 @app.route('/api/categories') def get_categories(): """API endpoint to get all available categories""" terms = load_space_terms() categories = list(set(term['category'] for term in terms)) categories.sort() return jsonify(categories) @app.route('/api/alphabet') def get_alphabet_stats(): """API endpoint to get term count for each letter""" terms = load_space_terms() alphabet_stats = {} for letter in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ': count = len([term for term in terms if term['term'].upper().startswith(letter)]) alphabet_stats[letter] = count return jsonify(alphabet_stats) @app.route('/api/agencies') def get_agencies(): """API endpoint to get all space agencies""" agencies = load_space_agencies() # Filter by type if provided agency_type = request.args.get('type', '').lower() if agency_type and agency_type != 'all': agencies = [agency for agency in agencies if agency['type'].lower() == agency_type] # Filter by country if provided country = request.args.get('country', '') if country and country != 'all': agencies = [agency for agency in agencies if agency['country'].lower() == country.lower()] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: agencies = [agency for agency in agencies if search_query in agency['name'].lower() or search_query in agency['full_name'].lower() or search_query in agency['country'].lower() or search_query in agency['description'].lower()] # Sort alphabetically agencies.sort(key=lambda x: x['name'].lower()) return jsonify(agencies) @app.route('/api/agencies/') def get_agency_details(agency_id): """API endpoint to get details of a specific agency""" agencies = load_space_agencies() agency = next((a for a in agencies if a['id'] == agency_id), None) if agency: return jsonify(agency) else: return jsonify({'error': 'Agency not found'}), 404 @app.route('/api/planets') def get_planets(): """API endpoint to get all planets""" planets = load_planets() # Filter by type if provided planet_type = request.args.get('type', '').lower() if planet_type and planet_type != 'all': planets = [planet for planet in planets if planet['type'].lower() == planet_type] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: planets = [planet for planet in planets if search_query in planet['name'].lower() or search_query in planet['description'].lower() or search_query in planet['type'].lower()] # Sort by distance from sun (inner to outer) distance_order = ['mercury', 'venus', 'earth', 'mars', 'jupiter', 'saturn', 'uranus', 'neptune'] planets.sort(key=lambda x: distance_order.index(x['id']) if x['id'] in distance_order else 999) return jsonify(planets) @app.route('/api/planets/') def get_planet_details(planet_id): """API endpoint to get details of a specific planet""" planets = load_planets() planet = next((p for p in planets if p['id'] == planet_id), None) if planet: return jsonify(planet) else: return jsonify({'error': 'Planet not found'}), 404 @app.route('/api/rockets') def get_rockets(): """API endpoint to get all rockets""" rockets = load_rockets() # Filter by type if provided rocket_type = request.args.get('type', '').lower() if rocket_type and rocket_type != 'all': rockets = [rocket for rocket in rockets if rocket['type'].lower() == rocket_type] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: rockets = [rocket for rocket in rockets if search_query in rocket['name'].lower() or search_query in rocket['description'].lower() or search_query in rocket['type'].lower()] # Sort alphabetically rockets.sort(key=lambda x: x['name'].lower()) return jsonify(rockets) @app.route('/api/rockets/') def get_rocket_details(rocket_id): """API endpoint to get details of a specific rocket""" rockets = load_rockets() rocket = next((r for r in rockets if r['id'] == rocket_id), None) if rocket: return jsonify(rocket) else: return jsonify({'error': 'Rocket not found'}), 404 @app.route('/api/astronauts') def get_astronauts(): """API endpoint to get all astronauts""" astronauts = load_astronauts() # Filter by country if provided country = request.args.get('country', '') if country and country != 'all': astronauts = [astronaut for astronaut in astronauts if astronaut['country'].lower() == country.lower()] # Filter by agency type if provided agency_type = request.args.get('type', '').lower() if agency_type and agency_type != 'all': astronauts = [astronaut for astronaut in astronauts if astronaut['type'].lower() == agency_type] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: astronauts = [astronaut for astronaut in astronauts if search_query in astronaut['name'].lower() or search_query in astronaut['description'].lower() or search_query in astronaut['country'].lower() or search_query in astronaut['agency'].lower()] # Sort alphabetically by name astronauts.sort(key=lambda x: x['name'].lower()) return jsonify(astronauts) @app.route('/api/astronauts/') def get_astronaut_details(astronaut_id): """API endpoint to get details of a specific astronaut""" astronauts = load_astronauts() astronaut = next((a for a in astronauts if a['id'] == astronaut_id), None) if astronaut: return jsonify(astronaut) else: return jsonify({'error': 'Astronaut not found'}), 404 @app.route('/api/telescopes') def get_telescopes(): """API endpoint to get all telescopes""" telescopes = load_telescopes() # Filter by type if provided telescope_type = request.args.get('type', '').lower() if telescope_type and telescope_type != 'all': telescopes = [telescope for telescope in telescopes if telescope['type'].lower() == telescope_type] # Filter by country if provided country = request.args.get('country', '') if country and country != 'all': telescopes = [telescope for telescope in telescopes if telescope['country'].lower() == country.lower()] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: telescopes = [telescope for telescope in telescopes if search_query in telescope['name'].lower() or search_query in telescope['description'].lower() or search_query in telescope['country'].lower() or search_query in telescope['inventor'].lower()] # Sort by year (newest first) telescopes.sort(key=lambda x: x.get('year', 0), reverse=True) return jsonify(telescopes) @app.route('/api/telescopes/') def get_telescope_details(telescope_id): """API endpoint to get details of a specific telescope""" telescopes = load_telescopes() telescope = next((t for t in telescopes if t['id'] == telescope_id), None) if telescope: return jsonify(telescope) else: return jsonify({'error': 'Telescope not found'}), 404 @app.route('/api/notable-people') def get_notable_people(): """API endpoint to get all notable people""" people = load_notable_people() # Filter by country if provided country = request.args.get('country', '') if country and country != 'all': people = [person for person in people if person['country'].lower() == country.lower()] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: people = [person for person in people if search_query in person['name'].lower() or search_query in person['contribution'].lower() or search_query in person['known_for'].lower() or search_query in person['country'].lower()] # Sort alphabetically by name people.sort(key=lambda x: x['name'].lower()) return jsonify(people) @app.route('/api/notable-people/') def get_notable_person_details(person_name): """API endpoint to get details of a specific notable person""" people = load_notable_people() # URL decode and normalize the name for comparison normalized_name = person_name.replace('-', ' ').replace('_', ' ').lower() person = next((p for p in people if p['name'].lower() == normalized_name), None) if person: return jsonify(person) else: return jsonify({'error': 'Person not found'}), 404 @app.route('/api/images') def get_images(): """API endpoint to get space images from NASA API""" # Get query parameters search_query = request.args.get('search', '') year = request.args.get('year', '') page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 20, type=int) # Search NASA Images API result = search_nasa_images(search_query, year, page, page_size) return jsonify(result) @app.route('/api/museums') def get_museums(): """API endpoint to get all space museums with search, country filter, and pagination""" museums = load_space_museums() # Filter by country if provided country = request.args.get('country', '') if country and country != 'all': museums = [museum for museum in museums if museum['country'].lower() == country.lower()] # Filter by search query if provided search_query = request.args.get('search', '').lower() if search_query: museums = [museum for museum in museums if search_query in museum['name'].lower() or search_query in museum['city_or_region'].lower() or search_query in museum['famous_for'].lower() or search_query in museum['country'].lower() or search_query in (museum.get('additional_info', '').lower())] # Sort alphabetically by name museums.sort(key=lambda x: x['name'].lower()) # Pagination page = request.args.get('page', 1, type=int) page_size = request.args.get('page_size', 20, type=int) total = len(museums) start = (page - 1) * page_size end = start + page_size paginated = museums[start:end] return jsonify({ 'items': paginated, 'total': total, 'page': page, 'page_size': page_size }) @app.route('/api/museums/') def get_museum_details(museum_name): """API endpoint to get details of a specific museum by name (normalized)""" museums = load_space_museums() normalized_name = museum_name.replace('-', ' ').replace('_', ' ').lower() museum = next((m for m in museums if m['name'].lower() == normalized_name), None) if museum: return jsonify(museum) else: return jsonify({'error': 'Museum not found'}), 404 def get_knowledge_base_context(query, max_items=5, max_context_length=2000): """Get relevant context from the knowledge base using vector search for RAG""" try: if not VECTOR_SEARCH_AVAILABLE: print("Vector search not available, using fallback context search") return get_fallback_context(query, max_items) kb = get_knowledge_base() if not kb: print("Knowledge base not available, falling back to keyword search") return get_fallback_context(query, max_items) # Use vector search to find relevant context context = kb.get_context_for_query(query, max_context_length=max_context_length) if context: print(f"Vector search found context (length: {len(context)})") return context else: print("No relevant context found through vector search, using fallback") return get_fallback_context(query, max_items) except Exception as e: print(f"Error in vector search: {e}") # Fallback to simple keyword search if vector search fails return get_fallback_context(query, max_items) def get_fallback_context(query, max_items=5): """Fallback keyword-based context search if vector search fails""" context_items = [] query_lower = query.lower() # Search space terms terms = load_space_terms() relevant_terms = [] for term in terms: if (query_lower in term.get('term', '').lower() or query_lower in term.get('short_description', '').lower() or query_lower in term.get('detailed_description', '').lower() or any(query_lower in keyword.lower() for keyword in term.get('keywords', []))): relevant_terms.append(term) # Add top relevant terms to context for term in relevant_terms[:max_items//2]: context_items.append(f"**{term.get('term', 'Unknown')}**: {term.get('detailed_description', term.get('short_description', ''))}") # Search space agencies agencies = load_space_agencies() relevant_agencies = [] for agency in agencies: if (query_lower in agency.get('name', '').lower() or query_lower in agency.get('full_name', '').lower() or query_lower in agency.get('description', '').lower() or query_lower in agency.get('country', '').lower()): relevant_agencies.append(agency) # Add relevant agencies to context for agency in relevant_agencies[:2]: context_items.append(f"**{agency.get('name', 'Unknown')} ({agency.get('full_name', '')})**: {agency.get('description', '')}") # Search planets planets = load_planets() relevant_planets = [] for planet in planets: if (query_lower in planet.get('name', '').lower() or query_lower in planet.get('description', '').lower()): relevant_planets.append(planet) # Add relevant planets to context for planet in relevant_planets[:2]: context_items.append(f"**{planet.get('name', 'Unknown')}**: {planet.get('description', '')}") # Search rockets rockets = load_rockets() relevant_rockets = [] for rocket in rockets: if (query_lower in rocket.get('name', '').lower() or query_lower in rocket.get('description', '').lower()): relevant_rockets.append(rocket) # Add relevant rockets to context for rocket in relevant_rockets[:2]: context_items.append(f"**{rocket.get('name', 'Unknown')}**: {rocket.get('description', '')}") # Search astronauts astronauts = load_astronauts() relevant_astronauts = [] for astronaut in astronauts: if (query_lower in astronaut.get('name', '').lower() or query_lower in astronaut.get('description', '').lower() or query_lower in astronaut.get('agency', '').lower()): relevant_astronauts.append(astronaut) # Add relevant astronauts to context for astronaut in relevant_astronauts[:2]: context_items.append(f"**{astronaut.get('name', 'Unknown')}**: {astronaut.get('description', '')}") # Search telescopes telescopes = load_telescopes() relevant_telescopes = [] for telescope in telescopes: if (query_lower in telescope.get('name', '').lower() or query_lower in telescope.get('description', '').lower()): relevant_telescopes.append(telescope) # Add relevant telescopes to context for telescope in relevant_telescopes[:2]: context_items.append(f"**{telescope.get('name', 'Unknown')}**: {telescope.get('description', '')}") # Search museums museums = load_space_museums() relevant_museums = [] for museum in museums: if (query_lower in museum.get('name', '').lower() or query_lower in museum.get('famous_for', '').lower() or query_lower in museum.get('country', '').lower() or query_lower in museum.get('city_or_region', '').lower()): relevant_museums.append(museum) # Add relevant museums to context for museum in relevant_museums[:2]: context_items.append(f"**{museum.get('name', 'Unknown')}**: {museum.get('famous_for', '')} Located in {museum.get('city_or_region', '')}, {museum.get('country', '')}") # Search notable people notable_people = load_notable_people() relevant_people = [] for person in notable_people: if (query_lower in person.get('name', '').lower() or query_lower in person.get('contribution', '').lower() or query_lower in person.get('known_for', '').lower() or query_lower in person.get('country', '').lower()): relevant_people.append(person) # Add relevant people to context for person in relevant_people[:2]: context_items.append(f"**{person.get('name', 'Unknown')}**: {person.get('contribution', '')} Known for: {person.get('known_for', '')}") return "\n\n".join(context_items[:max_items]) @app.route('/api/chat', methods=['POST']) def chat_api(): """API endpoint for chat with RAG-enhanced responses""" try: data = request.get_json() user_message = data.get('message', '').strip() if not user_message: return jsonify({'error': 'Message is required'}), 400 print(f"Debug: Received message: {user_message}") # Get relevant context from knowledge base context = get_knowledge_base_context(user_message, max_items=8) print(f"Debug: Context found: {bool(context)}") if context: print(f"Debug: Context preview: {context[:200]}...") # Prepare the message with context if context: enhanced_message = f"""Context from Space Knowledge Base: {context} User Question: {user_message} Please answer the user's question using the provided context when relevant. If the context doesn't contain relevant information, feel free to use your general knowledge about space exploration.""" else: enhanced_message = f"User Question: {user_message}" print(f"Debug: Sending message to Mistral: {enhanced_message[:100]}...") try: # Get response from Mistral API with improved error handling print("Calling Mistral API...") response_text = call_mistral_api(enhanced_message) print(f"Debug: API response received successfully") except requests.exceptions.HTTPError as http_error: print(f"Mistral API HTTP Error: {http_error}") if hasattr(http_error, 'response') and http_error.response.status_code == 429: # Rate limit specific error response_text = "āš ļø The AI assistant is currently experiencing high demand. Please wait a moment and try again. In the meantime, here's what I found in our knowledge base:\n\n" + (context if context else "Please search our database directly for space information.") else: # Other HTTP errors response_text = f"šŸ”§ The AI assistant encountered a technical issue (HTTP {http_error.response.status_code if hasattr(http_error, 'response') else 'Unknown'}). " + (f"Here's relevant information from our knowledge base:\n\n{context}" if context else "Please try again later.") except Exception as mistral_error: print(f"Mistral API General Error: {mistral_error}") # Fallback response using context only if context: response_text = f"šŸ¤– I'm having trouble connecting to the AI assistant right now, but I found this relevant information in our space knowledge base:\n\n{context}\n\nšŸ’” This should help answer your question about '{user_message}'. Feel free to ask for more details!" else: response_text = f"šŸš€ I'm experiencing technical difficulties with the AI assistant at the moment. Please try rephrasing your question or browse our space database directly. Common topics include: planets, rockets, astronauts, space agencies, and telescopes." return jsonify({ 'response': response_text, 'context_used': bool(context), 'timestamp': datetime.now().isoformat() }) except Exception as e: print(f"Error in chat API: {e}") import traceback traceback.print_exc() return jsonify({'error': f'An error occurred: {str(e)}'}), 500 if __name__ == "__main__": # Configuration for Hugging Face Spaces PORT = int(os.environ.get("PORT", 7860)) HOST = "0.0.0.0" print(f"Starting CosmoPedia on {HOST}:{PORT}") app.run(host=HOST, port=PORT, debug=False, threaded=True)