How to Make Database Access 167× Faster

Keywords: Redis, NoSQL, Flask, RQ, Flask_Cache, cache,
non-relational database, in-memory database, RQ queue, caching
An app I put into production a while ago (Framework7+Vue+Flask in practice - PT seeding management system, part 1: overview) constantly runs statistics and queries over a large dataset. As the data grew, each database access went from a few hundred milliseconds to well over ten seconds!
So a caching layer had to be introduced.
Attention turned to the currently popular Redis, an in-memory NoSQL database.

Result: a 167× speedup!

(chart: time-cost comparison of PostgreSQL, Redis, and pipelined Redis)

Benchmark script:

from timeit import timeit

timeit(stmt="[Ob.query.get(i) for i in range(10000)]", setup="from __main__ import Ob", number=10)
# PostgreSQL 25.15s

timeit(stmt="[r.get(i) for i in range(10000)]", setup="from __main__ import r", number=10)
# Redis 9.32s

timeit(stmt="[pipe.get(i) for i in range(10000)]; pipe.execute()", setup="from __main__ import r; pipe=r.pipeline(False)", number=100)
# Redis Pipe 0.15s

Read speed: pipelined Redis is ~62× faster than one-at-a-time Redis GETs, and ~167× faster than the PostgreSQL queries.

timeit(stmt="[Cookie.query.filter_by(key='sys_status').update({'val':i}) for i in range(10000)]", setup="from app.models import Cookie", number=1)
# 20s

timeit(stmt="[r.set('tmp:%d'%i, i) for i in range(10000)]", setup="from __main__ import r;", number=10)
# 9.17s

timeit(stmt="[pipe.set('tmp:%d'%i, i) for i in range(10000)]; pipe.execute()", setup="from __main__ import r; pipe=r.pipeline(False)", number=10)
# 0.21s

Write speed: pipelined Redis is ~43× faster than one-at-a-time Redis SETs, and ~95× faster than the PostgreSQL updates.
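For reference, the headline ratios fall out of the raw totals above. Note that the tests did not all use the same `number` argument, so these are ratios of the reported totals rather than a strict per-operation comparison:

```python
# Ratios of the totals reported above. The figures appear to be truncated
# rather than rounded (25.15 / 0.15 ≈ 167.7 is quoted as 167x).
pg_read, redis_read, pipe_read = 25.15, 9.32, 0.15
pg_write, redis_write, pipe_write = 20.0, 9.17, 0.21

print(int(redis_read / pipe_read))    # 62  : pipelined vs plain Redis reads
print(int(pg_read / pipe_read))       # 167 : pipelined reads vs PostgreSQL
print(int(redis_write / pipe_write))  # 43  : pipelined vs plain Redis writes
print(int(pg_write / pipe_write))     # 95  : pipelined writes vs PostgreSQL
```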

The numbers above are from a local test. If you deploy on a hosted platform, the gap is even wider, because the database usually isn't on the same machine: on Heroku, for example, the database lives on AWS, and the extra network latency adds considerably to each call (more than 2× slower).
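A simple cost model shows why pipelining matters even more when the database is remote: each stand-alone command pays one network round trip, while a pipeline pays roughly one round trip for the whole batch. The sketch below is illustrative only; the latency figures are made up.

```python
# Back-of-the-envelope model of command batching. All numbers are
# illustrative, not measurements.

def total_time(n_ops, rtt, op_time, pipelined):
    """Estimated wall-clock seconds for n_ops commands."""
    if pipelined:
        return rtt + n_ops * op_time      # ~one round trip for the whole batch
    return n_ops * (rtt + op_time)        # one round trip per command

n = 10_000
rtt = 0.0005      # 0.5 ms round trip (local network, assumed)
op = 0.00001      # 10 µs of server work per GET/SET (assumed)

plain = total_time(n, rtt, op, pipelined=False)
piped = total_time(n, rtt, op, pipelined=True)
print(f"plain: {plain:.2f}s  pipelined: {piped:.2f}s  speedup: {plain / piped:.0f}x")
```

Raise `rtt` to a cross-datacenter value and the pipeline's advantage grows accordingly, which matches the Heroku observation above.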

How to wire Redis into our framework:

Our existing Flask backend framework: Vue 2.0 from scratch (4) - a lightweight Flask backend with user authentication - WeChat official account RSS.
For bringing in RQ, see: https://beenje.github.io/blog/posts/running-background-tasks-with-flask-and-rq/

# /app/main/views.py

import redis
from flask import g, current_app
from rq import push_connection, pop_connection, Queue
from rq.job import Job
from . import main, tasks, ob_api   # `main` is the blueprint
from .. import r                    # the FlaskRedis instance from /app/__init__.py

def get_redis_connection():
    redis_connection = getattr(g, '_redis_connection', None)
    if redis_connection is None:
        redis_url = current_app.config['REDIS_URL']
        redis_connection = g._redis_connection = redis.from_url(redis_url)
    return redis_connection

@main.before_request
def push_rq_connection():
    push_connection(get_redis_connection())
    app_status = r.hget('status', 'app')
    if app_status and app_status.decode('ascii') == 'idle':
        q = Queue()
        task_list = ['ob_sync', 'db2redis', 'ob_seeding_sync']
        # sync OB seeds, unless a live job for it is already on record
        if (r.hget('status', 'tasks:ob_sync') is None) or (q.fetch_job(r.hget('status', 'tasks:ob_sync').decode('ascii')) is None):
            job = q.enqueue_call(func=tasks.ob_sync, args=('all',), timeout=3600, result_ttl=5*3600)  # keep the result cached
            print('start queue: tasks.ob_sync...', job.get_id())
            r.hset('status', 'app', 'ob_sync')
            r.hset('status', 'tasks:ob_sync', job.get_id())
        # check whether db2redis already has a result
        if (r.hget('status', 'tasks:db2redis') is None) or (q.fetch_job(r.hget('status', 'tasks:db2redis').decode('ascii')) is None):
            # job2
            pass
        # runs serially after the jobs above
        if (r.hget('status', 'tasks:ob_seeding_sync') is None) or (q.fetch_job(r.hget('status', 'tasks:ob_seeding_sync').decode('ascii')) is None):
            # job3
            pass
    elif app_status:
        q = Queue()
        task_list = ['ob_sync', 'db2redis', 'ob_seeding_sync']
        for task in task_list:
            if r.hget('status', 'tasks:%s' % task):
                job_key = r.hget('status', 'tasks:%s' % task).decode('ascii')
                job = q.fetch_job(job_key)
                if job:
                    print(task, job_key, job.status, job.result)
                    if job.status in ['finished', 'failed']:
                        r.hset('status', 'app', 'idle')
                    else:
                        r.hset('status', 'app', task)
    else:
        r.hset('status', 'app', 'idle')

@main.teardown_request
def pop_rq_connection(exception=None):
    pop_connection()
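The job bookkeeping in `push_rq_connection` can be sketched in isolation, with a plain dict standing in for the Redis `'status'` hash and another dict standing in for RQ's job registry. All names here are hypothetical; this only illustrates the state machine, not RQ itself:

```python
status = {'app': 'idle'}   # stands in for r.hget/hset('status', ...)
jobs = {}                  # stands in for RQ: job_id -> job state

def maybe_start(task, enqueue):
    """Enqueue `task` only if no live job for it is on record (the `if` chains)."""
    job_id = status.get('tasks:%s' % task)
    if status['app'] == 'idle' and (job_id is None or job_id not in jobs):
        new_id = enqueue()                   # like q.enqueue_call(...).get_id()
        jobs[new_id] = 'queued'
        status['app'] = task                 # mark the app busy with this task
        status['tasks:%s' % task] = new_id
        return new_id
    return None

def refresh(task):
    """Like the elif branch: flip back to idle once the job finishes or fails."""
    job_id = status.get('tasks:%s' % task)
    if jobs.get(job_id) in ('finished', 'failed'):
        status['app'] = 'idle'
    elif job_id in jobs:
        status['app'] = task

jid = maybe_start('ob_sync', lambda: 'job-1')
jobs[jid] = 'finished'       # pretend a worker completed the job
refresh('ob_sync')
print(jid, status['app'])    # job-1 idle
```

The point of the `status` hash is that it survives across requests, so concurrent requests don't enqueue the same task twice.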

Flask_Cache

另外,對(duì)于一些經(jīng)常訪問且變化不大的路由(views),可以引入Flask_Cache 高速緩存:

Point the configuration at the Redis server:

# config.py
import os

class Config:
  REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379')
  REDISTOGO_URL = REDIS_URL  # same URL, under the name Heroku's Redis To Go add-on uses

When the app is created, initialize flask_cache and flask_redis:

## /app/__init__.py
from flask_cache import Cache
from flask_redis import FlaskRedis
from urllib import parse

cache = Cache()
r = FlaskRedis()

def create_app(config_name):
  # ... existing setup: app = Flask(__name__), config loading, etc. ...

  parse.uses_netloc.append("redis")
  redis_url = parse.urlparse(app.config['REDISTOGO_URL'])
  cache.init_app(app, config={
    'CACHE_TYPE': 'redis',
    'CACHE_KEY_PREFIX': 'fcache',   # namespaces this app's cache keys in Redis
    'CACHE_REDIS_HOST': redis_url.hostname,
    'CACHE_REDIS_PORT': redis_url.port,
    'CACHE_REDIS_PASSWORD': redis_url.password or '',
    # 'CACHE_REDIS_URL': app.config['REDISTOGO_URL'],  # or pass the URL directly
    })

  r.init_app(app)
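What the `parse.urlparse` call in `create_app` produces can be checked in isolation; the password-bearing URL below is just an example:

```python
from urllib import parse

# Register redis:// so urlparse splits its netloc like an http:// URL
# (harmless on modern Pythons, which already handle // for any scheme).
parse.uses_netloc.append("redis")

url = parse.urlparse("redis://:s3cret@redis.example.com:6379/0")
print(url.hostname, url.port, url.password)  # redis.example.com 6379 s3cret
```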

Now the cache decorators can be used in routes as needed:

# /app/main/views.py
import time
import redis
from flask import current_app, jsonify
from flask_login import login_required
from rq import push_connection, pop_connection, Queue
from rq.job import Job
from .. import db, admin, cache, r

@cache.memoize(timeout=20)
def query_db():
  time.sleep(3)   # simulate a slow database query
  r.set('query_db', time.ctime())
  return time.ctime()  # you must return something, otherwise the cache has nothing to store

@main.route("/cache")
def cache_view():
  app = current_app._get_current_object()
  with app.app_context():
    start = time.time()
    query_db()
    return "Results from DB in {:.2f}sec".format(time.time() - start)

@main.route('/api/ob_report', methods=['GET', 'POST'])
@login_required
@cache.cached(timeout=50)
def api_ob_report():
  app = current_app._get_current_object()
  rsp = ob_api.ob_report(app)
  return jsonify(code=rsp['code'], msg=rsp['msg'], sys_status=rsp['sys_status'])
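As a rough mental model of what `cache.memoize` does, here is a toy TTL memoizer in plain Python. Flask_Cache keeps the same kind of keyed, time-stamped records in Redis rather than a local dict; this is a sketch of the idea, not Flask_Cache's actual implementation:

```python
import time
from functools import wraps

def memoize(timeout):
    """Toy TTL cache: remember results per (function, args) for `timeout` seconds."""
    store = {}
    def deco(fn):
        @wraps(fn)
        def wrapper(*args):
            key = (fn.__name__, args)
            hit = store.get(key)
            if hit and time.time() - hit[1] < timeout:
                return hit[0]                 # still fresh: skip the real work
            val = fn(*args)
            store[key] = (val, time.time())   # record value + timestamp
            return val
        return wrapper
    return deco

calls = []  # track how often the real work actually runs

@memoize(timeout=60)
def slow_query(x):
    calls.append(x)
    return x * 2

print(slow_query(21), slow_query(21), len(calls))  # 42 42 1  (second call is a hit)
```

This is also why `query_db` above must return something: the return value is what gets stored and replayed on a hit.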

Note: official Redis builds target Linux only. If you are on Windows:

Using PostgreSQL and Redis on Windows (under Cygwin)

Database rankings for 2017:

(chart: DB-Engines database ranking)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容