Compare commits

...

9 Commits

15 changed files with 809 additions and 369 deletions

View File

@@ -18,6 +18,7 @@ from ..schemas.comprehensive_data import (
SettlementDataCheckpointQueryRequest, SettlementDataCheckpointQueryRequest,
LevelDataQueryRequest, LevelDataQueryRequest,
LinecodeRequest, LinecodeRequest,
LinecodeAccountRequest,
NYIDRequest, NYIDRequest,
SectionByAccountRequest, SectionByAccountRequest,
PointByAccountRequest, PointByAccountRequest,
@@ -30,6 +31,7 @@ from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService from ..services.level_data import LevelDataService
from ..services.original_data import OriginalDataService from ..services.original_data import OriginalDataService
from ..services.comprehensive import ComprehensiveDataService from ..services.comprehensive import ComprehensiveDataService
from ..services.account import AccountService
from ..utils.get_operating_mode import OperatingModePredictor from ..utils.get_operating_mode import OperatingModePredictor
import logging import logging
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"]) router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
@@ -727,4 +729,66 @@ def get_checkpoint_by_point(request: LevelDataQueryRequest, db: Session = Depend
total=0, total=0,
data=[] data=[]
) )
@router.post("/get_accounts_by_linecode", response_model=DataResponse)
def get_accounts_by_linecode(request: LinecodeAccountRequest, db: Session = Depends(get_db)):
    """Look up account records reachable from a level-line code.

    Resolution chain (performed in AccountService with batched IN queries):
    linecode -> latest NYID (level data) -> point_ids (settlement data)
    -> section_ids (checkpoints) -> account_ids (sections) -> accounts.
    This handler only serializes the result.
    """
    try:
        linecode = request.linecode
        logger.info(f"接口请求根据linecode={linecode}查询账号信息")
        accounts = AccountService.get_accounts_by_linecode(db, linecode)
        if not accounts:
            return DataResponse(
                code=ResponseCode.SUCCESS,
                message=f"未找到linecode={linecode}对应的账号信息",
                total=0,
                data=[],
            )

        def fmt(ts):
            # Render timestamps the same way as the rest of the API; None stays None.
            return ts.strftime("%Y-%m-%d %H:%M:%S") if ts else None

        account_data = [
            {
                "id": acc.id,
                "username": acc.username,
                "status": acc.status,
                "today_updated": acc.today_updated,
                "project_name": acc.project_name,
                "created_at": fmt(acc.created_at),
                "updated_at": fmt(acc.updated_at),
                "update_time": acc.update_time,
                "max_variation": acc.max_variation,
                "yh_id": acc.yh_id,
                "cl_name": acc.cl_name,
            }
            for acc in accounts
        ]
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=f"查询成功,共获取{len(account_data)}个账号",
            total=len(account_data),
            data=account_data,
        )
    except Exception as e:
        logger.error(f"查询账号信息失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"查询失败:{str(e)}",
            total=0,
            data=[],
        )

View File

@@ -8,7 +8,11 @@ from ..schemas.level_data import (
LevelDataListResponse, LevelDataListResponse,
LevelDataResponse, LevelDataResponse,
BatchDeleteByLinecodesRequest, BatchDeleteByLinecodesRequest,
BatchDeleteByLinecodesResponse BatchDeleteByLinecodesResponse,
LinecodeRequest,
NyidListResponse,
SyncLoseDataRequest,
SyncLoseDataResponse,
) )
from ..services.level_data import LevelDataService from ..services.level_data import LevelDataService
@@ -36,6 +40,31 @@ def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(g
) )
@router.post("/get_nyids_by_linecode", response_model=NyidListResponse)
def get_nyids_by_linecode(request: LinecodeRequest, db: Session = Depends(get_db)):
    """Return the de-duplicated NYID list for a level line, newest first.

    Numeric NYIDs sort by magnitude (descending); non-numeric ones rank as 0
    and therefore end up after every numeric id.
    """
    try:
        service = LevelDataService()
        rows = service.get_by_linecode(db, linecode=request.linecode)
        unique_nyids = {str(row.NYID) for row in rows if row.NYID}

        def _rank(value):
            # Non-numeric NYIDs fall back to 0 so sorting never raises.
            return int(value) if str(value).isdigit() else 0

        return NyidListResponse(
            code=ResponseCode.SUCCESS,
            message=ResponseMessage.SUCCESS,
            data=sorted(unique_nyids, key=_rank, reverse=True),
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"查询失败: {str(e)}",
        )
@router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse) @router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse)
def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)): def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)):
""" """
@@ -58,4 +87,40 @@ def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Sessio
raise HTTPException( raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"批量删除失败: {str(e)}" detail=f"批量删除失败: {str(e)}"
)
@router.post("/sync_lose_data", response_model=SyncLoseDataResponse)
def sync_lose_data(
request: SyncLoseDataRequest = SyncLoseDataRequest(),
db: Session = Depends(get_db),
):
"""
同步缺失数据到 lose_data 表。
- 不传 linecode按所有水准线路的 NYID 计算缺失并写入,仅返回是否处理成功。
- 传 linecode只处理该水准线路并返回该线路的缺失数据记录列表。
缺失规则:原始数据无=1、沉降数据无=2lose_data 为二者之和0/1/2/3
"""
try:
level_service = LevelDataService()
result = level_service.sync_lose_data(db, linecode=request.linecode)
if result.get("data") is None:
data = {"success": result["success"]}
if not result["success"]:
return SyncLoseDataResponse(
code=1,
message=result.get("message", "处理失败"),
data=data,
)
else:
data = result["data"]
return SyncLoseDataResponse(
code=0 if result["success"] else 1,
message=ResponseMessage.SUCCESS if result["success"] else (result.get("message") or "处理失败"),
data=data,
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"同步缺失数据失败: {str(e)}",
) )

View File

@@ -235,10 +235,11 @@ def receive_after_cursor_execute(conn, cursor, statement, params, context, execu
log_transaction_end(success=True) log_transaction_end(success=True)
@event.listens_for(Engine, "handle_error") @event.listens_for(Engine, "handle_error")
def receive_handle_error(exception, context): def receive_handle_error(context):
"""错误监听""" """错误监听SQLAlchemy 只传入一个 ExceptionContext 参数"""
error_msg = str(exception) exception = getattr(context, "original_exception", None) or getattr(context, "sqlalchemy_exception", None)
sql = context.statement if context and hasattr(context, 'statement') else None error_msg = str(exception) if exception else str(context)
sql = getattr(context, "statement", None)
log_connection_error(error_msg, sql) log_connection_error(error_msg, sql)
log_transaction_end(success=False, error=error_msg) log_transaction_end(success=False, error=error_msg)

View File

@@ -20,6 +20,26 @@ class LevelData(Base):
temperature = Column(String(100), comment="温度") temperature = Column(String(100), comment="温度")
weather = Column(String(100), comment="天气") weather = Column(String(100), comment="天气")
# 模型转字典
# Model-to-dict conversion
def to_dict(self):
    """Convert this model instance into a plain dict (Pydantic-friendly).

    Iterates the mapped table's columns, so the dict always mirrors the
    current schema without listing fields by hand.
    """
    return {
        column.name: getattr(self, column.name)
        for column in self.__table__.columns
    }
class LoseData(Base):
    """Mapping for the lose_data table (missing-data records).

    NOTE(review): this class duplicates the one in app/models/lose_data.py.
    Two declarative mappers for the same ``__tablename__`` on one Base will
    conflict — confirm which definition is canonical and remove the other.
    """
    __tablename__ = "lose_data"
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    # NOTE(review): the comment string says "水准线路编码" (line code) but the
    # field is account_id — presumably a copy-paste slip; verify intent.
    account_id = Column(String(100), nullable=False, comment="水准线路编码", index=True)
    NYID = Column(String(100), nullable=False, comment="期数id", index=True)
    linecode = Column(String(100), nullable=False, comment="水准线路编码", index=True)
    benchmarkids = Column(String(100), comment="工作基点名称序列")
    point_id = Column(String(100), nullable=False, comment="观测点id", index=True)
    section_id = Column(String(100), nullable=False, comment="所属断面id")
    lose_data = Column(String(100), comment="水准观测类型")
# 模型转字典 # 模型转字典
def to_dict(self): def to_dict(self):
"""将模型实例转换为字典,支持 Pydantic 序列化""" """将模型实例转换为字典,支持 Pydantic 序列化"""

46
app/models/lose_data.py Normal file
View File

@@ -0,0 +1,46 @@
from sqlalchemy import Column, Integer, String

from ..core.database import Base


class LoseData(Base):
    """Missing-data record table: tracks, per level line and period (NYID),
    whether original and/or settlement data is absent.

    lose_data encoding (written by LevelDataService.sync_lose_data):
    0 = both present, 1 = original missing, 2 = settlement missing,
    3 = both missing.

    Conflict resolution: the unresolved git merge markers were removed and
    the HEAD variant kept — its Integer account_id / lose_data columns match
    what the service writes (int acc_id, int lose_val) and what the API
    schema LoseDataItem declares (account_id: int, lose_data: int).
    """

    __tablename__ = "lose_data"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="ID")
    account_id = Column(Integer, nullable=False, comment="账户id", index=True)
    NYID = Column(String(100), nullable=False, comment="期数ID", index=True)
    linecode = Column(String(255), nullable=False, default="0", comment="水准线路编码", index=True)
    lose_data = Column(Integer, nullable=False, default=0, comment="缺失的数据默认是0")
    section_id = Column(String(255), nullable=True, comment="所属断面id")
    point_id = Column(String(100), nullable=False, comment="测点ID")

    def to_dict(self):
        """Convert this model instance into a plain dict (Pydantic-friendly)."""
        return {
            column.name: getattr(self, column.name)
            for column in self.__table__.columns
        }

