grantforge-api / backend /core /parp_client.py
GrantForge Bot
Deploy to Hugging Face
afd56bc
"""
Klient HTTP do API PARP (Polska Agencja Rozwoju Przedsiębiorczości).
Pobiera aktualne nabory dotacji i ich metadane.
Źródła danych:
- https://www.parp.gov.pl/component/grants/ (scraping jako fallback)
- Oficjalne API PARP (jeżeli dostępne w środowisku)
Cache: lokalne SQLite (domyślnie) z TTL 24h.
"""
import os
import json
import logging
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Optional
from pathlib import Path
import httpx
logger = logging.getLogger(__name__)
# Ścieżka cache pliku JSON (prosta, nie wymaga Redis)
CACHE_DIR = Path(__file__).parent.parent / "cache"
CACHE_DIR.mkdir(exist_ok=True)
PARP_CACHE_FILE = CACHE_DIR / "parp_nabory.json"
PARP_CACHE_TTL_HOURS = 4
# Znane URL-e do scrapingu (aktualizuj gdy PARP zmieni strukturę)
PARP_BASE_URL = "https://www.parp.gov.pl"
PARP_GRANTS_URL = f"{PARP_BASE_URL}/component/grants/?task=grants.grant_list&type=0"
class PARPClient:
"""
Klient pobierający aktualne nabory z PARP.
Używa cache z TTL 24h — pierwsze wywołanie pobiera, kolejne serwują z cache.
"""
def _load_cache(self) -> Optional[dict]:
if not PARP_CACHE_FILE.exists():
return None
try:
with open(PARP_CACHE_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
fetched_at = datetime.fromisoformat(data.get("fetched_at", "2000-01-01"))
if fetched_at.tzinfo is None:
fetched_at = fetched_at.replace(tzinfo=timezone.utc)
if datetime.now(timezone.utc) - fetched_at < timedelta(
hours=PARP_CACHE_TTL_HOURS
):
logger.info(
f"PARP cache hit — {len(data.get('nabory', []))} naborów z cache."
)
return data
logger.info("PARP cache wygasł — ponowne pobieranie.")
except Exception as e:
logger.warning(f"Błąd odczytu PARP cache: {e}")
return None
def _save_cache(self, nabory: list) -> None:
try:
payload = {
"fetched_at": datetime.now(timezone.utc).isoformat(),
"nabory": nabory,
}
with open(PARP_CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
logger.info(f"Zapisano {len(nabory)} naborów PARP do cache.")
except Exception as e:
logger.warning(f"Błąd zapisu PARP cache: {e}")
async def _fetch_live(self) -> list:
"""
Pobiera aktualne nabory z bazy PARP w czasie rzeczywistym używając Firecrawl,
aby ominąć zabezpieczenia (WAF). Zastępuje to dawne, ręcznie wpisane dane zastępcze.
"""
import os
import requests
from core.date_utils import filter_outdated_grants
logger.info("Rozpoczynam pobieranie na żywo naborów PARP...")
api_key = os.getenv("FIRECRAWL_API_KEY")
all_grants = []
if api_key:
logger.info("Używam Firecrawl do ominięcia zabezpieczeń PARP...")
try:
resp = requests.post(
"https://api.firecrawl.dev/v1/scrape",
headers={"Authorization": f"Bearer {api_key}"},
json={"url": PARP_GRANTS_URL, "formats": ["markdown"]},
timeout=30.0
)
if resp.status_code == 200:
data = resp.json()
md = data.get("data", {}).get("markdown", "")
if md:
all_grants = await self._parse_firecrawl_markdown(md)
logger.info(f"Firecrawl zwrócił {len(all_grants)} naborów z PARP.")
else:
logger.warning(f"Błąd Firecrawl API (PARP): {resp.status_code} - {resp.text}")
except Exception as e:
logger.error(f"Wyjątek podczas wywołania Firecrawl API (PARP): {e}")
else:
logger.warning("Brak klucza FIRECRAWL_API_KEY. Zostanie użyty parser HTTPX (może zostać zablokowany).")
if not all_grants:
logger.info("Próba pobrania przez HTTPX / BeautifulSoup...")
try:
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True, verify=False) as client:
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"}
response = await client.get(PARP_GRANTS_URL, headers=headers)
if response.status_code == 200:
all_grants = self._parse_html(response.text)
logger.info(f"HTTPX zwrócił {len(all_grants)} naborów ze strony PARP.")
else:
logger.warning(f"Serwer PARP odrzucił połączenie: {response.status_code}")
except Exception as e:
logger.error(f"Błąd HTTPX (PARP): {e}")
# Filtrowanie przestarzałych dat (usunięcie historycznych)
active_grants = filter_outdated_grants(all_grants)
return active_grants
def _parse_html(self, html: str) -> list:
"""Parsuje surowe HTML z PARP (uproszczony parser)."""
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(html, "html.parser")
nabory = []
for item in soup.select(".grant-item, .grants-list__item")[:20]:
title_el = item.select_one("h3, .grant-title, a")
title = title_el.get_text(strip=True) if title_el else "Nieznany nabór"
link_el = item.select_one("a[href]")
if link_el:
href = link_el["href"]
url = href if href.startswith("http") else (PARP_BASE_URL + href if href.startswith("/") else PARP_BASE_URL + "/" + href)
else:
url = PARP_BASE_URL
uid = hashlib.md5(title.encode()).hexdigest()[:12]
nabory.append(
{
"id": uid,
"name": title,
"program": "PARP",
"status": "active",
"url": url,
"source": "parp_scrape",
"fetched_at": datetime.now(timezone.utc).isoformat(),
}
)
return nabory
except Exception as e:
logger.warning(f"HTML parse error: {e}")
return []
async def _parse_firecrawl_markdown(self, md: str) -> list:
"""Skanuje markdown za pomocą LLM w celu wydobycia listy naborów."""
try:
from core.llm_router import get_llm
from pydantic import BaseModel, Field
from typing import List
class Grant(BaseModel):
name: str = Field(description="Tytuł naboru/grantu/konkursu")
url: str = Field(description="Adres URL do naboru, jeśli podany w markdown. Pozostaw puste jeśli brak.")
deadline: str = Field(default="", description="Termin składania wniosków (deadline) w formacie YYYY-MM-DD. Jeśli podano tylko do kiedy, zgadnij datę. Jeśli brak, zostaw puste.")
class GrantsList(BaseModel):
grants: List[Grant]
llm = get_llm("fast").with_structured_output(GrantsList)
md_subset = md[:10000] # Limiting to prevent token bloat
prompt = f"Wydobądź listę aktualnych naborów lub programów dotacyjnych z poniższego tekstu Markdown:\n\n{md_subset}"
result = await llm.ainvoke(prompt)
nabory = []
for g in result.grants:
uid = hashlib.md5(g.name.encode()).hexdigest()[:12]
if g.url and g.url.startswith("http"):
url = g.url
elif g.url and g.url.startswith("/"):
url = PARP_BASE_URL + g.url
else:
url = PARP_GRANTS_URL
nabory.append({
"id": uid,
"name": g.name,
"program": "PARP",
"status": "active",
"url": url,
"deadline": g.deadline,
"source": "parp_scrape",
"fetched_at": datetime.now(timezone.utc).isoformat(),
})
return nabory
except Exception as e:
logger.warning(f"Błąd parsowania markdowna z LLM (PARP): {e}")
return []
def _enrich_urls(self, nabory: list) -> None:
import urllib.parse
for n in nabory:
q_eur = n.get("program") or n.get("name", "")
q_gov = n.get("name", "")
if "eurlex_url" not in n:
n["eurlex_url"] = f"https://eur-lex.europa.eu/search.html?scope=EURLEX&text={urllib.parse.quote(q_eur)}&lang=pl&type=quick"
if "official_doc_url" not in n:
n["official_doc_url"] = f"https://www.funduszeeuropejskie.gov.pl/wyszukiwarka/mikro-male-i-srednie-przedsiebiorstwa/#/szukaj?search={urllib.parse.quote(q_gov)}"
async def get_active_nabory(self, force_refresh: bool = False) -> list:
"""
Główna metoda — zwraca listę aktywnych naborów.
Parametr force_refresh=True wymusza pominięcie cache.
"""
if not force_refresh:
cached = self._load_cache()
if cached:
nabory = cached["nabory"]
self._enrich_urls(nabory)
return nabory
nabory = await self._fetch_live()
self._enrich_urls(nabory)
self._save_cache(nabory)
return nabory
async def get_nabor_by_id(self, nabor_id: str) -> Optional[dict]:
"""Pobiera szczegóły konkretnego naboru po ID."""
nabory = await self.get_active_nabory()
return next((n for n in nabory if n["id"] == nabor_id), None)
async def match_for_project(self, project_data: dict) -> list:
"""
Dopasowuje aktualne nabory do profilu projektu.
Zwraca posortowaną listę z wynikiem match %.
"""
nabory = await self.get_active_nabory()
results = []
company_size = project_data.get("company_size", "").lower()
region = project_data.get("region", "").lower()
description = (
project_data.get("description", "") + " " + project_data.get("title", "")
).lower()
for n in nabory:
score = 0
reasons = []
# Wielkość firmy
eligible_sizes = [s.lower() for s in n.get("eligible_company_sizes", [])]
if (
not eligible_sizes
or company_size in eligible_sizes
or "mśp" in eligible_sizes
):
score += 30
reasons.append("Twoja wielkość firmy kwalifikuje się.")
# Region
eligible_regions = [r.lower() for r in n.get("eligible_regions", [])]
if (
not eligible_regions
or "cała polska" in eligible_regions
or region in eligible_regions
):
score += 25
reasons.append("Twój region jest obsługiwany.")
# Słowa kluczowe z opisu
keywords = [
"innowacja",
"b+r",
"cyfryzacja",
"automatyzacja",
"export",
"startup",
"ekologia",
"zazielenienie",
"ai",
"maszyna",
]
matched_kw = [k for k in keywords if k in description]
kw_score = min(45, len(matched_kw) * 10)
score += kw_score
if matched_kw:
reasons.append(f"Słowa kluczowe pasują: {', '.join(matched_kw[:3])}")
results.append(
{
**n,
"match_score": min(100, score),
"match_reasons": reasons,
}
)
return sorted(results, key=lambda x: x["match_score"], reverse=True)
# Singleton
parp_client = PARPClient()