docuscan-ai / models.py
pramodmisra's picture
Add email forwarding — SendGrid Inbound Parse with multi-tenant routing
db992a6
"""
DocuScan AI — SQLAlchemy Models
Multi-org with roles: super_admin, admin, manager, user.
Persistent storage: documents are soft-deleted via `is_deleted`; organizations and users are deactivated via `is_active`.
"""
import json
from datetime import datetime
from sqlalchemy import Column, Integer, String, Float, Boolean, Text, DateTime, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
# Declarative base class; every ORM model defined in this module subclasses it.
Base = declarative_base()
class Organization(Base):
    """A tenant organization; users are attached to it through ``User.org_id``."""

    __tablename__ = "organizations"

    id = Column(Integer, autoincrement=True, primary_key=True)
    name = Column(String(255), nullable=False)
    # Unique, indexed short identifier for the organization.
    slug = Column(String(100), index=True, nullable=False, unique=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    users = relationship("User", back_populates="organization")

    def to_dict(self):
        """Serialize for API output; ``created_at`` is ISO-8601, or ``""`` when unset."""
        created = self.created_at
        return dict(
            id=self.id,
            name=self.name,
            slug=self.slug,
            is_active=self.is_active,
            created_at=created.isoformat() if created else "",
        )
class ManagerAssignment(Base):
    """Association row linking a manager (``manager_id``) to a user they oversee (``user_id``).

    Both columns reference ``users.id``; ``User.managed_users`` reads through this table.
    """

    __tablename__ = "manager_assignments"

    id = Column(Integer, autoincrement=True, primary_key=True)
    manager_id = Column(Integer, ForeignKey("users.id"), index=True, nullable=False)
    user_id = Column(Integer, ForeignKey("users.id"), index=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
class User(Base):
    """An account with a role, optionally belonging to one organization."""

    __tablename__ = "users"

    id = Column(Integer, autoincrement=True, primary_key=True)
    email = Column(String(255), index=True, nullable=False, unique=True)
    hashed_password = Column(String(255), nullable=False)
    full_name = Column(String(255), nullable=False)
    # One of: super_admin / admin / manager / user
    role = Column(String(50), default="user", nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), index=True, nullable=True)
    is_active = Column(Boolean, default=True)
    # Password-reset token and its expiry timestamp (both nullable).
    reset_token = Column(String(255), nullable=True)
    reset_token_exp = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    organization = relationship("Organization", back_populates="users")

    # Read-only list of the users this manager oversees, resolved through
    # the manager_assignments association table.
    managed_users = relationship(
        "User",
        secondary="manager_assignments",
        primaryjoin="User.id == ManagerAssignment.manager_id",
        secondaryjoin="User.id == ManagerAssignment.user_id",
        viewonly=True,
    )

    def to_dict(self, include_org=True):
        """Serialize the user for API output.

        The password hash and reset token are not included.

        Args:
            include_org: when true and the user has an organization, also emit
                ``org_name`` and ``org_slug``.
        """
        created = self.created_at
        payload = dict(
            id=self.id,
            email=self.email,
            full_name=self.full_name,
            role=self.role,
            org_id=self.org_id,
            is_active=self.is_active,
            created_at=created.isoformat() if created else "",
        )
        # Only touch the relationship when asked to, mirroring the short-circuit.
        org = self.organization if include_org else None
        if org:
            payload["org_name"] = org.name
            payload["org_slug"] = org.slug
        return payload
class Document(Base):
    """An uploaded document with its processing state, derived outputs, and Epic posting info.

    List/dict fields are persisted as JSON text in ``*_json`` columns and exposed
    through properties that decode them. Each property falls back to an empty
    value (or ``None``) when the stored text is missing or not valid JSON.
    """

    __tablename__ = "documents"

    id = Column(Integer, autoincrement=True, primary_key=True)
    # Public identifier; ``to_dict`` emits it under the "id" key.
    doc_id = Column(String(50), index=True, nullable=False, unique=True)
    filename = Column(String(500), nullable=False)
    file_path = Column(String(1000), nullable=False)
    file_size = Column(Integer, default=0)
    pages = Column(Integer, default=0)
    status = Column(String(50), default="processing")
    error = Column(Text, nullable=True)
    summary = Column(Text, nullable=True)
    doc_type = Column(String(50), default="other")
    uploaded_by = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), index=True, nullable=True)
    cloud_index_name = Column(String(255), nullable=True)
    # JSON-encoded payloads; prefer the matching properties below over raw access.
    outputs_json = Column(Text, default="[]")
    conversion_status_json = Column(Text, default="{}")
    extracted_data_json = Column(Text, nullable=True)
    suggestions_json = Column(Text, nullable=True)  # JSON array of smart suggestions
    epic_client_id = Column(String(255), nullable=True)
    epic_policy_id = Column(String(255), nullable=True)
    epic_posted = Column(Boolean, default=False)
    epic_entry_ids_json = Column(Text, nullable=True)
    # Soft-delete flag.
    is_deleted = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    @staticmethod
    def _decode_json(raw, fallback):
        """Return ``json.loads(raw)``, or ``fallback()`` when *raw* is empty or undecodable."""
        if not raw:
            return fallback()
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return fallback()

    @property
    def outputs(self):
        return self._decode_json(self.outputs_json, list)

    @outputs.setter
    def outputs(self, value):
        self.outputs_json = json.dumps(value)

    @property
    def conversion_status(self):
        return self._decode_json(self.conversion_status_json, dict)

    @conversion_status.setter
    def conversion_status(self, value):
        self.conversion_status_json = json.dumps(value)

    @property
    def extracted_data(self):
        return self._decode_json(self.extracted_data_json, lambda: None)

    @extracted_data.setter
    def extracted_data(self, value):
        # Falsy payloads are stored as NULL.
        self.extracted_data_json = None if not value else json.dumps(value)

    @property
    def suggestions(self):
        return self._decode_json(self.suggestions_json, list)

    @suggestions.setter
    def suggestions(self, value):
        self.suggestions_json = None if not value else json.dumps(value)

    @property
    def epic_entry_ids(self):
        return self._decode_json(self.epic_entry_ids_json, list)

    @epic_entry_ids.setter
    def epic_entry_ids(self, value):
        self.epic_entry_ids_json = None if not value else json.dumps(value)

    def to_dict(self, include_path=False):
        """Serialize for API responses.

        ``error`` is included only when it is set. ``file_path`` is included
        only when *include_path* is true.
        """
        created = self.created_at
        payload = dict(
            id=self.doc_id,
            filename=self.filename,
            size=self.file_size,
            pages=self.pages,
            status=self.status,
            summary=self.summary or "",
            doc_type=self.doc_type,
            uploaded_by=self.uploaded_by,
            org_id=self.org_id,
            uploaded_at=created.isoformat() if created else "",
            outputs=self.outputs,
            conversion_status=self.conversion_status,
            extracted_data=self.extracted_data,
            suggestions=self.suggestions,
            epic_client_id=self.epic_client_id,
            epic_policy_id=self.epic_policy_id,
            epic_posted=self.epic_posted,
        )
        if self.error:
            payload["error"] = self.error
        if include_path:
            payload["file_path"] = self.file_path
        return payload
