darkfire514's picture
Upload 160 files
399b80c verified
import os
from pathlib import Path
from typing import Dict, Any, Optional
from openspace.grounding.core.session import BaseSession
from openspace.grounding.core.types import BackendType, SessionConfig
from openspace.grounding.core.tool import BaseTool
from openspace.grounding.core.transport.connectors import BaseConnector
from openspace.llm import LLMClient
from openspace.utils.logging import Logger
from dotenv import load_dotenv
# Load .env from openspace package root (4 levels up), then CWD fallback.
_PKG_ENV = Path(__file__).resolve().parent.parent.parent.parent / ".env" # openspace/.env
if _PKG_ENV.is_file():
load_dotenv(_PKG_ENV)
load_dotenv()
logger = Logger.get_logger(__name__)
try:
from openai import AsyncOpenAI
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
class WebConnector(BaseConnector):
def __init__(self, api_key: str, base_url: str):
self.api_key = api_key
self.base_url = base_url
self.client: Optional[AsyncOpenAI] = None
self._connected = False
async def connect(self) -> None:
if self._connected:
return
if not OPENAI_AVAILABLE:
raise RuntimeError(
"OpenAI library not available. Install with: pip install openai"
)
if not self.api_key:
raise RuntimeError(
"API key not provided. Set OPENROUTER_API_KEY environment variable "
"or provide deep_research_api_key in config."
)
self.client = AsyncOpenAI(
base_url=self.base_url,
api_key=self.api_key
)
self._connected = True
logger.info(f"Web connector connected to {self.base_url}")
async def disconnect(self) -> None:
if not self._connected:
return
self.client = None
self._connected = False
logger.info("Web connector disconnected")
@property
def is_connected(self) -> bool:
return self._connected
async def invoke(self, name: str, params: dict) -> Any:
if name == "chat_completion":
if not self.client:
raise RuntimeError("Client not connected")
return await self.client.chat.completions.create(**params)
raise NotImplementedError(f"Unknown method: {name}")
async def request(self, *args: Any, **kwargs: Any) -> Any:
raise NotImplementedError("Web backend uses invoke() instead of request()")
class WebSession(BaseSession):
backend_type = BackendType.WEB
def __init__(
self,
*,
session_id: str,
config: SessionConfig,
deep_research_api_key: Optional[str] = None,
deep_research_base_url: str = "https://openrouter.ai/api/v1",
auto_connect: bool = True,
auto_initialize: bool = True
):
api_key = deep_research_api_key or os.getenv("OPENROUTER_API_KEY")
connector = WebConnector(
api_key=api_key or "", # Empty string will raise an error when connect
base_url=deep_research_base_url
)
super().__init__(
connector=connector,
session_id=session_id,
backend_type=BackendType.WEB,
auto_connect=auto_connect,
auto_initialize=auto_initialize
)
self.config = config
@property
def web_connector(self) -> WebConnector:
return self.connector
async def initialize(self) -> Dict[str, Any]:
"""Connect to WebConnector and register tools.
BaseSession in __aenter__ will call connect() according to auto_connect,
but in provider.create_session directly instantiating Session will not trigger this logic.
Therefore, we need to explicitly ensure that the connection is established, avoiding AttributeError
when DeepResearchTool is called and `self.web_connector.client` is still None.
"""
# If the connection is not established, connect explicitly
if not self.is_connected:
try:
await self.connect()
except Exception as e:
logger.error(f"Failed to connect WebSession {self.session_id}: {e}")
raise
if self.tools:
logger.debug(f"Web session {self.session_id} already initialized, skipping")
return {
"tools": [t.name for t in self.tools],
"backend": BackendType.WEB.value
}
self.tools = [DeepResearchTool(session=self)]
logger.info(f"Initialized Web session {self.session_id} with AI Deep Research tool")
return {
"tools": [t.name for t in self.tools],
"backend": BackendType.WEB.value
}
class DeepResearchTool(BaseTool):
backend_type = BackendType.WEB
_name = "deep_research_agent"
_description = """Knowledge Research Tool - Primary tool for acquiring external knowledge
PURPOSE:
Acquires comprehensive knowledge from the web through deep research and analysis.
Powered by Perplexity AI's sonar-deep-research model, then post-processed to extract
actionable insights and concise summaries. The main tool for gathering information
beyond existing knowledge base.
WHEN TO USE:
- Information needed on professional/technical topics
- Research on technical problems, concepts, or implementations
- Understanding of latest developments, trends, or news
- Comparison of different approaches, tools, or solutions
- Factual information, definitions, or explanations required
- Synthesis from multiple authoritative sources needed
HOW IT WORKS:
1. Conducts deep web search using Perplexity's sonar-deep-research
2. Analyzes and synthesizes information from multiple sources
3. Post-processes to distill knowledge-dense summary retaining critical details
4. Returns comprehensive summary ready for immediate use
RETURNS:
Knowledge-dense comprehensive summary (400-600 words) that:
- Retains important details and technical specifics
- Focuses on substantive knowledge without losing critical information
- Organized and structured for clarity
- Directly usable by agents for decision-making and task execution
NOT DESIGNED FOR:
- Tasks requiring browser interaction or UI manipulation
- Direct file downloads or web scraping operations
- Real-time system operations or executions
USAGE GUIDELINES:
- Frame clear, specific questions (e.g., "Explain the architecture of Transformer models")
- Specify context when needed (e.g., "Compare PostgreSQL vs MySQL for high-concurrency scenarios")
- Suitable for any knowledge or information acquisition needs
"""
def __init__(
self,
session: WebSession
):
super().__init__()
self._session = session
self._llm = LLMClient()
async def _arun(self, query: str) -> str:
if not query:
return "ERROR: Missing required parameter: query"
try:
# Step 1: Deep research
logger.info(f"Start deep research: {query}")
completion = await self._session.web_connector.client.chat.completions.create(
model="perplexity/sonar-deep-research",
messages=[{"role": "user", "content": query}]
)
full_answer = completion.choices[0].message.content
logger.info(f"Research completed, length: {len(full_answer)} characters")
# Step 2: Use LLMClient to generate summary and distill key points
logger.info(f"Begin to distill key points...")
SUMMARY_AGENT_PROMPT = f"""Please distill the following deep research results into a knowledge-dense summary. Requirements:
Provide a comprehensive yet concise summary (400-600 words):
- Focus on SUBSTANTIVE knowledge and key information
- Retain important details, technical specifics, and concrete facts
- Do NOT sacrifice critical information for brevity
- Organize information clearly and logically with proper structure
- Remove only redundancy and verbose explanations
- Include actionable insights and decision-relevant information
- Make it directly usable for task execution and decision-making
Output ONLY the summary text, no additional formatting or JSON structure needed.
Deep Research Results:
{full_answer}
"""
summary_response = await self._llm.complete(SUMMARY_AGENT_PROMPT)
summary = summary_response["message"]["content"].strip()
logger.info(f"Summary generation completed")
return summary
except Exception as e:
logger.error(f"Deep research failed: {e}")
return f"ERROR: AI research failed: {e}"