Ansible從入門到放棄------Ansible源碼剖析(3)

Ansible的runner源碼剖析第三部分(run函數的學習)

(1)run函數學習(api函數入口程序,非常重要)

def run(self):
        '''
        Run the configured module against every targeted host (API entry point).

        Returns the value of self._partition_results() for the collected
        per-host results. When the host list is empty the on_no_hosts
        callback fires and dict(contacted={}, dark={}) is returned directly.

        Raises errors.AnsibleError("interrupted") when the parallel run fails
        with an IOError whose errno is 32 (broken pipe); any other IOError is
        re-raised unchanged.
        '''

        # 1. resolve the host list from the inventory unless one is already set
        if not self.run_hosts:
            self.run_hosts = self.inventory.list_hosts(self.pattern)
        # per the original note this is a plain list of addresses,
        # e.g. ['192.168.1.101']
        hosts = self.run_hosts

        # nothing to run against: report it and return an empty result
        if len(hosts) == 0:
            self.callbacks.on_no_hosts()
            return dict(contacted={}, dark={})

        # 2. publish this runner through a module-level global;
        # _executor_hook reads it from there to call _executor
        global multiprocessing_runner
        multiprocessing_runner = self
        results = None

        # Check if this is an action plugin. Some of them are designed
        # to be ran once per group of hosts. Example module: pause,
        # run once per hostgroup, rather than pausing once per each
        # host.
        # 3. look up the action plugin by module name (None when not found)
        p = utils.plugins.action_loader.get(self.module_name, self)

        # 4. never start more workers than there are hosts; forks == 0
        # means "one per host"
        if self.forks == 0 or self.forks > len(hosts):
            self.forks = len(hosts)

        # `and` binds tighter than `or`: this branch is taken when the plugin
        # sets BYPASS_HOST_LOOP, or whenever self.run_once is truthy.
        if (p and (getattr(p, 'BYPASS_HOST_LOOP', None)) or self.run_once):

            # Expose the current hostgroup to the bypassing plugins
            self.host_set = hosts
            # We aren't iterating over all the hosts in this
            # group. So, just choose the "delegate_to" host if that is defined and is
            # one of the targeted hosts, otherwise pick the first host in our group to
            # construct the conn object with.
            if self.delegate_to is not None and self.delegate_to in hosts:
                host = self.delegate_to
            else:
                host = hosts[0]

            result_data = self._executor(host, None).result
            # Create a ResultData item for each host in this group
            # using the returned result. If we didn't do this we would
            # get false reports of dark hosts.
            results = [ReturnData(host=h, result=result_data, comm_ok=True)
                       for h in hosts]
            del self.host_set

        # 5. otherwise hand the host list to the executors
        elif self.forks > 1:
            try:
                # fan the hosts out over worker processes
                results = self._parallel_exec(hosts)
            except IOError as ie:
                if ie.errno == 32:
                    # broken pipe from Ctrl+C
                    raise errors.AnsibleError("interrupted")
                raise
        else:
            # a single fork: run the hosts one after another in-process
            results = [self._executor(h, None) for h in hosts]

        # 6. let _partition_results turn the raw results into the return value
        return self._partition_results(results)

(2)_parallel_exec函數學習(跑主機結果列表函數)

def _parallel_exec(self, hosts):
        '''
        Handles multiprocessing when more than 1 fork is required.

        Puts every host on a shared job queue, starts self.forks worker
        processes running _executor_hook, waits for them to finish and
        returns the list of items drained from the shared result queue.

        Raises errors.AnsibleError("<interrupted>") if draining the result
        queue fails with socket.error.
        '''

        manager = multiprocessing.Manager()

        # 1. job queue: one entry per host
        job_queue = manager.Queue()
        for host in hosts:
            job_queue.put(host)

        # 2. result queue: filled by the workers
        result_queue = manager.Queue()

        # 3. find the descriptor behind stdin, if it has one
        try:
            fileno = sys.stdin.fileno()
        except ValueError:
            fileno = None

        # 4. start self.forks workers; each gets the job queue, the result
        # queue and (when possible) its own duplicate of stdin
        workers = []
        for i in range(self.forks):
            new_stdin = None
            if fileno is not None:
                try:
                    new_stdin = os.fdopen(os.dup(fileno))
                except OSError:
                    # couldn't dupe stdin, most likely because it's
                    # not a valid file descriptor, so we just rely on
                    # using the one that was passed in
                    pass
            prc = multiprocessing.Process(target=_executor_hook,
                args=(job_queue, result_queue, new_stdin))
            prc.start()
            workers.append(prc)

        # 5. wait for every worker; on Ctrl+C terminate them all first
        try:
            for worker in workers:
                worker.join()
        except KeyboardInterrupt:
            for worker in workers:
                worker.terminate()
                worker.join()

        # 6. drain whatever the workers managed to report
        results = []
        try:
            while not result_queue.empty():
                results.append(result_queue.get(block=False))
        except socket.error:
            raise errors.AnsibleError("<interrupted>")
        return results

