From a7b7a786a52bf4af542d5bca4f3c5cf4f032d79f Mon Sep 17 00:00:00 2001 From: lhx Date: Tue, 20 Jan 2026 00:39:46 +0000 Subject: [PATCH] =?UTF-8?q?=E5=90=88=E5=B9=B6=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/services/daily.py | 709 +++++++++++++++++++++--------------------- deploy.sh | 2 +- fix_mysql.sh | 36 +++ test_connection.sh | 27 ++ 4 files changed, 420 insertions(+), 354 deletions(-) create mode 100644 fix_mysql.sh create mode 100644 test_connection.sh diff --git a/app/services/daily.py b/app/services/daily.py index b41f577..a7bda46 100644 --- a/app/services/daily.py +++ b/app/services/daily.py @@ -1,354 +1,357 @@ -from sqlalchemy.orm import Session -from typing import List, Optional, Dict, Any, Set, Tuple,Union -from ..models.level_data import LevelData -from ..models.daily import DailyData -from .base import BaseService -from ..models.settlement_data import SettlementData -from sqlalchemy import func, select, desc,over -from sqlalchemy.orm import Session -import logging -logger = logging.getLogger(__name__) -class DailyDataService(BaseService[DailyData]): - def __init__(self): - super().__init__(DailyData) - - def _dict_to_instance(self, data_dict: Dict) -> DailyData: - """辅助方法:将单个字典转换为 DailyData 实例""" - model_fields = [col.name for col in DailyData.__table__.columns] - filtered_data = {k: v for k, v in data_dict.items() if k in model_fields} - return DailyData(**filtered_data) - - def _ensure_instances(self, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: - """确保输入数据是 DailyData 实例列表""" - if not isinstance(data, list): - raise TypeError(f"输入必须是列表,而非 {type(data)}") - - instances = [] - for item in data: - if isinstance(item, DailyData): - instances.append(item) - elif isinstance(item, dict): - instances.append(self._dict_to_instance(item)) - else: - raise TypeError(f"列表元素必须是 dict 或 DailyData 实例,而非 {type(item)}") - return instances - - def batch_create_by_account_nyid(self, db: Session, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: - """ - 批量创建记录,支持两种输入格式: - - List[DailyData]:模型实例列表 - - List[dict]:字典列表(自动转换为实例) - 通过 (account_id, NYID) 联合判断是否已存在,存在则忽略 - """ - try: - data_list = self._ensure_instances(data) - except TypeError as e: - logger.error(f"数据格式错误:{str(e)}") - raise - - target_pairs: List[Tuple[int, int]] = [ - (item.account_id, item.NYID) - for item in data_list - if item.account_id is not None and item.NYID is not None - ] - - if not target_pairs: - logger.warning("批量创建失败:所有记录缺少 account_id 或 NYID") - return [] - - existing_pairs: Set[Tuple[int, int]] = { - (item.account_id, item.NYID) - for item in db.query(DailyData.account_id, DailyData.NYID) - .filter(DailyData.account_id.in_([p[0] for p in target_pairs]), - DailyData.NYID.in_([p[1] for p in target_pairs])) - .all() - } - - to_create = [ - item for item in data_list - if (item.account_id, item.NYID) not in existing_pairs - ] - - ignored_count = len(data_list) - len(to_create) - if ignored_count > 0: - logger.info(f"批量创建时忽略{ignored_count}条已存在记录(account_id和NYID已存在)") - - if not to_create: - return [] - - # 修复点:使用 add_all 替代 bulk_save_objects,确保对象被会话跟踪 - db.add_all(to_create) # 这里是关键修改 - db.commit() - - # 现在可以安全地刷新实例了 - for item in to_create: - db.refresh(item) - - return to_create - - - def get_nyid_by_point_id( - self, - db: Session, - point_ids: Optional[List[int]] = None, - max_num: int = 1 - ) -> List[List[dict]]: - """ - 获取指定point_id的记录,每个point_id的前max_num条记录放在同一个子列表中 - 返回格式:[[point1_records...], [point2_records...]] - """ - # 处理参数默认值 - point_ids = point_ids or [] - if max_num <= 0: - return [] - - # 窗口函数:按point_id分组,每组内按NYID降序编号 - row_num = over( - func.row_number(), - partition_by=SettlementData.point_id, - order_by=desc(SettlementData.NYID) - ).label("row_num") - - # 模型字段列表 - model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] - - # 基础条件 - base_conditions = [ - SettlementData.useflag.isnot(None), - SettlementData.useflag != 0 - ] - if point_ids: - base_conditions.append(SettlementData.point_id.in_(point_ids)) - - # 子查询 - subquery = ( - select(*model_columns, row_num) - .where(*base_conditions) - .subquery() - ) - - # 主查询:筛选每个point_id的前max_num条 - query = ( - select(subquery) - .where(subquery.c.row_num <= max_num) - .order_by(subquery.c.point_id, subquery.c.row_num) - ) - - # 执行查询 - results = db.execute(query).all() - grouped: Dict[int, List[dict]] = {} # 键:point_id,值:该point_id的所有记录(子列表) - field_names = [col.name for col in SettlementData.__table__.columns] - - for row in results: - item_dict = {field: getattr(row, field) for field in field_names} - try: - pid = int(item_dict["point_id"]) # 确保point_id为整数 - except (KeyError, ValueError): - continue # 跳过无效记录 - - # 同一point_id的记录放入同一个列表 - if pid not in grouped: - grouped[pid] = [] # 初始化子列表 - grouped[pid].append(item_dict) # 追加到子列表 - - # 按输入point_ids顺序整理结果(关键:每个point_id对应一个子列表) - if not point_ids: - point_ids = sorted(grouped.keys()) # 若无指定,按point_id排序 - - # 构建最终结果:每个point_id的记录作为一个子列表 - return [grouped.get(pid, []) for pid in point_ids] - - # 获取所有的今日数据 - def get_all_daily_data( - self, - db: Session, - user_id: Optional[int] = None # 可选参数:按user_id筛选 - ) -> List[Dict[str, Any]]: - """ - 获取所有日常数据(DailyData),支持按user_id筛选 - :param db: 数据库会话 - :param user_id: 可选用户ID,若提供则只返回该用户的数据 - :return: 日常数据字典列表,包含所有字段 - """ - try: - # 基础查询 - query = db.query(DailyData) - - # 若提供了user_id,则添加筛选条件 - if user_id is not None: - query = query.filter(DailyData.user_id == user_id) - logger.info(f"查询user_id={user_id}的所有日常数据") - else: - logger.info("查询所有日常数据") - - # 执行查询并获取所有记录 - daily_records = query.all() - - # 转换为字典列表(保留所有字段) - result = [] - for record in daily_records: - record_dict = { - column.name: getattr(record, column.name) - for column in DailyData.__table__.columns - } - result.append(record_dict) - - logger.info(f"查询完成,共获取{len(result)}条日常数据") - return result - - except Exception as e: - logger.error(f"获取日常数据失败:{str(e)}", exc_info=True) - raise e - def get_daily_data_by_account( - self, - db: Session, - account_id: str, # 账号ID(必填,因为是核心筛选条件) - user_id: Optional[int] = None # 可选参数:额外按user_id筛选 - ) -> List[Dict[str, Any]]: - """ - 根据account_id获取对应日常数据,支持额外按user_id筛选 - :param db: 数据库会话 - :param account_id: 账号ID(必填),用于精准筛选数据 - :param user_id: 可选用户ID,若提供则则进一步筛选该用户的数据 - :return: 符合条件的日常数据字典列表,包含所有字段 - """ - try: - # 基础查询:先按account_id筛选(必填条件) - query = db.query(DailyData).filter(DailyData.account_id == account_id) - - # 若提供了user_id,则添加额外筛选条件 - if user_id is not None: - query = query.filter(DailyData.user_id == user_id) - logger.info(f"查询account_id={account_id}且user_id={user_id}的日常数据") - else: - logger.info(f"查询account_id={account_id}的所有日常数据") - - # 执行查询并获取记录 - daily_records = query.all() - - # 转换为字典列表(保留所有字段) - result = [] - for record in daily_records: - record_dict = { - column.name: getattr(record, column.name) - for column in DailyData.__table__.columns - } - result.append(record_dict) - - logger.info(f"查询完成,account_id={account_id}对应{len(result)}条日常数据") - return result - - except Exception as e: - logger.error(f"获取account_id={account_id}的日常数据失败:{str(e)}", exc_info=True) - raise e - - def create_daily_from_linecode( - self, - db: Session, - linecode: str, - account_id: Optional[int] = None - ) -> List[DailyData]: - """ - 通过水准线路编码生成 daily 数据 - - 业务逻辑: - 1. 在水准数据表(level_data)中查找符合 linecode 的记录,且 NYID 最大 - 2. 通过 NYID 查询沉降数据表(settlement_data) - 3. 通过沉降数据的 point_id 查询观测点表(checkpoint),得到 section_id - 4. 通过 section_id 查询断面表(section_data),得到 account_id - 5. 整合这些数据,形成 daily 对象,插入到数据库表 - - Args: - db: 数据库会话 - linecode: 水准线路编码 - account_id: 可选的账户ID筛选条件 - - Returns: - 创建的 DailyData 记录列表 - """ - try: - logger.info(f"开始处理 linecode={linecode} 的 daily 数据生成请求") - - from ..models.level_data import LevelData - from ..models.settlement_data import SettlementData - from ..models.checkpoint import Checkpoint - from ..models.section_data import SectionData - - # 1. 在水准数据表中查找符合 linecode 的记录,且 NYID 最大 - level_data_list = db.query(LevelData)\ - .filter(LevelData.linecode == linecode)\ - .all() - - if not level_data_list: - raise ValueError(f"未找到 linecode={linecode} 对应的水准数据") - - # 找到 NYID 最大的记录(将 String 转换为数字进行比较) - max_nyid = max(level_data_list, key=lambda x: int(x.NYID) if x.NYID.isdigit() else 0) - target_nyid = max_nyid.NYID - logger.info(f"找到最大 NYID: {target_nyid}") - - # 2. 通过 NYID 查询沉降数据 - settlement_data_list = db.query(SettlementData)\ - .filter(SettlementData.NYID == target_nyid)\ - .all() - - if not settlement_data_list: - raise ValueError(f"未找到 NYID={target_nyid} 对应的沉降数据") - - # 3. 遍历沉降数据,构建 daily 记录 - daily_records = [] - for settlement in settlement_data_list: - # 3.1 通过沉降数据的 point_id 查询观测点表,得到 section_id - checkpoint = db.query(Checkpoint)\ - .filter(Checkpoint.point_id == settlement.point_id)\ - .first() - if not checkpoint: - logger.warning(f"未找到 point_id={settlement.point_id} 对应的观测点,跳过该记录") - continue - - # 3.2 通过 section_id 查询断面表,得到 account_id - section = db.query(SectionData)\ - .filter(SectionData.section_id == checkpoint.section_id)\ - .first() - if not section: - logger.warning(f"未找到 section_id={checkpoint.section_id} 对应的断面,跳过该记录") - continue - - # 3.3 从断面数据中获取 account_id,作为 user_id - user_id = int(section.account_id) if section.account_id else None - if not user_id: - logger.warning(f"断面 section_id={checkpoint.section_id} 没有 account_id,跳过该记录") - continue - - # 如果提供了 account_id 筛选条件,则只处理匹配的记录 - if account_id is not None and user_id != account_id: - continue - - # 3.4 构建 daily 记录 - daily_record = DailyData( - user_id=user_id, - account_id=user_id, - point_id=settlement.point_id, - NYID=settlement.NYID, - linecode=linecode, - section_id=checkpoint.section_id, - remaining=0, - is_all=0 - ) - daily_records.append(daily_record) - - if not daily_records: - logger.warning(f"没有生成任何 daily 记录") - return [] - - # 4. 批量插入 daily 记录 - created_records = self.batch_create_by_account_nyid(db, daily_records) - - logger.info(f"成功生成 {len(created_records)} 条 daily 记录") - return created_records - - except ValueError: - raise - except Exception as e: - logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True) +from sqlalchemy.orm import Session +from typing import List, Optional, Dict, Any, Set, Tuple,Union +from ..models.level_data import LevelData +from ..models.daily import DailyData +from .base import BaseService +from ..models.settlement_data import SettlementData +from sqlalchemy import func, select, desc,over +from sqlalchemy.orm import Session +import logging +logger = logging.getLogger(__name__) +class DailyDataService(BaseService[DailyData]): + def __init__(self): + super().__init__(DailyData) + + def _dict_to_instance(self, data_dict: Dict) -> DailyData: + """辅助方法:将单个字典转换为 DailyData 实例""" + model_fields = [col.name for col in DailyData.__table__.columns] + filtered_data = {k: v for k, v in data_dict.items() if k in model_fields} + return DailyData(**filtered_data) + + def _ensure_instances(self, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: + """确保输入数据是 DailyData 实例列表""" + if not isinstance(data, list): + raise TypeError(f"输入必须是列表,而非 {type(data)}") + + instances = [] + for item in data: + if isinstance(item, DailyData): + instances.append(item) + elif isinstance(item, dict): + instances.append(self._dict_to_instance(item)) + else: + raise TypeError(f"列表元素必须是 dict 或 DailyData 实例,而非 {type(item)}") + return instances + + def batch_create_by_account_nyid(self, db: Session, data: Union[List[Dict], List[DailyData]]) -> List[DailyData]: + """ + 批量创建记录,支持两种输入格式: + - List[DailyData]:模型实例列表 + - List[dict]:字典列表(自动转换为实例) + 通过 (account_id, NYID) 联合判断是否已存在,存在则忽略 --(暂时取消查重) + """ + try: + data_list = self._ensure_instances(data) + except TypeError as e: + logger.error(f"数据格式错误:{str(e)}") + raise + + target_pairs: List[Tuple[int, int]] = [ + (item.account_id, item.NYID) + for item in data_list + if item.account_id is not None and item.NYID is not None + ] + + if not target_pairs: + logger.warning("批量创建失败:所有记录缺少 account_id 或 NYID") + return [] + + + # 取消查重处理 + # existing_pairs: Set[Tuple[int, int]] = { + # (item.account_id, item.NYID) + # for item in db.query(DailyData.account_id, DailyData.NYID) + # .filter(DailyData.account_id.in_([p[0] for p in target_pairs]), + # DailyData.NYID.in_([p[1] for p in target_pairs])) + # .all() + # } + + to_create = [ + item for item in data_list + # if (item.account_id, item.NYID) not in existing_pairs + ] + + # ignored_count = len(data_list) - len(to_create) + # if ignored_count > 0: + # logger.info(f"批量创建时忽略{ignored_count}条已存在记录(account_id和NYID已存在)") + + logger.info(f"批量创建 {to_create}") + if not to_create: + return [] + + # 修复点:使用 add_all 替代 bulk_save_objects,确保对象被会话跟踪 + db.add_all(to_create) # 这里是关键修改 + db.commit() + + # 现在可以安全地刷新实例了 + for item in to_create: + db.refresh(item) + + return to_create + + + def get_nyid_by_point_id( + self, + db: Session, + point_ids: Optional[List[int]] = None, + max_num: int = 1 + ) -> List[List[dict]]: + """ + 获取指定point_id的记录,每个point_id的前max_num条记录放在同一个子列表中 + 返回格式:[[point1_records...], [point2_records...]] + """ + # 处理参数默认值 + point_ids = point_ids or [] + if max_num <= 0: + return [] + + # 窗口函数:按point_id分组,每组内按NYID降序编号 + row_num = over( + func.row_number(), + partition_by=SettlementData.point_id, + order_by=desc(SettlementData.NYID) + ).label("row_num") + + # 模型字段列表 + model_columns = [getattr(SettlementData, col.name) for col in SettlementData.__table__.columns] + + # 基础条件 + base_conditions = [ + SettlementData.useflag.isnot(None), + SettlementData.useflag != 0 + ] + if point_ids: + base_conditions.append(SettlementData.point_id.in_(point_ids)) + + # 子查询 + subquery = ( + select(*model_columns, row_num) + .where(*base_conditions) + .subquery() + ) + + # 主查询:筛选每个point_id的前max_num条 + query = ( + select(subquery) + .where(subquery.c.row_num <= max_num) + .order_by(subquery.c.point_id, subquery.c.row_num) + ) + + # 执行查询 + results = db.execute(query).all() + grouped: Dict[int, List[dict]] = {} # 键:point_id,值:该point_id的所有记录(子列表) + field_names = [col.name for col in SettlementData.__table__.columns] + + for row in results: + item_dict = {field: getattr(row, field) for field in field_names} + try: + pid = int(item_dict["point_id"]) # 确保point_id为整数 + except (KeyError, ValueError): + continue # 跳过无效记录 + + # 同一point_id的记录放入同一个列表 + if pid not in grouped: + grouped[pid] = [] # 初始化子列表 + grouped[pid].append(item_dict) # 追加到子列表 + + # 按输入point_ids顺序整理结果(关键:每个point_id对应一个子列表) + if not point_ids: + point_ids = sorted(grouped.keys()) # 若无指定,按point_id排序 + + # 构建最终结果:每个point_id的记录作为一个子列表 + return [grouped.get(pid, []) for pid in point_ids] + + # 获取所有的今日数据 + def get_all_daily_data( + self, + db: Session, + user_id: Optional[int] = None # 可选参数:按user_id筛选 + ) -> List[Dict[str, Any]]: + """ + 获取所有日常数据(DailyData),支持按user_id筛选 + :param db: 数据库会话 + :param user_id: 可选用户ID,若提供则只返回该用户的数据 + :return: 日常数据字典列表,包含所有字段 + """ + try: + # 基础查询 + query = db.query(DailyData) + + # 若提供了user_id,则添加筛选条件 + if user_id is not None: + query = query.filter(DailyData.user_id == user_id) + logger.info(f"查询user_id={user_id}的所有日常数据") + else: + logger.info("查询所有日常数据") + + # 执行查询并获取所有记录 + daily_records = query.all() + + # 转换为字典列表(保留所有字段) + result = [] + for record in daily_records: + record_dict = { + column.name: getattr(record, column.name) + for column in DailyData.__table__.columns + } + result.append(record_dict) + + logger.info(f"查询完成,共获取{len(result)}条日常数据") + return result + + except Exception as e: + logger.error(f"获取日常数据失败:{str(e)}", exc_info=True) + raise e + def get_daily_data_by_account( + self, + db: Session, + account_id: str, # 账号ID(必填,因为是核心筛选条件) + user_id: Optional[int] = None # 可选参数:额外按user_id筛选 + ) -> List[Dict[str, Any]]: + """ + 根据account_id获取对应日常数据,支持额外按user_id筛选 + :param db: 数据库会话 + :param account_id: 账号ID(必填),用于精准筛选数据 + :param user_id: 可选用户ID,若提供则则进一步筛选该用户的数据 + :return: 符合条件的日常数据字典列表,包含所有字段 + """ + try: + # 基础查询:先按account_id筛选(必填条件) + query = db.query(DailyData).filter(DailyData.account_id == account_id) + + # 若提供了user_id,则添加额外筛选条件 + if user_id is not None: + query = query.filter(DailyData.user_id == user_id) + logger.info(f"查询account_id={account_id}且user_id={user_id}的日常数据") + else: + logger.info(f"查询account_id={account_id}的所有日常数据") + + # 执行查询并获取记录 + daily_records = query.all() + + # 转换为字典列表(保留所有字段) + result = [] + for record in daily_records: + record_dict = { + column.name: getattr(record, column.name) + for column in DailyData.__table__.columns + } + result.append(record_dict) + + logger.info(f"查询完成,account_id={account_id}对应{len(result)}条日常数据") + return result + + except Exception as e: + logger.error(f"获取account_id={account_id}的日常数据失败:{str(e)}", exc_info=True) + raise e + + def create_daily_from_linecode( + self, + db: Session, + linecode: str, + account_id: Optional[int] = None + ) -> List[DailyData]: + """ + 通过水准线路编码生成 daily 数据 + + 业务逻辑: + 1. 在水准数据表(level_data)中查找符合 linecode 的记录,且 NYID 最大 + 2. 通过 NYID 查询沉降数据表(settlement_data) + 3. 通过沉降数据的 point_id 查询观测点表(checkpoint),得到 section_id + 4. 通过 section_id 查询断面表(section_data),得到 account_id + 5. 整合这些数据,形成 daily 对象,插入到数据库表 + + Args: + db: 数据库会话 + linecode: 水准线路编码 + account_id: 可选的账户ID筛选条件 + + Returns: + 创建的 DailyData 记录列表 + """ + try: + logger.info(f"开始处理 linecode={linecode} 的 daily 数据生成请求") + + from ..models.level_data import LevelData + from ..models.settlement_data import SettlementData + from ..models.checkpoint import Checkpoint + from ..models.section_data import SectionData + + # 1. 在水准数据表中查找符合 linecode 的记录,且 NYID 最大 + level_data_list = db.query(LevelData)\ + .filter(LevelData.linecode == linecode)\ + .all() + + if not level_data_list: + raise ValueError(f"未找到 linecode={linecode} 对应的水准数据") + + # 找到 NYID 最大的记录(将 String 转换为数字进行比较) + max_nyid = max(level_data_list, key=lambda x: int(x.NYID) if x.NYID.isdigit() else 0) + target_nyid = max_nyid.NYID + logger.info(f"找到最大 NYID: {target_nyid}") + + # 2. 通过 NYID 查询沉降数据 + settlement_data_list = db.query(SettlementData)\ + .filter(SettlementData.NYID == target_nyid)\ + .all() + + if not settlement_data_list: + raise ValueError(f"未找到 NYID={target_nyid} 对应的沉降数据") + + # 3. 遍历沉降数据,构建 daily 记录 + daily_records = [] + for settlement in settlement_data_list: + # 3.1 通过沉降数据的 point_id 查询观测点表,得到 section_id + checkpoint = db.query(Checkpoint)\ + .filter(Checkpoint.point_id == settlement.point_id)\ + .first() + if not checkpoint: + logger.warning(f"未找到 point_id={settlement.point_id} 对应的观测点,跳过该记录") + continue + + # 3.2 通过 section_id 查询断面表,得到 account_id + section = db.query(SectionData)\ + .filter(SectionData.section_id == checkpoint.section_id)\ + .first() + if not section: + logger.warning(f"未找到 section_id={checkpoint.section_id} 对应的断面,跳过该记录") + continue + + # 3.3 从断面数据中获取 account_id,作为 user_id + user_id = int(section.account_id) if section.account_id else None + if not user_id: + logger.warning(f"断面 section_id={checkpoint.section_id} 没有 account_id,跳过该记录") + continue + + # 如果提供了 account_id 筛选条件,则只处理匹配的记录 + if account_id is not None and user_id != account_id: + continue + + # 3.4 构建 daily 记录 + daily_record = DailyData( + user_id=user_id, + account_id=user_id, + point_id=settlement.point_id, + NYID=settlement.NYID, + linecode=linecode, + section_id=checkpoint.section_id, + remaining=0, + is_all=0 + ) + daily_records.append(daily_record) + + if not daily_records: + logger.warning(f"没有生成任何 daily 记录") + return [] + + # 4. 批量插入 daily 记录 + created_records = self.batch_create_by_account_nyid(db, daily_records) + + logger.info(f"成功生成 {len(created_records)} 条 daily 记录") + return created_records + + except ValueError: + raise + except Exception as e: + logger.error(f"生成 daily 数据失败:{str(e)}", exc_info=True) raise \ No newline at end of file diff --git a/deploy.sh b/deploy.sh index fb764b6..9262031 100644 --- a/deploy.sh +++ b/deploy.sh @@ -79,4 +79,4 @@ echo "当前运行的Docker容器:" echo "$SUDO_PASSWORD" | sudo -S docker ps echo "" -echo "=== 部署完成 ===" \ No newline at end of file +echo "=== 部署完成 ===" diff --git a/fix_mysql.sh b/fix_mysql.sh new file mode 100644 index 0000000..c1692ed --- /dev/null +++ b/fix_mysql.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +echo "=== 修复 MySQL 监听配置 ===" + +# 1. 备份配置文件 +sudo cp /etc/mysql/mysql.conf.d/mysqld.cnf /etc/mysql/mysql.conf.d/mysqld.cnf.backup.$(date +%Y%m%d_%H%M%S) +echo "✅ 已备份配置文件" + +# 2. 修改 bind-address +sudo sed -i 's/^bind-address.*127.0.0.1/bind-address = 0.0.0.0/' /etc/mysql/mysql.conf.d/mysqld.cnf +echo "✅ 已修改 bind-address" + +# 3. 显示修改后的配置 +echo "" +echo "修改后的配置:" +sudo grep "bind-address" /etc/mysql/mysql.conf.d/mysqld.cnf +echo "" + +# 4. 重启 MySQL +echo "正在重启 MySQL..." +sudo systemctl restart mysql +sleep 2 + +# 5. 验证 +echo "" +echo "验证 MySQL 监听状态:" +sudo netstat -tlnp | grep 3306 +echo "" + +# 6. 测试连接 +DOCKER0_IP=$(ip addr show docker0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}') +echo "测试从本机连接 MySQL (IP: $DOCKER0_IP):" +mysql -h $DOCKER0_IP -u railway -p'Railway01.' -e "SELECT 'Connection OK' as status, DATABASE() as current_db, VERSION() as version;" 2>&1 + +echo "" +echo "=== 修复完成 ===" diff --git a/test_connection.sh b/test_connection.sh new file mode 100644 index 0000000..9b84346 --- /dev/null +++ b/test_connection.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +echo "=== Docker 网络诊断 ===" +echo "" + +echo "1. Docker0 网桥 IP:" +ip addr show docker0 2>/dev/null | grep -oP '(?<=inet\s)\d+(\.\d+){3}' || echo "❌ docker0 不存在" +echo "" + +echo "2. MySQL 监听状态:" +sudo netstat -tlnp | grep 3306 || echo "❌ MySQL 未运行或未监听 3306" +echo "" + +echo "3. MySQL 用户权限:" +mysql -u root -p -e "SELECT user, host FROM mysql.user WHERE user='railway';" 2>/dev/null || echo "❌ 无法查询(需要 root 密码)" +echo "" + +echo "4. 测试从容器连接 MySQL:" +DOCKER0_IP=$(ip addr show docker0 | grep -oP '(?<=inet\s)\d+(\.\d+){3}') +if [ ! -z "$DOCKER0_IP" ]; then + docker run --rm mysql:8.0 mysql -h $DOCKER0_IP -u railway -p'Railway01.' -e "SELECT 'OK' as status;" 2>&1 +else + echo "❌ 无法获取 docker0 IP" +fi + +echo "" +echo "=== 诊断完成 ==="