import streamlit as st
import pandas as pd
import base64
import json
from scrapegraphai.graphs import SearchGraph
import nest_asyncio
import os
import subprocess
import io
import time
import urllib.parse
import asyncio
from langchain_google_genai import ChatGoogleGenerativeAI, GoogleGenerativeAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.text_splitter import CharacterTextSplitter
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from google import genai
from google.genai import types
from langchain_community.document_loaders import PlaywrightURLLoader
import requests

# Import Supadata and initialize the client
from supadata import Supadata, SupadataError

# Import Crawl4AI
from crawl4ai import AsyncWebCrawler

SUPADATA_API_KEY = os.getenv("SUPADATA")
supadata = Supadata(api_key=SUPADATA_API_KEY)

# Ensure Playwright installs required browsers and dependencies.
# NOTE(review): runs on every import/app start; consider guarding with a
# marker file or env flag in production to avoid repeated installs.
subprocess.run(["playwright", "install"])

# Allow nested event loops (Streamlit already runs inside one).
nest_asyncio.apply()

GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]

# Configuration shared by every SearchGraph run.
graph_config = {
    "llm": {
        "api_key": GOOGLE_API_KEY,
        "model": "google_genai/gemini-2.0-flash-lite",
    },
    "max_results": 8,
    "verbose": True,
    "headless": True,
}


def get_data(search_term):
    """
    Run the SearchGraph for a given search term.

    If a rate-limit error (202) occurs, wait 10 seconds and retry once.
    If no results are returned or an error persists, notify the user via
    the Streamlit UI and return an empty dict.

    Args:
        search_term: Free-text topic to search grants for.

    Returns:
        Dict with a "grants" key on success, otherwise {}.
    """
    full_prompt = (
        f"search for {search_term} grants\n\n"
        "List me all grants or funds with:\n"
        "- Grant name/title\n"
        "- Short summary \n"
        "- Funding organization\n"
        "- Grant value (numeric only)\n"
        "- Application deadline\n"
        "- Eligible countries\n"
        "- Sector/field\n"
        "- Eligibility criteria\n"
        "Return in JSON format."
    )

    def _run_search():
        # Single SearchGraph execution; shared by the first attempt and the retry.
        search_graph = SearchGraph(
            prompt=full_prompt,
            config=graph_config,
        )
        result = search_graph.run()
        if not result or not result.get("grants"):
            st.error(
                f"No results returned for {search_term}. "
                "Please try again with a different search term."
            )
            return {}
        return result

    try:
        return _run_search()
    except Exception as e:
        err_str = str(e)
        if "202" in err_str:
            # Provider signalled rate limiting; back off once and retry.
            st.warning("Rate limit reached (202). Waiting 10 seconds before retrying...")
            time.sleep(10)
            try:
                return _run_search()
            except Exception as e2:
                st.error(f"Retry failed: {e2}. Please try again later.")
                return {}
        else:
            st.error(
                f"An error occurred for search term: {search_term}, error: {e}. "
                "Please try again."
            )
            return {}


