diff --git a/.gitignore b/.gitignore index 99a1356..814a7ed 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ upload_app/data/ upload_app/output/ upload_app/*.json +backups/ diff --git a/app/services/daily.py b/app/services/daily.py index a7bda46..06d09b1 100644 --- a/app/services/daily.py +++ b/app/services/daily.py @@ -1,357 +1,358 @@ -from sqlalchemy.orm import Session -from typing import List, Optional, Dict, Any, Set, Tuple,Union -from ..models.level_data import LevelData -from ..models.daily import DailyData -from .base import BaseService -from ..models.settlement_data import SettlementData -from sqlalchemy import func, select, desc,over -from sqlalchemy.orm import Session -import logging -logger = logging.getLogger(__name__) -class DailyDataService(BaseService[DailyData]): - def __init__(self): - super().__init__(DailyData) - - def _dict_to_instance(self, data_dict: Dict) -> DailyData: - """辅助方法:将单个字典转换为 DailyData 实例""" - model_fields = [col.name for col in DailyData.__table__.columns] - filtered_data = {k: v for k, v in data_dict.items() if k in model_fields} - return DailyData(**filtered_data) - - def _ensure_instances(self, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: - """确保输入数据是 DailyData 实例列表""" - if not isinstance(data, list): - raise TypeError(f"输入必须是列表,而非 {type(data)}") - - instances = [] - for item in data: - if isinstance(item, DailyData): - instances.append(item) - elif isinstance(item, dict): - instances.append(self._dict_to_instance(item)) - else: - raise TypeError(f"列表元素必须是 dict 或 DailyData 实例,而非 {type(item)}") - return instances - - def batch_create_by_account_nyid(self, db: Session, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: - """ - 批量创建记录,支持两种输入格式: - - List[DailyData]:模型实例列表 - - List[dict]:字典列表(自动转换为实例) - 通过 (account_id, NYID) 联合判断是否已存在,存在则忽略 --(暂时取消查重) - """ - try: - data_list = self._ensure_instances(data) - except TypeError as e: - logger.error(f"数据格式错误:{str(e)}") - raise - - target_pairs: List[Tuple[int, int]] = [ - (item.account_id, item.NYID) - for item in data_list - if item.account_id is not None and item.NYID is not None - ] - - if not target_pairs: - logger.warning("批量创建失败:所有记录缺少 account_id 或 NYID") - return [] - - - # 取消查重处理 - # existing_pairs: Set[Tuple[int, int]] = { - # (item.account_id, item.NYID) - # for item in db.query(DailyData.account_id, DailyData.NYID) - # .filter(DailyData.account_id.in_([p[0] for p in target_pairs]), - # DailyData.NYID.in_([p[1] for p in target_pairs])) - # .all() - # } - - to_create = [ - item for item in data_list - # if (item.account_id, item.NYID) not in existing_pairs - ] - - # ignored_count = len(data_list) - len(to_create) - # if ignored_count > 0: - # logger.info(f"批量创建时忽略{ignored_count}条已存在记录(account_id和NYID已存在)") - - logger.info(f"批量创建 {to_create}") - if not to_create: - return [] - - # 修复点:使用 add_all 替代 bulk_save_objects,确保对象被会话跟踪 - db.add_all(to_create) # 这里是关键修改 - db.commit() - - # 现在可以安全地刷新实例了 - for item in to_create: - db.refresh(item) - - return to_create - - - def get_nyid_by_point_id( - self, - db: Session, - point_ids: Optional[List[int]] = None, - max_num: int = 1 - ) -> List[List[dict]]: - """ - 获取指定point_id的记录,每个point_id的前max_num条记录放在同一个子列表中 - 返回格式:[[point1_records...], [point2_records...]] - """ - # 处理参数默认值 - point_ids = point_ids or [] - if max_num <= 0: - return [] - - # 窗口函数:按point_id分组,每组内按NYID降序编号 - row_num = over( - func.row_number(), - partition_by=SettlementData.point_id, - order_by=desc(SettlementData.NYID) - ).label("row_num") - - # 模型字段列表 - model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] - - # 基础条件 - base_conditions = [ - SettlementData.useflag.isnot(None), - SettlementData.useflag != 0 - ] - if point_ids: - base_conditions.append(SettlementData.point_id.in_(point_ids)) - - # 子查询 - subquery = ( - select(*model_columns, row_num) - .where(*base_conditions) - .subquery() - ) - - # 主查询:筛选每个point_id的前max_num条 - query = ( - select(subquery) - .where(subquery.c.row_num <= max_num) - .order_by(subquery.c.point_id, subquery.c.row_num) - ) - - # 执行查询 - results = db.execute(query).all() - grouped: Dict[int, List[dict]] = {} # 键:point_id,值:该point_id的所有记录(子列表) - field_names = [col.name for col in SettlementData.__table__.columns] - - for row in results: - item_dict = {field: getattr(row, field) for field in field_names} - try: - pid = int(item_dict["point_id"]) # 确保point_id为整数 - except (KeyError, ValueError): - continue # 跳过无效记录 - - # 同一point_id的记录放入同一个列表 - if pid not in grouped: - grouped[pid] = [] # 初始化子列表 - grouped[pid].append(item_dict) # 追加到子列表 - - # 按输入point_ids顺序整理结果(关键:每个point_id对应一个子列表) - if not point_ids: - point_ids = sorted(grouped.keys()) # 若无指定,按point_id排序 - - # 构建最终结果:每个point_id的记录作为一个子列表 - return [grouped.get(pid, []) for pid in point_ids] - - # 获取所有的今日数据 - def get_all_daily_data( - self, - db: Session, - user_id: Optional[int] = None # 可选参数:按user_id筛选 - ) -> List[Dict[str, Any]]: - """ - 获取所有日常数据(DailyData),支持按user_id筛选 - :param db: 数据库会话 - :param user_id: 可选用户ID,若提供则只返回该用户的数据 - :return: 日常数据字典列表,包含所有字段 - """ - try: - # 基础查询 - query = db.query(DailyData) - - # 若提供了user_id,则添加筛选条件 - if user_id is not None: - query = query.filter(DailyData.user_id == user_id) - logger.info(f"查询user_id={user_id}的所有日常数据") - else: - logger.info("查询所有日常数据") - - # 执行查询并获取所有记录 - daily_records = query.all() - - # 转换为字典列表(保留所有字段) - result = [] - for record in daily_records: - record_dict = { - column.name: getattr(record, column.name) - for column in DailyData.__table__.columns - } - result.append(record_dict) - - logger.info(f"查询完成,共获取{len(result)}条日常数据") - return result - - except Exception as e: - logger.error(f"获取日常数据失败:{str(e)}", exc_info=True) - raise e - def get_daily_data_by_account( - self, - db: Session, - account_id: str, # 账号ID(必填,因为是核心筛选条件) - user_id: Optional[int] = None # 可选参数:额外按user_id筛选 - ) -> List[Dict[str, Any]]: - """ - 根据account_id获取对应日常数据,支持额外按user_id筛选 - :param db: 数据库会话 - :param account_id: 账号ID(必填),用于精准筛选数据 - :param user_id: 可选用户ID,若提供则则进一步筛选该用户的数据 - :return: 符合条件的日常数据字典列表,包含所有字段 - """ - try: - # 基础查询:先按account_id筛选(必填条件) - query = db.query(DailyData).filter(DailyData.account_id == account_id) - - # 若提供了user_id,则添加额外筛选条件 - if user_id is not None: - query = query.filter(DailyData.user_id == user_id) - logger.info(f"查询account_id={account_id}且user_id={user_id}的日常数据") - else: - logger.info(f"查询account_id={account_id}的所有日常数据") - - # 执行查询并获取记录 - daily_records = query.all() - - # 转换为字典列表(保留所有字段) - result = [] - for record in daily_records: - record_dict = { - column.name: getattr(record, column.name) - for column in DailyData.__table__.columns - } - result.append(record_dict) - - logger.info(f"查询完成,account_id={account_id}对应{len(result)}条日常数据") - return result - - except Exception as e: - logger.error(f"获取account_id={account_id}的日常数据失败:{str(e)}", exc_info=True) - raise e - - def create_daily_from_linecode( - self, - db: Session, - linecode: str, - account_id: Optional[int] = None - ) -> List[DailyData]: - """ - 通过水准线路编码生成 daily 数据 - - 业务逻辑: - 1. 在水准数据表(level_data)中查找符合 linecode 的记录,且 NYID 最大 - 2. 通过 NYID 查询沉降数据表(settlement_data) - 3. 通过沉降数据的 point_id 查询观测点表(checkpoint),得到 section_id - 4. 通过 section_id 查询断面表(section_data),得到 account_id - 5. 整合这些数据,形成 daily 对象,插入到数据库表 - - Args: - db: 数据库会话 - linecode: 水准线路编码 - account_id: 可选的账户ID筛选条件 - - Returns: - 创建的 DailyData 记录列表 - """ - try: - logger.info(f"开始处理 linecode={linecode} 的 daily 数据生成请求") - - from ..models.level_data import LevelData - from ..models.settlement_data import SettlementData - from ..models.checkpoint import Checkpoint - from ..models.section_data import SectionData - - # 1. 在水准数据表中查找符合 linecode 的记录,且 NYID 最大 - level_data_list = db.query(LevelData)\ - .filter(LevelData.linecode == linecode)\ - .all() - - if not level_data_list: - raise ValueError(f"未找到 linecode={linecode} 对应的水准数据") - - # 找到 NYID 最大的记录(将 String 转换为数字进行比较) - max_nyid = max(level_data_list, key=lambda x: int(x.NYID) if x.NYID.isdigit() else 0) - target_nyid = max_nyid.NYID - logger.info(f"找到最大 NYID: {target_nyid}") - - # 2. 通过 NYID 查询沉降数据 - settlement_data_list = db.query(SettlementData)\ - .filter(SettlementData.NYID == target_nyid)\ - .all() - - if not settlement_data_list: - raise ValueError(f"未找到 NYID={target_nyid} 对应的沉降数据") - - # 3. 遍历沉降数据,构建 daily 记录 - daily_records = [] - for settlement in settlement_data_list: - # 3.1 通过沉降数据的 point_id 查询观测点表,得到 section_id - checkpoint = db.query(Checkpoint)\ - .filter(Checkpoint.point_id == settlement.point_id)\ - .first() - if not checkpoint: - logger.warning(f"未找到 point_id={settlement.point_id} 对应的观测点,跳过该记录") - continue - - # 3.2 通过 section_id 查询断面表,得到 account_id - section = db.query(SectionData)\ - .filter(SectionData.section_id == checkpoint.section_id)\ - .first() - if not section: - logger.warning(f"未找到 section_id={checkpoint.section_id} 对应的断面,跳过该记录") - continue - - # 3.3 从断面数据中获取 account_id,作为 user_id - user_id = int(section.account_id) if section.account_id else None - if not user_id: - logger.warning(f"断面 section_id={checkpoint.section_id} 没有 account_id,跳过该记录") - continue - - # 如果提供了 account_id 筛选条件,则只处理匹配的记录 - if account_id is not None and user_id != account_id: - continue - - # 3.4 构建 daily 记录 - daily_record = DailyData( - user_id=user_id, - account_id=user_id, - point_id=settlement.point_id, - NYID=settlement.NYID, - linecode=linecode, - section_id=checkpoint.section_id, - remaining=0, - is_all=0 - ) - daily_records.append(daily_record) - - if not daily_records: - logger.warning(f"没有生成任何 daily 记录") - return [] - - # 4. 批量插入 daily 记录 - created_records = self.batch_create_by_account_nyid(db, daily_records) - - logger.info(f"成功生成 {len(created_records)} 条 daily 记录") - return created_records - - except ValueError: - raise - except Exception as e: - logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True) - raise \ No newline at end of file +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any, Set, Tuple,Union +from ..models.level_data import LevelData +from ..models.daily import DailyData +from .base import BaseService +from ..models.settlement_data import SettlementData +from sqlalchemy import func, select, desc,over +from sqlalchemy.orm import Session +import logging +logger = logging.getLogger(__name__) +class DailyDataService(BaseService[DailyData]): + def __init__(self): + super().__init__(DailyData) + + def _dict_to_instance(self, data_dict: Dict) -> DailyData: + """辅助方法:将单个字典转换为 DailyData 实例""" + model_fields = [col.name for col in DailyData.__table__.columns] + filtered_data = {k: v for k, v in data_dict.items() if k in model_fields} + return DailyData(**filtered_data) + + def _ensure_instances(self, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: + """确保输入数据是 DailyData 实例列表""" + if not isinstance(data, list): + raise TypeError(f"输入必须是列表,而非 {type(data)}") + + instances = [] + for item in data: + if isinstance(item, DailyData): + instances.append(item) + elif isinstance(item, dict): + instances.append(self._dict_to_instance(item)) + else: + raise TypeError(f"列表元素必须是 dict 或 DailyData 实例,而非 {type(item)}") + return instances + + def batch_create_by_account_nyid(self, db: Session, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: + """ + 批量创建记录,支持两种输入格式: + - List[DailyData]:模型实例列表 + - List[dict]:字典列表(自动转换为实例) + 通过 (account_id, NYID) 联合判断是否已存在,存在则忽略 --(暂时取消查重) + """ + try: + data_list = self._ensure_instances(data) + except TypeError as e: + logger.error(f"数据格式错误:{str(e)}") + raise + + target_pairs: List[Tuple[int, int]] = [ + (item.account_id, item.NYID) + for item in data_list + if item.account_id is not None and item.NYID is not None + ] + + if not target_pairs: + logger.warning("批量创建失败:所有记录缺少 account_id 或 NYID") + return [] + + + # 取消查重处理 + # existing_pairs: Set[Tuple[int, int]] = { + # (item.account_id, item.NYID) + # for item in db.query(DailyData.account_id, DailyData.NYID) + # .filter(DailyData.account_id.in_([p[0] for p in target_pairs]), + # DailyData.NYID.in_([p[1] for p in target_pairs])) + # .all() + # } + + to_create = [ + item for item in data_list + # if (item.account_id, item.NYID) not in existing_pairs + ] + + # ignored_count = len(data_list) - len(to_create) + # if ignored_count > 0: + # logger.info(f"批量创建时忽略{ignored_count}条已存在记录(account_id和NYID已存在)") + + logger.info(f"批量创建 {to_create}") + if not to_create: + return [] + + # 修复点:使用 add_all 替代 bulk_save_objects,确保对象被会话跟踪 + db.add_all(to_create) # 这里是关键修改 + db.commit() + + # 现在可以安全地刷新实例了 + for item in to_create: + db.refresh(item) + + return to_create + + + def get_nyid_by_point_id( + self, + db: Session, + point_ids: Optional[List[int]] = None, + max_num: int = 1 + ) -> List[List[dict]]: + """ + 获取指定point_id的记录,每个point_id的前max_num条记录放在同一个子列表中 + 返回格式:[[point1_records...], [point2_records...]] + """ + # 处理参数默认值 + point_ids = point_ids or [] + if max_num <= 0: + return [] + + # 窗口函数:按point_id分组,每组内按NYID降序编号 + row_num = over( + func.row_number(), + partition_by=SettlementData.point_id, + order_by=desc(SettlementData.NYID) + ).label("row_num") + + # 模型字段列表 + model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] + + # 基础条件 + base_conditions = [ + SettlementData.useflag.isnot(None), + SettlementData.useflag != 0 + ] + if point_ids: + base_conditions.append(SettlementData.point_id.in_(point_ids)) + + # 子查询 + subquery = ( + select(*model_columns, row_num) + .where(*base_conditions) + .subquery() + ) + + # 主查询:筛选每个point_id的前max_num条 + query = ( + select(subquery) + .where(subquery.c.row_num <= max_num) + .order_by(subquery.c.point_id, subquery.c.row_num) + ) + + # 执行查询 + results = db.execute(query).all() + grouped: Dict[int, List[dict]] = {} # 键:point_id,值:该point_id的所有记录(子列表) + field_names = [col.name for col in SettlementData.__table__.columns] + + for row in results: + item_dict = {field: getattr(row, field) for field in field_names} + try: + pid = int(item_dict["point_id"]) # 确保point_id为整数 + except (KeyError, ValueError): + continue # 跳过无效记录 + + # 同一point_id的记录放入同一个列表 + if pid not in grouped: + grouped[pid] = [] # 初始化子列表 + grouped[pid].append(item_dict) # 追加到子列表 + + # 按输入point_ids顺序整理结果(关键:每个point_id对应一个子列表) + if not point_ids: + point_ids = sorted(grouped.keys()) # 若无指定,按point_id排序 + + # 构建最终结果:每个point_id的记录作为一个子列表 + return [grouped.get(pid, []) for pid in point_ids] + + # 获取所有的今日数据 + def get_all_daily_data( + self, + db: Session, + user_id: Optional[int] = None # 可选参数:按user_id筛选 + ) -> List[Dict[str, Any]]: + """ + 获取所有日常数据(DailyData),支持按user_id筛选 + :param db: 数据库会话 + :param user_id: 可选用户ID,若提供则只返回该用户的数据 + :return: 日常数据字典列表,包含所有字段 + """ + try: + # 基础查询 + query = db.query(DailyData) + + # 若提供了user_id,则添加筛选条件 + if user_id is not None: + query = query.filter(DailyData.user_id == user_id) + logger.info(f"查询user_id={user_id}的所有日常数据") + else: + logger.info("查询所有日常数据") + + # 执行查询并获取所有记录 + daily_records = query.all() + + # 转换为字典列表(保留所有字段) + result = [] + for record in daily_records: + record_dict = { + column.name: getattr(record, column.name) + for column in DailyData.__table__.columns + } + result.append(record_dict) + + logger.info(f"查询完成,共获取{len(result)}条日常数据") + return result + + except Exception as e: + logger.error(f"获取日常数据失败:{str(e)}", exc_info=True) + raise e + def get_daily_data_by_account( + self, + db: Session, + account_id: str, # 账号ID(必填,因为是核心筛选条件) + user_id: Optional[int] = None # 可选参数:额外按user_id筛选 + ) -> List[Dict[str, Any]]: + """ + 根据account_id获取对应日常数据,支持额外按user_id筛选 + :param db: 数据库会话 + :param account_id: 账号ID(必填),用于精准筛选数据 + :param user_id: 可选用户ID,若提供则则进一步筛选该用户的数据 + :return: 符合条件的日常数据字典列表,包含所有字段 + """ + try: + # 基础查询:先按account_id筛选(必填条件) + query = db.query(DailyData).filter(DailyData.account_id == account_id) + + # 若提供了user_id,则添加额外筛选条件 + if user_id is not None: + query = query.filter(DailyData.user_id == user_id) + logger.info(f"查询account_id={account_id}且user_id={user_id}的日常数据") + else: + logger.info(f"查询account_id={account_id}的所有日常数据") + + # 执行查询并获取记录 + daily_records = query.all() + + # 转换为字典列表(保留所有字段) + result = [] + for record in daily_records: + record_dict = { + column.name: getattr(record, column.name) + for column in DailyData.__table__.columns + } + result.append(record_dict) + + logger.info(f"查询完成,account_id={account_id}对应{len(result)}条日常数据") + return result + + except Exception as e: + logger.error(f"获取account_id={account_id}的日常数据失败:{str(e)}", exc_info=True) + raise e + + def create_daily_from_linecode( + self, + db: Session, + linecode: str, + account_id: Optional[int] = None + ) -> List[DailyData]: + """ + 通过水准线路编码生成 daily 数据 + + 业务逻辑: + 1. 在水准数据表(level_data)中查找符合 linecode 的记录,且 NYID 最大 + 2. 通过 NYID 查询沉降数据表(settlement_data) + 3. 通过沉降数据的 point_id 查询观测点表(checkpoint),得到 section_id + 4. 通过 section_id 查询断面表(section_data),得到 account_id + 5. 整合这些数据,形成 daily 对象,插入到数据库表 + + Args: + db: 数据库会话 + linecode: 水准线路编码 + account_id: 可选的账户ID筛选条件 + + Returns: + 创建的 DailyData 记录列表 + """ + try: + logger.info(f"开始处理 linecode={linecode} 的 daily 数据生成请求") + + from ..models.level_data import LevelData + from ..models.settlement_data import SettlementData + from ..models.checkpoint import Checkpoint + from ..models.section_data import SectionData + + # 1. 在水准数据表中查找符合 linecode 的记录,且 NYID 最大 + level_data_list = db.query(LevelData)\ + .filter(LevelData.linecode == linecode)\ + .all() + + if not level_data_list: + raise ValueError(f"未找到 linecode={linecode} 对应的水准数据") + + # 找到 NYID 最大的记录(将 String 转换为数字进行比较) + max_nyid = max(level_data_list, key=lambda x: int(x.NYID) if x.NYID.isdigit() else 0) + target_nyid = max_nyid.NYID + logger.info(f"找到最大 NYID: {target_nyid}") + + # 2. 通过 NYID 查询沉降数据 + settlement_data_list = db.query(SettlementData)\ + .filter(SettlementData.NYID == target_nyid)\ + .all() + + if not settlement_data_list: + raise ValueError(f"未找到 NYID={target_nyid} 对应的沉降数据") + + # 3. 遍历沉降数据,构建 daily 记录 + daily_records = [] + for settlement in settlement_data_list: + # 3.1 通过沉降数据的 point_id 查询观测点表,得到 section_id + checkpoint = db.query(Checkpoint)\ + .filter(Checkpoint.point_id == settlement.point_id)\ + .first() + if not checkpoint: + logger.warning(f"未找到 point_id={settlement.point_id} 对应的观测点,跳过该记录") + continue + + # 3.2 通过 section_id 查询断面表,得到 account_id + section = db.query(SectionData)\ + .filter(SectionData.section_id == checkpoint.section_id)\ + .first() + if not section: + logger.warning(f"未找到 section_id={checkpoint.section_id} 对应的断面,跳过该记录") + continue + + # 3.3 从断面数据中获取 account_id,作为 user_id + user_id = int(section.account_id) if section.account_id else None + if not user_id: + logger.warning(f"断面 section_id={checkpoint.section_id} 没有 account_id,跳过该记录") + continue + + # 如果提供了 account_id 筛选条件,则只处理匹配的记录 + if account_id is not None and user_id != account_id: + continue + + # 3.4 构建 daily 记录 + daily_record = DailyData( + user_id=user_id, + account_id=user_id, + point_id=settlement.point_id, + NYID=settlement.NYID, + linecode=linecode, + section_id=checkpoint.section_id, + remaining=0, + is_all=0 + ) + daily_records.append(daily_record) + + if not daily_records: + logger.warning(f"没有生成任何 daily 记录") + return [] + + # 4. 批量插入 daily 记录 + created_records = self.batch_create_by_account_nyid(db, daily_records) + + logger.info(f"成功生成 {len(created_records)} 条 daily 记录") + return created_records + + except ValueError: + raise + except Exception as e: + logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True) + + raise diff --git a/app/utils/construction_monitor.py b/app/utils/construction_monitor.py index c2d0cba..ea61261 100644 --- a/app/utils/construction_monitor.py +++ b/app/utils/construction_monitor.py @@ -11,6 +11,9 @@ class ConstructionMonitorUtils: def __init__(self): # 原始工况周期映射表(保持不变) self.base_periods = { + "仰拱(底板)施工完成后,第1个月": 7, + "仰拱(底板)施工完成后,第2至3个月": 14, + "仰拱(底板)施工完成后,3个月以后": 30, "仰拱(底板)施工完成后,第1个月": 7, # 原:仰拱(底板)施工完成后,第1个月 "仰拱(底板)施工完成后,第2至3个月": 14, # 原:仰拱(底板)施工完成后,第2至3个月 "仰拱(底板)施工完成后,3个月以后": 30, # 原:仰拱(底板)施工完成后,3个月以后 @@ -39,7 +42,7 @@ class ConstructionMonitorUtils: "铺路或堆载,沉降量突变情况": 1, "填筑或堆载,两次填筑间隔时间较长情况":3, "铺路或堆载,两次铺路间隔时间较长情况": 3, - "堆载预压或路基填筑完成, 第1至3个月":7, # 原:堆载预压或路基铺路完成,第1至3个月 + "堆载预压或路基填筑完成,第1至3个月":7, # 原:堆载预压或路基铺路完成,第1至3个月 "堆载预压或路基填筑完成,第4至6个月": 14, # 原:堆载预压或路基铺路完成,第4至6个月 "堆载预压或路基填筑完成,6个月以后": 30, # 原:堆载预压或路基铺路完成,6个月以后 "架桥机(运梁车) 首次通过前": 1, # 原:架桥机(运梁车)首次通过前(仅加空格)