# © 2025 Elena Marziali — Code released under Apache 2.0 license.
# See LICENSE in the repository for details.
# Removal of this copyright is prohibited.

import asyncio
import logging
import random
import xml.etree.ElementTree as ET

import aiohttp
import requests

# === Asynchronous Functions ===

MAX_REQUESTS = 5
API_SEMAPHORE = asyncio.Semaphore(MAX_REQUESTS)

# Default per-request time budget (seconds) shared by the simple fetch helpers.
_REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=10)


async def safe_api_request(url):
    """Fetch *url* and return its decoded JSON body, or ``None`` on failure.

    At most ``MAX_REQUESTS`` calls run at the same time (``API_SEMAPHORE``).
    This file previously defined the function twice and the later,
    unthrottled copy shadowed the throttled one; they are merged here so the
    limit is actually applied.

    Args:
        url: Fully built request URL.

    Returns:
        The parsed JSON payload, or ``None`` if the request, the HTTP status
        check or the JSON decoding failed (the error is logged).
    """
    async with API_SEMAPHORE:
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, timeout=_REQUEST_TIMEOUT) as response:
                    response.raise_for_status()
                    return await response.json()
            except Exception as e:
                logging.error(f"API request error: {e}")
                return None


# Smart timeout
async def timeout_handler(task, timeout=20):
    """Await *task* for at most *timeout* seconds.

    Args:
        task: Awaitable to run.
        timeout: Maximum wait in seconds.

    Returns:
        The awaitable's result, or ``None`` when the timeout expires
        (the timeout is logged).
    """
    try:
        return await asyncio.wait_for(task, timeout)
    except asyncio.TimeoutError:
        logging.error("API request timed out")
        return None


def check_arxiv_connection(timeout=50):
    """Probe the arXiv API once and print whether it is reachable.

    Args:
        timeout: Seconds to wait for the HTTP response.

    Returns:
        ``True`` when arXiv answered with HTTP 200, ``False`` otherwise.
    """
    url = "http://export.arxiv.org/api/query?search_query=all:physics&start=0&max_results=1"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        # A network failure is reported like any other connection problem
        # instead of aborting the whole script.
        print(f"Connection error: {e}")
        return False
    if response.status_code == 200:
        print("Connection to arXiv OK")
        return True
    print(f"Connection error: {response.status_code}")
    return False


# Run the probe only when the file is executed directly (script or notebook),
# not every time it is imported.
if __name__ == "__main__":
    check_arxiv_connection()


# Advanced parallelization
async def fetch_multiple_data(urls):
    """Fetch every URL in *urls* concurrently with ``safe_api_request``.

    Returns:
        A list with one entry per URL, in input order. Because
        ``return_exceptions=True`` is used, an entry may be an exception
        object instead of a payload.
    """
    tasks = [safe_api_request(url) for url in urls]
    return await asyncio.gather(*tasks, return_exceptions=True)


def _zenodo_hit_to_article(hit):
    """Map one Zenodo search hit to the article dict used in this file."""
    metadata = hit.get("metadata", {})
    links = hit.get("links", {})
    return {
        "title": metadata.get("title", "Title not available"),
        "authors": ", ".join(c.get("name", "") for c in metadata.get("creators", [])),
        "abstract": metadata.get("description", "Abstract not available"),
        # NOTE(review): "self_html" is a fallback for newer Zenodo responses
        # that may not expose "html" — confirm against the live API.
        "url": links.get("html") or links.get("self_html") or "No link",
    }


# Retrieve scientific sources from Zenodo
async def search_zenodo_async(query, max_results=5):
    """Search Zenodo's public records API for *query*.

    The query is sent through ``params`` so spaces and reserved characters
    are URL-encoded instead of being spliced into the URL verbatim.

    Args:
        query: Free-text search string.
        max_results: Number of records to request.

    Returns:
        A list of dicts with ``title``, ``authors``, ``abstract`` and ``url``.
        When the search succeeds but finds nothing, a single
        ``{"error": ...}`` placeholder dict is returned. On any failure the
        error is logged and ``[]`` is returned.
    """
    url = "https://zenodo.org/api/records/"
    params = {"q": query, "size": str(max_results)}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, params=params, timeout=_REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                data = await response.json()
            articles = [
                _zenodo_hit_to_article(hit)
                for hit in data.get("hits", {}).get("hits", [])
            ]
            return articles if articles else [{"error": "No results found on Zenodo."}]
        except Exception as e:
            # Previously swallowed silently; keep the empty result but leave a trace.
            logging.error(f"Zenodo error: {e}")
            return []


