Compare commits

...

31 Commits

Author SHA1 Message Date
whm
d80dd0f5ec 1.补充数据加到error表中,推理的改回之前的 2026-03-16 11:00:53 +08:00
lhx
65744e28dd 删除对象冲突 2026-03-16 10:26:19 +08:00
lhx
beea67482d 合并代码 2026-03-16 10:24:24 +08:00
whm
a577b8ef07 1.把日更字段和补全历史数据字段分开 2026-03-16 10:12:21 +08:00
whm
3f7b839e0a Merge branch 'main' of https://gitea.yuxindazhineng.com/admin/railway_cloud
# Conflicts:
#	app/models/lose_data.py
2026-03-15 12:33:32 +08:00
whm
e5de18e80f 1.补偿数据上传 2026-03-15 12:30:53 +08:00
08b556fa6e 上传文件至「app/models」
增加losedata对象
2026-03-15 11:39:28 +08:00
cef797782c 更新 app/models/level_data.py
losedata对象增加
2026-03-15 11:34:55 +08:00
whm
42dab6f961 1.新增水准线路补充检测接口 2026-03-15 10:42:33 +08:00
whm
dd9f31ee52 1.修改推理逻辑,useflag为1的也参与推理,之前是不用的点是1,正常的是0,现在发现正常的也有1,目前未知此参数具体作用。 2026-03-13 16:48:21 +08:00
whm
ed0d840aee 1.修改工况配置,适应新旧工况 2026-03-06 17:50:20 +08:00
lhx
02bc58a17d Merge branch 'main' of https://gitea.yuxindazhineng.com/admin/railway_cloud 2026-02-12 14:49:47 +08:00
lhx
3368744d4a 增加水准编码查询账号接口 2026-02-12 14:49:26 +08:00
whm
c26bd4e8e1 1.新工况替换 2026-02-10 18:08:28 +08:00
whm
6ccda8f8a7 1.工况回退 2026-02-07 10:57:09 +08:00
whm
fdf07e0dab 1.适配新的工况信息 2026-02-06 18:17:05 +08:00
whm
f7e77093df 1.工况符号修改 2026-02-05 15:47:17 +08:00
whm
e76711af9d 1.修改工况推理,把冬休情况加入 2026-02-04 15:43:02 +08:00
whm
539c2941e1 1.修改acconut用户接口新增device_name,device_port,device_ip,is_ok可修改字段 2026-02-03 16:45:08 +08:00
whm
192c299f80 1.获取今日上传的数据接口,用urllib模块 2026-02-03 16:32:33 +08:00
whm
cd3ced8833 1.删除引用request库 2026-02-03 15:10:30 +08:00
whm
e130134791 版本回退 2026-02-03 14:55:37 +08:00
whm
28cddd7409 1.添加get_uplaod_data,获取今日上传的数据接口
2.用户model添加三个返回字段
2026-02-03 14:47:13 +08:00
whm
5cfdadf02f 1.工况匹配符号修改 2026-02-02 11:14:27 +08:00
whm
ebe3f13a58 1.工况符号修改 2026-02-02 11:11:30 +08:00
whm
313ade1a60 1.修改工况接口返回格式 2026-01-29 18:17:52 +08:00
whm
de3ce23654 1.修改返回的数据 2026-01-29 18:15:54 +08:00
whm
6517af823d 1.修改推理工况的默认期数 2026-01-29 18:01:40 +08:00
whm
2eb7b9b5c1 1.修改工况推理接口 2026-01-29 17:45:33 +08:00
lhx
8b3796dd5a 合并代码 2026-01-20 00:48:01 +00:00
lhx
a7b7a786a5 合并代码 2026-01-20 00:39:46 +00:00
21 changed files with 1357 additions and 155 deletions

View File

@@ -9,7 +9,12 @@ from ..schemas.account import (
AccountApiResponse, AccountListResponse,AccountGetRequestYH AccountApiResponse, AccountListResponse,AccountGetRequestYH
) )
from ..services.account import AccountService from ..services.account import AccountService
import json
from typing import List, Optional
from urllib.error import HTTPError, URLError
from urllib import request as urllib_request
from urllib.parse import urlencode
from socket import timeout
router = APIRouter(prefix="/accounts", tags=["账号管理"]) router = APIRouter(prefix="/accounts", tags=["账号管理"])
@router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED) @router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED)
@@ -112,6 +117,7 @@ def update_account(request: AccountUpdateRequest, db: Session = Depends(get_db))
@router.post("/delete", response_model=AccountApiResponse) @router.post("/delete", response_model=AccountApiResponse)
def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)): def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)):
"""删除账号""" """删除账号"""
if not AccountService.delete_account(db, request.account_id): if not AccountService.delete_account(db, request.account_id):
return AccountApiResponse( return AccountApiResponse(
@@ -124,3 +130,101 @@ def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db))
message="账号删除成功", message="账号删除成功",
data=None data=None
) )
# Endpoint: accounts that uploaded data today.
# NOTE(review): "uplaod" is a typo in the route/function name — kept for API compatibility.
@router.post("/get_uplaod_data", response_model=AccountListResponse)
def get_uplaod_data(request: AccountGetRequest, db: Session = Depends(get_db)):
    """Search accounts by several conditions and merge the external `is_ok`
    flag; only accounts present in the external service's today_data are
    returned."""
    # 1. Query the account list from the database (original logic unchanged).
    accounts = AccountService.search_accounts(
        db,
        account_id=request.account_id,
        username=request.username,
        project_name=request.project_name,
        status=request.status,
        today_updated=request.today_updated,
        yh_id=request.yh_id,
        cl_name=request.cl_name
    )
    # 2. Call the external API and build user_is_ok_map (urllib implementation;
    #    fixes an earlier naming conflict and the timeout exception name).
    user_is_ok_map = {}
    api_url = "https://engineering.yuxindazhineng.com/index/index/get_over_data"
    payload = {"user_id": "68c0dbfdb7cbcd616e7c5ab5"}
    try:
        # Step 1: URL-encode the form payload to bytes (urllib POST requires bytes).
        payload_bytes = urlencode(payload).encode("utf-8")
        # Step 2: build the Request object (renamed urllib_request avoids name clashes).
        req = urllib_request.Request(
            url=api_url,
            data=payload_bytes,
            method="POST"  # explicitly use POST
        )
        # Step 3: send the request with a 10-second timeout so the call cannot hang.
        with urllib_request.urlopen(req, timeout=10) as resp:
            # Read the raw response bytes and decode as UTF-8.
            response_str = resp.read().decode("utf-8")
        # Step 4: parse the JSON string into a dict.
        api_response = json.loads(response_str)
        # Step 5: validate the response format and build the username -> is_ok map.
        if api_response.get("code") == 0:
            today_data = api_response.get("data", [])  # default to [] to avoid KeyError
            for item in today_data:
                # Only use entries carrying both expected fields.
                if 'user_name' in item and 'is_ok' in item:
                    user_is_ok_map[item['user_name']] = item['is_ok']
    except HTTPError as e:
        # HTTP status errors (4xx/5xx).
        print(f"外部接口 HTTP 错误:{e.code} - {e.reason}")
    except TimeoutError:  # correct exception class (a bare `timeout` name was undefined)
        # Request timed out.
        print(f"外部接口调用超时(超过 10 秒)")
    except URLError as e:
        # URL parsing / network connectivity errors.
        print(f"外部接口网络错误:{e.reason}")
    except json.JSONDecodeError:
        # Response body was not valid JSON.
        print(f"外部接口返回数据格式错误,非合法 JSON 字符串")
    except Exception as e:
        # Any other unexpected failure — best-effort: map stays empty.
        print(f"外部接口处理未知异常:{str(e)}")
    # 3. Keep only accounts that appear in today_data (core filtering step).
    accounts_with_is_ok = []
    for account in accounts:
        # Convert the AccountResponse object to a dict (Pydantic `.dict()`).
        account_dict = account.dict()
        # Skip accounts whose username is absent from the external map.
        current_username = account_dict.get("username", "")
        if current_username not in user_is_ok_map:
            continue  # filter out accounts not in today_data
        # Attach the is_ok flag (the username is guaranteed to be in the map here).
        account_dict['is_ok'] = user_is_ok_map[current_username]
        # Collect the enriched dict.
        accounts_with_is_ok.append(account_dict)
    # 4. Empty-result response (original logic unchanged).
    if not accounts_with_is_ok:
        return AccountListResponse(
            code=ResponseCode.ACCOUNT_NOT_FOUND,
            message=ResponseMessage.ACCOUNT_NOT_FOUND,
            total=0,
            data=[]
        )
    # 5. Normal response: only accounts present in today_data.
    # print(accounts_with_is_ok)
    return AccountListResponse(
        code=ResponseCode.SUCCESS,
        message="查询成功",
        total=len(accounts_with_is_ok),
        data=accounts_with_is_ok
    )

View File

@@ -18,6 +18,7 @@ from ..schemas.comprehensive_data import (
SettlementDataCheckpointQueryRequest, SettlementDataCheckpointQueryRequest,
LevelDataQueryRequest, LevelDataQueryRequest,
LinecodeRequest, LinecodeRequest,
LinecodeAccountRequest,
NYIDRequest, NYIDRequest,
SectionByAccountRequest, SectionByAccountRequest,
PointByAccountRequest, PointByAccountRequest,
@@ -30,6 +31,8 @@ from ..services.settlement_data import SettlementDataService
from ..services.level_data import LevelDataService from ..services.level_data import LevelDataService
from ..services.original_data import OriginalDataService from ..services.original_data import OriginalDataService
from ..services.comprehensive import ComprehensiveDataService from ..services.comprehensive import ComprehensiveDataService
from ..services.account import AccountService
from ..utils.get_operating_mode import OperatingModePredictor
import logging import logging
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"]) router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -470,12 +473,14 @@ def get_settlement_by_linecode(
settlement_service = SettlementDataService() settlement_service = SettlementDataService()
result = settlement_service.get_settlement_by_linecode(db, linecode) result = settlement_service.get_settlement_by_linecode(db, linecode)
settlement_data = result['settlement_data']
predictor = OperatingModePredictor()
result_1d = predictor.predict(settlement_data)
return DataResponse( return DataResponse(
code=ResponseCode.SUCCESS, code=ResponseCode.SUCCESS,
message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据", message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据",
total=len(result['settlement_data']), total=len(result_1d),
data=result['settlement_data'] data=result_1d
) )
except Exception as e: except Exception as e:
@@ -608,8 +613,8 @@ def refresh_today_data(request: TodayDataRequest, db: Session = Depends(get_db))
return DataResponse( return DataResponse(
code=ResponseCode.QUERY_FAILED, code=ResponseCode.QUERY_FAILED,
message=f"定时任务触发失败:{str(e)}", message=f"定时任务触发失败:{str(e)}",
total=len(daily_data), total=0,
data={} data=[]
) )
# account_id获取所有断面数据 # account_id获取所有断面数据
@router.post("/get_all_section_by_account", response_model=DataResponse) @router.post("/get_all_section_by_account", response_model=DataResponse)
@@ -724,3 +729,66 @@ def get_checkpoint_by_point(request: LevelDataQueryRequest, db: Session = Depend
total=0, total=0,
data=[] data=[]
) )
@router.post("/get_accounts_by_linecode", response_model=DataResponse)
def get_accounts_by_linecode(request: LinecodeAccountRequest, db: Session = Depends(get_db)):
    """
    Look up account information via a level-line code.

    Pipeline:
      1. linecode -> latest NYID in the level-data table
      2. NYID -> distinct point_ids in the settlement table
      3. point_ids -> distinct section_ids in the checkpoint table
      4. section_ids -> distinct account_ids in the section table
      5. account_ids -> account rows

    Optimised with batched IN queries instead of per-row lookups.
    """
    def _fmt(ts):
        # Serialise an optional datetime as "YYYY-mm-dd HH:MM:SS".
        return ts.strftime("%Y-%m-%d %H:%M:%S") if ts else None

    try:
        linecode = request.linecode
        logger.info(f"接口请求根据linecode={linecode}查询账号信息")
        matched = AccountService.get_accounts_by_linecode(db, linecode)
        if not matched:
            return DataResponse(
                code=ResponseCode.SUCCESS,
                message=f"未找到linecode={linecode}对应的账号信息",
                total=0,
                data=[]
            )
        # Serialise every account row into a plain dict.
        account_data = [
            {
                "id": acc.id,
                "username": acc.username,
                "status": acc.status,
                "today_updated": acc.today_updated,
                "project_name": acc.project_name,
                "created_at": _fmt(acc.created_at),
                "updated_at": _fmt(acc.updated_at),
                "update_time": acc.update_time,
                "max_variation": acc.max_variation,
                "yh_id": acc.yh_id,
                "cl_name": acc.cl_name,
            }
            for acc in matched
        ]
        return DataResponse(
            code=ResponseCode.SUCCESS,
            message=f"查询成功,共获取{len(account_data)}个账号",
            total=len(account_data),
            data=account_data
        )
    except Exception as e:
        logger.error(f"查询账号信息失败:{str(e)}", exc_info=True)
        return DataResponse(
            code=ResponseCode.QUERY_FAILED,
            message=f"查询失败:{str(e)}",
            total=0,
            data=[]
        )

