commit f8e85beba1683e5f0134abab74e8f8c10c91ed09 Author: lhx Date: Fri Dec 12 10:57:31 2025 +0800 初始化 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b01ba3e --- /dev/null +++ b/.gitignore @@ -0,0 +1,61 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# 虚拟环境 +.venv/ +venv/ +ENV/ +env/ + +# 环境配置 +.env +.env.local +.env.*.local + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# 日志 +logs/ +*.log + +# 测试 +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.nox/ + +# 缓存 +.cache/ +*.pyc + +# 系统文件 +.DS_Store +Thumbs.db + +# 其他 +work.md diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..c9f6f4f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM python:3.12-slim + +WORKDIR /app + +ENV PYTHONPATH=/app +ENV PYTHONUNBUFFERED=1 +ENV TZ=Asia/Shanghai + +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +RUN mkdir -p /app/logs && chmod 755 /app/logs + +EXPOSE 8000 + +CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000", "--timeout", "120"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..ce7cd0b --- /dev/null +++ b/README.md @@ -0,0 +1,152 @@ +# 工程围岩数据信息处理系统 + +基于 FastAPI + MySQL + SQLAlchemy 的工程围岩数据管理系统。 + +## 环境要求 + +- Python 3.12+ +- MySQL 8.0+ + +## 安装 + +```bash +# 创建虚拟环境 +python -m venv .venv + +# 激活虚拟环境 +# Windows +.venv\Scripts\activate +# Linux/Mac +source .venv/bin/activate + +# 安装依赖 +pip install -r requirements.txt +``` + +## 配置 + +编辑 `.env` 文件: + +```env +APP_HOST=0.0.0.0 +APP_PORT=8000 +APP_DEBUG=true + +# Railway数据库(账号表) +RAILWAY_DB_HOST=localhost +RAILWAY_DB_PORT=3306 +RAILWAY_DB_USER=root +RAILWAY_DB_PASSWORD=your_password +RAILWAY_DB_NAME=railway + +# Tunnel数据库(业务数据) +TUNNEL_DB_HOST=localhost +TUNNEL_DB_PORT=3306 +TUNNEL_DB_USER=root +TUNNEL_DB_PASSWORD=your_password +TUNNEL_DB_NAME=Tunnel +``` + +## 运行 + +```bash +# 开发模式 +python main.py + +# 生产模式(Docker) +docker compose up -d +``` + +## API 接口 + +所有接口均为 POST 类型,访问 `/docs` 查看完整文档。 + +### 工区数据 + +```bash +# 批量导入 +POST /api/work_area/import +{ + "account_id": 1, + "data": [ + {"department_id": "D001", "parent_id": null, "type": "标段", "name": "一标段"} + ] +} + +# 查询 +POST /api/work_area/query +{"account_id": 1, "type": "标段", "page": 1, "page_size": 20} +``` + +### 断面数据 + +```bash +# 批量导入 +POST /api/section_data/import +{ + "account_id": 1, + "data": [ + {"section_id": "S001", "department_id": "D001", "mileage": "DK100+500", "rock_mass_classification": "III"} + ] +} + +# 查询 +POST /api/section_data/query +{"account_id": 1, "department_id": "D001", "page": 1, "page_size": 20} +``` + +### 观测点数据 + +```bash +# 批量导入 +POST /api/checkpoint/import +{ + "account_id": 1, + "data": [ + {"point_id": "P001", "section_id": "S001", "name": "拱顶沉降"} + ] +} + +# 按department查询(含断面里程和围岩级别) +POST /api/checkpoint/query_by_department +{"account_id": 1, "department_id": "D001", "page": 1, "page_size": 20} +``` + +### 量测数据 + +```bash +# 批量导入 +POST /api/measurement_data/import +{ + "account_id": 1, + "data": [ + {"point_id": "P001", "monitoring_time": "2024-01-01T10:00:00", "cumulative_deformation": "5.2"} + ] +} + +# 按department查询(含断面里程、围岩级别、观测点名称) +POST /api/measurement_data/query_by_department +{"account_id": 1, "department_id": "D001", "page": 1, "page_size": 20} +``` + +## 数据库分表 + +业务数据按 `account_id` 分表存储: +- `work_area_{account_id}` +- `section_data_{account_id}` +- `checkpoint_{account_id}` +- `measurement_data_{account_id}` + +## 日志 + +日志文件位于 `logs/` 目录: +- `access.log` - 接口访问日志 +- `app.log` - 业务日志 +- `database.log` - 数据库日志 + +## 部署 + +```bash +chmod +x deploy.sh +./deploy.sh +``` diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..132f09c --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ +# 工程围岩数据信息处理系统 diff --git a/app/api/__init__.py b/app/api/__init__.py new file mode 100644 index 0000000..61cc10e --- /dev/null +++ b/app/api/__init__.py @@ -0,0 +1,4 @@ +from .work_area import router as work_area_router +from .section_data import router as section_data_router +from .checkpoint import router as checkpoint_router +from .measurement_data import router as measurement_data_router diff --git a/app/api/checkpoint.py b/app/api/checkpoint.py new file mode 100644 index 0000000..9b96155 --- /dev/null +++ b/app/api/checkpoint.py @@ -0,0 +1,62 @@ +"""观测点数据接口""" +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session +from pydantic import BaseModel +from typing import Optional +from app.core.database import get_tunnel_db +from app.core.logging_config import get_logger +from app.schemas.checkpoint import CheckpointBatchImport, CheckpointQuery +from app.schemas.common import BatchImportResponse +from app.servives.checkpoint_service import CheckpointService + +router = APIRouter(prefix="/checkpoint", tags=["观测点数据"]) +logger = get_logger(__name__) + +class CheckpointByDepartmentQuery(BaseModel): + """根据department_id查询观测点""" + account_id: int + department_id: str + page: int = 1 + page_size: int = 20 + +@router.post("/import", response_model=BatchImportResponse) +async def batch_import( + request: CheckpointBatchImport, + db: Session = Depends(get_tunnel_db) +): + """批量导入观测点数据""" + logger.info(f"观测点数据导入请求: account_id={request.account_id}, 数据量={len(request.data)}") + return CheckpointService.batch_import(db, request.account_id, request.data) + +@router.post("/query") +async def query( + request: CheckpointQuery, + db: Session = Depends(get_tunnel_db) +): + """查询观测点数据""" + logger.info(f"观测点数据查询请求: {request}") + items, total = CheckpointService.query(db, request) + return { + "total": total, + "page": request.page, + "page_size": request.page_size, + "items": items + } + +@router.post("/query_by_department") +async def query_by_department( + request: CheckpointByDepartmentQuery, + db: Session = Depends(get_tunnel_db) +): + """根据department_id查询观测点数据(包含断面里程和围岩级别)""" + logger.info(f"根据department_id查询观测点数据: {request}") + items, total = CheckpointService.query_by_department( + db, request.account_id, request.department_id, + request.page, request.page_size + ) + return { + "total": total, + "page": request.page, + "page_size": request.page_size, + "items": items + } diff --git a/app/api/measurement_data.py b/app/api/measurement_data.py new file mode 100644 index 0000000..a98ffc1 --- /dev/null +++ b/app/api/measurement_data.py @@ -0,0 +1,66 @@ +"""量测数据接口""" +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session +from pydantic import BaseModel +from typing import Optional +from datetime import datetime +from app.core.database import get_tunnel_db +from app.core.logging_config import get_logger +from app.schemas.measurement_data import MeasurementDataBatchImport, MeasurementDataQuery +from app.schemas.common import BatchImportResponse +from app.servives.measurement_data_service import MeasurementDataService + +router = APIRouter(prefix="/measurement_data", tags=["量测数据"]) +logger = get_logger(__name__) + +class MeasurementByDepartmentQuery(BaseModel): + """根据department_id查询量测数据""" + account_id: int + department_id: str + monitoring_time_start: Optional[datetime] = None + monitoring_time_end: Optional[datetime] = None + page: int = 1 + page_size: int = 20 + +@router.post("/import", response_model=BatchImportResponse) +async def batch_import( + request: MeasurementDataBatchImport, + db: Session = Depends(get_tunnel_db) +): + """批量导入量测数据""" + logger.info(f"量测数据导入请求: account_id={request.account_id}, 数据量={len(request.data)}") + return MeasurementDataService.batch_import(db, request.account_id, request.data) + +@router.post("/query") +async def query( + request: MeasurementDataQuery, + db: Session = Depends(get_tunnel_db) +): + """查询量测数据""" + logger.info(f"量测数据查询请求: {request}") + items, total = MeasurementDataService.query(db, request) + return { + "total": total, + "page": request.page, + "page_size": request.page_size, + "items": items + } + +@router.post("/query_by_department") +async def query_by_department( + request: MeasurementByDepartmentQuery, + db: Session = Depends(get_tunnel_db) +): + """根据department_id查询量测数据(包含断面里程、围岩级别和观测点名称)""" + logger.info(f"根据department_id查询量测数据: {request}") + items, total = MeasurementDataService.query_by_department( + db, request.account_id, request.department_id, + request.page, request.page_size, + request.monitoring_time_start, request.monitoring_time_end + ) + return { + "total": total, + "page": request.page, + "page_size": request.page_size, + "items": items + } diff --git a/app/api/section_data.py b/app/api/section_data.py new file mode 100644 index 0000000..21c7201 --- /dev/null +++ b/app/api/section_data.py @@ -0,0 +1,56 @@ +"""断面数据接口""" +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session +from app.core.database import get_tunnel_db +from app.core.logging_config import get_logger +from app.schemas.section_data import SectionDataBatchImport, SectionDataQuery +from app.schemas.common import BatchImportResponse +from app.servives.section_data_service import SectionDataService + +router = APIRouter(prefix="/section_data", tags=["断面数据"]) +logger = get_logger(__name__) + +@router.post("/import", response_model=BatchImportResponse) +async def batch_import( + request: SectionDataBatchImport, + db: Session = Depends(get_tunnel_db) +): + """批量导入断面数据""" + logger.info(f"断面数据导入请求: account_id={request.account_id}, 数据量={len(request.data)}") + return SectionDataService.batch_import(db, request.account_id, request.data) + +@router.post("/query") +async def query( + request: SectionDataQuery, + db: Session = Depends(get_tunnel_db) +): + """查询断面数据""" + logger.info(f"断面数据查询请求: {request}") + items, total = SectionDataService.query(db, request) + return { + "total": total, + "page": request.page, + "page_size": request.page_size, + "items": items + } + +@router.post("/query_by_department") +async def query_by_department( + request: SectionDataQuery, + db: Session = Depends(get_tunnel_db) +): + """根据department_id查询断面数据""" + logger.info(f"根据department_id查询断面数据: {request}") + params = SectionDataQuery( + account_id=request.account_id, + department_id=request.department_id, + page=request.page, + page_size=request.page_size + ) + items, total = SectionDataService.query(db, params) + return { + "total": total, + "page": request.page, + "page_size": request.page_size, + "items": items + } diff --git a/app/api/work_area.py b/app/api/work_area.py new file mode 100644 index 0000000..1805d17 --- /dev/null +++ b/app/api/work_area.py @@ -0,0 +1,35 @@ +"""工区数据接口""" +from fastapi import APIRouter, Depends +from sqlalchemy.orm import Session +from app.core.database import get_tunnel_db +from app.core.logging_config import get_logger +from app.schemas.work_area import WorkAreaBatchImport, WorkAreaQuery +from app.schemas.common import BatchImportResponse, PageResponse +from app.servives.work_area_service import WorkAreaService + +router = APIRouter(prefix="/work_area", tags=["工区数据"]) +logger = get_logger(__name__) + +@router.post("/import", response_model=BatchImportResponse) +async def batch_import( + request: WorkAreaBatchImport, + db: Session = Depends(get_tunnel_db) +): + """批量导入工区数据""" + logger.info(f"工区数据导入请求: account_id={request.account_id}, 数据量={len(request.data)}") + return WorkAreaService.batch_import(db, request.account_id, request.data) + +@router.post("/query") +async def query( + request: WorkAreaQuery, + db: Session = Depends(get_tunnel_db) +): + """查询工区数据""" + logger.info(f"工区数据查询请求: {request}") + items, total = WorkAreaService.query(db, request) + return { + "total": total, + "page": request.page, + "page_size": request.page_size, + "items": items + } diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000..081ce13 --- /dev/null +++ b/app/core/__init__.py @@ -0,0 +1,3 @@ +from .config import settings +from .database import get_railway_db, get_tunnel_db, railway_engine, tunnel_engine +from .logging_config import setup_logging, get_logger diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000..5d68533 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,40 @@ +from pydantic_settings import BaseSettings +from functools import lru_cache + +class Settings(BaseSettings): + # 应用配置 + APP_HOST: str = "0.0.0.0" + APP_PORT: int = 8000 + APP_DEBUG: bool = True + + # Railway数据库配置 + RAILWAY_DB_HOST: str = "localhost" + RAILWAY_DB_PORT: int = 3306 + RAILWAY_DB_USER: str = "root" + RAILWAY_DB_PASSWORD: str = "" + RAILWAY_DB_NAME: str = "railway" + + # Tunnel数据库配置 + TUNNEL_DB_HOST: str = "localhost" + TUNNEL_DB_PORT: int = 3306 + TUNNEL_DB_USER: str = "root" + TUNNEL_DB_PASSWORD: str = "" + TUNNEL_DB_NAME: str = "Tunnel" + + @property + def RAILWAY_DATABASE_URL(self) -> str: + return f"mysql+pymysql://{self.RAILWAY_DB_USER}:{self.RAILWAY_DB_PASSWORD}@{self.RAILWAY_DB_HOST}:{self.RAILWAY_DB_PORT}/{self.RAILWAY_DB_NAME}?charset=utf8mb4" + + @property + def TUNNEL_DATABASE_URL(self) -> str: + return f"mysql+pymysql://{self.TUNNEL_DB_USER}:{self.TUNNEL_DB_PASSWORD}@{self.TUNNEL_DB_HOST}:{self.TUNNEL_DB_PORT}/{self.TUNNEL_DB_NAME}?charset=utf8mb4" + + class Config: + env_file = ".env" + extra = "ignore" + +@lru_cache() +def get_settings(): + return Settings() + +settings = get_settings() diff --git a/app/core/database.py b/app/core/database.py new file mode 100644 index 0000000..2920be5 --- /dev/null +++ b/app/core/database.py @@ -0,0 +1,58 @@ +from sqlalchemy import create_engine, text, MetaData +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import QueuePool +from .config import settings + +# Railway数据库引擎(账号表) +railway_engine = create_engine( + settings.RAILWAY_DATABASE_URL, + poolclass=QueuePool, + pool_pre_ping=True, + echo=False, + pool_size=20, + max_overflow=30, + pool_timeout=60, + pool_recycle=3600, + pool_reset_on_return='commit' +) + +# Tunnel数据库引擎(业务数据表) +tunnel_engine = create_engine( + settings.TUNNEL_DATABASE_URL, + poolclass=QueuePool, + pool_pre_ping=True, + echo=False, + pool_size=100, + max_overflow=200, + pool_timeout=60, + pool_recycle=3600, + pool_reset_on_return='commit' +) + +RailwaySessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=railway_engine) +TunnelSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=tunnel_engine) + +RailwayBase = declarative_base() +TunnelBase = declarative_base() + +def get_railway_db(): + """Railway数据库依赖注入""" + db = RailwaySessionLocal() + try: + yield db + finally: + db.close() + +def get_tunnel_db(): + """Tunnel数据库依赖注入""" + db = TunnelSessionLocal() + try: + yield db + finally: + db.close() + +def init_db(): + """初始化数据库""" + RailwayBase.metadata.create_all(bind=railway_engine) + TunnelBase.metadata.create_all(bind=tunnel_engine) diff --git a/app/core/db_monitor.py b/app/core/db_monitor.py new file mode 100644 index 0000000..7c2eca4 --- /dev/null +++ b/app/core/db_monitor.py @@ -0,0 +1,165 @@ +"""数据库连接池监控模块""" +import time +import threading +from typing import Dict, Any +from datetime import datetime +from sqlalchemy import event +from sqlalchemy.engine import Engine +from .logging_config import get_logger + +logger = get_logger(__name__) +db_logger = get_logger("sqlalchemy.engine") + +# 全局监控数据 +_pool_stats = { + 'total_connections': 0, + 'checked_in': 0, + 'checked_out': 0, + 'overflow': 0, + 'slow_queries': [], + 'connection_errors': [], + 'peak_connections': 0, + 'last_reset': datetime.now() +} + +_alert_thresholds = { + 'pool_usage_percent': 80, + 'slow_query_time': 5.0, + 'alert_cooldown': 300 +} + +_last_alerts = {} +_engines = [] + +def register_engine(engine): + """注册引擎用于监控""" + _engines.append(engine) + +def get_pool_status() -> Dict[str, Any]: + """获取连接池状态""" + stats = { + 'total': 0, + 'checked_in': 0, + 'checked_out': 0, + 'overflow': 0, + 'usage_percent': 0 + } + + for engine in _engines: + if hasattr(engine.pool, 'size'): + pool = engine.pool + stats['total'] += pool.size() if callable(pool.size) else pool.size + stats['checked_in'] += pool.checkedin() if hasattr(pool, 'checkedin') else 0 + stats['checked_out'] += pool.checkedout() if hasattr(pool, 'checkedout') else 0 + stats['overflow'] += pool.overflow() if hasattr(pool, 'overflow') else 0 + + if stats['total'] > 0: + stats['usage_percent'] = round((stats['checked_out'] / stats['total']) * 100, 2) + + if stats['checked_out'] > _pool_stats['peak_connections']: + _pool_stats['peak_connections'] = stats['checked_out'] + + return stats + +def check_pool_alerts(): + """检查连接池告警""" + current_time = time.time() + stats = get_pool_status() + + if stats.get('usage_percent', 0) >= _alert_thresholds['pool_usage_percent']: + alert_key = 'pool_usage' + if alert_key not in _last_alerts or (current_time - _last_alerts.get(alert_key, 0)) > _alert_thresholds['alert_cooldown']: + db_logger.warning( + f"数据库连接池告警: 使用率 {stats['usage_percent']}% 超过阈值 " + f"(已使用: {stats['checked_out']}/{stats['total']})" + ) + _last_alerts[alert_key] = current_time + +def log_slow_query(sql: str, duration: float): + """记录慢查询""" + _pool_stats['slow_queries'].append({ + 'sql': sql[:200] if len(sql) > 200 else sql, + 'duration': duration, + 'timestamp': time.time() + }) + if len(_pool_stats['slow_queries']) > 1000: + _pool_stats['slow_queries'] = _pool_stats['slow_queries'][-1000:] + +def log_connection_error(error: str): + """记录连接错误""" + _pool_stats['connection_errors'].append({ + 'error': error, + 'timestamp': time.time() + }) + if len(_pool_stats['connection_errors']) > 100: + _pool_stats['connection_errors'] = _pool_stats['connection_errors'][-100:] + +def get_monitoring_report() -> Dict[str, Any]: + """获取监控报告""" + stats = get_pool_status() + current_time = time.time() + + recent_slow_queries = [q for q in _pool_stats['slow_queries'] if (current_time - q['timestamp']) < 300] + recent_errors = [e for e in _pool_stats['connection_errors'] if (current_time - e['timestamp']) < 300] + + return { + 'timestamp': datetime.now().isoformat(), + 'pool_status': stats, + 'peak_connections': _pool_stats['peak_connections'], + 'recent_5min': { + 'slow_queries_count': len(recent_slow_queries), + 'connection_errors_count': len(recent_errors) + }, + 'last_reset': _pool_stats['last_reset'].isoformat() + } + +def monitoring_task(): + """定时监控任务""" + while True: + try: + check_pool_alerts() + time.sleep(30) + except Exception as e: + db_logger.error(f"数据库监控任务异常: {e}") + time.sleep(60) + +def start_monitoring(): + """启动后台监控""" + # 延迟导入避免循环依赖 + from .database import railway_engine, tunnel_engine + register_engine(railway_engine) + register_engine(tunnel_engine) + + monitor_thread = threading.Thread(target=monitoring_task, daemon=True) + monitor_thread.start() + db_logger.info("数据库连接池监控已启动") + +# SQL执行时间监控 +_query_start_times = {} + +@event.listens_for(Engine, "before_cursor_execute") +def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany): + _query_start_times[id(cursor)] = time.time() + +@event.listens_for(Engine, "after_cursor_execute") +def receive_after_cursor_execute(conn, cursor, statement, params, context, executemany): + start_time = _query_start_times.pop(id(cursor), None) + if start_time: + duration = time.time() - start_time + if duration >= _alert_thresholds['slow_query_time']: + log_slow_query(statement, duration) + db_logger.warning(f"慢查询: {duration:.2f}s - {statement[:100]}...") + +@event.listens_for(Engine, "handle_error") +def receive_handle_error(exception_context): + error_msg = str(exception_context.original_exception) + log_connection_error(error_msg) + db_logger.error(f"数据库错误: {error_msg}") + +def log_pool_status(): + """记录连接池状态到日志""" + stats = get_pool_status() + db_logger.info( + f"数据库连接池状态: 使用率 {stats['usage_percent']}% " + f"(已用: {stats['checked_out']}, 空闲: {stats['checked_in']}, 总计: {stats['total']})" + ) diff --git a/app/core/logging_config.py b/app/core/logging_config.py new file mode 100644 index 0000000..fed3087 --- /dev/null +++ b/app/core/logging_config.py @@ -0,0 +1,128 @@ +import logging +import logging.handlers +import os +from datetime import datetime +from pathlib import Path + +def setup_logging(): + """配置日志系统""" + # 创建logs目录 + log_dir = Path("logs") + log_dir.mkdir(exist_ok=True) + + # 配置根日志记录器 + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + + # 清除现有的处理器 + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # 创建格式化器 + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 1. 控制台处理器 + console_handler = logging.StreamHandler() + console_handler.setLevel(logging.INFO) + console_handler.setFormatter(formatter) + root_logger.addHandler(console_handler) + + # 2. 应用日志文件处理器(按日期滚动) + app_log_file = log_dir / "app.log" + app_handler = logging.handlers.TimedRotatingFileHandler( + app_log_file, + when='midnight', + interval=1, + backupCount=30, + encoding='utf-8' + ) + app_handler.setLevel(logging.INFO) + app_handler.setFormatter(formatter) + root_logger.addHandler(app_handler) + + # 3. 错误日志文件处理器 + error_log_file = log_dir / "error.log" + error_handler = logging.handlers.TimedRotatingFileHandler( + error_log_file, + when='midnight', + interval=1, + backupCount=30, + encoding='utf-8' + ) + error_handler.setLevel(logging.ERROR) + error_handler.setFormatter(formatter) + root_logger.addHandler(error_handler) + + # 4. 任务调度器专用日志处理器 + scheduler_log_file = log_dir / "scheduler.log" + scheduler_handler = logging.handlers.TimedRotatingFileHandler( + scheduler_log_file, + when='midnight', + interval=1, + backupCount=30, + encoding='utf-8' + ) + scheduler_handler.setLevel(logging.INFO) + scheduler_handler.setFormatter(formatter) + + # 为调度器设置专用logger + scheduler_logger = logging.getLogger('apscheduler') + scheduler_logger.addHandler(scheduler_handler) + scheduler_logger.setLevel(logging.INFO) + + # 为应用调度器模块设置专用logger + app_scheduler_logger = logging.getLogger('app.utils.scheduler') + app_scheduler_logger.addHandler(scheduler_handler) + app_scheduler_logger.setLevel(logging.INFO) + + # 5. API访问日志处理器 + access_log_file = log_dir / "access.log" + access_handler = logging.handlers.TimedRotatingFileHandler( + access_log_file, + when='midnight', + interval=1, + backupCount=30, + encoding='utf-8' + ) + access_formatter = logging.Formatter( + '%(asctime)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + access_handler.setFormatter(access_formatter) + + # 为uvicorn访问日志设置处理器 + uvicorn_access_logger = logging.getLogger("uvicorn.access") + uvicorn_access_logger.addHandler(access_handler) + uvicorn_access_logger.setLevel(logging.INFO) + + # 6. 数据库日志处理器 + db_log_file = log_dir / "database.log" + db_handler = logging.handlers.TimedRotatingFileHandler( + db_log_file, + when='midnight', + interval=1, + backupCount=30, + encoding='utf-8' + ) + db_handler.setLevel(logging.INFO) + db_handler.setFormatter(formatter) + + # 为SQLAlchemy设置日志 + sqlalchemy_logger = logging.getLogger('sqlalchemy.engine') + sqlalchemy_logger.addHandler(db_handler) + sqlalchemy_logger.setLevel(logging.WARNING) # 只记录警告和错误 + + print(f"日志系统已配置,日志文件保存在: {log_dir.absolute()}") + print("日志文件:") + print(f" - 应用日志: {app_log_file}") + print(f" - 错误日志: {error_log_file}") + print(f" - 调度器日志: {scheduler_log_file}") + print(f" - 访问日志: {access_log_file}") + print(f" - 数据库日志: {db_log_file}") + +def get_logger(name: str = None): + """获取日志记录器""" + return logging.getLogger(name or __name__) \ No newline at end of file diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..0643f61 --- /dev/null +++ b/app/main.py @@ -0,0 +1,106 @@ +from fastapi import FastAPI, Request +from fastapi.middleware.cors import CORSMiddleware +from contextlib import asynccontextmanager +import time + +from .core.config import settings +from .core.logging_config import setup_logging, get_logger +from .core.database import init_db +from .core.db_monitor import start_monitoring, get_pool_status +from .api.work_area import router as work_area_router +from .api.section_data import router as section_data_router +from .api.checkpoint import router as checkpoint_router +from .api.measurement_data import router as measurement_data_router + +# 初始化日志系统 +setup_logging() +logger = get_logger(__name__) +access_logger = get_logger("uvicorn.access") + +@asynccontextmanager +async def lifespan(app: FastAPI): + """应用生命周期管理""" + logger.info("应用启动中...") + + try: + init_db() + logger.info("数据库初始化完成") + except Exception as e: + logger.error(f"数据库初始化失败: {e}") + + try: + start_monitoring() + logger.info("数据库连接池监控已启动") + pool_stats = get_pool_status() + logger.info(f"初始连接池状态: {pool_stats}") + except Exception as e: + logger.error(f"数据库监控启动失败: {e}") + + yield + + logger.info("应用关闭中...") + try: + pool_stats = get_pool_status() + logger.info(f"最终连接池状态: {pool_stats}") + except Exception as e: + logger.error(f"获取最终连接池状态失败: {e}") + +# 创建FastAPI应用 +app = FastAPI( + title="工程围岩数据信息处理系统", + description="基于FastAPI、MySQL、SQLAlchemy的工程围岩数据信息处理系统", + version="1.0.0", + lifespan=lifespan +) + +# 配置CORS +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# 访问日志中间件 +@app.middleware("http") +async def log_requests(request: Request, call_next): + start_time = time.time() + response = await call_next(request) + process_time = time.time() - start_time + access_logger.info( + f"{request.method} {request.url.path} - {response.status_code} - {process_time:.3f}s" + ) + return response + +# 注册路由 +app.include_router(work_area_router, prefix="/api") +app.include_router(section_data_router, prefix="/api") +app.include_router(checkpoint_router, prefix="/api") +app.include_router(measurement_data_router, prefix="/api") + +@app.get("/") +async def root(): + """根路径""" + return { + "message": "工程围岩数据信息处理系统 API", + "version": "1.0.0", + "docs": "/docs" + } + +@app.get("/health") +async def health_check(): + """健康检查""" + return {"status": "healthy", "database": "connected"} + +@app.get("/api/monitor/pool") +async def get_pool_monitor(): + """获取连接池状态""" + return get_pool_status() + +@app.exception_handler(Exception) +async def global_exception_handler(request: Request, exc: Exception): + """全局异常处理""" + import traceback + logger.error(f"全局异常: {type(exc).__name__}: {exc}\n{traceback.format_exc()}") + return {"detail": f"服务器内部错误: {type(exc).__name__}", "status_code": 500} diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..b65abce --- /dev/null +++ b/app/models/__init__.py @@ -0,0 +1,5 @@ +from .account import Account +from .work_area import WorkArea +from .section_data import SectionData +from .checkpoint import Checkpoint +from .measurement_data import MeasurementData diff --git a/app/models/account.py b/app/models/account.py new file mode 100644 index 0000000..9151f16 --- /dev/null +++ b/app/models/account.py @@ -0,0 +1,18 @@ +from sqlalchemy import Column, Integer, String, DateTime, func +from app.core.database import RailwayBase + +class Account(RailwayBase): + """账号表 - railway数据库""" + __tablename__ = "accounts" + + id = Column(Integer, primary_key=True, autoincrement=True) + username = Column(String(100), nullable=False, comment="账号") + password = Column(String(100), nullable=False, comment="密码") + status = Column(String(100), nullable=False, default="1", comment="状态: 1-正常, 0-禁用") + today_updated = Column(String(100), default="0", comment="0->待处理,1->在抓取,2->抓取错误") + project_name = Column(String(100), comment="标段") + created_at = Column(DateTime, nullable=False, server_default=func.now()) + updated_at = Column(DateTime, nullable=False, server_default=func.now(), onupdate=func.now()) + update_time = Column(String(1000), comment="更新时间跨度") + max_variation = Column(Integer, nullable=False, default=1, comment="变化量的绝对值,单位是毫米") + yh_id = Column(String(100), comment="永恒一号id") diff --git a/app/models/checkpoint.py b/app/models/checkpoint.py new file mode 100644 index 0000000..37d24d3 --- /dev/null +++ b/app/models/checkpoint.py @@ -0,0 +1,30 @@ +from sqlalchemy import Column, Integer, String, DateTime + +class Checkpoint: + """观测点数据表结构定义 - 动态分表""" + + @staticmethod + def get_table_name(account_id: int) -> str: + return f"checkpoint_{account_id}" + + @staticmethod + def get_create_sql(account_id: int) -> str: + table_name = Checkpoint.get_table_name(account_id) + return f""" + CREATE TABLE IF NOT EXISTS `{table_name}` ( + `id` int unsigned NOT NULL AUTO_INCREMENT, + `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '观察点名称', + `burial_date` datetime DEFAULT NULL COMMENT '埋设日期', + `objstate` varchar(3) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '测点状态', + `monitoring_type` varchar(100) DEFAULT NULL COMMENT '监测类型', + `period_number` int DEFAULT NULL COMMENT '量测期次', + `first_time` datetime DEFAULT NULL COMMENT '首测时间', + `manufacturer` varchar(100) DEFAULT NULL COMMENT '负责厂商', + `point_code` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '测点编码', + `point_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '观察点id', + `section_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '所属断面id', + PRIMARY KEY (`id`), + KEY `idx_point_id` (`point_id`), + KEY `idx_section_id` (`section_id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='观测点数据表'; + """ diff --git a/app/models/measurement_data.py b/app/models/measurement_data.py new file mode 100644 index 0000000..54cdbde --- /dev/null +++ b/app/models/measurement_data.py @@ -0,0 +1,29 @@ +from sqlalchemy import Column, BigInteger, String, DateTime + +class MeasurementData: + """量测数据表结构定义 - 动态分表""" + + @staticmethod + def get_table_name(account_id: int) -> str: + return f"measurement_data_{account_id}" + + @staticmethod + def get_create_sql(account_id: int) -> str: + table_name = MeasurementData.get_table_name(account_id) + return f""" + CREATE TABLE IF NOT EXISTS `{table_name}` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `monitoring_time` datetime DEFAULT NULL COMMENT '监测时间', + `upload_time` datetime DEFAULT NULL COMMENT '上传时间', + `monitoring_value` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '监测值(m)', + `deformation_value` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '变形值(mm)', + `time_interval` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '时间间隔(h)', + `cumulative_deformation` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '累计变形量(mm)', + `deformation_rate` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '变形速率(mm/d)', + `distance_working_face` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '距掌子面距离(m)', + `point_id` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '测点id', + PRIMARY KEY (`id`), + KEY `idx_point_id` (`point_id`), + KEY `idx_monitoring_time` (`monitoring_time`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='量测数据'; + """ diff --git a/app/models/section_data.py b/app/models/section_data.py new file mode 100644 index 0000000..09d5c5b --- /dev/null +++ b/app/models/section_data.py @@ -0,0 +1,33 @@ +from sqlalchemy import Column, Integer, String, DECIMAL + +class SectionData: + """断面数据表结构定义 - 动态分表""" + + @staticmethod + def get_table_name(account_id: int) -> str: + return f"section_data_{account_id}" + + @staticmethod + def get_create_sql(account_id: int) -> str: + table_name = SectionData.get_table_name(account_id) + return f""" + CREATE TABLE IF NOT EXISTS `{table_name}` ( + `id` int NOT NULL AUTO_INCREMENT, + `project` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '项目', + `mileage` varchar(100) NOT NULL COMMENT '断面里程', + `name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '断面名称', + `number` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '断面编码', + `status` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '断面状态', + `excavation_method` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '开挖方法', + `rock_mass_classification` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '围岩级别', + `width` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '断面宽度(m)', + `U0` decimal(10,4) DEFAULT NULL COMMENT 'U0(mm)', + `remarks` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT 'U0备注', + `department_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '工区/工点id', + `section_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '断面id', + PRIMARY KEY (`id`), + KEY `idx_section_id` (`section_id`), + KEY `idx_department_id` (`department_id`), + KEY `idx_number` (`number`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='断面数据表'; + """ diff --git a/app/models/work_area.py b/app/models/work_area.py new file mode 100644 index 0000000..8a65992 --- /dev/null +++ b/app/models/work_area.py @@ -0,0 +1,33 @@ +from sqlalchemy import Column, BigInteger, String + +class WorkArea: + """工区表结构定义 - 动态分表""" + + @staticmethod + def get_table_name(account_id: int) -> str: + return f"work_area_{account_id}" + + @staticmethod + def get_columns(): + return { + "id": Column(BigInteger, primary_key=True, autoincrement=True), + "department_id": Column(String(100), comment="标段/工区/工点id"), + "parent_id": Column(String(100), comment="父id"), + "type": Column(String(100), comment="类型"), + "name": Column(String(100), comment="名称"), + } + + @staticmethod + def get_create_sql(account_id: int) -> str: + table_name = WorkArea.get_table_name(account_id) + return f""" + CREATE TABLE IF NOT EXISTS `{table_name}` ( + `id` bigint NOT NULL AUTO_INCREMENT, + `department_id` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '标段/工区/工点id', + `parent_id` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '父id', + `type` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '类型', + `name` varchar(100) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '名称', + PRIMARY KEY (`id`), + KEY `idx_department_id` (`department_id`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='标段/工区/工点信息表'; + """ diff --git a/app/schemas/__init__.py b/app/schemas/__init__.py new file mode 100644 index 0000000..5ae8958 --- /dev/null +++ b/app/schemas/__init__.py @@ -0,0 +1,5 @@ +from .work_area import WorkAreaCreate, WorkAreaResponse, WorkAreaBatchImport +from .section_data import SectionDataCreate, SectionDataResponse, SectionDataBatchImport +from .checkpoint import CheckpointCreate, CheckpointResponse, CheckpointBatchImport, CheckpointWithSection +from .measurement_data import MeasurementDataCreate, MeasurementDataResponse, MeasurementDataBatchImport, MeasurementDataWithDetail +from .common import PageParams, PageResponse, BatchImportResponse diff --git a/app/schemas/checkpoint.py b/app/schemas/checkpoint.py new file mode 100644 index 0000000..b0c9b30 --- /dev/null +++ b/app/schemas/checkpoint.py @@ -0,0 +1,62 @@ +from pydantic import BaseModel +from typing import Optional, List +from datetime import datetime + +class CheckpointCreate(BaseModel): + """观测点创建""" + point_id: str + section_id: str + name: str + burial_date: Optional[datetime] = None + objstate: Optional[str] = None + monitoring_type: Optional[str] = None + period_number: Optional[int] = None + first_time: Optional[datetime] = None + manufacturer: Optional[str] = None + point_code: Optional[str] = None + +class CheckpointResponse(BaseModel): + """观测点响应""" + id: int + point_id: Optional[str] = None + section_id: str + name: str + burial_date: Optional[datetime] = None + objstate: Optional[str] = None + monitoring_type: Optional[str] = None + period_number: Optional[int] = None + first_time: Optional[datetime] = None + manufacturer: Optional[str] = None + point_code: Optional[str] = None + +class CheckpointWithSection(BaseModel): + """观测点响应(包含断面信息)""" + id: int + point_id: Optional[str] = None + section_id: str + name: str + burial_date: Optional[datetime] = None + objstate: Optional[str] = None + monitoring_type: Optional[str] = None + period_number: Optional[int] = None + first_time: Optional[datetime] = None + manufacturer: Optional[str] = None + point_code: Optional[str] = None + # 断面信息 + section_mileage: Optional[str] = None + rock_mass_classification: Optional[str] = None + +class CheckpointBatchImport(BaseModel): + """观测点批量导入""" + account_id: int + data: List[CheckpointCreate] + +class CheckpointQuery(BaseModel): + """观测点查询参数""" + account_id: int + department_id: Optional[str] = None + section_id: Optional[str] = None + point_id: Optional[str] = None + name: Optional[str] = None + page: int = 1 + page_size: int = 20 diff --git a/app/schemas/common.py b/app/schemas/common.py new file mode 100644 index 0000000..d77dcb9 --- /dev/null +++ b/app/schemas/common.py @@ -0,0 +1,25 @@ +from pydantic import BaseModel +from typing import Optional, List, Any, Generic, TypeVar + +T = TypeVar('T') + +class PageParams(BaseModel): + """分页参数""" + page: int = 1 + page_size: int = 20 + +class PageResponse(BaseModel, Generic[T]): + """分页响应""" + total: int + page: int + page_size: int + items: List[T] + +class BatchImportResponse(BaseModel): + """批量导入响应""" + success: bool + total: int + inserted: int + skipped: int + skipped_ids: List[str] = [] + message: str diff --git a/app/schemas/measurement_data.py b/app/schemas/measurement_data.py new file mode 100644 index 0000000..6a8a859 --- /dev/null +++ b/app/schemas/measurement_data.py @@ -0,0 +1,61 @@ +from pydantic import BaseModel +from typing import Optional, List +from datetime import datetime + +class MeasurementDataCreate(BaseModel): + """量测数据创建""" + point_id: str + monitoring_time: Optional[datetime] = None + upload_time: Optional[datetime] = None + monitoring_value: Optional[str] = None + deformation_value: Optional[str] = None + time_interval: Optional[str] = None + cumulative_deformation: Optional[str] = None + deformation_rate: Optional[str] = None + distance_working_face: Optional[str] = None + +class MeasurementDataResponse(BaseModel): + """量测数据响应""" + id: int + point_id: Optional[str] = None + monitoring_time: Optional[datetime] = None + upload_time: Optional[datetime] = None + monitoring_value: Optional[str] = None + deformation_value: Optional[str] = None + time_interval: Optional[str] = None + cumulative_deformation: Optional[str] = None + deformation_rate: Optional[str] = None + distance_working_face: Optional[str] = None + +class MeasurementDataWithDetail(BaseModel): + """量测数据响应(包含断面和观测点信息)""" + id: int + point_id: Optional[str] = None + monitoring_time: Optional[datetime] = None + upload_time: Optional[datetime] = None + monitoring_value: Optional[str] = None + deformation_value: Optional[str] = None + time_interval: Optional[str] = None + cumulative_deformation: Optional[str] = None + deformation_rate: Optional[str] = None + distance_working_face: Optional[str] = None + # 观测点信息 + point_name: Optional[str] = None + # 断面信息 + section_mileage: Optional[str] = None + rock_mass_classification: Optional[str] = None + +class MeasurementDataBatchImport(BaseModel): + """量测数据批量导入""" + account_id: int + data: List[MeasurementDataCreate] + +class MeasurementDataQuery(BaseModel): + """量测数据查询参数""" + account_id: int + department_id: Optional[str] = None + point_id: Optional[str] = None + monitoring_time_start: Optional[datetime] = None + monitoring_time_end: Optional[datetime] = None + page: int = 1 + page_size: int = 20 diff --git a/app/schemas/section_data.py b/app/schemas/section_data.py new file mode 100644 index 0000000..0d9e5ea --- /dev/null +++ b/app/schemas/section_data.py @@ -0,0 +1,49 @@ +from pydantic import BaseModel +from typing import Optional, List +from decimal import Decimal + +class SectionDataCreate(BaseModel): + """断面数据创建""" + section_id: str + department_id: Optional[str] = None + project: Optional[str] = None + mileage: str + name: Optional[str] = None + number: Optional[str] = None + status: Optional[str] = None + excavation_method: Optional[str] = None + rock_mass_classification: Optional[str] = None + width: Optional[str] = None + U0: Optional[Decimal] = None + remarks: Optional[str] = None + +class SectionDataResponse(BaseModel): + """断面数据响应""" + id: int + section_id: str + department_id: Optional[str] = None + project: Optional[str] = None + mileage: str + name: Optional[str] = None + number: Optional[str] = None + status: Optional[str] = None + excavation_method: Optional[str] = None + rock_mass_classification: Optional[str] = None + width: Optional[str] = None + U0: Optional[Decimal] = None + remarks: Optional[str] = None + +class SectionDataBatchImport(BaseModel): + """断面数据批量导入""" + account_id: int + data: List[SectionDataCreate] + +class SectionDataQuery(BaseModel): + """断面数据查询参数""" + account_id: int + department_id: Optional[str] = None + section_id: Optional[str] = None + name: Optional[str] = None + number: Optional[str] = None + page: int = 1 + page_size: int = 20 diff --git a/app/schemas/work_area.py b/app/schemas/work_area.py new file mode 100644 index 0000000..aaab744 --- /dev/null +++ b/app/schemas/work_area.py @@ -0,0 +1,32 @@ +from pydantic import BaseModel +from typing import Optional, List + +class WorkAreaCreate(BaseModel): + """工区创建""" + department_id: str + parent_id: Optional[str] = None + type: Optional[str] = None + name: Optional[str] = None + +class WorkAreaResponse(BaseModel): + """工区响应""" + id: int + department_id: Optional[str] = None + parent_id: Optional[str] = None + type: Optional[str] = None + name: Optional[str] = None + +class WorkAreaBatchImport(BaseModel): + """工区批量导入""" + account_id: int + data: List[WorkAreaCreate] + +class WorkAreaQuery(BaseModel): + """工区查询参数""" + account_id: int + department_id: Optional[str] = None + parent_id: Optional[str] = None + type: Optional[str] = None + name: Optional[str] = None + page: int = 1 + page_size: int = 20 diff --git a/app/servives/__init__.py b/app/servives/__init__.py new file mode 100644 index 0000000..b1f612b --- /dev/null +++ b/app/servives/__init__.py @@ -0,0 +1,5 @@ +from .table_manager import TableManager +from .work_area_service import WorkAreaService +from .section_data_service import SectionDataService +from .checkpoint_service import CheckpointService +from .measurement_data_service import MeasurementDataService diff --git a/app/servives/checkpoint_service.py b/app/servives/checkpoint_service.py new file mode 100644 index 0000000..cab8fa6 --- /dev/null +++ b/app/servives/checkpoint_service.py @@ -0,0 +1,186 @@ +"""观测点数据服务""" +from typing import List, Dict, Tuple +from sqlalchemy import text +from sqlalchemy.orm import Session +from app.core.logging_config import get_logger +from app.models.checkpoint import Checkpoint +from app.models.section_data import SectionData +from app.schemas.checkpoint import CheckpointCreate, CheckpointQuery +from app.schemas.common import BatchImportResponse +from .table_manager import TableManager + +logger = get_logger(__name__) + +class CheckpointService: + """观测点数据服务""" + + @staticmethod + def batch_import(db: Session, account_id: int, data: List[CheckpointCreate]) -> BatchImportResponse: + """批量导入观测点数据""" + table_name = Checkpoint.get_table_name(account_id) + + if not TableManager.ensure_table_exists(db, "checkpoint", account_id): + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=0, + message="创建表失败" + ) + + # 获取已存在的point_id + point_ids = [item.point_id for item in data if item.point_id] + existing_ids = set() + if point_ids: + placeholders = ",".join([f":id_{i}" for i in range(len(point_ids))]) + params = {f"id_{i}": pid for i, pid in enumerate(point_ids)} + result = db.execute( + text(f"SELECT point_id FROM {table_name} WHERE point_id IN ({placeholders})"), + params + ) + existing_ids = {row[0] for row in result.fetchall()} + + to_insert = [] + skipped_ids = [] + for item in data: + if item.point_id in existing_ids: + skipped_ids.append(item.point_id) + else: + to_insert.append(item) + existing_ids.add(item.point_id) + + if to_insert: + try: + values = [] + params = {} + for i, item in enumerate(to_insert): + values.append(f"(:name_{i}, :burial_date_{i}, :objstate_{i}, :monitoring_type_{i}, " + f":period_number_{i}, :first_time_{i}, :manufacturer_{i}, " + f":point_code_{i}, :point_id_{i}, :section_id_{i})") + params[f"name_{i}"] = item.name + params[f"burial_date_{i}"] = item.burial_date + params[f"objstate_{i}"] = item.objstate + params[f"monitoring_type_{i}"] = item.monitoring_type + params[f"period_number_{i}"] = item.period_number + params[f"first_time_{i}"] = item.first_time + params[f"manufacturer_{i}"] = item.manufacturer + params[f"point_code_{i}"] = item.point_code + params[f"point_id_{i}"] = item.point_id + params[f"section_id_{i}"] = item.section_id + + sql = f"""INSERT INTO {table_name} + (name, burial_date, objstate, monitoring_type, period_number, + first_time, manufacturer, point_code, point_id, section_id) + VALUES {','.join(values)}""" + db.execute(text(sql), params) + db.commit() + logger.info(f"观测点数据导入成功: account_id={account_id}, 插入={len(to_insert)}, 跳过={len(skipped_ids)}") + except Exception as e: + db.rollback() + logger.error(f"观测点数据导入失败: {e}") + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=len(skipped_ids), + skipped_ids=skipped_ids, message=f"插入失败: {str(e)}" + ) + + return BatchImportResponse( + success=True, total=len(data), inserted=len(to_insert), skipped=len(skipped_ids), + skipped_ids=skipped_ids, message="导入成功" + ) + + @staticmethod + def query(db: Session, params: CheckpointQuery) -> Tuple[List[Dict], int]: + """查询观测点数据""" + table_name = Checkpoint.get_table_name(params.account_id) + + if not TableManager.ensure_table_exists(db, "checkpoint", params.account_id): + return [], 0 + + conditions = [] + query_params = {} + + if params.section_id: + conditions.append("section_id = :section_id") + query_params["section_id"] = params.section_id + if params.point_id: + conditions.append("point_id = :point_id") + query_params["point_id"] = params.point_id + if params.name: + conditions.append("name LIKE :name") + query_params["name"] = f"%{params.name}%" + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + count_sql = f"SELECT COUNT(*) FROM {table_name} WHERE {where_clause}" + total = db.execute(text(count_sql), query_params).scalar() + + offset = (params.page - 1) * params.page_size + query_params["limit"] = params.page_size + query_params["offset"] = offset + + data_sql = f"SELECT * FROM {table_name} WHERE {where_clause} LIMIT :limit OFFSET :offset" + result = db.execute(text(data_sql), query_params) + items = [dict(row._mapping) for row in result.fetchall()] + + return items, total + + @staticmethod + def query_by_department(db: Session, account_id: int, department_id: str, + page: int = 1, page_size: int = 20) -> Tuple[List[Dict], int]: + """根据department_id查询观测点数据(包含断面信息)""" + checkpoint_table = Checkpoint.get_table_name(account_id) + section_table = SectionData.get_table_name(account_id) + + if not TableManager.ensure_table_exists(db, "checkpoint", account_id): + return [], 0 + if not TableManager.ensure_table_exists(db, "section_data", account_id): + return [], 0 + + # 先查询该department下的所有section_id + section_sql = f"SELECT section_id, mileage, rock_mass_classification FROM {section_table} WHERE department_id = :department_id" + section_result = db.execute(text(section_sql), {"department_id": department_id}) + section_map = {row[0]: {"mileage": row[1], "rock_mass_classification": row[2]} for row in section_result.fetchall()} + + if not section_map: + return [], 0 + + section_ids = list(section_map.keys()) + placeholders = ",".join([f":sid_{i}" for i in range(len(section_ids))]) + params = {f"sid_{i}": sid for i, sid in enumerate(section_ids)} + + # 查询总数 + count_sql = f"SELECT COUNT(*) FROM {checkpoint_table} WHERE section_id IN ({placeholders})" + total = db.execute(text(count_sql), params).scalar() + + # 分页查询 + offset = (page - 1) * page_size + params["limit"] = page_size + params["offset"] = offset + + data_sql = f"SELECT * FROM {checkpoint_table} WHERE section_id IN ({placeholders}) LIMIT :limit OFFSET :offset" + result = db.execute(text(data_sql), params) + + items = [] + for row in result.fetchall(): + item = dict(row._mapping) + section_info = section_map.get(item.get("section_id"), {}) + item["section_mileage"] = section_info.get("mileage") + item["rock_mass_classification"] = section_info.get("rock_mass_classification") + items.append(item) + + return items, total + + @staticmethod + def get_by_point_ids(db: Session, account_id: int, point_ids: List[str]) -> Dict[str, Dict]: + """根据point_id批量获取观测点数据""" + if not point_ids: + return {} + + table_name = Checkpoint.get_table_name(account_id) + if not TableManager.ensure_table_exists(db, "checkpoint", account_id): + return {} + + placeholders = ",".join([f":id_{i}" for i in range(len(point_ids))]) + params = {f"id_{i}": pid for i, pid in enumerate(point_ids)} + + sql = f"SELECT point_id, name, section_id FROM {table_name} WHERE point_id IN ({placeholders})" + result = db.execute(text(sql), params) + + return {row[0]: {"name": row[1], "section_id": row[2]} for row in result.fetchall()} diff --git a/app/servives/measurement_data_service.py b/app/servives/measurement_data_service.py new file mode 100644 index 0000000..68eaa92 --- /dev/null +++ b/app/servives/measurement_data_service.py @@ -0,0 +1,176 @@ +"""量测数据服务""" +from typing import List, Dict, Tuple +from sqlalchemy import text +from sqlalchemy.orm import Session +from app.core.logging_config import get_logger +from app.models.measurement_data import MeasurementData +from app.models.checkpoint import Checkpoint +from app.models.section_data import SectionData +from app.schemas.measurement_data import MeasurementDataCreate, MeasurementDataQuery +from app.schemas.common import BatchImportResponse +from .table_manager import TableManager + +logger = get_logger(__name__) + +class MeasurementDataService: + """量测数据服务""" + + @staticmethod + def batch_import(db: Session, account_id: int, data: List[MeasurementDataCreate]) -> BatchImportResponse: + """批量导入量测数据""" + table_name = MeasurementData.get_table_name(account_id) + + if not TableManager.ensure_table_exists(db, "measurement_data", account_id): + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=0, + message="创建表失败" + ) + + # 量测数据不检查重复,直接插入 + if data: + try: + values = [] + params = {} + for i, item in enumerate(data): + values.append(f"(:monitoring_time_{i}, :upload_time_{i}, :monitoring_value_{i}, " + f":deformation_value_{i}, :time_interval_{i}, :cumulative_deformation_{i}, " + f":deformation_rate_{i}, :distance_working_face_{i}, :point_id_{i})") + params[f"monitoring_time_{i}"] = item.monitoring_time + params[f"upload_time_{i}"] = item.upload_time + params[f"monitoring_value_{i}"] = item.monitoring_value + params[f"deformation_value_{i}"] = item.deformation_value + params[f"time_interval_{i}"] = item.time_interval + params[f"cumulative_deformation_{i}"] = item.cumulative_deformation + params[f"deformation_rate_{i}"] = item.deformation_rate + params[f"distance_working_face_{i}"] = item.distance_working_face + params[f"point_id_{i}"] = item.point_id + + sql = f"""INSERT INTO {table_name} + (monitoring_time, upload_time, monitoring_value, deformation_value, + time_interval, cumulative_deformation, deformation_rate, distance_working_face, point_id) + VALUES {','.join(values)}""" + db.execute(text(sql), params) + db.commit() + logger.info(f"量测数据导入成功: account_id={account_id}, 插入={len(data)}") + except Exception as e: + db.rollback() + logger.error(f"量测数据导入失败: {e}") + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=0, + message=f"插入失败: {str(e)}" + ) + + return BatchImportResponse( + success=True, total=len(data), inserted=len(data), skipped=0, + message="导入成功" + ) + + @staticmethod + def query(db: Session, params: MeasurementDataQuery) -> Tuple[List[Dict], int]: + """查询量测数据""" + table_name = MeasurementData.get_table_name(params.account_id) + + if not TableManager.ensure_table_exists(db, "measurement_data", params.account_id): + return [], 0 + + conditions = [] + query_params = {} + + if params.point_id: + conditions.append("point_id = :point_id") + query_params["point_id"] = params.point_id + if params.monitoring_time_start: + conditions.append("monitoring_time >= :monitoring_time_start") + query_params["monitoring_time_start"] = params.monitoring_time_start + if params.monitoring_time_end: + conditions.append("monitoring_time <= :monitoring_time_end") + query_params["monitoring_time_end"] = params.monitoring_time_end + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + count_sql = f"SELECT COUNT(*) FROM {table_name} WHERE {where_clause}" + total = db.execute(text(count_sql), query_params).scalar() + + offset = (params.page - 1) * params.page_size + query_params["limit"] = params.page_size + query_params["offset"] = offset + + data_sql = f"SELECT * FROM {table_name} WHERE {where_clause} ORDER BY monitoring_time DESC LIMIT :limit OFFSET :offset" + result = db.execute(text(data_sql), query_params) + items = [dict(row._mapping) for row in result.fetchall()] + + return items, total + + @staticmethod + def query_by_department(db: Session, account_id: int, department_id: str, + page: int = 1, page_size: int = 20, + monitoring_time_start=None, monitoring_time_end=None) -> Tuple[List[Dict], int]: + """根据department_id查询量测数据(包含断面和观测点信息)""" + measurement_table = MeasurementData.get_table_name(account_id) + checkpoint_table = Checkpoint.get_table_name(account_id) + section_table = SectionData.get_table_name(account_id) + + for table_type in ["measurement_data", "checkpoint", "section_data"]: + if not TableManager.ensure_table_exists(db, table_type, account_id): + return [], 0 + + # 获取department下的section信息 + section_sql = f"SELECT section_id, mileage, rock_mass_classification FROM {section_table} WHERE department_id = :department_id" + section_result = db.execute(text(section_sql), {"department_id": department_id}) + section_map = {row[0]: {"mileage": row[1], "rock_mass_classification": row[2]} for row in section_result.fetchall()} + + if not section_map: + return [], 0 + + section_ids = list(section_map.keys()) + section_placeholders = ",".join([f":sid_{i}" for i in range(len(section_ids))]) + params = {f"sid_{i}": sid for i, sid in enumerate(section_ids)} + + # 获取这些section下的checkpoint信息 + checkpoint_sql = f"SELECT point_id, name, section_id FROM {checkpoint_table} WHERE section_id IN ({section_placeholders})" + checkpoint_result = db.execute(text(checkpoint_sql), params) + checkpoint_map = {row[0]: {"name": row[1], "section_id": row[2]} for row in checkpoint_result.fetchall()} + + if not checkpoint_map: + return [], 0 + + point_ids = list(checkpoint_map.keys()) + point_placeholders = ",".join([f":pid_{i}" for i in range(len(point_ids))]) + params = {f"pid_{i}": pid for i, pid in enumerate(point_ids)} + + # 构建时间条件 + time_conditions = [] + if monitoring_time_start: + time_conditions.append("monitoring_time >= :time_start") + params["time_start"] = monitoring_time_start + if monitoring_time_end: + time_conditions.append("monitoring_time <= :time_end") + params["time_end"] = monitoring_time_end + + time_clause = " AND " + " AND ".join(time_conditions) if time_conditions else "" + + # 查询总数 + count_sql = f"SELECT COUNT(*) FROM {measurement_table} WHERE point_id IN ({point_placeholders}){time_clause}" + total = db.execute(text(count_sql), params).scalar() + + # 分页查询 + offset = (page - 1) * page_size + params["limit"] = page_size + params["offset"] = offset + + data_sql = f"SELECT * FROM {measurement_table} WHERE point_id IN ({point_placeholders}){time_clause} ORDER BY monitoring_time DESC LIMIT :limit OFFSET :offset" + result = db.execute(text(data_sql), params) + + items = [] + for row in result.fetchall(): + item = dict(row._mapping) + checkpoint_info = checkpoint_map.get(item.get("point_id"), {}) + item["point_name"] = checkpoint_info.get("name") + + section_id = checkpoint_info.get("section_id") + section_info = section_map.get(section_id, {}) + item["section_mileage"] = section_info.get("mileage") + item["rock_mass_classification"] = section_info.get("rock_mass_classification") + items.append(item) + + return items, total diff --git a/app/servives/section_data_service.py b/app/servives/section_data_service.py new file mode 100644 index 0000000..437a82c --- /dev/null +++ b/app/servives/section_data_service.py @@ -0,0 +1,144 @@ +"""断面数据服务""" +from typing import List, Dict, Tuple +from sqlalchemy import text +from sqlalchemy.orm import Session +from app.core.logging_config import get_logger +from app.models.section_data import SectionData +from app.schemas.section_data import SectionDataCreate, SectionDataQuery +from app.schemas.common import BatchImportResponse +from .table_manager import TableManager + +logger = get_logger(__name__) + +class SectionDataService: + """断面数据服务""" + + @staticmethod + def batch_import(db: Session, account_id: int, data: List[SectionDataCreate]) -> BatchImportResponse: + """批量导入断面数据""" + table_name = SectionData.get_table_name(account_id) + + if not TableManager.ensure_table_exists(db, "section_data", account_id): + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=0, + message="创建表失败" + ) + + # 获取已存在的section_id + section_ids = [item.section_id for item in data if item.section_id] + existing_ids = set() + if section_ids: + placeholders = ",".join([f":id_{i}" for i in range(len(section_ids))]) + params = {f"id_{i}": sid for i, sid in enumerate(section_ids)} + result = db.execute( + text(f"SELECT section_id FROM {table_name} WHERE section_id IN ({placeholders})"), + params + ) + existing_ids = {row[0] for row in result.fetchall()} + + to_insert = [] + skipped_ids = [] + for item in data: + if item.section_id in existing_ids: + skipped_ids.append(item.section_id) + else: + to_insert.append(item) + existing_ids.add(item.section_id) + + if to_insert: + try: + values = [] + params = {} + for i, item in enumerate(to_insert): + values.append(f"(:project_{i}, :mileage_{i}, :name_{i}, :number_{i}, :status_{i}, " + f":excavation_method_{i}, :rock_mass_classification_{i}, :width_{i}, " + f":U0_{i}, :remarks_{i}, :department_id_{i}, :section_id_{i})") + params[f"project_{i}"] = item.project + params[f"mileage_{i}"] = item.mileage + params[f"name_{i}"] = item.name + params[f"number_{i}"] = item.number + params[f"status_{i}"] = item.status + params[f"excavation_method_{i}"] = item.excavation_method + params[f"rock_mass_classification_{i}"] = item.rock_mass_classification + params[f"width_{i}"] = item.width + params[f"U0_{i}"] = float(item.U0) if item.U0 else None + params[f"remarks_{i}"] = item.remarks + params[f"department_id_{i}"] = item.department_id + params[f"section_id_{i}"] = item.section_id + + sql = f"""INSERT INTO {table_name} + (project, mileage, name, number, status, excavation_method, + rock_mass_classification, width, U0, remarks, department_id, section_id) + VALUES {','.join(values)}""" + db.execute(text(sql), params) + db.commit() + logger.info(f"断面数据导入成功: account_id={account_id}, 插入={len(to_insert)}, 跳过={len(skipped_ids)}") + except Exception as e: + db.rollback() + logger.error(f"断面数据导入失败: {e}") + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=len(skipped_ids), + skipped_ids=skipped_ids, message=f"插入失败: {str(e)}" + ) + + return BatchImportResponse( + success=True, total=len(data), inserted=len(to_insert), skipped=len(skipped_ids), + skipped_ids=skipped_ids, message="导入成功" + ) + + @staticmethod + def query(db: Session, params: SectionDataQuery) -> Tuple[List[Dict], int]: + """查询断面数据""" + table_name = SectionData.get_table_name(params.account_id) + + if not TableManager.ensure_table_exists(db, "section_data", params.account_id): + return [], 0 + + conditions = [] + query_params = {} + + if params.department_id: + conditions.append("department_id = :department_id") + query_params["department_id"] = params.department_id + if params.section_id: + conditions.append("section_id = :section_id") + query_params["section_id"] = params.section_id + if params.name: + conditions.append("name LIKE :name") + query_params["name"] = f"%{params.name}%" + if params.number: + conditions.append("number = :number") + query_params["number"] = params.number + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + count_sql = f"SELECT COUNT(*) FROM {table_name} WHERE {where_clause}" + total = db.execute(text(count_sql), query_params).scalar() + + offset = (params.page - 1) * params.page_size + query_params["limit"] = params.page_size + query_params["offset"] = offset + + data_sql = f"SELECT * FROM {table_name} WHERE {where_clause} LIMIT :limit OFFSET :offset" + result = db.execute(text(data_sql), query_params) + items = [dict(row._mapping) for row in result.fetchall()] + + return items, total + + @staticmethod + def get_by_section_ids(db: Session, account_id: int, section_ids: List[str]) -> Dict[str, Dict]: + """根据section_id批量获取断面数据""" + if not section_ids: + return {} + + table_name = SectionData.get_table_name(account_id) + if not TableManager.ensure_table_exists(db, "section_data", account_id): + return {} + + placeholders = ",".join([f":id_{i}" for i in range(len(section_ids))]) + params = {f"id_{i}": sid for i, sid in enumerate(section_ids)} + + sql = f"SELECT section_id, mileage, rock_mass_classification FROM {table_name} WHERE section_id IN ({placeholders})" + result = db.execute(text(sql), params) + + return {row[0]: {"mileage": row[1], "rock_mass_classification": row[2]} for row in result.fetchall()} diff --git a/app/servives/table_manager.py b/app/servives/table_manager.py new file mode 100644 index 0000000..156a4c7 --- /dev/null +++ b/app/servives/table_manager.py @@ -0,0 +1,87 @@ +""" +动态表管理器 +处理分表创建,避免事务冲突 +""" +from sqlalchemy import text +from sqlalchemy.orm import Session +from app.core.logging_config import get_logger +from app.models.work_area import WorkArea +from app.models.section_data import SectionData +from app.models.checkpoint import Checkpoint +from app.models.measurement_data import MeasurementData + +logger = get_logger(__name__) + +class TableManager: + """动态表管理器""" + + # 缓存已创建的表 + _created_tables = set() + + @classmethod + def ensure_table_exists(cls, db: Session, table_type: str, account_id: int) -> bool: + """ + 确保表存在,如果不存在则创建 + 使用独立连接创建表,避免与业务事务冲突 + """ + table_name = cls._get_table_name(table_type, account_id) + cache_key = f"{table_type}_{account_id}" + + # 检查缓存 + if cache_key in cls._created_tables: + return True + + try: + # 检查表是否存在 + result = db.execute(text(f"SHOW TABLES LIKE '{table_name}'")) + if result.fetchone(): + cls._created_tables.add(cache_key) + return True + + # 获取建表SQL + create_sql = cls._get_create_sql(table_type, account_id) + if not create_sql: + logger.error(f"未知的表类型: {table_type}") + return False + + # 使用独立连接创建表(避免事务冲突) + connection = db.get_bind().connect() + try: + connection.execute(text(create_sql)) + connection.commit() + cls._created_tables.add(cache_key) + logger.info(f"动态创建表成功: {table_name}") + return True + finally: + connection.close() + + except Exception as e: + logger.error(f"创建表失败 {table_name}: {e}") + return False + + @classmethod + def _get_table_name(cls, table_type: str, account_id: int) -> str: + """获取表名""" + table_map = { + "work_area": WorkArea.get_table_name, + "section_data": SectionData.get_table_name, + "checkpoint": Checkpoint.get_table_name, + "measurement_data": MeasurementData.get_table_name, + } + return table_map.get(table_type, lambda x: "")(account_id) + + @classmethod + def _get_create_sql(cls, table_type: str, account_id: int) -> str: + """获取建表SQL""" + sql_map = { + "work_area": WorkArea.get_create_sql, + "section_data": SectionData.get_create_sql, + "checkpoint": Checkpoint.get_create_sql, + "measurement_data": MeasurementData.get_create_sql, + } + return sql_map.get(table_type, lambda x: "")(account_id) + + @classmethod + def clear_cache(cls): + """清除表缓存""" + cls._created_tables.clear() diff --git a/app/servives/work_area_service.py b/app/servives/work_area_service.py new file mode 100644 index 0000000..c0c93a8 --- /dev/null +++ b/app/servives/work_area_service.py @@ -0,0 +1,120 @@ +"""工区数据服务""" +from typing import List, Dict, Any, Tuple +from sqlalchemy import text +from sqlalchemy.orm import Session +from app.core.logging_config import get_logger +from app.models.work_area import WorkArea +from app.schemas.work_area import WorkAreaCreate, WorkAreaQuery +from app.schemas.common import BatchImportResponse +from .table_manager import TableManager + +logger = get_logger(__name__) + +class WorkAreaService: + """工区数据服务""" + + @staticmethod + def batch_import(db: Session, account_id: int, data: List[WorkAreaCreate]) -> BatchImportResponse: + """批量导入工区数据""" + table_name = WorkArea.get_table_name(account_id) + + # 确保表存在 + if not TableManager.ensure_table_exists(db, "work_area", account_id): + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=0, + message="创建表失败" + ) + + # 获取已存在的department_id + department_ids = [item.department_id for item in data if item.department_id] + existing_ids = set() + if department_ids: + placeholders = ",".join([f":id_{i}" for i in range(len(department_ids))]) + params = {f"id_{i}": did for i, did in enumerate(department_ids)} + result = db.execute( + text(f"SELECT department_id FROM {table_name} WHERE department_id IN ({placeholders})"), + params + ) + existing_ids = {row[0] for row in result.fetchall()} + + # 过滤重复数据 + to_insert = [] + skipped_ids = [] + for item in data: + if item.department_id in existing_ids: + skipped_ids.append(item.department_id) + else: + to_insert.append(item) + existing_ids.add(item.department_id) # 防止批次内重复 + + # 批量插入 + if to_insert: + try: + values = [] + params = {} + for i, item in enumerate(to_insert): + values.append(f"(:department_id_{i}, :parent_id_{i}, :type_{i}, :name_{i})") + params[f"department_id_{i}"] = item.department_id + params[f"parent_id_{i}"] = item.parent_id + params[f"type_{i}"] = item.type + params[f"name_{i}"] = item.name + + sql = f"INSERT INTO {table_name} (department_id, parent_id, type, name) VALUES {','.join(values)}" + db.execute(text(sql), params) + db.commit() + logger.info(f"工区数据导入成功: account_id={account_id}, 插入={len(to_insert)}, 跳过={len(skipped_ids)}") + except Exception as e: + db.rollback() + logger.error(f"工区数据导入失败: {e}") + return BatchImportResponse( + success=False, total=len(data), inserted=0, skipped=len(skipped_ids), + skipped_ids=skipped_ids, message=f"插入失败: {str(e)}" + ) + + return BatchImportResponse( + success=True, total=len(data), inserted=len(to_insert), skipped=len(skipped_ids), + skipped_ids=skipped_ids, message="导入成功" + ) + + @staticmethod + def query(db: Session, params: WorkAreaQuery) -> Tuple[List[Dict], int]: + """查询工区数据""" + table_name = WorkArea.get_table_name(params.account_id) + + # 确保表存在 + if not TableManager.ensure_table_exists(db, "work_area", params.account_id): + return [], 0 + + # 构建查询条件 + conditions = [] + query_params = {} + + if params.department_id: + conditions.append("department_id = :department_id") + query_params["department_id"] = params.department_id + if params.parent_id: + conditions.append("parent_id = :parent_id") + query_params["parent_id"] = params.parent_id + if params.type: + conditions.append("type = :type") + query_params["type"] = params.type + if params.name: + conditions.append("name LIKE :name") + query_params["name"] = f"%{params.name}%" + + where_clause = " AND ".join(conditions) if conditions else "1=1" + + # 查询总数 + count_sql = f"SELECT COUNT(*) FROM {table_name} WHERE {where_clause}" + total = db.execute(text(count_sql), query_params).scalar() + + # 分页查询 + offset = (params.page - 1) * params.page_size + query_params["limit"] = params.page_size + query_params["offset"] = offset + + data_sql = f"SELECT * FROM {table_name} WHERE {where_clause} LIMIT :limit OFFSET :offset" + result = db.execute(text(data_sql), query_params) + items = [dict(row._mapping) for row in result.fetchall()] + + return items, total diff --git a/app/utils/__init__.py b/app/utils/__init__.py new file mode 100644 index 0000000..0dc3ac6 --- /dev/null +++ b/app/utils/__init__.py @@ -0,0 +1 @@ +# 工具模块 diff --git a/deploy.sh b/deploy.sh new file mode 100644 index 0000000..26f19f0 --- /dev/null +++ b/deploy.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +echo "=== 工程围岩数据信息处理系统部署脚本 ===" + +SKIP_GIT_PULL=0 +if [ "$1" = "0" ]; then + SKIP_GIT_PULL=1 + echo "跳过代码拉取" +fi + +if [ $SKIP_GIT_PULL -eq 0 ]; then + echo "正在拉取最新代码..." + git pull origin main + if [ $? -ne 0 ]; then + echo "代码拉取失败,是否继续? (y/n)" + read -r CONTINUE + if [ "$CONTINUE" != "y" ]; then + exit 1 + fi + fi +fi + +echo "正在停止当前服务..." +docker compose down --rmi all + +echo "正在启动新服务..." +docker compose up -d + +if [ $? -eq 0 ]; then + echo "服务启动成功" + docker compose ps +else + echo "启动服务失败" + exit 1 +fi + +echo "=== 部署完成 ===" diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..259ad4a --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,24 @@ +version: '3.8' + +services: + tunnel-app: + build: . + container_name: tunnel-rock-app + ports: + - "8000:8000" + extra_hosts: + - "host.docker.internal:host-gateway" + volumes: + - ./logs:/app/logs + - ./.env:/app/.env:ro + restart: unless-stopped + deploy: + resources: + limits: + memory: 4G + cpus: '4.0' + reservations: + memory: 1G + cpus: '2.0' + environment: + - APP_DEBUG=false diff --git a/main.py b/main.py new file mode 100644 index 0000000..8b1f835 --- /dev/null +++ b/main.py @@ -0,0 +1,16 @@ +import uvicorn +from app.main import app +from app.core.config import settings +from app.core.logging_config import setup_logging + +if __name__ == "__main__": + setup_logging() + + uvicorn.run( + "app.main:app", + host=settings.APP_HOST, + port=settings.APP_PORT, + reload=settings.APP_DEBUG, + access_log=True, + log_level="info" + ) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..947a77d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +fastapi==0.109.0 +uvicorn==0.27.0 +sqlalchemy==2.0.25 +pymysql==1.1.0 +pydantic==2.5.3 +pydantic-settings==2.1.0 +python-dotenv==1.0.0 +gunicorn==21.2.0 +cryptography==42.0.0