abhinav-047's picture
Add toolsforgitnotionslack folder as a folder for my code which can be run nativelly and as a fastapi endpoint
4499d27
"""
FastAPI endpoint for the Enterprise Knowledge Assistant.
pip install fastapi uvicorn
Run:
uvicorn app:app --reload --port 8000
Endpoints:
POST /chat β€” single-turn or multi-turn conversation
DELETE /chat β€” clear session history and cache
GET /health β€” service status
"""
from dotenv import load_dotenv
load_dotenv()
import asyncio, json, os, time, uuid
from contextlib import asynccontextmanager
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from openai import AsyncOpenAI
from tools.github_tools import GITHUB_TOOLS, GITHUB_TOOL_FNS
from tools.slack_tools import SLACK_TOOLS, SLACK_TOOL_FNS
from tools.notion_tools import NOTION_TOOLS, NOTION_TOOL_FNS
from agent.cache import Cache
from agent.planner import build_system_prompt
# ── Config ─────────────────────────────────────────────────────────────────────
MODEL = "gpt-4o-mini"
GITHUB_REPO = os.environ.get("GITHUB_REPO", "")
client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
ALL_TOOLS = GITHUB_TOOLS + SLACK_TOOLS + NOTION_TOOLS
ALL_FNS = {**GITHUB_TOOL_FNS, **SLACK_TOOL_FNS, **NOTION_TOOL_FNS}
tool_cache = Cache(ttl_seconds=300)
# session_id β†’ list of message dicts
sessions: dict[str, list[dict]] = {}
# ── Lifespan ───────────────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
missing = [v for v in ["OPENAI_API_KEY"] if not os.environ.get(v)]
if missing:
print(f"⚠️ Missing env vars: {', '.join(missing)}")
print(f" GitHub {'βœ“' if os.environ.get('GITHUB_TOKEN') else 'βœ—'} "
f"Slack {'βœ“' if os.environ.get('SLACK_BOT_TOKEN') else 'βœ—'} "
f"Notion {'βœ“' if os.environ.get('NOTION_API_TOKEN') else 'βœ—'}")
yield
# ── App ────────────────────────────────────────────────────────────────────────
app = FastAPI(
title="Enterprise Knowledge Assistant",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# ── Schemas ────────────────────────────────────────────────────────────────────
class ChatRequest(BaseModel):
message: str
session_id: Optional[str] = None # omit to start a new session
class ToolCall(BaseModel):
name: str
args: dict
result: str
cached: bool
class ChatResponse(BaseModel):
answer: str
session_id: str
elapsed_seconds: float
tool_calls: list[ToolCall]
class ClearResponse(BaseModel):
cleared: bool
session_id: str
class HealthResponse(BaseModel):
status: str
model: str
github: bool
slack: bool
notion: bool
active_sessions: int
# ── Agent ──────────────────────────────────────────────────────────────────────
async def dispatch(name: str, args: dict) -> tuple[str, bool]:
"""Returns (result, was_cached)."""
key = f"{name}:{json.dumps(args, sort_keys=True)}"
cached = tool_cache.get(key)
if cached:
return cached, True
fn = ALL_FNS.get(name)
result = await fn(**args) if fn else f"Unknown tool: {name}"
tool_cache.set(key, result)
return result, False
async def run_agent(history: list[dict]) -> tuple[str, list[ToolCall]]:
system = build_system_prompt(GITHUB_REPO)
messages = [{"role": "system", "content": system}] + history
tool_log: list[ToolCall] = []
for _ in range(14):
response = await client.chat.completions.create(
model=MODEL,
messages=messages,
tools=ALL_TOOLS,
tool_choice="auto",
)
msg = response.choices[0].message
tool_calls = msg.tool_calls or []
if not tool_calls:
return msg.content or "", tool_log
messages.append(msg)
names_and_args = [
(tc.function.name, json.loads(tc.function.arguments))
for tc in tool_calls
]
results = await asyncio.gather(*[
dispatch(name, args) for name, args in names_and_args
])
for tc, (name, args), (result, was_cached) in zip(tool_calls, names_and_args, results):
tool_log.append(ToolCall(name=name, args=args,
result=result[:500], cached=was_cached))
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": result,
})
return "⚠️ Reached reasoning limit. Try a more specific question.", tool_log
# ── Routes ─────────────────────────────────────────────────────────────────────
@app.post("/chat", response_model=ChatResponse)
async def chat(req: ChatRequest):
session_id = req.session_id or str(uuid.uuid4())
history = sessions.setdefault(session_id, [])
history.append({"role": "user", "content": req.message})
t0 = time.monotonic()
try:
answer, tool_log = await run_agent(history)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
history.append({"role": "assistant", "content": answer})
# cap session size
if len(history) > 60:
sessions[session_id] = history[-60:]
return ChatResponse(
answer=answer,
session_id=session_id,
elapsed_seconds=round(time.monotonic() - t0, 2),
tool_calls=tool_log,
)
@app.delete("/chat", response_model=ClearResponse)
async def clear_session(session_id: str):
existed = session_id in sessions
sessions.pop(session_id, None)
tool_cache.clear()
return ClearResponse(cleared=existed, session_id=session_id)
@app.get("/health", response_model=HealthResponse)
async def health():
return HealthResponse(
status="ok",
model=MODEL,
github=bool(os.environ.get("GITHUB_TOKEN")),
slack=bool(os.environ.get("SLACK_BOT_TOKEN")),
notion=bool(os.environ.get("NOTION_API_TOKEN")),
active_sessions=len(sessions),
)