""" DocuScan AI — SQLAlchemy Models Multi-org with roles: super_admin, admin, manager, user. Persistent storage with soft-delete everywhere. """ import json from datetime import datetime from sqlalchemy import Column, Integer, String, Float, Boolean, Text, DateTime, ForeignKey from sqlalchemy.orm import declarative_base, relationship Base = declarative_base() class Organization(Base): __tablename__ = "organizations" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(255), nullable=False) slug = Column(String(100), unique=True, nullable=False, index=True) is_active = Column(Boolean, default=True) created_at = Column(DateTime, default=datetime.utcnow) users = relationship("User", back_populates="organization") def to_dict(self): return { "id": self.id, "name": self.name, "slug": self.slug, "is_active": self.is_active, "created_at": self.created_at.isoformat() if self.created_at else "", } class ManagerAssignment(Base): """Many-to-many: which users a manager oversees.""" __tablename__ = "manager_assignments" id = Column(Integer, primary_key=True, autoincrement=True) manager_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True) user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True) created_at = Column(DateTime, default=datetime.utcnow) class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, autoincrement=True) email = Column(String(255), unique=True, nullable=False, index=True) hashed_password = Column(String(255), nullable=False) full_name = Column(String(255), nullable=False) role = Column(String(50), nullable=False, default="user") # super_admin / admin / manager / user org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True, index=True) is_active = Column(Boolean, default=True) reset_token = Column(String(255), nullable=True) reset_token_exp = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) organization = relationship("Organization", back_populates="users") # Users this manager oversees managed_users = relationship( "User", secondary="manager_assignments", primaryjoin="User.id == ManagerAssignment.manager_id", secondaryjoin="User.id == ManagerAssignment.user_id", viewonly=True, ) def to_dict(self, include_org=True): d = { "id": self.id, "email": self.email, "full_name": self.full_name, "role": self.role, "org_id": self.org_id, "is_active": self.is_active, "created_at": self.created_at.isoformat() if self.created_at else "", } if include_org and self.organization: d["org_name"] = self.organization.name d["org_slug"] = self.organization.slug return d class Document(Base): __tablename__ = "documents" id = Column(Integer, primary_key=True, autoincrement=True) doc_id = Column(String(50), unique=True, nullable=False, index=True) filename = Column(String(500), nullable=False) file_path = Column(String(1000), nullable=False) file_size = Column(Integer, default=0) pages = Column(Integer, default=0) status = Column(String(50), default="processing") error = Column(Text, nullable=True) summary = Column(Text, nullable=True) doc_type = Column(String(50), default="other") uploaded_by = Column(String(255), nullable=False) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True, index=True) cloud_index_name = Column(String(255), nullable=True) outputs_json = Column(Text, default="[]") conversion_status_json = Column(Text, default='{}') extracted_data_json = Column(Text, nullable=True) suggestions_json = Column(Text, nullable=True) # JSON array of smart suggestions epic_client_id = Column(String(255), nullable=True) epic_policy_id = Column(String(255), nullable=True) epic_posted = Column(Boolean, default=False) epic_entry_ids_json = Column(Text, nullable=True) is_deleted = Column(Boolean, default=False) created_at = Column(DateTime, default=datetime.utcnow) @property def outputs(self): try: return json.loads(self.outputs_json or "[]") except (json.JSONDecodeError, TypeError): return [] @outputs.setter def outputs(self, value): self.outputs_json = json.dumps(value) @property def conversion_status(self): try: return json.loads(self.conversion_status_json or "{}") except (json.JSONDecodeError, TypeError): return {} @conversion_status.setter def conversion_status(self, value): self.conversion_status_json = json.dumps(value) @property def extracted_data(self): try: return json.loads(self.extracted_data_json) if self.extracted_data_json else None except (json.JSONDecodeError, TypeError): return None @extracted_data.setter def extracted_data(self, value): self.extracted_data_json = json.dumps(value) if value else None @property def suggestions(self): try: return json.loads(self.suggestions_json) if self.suggestions_json else [] except (json.JSONDecodeError, TypeError): return [] @suggestions.setter def suggestions(self, value): self.suggestions_json = json.dumps(value) if value else None @property def epic_entry_ids(self): try: return json.loads(self.epic_entry_ids_json) if self.epic_entry_ids_json else [] except (json.JSONDecodeError, TypeError): return [] @epic_entry_ids.setter def epic_entry_ids(self, value): self.epic_entry_ids_json = json.dumps(value) if value else None def to_dict(self, include_path=False): d = { "id": self.doc_id, "filename": self.filename, "size": self.file_size, "pages": self.pages, "status": self.status, "summary": self.summary or "", "doc_type": self.doc_type, "uploaded_by": self.uploaded_by, "org_id": self.org_id, "uploaded_at": self.created_at.isoformat() if self.created_at else "", "outputs": self.outputs, "conversion_status": self.conversion_status, "extracted_data": self.extracted_data, "suggestions": self.suggestions, "epic_client_id": self.epic_client_id, "epic_policy_id": self.epic_policy_id, "epic_posted": self.epic_posted, } if self.error: d["error"] = self.error if include_path: d["file_path"] = self.file_path return d class ChatMessage(Base): __tablename__ = "chat_messages" id = Column(Integer, primary_key=True, autoincrement=True) session_id = Column(String(100), nullable=False, index=True) role = Column(String(20), nullable=False) content = Column(Text, nullable=False) sources_json = Column(Text, nullable=True) user_email = Column(String(255), nullable=False) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) created_at = Column(DateTime, default=datetime.utcnow) @property def sources(self): try: return json.loads(self.sources_json) if self.sources_json else [] except (json.JSONDecodeError, TypeError): return [] @sources.setter def sources(self, value): self.sources_json = json.dumps(value) if value else None class BillingEvent(Base): __tablename__ = "billing_events" id = Column(Integer, primary_key=True, autoincrement=True) event_id = Column(String(50), unique=True, nullable=False) event_type = Column(String(50), nullable=False) doc_id = Column(String(50), nullable=True) filename = Column(String(500), nullable=True) cost = Column(Float, default=0.0) credits_used = Column(Float, default=0.0) user_email = Column(String(255), nullable=False) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) created_at = Column(DateTime, default=datetime.utcnow) class WorkflowRun(Base): __tablename__ = "workflow_runs" id = Column(Integer, primary_key=True, autoincrement=True) run_id = Column(String(50), unique=True, nullable=False, index=True) workflow_type = Column(String(50), nullable=False) status = Column(String(50), default="pending") input_doc_ids_json = Column(Text, default="[]") result_json = Column(Text, nullable=True) error = Column(Text, nullable=True) user_email = Column(String(255), nullable=False) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) started_at = Column(DateTime, nullable=True) completed_at = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) @property def input_doc_ids(self): try: return json.loads(self.input_doc_ids_json or "[]") except (json.JSONDecodeError, TypeError): return [] @input_doc_ids.setter def input_doc_ids(self, value): self.input_doc_ids_json = json.dumps(value) @property def result(self): try: return json.loads(self.result_json) if self.result_json else None except (json.JSONDecodeError, TypeError): return None @result.setter def result(self, value): self.result_json = json.dumps(value) if value else None def to_dict(self): return { "run_id": self.run_id, "workflow_type": self.workflow_type, "status": self.status, "input_doc_ids": self.input_doc_ids, "result": self.result, "error": self.error, "user_email": self.user_email, "org_id": self.org_id, "started_at": self.started_at.isoformat() if self.started_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, "created_at": self.created_at.isoformat() if self.created_at else None, } # ═══════════════════════════════════════════════════════════════════════════════ # Pipeline & Review Models — Full audit trail for document-to-Epic processing # ═══════════════════════════════════════════════════════════════════════════════ PIPELINE_STAGES = [ "intake", # Document received, metadata captured "classify", # Auto-detect document type "extract", # Pull structured fields "match", # Find customer/policy in data lake "validate", # Confidence scoring, duplicate check "route", # Auto-post / review / reject decision "act", # Post to Epic (or queue for manual) "confirm", # Verify posting, record audit trail ] class PipelineRun(Base): """One run of the document processing pipeline for a single document.""" __tablename__ = "pipeline_runs" id = Column(Integer, primary_key=True, autoincrement=True) run_id = Column(String(50), unique=True, nullable=False, index=True) # pr_xxx doc_id = Column(String(50), nullable=False, index=True) pipeline_type = Column(String(50), nullable=False) # carrier_statement / endorsement / claim / policy / coi / general status = Column(String(50), default="running") # running / review_pending / completed / error / cancelled mode = Column(String(20), default="trial") # trial / live carrier = Column(String(255), nullable=True) # Aggregate counts (for batch docs like carrier statements) total_items = Column(Integer, default=0) matched_count = Column(Integer, default=0) auto_count = Column(Integer, default=0) # Auto-approved (>=0.95) review_count = Column(Integer, default=0) # Needs review (0.80-0.94) reject_count = Column(Integer, default=0) # Rejected (<0.80) posted_count = Column(Integer, default=0) # Successfully posted to Epic current_stage = Column(String(50), default="intake") result_json = Column(Text, nullable=True) error = Column(Text, nullable=True) user_email = Column(String(255), nullable=False) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) started_at = Column(DateTime, nullable=True) completed_at = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) @property def result(self): try: return json.loads(self.result_json) if self.result_json else None except (json.JSONDecodeError, TypeError): return None @result.setter def result(self, value): self.result_json = json.dumps(value) if value else None def to_dict(self): return { "run_id": self.run_id, "doc_id": self.doc_id, "pipeline_type": self.pipeline_type, "status": self.status, "mode": self.mode, "carrier": self.carrier, "total_items": self.total_items, "matched_count": self.matched_count, "auto_count": self.auto_count, "review_count": self.review_count, "reject_count": self.reject_count, "posted_count": self.posted_count, "current_stage": self.current_stage, "result": self.result, "error": self.error, "user_email": self.user_email, "started_at": self.started_at.isoformat() if self.started_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, "created_at": self.created_at.isoformat() if self.created_at else None, } class PipelineStage(Base): """Audit record for each stage in a pipeline run.""" __tablename__ = "pipeline_stages" id = Column(Integer, primary_key=True, autoincrement=True) run_id = Column(String(50), ForeignKey("pipeline_runs.run_id"), nullable=False, index=True) stage_name = Column(String(50), nullable=False) # intake/classify/extract/match/validate/route/act/confirm stage_order = Column(Integer, nullable=False) status = Column(String(50), default="pending") # pending / running / completed / skipped / error input_summary = Column(Text, nullable=True) # Brief description of input output_data_json = Column(Text, nullable=True) # Stage output (JSON) confidence_score = Column(Float, nullable=True) # 0.0 - 1.0 items_processed = Column(Integer, default=0) items_flagged = Column(Integer, default=0) # Items needing review requires_review = Column(Boolean, default=False) reviewed_by = Column(String(255), nullable=True) review_action = Column(String(50), nullable=True) # approve / reject / skip review_notes = Column(Text, nullable=True) reviewed_at = Column(DateTime, nullable=True) error = Column(Text, nullable=True) started_at = Column(DateTime, nullable=True) completed_at = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) @property def output_data(self): try: return json.loads(self.output_data_json) if self.output_data_json else None except (json.JSONDecodeError, TypeError): return None @output_data.setter def output_data(self, value): self.output_data_json = json.dumps(value) if value else None def to_dict(self): return { "run_id": self.run_id, "stage_name": self.stage_name, "stage_order": self.stage_order, "status": self.status, "input_summary": self.input_summary, "output_data": self.output_data, "confidence_score": self.confidence_score, "items_processed": self.items_processed, "items_flagged": self.items_flagged, "requires_review": self.requires_review, "reviewed_by": self.reviewed_by, "review_action": self.review_action, "review_notes": self.review_notes, "reviewed_at": self.reviewed_at.isoformat() if self.reviewed_at else None, "error": self.error, "started_at": self.started_at.isoformat() if self.started_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, } class ReviewItem(Base): """Individual items requiring human review (e.g., each line item in a carrier statement).""" __tablename__ = "review_items" id = Column(Integer, primary_key=True, autoincrement=True) item_id = Column(String(50), unique=True, nullable=False, index=True) # rv_xxx run_id = Column(String(50), ForeignKey("pipeline_runs.run_id"), nullable=False, index=True) stage_name = Column(String(50), nullable=False) # Which stage generated this item_type = Column(String(50), nullable=False) # line_item / policy_match / field_mismatch / duplicate / missing_field item_data_json = Column(Text, nullable=False) # The actual data needing review (JSON) confidence_score = Column(Float, nullable=True) reason = Column(Text, nullable=True) # Why flagged for review matched_policy_json = Column(Text, nullable=True) # Best match from data lake (JSON) status = Column(String(50), default="pending") # pending / approved / rejected / deferred reviewed_by = Column(String(255), nullable=True) review_notes = Column(Text, nullable=True) reviewed_at = Column(DateTime, nullable=True) epic_entry_id = Column(String(255), nullable=True) # If posted to Epic epic_posted_at = Column(DateTime, nullable=True) user_email = Column(String(255), nullable=False) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) created_at = Column(DateTime, default=datetime.utcnow) @property def item_data(self): try: return json.loads(self.item_data_json) if self.item_data_json else {} except (json.JSONDecodeError, TypeError): return {} @item_data.setter def item_data(self, value): self.item_data_json = json.dumps(value) if value else None @property def matched_policy(self): try: return json.loads(self.matched_policy_json) if self.matched_policy_json else None except (json.JSONDecodeError, TypeError): return None @matched_policy.setter def matched_policy(self, value): self.matched_policy_json = json.dumps(value) if value else None def to_dict(self): return { "item_id": self.item_id, "run_id": self.run_id, "stage_name": self.stage_name, "item_type": self.item_type, "item_data": self.item_data, "confidence_score": self.confidence_score, "reason": self.reason, "matched_policy": self.matched_policy, "status": self.status, "reviewed_by": self.reviewed_by, "review_notes": self.review_notes, "reviewed_at": self.reviewed_at.isoformat() if self.reviewed_at else None, "epic_entry_id": self.epic_entry_id, "epic_posted_at": self.epic_posted_at.isoformat() if self.epic_posted_at else None, "created_at": self.created_at.isoformat() if self.created_at else None, } # ═══════════════════════════════════════════════════════════════════════════════ # Cloud Storage Integration — OAuth connections & file imports # ═══════════════════════════════════════════════════════════════════════════════ class CloudConnection(Base): """OAuth or credential-based connection to a cloud storage provider.""" __tablename__ = "cloud_connections" id = Column(Integer, primary_key=True, autoincrement=True) connection_id = Column(String(50), unique=True, nullable=False, index=True) # cc_xxx user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True, index=True) provider = Column(String(50), nullable=False) # google_drive / onedrive / sharepoint / s3 / dropbox display_name = Column(String(255), nullable=True) access_token_encrypted = Column(Text, nullable=True) refresh_token_encrypted = Column(Text, nullable=True) token_expires_at = Column(DateTime, nullable=True) provider_user_email = Column(String(255), nullable=True) provider_metadata_json = Column(Text, default="{}") # tenant_id, drive_id, bucket, region, etc. status = Column(String(50), default="active") # active / expired / revoked last_used_at = Column(DateTime, nullable=True) created_at = Column(DateTime, default=datetime.utcnow) @property def provider_metadata(self): try: return json.loads(self.provider_metadata_json or "{}") except (json.JSONDecodeError, TypeError): return {} @provider_metadata.setter def provider_metadata(self, value): self.provider_metadata_json = json.dumps(value) if value else "{}" def to_dict(self): return { "connection_id": self.connection_id, "user_id": self.user_id, "org_id": self.org_id, "provider": self.provider, "display_name": self.display_name, "provider_user_email": self.provider_user_email, "provider_metadata": self.provider_metadata, "status": self.status, "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None, "created_at": self.created_at.isoformat() if self.created_at else None, } class CloudImport(Base): """Track individual file imports from cloud storage.""" __tablename__ = "cloud_imports" id = Column(Integer, primary_key=True, autoincrement=True) import_id = Column(String(50), unique=True, nullable=False, index=True) # ci_xxx connection_id = Column(String(50), ForeignKey("cloud_connections.connection_id"), nullable=False, index=True) provider = Column(String(50), nullable=False) cloud_file_id = Column(String(500), nullable=True) cloud_file_path = Column(String(1000), nullable=True) cloud_file_name = Column(String(500), nullable=False) cloud_file_size = Column(Integer, default=0) doc_id = Column(String(50), nullable=True) # FK to documents.doc_id once imported status = Column(String(50), default="downloading") # downloading / processing / ready / error error = Column(Text, nullable=True) user_email = Column(String(255), nullable=False) org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True) created_at = Column(DateTime, default=datetime.utcnow) def to_dict(self): return { "import_id": self.import_id, "connection_id": self.connection_id, "provider": self.provider, "cloud_file_id": self.cloud_file_id, "cloud_file_path": self.cloud_file_path, "cloud_file_name": self.cloud_file_name, "cloud_file_size": self.cloud_file_size, "doc_id": self.doc_id, "status": self.status, "error": self.error, "user_email": self.user_email, "org_id": self.org_id, "created_at": self.created_at.isoformat() if self.created_at else None, } class EmailImport(Base): """Track inbound email imports — carrier statements forwarded to org-specific addresses.""" __tablename__ = "email_imports" id = Column(Integer, primary_key=True, autoincrement=True) import_id = Column(String(50), unique=True, nullable=False, index=True) # ei_xxx org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True, index=True) from_email = Column(String(255), nullable=False) to_email = Column(String(255), nullable=False) subject = Column(String(1000), nullable=True) attachment_name = Column(String(500), nullable=False) attachment_size = Column(Integer, default=0) doc_id = Column(String(50), nullable=True) # FK to documents.doc_id once processed status = Column(String(50), default="received") # received / processing / ready / error error = Column(Text, nullable=True) matched_user_email = Column(String(255), nullable=True) # user who "owns" this import created_at = Column(DateTime, default=datetime.utcnow) def to_dict(self): return { "import_id": self.import_id, "org_id": self.org_id, "from_email": self.from_email, "to_email": self.to_email, "subject": self.subject, "attachment_name": self.attachment_name, "attachment_size": self.attachment_size, "doc_id": self.doc_id, "status": self.status, "error": self.error, "matched_user_email": self.matched_user_email, "created_at": self.created_at.isoformat() if self.created_at else None, }