""" Klient HTTP do API PARP (Polska Agencja Rozwoju Przedsiębiorczości). Pobiera aktualne nabory dotacji i ich metadane. Źródła danych: - https://www.parp.gov.pl/component/grants/ (scraping jako fallback) - Oficjalne API PARP (jeżeli dostępne w środowisku) Cache: lokalne SQLite (domyślnie) z TTL 24h. """ import os import json import logging import hashlib from datetime import datetime, timedelta, timezone from typing import Optional from pathlib import Path import httpx logger = logging.getLogger(__name__) # Ścieżka cache pliku JSON (prosta, nie wymaga Redis) CACHE_DIR = Path(__file__).parent.parent / "cache" CACHE_DIR.mkdir(exist_ok=True) PARP_CACHE_FILE = CACHE_DIR / "parp_nabory.json" PARP_CACHE_TTL_HOURS = 4 # Znane URL-e do scrapingu (aktualizuj gdy PARP zmieni strukturę) PARP_BASE_URL = "https://www.parp.gov.pl" PARP_GRANTS_URL = f"{PARP_BASE_URL}/component/grants/?task=grants.grant_list&type=0" class PARPClient: """ Klient pobierający aktualne nabory z PARP. Używa cache z TTL 24h — pierwsze wywołanie pobiera, kolejne serwują z cache. """ def _load_cache(self) -> Optional[dict]: if not PARP_CACHE_FILE.exists(): return None try: with open(PARP_CACHE_FILE, "r", encoding="utf-8") as f: data = json.load(f) fetched_at = datetime.fromisoformat(data.get("fetched_at", "2000-01-01")) if fetched_at.tzinfo is None: fetched_at = fetched_at.replace(tzinfo=timezone.utc) if datetime.now(timezone.utc) - fetched_at < timedelta( hours=PARP_CACHE_TTL_HOURS ): logger.info( f"PARP cache hit — {len(data.get('nabory', []))} naborów z cache." ) return data logger.info("PARP cache wygasł — ponowne pobieranie.") except Exception as e: logger.warning(f"Błąd odczytu PARP cache: {e}") return None def _save_cache(self, nabory: list) -> None: try: payload = { "fetched_at": datetime.now(timezone.utc).isoformat(), "nabory": nabory, } with open(PARP_CACHE_FILE, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) logger.info(f"Zapisano {len(nabory)} naborów PARP do cache.") except Exception as e: logger.warning(f"Błąd zapisu PARP cache: {e}") async def _fetch_live(self) -> list: """ Pobiera aktualne nabory z bazy PARP w czasie rzeczywistym używając Firecrawl, aby ominąć zabezpieczenia (WAF). Zastępuje to dawne, ręcznie wpisane dane zastępcze. """ import os import requests from core.date_utils import filter_outdated_grants logger.info("Rozpoczynam pobieranie na żywo naborów PARP...") api_key = os.getenv("FIRECRAWL_API_KEY") all_grants = [] if api_key: logger.info("Używam Firecrawl do ominięcia zabezpieczeń PARP...") try: resp = requests.post( "https://api.firecrawl.dev/v1/scrape", headers={"Authorization": f"Bearer {api_key}"}, json={"url": PARP_GRANTS_URL, "formats": ["markdown"]}, timeout=30.0 ) if resp.status_code == 200: data = resp.json() md = data.get("data", {}).get("markdown", "") if md: all_grants = await self._parse_firecrawl_markdown(md) logger.info(f"Firecrawl zwrócił {len(all_grants)} naborów z PARP.") else: logger.warning(f"Błąd Firecrawl API (PARP): {resp.status_code} - {resp.text}") except Exception as e: logger.error(f"Wyjątek podczas wywołania Firecrawl API (PARP): {e}") else: logger.warning("Brak klucza FIRECRAWL_API_KEY. Zostanie użyty parser HTTPX (może zostać zablokowany).") if not all_grants: logger.info("Próba pobrania przez HTTPX / BeautifulSoup...") try: async with httpx.AsyncClient(timeout=15.0, follow_redirects=True, verify=False) as client: headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"} response = await client.get(PARP_GRANTS_URL, headers=headers) if response.status_code == 200: all_grants = self._parse_html(response.text) logger.info(f"HTTPX zwrócił {len(all_grants)} naborów ze strony PARP.") else: logger.warning(f"Serwer PARP odrzucił połączenie: {response.status_code}") except Exception as e: logger.error(f"Błąd HTTPX (PARP): {e}") # Filtrowanie przestarzałych dat (usunięcie historycznych) active_grants = filter_outdated_grants(all_grants) return active_grants def _parse_html(self, html: str) -> list: """Parsuje surowe HTML z PARP (uproszczony parser).""" try: from bs4 import BeautifulSoup soup = BeautifulSoup(html, "html.parser") nabory = [] for item in soup.select(".grant-item, .grants-list__item")[:20]: title_el = item.select_one("h3, .grant-title, a") title = title_el.get_text(strip=True) if title_el else "Nieznany nabór" link_el = item.select_one("a[href]") if link_el: href = link_el["href"] url = href if href.startswith("http") else (PARP_BASE_URL + href if href.startswith("/") else PARP_BASE_URL + "/" + href) else: url = PARP_BASE_URL uid = hashlib.md5(title.encode()).hexdigest()[:12] nabory.append( { "id": uid, "name": title, "program": "PARP", "status": "active", "url": url, "source": "parp_scrape", "fetched_at": datetime.now(timezone.utc).isoformat(), } ) return nabory except Exception as e: logger.warning(f"HTML parse error: {e}") return [] async def _parse_firecrawl_markdown(self, md: str) -> list: """Skanuje markdown za pomocą LLM w celu wydobycia listy naborów.""" try: from core.llm_router import get_llm from pydantic import BaseModel, Field from typing import List class Grant(BaseModel): name: str = Field(description="Tytuł naboru/grantu/konkursu") url: str = Field(description="Adres URL do naboru, jeśli podany w markdown. Pozostaw puste jeśli brak.") deadline: str = Field(default="", description="Termin składania wniosków (deadline) w formacie YYYY-MM-DD. Jeśli podano tylko do kiedy, zgadnij datę. Jeśli brak, zostaw puste.") class GrantsList(BaseModel): grants: List[Grant] llm = get_llm("fast").with_structured_output(GrantsList) md_subset = md[:10000] # Limiting to prevent token bloat prompt = f"Wydobądź listę aktualnych naborów lub programów dotacyjnych z poniższego tekstu Markdown:\n\n{md_subset}" result = await llm.ainvoke(prompt) nabory = [] for g in result.grants: uid = hashlib.md5(g.name.encode()).hexdigest()[:12] if g.url and g.url.startswith("http"): url = g.url elif g.url and g.url.startswith("/"): url = PARP_BASE_URL + g.url else: url = PARP_GRANTS_URL nabory.append({ "id": uid, "name": g.name, "program": "PARP", "status": "active", "url": url, "deadline": g.deadline, "source": "parp_scrape", "fetched_at": datetime.now(timezone.utc).isoformat(), }) return nabory except Exception as e: logger.warning(f"Błąd parsowania markdowna z LLM (PARP): {e}") return [] def _enrich_urls(self, nabory: list) -> None: import urllib.parse for n in nabory: q_eur = n.get("program") or n.get("name", "") q_gov = n.get("name", "") if "eurlex_url" not in n: n["eurlex_url"] = f"https://eur-lex.europa.eu/search.html?scope=EURLEX&text={urllib.parse.quote(q_eur)}&lang=pl&type=quick" if "official_doc_url" not in n: n["official_doc_url"] = f"https://www.funduszeeuropejskie.gov.pl/wyszukiwarka/mikro-male-i-srednie-przedsiebiorstwa/#/szukaj?search={urllib.parse.quote(q_gov)}" async def get_active_nabory(self, force_refresh: bool = False) -> list: """ Główna metoda — zwraca listę aktywnych naborów. Parametr force_refresh=True wymusza pominięcie cache. """ if not force_refresh: cached = self._load_cache() if cached: nabory = cached["nabory"] self._enrich_urls(nabory) return nabory nabory = await self._fetch_live() self._enrich_urls(nabory) self._save_cache(nabory) return nabory async def get_nabor_by_id(self, nabor_id: str) -> Optional[dict]: """Pobiera szczegóły konkretnego naboru po ID.""" nabory = await self.get_active_nabory() return next((n for n in nabory if n["id"] == nabor_id), None) async def match_for_project(self, project_data: dict) -> list: """ Dopasowuje aktualne nabory do profilu projektu. Zwraca posortowaną listę z wynikiem match %. """ nabory = await self.get_active_nabory() results = [] company_size = project_data.get("company_size", "").lower() region = project_data.get("region", "").lower() description = ( project_data.get("description", "") + " " + project_data.get("title", "") ).lower() for n in nabory: score = 0 reasons = [] # Wielkość firmy eligible_sizes = [s.lower() for s in n.get("eligible_company_sizes", [])] if ( not eligible_sizes or company_size in eligible_sizes or "mśp" in eligible_sizes ): score += 30 reasons.append("Twoja wielkość firmy kwalifikuje się.") # Region eligible_regions = [r.lower() for r in n.get("eligible_regions", [])] if ( not eligible_regions or "cała polska" in eligible_regions or region in eligible_regions ): score += 25 reasons.append("Twój region jest obsługiwany.") # Słowa kluczowe z opisu keywords = [ "innowacja", "b+r", "cyfryzacja", "automatyzacja", "export", "startup", "ekologia", "zazielenienie", "ai", "maszyna", ] matched_kw = [k for k in keywords if k in description] kw_score = min(45, len(matched_kw) * 10) score += kw_score if matched_kw: reasons.append(f"Słowa kluczowe pasują: {', '.join(matched_kw[:3])}") results.append( { **n, "match_score": min(100, score), "match_reasons": reasons, } ) return sorted(results, key=lambda x: x["match_score"], reverse=True) # Singleton parp_client = PARPClient()