# Spaces: Sleeping  (page status text captured along with the source; not part of the program)
| from flask import Flask, request, jsonify | |
| import os | |
| import json | |
| import time | |
| from flask_cors import CORS | |
| from google import genai | |
| from google.genai import types | |
| from exa_py import Exa | |
| from linkup import LinkupClient | |
# Flask application object; flask_cors is applied to it so browsers on other
# origins can call the API.
app = Flask(__name__)
CORS(app)

# Credentials for the three external services are read from the environment.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
EXA_API_KEY = os.getenv("EXA_API_KEY")
LINKUP_API_KEY = os.getenv("LINKUP_API_KEY")

# Fail fast at import time: a missing (or empty) key aborts start-up instead of
# surfacing later as a failed request. Checked in the order Google, Exa, Linkup.
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable is not set.")
if not EXA_API_KEY:
    raise ValueError("EXA_API_KEY environment variable is not set.")
if not LINKUP_API_KEY:
    raise ValueError("LINKUP_API_KEY environment variable is not set.")

# Module-level service clients shared by the request handlers.
exa = Exa(api_key=EXA_API_KEY)
linkup_client = LinkupClient(api_key=LINKUP_API_KEY)
# Substrings (lower-case) that identify a throttling error in an exception's
# class name or message. Deliberately more specific than "rate"/"limit", which
# also match unrelated text such as "generate".
_RATE_LIMIT_MARKERS = (
    "rate limit",
    "rate-limit",
    "rate_limit",
    "ratelimit",
    "too many requests",
    "toomanyrequests",
    "429",
    "quota",
    "resource_exhausted",
    "resource exhausted",
)


def _is_rate_limit_error(exc):
    """Return True if the exception's class name or message looks like throttling."""
    haystack = f"{type(exc).__name__} {exc}".lower()
    return any(marker in haystack for marker in _RATE_LIMIT_MARKERS)


def _extract_linkup_answer(response):
    """Return the answer text of a Linkup response (attribute, dict key, or str())."""
    if hasattr(response, "answer"):
        return response.answer
    if isinstance(response, dict) and "answer" in response:
        return response["answer"]
    return str(response)


def _build_grant_search_prompt(search_term, content):
    """Build the Gemini prompt asking for structured grants from search results."""
    return (
        f"Based on the following search results about {search_term} grants, "
        "extract and structure grant information with:\n"
        "- Grant name/title\n"
        "- Short summary \n"
        "- Funding organization\n"
        "- Grant value (numeric only)\n"
        "- Application deadline\n"
        "- Eligible countries\n"
        "- Sector/field\n"
        "- Eligibility criteria\n"
        "- link URL\n"
        "Return in JSON format with a 'grants' array.\n\n"
        f"Search results: {content}"
    )


def _parse_grants_json(text):
    """Extract a {"grants": [...]} dictionary from free-form model output.

    The text is scanned for the first position at which a non-empty JSON
    object, or a non-empty JSON array of objects, decodes successfully. Text
    before and after that value (prose, Markdown code fences) is ignored.
    Because later positions are tried when an earlier one fails, a truncated
    reply can still yield the first complete nested object.

    Normalisation:
      * an array of objects becomes {"grants": array};
      * an object without a "grants" key is treated as one grant;
      * an object with a "grants" key is returned with that value coerced to
        a list (a single object is wrapped, any other non-list becomes []).

    Returns {"grants": []} when nothing usable is found or *text* is not a
    string.
    """
    if not isinstance(text, str):
        return {"grants": []}

    decoder = json.JSONDecoder()
    for pos, char in enumerate(text):
        if char not in "{[":
            continue
        try:
            parsed, _ = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            continue

        if isinstance(parsed, dict) and parsed:
            if "grants" not in parsed:
                return {"grants": [parsed]}
            grants = parsed["grants"]
            if isinstance(grants, dict):
                parsed["grants"] = [grants]
            elif not isinstance(grants, list):
                parsed["grants"] = []
            return parsed

        if (
            isinstance(parsed, list)
            and parsed
            and all(isinstance(item, dict) for item in parsed)
        ):
            return {"grants": parsed}

    return {"grants": []}


def _search_grants_once(search_term, full_query):
    """Run one Linkup search plus Gemini extraction; return {"grants": [...]}.

    Exceptions from either service propagate to the caller.
    """
    response = linkup_client.search(
        query=full_query,
        depth="deep",
        output_type="sourcedAnswer",
        include_images=False,
    )
    print("\n=== DEBUG: Raw result from linkup search ===")
    print(response)
    print("===========================================")

    content = _extract_linkup_answer(response)
    structured_prompt = _build_grant_search_prompt(search_term, content)

    client = genai.Client(api_key=GOOGLE_API_KEY)
    gemini_response = client.models.generate_content(
        model="models/gemini-2.0-flash-lite",
        contents=f"{structured_prompt}, return the json string and nothing else",
    )
    gemini_text = gemini_response.text
    print(f"DEBUG: Gemini response: {gemini_text}")

    return _parse_grants_json(gemini_text)


