from sqlalchemy.orm import Session
from ..models.function_list import FunctionList
from ..schemas.function_list import (
    FunctionListCreate,
    FunctionListUpdate,
    FunctionListResponse
)
from typing import List, Optional
import logging

logger = logging.getLogger(__name__)


def _commit_or_rollback(db: Session) -> None:
    """Commit the current transaction, rolling back if the commit fails.

    The original exception is re-raised unchanged so callers see the same
    error type as before; the rollback only ensures the session is left in
    a usable state.
    """
    try:
        db.commit()
    except Exception:
        logger.exception("Database commit failed; rolling back transaction")
        db.rollback()
        raise


class FunctionListService:
    """Service for managing function entries of the Tieke platform."""

    @staticmethod
    def create_function(db: Session, func_data: FunctionListCreate) -> FunctionListResponse:
        """Create a new function record.

        Args:
            db: Active database session.
            func_data: Payload whose dumped fields are passed to the
                ``FunctionList`` constructor.

        Returns:
            The persisted record converted to a response object.

        Raises:
            Exception: Whatever the commit raises; the transaction is rolled
                back before the exception propagates.
        """
        db_func = FunctionList(**func_data.model_dump())
        db.add(db_func)
        _commit_or_rollback(db)
        # Reload database-populated attributes before building the response.
        db.refresh(db_func)
        return FunctionListResponse.from_orm_function(db_func)

    @staticmethod
    def get_function_by_id(db: Session, func_id: int) -> Optional[FunctionList]:
        """Return the function with the given ID, or ``None`` if absent."""
        return db.query(FunctionList).filter(FunctionList.id == func_id).first()

    @staticmethod
    def get_function_by_name(db: Session, name: str) -> Optional[FunctionList]:
        """Return the first function whose name equals *name*, or ``None``."""
        return db.query(FunctionList).filter(FunctionList.function_name == name).first()

    @staticmethod
    def get_functions(db: Session, skip: int = 0, limit: int = 100) -> List[FunctionListResponse]:
        """Return one page of functions.

        Args:
            db: Active database session.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            Response objects for the selected page, ordered by ID.
        """
        functions = (
            db.query(FunctionList)
            # OFFSET/LIMIT without ORDER BY yields an undefined row order, so
            # consecutive pages could overlap or skip rows.
            .order_by(FunctionList.id)
            .offset(skip)
            .limit(limit)
            .all()
        )
        return [FunctionListResponse.from_orm_function(f) for f in functions]

    @staticmethod
    def search_functions(
        db: Session,
        func_id: Optional[int] = None,
        function_name: Optional[str] = None
    ) -> List[FunctionListResponse]:
        """Search functions by optional criteria.

        Filters that are ``None`` are ignored; supplied filters are combined
        with AND.

        Args:
            db: Active database session.
            func_id: Exact ID to match.
            function_name: Substring to match via SQL ``LIKE``. ``%`` and
                ``_`` inside the value are not escaped and therefore act as
                LIKE wildcards.

        Returns:
            Response objects for every matching record.
        """
        query = db.query(FunctionList)
        if func_id is not None:
            query = query.filter(FunctionList.id == func_id)
        if function_name is not None:
            query = query.filter(FunctionList.function_name.like(f"%{function_name}%"))
        functions = query.all()
        return [FunctionListResponse.from_orm_function(f) for f in functions]

    @staticmethod
    def update_function(
        db: Session,
        func_id: int,
        func_data: FunctionListUpdate
    ) -> Optional[FunctionListResponse]:
        """Update an existing function.

        Only fields explicitly set on *func_data* are written to the record.

        Args:
            db: Active database session.
            func_id: ID of the record to update.
            func_data: Partial update payload.

        Returns:
            The updated record as a response object, or ``None`` when no
            record has the given ID.

        Raises:
            Exception: Whatever the commit raises; the transaction is rolled
                back before the exception propagates.
        """
        db_func = FunctionListService.get_function_by_id(db, func_id)
        if not db_func:
            return None

        # exclude_unset keeps omitted fields from overwriting stored values.
        update_data = func_data.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_func, field, value)
        _commit_or_rollback(db)
        db.refresh(db_func)
        return FunctionListResponse.from_orm_function(db_func)

    @staticmethod
    def delete_function(db: Session, func_id: int) -> bool:
        """Delete the function with the given ID.

        Returns:
            ``True`` if a record was found and deleted, ``False`` if no
            record has the given ID.

        Raises:
            Exception: Whatever the commit raises; the transaction is rolled
                back before the exception propagates.
        """
        db_func = FunctionListService.get_function_by_id(db, func_id)
        if not db_func:
            return False

        db.delete(db_func)
        _commit_or_rollback(db)
        return True