Python使用ORM进行数据库操作
ORM框架
sqlalchemy
是python
最为常用的第三方ORM模块。
安装命令: pip install sqlalchemy
官网文档: http://docs.sqlalchemy.org/en/latest/dialects/index.html
sqlalchemy
连接数据库
sqlalchemy
只是封装了数据表和python对象的映射实现,并没有自行实现数据库连接,所以需要另外安装数据库连接的包。
sqlalchemy
数据库连接支持: https://docs.sqlalchemy.org/en/13/dialects/index.html
可选的MySQL连接库:
- mysqlclient (maintained fork of MySQL-Python)
- PyMySQL
- MySQL Connector/Python
- CyMySQL
- OurSQL
- Google Cloud SQL
- PyODBC
- zxjdbc for Jython
相应的创建MySQL连接的方法如下:
import sqlalchemy
# mysqlclient (maintained fork of MySQL-Python)
engine = sqlalchemy.create_engine('mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>')
# PyMySQL
engine = sqlalchemy.create_engine('mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]')
# MySQL-Connector
engine = sqlalchemy.create_engine('mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>')
# cymysql
engine = sqlalchemy.create_engine('mysql+cymysql://<username>:<password>@<host>/<dbname>[?<options>]')
# OurSQL
engine = sqlalchemy.create_engine('mysql+oursql://<user>:<password>@<host>[:<port>]/<dbname>')
# pyodbc
engine = sqlalchemy.create_engine('mysql+pyodbc://<username>:<password>@<dsnname>')
# zxjdbc
engine = sqlalchemy.create_engine('mysql+zxjdbc://<user>:<password>@<hostname>[:<port>]/<database>')
另外还需要在链接数据库时设置编码等,比如设置utf8
支持,可用项目选项
- Unicode支持:
charset=utf8mb4
- 处理二进制数据:
binary_prefix=true
示例: engine = create_engine("mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4&binary_prefix=true")
sqlalchemy
定义数据表映射
sqlalchemy
表映射对象定义方法
sqlalchemy
可以定义一个类对应一个表。看下面的实例:
create_engine()
会返回一个数据库引擎,echo 参数为 True 时,会显示每条执行的 SQL 语句,生产环境下可关闭。sessionmaker()
会生成一个数据库会话类。这个类的实例可以当成一个数据库连接,它同时还记录了一些查询的数据,并决定什么时候执行 SQL 语句。declarative_base()
创建了一个 BaseModel 类,这个类的子类可以自动与一个表关联BaseModel
的__tablename__
属性就是数据库中该表的名称BaseModel.metadata.create_all(engine)
会找到BaseModel
的所有子类,并在数据库中建立这些表BaseModel.metadata.drop_all()
则是删除这些表
由于 SQLAlchemy 自己维护了一个数据库连接池(默认 5 个连接),因此初始化一个会话的开销并不大。
import sqlalchemy as sql
from sqlalchemy.ext import declarative
from sqlalchemy.orm import sessionmaker
engine = sql.create_engine('mysql+pymysql://uusama:uusama@localhost:3306/pixiv?charset=utf8mb4')
BaseModel = declarative.declarative_base()
session = sessionmaker(bind=engine)()
class User(BaseModel): # 必须继承 BaseModel
__tablename__ = 'user' # 对应的表名
id = sql.Column(sql.BigInteger, primary_key=True, autoincrement=True) # 绑定字段
name = sql.Column(sql.String(100))
age = sql.Column(sql.Integer)
created_at = sql.Column(sql.TIMESTAMP)
updated_at = sql.Column(sql.TIMESTAMP)
sqlalchemy
支持的表数据类型
from sqlalchemy.dialects.mysql import \
BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, \
DATETIME, DECIMAL, DECIMAL, DOUBLE, ENUM, FLOAT, INTEGER, \
LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, \
NUMERIC, NVARCHAR, REAL, SET, SMALLINT, TEXT, TIME, TIMESTAMP, \
TINYBLOB, TINYINT, TINYTEXT, VARBINARY, VARCHAR, YEAR
mysql表转化为sqlalchemy
对象
pip install sqlacodegen
sqlacodegen mysql+pymysql://username:password@127.0.0.1:3306/db_name > entity.py
sqlalchemy
实现数据增、删、改、查
假设定义了下面的实体以及数据库连接:
import sqlalchemy as sql
from sqlalchemy.ext import declarative
from sqlalchemy.orm import sessionmaker
engine = sql.create_engine('mysql+pymysql://uusama:uusama@localhost:3306/pixiv?charset=utf8mb4')
BaseModel = declarative.declarative_base()
session = sessionmaker(bind=engine)()
class User(BaseModel):
__tablename__ = 'user' # 对应的表名
id = sql.Column(sql.BigInteger, primary_key=True) # 绑定字段
name = sql.Column(sql.String(100))
age = sql.Column(sql.Integer)
created_at = sql.Column(sql.TIMESTAMP)
updated_at = sql.Column(sql.TIMESTAMP)
执行原生SQL
connection = engine.contextual_connect()
cursor = connection.sursor()
cursor.execute('select * from User')
result = cursor.fetchall()
connection.close()
cursor.close()
result = session.execute(text("SELECT * FROM user WHERE id=:param"), {"param":5})
result = session.execute(users.insert(), [{"id": 7, "name": "somename7"}, {"id": 8, "name": "somename8"}, {"id": 9, "name": "somename9"}])
添加数据
使用session.add_all(records)
添加数据:
records = [
User(name='uusama', age=20),
User(name='uu', age=18)
]
# 添加一条数据
session.add(User(name='uusana', age=20))
# 添加多条数据
session.add_all(records)
session.commit() # 一定要提交才能生效
查询数据
使用query = session.query(User)
获取查询对象,query
支持链式调用。
- 打印查询SQL语句:
query
,query.statement
- 返回所有列表数据:
query.all()
- 返回第一条数据:
query.first()
- 通过主键查找:
query.get(2)
- 过滤条件:
query.filter(User.id = 2)
- 限制一条记录:
query.limit(1)
- 规定数据起止偏移:
query.offset(1)
- 排序:
query.order_by(User.name)
- And条件:
query.filter(User.id > 1, User.name != 'a').scalar()
- 同时使用OR和AND条件:
query.filter(or_(User.age > 50, User.age < 10), User.id > 10)
- 使用sql函数:
query(func.sum(User.age)).scalar()
in
查询:query.filter(User.age.in_(20, 30, 40)).all()
like
查询:query.filter(User.name.like('%uusa%'))
# 查询所有数据
result = session.query(User).order_by(User.age).all()
session.commit()
# 多个条件使用逗号分隔,查询
result = session.query(User).filter(User.id > 10, User.id < 100).order_by(User.age)
session.commit()
更新数据
session.query(User).filter(User.id < 10).update({'age': 22})
session.query(User).filter(User.id < 10).update({User.age: User.age + 1}, synchronize_session=False)
session.commit()
删除数据
session.query(User).filter(User.id < 10).delete()
session.commit()
批量插入大批数据
session.execute(
User.__table__.insert(),
[{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()
替换一个已有主键的记录
类似于INSERT…ON DUPLICATE KEY UPDATE
。
user = User(id=1, name='ooxx')
session.merge(user)
session.commit()
也可以想select
然后update
。
执行SQL前增加前缀
session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})
其他一些解决方法
- 使用无符号整数:
INTEGER(unsigned=True)
- 字段名不一样:
from_ = Column('from', CHAR(10))
- 获取字段的长度:
User.name.property.columns[0].type.length
- 使用 UTF-8 编码:
__table_args__ = { 'mysql_engine': 'InnoDB','mysql_charset': 'utf8'}
- 设置外键约束:
user_id1 = Column(Integer, ForeignKey('user.id'))
- 删除 in 操作查询出来的记录:
session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False)
- 加锁: .with_lockmode(‘update’)
- 存在主键冲突时忽略插入:
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})
# SQLAlchemy INSERT IGNORE
inserter = table_object.insert().prefix_with("OR REPLACE")
inserter.execute([{'column1':'value1'}, {'column1':'value2'}])
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})