Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import base64 | |
| import json | |
| from scrapegraphai.graphs import SearchGraph | |
| import nest_asyncio | |
| import os | |
| import subprocess | |
| import io | |
| import time | |
| import urllib.parse | |
| import asyncio | |
| from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings | |
| from langchain.vectorstores import FAISS | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain.chains import ConversationalRetrievalChain | |
| from langchain.memory import ConversationBufferMemory | |
| from google import genai | |
| from google.genai import types | |
| from langchain_community.document_loaders import PlaywrightURLLoader | |
| import requests | |
| # Import Supadata and initialize the client | |
| from supadata import Supadata, SupadataError | |
| # Import Crawl4AI | |
| from crawl4ai import AsyncWebCrawler | |
# Supadata client for the built-in web scraper; API key comes from the
# SUPADATA environment variable (None if unset — scrape calls will then fail
# and the code falls back to other scrapers).
SUPADATA_API_KEY = os.getenv("SUPADATA")
supadata = Supadata(api_key=SUPADATA_API_KEY)

# Ensure Playwright installs required browsers and dependencies.
# NOTE(review): this spawns a subprocess on every module import/app start; it
# is effectively a no-op once browsers are installed but still adds startup
# latency — consider guarding it. Return code is not checked.
subprocess.run(["playwright", "install"])

# Patch asyncio to allow nested event loops (needed because Streamlit and the
# async scrapers share one thread).
nest_asyncio.apply()

# Fail fast with a KeyError at import time if GOOGLE_API_KEY is not set.
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]

# ScrapeGraphAI configuration shared by every SearchGraph run in get_data().
graph_config = {
    "llm": {
        "api_key": GOOGLE_API_KEY,
        "model": "google_genai/gemini-2.0-flash-lite",
    },
    "max_results": 8,   # cap on search results per query
    "verbose": True,    # log graph execution details
    "headless": True    # run the scraping browser without a visible window
}
def get_data(search_term):
    """
    Run the SearchGraph for a given search term.

    If a rate-limit error (202) occurs, wait 10 seconds and retry once.
    If no results are returned or an error persists, notify the user via
    Streamlit and return an empty dict.

    Args:
        search_term: Free-text topic to search grant opportunities for.

    Returns:
        dict: SearchGraph result containing a "grants" key, or {} on failure.
    """
    full_prompt = (
        f"search for {search_term} grants\n\n"
        "List me all grants or funds with:\n"
        "- Grant name/title\n"
        "- Short summary \n"
        "- Funding organization\n"
        "- Grant value (numeric only)\n"
        "- Application deadline\n"
        "- Eligible countries\n"
        "- Sector/field\n"
        "- Eligibility criteria\n"
        "Return in JSON format."
    )

    def _run_search():
        # Build a fresh graph per attempt and validate that it produced grants.
        # (Previously this whole body was duplicated verbatim in the retry path.)
        search_graph = SearchGraph(
            prompt=full_prompt,
            config=graph_config,
        )
        result = search_graph.run()
        if not result or not result.get("grants"):
            st.error(f"No results returned for {search_term}. Please try again with a different search term.")
            return {}
        return result

    try:
        return _run_search()
    except Exception as e:
        if "202" in str(e):
            # Treat "202" in the error text as a rate-limit signal: back off
            # once, then retry a single time.
            st.warning("Rate limit reached (202). Waiting 10 seconds before retrying...")
            time.sleep(10)
            try:
                return _run_search()
            except Exception as e2:
                st.error(f"Retry failed: {e2}. Please try again later.")
                return {}
        else:
            st.error(f"An error occurred for search term: {search_term}, error: {e}. Please try again.")
            return {}
