# (Hugging Face Spaces status banner captured with the file: "Spaces: Sleeping")
| # -*- coding: utf-8 -*- | |
| """app.py | |
| ## **CitizenClimate: RAG for climate citizen assemblies** | |
| --- | |
| --- | |
| This is going to be the master document for all the files, though the py files will be put into their own individual files to ensure they're running all individually. The goal is to have these all as functions that can be called on when need be and have a smooth pipeline. | |
| """ | |
| #load packages | |
| #LangChain and Transformers says Hi | |
| #Initial package | |
| #import langchain | |
| #import | |
| #HuggingFace shenanigans | |
| from langchain_huggingface import HuggingFacePipeline #Easy pipeline | |
| from langchain_huggingface.embeddings import HuggingFaceEmbeddings #For embeddings | |
| from langchain_huggingface.embeddings import HuggingFaceEndpointEmbeddings #For even more robust embeddingd | |
| from transformers import AutoModelForCausalLM, AutoTokenizer,pipeline #Defined pipelines | |
| from transformers import BitsAndBytesConfig #Quantise the shit out of things | |
| from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint #For chat templates | |
| #Text loading | |
| from llama_index.core import SimpleDirectoryReader #Loads documents | |
| from llama_index.core import Document #deals with the objects created | |
| from llama_index.embeddings.huggingface import HuggingFaceEmbedding #Another embedding tool | |
| from llama_index.core.node_parser import SentenceSplitter #Slipt the text into sentences | |
| from llama_index.core.ingestion import IngestionPipeline #Here's the other pipeline | |
| #Text storage and loading | |
| import chromadb | |
| from llama_index.vector_stores.chroma import ChromaVectorStore #Stores the embeddings | |
| from llama_index.core import VectorStoreIndex #Stores the index | |
| from llama_index.core import StorageContext, load_index_from_storage, Settings #Might be important | |
| from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters | |
| from llama_index.core.memory import ChatSummaryMemoryBuffer, Memory #We need to limit the memory | |
| from llama_index.core.indices.prompt_helper import PromptHelper #Let's see if this works | |
| #Using the llama index shenanigans | |
| from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI #Not using this anymore, but keeping it in case something breaks if I remove it | |
| from llama_index.llms.huggingface import HuggingFaceLLM #Same as the previous one | |
| from llama_index.core import ChatPromptTemplate | |
| from huggingface_hub import CommitScheduler # scheduler | |
| #Adding in a reranker so that I can improve the outputs | |
| from reranker_v1 import rerank_documents | |
| #Adding packages to set up vLLM | |
| from llama_index.llms.openai_like import OpenAILike | |
| from llama_index.core import Settings | |
| from llama_index.llms.vllm import Vllm, VllmServer | |
| from vllm_server import start_vllm, wait_for_vllm #My own packages uwu | |
| #Evaluation | |
| from llama_index.core.evaluation import FaithfulnessEvaluator #I need to use you my guy | |
| from llama_index.core.callbacks import CallbackManager, LlamaDebugHandler #Debugger | |
| from llama_index.core.callbacks.schema import EventPayload #I am now using this to make sure that I can run the debugger orz | |
| #More packages uwu | |
| import gradio as gr #For I/O | |
| import os | |
| import json | |
| import time | |
| import subprocess | |
| import threading | |
| import requests | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from uuid import uuid4 | |
| #from datasets import load_dataset #Uploaded the dataset to the hub, I hope this works | |
| #it didn't work, so I decided to just upload the files directly | |
| #I forgot to set up the vLLM bit | |
| #So that was why it wasn't working lol | |
# vLLM server URL; can be overridden through the VLLM_URL environment variable.
VLLM_URL = os.getenv("VLLM_URL", "http://localhost:8000/v1")
# Start the vLLM server in the background (project helper from vllm_server.py).
vllm_process = start_vllm()
# Embedding model used for both ingestion and querying.
# NOTE(review): Qwen3-Embedding-8B on CPU is very heavy — confirm the memory
# budget of the host before deploying.
embedding_model = HuggingFaceEmbedding(model_name = 'Qwen/Qwen3-Embedding-8B', device= 'cpu') #Set the model
# Persistent ChromaDB store holding the sentence-chunked climate corpus.
climate_repo_path = ('./Working_Climate_Database_Sentence')
clim_db_sentence = chromadb.PersistentClient(path=climate_repo_path)
climate_collection_sentence = clim_db_sentence.get_or_create_collection('CitizenClimate_Sentence')
vector_store_sentence = ChromaVectorStore(chroma_collection = climate_collection_sentence)
# Ingestion is intentionally skipped here: the collection above is re-used
# across runs and only rebuilt when new documents arrive.
"""## Retrieval logic and prompt engineering
Let's make sure that it can load the model, then test out the prompts.
Then we can make this a pipeline.
"""
# Query LLM: an OpenAI-compatible client pointed at the local vLLM server.
query_model = OpenAILike(
    model='kosbu/Llama-3.3-70B-Instruct-AWQ',
    api_base='http://localhost:8000/v1',
    api_key='dummy',# vLLM does not require a real API key
    max_tokens = 2048,
    temperature = 0.5,
    is_chat_model = True
)
# Storage locations for retrieval (must match the ingestion run above).
CHROMA_DB_PATH = './Working_Climate_Database_Sentence'  # where Chroma's data is stored
COLLECTION_NAME = 'CitizenClimate_Sentence'  # name of the existing Chroma collection
# Interaction log: one JSONL file per process, named with a UUID so
# concurrent instances never collide.
log_files = Path('chatbot_interactions')
log_files.mkdir(parents=True, exist_ok=True)
log_files_path = log_files / f"train-{uuid4()}.jsonl"
# Periodically commits the log folder to a Hugging Face dataset repo.
scheduler = CommitScheduler(
    repo_id = 'CitizenClimate',
    repo_type = 'dataset',
    folder_path= str(log_files_path.parent),
    path_in_repo = 'data',
)
# Module-global so data_collection_log() can read the LLM debug events that
# initialize_query_engine() records.
debug_handler = None
| #The engine! | |
def initialize_query_engine():
    """Connect to ChromaDB, load the vector index, and build the chat engine.

    Side effects: installs a ``LlamaDebugHandler`` into the global llama-index
    ``Settings.callback_manager`` and stores it in the module-global
    ``debug_handler`` so ``data_collection_log`` can recover condensed queries.

    Returns:
        A llama-index chat engine in ``condense_plus_context`` mode.

    Raises:
        ValueError: if the Chroma collection cannot be found or opened.
    """
    print(f'Connecting to ChromaDB at: {CHROMA_DB_PATH}...')
    # The debug handler must be module-global: the logging code reads it later.
    global debug_handler
    debug_handler = LlamaDebugHandler(print_trace_on_end=True)
    callback_manager = CallbackManager([debug_handler])
    Settings.callback_manager = callback_manager
    # PersistentClient because the database lives on local disk.
    db = chromadb.PersistentClient(path=CHROMA_DB_PATH)
    # The collection must already exist (created during ingestion).
    try:
        chroma_collection = db.get_collection(COLLECTION_NAME)
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise ValueError(
            f"Could not find or connect to Chroma collection '{COLLECTION_NAME}'. "
            f'Error: {e}') from e
    # Rebuild the vector store and index on top of the existing collection.
    vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
    index = VectorStoreIndex.from_vector_store(vector_store, embed_model=embedding_model)
    # Summarising memory buffer: condenses older turns so long conversations
    # do not exhaust the context window.
    memory = ChatSummaryMemoryBuffer.from_defaults(
        llm=query_model,  # the query model summarises its own history
        token_limit=800,
        count_initial_tokens=True)
    # Budget the prompt so retrieved context plus the answer fit the window.
    prompt_helper = PromptHelper(
        context_window=4096,   # matches the vLLM server configuration
        num_output=512,        # reserve space for the generated response
        chunk_overlap_ratio=0.1,
        chunk_size_limit=None)
    # Retrieval / behaviour settings for the engine.
    RETRIEVAL_TOP_K = 4
    KIRKLEES_SYSTEM_PROMPT = ('You are a friendly, intelligent chatbot designed to answer user questions about climate issues and solutions in Kirklees. Ensure your response is viable in Kirklees unless it is a general query. Keep responses clear, factual, and easy to understand. Refer to the documents as your knowledge base e.g. According to my database, instead of according to the documents.')
    # condense_plus_context: rewrites the user query using chat history, then
    # retrieves context for the condensed query before answering.
    query_engine = index.as_chat_engine(
        llm=query_model,
        verbose=True,
        memory=memory,
        similarity_top_k=RETRIEVAL_TOP_K,
        system_prompt = KIRKLEES_SYSTEM_PROMPT,
        streaming=True,
        prompt_helper=prompt_helper,
        chat_mode='condense_plus_context')
    print('ChromaDB index, callback manager and query engine initialized successfully!')
    return query_engine
