diff --git a/.env.example b/.env.example new file mode 100644 index 0000000000000000000000000000000000000000..76631b6a88d8c75b6b7a9924e34a2f8e81b6f62c --- /dev/null +++ b/.env.example @@ -0,0 +1,13 @@ +GROQ_API_KEY=your_groq_api_key_here + +HUGGINGFACE_TOKEN=your_hf_token_here + +JWT_SECRET_KEY=your_jwt_secret_key_here + +MONGODB_USERNAME= +MONGODB_PASSWORD= + + +REDIS_PASSWORD= + +QDRANT_API_KEY= \ No newline at end of file diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000000000000000000000000000000000000..a91c56d52b27b7a9e1899b49b77c3a6993f73a00 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,40 @@ +name: Deploy to Hugging Face Spaces + +on: + push: + branches: + - main + workflow_dispatch: + +jobs: + deploy: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run tests + run: | + pip install pytest pytest-asyncio httpx + pytest tests/ -v --tb=short || true + + - name: Push to Hugging Face Spaces + env: + HF_TOKEN: ${{ secrets.HF_TOKEN }} + run: | + git config --global user.email "github-actions[bot]@users.noreply.github.com" + git config --global user.name "github-actions[bot]" + + git remote add hf https://HF_USERNAME:$HF_TOKEN@huggingface.co/spaces/HF_USERNAME/rag-chatbot + git push hf main --force diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..e33a4e2bcd824e64591344f81d29fa10f0d63769 --- /dev/null +++ b/.gitignore @@ -0,0 +1,57 @@ +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +venv/ +ENV/ +env/ +RAG/ + +.env +.env.local + 
+.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +.ipynb_checkpoints + +.pytest_cache/ +.coverage +htmlcov/ +*.cover + +*.log +logs/ + +*.db +*.sqlite3 + +uploads/ +temp/ + +*.dockerignore + +Thumbs.db + diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..76b77ca6b85d37c5e69004287f41ee6bbd74e51c --- /dev/null +++ b/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.11-slim + +# Set working directory +WORKDIR /app + +# Set environment variables +ENV PYTHONUNBUFFERED=1 \ + PYTHONDONTWRITEBYTECODE=1 \ + PIP_NO_CACHE_DIR=1 \ + PIP_DISABLE_PIP_VERSION_CHECK=1 + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + build-essential \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements first for better caching +COPY requirements.txt . + +# Install Python dependencies +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Create necessary directories +RUN mkdir -p uploads logs + +# Expose port +EXPOSE 7860 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD curl -f http://localhost:7860/health/ || exit 1 + +# Run the application +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000000000000000000000000000000000000..a999b589d2d4a5230078278d4be6668a85f47655 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2026 RAG Chatbot Project + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this 
permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000000000000000000000000000000000000..8ee645114d6424b1d2f3e466471062386a9ac77d --- /dev/null +++ b/README.md @@ -0,0 +1,209 @@ +# RAG Chatbot with Advanced Retrieval + +Enterprise-grade Retrieval-Augmented Generation (RAG) chatbot built with LangChain, FastAPI, and modern AI technologies. + +## ๐ Features + +- **Hybrid Retrieval**: Combines BM25 and vector search for optimal document retrieval +- **Reranking**: FlashRank reranker for improved result quality +- **Streaming Responses**: Real-time chat with Server-Sent Events (SSE) +- **Conversation Memory**: Redis-backed chat history +- **Smart Caching**: Semantic caching with RAG/non-RAG distinction +- **Document Processing**: Support for PDF, DOCX, and TXT files +- **Background Processing**: Celery workers for async document processing +- **Real-time Updates**: MongoDB change streams for live notifications +- **Vector Database**: Qdrant for scalable vector storage + +## ๐๏ธ Architecture + +``` +โโโ app/ # Main application +โ โโโ api/ # FastAPI routes and middleware +โ โโโ core/ # RAG components (retriever, reranker, generator) +โ โโโ db/ # Database clients (MongoDB, Redis, Qdrant) +โ โโโ models/ # Pydantic schemas +โ โโโ services/ # MongoDB watcher +โ โโโ tasks/ # Celery background tasks +โ โโโ utils/ # Utilities (logger, errors, prompts) +โโโ ingestion/ # Document processing pipeline +โโโ frontend/ 
# Web interface (HTML/CSS/JS) +โโโ config/ # YAML configurations +โโโ tests/ # Test suite +โโโ prompts/ # LLM prompt templates +``` + +## ๐ฆ Tech Stack + +- **Framework**: FastAPI + Uvicorn +- **LLM**: Groq API (llama-3.1-70b) +- **Embeddings**: FastEmbed (BAAI/bge-small-en-v1.5) +- **Vector Store**: Qdrant Cloud +- **Databases**: MongoDB Atlas, Redis Cloud +- **Reranking**: FlashRank (ms-marco-MiniLM-L-12-v2) +- **Background Jobs**: Celery +- **LangChain**: Version 0.3.13 with LangGraph 0.2.58 + +## ๐ ๏ธ Installation + +### Local Setup + +1. **Clone the repository** +```bash +git clone https://github.com/YOUR_USERNAME/rag-chatbot.git +cd rag-chatbot +``` + +2. **Create virtual environment** +```bash +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate +``` + +3. **Install dependencies** +```bash +pip install -r requirements.txt +``` + +4. **Configure environment** +Create a `.env` file in the root directory: +```env +GROQ_API_KEY=your_groq_api_key +QDRANT_API_KEY=your_qdrant_api_key +REDIS_PASSWORD=your_redis_password +``` + +5. **Update configuration** +Edit `config/database.yaml` with your MongoDB, Redis, and Qdrant URLs. + +6. **Run the application** +```bash +uvicorn app.main:app --host 0.0.0.0 --port 7860 +``` + +Visit `http://localhost:7860` to access the chat interface. + +### Docker Setup + +1. **Build and run with Docker Compose** +```bash +docker-compose up -d +``` + +2. **View logs** +```bash +docker-compose logs -f app +``` + +3. **Stop services** +```bash +docker-compose down +``` + +## ๐งช Testing + +Run the test suite: +```bash +pytest tests/ -v +``` + +Run specific test categories: +```bash +# Unit tests only +pytest tests/ -m unit + +# Integration tests only +pytest tests/ -m integration + +# Skip slow tests +pytest tests/ -m "not slow" +``` + +## ๐ Deployment + +### Hugging Face Spaces + +1. **Create a new Space** on [Hugging Face](https://huggingface.co/spaces) +2. **Select Docker SDK** as the space type +3. 
**Add secrets** in Space settings: + - `GROQ_API_KEY` + - `QDRANT_API_KEY` + - `REDIS_PASSWORD` +4. **Push code** to the Space repository +5. **Automatic deployment** via GitHub Actions (see `.github/workflows/deploy.yml`) + +### Manual Deployment + +```bash +# Build Docker image +docker build -t rag-chatbot . + +# Run container +docker run -p 7860:7860 \ + -e GROQ_API_KEY=your_key \ + -e QDRANT_API_KEY=your_key \ + -e REDIS_PASSWORD=your_password \ + rag-chatbot +``` + +## ๐ Usage + +### Document Upload + +1. Click "Upload Document" in the sidebar +2. Select a PDF, DOCX, or TXT file +3. Wait for processing (documents are chunked and embedded) +4. Document appears in the sidebar + +### Chat + +1. Toggle RAG on/off using the switch +2. Type your question in the input field +3. Press Enter or click Send +4. Receive streaming responses in real-time + +### RAG vs Non-RAG + +- **RAG ON**: Answers based on your uploaded documents +- **RAG OFF**: Answers from LLM's general knowledge + +## ๐ง Configuration + +All configuration is in `config/*.yaml` files: + +- `app.yaml` - Server and upload settings +- `database.yaml` - Database connections +- `models.yaml` - LLM, embedding, reranker configs +- `rag.yaml` - Retrieval and chunking parameters +- `security.yaml` - CORS, rate limiting, JWT +- `celery.yaml` - Background worker settings +- `langchain.yaml` - LangChain tracing + +## ๐ค Contributing + +Contributions are welcome! Please: + +1. Fork the repository +2. Create a feature branch (`git checkout -b feature/amazing-feature`) +3. Commit your changes (`git commit -m 'Add amazing feature'`) +4. Push to the branch (`git push origin feature/amazing-feature`) +5. Open a Pull Request + +## ๐ License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. 
+ +## ๐ Acknowledgments + +- LangChain for the RAG framework +- Groq for fast LLM inference +- Qdrant for vector storage +- FlashRank for efficient reranking +- FastEmbed for lightweight embeddings + +## ๐ง Contact + +For questions or support, please open an issue on GitHub. + +--- + +**Built with โค๏ธ using LangChain, FastAPI, and modern AI technologies** diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/api/dependencies.py b/app/api/dependencies.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/api/middleware/__init__.py b/app/api/middleware/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/api/middleware/cors.py b/app/api/middleware/cors.py new file mode 100644 index 0000000000000000000000000000000000000000..0c22fe1624c2c0d3936f3d9fd2dd5278a96660f7 --- /dev/null +++ b/app/api/middleware/cors.py @@ -0,0 +1,14 @@ +from fastapi.middleware.cors import CORSMiddleware +from app.config import config + + +def setup_cors(app): + cors_config = config["security"]["cors"] + + app.add_middleware( + CORSMiddleware, + allow_origins=cors_config["origins"], + allow_credentials=cors_config["allow_credentials"], + allow_methods=cors_config["allow_methods"], + allow_headers=cors_config["allow_headers"], + ) \ No newline at end of file diff --git a/app/api/middleware/error_handler.py b/app/api/middleware/error_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..8b275378ed8675b435cc74ee076f82b79e698e1a --- /dev/null +++ b/app/api/middleware/error_handler.py @@ -0,0 +1,22 @@ +from fastapi import Request, 
status +from fastapi.responses import JSONResponse +from app.utils.errors import RagChatbotError +from app.utils.logger import logger + + +async def error_handler_middleware(request: Request, call_next): + try: + response = await call_next(request) + return response + except RagChatbotError as e: + logger.error(f"RAG error: {str(e)}") + return JSONResponse( + status_code=e.status_code, + content={"detail": str(e)} + ) + except Exception as e: + logger.error(f"Unhandled error: {str(e)}") + return JSONResponse( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content={"detail": "Internal server error"} + ) \ No newline at end of file diff --git a/app/api/middleware/logging.py b/app/api/middleware/logging.py new file mode 100644 index 0000000000000000000000000000000000000000..8a6b282bf1442e03735b5148e434122e8620f604 --- /dev/null +++ b/app/api/middleware/logging.py @@ -0,0 +1,19 @@ +from fastapi import Request +from app.utils.logger import logger +import time + + +async def logging_middleware(request: Request, call_next): + start_time = time.time() + + logger.info(f"Request: {request.method} {request.url.path}") + + response = await call_next(request) + + process_time = time.time() - start_time + logger.info( + f"Response: {request.method} {request.url.path} " + f"Status: {response.status_code} Time: {process_time:.2f}s" + ) + + return response \ No newline at end of file diff --git a/app/api/routes/__init__.py b/app/api/routes/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/app/api/routes/chat.py b/app/api/routes/chat.py new file mode 100644 index 0000000000000000000000000000000000000000..06cc055a338645e8c6b999fdce73baa80cb87e28 --- /dev/null +++ b/app/api/routes/chat.py @@ -0,0 +1,68 @@ +from fastapi import APIRouter, HTTPException +from fastapi.responses import StreamingResponse +from app.models.schemas import ChatRequest, ChatResponse +from app.services.chat_service import 
chat_service +from app.utils.logger import logger +from app.utils.errors import RagChatbotError +import json + +router = APIRouter(prefix="/chat", tags=["chat"]) + + +@router.post("/", response_model=ChatResponse) +async def chat(request: ChatRequest): + try: + response = await chat_service.generate_response( + message=request.message, + session_id=request.session_id, + use_rag=request.use_rag, + use_history=request.use_history + ) + + return ChatResponse( + message=response, + session_id=request.session_id + ) + + except RagChatbotError as e: + logger.error(f"RAG error: {str(e)}") + raise HTTPException(status_code=e.status_code, detail=str(e)) + except Exception as e: + logger.error(f"Chat error: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.post("/stream") +async def chat_stream(request: ChatRequest): + try: + async def generate(): + try: + async for chunk in chat_service.stream_response( + message=request.message, + session_id=request.session_id, + use_rag=request.use_rag, + use_history=request.use_history + ): + yield f"data: {json.dumps({'chunk': chunk})}\n\n" + + yield f"data: {json.dumps({'done': True})}\n\n" + + except Exception as e: + logger.error(f"Stream error: {str(e)}") + yield f"data: {json.dumps({'error': str(e)})}\n\n" + + return StreamingResponse( + generate(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + } + ) + + except RagChatbotError as e: + logger.error(f"RAG error: {str(e)}") + raise HTTPException(status_code=e.status_code, detail=str(e)) + except Exception as e: + logger.error(f"Chat stream error: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") diff --git a/app/api/routes/documents.py b/app/api/routes/documents.py new file mode 100644 index 0000000000000000000000000000000000000000..52b4e487a3d6e268cc94d154ab36e536681386ad --- /dev/null +++ b/app/api/routes/documents.py @@ -0,0 +1,77 @@ +from 
fastapi import APIRouter, HTTPException, UploadFile, File
+from app.models.schemas import DocumentResponse
+from app.services.document_service import document_service
+from app.utils.logger import logger
+from app.utils.errors import DocumentProcessingError
+from typing import List
+from datetime import datetime
+
+router = APIRouter(prefix="/documents", tags=["documents"])
+
+
+@router.post("/upload", response_model=DocumentResponse)
+async def upload_document(file: UploadFile = File(...)):
+    """Save an uploaded file, run it through document processing, and report the result.
+
+    The saved copy is only needed while processing runs, so it is removed in a
+    ``finally`` block, including when processing fails.
+
+    Raises:
+        HTTPException: 400 when processing raises DocumentProcessingError,
+            500 for any other failure.
+    """
+    file_path = None
+    try:
+        content = await file.read()
+        # NOTE(review): file.filename is client-supplied; confirm that
+        # save_uploaded_file sanitizes it before building a path.
+        file_path = document_service.save_uploaded_file(content, file.filename)
+
+        result = await document_service.process_document(file_path)
+
+        return DocumentResponse(
+            id=result["doc_id"],
+            filename=result["file_name"],
+            chunk_count=result["num_chunks"],
+            status="success",
+            created_at=datetime.utcnow()
+        )
+
+    except DocumentProcessingError as e:
+        logger.error(f"Processing error: {str(e)}")
+        raise HTTPException(status_code=400, detail=str(e))
+    except Exception as e:
+        logger.error(f"Upload error: {str(e)}")
+        raise HTTPException(status_code=500, detail="Upload failed")
+    finally:
+        # file_path is still None when the failure happened before the file was saved.
+        if file_path is not None:
+            try:
+                document_service.delete_file(file_path)
+            except Exception as cleanup_error:
+                # Best-effort cleanup: never mask the real outcome of the request.
+                logger.error(f"Cleanup error: {str(cleanup_error)}")
+
+
+@router.get("/", response_model=List[dict])
+async def list_documents():
+    """Return every document from the document service with ``_id`` converted to ``str``."""
+    try:
+        documents = await document_service.get_all_documents()
+
+        # presumably _id is a MongoDB ObjectId that cannot be serialized as-is; confirm
+        for doc in documents:
+            doc["_id"] = str(doc["_id"])
+
+        return documents
+
+    except Exception as e:
+        logger.error(f"List error: {str(e)}")
+        raise HTTPException(status_code=500, detail="Failed to list documents")
+
+
+@router.get("/stats")
+async def get_stats():
+    """Return the statistics produced by ``document_service.get_document_stats``."""
+    try:
+        stats = await document_service.get_document_stats()
+        return stats
+    except Exception as e:
+        logger.error(f"Stats error: {str(e)}")
+        raise HTTPException(status_code=500, detail="Failed to get stats")
+
+
+@router.delete("/{doc_id}")
+async def delete_document(doc_id: str):
+    """Delete a document by id; responds 404 when the service reports nothing was deleted."""
+    try:
+        success = await document_service.delete_document(doc_id)
+
+        if not success:
+            raise HTTPException(status_code=404,
detail="Document not found")
+
+        return {"message": "Document deleted successfully", "doc_id": doc_id}
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Delete error: {str(e)}")
+        raise HTTPException(status_code=500, detail="Delete failed")
diff --git a/app/api/routes/health.py b/app/api/routes/health.py
new file mode 100644
index 0000000000000000000000000000000000000000..4fd6152df3fed0f9c9786ac1b48867a314a3ff37
--- /dev/null
+++ b/app/api/routes/health.py
@@ -0,0 +1,56 @@
+from fastapi import APIRouter
+from app.models.schemas import HealthResponse
+from app.db.redis_client import RedisClient, redis_client
+from app.db.mongodb import MongoDB
+from app.db.vector_store import vector_store
+from app.utils.logger import logger
+
+router = APIRouter(prefix="/health", tags=["health"])
+
+
+@router.get("/", response_model=HealthResponse)
+async def health_check():
+    """Probe the backing services and report their status.
+
+    Each probe is independent: a failure is recorded as ``error: <message>``
+    for that service and flips the overall status to ``unhealthy`` without
+    aborting the remaining probes.
+    """
+    status = "healthy"
+    services = {}
+
+    try:
+        # Go through the shared accessor the cache uses instead of a fresh RedisClient().
+        redis = await redis_client.get_client()
+        await redis.ping()
+        services["redis"] = "connected"
+    except Exception as e:
+        logger.error(f"Redis health check failed: {str(e)}")
+        services["redis"] = f"error: {str(e)}"
+        status = "unhealthy"
+
+    # A freshly constructed MongoDB() has client=None, so the probe must go
+    # through connect(), which creates the client and pings the server.
+    # NOTE(review): if mongodb.py exposes a shared instance, probe that instead.
+    mongodb = MongoDB()
+    try:
+        await mongodb.connect()
+        services["mongodb"] = "connected"
+    except Exception as e:
+        logger.error(f"MongoDB health check failed: {str(e)}")
+        services["mongodb"] = f"error: {str(e)}"
+        status = "unhealthy"
+    finally:
+        # Close the probe connection so repeated checks do not accumulate clients.
+        await mongodb.disconnect()
+
+    try:
+        # NOTE(review): nothing is probed here, so Qdrant is always reported as
+        # connected and the imported vector_store is unused. Confirm its API
+        # and add a real check.
+        services["qdrant"] = "connected"
+    except Exception as e:
+        services["qdrant"] = f"error: {str(e)}"
+        status = "unhealthy"
+
+    return HealthResponse(status=status, services=services)
\ No newline at end of file
diff --git a/app/config.py b/app/config.py
new file mode 100644
index 0000000000000000000000000000000000000000..f75eed0a55e4d984886794df0fdc1891befb04a5
--- /dev/null
+++ b/app/config.py
@@ -0,0 +1,54 @@
+from pathlib import Path
+from typing import Any, Dict
+import yaml
+from pydantic_settings import BaseSettings
+from functools import lru_cache
+
+
+BASE_DIR = Path(__file__).parent.parent
+CONFIG_DIR = BASE_DIR / "config"
+
+
+class
Settings(BaseSettings):
+    """Secrets and credentials read from the environment and the project ``.env`` file.
+
+    ``groq_api_key`` and ``jwt_secret_key`` have no default and are therefore
+    required; every other field defaults to an empty string.
+    """
+
+    groq_api_key: str
+    huggingface_api_key: str = ""
+    tavily_api_key: str = ""
+    langsmith_api_key: str = ""
+    jwt_secret_key: str
+    mongodb_username: str = ""
+    mongodb_password: str = ""
+    redis_password: str = ""
+    qdrant_api_key: str = ""
+
+    class Config:
+        env_file = BASE_DIR / ".env"
+        case_sensitive = False
+        # Keys in .env that have no matching field (for example the
+        # HUGGINGFACE_TOKEN entry from .env.example) must not abort startup.
+        extra = "ignore"
+
+
+def load_yaml(file_path: Path) -> Dict[str, Any]:
+    """Parse a YAML file and return its content; an empty file yields ``{}``."""
+    with open(file_path, "r", encoding="utf-8") as f:
+        # safe_load returns None for an empty document; keep the declared dict type.
+        return yaml.safe_load(f) or {}
+
+
+@lru_cache()
+def get_settings() -> Settings:
+    """Build the Settings object once and reuse it on later calls."""
+    return Settings()
+
+
+@lru_cache()
+def load_config() -> Dict[str, Any]:
+    """Load each YAML file in CONFIG_DIR into one dict keyed by section name."""
+    config = {}
+
+    config["app"] = load_yaml(CONFIG_DIR / "app.yaml")
+    config["database"] = load_yaml(CONFIG_DIR / "database.yaml")
+    config["models"] = load_yaml(CONFIG_DIR / "models.yaml")
+    config["rag"] = load_yaml(CONFIG_DIR / "rag.yaml")
+    config["security"] = load_yaml(CONFIG_DIR / "security.yaml")
+    config["celery"] = load_yaml(CONFIG_DIR / "celery.yaml")
+    config["langchain"] = load_yaml(CONFIG_DIR / "langchain.yaml")
+
+    return config
+
+
+settings = get_settings()
+config = load_config()
diff --git a/app/core/__init__.py b/app/core/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/app/core/cache.py b/app/core/cache.py
new file mode 100644
index 0000000000000000000000000000000000000000..1c61aa31864c032afaac5b9b7635cc1aaa4bd046
--- /dev/null
+++ b/app/core/cache.py
@@ -0,0 +1,60 @@
+import json
+import hashlib
+from typing import Optional
+from app.db.redis_client import redis_client
+from ingestion.embedder import embedder
+from app.config import config
+from app.utils.logger import logger
+
+
+class SemanticCache:
+    def __init__(self):
+        self.ttl = config["rag"]["cache"]["ttl"]
+        self.enabled = config["rag"]["cache"]["enabled"]
+
+    async def _get_cache_key(self, query: str, use_context: bool = True) -> str:
+        context_flag = "rag" if use_context else "no-rag"
+        return
f"cache:{context_flag}:{hashlib.md5(query.encode()).hexdigest()}" + + async def get(self, query: str, use_context: bool = True) -> Optional[str]: + if not self.enabled: + return None + + try: + cache_key = await self._get_cache_key(query, use_context) + redis = await redis_client.get_client() + cached = await redis.get(cache_key) + + if cached: + logger.info(f"Cache hit for query: {query[:50]}...") + return cached + + return None + except Exception as e: + logger.error(f"Cache get error: {e}") + return None + + async def set(self, query: str, response: str, use_context: bool = True): + if not self.enabled: + return + + try: + cache_key = await self._get_cache_key(query, use_context) + redis = await redis_client.get_client() + await redis.setex(cache_key, self.ttl, response) + logger.info(f"Cached response for query: {query[:50]}...") + except Exception as e: + logger.error(f"Cache set error: {e}") + + async def clear(self): + try: + redis = await redis_client.get_client() + keys = await redis.keys("cache:*") + if keys: + await redis.delete(*keys) + logger.info(f"Cleared {len(keys)} cache entries") + except Exception as e: + logger.error(f"Cache clear error: {e}") + + +semantic_cache = SemanticCache() diff --git a/app/core/generator.py b/app/core/generator.py new file mode 100644 index 0000000000000000000000000000000000000000..94ac29825dfcf172e7662ec5b790e6cc7c2c3932 --- /dev/null +++ b/app/core/generator.py @@ -0,0 +1,57 @@ +from langchain_groq import ChatGroq +from langchain_core.messages import HumanMessage, SystemMessage +from app.config import config, settings +from app.utils.logger import logger +from typing import AsyncIterator + + +class LLMGenerator: + def __init__(self): + llm_config = config["models"]["llm"] + + self.llm = ChatGroq( + model=llm_config["model_name"], + temperature=llm_config["temperature"], + max_tokens=llm_config["max_tokens"], + groq_api_key=settings.groq_api_key, + streaming=llm_config["streaming"] + ) + + logger.info(f"LLM initialized: 
{llm_config['model_name']}") + + def generate(self, prompt: str, system_prompt: str = None) -> str: + messages = [] + + if system_prompt: + messages.append(SystemMessage(content=system_prompt)) + + messages.append(HumanMessage(content=prompt)) + + response = self.llm.invoke(messages) + return response.content + + async def agenerate(self, prompt: str, system_prompt: str = None) -> str: + messages = [] + + if system_prompt: + messages.append(SystemMessage(content=system_prompt)) + + messages.append(HumanMessage(content=prompt)) + + response = await self.llm.ainvoke(messages) + return response.content + + async def stream(self, prompt: str, system_prompt: str = None) -> AsyncIterator[str]: + messages = [] + + if system_prompt: + messages.append(SystemMessage(content=system_prompt)) + + messages.append(HumanMessage(content=prompt)) + + async for chunk in self.llm.astream(messages): + if chunk.content: + yield chunk.content + + +llm_generator = LLMGenerator() diff --git a/app/core/memory.py b/app/core/memory.py new file mode 100644 index 0000000000000000000000000000000000000000..b5e72f7bd8593b89a2e9f622d0dd58dad3c94df6 --- /dev/null +++ b/app/core/memory.py @@ -0,0 +1,55 @@ +from langchain_community.chat_message_histories import RedisChatMessageHistory +from langchain_core.runnables.history import RunnableWithMessageHistory +from app.config import config +from app.utils.logger import logger + + +class ConversationMemory: + def __init__(self): + self.max_messages = config["rag"]["memory"]["max_messages"] + self.redis_url = config["database"]["redis"]["url"] + + def get_message_history(self, session_id: str) -> RedisChatMessageHistory: + return RedisChatMessageHistory( + session_id=session_id, + url=self.redis_url, + ttl=86400 + ) + + def create_history_runnable(self, runnable): + return RunnableWithMessageHistory( + runnable, + self.get_message_history, + input_messages_key="input", + history_messages_key="chat_history" + ) + + def get_messages(self, session_id: str): + 
history = self.get_message_history(session_id) + messages = history.messages + return messages[-self.max_messages:] if len(messages) > self.max_messages else messages + + def add_message(self, session_id: str, role: str, content: str): + """Add a message to the conversation history.""" + try: + history = self.get_message_history(session_id) + if role == "user": + history.add_user_message(content) + elif role == "assistant": + history.add_ai_message(content) + logger.debug(f"Added {role} message to session {session_id}") + except Exception as e: + logger.error(f"Error adding message: {e}") + + def clear(self, session_id: str): + try: + history = self.get_message_history(session_id) + history.clear() + logger.info(f"Cleared memory for session {session_id}") + except Exception as e: + logger.error(f"Memory clear error: {e}") + + +conversation_memory = ConversationMemory() + + diff --git a/app/core/pipeline.py b/app/core/pipeline.py new file mode 100644 index 0000000000000000000000000000000000000000..34157345a5bb377a37a1c728638a4c6fc67caad5 --- /dev/null +++ b/app/core/pipeline.py @@ -0,0 +1,160 @@ +from langchain_core.runnables import RunnablePassthrough +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +from app.core.retriever import hybrid_retriever +from app.core.reranker import document_reranker +from app.core.generator import llm_generator +from app.core.cache import semantic_cache +from app.core.memory import conversation_memory +from app.utils.prompts import get_rag_template, get_conversation_template, get_system_prompt +from app.utils.logger import logger +from app.config import config +from typing import AsyncIterator + + +class RAGPipeline: + def __init__(self): + self.retriever = hybrid_retriever + self.reranker = document_reranker + self.generator = llm_generator + self.cache = semantic_cache + self.memory = conversation_memory + self.use_cache = config["rag"]["cache"]["enabled"] + 
self.use_reranking = config["rag"]["retrieval"]["rerank"] + self.top_k = config["rag"]["retrieval"]["top_k"] + + logger.info("RAG Pipeline initialized") + + def _format_context(self, documents: list) -> str: + """Format retrieved documents into context string.""" + context_parts = [] + for i, doc in enumerate(documents, 1): + context_parts.append(f"[{i}] {doc.page_content}") + return "\n\n".join(context_parts) + + async def _retrieve_and_rerank(self, query: str) -> list: + """Retrieve and optionally rerank documents.""" + # Retrieve documents + documents = await self.retriever.ainvoke(query) + + if not documents: + logger.warning("No documents retrieved") + return [] + + logger.info(f"Retrieved {len(documents)} documents") + + # Rerank if enabled + if self.use_reranking: + documents = self.reranker.rerank(query, documents, top_k=self.top_k) + logger.info(f"Reranked to top {len(documents)} documents") + + return documents[:self.top_k] + + async def generate( + self, + query: str, + session_id: str = None, + use_context: bool = True + ) -> str: + """Generate response for query with optional RAG context.""" + + # Check cache first + if self.use_cache: + cached_response = await self.cache.get(query, use_context) + if cached_response: + logger.info("Cache hit") + return cached_response + + # Get conversation history if session provided + history = [] + if session_id: + history = self.memory.get_messages(session_id) + + # Retrieve and rerank documents if context needed + context = "" + if use_context: + documents = await self._retrieve_and_rerank(query) + if documents: + context = self._format_context(documents) + + # Build prompt + if context: + template = get_rag_template() + prompt = template.format(context=context, question=query) + else: + template = get_conversation_template() + prompt = template.format(question=query) + + # Generate response + system_prompt = get_system_prompt() + response = await self.generator.agenerate(prompt, system_prompt) + + # Save to 
memory if session provided + if session_id: + self.memory.add_message(session_id, "user", query) + self.memory.add_message(session_id, "assistant", response) + + # Cache the response + if self.use_cache: + await self.cache.set(query, response, use_context) + + logger.info("Response generated successfully") + return response + + async def stream( + self, + query: str, + session_id: str = None, + use_context: bool = True + ) -> AsyncIterator[str]: + """Stream response for query with optional RAG context.""" + + # Check cache first + if self.use_cache: + cached_response = await self.cache.get(query, use_context) + if cached_response: + logger.info("Cache hit - streaming cached response") + yield cached_response + return + + # Get conversation history if session provided + history = [] + if session_id: + history = self.memory.get_messages(session_id) + + # Retrieve and rerank documents if context needed + context = "" + if use_context: + documents = await self._retrieve_and_rerank(query) + if documents: + context = self._format_context(documents) + + # Build prompt + if context: + template = get_rag_template() + prompt = template.format(context=context, question=query) + else: + template = get_conversation_template() + prompt = template.format(question=query) + + # Stream response + system_prompt = get_system_prompt() + full_response = "" + + async for chunk in self.generator.stream(prompt, system_prompt): + full_response += chunk + yield chunk + + # Save to memory if session provided + if session_id: + self.memory.add_message(session_id, "user", query) + self.memory.add_message(session_id, "assistant", full_response) + + # Cache the full response + if self.use_cache: + await self.cache.set(query, full_response, use_context) + + logger.info("Response streamed successfully") + + +rag_pipeline = RAGPipeline() diff --git a/app/core/reranker.py b/app/core/reranker.py new file mode 100644 index 
0000000000000000000000000000000000000000..98a86901459072997388d4ec7143b11964ec71c7
--- /dev/null
+++ b/app/core/reranker.py
@@ -0,0 +1,65 @@
+from typing import List
+from langchain_core.documents import Document
+from flashrank import Ranker, RerankRequest
+from app.config import config
+from app.utils.logger import logger
+
+
+class DocumentReranker:
+    """Wraps a FlashRank ``Ranker`` to rerank retrieved documents against a query."""
+
+    def __init__(self):
+        self.ranker = None
+        self.enabled = config["models"]["reranker"]["enabled"]
+        self.top_k = config["models"]["reranker"]["top_n"]
+
+        # The ranker model is only created when reranking is enabled in config.
+        if self.enabled:
+            model_name = config["models"]["reranker"]["model"]
+            self.ranker = Ranker(model_name=model_name)
+            logger.info(f"FlashRank reranker initialized: {model_name}")
+
+    def rerank(self, query: str, documents: List[Document], top_k: int = None) -> List[Document]:
+        """Return at most ``top_k`` documents in the order produced by the ranker.
+
+        Args:
+            query: Query text the passages are scored against.
+            documents: Candidate documents; returned unchanged when reranking
+                is disabled or the list is empty.
+            top_k: Maximum number of documents to return; defaults to the
+                configured ``top_n``.
+
+        Returns:
+            The selected documents, each annotated in place with
+            ``metadata["rerank_score"]``.
+        """
+        if not self.enabled or not documents:
+            return documents
+
+        if top_k is None:
+            top_k = self.top_k
+
+        # Passage ids are list indices so results can be mapped back to the originals.
+        passages = [
+            {"id": i, "text": doc.page_content}
+            for i, doc in enumerate(documents)
+        ]
+
+        rerank_request = RerankRequest(query=query, passages=passages)
+        results = self.ranker.rerank(rerank_request)
+
+        reranked_docs = []
+        for result in results[:top_k]:
+            doc_idx = result["id"]
+            doc = documents[doc_idx]
+            # Store a built-in float so the metadata stays JSON-serializable
+            # even if the ranker returns a NumPy scalar.
+            doc.metadata["rerank_score"] = float(result["score"])
+            reranked_docs.append(doc)
+
+        logger.info(f"Reranked {len(documents)} -> {len(reranked_docs)} documents")
+        return reranked_docs
+
+
+document_reranker = DocumentReranker()
diff --git a/app/core/retriever.py b/app/core/retriever.py
new file mode 100644
index 0000000000000000000000000000000000000000..d5bf24033d4cae026066abb0f9154903a3789e0b
--- /dev/null
+++ b/app/core/retriever.py
@@ -0,0 +1,93 @@
+from langchain_qdrant import QdrantVectorStore
+from langchain_community.retrievers import BM25Retriever
+from langchain_core.retrievers import BaseRetriever
+from langchain_core.documents import Document
+from qdrant_client import QdrantClient
+from ingestion.embedder import embedder
+from app.config import config, settings
+from
class HybridRetriever(BaseRetriever):
    """Retriever that merges Qdrant vector search with in-memory BM25.

    The Qdrant-backed store is created lazily on first use. The BM25 index is
    rebuilt from every document passed to ``add_documents`` on this instance,
    so it does not exist until documents have been added through it.
    """

    # Lazily created Qdrant store; stays None until initialisation succeeds.
    vector_store: QdrantVectorStore = None
    # BM25 index over ``documents``; None until add_documents() was called.
    bm25_retriever: BM25Retriever = None
    # Every document added through this instance (source of the BM25 index).
    documents: List[Document] = []
    # Maximum number of documents returned per query.
    k: int = 10
    _initialized: bool = False

    def __init__(self):
        super().__init__()
        self.k = config["rag"]["retrieval"]["top_k"]

    def _initialize_vector_store(self):
        """Create the Qdrant vector store once; log and continue on failure."""
        if not self._initialized:
            qdrant_config = config["database"]["qdrant"]

            client = QdrantClient(
                url=qdrant_config["url"],
                api_key=settings.qdrant_api_key or None,
                timeout=60
            )

            try:
                self.vector_store = QdrantVectorStore(
                    client=client,
                    collection_name=qdrant_config["collection_name"],
                    embedding=embedder.get_embeddings()
                )
                self._initialized = True
                logger.info(f"Vector store initialized: {qdrant_config['collection_name']}")
            except Exception as e:
                # Left uninitialised on purpose so the next call retries.
                logger.warning(f"Vector store init skipped: {str(e)}")

    @staticmethod
    def _merge_ranked(primary: list, secondary: list, limit: int) -> list:
        """Interleave two ranked lists, dropping duplicate page contents.

        Taking results alternately gives both retrievers a share of the final
        ``limit`` slots; simply appending ``secondary`` after ``primary`` would
        discard it whenever ``primary`` alone already fills the limit.
        """
        from itertools import zip_longest

        merged = []
        seen = set()
        for pair in zip_longest(primary, secondary):
            for doc in pair:
                if doc is None:
                    continue
                # Full text as key: a prefix would merge distinct chunks that
                # merely share the same opening characters.
                key = doc.page_content
                if key not in seen:
                    seen.add(key)
                    merged.append(doc)
        return merged[:limit]

    def add_documents(self, documents: List[Document]):
        """Index ``documents`` in Qdrant and rebuild the BM25 index.

        Returns:
            The ids reported by the vector store.

        Raises:
            RuntimeError: If the vector store could not be initialised.
        """
        self._initialize_vector_store()
        if not self._initialized:
            # Previously this fell through to an AttributeError on None.
            raise RuntimeError("Vector store not available; cannot add documents")

        ids = self.vector_store.add_documents(documents)
        self.documents.extend(documents)

        # BM25 is rebuilt from scratch over everything added so far.
        self.bm25_retriever = BM25Retriever.from_documents(
            self.documents,
            k=self.k
        )

        logger.info(f"Added {len(documents)} documents (total: {len(self.documents)})")
        return ids

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """Return up to ``k`` documents for ``query``.

        Falls back to vector-only results while no BM25 index exists and to an
        empty list when the vector store is unavailable.
        """
        self._initialize_vector_store()

        if not self._initialized:
            logger.warning("Vector store not available")
            return []

        vector_docs = self.vector_store.similarity_search(query, k=self.k)

        if self.bm25_retriever is None:
            logger.warning("BM25 not initialized, using vector-only retrieval")
            return vector_docs

        bm25_docs = self.bm25_retriever.invoke(query)

        results = self._merge_ranked(vector_docs, bm25_docs, self.k)
        logger.info(f"Hybrid search returned {len(results)} documents")
        return results

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """Async variant; runs the blocking search in a worker thread."""
        import asyncio

        # The Qdrant and BM25 calls are synchronous; running them inline
        # would stall the event loop for the duration of the search.
        return await asyncio.to_thread(self._get_relevant_documents, query)
class VectorStore:
    """Thin wrapper around a lazily created ``QdrantClient``.

    The default collection name comes from
    ``config["database"]["qdrant"]["collection_name"]``.
    """

    def __init__(self):
        self.client = None
        self.collection_name = config["database"]["qdrant"]["collection_name"]

    def connect(self):
        """Create the Qdrant client on first call and return it."""
        if self.client is None:
            qdrant_url = config["database"]["qdrant"]["url"]
            api_key = settings.qdrant_api_key or None

            self.client = QdrantClient(
                url=qdrant_url,
                api_key=api_key
            )
            logger.info("Qdrant connected")

        return self.client

    def create_collection(self, vector_size: int = None):
        """Create the configured collection (cosine distance) if missing.

        Args:
            vector_size: Vector dimension; defaults to the configured
                ``vector_size``.
        """
        if vector_size is None:
            vector_size = config["database"]["qdrant"]["vector_size"]

        client = self.get_client()

        if not client.collection_exists(self.collection_name):
            client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=vector_size,
                    distance=Distance.COSINE
                )
            )
            logger.info(f"Created Qdrant collection: {self.collection_name}")
        else:
            logger.info(f"Qdrant collection already exists: {self.collection_name}")

    def get_client(self):
        """Return the client, connecting first if necessary."""
        if self.client is None:
            self.connect()
        return self.client

    async def add_documents(self, collection_name: str, documents: List, embeddings: List[List[float]]):
        """Upsert one point per (document, embedding) pair.

        Args:
            collection_name: Target Qdrant collection.
            documents: Objects exposing ``page_content`` and ``metadata``.
            embeddings: One vector per document, in the same order.

        Returns:
            The generated point ids (UUID strings), in input order. An empty
            input returns ``[]`` without contacting Qdrant.

        Raises:
            ValueError: If ``documents`` and ``embeddings`` differ in length
                (previously the longer list was silently truncated).
        """
        if len(documents) != len(embeddings):
            raise ValueError(
                f"documents/embeddings length mismatch: "
                f"{len(documents)} != {len(embeddings)}"
            )
        if not documents:
            return []

        client = self.get_client()

        points = [
            PointStruct(
                id=str(uuid.uuid4()),
                vector=embedding,
                # "text" is written last so a metadata key of the same name
                # cannot overwrite the chunk content.
                payload={**doc.metadata, "text": doc.page_content},
            )
            for doc, embedding in zip(documents, embeddings)
        ]

        # NOTE(review): the client call is synchronous and runs on the event
        # loop thread - confirm this is acceptable for large batches.
        client.upsert(
            collection_name=collection_name,
            points=points
        )

        logger.info(f"Added {len(points)} documents to Qdrant")
        return [p.id for p in points]

    async def delete_by_metadata(self, collection_name: str, metadata_key: str, metadata_value: str):
        """Delete every point whose payload field equals the given value.

        Args:
            collection_name: Target Qdrant collection.
            metadata_key: Payload key to match on.
            metadata_value: Exact value the key must have.
        """
        client = self.get_client()

        client.delete(
            collection_name=collection_name,
            points_selector=Filter(
                must=[
                    FieldCondition(
                        key=metadata_key,
                        match=MatchValue(value=metadata_value)
                    )
                ]
            )
        )

        logger.info(f"Deleted documents with {metadata_key}={metadata_value} from Qdrant")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open backing services on startup and close them on shutdown.

    Redis must connect for startup to succeed; MongoDB and Qdrant failures
    are logged as warnings and the application starts anyway. Teardown runs
    in ``finally`` so connections are closed even when the application exits
    with an error, and a failure while closing one service is logged instead
    of hiding the other service's cleanup.
    """
    logger.info("Starting RAG Chatbot application...")

    # NOTE(review): these are fresh client objects, not the module-level
    # `redis_client` / `mongodb` singletons defined in app.db - confirm
    # that the rest of the app does not expect those to be connected here.
    redis_client = RedisClient()
    await redis_client.connect()
    logger.info("Redis connected")

    mongodb = MongoDB()
    try:
        await mongodb.connect()
        logger.info("MongoDB connected")
    except Exception as e:
        logger.warning(f"MongoDB connection failed: {str(e)}")

    try:
        vector_store.create_collection()
        logger.info("Qdrant collection ready")
    except Exception as e:
        logger.warning(f"Qdrant setup warning: {str(e)}")

    try:
        yield
    finally:
        try:
            await redis_client.disconnect()
            logger.info("Redis disconnected")
        except Exception as e:
            logger.warning(f"Redis disconnect failed: {str(e)}")

        # `except Exception` (not a bare except) so cancellation and
        # interpreter-exit signals are not swallowed.
        try:
            await mongodb.disconnect()
            logger.info("MongoDB disconnected")
        except Exception as e:
            logger.warning(f"MongoDB disconnect failed: {str(e)}")

        logger.info("Application shutdown complete")
# One message of a conversation.
class ChatMessage(BaseModel):
    role: str
    content: str
    timestamp: Optional[datetime] = None


# Request body for the chat endpoints.
class ChatRequest(BaseModel):
    message: str
    # Omit to chat without a persistent session.
    session_id: Optional[str] = None
    # When False the service passes no session id to the pipeline.
    use_history: bool = True
    # When False the pipeline is asked to skip document context.
    use_rag: bool = True


# Response body for a non-streaming chat request.
class ChatResponse(BaseModel):
    response: str
    sources: Optional[List[dict]] = []
    session_id: str
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12 - consider a timezone-aware default.
    timestamp: datetime = Field(default_factory=datetime.utcnow)


# Metadata describing an uploaded file.
class DocumentUpload(BaseModel):
    filename: str
    content_type: str
    # presumably bytes - TODO confirm against the upload route
    size: int


# Summary of a stored document.
class DocumentResponse(BaseModel):
    id: str
    filename: str
    status: str
    created_at: datetime
    chunk_count: Optional[int] = 0


# Health-check payload; `services` holds per-service details.
class HealthResponse(BaseModel):
    status: str
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    services: dict


# Error payload returned to API clients.
class ErrorResponse(BaseModel):
    error: str
    detail: Optional[str] = None
    timestamp: datetime = Field(default_factory=datetime.utcnow)
class ChatService:
    """Facade over the RAG pipeline and the conversation memory."""

    def __init__(self):
        self.pipeline = rag_pipeline
        self.memory = conversation_memory

    async def generate_response(
        self,
        message: str,
        session_id: Optional[str] = None,
        use_rag: bool = True,
        use_history: bool = True
    ) -> str:
        """Return the complete answer for ``message``.

        With ``use_history`` false the pipeline receives no session id.
        Pipeline errors are logged and re-raised.
        """
        # History is switched off by hiding the session from the pipeline.
        pipeline_session = session_id if use_history else None
        try:
            answer = await self.pipeline.generate(
                query=message,
                session_id=pipeline_session,
                use_context=use_rag
            )
            logger.info(f"Generated response for session: {session_id}")
            return answer
        except Exception as e:
            logger.error(f"Chat service error: {str(e)}")
            raise

    async def stream_response(
        self,
        message: str,
        session_id: Optional[str] = None,
        use_rag: bool = True,
        use_history: bool = True
    ) -> AsyncIterator[str]:
        """Yield the answer for ``message`` piece by piece.

        Same session handling as ``generate_response``; errors raised while
        streaming are logged and re-raised.
        """
        pipeline_session = session_id if use_history else None
        try:
            pieces = self.pipeline.stream(
                query=message,
                session_id=pipeline_session,
                use_context=use_rag
            )
            async for piece in pieces:
                yield piece
        except Exception as e:
            logger.error(f"Chat stream error: {str(e)}")
            raise

    def get_chat_history(self, session_id: str) -> list:
        """Return the stored messages of a session, or ``[]`` on error."""
        try:
            messages = self.memory.get_messages(session_id)
        except Exception as e:
            logger.error(f"Get history error: {str(e)}")
            return []
        return messages

    def clear_chat_history(self, session_id: str) -> bool:
        """Clear a session's memory; ``True`` on success, ``False`` on error."""
        try:
            self.memory.clear(session_id)
            logger.info(f"Cleared history for session: {session_id}")
        except Exception as e:
            logger.error(f"Clear history error: {str(e)}")
            return False
        return True
