|
|
import os |
|
|
import io |
|
|
import json |
|
|
import re |
|
|
import logging |
|
|
import tempfile |
|
|
import base64 |
|
|
from uuid import uuid4 |
|
|
from typing import Optional, List |
|
|
from fastapi import FastAPI, UploadFile, File, HTTPException |
|
|
from fastapi.responses import JSONResponse |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from pydantic import BaseModel |
|
|
from dotenv import load_dotenv |
|
|
from langchain.chains import create_history_aware_retriever, create_retrieval_chain |
|
|
from langchain.chains.combine_documents import create_stuff_documents_chain |
|
|
from langchain_mongodb.chat_message_histories import MongoDBChatMessageHistory |
|
|
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder |
|
|
from langchain_core.documents import Document |
|
|
from langchain_groq import ChatGroq |
|
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
|
from langchain_text_splitters import RecursiveCharacterTextSplitter |
|
|
from langchain_community.document_loaders import PyPDFLoader |
|
|
from langchain_chroma import Chroma |
|
|
from pymongo import MongoClient |
|
|
|
|
|
|
|
|
# Optional PDF backends. Each import is attempted once at module load and the
# outcome is recorded in a flag that process_pdf() checks before using the
# corresponding extractor.
try:
    from pypdf import PdfReader
except ImportError:
    PYPDF_AVAILABLE = False
else:
    PYPDF_AVAILABLE = True

try:
    import fitz
except ImportError:
    PYMUPDF_AVAILABLE = False
else:
    PYMUPDF_AVAILABLE = True
|
|
|
|
|
|
|
|
|
|
|
# Configure the root logger at INFO level, then create the logger used by
# every function in this module.
logging.basicConfig(
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
# Pull variables from a local .env file (if any) into the process environment
# before reading the settings below.
load_dotenv()

# Secrets and connection strings: no defaults, so these are None when unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
MONGODB_URL = os.environ.get("MONGODB_URL")

# Where chat history is stored in MongoDB.
MONGODB_DATABASE = os.environ.get("MONGODB_DATABASE", "greenstep_education")
MONGODB_COLLECTION = os.environ.get("MONGODB_COLLECTION", "chat_history")

# Address the server binds to when this file is run as a script.
HOST = os.environ.get("HOST", "0.0.0.0")
PORT = int(os.environ.get("PORT", 5000))

# PDF that is ingested into the vector store at startup.
PDF_PATH = os.environ.get("PDF_PATH", "./reforestation_content.pdf")
|
|
|
|
|
|
|
|
|
|
|
# Fail fast at import time when a required setting is missing or empty, and
# say exactly which one so the operator does not have to guess.
_required_settings = {
    "HF_TOKEN": HF_TOKEN,
    "GROQ_API_KEY": GROQ_API_KEY,
    "PDF_PATH": PDF_PATH,
    "MONGODB_URL": MONGODB_URL,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]

if _missing_settings:
    _missing_list = ", ".join(_missing_settings)
    logger.error(f"Missing required environment variables: {_missing_list}")
    raise RuntimeError(
        "Environment variables not set. Please check HF_TOKEN, GROQ_API_KEY, "
        f"PDF_PATH, and MONGODB_URL (missing: {_missing_list})"
    )
|
|
|
|
|
|
|
|
|
|
|
# Open the shared MongoDB client and verify the server is reachable before the
# application starts; a failed ping aborts startup.
try:
    mongo_client = MongoClient(MONGODB_URL)
    mongo_client.admin.command('ping')
    logger.info("MongoDB connection successful")
except Exception as e:
    logger.error(f"Failed to connect to MongoDB: {str(e)}")
    # Chain the driver error so the traceback shows the real cause.
    raise RuntimeError("MongoDB connection failed") from e
|
|
|
|
|
|
|
|
|
|
|
# The ASGI application object; the title, description and version are passed
# straight to FastAPI as API metadata.
app = FastAPI(
    title="GreenStep Education API",
    description="Educational chatbot API for GreenStep reforestation app.",
    version="1.0.0",
)
|
|
|
|
|
|
|
|
|
|
|
# Allowed browser origins are read from CORS_ALLOW_ORIGINS (comma-separated).
# When the variable is unset or empty every origin is allowed, as before.
_cors_allowed_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allowed_origins,
    # A wildcard origin must not be combined with credentials: that would let
    # any website issue credentialed requests. Credentials are therefore only
    # enabled when an explicit origin list has been configured.
    allow_credentials="*" not in _cors_allowed_origins,
    allow_methods=["GET", "POST", "DELETE"],
    allow_headers=["*"],
)
|
|
|
|
|
|
|
|
|
|
|
# Shared model handles, created once at import time.
# `embeddings` is handed to the vector store in process_pdf(); `llm` drives
# both the question-rephrasing step and the answer step in /ask.
embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
)
llm = ChatGroq(
    model_name="openai/gpt-oss-20b",
)
|
|
|
|
|
|
|
|
def extract_text_with_pypdf(file_path: str) -> List[Document]:
    """Read a PDF page by page with pypdf and wrap each non-blank page.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        One ``Document`` per page whose text is not blank, carrying the source
        path and zero-based page index as metadata. Any failure is logged and
        reported as an empty list so the caller can try another backend.
    """
    try:
        pdf_pages = PdfReader(file_path).pages
        page_texts = ((index, page.extract_text()) for index, page in enumerate(pdf_pages))
        extracted = [
            Document(
                page_content=content,
                metadata={"source": file_path, "page": index},
            )
            for index, content in page_texts
            if content.strip()
        ]
        logger.info(f"pypdf extracted text from {len(extracted)} pages")
        return extracted
    except Exception as e:
        logger.error(f"pypdf extraction failed: {str(e)}")
        return []
