Ansible的runner源碼剖析第三部分(run函數(shù)的學(xué)習(xí))
(1)run函數(shù)學(xué)習(xí)(api函數(shù)入口程序,非常重要)
def run(self):
'''1.判斷主機列表是否存在'''
if not self.run_hosts:
self.run_hosts = self.inventory.list_hosts(self.pattern)
# hosts文件就是一堆包含ip的列表['192.168.1.101']
hosts = self.run_hosts
#空主機直接返回結(jié)果
if len(hosts) == 0:
self.callbacks.on_no_hosts()
return dict(contacted={}, dark={})
'''2.全局多進程變量,為實例自己'''
global multiprocessing_runner
multiprocessing_runner = self
results = None
# Check if this is an action plugin. Some of them are designed
# to be ran once per group of hosts. Example module: pause,
# run once per hostgroup, rather than pausing once per each
# host.
'''3.通過模塊名找到插件'''
p = utils.plugins.action_loader.get(self.module_name, self)
'''4.優(yōu)化進程數(shù)量,如果進程數(shù)量大于主機數(shù)量,按照主機數(shù)量來跑多進程'''
if self.forks == 0 or self.forks > len(hosts):
self.forks = len(hosts)
if (p and (getattr(p, 'BYPASS_HOST_LOOP', None)) or self.run_once):
# Expose the current hostgroup to the bypassing plugins
self.host_set = hosts
# We aren't iterating over all the hosts in this
# group. So, just choose the "delegate_to" host if that is defined and is
# one of the targeted hosts, otherwise pick the first host in our group to
# construct the conn object with.
if self.delegate_to is not None and self.delegate_to in hosts:
host = self.delegate_to
else:
host = hosts[0]
result_data = self._executor(host, None).result
# Create a ResultData item for each host in this group
# using the returned result. If we didn't do this we would
# get false reports of dark hosts.
results = [ ReturnData(host=h, result=result_data, comm_ok=True) \
for h in hosts ]
del self.host_set
'''5.把主機列表傳遞給函數(shù)去跑結(jié)果'''
elif self.forks > 1:
try:
'''去跑主機列表結(jié)果'''
results = self._parallel_exec(hosts)
except IOError, ie:
print ie.errno
if ie.errno == 32:
# broken pipe from Ctrl+C
raise errors.AnsibleError("interrupted")
raise
else:
results = [ self._executor(h, None) for h in hosts ]
#6.結(jié)果處理
return self._partition_results(results)
(2)_parallel_exec函數(shù)學(xué)習(xí)(跑主機結(jié)果列表函數(shù))
def _parallel_exec(self, hosts):
''' handles mulitprocessing when more than 1 fork is required '''
manager = multiprocessing.Manager()
'''1任務(wù)隊列'''
job_queue = manager.Queue()
for host in hosts:
job_queue.put(host)
'''2.結(jié)果隊列'''
result_queue = manager.Queue()
try:
fileno = sys.stdin.fileno()
except ValueError:
fileno = None
workers = []
'''4.起forks進程數(shù)的進程去執(zhí)行_executor_hook函數(shù),函數(shù)的參數(shù)是任務(wù)隊列,結(jié)果隊列,以及new_stdin,按照最上面例子定義forks=5,則此處self.forks就是5'''
for i in range(self.forks):
new_stdin = None
if fileno is not None:
try:
new_stdin = os.fdopen(os.dup(fileno))
except OSError, e:
# couldn't dupe stdin, most likely because it's
# not a valid file descriptor, so we just rely on
# using the one that was passed in
pass
'''起5個進程同時跑_executor_hook函數(shù),參數(shù)分別是任務(wù)隊列job_queue,結(jié)果隊列result_queue,new_stdin'''
prc = multiprocessing.Process(target=_executor_hook,
args=(job_queue, result_queue, new_stdin))
prc.start()
workers.append(prc)
try:
'''6.保證每個子進程都執(zhí)行完畢'''
for worker in workers:
worker.join()
except KeyboardInterrupt:
for worker in workers:
worker.terminate()
worker.join()
results = []
try:
'''7.結(jié)果隊列不為空的話,不斷取出結(jié)果,放到result里'''
while not result_queue.empty():
results.append(result_queue.get(block=False))
except socket.error:
raise errors.AnsibleError("<interrupted>")
return results
(3)_executor_hook函數(shù)
def _executor_hook(job_queue, result_queue, new_stdin):
# attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
# this function also not present in CentOS 6
if HAS_ATFORK:
atfork()
signal.signal(signal.SIGINT, signal.SIG_IGN)
'''1.任務(wù)隊列不為空,不斷跑任務(wù)'''
while not job_queue.empty():
try:
'''2.取一個IP出來,然后去跑IP,結(jié)果為return_data'''
host = job_queue.get(block=False)
'''3.調(diào)用全局變量下的實例方法_executor,結(jié)果輸出到結(jié)果隊列中'''
return_data = multiprocessing_runner._executor(host, new_stdin)
result_queue.put(return_data)
except Queue.Empty:
pass
except:
traceback.print_exc()
(4)_executor函數(shù)
def _executor(self, host, new_stdin):
''' handler for multiprocessing library '''
try:
fileno = sys.stdin.fileno()
except ValueError:
fileno = None
try:
self._new_stdin = new_stdin
if not new_stdin and fileno is not None:
try:
self._new_stdin = os.fdopen(os.dup(fileno))
except OSError, e:
# couldn't dupe stdin, most likely because it's
# not a valid file descriptor, so we just rely on
# using the one that was passed in
pass
'''1.正常傳遞host,去跑數(shù)據(jù)'''
exec_rc = self._executor_internal(host, new_stdin)
'''2.如果返回類型不對,拋出異常'''
if type(exec_rc) != ReturnData:
raise Exception("unexpected return type: %s" % type(exec_rc))
# redundant, right?
'''3.如果沒有聯(lián)通的話,callbacks調(diào)用不可達函數(shù)處理數(shù)據(jù)'''
if not exec_rc.comm_ok:
self.callbacks.on_unreachable(host, exec_rc.result)
return exec_rc
except errors.AnsibleError, ae:
msg = str(ae)
self.callbacks.on_unreachable(host, msg)
return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg))
except Exception:
msg = traceback.format_exc()
self.callbacks.on_unreachable(host, msg)
return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg))
(5)_executor_internal函數(shù)
def _executor_internal(self, host, new_stdin):
''' executes any module one or more times '''
# We build the proper injected dictionary for all future
# templating operations in this run
inject = self.get_inject_vars(host)
# Then we selectively merge some variable dictionaries down to a
# single dictionary, used to template the HostVars for this host
temp_vars = self.inventory.get_variables(host, vault_password=self.vault_pass)
temp_vars = utils.combine_vars(temp_vars, inject['combined_cache'] )
temp_vars = utils.combine_vars(temp_vars, {'groups': inject['groups']})
temp_vars = utils.combine_vars(temp_vars, self.play_vars)
temp_vars = utils.combine_vars(temp_vars, self.play_file_vars)
temp_vars = utils.combine_vars(temp_vars, self.extra_vars)
hostvars = HostVars(temp_vars, self.inventory, vault_password=self.vault_pass)
# and we save the HostVars in the injected dictionary so they
# may be referenced from playbooks/templates
inject['hostvars'] = hostvars
host_connection = inject.get('ansible_connection', self.transport)
if host_connection in [ 'paramiko', 'ssh', 'accelerate' ]:
port = hostvars.get('ansible_ssh_port', self.remote_port)
if port is None:
port = C.DEFAULT_REMOTE_PORT
else:
# fireball, local, etc
port = self.remote_port
if self.inventory.basedir() is not None:
inject['inventory_dir'] = self.inventory.basedir()
if self.inventory.src() is not None:
inject['inventory_file'] = self.inventory.src()
# could be already set by playbook code
inject.setdefault('ansible_version', utils.version_info(gitinfo=False))
# allow with_foo to work in playbooks...
items = None
items_plugin = self.module_vars.get('items_lookup_plugin', None)
if items_plugin is not None and items_plugin in utils.plugins.lookup_loader:
basedir = self.basedir
if '_original_file' in inject:
basedir = os.path.dirname(inject['_original_file'])
filesdir = os.path.join(basedir, '..', 'files')
if os.path.exists(filesdir):
basedir = filesdir
try:
items_terms = self.module_vars.get('items_lookup_terms', '')
items_terms = template.template(basedir, items_terms, inject)
items = utils.plugins.lookup_loader.get(items_plugin, runner=self, basedir=basedir).run(items_terms, inject=inject)
except errors.AnsibleUndefinedVariable, e:
if 'has no attribute' in str(e):
# the undefined variable was an attribute of a variable that does
# exist, so try and run this through the conditional check to see
# if the user wanted to skip something on being undefined
if utils.check_conditional(self.conditional, self.basedir, inject, fail_on_undefined=True):
# the conditional check passed, so we have to fail here
raise
else:
# the conditional failed, so we skip this task
result = utils.jsonify(dict(changed=False, skipped=True))
self.callbacks.on_skipped(host, None)
return ReturnData(host=host, result=result)
except errors.AnsibleError, e:
raise
except Exception, e:
raise errors.AnsibleError("Unexpected error while executing task: %s" % str(e))
# strip out any jinja2 template syntax within
# the data returned by the lookup plugin
items = utils._clean_data_struct(items, from_remote=True)
if items is None:
items = []
else:
if type(items) != list:
raise errors.AnsibleError("lookup plugins have to return a list: %r" % items)
if len(items) and utils.is_list_of_strings(items) and self.module_name in [ 'apt', 'yum', 'pkgng', 'zypper' ]:
# hack for apt, yum, and pkgng so that with_items maps back into a single module call
use_these_items = []
for x in items:
inject['item'] = x
if not self.conditional or utils.check_conditional(self.conditional, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
use_these_items.append(x)
inject['item'] = ",".join(use_these_items)
items = None
def _safe_template_complex_args(args, inject):
# Ensure the complex args here are a dictionary, but
# first template them if they contain a variable
returned_args = args
if isinstance(args, basestring):
# If the complex_args were evaluated to a dictionary and there are
# more keys in the templated version than the evaled version, some
# param inserted additional keys (the template() call also runs
# safe_eval on the var if it looks like it's a datastructure). If the
# evaled_args are not a dict, it's most likely a whole variable (ie.
# args: {{var}}), in which case there's no way to detect the proper
# count of params in the dictionary.
templated_args = template.template(self.basedir, args, inject, convert_bare=True)
evaled_args = utils.safe_eval(args)
if isinstance(evaled_args, dict) and len(evaled_args) > 0 and len(evaled_args) != len(templated_args):
raise errors.AnsibleError("a variable tried to insert extra parameters into the args for this task")
# set the returned_args to the templated_args
returned_args = templated_args
# and a final check to make sure the complex args are a dict
if returned_args is not None and not isinstance(returned_args, dict):
raise errors.AnsibleError("args must be a dictionary, received %s" % returned_args)
return returned_args
# logic to decide how to run things depends on whether with_items is used
if items is None:
complex_args = _safe_template_complex_args(self.complex_args, inject)
return self._executor_internal_inner(host, self.module_name, self.module_args, inject, port, complex_args=complex_args)
elif len(items) > 0:
# executing using with_items, so make multiple calls
# TODO: refactor
if self.background > 0:
raise errors.AnsibleError("lookup plugins (with_*) cannot be used with async tasks")
all_comm_ok = True
all_changed = False
all_failed = False
results = []
for x in items:
# use a fresh inject for each item
this_inject = inject.copy()
this_inject['item'] = x
complex_args = _safe_template_complex_args(self.complex_args, this_inject)
'''調(diào)用用別的函數(shù)這次是真的去跑數(shù)據(jù)了'''
result = self._executor_internal_inner(
host,
self.module_name,
self.module_args,
this_inject,
port,
complex_args=complex_args
)
if 'stdout' in result.result and 'stdout_lines' not in result.result:
result.result['stdout_lines'] = result.result['stdout'].splitlines()
results.append(result.result)
if result.comm_ok == False:
all_comm_ok = False
all_failed = True
break
for x in results:
if x.get('changed') == True:
all_changed = True
if (x.get('failed') == True) or ('failed_when_result' in x and [x['failed_when_result']] or [('rc' in x) and (x['rc'] != 0)])[0]:
all_failed = True
break
msg = 'All items completed'
if all_failed:
msg = "One or more items failed."
rd_result = dict(failed=all_failed, changed=all_changed, results=results, msg=msg)
if not all_failed:
del rd_result['failed']
return ReturnData(host=host, comm_ok=all_comm_ok, result=rd_result)
else:
self.callbacks.on_skipped(host, None)
return ReturnData(host=host, comm_ok=True, result=dict(changed=False, skipped=True))
# *****************************************************
(6)_executor_internal_inner函數(shù)
def _executor_internal_inner(self, host, module_name, module_args, inject, port, is_chained=False, complex_args=None):
''' decides how to invoke a module '''
# late processing of parameterized become_user (with_items,..)
if self.become_user_var is not None:
self.become_user = template.template(self.basedir, self.become_user_var, inject)
# module_name may be dynamic (but cannot contain {{ ansible_ssh_user }})
module_name = template.template(self.basedir, module_name, inject)
if module_name in utils.plugins.action_loader:
if self.background != 0:
raise errors.AnsibleError("async mode is not supported with the %s module" % module_name)
handler = utils.plugins.action_loader.get(module_name, self)
elif self.background == 0:
handler = utils.plugins.action_loader.get('normal', self)
else:
handler = utils.plugins.action_loader.get('async', self)
if type(self.conditional) != list:
self.conditional = [ self.conditional ]
for cond in self.conditional:
if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
result = dict(changed=False, skipped=True)
if self.no_log:
result = utils.censor_unlogged_data(result)
self.callbacks.on_skipped(host, result)
else:
self.callbacks.on_skipped(host, inject.get('item',None))
return ReturnData(host=host, result=utils.jsonify(result))
if getattr(handler, 'setup', None) is not None:
handler.setup(module_name, inject)
conn = None
actual_host = inject.get('ansible_ssh_host', host)
# allow ansible_ssh_host to be templated
actual_host = template.template(self.basedir, actual_host, inject, fail_on_undefined=True)
actual_port = port
actual_user = inject.get('ansible_ssh_user', self.remote_user)
actual_pass = inject.get('ansible_ssh_pass', self.remote_pass)
actual_transport = inject.get('ansible_connection', self.transport)
actual_private_key_file = inject.get('ansible_ssh_private_key_file', self.private_key_file)
actual_private_key_file = template.template(self.basedir, actual_private_key_file, inject, fail_on_undefined=True)
self.become = utils.boolean(inject.get('ansible_become', inject.get('ansible_sudo', inject.get('ansible_su', self.become))))
self.become_user = inject.get('ansible_become_user', inject.get('ansible_sudo_user', inject.get('ansible_su_user',self.become_user)))
self.become_pass = inject.get('ansible_become_pass', inject.get('ansible_sudo_pass', inject.get('ansible_su_pass', self.become_pass)))
self.become_exe = inject.get('ansible_become_exe', inject.get('ansible_sudo_exe', self.become_exe))
self.become_method = inject.get('ansible_become_method', self.become_method)
# select default root user in case self.become requested
# but no user specified; happens e.g. in host vars when
# just ansible_become=True is specified
if self.become and self.become_user is None:
self.become_user = 'root'
if actual_private_key_file is not None:
actual_private_key_file = os.path.expanduser(actual_private_key_file)
if self.accelerate and actual_transport != 'local':
#Fix to get the inventory name of the host to accelerate plugin
if inject.get('ansible_ssh_host', None):
self.accelerate_inventory_host = host
else:
self.accelerate_inventory_host = None
# if we're using accelerated mode, force the
# transport to accelerate
actual_transport = "accelerate"
if not self.accelerate_port:
self.accelerate_port = C.ACCELERATE_PORT
actual_port = inject.get('ansible_ssh_port', port)
# the delegated host may have different SSH port configured, etc
# and we need to transfer those, and only those, variables
self.delegate_to = inject.get('delegate_to', None)
if self.delegate_to:
self.delegate_to = template.template(self.basedir, self.delegate_to, inject)
if self.delegate_to is not None:
delegate = self._compute_delegate(actual_pass, inject)
actual_transport = delegate['transport']
actual_host = delegate['ssh_host']
actual_port = delegate['port']
actual_user = delegate['user']
actual_pass = delegate['pass']
actual_private_key_file = delegate['private_key_file']
self.become_pass = delegate.get('become_pass',delegate.get('sudo_pass'))
inject = delegate['inject']
# set resolved delegate_to into inject so modules can call _remote_checksum
inject['delegate_to'] = self.delegate_to
# user/pass may still contain variables at this stage
actual_user = template.template(self.basedir, actual_user, inject)
try:
actual_pass = template.template(self.basedir, actual_pass, inject)
self.become_pass = template.template(self.basedir, self.become_pass, inject)
except:
# ignore password template errors, could be triggered by password charaters #10468
pass
# make actual_user available as __magic__ ansible_ssh_user variable
inject['ansible_ssh_user'] = actual_user
try:
if actual_transport == 'accelerate':
# for accelerate, we stuff both ports into a single
# variable so that we don't have to mangle other function
# calls just to accommodate this one case
actual_port = [actual_port, self.accelerate_port]
elif actual_port is not None:
actual_port = int(template.template(self.basedir, actual_port, inject))
except ValueError, e:
result = dict(failed=True, msg="FAILED: Configured port \"%s\" is not a valid port, expected integer" % actual_port)
return ReturnData(host=host, comm_ok=False, result=result)
try:
if self.delegate_to or host != actual_host:
delegate_host = host
else:
delegate_host = None
conn = self.connector.connect(actual_host, actual_port, actual_user, actual_pass, actual_transport, actual_private_key_file, delegate_host)
default_shell = getattr(conn, 'default_shell', '')
shell_type = inject.get('ansible_shell_type')
if not shell_type:
if default_shell:
shell_type = default_shell
else:
shell_type = os.path.basename(C.DEFAULT_EXECUTABLE)
shell_plugin = utils.plugins.shell_loader.get(shell_type)
if shell_plugin is None:
shell_plugin = utils.plugins.shell_loader.get('sh')
conn.shell = shell_plugin
except errors.AnsibleConnectionFailed, e:
result = dict(failed=True, msg="FAILED: %s" % str(e))
return ReturnData(host=host, comm_ok=False, result=result)
tmp = ''
# action plugins may DECLARE via TRANSFERS_FILES = True that they need a remote tmp path working dir
if self._early_needs_tmp_path(module_name, handler):
tmp = self._make_tmp_path(conn)
# allow module args to work as a dictionary
# though it is usually a string
if isinstance(module_args, dict):
module_args = utils.serialize_args(module_args)
# render module_args and complex_args templates
try:
# When templating module_args, we need to be careful to ensure
# that no variables inadvertently (or maliciously) add params
# to the list of args. We do this by counting the number of k=v
# pairs before and after templating.
num_args_pre = self._count_module_args(module_args, allow_dupes=True)
module_args = template.template(self.basedir, module_args, inject, fail_on_undefined=self.error_on_undefined_vars)
num_args_post = self._count_module_args(module_args)
if num_args_pre != num_args_post:
raise errors.AnsibleError("A variable inserted a new parameter into the module args. " + \
"Be sure to quote variables if they contain equal signs (for example: \"{{var}}\").")
# And we also make sure nothing added in special flags for things
# like the command/shell module (ie. #USE_SHELL)
if '#USE_SHELL' in module_args:
raise errors.AnsibleError("A variable tried to add #USE_SHELL to the module arguments.")
complex_args = template.template(self.basedir, complex_args, inject, fail_on_undefined=self.error_on_undefined_vars)
except jinja2.exceptions.UndefinedError, e:
raise errors.AnsibleUndefinedVariable("One or more undefined variables: %s" % str(e))
# filter omitted arguments out from complex_args
if complex_args:
complex_args = dict(filter(lambda x: x[1] != self.omit_token, complex_args.iteritems()))
# Filter omitted arguments out from module_args.
# We do this with split_args instead of parse_kv to ensure
# that things are not unquoted/requoted incorrectly
args = split_args(module_args)
final_args = []
for arg in args:
if '=' in arg:
k,v = arg.split('=', 1)
if unquote(v) != self.omit_token:
final_args.append(arg)
else:
# not a k=v param, append it
final_args.append(arg)
module_args = ' '.join(final_args)
result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
# Code for do until feature
until = self.module_vars.get('until', None)
if until is not None and result.comm_ok:
inject[self.module_vars.get('register')] = result.result
cond = template.template(self.basedir, until, inject, expand_lists=False)
if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
retries = template.template(self.basedir, self.module_vars.get('retries'), inject, expand_lists=False)
delay = self.module_vars.get('delay')
for x in range(1, int(retries) + 1):
# template the delay, cast to float and sleep
delay = template.template(self.basedir, delay, inject, expand_lists=False)
delay = float(delay)
time.sleep(delay)
tmp = ''
if self._early_needs_tmp_path(module_name, handler):
tmp = self._make_tmp_path(conn)
result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
result.result['attempts'] = x
vv("Result from run %i is: %s" % (x, result.result))
inject[self.module_vars.get('register')] = result.result
cond = template.template(self.basedir, until, inject, expand_lists=False)
if utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
break
if result.result['attempts'] == retries and not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
result.result['failed'] = True
result.result['msg'] = "Task failed as maximum retries was encountered"
else:
result.result['attempts'] = 0
conn.close()
if not result.comm_ok:
# connection or parsing errors...
self.callbacks.on_unreachable(host, result.result)
else:
data = result.result
# https://github.com/ansible/ansible/issues/4958
if hasattr(sys.stdout, "isatty"):
if "stdout" in data and sys.stdout.isatty():
if not string_functions.isprintable(data['stdout']):
data['stdout'] = ''
if 'item' in inject:
result.result['item'] = inject['item']
result.result['invocation'] = dict(
module_args=module_args,
module_name=module_name
)
changed_when = self.module_vars.get('changed_when')
failed_when = self.module_vars.get('failed_when')
if (changed_when is not None or failed_when is not None) and self.background == 0:
register = self.module_vars.get('register')
if register is not None:
if 'stdout' in data:
data['stdout_lines'] = data['stdout'].splitlines()
inject[register] = data
# only run the final checks if the async_status has finished,
# or if we're not running an async_status check at all
if (module_name == 'async_status' and "finished" in data) or module_name != 'async_status':
if changed_when is not None and 'skipped' not in data:
data['changed'] = utils.check_conditional(changed_when, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars)
if failed_when is not None and 'skipped' not in data:
data['failed_when_result'] = data['failed'] = utils.check_conditional(failed_when, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars)
if is_chained:
# no callbacks
return result
if 'skipped' in data:
self.callbacks.on_skipped(host, inject.get('item',None))
if self.no_log:
data = utils.censor_unlogged_data(data)
if not result.is_successful():
ignore_errors = self.module_vars.get('ignore_errors', False)
self.callbacks.on_failed(host, data, ignore_errors)
else:
if self.diff:
self.callbacks.on_file_diff(conn.host, result.diff)
self.callbacks.on_ok(host, data)
return result
(7)run函數(shù)中'
---3.通過模塊名找到插件---
p = utils.plugins.action_loader.get(self.module_name, self)
其中action_loader是一個PluginLoader類的實例
action_loader = PluginLoader(
'ActionModule',
'ansible.runner.action_plugins',
C.DEFAULT_ACTION_PLUGIN_PATH,
'action_plugins'
)
class PluginLoader(object):
'''
PluginLoader loads plugins from the configured plugin directories.
It searches for plugins by iterating through the combined list of
play basedirs, configured paths, and the python path.
The first match is used.
'''
def __init__(self, class_name, package, config, subdir, aliases={}):
self.class_name = class_name
self.package = package
self.config = config
self.subdir = subdir
self.aliases = aliases
if not class_name in MODULE_CACHE:
MODULE_CACHE[class_name] = {}
if not class_name in PATH_CACHE:
PATH_CACHE[class_name] = None
if not class_name in PLUGIN_PATH_CACHE:
PLUGIN_PATH_CACHE[class_name] = {}
self._module_cache = MODULE_CACHE[class_name]
self._paths = PATH_CACHE[class_name]
self._plugin_path_cache = PLUGIN_PATH_CACHE[class_name]
self._extra_dirs = []
self._searched_paths = set()
'''get方法'''
def get(self, name, *args, **kwargs):
'''此處的name就是傳遞進去的模塊名字'''
''' instantiates a plugin of the given name using arguments '''
if name in self.aliases:
name = self.aliases[name]
'''通過find_plugin方法去找到模塊路徑'''
path = self.find_plugin(name)
if path is None:
return None
if path not in self._module_cache:
self._module_cache[path] = imp.load_source('.'.join([self.package, name]), path)
'''根據(jù)模塊路徑,加載模塊方法'''
return getattr(self._module_cache[path], self.class_name)(*args, **kwargs)
'''通過模塊名找到模塊路徑'''
def find_plugin(self, name, suffixes=None):
''' Find a plugin named name '''
if not suffixes:
if self.class_name:
suffixes = ['.py']
else:
suffixes = ['.py', '']
#frozenset就是集合,可以當(dāng)做set看
potential_names = frozenset('%s%s' % (name, s) for s in suffixes)
for full_name in potential_names:
if full_name in self._plugin_path_cache:
return self._plugin_path_cache[full_name]
found = None
for path in [p for p in self._get_paths() if p not in self._searched_paths]:
if os.path.isdir(path):
full_paths = (os.path.join(path, f) for f in os.listdir(path))
for full_path in (f for f in full_paths if os.path.isfile(f)):
for suffix in suffixes:
if full_path.endswith(suffix):
full_name = os.path.basename(full_path)
break
else: # Yes, this is a for-else: http://bit.ly/1ElPkyg
continue
if full_name not in self._plugin_path_cache:
self._plugin_path_cache[full_name] = full_path
self._searched_paths.add(path)
for full_name in potential_names:
if full_name in self._plugin_path_cache:
return self._plugin_path_cache[full_name]
# if nothing is found, try finding alias/deprecated
if not name.startswith('_'):
for alias_name in ('_%s' % n for n in potential_names):
# We've already cached all the paths at this point
if alias_name in self._plugin_path_cache:
return self._plugin_path_cache[alias_name]
return None