查询数据1

This commit is contained in:
lhx
2025-09-29 11:58:56 +08:00
parent 2e735e587b
commit 242fedd37e
5 changed files with 129 additions and 24 deletions

View File

@@ -9,8 +9,11 @@ from ..schemas.comprehensive_data import (
BatchLevelDataImportRequest, BatchLevelDataImportRequest,
BatchOriginalDataImportRequest, BatchOriginalDataImportRequest,
DataImportResponse, DataImportResponse,
ComprehensiveDataImportRequest, DataResponse,
ComprehensiveDataImportResponse SectionDataQueryRequest,
SettlementDataQueryRequest,
OriginalDataQueryRequest,
LevelDataQueryRequest
) )
from ..services.section_data import SectionDataService from ..services.section_data import SectionDataService
from ..services.checkpoint import CheckpointService from ..services.checkpoint import CheckpointService
@@ -134,20 +137,30 @@ def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Sess
detail=f"批量导入原始数据失败: {str(e)}" detail=f"批量导入原始数据失败: {str(e)}"
) )
# 保留原有接口以保持兼容性 # 查询断面数据对应观察点数据
@router.post("/data_settlement_import", response_model=ComprehensiveDataImportResponse) @router.post("/get_section", response_model = DataResponse)
def data_import(request: ComprehensiveDataImportRequest, db: Session = Depends(get_db)): def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)):
"""导入综合数据 (兼容旧接口)""" """获取断面数据 + 观测点"""
try: section_service.search_section_data(db,
logger.info("Using legacy data import interface") id=request.id,
# 这里可以根据需要实现旧接口的兼容逻辑 section_id=request.section_id,
return ComprehensiveDataImportResponse( mileage=request.mileage,
success=True, work_site=request.work_site
message="请使用新的批量导入接口" )
)
except Exception as e:
logger.error(f"Legacy data import failed: {str(e)}")
raise HTTPException( # 根据观测点id查询沉降数据
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, @router.post("/get_section", response_model = DataResponse)
detail=f"数据导入失败: {str(e)}" def get_settlenment(request: SettlementDataQueryRequest, db: Session = Depends(get_db)):
) """获取沉降数据"""
# 查询水准数据
@router.post("/get_level", response_model = DataResponse)
def get_level(request: LevelDataQueryRequest, db: Session = Depends(get_db)):
"""查询水准数据"""
# 根据期数id获取原始数据
def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)):
"""获取原始数据 + 水准数据"""

View File

@@ -1,5 +1,6 @@
from sqlalchemy import Column, Integer, String from sqlalchemy import Column, Integer, String, List
from ..core.database import Base from ..core.database import Base
from .checkpoint import Checkpoint
class SectionData(Base): class SectionData(Base):
__tablename__ = "section_data" __tablename__ = "section_data"
@@ -12,4 +13,6 @@ class SectionData(Base):
status = Column(String(100), nullable=False, comment="断面状态") status = Column(String(100), nullable=False, comment="断面状态")
number = Column(String(100), nullable=False, comment="所在桥梁墩(台)编号", index=True) number = Column(String(100), nullable=False, comment="所在桥梁墩(台)编号", index=True)
transition_paragraph = Column(String(100), comment="过渡段") transition_paragraph = Column(String(100), comment="过渡段")
section_id = Column(String(100), nullable=False, comment="断面id", index=True) section_id = Column(String(100), nullable=False, comment="断面id", index=True)
Checkpoints = List[Checkpoint]

View File

