customs-data / packages /core /models.py
3v324v23's picture
Change trade_date to String type to avoid SQLAlchemy datetime conversion errors
d4f9f67
Raw
History Blame Contribute Delete
5.81 kB
from sqlalchemy import Column, String, Integer, Float, DateTime, JSON, Index, ForeignKey, Text
from sqlalchemy.sql import func
from packages.core.database import Base
class RawTradeRecord(Base):
"""
原始海关数据表:
用于保存原始网页抓取或 API 返回的未经任何处理的 JSON,
作为数据血缘的起点,支持后续解析器升级后的数据重放。
"""
__tablename__ = "raw_trade_records"
id = Column(String(50), primary_key=True, comment="内部生成的唯一原始记录ID")
batch_no = Column(String(50), index=True, nullable=False, comment="抓取批次号")
source_country = Column(String(10), index=True, nullable=False, comment="数据来源国代码 (如 US, BR)")
source_system = Column(String(50), nullable=False, comment="抓取源系统名称")
raw_json = Column(JSON, nullable=False, comment="原始抓取数据结构")
content_hash = Column(String(64), unique=True, nullable=False, comment="内容指纹,防止重复抓取")
created_at = Column(DateTime(timezone=True), server_default=func.now())
class StandardTradeRecord(Base):
"""
标准化海关数据表:
对外提供查询的核心业务表。
所有金额统一转 USD,所有重量统一转 KG/TON,所有时间格式化。
"""
__tablename__ = "standard_trade_records"
record_id = Column(String(50), primary_key=True, comment="标准记录ID")
source_record_id = Column(String(50), ForeignKey("raw_trade_records.id"), index=True, nullable=False, comment="关联的原始记录ID(数据血缘)")
batch_no = Column(String(50), index=True, nullable=False, comment="处理批次号")
source_country = Column(String(10), index=True, nullable=False, comment="数据来源国")
trade_direction = Column(String(10), index=True, nullable=False, comment="贸易方向: import / export")
trade_date = Column(String(50), index=True, nullable=True, comment="交易发生日期")
importer_name = Column(String(255), index=True, nullable=True, comment="进口商名称(清洗后)")
exporter_name = Column(String(255), index=True, nullable=True, comment="出口商名称(清洗后)")
hs_code = Column(String(20), index=True, nullable=True, comment="商品 HS 编码")
product_name = Column(Text, nullable=True, comment="商品描述")
amount = Column(Float, nullable=True, comment="交易金额(FOB/CIF)")
currency = Column(String(10), nullable=True, comment="原始币种")
weight = Column(Float, nullable=True, comment="重量")
weight_unit = Column(String(20), nullable=True, comment="重量单位")
origin_country = Column(String(10), index=True, nullable=True, comment="原产国")
destination_country = Column(String(10), index=True, nullable=True, comment="目的国")
departure_port = Column(String(100), index=True, nullable=True, comment="起运港")
arrival_port = Column(String(100), index=True, nullable=True, comment="目的港")
transport_mode = Column(String(20), nullable=True, comment="运输方式 (如 SEA, AIR, ROAD)")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
__table_args__ = (
Index("idx_trade_search", "source_country", "trade_direction", "trade_date"),
)
class Subscription(Base):
"""
企业动态监控与预警订阅表
"""
__tablename__ = "subscriptions"
id = Column(String(50), primary_key=True)
user_email = Column(String(100), index=True, nullable=False, comment="订阅用户邮箱")
target_entity_id = Column(String(50), nullable=True, comment="订阅的目标企业ID")
target_hs_code = Column(String(20), nullable=True, comment="订阅的HS Code")
last_notified_at = Column(DateTime(timezone=True), nullable=True, comment="上次通知时间")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class EntityMapping(Base):
"""
企业实体映射表:
用于将各种不同的原始公司名称映射到唯一的标准公司主体。
"""
__tablename__ = "entity_mappings"
id = Column(String(50), primary_key=True)
original_name = Column(String(500), unique=True, nullable=False, comment="原始名称(或清洗后的中间名称)")
standard_entity_id = Column(String(50), index=True, nullable=False, comment="标准主体ID")
standard_name = Column(String(500), nullable=False, comment="标准主体名称")
confidence_score = Column(Float, default=1.0, comment="映射置信度")
is_manual = Column(Integer, default=0, comment="是否人工确认: 1-是, 0-否")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class EntityReviewPool(Base):
"""
疑似重复实体人工确认池:
当相似度介于阈值之间时,落入此表等待运营人工审核。
"""
__tablename__ = "entity_review_pool"
id = Column(String(50), primary_key=True)
source_name = Column(String(500), nullable=False, comment="待确认的名称")
target_name = Column(String(500), nullable=False, comment="匹配到的可能名称")
target_entity_id = Column(String(50), nullable=False, comment="匹配到的标准主体ID")
similarity_score = Column(Float, nullable=False, comment="相似度得分")
status = Column(String(20), default="PENDING", comment="状态: PENDING, APPROVED, REJECTED")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())