View File

@@ -290,6 +290,10 @@ class ComprehensiveDataImportRequest(BaseModel):
data: Dict[str, Any] data: Dict[str, Any]
class LinecodeRequest(BaseModel): class LinecodeRequest(BaseModel):
linecode: str linecode: str
class LinecodeAccountRequest(BaseModel):
    """Request body for /get_accounts_by_linecode.

    NOTE(review): structurally identical to LinecodeRequest defined above —
    consider reusing that schema instead of keeping two one-field copies.
    """
    linecode: str
class ComprehensiveDataImportResponse(BaseModel): class ComprehensiveDataImportResponse(BaseModel):
success: bool success: bool
message: str message: str

View File

@@ -51,4 +51,41 @@ class BatchDeleteByLinecodesResponse(BaseModel):
message: str message: str
success: bool success: bool
backup_file: Optional[str] = None backup_file: Optional[str] = None
deleted_counts: Optional[dict] = None deleted_counts: Optional[dict] = None
class LinecodeRequest(BaseModel):
    """Query request keyed by a level-line code."""
    linecode: str = Field(..., description="水准线路编码")
class NyidListResponse(BaseModel):
    """Response carrying only the de-duplicated NYID list."""
    code: int = 0
    message: str
    data: List[str] = Field(default_factory=list, description="NYID 列表")
class SyncLoseDataRequest(BaseModel):
    """Missing-data sync request: omit linecode for a full sync, set it to process one line only."""
    linecode: Optional[str] = Field(None, description="水准线路编码,不传则处理全部线路")
class LoseDataItem(BaseModel):
    """A single row of the lose_data table."""
    id: int
    account_id: int
    NYID: str
    linecode: str
    lose_data: int
    section_id: Optional[str] = None
    point_id: str = ""
    model_config = ConfigDict(from_attributes=True)
class SyncLoseDataResponse(BaseModel):
    """Missing-data sync response: a full sync returns whether it succeeded; a single-line sync returns that line's records."""
    code: int = 0
    message: str
    data: Any = None  # {"success": bool} for a full sync, List[LoseDataItem] for one line

View File

@@ -103,3 +103,84 @@ class AccountService:
db.commit() db.commit()
return True return True
return False return False
@staticmethod
def get_accounts_by_linecode(db: Session, linecode: str) -> List[Account]:
    """Resolve account records reachable from a level-line code.

    Chain: linecode -> latest NYID (level data, NYID desc) -> point_ids
    (settlement data) -> section_ids (checkpoints) -> account_ids
    (sections) -> Account rows. Each hop is one batched IN query, so the
    whole lookup costs five queries regardless of row counts.

    NOTE(review): NYID appears to be a string column, so ``.desc()`` orders
    lexicographically ("9" > "10") — confirm NYIDs are zero-padded or
    otherwise string-comparable.

    Args:
        db: active SQLAlchemy session.
        linecode: level-line code to resolve.

    Returns:
        Matching Account rows; empty list when any hop finds nothing.

    Raises:
        Re-raises any database error after logging it.
    """
    from ..models.level_data import LevelData
    from ..models.settlement_data import SettlementData
    from ..models.checkpoint import Checkpoint
    from ..models.section_data import SectionData
    try:
        logger.info(f"开始通过linecode={linecode}查询账号信息")
        # 1. Latest level-data row for the line (first by NYID desc).
        level_data = db.query(LevelData).filter(
            LevelData.linecode == linecode
        ).order_by(LevelData.NYID.desc()).first()
        if not level_data:
            logger.warning(f"未找到linecode={linecode}对应的水准数据")
            return []
        nyid = level_data.NYID
        logger.info(f"找到最新期数NYID={nyid}")
        # 2. Distinct point_ids from settlement data for that period.
        settlement_list = db.query(SettlementData.point_id).filter(
            SettlementData.NYID == nyid
        ).distinct().all()
        if not settlement_list:
            logger.warning(f"未找到NYID={nyid}对应的沉降数据")
            return []
        point_ids = [s.point_id for s in settlement_list if s.point_id]
        logger.info(f"找到{len(point_ids)}个观测点ID")
        # 3. Distinct section_ids for those points.
        checkpoint_list = db.query(Checkpoint.section_id).filter(
            Checkpoint.point_id.in_(point_ids)
        ).distinct().all()
        if not checkpoint_list:
            logger.warning("未找到对应的观测点数据")
            return []
        section_ids = [c.section_id for c in checkpoint_list if c.section_id]
        logger.info(f"找到{len(section_ids)}个断面ID")
        # 4. Distinct account_ids for those sections.
        section_list = db.query(SectionData.account_id).filter(
            SectionData.section_id.in_(section_ids)
        ).distinct().all()
        if not section_list:
            logger.warning("未找到对应的断面数据")
            return []
        account_ids = [s.account_id for s in section_list if s.account_id]
        logger.info(f"找到{len(account_ids)}个账号ID")
        # 5. Fetch the account rows in one batch.
        accounts = db.query(Account).filter(
            Account.id.in_(account_ids)
        ).all()
        logger.info(f"查询完成,共找到{len(accounts)}个账号")
        return accounts
    except Exception as e:
        logger.error(f"通过linecode={linecode}查询账号失败: {str(e)}", exc_info=True)
        # Bare raise keeps the original traceback intact (was `raise e`).
        raise

View File

@@ -115,11 +115,8 @@ class DailyDataService(BaseService[DailyData]):
# 模型字段列表 # 模型字段列表
model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns]
# 基础条件 # 基础条件:不按 useflag 过滤,确保能取到每个 point 的真正最新一期(按 NYID 最大)
base_conditions = [ base_conditions = []
SettlementData.useflag.isnot(None),
SettlementData.useflag != 0
]
if point_ids: if point_ids:
base_conditions.append(SettlementData.point_id.in_(point_ids)) base_conditions.append(SettlementData.point_id.in_(point_ids))

View File

@@ -1,12 +1,13 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import text, inspect from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any, Tuple
from ..models.level_data import LevelData from ..models.level_data import LevelData
from .base import BaseService from .base import BaseService
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint from ..models.checkpoint import Checkpoint
from ..models.section_data import SectionData from ..models.section_data import SectionData
from ..models.account import Account from ..models.account import Account
from ..models.lose_data import LoseData
from ..core.database import engine from ..core.database import engine
import logging import logging
import os import os
@@ -30,6 +31,12 @@ class LevelDataService(BaseService[LevelData]):
"""根据水准线路编码获取水准数据""" """根据水准线路编码获取水准数据"""
return self.get_by_field(db, "linecode", linecode) return self.get_by_field(db, "linecode", linecode)
def get_last_by_linecode(self, db: Session, linecode: str) -> Optional[LevelData]:
    """Return the newest level-data row for a line (ordered by NYID desc).

    NOTE(review): NYID appears to be a string column, so the ordering is
    lexicographic — confirm NYID values compare correctly as strings.
    """
    return db.query(LevelData).filter(
        LevelData.linecode == linecode
    ).order_by(LevelData.NYID.desc()).first()