# NOTE(review): duplicate of the SUPADATA_API_KEY assignment near the top of
# the file — redundant but harmless; safe to remove.
SUPADATA_API_KEY = os.getenv("SUPADATA")
def get_data_from_url(url, scraping_tool="supadata"):
    """
    Scrape the provided URL using the selected scraping tool, then extract
    structured grant data from the page content with Gemini.

    Args:
        url: The URL to scrape.
        scraping_tool: Either "supadata", "crawl4ai", or "playwright".
            Crawl4AI and Playwright fall back to Supadata on failure; Supadata
            itself falls back to its REST API and finally a plain HTTP GET.

    Returns:
        dict: {"grants": [...]} with the extracted grant data, or {} on failure.
    """
    page_content = None  # Scraped page text; set by whichever method succeeds

    if scraping_tool == "crawl4ai":
        try:
            # Use Crawl4AI for scraping
            async def run_crawler():
                async with AsyncWebCrawler() as crawler:
                    result = await crawler.arun(url=url)
                    return result.markdown

            # Run the async crawler in a synchronous context; close the loop
            # even if the crawl raises, so we don't leak event loops.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                page_content = loop.run_until_complete(run_crawler())
            finally:
                loop.close()
            st.success("Successfully scraped using Crawl4AI")
        except Exception as e:
            st.error(f"Error using Crawl4AI: {e}")
            # Fall back to Supadata if Crawl4AI fails
            st.warning("Falling back to Supadata scraper...")
            scraping_tool = "supadata"

    if scraping_tool == "playwright":
        try:
            loader = PlaywrightURLLoader(urls=[url], remove_selectors=["header", "footer"])
            # Use the synchronous load(): aload() is a coroutine, and calling
            # it without awaiting returned a coroutine object, so data[0]
            # always raised and this branch could never succeed.
            data = loader.load()
            page_content = data[0].page_content if data else ""
            st.success("Successfully scraped using Playwright")
        except Exception as e:
            st.error(f"Error using Playwright: {e}")
            # Fall back to Supadata if Playwright fails
            st.warning("Falling back to Supadata scraper...")
            scraping_tool = "supadata"

    if scraping_tool == "supadata":
        # Step 1: Attempt Supadata's built-in scraper.
        try:
            web_content = supadata.web.scrape(url)
            page_content = web_content.content
            st.success("Successfully scraped using Supadata built-in scraper")
        except TypeError as te:
            if "unexpected keyword argument 'type'" in str(te):
                st.warning("Falling back to Supadata API due to unexpected keyword 'type' error.")
            else:
                st.error(f"Unexpected error in Supadata scrape: {te}")
        except Exception as e:
            # Previously only TypeError was caught, so e.g. a SupadataError or
            # network failure crashed the app instead of triggering the
            # fallbacks below. Report it and continue to Step 2.
            st.error(f"Supadata built-in scraper failed: {e}")

        # Step 2: If the built-in scraper failed, call the Supadata REST API.
        if not page_content:
            try:
                api_url = "https://api.supadata.ai/v1/web/scrape"
                headers = {"X-API-Key": SUPADATA_API_KEY}
                response = requests.get(api_url, headers=headers, params={"url": url})
                if response.status_code == 200:
                    page_content = response.json().get("content", "")
                    st.success("Successfully scraped using Supadata API")
                else:
                    st.error(f"Supadata API failed with status {response.status_code}")
            except Exception as e:
                st.error(f"Error calling Supadata API: {e}")

        # Step 3: If the API also failed, fall back to a direct web request.
        if not page_content:
            try:
                r = requests.get(url, timeout=10)
                if r.status_code == 200:
                    page_content = r.text
                    st.success("Successfully retrieved content with direct request")
                else:
                    st.error(f"Manual scraping failed with status code {r.status_code}")
                    return {}
            except Exception as e:
                st.error(f"Manual scraping error: {e}")
                return {}

    # If we still don't have content after all attempts
    if not page_content:
        st.error("Failed to retrieve content from the URL with all available methods")
        return {}

    # Pass the raw page text to Gemini for structured extraction.
    full_prompt = (
        "Extract the following grant data from the provided web content. "
        "- Grant name/title\n"
        "- Short summary\n"
        "- Funding organization\n"
        "- Grant value (numeric only)\n"
        "- Application deadline\n"
        "- Eligible countries\n"
        "- Sector/field\n"
        "- Eligibility criteria\n"
        "Return in JSON format.\n\n"
        f"Web content: {page_content}"
    )
    client = genai.Client(api_key=GOOGLE_API_KEY)
    new_answer = client.models.generate_content(
        model="models/gemini-2.0-flash-lite",
        contents=f"{full_prompt}, return the json string and nothing else"
    )
    response = new_answer.text

    # Extract the JSON payload from the model reply. The model may return a
    # bare list or a JSON object; try the [...] form first, then fall back to
    # {...} (previously an object reply produced a bogus slice via find == -1).
    try:
        start_index = response.find('[')
        end_index = response.rfind(']') + 1
        if start_index == -1 or end_index == 0:
            start_index = response.find('{')
            end_index = response.rfind('}') + 1
        json_string = response[start_index:end_index]
        result = json.loads(json_string)
    except Exception as parse_error:
        # Surface the actual parse error (it was captured but unused before).
        st.error(f"Error parsing JSON from Gemini model response: {parse_error}. Response: {response}")
        return {}

    # Normalize: wrap a bare list into the {"grants": [...]} shape.
    if isinstance(result, list):
        result = {"grants": result}
    if not result.get("grants"):
        st.error("No grant opportunities found in the scraped URL.")
        return {}
    st.success(f"First grant opportunity: {result['grants'][0]}")
    return result
