Spaces:
Running
Running
"""
DocuScan AI — SQLAlchemy Models
Multi-org with roles: super_admin, admin, manager, user.
Persistent storage with soft-delete everywhere.
"""
| import json | |
| from datetime import datetime | |
| from sqlalchemy import Column, Integer, String, Float, Boolean, Text, DateTime, ForeignKey | |
| from sqlalchemy.orm import declarative_base, relationship | |
| Base = declarative_base() | |
class Organization(Base):
    """A tenant organization; users link to it through ``User.org_id``."""

    __tablename__ = "organizations"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False)
    slug = Column(String(100), unique=True, nullable=False, index=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Reverse side of User.organization.
    users = relationship("User", back_populates="organization")

    def to_dict(self):
        """Return the organization's fields as a plain dict.

        ``created_at`` is rendered with ``isoformat()``; an unset timestamp
        becomes the empty string.
        """
        created_iso = ""
        if self.created_at:
            created_iso = self.created_at.isoformat()
        return {
            "id": self.id,
            "name": self.name,
            "slug": self.slug,
            "is_active": self.is_active,
            "created_at": created_iso,
        }
class ManagerAssignment(Base):
    """Association row pairing a manager with one user they oversee.

    Both ``manager_id`` and ``user_id`` reference ``users.id``.
    """

    __tablename__ = "manager_assignments"

    id = Column(Integer, primary_key=True, autoincrement=True)
    manager_id = Column(
        Integer, ForeignKey("users.id"), nullable=False, index=True
    )
    user_id = Column(
        Integer, ForeignKey("users.id"), nullable=False, index=True
    )
    created_at = Column(DateTime, default=datetime.utcnow)
class User(Base):
    """An application account with a role and an optional organization."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    hashed_password = Column(String(255), nullable=False)
    full_name = Column(String(255), nullable=False)
    # One of: super_admin / admin / manager / user
    role = Column(String(50), nullable=False, default="user")
    org_id = Column(
        Integer, ForeignKey("organizations.id"), nullable=True, index=True
    )
    is_active = Column(Boolean, default=True)
    reset_token = Column(String(255), nullable=True)
    reset_token_exp = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    organization = relationship("Organization", back_populates="users")

    # Read-only view of the users this manager oversees, resolved through
    # the manager_assignments table.
    managed_users = relationship(
        "User",
        secondary="manager_assignments",
        primaryjoin="User.id == ManagerAssignment.manager_id",
        secondaryjoin="User.id == ManagerAssignment.user_id",
        viewonly=True,
    )

    def to_dict(self, include_org=True):
        """Return the user's fields as a plain dict.

        The password hash and reset-token columns are not included.

        Args:
            include_org: When true and the user has an organization, add
                ``org_name`` and ``org_slug`` to the result.
        """
        created_iso = self.created_at.isoformat() if self.created_at else ""
        payload = {
            "id": self.id,
            "email": self.email,
            "full_name": self.full_name,
            "role": self.role,
            "org_id": self.org_id,
            "is_active": self.is_active,
            "created_at": created_iso,
        }
        if not include_org:
            return payload
        org = self.organization
        if org:
            payload["org_name"] = org.name
            payload["org_slug"] = org.slug
        return payload
class Document(Base):
    """An uploaded document with its processing state and JSON-encoded results.

    List/dict payloads live as JSON text in the ``*_json`` columns and are
    exposed through same-named properties (``outputs``, ``conversion_status``,
    ``extracted_data``, ``suggestions``, ``epic_entry_ids``) that decode on
    read and encode on write.
    """

    __tablename__ = "documents"

    id = Column(Integer, primary_key=True, autoincrement=True)
    doc_id = Column(String(50), unique=True, nullable=False, index=True)
    filename = Column(String(500), nullable=False)
    file_path = Column(String(1000), nullable=False)
    file_size = Column(Integer, default=0)
    pages = Column(Integer, default=0)
    status = Column(String(50), default="processing")
    error = Column(Text, nullable=True)
    summary = Column(Text, nullable=True)
    doc_type = Column(String(50), default="other")
    uploaded_by = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True, index=True)
    cloud_index_name = Column(String(255), nullable=True)
    outputs_json = Column(Text, default="[]")
    conversion_status_json = Column(Text, default='{}')
    extracted_data_json = Column(Text, nullable=True)
    suggestions_json = Column(Text, nullable=True)  # JSON array of smart suggestions
    epic_client_id = Column(String(255), nullable=True)
    epic_policy_id = Column(String(255), nullable=True)
    epic_posted = Column(Boolean, default=False)
    epic_entry_ids_json = Column(Text, nullable=True)
    is_deleted = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # The accessors below must be properties: to_dict() reads them as plain
    # attributes, and without the decorators each setter would simply
    # overwrite the getter of the same name.

    @property
    def outputs(self):
        """Decoded ``outputs_json``; ``[]`` when empty or not valid JSON."""
        try:
            return json.loads(self.outputs_json or "[]")
        except (json.JSONDecodeError, TypeError):
            return []

    @outputs.setter
    def outputs(self, value):
        self.outputs_json = json.dumps(value)

    @property
    def conversion_status(self):
        """Decoded ``conversion_status_json``; ``{}`` when empty or invalid."""
        try:
            return json.loads(self.conversion_status_json or "{}")
        except (json.JSONDecodeError, TypeError):
            return {}

    @conversion_status.setter
    def conversion_status(self, value):
        self.conversion_status_json = json.dumps(value)

    @property
    def extracted_data(self):
        """Decoded ``extracted_data_json``; ``None`` when empty or invalid."""
        try:
            return json.loads(self.extracted_data_json) if self.extracted_data_json else None
        except (json.JSONDecodeError, TypeError):
            return None

    @extracted_data.setter
    def extracted_data(self, value):
        # Falsy values (None, {}, []) are stored as NULL.
        self.extracted_data_json = json.dumps(value) if value else None

    @property
    def suggestions(self):
        """Decoded ``suggestions_json``; ``[]`` when empty or invalid."""
        try:
            return json.loads(self.suggestions_json) if self.suggestions_json else []
        except (json.JSONDecodeError, TypeError):
            return []

    @suggestions.setter
    def suggestions(self, value):
        self.suggestions_json = json.dumps(value) if value else None

    @property
    def epic_entry_ids(self):
        """Decoded ``epic_entry_ids_json``; ``[]`` when empty or invalid."""
        try:
            return json.loads(self.epic_entry_ids_json) if self.epic_entry_ids_json else []
        except (json.JSONDecodeError, TypeError):
            return []

    @epic_entry_ids.setter
    def epic_entry_ids(self, value):
        self.epic_entry_ids_json = json.dumps(value) if value else None

    def to_dict(self, include_path=False):
        """Return the document as a plain dict.

        The ``"id"`` key carries ``doc_id`` (not the integer primary key) and
        ``"uploaded_at"`` carries ``created_at`` in ISO format (``""`` if unset).
        ``"error"`` is present only when an error is recorded.

        Args:
            include_path: When true, also include ``file_path``.
        """
        d = {
            "id": self.doc_id,
            "filename": self.filename,
            "size": self.file_size,
            "pages": self.pages,
            "status": self.status,
            "summary": self.summary or "",
            "doc_type": self.doc_type,
            "uploaded_by": self.uploaded_by,
            "org_id": self.org_id,
            "uploaded_at": self.created_at.isoformat() if self.created_at else "",
            "outputs": self.outputs,
            "conversion_status": self.conversion_status,
            "extracted_data": self.extracted_data,
            "suggestions": self.suggestions,
            "epic_client_id": self.epic_client_id,
            "epic_policy_id": self.epic_policy_id,
            "epic_posted": self.epic_posted,
        }
        if self.error:
            d["error"] = self.error
        if include_path:
            d["file_path"] = self.file_path
        return d
class ChatMessage(Base):
    """One message in a chat session, with optional JSON-encoded sources."""

    __tablename__ = "chat_messages"

    id = Column(Integer, primary_key=True, autoincrement=True)
    session_id = Column(String(100), nullable=False, index=True)
    role = Column(String(20), nullable=False)
    content = Column(Text, nullable=False)
    sources_json = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Declared as a property so the getter and setter can share one name;
    # as two plain methods the setter would replace the getter.
    @property
    def sources(self):
        """Decoded ``sources_json``; ``[]`` when empty or not valid JSON."""
        try:
            return json.loads(self.sources_json) if self.sources_json else []
        except (json.JSONDecodeError, TypeError):
            return []

    @sources.setter
    def sources(self, value):
        # Falsy values (None, []) are stored as NULL.
        self.sources_json = json.dumps(value) if value else None
class BillingEvent(Base):
    """A billing record: event type, cost and credits used, tied to a user
    email and optionally to a document and an organization."""

    __tablename__ = "billing_events"

    id = Column(Integer, primary_key=True, autoincrement=True)
    event_id = Column(String(50), unique=True, nullable=False)
    event_type = Column(String(50), nullable=False)

    # Optional reference to the document the event relates to.
    doc_id = Column(String(50), nullable=True)
    filename = Column(String(500), nullable=True)

    cost = Column(Float, default=0.0)
    credits_used = Column(Float, default=0.0)

    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
class WorkflowRun(Base):
    """A single workflow execution over a set of input documents.

    ``input_doc_ids`` and ``result`` are properties backed by the JSON text
    columns ``input_doc_ids_json`` and ``result_json``.
    """

    __tablename__ = "workflow_runs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    run_id = Column(String(50), unique=True, nullable=False, index=True)
    workflow_type = Column(String(50), nullable=False)
    status = Column(String(50), default="pending")
    input_doc_ids_json = Column(Text, default="[]")
    result_json = Column(Text, nullable=True)
    error = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Properties, not plain methods: to_dict() reads them as attributes, and
    # an undecorated setter would overwrite the getter of the same name.

    @property
    def input_doc_ids(self):
        """Decoded ``input_doc_ids_json``; ``[]`` when empty or invalid."""
        try:
            return json.loads(self.input_doc_ids_json or "[]")
        except (json.JSONDecodeError, TypeError):
            return []

    @input_doc_ids.setter
    def input_doc_ids(self, value):
        self.input_doc_ids_json = json.dumps(value)

    @property
    def result(self):
        """Decoded ``result_json``; ``None`` when empty or invalid."""
        try:
            return json.loads(self.result_json) if self.result_json else None
        except (json.JSONDecodeError, TypeError):
            return None

    @result.setter
    def result(self, value):
        # Falsy values are stored as NULL.
        self.result_json = json.dumps(value) if value else None

    def to_dict(self):
        """Return the run as a plain dict; timestamps are ISO strings or None."""
        return {
            "run_id": self.run_id,
            "workflow_type": self.workflow_type,
            "status": self.status,
            "input_doc_ids": self.input_doc_ids,
            "result": self.result,
            "error": self.error,
            "user_email": self.user_email,
            "org_id": self.org_id,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
# ───────────────────────────────────────────────────────────────────────────────
# Pipeline & Review Models — Full audit trail for document-to-Epic processing
# ───────────────────────────────────────────────────────────────────────────────
# Names of the pipeline stages, listed in processing order.
PIPELINE_STAGES = [
    "intake",    # Document received, metadata captured
    "classify",  # Auto-detect document type
    "extract",   # Pull structured fields
    "match",     # Find customer/policy in data lake
    "validate",  # Confidence scoring, duplicate check
    "route",     # Auto-post / review / reject decision
    "act",       # Post to Epic (or queue for manual)
    "confirm",   # Verify posting, record audit trail
]
class PipelineRun(Base):
    """One run of the document processing pipeline for a single document.

    ``result`` is a property backed by the JSON text column ``result_json``.
    """

    __tablename__ = "pipeline_runs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    run_id = Column(String(50), unique=True, nullable=False, index=True)  # pr_xxx
    doc_id = Column(String(50), nullable=False, index=True)
    pipeline_type = Column(String(50), nullable=False)  # carrier_statement / endorsement / claim / policy / coi / general
    status = Column(String(50), default="running")  # running / review_pending / completed / error / cancelled
    mode = Column(String(20), default="trial")  # trial / live
    carrier = Column(String(255), nullable=True)

    # Aggregate counts (for batch docs like carrier statements)
    total_items = Column(Integer, default=0)
    matched_count = Column(Integer, default=0)
    auto_count = Column(Integer, default=0)  # Auto-approved (>=0.95)
    review_count = Column(Integer, default=0)  # Needs review (0.80-0.94)
    reject_count = Column(Integer, default=0)  # Rejected (<0.80)
    posted_count = Column(Integer, default=0)  # Successfully posted to Epic

    current_stage = Column(String(50), default="intake")
    result_json = Column(Text, nullable=True)
    error = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Declared as a property: to_dict() reads ``self.result`` as an attribute,
    # and an undecorated setter would overwrite the getter of the same name.
    @property
    def result(self):
        """Decoded ``result_json``; ``None`` when empty or not valid JSON."""
        try:
            return json.loads(self.result_json) if self.result_json else None
        except (json.JSONDecodeError, TypeError):
            return None

    @result.setter
    def result(self, value):
        # Falsy values are stored as NULL.
        self.result_json = json.dumps(value) if value else None

    def to_dict(self):
        """Return the run as a plain dict; timestamps are ISO strings or None.

        ``org_id`` is not part of the returned dict.
        """
        return {
            "run_id": self.run_id,
            "doc_id": self.doc_id,
            "pipeline_type": self.pipeline_type,
            "status": self.status,
            "mode": self.mode,
            "carrier": self.carrier,
            "total_items": self.total_items,
            "matched_count": self.matched_count,
            "auto_count": self.auto_count,
            "review_count": self.review_count,
            "reject_count": self.reject_count,
            "posted_count": self.posted_count,
            "current_stage": self.current_stage,
            "result": self.result,
            "error": self.error,
            "user_email": self.user_email,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
class PipelineStage(Base):
    """Audit record for each stage in a pipeline run.

    ``output_data`` is a property backed by the JSON text column
    ``output_data_json``.
    """

    __tablename__ = "pipeline_stages"

    id = Column(Integer, primary_key=True, autoincrement=True)
    run_id = Column(String(50), ForeignKey("pipeline_runs.run_id"), nullable=False, index=True)
    stage_name = Column(String(50), nullable=False)  # intake/classify/extract/match/validate/route/act/confirm
    stage_order = Column(Integer, nullable=False)
    status = Column(String(50), default="pending")  # pending / running / completed / skipped / error
    input_summary = Column(Text, nullable=True)  # Brief description of input
    output_data_json = Column(Text, nullable=True)  # Stage output (JSON)
    confidence_score = Column(Float, nullable=True)  # 0.0 - 1.0
    items_processed = Column(Integer, default=0)
    items_flagged = Column(Integer, default=0)  # Items needing review
    requires_review = Column(Boolean, default=False)
    reviewed_by = Column(String(255), nullable=True)
    review_action = Column(String(50), nullable=True)  # approve / reject / skip
    review_notes = Column(Text, nullable=True)
    reviewed_at = Column(DateTime, nullable=True)
    error = Column(Text, nullable=True)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Declared as a property: to_dict() reads ``self.output_data`` as an
    # attribute, and an undecorated setter would overwrite the getter.
    @property
    def output_data(self):
        """Decoded ``output_data_json``; ``None`` when empty or invalid."""
        try:
            return json.loads(self.output_data_json) if self.output_data_json else None
        except (json.JSONDecodeError, TypeError):
            return None

    @output_data.setter
    def output_data(self, value):
        # Falsy values are stored as NULL.
        self.output_data_json = json.dumps(value) if value else None

    def to_dict(self):
        """Return the stage as a plain dict; timestamps are ISO strings or None.

        ``created_at`` is not part of the returned dict.
        """
        return {
            "run_id": self.run_id,
            "stage_name": self.stage_name,
            "stage_order": self.stage_order,
            "status": self.status,
            "input_summary": self.input_summary,
            "output_data": self.output_data,
            "confidence_score": self.confidence_score,
            "items_processed": self.items_processed,
            "items_flagged": self.items_flagged,
            "requires_review": self.requires_review,
            "reviewed_by": self.reviewed_by,
            "review_action": self.review_action,
            "review_notes": self.review_notes,
            "reviewed_at": self.reviewed_at.isoformat() if self.reviewed_at else None,
            "error": self.error,
            "started_at": self.started_at.isoformat() if self.started_at else None,
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
        }
class ReviewItem(Base):
    """Individual items requiring human review (e.g., each line item in a carrier statement).

    ``item_data`` and ``matched_policy`` are properties backed by the JSON
    text columns ``item_data_json`` and ``matched_policy_json``.
    """

    __tablename__ = "review_items"

    id = Column(Integer, primary_key=True, autoincrement=True)
    item_id = Column(String(50), unique=True, nullable=False, index=True)  # rv_xxx
    run_id = Column(String(50), ForeignKey("pipeline_runs.run_id"), nullable=False, index=True)
    stage_name = Column(String(50), nullable=False)  # Which stage generated this
    item_type = Column(String(50), nullable=False)  # line_item / policy_match / field_mismatch / duplicate / missing_field
    item_data_json = Column(Text, nullable=False)  # The actual data needing review (JSON)
    confidence_score = Column(Float, nullable=True)
    reason = Column(Text, nullable=True)  # Why flagged for review
    matched_policy_json = Column(Text, nullable=True)  # Best match from data lake (JSON)
    status = Column(String(50), default="pending")  # pending / approved / rejected / deferred
    reviewed_by = Column(String(255), nullable=True)
    review_notes = Column(Text, nullable=True)
    reviewed_at = Column(DateTime, nullable=True)
    epic_entry_id = Column(String(255), nullable=True)  # If posted to Epic
    epic_posted_at = Column(DateTime, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Properties, not plain methods: to_dict() reads them as attributes, and
    # an undecorated setter would overwrite the getter of the same name.

    @property
    def item_data(self):
        """Decoded ``item_data_json``; ``{}`` when empty or not valid JSON."""
        try:
            return json.loads(self.item_data_json) if self.item_data_json else {}
        except (json.JSONDecodeError, TypeError):
            return {}

    @item_data.setter
    def item_data(self, value):
        # item_data_json is NOT NULL, so an empty/None value is stored as
        # "{}" rather than None; the getter reads that back as {}.
        self.item_data_json = json.dumps(value) if value else "{}"

    @property
    def matched_policy(self):
        """Decoded ``matched_policy_json``; ``None`` when empty or invalid."""
        try:
            return json.loads(self.matched_policy_json) if self.matched_policy_json else None
        except (json.JSONDecodeError, TypeError):
            return None

    @matched_policy.setter
    def matched_policy(self, value):
        # Falsy values are stored as NULL.
        self.matched_policy_json = json.dumps(value) if value else None

    def to_dict(self):
        """Return the item as a plain dict; timestamps are ISO strings or None.

        ``user_email`` and ``org_id`` are not part of the returned dict.
        """
        return {
            "item_id": self.item_id,
            "run_id": self.run_id,
            "stage_name": self.stage_name,
            "item_type": self.item_type,
            "item_data": self.item_data,
            "confidence_score": self.confidence_score,
            "reason": self.reason,
            "matched_policy": self.matched_policy,
            "status": self.status,
            "reviewed_by": self.reviewed_by,
            "review_notes": self.review_notes,
            "reviewed_at": self.reviewed_at.isoformat() if self.reviewed_at else None,
            "epic_entry_id": self.epic_entry_id,
            "epic_posted_at": self.epic_posted_at.isoformat() if self.epic_posted_at else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
# ───────────────────────────────────────────────────────────────────────────────
# Cloud Storage Integration — OAuth connections & file imports
# ───────────────────────────────────────────────────────────────────────────────
class CloudConnection(Base):
    """OAuth or credential-based connection to a cloud storage provider.

    ``provider_metadata`` is a property backed by the JSON text column
    ``provider_metadata_json``.
    """

    __tablename__ = "cloud_connections"

    id = Column(Integer, primary_key=True, autoincrement=True)
    connection_id = Column(String(50), unique=True, nullable=False, index=True)  # cc_xxx
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True, index=True)
    provider = Column(String(50), nullable=False)  # google_drive / onedrive / sharepoint / s3 / dropbox
    display_name = Column(String(255), nullable=True)
    access_token_encrypted = Column(Text, nullable=True)
    refresh_token_encrypted = Column(Text, nullable=True)
    token_expires_at = Column(DateTime, nullable=True)
    provider_user_email = Column(String(255), nullable=True)
    provider_metadata_json = Column(Text, default="{}")  # tenant_id, drive_id, bucket, region, etc.
    status = Column(String(50), default="active")  # active / expired / revoked
    last_used_at = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Declared as a property: to_dict() reads ``self.provider_metadata`` as an
    # attribute, and an undecorated setter would overwrite the getter.
    @property
    def provider_metadata(self):
        """Decoded ``provider_metadata_json``; ``{}`` when empty or invalid."""
        try:
            return json.loads(self.provider_metadata_json or "{}")
        except (json.JSONDecodeError, TypeError):
            return {}

    @provider_metadata.setter
    def provider_metadata(self, value):
        # Falsy values are normalised to an empty JSON object.
        self.provider_metadata_json = json.dumps(value) if value else "{}"

    def to_dict(self):
        """Return the connection as a plain dict.

        The encrypted token columns and ``token_expires_at`` are not included.
        Timestamps are ISO strings or None.
        """
        return {
            "connection_id": self.connection_id,
            "user_id": self.user_id,
            "org_id": self.org_id,
            "provider": self.provider,
            "display_name": self.display_name,
            "provider_user_email": self.provider_user_email,
            "provider_metadata": self.provider_metadata,
            "status": self.status,
            "last_used_at": self.last_used_at.isoformat() if self.last_used_at else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
        }
class CloudImport(Base):
    """Record of one file imported through a cloud storage connection."""

    __tablename__ = "cloud_imports"

    id = Column(Integer, primary_key=True, autoincrement=True)
    import_id = Column(String(50), unique=True, nullable=False, index=True)  # ci_xxx
    connection_id = Column(
        String(50),
        ForeignKey("cloud_connections.connection_id"),
        nullable=False,
        index=True,
    )
    provider = Column(String(50), nullable=False)

    # Location and size of the file on the provider side.
    cloud_file_id = Column(String(500), nullable=True)
    cloud_file_path = Column(String(1000), nullable=True)
    cloud_file_name = Column(String(500), nullable=False)
    cloud_file_size = Column(Integer, default=0)

    # Matches documents.doc_id once imported (no DB-level foreign key).
    doc_id = Column(String(50), nullable=True)
    # downloading / processing / ready / error
    status = Column(String(50), default="downloading")
    error = Column(Text, nullable=True)
    user_email = Column(String(255), nullable=False)
    org_id = Column(Integer, ForeignKey("organizations.id"), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Return the import as a plain dict; ``created_at`` is ISO or None."""
        created_iso = None
        if self.created_at:
            created_iso = self.created_at.isoformat()
        return {
            "import_id": self.import_id,
            "connection_id": self.connection_id,
            "provider": self.provider,
            "cloud_file_id": self.cloud_file_id,
            "cloud_file_path": self.cloud_file_path,
            "cloud_file_name": self.cloud_file_name,
            "cloud_file_size": self.cloud_file_size,
            "doc_id": self.doc_id,
            "status": self.status,
            "error": self.error,
            "user_email": self.user_email,
            "org_id": self.org_id,
            "created_at": created_iso,
        }
class EmailImport(Base):
    """Track inbound email imports — carrier statements forwarded to org-specific addresses."""

    __tablename__ = "email_imports"

    id = Column(Integer, primary_key=True, autoincrement=True)
    import_id = Column(String(50), unique=True, nullable=False, index=True)  # ei_xxx
    org_id = Column(
        Integer, ForeignKey("organizations.id"), nullable=True, index=True
    )

    # Envelope details of the inbound message.
    from_email = Column(String(255), nullable=False)
    to_email = Column(String(255), nullable=False)
    subject = Column(String(1000), nullable=True)

    attachment_name = Column(String(500), nullable=False)
    attachment_size = Column(Integer, default=0)

    # Matches documents.doc_id once processed (no DB-level foreign key).
    doc_id = Column(String(50), nullable=True)
    # received / processing / ready / error
    status = Column(String(50), default="received")
    error = Column(Text, nullable=True)
    # User who "owns" this import.
    matched_user_email = Column(String(255), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Return the import as a plain dict; ``created_at`` is ISO or None."""
        created_iso = None
        if self.created_at:
            created_iso = self.created_at.isoformat()
        return {
            "import_id": self.import_id,
            "org_id": self.org_id,
            "from_email": self.from_email,
            "to_email": self.to_email,
            "subject": self.subject,
            "attachment_name": self.attachment_name,
            "attachment_size": self.attachment_size,
            "doc_id": self.doc_id,
            "status": self.status,
            "error": self.error,
            "matched_user_email": self.matched_user_email,
            "created_at": created_iso,
        }