|
|
|
|
|
|
|
|
def extract_text_with_pymupdf(file_path: str) -> List[Document]:
    """Extract text using PyMuPDF (fitz) library - often better for complex PDFs.

    Args:
        file_path: Path of the PDF to read.

    Returns:
        One ``Document`` per page whose text is not blank, carrying the source
        path and zero-based page index as metadata. Any failure is logged and
        reported as an empty list so the caller can fall back or give up.
    """
    try:
        pdf = fitz.open(file_path)
        try:
            documents = []
            for page_num in range(len(pdf)):
                text = pdf.load_page(page_num).get_text()
                if text.strip():
                    documents.append(
                        Document(
                            page_content=text,
                            metadata={"source": file_path, "page": page_num},
                        )
                    )
        finally:
            # Release the file handle even when a page fails to load or parse.
            pdf.close()

        logger.info(f"PyMuPDF extracted text from {len(documents)} pages")
        return documents
    except Exception as e:
        logger.error(f"PyMuPDF extraction failed: {str(e)}")
        return []
|
|
|
|
|
|
|
|
def process_pdf(file_path: str):
    """Process PDF with multiple fallback methods for robust text extraction.

    Extraction is tried with PyPDFLoader first, then with pypdf directly, then
    with PyMuPDF; the first backend that yields non-blank text is used. The
    text is split into overlapping chunks, embedded with the module-level
    ``embeddings`` model and stored in a Chroma collection persisted under
    ``./greenstep_education.db``.

    Args:
        file_path: Path of the PDF to ingest.

    Returns:
        The populated Chroma vector store.

    Raises:
        RuntimeError: If the file does not exist, no text can be extracted, or
            any other step fails. The underlying exception is chained as the
            cause.
    """
    try:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"PDF file not found at: {file_path}")

        logger.info(f"Processing PDF from: {file_path}")
        documents = []

        # Method 1: LangChain's PyPDFLoader.
        try:
            logger.info("Attempting extraction with PyPDFLoader...")
            loader = PyPDFLoader(file_path)
            documents = loader.load()

            if documents and any(doc.page_content.strip() for doc in documents):
                logger.info(f"PyPDFLoader successfully loaded {len(documents)} pages")
            else:
                # Pages were returned but none has text: treat as a miss so
                # the next backend gets a chance.
                documents = []
                logger.warning("PyPDFLoader returned empty documents")
        except Exception as e:
            logger.warning(f"PyPDFLoader failed: {str(e)}")

        # Method 2: pypdf used directly.
        if not documents and PYPDF_AVAILABLE:
            logger.info("Attempting extraction with pypdf directly...")
            documents = extract_text_with_pypdf(file_path)

        # Method 3: PyMuPDF.
        if not documents and PYMUPDF_AVAILABLE:
            logger.info("Attempting extraction with PyMuPDF (fitz)...")
            documents = extract_text_with_pymupdf(file_path)

        if not documents:
            raise ValueError(
                "Failed to extract text from PDF with all available methods. "
                "The PDF might be:\n"
                "1. Empty or corrupted\n"
                "2. Password-protected\n"
                "3. Scanned images without OCR (consider using pytesseract)\n"
                "4. Using unsupported encryption"
            )

        total_text = "".join(doc.page_content for doc in documents)
        if not total_text.strip():
            raise ValueError("No text content found in PDF. It may contain only images.")

        logger.info(f"Successfully extracted {len(total_text)} characters from {len(documents)} pages")

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=5000,
            chunk_overlap=500,
            length_function=len,
            separators=["\n\n", "\n", ". ", " ", ""],
        )
        splits = text_splitter.split_documents(documents)

        # Drop chunks that contain only whitespace.
        splits = [doc for doc in splits if doc.page_content.strip()]

        if not splits:
            raise ValueError("Text splitting resulted in zero valid chunks.")

        logger.info(f"Created {len(splits)} text chunks for vectorization")

        # The collection is persisted on disk and this function runs on every
        # start. Without explicit ids each run inserts the same chunks again
        # under fresh random ids, so duplicates pile up and crowd the top-k
        # results. Deterministic ids make re-ingestion overwrite the previous
        # copy instead (the Chroma integration writes by id).
        # NOTE(review): if the PDF later shrinks, chunks with higher indexes
        # from the earlier run remain in the collection.
        chunk_ids = [f"{file_path}::chunk-{index}" for index in range(len(splits))]

        vectorstore = Chroma.from_documents(
            documents=splits,
            embedding=embeddings,
            ids=chunk_ids,
            persist_directory="./greenstep_education.db",
        )

        logger.info("Vectorstore created successfully")
        return vectorstore

    except FileNotFoundError as e:
        logger.error(f"File not found: {str(e)}")
        raise RuntimeError(f"PDF file not found: {str(e)}") from e
    except ValueError as e:
        logger.error(f"Invalid PDF content: {str(e)}")
        raise RuntimeError(f"PDF processing failed: {str(e)}") from e
    except Exception as e:
        logger.error(f"Unexpected error processing PDF: {str(e)}", exc_info=True)
        raise RuntimeError(f"PDF processing failed: {str(e)}") from e