def get_data(search_term, retry_delay=10):
    """Search for grants matching *search_term* and return them as structured data.

    A Linkup deep search is run for "<search_term> grants funding
    opportunities"; its answer is passed to Gemini, which is asked to return
    the grants as JSON. If the attempt fails with what looks like a rate-limit
    error, the whole attempt is repeated once after *retry_delay* seconds.

    Args:
        search_term: Topic to search grants for.
        retry_delay: Seconds to wait before the single retry (default 10).

    Returns:
        dict: {"grants": [...]} (possibly with extra keys supplied by the
        model) when at least one grant was extracted; otherwise
        {"error": "<message>"}. Exceptions are reported through the "error"
        key rather than raised.
    """
    full_query = f"{search_term} grants funding opportunities"
    print("\n=== DEBUG: Start get_data() ===")
    print(f"Search Term: {search_term}")
    print(f"Full Query: {full_query}\n")

    try:
        result = _search_grants_once(search_term, full_query)
    except Exception as e:
        print(f"ERROR: Exception occurred - {e}")
        if not _is_rate_limit_error(e):
            return {"error": f"An error occurred for '{search_term}': {str(e)}. Please try again."}

        print(f"DEBUG: Rate limit detected. Retrying in {retry_delay} seconds...")
        time.sleep(retry_delay)
        try:
            result = _search_grants_once(search_term, full_query)
        except Exception as e2:
            print(f"ERROR: Retry failed - {str(e2)}")
            return {"error": f"Retry failed for '{search_term}': {str(e2)}. Please try again later."}

        if not result["grants"]:
            print(f"DEBUG: No grants found after retry for '{search_term}'.")
            return {"error": f"No results returned for '{search_term}' after retry. Please try again with a different search term."}
        print("DEBUG: Grants found on retry, returning results.")
        return result

    if not result["grants"]:
        print(f"DEBUG: No grants found for '{search_term}'.")
        return {"error": f"No results returned for '{search_term}'. Please try again with a different search term."}
    print("DEBUG: Grants found, returning results.")
    return result
def process_multiple_search_terms(search_terms):
    """Run get_data() for each search term and merge the grants found.

    Args:
        search_terms: Iterable of search terms, or None. Entries that are not
            strings, or that are blank after stripping whitespace, are
            skipped instead of raising.

    Returns:
        dict: {"grants": [...]} holding the grants of every term that produced
        any, in input order. Terms for which get_data() returns no "grants"
        (for example an {"error": ...} dictionary) contribute nothing.
    """
    all_data = {"grants": []}
    for term in search_terms or ():
        # The list comes from a client-supplied JSON body, so items may be
        # numbers, null, nested lists, etc.
        if not isinstance(term, str):
            continue
        term = term.strip()
        if not term:
            continue
        result = get_data(term)
        if result and result.get("grants"):
            all_data["grants"].extend(result["grants"])
    return all_data