| #We have to have something to log the interactions so that they can be analysed | |
| def data_collection_log (message, full_response,source_nodes, duration): | |
| """ Saving the interaction for data analysis.""" | |
| #Hello debug handler, time to work | |
| global debug_handler | |
| #Ensure we can access the condensed query just in case uwu | |
| #Like sometimes we don't have a query and that's fine | |
| condensed_query = message | |
| #Extracts the condensed query because this looks like it'll be interesting | |
| if debug_handler: | |
| try: | |
| llm_events = debug_handler.get_llm_inputs_outputs() #Gets the llm inputs and outputs | |
| for event_pair in llm_events: | |
| if len(event_pair) >= 1: | |
| start_event = event_pair[0] | |
| if start_event.payload: | |
| if EventPayload.MESSAGES in start_event.payload: | |
| messages = start_event.payload.get(EventPayload.MESSAGES, []) | |
| if messages and hasattr(messages[-1], "content"): | |
| condensed_query = messages[-1].content | |
| break | |
| elif EventPayload.PROMPT in start_event.payload: | |
| condensed_query = start_event.payload.get(EventPayload.PROMPT, message) | |
| break | |
| except Exception as e: | |
| print(f'Failed to extract condensed query: {e}') | |
| log_entry = { | |
| 'timestamp': datetime.now(timezone.utc).isoformat(), | |
| 'user_query': message, | |
| 'condensed_query': condensed_query, | |
| 'bot_response': full_response, | |
| 'latency_seconds': round(duration, 2), | |
| 'sources': [ | |
| {'file': str(getattr(n, 'metadata', {}).get('file_name', 'N/A')), | |
| 'page': str(getattr(n, 'metadata', {}).get('page_label', 'N/A')), | |
| 'score': float(getattr(n, 'score', 0.0) or 0.0), | |
| 'text_chunk': (getattr(n, 'text', '')[:2000] + '...') if getattr(n, 'text', None) else '' | |
| } | |
| for n in (source_nodes or []) ] #Logging what the fox says | |
| } | |
| try: | |
| if hasattr(scheduler, "lock"): | |
| with scheduler.lock: | |
| with log_files_path.open('a', encoding='utf-8') as f: | |
| f.write(json.dumps(log_entry) + '\n') | |
| else: | |
| with log_files_path.open('a', encoding='utf-8') as f: | |
| f.write(json.dumps(log_entry) + '\n') | |
| except Exception as e: | |
| print(f'Failed to log interaction: {e}') | |
| #Claer the events after logging so that my system doesn't die uwu | |
| if debug_handler: | |
| debug_handler.flush_event_logs() | |
| # Initialize the engine globally | |
# Build the engine once at import time so the Gradio handlers can share it.
try:
    RAG_CHAT_ENGINE = initialize_query_engine()