def search_level_data(self, db: Session, def search_level_data(self, db: Session,
id: Optional[str] = None, id: Optional[str] = None,
linecode: Optional[str] = None, linecode: Optional[str] = None,
@@ -458,6 +465,137 @@ class LevelDataService(BaseService[LevelData]):
return original_data_map return original_data_map
def _get_nyids_with_settlement(self, db: Session, nyid_list: List[str]) -> set:
    """Return the subset of nyid_list that has at least one settlement row."""
    if not nyid_list:
        return set()
    found = (
        db.query(SettlementData.NYID)
        .filter(SettlementData.NYID.in_(nyid_list))
        .distinct()
        .all()
    )
    return {row[0] for row in found if row[0]}
def _get_nyid_to_all_points_account_section(
    self, db: Session, nyid_list: List[str]
) -> Dict[str, List[Tuple[str, int, Optional[str]]]]:
    """Map each NYID to all of its points as (point_id, account_id, section_id).

    Resolution: settlement rows -> checkpoints (point -> section) ->
    sections (section -> account). NYIDs with no settlement rows map to the
    placeholder [("", 0, None)].

    Args:
        db: active SQLAlchemy session.
        nyid_list: NYIDs to resolve.

    Returns:
        Dict keyed by NYID; each value is a de-duplicated, order-preserving
        list of (point_id, account_id, section_id) tuples.
    """
    if not nyid_list:
        return {}
    default_list = [("", 0, None)]
    settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
    point_ids = list({s.point_id for s in settlements if s.point_id})
    if not point_ids:
        return {nyid: default_list for nyid in nyid_list}
    checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all()
    point_to_section = {c.point_id: (c.section_id or None) for c in checkpoints}
    section_ids = list({c.section_id for c in checkpoints if c.section_id})
    point_to_account: Dict[str, int] = {}
    if section_ids:
        sections = db.query(SectionData).filter(SectionData.section_id.in_(section_ids)).all()
        # O(1) lookup instead of scanning the section list once per
        # checkpoint (the previous next() scan was O(n*m)); setdefault keeps
        # the first match, matching the old first-wins semantics.
        section_to_account: Dict[Any, Any] = {}
        for sec in sections:
            section_to_account.setdefault(sec.section_id, sec.account_id)
        for c in checkpoints:
            acc_raw = section_to_account.get(c.section_id)
            if acc_raw is not None:
                try:
                    point_to_account[c.point_id] = int(acc_raw)
                except (ValueError, TypeError):
                    point_to_account[c.point_id] = 0
    # Group points per NYID: de-duplicated but order-preserving.
    nyid_to_points: Dict[str, List[Tuple[str, int, Optional[str]]]] = {nyid: [] for nyid in nyid_list}
    seen_per_nyid: Dict[str, set] = {nyid: set() for nyid in nyid_list}
    for s in settlements:
        bucket = seen_per_nyid.get(s.NYID)
        if bucket is None:
            # Defensive: DB returned an NYID outside the requested list
            # (e.g. type drift); the old code would KeyError here.
            continue
        pt_id = s.point_id or ""
        if pt_id not in bucket:
            bucket.add(pt_id)
            nyid_to_points[s.NYID].append(
                (pt_id, point_to_account.get(s.point_id, 0), point_to_section.get(s.point_id))
            )
    for nyid in nyid_list:
        if not nyid_to_points[nyid]:
            nyid_to_points[nyid] = default_list
    return nyid_to_points
def _sync_lose_data_for_one_linecode(
    self,
    db: Session,
    linecode_val: str,
    level_list: List[LevelData],
) -> None:
    """Process one level line: compute per-NYID missing flags and upsert one
    lose_data row per (linecode, NYID, point_id). Does not commit.

    NOTE(review): linecode_val is never read — the pairs below take the
    linecode from each LevelData row instead; confirm whether the parameter
    can be dropped or should constrain the rows.
    """
    # (linecode, NYID) pairs for every period of this line.
    pairs = [(item.linecode, str(item.NYID)) for item in level_list if item.NYID]
    if not pairs:
        return
    nyid_list = list({nyid for _, nyid in pairs})
    # NYIDs that have settlement rows / original-data rows.
    settlement_nyids = self._get_nyids_with_settlement(db, nyid_list)
    original_data_map = self._find_original_data_by_nyids(db, nyid_list)
    original_nyids = set()
    for rows in original_data_map.values():
        for row in rows:
            n = row.get("NYID")
            if n is not None:
                original_nyids.add(str(n))
    nyid_to_points_asp = self._get_nyid_to_all_points_account_section(db, nyid_list)
    for linecode_, nyid in pairs:
        has_original = nyid in original_nyids
        has_settlement = nyid in settlement_nyids
        # Encoding: 0 = both present, 1 = no original, 2 = no settlement, 3 = both missing.
        lose_val = (0 if has_original else 1) + (0 if has_settlement else 2)
        points_list = nyid_to_points_asp.get(nyid, [("", 0, None)])
        for point_id, acc_id, sec_id in points_list:
            pt_id = point_id or ""
            # TODO(review): one SELECT per (linecode, NYID, point_id) is an
            # N+1 pattern — consider prefetching existing rows in bulk.
            existing = db.query(LoseData).filter(
                LoseData.linecode == linecode_,
                LoseData.NYID == nyid,
                LoseData.point_id == pt_id,
            ).first()
            if existing:
                existing.lose_data = lose_val
                existing.account_id = acc_id
                existing.section_id = sec_id
            else:
                db.add(LoseData(
                    account_id=acc_id,
                    NYID=nyid,
                    linecode=linecode_,
                    lose_data=lose_val,
                    section_id=sec_id,
                    point_id=pt_id,
                ))
def sync_lose_data(self, db: Session, linecode: Optional[str] = None) -> Dict[str, Any]:
    """Synchronize missing-data flags into the lose_data table.

    Without linecode: process every distinct level line, one batch and one
    commit per line, and return no detail rows.
    With linecode: process only that line and return its lose_data rows.

    Missing rule: no original data = 1, no settlement data = 2; the stored
    lose_data value is their sum (0/1/2/3).
    Rows are upserted — note the upsert key is (linecode, NYID, point_id),
    not (linecode, NYID) as the original docstring claimed.

    Returns:
        {"success": bool, "data": ...}: data is a record list for a single
        line, None for a full sync; on failure a "message" key is added.
    """
    try:
        if linecode:
            level_list = self.get_by_linecode(db, linecode=linecode)
            if not level_list:
                return {"success": True, "data": []}
            self._sync_lose_data_for_one_linecode(db, linecode, level_list)
            db.commit()
            records = db.query(LoseData).filter(LoseData.linecode == linecode).order_by(LoseData.NYID.desc()).all()
            return {"success": True, "data": [r.to_dict() for r in records]}
        # Full sync: fetch all distinct linecodes, then process each line in
        # its own batch so one commit failure doesn't lose earlier lines.
        linecode_rows = db.query(LevelData.linecode).distinct().all()
        linecodes = [r[0] for r in linecode_rows if r[0]]
        if not linecodes:
            return {"success": True, "data": None}
        for lc in linecodes:
            level_list = self.get_by_linecode(db, linecode=lc)
            self._sync_lose_data_for_one_linecode(db, lc, level_list)
            db.commit()
            logger.info(f"sync_lose_data 已处理线路: {lc}")
        return {"success": True, "data": None}
    except Exception as e:
        db.rollback()
        logger.error(f"sync_lose_data 失败: {str(e)}", exc_info=True)
        return {"success": False, "data": [] if linecode else None, "message": str(e)}
def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData], def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData],
settlement_list: List[SettlementData], settlement_list: List[SettlementData],
checkpoint_list: List[Checkpoint], checkpoint_list: List[Checkpoint],

View File

@@ -30,6 +30,21 @@ class SettlementDataService(BaseService[SettlementData]):
def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]: def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
"""根据期数ID获取沉降数据""" """根据期数ID获取沉降数据"""
return self.get_by_field(db, "NYID", nyid) return self.get_by_field(db, "NYID", nyid)
def get_one_dict_by_nyid(self, db: Session, nyid: str) -> Optional[Dict[str, Any]]:
    """Fetch one settlement row for the given period as a plain dict.

    datetime values are rendered as "%Y-%m-%d %H:%M:%S" strings so the dict
    is safe for inference / due-date computation; returns None when the
    period has no rows.
    """
    row = db.query(SettlementData).filter(SettlementData.NYID == nyid).first()
    if row is None:
        return None
    result: Dict[str, Any] = {}
    for column in SettlementData.__table__.columns:
        value = getattr(row, column.name)
        result[column.name] = (
            value.strftime("%Y-%m-%d %H:%M:%S") if isinstance(value, datetime) else value
        )
    return result
def get_by_nyid_and_point_id(self, db: Session, nyid: str, point_id: str) -> List[SettlementData]: def get_by_nyid_and_point_id(self, db: Session, nyid: str, point_id: str) -> List[SettlementData]:
"""根据期数ID和观测点ID获取沉降数据""" """根据期数ID和观测点ID获取沉降数据"""
return self.get_by_field(db, "NYID", nyid, "point_id", point_id) return self.get_by_field(db, "NYID", nyid, "point_id", point_id)

View File