View File

@@ -8,7 +8,11 @@ from ..schemas.level_data import (
LevelDataListResponse, LevelDataListResponse,
LevelDataResponse, LevelDataResponse,
BatchDeleteByLinecodesRequest, BatchDeleteByLinecodesRequest,
BatchDeleteByLinecodesResponse BatchDeleteByLinecodesResponse,
LinecodeRequest,
NyidListResponse,
SyncLoseDataRequest,
SyncLoseDataResponse,
) )
from ..services.level_data import LevelDataService from ..services.level_data import LevelDataService
@@ -36,6 +40,31 @@ def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(g
) )
@router.post("/get_nyids_by_linecode", response_model=NyidListResponse)
def get_nyids_by_linecode(request: LinecodeRequest, db: Session = Depends(get_db)):
    """
    Return every NYID under the given level line — only the NYID list,
    de-duplicated and sorted by NYID descending (numerically when possible).
    """
    try:
        service = LevelDataService()
        records = service.get_by_linecode(db, linecode=request.linecode)
        # Collect distinct, non-empty NYIDs as strings.
        unique_nyids = set()
        for record in records:
            if record.NYID:
                unique_nyids.add(str(record.NYID))

        def _numeric_key(value):
            # Non-numeric NYIDs sort as 0, mirroring the original ordering rule.
            return int(value) if str(value).isdigit() else 0

        ordered = sorted(unique_nyids, key=_numeric_key, reverse=True)
        return NyidListResponse(
            code=ResponseCode.SUCCESS,
            message=ResponseMessage.SUCCESS,
            data=ordered,
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"查询失败: {str(e)}",
        )
@router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse) @router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse)
def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)): def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)):
""" """
@@ -59,3 +88,39 @@ def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Sessio
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"批量删除失败: {str(e)}" detail=f"批量删除失败: {str(e)}"
) )
@router.post("/sync_lose_data", response_model=SyncLoseDataResponse)
def sync_lose_data(
    request: SyncLoseDataRequest = SyncLoseDataRequest(),
    db: Session = Depends(get_db),
):
    """
    Sync missing-data records into the lose_data table.
    - Without linecode: compute and write missing data for all level lines'
      NYIDs; only report whether processing succeeded.
    - With linecode: process only that level line and return its lose_data rows.
    Missing rule: no original data = 1, no settlement data = 2; lose_data is
    their sum (0/1/2/3).
    """
    try:
        level_service = LevelDataService()
        result = level_service.sync_lose_data(db, linecode=request.linecode)
        if result.get("data") is None:
            # Full-sync path: the service returns no detail rows, only a flag.
            data = {"success": result["success"]}
            if not result["success"]:
                return SyncLoseDataResponse(
                    code=1,
                    message=result.get("message", "处理失败"),
                    data=data,
                )
        else:
            # Single-line path: pass the record list straight through.
            data = result["data"]
        return SyncLoseDataResponse(
            code=0 if result["success"] else 1,
            message=ResponseMessage.SUCCESS if result["success"] else (result.get("message") or "处理失败"),
            data=data,
        )
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"同步缺失数据失败: {str(e)}",
        )

View File

@@ -235,10 +235,11 @@ def receive_after_cursor_execute(conn, cursor, statement, params, context, execu
log_transaction_end(success=True) log_transaction_end(success=True)
@event.listens_for(Engine, "handle_error") @event.listens_for(Engine, "handle_error")
def receive_handle_error(exception, context): def receive_handle_error(context):
"""错误监听""" """错误监听SQLAlchemy 只传入一个 ExceptionContext 参数"""
error_msg = str(exception) exception = getattr(context, "original_exception", None) or getattr(context, "sqlalchemy_exception", None)
sql = context.statement if context and hasattr(context, 'statement') else None error_msg = str(exception) if exception else str(context)
sql = getattr(context, "statement", None)
log_connection_error(error_msg, sql) log_connection_error(error_msg, sql)
log_transaction_end(success=False, error=error_msg) log_transaction_end(success=False, error=error_msg)

View File

@@ -17,6 +17,10 @@ class Account(Base):
max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米") max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米")
yh_id = Column(String(1000), comment="宇恒一号用户id") yh_id = Column(String(1000), comment="宇恒一号用户id")
cl_name = Column(String(100), nullable=True, comment="测量人员") cl_name = Column(String(100), nullable=True, comment="测量人员")
device_name = Column(String(1000), comment="设备名称")
device_port = Column(String(1000), comment="设备端口")
device_ip = Column(String(1000), comment="设备局域网内ip地址")
is_ok = Column(Integer, default=0, comment="是否可以上传")
# 模型转字典 # 模型转字典

View File

@@ -0,0 +1,10 @@
from sqlalchemy import Column, Integer, String
from ..core.database import Base
class ErrorLinecode(Base):
    """Level lines for which inference produced no result (e.g. entirely in
    winter shutdown) are recorded here. The database table's id column must be
    AUTO_INCREMENT."""
    __tablename__ = "error_linecode"
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    # Level-line code that failed inference.
    linecode = Column(String(255), nullable=False, comment="水准线路编码", index=True)

22
app/models/lose_data.py Normal file
View File

@@ -0,0 +1,22 @@
from sqlalchemy import Column, Integer, String
from ..core.database import Base
class LoseData(Base):
    """Missing-data record table: tracks, per level line and period (NYID),
    whether original and/or settlement data is missing."""
    __tablename__ = "lose_data"
    # extend_existing tolerates the model being registered more than once.
    __table_args__ = {"extend_existing": True}
    id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="ID")
    account_id = Column(Integer, nullable=False, comment="账户id", index=True)
    NYID = Column(String(100), nullable=False, comment="期数ID", index=True)
    linecode = Column(String(255), nullable=False, default="0", comment="水准线路编码", index=True)
    # Encoding: 0 = complete, 1 = original data missing, 2 = settlement missing, 3 = both.
    lose_data = Column(Integer, nullable=False, default=0, comment="缺失的数据默认是0")
    section_id = Column(String(255), nullable=True, comment="所属断面id")
    point_id = Column(String(100), nullable=False, comment="测点ID")

    def to_dict(self):
        """Return all mapped columns as a plain dict."""
        return {
            column.name: getattr(self, column.name)
            for column in self.__table__.columns
        }

View File

@@ -12,6 +12,10 @@ class AccountBase(BaseModel):
max_variation: Optional[int] = None max_variation: Optional[int] = None
yh_id: Optional[str] = None yh_id: Optional[str] = None
cl_name: Optional[str] = None cl_name: Optional[str] = None
device_name: Optional[str] = None
device_port: Optional[str] = None
device_ip: Optional[str] = None
is_ok: Optional[int] = None
class AccountCreate(AccountBase): class AccountCreate(AccountBase):
pass pass
@@ -24,6 +28,10 @@ class AccountUpdate(BaseModel):
project_name: Optional[str] = None project_name: Optional[str] = None
update_time: Optional[str] = None update_time: Optional[str] = None
cl_name: Optional[str] = None cl_name: Optional[str] = None
device_name: Optional[str] = None
device_port: Optional[str] = None
device_ip: Optional[str] = None
is_ok: Optional[int] = None
class AccountResponse(AccountBase): class AccountResponse(AccountBase):
account_id: int account_id: int
@@ -49,7 +57,11 @@ class AccountResponse(AccountBase):
update_time=account.update_time, update_time=account.update_time,
max_variation=account.max_variation, max_variation=account.max_variation,
yh_id=account.yh_id, yh_id=account.yh_id,
cl_name=account.cl_name cl_name=account.cl_name,
device_name=account.device_name,
device_port=account.device_port,
device_ip=account.device_ip,
is_ok = account.is_ok
) )
class AccountListRequest(BaseModel): class AccountListRequest(BaseModel):
@@ -91,3 +103,4 @@ class AccountListResponse(BaseModel):
message: str message: str
total: int total: int
data: List[AccountResponse] = [] data: List[AccountResponse] = []

View File

@@ -290,6 +290,10 @@ class ComprehensiveDataImportRequest(BaseModel):
data: Dict[str, Any] data: Dict[str, Any]
class LinecodeRequest(BaseModel): class LinecodeRequest(BaseModel):
linecode: str linecode: str
class LinecodeAccountRequest(BaseModel):
    """Request body for looking up accounts by level-line code."""
    linecode: str
class ComprehensiveDataImportResponse(BaseModel): class ComprehensiveDataImportResponse(BaseModel):
success: bool success: bool
message: str message: str

View File

@@ -52,3 +52,40 @@ class BatchDeleteByLinecodesResponse(BaseModel):
success: bool success: bool
backup_file: Optional[str] = None backup_file: Optional[str] = None
deleted_counts: Optional[dict] = None deleted_counts: Optional[dict] = None
class LinecodeRequest(BaseModel):
    """Query request keyed by level-line code."""
    linecode: str = Field(..., description="水准线路编码")
class NyidListResponse(BaseModel):
    """Response carrying only a list of NYIDs."""
    code: int = 0
    message: str
    data: List[str] = Field(default_factory=list, description="NYID 列表")
class SyncLoseDataRequest(BaseModel):
    """Sync request: omit linecode for a full sync, or set it to process one line."""
    linecode: Optional[str] = Field(None, description="水准线路编码,不传则处理全部线路")
