1.新增通过point_id获取测点信息接口

This commit is contained in:
whm
2025-11-05 12:04:17 +08:00
parent 291a768bec
commit 5357625ada
3 changed files with 55 additions and 15 deletions

View File

@@ -18,7 +18,8 @@ from ..schemas.comprehensive_data import (
LevelDataQueryRequest,
LinecodeRequest,
NYIDRequest,
SectionByAccountRequest
SectionByAccountRequest,
PointByAccountRequest
)
from ..services.daily import DailyDataService
from ..services.section_data import SectionDataService
@@ -436,6 +437,37 @@ def get_all_checkpoint_by_section(request: SectionByAccountRequest, db: Session
checkpoint_service = CheckpointService()
result_data = checkpoint_service.get_by_section_id(db, section_id=section_id)
data_list = [item.to_dict() for item in result_data] if result_data else []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(data_list),
data=data_list
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
@router.post("/get_checkpoint_by_point", response_model=DataResponse)
def get_checkpoint_by_point(request: PointByAccountRequest, db: Session = Depends(get_db)):
"""根据观测点ID获取观测点"""
try:
point_id = request.point_id
checkpoint_service = CheckpointService()
result_data = checkpoint_service.get_by_point_id(db, point_id=point_id)
# 使用 __dict__ 转换(过滤内部属性)
if result_data:
# 复制字典并排除 SQLAlchemy 内部属性
data_dict = result_data.__dict__.copy()
data_dict.pop('_sa_instance_state', None) # 移除ORM内部状态属性
data_list = [data_dict]
else:
data_list = []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",

View File

@@ -153,7 +153,9 @@ class SectionDataQueryRequest(BaseModel):
class SectionByAccountRequest(BaseModel):
account_id: Optional[str] = None
section_id: Optional[str] = None
# 测点数据
class PointByAccountRequest(BaseModel):
point_id: Optional[str] = None
# 水准数据查询请求
class LevelDataQueryRequest(BaseModel):
linecode: Optional[str] = None

View File

@@ -21,7 +21,7 @@ class ConstructionMonitorUtils:
"桥位施工桥梁,制梁前": 30,
"桥位施工桥梁,上部结构施工中": 1,
"架桥机(运梁车)通过": 7,
"桥梁主体工程完工后,第1至3个月": 7,
"桥梁主体工程完工后,第1至3个月": 7, # 模拟包含英文逗号的原始数据
"桥梁主体工程完工后第4至6个月": 14,
"桥梁主体工程完工后6个月以后": 30,
"轨道铺设期间,前": 30,
@@ -43,18 +43,31 @@ class ConstructionMonitorUtils:
"轨道板(道床)铺设后第2至3个月": 30,
"轨道板(道床)铺设后3个月以后": 90
}
# 构建中英文括号兼容映射表
# 构建中英文括号+逗号兼容映射表
self.compatible_periods = self._build_compatible_brackets_map()
def _build_compatible_brackets_map(self) -> Dict[str, int]:
"""构建支持中英文括号的兼容映射表"""
"""构建支持中英文括号、中英文逗号的兼容映射表"""
compatible_map = {}
for original_key, period in self.base_periods.items():
# 1. 保留原始key
compatible_map[original_key] = period
# 生成中文括号版key并添加到映射表
# 2. 生成中文括号版key原逻辑
chinese_bracket_key = original_key.replace("(", "").replace(")", "")
if chinese_bracket_key != original_key:
compatible_map[chinese_bracket_key] = period
# 3. 生成英文逗号转中文逗号版key新增逻辑
chinese_comma_key = original_key.replace(",", "")
if chinese_comma_key != original_key:
compatible_map[chinese_comma_key] = period
# 4. 生成中文括号+中文逗号混合版key双重兼容
mixed_key = chinese_bracket_key.replace(",", "")
if mixed_key != original_key and mixed_key not in compatible_map:
compatible_map[mixed_key] = period
return compatible_map
def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 1, current_date: datetime = None) -> Dict[str, List[Dict]]:
@@ -64,9 +77,7 @@ class ConstructionMonitorUtils:
calc_date = current_date.date() if current_date else datetime.now().date()
# ------------------------------
# 原有核心逻辑(完全不变)
# ------------------------------
for point_idx, point_data in enumerate(input_data):
if not point_data:
continue
@@ -138,22 +149,17 @@ class ConstructionMonitorUtils:
item_copy["remaining"] = due_days
result["data"].append(item_copy)
# ------------------------------
# 新增步骤data中相同NYID保留剩余天数最少的记录
# ------------------------------
if result["data"]:
# 用字典临时存储key=NYIDvalue=该NYID下剩余天数最少的记录
nyid_min_remaining = {}
for record in result["data"]:
nyid = record.get("NYID") # 假设NYID字段名是"NYID",若实际不同需调整
nyid = record.get("NYID")
if not nyid:
continue # 无NYID的记录直接保留或按需求处理
continue
# 若该NYID未存储或当前记录剩余天数更少则更新
if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]:
nyid_min_remaining[nyid] = record
# 将字典 values 转换为列表作为去重后的data
result["data"] = list(nyid_min_remaining.values())
return result