Spaces:
Running
Running
| import requests | |
| import logging | |
| from typing import Dict, Any, Optional | |
| logger = logging.getLogger(__name__) | |
| class KRSClient: | |
| """ | |
| Klient do pobierania Odpisu Aktualnego z publicznego API KRS (Krajowy Rejestr S膮dowy). | |
| Zgodnie z dokumentacj膮: https://api-krs.ms.gov.pl/api/krs/OdpisAktualny/{krs}?rejestr={rejestr}&format=json | |
| """ | |
| BASE_URL = "https://api-krs.ms.gov.pl/api/krs" | |
| def get_odpis_aktualny(krs: str, rejestr: str = "P") -> Optional[Dict[str, Any]]: | |
| """ | |
| Pobiera aktualny JSON podmiotu z KRS. | |
| :param krs: 10-cyfrowy numer KRS | |
| :param rejestr: P - przedsi臋biorc贸w, S - stowarzysze艅 | |
| """ | |
| url = f"{KRSClient.BASE_URL}/OdpisAktualny/{krs}" | |
| params = {"rejestr": rejestr, "format": "json"} | |
| try: | |
| response = requests.get(url, params=params, timeout=10) | |
| if response.status_code == 200: | |
| return response.json() | |
| elif response.status_code == 404: | |
| logger.warning(f"Podmiot {krs} nie znaleziony w KRS.") | |
| return None | |
| else: | |
| logger.error( | |
| f"B艂膮d us艂ugi KRS API ({response.status_code}): {response.text}" | |
| ) | |
| return None | |
| except Exception as e: | |
| logger.error(f"Wyj膮tek podczas 艂膮czenia z KRS API: {e}") | |
| return None | |
| def extract_graph_relations(krs_data: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Ekstrahuje kluczowe dane do zbudowania relacji w Neo4j: Wsp贸lnik贸w i Zarz膮d. | |
| Zwraca wyselekcjonowane s艂owniki przydatne w analizach powi膮za艅 M艢P. | |
| """ | |
| if not krs_data: | |
| return {} | |
| try: | |
| odpis = krs_data.get("odpis", {}) | |
| dane = odpis.get("dane", {}) | |
| # Podstawowe info (dzia艂 1) | |
| dzial1 = dane.get("dzial1", {}) | |
| dane_podmiotu = dzial1.get("danePodmiotu", {}) | |
| nazwa = dane_podmiotu.get("nazwa", "") | |
| nip = dane_podmiotu.get("identyfikatory", {}).get("nip", "") | |
| regon = dane_podmiotu.get("identyfikatory", {}).get("regon", "") | |
| krs_nr = odpis.get("naglowekA", {}).get("numerKRS", "") | |
| # Kapita艂 (dzia艂 1 - kapita艂) | |
| kapital = dzial1.get("kapital", {}) | |
| kapital_zakladowy = kapital.get("wysokoscKapitaluZakladowego", {}).get( | |
| "wartosc", "0" | |
| ) | |
| # Wsp贸lnicy (dzia艂 1 - wsp贸lnicy) | |
| wspolnicy = [] | |
| wspolnicy_krs = dzial1.get("wspolnicySpZoo", []) | |
| for w in wspolnicy_krs: | |
| osoba = w.get("identyfikator", {}) | |
| pesel = osoba.get("pesel", "") | |
| krs_wspolnika = osoba.get("krs", "") | |
| wspolnicy.append( | |
| { | |
| "id": pesel or krs_wspolnika or osoba.get("nazwisko", ""), | |
| "nazwa": osoba.get("nazwisko", "") or osoba.get("nazwa", ""), | |
| "imiona": osoba.get("imiona", ""), | |
| "is_spolka": bool(krs_wspolnika), | |
| "posiadaneDzialyUdzialy": w.get("posiadaneDzialyUdzialy", {}), | |
| } | |
| ) | |
| # Reprezentacja / Zarz膮d (dzia艂 2) | |
| dzial2 = dane.get("dzial2", {}) | |
| zarzad_krs = dzial2.get("reprezentacja", {}).get("sklad", []) | |
| zarzad = [] | |
| for z in zarzad_krs: | |
| osoba = z.get("identyfikator", {}) | |
| funkcja = z.get("funkcjaWOrganie", "Cz艂onek Zarz膮du") | |
| pesel = osoba.get("pesel", "") | |
| zarzad.append( | |
| { | |
| "id": pesel or osoba.get("nazwisko", ""), | |
| "nazwa": osoba.get("nazwisko", "") or osoba.get("nazwa", ""), | |
| "imiona": osoba.get("imiona", ""), | |
| "funkcja": funkcja, | |
| } | |
| ) | |
| return { | |
| "krs": krs_nr, | |
| "nip": nip, | |
| "regon": regon, | |
| "nazwa": nazwa, | |
| "kapitalZakladowy": kapital_zakladowy, | |
| "wspolnicy": wspolnicy, | |
| "zarzad": zarzad, | |
| } | |
| except Exception as e: | |
| logger.error(f"B艂膮d parsowania JSON z KRS: {e}") | |
| return {} | |