"""Scrape article URLs with trafilatura and extract mentioned commercial
companies via the OpenAI structured-output API, with bounded concurrency."""

import asyncio
import json
from typing import List

import trafilatura
from openai import AsyncOpenAI
from pydantic import BaseModel, Field

# --- CONFIGURATION ---
MAX_SCRAPE_CONCURRENCY = 10  # simultaneous page fetches
MAX_AI_CONCURRENCY = 5       # simultaneous OpenAI requests


# --- DATA MODELS ---
class CompanyResult(BaseModel):
    # One extracted company; the Field descriptions double as prompt
    # instructions for the structured-output parser.
    name: str = Field(..., description="Name of the commercial company")
    url: str = Field(..., description="""Official website URL. Predict if missing. If you are NOT 100% sure about the official website, respond ONLY with:'SEARCH_REQUIRED'""")
    article_id: int = Field(..., description="The ID of the article provided in context")


class ExtractionResponse(BaseModel):
    companies: List[CompanyResult]


# --- ROBUST WORKER ---
async def process_article(url: str, article_id: int, scrape_sem, ai_sem, OPENAI_API_KEY):
    """Fetch one article, extract its main text, and ask the model for companies.

    Args:
        url: Article URL to scrape.
        article_id: Caller-assigned ID echoed back in each CompanyResult.
        scrape_sem / ai_sem: asyncio.Semaphore instances bounding concurrency.
        OPENAI_API_KEY: API key used to build a per-call AsyncOpenAI client.

    Returns:
        A dict with either a "companies" payload (on success) or an "error"
        message — it never raises, so asyncio.gather() needs no special
        exception handling.
    """
    loop = asyncio.get_running_loop()

    # 1. Fetch & Extract (trafilatura's fetcher is synchronous, so both the
    # download and the extraction run in the default thread-pool executor).
    async with scrape_sem:
        try:
            downloaded = await loop.run_in_executor(None, trafilatura.fetch_url, url)
            if downloaded is None:
                # fetch_url returns None on HTTP/network failure.
                return {"url": url, "error": "Fetch failed (blocked or 404)"}
            text = await loop.run_in_executor(None, trafilatura.extract, downloaded)
            if not text:
                return {"url": url, "error": "No main text found"}
        except Exception as e:
            return {"url": url, "error": f"Scrape error: {str(e)}"}

    # 2. AI Extraction
    truncated_text = text[:5000]  # Trim to save tokens
    user_content = f"Article ID: {article_id}\n\nText:\n{truncated_text}"

    client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    try:
        async with ai_sem:
            try:
                completion = await client.beta.chat.completions.parse(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "Extract commercial companies. \nExclude generic entities, countries, government bodies."},
                        {"role": "user", "content": user_content},
                    ],
                    response_format=ExtractionResponse,
                    temperature=0
                )
                result_obj = completion.choices[0].message.parsed
                return {
                    "url": url,
                    "status": "success",
                    "companies": [c.model_dump() for c in result_obj.companies]
                }
            except Exception as e:
                return {"url": url, "error": f"AI error: {str(e)}"}
    finally:
        # Release the client's HTTP connection pool — the original created a
        # fresh client per article and never closed it (connection leak).
        await client.close()


# --- MAIN ORCHESTRATOR ---
async def run_pipeline(urls: List[str], OPENAI_API_KEY):
    """Process every URL concurrently, print a summary, and save successes.

    Each URL's positional index is used as its article_id. Successful
    results are written to "final_results.json" and returned.
    """
    scrape_sem = asyncio.Semaphore(MAX_SCRAPE_CONCURRENCY)
    ai_sem = asyncio.Semaphore(MAX_AI_CONCURRENCY)

    print(f"šŸš€ Processing {len(urls)} articles...")

    # No aiohttp session needed for fetching; trafilatura handles downloads.
    tasks = [
        process_article(url, idx, scrape_sem, ai_sem, OPENAI_API_KEY)
        for idx, url in enumerate(urls)
    ]
    results = await asyncio.gather(*tasks)

    # Reporting: workers signal failure via an "error" key, never by raising.
    success = [r for r in results if "error" not in r]
    failures = [r for r in results if "error" in r]

    print(f"\nāœ… Completed: {len(success)}")
    print(f"āŒ Failed: {len(failures)}")

    if success:
        print(f"\n[Sample Output]:\n{json.dumps(success[0], indent=2)}")

    # Save to file
    with open("final_results.json", "w") as f:
        json.dump(success, f, indent=2)

    return success


def get_companies_and_articles(article_url: list, OPENAI_API_KEY):
    """Synchronous entry point: run the async pipeline to completion."""
    companies_with_articles = asyncio.run(run_pipeline(article_url, OPENAI_API_KEY))
    return companies_with_articles


# if __name__ == "__main__":
#     # REAL, LIVE URLs (Checked Feb 4, 2026)
#     live_urls = [
#         "https://newsroom.ibm.com/2026-02-04-ibm-opens-global-rfp-for-ai-driven-solutions-shaping-the-future-of-work-and-education",
#         "https://eng.lsm.lv/article/society/defence/04.02.2026-artificial-intelligence-centre-to-get-230000-euros-from-defence-budget.a633009/",
#         "https://www.unesco.org/en/articles/tech-spark-africa-advances-simulation-based-learning-skills-development"
#     ]
#
#     companies_with_articles = asyncio.run(run_pipeline(live_urls))