bddinh3's picture
Upload 10 files
dda5804 verified
"""
RAG chains and backend health checks for the AI Litigation Tracker.
This module provides:
- case_specific_qa:
RAG over a single case (looked up by docket number or case name).
- global_qa:
RAG over the full case corpus via the Pinecone index.
- ping_backends:
Lightweight health check for OpenAI and Pinecone connectivity.
It assumes:
- A Pinecone index is configured in vectorstore.cases_vectorstore.
- Case-level text blobs are stored under data/case_blobs/.
- OPENAI_API_KEY and PINECONE_API_KEY are set in the environment.
"""
import os
from pathlib import Path
from typing import Optional, List, Dict, Any
from dotenv import load_dotenv
from openai import OpenAI
from pinecone import Pinecone
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from vectorstore.cases_vectorstore import (
query_global,
get_case_by_filter,
PINECONE_INDEX,
)
# Load environment variables from .env if present
load_dotenv()
# Single OpenAI client reused across calls
_oai = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
# System prompt shared across case-level and global Q&A
SYS = (
"You are a careful legal research assistant helping summarize and explain "
"U.S. court cases about AI and technology.\n\n"
"Rules:\n"
"1. Use only the provided case context (metadata and excerpts).\n"
"2. If the answer is not clearly supported by that context, say you cannot find it.\n"
"3. Always include citations in the format (Case Name, Docket).\n"
"4. Be concise, neutral, and non-speculative. Do not give legal advice; "
"only describe what the documents say.\n"
"5. Never guess or infer facts beyond what is stated in the context; if something "
"is not stated, treat it as unknown.\n"
)
# ============================================================
# Low-level OpenAI wrapper (LangChain-compatible)
# ============================================================
def _invoke_openai(payload: Dict[str, str]) -> str:
"""
Thin wrapper around the OpenAI Chat Completions API.
Expects:
payload["system"]: system message
payload["user"]: user message
Returns:
The assistant's message content as a plain string.
"""
resp = _oai.chat.completions.create(
model=os.getenv("OPENAI_LLM_MODEL", "gpt-4o-mini"),
messages=[
{"role": "system", "content": payload["system"]},
{"role": "user", "content": payload["user"]},
],
temperature=0.1,
)
return resp.choices[0].message.content
# Turn the low-level call into a LangChain Runnable
_LC_CALL = RunnableLambda(_invoke_openai) | StrOutputParser()
# ============================================================
# Helpers: formatting and context loading
# ============================================================
def _format_case_blob(md: Dict[str, Any]) -> str:
"""
Format a compact metadata header for a case.
Included keys (when present and non-empty):
- case_name, docket_number
- court_id, jurisdiction
- filing_date, latest_update
- n_docs, courtlistener_url
"""
keys = [
"case_name",
"docket_number",
"court_id",
"jurisdiction",
"filing_date",
"latest_update",
"n_docs",
"courtlistener_url",
]
lines: List[str] = []
for k in keys:
v = md.get(k)
if v not in (None, ""):
lines.append(f"{k}: {v}")
return "\n".join(lines)
# Case blobs are written by data_updating_scripts.build_cases_vectorstore
BLOB_DIR = Path("data/case_blobs")
def _blob_path(md: Dict[str, Any]) -> Path:
"""
Compute the expected text blob path for a given case.
Uses court_id and docket_number (with ':' replaced by '-') to match
the naming convention from build_cases_vectorstore.py.
"""
court = md.get("court_id", "")
docket = (md.get("docket_number", "") or "").replace(":", "-")
return BLOB_DIR / f"{court}_{docket}.txt"
def _load_context(md: Dict[str, Any], max_chars: int = 6000) -> str:
"""
Load a truncated text snapshot for a case from its blob file.
Args:
md:
Case metadata dict. Should include court_id and docket_number
so the blob path can be resolved.
max_chars:
Maximum number of characters from the blob to include in the prompt.
Returns:
A plain-text string (possibly empty). If the blob file does not exist,
returns an empty string.
"""
p = _blob_path(md)
if not p.exists():
# If blobs have not been generated yet, RAG still degrades gracefully.
return ""
txt = p.read_text(encoding="utf-8", errors="ignore")
return txt[:max_chars]
# ============================================================
# Public chains
# ============================================================
def case_specific_qa(
question: str,
*,
docket_number: Optional[str] = None,
case_name: Optional[str] = None,
) -> str:
"""
Answer a question using a single selected case.
Resolution strategy:
1. Prefer an exact match on docket_number (most precise).
2. If docket_number is not provided, fall back to case_name.
The chain:
- Looks up the case in the vectorstore metadata.
- Loads a truncated excerpt from the corresponding case blob (if present).
- Sends metadata plus excerpt as context to the LLM.
- Instructs the model to stay grounded in that content and to admit
when the answer is not supported.
Args:
question:
Natural language question about the selected case.
docket_number:
Optional exact docket (for example, "8:23-cv-02367").
case_name:
Optional case name filter, used if docket_number is not provided.
Returns:
A concise answer string, or an informative message if the case or
context cannot be found.
"""
md = get_case_by_filter(docket_number=docket_number, case_name=case_name)
if not md:
return "I couldn’t find that case in the index."
ctx_txt = _load_context(md)
header = _format_case_blob(md)
if not ctx_txt:
# If no text snapshot is available, fall back to metadata-only context.
prompt = f"""QUESTION:
{question}
CASE METADATA:
{header}
INSTRUCTIONS:
- Answer only if the information is clearly supported by the metadata above.
- If the question cannot be answered from this metadata, say you cannot find it
in the provided case materials.
- Be concise (3–6 sentences).
- When you rely on something from the metadata, cite it as
({md.get('case_name','?')}, {md.get('docket_number','?')}).
ANSWER:"""
return _LC_CALL.invoke({"system": SYS, "user": prompt})
prompt = f"""QUESTION:
{question}
CASE METADATA:
{header}
CASE EXCERPTS (truncated snapshot):
\"\"\"{ctx_txt}\"\"\"
INSTRUCTIONS:
- Use only the metadata and excerpts above as your sources.
- If the question cannot be answered from this material, say you cannot find it
in the provided case materials.
- Be concise (3–6 sentences unless a short bullet list is clearly better).
- When you rely on something from the text, cite it as
({md.get('case_name','?')}, {md.get('docket_number','?')}).
ANSWER:"""
return _LC_CALL.invoke({"system": SYS, "user": prompt})
def global_qa(question: str, top_k: int = 4) -> str:
"""
Answer a question across all cases using RAG.
The chain:
- Uses the global vectorstore to retrieve the top-k most relevant cases.
- For each hit, loads a text excerpt plus a metadata header.
- Concatenates these into a single context block for the LLM.
- Instructs the model to answer only from this context and to admit
when an answer is not supported.
Args:
question:
Natural language question about the overall litigation corpus.
top_k:
Number of cases to retrieve from the vectorstore.
Returns:
A concise answer string, or a message indicating that no relevant
cases were found.
"""
hits = query_global(question, top_k=top_k)
if not hits:
return "No relevant cases found."
contexts: List[str] = []
for h in hits:
header = _format_case_blob(h)
ctx_txt = _load_context(h, max_chars=2500)
if ctx_txt:
contexts.append(f"{header}\n---\n{ctx_txt}")
else:
contexts.append(f"{header}\n(no text snapshot available)")
joined = "\n\n====\n\n".join(contexts)
prompt = f"""QUESTION:
{question}
YOU ARE GIVEN {len(hits)} CANDIDATE CASES.
Each case has a metadata header and (when available) an excerpt.
CASES CONTEXT:
{joined}
INSTRUCTIONS:
- Use only the cases and excerpts above as your sources.
- If the question cannot be answered from these materials, say you cannot find it
in the provided cases.
- Be concise (4–8 sentences total, plus citations).
- When making a claim, indicate which case supports it using the format
(Case Name, Docket). If multiple cases support the same point, you may cite
more than one.
- If the answer depends on differences between cases, briefly compare them.
ANSWER:"""
return _LC_CALL.invoke({"system": SYS, "user": prompt})
# ============================================================
# Sidebar health check for Streamlit
# ============================================================
def ping_backends() -> Dict[str, Optional[Any]]:
"""
Lightweight health check used by app.py in the sidebar.
Returns:
A dict with:
"openai": bool indicating whether a tiny embedding call succeeded.
"pinecone": bool indicating whether the configured index name
is present in the account.
"index_name": Name of the Pinecone index if found, else None.
Notes:
- This function is intentionally forgiving: exceptions are caught
and encoded as False rather than raised.
- It is meant only for user feedback, not strict monitoring.
"""
out: Dict[str, Optional[Any]] = {
"openai": False,
"pinecone": False,
"index_name": None,
}
# Tiny embed call to confirm that OpenAI credentials are valid
try:
_oai.embeddings.create(
model=os.getenv("OPENAI_EMBED_MODEL", "text-embedding-3-small"),
input="ping",
)
out["openai"] = True
except Exception:
# Keep False; the sidebar will show a warning instead of crashing.
pass
# Check whether the configured Pinecone index is visible
try:
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
names = [i["name"] for i in pc.list_indexes()]
out["pinecone"] = PINECONE_INDEX in names
if out["pinecone"]:
out["index_name"] = PINECONE_INDEX
except Exception:
# Keep False; missing or invalid Pinecone config is reported gently.
pass
return out