# Source: Multimodel_Rag/scripts/test_day3.py
# Author: Dhrumil Parikh
# Commit: cdc55f4 ("deploy GeminiRAG"), 5.35 kB
"""
Day 3 verification script — tests processors directly (bypasses Celery).
Run: python scripts/test_day3.py
"""
import json
import os
import sys
import uuid
from datetime import datetime
from pathlib import Path
# Load .env before any app imports
from dotenv import load_dotenv
load_dotenv(Path(__file__).parent.parent / ".env")
sys.path.insert(0, str(Path(__file__).parent.parent))
from sqlmodel import Session, create_engine
from app.models.db import Job, JobStatus, User, UserRole, UsageLog
from app.observability.logging import configure_logging
configure_logging()
# The connection string is read from the process environment (the .env file
# is loaded near the top of this script). A missing DATABASE_URL raises
# KeyError here, at import time.
DATABASE_URL = os.environ["DATABASE_URL"]
# Single module-level engine used by every Session this script opens;
# echo=False is passed so the engine's statement echo stays off.
engine = create_engine(DATABASE_URL, echo=False)
import sys as _sys

# Optional CLI filter, e.g. ``python test_day3.py pdf csv``.
# No extra arguments means "run every known file type" (filter is None).
_filter = set(_sys.argv[1:]) or None

# Every sample document this script knows about, keyed by file type.
_ALL_FILES = dict(
    pdf="C:/tmp/geminirag_test_files/test.pdf",
    docx="C:/tmp/geminirag_test_files/test.docx",
    csv="C:/tmp/geminirag_test_files/titanic.csv",
)

# The subset actually selected for this run.
TEST_FILES = {
    kind: location
    for kind, location in _ALL_FILES.items()
    if _filter is None or kind in _filter
}
def get_or_create_test_user(db: Session) -> User:
    """Return the dedicated Day 3 test user, inserting it on first use.

    The user is looked up by the fixed e-mail ``day3test@test.com``. When no
    matching row exists, a new active user with ``UserRole.user`` is added,
    committed and refreshed before being returned.

    Args:
        db: Open session used for the lookup and, if needed, the insert.

    Returns:
        The existing or newly created ``User`` row.
    """
    from sqlmodel import select

    lookup = select(User).where(User.email == "day3test@test.com")
    existing = db.exec(lookup).first()
    if existing:
        return existing

    # Imported only on the creation path, as the hash is not needed otherwise.
    from app.security import hash_password

    created = User(
        email="day3test@test.com",
        hashed_password=hash_password("test123"),
        role=UserRole.user,
        is_active=True,
    )
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
def make_job(db: Session, user_id, file_type: str, file_path: str) -> Job:
    """Insert a pending ``Job`` row describing one test file and return it.

    Args:
        db: Open session used to add, commit and refresh the job.
        user_id: Identifier of the user that owns the job.
        file_type: Type label stored on the job (e.g. ``"pdf"``).
        file_path: Path of the file on disk; its name and size are recorded.

    Returns:
        The committed and refreshed ``Job``.

    Raises:
        OSError: If ``file_path`` cannot be stat'ed (e.g. it does not exist).
    """
    from datetime import timezone

    source = Path(file_path)
    # One clock reading for both columns so a new job never looks "updated
    # after creation". The value is naive UTC, which is what the deprecated
    # datetime.utcnow() used to return.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    job = Job(
        id=uuid.uuid4(),
        user_id=user_id,
        filename=source.name,
        file_type=file_type,
        file_path=file_path,
        file_size_bytes=source.stat().st_size,
        status=JobStatus.pending,
        created_at=now,
        updated_at=now,
    )
    db.add(job)
    db.commit()
    db.refresh(job)
    return job
def _make_processor(job: Job, file_type: str):
    """Build the processor matching ``file_type`` for ``job``.

    Processor modules are imported lazily so only the one needed is loaded.
    Raises ``ValueError`` for a file type with no processor.
    """
    from app.config import settings

    if file_type == "pdf":
        from app.processors.pdf import PDFProcessor
        return PDFProcessor(job=job, settings=settings)
    if file_type == "docx":
        from app.processors.docx_proc import DOCXProcessor
        return DOCXProcessor(job=job, settings=settings)
    if file_type in ("xlsx", "csv"):
        from app.processors.xlsx_proc import XLSXProcessor
        return XLSXProcessor(job=job, settings=settings)
    raise ValueError(f"Unknown type: {file_type}")


