Spaces:
Running
Running
| """ | |
| HTTP client for PARP (Polska Agencja Rozwoju Przedsiębiorczości, the Polish Agency for Enterprise Development). | |
| Fetches the currently open grant calls ("nabory") and their metadata. | |
| Data sources: | |
| - https://www.parp.gov.pl/component/grants/ (scraping as a fallback) | |
| - Official PARP API (if available in the environment) | |
| Cache: local JSON file (cache/parp_nabory.json) with a TTL of PARP_CACHE_TTL_HOURS hours (currently 4). | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import hashlib | |
| from datetime import datetime, timedelta, timezone | |
| from typing import Optional | |
| from pathlib import Path | |
| import httpx | |
logger = logging.getLogger(__name__)

# On-disk cache: a plain JSON file, so no Redis (or other service) is needed.
# The directory is created at import time if it does not exist yet.
CACHE_DIR = Path(__file__).parent.parent.joinpath("cache")
CACHE_DIR.mkdir(exist_ok=True)
PARP_CACHE_FILE = CACHE_DIR.joinpath("parp_nabory.json")
# Cached results older than this many hours are treated as expired.
# NOTE(review): the surrounding docs mention 24h, but the effective value is 4.
PARP_CACHE_TTL_HOURS = 4

# Known URLs used for scraping (update them when PARP changes its site layout).
PARP_BASE_URL = "https://www.parp.gov.pl"
PARP_GRANTS_URL = PARP_BASE_URL + "/component/grants/?task=grants.grant_list&type=0"
class PARPClient:
    """Client that fetches the currently open PARP grant calls ("nabory").

    Results are cached in the JSON file ``PARP_CACHE_FILE`` and served from
    there until they are older than ``PARP_CACHE_TTL_HOURS`` hours; after that
    (or when ``force_refresh=True``) the list is fetched live again.
    """

    # Keywords searched for in the project description by match_for_project().
    _MATCH_KEYWORDS = (
        "innowacja",
        "b+r",
        "cyfryzacja",
        "automatyzacja",
        "export",
        "startup",
        "ekologia",
        "zazielenienie",
        "ai",
        "maszyna",
    )

    def _load_cache(self) -> Optional[dict]:
        """Return the cached payload if it exists and is still fresh.

        Returns:
            The decoded cache dict (keys ``fetched_at`` and ``nabory``) when
            the file is readable and younger than the TTL, otherwise ``None``.
            Read/parse errors are logged and reported as a cache miss.
        """
        if not PARP_CACHE_FILE.exists():
            return None
        try:
            with open(PARP_CACHE_FILE, "r", encoding="utf-8") as f:
                data = json.load(f)
            fetched_at = datetime.fromisoformat(data.get("fetched_at", "2000-01-01"))
            # Timestamps without an offset are interpreted as UTC.
            if fetched_at.tzinfo is None:
                fetched_at = fetched_at.replace(tzinfo=timezone.utc)
            age = datetime.now(timezone.utc) - fetched_at
            if age < timedelta(hours=PARP_CACHE_TTL_HOURS):
                logger.info(
                    f"PARP cache hit — {len(data.get('nabory', []))} naborów z cache."
                )
                return data
            logger.info("PARP cache wygasł — ponowne pobieranie.")
        except Exception as e:  # best-effort cache: any failure is a miss
            logger.warning(f"Błąd odczytu PARP cache: {e}")
        return None

    def _save_cache(self, nabory: list) -> None:
        """Persist ``nabory`` together with the current UTC timestamp.

        The payload is written to a temporary sibling file and then moved over
        the cache file with ``os.replace`` so a concurrent reader never sees a
        half-written JSON document. Failures are logged, never raised.
        """
        try:
            payload = {
                "fetched_at": datetime.now(timezone.utc).isoformat(),
                "nabory": nabory,
            }
            tmp_path = PARP_CACHE_FILE.with_name(PARP_CACHE_FILE.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(payload, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, PARP_CACHE_FILE)
            logger.info(f"Zapisano {len(nabory)} naborów PARP do cache.")
        except Exception as e:  # best-effort cache: never break the caller
            logger.warning(f"Błąd zapisu PARP cache: {e}")

    async def _fetch_via_firecrawl(self, api_key: str) -> list:
        """Scrape the PARP grants page through the Firecrawl API.

        Uses the async httpx client so the event loop is not blocked while
        waiting for Firecrawl. Returns the parsed grants, or an empty list on
        any error or empty response.
        """
        logger.info("Używam Firecrawl do ominięcia zabezpieczeń PARP...")
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                resp = await client.post(
                    "https://api.firecrawl.dev/v1/scrape",
                    headers={"Authorization": f"Bearer {api_key}"},
                    json={"url": PARP_GRANTS_URL, "formats": ["markdown"]},
                )
            if resp.status_code != 200:
                logger.warning(f"Błąd Firecrawl API (PARP): {resp.status_code} - {resp.text}")
                return []
            md = resp.json().get("data", {}).get("markdown", "")
            if not md:
                return []
            grants = await self._parse_firecrawl_markdown(md)
            logger.info(f"Firecrawl zwrócił {len(grants)} naborów z PARP.")
            return grants
        except Exception as e:
            logger.error(f"Wyjątek podczas wywołania Firecrawl API (PARP): {e}")
            return []

    async def _fetch_via_httpx(self) -> list:
        """Download the PARP grants page directly and parse its HTML.

        Returns the parsed grants, or an empty list on any error or non-200
        response.
        """
        logger.info("Próba pobrania przez HTTPX / BeautifulSoup...")
        try:
            # NOTE(review): verify=False disables TLS certificate validation;
            # kept to preserve existing behaviour — confirm it is still needed.
            async with httpx.AsyncClient(timeout=15.0, follow_redirects=True, verify=False) as client:
                headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"}
                response = await client.get(PARP_GRANTS_URL, headers=headers)
            if response.status_code == 200:
                grants = self._parse_html(response.text)
                logger.info(f"HTTPX zwrócił {len(grants)} naborów ze strony PARP.")
                return grants
            logger.warning(f"Serwer PARP odrzucił połączenie: {response.status_code}")
        except Exception as e:
            logger.error(f"Błąd HTTPX (PARP): {e}")
        return []

    async def _fetch_live(self) -> list:
        """Fetch the current grant calls from PARP, bypassing the cache.

        Firecrawl is tried first when ``FIRECRAWL_API_KEY`` is set; if it is
        missing or yields nothing, the page is scraped directly with httpx.
        The combined result is passed through ``filter_outdated_grants``
        (project helper; presumably drops entries whose deadline has passed).
        """
        from core.date_utils import filter_outdated_grants

        logger.info("Rozpoczynam pobieranie na żywo naborów PARP...")
        api_key = os.getenv("FIRECRAWL_API_KEY")
        all_grants: list = []
        if api_key:
            all_grants = await self._fetch_via_firecrawl(api_key)
        else:
            logger.warning("Brak klucza FIRECRAWL_API_KEY. Zostanie użyty parser HTTPX (może zostać zablokowany).")
        if not all_grants:
            all_grants = await self._fetch_via_httpx()
        # Drop historical entries before handing the list back.
        return filter_outdated_grants(all_grants)

    def _parse_html(self, html: str) -> list:
        """Extract up to 20 grant entries from raw PARP HTML (simplified parser).

        Each entry gets an id derived from the MD5 of its title. Relative and
        protocol-relative links are resolved against ``PARP_BASE_URL``.
        Returns an empty list when parsing fails.
        """
        try:
            from urllib.parse import urljoin

            from bs4 import BeautifulSoup

            soup = BeautifulSoup(html, "html.parser")
            nabory = []
            for item in soup.select(".grant-item, .grants-list__item")[:20]:
                title_el = item.select_one("h3, .grant-title, a")
                title = title_el.get_text(strip=True) if title_el else "Nieznany nabór"
                link_el = item.select_one("a[href]")
                if link_el:
                    # urljoin keeps absolute URLs untouched and also handles
                    # "//host/path" hrefs, which naive concatenation breaks.
                    url = urljoin(PARP_BASE_URL + "/", link_el["href"])
                else:
                    url = PARP_BASE_URL
                uid = hashlib.md5(title.encode()).hexdigest()[:12]
                nabory.append(
                    {
                        "id": uid,
                        "name": title,
                        "program": "PARP",
                        "status": "active",
                        "url": url,
                        "source": "parp_scrape",
                        "fetched_at": datetime.now(timezone.utc).isoformat(),
                    }
                )
            return nabory
        except Exception as e:
            logger.warning(f"HTML parse error: {e}")
            return []

    async def _parse_firecrawl_markdown(self, md: str) -> list:
        """Use an LLM with structured output to extract grants from markdown.

        Only the first 10000 characters of ``md`` are sent to the model.
        Returns an empty list when the LLM call or parsing fails.
        """
        try:
            from core.llm_router import get_llm
            from pydantic import BaseModel, Field
            from typing import List

            class Grant(BaseModel):
                name: str = Field(description="Tytuł naboru/grantu/konkursu")
                url: str = Field(description="Adres URL do naboru, jeśli podany w markdown. Pozostaw puste jeśli brak.")
                deadline: str = Field(default="", description="Termin składania wniosków (deadline) w formacie YYYY-MM-DD. Jeśli podano tylko do kiedy, zgadnij datę. Jeśli brak, zostaw puste.")

            class GrantsList(BaseModel):
                grants: List[Grant]

            llm = get_llm("fast").with_structured_output(GrantsList)
            md_subset = md[:10000]  # cap the prompt size to limit token usage
            prompt = f"Wydobądź listę aktualnych naborów lub programów dotacyjnych z poniższego tekstu Markdown:\n\n{md_subset}"
            result = await llm.ainvoke(prompt)

            nabory = []
            for g in result.grants:
                uid = hashlib.md5(g.name.encode()).hexdigest()[:12]
                if g.url and g.url.startswith("http"):
                    url = g.url
                elif g.url and g.url.startswith("/"):
                    url = PARP_BASE_URL + g.url
                else:
                    # Missing or unusable link: point at the grants listing.
                    url = PARP_GRANTS_URL
                nabory.append({
                    "id": uid,
                    "name": g.name,
                    "program": "PARP",
                    "status": "active",
                    "url": url,
                    "deadline": g.deadline,
                    "source": "parp_scrape",
                    "fetched_at": datetime.now(timezone.utc).isoformat(),
                })
            return nabory
        except Exception as e:
            logger.warning(f"Błąd parsowania markdowna z LLM (PARP): {e}")
            return []

    def _enrich_urls(self, nabory: list) -> None:
        """Add EUR-Lex and funduszeeuropejskie.gov.pl search links in place.

        Existing ``eurlex_url`` / ``official_doc_url`` values are left alone.
        """
        import urllib.parse

        for n in nabory:
            q_eur = n.get("program") or n.get("name", "")
            q_gov = n.get("name", "")
            if "eurlex_url" not in n:
                n["eurlex_url"] = f"https://eur-lex.europa.eu/search.html?scope=EURLEX&text={urllib.parse.quote(q_eur)}&lang=pl&type=quick"
            if "official_doc_url" not in n:
                n["official_doc_url"] = f"https://www.funduszeeuropejskie.gov.pl/wyszukiwarka/mikro-male-i-srednie-przedsiebiorstwa/#/szukaj?search={urllib.parse.quote(q_gov)}"

    async def get_active_nabory(self, force_refresh: bool = False) -> list:
        """Return the list of active grant calls.

        Args:
            force_refresh: When True the cache is skipped and a live fetch is
                always performed.

        Returns:
            A list of grant dicts, enriched with search URLs.
        """
        if not force_refresh:
            cached = self._load_cache()
            # A cache payload without a proper "nabory" list is treated as a
            # miss instead of raising KeyError.
            nabory = cached.get("nabory") if cached else None
            if isinstance(nabory, list):
                self._enrich_urls(nabory)
                return nabory
        nabory = await self._fetch_live()
        self._enrich_urls(nabory)
        # NOTE(review): an empty result (e.g. after a failed fetch) is cached
        # too, so it is served until the TTL expires — confirm this is wanted.
        self._save_cache(nabory)
        return nabory

    async def get_nabor_by_id(self, nabor_id: str) -> Optional[dict]:
        """Return the grant call whose ``id`` equals ``nabor_id``, or None."""
        nabory = await self.get_active_nabory()
        return next((n for n in nabory if n.get("id") == nabor_id), None)

    @staticmethod
    def _keyword_in_text(keyword: str, text: str) -> bool:
        """Tell whether ``keyword`` occurs in ``text`` (both lower-cased).

        Longer keywords use substring matching so inflected forms still hit.
        Keywords of one or two characters (e.g. "ai") must appear as a whole
        word, otherwise they match inside unrelated words.
        """
        import re

        if len(keyword) <= 2:
            pattern = rf"(?<!\w){re.escape(keyword)}(?!\w)"
            return re.search(pattern, text) is not None
        return keyword in text

    async def match_for_project(self, project_data: dict) -> list:
        """Score the active grant calls against a project profile.

        Args:
            project_data: Dict read for the keys ``company_size``, ``region``,
                ``description`` and ``title``; missing or None values are
                treated as empty strings.

        Returns:
            Copies of the grant dicts extended with ``match_score`` (0-100)
            and ``match_reasons``, sorted by score in descending order.
        """
        nabory = await self.get_active_nabory()
        company_size = str(project_data.get("company_size") or "").lower()
        region = str(project_data.get("region") or "").lower()
        description = (
            str(project_data.get("description") or "")
            + " "
            + str(project_data.get("title") or "")
        ).lower()

        # The keyword part depends only on the project, so compute it once.
        matched_kw = [
            k for k in self._MATCH_KEYWORDS if self._keyword_in_text(k, description)
        ]
        kw_score = min(45, len(matched_kw) * 10)

        results = []
        for n in nabory:
            score = 0
            reasons = []

            # Company size: no restriction, an exact match, or a generic
            # "mśp" entry all count.
            eligible_sizes = [s.lower() for s in n.get("eligible_company_sizes") or []]
            if (
                not eligible_sizes
                or company_size in eligible_sizes
                or "mśp" in eligible_sizes
            ):
                score += 30
                reasons.append("Twoja wielkość firmy kwalifikuje się.")

            # Region: no restriction, nationwide, or an exact match.
            eligible_regions = [r.lower() for r in n.get("eligible_regions") or []]
            if (
                not eligible_regions
                or "cała polska" in eligible_regions
                or region in eligible_regions
            ):
                score += 25
                reasons.append("Twój region jest obsługiwany.")

            # Keywords found in the project description.
            score += kw_score
            if matched_kw:
                reasons.append(f"Słowa kluczowe pasują: {', '.join(matched_kw[:3])}")

            results.append(
                {
                    **n,
                    "match_score": min(100, score),
                    "match_reasons": reasons,
                }
            )
        return sorted(results, key=lambda x: x["match_score"], reverse=True)
# Singleton: module-level PARPClient instance created at import time.
parp_client = PARPClient()