铁科院功能模块

This commit is contained in:
lhx
2025-12-23 16:55:39 +08:00
parent 17feb136de
commit c1a73d9928
5 changed files with 274 additions and 0 deletions

View File

@@ -0,0 +1,80 @@
from sqlalchemy.orm import Session
from ..models.function_list import FunctionList
from ..schemas.function_list import (
FunctionListCreate, FunctionListUpdate, FunctionListResponse
)
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
class FunctionListService:
"""铁科平台功能服务"""
@staticmethod
def create_function(db: Session, func_data: FunctionListCreate) -> FunctionListResponse:
"""创建功能"""
db_func = FunctionList(**func_data.model_dump())
db.add(db_func)
db.commit()
db.refresh(db_func)
return FunctionListResponse.from_orm_function(db_func)
@staticmethod
def get_function_by_id(db: Session, func_id: int) -> Optional[FunctionList]:
"""根据ID获取功能"""
return db.query(FunctionList).filter(FunctionList.id == func_id).first()
@staticmethod
def get_function_by_name(db: Session, name: str) -> Optional[FunctionList]:
"""根据名称获取功能"""
return db.query(FunctionList).filter(FunctionList.function_name == name).first()
@staticmethod
def get_functions(db: Session, skip: int = 0, limit: int = 100) -> List[FunctionListResponse]:
"""获取功能列表"""
functions = db.query(FunctionList).offset(skip).limit(limit).all()
return [FunctionListResponse.from_orm_function(f) for f in functions]
@staticmethod
def search_functions(
db: Session,
func_id: Optional[int] = None,
function_name: Optional[str] = None
) -> List[FunctionListResponse]:
"""根据条件搜索功能"""
query = db.query(FunctionList)
if func_id is not None:
query = query.filter(FunctionList.id == func_id)
if function_name is not None:
query = query.filter(FunctionList.function_name.like(f"%{function_name}%"))
functions = query.all()
return [FunctionListResponse.from_orm_function(f) for f in functions]
@staticmethod
def update_function(
db: Session, func_id: int, func_data: FunctionListUpdate
) -> Optional[FunctionListResponse]:
"""更新功能"""
db_func = db.query(FunctionList).filter(FunctionList.id == func_id).first()
if db_func:
update_data = func_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_func, field, value)
db.commit()
db.refresh(db_func)
return FunctionListResponse.from_orm_function(db_func)
return None
@staticmethod
def delete_function(db: Session, func_id: int) -> bool:
"""删除功能"""
db_func = db.query(FunctionList).filter(FunctionList.id == func_id).first()
if db_func:
db.delete(db_func)
db.commit()
return True
return False