From 21c61cdec72d2c49d597cdde0cfe8474000ac856 Mon Sep 17 00:00:00 2001 From: lhx Date: Thu, 23 Oct 2025 10:32:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=9F=E5=A7=8B=E6=95=B0=E6=8D=AE=E5=88=86?= =?UTF-8?q?=E8=A1=A8=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/comprehensive_data.py | 30 +++- app/models/original_data.py | 52 +++++- app/schemas/comprehensive_data.py | 2 + app/services/account.py | 18 +- app/services/comprehensive.py | 53 ++++-- app/services/original_data.py | 288 ++++++++++++++++++++++++++---- 6 files changed, 385 insertions(+), 58 deletions(-) diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py index 89fdfc2..4344263 100644 --- a/app/api/comprehensive_data.py +++ b/app/api/comprehensive_data.py @@ -121,10 +121,24 @@ def batch_import_level_data(request: BatchLevelDataImportRequest, db: Session = @router.post("/batch_import_original_data", response_model=DataImportResponse) def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Session = Depends(get_db)): - """批量导入原始数据""" + """批量导入原始数据 - 数据中必须包含account_id字段""" try: logger.info(f"Starting batch import original data, count: {len(request.data)}") + # 验证数据中是否包含account_id + if not request.data or len(request.data) == 0: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="导入数据不能为空" + ) + + # 检查第一条数据是否包含account_id + if 'account_id' not in request.data[0]: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="数据中必须包含account_id字段" + ) + # 直接使用字典列表,不需要转换 data_list = request.data @@ -133,6 +147,8 @@ def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Sess logger.info(f"Batch import original data completed: {result['message']}") return DataImportResponse(**result) + except HTTPException: + raise except Exception as e: logger.error(f"Batch import original data failed: {str(e)}") raise HTTPException( @@ -246,13 +262,21 @@ def get_settlement_checkpoint(request: SettlementDataCheckpointQueryRequest, db: # 根据期数id获取原始数据 @router.post("/get_original", response_model=DataResponse) def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db)): - """获取水准数据+原始数据""" + """获取水准数据+原始数据 - 必须提供account_id""" try: logger.info(f"Querying original data with params: {request.dict()}") + # 验证account_id + if not request.account_id: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="必须提供account_id参数" + ) + # 调用综合服务的业务方法 result = comprehensive_service.get_level_and_original_data( db, + account_id=request.account_id, id=request.id, bfpcode=request.bfpcode, bffb=request.bffb, @@ -268,6 +292,8 @@ def get_original(request: OriginalDataQueryRequest, db: Session = Depends(get_db data=result["data"] ) + except HTTPException: + raise except Exception as e: logger.error(f"Query original data failed: {str(e)}") raise HTTPException( diff --git a/app/models/original_data.py b/app/models/original_data.py index 809c70c..a4e3f10 100644 --- a/app/models/original_data.py +++ b/app/models/original_data.py @@ -2,13 +2,63 @@ from sqlalchemy import Column, Integer, String, DateTime from ..core.database import Base class OriginalData(Base): + """原始数据模型基类 - 仅用作模板,实际使用动态生成的分表模型""" __tablename__ = "original_data" id = Column(Integer, primary_key=True, index=True, autoincrement=True) + account_id = Column(Integer, nullable=False, comment="账号ID", index=True) bfpcode = Column(String(1000), nullable=False, comment="前(后)视点名称") mtime = Column(DateTime, nullable=False, comment="测点观测时间") bffb = Column(String(1000), nullable=False, comment="前(后)视标记符") bfpl = Column(String(1000), nullable=False, comment="前(后)视距离(m)") bfpvalue = Column(String(1000), nullable=False, comment="前(后)视尺读数(m)") NYID = Column(String(100), nullable=False, comment="期数id", index=True) - sort = Column(Integer, comment="序号") \ No newline at end of file + sort = Column(Integer, comment="序号") + + +def get_original_data_model(account_id: int): + """ + 根据账号ID动态生成原始数据表模型 + + Args: + account_id: 账号ID + + Returns: + 动态生成的原始数据表模型类 + """ + class_name = f"OriginalData_{account_id}" + table_name = f"original_data_{account_id}" + + # 动态创建模型类 + DynamicModel = type( + class_name, + (Base,), + { + '__tablename__': table_name, + '__table_args__': {'extend_existing': True}, + 'id': Column(Integer, primary_key=True, index=True, autoincrement=True), + 'account_id': Column(Integer, nullable=False, comment="账号ID", index=True), + 'bfpcode': Column(String(1000), nullable=False, comment="前(后)视点名称"), + 'mtime': Column(DateTime, nullable=False, comment="测点观测时间"), + 'bffb': Column(String(1000), nullable=False, comment="前(后)视标记符"), + 'bfpl': Column(String(1000), nullable=False, comment="前(后)视距离(m)"), + 'bfpvalue': Column(String(1000), nullable=False, comment="前(后)视尺读数(m)"), + 'NYID': Column(String(100), nullable=False, comment="期数id", index=True), + 'sort': Column(Integer, comment="序号") + } + ) + + return DynamicModel + + +def get_table_name(account_id: int) -> str: + """ + 根据账号ID获取原始数据表名 + + Args: + account_id: 账号ID + + Returns: + 表名 + """ + return f"original_data_{account_id}" \ No newline at end of file diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py index b06534b..5069299 100644 --- a/app/schemas/comprehensive_data.py +++ b/app/schemas/comprehensive_data.py @@ -4,6 +4,7 @@ from typing import Any, Dict, List, Optional # 原始数据导入请求 class OriginalDataImportRequest(BaseModel): + account_id: int # 账号ID,必填 bfpcode: Optional[str] = None mtime: Optional[str] = None bffb: Optional[str] = None @@ -69,6 +70,7 @@ class SectionDataImportRequest(BaseModel): # 原始数据查询请求 class OriginalDataQueryRequest(BaseModel): + account_id: int # 账号ID,必填 linecode: Optional[str] = None id: Optional[int] = None bfpcode: Optional[str] = None diff --git a/app/services/account.py b/app/services/account.py index 02c228a..daf0466 100644 --- a/app/services/account.py +++ b/app/services/account.py @@ -2,15 +2,31 @@ from sqlalchemy.orm import Session from ..models.account import Account from ..schemas.account import AccountCreate, AccountUpdate, AccountResponse from typing import List, Optional +import logging + +logger = logging.getLogger(__name__) class AccountService: @staticmethod def create_account(db: Session, account_data: AccountCreate) -> AccountResponse: - """创建账号""" + """创建账号并自动创建对应的原始数据表""" + from .original_data import OriginalDataService + db_account = Account(**account_data.dict()) db.add(db_account) db.commit() db.refresh(db_account) + + # 创建对应的原始数据表 + try: + original_service = OriginalDataService() + original_service.create_table_for_account(db, db_account.id) + logger.info(f"Created original data table for account {db_account.id}") + except Exception as e: + logger.error(f"Failed to create original data table for account {db_account.id}: {str(e)}") + # 注意:这里不回滚账号创建,因为表创建失败不应该影响账号创建 + # 可以稍后手动重试创建表 + return AccountResponse.from_orm_account(db_account) @staticmethod diff --git a/app/services/comprehensive.py b/app/services/comprehensive.py index 3ba0f8d..31c9cd1 100644 --- a/app/services/comprehensive.py +++ b/app/services/comprehensive.py @@ -168,13 +168,24 @@ class ComprehensiveDataService: } def get_level_and_original_data(self, db: Session, + account_id: int, id: Optional[int] = None, bfpcode: Optional[str] = None, bffb: Optional[str] = None, nyid: Optional[str] = None, linecode: Optional[str] = None, bfpl: Optional[str] = None) -> Dict[str, Any]: - """根据条件获取水准数据+原始数据的组合查询""" + """ + 根据条件获取水准数据+原始数据的组合查询 + + Args: + db: 数据库会话 + account_id: 账号ID,必填 + 其他查询条件... + + Returns: + 查询结果字典 + """ # 查询水准数据 level_data = self.level_service.search_level_data( db, @@ -182,17 +193,40 @@ class ComprehensiveDataService: linecode=linecode ) - # 查询原始数据 + # 查询原始数据 - 传递account_id original_data = self.original_service.search_original_data( db, + account_id=account_id, bfpcode=bfpcode, bffb=bffb, nyid=nyid, bfpl=bfpl ) + result = [] original_count = 0 for level in level_data: + # 将原始数据转换为字典格式 + original_datas_for_level = [] + for orig in original_data: + # 处理SQL查询结果(可能是Row对象或字典) + try: + # 尝试访问属性 + orig_nyid = orig.NYID if hasattr(orig, 'NYID') else orig.get('NYID') if isinstance(orig, dict) else None + if orig_nyid == level.NYID: + original_datas_for_level.append({ + "id": orig.id if hasattr(orig, 'id') else orig.get('id'), + "bfpcode": orig.bfpcode if hasattr(orig, 'bfpcode') else orig.get('bfpcode'), + "mtime": str(orig.mtime) if hasattr(orig, 'mtime') else str(orig.get('mtime')) if orig.get('mtime') else None, + "bffb": orig.bffb if hasattr(orig, 'bffb') else orig.get('bffb'), + "bfpl": orig.bfpl if hasattr(orig, 'bfpl') else orig.get('bfpl'), + "bfpvalue": orig.bfpvalue if hasattr(orig, 'bfpvalue') else orig.get('bfpvalue'), + "NYID": orig.NYID if hasattr(orig, 'NYID') else orig.get('NYID'), + "sort": orig.sort if hasattr(orig, 'sort') else orig.get('sort') + }) + except Exception: + continue + data = { "id": level.id, "linecode": level.linecode, @@ -201,20 +235,9 @@ class ComprehensiveDataService: "mtype": level.mtype, "NYID": level.NYID, "createDate": level.createDate, - "originalDatas": [ - { - "id": orig.id, - "bfpcode": orig.bfpcode, - "mtime": orig.mtime, - "bffb": orig.bffb, - "bfpl": orig.bfpl, - "bfpvalue": orig.bfpvalue, - "NYID": orig.NYID, - "sort": orig.sort - } for orig in original_data if orig.NYID == level.NYID - ] + "originalDatas": original_datas_for_level } - original_count += len(data["originalDatas"]) + original_count += len(original_datas_for_level) result.append(data) return { diff --git a/app/services/original_data.py b/app/services/original_data.py index 59249b0..5de1499 100644 --- a/app/services/original_data.py +++ b/app/services/original_data.py @@ -1,41 +1,190 @@ from sqlalchemy.orm import Session +from sqlalchemy import text, inspect from typing import List, Optional, Dict, Any -from ..models.original_data import OriginalData +from ..models.original_data import OriginalData, get_original_data_model, get_table_name from .base import BaseService from ..models.settlement_data import SettlementData +from ..models.account import Account +from ..core.database import engine +import logging + +logger = logging.getLogger(__name__) class OriginalDataService(BaseService[OriginalData]): def __init__(self): super().__init__(OriginalData) - def get_by_nyid(self, db: Session, nyid: str) -> List[OriginalData]: - """根据期数ID获取原始数据""" - return self.get_by_field(db, "NYID", nyid) + def _get_table_name(self, account_id: int) -> str: + """获取原始数据表名""" + return get_table_name(account_id) - def get_by_bfpcode(self, db: Session, bfpcode: str) -> List[OriginalData]: - """根据前(后)视点名称获取原始数据""" - return self.get_by_field(db, "bfpcode", bfpcode) + def _ensure_table_exists(self, db: Session, account_id: int) -> bool: + """ + 确保指定账号的原始数据表存在,不存在则创建 + + Args: + db: 数据库会话 + account_id: 账号ID + + Returns: + bool: 表是否存在或创建成功 + """ + table_name = self._get_table_name(account_id) + inspector = inspect(engine) + + # 检查表是否存在 + if table_name in inspector.get_table_names(): + logger.info(f"Table {table_name} already exists") + return True + + # 表不存在,创建表 + try: + create_table_sql = f""" + CREATE TABLE `{table_name}` ( + `id` INT AUTO_INCREMENT PRIMARY KEY, + `account_id` INT NOT NULL COMMENT '账号ID', + `bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称', + `mtime` DATETIME NOT NULL COMMENT '测点观测时间', + `bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符', + `bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)', + `bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)', + `NYID` VARCHAR(100) NOT NULL COMMENT '期数id', + `sort` INT COMMENT '序号', + INDEX `idx_nyid` (`NYID`), + INDEX `idx_account_id` (`account_id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}' + """ + db.execute(text(create_table_sql)) + db.commit() + logger.info(f"Table {table_name} created successfully") + return True + except Exception as e: + db.rollback() + logger.error(f"Failed to create table {table_name}: {str(e)}") + raise Exception(f"创建原始数据表失败: {str(e)}") + + def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]: + """ + 为指定账号创建原始数据表 + + Args: + db: 数据库会话 + account_id: 账号ID + + Returns: + 操作结果 + """ + try: + # 验证账号是否存在 + account = db.query(Account).filter(Account.id == account_id).first() + if not account: + return { + 'success': False, + 'message': f'账号ID {account_id} 不存在' + } + + # 创建表 + self._ensure_table_exists(db, account_id) + return { + 'success': True, + 'message': f'原始数据表 {self._get_table_name(account_id)} 创建成功' + } + except Exception as e: + logger.error(f"Failed to create table for account {account_id}: {str(e)}") + return { + 'success': False, + 'message': f'创建原始数据表失败: {str(e)}' + } + + def get_by_nyid(self, db: Session, account_id: int, nyid: str) -> List[OriginalData]: + """ + 根据期数ID获取原始数据 + + Args: + db: 数据库会话 + account_id: 账号ID + nyid: 期数ID + + Returns: + 原始数据列表 + """ + try: + table_name = self._get_table_name(account_id) + # 使用原生SQL查询 + query = text(f"SELECT * FROM `{table_name}` WHERE NYID = :nyid") + result = db.execute(query, {"nyid": nyid}) + return result.fetchall() + except Exception as e: + logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}") + return [] + + def get_by_bfpcode(self, db: Session, account_id: int, bfpcode: str) -> List[OriginalData]: + """ + 根据前(后)视点名称获取原始数据 + + Args: + db: 数据库会话 + account_id: 账号ID + bfpcode: 前(后)视点名称 + + Returns: + 原始数据列表 + """ + try: + table_name = self._get_table_name(account_id) + query = text(f"SELECT * FROM `{table_name}` WHERE bfpcode = :bfpcode") + result = db.execute(query, {"bfpcode": bfpcode}) + return result.fetchall() + except Exception as e: + logger.error(f"Failed to query data from {self._get_table_name(account_id)}: {str(e)}") + return [] def search_original_data(self, db: Session, + account_id: int, id: Optional[int] = None, bfpcode: Optional[str] = None, bffb: Optional[str] = None, nyid: Optional[str] = None, bfpl: Optional[str] = None) -> List[OriginalData]: - """根据多个条件搜索原始数据""" - conditions = {} - if id is not None: - conditions["id"] = id - if bfpcode is not None: - conditions["bfpcode"] = bfpcode - if bffb is not None: - conditions["bffb"] = bffb - if nyid is not None: - conditions["NYID"] = nyid - if bfpl is not None: - conditions["bfpl"] = bfpl + """ + 根据多个条件搜索原始数据 - return self.search_by_conditions(db, conditions) + Args: + db: 数据库会话 + account_id: 账号ID + 其他查询条件... + + Returns: + 原始数据列表 + """ + try: + table_name = self._get_table_name(account_id) + conditions = [] + params = {} + + if id is not None: + conditions.append("id = :id") + params["id"] = id + if bfpcode is not None: + conditions.append("bfpcode = :bfpcode") + params["bfpcode"] = bfpcode + if bffb is not None: + conditions.append("bffb = :bffb") + params["bffb"] = bffb + if nyid is not None: + conditions.append("NYID = :nyid") + params["nyid"] = nyid + if bfpl is not None: + conditions.append("bfpl = :bfpl") + params["bfpl"] = bfpl + + where_clause = " AND ".join(conditions) if conditions else "1=1" + query = text(f"SELECT * FROM `{table_name}` WHERE {where_clause}") + result = db.execute(query, params) + return result.fetchall() + except Exception as e: + logger.error(f"Failed to search data from {self._get_table_name(account_id)}: {str(e)}") + return [] def _check_settlement_exists(self, db: Session, nyid: str) -> bool: """检查期数id沉降数据是否存在""" @@ -44,10 +193,16 @@ class OriginalDataService(BaseService[OriginalData]): def batch_import_original_data(self, db: Session, data: List) -> Dict[str, Any]: """ - 批量导入原始数据,直接新增,无需检查重复 - 支持事务回滚,失败时重试一次 + 批量导入原始数据到指定账号的分表,直接新增,无需检查重复 + 支持事务回滚,失败时重试一次 + + Args: + db: 数据库会话 + data: 数据列表,每条数据必须包含account_id字段 + + Returns: + 操作结果 """ - import logging logger = logging.getLogger(__name__) total_count = len(data) @@ -55,6 +210,55 @@ class OriginalDataService(BaseService[OriginalData]): failed_count = 0 failed_items = [] + if total_count == 0: + return { + 'success': False, + 'message': '导入数据不能为空', + 'total_count': 0, + 'success_count': 0, + 'failed_count': 0, + 'failed_items': [] + } + + # 获取第一条数据的account_id + account_id = data[0].get('account_id') + if not account_id: + return { + 'success': False, + 'message': '数据中缺少account_id字段', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': [] + } + + # 验证账号是否存在 + account = db.query(Account).filter(Account.id == account_id).first() + if not account: + return { + 'success': False, + 'message': f'账号ID {account_id} 不存在', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': [] + } + + # 确保表存在 + try: + self._ensure_table_exists(db, account_id) + except Exception as e: + return { + 'success': False, + 'message': f'创建原始数据表失败: {str(e)}', + 'total_count': total_count, + 'success_count': 0, + 'failed_count': total_count, + 'failed_items': [] + } + + table_name = self._get_table_name(account_id) + for attempt in range(2): # 最多重试1次 try: db.begin() @@ -63,11 +267,12 @@ class OriginalDataService(BaseService[OriginalData]): failed_items = [] nyid = data[0].get('NYID') - is_exists = db.query(OriginalData).filter( - OriginalData.NYID == nyid - ).count() + # 检查该期数数据是否已存在 + check_query = text(f"SELECT COUNT(*) as cnt FROM `{table_name}` WHERE NYID = :nyid") + is_exists = db.execute(check_query, {"nyid": nyid}).fetchone()[0] if is_exists > 0: + db.rollback() return { 'success': True, 'message': '数据已存在', @@ -79,26 +284,31 @@ class OriginalDataService(BaseService[OriginalData]): for item_data in data: try: - # 判断期数id是否存在 settlement = self._check_settlement_exists(db, item_data.get('NYID')) if not settlement: logger.error(f"Settlement {item_data.get('NYID')} not found") raise Exception(f"Settlement {item_data.get('NYID')} not found") + # 构建插入SQL + insert_sql = text(f""" + INSERT INTO `{table_name}` + (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) + VALUES + (:account_id, :bfpcode, :mtime, :bffb, :bfpl, :bfpvalue, :NYID, :sort) + """) - # 直接新增操作 - original_data = OriginalData( - bfpcode=item_data.get('bfpcode'), - mtime=item_data.get('mtime'), - bffb=item_data.get('bffb'), - bfpl=item_data.get('bfpl'), - bfpvalue=item_data.get('bfpvalue'), - NYID=item_data.get('NYID'), - sort=item_data.get('sort') - ) - db.add(original_data) - logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')}") + db.execute(insert_sql, { + "account_id": account_id, + "bfpcode": item_data.get('bfpcode'), + "mtime": item_data.get('mtime'), + "bffb": item_data.get('bffb'), + "bfpl": item_data.get('bfpl'), + "bfpvalue": item_data.get('bfpvalue'), + "NYID": item_data.get('NYID'), + "sort": item_data.get('sort') + }) + logger.info(f"Created original data: {item_data.get('bfpcode')}-{item_data.get('NYID')} in table {table_name}") success_count += 1 except Exception as e: failed_count += 1