# CitizenClimate / app.py
# (Hugging Face Spaces page header preserved as a comment:
#  gyrmo's picture / "Indentation error" / 917b429 verified / raw / history / blame / 15 kB)
# -*- coding: utf-8 -*-
"""app.py
## **CitizenClimate: RAG for climate citizen assemblies**
---
---
This is going to be the master document for all the files, though the py files will be put into their own individual files to ensure they're running all individually. The goal is to have these all as functions that can be called on when need be and have a smooth pipeline.
"""
#load packages
#LangChain and Transformers says Hi
#Initial package
#import langchain
#import
#HuggingFace shenanigans
from langchain_huggingface import HuggingFacePipeline #Easy pipeline
from langchain_huggingface.embeddings import HuggingFaceEmbeddings #For embeddings
from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings #For even more robust embeddings
from transformers import AutoModelForCausalLM, AutoTokenizer,pipeline #Defined pipelines
from transformers import BitsAndBytesConfig #Quantise the shit out of things
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint #For chat templates
#Text loading
from llama_index.core import SimpleDirectoryReader #Loads documents
from llama_index.core import Document #deals with the objects created
from llama_index.embeddings.huggingface import HuggingFaceEmbedding #Another embedding tool
from llama_index.core.node_parser import SentenceSplitter #Splits the text into sentences
from llama_index.core.ingestion import IngestionPipeline #Here's the other pipeline
#Text storage and loading
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore #Stores the embeddings
from llama_index.core import VectorStoreIndex #Stores the index
from llama_index.core import StorageContext, load_index_from_storage, Settings #Might be important
from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters
from llama_index.core.memory import ChatSummaryMemoryBuffer, Memory #We need to limit the memory
from llama_index.core.indices.prompt_helper import PromptHelper #Let's see if this works
#Using the llama index shenanigans
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI #Not using this anymore, but keeping it in case something breaks if I remove it
from llama_index.llms.huggingface import HuggingFaceLLM #Same as the previous one
from llama_index.core import ChatPromptTemplate
from huggingface_hub import CommitScheduler # scheduler
#Adding in a reranker so that I can improve the outputs
from reranker_v1 import rerank_documents
#Adding packages to set up vLLM
from llama_index.llms.openai_like import OpenAILike
from llama_index.core import Settings
from llama_index.llms.vllm import Vllm, VllmServer
from vllm_server import start_vllm, wait_for_vllm #My own packages uwu
#Evaluation
from llama_index.core.evaluation import FaithfulnessEvaluator #I need to use you my guy
from llama_index.core.callbacks import CallbackManager, LlamaDebugHandler #Debugger
from llama_index.core.callbacks.schema import EventPayload #I am now using this to make sure that I can run the debugger orz
#More packages uwu
import gradio as gr #For I/O
import os
import json
import time
import subprocess
import threading
import requests
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4
#from datasets import load_dataset #Uploaded the dataset to the hub, I hope this works
#it didn't work, so I decided to just upload the files directly
#I forgot to set up the vLLM bit
#So that was why it wasn't working lol
# --- vLLM server bootstrap -------------------------------------------------
# Base URL of the local vLLM OpenAI-compatible endpoint; overridable via env.
VLLM_URL = os.getenv("VLLM_URL", "http://localhost:8000/v1")
# Start the vLLM server as a background process (helper from vllm_server.py).
vllm_process = start_vllm()
# --- Embedding model -------------------------------------------------------
# Best embedding model on the charts as of 28/10/2025.
# Loaded once at import time; device='cpu' — presumably to leave the GPU free
# for the vLLM generation server (TODO confirm).
embedding_model = HuggingFaceEmbedding(model_name = 'Qwen/Qwen3-Embedding-8B', device= 'cpu') #Set the model
# --- Vector store (ChromaDB) ----------------------------------------------
# Open the on-disk Chroma store and grab (or lazily create) the collection.
climate_repo_path = ('./Working_Climate_Database_Sentence')
clim_db_sentence = chromadb.PersistentClient(path=climate_repo_path)
climate_collection_sentence = clim_db_sentence.get_or_create_collection('CitizenClimate_Sentence')
vector_store_sentence = ChromaVectorStore(chroma_collection = climate_collection_sentence)
# NOTE: ingestion is intentionally skipped here — the collection is assumed to
# be pre-populated. Re-run ingestion only when new documents are added.
"""## Retrival logic and prompt engineering
Let's make sure that it can load the model, then test out the prompts.
Then we can make this a pipeline.
"""
# --- Query (generation) model ---------------------------------------------
# Talks to the local vLLM server through its OpenAI-compatible endpoint.
query_model = OpenAILike(
    model='kosbu/Llama-3.3-70B-Instruct-AWQ',
    api_base='http://localhost:8000/v1',
    api_key='dummy',  # vLLM doesn't require a real API key
    max_tokens = 2048,
    temperature = 0.5,
    is_chat_model = True
)
# --- Vector store connection constants -------------------------------------
CHROMA_DB_PATH = './Working_Climate_Database_Sentence' # The path where Chroma's data is stored
COLLECTION_NAME = 'CitizenClimate_Sentence' # The name of the existing Chroma collection
# --- Interaction logging ----------------------------------------------------
log_files = Path('chatbot_interactions')
log_files.mkdir(parents=True, exist_ok=True)
# One JSONL file per process run; uuid4 avoids clashes between restarts.
log_files_path = log_files / f"train-{uuid4()}.jsonl"
# Periodically pushes the log folder to a Hugging Face dataset repo.
scheduler = CommitScheduler(
    repo_id = 'CitizenClimate',
    repo_type = 'dataset',
    folder_path= str(log_files_path.parent),
    path_in_repo = 'data',
)
# Populated by initialize_query_engine(); read by data_collection_log().
debug_handler = None
#The engine!
def initialize_query_engine():
    """Connect to ChromaDB, load the index, and create the query engine.

    Side effects: installs a LlamaDebugHandler into the module-level
    ``debug_handler`` and into ``Settings.callback_manager`` so that
    data_collection_log() can later recover condensed queries.
    """
    global debug_handler

    print(f'Connecting to ChromaDB at: {CHROMA_DB_PATH}...')

    # Record LLM inputs/outputs for later inspection by the logger.
    debug_handler = LlamaDebugHandler(print_trace_on_end=True)
    Settings.callback_manager = CallbackManager([debug_handler])

    # Open the persisted store and fetch the pre-built collection.
    chroma_client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    try:
        collection = chroma_client.get_collection(COLLECTION_NAME)
    except Exception as err:
        raise ValueError(
            f"Could not find or connect to Chroma collection '{COLLECTION_NAME}'. "
            f'Error: {err}' )

    # Rebuild the index view over the existing vectors (no re-embedding).
    store = ChromaVectorStore(chroma_collection=collection)
    index = VectorStoreIndex.from_vector_store(store, embed_model=embedding_model)

    # Summarising chat memory: condenses older turns to stay within budget.
    chat_memory = ChatSummaryMemoryBuffer.from_defaults(
        llm=query_model,  # We summarise our own history with the same model
        token_limit=800,
        count_initial_tokens=True)

    # Budgets prompt construction against the model's context window.
    prompt_helper = PromptHelper(
        context_window=4096,  # Matches the vLLM server configuration
        num_output=512,       # Reserve space for the response
        chunk_overlap_ratio=0.1,
        chunk_size_limit=None)

    RETRIEVAL_TOP_K = 4
    KIRKLEES_SYSTEM_PROMPT = ('You are a friendly, intelligent chatbot designed to answer user questions about climate issues and solutions in Kirklees. Ensure your response is viable in Kirklees unless it is a general query. Keep responses clear, factual, and easy to understand. Refer to the documents as your knowledge base e.g. According to my database, instead of according to the documents.')

    # condense_plus_context: rewrite the query from history, then retrieve.
    engine = index.as_chat_engine(
        llm=query_model,
        verbose=True,
        memory=chat_memory,
        similarity_top_k=RETRIEVAL_TOP_K,
        system_prompt = KIRKLEES_SYSTEM_PROMPT,
        streaming=True,
        prompt_helper=prompt_helper,
        chat_mode='condense_plus_context')

    print('ChromaDB index, callback manager and query engine initialized successfully!')
    return engine