def run_test(file_type: str, file_path: str) -> dict:
    """Run one processor directly against a file and report what it produced.

    Creates a job for the test user, runs the matching processor with
    ``processor.run(db)``, prints the extracted text length, a summary
    preview and the usage-log rows recorded for the job, then checks that
    the job's ``result`` column was filled with JSON.

    Args:
        file_type: Processor selector (``"pdf"``, ``"docx"``, ``"xlsx"`` or
            ``"csv"``).
        file_path: Path of the file to process.

    Returns:
        A dict with ``file_type``, ``text_len``, ``summary_keys``,
        ``usage_logs`` (row count) and ``tokens`` (prompt + completion total).

    Raises:
        ValueError: If ``file_type`` has no processor.
        AssertionError: If the job has no saved result after processing.
    """
    from sqlmodel import select

    banner = "=" * 60
    print(f"\n{banner}")
    print(f"Testing {file_type.upper()} processor: {file_path}")
    print(banner)

    with Session(engine) as db:
        owner = get_or_create_test_user(db)
        job = make_job(db, owner.id, file_type, file_path)
        print(f"Job ID: {job.id}")

        processor = _make_processor(job, file_type)

        print("Running processor.run(db) ...")
        text, summary = processor.run(db)
        summary_keys = list(summary.keys())

        print(f"\nExtracted text length: {len(text)} chars")
        print(f"Summary keys: {summary_keys}")
        print("\nSummary preview:")
        preview = json.dumps(summary, indent=2)
        print(preview[:800])

        # Verify usage_logs
        usage_query = select(UsageLog).where(UsageLog.job_id == job.id)
        logs = db.exec(usage_query).all()
        print(f"\nUsage logs for this job: {len(logs)}")
        for entry in logs:
            line = (
                f" endpoint={entry.endpoint} model={entry.model} "
                f"prompt_tokens={entry.prompt_tokens} completion_tokens={entry.completion_tokens} "
                f"latency_ms={entry.latency_ms}ms"
            )
            print(line)

        # Verify result saved to job
        db.refresh(job)
        assert job.result is not None, "job.result should be set"
        saved = json.loads(job.result)
        print(f"\nJob result saved to DB: {list(saved.keys())}")

        total_tokens = sum(
            entry.prompt_tokens + entry.completion_tokens for entry in logs
        )
        return {
            "file_type": file_type,
            "text_len": len(text),
            "summary_keys": summary_keys,
            "usage_logs": len(logs),
            "tokens": total_tokens,
        }
def main():
    """Run every selected test file through ``run_test`` and print a summary.

    Files listed in ``TEST_FILES`` that do not exist on disk are not run;
    they are reported as ``[SKIP]`` in the summary instead of being dropped
    silently. A pause is inserted between runs to respect rate limits.
    Each run that raises is reported as ``[FAIL]`` and the process exits
    with status 1 if any run failed. Skipped files do not change the exit
    status.
    """
    import time
    import traceback

    pause_seconds = 15
    results = []
    errors = []

    # Separate runnable files from missing ones so the latter stay visible.
    runnable = []
    skipped = []
    for file_type, path in TEST_FILES.items():
        if Path(path).exists():
            runnable.append((file_type, path))
        else:
            skipped.append((file_type, path))

    for index, (file_type, path) in enumerate(runnable):
        if index > 0:
            print(f"\nWaiting {pause_seconds}s between calls to respect rate limits...")
            time.sleep(pause_seconds)
        try:
            results.append(run_test(file_type, path))
        except Exception as exc:
            # Top-level boundary: record the failure and keep testing the
            # remaining file types.
            print(f"\nERROR testing {file_type}: {exc}")
            traceback.print_exc()
            errors.append((file_type, str(exc)))

    print(f"\n\n{'=' * 60}")
    print("DAY 3 VERIFICATION SUMMARY")
    print("=" * 60)
    for r in results:
        # WARN means the processor ran but no token usage was recorded.
        status = "PASS" if r["usage_logs"] > 0 and r["tokens"] > 0 else "WARN"
        print(f"[{status}] {r['file_type'].upper()}: text={r['text_len']}chars "
              f"summary_keys={r['summary_keys']} usage_logs={r['usage_logs']} tokens={r['tokens']}")
    for file_type, path in skipped:
        print(f"[SKIP] {file_type.upper()}: file not found: {path}")
    for file_type, err in errors:
        print(f"[FAIL] {file_type.upper()}: {err}")
    if not results and not errors:
        print("No test files were run; nothing was verified.")

    if errors:
        sys.exit(1)
# Run the verification only when executed as a script
# (``python scripts/test_day3.py``), not when imported.
if __name__ == "__main__":
    main()