diff --git a/app/api/daily_diff.py b/app/api/daily_diff.py
new file mode 100644
index 0000000..3a4bc25
--- /dev/null
+++ b/app/api/daily_diff.py
@@ -0,0 +1,94 @@
+from fastapi import APIRouter, Depends, HTTPException, status
+from sqlalchemy.orm import Session
+from ..core.database import get_db
+from ..core.response_code import ResponseCode
+from ..schemas.daily_diff import (
+    BatchCreateRequest,
+    BatchCreateResponse,
+    DailyDiffQueryRequest,
+    DailyDiffListResponse,
+    DailyDiffResponse,
+    BatchDeleteRequest,
+    BatchDeleteResponse
+)
+from ..services.daily_diff import DailyDiffService
+
+router = APIRouter(prefix="/daily_diff", tags=["每日差异数据"])
+
+@router.post("/batch_create", response_model=BatchCreateResponse)
+def batch_create(request: BatchCreateRequest, db: Session = Depends(get_db)):
+    """
+    批量新增数据
+    使用check_time和linecode检查重复,重复则跳过该条数据
+    """
+    try:
+        service = DailyDiffService()
+        # 转换为字典列表
+        data_list = [item.model_dump() for item in request.data]
+        result = service.batch_create(db, data_list)
+
+        return BatchCreateResponse(
+            code=ResponseCode.SUCCESS if result['success'] else ResponseCode.IMPORT_FAILED,
+            message=result['message'],
+            total_count=result['total_count'],
+            success_count=result['success_count'],
+            skip_count=result['skip_count']
+        )
+    except Exception as e:
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"批量新增失败: {str(e)}"
+        )
+
+@router.post("/query", response_model=DailyDiffListResponse)
+def query(request: DailyDiffQueryRequest, db: Session = Depends(get_db)):
+    """
+    多条件查询
+    支持account_id、account_name、check_time、check_time范围、linecode等条件
+    """
+    try:
+        service = DailyDiffService()
+        data, total = service.query_by_conditions(
+            db,
+            account_id=request.account_id,
+            account_name=request.account_name,
+            check_time=request.check_time,
+            check_time_start=request.check_time_start,
+            check_time_end=request.check_time_end,
+            linecode=request.linecode,
+            linecodes=request.linecodes,
+            page=request.page,
+            page_size=request.page_size
+        )
+
+        return DailyDiffListResponse(
+            code=ResponseCode.SUCCESS,
+            message="查询成功",
+            total=total,
+            data=[DailyDiffResponse.model_validate(item) for item in data]
+        )
+    except Exception as e:
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"查询失败: {str(e)}"
+        )
+
+@router.post("/batch_delete", response_model=BatchDeleteResponse)
+def batch_delete(request: BatchDeleteRequest, db: Session = Depends(get_db)):
+    """
+    批量删除数据
+    """
+    try:
+        service = DailyDiffService()
+        deleted_count = service.batch_delete(db, request.ids)
+
+        return BatchDeleteResponse(
+            code=ResponseCode.SUCCESS,
+            message="删除成功",
+            deleted_count=deleted_count
+        )
+    except Exception as e:
+        raise HTTPException(
+            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            detail=f"删除失败: {str(e)}"
+        )
diff --git a/app/main.py b/app/main.py
index 2fe1c27..0cac66f 100644
--- a/app/main.py
+++ b/app/main.py
@@ -17,6 +17,7 @@ from .api.section_data import router as section_data_router
 from .api.level_data import router as level_data_router
 from .api.checkpoint import router as checkpoint_router
 from .api.function_list import router as function_list_router
+from .api.daily_diff import router as daily_diff_router
 from .utils.scheduler import task_scheduler
 
 # 初始化日志系统
@@ -100,6 +101,7 @@ app.include_router(section_data_router, prefix="/api")
 app.include_router(level_data_router, prefix="/api")
 app.include_router(checkpoint_router, prefix="/api")
 app.include_router(function_list_router, prefix="/api")
+app.include_router(daily_diff_router, prefix="/api")
 # app.include_router(test_router, prefix="/api")
 
 # 根路径