|
|
|
|
|
|
|
|
def get_session_history(session_id: str) -> MongoDBChatMessageHistory:
    """Return the MongoDB-backed message history bound to one chat session.

    Args:
        session_id: Identifier of the conversation whose messages are wanted.

    Returns:
        A ``MongoDBChatMessageHistory`` configured with the module-level
        connection string, database and collection settings.
    """
    storage_settings = {
        "connection_string": MONGODB_URL,
        "database_name": MONGODB_DATABASE,
        "collection_name": MONGODB_COLLECTION,
        "create_index": True,
    }
    return MongoDBChatMessageHistory(session_id=session_id, **storage_settings)
|
|
|
|
|
|
|
|
|
|
|
# Build the vector store and its retriever once, at import time. Any failure
# is logged together with troubleshooting hints and aborts startup.
try:
    logger.info(f"Initializing vectorstore from PDF: {PDF_PATH}")
    vectorstore = process_pdf(PDF_PATH)
    # Each query retrieves the four closest chunks.
    retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
    logger.info("Vectorstore initialized successfully")
except Exception as e:
    logger.error(f"Vectorstore initialization failed: {str(e)}")
    for hint in (
        "\nTroubleshooting steps:",
        "1. Verify PDF file exists at the specified path",
        "2. Ensure PDF contains extractable text (not just scanned images)",
        "3. Check if PDF is password-protected",
        "4. Try opening the PDF manually to verify it's not corrupted",
        "\nInstall additional libraries for better PDF support:",
        " pip install pypdf pymupdf",
    ):
        logger.error(hint)
    raise RuntimeError(f"Vectorstore initialization failed: {str(e)}")
|
|
|
|
|
|
|
|
# Request body of POST /ask.
class QuestionRequest(BaseModel):
    # Conversation the question belongs to; selects the stored chat history.
    session_id: str
    # The user's question text.
    question: str
