上次发布《多种方式访问mysql的对比分析》一文后,有读者留言,说SQLAlchemy的使用方法没讲清楚,只有一段简短的介绍,演示代码也比较模糊,SQLAlchemy在实际项目运用非常广泛,由于其支持 ORM 模型,能够将表映射为类,让你用Python类的方式来操作数据库,而不需要直接写SQL语句,是面向对象访问数据库不可或缺的一个库,根据需求设计数据库结构和操作逻辑,可以大大提高开发效率和代码可维护性。今天风云将该库的详细使用方法重新整理了,发出来,供大家参考,欢迎留言讨论。

为什么还要用SQLAlchemy呢?原因有几个:
1.简洁:SQLAlchemy让你用Python代码来操作数据库,代码更简洁,也更容易理解。
2.安全:它能帮助你避免SQL注入等安全问题,让你的数据库更安全。
3.灵活:SQLAlchemy支持多种数据库(比如MySQL、PostgreSQL、SQLite等),你只需要改一下配置,就能在不同的数据库之间切换。

SQLAlchemy 包括两个核心组件:

  • SQLAlchemy Core提供底层 SQL 构造和数据库连接。
  • SQLAlchemy ORM:实现面向对象的 ORM 映射。

1. 安装 SQLAlchemy

使用 pip 安装:

pip install sqlalchemy

2. 数据库连接

使用 create_engine 创建数据库连接:

from sqlalchemy import create_engine

# 替换为实际的数据库 URL

engine = create_engine('sqlite:///example.db')  # SQLite 示例

# MySQL 示例: 'mysql+pymysql://user:password@host/dbname'

3. 使用 SQLAlchemy Core

3.1 创建表

使用 MetaData 和 Table 定义表结构:

from sqlalchemy import Table, Column, Integer, String, MetaData,create_engine

engine = create_engine('sqlite:///example.db')  # SQLite 示例数据库
metadata = MetaData() # 元数据对象

users = Table(  # 创建表
    'users', metadata,  # 表名

    Column('id', Integer, primary_key=True),  # id字段

    Column('name', String(50)),  # name字段

    Column('age', Integer) # age字段
)

metadata.create_all(engine)  # 创建表

3.2 插入数据

使用 insert() 插入数据:

from sqlalchemy import insert

with engine.connect() as conn:  # 连接数据库
    stmt = insert(users).values(name='Alice', age=30)  # 插入数据

    conn.execute(stmt)  # 执行插入语句

3.3 查询数据

使用 select() 查询:

from sqlalchemy import select

with engine.connect() as conn:  # 连接数据库
    stmt = select(users)  # 查询数据

    result = conn.execute(stmt)  # 执行查询语句

    for row in result:  # 遍历结果
        print(row)

4. 使用 SQLAlchemy ORM

4.1 定义模型

使用 ORM 的 declarative_base 定义模型:

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()  # 创建基类

class User(Base):  # 创建User类
    __tablename__ = 'users'  # 表名

    id = Column(Integer, primary_key=True)  # id字段

    name = Column(String(50)) # name字段

    age = Column(Integer) # age字段

4.2 创建表

创建表结构:

Base.metadata.create_all(engine)

4.3 创建会话

使用 sessionmaker 创建会话:

from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine) # 创建Session会话类
session = Session()

4.4 增删改查

插入数据

new_user = User(name='Bob', age=25) # 创建新用户

session.add(new_user)# 添加新用户到session

session.commit() # 提交事务

查询数据

users = session.query(User).all()  # 查询所有用户
for user in users:  # 遍历查询结果
    print(user.name, user.age)

更新数据

user = session.query(User).filter_by(name='Bob').first() # 查询用户
user.age = 26

session.commit() # 提交事务

删除数据

user = session.query(User).filter_by(name='Bob').first() # 查询具体用户,定位到第一条记录

session.delete(user) # 删除用户
session.commit() # 提交事务

5. 使用关系映射

定义一对多关系:

from sqlalchemy import ForeignKey

from sqlalchemy.orm import relationship

class Post(Base):
    __tablename__ = 'posts'  # 表名

    id = Column(Integer, primary_key=True) # id字段

    title = Column(String(100)) # title字段

    user_id = Column(Integer, ForeignKey('users.id')) # 定义外键

    user = relationship('User', back_populates='posts') # 关联关系

User.posts = relationship('Post', order_by=Post.id, back_populates='user') # 关联关系

# 插入关联数据:
user = User(name='Charlie')

post1 = Post(title='Post 1', user=user)

post2 = Post(title='Post 2', user=user)


session.add(user) # 添加关联数据

session.add_all([post1, post2]) # 添加所有关联

session.commit() # 提交事务