# Retrieve scientific sources from PubMed
async def search_pubmed_async(query, max_results=5):
    """Asynchronously look up PubMed article links for *query*.

    Args:
        query: Search term sent to the E-utilities ``esearch`` endpoint.
        max_results: Maximum number of ids to request (``retmax``).

    Returns:
        A list of ``https://pubmed.ncbi.nlm.nih.gov/<id>/`` URLs, or the
        string ``"PubMed error: ..."`` when anything fails. Callers in this
        file distinguish the two cases with an ``isinstance(..., list)`` check.
    """
    url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
    params = {
        "db": "pubmed",
        "term": query,
        "retmax": str(max_results),
        "retmode": "xml",
    }
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, params=params, timeout=_REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                content = await response.text()
            root = ET.fromstring(content)
            # Article links
            return [
                f"https://pubmed.ncbi.nlm.nih.gov/{id_element.text}/"
                for id_element in root.findall(".//Id")
            ]
        except Exception as e:
            logging.error(f"PubMed error: {e}")
            return f"PubMed error: {e}"


def _xml_text(parent, tag, default):
    """Return the whitespace-collapsed text of child *tag*, or *default*.

    ``{*}`` matches the tag in any namespace or none, so both Atom-namespaced
    and plain XML are handled.
    """
    raw = parent.findtext(f"{{*}}{tag}")
    cleaned = " ".join(raw.split()) if raw else ""
    return cleaned or default


# Function to handle asynchronous responses from arXiv
def parse_arxiv_response(content):
    """Extract article metadata from an arXiv API response.

    The previous lookup used bare tag names (``entry``, ``title``,
    ``summary``), which ElementTree does not match when the document declares
    a default namespace; arXiv answers with namespaced Atom, so no entry was
    ever found. Tags are now matched with the ``{*}`` namespace wildcard.

    Args:
        content: XML text (or bytes) returned by the arXiv API.

    Returns:
        A list of dicts with ``title``, ``abstract``, ``authors`` and ``url``
        (the entry ``<id>``; the key is omitted when the entry has none).
        Returns ``[]`` if the XML cannot be parsed.
    """
    try:
        root = ET.fromstring(content)
    except ET.ParseError:
        logging.error("Error parsing arXiv XML.")
        return []

    articles = []
    for entry in root.iterfind(".//{*}entry"):
        article = {
            "title": _xml_text(entry, "title", "Title not available"),
            "abstract": _xml_text(entry, "summary", "Abstract not available"),
            "authors": ", ".join(
                " ".join(name.text.split())
                for name in entry.iterfind("{*}author/{*}name")
                if name.text
            ),
        }
        link = _xml_text(entry, "id", "")
        if link:
            # Downstream reference building and validation need a URL.
            article["url"] = link
        articles.append(article)
    return articles


# === Asynchronous search on arXiv ===
# Queries the arXiv API to retrieve scientific articles.
async def search_arxiv_async(query, max_results=3, retry_attempts=3, timeout=20):
    """Retrieve scientific articles from arXiv, retrying on transient errors.

    Args:
        query: Free-text search string (sent as ``all:<query>``).
        max_results: Number of entries to request.
        retry_attempts: Total number of attempts before giving up.
        timeout: Per-request timeout in seconds (an ``aiohttp.ClientTimeout``
            is also accepted).

    Returns:
        The list produced by ``parse_arxiv_response``, or ``[]`` when every
        attempt failed.
    """
    url = "http://export.arxiv.org/api/query"
    params = {
        "search_query": f"all:{query}",
        "start": "0",
        "max_results": str(max_results),
    }
    if isinstance(timeout, aiohttp.ClientTimeout):
        request_timeout = timeout
    else:
        request_timeout = aiohttp.ClientTimeout(total=timeout)

    async with aiohttp.ClientSession() as session:
        for attempt in range(retry_attempts):
            try:
                async with session.get(url, params=params, timeout=request_timeout) as response:
                    response.raise_for_status()
                    content = await response.text()
                if not content.strip():
                    raise ValueError("Error: Empty response from arXiv.")
                return parse_arxiv_response(content)
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
                if attempt + 1 >= retry_attempts:
                    # Last attempt: do not sleep before giving up.
                    logging.error(f"Attempt {attempt+1}: Error - {e}.")
                    break
                # Exponential backoff with jitter. Max wait time: 10 seconds
                wait_time = min(2 ** attempt + random.uniform(0, 1), 10)
                logging.error(f"Attempt {attempt+1}: Error - {e}. Retrying in {wait_time:.1f} seconds...")
                await asyncio.sleep(wait_time)

    logging.error("Error: Unable to retrieve data from arXiv after multiple attempts.")
    return []