except Exception as e:
    print(f'Error during initialization: {e}')
    # Leave the engine unset; the UI code below falls back to an error screen.
    RAG_CHAT_ENGINE = None
| #Nice that works!!!! | |
| #To ensure that there is a background wait before launch | |
| #Here is where we put the wait_for_llm() function | |
def background_wait():
    """Block until the vLLM server reports ready, then announce it.

    Intended to run in a daemon thread so the UI can start serving while
    the model finishes loading in the background.
    """
    wait_for_vllm()
    print('vLLM is ready!!')
| #Let's put the RAG chat function here# | |
| #It's a baby for now, but we'll add storage and logging to it very soon uwu | |
def rag_chat_function(message: str, history: list):
    """Main chat handler for the Gradio ChatInterface (streaming generator).

    Args:
        message (str): The user's latest message.
        history (list): The full chat history provided by Gradio. Unused
            here — the chat engine keeps its own summarising memory buffer.

    Yields:
        str: Progressively longer prefixes of the assistant's response while
        streaming, then the final complete response.
    """
    # Guard against initialization failure at import time.
    if RAG_CHAT_ENGINE is None:
        yield 'RAG system not ready. Please check setup.'
        return
    start_time = time.time()  # measure end-to-end latency for the log
    try:
        # Stream the answer token-by-token for a responsive UI.
        response = RAG_CHAT_ENGINE.stream_chat(message)
        full_response = ''
        for token in response.response_gen:
            full_response += token
            yield full_response
        # Source nodes are only complete once streaming has finished.
        source_nodes = response.source_nodes
        if source_nodes:
            # Rerank retrieved chunks so the log reflects relevance order.
            source_nodes = rerank_documents(message, source_nodes, top_k=4)
        # Record the interaction (query, answer, sources, latency).
        duration = time.time() - start_time
        data_collection_log(message, full_response, source_nodes, duration)
        # Re-yield the final text so Gradio displays the finished message.
        yield full_response
    except Exception as e:
        # Broad catch on purpose: any failure should surface a friendly
        # message in the chat window instead of a traceback.
        print(f'An error occurred during query: {e}')
        yield 'Sorry, an error occurred while processing your request. Please try again.'
        return
| #Then we test out the Gradio stuff right here right now | |
| if RAG_CHAT_ENGINE is not None: | |
| with gr.Blocks() as demo: | |
| gr.ChatInterface( | |
| fn=rag_chat_function, | |
| title='CitizenClimate: Kirklees', | |
| description = 'This is a chatbot that can be used to answer your climate questions and helps you with practical climate solutions for you and your community.', | |
| theme=gr.themes.Soft(), | |
| textbox=gr.Textbox( placeholder = 'Enter your question here please...', container=False, scale=7)) | |
| gr.Markdown('---') | |
| gr.Markdown('Although care has been taken to ensure that the outputs are as accurate as possible, the system may occasionally produce harmful instructions or biased content and may occasionally generate incorrect information.') | |
| # Launch the app (In a HuggingFace Space, you don't call launch() apparently, but this is for local testing) | |
| #However, I will still call launch because I have things that need doing. | |
| if __name__ == "__main__": | |
| #threading.Thread(target=background_wait, daemon=True).start() #Ensures that the model loadds in the background before it runs | |
| demo.launch(server_name="0.0.0.0", server_port=7860) | |
| else: | |
| demo = gr.Interface( | |
| fn=lambda x: 'RAG System initialization failed. Check logs.', | |
| inputs='text', | |
| outputs='text', | |
| title='RAG Prototype ERROR') |