Files
railway_cloud/app/services/daily.py
2026-01-10 19:13:51 +08:00

357 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from typing import List, Optional, Dict, Any, Set, Tuple,Union
from ..models.level_data import LevelData
from ..models.daily import DailyData
from .base import BaseService
from ..models.settlement_data import SettlementData
from sqlalchemy import func, select, desc,over
from sqlalchemy.orm import Session
import logging
logger = logging.getLogger(__name__)
class DailyDataService(BaseService[DailyData]):
    """Service layer exposing CRUD and query helpers for DailyData records."""

    def __init__(self):
        # Bind the generic base service to the DailyData model.
        super().__init__(DailyData)
def _dict_to_instance(self, data_dict: Dict) -> DailyData:
"""辅助方法:将单个字典转换为 DailyData 实例"""
model_fields = [col.name for col in DailyData.__table__.columns]
filtered_data = {k: v for k, v in data_dict.items() if k in model_fields}
return DailyData(**filtered_data)
def _ensure_instances(self, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]:
"""确保输入数据是 DailyData 实例列表"""
if not isinstance(data, list):
raise TypeError(f"输入必须是列表,而非 {type(data)}")
instances = []
for item in data:
if isinstance(item, DailyData):
instances.append(item)
elif isinstance(item, dict):
instances.append(self._dict_to_instance(item))
else:
raise TypeError(f"列表元素必须是 dict 或 DailyData 实例,而非 {type(item)}")
return instances
def batch_create_by_account_nyid(self, db: Session, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]:
"""
批量创建记录,支持两种输入格式:
- List[DailyData]:模型实例列表
- List[dict]:字典列表(自动转换为实例)
通过 (account_id, NYID) 联合判断是否已存在,存在则忽略 --(暂时取消查重)
"""
try:
data_list = self._ensure_instances(data)
except TypeError as e:
logger.error(f"数据格式错误:{str(e)}")
raise
target_pairs: List[Tuple[int, int]] = [
(item.account_id, item.NYID)
for item in data_list
if item.account_id is not None and item.NYID is not None
]
if not target_pairs:
logger.warning("批量创建失败:所有记录缺少 account_id 或 NYID")
return []
# 取消查重处理
# existing_pairs: Set[Tuple[int, int]] = {
# (item.account_id, item.NYID)
# for item in db.query(DailyData.account_id, DailyData.NYID)
# .filter(DailyData.account_id.in_([p[0] for p in target_pairs]),
# DailyData.NYID.in_([p[1] for p in target_pairs]))
# .all()
# }
to_create = [
item for item in data_list
# if (item.account_id, item.NYID) not in existing_pairs
]
# ignored_count = len(data_list) - len(to_create)
# if ignored_count > 0:
# logger.info(f"批量创建时忽略{ignored_count}条已存在记录account_id和NYID已存在")
logger.info(f"批量创建 {to_create}")
if not to_create:
return []
# 修复点:使用 add_all 替代 bulk_save_objects确保对象被会话跟踪
db.add_all(to_create) # 这里是关键修改
db.commit()
# 现在可以安全地刷新实例了
for item in to_create:
db.refresh(item)
return to_create
    def get_nyid_by_point_id(
        self,
        db: Session,
        point_ids: Optional[List[int]] = None,
        max_num: int = 1
    ) -> List[List[dict]]:
        """
        Fetch up to ``max_num`` of the newest SettlementData records for each
        point_id, with each point's records grouped into one sub-list.

        Return format: ``[[point1_records...], [point2_records...]]`` —
        sub-lists follow the order of ``point_ids``; a point with no matching
        rows yields an empty sub-list.

        :param db: database session
        :param point_ids: points to query; empty/None means all usable points
        :param max_num: records per point (newest first by NYID); <= 0 -> []
        """
        # Default the filter; an empty list is treated as "all points".
        point_ids = point_ids or []
        if max_num <= 0:
            return []
        # Window function: number rows within each point_id partition, newest
        # NYID first, so row_num == 1 is the latest observation per point.
        row_num = over(
            func.row_number(),
            partition_by=SettlementData.point_id,
            order_by=desc(SettlementData.NYID)
        ).label("row_num")
        # All mapped columns of the model.
        model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns]
        # Base filter: only rows flagged usable (useflag set and non-zero).
        base_conditions = [
            SettlementData.useflag.isnot(None),
            SettlementData.useflag != 0
        ]
        if point_ids:
            base_conditions.append(SettlementData.point_id.in_(point_ids))
        # Inner query carrying the per-point row number.
        subquery = (
            select(*model_columns, row_num)
            .where(*base_conditions)
            .subquery()
        )
        # Outer query: keep only the first max_num rows of each point.
        query = (
            select(subquery)
            .where(subquery.c.row_num <= max_num)
            .order_by(subquery.c.point_id, subquery.c.row_num)
        )
        # Execute and group the flat result rows by point_id.
        results = db.execute(query).all()
        grouped: Dict[int, List[dict]] = {}  # point_id -> that point's record dicts
        field_names = [col.name for col in SettlementData.__table__.columns]
        for row in results:
            item_dict = {field: getattr(row, field) for field in field_names}
            try:
                # Normalize point_id to int for grouping.
                # NOTE(review): a NULL point_id would raise TypeError here,
                # which is NOT caught below — assumes point_id is non-null.
                pid = int(item_dict["point_id"])
            except (KeyError, ValueError):
                continue  # skip malformed rows
            # Append to this point's sub-list, creating it on first sight.
            if pid not in grouped:
                grouped[pid] = []
            grouped[pid].append(item_dict)
        # Preserve the caller's point_ids order; when none were given, fall
        # back to ascending point_id order.
        if not point_ids:
            point_ids = sorted(grouped.keys())
        # One sub-list per point_id (empty when the point had no rows).
        return [grouped.get(pid, []) for pid in point_ids]
