Spaces:
Sleeping
Sleeping
| # File: backend/main.py | |
| import os | |
| import re | |
| import json | |
| os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' | |
| os.environ['OAUTHLIB_RELAX_TOKEN_SCOPE'] = '1' | |
| from fastapi import FastAPI, Depends, HTTPException, status | |
| from fastapi.security import OAuth2PasswordRequestForm, OAuth2PasswordBearer | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy import func | |
| from typing import Annotated, List, Optional | |
| from datetime import datetime, timedelta | |
| from pydantic import BaseModel | |
| from . import models, schemas, database, auth, email_utils, google_utils, dropbox_utils, cloud_utils | |
| from .ml_utils import predict_category, extract_course_code, normalize_course_code | |
| from .gemini_utils import process_natural_language_sort | |
| app = FastAPI(title="DocuSort API") | |
| origins = ["http://localhost:5173", "http://localhost:3000"] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=origins, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| models.Base.metadata.create_all(bind=database.engine) | |
| oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login") | |
| # --- HELPER: AUDIT LOGGER --- | |
| def create_log(db: Session, email: str, action: str, details: str, undo_payload: str = None): | |
| log = models.UserLog(user_email=email, action=action, details=details, timestamp=datetime.utcnow(), undo_payload=undo_payload) | |
| db.add(log) | |
| db.commit() | |
| # --- AUTH DEPENDENCIES --- | |
| def get_current_user(token: Annotated[str, Depends(oauth2_scheme)], db: Session = Depends(database.get_db)): | |
| credentials_exception = HTTPException(status_code=401, detail="Invalid credentials", headers={"WWW-Authenticate": "Bearer"}) | |
| payload = auth.verify_token(token, credentials_exception) | |
| email: str = payload.get("sub") | |
| user = db.query(models.User).filter(models.User.email == email).first() | |
| if user is None: raise credentials_exception | |
| is_blacklisted = db.query(models.BlacklistedEmail).filter(models.BlacklistedEmail.email == email).first() | |
| if is_blacklisted or (not user.is_active and user.otp_code is None): | |
| raise HTTPException(status_code=403, detail="Account banned.") | |
| return user | |
| def get_unrestricted_user(current_user: Annotated[models.User, Depends(get_current_user)]): | |
| if current_user.is_frozen: | |
| raise HTTPException(status_code=423, detail="Account frozen. Please submit an appeal.") | |
| return current_user | |
| def get_admin_user(current_user: Annotated[models.User, Depends(get_current_user)]): | |
| if current_user.role != "admin": raise HTTPException(status_code=403, detail="Admin privileges required") | |
| return current_user | |
| # ======================= | |
| # RESOLVE REPORT & SHARING | |
| # ======================= | |
| async def resolve_report(report_id: int, data: schemas.ResolveReportRequest, current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| report = db.query(models.UserReport).filter(models.UserReport.id == report_id).first() | |
| if not report: raise HTTPException(status_code=404, detail="Not found") | |
| report.status = "resolved" | |
| report.admin_response = data.response | |
| target_email = report.reporter.email if report.reporter else report.reporter_email_fallback | |
| if target_email: await email_utils.send_alert_email(target_email, "Support Response: Resolved", f"An administrator has reviewed your report/appeal and provided the following response:<br><br><b>{data.response}</b>") | |
| if report.reporter_id: db.add(models.Notification(user_id=report.reporter_id, message=f"Support Update ({report.type}): {data.response}")) | |
| db.commit() | |
| create_log(db, current_user.email, "REPORT_RESOLVED", f"Resolved {report.type}") | |
| return {"message": "Resolved"} | |
| def respond_share(data: schemas.RespondToShare, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| req = db.query(models.ShareRequest).filter(models.ShareRequest.id == data.request_id, models.ShareRequest.receiver_email == current_user.email, models.ShareRequest.status == "pending").first() | |
| if not req: raise HTTPException(status_code=404) | |
| if data.accept: | |
| req.status = "accepted" | |
| db.add(models.SharedAccess(user_id=current_user.id, folder_id=req.folder_id, access_level="view", expires_at=req.expires_at)) | |
| db.add(models.Notification(user_id=req.sender_id, message=f"{current_user.email} accepted your invite to '{req.folder.folder_name}'.")) | |
| create_log(db, current_user.email, "SHARE_ACCEPT", f"Accepted {req.folder.folder_name}") | |
| else: | |
| req.status = "declined" | |
| db.add(models.Notification(user_id=req.sender_id, message=f"{current_user.email} declined your invite to '{req.folder.folder_name}'.")) | |
| db.commit() | |
| return {"message": "Done"} | |
| def revoke(access_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| acc = db.query(models.SharedAccess).filter(models.SharedAccess.id == access_id).first() | |
| if not acc: raise HTTPException(status_code=404) | |
| is_owner = acc.folder.drive.user_id == current_user.id | |
| is_recipient = acc.user_id == current_user.id | |
| if not (is_owner or is_recipient): raise HTTPException(status_code=403) | |
| if is_owner: db.add(models.Notification(user_id=acc.user_id, message=f"Your access to '{acc.folder.folder_name}' was revoked by the owner.")) | |
| if is_recipient: db.add(models.Notification(user_id=acc.folder.drive.user_id, message=f"{current_user.email} left the shared folder '{acc.folder.folder_name}'.")) | |
| db.delete(acc); db.commit() | |
| create_log(db, current_user.email, "SHARE_REVOKE", f"Revoked {access_id}") | |
| return {"message": "Revoked"} | |
| def list_shares(current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| shares = db.query(models.SharedAccess).filter(models.SharedAccess.user_id == current_user.id).all() | |
| res = [] | |
| now = datetime.utcnow() | |
| for s in shares: | |
| if s.expires_at and s.expires_at < now: | |
| db.add(models.Notification(user_id=s.user_id, message=f"Access to '{s.folder.folder_name}' has expired.")) | |
| db.add(models.Notification(user_id=s.folder.drive.user_id, message=f"{s.user.email}'s access to '{s.folder.folder_name}' has expired.")) | |
| db.delete(s) | |
| continue | |
| total_files = len(s.folder.files) | |
| sorted_files = sum(1 for f in s.folder.files if getattr(f, 'category', 'Unsorted') != 'Unsorted') | |
| res.append({ | |
| "id": s.folder.id, | |
| "access_id": s.id, | |
| "folder_name": s.folder.folder_name, | |
| "owner_email": s.folder.drive.owner.email, | |
| "access_level": s.access_level, | |
| "expires_at": s.expires_at, | |
| "total_files": total_files, | |
| "sorted_files": sorted_files | |
| }) | |
| db.commit() | |
| return res | |
| # ======================= | |
| # STANDARD ROUTES | |
| # ======================= | |
| def get_my_logs(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db), search: str = "", date: str = None): | |
| query = db.query(models.UserLog).filter(models.UserLog.user_email == current_user.email) | |
| if search: query = query.filter(models.UserLog.action.ilike(f"%{search}%") | models.UserLog.details.ilike(f"%{search}%")) | |
| if date: | |
| try: d = datetime.strptime(date, "%Y-%m-%d"); query = query.filter(models.UserLog.timestamp >= d, models.UserLog.timestamp < d + timedelta(days=1)) | |
| except: pass | |
| return query.order_by(models.UserLog.timestamp.desc()).limit(100).all() | |
| def get_admin_logs(current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db), search: str = "", date: str = None): | |
| query = db.query(models.UserLog) | |
| if search: query = query.filter(models.UserLog.user_email.ilike(f"%{search}%") | models.UserLog.action.ilike(f"%{search}%") | models.UserLog.details.ilike(f"%{search}%")) | |
| if date: | |
| try: d = datetime.strptime(date, "%Y-%m-%d"); query = query.filter(models.UserLog.timestamp >= d, models.UserLog.timestamp < d + timedelta(days=1)) | |
| except: pass | |
| return query.order_by(models.UserLog.timestamp.desc()).limit(200).all() | |
| def get_pending_invites(current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| reqs = db.query(models.ShareRequest).filter(models.ShareRequest.receiver_email == current_user.email, models.ShareRequest.status == "pending").all() | |
| return [{"id": r.id, "sender_email": r.sender.email, "folder_name": r.folder.folder_name, "status": r.status, "created_at": r.created_at, "expires_at": r.expires_at} for r in reqs] | |
| def get_notifications(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| return db.query(models.Notification).filter(models.Notification.user_id == current_user.id).order_by(models.Notification.created_at.desc()).all() | |
| def mark_read(notif_id: int, current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| n = db.query(models.Notification).filter(models.Notification.id == notif_id, models.Notification.user_id == current_user.id).first() | |
| if n: n.is_read = True; db.commit() | |
| return {"message": "Read"} | |
| # ========================================== | |
| # CUSTOM RULES | |
| # ========================================== | |
| def create_rule(rule: schemas.CustomRuleBase, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| new_rule = models.CustomRule(user_id=current_user.id, category_name=rule.category_name, keywords=rule.keywords) | |
| db.add(new_rule); db.commit(); db.refresh(new_rule) | |
| return new_rule | |
| def get_rules(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| return db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| def delete_rule(rule_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| rule = db.query(models.CustomRule).filter(models.CustomRule.id == rule_id, models.CustomRule.user_id == current_user.id).first() | |
| if rule: db.delete(rule); db.commit() | |
| return {"message": "Deleted"} | |
| # ========================================== | |
| # NEW: NATURAL LANGUAGE SORTING & TEMPLATES | |
| # ========================================== | |
| def create_prompt_template(template: schemas.PromptTemplateCreate, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| new_temp = models.PromptTemplate(user_id=current_user.id, name=template.name, prompt_text=template.prompt_text) | |
| db.add(new_temp); db.commit(); db.refresh(new_temp) | |
| return new_temp | |
| def get_prompt_templates(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| return db.query(models.PromptTemplate).filter(models.PromptTemplate.user_id == current_user.id).all() | |
| def delete_prompt_template(template_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| temp = db.query(models.PromptTemplate).filter(models.PromptTemplate.id == template_id, models.PromptTemplate.user_id == current_user.id).first() | |
| if temp: db.delete(temp); db.commit() | |
| return {"message": "Deleted template"} | |
| def create_prompt_template(template: schemas.PromptTemplateCreate, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| new_temp = models.PromptTemplate(user_id=current_user.id, name=template.name, prompt_text=template.prompt_text) | |
| db.add(new_temp); db.commit(); db.refresh(new_temp) | |
| return new_temp | |
| def get_prompt_templates(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| return db.query(models.PromptTemplate).filter(models.PromptTemplate.user_id == current_user.id).all() | |
| def delete_prompt_template(template_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| temp = db.query(models.PromptTemplate).filter(models.PromptTemplate.id == template_id, models.PromptTemplate.user_id == current_user.id).first() | |
| if temp: db.delete(temp); db.commit() | |
| return {"message": "Deleted template"} | |
| def smart_prompt_sort_folder(folder_db_id: int, payload: schemas.NaturalLanguageSortRequest, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """Passes files to Gemini and returns a DRY RUN preview array to the UI.""" | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_db_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| if getattr(f, 'is_sorting', False): raise HTTPException(status_code=400, detail="Folder is currently sorting.") | |
| unresolved_files = [file for file in f.files if not getattr(file, 'is_locked', False) and file.category == "Unsorted"] | |
| if not unresolved_files: return {"message": "No unsorted files available for custom prompt execution.", "total": 0} | |
| # --- FETCH EXISTING TAGS FOR GEMINI CONTEXT --- | |
| all_files = db.query(models.FileMetadata).join(models.IndexedFolder).join(models.ConnectedDrive).filter(models.ConnectedDrive.user_id == current_user.id).all() | |
| existing_courses = list(set([file.course_code for file in all_files if file.course_code and file.course_code != "General"])) | |
| existing_cats = list(set([file.category for file in all_files if file.category and file.category != "Unsorted"])) | |
| f.is_sorting = True | |
| db.commit() | |
| try: | |
| from .gemini_utils import process_natural_language_sort | |
| # Pass the context into Gemini! | |
| ai_response = process_natural_language_sort(unresolved_files, payload.prompt_text, existing_courses, existing_cats) | |
| sorted_mappings = ai_response.get("sorted_files", []) | |
| preview = [] | |
| for item in sorted_mappings: | |
| file_id = item.get("file_id") | |
| custom_path = item.get("custom_path", "Unsorted").strip() | |
| if custom_path == "Unsorted": continue | |
| db_file = db.query(models.FileMetadata).filter(models.FileMetadata.id == file_id, models.FileMetadata.folder_id == folder_db_id).first() | |
| if db_file: | |
| path_parts = [part.strip() for part in custom_path.split('/') if part.strip()] | |
| if path_parts: | |
| # FIX: Correctly maps single words if Gemini forgets the slash | |
| if len(path_parts) == 1: | |
| course_code = "General" | |
| cat_name = path_parts[0] | |
| else: | |
| course_code = path_parts[0] | |
| cat_name = "/".join(path_parts[1:]) | |
| # --- NEW: REGEX FALLBACK --- | |
| # If Gemini defaulted to General, double check with Regex! | |
| if course_code == "General": | |
| extracted = extract_course_code(db_file.name) | |
| if extracted and extracted != "General": | |
| course_code = extracted | |
| new_name = f"{course_code}_{cat_name} - {db_file.name}" | |
| new_path = f"{course_code} / {cat_name}" | |
| preview.append({ | |
| "file_id": db_file.id, | |
| "old_name": db_file.name, | |
| "new_name": new_name, | |
| "old_path": getattr(db_file, 'original_folder_name', f.folder_name), | |
| "new_path": new_path | |
| }) | |
| return {"files": preview, "total": len(preview)} | |
| finally: | |
| f.is_sorting = False | |
| db.commit() | |
| class BulkMoveItem(BaseModel): | |
| file_id: int | |
| new_path: str | |
| def apply_bulk_moves(payload: List[BulkMoveItem], current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """Physically moves edited files, handles cross-drive sorting, saves Undo state, and triggers Notifications.""" | |
| moved_count = 0 | |
| undo_data = [] | |
| folder_cache = {} | |
| google_services = {} | |
| dropbox_clients = {} | |
| for item in payload: | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == item.file_id).first() | |
| if not file or file.folder.drive.user_id != current_user.id: continue | |
| d = file.folder.drive | |
| parts = [p.strip() for p in item.new_path.split('/') if p.strip()] | |
| # FIX: Ensure it always has a valid Course and Category | |
| if len(parts) == 1: | |
| course_code = "General" | |
| cat_name = parts[0] | |
| elif len(parts) > 1: | |
| course_code = parts[0] | |
| cat_name = "/".join(parts[1:]) | |
| else: | |
| continue | |
| if cat_name == "Unsorted": continue | |
| new_file_name = f"{course_code}_{cat_name} - {file.name}" | |
| db_folder_name = f"{course_code} / {cat_name}" | |
| undo_data.append({ | |
| "file_db_id": file.id, | |
| "original_cloud_id": file.cloud_id, | |
| "original_name": file.name, | |
| "original_db_folder_id": file.folder_id, | |
| "original_cloud_folder_id": file.folder.folder_id | |
| }) | |
| if d.provider == "google": | |
| if d.id not in google_services: | |
| google_services[d.id] = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| service = google_services[d.id] | |
| course_cache_key = f"{d.id}_{course_code}" | |
| if course_cache_key not in folder_cache: | |
| folder_cache[course_cache_key] = cloud_utils.create_google_folder(service, course_code) | |
| course_folder_id = folder_cache[course_cache_key] | |
| folder_cache_key = f"{d.id}_{db_folder_name}" | |
| if folder_cache_key not in folder_cache: | |
| cloud_subfolder_id = cloud_utils.create_google_folder(service, cat_name, parent_id=course_folder_id) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_id == cloud_subfolder_id).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=cloud_subfolder_id, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| folder_cache[folder_cache_key] = { "cloud_id": cloud_subfolder_id, "db_id": db_folder.id } | |
| if cloud_utils.move_google_file(service, file.cloud_id, folder_cache[folder_cache_key]["cloud_id"], new_name=new_file_name): | |
| file.name = new_file_name; file.is_sorted = True; file.is_locked = True | |
| file.course_code = course_code; file.category = cat_name | |
| file.folder_id = folder_cache[folder_cache_key]["db_id"] | |
| moved_count += 1 | |
| elif d.provider == "dropbox": | |
| if d.id not in dropbox_clients: | |
| dropbox_clients[d.id] = dropbox_utils.get_dropbox_client(d.access_token) | |
| dbx = dropbox_clients[d.id] | |
| full_cloud_path = f"/{course_code}/{cat_name}" | |
| folder_cache_key = f"{d.id}_{db_folder_name}" | |
| if folder_cache_key not in folder_cache: | |
| cloud_utils.create_dropbox_folder(dbx, course_code) | |
| cloud_utils.create_dropbox_folder(dbx, full_cloud_path) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_name == db_folder_name).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=full_cloud_path, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| folder_cache[folder_cache_key] = { "cloud_id": full_cloud_path, "db_id": db_folder.id } | |
| from_path = file.cloud_id | |
| if cloud_utils.move_dropbox_file(dbx, from_path, f"{course_code}/{cat_name}", file.name, new_name=new_file_name): | |
| file.name = new_file_name; file.is_sorted = True; file.is_locked = True | |
| file.course_code = course_code; file.category = cat_name | |
| file.folder_id = folder_cache[folder_cache_key]["db_id"]; file.cloud_id = f"{full_cloud_path}/{new_file_name}" | |
| moved_count += 1 | |
| if moved_count > 0: | |
| create_log(db, current_user.email, "APPLY_SORT", f"Bulk moved and renamed {moved_count} files.", undo_payload=json.dumps(undo_data)) | |
| db.add(models.Notification(user_id=current_user.id, message=f"Organization Complete: {moved_count} files successfully moved and renamed based on AI mapping.")) | |
| db.commit() | |
| return {"message": f"Successfully organized {moved_count} files."} | |
| # ========================================== | |
| # ANALYTICS | |
| # ========================================== | |
| def get_storage_analytics(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| files = db.query(models.FileMetadata).join(models.IndexedFolder).join(models.ConnectedDrive).filter(models.ConnectedDrive.user_id == current_user.id).all() | |
| course_data, category_data = {}, {} | |
| for f in files: | |
| course = getattr(f, 'course_code', 'General') or 'General' | |
| cat = f.category or 'Unsorted' | |
| size = f.size or 0 | |
| if course not in course_data: course_data[course] = {"name": course, "value": 0, "fileCount": 0} | |
| course_data[course]["value"] += size | |
| course_data[course]["fileCount"] += 1 | |
| if cat not in category_data: category_data[cat] = {"name": cat, "value": 0, "fileCount": 0} | |
| category_data[cat]["value"] += size | |
| category_data[cat]["fileCount"] += 1 | |
| largest_files = sorted([f for f in files if f.size], key=lambda x: x.size, reverse=True)[:5] | |
| top_files = [{"name": f.name, "course": getattr(f, 'course_code', 'General'), "size": f.size} for f in largest_files] | |
| # FIX: Sort alphabetically by name so the React charts stop jumping! | |
| sorted_courses = sorted(list(course_data.values()), key=lambda x: x["name"]) | |
| sorted_categories = sorted(list(category_data.values()), key=lambda x: x["name"]) | |
| return {"byCourse": sorted_courses, "byCategory": sorted_categories, "topFiles": top_files} | |
| # --- AUTH --- | |
| async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Session = Depends(database.get_db)): | |
| is_blacklisted = db.query(models.BlacklistedEmail).filter(models.BlacklistedEmail.email == form_data.username).first() | |
| user = db.query(models.User).filter(models.User.email == form_data.username).first() | |
| if is_blacklisted or (user and not user.is_active and user.otp_code is None): | |
| create_log(db, form_data.username, "LOGIN_REJECTED", "Account is banned") | |
| raise HTTPException(status_code=403, detail="Account banned.") | |
| if not user or not auth.Hash.verify(form_data.password, user.hashed_password): | |
| create_log(db, form_data.username, "LOGIN_FAILED", "Invalid credentials") | |
| raise HTTPException(status_code=401, detail="Invalid credentials") | |
| if user.mfa_enabled: | |
| otp = email_utils.generate_otp() | |
| user.otp_code = otp; user.otp_expiry = datetime.utcnow() + timedelta(minutes=5); db.commit() | |
| await email_utils.send_otp_email(user.email, otp, "Login Code") | |
| return {"message": "OTP sent", "mfa_required": True} | |
| create_log(db, user.email, "LOGIN_SUCCESS", "Logged in") | |
| return {"access_token": auth.create_access_token(data={"sub": user.email, "role": user.role}), "token_type": "bearer", "mfa_required": False} | |
| def login_verify(data: schemas.VerifyOTPRequest, db: Session = Depends(database.get_db)): | |
| user = db.query(models.User).filter(models.User.email == data.email).first() | |
| if not user or user.otp_code != data.otp: raise HTTPException(status_code=400, detail="Invalid OTP") | |
| user.otp_code = None; db.commit() | |
| create_log(db, user.email, "LOGIN_MFA", "Success") | |
| return {"access_token": auth.create_access_token(data={"sub": user.email, "role": user.role}), "token_type": "bearer"} | |
| def logout(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| create_log(db, current_user.email, "LOGOUT", "User logged out") | |
| current_user.token_version += 1 | |
| db.commit() | |
| return {"message": "Logged out."} | |
| async def register(user: schemas.UserCreate, db: Session = Depends(database.get_db)): | |
| if db.query(models.BlacklistedEmail).filter(models.BlacklistedEmail.email == user.email).first(): | |
| raise HTTPException(status_code=400, detail="Blacklisted") | |
| existing_user = db.query(models.User).filter(models.User.email == user.email).first() | |
| otp = email_utils.generate_otp() | |
| if existing_user: | |
| if existing_user.is_active: | |
| raise HTTPException(status_code=400, detail="Registered") | |
| else: | |
| existing_user.hashed_password = auth.Hash.bcrypt(user.password) | |
| existing_user.otp_code = otp | |
| existing_user.otp_expiry = datetime.utcnow() + timedelta(minutes=10) | |
| db.commit() | |
| await email_utils.send_alert_email(user.email, "DocuSort Verification", f"Your verification code is: <br><br><b style='font-size:24px;'>{otp}</b><br><br>This code expires in 10 minutes.") | |
| return {"message": "OTP resent"} | |
| new_user = models.User(email=user.email, hashed_password=auth.Hash.bcrypt(user.password), is_active=False, otp_code=otp, otp_expiry=datetime.utcnow()+timedelta(minutes=10)) | |
| db.add(new_user) | |
| db.commit() | |
| await email_utils.send_alert_email(user.email, "DocuSort Verification", f"Your verification code is: <br><br><b style='font-size:24px;'>{otp}</b><br><br>This code expires in 10 minutes.") | |
| return {"message": "Registered"} | |
| def verify_reg(data: schemas.VerifyOTPRequest, db: Session = Depends(database.get_db)): | |
| user = db.query(models.User).filter(models.User.email == data.email).first() | |
| if not user or user.otp_code != data.otp: raise HTTPException(status_code=400, detail="Invalid") | |
| user.is_active = True; user.otp_code = None; db.commit() | |
| return {"access_token": auth.create_access_token(data={"sub": user.email, "role": user.role}), "token_type": "bearer"} | |
| async def forgot_password(data: schemas.ForgotPasswordRequest, db: Session = Depends(database.get_db)): | |
| user = db.query(models.User).filter(models.User.email == data.email).first() | |
| if not user: | |
| raise HTTPException(status_code=404, detail="Email not found in our system.") | |
| otp = email_utils.generate_otp() | |
| user.otp_code = otp | |
| user.otp_expiry = datetime.utcnow() + timedelta(minutes=10) | |
| db.commit() | |
| await email_utils.send_otp_email(user.email, otp, "Reset Password") | |
| return {"message": "OTP sent"} | |
| def reset_password(data: schemas.ResetPasswordRequest, db: Session = Depends(database.get_db)): | |
| user = db.query(models.User).filter(models.User.email == data.email).first() | |
| if not user or user.otp_code != data.otp: raise HTTPException(status_code=400, detail="Invalid") | |
| user.hashed_password = auth.Hash.bcrypt(data.new_password); user.otp_code = None; db.commit() | |
| return {"message": "Updated"} | |
| def submit_appeal(data: schemas.AppealCreate, db: Session = Depends(database.get_db)): | |
| user = db.query(models.User).filter(models.User.email == data.email).first() | |
| if not user and not db.query(models.BlacklistedEmail).filter(models.BlacklistedEmail.email == data.email).first(): raise HTTPException(status_code=404) | |
| if db.query(models.UserReport).filter(models.UserReport.reporter_email_fallback == data.email, models.UserReport.status == "open", models.UserReport.type == "appeal").first(): raise HTTPException(status_code=400, detail="Pending") | |
| db.add(models.UserReport(type="appeal", reporter_email_fallback=data.email, reason=data.reason, description=data.description)); db.commit() | |
| return {"message": "Submitted"} | |
| def read_users_me(current_user: Annotated[models.User, Depends(get_current_user)]): | |
| return current_user | |
| def delete_me(current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| db.delete(current_user); db.commit(); return {"message": "Deleted"} | |
| def update_settings(settings: schemas.UserSettingsUpdate, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| current_user.mfa_enabled = settings.mfa_enabled | |
| current_user.ui_theme = settings.ui_theme | |
| current_user.font_size = settings.font_size | |
| current_user.auto_sync = settings.auto_sync # <--- NEW | |
| current_user.disabled_defaults = settings.disabled_defaults # <--- ADD THIS LINE | |
| db.commit() | |
| return current_user | |
| def search_users(email: str, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| u = db.query(models.User).filter(models.User.email == email, models.User.id != current_user.id).first() | |
| return [u] if u else [] | |
| def share_req(data: schemas.ShareRequestCreate, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == data.folder_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| if db.query(models.ShareRequest).filter(models.ShareRequest.sender_id == current_user.id, models.ShareRequest.receiver_email == data.receiver_email, models.ShareRequest.folder_id == data.folder_id, models.ShareRequest.status == "pending").first(): raise HTTPException(status_code=400) | |
| exp = datetime.utcnow() + timedelta(days=data.expires_in_days) if data.expires_in_days else None | |
| db.add(models.ShareRequest(sender_id=current_user.id, receiver_email=data.receiver_email, folder_id=data.folder_id, expires_at=exp)) | |
| db.commit() | |
| return {"message": "Sent"} | |
| def folder_users(folder_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| return [{"access_id": s.id, "user_email": s.user.email, "access_level": s.access_level, "expires_at": s.expires_at} for s in db.query(models.SharedAccess).filter(models.SharedAccess.folder_id == folder_id).all()] | |
| def report(data: schemas.ReportCreate, current_user: Annotated[models.User, Depends(get_current_user)], db: Session = Depends(database.get_db)): | |
| t = db.query(models.User).filter(models.User.email == data.reported_email).first() | |
| if t: db.add(models.UserReport(reporter_id=current_user.id, reported_user_id=t.id, reason=data.reason, description=data.description)); db.commit() | |
| return {"message": "Reported"} | |
| # --- DRIVES --- | |
| def get_google_url(current_user: Annotated[models.User, Depends(get_unrestricted_user)]): | |
| return {"url": google_utils.get_google_auth_url()} | |
| def cb(code_data: dict, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| t = google_utils.exchange_code_for_token(code_data.get("code")) | |
| e = db.query(models.ConnectedDrive).filter(models.ConnectedDrive.user_id == current_user.id, models.ConnectedDrive.drive_email == t['email'], models.ConnectedDrive.provider == "google").first() | |
| if e: e.access_token = t['access_token']; e.refresh_token = t['refresh_token']; e.status = "active" | |
| else: db.add(models.ConnectedDrive(user_id=current_user.id, provider="google", drive_email=t['email'], access_token=t['access_token'], refresh_token=t['refresh_token'], token_expiry=t['expiry'], status="active")) | |
| db.commit() | |
| return {"message": "Connected"} | |
| def list_drives(current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| return [{"id": d.id, "provider": d.provider, "drive_email": d.drive_email, "status": d.status} for d in db.query(models.ConnectedDrive).filter(models.ConnectedDrive.user_id == current_user.id).all()] | |
| def get_files(drive_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db), folder_id: str = "root"): | |
| d = db.query(models.ConnectedDrive).filter(models.ConnectedDrive.id == drive_id, models.ConnectedDrive.user_id == current_user.id).first() | |
| if not d: raise HTTPException(status_code=404) | |
| if d.status == 'expired': raise HTTPException(status_code=401) | |
| try: | |
| if d.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(d.access_token) | |
| return dropbox_utils.list_files_in_folder(dbx, folder_id) | |
| else: # Google | |
| s = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| return google_utils.list_files_in_folder(s, folder_id) | |
| except Exception as e: | |
| if "invalid_grant" in str(e) or "expired_access_token" in str(e): | |
| d.status = "expired"; db.commit(); raise HTTPException(status_code=401) | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| def del_drive(drive_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """Removes a connected drive from the user's account.""" | |
| d = db.query(models.ConnectedDrive).filter( | |
| models.ConnectedDrive.id == drive_id, | |
| models.ConnectedDrive.user_id == current_user.id | |
| ).first() | |
| if not d: | |
| raise HTTPException(status_code=404, detail="Drive not found") | |
| db.delete(d) | |
| db.commit() | |
| return {"message": "Drive successfully disconnected"} | |
| def add_folder(drive_id: int, fd: schemas.FolderSelectRequest, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| if not db.query(models.ConnectedDrive).filter(models.ConnectedDrive.id == drive_id, models.ConnectedDrive.user_id == current_user.id).first(): raise HTTPException(status_code=404) | |
| if db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == drive_id, models.IndexedFolder.folder_id == fd.folder_id).first(): raise HTTPException(status_code=400) | |
| db.add(models.IndexedFolder(drive_id=drive_id, folder_id=fd.folder_id, folder_name=fd.folder_name, last_synced=datetime.utcnow())); db.commit(); return {"message": "Added"} | |
| def get_folders(drive_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| return db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == drive_id).all() | |
| def del_folder(folder_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_id).first() | |
| if f and f.drive.user_id == current_user.id: | |
| if getattr(f, 'is_sorting', False): raise HTTPException(status_code=400, detail="Cannot delete a folder while AI is sorting.") | |
| db.delete(f); db.commit() | |
| return {"message": "Deleted"} | |
| def get_dropbox_url(current_user: Annotated[models.User, Depends(get_unrestricted_user)]): | |
| return {"url": dropbox_utils.get_dropbox_auth_url()} | |
| def dropbox_cb(code_data: dict, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| try: | |
| token_data = dropbox_utils.exchange_code_for_token(code_data.get("code")) | |
| access_token = token_data.get("access_token") | |
| refresh_token = token_data.get("refresh_token", "") | |
| dbx = dropbox_utils.get_dropbox_client(access_token) | |
| drive_email = dropbox_utils.get_user_email(dbx) | |
| e = db.query(models.ConnectedDrive).filter(models.ConnectedDrive.user_id == current_user.id, models.ConnectedDrive.drive_email == drive_email, models.ConnectedDrive.provider == "dropbox").first() | |
| if e: | |
| e.access_token = access_token | |
| if refresh_token: e.refresh_token = refresh_token | |
| e.status = "active" | |
| else: | |
| db.add(models.ConnectedDrive(user_id=current_user.id, provider="dropbox", drive_email=drive_email, access_token=access_token, refresh_token=refresh_token, status="active")) | |
| db.commit() | |
| return {"message": "Dropbox Connected Successfully!"} | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Dropbox Auth Failed: {str(e)}") | |
| # ======================= | |
| # METADATA & SORTING ENDPOINTS | |
| # ======================= | |
| def get_duplicates(current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """Advanced Duplicate Engine with Exact, Fuzzy, and Version Chain Tracking.""" | |
| all_files = db.query(models.FileMetadata).join(models.IndexedFolder).join(models.ConnectedDrive).filter( | |
| models.ConnectedDrive.user_id == current_user.id | |
| ).all() | |
| exact_groups, size_groups, name_groups, version_groups = {}, {}, {}, {} | |
| ignored_files = [] | |
| for f in all_files: | |
| if f.is_ignored_duplicate: | |
| ignored_files.append({ | |
| "id": f.id, "name": f.name, "cloud": f.cloud_provider, | |
| "category": f.category, "web_view_link": f.web_view_link | |
| }) | |
| continue | |
| if f.file_hash: exact_groups.setdefault(f.file_hash, []).append(f) | |
| if f.size and f.size > 0: size_groups.setdefault(f.size, []).append(f) | |
| if f.name: | |
| base_name = os.path.splitext(f.name)[0].lower().strip() | |
| # 1. Standard Fuzzy Name Cleaning ("- copy") | |
| clean_name = re.sub(r'(\s-\scopy\d*|\(\d+\)|\scopy\d*)$', '', base_name).strip() | |
| name_groups.setdefault(clean_name, []).append(f) | |
| # 2. NEW: Deep Version Chain Cleaning | |
| # Strips _v1, -v2.5, final, draft, revised, etc. from the end of the name | |
| version_clean = re.sub(r'[-_\s]*(v\d+(\.\d+)?|final|draft|revised|rev|edit\d*|copy\d*|\(\d+\))[-_\s]*$', '', base_name, flags=re.IGNORECASE).strip() | |
| version_groups.setdefault(version_clean, []).append(f) | |
| # Tab 1: Exact Hash Matches | |
| exact_matches = [] | |
| exact_handled_ids = set() | |
| for h, files in exact_groups.items(): | |
| if len(files) > 1: | |
| exact_matches.append({ | |
| "hash": h, | |
| "files": [{"id": f.id, "name": f.name, "cloud": f.cloud_provider, "category": f.category, "web_view_link": f.web_view_link} for f in files] | |
| }) | |
| exact_handled_ids.update([f.id for f in files]) | |
| # Tab 2: Fuzzy Matches (Name OR Size) | |
| fuzzy_matches = [] | |
| fuzzy_handled_ids = set() | |
| for clean_name, files in name_groups.items(): | |
| group_files = [f for f in files if f.id not in exact_handled_ids] | |
| if len(group_files) > 1: | |
| fuzzy_matches.append({ | |
| "reason": f"Similar File Name: '{clean_name}'", | |
| "size_bytes": None, | |
| "files": [{"id": f.id, "name": f.name, "cloud": f.cloud_provider, "category": f.category, "web_view_link": f.web_view_link} for f in group_files] | |
| }) | |
| fuzzy_handled_ids.update([f.id for f in group_files]) | |
| for size, files in size_groups.items(): | |
| group_files = [f for f in files if f.id not in exact_handled_ids and f.id not in fuzzy_handled_ids] | |
| if len(group_files) > 1: | |
| fuzzy_matches.append({ | |
| "reason": f"Same Byte Size", | |
| "size_bytes": size, | |
| "files": [{"id": f.id, "name": f.name, "cloud": f.cloud_provider, "category": f.category, "web_view_link": f.web_view_link} for f in group_files] | |
| }) | |
| fuzzy_handled_ids.update([f.id for f in group_files]) | |
| # Tab 4: NEW Version Chains | |
| version_matches = [] | |
| for v_clean, files in version_groups.items(): | |
| # Only group files that weren't already caught by exact or fuzzy rules | |
| group_files = [f for f in files if f.id not in exact_handled_ids and f.id not in fuzzy_handled_ids] | |
| if len(group_files) > 1: | |
| # Sort alphabetically (so v1 comes before v2, draft before final, etc.) | |
| group_files.sort(key=lambda x: x.name.lower()) | |
| version_matches.append({ | |
| "reason": f"Project Versions: '{v_clean}'", | |
| "files": [{"id": f.id, "name": f.name, "cloud": f.cloud_provider, "category": f.category, "web_view_link": f.web_view_link} for f in group_files] | |
| }) | |
| return { | |
| "tab_1_exact": exact_matches, | |
| "tab_2_fuzzy": fuzzy_matches, | |
| "tab_3_ignored": ignored_files, | |
| "tab_4_versions": version_matches | |
| } | |
| def unignore_duplicate(file_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """Restores an ignored file back to the duplicate scanner.""" | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == file_id).first() | |
| if not file or file.folder.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| file.is_ignored_duplicate = False | |
| db.commit() | |
| create_log(db, current_user.email, "DUPLICATE_RESTORED", f"Unignored file: {file.name}") | |
| return {"message": "File restored to cleanup scanner."} | |
| def preview_sort(drive_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """DRY RUN: Returns exactly what will happen without actually moving anything.""" | |
| d = db.query(models.ConnectedDrive).filter(models.ConnectedDrive.id == drive_id, models.ConnectedDrive.user_id == current_user.id).first() | |
| if not d: raise HTTPException(status_code=404) | |
| # FIX: We removed the `category != "Unsorted"` filter so it grabs raw files too! | |
| files_to_move = db.query(models.FileMetadata).join(models.IndexedFolder).filter( | |
| models.IndexedFolder.drive_id == drive_id, | |
| models.FileMetadata.is_sorted == False, | |
| models.FileMetadata.is_locked == False | |
| ).all() | |
| preview = [] | |
| custom_rules = db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| disabled_defs = current_user.disabled_defaults | |
| for f in files_to_move: | |
| # FIX: Dynamically predict the category if it hasn't been sorted yet | |
| cat_name = f.category | |
| if cat_name == "Unsorted": | |
| cat_name, _ = predict_category(f.name, custom_rules, disabled_defs) | |
| course_code = f.course_code or extract_course_code(f.name) or "General" | |
| new_name = f"{course_code}_{cat_name} - {f.name}" | |
| new_path = f"{course_code} / {cat_name}" | |
| preview.append({ | |
| "file_id": f.id, | |
| "old_name": f.name, | |
| "new_name": new_name, | |
| "old_path": f.original_folder_name or f.folder.folder_name, | |
| "new_path": new_path | |
| }) | |
| return {"files": preview, "total": len(preview)} | |
| def apply_sort_and_move(drive_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """Creates nested folders, MOVES files, RENAMES them, and saves the UNDO payload.""" | |
| d = db.query(models.ConnectedDrive).filter(models.ConnectedDrive.id == drive_id, models.ConnectedDrive.user_id == current_user.id).first() | |
| if not d: raise HTTPException(status_code=404) | |
| # FIX: Grabs all raw, unlocked files | |
| files_to_move = db.query(models.FileMetadata).join(models.IndexedFolder).filter( | |
| models.IndexedFolder.drive_id == drive_id, | |
| models.FileMetadata.is_sorted == False, | |
| models.FileMetadata.is_locked == False | |
| ).all() | |
| if not files_to_move: return {"message": "No new files to move. Make sure you run Sort first."} | |
| moved_count = 0 | |
| folder_cache = {} | |
| undo_data = [] | |
| try: | |
| if d.provider == "google": | |
| service = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| elif d.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(d.access_token) | |
| custom_rules = db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| disabled_defs = current_user.disabled_defaults | |
| for file in files_to_move: | |
| # FIX: Dynamically predict the category and course if missing | |
| if file.category == "Unsorted": | |
| cat, _ = predict_category(file.name, custom_rules, disabled_defs) | |
| file.category = cat | |
| file.course_code = file.course_code or extract_course_code(file.name) or "General" | |
| course_code = file.course_code | |
| cat_name = file.category | |
| new_file_name = f"{course_code}_{cat_name} - {file.name}" | |
| db_folder_name = f"{course_code} / {cat_name}" | |
| undo_data.append({ | |
| "file_db_id": file.id, | |
| "original_cloud_id": file.cloud_id, | |
| "original_name": file.name, | |
| "original_db_folder_id": file.folder_id, | |
| "original_cloud_folder_id": file.folder.folder_id | |
| }) | |
| # --- GOOGLE DRIVE --- | |
| if d.provider == "google": | |
| if course_code not in folder_cache: | |
| folder_cache[course_code] = cloud_utils.create_google_folder(service, course_code) | |
| course_folder_id = folder_cache[course_code] | |
| if db_folder_name not in folder_cache: | |
| cloud_subfolder_id = cloud_utils.create_google_folder(service, cat_name, parent_id=course_folder_id) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == drive_id, models.IndexedFolder.folder_id == cloud_subfolder_id).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=drive_id, folder_id=cloud_subfolder_id, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| folder_cache[db_folder_name] = { "cloud_id": cloud_subfolder_id, "db_id": db_folder.id } | |
| if cloud_utils.move_google_file(service, file.cloud_id, folder_cache[db_folder_name]["cloud_id"], new_name=new_file_name): | |
| file.name = new_file_name | |
| file.is_sorted = True | |
| file.folder_id = folder_cache[db_folder_name]["db_id"] | |
| moved_count += 1 | |
| # --- DROPBOX --- | |
| elif d.provider == "dropbox": | |
| full_cloud_path = f"/{course_code}/{cat_name}" | |
| if db_folder_name not in folder_cache: | |
| cloud_utils.create_dropbox_folder(dbx, course_code) | |
| cloud_utils.create_dropbox_folder(dbx, full_cloud_path) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == drive_id, models.IndexedFolder.folder_name == db_folder_name).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=drive_id, folder_id=full_cloud_path, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| folder_cache[db_folder_name] = { "cloud_id": full_cloud_path, "db_id": db_folder.id } | |
| from_path = file.cloud_id | |
| if cloud_utils.move_dropbox_file(dbx, from_path, f"{course_code}/{cat_name}", file.name, new_name=new_file_name): | |
| file.name = new_file_name | |
| file.is_sorted = True | |
| file.folder_id = folder_cache[db_folder_name]["db_id"] | |
| file.cloud_id = f"{full_cloud_path}/{new_file_name}" | |
| moved_count += 1 | |
| db.commit() | |
| create_log(db, current_user.email, "APPLY_SORT", f"Moved and renamed {moved_count} files.", undo_payload=json.dumps(undo_data)) | |
| return {"message": f"Successfully structured folders, renamed, and moved {moved_count} files!"} | |
| except Exception as e: | |
| db.commit() | |
| raise HTTPException(status_code=500, detail=f"Error moving files: {str(e)}") | |
| def undo_sort_action(log_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """1-CLICK TIME TRAVEL: Reverses a batch move.""" | |
| log = db.query(models.UserLog).filter(models.UserLog.id == log_id, models.UserLog.user_email == current_user.email).first() | |
| if not log or not log.undo_payload: raise HTTPException(status_code=400, detail="Cannot undo this action.") | |
| try: | |
| undo_data = json.loads(log.undo_payload) | |
| restored_count = 0 | |
| for item in undo_data: | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == item["file_db_id"]).first() | |
| if not file: continue | |
| d = file.folder.drive | |
| success = False | |
| if d.provider == "google": | |
| service = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| success = cloud_utils.move_google_file(service, file.cloud_id, item["original_cloud_folder_id"], new_name=item["original_name"]) | |
| elif d.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(d.access_token) | |
| success = cloud_utils.move_dropbox_file(dbx, file.cloud_id, item["original_cloud_folder_id"], file.name, new_name=item["original_name"]) | |
| if success: file.cloud_id = item["original_cloud_id"] | |
| if success: | |
| file.name = item["original_name"] | |
| file.folder_id = item["original_db_folder_id"] | |
| file.is_sorted = False | |
| file.course_code = "General" | |
| file.category = "Unsorted" | |
| restored_count += 1 | |
| # Wipe the payload so it can't be undone twice | |
| log.undo_payload = None | |
| create_log(db, current_user.email, "UNDO_SORT", f"Reverted {restored_count} files back to their original state.") | |
| db.commit() | |
| return {"message": f"Successfully time-traveled {restored_count} files back to their original folders!"} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Undo failed: {str(e)}") | |
| def sync(folder_db_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_db_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| if getattr(f, 'is_sorting', False): raise HTTPException(status_code=400, detail="Sync disabled while AI is sorting.") | |
| try: | |
| if f.drive.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(f.drive.access_token) | |
| files = dropbox_utils.list_files_in_folder(dbx, f.folder_id) | |
| else: # Google | |
| s = google_utils.get_drive_service(f.drive.access_token, f.drive.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| files = google_utils.list_files_in_folder(s, f.folder_id) | |
| for g in files: | |
| existing = db.query(models.FileMetadata).filter(models.FileMetadata.folder_id == f.id, models.FileMetadata.cloud_id == g['id']).first() | |
| file_size = int(g.get('size', 0)) if g.get('size') else None | |
| file_hash = g.get('md5Checksum') if f.drive.provider == 'google' else g.get('content_hash') | |
| if existing: | |
| # NEW: Updates the name and size if you changed them in Google Drive/Dropbox! | |
| existing.name = g['name'] | |
| existing.size = file_size | |
| existing.file_hash = file_hash | |
| else: | |
| db.add(models.FileMetadata( | |
| cloud_id=g['id'], | |
| name=g['name'], | |
| mime_type=g['mimeType'], | |
| web_view_link=g.get('webViewLink', ''), | |
| icon_link=g.get('iconLink', ''), | |
| folder_id=f.id, | |
| size=file_size, | |
| file_hash=file_hash, | |
| cloud_provider=f.drive.provider, | |
| original_folder_name=f.folder_name | |
| )) | |
| f.last_synced = datetime.utcnow() | |
| db.commit() | |
| return {"message": "Synced"} | |
| except Exception as e: | |
| if "invalid_grant" in str(e) or "expired_access_token" in str(e): | |
| f.drive.status = "expired"; db.commit(); raise HTTPException(status_code=401) | |
| raise HTTPException(status_code=400, detail=str(e)) | |
| def all_files(current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db), search: str = "", search_by: str = "all"): | |
| o = db.query(models.FileMetadata).join(models.IndexedFolder).join(models.ConnectedDrive).filter(models.ConnectedDrive.user_id == current_user.id) | |
| s = db.query(models.FileMetadata).join(models.IndexedFolder).join(models.SharedAccess).filter(models.SharedAccess.user_id == current_user.id) | |
| if search: | |
| st = f"%{search}%" | |
| if search_by == "name": | |
| o = o.filter(models.FileMetadata.name.ilike(st)); s = s.filter(models.FileMetadata.name.ilike(st)) | |
| elif search_by == "category": | |
| o = o.filter(models.FileMetadata.category.ilike(st)); s = s.filter(models.FileMetadata.category.ilike(st)) | |
| elif search_by == "folder": | |
| o = o.filter(models.IndexedFolder.folder_name.ilike(st)); s = s.filter(models.IndexedFolder.folder_name.ilike(st)) | |
| elif search_by == "provider": | |
| o = o.filter(models.ConnectedDrive.provider.ilike(st)); s = s.filter(models.ConnectedDrive.provider.ilike(st)) | |
| else: | |
| o = o.filter(models.FileMetadata.name.ilike(st) | models.FileMetadata.category.ilike(st) | models.IndexedFolder.folder_name.ilike(st) | models.ConnectedDrive.provider.ilike(st)) | |
| s = s.filter(models.FileMetadata.name.ilike(st) | models.FileMetadata.category.ilike(st) | models.IndexedFolder.folder_name.ilike(st) | models.ConnectedDrive.provider.ilike(st)) | |
| res = [] | |
| for f in o.all() + s.all(): | |
| owner = f.folder.drive.owner.email if f.folder.drive and f.folder.drive.owner else "Unknown" | |
| res.append({ | |
| "id": f.id, | |
| "cloud_id": f.cloud_id, | |
| "name": f.name, | |
| "mime_type": f.mime_type, | |
| "web_view_link": f.web_view_link, | |
| "icon_link": f.icon_link, | |
| "folder_name": f.folder.folder_name, | |
| "original_folder_name": getattr(f, 'original_folder_name', None) or f.folder.folder_name, | |
| "owner": owner, | |
| "drive_email": f.folder.drive.drive_email if f.folder.drive else "Unknown", | |
| "provider": f.cloud_provider, | |
| "category": f.category, | |
| "course_code": getattr(f, 'course_code', 'General'), # <--- NEW: Sends the parent folder down to React | |
| "is_locked": f.is_locked, | |
| "is_sorted": f.is_sorted | |
| }) | |
| return res | |
| # --- ADMIN --- | |
| def admin_users(current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| return db.query(models.User).all() | |
| def admin_stats(current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| users = db.query(models.User).filter(models.User.role != "admin").all() | |
| user_stats = [] | |
| for u in users: | |
| fc = sum(len(f.files) for d in u.drives for f in d.folders) | |
| user_stats.append({"id": u.id, "email": u.email, "file_count": fc, "drive_count": len(u.drives), "is_frozen": u.is_frozen, "is_active": u.is_active}) | |
| sorted_files_count = db.query(models.FileMetadata).filter(models.FileMetadata.category != "Unsorted").count() | |
| return { | |
| "counts": { | |
| "users": len(users), | |
| "files": db.query(models.FileMetadata).count(), | |
| "sorted_files": sorted_files_count, | |
| "drives": db.query(models.ConnectedDrive).count(), | |
| "shares": db.query(models.SharedAccess).join(models.User).count(), | |
| "reports": db.query(models.UserReport).filter(models.UserReport.status == "open").count() | |
| }, | |
| "user_activity": sorted(user_stats, key=lambda x: x['file_count'], reverse=True) | |
| } | |
| def admin_reports(current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| reports = db.query(models.UserReport).filter(models.UserReport.status == "open").all() | |
| res = [] | |
| for r in reports: | |
| reporter = r.reporter.email if r.reporter else r.reporter_email_fallback | |
| target = r.reported_user.email if r.reported_user else "N/A" | |
| res.append({"id": r.id, "type": r.type, "reporter_email": reporter, "reported_user_email": target, "reason": r.reason, "description": r.description, "status": r.status, "admin_response": r.admin_response, "created_at": r.created_at}) | |
| return res | |
| def get_blacklist(current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| return db.query(models.BlacklistedEmail).all() | |
| async def ban_user(data: schemas.BanRequest, current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| if not db.query(models.BlacklistedEmail).filter(models.BlacklistedEmail.email == data.email).first(): | |
| db.add(models.BlacklistedEmail(email=data.email, reason=data.reason, banned_by=current_user.email)) | |
| u = db.query(models.User).filter(models.User.email == data.email).first() | |
| if u and u.role != 'admin': | |
| u.is_active = False | |
| u.is_frozen = False | |
| u.token_version += 1 | |
| await email_utils.send_alert_email(u.email, "Account Permanently Banned", f"Your account has been terminated for a violation of terms.<br><br><b>Reason:</b> {data.reason}") | |
| db.commit() | |
| return {"message": "Banned"} | |
| def unban_user(id: int, current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| entry = db.query(models.BlacklistedEmail).filter(models.BlacklistedEmail.id == id).first() | |
| if entry: | |
| u = db.query(models.User).filter(models.User.email == entry.email).first() | |
| if u: u.is_active = True | |
| db.delete(entry) | |
| db.commit() | |
| return {"message": "Unbanned"} | |
| async def freeze_user(user_id: int, freeze: bool, current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| u = db.query(models.User).filter(models.User.id == user_id).first() | |
| if u and u.role != 'admin': | |
| u.is_frozen = freeze | |
| if freeze: | |
| u.token_version += 1 | |
| await email_utils.send_alert_email(u.email, "Account Frozen", "Your account features have been suspended. Please log in to your dashboard to submit an appeal.") | |
| db.add(models.Notification(user_id=u.id, message="Your account has been frozen. Features are currently restricted.")) | |
| else: | |
| await email_utils.send_alert_email(u.email, "Account Restored", "Your account has been unfrozen. You have full access again.") | |
| db.add(models.Notification(user_id=u.id, message="Your account has been unfrozen. You have full access again.")) | |
| db.commit() | |
| return {"message": "Done"} | |
| def admin_details(user_id: int, current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| u = db.query(models.User).filter(models.User.id == user_id).first() | |
| if not u: raise HTTPException(status_code=404) | |
| drives_info = [] | |
| for d in u.drives: | |
| total_files = 0 | |
| sorted_files = 0 | |
| folders_data = [] | |
| for f in d.folders: | |
| files_list = [] | |
| for file in f.files: | |
| files_list.append({ | |
| "id": file.id, | |
| "name": file.name, | |
| "category": getattr(file, 'category', 'Unsorted'), | |
| "web_view_link": file.web_view_link | |
| }) | |
| total_files += len(f.files) | |
| sorted_files += sum(1 for file in f.files if getattr(file, 'category', 'Unsorted') != 'Unsorted') | |
| folders_data.append({ | |
| "id": f.id, | |
| "folder_name": f.folder_name, | |
| "files": files_list | |
| }) | |
| drives_info.append({ | |
| "id": d.id, | |
| "drive_email": d.drive_email, | |
| "provider": d.provider, | |
| "folder_count": len(d.folders), | |
| "total_files": total_files, | |
| "sorted_files": sorted_files, | |
| "folders": folders_data | |
| }) | |
| return {"user": u, "drives": drives_info} | |
| def admin_del_drive(drive_id: int, current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| d = db.query(models.ConnectedDrive).filter(models.ConnectedDrive.id == drive_id).first() | |
| if d: db.delete(d); db.commit() | |
| return {"message": "Deleted"} | |
| def admin_del_share(access_id: int, current_user: Annotated[models.User, Depends(get_admin_user)], db: Session = Depends(database.get_db)): | |
| a = db.query(models.SharedAccess).filter(models.SharedAccess.id == access_id).first() | |
| if a: db.delete(a); db.commit() | |
| return {"message": "Revoked"} | |
| # --- SORTING ROUTES --- | |
| class CategoryOverride(BaseModel): | |
| category: str | |
| class CourseOverride(BaseModel): | |
| course_code: str | |
| class LockToggle(BaseModel): | |
| is_locked: bool | |
| def toggle_file_lock(file_id: int, payload: LockToggle, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == file_id).first() | |
| if not file or file.folder.drive.user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized") | |
| file.is_locked = payload.is_locked | |
| db.commit() | |
| return {"message": f"File {'locked' if payload.is_locked else 'unlocked'}"} | |
| def update_file_category(file_id: int, payload: CategoryOverride, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """Updates category and auto-moves to a nested folder.""" | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == file_id).first() | |
| if not file or file.folder.drive.user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized") | |
| file.category = payload.category | |
| file.is_locked = True | |
| d = file.folder.drive | |
| # FIX: Use the existing database course code, NOT regex extraction! | |
| course_code = getattr(file, 'course_code', 'General') | |
| if not course_code: course_code = "General" | |
| cat_name = payload.category | |
| db_folder_name = f"{course_code} / {cat_name}" | |
| try: | |
| if d.provider == "google": | |
| service = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| course_folder_id = cloud_utils.create_google_folder(service, course_code) | |
| cloud_subfolder_id = cloud_utils.create_google_folder(service, cat_name, parent_id=course_folder_id) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_id == cloud_subfolder_id).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=cloud_subfolder_id, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| if cloud_utils.move_google_file(service, file.cloud_id, cloud_subfolder_id): | |
| file.folder_id = db_folder.id | |
| file.is_sorted = True | |
| create_log(db, current_user.email, "MANUAL_MOVE", f"Moved {file.name} to {db_folder_name}") | |
| elif d.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(d.access_token) | |
| full_cloud_path = f"/{course_code}/{cat_name}" | |
| cloud_utils.create_dropbox_folder(dbx, course_code) | |
| cloud_utils.create_dropbox_folder(dbx, full_cloud_path) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_name == db_folder_name).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=full_cloud_path, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| from_path = file.cloud_id | |
| if cloud_utils.move_dropbox_file(dbx, from_path, f"{course_code}/{cat_name}", file.name): | |
| file.folder_id = db_folder.id | |
| file.cloud_id = f"{full_cloud_path}/{file.name}" | |
| file.is_sorted = True | |
| create_log(db, current_user.email, "MANUAL_MOVE", f"Moved {file.name} to {db_folder_name}") | |
| except Exception as e: | |
| print(f"Auto-move failed during manual categorize: {e}") | |
| db.commit() | |
| return {"message": "Category updated and file moved successfully!"} | |
| def update_file_course(file_id: int, payload: CourseOverride, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """NEW: Updates the course code manually and instantly auto-moves it to the new parent folder.""" | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == file_id).first() | |
| if not file or file.folder.drive.user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized") | |
| new_course = payload.course_code.strip().upper() | |
| if not new_course: new_course = "General" | |
| file.course_code = new_course | |
| file.is_locked = True | |
| d = file.folder.drive | |
| cat_name = file.category | |
| db_folder_name = f"{new_course} / {cat_name}" | |
| try: | |
| # ONLY AUTO-MOVE IF IT HAS A CATEGORY | |
| if cat_name != "Unsorted": | |
| if d.provider == "google": | |
| service = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| course_folder_id = cloud_utils.create_google_folder(service, new_course) | |
| cloud_subfolder_id = cloud_utils.create_google_folder(service, cat_name, parent_id=course_folder_id) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_id == cloud_subfolder_id).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=cloud_subfolder_id, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| if cloud_utils.move_google_file(service, file.cloud_id, cloud_subfolder_id): | |
| file.folder_id = db_folder.id | |
| file.is_sorted = True | |
| create_log(db, current_user.email, "MANUAL_MOVE", f"Moved {file.name} to {db_folder_name}") | |
| elif d.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(d.access_token) | |
| full_cloud_path = f"/{new_course}/{cat_name}" | |
| cloud_utils.create_dropbox_folder(dbx, new_course) | |
| cloud_utils.create_dropbox_folder(dbx, full_cloud_path) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_name == db_folder_name).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=full_cloud_path, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| from_path = file.cloud_id | |
| if cloud_utils.move_dropbox_file(dbx, from_path, f"{new_course}/{cat_name}", file.name): | |
| file.folder_id = db_folder.id | |
| file.cloud_id = f"{full_cloud_path}/{file.name}" | |
| file.is_sorted = True | |
| create_log(db, current_user.email, "MANUAL_MOVE", f"Moved {file.name} to {db_folder_name}") | |
| except Exception as e: | |
| print(f"Auto-move failed during course update: {e}") | |
| db.commit() | |
| return {"message": "Course updated successfully!"} | |
| def smart_sort_folder(folder_db_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """PURE AI SORT: Predicts categories and extracts course codes via Regex.""" | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_db_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403, detail="Not authorized") | |
| if getattr(f, 'is_sorting', False): raise HTTPException(status_code=400, detail="Folder is already being sorted.") | |
| f.is_sorting = True | |
| db.commit() | |
| sorted_count = 0 | |
| skipped_count = 0 | |
| try: | |
| custom_rules = db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| disabled_defs = current_user.disabled_defaults | |
| for file in f.files: | |
| if getattr(file, 'is_locked', False) or file.category != "Unsorted": | |
| skipped_count += 1 | |
| continue | |
| category, confidence = predict_category(file.name, custom_rules, disabled_defs) | |
| file.category = category | |
| # Only use regex if the user hasn't manually overridden the course code | |
| if not file.course_code or file.course_code == 'General': | |
| file.course_code = extract_course_code(file.name) | |
| sorted_count += 1 | |
| create_log(db, current_user.email, "SORT_FOLDER", f"AI Sorted {sorted_count} files in '{f.folder_name}'") | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Sorting failed: {str(e)}") | |
| finally: | |
| f.is_sorting = False | |
| db.commit() | |
| return {"message": f"Successfully AI sorted {sorted_count} files.", "sorted_count": sorted_count} | |
| def assign_folder_course(folder_db_id: int, payload: CourseOverride, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| """NEW MANUAL OVERRIDE: Sets course code, predicts categories, physically MOVES files, and LOCKS them.""" | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_db_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| new_course = normalize_course_code(payload.course_code) or "General" | |
| updated_count = 0 | |
| folder_cache = {} | |
| d = f.drive | |
| try: | |
| if d.provider == "google": | |
| service = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| elif d.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(d.access_token) | |
| custom_rules = db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| disabled_defs = current_user.disabled_defaults | |
| for file in f.files: | |
| # Predict category if it hasn't been sorted yet | |
| if file.category == "Unsorted": | |
| category, conf = predict_category(file.name, custom_rules, disabled_defs) | |
| file.category = category | |
| file.course_code = new_course | |
| cat_name = file.category | |
| db_folder_name = f"{new_course} / {cat_name}" | |
| # --- GOOGLE DRIVE MOVE LOGIC --- | |
| if d.provider == "google": | |
| if new_course not in folder_cache: | |
| folder_cache[new_course] = cloud_utils.create_google_folder(service, new_course) | |
| course_folder_id = folder_cache[new_course] | |
| if db_folder_name not in folder_cache: | |
| cloud_subfolder_id = cloud_utils.create_google_folder(service, cat_name, parent_id=course_folder_id) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_id == cloud_subfolder_id).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=cloud_subfolder_id, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder) | |
| db.commit(); db.refresh(db_folder) | |
| folder_cache[db_folder_name] = { "cloud_id": cloud_subfolder_id, "db_id": db_folder.id } | |
| if cloud_utils.move_google_file(service, file.cloud_id, folder_cache[db_folder_name]["cloud_id"]): | |
| file.folder_id = folder_cache[db_folder_name]["db_id"] | |
| file.is_sorted = True | |
| file.is_locked = True # <--- Locks it to protect from 'Apply Sort' | |
| updated_count += 1 | |
| # --- DROPBOX MOVE LOGIC --- | |
| elif d.provider == "dropbox": | |
| full_cloud_path = f"/{new_course}/{cat_name}" | |
| if db_folder_name not in folder_cache: | |
| cloud_utils.create_dropbox_folder(dbx, new_course) | |
| cloud_utils.create_dropbox_folder(dbx, full_cloud_path) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_name == db_folder_name).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=full_cloud_path, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder) | |
| db.commit(); db.refresh(db_folder) | |
| folder_cache[db_folder_name] = { "cloud_id": full_cloud_path, "db_id": db_folder.id } | |
| from_path = file.cloud_id | |
| if cloud_utils.move_dropbox_file(dbx, from_path, f"{new_course}/{cat_name}", file.name): | |
| file.folder_id = folder_cache[db_folder_name]["db_id"] | |
| file.cloud_id = f"{full_cloud_path}/{file.name}" | |
| file.is_sorted = True | |
| file.is_locked = True # <--- Locks it to protect from 'Apply Sort' | |
| updated_count += 1 | |
| create_log(db, current_user.email, "ASSIGN_COURSE", f"Assigned {new_course}, moved, and locked {updated_count} files in '{f.folder_name}'") | |
| db.commit() | |
| return {"message": f"Successfully created folders, moved, and locked {updated_count} files!"} | |
| except Exception as e: | |
| db.commit() | |
| raise HTTPException(status_code=500, detail=f"Error moving files: {str(e)}") | |
| def smart_sort_drive(drive_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| d = db.query(models.ConnectedDrive).filter(models.ConnectedDrive.id == drive_id, models.ConnectedDrive.user_id == current_user.id).first() | |
| if not d: raise HTTPException(status_code=404, detail="Drive not found") | |
| folders = d.folders | |
| if any(getattr(f, 'is_sorting', False) for f in folders): raise HTTPException(status_code=400, detail="One or more folders are currently sorting. Please wait.") | |
| for f in folders: f.is_sorting = True | |
| db.commit() | |
| sorted_count = 0 | |
| try: | |
| custom_rules = db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| disabled_defs = current_user.disabled_defaults | |
| for f in folders: | |
| for file in f.files: | |
| if getattr(file, 'is_locked', False) or file.category != "Unsorted": | |
| continue | |
| category, confidence = predict_category(file.name, custom_rules, disabled_defs) | |
| file.category = category | |
| # ---> THE MISSING LINE: This physically saves it to the DB! <--- | |
| file.course_code = extract_course_code(file.name) | |
| sorted_count += 1 | |
| create_log(db, current_user.email, "SORT_DRIVE", f"Sorted {sorted_count} files across drive {d.drive_email}") | |
| finally: | |
| for f in folders: f.is_sorting = False | |
| db.commit() | |
| return {"message": f"Successfully sorted {sorted_count} files across all folders."} | |
| def ignore_duplicate(file_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == file_id).first() | |
| if not file: raise HTTPException(status_code=404) | |
| file.is_ignored_duplicate = True | |
| db.commit() | |
| create_log(db, current_user.email, "DUPLICATE_IGNORED", f"Ignored duplicate file: {file.name}") # <--- NEW LOG | |
| return {"message": "File ignored in cleanup."} | |
| def delete_duplicate(file_id: int, current_user: Annotated[models.User, Depends(get_unrestricted_user)], db: Session = Depends(database.get_db)): | |
| file = db.query(models.FileMetadata).filter(models.FileMetadata.id == file_id).first() | |
| if not file: raise HTTPException(status_code=404) | |
| success = False | |
| if file.cloud_provider == "google": | |
| service = google_utils.get_drive_service(file.folder.drive.access_token, file.folder.drive.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| success = cloud_utils.delete_google_file(service, file.cloud_id) | |
| elif file.cloud_provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(file.folder.drive.access_token) | |
| success = cloud_utils.delete_dropbox_file(dbx, f"/{file.name}") | |
| if success: | |
| file_name = file.name | |
| db.delete(file) | |
| db.commit() | |
| create_log(db, current_user.email, "DUPLICATE_DELETED", f"Permanently deleted cloud file: {file_name}") # <--- NEW LOG | |
| return {"message": "File permanently deleted."} | |
| raise HTTPException(status_code=500, detail="Failed to delete file from cloud provider.") | |
| def preview_folder_sort( | |
| folder_db_id: int, | |
| current_user: Annotated[models.User, Depends(get_unrestricted_user)], | |
| custom_course: Optional[str] = None, | |
| db: Session = Depends(database.get_db) | |
| ): | |
| """Generates a visual Dry Run for a SINGLE folder, with an optional course override.""" | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_db_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| norm_course = normalize_course_code(custom_course) if custom_course else None | |
| preview = [] | |
| custom_rules = db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| disabled_defs = current_user.disabled_defaults | |
| for file in f.files: | |
| if getattr(file, 'is_locked', False) or file.is_sorted: continue | |
| # Predict category if it hasn't been done yet | |
| cat_name = file.category | |
| if cat_name == "Unsorted": | |
| cat_name, _ = predict_category(file.name, custom_rules, disabled_defs) | |
| # Use the override if provided, otherwise fallback to AI regex extraction | |
| course_code = norm_course or extract_course_code(file.name) | |
| if not course_code: course_code = "General" | |
| new_name = f"{course_code}_{cat_name} - {file.name}" | |
| new_path = f"{course_code} / {cat_name}" | |
| preview.append({ | |
| "file_id": file.id, | |
| "old_name": file.name, | |
| "new_name": new_name, | |
| "old_path": file.original_folder_name or f.folder_name, | |
| "new_path": new_path | |
| }) | |
| return {"files": preview, "total": len(preview)} | |
| def apply_folder_sort( | |
| folder_db_id: int, | |
| current_user: Annotated[models.User, Depends(get_unrestricted_user)], | |
| custom_course: Optional[str] = None, | |
| db: Session = Depends(database.get_db) | |
| ): | |
| """Physically moves and renames files for a SINGLE folder, saving the Undo state.""" | |
| f = db.query(models.IndexedFolder).filter(models.IndexedFolder.id == folder_db_id).first() | |
| if not f or f.drive.user_id != current_user.id: raise HTTPException(status_code=403) | |
| norm_course = normalize_course_code(custom_course) if custom_course else None | |
| d = f.drive | |
| moved_count = 0 | |
| folder_cache = {} | |
| undo_data = [] | |
| try: | |
| if d.provider == "google": | |
| service = google_utils.get_drive_service(d.access_token, d.refresh_token, "https://oauth2.googleapis.com/token", google_utils.get_client_id_from_file(), google_utils.get_client_secret_from_file()) | |
| elif d.provider == "dropbox": | |
| dbx = dropbox_utils.get_dropbox_client(d.access_token) | |
| custom_rules = db.query(models.CustomRule).filter(models.CustomRule.user_id == current_user.id).all() | |
| disabled_defs = current_user.disabled_defaults | |
| for file in f.files: | |
| if getattr(file, 'is_locked', False) or file.is_sorted: continue | |
| if file.category == "Unsorted": | |
| cat, _ = predict_category(file.name, custom_rules, disabled_defs) | |
| file.category = cat | |
| file.course_code = norm_course or extract_course_code(file.name) or "General" | |
| course_code = file.course_code | |
| cat_name = file.category | |
| new_file_name = f"{course_code}_{cat_name} - {file.name}" | |
| db_folder_name = f"{course_code} / {cat_name}" | |
| undo_data.append({ | |
| "file_db_id": file.id, "original_cloud_id": file.cloud_id, "original_name": file.name, | |
| "original_db_folder_id": file.folder_id, "original_cloud_folder_id": file.folder.folder_id | |
| }) | |
| # --- GOOGLE LOGIC --- | |
| if d.provider == "google": | |
| if course_code not in folder_cache: | |
| folder_cache[course_code] = cloud_utils.create_google_folder(service, course_code) | |
| course_folder_id = folder_cache[course_code] | |
| if db_folder_name not in folder_cache: | |
| cloud_subfolder_id = cloud_utils.create_google_folder(service, cat_name, parent_id=course_folder_id) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_id == cloud_subfolder_id).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=cloud_subfolder_id, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| folder_cache[db_folder_name] = { "cloud_id": cloud_subfolder_id, "db_id": db_folder.id } | |
| if cloud_utils.move_google_file(service, file.cloud_id, folder_cache[db_folder_name]["cloud_id"], new_name=new_file_name): | |
| file.name = new_file_name; file.is_sorted = True; file.is_locked = True | |
| file.folder_id = folder_cache[db_folder_name]["db_id"] | |
| moved_count += 1 | |
| # --- DROPBOX LOGIC --- | |
| elif d.provider == "dropbox": | |
| full_cloud_path = f"/{course_code}/{cat_name}" | |
| if db_folder_name not in folder_cache: | |
| cloud_utils.create_dropbox_folder(dbx, course_code) | |
| cloud_utils.create_dropbox_folder(dbx, full_cloud_path) | |
| db_folder = db.query(models.IndexedFolder).filter(models.IndexedFolder.drive_id == d.id, models.IndexedFolder.folder_name == db_folder_name).first() | |
| if not db_folder: | |
| db_folder = models.IndexedFolder(drive_id=d.id, folder_id=full_cloud_path, folder_name=db_folder_name, last_synced=datetime.utcnow()) | |
| db.add(db_folder); db.commit(); db.refresh(db_folder) | |
| folder_cache[db_folder_name] = { "cloud_id": full_cloud_path, "db_id": db_folder.id } | |
| from_path = file.cloud_id | |
| if cloud_utils.move_dropbox_file(dbx, from_path, f"{course_code}/{cat_name}", file.name, new_name=new_file_name): | |
| file.name = new_file_name; file.is_sorted = True; file.is_locked = True | |
| file.folder_id = folder_cache[db_folder_name]["db_id"]; file.cloud_id = f"{full_cloud_path}/{new_file_name}" | |
| moved_count += 1 | |
| db.commit() | |
| create_log(db, current_user.email, "APPLY_SORT", f"Moved and renamed {moved_count} files in '{f.folder_name}'.", undo_payload=json.dumps(undo_data)) | |
| return {"message": f"Successfully previewed, renamed, and moved {moved_count} files!"} | |
| except Exception as e: | |
| db.commit() | |
| raise HTTPException(status_code=500, detail=f"Error moving files: {str(e)}") |