import datetime
import html
import json
import os
import re
import sqlite3

import numpy as np
import openai
import pandas as pd
import streamlit as st
from langchain.chains import RetrievalQA
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document
from langchain_core.retrievers import BaseRetriever
from langchain_openai import ChatOpenAI
from pydantic import Field
| |
|
DB_PATH = "json_vector.db"
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
EMBEDDING_MODEL = "text-embedding-ada-002"


# Streamlit re-runs this whole script on every interaction; session_state is
# what survives between runs, so only seed keys that are not present yet.
if "ingested_batches" not in st.session_state:
    st.session_state.ingested_batches = 0
if "messages" not in st.session_state:
    st.session_state.messages = []
if "json_links" not in st.session_state:
    st.session_state.json_links = []
if "json_link_details" not in st.session_state:
    st.session_state.json_link_details = {}


st.set_page_config(page_title="Chat with Your JSON Vectors (Hybrid, Clean)", layout="wide")
st.title("Chat with Your Vectorized JSON Files")

# The embedding calls and the chat model further down are all configured with
# this key. Without it they fail with a traceback (at the first OpenAI call or
# when the chat model is built), so show a readable message and halt this run.
if not OPENAI_API_KEY:
    st.error("OPENAI_API_KEY is not set. Export it in the environment and restart the app.")
    st.stop()


uploaded_files = st.file_uploader(
    "Upload JSON files in batches (any structure)", type="json", accept_multiple_files=True
)
| |
|
def flatten_json_obj(obj, parent_key="", sep="."):
    """Flatten nested dicts/lists into a single-level ``{path: value}`` dict.

    Paths join dict keys and list indices with ``sep``
    (``{"a": [{"b": 1}]}`` -> ``{"a.0.b": 1}``). A non-container value is
    stored under ``parent_key`` itself, so a bare scalar at the top level is
    keyed by ``""``. Empty dicts and lists contribute no entries.

    Email enrichment: for a dict key named customer/user/email/username
    (case-insensitive) whose value is a string containing ``"@"``, the text
    before the first ``"@"`` is split on every non-alphanumeric ASCII
    character and, if any words remain, two derived entries are inserted just
    before the field itself:

    * ``<path>_name``: the first word, lower-cased;
    * ``<path>_all_names``: all words joined by single spaces, lower-cased.

    This lets a question about "Johnny" match ``johnny.appleseed@gmail.com``.
    """
    # Leaf value: nothing left to descend into.
    if not isinstance(obj, (dict, list)):
        return {parent_key: obj}

    flat = {}
    if isinstance(obj, list):
        for index, element in enumerate(obj):
            path = f"{parent_key}{sep}{index}" if parent_key else str(index)
            flat.update(flatten_json_obj(element, path, sep=sep))
        return flat

    email_keys = {"customer", "user", "email", "username"}
    for key, value in obj.items():
        path = f"{parent_key}{sep}{key}" if parent_key else key
        if key.lower() in email_keys and isinstance(value, str) and "@" in value:
            local_part = value.partition("@")[0]
            words = re.sub(r'[^a-zA-Z0-9]', ' ', local_part).split()
            if words:
                flat[path + "_name"] = words[0].lower()
                flat[path + "_all_names"] = " ".join(words).lower()
        flat.update(flatten_json_obj(value, path, sep=sep))
    return flat
| |
|
# Shared OpenAI client, created lazily on first use and reused by every
# embedding call after that. Streamlit re-executes this script on each rerun,
# so in practice this is one client per run (ingesting a batch makes one call
# per record) instead of one client per call.
_openai_client = None


def _get_openai_client():
    """Return the shared OpenAI client, creating it on first use."""
    global _openai_client
    if _openai_client is None:
        _openai_client = openai.OpenAI(api_key=OPENAI_API_KEY)
    return _openai_client


