# Source: demo10/chatbot.py — Hugging Face Space by chaaim123, commit e62a20b
# ("Create chatbot.py"). Web-UI header converted to a comment so the file parses.
#!/usr/bin/env python
"""
American University Academic Advisor Chatbot
===========================================
A RAG-based chatbot system that answers questions about American University academic programs,
leveraging ChromaDB for vector retrieval and Mistral 7B for response generation.
Features:
---------
- Course requirement pattern recognition: Distinguishes between required courses, alternative
options ("take either X or Y"), option groups, and true electives
- Academic terminology matching: Connects student questions using "required" to program
descriptions using "must complete"
- Specialized formatting for course requirements: Organizes courses by type with clear labels
- Response generation using Mistral 7B: Creates natural language responses with source citations
- Conversation history tracking: Maintains context across multiple questions
Usage:
------
1. Command line:
python chatbot.py
2. Import in another script:
from chatbot import ask_question
result = ask_question("What are the required courses for the Data Science program?")
print(result["response"])
3. Clear conversation history:
from chatbot import clear_conversation
clear_conversation()
Requirements:
------------
- Python 3.8+
- ChromaDB for vector storage and retrieval
- Hugging Face API access for Mistral 7B
- Keyring (optional) for secure API key storage
Configuration:
-------------
The system needs a Hugging Face API key for generating responses. Set it using:
keyring.set_password("HF_API_KEY", "rressler", "<your_api_key>")
Or create an .env file with:
HF_API_KEY=<your_api_key>
Note:
-----
This implementation is designed specifically for academic program queries that
involve distinguishing between required courses and alternatives. It uses
specialized detection for patterns like "STAT-320 or STAT-302" to correctly
inform students about their course options.
"""
# chatbot.py
import os
import sys
import re
from pathlib import Path
import logging
import requests
import json
import math
import warnings
from typing import List, Dict, Tuple, Any, Optional
# Silence FutureWarnings from dependencies (noise during normal operation)
warnings.filterwarnings("ignore", category=FutureWarning)
# Local imports: shared logging setup, ChromaDB access, and HF authentication helpers
from utils.logging_utils import setup_logging
from utils.chroma_utils import get_chroma_manager
from utils.auth_utils import authenticate_huggingface
# Module-level logger (writes to chatbot.log)
logger = setup_logging(logger_name="Chatbot", log_filename="chatbot.log")
def configure_api_credentials() -> Tuple[Optional[str], str, Optional[Dict[str, str]]]:
    """
    Resolve Hugging Face API credentials for the Mistral inference endpoint.

    The endpoint URL can be overridden via the MISTRAL_API_URL environment
    variable; otherwise the hosted Mistral-7B-Instruct-v0.3 model is used.

    Returns:
        Tuple: (API key, Model URL, Headers)

    Raises:
        Exception: re-raised from authenticate_huggingface after logging.
    """
    try:
        api_key, request_headers = authenticate_huggingface()
        endpoint = os.getenv(
            "MISTRAL_API_URL",
            "https://api-inference.huggingface.co/models/mistralai/Mistral-7B-Instruct-v0.3"
        )
        return api_key, endpoint, request_headers
    except Exception as exc:
        logger.warning(f"Authentication failed: {exc}")
        raise
# Global configuration: resolve API credentials once at import time so the
# module-level helpers and the AcademicChatbot class share one set.
try:
    HF_API_KEY, MISTRAL_API_URL, MISTRAL_HEADERS = configure_api_credentials()
except Exception as e:
    # Degrade gracefully: retrieval still works without generation credentials.
    logger.error(f"Failed to configure API credentials: {e}")
    HF_API_KEY, MISTRAL_API_URL, MISTRAL_HEADERS = None, None, None
# Initialize the shared ChromaDB manager (reused by all query helpers)
global_chroma_manager = get_chroma_manager(model_size="medium")
# Was a bare debug print to stdout; routed through the logger instead.
logger.debug("ChromaDB manager type: %s", type(global_chroma_manager))
def classify_course_level(course_code: str) -> Dict[str, str]:
    """
    Classify a course's level (undergraduate/graduate) from its catalog number.

    Args:
        course_code (str): The course code (e.g., "MATH-221", "STAT-615").
            Hyphen, space, and dot separators are supported, as well as
            concatenated forms like "CSC280" and suffixed numbers like "100A".

    Returns:
        dict: {"course_level": ..., "level_description": ...}; both default to
        "unknown" values when the code cannot be parsed.
    """
    classification = {
        "course_level": "unknown",
        "level_description": "Unknown course level"
    }
    try:
        # Split on the first recognized separator; fall back to a regex for
        # concatenated codes like "CSC280". (re is imported at module level,
        # so the former in-function import was redundant and is removed.)
        if '-' in course_code:
            parts = course_code.split('-')
        elif ' ' in course_code:
            parts = course_code.split(' ')
        elif '.' in course_code:
            parts = course_code.split('.')
        else:
            match = re.match(r'^([A-Za-z]+)(\d+)$', course_code)
            if not match:
                return classification
            parts = [match.group(1), match.group(2)]
        if len(parts) < 2:
            return classification
        # Keep only digits so suffixed numbers like "100A" still parse.
        course_num_str = ''.join(c for c in parts[1].strip() if c.isdigit())
        course_num = int(course_num_str)
    except (ValueError, TypeError, AttributeError):
        # Narrowed from a blanket `except Exception: pass`: these are the
        # failures a malformed/non-string code can actually produce. Keep the
        # "unknown" defaults rather than raising.
        return classification
    # American University numbering: <=499 undergrad, 500s open graduate,
    # 600s core graduate, 700s advanced graduate, anything else "other".
    if course_num <= 499:
        classification["course_level"] = "undergraduate"
        classification["level_description"] = "Undergraduate course"
    elif course_num <= 599:
        classification["course_level"] = "graduate_open"
        classification["level_description"] = "Graduate course open to qualified undergraduate students"
    elif course_num <= 699:
        classification["course_level"] = "graduate_core"
        classification["level_description"] = "Core graduate course for the master's degree in the field of study"
    elif course_num <= 799:
        classification["course_level"] = "graduate_advanced"
        classification["level_description"] = "Advanced graduate course"
    else:
        classification["course_level"] = "other"
        classification["level_description"] = f"Course number {course_num} outside standard classification"
    return classification
def extract_courses_from_results(results):
    """
    Pull course entries out of a ChromaDB query result and tag each one with
    its level classification.

    Args:
        results (dict): Results from ChromaDB query; expects parallel
            "documents" and "metadatas" lists for the first query.

    Returns:
        list: Course dicts with code, title, credits, type, course_level and
        level_description. Duplicate course codes are dropped (first wins).
    """
    # Lines look like "DEPT-123 Course Title (3)"; the credit group is optional.
    course_pattern = r'([A-Z]{2,4}-\d{3})\s+([^(]+)(?:\s*\((\d+(?:\.\d+)?)\))?'
    extracted = []
    seen_codes = set()
    docs = results["documents"][0]
    metas = results["metadatas"][0]
    for doc, metadata in zip(docs, metas):
        section_type = metadata.get("section_type", "unknown")
        for line in doc.split('\n'):
            for code_part, title_part, credit_part in re.findall(course_pattern, line):
                code = code_part.strip()
                if code in seen_codes:
                    continue
                seen_codes.add(code)
                classification = classify_course_level(code)
                extracted.append({
                    "code": code,
                    "title": title_part.strip(),
                    "credits": credit_part if credit_part else "N/A",
                    "type": section_type,
                    "course_level": classification["course_level"],
                    "level_description": classification["level_description"]
                })
    return extracted
# Shared display order: undergraduate first, then progressively more advanced
# graduate levels. (Previously this dict was duplicated four times inline.)
_LEVEL_PRIORITY = {
    "undergraduate": 1,
    "graduate_open": 2,
    "graduate_core": 3,
    "graduate_advanced": 4,
    "other": 5,
    "unknown": 6
}


def _sorted_by_level(courses):
    """Return courses sorted by _LEVEL_PRIORITY (unrecognized levels last)."""
    return sorted(
        courses,
        key=lambda x: _LEVEL_PRIORITY.get(x.get("course_level", "unknown"), 999)
    )


def _append_level_grouped(output, courses):
    """Append courses sorted by level, emitting an upper-case header line each
    time the level changes (used by required/option/elective sections)."""
    current_level = None
    for course in _sorted_by_level(courses):
        level = course.get("course_level", "unknown")
        level_desc = course.get("level_description", "")
        if level != current_level:
            current_level = level
            if level_desc:
                output.append(f"\n{level_desc.upper()}:")
        output.append(f"- {course['code']} {course['title']} ({course['credits']})")


