This commit is contained in:
2025-11-06 23:08:38 +08:00
17 changed files with 604 additions and 91 deletions

6
.env
View File

@@ -1,10 +1,12 @@
# 云端数据库配置
DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway
DB_HOST=localhost
DATABASE_URL=mysql+pymysql://railway:Railway01.@172.17.0.1:3306/railway
DB_HOST=172.17.0.1
DB_PORT=3306
DB_USER=railway
DB_PASSWORD=Railway01.
DB_NAME=railway
# 本地配置
# DATABASE_URL=mysql+pymysql://railway:Railway01.@localhost:3306/railway
# DB_HOST=localhost

View File

@@ -16,8 +16,13 @@ from ..schemas.comprehensive_data import (
OriginalDataQueryRequest,
SettlementDataCheckpointQueryRequest,
LevelDataQueryRequest,
LinecodeRequest
LinecodeRequest,
NYIDRequest,
SectionByAccountRequest,
PointByAccountRequest,
TodayDataRequest
)
from ..services.daily import DailyDataService
from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
@@ -192,10 +197,10 @@ def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Sess
# 查询断面数据对应观察点数据
@router.post("/get_section", response_model=DataResponse)
def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)):
"""获取断面数据 + 观测点"""
"""获取断面数据 + 观测点(支持分页)"""
try:
logger.info(f"Querying section data with params: {request.dict()}")
result_data = section_service.search_sections_with_checkpoints(
result = section_service.search_sections_with_checkpoints(
db,
id=request.id,
section_id=request.section_id,
@@ -203,15 +208,17 @@ def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db))
work_site=request.work_site,
number=request.number,
status=request.status,
account_id=request.account_id
account_id=request.account_id,
skip=request.skip,
limit=request.limit
)
logger.info(f"Found {len(result_data)} sections with checkpoints")
logger.info(f"Found {result['total']} sections with checkpoints, returning {len(result['data'])} records")
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(result_data),
data=result_data
total=result['total'],
data=result['data']
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
@@ -225,25 +232,26 @@ def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db))
# 根据观测点id查询沉降数据
@router.post("/get_settlement", response_model=DataResponse)
def get_settlement(request: SettlementDataQueryRequest, db: Session = Depends(get_db)):
"""获取沉降数据,按上传时间倒序排序,支持limit参数限制返回数量"""
"""获取沉降数据,按上传时间倒序排序,支持分页参数skip、limit"""
try:
logger.info(f"Querying settlement data with params: {request.dict()}")
result_data = settlement_service.search_settlement_data_formatted(
result = settlement_service.search_settlement_data_formatted(
db,
id=request.id,
point_id=request.point_id,
nyid=request.NYID,
sjName=request.sjName,
workinfoname=request.workinfoname,
skip=request.skip,
limit=request.limit
)
logger.info(f"Found {len(result_data)} settlement records")
logger.info(f"Found {result['total']} settlement records, returning {len(result['data'])} records")
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(result_data),
data=result_data
total=result['total'],
data=result['data']
)
except Exception as e:
logger.error(f"Query settlement data failed: {str(e)}")
@@ -344,4 +352,144 @@ def get_settlement_by_linecode(
message=f"查询失败:{str(e)}",
total=0,
data=[]
)
@router.post("/get_settlement_by_nyid", response_model=DataResponse)
def get_settlement_by_nyid(
request: NYIDRequest, # 假设定义了接收nyid的请求模型
db: Session = Depends(get_db)
):
try:
nyid = request.NYID # 从请求体中获取nyid
logger.info(f"接口请求根据nyid={nyid}查询沉降数据")
settlement = SettlementDataService()
# 获取模型实例列表
checkpoint_instances = settlement.get_by_nyid(db, nyid=nyid)
# 转为字典列表(核心修正)
checkpoint_data = [instance.__dict__ for instance in checkpoint_instances]
# 清理 SQLAlchemy 内部属性(可选,避免多余字段)
checkpoint_data = [{k: v for k, v in item.items() if not k.startswith('_sa_')} for item in checkpoint_data]
return DataResponse(
code=ResponseCode.SUCCESS,
message=f"查询成功,共获取{len(checkpoint_data)}条沉降数据nyid={nyid}",
total=len(checkpoint_data),
data=checkpoint_data
)
except Exception as e:
logger.error(f"查询沉降数据失败:{str(e)}", exc_info=True)
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"查询失败:{str(e)}",
total=0,
data=[]
)
@router.post("/get_today_data", response_model=DataResponse)
def get_today_data(request: TodayDataRequest, db: Session = Depends(get_db)):
"""接口通过POST请求触发调度器中的 scheduled_get_max_nyid_by_point_id 定时任务"""
try:
# 获取请求参数如果需要从请求体中接收参数可通过request获取
# 示例如需接收account_id可通过 request.account_id 获取
# account_id = request.account_id # 根据根据实际需求决定是否需要
# 触发定时任务(如果需要传入参数,可在这里添加)
# scheduled_get_max_nyid_by_point_id()
# 调用服务层获取数据
account_id = request.account_id
daily_service = DailyDataService()
# 如需使用请求参数,可修改为 daily_service.get_daily_data_by_account(db, account_id=account_id)
daily_data = daily_service.get_daily_data_by_account(db, account_id=account_id)
return DataResponse(
code=ResponseCode.SUCCESS,
message="定时时任务触发执行成功!任务已开始处理(具体结果查看系统日志)",
total=1 if daily_data else 0, # 根据实际数据是否存在调整total
data=daily_data
)
except Exception as e:
logger.error(f"接口触发定时任务失败:{str(e)}", exc_info=True)
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"定时任务触发失败:{str(e)}",
total=0,
data={}
)
# account_id获取所有断面数据
@router.post("/get_all_section_by_account", response_model=DataResponse)
def get_all_section_by_account(request: SectionByAccountRequest, db: Session = Depends(get_db)):
"""获取断面数据 + 观测点"""
try:
account_id = request.account_id
section_service = SectionDataService()
result_data = section_service.get_by_account_id(db, account_id=account_id)
data_list = [item.to_dict() for item in result_data] if result_data else []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(data_list),
data=data_list
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
# section_id 获取所有观测点数据
@router.post("/get_all_checkpoint_by_section", response_model=DataResponse)
def get_all_checkpoint_by_section(request: SectionByAccountRequest, db: Session = Depends(get_db)):
"""获取断面数据 + 观测点"""
try:
section_id = request.section_id
checkpoint_service = CheckpointService()
result_data = checkpoint_service.get_by_section_id(db, section_id=section_id)
data_list = [item.to_dict() for item in result_data] if result_data else []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(data_list),
data=data_list
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)
@router.post("/get_checkpoint_by_point", response_model=DataResponse)
def get_checkpoint_by_point(request: PointByAccountRequest, db: Session = Depends(get_db)):
"""根据观测点ID获取观测点"""
try:
point_id = request.point_id
checkpoint_service = CheckpointService()
result_data = checkpoint_service.get_by_point_id(db, point_id=point_id)
# 使用 __dict__ 转换(过滤内部属性)
if result_data:
# 复制字典并排除 SQLAlchemy 内部属性
data_dict = result_data.__dict__.copy()
data_dict.pop('_sa_instance_state', None) # 移除ORM内部状态属性
data_list = [data_dict]
else:
data_list = []
return DataResponse(
code=ResponseCode.SUCCESS,
message="查询成功",
total=len(data_list),
data=data_list
)
except Exception as e:
logger.error(f"Query section data failed: {str(e)}")
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"{ResponseMessage.QUERY_FAILED}: {str(e)}",
total=0,
data=[]
)

