Files
railway_cloud/app/services/daily_diff.py
2026-01-07 11:47:01 +08:00

178 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from sqlalchemy import and_
from typing import List, Optional, Dict, Any, Tuple
from datetime import date
from ..models.daily_diff import DailyDiff
from .base import BaseService
import logging
# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)
class DailyDiffService(BaseService[DailyDiff]):
    """Service layer for DailyDiff records.

    Provides batch insertion with duplicate skipping, multi-condition
    querying with optional pagination, and batch deletion by id.
    """

    def __init__(self):
        # Bind the generic base service to the DailyDiff model.
        super().__init__(DailyDiff)

    def batch_create(self, db: Session, data_list: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Batch-insert DailyDiff rows, skipping duplicates.

        A row is a duplicate when its (check_time, linecode) pair already
        exists in the table, or appeared earlier in this same batch.
        Existing pairs are fetched with a single IN query — a cross product
        of the distinct check_times and linecodes, which may over-fetch,
        but the exact pair comparison below keeps the result correct —
        instead of one query per incoming row.

        Args:
            db: Active SQLAlchemy session.
            data_list: Dicts with keys account_id, account_name,
                check_time, linecode (missing keys insert as None).

        Returns:
            Summary dict with keys: success, message, total_count,
            success_count, skip_count. Never raises; on failure the
            transaction is rolled back and success=False is returned.
        """
        total_count = len(data_list)
        if total_count == 0:
            return {
                'success': True,
                'message': '数据为空',
                'total_count': 0,
                'success_count': 0,
                'skip_count': 0
            }
        try:
            # Distinct, non-empty key components for the batched existence query.
            check_times = list(set(item.get('check_time') for item in data_list if item.get('check_time')))
            linecodes = list(set(item.get('linecode') for item in data_list if item.get('linecode')))
            # One IN query for all potentially-existing rows (cross product; see docstring).
            existing_records = []
            if check_times and linecodes:
                existing_records = db.query(DailyDiff).filter(
                    and_(
                        DailyDiff.check_time.in_(check_times),
                        DailyDiff.linecode.in_(linecodes)
                    )
                ).all()
            # Exact (check_time, linecode) pairs already present in the table.
            existing_keys = {(record.check_time, record.linecode) for record in existing_records}
            # Partition incoming rows into inserts vs. skips.
            to_insert = []
            skip_count = 0
            for item in data_list:
                check_time = item.get('check_time')
                linecode = item.get('linecode')
                key = (check_time, linecode)
                if key in existing_keys:
                    skip_count += 1
                    # Lazy %-args: no formatting cost unless DEBUG is enabled.
                    logger.debug("跳过重复数据: check_time=%s, linecode=%s", check_time, linecode)
                else:
                    to_insert.append(item)
                    # Record the key so duplicates within this batch are skipped too.
                    existing_keys.add(key)
            # Insert in chunks to bound per-flush statement size.
            success_count = 0
            if to_insert:
                batch_size = 500
                for i in range(0, len(to_insert), batch_size):
                    batch = to_insert[i:i + batch_size]
                    db_objects = [
                        DailyDiff(
                            account_id=item.get('account_id'),
                            account_name=item.get('account_name'),
                            check_time=item.get('check_time'),
                            linecode=item.get('linecode')
                        )
                        for item in batch
                    ]
                    db.add_all(db_objects)
                    success_count += len(batch)
            # Single commit makes the whole batch atomic (no-op when nothing was added).
            db.commit()
            logger.info("批量插入完成: 成功%s条, 跳过%s", success_count, skip_count)
            return {
                'success': True,
                'message': '批量新增完成',
                'total_count': total_count,
                'success_count': success_count,
                'skip_count': skip_count
            }
        except Exception as e:
            db.rollback()
            logger.error("批量新增失败: %s", str(e), exc_info=True)
            return {
                'success': False,
                'message': f'批量新增失败: {str(e)}',
                'total_count': total_count,
                'success_count': 0,
                'skip_count': 0
            }

    def query_by_conditions(
        self,
        db: Session,
        account_id: Optional[int] = None,
        account_name: Optional[str] = None,
        check_time: Optional[date] = None,
        check_time_start: Optional[date] = None,
        check_time_end: Optional[date] = None,
        linecode: Optional[str] = None,
        linecodes: Optional[List[str]] = None,
        page: Optional[int] = None,
        page_size: Optional[int] = None
    ) -> Tuple[List[DailyDiff], int]:
        """Query DailyDiff rows by any combination of optional filters.

        Filters are AND-combined; a None/empty argument is ignored.
        account_name matches as a substring (LIKE %name%); check_time is an
        exact match while check_time_start/end bound an inclusive range.

        Args:
            db: Active SQLAlchemy session.
            page, page_size: 1-based pagination; applied only when BOTH
                are given, otherwise all matching rows are returned.

        Returns:
            Tuple of (rows ordered by id descending, total match count
            before pagination).
        """
        query = db.query(DailyDiff)
        # Build up the filter chain; each condition is optional.
        if account_id is not None:
            query = query.filter(DailyDiff.account_id == account_id)
        if account_name:
            query = query.filter(DailyDiff.account_name.like(f"%{account_name}%"))
        if check_time:
            query = query.filter(DailyDiff.check_time == check_time)
        if check_time_start:
            query = query.filter(DailyDiff.check_time >= check_time_start)
        if check_time_end:
            query = query.filter(DailyDiff.check_time <= check_time_end)
        if linecode:
            query = query.filter(DailyDiff.linecode == linecode)
        if linecodes:
            query = query.filter(DailyDiff.linecode.in_(linecodes))
        # Total count of matches, independent of pagination.
        total = query.count()
        if page is not None and page_size is not None:
            offset = (page - 1) * page_size
            data = query.order_by(DailyDiff.id.desc()).offset(offset).limit(page_size).all()
        else:
            data = query.order_by(DailyDiff.id.desc()).all()
        return data, total

    def batch_delete(self, db: Session, ids: List[int]) -> int:
        """Delete all DailyDiff rows whose id is in ``ids``.

        Args:
            db: Active SQLAlchemy session.
            ids: Primary keys to delete; an empty list is a no-op.

        Returns:
            Number of rows actually deleted.

        Raises:
            Exception: Re-raised after rollback if the delete fails.
        """
        if not ids:
            return 0
        try:
            # synchronize_session=False: bulk DELETE without reconciling
            # in-session objects — fine since nothing is read back afterwards.
            deleted_count = db.query(DailyDiff).filter(DailyDiff.id.in_(ids)).delete(synchronize_session=False)
            db.commit()
            logger.info("批量删除完成: 删除%s", deleted_count)
            return deleted_count
        except Exception as e:
            db.rollback()
            logger.error("批量删除失败: %s", str(e), exc_info=True)
            # Bare raise preserves the original traceback (raise e would truncate it).
            raise