Compare commits
31 Commits
7091c63be9
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| d80dd0f5ec | |||
| 65744e28dd | |||
| beea67482d | |||
| a577b8ef07 | |||
| 3f7b839e0a | |||
| e5de18e80f | |||
| 08b556fa6e | |||
| cef797782c | |||
| 42dab6f961 | |||
| dd9f31ee52 | |||
| ed0d840aee | |||
| 02bc58a17d | |||
| 3368744d4a | |||
| c26bd4e8e1 | |||
| 6ccda8f8a7 | |||
| fdf07e0dab | |||
| f7e77093df | |||
| e76711af9d | |||
| 539c2941e1 | |||
| 192c299f80 | |||
| cd3ced8833 | |||
| e130134791 | |||
| 28cddd7409 | |||
| 5cfdadf02f | |||
| ebe3f13a58 | |||
| 313ade1a60 | |||
| de3ce23654 | |||
| 6517af823d | |||
| 2eb7b9b5c1 | |||
|
|
8b3796dd5a | ||
|
|
a7b7a786a5 |
@@ -9,7 +9,12 @@ from ..schemas.account import (
|
||||
AccountApiResponse, AccountListResponse,AccountGetRequestYH
|
||||
)
|
||||
from ..services.account import AccountService
|
||||
|
||||
import json
|
||||
from typing import List, Optional
|
||||
from urllib.error import HTTPError, URLError
|
||||
from urllib import request as urllib_request
|
||||
from urllib.parse import urlencode
|
||||
from socket import timeout
|
||||
router = APIRouter(prefix="/accounts", tags=["账号管理"])
|
||||
|
||||
@router.post("/create", response_model=AccountApiResponse, status_code=status.HTTP_201_CREATED)
|
||||
@@ -112,6 +117,7 @@ def update_account(request: AccountUpdateRequest, db: Session = Depends(get_db))
|
||||
|
||||
@router.post("/delete", response_model=AccountApiResponse)
|
||||
def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db)):
|
||||
|
||||
"""删除账号"""
|
||||
if not AccountService.delete_account(db, request.account_id):
|
||||
return AccountApiResponse(
|
||||
@@ -124,3 +130,101 @@ def delete_account(request: AccountDeleteRequest, db: Session = Depends(get_db))
|
||||
message="账号删除成功",
|
||||
data=None
|
||||
)
|
||||
# 获取今日上传的数据接口
|
||||
@router.post("/get_uplaod_data", response_model=AccountListResponse)
|
||||
def get_uplaod_data(request: AccountGetRequest, db: Session = Depends(get_db)):
|
||||
"""根据多种条件查询账号,并合并外部接口的 is_ok 字段(仅返回 today_data 中存在的账号)"""
|
||||
# 1. 从数据库查询账号列表(原有逻辑不变)
|
||||
accounts = AccountService.search_accounts(
|
||||
db,
|
||||
account_id=request.account_id,
|
||||
username=request.username,
|
||||
project_name=request.project_name,
|
||||
status=request.status,
|
||||
today_updated=request.today_updated,
|
||||
yh_id=request.yh_id,
|
||||
cl_name=request.cl_name
|
||||
)
|
||||
|
||||
# 2. 调用外部接口构建 user_is_ok_map(替换为 urllib 实现,修复命名冲突和超时异常)
|
||||
user_is_ok_map = {}
|
||||
api_url = "https://engineering.yuxindazhineng.com/index/index/get_over_data"
|
||||
payload = {"user_id": "68c0dbfdb7cbcd616e7c5ab5"}
|
||||
|
||||
try:
|
||||
# 步骤1:将表单 payload 转为 URL 编码的字节流(urllib POST 要求数据为字节流)
|
||||
payload_bytes = urlencode(payload).encode("utf-8")
|
||||
|
||||
# 步骤2:构造 Request 对象(使用重命名后的 urllib_request,避免冲突)
|
||||
req = urllib_request.Request(
|
||||
url=api_url,
|
||||
data=payload_bytes,
|
||||
method="POST" # 显式指定 POST 方法
|
||||
)
|
||||
|
||||
# 步骤3:发送请求并读取响应(设置 10 秒超时,避免接口挂起)
|
||||
with urllib_request.urlopen(req, timeout=10) as resp:
|
||||
# 读取响应字节流并解码为 UTF-8 字符串
|
||||
response_str = resp.read().decode("utf-8")
|
||||
|
||||
# 步骤4:将 JSON 字符串解析为字典
|
||||
api_response = json.loads(response_str)
|
||||
|
||||
# 步骤5:验证接口返回格式并构建 user_is_ok_map 映射
|
||||
if api_response.get("code") == 0:
|
||||
today_data = api_response.get("data", []) # 给默认值,避免 KeyError
|
||||
for item in today_data:
|
||||
# 安全获取字段并校验类型
|
||||
if 'user_name' in item and 'is_ok' in item:
|
||||
user_is_ok_map[item['user_name']] = item['is_ok']
|
||||
|
||||
except HTTPError as e:
|
||||
# 捕获 HTTP 状态码错误(4xx/5xx)
|
||||
print(f"外部接口 HTTP 错误:{e.code} - {e.reason}")
|
||||
except TimeoutError: # 修复:正确的超时异常名称(首字母大写,原 timeout 会未定义报错)
|
||||
# 捕获请求超时异常
|
||||
print(f"外部接口调用超时(超过 10 秒)")
|
||||
except URLError as e:
|
||||
# 捕获 URL 解析错误、网络连接错误等
|
||||
print(f"外部接口网络错误:{e.reason}")
|
||||
except json.JSONDecodeError:
|
||||
# 捕获非合法 JSON 格式响应
|
||||
print(f"外部接口返回数据格式错误,非合法 JSON 字符串")
|
||||
except Exception as e:
|
||||
# 捕获其他未知异常
|
||||
print(f"外部接口处理未知异常:{str(e)}")
|
||||
|
||||
# 3. 关键修改:仅保留 today_data 中存在的账号(核心过滤逻辑)
|
||||
accounts_with_is_ok = []
|
||||
for account in accounts:
|
||||
# 步骤1:将 AccountResponse 对象转为字典(Pydantic 模型自带 dict() 方法)
|
||||
account_dict = account.dict()
|
||||
|
||||
# 步骤2:获取当前账号 username,判断是否在 user_is_ok_map 中(不存在则跳过)
|
||||
current_username = account_dict.get("username", "")
|
||||
if current_username not in user_is_ok_map:
|
||||
continue # 核心:过滤掉 today_data 中没有的账号
|
||||
|
||||
# 步骤3:给字典添加 is_ok 字段(仅处理存在的账号,无需默认值 0)
|
||||
account_dict['is_ok'] = user_is_ok_map[current_username]
|
||||
|
||||
# 步骤4:将处理后的字典加入新列表
|
||||
accounts_with_is_ok.append(account_dict)
|
||||
|
||||
# 4. 处理空结果返回(原有逻辑不变)
|
||||
if not accounts_with_is_ok:
|
||||
return AccountListResponse(
|
||||
code=ResponseCode.ACCOUNT_NOT_FOUND,
|
||||
message=ResponseMessage.ACCOUNT_NOT_FOUND,
|
||||
total=0,
|
||||
data=[]
|
||||
)
|
||||
|
||||
# 5. 正常返回:数据传入新列表 accounts_with_is_ok(仅包含 today_data 中的账号)
|
||||
# print(accounts_with_is_ok)
|
||||
return AccountListResponse(
|
||||
code=ResponseCode.SUCCESS,
|
||||
message="查询成功",
|
||||
total=len(accounts_with_is_ok),
|
||||
data=accounts_with_is_ok
|
||||
)
|
||||
|
||||
@@ -18,6 +18,7 @@ from ..schemas.comprehensive_data import (
|
||||
SettlementDataCheckpointQueryRequest,
|
||||
LevelDataQueryRequest,
|
||||
LinecodeRequest,
|
||||
LinecodeAccountRequest,
|
||||
NYIDRequest,
|
||||
SectionByAccountRequest,
|
||||
PointByAccountRequest,
|
||||
@@ -30,6 +31,8 @@ from ..services.settlement_data import SettlementDataService
|
||||
from ..services.level_data import LevelDataService
|
||||
from ..services.original_data import OriginalDataService
|
||||
from ..services.comprehensive import ComprehensiveDataService
|
||||
from ..services.account import AccountService
|
||||
from ..utils.get_operating_mode import OperatingModePredictor
|
||||
import logging
|
||||
router = APIRouter(prefix="/comprehensive_data", tags=["综合数据管理"])
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -470,12 +473,14 @@ def get_settlement_by_linecode(
|
||||
|
||||
settlement_service = SettlementDataService()
|
||||
result = settlement_service.get_settlement_by_linecode(db, linecode)
|
||||
|
||||
settlement_data = result['settlement_data']
|
||||
predictor = OperatingModePredictor()
|
||||
result_1d = predictor.predict(settlement_data)
|
||||
return DataResponse(
|
||||
code=ResponseCode.SUCCESS,
|
||||
message=f"查询成功,共获取{len(result['settlement_data'])}条沉降数据",
|
||||
total=len(result['settlement_data']),
|
||||
data=result['settlement_data']
|
||||
total=len(result_1d),
|
||||
data=result_1d
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
@@ -608,8 +613,8 @@ def refresh_today_data(request: TodayDataRequest, db: Session = Depends(get_db))
|
||||
return DataResponse(
|
||||
code=ResponseCode.QUERY_FAILED,
|
||||
message=f"定时任务触发失败:{str(e)}",
|
||||
total=len(daily_data),
|
||||
data={}
|
||||
total=0,
|
||||
data=[]
|
||||
)
|
||||
# account_id获取所有断面数据
|
||||
@router.post("/get_all_section_by_account", response_model=DataResponse)
|
||||
@@ -724,3 +729,66 @@ def get_checkpoint_by_point(request: LevelDataQueryRequest, db: Session = Depend
|
||||
total=0,
|
||||
data=[]
|
||||
)
|
||||
|
||||
@router.post("/get_accounts_by_linecode", response_model=DataResponse)
|
||||
def get_accounts_by_linecode(request: LinecodeAccountRequest, db: Session = Depends(get_db)):
|
||||
"""
|
||||
通过水准线路编码查询账号信息
|
||||
业务逻辑:
|
||||
1. 根据linecode在水准数据表查询最新的NYID
|
||||
2. 根据NYID在沉降数据表查询所有point_id(去重)
|
||||
3. 根据point_id在观测点表查询所有section_id(去重)
|
||||
4. 根据section_id在断面表查询所有account_id(去重)
|
||||
5. 根据account_id在账号表查询账号信息并返回
|
||||
|
||||
优化:使用批量IN查询,避免循环查询数据库
|
||||
"""
|
||||
try:
|
||||
linecode = request.linecode
|
||||
logger.info(f"接口请求:根据linecode={linecode}查询账号信息")
|
||||
|
||||
# 调用服务层方法
|
||||
accounts = AccountService.get_accounts_by_linecode(db, linecode)
|
||||
|
||||
if not accounts:
|
||||
return DataResponse(
|
||||
code=ResponseCode.SUCCESS,
|
||||
message=f"未找到linecode={linecode}对应的账号信息",
|
||||
total=0,
|
||||
data=[]
|
||||
)
|
||||
|
||||
# 转换为字典列表
|
||||
account_data = []
|
||||
for account in accounts:
|
||||
account_dict = {
|
||||
"id": account.id,
|
||||
"username": account.username,
|
||||
"status": account.status,
|
||||
"today_updated": account.today_updated,
|
||||
"project_name": account.project_name,
|
||||
"created_at": account.created_at.strftime("%Y-%m-%d %H:%M:%S") if account.created_at else None,
|
||||
"updated_at": account.updated_at.strftime("%Y-%m-%d %H:%M:%S") if account.updated_at else None,
|
||||
"update_time": account.update_time,
|
||||
"max_variation": account.max_variation,
|
||||
"yh_id": account.yh_id,
|
||||
"cl_name": account.cl_name
|
||||
}
|
||||
account_data.append(account_dict)
|
||||
|
||||
return DataResponse(
|
||||
code=ResponseCode.SUCCESS,
|
||||
message=f"查询成功,共获取{len(account_data)}个账号",
|
||||
total=len(account_data),
|
||||
data=account_data
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"查询账号信息失败:{str(e)}", exc_info=True)
|
||||
return DataResponse(
|
||||
code=ResponseCode.QUERY_FAILED,
|
||||
message=f"查询失败:{str(e)}",
|
||||
total=0,
|
||||
data=[]
|
||||
)
|
||||
|
||||
|
||||
@@ -8,7 +8,11 @@ from ..schemas.level_data import (
|
||||
LevelDataListResponse,
|
||||
LevelDataResponse,
|
||||
BatchDeleteByLinecodesRequest,
|
||||
BatchDeleteByLinecodesResponse
|
||||
BatchDeleteByLinecodesResponse,
|
||||
LinecodeRequest,
|
||||
NyidListResponse,
|
||||
SyncLoseDataRequest,
|
||||
SyncLoseDataResponse,
|
||||
)
|
||||
from ..services.level_data import LevelDataService
|
||||
|
||||
@@ -36,6 +40,31 @@ def get_level_data_by_project(request: LevelDataRequest, db: Session = Depends(g
|
||||
)
|
||||
|
||||
|
||||
@router.post("/get_nyids_by_linecode", response_model=NyidListResponse)
|
||||
def get_nyids_by_linecode(request: LinecodeRequest, db: Session = Depends(get_db)):
|
||||
"""
|
||||
通过水准线路编码返回该线路下所有 NYID,只返回 NYID 列表(去重、按 NYID 降序)
|
||||
"""
|
||||
try:
|
||||
level_service = LevelDataService()
|
||||
level_list = level_service.get_by_linecode(db, linecode=request.linecode)
|
||||
nyids = sorted(
|
||||
{str(item.NYID) for item in level_list if item.NYID},
|
||||
key=lambda x: int(x) if str(x).isdigit() else 0,
|
||||
reverse=True,
|
||||
)
|
||||
return NyidListResponse(
|
||||
code=ResponseCode.SUCCESS,
|
||||
message=ResponseMessage.SUCCESS,
|
||||
data=nyids,
|
||||
)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"查询失败: {str(e)}",
|
||||
)
|
||||
|
||||
|
||||
@router.post("/batch_delete_by_linecodes", response_model=BatchDeleteByLinecodesResponse)
|
||||
def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Session = Depends(get_db)):
|
||||
"""
|
||||
@@ -59,3 +88,39 @@ def batch_delete_by_linecodes(request: BatchDeleteByLinecodesRequest, db: Sessio
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"批量删除失败: {str(e)}"
|
||||
)
|
||||
|
||||
|
||||
@router.post("/sync_lose_data", response_model=SyncLoseDataResponse)
|
||||
def sync_lose_data(
|
||||
request: SyncLoseDataRequest = SyncLoseDataRequest(),
|
||||
db: Session = Depends(get_db),
|
||||
):
|
||||
"""
|
||||
同步缺失数据到 lose_data 表。
|
||||
- 不传 linecode:按所有水准线路的 NYID 计算缺失并写入,仅返回是否处理成功。
|
||||
- 传 linecode:只处理该水准线路,并返回该线路的缺失数据记录列表。
|
||||
缺失规则:原始数据无=1、沉降数据无=2,lose_data 为二者之和(0/1/2/3)。
|
||||
"""
|
||||
try:
|
||||
level_service = LevelDataService()
|
||||
result = level_service.sync_lose_data(db, linecode=request.linecode)
|
||||
if result.get("data") is None:
|
||||
data = {"success": result["success"]}
|
||||
if not result["success"]:
|
||||
return SyncLoseDataResponse(
|
||||
code=1,
|
||||
message=result.get("message", "处理失败"),
|
||||
data=data,
|
||||
)
|
||||
else:
|
||||
data = result["data"]
|
||||
return SyncLoseDataResponse(
|
||||
code=0 if result["success"] else 1,
|
||||
message=ResponseMessage.SUCCESS if result["success"] else (result.get("message") or "处理失败"),
|
||||
data=data,
|
||||
)
|
||||
except Exception as e:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
||||
detail=f"同步缺失数据失败: {str(e)}",
|
||||
)
|
||||
@@ -235,10 +235,11 @@ def receive_after_cursor_execute(conn, cursor, statement, params, context, execu
|
||||
log_transaction_end(success=True)
|
||||
|
||||
@event.listens_for(Engine, "handle_error")
|
||||
def receive_handle_error(exception, context):
|
||||
"""错误监听"""
|
||||
error_msg = str(exception)
|
||||
sql = context.statement if context and hasattr(context, 'statement') else None
|
||||
def receive_handle_error(context):
|
||||
"""错误监听:SQLAlchemy 只传入一个 ExceptionContext 参数"""
|
||||
exception = getattr(context, "original_exception", None) or getattr(context, "sqlalchemy_exception", None)
|
||||
error_msg = str(exception) if exception else str(context)
|
||||
sql = getattr(context, "statement", None)
|
||||
log_connection_error(error_msg, sql)
|
||||
log_transaction_end(success=False, error=error_msg)
|
||||
|
||||
|
||||
@@ -17,6 +17,10 @@ class Account(Base):
|
||||
max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米")
|
||||
yh_id = Column(String(1000), comment="宇恒一号用户id")
|
||||
cl_name = Column(String(100), nullable=True, comment="测量人员")
|
||||
device_name = Column(String(1000), comment="设备名称")
|
||||
device_port = Column(String(1000), comment="设备端口")
|
||||
device_ip = Column(String(1000), comment="设备局域网内ip地址")
|
||||
is_ok = Column(Integer, default=0, comment="是否可以上传")
|
||||
|
||||
|
||||
# 模型转字典
|
||||
|
||||
10
app/models/error_linecode.py
Normal file
10
app/models/error_linecode.py
Normal file
@@ -0,0 +1,10 @@
|
||||
from sqlalchemy import Column, Integer, String
|
||||
from ..core.database import Base
|
||||
|
||||
|
||||
class ErrorLinecode(Base):
|
||||
"""推理推不出来的水准线路(如全为冬休),记录到该表。数据库表 id 必须为 AUTO_INCREMENT。"""
|
||||
__tablename__ = "error_linecode"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
|
||||
linecode = Column(String(255), nullable=False, comment="水准线路编码", index=True)
|
||||
22
app/models/lose_data.py
Normal file
22
app/models/lose_data.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from sqlalchemy import Column, Integer, String
|
||||
from ..core.database import Base
|
||||
|
||||
|
||||
class LoseData(Base):
|
||||
"""缺失数据记录表:记录各水准线路(期数)的原始/沉降数据缺失情况"""
|
||||
__tablename__ = "lose_data"
|
||||
__table_args__ = {"extend_existing": True}
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True, autoincrement=True, comment="ID")
|
||||
account_id = Column(Integer, nullable=False, comment="账户id", index=True)
|
||||
NYID = Column(String(100), nullable=False, comment="期数ID", index=True)
|
||||
linecode = Column(String(255), nullable=False, default="0", comment="水准线路编码", index=True)
|
||||
lose_data = Column(Integer, nullable=False, default=0, comment="缺失的数据,默认是0")
|
||||
section_id = Column(String(255), nullable=True, comment="所属断面id")
|
||||
point_id = Column(String(100), nullable=False, comment="测点ID")
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
column.name: getattr(self, column.name)
|
||||
for column in self.__table__.columns
|
||||
}
|
||||
@@ -12,6 +12,10 @@ class AccountBase(BaseModel):
|
||||
max_variation: Optional[int] = None
|
||||
yh_id: Optional[str] = None
|
||||
cl_name: Optional[str] = None
|
||||
device_name: Optional[str] = None
|
||||
device_port: Optional[str] = None
|
||||
device_ip: Optional[str] = None
|
||||
is_ok: Optional[int] = None
|
||||
|
||||
class AccountCreate(AccountBase):
|
||||
pass
|
||||
@@ -24,6 +28,10 @@ class AccountUpdate(BaseModel):
|
||||
project_name: Optional[str] = None
|
||||
update_time: Optional[str] = None
|
||||
cl_name: Optional[str] = None
|
||||
device_name: Optional[str] = None
|
||||
device_port: Optional[str] = None
|
||||
device_ip: Optional[str] = None
|
||||
is_ok: Optional[int] = None
|
||||
|
||||
class AccountResponse(AccountBase):
|
||||
account_id: int
|
||||
@@ -49,7 +57,11 @@ class AccountResponse(AccountBase):
|
||||
update_time=account.update_time,
|
||||
max_variation=account.max_variation,
|
||||
yh_id=account.yh_id,
|
||||
cl_name=account.cl_name
|
||||
cl_name=account.cl_name,
|
||||
device_name=account.device_name,
|
||||
device_port=account.device_port,
|
||||
device_ip=account.device_ip,
|
||||
is_ok = account.is_ok
|
||||
)
|
||||
|
||||
class AccountListRequest(BaseModel):
|
||||
@@ -91,3 +103,4 @@ class AccountListResponse(BaseModel):
|
||||
message: str
|
||||
total: int
|
||||
data: List[AccountResponse] = []
|
||||
|
||||
|
||||
@@ -290,6 +290,10 @@ class ComprehensiveDataImportRequest(BaseModel):
|
||||
data: Dict[str, Any]
|
||||
class LinecodeRequest(BaseModel):
|
||||
linecode: str
|
||||
|
||||
class LinecodeAccountRequest(BaseModel):
|
||||
linecode: str
|
||||
|
||||
class ComprehensiveDataImportResponse(BaseModel):
|
||||
success: bool
|
||||
message: str
|
||||
|
||||
@@ -52,3 +52,40 @@ class BatchDeleteByLinecodesResponse(BaseModel):
|
||||
success: bool
|
||||
backup_file: Optional[str] = None
|
||||
deleted_counts: Optional[dict] = None
|
||||
|
||||
|
||||
class LinecodeRequest(BaseModel):
|
||||
"""按水准线路编码查询请求"""
|
||||
linecode: str = Field(..., description="水准线路编码")
|
||||
|
||||
|
||||
class NyidListResponse(BaseModel):
|
||||
"""仅返回 NYID 列表的响应"""
|
||||
code: int = 0
|
||||
message: str
|
||||
data: List[str] = Field(default_factory=list, description="NYID 列表")
|
||||
|
||||
|
||||
class SyncLoseDataRequest(BaseModel):
|
||||
"""同步缺失数据请求:不传 linecode 表示全量同步,传则只处理该线路"""
|
||||
linecode: Optional[str] = Field(None, description="水准线路编码,不传则处理全部线路")
|
||||
|
||||
|
||||
class LoseDataItem(BaseModel):
|
||||
"""lose_data 表单条记录"""
|
||||
id: int
|
||||
account_id: int
|
||||
NYID: str
|
||||
linecode: str
|
||||
lose_data: int
|
||||
section_id: Optional[str] = None
|
||||
point_id: str = ""
|
||||
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
|
||||
class SyncLoseDataResponse(BaseModel):
|
||||
"""同步缺失数据响应:全量时 data 为是否成功,单线路时 data 为该线路缺失记录列表"""
|
||||
code: int = 0
|
||||
message: str
|
||||
data: Any = None # 全量时为 {"success": bool},单线路时为 List[LoseDataItem]
|
||||
@@ -103,3 +103,84 @@ class AccountService:
|
||||
db.commit()
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def get_accounts_by_linecode(db: Session, linecode: str) -> List[Account]:
|
||||
"""
|
||||
通过水准线路编码查询账号信息(优化版,减少数据库查询次数)
|
||||
业务逻辑:
|
||||
1. 根据linecode在水准数据表查询最新的NYID
|
||||
2. 根据NYID在沉降数据表批量查询所有point_id(去重)
|
||||
3. 根据point_id列表在观测点表批量查询所有section_id(去重)
|
||||
4. 根据section_id列表在断面表批量查询所有account_id(去重)
|
||||
5. 根据account_id列表在账号表批量查询账号信息
|
||||
|
||||
使用IN查询避免循环,大幅提升性能
|
||||
"""
|
||||
from ..models.level_data import LevelData
|
||||
from ..models.settlement_data import SettlementData
|
||||
from ..models.checkpoint import Checkpoint
|
||||
from ..models.section_data import SectionData
|
||||
|
||||
try:
|
||||
logger.info(f"开始通过linecode={linecode}查询账号信息")
|
||||
|
||||
# 1. 根据linecode查询最新的水准数据(按NYID降序取第一条)
|
||||
level_data = db.query(LevelData).filter(
|
||||
LevelData.linecode == linecode
|
||||
).order_by(LevelData.NYID.desc()).first()
|
||||
|
||||
if not level_data:
|
||||
logger.warning(f"未找到linecode={linecode}对应的水准数据")
|
||||
return []
|
||||
|
||||
nyid = level_data.NYID
|
||||
logger.info(f"找到最新期数NYID={nyid}")
|
||||
|
||||
# 2. 根据NYID批量查询沉降数据,提取所有point_id(去重)
|
||||
settlement_list = db.query(SettlementData.point_id).filter(
|
||||
SettlementData.NYID == nyid
|
||||
).distinct().all()
|
||||
|
||||
if not settlement_list:
|
||||
logger.warning(f"未找到NYID={nyid}对应的沉降数据")
|
||||
return []
|
||||
|
||||
point_ids = [s.point_id for s in settlement_list if s.point_id]
|
||||
logger.info(f"找到{len(point_ids)}个观测点ID")
|
||||
|
||||
# 3. 根据point_id列表批量查询观测点数据,提取所有section_id(去重)
|
||||
checkpoint_list = db.query(Checkpoint.section_id).filter(
|
||||
Checkpoint.point_id.in_(point_ids)
|
||||
).distinct().all()
|
||||
|
||||
if not checkpoint_list:
|
||||
logger.warning(f"未找到对应的观测点数据")
|
||||
return []
|
||||
|
||||
section_ids = [c.section_id for c in checkpoint_list if c.section_id]
|
||||
logger.info(f"找到{len(section_ids)}个断面ID")
|
||||
|
||||
# 4. 根据section_id列表批量查询断面数据,提取所有account_id(去重)
|
||||
section_list = db.query(SectionData.account_id).filter(
|
||||
SectionData.section_id.in_(section_ids)
|
||||
).distinct().all()
|
||||
|
||||
if not section_list:
|
||||
logger.warning(f"未找到对应的断面数据")
|
||||
return []
|
||||
|
||||
account_ids = [s.account_id for s in section_list if s.account_id]
|
||||
logger.info(f"找到{len(account_ids)}个账号ID")
|
||||
|
||||
# 5. 根据account_id列表批量查询账号信息
|
||||
accounts = db.query(Account).filter(
|
||||
Account.id.in_(account_ids)
|
||||
).all()
|
||||
|
||||
logger.info(f"查询完成,共找到{len(accounts)}个账号")
|
||||
return accounts
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"通过linecode={linecode}查询账号失败: {str(e)}", exc_info=True)
|
||||
raise e
|
||||
|
||||
@@ -115,11 +115,8 @@ class DailyDataService(BaseService[DailyData]):
|
||||
# 模型字段列表
|
||||
model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns]
|
||||
|
||||
# 基础条件
|
||||
base_conditions = [
|
||||
SettlementData.useflag.isnot(None),
|
||||
SettlementData.useflag != 0
|
||||
]
|
||||
# 基础条件:不按 useflag 过滤,确保能取到每个 point 的真正最新一期(按 NYID 最大)
|
||||
base_conditions = []
|
||||
if point_ids:
|
||||
base_conditions.append(SettlementData.point_id.in_(point_ids))
|
||||
|
||||
@@ -354,4 +351,5 @@ class DailyDataService(BaseService[DailyData]):
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True)
|
||||
|
||||
raise
|
||||
@@ -1,12 +1,13 @@
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import text, inspect
|
||||
from typing import List, Optional, Dict, Any
|
||||
from typing import List, Optional, Dict, Any, Tuple
|
||||
from ..models.level_data import LevelData
|
||||
from .base import BaseService
|
||||
from ..models.settlement_data import SettlementData
|
||||
from ..models.checkpoint import Checkpoint
|
||||
from ..models.section_data import SectionData
|
||||
from ..models.account import Account
|
||||
from ..models.lose_data import LoseData
|
||||
from ..core.database import engine
|
||||
import logging
|
||||
import os
|
||||
@@ -30,6 +31,12 @@ class LevelDataService(BaseService[LevelData]):
|
||||
"""根据水准线路编码获取水准数据"""
|
||||
return self.get_by_field(db, "linecode", linecode)
|
||||
|
||||
def get_last_by_linecode(self, db: Session, linecode: str) -> Optional[LevelData]:
|
||||
"""根据水准线路编码获取最新的水准数据(按NYID降序)"""
|
||||
return db.query(LevelData).filter(
|
||||
LevelData.linecode == linecode
|
||||
).order_by(LevelData.NYID.desc()).first()
|
||||
|
||||
def search_level_data(self, db: Session,
|
||||
id: Optional[str] = None,
|
||||
linecode: Optional[str] = None,
|
||||
@@ -458,6 +465,137 @@ class LevelDataService(BaseService[LevelData]):
|
||||
|
||||
return original_data_map
|
||||
|
||||
def _get_nyids_with_settlement(self, db: Session, nyid_list: List[str]) -> set:
|
||||
"""返回在沉降表中存在记录的 NYID 集合"""
|
||||
if not nyid_list:
|
||||
return set()
|
||||
rows = db.query(SettlementData.NYID).filter(SettlementData.NYID.in_(nyid_list)).distinct().all()
|
||||
return {r[0] for r in rows if r[0]}
|
||||
|
||||
def _get_nyid_to_all_points_account_section(
|
||||
self, db: Session, nyid_list: List[str]
|
||||
) -> Dict[str, List[Tuple[str, int, Optional[str]]]]:
|
||||
"""通过沉降 -> 观测点 -> 断面 得到每个 NYID 对应的【所有】测点列表 [(point_id, account_id, section_id), ...],无沉降的 NYID 返回 [('', 0, None)]"""
|
||||
if not nyid_list:
|
||||
return {}
|
||||
default_list = [("", 0, None)]
|
||||
settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(nyid_list)).all()
|
||||
# 每个 NYID 对应该期数下所有出现过的 (point_id),去重但保留顺序
|
||||
nyid_to_points: Dict[str, List[Tuple[str, int, Optional[str]]]] = {}
|
||||
point_ids = list({s.point_id for s in settlements if s.point_id})
|
||||
if not point_ids:
|
||||
return {nyid: default_list for nyid in nyid_list}
|
||||
checkpoints = db.query(Checkpoint).filter(Checkpoint.point_id.in_(point_ids)).all()
|
||||
point_to_section = {c.point_id: (c.section_id or None) for c in checkpoints}
|
||||
section_ids = list({c.section_id for c in checkpoints if c.section_id})
|
||||
point_to_account: Dict[str, int] = {}
|
||||
if section_ids:
|
||||
sections = db.query(SectionData).filter(SectionData.section_id.in_(section_ids)).all()
|
||||
for c in checkpoints:
|
||||
sec = next((s for s in sections if s.section_id == c.section_id), None)
|
||||
if sec and sec.account_id is not None:
|
||||
try:
|
||||
point_to_account[c.point_id] = int(sec.account_id)
|
||||
except (ValueError, TypeError):
|
||||
point_to_account[c.point_id] = 0
|
||||
# 按 NYID 分组,每个 NYID 下该期数出现的所有 point_id(去重)
|
||||
for nyid in nyid_list:
|
||||
nyid_to_points[nyid] = []
|
||||
seen_per_nyid: Dict[str, set] = {nyid: set() for nyid in nyid_list}
|
||||
for s in settlements:
|
||||
pt_id = (s.point_id or "") if s.point_id else ""
|
||||
if pt_id not in seen_per_nyid.get(s.NYID, set()):
|
||||
seen_per_nyid[s.NYID].add(pt_id)
|
||||
acc = point_to_account.get(s.point_id, 0)
|
||||
sec_id = point_to_section.get(s.point_id)
|
||||
nyid_to_points[s.NYID].append((pt_id, acc, sec_id))
|
||||
for nyid in nyid_list:
|
||||
if not nyid_to_points[nyid]:
|
||||
nyid_to_points[nyid] = default_list
|
||||
return nyid_to_points
|
||||
|
||||
def _sync_lose_data_for_one_linecode(
|
||||
self,
|
||||
db: Session,
|
||||
linecode_val: str,
|
||||
level_list: List[LevelData],
|
||||
) -> None:
|
||||
"""仅处理一个水准线路编码:查该线路的 NYID,查沉降/原始,写入 lose_data。"""
|
||||
pairs = [(item.linecode, str(item.NYID)) for item in level_list if item.NYID]
|
||||
if not pairs:
|
||||
return
|
||||
nyid_list = list({nyid for _, nyid in pairs})
|
||||
settlement_nyids = self._get_nyids_with_settlement(db, nyid_list)
|
||||
original_data_map = self._find_original_data_by_nyids(db, nyid_list)
|
||||
original_nyids = set()
|
||||
for rows in original_data_map.values():
|
||||
for row in rows:
|
||||
n = row.get("NYID")
|
||||
if n is not None:
|
||||
original_nyids.add(str(n))
|
||||
nyid_to_points_asp = self._get_nyid_to_all_points_account_section(db, nyid_list)
|
||||
for linecode_, nyid in pairs:
|
||||
has_original = nyid in original_nyids
|
||||
has_settlement = nyid in settlement_nyids
|
||||
lose_val = (0 if has_original else 1) + (0 if has_settlement else 2)
|
||||
points_list = nyid_to_points_asp.get(nyid, [("", 0, None)])
|
||||
for point_id, acc_id, sec_id in points_list:
|
||||
pt_id = point_id or ""
|
||||
existing = db.query(LoseData).filter(
|
||||
LoseData.linecode == linecode_,
|
||||
LoseData.NYID == nyid,
|
||||
LoseData.point_id == pt_id,
|
||||
).first()
|
||||
if existing:
|
||||
existing.lose_data = lose_val
|
||||
existing.account_id = acc_id
|
||||
existing.section_id = sec_id
|
||||
else:
|
||||
db.add(LoseData(
|
||||
account_id=acc_id,
|
||||
NYID=nyid,
|
||||
linecode=linecode_,
|
||||
lose_data=lose_val,
|
||||
section_id=sec_id,
|
||||
point_id=pt_id,
|
||||
))
|
||||
|
||||
def sync_lose_data(self, db: Session, linecode: Optional[str] = None) -> Dict[str, Any]:
|
||||
"""
|
||||
同步缺失数据到 lose_data 表。
|
||||
无 linecode:按「每个水准线路编码」分批处理,每批只查该线路的 NYID 再查沉降/原始并插入,不返回明细。
|
||||
有 linecode:只处理该线路,并返回该线路在 lose_data 中的记录列表。
|
||||
缺失规则:原始数据无=1、有=0;沉降数据无=2、有=0;lose_data 字段为二者之和 0/1/2/3。
|
||||
同一 (linecode, NYID) 不重复插入,存在则更新。
|
||||
"""
|
||||
try:
|
||||
if linecode:
|
||||
level_list = self.get_by_linecode(db, linecode=linecode)
|
||||
if not level_list:
|
||||
return {"success": True, "data": []}
|
||||
self._sync_lose_data_for_one_linecode(db, linecode, level_list)
|
||||
db.commit()
|
||||
records = db.query(LoseData).filter(LoseData.linecode == linecode).order_by(LoseData.NYID.desc()).all()
|
||||
return {"success": True, "data": [r.to_dict() for r in records]}
|
||||
|
||||
# 全量:先取所有不重复的 linecode,再按每个 linecode 分批处理
|
||||
linecode_rows = db.query(LevelData.linecode).distinct().all()
|
||||
linecodes = [r[0] for r in linecode_rows if r[0]]
|
||||
if not linecodes:
|
||||
return {"success": True, "data": None}
|
||||
|
||||
for lc in linecodes:
|
||||
level_list = self.get_by_linecode(db, linecode=lc)
|
||||
self._sync_lose_data_for_one_linecode(db, lc, level_list)
|
||||
db.commit()
|
||||
logger.info(f"sync_lose_data 已处理线路: {lc}")
|
||||
|
||||
return {"success": True, "data": None}
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"sync_lose_data 失败: {str(e)}", exc_info=True)
|
||||
return {"success": False, "data": [] if linecode else None, "message": str(e)}
|
||||
|
||||
def _backup_data_to_sql(self, db: Session, level_data_list: List[LevelData],
|
||||
settlement_list: List[SettlementData],
|
||||
checkpoint_list: List[Checkpoint],
|
||||
|
||||
@@ -30,6 +30,21 @@ class SettlementDataService(BaseService[SettlementData]):
|
||||
def get_by_nyid(self, db: Session, nyid: str) -> List[SettlementData]:
|
||||
"""根据期数ID获取沉降数据"""
|
||||
return self.get_by_field(db, "NYID", nyid)
|
||||
|
||||
def get_one_dict_by_nyid(self, db: Session, nyid: str) -> Optional[Dict[str, Any]]:
|
||||
"""根据期数ID取一条沉降记录并转为字典(供推理/到期计算用,datetime 转为字符串)"""
|
||||
row = db.query(SettlementData).filter(SettlementData.NYID == nyid).first()
|
||||
if not row:
|
||||
return None
|
||||
field_names = [c.name for c in SettlementData.__table__.columns]
|
||||
item = {}
|
||||
for k in field_names:
|
||||
v = getattr(row, k)
|
||||
if isinstance(v, datetime):
|
||||
item[k] = v.strftime("%Y-%m-%d %H:%M:%S")
|
||||
else:
|
||||
item[k] = v
|
||||
return item
|
||||
def get_by_nyid_and_point_id(self, db: Session, nyid: str, point_id: str) -> List[SettlementData]:
|
||||
"""根据期数ID和观测点ID获取沉降数据"""
|
||||
return self.get_by_field(db, "NYID", nyid, "point_id", point_id)
|
||||
@@ -600,11 +615,120 @@ class SettlementDataService(BaseService[SettlementData]):
|
||||
}
|
||||
|
||||
# 根据水准线路编码获取最新的NYID并获取对应的测点数据
|
||||
# def get_settlement_by_linecode(
|
||||
# self,
|
||||
# db: Session,
|
||||
# linecode: str,
|
||||
# num: int = 1000 # 控制返回的期数,默认1(最新一期)
|
||||
# ) -> Dict:
|
||||
# """
|
||||
# 根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选
|
||||
# 关联逻辑:
|
||||
# LevelData.linecode → LevelData.NYID → SettlementData.NYID
|
||||
# SettlementData.point_id(字符串)→ Checkpoint.point_id → Checkpoint.section_id → SectionData.section_id → SectionData.work_site
|
||||
# :param db: 数据库会话
|
||||
# :param linecode: 目标水准线路编码
|
||||
# :param num: 返回的期数(按NYID从大到小排序),默认1(最新一期)
|
||||
# :return: 字典格式,包含沉降数据列表,键为 "settlement_data"
|
||||
# """
|
||||
# try:
|
||||
# logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)")
|
||||
|
||||
# # 1. 根据linecode查询水准线路表,获取前N期的NYID
|
||||
# nyid_query = db.query(LevelData.NYID)\
|
||||
# .filter(LevelData.linecode == linecode)\
|
||||
# .distinct()\
|
||||
# .order_by(LevelData.NYID.desc())
|
||||
|
||||
# top_nyids = nyid_query.limit(num).all()
|
||||
# if not top_nyids:
|
||||
# logger.warning(f"未查询到linecode={linecode}对应的水准线路记录")
|
||||
# return {"settlement_data": []}
|
||||
|
||||
# target_nyids = [item.NYID for item in top_nyids]
|
||||
|
||||
# # 2. 关联查询:沉降数据 → 观测点表 → 断面表(新增查询Checkpoint.aname)
|
||||
# settlement_records = db.query(
|
||||
# SettlementData,
|
||||
# Checkpoint.section_id, # 从Checkpoint模型获取section_id
|
||||
# Checkpoint.aname, # 新增:从Checkpoint模型获取测点名称aname
|
||||
# SectionData.work_site # 从SectionData模型获取work_site
|
||||
# )\
|
||||
# .join(
|
||||
# Checkpoint, # 关联观测点模型(类名)
|
||||
# SettlementData.point_id == Checkpoint.point_id, # 字符串类型匹配
|
||||
# isouter=True # 左连接:避免测点未关联观测点时丢失数据
|
||||
# )\
|
||||
# .join(
|
||||
# SectionData, # 关联断面模型(类名)
|
||||
# Checkpoint.section_id == SectionData.section_id, # 字符串类型匹配
|
||||
# isouter=True # 左连接:避免断面ID未关联断面表时丢失数据
|
||||
# )\
|
||||
# .filter(SettlementData.NYID.in_(target_nyids))\
|
||||
# .order_by(
|
||||
# SettlementData.NYID.desc(), # 期数从新到旧
|
||||
# SettlementData.MTIME_W.asc() # 同期内按观测时间升序
|
||||
# )\
|
||||
# .all()
|
||||
|
||||
# # 3. 转换数据并新增字段
|
||||
# settlement_data = []
|
||||
# for record in settlement_records:
|
||||
# # 解析查询结果(元组:(沉降数据实例, section_id, aname, work_site))
|
||||
# settlement, section_id, aname, work_site = record
|
||||
|
||||
# # 根据work_site判断work_type(默认0表示未匹配或无数据) 涵洞H 沉降板L 观测桩G和Z(分标段) B 路基
|
||||
# work_type = 0
|
||||
# if work_site:
|
||||
# work_site_str = str(work_site).strip() # 确保为字符串且去空格
|
||||
# if "S" in aname:
|
||||
# work_type = 1
|
||||
# elif "L" in aname or "G" in aname or "Z" in aname or "B" in aname:
|
||||
# work_type = 2
|
||||
# elif "T" in aname or "D" in aname or "C " in aname:
|
||||
# work_type = 3
|
||||
# elif "H" in aname :
|
||||
# work_type = 4
|
||||
# # 组装返回字典(新增aname字段)
|
||||
# record_dict = {
|
||||
# "id": settlement.id,
|
||||
# "point_id": settlement.point_id,
|
||||
# "aname": aname, # 新增:测点名称(从Checkpoint表获取)
|
||||
# "section_id": section_id, # 新增:观测点关联的断面ID
|
||||
# "work_site": work_site, # 新增:断面的工点信息
|
||||
# "work_type": work_type, # 新增:工点类型编码(1-隧道,2-区间路基,3-桥,4-)
|
||||
# "CVALUE": settlement.CVALUE,
|
||||
# "MAVALUE": settlement.MAVALUE,
|
||||
# "MTIME_W": settlement.MTIME_W.strftime("%Y-%m-%d %H:%M:%S") if settlement.MTIME_W else None,
|
||||
# "NYID": settlement.NYID,
|
||||
# "PRELOADH": settlement.PRELOADH,
|
||||
# "PSTATE": settlement.PSTATE,
|
||||
# "REMARK": settlement.REMARK,
|
||||
# "WORKINFO": settlement.WORKINFO,
|
||||
# "createdate": settlement.createdate.strftime("%Y-%m-%d %H:%M:%S") if settlement.createdate else None,
|
||||
# "day": settlement.day,
|
||||
# "day_jg": settlement.day_jg,
|
||||
# "isgzjdxz": settlement.isgzjdxz,
|
||||
# "mavalue_bc": settlement.mavalue_bc,
|
||||
# "mavalue_lj": settlement.mavalue_lj,
|
||||
# "sjName": settlement.sjName,
|
||||
# "useflag": settlement.useflag,
|
||||
# "workinfoname": settlement.workinfoname,
|
||||
# "upd_remark": settlement.upd_remark
|
||||
# }
|
||||
# settlement_data.append(record_dict)
|
||||
|
||||
# logger.info(f"查询完成,linecode={linecode}的前{num}期对应{len(settlement_data)}条沉降数据")
|
||||
# return {"settlement_data": settlement_data}
|
||||
|
||||
# except Exception as e:
|
||||
# logger.error(f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True)
|
||||
# raise e
|
||||
def get_settlement_by_linecode(
|
||||
self,
|
||||
db: Session,
|
||||
linecode: str,
|
||||
num: int = 1 # 控制返回的期数,默认1(最新一期)
|
||||
num: int = 1000 # 控制返回的期数,默认1000(最新1000期)
|
||||
) -> Dict:
|
||||
"""
|
||||
根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选
|
||||
@@ -613,8 +737,8 @@ class SettlementDataService(BaseService[SettlementData]):
|
||||
SettlementData.point_id(字符串)→ Checkpoint.point_id → Checkpoint.section_id → SectionData.section_id → SectionData.work_site
|
||||
:param db: 数据库会话
|
||||
:param linecode: 目标水准线路编码
|
||||
:param num: 返回的期数(按NYID从大到小排序),默认1(最新一期)
|
||||
:return: 字典格式,包含沉降数据列表,键为 "settlement_data"
|
||||
:param num: 返回的期数(按NYID从大到小排序),默认1000(最新1000期)
|
||||
:return: 字典格式,包含分组后的沉降数据嵌套列表,键为 "settlement_data"
|
||||
"""
|
||||
try:
|
||||
logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)")
|
||||
@@ -656,8 +780,8 @@ class SettlementDataService(BaseService[SettlementData]):
|
||||
)\
|
||||
.all()
|
||||
|
||||
# 3. 转换数据并新增字段
|
||||
settlement_data = []
|
||||
# 3. 转换数据并新增字段(先组装成原始记录列表)
|
||||
raw_settlement_data = []
|
||||
for record in settlement_records:
|
||||
# 解析查询结果(元组:(沉降数据实例, section_id, aname, work_site))
|
||||
settlement, section_id, aname, work_site = record
|
||||
@@ -701,10 +825,28 @@ class SettlementDataService(BaseService[SettlementData]):
|
||||
"workinfoname": settlement.workinfoname,
|
||||
"upd_remark": settlement.upd_remark
|
||||
}
|
||||
settlement_data.append(record_dict)
|
||||
raw_settlement_data.append(record_dict)
|
||||
|
||||
logger.info(f"查询完成,linecode={linecode}的前{num}期对应{len(settlement_data)}条沉降数据")
|
||||
return {"settlement_data": settlement_data}
|
||||
# 4. 按point_id分组,且每个分组内按NYID降序排序(核心修改部分)
|
||||
point_group_dict = {}
|
||||
for item in raw_settlement_data:
|
||||
point_id = item["point_id"]
|
||||
# 若该point_id未在字典中,初始化空列表
|
||||
if point_id not in point_group_dict:
|
||||
point_group_dict[point_id] = []
|
||||
# 将当前记录添加到对应point_id的列表中
|
||||
point_group_dict[point_id].append(item)
|
||||
|
||||
# 对每个分组内的记录按NYID降序排序(保障大的NYID在第一个)
|
||||
for point_id, item_list in point_group_dict.items():
|
||||
# 按NYID降序排序,NYID是数字,直接比较即可
|
||||
item_list.sort(key=lambda x: x["NYID"], reverse=True)
|
||||
|
||||
# 5. 提取字典的值,组成最终的嵌套列表
|
||||
final_settlement_data = list(point_group_dict.values())
|
||||
|
||||
logger.info(f"查询完成,linecode={linecode}的前{num}期对应{len(raw_settlement_data)}条沉降数据,共{len(final_settlement_data)}个不同测点")
|
||||
return {"settlement_data": final_settlement_data}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"查询linecode={linecode}的沉降数据失败:{str(e)}", exc_info=True)
|
||||
|
||||
@@ -5,53 +5,16 @@ import copy
|
||||
# 注意:根据实际项目路径调整导入,若本地测试可注释掉
|
||||
from ..core.logging_config import get_logger
|
||||
import json
|
||||
|
||||
from .operating_mode_config import BASE_PERIODS
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class ConstructionMonitorUtils:
|
||||
def __init__(self):
|
||||
# 原始工况周期映射表(保持不变)
|
||||
self.base_periods = {
|
||||
"仰拱(底板)施工完成后,第1个月": 7,
|
||||
"仰拱(底板)施工完成后,第2至3个月": 14,
|
||||
"仰拱(底板)施工完成后,3个月以后": 30,
|
||||
"仰拱(底板)施工完成后,第1个月": 7, # 原:仰拱(底板)施工完成后,第1个月
|
||||
"仰拱(底板)施工完成后,第2至3个月": 14, # 原:仰拱(底板)施工完成后,第2至3个月
|
||||
"仰拱(底板)施工完成后,3个月以后": 30, # 原:仰拱(底板)施工完成后,3个月以后
|
||||
"无砟轨道铺设后,第1至3个月": 30, # 原:无砟轨道铺设后,第1至3个月
|
||||
"无砟轨道铺设后,4至12个月": 90, # 原:无砟轨道铺设后,4至12个月
|
||||
"无砟轨道铺设后,12个月以后": 180, # 原:无砟轨道铺设后,12个月以后
|
||||
"墩台施工到一定高度": 30, # 无格式差异,保留原样
|
||||
"墩台混凝土施工": 30, # 无格式差异,保留原样
|
||||
"预制梁桥,架梁前": 30, # 原:预制梁桥,架梁前
|
||||
"预制梁桥,预制梁架设前": 1, # 原:预制梁桥,预制梁架设前
|
||||
"预制梁桥,预制梁架设后": 7, # 原:预制梁桥,预制梁架设后
|
||||
"桥位施工桥梁,制梁前": 30, # 原:桥位施工桥梁,制梁前
|
||||
"桥位施工桥梁,上部结构施工中": 1, # 原:桥位施工桥梁,上部结构施工中
|
||||
"架桥机(运梁车)通过": 7, # 无格式差异,保留原样
|
||||
"桥梁主体工程完工后,第1至3个月": 7, # 原:桥梁主体工程完工后,第1至3个月
|
||||
"桥梁主体工程完工后,第4至6个月": 14, # 原:桥梁主体工程完工后,第4至6个月
|
||||
"桥梁主体工程完工后,6个月以后": 30, # 原:桥梁主体工程完工后,6个月以后 ''
|
||||
"轨道铺设期间,前": 30,
|
||||
"轨道铺设期间,后": 14,
|
||||
"轨道铺设完成后,第1个月": 14,
|
||||
"轨道铺设完成后,2至3个月": 30,
|
||||
"轨道铺设完成后,4至12个月": 90,
|
||||
"轨道铺设完成后,12个月以后": 180,
|
||||
"铺路或堆载,一般情况": 1,
|
||||
"填筑或堆载,一般情况": 1,
|
||||
"铺路或堆载,沉降量突变情况": 1,
|
||||
"填筑或堆载,两次填筑间隔时间较长情况":3,
|
||||
"铺路或堆载,两次铺路间隔时间较长情况": 3,
|
||||
"堆载预压或路基填筑完成,第1至3个月":7, # 原:堆载预压或路基铺路完成,第1至3个月
|
||||
"堆载预压或路基填筑完成,第4至6个月": 14, # 原:堆载预压或路基铺路完成,第4至6个月
|
||||
"堆载预压或路基填筑完成,6个月以后": 30, # 原:堆载预压或路基铺路完成,6个月以后
|
||||
"架桥机(运梁车) 首次通过前": 1, # 原:架桥机(运梁车)首次通过前(仅加空格)
|
||||
"架桥机(运梁车) 首次通过后,前3天": 1, # 原:架桥机(运梁车)首次通过后,前3天
|
||||
"架桥机(运梁车) 首次通过后": 7, # 原:架桥机(运梁车)首次通过后(仅加空格)
|
||||
"轨道板(道床)铺设后,第1个月": 14, # 原:轨道板(道床)铺设后,第1个月
|
||||
"轨道板(道床)铺设后,第2至3个月": 30, # 原:轨道板(道床)铺设后,第2至3个月
|
||||
"轨道板(道床)铺设后,3个月以后": 90 # 未出现在待处理集,保留原始格式
|
||||
}
|
||||
# 使用公共配置的工况周期映射表
|
||||
self.base_periods = BASE_PERIODS.copy()
|
||||
# 构建中英文括号+逗号兼容映射表
|
||||
self.compatible_periods = self._build_compatible_brackets_map()
|
||||
|
||||
@@ -102,7 +65,7 @@ class ConstructionMonitorUtils:
|
||||
return compatible_map
|
||||
|
||||
def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 0, current_date: datetime = None) -> Dict[str, List[Dict]]:
|
||||
result = {"winter": [], "data": [], "error_data": []}
|
||||
result = {"winter": [], "data": [], "error_data": [], "error_linecodes": []}
|
||||
if not input_data:
|
||||
return result
|
||||
|
||||
@@ -112,17 +75,13 @@ class ConstructionMonitorUtils:
|
||||
if not point_data:
|
||||
continue
|
||||
|
||||
# 过滤逻辑:仅保留 useflag 存在且值≠0 的记录
|
||||
# 推理用最新一期:取按 NYID 排序后的第一条(上游已保证倒序),不因 useflag 排除最新期
|
||||
latest_item = point_data[0]
|
||||
# 用于冬休回溯等:仅 useflag 有效的历史记录
|
||||
filtered_point_data = [
|
||||
item for item in point_data
|
||||
if "useflag" in item and item["useflag"] != 0 # 核心条件:字段存在 + 非0
|
||||
if "useflag" in item and item["useflag"] != 0
|
||||
]
|
||||
# 过滤后无数据则跳过当前测点
|
||||
if not filtered_point_data:
|
||||
continue
|
||||
|
||||
# 使用过滤后的数据处理
|
||||
latest_item = filtered_point_data[0]
|
||||
latest_condition = latest_item.get("workinfoname")
|
||||
if not latest_condition:
|
||||
result["error_data"].append(latest_item)
|
||||
@@ -180,7 +139,12 @@ class ConstructionMonitorUtils:
|
||||
continue
|
||||
|
||||
if not base_condition:
|
||||
# 当前为冬休或历史全是冬休 → 归入冬休;若本次是冬休且推不出,记入 error_linecodes 供写入 error_linecode 表
|
||||
result["winter"].append(item_copy)
|
||||
if latest_condition == "冬休":
|
||||
linecode = latest_item.get("__linecode")
|
||||
if linecode and linecode not in result["error_linecodes"]:
|
||||
result["error_linecodes"].append(linecode)
|
||||
continue
|
||||
|
||||
# 核心修改:冬休回溯场景下调整测量间隔(基准周期)
|
||||
|
||||
228
app/utils/get_operating_mode.py
Normal file
228
app/utils/get_operating_mode.py
Normal file
@@ -0,0 +1,228 @@
|
||||
from datetime import datetime, date
|
||||
|
||||
from .operating_mode_config import (
|
||||
BASE_PERIODS,
|
||||
OLD_TO_NEW_MAP,
|
||||
CONDITION_GROUP,
|
||||
TRANSITION_RULES,
|
||||
WINTER_BREAK_LABELS,
|
||||
)
|
||||
|
||||
|
||||
class OperatingModePredictor:
|
||||
"""
|
||||
工况预测类(处理二维倒序数据,返回一维列表,仅保留各内嵌列表最新记录)
|
||||
功能:根据输入的带时间序列的工况数据,推导每个监测点(point_id)的下一阶段工况
|
||||
特性:
|
||||
1. 输入为二维列表,每个内嵌列表对应一个point_id,且为倒序排列(最新记录在索引0)
|
||||
2. 输出为一维列表,仅保留每个内嵌列表的最新记录,新增工况推导结果字段
|
||||
3. 推导规则:有等效映射返回新工况,无等效保留旧工况;切换下一工况以base_periods为准
|
||||
4. 时间计算仅按日期(天)维度,忽略时分秒
|
||||
5. 冬休场景:仅以冬休前上一有效工况为判断基准,切换规则与非冬休完全一致
|
||||
6. 适配中英文括号、逗号、空格,内部标准化匹配,外部返回规范名称
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""初始化类,加载核心配置"""
|
||||
# 1. 基础工况配置(最终返回的规范名称,含所有新旧工况)
|
||||
self.base_periods = BASE_PERIODS.copy()
|
||||
# 2. 旧→新等效映射(优先返回新工况)
|
||||
self.old_to_new_map = OLD_TO_NEW_MAP.copy()
|
||||
# 3. 工况分组(同义工况归为同一分组,复用切换规则)
|
||||
self.condition_group = CONDITION_GROUP.copy()
|
||||
# 4. 切换触发规则(沿用原逻辑,触发天数+目标工况)
|
||||
self.transition_rules = TRANSITION_RULES.copy()
|
||||
# 5. 冬休标识
|
||||
self.winter_break_labels = WINTER_BREAK_LABELS.copy()
|
||||
|
||||
# 辅助映射:标准化名称→base_periods中的规范名称(用于最终返回)
|
||||
self.std_to_canonical = {
|
||||
self._standardize_name(name): name for name in self.base_periods.keys()
|
||||
}
|
||||
# 辅助映射:标准化名称→分组ID
|
||||
self.std_to_group = {
|
||||
self._standardize_name(name): group_id
|
||||
for name, group_id in self.condition_group.items()
|
||||
}
|
||||
|
||||
def _standardize_name(self, name):
|
||||
"""
|
||||
标准化工况名称(内部匹配用):去空格、统一中英文符号
|
||||
:param name: 原始工况名称
|
||||
:return: 标准化后的名称
|
||||
"""
|
||||
if not name:
|
||||
return ""
|
||||
# 去所有空格
|
||||
std_name = name.replace(" ", "").strip()
|
||||
# 统一中英文符号
|
||||
replace_map = {
|
||||
"(": "(", ")": ")", ",": ",", "。": ",", "~": "至",
|
||||
";": ";", ":": ":", "'": "'", """: "\""
|
||||
}
|
||||
for old, new in replace_map.items():
|
||||
std_name = std_name.replace(old, new)
|
||||
return std_name
|
||||
|
||||
def _parse_to_date(self, time_str):
|
||||
"""解析时间字符串为date对象(仅保留年月日)"""
|
||||
if not time_str:
|
||||
return None
|
||||
try:
|
||||
dt = datetime.strptime(str(time_str).strip(), "%Y-%m-%d %H:%M:%S")
|
||||
return dt.date()
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
def _get_time_statistics(self, data, target_workinfo):
|
||||
"""从倒序数据中提取目标工况的时间统计信息"""
|
||||
# 筛选目标工况的有效记录
|
||||
target_records = [
|
||||
d for d in data
|
||||
if self._standardize_name(d.get("workinfoname")) == self._standardize_name(target_workinfo)
|
||||
and d.get("workinfoname") not in self.winter_break_labels
|
||||
]
|
||||
if not target_records:
|
||||
return None, 0, 0
|
||||
|
||||
# 解析日期
|
||||
target_dates = []
|
||||
for item in target_records:
|
||||
d = self._parse_to_date(item.get("MTIME_W"))
|
||||
if d:
|
||||
target_dates.append(d)
|
||||
if not target_dates:
|
||||
return None, 0, 0
|
||||
|
||||
# 计算时间差(倒序数据:最新在0,最早在-1)
|
||||
last_date = target_dates[0]
|
||||
first_date = target_dates[-1]
|
||||
cumulative_days = (last_date - first_date).days
|
||||
today = date.today()
|
||||
days_to_today = (today - first_date).days if first_date else 0
|
||||
|
||||
return first_date, cumulative_days, days_to_today
|
||||
|
||||
def _get_pre_winter_break_workinfo(self, inner_data_list):
|
||||
"""提取冬休前的上一个有效工况"""
|
||||
if not inner_data_list:
|
||||
return None
|
||||
# 倒序遍历,跳过冬休,找第一个有效工况
|
||||
for record in inner_data_list:
|
||||
current_work = record.get("workinfoname")
|
||||
if current_work and current_work not in self.winter_break_labels:
|
||||
# 优先返回等效新工况
|
||||
return self.old_to_new_map.get(current_work, current_work)
|
||||
return None
|
||||
|
||||
def _validate_point_id(self, inner_list):
|
||||
"""校验内嵌列表内point_id是否一致"""
|
||||
if not inner_list:
|
||||
return None
|
||||
base_point_id = inner_list[0].get("point_id")
|
||||
for item in inner_list:
|
||||
if item.get("point_id") != base_point_id:
|
||||
return None
|
||||
return base_point_id
|
||||
|
||||
def predict(self, data_2d_list):
|
||||
"""
|
||||
公有核心方法:执行工况预测
|
||||
:param data_2d_list: 二维倒序数据列表,格式 [[{},{},{}], [{},{},{}]]
|
||||
:return: 一维结果列表,格式 [{}, {}, {}]
|
||||
"""
|
||||
final_result = []
|
||||
|
||||
for inner_data_list in data_2d_list:
|
||||
if not isinstance(inner_data_list, list) or len(inner_data_list) == 0:
|
||||
continue
|
||||
|
||||
# 取最新记录并复制(避免修改原数据)
|
||||
latest_record = inner_data_list[0].copy()
|
||||
point_id = self._validate_point_id(inner_data_list)
|
||||
|
||||
# 校验point_id
|
||||
if not point_id:
|
||||
latest_record.update({
|
||||
"status": "fail",
|
||||
"current_workinfo": None,
|
||||
"first_measure_date": None,
|
||||
"days_from_first_to_today": None,
|
||||
"next_workinfo": None,
|
||||
"judge_based_workinfo": None,
|
||||
"error_msg": "point_id不一致或缺失"
|
||||
})
|
||||
final_result.append(latest_record)
|
||||
continue
|
||||
|
||||
# 获取当前工况并标准化
|
||||
current_workinfo = latest_record.get("workinfoname")
|
||||
if not current_workinfo or self._standardize_name(current_workinfo) not in self.std_to_canonical:
|
||||
latest_record.update({
|
||||
"status": "fail",
|
||||
"current_workinfo": None,
|
||||
"first_measure_date": None,
|
||||
"days_from_first_to_today": None,
|
||||
"next_workinfo": None,
|
||||
"judge_based_workinfo": None,
|
||||
"error_msg": "工况无效或缺失"
|
||||
})
|
||||
final_result.append(latest_record)
|
||||
continue
|
||||
|
||||
# 冬休逻辑:取冬休前工况作为判断基准
|
||||
if current_workinfo in self.winter_break_labels:
|
||||
judge_based_workinfo = self._get_pre_winter_break_workinfo(inner_data_list)
|
||||
if not judge_based_workinfo:
|
||||
latest_record.update({
|
||||
"status": "fail",
|
||||
"current_workinfo": current_workinfo,
|
||||
"first_measure_date": None,
|
||||
"days_from_first_to_today": None,
|
||||
"next_workinfo": None,
|
||||
"judge_based_workinfo": None,
|
||||
"error_msg": "冬休前未找到有效工况"
|
||||
})
|
||||
final_result.append(latest_record)
|
||||
continue
|
||||
else:
|
||||
# 非冬休:优先使用等效新工况作为判断基准
|
||||
judge_based_workinfo = self.old_to_new_map.get(current_workinfo, current_workinfo)
|
||||
|
||||
# 计算时间统计信息
|
||||
first_dt, cumulative_days, days_to_today = self._get_time_statistics(
|
||||
inner_data_list, judge_based_workinfo
|
||||
)
|
||||
first_measure_date = first_dt.strftime("%Y-%m-%d") if first_dt else None
|
||||
|
||||
# 推导下一工况(以base_periods中的名称为准)
|
||||
std_judge_work = self._standardize_name(judge_based_workinfo)
|
||||
group_id = self.std_to_group.get(std_judge_work, "STATIC")
|
||||
rule = self.transition_rules.get(group_id, {})
|
||||
trigger_days = rule.get("trigger_days")
|
||||
next_candidates = rule.get("next", [])
|
||||
|
||||
if trigger_days is not None and cumulative_days >= trigger_days and next_candidates:
|
||||
# 切换工况:取base_periods中的规范名称
|
||||
next_workinfo = next_candidates[0]
|
||||
else:
|
||||
# 不切换:有等效则返回新工况,无则返回旧工况
|
||||
next_workinfo = self.old_to_new_map.get(judge_based_workinfo, judge_based_workinfo)
|
||||
|
||||
# 组装最终结果
|
||||
latest_record.update({
|
||||
"status": "success",
|
||||
"current_workinfo": current_workinfo, # 保留原始输入工况
|
||||
"judge_based_workinfo": judge_based_workinfo, # 实际判断用的工况(新工况优先)
|
||||
"first_measure_date": first_measure_date,
|
||||
"days_from_first_to_today": days_to_today,
|
||||
"next_workinfo": next_workinfo, # 推导结果:有等效返回新工况,无则返回旧工况
|
||||
"point_id": point_id,
|
||||
"error_msg": ""
|
||||
})
|
||||
|
||||
final_result.append(latest_record)
|
||||
|
||||
return final_result
|
||||
|
||||
|
||||
246
app/utils/operating_mode_config.py
Normal file
246
app/utils/operating_mode_config.py
Normal file
@@ -0,0 +1,246 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
工况公共配置模块
|
||||
供 get_operating_mode.py(工况预测)和 construction_monitor.py(施工监测)共用
|
||||
"""
|
||||
|
||||
# ==================== 基础工况周期(天) ====================
|
||||
# 含新旧工况及兼容变体,键为工况名称,值为测量间隔天数
|
||||
BASE_PERIODS = {
|
||||
# 路基工况(新工况)
|
||||
"路基或预压土填筑,连续填筑": 1,
|
||||
"路基或预压土填筑,两次填筑间隔时间较长": 7,
|
||||
"预压土或路基填筑完成,第1~3个月": 7,
|
||||
"预压土或路基填筑完成,第4~6个月": 14,
|
||||
"预压土或路基填筑完成,6个月以后": 30,
|
||||
"架桥机(运梁车)首次通过前": 1,
|
||||
"架桥机(运梁车)首次通过后,前3天": 1,
|
||||
"架桥机(运梁车)首次通过后": 7,
|
||||
"轨道板(道床)铺设后,第1至3个月": 14,
|
||||
"轨道板(道床)铺设后,第4至6个月": 30,
|
||||
"轨道板(道床)铺设后,6个月以后": 90,
|
||||
|
||||
# 路基旧工况
|
||||
"填筑或堆载,一般情况": 1,
|
||||
"填筑或堆载,两次填筑间隔时间较长情况": 7,
|
||||
"堆载预压或路基填筑完成,第1至3个月": 7,
|
||||
"堆载预压或路基填筑完成,第4至6个月": 14,
|
||||
"堆载预压或路基填筑完成,6个月以后": 30,
|
||||
"轨道板(道床)铺设后,第1个月": 14,
|
||||
"轨道板(道床)铺设后,第2至3个月": 30,
|
||||
"轨道板(道床)铺设后,3个月以后": 90,
|
||||
|
||||
# 路基兼容变体(铺路/填筑同义)
|
||||
"铺路或堆载,一般情况": 1,
|
||||
"铺路或堆载,沉降量突变情况": 1,
|
||||
"铺路或堆载,两次铺路间隔时间较长情况": 3,
|
||||
|
||||
# 桥梁工况(新工况)
|
||||
"桥墩(台)地面处拆模后": 30,
|
||||
"墩身混凝土施工": 30,
|
||||
"预制梁桥,预制梁架设前": 1,
|
||||
"预制梁桥,预制梁架设后": 7,
|
||||
"现浇梁,浇筑前": 30,
|
||||
"现浇梁上部结构施工中": 1,
|
||||
"架桥机(运梁车)通过": 2,
|
||||
"桥梁主体工程完工后,第1至3个月": 7,
|
||||
"桥梁主体工程完工后,第4至6个月": 14,
|
||||
"桥梁主体工程完工后,6个月以后": 30,
|
||||
"轨道铺设,前": 30,
|
||||
"轨道铺设,后": 14,
|
||||
"轨道铺设完成后,第1个月": 14,
|
||||
"轨道铺设完成后,2至3个月": 30,
|
||||
"轨道铺设完成后,4至12个月": 90,
|
||||
"轨道铺设完成后,12个月以后": 180,
|
||||
|
||||
# 桥梁旧工况
|
||||
"墩台施工到一定高度": 30,
|
||||
"墩台混凝土施工": 30,
|
||||
"预制梁桥,架梁前": 30,
|
||||
"桥位施工桥梁,制梁前": 30,
|
||||
"桥位施工桥梁,上部结构施工中": 1,
|
||||
"桥梁主体工程完工后,第1至3个月": 7,
|
||||
"桥梁主体工程完工后,第4至6个月": 14,
|
||||
"桥梁主体工程完工后,6个月以后": 30,
|
||||
"轨道铺设期间,前": 30,
|
||||
"轨道铺设期间,后": 14,
|
||||
|
||||
# 桥梁兼容变体
|
||||
"架桥机(运梁车) 首次通过前": 1,
|
||||
"架桥机(运梁车) 首次通过后,前3天": 1,
|
||||
"架桥机(运梁车) 首次通过后": 7,
|
||||
"预制梁桥预制梁架设后": 7,
|
||||
|
||||
# 隧道工况
|
||||
"仰拱(底板)施工完成后,第1个月": 7,
|
||||
"仰拱(底板)施工完成后,第2至3个月": 14,
|
||||
"仰拱(底板)施工完成后,3个月以后": 30,
|
||||
"无砟轨道铺设后,第1至3个月": 30,
|
||||
"无砟轨道铺设后,4至12个月": 90,
|
||||
"无砟轨道铺设后,12个月以后": 180,
|
||||
|
||||
# 隧道兼容变体(中文括号)
|
||||
"仰拱(底板)施工完成后,第1个月": 7,
|
||||
"仰拱(底板)施工完成后,第2至3个月": 14,
|
||||
"仰拱(底板)施工完成后,3个月以后": 30,
|
||||
|
||||
# 其他特殊工况
|
||||
"轨道板铺设前": 14,
|
||||
"站场填方路基段填筑完成至静态验收": 14,
|
||||
"特殊地段隧道施工完成后至静态验收": 14,
|
||||
"冬休": 0,
|
||||
}
|
||||
|
||||
# ==================== 旧→新等效映射(工况预测用) ====================
|
||||
OLD_TO_NEW_MAP = {
|
||||
# 路基等效
|
||||
"填筑或堆载,一般情况": "路基或预压土填筑,连续填筑",
|
||||
"填筑或堆载,两次填筑间隔时间较长情况": "路基或预压土填筑,两次填筑间隔时间较长",
|
||||
"堆载预压或路基填筑完成,第1至3个月": "预压土或路基填筑完成,第1~3个月",
|
||||
"堆载预压或路基填筑完成,第4至6个月": "预压土或路基填筑完成,第4~6个月",
|
||||
"堆载预压或路基填筑完成,6个月以后": "预压土或路基填筑完成,6个月以后",
|
||||
"轨道板(道床)铺设后,第1个月": "轨道板(道床)铺设后,第1至3个月",
|
||||
"轨道板(道床)铺设后,第2至3个月": "轨道板(道床)铺设后,第4至6个月",
|
||||
"轨道板(道床)铺设后,3个月以后": "轨道板(道床)铺设后,6个月以后",
|
||||
"铺路或堆载,一般情况": "路基或预压土填筑,连续填筑",
|
||||
"铺路或堆载,沉降量突变情况": "路基或预压土填筑,连续填筑",
|
||||
"铺路或堆载,两次铺路间隔时间较长情况": "路基或预压土填筑,两次填筑间隔时间较长",
|
||||
|
||||
# 桥梁等效
|
||||
"墩台施工到一定高度": "桥墩(台)地面处拆模后",
|
||||
"墩台混凝土施工": "墩身混凝土施工",
|
||||
"桥位施工桥梁,制梁前": "现浇梁,浇筑前",
|
||||
"桥位施工桥梁,上部结构施工中": "现浇梁上部结构施工中",
|
||||
"桥梁主体工程完工后,第1至3个月": "桥梁主体工程完工后,第1至3个月",
|
||||
"桥梁主体工程完工后,第4至6个月": "桥梁主体工程完工后,第4至6个月",
|
||||
"桥梁主体工程完工后,6个月以后": "桥梁主体工程完工后,6个月以后",
|
||||
"轨道铺设期间,前": "轨道铺设,前",
|
||||
"轨道铺设期间,后": "轨道铺设,后",
|
||||
"预制梁桥预制梁架设后": "预制梁桥,预制梁架设后",
|
||||
"架桥机(运梁车) 首次通过前": "架桥机(运梁车)首次通过前",
|
||||
"架桥机(运梁车) 首次通过后,前3天": "架桥机(运梁车)首次通过后,前3天",
|
||||
"架桥机(运梁车) 首次通过后": "架桥机(运梁车)首次通过后",
|
||||
}
|
||||
|
||||
# ==================== 工况分组(工况预测用) ====================
|
||||
CONDITION_GROUP = {
|
||||
# 路基分组
|
||||
"路基或预压土填筑,连续填筑": "DZ_CONTINUE",
|
||||
"路基或预压土填筑,两次填筑间隔时间较长": "DZ_INTERVAL",
|
||||
"预压土或路基填筑完成,第1~3个月": "DZ_FINISH_1_3",
|
||||
"预压土或路基填筑完成,第4~6个月": "DZ_FINISH_4_6",
|
||||
"预压土或路基填筑完成,6个月以后": "DZ_FINISH_AFTER_6",
|
||||
"架桥机(运梁车)首次通过前": "JQJ_FIRST_BEFORE",
|
||||
"架桥机(运梁车)首次通过后,前3天": "JQJ_FIRST_AFTER_3D",
|
||||
"架桥机(运梁车)首次通过后": "JQJ_FIRST_AFTER",
|
||||
"轨道板(道床)铺设后,第1至3个月": "GDB_FINISH_1_3",
|
||||
"轨道板(道床)铺设后,第4至6个月": "GDB_FINISH_4_6",
|
||||
"轨道板(道床)铺设后,6个月以后": "GDB_FINISH_AFTER_6",
|
||||
|
||||
# 路基旧工况分组
|
||||
"填筑或堆载,一般情况": "DZ_CONTINUE",
|
||||
"填筑或堆载,两次填筑间隔时间较长情况": "DZ_INTERVAL",
|
||||
"堆载预压或路基填筑完成,第1至3个月": "DZ_FINISH_1_3",
|
||||
"堆载预压或路基填筑完成,第4至6个月": "DZ_FINISH_4_6",
|
||||
"堆载预压或路基填筑完成,6个月以后": "DZ_FINISH_AFTER_6",
|
||||
"轨道板(道床)铺设后,第1个月": "GDB_FINISH_1_3",
|
||||
"轨道板(道床)铺设后,第2至3个月": "GDB_FINISH_4_6",
|
||||
"轨道板(道床)铺设后,3个月以后": "GDB_FINISH_AFTER_6",
|
||||
"铺路或堆载,一般情况": "DZ_CONTINUE",
|
||||
"铺路或堆载,沉降量突变情况": "DZ_CONTINUE",
|
||||
"铺路或堆载,两次铺路间隔时间较长情况": "DZ_INTERVAL",
|
||||
|
||||
# 桥梁分组
|
||||
"桥墩(台)地面处拆模后": "STATIC",
|
||||
"墩身混凝土施工": "STATIC",
|
||||
"预制梁桥,架梁前": "STATIC",
|
||||
"预制梁桥,预制梁架设前": "YZLQ_BEFORE_JS",
|
||||
"预制梁桥,预制梁架设后": "YZLQ_AFTER_JS",
|
||||
"现浇梁,浇筑前": "STATIC",
|
||||
"现浇梁上部结构施工中": "STATIC",
|
||||
"架桥机(运梁车)通过": "STATIC",
|
||||
"桥梁主体工程完工后,第1至3个月": "QL_ZHUTI_1_3",
|
||||
"桥梁主体工程完工后,第4至6个月": "QL_ZHUTI_4_6",
|
||||
"桥梁主体工程完工后,6个月以后": "QL_ZHUTI_AFTER_6",
|
||||
"轨道铺设,前": "STATIC",
|
||||
"轨道铺设,后": "STATIC",
|
||||
"轨道铺设完成后,第1个月": "GD_FINISH_1",
|
||||
"轨道铺设完成后,2至3个月": "GD_FINISH_2_3",
|
||||
"轨道铺设完成后,4至12个月": "GD_FINISH_4_12",
|
||||
"轨道铺设完成后,12个月以后": "GD_FINISH_AFTER_12",
|
||||
|
||||
# 桥梁旧工况分组
|
||||
"墩台施工到一定高度": "STATIC",
|
||||
"墩台混凝土施工": "STATIC",
|
||||
"桥位施工桥梁,制梁前": "STATIC",
|
||||
"桥位施工桥梁,上部结构施工中": "STATIC",
|
||||
"桥梁主体工程完工后,第1至3个月": "QL_ZHUTI_1_3",
|
||||
"桥梁主体工程完工后,第4至6个月": "QL_ZHUTI_4_6",
|
||||
"桥梁主体工程完工后,6个月以后": "QL_ZHUTI_AFTER_6",
|
||||
"轨道铺设期间,前": "STATIC",
|
||||
"轨道铺设期间,后": "STATIC",
|
||||
"预制梁桥预制梁架设后": "YZLQ_AFTER_JS",
|
||||
"架桥机(运梁车) 首次通过前": "JQJ_FIRST_BEFORE",
|
||||
"架桥机(运梁车) 首次通过后,前3天": "JQJ_FIRST_AFTER_3D",
|
||||
"架桥机(运梁车) 首次通过后": "JQJ_FIRST_AFTER",
|
||||
|
||||
# 隧道分组
|
||||
"仰拱(底板)施工完成后,第1个月": "YG_DIBAN_1",
|
||||
"仰拱(底板)施工完成后,第2至3个月": "YG_DIBAN_2_3",
|
||||
"仰拱(底板)施工完成后,3个月以后": "YG_DIBAN_AFTER_3",
|
||||
"无砟轨道铺设后,第1至3个月": "WZGD_1_3",
|
||||
"无砟轨道铺设后,4至12个月": "WZGD_4_12",
|
||||
"无砟轨道铺设后,12个月以后": "WZGD_AFTER_12",
|
||||
"仰拱(底板)施工完成后,第1个月": "YG_DIBAN_1",
|
||||
"仰拱(底板)施工完成后,第2至3个月": "YG_DIBAN_2_3",
|
||||
"仰拱(底板)施工完成后,3个月以后": "YG_DIBAN_AFTER_3",
|
||||
|
||||
# 其他特殊工况分组
|
||||
"轨道板铺设前": "STATIC",
|
||||
"站场填方路基段填筑完成至静态验收": "STATIC",
|
||||
"特殊地段隧道施工完成后至静态验收": "STATIC",
|
||||
|
||||
# 特殊工况
|
||||
"冬休": "STATIC",
|
||||
}
|
||||
|
||||
# ==================== 切换触发规则(工况预测用) ====================
|
||||
TRANSITION_RULES = {
|
||||
# 路基切换规则
|
||||
"DZ_FINISH_1_3": {"trigger_days": 90, "next": ["预压土或路基填筑完成,第4~6个月"]},
|
||||
"DZ_FINISH_4_6": {"trigger_days": 90, "next": ["预压土或路基填筑完成,6个月以后"]},
|
||||
"DZ_FINISH_AFTER_6": {"trigger_days": None, "next": None},
|
||||
"JQJ_FIRST_BEFORE": {"trigger_days": 1, "next": ["架桥机(运梁车)首次通过后,前3天"]},
|
||||
"JQJ_FIRST_AFTER_3D": {"trigger_days": 3, "next": ["架桥机(运梁车)首次通过后"]},
|
||||
"JQJ_FIRST_AFTER": {"trigger_days": None, "next": None},
|
||||
"GDB_FINISH_1_3": {"trigger_days": 30, "next": ["轨道板(道床)铺设后,第4至6个月"]},
|
||||
"GDB_FINISH_4_6": {"trigger_days": 30, "next": ["轨道板(道床)铺设后,6个月以后"]},
|
||||
"GDB_FINISH_AFTER_6": {"trigger_days": None, "next": None},
|
||||
|
||||
# 桥梁切换规则
|
||||
"YZLQ_BEFORE_JS": {"trigger_days": 1, "next": ["架桥机(运梁车)通过"]},
|
||||
"YZLQ_AFTER_JS": {"trigger_days": 7, "next": ["桥梁主体工程完工后,第1至3个月"]},
|
||||
"QL_ZHUTI_1_3": {"trigger_days": 90, "next": ["桥梁主体工程完工后,第4至6个月"]},
|
||||
"QL_ZHUTI_4_6": {"trigger_days": 90, "next": ["桥梁主体工程完工后,6个月以后"]},
|
||||
"QL_ZHUTI_AFTER_6": {"trigger_days": None, "next": None},
|
||||
"GD_FINISH_1": {"trigger_days": 30, "next": ["轨道铺设完成后,2至3个月"]},
|
||||
"GD_FINISH_2_3": {"trigger_days": 60, "next": ["轨道铺设完成后,4至12个月"]},
|
||||
"GD_FINISH_4_12": {"trigger_days": 240, "next": ["轨道铺设完成后,12个月以后"]},
|
||||
"GD_FINISH_AFTER_12": {"trigger_days": None, "next": None},
|
||||
|
||||
# 隧道切换规则
|
||||
"YG_DIBAN_1": {"trigger_days": 30, "next": ["仰拱(底板)施工完成后,第2至3个月"]},
|
||||
"YG_DIBAN_2_3": {"trigger_days": 60, "next": ["仰拱(底板)施工完成后,3个月以后"]},
|
||||
"YG_DIBAN_AFTER_3": {"trigger_days": None, "next": None},
|
||||
"WZGD_1_3": {"trigger_days": 90, "next": ["无砟轨道铺设后,4至12个月"]},
|
||||
"WZGD_4_12": {"trigger_days": 240, "next": ["无砟轨道铺设后,12个月以后"]},
|
||||
"WZGD_AFTER_12": {"trigger_days": None, "next": None},
|
||||
|
||||
# 静态分组(无切换)
|
||||
"DZ_CONTINUE": {"trigger_days": None, "next": None},
|
||||
"DZ_INTERVAL": {"trigger_days": None, "next": None},
|
||||
"STATIC": {"trigger_days": None, "next": None},
|
||||
}
|
||||
|
||||
# ==================== 冬休标识 ====================
|
||||
WINTER_BREAK_LABELS = {"冬休"}
|
||||
@@ -16,7 +16,10 @@ from ..services.section_data import SectionDataService
|
||||
from ..services.account import AccountService
|
||||
from ..models.daily import DailyData
|
||||
from ..models.settlement_data import SettlementData
|
||||
from typing import List
|
||||
from ..models.level_data import LevelData
|
||||
from ..models.error_linecode import ErrorLinecode
|
||||
from ..services.settlement_data import SettlementDataService
|
||||
from typing import List, Tuple, Any
|
||||
from ..utils.construction_monitor import ConstructionMonitorUtils
|
||||
import time
|
||||
import json
|
||||
@@ -95,10 +98,8 @@ class TaskScheduler:
|
||||
# name='每日重置账号更新状态'
|
||||
# )
|
||||
# logger.info("系统定时任务:每日重置账号更新状态已添加")
|
||||
# existing_job = None
|
||||
# existing_job = self.scheduler.get_job("scheduled_get_max_nyid_by_point_id")
|
||||
# if not existing_job:
|
||||
# # 添加每天凌晨1点执行获取max NYID关联数据任务
|
||||
# if existing_job is None:
|
||||
# self.scheduler.add_job(
|
||||
# scheduled_get_max_nyid_by_point_id,
|
||||
# 'cron',
|
||||
@@ -234,92 +235,105 @@ def scheduled_get_max_nyid_by_point_id(start: int = 0,end: int = 0):
|
||||
|
||||
# logger.info(f"DailyData表清空完成,共删除{delete_count}条历史记录")
|
||||
|
||||
# 1. 获取沉降数据(返回 List[List[dict]])
|
||||
daily_service = DailyDataService()
|
||||
result = daily_service.get_nyid_by_point_id(db, [], 1)
|
||||
|
||||
# 2. 计算到期数据
|
||||
monitor = ConstructionMonitorUtils()
|
||||
daily_data = monitor.get_due_data(result,start=start,end=end)
|
||||
data = daily_data['data']
|
||||
error_data = daily_data['error_data']
|
||||
|
||||
winters = daily_data['winter']
|
||||
logger.info(f"首次获取数据完成,共{len(result)}条记录")
|
||||
|
||||
# 3. 循环处理冬休数据,追溯历史非冬休记录
|
||||
max_num = 1
|
||||
print(f"首次获取冬休数据完成,共{len(winters)}条记录")
|
||||
while 1:
|
||||
max_num += 1
|
||||
print(max_num)
|
||||
# 提取冬休数据的point_id列表
|
||||
new_list = [int(w['point_id']) for w in winters]
|
||||
# print(new_list)
|
||||
if new_list == []:
|
||||
break
|
||||
# 获取更多历史记录
|
||||
nyid_list = daily_service.get_nyid_by_point_id(db, new_list, max_num)
|
||||
w_list = monitor.get_due_data(nyid_list,start=start,end=end)
|
||||
# 更新冬休、待处理、错误数据
|
||||
winters = w_list['winter']
|
||||
data.extend(w_list['data'])
|
||||
# 过期数据一并处理
|
||||
# data.extend(w_list['error_data'])
|
||||
error_data.extend(w_list['error_data'])
|
||||
# print(f"第{max_num}次获取冬休数据完成,共{len(winters)}条记录")
|
||||
if winters == []:
|
||||
break
|
||||
data.extend(error_data)
|
||||
# 4. 初始化服务实例
|
||||
# 1. 以 level_data 为来源:每个 linecode 取最新一期(NYID 最大),再按该 NYID 从 settlement 取一条
|
||||
level_service = LevelDataService()
|
||||
settlement_service = SettlementDataService()
|
||||
daily_service = DailyDataService()
|
||||
checkpoint_db = CheckpointService()
|
||||
section_db = SectionDataService()
|
||||
account_service = AccountService()
|
||||
# 5. 关联其他表数据(核心逻辑保留)
|
||||
for d in data:
|
||||
# 处理 LevelData(假设返回列表,取第一条)
|
||||
level_results = level_service.get_by_nyid(db, d['NYID'])
|
||||
level_instance = level_results[0] if isinstance(level_results, list) and level_results else level_results
|
||||
d['level_data'] = level_instance.to_dict() if level_instance else None
|
||||
monitor = ConstructionMonitorUtils()
|
||||
|
||||
# 处理 CheckpointData(返回单实例,直接使用)
|
||||
logger.info("正在查询水准线路列表…")
|
||||
linecodes = [r[0] for r in db.query(LevelData.linecode).distinct().all()]
|
||||
logger.info(f"共 {len(linecodes)} 个水准线路,开始拉取多期沉降…")
|
||||
# 每个 linecode 传多期沉降(按 NYID 降序),供冬休「一直推」直到推出或推不出
|
||||
input_data: List[List[dict]] = []
|
||||
for idx, linecode in enumerate(linecodes):
|
||||
level_list = level_service.get_by_linecode(db, linecode)
|
||||
if not level_list:
|
||||
continue
|
||||
# 按 NYID 降序(最新在前),最多取 30 期
|
||||
level_list_sorted = sorted(
|
||||
level_list,
|
||||
key=lambda x: int(x.NYID) if str(x.NYID).isdigit() else 0,
|
||||
reverse=True,
|
||||
)[:30]
|
||||
point_data_for_linecode: List[dict] = []
|
||||
level_latest = level_list_sorted[0]
|
||||
for level_instance in level_list_sorted:
|
||||
nyid = level_instance.NYID
|
||||
settlement_dict = settlement_service.get_one_dict_by_nyid(db, nyid)
|
||||
if not settlement_dict:
|
||||
continue
|
||||
settlement_dict["__linecode"] = linecode
|
||||
# 仅最新一期带 __level_data,供后续写 daily 用
|
||||
settlement_dict["__level_data"] = level_latest.to_dict()
|
||||
point_data_for_linecode.append(settlement_dict)
|
||||
if point_data_for_linecode:
|
||||
input_data.append(point_data_for_linecode)
|
||||
if (idx + 1) % 50 == 0 or idx == len(linecodes) - 1:
|
||||
logger.info(f"已处理线路 {idx + 1}/{len(linecodes)},当前共 {len(input_data)} 条可推理")
|
||||
logger.info(f"多期沉降拉取完成,共 {len(input_data)} 条线路,开始推理…")
|
||||
if not input_data:
|
||||
logger.warning("未找到任何 linecode 对应的最新期沉降数据,跳过写 daily")
|
||||
db.execute(text(f"TRUNCATE TABLE {DailyData.__tablename__}"))
|
||||
db.commit()
|
||||
db.close()
|
||||
return
|
||||
|
||||
# 2. 计算到期数据(remaining / 冬休等)
|
||||
daily_data = monitor.get_due_data(input_data, start=start, end=end)
|
||||
logger.info("推理完成")
|
||||
data = daily_data["data"]
|
||||
# 冬休一直推、推不出来的 linecode 写入 error_linecode 表(表 id 需为 AUTO_INCREMENT)
|
||||
try:
|
||||
for linecode in daily_data.get("error_linecodes", []):
|
||||
if not linecode:
|
||||
continue
|
||||
exists = db.query(ErrorLinecode).filter(ErrorLinecode.linecode == linecode).first()
|
||||
if not exists:
|
||||
db.add(ErrorLinecode(linecode=linecode))
|
||||
if daily_data.get("error_linecodes"):
|
||||
logger.info(f"推理推不出来的线路将写入 error_linecode: {daily_data['error_linecodes']}")
|
||||
except Exception as elc:
|
||||
db.rollback()
|
||||
logger.warning(
|
||||
f"error_linecode 写入失败(请确保表 id 为 AUTO_INCREMENT): {elc},已跳过,继续写 daily"
|
||||
)
|
||||
logger.info(f"按 level_data 最新期获取数据完成,共{len(data)}条有效记录")
|
||||
|
||||
# 3. 关联 level / checkpoint / section / account(level 已带在 __level_data)
|
||||
for d in data:
|
||||
d['level_data'] = d.pop('__level_data', None)
|
||||
d.pop('__linecode', None)
|
||||
checkpoint_instance = checkpoint_db.get_by_point_id(db, d['point_id'])
|
||||
d['checkpoint_data'] = checkpoint_instance.to_dict() if checkpoint_instance else None
|
||||
|
||||
# 处理 SectionData(根据checkpoint_data关联)
|
||||
if d['checkpoint_data']:
|
||||
section_instance = section_db.get_by_section_id(db, d['checkpoint_data']['section_id'])
|
||||
d['section_data'] = section_instance.to_dict() if section_instance else None
|
||||
else:
|
||||
d['section_data'] = None
|
||||
|
||||
# 处理 AccountData
|
||||
if d.get('section_data') and d['section_data'].get('account_id'):
|
||||
account_response = account_service.get_account(db, account_id=d['section_data']['account_id'])
|
||||
d['account_data'] = account_response.__dict__ if account_response else None
|
||||
else:
|
||||
d['account_data'] = None
|
||||
print(f"一共有{len(data)}条数据")
|
||||
# 6. 构造DailyData数据并批量创建
|
||||
# daily_create_data1 = set()
|
||||
|
||||
# 4. 构造 DailyData(每条已是「每 linecode 最新一期」)
|
||||
daily_create_data = []
|
||||
nyids = []
|
||||
for d in data:
|
||||
# 过滤无效数据(避免缺失关键字段报错)
|
||||
if all(key in d for key in ['NYID', 'point_id','remaining']) and d.get('level_data') and d.get('account_data') and d.get('section_data'):
|
||||
if d['NYID'] in nyids:
|
||||
continue
|
||||
tem = {
|
||||
'NYID': d['NYID'],
|
||||
'point_id': d['point_id'],
|
||||
'linecode': d['level_data']['linecode'],
|
||||
'account_id': d['account_data']['account_id'],
|
||||
'section_id': d['section_data']['section_id'],
|
||||
'remaining': (0-int(d['overdue'])) if 'overdue' in d else d['remaining'],
|
||||
}
|
||||
nyids.append(d['NYID'])
|
||||
daily_create_data.append(tem)
|
||||
if not all(key in d for key in ['NYID', 'point_id', 'remaining']) or not d.get('level_data') or not d.get('account_data') or not d.get('section_data'):
|
||||
continue
|
||||
tem = {
|
||||
'NYID': d['NYID'],
|
||||
'point_id': d['point_id'],
|
||||
'linecode': d['level_data']['linecode'],
|
||||
'account_id': d['account_data']['account_id'],
|
||||
'section_id': d['section_data']['section_id'],
|
||||
'remaining': (0 - int(d['overdue'])) if 'overdue' in d else d['remaining'],
|
||||
}
|
||||
daily_create_data.append(tem)
|
||||
# 批量创建记录
|
||||
print(daily_create_data)
|
||||
if daily_create_data:
|
||||
|
||||
36
fix_mysql.sh
Normal file
36
fix_mysql.sh
Normal file
@@ -0,0 +1,36 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo "=== 修复 MySQL 监听配置 ==="
|
||||
|
||||
# 1. 备份配置文件
|
||||
sudo cp /etc/mysql/mysql.conf.d/mysqld.cnf /etc/mysql/mysql.conf.d/mysqld.cnf.backup.$(date +%Y%m%d_%H%M%S)
|
||||
echo "✅ 已备份配置文件"
|
||||
|
||||
# 2. 修改 bind-address
|
||||
sudo sed -i 's/^bind-address.*127.0.0.1/bind-address = 0.0.0.0/' /etc/mysql/mysql.conf.d/mysqld.cnf
|
||||
echo "✅ 已修改 bind-address"
|
||||
|
||||
# 3. 显示修改后的配置
|
||||
echo ""
|
||||
echo "修改后的配置:"
|
||||
sudo grep "bind-address" /etc/mysql/mysql.conf.d/mysqld.cnf
|
||||
echo ""
|
||||
|
||||
# 4. 重启 MySQL
|
||||
echo "正在重启 MySQL..."
|
||||
sudo systemctl restart mysql
|
||||
sleep 2
|
||||
|
||||
# 5. 验证
|
||||
echo ""
|
||||
echo "验证 MySQL 监听状态:"
|
||||
sudo netstat -tlnp | grep 3306
|
||||
echo ""
|
||||
|
||||
# 6. 测试连接
|
||||
DOCKER0_IP=$(ip addr show docker0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}')
|
||||
echo "测试从本机连接 MySQL (IP: $DOCKER0_IP):"
|
||||
mysql -h $DOCKER0_IP -u railway -p'Railway01.' -e "SELECT 'Connection OK' as status, DATABASE() as current_db, VERSION() as version;" 2>&1
|
||||
|
||||
echo ""
|
||||
echo "=== 修复完成 ==="
|
||||
27
test_connection.sh
Normal file
27
test_connection.sh
Normal file
@@ -0,0 +1,27 @@
|
||||
#!/bin/bash
|
||||
|
||||
echo "=== Docker 网络诊断 ==="
|
||||
echo ""
|
||||
|
||||
echo "1. Docker0 网桥 IP:"
|
||||
ip addr show docker0 2>/dev/null | grep -oP '(?<=inet\s)\d+(\.\d+){3}' || echo "❌ docker0 不存在"
|
||||
echo ""
|
||||
|
||||
echo "2. MySQL 监听状态:"
|
||||
sudo netstat -tlnp | grep 3306 || echo "❌ MySQL 未运行或未监听 3306"
|
||||
echo ""
|
||||
|
||||
echo "3. MySQL 用户权限:"
|
||||
mysql -u root -p -e "SELECT user, host FROM mysql.user WHERE user='railway';" 2>/dev/null || echo "❌ 无法查询(需要 root 密码)"
|
||||
echo ""
|
||||
|
||||
echo "4. 测试从容器连接 MySQL:"
|
||||
DOCKER0_IP=$(ip addr show docker0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}')
|
||||
if [ ! -z "$DOCKER0_IP" ]; then
|
||||
docker run --rm mysql:8.0 mysql -h $DOCKER0_IP -u railway -p'Railway01.' -e "SELECT 'OK' as status;" 2>&1
|
||||
else
|
||||
echo "❌ 无法获取 docker0 IP"
|
||||
fi
|
||||
|
||||
echo ""
|
||||
echo "=== 诊断完成 ==="
|
||||
Reference in New Issue
Block a user