(3)_executor_hook函數

def _executor_hook(job_queue, result_queue, new_stdin):
    '''
    Worker-process body started by _parallel_exec.

    Pulls hosts off job_queue until it is empty, runs each one through the
    _executor method of the global multiprocessing_runner and puts the
    returned value on result_queue. new_stdin is passed through to
    _executor unchanged.
    '''

    # attempt workaround of https://github.com/newsapps/beeswithmachineguns/issues/17
    # this function also not present in CentOS 6
    if HAS_ATFORK:
        atfork()

    # SIGINT is ignored here; _parallel_exec terminates the workers itself
    # when it catches KeyboardInterrupt
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    # 1. keep working while the job queue still has entries
    while not job_queue.empty():
        try:
            # 2. take one host; non-blocking because another worker may
            # have emptied the queue since the check above
            host = job_queue.get(block=False)
            # 3. run it through the shared runner and report the result
            return_data = multiprocessing_runner._executor(host, new_stdin)
            result_queue.put(return_data)
        except Queue.Empty:
            pass
        except:
            # deliberately broad: print the traceback and move on to the
            # next host instead of letting the worker die
            traceback.print_exc()

(4)_executor函數

def _executor(self, host, new_stdin):
        '''
        Handler for the multiprocessing library: run one host and always
        hand back a ReturnData.

        host: the host to run against.
        new_stdin: stdin object supplied by the caller, or None.

        Returns the ReturnData from _executor_internal. Any exception raised
        on the way is reported through callbacks.on_unreachable and turned
        into a ReturnData with comm_ok=False and a failed result whose msg is
        str(error) for AnsibleError, or the formatted traceback otherwise.
        '''

        try:
            fileno = sys.stdin.fileno()
        except ValueError:
            fileno = None

        try:
            self._new_stdin = new_stdin
            if not new_stdin and fileno is not None:
                try:
                    self._new_stdin = os.fdopen(os.dup(fileno))
                except OSError:
                    # couldn't dupe stdin, most likely because it's
                    # not a valid file descriptor, so we just rely on
                    # using the one that was passed in
                    pass

            # 1. do the real work for this host
            exec_rc = self._executor_internal(host, new_stdin)

            # 2. anything other than an exact ReturnData is an error
            if type(exec_rc) != ReturnData:
                raise Exception("unexpected return type: %s" % type(exec_rc))

            # redundant, right?
            # 3. communication failed: report the host as unreachable
            if not exec_rc.comm_ok:
                self.callbacks.on_unreachable(host, exec_rc.result)
            return exec_rc
        except errors.AnsibleError as ae:
            msg = str(ae)
            self.callbacks.on_unreachable(host, msg)
            return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg))
        except Exception:
            msg = traceback.format_exc()
            self.callbacks.on_unreachable(host, msg)
            return ReturnData(host=host, comm_ok=False, result=dict(failed=True, msg=msg))

(5)_executor_internal函數

