文章目录
连接池
import pymysql
from dbutils.pooled_db import PooledDB
POOL = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。
ping=0,
# ping MySQL服务端,检查是否服务可用。
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='luffy',
charset='utf8'
)
-第二步:使用
from sql_pool import POOL # 做成单例
conn = POOL.connection() # 从连接池种取一个链接(如果没有,阻塞在这)
curser = conn.cursor()
curser.execute('select * from luffy_order where id<2')
res=curser.fetchall()
print(res)
实例
from flask import Flask
app = Flask(__name__)
@app.route('/')
def hello_world(): # put application's code here
conn = POOL.connection() # 从连接池种取一个链接(如果没有,阻塞在这)
curser = conn.cursor()
curser.execute('select * from luffy_course where id<2')
res = curser.fetchall()
return '数据查到了 %s' % res
if __name__ == '__main__':
app.run()
flask定制命令
使用 flask-script定制命令(老版本,不用了)
# flask 老版本中,没有命令运行项目,自定制命令
# flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令
# 新版的flask--》官方支持定制命令 click 定制命令,这个模块就弃用了
# flask-migrate 老版本基于flask-script,新版本基于flask-click写的
### 使用步骤
-1 pip3 install Flask-Script==2.0.3
-2 pip3 install flask==1.1.4
-3 pip3 install markupsafe=1.1.1
-4 使用
from flask_script import Manager
manager = Manager(app)
if __name__ == '__main__':
manager.run()
-5 自定制命令
@manager.command
def custom(arg):
"""自定义命令
python manage.py custom 123
"""
print(arg)
- 6 执行自定制命令
python manage.py custom 123
新版本定制命令
from flask import Flask
import click
app = Flask(__name__)
@app.cli.command("create-user")
@click.argument("name")
def create_user(args):
# from pool import POOL
# conn=POOL.connection()
# cursor=conn.cursor()
# cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'123456'])
# conn.commit()
print(name)
@app.route('/')
def index():
return 'index'
if __name__ == '__main__':
app.run()
# 运行项目的命令是:flask --app py文件名字:app run
# 命令行中执行
# flask --app 7-flask命令:app create-user lqz
# 简写成 前提条件是 app所在的py文件名字叫 app.py
# flask create-user lqz
# django定制命令
# 1 app下新建文件夹
management/commands/
# 2 在该文件夹下新建py文件,随便命名(命令名)
# 3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = '命令提示'
def handle(self, *args, **kwargs):
命令逻辑
# 4 使用命令
python manage.py py文件(命令名)
flask 缓存的使用
from flask import Flask
from flask_caching import Cache
config = {
"DEBUG": True, # some Flask specific configs
"CACHE_TYPE": "SimpleCache", # Flask-Caching related configs
"CACHE_DEFAULT_TIMEOUT": 300
}
app = Flask(__name__)
# tell Flask to use the above defined config
app.config.from_mapping(config)
cache = Cache(app)
@app.route('/')
def index():
cache.set('name', 'xxx')
return 'index'
@app.route('/get')
def get():
res=cache.get('name')
return res
if __name__ == '__main__':
app.run()
flask信号的使用
# 内置信号--》flask请求过程中--》源码中定义的---》不需要我们定义和触发---》只要写了函数跟它对应--》执行到这,就会触发函数执行
# Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
# 内置信号的使用
# 案例 :
### 我们现在想在模板渲染之 : 记录日志 使用内置信号实现
# 1 写一个函数
def before_render(*args, **kwargs):
print(args)
print(kwargs)
# 谁(ip) 在什么时间 访问了哪个页面(template)
print('记录日志,模板要渲染了')
# 2 跟内置信号绑定
from flask.signals import before_render_template
before_render_template.connect(before_render) # 信号触发的函数
# 3 源码中触发信号执行(我们不需要动)
# before_render_template.send() 源码再模板渲染之前,它写死了
@app.route('/')
def index():
return render_template('index.html')
@app.route('/login')
def login():
return render_template('login.html')
if __name__ == '__main__':
app.run()
# 自定义信号
from flask import Flask, render_template
from flask.signals import _signals
import pymysql
app = Flask(__name__)
app.debug = True
# 1 定义信号
# 自定义信号
db_save = _signals.signal('db_save')
# 2 写一个函数
def db_save_fun(*args, **kwargs):
print(args)
print(kwargs)
print('表数据插入了')
# 3 跟自定义置信号绑定
db_save.connect(db_save_fun)
# 3 触发信号执行(需要我们做)
# before_render_template.send() 源码再模板渲染之前,它写死了
@app.route('/')
def index():
return render_template('index.html')
@app.route('/create_article')
def create_article():
conn = pymysql.connect(host='127.0.0.1', user='root', password='1234', database='cnblogs')
cursor = conn.cursor()
cursor.execute('insert into article (title,author) VALUES (%s,%s)', args=['测试测试标题', '测试作者测试'])
conn.commit()
# 手动触发信号
db_save.send(table_name='article',info={'title':'测试测试标题','author':'测试作者测试'})
return '插入成功'
if __name__ == '__main__':
app.run()
# 步骤:
# 1 定义信号
db_save = _signals.signal('db_save')
# 2 写一个函数
def db_save_fun(*args, **kwargs):
print(args)
print(kwargs)
print('表数据插入了')
# 3 跟自定义置信号绑定
db_save.connect(db_save_fun)
# 3 触发信号执行(需要我们做)
db_save.send() # 需要在代码中写,可以传参数,传入的参数--》db_save_fun 能拿到
sqlalchemy原生操作
# 操作原生sql ---》用得少
import pymysql
import threading
# 1 导入
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
# 2 创建引擎
engine = create_engine(
"mysql+pymysql://root:1234@127.0.0.1:3306/cnblogs",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 3 使用引擎,拿到链接
# conn = engine.raw_connection()
# # 4 剩下的一样了
# cursor=conn.cursor(pymysql.cursors.DictCursor)
# cursor.execute('select * from article limit 10')
# res=cursor.fetchall()
# print(res)
## 多线程测试:
def task(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from article"
)
result = cursor.fetchall()
print(result)
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
sqlalchemy操作表
# 1 建个 models.py 里面写 表模型
# 2 把表模型---》同步到数据库中
# 3 增删查改
# 1 导入
import datetime # 时间模块
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base() # Base 当成 django的 models.Model
# 2 创建表模型
class User(Base): # 模型层的名字
__tablename__ = 'users' # 数据库表的名字
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(32), index=True, nullable=True)
email = Column(String(32), unique=True)
# datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text)
# 3 没有迁移命令---》后期使用第三方模块,可以有命令
# 目前需要手动做
# sqlalchemy 不能创建数据库,能创建表,删除表,不能删除增加字段(第三方模块)
# 3.1 创建引擎
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
## 3.2 把表模型同步到数据库中
Base.metadata.create_all(engine)
# 3.3 同步和删除表只能执行一条
# Base.metadata.drop_all(engine)
flask orm操作表
基本操作
from models import User
from sqlalchemy import create_engine
# 1 创建引擎
engine = create_engine(
"mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy01",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
# 2 orm操作--》借助于 engine 得到session(conn)对象
from sqlalchemy.orm import sessionmaker
Connection = sessionmaker(bind=engine) # 绑定连接
session = Connection()
# 3 使用conn---》进行orm操作
# 3.1 增加数据
# user = User(name='lqz', email='3@qq.com')
# # 插入到数据库
# conn.add(user) # 放个对象
# # 提交
# conn.commit()
# # 关闭链接
# conn.close()
# 3.2 查询数据
# 查询User表中id为1的所有记录--》放到列表中
# res=conn.query(User).filter_by(id=1).all()
# print(res)
## 3.3 删除
# res = conn.query(User).filter_by(name='lqz').delete()
# print(res)
# conn.commit()
## 3.4 修改
# res=conn.query(User).filter_by(name='9999').update({'extra':'xxsss'})
# conn.commit()
一对多的增加和跨表查询 (一对一只需要关联字段加上 ,unique=True)
modles.py文件
import datetime
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base() # Base 当成 models.Model
# 一对多 :一个兴趣被多个人喜欢 一个人只喜欢一个兴趣
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
def __str__(self):
return self.caption
def __repr__(self):
return self.caption
class Person(Base):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
# hobby指的是tablename而不是类名,uselist=False
# 外键关联--》强外键--》物理外键
hobby_id = Column(Integer, ForeignKey("hobby.id")) # 如果是一对一加上unique=True
# 跟数据库无关,不会新增字段,只用于快速连表操作
# 类名,backref用于反向查询
hobby = relationship('Hobby', backref='pers') # 重点!!!!!!!! 正反向查询要通过这个字段
def __str__(self):
return self.name
def __repr__(self):
return self.name
# views.py中
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Hobby, Person, User
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()
# 1 增加 Hobby
# hobby = Hobby(caption='足球')
# hobby1 = Hobby(caption='篮球')
# session.add_all([hobby, hobby1]) # 列表套对象
# session.commit() # 一定要记得提交
# 2 增加Person
# p1 = Person(name='彭于晏', hobby_id=1) # 可以是查询出来的对象 也可以是id 但是id写死了
# p2 = Person(name='刘亦菲', hobby_id=2)
# session.add_all([p1, p2])
# session.commit()
# 3 简便方式增加person---》增加Person,直接新增Hobby
# hobby1 = Hobby(caption='乒乓球')
# # 同时把数据插入两张表
# p1 = Person(name='彭于晏', hobby=hobby1) # 前提是表模型 必须有 relationship
# session.add(p1)
# session.commit()
# # 4 基于对象的跨表查询---正向
# per = session.query(Person).filter_by(name='彭于晏').first()
# print(per)
# # 正向 Person的对象通过 hobby 查询到 Hobby的caption字段
# print(per.hobby.caption)
# 5 基于对象的跨表查询---正向
# hobby=session.query(Hobby).filter_by(caption='乒乓球').first()
# print(hobby)
# 反向--->拿到多条 可以通过索引取值
hobby=session.query(Hobby).filter_by(caption='乒乓球').first()
print(hobby.pers)
print(hobby.pers[0].name) # 列表套对象
多对多关系的增加和查询
# 多对多 models.py
class Boy2Girl(Base):
__tablename__ = 'boy2girl'
id = Column(Integer, primary_key=True, autoincrement=True)
girl_id = Column(Integer, ForeignKey('girl.id'))
boy_id = Column(Integer, ForeignKey('boy.id'))
# boy = relationship('Boy', backref='boy'
def __str__(self):
return self.name
def __repr__(self):
return self.name
class Girl(Base):
__tablename__ = 'girl'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
class Boy(Base):
__tablename__ = 'boy'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以--等同于manytomany
girls = relationship('Girl', secondary='boy2girl', backref='boys')
def __str__(self):
return self.name
def __repr__(self):
return self.name
if __name__ == '__main__':
# 3.1 创建引擎
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
## 3.2 把表模型同步到数据库中
Base.metadata.create_all(engine)
# 3.3 删除表
# Base.metadata.drop_all(engine)
# 视图类中
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Hobby, Person, User, Girl, Boy2Girl, Boy
engine = create_engine(
"mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy01",
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Session = sessionmaker(bind=engine)
session = Session()
# 第一种简便方式增加
# obj2 = Girl(name='张亦菲')
# obj3 = Girl(name='李娜扎')
# obj1 = Boy(name='张小勇', girls=[obj2, obj3]) # 在Boy 添加一个 通过girls关键字往 Girl 加上两个并建立关系
#
# session.add(obj1)
# session.commit()
# # 4 基于对象的跨表查询---正向
boy = session.query(Boy).filter_by(name='张小勇').first()
print(boy.girls[0].name)
# 5 基于对象的跨表查询---反向
# girl=session.query(Girl).filter_by(name='张亦菲').first()
# print(girl.boys)
# 第二中添加方式 一张一张表的添加
# 1 增加 Boy
# boy = Boy(name='王小刚')
# boy2 = Boy(name='王小明')
# boy3 = Boy(name='王小勇')
# session.add_all([boy,boy2,boy3])
# session.commit()
# 2 增加Girl
# girl = Girl(name='张小华')
# girl2 = Girl(name='刘小红')
# girl3 = Girl(name='李小丽')
# session.add_all([girl3, girl2, girl])
# session.commit()
# 3 增加Boy2Girl
# obj1=Boy2Girl(boy_id=1,girl_id=1)
# obj2=Boy2Girl(boy_id=1,girl_id=2)
# obj3=Boy2Girl(boy_id=1,girl_id=3)
# session.add_all([obj2, obj3, obj1])
# session.commit()
多对多基本的增删改查
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import User, Person, Hobby, Boy, Girl, Boy2Girl
from sqlalchemy.sql import text
# 添加
# obj1 = Boy(name='张小勇', girls=[obj2, obj3]) # 在Boy 添加一个 通过girls关键字往 Girl 加上两个并建立关系
# session.add(obj1)
# 2 删除
# 2.1 session.query(Users).filter_by(id=1).delete()
# 2.1 session.delete(对象)
# user = session.query(User).filter_by(id=1).first()
# session.delete(user)
# session.commit()
# 修改
# 1 方式一:
# session.query(Boy).filter_by(id=1).update({'name':'lqz'})
# session.commit()
# # 2 方式二 类名.属性名,作为要修改的key
# session.query(Boy).filter_by(id=4).update({Boy.name:'lqz1'})
# session.commit()
# id为4的人的名字后+ _nb 类似于django的 F 查询
# session.query(User).filter_by(id=2).update({'name':User.name+'_nb'},synchronize_session=False) # 字符串拼接
# session.query(User).filter_by(id=2).update({'id':User.id+6}, synchronize_session="evaluate") # 数字之间加
# session.commit()
# # 3 方式三:
# 对象.name='xxx'
#session.add(对象)
# boy=session.query(Boy).filter_by(id=1).first()
# boy.name='xxzzyy'
# session.add(boy) # 有id就是修改,没有就是新增
# session.commit()
本文含有隐藏内容,请 开通VIP 后查看