Spaces:
Sleeping
Sleeping
| """ | |
Day 3 verification script — tests processors directly (bypasses Celery).

Run all file types:      python scripts/test_day3.py
Run selected types only: python scripts/test_day3.py pdf csv
| """ | |
| import json | |
| import os | |
| import sys | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
# The .env file has to be loaded before any `app` module is imported, and the
# project root (two levels above this file) is put first on sys.path ahead of
# the `app` imports below. The statement order here is therefore significant.
from dotenv import load_dotenv

_PROJECT_ROOT = Path(__file__).parent.parent
load_dotenv(_PROJECT_ROOT / ".env")
sys.path.insert(0, str(_PROJECT_ROOT))

from sqlmodel import Session, create_engine

from app.models.db import Job, JobStatus, User, UserRole, UsageLog
from app.observability.logging import configure_logging

configure_logging()

# DATABASE_URL is mandatory: a missing variable raises KeyError right here.
DATABASE_URL = os.environ["DATABASE_URL"]
engine = create_engine(DATABASE_URL, echo=False)
import sys as _sys

# Optional CLI filter: `python scripts/test_day3.py pdf csv` restricts the run
# to the named file types. With no arguments every known type is selected.
_filter = set(_sys.argv[1:]) or None

# Every file type this script knows how to exercise, mapped to its sample file.
_ALL_FILES = dict(
    pdf="C:/tmp/geminirag_test_files/test.pdf",
    docx="C:/tmp/geminirag_test_files/test.docx",
    csv="C:/tmp/geminirag_test_files/titanic.csv",
)

# The subset of _ALL_FILES selected for this run (insertion order preserved).
if _filter is None:
    TEST_FILES = dict(_ALL_FILES)
else:
    TEST_FILES = {
        kind: location for kind, location in _ALL_FILES.items() if kind in _filter
    }
def get_or_create_test_user(db: Session) -> User:
    """Return the dedicated test user, inserting it on first use.

    Looks the user up by the fixed e-mail address "day3test@test.com". When no
    such row exists, a new active user with role ``UserRole.user`` is added,
    committed and refreshed before being returned.

    Args:
        db: Open session used for the lookup and, if needed, the insert.

    Returns:
        The existing or newly created ``User``.
    """
    from sqlmodel import select

    lookup = select(User).where(User.email == "day3test@test.com")
    existing = db.exec(lookup).first()
    if existing:
        return existing

    # Imported only on the creation path, as in the lookup-hit case it is unused.
    from app.security import hash_password

    created = User(
        email="day3test@test.com",
        hashed_password=hash_password("test123"),
        role=UserRole.user,
        is_active=True,
    )
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
def make_job(db: Session, user_id, file_type: str, file_path: str) -> Job:
    """Insert a pending ``Job`` row describing *file_path* and return it.

    Args:
        db: Open session; the job is added, committed and refreshed on it.
        user_id: Identifier of the owning user, stored as ``Job.user_id``.
        file_type: File type label stored as ``Job.file_type``.
        file_path: Path of the file to process. It must exist, because its
            size is read from the filesystem.

    Returns:
        The persisted ``Job`` with status ``JobStatus.pending``.

    Raises:
        OSError: If *file_path* cannot be stat'ed (e.g. it does not exist).
    """
    source = Path(file_path)
    # Read the clock once so a freshly created row has created_at == updated_at
    # instead of two timestamps a few microseconds apart.
    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is kept
    # because the timezone handling of these columns is not visible here.
    now = datetime.utcnow()
    job = Job(
        id=uuid.uuid4(),
        user_id=user_id,
        filename=source.name,
        file_type=file_type,
        file_path=file_path,
        file_size_bytes=source.stat().st_size,
        status=JobStatus.pending,
        created_at=now,
        updated_at=now,
    )
    db.add(job)
    db.commit()
    db.refresh(job)
    return job
def _resolve_processor_class(file_type: str):
    """Return the processor class used for *file_type*.

    Imports stay lazy so only the module for the requested type is loaded.

    Raises:
        ValueError: If *file_type* is not "pdf", "docx", "xlsx" or "csv".
    """
    if file_type == "pdf":
        from app.processors.pdf import PDFProcessor

        return PDFProcessor
    if file_type == "docx":
        from app.processors.docx_proc import DOCXProcessor

        return DOCXProcessor
    if file_type in ("xlsx", "csv"):
        from app.processors.xlsx_proc import XLSXProcessor

        return XLSXProcessor
    raise ValueError(f"Unknown type: {file_type}")


