diff --git a/app/api/comprehensive_data.py b/app/api/comprehensive_data.py
index 69b36e0..897cc60 100644
--- a/app/api/comprehensive_data.py
+++ b/app/api/comprehensive_data.py
@@ -9,6 +9,7 @@ from ..schemas.comprehensive_data import (
     BatchSettlementDataImportRequest,
     BatchLevelDataImportRequest,
     BatchOriginalDataImportRequest,
+    BatchOriginalDataImportRequestNew,
     DataImportResponse,
     DataResponse,
     SectionDataQueryRequest,
@@ -193,6 +194,70 @@ def batch_import_original_data(request: BatchOriginalDataImportRequest, db: Sess
         data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
     )
 
+@router.post("/batch_import_original_data_new", response_model=DataImportResponse)
+def batch_import_original_data_new(request: BatchOriginalDataImportRequestNew, db: Session = Depends(get_db)):
+    """
+    New batch import of original data - grouped format.
+    Request payload format: data: [[{}, {}, {}], [{}, {}]]
+    Each inner [{}, {}] is one group; the {} records carry the same fields as the old endpoint.
+    All records within a group share the same NYID and account_id; different groups may differ.
+
+    Import logic:
+    1. Data is stored in per-account tables (sharded by account_id); missing tables are created.
+    2. Before inserting, the table is checked for existing rows with the same NYID.
+    3. If duplicates exist, all rows with that NYID are deleted and the new rows inserted; otherwise the rows are inserted directly.
+    """
+    try:
+        logger.info(f"Starting batch import original data (new), group count: {len(request.data)}")
+
+        # Validate the grouped data
+        if not request.data:
+            return DataImportResponse(
+                code=ResponseCode.BAD_REQUEST,
+                message="导入数据不能为空",
+                data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
+            )
+
+        # The first group must be non-empty
+        if len(request.data[0]) == 0:
+            return DataImportResponse(
+                code=ResponseCode.BAD_REQUEST,
+                message="分组数据不能为空",
+                data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
+            )
+
+        # The first record must carry an account_id
+        first_item = request.data[0][0]
+        if 'account_id' not in first_item:
+            return DataImportResponse(
+                code=ResponseCode.BAD_REQUEST,
+                message="数据中必须包含account_id字段",
+                data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
+            )
+
+        # Delegate to the service layer
+        data_list = request.data
+        result = original_service.batch_import_original_data_new(db, data_list)
+        logger.info(f"Batch import original data (new) completed: {result['message']}")
+
+        return DataImportResponse(
+            code=ResponseCode.SUCCESS if result.get('success') else ResponseCode.IMPORT_FAILED,
+            message=result['message'],
+            data={
+                'total_count': result.get('total_count', 0),
+                'success_count': result.get('success_count', 0),
+                'failed_count': result.get('failed_count', 0),
+                'failed_items': result.get('failed_items', [])
+            }
+        )
+    except Exception as e:
+        logger.error(f"Batch import original data (new) failed: {str(e)}")
+        return DataImportResponse(
+            code=ResponseCode.IMPORT_FAILED,
+            message=f"{ResponseMessage.IMPORT_FAILED}: {str(e)}",
+            data={'total_count': 0, 'success_count': 0, 'failed_count': 0, 'failed_items': []}
+        )
+
+
 # Query the observation-point data for a section
 @router.post("/get_section", response_model=DataResponse)
 def get_section(request: SectionDataQueryRequest, db: Session = Depends(get_db)):
diff --git a/app/schemas/comprehensive_data.py b/app/schemas/comprehensive_data.py
index e02b9f4..6141df0 100644
--- a/app/schemas/comprehensive_data.py
+++ b/app/schemas/comprehensive_data.py
@@ -205,6 +205,18 @@ class BatchOriginalDataImportRequest(BaseModel):
     class Config:
         extra = "allow"  # allow extra fields
 
+class BatchOriginalDataImportRequestNew(BaseModel):
+    """
+    Request model for the new batch import of original data.
+    Supported format: data: [[{}, {}, {}], [{}, {}]]
+    Each inner [{}, {}] is one group; the {} records carry the same fields as the old endpoint.
+    All records within a group share the same NYID and account_id.
+    """
+    data: List[List[Dict[str, Any]]]
+
+    class Config:
+        extra = "allow"  # allow extra fields
+
 # New response models
 class DataImportResponse(BaseModel):
     code: int = 0  # response status code, 0 means success
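For reference, a minimal client-side sketch of the grouped payload this endpoint expects. The base URL and all field values here are illustrative assumptions; the per-record fields mirror the columns the service inserts (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort):

```python
import requests  # any HTTP client works

# Two groups; all records inside a group share one (account_id, NYID) pair.
payload = {
    "data": [
        [  # group 1: account 1, period "202401"
            {"account_id": 1, "NYID": "202401", "bfpcode": "P01", "mtime": "2024-01-05 08:00:00",
             "bffb": "A", "bfpl": 1.2, "bfpvalue": 3.4, "sort": 1},
            {"account_id": 1, "NYID": "202401", "bfpcode": "P02", "mtime": "2024-01-05 08:00:00",
             "bffb": "A", "bfpl": 1.3, "bfpvalue": 3.5, "sort": 2},
        ],
        [  # group 2: same account, a different period
            {"account_id": 1, "NYID": "202402", "bfpcode": "P01", "mtime": "2024-02-05 08:00:00",
             "bffb": "A", "bfpl": 1.4, "bfpvalue": 3.6, "sort": 1},
        ],
    ]
}

# Base URL and router prefix are deployment-specific (illustrative here).
resp = requests.post("http://localhost:8000/batch_import_original_data_new", json=payload)
print(resp.json())  # e.g. {'code': 0, 'message': '批量导入完成', 'data': {...}}
```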
diff --git a/app/services/original_data.py b/app/services/original_data.py
index ec38d04..4911772 100644
--- a/app/services/original_data.py
+++ b/app/services/original_data.py
@@ -35,6 +35,8 @@ class OriginalDataService(BaseService[OriginalData]):
             # Check whether the table exists
             if table_name in inspector.get_table_names():
                 logger.info(f"Table {table_name} already exists")
+                # Warm the table up even when it already exists
+                self._warmup_table(db, table_name)
                 return True
 
             # Table does not exist - create it, with a retry mechanism
@@ -61,6 +63,14 @@ class OriginalDataService(BaseService[OriginalData]):
                         conn.execute(text(create_table_sql))
 
                     logger.info(f"Table {table_name} created successfully on attempt {attempt + 1}")
+
+                    # Wait a little longer after creating the table so an immediate
+                    # query does not trigger a "table definition has changed" error
+                    import time
+                    time.sleep(0.2)
+
+                    # Warm the table up: run a trivial query to stabilize its definition
+                    self._warmup_table(db, table_name)
+
                     return True
 
                 except Exception as e:
@@ -69,10 +79,13 @@ class OriginalDataService(BaseService[OriginalData]):
                     # Check for "table already exists" errors (possible under concurrent creation)
                     if "already exists" in str(e).lower() or "exist" in str(e).lower():
                         logger.info(f"Table {table_name} was created by another process")
+                        # Wait briefly, then warm the table up
+                        import time
+                        time.sleep(0.2)
+                        self._warmup_table(db, table_name)
                         return True
 
                     if attempt == max_retries - 1:
-                        db.rollback()  # make sure the current transaction is rolled back
                         logger.error(f"Failed to create table {table_name} after {max_retries} attempts: {str(e)}")
                         raise Exception(f"创建原始数据表失败: {str(e)}")
 
@@ -82,6 +95,19 @@ class OriginalDataService(BaseService[OriginalData]):
 
         return False
 
+    def _warmup_table(self, db: Session, table_name: str) -> None:
+        """
+        Warm a table up: run a trivial query to stabilize the table definition
+        and avoid "table definition has changed" errors.
+        """
+        try:
+            # A simple COUNT query is enough to warm the table up
+            warmup_query = text(f"SELECT COUNT(*) FROM `{table_name}` LIMIT 0")
+            db.execute(warmup_query)
+            logger.debug(f"Table {table_name} warmed up successfully")
+        except Exception as e:
+            logger.warning(f"Failed to warm up table {table_name}: {str(e)}")
+            # A failed warmup does not block the main flow; only a warning is logged
+
     def create_table_for_account(self, db: Session, account_id: int) -> Dict[str, Any]:
         """
         Create an original-data table for the given account
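The create-then-warm-up sequence can be exercised on its own. A self-contained sketch, with SQLite standing in for MySQL (the "table definition has changed" error this guards against is MySQL-specific, so the warmup is a no-op here but shows the call order):

```python
import time
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import Session

engine = create_engine("sqlite:///:memory:")

def warmup(db: Session, table_name: str) -> None:
    # A trivial query forces the server to load/refresh the table definition.
    db.execute(text(f"SELECT COUNT(*) FROM `{table_name}` LIMIT 0"))

def ensure_table(db: Session, table_name: str) -> bool:
    if table_name in inspect(db.get_bind()).get_table_names():
        warmup(db, table_name)  # warm up even when the table already exists
        return True
    with db.get_bind().begin() as conn:
        conn.execute(text(f"CREATE TABLE `{table_name}` (id INTEGER PRIMARY KEY, NYID TEXT)"))
    time.sleep(0.2)             # let the new definition settle before querying
    warmup(db, table_name)
    return True

with Session(engine) as db:
    print(ensure_table(db, "original_data_1"))  # True
```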
@@ -337,46 +363,40 @@ class OriginalDataService(BaseService[OriginalData]):
 
         table_name = self._get_table_name(account_id)
 
-        # Check whether we are already inside a transaction (avoid starting one twice)
-        in_transaction = db.in_transaction()
-
-        # Only manage the transaction manually when not already inside one
-        if not in_transaction:
-            for attempt in range(2):  # retry at most once
-                try:
-                    db.begin()
-                    success_count = 0
-                    failed_count = 0
-                    failed_items = []
-
-                    # Run the data import
-                    success_count = self._execute_import(db, table_name, data, account_id)
-                    db.commit()
-                    logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
-                    break
-
-                except Exception as e:
-                    db.rollback()
-                    logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
-                    if attempt == 1:  # the last retry failed
-                        logger.error("Batch import original data failed after retries")
-                        return {
-                            'success': False,
-                            'message': f'批量导入失败: {str(e)}',
-                            'total_count': total_count,
-                            'success_count': 0,
-                            'failed_count': total_count,
-                            'failed_items': failed_items
-                        }
-        else:
-            # Already inside a transaction: run directly without managing it
-            try:
-                success_count = self._execute_import(db, table_name, data, account_id)
-                logger.info(f"Batch import original data completed in existing transaction. Success: {success_count}")
-            except Exception as e:
-                logger.error(f"Batch import failed in existing transaction: {str(e)}")
-                # Re-raise so the caller handles the rollback
-                raise
+        # **Important**: always use an internal transaction so the data is committed reliably.
+        # This works around external transactions that may never commit.
+        # in_transaction = db.in_transaction()
+
+        # Always open an internal transaction
+        for attempt in range(2):  # retry at most once
+            try:
+                db.begin()
+                success_count = 0
+                failed_count = 0
+                failed_items = []
+
+                # Run the data import
+                success_count = self._execute_import(db, table_name, data, account_id)
+                db.commit()
+                logger.info(f"Batch import original data completed. Success: {success_count}, Failed: {failed_count}")
+                break
+
+            except Exception as e:
+                try:
+                    db.rollback()
+                except Exception:
+                    pass  # ignore rollback failures
+                logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
+                if attempt == 1:  # the last retry failed
+                    logger.error("Batch import original data failed after retries")
+                    return {
+                        'success': False,
+                        'message': f'批量导入失败: {str(e)}',
+                        'total_count': total_count,
+                        'success_count': 0,
+                        'failed_count': total_count,
+                        'failed_items': failed_items
+                    }
 
         return {
             'success': True,
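The rewritten block always opens its own transaction instead of trusting a caller-managed one. A condensed sketch of this commit-with-one-retry pattern (note that on SQLAlchemy 1.4+, `Session.begin()` raises `InvalidRequestError` if a transaction has already auto-begun, so this sketch leans on autobegin rather than an explicit `begin()` — an assumption worth checking against the project's SQLAlchemy version):

```python
from typing import Callable
from sqlalchemy.orm import Session

def commit_with_retry(db: Session, work: Callable[[Session], int], retries: int = 1) -> int:
    """Run `work` in the session's own transaction and commit here; retry once on failure."""
    for attempt in range(retries + 1):
        try:
            result = work(db)  # the session auto-begins a transaction on first use
            db.commit()
            return result
        except Exception:
            try:
                db.rollback()
            except Exception:
                pass  # a failed rollback must not mask the original error
            if attempt == retries:
                raise

# usage: commit_with_retry(db, lambda s: import_rows(s, rows))  # import_rows is hypothetical
```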
@@ -479,4 +499,401 @@ class OriginalDataService(BaseService[OriginalData]):
                 logger.info(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records")
 
         logger.info(f"Batch import completed: {total_inserted} records inserted, {skipped_count} duplicates skipped")
+        return total_inserted
+
+    def batch_import_original_data_new(self, db: Session, data: List[List[Dict[str, Any]]]) -> Dict[str, Any]:
+        """
+        New batch import of original data - grouped format data: [[{}, {}, {}], [{}, {}]].
+        Each inner [{}, {}] is one group; the {} records carry the same fields as the old endpoint.
+        All records within a group share the same NYID and account_id; different groups may differ.
+
+        Import logic:
+        1. Data is stored in per-account tables (sharded by account_id); missing tables are created.
+        2. Before inserting, the table is checked for existing rows with the same NYID.
+        3. If duplicates exist, all rows with that NYID are deleted and the new rows inserted; otherwise the rows are inserted directly.
+
+        Args:
+            db: database session
+            data: grouped data, formatted as [[{}, {}, {}], [{}, {}]]
+
+        Returns:
+            Operation result
+        """
+        logger = logging.getLogger(__name__)
+
+        total_count = 0
+        success_count = 0
+        failed_count = 0
+        failed_items = []
+
+        if not data:
+            return {
+                'success': False,
+                'message': '导入数据不能为空',
+                'total_count': 0,
+                'success_count': 0,
+                'failed_count': 0,
+                'failed_items': []
+            }
+
+        # Validate account_id/NYID consistency within every group, and bucket the groups by account_id
+        group_validation_results = []
+        account_id_to_groups = {}  # {account_id: [group_indices]}
+
+        for group_idx, group in enumerate(data):
+            if not group:
+                failed_items.append({
+                    'group_index': group_idx,
+                    'message': f'第{group_idx + 1}组数据为空'
+                })
+                continue
+
+            # Take account_id and NYID from the first record of the group
+            first_item = group[0]
+            if first_item.get('account_id') is None or first_item.get('NYID') is None:
+                failed_items.append({
+                    'group_index': group_idx,
+                    'message': f'第{group_idx + 1}组数据缺少account_id或NYID字段'
+                })
+                continue
+            group_account_id = str(first_item.get('account_id'))
+            group_nyid = str(first_item.get('NYID'))
+
+            # Check that every record in the group has the same account_id and NYID
+            group_valid = True
+            for item_idx, item in enumerate(group):
+                item_account_id = str(item.get('account_id'))
+                item_nyid = str(item.get('NYID'))
+
+                if item_account_id != group_account_id:
+                    failed_items.append({
+                        'group_index': group_idx,
+                        'item_index': item_idx,
+                        'message': f'第{group_idx + 1}组第{item_idx + 1}条数据account_id不一致,期望:{group_account_id},实际:{item_account_id}'
+                    })
+                    group_valid = False
+                if item_nyid != group_nyid:
+                    failed_items.append({
+                        'group_index': group_idx,
+                        'item_index': item_idx,
+                        'message': f'第{group_idx + 1}组第{item_idx + 1}条数据NYID不一致,期望:{group_nyid},实际:{item_nyid}'
+                    })
+                    group_valid = False
+
+            # Record the validation result
+            group_validation_results.append({
+                'group_index': group_idx,
+                'account_id': group_account_id,
+                'nyid': group_nyid,
+                'valid': group_valid,
+                'data': group
+            })
+
+            # Bucket by account_id
+            if group_account_id not in account_id_to_groups:
+                account_id_to_groups[group_account_id] = []
+            account_id_to_groups[group_account_id].append(group_idx)
+
+        # Return a failure if any validation errors were found
+        if failed_items:
+            return {
+                'success': False,
+                'message': f'数据验证失败,发现{len(failed_items)}个错误',
+                'total_count': sum(len(group) for group in data if group),
+                'success_count': 0,
+                'failed_count': len(failed_items),
+                'failed_items': failed_items
+            }
+
+        # Overall statistics
+        total_count = sum(len(group['data']) for group in group_validation_results if group['valid'])
+        logger.info(f"Total valid groups: {len(group_validation_results)}, Total records: {total_count}")
+
+        # **Important**: always use an internal transaction so the data is committed reliably.
+        # This works around external transactions that may never commit.
+        # in_transaction = db.in_transaction()
+        # logger.info(f"Original transaction status: {'in_transaction' if in_transaction else 'not in_transaction'}")
+
+        # Always open an internal transaction
+        for attempt in range(2):
+            try:
+                logger.info(f"Starting internal transaction (attempt {attempt + 1})")
+                db.begin()
+                success_count = 0
+                failed_count = 0
+                failed_items = []
+
+                # Process each account_id
+                for account_id_str, group_indices in account_id_to_groups.items():
+                    account_id = int(account_id_str)
+                    logger.info(f"Processing account_id: {account_id}, groups: {group_indices}")
+
+                    # Check that the account exists
+                    account = db.query(Account).filter(Account.id == account_id).first()
+                    if not account:
+                        error_msg = f'账号ID {account_id} 不存在'
+                        logger.error(error_msg)
+                        for group_idx in group_indices:
+                            failed_count += len(group_validation_results[group_idx]['data'])
+                            failed_items.append({
+                                'group_index': group_idx,
+                                'message': error_msg
+                            })
+                        continue
+
+                    # Make sure the table exists
+                    table_created = self._ensure_table_exists(db, account_id)
+                    if not table_created:
+                        error_msg = f'创建原始数据表失败 (account_id: {account_id})'
+                        logger.error(error_msg)
+                        for group_idx in group_indices:
+                            failed_count += len(group_validation_results[group_idx]['data'])
+                            failed_items.append({
+                                'group_index': group_idx,
+                                'message': error_msg
+                            })
+                        continue
+
+                    # Collect all groups that belong to this account_id
+                    account_groups_data = []
+                    for group_idx in group_indices:
+                        account_groups_data.append(group_validation_results[group_idx]['data'])
+
+                    # Run the grouped import
+                    table_name = self._get_table_name(account_id)
+                    logger.info(f"Processing {len(account_groups_data)} groups for table: {table_name}")
+                    group_results = self._execute_import_new(db, table_name, account_groups_data, account_id)
+                    success_count += group_results['success_count']
+                    failed_count += group_results['failed_count']
+                    failed_items.extend(group_results['failed_items'])
+                    logger.info(f"Account {account_id} completed: Success={group_results['success_count']}, Failed={group_results['failed_count']}")
+
+                logger.info(f"Before commit: Success={success_count}, Failed={failed_count}")
+                db.commit()
+                logger.info(f"Transaction committed successfully! Success: {success_count}, Failed: {failed_count}")
+                break
+
+            except Exception as e:
+                logger.error(f"Transaction rollback due to: {str(e)}")
+                try:
+                    db.rollback()
+                except Exception:
+                    pass  # ignore rollback failures
+                logger.warning(f"Batch import attempt {attempt + 1} failed: {str(e)}")
+                if attempt == 1:
+                    logger.error("Batch import original data failed after retries")
+                    return {
+                        'success': False,
+                        'message': f'批量导入失败: {str(e)}',
+                        'total_count': total_count,
+                        'success_count': 0,
+                        'failed_count': total_count,
+                        'failed_items': []
+                    }
+
+        return {
+            'success': failed_count == 0,
+            'message': '批量导入完成' if failed_count == 0 else '部分导入失败',
+            'total_count': total_count,
+            'success_count': success_count,
+            'failed_count': failed_count,
+            'failed_items': failed_items
+        }
+
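The validation pass lends itself to a pure helper that can be unit-tested without a database. A minimal sketch of the same consistency check and account bucketing (a hypothetical helper, not part of this patch):

```python
from typing import Any, Dict, List, Tuple

def bucket_groups(data: List[List[Dict[str, Any]]]) -> Tuple[Dict[str, List[int]], List[Dict[str, Any]]]:
    """Bucket group indices by account_id and collect per-record consistency errors."""
    buckets: Dict[str, List[int]] = {}
    errors: List[Dict[str, Any]] = []
    for g_idx, group in enumerate(data):
        if not group or group[0].get('account_id') is None or group[0].get('NYID') is None:
            errors.append({'group_index': g_idx, 'message': 'empty group or missing account_id/NYID'})
            continue
        acc, nyid = str(group[0]['account_id']), str(group[0]['NYID'])
        for i_idx, item in enumerate(group):
            if str(item.get('account_id')) != acc or str(item.get('NYID')) != nyid:
                errors.append({'group_index': g_idx, 'item_index': i_idx,
                               'message': 'account_id/NYID differs from the first record'})
        buckets.setdefault(acc, []).append(g_idx)
    return buckets, errors

groups = [[{'account_id': 1, 'NYID': 'A'}], [{'account_id': 1, 'NYID': 'B'}],
          [{'account_id': 2, 'NYID': 'A'}]]
print(bucket_groups(groups))  # ({'1': [0, 1], '2': [2]}, [])
```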
+    def _execute_import_new(self, db: Session, table_name: str, data: List[List[Dict[str, Any]]], account_id: int) -> Dict[str, Any]:
+        """
+        Run the new grouped import.
+
+        Args:
+            db: database session
+            table_name: table name
+            data: grouped data
+            account_id: account ID
+
+        Returns:
+            Operation result
+        """
+        logger.info(f"Starting batch import (new) into {table_name}, total groups: {len(data)}")
+
+        # Warm the table up before importing so its definition is stable
+        self._warmup_table(db, table_name)
+
+        success_count = 0
+        failed_count = 0
+        failed_items = []
+
+        # Collect every NYID (used to batch-check the settlement data)
+        all_nyids = set()
+        for group in data:
+            if group:
+                all_nyids.add(str(group[0].get('NYID')))
+
+        # Batch-check that the corresponding settlement data exists
+        from ..models.settlement_data import SettlementData
+        if all_nyids:
+            settlements = db.query(SettlementData).filter(SettlementData.NYID.in_(list(all_nyids))).all()
+            settlement_map = {s.NYID: s for s in settlements}
+            missing_nyids = all_nyids - set(settlement_map.keys())
+
+            if missing_nyids:
+                raise Exception(f'以下期数在沉降表中不存在: {list(missing_nyids)}')
+
+        logger.info(f"Found {len(all_nyids)} unique NYIDs to process: {list(all_nyids)}")
+
+        # Process group by group, replacing any existing rows with the same NYID
+        processed_nyids = set()
+        group_index = 0
+
+        for group in data:
+            if not group:
+                continue
+
+            group_nyid = str(group[0].get('NYID'))
+            logger.info(f"Processing group {group_index}: NYID={group_nyid}, {len(group)} records")
+
+            # If this NYID has already been handled, skip the group and record it in failed_items
+            if group_nyid in processed_nyids:
+                logger.warning(f"NYID {group_nyid} appears in multiple groups (group {group_index}), skipping duplicate")
+                failed_count += len(group)
+                failed_items.append({
+                    'group_index': group_index,
+                    'group_nyid': group_nyid,
+                    'message': f'NYID {group_nyid} 已在之前组中处理,跳过重复组'
+                })
+                group_index += 1
+                continue
+
+            processed_nyids.add(group_nyid)
+
+            # Retry to work around "table definition has changed" errors
+            max_retries = 3
+            for attempt in range(max_retries):
+                try:
+                    # Check whether rows with this NYID already exist
+                    existing_query = text(f"SELECT id FROM `{table_name}` WHERE NYID = :nyid AND account_id = :account_id")
+                    existing_result = db.execute(existing_query, {"nyid": group_nyid, "account_id": account_id}).fetchall()
+                    logger.debug(f"Found {len(existing_result)} existing records for NYID {group_nyid}")
+
+                    # If they exist, delete the old rows
+                    if existing_result:
+                        delete_query = text(f"DELETE FROM `{table_name}` WHERE NYID = :nyid AND account_id = :account_id")
+                        db.execute(delete_query, {"nyid": group_nyid, "account_id": account_id})
+                        logger.info(f"Deleted {len(existing_result)} existing records for NYID {group_nyid}")
+
+                    # Prepare the rows of the current group for insertion
+                    to_insert = []
+                    for item in group:
+                        # Make sure NYID and account_id are strings
+                        item_copy = item.copy()
+                        item_copy['NYID'] = str(item_copy.get('NYID'))
+                        item_copy['account_id'] = str(account_id)
+                        to_insert.append(item_copy)
+
+                    # Batch-insert the rows of the current group
+                    if to_insert:
+                        logger.info(f"Inserting {len(to_insert)} records for NYID {group_nyid} into {table_name}")
+                        inserted_count = self._batch_insert_group(db, table_name, to_insert, account_id)
+                        logger.info(f"Successfully inserted {inserted_count} records for NYID {group_nyid}")
+                        success_count += inserted_count
+
+                    # Success - leave the retry loop
+                    break
+
+                except Exception as e:
+                    error_msg = str(e)
+                    if "Table definition has changed" in error_msg:
+                        if attempt < max_retries - 1:
+                            # Warm the table up again before retrying
+                            import time
+                            time.sleep(0.1 * (attempt + 1))
+                            self._warmup_table(db, table_name)
+                            logger.warning(f"Table definition changed for {group_nyid}, retrying (attempt {attempt + 2})...")
+                            continue
+                        else:
+                            failed_count += len(group)
+                            failed_items.append({
+                                'group_index': group_index,
+                                'group_nyid': group_nyid,
+                                'message': f'处理NYID {group_nyid} 失败: 表定义变更,重试{attempt + 1}次后仍失败'
+                            })
+                            logger.error(f"Failed to process group with NYID {group_nyid} after {max_retries} retries: {str(e)}")
+                            break
+                    else:
+                        failed_count += len(group)
+                        failed_items.append({
+                            'group_index': group_index,
+                            'group_nyid': group_nyid,
+                            'message': f'处理NYID {group_nyid} 失败: {str(e)}'
+                        })
+                        logger.error(f"Failed to process group with NYID {group_nyid}: {str(e)}")
+                        break
+
+            group_index += 1
+
+        logger.info(f"Batch import (new) completed: {success_count} records inserted, {failed_count} failed")
+        return {
+            'success_count': success_count,
+            'failed_count': failed_count,
+            'failed_items': failed_items
+        }
+
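The per-NYID replace step reduces to a delete-then-insert inside the surrounding transaction, which keeps re-importing the same period idempotent. A standalone sketch against SQLite (on MySQL, `INSERT ... ON DUPLICATE KEY UPDATE` would be an alternative, but it requires a unique key covering the NYID columns, which these dynamically created tables may not have):

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE original_data_1 "
                      "(id INTEGER PRIMARY KEY, account_id TEXT, NYID TEXT, bfpvalue REAL)"))
    conn.execute(text("INSERT INTO original_data_1 (account_id, NYID, bfpvalue) "
                      "VALUES ('1', '202401', 3.4)"))

def replace_period(conn, table: str, account_id: str, nyid: str, rows: list) -> None:
    # Delete-then-insert makes the import idempotent per (account_id, NYID).
    conn.execute(text(f"DELETE FROM `{table}` WHERE NYID = :nyid AND account_id = :acc"),
                 {"nyid": nyid, "acc": account_id})
    for row in rows:
        conn.execute(text(f"INSERT INTO `{table}` (account_id, NYID, bfpvalue) "
                          f"VALUES (:acc, :nyid, :val)"),
                     {"acc": account_id, "nyid": nyid, "val": row["bfpvalue"]})

with engine.begin() as conn:
    replace_period(conn, "original_data_1", "1", "202401", [{"bfpvalue": 9.9}])
    print(conn.execute(text("SELECT bfpvalue FROM original_data_1")).all())  # [(9.9,)]
```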
logger.info(f"Inserting {len(to_insert)} records for NYID {group_nyid} into {table_name}") + inserted_count = self._batch_insert_group(db, table_name, to_insert, account_id) + logger.info(f"Successfully inserted {inserted_count} records for NYID {group_nyid}") + success_count += inserted_count + + # 成功,退出重试循环 + break + + except Exception as e: + error_msg = str(e) + if "Table definition has changed" in error_msg: + if attempt < max_retries - 1: + # 重试前重新预热表 + import time + time.sleep(0.1 * (attempt + 1)) + self._warmup_table(db, table_name) + logger.warning(f"Table definition changed for {group_nyid}, retrying (attempt {attempt + 2})...") + continue + else: + failed_count += len(group) + failed_items.append({ + 'group_index': group_index, + 'group_nyid': group_nyid, + 'message': f'处理NYID {group_nyid} 失败: 表定义变更,重试{attempt + 1}次后仍失败' + }) + logger.error(f"Failed to process group with NYID {group_nyid} after {max_retries} retries: {str(e)}") + break + else: + failed_count += len(group) + failed_items.append({ + 'group_index': group_index, + 'group_nyid': group_nyid, + 'message': f'处理NYID {group_nyid} 失败: {str(e)}' + }) + logger.error(f"Failed to process group with NYID {group_nyid}: {str(e)}") + break + + group_index += 1 + + logger.info(f"Batch import (new) completed: {success_count} records inserted, {failed_count} failed") + return { + 'success_count': success_count, + 'failed_count': failed_count, + 'failed_items': failed_items + } + + def _batch_insert_group(self, db: Session, table_name: str, data: List[Dict[str, Any]], account_id: int) -> int: + """ + 批量插入一组数据 + + Args: + db: 数据库会话 + table_name: 表名 + data: 要插入的数据列表 + account_id: 账号ID + + Returns: + 插入的记录数 + """ + if not data: + return 0 + + # 将数据分组,每组1000条(MySQL默认支持) + batch_size = 1000 + total_inserted = 0 + + for i in range(0, len(data), batch_size): + batch_data = data[i:i + batch_size] + + # 构建批量参数 + values_list = [] + params = {} + logger.info(f"Preparing to insert batch of {len(batch_data)} records into {table_name}") + for idx, item_data in enumerate(batch_data): + values_list.append( + f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, " + f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})" + ) + params.update({ + f"account_id_{idx}": str(account_id), + f"bfpcode_{idx}": item_data.get('bfpcode'), + f"mtime_{idx}": item_data.get('mtime'), + f"bffb_{idx}": item_data.get('bffb'), + f"bfpl_{idx}": item_data.get('bfpl'), + f"bfpvalue_{idx}": item_data.get('bfpvalue'), + f"NYID_{idx}": str(item_data.get('NYID')), + f"sort_{idx}": item_data.get('sort') + }) + + # 批量插入SQL + insert_sql = f""" + INSERT INTO `{table_name}` + (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort) + VALUES {", ".join(values_list)} + """ + final_sql = text(insert_sql) + logger.debug(f"Final SQL preview: {insert_sql[:200]}...") # 只打印前200个字符 + result = db.execute(final_sql, params) + logger.info(f"Batch insert successful: {len(batch_data)} records affected") + total_inserted += len(batch_data) + logger.debug(f"Inserted batch {i//batch_size + 1}: {len(batch_data)} records") + return total_inserted \ No newline at end of file