class LoseDataItem(BaseModel):
    """One row of the lose_data table."""
    id: int
    account_id: int
    NYID: str
    linecode: str
    lose_data: int
    section_id: Optional[str] = None
    point_id: str = ""
    # Allow construction directly from ORM objects (Pydantic v2 style).
    model_config = ConfigDict(from_attributes=True)
class SyncLoseDataResponse(BaseModel):
    """Sync response: a full sync puts a success flag in data; a single-line
    sync puts that line's missing-data records there."""
    code: int = 0
    message: str
    data: Any = None  # {"success": bool} for full sync, List[LoseDataItem] for one line

View File

@@ -103,3 +103,84 @@ class AccountService:
db.commit() db.commit()
return True return True
return False return False
@staticmethod
def get_accounts_by_linecode(db: Session, linecode: str) -> List[Account]:
    """
    Query account information via a level-line code (optimised to minimise
    database round-trips).

    Pipeline:
      1. linecode -> latest NYID in the level-data table
      2. NYID -> distinct point_ids in the settlement table (batched)
      3. point_ids -> distinct section_ids in the checkpoint table (batched)
      4. section_ids -> distinct account_ids in the section table (batched)
      5. account_ids -> account rows (batched)

    Uses IN queries instead of per-row loops for performance.
    """
    # Imported locally — presumably to avoid circular imports at module load; confirm.
    from ..models.level_data import LevelData
    from ..models.settlement_data import SettlementData
    from ..models.checkpoint import Checkpoint
    from ..models.section_data import SectionData
    try:
        logger.info(f"开始通过linecode={linecode}查询账号信息")
        # 1. Latest level-data row for the linecode (NYID descending, first row).
        #    NOTE(review): NYID is ordered as stored — if it is a string column,
        #    "latest" is lexicographic; confirm NYIDs share a fixed width.
        level_data = db.query(LevelData).filter(
            LevelData.linecode == linecode
        ).order_by(LevelData.NYID.desc()).first()
        if not level_data:
            logger.warning(f"未找到linecode={linecode}对应的水准数据")
            return []
        nyid = level_data.NYID
        logger.info(f"找到最新期数NYID={nyid}")
        # 2. Distinct point_ids for that NYID in the settlement table.
        settlement_list = db.query(SettlementData.point_id).filter(
            SettlementData.NYID == nyid
        ).distinct().all()
        if not settlement_list:
            logger.warning(f"未找到NYID={nyid}对应的沉降数据")
            return []
        point_ids = [s.point_id for s in settlement_list if s.point_id]
        logger.info(f"找到{len(point_ids)}个观测点ID")
        # 3. Distinct section_ids for those point_ids in the checkpoint table.
        checkpoint_list = db.query(Checkpoint.section_id).filter(
            Checkpoint.point_id.in_(point_ids)
        ).distinct().all()
        if not checkpoint_list:
            logger.warning(f"未找到对应的观测点数据")
            return []
        section_ids = [c.section_id for c in checkpoint_list if c.section_id]
        logger.info(f"找到{len(section_ids)}个断面ID")
        # 4. Distinct account_ids for those section_ids in the section table.
        section_list = db.query(SectionData.account_id).filter(
            SectionData.section_id.in_(section_ids)
        ).distinct().all()
        if not section_list:
            logger.warning(f"未找到对应的断面数据")
            return []
        account_ids = [s.account_id for s in section_list if s.account_id]
        logger.info(f"找到{len(account_ids)}个账号ID")
        # 5. Load the matching account rows in one query.
        accounts = db.query(Account).filter(
            Account.id.in_(account_ids)
        ).all()
        logger.info(f"查询完成,共找到{len(accounts)}个账号")
        return accounts
    except Exception as e:
        logger.error(f"通过linecode={linecode}查询账号失败: {str(e)}", exc_info=True)
        raise e

View File

@@ -115,11 +115,8 @@ class DailyDataService(BaseService[DailyData]):
# 模型字段列表 # 模型字段列表
model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns]
# 基础条件 # 基础条件:不按 useflag 过滤,确保能取到每个 point 的真正最新一期(按 NYID 最大)
base_conditions = [ base_conditions = []
SettlementData.useflag.isnot(None),
SettlementData.useflag != 0
]
if point_ids: if point_ids:
base_conditions.append(SettlementData.point_id.in_(point_ids)) base_conditions.append(SettlementData.point_id.in_(point_ids))
@@ -354,4 +351,5 @@ class DailyDataService(BaseService[DailyData]):
raise raise
except Exception as e: except Exception as e:
logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True) logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True)
raise raise

View File

@@ -1,12 +1,13 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import text, inspect from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any, Tuple
from ..models.level_data import LevelData from ..models.level_data import LevelData
from .base import BaseService from .base import BaseService
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint from ..models.checkpoint import Checkpoint
from ..models.section_data import SectionData from ..models.section_data import SectionData
from ..models.account import Account from ..models.account import Account
from ..models.lose_data import LoseData
from ..core.database import engine from ..core.database import engine
import logging import logging
import os import os
@@ -30,6 +31,12 @@ class LevelDataService(BaseService[LevelData]):
"""根据水准线路编码获取水准数据""" """根据水准线路编码获取水准数据"""
return self.get_by_field(db, "linecode", linecode) return self.get_by_field(db, "linecode", linecode)
def get_last_by_linecode(self, db: Session, linecode: str) -> Optional[LevelData]:
    """Return the newest level-data row for a linecode (ordered by NYID desc).

    NOTE(review): if NYID is a string column the descending order is
    lexicographic — confirm NYIDs share a fixed width for true chronology.
    """
    return db.query(LevelData).filter(
        LevelData.linecode == linecode
    ).order_by(LevelData.NYID.desc()).first()
def search_level_data(self, db: Session, def search_level_data(self, db: Session,
id: Optional[str] = None, id: Optional[str] = None,
linecode: Optional[str] = None, linecode: Optional[str] = None,
@@ -458,6 +465,137 @@ class LevelDataService(BaseService[LevelData]):
return original_data_map return original_data_map
def _get_nyids_with_settlement(self, db: Session, nyid_list: List[str]) -> set:
    """Return the subset of nyid_list that has at least one settlement record."""
    if not nyid_list:
        return set()
    found = set()
    query = db.query(SettlementData.NYID).filter(SettlementData.NYID.in_(nyid_list)).distinct()
    for row in query.all():
        # Skip empty/NULL NYIDs.
        if row[0]:
            found.add(row[0])
    return found
def _get_nyid_to_all_points_account_section(
    self, db: Session, nyid_list: List[str]
) -> Dict[str, List[Tuple[str, int, Optional[str]]]]:
    """
    Via settlement -> checkpoint -> section, map each NYID to ALL of its points
    as [(point_id, account_id, section_id), ...]. NYIDs without settlement data
    map to the placeholder [("", 0, None)].
    """
    if not nyid_list:
        return {}
    default_list = [("", 0, None)]
    settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
    # Per-NYID list of point tuples; de-duplicated but order-preserving.
    nyid_to_points: Dict[str, List[Tuple[str, int, Optional[str]]]] = {}
    point_ids = list({s.point_id for s in settlements if s.point_id})
    if not point_ids:
        return {nyid: default_list for nyid in nyid_list}
    checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all()
    point_to_section = {c.point_id: (c.section_id or None) for c in checkpoints}
    section_ids = list({c.section_id for c in checkpoints if c.section_id})
    point_to_account: Dict[str, int] = {}
    if section_ids:
        sections = db.query(SectionData).filter(SectionData.section_id.in_(section_ids)).all()
        # PERF: index sections by section_id once instead of scanning the whole
        # list with next() per checkpoint (was O(checkpoints * sections)).
        # setdefault keeps the FIRST section per id, matching next()'s
        # first-match semantics exactly.
        section_by_id: Dict[Any, Any] = {}
        for sec in sections:
            section_by_id.setdefault(sec.section_id, sec)
        for c in checkpoints:
            sec = section_by_id.get(c.section_id)
            if sec and sec.account_id is not None:
                try:
                    point_to_account[c.point_id] = int(sec.account_id)
                except (ValueError, TypeError):
                    # Non-numeric account_id falls back to 0.
                    point_to_account[c.point_id] = 0
    # Group by NYID: every point_id seen in that period, de-duplicated.
    for nyid in nyid_list:
        nyid_to_points[nyid] = []
    seen_per_nyid: Dict[str, set] = {nyid: set() for nyid in nyid_list}
    for s in settlements:
        pt_id = s.point_id or ""
        if pt_id not in seen_per_nyid.get(s.NYID, set()):
            seen_per_nyid[s.NYID].add(pt_id)
            acc = point_to_account.get(s.point_id, 0)
            sec_id = point_to_section.get(s.point_id)
            nyid_to_points[s.NYID].append((pt_id, acc, sec_id))
    # NYIDs with no settlement rows get the placeholder entry.
    for nyid in nyid_list:
        if not nyid_to_points[nyid]:
            nyid_to_points[nyid] = default_list
    return nyid_to_points
def _sync_lose_data_for_one_linecode(
    self,
    db: Session,
    linecode_val: str,
    level_list: List[LevelData],
) -> None:
    """Process a single level line: gather its NYIDs, look up settlement /
    original data, and upsert rows into lose_data. No commit here — the
    caller commits.

    NOTE(review): linecode_val is currently unused; pairs carry each level
    row's own linecode instead — confirm this is intentional.
    """
    # (linecode, NYID) pairs for every level row that actually carries an NYID.
    pairs = [(item.linecode, str(item.NYID)) for item in level_list if item.NYID]
    if not pairs:
        return
    nyid_list = list({nyid for _, nyid in pairs})
    # NYIDs that have settlement records.
    settlement_nyids = self._get_nyids_with_settlement(db, nyid_list)
    # NYIDs that have original-data records (extracted from the rows map).
    original_data_map = self._find_original_data_by_nyids(db, nyid_list)
    original_nyids = set()
    for rows in original_data_map.values():
        for row in rows:
            n = row.get("NYID")
            if n is not None:
                original_nyids.add(str(n))
    nyid_to_points_asp = self._get_nyid_to_all_points_account_section(db, nyid_list)
    for linecode_, nyid in pairs:
        has_original = nyid in original_nyids
        has_settlement = nyid in settlement_nyids
        # Missing rule: no original = +1, no settlement = +2 (result 0/1/2/3).
        lose_val = (0 if has_original else 1) + (0 if has_settlement else 2)
        points_list = nyid_to_points_asp.get(nyid, [("", 0, None)])
        for point_id, acc_id, sec_id in points_list:
            pt_id = point_id or ""
            # Upsert keyed on (linecode, NYID, point_id): update if present,
            # insert otherwise.
            existing = db.query(LoseData).filter(
                LoseData.linecode == linecode_,
                LoseData.NYID == nyid,
                LoseData.point_id == pt_id,
            ).first()
            if existing:
                existing.lose_data = lose_val
                existing.account_id = acc_id
                existing.section_id = sec_id
            else:
                db.add(LoseData(
                    account_id=acc_id,
                    NYID=nyid,
                    linecode=linecode_,
                    lose_data=lose_val,
                    section_id=sec_id,
                    point_id=pt_id,
                ))
def sync_lose_data(self, db: Session, linecode: Optional[str] = None) -> Dict[str, Any]:
    """
    Sync missing-data records into the lose_data table.

    Without linecode: process each level line in its own batch (query only that
    line's NYIDs, then settlement/original data, then insert); no detail rows
    are returned. With linecode: process only that line and return its
    lose_data records.

    Missing rule: original data absent = 1 / present = 0; settlement data
    absent = 2 / present = 0; lose_data is the sum (0/1/2/3). The same
    (linecode, NYID) is never inserted twice — existing rows are updated.

    Returns {"success": bool, "data": list | None[, "message": str]}.
    """
    try:
        if linecode:
            # Single-line mode: process, commit, then return that line's rows.
            level_list = self.get_by_linecode(db, linecode=linecode)
            if not level_list:
                return {"success": True, "data": []}
            self._sync_lose_data_for_one_linecode(db, linecode, level_list)
            db.commit()
            records = db.query(LoseData).filter(LoseData.linecode == linecode).order_by(LoseData.NYID.desc()).all()
            return {"success": True, "data": [r.to_dict() for r in records]}
        # Full mode: every distinct linecode, one batch per line.
        linecode_rows = db.query(LevelData.linecode).distinct().all()
        linecodes = [r[0] for r in linecode_rows if r[0]]
        if not linecodes:
            return {"success": True, "data": None}
        for lc in linecodes:
            level_list = self.get_by_linecode(db, linecode=lc)
            self._sync_lose_data_for_one_linecode(db, lc, level_list)
            # Commit per line so earlier lines persist even if a later one fails.
            db.commit()
            logger.info(f"sync_lose_data 已处理线路: {lc}")
        return {"success": True, "data": None}
    except Exception as e:
        db.rollback()
        logger.error(f"sync_lose_data 失败: {str(e)}", exc_info=True)
        return {"success": False, "data": [] if linecode else None, "message": str(e)}
def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData], def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData],
settlement_list: List[SettlementData], settlement_list: List[SettlementData],
checkpoint_list: List[Checkpoint], checkpoint_list: List[Checkpoint],