def format_courses_for_display(courses):
    """
    Format the courses into a readable string with level information.

    Args:
        courses (list): Course dicts (code, title, credits, type,
            course_level, level_description, optional group_id)

    Returns:
        str: Course information grouped by type (required, alternatives,
        option groups, electives), each sorted by course level.
    """
    if not courses:
        return "No courses found."
    # Bucket courses by section type; unrecognized types are silently ignored.
    grouped_courses = {
        "required_courses": [],
        "elective_courses": [],
        "option_group": [],
        "small_option_group": []
    }
    for course in courses:
        course_type = course["type"]
        if course_type in grouped_courses:
            grouped_courses[course_type].append(course)
    output = []
    if grouped_courses["required_courses"]:
        output.append("**Required Courses:**")
        output.append("These courses must be completed by all students in the program:")
        _append_level_grouped(output, grouped_courses["required_courses"])
        output.append("")
    if grouped_courses["small_option_group"]:
        output.append("**Alternative Course Options:**")
        output.append("Students must complete ONE course from each of these groups:")
        # Collect courses into their option groups. NOTE(review): courses
        # without an explicit "group_id" inherit the last id seen (initially
        # 1), so they collapse into that group rather than being numbered
        # sequentially — behavior preserved from the original; confirm intent.
        group_id = 1
        groups = {}
        for course in grouped_courses["small_option_group"]:
            group_id = course.get("group_id", group_id)
            groups.setdefault(group_id, []).append(course)
        for group_id, course_list in groups.items():
            output.append(f"\nOption Group {group_id}:")
            for course in _sorted_by_level(course_list):
                level_desc = course.get("level_description", "")
                output.append(f"- {course['code']} {course['title']} ({course['credits']}) - {level_desc}")
        output.append("")
    if grouped_courses["option_group"]:
        output.append("**Option Groups:**")
        output.append("Students must select courses from the following groups according to program requirements:")
        _append_level_grouped(output, grouped_courses["option_group"])
        output.append("")
    if grouped_courses["elective_courses"]:
        output.append("**Elective Courses:**")
        output.append("Students may choose from these optional courses to fulfill elective requirements:")
        _append_level_grouped(output, grouped_courses["elective_courses"])
    return "\n".join(output)
def process_program_query(query, program_name=None):
    """
    Check if the query is about program requirements or courses and extract program name.

    Args:
        query (str): The user's query
        program_name (str, optional): Pre-identified program name

    Returns:
        dict: Keys is_course_query, course_type, program_name, query_type
        ("invalid", "general", "course_requirements", or
        "program_requirements").
    """
    # Single entry log — replaces two near-duplicate logs, one of which
    # referenced a stale source line number ("start line 446").
    logger.info(f"[process_program_query] Got query: {repr(query)} | Type: {type(query)}")
    if not isinstance(query, str):
        logger.warning(f"Query is not a string! Got {type(query)}: {repr(query)}")
        return {
            "is_course_query": False,
            "course_type": None,
            "program_name": program_name,
            "query_type": "invalid"
        }
    query_lower = query.lower()
    result = {
        "is_course_query": False,
        "course_type": None,
        "program_name": program_name,
        "query_type": "general"
    }
    # Course query patterns, checked in order; first match wins.
    course_query_patterns = [
        # Direct questions about specific course types
        r'what(?:\s+are)?(?:\s+the)?\s+(required|core|elective|optional|must[\s-]complete)\s+courses\s+for\s+(?:the\s+)?(.+?)(?:\s+program|\s+major|\s+degree|\s+minor)?$',
        # Questions about program requirements in general
        r'(?:the\s+)?(.+?)(?:\s+program|\s+major|\s+degree|\s+minor)(?:\s+requirements|(?:\s+)courses)',
        # Questions about what courses to take
        r'what\s+courses\s+(?:do\s+I|does\s+one|should\s+I)\s+(?:need\s+to|have\s+to|must)\s+(?:take|complete)\s+for\s+(?:the\s+)?(.+?)(?:\s+program|\s+major|\s+degree|\s+minor)?',
        # Alternate phrasing about "must complete" courses
        r'what(?:\s+courses)?\s+(?:do\s+I|does\s+one|should\s+I)\s+(?:have\s+to|need\s+to|must)\s+complete\s+for\s+(?:the\s+)?(.+?)(?:\s+program|\s+major|\s+degree|\s+minor)?'
    ]
    for pattern in course_query_patterns:
        match = re.search(pattern, query_lower)
        if match:
            result["is_course_query"] = True
            if len(match.groups()) > 1:
                # Pattern captured both a course type and a program name.
                course_type = match.group(1)
                program_name = match.group(2)
                # Map the captured phrase onto our section_type vocabulary.
                if course_type in ['required', 'core', 'must-complete', 'must complete']:
                    result["course_type"] = 'required_courses'
                elif course_type in ['elective', 'optional']:
                    result["course_type"] = 'elective_courses'
                else:
                    result["course_type"] = 'all'
                result["program_name"] = program_name
                result["query_type"] = "course_requirements"
                break
            elif len(match.groups()) == 1:
                # Just program name, no course type specified.
                program_name = match.group(1)
                result["program_name"] = program_name
                result["course_type"] = 'all'
                result["query_type"] = "program_requirements"
                break
    return result
def expand_query_with_academic_terms(query):
    """
    Broaden a query with synonymous academic phrasing to improve retrieval.

    Scans the query for known academic terms and, when found, appends their
    common catalog synonyms (e.g. "required" -> "must complete", "core") so
    the embedding search also matches program-description wording.

    Args:
        query (str): The original user query

    Returns:
        str: The query followed by matched synonyms, or the original query
        unchanged when no known term is present.
    """
    # term -> synonyms commonly used in academic catalogs
    academic_term_mappings = {
        "required": ["must complete", "must take", "mandatory", "core", "required", "requirement", "capstone"],
        "elective": ["optional", "elective", "choice", "select from"],
        "prerequisite": ["prereq", "prerequisite", "before taking", "prior to"],
        "corequisite": ["coreq", "corequisite", "concurrent", "alongside"],
        "credit": ["credit hour", "credit", "unit"],
        "major": ["major", "program", "degree", "concentration"],
        "minor": ["minor", "secondary field"],
        "course": ["course", "class", "subject"]
    }
    logger.info(f"Processing query for mapped terms: {repr(query)}")
    lowered = query.lower()
    synonyms_found = []
    for term, synonyms in academic_term_mappings.items():
        if term in lowered:
            synonyms_found.extend(synonyms)
    if synonyms_found:
        # Plain appended keywords work well with sentence-transformer embeddings.
        return f"{query} {' '.join(synonyms_found)}"
    return query
def get_program_courses(program_name, course_type='all', n_results=10):
    """
    Get specific course information for a program based on course type.

    Args:
        program_name (str): Name of the academic program ("any" or empty
            skips program-name filtering)
        course_type (str): Type of courses to retrieve ('required_courses',
            'elective_courses', 'option_group', 'small_option_group', or 'all')
        n_results (int): Number of results to return

    Returns:
        dict: Raw ChromaDB query results containing course information
    """
    # Use the module-level shared ChromaDB manager
    chroma_manager = global_chroma_manager
    # Build the where clause based on the requested course type
    if course_type == 'all':
        where_clause = {
            "$or": [
                {"section_type": "required_courses"},
                {"section_type": "elective_courses"},
                {"section_type": "option_group"},
                {"section_type": "small_option_group"}
            ]
        }
    else:
        where_clause = {"section_type": course_type}
    # Add program name to the query
    if program_name and program_name.lower() != "any":
        # Use a more flexible approach for program name matching
        query = f"{course_type} for {program_name} program"
        # NOTE(review): this places "$and" next to an existing top-level key
        # ("$or" or "section_type"). Recent ChromaDB versions require exactly
        # one top-level operator in `where` and may reject this filter —
        # verify against the chroma_utils wrapper / ChromaDB version in use.
        where_clause["$and"] = [
            {"type": "program"},
            {"$or": [
                {"program_name": {"$contains": program_name.lower()}},
                {"parent_title": {"$contains": program_name.lower()}}
            ]}
        ]
    else:
        query = f"{course_type}"
        where_clause["type"] = "program"
    # Expand query with academic terminology (synonyms for "required", etc.)
    expanded_query = expand_query_with_academic_terms(query)
    # Execute the query with filtering. NOTE(review): this passes `where=`,
    # while retrieve_validated_program_requirements passes `metadata_filter=`
    # to the same manager — confirm the wrapper accepts both spellings.
    results = chroma_manager.query(
        query_text=expanded_query,
        where=where_clause,
        n_results=n_results
    )
    return results
