| | import httpx |
| | import time |
| | from typing import Dict, List, Optional, Any, Union |
| | import json |
| | from dataclasses import asdict |
| | import asyncio |
| | from datetime import datetime |
| | from itertools import islice |
| |
|
| | from .types import ( |
| | StackOverflowQuestion, |
| | StackOverflowAnswer, |
| | StackOverflowComment, |
| | SearchResult, |
| | SearchResultComments |
| | ) |
| |
|
| | from .env import ( |
| | MAX_REQUEST_PER_WINDOW, |
| | RATE_LIMIT_WINDOW_MS, |
| | RETRY_AFTER_MS |
| | ) |
| |
|
# Base URL for version 2.3 of the Stack Exchange REST API.
STACKOVERFLOW_API = "https://api.stackexchange.com/2.3"
# Number of IDs sent per batched request (Stack Exchange vectorized
# endpoints appear to cap this at 100 — TODO confirm against API docs).
BATCH_SIZE = 100
| |
|
class StackExchangeAPI:
    """Async client for the Stack Exchange (Stack Overflow) REST API.

    Wraps an ``httpx.AsyncClient`` and applies local sliding-window rate
    limiting (see ``_check_rate_limit``) plus retry-on-429 handling
    (see ``_with_rate_limit``) to every request.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create a client.

        Args:
            api_key (Optional[str]): Stack Exchange API key; when set it is
                attached to every request as the ``key`` query parameter.
        """
        self.api_key = api_key
        # Millisecond timestamps of recent requests, used by _check_rate_limit.
        self.request_timestamps: List[float] = []
        # Shared HTTP client; must be released via close().
        self.client = httpx.AsyncClient(timeout=30.0)

    async def close(self):
        """Release the underlying HTTP client's connections."""
        await self.client.aclose()