View File

@@ -30,6 +30,21 @@ class SettlementDataService(BaseService[SettlementData]):
def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]: def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
"""根据期数ID获取沉降数据""" """根据期数ID获取沉降数据"""
return self.get_by_field(db, "NYID", nyid) return self.get_by_field(db, "NYID", nyid)
def get_one_dict_by_nyid(self, db: Session, nyid: str) -> Optional[Dict[str, Any]]:
    """Fetch one settlement record for the given NYID and return it as a dict
    (datetimes rendered as strings), for inference / due-date calculations."""
    record = db.query(SettlementData).filter(SettlementData.NYID == nyid).first()
    if record is None:
        return None

    def _to_plain(value):
        # Render datetimes as strings; everything else passes through untouched.
        if isinstance(value, datetime):
            return value.strftime("%Y-%m-%d %H:%M:%S")
        return value

    return {
        column.name: _to_plain(getattr(record, column.name))
        for column in SettlementData.__table__.columns
    }
def get_by_nyid_and_point_id(self, db: Session, nyid: str, point_id: str) -> List[SettlementData]: def get_by_nyid_and_point_id(self, db: Session, nyid: str, point_id: str) -> List[SettlementData]:
"""根据期数ID和观测点ID获取沉降数据""" """根据期数ID和观测点ID获取沉降数据"""
return self.get_by_field(db, "NYID", nyid, "point_id", point_id) return self.get_by_field(db, "NYID", nyid, "point_id", point_id)
@@ -600,11 +615,120 @@ class SettlementDataService(BaseService[SettlementData]):
} }
# 根据水准线路编码获取最新的NYID并获取对应的测点数据 # 根据水准线路编码获取最新的NYID并获取对应的测点数据
# def get_settlement_by_linecode(
# self,
# db: Session,
# linecode: str,
# num: int = 1000 # 控制返回的期数默认1最新一期
# ) -> Dict:
# """
# 根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选
# 关联逻辑:
# LevelData.linecode → LevelData.NYID → SettlementData.NYID
# SettlementData.point_id字符串→ Checkpoint.point_id → Checkpoint.section_id → SectionData.section_id → SectionData.work_site
# :param db: 数据库会话
# :param linecode: 目标水准线路编码
# :param num: 返回的期数按NYID从大到小排序默认1最新一期
# :return: 字典格式,包含沉降数据列表,键为 "settlement_data"
# """
# try:
# logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)")
# # 1. 根据linecode查询水准线路表获取前N期的NYID
# nyid_query = db.query(LevelData.NYID)\
# .filter(LevelData.linecode == linecode)\
# .distinct()\
# .order_by(LevelData.NYID.desc())
# top_nyids = nyid_query.limit(num).all()
# if not top_nyids:
# logger.warning(f"未查询到linecode={linecode}对应的水准线路记录")
# return {"settlement_data": []}
# target_nyids = [item.NYID for item in top_nyids]
# # 2. 关联查询:沉降数据 → 观测点表 → 断面表新增查询Checkpoint.aname
# settlement_records = db.query(
# SettlementData,
# Checkpoint.section_id, # 从Checkpoint模型获取section_id
# Checkpoint.aname, # 新增从Checkpoint模型获取测点名称aname
# SectionData.work_site # 从SectionData模型获取work_site
# )\
# .join(
# Checkpoint, # 关联观测点模型(类名)
# SettlementData.point_id == Checkpoint.point_id, # 字符串类型匹配
# isouter=True # 左连接:避免测点未关联观测点时丢失数据
# )\
# .join(
# SectionData, # 关联断面模型(类名)
# Checkpoint.section_id == SectionData.section_id, # 字符串类型匹配
# isouter=True # 左连接避免断面ID未关联断面表时丢失数据
# )\
# .filter(SettlementData.NYID.in_(target_nyids))\
# .order_by(
# SettlementData.NYID.desc(), # 期数从新到旧
# SettlementData.MTIME_W.asc() # 同期内按观测时间升序
# )\
# .all()
# # 3. 转换数据并新增字段
# settlement_data = []
# for record in settlement_records:
# # 解析查询结果(元组:(沉降数据实例, section_id, aname, work_site)
# settlement, section_id, aname, work_site = record
# # 根据work_site判断work_type默认0表示未匹配或无数据 涵洞H 沉降板L 观测桩G和Z分标段 B 路基
# work_type = 0
# if work_site:
# work_site_str = str(work_site).strip() # 确保为字符串且去空格
# if "S" in aname:
# work_type = 1
# elif "L" in aname or "G" in aname or "Z" in aname or "B" in aname:
# work_type = 2
# elif "T" in aname or "D" in aname or "C " in aname:
# work_type = 3
# elif "H" in aname :
# work_type = 4
# # 组装返回字典新增aname字段
# record_dict = {
# "id": settlement.id,
# "point_id": settlement.point_id,
# "aname": aname, # 新增测点名称从Checkpoint表获取
# "section_id": section_id, # 新增观测点关联的断面ID
# "work_site": work_site, # 新增:断面的工点信息
# "work_type": work_type, # 新增工点类型编码1-隧道2-区间路基3-桥4-
# "CVALUE": settlement.CVALUE,
# "MAVALUE": settlement.MAVALUE,
# "MTIME_W": settlement.MTIME_W.strftime("%Y-%m-%d %H:%M:%S") if settlement.MTIME_W else None,
# "NYID": settlement.NYID,
# "PRELOADH": settlement.PRELOADH,
# "PSTATE": settlement.PSTATE,
# "REMARK": settlement.REMARK,
# "WORKINFO": settlement.WORKINFO,
# "createdate": settlement.createdate.strftime("%Y-%m-%d %H:%M:%S") if settlement.createdate else None,
# "day": settlement.day,
# "day_jg": settlement.day_jg,
# "isgzjdxz": settlement.isgzjdxz,
# "mavalue_bc": settlement.mavalue_bc,
# "mavalue_lj": settlement.mavalue_lj,
# "sjName": settlement.sjName,
# "useflag": settlement.useflag,
# "workinfoname": settlement.workinfoname,
# "upd_remark": settlement.upd_remark
# }
# settlement_data.append(record_dict)
# logger.info(f"查询完成linecode={linecode}的前{num}期对应{len(settlement_data)}条沉降数据")
# return {"settlement_data": settlement_data}
# except Exception as e:
# logger.error(f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True)
# raise e
def get_settlement_by_linecode( def get_settlement_by_linecode(
self, self,
db: Session, db: Session,
linecode: str, linecode: str,
num: int = 1 # 控制返回的期数默认1(最新一期) num: int = 1000 # 控制返回的期数默认1000最新1000期)
) -> Dict: ) -> Dict:
""" """
根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选 根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选
@@ -613,8 +737,8 @@ class SettlementDataService(BaseService[SettlementData]):
SettlementData.point_id字符串→ Checkpoint.point_id → Checkpoint.section_id → SectionData.section_id → SectionData.work_site SettlementData.point_id字符串→ Checkpoint.point_id → Checkpoint.section_id → SectionData.section_id → SectionData.work_site
:param db: 数据库会话 :param db: 数据库会话
:param linecode: 目标水准线路编码 :param linecode: 目标水准线路编码
:param num: 返回的期数按NYID从大到小排序默认1(最新一期) :param num: 返回的期数按NYID从大到小排序默认1000最新1000期)
:return: 字典格式,包含沉降数据列表,键为 "settlement_data" :return: 字典格式,包含分组后的沉降数据嵌套列表,键为 "settlement_data"
""" """
try: try:
logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)") logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)")
@@ -656,8 +780,8 @@ class SettlementDataService(BaseService[SettlementData]):
)\ )\
.all() .all()
# 3. 转换数据并新增字段 # 3. 转换数据并新增字段(先组装成原始记录列表)
settlement_data = [] raw_settlement_data = []
for record in settlement_records: for record in settlement_records:
# 解析查询结果(元组:(沉降数据实例, section_id, aname, work_site) # 解析查询结果(元组:(沉降数据实例, section_id, aname, work_site)
settlement, section_id, aname, work_site = record settlement, section_id, aname, work_site = record
@@ -701,10 +825,28 @@ class SettlementDataService(BaseService[SettlementData]):
"workinfoname": settlement.workinfoname, "workinfoname": settlement.workinfoname,
"upd_remark": settlement.upd_remark "upd_remark": settlement.upd_remark
} }
settlement_data.append(record_dict) raw_settlement_data.append(record_dict)
logger.info(f"查询完成linecode={linecode}的前{num}期对应{len(settlement_data)}条沉降数据") # 4. 按point_id分组且每个分组内按NYID降序排序核心修改部分
return {"settlement_data": settlement_data} point_group_dict = {}
for item in raw_settlement_data:
point_id = item["point_id"]
# 若该point_id未在字典中初始化空列表
if point_id not in point_group_dict:
point_group_dict[point_id] = []
# 将当前记录添加到对应point_id的列表中
point_group_dict[point_id].append(item)
# 对每个分组内的记录按NYID降序排序保障大的NYID在第一个
for point_id, item_list in point_group_dict.items():
# 按NYID降序排序NYID是数字直接比较即可
item_list.sort(key=lambda x: x["NYID"], reverse=True)
# 5. 提取字典的值,组成最终的嵌套列表
final_settlement_data = list(point_group_dict.values())
logger.info(f"查询完成linecode={linecode}的前{num}期对应{len(raw_settlement_data)}条沉降数据,共{len(final_settlement_data)}个不同测点")
return {"settlement_data": final_settlement_data}
except Exception as e: except Exception as e:
logger.error(f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True) logger.error(f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True)

