# pranav8tripathi@gmail.com
# updated request body
# 1a734a9
"""Wikidata API client for fetching structured data about companies and other entities."""
import aiohttp
import logging
from typing import Dict, Any, Optional, List, Union
from urllib.parse import urlencode
class WikidataClient:
    """Client for interacting with the Wikidata API.

    The client lazily creates a single ``aiohttp.ClientSession`` on the first
    request and reuses it afterwards.  Call :meth:`close` when done, or use
    the client as an async context manager::

        async with WikidataClient() as client:
            info = await client.get_company_info("Apple Inc.")

    Request failures are not raised; the public methods report them as a
    dict containing an ``"error"`` key.
    """

    # ``props`` value sent to ``wbgetentities`` when the caller gives none.
    _DEFAULT_PROPS = "claims|labels|descriptions"

    # Relation names accepted by ``get_related_entities`` -> property IDs.
    _RELATION_PROPERTIES = {
        "competitor": "P1592",      # competitor
        "parent_company": "P749",   # parent organization
        "subsidiary": "P355",       # subsidiary
        "industry": "P452",         # industry
        "product": "P1056",         # product or material produced
    }

    # Result field of ``get_company_info`` -> (property IDs, value format).
    # The format is one of "entity", "date", "quantity" or None (raw value).
    _COMPANY_PROPERTIES = {
        "ceo": (("P169",), "entity"),            # chief executive officer
        "headquarters": (("P159",), "entity"),   # headquarters location
        "founded": (("P571",), "date"),          # inception
        # industry, product or material produced (P1056, the same ID the
        # relation map above uses for "product")
        "industry": (("P452", "P1056"), "entity"),
        "website": (("P856",), None),            # official website
        "employees": (("P1128",), "quantity"),   # number of employees
        "revenue": (("P2139",), "quantity"),     # total revenue
        "stock_symbol": (("P414",), "entity"),   # stock exchange
    }

    def __init__(
        self,
        app_name: str = "BizInsights",
        email: str = "developer@yugensys.co",
        api_base_url: str = "https://www.wikidata.org/w/api.php"
    ) -> None:
        """Initialize the Wikidata client.

        Args:
            app_name: Name of your application (for User-Agent header)
            email: Your email address (for contact in case of issues)
            api_base_url: Base URL for the Wikidata API
        """
        self.api_base_url = api_base_url.rstrip("/")
        # Created on first use by ``_make_api_request``.
        self._session = None
        self._user_agent = f"{app_name}/1.0 ({email}) Python/aiohttp"
        self.logger = logging.getLogger(__name__)

    async def __aenter__(self) -> "WikidataClient":
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the HTTP session when leaving the ``async with`` block."""
        await self.close()

    def _build_url(self, endpoint: str = "") -> str:
        """Return the request URL for ``endpoint``.

        An empty endpoint yields the base URL unchanged, without a trailing
        slash.
        """
        path = endpoint.lstrip("/")
        return f"{self.api_base_url}/{path}" if path else self.api_base_url

    @staticmethod
    def _join_props(properties: Optional[List[str]]) -> str:
        """Return the pipe-separated ``props`` value for ``wbgetentities``."""
        if not properties:
            return WikidataClient._DEFAULT_PROPS
        return "|".join(properties)

    @staticmethod
    def _extract_item_ids(claims: List[Dict[str, Any]], limit: int) -> List[str]:
        """Return up to ``limit`` entity IDs referenced by item-valued claims.

        Claims whose main snak is not of datatype ``wikibase-item`` or that
        carry no ID are skipped and do not count towards ``limit``.
        """
        item_ids: List[str] = []
        for claim in claims:
            if len(item_ids) >= limit:
                break
            mainsnak = claim.get("mainsnak", {})
            if mainsnak.get("datatype") != "wikibase-item":
                continue
            value = mainsnak.get("datavalue", {}).get("value")
            if isinstance(value, dict) and value.get("id"):
                item_ids.append(value["id"])
        return item_ids

    @staticmethod
    def _claim_value(
        claims: Dict[str, Any],
        prop_id: str,
        default: Any = None,
        format_type: Optional[str] = None
    ) -> Any:
        """Return the value of the first claim for ``prop_id``.

        Args:
            claims: The ``claims`` mapping of an entity
            prop_id: Property ID to read (e.g., "P571")
            default: Returned when the property is absent or has no value
            format_type: "date" (year only), "quantity" (thousands
                separators), "entity" (referenced entity ID) or None

        Returns:
            The formatted value, the raw value when the format does not
            apply, or ``default``
        """
        statements = claims.get(prop_id)
        if not statements:
            return default
        main_snak = statements[0].get("mainsnak", {})
        if main_snak.get("snaktype") != "value":
            return default
        datavalue = main_snak.get("datavalue", {})
        if not datavalue:
            return default
        value = datavalue.get("value")

        if not isinstance(value, dict):
            return value
        if format_type == "date" and "time" in value:
            # "+2003-07-01T00:00:00Z" -> "2003": skip the sign, keep 4 digits
            return value["time"][1:5]
        if format_type == "quantity":
            # {'amount': '+7000', 'unit': '1'} -> "7,000"
            amount = value.get("amount", "").lstrip("+")
            try:
                return "{:,}".format(int(amount))
            except (ValueError, TypeError):
                # Non-integer amounts are returned unformatted
                return amount
        if format_type == "entity" and "id" in value:
            return value["id"]
        return value

    async def _make_api_request(
        self,
        params: Dict[str, Any],
        endpoint: str = ""
    ) -> Dict[str, Any]:
        """Make a request to the Wikidata API.

        Args:
            params: Query parameters for the API request (not modified)
            endpoint: API endpoint (defaults to the main API endpoint)

        Returns:
            Dict containing the JSON response or error information
        """
        url = self._build_url(endpoint)
        # Work on a copy so the caller's dict is left untouched.
        query = dict(params)
        query.setdefault("format", "json")
        headers = {
            "User-Agent": self._user_agent,
            "Accept": "application/json"
        }
        # Broad catches are deliberate: this method reports every failure as
        # an error dict instead of raising.
        try:
            if self._session is None or self._session.closed:
                self._session = aiohttp.ClientSession()
            async with self._session.get(
                url,
                params=query,
                headers=headers
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    self.logger.error(
                        "API request failed: %s - %s", response.status, error_text
                    )
                    return {
                        "error": f"API request failed with status {response.status}",
                        "status_code": response.status,
                        "response": error_text
                    }
                try:
                    return await response.json()
                except Exception as e:
                    self.logger.error("Failed to parse JSON response: %s", e)
                    return {
                        "error": f"Failed to parse JSON response: {str(e)}",
                        "status_code": response.status,
                        "response_text": await response.text()
                    }
        except Exception as e:
            self.logger.error("Request failed: %s", e)
            return {
                "error": f"Request failed: {str(e)}"
            }

    async def search_entity(
        self,
        search_term: str,
        language: str = "en",
        limit: int = 10
    ) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
        """Search for an entity by name.

        Args:
            search_term: The term to search for
            language: Language code for the search (default: "en")
            limit: Maximum number of results to return (default: 10)

        Returns:
            List of search results on success, or a dict with an "error" key
            when the request failed
        """
        params = {
            "action": "wbsearchentities",
            "search": search_term,
            "language": language,
            "format": "json",
            "limit": str(limit)
        }
        result = await self._make_api_request(params)
        if "error" in result:
            return result
        return result.get("search", [])

    async def get_entity_details(
        self,
        entity_id: str,
        language: str = "en",
        properties: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Get detailed information about an entity by its ID.

        Args:
            entity_id: The Wikidata entity ID (e.g., "Q478214")
            language: Language code for labels and descriptions (default: "en")
            properties: List of ``props`` values to request, e.g. "labels"
                (default: None for claims, labels and descriptions)

        Returns:
            Dict containing entity details, or a dict with an "error" key
        """
        if not isinstance(entity_id, str) or not entity_id.startswith("Q"):
            return {"error": f"Invalid entity ID format: {entity_id}. Must start with 'Q'"}
        params = {
            "action": "wbgetentities",
            "ids": entity_id,
            "props": self._join_props(properties),
            "languages": language,
            "format": "json"
        }
        result = await self._make_api_request(params)
        if "error" in result:
            return result
        return result.get("entities", {}).get(entity_id, {})

    async def _get_label(self, entity_id: str, language: str) -> Optional[str]:
        """Return the label of ``entity_id`` in ``language``.

        Falls back to the ID itself when the entity has no label in that
        language, and returns None when the lookup reported an error.
        """
        details = await self.get_entity_details(
            entity_id,
            language=language,
            properties=["labels"]
        )
        if "error" in details:
            return None
        return details.get("labels", {}).get(language, {}).get("value", entity_id)

    async def get_related_entities(
        self,
        entity_id: str,
        relation_type: str = "competitor",
        language: str = "en",
        limit: int = 5
    ) -> List[Dict[str, Any]]:
        """Get entities related to the specified entity by a specific relation type.

        Args:
            entity_id: The Wikidata entity ID (e.g., "Q478214")
            relation_type: Type of relation to look for (default: "competitor");
                unknown values fall back to "competitor"
            language: Language code for labels (default: "en")
            limit: Maximum number of related entities to return (default: 5)

        Returns:
            List of related entities with their details; empty when the
            entity lookup fails or no matching claims exist
        """
        property_id = self._RELATION_PROPERTIES.get(relation_type, "P1592")

        # First, get all claims for the entity
        entity = await self.get_entity_details(
            entity_id, language=language, properties=["claims"]
        )
        if "error" in entity:
            self.logger.error("Failed to get entity details: %s", entity["error"])
            return []

        claims = entity.get("claims", {}).get(property_id, [])
        related_ids = self._extract_item_ids(claims, limit)
        if not related_ids:
            return []

        # Get details for each related entity
        related_entities = []
        for rel_id in related_ids:
            try:
                details = await self.get_entity_details(
                    rel_id,
                    language=language,
                    properties=["labels", "descriptions", "claims"]
                )
                if details and "error" not in details:
                    related_entities.append(details)
            except Exception as e:
                self.logger.warning(
                    "Failed to get details for related entity %s: %s", rel_id, e
                )
        return related_entities

    async def get_company_info(self, company_name: str, language: str = "en") -> Dict[str, Any]:
        """Get structured information about a company.

        The first search hit for ``company_name`` is used.  Entity-valued
        properties (CEO, headquarters, industry, stock exchange) are resolved
        to their labels with one extra request each.

        Args:
            company_name: Name of the company to look up
            language: Language code for results (default: "en")

        Returns:
            Dict containing company information, or a dict with an "error" key
        """
        # First, search for the company to get its QID
        search_results = await self.search_entity(company_name, language)
        if isinstance(search_results, dict):
            # search_entity only returns a dict to report an error
            return search_results or {"error": "No results found"}
        if not search_results:
            return {"error": "No results found"}

        # Take the first result
        entity_id = search_results[0].get("id")
        if not entity_id:
            return {"error": "No entity ID found in search results"}

        # Get detailed entity information
        details = await self.get_entity_details(
            entity_id,
            language=language,
            properties=["claims", "labels", "descriptions", "sitelinks"]
        )
        if "error" in details:
            return details

        name = details.get("labels", {}).get(language, {}).get("value", company_name)
        description = details.get("descriptions", {}).get(language, {}).get("value", "")
        claims = details.get("claims", {})

        result = {
            "id": entity_id,
            "name": name,
            "description": description,
            "ceo": None,
            "headquarters": None,
            "founded": None,
            "industry": [],
            "website": None,
            # Part of the result shape; nothing below populates it.
            "official_website": None,
            "employees": None,
            "revenue": None,
            "stock_symbol": None
        }

        for field, (prop_ids, format_type) in self._COMPANY_PROPERTIES.items():
            for prop_id in prop_ids:
                value = self._claim_value(claims, prop_id, format_type=format_type)
                if not value:
                    continue
                if format_type == "entity" and isinstance(value, str):
                    # Replace the referenced entity ID with its label
                    label = await self._get_label(value, language)
                    if label is None:
                        continue
                    if field == "industry":
                        # Industry collects one label per property, no duplicates
                        if label not in result[field]:
                            result[field].append(label)
                    else:
                        result[field] = label
                elif field != "industry":
                    # For simple values, just store as is
                    result[field] = value

        # Format the output data for better readability
        result["industry"] = ", ".join(result["industry"])
        return result

    async def close(self):
        """Close the HTTP session."""
        if self._session and not self._session.closed:
            await self._session.close()