55
app/api/test.py Normal file
View File

@@ -0,0 +1,55 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from ..core.database import get_db
from ..core.response_code import ResponseCode
from ..schemas.comprehensive_data import DataResponse
import logging
from ..services.daily import DailyDataService
from ..services.checkpoint import CheckpointService
from ..services.settlement_data import SettlementDataService
from ..utils.scheduler import scheduled_get_max_nyid_by_point_id
# 导入全局定时任务调度器实例和目标任务函数
from ..utils.scheduler import task_scheduler, scheduled_get_max_nyid_by_point_id
router = APIRouter(prefix="/test", tags=["测试"])
logger = logging.getLogger(__name__)
@router.get("/trigger_max_nyid_task", response_model=DataResponse)
def trigger_max_nyid_task(db: Session = Depends(get_db)):
"""接口:直接触发调度器中的 scheduled_get_max_nyid_by_point_id 定时任务"""
try:
# 触发任务执行
# task_scheduler.run_job(scheduled_get_max_nyid_by_point_id.__name__)
# settlement = SettlementDataService()
# # 获取模型实例列表
# checkpoint_instances = settlement.get_by_nyid(db, nyid="4993546")
# # 转为字典列表(核心修正)
# checkpoint_data = [instance.__dict__ for instance in checkpoint_instances]
# # 清理 SQLAlchemy 内部属性(可选,避免多余字段)
# checkpoint_data = [{k: v for k, v in item.items() if not k.startswith('_sa_')} for item in checkpoint_data]
# return DataResponse(
# code=ResponseCode.SUCCESS,
# message="定时任务触发执行成功!任务已开始处理(具体结果查看系统日志)",
# total=len(checkpoint_data), # 修正为实际数据长度
# data=checkpoint_data
# )
# scheduled_get_max_nyid_by_point_id()
daily_service = DailyDataService()
daily_data = daily_service.get_daily_data_by_account(db,account_id=1)
return DataResponse(
code=ResponseCode.SUCCESS,
message="定时任务触发执行成功!任务已开始处理(具体结果查看系统日志)",
total=1,
data=daily_data
)
except Exception as e:
logger.error(f"接口触发定时任务失败:{str(e)}", exc_info=True)
return DataResponse(
code=ResponseCode.QUERY_FAILED,
message=f"定时任务触发失败:{str(e)}",
total=0,
data={}
)