def run_test(file_type: str, file_path: str) -> dict:
    """Run one processor against a file and report what it produced.

    Creates (or reuses) the test user, inserts a pending job for *file_path*,
    calls the matching processor's ``run(db)`` directly, then reads back the
    ``UsageLog`` rows for the job and the result stored on the job row.

    Args:
        file_type: One of "pdf", "docx", "xlsx" or "csv".
        file_path: Path of the file handed to the processor.

    Returns:
        A dict with the keys ``file_type``, ``text_len``, ``summary_keys``,
        ``usage_logs`` (row count) and ``tokens`` (prompt + completion total).

    Raises:
        ValueError: If *file_type* is unknown. This is checked before any
            database work so no orphan pending job is left behind.
        AssertionError: If the processor did not store a result on the job.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Testing {file_type.upper()} processor: {file_path}")
    print(banner)

    # Reject unsupported types up front, before a Job row is committed.
    processor_cls = _resolve_processor_class(file_type)

    from sqlmodel import select

    from app.config import settings

    with Session(engine) as db:
        user = get_or_create_test_user(db)
        job = make_job(db, user.id, file_type, file_path)
        print(f"Job ID: {job.id}")

        processor = processor_cls(job=job, settings=settings)
        print("Running processor.run(db) ...")
        text, summary = processor.run(db)

        print(f"\nExtracted text length: {len(text)} chars")
        print(f"Summary keys: {list(summary.keys())}")
        print("\nSummary preview:")
        print(json.dumps(summary, indent=2)[:800])

        # Verify usage_logs
        logs = db.exec(select(UsageLog).where(UsageLog.job_id == job.id)).all()
        print(f"\nUsage logs for this job: {len(logs)}")
        for entry in logs:
            print(f" endpoint={entry.endpoint} model={entry.model} "
                  f"prompt_tokens={entry.prompt_tokens} completion_tokens={entry.completion_tokens} "
                  f"latency_ms={entry.latency_ms}ms")

        # Verify result saved to job. An explicit raise is used instead of
        # `assert` so the check still runs when Python is started with -O.
        db.refresh(job)
        if job.result is None:
            raise AssertionError("job.result should be set")
        saved = json.loads(job.result)
        print(f"\nJob result saved to DB: {list(saved.keys())}")

        # NOTE(review): a token count of None is treated as 0 so a missing value
        # yields a WARN in the summary rather than a TypeError here — confirm
        # whether these columns can actually be null.
        total_tokens = sum(
            (entry.prompt_tokens or 0) + (entry.completion_tokens or 0)
            for entry in logs
        )
        return {
            "file_type": file_type,
            "text_len": len(text),
            "summary_keys": list(summary.keys()),
            "usage_logs": len(logs),
            "tokens": total_tokens,
        }
def main(delay_seconds: float = 15) -> None:
    """Run every selected processor test and print a summary.

    Files listed in ``TEST_FILES`` that do not exist on disk are not run; they
    are reported as ``[SKIP]`` in the summary instead of being dropped
    silently. Each remaining file is passed to ``run_test``; a failure is
    printed with its traceback and recorded, and the run continues.

    Args:
        delay_seconds: Pause between consecutive processor runs, to respect
            rate limits. Defaults to 15.

    Raises:
        SystemExit: With code 1 if any test raised, or if no test completed at
            all (so a run that verified nothing is not reported as success).
    """
    import time
    import traceback

    results = []
    errors = []
    runnable = []
    skipped = []
    for file_type, path in TEST_FILES.items():
        target = runnable if Path(path).exists() else skipped
        target.append((file_type, path))

    for index, (file_type, path) in enumerate(runnable):
        if index > 0:
            print(f"\nWaiting {delay_seconds}s between calls to respect rate limits...")
            time.sleep(delay_seconds)
        try:
            results.append(run_test(file_type, path))
        except Exception as exc:  # top-level boundary: record and keep going
            print(f"\nERROR testing {file_type}: {exc}")
            traceback.print_exc()
            errors.append((file_type, str(exc)))

    banner = "=" * 60
    print(f"\n\n{banner}")
    print("DAY 3 VERIFICATION SUMMARY")
    print(banner)
    for r in results:
        # A run without usage logs or tokens finished but looks incomplete.
        status = "PASS" if r["usage_logs"] > 0 and r["tokens"] > 0 else "WARN"
        print(f"[{status}] {r['file_type'].upper()}: text={r['text_len']}chars "
              f"summary_keys={r['summary_keys']} usage_logs={r['usage_logs']} tokens={r['tokens']}")
    for ft, err in errors:
        print(f"[FAIL] {ft.upper()}: {err}")
    for ft, path in skipped:
        print(f"[SKIP] {ft.upper()}: file not found: {path}")

    if not results and not errors:
        print("[FAIL] No test files were processed; nothing was verified.")
    if errors or not results:
        sys.exit(1)


if __name__ == "__main__":
    main()