def get_program_course_information(program_name, course_type='all'):
    """
    Convenience wrapper: query, extract, and format course info for a program.

    Args:
        program_name (str): Name of the academic program
        course_type (str): Type of courses to retrieve

    Returns:
        str: Human-readable, grouped course listing
    """
    raw_results = get_program_courses(program_name, course_type, n_results=15)
    parsed_courses = extract_courses_from_results(raw_results)
    return format_courses_for_display(parsed_courses)
# Enhanced program requirements extraction with better program differentiation
def extract_validated_program_requirements(soup, program_name, department, url, debug_mode=False):
    """
    Extract program requirements with strict validation to avoid mixing electives with requirements.
    Carefully differentiates between similarly named programs.

    Args:
        soup (BeautifulSoup): Parsed HTML content of a catalog page
        program_name (str): Name of the program
        department (str): Department name
        url (str): URL of the page
        debug_mode (bool): Whether to log debug information

    Returns:
        dict: Validated program requirements with keys program_name,
        department, url, core_requirements, electives, capstone,
        total_credits, program_type (plus core_coverage for BS Data Science)
    """
    logger.info(f"Extracting validated requirements for: {program_name}")
    # Initialize structured requirements
    requirements = {
        "program_name": program_name,
        "department": department,
        "url": url,
        "core_requirements": [],
        "electives": [],
        "capstone": None,
        "total_credits": 0
    }
    # Determine exact program type to avoid confusion between similar programs
    # Normalize program name for comparison
    normalized_program = program_name.lower().strip()
    # Identify the specific program (exact-match only; any other spelling
    # falls through to the generic OTHER handling)
    if normalized_program == "bs data science" or normalized_program == "b.s. data science":
        program_type = "BS_DATA_SCIENCE"
    elif normalized_program == "bs data sciences" or normalized_program == "b.s. data sciences":
        program_type = "BS_DATA_SCIENCES"  # Note the plural
    elif normalized_program == "ms data science" or normalized_program == "m.s. data science":
        program_type = "MS_DATA_SCIENCE"
    elif normalized_program == "ms data sciences" or normalized_program == "m.s. data sciences":
        program_type = "MS_DATA_SCIENCES"  # Note the plural
    else:
        # Generic handling for other programs
        program_type = "OTHER"
    requirements["program_type"] = program_type
    if debug_mode:
        logger.debug(f"Identified program type: {program_type}")
    # Look for specific requirement sections.
    # NOTE(review): requirement_sections is populated below but never read or
    # returned — presumably kept for future use/debugging; confirm.
    requirement_sections = []
    # Find headers that likely contain requirement information.
    # NOTE(review): find_all(string=...) only matches headers whose text is a
    # single string node; headers containing nested tags may be skipped —
    # confirm against the actual catalog HTML.
    requirement_headers = soup.find_all(['h2', 'h3', 'h4'], string=lambda text: text and any(keyword in text.lower()
        for keyword in ['requirement', 'core', 'foundation', 'required', 'curriculum',
        'major', 'course', 'capstone', 'thesis', 'project', 'elective']))
    for header in requirement_headers:
        section_title = header.get_text(strip=True)
        section_content = []
        # Get all content until the next header
        current = header.next_sibling
        while current and not (hasattr(current, 'name') and current.name in ['h2', 'h3', 'h4']):
            if hasattr(current, 'get_text'):
                text = current.get_text(strip=True)
                if text:
                    section_content.append(text)
            elif isinstance(current, str) and current.strip():
                section_content.append(current.strip())
            current = current.next_sibling
        if section_content:
            section_text = ' '.join(section_content)
            # Categorize the section based on its title and content
            section_type = "unknown"
            # Check for capstone specifically first (highest priority)
            if any(keyword in section_title.lower() for keyword in ['capstone', 'thesis', 'project', 'senior']):
                section_type = "capstone"
                requirements["capstone"] = {
                    "title": section_title,
                    "content": section_text,
                    "courses": extract_course_codes(section_text)
                }
                # Validate capstone based on program type
                if program_type == "BS_DATA_SCIENCE":
                    # Check for STAT-427 for BS Data Science
                    if "stat-427" in section_text.lower() or "stat 427" in section_text.lower():
                        requirements["capstone"]["validated"] = True
                        requirements["capstone"]["credits"] = 3
                        requirements["capstone"]["course_title"] = "Statistical Machine Learning"
                    else:
                        requirements["capstone"]["validated"] = False
                else:
                    # For other programs, just extract course information
                    requirements["capstone"]["validated"] = True  # Assume valid for other programs
            # Check for electives
            elif any(keyword in section_title.lower() for keyword in ['elective', 'optional', 'choose']):
                section_type = "electives"
                requirements["electives"].append({
                    "title": section_title,
                    "content": section_text,
                    "courses": extract_course_codes(section_text)
                })
            # Check for core requirements
            elif any(keyword in section_title.lower() for keyword in ['requirement', 'core', 'required', 'foundation']):
                section_type = "core"
                requirements["core_requirements"].append({
                    "title": section_title,
                    "content": section_text,
                    "courses": extract_course_codes(section_text)
                })
            # Add this section to our list (currently unused — see NOTE above)
            requirement_sections.append({
                "title": section_title,
                "content": section_text,
                "type": section_type
            })
    # Extract total credits information from the page's full text
    credit_patterns = [
        r'total\s+of\s+(\d+)\s+credit',
        r'(\d+)\s+credits?\s+(?:are|is)\s+required',
        r'requires\s+(\d+)\s+credits?',
        r'minimum\s+of\s+(\d+)\s+credits?'
    ]
    full_text = soup.get_text()
    for pattern in credit_patterns:
        match = re.search(pattern, full_text, re.IGNORECASE)
        if match:
            try:
                requirements["total_credits"] = int(match.group(1))
                break
            except ValueError:
                pass
    # Program-specific validation
    if program_type == "BS_DATA_SCIENCE":
        # Known core courses for BS Data Science at American University.
        # NOTE(review): hard-coded list — verify against the current catalog.
        expected_core_courses = [
            "MATH-221", "MATH-222", "MATH-313", "STAT-203", "STAT-302",
            "CSC-280", "DATA-320", "STAT-412", "STAT-415"
        ]
        # Validate that all expected courses are in our core requirements
        found_courses = []
        for section in requirements["core_requirements"]:
            for course in section["courses"]:
                course_clean = clean_course_code(course)
                if course_clean in expected_core_courses and course_clean not in found_courses:
                    found_courses.append(course_clean)
        # Check coverage of expected courses (fraction of the expected list found)
        missing_courses = [c for c in expected_core_courses if c not in found_courses]
        requirements["core_coverage"] = len(found_courses) / len(expected_core_courses)
        if debug_mode:
            logger.debug(f"Found {len(found_courses)}/{len(expected_core_courses)} expected core courses")
            if missing_courses:
                logger.debug(f"Missing core courses: {', '.join(missing_courses)}")
    elif program_type == "MS_DATA_SCIENCE":
        # Different validation for MS Data Science
        # (Add expected courses for MS Data Science when available)
        pass
    # Log the results
    logger.info(f"Extracted {len(requirements['core_requirements'])} core requirement sections, {len(requirements['electives'])} elective sections")
    return requirements
def extract_course_codes(text):
    """Extract course codes (e.g. STAT-203, MATH 221, CSC280) from free text,
    normalized to DEPT-NUM form.

    Note: the pattern matches any 2-4 letters followed by 3-4 digits, so
    phrases like "Fall 2024" can produce false positives.
    """
    code_re = r'([A-Z]{2,4})[\s\-]?(\d{3,4}[A-Z]?)'
    found = re.findall(code_re, text, re.IGNORECASE)
    return [f"{dept.upper()}-{num}" for dept, num in found]
def clean_course_code(course_code):
    """Normalize a course code to DEPT-NUM (e.g. "stat 203" -> "STAT-203").

    Returns the input unchanged when it does not start with a recognizable
    course-code shape.
    """
    m = re.match(r'([A-Z]{2,4})[\s\-]?(\d{3,4}[A-Z]?)', course_code, re.IGNORECASE)
    return f"{m.group(1).upper()}-{m.group(2)}" if m else course_code
