Spaces:
Sleeping
Sleeping
| """ | |
| Education Counselor Platform - Browser History Service | |
| Step 1: Students download their own browser history | |
| Step 2: Upload it for counseling context | |
| """ | |
| from fastapi import FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, StreamingResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from typing import Optional, List | |
| from datetime import datetime, timezone, timedelta | |
| from pathlib import Path | |
| import tempfile | |
| import shutil | |
| import sqlite3 | |
| import csv | |
| import os | |
| import uuid | |
| import platform | |
| from urllib.parse import urlparse, parse_qs | |
| import io | |
| import zipfile | |
| app = FastAPI( | |
| title="Education Counselor Browser History Service", | |
| description="Download your browser history, then upload for counseling", | |
| version="2.0.0" | |
| ) | |
| # CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Storage | |
| TEMP_DIR = Path("./temp_history_data") | |
| TEMP_DIR.mkdir(exist_ok=True) | |
| SESSIONS = {} | |
| def get_browser_paths(): | |
| """Get browser history paths based on OS.""" | |
| system = platform.system() | |
| home = Path.home() | |
| paths = {} | |
| if system == 'Linux': | |
| paths = { | |
| 'Chrome': home / '.config/google-chrome/Default/History', | |
| 'Chromium': home / '.config/chromium/Default/History', | |
| 'Brave': home / '.config/BraveSoftware/Brave-Browser/Default/History', | |
| 'Edge': home / '.config/microsoft-edge/Default/History', | |
| 'Firefox': home / '.mozilla/firefox' | |
| } | |
| elif system == 'Darwin': # macOS | |
| paths = { | |
| 'Chrome': home / 'Library/Application Support/Google/Chrome/Default/History', | |
| 'Chromium': home / 'Library/Application Support/Chromium/Default/History', | |
| 'Brave': home / 'Library/Application Support/BraveSoftware/Brave-Browser/Default/History', | |
| 'Edge': home / 'Library/Application Support/Microsoft Edge/Default/History', | |
| 'Firefox': home / 'Library/Application Support/Firefox/Profiles' | |
| } | |
| elif system == 'Windows': | |
| appdata = Path(os.getenv('LOCALAPPDATA', '')) | |
| appdata_roaming = Path(os.getenv('APPDATA', '')) | |
| paths = { | |
| 'Chrome': appdata / 'Google/Chrome/User Data/Default/History', | |
| 'Chromium': appdata / 'Chromium/User Data/Default/History', | |
| 'Brave': appdata / 'BraveSoftware/Brave-Browser/User Data/Default/History', | |
| 'Edge': appdata / 'Microsoft/Edge/User Data/Default/History', | |
| 'Firefox': appdata_roaming / 'Mozilla/Firefox/Profiles' | |
| } | |
| return paths | |
| def find_firefox_db(): | |
| """Find Firefox places.sqlite.""" | |
| paths = get_browser_paths() | |
| if 'Firefox' not in paths: | |
| return None | |
| firefox_dir = paths['Firefox'] | |
| if not firefox_dir.exists(): | |
| return None | |
| for profile_dir in firefox_dir.iterdir(): | |
| if profile_dir.is_dir() and 'default' in profile_dir.name.lower(): | |
| places_db = profile_dir / 'places.sqlite' | |
| if places_db.exists(): | |
| return places_db | |
| return None | |
| def chromium_timestamp_to_iso(timestamp: int) -> str: | |
| """Convert Chromium timestamp to ISO 8601.""" | |
| if timestamp == 0: | |
| return '' | |
| try: | |
| unix_timestamp = (timestamp - 11644473600000000) / 1000000 | |
| dt = datetime.fromtimestamp(unix_timestamp, tz=timezone.utc) | |
| return dt.isoformat() | |
| except (ValueError, OSError): | |
| return '' | |
| def firefox_timestamp_to_iso(timestamp: int) -> str: | |
| """Convert Firefox timestamp to ISO 8601.""" | |
| if timestamp == 0: | |
| return '' | |
| try: | |
| if timestamp > 10000000000000000: | |
| unix_timestamp = timestamp / 1000000 | |
| elif timestamp > 10000000000: | |
| unix_timestamp = timestamp / 1000 | |
| else: | |
| unix_timestamp = timestamp | |
| dt = datetime.fromtimestamp(unix_timestamp, tz=timezone.utc) | |
| return dt.isoformat() | |
| except (ValueError, OSError): | |
| return '' | |
| def is_search_url(url: str) -> bool: | |
| """Check if URL is a search engine query.""" | |
| try: | |
| parsed = urlparse(url.lower()) | |
| domain = parsed.netloc.replace('www.', '') | |
| if 'google' in domain and ('/search' in parsed.path or 'tbm=' in parsed.query): | |
| return 'q=' in parsed.query or 'query=' in parsed.query | |
| if 'bing.com' in domain and '/search' in parsed.path: | |
| return 'q=' in parsed.query | |
| if 'duckduckgo.com' in domain: | |
| return 'q=' in parsed.query | |
| if 'yahoo.com' in domain and '/search' in parsed.path: | |
| return 'p=' in parsed.query | |
| return False | |
| except: | |
| return False | |
| def extract_search_query(url: str) -> str: | |
| """Extract search query from URL.""" | |
| try: | |
| parsed = urlparse(url) | |
| params = parse_qs(parsed.query) | |
| for param in ['q', 'p', 'query']: | |
| if param in params and params[param]: | |
| return params[param][0] | |
| return '' | |
| except: | |
| return '' | |
| def is_educational_domain(url: str) -> bool: | |
| """Check if domain is educational.""" | |
| educational_keywords = [ | |
| 'edu', 'coursera', 'udemy', 'khan', 'edx', 'youtube.com/watch', | |
| 'stackoverflow', 'github', 'medium', 'wikipedia', 'scholar', | |
| 'arxiv', 'researchgate', 'quora', 'reddit.com/r/learn', | |
| 'tutorial', 'learn', 'course', 'lecture', 'study' | |
| ] | |
| url_lower = url.lower() | |
| return any(keyword in url_lower for keyword in educational_keywords) | |
| def export_chromium_to_csv(db_path: Path, days_back: int = 7) -> str: | |
| """Export Chromium history to CSV string.""" | |
| output = io.StringIO() | |
| writer = csv.writer(output) | |
| writer.writerow(['URL', 'Title', 'Visit Count', 'Last Visit Time', 'Search Query', 'Is Educational']) | |
| try: | |
| conn = sqlite3.connect(db_path) | |
| cursor = conn.cursor() | |
| cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_back) | |
| unix_ts = cutoff_date.timestamp() | |
| chromium_ts = int((unix_ts + 11644473600) * 1000000) | |
| query = """ | |
| SELECT url, title, visit_count, last_visit_time | |
| FROM urls | |
| WHERE last_visit_time >= ? | |
| ORDER BY last_visit_time DESC | |
| """ | |
| cursor.execute(query, (chromium_ts,)) | |
| for row in cursor.fetchall(): | |
| url, title, visit_count, last_visit_time = row | |
| search_query = extract_search_query(url) if is_search_url(url) else '' | |
| is_edu = is_educational_domain(url) | |
| writer.writerow([ | |
| url, | |
| title or '', | |
| visit_count or 0, | |
| chromium_timestamp_to_iso(last_visit_time), | |
| search_query, | |
| is_edu | |
| ]) | |
| conn.close() | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Error reading database: {str(e)}") | |
| return output.getvalue() | |
| def export_firefox_to_csv(db_path: Path, days_back: int = 7) -> str: | |
| """Export Firefox history to CSV string.""" | |
| output = io.StringIO() | |
| writer = csv.writer(output) | |
| writer.writerow(['URL', 'Title', 'Visit Count', 'Last Visit Time', 'Search Query', 'Is Educational']) | |
| try: | |
| conn = sqlite3.connect(db_path) | |
| cursor = conn.cursor() | |
| cutoff_date = datetime.now(timezone.utc) - timedelta(days=days_back) | |
| firefox_ts = int(cutoff_date.timestamp() * 1000000) | |
| cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='moz_historyvisits'") | |
| has_history_visits = cursor.fetchone() is not None | |
| if has_history_visits: | |
| query = """ | |
| SELECT DISTINCT p.url, p.title, p.visit_count, | |
| MAX(h.visit_date) as last_visit_date | |
| FROM moz_places p | |
| LEFT JOIN moz_historyvisits h ON p.id = h.place_id | |
| WHERE p.url IS NOT NULL AND h.visit_date >= ? | |
| GROUP BY p.id | |
| ORDER BY last_visit_date DESC | |
| """ | |
| else: | |
| query = """ | |
| SELECT url, title, visit_count, last_visit_date | |
| FROM moz_places | |
| WHERE url IS NOT NULL AND last_visit_date >= ? | |
| ORDER BY last_visit_date DESC | |
| """ | |
| cursor.execute(query, (firefox_ts,)) | |
| for row in cursor.fetchall(): | |
| url, title, visit_count, last_visit_date = row | |
| search_query = extract_search_query(url) if is_search_url(url) else '' | |
| is_edu = is_educational_domain(url) | |
| writer.writerow([ | |
| url, | |
| title or '', | |
| visit_count or 0, | |
| firefox_timestamp_to_iso(last_visit_date) if last_visit_date else '', | |
| search_query, | |
| is_edu | |
| ]) | |
| conn.close() | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Error reading Firefox database: {str(e)}") | |
| return output.getvalue() | |
| def categorize_interests(rows: List[dict]) -> dict: | |
| """Categorize student interests.""" | |
| categories = { | |
| 'programming': ['python', 'java', 'javascript', 'coding', 'programming', 'algorithm'], | |
| 'data_science': ['data science', 'machine learning', 'ai', 'deep learning'], | |
| 'web_development': ['html', 'css', 'react', 'angular', 'web development'], | |
| 'mathematics': ['math', 'calculus', 'algebra', 'statistics', 'probability'], | |
| 'science': ['physics', 'chemistry', 'biology', 'science'], | |
| 'business': ['business', 'management', 'marketing', 'finance'], | |
| 'design': ['design', 'ui', 'ux', 'graphic'], | |
| 'career': ['job', 'career', 'interview', 'resume', 'salary'], | |
| 'exam_prep': ['exam', 'test', 'preparation', 'gate', 'jee', 'neet'] | |
| } | |
| interest_counts = {cat: 0 for cat in categories} | |
| all_text = ' '.join([ | |
| row.get('URL', '') + ' ' + | |
| row.get('Title', '') + ' ' + | |
| row.get('Search Query', '') | |
| for row in rows | |
| ]).lower() | |
| for category, keywords in categories.items(): | |
| for keyword in keywords: | |
| interest_counts[category] += all_text.count(keyword) | |
| top_interests = sorted(interest_counts.items(), key=lambda x: x[1], reverse=True) | |
| return [cat.replace('_', ' ').title() for cat, count in top_interests[:5] if count > 0] | |
| def analyze_csv(csv_content: str) -> dict: | |
| """Analyze CSV and generate counseling context.""" | |
| rows = [] | |
| reader = csv.DictReader(io.StringIO(csv_content)) | |
| for row in reader: | |
| rows.append(row) | |
| search_queries = [row['Search Query'] for row in rows if row.get('Search Query')] | |
| educational_visits = [row for row in rows if row.get('Is Educational') == 'True'] | |
| educational_domains = [] | |
| for row in educational_visits[:20]: | |
| try: | |
| domain = urlparse(row['URL']).netloc.replace('www.', '') | |
| if domain not in educational_domains: | |
| educational_domains.append(domain) | |
| except: | |
| pass | |
| top_interests = categorize_interests(rows) | |
| return { | |
| 'total_visits': len(rows), | |
| 'search_queries_count': len(search_queries), | |
| 'educational_visits': len(educational_visits), | |
| 'search_queries': search_queries[:20], | |
| 'educational_domains': educational_domains[:10], | |
| 'top_interests': top_interests, | |
| 'study_topics': list(set(search_queries[:15])) | |
| } | |
| async def root(): | |
| """API documentation.""" | |
| return { | |
| "service": "Education Counselor Browser History Service", | |
| "version": "2.0.0", | |
| "workflow": { | |
| "step_1": "Download your browser history: GET /download/my-history", | |
| "step_2": "Upload for counseling: POST /upload/for-counseling" | |
| }, | |
| "endpoints": { | |
| "/download/my-history": "Download your own browser history as CSV", | |
| "/upload/for-counseling": "Upload history CSV for counseling context", | |
| "/context/{session_id}": "Get counseling context for a session" | |
| } | |
| } | |
| async def download_my_history( | |
| days_back: int = 7, | |
| browser: Optional[str] = None | |
| ): | |
| """ | |
| Download your own browser history as CSV. | |
| This endpoint reads your local browser database and returns CSV. | |
| Query params: | |
| - days_back: Number of days to export (default: 7) | |
| - browser: chrome, firefox, brave, edge (optional, auto-detects if not specified) | |
| """ | |
| browser_paths = get_browser_paths() | |
| # Try to find browser automatically | |
| csv_content = None | |
| detected_browser = None | |
| if browser: | |
| browser = browser.lower() | |
| if browser == 'firefox': | |
| firefox_db = find_firefox_db() | |
| if firefox_db and firefox_db.exists(): | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp: | |
| shutil.copy2(firefox_db, tmp.name) | |
| tmp_path = Path(tmp.name) | |
| try: | |
| csv_content = export_firefox_to_csv(tmp_path, days_back) | |
| detected_browser = 'Firefox' | |
| finally: | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| else: | |
| # Chromium-based | |
| browser_map = { | |
| 'chrome': 'Chrome', | |
| 'chromium': 'Chromium', | |
| 'brave': 'Brave', | |
| 'edge': 'Edge' | |
| } | |
| browser_key = browser_map.get(browser) | |
| if browser_key and browser_key in browser_paths: | |
| db_path = browser_paths[browser_key] | |
| if db_path.exists(): | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp: | |
| shutil.copy2(db_path, tmp.name) | |
| tmp_path = Path(tmp.name) | |
| try: | |
| csv_content = export_chromium_to_csv(tmp_path, days_back) | |
| detected_browser = browser_key | |
| finally: | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| else: | |
| # Auto-detect browser | |
| for browser_name, db_path in browser_paths.items(): | |
| if browser_name == 'Firefox': | |
| firefox_db = find_firefox_db() | |
| if firefox_db and firefox_db.exists(): | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp: | |
| shutil.copy2(firefox_db, tmp.name) | |
| tmp_path = Path(tmp.name) | |
| try: | |
| csv_content = export_firefox_to_csv(tmp_path, days_back) | |
| detected_browser = 'Firefox' | |
| break | |
| finally: | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| elif db_path.exists(): | |
| with tempfile.NamedTemporaryFile(delete=False, suffix='.db') as tmp: | |
| shutil.copy2(db_path, tmp.name) | |
| tmp_path = Path(tmp.name) | |
| try: | |
| csv_content = export_chromium_to_csv(tmp_path, days_back) | |
| detected_browser = browser_name | |
| break | |
| finally: | |
| try: | |
| os.unlink(tmp_path) | |
| except: | |
| pass | |
| if not csv_content: | |
| raise HTTPException( | |
| status_code=404, | |
| detail="No browser history found. Make sure your browser is installed and has browsing history." | |
| ) | |
| # Return as downloadable CSV | |
| filename = f"my_browser_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
| return StreamingResponse( | |
| io.StringIO(csv_content), | |
| media_type="text/csv", | |
| headers={ | |
| "Content-Disposition": f"attachment; filename={filename}", | |
| "X-Detected-Browser": detected_browser | |
| } | |
| ) | |
| async def upload_for_counseling( | |
| student_id: str = Form(...), | |
| history_file: UploadFile = File(...), | |
| background_tasks: BackgroundTasks = None | |
| ): | |
| """ | |
| Upload your browser history CSV for counseling context. | |
| Form data: | |
| - student_id: Your student ID or email | |
| - history_file: The CSV file downloaded from /download/my-history | |
| """ | |
| # Read uploaded CSV | |
| content = await history_file.read() | |
| csv_content = content.decode('utf-8') | |
| # Generate session ID | |
| session_id = str(uuid.uuid4()) | |
| # Save CSV to temp folder | |
| csv_filename = f"{session_id}_{student_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
| csv_path = TEMP_DIR / csv_filename | |
| with open(csv_path, 'w', encoding='utf-8') as f: | |
| f.write(csv_content) | |
| # Analyze CSV | |
| try: | |
| context = analyze_csv(csv_content) | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Error analyzing CSV: {str(e)}") | |
| # Store session | |
| SESSIONS[session_id] = { | |
| 'student_id': student_id, | |
| 'csv_path': str(csv_path), | |
| 'created_at': datetime.now().isoformat(), | |
| 'context': context | |
| } | |
| # Cleanup old files | |
| if background_tasks: | |
| background_tasks.add_task(cleanup_old_files) | |
| return { | |
| 'status': 'success', | |
| 'session_id': session_id, | |
| 'student_id': student_id, | |
| 'counseling_context': context, | |
| 'message': 'History uploaded successfully. Share this session_id with your counselor.' | |
| } | |
| async def get_counseling_context(session_id: str): | |
| """ | |
| Get counseling context for a session. | |
| Counselors use this endpoint to understand student's interests. | |
| """ | |
| if session_id not in SESSIONS: | |
| raise HTTPException(status_code=404, detail="Session not found or expired") | |
| session = SESSIONS[session_id] | |
| return { | |
| 'session_id': session_id, | |
| 'student_id': session['student_id'], | |
| 'created_at': session['created_at'], | |
| 'context': session['context'] | |
| } | |
| async def delete_session(session_id: str): | |
| """Delete session data for privacy.""" | |
| if session_id not in SESSIONS: | |
| raise HTTPException(status_code=404, detail="Session not found") | |
| session = SESSIONS[session_id] | |
| # Delete CSV | |
| try: | |
| csv_path = Path(session['csv_path']) | |
| if csv_path.exists(): | |
| csv_path.unlink() | |
| except Exception as e: | |
| print(f"Error deleting CSV: {e}") | |
| del SESSIONS[session_id] | |
| return {'status': 'deleted', 'session_id': session_id} | |
| def cleanup_old_files(): | |
| """Remove files older than 24 hours.""" | |
| try: | |
| now = datetime.now() | |
| for file in TEMP_DIR.glob("*.csv"): | |
| file_time = datetime.fromtimestamp(file.stat().st_mtime) | |
| if (now - file_time).days >= 1: | |
| file.unlink() | |
| except Exception as e: | |
| print(f"Cleanup error: {e}") | |
| async def health_check(): | |
| """Health check.""" | |
| return { | |
| "status": "healthy", | |
| "sessions": len(SESSIONS), | |
| "temp_files": len(list(TEMP_DIR.glob("*.csv"))) | |
| } | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=7860) |