@@ -61,6 +61,61 @@ class SectionDataImportRequest(BaseModel):
height: Optional[str] = None height: Optional[str] = None
transition_paragraph: Optional[str] = None transition_paragraph: Optional[str] = None
# 原始数据查询请求
class OriginalDataQueryRequest(BaseModel):
id: Optional[int] = None
bfpcode: Optional[str] = None
mtime: Optional[str] = None
bffb: Optional[str] = None
bfpl: Optional[str] = None
bfpvalue: Optional[str] = None
times: Optional[str] = None
NYID: str
sort: Optional[int] = None
# 沉降数据查询请求
class SettlementDataQueryRequest(BaseModel):
id: Optional[int] = None
point_id: Optional[int] = None
NYID: str
CVALUE: Optional[str] = None
MAVALUE: Optional[str] = None
MTIME_W: Optional[str] = None
PRELOADH: Optional[str] = None
PSTATE: Optional[str] = None
createdate: Optional[str] = None
day: Optional[str] = None
day_jg: Optional[str] = None
REMARK: Optional[str] = None
WORKINFO: Optional[str] = None
useflag: Optional[str] = None
mavalue_lj: Optional[str] = None
mavalue_bc: Optional[str] = None
sjName: Optional[str] = None
workinfoname: Optional[str] = None
isgzjdxz: Optional[str] = None
upd_remark: Optional[str] = None
# 断面数据导入请求
class SectionDataQueryRequest(BaseModel):
id: Optional[int] = None
section_id: str
mileage: Optional[str] = None
work_site: Optional[str] = None
status: Optional[str] = None
number: Optional[str] = None
basic_types: Optional[str] = None
height: Optional[str] = None
transition_paragraph: Optional[str] = None
# 水准数据查询请求
class LevelDataQueryRequest(BaseModel):
linecode: Optional[str] = None
NYID: Optional[str] = None
benchmarkids: Optional[str] = None
wsphigh: Optional[str] = None
createDate: Optional[str] = None
# 批量导入请求 # 批量导入请求
class BatchSectionDataImportRequest(BaseModel): class BatchSectionDataImportRequest(BaseModel):
data: List[SectionDataImportRequest] data: List[SectionDataImportRequest]
@@ -77,7 +132,7 @@ class BatchLevelDataImportRequest(BaseModel):
class BatchOriginalDataImportRequest(BaseModel): class BatchOriginalDataImportRequest(BaseModel):
data: List[OriginalDataImportRequest] data: List[OriginalDataImportRequest]
# 响应模型 # 新增响应模型
class DataImportResponse(BaseModel): class DataImportResponse(BaseModel):
success: bool success: bool
message: str message: str
@@ -86,6 +141,13 @@ class DataImportResponse(BaseModel):
failed_count: int failed_count: int
failed_items: List[Dict[str, Any]] = [] failed_items: List[Dict[str, Any]] = []
# 查询响应模型
class DataResponse(BaseModel):
success: bool
message: str
count: int
data: List[Dict[str, Any]] = []
# 兼容旧接口的模型 # 兼容旧接口的模型
class ComprehensiveDataImportRequest(BaseModel): class ComprehensiveDataImportRequest(BaseModel):
data: Dict[str, Any] data: Dict[str, Any]

View File

@@ -26,7 +26,9 @@ class SectionDataService(BaseService[SectionData]):
return self.get_by_field(db, "number", number) return self.get_by_field(db, "number", number)
def search_section_data(self, db: Session, def search_section_data(self, db: Session,
id: Optional[int] = None,
section_id: Optional[str] = None, section_id: Optional[str] = None,
mileage: Optional[str] = None,
work_site: Optional[str] = None, work_site: Optional[str] = None,
number: Optional[str] = None, number: Optional[str] = None,
status: Optional[str] = None, status: Optional[str] = None,
@@ -43,8 +45,14 @@ class SectionDataService(BaseService[SectionData]):
conditions["status"] = status conditions["status"] = status
if basic_types is not None: if basic_types is not None:
conditions["basic_types"] = basic_types conditions["basic_types"] = basic_types
if id is not None:
return self.search_by_conditions(db, conditions) conditions['id'] = id
if mileage is not None:
conditions['mileage'] = mileage
section_data = self.search_by_conditions(db, conditions)
# 查询对应观察点数据
if len(section_data) > 0:
def get_section_with_checkpoints(self, db: Session, section_id: str) -> Dict[str, Any]: def get_section_with_checkpoints(self, db: Session, section_id: str) -> Dict[str, Any]:
"""获取断面数据及其关联的观测点""" """获取断面数据及其关联的观测点"""

View File

@@ -81,6 +81,24 @@ class TaskScheduler:
except Exception as e: except Exception as e:
logger.error(f"设置系统定时任务失败: {e}") logger.error(f"设置系统定时任务失败: {e}")
# 设置测试任务
try:
existing_job = self.scheduler.get_job("test_task")
if not existing_job:
# 添加每天每小时重置today_updated字段的任务
self.scheduler.add_job(
reset_today_updated_task,
'cron',
id='test_task',
hour='*',
minute=0,
second=0,
name='测试任务'
)
logger.info("系统定时任务:测试任务已添加")
except Exception as e:
logger.error(f"设置测试任务失败: {e}")
def add_cron_job(self, func, job_id: str, **kwargs): def add_cron_job(self, func, job_id: str, **kwargs):
"""添加cron定时任务""" """添加cron定时任务"""
return self.scheduler.add_job(func, 'cron', id=job_id, **kwargs) return self.scheduler.add_job(func, 'cron', id=job_id, **kwargs)
@@ -142,6 +160,7 @@ def reset_today_updated_task():
logger.info("开始执行每日重置账号更新状态任务") logger.info("开始执行每日重置账号更新状态任务")
# 更新所有账号的today_updated字段为0 # 更新所有账号的today_updated字段为0
updated_need_count = db.query(Account).filter(Account.today_updated == 1).count()
updated_count = db.query(Account).update({Account.today_updated: 0}) updated_count = db.query(Account).update({Account.today_updated: 0})
db.commit() db.commit()