A look at happybase's connection pool

Let's first look at how the happybase connection pool is used:

import happybase

# conf is the application's own configuration mapping (not shown here)
hbase_pool = happybase.ConnectionPool(host=conf['hbase']['host'], port=conf['hbase']['port'], size=100)
with hbase_pool.connection() as conn:
    pass  # do something with conn

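Here a pool of 100 hbase connections is initialized with the given host and port. The with keyword takes one connection out of the pool, and through that connection we can perform CRUD operations on hbase. Knowing how to use it is far from enough, though: when a tricky problem comes up you may not know where to start. So I read the source of the happybase connection pool to understand how it manages hbase connections.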

class ConnectionPool(object):
    """
    Thread-safe connection pool.

    .. versionadded:: 0.5

    The `size` argument specifies how many connections this pool
    manages. Additional keyword arguments are passed unmodified to the
    :py:class:`happybase.Connection` constructor, with the exception of
    the `autoconnect` argument, since maintaining connections is the
    task of the pool.

    :param int size: the maximum number of concurrently open connections
    :param kwargs: keyword arguments passed to
                   :py:class:`happybase.Connection`
    """
    def __init__(self, size, **kwargs):
        if not isinstance(size, int):
            raise TypeError("Pool 'size' arg must be an integer")

        if not size > 0:
            raise ValueError("Pool 'size' arg must be greater than zero")

        logger.debug(
            "Initializing connection pool with %d connections", size)

        self._lock = threading.Lock()
        self._queue = queue.LifoQueue(maxsize=size)
        self._thread_connections = threading.local()

        connection_kwargs = kwargs
        connection_kwargs['autoconnect'] = False

        for i in range(size):
            connection = Connection(**connection_kwargs)
            self._queue.put(connection)

        # The first connection is made immediately so that trivial
        # mistakes like unresolvable host names are raised immediately.
        # Subsequent connections are connected lazily.
        with self.connection():
            pass

    def _acquire_connection(self, timeout=None):
        """Acquire a connection from the pool."""
        try:
            return self._queue.get(True, timeout)
        except queue.Empty:
            raise NoConnectionsAvailable(
                "No connection available from pool within specified "
                "timeout")

    def _return_connection(self, connection):
        """Return a connection to the pool."""
        self._queue.put(connection)

    @contextlib.contextmanager
    def connection(self, timeout=None):
        """
        Obtain a connection from the pool.

        This method *must* be used as a context manager, i.e. with
        Python's ``with`` block. Example::

            with pool.connection() as connection:
                pass  # do something with the connection

        If `timeout` is specified, this is the number of seconds to wait
        for a connection to become available before
        :py:exc:`NoConnectionsAvailable` is raised. If omitted, this
        method waits forever for a connection to become available.

        :param int timeout: number of seconds to wait (optional)
        :return: active connection from the pool
        :rtype: :py:class:`happybase.Connection`
        """

        connection = getattr(self._thread_connections, 'current', None)

        return_after_use = False
        if connection is None:
            # This is the outermost connection requests for this thread.
            # Obtain a new connection from the pool and keep a reference
            # in a thread local so that nested connection requests from
            # the same thread can return the same connection instance.
            #
            # Note: this code acquires a lock before assigning to the
            # thread local; see
            # http://emptysquare.net/blog/another-thing-about-pythons-
            # threadlocals/
            return_after_use = True
            connection = self._acquire_connection(timeout)
            with self._lock:
                self._thread_connections.current = connection

        try:
            # Open connection, because connections are opened lazily.
            # This is a no-op for connections that are already open.
            connection.open()

            # Return value from the context manager's __enter__()
            yield connection

        except (TException, socket.error):
            # Refresh the underlying Thrift client if an exception
            # occurred in the Thrift layer, since we don't know whether
            # the connection is still usable.
            logger.info("Replacing tainted pool connection")
            connection._refresh_thrift_client()
            connection.open()

            # Reraise to caller; see contextlib.contextmanager() docs
            raise

        finally:
            # Remove thread local reference after the outermost 'with'
            # block ends. Afterwards the thread no longer owns the
            # connection.
            if return_after_use:
                del self._thread_connections.current
                self._return_connection(connection)

The core of the connection pool is the code above; let's walk through it.
Inside the __init__ method:

self._lock = threading.Lock()
declares a thread lock
self._queue = queue.LifoQueue(maxsize=size)
declares a thread-safe last-in, first-out (LIFO) queue whose capacity is the pool size; it stores the hbase connections
self._thread_connections = threading.local()
holds a thread-local variable, so each thread keeps its own value

for i in range(size):
    connection = Connection(**connection_kwargs)
    self._queue.put(connection)
creates size connections and puts them into the queue. Because autoconnect is set to False, these connections are not opened yet.
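To make the LIFO behaviour concrete, here is a minimal sketch using only the standard library. The strings are placeholders for real happybase.Connection objects (an assumption for illustration). It shows that the item put back most recently is the one handed out next, which probably means that under light load the pool keeps reusing the same few already-open connections.

```python
import queue

# Placeholder objects; a real pool would hold happybase.Connection instances.
pool = queue.LifoQueue(maxsize=3)
for name in ("conn-0", "conn-1", "conn-2"):
    pool.put(name)

first = pool.get()    # the most recently added item comes out first
pool.put(first)       # "return" it to the pool
second = pool.get()   # the same item is handed out again
```

With a FIFO queue the second get() would have returned "conn-0" instead, cycling through every connection in turn.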

So how is a connection taken out of the pool, and how is thread safety ensured? Let's look at the connection method:

    @contextlib.contextmanager
    def connection(self, timeout=None):
        """
        Obtain a connection from the pool.

        This method *must* be used as a context manager, i.e. with
        Python's ``with`` block. Example::

            with pool.connection() as connection:
                pass  # do something with the connection

        If `timeout` is specified, this is the number of seconds to wait
        for a connection to become available before
        :py:exc:`NoConnectionsAvailable` is raised. If omitted, this
        method waits forever for a connection to become available.

        :param int timeout: number of seconds to wait (optional)
        :return: active connection from the pool
        :rtype: :py:class:`happybase.Connection`
        """

        connection = getattr(self._thread_connections, 'current', None)

        return_after_use = False
        if connection is None:
            # This is the outermost connection requests for this thread.
            # Obtain a new connection from the pool and keep a reference
            # in a thread local so that nested connection requests from
            # the same thread can return the same connection instance.
            #
            # Note: this code acquires a lock before assigning to the
            # thread local; see
            # http://emptysquare.net/blog/another-thing-about-pythons-
            # threadlocals/
            return_after_use = True
            connection = self._acquire_connection(timeout)
            with self._lock:
                self._thread_connections.current = connection

        try:
            # Open connection, because connections are opened lazily.
            # This is a no-op for connections that are already open.
            connection.open()

            # Return value from the context manager's __enter__()
            yield connection

        except (TException, socket.error):
            # Refresh the underlying Thrift client if an exception
            # occurred in the Thrift layer, since we don't know whether
            # the connection is still usable.
            logger.info("Replacing tainted pool connection")
            connection._refresh_thrift_client()
            connection.open()

            # Reraise to caller; see contextlib.contextmanager() docs
            raise

        finally:
            # Remove thread local reference after the outermost 'with'
            # block ends. Afterwards the thread no longer owns the
            # connection.
            if return_after_use:
                del self._thread_connections.current
                self._return_connection(connection)

The connection method is decorated with @contextlib.contextmanager, which means it has to be used with the with keyword. Before looking at how the pool obtains a connection, let's look at the code that yields and releases it:

try:
    # Open connection, because connections are opened lazily.
    # This is a no-op for connections that are already open.
    connection.open()

    # Return value from the context manager's __enter__()
    yield connection

except (TException, socket.error):
    # Refresh the underlying Thrift client if an exception
    # occurred in the Thrift layer, since we don't know whether
    # the connection is still usable.
    logger.info("Replacing tainted pool connection")
    connection._refresh_thrift_client()
    connection.open()

    # Reraise to caller; see contextlib.contextmanager() docs
    raise

finally:
    # Remove thread local reference after the outermost 'with'
    # block ends. Afterwards the thread no longer owns the
    # connection.
    if return_after_use:
        del self._thread_connections.current
        self._return_connection(connection)

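Once a connection has been obtained it is yielded to the caller, and the finally block returns it to the pool. The except clause in the middle deserves attention: when a Thrift-level or socket error occurs while the connection is in use, the exception is caught, the underlying Thrift client is refreshed and the connection is reopened, so that the connection handed back to the pool in finally is usable. The exception is then re-raised to the caller.

The except clause can only catch exceptions raised inside the with block; anything raised outside it is not seen. So make sure that all use of the connection is finished by the time the with block ends, otherwise several threads can end up using the same connection. A scan, for example, returns a generator. It is best to convert it to a list inside the with block and return the list. If the generator itself is returned, iterating over it outside the with block still uses the connection, even though the with block has ended and the pool considers the connection free and may hand it to another thread.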
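The scan pitfall can be reproduced without an hbase server. The sketch below is not happybase code: FakeConnection and ToyPool are hypothetical stand-ins that keep only the acquire/yield/return structure, and the checked_out flag is invented here to record whether the connection was still held when each row was produced.

```python
import contextlib
import queue


class FakeConnection:
    """Hypothetical stand-in for happybase.Connection."""

    def __init__(self):
        self.checked_out = False

    def scan(self):
        # Lazy, like a real scan: rows are produced only while iterating.
        for row in ("row-1", "row-2"):
            yield row, self.checked_out


class ToyPool:
    """Simplified pool: no thread-local reuse, no error handling."""

    def __init__(self, size):
        self._queue = queue.LifoQueue(maxsize=size)
        for _ in range(size):
            self._queue.put(FakeConnection())

    @contextlib.contextmanager
    def connection(self):
        conn = self._queue.get()
        conn.checked_out = True
        try:
            yield conn
        finally:
            conn.checked_out = False
            self._queue.put(conn)


pool = ToyPool(size=1)


def scan_lazily():
    with pool.connection() as conn:
        return conn.scan()          # the generator escapes the with block


def scan_eagerly():
    with pool.connection() as conn:
        return list(conn.scan())    # consumed while the connection is held


lazy_rows = list(scan_lazily())
eager_rows = scan_eagerly()
```

In lazy_rows every row carries False, meaning the rows were read after the connection had already gone back to the pool; in eager_rows every row carries True.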
Now let's look at how a connection is obtained:

connection = getattr(self._thread_connections, 'current', None)

return_after_use = False
if connection is None:
    # This is the outermost connection requests for this thread.
    # Obtain a new connection from the pool and keep a reference
    # in a thread local so that nested connection requests from
    # the same thread can return the same connection instance.
    #
    # Note: this code acquires a lock before assigning to the
    # thread local; see
    # http://emptysquare.net/blog/another-thing-about-pythons-
    # threadlocals/
    return_after_use = True
    connection = self._acquire_connection(timeout)
    with self._lock:
        self._thread_connections.current = connection

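The code first reads the current attribute of the _thread_connections thread-local; each thread has its own independent current attribute. Note that _thread_connections refers to the same object in every thread, because it is created once when the pool is initialized. However, Python's threading.local overrides attribute access (in the pure-Python implementation, _threading_local, the attribute hooks go through a _patch helper), so each thread's reads and writes of the local are independent.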
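A small standard-library sketch of that isolation: one threading.local object is shared by several threads, yet each thread only ever sees the value it set itself. The names shared, seen and worker are made up for this illustration.

```python
import threading

shared = threading.local()   # one object, visible to every thread
seen = {}


def worker(name):
    # Before assignment, this thread has no 'current' attribute at all.
    before = getattr(shared, "current", None)
    shared.current = name
    seen[name] = (before, shared.current)


threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never assigned 'current', so it still sees nothing.
main_view = getattr(shared, "current", None)
```

Every worker finds no existing value and reads back only its own, which is the same getattr(..., 'current', None) pattern the pool relies on.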
The rest is easy to follow: if the thread has no connection yet, take one from the queue and set it on the thread-local.

connection = self._acquire_connection(timeout)
with self._lock:
    self._thread_connections.current = connection
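The point of storing the connection in a thread-local is that nested requests from the same thread reuse it instead of going back to the queue. The following sketch is a hypothetical ReentrantToyPool, not happybase itself: it keeps only that bookkeeping and leaves out the lock, the lazy open() and the error handling, with strings standing in for connections.

```python
import contextlib
import queue
import threading


class ReentrantToyPool:
    """Hypothetical pool that keeps only the thread-local bookkeeping."""

    def __init__(self, size):
        self._queue = queue.LifoQueue(maxsize=size)
        self._local = threading.local()
        for i in range(size):
            self._queue.put(f"conn-{i}")   # strings stand in for connections

    @contextlib.contextmanager
    def connection(self):
        conn = getattr(self._local, "current", None)
        outermost = conn is None
        if outermost:
            conn = self._queue.get()
            self._local.current = conn
        try:
            yield conn
        finally:
            if outermost:                  # only the outer block returns it
                del self._local.current
                self._queue.put(conn)


pool = ReentrantToyPool(size=1)
with pool.connection() as outer:
    # With size=1 a second queue.get() would block forever; the
    # thread-local lets the nested request reuse the outer connection.
    with pool.connection() as inner:
        same = outer is inner
    still_held = pool._queue.empty()
returned = pool._queue.qsize()
```

The inner block leaves the connection checked out, and only the outermost with block puts it back, mirroring the return_after_use flag in the original method.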

One more question to consider: does this pool design still work under a coroutine model?
With gevent monkey patching it does, because gevent replaces threading.local, so each coroutine (greenlet) gets its own object.