# Enhanced retrieval function to query for program requirements
def retrieve_validated_program_requirements(chroma_manager, program_name, debug_mode=False):
    """
    Retrieve and validate program requirements from ChromaDB.

    Args:
        chroma_manager: ChromaDB manager instance
        program_name (str): Name of the program to retrieve (must match the
            stored "program_name" metadata exactly for the filters below)
        debug_mode (bool): Whether to log debug information

    Returns:
        dict: Validated program requirements, or None when nothing matches
    """
    # Determine exact program type to avoid confusion between similar programs
    # Normalize program name for comparison
    normalized_program = program_name.lower().strip()
    # Identify the specific program (same exact-match mapping as
    # extract_validated_program_requirements)
    if normalized_program == "bs data science" or normalized_program == "b.s. data science":
        program_type = "BS_DATA_SCIENCE"
    elif normalized_program == "bs data sciences" or normalized_program == "b.s. data sciences":
        program_type = "BS_DATA_SCIENCES"  # Note the plural
    elif normalized_program == "ms data science" or normalized_program == "m.s. data science":
        program_type = "MS_DATA_SCIENCE"
    elif normalized_program == "ms data sciences" or normalized_program == "m.s. data sciences":
        program_type = "MS_DATA_SCIENCES"  # Note the plural
    else:
        # Generic handling for other programs
        program_type = "OTHER"
    if debug_mode:
        logger.debug(f"Retrieving requirements for: {program_name} (Type: {program_type})")
    # Query for program requirements with exact program name match
    query = f"requirements for {program_name}"
    # First try to find the program summary with exact program_name match.
    # NOTE(review): uses metadata_filter= while get_program_courses uses
    # where= on the same manager — confirm the wrapper supports both.
    summary_results = chroma_manager.query(
        query_text=query,
        n_results=5,
        metadata_filter={"program_name": program_name, "type": "program", "section_type": "program_summary"}
    )
    if summary_results and len(summary_results['ids']) > 0:
        # We found a program summary, which is most reliable
        if debug_mode:
            logger.debug(f"Found program summary for {program_name}")
        # Parse the summary to extract structured requirements.
        # NOTE(review): raw ChromaDB results nest documents per query
        # (documents[0] is a LIST of docs), yet this treats documents[0] as a
        # single string and metadatas[0] as a dict. That only works if the
        # wrapper flattens results — verify; otherwise the "in summary_text"
        # checks below become list-membership tests and silently fail.
        summary_text = summary_results['documents'][0]
        # Extract core requirements, electives, and capstone from summary
        requirements = {
            "program_name": program_name,
            "program_type": program_type,
            "department": summary_results['metadatas'][0].get('department', 'Unknown Department'),
            "core_requirements": [],
            "electives": [],
            "capstone": None
        }
        # Extract major requirements from summary (text between the
        # "REQUIRED COURSES" and "ELECTIVE COURSES" markers)
        if "REQUIRED COURSES" in summary_text:
            core_section = summary_text.split("REQUIRED COURSES")[1].split("ELECTIVE COURSES")[0] if "ELECTIVE COURSES" in summary_text else summary_text.split("REQUIRED COURSES")[1]
            requirements["core_requirements"] = [{
                "title": "Major Requirements",
                "content": core_section,
                "courses": extract_course_codes(core_section)
            }]
        # Extract electives (text after the "ELECTIVE COURSES" marker)
        if "ELECTIVE COURSES" in summary_text:
            elective_section = summary_text.split("ELECTIVE COURSES")[1]
            requirements["electives"] = [{
                "title": "Elective Courses",
                "content": elective_section,
                "courses": extract_course_codes(elective_section)
            }]
        return requirements
    # If we don't find a summary, query for individual requirement sections
    section_results = chroma_manager.query(
        query_text=query,
        n_results=10,
        metadata_filter={"program_name": program_name, "type": "program"}
    )
    if not section_results or len(section_results['ids']) == 0:
        logger.warning(f"No results found for {program_name} requirements")
        return None
    # Parse the results to extract structured requirements
    requirements = {
        "program_name": program_name,
        "program_type": program_type,
        "department": section_results['metadatas'][0].get('department', 'Unknown Department'),
        "core_requirements": [],
        "electives": [],
        "capstone": None
    }
    # Process each result.
    # NOTE(review): same flattening assumption as above — with raw ChromaDB
    # results, 'documents' is a list of lists and metadatas[i] would be a
    # list, not a dict with .get().
    for i, doc in enumerate(section_results['documents']):
        metadata = section_results['metadatas'][i]
        section_type = metadata.get('section_type', 'unknown')
        title = metadata.get('title', f"Section {i+1}")
        # Determine if this section contains requirements, electives, or capstone
        if section_type in ['required_courses', 'option_group']:
            requirements["core_requirements"].append({
                "title": title,
                "content": doc,
                "courses": extract_course_codes(doc)
            })
        elif section_type == 'elective_courses':
            requirements["electives"].append({
                "title": title,
                "content": doc,
                "courses": extract_course_codes(doc)
            })
        elif "capstone" in title.lower() or "senior" in title.lower():
            requirements["capstone"] = {
                "title": title,
                "content": doc,
                "courses": extract_course_codes(doc)
            }
    return requirements
def _append_course_section(response, section):
    """Append a bolded section title followed by its extracted course codes,
    or the raw section content when no codes were extracted."""
    response.append(f"**{section['title']}**")
    if section.get("courses"):
        for course in section["courses"]:
            response.append(f"- {course}")
    else:
        response.append(section["content"])


def generate_accurate_requirements_response(requirements, program_name):
    """
    Generate an accurate markdown response about program requirements.

    Handles the updated classification where required electives and minors
    may appear inside the core/required_courses category, deduplicating
    sections by title so nothing is listed twice.

    Args:
        requirements (dict): Validated program requirements (may be None)
        program_name (str): Name of the program

    Returns:
        str: Formatted response with accurate requirements, or an apology
        message when requirements is falsy.
    """
    if not requirements:
        return f"I'm sorry, but I couldn't find specific requirements for the {program_name} program. Please check the department website for the most up-to-date information."
    response = [f"# {program_name} Requirements", ""]
    # Header facts: department and total credits, when known.
    if requirements.get("department"):
        response.append(f"**Department:** {requirements['department']}")
        response.append("")
    if requirements.get("total_credits"):
        response.append(f"**Total Credits Required:** {requirements['total_credits']}")
        response.append("")
    # Core requirements, deduplicated by section title.
    if requirements.get("core_requirements"):
        response.append("## Core Requirements")
        displayed_sections = set()
        for section in requirements["core_requirements"]:
            if section['title'] in displayed_sections:
                continue
            displayed_sections.add(section['title'])
            _append_course_section(response, section)
        response.append("")
    # Capstone (BS Data Science has a validated, fixed capstone course).
    if requirements.get("capstone"):
        response.append("## Capstone Experience")
        capstone = requirements["capstone"]
        response.append(f"**{capstone['title']}**")
        program_type = requirements.get("program_type", "OTHER")
        if program_type == "BS_DATA_SCIENCE" and capstone.get("validated", False):
            response.append("**STAT-427: Statistical Machine Learning (3 credits)**")
            response.append("This course serves as the capstone experience for the Data Science program.")
        elif capstone.get("courses"):
            for course in capstone["courses"]:
                response.append(f"- {course}")
        else:
            response.append(capstone["content"])
        response.append("")
    # Minor / second major — only if not already covered by a core section.
    if requirements.get("minor_requirement") and not any(
        "minor" in section['title'].lower() for section in requirements.get("core_requirements", [])
    ):
        response.append("## Minor or Second Major Requirement")
        minor = requirements["minor_requirement"]
        response.append(f"**{minor['title']}**")
        response.append(minor["content"])
        response.append("")
    # Gather elective sections from both the core bucket (where required
    # electives may have been classified) and the explicit electives bucket,
    # deduplicating by title.
    required_electives = []
    elective_titles = set()
    for section in requirements.get("core_requirements") or []:
        if 'elective' in section['title'].lower() and section['title'] not in elective_titles:
            required_electives.append(section)
            elective_titles.add(section['title'])
    for section in requirements.get("electives") or []:
        if section['title'] not in elective_titles:
            required_electives.append(section)
            elective_titles.add(section['title'])
    if required_electives:
        response.append("## Elective Requirements")
        for section in required_electives:
            _append_course_section(response, section)
        response.append("")
    # Option groups, if present.
    if requirements.get("option_groups"):
        response.append("## Option Groups")
        for section in requirements["option_groups"]:
            _append_course_section(response, section)
        response.append("")
    # Closing accuracy disclaimer.
    response.append("*Note: These requirements are subject to change. Please consult with an academic advisor or refer to the official program documentation for the most current information.*")
    return "\n".join(response)