def get_embedding(text):
    """Embed ``text`` with ``EMBEDDING_MODEL`` and return the vector as the API gives it.

    Raises:
        ValueError: if ``text`` is empty. The embeddings endpoint rejects empty
            input, so fail fast without a network round-trip.
    """
    if not text:
        raise ValueError("cannot embed empty text")
    response = _get_openai_client().embeddings.create(input=[text], model=EMBEDDING_MODEL)
    return response.data[0].embedding
| |
|
def ensure_table():
    """Create the ``json_records`` table in ``DB_PATH`` if it does not exist yet.

    Columns: autoincrement ``id``, ``batch_time``, ``source_file``,
    ``raw_json``, ``flat_text`` (all TEXT) and ``embedding`` (BLOB), matching
    what ``ingest_json_files`` inserts and the query helpers select.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("""
        CREATE TABLE IF NOT EXISTS json_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            batch_time TEXT,
            source_file TEXT,
            raw_json TEXT,
            flat_text TEXT,
            embedding BLOB
        )
        """)
        conn.commit()
    finally:
        # Close even if CREATE TABLE fails (e.g. unwritable DB file).
        conn.close()
| |
|
def ingest_json_files(files):
    """Parse, flatten, embed and store every record from the uploaded files.

    Each file holds one JSON value. A top-level list is treated as a list of
    records; anything else is a single record. Each record is stored with its
    source file name, its JSON text, its flattened ``"key: value; ..."`` text
    (from ``flatten_json_obj``) and the float32 bytes of that text's
    embedding. All rows of one call share the same UTC ``batch_time``.

    Files that are not valid JSON are reported and skipped. Records that
    flatten to nothing (e.g. ``{}``) are skipped because there is no text to
    embed. All embeddings are computed before anything is written, so a failed
    embedding call leaves the database unchanged. On success the session's
    ``ingested_batches`` counter is incremented.
    """
    ensure_table()
    # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated since 3.12.
    batch_time = datetime.datetime.now(datetime.timezone.utc).isoformat()
    rows = []
    skipped_empty = 0
    for file in files:
        file.seek(0)
        try:
            raw = json.load(file)
        except ValueError as err:  # JSONDecodeError and UnicodeDecodeError
            st.error(f"Skipping {file.name}: not valid JSON ({err}).")
            continue
        records = raw if isinstance(raw, list) else [raw]
        for rec in records:
            flat_text = "; ".join(f"{k}: {v}" for k, v in flatten_json_obj(rec).items())
            if not flat_text:
                skipped_empty += 1
                continue
            rows.append((batch_time, file.name, json.dumps(rec), flat_text))

    if skipped_empty:
        st.warning(f"Skipped {skipped_empty} empty record(s) with no fields to index.")
    if not rows:
        st.warning("No records found in uploaded files!")
        return

    st.write(f"Flattened {len(rows)} records. Generating embeddings (this may take time, please wait)...")
    embeddings = [np.asarray(get_embedding(row[3]), dtype=np.float32).tobytes() for row in rows]

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.executemany(
            """
            INSERT INTO json_records (batch_time, source_file, raw_json, flat_text, embedding)
            VALUES (?, ?, ?, ?, ?)
            """,
            [row + (emb,) for row, emb in zip(rows, embeddings)],
        )
        conn.commit()
    finally:
        conn.close()
    st.success(f"Ingested and indexed {len(rows)} new records!")
    st.session_state.ingested_batches += 1
| |
|
# The ingest button is only rendered once at least one file has been uploaded
# (short-circuit), and ingestion runs on the rerun triggered by its click.
wants_ingest = bool(uploaded_files) and st.button("Ingest batch to database")
if wants_ingest:
    ingest_json_files(uploaded_files)
| |
|
def query_vector_db(user_query, top_k=5):
    """Rank stored records by cosine similarity to the embedding of ``user_query``.

    Every row of ``json_records`` is scored in Python. Rows are skipped if
    their stored vector is missing, all-zero, or of a different length than
    the query's.

    Returns up to ``top_k`` ``Document`` objects, most similar first (ties:
    higher ``id`` first). ``page_content`` is the record's ``flat_text``, and
    the metadata holds ``id``, ``batch_time``, ``source_file``, ``raw_json``
    and ``similarity`` formatted as ``"<sim> (embedding)"``. When the table
    is empty, returns ``[]`` without calling the embedding API.
    """
    # A question asked before any ingest must not fail on a missing table.
    ensure_table()
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            "SELECT id, batch_time, source_file, raw_json, flat_text, embedding FROM json_records"
        ).fetchall()
    finally:
        conn.close()
    if not rows:
        return []

    query_vec = np.asarray(get_embedding(user_query), dtype=np.float64)
    query_norm = np.linalg.norm(query_vec)  # loop-invariant: compute once
    if query_norm == 0:
        return []

    scored = []
    for row in rows:
        if row[5] is None:
            continue
        db_emb = np.frombuffer(row[5], dtype=np.float32)
        if len(db_emb) != len(query_vec):
            continue
        db_norm = np.linalg.norm(db_emb)
        if db_norm == 0:
            continue  # cosine similarity is undefined (NaN) for a zero vector
        sim = float(np.dot(query_vec, db_emb) / (query_norm * db_norm))
        scored.append((sim, row))
    # Sort by similarity, then id (ids are unique), both descending.
    scored.sort(key=lambda pair: (pair[0], pair[1][0]), reverse=True)

    return [
        Document(
            page_content=row[4],
            metadata={
                "id": row[0],
                "batch_time": str(row[1]),
                "source_file": row[2],
                "similarity": f"{sim:.4f} (embedding)",
                "raw_json": row[3],
            },
        )
        for sim, row in scored[:top_k]
    ]
| |
|
def python_fuzzy_match(user_query, top_k=5):
    """Keyword search over the stored records' flattened text.

    The query is lower-cased, ``@`` and ``.`` are turned into spaces, and the
    result is split into a set of distinct terms. A record scores one point per
    term that occurs as a substring of its lower-cased ``flat_text``; records
    scoring 0 are dropped.

    Returns up to ``top_k`` ``Document`` objects, highest score first (ties:
    higher ``id`` first). They have the same metadata layout as the embedding
    search, with ``similarity`` set to ``"<score> (fuzzy)"``.
    """
    query_terms = set(user_query.lower().replace("@", " ").replace(".", " ").split())
    if not query_terms:
        return []

    # A question asked before any ingest must not fail on a missing table.
    ensure_table()
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            "SELECT id, batch_time, source_file, raw_json, flat_text FROM json_records"
        ).fetchall()
    finally:
        conn.close()

    scored = []
    for row in rows:
        flat_text = (row[4] or "").lower()
        # Count how many distinct terms appear, so records matching more of
        # the question rank higher.
        score = sum(term in flat_text for term in query_terms)
        if score > 0:
            scored.append((score, row))
    # Sort by score, then id (ids are unique), both descending.
    scored.sort(key=lambda pair: (pair[0], pair[1][0]), reverse=True)

    return [
        Document(
            page_content=row[4],
            metadata={
                "id": row[0],
                "batch_time": str(row[1]),
                "source_file": row[2],
                "similarity": f"{score} (fuzzy)",
                "raw_json": row[3],
            },
        )
        for score, row in scored[:top_k]
    ]
| |
|
def extract_main_entity(question):
    """Guess the entity a question is about, lower-cased ("" if none is found).

    Tried in order:

    1. The first quoted phrase (``'...'`` or ``"..."``), whitespace-stripped.
       A quote only counts when it is not attached to a word on its outer side
       and is closed by the same quote character. This way, apostrophes as in
       "What's Johnny's total?" are not mistaken for quotation marks.
    2. The local part (text before ``@``) of the first email address.
    3. The longest alphanumeric word not in a small stopword list (the earliest
       such word wins ties).
    """
    quoted = re.search(r"(?<!\w)(['\"])(.+?)\1(?!\w)", question)
    if quoted:
        phrase = quoted.group(2).strip().lower()
        if phrase:
            return phrase
    # TLD class is [A-Za-z]; a "|" inside a character class would be a literal pipe.
    email = re.search(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b", question)
    if email:
        return email.group(0).lower().split("@")[0]
    tokens = re.findall(r"\b([A-Za-z0-9]+)\b", question)
    stopwords = {"how", "much", "did", "spend", "was", "the", "is", "in", "on", "for", "a", "an", "of", "to", "with"}
    keywords = [t.lower() for t in tokens if t.lower() not in stopwords]
    if not keywords:
        return ""
    return max(keywords, key=len)
| |
|
def filter_records_by_entity(records, entity):
    """Keep the records whose ``page_content`` mentions ``entity``.

    Matching is a substring test with both sides lower-cased, so the entity's
    case does not matter. If ``entity`` is empty, or no record mentions it,
    ``records`` itself is returned unchanged so callers always keep a fallback.
    """
    if not entity:
        return records
    needle = entity.lower()
    # Any hit inside a ";"-separated field is also a hit in the whole text, so
    # a single check over the full page_content is sufficient.
    matches = [doc for doc in records if needle in doc.page_content.lower()]
    return matches if matches else records
| |
|
def hybrid_query(user_query, top_k=5):
    """Return a list holding at most one Document: the best match for ``user_query``.

    Runs the embedding search and then the keyword search (each asked for
    ``top_k`` results) and merges them with embedding hits first, keeping the
    first document seen per metadata ``id``. If the question names an entity,
    the candidates are narrowed to those mentioning it. The first surviving
    candidate is returned, falling back to the first merged candidate if
    narrowing left nothing.
    """
    candidates = query_vector_db(user_query, top_k=top_k) + python_fuzzy_match(user_query, top_k=top_k)

    # Insertion-ordered de-duplication: setdefault keeps the first doc per id.
    first_by_id = {}
    for candidate in candidates:
        first_by_id.setdefault(candidate.metadata.get("id"), candidate)
    merged = list(first_by_id.values())

    entity = extract_main_entity(user_query)
    narrowed = filter_records_by_entity(merged, entity) if entity else merged
    # Only a single record is handed on to the LLM.
    return (narrowed or merged)[:1]
| |
|
class HybridRetriever(BaseRetriever):
    """LangChain retriever that delegates to ``hybrid_query``.

    ``top_k`` is forwarded to ``hybrid_query``, which uses it to size the
    embedding and keyword searches it combines.
    """

    # Candidates requested from each underlying search.
    top_k: int = Field(default=5)

    def _get_relevant_documents(self, query, run_manager=None, **kwargs):
        # run_manager and any extra kwargs are accepted for LangChain's
        # calling convention but are not used.
        return hybrid_query(query, top_k=self.top_k)
| |
|
# Each fragment ends with a space so adjacent sentences do not run together
# after implicit string concatenation.
system_prompt = (
    "You are a JSON data assistant. "
    "If the question mentions a name or email (e.g. Johnny), match it to any field value (even as part of an email) "
    "and answer directly using the record's fields. "
    "For example, if 'customer: johnny.appleseed@gmail.com' and the question is about Johnny, you should use that record. "
    "If you can't find the answer, reply: 'I don’t have that information.' "
    "Never make up data. Never ask for clarification."
)
# The chain's default "stuff" combiner fills {context} with the retrieved
# documents and {question} with the user's query.
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    ("human", "Here are the most relevant records:\n{context}\n\nQuestion: {question}"),
])


llm = ChatOpenAI(model="gpt-4.1", openai_api_key=OPENAI_API_KEY, temperature=0)
# NOTE(review): top_k sizes each underlying search; hybrid_query currently
# hands back a single record, so the LLM sees one document per question.
retriever = HybridRetriever(top_k=5)
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    chain_type_kwargs={"prompt": prompt},
    return_source_documents=True,
)


st.markdown("### Ask any question about your data, just like ChatGPT.")
| |
|
def show_tiny_json_links():
    """Render one collapsible "[view JSON]" viewer per pending source link.

    Reads the link keys from ``st.session_state.json_links`` and each key's
    ``label``/``record`` from ``json_link_details`` (filled by the chat
    callback). Each record is pretty-printed inside an expander, then both
    state entries are cleared, so the links appear only on the render that
    follows an answer.
    """
    link_keys = st.session_state.json_links
    if not link_keys:
        return
    details = st.session_state.json_link_details
    for link_key in link_keys:
        entry = details.get(link_key)
        if entry is None:
            continue  # key without details: nothing to show
        # The header is the only part visible while collapsed, so it must
        # carry the link text; an empty header renders as a blank bar.
        with st.expander(f"[view JSON] {entry['label']}", expanded=False):
            st.code(json.dumps(entry["record"], indent=2), language="json")
    st.session_state.json_links = []
    st.session_state.json_link_details = {}
| |
|
# Render the conversation so far. Message text is HTML-escaped because it is
# interpolated into raw HTML (unsafe_allow_html=True). It comes from the user
# or the model, either of which could otherwise inject markup.
for msg in st.session_state.messages:
    content = html.escape(str(msg["content"]))
    if msg["role"] == "user":
        st.markdown(f"<div style='color: #4F8BF9;'><b>User:</b> {content}</div>", unsafe_allow_html=True)
    elif msg["role"] == "assistant":
        st.markdown(f"<div style='color: #1C6E4C;'><b>Agent:</b> {content}</div>", unsafe_allow_html=True)
show_tiny_json_links()
| |
|
def send_message():
    """``on_change`` callback for the message box: answer the typed question.

    Does the following, then clears the input box:

    * appends the user's message and the chain's answer to
      ``st.session_state.messages``;
    * stores one link per source document in ``json_links`` /
      ``json_link_details`` for the next render.

    Whitespace-only input is ignored. If the chain raises, the error is
    reported as the assistant's reply instead of aborting the callback, so the
    turn and the input reset are not lost.
    """
    user_input = st.session_state.temp_input.strip()
    if not user_input:
        return
    st.session_state.messages.append({"role": "user", "content": user_input})
    try:
        with st.spinner("Thinking..."):
            # invoke() is the Runnable entry point; calling the chain object
            # directly is deprecated in LangChain.
            result = qa_chain.invoke({"query": user_input})
    except Exception as err:  # UI boundary: surface the failure in the chat
        st.session_state.messages.append(
            {"role": "assistant", "content": f"Sorry, I couldn't answer that ({type(err).__name__}: {err})"}
        )
        st.session_state.json_links = []
        st.session_state.json_link_details = {}
        st.session_state.temp_input = ""
        return

    st.session_state.messages.append({"role": "assistant", "content": result["result"]})

    link_keys = []
    link_details = {}
    for idx, doc in enumerate(result["source_documents"]):
        # idx keeps keys unique even if the same record id appears twice.
        link_key = f"json_{doc.metadata['id']}_{idx}"
        link_details[link_key] = {
            "label": f"{doc.metadata['source_file']} | Similarity: {doc.metadata['similarity']}",
            "record": json.loads(doc.metadata["raw_json"]),
        }
        link_keys.append(link_key)
    st.session_state.json_links = link_keys
    st.session_state.json_link_details = link_details
    st.session_state.temp_input = ""
| |
|
st.text_input("Your message:", key="temp_input", on_change=send_message)


def _clear_chat():
    """Forget the conversation and any pending source-record links."""
    st.session_state.messages = []
    st.session_state.json_links = []
    st.session_state.json_link_details = {}


# A click callback runs before the script re-executes, so the transcript
# rendered earlier in the script is already empty on that rerun. Clearing
# inside `if st.button(...)` only happens after the old transcript was drawn.
st.button("Clear chat", on_click=_clear_chat)


st.info(f"Batches ingested so far (this session): {st.session_state.ingested_batches}")
| |
|