"""
Database operation service layer.

Operates on the database directly, skipping the HTTP request layer, and
reuses the existing business logic and models.
"""
import os
import sys
import time
import logging
from pathlib import Path
from typing import List, Optional, Dict, Any, Callable

# Add the project root to the Python path so the ``app`` package resolves.
PROJECT_ROOT = Path(__file__).parent.parent.parent.absolute()
sys.path.insert(0, str(PROJECT_ROOT))

# Database access modules
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.orm import sessionmaker, Session
from dotenv import load_dotenv

# Data models
from app.models.section_data import SectionData
from app.models.checkpoint import Checkpoint
from app.models.settlement_data import SettlementData
from app.models.level_data import LevelData
from app.models.account import Account

# Dynamic (per-account) table support for original data
from app.models.original_data import get_original_data_model, get_table_name

# Load environment variables
load_dotenv(PROJECT_ROOT / ".env")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Rows per flush for ORM inserts, and rows per multi-row INSERT for raw SQL.
ORM_BATCH_SIZE = 500
RAW_SQL_BATCH_SIZE = 1000


def _mask_database_url(database_url: str) -> str:
    """Return *database_url* with the password replaced by ``***``.

    URLs without a ``user:password@`` part are returned unchanged.  Used so
    credentials never end up in log output.
    """
    scheme, separator, rest = database_url.partition("://")
    if not separator or "@" not in rest:
        return database_url
    # Split on the LAST '@' so an unescaped '@' in the password cannot leak.
    credentials, _, location = rest.rpartition("@")
    user, colon, _password = credentials.partition(":")
    if not colon:
        return database_url
    return f"{scheme}://{user}:***@{location}"


