
家鄉(xiāng).jpg
SQLAlchemy學(xué)習(xí)筆記(版權(quán)所有,翻版必究)
一、實例化數(shù)據(jù)庫鏈接session
- 參數(shù)
-
- DATABASE_URI(數(shù)據(jù)庫地址):mysql://root:Yunji@2016@127.0.0.1:3306/flask?charset=utf8
- convert_unicode(True):設(shè)置str、boolean類型數(shù)據(jù)的默認(rèn)行為
- echo:engine的log行為,默認(rèn)False,可修改為True或者Debug,標(biāo)準(zhǔn)輸出到控制臺
- pool_size:數(shù)據(jù)庫連接池大小,默認(rèn)None,為5個連接數(shù)
- pool_recycle:數(shù)據(jù)庫連接的自動回收時間
-
- autocommit:自動提交
- autoflush:刷新session
- bind:綁定engine
- 方式
- 方式1:不綁定app
engine = create_engine(DATABASE_URI, convert_unicode=True, pool_size=50, pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=True, autoflush=False, bind=engine))
db.session = session()
- 方式2:綁定flask app
# store.py
from flask.ext.sqlalchemy import SQLAlchemy
db = SQLAlchemy()
----------------------------------------------------------------
# app.py
def create_app(config_name=None):
app = Flask(__name__)
db.app = app
db.init_app(app)
return app
app = create_app()
----------------------------------------------------------------
# handler.py
from store import db
result = db.session.query(ModelA).all()
二、Model定義
import db
所有model類繼承db.Model
-
- int
db.Column(db.Integer, primary_key=True)
- string
db.Column(db.String(50), nullable=False, unique=True)
- time
-
db.Column(db.TIMESTAMP, default=datetime.now(), nullable=False)# 不能插入timestamp=0的數(shù)據(jù) db.Column(db.DateTime(), nullable=False, server_default = func.now())
-
- boolean
db.Column(db.Boolean, nullable=True, default=True)
- float
db.Column(db.Float, nullable=False)
- int
-
- 主鍵 primary = True/False
- 是否為空 nullable = True/False
- 是否唯一 unique = True/False
- 默認(rèn)值 default = 0
- 外鍵 foreignkey
- 級聯(lián)刪除 ondelete='CASCADE'
# parentModel # 定義父級關(guān)聯(lián)的子集model from sqlalchemy.orm import relationship children = db.relationship("hostAssets", cascade="all, delete-orphan", passive_deletes=True) # children Model # 定義外鍵字段和外鍵刪除規(guī)則 from sqlalchemy import ForeignKey business_id = db.Column(db.Integer, db.ForeignKey('business.id', ondelete='CASCADE'), nullable=True)```
三、Result序列化
- marshmallow
- model增加to_dict方法
# ModelA.py
model中綁定__table__的信息
__tablename__ = 'model_a'
def to_dict(self):
return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}
result = session.query().first()
result.to_dict()
四、Function
- get 查詢
- __dict__
_sa_instance:為內(nèi)置的屬性
- query() 查詢
all()
ret = session.query(User).all()得到的結(jié)果是model實例對象的列表,如果查詢不到,返回空list,可以用ret.column獲取指定字段的valuefirst()
ret = session.query(User).first()得到的結(jié)果是model實例對象,可以用ret.column獲取指定字段的value,等同于one_or_none(),如果查詢不到,返回Nonescalar()
ret = session.query(User.name).scalar()得到的是name字段的valueget(id)
ret = session.query(User).get(5)得到的結(jié)果是model實例對象,可以用ret.column獲取指定字段的value,get的內(nèi)容必須是主鍵
- update
參數(shù):
synchronize_session=False 不同步更新當(dāng)前session
synchronize_session= fetch 更新之前進行查詢,獲取最新的更新對象
db.session.query(hostAssets).filter(hostAssets.business_id == ids).update({"business_id": None},
synchronize_session=False)
db.session.commit()
- merge(obiect)
net_mission = get_object_or_404(NetMission, data['id']) # get_object_or_404 可換為普通的query語句
net_mission.mission_name = data['mission_name']
db.session.merge(net_mission)
db.session.commit()
- delete
參數(shù):
synchronize_session=False 不同步更新當(dāng)前session
synchronize_session= fetch 更新之前進行查詢,獲取最新的更新對象
db.session.query(hostAssetsTrend).filter(hostAssetsTrend.tid == ids).delete(synchronize_session=False)
PingResultHours.query.filter(PingResultHours.create_time < time_1).delete(synchronize_session='fetch')
db.session.commit()
- delete(object)
net_mission = get_object_or_404(NetMission, data['id']) # get_object_or_404 可換為普通的query語句
db.session.delete(net_mission)
db.session.commit()
- add 添加
- 方式一
_ = PingResult(source_ip=ret['source_ip'], order_ip=ret['order_ip'],
delay=round(float(ret['delay']), 2), loss=round(float(ret['loss']), 2),
create_time=datetime.now())
db.session.add(_)
db.session.commit()
- 方式二
db.session.execute(PingResult.__table__.insert(),[{"business_name":"A"},{"business_name":"B"}])
db.session.commit()
-
commit
db.session.commit()
-
filter
from sqlalchemy import and\_, or\_, desc
- and_
hosts = model.query.filter(and_(model.source_ip == data['source_ip'],
model.order_ip == data['order_ip'],
model.create_time >= start_time)).order_by('create_time')
- or_
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
- between
User.query.filter(User.id.between(1, 10)).all()
- like
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
- order_by, desc, asc
latest_data = hosts.order_by(desc('create_time')).limit(1).first()
latest_data = hosts.order_by(hosts.create_time.desc()).limit(1).first()
- in_
hardwareFault.query.filter(hardwareFault.host_sn.in_([1,2,3,4,5,6])).all()
-
rollback
db.session.rollback()
-
close
db.session.close()
-
fllush
db.session.flush()
五、高級查詢
- label
- 類似于sql中的as,可以對查詢的字段或者運算的結(jié)果重命名
- group_by
- 可以group多個字段,即為要同時符合多個條件,結(jié)果可以用(result.字段名)獲取
result = db.session.query(PingResultHours.source_ip, PingResultHours.order_ip, func.avg(
PingResultHours.delay).label('delay'), func.avg(PingResultHours.loss).label('loss')).filter(
PingResultHours.create_time > time_1, PingResultHours.create_time <= current_time).group_by(
PingResultHours.source_ip, PingResultHours.order_ip).all()