# 获取所有的今日数据
def get_all_daily_data(
self,
db: Session,
user_id: Optional[int] = None # 可选参数按user_id筛选
) -> List[Dict[str, Any]]:
"""
获取所有日常数据DailyData支持按user_id筛选
:param db: 数据库会话
:param user_id: 可选用户ID若提供则只返回该用户的数据
:return: 日常数据字典列表,包含所有字段
"""
try:
# 基础查询
query = db.query(DailyData)
# 若提供了user_id则添加筛选条件
if user_id is not None:
query = query.filter(DailyData.user_id == user_id)
logger.info(f"查询user_id={user_id}的所有日常数据")
else:
logger.info("查询所有日常数据")
# 执行查询并获取所有记录
daily_records = query.all()
# 转换为字典列表(保留所有字段)
result = []
for record in daily_records:
record_dict = {
column.name: getattr(record, column.name)
for column in DailyData.__table__.columns
}
result.append(record_dict)
logger.info(f"查询完成,共获取{len(result)}条日常数据")
return result
except Exception as e:
logger.error(f"获取日常数据失败:{str(e)}", exc_info=True)
raise e
def get_daily_data_by_account(
self,
db: Session,
account_id: str, # 账号ID必填因为是核心筛选条件
user_id: Optional[int] = None # 可选参数额外按user_id筛选
) -> List[Dict[str, Any]]:
"""
根据account_id获取对应日常数据支持额外按user_id筛选
:param db: 数据库会话
:param account_id: 账号ID必填用于精准筛选数据
:param user_id: 可选用户ID若提供则则进一步筛选该用户的数据
:return: 符合条件的日常数据字典列表,包含所有字段
"""
try:
# 基础查询先按account_id筛选必填条件
query = db.query(DailyData).filter(DailyData.account_id == account_id)
# 若提供了user_id则添加额外筛选条件
if user_id is not None:
query = query.filter(DailyData.user_id == user_id)
logger.info(f"查询account_id={account_id}且user_id={user_id}的日常数据")
else:
logger.info(f"查询account_id={account_id}的所有日常数据")
# 执行查询并获取记录
daily_records = query.all()
# 转换为字典列表(保留所有字段)
result = []
for record in daily_records:
record_dict = {
column.name: getattr(record, column.name)
for column in DailyData.__table__.columns
}
result.append(record_dict)
logger.info(f"查询完成account_id={account_id}对应{len(result)}条日常数据")
return result
except Exception as e:
logger.error(f"获取account_id={account_id}的日常数据失败:{str(e)}", exc_info=True)
raise e
def create_daily_from_linecode(
self,
db: Session,
linecode: str,
account_id: Optional[int] = None
) -> List[DailyData]:
"""
通过水准线路编码生成 daily 数据
业务逻辑:
1. 在水准数据表level_data中查找符合 linecode 的记录,且 NYID 最大
2. 通过 NYID 查询沉降数据表settlement_data
3. 通过沉降数据的 point_id 查询观测点表checkpoint得到 section_id
4. 通过 section_id 查询断面表section_data得到 account_id
5. 整合这些数据,形成 daily 对象,插入到数据库表
Args:
db: 数据库会话
linecode: 水准线路编码
account_id: 可选的账户ID筛选条件
Returns:
创建的 DailyData 记录列表
"""
try:
logger.info(f"开始处理 linecode={linecode} 的 daily 数据生成请求")
from ..models.level_data import LevelData
from ..models.settlement_data import SettlementData
from ..models.checkpoint import Checkpoint
from ..models.section_data import SectionData
# 1. 在水准数据表中查找符合 linecode 的记录,且 NYID 最大
level_data_list = db.query(LevelData)\
.filter(LevelData.linecode == linecode)\
.all()
if not level_data_list:
raise ValueError(f"未找到 linecode={linecode} 对应的水准数据")
# 找到 NYID 最大的记录(将 String 转换为数字进行比较)
max_nyid = max(level_data_list, key=lambda x: int(x.NYID) if x.NYID.isdigit() else 0)
target_nyid = max_nyid.NYID
logger.info(f"找到最大 NYID: {target_nyid}")
# 2. 通过 NYID 查询沉降数据
settlement_data_list = db.query(SettlementData)\
.filter(SettlementData.NYID == target_nyid)\
.all()
if not settlement_data_list:
raise ValueError(f"未找到 NYID={target_nyid} 对应的沉降数据")
# 3. 遍历沉降数据,构建 daily 记录
daily_records = []
for settlement in settlement_data_list:
# 3.1 通过沉降数据的 point_id 查询观测点表,得到 section_id
checkpoint = db.query(Checkpoint)\
.filter(Checkpoint.point_id == settlement.point_id)\
.first()
if not checkpoint:
logger.warning(f"未找到 point_id={settlement.point_id} 对应的观测点,跳过该记录")
continue
# 3.2 通过 section_id 查询断面表,得到 account_id
section = db.query(SectionData)\
.filter(SectionData.section_id == checkpoint.section_id)\
.first()
if not section:
logger.warning(f"未找到 section_id={checkpoint.section_id} 对应的断面,跳过该记录")
continue
# 3.3 从断面数据中获取 account_id作为 user_id
user_id = int(section.account_id) if section.account_id else None
if not user_id:
logger.warning(f"断面 section_id={checkpoint.section_id} 没有 account_id跳过该记录")
continue
# 如果提供了 account_id 筛选条件,则只处理匹配的记录
if account_id is not None and user_id != account_id:
continue
# 3.4 构建 daily 记录
daily_record = DailyData(
user_id=user_id,
account_id=user_id,
point_id=settlement.point_id,
NYID=settlement.NYID,
linecode=linecode,
section_id=checkpoint.section_id,
remaining=0,
is_all=0
)
daily_records.append(daily_record)
if not daily_records:
logger.warning(f"没有生成任何 daily 记录")
return []
# 4. 批量插入 daily 记录
created_records = self.batch_create_by_account_nyid(db, daily_records)
logger.info(f"成功生成 {len(created_records)} 条 daily 记录")
return created_records
except ValueError:
raise
except Exception as e:
logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True)
raise