def get_data_from_url(url, scraping_tool="supadata"):
    """
    Scrape the provided URL using the selected scraping tool, then extract
    structured grant data from the page content with Gemini.

    Args:
        url: The URL to scrape.
        scraping_tool: Either "supadata", "crawl4ai", or "playwright".
            Crawl4AI and Playwright fall back to Supadata on failure.

    Returns:
        Dictionary containing the extracted grant data ({"grants": [...]}),
        or {} on failure.
    """
    page_content = None  # Holds the scraped page text, whichever tool succeeds.

    # Choose the scraping method based on the selected tool
    if scraping_tool == "crawl4ai":
        try:
            # Use Crawl4AI for scraping
            async def run_crawler():
                async with AsyncWebCrawler() as crawler:
                    result = await crawler.arun(url=url)
                    return result.markdown

            # Run the async crawler in a synchronous context
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            page_content = loop.run_until_complete(run_crawler())
            loop.close()
            st.success("Successfully scraped using Crawl4AI")
        except Exception as e:
            st.error(f"Error using Crawl4AI: {e}")
            # Fall back to Supadata if Crawl4AI fails
            st.warning("Falling back to Supadata scraper...")
            scraping_tool = "supadata"

    if scraping_tool == "playwright":
        try:
            loader = PlaywrightURLLoader(urls=[url], remove_selectors=["header", "footer"])
            # FIX: the original called loader.aload(), which returns an
            # un-awaited coroutine in a sync context; use the sync load().
            data = loader.load()
            page_content = data[0].page_content if data else ""
            st.success("Successfully scraped using Playwright")
        except Exception as e:
            st.error(f"Error using Playwright: {e}")
            # Fall back to Supadata if Playwright fails
            st.warning("Falling back to Supadata scraper...")
            scraping_tool = "supadata"

    if scraping_tool == "supadata":
        # **Step 1: Attempt Supadata's Built-in Scraper**
        try:
            web_content = supadata.web.scrape(url)
            page_content = web_content.content
            st.success("Successfully scraped using Supadata built-in scraper")
        except TypeError as te:
            if "unexpected keyword argument 'type'" in str(te):
                st.warning("Falling back to Supadata API due to unexpected keyword 'type' error.")
            else:
                st.error(f"Unexpected error in Supadata scrape: {te}")
        except SupadataError as se:
            # FIX: previously only TypeError was caught, so SDK errors
            # crashed the app instead of falling through to the API path.
            st.warning(f"Supadata built-in scraper failed ({se}); falling back to Supadata API.")
        except Exception as e:
            st.error(f"Unexpected error in Supadata scrape: {e}")

        # **Step 2: If Supadata's Built-in Scraper Fails, Use Supadata API**
        if not page_content:
            try:
                api_url = "https://api.supadata.ai/v1/web/scrape"
                headers = {"X-API-Key": SUPADATA_API_KEY}
                response = requests.get(api_url, headers=headers, params={"url": url})
                if response.status_code == 200:
                    page_content = response.json().get("content", "")
                    st.success("Successfully scraped using Supadata API")
                else:
                    st.error(f"Supadata API failed with status {response.status_code}")
            except Exception as e:
                st.error(f"Error calling Supadata API: {e}")

        # **Step 3: If Supadata API Fails, Use Direct Web Request**
        if not page_content:
            try:
                r = requests.get(url, timeout=10)
                if r.status_code == 200:
                    page_content = r.text
                    st.success("Successfully retrieved content with direct request")
                else:
                    st.error(f"Manual scraping failed with status code {r.status_code}")
                    return {}
            except Exception as e:
                st.error(f"Manual scraping error: {e}")
                return {}

    # If we still don't have content after all attempts
    if not page_content:
        st.error("Failed to retrieve content from the URL with all available methods")
        return {}

    # **Pass Content to Gemini AI**
    full_prompt = (
        "Extract the following grant data from the provided web content.\n"
        "- Grant name/title\n"
        "- Short summary\n"
        "- Funding organization\n"
        "- Grant value (numeric only)\n"
        "- Application deadline\n"
        "- Eligible countries\n"
        "- Sector/field\n"
        "- Eligibility criteria\n"
        "Return in JSON format.\n\n"
        f"Web content: {page_content}"
    )

    client = genai.Client(api_key=GOOGLE_API_KEY)
    new_answer = client.models.generate_content(
        model="models/gemini-2.0-flash-lite",
        contents=f"{full_prompt}, return the json string and nothing else"
    )
    response = new_answer.text

    # **Extract JSON Output from Gemini**
    try:
        start_index = response.find('[')
        end_index = response.rfind(']') + 1
        if start_index == -1:
            # FIX: the model may return a top-level JSON object instead of
            # an array; fall back to object delimiters in that case.
            start_index = response.find('{')
            end_index = response.rfind('}') + 1
        json_string = response[start_index:end_index]
        result = json.loads(json_string)
    except Exception as parse_error:
        # Include the exception so the failure mode is visible to the user.
        st.error(
            f"Error parsing JSON from Gemini model response ({parse_error}). "
            f"Response: {response}"
        )
        return {}

    # **Ensure JSON is Wrapped Correctly**
    if isinstance(result, list):
        result = {"grants": result}

    if not result.get("grants"):
        st.error("No grant opportunities found in the scraped URL.")
        return {}

    st.success(f"First grant opportunity: {result['grants'][0]}")
    return result


def process_multiple_search_terms(search_terms):
    """
    Process multiple search terms with progress tracking.

    Returns a dictionary with a 'grants' key containing combined results.
    """
    all_data = {"grants": []}
    progress_bar = st.progress(0)
    status_container = st.empty()
    total_terms = len(search_terms)

    for index, term in enumerate(search_terms):
        term = term.strip()
        if not term:
            continue
        progress = (index + 1) / total_terms
        progress_bar.progress(progress)
        status_container.markdown(
            f"""
**Processing Grant Opportunities** 🚀 Searching term {index+1} of {total_terms}: `{term}`

Completed: {index}/{total_terms} | Remaining: {total_terms - index - 1}

""",
            unsafe_allow_html=True,
        )
        result = get_data(term)
        if result and result.get("grants"):
            all_data["grants"].extend(result["grants"])

    progress_bar.empty()
    status_container.empty()

    if not all_data["grants"]:
        st.error("No grant opportunities were found. Please try again with different search terms.")
    return all_data


