python深入SQLAlchemy使用详解
上次发布《多种方式访问mysql的对比分析》一文后,有读者留言,说SQLAlchemy的使用方法没讲清楚,只有一段简短的介绍,演示代码也比较模糊,SQLAlchemy在实际项目运用非常广泛,由于其支持 ORM 模型,能够将表映射为类,让你用Python类的方式来操作数据库,而不需要直接写SQL语句,是面向对象访问数据库不可或缺的一个库,根据需求设计数据库结构和操作逻辑,可以大大提高开发效率和代码可
上次发布《多种方式访问mysql的对比分析》一文后,有读者留言,说SQLAlchemy的使用方法没讲清楚,只有一段简短的介绍,演示代码也比较模糊,SQLAlchemy在实际项目运用非常广泛,由于其支持 ORM 模型,能够将表映射为类,让你用Python类的方式来操作数据库,而不需要直接写SQL语句,是面向对象访问数据库不可或缺的一个库,根据需求设计数据库结构和操作逻辑,可以大大提高开发效率和代码可维护性。今天风云将该库的详细使用方法重新整理了,发出来,供大家参考,欢迎留言讨论。
为什么还要用SQLAlchemy呢?原因有几个:
1.简洁:SQLAlchemy让你用Python代码来操作数据库,代码更简洁,也更容易理解。
2.安全:它能帮助你避免SQL注入等安全问题,让你的数据库更安全。
3.灵活:SQLAlchemy支持多种数据库(比如MySQL、PostgreSQL、SQLite等),你只需要改一下配置,就能在不同的数据库之间切换。
SQLAlchemy 包括两个核心组件:
- SQLAlchemy Core:提供底层 SQL 构造和数据库连接。
- SQLAlchemy ORM:实现面向对象的 ORM 映射。
1. 安装 SQLAlchemy
使用 pip 安装:
pip install sqlalchemy
2. 数据库连接
使用 create_engine 创建数据库连接:
from sqlalchemy import create_engine
# 替换为实际的数据库 URL
engine = create_engine('sqlite:///example.db') # SQLite 示例
# MySQL 示例: 'mysql+pymysql://user:password@host/dbname'
3. 使用 SQLAlchemy Core
3.1 创建表
使用 MetaData 和 Table 定义表结构:
from sqlalchemy import Table, Column, Integer, String, MetaData,create_engine
engine = create_engine('sqlite:///example.db') # SQLite 示例数据库
metadata = MetaData() # 元数据对象
users = Table( # 创建表
'users', metadata, # 表名
Column('id', Integer, primary_key=True), # id字段
Column('name', String(50)), # name字段
Column('age', Integer) # age字段
)
metadata.create_all(engine) # 创建表
3.2 插入数据
使用 insert() 插入数据:
from sqlalchemy import insert
with engine.connect() as conn: # 连接数据库
stmt = insert(users).values(name='Alice', age=30) # 插入数据
conn.execute(stmt) # 执行插入语句
3.3 查询数据
使用 select() 查询:
from sqlalchemy import select
with engine.connect() as conn: # 连接数据库
stmt = select(users) # 查询数据
result = conn.execute(stmt) # 执行查询语句
for row in result: # 遍历结果
print(row)
4. 使用 SQLAlchemy ORM
4.1 定义模型
使用 ORM 的 declarative_base 定义模型:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
Base = declarative_base() # 创建基类
class User(Base): # 创建User类
__tablename__ = 'users' # 表名
id = Column(Integer, primary_key=True) # id字段
name = Column(String(50)) # name字段
age = Column(Integer) # age字段
4.2 创建表
创建表结构:
Base.metadata.create_all(engine)
4.3 创建会话
使用 sessionmaker 创建会话:
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine) # 创建Session会话类
session = Session()
4.4 增删改查
插入数据
new_user = User(name='Bob', age=25) # 创建新用户
session.add(new_user)# 添加新用户到session
session.commit() # 提交事务
查询数据
users = session.query(User).all() # 查询所有用户
for user in users: # 遍历查询结果
print(user.name, user.age)
更新数据
user = session.query(User).filter_by(name='Bob').first() # 查询用户
user.age = 26
session.commit() # 提交事务
删除数据
user = session.query(User).filter_by(name='Bob').first() # 查询具体用户,定位到第一条记录
session.delete(user) # 删除用户
session.commit() # 提交事务
5. 使用关系映射
定义一对多关系:
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Post(Base):
__tablename__ = 'posts' # 表名
id = Column(Integer, primary_key=True) # id字段
title = Column(String(100)) # title字段
user_id = Column(Integer, ForeignKey('users.id')) # 定义外键
user = relationship('User', back_populates='posts') # 关联关系
User.posts = relationship('Post', order_by=Post.id, back_populates='user') # 关联关系
# 插入关联数据:
user = User(name='Charlie')
post1 = Post(title='Post 1', user=user)
post2 = Post(title='Post 2', user=user)
session.add(user) # 添加关联数据
session.add_all([post1, post2]) # 添加所有关联
session.commit() # 提交事务
#查询关联数据:
user = session.query(User).filter_by(name='Charlie').first() # 查询关联数据
for post in user.posts: # 遍历关联数据
print(post.title)
6. 使用事务
手动管理事务:
from sqlalchemy.exc import SQLAlchemyError
try:
with session.begin(): # 开启事务
user = User(name='Dave', age=40) # 创建新用户
session.add(user) # 添加新用户到session
except SQLAlchemyError as e: # 如果发生异常,回滚事务
print(f"Transaction failed: {e}")
session.rollback() # 回滚事务
7. 完整的封装类
最后,风云按惯例将此使用封装为一个完整的类,大家有需要有的自取
from sqlalchemy import create_engine, Column, Integer, String, exc
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base() # 创建基类
# 定义表模型
class User(Base):
__tablename__ = 'users' # 表名
id = Column(Integer, primary_key=True, autoincrement=True) # id字段
name = Column(String(50), nullable=False) # name字段
age = Column(Integer, nullable=False) # age字段
class MySQLHandler:
def __init__(self, user, password, host, database):
"""初始化数据库连接和会话"""
try:
url = f'mysql+pymysql://{user}:{password}@{host}/{database}' # 创建数据库连接
self.engine = create_engine(url, echo=False) # 数据库引擎
Base.metadata.create_all(self.engine) # 创建表
self.Session = sessionmaker(bind=self.engine) # 创建会话
except exc.SQLAlchemyError as e:
print(f"Error initializing database: {e}")
def add_user(self, name, age):
"""添加用户"""
session = self.Session()
try:
new_user = User(name=name, age=age) # 创建新用户
session.add(new_user) # 添加新用户到session
session.commit() # 提交事务
print(f"User {name} added successfully!")
except exc.SQLAlchemyError as e: # 如果发生异常,回滚事务
session.rollback() # 执行回滚
print(f"Error adding user: {e}")
finally:
session.close() # 关闭会话
def get_users(self):
"""查询所有用户"""
session = self.Session()
try:
users = session.query(User).all() # 查询所有用户
return [{"id": user.id, "name": user.name, "age": user.age} for user in users]
except exc.SQLAlchemyError as e: # 如果发生异常,返回空
print(f"Error fetching users: {e}")
return []
finally:
session.close()
def update_user(self, user_id, name=None, age=None):
"""更新用户信息"""
session = self.Session()
try:
user = session.query(User).filter_by(id=user_id).first() # 查询用户
if not user: # 如果用户不存在,返回
print(f"User with ID {user_id} not found.")
return
if name:
user.name = name
if age:
user.age = age
session.commit() # 提交事务
print(f"User {user_id} updated successfully!")
except exc.SQLAlchemyError as e: # 如果发生异常,回滚事务
session.rollback()
print(f"Error updating user: {e}")
finally:
session.close()
def delete_user(self, user_id):
"""删除用户"""
session = self.Session()
try:
user = session.query(User).filter_by(id=user_id).first() # 查询用户
if not user: # 如果用户不存在,返回
print(f"User with ID {user_id} not found.")
return
session.delete(user) # 删除用户
session.commit() # 提交事务
print(f"User {user_id} deleted successfully!")
except exc.SQLAlchemyError as e: # 如果发生异常,回滚事务
session.rollback()
print(f"Error deleting user: {e}")
finally:
session.close()
def transaction_example(self):
"""事务操作示例"""
session = self.Session()
try:
user1 = User(name="Alice", age=30) # 创建2个新用户
user2 = User(name="Bob", age=25)
# 添加多个用户
session.add(user1) # 添加新用户到session
session.add(user2)
# 模拟事务中的错误
# Uncomment the line below to raise an exception and trigger a rollback
# raise ValueError("Simulated error!")
session.commit() # 提交事务
print("Transaction completed successfully!")
except (exc.SQLAlchemyError, ValueError) as e: # 如果发生异常,回滚事务
session.rollback()
print(f"Transaction failed: {e}")
finally:
session.close()
# 使用示例
if __name__ == "__main__":
# 初始化数据库连接
db_handler = MySQLHandler(user='root', password='password123', host='localhost', database='test_db')
# 添加用户
db_handler.add_user(name="John Doe", age=28)
# 查询用户
users = db_handler.get_users()
print("Users:", users)
# 更新用户
db_handler.update_user(user_id=1, name="Jane Doe", age=32)
# 删除用户
db_handler.delete_user(user_id=1)
# 事务示例
db_handler.transaction_example()
更多推荐
所有评论(0)