View File

@@ -68,6 +68,7 @@ app.include_router(account_router, prefix="/api")
app.include_router(database_router, prefix="/api")
app.include_router(task_router, prefix="/api")
app.include_router(comprehensive_data_router, prefix="/api")
# app.include_router(test_router, prefix="/api")
# 根路径
@app.get("/")

View File

@@ -14,6 +14,7 @@ class Account(Base):
created_at = Column(DateTime, server_default=func.now(), comment="创建时间")
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment="更新时间")
update_time = Column(String(1000), nullable=False, comment="更新时间跨度")
max_variation = Column(Integer, default=1, comment="变化量的绝对值,单位是毫米")
# 模型转字典

View File

@@ -5,8 +5,11 @@ class DailyData(Base):
__tablename__ = "daily"
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
user_id = Column(Integer, nullable=False, comment="用户id")
account_id = Column(Integer, nullable=False, comment="账户id")
point_id = Column(String(100), comment="测点id")
NYID = Column(String(100), nullable=False, comment="期数id")
linecode = Column(String(255), nullable=False, comment="水准线路编码")
section_id = Column(String(255), nullable=False, comment="所属断面id")
section_id = Column(String(255), nullable=False, comment="所属断面id")
remaining = Column(Integer, nullable=False, comment="剩余天数")
user_id = Column(Integer, default=1, nullable=False, comment="用户id")

View File

@@ -9,6 +9,7 @@ class AccountBase(BaseModel):
today_updated: Optional[int] = 0
project_name: Optional[str] = None
update_time: Optional[str] = None
max_variation: Optional[int] = None
class AccountCreate(AccountBase):
pass
@@ -42,7 +43,8 @@ class AccountResponse(AccountBase):
project_name=account.project_name,
created_at=account.created_at,
updated_at=account.updated_at,
update_time=account.update_time
update_time=account.update_time,
max_variation=account.max_variation,
)
class AccountListRequest(BaseModel):

View File

@@ -20,7 +20,12 @@ class LevelDataImportRequest(BaseModel):
wsphigh: Optional[str] = None
mtype: Optional[str] = None
createDate: Optional[str] = None
# 水准数据导入请求
class NYIDRequest(BaseModel):
NYID: str
# 今日数据请求
class TodayDataRequest(BaseModel):
account_id: str
# 沉降数据导入请求
class SettlementDataImportRequest(BaseModel):
point_id: str
@@ -104,6 +109,7 @@ class SettlementDataQueryRequest(BaseModel):
isgzjdxz: Optional[str] = None
upd_remark: Optional[str] = None
limit: Optional[int] = None # 限制返回数量None表示返回全部
skip: Optional[int] = 0 # 跳过数量用于分页默认0
# 沉降数据查询请求——水准线路编码
class SettlementDataCheckpointQueryRequest(BaseModel):
@@ -147,7 +153,15 @@ class SectionDataQueryRequest(BaseModel):
foundation_treatment_method: Optional[str] = None
rock_mass_classification: Optional[str] = None
account_id: Optional[str] = None
limit: Optional[int] = None # 限制返回数量None表示返回全部
skip: Optional[int] = 0 # 跳过数量用于分页默认0
# 断面数据导入请求
class SectionByAccountRequest(BaseModel):
account_id: Optional[str] = None
section_id: Optional[str] = None
# 测点数据
class PointByAccountRequest(BaseModel):
point_id: Optional[str] = None
# 水准数据查询请求
class LevelDataQueryRequest(BaseModel):
linecode: Optional[str] = None

View File