# Example usage for BS Data Science
# requirements = retrieve_validated_program_requirements(chroma_manager, "BS Data Science", debug_mode=True)
# response = generate_accurate_requirements_response(requirements, "BS Data Science")
# print(response)
class AcademicChatbot:
    """
    A RAG-based chatbot for answering questions about academic programs and courses
    using Mistral 7B model and ChromaDB for retrieval.

    Retrieval goes through the module-level ChromaDB manager; generation calls
    the Mistral endpoint configured in the module-level constants.
    """
    def __init__(self):
        """Initialize the chatbot with ChromaDB and model configuration.

        Relies on module-level globals defined earlier in this file:
        ``global_chroma_manager``, ``MISTRAL_API_URL``, ``MISTRAL_HEADERS``.

        Raises:
            ValueError: If the Mistral API headers are empty, which indicates
                the Hugging Face API key was never configured.
        """
        # Reuse the existing instance
        self.chroma_manager = global_chroma_manager
        self.collection = self.chroma_manager.get_collection()
        # Use global configuration
        self.api_url = MISTRAL_API_URL
        self.headers = MISTRAL_HEADERS # Use the globally defined headers
        # Running transcript of {"role": ..., "content": ...} message dicts.
        self.conversation_history = []
        # Add a check to ensure headers are properly initialized
        if not self.headers:
            logger.warning("Mistral API headers not properly configured. Regenerate API credentials.")
            raise ValueError("Failed to initialize Mistral API headers. Check API key configuration.")