#We have to have something to log the interactions so that they can be analysed
def data_collection_log (message, full_response,source_nodes, duration):
    """Append one chat interaction to the JSONL log for later analysis.

    Args:
        message: The raw user query as typed.
        full_response: The assistant's final streamed response text.
        source_nodes: Retrieved (and possibly reranked) source nodes, or None.
        duration: End-to-end latency of the request in seconds.
    """
    global debug_handler
    # Fall back to the raw message when no condensed query can be recovered
    # (e.g. first turn, or the debug handler recorded nothing).
    condensed_query = message
    # The chat engine condenses the query before retrieval; recover that
    # condensed form from the debug handler's recorded LLM events.
    if debug_handler:
        try:
            llm_events = debug_handler.get_llm_inputs_outputs()
            for event_pair in llm_events:
                if len(event_pair) >= 1:
                    start_event = event_pair[0]
                    if start_event.payload:
                        if EventPayload.MESSAGES in start_event.payload:
                            messages = start_event.payload.get(EventPayload.MESSAGES, [])
                            if messages and hasattr(messages[-1], "content"):
                                condensed_query = messages[-1].content
                                break
                        elif EventPayload.PROMPT in start_event.payload:
                            condensed_query = start_event.payload.get(EventPayload.PROMPT, message)
                            break
        except Exception as e:
            print(f'Failed to extract condensed query: {e}')

    def _chunk_preview(node):
        """Return up to 2000 chars of the node text, marking real truncation."""
        text = getattr(node, 'text', None)
        if not text:
            return ''
        # FIX: only append an ellipsis when the text was actually truncated
        # (previously '...' was appended unconditionally).
        return text[:2000] + '...' if len(text) > 2000 else text

    log_entry = {
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'user_query': message,
        'condensed_query': condensed_query,
        'bot_response': full_response,
        'latency_seconds': round(duration, 2),
        'sources': [
            {'file': str(getattr(n, 'metadata', {}).get('file_name', 'N/A')),
             'page': str(getattr(n, 'metadata', {}).get('page_label', 'N/A')),
             'score': float(getattr(n, 'score', 0.0) or 0.0),
             'text_chunk': _chunk_preview(n)
             }
            for n in (source_nodes or []) ]
    }
    try:
        # CommitScheduler uploads the folder in the background; hold its lock
        # while writing so a partial line is never pushed to the Hub.
        if hasattr(scheduler, "lock"):
            with scheduler.lock:
                with log_files_path.open('a', encoding='utf-8') as f:
                    f.write(json.dumps(log_entry) + '\n')
        else:
            with log_files_path.open('a', encoding='utf-8') as f:
                f.write(json.dumps(log_entry) + '\n')
    except Exception as e:
        print(f'Failed to log interaction: {e}')
    # Clear recorded events so the handler's buffers don't grow without bound.
    if debug_handler:
        debug_handler.flush_event_logs()