class DocumentService:
    """Document ingestion, lookup and deletion across MongoDB, Qdrant and disk."""

    def __init__(self):
        self.processor = DocumentProcessor()
        self.mongodb = MongoDB()
        self.vector_store = vector_store
        self.upload_dir = config["app"]["upload"]["upload_dir"]
        self.collection_name = config["database"]["qdrant"]["collection_name"]

    async def _documents_collection(self):
        """Connect to MongoDB if needed and return the ``documents`` collection."""
        if self.mongodb.db is None:
            await self.mongodb.connect()
        # get_collection() is a plain (synchronous) method; awaiting its
        # result raised TypeError, which every caller's broad `except` then
        # turned into an empty/False result.
        return self.mongodb.get_collection("documents")

    async def process_document(
        self,
        file_path: str,
        metadata: Optional[Dict] = None
    ) -> Dict:
        """Run the ingestion processor on ``file_path`` and return its result.

        Errors are logged and re-raised.
        """
        try:
            result = await self.processor.process_document(file_path, metadata)
            logger.info(f"Document processed: {result['file_name']}")
            return result

        except Exception as e:
            logger.error(f"Document processing failed: {str(e)}")
            raise

    async def get_all_documents(self) -> List[Dict]:
        """Return every stored document record, or ``[]`` on error."""
        try:
            collection = await self._documents_collection()
            documents = await collection.find().to_list(length=None)

            return documents

        except Exception as e:
            logger.error(f"Get documents error: {str(e)}")
            return []

    async def get_document_by_id(self, doc_id: str) -> Optional[Dict]:
        """Return the record whose ``doc_id`` matches, or ``None``."""
        try:
            collection = await self._documents_collection()
            document = await collection.find_one({"doc_id": doc_id})

            return document

        except Exception as e:
            logger.error(f"Get document error: {str(e)}")
            return None

    async def delete_document(self, doc_id: str) -> bool:
        """Delete a document's vectors and its MongoDB record.

        Returns:
            ``True`` when a MongoDB record was removed, ``False`` when none
            matched or an error occurred.
        """
        try:
            # Resolve the collection first so a MongoDB outage is detected
            # before any vectors are removed.
            collection = await self._documents_collection()

            await self.vector_store.delete_by_metadata(
                collection_name=self.collection_name,
                metadata_key="doc_id",
                metadata_value=doc_id
            )

            result = await collection.delete_one({"doc_id": doc_id})

            if result.deleted_count > 0:
                logger.info(f"Document deleted: {doc_id}")
                return True

            return False

        except Exception as e:
            logger.error(f"Delete document error: {str(e)}")
            return False

    async def search_documents(
        self,
        query: str,
        limit: int = 10
    ) -> List[Dict]:
        """Full-text search over document records; ``[]`` on error.

        NOTE(review): ``$text`` queries need a text index on the collection;
        no index creation is visible here - confirm one exists.
        """
        try:
            collection = await self._documents_collection()
            documents = await collection.find(
                {"$text": {"$search": query}}
            ).limit(limit).to_list(length=limit)

            return documents

        except Exception as e:
            logger.error(f"Search documents error: {str(e)}")
            return []

    async def get_document_stats(self) -> Dict:
        """Return the document count and the sum of ``num_chunks``.

        Both values are 0 when the lookup fails.
        """
        try:
            collection = await self._documents_collection()

            total_docs = await collection.count_documents({})

            pipeline = [
                {
                    "$group": {
                        "_id": None,
                        "total_chunks": {"$sum": "$num_chunks"}
                    }
                }
            ]

            result = await collection.aggregate(pipeline).to_list(length=1)
            # An empty collection yields no group row at all.
            total_chunks = result[0]["total_chunks"] if result else 0

            return {
                "total_documents": total_docs,
                "total_chunks": total_chunks
            }

        except Exception as e:
            logger.error(f"Get stats error: {str(e)}")
            return {"total_documents": 0, "total_chunks": 0}

    def save_uploaded_file(self, file_content: bytes, filename: str) -> str:
        """Write ``file_content`` into the upload directory.

        Only the final path component of ``filename`` is used, so a
        client-supplied name such as ``../../x`` cannot escape the upload
        directory.

        Returns:
            The path of the written file.

        Raises:
            ValueError: If ``filename`` has no usable base name.
            OSError: If the file cannot be written.
        """
        try:
            os.makedirs(self.upload_dir, exist_ok=True)

            # Normalise Windows separators before taking the base name.
            safe_name = os.path.basename(filename.replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                raise ValueError(f"Invalid upload filename: {filename!r}")

            file_path = os.path.join(self.upload_dir, safe_name)

            with open(file_path, "wb") as f:
                f.write(file_content)

            logger.info(f"File saved: {file_path}")
            return file_path

        except Exception as e:
            logger.error(f"Save file error: {str(e)}")
            raise

    def delete_file(self, file_path: str) -> bool:
        """Remove ``file_path``; ``True`` if removed, ``False`` otherwise."""
        try:
            if os.path.exists(file_path):
                os.remove(file_path)
                logger.info(f"File deleted: {file_path}")
                return True
            return False

        except Exception as e:
            logger.error(f"Delete file error: {str(e)}")
            return False
class MongoDBWatcher:
    """Runs MongoDB change streams and forwards each change to a callback."""

    def __init__(self):
        self.mongodb = MongoDB()
        # collection name -> currently open change stream
        self.watchers = {}
        self.running = False

    async def start(self):
        """Connect to MongoDB if needed and mark the watcher as running."""
        if self.mongodb.db is None:
            await self.mongodb.connect()

        self.running = True
        logger.info("MongoDB watcher started")

    async def stop(self):
        """Stop forwarding changes and close every open change stream."""
        self.running = False
        # Iterate over a snapshot: closing a stream lets its watch loop end,
        # and that loop removes its own entry from `self.watchers`.
        for watcher in list(self.watchers.values()):
            if hasattr(watcher, 'close'):
                await watcher.close()
        self.watchers.clear()
        logger.info("MongoDB watcher stopped")

    async def watch_collection(
        self,
        collection_name: str,
        callback: Callable,
        pipeline: list = None
    ):
        """Forward changes on ``collection_name`` to ``callback`` until stopped.

        Args:
            collection_name: Collection to watch.
            callback: Coroutine function called with each change event; its
                exceptions are logged and do not end the watch.
            pipeline: Change-stream aggregation pipeline. Defaults to
                matching insert, update and delete operations.

        Errors opening or reading the stream are logged, not raised.
        """
        try:
            # get_collection() is synchronous; awaiting its result raised
            # TypeError and the watch never started.
            collection = self.mongodb.get_collection(collection_name)

            if pipeline is None:
                pipeline = [
                    {
                        "$match": {
                            "operationType": {"$in": ["insert", "update", "delete"]}
                        }
                    }
                ]

            logger.info(f"Watching collection: {collection_name}")

            # NOTE(review): update events carry no `fullDocument` unless the
            # stream is opened with full_document="updateLookup" - confirm
            # whether callbacks rely on it.
            async with collection.watch(pipeline) as change_stream:
                self.watchers[collection_name] = change_stream
                try:
                    async for change in change_stream:
                        if not self.running:
                            break

                        try:
                            await callback(change)
                        except Exception as e:
                            logger.error(f"Callback error for {collection_name}: {e}")
                finally:
                    # Drop the entry so stop() does not touch a closed stream.
                    self.watchers.pop(collection_name, None)

        except Exception as e:
            logger.error(f"Watch error for {collection_name}: {e}")
# Strong references to background watch tasks: the event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected
# before it finishes.
_watcher_tasks = set()


async def start_watchers():
    """Start the shared watcher and watch ``documents`` in the background."""
    await mongodb_watcher.start()

    task = asyncio.create_task(
        mongodb_watcher.watch_collection(
            "documents",
            on_document_change
        )
    )
    _watcher_tasks.add(task)
    # Forget the task once it has finished.
    task.add_done_callback(_watcher_tasks.discard)

    logger.info("All MongoDB watchers started")
def setup_logger(name: str, level: str = "INFO") -> logging.Logger:
    """Return the logger ``name`` with console and file output attached.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        level: Level name, case-insensitive (e.g. ``"info"``, ``"DEBUG"``).

    Returns:
        The configured logger. The level is applied on every call; handlers
        are attached only on the first call for a given name, so repeated
        calls do not duplicate output.

    Raises:
        AttributeError: If ``level`` is not an attribute of ``logging``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, level.upper()))

    # Already configured by an earlier call.
    if logger.handlers:
        return logger

    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # Explicit UTF-8: with the platform default encoding, log messages that
    # contain non-ASCII characters can fail to be written on some systems.
    file_handler = logging.FileHandler(LOG_DIR / "app.log", encoding="utf-8")
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    return logger
# SECURITY(review): the MongoDB and Redis URLs below embed a username and
# password in plain text inside version control. Rotate these credentials and
# load them from the environment instead (.env.example already declares
# MONGODB_USERNAME, MONGODB_PASSWORD and REDIS_PASSWORD).
# TODO confirm how the config loader could substitute them.
mongodb:
  url: "mongodb+srv://abeshith24:Silvershades24@cluster0.z0fla4k.mongodb.net/?appName=Cluster0"
  db_name: "rag_chatbot"
  # Default collection used when no collection name is passed explicitly.
  collection: "documents"

redis:
  url: "redis://default:Z1onH8GwDP7ayX7tlALvcezqOqHgcziH@redis-17971.c256.us-east-1-2.ec2.cloud.redislabs.com:17971"

qdrant:
  # The API key is not stored here; the code reads it from settings.
  url: "https://443828e0-56d0-4072-b87d-5f3056aa3a15.europe-west3-0.gcp.cloud.qdrant.io:6333"
  collection_name: "rag_embeddings"
  # Used as the vector dimension when the collection is created.
  # presumably matches the embedding model in config/models.yaml - TODO confirm
  vector_size: 384
# Cross-origin settings.
cors:
  enabled: true
  # NOTE(review): the Docker image serves on port 7860, which is not listed
  # here - confirm whether a cross-origin client on that port is expected.
  origins:
    - "http://localhost:3000"
    - "http://localhost:8000"
  allow_credentials: true
  allow_methods: ["*"]
  allow_headers: ["*"]

# Request throttling.
# presumably enforced by a middleware not visible here - TODO confirm
rate_limiting:
  enabled: true
  requests_per_minute: 60
  burst: 10
  storage: "redis"

# Token settings; the signing key itself is not stored in this file
# (.env.example declares JWT_SECRET_KEY).
jwt:
  algorithm: "HS256"
  access_token_expire_minutes: 30
  refresh_token_expire_days: 7

# NOTE(review): "*" accepts any Host header - confirm this is intended
# outside local development.
allowed_hosts:
  - "*"
version: '3.8'

services:
  # Main RAG application
  app:
    build: .
    container_name: rag-chatbot
    ports:
      - "7860:7860"
    # These three variables are also supplied through env_file below.
    environment:
      - GROQ_API_KEY=${GROQ_API_KEY}
      - QDRANT_API_KEY=${QDRANT_API_KEY}
      - REDIS_PASSWORD=${REDIS_PASSWORD}
    env_file:
      - .env
    # Uploaded files and logs are kept on the host across container restarts.
    volumes:
      - ./uploads:/app/uploads
      - ./logs:/app/logs
    depends_on:
      - redis
    restart: unless-stopped
    networks:
      - rag-network

  # Redis for caching
  # NOTE(review): this instance starts without a password and its port is
  # published to the host, while config/database.yaml points the app at a
  # hosted Redis URL - confirm which Redis the app is meant to use.
  redis:
    image: redis:7-alpine
    container_name: rag-redis
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes
    volumes:
      - redis-data:/data
    restart: unless-stopped
    networks:
      - rag-network

  # Celery worker (optional)
  # NOTE(review): the worker loads `app.tasks.celery_app`, whereas
  # config/celery.yaml routes tasks named `workers.tasks.*` - confirm the
  # module path.
  celery-worker:
    build: .
    container_name: rag-celery-worker
    command: celery -A app.tasks.celery_app worker --loglevel=info
    environment:
      - GROQ_API_KEY=${GROQ_API_KEY}
      - QDRANT_API_KEY=${QDRANT_API_KEY}
      - REDIS_PASSWORD=${REDIS_PASSWORD}
    env_file:
      - .env
    depends_on:
      - redis
      - app
    restart: unless-stopped
    networks:
      - rag-network

volumes:
  redis-data:

networks:
  rag-network:
    driver: bridge
+ font-size: 18px; + font-weight: 600; +} + +.upload-section { + padding: 20px; + border-bottom: 1px solid #e0e0e0; +} + +.btn-upload { + width: 100%; + padding: 12px; + background: #007bff; + color: white; + border: none; + border-radius: 6px; + cursor: pointer; + font-size: 14px; + font-weight: 500; + transition: background 0.2s; +} + +.btn-upload:hover { + background: #0056b3; +} + +.document-list { + flex: 1; + overflow-y: auto; + padding: 10px; +} + +.document-item { + padding: 12px; + margin-bottom: 8px; + background: #f8f9fa; + border-radius: 6px; + display: flex; + justify-content: space-between; + align-items: center; +} + +.document-item:hover { + background: #e9ecef; +} + +.document-name { + font-size: 14px; + flex: 1; +} + +.btn-delete { + padding: 6px 12px; + background: #dc3545; + color: white; + border: none; + border-radius: 4px; + cursor: pointer; + font-size: 12px; +} + +.btn-delete:hover { + background: #c82333; +} + +.main-content { + flex: 1; + display: flex; + flex-direction: column; + background: #fff; +} + +.chat-header { + padding: 20px; + border-bottom: 1px solid #e0e0e0; + display: flex; + justify-content: space-between; + align-items: center; +} + +.chat-header h1 { + font-size: 24px; + font-weight: 600; +} + +.toggle-rag { + display: flex; + align-items: center; + gap: 8px; +} + +.toggle-rag label { + display: flex; + align-items: center; + gap: 8px; + cursor: pointer; +} + +.chat-messages { + flex: 1; + overflow-y: auto; + padding: 20px; + display: flex; + flex-direction: column; + gap: 16px; +} + +.message { + max-width: 70%; + padding: 12px 16px; + border-radius: 12px; + line-height: 1.5; + font-size: 14px; +} + +.message.user { + align-self: flex-end; + background: #007bff; + color: white; +} + +.message.assistant { + align-self: flex-start; + background: #f1f3f4; + color: #333; +} + +.message.loading { + align-self: flex-start; + background: #f1f3f4; + color: #666; + font-style: italic; +} + +.chat-input { + padding: 20px; + 
/**
 * Build a client-side session identifier of the form
 * `session_<epoch-ms>_<random base-36 suffix>`.
 *
 * The suffix comes from `Math.random()` and is at most 9 characters long; it
 * only keeps ids from different tabs apart and is not suitable as a secret.
 *
 * @returns {string} The generated session id.
 */
function generateSessionId() {
  // slice(2, 11) skips the leading "0." and keeps up to 9 characters, the
  // same result as the deprecated substr(2, 9).
  const randomSuffix = Math.random().toString(36).slice(2, 11);
  return `session_${Date.now()}_${randomSuffix}`;
}
'user' : 'assistant'}`; + messageDiv.textContent = content; + chatMessages.appendChild(messageDiv); + chatMessages.scrollTop = chatMessages.scrollHeight; + return messageDiv; +} + +function addLoadingMessage() { + const messageDiv = document.createElement('div'); + messageDiv.className = 'message loading'; + messageDiv.textContent = 'Thinking...'; + chatMessages.appendChild(messageDiv); + chatMessages.scrollTop = chatMessages.scrollHeight; + return messageDiv; +} + +async function sendMessage() { + const message = messageInput.value.trim(); + if (!message) return; + + addMessage(message, true); + messageInput.value = ''; + sendBtn.disabled = true; + + const loadingMsg = addLoadingMessage(); + + try { + const response = await fetch('/chat/stream', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + message: message, + session_id: sessionId, + use_rag: ragToggle.checked + }) + }); + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + + loadingMsg.remove(); + const assistantMsg = addMessage('', false); + let fullResponse = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + const chunk = decoder.decode(value); + const lines = chunk.split('\n'); + + for (const line of lines) { + if (line.startsWith('data: ')) { + const data = JSON.parse(line.slice(6)); + + if (data.chunk) { + fullResponse += data.chunk; + assistantMsg.textContent = fullResponse; + chatMessages.scrollTop = chatMessages.scrollHeight; + } + + if (data.error) { + assistantMsg.textContent = 'Error: ' + data.error; + assistantMsg.style.color = '#dc3545'; + } + } + } + } + + } catch (error) { + loadingMsg.remove(); + addMessage('Error: ' + error.message, false); + } finally { + sendBtn.disabled = false; + messageInput.focus(); + } +} + +messageInput.addEventListener('keypress', (e) => { + if (e.key === 'Enter') { + sendMessage(); + } +}); + +sendBtn.addEventListener('click', sendMessage); 
+ +uploadBtn.addEventListener('click', () => { + fileInput.click(); +}); + +fileInput.addEventListener('change', async (e) => { + const file = e.target.files[0]; + if (!file) return; + + const formData = new FormData(); + formData.append('file', file); + + uploadBtn.disabled = true; + uploadBtn.textContent = 'Uploading...'; + + try { + const response = await fetch('/documents/upload', { + method: 'POST', + body: formData + }); + + if (response.ok) { + const data = await response.json(); + addMessage(`Document "${data.filename}" uploaded successfully! (${data.chunk_count} chunks)`, false); + loadDocuments(); + } else { + const error = await response.json(); + addMessage('Upload failed: ' + error.detail, false); + } + } catch (error) { + addMessage('Upload error: ' + error.message, false); + } finally { + uploadBtn.disabled = false; + uploadBtn.textContent = 'Upload Document'; + fileInput.value = ''; + } +}); + +async function loadDocuments() { + try { + const response = await fetch('/documents/'); + const documents = await response.json(); + + documentList.innerHTML = ''; + + documents.forEach(doc => { + const docDiv = document.createElement('div'); + docDiv.className = 'document-item'; + + const nameSpan = document.createElement('span'); + nameSpan.className = 'document-name'; + nameSpan.textContent = doc.filename; + + const deleteBtn = document.createElement('button'); + deleteBtn.className = 'btn-delete'; + deleteBtn.textContent = 'Delete'; + deleteBtn.onclick = () => deleteDocument(doc.id); + + docDiv.appendChild(nameSpan); + docDiv.appendChild(deleteBtn); + documentList.appendChild(docDiv); + }); + } catch (error) { + console.error('Failed to load documents:', error); + } +} + +async function deleteDocument(docId) { + if (!confirm('Are you sure you want to delete this document?')) return; + + try { + const response = await fetch(`/documents/${docId}`, { + method: 'DELETE' + }); + + if (response.ok) { + loadDocuments(); + addMessage('Document deleted 
successfully', false); + } + } catch (error) { + addMessage('Delete failed: ' + error.message, false); + } +} + +loadDocuments(); \ No newline at end of file diff --git a/frontend/templates/index.html b/frontend/templates/index.html new file mode 100644 index 0000000000000000000000000000000000000000..5e677c10ea078dab3483009837948f6d7231ae86 --- /dev/null +++ b/frontend/templates/index.html @@ -0,0 +1,44 @@ + + +
+ + +