|
|
|
|
|
|
|
|
# Response body of POST /ask.
class QuestionResponse(BaseModel):
    # The assistant's answer text.
    answer: str
|
|
|
|
|
|
|
|
# Request body of POST /history.
class SessionHistoryRequest(BaseModel):
    # Conversation whose stored messages should be returned.
    session_id: str
|
|
|
|
|
|
|
|
# Response body of POST /history.
class SessionHistoryResponse(BaseModel):
    # Conversation the messages belong to (echoed from the request).
    session_id: str
    # Number of entries in `messages`.
    message_count: int
    # Stored messages, each a dict with "type" and "content" keys.
    messages: List[dict]
|
|
|
|
|
|
|
|
_CONTEXTUALIZE_SYSTEM_PROMPT = (
    "Rephrase the user's question considering the chat history to provide better context."
)

_QA_SYSTEM_PROMPT = """You are the GreenStep Education Assistant, a friendly, knowledgeable, and inspiring chatbot
designed to educate users about reforestation, tree planting, forest conservation, and environmental
sustainability within the GreenStep app's Education tab.

Your primary mission is to empower users with accurate, actionable knowledge about forests and their
role in combating climate change, while fostering a deep appreciation for nature and encouraging
environmental action.

Use the following verified educational content to answer questions:

{context}

Your responses should be:
1. Educational and engaging about reforestation, tree species, planting techniques, and environmental benefits
2. Scientifically accurate based on the provided educational content
3. Inspiring and action-oriented, motivating users to participate in reforestation
4. Accessible to diverse audiences with clear, jargon-free language
5. Balanced and honest about both opportunities and challenges
6. Interactive and conversational, building on previous discussions
7. Positive and solutions-focused, emphasizing hope and agency

Remember: Transform users from passive learners into informed environmental advocates who
understand reforestation science and feel empowered to contribute through GreenStep.
"""


def _build_rag_chain():
    """Assemble the history-aware retrieval chain used to answer one question."""
    contextualize_q_prompt = ChatPromptTemplate.from_messages([
        ("system", _CONTEXTUALIZE_SYSTEM_PROMPT),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ])
    history_aware_retriever = create_history_aware_retriever(
        llm, retriever, contextualize_q_prompt
    )

    qa_prompt = ChatPromptTemplate.from_messages([
        ("system", _QA_SYSTEM_PROMPT),
        MessagesPlaceholder("chat_history"),
        ("human", "{input}"),
    ])
    question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
    return create_retrieval_chain(history_aware_retriever, question_answer_chain)


def _answer_question(session_id: str, question: str) -> str:
    """Run the blocking answer pipeline for one question and persist the exchange."""
    history = get_session_history(session_id)
    # Only the six most recent messages are sent along as conversation context.
    recent_messages = history.messages[-6:]

    result = _build_rag_chain().invoke({
        "input": question,
        "chat_history": recent_messages,
    })
    raw_answer = result["answer"]

    # Strip any <think>...</think> section the model emitted before the answer.
    cleaned_answer = re.sub(r"<think>.*?</think>\s*", "", raw_answer, flags=re.DOTALL).strip()

    # Store the exchange only after an answer was produced, so a failed call
    # leaves the history untouched.
    history.add_user_message(question)
    history.add_ai_message(cleaned_answer)
    return cleaned_answer


@app.post(
    "/ask",
    response_model=QuestionResponse,
    summary="Ask the GreenStep education assistant",
    description="Submit a question to learn about reforestation, trees, forests, and environmental conservation."
)
async def ask_question(request: QuestionRequest):
    """Handle question and maintain chat history in MongoDB.

    Args:
        request: Session id and question text sent by the client.

    Returns:
        A ``QuestionResponse`` holding the cleaned answer.

    Raises:
        HTTPException: With status 500 when any step of the pipeline fails.
    """
    # Imported here so this handler does not depend on the file's import block.
    from fastapi.concurrency import run_in_threadpool

    session_id = request.session_id
    question = request.question
    logger.info(f"Received question for session {session_id}: {question}")

    try:
        # The MongoDB driver and the chain's invoke() are synchronous. Calling
        # them directly in this coroutine would freeze the event loop, and with
        # it every other request, for the duration of the LLM call; run the
        # whole pipeline in the worker thread pool instead.
        cleaned_answer = await run_in_threadpool(_answer_question, session_id, question)
        logger.info(f"Response saved to MongoDB for session {session_id}")
        return QuestionResponse(answer=cleaned_answer)
    except Exception as e:
        logger.error(f"Error processing question: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Processing failed: {str(e)}")