def _rebuild_openalex_abstract(inverted_index):
    """Rebuild plain text from a ``{word: [positions]}`` inverted index.

    Returns ``None`` when *inverted_index* is missing or not a non-empty dict.
    """
    if not isinstance(inverted_index, dict) or not inverted_index:
        return None
    positioned = sorted(
        (position, word)
        for word, positions in inverted_index.items()
        for position in positions
    )
    return " ".join(word for _, word in positioned)


def _openalex_author_name(authorship):
    """Return the author's display name from one OpenAlex authorship entry."""
    author = authorship.get("author") or {}
    return (
        author.get("display_name")
        or authorship.get("display_name")
        or "Unknown author"
    )


# === Asynchronous search on OpenAlex ===
# Retrieves scientific articles with complete metadata (title, authors, abstract, DOI)
async def search_openalex_async(query, max_results=5):
    """Safely retrieve scientific articles from OpenAlex.

    Author names are read from the nested ``author`` object of each
    authorship, and the abstract is rebuilt from ``abstract_inverted_index``
    when no plain ``abstract`` field is present (see the OpenAlex Work object
    documentation). The old top-level lookups are kept as fallbacks.

    Args:
        query: Text matched against work titles (``title.search`` filter).
        max_results: Page size to request.

    Returns:
        A list of dicts with ``title``, ``authors``, ``abstract`` and ``url``,
        or the string ``"OpenAlex error: ..."`` when anything fails.
    """
    url = "https://api.openalex.org/works"
    params = {
        "filter": f"title.search:{query}",
        "per-page": str(max_results),
    }
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url, params=params, timeout=_REQUEST_TIMEOUT) as response:
                response.raise_for_status()
                data = await response.json()

            articles = []
            for record in data.get("results", []):
                abstract = (
                    record.get("abstract")
                    or _rebuild_openalex_abstract(record.get("abstract_inverted_index"))
                    or "Abstract not available"
                )
                articles.append({
                    # "title" can be present but null, hence `or` instead of a default.
                    "title": record.get("title") or "Title not available",
                    "authors": ", ".join(
                        _openalex_author_name(aut)
                        for aut in record.get("authorships", [])
                    ),
                    "abstract": abstract,
                    "url": record.get("doi") or record.get("id", "No link"),
                })
            return articles
        except Exception as e:
            logging.error(f"OpenAlex error: {e}")
            return f"OpenAlex error: {e}"


# === Synchronous search on BASE ===
# Queries the BASE engine for open-access articles.
def search_base(query, max_results=5):
    """Query the BASE search engine and format the hits as Markdown.

    Args:
        query: Free-text search string.
        max_results: Number of documents to request.

    Returns:
        A Markdown string with one title/link pair per hit,
        ``"No results found."`` when there are none, or
        ``"Error during BASE search: ..."`` on failure.
    """
    url = "https://api.base-search.net/cgi-bin/BaseHttpSearchInterface"
    params = {"q": query, "num": max_results, "format": "json"}
    try:
        # Without a timeout a stalled server would block the caller forever.
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        results = []
        for record in data.get("docs", []):
            # `or` also covers an empty list, which would make [0] raise.
            title = (record.get("dcTitle") or ["Title not available"])[0]
            link = (record.get("link") or ["No link available"])[0]
            results.append(f"**{title}**\n[Link to article]({link})\n")

        return "\n\n".join(results) if results else "No results found."
    except Exception as e:
        return f"Error during BASE search: {e}"