class ChatMessage(Base):
    """One message in a chat session, with optional cited sources stored as JSON text."""

    __tablename__ = "chat_messages"

    id = Column(Integer, autoincrement=True, primary_key=True)
    session_id = Column(String(100), index=True, nullable=False)
    # Speaker role (presumably "user"/"assistant" — confirm against callers).
    role = Column(String(20), nullable=False)
    content = Column(Text, nullable=False)
    sources_json = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    @property
    def sources(self):
        """Decoded sources list; ``[]`` when absent or not valid JSON."""
        raw = self.sources_json
        if not raw:
            return []
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return []

    @sources.setter
    def sources(self, value):
        # Falsy values are stored as NULL.
        self.sources_json = None if not value else json.dumps(value)
class BillingEvent(Base):
    """A billable usage event attributed to a user (and optionally an org / document)."""

    __tablename__ = "billing_events"

    id = Column(Integer, autoincrement=True, primary_key=True)
    event_id = Column(String(50), nullable=False, unique=True)
    event_type = Column(String(50), nullable=False)
    doc_id = Column(String(50), nullable=True)
    filename = Column(String(500), nullable=True)
    # NOTE(review): monetary amount stored as Float; consider Numeric for exact arithmetic.
    cost = Column(Float, default=0.0)
    credits_used = Column(Float, default=0.0)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