| | |
| | def _check_rate_limit(self) -> bool: |
| | now = time.time() * 1000 |
| | |
| | self.request_timestamps = [ |
| | ts for ts in self.request_timestamps |
| | if now - ts < RATE_LIMIT_WINDOW_MS |
| | ] |
| | |
| | if len(self.request_timestamps) >= MAX_REQUEST_PER_WINDOW: |
| | return False |
| | |
| | self.request_timestamps.append(now) |
| | return True |
| | |
| | async def _with_rate_limit(self, func, *args, retries=3, attempts=10, **kwargs): |
| | """Execute a function with rate limiting. |
| | |
| | Args: |
| | func (_type_): Function to execute with rate limiting |
| | retries (int, optional): Number of retries after API rate limit error. Defaults to 3. |
| | attempts (int, optional): Number of times to retry after hitting local rate limit. Defaults to 10. |
| | |
| | Raises: |
| | Exception: When maximum rate limiting attempts are exceeded |
| | e: Original error if retries are exhausted |
| | |
| | Returns: |
| | Any: Result from the function |
| | """ |
| | if retries is None: |
| | retries = 3 |
| | |
| | if attempts <= 0: |
| | raise Exception("Maximum rate limiting attempts exceeded") |
| | |
| | if not self._check_rate_limit(): |
| | print("Rate limit exceeded, waiting before retry") |
| | await asyncio.sleep(RETRY_AFTER_MS / 1000) |
| | return await self._with_rate_limit(func, *args, retries=retries, attempts=attempts-1, **kwargs) |
| | |
| | try: |
| | return await func(*args, **kwargs) |
| | except httpx.HTTPStatusError as e: |
| | if retries > 0 and e.response.status_code == 429: |
| | print("Rate limit hit (429), retrying after delay...") |
| | await asyncio.sleep(RETRY_AFTER_MS/1000) |
| | return await self._with_rate_limit(func, *args, retries=retries-1, attempts=attempts, **kwargs) |
| | raise e |
| | |
| | async def fetch_batch_answers(self, question_ids: List[int]) -> Dict[int, List[StackOverflowAnswer]]: |
| | """Fetch answers for multiple questions in a single API call. |
| | |
| | Args: |
| | question_ids (List[int]): List of Stack Overflow question IDs |
| | |
| | Returns: |
| | Dict[int, List[StackOverflowAnswer]]: Dictionary mapping question IDs to their answers |
| | """ |
| | if not question_ids: |
| | return {} |
| | |
| | result = {} |
| | |
| | |
| | for i in range(0, len(question_ids), BATCH_SIZE): |
| | batch = question_ids[i:i+BATCH_SIZE] |
| | ids_string = ";".join(str(qid) for qid in batch) |
| | |
| | params = { |
| | "site": "stackoverflow", |
| | "sort": "votes", |
| | "order": "desc", |
| | "filter": "withbody", |
| | "pagesize": "100" |
| | } |
| | |
| | if self.api_key: |
| | params["key"] = self.api_key |
| | |
| | async def _do_fetch(): |
| | response = await self.client.get( |
| | f"{STACKOVERFLOW_API}/questions/{ids_string}/answers", |
| | params=params |
| | ) |
| | response.raise_for_status() |
| | return response.json() |
| | |
| | data = await self._with_rate_limit(_do_fetch) |
| | |
| | for answer_data in data.get("items", []): |
| | question_id = answer_data.get("question_id") |
| | if question_id not in result: |
| | result[question_id] = [] |
| | |
| | answer = StackOverflowAnswer( |
| | answer_id=answer_data.get("answer_id"), |
| | question_id=question_id, |
| | score=answer_data.get("score", 0), |
| | is_accepted=answer_data.get("is_accepted", False), |
| | body=answer_data.get("body", ""), |
| | creation_date=answer_data.get("creation_date", 0), |
| | last_activity_date=answer_data.get("last_activity_date", 0), |
| | link=answer_data.get("link", ""), |
| | owner=answer_data.get("owner") |
| | ) |
| | result[question_id].append(answer) |
| | |
| | return result |
| | |
| | async def fetch_batch_comments(self, post_ids: List[int]) -> Dict[int, List[StackOverflowComment]]: |
| | """Fetch comments for multiple posts in a single API call. |
| | |
| | Args: |
| | post_ids (List[int]): List of Stack Overflow post IDs (questions or answers) |
| | |
| | Returns: |
| | Dict[int, List[StackOverflowComment]]: Dictionary mapping post IDs to their comments |
| | """ |
| | if not post_ids: |
| | return {} |
| | |
| | result = {} |
| | |
| | |
| | for i in range(0, len(post_ids), BATCH_SIZE): |
| | batch = post_ids[i:i+BATCH_SIZE] |
| | ids_string = ";".join(str(pid) for pid in batch) |
| | |
| | params = { |
| | "site": "stackoverflow", |
| | "sort": "votes", |
| | "order": "desc", |
| | "filter": "withbody", |
| | "pagesize": "100" |
| | } |
| | |
| | if self.api_key: |
| | params["key"] = self.api_key |
| | |
| | async def _do_fetch(): |
| | response = await self.client.get( |
| | f"{STACKOVERFLOW_API}/posts/{ids_string}/comments", |
| | params=params |
| | ) |
| | response.raise_for_status() |
| | return response.json() |
| | |
| | data = await self._with_rate_limit(_do_fetch) |
| | |
| | for comment_data in data.get("items", []): |
| | post_id = comment_data.get("post_id") |
| | if post_id not in result: |
| | result[post_id] = [] |
| | |
| | comment = StackOverflowComment( |
| | comment_id=comment_data.get("comment_id"), |
| | post_id=post_id, |
| | score=comment_data.get("score", 0), |
| | body=comment_data.get("body", ""), |
| | creation_date=comment_data.get("creation_date", 0), |
| | owner=comment_data.get("owner") |
| | ) |
| | result[post_id].append(comment) |
| | |
| | return result |
| | |
| | async def advanced_search( |
| | self, |
| | query: Optional[str] = None, |
| | tags: Optional[List[str]] = None, |
| | excluded_tags: Optional[List[str]] = None, |
| | min_score: Optional[int] = None, |
| | title: Optional[str] = None, |
| | body: Optional[str] = None, |
| | answers: Optional[int] = None, |
| | has_accepted_answer: Optional[bool] = None, |
| | views: Optional[int] = None, |
| | url: Optional[str] = None, |
| | user_id: Optional[int] = None, |
| | is_closed: Optional[bool] = None, |
| | is_wiki: Optional[bool] = None, |
| | is_migrated: Optional[bool] = None, |
| | has_notice: Optional[bool] = None, |
| | from_date: Optional[datetime] = None, |
| | to_date: Optional[datetime] = None, |
| | sort_by: Optional[str] = "votes", |
| | limit: Optional[int] = 5, |
| | include_comments: bool = False, |
| | retries: Optional[int] = 3 |
| | ) -> List[SearchResult]: |
| | """Advanced search for Stack Overflow questions with many filter options.""" |
| | params = { |
| | "site": "stackoverflow", |
| | "sort": sort_by, |
| | "order": "desc", |
| | "filter": "withbody" |
| | } |
| | |
| | if query: |
| | params["q"] = query |
| | |
| | if tags: |
| | params["tagged"] = ";".join(tags) |
| | |
| | if excluded_tags: |
| | params["nottagged"] = ";".join(excluded_tags) |
| | |
| | if title: |
| | params["title"] = title |
| | |
| | if body: |
| | params["body"] = body |
| | |
| | if answers is not None: |
| | params["answers"] = str(answers) |
| | |
| | if has_accepted_answer is not None: |
| | params["accepted"] = "true" if has_accepted_answer else "false" |
| | |
| | if views is not None: |
| | params["views"] = str(views) |
| | |
| | if url: |
| | params["url"] = url |
| | |
| | if user_id is not None: |
| | params["user"] = str(user_id) |
| | |
| | if is_closed is not None: |
| | params["closed"] = "true" if is_closed else "false" |
| | |
| | if is_wiki is not None: |
| | params["wiki"] = "true" if is_wiki else "false" |
| | |
| | if is_migrated is not None: |
| | params["migrated"] = "true" if is_migrated else "false" |
| | |
| | if has_notice is not None: |
| | params["notice"] = "true" if has_notice else "false" |
| | |
| | if from_date: |
| | params["fromdate"] = str(int(from_date.timestamp())) |
| | |
| | if to_date: |
| | params["todate"] = str(int(to_date.timestamp())) |
| | |
| | if limit: |
| | params["pagesize"] = str(limit) |
| | |
| | if self.api_key: |
| | params["key"] = self.api_key |
| | |
| | async def _do_search(): |
| | response = await self.client.get(f"{STACKOVERFLOW_API}/search/advanced", params=params) |
| | response.raise_for_status() |
| | return response.json() |
| | |
| | data = await self._with_rate_limit(_do_search, retries=retries) |
| | |
| | questions = [] |
| | question_ids = [] |
| | |
| | for question_data in data.get("items", []): |
| | if min_score is not None and question_data.get("score", 0) < min_score: |
| | continue |
| | |
| | question = StackOverflowQuestion( |
| | question_id=question_data.get("question_id"), |
| | title=question_data.get("title", ""), |
| | body=question_data.get("body", ""), |
| | score=question_data.get("score", 0), |
| | answer_count=question_data.get("answer_count", 0), |
| | is_answered=question_data.get("is_answered", False), |
| | accepted_answer_id=question_data.get("accepted_answer_id"), |
| | creation_date=question_data.get("creation_date", 0), |
| | last_activity_date=question_data.get("last_activity_date", 0), |
| | view_count=question_data.get("view_count", 0), |
| | tags=question_data.get("tags", []), |
| | link=question_data.get("link", ""), |
| | is_closed=question_data.get("closed_date") is not None, |
| | owner=question_data.get("owner") |
| | ) |
| | questions.append(question) |
| | question_ids.append(question.question_id) |
| | |
| | answers_by_question = await self.fetch_batch_answers(question_ids) |
| | |
| | results = [] |
| | |
| | if include_comments: |
| | all_post_ids = question_ids.copy() |
| | for qid, answers in answers_by_question.items(): |
| | all_post_ids.extend([a.answer_id for a in answers]) |
| | |
| | |
| | all_comments = await self.fetch_batch_comments(all_post_ids) |
| | |
| | |
| | for question in questions: |
| | question_answers = answers_by_question.get(question.question_id, []) |
| | |
| | |
| | question_comments = all_comments.get(question.question_id, []) |
| | answer_comments = {} |
| | |
| | for answer in question_answers: |
| | answer_comments[answer.answer_id] = all_comments.get(answer.answer_id, []) |
| | |
| | comments = SearchResultComments( |
| | question=question_comments, |
| | answers=answer_comments |
| | ) |
| | |
| | results.append(SearchResult( |
| | question=question, |
| | answers=question_answers, |
| | comments=comments |
| | )) |
| | else: |
| | for question in questions: |
| | question_answers = answers_by_question.get(question.question_id, []) |
| | results.append(SearchResult( |
| | question=question, |
| | answers=question_answers, |
| | comments=None |
| | )) |
| | |
| | return results |
| | |
| | async def search_by_query( |
| | self, |
| | query: str, |
| | tags: Optional[List[str]] = None, |
| | excluded_tags: Optional[List[str]] = None, |
| | min_score: Optional[int] = None, |
| | title: Optional[str] = None, |
| | body: Optional[str] = None, |
| | has_accepted_answer: Optional[bool] = None, |
| | answers: Optional[int] = None, |
| | sort_by: Optional[str] = "votes", |
| | limit: Optional[int] = 5, |
| | include_comments: bool = False, |
| | retries: Optional[int] = 3 |
| | ) -> List[SearchResult]: |
| | """Search Stack Overflow for questions matching a query with additional filters.""" |
| | return await self.advanced_search( |
| | query=query, |
| | tags=tags, |
| | excluded_tags=excluded_tags, |
| | min_score=min_score, |
| | title=title, |
| | body=body, |
| | has_accepted_answer=has_accepted_answer, |
| | answers=answers, |
| | sort_by=sort_by, |
| | limit=limit, |
| | include_comments=include_comments, |
| | retries=retries |
| | ) |
| | |
| | async def fetch_answers(self, question_id: int) -> List[StackOverflowAnswer]: |
| | """Fetch answers for a specific question. |
| | |
| | Note: This is kept for backward compatibility, but new code should |
| | use fetch_batch_answers for better performance. |
| | """ |
| | answers_dict = await self.fetch_batch_answers([question_id]) |
| | return answers_dict.get(question_id, []) |
| | |
| | async def fetch_comments(self, post_id: int) -> List[StackOverflowComment]: |
| | """Fetch comments for a specific post. |
| | |
| | Note: This is kept for backward compatibility, but new code should |
| | use fetch_batch_comments for better performance. |
| | """ |
| | comments_dict = await self.fetch_batch_comments([post_id]) |
| | return comments_dict.get(post_id, []) |
| | |
| | async def get_question(self, question_id: int, include_comments: bool = True) -> SearchResult: |
| | """Get a specific question by ID.""" |
| | params = { |
| | "site": "stackoverflow", |
| | "filter": "withbody" |
| | } |
| | |
| | if self.api_key: |
| | params["key"] = self.api_key |
| | |
| | async def _do_fetch(): |
| | response = await self.client.get( |
| | f"{STACKOVERFLOW_API}/questions/{question_id}", |
| | params=params |
| | ) |
| | response.raise_for_status() |
| | return response.json() |
| | |
| | data = await self._with_rate_limit(_do_fetch) |
| | |
| | if not data.get("items"): |
| | raise ValueError(f"Question with ID {question_id} not found") |
| | |
| | question_data = data["items"][0] |
| | question = StackOverflowQuestion( |
| | question_id=question_data.get("question_id"), |
| | title=question_data.get("title", ""), |
| | body=question_data.get("body", ""), |
| | score=question_data.get("score", 0), |
| | answer_count=question_data.get("answer_count", 0), |
| | is_answered=question_data.get("is_answered", False), |
| | accepted_answer_id=question_data.get("accepted_answer_id"), |
| | creation_date=question_data.get("creation_date", 0), |
| | last_activity_date=question_data.get("last_activity_date", 0), |
| | view_count=question_data.get("view_count", 0), |
| | tags=question_data.get("tags", []), |
| | link=question_data.get("link", ""), |
| | is_closed=question_data.get("closed_date") is not None, |
| | owner=question_data.get("owner") |
| | ) |
| | |
| | answers = await self.fetch_answers(question.question_id) |
| | |
| | comments = None |
| | if include_comments: |
| | post_ids = [question.question_id] + [answer.answer_id for answer in answers] |
| | all_comments = await self.fetch_batch_comments(post_ids) |
| | |
| | question_comments = all_comments.get(question.question_id, []) |
| | answer_comments = {} |
| | |
| | for answer in answers: |
| | answer_comments[answer.answer_id] = all_comments.get(answer.answer_id, []) |
| | |
| | comments = SearchResultComments( |
| | question=question_comments, |
| | answers=answer_comments |
| | ) |
| | |
| | return SearchResult( |
| | question=question, |
| | answers=answers, |
| | comments=comments |
| | ) |