def process_multiple_search_terms(search_terms):
    """
    Run get_data() for every search term while showing progress in the UI.

    Blank terms are skipped. Returns a dictionary with a 'grants' key holding
    the combined results from all terms; shows an error if nothing was found.
    """
    combined = {"grants": []}
    progress_bar = st.progress(0)
    status_container = st.empty()
    total_terms = len(search_terms)

    for index, raw_term in enumerate(search_terms):
        term = raw_term.strip()
        if not term:
            continue

        # Update the progress bar and the status banner for this term.
        progress_bar.progress((index + 1) / total_terms)
        status_container.markdown(
            f"""
            **Processing Grant Opportunities** π
            Searching term {index+1} of {total_terms}: `{term}`
            <br>
            <p style='font-size: 0.9em; color: #6699CC;'>Completed: {index}/{total_terms} | Remaining: {total_terms - index - 1}</p>
            """,
            unsafe_allow_html=True,
        )

        term_result = get_data(term)
        grants = term_result.get("grants") if term_result else None
        if grants:
            combined["grants"].extend(grants)

    # Clear the transient progress widgets once all terms are processed.
    progress_bar.empty()
    status_container.empty()

    if not combined["grants"]:
        st.error("No grant opportunities were found. Please try again with different search terms.")
    return combined
def convert_to_csv(data):
    """Serialize the grants list to UTF-8 encoded CSV bytes (no index column)."""
    return pd.DataFrame(data["grants"]).to_csv(index=False).encode("utf-8")
def convert_to_excel(data):
    """Serialize the grants list to an .xlsx workbook and return its raw bytes."""
    frame = pd.DataFrame(data["grants"])
    buffer = io.BytesIO()
    # The context manager finalizes the workbook before we read the buffer.
    with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
        frame.to_excel(writer, sheet_name="Grants", index=False)
    return buffer.getvalue()
def create_knowledge_base(data):
    """Cache a pretty-printed JSON snapshot of the grant data in session state."""
    # Streamlit session_state supports both item and attribute access.
    st.session_state["knowledge_base_json"] = json.dumps(data, indent=2)
def chat_with_knowledge_base(query):
    """
    Answer a user question against the cached grant JSON using Gemini.

    Args:
        query: The user's natural-language question.

    Returns:
        str: The model's answer text, or a notice string if no knowledge
        base has been loaded into session state yet.
    """
    if "knowledge_base_json" not in st.session_state:
        return "Knowledge base not initialized. Please load grant data first."
    context = st.session_state.knowledge_base_json
    # NOTE(review): the entire JSON knowledge base is inlined into the prompt;
    # very large result sets may exceed the model's context window — confirm.
    prompt = f"""
    You are an AI assistant that helps users analyze grant opportunities.
    Here is the extracted grant data in JSON format:
    {context}
    User's question: {query}
    Answer the question based on the provided grant data.
    """
    # temperature=0 for deterministic, extraction-style answers.
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash-thinking-exp", google_api_key=GOOGLE_API_KEY, temperature=0
    )
    response = llm.invoke(prompt)
    return response.content
