diff --git a/.env b/.env index 6a6eae4..a43e8ff 100644 --- a/.env +++ b/.env @@ -1,10 +1,12 @@ # 云端数据库配置 -DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway -DB_HOST=localhost +DATABASE_URL=mysql+pymysql://railway:Railway01.@172.17.0.1:3306/railway +DB_HOST=172.17.0.1 DB_PORT=3306 DB_USER=railway DB_PASSWORD=Railway01. DB_NAME=railway + + # 本地配置 # DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway # DB_HOST=localhost diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py index 478c536..6917bae 100644 --- a/app/api/comprehensive_data.py +++ b/app/api/comprehensive_data.py @@ -16,8 +16,13 @@ from ..schemas.comprehensive_data import ( OriginalDataQueryRequest, SettlementDataCheckpointQueryRequest, LevelDataQueryRequest, - LinecodeRequest + LinecodeRequest, + NYIDRequest, + SectionByAccountRequest, + PointByAccountRequest, + TodayDataRequest ) +from ..services.daily import DailyDataService from ..services.section_data import SectionDataService from ..services.checkpoint import CheckpointService from ..services.settlement_data import SettlementDataService @@ -192,10 +197,10 @@ def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Sess # 查询断面数据对应观察点数据 @router.post("/get_section", response_model=DataResponse) def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)): - """获取断面数据 + 观测点""" + """获取断面数据 + 观测点(支持分页)""" try: logger.info(f"Querying section data with params: {request.dict()}") - result_data = section_service.search_sections_with_checkpoints( + result = section_service.search_sections_with_checkpoints( db, id=request.id, section_id=request.section_id, @@ -203,15 +208,17 @@ def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)) work_site=request.work_site, number=request.number, status=request.status, - account_id=request.account_id + account_id=request.account_id, + skip=request.skip, + limit=request.limit ) - logger.info(f"Found {len(result_data)} sections with checkpoints") + logger.info(f"Found {result['total']} sections with checkpoints, returning {len(result['data'])} records") return DataResponse( code=ResponseCode.SUCCESS, message="查询成功", - total=len(result_data), - data=result_data + total=result['total'], + data=result['data'] ) except Exception as e: logger.error(f"Query section data failed: {str(e)}") @@ -225,25 +232,26 @@ def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)) # 根据观测点id查询沉降数据 @router.post("/get_settlement", response_model=DataResponse) def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)): - """获取沉降数据,按上传时间倒序排序,支持limit参数限制返回数量""" + """获取沉降数据,按上传时间倒序排序,支持分页参数(skip、limit)""" try: logger.info(f"Querying settlement data with params: {request.dict()}") - result_data = settlement_service.search_settlement_data_formatted( + result = settlement_service.search_settlement_data_formatted( db, id=request.id, point_id=request.point_id, nyid=request.NYID, sjName=request.sjName, workinfoname=request.workinfoname, + skip=request.skip, limit=request.limit ) - logger.info(f"Found {len(result_data)} settlement records") + logger.info(f"Found {result['total']} settlement records, returning {len(result['data'])} records") return DataResponse( code=ResponseCode.SUCCESS, message="查询成功", - total=len(result_data), - data=result_data + total=result['total'], + data=result['data'] ) except Exception as e: logger.error(f"Query settlement data failed: {str(e)}") @@ -344,4 +352,144 @@ def get_settlement_by_linecode( message=f"查询失败:{str(e)}", total=0, data=[] + ) +@router.post("/get_settlement_by_nyid", response_model=DataResponse) +def get_settlement_by_nyid( + request: NYIDRequest, # 假设定义了接收nyid的请求模型 + db: Session = Depends(get_db) +): + try: + nyid = request.NYID # 从请求体中获取nyid + logger.info(f"接口请求:根据nyid={nyid}查询沉降数据") + + settlement = SettlementDataService() + # 获取模型实例列表 + checkpoint_instances = settlement.get_by_nyid(db, nyid=nyid) + # 转为字典列表(核心修正) + checkpoint_data = [instance.__dict__ for instance in checkpoint_instances] + # 清理 SQLAlchemy 内部属性(可选,避免多余字段) + checkpoint_data = [{k: v for k, v in item.items() if not k.startswith('_sa_')} for item in checkpoint_data] + + return DataResponse( + code=ResponseCode.SUCCESS, + message=f"查询成功,共获取{len(checkpoint_data)}条沉降数据,nyid={nyid}", + total=len(checkpoint_data), + data=checkpoint_data + ) + + except Exception as e: + logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True) + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"查询失败:{str(e)}", + total=0, + data=[] + ) +@router.post("/get_today_data", response_model=DataResponse) +def get_today_data(request: TodayDataRequest, db: Session = Depends(get_db)): + """接口:通过POST请求触发调度器中的 scheduled_get_max_nyid_by_point_id 定时任务""" + try: + # 获取请求参数(如果需要从请求体中接收参数,可通过request获取) + # 示例:如需接收account_id,可通过 request.account_id 获取 + # account_id = request.account_id # 根据根据实际需求决定是否需要 + + # 触发定时任务(如果需要传入参数,可在这里添加) + # scheduled_get_max_nyid_by_point_id() + + # 调用服务层获取数据 + account_id = request.account_id + daily_service = DailyDataService() + # 如需使用请求参数,可修改为 daily_service.get_daily_data_by_account(db, account_id=account_id) + daily_data = daily_service.get_daily_data_by_account(db, account_id=account_id) + + return DataResponse( + code=ResponseCode.SUCCESS, + message="定时时任务触发执行成功!任务已开始处理(具体结果查看系统日志)", + total=1 if daily_data else 0, # 根据实际数据是否存在调整total + data=daily_data + ) + except Exception as e: + logger.error(f"接口触发定时任务失败:{str(e)}", exc_info=True) + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"定时任务触发失败:{str(e)}", + total=0, + data={} + ) +# account_id获取所有断面数据 +@router.post("/get_all_section_by_account", response_model=DataResponse) +def get_all_section_by_account(request: SectionByAccountRequest, db: Session = Depends(get_db)): + """获取断面数据 + 观测点""" + try: + account_id = request.account_id + section_service = SectionDataService() + result_data = section_service.get_by_account_id(db, account_id=account_id) + data_list = [item.to_dict() for item in result_data] if result_data else [] + return DataResponse( + code=ResponseCode.SUCCESS, + message="查询成功", + total=len(data_list), + data=data_list + ) + except Exception as e: + logger.error(f"Query section data failed: {str(e)}") + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", + total=0, + data=[] + ) +# section_id 获取所有观测点数据 +@router.post("/get_all_checkpoint_by_section", response_model=DataResponse) +def get_all_checkpoint_by_section(request: SectionByAccountRequest, db: Session = Depends(get_db)): + """获取断面数据 + 观测点""" + try: + section_id = request.section_id + checkpoint_service = CheckpointService() + result_data = checkpoint_service.get_by_section_id(db, section_id=section_id) + data_list = [item.to_dict() for item in result_data] if result_data else [] + return DataResponse( + code=ResponseCode.SUCCESS, + message="查询成功", + total=len(data_list), + data=data_list + ) + except Exception as e: + logger.error(f"Query section data failed: {str(e)}") + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", + total=0, + data=[] + ) +@router.post("/get_checkpoint_by_point", response_model=DataResponse) +def get_checkpoint_by_point(request: PointByAccountRequest, db: Session = Depends(get_db)): + """根据观测点ID获取观测点""" + try: + point_id = request.point_id + checkpoint_service = CheckpointService() + result_data = checkpoint_service.get_by_point_id(db, point_id=point_id) + + # 使用 __dict__ 转换(过滤内部属性) + if result_data: + # 复制字典并排除 SQLAlchemy 内部属性 + data_dict = result_data.__dict__.copy() + data_dict.pop('_sa_instance_state', None) # 移除ORM内部状态属性 + data_list = [data_dict] + else: + data_list = [] + + return DataResponse( + code=ResponseCode.SUCCESS, + message="查询成功", + total=len(data_list), + data=data_list + ) + except Exception as e: + logger.error(f"Query section data failed: {str(e)}") + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}", + total=0, + data=[] ) \ No newline at end of file diff --git a/app/api/test.py b/app/api/test.py new file mode 100644 index 0000000..eb4c329 --- /dev/null +++ b/app/api/test.py @@ -0,0 +1,55 @@ +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session +from ..core.database import get_db +from ..core.response_code import ResponseCode +from ..schemas.comprehensive_data import DataResponse +import logging +from ..services.daily import DailyDataService +from ..services.checkpoint import CheckpointService +from ..services.settlement_data import SettlementDataService +from ..utils.scheduler import scheduled_get_max_nyid_by_point_id +# 导入全局定时任务调度器实例和目标任务函数 +from ..utils.scheduler import task_scheduler, scheduled_get_max_nyid_by_point_id + +router = APIRouter(prefix="/test", tags=["测试"]) +logger = logging.getLogger(__name__) + +@router.get("/trigger_max_nyid_task", response_model=DataResponse) +def trigger_max_nyid_task(db: Session = Depends(get_db)): + """接口:直接触发调度器中的 scheduled_get_max_nyid_by_point_id 定时任务""" + try: + # 触发任务执行 + # task_scheduler.run_job(scheduled_get_max_nyid_by_point_id.__name__) + # settlement = SettlementDataService() + # # 获取模型实例列表 + # checkpoint_instances = settlement.get_by_nyid(db, nyid="4993546") + # # 转为字典列表(核心修正) + # checkpoint_data = [instance.__dict__ for instance in checkpoint_instances] + # # 清理 SQLAlchemy 内部属性(可选,避免多余字段) + # checkpoint_data = [{k: v for k, v in item.items() if not k.startswith('_sa_')} for item in checkpoint_data] + + # return DataResponse( + # code=ResponseCode.SUCCESS, + # message="定时任务触发执行成功!任务已开始处理(具体结果查看系统日志)", + # total=len(checkpoint_data), # 修正为实际数据长度 + # data=checkpoint_data + # ) + + # scheduled_get_max_nyid_by_point_id() + daily_service = DailyDataService() + daily_data = daily_service.get_daily_data_by_account(db,account_id=1) + + return DataResponse( + code=ResponseCode.SUCCESS, + message="定时任务触发执行成功!任务已开始处理(具体结果查看系统日志)", + total=1, + data=daily_data + ) + except Exception as e: + logger.error(f"接口触发定时任务失败:{str(e)}", exc_info=True) + return DataResponse( + code=ResponseCode.QUERY_FAILED, + message=f"定时任务触发失败:{str(e)}", + total=0, + data={} + ) \ No newline at end of file diff --git a/app/main.py b/app/main.py index 2468308..2a1d620 100644 --- a/app/main.py +++ b/app/main.py @@ -68,6 +68,7 @@ app.include_router(account_router, prefix="/api") app.include_router(database_router, prefix="/api") app.include_router(task_router, prefix="/api") app.include_router(comprehensive_data_router, prefix="/api") +# app.include_router(test_router, prefix="/api") # 根路径 @app.get("/") diff --git a/app/models/account.py b/app/models/account.py index f3e7563..006637a 100644 --- a/app/models/account.py +++ b/app/models/account.py @@ -14,6 +14,7 @@ class Account(Base): created_at = Column(DateTime, server_default=func.now(), comment="创建时间") updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间") update_time = Column(String(1000), nullable=False, comment="更新时间跨度") + max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米") # 模型转字典 diff --git a/app/models/daily.py b/app/models/daily.py index 84ab4f5..2c45f1b 100644 --- a/app/models/daily.py +++ b/app/models/daily.py @@ -5,8 +5,11 @@ class DailyData(Base): __tablename__ = "daily" id = Column(Integer, primary_key=True, index=True, autoincrement=True) + user_id = Column(Integer, nullable=False, comment="用户id") account_id = Column(Integer, nullable=False, comment="账户id") point_id = Column(String(100), comment="测点id") NYID = Column(String(100), nullable=False, comment="期数id") linecode = Column(String(255), nullable=False, comment="水准线路编码") - section_id = Column(String(255), nullable=False, comment="所属断面id") \ No newline at end of file + section_id = Column(String(255), nullable=False, comment="所属断面id") + remaining = Column(Integer, nullable=False, comment="剩余天数") + user_id = Column(Integer, default=1, nullable=False, comment="用户id") diff --git a/app/schemas/account.py b/app/schemas/account.py index 7e052fd..617307e 100644 --- a/app/schemas/account.py +++ b/app/schemas/account.py @@ -9,6 +9,7 @@ class AccountBase(BaseModel): today_updated: Optional[int] = 0 project_name: Optional[str] = None update_time: Optional[str] = None + max_variation: Optional[int] = None class AccountCreate(AccountBase): pass @@ -42,7 +43,8 @@ class AccountResponse(AccountBase): project_name=account.project_name, created_at=account.created_at, updated_at=account.updated_at, - update_time=account.update_time + update_time=account.update_time, + max_variation=account.max_variation, ) class AccountListRequest(BaseModel): diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py index 30c1b76..4b6c668 100644 --- a/app/schemas/comprehensive_data.py +++ b/app/schemas/comprehensive_data.py @@ -20,7 +20,12 @@ class LevelDataImportRequest(BaseModel): wsphigh: Optional[str] = None mtype: Optional[str] = None createDate: Optional[str] = None - +# 水准数据导入请求 +class NYIDRequest(BaseModel): + NYID: str +# 今日数据请求 +class TodayDataRequest(BaseModel): + account_id: str # 沉降数据导入请求 class SettlementDataImportRequest(BaseModel): point_id: str @@ -104,6 +109,7 @@ class SettlementDataQueryRequest(BaseModel): isgzjdxz: Optional[str] = None upd_remark: Optional[str] = None limit: Optional[int] = None # 限制返回数量,None表示返回全部 + skip: Optional[int] = 0 # 跳过数量,用于分页,默认0 # 沉降数据查询请求——水准线路编码 class SettlementDataCheckpointQueryRequest(BaseModel): @@ -147,7 +153,15 @@ class SectionDataQueryRequest(BaseModel): foundation_treatment_method: Optional[str] = None rock_mass_classification: Optional[str] = None account_id: Optional[str] = None - + limit: Optional[int] = None # 限制返回数量,None表示返回全部 + skip: Optional[int] = 0 # 跳过数量,用于分页,默认0 +# 断面数据导入请求 +class SectionByAccountRequest(BaseModel): + account_id: Optional[str] = None + section_id: Optional[str] = None +# 测点数据 +class PointByAccountRequest(BaseModel): + point_id: Optional[str] = None # 水准数据查询请求 class LevelDataQueryRequest(BaseModel): linecode: Optional[str] = None diff --git a/app/services/base.py b/app/services/base.py index e2ddebc..45b4db6 100644 --- a/app/services/base.py +++ b/app/services/base.py @@ -52,7 +52,7 @@ class BaseService(Generic[ModelType]): return db.query(self.model).filter(field == field_value).all() return [] - def search_by_conditions(self, db: Session, conditions: Dict[str, Any]) -> List[ModelType]: + def search_by_conditions(self, db: Session, conditions: Dict[str, Any], skip: int = 0, limit: Optional[int] = None) -> List[ModelType]: """根据多个条件搜索记录""" query = db.query(self.model) for field_name, field_value in conditions.items(): @@ -62,4 +62,19 @@ class BaseService(Generic[ModelType]): query = query.filter(field.like(f"{field_value}")) else: query = query.filter(field == field_value) - return query.all() \ No newline at end of file + query = query.offset(skip) + if limit is not None: + query = query.limit(limit) + return query.all() + + def search_by_conditions_count(self, db: Session, conditions: Dict[str, Any]) -> int: + """根据多个条件搜索记录总数""" + query = db.query(self.model) + for field_name, field_value in conditions.items(): + if hasattr(self.model, field_name) and field_value is not None: + field = getattr(self.model, field_name) + if isinstance(field_value, str): + query = query.filter(field.like(f"{field_value}")) + else: + query = query.filter(field == field_value) + return query.count() \ No newline at end of file diff --git a/app/services/checkpoint.py b/app/services/checkpoint.py index 4a06cb5..d9542b1 100644 --- a/app/services/checkpoint.py +++ b/app/services/checkpoint.py @@ -8,9 +8,7 @@ class CheckpointService(BaseService[Checkpoint]): def __init__(self): super().__init__(Checkpoint) - def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]: - """根据断面ID获取观测点""" - return self.get_by_field(db, "section_id", section_id) + def get_by_point_id(self, db: Session, point_id: str) -> Optional[Checkpoint]: """根据观测点ID获取观测点""" @@ -118,4 +116,11 @@ class CheckpointService(BaseService[Checkpoint]): 'success_count': success_count, 'failed_count': failed_count, 'failed_items': failed_items - } \ No newline at end of file + } + def get_by_nyid(self, db: Session, nyid: str) -> List[Checkpoint]: + """根据NYID获取所有相关的测点信息""" + return self.get_by_field(db, "NYID", nyid) + # 通过section_id获取所有观测点数据 + def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]: + """根据section_id获取所有相关的测点信息""" + return self.get_by_field(db, "section_id", section_id) diff --git a/app/services/daily.py b/app/services/daily.py index 39bb2a2..20444bc 100644 --- a/app/services/daily.py +++ b/app/services/daily.py @@ -149,4 +149,88 @@ class DailyDataService(BaseService[DailyData]): # 构建[[{}], [{}]]格式 return [ [record] for pid in point_ids for record in grouped.get(pid, []) - ] \ No newline at end of file + ] + + # 获取所有的今日数据 + def get_all_daily_data( + self, + db: Session, + user_id: Optional[int] = None # 可选参数:按user_id筛选 + ) -> List[Dict[str, Any]]: + """ + 获取所有日常数据(DailyData),支持按user_id筛选 + :param db: 数据库会话 + :param user_id: 可选用户ID,若提供则只返回该用户的数据 + :return: 日常数据字典列表,包含所有字段 + """ + try: + # 基础查询 + query = db.query(DailyData) + + # 若提供了user_id,则添加筛选条件 + if user_id is not None: + query = query.filter(DailyData.user_id == user_id) + logger.info(f"查询user_id={user_id}的所有日常数据") + else: + logger.info("查询所有日常数据") + + # 执行查询并获取所有记录 + daily_records = query.all() + + # 转换为字典列表(保留所有字段) + result = [] + for record in daily_records: + record_dict = { + column.name: getattr(record, column.name) + for column in DailyData.__table__.columns + } + result.append(record_dict) + + logger.info(f"查询完成,共获取{len(result)}条日常数据") + return result + + except Exception as e: + logger.error(f"获取日常数据失败:{str(e)}", exc_info=True) + raise e + def get_daily_data_by_account( + self, + db: Session, + account_id: str, # 账号ID(必填,因为是核心筛选条件) + user_id: Optional[int] = None # 可选参数:额外按user_id筛选 + ) -> List[Dict[str, Any]]: + """ + 根据account_id获取对应日常数据,支持额外按user_id筛选 + :param db: 数据库会话 + :param account_id: 账号ID(必填),用于精准筛选数据 + :param user_id: 可选用户ID,若提供则则进一步筛选该用户的数据 + :return: 符合条件的日常数据字典列表,包含所有字段 + """ + try: + # 基础查询:先按account_id筛选(必填条件) + query = db.query(DailyData).filter(DailyData.account_id == account_id) + + # 若提供了user_id,则添加额外筛选条件 + if user_id is not None: + query = query.filter(DailyData.user_id == user_id) + logger.info(f"查询account_id={account_id}且user_id={user_id}的日常数据") + else: + logger.info(f"查询account_id={account_id}的所有日常数据") + + # 执行查询并获取记录 + daily_records = query.all() + + # 转换为字典列表(保留所有字段) + result = [] + for record in daily_records: + record_dict = { + column.name: getattr(record, column.name) + for column in DailyData.__table__.columns + } + result.append(record_dict) + + logger.info(f"查询完成,account_id={account_id}对应{len(result)}条日常数据") + return result + + except Exception as e: + logger.error(f"获取account_id={account_id}的日常数据失败:{str(e)}", exc_info=True) + raise e \ No newline at end of file diff --git a/app/services/section_data.py b/app/services/section_data.py index 42fbbc7..ac3a0ea 100644 --- a/app/services/section_data.py +++ b/app/services/section_data.py @@ -20,7 +20,10 @@ class SectionDataService(BaseService[SectionData]): """根据断面ID获取断面数据""" sections = self.get_by_field(db, "section_id", section_id) return sections[0] if sections else None - + def get_by_account_id(self, db: Session, account_id: str) -> Optional[SectionData]: + """根据账号ID获取断面数据""" + accounts = self.get_by_field(db, "account_id", account_id) + return accounts if accounts else None def get_by_number(self, db: Session, number: str) -> List[SectionData]: """根据桥梁墩(台)编号获取断面数据""" return self.get_by_field(db, "number", number) @@ -33,7 +36,9 @@ class SectionDataService(BaseService[SectionData]): number: Optional[str] = None, status: Optional[str] = None, basic_types: Optional[str] = None, - account_id: Optional[str] = None) -> List[SectionData]: + account_id: Optional[str] = None, + skip: int = 0, + limit: Optional[int] = None) -> List[SectionData]: """根据多个条件搜索断面数据""" conditions = {} if section_id is not None: @@ -53,7 +58,7 @@ class SectionDataService(BaseService[SectionData]): if account_id is not None: conditions['account_id'] = account_id - return self.search_by_conditions(db, conditions) + return self.search_by_conditions(db, conditions, skip, limit) def search_sections_with_checkpoints(self, db: Session, id: Optional[int] = None, @@ -62,9 +67,32 @@ class SectionDataService(BaseService[SectionData]): work_site: Optional[str] = None, number: Optional[str] = None, status: Optional[str] = None, - account_id: Optional[str] = None) -> List[Dict[str, Any]]: - """查询断面数据并返回带观测点的结果""" - sections = self.search_section_data(db, id, section_id, mileage, work_site, number, status, account_id=account_id) + account_id: Optional[str] = None, + skip: int = 0, + limit: Optional[int] = None) -> Dict[str, Any]: + """查询断面数据并返回带观测点的结果(支持分页)""" + # 构建查询条件 + conditions = {} + if section_id is not None: + conditions["section_id"] = section_id + if work_site is not None: + conditions["work_site"] = work_site + if number is not None: + conditions["number"] = number + if status is not None: + conditions["status"] = status + if id is not None: + conditions['id'] = id + if mileage is not None: + conditions['mileage'] = mileage + if account_id is not None: + conditions['account_id'] = account_id + + # 获取总数 + total_count = self.search_by_conditions_count(db, conditions) + + # 获取分页数据 + sections = self.search_by_conditions(db, conditions, skip, limit) result = [] for section in sections: @@ -98,7 +126,12 @@ class SectionDataService(BaseService[SectionData]): } result.append(section_dict) - return result + return { + "data": result, + "total": total_count, + "skip": skip, + "limit": limit + } def get_section_with_checkpoints(self, db: Session, section_id: str) -> Dict[str, Any]: """获取断面数据及其关联的观测点""" diff --git a/app/services/settlement_data.py b/app/services/settlement_data.py index 4f59e7d..716beaa 100644 --- a/app/services/settlement_data.py +++ b/app/services/settlement_data.py @@ -7,6 +7,7 @@ from sqlalchemy.orm import Session from typing import Dict, List from ..models.settlement_data import SettlementData from ..models.level_data import LevelData +from ..models.section_data import SectionData import logging from datetime import datetime @@ -36,6 +37,7 @@ class SettlementDataService(BaseService[SettlementData]): nyid: Optional[str] = None, sjName: Optional[str] = None, workinfoname: Optional[str] = None, + skip: int = 0, limit: Optional[int] = None) -> List[SettlementData]: """根据多个条件搜索沉降数据,按上传时间倒序排序""" query = db.query(SettlementData) @@ -54,7 +56,9 @@ class SettlementDataService(BaseService[SettlementData]): # 按上传时间倒序排序 query = query.order_by(SettlementData.createdate.desc()) - # 如果指定了limit,则限制返回数量 + # 添加分页支持 + if skip > 0: + query = query.offset(skip) if limit is not None and limit > 0: query = query.limit(limit) @@ -66,9 +70,25 @@ class SettlementDataService(BaseService[SettlementData]): nyid: Optional[str] = None, sjName: Optional[str] = None, workinfoname: Optional[str] = None, - limit: Optional[int] = None) -> List[Dict[str, Any]]: - """查询沉降数据并返回格式化结果,按上传时间倒序排序""" - settlement_data = self.search_settlement_data(db, id, point_id, nyid, sjName, workinfoname, limit) + skip: int = 0, + limit: Optional[int] = None) -> Dict[str, Any]: + """查询沉降数据并返回格式化结果,按上传时间倒序排序(支持分页)""" + # 先获取总数(不计分页) + count_query = db.query(SettlementData) + if id is not None: + count_query = count_query.filter(SettlementData.id == id) + if point_id is not None: + count_query = count_query.filter(SettlementData.point_id == point_id) + if nyid is not None: + count_query = count_query.filter(SettlementData.NYID == nyid) + if sjName is not None: + count_query = count_query.filter(SettlementData.sjName == sjName) + if workinfoname is not None: + count_query = count_query.filter(SettlementData.workinfoname == workinfoname) + total_count = count_query.count() + + # 获取分页数据 + settlement_data = self.search_settlement_data(db, id, point_id, nyid, sjName, workinfoname, skip, limit) result = [] for settlement in settlement_data: @@ -96,7 +116,12 @@ class SettlementDataService(BaseService[SettlementData]): } result.append(settlement_dict) - return result + return { + "data": result, + "total": total_count, + "skip": skip, + "limit": limit + } def search_settlement_checkpoint_data_formatted(self, db: Session, id: Optional[int] = None, @@ -351,14 +376,16 @@ class SettlementDataService(BaseService[SettlementData]): # 根据水准线路编码获取最新的NYID并获取对应的测点数据 def get_settlement_by_linecode( - self, - db: Session, - linecode: str, - num: int = 1 # 新增参数:控制返回的期数,默认1(最新一期) -) -> Dict: + self, + db: Session, + linecode: str, + num: int = 1 # 控制返回的期数,默认1(最新一期) + ) -> Dict: """ 根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选 - 关联逻辑:LevelData.linecode → LevelData.NYID → SettlementData.NYID + 关联逻辑: + LevelData.linecode → LevelData.NYID → SettlementData.NYID + SettlementData.point_id(字符串)→ Checkpoint.point_id → Checkpoint.section_id → SectionData.section_id → SectionData.work_site :param db: 数据库会话 :param linecode: 目标水准线路编码 :param num: 返回的期数(按NYID从大到小排序),默认1(最新一期) @@ -367,55 +394,87 @@ class SettlementDataService(BaseService[SettlementData]): try: logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)") - # 1. 根据linecode查询水准线路表,获取所有关联的NYID(去重后按NYID降序排序) + # 1. 根据linecode查询水准线路表,获取前N期的NYID nyid_query = db.query(LevelData.NYID)\ .filter(LevelData.linecode == linecode)\ .distinct()\ - .order_by(LevelData.NYID.desc()) # 按NYID降序,确保最新的在前 + .order_by(LevelData.NYID.desc()) - # 根据num参数截取前N期的NYID top_nyids = nyid_query.limit(num).all() - if not top_nyids: logger.warning(f"未查询到linecode={linecode}对应的水准线路记录") return {"settlement_data": []} - # 提取NYID字符串列表(按降序排列,保持最新的在前) target_nyids = [item.NYID for item in top_nyids] - # 2. 根据NYID列表查询沉降数据表,按NYID降序、观测时间升序排列 - settlement_records = db.query(SettlementData)\ - .filter(SettlementData.NYID.in_(target_nyids))\ - .order_by( - SettlementData.NYID.desc(), # 期数从新到旧 - SettlementData.MTIME_W.asc() # 同期内按观测时间从早到晚 - )\ - .all() + # 2. 关联查询:沉降数据 → 观测点表 → 断面表(新增查询Checkpoint.aname) + settlement_records = db.query( + SettlementData, + Checkpoint.section_id, # 从Checkpoint模型获取section_id + Checkpoint.aname, # 新增:从Checkpoint模型获取测点名称aname + SectionData.work_site # 从SectionData模型获取work_site + )\ + .join( + Checkpoint, # 关联观测点模型(类名) + SettlementData.point_id == Checkpoint.point_id, # 字符串类型匹配 + isouter=True # 左连接:避免测点未关联观测点时丢失数据 + )\ + .join( + SectionData, # 关联断面模型(类名) + Checkpoint.section_id == SectionData.section_id, # 字符串类型匹配 + isouter=True # 左连接:避免断面ID未关联断面表时丢失数据 + )\ + .filter(SettlementData.NYID.in_(target_nyids))\ + .order_by( + SettlementData.NYID.desc(), # 期数从新到旧 + SettlementData.MTIME_W.asc() # 同期内按观测时间升序 + )\ + .all() - # 3. 转换模型实例为字典列表(处理日期格式) + # 3. 转换数据并新增字段 settlement_data = [] for record in settlement_records: + # 解析查询结果(元组:(沉降数据实例, section_id, aname, work_site)) + settlement, section_id, aname, work_site = record + + # 根据work_site判断work_type(默认0表示未匹配或无数据) 涵洞H 沉降板L 观测桩G和Z(分标段) B 路基 + work_type = 0 + if work_site: + work_site_str = str(work_site).strip() # 确保为字符串且去空格 + if "S" in aname: + work_type = 1 + elif "L" in aname or "G" in aname or "Z" in aname or "B" in aname: + work_type = 2 + elif "T" in aname or "D" in aname or "C " in aname: + work_type = 3 + elif "H" in aname : + work_type = 4 + # 组装返回字典(新增aname字段) record_dict = { - "id": record.id, - "point_id": record.point_id, - "CVALUE": record.CVALUE, - "MAVALUE": record.MAVALUE, - "MTIME_W": record.MTIME_W.strftime("%Y-%m-%d %H:%M:%S") if record.MTIME_W else None, - "NYID": record.NYID, - "PRELOADH": record.PRELOADH, - "PSTATE": record.PSTATE, - "REMARK": record.REMARK, - "WORKINFO": record.WORKINFO, - "createdate": record.createdate.strftime("%Y-%m-%d %H:%M:%S") if record.createdate else None, - "day": record.day, - "day_jg": record.day_jg, - "isgzjdxz": record.isgzjdxz, - "mavalue_bc": record.mavalue_bc, - "mavalue_lj": record.mavalue_lj, - "sjName": record.sjName, - "useflag": record.useflag, - "workinfoname": record.workinfoname, - "upd_remark": record.upd_remark + "id": settlement.id, + "point_id": settlement.point_id, + "aname": aname, # 新增:测点名称(从Checkpoint表获取) + "section_id": section_id, # 新增:观测点关联的断面ID + "work_site": work_site, # 新增:断面的工点信息 + "work_type": work_type, # 新增:工点类型编码(1-隧道,2-区间路基,3-桥,4-) + "CVALUE": settlement.CVALUE, + "MAVALUE": settlement.MAVALUE, + "MTIME_W": settlement.MTIME_W.strftime("%Y-%m-%d %H:%M:%S") if settlement.MTIME_W else None, + "NYID": settlement.NYID, + "PRELOADH": settlement.PRELOADH, + "PSTATE": settlement.PSTATE, + "REMARK": settlement.REMARK, + "WORKINFO": settlement.WORKINFO, + "createdate": settlement.createdate.strftime("%Y-%m-%d %H:%M:%S") if settlement.createdate else None, + "day": settlement.day, + "day_jg": settlement.day_jg, + "isgzjdxz": settlement.isgzjdxz, + "mavalue_bc": settlement.mavalue_bc, + "mavalue_lj": settlement.mavalue_lj, + "sjName": settlement.sjName, + "useflag": settlement.useflag, + "workinfoname": settlement.workinfoname, + "upd_remark": settlement.upd_remark } settlement_data.append(record_dict) diff --git a/app/utils/construction_monitor.py b/app/utils/construction_monitor.py index d6cc001..21a3a3a 100644 --- a/app/utils/construction_monitor.py +++ b/app/utils/construction_monitor.py @@ -21,7 +21,7 @@ class ConstructionMonitorUtils: "桥位施工桥梁,制梁前": 30, "桥位施工桥梁,上部结构施工中": 1, "架桥机(运梁车)通过": 7, - "桥梁主体工程完工后,第1至3个月": 7, + "桥梁主体工程完工后,第1至3个月": 7, # 模拟包含英文逗号的原始数据 "桥梁主体工程完工后,第4至6个月": 14, "桥梁主体工程完工后,6个月以后": 30, "轨道铺设期间,前": 30, @@ -43,18 +43,31 @@ class ConstructionMonitorUtils: "轨道板(道床)铺设后,第2至3个月": 30, "轨道板(道床)铺设后,3个月以后": 90 } - # 构建中英文括号兼容映射表 + # 构建中英文括号+逗号兼容映射表 self.compatible_periods = self._build_compatible_brackets_map() def _build_compatible_brackets_map(self) -> Dict[str, int]: - """构建支持中英文括号的兼容映射表""" + """构建支持中英文括号、中英文逗号的兼容映射表""" compatible_map = {} for original_key, period in self.base_periods.items(): + # 1. 保留原始key compatible_map[original_key] = period - # 生成中文括号版key并添加到映射表 + + # 2. 生成中文括号版key(原逻辑) chinese_bracket_key = original_key.replace("(", "(").replace(")", ")") if chinese_bracket_key != original_key: compatible_map[chinese_bracket_key] = period + + # 3. 生成英文逗号转中文逗号版key(新增逻辑) + chinese_comma_key = original_key.replace(",", ",") + if chinese_comma_key != original_key: + compatible_map[chinese_comma_key] = period + + # 4. 生成中文括号+中文逗号混合版key(双重兼容) + mixed_key = chinese_bracket_key.replace(",", ",") + if mixed_key != original_key and mixed_key not in compatible_map: + compatible_map[mixed_key] = period + return compatible_map def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 1, current_date: datetime = None) -> Dict[str, List[Dict]]: @@ -64,9 +77,7 @@ class ConstructionMonitorUtils: calc_date = current_date.date() if current_date else datetime.now().date() - # ------------------------------ # 原有核心逻辑(完全不变) - # ------------------------------ for point_idx, point_data in enumerate(input_data): if not point_data: continue @@ -138,22 +149,17 @@ class ConstructionMonitorUtils: item_copy["remaining"] = due_days result["data"].append(item_copy) - # ------------------------------ # 新增步骤:data中相同NYID保留剩余天数最少的记录 - # ------------------------------ if result["data"]: - # 用字典临时存储:key=NYID,value=该NYID下剩余天数最少的记录 nyid_min_remaining = {} for record in result["data"]: - nyid = record.get("NYID") # 假设NYID字段名是"NYID",若实际不同需调整 + nyid = record.get("NYID") if not nyid: - continue # 无NYID的记录直接保留(或按需求处理) + continue - # 若该NYID未存储,或当前记录剩余天数更少,则更新 if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]: nyid_min_remaining[nyid] = record - # 将字典 values 转换为列表,作为去重后的data result["data"] = list(nyid_min_remaining.values()) return result \ No newline at end of file diff --git a/app/utils/scheduler.py b/app/utils/scheduler.py index db56b94..15f0885 100644 --- a/app/utils/scheduler.py +++ b/app/utils/scheduler.py @@ -76,7 +76,9 @@ class TaskScheduler: """设置系统定时任务""" try: # 检查是否已存在每日重置任务 + existing_job = self.scheduler.get_job("daily_reset_today_updated") + # existing_job = self.scheduler.get_job("get_max_nyid") if not existing_job: # 添加每天午夜12点重置today_updated字段的任务 self.scheduler.add_job( @@ -294,7 +296,8 @@ def scheduled_get_max_nyid_by_point_id(): 'point_id': d['point_id'], 'linecode': d['level_data']['linecode'], 'account_id': d['account_data']['account_id'], - 'section_id': d['section_data']['id'] + 'section_id': d['section_data']['section_id'], + 'remaining': d['remaining'], } daily_create_data.append(tem) diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..fb764b6 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,82 @@ +#!/bin/bash + +# 服务部署脚本 +# 用于停止旧服务并重新启动服务 +# 使用方法: ./deploy.sh [0] (带0参数时不拉取代码) + +echo "=== 服务部署脚本 ===" +echo "此脚本将停止当前服务并重新启动服务" +echo "" + +# 检查是否跳过git pull +SKIP_GIT_PULL=0 +if [ "$1" = "0" ]; then + SKIP_GIT_PULL=1 + echo "跳过代码拉取,直接部署" +else + echo "默认执行代码拉取" +fi + +# 如果不跳过git pull,则拉取最新代码 +if [ $SKIP_GIT_PULL -eq 0 ]; then + echo "" + echo "正在拉取最新代码..." + git pull origin main + + if [ $? -eq 0 ]; then + echo "✓ 代码拉取成功" + else + echo "✗ 代码拉取失败" + echo "是否继续部署? (y/n)" + read -r CONTINUE_DEPLOY + if [ "$CONTINUE_DEPLOY" != "y" ] && [ "$CONTINUE_DEPLOY" != "Y" ]; then + echo "部署已取消" + exit 1 + fi + fi +fi + +# 读取sudo密码 +echo "" +echo -n "请输入sudo密码: " +read -s SUDO_PASSWORD +echo "" +echo "" + +# 检查密码是否为空 +if [ -z "$SUDO_PASSWORD" ]; then + echo "错误: 密码不能为空" + exit 1 +fi + +echo "正在停止当前服务..." +# 使用expect或者直接传递密码给sudo +echo "$SUDO_PASSWORD" | sudo -S docker compose down --rmi all + +if [ $? -eq 0 ]; then + echo "✓ 服务已成功停止" +else + echo "✗ 停止服务失败,请检查密码是否正确" + exit 1 +fi + +echo "" +echo "正在启动新服务..." +echo "$SUDO_PASSWORD" | sudo -S docker compose up -d + +if [ $? -eq 0 ]; then + echo "✓ 服务启动成功" + echo "" + echo "服务状态:" + echo "$SUDO_PASSWORD" | sudo -S docker compose ps +else + echo "✗ 启动服务失败" + exit 1 +fi + +echo "" +echo "当前运行的Docker容器:" +echo "$SUDO_PASSWORD" | sudo -S docker ps + +echo "" +echo "=== 部署完成 ===" \ No newline at end of file diff --git a/your_database.db b/your_database.db new file mode 100644 index 0000000..e69de29