@@ -5,91 +5,16 @@ import copy
# 注意:根据实际项目路径调整导入,若本地测试可注释掉 # 注意:根据实际项目路径调整导入,若本地测试可注释掉
from ..core.logging_config import get_logger from ..core.logging_config import get_logger
import json import json
from .operating_mode_config import BASE_PERIODS
logger = get_logger(__name__) logger = get_logger(__name__)
class ConstructionMonitorUtils: class ConstructionMonitorUtils:
def __init__(self): def __init__(self):
# 原始工况周期映射表(保持不变) # 使用公共配置的工况周期映射表
self.base_periods = { self.base_periods = BASE_PERIODS.copy()
"路基或预压土填筑,连续填筑":1,
"路基或预压土填筑,两次填筑间隔时间较长":7,
"预压土或路基填筑完成第1~3个月":7,
"预压土或路基填筑完成第4~6个月":14,
"仰拱底板施工完成后第1个月": 7,
"预压土或路基填筑完成6个月以后":30,
"仰拱底板施工完成后第2至3个月": 14,
"仰拱底板施工完成后3个月以后": 30,
"仰拱(底板)施工完成后第1个月": 7, # 原:仰拱(底板)施工完成后,第1个月
"仰拱(底板)施工完成后第2至3个月": 14, # 原:仰拱(底板)施工完成后,第2至3个月
"仰拱(底板)施工完成后3个月以后": 30, # 原:仰拱(底板)施工完成后,3个月以后
"无砟轨道铺设后第1至3个月": 30, # 原:无砟轨道铺设后,第1至3个月
"无砟轨道铺设后4至12个月": 90, # 原:无砟轨道铺设后,4至12个月
"无砟轨道铺设后12个月以后": 180, # 原:无砟轨道铺设后,12个月以后
"墩台施工到一定高度": 30, # 无格式差异,保留原样
"墩台混凝土施工": 30, # 无格式差异,保留原样
"预制梁桥,架梁前": 30, # 原:预制梁桥,架梁前
"预制梁桥,预制梁架设前": 1, # 原:预制梁桥,预制梁架设前
"预制梁桥,预制梁架设后": 7, # 原:预制梁桥,预制梁架设后
"桥位施工桥梁,制梁前": 30, # 原:桥位施工桥梁,制梁前
"桥位施工桥梁,上部结构施工中": 1, # 原:桥位施工桥梁,上部结构施工中
# "架桥机(运梁车)通过": 7, # 无格式差异,保留原样
"桥梁主体工程完工后,第1至3个月": 7, # 原:桥梁主体工程完工后,第1至3个月
"桥梁主体工程完工后第4至6个月": 14, # 原:桥梁主体工程完工后,第4至6个月
"桥梁主体工程完工后,6个月以后": 30, # 原:桥梁主体工程完工后,6个月以后 ''
"轨道铺设期间,前": 30,
"轨道铺设期间,后": 14,
"轨道铺设完成后第1个月": 14,
"轨道铺设完成后2至3个月": 30,
"轨道铺设完成后4至12个月": 90,
"轨道铺设完成后12个月以后": 180,
"铺路或堆载,一般情况": 1,
"填筑或堆载,一般情况": 1,
"铺路或堆载,沉降量突变情况": 1,
"填筑或堆载,两次填筑间隔时间较长情况":3,
"铺路或堆载,两次铺路间隔时间较长情况": 3,
"堆载预压或路基填筑完成第1至3个月":7, # 原:堆载预压或路基铺路完成,第1至3个月
"堆载预压或路基填筑完成第4至6个月": 14, # 原:堆载预压或路基铺路完成,第4至6个月
"堆载预压或路基填筑完成6个月以后": 30, # 原:堆载预压或路基铺路完成,6个月以后
"架桥机(运梁车) 首次通过前": 1, # 原:架桥机(运梁车)首次通过前(仅加空格)
"架桥机(运梁车) 首次通过后前3天": 1, # 原:架桥机(运梁车)首次通过后,前3天
"架桥机(运梁车) 首次通过后": 7, # 原:架桥机(运梁车)首次通过后(仅加空格)
"轨道板(道床)铺设后第1个月": 14, # 原:轨道板(道床)铺设后,第1个月
"轨道板(道床)铺设后第2至3个月": 30, # 原:轨道板(道床)铺设后,第2至3个月
"轨道板(道床)铺设后3个月以后": 90,
"架桥机(运梁车)首次通过前": 1,
"架桥机(运梁车)首次通过后前3天": 1,
"架桥机(运梁车)首次通过后": 7,
"轨道板铺设前": 14,
"轨道板(道床)铺设后第1至3个月": 14,
"轨道板(道床)铺设后第4至6个月": 30,
"轨道板(道床)铺设后6个月以后": 90,
"站场填方路基段填筑完成至静态验收": 14,
"桥墩(台)地面处拆模后": 30,
"敦身混凝土施工": 30,
# "预制梁桥,架梁前": 30,
# "预制梁桥,预制梁架设前": 1,
"预制梁桥预制梁架设后": 7,
"现浇梁,浇筑前": 30,
"现浇梁上部结构施工中": 1,
"架桥机(运梁车)通过": 2,
"桥梁主体工程完工后第1至3个月": 7,
# "桥梁主体工程完工后第4至6个月": 14,
"侨梁主体工程完工后6个月以后": 30,
"轨道铺设,前": 30,
"轨道铺设,后": 14,
# "轨道铺设完成后第1个月": 14,
# "轨道铺设完成后2至3个月": 30,
# "轨道铺设完成后4至12个月": 90,
# "轨道铺设完成后12个月以后": 180,
# "仰拱(底板)施工完成后第1个月": 7,
# "仰拱(底板)施工完成后第2至3个月": 14,
# "仰拱(底板)施工完成后3个月以后": 30,
# "轨道板铺设前": 14,
# "无砟轨道铺设后第1至3个月": 30,
# "无砟轨道铺设后4至12个月": 90,
# "无砟轨道铺设后12个月以后": 180,
"特殊地段隧道施工完成后至静态验收": 14
}
# 构建中英文括号+逗号兼容映射表 # 构建中英文括号+逗号兼容映射表
self.compatible_periods = self._build_compatible_brackets_map() self.compatible_periods = self._build_compatible_brackets_map()
@@ -150,17 +75,13 @@ class ConstructionMonitorUtils:
if not point_data: if not point_data:
continue continue
# 过滤逻辑:仅保留 useflag 存在且值≠0 的记录 # 推理用最新一期:取按 NYID 排序后的第一条(上游已保证倒序),不因 useflag 排除最新期
latest_item = point_data[0]
# 用于冬休回溯等:仅 useflag 有效的历史记录
filtered_point_data = [ filtered_point_data = [
item for item in point_data item for item in point_data
if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0 if "useflag" in item and item["useflag"] != 0
] ]
# 过滤后无数据则跳过当前测点
if not filtered_point_data:
continue
# 使用过滤后的数据处理
latest_item = filtered_point_data[0]
latest_condition = latest_item.get("workinfoname") latest_condition = latest_item.get("workinfoname")
if not latest_condition: if not latest_condition:
result["error_data"].append(latest_item) result["error_data"].append(latest_item)
@@ -218,7 +139,12 @@ class ConstructionMonitorUtils:
continue continue
if not base_condition: if not base_condition:
result["winter"].append(item_copy) # 当前为冬休且历史全是冬休 → 视为数据未补全remaining 固定为 -365
if latest_condition == "冬休":
item_copy["remaining"] = -365
result["data"].append(item_copy)
else:
result["winter"].append(item_copy)
continue continue
# 核心修改:冬休回溯场景下调整测量间隔(基准周期) # 核心修改:冬休回溯场景下调整测量间隔(基准周期)

View File