# Build the chat engine once at import time so every Gradio request reuses it.
try:
    RAG_CHAT_ENGINE = initialize_query_engine()
except Exception as e:
    print(f'Error during initialization: {e}')
    # Sentinel checked by rag_chat_function and by the Gradio wiring below.
    RAG_CHAT_ENGINE = None
#Nice that works!!!!
#To ensure that there is a background wait before launch
#Here is where we put the wait_for_llm() function
def background_wait():
    """Block until the vLLM server is reachable, then announce readiness.

    Intended to run in a daemon thread (see the commented-out call in the
    __main__ guard) so waiting does not block the Gradio launch.
    """
    wait_for_vllm()
    print('vLLM is ready!!')
#Let's put the RAG chat function here#
#It's a baby for now, but we'll add storage and logging to it very soon uwu
def rag_chat_function(message: str, history: list) -> 'Iterator[str]':
    """
    The main chat function connected to the Gradio interface.

    Streams partial answers: Gradio replaces the displayed message with each
    yielded value, so every yield is the full response so far.

    Args:
        message (str): The user's latest message.
        history (list): The full chat history provided by Gradio (unused here;
            the chat engine keeps its own summarising memory buffer).

    Yields:
        str: Progressively longer prefixes of the assistant's response.
    """
    # Guard against initialization failure
    if RAG_CHAT_ENGINE is None:
        yield 'RAG system not ready. Please check setup.'
        return
    start_time = time.time()  # For latency logging
    try:
        # Stream the generation token-by-token for a responsive UI.
        response = RAG_CHAT_ENGINE.stream_chat(message)
        full_response = ''
        for token in response.response_gen:
            full_response += token
            yield full_response
    except Exception as e:
        print(f'An error occurred during query: {e}')
        yield 'Sorry, an error occurred while processing your request. Please try again.'
        return
    # Post-stream bookkeeping: rerank the retrieved sources and log the
    # interaction. FIX: handled separately from generation errors so a
    # rerank/logging failure no longer overwrites the answer the user has
    # already received with an error message.
    try:
        source_nodes = response.source_nodes
        if source_nodes:
            source_nodes = rerank_documents(message, source_nodes, top_k=4)
        duration = time.time() - start_time
        data_collection_log(message, full_response, source_nodes, duration)
    except Exception as e:
        print(f'Failed to rerank/log sources: {e}')
#Then we test out the Gradio stuff right here right now
# Wire up the Gradio UI. If the RAG engine failed to initialise, fall back to
# a minimal error interface so the Space still renders something useful.
if RAG_CHAT_ENGINE is not None:
    with gr.Blocks() as demo:
        gr.ChatInterface(
            fn=rag_chat_function,
            title='CitizenClimate: Kirklees',
            description = 'This is a chatbot that can be used to answer your climate questions and helps you with practical climate solutions for you and your community.',
            theme=gr.themes.Soft(),
            textbox=gr.Textbox( placeholder = 'Enter your question here please...', container=False, scale=7))
        gr.Markdown('---')
        gr.Markdown('Although care has been taken to ensure that the outputs are as accurate as possible, the system may occasionally produce harmful instructions or biased content and may occasionally generate incorrect information.')
else:
    demo = gr.Interface(
        fn=lambda x: 'RAG System initialization failed. Check logs.',
        inputs='text',
        outputs='text',
        title='RAG Prototype ERROR')

# Launch for local runs; on a Hugging Face Space the SDK serves `demo` itself.
# FIX: the launch guard now sits outside the success branch, so the fallback
# error interface is also served when initialisation fails.
if __name__ == "__main__":
    #threading.Thread(target=background_wait, daemon=True).start() #Ensures that the model loads in the background before it runs
    demo.launch(server_name="0.0.0.0", server_port=7860)