python mysql連接池 類

#!/usr/bin/env python
#coding=utf-8
from DBUtils.PooledDB import PooledDB 
import MySQLdb as mysql
from utils import util                                                                                        
import traceback
import config
import sys


class DB():
    def __init__(self):
        self.host = config.db_host
        self.name = config.db_name
        self.user = config.db_user
        self.passwd = config.db_passwd
        self.pool = PooledDB(mysql, mincached=4, maxcached=10, host=self.host,db=self.name,user=self.user,passwd=self.passwd,setsession=['SET AUTOCOMMIT = 1'])

    #連接數(shù)據(jù)庫
    def connect_db(self):
        self.db = self.pool.connection()
        self.cur = self.db.cursor()

    #關(guān)閉連接
    def close_db(self):
        self.cur.close()
        self.db.close()

    #執(zhí)行sql
    def execute(self, sql):
        self.connect_db()
        return self.cur.execute(sql)
    
    #獲取所有數(shù)據(jù)列表
    def get_list(self,table,fields):
        sql = "select %s from %s"% (",".join(fields),table)
        try: 
            self.execute(sql)
            result = self.cur.fetchall()
            if result:
                result = [dict((k,row[i]) for i, k in enumerate(fields)) for row in result]
            else:
                result = {}
            return result;
        except:
            util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
        finally:
            self.close_db()

    #獲取某一條數(shù)據(jù),返回字典
    def get_one(self,table,fields,where):
        if isinstance(where, dict) and where:
            conditions = []
            for k,v in where.items():
                conditions.append("%s='%s'" % (k, v))
        sql = "select %s from %s where %s" % (",".join(fields),table,' AND '.join(conditions))
        try:
            self.execute(sql)
            result = self.cur.fetchone()
            if result:
                result = dict((k, result[i])for i, k in enumerate(fields))
            else:
                result = {}
            return result
        except:
            util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
        finally:
            self.close_db()

    #更新數(shù)據(jù)
    def update(self,table,fields):
        data = ",".join(["%s='%s'"%(k,v) for k,v in fields.items()])
        sql = "update %s set %s where id=%s " % (table,data,fields["id"])
        try:
            return self.execute(sql)
        except:
            util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
        finally:
            self.close_db()

    #添加數(shù)據(jù)
    def create(self,table,data):
        fields,values = [],[]
        for k, v in data.items():
            fields.append(k)
            values.append("'%s'" % v)
        sql = "insert into %s (%s) values (%s)" % (table,",".join(fields),",".join(values))
        try:
            return self.execute(sql)
        except:
            util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
        finally:
            self.close_db()

    #刪除數(shù)據(jù)
    def delete(self,table,where):
        if isinstance(where, dict) and where:
            conditions = []
            for k,v in where.items():
                conditions.append("%s='%s'" % (k, v))
        sql = "delete from %s where %s" % (table,' AND '.join(conditions))
        try:
            return self.execute(sql)
        except:
            util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
        finally:
            self.close_db()
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容