from sqlalchemy.orm import Session from sqlalchemy import and_ from typing import List, Optional, Dict, Any, Tuple from datetime import date from ..models.daily_diff import DailyDiff from .base import BaseService import logging logger = logging.getLogger(__name__) class DailyDiffService(BaseService[DailyDiff]): def __init__(self): super().__init__(DailyDiff) def batch_create(self, db: Session, data_list: List[Dict[str, Any]]) -> Dict[str, Any]: """ 批量新增数据 使用check_time和linecode检查重复,重复则跳过 优化:批量IN查询后代码处理,减少循环调用数据库 """ total_count = len(data_list) if total_count == 0: return { 'success': True, 'message': '数据为空', 'total_count': 0, 'success_count': 0, 'skip_count': 0 } try: # 提取所有check_time和linecode用于批量查询 check_times = list(set(item.get('check_time') for item in data_list if item.get('check_time'))) linecodes = list(set(item.get('linecode') for item in data_list if item.get('linecode'))) # 批量查询已存在的数据(IN查询) existing_records = [] if check_times and linecodes: existing_records = db.query(DailyDiff).filter( and_( DailyDiff.check_time.in_(check_times), DailyDiff.linecode.in_(linecodes) ) ).all() # 构建已存在数据的key集合: (check_time, linecode) existing_keys = set() for record in existing_records: key = (record.check_time, record.linecode) existing_keys.add(key) # 筛选需要插入的数据 to_insert = [] skip_count = 0 for item in data_list: check_time = item.get('check_time') linecode = item.get('linecode') key = (check_time, linecode) if key in existing_keys: skip_count += 1 logger.debug(f"跳过重复数据: check_time={check_time}, linecode={linecode}") else: to_insert.append(item) # 添加到已存在集合,防止本批次内重复 existing_keys.add(key) # 批量插入 success_count = 0 if to_insert: batch_size = 500 for i in range(0, len(to_insert), batch_size): batch = to_insert[i:i + batch_size] db_objects = [ DailyDiff( account_id=item.get('account_id'), account_name=item.get('account_name'), check_time=item.get('check_time'), linecode=item.get('linecode') ) for item in batch ] db.add_all(db_objects) success_count += len(batch) db.commit() logger.info(f"批量插入完成: 成功{success_count}条, 跳过{skip_count}条") return { 'success': True, 'message': '批量新增完成', 'total_count': total_count, 'success_count': success_count, 'skip_count': skip_count } except Exception as e: db.rollback() logger.error(f"批量新增失败: {str(e)}", exc_info=True) return { 'success': False, 'message': f'批量新增失败: {str(e)}', 'total_count': total_count, 'success_count': 0, 'skip_count': 0 } def query_by_conditions( self, db: Session, account_id: Optional[int] = None, account_name: Optional[str] = None, check_time: Optional[date] = None, check_time_start: Optional[date] = None, check_time_end: Optional[date] = None, linecode: Optional[str] = None, linecodes: Optional[List[str]] = None, page: Optional[int] = None, page_size: Optional[int] = None ) -> Tuple[List[DailyDiff], int]: """ 多条件查询 返回: (数据列表, 总数) """ query = db.query(DailyDiff) # 构建查询条件 if account_id is not None: query = query.filter(DailyDiff.account_id == account_id) if account_name: query = query.filter(DailyDiff.account_name.like(f"%{account_name}%")) if check_time: query = query.filter(DailyDiff.check_time == check_time) if check_time_start: query = query.filter(DailyDiff.check_time >= check_time_start) if check_time_end: query = query.filter(DailyDiff.check_time <= check_time_end) if linecode: query = query.filter(DailyDiff.linecode == linecode) if linecodes: query = query.filter(DailyDiff.linecode.in_(linecodes)) # 获取总数 total = query.count() # 分页(如果提供了分页参数) if page is not None and page_size is not None: offset = (page - 1) * page_size data = query.order_by(DailyDiff.id.desc()).offset(offset).limit(page_size).all() else: data = query.order_by(DailyDiff.id.desc()).all() return data, total def batch_delete(self, db: Session, ids: List[int]) -> int: """ 批量删除 返回删除的记录数 """ if not ids: return 0 try: deleted_count = db.query(DailyDiff).filter(DailyDiff.id.in_(ids)).delete(synchronize_session=False) db.commit() logger.info(f"批量删除完成: 删除{deleted_count}条") return deleted_count except Exception as e: db.rollback() logger.error(f"批量删除失败: {str(e)}", exc_info=True) raise e