def add_message(self, role: str, content: str):
"""Add a message to the conversation history."""
self.conversation_history.append({"role": role, "content": content})
def clear_history(self):
"""Clear the conversation history."""
self.conversation_history = []
def get_history(self):
"""Get the conversation history."""
return self.conversation_history
def get_url_from_metadata(self, metadata):
"""Extract URL from metadata, checking multiple possible field names."""
# Check various possible field names for URLs
url_field_names = ['url', 'course_url', 'source_url', 'link', 'href', 'source']
for field in url_field_names:
if field in metadata and metadata[field]:
return metadata[field]
# If no URL field found, return empty string
return ''
def retrieve_context(self, query: str, n_results: int = 8) -> Tuple[List[str], List[Dict[str, Any]]]:
"""
Retrieve diverse and relevant documents from ChromaDB based on the query.
Args:
query: The user's question
n_results: Number of documents to retrieve
Returns:
Tuple containing (contexts, metadata)
"""
logger.info(f"Retrieving context for query: {query}")
# Use expanded query with academic terminology
expanded_query = expand_query_with_academic_terms(query)
logger.info(f"Expanded query: {expanded_query}")
# Retrieve more documents than needed to improve diversity
retrieve_count = min(n_results * 3, 25) # Limit to 25 to avoid excessive retrieval
results = self.chroma_manager.query(expanded_query, n_results=retrieve_count)
# Extract the documents and their metadata
contexts = []
metadata_list = []
if 'documents' in results and results['documents']:
documents = results['documents'][0]
metadatas = results['metadatas'][0] if 'metadatas' in results and results['metadatas'] else [{}] * len(documents)
# Track URLs to ensure diversity
seen_urls = set()
seen_titles = set()
# First pass: group by URL and title
doc_groups = {}
for doc, meta in zip(documents, metadatas):
url = meta.get('url', '') if meta else ''
title = meta.get('title', '') if meta else ''
key = (url, title)
if key not in doc_groups:
doc_groups[key] = []
doc_groups[key].append((doc, meta))
# Second pass: select one document from each group until we have enough
while len(contexts) < n_results and doc_groups:
for key in list(doc_groups.keys()):
if doc_groups[key]:
doc, meta = doc_groups[key].pop(0)
contexts.append(doc)
metadata_list.append(meta)
if not doc_groups[key]: # If group is empty, remove it
del doc_groups[key]
if len(contexts) >= n_results:
break
# If we still need more documents, fill in from the original list
if len(contexts) < n_results:
i = 0
while len(contexts) < n_results and i < len(documents):
if documents[i] not in contexts:
contexts.append(documents[i])
metadata_list.append(metadatas[i])
i += 1
logger.info(f"Retrieved {len(contexts)} context documents")
return contexts, metadata_list
    def merge_program_documents(self, docs, metas, max_chars=15000):
        """Merge documents by category to create comprehensive context.

        Buckets each retrieved document into a requirements category based on
        keywords in its title, then assembles a handful of large composite
        documents (comprehensive / general / major / additional / other).

        Returns:
            (output_docs, all_sources): the merged composite documents and
            the FULL list of input metadata.

        NOTE(review): ``output_metas`` is built below but never returned —
        the function returns every input meta instead. Callers that
        ``zip(contexts, metadata)`` will therefore pair merged doc #1 with
        the first original meta, so per-document titles/URLs in the prompt
        may be wrong. Confirm whether returning all metas (for full source
        citation) was intentional before changing it.
        """
        # Create category containers for all program information sections
        categories = {
            "comprehensive": {"content": "", "sources": []},
            "core": {"content": "", "sources": []},
            "electives": {"content": "", "sources": []},
            "minor": {"content": "", "sources": []},
            "capstone": {"content": "", "sources": []},
            "ethics": {"content": "", "sources": []},
            "admission": {"content": "", "sources": []},
            "au_core": {"content": "", "sources": []},
            "university_requirements": {"content": "", "sources": []},
            "major_requirements": {"content": "", "sources": []},
            "other": {"content": "", "sources": []}
        }
        # Process each document and categorize by keywords in its title.
        # Order of the elif chain matters: more specific titles win.
        for i, (doc, meta) in enumerate(zip(docs, metas)):
            title = meta.get("title", "").lower() if meta else ""
            # Determine the appropriate category
            if "complete" in title and "requirements" in title:
                category = "comprehensive"
            elif "elective" in title:
                category = "electives"
            elif "minor" in title or "second major" in title:
                category = "minor"
            elif "capstone" in title:
                category = "capstone"
            elif "ethics" in title:
                category = "ethics"
            elif "admission" in title or "apply" in title:
                category = "admission"
            elif "au core" in title or "general education" in title:
                category = "au_core"
            elif "university requirement" in title:
                category = "university_requirements"
            elif "major requirement" in title:
                category = "major_requirements"
            elif any(term in title for term in ["statistics", "data science essentials", "intermediate"]):
                category = "core"
            else:
                category = "other"
            # Add content to the appropriate category
            categories[category]["content"] += f"\n\n## {meta.get('title', '')}\n{doc}"
            categories[category]["sources"].append(i)
        # Create output documents ensuring all major categories are included
        output_docs = []
        output_metas = []  # NOTE(review): dead stores — appended to but never returned/used
        source_indices = set()
        # First add comprehensive document if available
        if categories["comprehensive"]["content"]:
            output_docs.append(categories["comprehensive"]["content"])
            output_metas.append({"title": "Complete Program Requirements"})
            source_indices.update(categories["comprehensive"]["sources"])
        # Create a document for university and general requirements
        general_content = "# General Program Requirements\n"
        general_sources = []
        # Add sections for university requirements, AU Core, admission
        for cat_name, display_name in [
            ("university_requirements", "University Requirements"),
            ("au_core", "AU Core Requirements"),
            ("admission", "Admission Requirements")
        ]:
            if categories[cat_name]["content"]:
                general_content += f"\n\n# {display_name}\n{categories[cat_name]['content']}"
                general_sources.extend(categories[cat_name]["sources"])
        # Add general requirements document if not empty (i.e. more than the bare header)
        if general_content.strip() != "# General Program Requirements":
            output_docs.append(general_content)
            output_metas.append({"title": "General Requirements"})
            source_indices.update(general_sources)
        # Create a document for major requirements
        major_content = "# Major Requirements\n"
        # Add major requirements section
        if categories["major_requirements"]["content"]:
            major_content += categories["major_requirements"]["content"]
        # Add core course sections
        if categories["core"]["content"]:
            major_content += "\n\n# Core Course Requirements\n" + categories["core"]["content"]
        # Add the major requirements document
        if major_content.strip() != "# Major Requirements":
            output_docs.append(major_content)
            output_metas.append({"title": "Major Requirements"})
            source_indices.update(categories["major_requirements"]["sources"])
            source_indices.update(categories["core"]["sources"])
        # Create a document for additional requirements
        additional_content = "# Additional Program Requirements\n"
        additional_sources = []
        # Add sections for electives, minor, capstone, ethics
        for cat_name, display_name in [
            ("electives", "Elective Requirements"),
            ("minor", "Minor or Second Major Requirements"),
            ("capstone", "Capstone Requirements"),
            ("ethics", "Ethics Requirements")
        ]:
            if categories[cat_name]["content"]:
                additional_content += f"\n\n# {display_name}\n{categories[cat_name]['content']}"
                additional_sources.extend(categories[cat_name]["sources"])
        # Add additional requirements document
        if additional_content.strip() != "# Additional Program Requirements":
            output_docs.append(additional_content)
            output_metas.append({"title": "Additional Requirements"})
            source_indices.update(additional_sources)
        # Check if we're under the character limit before adding leftovers
        total_chars = sum(len(doc) for doc in output_docs)
        # Add other content if space permits
        if categories["other"]["content"] and total_chars + len(categories["other"]["content"]) <= max_chars:
            other_content = "# Other Program Information\n" + categories["other"]["content"]
            output_docs.append(other_content)
            output_metas.append({"title": "Other Information"})
            source_indices.update(categories["other"]["sources"])
        # Make sure we have all metadata for sources (returns ALL input metas,
        # not one meta per merged document — see docstring note).
        all_sources = []
        for i in range(len(metas)):
            all_sources.append(metas[i])
        logger.info(f"Merged {len(docs)} documents into {len(output_docs)} comprehensive documents (Total chars: {sum(len(d) for d in output_docs)})")
        return output_docs, all_sources
    # Find and prioritize required course documents for this specific program
    def trim_documents(self, docs, metas, max_chars=12000):
        """Trim documents to avoid token overload while ensuring all requirements are included.

        For program-requirement queries (detected from ``self.current_query``)
        documents are added in priority order — comprehensive requirements,
        required-course sections, then one each of minor/capstone/elective
        sections — each only if it still fits under ``max_chars``. For all
        other queries, documents are kept in retrieval order until the
        budget is exhausted (always keeping at least one).
        """
        output_docs, output_metas = [], []
        total_chars = 0
        # First identify documents for this program that are required_courses.
        # current_query is set by generate_response before this is called;
        # getattr guards against being called before any query was processed.
        query = getattr(self, "current_query", None)
        query_info = process_program_query(query) if isinstance(query, str) else None
        if query_info:
            logger.info(f"[trim_documents] query_info: {query_info} | program_name: {query_info.get('program_name')}")
        program_name = (query_info.get("program_name") or "").lower() if query_info else ""
        # If this is a program requirement query, prioritize required documents
        if program_name:
            # First add comprehensive document if available
            comprehensive_index = None
            for i, meta in enumerate(metas):
                title = meta.get("title", "").lower() if meta else ""
                if "complete" in title and "requirement" in title and program_name in meta.get("program_name", "").lower():
                    comprehensive_index = i
                    break
            if comprehensive_index is not None and total_chars + len(docs[comprehensive_index]) <= max_chars:
                output_docs.append(docs[comprehensive_index])
                output_metas.append(metas[comprehensive_index])
                total_chars += len(docs[comprehensive_index])
            # Then add all required course documents for this program
            for i, meta in enumerate(metas):
                # Skip if already added
                if i == comprehensive_index:
                    continue
                # Check if document is a required course for this program
                is_required = meta.get("section_type", "") == "required_courses"
                is_this_program = program_name in meta.get("program_name", "").lower()
                # Add if it's a required document and fits within our limit
                if is_required and is_this_program and total_chars + len(docs[i]) <= max_chars:
                    output_docs.append(docs[i])
                    output_metas.append(metas[i])
                    total_chars += len(docs[i])
            # Make sure minor/second major is included (first match only)
            has_minor = any("minor" in meta.get("title", "").lower() or "second major" in meta.get("title", "").lower()
                            for meta in output_metas)
            if not has_minor:
                for i, meta in enumerate(metas):
                    if "minor" in meta.get("title", "").lower() or "second major" in meta.get("title", "").lower():
                        if total_chars + len(docs[i]) <= max_chars:
                            output_docs.append(docs[i])
                            output_metas.append(metas[i])
                            total_chars += len(docs[i])
                        break
            # Make sure capstone is included (first match only)
            has_capstone = any("capstone" in meta.get("title", "").lower() for meta in output_metas)
            if not has_capstone:
                for i, meta in enumerate(metas):
                    if "capstone" in meta.get("title", "").lower():
                        if total_chars + len(docs[i]) <= max_chars:
                            output_docs.append(docs[i])
                            output_metas.append(metas[i])
                            total_chars += len(docs[i])
                        break
            # Make sure electives are included (first match only)
            has_electives = any("elective" in meta.get("title", "").lower() for meta in output_metas)
            if not has_electives:
                for i, meta in enumerate(metas):
                    if "elective" in meta.get("title", "").lower():
                        if total_chars + len(docs[i]) <= max_chars:
                            output_docs.append(docs[i])
                            output_metas.append(metas[i])
                            total_chars += len(docs[i])
                        break
        # If we haven't added any documents yet (or this isn't a program query),
        # fall back to the original trim behavior
        if not output_docs:
            for doc, meta in zip(docs, metas):
                # Always include at least one document
                if len(output_docs) == 0 or total_chars + len(doc) <= max_chars:
                    output_docs.append(doc)
                    output_metas.append(meta)
                    total_chars += len(doc)
                else:
                    break
        logger.info(f"Trimmed documents from {len(docs)} to {len(output_docs)} (Total chars: {total_chars})")
        return output_docs, output_metas
    def generate_response(self, query: str, contexts: List[str],
                          metadata: List[Dict[str, Any]], temperature: float = 0.7) -> str:
        """
        Generate a response using Mistral 7B with retrieved contexts.

        Control flow: for program course queries, two specialized paths are
        tried first (validated requirements, then basic course info) — each
        returns directly on success; on failure or non-course queries the
        method falls through to the generic RAG prompt + Hugging Face call.

        NOTE(review): the source-citation building loop is duplicated three
        times in this method — a good candidate for a private helper.

        Args:
            query: The user's question
            contexts: Retrieved document contents
            metadata: Metadata for the retrieved documents
            temperature: Controls randomness in generation
        Returns:
            Generated response (or an error string on API/exception failure)
        """
        logger.info(f"Generating response for query: {query}")
        # Store current query for use in other methods (read by trim_documents)
        self.current_query = query
        if not isinstance(query, str) or not query.strip():
            logger.warning("Query is missing or not a string.")
            return "No query provided."
        # First check if this is a program course query that we should handle specially
        query_info = process_program_query(query)
        if query_info["is_course_query"] and query_info["program_name"]:
            logger.info(f"Detected course query for program: {query_info['program_name']}, type: {query_info['course_type']}")
            # First try to use the validated program requirements approach
            try:
                # Use the validated program requirements retrieval
                requirements = retrieve_validated_program_requirements(
                    self.chroma_manager,
                    query_info["program_name"],
                    debug_mode=False
                )
                # If we have validated requirements, use them to generate a response;
                # if empty, fall through to the basic approach below.
                if requirements:
                    logger.info(f"Using validated requirements for {query_info['program_name']}")
                    response = generate_accurate_requirements_response(
                        requirements,
                        query_info["program_name"]
                    )
                    # Add sources
                    sources = []
                    for i, meta in enumerate(metadata):
                        if meta:
                            title = meta.get("title", "")
                            url = self.get_url_from_metadata(meta)
                            if url:
                                if title:
                                    citation = f"[{i+1}] {title} - {url}"
                                else:
                                    citation = f"[{i+1}] Program information - {url}"
                            else:
                                if title:
                                    citation = f"[{i+1}] {title}"
                                else:
                                    citation = f"[{i+1}] Program information"
                            sources.append(citation)
                    if sources:
                        # Identify sources referenced in response
                        used_source_indexes = set()
                        for i in range(len(sources)):
                            # Look for [1], [2], etc. references in the text
                            if f"[{i+1}]" in response:
                                used_source_indexes.add(i)
                        # If we found referenced sources, show them first
                        if used_source_indexes:
                            response += "\n\nSources Referenced in Response:"
                            for i in sorted(used_source_indexes):
                                response += f"\n{sources[i]}"
                        # Add all retrieved sources section
                        response += "\n\nAll Retrieved Sources:"
                        for source in sources:
                            response += f"\n{source}"
                    return response
            except Exception as e:
                logger.error(f"Error using validated requirements approach: {str(e)}")
                # Fall back to the regular course information retrieval
            # Fall back to the basic course information approach if validation fails
            try:
                program_courses = get_program_course_information(
                    query_info["program_name"],
                    query_info["course_type"]
                )
                # If we got results, return them directly
                if program_courses and "No courses found" not in program_courses:
                    program_name = query_info["program_name"].title()
                    # Create a nicely formatted response with introduction
                    response = f"Here's information about the {program_name} program courses:\n\n{program_courses}"
                    # Add sources from metadata
                    sources = []
                    for i, meta in enumerate(metadata):
                        if meta:
                            title = meta.get("title", "")
                            url = self.get_url_from_metadata(meta)
                            if url:
                                if title:
                                    citation = f"[{i+1}] {title} - {url}"
                                else:
                                    citation = f"[{i+1}] Program information - {url}"
                            else:
                                if title:
                                    citation = f"[{i+1}] {title}"
                                else:
                                    citation = f"[{i+1}] Program information"
                            sources.append(citation)
                    if sources:
                        # Identify sources referenced in response
                        used_source_indexes = set()
                        for i in range(len(sources)):
                            # Look for [1], [2], etc. references in the text
                            if f"[{i+1}]" in response:
                                used_source_indexes.add(i)
                        # If we found referenced sources, show them in a separate section
                        if used_source_indexes:
                            response += "\n\nSources Referenced in Response:"
                            for i in sorted(used_source_indexes):
                                response += f"\n{sources[i]}"
                        # Add all retrieved sources section
                        response += "\n\nAll Retrieved Sources:"
                        for source in sources:
                            response += f"\n{source}"
                    return response
            except Exception as e:
                logger.error(f"Error handling specialized course query: {str(e)}")
                # Fall back to regular processing if there's an error
        # Trim documents to avoid token limits.
        # For program requirement queries, use document merging instead of trimming
        if query_info["is_course_query"] and query_info["program_name"]:
            contexts, metadata = self.merge_program_documents(contexts, metadata, max_chars=12000)
        else:
            # For other queries, use regular trimming
            contexts, metadata = self.trim_documents(contexts, metadata, max_chars=10000)
        # Create a structured context from retrieved documents with their URLs.
        # NOTE(review): after merge_program_documents, metadata still holds the
        # original (unmerged) metas, so zip() may pair merged docs with the
        # wrong titles here — confirm intent (see merge_program_documents).
        enhanced_contexts = []
        for i, (doc, meta) in enumerate(zip(contexts, metadata)):
            source_type = meta.get("type", "document")
            title = meta.get("title", "")
            url = self.get_url_from_metadata(meta)
            # Limit document length to prevent token overflow
            doc_preview = doc[:1500] + ("..." if len(doc) > 1500 else "")
            # Format document with metadata
            doc_header = f"Document {i+1} ({source_type.capitalize()}"
            if title:
                doc_header += f": {title}"
            if url:
                doc_header += f" - {url}"
            doc_header += "):"
            enhanced_contexts.append(f"{doc_header}\n{doc_preview}")
        # Include conversation history in the prompt
        history_text = ""
        if self.conversation_history:
            recent_history = self.conversation_history[-3:] # Include only the last 3 messages
            if recent_history:
                history_text = "### Recent Conversation:\n"
                for msg in recent_history:
                    role = "User" if msg["role"] == "user" else "Assistant"
                    history_text += f"{role}: {msg['content']}\n\n"
        # Format the full prompt with context and query
        context_text = "\n\n".join(enhanced_contexts)
        prompt = f"""You are an AI assistant answering questions about American University's academic programs and courses.
Use the following documents as your primary source of information.
Important rules:
- If the answer is not explicitly stated, you may reason from the information provided, but explain your reasoning.
- Courses marked as "must be completed", "prerequisites", or "required" are mandatory.
- When you see "one of the following" or "either X or Y", students must choose exactly one course from the options.
- When you see "option group", students must select some number of courses from that group.
- Courses listed as electives form a group from which a certain number must be completed, but not every course.
- Always mention the source document when including specific information.
- If you don't know or the information is not in the documents, be honest about it.
- For Data Science programs, STAT-427 (Statistical Machine Learning) is the 3-credit capstone course.
- Undergraduate courses have numbers 499 and below, graduate courses open to qualified undergraduates have numbers 500-599,
core graduate courses have numbers 600-699, and advanced graduate courses have numbers 700-799.
{history_text if history_text else ""}
### Context:
{context_text}
### Question:
{query}
"""
        # For program requirement queries, use a more comprehensive format.
        # Use different prompts based on query type
        logger.info(f"Processing query in process_program_query instructions: {repr(query)}")
        if isinstance(query, str) and ("course requirement" in query.lower() or "program requirement" in query.lower()):
            prompt += """
IMPORTANT: Your response should include ALL required components for this degree program.
Ensure you cover all sections mentioned in the documents, including:
- All core course requirements with their credit hours
- Any elective requirements with credit hours
- Any minor or second major requirements
- Any capstone or project requirements
Present requirements in a clear, organized format that makes the degree structure easy to understand.
DO NOT OMIT any requirements or sections mentioned in the documents.
"""
        prompt += "\n\n### Answer:"
        logger.info(f"Processed query in instructions: {repr(query)}")
        # Call Hugging Face API
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 4000,
                "temperature": temperature,
                "top_p": 0.85,
                "do_sample": True
            }
        }
        try:
            response = requests.post(self.api_url, headers=self.headers, json=payload)
            if response.status_code == 200:
                # Extract the answer part from the response (text after the
                # final "### Answer:" marker we appended to the prompt)
                generated_text = response.json()[0]["generated_text"]
                answer = generated_text.split("### Answer:")[-1].strip()
                # IMPORTANT CHANGE: Always replace the sources section.
                # Remove any existing sources section the model produced
                if "\n\nSources:" in answer:
                    answer = answer.split("\n\nSources:")[0].strip()
                # Add our properly formatted sources
                sources = []
                for i, meta in enumerate(metadata):
                    if meta:
                        source_type = meta.get("type", "document")
                        title = meta.get("title", "")
                        url = self.get_url_from_metadata(meta)
                        # Build citation with URL
                        if url:
                            if title:
                                citation = f"[{i+1}] {title} - {url}"
                            else:
                                citation = f"[{i+1}] {source_type.capitalize()} - {url}"
                        else:
                            if title:
                                citation = f"[{i+1}] {title}"
                            else:
                                citation = f"[{i+1}] {source_type.capitalize()}"
                        sources.append(citation)
                # Always add our formatted sources with new organization
                if sources:
                    # Identify sources referenced in response
                    used_source_indexes = set()
                    for i in range(len(sources)):
                        # Look for [1], [2], etc. references in the text
                        if f"[{i+1}]" in answer:
                            used_source_indexes.add(i)
                    # If we found referenced sources, show them in a separate section
                    if used_source_indexes:
                        answer += "\n\nSources Referenced in Response:"
                        for i in sorted(used_source_indexes):
                            answer += f"\n{sources[i]}"
                    # Add all retrieved sources section
                    answer += "\n\nAll Retrieved Sources:"
                    for source in sources:
                        answer += f"\n{source}"
                return answer
            else:
                error_msg = f"Error: {response.status_code}, {response.text}"
                logger.error(error_msg)
                return error_msg
        except Exception as e:
            error_msg = f"Exception during response generation: {str(e)}"
            logger.error(error_msg)
            return error_msg