diff --git a/app/models/daily_diff.py b/app/models/daily_diff.py
new file mode 100644
index 0000000..7d58e02
--- /dev/null
+++ b/app/models/daily_diff.py
@@ -0,0 +1,21 @@
+from sqlalchemy import Column, BigInteger, String, Date
+from ..core.database import Base
+
+class DailyDiff(Base):
+    __tablename__ = "daily_diff"
+
+    id = Column(BigInteger, primary_key=True, index=True, autoincrement=True)
+    account_id = Column(BigInteger, nullable=False, comment="账号ID", index=True)
+    account_name = Column(String(100), comment="账号名称")
+    check_time = Column(Date, comment="检查时间", index=True)
+    linecode = Column(String(100), comment="线路编码", index=True)
+
+    def to_dict(self):
+        """将模型实例转换为字典"""
+        return {
+            "id": self.id,
+            "account_id": self.account_id,
+            "account_name": self.account_name,
+            "check_time": self.check_time.strftime("%Y-%m-%d") if self.check_time else None,
+            "linecode": self.linecode
+        }
diff --git a/app/schemas/daily_diff.py b/app/schemas/daily_diff.py
new file mode 100644
index 0000000..fa4a4ca
--- /dev/null
+++ b/app/schemas/daily_diff.py
@@ -0,0 +1,62 @@
+from pydantic import BaseModel, Field
+from typing import Optional, List
+from datetime import date
+
+class DailyDiffBase(BaseModel):
+    """daily_diff基础模型"""
+    account_id: int = Field(..., description="账号ID")
+    account_name: Optional[str] = Field(None, description="账号名称")
+    check_time: Optional[date] = Field(None, description="检查时间(年月日)")
+    linecode: Optional[str] = Field(None, description="线路编码")
+
+class DailyDiffCreate(DailyDiffBase):
+    """创建daily_diff请求模型"""
+    pass
+
+class DailyDiffResponse(DailyDiffBase):
+    """daily_diff响应模型"""
+    id: int
+
+    class Config:
+        from_attributes = True
+
+class BatchCreateRequest(BaseModel):
+    """批量新增请求"""
+    data: List[DailyDiffCreate] = Field(..., description="批量新增数据列表")
+
+class BatchCreateResponse(BaseModel):
+    """批量新增响应"""
+    code: int = 0
+    message: str
+    total_count: int = Field(0, description="总数据量")
+    success_count: int = Field(0, description="成功插入数量")
+    skip_count: int = Field(0, description="跳过重复数量")
+
+class DailyDiffQueryRequest(BaseModel):
+    """多条件查询请求"""
+    account_id: Optional[int] = Field(None, description="账号ID")
+    account_name: Optional[str] = Field(None, description="账号名称(模糊匹配)")
+    check_time: Optional[date] = Field(None, description="检查时间")
+    check_time_start: Optional[date] = Field(None, description="检查时间开始")
+    check_time_end: Optional[date] = Field(None, description="检查时间结束")
+    linecode: Optional[str] = Field(None, description="线路编码")
+    linecodes: Optional[List[str]] = Field(None, description="线路编码列表")
+    page: int = Field(1, ge=1, description="页码")
+    page_size: int = Field(20, ge=1, le=1000, description="每页数量")
+
+class DailyDiffListResponse(BaseModel):
+    """查询列表响应"""
+    code: int = 0
+    message: str
+    total: int = 0
+    data: List[DailyDiffResponse] = []
+
+class BatchDeleteRequest(BaseModel):
+    """批量删除请求"""
+    ids: List[int] = Field(..., description="要删除的ID列表")
+
+class BatchDeleteResponse(BaseModel):
+    """批量删除响应"""
+    code: int = 0
+    message: str
+    deleted_count: int = 0
diff --git a/app/services/daily_diff.py b/app/services/daily_diff.py
new file mode 100644
index 0000000..d4eb8b3
--- /dev/null
+++ b/app/services/daily_diff.py
@@ -0,0 +1,174 @@
+from sqlalchemy.orm import Session
+from sqlalchemy import and_
+from typing import List, Optional, Dict, Any, Tuple
+from datetime import date
+from ..models.daily_diff import DailyDiff
+from .base import BaseService
+import logging
+
+logger = logging.getLogger(__name__)
+
+class DailyDiffService(BaseService[DailyDiff]):
+    def __init__(self):
+        super().__init__(DailyDiff)
+
+    def batch_create(self, db: Session, data_list: List[Dict[str, Any]]) -> Dict[str, Any]:
+        """
+        批量新增数据
+        使用check_time和linecode检查重复,重复则跳过
+        优化:批量IN查询后代码处理,减少循环调用数据库
+        """
+        total_count = len(data_list)
+        if total_count == 0:
+            return {
+                'success': True,
+                'message': '数据为空',
+                'total_count': 0,
+                'success_count': 0,
+                'skip_count': 0
+            }
+
+        try:
+            # 提取所有check_time和linecode用于批量查询
+            check_times = list(set(item.get('check_time') for item in data_list if item.get('check_time')))
+            linecodes = list(set(item.get('linecode') for item in data_list if item.get('linecode')))
+
+            # 批量查询已存在的数据(IN查询)
+            existing_records = []
+            if check_times and linecodes:
+                existing_records = db.query(DailyDiff).filter(
+                    and_(
+                        DailyDiff.check_time.in_(check_times),
+                        DailyDiff.linecode.in_(linecodes)
+                    )
+                ).all()
+
+            # 构建已存在数据的key集合: (check_time, linecode)
+            existing_keys = set()
+            for record in existing_records:
+                key = (record.check_time, record.linecode)
+                existing_keys.add(key)
+
+            # 筛选需要插入的数据
+            to_insert = []
+            skip_count = 0
+            for item in data_list:
+                check_time = item.get('check_time')
+                linecode = item.get('linecode')
+                key = (check_time, linecode)
+
+                if key in existing_keys:
+                    skip_count += 1
+                    logger.debug(f"跳过重复数据: check_time={check_time}, linecode={linecode}")
+                else:
+                    to_insert.append(item)
+                    # 添加到已存在集合,防止本批次内重复
+                    existing_keys.add(key)
+
+            # 批量插入
+            success_count = 0
+            if to_insert:
+                batch_size = 500
+                for i in range(0, len(to_insert), batch_size):
+                    batch = to_insert[i:i + batch_size]
+                    db_objects = [
+                        DailyDiff(
+                            account_id=item.get('account_id'),
+                            account_name=item.get('account_name'),
+                            check_time=item.get('check_time'),
+                            linecode=item.get('linecode')
+                        )
+                        for item in batch
+                    ]
+                    db.add_all(db_objects)
+                    success_count += len(batch)
+
+            db.commit()
+            logger.info(f"批量插入完成: 成功{success_count}条, 跳过{skip_count}条")
+
+            return {
+                'success': True,
+                'message': '批量新增完成',
+                'total_count': total_count,
+                'success_count': success_count,
+                'skip_count': skip_count
+            }
+
+        except Exception as e:
+            db.rollback()
+            logger.error(f"批量新增失败: {str(e)}", exc_info=True)
+            return {
+                'success': False,
+                'message': f'批量新增失败: {str(e)}',
+                'total_count': total_count,
+                'success_count': 0,
+                'skip_count': 0
+            }
+
+    def query_by_conditions(
+        self,
+        db: Session,
+        account_id: Optional[int] = None,
+        account_name: Optional[str] = None,
+        check_time: Optional[date] = None,
+        check_time_start: Optional[date] = None,
+        check_time_end: Optional[date] = None,
+        linecode: Optional[str] = None,
+        linecodes: Optional[List[str]] = None,
+        page: int = 1,
+        page_size: int = 20
+    ) -> Tuple[List[DailyDiff], int]:
+        """
+        多条件查询
+        返回: (数据列表, 总数)
+        """
+        query = db.query(DailyDiff)
+
+        # 构建查询条件
+        if account_id is not None:
+            query = query.filter(DailyDiff.account_id == account_id)
+
+        if account_name:
+            query = query.filter(DailyDiff.account_name.like(f"%{account_name}%"))
+
+        if check_time:
+            query = query.filter(DailyDiff.check_time == check_time)
+
+        if check_time_start:
+            query = query.filter(DailyDiff.check_time >= check_time_start)
+
+        if check_time_end:
+            query = query.filter(DailyDiff.check_time <= check_time_end)
+
+        if linecode:
+            query = query.filter(DailyDiff.linecode == linecode)
+
+        if linecodes:
+            query = query.filter(DailyDiff.linecode.in_(linecodes))
+
+        # 获取总数
+        total = query.count()
+
+        # 分页
+        offset = (page - 1) * page_size
+        data = query.order_by(DailyDiff.id.desc()).offset(offset).limit(page_size).all()
+
+        return data, total
+
+    def batch_delete(self, db: Session, ids: List[int]) -> int:
+        """
+        批量删除
+        返回删除的记录数
+        """
+        if not ids:
+            return 0
+
+        try:
+            deleted_count = db.query(DailyDiff).filter(DailyDiff.id.in_(ids)).delete(synchronize_session=False)
+            db.commit()
+            logger.info(f"批量删除完成: 删除{deleted_count}条")
+            return deleted_count
+        except Exception as e:
+            db.rollback()
+            logger.error(f"批量删除失败: {str(e)}", exc_info=True)
+            raise e