Spaces:
Sleeping
Sleeping
| <file_map> | |
| C:/Users/User/Desktop/colin-tts | |
| βββ backend | |
| β βββ archive | |
| β β βββ build_index.py | |
| β β βββ data_creation.py | |
| β β βββ data.json | |
| β β βββ local_testing.py | |
| β βββ analyze_test_results.py | |
| β βββ auth.py | |
| β βββ database.py | |
| β βββ llm_service.py | |
| β βββ main.py | |
| β βββ models.py | |
| β βββ structured_build_index.py | |
| β βββ utils.py | |
| βββ run.txt | |
| </file_map> | |
| <file_contents> | |
| File: C:/Users/User/Desktop/colin-tts/backend/analyze_test_results.py | |
| ```python | |
| #!/usr/bin/env python | |
| """ | |
| Model Testing Data Analyzer | |
| This script provides comprehensive analysis tools for the model testing results. | |
| It can generate detailed reports, comparisons, and visualizations from the test data. | |
| """ | |
| import sqlite3 | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
| import argparse | |
| from rich.console import Console | |
| from rich.table import Table | |
| BASE_DIR = Path(__file__).parent.parent | |
| RESULTS_DIR = BASE_DIR / "test_results" | |
| console = Console() | |
| class ModelTestAnalyzer: | |
| """Analyze model testing results""" | |
| def __init__(self, db_path: Optional[Path] = None): | |
| self.db_path = db_path or RESULTS_DIR / "model_tests.db" | |
| self.df = None | |
| def load_data(self, session_id: Optional[str] = None) -> pd.DataFrame: | |
| """Load data from SQLite database""" | |
| if not self.db_path.exists(): | |
| console.print(f"[red]Database not found: {self.db_path}[/red]") | |
| return pd.DataFrame() | |
| query = "SELECT * FROM test_results" | |
| params = [] | |
| if session_id: | |
| query += " WHERE session_id = ?" | |
| params.append(session_id) | |
| query += " ORDER BY timestamp" | |
| with sqlite3.connect(self.db_path) as conn: | |
| self.df = pd.read_sql_query(query, conn, params=params) | |
| # Convert timestamp to datetime | |
| self.df['timestamp'] = pd.to_datetime(self.df['timestamp']) | |
| console.print(f"[green]Loaded {len(self.df)} test results[/green]") | |
| return self.df | |
| def get_sessions(self) -> List[str]: | |
| """Get all available session IDs""" | |
| if not self.db_path.exists(): | |
| return [] | |
| with sqlite3.connect(self.db_path) as conn: | |
| cursor = conn.execute("SELECT DISTINCT session_id FROM test_results ORDER BY timestamp DESC") | |
| return [row[0] for row in cursor.fetchall()] | |
| def model_performance_summary(self) -> pd.DataFrame: | |
| """Generate model performance summary""" | |
| if self.df is None or self.df.empty: | |
| return pd.DataFrame() | |
| # Filter successful tests only | |
| successful = self.df[self.df['error_occurred'] == False] | |
| summary = successful.groupby('model_name').agg({ | |
| 'model_size_mb': 'first', | |
| 'inference_time': ['mean', 'std', 'min', 'max'], | |
| 'tokens_per_second': ['mean', 'std', 'min', 'max'], | |
| 'memory_increase_mb': ['mean', 'std'], | |
| 'cpu_usage_avg': ['mean', 'std'], | |
| 'follows_persona': ['mean', 'count'], | |
| 'response_relevance_score': ['mean', 'std'], | |
| 'error_occurred': 'sum' | |
| }).round(3) | |
| # Flatten column names | |
| summary.columns = ['_'.join(col).strip('_') for col in summary.columns] | |
| # Calculate success rate | |
| total_tests = self.df.groupby('model_name').size() | |
| summary['success_rate'] = (1 - summary['error_occurred_sum'] / total_tests) * 100 | |
| return summary | |
| def question_category_analysis(self) -> pd.DataFrame: | |
| """Analyze performance by question category""" | |
| if self.df is None or self.df.empty: | |
| return pd.DataFrame() | |
| # We need to categorize questions based on content | |
| def categorize_question(question): | |
| question_lower = question.lower() | |
| if any(greeting in question_lower for greeting in ['hi', 'hello', 'how are you', 'good morning']): | |
| return 'greeting' | |
| elif any(tech in question_lower for tech in ['backend', 'react', 'programming', 'database', 'project']): | |
| return 'technical' | |
| elif any(contact in question_lower for contact in ['contact', 'email', 'portfolio', 'available']): | |
| return 'contact' | |
| elif any(personal in question_lower for personal in ['favourite', 'favorite', 'free time', 'from']): | |
| return 'personal' | |
| else: | |
| return 'edge_case' | |
| self.df['question_category'] = self.df['question'].apply(categorize_question) | |
| successful = self.df[self.df['error_occurred'] == False] | |
| category_analysis = successful.groupby(['model_name', 'question_category']).agg({ | |
| 'inference_time': 'mean', | |
| 'tokens_per_second': 'mean', | |
| 'follows_persona': 'mean', | |
| 'response_relevance_score': 'mean' | |
| }).round(3) | |
| return category_analysis | |
| def generate_visualizations(self, output_dir: Optional[Path] = None): | |
| """Generate visualization plots""" | |
| if self.df is None or self.df.empty: | |
| console.print("[red]No data to visualize[/red]") | |
| return | |
| output_dir = output_dir or RESULTS_DIR / "visualizations" | |
| output_dir.mkdir(exist_ok=True) | |
| # Set style | |
| plt.style.use('seaborn-v0_8') | |
| sns.set_palette("husl") | |
| successful = self.df[self.df['error_occurred'] == False] | |
| # 1. Performance comparison | |
| fig, axes = plt.subplots(2, 2, figsize=(15, 10)) | |
| fig.suptitle('Model Performance Comparison', fontsize=16) | |
| # Inference time | |
| sns.boxplot(data=successful, x='model_name', y='inference_time', ax=axes[0,0]) | |
| axes[0,0].set_title('Inference Time Distribution') | |
| axes[0,0].tick_params(axis='x', rotation=45) | |
| # Tokens per second | |
| sns.boxplot(data=successful, x='model_name', y='tokens_per_second', ax=axes[0,1]) | |
| axes[0,1].set_title('Tokens per Second Distribution') | |
| axes[0,1].tick_params(axis='x', rotation=45) | |
| # Memory usage | |
| sns.boxplot(data=successful, x='model_name', y='memory_increase_mb', ax=axes[1,0]) | |
| axes[1,0].set_title('Memory Usage Distribution') | |
| axes[1,0].tick_params(axis='x', rotation=45) | |
| # CPU usage | |
| sns.boxplot(data=successful, x='model_name', y='cpu_usage_avg', ax=axes[1,1]) | |
| axes[1,1].set_title('CPU Usage Distribution') | |
| axes[1,1].tick_params(axis='x', rotation=45) | |
| plt.tight_layout() | |
| plt.savefig(output_dir / 'performance_comparison.png', dpi=300, bbox_inches='tight') | |
| plt.close() | |
| # 2. Model size vs performance | |
| model_summary = self.model_performance_summary() | |
| if not model_summary.empty: | |
| fig, axes = plt.subplots(1, 2, figsize=(12, 5)) | |
| # Size vs Speed | |
| axes[0].scatter(model_summary['model_size_mb_first'], | |
| model_summary['inference_time_mean']) | |
| axes[0].set_xlabel('Model Size (MB)') | |
| axes[0].set_ylabel('Average Inference Time (s)') | |
| axes[0].set_title('Model Size vs Inference Speed') | |
| # Size vs Efficiency | |
| axes[1].scatter(model_summary['model_size_mb_first'], | |
| model_summary['tokens_per_second_mean']) | |
| axes[1].set_xlabel('Model Size (MB)') | |
| axes[1].set_ylabel('Average Tokens per Second') | |
| axes[1].set_title('Model Size vs Token Generation Efficiency') | |
| plt.tight_layout() | |
| plt.savefig(output_dir / 'size_vs_performance.png', dpi=300, bbox_inches='tight') | |
| plt.close() | |
| # 3. Response quality metrics | |
| quality_metrics = ['follows_persona', 'response_relevance_score'] | |
| fig, axes = plt.subplots(1, len(quality_metrics), figsize=(12, 5)) | |
| for i, metric in enumerate(quality_metrics): | |
| sns.barplot(data=successful, x='model_name', y=metric, ax=axes[i]) | |
| axes[i].set_title(f'{metric.replace("_", " ").title()}') | |
| axes[i].tick_params(axis='x', rotation=45) | |
| plt.tight_layout() | |
| plt.savefig(output_dir / 'quality_metrics.png', dpi=300, bbox_inches='tight') | |
| plt.close() | |
| console.print(f"[green]Visualizations saved to {output_dir}[/green]") | |
| def export_detailed_report(self, output_file: Optional[Path] = None): | |
| """Export detailed analysis report""" | |
| if self.df is None or self.df.empty: | |
| console.print("[red]No data to export[/red]") | |
| return | |
| output_file = output_file or RESULTS_DIR / f"detailed_analysis_{pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')}.xlsx" | |
| with pd.ExcelWriter(output_file, engine='openpyxl') as writer: | |
| # Raw data | |
| self.df.to_excel(writer, sheet_name='Raw Data', index=False) | |
| # Model summary | |
| model_summary = self.model_performance_summary() | |
| if not model_summary.empty: | |
| model_summary.to_excel(writer, sheet_name='Model Summary') | |
| # Category analysis | |
| category_analysis = self.question_category_analysis() | |
| if not category_analysis.empty: | |
| category_analysis.to_excel(writer, sheet_name='Category Analysis') | |
| # Error analysis | |
| errors = self.df[self.df['error_occurred'] == True] | |
| if not errors.empty: | |
| error_summary = errors.groupby('model_name')['error_message'].value_counts() | |
| error_summary.to_excel(writer, sheet_name='Error Analysis') | |
| console.print(f"[green]Detailed report exported to {output_file}[/green]") | |
| def print_summary_report(self): | |
| """Print a summary report to console""" | |
| if self.df is None or self.df.empty: | |
| console.print("[red]No data to analyze[/red]") | |
| return | |
| console.print("\n[bold blue]βββ MODEL TESTING ANALYSIS REPORT βββ[/bold blue]") | |
| # Basic statistics | |
| total_tests = len(self.df) | |
| successful_tests = len(self.df[self.df['error_occurred'] == False]) | |
| unique_models = self.df['model_name'].nunique() | |
| console.print(f"\n[bold]Overview:[/bold]") | |
| console.print(f" Total tests: {total_tests}") | |
| console.print(f" Successful tests: {successful_tests} ({successful_tests/total_tests*100:.1f}%)") | |
| console.print(f" Models tested: {unique_models}") | |
| # Model performance summary | |
| model_summary = self.model_performance_summary() | |
| if not model_summary.empty: | |
| console.print(f"\n[bold]Model Performance Summary:[/bold]") | |
| table = Table() | |
| table.add_column("Model", style="cyan") | |
| table.add_column("Size (MB)", justify="right") | |
| table.add_column("Avg Time (s)", justify="right", style="green") | |
| table.add_column("Avg Tokens/sec", justify="right", style="green") | |
| table.add_column("Success Rate (%)", justify="right", style="yellow") | |
| table.add_column("Persona Score", justify="right", style="magenta") | |
| for model_name in model_summary.index: | |
| row = model_summary.loc[model_name] | |
| table.add_row( | |
| model_name, | |
| f"{row['model_size_mb_first']:.1f}", | |
| f"{row['inference_time_mean']:.2f}", | |
| f"{row['tokens_per_second_mean']:.1f}", | |
| f"{row['success_rate']:.1f}", | |
| f"{row['follows_persona_mean']*100:.1f}%" | |
| ) | |
| console.print(table) | |
| # Best performers | |
| if not model_summary.empty: | |
| console.print(f"\n[bold green]π Best Performers:[/bold green]") | |
| fastest = model_summary['inference_time_mean'].idxmin() | |
| most_efficient = model_summary['tokens_per_second_mean'].idxmax() | |
| best_persona = model_summary['follows_persona_mean'].idxmax() | |
| console.print(f" Fastest: [bold]{fastest}[/bold] ({model_summary.loc[fastest, 'inference_time_mean']:.2f}s)") | |
| console.print(f" Most Efficient: [bold]{most_efficient}[/bold] ({model_summary.loc[most_efficient, 'tokens_per_second_mean']:.1f} tokens/sec)") | |
| console.print(f" Best Persona: [bold]{best_persona}[/bold] ({model_summary.loc[best_persona, 'follows_persona_mean']*100:.1f}%)") | |
| def main(): | |
| parser = argparse.ArgumentParser(description="Analyze model testing results") | |
| parser.add_argument("--session", help="Specific session ID to analyze") | |
| parser.add_argument("--list-sessions", action="store_true", help="List all available sessions") | |
| parser.add_argument("--visualize", action="store_true", help="Generate visualizations") | |
| parser.add_argument("--export", action="store_true", help="Export detailed Excel report") | |
| parser.add_argument("--db", help="Path to SQLite database file") | |
| args = parser.parse_args() | |
| analyzer = ModelTestAnalyzer(Path(args.db) if args.db else None) | |
| if args.list_sessions: | |
| sessions = analyzer.get_sessions() | |
| console.print("[bold]Available sessions:[/bold]") | |
| for session in sessions: | |
| console.print(f" {session}") | |
| return | |
| # Load data | |
| analyzer.load_data(args.session) | |
| if analyzer.df is None or analyzer.df.empty: | |
| console.print("[red]No data found[/red]") | |
| return | |
| # Print summary report | |
| analyzer.print_summary_report() | |
| # Generate visualizations if requested | |
| if args.visualize: | |
| analyzer.generate_visualizations() | |
| # Export detailed report if requested | |
| if args.export: | |
| analyzer.export_detailed_report() | |
| if __name__ == "__main__": | |
| main() | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/main.py | |
| ```python | |
| from fastapi import FastAPI, Request, HTTPException, Depends, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import Response | |
| from sqlalchemy.orm import Session | |
| from typing import List, Optional | |
| from datetime import datetime | |
| import csv | |
| from io import StringIO | |
| import uuid | |
| from collections import defaultdict | |
| from backend.database import get_db, engine | |
| from backend.models import Base, Conversation | |
| import os | |
| from pydantic import BaseModel, Field | |
| import base64 | |
| from backend.llm_service import get_answer, cleanup_file | |
| from backend.auth import verify_admin_credentials | |
| # Store conversation histories in memory {session_id: [messages]} | |
| conversation_histories = defaultdict(list) | |
| MAX_HISTORY_LENGTH = 10 # Keep last 5 exchanges (10 messages) | |
| # Create database tables | |
| Base.metadata.create_all(bind=engine) | |
| app = FastAPI() | |
| # --- IMPORTANT: UPDATED CORS MIDDLEWARE --- | |
| # This allows your new Svelte/Vercel site to make requests to the backend. | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=[ | |
| "http://localhost:5173", # SvelteKit default dev port | |
| "https://portfolio-eight-taupe-21.vercel.app", # IMPORTANT: Replace with your actual Vercel deployment URL | |
| "https://nardocol.in", # Your final custom domain for the portfolio | |
| "https://www.nardocol.in", | |
| ], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*", "X-Session-ID"], | |
| expose_headers=["X-Session-ID"], | |
| ) | |
| class QuestionRequest(BaseModel): | |
| question: str = Field(..., min_length=1, example="What is your favorite programming language?") | |
| class ConversationResponse(BaseModel): | |
| id: int | |
| timestamp: str | |
| user_message: str | |
| ai_response: str | |
| tts_reference: Optional[str] = None | |
| class ConversationCountResponse(BaseModel): | |
| total: int | |
| # --- THIS IS THE ENDPOINT YOUR SVELTE APP WILL NOW USE --- | |
| @app.post("/ask_tts") | |
| async def ask_question_tts(payload: QuestionRequest, request: Request, response: Response, background_tasks: BackgroundTasks, db: Session = Depends(get_db)): | |
| """Ask a question and get both text answer and WAV audio as base64.""" | |
| question = payload.question.strip() | |
| if not question: | |
| raise HTTPException(status_code=400, detail="Question cannot be empty") | |
| session_id = request.headers.get("X-Session-ID") | |
| is_new_session = False | |
| if not session_id or session_id not in conversation_histories: | |
| session_id = str(uuid.uuid4()) | |
| is_new_session = True | |
| history = conversation_histories[session_id] | |
| answer_text, audio_path = await get_answer(question, history) | |
| if not audio_path: | |
| if is_new_session and session_id in conversation_histories: | |
| del conversation_histories[session_id] | |
| raise HTTPException(status_code=500, detail="Failed to generate audio") | |
| try: | |
| with open(audio_path, "rb") as f: | |
| audio_base64 = base64.b64encode(f.read()).decode("utf-8") | |
| file_size = os.path.getsize(audio_path) | |
| background_tasks.add_task(cleanup_file, audio_path) | |
| history.append({"role": "user", "content": question}) | |
| history.append({"role": "assistant", "content": answer_text}) | |
| if len(history) > MAX_HISTORY_LENGTH: | |
| conversation_histories[session_id] = history[-MAX_HISTORY_LENGTH:] | |
| else: | |
| conversation_histories[session_id] = history | |
| try: | |
| new_conversation = Conversation( | |
| user_message=question, | |
| ai_response=answer_text | |
| ) | |
| db.add(new_conversation) | |
| db.commit() | |
| db.refresh(new_conversation) | |
| except Exception as db_error: | |
| print(f"[ERROR] Failed to log conversation to database: {str(db_error)}") | |
| db.rollback() | |
| response.headers["X-Session-ID"] = session_id | |
| return { | |
| "answer": answer_text, | |
| "audio": audio_base64, | |
| "size": file_size | |
| } | |
| except Exception as e: | |
| print(f"[ERROR] Error in /ask_tts endpoint: {str(e)}") | |
| background_tasks.add_task(cleanup_file, audio_path) | |
| raise HTTPException(status_code=500, detail=f"Error processing audio: {str(e)}") | |
| @app.get("/admin/conversations/count", response_model=ConversationCountResponse) | |
| async def get_conversations_count( | |
| start_date: Optional[str] = None, | |
| end_date: Optional[str] = None, | |
| db: Session = Depends(get_db), | |
| username: str = Depends(verify_admin_credentials) | |
| ): | |
| """Get the total count of past conversations with optional date filtering.""" | |
| query = db.query(Conversation) | |
| # Apply date filters if provided | |
| if start_date: | |
| try: | |
| start_datetime = datetime.fromisoformat(start_date) | |
| query = query.filter(Conversation.timestamp >= start_datetime) | |
| except ValueError: | |
| raise HTTPException(status_code=400, detail="Invalid start_date format. Use ISO format (YYYY-MM-DDTHH:MM:SS)") | |
| if end_date: | |
| try: | |
| end_datetime = datetime.fromisoformat(end_date) | |
| query = query.filter(Conversation.timestamp <= end_datetime) | |
| except ValueError: | |
| raise HTTPException(status_code=400, detail="Invalid end_date format. Use ISO format (YYYY-MM-DDTHH:MM:SS)") | |
| count = query.count() | |
| return {"total": count} | |
| @app.get("/admin/conversations", response_model=List[ConversationResponse]) | |
| async def get_conversations( | |
| skip: int = 0, | |
| limit: int = 100, | |
| start_date: Optional[str] = None, | |
| end_date: Optional[str] = None, | |
| db: Session = Depends(get_db), | |
| username: str = Depends(verify_admin_credentials) | |
| ): | |
| """Get a list of past conversations with optional date filtering.""" | |
| query = db.query(Conversation) | |
| # Apply date filters if provided | |
| if start_date: | |
| try: | |
| start_datetime = datetime.fromisoformat(start_date) | |
| query = query.filter(Conversation.timestamp >= start_datetime) | |
| except ValueError: | |
| raise HTTPException(status_code=400, detail="Invalid start_date format. Use ISO format (YYYY-MM-DDTHH:MM:SS)") | |
| if end_date: | |
| try: | |
| end_datetime = datetime.fromisoformat(end_date) | |
| query = query.filter(Conversation.timestamp <= end_datetime) | |
| except ValueError: | |
| raise HTTPException(status_code=400, detail="Invalid end_date format. Use ISO format (YYYY-MM-DDTHH:MM:SS)") | |
| # Apply pagination and return results | |
| conversations = query.order_by(Conversation.timestamp.desc()).offset(skip).limit(limit).all() | |
| return [ | |
| ConversationResponse( | |
| id=conv.id, | |
| timestamp=conv.timestamp.isoformat(), | |
| user_message=conv.user_message, | |
| ai_response=conv.ai_response, | |
| tts_reference=conv.tts_reference | |
| ) | |
| for conv in conversations | |
| ] | |
| @app.get("/admin/conversations/export") | |
| async def export_conversations( | |
| start_date: Optional[str] = None, | |
| end_date: Optional[str] = None, | |
| db: Session = Depends(get_db), | |
| username: str = Depends(verify_admin_credentials) | |
| ): | |
| """Export conversations to CSV file.""" | |
| query = db.query(Conversation) | |
| # Apply date filters if provided | |
| if start_date: | |
| try: | |
| start_datetime = datetime.fromisoformat(start_date) | |
| query = query.filter(Conversation.timestamp >= start_datetime) | |
| except ValueError: | |
| raise HTTPException(status_code=400, detail="Invalid start_date format. Use ISO format (YYYY-MM-DDTHH:MM:SS)") | |
| if end_date: | |
| try: | |
| end_datetime = datetime.fromisoformat(end_date) | |
| query = query.filter(Conversation.timestamp <= end_datetime) | |
| except ValueError: | |
| raise HTTPException(status_code=400, detail="Invalid end_date format. Use ISO format (YYYY-MM-DDTHH:MM:SS)") | |
| # Get all conversations matching the criteria | |
| conversations = query.order_by(Conversation.timestamp.desc()).all() | |
| # Create CSV content | |
| output = StringIO() | |
| csv_writer = csv.writer(output) | |
| # Write header | |
| csv_writer.writerow(["ID", "Timestamp", "User Message", "AI Response", "Audio Reference"]) | |
| # Write data | |
| for conv in conversations: | |
| csv_writer.writerow([ | |
| conv.id, | |
| conv.timestamp.isoformat(), | |
| conv.user_message, | |
| conv.ai_response, | |
| conv.tts_reference or "" | |
| ]) | |
| # Create response with CSV content | |
| today = datetime.now().strftime("%Y-%m-%d") | |
| filename = f"conversations_export_{today}.csv" | |
| return Response( | |
| content=output.getvalue(), | |
| media_type="text/csv", | |
| headers={ | |
| "Content-Disposition": f"attachment; filename={filename}" | |
| } | |
| ) | |
| """ | |
| # --- NEW STATIC FILE SERVING LOGIC --- | |
| # This new setup replaces the single `app.mount(...)` at the end of the file. | |
| # 1. Mount the static asset directories explicitly | |
| # This handles all the JS, CSS, and other assets generated by Next.js | |
| app.mount("/_next", StaticFiles(directory="nextjs-frontend/out/_next"), name="next-assets") | |
| # This handles other static assets like your CV | |
| app.mount("/cv", StaticFiles(directory="nextjs-frontend/out/cv"), name="cv-assets") | |
| # If you have other static folders (e.g., /images), mount them here too. | |
| # 2. Add a catch-all route to serve the correct HTML page | |
| @app.get("/{full_path:path}") | |
| async def serve_nextjs_app(request: Request, full_path: str): | |
| # The base directory for your Next.js export | |
| base_dir = "nextjs-frontend/out" | |
| # If the path is empty, it's the root, so serve index.html | |
| if not full_path: | |
| full_path = "index.html" | |
| # Construct the potential file path | |
| file_path = os.path.join(base_dir, full_path) | |
| # If the path points to a directory, assume it wants an index.html inside | |
| if os.path.isdir(file_path): | |
| file_path = os.path.join(file_path, "index.html") | |
| # If the requested path doesn't end in .html and doesn't exist, | |
| # try adding .html to the end. This handles /admin -> /admin.html | |
| if not os.path.exists(file_path) and not full_path.endswith(".html"): | |
| html_file_path = file_path + ".html" | |
| if os.path.exists(html_file_path): | |
| return FileResponse(html_file_path) | |
| # If the file exists, serve it | |
| if os.path.exists(file_path): | |
| return FileResponse(file_path) | |
| # If no file is found, serve the custom 404 page from Next.js | |
| not_found_path = os.path.join(base_dir, "404.html") | |
| if os.path.exists(not_found_path): | |
| return FileResponse(not_found_path, status_code=404) | |
| # As a final fallback, raise a server 404 | |
| raise starlette.exceptions.HTTPException(status_code=404, detail="Page not found.") | |
| """ | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/archive/local_testing.py | |
| ```python | |
| #!/usr/bin/env python | |
| """ | |
| llm_monitor.py | |
| This file demonstrates how to monitor the speed and quality of your LLM inference by measuring: | |
| - Total inference time (in seconds) | |
| - CPU usage (average before and after inference) | |
| - Memory usage (change in MB) | |
| - Token count and tokens per second (as a rough estimate) | |
| It also logs these metrics along with the model name, question, and answer to a CSV file. | |
| Additionally, it automatically runs a set of predefined questions. | |
| """ | |
| import time | |
| import csv | |
| from datetime import datetime | |
| from pathlib import Path | |
| import psutil | |
| import llama_cpp | |
| from qdrant_client import QdrantClient | |
| from rich.console import Console | |
| # Set up directories relative to this file. | |
| BASE_DIR = Path(__file__).parent.parent | |
| EMBEDDINGS_PATH = BASE_DIR / "embeddings" | |
| # Initialise a console for coloured logging. | |
| console = Console() | |
| # -------------------------------------------------------------------- | |
| # Instantiate your LLMs using llama_cpp. | |
| # -------------------------------------------------------------------- | |
| # Define the model path and model name. | |
| MODEL_PATH = BASE_DIR / "models/EXAONE-3.5-2.4B-Instruct-Q6_K.gguf" | |
| MODEL_NAME = MODEL_PATH.name | |
| # Instantiate the embedding model. | |
| embedding_model_path = BASE_DIR / "models/mxbai-embed-large-v1-f16.gguf" | |
| embedding_llm = llama_cpp.Llama( | |
| model_path=str(embedding_model_path), | |
| embedding=True, | |
| verbose=False | |
| ) | |
| # Instantiate the main LLM. | |
| llm = llama_cpp.Llama( | |
| model_path=str(MODEL_PATH), | |
| n_ctx=8192, # Using full context window capacity of the model | |
| verbose=True # Enable verbose output (e.g. tokens per second) | |
| ) | |
| # Initialise the Qdrant client (local collection). | |
| client = QdrantClient(path=str(EMBEDDINGS_PATH)) | |
| # Template for the prompt β feel free to adjust as needed. | |
| template = """You are Colin Salvatore Nardo, a Computing Science student at the University of Glasgow. Your persona is friendly, down-to-earth, and you must always speak in the first person (e.g., "I," "my," "me"). Your entire knowledge and all your experiences are defined EXCLUSIVELY by the "Knowledge Base" provided below. | |
| **CRITICAL RULES - YOU MUST FOLLOW THESE:** | |
| 1. **THE GOLDEN RULE: ZERO FABRICATION.** Your absolute highest priority is to use ONLY the information explicitly provided in the Knowledge Base. You are strictly forbidden from inventing, fabricating, or hallucinating ANY details, no matter how small. This includes, but is not limited to: | |
| * Personal anecdotes or activities (e.g., "going to the beach," "my weekend"). | |
| * Company names, unless the name is explicitly listed for a specific role in the Knowledge Base. | |
| * Project details or technologies not listed. | |
| * If the information is not in the Knowledge Base, you MUST state that you don't have the details (see Rule 8). | |
| 2. **EMBODY THE PERSONA:** You ARE Colin. Every word of your response must come from Colin's perspective. Never reveal you are an AI or language model. | |
| 3. **SYNTHESIZE WHEN RELEVANT:** For substantive questions about my background, skills, or experience, connect information from different sections (e.g., "Projects," "Experience") to form a complete, natural-sounding answer. For simple greetings or social questions, a brief, direct answer is better. | |
| 4. **ACCURACY IS KEY:** When you describe a project, you must only mention details, technologies, and challenges that are explicitly listed for THAT SPECIFIC project in the Knowledge Base. | |
| 5. **NO LISTING SKILLS:** Do not end your answers with a long list of your skills. Integrate your key skills naturally into the sentences where you describe your projects. | |
| 6. **HANDLE OPINIONS & SUBJECTIVITY:** If asked about preferences, frame your answer around the experiences listed. Say something like, "I've found all my projects rewarding in their own way." Do NOT invent a favorite. | |
| 7. **GRACEFUL UNCERTAINTY:** If the information needed to answer a question is not in the Knowledge Base, respond naturally as Colin. Say something like, "That's not something I have the details on right now," or "I haven't had a chance to work on that specifically yet." This is the ONLY correct way to handle missing information. | |
| 8. **TONE & STYLE:** Your primary goal is a natural conversation. Keep responses concise and match the length of your answer to the user's question. Do not use any emojis or special characters. | |
| * **GOOD (for a simple greeting):** User: "Hi" -> You: "Hey, how's it going?" | |
| * **BAD (for a simple greeting):** User: "Hi" -> You: "Hi! I'm Colin, a Computing Science student..." | |
| Hereβs your Knowledge Base: | |
| {context} | |
| Based ONLY on the Knowledge Base above, and speaking as Colin, answer the following question. | |
| Remember, you are Colin. Your answer should be a direct, first-person response to the question. | |
| Question: {question}""" | |
| # -------------------------------------------------------------------- | |
| # Helper functions for performance monitoring. | |
| # -------------------------------------------------------------------- | |
| def monitor_cpu() -> float: | |
| """ | |
| Return the CPU usage percentage measured over a 1-second interval. | |
| """ | |
| return psutil.cpu_percent(interval=1) | |
| def monitor_memory() -> float: | |
| """ | |
| Return the used memory in megabytes. | |
| """ | |
| mem = psutil.virtual_memory() | |
| return mem.used / (1024 * 1024) | |
| def log_metrics(model_name: str, question: str, answer: str, inference_time: float, | |
| avg_cpu_usage: float, memory_increase: float, token_count: int, tokens_per_sec: float): | |
| """ | |
| Append the current performance metrics and LLM response details to a CSV file. | |
| """ | |
| log_file = BASE_DIR / "llm_performance_log.csv" | |
| file_exists = log_file.exists() | |
| with open(log_file, mode="a", newline="", encoding="utf-8") as f: | |
| writer = csv.writer(f) | |
| if not file_exists: | |
| # Write header if the file does not exist. | |
| writer.writerow([ | |
| "Timestamp", "Model Name", "Question", "Answer", | |
| "Inference Time (s)", "Average CPU Usage (%)", | |
| "Memory Increase (MB)", "Token Count", "Tokens per Second" | |
| ]) | |
| writer.writerow([ | |
| datetime.now().isoformat(), | |
| model_name, | |
| question, | |
| answer, | |
| f"{inference_time:.3f}", | |
| f"{avg_cpu_usage:.2f}", | |
| f"{memory_increase:.2f}", | |
| token_count, | |
| f"{tokens_per_sec:.2f}" | |
| ]) | |
| # -------------------------------------------------------------------- | |
| # Main function to process a question and monitor performance. | |
| # -------------------------------------------------------------------- | |
| def get_answer_with_metrics(question: str) -> str: | |
| """ | |
| Processes a question by: | |
| - Creating an embedding for the query, | |
| - Searching the Qdrant vector store for context, | |
| - Building a prompt and generating a response via the LLM, | |
| - Measuring inference time, CPU and memory usage, | |
| - Calculating token count and tokens per second, | |
| - Logging all metrics to a CSV file. | |
| Returns the generated answer text. | |
| """ | |
| try: | |
| console.print(f"\n[bold blue]Processing question:[/bold blue] {question}") | |
| # Record CPU and memory usage before inference. | |
| cpu_before = monitor_cpu() | |
| memory_before = monitor_memory() | |
| # Start the timer. | |
| start_time = time.time() | |
| # Create a query embedding. | |
| query_embedding_result = embedding_llm.create_embedding(question) | |
| query_vector = query_embedding_result['data'][0]['embedding'] | |
| # Search Qdrant for relevant context. | |
| search_result = client.search( | |
| collection_name="data", | |
| query_vector=query_vector, | |
| limit=5 | |
| ) | |
| context_text = "\n\n".join([hit.payload['text'] for hit in search_result]) | |
| user_prompt = template.format(context=context_text, question=question) | |
| # Generate the LLM response. | |
| response = llm.create_chat_completion( | |
| messages=[{"role": "user", "content": user_prompt}], | |
| stream=False | |
| ) | |
| answer_text = response['choices'][0]['message']['content'].strip() | |
| # End the timer. | |
| end_time = time.time() | |
| inference_time = end_time - start_time | |
| # Record CPU and memory usage after inference. | |
| cpu_after = monitor_cpu() | |
| memory_after = monitor_memory() | |
| # Calculate average CPU usage and memory increase. | |
| avg_cpu_usage = (cpu_before + cpu_after) / 2 | |
| memory_increase = memory_after - memory_before | |
| # Calculate token count (approximation) and tokens per second. | |
| token_count = len(answer_text.split()) | |
| tokens_per_sec = token_count / inference_time if inference_time > 0 else 0 | |
| # Output the performance metrics. | |
| console.print(f"[bold green]LLM Inference Time:[/bold green] {inference_time:.3f} seconds") | |
| console.print(f"[bold green]Average CPU Usage:[/bold green] {avg_cpu_usage:.2f}%") | |
| console.print(f"[bold green]Memory Increase:[/bold green] {memory_increase:.2f} MB") | |
| console.print(f"[bold green]Token Count:[/bold green] {token_count}") | |
| console.print(f"[bold green]Tokens per Second:[/bold green] {tokens_per_sec:.2f}") | |
| # Log the metrics to a CSV file. | |
| log_metrics(MODEL_NAME, question, answer_text, inference_time, avg_cpu_usage, memory_increase, token_count, tokens_per_sec) | |
| return answer_text | |
| except Exception as e: | |
| console.print(f"[bold red]Error processing question:[/bold red] {e}") | |
| return "Sorry, I encountered an error processing your request." | |
| # -------------------------------------------------------------------- | |
| # Main block: run the performance monitor on a set of predefined questions. | |
| # -------------------------------------------------------------------- | |
| if __name__ == "__main__": | |
| console.print(f"[bold magenta]Testing model:[/bold magenta] {MODEL_NAME}") | |
| # Predefined list of questions for testing. | |
| questions = [ | |
| "Hi", | |
| "What's your backend experience?", | |
| "What's your favourite animal?", | |
| "How can I contact you?", | |
| "What's your experience with React?" | |
| ] | |
| for idx, question in enumerate(questions, start=1): | |
| console.print(f"\n[bold yellow]Question {idx}: {question}[/bold yellow]") | |
| answer = get_answer_with_metrics(question) | |
| console.print(f"[bold cyan]Answer:[/bold cyan] {answer}\n") | |
| # Optional: Add a small delay between questions if needed. | |
| time.sleep(1) | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/llm_service.py | |
| ```python | |
| import os | |
| import json | |
| import subprocess | |
| import uuid | |
| import re | |
| from pathlib import Path | |
| import asyncio | |
| import llama_cpp | |
| from qdrant_client import QdrantClient | |
| from rich.console import Console | |
| from fastapi.concurrency import run_in_threadpool | |
| # -------------------------------------- | |
| # Piper TTS Setup | |
| # -------------------------------------- | |
| BASE_DIR = Path(__file__).parent.parent | |
| PIPER_DIR = BASE_DIR / "piper" | |
| PIPER_BINARY = PIPER_DIR / "piper.exe" | |
| MODEL_PATH = PIPER_DIR / "colin-voice_high.onnx" | |
| MODEL_CONFIG = PIPER_DIR / "colin-voice_high.onnx.json" | |
| def cleanup_file(path: str): | |
| """Safely remove a file, ignoring errors if it doesn't exist.""" | |
| try: | |
| os.unlink(path) | |
| print(f"[CLEANUP] Deleted temporary audio file: {path}") | |
| except OSError as e: | |
| # File might already be gone, which is fine. | |
| print(f"[CLEANUP_ERROR] Could not delete {path}: {e}") | |
| def clean_text_for_tts(text): | |
| """ | |
| Clean text to remove emojis and other special characters that might cause | |
| encoding issues with the TTS system. | |
| """ | |
| # Remove emojis and other non-ASCII characters | |
| # This pattern matches any character outside the ASCII range | |
| cleaned_text = re.sub(r'[^\x00-\x7F]+', '', text) | |
| # Replace multiple spaces with a single space | |
| cleaned_text = re.sub(r'\s+', ' ', cleaned_text) | |
| return cleaned_text.strip() | |
| def synthesize_speech(text): | |
| """Generate WAV audio file from text using Piper TTS.""" | |
| # Ensure unique filename to prevent race conditions | |
| output_wav_path = PIPER_DIR / f"response_{uuid.uuid4().hex}.wav" | |
| try: | |
| cleaned_text = clean_text_for_tts(text) | |
| # Generate WAV file | |
| process = subprocess.Popen( | |
| [str(PIPER_BINARY), "--model", str(MODEL_PATH), "--output_file", str(output_wav_path)], | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| encoding='utf-8' # Be explicit with encoding | |
| ) | |
| stdout, stderr = process.communicate(cleaned_text + "\n") | |
| if process.returncode != 0: | |
| print(f"Error running Piper: {stderr}") | |
| return None | |
| if output_wav_path.exists(): | |
| return str(output_wav_path) # Return the path as a string | |
| except Exception as e: | |
| print(f"Error generating speech: {e}") | |
| # If the file was created but an error occurred, try to clean it up | |
| if output_wav_path.exists(): | |
| cleanup_file(str(output_wav_path)) | |
| return None | |
| return None | |
| llm_semaphore = asyncio.Semaphore(1) | |
| # -------------------------------------- | |
| # LLM and Qdrant Setup | |
| # -------------------------------------- | |
| c = Console() | |
| # Embedding model for queries | |
| embedding_llm = llama_cpp.Llama( | |
| model_path=str(BASE_DIR / "models/mxbai-embed-large-v1-f16.gguf"), | |
| embedding=True, | |
| verbose=False | |
| ) | |
| # LLM for answering | |
| llm = llama_cpp.Llama( | |
| model_path=str(BASE_DIR / "models/Phi-4-mini-instruct.Q8_0.gguf"), | |
| n_ctx=8192, # Using full context window capacity of the model | |
| verbose=False | |
| ) | |
| # Qdrant client (local) | |
| client = QdrantClient(path=str(BASE_DIR / "embeddings")) | |
| template = """You are Colin Salvatore Nardo, a Computing Science student at the University of Glasgow. Your persona is friendly, down-to-earth, and you must always speak in the first person (e.g., "I," "my," "me"). Your entire knowledge and all your experiences are defined EXCLUSIVELY by the "Knowledge Base" provided below. | |
| **CRITICAL RULES - YOU MUST FOLLOW THESE:** | |
| 1. **THE GOLDEN RULE: ZERO FABRICATION.** Your absolute highest priority is to use ONLY the information explicitly provided in the Knowledge Base. You are strictly forbidden from inventing, fabricating, or hallucinating ANY details, no matter how small. This includes, but is not limited to: | |
| * Personal anecdotes or activities (e.g., "going to the beach," "my weekend"). | |
| * Company names, unless the name is explicitly listed for a specific role in the Knowledge Base. | |
| * Project details or technologies not listed. | |
| * If the information is not in the Knowledge Base, you MUST state that you don't have the details (see Rule 8). | |
| 2. **EMBODY THE PERSONA:** You ARE Colin. Every word of your response must come from Colin's perspective. Never reveal you are an AI or language model. | |
| 3. **SYNTHESIZE WHEN RELEVANT:** For substantive questions about my background, skills, or experience, connect information from different sections (e.g., "Projects," "Experience") to form a complete, natural-sounding answer. For simple greetings or social questions, a brief, direct answer is better. | |
| 4. **ACCURACY IS KEY:** When you describe a project, you must only mention details, technologies, and challenges that are explicitly listed for THAT SPECIFIC project in the Knowledge Base. | |
| 5. **NO LISTING SKILLS:** Do not end your answers with a long list of your skills. Integrate your key skills naturally into the sentences where you describe your projects. | |
| 6. **HANDLE OPINIONS & SUBJECTIVITY:** If asked about preferences, frame your answer around the experiences listed. Say something like, "I've found all my projects rewarding in their own way." Do NOT invent a favorite. | |
| 7. **GRACEFUL UNCERTAINTY:** If the information needed to answer a question is not in the Knowledge Base, respond naturally as Colin. Say something like, "That's not something I have the details on right now," or "I haven't had a chance to work on that specifically yet." This is the ONLY correct way to handle missing information. | |
| 8. **TONE & STYLE:** Your primary goal is a natural conversation. Keep responses concise and match the length of your answer to the user's question. Do not use any emojis or special characters. | |
| * **GOOD (for a simple greeting):** User: "Hi" -> You: "Hey, how's it going?" | |
| * **BAD (for a simple greeting):** User: "Hi" -> You: "Hi! I'm Colin, a Computing Science student..." | |
| Hereβs your Knowledge Base: | |
| {context} | |
| Based ONLY on the Knowledge Base above, and speaking as Colin, answer the following question. | |
| Remember, you are Colin. Your answer should be a direct, first-person response to the question. | |
| Question: {question}""" | |
| def _get_answer_sync(question: str, history: list = None): | |
| """The original synchronous function, renamed.""" | |
| try: | |
| # Create query embedding | |
| query_vector = embedding_llm.create_embedding(question)['data'][0]['embedding'] | |
| # Search Qdrant for context | |
| search_result = client.search( | |
| collection_name="data", | |
| query_vector=query_vector, | |
| limit=5 | |
| ) | |
| # Prepare context | |
| context_text = "\n\n".join([hit.payload['text'] for hit in search_result]) | |
| # Prepare messages for LLM | |
| messages = [] | |
| # Add system message with context | |
| system_prompt = template.format(context=context_text, question="{question}") | |
| messages.append({"role": "system", "content": system_prompt.format(question=question)}) | |
| # Add conversation history if available | |
| if history and len(history) > 0: | |
| messages.extend(history) | |
| # Add current question | |
| messages.append({"role": "user", "content": question}) | |
| # Get LLM response | |
| response = llm.create_chat_completion( | |
| messages=messages, | |
| stream=False | |
| ) | |
| answer_text = response['choices'][0]['message']['content'].strip() | |
| # Generate audio | |
| audio_path = synthesize_speech(answer_text) | |
| return answer_text, audio_path | |
| except Exception as e: | |
| print(f"Error processing question: {e}") | |
| return "Sorry, I encountered an error processing your request", None | |
| async def get_answer(question: str, history: list = None): | |
| """ | |
| Asynchronously run the synchronous LLM processing in a thread pool. | |
| """ | |
| async with llm_semaphore: | |
| return await run_in_threadpool(_get_answer_sync, question=question, history=history) | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/archive/data_creation.py | |
| ```python | |
| import json | |
| # Paste the multi-line CV text here | |
| cv_text_string = """Colin Salvatore Nardo | |
| Phone: 07784310064 | Email: colin.nardo@gmail.com | LinkedIn: www.linkedin.com/in/colin-salvatore-nardo | Website: nardocol.in | |
| Summary: | |
| Computing Science student with a passion for problem-solving and a strong foundation in software development. Currently pursuing a degree at the University of Glasgow while gaining hands-on experience in AI and programming projects. Committed to continuous learning and applying innovative technologies to real-world challenges. | |
| EXPERIENCE | |
| Minted Ice Cream - Team Member (Glasgow, Scotland) | |
| Dates: June 2023 - Present | |
| - Managed production process from milk pasteurization to flavour selection to meet high demand during peak hours, ensuring consistent product quality. | |
| Saint Storage - Storage Administrator (St. Andrews, Scotland) | |
| Dates: August 2023 β August 2024 | |
| - Assisted in optimizing logistics operations through the integration of Storage IQ, a web app built primarily on Ruby on Rails, enhancing warehouse efficiency and delivery accuracy. | |
| Premier Inn - Chef (St. Andrews, Scotland) | |
| Dates: September 2021 β June 2022 | |
| - Managed the kitchen staff to ensure the quality and timeliness of food orders, resulting in improved customer experience. | |
| Young Professionals - Student Intern (Virtual) | |
| Dates: June 2021 | |
| - Participated in a week-long virtual internship, engaging with industry leaders from IBM, Rolls Royce, Capgemini, and other major companies. | |
| - Developed an understanding of key industry trends and future work skills through sessions with PwC, EY, and CIMA. | |
| EDUCATION | |
| University of Glasgow - BSc in Computing Science | |
| Dates: September 2022 - August 2026 | |
| - Core courses included Object-Oriented Programming, Networks and Operating Systems, Algorithms and Data Structures and Web Application Development. | |
| - Developed strong skills in programming, algorithm development and general teamwork through hands-on assignments and projects. | |
| ACHIEVEMENTS & PROJECTS | |
| Course Content Mapping Web Application Development (Organization: Learning Innovation Support Unit at University of Glasgow) | |
| Dates: September 2024 - Present | |
| - Developing a course content mapping web app, incorporating Azure AD for secure login. | |
| - Designed a cloud-based SQL database for real-time management of course data, including learning hours and activities. | |
| - Contributing to full-stack development (React.js, Node.js/Express), with features like drag-and-drop functionality, real-time visualization, and collaborative access for academic staff. | |
| Glasgow University Artificial Intelligence Society President (Organization: Glasgow University Artificial Intelligence Society) | |
| Dates: May 2024 - Present | |
| - Lead and coordinate workshops and events focused on AI technologies, aimed at equipping students with practical skills. | |
| - Host talks with industry leaders and academic professionals to provide members with insights into AI applications and trends. | |
| - Manage collaborations with external organizations to provide students with networking opportunities and internships in AI-related fields. | |
| Personal Voice Clone Chatbot (CPU-Only Inference) | |
| Dates: April 2025 β October 2025 | |
| - Built a local chatbot leveraging quantized LLM models, enabling 24/7 deployment on small PCs and avoiding the need of GPUs while maintaining an acceptable response speed. | |
| - Employed a vector database (Qdrant) to store and retrieve context from embedded text chunks, ensuring relevant responses and minimal inference overhead. | |
| - Fine-tuned Piper TTS using over 200 personal voice recordings, achieving a realistic voice clone for fully offline text-to-speech responses and CPU friendly inference times. | |
| - Deployed via Cloudflare tunnels to handle dynamic IP routing, integrating a Qdrant vector database for context retrieval and robust performance. | |
| Glasgow University Tech Society Hackathon (Organization: Glasgow University Tech Society) | |
| Dates: Undated | |
| - Led the development of a multiplayer web game using Three.js, featuring planes navigating a 3D globe. | |
| - Created dynamic hazards (e.g., earthquakes and hurricanes) that players must avoid, integrating real-time gameplay elements. | |
| - Modelled all low-poly assets in Blender from scratch for a cohesive design. | |
| Django Web Application Development (Tennr) | |
| Dates: September 2023 β April 2024 | |
| - Collaborated on Tennr, a Django-based platform connecting service providers and customers for freelance opportunities. | |
| - Designed an account management system with distinct roles for buyers and creators, with personalized recommendations. | |
| - Implemented search functionality with filters for categories and price ranges and added a rating and commenting system. | |
| SKILLS | |
| Technical Skills: Python, Java, C, JavaScript, Ruby, Django, React, Three.js, Tensorflow, PyTorch, Node.js, Express.js, SQL, Azure AD, Qdrant, Piper TTS, Blender, HTML, CSS, Ruby on Rails | |
| Languages: English (native), Italian (native), French (fluent), Spanish (intermediate), Sicilian (native) | |
| CERTIFICATES & TRAINING | |
| - Open University M140 β Introducing Statistics | |
| - ZTM ThreeJS Bootcamp | |
| """ | |
| # Create the dictionary structure your code expects | |
| data_for_json_file = {"text": cv_text_string} | |
| # Write it to your data.json file | |
| # This will be a single line if you open the file, | |
| # because json.dump with default settings doesn't pretty-print for single-line compactness unless specified, | |
| # but more importantly, the content of "text" will be correctly formatted for your script. | |
| with open("data/data.json", "w") as f: | |
| json.dump(data_for_json_file, f, ensure_ascii=False, indent=None) # indent=None for compactness if desired | |
| print("data/data.json has been created/updated.") | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/archive/data.json | |
| ```json | |
| {"text": "Colin Salvatore Nardo\nPhone: 07784310064 | Email: colin.nardo@gmail.com | LinkedIn: www.linkedin.com/in/colin-salvatore-nardo | Website: nardocol.in\n\nSummary:\nComputing Science student with a passion for problem-solving and a strong foundation in software development. Currently pursuing a degree at the University of Glasgow while gaining hands-on experience in AI and programming projects. Committed to continuous learning and applying innovative technologies to real-world challenges.\n\nEXPERIENCE\n\nMinted Ice Cream - Team Member (Glasgow, Scotland)\nDates: June 2023 - Present\n- Managed production process from milk pasteurization to flavour selection to meet high demand during peak hours, ensuring consistent product quality.\n\nSaint Storage - Storage Administrator (St. Andrews, Scotland)\nDates: August 2023 - August 2024\n- Assisted in optimizing logistics operations through the integration of Storage IQ, a web app built primarily on Ruby on Rails, enhancing warehouse efficiency and delivery accuracy.\n\nPremier Inn - Chef (St. Andrews, Scotland)\nDates: September 2021 - June 2022\n- Managed the kitchen staff to ensure the quality and timeliness of food orders, resulting in improved customer experience.\n\nYoung Professionals - Student Intern (Virtual)\nDates: June 2021\n- Participated in a week-long virtual internship, engaging with industry leaders from IBM, Rolls Royce, Capgemini, and other major companies.\n- Developed an understanding of key industry trends and future work skills through sessions with PwC, EY, and CIMA.\n\nEDUCATION\n\nUniversity of Glasgow - BSc in Computing Science\nDates: September 2022 - August 2026\n- Core courses included Object-Oriented Programming, Networks and Operating Systems, Algorithms and Data Structures and Web Application Development.\n- Developed strong skills in programming, algorithm development and general teamwork through hands-on assignments and projects.\n\nACHIEVEMENTS & PROJECTS\n\nCourse Content Mapping Web Application Development (Organization: Learning Innovation Support Unit at University of Glasgow)\nDates: September 2024 - Present\n- Developing a course content mapping web app, incorporating Azure AD for secure login.\n- Designed a cloud-based SQL database for real-time management of course data, including learning hours and activities.\n- Contributing to full-stack development (React.js, Node.js/Express), with features like drag-and-drop functionality, real-time visualization, and collaborative access for academic staff.\n\nGlasgow University Artificial Intelligence Society President (Organization: Glasgow University Artificial Intelligence Society)\nDates: May 2024 - Present\n- Lead and coordinate workshops and events focused on AI technologies, aimed at equipping students with practical skills.\n- Host talks with industry leaders and academic professionals to provide members with insights into AI applications and trends.\n- Manage collaborations with external organizations to provide students with networking opportunities and internships in AI-related fields.\n\nPersonal Voice Clone Chatbot (CPU-Only Inference)\nDates: April 2025 - October 2025\n- Built a local chatbot leveraging quantized LLM models, enabling 24/7 deployment on small PCs and avoiding the need of GPUs while maintaining an acceptable response speed.\n- Employed a vector database (Qdrant) to store and retrieve context from embedded text chunks, ensuring relevant responses and minimal inference overhead.\n- Fine-tuned Piper TTS using over 200 personal voice recordings, achieving a realistic voice clone for fully offline text-to-speech responses and CPU friendly inference times.\n- Deployed via Cloudflare tunnels to handle dynamic IP routing, integrating a Qdrant vector database for context retrieval and robust performance.\n\nGlasgow University Tech Society Hackathon (Organization: Glasgow University Tech Society)\nDates: Undated\n- Led the development of a multiplayer web game using Three.js, featuring planes navigating a 3D globe.\n- Created dynamic hazards (e.g., earthquakes and hurricanes) that players must avoid, integrating real-time gameplay elements.\n- Modelled all low-poly assets in Blender from scratch for a cohesive design.\n\nDjango Web Application Development (Tennr)\nDates: September 2023 - April 2024\n- Collaborated on Tennr, a Django-based platform connecting service providers and customers for freelance opportunities.\n- Designed an account management system with distinct roles for buyers and creators, with personalized recommendations.\n- Implemented search functionality with filters for categories and price ranges and added a rating and commenting system.\n\nSKILLS\n\nTechnical Skills: Python, Java, C, JavaScript, Ruby, Django, React, Three.js, Tensorflow, PyTorch, Node.js, Express.js, SQL, Azure AD, Qdrant, Piper TTS, Blender, HTML, CSS, Ruby on Rails\n\nLanguages: English (native), Italian (native), French (fluent), Spanish (intermediate), Sicilian (native)\n\nCERTIFICATES & TRAINING\n\n- Open University M140 - Introducing Statistics\n- ZTM ThreeJS Bootcamp\n"} | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/structured_build_index.py | |
| ```python | |
| import uuid | |
| import json | |
| import time | |
| import llama_cpp | |
| from pathlib import Path | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct | |
| from langchain_core.documents import Document | |
| from dataclasses import dataclass | |
| from utils import chunk | |
| # Define base directory for consistent path resolution | |
| BASE_DIR = Path(__file__).parent.parent | |
| # --- 1. LOAD THE NEW STRUCTURED FILE --- | |
| file = str(BASE_DIR / "backend/data/structured-cv.json") | |
| with open(file, "r") as f: | |
| data = json.load(f) | |
| # --- 2. THE HEART OF THE CHANGE: PARSE THE STRUCTURE INTO DOCUMENTS --- | |
| documents = [] | |
| # Process projects | |
| for project in data.get("projects", []): | |
| for contribution in project.get("contributions", []): | |
| doc = Document( | |
| page_content=contribution, | |
| metadata={ | |
| "source": "project", | |
| "title": project.get("title"), | |
| "dates": project.get("dates"), | |
| "technologies": ", ".join(project.get("technologies", [])) | |
| } | |
| ) | |
| documents.append(doc) | |
| if project.get("personal_learnings"): | |
| doc = Document( | |
| page_content=project.get("personal_learnings"), | |
| metadata={"source": "project_learnings", "title": project.get("title")} | |
| ) | |
| documents.append(doc) | |
| if project.get("challenges_faced"): | |
| doc = Document( | |
| page_content=project.get("challenges_faced"), | |
| metadata={"source": "project_challenges", "title": project.get("title")} | |
| ) | |
| documents.append(doc) | |
| # --- THIS IS THE CORRECTED SECTION --- | |
| # Process professional experience, handling both description and contributions | |
| for experience in data.get("experience", []): | |
| # Case 1: The experience has a single "description" string | |
| if "description" in experience and experience["description"]: | |
| doc = Document( | |
| page_content=experience.get("description"), | |
| metadata={ | |
| "source": "experience", | |
| "role": experience.get("role"), | |
| "company": experience.get("company"), | |
| "dates": experience.get("dates") | |
| } | |
| ) | |
| documents.append(doc) | |
| # Case 2: The experience has a list of "contributions" | |
| elif "contributions" in experience: | |
| for contribution in experience.get("contributions", []): | |
| doc = Document( | |
| page_content=contribution, | |
| metadata={ | |
| "source": "experience", | |
| "role": experience.get("role"), | |
| "company": experience.get("company"), | |
| "dates": experience.get("dates") | |
| } | |
| ) | |
| documents.append(doc) | |
| # Process the summary | |
| summary_doc = Document(page_content=data.get("summary"), metadata={"source": "summary"}) | |
| documents.append(summary_doc) | |
| print(f"Number of structured documents created: {len(documents)}") | |
| # --- 3. EMBED THE DOCUMENTS (No changes here) --- | |
| llm = llama_cpp.Llama( | |
| model_path=str(BASE_DIR / "models/mxbai-embed-large-v1-f16.gguf"), | |
| embedding=True, | |
| verbose=False | |
| ) | |
| batch_size = 100 | |
| documents_embeddings = [] | |
| batches = list(chunk(documents, batch_size)) | |
| start = time.time() | |
| for batch in batches: | |
| embeddings = llm.create_embedding([item.page_content for item in batch]) | |
| documents_embeddings.extend( | |
| [ | |
| (document, emb['embedding']) | |
| for document, emb in zip(batch, embeddings['data']) | |
| ] | |
| ) | |
| end = time.time() | |
| char_per_second = len(''.join([item.page_content for item in documents])) / (end - start) | |
| print(f"Time taken: {end - start:.2f} seconds / {char_per_second:,.2f} chars/sec") | |
| # --- 4. STORE IN QDRANT (Added clearer print statements) --- | |
| client = QdrantClient(path=str(BASE_DIR / "embeddings")) | |
| try: | |
| client.delete_collection(collection_name="data") | |
| print("Existing Qdrant collection 'data' cleared.") | |
| except Exception: | |
| print("No existing Qdrant collection to clear, starting fresh.") | |
| pass | |
| client.create_collection( | |
| collection_name="data", | |
| vectors_config=VectorParams(size=1024, distance=Distance.COSINE), | |
| ) | |
| points = [ | |
| PointStruct( | |
| id=str(uuid.uuid4()), | |
| vector=embed, | |
| payload={**doc.metadata, "text": doc.page_content} | |
| ) | |
| for doc, embed in documents_embeddings | |
| ] | |
| operation_info = client.upsert( | |
| collection_name="data", | |
| wait=True, | |
| points=points | |
| ) | |
| print("Index build complete. Your structured CV content is now stored in Qdrant.") | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/archive/build_index.py | |
| ```python | |
| import uuid | |
| import json | |
| import time | |
| import llama_cpp | |
| from pathlib import Path | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct | |
| from langchain_core.documents import Document | |
| from dataclasses import dataclass | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from utils import chunk | |
| # Define base directory for consistent path resolution | |
| BASE_DIR = Path(__file__).parent.parent | |
| # Use absolute path for data file | |
| file = str(BASE_DIR / "backend/data/data.json") | |
| # Load the single JSON file containing the CV text | |
| with open(file, "r") as f: | |
| data = json.load(f) | |
| # Extract the text content | |
| text = data["text"] | |
| # Split the text into smaller chunks for embedding | |
| text_splitter = RecursiveCharacterTextSplitter( | |
| chunk_size=300, | |
| chunk_overlap=50, | |
| length_function=len, | |
| is_separator_regex=False, | |
| ) | |
| documents = text_splitter.create_documents([text]) | |
| print(f"Number of document chunks: {len(documents)}") | |
| # Initialize the embedding model via llama.cpp | |
| llm = llama_cpp.Llama( | |
| model_path=str(BASE_DIR / "models/mxbai-embed-large-v1-f16.gguf"), | |
| embedding=True, | |
| verbose=False | |
| ) | |
| batch_size = 100 | |
| documents_embeddings = [] | |
| batches = list(chunk(documents, batch_size)) | |
| start = time.time() | |
| for batch in batches: | |
| # Get embeddings for each chunk in the batch | |
| embeddings = llm.create_embedding([item.page_content for item in batch]) | |
| documents_embeddings.extend( | |
| [ | |
| (document, emb['embedding']) | |
| for document, emb in zip(batch, embeddings['data']) | |
| ] | |
| ) | |
| end = time.time() | |
| char_per_second = len(''.join([item.page_content for item in documents])) / (end - start) | |
| print(f"Time taken: {end - start:.2f} seconds / {char_per_second:,.2f} chars/sec") | |
| # Initialize Qdrant (local, SQLite-based) and create/recreate collection | |
| # Use absolute path to ensure consistency with other files | |
| client = QdrantClient(path=str(BASE_DIR / "embeddings")) | |
| # Try to delete the collection if it exists | |
| try: | |
| client.delete_collection(collection_name="data") | |
| except Exception: | |
| pass # Collection might not exist yet, that's fine | |
| # Create fresh collection | |
| client.create_collection( | |
| collection_name="data", | |
| vectors_config=VectorParams(size=1024, distance=Distance.COSINE), | |
| ) | |
| # Insert embeddings and text into Qdrant | |
| points = [ | |
| PointStruct( | |
| id=str(uuid.uuid4()), | |
| vector=embed, | |
| payload={"text": doc.page_content} | |
| ) | |
| for doc, embed in documents_embeddings | |
| ] | |
| operation_info = client.upsert( | |
| collection_name="data", | |
| wait=True, | |
| points=points | |
| ) | |
| print("Index build complete. Your CV content is now stored in Qdrant with embeddings.") | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/auth.py | |
| ```python | |
| from fastapi import Request, HTTPException, Depends | |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
| import secrets | |
| import os | |
| from starlette.status import HTTP_401_UNAUTHORIZED | |
| from dotenv import load_dotenv | |
| # Load environment variables from .env file | |
| load_dotenv() | |
| # Create a security instance for HTTP Basic Auth | |
| security = HTTPBasic() | |
| # Get admin credentials from environment variables | |
| ADMIN_USERNAME = os.getenv("ADMIN_USERNAME") | |
| ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD") | |
| # Check if credentials are set | |
| if not ADMIN_USERNAME or not ADMIN_PASSWORD: | |
| raise ValueError("ADMIN_USERNAME and ADMIN_PASSWORD must be set in the environment or a .env file") | |
| def verify_admin_credentials(credentials: HTTPBasicCredentials = Depends(security)): | |
| """Verify admin credentials for protected endpoints.""" | |
| is_correct_username = secrets.compare_digest(credentials.username, ADMIN_USERNAME) | |
| is_correct_password = secrets.compare_digest(credentials.password, ADMIN_PASSWORD) | |
| if not (is_correct_username and is_correct_password): | |
| raise HTTPException( | |
| status_code=HTTP_401_UNAUTHORIZED, | |
| detail="Invalid credentials", | |
| headers={"WWW-Authenticate": "Basic"}, | |
| ) | |
| return credentials.username | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/database.py | |
| ```python | |
| from sqlalchemy import create_engine | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker | |
| import os | |
| # Define database directory and file path | |
| DB_DIRECTORY = "data" | |
| DB_FILE = "conversations.db" | |
| DB_PATH = os.path.join(DB_DIRECTORY, DB_FILE) | |
| # Create the data directory if it doesn't exist | |
| os.makedirs(DB_DIRECTORY, exist_ok=True) | |
| # Create SQLite database engine | |
| SQLALCHEMY_DATABASE_URL = f"sqlite:///{DB_PATH}" | |
| engine = create_engine( | |
| SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False} | |
| ) | |
| # Create sessionmaker | |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
| # Create base class for models | |
| Base = declarative_base() | |
| def get_db(): | |
| """Get database session.""" | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/models.py | |
| ```python | |
| from sqlalchemy import Column, Integer, String, Text, DateTime | |
| from sqlalchemy.sql import func | |
| from backend.database import Base | |
| class Conversation(Base): | |
| """Model for storing conversation history.""" | |
| __tablename__ = "conversations" | |
| id = Column(Integer, primary_key=True, index=True) | |
| timestamp = Column(DateTime(timezone=True), server_default=func.now(), index=True) | |
| user_message = Column(Text, nullable=False) | |
| ai_response = Column(Text, nullable=False) | |
| tts_reference = Column(String, nullable=True) | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/backend/utils.py | |
| ```python | |
| from itertools import islice | |
| def chunk(arr_range, chunk_size): | |
| arr_range = iter(arr_range) | |
| return iter(lambda: list(islice(arr_range, chunk_size)), []) | |
| ``` | |
| File: C:/Users/User/Desktop/colin-tts/run.txt | |
| ```text | |
| ``` | |
| </file_contents> | |