@@ -1,5 +1,14 @@
from datetime import datetime, date from datetime import datetime, date
from .operating_mode_config import (
BASE_PERIODS,
OLD_TO_NEW_MAP,
CONDITION_GROUP,
TRANSITION_RULES,
WINTER_BREAK_LABELS,
)
class OperatingModePredictor: class OperatingModePredictor:
""" """
工况预测类(处理二维倒序数据,返回一维列表,仅保留各内嵌列表最新记录) 工况预测类(处理二维倒序数据,返回一维列表,仅保留各内嵌列表最新记录)
@@ -16,15 +25,15 @@ class OperatingModePredictor:
def __init__(self): def __init__(self):
"""初始化类,加载核心配置""" """初始化类,加载核心配置"""
# 1. 基础工况配置(最终返回的规范名称,含所有新旧工况) # 1. 基础工况配置(最终返回的规范名称,含所有新旧工况)
self.base_periods = self._load_base_periods() self.base_periods = BASE_PERIODS.copy()
# 2. 旧→新等效映射(优先返回新工况) # 2. 旧→新等效映射(优先返回新工况)
self.old_to_new_map = self._load_old_to_new_map() self.old_to_new_map = OLD_TO_NEW_MAP.copy()
# 3. 工况分组(同义工况归为同一分组,复用切换规则) # 3. 工况分组(同义工况归为同一分组,复用切换规则)
self.condition_group = self._load_condition_group() self.condition_group = CONDITION_GROUP.copy()
# 4. 切换触发规则(沿用原逻辑,触发天数+目标工况) # 4. 切换触发规则(沿用原逻辑,触发天数+目标工况)
self.transition_rules = self._load_transition_rules() self.transition_rules = TRANSITION_RULES.copy()
# 5. 冬休标识 # 5. 冬休标识
self.winter_break_labels = {"冬休"} self.winter_break_labels = WINTER_BREAK_LABELS.copy()
# 辅助映射标准化名称→base_periods中的规范名称用于最终返回 # 辅助映射标准化名称→base_periods中的规范名称用于最终返回
self.std_to_canonical = { self.std_to_canonical = {
@@ -55,196 +64,6 @@ class OperatingModePredictor:
std_name = std_name.replace(old, new) std_name = std_name.replace(old, new)
return std_name return std_name
def _load_base_periods(self):
"""加载基础工况配置(最终返回的规范名称,无多余数字)"""
return {
# 路基工况(新工况优先)
"路基或预压土填筑,连续填筑": 1,
"路基或预压土填筑,两次填筑间隔时间较长": 7,
"预压土或路基填筑完成第1~3个月": 7,
"预压土或路基填筑完成第4~6个月": 14,
"预压土或路基填筑完成6个月以后": 30,
"架桥机(运梁车)首次通过前": 1,
"架桥机(运梁车)首次通过后前3天": 1,
"架桥机(运梁车)首次通过后": 7,
"轨道板(道床)铺设后第1至3个月": 14,
"轨道板(道床)铺设后第4至6个月": 30,
"轨道板(道床)铺设后6个月以后": 90,
# 路基旧工况(保留,无等效则返回)
"填筑或堆载,一般情况": 1,
"填筑或堆载,两次填筑间隔时间较长情况": 7,
"堆载预压或路基填筑完成6个月以后": 30,
"轨道板(道床)铺设后第1个月": 14,
"轨道板(道床)铺设后第2至3个月": 30,
"轨道板(道床)铺设后3个月以后": 90,
# 桥梁工况(新工况优先)
"桥墩(台)地面处拆模后": 30,
"敦身混凝土施工": 30,
"预制梁桥,预制梁架设前": 1,
"预制梁桥,预制梁架设后": 7,
"现浇梁,浇筑前": 30,
"现浇梁上部结构施工中": 1,
"架桥机(运梁车)通过": 2,
"桥梁主体工程完工后第1至3个月": 7,
"桥梁主体工程完工后第4至6个月": 14,
"桥梁主体工程完工后6个月以后": 30,
"轨道铺设,前": 30,
"轨道铺设,后": 14,
"轨道铺设完成后第1个月": 14,
"轨道铺设完成后2至3个月": 30,
"轨道铺设完成后4至12个月": 90,
"轨道铺设完成后12个月以后": 180,
# 桥梁旧工况(保留,无等效则返回)
"墩台施工到一定高度": 30,
"墩台混凝土施工": 30,
"预制梁桥,架梁前": 30,
"桥位施工桥梁,制梁前": 30,
"桥位施工桥梁,上部结构施工中": 1,
"桥梁主体工程完工后,第1至3个月": 7,
"轨道铺设期间,前": 30,
"轨道铺设期间,后": 14,
# 隧道工况
"仰拱(底板)施工完成后第1个月": 7,
"仰拱(底板)施工完成后第2至3个月": 14,
"仰拱(底板)施工完成后3个月以后": 30,
"无砟轨道铺设后第1至3个月": 30,
"无砟轨道铺设后4至12个月": 90,
"无砟轨道铺设后12个月以后": 180,
# 特殊工况
"冬休": 0
}
def _load_old_to_new_map(self):
"""加载旧→新等效映射(优先返回新工况)"""
return {
# 路基等效
"填筑或堆载,一般情况": "路基或预压土填筑,连续填筑",
"填筑或堆载,两次填筑间隔时间较长情况": "路基或预压土填筑,两次填筑间隔时间较长",
"预压土或路基填筑完成。第1~3个月": "预压土或路基填筑完成第1~3个月",
"堆载预压或路基填筑完成6个月以后": "预压土或路基填筑完成6个月以后",
"轨道板(道床)铺设后第1个月": "轨道板(道床)铺设后第1至3个月",
"轨道板(道床)铺设后第2至3个月": "轨道板(道床)铺设后第4至6个月",
"轨道板(道床)铺设后3个月以后": "轨道板(道床)铺设后6个月以后",
# 桥梁等效
"墩台施工到一定高度": "桥墩(台)地面处拆模后",
"墩台混凝土施工": "敦身混凝土施工",
"桥位施工桥梁,制梁前": "现浇梁,浇筑前",
"桥位施工桥梁,上部结构施工中": "现浇梁上部结构施工中",
"桥梁主体工程完工后,第1至3个月": "桥梁主体工程完工后第1至3个月",
"轨道铺设期间,前": "轨道铺设,前",
"轨道铺设期间,后": "轨道铺设,后"
}
def _load_condition_group(self):
"""加载工况分组(同义工况归为同一分组)"""
group_map = {
# 路基分组
"路基或预压土填筑,连续填筑": "DZ_CONTINUE",
"路基或预压土填筑,两次填筑间隔时间较长": "DZ_INTERVAL",
"预压土或路基填筑完成第1~3个月": "DZ_FINISH_1_3",
"预压土或路基填筑完成第4~6个月": "DZ_FINISH_4_6",
"预压土或路基填筑完成6个月以后": "DZ_FINISH_AFTER_6",
"架桥机(运梁车)首次通过前": "JQJ_FIRST_BEFORE",
"架桥机(运梁车)首次通过后前3天": "JQJ_FIRST_AFTER_3D",
"架桥机(运梁车)首次通过后": "JQJ_FIRST_AFTER",
"轨道板(道床)铺设后第1至3个月": "GDB_FINISH_1_3",
"轨道板(道床)铺设后第4至6个月": "GDB_FINISH_4_6",
"轨道板(道床)铺设后6个月以后": "GDB_FINISH_AFTER_6",
# 路基旧工况分组(复用新工况分组)
"填筑或堆载,一般情况": "DZ_CONTINUE",
"填筑或堆载,两次填筑间隔时间较长情况": "DZ_INTERVAL",
"堆载预压或路基填筑完成6个月以后": "DZ_FINISH_AFTER_6",
"轨道板(道床)铺设后第1个月": "GDB_FINISH_1_3",
"轨道板(道床)铺设后第2至3个月": "GDB_FINISH_4_6",
"轨道板(道床)铺设后3个月以后": "GDB_FINISH_AFTER_6",
# 桥梁分组
"桥墩(台)地面处拆模后": "STATIC",
"敦身混凝土施工": "STATIC",
"预制梁桥,架梁前": "STATIC",
"预制梁桥,预制梁架设前": "YZLQ_BEFORE_JS",
"预制梁桥,预制梁架设后": "YZLQ_AFTER_JS",
"现浇梁,浇筑前": "STATIC",
"现浇梁上部结构施工中": "STATIC",
"架桥机(运梁车)通过": "STATIC",
"桥梁主体工程完工后第1至3个月": "QL_ZHUTI_1_3",
"桥梁主体工程完工后第4至6个月": "QL_ZHUTI_4_6",
"桥梁主体工程完工后6个月以后": "QL_ZHUTI_AFTER_6",
"轨道铺设,前": "STATIC",
"轨道铺设,后": "STATIC",
"轨道铺设完成后第1个月": "GD_FINISH_1",
"轨道铺设完成后2至3个月": "GD_FINISH_2_3",
"轨道铺设完成后4至12个月": "GD_FINISH_4_12",
"轨道铺设完成后12个月以后": "GD_FINISH_AFTER_12",
# 桥梁旧工况分组(复用新工况分组)
"墩台施工到一定高度": "STATIC",
"墩台混凝土施工": "STATIC",
"桥位施工桥梁,制梁前": "STATIC",
"桥位施工桥梁,上部结构施工中": "STATIC",
"桥梁主体工程完工后,第1至3个月": "QL_ZHUTI_1_3",
"轨道铺设期间,前": "STATIC",
"轨道铺设期间,后": "STATIC",
# 隧道分组
"仰拱(底板)施工完成后第1个月": "YG_DIBAN_1",
"仰拱(底板)施工完成后第2至3个月": "YG_DIBAN_2_3",
"仰拱(底板)施工完成后3个月以后": "YG_DIBAN_AFTER_3",
"无砟轨道铺设后第1至3个月": "WZGD_1_3",
"无砟轨道铺设后4至12个月": "WZGD_4_12",
"无砟轨道铺设后12个月以后": "WZGD_AFTER_12",
# 特殊工况
"冬休": "STATIC"
}
return group_map
def _load_transition_rules(self):
"""加载切换触发规则沿用原逻辑以base_periods为准"""
return {
# 路基切换规则
"DZ_FINISH_1_3": {"trigger_days": 90, "next": ["预压土或路基填筑完成第4~6个月"]},
"DZ_FINISH_4_6": {"trigger_days": 90, "next": ["预压土或路基填筑完成6个月以后"]},
"DZ_FINISH_AFTER_6": {"trigger_days": None, "next": None},
"JQJ_FIRST_BEFORE": {"trigger_days": 1, "next": ["架桥机(运梁车)首次通过后前3天"]},
"JQJ_FIRST_AFTER_3D": {"trigger_days": 3, "next": ["架桥机(运梁车)首次通过后"]},
"JQJ_FIRST_AFTER": {"trigger_days": None, "next": None},
"GDB_FINISH_1_3": {"trigger_days": 30, "next": ["轨道板(道床)铺设后第4至6个月"]},
"GDB_FINISH_4_6": {"trigger_days": 30, "next": ["轨道板(道床)铺设后6个月以后"]},
"GDB_FINISH_AFTER_6": {"trigger_days": None, "next": None},
# 桥梁切换规则
"YZLQ_BEFORE_JS": {"trigger_days": 1, "next": ["架桥机(运梁车)通过"]},
"YZLQ_AFTER_JS": {"trigger_days": 7, "next": ["桥梁主体工程完工后第1至3个月"]},
"QL_ZHUTI_1_3": {"trigger_days": 90, "next": ["桥梁主体工程完工后第4至6个月"]},
"QL_ZHUTI_4_6": {"trigger_days": 90, "next": ["桥梁主体工程完工后6个月以后"]},
"QL_ZHUTI_AFTER_6": {"trigger_days": None, "next": None},
"GD_FINISH_1": {"trigger_days": 30, "next": ["轨道铺设完成后2至3个月"]},
"GD_FINISH_2_3": {"trigger_days": 60, "next": ["轨道铺设完成后4至12个月"]},
"GD_FINISH_4_12": {"trigger_days": 240, "next": ["轨道铺设完成后12个月以后"]},
"GD_FINISH_AFTER_12": {"trigger_days": None, "next": None},
# 隧道切换规则
"YG_DIBAN_1": {"trigger_days": 30, "next": ["仰拱(底板)施工完成后第2至3个月"]},
"YG_DIBAN_2_3": {"trigger_days": 60, "next": ["仰拱(底板)施工完成后3个月以后"]},
"YG_DIBAN_AFTER_3": {"trigger_days": None, "next": None},
"WZGD_1_3": {"trigger_days": 90, "next": ["无砟轨道铺设后4至12个月"]},
"WZGD_4_12": {"trigger_days": 240, "next": ["无砟轨道铺设后12个月以后"]},
"WZGD_AFTER_12": {"trigger_days": None, "next": None},
# 静态分组(无切换)
"DZ_CONTINUE": {"trigger_days": None, "next": None},
"DZ_INTERVAL": {"trigger_days": None, "next": None},
"STATIC": {"trigger_days": None, "next": None}
}
def _parse_to_date(self, time_str): def _parse_to_date(self, time_str):
"""解析时间字符串为date对象仅保留年月日""" """解析时间字符串为date对象仅保留年月日"""
if not time_str: if not time_str:

View File

@@ -0,0 +1,246 @@
# -*- coding: utf-8 -*-
"""
Shared operating-condition configuration module.

Used by both get_operating_mode.py (condition prediction) and
construction_monitor.py (construction monitoring).
"""
# ==================== Base condition periods (days) ====================
# Measurement interval per condition. Includes new and legacy condition
# names plus compatibility spelling variants. Key: condition name,
# value: measurement interval in days (0 for the winter-break condition).
BASE_PERIODS = {
    # Subgrade conditions (new names)
    "路基或预压土填筑,连续填筑": 1,
    "路基或预压土填筑,两次填筑间隔时间较长": 7,
    "预压土或路基填筑完成第1~3个月": 7,
    "预压土或路基填筑完成第4~6个月": 14,
    "预压土或路基填筑完成6个月以后": 30,
    "架桥机(运梁车)首次通过前": 1,
    "架桥机(运梁车)首次通过后前3天": 1,
    "架桥机(运梁车)首次通过后": 7,
    "轨道板(道床)铺设后第1至3个月": 14,
    "轨道板(道床)铺设后第4至6个月": 30,
    "轨道板(道床)铺设后6个月以后": 90,
    # Legacy subgrade condition names
    "填筑或堆载,一般情况": 1,
    "填筑或堆载,两次填筑间隔时间较长情况": 7,
    "堆载预压或路基填筑完成第1至3个月": 7,
    "堆载预压或路基填筑完成第4至6个月": 14,
    "堆载预压或路基填筑完成6个月以后": 30,
    "轨道板(道床)铺设后第1个月": 14,
    "轨道板(道床)铺设后第2至3个月": 30,
    "轨道板(道床)铺设后3个月以后": 90,
    # Subgrade compatibility variants ("铺路"/"填筑" are synonymous here)
    "铺路或堆载,一般情况": 1,
    "铺路或堆载,沉降量突变情况": 1,
    "铺路或堆载,两次铺路间隔时间较长情况": 3,
    # Bridge conditions (new names)
    "桥墩(台)地面处拆模后": 30,
    "墩身混凝土施工": 30,
    "预制梁桥,预制梁架设前": 1,
    "预制梁桥,预制梁架设后": 7,
    "现浇梁,浇筑前": 30,
    "现浇梁上部结构施工中": 1,
    "架桥机(运梁车)通过": 2,
    "桥梁主体工程完工后第1至3个月": 7,
    "桥梁主体工程完工后第4至6个月": 14,
    "桥梁主体工程完工后6个月以后": 30,
    "轨道铺设,前": 30,
    "轨道铺设,后": 14,
    "轨道铺设完成后第1个月": 14,
    "轨道铺设完成后2至3个月": 30,
    "轨道铺设完成后4至12个月": 90,
    "轨道铺设完成后12个月以后": 180,
    # Legacy bridge condition names
    "墩台施工到一定高度": 30,
    "墩台混凝土施工": 30,
    "预制梁桥,架梁前": 30,
    "桥位施工桥梁,制梁前": 30,
    "桥位施工桥梁,上部结构施工中": 1,
    "桥梁主体工程完工后,第1至3个月": 7,
    "桥梁主体工程完工后,第4至6个月": 14,
    "桥梁主体工程完工后,6个月以后": 30,
    "轨道铺设期间,前": 30,
    "轨道铺设期间,后": 14,
    # Bridge compatibility variants (extra space / missing comma)
    "架桥机(运梁车) 首次通过前": 1,
    "架桥机(运梁车) 首次通过后前3天": 1,
    "架桥机(运梁车) 首次通过后": 7,
    "预制梁桥预制梁架设后": 7,
    # Tunnel conditions
    "仰拱(底板)施工完成后第1个月": 7,
    "仰拱(底板)施工完成后第2至3个月": 14,
    "仰拱(底板)施工完成后3个月以后": 30,
    "无砟轨道铺设后第1至3个月": 30,
    "无砟轨道铺设后4至12个月": 90,
    "无砟轨道铺设后12个月以后": 180,
    # Tunnel compatibility variants (full-width Chinese brackets)
    "仰拱底板施工完成后第1个月": 7,
    "仰拱底板施工完成后第2至3个月": 14,
    "仰拱底板施工完成后3个月以后": 30,
    # Other special conditions
    "轨道板铺设前": 14,
    "站场填方路基段填筑完成至静态验收": 14,
    "特殊地段隧道施工完成后至静态验收": 14,
    "冬休": 0,
}
# ==================== Legacy-to-new equivalence map (condition prediction) ====================
# Maps a legacy (or variant-spelled) condition name to its equivalent new
# condition name; keys and values are BASE_PERIODS keys.
OLD_TO_NEW_MAP = {
    # Subgrade equivalents
    "填筑或堆载,一般情况": "路基或预压土填筑,连续填筑",
    "填筑或堆载,两次填筑间隔时间较长情况": "路基或预压土填筑,两次填筑间隔时间较长",
    "堆载预压或路基填筑完成第1至3个月": "预压土或路基填筑完成第1~3个月",
    "堆载预压或路基填筑完成第4至6个月": "预压土或路基填筑完成第4~6个月",
    "堆载预压或路基填筑完成6个月以后": "预压土或路基填筑完成6个月以后",
    "轨道板(道床)铺设后第1个月": "轨道板(道床)铺设后第1至3个月",
    "轨道板(道床)铺设后第2至3个月": "轨道板(道床)铺设后第4至6个月",
    "轨道板(道床)铺设后3个月以后": "轨道板(道床)铺设后6个月以后",
    "铺路或堆载,一般情况": "路基或预压土填筑,连续填筑",
    "铺路或堆载,沉降量突变情况": "路基或预压土填筑,连续填筑",
    "铺路或堆载,两次铺路间隔时间较长情况": "路基或预压土填筑,两次填筑间隔时间较长",
    # Bridge equivalents
    "墩台施工到一定高度": "桥墩(台)地面处拆模后",
    "墩台混凝土施工": "墩身混凝土施工",
    "桥位施工桥梁,制梁前": "现浇梁,浇筑前",
    "桥位施工桥梁,上部结构施工中": "现浇梁上部结构施工中",
    "桥梁主体工程完工后,第1至3个月": "桥梁主体工程完工后第1至3个月",
    "桥梁主体工程完工后,第4至6个月": "桥梁主体工程完工后第4至6个月",
    "桥梁主体工程完工后,6个月以后": "桥梁主体工程完工后6个月以后",
    "轨道铺设期间,前": "轨道铺设,前",
    "轨道铺设期间,后": "轨道铺设,后",
    "预制梁桥预制梁架设后": "预制梁桥,预制梁架设后",
    "架桥机(运梁车) 首次通过前": "架桥机(运梁车)首次通过前",
    "架桥机(运梁车) 首次通过后前3天": "架桥机(运梁车)首次通过后前3天",
    "架桥机(运梁车) 首次通过后": "架桥机(运梁车)首次通过后",
}
# ==================== Condition grouping (condition prediction) ====================
# Maps each condition name (BASE_PERIODS key) to a group code used as the
# lookup key into TRANSITION_RULES. "STATIC" groups have no auto-switching.
CONDITION_GROUP = {
    # Subgrade groups
    "路基或预压土填筑,连续填筑": "DZ_CONTINUE",
    "路基或预压土填筑,两次填筑间隔时间较长": "DZ_INTERVAL",
    "预压土或路基填筑完成第1~3个月": "DZ_FINISH_1_3",
    "预压土或路基填筑完成第4~6个月": "DZ_FINISH_4_6",
    "预压土或路基填筑完成6个月以后": "DZ_FINISH_AFTER_6",
    "架桥机(运梁车)首次通过前": "JQJ_FIRST_BEFORE",
    "架桥机(运梁车)首次通过后前3天": "JQJ_FIRST_AFTER_3D",
    "架桥机(运梁车)首次通过后": "JQJ_FIRST_AFTER",
    "轨道板(道床)铺设后第1至3个月": "GDB_FINISH_1_3",
    "轨道板(道床)铺设后第4至6个月": "GDB_FINISH_4_6",
    "轨道板(道床)铺设后6个月以后": "GDB_FINISH_AFTER_6",
    # Legacy subgrade condition groups
    "填筑或堆载,一般情况": "DZ_CONTINUE",
    "填筑或堆载,两次填筑间隔时间较长情况": "DZ_INTERVAL",
    "堆载预压或路基填筑完成第1至3个月": "DZ_FINISH_1_3",
    "堆载预压或路基填筑完成第4至6个月": "DZ_FINISH_4_6",
    "堆载预压或路基填筑完成6个月以后": "DZ_FINISH_AFTER_6",
    "轨道板(道床)铺设后第1个月": "GDB_FINISH_1_3",
    "轨道板(道床)铺设后第2至3个月": "GDB_FINISH_4_6",
    "轨道板(道床)铺设后3个月以后": "GDB_FINISH_AFTER_6",
    "铺路或堆载,一般情况": "DZ_CONTINUE",
    "铺路或堆载,沉降量突变情况": "DZ_CONTINUE",
    "铺路或堆载,两次铺路间隔时间较长情况": "DZ_INTERVAL",
    # Bridge groups
    "桥墩(台)地面处拆模后": "STATIC",
    "墩身混凝土施工": "STATIC",
    "预制梁桥,架梁前": "STATIC",
    "预制梁桥,预制梁架设前": "YZLQ_BEFORE_JS",
    "预制梁桥,预制梁架设后": "YZLQ_AFTER_JS",
    "现浇梁,浇筑前": "STATIC",
    "现浇梁上部结构施工中": "STATIC",
    "架桥机(运梁车)通过": "STATIC",
    "桥梁主体工程完工后第1至3个月": "QL_ZHUTI_1_3",
    "桥梁主体工程完工后第4至6个月": "QL_ZHUTI_4_6",
    "桥梁主体工程完工后6个月以后": "QL_ZHUTI_AFTER_6",
    "轨道铺设,前": "STATIC",
    "轨道铺设,后": "STATIC",
    "轨道铺设完成后第1个月": "GD_FINISH_1",
    "轨道铺设完成后2至3个月": "GD_FINISH_2_3",
    "轨道铺设完成后4至12个月": "GD_FINISH_4_12",
    "轨道铺设完成后12个月以后": "GD_FINISH_AFTER_12",
    # Legacy bridge condition groups
    "墩台施工到一定高度": "STATIC",
    "墩台混凝土施工": "STATIC",
    "桥位施工桥梁,制梁前": "STATIC",
    "桥位施工桥梁,上部结构施工中": "STATIC",
    "桥梁主体工程完工后,第1至3个月": "QL_ZHUTI_1_3",
    "桥梁主体工程完工后,第4至6个月": "QL_ZHUTI_4_6",
    "桥梁主体工程完工后,6个月以后": "QL_ZHUTI_AFTER_6",
    "轨道铺设期间,前": "STATIC",
    "轨道铺设期间,后": "STATIC",
    "预制梁桥预制梁架设后": "YZLQ_AFTER_JS",
    "架桥机(运梁车) 首次通过前": "JQJ_FIRST_BEFORE",
    "架桥机(运梁车) 首次通过后前3天": "JQJ_FIRST_AFTER_3D",
    "架桥机(运梁车) 首次通过后": "JQJ_FIRST_AFTER",
    # Tunnel groups
    "仰拱(底板)施工完成后第1个月": "YG_DIBAN_1",
    "仰拱(底板)施工完成后第2至3个月": "YG_DIBAN_2_3",
    "仰拱(底板)施工完成后3个月以后": "YG_DIBAN_AFTER_3",
    "无砟轨道铺设后第1至3个月": "WZGD_1_3",
    "无砟轨道铺设后4至12个月": "WZGD_4_12",
    "无砟轨道铺设后12个月以后": "WZGD_AFTER_12",
    "仰拱底板施工完成后第1个月": "YG_DIBAN_1",
    "仰拱底板施工完成后第2至3个月": "YG_DIBAN_2_3",
    "仰拱底板施工完成后3个月以后": "YG_DIBAN_AFTER_3",
    # Other special condition groups
    "轨道板铺设前": "STATIC",
    "站场填方路基段填筑完成至静态验收": "STATIC",
    "特殊地段隧道施工完成后至静态验收": "STATIC",
    # Special condition
    "冬休": "STATIC",
}
# ==================== Switch trigger rules (condition prediction) ====================
# Keyed by group code (CONDITION_GROUP values). trigger_days: days after
# which the switch fires (None = no automatic switch); next: list of
# follow-up condition names from BASE_PERIODS (None = terminal).
TRANSITION_RULES = {
    # Subgrade switch rules
    "DZ_FINISH_1_3": {"trigger_days": 90, "next": ["预压土或路基填筑完成第4~6个月"]},
    "DZ_FINISH_4_6": {"trigger_days": 90, "next": ["预压土或路基填筑完成6个月以后"]},
    "DZ_FINISH_AFTER_6": {"trigger_days": None, "next": None},
    "JQJ_FIRST_BEFORE": {"trigger_days": 1, "next": ["架桥机(运梁车)首次通过后前3天"]},
    "JQJ_FIRST_AFTER_3D": {"trigger_days": 3, "next": ["架桥机(运梁车)首次通过后"]},
    "JQJ_FIRST_AFTER": {"trigger_days": None, "next": None},
    "GDB_FINISH_1_3": {"trigger_days": 30, "next": ["轨道板(道床)铺设后第4至6个月"]},
    "GDB_FINISH_4_6": {"trigger_days": 30, "next": ["轨道板(道床)铺设后6个月以后"]},
    "GDB_FINISH_AFTER_6": {"trigger_days": None, "next": None},
    # Bridge switch rules
    "YZLQ_BEFORE_JS": {"trigger_days": 1, "next": ["架桥机(运梁车)通过"]},
    "YZLQ_AFTER_JS": {"trigger_days": 7, "next": ["桥梁主体工程完工后第1至3个月"]},
    "QL_ZHUTI_1_3": {"trigger_days": 90, "next": ["桥梁主体工程完工后第4至6个月"]},
    "QL_ZHUTI_4_6": {"trigger_days": 90, "next": ["桥梁主体工程完工后6个月以后"]},
    "QL_ZHUTI_AFTER_6": {"trigger_days": None, "next": None},
    "GD_FINISH_1": {"trigger_days": 30, "next": ["轨道铺设完成后2至3个月"]},
    "GD_FINISH_2_3": {"trigger_days": 60, "next": ["轨道铺设完成后4至12个月"]},
    "GD_FINISH_4_12": {"trigger_days": 240, "next": ["轨道铺设完成后12个月以后"]},
    "GD_FINISH_AFTER_12": {"trigger_days": None, "next": None},
    # Tunnel switch rules
    "YG_DIBAN_1": {"trigger_days": 30, "next": ["仰拱(底板)施工完成后第2至3个月"]},
    "YG_DIBAN_2_3": {"trigger_days": 60, "next": ["仰拱(底板)施工完成后3个月以后"]},
    "YG_DIBAN_AFTER_3": {"trigger_days": None, "next": None},
    "WZGD_1_3": {"trigger_days": 90, "next": ["无砟轨道铺设后4至12个月"]},
    "WZGD_4_12": {"trigger_days": 240, "next": ["无砟轨道铺设后12个月以后"]},
    "WZGD_AFTER_12": {"trigger_days": None, "next": None},
    # Static groups (no switching)
    "DZ_CONTINUE": {"trigger_days": None, "next": None},
    "DZ_INTERVAL": {"trigger_days": None, "next": None},
    "STATIC": {"trigger_days": None, "next": None},
}
# ==================== Winter-break labels ====================
# Condition names treated as the winter-break state.
WINTER_BREAK_LABELS = {"冬休"}

View File

@@ -16,7 +16,9 @@ from ..services.section_data import SectionDataService
from ..services.account import AccountService from ..services.account import AccountService
from ..models.daily import DailyData from ..models.daily import DailyData
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
from typing import List from ..models.level_data import LevelData
from ..services.settlement_data import SettlementDataService
from typing import List, Tuple, Any
from ..utils.construction_monitor import ConstructionMonitorUtils from ..utils.construction_monitor import ConstructionMonitorUtils
import time import time
import json import json
@@ -95,10 +97,8 @@ class TaskScheduler:
# name='每日重置账号更新状态' # name='每日重置账号更新状态'
# ) # )
# logger.info("系统定时任务:每日重置账号更新状态已添加") # logger.info("系统定时任务:每日重置账号更新状态已添加")
# existing_job = None
# existing_job = self.scheduler.get_job("scheduled_get_max_nyid_by_point_id") # existing_job = self.scheduler.get_job("scheduled_get_max_nyid_by_point_id")
# if not existing_job: # if existing_job is None:
# # 添加每天凌晨1点执行获取max NYID关联数据任务
# self.scheduler.add_job( # self.scheduler.add_job(
# scheduled_get_max_nyid_by_point_id, # scheduled_get_max_nyid_by_point_id,
# 'cron', # 'cron',
@@ -234,92 +234,73 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0):
# logger.info(f"DailyData表清空完成共删除{delete_count}条历史记录") # logger.info(f"DailyData表清空完成共删除{delete_count}条历史记录")
# 1. 获取沉降数据(返回 List[List[dict]] # 1. 以 level_data 为来源:每个 linecode 取最新一期NYID 最大),再按该 NYID 从 settlement 取一条
daily_service = DailyDataService()
result = daily_service.get_nyid_by_point_id(db, [], 1)
# 2. 计算到期数据
monitor = ConstructionMonitorUtils()
daily_data = monitor.get_due_data(result,start=start,end=end)
data = daily_data['data']
error_data = daily_data['error_data']
winters = daily_data['winter']
logger.info(f"首次获取数据完成,共{len(result)}条记录")
# 3. 循环处理冬休数据,追溯历史非冬休记录
max_num = 1
print(f"首次获取冬休数据完成,共{len(winters)}条记录")
while 1:
max_num += 1
print(max_num)
# 提取冬休数据的point_id列表
new_list = [int(w['point_id']) for w in winters]
# print(new_list)
if new_list == []:
break
# 获取更多历史记录
nyid_list = daily_service.get_nyid_by_point_id(db, new_list, max_num)
w_list = monitor.get_due_data(nyid_list,start=start,end=end)
# 更新冬休、待处理、错误数据
winters = w_list['winter']
data.extend(w_list['data'])
# 过期数据一并处理
# data.extend(w_list['error_data'])
error_data.extend(w_list['error_data'])
# print(f"第{max_num}次获取冬休数据完成,共{len(winters)}条记录")
if winters == []:
break
data.extend(error_data)
# 4. 初始化服务实例
level_service = LevelDataService() level_service = LevelDataService()
settlement_service = SettlementDataService()
daily_service = DailyDataService()
checkpoint_db = CheckpointService() checkpoint_db = CheckpointService()
section_db = SectionDataService() section_db = SectionDataService()
account_service = AccountService() account_service = AccountService()
# 5. 关联其他表数据(核心逻辑保留) monitor = ConstructionMonitorUtils()
linecodes = [r[0] for r in db.query(LevelData.linecode).distinct().all()]
linecode_level_settlement: List[Tuple[str, Any, dict]] = []
for linecode in linecodes:
level_instance = level_service.get_last_by_linecode(db, linecode)
if not level_instance:
continue
nyid = level_instance.NYID
settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid)
if not settlement_dict:
continue
settlement_dict['__linecode'] = linecode
settlement_dict['__level_data'] = level_instance.to_dict()
linecode_level_settlement.append((linecode, level_instance, settlement_dict))
input_data = [[s] for (_, _, s) in linecode_level_settlement]
if not input_data:
logger.warning("未找到任何 linecode 对应的最新期沉降数据,跳过写 daily")
db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
db.commit()
db.close()
return
# 2. 计算到期数据remaining / 冬休等)
daily_data = monitor.get_due_data(input_data, start=start, end=end)
data = daily_data['data']
logger.info(f"按 level_data 最新期获取数据完成,共{len(data)}条有效记录")
# 3. 关联 level / checkpoint / section / accountlevel 已带在 __level_data
for d in data: for d in data:
# 处理 LevelData(假设返回列表,取第一条) d['level_data'] = d.pop('__level_data', None)
level_results = level_service.get_by_nyid(db, d['NYID']) d.pop('__linecode', None)
level_instance = level_results[0] if isinstance(level_results, list) and level_results else level_results
d['level_data'] = level_instance.to_dict() if level_instance else None
# 处理 CheckpointData返回单实例直接使用
checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id']) checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id'])
d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None
# 处理 SectionData根据checkpoint_data关联
if d['checkpoint_data']: if d['checkpoint_data']:
section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id']) section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id'])
d['section_data'] = section_instance.to_dict() if section_instance else None d['section_data'] = section_instance.to_dict() if section_instance else None
else: else:
d['section_data'] = None d['section_data'] = None
# 处理 AccountData
if d.get('section_data') and d['section_data'].get('account_id'): if d.get('section_data') and d['section_data'].get('account_id'):
account_response = account_service.get_account(db, account_id=d['section_data']['account_id']) account_response = account_service.get_account(db, account_id=d['section_data']['account_id'])
d['account_data'] = account_response.__dict__ if account_response else None d['account_data'] = account_response.__dict__ if account_response else None
else: else:
d['account_data'] = None d['account_data'] = None
print(f"一共有{len(data)}条数据")
# 6. 构造DailyData数据并批量创建 # 4. 构造 DailyData(每条已是「每 linecode 最新一期」)
# daily_create_data1 = set()
daily_create_data = [] daily_create_data = []
nyids = []
for d in data: for d in data:
# 过滤无效数据(避免缺失关键字段报错) if not all(key in d for key in ['NYID', 'point_id', 'remaining']) or not d.get('level_data') or not d.get('account_data') or not d.get('section_data'):
if all(key in d for key in ['NYID', 'point_id','remaining']) and d.get('level_data') and d.get('account_data') and d.get('section_data'): continue
if d['NYID'] in nyids: tem = {
continue 'NYID': d['NYID'],
tem = { 'point_id': d['point_id'],
'NYID': d['NYID'], 'linecode': d['level_data']['linecode'],
'point_id': d['point_id'], 'account_id': d['account_data']['account_id'],
'linecode': d['level_data']['linecode'], 'section_id': d['section_data']['section_id'],
'account_id': d['account_data']['account_id'], 'remaining': (0 - int(d['overdue'])) if 'overdue' in d else d['remaining'],
'section_id': d['section_data']['section_id'], }
'remaining': (0-int(d['overdue'])) if 'overdue' in d else d['remaining'], daily_create_data.append(tem)
}
nyids.append(d['NYID'])
daily_create_data.append(tem)
# 批量创建记录 # 批量创建记录
print(daily_create_data) print(daily_create_data)
if daily_create_data: if daily_create_data: