• 认真地记录技术中遇到的坑!
  • 能摸鱼真是太好啦!嘿嘿嘿!

Python使用ORM进行数据库操作

生活 悠悠 3年前 (2022-03-28) 3691次浏览 0个评论

Python使用ORM进行数据库操作

ORM框架

sqlalchemypython最为常用的第三方ORM模块。

安装命令: pip install sqlalchemy

官网文档: http://docs.sqlalchemy.org/en/latest/dialects/index.html

sqlalchemy连接数据库

sqlalchemy只是封装了数据表和python对象的映射实现,并没有自行实现数据库连接,所以需要另外安装数据库连接的包。

sqlalchemy数据库连接支持: https://docs.sqlalchemy.org/en/13/dialects/index.html

可选的MySQL连接库:

  • mysqlclient (maintained fork of MySQL-Python)
  • PyMySQL
  • MySQL Connector/Python
  • CyMySQL
  • OurSQL
  • Google Cloud SQL
  • PyODBC
  • zxjdbc for Jython

相应的创建MySQL连接的方法如下:

import sqlalchemy

# mysqlclient (maintained fork of MySQL-Python)
engine = sqlalchemy.create_engine('mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>')

# PyMySQL
engine = sqlalchemy.create_engine('mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]')

# MySQL-Connector
engine = sqlalchemy.create_engine('mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>')

# cymysql
engine = sqlalchemy.create_engine('mysql+cymysql://<username>:<password>@<host>/<dbname>[?<options>]')

# OurSQL
engine = sqlalchemy.create_engine('mysql+oursql://<user>:<password>@<host>[:<port>]/<dbname>')

# pyodbc
engine = sqlalchemy.create_engine('mysql+pyodbc://<username>:<password>@<dsnname>')

# zxjdbc
engine = sqlalchemy.create_engine('mysql+zxjdbc://<user>:<password>@<hostname>[:<port>]/<database>')

另外还需要在链接数据库时设置编码等,比如设置utf8支持,可用项目选项

  • Unicode支持: charset=utf8mb4
  • 处理二进制数据: binary_prefix=true

示例: engine = create_engine("mysql+pymysql://scott:tiger@localhost/test?charset=utf8mb4&binary_prefix=true")

sqlalchemy定义数据表映射

sqlalchemy表映射对象定义方法

sqlalchemy可以定义一个类对应一个表。看下面的实例:

  • create_engine() 会返回一个数据库引擎,echo 参数为 True 时,会显示每条执行的 SQL 语句,生产环境下可关闭。
  • sessionmaker() 会生成一个数据库会话类。这个类的实例可以当成一个数据库连接,它同时还记录了一些查询的数据,并决定什么时候执行 SQL 语句。
  • declarative_base() 创建了一个 BaseModel 类,这个类的子类可以自动与一个表关联
  • BaseModel__tablename__ 属性就是数据库中该表的名称
  • BaseModel.metadata.create_all(engine) 会找到 BaseModel 的所有子类,并在数据库中建立这些表
  • BaseModel.metadata.drop_all() 则是删除这些表

由于 SQLAlchemy 自己维护了一个数据库连接池(默认 5 个连接),因此初始化一个会话的开销并不大。

import sqlalchemy as sql
from sqlalchemy.ext import declarative
from sqlalchemy.orm import sessionmaker

engine = sql.create_engine('mysql+pymysql://uusama:uusama@localhost:3306/pixiv?charset=utf8mb4')
BaseModel = declarative.declarative_base()
session = sessionmaker(bind=engine)()

class User(BaseModel):  # 必须继承 BaseModel
    __tablename__ = 'user'  # 对应的表名
    id = sql.Column(sql.BigInteger, primary_key=True, autoincrement=True)  # 绑定字段
    name = sql.Column(sql.String(100))
    age = sql.Column(sql.Integer)
    created_at = sql.Column(sql.TIMESTAMP)
    updated_at = sql.Column(sql.TIMESTAMP)

sqlalchemy支持的表数据类型

可用的数据类型如下:https://docs.sqlalchemy.org/en/13/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqlconnector

from sqlalchemy.dialects.mysql import \
        BIGINT, BINARY, BIT, BLOB, BOOLEAN, CHAR, DATE, \
        DATETIME, DECIMAL, DECIMAL, DOUBLE, ENUM, FLOAT, INTEGER, \
        LONGBLOB, LONGTEXT, MEDIUMBLOB, MEDIUMINT, MEDIUMTEXT, NCHAR, \
        NUMERIC, NVARCHAR, REAL, SET, SMALLINT, TEXT, TIME, TIMESTAMP, \
        TINYBLOB, TINYINT, TINYTEXT, VARBINARY, VARCHAR, YEAR