def get_shareable_link(file_data, file_name, file_type):
    """
    Build a base64 data URI for *file_data* with MIME type *file_type*.

    Note: *file_name* is accepted for interface compatibility but is not part
    of the returned URI (data URIs carry no filename).
    """
    encoded = base64.b64encode(file_data).decode()
    return f"data:{file_type};base64,{encoded}"
def main():
    """Streamlit entry point.

    Renders the sidebar (input mode, scraping tool, download/share controls,
    knowledge-base loader), runs grant searches or URL scrapes, previews the
    results, and exposes a chat interface over the scraped data.
    """
    st.set_page_config(page_title="Quantilytix Grant Finder", page_icon="π°", layout="wide")
    st.title("π° Quantilytix Grant Finder")
    st.markdown("""
    <div style="text-align: justify;">
    <p>
    Welcome to <b>Quantilytix Grant Finder</b>, an AI-powered platform designed to streamline the grant discovery process, especially for academics and researchers across the globe.
    </p>
    </div>
    """, unsafe_allow_html=True)
    # Sidebar controls
    st.sidebar.image("logoqb.jpeg", use_container_width=True)
    st.sidebar.header("Scrape & Configure")
    # Initialize session state so values survive Streamlit reruns.
    if "scraped_data" not in st.session_state:
        st.session_state.scraped_data = None
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "chat_interface_active" not in st.session_state:
        st.session_state.chat_interface_active = False
    # Sidebar: Input Type Selection
    input_type = st.sidebar.radio(
        "Select Input Type:",
        ("Search Query", "URL"),
        key="input_type_selector"
    )
    # Sidebar: Input field based on selection. Only one of search_input /
    # url_input is bound per rerun; each is read only inside its own branch.
    if input_type == "Search Query":
        search_input = st.sidebar.text_area(
            "Enter Search Terms (one per line). Maximum 2",
            height=150,
            help="Input search terms to discover grant opportunities. Terms can be specific or generic.",
            placeholder="e.g.,\nRenewable energy \nclimate change research\nAgriculture in Africa"
        )
    else:
        url_input = st.sidebar.text_input(
            "Enter URL to scrape for grant opportunities",
            placeholder="https://example.com/grants"
        )
    # Scraping tool selector (passed to get_data_from_url for the URL path).
    scraping_tool = st.sidebar.radio(
        "Select Scraping Tool:",
        ("Supadata", "Crawl4AI", "Playwright"),
        key="scraping_tool_selector"
    )
    # Execute based on input type selection
    if input_type == "Search Query":
        if st.sidebar.button("π Get Grant Opportunities"):
            if search_input:
                # One term per line; drop blanks.
                search_terms = [term.strip() for term in search_input.split("\n") if term.strip()]
                if search_terms:
                    with st.spinner("Searching in progress... Please wait patiently."):
                        result = process_multiple_search_terms(search_terms)
                        st.session_state.scraped_data = result
                        if result.get("grants"):
                            st.sidebar.success(f"β Found {len(result['grants'])} grant opportunities from {len(search_terms)} search terms!")
                else:
                    st.sidebar.warning("β οΈ Please enter valid search terms.")
            else:
                st.sidebar.warning("β οΈ Please enter at least one search term to begin.")
    else:  # URL input
        if st.sidebar.button("π Scrape URL for Grant Opportunities"):
            if url_input:
                with st.spinner(f"Scraping URL using {scraping_tool}... Please wait patiently."):
                    result = get_data_from_url(url_input, scraping_tool.lower())
                    st.session_state.scraped_data = result
                    if result.get("grants"):
                        st.sidebar.success(f"β Found {len(result['grants'])} grant opportunities from the URL!")
            else:
                st.sidebar.warning("β οΈ Please enter a valid URL to scrape.")
    # Sidebar: Download & Share Controls (only shown when grants were found).
    if st.session_state.scraped_data and st.session_state.scraped_data.get('grants'):
        st.sidebar.markdown("---")
        st.sidebar.subheader("Download & Share")
        selected_format = st.sidebar.selectbox("Download As:", ("CSV", "Excel"), key="download_format_selector")
        if selected_format == "CSV":
            file_data = convert_to_csv(st.session_state.scraped_data)
            file_name = "grants_data.csv"
            file_type = "text/csv"
        else:
            file_data = convert_to_excel(st.session_state.scraped_data)
            file_name = "grants_data.xlsx"
            file_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        # Inline download button rendered as a base64 data-URI anchor.
        download_link_html = f"<a href='data:{file_type};base64,{base64.b64encode(file_data).decode()}' download='{file_name}'><button style='background-color:#4CAF50;color:white;padding:10px 15px;border:none;border-radius:4px;'>β¬οΈ Download {selected_format}</button></a>"
        st.sidebar.markdown(download_link_html, unsafe_allow_html=True)
        # NOTE(review): the "shareable link" is itself a data: URI, so the
        # WhatsApp/email message carries the raw base64 payload, not a hosted
        # URL — confirm this is the intended sharing behavior.
        shareable_link = get_shareable_link(file_data, file_name, file_type)
        whatsapp_url = f"https://api.whatsapp.com/send?text={urllib.parse.quote(f'Check out these grant opportunities: {shareable_link}')}"
        email_subject = urllib.parse.quote("Grant Opportunities File")
        email_body = urllib.parse.quote(f"Download the grant opportunities file here: {shareable_link}")
        email_url = f"mailto:?subject={email_subject}&body={email_body}"
        st.sidebar.markdown("<div style='margin-top:10px;'>Share via:</div>", unsafe_allow_html=True)
        st.sidebar.markdown(f"π± [WhatsApp]({whatsapp_url}) | π§ [Email]({email_url})", unsafe_allow_html=True)
        # Sidebar: Load as Knowledge Base & Chat
        if st.sidebar.button("π§ Load as Knowledge Base & Chat"):
            with st.spinner("Loading data into knowledge base..."):
                create_knowledge_base(st.session_state.scraped_data)
                st.session_state.chat_interface_active = True
                st.session_state.chat_history = []
            st.sidebar.success("Knowledge base loaded!")
    # Main area: Data Preview
    st.markdown("---")
    if st.session_state.scraped_data and st.session_state.scraped_data.get('grants'):
        st.header("π Found Grant Data")
        with st.expander(f"π Preview Grant Data ({len(st.session_state.scraped_data['grants'])} grants)"):
            st.dataframe(st.session_state.scraped_data["grants"])
        # Main area: Chat UI (shown if knowledge base is loaded)
        if st.session_state.get("chat_interface_active"):
            st.header("π¬ Chat with Grants Bot")
            query = st.text_input("Your question:", key="chat_input_main")
            if query:
                with st.spinner("Generating response..."):
                    response = chat_with_knowledge_base(query)
                    # chat_with_knowledge_base returns a string today; the dict
                    # check keeps compatibility with chain-style responses.
                    answer = response["answer"] if isinstance(response, dict) and "answer" in response else response
                    st.session_state.chat_history.append({"query": query, "response": answer})
            if st.session_state.chat_history:
                st.subheader("Chat History")
                # Newest exchange first.
                for chat in reversed(st.session_state.chat_history):
                    st.markdown(
                        f"<div style='padding: 10px; border-radius: 5px; margin-bottom: 5px; background-color:#444444; color: white;'><strong>You:</strong> {chat['query']}</div>",
                        unsafe_allow_html=True)
                    st.markdown(
                        f"<div style='padding: 10px; border-radius: 5px; margin-bottom: 10px; background-color:#007BFF; color: white;'><strong>Grants Bot:</strong> {chat['response']}</div>",
                        unsafe_allow_html=True)
    else:
        st.info("β¬ οΈ Enter search terms or a URL in the sidebar and click the appropriate button to start searching.")
    st.sidebar.markdown("---")
    st.sidebar.markdown(
        """
        <div style='text-align: center; font-size: 0.8em; color: grey;'>
        Powered by <a href="https://quantilytix.com" style='color: grey;'>Quantilytix</a> | © 2025
        </div>
        """,
        unsafe_allow_html=True,
    )
# Standard script entry point: launch the Streamlit app.
if __name__ == "__main__":
    main()