#查询关联数据:
user = session.query(User).filter_by(name='Charlie').first() # 查询关联数据

for post in user.posts: # 遍历关联数据
    print(post.title)

6. 使用事务

手动管理事务:

from sqlalchemy.exc import SQLAlchemyError

try:
    with session.begin():  # 开启事务

        user = User(name='Dave', age=40) # 创建新用户

        session.add(user) # 添加新用户到session
except SQLAlchemyError as e: # 如果发生异常,回滚事务

    print(f"Transaction failed: {e}")

    session.rollback() # 回滚事务

7. 完整的封装类

最后,风云按惯例将此使用封装为一个完整的类,大家有需要有的自取

from sqlalchemy import create_engine, Column, Integer, String, exc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker


Base = declarative_base()  # 创建基类

# 定义表模型
class User(Base):
    __tablename__ = 'users' # 表名

    id = Column(Integer, primary_key=True, autoincrement=True) # id字段

    name = Column(String(50), nullable=False) # name字段

    age = Column(Integer, nullable=False) # age字段

class MySQLHandler:
    def __init__(self, user, password, host, database):
        """初始化数据库连接和会话"""
        try:
            url = f'mysql+pymysql://{user}:{password}@{host}/{database}' # 创建数据库连接

            self.engine = create_engine(url, echo=False) # 数据库引擎

            Base.metadata.create_all(self.engine)  # 创建表

            self.Session = sessionmaker(bind=self.engine) # 创建会话

        except exc.SQLAlchemyError as e:

            print(f"Error initializing database: {e}")


    def add_user(self, name, age):
        """添加用户"""
        session = self.Session() 

        try:
            new_user = User(name=name, age=age) # 创建新用户

            session.add(new_user) # 添加新用户到session
            session.commit() # 提交事务

            print(f"User {name} added successfully!")

        except exc.SQLAlchemyError as e: # 如果发生异常,回滚事务
            session.rollback() # 执行回滚

            print(f"Error adding user: {e}")

        finally:
            session.close() # 关闭会话


    def get_users(self):
        """查询所有用户"""
        session = self.Session()

        try:
            users = session.query(User).all() # 查询所有用户

            return [{"id": user.id, "name": user.name, "age": user.age} for user in users]

        except exc.SQLAlchemyError as e: # 如果发生异常,返回空
            print(f"Error fetching users: {e}")

            return []

        finally:
            session.close()


    def update_user(self, user_id, name=None, age=None):
        """更新用户信息"""
        session = self.Session()

        try:
            user = session.query(User).filter_by(id=user_id).first() # 查询用户

            if not user: # 如果用户不存在,返回

                print(f"User with ID {user_id} not found.")

                return

            if name:
                user.name = name

            if age:
                user.age = age

            session.commit() # 提交事务

            print(f"User {user_id} updated successfully!")

        except exc.SQLAlchemyError as e: # 如果发生异常,回滚事务
            session.rollback()

            print(f"Error updating user: {e}")

        finally:
            session.close()


    def delete_user(self, user_id):
        """删除用户"""
        session = self.Session()

        try:
            user = session.query(User).filter_by(id=user_id).first() # 查询用户

            if not user: # 如果用户不存在,返回
                print(f"User with ID {user_id} not found.")

                return

            session.delete(user) # 删除用户
            session.commit() # 提交事务

            print(f"User {user_id} deleted successfully!")

        except exc.SQLAlchemyError as e: # 如果发生异常,回滚事务
            session.rollback()

            print(f"Error deleting user: {e}")

        finally:
            session.close()


    def transaction_example(self):
        """事务操作示例"""
        session = self.Session()

        try:
            user1 = User(name="Alice", age=30) # 创建2个新用户
            user2 = User(name="Bob", age=25) 

            # 添加多个用户
            session.add(user1) # 添加新用户到session
            session.add(user2)

            # 模拟事务中的错误
            # Uncomment the line below to raise an exception and trigger a rollback

            # raise ValueError("Simulated error!")

            session.commit() # 提交事务

            print("Transaction completed successfully!")

        except (exc.SQLAlchemyError, ValueError) as e: # 如果发生异常,回滚事务
            session.rollback()

            print(f"Transaction failed: {e}")

        finally:
            session.close()


# 使用示例
if __name__ == "__main__":
    # 初始化数据库连接
    db_handler = MySQLHandler(user='root', password='password123', host='localhost', database='test_db')

    # 添加用户
    db_handler.add_user(name="John Doe", age=28)

    # 查询用户
    users = db_handler.get_users()
    print("Users:", users)

    # 更新用户
    db_handler.update_user(user_id=1, name="Jane Doe", age=32)

    # 删除用户
    db_handler.delete_user(user_id=1)

    # 事务示例
    db_handler.transaction_example()

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