Files
iot-device-management-service/app/models_iot.py
16337 fa1a0b96c0 修复:IoT数据库连接支持mysql-connector-python驱动
腾讯云MySQL需要use_pure+ssl_disabled参数才能认证通过,
当URL使用mysqlconnector驱动时自动添加connect_args
2026-03-31 15:15:32 +08:00

144 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
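Read-only ORM models for IoT work orders.

Maps the work-order tables of the aiot-platform database; vsp-service queries
them read-only, across databases, through a second SQLAlchemy engine.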
"""
from datetime import datetime
from typing import Optional
from sqlalchemy import (
Column, String, Integer, SmallInteger, BigInteger, DateTime, Text,
create_engine, Index,
)
from sqlalchemy.orm import declarative_base, sessionmaker
from app.config import settings
from app.utils.logger import logger
# Declarative base class that the IoT ORM model classes in this module inherit from.
IotBase = declarative_base()
class IotOpsOrder(IotBase):
    """Main table of generic IoT work orders (mapped read-only).

    Maps the ``ops_order`` table. The ``comment=`` strings are kept verbatim
    from the original mapping; the ``#`` notes give their English meaning.
    """

    __tablename__ = "ops_order"

    # --- identity and classification ---
    id = Column(BigInteger, primary_key=True)
    # Work-order number.
    order_code = Column(String(64), comment="工单编号")
    # Work-order type: SECURITY or CLEAN.
    order_type = Column(String(32), comment="工单类型: SECURITY/CLEAN")
    # Title and free-text description.
    title = Column(String(200), comment="工单标题")
    description = Column(Text, comment="工单描述")
    # Priority: 0 low / 1 medium / 2 high.
    priority = Column(SmallInteger, comment="优先级: 0低/1中/2高")
    # Status, one of PENDING/ASSIGNED/ARRIVED/PAUSED/COMPLETED/CANCELLED.
    status = Column(
        String(32),
        comment="状态: PENDING/ASSIGNED/ARRIVED/PAUSED/COMPLETED/CANCELLED",
    )

    # --- where ---
    area_id = Column(BigInteger, comment="区域ID")
    location = Column(String(200), comment="位置")

    # --- who ---
    assignee_id = Column(BigInteger, comment="指派人ID")
    assignee_name = Column(String(100), comment="指派人姓名")

    # --- when ---
    start_time = Column(DateTime, comment="开始时间")
    end_time = Column(DateTime, comment="结束时间")

    # --- bookkeeping: creator, timestamps, deletion flag, tenant number ---
    creator = Column(String(64), comment="创建者")
    create_time = Column(DateTime, comment="创建时间")
    update_time = Column(DateTime, comment="更新时间")
    deleted = Column(SmallInteger, default=0, comment="是否删除")
    tenant_id = Column(BigInteger, default=0, comment="租户编号")
class IotOpsOrderSecurityExt(IotBase):
    """Extension table for security work orders (mapped read-only).

    Maps the ``ops_order_security_ext`` table. ``ops_order_id`` holds the ID of
    the work order this row extends. The ``comment=`` strings are kept verbatim
    from the original mapping; the ``#`` notes give their English meaning.
    """

    __tablename__ = "ops_order_security_ext"

    id = Column(BigInteger, primary_key=True)
    # ID of the work order this extension row belongs to (required).
    ops_order_id = Column(BigInteger, nullable=False, comment="工单ID")

    # --- originating alarm ---
    alarm_id = Column(String(64), comment="关联告警ID")
    alarm_type = Column(String(32), comment="告警类型")
    camera_id = Column(String(64), comment="摄像头ID")
    camera_name = Column(String(128), comment="摄像头名称")
    # ROI region ID and screenshot URL.
    roi_id = Column(String(64), comment="ROI区域ID")
    image_url = Column(String(512), comment="截图URL")

    # --- handling ---
    assigned_user_id = Column(BigInteger, comment="处理人ID")
    assigned_user_name = Column(String(100), comment="处理人姓名")
    # Handling result text and result image URL(s).
    result = Column(Text, comment="处理结果")
    result_img_urls = Column(Text, comment="结果图片URL")
    # Whether the alarm was a false alarm.
    false_alarm = Column(SmallInteger, comment="是否误报")

    # --- lifecycle timestamps: dispatched / confirmed / completed ---
    dispatched_time = Column(DateTime, comment="派单时间")
    confirmed_time = Column(DateTime, comment="确认时间")
    completed_time = Column(DateTime, comment="完成时间")

    # --- bookkeeping columns (the mapping carries no comments for these) ---
    creator = Column(String(64))
    create_time = Column(DateTime)
    updater = Column(String(64))
    update_time = Column(DateTime)
    deleted = Column(SmallInteger, default=0)
    tenant_id = Column(BigInteger, default=0)
class IotOpsOrderCleanExt(IotBase):
    """Extension table for cleaning work orders (mapped read-only).

    Maps the ``ops_order_clean_ext`` table. ``ops_order_id`` holds the ID of
    the work order this row extends. The ``comment=`` strings are kept verbatim
    from the original mapping; the ``#`` notes give their English meaning.
    """

    __tablename__ = "ops_order_clean_ext"

    id = Column(BigInteger, primary_key=True)
    # ID of the work order this extension row belongs to (required).
    ops_order_id = Column(BigInteger, nullable=False, comment="工单ID")

    # Whether the work order was generated automatically.
    is_auto = Column(SmallInteger, default=1, comment="是否自动工单")
    # Expected job duration, in minutes.
    expected_duration = Column(Integer, comment="预计作业时长(分钟)")

    # --- actual arrival / completion ---
    arrived_time = Column(DateTime, comment="实际到岗时间")
    completed_time = Column(DateTime, comment="实际完成时间")

    # --- pause tracking; the total is accumulated in seconds ---
    pause_start_time = Column(DateTime, comment="暂停开始时间")
    pause_end_time = Column(DateTime, comment="暂停结束时间")
    total_pause_seconds = Column(Integer, default=0, comment="累计暂停时长(秒)")

    # Cleaning type: ROUTINE/DEEP/SPOT/EMERGENCY.
    cleaning_type = Column(
        String(32),
        comment="保洁类型: ROUTINE/DEEP/SPOT/EMERGENCY",
    )
    # Difficulty level, 1-5.
    difficulty_level = Column(SmallInteger, comment="难度等级(1-5)")

    # --- bookkeeping columns (the mapping carries no comments for these) ---
    creator = Column(String(64))
    create_time = Column(DateTime)
    updater = Column(String(64))
    update_time = Column(DateTime)
    deleted = Column(SmallInteger, default=0)
    tenant_id = Column(BigInteger, default=0)

    # --- dispatch timestamps: actual dispatch and first dispatch ---
    dispatched_time = Column(DateTime, comment="实际下发时间")
    first_dispatched_time = Column(DateTime, comment="首次下发时间")
# ==================== IoT database connection management ====================
# Module-level caches, both initially None. get_iot_engine() fills _iot_engine
# and get_iot_session() fills _IotSessionLocal on first use; close_iot_db()
# resets them to None.
_iot_engine = None  # shared SQLAlchemy engine for the IoT database
_IotSessionLocal = None  # sessionmaker bound to that engine
def get_iot_engine():
    """Return the shared IoT database engine, creating it on first call.

    The engine is built from ``settings.iot_database_url`` and cached in the
    module-level ``_iot_engine``; later calls return the cached instance.

    Returns:
        The SQLAlchemy engine for the IoT database.

    Raises:
        RuntimeError: If ``settings.iot_database_url`` is empty or unset.
    """
    global _iot_engine
    if _iot_engine is not None:
        return _iot_engine

    iot_url = settings.iot_database_url
    if not iot_url:
        raise RuntimeError("IOT_DATABASE_URL 未配置,无法连接 IoT 数据库")

    # Imported locally so this function does not rely on the module's
    # top-level import list.
    from sqlalchemy.engine.url import make_url

    # mysql+mysqlconnector needs extra connect_args to authenticate.
    # Inspect the parsed driver name rather than the raw URL text, so that
    # "mysqlconnector" appearing in a password, host or database name does
    # not inject these driver-specific arguments into a different driver.
    connect_args = {}
    if "mysqlconnector" in make_url(iot_url).drivername:
        connect_args = {"use_pure": True, "ssl_disabled": True}

    _iot_engine = create_engine(
        iot_url,
        echo=False,
        pool_recycle=1800,  # recycle pooled connections after 30 minutes
        pool_pre_ping=True,  # test each connection before handing it out
        connect_args=connect_args,
    )
    logger.info("IoT 数据库引擎已创建")
    return _iot_engine
def get_iot_session():
    """Return a new session for the IoT database.

    The session factory is created on first call, bound to the engine from
    ``get_iot_engine()``, and cached in the module-level ``_IotSessionLocal``.
    Each call returns a fresh session produced by that factory. The session
    is meant for read-only use; this function itself does not enforce that.
    """
    global _IotSessionLocal
    factory = _IotSessionLocal
    if factory is None:
        factory = sessionmaker(
            bind=get_iot_engine(),
            autocommit=False,
            autoflush=False,
        )
        _IotSessionLocal = factory
    return factory()
def close_iot_db():
    """Close the IoT database connection.

    Disposes of the cached engine, if one was created, and clears both the
    cached engine and the cached session factory so that the next
    ``get_iot_engine()`` / ``get_iot_session()`` call rebuilds them.
    """
    global _iot_engine, _IotSessionLocal
    engine = _iot_engine
    if engine:
        engine.dispose()
    # The session factory only exists once an engine does, so clearing both
    # here is a no-op when nothing was ever created.
    _iot_engine = None
    _IotSessionLocal = None