class WorkflowRun(Base):
    """One execution of a workflow over a list of input documents, with result and timing."""

    __tablename__ = "workflow_runs"

    id = Column(Integer, autoincrement=True, primary_key=True)
    run_id = Column(String(50), index=True, nullable=False, unique=True)
    workflow_type = Column(String(50), nullable=False)
    status = Column(String(50), default="pending")
    input_doc_ids_json = Column(Text, default="[]")
    result_json = Column(Text, nullable=True)
    error = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    @property
    def input_doc_ids(self):
        """Decoded list of input document ids; ``[]`` if unparsable."""
        raw = self.input_doc_ids_json or "[]"
        try:
            ids = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            ids = []
        return ids

    @input_doc_ids.setter
    def input_doc_ids(self, value):
        self.input_doc_ids_json = json.dumps(value)

    @property
    def result(self):
        """Decoded result payload; ``None`` when absent or not valid JSON."""
        raw = self.result_json
        if not raw:
            return None
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return None

    @result.setter
    def result(self, value):
        # Falsy results are stored as NULL.
        self.result_json = None if not value else json.dumps(value)

    def to_dict(self):
        """Serialize for API output; timestamps are ISO-8601 strings or ``None``."""
        def iso(moment):
            return moment.isoformat() if moment else None

        return dict(
            run_id=self.run_id,
            workflow_type=self.workflow_type,
            status=self.status,
            input_doc_ids=self.input_doc_ids,
            result=self.result,
            error=self.error,
            user_email=self.user_email,
            org_id=self.org_id,
            started_at=iso(self.started_at),
            completed_at=iso(self.completed_at),
            created_at=iso(self.created_at),
        )
# ═══════════════════════════════════════════════════════════════════════════════
# Pipeline & Review Models — Full audit trail for document-to-Epic processing
# ═══════════════════════════════════════════════════════════════════════════════
# Ordered stages of the document-processing pipeline:
#   intake   - document received, metadata captured
#   classify - auto-detect document type
#   extract  - pull structured fields
#   match    - find customer/policy in data lake
#   validate - confidence scoring, duplicate check
#   route    - auto-post / review / reject decision
#   act      - post to Epic (or queue for manual)
#   confirm  - verify posting, record audit trail
PIPELINE_STAGES = [
    "intake", "classify", "extract", "match",
    "validate", "route", "act", "confirm",
]
class PipelineRun(Base):
    """One run of the document processing pipeline for a single document."""

    __tablename__ = "pipeline_runs"

    id = Column(Integer, autoincrement=True, primary_key=True)
    run_id = Column(String(50), index=True, nullable=False, unique=True)  # pr_xxx
    doc_id = Column(String(50), index=True, nullable=False)
    # carrier_statement / endorsement / claim / policy / coi / general
    pipeline_type = Column(String(50), nullable=False)
    # running / review_pending / completed / error / cancelled
    status = Column(String(50), default="running")
    mode = Column(String(20), default="trial")  # trial / live
    carrier = Column(String(255), nullable=True)

    # Aggregate counts (for batch docs like carrier statements)
    total_items = Column(Integer, default=0)
    matched_count = Column(Integer, default=0)
    auto_count = Column(Integer, default=0)  # Auto-approved (>=0.95)
    review_count = Column(Integer, default=0)  # Needs review (0.80-0.94)
    reject_count = Column(Integer, default=0)  # Rejected (<0.80)
    posted_count = Column(Integer, default=0)  # Successfully posted to Epic

    current_stage = Column(String(50), default="intake")
    result_json = Column(Text, nullable=True)
    error = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    @property
    def result(self):
        """Decoded result payload; ``None`` when absent or not valid JSON."""
        raw = self.result_json
        if not raw:
            return None
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return None

    @result.setter
    def result(self, value):
        # Falsy results are stored as NULL.
        self.result_json = None if not value else json.dumps(value)

    def to_dict(self):
        """Serialize for API output; timestamps are ISO-8601 strings or ``None``."""
        def iso(moment):
            return moment.isoformat() if moment else None

        return dict(
            run_id=self.run_id,
            doc_id=self.doc_id,
            pipeline_type=self.pipeline_type,
            status=self.status,
            mode=self.mode,
            carrier=self.carrier,
            total_items=self.total_items,
            matched_count=self.matched_count,
            auto_count=self.auto_count,
            review_count=self.review_count,
            reject_count=self.reject_count,
            posted_count=self.posted_count,
            current_stage=self.current_stage,
            result=self.result,
            error=self.error,
            user_email=self.user_email,
            started_at=iso(self.started_at),
            completed_at=iso(self.completed_at),
            created_at=iso(self.created_at),
        )
