| import asyncio |
| |
| |
| import json |
| import trafilatura |
| from openai import AsyncOpenAI |
| from pydantic import BaseModel, Field |
| from typing import List |
|
|
| |
# Upper bound on simultaneous page fetches (trafilatura calls run in threads).
MAX_SCRAPE_CONCURRENCY = 10
# Upper bound on simultaneous OpenAI completion requests.
MAX_AI_CONCURRENCY = 5
|
|
|
|
|
|
| |
class CompanyResult(BaseModel):
    """One commercial company extracted from a single article.

    This model is passed as the structured-output schema to the OpenAI
    parse call, so the Field descriptions double as instructions to the
    model (including the 'SEARCH_REQUIRED' sentinel convention for the URL).
    """

    name: str = Field(..., description="Name of the commercial company")
    url: str = Field(..., description="""Official website URL. Predict if missing.
                                    If you are NOT 100% sure about the official website,
                                    respond ONLY with:'SEARCH_REQUIRED'""")
    article_id: int = Field(..., description="The ID of the article provided in context")
|
|
|
|
class ExtractionResponse(BaseModel):
    """Top-level structured-output schema: all companies found in one article."""

    # May be empty when the article mentions no commercial companies.
    companies: List[CompanyResult]
|
|
|
|
| |
async def process_article(url: str, article_id: int, scrape_sem, ai_sem, OPENAI_API_KEY):
    """Fetch one article, extract its main text, and extract companies via the model.

    Parameters
    ----------
    url : str
        Article URL to scrape.
    article_id : int
        Caller-assigned ID, echoed back by the model inside each CompanyResult.
    scrape_sem, ai_sem : asyncio.Semaphore
        Bound concurrent scraping / OpenAI requests across all tasks.
    OPENAI_API_KEY : str
        API key used to build the per-call AsyncOpenAI client.

    Returns
    -------
    dict
        ``{"url", "status": "success", "companies": [...]}`` on success, or
        ``{"url", "error": <message>}`` on any failure. Never raises.
    """
    loop = asyncio.get_running_loop()

    # --- Scrape phase (bounded by scrape_sem) ---------------------------------
    # trafilatura is blocking, so both the fetch and the extraction run in the
    # default thread-pool executor to keep the event loop responsive.
    async with scrape_sem:
        try:
            downloaded = await loop.run_in_executor(None, trafilatura.fetch_url, url)
            if downloaded is None:
                return {"url": url, "error": "Fetch failed (blocked or 404)"}

            text = await loop.run_in_executor(None, trafilatura.extract, downloaded)
            if not text:
                return {"url": url, "error": "No main text found"}
        except Exception as e:
            return {"url": url, "error": f"Scrape error: {str(e)}"}

    # --- AI phase (bounded by ai_sem) -----------------------------------------
    # Truncate to keep prompt size (and cost) bounded.
    truncated_text = text[:5000]
    user_content = f"Article ID: {article_id}\n\nText:\n{truncated_text}"
    async with ai_sem:
        try:
            # Use the client as an async context manager so its underlying HTTP
            # connection pool is closed instead of leaking once per article.
            async with AsyncOpenAI(api_key=OPENAI_API_KEY) as client:
                completion = await client.beta.chat.completions.parse(
                    model="gpt-4o-mini",
                    messages=[
                        {"role": "system", "content": "Extract commercial companies. Exclude generic entities, countries, government bodies."},
                        {"role": "user", "content": user_content},
                    ],
                    response_format=ExtractionResponse,
                    temperature=0,
                )

            result_obj = completion.choices[0].message.parsed
            # `parsed` is None when the model refuses or the output fails schema
            # validation; report that as an error instead of crashing on `.companies`.
            if result_obj is None:
                return {"url": url, "error": "AI error: model returned no parsed output"}

            return {
                "url": url,
                "status": "success",
                "companies": [c.model_dump() for c in result_obj.companies],
            }
        except Exception as e:
            return {"url": url, "error": f"AI error: {str(e)}"}
|
|
|
|
| |
async def run_pipeline(urls: List[str], OPENAI_API_KEY, output_path: str = "final_results.json"):
    """Scrape every URL and extract companies from each, concurrently.

    Parameters
    ----------
    urls : List[str]
        Article URLs; each URL's position in the list becomes its article_id.
    OPENAI_API_KEY : str
        Forwarded to each per-article extraction task.
    output_path : str, optional
        Where the successful results are written as JSON
        (defaults to "final_results.json" for backward compatibility).

    Returns
    -------
    list[dict]
        The successful per-article result dicts; failures are only printed.
    """
    # Shared semaphores bound scraping and AI concurrency across all tasks.
    scrape_sem = asyncio.Semaphore(MAX_SCRAPE_CONCURRENCY)
    ai_sem = asyncio.Semaphore(MAX_AI_CONCURRENCY)

    # NOTE: the original status strings contained mojibake (mis-encoded emoji);
    # replaced with plain ASCII so output renders on any console/locale.
    print(f"Processing {len(urls)} articles...")

    tasks = [
        process_article(url, idx, scrape_sem, ai_sem, OPENAI_API_KEY)
        for idx, url in enumerate(urls)
    ]
    # process_article never raises, so gather needs no return_exceptions.
    results = await asyncio.gather(*tasks)

    success = [r for r in results if "error" not in r]
    failures = [r for r in results if "error" in r]

    print(f"\nCompleted: {len(success)}")
    print(f"Failed: {len(failures)}")

    if success:
        print(f"\n[Sample Output]:\n{json.dumps(success[0], indent=2)}")

    # Persist only successful extractions; failures are reported on stdout.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(success, f, indent=2, ensure_ascii=False)

    return success
|
|
|
|
def get_companies_and_articles(article_url: list, OPENAI_API_KEY):
    """Synchronous entry point: drive the async pipeline to completion.

    Starts a fresh event loop via asyncio.run and returns the list of
    successful per-article extraction results produced by run_pipeline.
    """
    return asyncio.run(run_pipeline(article_url, OPENAI_API_KEY))
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|