View File

@@ -5,53 +5,16 @@ import copy
# 注意:根据实际项目路径调整导入,若本地测试可注释掉 # 注意:根据实际项目路径调整导入,若本地测试可注释掉
from ..core.logging_config import get_logger from ..core.logging_config import get_logger
import json import json
from .operating_mode_config import BASE_PERIODS
logger = get_logger(__name__) logger = get_logger(__name__)
class ConstructionMonitorUtils: class ConstructionMonitorUtils:
def __init__(self): def __init__(self):
# 原始工况周期映射表(保持不变) # 使用公共配置的工况周期映射表
self.base_periods = { self.base_periods = BASE_PERIODS.copy()
"仰拱底板施工完成后第1个月": 7,
"仰拱底板施工完成后第2至3个月": 14,
"仰拱底板施工完成后3个月以后": 30,
"仰拱(底板)施工完成后第1个月": 7, # 原:仰拱(底板)施工完成后,第1个月
"仰拱(底板)施工完成后第2至3个月": 14, # 原:仰拱(底板)施工完成后,第2至3个月
"仰拱(底板)施工完成后3个月以后": 30, # 原:仰拱(底板)施工完成后,3个月以后
"无砟轨道铺设后第1至3个月": 30, # 原:无砟轨道铺设后,第1至3个月
"无砟轨道铺设后4至12个月": 90, # 原:无砟轨道铺设后,4至12个月
"无砟轨道铺设后12个月以后": 180, # 原:无砟轨道铺设后,12个月以后
"墩台施工到一定高度": 30, # 无格式差异,保留原样
"墩台混凝土施工": 30, # 无格式差异,保留原样
"预制梁桥,架梁前": 30, # 原:预制梁桥,架梁前
"预制梁桥,预制梁架设前": 1, # 原:预制梁桥,预制梁架设前
"预制梁桥,预制梁架设后": 7, # 原:预制梁桥,预制梁架设后
"桥位施工桥梁,制梁前": 30, # 原:桥位施工桥梁,制梁前
"桥位施工桥梁,上部结构施工中": 1, # 原:桥位施工桥梁,上部结构施工中
"架桥机(运梁车)通过": 7, # 无格式差异,保留原样
"桥梁主体工程完工后,第1至3个月": 7, # 原:桥梁主体工程完工后,第1至3个月
"桥梁主体工程完工后第4至6个月": 14, # 原:桥梁主体工程完工后,第4至6个月
"桥梁主体工程完工后,6个月以后": 30, # 原:桥梁主体工程完工后,6个月以后 ''
"轨道铺设期间,前": 30,
"轨道铺设期间,后": 14,
"轨道铺设完成后第1个月": 14,
"轨道铺设完成后2至3个月": 30,
"轨道铺设完成后4至12个月": 90,
"轨道铺设完成后12个月以后": 180,
"铺路或堆载,一般情况": 1,
"填筑或堆载,一般情况": 1,
"铺路或堆载,沉降量突变情况": 1,
"填筑或堆载,两次填筑间隔时间较长情况":3,
"铺路或堆载,两次铺路间隔时间较长情况": 3,
"堆载预压或路基填筑完成第1至3个月":7, # 原:堆载预压或路基铺路完成,第1至3个月
"堆载预压或路基填筑完成第4至6个月": 14, # 原:堆载预压或路基铺路完成,第4至6个月
"堆载预压或路基填筑完成6个月以后": 30, # 原:堆载预压或路基铺路完成,6个月以后
"架桥机(运梁车) 首次通过前": 1, # 原:架桥机(运梁车)首次通过前(仅加空格)
"架桥机(运梁车) 首次通过后前3天": 1, # 原:架桥机(运梁车)首次通过后,前3天
"架桥机(运梁车) 首次通过后": 7, # 原:架桥机(运梁车)首次通过后(仅加空格)
"轨道板(道床)铺设后第1个月": 14, # 原:轨道板(道床)铺设后,第1个月
"轨道板(道床)铺设后第2至3个月": 30, # 原:轨道板(道床)铺设后,第2至3个月
"轨道板(道床)铺设后3个月以后": 90 # 未出现在待处理集,保留原始格式
}
# 构建中英文括号+逗号兼容映射表 # 构建中英文括号+逗号兼容映射表
self.compatible_periods = self._build_compatible_brackets_map() self.compatible_periods = self._build_compatible_brackets_map()
@@ -102,7 +65,7 @@ class ConstructionMonitorUtils:
return compatible_map return compatible_map
def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]: def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]:
result = {"winter": [], "data": [], "error_data": []} result = {"winter": [], "data": [], "error_data": [], "error_linecodes": []}
if not input_data: if not input_data:
return result return result
@@ -112,17 +75,13 @@ class ConstructionMonitorUtils:
if not point_data: if not point_data:
continue continue
# 过滤逻辑:仅保留 useflag 存在且值≠0 的记录 # 推理用最新一期:取按 NYID 排序后的第一条(上游已保证倒序),不因 useflag 排除最新期
latest_item = point_data[0]
# 用于冬休回溯等:仅 useflag 有效的历史记录
filtered_point_data = [ filtered_point_data = [
item for item in point_data item for item in point_data
if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0 if "useflag" in item and item["useflag"] != 0
] ]
# 过滤后无数据则跳过当前测点
if not filtered_point_data:
continue
# 使用过滤后的数据处理
latest_item = filtered_point_data[0]
latest_condition = latest_item.get("workinfoname") latest_condition = latest_item.get("workinfoname")
if not latest_condition: if not latest_condition:
result["error_data"].append(latest_item) result["error_data"].append(latest_item)
@@ -180,7 +139,12 @@ class ConstructionMonitorUtils:
continue continue
if not base_condition: if not base_condition:
# 当前为冬休或历史全是冬休 → 归入冬休;若本次是冬休且推不出,记入 error_linecodes 供写入 error_linecode 表
result["winter"].append(item_copy) result["winter"].append(item_copy)
if latest_condition == "冬休":
linecode = latest_item.get("__linecode")
if linecode and linecode not in result["error_linecodes"]:
result["error_linecodes"].append(linecode)
continue continue
# 核心修改:冬休回溯场景下调整测量间隔(基准周期) # 核心修改:冬休回溯场景下调整测量间隔(基准周期)

View File

