Files
railway_cloud/app/services/database.py
2025-09-26 15:58:32 +08:00

195 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from sqlalchemy.orm import Session
from sqlalchemy import text, MetaData, Table, Column, create_engine, inspect
from sqlalchemy.exc import SQLAlchemyError
from typing import List, Dict, Any, Optional
from ..core.database import engine
import pandas as pd
class DatabaseService:
@staticmethod
def execute_sql(db: Session, sql: str) -> Dict[str, Any]:
"""执行SQL语句"""
try:
result = db.execute(text(sql))
# 判断是否为SELECT查询
if sql.strip().upper().startswith('SELECT'):
data = []
columns = result.keys()
for row in result:
data.append(dict(zip(columns, row)))
return {
"success": True,
"message": "查询成功",
"data": data
}
else:
# 非SELECT语句提交事务
db.commit()
return {
"success": True,
"message": "执行成功",
"rows_affected": result.rowcount
}
except SQLAlchemyError as e:
db.rollback()
return {
"success": False,
"message": f"SQL执行失败: {str(e)}"
}
except Exception as e:
db.rollback()
return {
"success": False,
"message": f"未知错误: {str(e)}"
}
@staticmethod
def get_table_data(db: Session, table_name: str, limit: int = 100, offset: int = 0) -> Dict[str, Any]:
"""获取表数据"""
try:
# 先检查表是否存在
inspector = inspect(engine)
if table_name not in inspector.get_table_names():
return {
"success": False,
"message": f"{table_name} 不存在"
}
# 获取总数
count_sql = f"SELECT COUNT(*) as total FROM {table_name}"
count_result = db.execute(text(count_sql)).fetchone()
total_count = count_result.total if count_result else 0
# 获取数据
sql = f"SELECT * FROM {table_name} LIMIT {limit} OFFSET {offset}"
result = db.execute(text(sql))
data = []
columns = result.keys()
for row in result:
data.append(dict(zip(columns, row)))
return {
"success": True,
"message": "获取数据成功",
"data": data,
"total_count": total_count
}
except SQLAlchemyError as e:
return {
"success": False,
"message": f"获取表数据失败: {str(e)}"
}
except Exception as e:
return {
"success": False,
"message": f"未知错误: {str(e)}"
}
@staticmethod
def create_table(db: Session, table_name: str, columns: Dict[str, str], primary_key: Optional[str] = None) -> Dict[str, Any]:
"""创建表"""
try:
# 构建CREATE TABLE语句
column_definitions = []
for col_name, col_type in columns.items():
column_definitions.append(f"{col_name} {col_type}")
if primary_key:
column_definitions.append(f"PRIMARY KEY ({primary_key})")
sql = f"CREATE TABLE {table_name} ({', '.join(column_definitions)})"
db.execute(text(sql))
db.commit()
return {
"success": True,
"message": f"{table_name} 创建成功"
}
except SQLAlchemyError as e:
db.rollback()
return {
"success": False,
"message": f"创建表失败: {str(e)}"
}
except Exception as e:
db.rollback()
return {
"success": False,
"message": f"未知错误: {str(e)}"
}
@staticmethod
def drop_table(db: Session, table_name: str) -> Dict[str, Any]:
"""删除表"""
try:
sql = f"DROP TABLE IF EXISTS {table_name}"
db.execute(text(sql))
db.commit()
return {
"success": True,
"message": f"{table_name} 删除成功"
}
except SQLAlchemyError as e:
db.rollback()
return {
"success": False,
"message": f"删除表失败: {str(e)}"
}
except Exception as e:
db.rollback()
return {
"success": False,
"message": f"未知错误: {str(e)}"
}
@staticmethod
def import_data(db: Session, table_name: str, data: List[Dict[str, Any]]) -> Dict[str, Any]:
"""导入数据到表"""
try:
if not data:
return {
"success": False,
"message": "导入数据不能为空"
}
# 使用pandas DataFrame来处理数据导入
df = pd.DataFrame(data)
# 使用pandas的to_sql方法
df.to_sql(table_name, engine, if_exists='append', index=False)
return {
"success": True,
"message": f"成功导入 {len(data)} 条数据到表 {table_name}"
}
except SQLAlchemyError as e:
db.rollback()
return {
"success": False,
"message": f"导入数据失败: {str(e)}"
}
except Exception as e:
db.rollback()
return {
"success": False,
"message": f"未知错误: {str(e)}"
}
@staticmethod
def get_table_list() -> List[str]:
"""获取所有表名"""
try:
inspector = inspect(engine)
return inspector.get_table_names()
except Exception as e:
return []