class PipelineStage(Base):
    """Audit record for each stage in a pipeline run."""

    __tablename__ = "pipeline_stages"

    id = Column(Integer, autoincrement=True, primary_key=True)
    run_id = Column(String(50), ForeignKey("pipeline_runs.run_id"), index=True, nullable=False)
    # intake/classify/extract/match/validate/route/act/confirm
    stage_name = Column(String(50), nullable=False)
    stage_order = Column(Integer, nullable=False)
    # pending / running / completed / skipped / error
    status = Column(String(50), default="pending")
    input_summary = Column(Text, nullable=True)  # Brief description of input
    output_data_json = Column(Text, nullable=True)  # Stage output (JSON)
    confidence_score = Column(Float, nullable=True)  # 0.0 - 1.0
    items_processed = Column(Integer, default=0)
    items_flagged = Column(Integer, default=0)  # Items needing review
    requires_review = Column(Boolean, default=False)
    reviewed_by = Column(String(255), nullable=True)
    review_action = Column(String(50), nullable=True)  # approve / reject / skip
    review_notes = Column(Text, nullable=True)
    reviewed_at = Column(DateTime, nullable=True)
    error = Column(Text, nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    @property
    def output_data(self):
        """Decoded stage output; ``None`` when absent or not valid JSON."""
        raw = self.output_data_json
        if not raw:
            return None
        try:
            return json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            return None

    @output_data.setter
    def output_data(self, value):
        # Falsy outputs are stored as NULL.
        self.output_data_json = None if not value else json.dumps(value)

    def to_dict(self):
        """Serialize for API output; timestamps are ISO-8601 strings or ``None``."""
        def iso(moment):
            return moment.isoformat() if moment else None

        return dict(
            run_id=self.run_id,
            stage_name=self.stage_name,
            stage_order=self.stage_order,
            status=self.status,
            input_summary=self.input_summary,
            output_data=self.output_data,
            confidence_score=self.confidence_score,
            items_processed=self.items_processed,
            items_flagged=self.items_flagged,
            requires_review=self.requires_review,
            reviewed_by=self.reviewed_by,
            review_action=self.review_action,
            review_notes=self.review_notes,
            reviewed_at=iso(self.reviewed_at),
            error=self.error,
            started_at=iso(self.started_at),
            completed_at=iso(self.completed_at),
        )
class ReviewItem(Base):
    """Individual items requiring human review (e.g., each line item in a carrier statement)."""

    __tablename__ = "review_items"

    id = Column(Integer, primary_key=True, autoincrement=True)
    item_id = Column(String(50), unique=True, nullable=False, index=True)  # rv_xxx
    run_id = Column(String(50), ForeignKey("pipeline_runs.run_id"), nullable=False, index=True)
    stage_name = Column(String(50), nullable=False)  # Which stage generated this
    # line_item / policy_match / field_mismatch / duplicate / missing_field
    item_type = Column(String(50), nullable=False)
    # The actual data needing review (JSON). NOT NULL: see the item_data setter.
    item_data_json = Column(Text, nullable=False)
    confidence_score = Column(Float, nullable=True)
    reason = Column(Text, nullable=True)  # Why flagged for review
    matched_policy_json = Column(Text, nullable=True)  # Best match from data lake (JSON)
    status = Column(String(50), default="pending")  # pending / approved / rejected / deferred
    reviewed_by = Column(String(255), nullable=True)
    review_notes = Column(Text, nullable=True)
    reviewed_at = Column(DateTime, nullable=True)
    epic_entry_id = Column(String(255), nullable=True)  # If posted to Epic
    epic_posted_at = Column(DateTime, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    @property
    def item_data(self):
        """Decoded review payload; ``{}`` when the stored text is empty or not valid JSON."""
        try:
            return json.loads(self.item_data_json) if self.item_data_json else {}
        except (json.JSONDecodeError, TypeError):
            return {}

    @item_data.setter
    def item_data(self, value):
        """Store *value* as JSON text.

        ``item_data_json`` is declared ``nullable=False``. Writing ``None`` for an
        empty payload (the previous behaviour) would make the row fail the NOT NULL
        constraint on flush. Empty/``None`` payloads are therefore stored as
        ``"{}"``, which the getter decodes back to ``{}``. This matches the
        ``provider_metadata`` convention on ``CloudConnection``.
        """
        self.item_data_json = json.dumps(value) if value else "{}"

    @property
    def matched_policy(self):
        """Decoded best-match policy; ``None`` when absent or not valid JSON."""
        try:
            return json.loads(self.matched_policy_json) if self.matched_policy_json else None
        except (json.JSONDecodeError, TypeError):
            return None

    @matched_policy.setter
    def matched_policy(self, value):
        # Column is nullable, so falsy values are stored as NULL.
        self.matched_policy_json = json.dumps(value) if value else None

    def to_dict(self):
        """Serialize for API output; timestamps are ISO-8601 strings or ``None``."""
        return {
            "item_id": self.item_id,
            "run_id": self.run_id,
            "stage_name": self.stage_name,
            "item_type": self.item_type,
            "item_data": self.item_data,
            "confidence_score": self.confidence_score,
            "reason": self.reason,
            "matched_policy": self.matched_policy,
            "status": self.status,
            "reviewed_by": self.reviewed_by,
            "review_notes": self.review_notes,
            "reviewed_at": self.reviewed_at.isoformat() if self.reviewed_at else None,
            "epic_entry_id": self.epic_entry_id,
            "epic_posted_at": self.epic_posted_at.isoformat() if self.epic_posted_at else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
# ═══════════════════════════════════════════════════════════════════════════════
# Cloud Storage Integration — OAuth connections & file imports
# ═══════════════════════════════════════════════════════════════════════════════
class CloudConnection(Base):
    """A user's OAuth- or credential-based link to a cloud storage provider."""

    __tablename__ = "cloud_connections"

    id = Column(Integer, autoincrement=True, primary_key=True)
    connection_id = Column(String(50), index=True, nullable=False, unique=True)  # cc_xxx
    user_id = Column(Integer, ForeignKey("users.id"), index=True, nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), index=True, nullable=True)
    # google_drive / onedrive / sharepoint / s3 / dropbox
    provider = Column(String(50), nullable=False)
    display_name = Column(String(255), nullable=True)
    # Credential columns; deliberately excluded from to_dict().
    access_token_encrypted = Column(Text, nullable=True)
    refresh_token_encrypted = Column(Text, nullable=True)
    token_expires_at = Column(DateTime, nullable=True)
    provider_user_email = Column(String(255), nullable=True)
    # tenant_id, drive_id, bucket, region, etc.
    provider_metadata_json = Column(Text, default="{}")
    status = Column(String(50), default="active")  # active / expired / revoked
    last_used_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    @property
    def provider_metadata(self):
        """Decoded provider metadata dict; ``{}`` if unparsable."""
        raw = self.provider_metadata_json or "{}"
        try:
            meta = json.loads(raw)
        except (json.JSONDecodeError, TypeError):
            meta = {}
        return meta

    @provider_metadata.setter
    def provider_metadata(self, value):
        # Falsy values collapse to an empty JSON object rather than NULL.
        self.provider_metadata_json = "{}" if not value else json.dumps(value)

    def to_dict(self):
        """Serialize for API output without the token columns."""
        def iso(moment):
            return moment.isoformat() if moment else None

        return dict(
            connection_id=self.connection_id,
            user_id=self.user_id,
            org_id=self.org_id,
            provider=self.provider,
            display_name=self.display_name,
            provider_user_email=self.provider_user_email,
            provider_metadata=self.provider_metadata,
            status=self.status,
            last_used_at=iso(self.last_used_at),
            created_at=iso(self.created_at),
        )
class CloudImport(Base):
    """A single file pulled in from a cloud storage connection."""

    __tablename__ = "cloud_imports"

    id = Column(Integer, autoincrement=True, primary_key=True)
    import_id = Column(String(50), index=True, nullable=False, unique=True)  # ci_xxx
    connection_id = Column(
        String(50),
        ForeignKey("cloud_connections.connection_id"),
        index=True,
        nullable=False,
    )
    provider = Column(String(50), nullable=False)
    cloud_file_id = Column(String(500), nullable=True)
    cloud_file_path = Column(String(1000), nullable=True)
    cloud_file_name = Column(String(500), nullable=False)
    cloud_file_size = Column(Integer, default=0)
    # documents.doc_id of the resulting document once imported (no FK constraint declared).
    doc_id = Column(String(50), nullable=True)
    # downloading / processing / ready / error
    status = Column(String(50), default="downloading")
    error = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Serialize for API output; ``created_at`` is ISO-8601 or ``None``."""
        created = self.created_at
        return dict(
            import_id=self.import_id,
            connection_id=self.connection_id,
            provider=self.provider,
            cloud_file_id=self.cloud_file_id,
            cloud_file_path=self.cloud_file_path,
            cloud_file_name=self.cloud_file_name,
            cloud_file_size=self.cloud_file_size,
            doc_id=self.doc_id,
            status=self.status,
            error=self.error,
            user_email=self.user_email,
            org_id=self.org_id,
            created_at=created.isoformat() if created else None,
        )
class EmailImport(Base):
    """An inbound email attachment import — e.g. a carrier statement forwarded to an org-specific address."""

    __tablename__ = "email_imports"

    id = Column(Integer, autoincrement=True, primary_key=True)
    import_id = Column(String(50), index=True, nullable=False, unique=True)  # ei_xxx
    org_id = Column(Integer, ForeignKey("organizations.id"), index=True, nullable=True)
    from_email = Column(String(255), nullable=False)
    to_email = Column(String(255), nullable=False)
    subject = Column(String(1000), nullable=True)
    attachment_name = Column(String(500), nullable=False)
    attachment_size = Column(Integer, default=0)
    # documents.doc_id of the resulting document once processed (no FK constraint declared).
    doc_id = Column(String(50), nullable=True)
    # received / processing / ready / error
    status = Column(String(50), default="received")
    error = Column(Text, nullable=True)
    # Email of the user who "owns" this import, if one was matched.
    matched_user_email = Column(String(255), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Serialize for API output; ``created_at`` is ISO-8601 or ``None``."""
        created = self.created_at
        return dict(
            import_id=self.import_id,
            org_id=self.org_id,
            from_email=self.from_email,
            to_email=self.to_email,
            subject=self.subject,
            attachment_name=self.attachment_name,
            attachment_size=self.attachment_size,
            doc_id=self.doc_id,
            status=self.status,
            error=self.error,
            matched_user_email=self.matched_user_email,
            created_at=created.isoformat() if created else None,
        )