@@ -0,0 +1,228 @@
from datetime import datetime, date
from .operating_mode_config import (
BASE_PERIODS,
OLD_TO_NEW_MAP,
CONDITION_GROUP,
TRANSITION_RULES,
WINTER_BREAK_LABELS,
)
class OperatingModePredictor:
    """Operating-mode (working-condition) predictor.

    Processes 2-D reverse-ordered monitoring data and returns a flat list
    that keeps only the newest record of each inner list, augmented with
    the derived next-stage mode for each monitoring point (point_id).

    Characteristics:
    1. Input is a 2-D list; each inner list belongs to one point_id and is
       in reverse chronological order (newest record at index 0).
    2. Output is a 1-D list containing only each inner list's newest
       record, with the derivation result fields added.
    3. Derivation rules: a legacy mode with a new-vocabulary equivalent
       returns the new name; without an equivalent the legacy name is
       kept; mode switching uses the canonical names in ``base_periods``.
    4. Time arithmetic is done at day granularity only (time of day is
       ignored).
    5. Winter-break scenario: judgement is based solely on the last valid
       mode before the break; the switching rules are identical to the
       non-winter flow.
    6. Full-width/half-width brackets, commas and spaces are tolerated:
       names are standardized internally, canonical names are returned
       externally.
    """

    def __init__(self):
        """Load the shared configuration tables and build helper indexes."""
        # 1. Base mode configuration (canonical names returned to callers;
        #    contains both legacy and new mode vocabularies).
        self.base_periods = BASE_PERIODS.copy()
        # 2. Legacy -> new equivalence map (new mode names are preferred).
        self.old_to_new_map = OLD_TO_NEW_MAP.copy()
        # 3. Mode grouping (synonymous modes share a group and thereby
        #    reuse the same switching rules).
        self.condition_group = CONDITION_GROUP.copy()
        # 4. Switch-trigger rules (trigger day counts + target modes).
        self.transition_rules = TRANSITION_RULES.copy()
        # 5. Winter-break labels.
        self.winter_break_labels = WINTER_BREAK_LABELS.copy()
        # Helper index: standardized name -> canonical name in
        # base_periods (used for the names finally returned).
        self.std_to_canonical = {
            self._standardize_name(name): name for name in self.base_periods.keys()
        }
        # Helper index: standardized name -> group id.
        self.std_to_group = {
            self._standardize_name(name): group_id
            for name, group_id in self.condition_group.items()
        }

    def _standardize_name(self, name):
        """
        Standardize a mode name for internal matching: strip spaces and
        unify full-width/half-width punctuation.

        :param name: raw mode name (may be None or empty)
        :return: standardized name ("" for falsy input)
        """
        if not name:
            return ""
        # Remove all spaces.
        std_name = name.replace(" ", "").strip()
        # Unify full-width punctuation to half-width equivalents.
        # NOTE(review): the full-width characters in this table were lost
        # in the extraction of this file and have been reconstructed —
        # verify against the original source before relying on matching.
        replace_map = {
            "(": "(", ")": ")", ",": ",", "、": ",", "~": "~",
            ";": ";", ":": ":", "'": "'", """: "\""
        }
        for old, new in replace_map.items():
            std_name = std_name.replace(old, new)
        return std_name

    def _parse_to_date(self, time_str):
        """Parse a "%Y-%m-%d %H:%M:%S" string into a date (year/month/day
        only); return None when the value is empty or malformed."""
        if not time_str:
            return None
        try:
            dt = datetime.strptime(str(time_str).strip(), "%Y-%m-%d %H:%M:%S")
            return dt.date()
        except ValueError:
            return None

    def _get_time_statistics(self, data, target_workinfo):
        """
        Extract time statistics for the target mode from reverse-ordered
        data.

        :param data: one point's records, newest first
        :param target_workinfo: mode whose records should be measured
        :return: tuple ``(first_date, cumulative_days, days_to_today)``;
                 ``(None, 0, 0)`` when no parsable matching record exists
        """
        # Keep only valid records of the target mode (winter-break rows
        # are excluded even if their name happens to match).
        target_records = [
            d for d in data
            if self._standardize_name(d.get("workinfoname")) == self._standardize_name(target_workinfo)
            and d.get("workinfoname") not in self.winter_break_labels
        ]
        if not target_records:
            return None, 0, 0
        # Parse the observation dates.
        target_dates = []
        for item in target_records:
            d = self._parse_to_date(item.get("MTIME_W"))
            if d:
                target_dates.append(d)
        if not target_dates:
            return None, 0, 0
        # Compute spans (reverse order: newest at index 0, earliest at -1).
        last_date = target_dates[0]
        first_date = target_dates[-1]
        cumulative_days = (last_date - first_date).days
        today = date.today()
        days_to_today = (today - first_date).days if first_date else 0
        return first_date, cumulative_days, days_to_today

    def _get_pre_winter_break_workinfo(self, inner_data_list):
        """Return the last valid (non-winter-break) mode before the break,
        mapped to its new-vocabulary equivalent when one exists."""
        if not inner_data_list:
            return None
        # Walk newest-to-oldest, skipping winter-break rows, and take the
        # first valid mode encountered.
        for record in inner_data_list:
            current_work = record.get("workinfoname")
            if current_work and current_work not in self.winter_break_labels:
                # Prefer the equivalent new-vocabulary mode.
                return self.old_to_new_map.get(current_work, current_work)
        return None

    def _validate_point_id(self, inner_list):
        """Check that every record in the inner list shares one point_id;
        return that point_id, or None on mismatch/empty input."""
        if not inner_list:
            return None
        base_point_id = inner_list[0].get("point_id")
        for item in inner_list:
            if item.get("point_id") != base_point_id:
                return None
        return base_point_id

    def predict(self, data_2d_list):
        """
        Public entry point: run the mode prediction.

        :param data_2d_list: 2-D reverse-ordered data,
                             format ``[[{},{},{}], [{},{},{}]]``
        :return: 1-D result list, format ``[{}, {}, {}]``
        """
        final_result = []
        for inner_data_list in data_2d_list:
            if not isinstance(inner_data_list, list) or len(inner_data_list) == 0:
                continue
            # Take and copy the newest record (avoid mutating the input).
            latest_record = inner_data_list[0].copy()
            point_id = self._validate_point_id(inner_data_list)
            # Validate point_id consistency.
            if not point_id:
                latest_record.update({
                    "status": "fail",
                    "current_workinfo": None,
                    "first_measure_date": None,
                    "days_from_first_to_today": None,
                    "next_workinfo": None,
                    "judge_based_workinfo": None,
                    "error_msg": "point_id不一致或缺失"
                })
                final_result.append(latest_record)
                continue
            # Fetch the current mode and check it against the known set.
            current_workinfo = latest_record.get("workinfoname")
            if not current_workinfo or self._standardize_name(current_workinfo) not in self.std_to_canonical:
                latest_record.update({
                    "status": "fail",
                    "current_workinfo": None,
                    "first_measure_date": None,
                    "days_from_first_to_today": None,
                    "next_workinfo": None,
                    "judge_based_workinfo": None,
                    "error_msg": "工况无效或缺失"
                })
                final_result.append(latest_record)
                continue
            # Winter-break logic: judge from the mode preceding the break.
            if current_workinfo in self.winter_break_labels:
                judge_based_workinfo = self._get_pre_winter_break_workinfo(inner_data_list)
                if not judge_based_workinfo:
                    latest_record.update({
                        "status": "fail",
                        "current_workinfo": current_workinfo,
                        "first_measure_date": None,
                        "days_from_first_to_today": None,
                        "next_workinfo": None,
                        "judge_based_workinfo": None,
                        "error_msg": "冬休前未找到有效工况"
                    })
                    final_result.append(latest_record)
                    continue
            else:
                # Non-winter: prefer the equivalent new mode as the
                # judgement basis.
                judge_based_workinfo = self.old_to_new_map.get(current_workinfo, current_workinfo)
            # Compute the time statistics for the judgement mode.
            first_dt, cumulative_days, days_to_today = self._get_time_statistics(
                inner_data_list, judge_based_workinfo
            )
            first_measure_date = first_dt.strftime("%Y-%m-%d") if first_dt else None
            # Derive the next mode (names taken from base_periods).
            std_judge_work = self._standardize_name(judge_based_workinfo)
            group_id = self.std_to_group.get(std_judge_work, "STATIC")
            rule = self.transition_rules.get(group_id, {})
            trigger_days = rule.get("trigger_days")
            next_candidates = rule.get("next", [])
            # NOTE(review): the trigger comparison uses cumulative_days
            # (span of observed records), not days_to_today — confirm this
            # is the intended trigger basis.
            if trigger_days is not None and cumulative_days >= trigger_days and next_candidates:
                # Switch: take the canonical name from the rule table.
                next_workinfo = next_candidates[0]
            else:
                # No switch: return the new-vocabulary equivalent when one
                # exists, otherwise the current (possibly legacy) name.
                next_workinfo = self.old_to_new_map.get(judge_based_workinfo, judge_based_workinfo)
            # Assemble the final result record.
            latest_record.update({
                "status": "success",
                "current_workinfo": current_workinfo,  # original input mode
                "judge_based_workinfo": judge_based_workinfo,  # mode actually judged on (new vocabulary preferred)
                "first_measure_date": first_measure_date,
                "days_from_first_to_today": days_to_today,
                "next_workinfo": next_workinfo,  # derived result (new vocabulary preferred)
                "point_id": point_id,
                "error_msg": ""
            })
            final_result.append(latest_record)
        return final_result

View File

@@ -0,0 +1,246 @@
# -*- coding: utf-8 -*-
"""
工况公共配置模块
供 get_operating_mode.py工况预测和 construction_monitor.py施工监测共用
"""
# ==================== Base mode measurement periods (days) ====================
# Shared by get_operating_mode.py (prediction) and construction_monitor.py.
# Keys are mode names (new and legacy vocabularies plus compatibility
# variants); values are the measurement interval in days.
# NOTE(review): some full-width punctuation in these keys may have been lost
# during file extraction (e.g. the bracket-free "仰拱底板…" variants) —
# verify the exact key strings against the original source.
BASE_PERIODS = {
    # Subgrade modes (new vocabulary)
    "路基或预压土填筑,连续填筑": 1,
    "路基或预压土填筑,两次填筑间隔时间较长": 7,
    "预压土或路基填筑完成第1~3个月": 7,
    "预压土或路基填筑完成第4~6个月": 14,
    "预压土或路基填筑完成6个月以后": 30,
    "架桥机(运梁车)首次通过前": 1,
    "架桥机(运梁车)首次通过后前3天": 1,
    "架桥机(运梁车)首次通过后": 7,
    "轨道板(道床)铺设后第1至3个月": 14,
    "轨道板(道床)铺设后第4至6个月": 30,
    "轨道板(道床)铺设后6个月以后": 90,
    # Subgrade legacy modes
    "填筑或堆载,一般情况": 1,
    "填筑或堆载,两次填筑间隔时间较长情况": 7,
    "堆载预压或路基填筑完成第1至3个月": 7,
    "堆载预压或路基填筑完成第4至6个月": 14,
    "堆载预压或路基填筑完成6个月以后": 30,
    "轨道板(道床)铺设后第1个月": 14,
    "轨道板(道床)铺设后第2至3个月": 30,
    "轨道板(道床)铺设后3个月以后": 90,
    # Subgrade compatibility variants ("铺路"/"填筑" are synonymous)
    "铺路或堆载,一般情况": 1,
    "铺路或堆载,沉降量突变情况": 1,
    "铺路或堆载,两次铺路间隔时间较长情况": 3,
    # Bridge modes (new vocabulary)
    "桥墩(台)地面处拆模后": 30,
    "墩身混凝土施工": 30,
    "预制梁桥,预制梁架设前": 1,
    "预制梁桥,预制梁架设后": 7,
    "现浇梁,浇筑前": 30,
    "现浇梁上部结构施工中": 1,
    "架桥机(运梁车)通过": 2,
    "桥梁主体工程完工后第1至3个月": 7,
    "桥梁主体工程完工后第4至6个月": 14,
    "桥梁主体工程完工后6个月以后": 30,
    "轨道铺设,前": 30,
    "轨道铺设,后": 14,
    "轨道铺设完成后第1个月": 14,
    "轨道铺设完成后2至3个月": 30,
    "轨道铺设完成后4至12个月": 90,
    "轨道铺设完成后12个月以后": 180,
    # Bridge legacy modes
    "墩台施工到一定高度": 30,
    "墩台混凝土施工": 30,
    "预制梁桥,架梁前": 30,
    "桥位施工桥梁,制梁前": 30,
    "桥位施工桥梁,上部结构施工中": 1,
    "桥梁主体工程完工后,第1至3个月": 7,
    "桥梁主体工程完工后,第4至6个月": 14,
    "桥梁主体工程完工后,6个月以后": 30,
    "轨道铺设期间,前": 30,
    "轨道铺设期间,后": 14,
    # Bridge compatibility variants (extra space / missing comma forms)
    "架桥机(运梁车) 首次通过前": 1,
    "架桥机(运梁车) 首次通过后前3天": 1,
    "架桥机(运梁车) 首次通过后": 7,
    "预制梁桥预制梁架设后": 7,
    # Tunnel modes
    "仰拱(底板)施工完成后第1个月": 7,
    "仰拱(底板)施工完成后第2至3个月": 14,
    "仰拱(底板)施工完成后3个月以后": 30,
    "无砟轨道铺设后第1至3个月": 30,
    "无砟轨道铺设后4至12个月": 90,
    "无砟轨道铺设后12个月以后": 180,
    # Tunnel compatibility variants (full-width-bracket forms)
    "仰拱底板施工完成后第1个月": 7,
    "仰拱底板施工完成后第2至3个月": 14,
    "仰拱底板施工完成后3个月以后": 30,
    # Other special modes
    "轨道板铺设前": 14,
    "站场填方路基段填筑完成至静态验收": 14,
    "特殊地段隧道施工完成后至静态验收": 14,
    "冬休": 0,
}
# ==================== Legacy -> new equivalence map (prediction use) ====================
# Maps a legacy mode name to its new-vocabulary equivalent; the predictor
# prefers the mapped (new) name both for judging and for returned results.
OLD_TO_NEW_MAP = {
    # Subgrade equivalents
    "填筑或堆载,一般情况": "路基或预压土填筑,连续填筑",
    "填筑或堆载,两次填筑间隔时间较长情况": "路基或预压土填筑,两次填筑间隔时间较长",
    "堆载预压或路基填筑完成第1至3个月": "预压土或路基填筑完成第1~3个月",
    "堆载预压或路基填筑完成第4至6个月": "预压土或路基填筑完成第4~6个月",
    "堆载预压或路基填筑完成6个月以后": "预压土或路基填筑完成6个月以后",
    "轨道板(道床)铺设后第1个月": "轨道板(道床)铺设后第1至3个月",
    "轨道板(道床)铺设后第2至3个月": "轨道板(道床)铺设后第4至6个月",
    "轨道板(道床)铺设后3个月以后": "轨道板(道床)铺设后6个月以后",
    "铺路或堆载,一般情况": "路基或预压土填筑,连续填筑",
    "铺路或堆载,沉降量突变情况": "路基或预压土填筑,连续填筑",
    "铺路或堆载,两次铺路间隔时间较长情况": "路基或预压土填筑,两次填筑间隔时间较长",
    # Bridge equivalents
    "墩台施工到一定高度": "桥墩(台)地面处拆模后",
    "墩台混凝土施工": "墩身混凝土施工",
    "桥位施工桥梁,制梁前": "现浇梁,浇筑前",
    "桥位施工桥梁,上部结构施工中": "现浇梁上部结构施工中",
    "桥梁主体工程完工后,第1至3个月": "桥梁主体工程完工后第1至3个月",
    "桥梁主体工程完工后,第4至6个月": "桥梁主体工程完工后第4至6个月",
    "桥梁主体工程完工后,6个月以后": "桥梁主体工程完工后6个月以后",
    "轨道铺设期间,前": "轨道铺设,前",
    "轨道铺设期间,后": "轨道铺设,后",
    "预制梁桥预制梁架设后": "预制梁桥,预制梁架设后",
    "架桥机(运梁车) 首次通过前": "架桥机(运梁车)首次通过前",
    "架桥机(运梁车) 首次通过后前3天": "架桥机(运梁车)首次通过后前3天",
    "架桥机(运梁车) 首次通过后": "架桥机(运梁车)首次通过后",
}
# ==================== Mode grouping (prediction use) ====================
# Maps each mode name (new, legacy, and variant spellings) to a group id;
# synonymous modes share one group and thereby one entry in
# TRANSITION_RULES. Unknown modes fall back to "STATIC" in the predictor.
CONDITION_GROUP = {
    # Subgrade groups (new vocabulary)
    "路基或预压土填筑,连续填筑": "DZ_CONTINUE",
    "路基或预压土填筑,两次填筑间隔时间较长": "DZ_INTERVAL",
    "预压土或路基填筑完成第1~3个月": "DZ_FINISH_1_3",
    "预压土或路基填筑完成第4~6个月": "DZ_FINISH_4_6",
    "预压土或路基填筑完成6个月以后": "DZ_FINISH_AFTER_6",
    "架桥机(运梁车)首次通过前": "JQJ_FIRST_BEFORE",
    "架桥机(运梁车)首次通过后前3天": "JQJ_FIRST_AFTER_3D",
    "架桥机(运梁车)首次通过后": "JQJ_FIRST_AFTER",
    "轨道板(道床)铺设后第1至3个月": "GDB_FINISH_1_3",
    "轨道板(道床)铺设后第4至6个月": "GDB_FINISH_4_6",
    "轨道板(道床)铺设后6个月以后": "GDB_FINISH_AFTER_6",
    # Subgrade legacy-mode groups
    "填筑或堆载,一般情况": "DZ_CONTINUE",
    "填筑或堆载,两次填筑间隔时间较长情况": "DZ_INTERVAL",
    "堆载预压或路基填筑完成第1至3个月": "DZ_FINISH_1_3",
    "堆载预压或路基填筑完成第4至6个月": "DZ_FINISH_4_6",
    "堆载预压或路基填筑完成6个月以后": "DZ_FINISH_AFTER_6",
    "轨道板(道床)铺设后第1个月": "GDB_FINISH_1_3",
    "轨道板(道床)铺设后第2至3个月": "GDB_FINISH_4_6",
    "轨道板(道床)铺设后3个月以后": "GDB_FINISH_AFTER_6",
    "铺路或堆载,一般情况": "DZ_CONTINUE",
    "铺路或堆载,沉降量突变情况": "DZ_CONTINUE",
    "铺路或堆载,两次铺路间隔时间较长情况": "DZ_INTERVAL",
    # Bridge groups (new vocabulary)
    "桥墩(台)地面处拆模后": "STATIC",
    "墩身混凝土施工": "STATIC",
    "预制梁桥,架梁前": "STATIC",
    "预制梁桥,预制梁架设前": "YZLQ_BEFORE_JS",
    "预制梁桥,预制梁架设后": "YZLQ_AFTER_JS",
    "现浇梁,浇筑前": "STATIC",
    "现浇梁上部结构施工中": "STATIC",
    "架桥机(运梁车)通过": "STATIC",
    "桥梁主体工程完工后第1至3个月": "QL_ZHUTI_1_3",
    "桥梁主体工程完工后第4至6个月": "QL_ZHUTI_4_6",
    "桥梁主体工程完工后6个月以后": "QL_ZHUTI_AFTER_6",
    "轨道铺设,前": "STATIC",
    "轨道铺设,后": "STATIC",
    "轨道铺设完成后第1个月": "GD_FINISH_1",
    "轨道铺设完成后2至3个月": "GD_FINISH_2_3",
    "轨道铺设完成后4至12个月": "GD_FINISH_4_12",
    "轨道铺设完成后12个月以后": "GD_FINISH_AFTER_12",
    # Bridge legacy-mode groups
    "墩台施工到一定高度": "STATIC",
    "墩台混凝土施工": "STATIC",
    "桥位施工桥梁,制梁前": "STATIC",
    "桥位施工桥梁,上部结构施工中": "STATIC",
    "桥梁主体工程完工后,第1至3个月": "QL_ZHUTI_1_3",
    "桥梁主体工程完工后,第4至6个月": "QL_ZHUTI_4_6",
    "桥梁主体工程完工后,6个月以后": "QL_ZHUTI_AFTER_6",
    "轨道铺设期间,前": "STATIC",
    "轨道铺设期间,后": "STATIC",
    "预制梁桥预制梁架设后": "YZLQ_AFTER_JS",
    "架桥机(运梁车) 首次通过前": "JQJ_FIRST_BEFORE",
    "架桥机(运梁车) 首次通过后前3天": "JQJ_FIRST_AFTER_3D",
    "架桥机(运梁车) 首次通过后": "JQJ_FIRST_AFTER",
    # Tunnel groups
    "仰拱(底板)施工完成后第1个月": "YG_DIBAN_1",
    "仰拱(底板)施工完成后第2至3个月": "YG_DIBAN_2_3",
    "仰拱(底板)施工完成后3个月以后": "YG_DIBAN_AFTER_3",
    "无砟轨道铺设后第1至3个月": "WZGD_1_3",
    "无砟轨道铺设后4至12个月": "WZGD_4_12",
    "无砟轨道铺设后12个月以后": "WZGD_AFTER_12",
    "仰拱底板施工完成后第1个月": "YG_DIBAN_1",
    "仰拱底板施工完成后第2至3个月": "YG_DIBAN_2_3",
    "仰拱底板施工完成后3个月以后": "YG_DIBAN_AFTER_3",
    # Other special-mode groups
    "轨道板铺设前": "STATIC",
    "站场填方路基段填筑完成至静态验收": "STATIC",
    "特殊地段隧道施工完成后至静态验收": "STATIC",
    # Special modes
    "冬休": "STATIC",
}
# ==================== Switch-trigger rules (prediction use) ====================
# Keyed by group id (see CONDITION_GROUP). "trigger_days" is the cumulative
# day count after which the mode switches; "next" lists the target mode(s)
# (the predictor takes the first entry). trigger_days=None means the group
# never switches (terminal or manually-driven modes).
TRANSITION_RULES = {
    # Subgrade transitions
    "DZ_FINISH_1_3": {"trigger_days": 90, "next": ["预压土或路基填筑完成第4~6个月"]},
    "DZ_FINISH_4_6": {"trigger_days": 90, "next": ["预压土或路基填筑完成6个月以后"]},
    "DZ_FINISH_AFTER_6": {"trigger_days": None, "next": None},
    "JQJ_FIRST_BEFORE": {"trigger_days": 1, "next": ["架桥机(运梁车)首次通过后前3天"]},
    "JQJ_FIRST_AFTER_3D": {"trigger_days": 3, "next": ["架桥机(运梁车)首次通过后"]},
    "JQJ_FIRST_AFTER": {"trigger_days": None, "next": None},
    "GDB_FINISH_1_3": {"trigger_days": 30, "next": ["轨道板(道床)铺设后第4至6个月"]},
    "GDB_FINISH_4_6": {"trigger_days": 30, "next": ["轨道板(道床)铺设后6个月以后"]},
    "GDB_FINISH_AFTER_6": {"trigger_days": None, "next": None},
    # Bridge transitions
    "YZLQ_BEFORE_JS": {"trigger_days": 1, "next": ["架桥机(运梁车)通过"]},
    "YZLQ_AFTER_JS": {"trigger_days": 7, "next": ["桥梁主体工程完工后第1至3个月"]},
    "QL_ZHUTI_1_3": {"trigger_days": 90, "next": ["桥梁主体工程完工后第4至6个月"]},
    "QL_ZHUTI_4_6": {"trigger_days": 90, "next": ["桥梁主体工程完工后6个月以后"]},
    "QL_ZHUTI_AFTER_6": {"trigger_days": None, "next": None},
    "GD_FINISH_1": {"trigger_days": 30, "next": ["轨道铺设完成后2至3个月"]},
    "GD_FINISH_2_3": {"trigger_days": 60, "next": ["轨道铺设完成后4至12个月"]},
    "GD_FINISH_4_12": {"trigger_days": 240, "next": ["轨道铺设完成后12个月以后"]},
    "GD_FINISH_AFTER_12": {"trigger_days": None, "next": None},
    # Tunnel transitions
    "YG_DIBAN_1": {"trigger_days": 30, "next": ["仰拱(底板)施工完成后第2至3个月"]},
    "YG_DIBAN_2_3": {"trigger_days": 60, "next": ["仰拱(底板)施工完成后3个月以后"]},
    "YG_DIBAN_AFTER_3": {"trigger_days": None, "next": None},
    "WZGD_1_3": {"trigger_days": 90, "next": ["无砟轨道铺设后4至12个月"]},
    "WZGD_4_12": {"trigger_days": 240, "next": ["无砟轨道铺设后12个月以后"]},
    "WZGD_AFTER_12": {"trigger_days": None, "next": None},
    # Static groups (no switching)
    "DZ_CONTINUE": {"trigger_days": None, "next": None},
    "DZ_INTERVAL": {"trigger_days": None, "next": None},
    "STATIC": {"trigger_days": None, "next": None},
}
# ==================== Winter-break labels ====================
# Mode names treated as a winter break (judged via the preceding mode).
WINTER_BREAK_LABELS = {"冬休"}

View File

@@ -16,7 +16,10 @@ from ..services.section_data import SectionDataService
from ..services.account import AccountService from ..services.account import AccountService
from ..models.daily import DailyData from ..models.daily import DailyData
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
from typing import List from ..models.level_data import LevelData
from ..models.error_linecode import ErrorLinecode
from ..services.settlement_data import SettlementDataService
from typing import List, Tuple, Any
from ..utils.construction_monitor import ConstructionMonitorUtils from ..utils.construction_monitor import ConstructionMonitorUtils
import time import time
import json import json
@@ -95,10 +98,8 @@ class TaskScheduler:
# name='每日重置账号更新状态' # name='每日重置账号更新状态'
# ) # )
# logger.info("系统定时任务:每日重置账号更新状态已添加") # logger.info("系统定时任务:每日重置账号更新状态已添加")
# existing_job = None
# existing_job = self.scheduler.get_job("scheduled_get_max_nyid_by_point_id") # existing_job = self.scheduler.get_job("scheduled_get_max_nyid_by_point_id")
# if not existing_job: # if existing_job is None:
# # 添加每天凌晨1点执行获取max NYID关联数据任务
# self.scheduler.add_job( # self.scheduler.add_job(
# scheduled_get_max_nyid_by_point_id, # scheduled_get_max_nyid_by_point_id,
# 'cron', # 'cron',
@@ -234,92 +235,105 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0):
# logger.info(f"DailyData表清空完成共删除{delete_count}条历史记录") # logger.info(f"DailyData表清空完成共删除{delete_count}条历史记录")
# 1. 获取沉降数据(返回 List[List[dict]] # 1. 以 level_data 为来源:每个 linecode 取最新一期NYID 最大),再按该 NYID 从 settlement 取一条
daily_service = DailyDataService()
result = daily_service.get_nyid_by_point_id(db, [], 1)
# 2. 计算到期数据
monitor = ConstructionMonitorUtils()
daily_data = monitor.get_due_data(result,start=start,end=end)
data = daily_data['data']
error_data = daily_data['error_data']
winters = daily_data['winter']
logger.info(f"首次获取数据完成,共{len(result)}条记录")
# 3. 循环处理冬休数据,追溯历史非冬休记录
max_num = 1
print(f"首次获取冬休数据完成,共{len(winters)}条记录")
while 1:
max_num += 1
print(max_num)
# 提取冬休数据的point_id列表
new_list = [int(w['point_id']) for w in winters]
# print(new_list)
if new_list == []:
break
# 获取更多历史记录
nyid_list = daily_service.get_nyid_by_point_id(db, new_list, max_num)
w_list = monitor.get_due_data(nyid_list,start=start,end=end)
# 更新冬休、待处理、错误数据
winters = w_list['winter']
data.extend(w_list['data'])
# 过期数据一并处理
# data.extend(w_list['error_data'])
error_data.extend(w_list['error_data'])
# print(f"第{max_num}次获取冬休数据完成,共{len(winters)}条记录")
if winters == []:
break
data.extend(error_data)
# 4. 初始化服务实例
level_service = LevelDataService() level_service = LevelDataService()
settlement_service = SettlementDataService()
daily_service = DailyDataService()
checkpoint_db = CheckpointService() checkpoint_db = CheckpointService()
section_db = SectionDataService() section_db = SectionDataService()
account_service = AccountService() account_service = AccountService()
# 5. 关联其他表数据(核心逻辑保留) monitor = ConstructionMonitorUtils()
for d in data:
# 处理 LevelData假设返回列表取第一条
level_results = level_service.get_by_nyid(db, d['NYID'])
level_instance = level_results[0] if isinstance(level_results, list) and level_results else level_results
d['level_data'] = level_instance.to_dict() if level_instance else None
# 处理 CheckpointData返回单实例直接使用 logger.info("正在查询水准线路列表…")
linecodes = [r[0] for r in db.query(LevelData.linecode).distinct().all()]
logger.info(f"{len(linecodes)} 个水准线路,开始拉取多期沉降…")
# 每个 linecode 传多期沉降(按 NYID 降序),供冬休「一直推」直到推出或推不出
input_data: List[List[dict]] = []
for idx, linecode in enumerate(linecodes):
level_list = level_service.get_by_linecode(db, linecode)
if not level_list:
continue
# 按 NYID 降序(最新在前),最多取 30 期
level_list_sorted = sorted(
level_list,
key=lambda x: int(x.NYID) if str(x.NYID).isdigit() else 0,
reverse=True,
)[:30]
point_data_for_linecode: List[dict] = []
level_latest = level_list_sorted[0]
for level_instance in level_list_sorted:
nyid = level_instance.NYID
settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid)
if not settlement_dict:
continue
settlement_dict["__linecode"] = linecode
# 仅最新一期带 __level_data供后续写 daily 用
settlement_dict["__level_data"] = level_latest.to_dict()
point_data_for_linecode.append(settlement_dict)
if point_data_for_linecode:
input_data.append(point_data_for_linecode)
if (idx + 1) % 50 == 0 or idx == len(linecodes) - 1:
logger.info(f"已处理线路 {idx + 1}/{len(linecodes)},当前共 {len(input_data)} 条可推理")
logger.info(f"多期沉降拉取完成,共 {len(input_data)} 条线路,开始推理…")
if not input_data:
logger.warning("未找到任何 linecode 对应的最新期沉降数据,跳过写 daily")
db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
db.commit()
db.close()
return
# 2. 计算到期数据remaining / 冬休等)
daily_data = monitor.get_due_data(input_data, start=start, end=end)
logger.info("推理完成")
data = daily_data["data"]
# 冬休一直推、推不出来的 linecode 写入 error_linecode 表(表 id 需为 AUTO_INCREMENT
try:
for linecode in daily_data.get("error_linecodes", []):
if not linecode:
continue
exists = db.query(ErrorLinecode).filter(ErrorLinecode.linecode == linecode).first()
if not exists:
db.add(ErrorLinecode(linecode=linecode))
if daily_data.get("error_linecodes"):
logger.info(f"推理推不出来的线路将写入 error_linecode: {daily_data['error_linecodes']}")
except Exception as elc:
db.rollback()
logger.warning(
f"error_linecode 写入失败(请确保表 id 为 AUTO_INCREMENT: {elc},已跳过,继续写 daily"
)
logger.info(f"按 level_data 最新期获取数据完成,共{len(data)}条有效记录")
# 3. 关联 level / checkpoint / section / accountlevel 已带在 __level_data
for d in data:
d['level_data'] = d.pop('__level_data', None)
d.pop('__linecode', None)
checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id']) checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id'])
d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None
# 处理 SectionData根据checkpoint_data关联
if d['checkpoint_data']: if d['checkpoint_data']:
section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id']) section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id'])
d['section_data'] = section_instance.to_dict() if section_instance else None d['section_data'] = section_instance.to_dict() if section_instance else None
else: else:
d['section_data'] = None d['section_data'] = None
# 处理 AccountData
if d.get('section_data') and d['section_data'].get('account_id'): if d.get('section_data') and d['section_data'].get('account_id'):
account_response = account_service.get_account(db, account_id=d['section_data']['account_id']) account_response = account_service.get_account(db, account_id=d['section_data']['account_id'])
d['account_data'] = account_response.__dict__ if account_response else None d['account_data'] = account_response.__dict__ if account_response else None
else: else:
d['account_data'] = None d['account_data'] = None
print(f"一共有{len(data)}条数据")
# 6. 构造DailyData数据并批量创建 # 4. 构造 DailyData(每条已是「每 linecode 最新一期」)
# daily_create_data1 = set()
daily_create_data = [] daily_create_data = []
nyids = []
for d in data: for d in data:
# 过滤无效数据(避免缺失关键字段报错) if not all(key in d for key in ['NYID', 'point_id', 'remaining']) or not d.get('level_data') or not d.get('account_data') or not d.get('section_data'):
if all(key in d for key in ['NYID', 'point_id','remaining']) and d.get('level_data') and d.get('account_data') and d.get('section_data'): continue
if d['NYID'] in nyids: tem = {
continue 'NYID': d['NYID'],
tem = { 'point_id': d['point_id'],
'NYID': d['NYID'], 'linecode': d['level_data']['linecode'],
'point_id': d['point_id'], 'account_id': d['account_data']['account_id'],
'linecode': d['level_data']['linecode'], 'section_id': d['section_data']['section_id'],
'account_id': d['account_data']['account_id'], 'remaining': (0 - int(d['overdue'])) if 'overdue' in d else d['remaining'],
'section_id': d['section_data']['section_id'], }
'remaining': (0-int(d['overdue'])) if 'overdue' in d else d['remaining'], daily_create_data.append(tem)
}
nyids.append(d['NYID'])
daily_create_data.append(tem)
# 批量创建记录 # 批量创建记录
print(daily_create_data) print(daily_create_data)
if daily_create_data: if daily_create_data:

36
fix_mysql.sh Normal file
View File

@@ -0,0 +1,36 @@
#!/bin/bash
# Fix the MySQL bind-address so the server listens on all interfaces
# (required for Docker containers to reach the host database).
#
# Steps: back up mysqld.cnf, rewrite bind-address, restart MySQL, then
# verify the listener and test a client connection via the docker0 bridge.
echo "=== 修复 MySQL 监听配置 ==="
# 1. Back up the config file with a timestamp suffix.
sudo cp /etc/mysql/mysql.conf.d/mysqld.cnf /etc/mysql/mysql.conf.d/mysqld.cnf.backup.$(date +%Y%m%d_%H%M%S)
echo "✅ 已备份配置文件"
# 2. Rewrite bind-address. Dots are escaped so the pattern matches the
#    literal loopback address only (unescaped "." matches any character).
sudo sed -i 's/^bind-address.*127\.0\.0\.1/bind-address = 0.0.0.0/' /etc/mysql/mysql.conf.d/mysqld.cnf
echo "✅ 已修改 bind-address"
# 3. Show the resulting setting for confirmation.
echo ""
echo "修改后的配置:"
sudo grep "bind-address" /etc/mysql/mysql.conf.d/mysqld.cnf
echo ""
# 4. Restart MySQL and give it a moment to come up.
echo "正在重启 MySQL..."
sudo systemctl restart mysql
sleep 2
# 5. Verify the listener on port 3306.
echo ""
echo "验证 MySQL 监听状态:"
sudo netstat -tlnp | grep 3306
echo ""
# 6. Test a client connection via the docker0 bridge address.
#    Quoted expansion and an emptiness guard: without docker0 the original
#    passed an empty/word-split argument to mysql -h.
DOCKER0_IP=$(ip addr show docker0 2>/dev/null | grep -oP '(?<=inet\s)\d+(\.\d+){3}')
if [ -n "$DOCKER0_IP" ]; then
    echo "测试从本机连接 MySQL (IP: $DOCKER0_IP):"
    mysql -h "$DOCKER0_IP" -u railway -p'Railway01.' -e "SELECT 'Connection OK' as status, DATABASE() as current_db, VERSION() as version;" 2>&1
else
    echo "❌ 无法获取 docker0 IP"
fi
echo ""
echo "=== 修复完成 ==="

27
test_connection.sh Normal file
View File

@@ -0,0 +1,27 @@
#!/bin/bash
# Diagnose Docker <-> host MySQL connectivity: bridge IP, listener state,
# user grants, and an end-to-end connection test from a container.
echo "=== Docker 网络诊断 ==="
echo ""
echo "1. Docker0 网桥 IP:"
ip addr show docker0 2>/dev/null | grep -oP '(?<=inet\s)\d+(\.\d+){3}' || echo "❌ docker0 不存在"
echo ""
echo "2. MySQL 监听状态:"
sudo netstat -tlnp | grep 3306 || echo "❌ MySQL 未运行或未监听 3306"
echo ""
echo "3. MySQL 用户权限:"
mysql -u root -p -e "SELECT user, host FROM mysql.user WHERE user='railway';" 2>/dev/null || echo "❌ 无法查询(需要 root 密码)"
echo ""
echo "4. 测试从容器连接 MySQL:"
# Suppress stderr here too (consistent with step 1) so a missing docker0
# yields an empty variable instead of an error message in the capture.
DOCKER0_IP=$(ip addr show docker0 2>/dev/null | grep -oP '(?<=inet\s)\d+(\.\d+){3}')
if [ -n "$DOCKER0_IP" ]; then
    # Quoted expansion: the original unquoted $DOCKER0_IP would word-split
    # if grep ever matched more than one address.
    docker run --rm mysql:8.0 mysql -h "$DOCKER0_IP" -u railway -p'Railway01.' -e "SELECT 'OK' as status;" 2>&1
else
    echo "❌ 无法获取 docker0 IP"
fi
echo ""
echo "=== 诊断完成 ==="