Python SQLAlchemy详细使用指南
目录
1、SQLAlchemy
1.1、ORM概述
1.2、SQLAlchemy概述
1.3、SQLAlchemy的组成部分
1.4、SQLAlchemy的使用
1.4.1、安装
1.4.2、创建数据库连接
1.4.3、执行原生SQL语句
1.4.4、映射已存在的表
1.4.5、创建表
1.4.5.1、创建表的两种方式
1、使用 Table 类直接创建表
2、使用声明式方式创建模型类
1.4.5.2、约束
1、创建约束示列
2、外键约束
3、删除/更新行为
1.4.5.3、多表关系
2、一对一关系
3、一对多/多对一关系
4、多对多关系
1.4.5.4、scoped_session实现线程安全
1.4.5.5、新增数据
1.4.5.6、修改数据
1.4.5.7、删除数据
1.4.5.8、查询数据
1、测试数据准备
2、基础查询
2.1、查询多个字段(查询指定字段)
2.2、去除重复记录
2.3、调试小技巧
3、条件查询
3.1、常用的比较运算符
3.2、常见的逻辑运算符
3.3、综合示例
4、聚合函数
4.1、常见的聚合函数
4.2、综合示列
5、分组查询
5.1、综合示例
6、排序查询
6.1、排序方式
7、 分页查询
7.1、综合示例
1.4.5.9、多表查询
1、多表查询概述
1.1、测试数据准备
1.2、概述
1.3、多表查询的分类
2、内连接
3、外连接
4、自连接查询
5、联合查询
6、子查询
6.1、概述
6.2、标量子查询
6.3、列子查询
6.4、行子查询
6.5、表子查询
1、SQLAlchemy
1.1、ORM概述
定义:ORM(Object-Relational Mapping)模型,即对象关系映射,是一种程序设计技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换。在面向对象的编程语言中,如Java、Python等,数据通常被组织成复杂的对象关系。简单来说就是ORM模型把面向对象编程与操作数据库之间建立了映射。设置开发者操作数据库无需维护和编写SQL语句,而是基于面对对象的方式操作数据库。
映射关系:数据库中的表>编程语言中的类,表中的字段>类中的属性,表之间的关系>类之间的关系。
使用ORM模型的优势在于:
当然ORM也存在一下劣势:
1.2、SQLAlchemy概述
在Python语言中实现ORM系统的就是SQLAlchemy,它具备以下特点:
然而,ORM模型也存在一些缺点:
注意:在处理复杂的SQL查询时,由于ORM框架效率低下,所以这个时候可以编写SQL语句执行原生SQL语句。
1.3、SQLAlchemy的组成部分
1、核心架构(Core):
2、ORM架构:
4、数据库连接池(Connection Pooling):管理数据库连接的池化,确保高效的数据库连接复用。
5、Dialect:选择连接数据库的DB API种类,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作。
6、架构和类型(Schema/Types):定义数据库的架构和数据类型。
7、SQL表达式语言(SQL Expression Language):
1.4、SQLAlchemy的使用
SQLAlchemy官方文档:Dialects — SQLAlchemy 2.0 Documentation
1.4.1、安装
pip install sqlalchemy
1.4.2、创建数据库连接
注意:sqlalchemy没有提供直接连接数据库的操作,所以需要借助第三方库来连接数据库,操作数据库。以 MySQL 为例,sqlalchemy就是借助pymsql库来实现对数据的连接和操作。
连接不同/相同的数据库借助不同的第三方库如下:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value…]
创建连接:
from sqlalchemy import create_engine
from urllib import parse
user = "root" # 用户名
password = "" # 密码
pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
host = "172.22.70.174" # 数据库主机地址
# 第一步: 创建engine
engine = create_engine(
url=f"mysql+pymysql://{user}:{pwd}@{host}:3306/test?charset=utf8",
max_overflow=10, # 超过连接池大小外最多创建的连接
pool_size=10, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 对线程池中的线程进行一次连接的回收的时间,如果是3600,表示1个小时后对连接进行回收
)
1.4.3、执行原生SQL语句
需求:查询表t_student的全部数据,执行的SQL语句是:select * from t_student
from sqlalchemy import create_engine
from urllib import parse
import threading
user = "root" # 用户名
password = "" # 密码
pwd = parse.quote_plus(password) # 解决密码中含@符导致报错
host = "172.22.70.174" # 数据库主机地址
# 第一步: 创建engine
engine = create_engine(
url=f"mysql+pymysql://{user}:{pwd}@{host}:3306/test?charset=utf8",
max_overflow=2, # 超过连接池大小外最多创建的连接
pool_size=3, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 对线程池中的线程进行一次连接的回收的时间,如果是3600,表示1个小时后对连接进行回收
)
# 第二步:使用
def test_execute():
# conn = engine.connect() # 创建一个新的连接
conn = engine.raw_connection() # 从连接池中取一个连接
cursor = conn.cursor() # 创建游标
sql = "select * from t_student" # 定义执行的SQL语句
cursor.execute(sql) # 执行SQL语句
print(cursor.fetchall()) # 获取执行的结果并打印置控制台
# 测试配置是否生效
if __name__ == '__main__':
for i in range(20):
t = threading.Thread(target=test_execute)
t.start()
1.4.4、映射已存在的表
说明:使用ORM映射已存在的表时,只能映射其对应的字段,对于每个字段的约束最好和原表保持一致,映射已存在的表时不能新增字段,新增外键约束,新增索引操作。如果需要进行这些操作可以数据库中执行相关sql语句或者使用SQLAlchemy的迁移工具(如Alembic)
注意:
示例:
需求:创建Student表模型映射数据库中的t_student表
import datetime
from sqlalchemy.orm import declarative_base , sessionmaker
from sqlalchemyBaseUse import engine # 这里的engine就是上面创建连接中创建的engine
from sqlalchemy import Column, Integer, String, Text, DateTime, Index
# 声明ORM基类
Base = declarative_base()
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True)
stuno = Column(String(10), comment="学号")
name = Column(String(10), comment="姓名")
gender = Column(String(1), comment="性别")
age = Column(Integer, comment="年龄")
idcard = Column(String(18), comment="身份证")
entrydate = Column(DateTime, default=datetime.datetime.now, comment="入学时间")
addr = Column(String(50), comment="家庭地址")
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
def drop_db():
# 删除继承base类的表,注意:除了删除表的映射关系,数据库中的表和数据都会被删减,生产中谨慎操作
Base.metadata.drop_all(engine)
if __name__ == '__main__':
init_db()
1.4.5、创建表
1.4.5.1、创建表的两种方式
1、使用 Table 类直接创建表
示例:
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table, CheckConstraint
# 假设已经有了一个引擎和元数据
engine = create_engine('sqlite:///:memory:')
metadata = MetaData()
# 创建一个表,并添加一个检查约束
my_table = Table('my_table', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(50)),
Column('age', Integer),
CheckConstraint("age >= 0 AND age <= 150", name='age_check')
)
# 创建表(包括检查约束)
metadata.create_all(engine)
2、使用声明式方式创建模型类
示例:
from sqlalchemy import Column, Integer, String, CheckConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class MyModel(Base):
__tablename__ = 'my_table'
id = Column(Integer, primary_key=True)
name = Column(String(50))
age = Column(Integer)
__table_args__ = (
CheckConstraint("age >= 0 AND age <= 150", name='age_check'),
)
1.4.5.2、约束
概念:约束是作用于表中字段上的规则,用于限制存储在表中的数据。
目的:保证数据库中数据的正确、有效性和完整性。
分类:
约束 |
描述 |
关键字 |
非空约束 |
限制该字段的数据不能为null |
NOT NULL |
唯一约束 |
保证该字段的所有数据都是唯一、不重复的 |
UNIQUE |
主键约束 |
主键是一行数据的唯一标识,要求非空且唯一 |
PRIMARY KEY |
默认约束 |
保存数据时,如果未指定该字段的值,则采用默认值 |
DEFAULT |
检查约束(8.0.16版本之后) |
保证字段值满足某一个条件 |
CHECK |
外键约束 |
用来让两张表的数据之间建立连接,保证数据的一致性和完整性 |
FOREIGN KEY |
注意:约束是作用于表中字段上的,可以在创建表/修改表的时候添加约束。
1、创建约束示列
需求如下:
要求创建一张名为t_user的数据表,各个字段的规则如下:
字段名 |
字段含义 |
字段类型 |
约束条件 |
约束关键字 |
id |
ID唯一标识 |
int |
主键,并且自动增长 |
PRIMARY KEY,AUTO_INCREMENT |
name |
姓名 |
varchar(10) |
不为空,并且唯一 |
NOT NULL , UNIQUE |
age |
年龄 |
int |
大于0,并且小于等于150 |
CHECK |
status |
状态 |
char(1) |
如果没有指定该值,默认为1 |
DEFAULT |
gender |
性别 |
char(1) |
无 |
from sqlalchemy.orm import declarative_base , sessionmaker
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, CheckConstraint
# 声明ORM基类
Base = declarative_base()
class UserModel(Base):
__tablename__ = "t_user"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=True, unique=True, comment="姓名")
age = Column(Integer, comment="年龄")
status = Column(String(1), default=1, comment="状态")
gender = Column(String(1), comment="性别")
# 添加age的检查约束
# 注意__table_args__是元祖数据类型,如果只有一个数据的时候,注意后面的逗号不能少
__table_args__ = (
CheckConstraint("age >0 AND age <= 150", name="age_check"),
)
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
2、外键约束
作用:外键用来让两张表的数据之间建立连接,从而保证数据的一致性和完整性。
mysql 语法:
1、添加外键
CREATE TABLE 表名(
字段名 数据类型,
…
[CONSTRAINT] [外键名称] FOREIGN KEY (外键字段名) REFERENCES 主表 (主表列名)
);
或
ALTER TABLE 表名 ADD CONSTRAINT 外键名称 FOREIGN KEY (外键字段名)REFERENCES 主表 (主表列名) ;2、删除外键
ALTER TABLE 表名 DROP FOREIGN KEY 外键名称;
在 SQLAlchemy 中,创建外键约束通常不需要直接使用 __table_args__,因为 SQLAlchemy 提供了 ForeignKey 和 relationship 这两个工具来定义关系和外键约束。
示例:
需求:创建一张部门表和员工表,建立外键约束关系一个员工对应一个部门。
部门表 t_departments
字段名 |
字段含义 |
字段类型 |
约束条件 |
约束关键字 |
id |
ID唯一标识 |
int |
主键,并且自动增长 |
PRIMARY KEY,AUTO_INCREMENT |
name |
部门名称 |
varchar(50) |
非空约束 |
NOT NULL |
员工表 t_employees
字段名 |
字段含义 |
字段类型 |
约束条件 |
约束关键字 |
id |
ID唯一标识 |
int |
主键,并且自动增长 |
PRIMARY KEY,AUTO_INCREMENT |
name |
姓名 |
varchar(50) |
非空约束 |
NOT NULL |
age |
年龄 |
int |
||
dept_id |
部门id |
int |
外键约束 |
FOREIGN KEY |
1、创建ORM表模型如下:
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index, CheckConstraint
# 声明ORM基类
Base = declarative_base()
# 创建部门表
class Department(Base):
__tablename__ = 't_departments'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="部门名称")
# 定义与 Employee 的一对多关系
employees = relationship("Employee", back_populates="department")
# 创建员工表
class Employee(Base):
__tablename__ = 't_employees'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")
age = Column(Integer, comment="年龄")
department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)
# 定义与 Department 的多对一关系
department = relationship("Department", back_populates="employees")
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
2、往数据库中插入测试数据如下:
# 部门表测试数据
INSERT INTO t_departments (id, name) VALUES (1, '研发部'), (2, '市场部'),(3, '财务部');# 员工表测试数据
INSERT INTO t_employees (id, name, age, department_id)
VALUES (1, '张三', 22, 1), (2, '李四', 33, 1), (3, 'TOM', 30, 2), (4, '小艺', 25, 2), (5, '小李', 36, 3);
3、展示通过关联关系查询,如:查询员工姓名为张三对应的部门信息
def query():
Session = sessionmaker(bind=engine)
session = Session()
# 查询姓名为张三的员工
employee = session.query(Employee).filter_by(name="张三").first()
# 如果找到了员工,则获取其部门信息
if employee:
department = employee.department
print(f"员工姓名: {employee.name}, 部门名称: {department.name}")
else:
print("没有找到姓名为'张三'的员工")
if __name__ == '__main__':
# init_db()
query()
4、展示通过关联关系查询,如:查询研发部所有员工信息
def query2():
Session = sessionmaker(bind=engine)
session = Session()
# 直接通过部门对象获取其员工
department = session.query(Department).filter_by(name='研发部').first()
if department:
for employee in department.employees: # 使用了Department 模型中有一个名为 'employees' 的反向关联
print(f"员工姓名: {employee.name}, 员工ID: {employee.id}")
if __name__ == '__main__':
# init_db()
query2()
还是上面4的示例,如果没有使用反向关联,在正常情况下我们应该如何查询呢?
思考:如果正常在数据库中查询时,利用的外键关系进行查询,可以使用内连接查询,对应的SQL语句如下:
select e.*, d.name from t_departments d inner join t_employees e on e.department_id=d.id where d.name='研发部';
那么把SQL语句对应到ORM中就是(使用子查询):
def query3():
Session = sessionmaker(bind=engine)
session = Session()
# 1、先查询研发部信息
department = session.query(Department).filter_by(name='研发部').first()
# 如果没有找到研发部,则退出查询
if not department:
print("没有找到研发部")
session.close()
exit()
# 2、根据研发部的部门id查询所有员工
employees = session.query(Employee).filter_by(department_id=department.id).all()
# 打印研发部所有员工的信息
for employee in employees:
print(f"员工姓名: {employee.name}, 员工ID: {employee.id}")
if __name__ == '__main__':
# init_db()
query3()
综上:使用反向关联可以直接查询所需要的数据,效率更高,代码更加简洁。
5、relationship的使用说明
可以看到上面两张表中分别创建了一个反向关联如下:
t_departments: employees = relationship("Employee", back_populates="department")
t_employees :department = relationship("Department", back_populates="employees")
参数说明:
3、删除/更新行为
添加了外键之后,再删除父表数据时产生的约束行为,就称为删除/更新行为。具体的删除/更新行为有以下几种:
行为 |
说明 |
NO ACTION |
当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有则不允许删除/更新。 (与 RESTRICT 一致) 默认行为 |
RESTRICT |
当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有则不允许删除/更新。 (与 NO ACTION 一致) 默认行为 |
CASCADE |
当在父表中删除/更新对应记录时,首先检查该记录是否有对应外键,如果有,则同时删除/更新外键在子表中的记录。 |
SET NULL |
当在父表中删除对应记录时,首先检查该记录是否有对应外键,如果有则设置子表中该外键值为null(这就要求该外键允许取null)。 |
SET DEFAULT |
父表有变更时,子表将外键列设置成一个默认的值 (Innodb不支持) |
对应SQL语法:
ALTER TABLE 表名 ADD CONSTRAINT 外键名称 FOREIGN KEY (外键字段) REFERENCES 主表名 (主表字段名) ON UPDATE CASCADE ON DELETE CASCADE;
示例:
在上面的t_employees表中我们创建了一个外键,并指定了外键的删除行为为CASCADE,也就是如果父表t_departments发生了删除,那么对应子表t_employees中外键关联的数据也会被删除。
department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)
注意事项:
1.4.5.3、多表关系
1、relationship 的作用与使用
relationship 是 SQLAlchemy 库中的一个重要功能,它用于在模型(或称为“表”)之间建立关联关系。以下是对 relationship 的详细说明,包括其作用和使用方式:
作用:
使用方式:
from sqlalchemy.orm import relationship
2、一对一关系
实现:在任意一方添加外键,关联另一方的主键,并设置外键为唯一的(UNIQUE),一般用作单表的拆分【注:这里的实现是指在MySQL中的实现方式,但是在ORM中实现思路是一样的】
示列:用户 与 用户详情的关系
关系:一对一关系,用于单表拆分,将一张表的基础字段放在一张表中,其他详情字段放在另一张表中,用来提升操作效率。
1、方式一:使用back_populates参数实现反向关联
# 声明ORM基类
Base = declarative_base()
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")
# 定义一个关系来访问UserDetail对象
detail = relationship("UserDetail", uselist=False, back_populates="user")
class UserDetail(Base):
__tablename__ = 't_user_details'
id = Column(Integer, primary_key=True, autoincrement=True)
address = Column(String(100), comment="地址")
phone = Column(String(20), comment="电话")
# 外键列,引用t_user表的id
user_id = Column(Integer, ForeignKey('t_user.id'), nullable=False, unique=True)
# 定义一个关系来回引用User对象
user = relationship("User", back_populates="detail")
2、方式二:使用backref参数实现反向关联
说明:backref 参数是 relationship() 函数的一个非常有用的功能,它允许我们自动创建反向关系,而无需在另一个模型类中显式定义它。使用 backref 可以简化代码并减少冗余。
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")
# 使用 backref 自动创建反向关系
detail = relationship("UserDetail", uselist=False, backref="user")
class UserDetail(Base):
__tablename__ = 't_user_details'
id = Column(Integer, primary_key=True, autoincrement=True)
address = Column(String(100), comment="地址")
phone = Column(String(20), comment="电话")
# 外键列,引用t_user表的id
user_id = Column(Integer, ForeignKey('t_user.id'), nullable=False, unique=True)
# 注意:这里我们不再需要显式定义 user 关系,因为 backref 已经为我们做了
# 定义一个关系来回引用User对象
# user = relationship("User", back_populates="detail")
3、backref与back_populates的区别
backref
back_populates
3、一对多/多对一关系
实现:在多的一方建立外键,指向一的一方的主键
示例:部门表和员工表关系
关系:一个员工对应一个部门,一个部门对应多个员工
class Department(Base):
__tablename__ = 't_departments'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="部门名称")
# 定义与 Employee 的一对多关系
employees = relationship("Employee", back_populates="department")
class Employee(Base):
__tablename__ = 't_employees'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")
age = Column(Integer, comment="年龄")
department_id = Column(Integer, ForeignKey('t_departments.id', ondelete="CASCADE"), nullable=False)
# 定义与 Department 的多对一关系
department = relationship("Department", back_populates="employees")
4、多对多关系
实现:建立第三张中间表,中间表至少包含两个外键,分别关联两方的主键
示列:学生 与 课程的关系
关系:一个学生可以选修多门课程,一门课程也可让多个学生选择
学生表 t_student
字段名 |
字段含义 |
字段类型 |
约束条件 |
约束关键字 |
id |
ID唯一标识 |
int |
主键,并且自动增长 |
PRIMARY KEY,AUTO_INCREMENT |
name |
姓名 |
varchar(10) |
非空约束 |
NOT NULL |
stuno |
学号 |
varchar(10) |
非空约束 |
NOT NULL |
课程表 t_course
字段名 |
字段含义 |
字段类型 |
约束条件 |
约束关键字 |
id |
ID唯一标识 |
int |
主键,并且自动增长 |
PRIMARY KEY,AUTO_INCREMENT |
name |
课程名称 |
varchar(10) |
非空约束 |
NOT NULL |
学生课程关系表 t_student_course
字段名 |
字段含义 |
字段类型 |
约束条件 |
约束关键字 |
id |
ID唯一标识 |
int |
主键,并且自动增长 |
PRIMARY KEY,AUTO_INCREMENT |
studentid |
学生ID |
int |
非空约束,外键约束 |
NOT NULL, FOREIGN KEY |
courseid |
课程ID |
int |
非空约束,外键约束 |
NOT NULL, FOREIGN KEY |
方式一:关联表不直接映射到ORM类,使用back_populates参数实现反向关联
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, Table
# 声明ORM基类
Base = declarative_base()
# 关联表(不直接映射到ORM类)
student_course = Table('t_student_course', Base.metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column('studentid', Integer, ForeignKey('t_student.id'), nullable=False),
Column('courseid', Integer, ForeignKey('t_course.id'), nullable=False),
)
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="姓名")
stuno = Column(String(10), nullable=False, comment="学号")
# 使用relationship()定义多对多关系
courses = relationship("Course",
# 指定中间关联表
secondary="t_student_course",
back_populates="students",
lazy='dynamic') # 使用lazy='dynamic'可以返回查询对象
class Course(Base):
__tablename__ = 't_course'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="课程名称")
# 使用relationship()定义反向多对多关系
students = relationship("Student",
# 指定中间关联表
secondary="t_student_course",
back_populates="courses")
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
方式二:关联表映射到ORM类 使用backref参数实现反向关联
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="姓名")
stuno = Column(String(10), nullable=False, comment="学号")
# 与StudentCourse定义多对多关系
courses = relationship("Course",
secondary="t_student_course", # 关联表的名称
backref="students") # 为Course模型创建反向引用
class Course(Base):
__tablename__ = 't_course'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="课程名称")
class StudentCourse(Base):
__tablename__ = 't_student_course'
id = Column(Integer, primary_key=True, autoincrement=True)
studentid = Column(Integer, ForeignKey('t_student.id'), nullable=False)
courseid = Column(Integer, ForeignKey('t_course.id'), nullable=False)
__table_args__ = (
# 确保student_id和course_id的组合是唯一的
UniqueConstraint('studentid', 'courseid', name='_student_course_uc'),
)
示例:查询姓名为张三选择的所有课程名称
1、插入测试数据
insert into t_student values (null, '张三', '2000100101'),(null, '李四',
'2000100102'),(null, '小五', '2000100103'),(null, '小七', '2000100104');insert into t_course values (null, 'Java'), (null, 'PHP'), (null , 'MySQL') , (null, 'Hadoop');
insert into t_student_course values (null,1,1),(null,1,2),(null,1,3),(null,2,2), (null,2,3),(null,3,4);
2、代码实现
def query():
Session = sessionmaker(bind=engine)
session = Session()
stu = session.query(Student).filter_by(name="张三").first()
if stu:
for course in stu.courses:
print(course.name)
if __name__ == '__main__':
query()
1.4.5.4、scoped_session实现线程安全
在SQLAlchemy中,scoped_session是一个工厂,它产生线程局部(thread-local)的Session对象。也就是在一个线程中,多次调用scoped_session工厂将返回同一个Session实例,而在另一个线程中,你会得到一个不同的实例。这有助于实现线程安全的数据库会话管理,因为每个线程都有自己的会话,从而避免了并发问题。
使用scoped_session来实现线程安全的步骤:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session,declarative_base
# 假设你有一个Base类和你的模型定义...
Base = declarative_base()
# 创建一个引擎
engine = create_engine('sqlite:///example.db')
# 创建一个Session类
Session = sessionmaker(bind=engine)
# 使用scoped_session来包装Session类
# 这将确保每次在同一个线程中调用scoped_session()时,都会返回相同的Session实例
scoped_session = scoped_session(Session)
# 在你的代码中...
def some_function():
# 获取一个会话
session = scoped_session()
# 使用会话进行查询、添加、更新或删除操作...
# 例如: result = session.query(MyModel).filter_by(some_column='value').first()
# 提交事务(如果需要)
session.commit()
# 关闭会话(通常不需要,因为scoped_session会在线程结束时自动关闭会话)
# 但如果你想在函数结束时立即关闭它,可以调用remove()方法
# scoped_session.remove()
1.4.5.5、新增数据
需求:在表t_student中新增数据
数据模型:
class Student(Base):
__tablename__ = 't_student'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(10), nullable=False, comment="姓名")
stuno = Column(String(10), nullable=False, comment="学号")
# 使用relationship()定义多对多关系
courses = relationship("Course",
secondary="t_student_course",
back_populates="students",
lazy='dynamic') # 使用lazy='dynamic'可以返回查询对象
新增数据示例:
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from many_to_many_relationship import Student
# 创建ORM基类
Base = declarative_base()
# 创建会话类
Session = sessionmaker(bind=engine)
# 使用scoped_session创建线程安全会话
session = scoped_session(Session)
# # 新增一条数据
# # 创建一个新的Student实例
# stu1 = Student(name="Tom", stuno="2000100105")
# # 将新实例添加到会话中
# session.add(stu1)
# # 提交会话,将数据保存到数据库
# session.commit()
# 批量添加数据
session.add_all([
Student(name="Jack", stuno="2000100106"),
Student(name="小爱", stuno="2000100107"),
Student(name="大胖", stuno="2000100108"),
])
session.commit()
1.4.5.6、修改数据
修改数据的流程:先查询出需要修改的数据,然后修改数据,最后提交修改。
需求:修改表t_student的数据
注意:如果stuno的长度是10,需要增加一下长度,不然新增超过长度,会出现添加失败的错误。修改字段数据类型的SQL语句如下(需要执行SQL语句):
ALTER TABLE t_student MODIFY stuno varchar(30);
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from many_to_many_relationship import Student
# 创建ORM基类
Base = declarative_base()
# 创建会话类
Session = sessionmaker(bind=engine)
# 使用scoped_session创建线程安全会话
session = scoped_session(Session)
# 1、修改id=5的数据,修改为name="Nicky", stuno="2000100109"
# # 查询出id=5的数据
# stu = session.get(Student, 5)
# # 修改数据
# if stu:
# stu.name = "Nicky"
# stu.stuno = "2000100109"
# # 提交修改的数据
# session.commit()
# # 2、修改name="大胖"的数据,修改为name="小胖"
# session.query(Student).filter_by(name="大胖").update({"name":"小胖"})
# # 提交修改的数据
# session.commit()
# 3、修改id>6的stuno,每个stuno前面都加123
session.query(Student).filter(Student.id > 6).update({"stuno": "123" + Student.stuno})
session.commit()
1.4.5.7、删除数据
删除数据流程:先查询出数据,然后再删除数据
需求:删除表t_student的数据
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from many_to_many_relationship import Student
# 创建ORM基类
Base = declarative_base()
# 创建会话类
Session = sessionmaker(bind=engine)
# 使用scoped_session创建线程安全会话
session = scoped_session(Session)
# 1、删除id=5的数据
# stu_info = session.get(Student, 5)
# if stu_info:
# session.delete(stu_info)
# session.commit()
# 2、删除name="小胖"的数据
session.query(Student).filter_by(name="小胖").delete()
session.commit()
1.4.5.8、查询数据
1、测试数据准备
ALTER TABLE t_student2 CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
from sqlalchemy.orm import declarative_base , sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, DateTime
# 声明ORM基类
Base = declarative_base()
Session = sessionmaker(bind=engine)
session = scoped_session(Session)
class Student(Base):
__tablename__ = 't_student2'
id = Column(Integer, primary_key=True)
stuno = Column(String(10), comment="学号")
name = Column(String(10), comment="姓名")
gender = Column(String(1), comment="性别")
age = Column(Integer, comment="年龄")
idcard = Column(String(18), comment="身份证")
entrydate = Column(DateTime, comment="入学时间")
addr = Column(String(50), comment="家庭地址")
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
def basic_query():
pass
if __name__ == '__main__':
init_db()
# 向表中插入数据
insert into t_student2 values('1','1','小洋洋','女',18,'12345678901234957','2023-02-03',"北京"),
('2','2','小芳','女',18,'123456789012345789','2023-02-03',"北京"),
('3','3','小枫','男',22,'123456789012345123','2023-01-03',"上海"),
('4','4','小敏','女',20,'123456789012345345','2022-01-03',"北京"),
('5','5','小李','男',20,'12345678901234534X','2022-01-03',"上海"),
('6','6','王小敏','女',16,'123456789012345345','2022-01-03',"成都"),
('7','7','大刘','男',25,'123456789012345102','2022-01-03',"深圳"),
('8','8','林逸','男',17,'12345678901234534X','2022-01-03',"北京"),
('9','9','莫小迪','女',21,'123456789012345302','2022-01-03',"成都"),
('10','10','林仙仙','女',16,'123456789012345330','2022-01-03',"深圳"),
('11','11','叶小辰','男',18,'123456789012345352','2022-01-03',"成都"),
('12','12','韩跑跑','男',24,'12345678901234554X','2022-01-03',"北京");
2、基础查询
2.1、查询多个字段(查询指定字段)
# 指定字段查询
stus = session.query(Student.name, Student.age).all()
for name, age in stus:
print(name, age)
# 全表查询
stus = session.query(Student).all()
for stu in stus:
print(stu.name)
2.2、去除重复记录
# 查询单个字段的不重复值
# 查询User表中所有不重复的name字段值
unique_names = session.query(distinct(User.name)).all()
# 遍历结果
for name in unique_names:
print(name)
# 查询多个字段的不重复组合
# 查询User表中所有不重复的name和age组合
unique_name_age_combinations = session.query(distinct(User.name, User.age)).all()
# 遍历结果
for name, age in unique_name_age_combinations:
print(name, age)
2.3、调试小技巧
在编写完查询代码后,可以先打印出代码对应转化的SQL语句,检查SQL语句是否正确,然后再执行。
示例:
# 指定字段查询
# 此时语句结尾不加.all(),输出的就是SQL语句
stus = session.query(Student.name, Student.age)
print(stus)
3、条件查询
3.1、常用的比较运算符
比较运算符 |
作用 |
> |
大于 |
>= |
大于等于 |
小于 |
|
小于等于 |
|
= |
等于 |
<> 或 != |
不等于 |
IN(…) |
在in之后的列表中的值,多选一 |
LIKE 占位符 |
模糊匹配(_匹配单个字符, %匹配任意个字符) |
IS NULL |
是NULL |
3.2、常见的逻辑运算符
逻辑运算符 |
作用 |
AND |
并且 (多个条件同时成立) |
OR |
或者 (多个条件任意一个成立) |
NOT |
非 , 不是 |
3.3、综合示例
def conditional_query():
# 1、查询年龄大于20的学生
stus = session.query(Student).filter(Student.age > 20).all()
# 2、查询年龄不等于20的学生
stus2 = session.query(Student).filter(Student.age != 20).all()
# 或者使用<>, 注意python3.X版本不支持使用<>,官方推荐使用!=作为不等于运算符。
# stus2_1 = session.query(Student).filter(Student.age <> 20).all()
# 3、查询年龄为18或20或25的学生信息
stus3 = session.query(Student).filter(Student.age.in_([18, 20, 25])).all()
# 4、查询年龄不为18或20或25的学生信息
stus4 = session.query(Student).filter(~Student.age.in_([18, 20, 25])).all()
# 5、查询家庭住址为空的学生信息
stus5 = session.query(Student).filter(Student.addr == None).all()
# 或者使用is_()方法
stus5_1 = session.query(Student).filter(Student.addr.is_(None)).all()
# 6、查询家庭住址不为空的学生信息
stus6 = session.query(Student).filter(Student.addr != None).all()
# 或者使用isnot_()方法
stus6_1 = session.query(Student).filter(Student.addr.isnot(None)).all()
# 7、查询姓林,名字是两个字的学生信息
stus7 = session.query(Student).filter(Student.name.like("林_")).all()
# 8、查询身份证号最后一位是X的学生信息
stus8 = session.query(Student).filter(Student.idcard.like("%X")).all()
# 9、查询年龄在18岁(包含)到25岁(包含)之间的学生信息
stus9 = session.query(Student).filter(Student.age >=18, Student.age <= 25).all()
# 或者使用and_()方法
stus9_1 = session.query(Student).filter(and_(Student.age >= 18, Student.age <= 25)).all()
# 10、查询年龄为18或20或25的学生信息
stus10 = session.query(Student).filter(or_(Student.age==18, Student.age==20, Student.age==25)).all()
4、聚合函数
说明:将一列数据作为一个整体,进行纵向计算 。
语法:
注意 : NULL值是不参与所有聚合函数运算的。
4.1、常见的聚合函数
函数 |
作用 |
count |
统计数量 |
max |
最大值 |
min |
最小值 |
avg |
平均值 |
sum |
求和 |
说明:SQLAlchemy 提供了一组内置的函数,这些函数可以在 func 命名空间中直接使用,类似于 SQL 中的聚合函数,如 COUNT(), SUM(), AVG(), MAX(), MIN() 等。
4.2、综合示列
def aggregate_query():
# 1、统计学生总人数
count = session.query(func.count(Student.id)).scalar()
# 2、统计学生的平均年龄
avg = session.query(func.avg(Student.age)).scalar()
# 3、统计学生的最大年龄
max = session.query(func.max(Student.age)).scalar()
# 4、统计学生的最小年龄
min = session.query(func.min(Student.age)).scalar()
# 统计男生的总年龄
count_man = session.query(func.sum(Student.age)).filter_by(gender="男").scalar()
5、分组查询
1、where与having区别
注意事项:
在 SQLAlchemy 中,使用 group_by() 方法来执行分组查询。分组查询通常与聚合函数(如 COUNT(), SUM(), AVG(), MAX(), MIN() 等)一起使用,以便对每个分组进行计算。
5.1、综合示例
def group_by_query():
# 1、根据性别分组,统计男学生和女学生的数量
# select sex, count(*) from t_student2 group by sex;
results = session.query(Student.gender, func.count(Student.id).label("stu_nums")).group_by(Student.gender).all()
# for gender, stu_nums in results:
# print(gender, stu_nums)
# 2、根据性别分组 , 统计男学生和女学生的平均年龄
# select sex,avg(age) from t_student2 group by sex;
results2 = session.query(Student.gender, func.avg(Student.age).label("avg_age")).group_by(Student.gender).all()
# for gender, avg_age in results2:
# print(gender, avg_age)
# 3、查询年龄小于25的学生 , 并根据家庭地址分组 , 获取学生数量大于等于3的家庭地址
# select addr,count(*) addr_num from t_student2 where age<25 group by addr having addr_num>=3;
# 方式一: 通过having实现
results3 = session.query(Student.addr, func.count(Student.id)).filter(Student.age < 25).group_by(Student.addr)\
.having(func.count(Student.id) >= 3).all()
# 方式二: 通过子查询实现,在子句中使用 filter()和比较运算符来实现它。
# 3.1、 先查询年龄小于 25 的学生,并根据家庭地址分组,获取学生数量大于等于 3 的家庭地址
subquery = session.query(Student.addr, func.count(Student.id).label("stu_nums")).filter(Student.age < 25).\
group_by(Student.addr).subquery()
# 3.2、使用子查询和 filter 子句来过滤出学生数量大于等于 3 的家庭地址
results3_1 = session.query(subquery.c.addr, subquery.c.stu_nums).filter(subquery.c.stu_nums >= 3).all()
# for addr, stu_nums in results3:
# print(addr, stu_nums)
# 统计不同家庭地址男女生的数量
# select addr,gender,count(*) from t_student2 group by addr,gender;
results4 = session.query(Student.addr, Student.gender, func.count(Student.id).label("stu_nums")).group_by(Student.addr, Student.gender).all()
print(results4)
6、排序查询
6.1、排序方式
注意:
def order_by_query():
# 1、根据年龄对学生进行升序排序
# select *from t_student order by age asc;
res = session.query(Student).order_by(Student.age.asc()).all()
# 或者
# select *from t_student order by age;
res_1 = session.query(Student).order_by(Student.age).all()
# for res in res:
# print(res.age)
# 2、根据年龄对学生进行升序排序 , 年龄相同 , 再按照入学时间进行降序排序
# select *from t_student order by age,entrydate desc;
res2 = session.query(Student).order_by(Student.age, Student.entrydate.desc()).all()
# for res in res2:
# print(res.age,res.entrydate)
7、 分页查询
注意:
说明:在 SQLAlchemy 中,分页查询通常通过使用 offset() 和 limit() 方法来实现。这两个方法分别用于设置索引偏移量和限制查询结果的条数。
7.1、综合示例
def limit_query():
# 1、查询第1页学生数据, 每页展示10条记录
# select *from t_student limit 0,10;
res = session.query(Student).offset(0).limit(10).all()
# 或者
# select *from t_student limit 10;
res_1 = session.query(Student).limit(10).all()
# for stu in res:
# print(stu.id)
# 2、查询第2页学生数据, 每页展示10条记录
# 说明:起始索引=(页码-1)*页展示记录数
# select *from t_student limit 10,10;
res2 = session.query(Student).offset(10).limit(10).all()
# for stu in res2:
# print(stu.id)
1.4.5.9、多表查询
1、多表查询概述
1.1、测试数据准备
说明:先建立t_emp员工表和t_dept部门表两张表,并插入对应数据
1、创建对应的ORM模型如下:
from sqlalchemy.orm import declarative_base , sessionmaker, scoped_session
from sqlalchemyBaseUse import engine
from sqlalchemy import Column, Integer, String, DateTime, distinct, or_, and_, func, ForeignKey
# 声明ORM基类
Base = declarative_base()
Session = sessionmaker(bind=engine)
session = scoped_session(Session)
# 部门表
class Departments(Base):
__tablename__ = 't_dept'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="部门名称")
# 员工表
class Employes(Base):
__tablename__ = "t_emp"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(50), nullable=False, comment="姓名")
age = Column(Integer, comment="年龄")
job = Column(String(20), comment="职位")
salary = Column(Integer, comment="薪资")
entrydate = Column(DateTime, comment="入职时间")
managerid = Column(Integer, comment="直属领导ID")
dept_id = Column(Integer, ForeignKey("t_dept.id"))
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
注意:如果创建的表的编码不是utf8mb4,需要修改为utf8mb4,不然插入中文的数据会出现编码错误,执行以下SQL语句:
ALTER TABLE t_dept CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
ALTER TABLE t_emp CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
2、插入测试数据
INSERT INTO t_dept (name) VALUES ('研发部'), ('市场部'),('财务部'), ('销售部'), ('总经办'), ('人事部');
INSERT INTO t_emp (id, name, age, job,salary, entrydate, managerid, dept_id) VALUES
(1, '大刘', 28, '总裁',40000, '2000-01-01', null,5),
(2, '夏析', 20, '项目经理',20000, '2005-12-05', 1,1),
(3, '李兴', 33, '开发', 8000,'2000-11-03', 2,1),
(4, '张敏', 30, '开发',11000, '2002-02-05', 2,1),
(5, '林夕', 43, '开发',10500, '2004-09-07', 3,1),
(6, '小美', 19, '程序员鼓励师',6600, '2004-10-12', 2,1),
(7, '林逸', 60, '财务总监',8500, '2002-09-12', 1,3),
(8, '李媛', 19, '会计',48000, '2006-06-02', 7,3),
(9, '林妙妙', 23, '出纳',5250, '2009-05-13', 7,3),
(10, '赵芳', 20, '市场部总监',12500, '2004-10-12', 1,2),
(11, '张三', 56, '职员',3750, '2006-10-03', 10,2),
(12, '李四', 19, '职员',3750, '2007-05-09', 10,2),
(13, '王二', 19, '职员',5500, '2009-02-12', 10,2),
(14, '周鑫', 88, '销售总监',14000, '2004-10-12', 1,4),
(15, '刘达', 38, '销售',4600, '2004-10-12', 14,4),
(16, '老钱', 40, '销售',4600, '2004-10-12', 14,4),
(17, '小六', 42, null,2000, '2011-10-12', 1,null);
1.2、概述
说明:多表查询就是指从多张表中查询数据。
操作:要执行多表查询,就只需要使用逗号分隔多张表,如: select * from t_emp , t_dept;
具体的执行结果如下:
解释:可见查询结果中包含了大量的结果集,总共102条记录,而这其实就是员工表emp所有的记录 (17) 与 部门表dept所有记录(6) 的所有组合情况,这种现象称之为笛卡尔积。
笛卡尔积: 笛卡尔乘积是指在数学中,两个集合A集合 和 B集合的所有组合情况。
而在多表查询中,需要消除无效的笛卡尔积,只保留两张表关联部分的数据;使用SQL语句,消除多表查询的笛卡尔积:
select *from t_emp,t_dept where t_emp.dept_id = t_dept.id;
1.3、多表查询的分类
2、内连接
说明:内连接查询的是两张表交集部分的数据
内连接的语法分为两种:
语法:
1、隐式内连接
SELECT 字段列表 FROM 表1 , 表2 WHERE 条件 … ;
2、显示内连接(inner join)
SELECT 字段列表 FROM 表1 [ INNER ] JOIN 表2 ON 连接条件 … ;
注意:SQL语句中的内连接的inner关键可以省略
说明:在 SQLAlchemy 中,内连接(INNER JOIN)是默认的连接类型,当你使用 join() 方法而不指定 isouter=True 时,你就是在执行内连接。内连接只返回满足连接条件的行,即两个表中都存在匹配项的行。
示例:
1、查询每一个员工的姓名 , 及关联的部门的名称 (使用隐式内连接实现)
res = session.query(Employes, Departments).filter(Employes.dept_id == Departments.id).all()
2、查询每一个员工的姓名 , 及关联的部门的名称 (使用显式内连接实现)
res2 = session.query(Employes, Departments).join(Departments, Employes.dept_id == Departments.id).all()
3、查询每一个员工的姓名 , 及关联的部门的名称(使用relationship实现内连接查询)
employes = relationship("Employes", back_populates="departments")
departments = relationship("Departments", back_populates="employes")
def init_db():
# 创建继承base类的表的映射关系
Base.metadata.create_all(engine)
if __name__ == '__main__':
init_db()
res3 = session.query(Employes).join(Employes.departments)
3、外连接
外连接分为两种,分别是:
1、左外连接
说明:左外连接相当于查询表1(左表)的所有数据,当然也包含表1和表2交集部分的数据。
语法:
SELECT 字段列表 FROM 表1 LEFT [ OUTER ] JOIN 表2 ON 条件 … ;
2、右外连接
说明:右外连接相当于查询表2(右表)的所有数据,当然也包含表1和表2交集部分的数据。
语法:
SELECT 字段列表 FROM 表1 RIGHT [ OUTER ] JOIN 表2 ON 条件 … ;
说明:SQLAlchemy 支持左外连接(LEFT OUTER JOIN)和右外连接(RIGHT OUTER JOIN)。不过,在 SQLAlchemy 的 ORM 层,通常更常见的是使用左外连接,因为 SQLAlchemy 更多地是按照关系型数据库的标准来设计的,而标准 SQL 中右外连接并不如左外连接那样常见。
但是,你可以使用 SQLAlchemy 的 Core 表达式语言来执行右外连接。以下是如何在 SQLAlchemy 中使用左外连接和右外连接的示例:
注意事项:左外连接和右外连接是可以相互替换,只需要调整在连接查询时SQL中,表结构的先后顺序就可以。在实际开发使用时,左外连接常用。
示例:
1、查询t_emp表的所有数据, 和对应的部门信息 (左外连接)
# 1、查询t_emp表的所有数据, 和对应的部门信息(左外连接)
# select e.*,d.name from t_emp e left outer join t_dept d on e.dept_id = d.id;
# 注意:outerjoin()函数中的连接条件可以省略,Employes.dept_id == Departments.id
res = session.query(Employes, Departments.name).outerjoin(Departments).all()
# print(res)
2、查询t_dept表的所有数据, 和对应的员工信息(右外连接)
# 查询t_dept表的所有数据, 和对应的员工信息(右外连接)
# select d.*,e.* from t_emp e right outer join t_dept d on d.id = e.dept_id;
# 使用join()函数实现,isouter=True使用左外连接,左外连接和右外连接可以相互转换,也就是表的位置不同
# join()函数中的第一个参数Employes表示左外连接关联的表是t_emp,主表是t_dept
res2 = session.query(Employes, Departments).join(Employes, isouter=True).all()
# print(res2)
4、自连接查询
说明:自连接查询,顾名思义,就是自己连接自己,也就是把一张表连接查询多次。
连接方式:对于自连接查询,可以是内连接查询,也可以是外连接查询。
注意事项:在自连接查询中,必须要为表起别名,不然不清楚所指定的条件、返回的字段,到底是哪一张表的字段。
语法:
SELECT 字段列表 FROM 表A 别名A JOIN 表A 别名B ON 条件 … ;
说明:同样的在SQlAlchemy中实现自连接查询,也需要为表起别名的方式,起别名使用aliased()函数实现。
示列:
1、查询员工 及其 所属领导的名字
# select a.name '员工姓名', b.name '领导姓名' from t_emp a join t_emp b on a.managerid = b.id;
# 或
# select a.name '员工姓名', b.name '领导姓名' from t_emp a, t_emp b where a.managerid = b.id;
# 为表Employes创建别名
user = aliased(Employes)
manager = aliased(Employes)
# 使用Inner join 的连接方式
res = session.query(user.name.label("employee_name"), manager.name.label("manager_name")).join(manager, user.managerid == manager.id).all()
print(res)
2、查询所有员工 t_emp a 及其领导的名字 t_emp b, 如果员工没有领导, 也需要查询出来 。
# 为表Employes创建别名
user = aliased(Employes)
manager = aliased(Employes)
# select a.name '员工姓名', b.name '领导姓名' from t_emp a left join t_emp b on a.managerid = b.id;
res2 = session.query(user.name.label("employee_name"), manager.name.label("manager_name")).join(manager, user.managerid == manager.id, isouter=True).all()
print(res2)
5、联合查询
关键字:union
说明:
语法:
SELECT 字段列表 FROM 表A …
UNION [ ALL ]
SELECT 字段列表 FROM 表B ….;
说明:在 SQLAlchemy 中,可以使用 union() 或者 union_all()方法来执行联合查询。
示例:
1、将薪资低于 5000 的员工 , 和 年龄大于 45 岁的员工全部查询出来.
SQL实现如下:
select * from t_emp where salary < 5000
union all
select * from t_emp where age > 45;
ORM实现如下:
query1 = session.query(Employes).filter(Employes.salary < 5000)
query2 = session.query(Employes).filter(Employes.age > 45)
union_query = query1.union(query2).all()
for un in union_query:
print(un.name)
6、子查询
6.1、概述
1、概念
如:
SELECT * FROM t1 WHERE column1 = ( SELECT column1 FROM t2 );
2、分类
分类依据:根据子查询结果
分为:
分类依据2:根据子查询位置
分为:
6.2、标量子查询
说明:子查询返回的结果是单个值(数字、字符串、日期等)
常用操作符:= <> > >= <
示列:
1、查询“销售部”所有员工的信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出销售部的部门id
select id from t_dept where name = '销售部';2、然后根据查出的部门id再查询销售部的所有员工信息
select e.* from t_emp e where dept_id = (select id from t_dept where name = '销售部');或者使用关联查询
select e.* from t_emp e join t_dept d on e.dept_id = d.id where d.name='销售部';
ORM实现如下:
# 1.1、先查询出销售部的部门id
subquery = session.query(Departments.id).filter(Departments.name=="销售部").scalar_subquery()
# 可能会出现警告,可以将子查询转换成 select 语句
# subquery = select([Departments.id]).where(Departments.name == "销售部")
# 1.2、然后根据查出的部门id再查询销售部的所有员工信息
res = session.query(Employes).filter(Employes.dept_id.in_(subquery)).all()
print(res)
2、查询在员工“林逸”之后入职的所有员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出林逸的入职时间
select entrydate from t_emp where name = '林逸';2、然后根据入职时间再查找出在林逸入职时间之后的员工信息
select e.* from t_emp e where entrydate > (select entrydate from t_emp where name = '林逸');
ORM实现如下:
subquery2 = session.query(Employes.entrydate).filter_by(name="林逸").scalar_subquery()
res2 = session.query(Employes).filter(Employes.entrydate > subquery2)
print(res2)
6.3、列子查询
说明:子查询返回的结果是一列(可以是多行)
常用操作符:IN 、NOT IN 、 ANY 、SOME 、 ALL
操作符 |
描述 |
IN |
在指定的集合范围之内,多选一 |
NOT IN |
不在指定的集合范围之内 |
ANY |
子查询返回列表中,有任意一个满足即可 |
SOME |
与ANY等同,使用SOME的地方都可以使用ANY |
ALL |
子查询返回列表的所有值都必须满足 |
示列:
1、查询 "销售部" 和 "市场部" 的所有员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先在部门表中查询出销售部和市场部的部门id
select id from t_dept where name = '销售部' or name = '市场部';2、然后根据1查出的结果,使用关键字in查询出对应销售部和市场部的员工信息1
select * from t_emp where dept_id in (select id from t_dept where name = '销售部' or name = '市场部');
ORM实现如下:
subquery = session.query(Departments.id).filter(or_(Departments.name=="销售部", Departments.name=="市场部")).subquery()
res = session.query(Employes).filter(Employes.id.in_(subquery))
2、 查询比“财务部”所有人工资都高的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出财务部所有人员的工资信息
select salary from t_emp e join t_dept d on e.dept_id = d.id where d.name = '财务部';2、然后根据1查出的结果,使用关键字all查询出比财务部所有人工资都高的员工信息
select * from t_emp where salary > all (select salary from t_emp e join t_dept d on e.dept_id = d.id where d.name = '财务部');
或
select * from t_emp where salary > all ( select salary from t_emp where dept_id =(select id from t_dept where name = '财务部') );
ORM实现如下:
subquery2 = session.query(Employes.salary).join(Departments).filter(Departments.name=="财务部").scalar_subquery()
res2 = session.query(Employes).filter(Employes.salary > all_(subquery2)).all()
3、 查询比“研发部”其中任意一人工资都高的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出研发部所有人员的工资信息
select salary from t_emp where dept_id = (select id from t_dept where name = '研发部');2、然后根据1查出的结果,使用关键字any查询出比研发部任意一人工资都高的员工信息
select * from t_emp where salary > any (select salary from t_emp where dept_id = (select id from t_dept where name = '研发部'));
ORM实现如下:
subquery3 = session.query(Employes.salary).join(Departments).filter(Departments.name == "研发部").scalar_subquery()
res3 = session.query(Employes).filter(Employes.salary > any_(subquery3)).all()
6.4、行子查询
说明:子查询返回的结果是一行(可以是多列)
常用的操作符:= 、<> 、IN 、NOT IN
示列:
1、 查询与 "张敏" 的薪资及直属领导相同的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出张敏的薪资和直属领导的id
select salary,managerid from t_emp where name = '张敏';2、然后根据1查出的结果,使用 = 查询与"张敏"的薪资及直属领导相同的员工信息
select * from t_emp where (salary,managerid) = (select salary,managerid from t_emp where name = '张敏');
ORM实现如下:
subquery = session.query(Employes.salary, Employes.managerid).filter_by(name="张敏").subquery()
res = session.query(Employes).filter(tuple_(Employes.salary,Employes.managerid).in_(subquery)).all()
6.5、表子查询
说明:子查询返回的结果是多行多列
常用的操作符:IN
示列:
1、查询与 "林夕" , "林妙妙" 的职位和薪资相同的员工信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询林夕和林妙妙的职位与薪资信息
select job, salary from t_emp where name in ('林夕','林妙妙');2、然后根据1查出的结果,使用 in 查询与"林夕","林妙妙"的职位和薪资相同的员工信息
select * from t_emp where (job,salary) in (select job, salary from t_emp where name in ('林夕','林妙妙'));
ORM实现如下:
subquery = session.query(Employes.job, Employes.salary).filter(Employes.name.in_(["林夕", "林妙妙"])).subquery()
res = session.query(Employes).filter(tuple_(Employes.job, Employes.salary).in_(subquery)).all()
2、查询入职日期是 "2002-09-12" 之后的员工信息 , 及其部门信息
SQL实现如下:
分析:可先将查询分为两步
1、先查询出入职日期是 "2002-09-12" 之后的员工信息
select * from t_emp where entrydate > "2002-09-12";2、根据1查询出的表信息,在查询对应的部门信息
select e.*, d.* from (select * from t_emp where entrydate > "2002-09-12") e join t_dept d on e.dept_id = d.id;
ORM实现如下:
subquery2 = session.query(Employes).filter(Employes.entrydate > "2002-09-12").subquery()
res2 = session.query(subquery2, Departments).join(Departments, subquery2.c.dept_id == Departments.id).all()
作者:测试有道