Spaces:
Sleeping
Sleeping
| """Wikidata API client for fetching structured data about companies and other entities.""" | |
| import aiohttp | |
| import logging | |
| from typing import Dict, Any, Optional, List, Union | |
| from urllib.parse import urlencode | |
class WikidataClient:
    """Client for interacting with the Wikidata API.

    One ``aiohttp.ClientSession`` is created lazily on the first request and
    reused afterwards. Release it with ``await client.close()`` or by using
    the client as an async context manager::

        async with WikidataClient() as client:
            info = await client.get_company_info("Apple Inc.")

    Failures are reported by returning a dict that contains an ``"error"``
    key (``get_related_entities`` returns an empty list instead); the public
    methods do not raise for network or API errors.
    """

    # Relation name -> Wikidata property ID, used by get_related_entities().
    # NOTE(review): the IDs are carried over unchanged; confirm each one
    # against wikidata.org before relying on it.
    _RELATION_PROPERTIES = {
        "competitor": "P1592",      # competitor
        "parent_company": "P749",   # parent organization
        "subsidiary": "P355",       # subsidiary
        "industry": "P452",         # industry
        "product": "P1056",         # product or material produced
    }
    # Unknown relation types silently fall back to the competitor property.
    _DEFAULT_RELATION_PROPERTY = "P1592"

    # Result field -> (property IDs to read, how to format the raw value),
    # used by get_company_info(). Insertion order is the processing order.
    _COMPANY_PROPERTIES = {
        "ceo": (["P169"], "entity"),               # chief executive officer
        "headquarters": (["P159"], "entity"),      # headquarters location
        "founded": (["P571"], "date"),             # inception
        # industry, product or material produced (P1056, matching the
        # "product" relation above; the former "P105" was a typo).
        "industry": (["P452", "P1056"], "entity"),
        "website": (["P856"], None),               # official website
        "employees": (["P1128"], "quantity"),      # number of employees
        "revenue": (["P2139"], "quantity"),        # total revenue
        # P414 is the stock exchange; the field holds the exchange name,
        # not a ticker symbol.
        "stock_symbol": (["P414"], "entity"),
    }

    def __init__(
        self,
        app_name: str = "BizInsights",
        email: str = "developer@yugensys.co",
        api_base_url: str = "https://www.wikidata.org/w/api.php"
    ) -> None:
        """Initialize the Wikidata client.

        Args:
            app_name: Name of your application (for User-Agent header)
            email: Your email address (for contact in case of issues)
            api_base_url: Base URL for the Wikidata API
        """
        self.api_base_url = api_base_url.rstrip("/")
        # Created on first use by _make_api_request(); closed by close().
        self._session = None
        self._user_agent = f"{app_name}/1.0 ({email}) Python/aiohttp"
        self.logger = logging.getLogger(__name__)

    async def __aenter__(self) -> "WikidataClient":
        """Enter an ``async with`` block; returns the client itself."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP session when the ``async with`` block exits."""
        await self.close()

    async def _make_api_request(
        self,
        params: Dict[str, Any],
        endpoint: str = ""
    ) -> Dict[str, Any]:
        """Make a GET request to the Wikidata API.

        Args:
            params: Query parameters for the API request. The mapping is
                copied, so the caller's dict is never modified.
            endpoint: Optional path appended to the base URL (defaults to
                the main API endpoint, i.e. the base URL itself)

        Returns:
            The decoded JSON response, or a dict with an ``"error"`` key when
            the request fails, the status is not 200, or the body is not JSON.
        """
        url = self.api_base_url
        path = endpoint.lstrip("/")
        if path:
            # Only add a separator when there is something to append, so the
            # default request goes to ".../api.php" and not ".../api.php/".
            url = f"{url}/{path}"

        # Work on a copy: adding the default must not leak into the caller.
        query = dict(params)
        query.setdefault("format", "json")

        headers = {
            "User-Agent": self._user_agent,
            "Accept": "application/json"
        }

        # Boundary of the client: every failure is converted to an error
        # dict because callers test for the "error" key instead of catching.
        try:
            if self._session is None or self._session.closed:
                self._session = aiohttp.ClientSession()
            async with self._session.get(
                url,
                params=query,
                headers=headers
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    self.logger.error(
                        "API request failed: %s - %s", response.status, error_text
                    )
                    return {
                        "error": f"API request failed with status {response.status}",
                        "status_code": response.status,
                        "response": error_text
                    }
                try:
                    return await response.json()
                except Exception as e:
                    self.logger.error("Failed to parse JSON response: %s", e)
                    return {
                        "error": f"Failed to parse JSON response: {str(e)}",
                        "status_code": response.status,
                        "response_text": await response.text()
                    }
        except Exception as e:
            self.logger.error("Request failed: %s", e)
            return {
                "error": f"Request failed: {str(e)}"
            }

    async def search_entity(
        self,
        search_term: str,
        language: str = "en",
        limit: int = 10
    ) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
        """Search for an entity by name.

        Args:
            search_term: The term to search for
            language: Language code for the search (default: "en")
            limit: Maximum number of results to return (default: 10)

        Returns:
            A list of search hits on success (possibly empty), or a dict with
            an ``"error"`` key when the request failed.
        """
        params = {
            "action": "wbsearchentities",
            "search": search_term,
            "language": language,
            "format": "json",
            "limit": str(limit)
        }
        result = await self._make_api_request(params)
        if "error" in result:
            return result
        return result.get("search", [])

    async def get_entity_details(
        self,
        entity_id: str,
        language: str = "en",
        properties: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Get detailed information about an entity by its ID.

        Args:
            entity_id: The Wikidata entity ID (e.g., "Q478214")
            language: Language code for labels and descriptions (default: "en")
            properties: Entity parts to request, e.g. ``["labels", "claims"]``
                (default: None for claims, labels and descriptions)

        Returns:
            Dict containing the entity data, an empty dict when the response
            has no entry for ``entity_id``, or a dict with an ``"error"`` key.
        """
        if not isinstance(entity_id, str) or not entity_id.startswith("Q"):
            return {"error": f"Invalid entity ID format: {entity_id}. Must start with 'Q'"}

        # The API expects multiple values separated by a bare pipe.
        props = "|".join(properties) if properties else "claims|labels|descriptions"
        params = {
            "action": "wbgetentities",
            "ids": entity_id,
            "props": props,
            "languages": language,
            "format": "json"
        }
        result = await self._make_api_request(params)
        if "error" in result:
            return result
        return result.get("entities", {}).get(entity_id, {})

    async def get_related_entities(
        self,
        entity_id: str,
        relation_type: str = "competitor",
        language: str = "en",
        limit: int = 5
    ) -> List[Dict[str, Any]]:
        """Get entities related to the specified entity by a relation type.

        Args:
            entity_id: The Wikidata entity ID (e.g., "Q478214")
            relation_type: One of "competitor", "parent_company",
                "subsidiary", "industry" or "product"; any other value is
                treated as "competitor" (default: "competitor")
            language: Language code for labels (default: "en")
            limit: Maximum number of related entities to return (default: 5)

        Returns:
            List of related entities with their details; empty when the
            lookup fails or no matching claims exist.
        """
        property_id = self._RELATION_PROPERTIES.get(
            relation_type, self._DEFAULT_RELATION_PROPERTY
        )

        # First, get all claims for the entity
        entity = await self.get_entity_details(
            entity_id, language=language, properties=["claims"]
        )
        if "error" in entity:
            self.logger.error("Failed to get entity details: %s", entity["error"])
            return []

        all_claims = entity.get("claims", {})
        claims = all_claims.get(property_id, []) if isinstance(all_claims, dict) else []

        # Keep only claims that point at another item, THEN apply the limit,
        # so unusable claims do not consume slots of the requested count.
        related_ids = []
        for claim in claims:
            snak = claim.get("mainsnak", {})
            if snak.get("datatype") != "wikibase-item":
                continue
            value = snak.get("datavalue", {}).get("value")
            if isinstance(value, dict) and value.get("id"):
                related_ids.append(value["id"])
        related_ids = related_ids[:max(limit, 0)]

        # Get details for each related entity; one failure must not discard
        # the others, so errors are logged and skipped.
        related_entities = []
        for rel_id in related_ids:
            try:
                details = await self.get_entity_details(
                    rel_id,
                    language=language,
                    properties=["labels", "descriptions", "claims"]
                )
            except Exception as e:
                self.logger.warning(
                    "Failed to get details for related entity %s: %s", rel_id, e
                )
                continue
            if details and "error" not in details:
                related_entities.append(details)
        return related_entities

    @staticmethod
    def _best_claim(statements: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
        """Pick the statement that best represents a property.

        The first "preferred" statement wins; otherwise the first statement
        that is not "deprecated" (a missing rank counts as "normal").
        Returns None when no usable statement exists.
        """
        for statement in statements:
            if statement.get("rank") == "preferred":
                return statement
        for statement in statements:
            if statement.get("rank", "normal") != "deprecated":
                return statement
        return None

    @classmethod
    def _claim_value(
        cls,
        statements: Optional[List[Dict[str, Any]]],
        format_type: Optional[str] = None
    ) -> Any:
        """Extract and format the value of a property's best statement.

        Args:
            statements: The statement list of one property (may be None/empty)
            format_type: "date" -> four-character year string,
                "quantity" -> integer with thousands separators,
                "entity" -> the referenced entity ID, None -> raw value

        Returns:
            The formatted value, or None when there is no usable value.
        """
        claim = cls._best_claim(statements or [])
        if claim is None:
            return None
        snak = claim.get("mainsnak", {})
        # "somevalue"/"novalue" snaks carry no datavalue.
        if snak.get("snaktype") != "value":
            return None
        datavalue = snak.get("datavalue") or {}
        if not datavalue:
            return None

        value = datavalue.get("value")
        if isinstance(value, dict):
            if format_type == "date" and "time" in value:
                # "+2003-07-01T00:00:00Z" -> "2003" (skip the sign character)
                return value["time"][1:5]
            if format_type == "quantity":
                # {'amount': '+7000', 'unit': '1'} -> "7,000"
                amount = str(value.get("amount", "")).lstrip("+")
                try:
                    return f"{int(amount):,}"
                except (ValueError, TypeError):
                    # Non-integer amounts are returned unformatted.
                    return amount
            if format_type == "entity" and "id" in value:
                return value["id"]
        return value

    @staticmethod
    def _text_in_language(
        entity: Dict[str, Any],
        section: str,
        language: str,
        default: Any
    ) -> Any:
        """Return ``entity[section][language]["value"]`` or ``default``.

        Tolerates a missing or non-dict section (e.g. an empty list).
        """
        texts = entity.get(section, {})
        if not isinstance(texts, dict):
            return default
        entry = texts.get(language, {})
        if not isinstance(entry, dict):
            return default
        return entry.get("value", default)

    async def _fetch_label(self, entity_id: str, language: str) -> Optional[str]:
        """Resolve an entity ID to its label in ``language``.

        Only labels are requested, which avoids downloading the referenced
        entity's claims. Returns the ID itself when the entity has no label
        in that language, or None when the lookup failed.
        """
        details = await self.get_entity_details(
            entity_id, language=language, properties=["labels"]
        )
        if "error" in details:
            return None
        return self._text_in_language(details, "labels", language, entity_id)

    async def get_company_info(self, company_name: str, language: str = "en") -> Dict[str, Any]:
        """Get structured information about a company.

        The first search hit for ``company_name`` is assumed to be the
        company; its claims are then mapped onto a flat dict.

        Args:
            company_name: Name of the company to look up
            language: Language code for results (default: "en")

        Returns:
            Dict with the keys ``id``, ``name``, ``description``, ``ceo``,
            ``headquarters``, ``founded``, ``industry`` (comma-separated
            string), ``website``, ``official_website``, ``employees``,
            ``revenue`` and ``stock_symbol`` (stock exchange name). Fields
            without data are None (``industry``: empty string). On failure a
            dict with an ``"error"`` key is returned instead.
        """
        # First, search for the company to get its QID
        search_results = await self.search_entity(company_name, language)
        if isinstance(search_results, dict):
            # Error dict from the request layer (or an unexpected mapping).
            return search_results or {"error": "No results found"}
        if not search_results:
            return {"error": "No results found"}

        # Take the first result
        entity_id = search_results[0].get("id")
        if not entity_id:
            return {"error": "No entity ID found in search results"}

        # Get detailed entity information
        details = await self.get_entity_details(
            entity_id,
            language=language,
            properties=["claims", "labels", "descriptions", "sitelinks"]
        )
        if "error" in details:
            return details

        claims = details.get("claims", {})
        if not isinstance(claims, dict):
            claims = {}

        result = {
            "id": entity_id,
            "name": self._text_in_language(details, "labels", language, company_name),
            "description": self._text_in_language(details, "descriptions", language, ""),
            "ceo": None,
            "headquarters": None,
            "founded": None,
            "industry": [],
            "website": None,
            "official_website": None,
            "employees": None,
            "revenue": None,
            "stock_symbol": None
        }

        for field, (prop_ids, format_type) in self._COMPANY_PROPERTIES.items():
            for prop_id in prop_ids:
                value = self._claim_value(claims.get(prop_id), format_type)
                if not value:
                    continue
                if format_type != "entity":
                    # Dates, quantities and plain values are stored as is.
                    result[field] = value
                    continue
                if not isinstance(value, str):
                    # Not an entity reference after all: keep the raw value,
                    # except for industry, which only collects labels.
                    if field != "industry":
                        result[field] = value
                    continue
                label = await self._fetch_label(value, language)
                if label is None:
                    # Lookup failed; leave the field at its default.
                    continue
                if field == "industry":
                    if label not in result[field]:
                        result[field].append(label)
                else:
                    result[field] = label

        # Both keys describe the same property (P856, official website).
        result["official_website"] = result["website"]

        # Format the output data for better readability
        result["industry"] = ", ".join(result["industry"])
        return result

    async def close(self) -> None:
        """Close the HTTP session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()