mysql表转化为sqlalchemy对象

pip install sqlacodegen

sqlacodegen mysql+pymysql://username:password@127.0.0.1:3306/db_name > entity.py

sqlalchemy实现数据增、删、改、查

假设定义了下面的实体以及数据库连接:

import sqlalchemy as sql
from sqlalchemy.ext import declarative
from sqlalchemy.orm import sessionmaker

engine = sql.create_engine('mysql+pymysql://uusama:uusama@localhost:3306/pixiv?charset=utf8mb4')
BaseModel = declarative.declarative_base()
session = sessionmaker(bind=engine)()

class User(BaseModel):
    __tablename__ = 'user'  # 对应的表名
    id = sql.Column(sql.BigInteger, primary_key=True)  # 绑定字段
    name = sql.Column(sql.String(100))
    age = sql.Column(sql.Integer)
    created_at = sql.Column(sql.TIMESTAMP)
    updated_at = sql.Column(sql.TIMESTAMP)

执行原生SQL

connection = engine.contextual_connect()
cursor = connection.sursor()
cursor.execute('select * from User')
result = cursor.fetchall()
connection.close()
cursor.close()


result = session.execute(text("SELECT * FROM user WHERE id=:param"), {"param":5})
result = session.execute(users.insert(), [{"id": 7, "name": "somename7"}, {"id": 8, "name": "somename8"}, {"id": 9, "name": "somename9"}])

添加数据

使用session.add_all(records)添加数据:

records = [
    User(name='uusama', age=20),
    User(name='uu', age=18)
]
# 添加一条数据
session.add(User(name='uusana', age=20))
# 添加多条数据
session.add_all(records)
session.commit()  # 一定要提交才能生效

查询数据

使用query = session.query(User)获取查询对象,query支持链式调用。

  • 打印查询SQL语句: query, query.statement
  • 返回所有列表数据: query.all()
  • 返回第一条数据: query.first()
  • 通过主键查找: query.get(2)
  • 过滤条件: query.filter(User.id = 2)
  • 限制一条记录: query.limit(1)
  • 规定数据起止偏移: query.offset(1)
  • 排序: query.order_by(User.name)
  • And条件: query.filter(User.id > 1, User.name != 'a').scalar()
  • 同时使用OR和AND条件: query.filter(or_(User.age > 50, User.age < 10), User.id > 10)
  • 使用sql函数: query(func.sum(User.age)).scalar()
  • in查询: query.filter(User.age.in_(20, 30, 40)).all()
  • like查询: query.filter(User.name.like('%uusa%'))
# 查询所有数据
result = session.query(User).order_by(User.age).all()
session.commit()

# 多个条件使用逗号分隔,查询
result = session.query(User).filter(User.id > 10, User.id < 100).order_by(User.age)
session.commit()

更新数据

session.query(User).filter(User.id < 10).update({'age': 22})
session.query(User).filter(User.id < 10).update({User.age: User.age + 1}, synchronize_session=False)
session.commit()

删除数据

session.query(User).filter(User.id < 10).delete()
session.commit()

批量插入大批数据

session.execute(
    User.__table__.insert(),
    [{'name': `randint(1, 100)`,'age': randint(1, 100)} for i in xrange(10000)]
)
session.commit()

替换一个已有主键的记录

类似于INSERT…ON DUPLICATE KEY UPDATE

user = User(id=1, name='ooxx')
session.merge(user)
session.commit()

也可以想select然后update

执行SQL前增加前缀

session.query(User.name).prefix_with('HIGH_PRIORITY').all()
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

其他一些解决方法

  • 使用无符号整数: INTEGER(unsigned=True)
  • 字段名不一样: from_ = Column('from', CHAR(10))
  • 获取字段的长度: User.name.property.columns[0].type.length
  • 使用 UTF-8 编码: __table_args__ = { 'mysql_engine': 'InnoDB','mysql_charset': 'utf8'}
  • 设置外键约束: user_id1 = Column(Integer, ForeignKey('user.id'))
  • 删除 in 操作查询出来的记录: session.query(User).filter(User.id.in_((1, 2, 3))).delete(synchronize_session=False)
  • 加锁: .with_lockmode(‘update’)
  • 存在主键冲突时忽略插入: session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})
# SQLAlchemy INSERT IGNORE
inserter = table_object.insert().prefix_with("OR REPLACE")
inserter.execute([{'column1':'value1'}, {'column1':'value2'}])
session.execute(User.__table__.insert().prefix_with('IGNORE'), {'id': 1, 'name': '1'})

喜欢 (2)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址