# Source: newFinderAgent_v2/src/fetch_and_extract.py
# Uploaded by chummchumm ("Upload 6 files", commit 8b425b2, verified)
import asyncio
# import aiohttp
# import os
import json
import trafilatura
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from typing import List
# --- CONFIGURATION ---
MAX_SCRAPE_CONCURRENCY = 10
MAX_AI_CONCURRENCY = 5
# --- DATA MODELS ---
class CompanyResult(BaseModel):
name: str = Field(..., description="Name of the commercial company")
url: str = Field(..., description="""Official website URL. Predict if missing.
If you are NOT 100% sure about the official website,
respond ONLY with:'SEARCH_REQUIRED'""")
article_id: int = Field(..., description="The ID of the article provided in context")
class ExtractionResponse(BaseModel):
companies: List[CompanyResult]
# --- ROBUST WORKER ---
async def process_article(url: str, article_id: int, scrape_sem, ai_sem, OPENAI_API_KEY):
loop = asyncio.get_running_loop()
# 1. Fetch & Extract (Using Trafilatura's robust fetcher)
async with scrape_sem:
try:
# Run the synchronous fetch_url in a separate thread
downloaded = await loop.run_in_executor(None, trafilatura.fetch_url, url)
if downloaded is None:
return {"url": url, "error": "Fetch failed (blocked or 404)"}
# Extract text (also CPU bound, so runs in executor)
text = await loop.run_in_executor(None, trafilatura.extract, downloaded)
if not text:
return {"url": url, "error": "No main text found"}
except Exception as e:
return {"url": url, "error": f"Scrape error: {str(e)}"}
# 2. AI Extraction
truncated_text = text[:5000] # Trim to save tokens
user_content = f"Article ID: {article_id}\n\nText:\n{truncated_text}"
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
async with ai_sem:
try:
completion = await client.beta.chat.completions.parse(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "Extract commercial companies. Exclude generic entities, countries, government bodies."},
{"role": "user", "content": user_content},
],
response_format=ExtractionResponse,
temperature=0
)
result_obj = completion.choices[0].message.parsed
return {
"url": url,
"status": "success",
"companies": [c.model_dump() for c in result_obj.companies]
}
except Exception as e:
return {"url": url, "error": f"AI error: {str(e)}"}
# --- MAIN ORCHESTRATOR ---
async def run_pipeline(urls: List[str], OPENAI_API_KEY):
scrape_sem = asyncio.Semaphore(MAX_SCRAPE_CONCURRENCY)
ai_sem = asyncio.Semaphore(MAX_AI_CONCURRENCY)
print(f"πŸš€ Processing {len(urls)} articles...")
# We don't need aiohttp session anymore for fetching, as Trafilatura handles it.
tasks = [
process_article(url, idx, scrape_sem, ai_sem, OPENAI_API_KEY)
for idx, url in enumerate(urls)
]
results = await asyncio.gather(*tasks)
# Reporting
success = [r for r in results if "error" not in r]
failures = [r for r in results if "error" in r]
print(f"\nβœ… Completed: {len(success)}")
print(f"❌ Failed: {len(failures)}")
if success:
print(f"\n[Sample Output]:\n{json.dumps(success[0], indent=2)}")
# Save to file
with open("final_results.json", "w") as f:
json.dump(success, f, indent=2)
return success
def get_companies_and_articles(article_url: list, OPENAI_API_KEY):
companies_with_articles = asyncio.run(run_pipeline(article_url, OPENAI_API_KEY))
return companies_with_articles
# if __name__ == "__main__":
# # REAL, LIVE URLs (Checked Feb 4, 2026)
# live_urls = [
# "https://newsroom.ibm.com/2026-02-04-ibm-opens-global-rfp-for-ai-driven-solutions-shaping-the-future-of-work-and-education",
# "https://eng.lsm.lv/article/society/defence/04.02.2026-artificial-intelligence-centre-to-get-230000-euros-from-defence-budget.a633009/",
# "https://www.unesco.org/en/articles/tech-spark-africa-advances-simulation-based-learning-skills-development"
# ]
#
# companies_with_articles = asyncio.run(run_pipeline(live_urls))