def convert_to_csv(data):
    """Serialize the grants list to UTF-8 encoded CSV bytes."""
    df = pd.DataFrame(data["grants"])
    return df.to_csv(index=False).encode("utf-8")


def convert_to_excel(data):
    """Serialize the grants list to an in-memory XLSX workbook (bytes)."""
    df = pd.DataFrame(data["grants"])
    buffer = io.BytesIO()
    with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
        df.to_excel(writer, sheet_name="Grants", index=False)
    return buffer.getvalue()


def create_knowledge_base(data):
    """Store a JSON representation of the grant data in session state."""
    st.session_state.knowledge_base_json = json.dumps(data, indent=2)


def chat_with_knowledge_base(query):
    """
    Answer a user question against the loaded grant data using Gemini.

    Returns a plain string answer, or an error string if the knowledge base
    has not been initialized yet.
    """
    if "knowledge_base_json" not in st.session_state:
        return "Knowledge base not initialized. Please load grant data first."

    context = st.session_state.knowledge_base_json
    prompt = f"""
You are an AI assistant that helps users analyze grant opportunities.
Here is the extracted grant data in JSON format:
{context}

User's question: {query}

Answer the question based on the provided grant data.
"""
    llm = ChatGoogleGenerativeAI(
        model="gemini-2.0-flash-thinking-exp",
        google_api_key=GOOGLE_API_KEY,
        temperature=0
    )
    response = llm.invoke(prompt)
    return response.content


def get_shareable_link(file_data, file_name, file_type):
    """
    Build a base64 data: URI for the given file bytes.

    NOTE(review): data URIs embed the whole file, so links for large
    exports may exceed WhatsApp/mailto URL limits — verify with real data.
    """
    b64 = base64.b64encode(file_data).decode()
    return f"data:{file_type};base64,{b64}"


