SQLAlchemy學(xué)習(xí)筆記

家鄉(xiāng).jpg

SQLAlchemy學(xué)習(xí)筆記(版權(quán)所有,翻版必究)

一、實例化數(shù)據(jù)庫鏈接session

  • 參數(shù)
  • engine參數(shù):

    • DATABASE_URI(數(shù)據(jù)庫地址):mysql://root:Yunji@2016@127.0.0.1:3306/flask?charset=utf8
    • convert_unicode(True):設(shè)置str、boolean類型數(shù)據(jù)的默認(rèn)行為
    • echo:engine的log行為,默認(rèn)False,可修改為True或者Debug,標(biāo)準(zhǔn)輸出到控制臺
    • pool_size:數(shù)據(jù)庫連接池大小,默認(rèn)None,為5個連接數(shù)
    • pool_recycle:數(shù)據(jù)庫連接的自動回收時間
  • session參數(shù):

    • autocommit:自動提交
    • autoflush:刷新session
    • bind:綁定engine
  • 方式
  • 方式1:不綁定app
engine = create_engine(DATABASE_URI, convert_unicode=True, pool_size=50, pool_recycle=3600)
session = scoped_session(sessionmaker(autocommit=True, autoflush=False, bind=engine))
db.session = session()
  • 方式2:綁定flask app
# store.py
from flask.ext.sqlalchemy import SQLAlchemy
db = SQLAlchemy()
----------------------------------------------------------------
# app.py
def create_app(config_name=None):
        app = Flask(__name__)
        db.app = app
        db.init_app(app)
        return app
app = create_app()
----------------------------------------------------------------
# handler.py
from store import db
result =  db.session.query(ModelA).all()

二、Model定義