class DatabaseService:
    """Database service that operates on the database directly."""

    def __init__(self):
        """Create the engine and the session factory.

        The connection URL is read from ``DATABASE_URL``; when that is unset
        it is assembled from ``DB_HOST``/``DB_PORT``/``DB_USER``/
        ``DB_PASSWORD``/``DB_NAME`` (with the defaults shown below).
        """
        database_url = os.getenv("DATABASE_URL")
        if not database_url:
            # Build the URL from the individual settings.
            db_host = os.getenv("DB_HOST", "localhost")
            db_port = os.getenv("DB_PORT", "3306")
            db_user = os.getenv("DB_USER", "root")
            db_password = os.getenv("DB_PASSWORD", "root")
            db_name = os.getenv("DB_NAME", "railway")
            database_url = f"mysql+pymysql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"

        # Never log the raw URL: it contains the database password.
        logger.info("数据库连接: %s", _mask_database_url(database_url))

        # Engine and session factory
        self.engine = create_engine(
            database_url,
            pool_pre_ping=True,
            pool_recycle=3600,
            echo=False  # set to True to see the emitted SQL
        )
        self.SessionLocal = sessionmaker(bind=self.engine)

    def get_db_session(self) -> Session:
        """Return a new session from the session factory."""
        return self.SessionLocal()

    # ==================== Shared helpers ====================

    @staticmethod
    def _import_result(success: bool, message: str, total_count: int,
                       success_count: int, failed_count: int,
                       failed_items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build the result dict returned by every ``batch_import_*`` method."""
        return {
            'success': success,
            'message': message,
            'total_count': total_count,
            'success_count': success_count,
            'failed_count': failed_count,
            'failed_items': failed_items
        }

    @classmethod
    def _empty_payload_result(cls) -> Dict[str, Any]:
        """Result for an empty import payload."""
        return cls._import_result(False, '导入数据不能为空', 0, 0, 0, [])

    @classmethod
    def _completed_result(cls, total_count: int, success_count: int,
                          failed_items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Result for an import that ran to completion (possibly with skips)."""
        failed_count = len(failed_items)
        message = '批量导入完成' if failed_count == 0 else '部分导入失败'
        return cls._import_result(
            True, message, total_count, success_count, failed_count, failed_items
        )

    @classmethod
    def _aborted_result(cls, error: Exception, total_count: int,
                        failed_items: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Result for an import aborted by *error*; every item counts as failed."""
        return cls._import_result(
            False, f'批量导入失败: {str(error)}', total_count, 0, total_count, failed_items
        )

    @staticmethod
    def _begin_if_needed(db: Session) -> None:
        """Start a transaction on *db* unless one is already active.

        ``Session.begin()`` raises when the session already has a transaction
        (for example because the caller ran a query first), so it is only
        called when ``Session.in_transaction()`` reports none.  Sessions
        without ``in_transaction`` fall back to calling ``begin()`` directly.
        """
        in_transaction = getattr(db, "in_transaction", None)
        if in_transaction is not None and in_transaction():
            return
        db.begin()

    @staticmethod
    def _unique_str_values(data: List[Dict[str, Any]], key: str) -> List[str]:
        """Return the distinct truthy ``item[key]`` values, stringified."""
        return list({str(item.get(key)) for item in data if item.get(key)})

    @staticmethod
    def _split_by_parent(data: List[Dict[str, Any]], key: str, missing_ids: set,
                         error_message: str,
                         failed_items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return the items whose parent id exists; record the rest as failed.

        An item is rejected when ``item[key]`` is missing/falsy or when its
        string form is in *missing_ids*.  Rejected items are appended to
        *failed_items* with *error_message*, in payload order.
        """
        valid_items = []
        for item in data:
            parent_id = item.get(key)
            # A falsy parent id was never looked up, so it cannot be valid.
            if not parent_id or str(parent_id) in missing_ids:
                failed_items.append({'data': item, 'error': error_message})
            else:
                valid_items.append(item)
        return valid_items

    @staticmethod
    def _insert_batches(db: Session, to_insert: List[Dict[str, Any]],
                        build_row: Callable[[Dict[str, Any]], Any],
                        failed_items: List[Dict[str, Any]],
                        batch_size: int = ORM_BATCH_SIZE) -> int:
        """Add and flush *to_insert* in batches; return the number of rows added.

        Each batch is flushed immediately so that a database error is raised
        here, while the offending batch is known.  On error the batch is
        appended to *failed_items* and the exception is re-raised so the
        caller can roll back.  Nothing is committed by this helper.
        """
        inserted = 0
        if not to_insert:
            return inserted

        logger.info("Inserting %s new records", len(to_insert))
        for start in range(0, len(to_insert), batch_size):
            batch = to_insert[start:start + batch_size]
            try:
                db.add_all([build_row(item) for item in batch])
                # add_all() only stages the objects; flush() emits the INSERTs.
                db.flush()
            except Exception as e:
                failed_items.extend(
                    {'data': item, 'error': f'插入失败: {str(e)}'} for item in batch
                )
                logger.error("Failed to insert batch: %s", str(e))
                raise
            inserted += len(batch)
            logger.info("Inserted batch %s: %s records", start // batch_size + 1, len(batch))
        return inserted

    # ==================== Model builders ====================

    @staticmethod
    def _build_section(item: Dict[str, Any]) -> SectionData:
        """Build a ``SectionData`` row from one payload item."""
        return SectionData(
            section_id=str(item.get('section_id')),
            mileage=item.get('mileage'),
            work_site=item.get('work_site'),
            basic_types=item.get('basic_types'),
            height=item.get('height'),
            status=item.get('status'),
            number=str(item.get('number')) if item.get('number') else None,
            transition_paragraph=item.get('transition_paragraph'),
            design_fill_height=item.get('design_fill_height'),
            compression_layer_thickness=item.get('compression_layer_thickness'),
            treatment_depth=item.get('treatment_depth'),
            foundation_treatment_method=item.get('foundation_treatment_method'),
            rock_mass_classification=item.get('rock_mass_classification'),
            account_id=str(item.get('account_id')) if item.get('account_id') else None
        )

    @staticmethod
    def _build_checkpoint(item: Dict[str, Any]) -> Checkpoint:
        """Build a ``Checkpoint`` row from one payload item."""
        return Checkpoint(
            point_id=str(item.get('point_id')),
            aname=item.get('aname'),
            section_id=str(item.get('section_id')),
            burial_date=item.get('burial_date')
        )

    @staticmethod
    def _build_settlement(item: Dict[str, Any]) -> SettlementData:
        """Build a ``SettlementData`` row from one payload item."""
        return SettlementData(
            point_id=str(item.get('point_id')),
            CVALUE=item.get('CVALUE'),
            MAVALUE=item.get('MAVALUE'),
            MTIME_W=item.get('MTIME_W'),
            NYID=str(item.get('NYID')),
            PRELOADH=item.get('PRELOADH'),
            PSTATE=item.get('PSTATE'),
            REMARK=item.get('REMARK'),
            WORKINFO=item.get('WORKINFO'),
            createdate=item.get('createdate'),
            day=item.get('day'),
            day_jg=item.get('day_jg'),
            isgzjdxz=item.get('isgzjdxz'),
            mavalue_bc=item.get('mavalue_bc'),
            mavalue_lj=item.get('mavalue_lj'),
            sjName=item.get('sjName'),
            useflag=item.get('useflag'),
            workinfoname=item.get('workinfoname'),
            upd_remark=item.get('upd_remark')
        )

    @staticmethod
    def _build_level(item: Dict[str, Any]) -> LevelData:
        """Build a ``LevelData`` row from one payload item."""
        return LevelData(
            linecode=str(item.get('linecode')),
            benchmarkids=item.get('benchmarkids'),
            wsphigh=item.get('wsphigh'),
            mtype=item.get('mtype'),
            NYID=str(item.get('NYID')),
            createDate=item.get('createDate'),
            wspversion=item.get('wspversion'),
            barometric=str(item.get('barometric')) if item.get('barometric') is not None else None,
            equipbrand=item.get('equipbrand'),
            instrumodel=item.get('instrumodel'),
            serialnum=item.get('serialnum'),
            sjname=item.get('sjname'),
            temperature=str(item.get('temperature')) if item.get('temperature') is not None else None,
            weather=str(item.get('weather')) if item.get('weather') is not None else None
        )

    # ==================== Section data ====================

    def batch_import_sections(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bulk-import section rows, skipping ``section_id`` values already stored.

        Args:
            db: Session to use.  The method commits on success and rolls back
                on any error.
            data: Payload items (dicts keyed by ``SectionData`` column names).

        Returns:
            A result dict with ``success``, ``message``, ``total_count``,
            ``success_count``, ``failed_count`` and ``failed_items``.  Items
            whose ``section_id`` already exists are reported in
            ``failed_items`` and not inserted.

        Note:
            Only rows already in the database are checked; duplicates inside
            *data* itself are not detected here.
        """
        total_count = len(data)
        if total_count == 0:
            return self._empty_payload_result()

        failed_items: List[Dict[str, Any]] = []
        try:
            self._begin_if_needed(db)

            # One IN query for every section_id in the payload.
            section_id_list = self._unique_str_values(data, 'section_id')
            logger.info("Checking %s unique section_ids", len(section_id_list))
            existing_sections = db.query(SectionData).filter(
                SectionData.section_id.in_(section_id_list)
            ).all()
            existing_ids = {section.section_id for section in existing_sections}
            logger.info("Found %s existing sections", len(existing_sections))

            # Partition the payload into "already stored" and "to insert".
            to_insert = []
            for item_data in data:
                section_id = str(item_data.get('section_id'))
                if section_id in existing_ids:
                    logger.info("Continue section data: %s", section_id)
                    failed_items.append({
                        'data': item_data,
                        'error': '数据已存在,跳过插入操作'
                    })
                else:
                    to_insert.append(item_data)

            success_count = self._insert_batches(db, to_insert, self._build_section, failed_items)

            db.commit()
            logger.info(
                "Batch import sections completed. Success: %s, Failed: %s",
                success_count, len(failed_items)
            )
            return self._completed_result(total_count, success_count, failed_items)

        except Exception as e:
            db.rollback()
            logger.error("Batch import sections failed: %s", str(e))
            return self._aborted_result(e, total_count, failed_items)

    # ==================== Checkpoint data ====================

    def batch_import_checkpoints(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bulk-import checkpoint rows whose section exists and that are not stored yet.

        Args:
            db: Session to use.  The method commits on success and rolls back
                on any error (and when every item is rejected).
            data: Payload items (dicts keyed by ``Checkpoint`` column names).

        Returns:
            A result dict (see ``batch_import_sections``).  Items are reported
            in ``failed_items`` when their ``section_id`` is missing or not
            present in the section table, or when their ``point_id`` already
            exists.
        """
        total_count = len(data)
        if total_count == 0:
            return self._empty_payload_result()

        failed_items: List[Dict[str, Any]] = []
        try:
            self._begin_if_needed(db)

            # Look up the parent sections with one IN query.
            section_id_list = self._unique_str_values(data, 'section_id')
            logger.info("Checking %s unique section_ids in section data", len(section_id_list))
            sections = db.query(SectionData).filter(
                SectionData.section_id.in_(section_id_list)
            ).all()
            found_section_ids = {s.section_id for s in sections}
            missing_section_ids = set(section_id_list) - found_section_ids

            valid_items = self._split_by_parent(
                data, 'section_id', missing_section_ids,
                '断面ID不存在,跳过插入操作', failed_items
            )

            # Nothing left to import: every item lacked a valid section.
            if not valid_items:
                db.rollback()
                return self._import_result(
                    False, '所有断面ID都不存在', total_count, 0, total_count, failed_items
                )

            # Look up checkpoints that are already stored.
            point_id_list = self._unique_str_values(valid_items, 'point_id')
            existing_checkpoints = db.query(Checkpoint).filter(
                Checkpoint.point_id.in_(point_id_list)
            ).all()
            existing_ids = {checkpoint.point_id for checkpoint in existing_checkpoints}
            logger.info("Found %s existing checkpoints", len(existing_checkpoints))

            to_insert = []
            for item_data in valid_items:
                point_id = str(item_data.get('point_id'))
                if point_id in existing_ids:
                    logger.info("Continue checkpoint data: %s", point_id)
                    failed_items.append({
                        'data': item_data,
                        'error': '数据已存在,跳过插入操作'
                    })
                else:
                    to_insert.append(item_data)

            success_count = self._insert_batches(db, to_insert, self._build_checkpoint, failed_items)

            db.commit()
            logger.info(
                "Batch import checkpoints completed. Success: %s, Failed: %s",
                success_count, len(failed_items)
            )
            return self._completed_result(total_count, success_count, failed_items)

        except Exception as e:
            db.rollback()
            logger.error("Batch import checkpoints failed: %s", str(e))
            return self._aborted_result(e, total_count, failed_items)

    # ==================== Settlement data ====================

    def batch_import_settlement_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bulk-import settlement rows whose checkpoint exists and that are not stored yet.

        Existing rows are identified by the pair ``(point_id, NYID)``.

        Args:
            db: Session to use.  The method commits on success and rolls back
                on any error (and when every item is rejected).
            data: Payload items (dicts keyed by ``SettlementData`` column names).

        Returns:
            A result dict (see ``batch_import_sections``).  Items are reported
            in ``failed_items`` when their ``point_id`` is missing or not
            present in the checkpoint table, or when the
            ``(point_id, NYID)`` pair already exists.
        """
        total_count = len(data)
        if total_count == 0:
            return self._empty_payload_result()

        failed_items: List[Dict[str, Any]] = []
        try:
            self._begin_if_needed(db)

            # Look up the parent checkpoints with one IN query.
            point_id_list = self._unique_str_values(data, 'point_id')
            logger.info("Checking %s unique point_ids in checkpoint data", len(point_id_list))
            checkpoints = db.query(Checkpoint).filter(
                Checkpoint.point_id.in_(point_id_list)
            ).all()
            found_point_ids = {c.point_id for c in checkpoints}
            missing_point_ids = set(point_id_list) - found_point_ids

            valid_items = self._split_by_parent(
                data, 'point_id', missing_point_ids,
                '测点id不存在,跳过插入操作', failed_items
            )

            if not valid_items:
                db.rollback()
                return self._import_result(
                    False, '所有观测点ID都不存在', total_count, 0, total_count, failed_items
                )

            # Look up settlement rows that are already stored for these points.
            existing_data = db.query(SettlementData).filter(
                SettlementData.point_id.in_(point_id_list)
            ).all()
            existing_keys = {f"{row.point_id}_{row.NYID}" for row in existing_data}
            logger.info("Found %s existing settlement records", len(existing_data))

            to_insert = []
            for item_data in valid_items:
                point_id = str(item_data.get('point_id'))
                nyid = str(item_data.get('NYID'))
                if f"{point_id}_{nyid}" in existing_keys:
                    logger.info("Continue settlement data: %s-%s", point_id, nyid)
                    failed_items.append({
                        'data': item_data,
                        'error': '数据已存在,跳过插入操作'
                    })
                else:
                    to_insert.append(item_data)

            success_count = self._insert_batches(db, to_insert, self._build_settlement, failed_items)

            db.commit()
            logger.info(
                "Batch import settlement data completed. Success: %s, Failed: %s",
                success_count, len(failed_items)
            )
            return self._completed_result(total_count, success_count, failed_items)

        except Exception as e:
            db.rollback()
            logger.error("Batch import settlement data failed: %s", str(e))
            return self._aborted_result(e, total_count, failed_items)

    # ==================== Level data ====================

    def batch_import_level_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bulk-import level rows whose NYID exists in settlement data and that are not stored yet.

        Existing rows are identified by the pair ``(NYID, linecode)``.

        Args:
            db: Session to use.  The method commits on success and rolls back
                on any error (and when every item is rejected).
            data: Payload items (dicts keyed by ``LevelData`` column names).

        Returns:
            A result dict (see ``batch_import_sections``).  Items are reported
            in ``failed_items`` when their ``NYID`` is missing or not present
            in the settlement table, or when the ``(NYID, linecode)`` pair
            already exists.
        """
        total_count = len(data)
        if total_count == 0:
            return self._empty_payload_result()

        failed_items: List[Dict[str, Any]] = []
        try:
            self._begin_if_needed(db)

            # Look up the referenced settlement periods with one IN query.
            nyid_list = self._unique_str_values(data, 'NYID')
            logger.info("Checking %s unique NYIDs in settlement data", len(nyid_list))
            settlements = db.query(SettlementData).filter(
                SettlementData.NYID.in_(nyid_list)
            ).all()
            found_nyids = {s.NYID for s in settlements}
            missing_nyids = set(nyid_list) - found_nyids

            valid_items = self._split_by_parent(
                data, 'NYID', missing_nyids,
                '期数ID在沉降表中不存在,跳过插入操作', failed_items
            )

            if not valid_items:
                db.rollback()
                return self._import_result(
                    False, '所有期数ID在沉降表中都不存在', total_count, 0, total_count, failed_items
                )

            # Look up level rows that are already stored for these periods.
            existing_data = db.query(LevelData).filter(
                LevelData.NYID.in_(nyid_list)
            ).all()
            existing_keys = {f"{row.NYID}_{row.linecode}" for row in existing_data}
            logger.info("Found %s existing level records", len(existing_data))

            to_insert = []
            for item_data in valid_items:
                nyid = str(item_data.get('NYID'))
                linecode = item_data.get('linecode')
                if f"{nyid}_{linecode}" in existing_keys:
                    logger.info("Continue level data: %s-%s", nyid, linecode)
                    failed_items.append({
                        'data': item_data,
                        'error': '数据已存在,跳过插入操作'
                    })
                else:
                    to_insert.append(item_data)

            success_count = self._insert_batches(db, to_insert, self._build_level, failed_items)

            db.commit()
            logger.info(
                "Batch import level data completed. Success: %s, Failed: %s",
                success_count, len(failed_items)
            )
            return self._completed_result(total_count, success_count, failed_items)

        except Exception as e:
            db.rollback()
            logger.error("Batch import level data failed: %s", str(e))
            return self._aborted_result(e, total_count, failed_items)

    # ==================== Original data ====================

    def _ensure_table_exists(self, account_id: int) -> bool:
        """Make sure the per-account original-data table exists, creating it if needed.

        Args:
            account_id: Account whose table (named by ``get_table_name``) is
                required.  It is interpolated into DDL, so callers must pass
                a real integer.

        Returns:
            ``True`` when the table exists or was created, ``False`` when
            creation still fails after three attempts.
        """
        table_name = get_table_name(account_id)
        inspector = inspect(self.engine)

        if table_name in inspector.get_table_names():
            logger.info("Table %s already exists", table_name)
            return True

        # IF NOT EXISTS: the check above and this statement are not atomic, so
        # a concurrent import may create the table in between.
        create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS `{table_name}` (
                `id` INT AUTO_INCREMENT PRIMARY KEY,
                `account_id` INT NOT NULL COMMENT '账号ID',
                `bfpcode` VARCHAR(1000) NOT NULL COMMENT '前(后)视点名称',
                `mtime` DATETIME NOT NULL COMMENT '测点观测时间',
                `bffb` VARCHAR(1000) NOT NULL COMMENT '前(后)视标记符',
                `bfpl` VARCHAR(1000) NOT NULL COMMENT '前(后)视距离(m)',
                `bfpvalue` VARCHAR(1000) NOT NULL COMMENT '前(后)视尺读数(m)',
                `NYID` VARCHAR(100) NOT NULL COMMENT '期数id',
                `sort` INT COMMENT '序号',
                INDEX `idx_nyid` (`NYID`),
                INDEX `idx_account_id` (`account_id`)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='原始数据表_账号{account_id}'
        """

        max_retries = 3
        for attempt in range(max_retries):
            try:
                # Run on the engine directly, independent of any ORM session.
                with self.engine.begin() as conn:
                    conn.execute(text(create_table_sql))
                logger.info("Table %s created successfully", table_name)
                return True
            except Exception as e:
                logger.warning(
                    "Attempt %s to create table %s failed: %s",
                    attempt + 1, table_name, str(e)
                )
                if attempt == max_retries - 1:
                    logger.error(
                        "Failed to create table %s after %s attempts",
                        table_name, max_retries
                    )
                    return False
                # Linear back-off before the next attempt.
                time.sleep(0.1 * (attempt + 1))

        return False

    def batch_import_original_data(self, db: Session, data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Bulk-import original (raw observation) rows into the per-account table.

        The target table is chosen from the ``account_id`` of the FIRST item;
        every row is written with that account id.  Rows whose
        ``(NYID, sort)`` pair is already stored for the account are skipped
        silently (they are not reported in ``failed_items``).

        Args:
            db: Session to use.  This method neither begins, commits nor
                rolls back: the caller owns the transaction.
            data: Payload items with ``account_id``, ``bfpcode``, ``mtime``,
                ``bffb``, ``bfpl``, ``bfpvalue``, ``NYID`` and ``sort`` keys.

        Returns:
            A result dict (see ``batch_import_sections``).  The import fails
            as a whole when the account id is missing/invalid/unknown, when
            the table cannot be created, or when any ``NYID`` is absent from
            the settlement table.
        """
        total_count = len(data)
        if total_count == 0:
            return self._empty_payload_result()

        raw_account_id = data[0].get('account_id')
        if not raw_account_id:
            return self._import_result(
                False, '数据中缺少account_id字段', total_count, 0, total_count, []
            )

        # The account id ends up inside a table name and DDL, so it must be a
        # genuine integer; anything else is treated as an unknown account.
        try:
            account_id = int(str(raw_account_id).strip())
        except ValueError:
            return self._import_result(
                False, f'账号ID {raw_account_id} 不存在', total_count, 0, total_count, []
            )

        # Verify the account exists.
        account = db.query(Account).filter(Account.id == account_id).first()
        if not account:
            return self._import_result(
                False, f'账号ID {account_id} 不存在', total_count, 0, total_count, []
            )

        # Make sure the per-account table exists.
        if not self._ensure_table_exists(account_id):
            return self._import_result(
                False, '创建原始数据表失败', total_count, 0, total_count, []
            )

        table_name = get_table_name(account_id)
        success_count = 0
        failed_items: List[Dict[str, Any]] = []

        try:
            # NOTE: no db.begin() here; the caller manages the transaction.

            # Every referenced period must already exist in settlement data.
            nyid_list = self._unique_str_values(data, 'NYID')
            settlements = db.query(SettlementData).filter(
                SettlementData.NYID.in_(nyid_list)
            ).all()
            missing_nyids = set(nyid_list) - {s.NYID for s in settlements}
            if missing_nyids:
                raise ValueError(f'以下期数在沉降表中不存在: {list(missing_nyids)}')

            # Fetch only the two columns that form the duplicate key, by name,
            # instead of relying on column positions of SELECT *.
            existing_rows = db.execute(
                text(f"SELECT NYID, sort FROM `{table_name}` WHERE account_id = :account_id"),
                {"account_id": account_id}
            ).fetchall()
            existing_keys = {f"{row[0]}_{row[1]}" for row in existing_rows}
            logger.info("Found %s existing records in %s", len(existing_rows), table_name)

            # Partition the payload into duplicates and rows to insert.
            to_insert = []
            skipped_count = 0
            for item_data in data:
                nyid = str(item_data.get('NYID'))
                sort = item_data.get('sort')
                if f"{nyid}_{sort}" in existing_keys:
                    skipped_count += 1
                else:
                    to_insert.append(item_data)

            logger.info(
                "Filtered %s duplicate records, %s new records to insert",
                skipped_count, len(to_insert)
            )

            if to_insert:
                logger.info("Inserting %s new records", len(to_insert))
                for start in range(0, len(to_insert), RAW_SQL_BATCH_SIZE):
                    batch = to_insert[start:start + RAW_SQL_BATCH_SIZE]

                    # One multi-row INSERT per batch; every value is a bound
                    # parameter suffixed with the row index.
                    placeholders = []
                    params = {}
                    for idx, item_data in enumerate(batch):
                        placeholders.append(
                            f"(:account_id_{idx}, :bfpcode_{idx}, :mtime_{idx}, :bffb_{idx}, "
                            f":bfpl_{idx}, :bfpvalue_{idx}, :NYID_{idx}, :sort_{idx})"
                        )
                        params.update({
                            f"account_id_{idx}": account_id,
                            f"bfpcode_{idx}": item_data.get('bfpcode'),
                            f"mtime_{idx}": item_data.get('mtime'),
                            f"bffb_{idx}": item_data.get('bffb'),
                            f"bfpl_{idx}": item_data.get('bfpl'),
                            f"bfpvalue_{idx}": item_data.get('bfpvalue'),
                            f"NYID_{idx}": item_data.get('NYID'),
                            f"sort_{idx}": item_data.get('sort')
                        })

                    values_clause = ", ".join(placeholders)
                    insert_sql = f"""
                        INSERT INTO `{table_name}`
                        (account_id, bfpcode, mtime, bffb, bfpl, bfpvalue, NYID, sort)
                        VALUES {values_clause}
                    """
                    db.execute(text(insert_sql), params)
                    success_count += len(batch)
                    logger.info(
                        "Inserted batch %s: %s records",
                        start // RAW_SQL_BATCH_SIZE + 1, len(batch)
                    )

            # NOTE: no commit here; the caller manages the transaction.
            logger.info(
                "Batch import original data completed. Success: %s, Failed: %s",
                success_count, len(failed_items)
            )
            return self._completed_result(total_count, success_count, failed_items)

        except Exception as e:
            # NOTE: no rollback here; the caller manages the transaction.
            logger.error("Batch import original data failed: %s", str(e))
            return self._aborted_result(e, total_count, failed_items)