def _executor_internal(self, host, new_stdin):
        ''' executes any module one or more times

        Builds the variable dictionary ("inject") for host, resolves the
        optional with_* lookup plugin into a list of items and then calls
        _executor_internal_inner once (no lookup) or once per item.

        host: the host to run against.
        new_stdin: accepted but not referenced in this method.

        Returns a ReturnData: the inner call's result for a single run, an
        aggregate (results list, msg, changed, and failed when any item
        failed) for an item loop, or a skipped result when the lookup
        produced an empty list or the conditional check failed.

        Raises errors.AnsibleError for lookup failures, non-list lookup
        results, with_* combined with async (self.background > 0) and
        invalid complex args.
        '''

        # We build the proper injected dictionary for all future
        # templating operations in this run
        inject = self.get_inject_vars(host)

        # Then we selectively merge some variable dictionaries down to a
        # single dictionary, used to template the HostVars for this host
        # (merge order: inventory vars, combined_cache, groups, play vars,
        # play file vars, extra vars -- presumably later sources win;
        # confirm against utils.combine_vars)
        temp_vars = self.inventory.get_variables(host, vault_password=self.vault_pass)
        temp_vars = utils.combine_vars(temp_vars, inject['combined_cache'] )
        temp_vars = utils.combine_vars(temp_vars, {'groups': inject['groups']})
        temp_vars = utils.combine_vars(temp_vars, self.play_vars)
        temp_vars = utils.combine_vars(temp_vars, self.play_file_vars)
        temp_vars = utils.combine_vars(temp_vars, self.extra_vars)

        hostvars = HostVars(temp_vars, self.inventory, vault_password=self.vault_pass)

        # and we save the HostVars in the injected dictionary so they
        # may be referenced from playbooks/templates
        inject['hostvars'] = hostvars

        # pick the port: ssh-style transports honour ansible_ssh_port from
        # hostvars and fall back to the configured default
        host_connection = inject.get('ansible_connection', self.transport)
        if host_connection in [ 'paramiko', 'ssh', 'accelerate' ]:
            port = hostvars.get('ansible_ssh_port', self.remote_port)
            if port is None:
                port = C.DEFAULT_REMOTE_PORT
        else:
            # fireball, local, etc
            port = self.remote_port

        if self.inventory.basedir() is not None:
            inject['inventory_dir'] = self.inventory.basedir()

        if self.inventory.src() is not None:
            inject['inventory_file'] = self.inventory.src()

        # could be already set by playbook code
        inject.setdefault('ansible_version', utils.version_info(gitinfo=False))

        # allow with_foo to work in playbooks...
        # items stays None when no lookup plugin applies
        items = None
        items_plugin = self.module_vars.get('items_lookup_plugin', None)

        if items_plugin is not None and items_plugin in utils.plugins.lookup_loader:

            # lookups run relative to the task file's sibling "files"
            # directory when it exists, else the task file's directory,
            # else self.basedir
            basedir = self.basedir
            if '_original_file' in inject:
                basedir = os.path.dirname(inject['_original_file'])
                filesdir = os.path.join(basedir, '..', 'files')
                if os.path.exists(filesdir):
                    basedir = filesdir

            try:
                items_terms = self.module_vars.get('items_lookup_terms', '')
                items_terms = template.template(basedir, items_terms, inject)
                items = utils.plugins.lookup_loader.get(items_plugin, runner=self, basedir=basedir).run(items_terms, inject=inject)
            except errors.AnsibleUndefinedVariable, e:
                # NOTE(review): when the message does not contain
                # 'has no attribute' nothing happens in this handler, so the
                # error is swallowed and items keeps its value of None
                if 'has no attribute' in str(e):
                    # the undefined variable was an attribute of a variable that does
                    # exist, so try and run this through the conditional check to see
                    # if the user wanted to skip something on being undefined
                    if utils.check_conditional(self.conditional, self.basedir, inject, fail_on_undefined=True):
                        # the conditional check passed, so we have to fail here
                        raise
                    else:
                        # the conditional failed, so we skip this task
                        result = utils.jsonify(dict(changed=False, skipped=True))
                        self.callbacks.on_skipped(host, None)
                        return ReturnData(host=host, result=result)
            except errors.AnsibleError, e:
                raise
            except Exception, e:
                raise errors.AnsibleError("Unexpected error while executing task: %s" % str(e))

            # strip out any jinja2 template syntax within
            # the data returned by the lookup plugin
            items = utils._clean_data_struct(items, from_remote=True)
            if items is None:
                items = []
            else:
                if type(items) != list:
                    raise errors.AnsibleError("lookup plugins have to return a list: %r" % items)

                if len(items) and utils.is_list_of_strings(items) and self.module_name in [ 'apt', 'yum', 'pkgng', 'zypper' ]:
                    # hack for apt, yum, and pkgng so that with_items maps back into a single module call
                    # (items passing the conditional are joined into one
                    # comma-separated inject['item'], and items is reset to
                    # None so the single-call path below is used)
                    use_these_items = []
                    for x in items:
                        inject['item'] = x
                        if not self.conditional or utils.check_conditional(self.conditional, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                            use_these_items.append(x)
                    inject['item'] = ",".join(use_these_items)
                    items = None

        def _safe_template_complex_args(args, inject):
            # Ensure the complex args here are a dictionary, but
            # first template them if they contain a variable
            # Returns args unchanged unless it is a string, in which case the
            # templated value is returned; raises AnsibleError when the
            # result is neither None nor a dict.

            returned_args = args
            if isinstance(args, basestring):
                # If the complex_args were evaluated to a dictionary and there are
                # more keys in the templated version than the evaled version, some
                # param inserted additional keys (the template() call also runs
                # safe_eval on the var if it looks like it's a datastructure). If the
                # evaled_args are not a dict, it's most likely a whole variable (ie.
                # args: {{var}}), in which case there's no way to detect the proper
                # count of params in the dictionary.

                templated_args = template.template(self.basedir, args, inject, convert_bare=True)
                evaled_args = utils.safe_eval(args)

                if isinstance(evaled_args, dict) and len(evaled_args) > 0 and len(evaled_args) != len(templated_args):
                    raise errors.AnsibleError("a variable tried to insert extra parameters into the args for this task")

                # set the returned_args to the templated_args
                returned_args = templated_args

            # and a final check to make sure the complex args are a dict
            if returned_args is not None and not isinstance(returned_args, dict):
                raise errors.AnsibleError("args must be a dictionary, received %s" % returned_args)

            return returned_args

        # logic to decide how to run things depends on whether with_items is used
        if items is None:
            # no lookup (or the package-manager hack above): one call
            complex_args = _safe_template_complex_args(self.complex_args, inject)
            return self._executor_internal_inner(host, self.module_name, self.module_args, inject, port, complex_args=complex_args)
        elif len(items) > 0:

            # executing using with_items, so make multiple calls
            # TODO: refactor

            if self.background > 0:
                raise errors.AnsibleError("lookup plugins (with_*) cannot be used with async tasks")

            all_comm_ok = True
            all_changed = False
            all_failed = False
            results = []
            for x in items:
                # use a fresh inject for each item
                this_inject = inject.copy()
                this_inject['item'] = x

                complex_args = _safe_template_complex_args(self.complex_args, this_inject)
                # (the string below is an annotation left as a no-op
                # statement; it says this call is what really runs the module)
                '''調(diào)用用別的函數(shù)這次是真的去跑數(shù)據(jù)了'''
                result = self._executor_internal_inner(
                     host,
                     self.module_name,
                     self.module_args,
                     this_inject,
                     port,
                     complex_args=complex_args
                )

                if 'stdout' in result.result and 'stdout_lines' not in result.result:
                    result.result['stdout_lines'] = result.result['stdout'].splitlines()

                results.append(result.result)
                # a communication failure stops the item loop immediately
                if result.comm_ok == False:
                    all_comm_ok = False
                    all_failed = True
                    break
                # NOTE(review): this rescans every result collected so far
                # after each item and reuses the outer loop variable x; its
                # break only leaves this inner scan, so later items still run
                for x in results:
                    if x.get('changed') == True:
                        all_changed = True
                    # old-style and/or conditional: an item counts as failed
                    # when 'failed' is True, else by failed_when_result when
                    # that key is present, else by a non-zero 'rc'
                    if (x.get('failed') == True) or ('failed_when_result' in x and [x['failed_when_result']] or [('rc' in x) and (x['rc'] != 0)])[0]:
                        all_failed = True
                        break
            msg = 'All items completed'
            if all_failed:
                msg = "One or more items failed."
            rd_result = dict(failed=all_failed, changed=all_changed, results=results, msg=msg)
            # the 'failed' key is only present when something failed
            if not all_failed:
                del rd_result['failed']
            return ReturnData(host=host, comm_ok=all_comm_ok, result=rd_result)
        else:
            # the lookup produced an empty list: nothing to do for this host
            self.callbacks.on_skipped(host, None)
            return ReturnData(host=host, comm_ok=True, result=dict(changed=False, skipped=True))

    # *****************************************************

(6)_executor_internal_inner函數

def _executor_internal_inner(self, host, module_name, module_args, inject, port, is_chained=False, complex_args=None):
        ''' decides how to invoke a module

        Selects the action-plugin handler for module_name, evaluates
        self.conditional, works out the connection parameters (optionally
        those of a delegate_to host), connects, templates the arguments,
        runs the handler (repeating it for until/retries), closes the
        connection and fires the result callbacks.

        host: the host the task is reported against.
        module_name: module to run; templated against inject first.
        module_args: string, or dict (serialized to a string), of k=v args.
        inject: variable dictionary; it is modified here (ansible_ssh_user,
            delegate_to, the register variable) and replaced by the
            delegate's inject when delegate_to is set.
        port: fallback port, used when inject has no 'ansible_ssh_port'.
        is_chained: when true and communication succeeded, return before
            the skipped/failed/ok callbacks are fired.
        complex_args: structured args; templated, then entries equal to
            self.omit_token are dropped.

        Returns the handler's result object, or a ReturnData describing a
        skipped task, an invalid port or a failed connection.

        Raises errors.AnsibleError (async used with an action plugin,
        parameters injected through variables) and
        errors.AnsibleUndefinedVariable (undefined variables in the args).
        '''

        # late processing of parameterized become_user (with_items,..)
        if self.become_user_var is not None:
            self.become_user = template.template(self.basedir, self.become_user_var, inject)

        # module_name may be dynamic (but cannot contain {{ ansible_ssh_user }})
        module_name  = template.template(self.basedir, module_name, inject)

        # handler selection: a dedicated action plugin if one exists for the
        # module, otherwise the generic 'normal' or 'async' plugin
        if module_name in utils.plugins.action_loader:
            if self.background != 0:
                raise errors.AnsibleError("async mode is not supported with the %s module" % module_name)
            handler = utils.plugins.action_loader.get(module_name, self)
        elif self.background == 0:
            handler = utils.plugins.action_loader.get('normal', self)
        else:
            handler = utils.plugins.action_loader.get('async', self)

        # normalise self.conditional to a list (note: this rewrites the
        # attribute on the runner itself)
        if type(self.conditional) != list:
            self.conditional = [ self.conditional ]

        # the first conditional that fails skips the task
        for cond in self.conditional:

            if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                result = dict(changed=False, skipped=True)
                # with no_log the callback gets the censored result instead
                # of the item
                if self.no_log:
                    result = utils.censor_unlogged_data(result)
                    self.callbacks.on_skipped(host, result)
                else:
                    self.callbacks.on_skipped(host, inject.get('item',None))
                return ReturnData(host=host, result=utils.jsonify(result))

        # give the handler a chance to prepare, if it defines setup()
        if getattr(handler, 'setup', None) is not None:
            handler.setup(module_name, inject)
        conn = None
        actual_host = inject.get('ansible_ssh_host', host)
        # allow ansible_ssh_host to be templated
        actual_host = template.template(self.basedir, actual_host, inject, fail_on_undefined=True)
        # NOTE(review): this value is overwritten further down by
        # inject.get('ansible_ssh_port', port) before it is read
        actual_port = port
        actual_user = inject.get('ansible_ssh_user', self.remote_user)
        actual_pass = inject.get('ansible_ssh_pass', self.remote_pass)
        actual_transport = inject.get('ansible_connection', self.transport)
        actual_private_key_file = inject.get('ansible_ssh_private_key_file', self.private_key_file)
        actual_private_key_file = template.template(self.basedir, actual_private_key_file, inject, fail_on_undefined=True)

        # privilege-escalation settings: ansible_become_* wins over the
        # ansible_sudo_* / ansible_su_* variables, which win over the
        # values already on the runner
        self.become = utils.boolean(inject.get('ansible_become', inject.get('ansible_sudo', inject.get('ansible_su', self.become))))
        self.become_user = inject.get('ansible_become_user', inject.get('ansible_sudo_user', inject.get('ansible_su_user',self.become_user)))
        self.become_pass = inject.get('ansible_become_pass', inject.get('ansible_sudo_pass', inject.get('ansible_su_pass', self.become_pass)))
        self.become_exe = inject.get('ansible_become_exe', inject.get('ansible_sudo_exe', self.become_exe))
        self.become_method = inject.get('ansible_become_method', self.become_method)

        # select default root user in case self.become requested
        # but no user specified; happens e.g. in host vars when
        # just ansible_become=True is specified
        if self.become and self.become_user is None:
            self.become_user = 'root'

        if actual_private_key_file is not None:
            actual_private_key_file = os.path.expanduser(actual_private_key_file)

        if self.accelerate and actual_transport != 'local':
            #Fix to get the inventory name of the host to accelerate plugin
            if inject.get('ansible_ssh_host', None):
                self.accelerate_inventory_host = host
            else:
                self.accelerate_inventory_host = None
            # if we're using accelerated mode, force the
            # transport to accelerate
            actual_transport = "accelerate"
            if not self.accelerate_port:
                self.accelerate_port = C.ACCELERATE_PORT

        actual_port = inject.get('ansible_ssh_port', port)

        # the delegated host may have different SSH port configured, etc
        # and we need to transfer those, and only those, variables
        self.delegate_to = inject.get('delegate_to', None)
        if self.delegate_to:
            self.delegate_to = template.template(self.basedir, self.delegate_to, inject)

        if self.delegate_to is not None:
            # every connection parameter is replaced by the delegate's
            delegate = self._compute_delegate(actual_pass, inject)
            actual_transport = delegate['transport']
            actual_host = delegate['ssh_host']
            actual_port = delegate['port']
            actual_user = delegate['user']
            actual_pass = delegate['pass']
            actual_private_key_file = delegate['private_key_file']
            self.become_pass = delegate.get('become_pass',delegate.get('sudo_pass'))
            inject = delegate['inject']
            # set resolved delegate_to into inject so modules can call _remote_checksum
            inject['delegate_to'] = self.delegate_to

        # user/pass may still contain variables at this stage
        actual_user = template.template(self.basedir, actual_user, inject)
        try:
            actual_pass = template.template(self.basedir, actual_pass, inject)
            self.become_pass = template.template(self.basedir, self.become_pass, inject)
        except:
            # ignore password template errors, could be triggered by password characters #10468
            pass

        # make actual_user available as __magic__ ansible_ssh_user variable
        inject['ansible_ssh_user'] = actual_user

        try:
            if actual_transport == 'accelerate':
                # for accelerate, we stuff both ports into a single
                # variable so that we don't have to mangle other function
                # calls just to accommodate this one case
                actual_port = [actual_port, self.accelerate_port]
            elif actual_port is not None:
                actual_port = int(template.template(self.basedir, actual_port, inject))
        except ValueError, e:
            # a port that does not convert to int fails this host only
            result = dict(failed=True, msg="FAILED: Configured port \"%s\" is not a valid port, expected integer" % actual_port)
            return ReturnData(host=host, comm_ok=False, result=result)

        try:
            # delegate_host carries the original host name whenever the
            # connection goes to a different address
            if self.delegate_to or host != actual_host:
                delegate_host = host
            else:
                delegate_host = None
            conn = self.connector.connect(actual_host, actual_port, actual_user, actual_pass, actual_transport, actual_private_key_file, delegate_host)

            # shell type: ansible_shell_type, else the connection's
            # default_shell, else the basename of C.DEFAULT_EXECUTABLE
            default_shell = getattr(conn, 'default_shell', '')
            shell_type = inject.get('ansible_shell_type')
            if not shell_type:
                if default_shell:
                    shell_type = default_shell
                else:
                    shell_type = os.path.basename(C.DEFAULT_EXECUTABLE)

            # unknown shell types fall back to the 'sh' plugin
            shell_plugin = utils.plugins.shell_loader.get(shell_type)
            if shell_plugin is None:
                shell_plugin = utils.plugins.shell_loader.get('sh')
            conn.shell = shell_plugin

        except errors.AnsibleConnectionFailed, e:
            result = dict(failed=True, msg="FAILED: %s" % str(e))
            return ReturnData(host=host, comm_ok=False, result=result)

        tmp = ''
        # action plugins may DECLARE via TRANSFERS_FILES = True that they need a remote tmp path working dir
        if self._early_needs_tmp_path(module_name, handler):
            tmp = self._make_tmp_path(conn)

        # allow module args to work as a dictionary
        # though it is usually a string
        if isinstance(module_args, dict):
            module_args = utils.serialize_args(module_args)

        # render module_args and complex_args templates
        try:
            # When templating module_args, we need to be careful to ensure
            # that no variables inadvertently (or maliciously) add params
            # to the list of args. We do this by counting the number of k=v
            # pairs before and after templating.
            num_args_pre = self._count_module_args(module_args, allow_dupes=True)
            module_args = template.template(self.basedir, module_args, inject, fail_on_undefined=self.error_on_undefined_vars)
            num_args_post = self._count_module_args(module_args)
            if num_args_pre != num_args_post:
                raise errors.AnsibleError("A variable inserted a new parameter into the module args. " + \
                                          "Be sure to quote variables if they contain equal signs (for example: \"{{var}}\").")
            # And we also make sure nothing added in special flags for things
            # like the command/shell module (ie. #USE_SHELL)
            if '#USE_SHELL' in module_args:
                raise errors.AnsibleError("A variable tried to add #USE_SHELL to the module arguments.")
            complex_args = template.template(self.basedir, complex_args, inject, fail_on_undefined=self.error_on_undefined_vars)
        except jinja2.exceptions.UndefinedError, e:
            raise errors.AnsibleUndefinedVariable("One or more undefined variables: %s" % str(e))

        # filter omitted arguments out from complex_args
        if complex_args:
            complex_args = dict(filter(lambda x: x[1] != self.omit_token, complex_args.iteritems()))

        # Filter omitted arguments out from module_args.
        # We do this with split_args instead of parse_kv to ensure
        # that things are not unquoted/requoted incorrectly
        args = split_args(module_args)
        final_args = []
        for arg in args:
            if '=' in arg:
                k,v = arg.split('=', 1)
                if unquote(v) != self.omit_token:
                    final_args.append(arg)
            else:
                # not a k=v param, append it
                final_args.append(arg)
        module_args = ' '.join(final_args)

        # first (and usually only) run of the module
        result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
        # Code for do until feature
        until = self.module_vars.get('until', None)
        if until is not None and result.comm_ok:
            # expose the result under the register name so the until
            # expression can refer to it
            inject[self.module_vars.get('register')] = result.result

            cond = template.template(self.basedir, until, inject, expand_lists=False)
            if not utils.check_conditional(cond,  self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                retries = template.template(self.basedir, self.module_vars.get('retries'), inject, expand_lists=False)
                delay   = self.module_vars.get('delay')
                for x in range(1, int(retries) + 1):
                    # template the delay, cast to float and sleep
                    delay = template.template(self.basedir, delay, inject, expand_lists=False)
                    delay = float(delay)
                    time.sleep(delay)
                    # each retry gets its own remote tmp path when needed
                    tmp = ''
                    if self._early_needs_tmp_path(module_name, handler):
                        tmp = self._make_tmp_path(conn)
                    result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
                    result.result['attempts'] = x
                    vv("Result from run %i is: %s" % (x, result.result))
                    inject[self.module_vars.get('register')] = result.result
                    cond = template.template(self.basedir, until, inject, expand_lists=False)
                    if utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                        break
                # NOTE(review): attempts is an int while retries is whatever
                # template() returned (it is passed through int() above); if
                # that is a string this comparison is never true -- confirm
                if result.result['attempts'] == retries and not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                    result.result['failed'] = True
                    result.result['msg'] = "Task failed as maximum retries was encountered"
            else:
                # the until condition already held after the first run
                result.result['attempts'] = 0
        conn.close()

        if not result.comm_ok:
            # connection or parsing errors...
            self.callbacks.on_unreachable(host, result.result)
        else:
            data = result.result

            # https://github.com/ansible/ansible/issues/4958
            # blank out stdout that is not printable when writing to a tty
            if hasattr(sys.stdout, "isatty"):
                if "stdout" in data and sys.stdout.isatty():
                    if not string_functions.isprintable(data['stdout']):
                        data['stdout'] = ''

            if 'item' in inject:
                result.result['item'] = inject['item']

            # record what was actually invoked
            result.result['invocation'] = dict(
                module_args=module_args,
                module_name=module_name
            )

            # changed_when / failed_when are only evaluated for
            # non-async tasks
            changed_when = self.module_vars.get('changed_when')
            failed_when = self.module_vars.get('failed_when')
            if (changed_when is not None or failed_when is not None) and self.background == 0:
                register = self.module_vars.get('register')
                if register is not None:
                    if 'stdout' in data:
                        data['stdout_lines'] = data['stdout'].splitlines()
                    inject[register] = data
                # only run the final checks if the async_status has finished,
                # or if we're not running an async_status check at all
                if (module_name == 'async_status' and "finished" in data) or module_name != 'async_status':
                    if changed_when is not None and 'skipped' not in data:
                        data['changed'] = utils.check_conditional(changed_when, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars)
                    if failed_when is not None and 'skipped' not in data:
                        data['failed_when_result'] = data['failed'] = utils.check_conditional(failed_when, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars)

            if is_chained:
                # no callbacks
                return result
            if 'skipped' in data:
                self.callbacks.on_skipped(host, inject.get('item',None))

            # from here on the callbacks only see the censored data
            if self.no_log:
                data = utils.censor_unlogged_data(data)

            if not result.is_successful():
                ignore_errors = self.module_vars.get('ignore_errors', False)
                self.callbacks.on_failed(host, data, ignore_errors)
            else:
                if self.diff:
                    self.callbacks.on_file_diff(conn.host, result.diff)
                self.callbacks.on_ok(host, data)

        return result

(7)run函數中
---3.通過模塊名找到插件---

# Excerpt of step 3 in run(): ask the action-plugin loader for a plugin
# named after the module; get() returns None when no plugin file is found.
p = utils.plugins.action_loader.get(self.module_name, self)

其中action_loader是一個PluginLoader類的實例

# Loader for action plugins: instantiates the 'ActionModule' class of
# whichever plugin file it finds for a given name.
action_loader = PluginLoader(
    class_name='ActionModule',
    package='ansible.runner.action_plugins',
    config=C.DEFAULT_ACTION_PLUGIN_PATH,
    subdir='action_plugins',
)

class PluginLoader(object):

    '''
    PluginLoader loads plugins from the configured plugin directories.

    It searches for plugins by iterating through the combined list of
    play basedirs, configured paths, and the python path.
    The first match is used.
    '''

    def __init__(self, class_name, package, config, subdir, aliases=None):
        '''
        class_name: name of the class to instantiate from a plugin file;
            also the key under which this loader's caches are shared.
        package: package prefix used when naming loaded plugin modules.
        config: configured plugin path(s) for this plugin type.
        subdir: plugin sub-directory name for this plugin type.
        aliases: optional mapping of alias name -> real plugin name. When
            omitted each loader gets its own empty dict (a shared ``{}``
            default would leak aliases between loaders).
        '''

        self.class_name = class_name
        self.package = package
        self.config = config
        self.subdir = subdir
        self.aliases = aliases if aliases is not None else {}

        # the caches live at module level, keyed by class name, so loaders
        # created for the same class name share them
        self._module_cache = MODULE_CACHE.setdefault(class_name, {})
        self._paths = PATH_CACHE.setdefault(class_name, None)
        self._plugin_path_cache = PLUGIN_PATH_CACHE.setdefault(class_name, {})

        self._extra_dirs = []
        self._searched_paths = set()

# PluginLoader.get
def get(self, name, *args, **kwargs):
        '''
        Instantiates a plugin of the given name using arguments.

        name: plugin name (here: the module name passed in by the runner);
            resolved through self.aliases first.
        *args, **kwargs: forwarded to the plugin class constructor.

        Returns a new instance of the class named self.class_name from the
        plugin's module, or None when find_plugin cannot locate the plugin.
        '''
        if name in self.aliases:
            name = self.aliases[name]

        # resolve the plugin name to the path of its source file
        path = self.find_plugin(name)
        if path is None:
            return None

        # load each plugin file once; afterwards it is served from the cache
        if path not in self._module_cache:
            self._module_cache[path] = imp.load_source('.'.join([self.package, name]), path)

        # instantiate the plugin class found in the loaded module
        return getattr(self._module_cache[path], self.class_name)(*args, **kwargs)

# PluginLoader.find_plugin: resolve a plugin name to the path of its file
def find_plugin(self, name, suffixes=None):
        '''
        Find a plugin named name and return the path of its file, or None.

        suffixes: file suffixes to try; defaults to ['.py'] when
        self.class_name is set and to ['.py', ''] otherwise.

        Directories from self._get_paths() are scanned at most once: every
        matching file is recorded in self._plugin_path_cache under its base
        name and the directory is added to self._searched_paths.
        '''

        if not suffixes:
            suffixes = ['.py'] if self.class_name else ['.py', '']

        # the set of file names that would satisfy this lookup
        candidates = frozenset('%s%s' % (name, ext) for ext in suffixes)

        # fast path: already seen during an earlier scan
        for candidate in candidates:
            if candidate in self._plugin_path_cache:
                return self._plugin_path_cache[candidate]

        # scan only the directories that have not been visited yet
        unsearched = [d for d in self._get_paths() if d not in self._searched_paths]
        for directory in unsearched:
            if os.path.isdir(directory):
                for entry in os.listdir(directory):
                    entry_path = os.path.join(directory, entry)
                    if not os.path.isfile(entry_path):
                        continue
                    if not any(entry_path.endswith(ext) for ext in suffixes):
                        continue

                    # first file found for a base name wins
                    base_name = os.path.basename(entry_path)
                    if base_name not in self._plugin_path_cache:
                        self._plugin_path_cache[base_name] = entry_path

            self._searched_paths.add(directory)
            for candidate in candidates:
                if candidate in self._plugin_path_cache:
                    return self._plugin_path_cache[candidate]

        # if nothing is found, try finding alias/deprecated
        if not name.startswith('_'):
            for candidate in candidates:
                # We've already cached all the paths at this point
                underscored = '_%s' % candidate
                if underscored in self._plugin_path_cache:
                    return self._plugin_path_cache[underscored]

        return None
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容