import db
所有model類繼承db.Model
  • 數(shù)據(jù)類型

    • int
      • db.Column(db.Integer, primary_key=True)
    • string
      • db.Column(db.String(50), nullable=False, unique=True)
    • time
      • db.Column(db.TIMESTAMP, default=datetime.now(), nullable=False) # 不能插入timestamp=0的數(shù)據(jù)
      • db.Column(db.DateTime(), nullable=False, server_default = func.now())
    • boolean
      • db.Column(db.Boolean, nullable=True, default=True)
    • float
      • db.Column(db.Float, nullable=False)
  • 內(nèi)置方法

    • 主鍵 primary = True/False
    • 是否為空 nullable = True/False
    • 是否唯一 unique = True/False
    • 默認(rèn)值 default = 0
    • 外鍵 foreignkey
      • 級聯(lián)刪除 ondelete='CASCADE'
      # parentModel
      # 定義父級關(guān)聯(lián)的子集model
      from sqlalchemy.orm import relationship  
      children = db.relationship("hostAssets", cascade="all, delete-orphan", passive_deletes=True)  
      
      # children Model
      # 定義外鍵字段和外鍵刪除規(guī)則
      from sqlalchemy import ForeignKey  
      business_id = db.Column(db.Integer, db.ForeignKey('business.id', ondelete='CASCADE'), nullable=True)```  
     
    

三、Result序列化

# ModelA.py
model中綁定__table__的信息
__tablename__ = 'model_a'
def to_dict(self):
  return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}

result = session.query().first()
result.to_dict()

四、Function

  • get 查詢
  • __dict__
_sa_instance:為內(nèi)置的屬性
  • query() 查詢
    • all()
      ret = session.query(User).all() 得到的結(jié)果是model實例對象的列表,如果查詢不到,返回空list,可以用ret.column獲取指定字段的value

    • first()
      ret = session.query(User).first() 得到的結(jié)果是model實例對象,可以用ret.column獲取指定字段的value,等同于one_or_none(),如果查詢不到,返回None

    • scalar()
      ret = session.query(User.name).scalar() 得到的是name字段的value

    • get(id)
      ret = session.query(User).get(5) 得到的結(jié)果是model實例對象,可以用ret.column獲取指定字段的value,get的內(nèi)容必須是主鍵

  • update
    參數(shù):
    synchronize_session=False 不同步更新當(dāng)前session
    synchronize_session= fetch 更新之前進行查詢,獲取最新的更新對象
db.session.query(hostAssets).filter(hostAssets.business_id == ids).update({"business_id": None},
                                                                            synchronize_session=False)  
db.session.commit()
  • merge(obiect)
net_mission = get_object_or_404(NetMission, data['id'])  #  get_object_or_404 可換為普通的query語句
net_mission.mission_name = data['mission_name']  
db.session.merge(net_mission)  
db.session.commit()
  • delete
    參數(shù):
    synchronize_session=False 不同步更新當(dāng)前session
    synchronize_session= fetch 更新之前進行查詢,獲取最新的更新對象
db.session.query(hostAssetsTrend).filter(hostAssetsTrend.tid == ids).delete(synchronize_session=False)  
PingResultHours.query.filter(PingResultHours.create_time < time_1).delete(synchronize_session='fetch')
db.session.commit()
  • delete(object)
net_mission = get_object_or_404(NetMission, data['id'])  #  get_object_or_404 可換為普通的query語句
db.session.delete(net_mission)  
db.session.commit()  
  • add 添加
  • 方式一
_ = PingResult(source_ip=ret['source_ip'], order_ip=ret['order_ip'],
                              delay=round(float(ret['delay']), 2), loss=round(float(ret['loss']), 2),
                              create_time=datetime.now())
db.session.add(_)
db.session.commit()
  • 方式二
db.session.execute(PingResult.__table__.insert(),[{"business_name":"A"},{"business_name":"B"}])
db.session.commit()
  • commit
    • db.session.commit()
  • filter
    from sqlalchemy import and\_, or\_, desc
  • and_
hosts = model.query.filter(and_(model.source_ip == data['source_ip'],
                                   model.order_ip == data['order_ip'],
                                   model.create_time >= start_time)).order_by('create_time')
  • or_
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
  • between
User.query.filter(User.id.between(1, 10)).all()
  • like
key = '%%%s%%'%key
cond = model.query.filter(or_(hardwareFault.host_sn.like(key), hardwareFault.host_name.like(key))).all()
  • order_by, desc, asc
latest_data = hosts.order_by(desc('create_time')).limit(1).first()  
latest_data = hosts.order_by(hosts.create_time.desc()).limit(1).first()
  • in_
hardwareFault.query.filter(hardwareFault.host_sn.in_([1,2,3,4,5,6])).all()
  • rollback
    • db.session.rollback()
  • close
    • db.session.close()
  • fllush
    • db.session.flush()

五、高級查詢

  • label
    • 類似于sql中的as,可以對查詢的字段或者運算的結(jié)果重命名
  • group_by
  • 可以group多個字段,即為要同時符合多個條件,結(jié)果可以用(result.字段名)獲取
    result = db.session.query(PingResultHours.source_ip, PingResultHours.order_ip, func.avg(
       PingResultHours.delay).label('delay'), func.avg(PingResultHours.loss).label('loss')).filter(
       PingResultHours.create_time > time_1, PingResultHours.create_time <= current_time).group_by(
       PingResultHours.source_ip, PingResultHours.order_ip).all() 
  • union,union_all
    相當(dāng)于sql中的union,對相同字段的結(jié)果做拼接
  • join
records = db.session.query(faultRepairRecord).  
join(hostAssets, faultRepairRecord.host_sn == .order_by(faultRepairRecord.create_time.desc()).
order_by('id').offset(start).limit(result['page_size']).all()

  • limit
    限制返回的結(jié)果數(shù)量
  • skip
    要跳過的項數(shù),SKIP 不能脫離 ORDER BY 子句單獨使用
  • distinct
    對字段去重
records = db.session.query(
distinct(performanceTrend.create_time),performanceTrend.create_time,performanceTrend.cpu_ratio,
performanceTrend.cpu_max,performanceTrend.memory_ratio,performanceTrend.hdd_ratio).  
filter(and_(performanceTrend.create_time >= new_starttime, performanceTrend.create_time <= new_endtime)).all()

六、Sql執(zhí)行方式

  • session.query
  • session.execute
    條件變量可以通過dict或者%s做替換
records = db.session.execute(
"select avg(type) as datatype,DATE_FORMAT(create_time,'%Y-%m-%d')as createTime,avg(cpu_ratio) as cpu_ratio,
max(cpu_max) as cpu_max,avg(memory_ratio) as memory_ratio, avg(hdd_ratio) as hdd_ratio 
from performance_trend where type=1 and 
(create_time > :stime  and create_time < :etime) group by createTime",{"stime":new_starttime,"etime":new_endtime}).fetchall()

七、內(nèi)建func函數(shù)

  • time類
    • 對查詢的time字段做格式化輸出
      func.date_format(PingResult.create_time,'%Y-%m-%d').label('create_time'))
  • math類
  • func.sum()
  • func.avg()
  • func.count()
  • func.max()
  • func.min()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容