def scrape():
    """Handle a grant-search request and return the aggregated grants as JSON.

    The JSON request body must contain 'search_terms', given either as one
    string with one term per line or as a list of strings. Responds with
    HTTP 400 and an 'error' message when the key is missing, has another type,
    or yields no terms; otherwise responds with HTTP 200 and the result of
    process_multiple_search_terms().
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # captured source - confirm the route is registered in the real file.
    payload = request.get_json()
    if not payload or "search_terms" not in payload:
        return jsonify({"error": "Request must include 'search_terms' key."}), 400

    terms = payload["search_terms"]
    if isinstance(terms, str):
        # One term per line; surrounding whitespace and blank lines are dropped.
        stripped_lines = (line.strip() for line in terms.split("\n"))
        terms = [line for line in stripped_lines if line]
    elif not isinstance(terms, list):
        return jsonify({"error": "'search_terms' must be a string or list of strings."}), 400

    if not terms:
        return jsonify({"error": "No valid search terms provided."}), 400

    aggregated = process_multiple_search_terms(terms)
    return jsonify(aggregated), 200
def _decode_grants_payload(text):
    """Extract a {"grants": [...]} dictionary from free-form model output.

    Scans *text* for the first position at which a non-empty JSON object, or a
    non-empty JSON array of objects, decodes successfully; surrounding prose
    or Markdown fences are ignored. Whichever of '{' or '[' comes first is
    tried first, so an object that merely contains an array field is decoded
    as the object rather than as that inner array.

    An array of objects becomes {"grants": array}; an object without a
    "grants" key is treated as a single grant; an object with a "grants" key
    is returned with that value coerced to a list. Returns {"grants": []}
    when nothing usable is found or *text* is not a string.
    """
    if not isinstance(text, str):
        return {"grants": []}

    decoder = json.JSONDecoder()
    for pos, char in enumerate(text):
        if char not in "{[":
            continue
        try:
            parsed, _ = decoder.raw_decode(text, pos)
        except json.JSONDecodeError:
            continue

        if isinstance(parsed, dict) and parsed:
            if "grants" not in parsed:
                return {"grants": [parsed]}
            grants = parsed["grants"]
            if isinstance(grants, dict):
                parsed["grants"] = [grants]
            elif not isinstance(grants, list):
                parsed["grants"] = []
            return parsed

        if (
            isinstance(parsed, list)
            and parsed
            and all(isinstance(item, dict) for item in parsed)
        ):
            return {"grants": parsed}

    return {"grants": []}


def get_data_from_url(url):
    """Fetch a page through Exa and extract grant data from it with Gemini.

    Args:
        url: Address of the page to read.

    Returns:
        dict: {"grants": [...]} (possibly with extra keys supplied by the
        model) when at least one grant was extracted. An empty dict when the
        page yields no text, no grants are found, or any exception occurs;
        exceptions are printed, not raised.
    """
    print("\n=== DEBUG: Start get_data_from_url() ===")
    print(f"URL: {url}")
    try:
        result = exa.get_contents(
            [url],
            text=True
        )
        print("\n=== DEBUG: Raw result from Exa ===")
        print(result)
        print("=====================================")

        # The response is handled both as an object with a `results`
        # attribute and as a plain dict; anything else is stringified.
        if hasattr(result, 'results') and result.results:
            first = result.results[0]
            page_content = first.text if hasattr(first, 'text') else str(first)
        elif isinstance(result, dict) and 'results' in result and result['results']:
            first = result['results'][0]
            page_content = first.get('text', str(first))
        else:
            page_content = str(result)

        if not page_content:
            print("ERROR: No content extracted from URL")
            return {}
        print(f"DEBUG: Extracted content length: {len(page_content)}")

        full_prompt = (
            "Extract the following grant data from the provided web content. "
            "- Grant name/title\n"
            "- Short summary\n"
            "- Funding organization\n"
            "- Grant value (numeric only)\n"
            "- Application deadline\n"
            "- Eligible countries\n"
            "- Sector/field\n"
            "- Eligibility criteria\n"
            "Return in JSON format with a 'grants' array.\n\n"
            # Only the first 10,000 characters are sent, to limit prompt size.
            f"Web content: {page_content[:10000]}"
        )
        client = genai.Client(api_key=GOOGLE_API_KEY)
        gemini_response = client.models.generate_content(
            model="models/gemini-2.0-flash-lite",
            contents=f"{full_prompt}, return the json string and nothing else"
        )
        response_text = gemini_response.text
        print(f"DEBUG: Gemini response: {response_text}")

        parsed_result = _decode_grants_payload(response_text)
        if not parsed_result["grants"]:
            print("No grant opportunities found in the scraped URL.")
            return {}

        print(f"DEBUG: Found {len(parsed_result['grants'])} grants")
        print(f"First grant opportunity: {parsed_result['grants'][0]}")
        return parsed_result
    except Exception as e:
        print(f"ERROR: Exception in get_data_from_url: {str(e)}")
        return {}
def scrape_url():
    """Handle a request to extract grant opportunities from a single URL.

    The JSON request body must contain 'url'. Responds with HTTP 400 when the
    key is missing, HTTP 500 when get_data_from_url() returns nothing, and
    HTTP 200 with the extracted grant data otherwise.
    """
    # NOTE(review): no @app.route decorator is visible on this function in the
    # captured source - confirm the route is registered in the real file.
    payload = request.get_json()
    if not payload or "url" not in payload:
        return jsonify({"error": "Request must include 'url' key."}), 400

    grants = get_data_from_url(payload["url"])
    if grants:
        return jsonify(grants), 200
    return jsonify({"error": "Failed to scrape URL or no grants found."}), 500
if __name__ == "__main__":
    # The server listens on every interface, so the interactive debugger must
    # not be on by default: it lets anyone who can reach the port run code in
    # this process. Set FLASK_DEBUG=1 (or true/yes/on) to enable it locally.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, host="0.0.0.0", port=7860)