Let's start with how a happybase connection pool is used:
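import happybase
# conf is the application's own config dict (assumed to hold the HBase Thrift host and port)
hbase_pool = happybase.ConnectionPool(host=conf['hbase']['host'], port=conf['hbase']['port'], size=100)
with hbase_pool.connection() as conn:
    pass  # do sth: CRUD operations through conn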
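Here we first create an HBase connection pool of size 100 that points at the given host and port. The `with` statement then takes one connection out of the pool, and through that connection we can perform CRUD operations on HBase. Knowing how to use it is far from enough, though: when a tricky problem comes up, you may not know where to start. So I read the source of happybase's connection pool to understand how it manages HBase connections.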
# Excerpt from happybase/pool.py; module-level imports and definitions are omitted
class ConnectionPool(object):
    """
    Thread-safe connection pool.
    .. versionadded:: 0.5
    The `size` argument specifies how many connections this pool
    manages. Additional keyword arguments are passed unmodified to the
    :py:class:`happybase.Connection` constructor, with the exception of
    the `autoconnect` argument, since maintaining connections is the
    task of the pool.
    :param int size: the maximum number of concurrently open connections
    :param kwargs: keyword arguments passed to
                   :py:class:`happybase.Connection`
    """
    def __init__(self, size, **kwargs):
        if not isinstance(size, int):
            raise TypeError("Pool 'size' arg must be an integer")
        if not size > 0:
            raise ValueError("Pool 'size' arg must be greater than zero")
        logger.debug(
            "Initializing connection pool with %d connections", size)
        self._lock = threading.Lock()
        self._queue = queue.LifoQueue(maxsize=size)
        self._thread_connections = threading.local()
        connection_kwargs = kwargs
        connection_kwargs['autoconnect'] = False
        for i in range(size):
            connection = Connection(**connection_kwargs)
            self._queue.put(connection)
        # The first connection is made immediately so that trivial
        # mistakes like unresolvable host names are raised immediately.
        # Subsequent connections are connected lazily.
        with self.connection():
            pass
    def _acquire_connection(self, timeout=None):
        """Acquire a connection from the pool."""
        try:
            return self._queue.get(True, timeout)
        except queue.Empty:
            raise NoConnectionsAvailable(
                "No connection available from pool within specified "
                "timeout")
    def _return_connection(self, connection):
        """Return a connection to the pool."""
        self._queue.put(connection)
    @contextlib.contextmanager
    def connection(self, timeout=None):
        """
        Obtain a connection from the pool.
        This method *must* be used as a context manager, i.e. with
        Python's ``with`` block. Example::
            with pool.connection() as connection:
                pass  # do something with the connection
        If `timeout` is specified, this is the number of seconds to wait
        for a connection to become available before
        :py:exc:`NoConnectionsAvailable` is raised. If omitted, this
        method waits forever for a connection to become available.
        :param int timeout: number of seconds to wait (optional)
        :return: active connection from the pool
        :rtype: :py:class:`happybase.Connection`
        """
        connection = getattr(self._thread_connections, 'current', None)
        return_after_use = False
        if connection is None:
            # This is the outermost connection requests for this thread.
            # Obtain a new connection from the pool and keep a reference
            # in a thread local so that nested connection requests from
            # the same thread can return the same connection instance.
            #
            # Note: this code acquires a lock before assigning to the
            # thread local; see
            # http://emptysquare.net/blog/another-thing-about-pythons-
            # threadlocals/
            return_after_use = True
            connection = self._acquire_connection(timeout)
            with self._lock:
                self._thread_connections.current = connection
        try:
            # Open connection, because connections are opened lazily.
            # This is a no-op for connections that are already open.
            connection.open()
            # Return value from the context manager's __enter__()
            yield connection
        except (TException, socket.error):
            # Refresh the underlying Thrift client if an exception
            # occurred in the Thrift layer, since we don't know whether
            # the connection is still usable.
            logger.info("Replacing tainted pool connection")
            connection._refresh_thrift_client()
            connection.open()
            # Reraise to caller; see contextlib.contextmanager() docs
            raise
        finally:
            # Remove thread local reference after the outermost 'with'
            # block ends. Afterwards the thread no longer owns the
            # connection.
            if return_after_use:
                del self._thread_connections.current
                self._return_connection(connection)
The core of the connection pool is the code above. Let's walk through it.
Inside `__init__`:
self._lock = threading.Lock()
Creates a thread lock.
self._queue = queue.LifoQueue(maxsize=size)
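Creates a thread-safe last-in, first-out (LIFO) queue that stores the HBase connections; its maximum size is the pool size. Because it is LIFO, the connection returned most recently is the first one handed out again.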
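A short sketch with the standard library's `queue.LifoQueue` shows the two queue properties the pool relies on (the strings `"conn-1"` and `"conn-2"` are placeholders, not real connections): the item put back most recently comes out first, and a `get` with a timeout on an empty queue raises `queue.Empty`, which `_acquire_connection` converts into `NoConnectionsAvailable`.

```python
import queue

# A bounded LIFO queue, like the one ConnectionPool uses to store connections.
q = queue.LifoQueue(maxsize=2)
q.put("conn-1")
q.put("conn-2")

handed_out = []
# Last in, first out: the most recently (re)inserted item comes out first.
handed_out.append(q.get(True, None))  # block=True, timeout=None waits forever
q.put(handed_out[-1])                 # "return" it to the pool
handed_out.append(q.get(True, None))  # the same item is handed out again
handed_out.append(q.get(True, None))  # now the older item

# The queue is empty: a bounded wait raises queue.Empty, which is what
# _acquire_connection(timeout) turns into NoConnectionsAvailable.
try:
    q.get(True, 0.1)
    exhausted = False
except queue.Empty:
    exhausted = True

print(handed_out, exhausted)  # ['conn-2', 'conn-2', 'conn-1'] True
```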
self._thread_connections = threading.local()
Creates a thread-local object: the object itself is shared, but each thread sees its own attributes on it.
for i in range(size):
    connection = Connection(**connection_kwargs)
    self._queue.put(connection)
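Creates `size` connections and puts them into the queue. Because `autoconnect` is forced to `False`, none of them connects at construction time. The `with self.connection(): pass` at the end of `__init__` opens exactly one of them, so that simple mistakes such as an unresolvable host name show up immediately; the remaining connections are opened lazily the first time they are used.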
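The interplay between lazy opening and the LIFO queue can be sketched without HBase. `FakeConnection` below is a hypothetical stand-in for `happybase.Connection` created with `autoconnect=False`; the sketch mimics the startup check by taking one connection, opening it, and putting it back. Because the queue is LIFO, the one connection that was opened ends up on top and is the first one the next caller receives.

```python
import queue


class FakeConnection:
    """Hypothetical stand-in for a happybase.Connection created with autoconnect=False."""

    def __init__(self, name):
        self.name = name
        self.is_open = False  # nothing is connected at construction time

    def open(self):
        self.is_open = True  # calling it again on an open connection changes nothing


size = 3
pool = queue.LifoQueue(maxsize=size)
for i in range(size):
    pool.put(FakeConnection(f"conn-{i}"))

# Mimic `with self.connection(): pass` at the end of __init__:
# take one connection, open it, and put it back.
conn = pool.get(True, None)
conn.open()
pool.put(conn)

# Drain the queue to inspect it; items come out in LIFO order.
drained = [pool.get_nowait() for _ in range(size)]
print([(c.name, c.is_open) for c in drained])
# [('conn-2', True), ('conn-1', False), ('conn-0', False)]
```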
So how is a connection taken out of the pool, and how is thread safety ensured? Let's go back to the `connection` method shown above.
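The `connection` method is decorated with `@contextlib.contextmanager`, which turns the generator into a context manager. Calling `pool.connection()` on its own only returns a context-manager object; a connection is actually acquired only when a `with` block is entered, so the pool is meant to be used through `with`. Before looking at how the pool obtains a connection, let's look at the code that yields the connection and releases it: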
try:
    # Open connection, because connections are opened lazily.
    # This is a no-op for connections that are already open.
    connection.open()
    # Return value from the context manager's __enter__()
    yield connection
except (TException, socket.error):
    # Refresh the underlying Thrift client if an exception
    # occurred in the Thrift layer, since we don't know whether
    # the connection is still usable.
    logger.info("Replacing tainted pool connection")
    connection._refresh_thrift_client()
    connection.open()
    # Reraise to caller; see contextlib.contextmanager() docs
    raise
finally:
    # Remove thread local reference after the outermost 'with'
    # block ends. Afterwards the thread no longer owns the
    # connection.
    if return_after_use:
        del self._thread_connections.current
        self._return_connection(connection)
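As noted above, `connection()` is wrapped by `@contextlib.contextmanager`. A minimal sketch with a hypothetical `connection()` generator shows how that maps onto the excerpt: merely calling the function runs nothing, the code before `yield` runs when the `with` block is entered, and the `finally` clause runs when the block exits.

```python
import contextlib

events = []


@contextlib.contextmanager
def connection():
    # Code before `yield` runs in __enter__, i.e. only when a `with` block starts.
    events.append("acquire")
    try:
        yield "conn"
    finally:
        # Runs in __exit__, when the `with` block ends, normally or with an exception.
        events.append("release")


cm = connection()      # builds a context-manager object; the function body has not run
before = list(events)  # []

with cm as conn:
    events.append(f"use {conn}")

print(before, events)  # [] ['acquire', 'use conn', 'release']
```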
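Once a connection has been obtained, it is yielded to the body of the `with` block, and the `finally` clause puts it back into the pool (only in the thread's outermost `with`, where `return_after_use` is `True`).
The `except` clause deserves attention. If a Thrift error (`TException`) or a socket error occurs while the connection is in use, the pool calls `_refresh_thrift_client()` to rebuild the underlying Thrift client, reopens the connection, and re-raises the exception to the caller. This way the connection that `finally` returns to the pool is a usable one.
Note that `except` can only catch exceptions raised inside the `with` block; anything that happens after the block has ended is out of its reach. So you must make sure that once the `with` block ends, you are done using the connection; otherwise the pool may hand the same connection to another thread while you are still using it. A typical case is `scan`, which returns a generator: convert the result to a `list` inside the `with` block and return that. If you return the generator itself, iterating it outside the `with` block still uses the connection, but since the `with` block has already ended, the pool considers the connection free and may give it to another thread.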
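The `scan` pitfall can be reproduced without HBase. Everything in this sketch is hypothetical: `ToyPool` is a one-connection pool that returns its connection when the `with` block exits, as `ConnectionPool` does, and `scan` is a lazy generator standing in for a real scan that records, for each row, whether the pool believed the connection was free at the moment the row was produced.

```python
import contextlib
import queue


class ToyPool:
    """Hypothetical one-connection pool that mimics ConnectionPool's return-on-exit."""

    def __init__(self):
        self._queue = queue.LifoQueue(maxsize=1)
        self._queue.put("conn-0")
        self.in_pool = True  # True while the connection sits in the pool

    @contextlib.contextmanager
    def connection(self):
        conn = self._queue.get(True, None)
        self.in_pool = False
        try:
            yield conn
        finally:
            self._queue.put(conn)  # the pool now considers the connection free
            self.in_pool = True


def scan(pool, conn, n):
    # Lazy stand-in for a scan: each row is produced only when iterated,
    # together with whether the connection was back in the pool at that moment.
    for i in range(n):
        yield (i, pool.in_pool)


pool = ToyPool()

with pool.connection() as conn:
    rows = scan(pool, conn, 2)        # the generator escapes the with block
lazy = list(rows)                      # iterated after the connection was returned

with pool.connection() as conn:
    eager = list(scan(pool, conn, 2))  # materialized while the connection is held

print(lazy)   # [(0, True), (1, True)]: used while the pool thought it was free
print(eager)  # [(0, False), (1, False)]
```

Materializing with `list()` costs memory for large scans; the point of the sketch is only that the work must finish before the `with` block exits.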
Now let's look at how a connection is obtained:
connection = getattr(self._thread_connections, 'current', None)
return_after_use = False
if connection is None:
    # This is the outermost connection requests for this thread.
    # Obtain a new connection from the pool and keep a reference
    # in a thread local so that nested connection requests from
    # the same thread can return the same connection instance.
    #
    # Note: this code acquires a lock before assigning to the
    # thread local; see
    # http://emptysquare.net/blog/another-thing-about-pythons-
    # threadlocals/
    return_after_use = True
    connection = self._acquire_connection(timeout)
    with self._lock:
        self._thread_connections.current = connection
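It first reads the `current` attribute of the thread-local `_thread_connections`; each thread has its own independent `current`. Note that `_thread_connections` is the very same object in every thread, because it is created once when the pool is initialized. What keeps the attribute per-thread is the thread-local implementation: in the pure-Python version of `threading.local` (`_threading_local.local`, the fallback when CPython's C implementation `_thread._local` is unavailable), `__getattribute__`, `__setattr__` and `__delattr__` are overridden, and each goes through a `_patch` helper that swaps in the current thread's attribute dictionary. That is how reads and writes of a `local` attribute stay independent between threads.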
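A small sketch using CPython's pure-Python `_threading_local.local` (the variable names `shared` and `seen` are illustrative) shows both halves of the point above: a worker thread sees the same object the main thread created, yet `current` set in the main thread is not visible there, and the worker's own assignment does not leak back.

```python
import threading
from _threading_local import local  # CPython's pure-Python fallback for threading.local

# One object created once, like the pool's self._thread_connections.
shared = local()
shared.current = "main-conn"

seen = {}


def worker():
    # Same object as in the main thread ...
    seen["same_object"] = id(shared) == main_id
    # ... but this thread gets its own attribute dict, so `current` is unset here.
    seen["current_in_worker"] = getattr(shared, "current", None)
    shared.current = "worker-conn"  # only changes this thread's view


main_id = id(shared)
t = threading.Thread(target=worker)
t.start()
t.join()

print(seen)            # {'same_object': True, 'current_in_worker': None}
print(shared.current)  # main-conn
```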
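The rest is easy to follow. If the thread already holds a connection, as with a nested `with pool.connection()` in the same thread, that connection is reused and `return_after_use` stays `False`, so only the outermost block returns it to the pool. Otherwise the pool takes a connection from the queue and stores it in the thread-local variable: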
connection = self._acquire_connection(timeout)
with self._lock:
    self._thread_connections.current = connection
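Putting the pieces together, here is a hypothetical `MiniPool` that mirrors the structure of `ConnectionPool.connection()` with no network I/O. `FakeConnection` stands in for `happybase.Connection`, and the sketch catches only `OSError` (which `socket.error` aliases in Python 3) instead of the `(TException, socket.error)` pair, since there is no Thrift layer here. It checks that a nested request in the same thread reuses the outer connection without returning it early, and that an error inside the block refreshes the connection, still reaches the caller, and the connection goes back to the pool.

```python
import contextlib
import queue
import threading


class FakeConnection:
    """Hypothetical stand-in for happybase.Connection (no network I/O)."""

    def __init__(self, name):
        self.name = name
        self.refreshed = 0

    def open(self):
        pass  # the real open() connects the transport unless it is already open

    def _refresh_thrift_client(self):
        self.refreshed += 1  # the real method rebuilds the Thrift client and transport


class MiniPool:
    """Hypothetical pool that mirrors the structure of ConnectionPool.connection()."""

    def __init__(self, size):
        self._lock = threading.Lock()
        self._queue = queue.LifoQueue(maxsize=size)
        self._thread_connections = threading.local()
        for i in range(size):
            self._queue.put(FakeConnection(f"conn-{i}"))

    @contextlib.contextmanager
    def connection(self, timeout=None):
        connection = getattr(self._thread_connections, "current", None)
        return_after_use = False
        if connection is None:
            # Outermost request in this thread: take a connection from the queue
            # and remember it so nested requests in the same thread reuse it.
            return_after_use = True
            connection = self._queue.get(True, timeout)
            with self._lock:
                self._thread_connections.current = connection
        try:
            connection.open()
            yield connection
        except OSError:
            # Assumption: OSError stands in for (TException, socket.error).
            connection._refresh_thrift_client()
            connection.open()
            raise
        finally:
            # Only the outermost `with` gives the connection back.
            if return_after_use:
                del self._thread_connections.current
                self._queue.put(connection)


pool = MiniPool(size=2)

with pool.connection() as outer:
    with pool.connection() as inner:  # nested request from the same thread
        same = inner is outer
    free_during_outer = pool._queue.qsize()  # the inner exit did not return it
free_after = pool._queue.qsize()

try:
    with pool.connection() as conn:
        raise OSError("simulated broken socket")
except OSError:
    pass  # the error still reaches the caller after the refresh

print(same, free_during_outer, free_after)  # True 1 2
print(conn.refreshed, pool._queue.qsize())  # 1 2
```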
One more question: would this pool still work under a coroutine model?
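With gevent monkey-patching, yes. gevent replaces `threading.local` with a greenlet-local implementation, so each coroutine gets its own `current` attribute. This only holds if the patching happens before the pool is created, ideally at the very start of the program (`from gevent import monkey; monkey.patch_all()`).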
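For contrast, without such patching, for example under plain asyncio, all coroutines of an event loop run in a single thread and therefore share one `threading.local` slot. A minimal sketch (the `tl`, `worker` and `seen` names are illustrative) shows one coroutine overwriting the `current` value another coroutine set, which is exactly the state the pool relies on to tell owners apart.

```python
import asyncio
import threading

# Assumption: plain asyncio, no gevent monkey-patching.
tl = threading.local()  # stands in for the pool's self._thread_connections


async def worker(name, seen):
    tl.current = name        # this coroutine "claims" the slot
    await asyncio.sleep(0)   # yield to the event loop; the other coroutine runs
    seen[name] = tl.current  # read the slot back after the switch


async def main():
    seen = {}
    await asyncio.gather(worker("a", seen), worker("b", seen))
    return seen


seen = asyncio.run(main())
print(seen)  # {'a': 'b', 'b': 'b'}: both coroutines share one thread-local slot
```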