- union,union_all
相當(dāng)于sql中的union,對相同字段的結(jié)果做拼接 - join
records = db.session.query(faultRepairRecord).
join(hostAssets, faultRepairRecord.host_sn == .order_by(faultRepairRecord.create_time.desc()).
order_by('id').offset(start).limit(result['page_size']).all()
- limit
限制返回的結(jié)果數(shù)量 - skip
要跳過的項數(shù),SKIP 不能脫離 ORDER BY 子句單獨使用 - distinct
對字段去重
records = db.session.query(
distinct(performanceTrend.create_time),performanceTrend.create_time,performanceTrend.cpu_ratio,
performanceTrend.cpu_max,performanceTrend.memory_ratio,performanceTrend.hdd_ratio).
filter(and_(performanceTrend.create_time >= new_starttime, performanceTrend.create_time <= new_endtime)).all()
六、Sql執(zhí)行方式
- session.query
- session.execute
條件變量可以通過dict或者%s做替換
records = db.session.execute(
"select avg(type) as datatype,DATE_FORMAT(create_time,'%Y-%m-%d')as createTime,avg(cpu_ratio) as cpu_ratio,
max(cpu_max) as cpu_max,avg(memory_ratio) as memory_ratio, avg(hdd_ratio) as hdd_ratio
from performance_trend where type=1 and
(create_time > :stime and create_time < :etime) group by createTime",{"stime":new_starttime,"etime":new_endtime}).fetchall()
七、內(nèi)建func函數(shù)
- time類
- 對查詢的time字段做格式化輸出
func.date_format(PingResult.create_time,'%Y-%m-%d').label('create_time'))
- 對查詢的time字段做格式化輸出
- math類
- func.sum()
- func.avg()
- func.count()
- func.max()
- func.min()