def add_document(self, text: str, metadata: Dict[str, Any], doc_id: Optional[str] = None) -> str:
"""Add a document to the ChromaDB collection."""
return self.chroma_manager.add_document(text, metadata, doc_id)
def get_collection_info(self) -> Dict[str, Any]:
"""Get information about the ChromaDB collection."""
return self.collection.get()
def ask(self, query: str, n_results: int = 8, temperature: float = 0.7) -> Dict[str, Any]:
"""
Process a query and return a response with relevant context.
Args:
query: The user's question
n_results: Number of documents to retrieve
temperature: Controls randomness in generation
Returns:
Dictionary with response and context information
"""
# Add user message to history
self.add_message("user", query)
# Retrieve context
contexts, metadata = self.retrieve_context(query, n_results)
# Check if we found relevant documents
if not contexts:
response = "I couldn't find any relevant information to answer your question. Could you please rephrase or ask about a different topic related to American University's programs or courses?"
else:
# Generate response - NO CHUNKING, get full response
response = self.generate_response(query, contexts, metadata, temperature)
# Instead of chunking, truncate if absolutely necessary (rarely needed with 4000 token limit)
if len(response) > 15000: # Very high limit just as a safeguard
response = response[:14800] + "...\n\n[Response truncated due to length. Please ask for specific details if needed.]"
# Add assistant message to history
self.add_message("assistant", response)
# Return the result with context information
return {
"response": response,
"contexts": contexts,
"metadata": metadata,
"history": self.conversation_history
}
# Create the module-level singleton used by the convenience wrappers below.
# NOTE: this runs at import time and requires the ChromaDB manager and
# Mistral credentials to be configured.
chatbot = AcademicChatbot()
# Convenience function for direct usage
def ask_question(query: str, n_results: int = 10, temperature: float = 0.7) -> Dict[str, Any]:
    """Ask a question to the chatbot.

    Thin wrapper around ``chatbot.ask``. An earlier duplicate definition of
    this function (with ``n_results=8``) was shadowed by this one at import
    time and has been removed; the effective default of 10 is unchanged.
    """
    return chatbot.ask(query, n_results, temperature)