# === Distributed search across multiple databases ===
# Executes parallel queries on arXiv, OpenAlex, PubMed, Zenodo.
async def search_multi_database(query): try: tasks = [ search_arxiv_async(query), search_openalex_async(query), search_pubmed_async(query), search_zenodo_async(query) ] results = await asyncio.gather(*tasks, return_exceptions=True) articles = [] for source in results: if isinstance(source, list): articles += source else: logging.warning(f"Invalid source: {type(source)} → {source}") # Normalize immediately after articles = normalize_articles(articles) if isinstance(articles, list) and all(isinstance(a, dict) for a in articles): formatted_search = format_articles(articles) else: logging.error(f"Error: 'articles' is not a valid list. Type received: {type(articles)} - Value: {repr(articles)}") formatted_search = "Unable to format search: response not properly structured." return articles, formatted_search except Exception as e: logging.error(f"Error during multi-database search: {e}") return [], "Internal error" # === Scientific Source Integration === # Selects the first N valid articles and formats them as Markdown references. async def integrate_sources_from_database(concept, max_sources=5): articles, formatted_search = await search_multi_database(concept) if not isinstance(articles, list) or not all(isinstance(a, dict) for a in articles): logging.warning("Invalid 'articles' structure. No sources will be displayed.") return "No valid sources available." references = [] for a in articles[:max_sources]: title = a.get("title", "Title not available") url = a.get("url", "#") if url and isinstance(url, str): references.append(f"- [{title}]({url})") return "\n".join(references) if references else "No relevant sources found." # === Data Normalization === # Converts heterogeneous input (dicts, strings, links) into a consistent list of articles. 
def normalize_source(source):
    """Coerce one search result into a list of article dicts.

    Args:
        source: A list of dicts, a single dict, a string, or anything else.

    Returns:
        The list itself when every element is a dict, a one-element list for
        a single dict, and ``[]`` (with a warning) for strings and any other
        type, including lists that contain non-dict elements.
    """
    if isinstance(source, list) and all(isinstance(x, dict) for x in source):
        return source
    if isinstance(source, dict):
        # Single article as dictionary
        return [source]
    if isinstance(source, str):
        # Unstructured string
        logging.warning(f"Ignored textual source: {source[:50]}...")
        return []
    logging.warning(f"Invalid source type: {type(source)}")
    return []


def normalize_articles(article_list):
    """Keep the usable entries of *article_list* as article dicts.

    Dicts are kept as they are. Strings containing
    ``pubmed.ncbi.nlm.nih.gov`` are wrapped in a placeholder article dict.
    Every other element is logged and dropped.

    Args:
        article_list: Iterable of mixed entries; ``None`` is treated as empty.

    Returns:
        A new list containing only dicts.
    """
    if article_list is None:
        return []

    valid_articles = []
    for a in article_list:
        if isinstance(a, dict):
            valid_articles.append(a)
        elif isinstance(a, str) and "pubmed.ncbi.nlm.nih.gov" in a:
            valid_articles.append({
                "title": "PubMed Link",
                "abstract": "Not available",
                "url": a,
                "authors": "Unknown",
            })
        else:
            logging.warning(f"Ignored: {repr(a)}")
    return valid_articles


async def _demo_search(query="quantum physics"):
    """Run one sample multi-database search and print its formatted result."""
    found, formatted = await search_multi_database(query)
    print(formatted)
    return found, formatted


# A bare top-level `await` is only valid in a notebook/REPL and is a
# SyntaxError in a regular module, so the sample search is started explicitly.
if __name__ == "__main__":
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # Plain script: no event loop is running yet, so start one.
        articles, formatted_search = asyncio.run(_demo_search())
    else:
        # Notebook/REPL with a live loop: asyncio.run() would raise here,
        # so schedule the demo on the existing loop instead.
        _demo_task = asyncio.ensure_future(_demo_search())


# === Async Task Protection Wrapper ===
# Handles timeouts and errors during asynchronous function execution.
def protect_async_task(func, timeout=20):
    """Wrap coroutine function *func* with a timeout and error logging.

    Args:
        func: Coroutine function to protect.
        timeout: Seconds allowed for each call (default 20, the value that
            used to be hard-coded).

    Returns:
        A coroutine function with the same name and docstring as *func*.
        It returns *func*'s result, or ``None`` after logging when the call
        times out or raises. Cancellation is logged and re-raised so that
        cancelling the surrounding task still works.
    """
    import functools

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
        except asyncio.TimeoutError:
            # str(TimeoutError()) is empty, so spell the reason out.
            logging.error(f"Error during execution of {func.__name__}: timed out after {timeout} seconds")
            return None
        except asyncio.CancelledError:
            logging.warning("Task cancelled.")
            raise
        except Exception as e:
            logging.error(f"Error during execution of {func.__name__}: {e}")
            return None

    return wrapper