@@ -52,7 +52,7 @@ class BaseService(Generic[ModelType]):
return db.query(self.model).filter(field == field_value).all()
return []
def search_by_conditions(self, db: Session, conditions: Dict[str, Any]) -> List[ModelType]:
def search_by_conditions(self, db: Session, conditions: Dict[str, Any], skip: int = 0, limit: Optional[int] = None) -> List[ModelType]:
"""根据多个条件搜索记录"""
query = db.query(self.model)
for field_name, field_value in conditions.items():
@@ -62,4 +62,19 @@ class BaseService(Generic[ModelType]):
query = query.filter(field.like(f"{field_value}"))
else:
query = query.filter(field == field_value)
return query.all()
query = query.offset(skip)
if limit is not None:
query = query.limit(limit)
return query.all()
def search_by_conditions_count(self, db: Session, conditions: Dict[str, Any]) -> int:
"""根据多个条件搜索记录总数"""
query = db.query(self.model)
for field_name, field_value in conditions.items():
if hasattr(self.model, field_name) and field_value is not None:
field = getattr(self.model, field_name)
if isinstance(field_value, str):
query = query.filter(field.like(f"{field_value}"))
else:
query = query.filter(field == field_value)
return query.count()

View File

@@ -8,9 +8,7 @@ class CheckpointService(BaseService[Checkpoint]):
def __init__(self):
super().__init__(Checkpoint)
def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]:
"""根据断面ID获取观测点"""
return self.get_by_field(db, "section_id", section_id)
def get_by_point_id(self, db: Session, point_id: str) -> Optional[Checkpoint]:
"""根据观测点ID获取观测点"""
@@ -118,4 +116,11 @@ class CheckpointService(BaseService[Checkpoint]):
'success_count': success_count,
'failed_count': failed_count,
'failed_items': failed_items
}
}
def get_by_nyid(self, db: Session, nyid: str) -> List[Checkpoint]:
"""根据NYID获取所有相关的测点信息"""
return self.get_by_field(db, "NYID", nyid)
# 通过section_id获取所有观测点数据
def get_by_section_id(self, db: Session, section_id: str) -> List[Checkpoint]:
"""根据section_id获取所有相关的测点信息"""
return self.get_by_field(db, "section_id", section_id)

View File

@@ -149,4 +149,88 @@ class DailyDataService(BaseService[DailyData]):
# 构建[[{}], [{}]]格式
return [
[record] for pid in point_ids for record in grouped.get(pid, [])
]
]
# 获取所有的今日数据
def get_all_daily_data(
self,
db: Session,
user_id: Optional[int] = None # 可选参数按user_id筛选
) -> List[Dict[str, Any]]:
"""
获取所有日常数据DailyData支持按user_id筛选
:param db: 数据库会话
:param user_id: 可选用户ID若提供则只返回该用户的数据
:return: 日常数据字典列表,包含所有字段
"""
try:
# 基础查询
query = db.query(DailyData)
# 若提供了user_id则添加筛选条件
if user_id is not None:
query = query.filter(DailyData.user_id == user_id)
logger.info(f"查询user_id={user_id}的所有日常数据")
else:
logger.info("查询所有日常数据")
# 执行查询并获取所有记录
daily_records = query.all()
# 转换为字典列表(保留所有字段)
result = []
for record in daily_records:
record_dict = {
column.name: getattr(record, column.name)
for column in DailyData.__table__.columns
}
result.append(record_dict)
logger.info(f"查询完成,共获取{len(result)}条日常数据")
return result
except Exception as e:
logger.error(f"获取日常数据失败:{str(e)}", exc_info=True)
raise e
def get_daily_data_by_account(
self,
db: Session,
account_id: str, # 账号ID必填因为是核心筛选条件
user_id: Optional[int] = None # 可选参数额外按user_id筛选
) -> List[Dict[str, Any]]:
"""
根据account_id获取对应日常数据支持额外按user_id筛选
:param db: 数据库会话
:param account_id: 账号ID必填用于精准筛选数据
:param user_id: 可选用户ID若提供则则进一步筛选该用户的数据
:return: 符合条件的日常数据字典列表,包含所有字段
"""
try:
# 基础查询先按account_id筛选必填条件
query = db.query(DailyData).filter(DailyData.account_id == account_id)
# 若提供了user_id则添加额外筛选条件
if user_id is not None:
query = query.filter(DailyData.user_id == user_id)
logger.info(f"查询account_id={account_id}且user_id={user_id}的日常数据")
else:
logger.info(f"查询account_id={account_id}的所有日常数据")
# 执行查询并获取记录
daily_records = query.all()
# 转换为字典列表(保留所有字段)
result = []
for record in daily_records:
record_dict = {
column.name: getattr(record, column.name)
for column in DailyData.__table__.columns
}
result.append(record_dict)
logger.info(f"查询完成account_id={account_id}对应{len(result)}条日常数据")
return result
except Exception as e:
logger.error(f"获取account_id={account_id}的日常数据失败:{str(e)}", exc_info=True)
raise e