# Convenience function to clear conversation history.
# NOTE(review): `clear_conversation` is redefined later in this module; the
# later definition wins at import time, so keep the two in sync (or remove
# the redundant one).
def clear_conversation():
    """Clear the singleton chatbot's conversation history."""
    chatbot.clear_history()
# Convenience function to add a document
def add_document(text: str, metadata: Dict[str, Any], doc_id: Optional[str] = None) -> str:
    """Insert a document into the collection via the module singleton."""
    new_id = chatbot.add_document(text, metadata, doc_id)
    return new_id
# Helper used by CLI display code to break very long answers into parts.
def split_long_response(response: str, max_chunk_size: int = 3500) -> List[str]:
    """
    Split a long response into manageable chunks while preserving whole sentences.

    Fixes over the previous version:
    - no empty leading chunk is produced when a single sentence alone exceeds
      ``max_chunk_size`` (a chunk is only flushed when it is non-empty);
    - reuses the module-level ``re`` import instead of re-importing inside a
      nested helper.

    Args:
        response (str): The full response text
        max_chunk_size (int): Maximum size of each chunk in characters

    Returns:
        List[str]: List of response chunks. Each chunk gets a
        "(Continued...)" / "(End of response...)" part marker appended, so
        multi-part chunks may slightly exceed ``max_chunk_size`` after the
        marker; an oversized single sentence is kept whole in its own chunk.
    """
    # Short responses are returned untouched, with no part markers.
    if len(response) <= max_chunk_size:
        return [response]
    # Split on whitespace that follows sentence-ending punctuation.
    sentences = re.split(r'(?<=[.!?])\s+', response)
    chunks: List[str] = []
    current_chunk: List[str] = []
    current_chunk_length = 0
    for sentence in sentences:
        # Flush the current (non-empty) chunk if adding this sentence would
        # exceed the size budget.
        if current_chunk and current_chunk_length + len(sentence) > max_chunk_size:
            chunks.append(' '.join(current_chunk))
            current_chunk = []
            current_chunk_length = 0
        current_chunk.append(sentence)
        current_chunk_length += len(sentence) + 1  # +1 for the joining space
    # Add the last chunk if not empty
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    # Append "Part i/N" continuation markers so multi-part replies read clearly.
    total = len(chunks)
    for i in range(total):
        if i < total - 1:
            chunks[i] += f"\n\n(Continued in next message - Part {i+1}/{total})"
        else:
            chunks[i] += f"\n\n(End of response - Part {i+1}/{total})"
    return chunks
def generate_response_with_mistral(prompt, temperature):
    """
    Generate response using Mistral 7B via Hugging Face API.

    Args:
        prompt: Fully formatted prompt for the model
        temperature: Sampling temperature for response generation

    Returns:
        Generated response as a string, or an error description on failure.

    Raises:
        ValueError: If no Hugging Face API key is configured.
    """
    if not HF_API_KEY:
        raise ValueError("Hugging Face API key not found. Please configure credentials.")
    try:
        # Initialize Hugging Face client
        client = InferenceClient(
            "mistralai/Mistral-7B-Instruct-v0.3",
            token=HF_API_KEY
        )
        # Generate response
        response = client.text_generation(
            prompt,
            max_new_tokens=4096,  # Increased token limit
            temperature=temperature,
            stop_sequences=["\n\nUser:"],  # Prevent generating additional conversations
        )
        return response.strip()
    except Exception as e:
        error_msg = f"Error generating response with Mistral: {e}"
        logger.error(error_msg)
        return error_msg
    # NOTE: an unreachable duplicate of the try/except block above (dead code
    # after both return paths) was removed here.
def clear_conversation():
    """
    Clear the conversation history.

    Bug fix: this used to be a `pass` stub that shadowed (and silently
    disabled) the working `clear_conversation` defined earlier in the
    module — callers importing this name never actually reset the history.
    It now delegates to the singleton chatbot.
    """
    chatbot.clear_history()
# Optional: Add a function to retrieve full response chunks if needed
def get_full_response_chunks(result):
    """
    Retrieve all chunks of a potentially long response.

    Args:
        result (Dict): Result from ask_question

    Returns:
        List[str]: All response chunks; falls back to a one-element list
        holding the plain "response" text (or "") when no explicit chunk
        list is present.
    """
    fallback = [result.get('response', '')]
    return result.get('full_response_chunks', fallback)
def initialize_chatbot():
    """
    Initialize the chatbot with a welcome message and system setup.

    Purely informational: builds the static greeting shown at the start of an
    interactive session. It does not touch the model or the database.

    Returns:
        Dict[str, str]: Initial chatbot response ("response" and "sources" keys)
    """
    welcome_message = """Welcome to the American University Academic Advisor Chatbot!
I'm here to help you with information about:
- Academic programs
- Course details
- Program requirements
- Academic policies
What would you like to know about American University's academic offerings?
Some example questions you can ask:
- Tell me about the Data Science program
- What are the requirements for a Data Science major?
- What courses are required for a Statistics minor?
- Can you help me understand the AU Core curriculum?
Feel free to ask, and I'll do my best to provide comprehensive and helpful information!"""
    return {
        "response": welcome_message,
        "sources": "AU Academic Advisor Chatbot - Initial Welcome Message"
    }
def get_chatbot_info():
    """
    Provide information about the chatbot's capabilities and sources.

    Static text only — no retrieval or generation is performed.

    Returns:
        Dict[str, str]: Chatbot information ("response" and "sources" keys)
    """
    info_message = """πŸ€– AU Academic Advisor Chatbot Information
Data Sources:
- American University's official website
- Course catalog
- Program description pages
- Academic department information
Technologies Used:
- Retrieval-Augmented Generation (RAG)
- Mistral 7B Language Model
- ChromaDB Vector Database
- Sentence Transformers for Embedding
Capabilities:
- Retrieve detailed information about academic programs
- Explain course requirements
- Provide insights into academic policies
- Offer guidance on course selection
Limitations:
- Information is based on available web sources
- Might not reflect the most recent updates
- Recommended to verify critical information with official AU sources
Developed as a student research project to assist with academic advising.
"""
    return {
        "response": info_message,
        "sources": "AU Academic Advisor Chatbot - System Information"
    }
def interactive_chat():
    """
    Run an interactive chat session in the command line.
    Updated to handle multi-part responses.
    """
    print("πŸ€– AU Academic Advisor Chatbot - Interactive Mode")
    print("Type 'quit', 'exit', or 'q' to end the conversation.")
    print("Type 'info' to get information about the chatbot.\n")
    # Greet the user with the standard welcome message.
    init_response = initialize_chatbot()
    print("πŸ€– ", init_response["response"])
    print("\n--- How can I help you today? ---\n")
    while True:
        try:
            command = input("You: ").strip()
            lowered = command.lower()
            # Exit commands end the session.
            if lowered in ('quit', 'exit', 'q'):
                print("\nπŸ€– Thank you for using the AU Academic Advisor Chatbot. Goodbye!")
                break
            # 'info' prints capability details instead of running a query.
            if lowered == 'info':
                info_response = get_chatbot_info()
                print("πŸ€– ", info_response["response"])
                continue
            # Ignore empty input lines entirely.
            if not command:
                continue
            print("\nπŸ€– Thinking...\n")
            result = ask_question(command)
            # Prefer the assembled full response when one is available.
            text = result["full_response"] if "full_response" in result else result["response"]
            print("πŸ€– ", text)
            # List retrieval sources when any metadata came back.
            if "metadata" in result and result["metadata"]:
                print("\n--- Sources ---")
                for i, meta in enumerate(result["metadata"]):
                    source = meta.get('url', 'Unknown Source')
                    title = meta.get('title', 'Untitled')
                    print(f"{i+1}. {title} - {source}")
            print("\n")
        except KeyboardInterrupt:
            print("\n\nπŸ€– Chat interrupted. Type 'quit' to exit.")
        except Exception as e:
            print(f"\nπŸ€– An error occurred: {e}")
# Run the interactive chat when the script is executed directly
if __name__ == "__main__":
    try:
        interactive_chat()
    except Exception as e:
        # Last-resort handler: report the failure and its traceback so CLI
        # users can see why the session died.
        print(f"An unexpected error occurred: {e}")
        import traceback
        traceback.print_exc()
# Public API of this module when imported elsewhere.
# Fix: 'add_document' and 'get_full_response_chunks' are documented public
# convenience helpers defined above but were missing from the export list;
# adding exports is backward compatible.
__all__ = [
    'ask_question',
    'initialize_chatbot',
    'get_chatbot_info',
    'clear_conversation',
    'split_long_response',
    'interactive_chat',
    'add_document',
    'get_full_response_chunks'
]