def main():
    """Streamlit entry point: sidebar controls, search/scrape, preview, chat."""
    st.set_page_config(page_title="Quantilytix Grant Finder", page_icon="💰", layout="wide")
    st.title("💰 Quantilytix Grant Finder")
    st.markdown(
        """
Welcome to Quantilytix Grant Finder, an AI-powered platform designed to streamline the grant discovery process, especially for academics and researchers across the globe.
""",
        unsafe_allow_html=True,
    )

    # Sidebar controls
    st.sidebar.image("logoqb.jpeg", use_container_width=True)
    st.sidebar.header("Scrape & Configure")

    if "scraped_data" not in st.session_state:
        st.session_state.scraped_data = None
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "chat_interface_active" not in st.session_state:
        st.session_state.chat_interface_active = False

    # Sidebar: Input Type Selection
    input_type = st.sidebar.radio(
        "Select Input Type:",
        ("Search Query", "URL"),
        key="input_type_selector"
    )

    # Sidebar: Input field based on selection
    if input_type == "Search Query":
        search_input = st.sidebar.text_area(
            "Enter Search Terms (one per line). Maximum 2",
            height=150,
            help="Input search terms to discover grant opportunities. Terms can be specific or generic.",
            placeholder="e.g.,\nRenewable energy \nclimate change research\nAgriculture in Africa"
        )
    else:
        url_input = st.sidebar.text_input(
            "Enter URL to scrape for grant opportunities",
            placeholder="https://example.com/grants"
        )

    # Scraping tool selector
    scraping_tool = st.sidebar.radio(
        "Select Scraping Tool:",
        ("Supadata", "Crawl4AI", "Playwright"),
        key="scraping_tool_selector"
    )

    # Execute based on input type selection
    if input_type == "Search Query":
        if st.sidebar.button("🔍 Get Grant Opportunities"):
            if search_input:
                search_terms = [term.strip() for term in search_input.split("\n") if term.strip()]
                if search_terms:
                    with st.spinner("Searching in progress... Please wait patiently."):
                        result = process_multiple_search_terms(search_terms)
                        st.session_state.scraped_data = result
                        if result.get("grants"):
                            st.sidebar.success(
                                f"✅ Found {len(result['grants'])} grant opportunities "
                                f"from {len(search_terms)} search terms!"
                            )
                else:
                    st.sidebar.warning("⚠️ Please enter valid search terms.")
            else:
                st.sidebar.warning("⚠️ Please enter at least one search term to begin.")
    else:  # URL input
        if st.sidebar.button("🔍 Scrape URL for Grant Opportunities"):
            if url_input:
                with st.spinner(f"Scraping URL using {scraping_tool}... Please wait patiently."):
                    result = get_data_from_url(url_input, scraping_tool.lower())
                    st.session_state.scraped_data = result
                    if result.get("grants"):
                        st.sidebar.success(
                            f"✅ Found {len(result['grants'])} grant opportunities from the URL!"
                        )
            else:
                st.sidebar.warning("⚠️ Please enter a valid URL to scrape.")

    # Sidebar: Download & Share Controls
    if st.session_state.scraped_data and st.session_state.scraped_data.get('grants'):
        st.sidebar.markdown("---")
        st.sidebar.subheader("Download & Share")
        selected_format = st.sidebar.selectbox("Download As:", ("CSV", "Excel"), key="download_format_selector")
        if selected_format == "CSV":
            file_data = convert_to_csv(st.session_state.scraped_data)
            file_name = "grants_data.csv"
            file_type = "text/csv"
        else:
            file_data = convert_to_excel(st.session_state.scraped_data)
            file_name = "grants_data.xlsx"
            file_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

        # TODO(review): the anchor HTML for the download link appears to have
        # been lost from the original source — this renders nothing as-is.
        download_link_html = f""
        st.sidebar.markdown(download_link_html, unsafe_allow_html=True)

        shareable_link = get_shareable_link(file_data, file_name, file_type)
        whatsapp_url = (
            "https://api.whatsapp.com/send?text="
            f"{urllib.parse.quote(f'Check out these grant opportunities: {shareable_link}')}"
        )
        email_subject = urllib.parse.quote("Grant Opportunities File")
        email_body = urllib.parse.quote(f"Download the grant opportunities file here: {shareable_link}")
        email_url = f"mailto:?subject={email_subject}&body={email_body}"

        st.sidebar.markdown(
            """
Share via:
""",
            unsafe_allow_html=True,
        )
        st.sidebar.markdown(
            f"📱 [WhatsApp]({whatsapp_url}) | 📧 [Email]({email_url})",
            unsafe_allow_html=True,
        )

        # Sidebar: Load as Knowledge Base & Chat
        if st.sidebar.button("🧠 Load as Knowledge Base & Chat"):
            with st.spinner("Loading data into knowledge base..."):
                create_knowledge_base(st.session_state.scraped_data)
                st.session_state.chat_interface_active = True
                st.session_state.chat_history = []
            st.sidebar.success("Knowledge base loaded!")

    # Main area: Data Preview
    st.markdown("---")
    if st.session_state.scraped_data and st.session_state.scraped_data.get('grants'):
        st.header("📊 Found Grant Data")
        with st.expander(f"📊 Preview Grant Data ({len(st.session_state.scraped_data['grants'])} grants)"):
            st.dataframe(st.session_state.scraped_data["grants"])

        # Main area: Chat UI (shown if knowledge base is loaded)
        if st.session_state.get("chat_interface_active"):
            st.header("💬 Chat with Grants Bot")
            query = st.text_input("Your question:", key="chat_input_main")
            if query:
                with st.spinner("Generating response..."):
                    response = chat_with_knowledge_base(query)
                    # chat_with_knowledge_base returns a string today; keep the
                    # dict handling for backward compatibility with chain APIs.
                    answer = response["answer"] if isinstance(response, dict) and "answer" in response else response
                    st.session_state.chat_history.append({"query": query, "response": answer})
            if st.session_state.chat_history:
                st.subheader("Chat History")
                for chat in reversed(st.session_state.chat_history):
                    st.markdown(
                        f"""
You: {chat['query']}
""",
                        unsafe_allow_html=True,
                    )
                    st.markdown(
                        f"""
Grants Bot: {chat['response']}
""",
                        unsafe_allow_html=True,
                    )
    else:
        st.info("⬅️ Enter search terms or a URL in the sidebar and click the appropriate button to start searching.")

    st.sidebar.markdown("---")
    st.sidebar.markdown(
        """
Powered by Quantilytix | © 2025
""",
        unsafe_allow_html=True,
    )


if __name__ == "__main__":
    main()