原始数据分表处理

This commit is contained in:
lhx
2025-10-23 10:32:48 +08:00
parent 2fdec05e86
commit 21c61cdec7
6 changed files with 385 additions and 58 deletions

View File

@@ -121,10 +121,24 @@ def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session =
@router.post("/batch_import_original_data", response_model=DataImportResponse) @router.post("/batch_import_original_data", response_model=DataImportResponse)
def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)): def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)):
"""批量导入原始数据""" """批量导入原始数据 - 数据中必须包含account_id字段"""
try: try:
logger.info(f"Starting batch import original data, count: {len(request.data)}") logger.info(f"Starting batch import original data, count: {len(request.data)}")
# 验证数据中是否包含account_id
if not request.data or len(request.data) == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="导入数据不能为空"
)
# 检查第一条数据是否包含account_id
if 'account_id' not in request.data[0]:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="数据中必须包含account_id字段"
)
# 直接使用字典列表,不需要转换 # 直接使用字典列表,不需要转换
data_list = request.data data_list = request.data
@@ -133,6 +147,8 @@ def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Sess
logger.info(f"Batch import original data completed: {result['message']}") logger.info(f"Batch import original data completed: {result['message']}")
return DataImportResponse(**result) return DataImportResponse(**result)
except HTTPException:
raise
except Exception as e: except Exception as e:
logger.error(f"Batch import original data failed: {str(e)}") logger.error(f"Batch import original data failed: {str(e)}")
raise HTTPException( raise HTTPException(
@@ -246,13 +262,21 @@ def get_settlement_checkpoint(request: SettlementDataCheckpointQueryRequest, db:
# 根据期数id获取原始数据 # 根据期数id获取原始数据
@router.post("/get_original", response_model=DataResponse) @router.post("/get_original", response_model=DataResponse)
def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)): def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)):
"""获取水准数据+原始数据""" """获取水准数据+原始数据 - 必须提供account_id"""
try: try:
logger.info(f"Querying original data with params: {request.dict()}") logger.info(f"Querying original data with params: {request.dict()}")
# 验证account_id
if not request.account_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="必须提供account_id参数"
)
# 调用综合服务的业务方法 # 调用综合服务的业务方法
result = comprehensive_service.get_level_and_original_data( result = comprehensive_service.get_level_and_original_data(
db, db,
account_id=request.account_id,
id=request.id, id=request.id,
bfpcode=request.bfpcode, bfpcode=request.bfpcode,
bffb=request.bffb, bffb=request.bffb,
@@ -268,6 +292,8 @@ def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db
data=result["data"] data=result["data"]
) )
except HTTPException:
raise
except Exception as e: except Exception as e:
logger.error(f"Query original data failed: {str(e)}") logger.error(f"Query original data failed: {str(e)}")
raise HTTPException( raise HTTPException(

View File

@@ -2,9 +2,11 @@ from sqlalchemy import Column, Integer, String, DateTime
from ..core.database import Base from ..core.database import Base
class OriginalData(Base): class OriginalData(Base):
"""原始数据模型基类 - 仅用作模板,实际使用动态生成的分表模型"""
__tablename__ = "original_data" __tablename__ = "original_data"
id = Column(Integer, primary_key=True, index=True, autoincrement=True) id = Column(Integer, primary_key=True, index=True, autoincrement=True)
account_id = Column(Integer, nullable=False, comment="账号ID", index=True)
bfpcode = Column(String(1000), nullable=False, comment="前(后)视点名称") bfpcode = Column(String(1000), nullable=False, comment="前(后)视点名称")
mtime = Column(DateTime, nullable=False, comment="测点观测时间") mtime = Column(DateTime, nullable=False, comment="测点观测时间")
bffb = Column(String(1000), nullable=False, comment="前(后)视标记符") bffb = Column(String(1000), nullable=False, comment="前(后)视标记符")
@@ -12,3 +14,51 @@ class OriginalData(Base):
bfpvalue = Column(String(1000), nullable=False, comment="前(后)视尺读数(m)") bfpvalue = Column(String(1000), nullable=False, comment="前(后)视尺读数(m)")
NYID = Column(String(100), nullable=False, comment="期数id", index=True) NYID = Column(String(100), nullable=False, comment="期数id", index=True)
sort = Column(Integer, comment="序号") sort = Column(Integer, comment="序号")
def get_original_data_model(account_id: int):
"""
根据账号ID动态生成原始数据表模型
Args:
account_id: 账号ID
Returns:
动态生成的原始数据表模型类
"""
class_name = f"OriginalData_{account_id}"
table_name = f"original_data_{account_id}"
# 动态创建模型类
DynamicModel = type(
class_name,
(Base,),
{
'__tablename__': table_name,
'__table_args__': {'extend_existing': True},
'id': Column(Integer, primary_key=True, index=True, autoincrement=True),
'account_id': Column(Integer, nullable=False, comment="账号ID", index=True),
'bfpcode': Column(String(1000), nullable=False, comment="前(后)视点名称"),
'mtime': Column(DateTime, nullable=False, comment="测点观测时间"),
'bffb': Column(String(1000), nullable=False, comment="前(后)视标记符"),
'bfpl': Column(String(1000), nullable=False, comment="前(后)视距离(m)"),
'bfpvalue': Column(String(1000), nullable=False, comment="前(后)视尺读数(m)"),
'NYID': Column(String(100), nullable=False, comment="期数id", index=True),
'sort': Column(Integer, comment="序号")
}
)
return DynamicModel
def get_table_name(account_id: int) -> str:
"""
根据账号ID获取原始数据表名
Args:
account_id: 账号ID
Returns:
表名
"""
return f"original_data_{account_id}"

View File

@@ -4,6 +4,7 @@ from typing import Any, Dict, List, Optional
# 原始数据导入请求 # 原始数据导入请求
class OriginalDataImportRequest(BaseModel): class OriginalDataImportRequest(BaseModel):
account_id: int # 账号ID,必填
bfpcode: Optional[str] = None bfpcode: Optional[str] = None
mtime: Optional[str] = None mtime: Optional[str] = None
bffb: Optional[str] = None bffb: Optional[str] = None
@@ -69,6 +70,7 @@ class SectionDataImportRequest(BaseModel):
# 原始数据查询请求 # 原始数据查询请求
class OriginalDataQueryRequest(BaseModel): class OriginalDataQueryRequest(BaseModel):
account_id: int # 账号ID,必填
linecode: Optional[str] = None linecode: Optional[str] = None
id: Optional[int] = None id: Optional[int] = None
bfpcode: Optional[str] = None bfpcode: Optional[str] = None

View File

@@ -2,15 +2,31 @@ from sqlalchemy.orm import Session
from ..models.account import Account from ..models.account import Account
from ..schemas.account import AccountCreate, AccountUpdate, AccountResponse from ..schemas.account import AccountCreate, AccountUpdate, AccountResponse
from typing import List, Optional from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
class AccountService: class AccountService:
@staticmethod @staticmethod
def create_account(db: Session, account_data: AccountCreate) -> AccountResponse: def create_account(db: Session, account_data: AccountCreate) -> AccountResponse:
"""创建账号""" """创建账号并自动创建对应的原始数据表"""
from .original_data import OriginalDataService
db_account = Account(**account_data.dict()) db_account = Account(**account_data.dict())
db.add(db_account) db.add(db_account)
db.commit() db.commit()
db.refresh(db_account) db.refresh(db_account)
# 创建对应的原始数据表
try:
original_service = OriginalDataService()
original_service.create_table_for_account(db, db_account.id)
logger.info(f"Created original data table for account {db_account.id}")
except Exception as e:
logger.error(f"Failed to create original data table for account {db_account.id}: {str(e)}")
# 注意:这里不回滚账号创建,因为表创建失败不应该影响账号创建
# 可以稍后手动重试创建表
return AccountResponse.from_orm_account(db_account) return AccountResponse.from_orm_account(db_account)
@staticmethod @staticmethod

View File

@@ -168,13 +168,24 @@ class ComprehensiveDataService:
} }
def get_level_and_original_data(self, db: Session, def get_level_and_original_data(self, db: Session,
account_id: int,
id: Optional[int] = None, id: Optional[int] = None,
bfpcode: Optional[str] = None, bfpcode: Optional[str] = None,
bffb: Optional[str] = None, bffb: Optional[str] = None,
nyid: Optional[str] = None, nyid: Optional[str] = None,
linecode: Optional[str] = None, linecode: Optional[str] = None,
bfpl: Optional[str] = None) -> Dict[str, Any]: bfpl: Optional[str] = None) -> Dict[str, Any]:
"""根据条件获取水准数据+原始数据的组合查询""" """
根据条件获取水准数据+原始数据的组合查询
Args:
db: 数据库会话
account_id: 账号ID,必填
其他查询条件...
Returns:
查询结果字典
"""
# 查询水准数据 # 查询水准数据
level_data = self.level_service.search_level_data( level_data = self.level_service.search_level_data(
db, db,
@@ -182,17 +193,40 @@ class ComprehensiveDataService:
linecode=linecode linecode=linecode
) )
# 查询原始数据 # 查询原始数据 - 传递account_id
original_data = self.original_service.search_original_data( original_data = self.original_service.search_original_data(
db, db,
account_id=account_id,
bfpcode=bfpcode, bfpcode=bfpcode,
bffb=bffb, bffb=bffb,
nyid=nyid, nyid=nyid,
bfpl=bfpl bfpl=bfpl
) )
result = [] result = []
original_count = 0 original_count = 0
for level in level_data: for level in level_data:
# 将原始数据转换为字典格式
original_datas_for_level = []
for orig in original_data:
# 处理SQL查询结果(可能是Row对象或字典)
try:
# 尝试访问属性
orig_nyid = orig.NYID if hasattr(orig, 'NYID') else orig.get('NYID') if isinstance(orig, dict) else None
if orig_nyid == level.NYID:
original_datas_for_level.append({
"id": orig.id if hasattr(orig, 'id') else orig.get('id'),
"bfpcode": orig.bfpcode if hasattr(orig, 'bfpcode') else orig.get('bfpcode'),
"mtime": str(orig.mtime) if hasattr(orig, 'mtime') else str(orig.get('mtime')) if orig.get('mtime') else None,
"bffb": orig.bffb if hasattr(orig, 'bffb') else orig.get('bffb'),
"bfpl": orig.bfpl if hasattr(orig, 'bfpl') else orig.get('bfpl'),
"bfpvalue": orig.bfpvalue if hasattr(orig, 'bfpvalue') else orig.get('bfpvalue'),
"NYID": orig.NYID if hasattr(orig, 'NYID') else orig.get('NYID'),
"sort": orig.sort if hasattr(orig, 'sort') else orig.get('sort')
})
except Exception:
continue
data = { data = {
"id": level.id, "id": level.id,
"linecode": level.linecode, "linecode": level.linecode,
@@ -201,20 +235,9 @@ class ComprehensiveDataService:
"mtype": level.mtype, "mtype": level.mtype,
"NYID": level.NYID, "NYID": level.NYID,
"createDate": level.createDate, "createDate": level.createDate,
"originalDatas": [ "originalDatas": original_datas_for_level
{
"id": orig.id,
"bfpcode": orig.bfpcode,
"mtime": orig.mtime,
"bffb": orig.bffb,
"bfpl": orig.bfpl,
"bfpvalue": orig.bfpvalue,
"NYID": orig.NYID,
"sort": orig.sort
} for orig in original_data if orig.NYID == level.NYID
]
} }
original_count += len(data["originalDatas"]) original_count += len(original_datas_for_level)
result.append(data) result.append(data)
return { return {

View File

@@ -1,41 +1,190 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from sqlalchemy import text, inspect
from typing import List, Optional, Dict, Any from typing import List, Optional, Dict, Any
from ..models.original_data import OriginalData from ..models.original_data import OriginalData, get_original_data_model, get_table_name
from .base import BaseService from .base import BaseService
from ..models.settlement_data import SettlementData from ..models.settlement_data import SettlementData
from ..models.account import Account
from ..core.database import engine
import logging
logger = logging.getLogger(__name__)
class OriginalDataService(BaseService[OriginalData]): class OriginalDataService(BaseService[OriginalData]):
def __init__(self): def __init__(self):
super().__init__(OriginalData) super().__init__(OriginalData)
def get_by_nyid(self, db: Session, nyid: str) -> List[OriginalData]: def _get_table_name(self, account_id: int) -> str:
"""根据期数ID获取原始数据""" """获取原始数据表名"""
return self.get_by_field(db, "NYID", nyid) return get_table_name(account_id)
def get_by_bfpcode(self, db: Session, bfpcode: str) -> List[OriginalData]: def _ensure_table_exists(self, db: Session, account_id: int) -> bool:
"""根据前(后)视点名称获取原始数据""" """
return self.get_by_field(db, "bfpcode", bfpcode) 确保指定账号的原始数据表存在,不存在则创建
Args:
db: 数据库会话
account_id: 账号ID
Returns:
bool: 表是否存在或创建成功
"""
table_name = self._get_table_name(account_id)
inspector = inspect(engine)
# 检查表是否存在
if table_name in inspector.get_table_names():
logger.info(f"Table {table_name} already exists")
return True
# 表不存在,创建表
try:
create_table_sql = f"""
CREATE TABLE `{table_name}` (
`id` INT AUTO_INCREMENT PRIMARY KEY,
`account_id` INT NOT NULL COMMENT '账号ID',
`bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
`mtime` DATETIME NOT NULL COMMENT '测点观测时间',
`bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
`bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
`bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
`NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
`sort` INT COMMENT '序号',
INDEX `idx_nyid` (`NYID`),
INDEX `idx_account_id` (`account_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
"""
db.execute(text(create_table_sql))
db.commit()
logger.info(f"Table {table_name} created successfully")
return True
except Exception as e:
db.rollback()
logger.error(f"Failed to create table {table_name}: {str(e)}")
raise Exception(f"创建原始数据表失败: {str(e)}")
def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]:
"""
为指定账号创建原始数据表
Args:
db: 数据库会话
account_id: 账号ID
Returns:
操作结果
"""
try:
# 验证账号是否存在
account = db.query(Account).filter(Account.id == account_id).first()
if not account:
return {
'success': False,
'message': f'账号ID {account_id} 不存在'
}
# 创建表
self._ensure_table_exists(db, account_id)
return {
'success': True,
'message': f'原始数据表 {self._get_table_name(account_id)} 创建成功'
}
except Exception as e:
logger.error(f"Failed to create table for account {account_id}: {str(e)}")
return {
'success': False,
'message': f'创建原始数据表失败: {str(e)}'
}
def get_by_nyid(self, db: Session, account_id: int, nyid: str) -> List[OriginalData]:
"""
根据期数ID获取原始数据
Args:
db: 数据库会话
account_id: 账号ID
nyid: 期数ID
Returns:
原始数据列表
"""
try:
table_name = self._get_table_name(account_id)
# 使用原生SQL查询
query = text(f"SELECT * FROM `{table_name}` WHERE NYID = :nyid")
result = db.execute(query, {"nyid": nyid})
return result.fetchall()
except Exception as e:
logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
return []
def get_by_bfpcode(self, db: Session, account_id: int, bfpcode: str) -> List[OriginalData]:
"""
根据前(后)视点名称获取原始数据
Args:
db: 数据库会话
account_id: 账号ID
bfpcode: 前(后)视点名称
Returns:
原始数据列表
"""
try:
table_name = self._get_table_name(account_id)
query = text(f"SELECT * FROM `{table_name}` WHERE bfpcode = :bfpcode")
result = db.execute(query, {"bfpcode": bfpcode})
return result.fetchall()
except Exception as e:
logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}")
return []
def search_original_data(self, db: Session, def search_original_data(self, db: Session,
account_id: int,
id: Optional[int] = None, id: Optional[int] = None,
bfpcode: Optional[str] = None, bfpcode: Optional[str] = None,
bffb: Optional[str] = None, bffb: Optional[str] = None,
nyid: Optional[str] = None, nyid: Optional[str] = None,
bfpl: Optional[str] = None) -> List[OriginalData]: bfpl: Optional[str] = None) -> List[OriginalData]:
"""根据多个条件搜索原始数据""" """
conditions = {} 根据多个条件搜索原始数据
if id is not None:
conditions["id"] = id
if bfpcode is not None:
conditions["bfpcode"] = bfpcode
if bffb is not None:
conditions["bffb"] = bffb
if nyid is not None:
conditions["NYID"] = nyid
if bfpl is not None:
conditions["bfpl"] = bfpl
return self.search_by_conditions(db, conditions) Args:
db: 数据库会话
account_id: 账号ID
其他查询条件...
Returns:
原始数据列表
"""
try:
table_name = self._get_table_name(account_id)
conditions = []
params = {}
if id is not None:
conditions.append("id = :id")
params["id"] = id
if bfpcode is not None:
conditions.append("bfpcode = :bfpcode")
params["bfpcode"] = bfpcode
if bffb is not None:
conditions.append("bffb = :bffb")
params["bffb"] = bffb
if nyid is not None:
conditions.append("NYID = :nyid")
params["nyid"] = nyid
if bfpl is not None:
conditions.append("bfpl = :bfpl")
params["bfpl"] = bfpl
where_clause = " AND ".join(conditions) if conditions else "1=1"
query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}")
result = db.execute(query, params)
return result.fetchall()
except Exception as e:
logger.error(f"Failed to search data from {self._get_table_name(account_id)}: {str(e)}")
return []
def _check_settlement_exists(self, db: Session, nyid: str) -> bool: def _check_settlement_exists(self, db: Session, nyid: str) -> bool:
"""检查期数id沉降数据是否存在""" """检查期数id沉降数据是否存在"""
@@ -44,10 +193,16 @@ class OriginalDataService(BaseService[OriginalData]):
def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]: def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]:
""" """
批量导入原始数据直接新增无需检查重复 批量导入原始数据到指定账号的分表,直接新增,无需检查重复
支持事务回滚失败时重试一次 支持事务回滚,失败时重试一次
Args:
db: 数据库会话
data: 数据列表,每条数据必须包含account_id字段
Returns:
操作结果
""" """
import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
total_count = len(data) total_count = len(data)
@@ -55,6 +210,55 @@ class OriginalDataService(BaseService[OriginalData]):
failed_count = 0 failed_count = 0
failed_items = [] failed_items = []
if total_count == 0:
return {
'success': False,
'message': '导入数据不能为空',
'total_count': 0,
'success_count': 0,
'failed_count': 0,
'failed_items': []
}
# 获取第一条数据的account_id
account_id = data[0].get('account_id')
if not account_id:
return {
'success': False,
'message': '数据中缺少account_id字段',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
# 验证账号是否存在
account = db.query(Account).filter(Account.id == account_id).first()
if not account:
return {
'success': False,
'message': f'账号ID {account_id} 不存在',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
# 确保表存在
try:
self._ensure_table_exists(db, account_id)
except Exception as e:
return {
'success': False,
'message': f'创建原始数据表失败: {str(e)}',
'total_count': total_count,
'success_count': 0,
'failed_count': total_count,
'failed_items': []
}
table_name = self._get_table_name(account_id)
for attempt in range(2): # 最多重试1次 for attempt in range(2): # 最多重试1次
try: try:
db.begin() db.begin()
@@ -63,11 +267,12 @@ class OriginalDataService(BaseService[OriginalData]):
failed_items = [] failed_items = []
nyid = data[0].get('NYID') nyid = data[0].get('NYID')
is_exists = db.query(OriginalData).filter( # 检查该期数数据是否已存在
OriginalData.NYID == nyid check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid")
).count() is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0]
if is_exists > 0: if is_exists > 0:
db.rollback()
return { return {
'success': True, 'success': True,
'message': '数据已存在', 'message': '数据已存在',
@@ -79,26 +284,31 @@ class OriginalDataService(BaseService[OriginalData]):
for item_data in data: for item_data in data:
try: try:
# 判断期数id是否存在 # 判断期数id是否存在
settlement = self._check_settlement_exists(db, item_data.get('NYID')) settlement = self._check_settlement_exists(db, item_data.get('NYID'))
if not settlement: if not settlement:
logger.error(f"Settlement {item_data.get('NYID')} not found") logger.error(f"Settlement {item_data.get('NYID')} not found")
raise Exception(f"Settlement {item_data.get('NYID')} not found") raise Exception(f"Settlement {item_data.get('NYID')} not found")
# 构建插入SQL
insert_sql = text(f"""
INSERT INTO `{table_name}`
(account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
VALUES
(:account_id, :bfpcode, :mtime, :bffb, :bfpl, :bfpvalue, :NYID, :sort)
""")
# 直接新增操作 db.execute(insert_sql, {
original_data = OriginalData( "account_id": account_id,
bfpcode=item_data.get('bfpcode'), "bfpcode": item_data.get('bfpcode'),
mtime=item_data.get('mtime'), "mtime": item_data.get('mtime'),
bffb=item_data.get('bffb'), "bffb": item_data.get('bffb'),
bfpl=item_data.get('bfpl'), "bfpl": item_data.get('bfpl'),
bfpvalue=item_data.get('bfpvalue'), "bfpvalue": item_data.get('bfpvalue'),
NYID=item_data.get('NYID'), "NYID": item_data.get('NYID'),
sort=item_data.get('sort') "sort": item_data.get('sort')
) })
db.add(original_data) logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')} in table {table_name}")
logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')}")
success_count += 1 success_count += 1
except Exception as e: except Exception as e:
failed_count += 1 failed_count += 1