# === Asynchronous Scientific Explanation Generation ===
# Builds the prompt and invokes the LLM model.
async def generate_explanation_async(problem, level, concept, topic):
    """Generate the explanation using the LLM asynchronously.

    The prompt is built from ``prompt_template`` and ``llm.invoke`` is run in
    a worker thread via ``asyncio.to_thread``; both names are defined outside
    this section.

    Returns:
        Whatever ``llm.invoke`` returns, or the string
        ``"Error generating the response."`` if the call raises.
    """
    prompt = prompt_template.format(
        problem=problem,
        concept=concept,
        topic=topic,
        level=level,
    )
    try:
        return await asyncio.to_thread(llm.invoke, prompt.strip())
    except Exception as e:
        logging.error(f"LLM API error: {e}")
        return "Error generating the response."


# === Conditional Interactive Chart Generation ===
# Generates a chart based on the analyzed problem if requested.
def generate_conditional_chart(problem, chart_choice):
    """Generate an interactive chart if requested.

    Args:
        problem: Passed unchanged to ``generate_interactive_chart``.
        chart_choice: User answer; ``"yes"`` or ``"y"`` (any case, surrounding
            whitespace ignored) requests a chart. Any other value, including
            non-strings such as ``None``, means no chart.

    Returns:
        The figure returned by ``generate_interactive_chart``, or ``None``
        when no chart was requested or generation failed (the failure is
        logged).
    """
    fig = None
    wants_chart = (
        isinstance(chart_choice, str)
        and chart_choice.strip().lower() in ("yes", "y")
    )
    if wants_chart:
        try:
            fig = generate_interactive_chart(problem)
            if fig is None:
                raise ValueError("Chart not generated correctly.")
            print("Chart generated successfully!")
        except Exception as e:
            logging.error(f"Chart error: {e}")
    return fig


# === Structured Output: Text + Chart ===
# Combines the generated explanation with the graphical visualization.
async def generate_complete_result(problem, level, concept, topic, chart_choice):
    """Combine explanation and chart into a structured output.

    Returns:
        A dict with ``"response"`` (result of ``generate_explanation_async``)
        and ``"chart"`` (result of ``generate_conditional_chart``).
    """
    response = await generate_explanation_async(problem, level, concept, topic)
    chart = generate_conditional_chart(problem, chart_choice)
    return {
        "response": response,
        "chart": chart,
    }


def _clean_field(value):
    """Return *value* as stripped text, or ``""`` when it is falsy."""
    return str(value).strip() if value else ""


# === Scientific Article Validation ===
# Checks that each article has a title, abstract, and URL.
def validate_articles(raw_articles, max_articles=5):
    """Validate and filter articles received from an AI or API source.

    An article is kept only when ``title``, ``abstract`` and ``url`` are all
    non-empty after stripping whitespace. Whitespace-only values used to pass
    the check and were then stored as empty strings.

    Args:
        raw_articles: Expected to be a list of dicts; any other type yields
            ``[]`` with a warning.
        max_articles: Maximum number of articles to return.

    Returns:
        A list of dicts holding exactly the stripped ``title``, ``abstract``
        and ``url`` strings, truncated to ``max_articles`` entries.
    """
    if not isinstance(raw_articles, list):
        logging.warning(f"[validate_articles] Invalid input: expected list, received {type(raw_articles)}")
        return []

    valid_articles = []
    for i, art in enumerate(raw_articles):
        if not isinstance(art, dict):
            logging.warning(f"[validate_articles] Invalid element at position {i}: {type(art)}")
            continue

        cleaned = {
            key: _clean_field(art.get(key))
            for key in ("title", "abstract", "url")
        }
        if all(cleaned.values()):
            valid_articles.append(cleaned)
        else:
            logging.info(f"[validate_articles] Article discarded due to incomplete data (i={i}).")

    if not valid_articles:
        logging.warning("[validate_articles] No valid articles after filtering.")

    return valid_articles[:max_articles]