View File

@@ -20,7 +20,10 @@ class SectionDataService(BaseService[SectionData]):
"""根据断面ID获取断面数据"""
sections = self.get_by_field(db, "section_id", section_id)
return sections[0] if sections else None
def get_by_account_id(self, db: Session, account_id: str) -> Optional[SectionData]:
"""根据账号ID获取断面数据"""
accounts = self.get_by_field(db, "account_id", account_id)
return accounts if accounts else None
def get_by_number(self, db: Session, number: str) -> List[SectionData]:
"""根据桥梁墩(台)编号获取断面数据"""
return self.get_by_field(db, "number", number)
@@ -33,7 +36,9 @@ class SectionDataService(BaseService[SectionData]):
number: Optional[str] = None,
status: Optional[str] = None,
basic_types: Optional[str] = None,
account_id: Optional[str] = None) -> List[SectionData]:
account_id: Optional[str] = None,
skip: int = 0,
limit: Optional[int] = None) -> List[SectionData]:
"""根据多个条件搜索断面数据"""
conditions = {}
if section_id is not None:
@@ -53,7 +58,7 @@ class SectionDataService(BaseService[SectionData]):
if account_id is not None:
conditions['account_id'] = account_id
return self.search_by_conditions(db, conditions)
return self.search_by_conditions(db, conditions, skip, limit)
def search_sections_with_checkpoints(self, db: Session,
id: Optional[int] = None,
@@ -62,9 +67,32 @@ class SectionDataService(BaseService[SectionData]):
work_site: Optional[str] = None,
number: Optional[str] = None,
status: Optional[str] = None,
account_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""查询断面数据并返回带观测点的结果"""
sections = self.search_section_data(db, id, section_id, mileage, work_site, number, status, account_id=account_id)
account_id: Optional[str] = None,
skip: int = 0,
limit: Optional[int] = None) -> Dict[str, Any]:
"""查询断面数据并返回带观测点的结果(支持分页)"""
# 构建查询条件
conditions = {}
if section_id is not None:
conditions["section_id"] = section_id
if work_site is not None:
conditions["work_site"] = work_site
if number is not None:
conditions["number"] = number
if status is not None:
conditions["status"] = status
if id is not None:
conditions['id'] = id
if mileage is not None:
conditions['mileage'] = mileage
if account_id is not None:
conditions['account_id'] = account_id
# 获取总数
total_count = self.search_by_conditions_count(db, conditions)
# 获取分页数据
sections = self.search_by_conditions(db, conditions, skip, limit)
result = []
for section in sections:
@@ -98,7 +126,12 @@ class SectionDataService(BaseService[SectionData]):
}
result.append(section_dict)
return result
return {
"data": result,
"total": total_count,
"skip": skip,
"limit": limit
}
def get_section_with_checkpoints(self, db: Session, section_id: str) -> Dict[str, Any]:
"""获取断面数据及其关联的观测点"""

View File

@@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
from typing import Dict, List
from ..models.settlement_data import SettlementData
from ..models.level_data import LevelData
from ..models.section_data import SectionData
import logging
from datetime import datetime
@@ -36,6 +37,7 @@ class SettlementDataService(BaseService[SettlementData]):
nyid: Optional[str] = None,
sjName: Optional[str] = None,
workinfoname: Optional[str] = None,
skip: int = 0,
limit: Optional[int] = None) -> List[SettlementData]:
"""根据多个条件搜索沉降数据,按上传时间倒序排序"""
query = db.query(SettlementData)
@@ -54,7 +56,9 @@ class SettlementDataService(BaseService[SettlementData]):
# 按上传时间倒序排序
query = query.order_by(SettlementData.createdate.desc())
# 如果指定了limit则限制返回数量
# 添加分页支持
if skip > 0:
query = query.offset(skip)
if limit is not None and limit > 0:
query = query.limit(limit)
@@ -66,9 +70,25 @@ class SettlementDataService(BaseService[SettlementData]):
nyid: Optional[str] = None,
sjName: Optional[str] = None,
workinfoname: Optional[str] = None,
limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""查询沉降数据并返回格式化结果,按上传时间倒序排序"""
settlement_data = self.search_settlement_data(db, id, point_id, nyid, sjName, workinfoname, limit)
skip: int = 0,
limit: Optional[int] = None) -> Dict[str, Any]:
"""查询沉降数据并返回格式化结果,按上传时间倒序排序(支持分页)"""
# 先获取总数(不计分页)
count_query = db.query(SettlementData)
if id is not None:
count_query = count_query.filter(SettlementData.id == id)
if point_id is not None:
count_query = count_query.filter(SettlementData.point_id == point_id)
if nyid is not None:
count_query = count_query.filter(SettlementData.NYID == nyid)
if sjName is not None:
count_query = count_query.filter(SettlementData.sjName == sjName)
if workinfoname is not None:
count_query = count_query.filter(SettlementData.workinfoname == workinfoname)
total_count = count_query.count()
# 获取分页数据
settlement_data = self.search_settlement_data(db, id, point_id, nyid, sjName, workinfoname, skip, limit)
result = []
for settlement in settlement_data:
@@ -96,7 +116,12 @@ class SettlementDataService(BaseService[SettlementData]):
}
result.append(settlement_dict)
return result
return {
"data": result,
"total": total_count,
"skip": skip,
"limit": limit
}
def search_settlement_checkpoint_data_formatted(self, db: Session,
id: Optional[int] = None,
@@ -351,14 +376,16 @@ class SettlementDataService(BaseService[SettlementData]):
# 根据水准线路编码获取最新的NYID并获取对应的测点数据
def get_settlement_by_linecode(
self,
db: Session,
linecode: str,
num: int = 1 # 新增参数:控制返回的期数默认1最新一期
) -> Dict:
self,
db: Session,
linecode: str,
num: int = 1 # 控制返回的期数默认1最新一期
) -> Dict:
"""
根据水准线路编码(linecode)查询对应沉降数据,支持按期数筛选
关联逻辑:LevelData.linecode → LevelData.NYID → SettlementData.NYID
关联逻辑:
LevelData.linecode → LevelData.NYID → SettlementData.NYID
SettlementData.point_id字符串→ Checkpoint.point_id → Checkpoint.section_id → SectionData.section_id → SectionData.work_site
:param db: 数据库会话
:param linecode: 目标水准线路编码
:param num: 返回的期数按NYID从大到小排序默认1最新一期
@@ -367,55 +394,87 @@ class SettlementDataService(BaseService[SettlementData]):
try:
logger.info(f"开始查询linecode={linecode}对应的沉降数据(取前{num}期)")
# 1. 根据linecode查询水准线路表获取所有关联的NYID去重后按NYID降序排序
# 1. 根据linecode查询水准线路表获取前N期的NYID
nyid_query = db.query(LevelData.NYID)\
.filter(LevelData.linecode == linecode)\
.distinct()\
.order_by(LevelData.NYID.desc()) # 按NYID降序确保最新的在前
.order_by(LevelData.NYID.desc())
# 根据num参数截取前N期的NYID
top_nyids = nyid_query.limit(num).all()
if not top_nyids:
logger.warning(f"未查询到linecode={linecode}对应的水准线路记录")
return {"settlement_data": []}
# 提取NYID字符串列表按降序排列保持最新的在前
target_nyids = [item.NYID for item in top_nyids]
# 2. 根据NYID列表查询沉降数据按NYID降序、观测时间升序排列
settlement_records = db.query(SettlementData)\
.filter(SettlementData.NYID.in_(target_nyids))\
.order_by(
SettlementData.NYID.desc(), # 期数从新到旧
SettlementData.MTIME_W.asc() # 同期内按观测时间从早到晚
)\
.all()
# 2. 关联查询沉降数据 → 观测点表 → 断面表新增查询Checkpoint.aname
settlement_records = db.query(
SettlementData,
Checkpoint.section_id, # 从Checkpoint模型获取section_id
Checkpoint.aname, # 新增从Checkpoint模型获取测点名称aname
SectionData.work_site # 从SectionData模型获取work_site
)\
.join(
Checkpoint, # 关联观测点模型(类名)
SettlementData.point_id == Checkpoint.point_id, # 字符串类型匹配
isouter=True # 左连接:避免测点未关联观测点时丢失数据
)\
.join(
SectionData, # 关联断面模型(类名)
Checkpoint.section_id == SectionData.section_id, # 字符串类型匹配
isouter=True # 左连接避免断面ID未关联断面表时丢失数据
)\
.filter(SettlementData.NYID.in_(target_nyids))\
.order_by(
SettlementData.NYID.desc(), # 期数从新到旧
SettlementData.MTIME_W.asc() # 同期内按观测时间升序
)\
.all()
# 3. 转换模型实例为字典列表(处理日期格式)
# 3. 转换数据并新增字段
settlement_data = []
for record in settlement_records:
# 解析查询结果(元组:(沉降数据实例, section_id, aname, work_site)
settlement, section_id, aname, work_site = record
# 根据work_site判断work_type默认0表示未匹配或无数据 涵洞H 沉降板L 观测桩G和Z分标段 B 路基
work_type = 0
if work_site:
work_site_str = str(work_site).strip() # 确保为字符串且去空格
if "S" in aname:
work_type = 1
elif "L" in aname or "G" in aname or "Z" in aname or "B" in aname:
work_type = 2
elif "T" in aname or "D" in aname or "C " in aname:
work_type = 3
elif "H" in aname :
work_type = 4
# 组装返回字典新增aname字段
record_dict = {
"id": record.id,
"point_id": record.point_id,
"CVALUE": record.CVALUE,
"MAVALUE": record.MAVALUE,
"MTIME_W": record.MTIME_W.strftime("%Y-%m-%d %H:%M:%S") if record.MTIME_W else None,
"NYID": record.NYID,
"PRELOADH": record.PRELOADH,
"PSTATE": record.PSTATE,
"REMARK": record.REMARK,
"WORKINFO": record.WORKINFO,
"createdate": record.createdate.strftime("%Y-%m-%d %H:%M:%S") if record.createdate else None,
"day": record.day,
"day_jg": record.day_jg,
"isgzjdxz": record.isgzjdxz,
"mavalue_bc": record.mavalue_bc,
"mavalue_lj": record.mavalue_lj,
"sjName": record.sjName,
"useflag": record.useflag,
"workinfoname": record.workinfoname,
"upd_remark": record.upd_remark
"id": settlement.id,
"point_id": settlement.point_id,
"aname": aname, # 新增测点名称从Checkpoint表获取
"section_id": section_id, # 新增观测点关联的断面ID
"work_site": work_site, # 新增:断面的工点信息
"work_type": work_type, # 新增工点类型编码1-隧道2-区间路基3-桥4-
"CVALUE": settlement.CVALUE,
"MAVALUE": settlement.MAVALUE,
"MTIME_W": settlement.MTIME_W.strftime("%Y-%m-%d %H:%M:%S") if settlement.MTIME_W else None,
"NYID": settlement.NYID,
"PRELOADH": settlement.PRELOADH,
"PSTATE": settlement.PSTATE,
"REMARK": settlement.REMARK,
"WORKINFO": settlement.WORKINFO,
"createdate": settlement.createdate.strftime("%Y-%m-%d %H:%M:%S") if settlement.createdate else None,
"day": settlement.day,
"day_jg": settlement.day_jg,
"isgzjdxz": settlement.isgzjdxz,
"mavalue_bc": settlement.mavalue_bc,
"mavalue_lj": settlement.mavalue_lj,
"sjName": settlement.sjName,
"useflag": settlement.useflag,
"workinfoname": settlement.workinfoname,
"upd_remark": settlement.upd_remark
}
settlement_data.append(record_dict)

View File

@@ -21,7 +21,7 @@ class ConstructionMonitorUtils:
"桥位施工桥梁,制梁前": 30,
"桥位施工桥梁,上部结构施工中": 1,
"架桥机(运梁车)通过": 7,
"桥梁主体工程完工后,第1至3个月": 7,
"桥梁主体工程完工后,第1至3个月": 7, # 模拟包含英文逗号的原始数据
"桥梁主体工程完工后第4至6个月": 14,
"桥梁主体工程完工后6个月以后": 30,
"轨道铺设期间,前": 30,
@@ -43,18 +43,31 @@ class ConstructionMonitorUtils:
"轨道板(道床)铺设后第2至3个月": 30,
"轨道板(道床)铺设后3个月以后": 90
}
# 构建中英文括号兼容映射表
# 构建中英文括号+逗号兼容映射表
self.compatible_periods = self._build_compatible_brackets_map()
def _build_compatible_brackets_map(self) -> Dict[str, int]:
"""构建支持中英文括号的兼容映射表"""
"""构建支持中英文括号、中英文逗号的兼容映射表"""
compatible_map = {}
for original_key, period in self.base_periods.items():
# 1. 保留原始key
compatible_map[original_key] = period
# 生成中文括号版key并添加到映射表
# 2. 生成中文括号版key原逻辑
chinese_bracket_key = original_key.replace("(", "").replace(")", "")
if chinese_bracket_key != original_key:
compatible_map[chinese_bracket_key] = period
# 3. 生成英文逗号转中文逗号版key新增逻辑
chinese_comma_key = original_key.replace(",", "")
if chinese_comma_key != original_key:
compatible_map[chinese_comma_key] = period
# 4. 生成中文括号+中文逗号混合版key双重兼容
mixed_key = chinese_bracket_key.replace(",", "")
if mixed_key != original_key and mixed_key not in compatible_map:
compatible_map[mixed_key] = period
return compatible_map
def get_due_data(self, input_data: List[List[Dict]], start: int = 0, end: int = 1, current_date: datetime = None) -> Dict[str, List[Dict]]:
@@ -64,9 +77,7 @@ class ConstructionMonitorUtils:
calc_date = current_date.date() if current_date else datetime.now().date()
# ------------------------------
# 原有核心逻辑(完全不变)
# ------------------------------
for point_idx, point_data in enumerate(input_data):
if not point_data:
continue
@@ -138,22 +149,17 @@ class ConstructionMonitorUtils:
item_copy["remaining"] = due_days
result["data"].append(item_copy)
# ------------------------------
# 新增步骤data中相同NYID保留剩余天数最少的记录
# ------------------------------
if result["data"]:
# 用字典临时存储key=NYIDvalue=该NYID下剩余天数最少的记录
nyid_min_remaining = {}
for record in result["data"]:
nyid = record.get("NYID") # 假设NYID字段名是"NYID",若实际不同需调整
nyid = record.get("NYID")
if not nyid:
continue # 无NYID的记录直接保留或按需求处理
continue
# 若该NYID未存储或当前记录剩余天数更少则更新
if nyid not in nyid_min_remaining or record["remaining"] < nyid_min_remaining[nyid]["remaining"]:
nyid_min_remaining[nyid] = record
# 将字典 values 转换为列表作为去重后的data
result["data"] = list(nyid_min_remaining.values())
return result

View File

@@ -76,7 +76,9 @@ class TaskScheduler:
"""设置系统定时任务"""
try:
# 检查是否已存在每日重置任务
existing_job = self.scheduler.get_job("daily_reset_today_updated")
# existing_job = self.scheduler.get_job("get_max_nyid")
if not existing_job:
# 添加每天午夜12点重置today_updated字段的任务
self.scheduler.add_job(
@@ -294,7 +296,8 @@ def scheduled_get_max_nyid_by_point_id():
'point_id': d['point_id'],
'linecode': d['level_data']['linecode'],
'account_id': d['account_data']['account_id'],
'section_id': d['section_data']['id']
'section_id': d['section_data']['section_id'],
'remaining': d['remaining'],
}
daily_create_data.append(tem)

82
deploy.sh Normal file
View File

@@ -0,0 +1,82 @@
#!/bin/bash
# 服务部署脚本
# 用于停止旧服务并重新启动服务
# 使用方法: ./deploy.sh [0] (带0参数时不拉取代码)
echo "=== 服务部署脚本 ==="
echo "此脚本将停止当前服务并重新启动服务"
echo ""
# 检查是否跳过git pull
SKIP_GIT_PULL=0
if [ "$1" = "0" ]; then
SKIP_GIT_PULL=1
echo "跳过代码拉取,直接部署"
else
echo "默认执行代码拉取"
fi
# 如果不跳过git pull则拉取最新代码
if [ $SKIP_GIT_PULL -eq 0 ]; then
echo ""
echo "正在拉取最新代码..."
git pull origin main
if [ $? -eq 0 ]; then
echo "✓ 代码拉取成功"
else
echo "✗ 代码拉取失败"
echo "是否继续部署? (y/n)"
read -r CONTINUE_DEPLOY
if [ "$CONTINUE_DEPLOY" != "y" ] && [ "$CONTINUE_DEPLOY" != "Y" ]; then
echo "部署已取消"
exit 1
fi
fi
fi
# 读取sudo密码
echo ""
echo -n "请输入sudo密码: "
read -s SUDO_PASSWORD
echo ""
echo ""
# 检查密码是否为空
if [ -z "$SUDO_PASSWORD" ]; then
echo "错误: 密码不能为空"
exit 1
fi
echo "正在停止当前服务..."
# 使用expect或者直接传递密码给sudo
echo "$SUDO_PASSWORD" | sudo -S docker compose down --rmi all
if [ $? -eq 0 ]; then
echo "✓ 服务已成功停止"
else
echo "✗ 停止服务失败,请检查密码是否正确"
exit 1
fi
echo ""
echo "正在启动新服务..."
echo "$SUDO_PASSWORD" | sudo -S docker compose up -d
if [ $? -eq 0 ]; then
echo "✓ 服务启动成功"
echo ""
echo "服务状态:"
echo "$SUDO_PASSWORD" | sudo -S docker compose ps
else
echo "✗ 启动服务失败"
exit 1
fi
echo ""
echo "当前运行的Docker容器:"
echo "$SUDO_PASSWORD" | sudo -S docker ps
echo ""
echo "=== 部署完成 ==="

0
your_database.db Normal file
View File