|
|
|
|
|
|
|
|
@app.post("/history", response_model=SessionHistoryResponse)
async def get_history(request: SessionHistoryRequest):
    """Retrieve chat history for a session"""
    # Returns a SessionHistoryResponse with every stored message reduced to
    # its "type" and "content"; any failure is reported as HTTP 500.
    # Imported here so this handler does not depend on the file's import block.
    from fastapi.concurrency import run_in_threadpool

    def _load_messages():
        # Synchronous MongoDB read; kept in a plain function so it can run
        # in the worker thread pool.
        return get_session_history(request.session_id).messages

    try:
        # Keep the blocking database call off the event loop.
        messages = await run_in_threadpool(_load_messages)
        messages_dict = [{"type": msg.type, "content": msg.content} for msg in messages]
        return SessionHistoryResponse(
            session_id=request.session_id,
            message_count=len(messages),
            messages=messages_dict,
        )
    except Exception as e:
        logger.error(f"Error retrieving history: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to retrieve history: {str(e)}")
|
|
|
|
|
|
|
|
@app.delete("/history/{session_id}")
async def clear_history(session_id: str):
    """Clear chat history for a session"""
    # Returns a confirmation message on success; any failure is reported as
    # HTTP 500.
    # Imported here so this handler does not depend on the file's import block.
    from fastapi.concurrency import run_in_threadpool

    def _clear_messages():
        # Synchronous MongoDB delete; kept in a plain function so it can run
        # in the worker thread pool.
        get_session_history(session_id).clear()

    try:
        # Keep the blocking database call off the event loop.
        await run_in_threadpool(_clear_messages)
        logger.info(f"Cleared history for session {session_id}")
        return {"message": f"History cleared for session {session_id}"}
    except Exception as e:
        logger.error(f"Error clearing history: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to clear history: {str(e)}")
|
|
|
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Health check endpoint"""
    # Imported here so this handler does not depend on the file's import block.
    from fastapi.concurrency import run_in_threadpool

    try:
        # The ping is a synchronous driver call that can wait a long time when
        # the server is unreachable; run it in the worker thread pool so the
        # event loop keeps serving other requests.
        await run_in_threadpool(mongo_client.admin.command, 'ping')
        mongo_status = "connected"
    except Exception as e:
        mongo_status = f"disconnected: {str(e)}"

    return {
        # Report "healthy" only when the database actually answered; the
        # endpoint previously claimed "healthy" even with MongoDB down.
        "status": "healthy" if mongo_status == "connected" else "degraded",
        "app": "GreenStep Education Assistant",
        "mongodb": mongo_status,
        "vectorstore": "initialized" if vectorstore else "not initialized",
        "pdf_libraries": {
            "pypdf": PYPDF_AVAILABLE,
            "pymupdf": PYMUPDF_AVAILABLE,
        },
    }
|
|
|
|
|
|
|
|
@app.get("/")
async def root():
    # Landing response: a welcome text plus a map of the available routes.
    route_index = {
        "ask_question": "/ask",
        "get_history": "/history",
        "clear_history": "/history/{session_id}",
        "health_check": "/health",
        "documentation": "/docs",
    }
    welcome = {
        "message": "Welcome to GreenStep Education API",
        "description": "Learn about reforestation, tree planting, and environmental conservation",
        "endpoints": route_index,
    }
    return welcome
|
|
|
|
|
|
|
|
# NOTE(review): recent FastAPI releases favour lifespan handlers over
# on_event hooks -- confirm against the installed version before migrating.
@app.on_event("shutdown")
async def shutdown_event():
    """Close MongoDB connection"""
    # Release the shared client's connections, then record that it happened.
    mongo_client.close()
    logger.info(
        "MongoDB connection closed"
    )
|
|
|
|
|
|
|
|
# Script entry point: serve the app with uvicorn on the configured address.
if __name__ == "__main__":
    # Imported lazily so uvicorn is only needed when the file is run directly.
    import uvicorn

    uvicorn.run(
        app,
        host=HOST,
        port=PORT,
    )
|
|
|