使用Flask+AdminLTE 開發自己的管理平臺

題記

前段時間沒什麼事情,用Flask寫了一個web manager,目前還在不斷完善

主要完成的模塊包含有:

  • 網易郵箱的郵件管理
  • 域名解析管理
  • 集成grafana dashboard
  • 使用Ansible 2.0 API完成了命令批量執行
  • Ansible-CMDB構建CMDB
示例

<img src="http://occwxjjdz.bkt.clouddn.com/zhxfei%27s%20web%20manager%20-%20Mozilla%20Firefox_073.png">

<img src="http://occwxjjdz.bkt.clouddn.com/zhxfei%27s%20web%20manager%20-%20Mozilla%20Firefox_070.png">

<img src="http://occwxjjdz.bkt.clouddn.com/zhxfei%27s%20web%20manager%20-%20Mozilla%20Firefox_072.png">

<img src="http://occwxjjdz.bkt.clouddn.com/zhxfei%27s%20web%20manager%20-%20Mozilla%20Firefox_069.png">

<img src="http://occwxjjdz.bkt.clouddn.com/zhxfei%27s%20web%20manager%20-%20Mozilla%20Firefox_067.png">

實現

服務監控
這一部分主要是依賴于之前搭建的基于open-falcon的監控系統,由于數據展示用的是grafana,所以使用grafana dashboard的snapshot功能直接分享到這個系統上。

郵件接收
主要的code

# coding: utf-8
import poplib
from email.parser import Parser
from email.header import decode_header
from email.utils import parseaddr, parsedate_tz
from datetime import datetime
from app.utils.personal_config import pop3_server, email, password
from app import db
from app.modles import Email, SyncLog


# Header names that save_info reads from every parsed message.
email_info_lst = ['From', 'To', 'Subject', 'Date']


def decode_str(s):
    """Decode an RFC 2047 header value into one text string.

    Every fragment returned by ``decode_header`` is decoded and the pieces
    are concatenated, so headers that mix plain text with encoded words
    (e.g. ``Re: =?utf-8?b?...?=``) are returned in full instead of being
    cut off after the first fragment.

    :param s: raw header value (``str`` or ``email.header.Header``).
    :return: the decoded text; an un-encoded ``str`` is returned unchanged.
    """
    fragments = []
    for value, charset in decode_header(s):
        if isinstance(value, bytes):
            try:
                # Fragments without a charset are plain header text that
                # decode_header turned into bytes; latin-1 never fails.
                value = value.decode(charset or 'latin-1', 'replace')
            except LookupError:
                # Charset label unknown to Python: best-effort UTF-8.
                value = value.decode('utf-8', 'replace')
        fragments.append(value)
    return ''.join(fragments)


def guess_charset(msg):
    """Return the charset of *msg*, falling back to its Content-Type header.

    ``msg.get_charset()`` is preferred. When it is ``None`` the
    ``charset=`` parameter of the lower-cased Content-Type header is used,
    with surrounding quotes and any following ``; param=...`` removed.

    :param msg: a message object offering ``get_charset()`` and ``get()``.
    :return: whatever ``msg.get_charset()`` returned if not ``None``,
        otherwise the lower-cased charset name, or ``None`` if the header
        carries no ``charset=`` parameter.
    """
    charset = msg.get_charset()
    if charset is not None:
        return charset

    content_type = msg.get('Content-Type', '').lower()
    marker = 'charset='
    pos = content_type.find(marker)
    if pos < 0:
        return None
    # Keep only this parameter's value: stop at the next ';' and drop quotes.
    raw = content_type[pos + len(marker):].split(';', 1)[0]
    return raw.strip().strip('"\'').strip()


def get_header_value(msg, header):
    """Return a display-ready value for one header of *msg*.

    * ``Subject`` is decoded with ``decode_str``.
    * ``Date`` is parsed with ``email.utils.parsedate_tz`` and rendered as
      zero-padded ``YYYY-MM-DD HH:MM`` using the wall-clock fields found in
      the header; an unparseable date is returned unchanged.
    * Any other header is treated as an address and rendered as
      ``"<decoded name> <address>"``.

    :param msg: object offering ``get(name, default)``.
    :param header: header name to read.
    :return: the formatted value, or the falsy value ``msg.get`` returned
        when the header is missing or empty.
    """
    value = msg.get(header, '')
    if not value:
        return value

    if header == 'Subject':
        return decode_str(value)

    if header == 'Date':
        parsed = parsedate_tz(value)
        if parsed is None:
            # Not an RFC 2822 date; keep the raw text rather than crash.
            return value
        return '%04d-%02d-%02d %02d:%02d' % parsed[0:5]

    hdr, addr = parseaddr(value)
    return u'%s %s' % (decode_str(hdr), addr)


def save_info(msg):
    """Create an ``Email`` row from *msg* and add it to the DB session.

    Each header listed in ``email_info_lst`` is read through
    ``get_header_value`` and stored on the matching ``Email`` attribute.
    The row is only added to ``db.session``; committing is left to the
    caller.

    :param msg: parsed message passed on to ``get_header_value``.
    :return: None
    """
    attr_by_header = {
        'From': 'mail_sender',
        'To': 'mail_receiver',
        'Subject': 'subject',
        'Date': 'time',
    }
    # Named ``record`` so the imported ``email`` account address is not shadowed.
    record = Email()
    for header in email_info_lst:
        # Headers without an explicit mapping land in ``time``, as before.
        attr_name = attr_by_header.get(header, 'time')
        setattr(record, attr_name, get_header_value(msg, header))
    db.session.add(record)


def email_sync():
    """Fetch messages not yet synced from the POP3 mailbox and store them.

    The most recent ``SyncLog.ptr`` is used as the first message number to
    fetch (1 when no log exists). Every message from there up to the
    current mailbox size is parsed and handed to ``save_info``. A new
    ``SyncLog`` pointing one past the last message is then added and the
    session is committed.

    On any failure the session is rolled back so half-added rows are not
    committed by a later request. The POP3 connection is always closed.

    :return: a human-readable status string.
    """
    server = None
    try:
        server = poplib.POP3(pop3_server)
        server.user(email)
        server.pass_(password)
        resp, mails, octets = server.list()
        index = len(mails)

        sync = SyncLog.query.order_by(SyncLog.ptr.desc()).first()
        start = sync.ptr if sync else 1
        if start < index + 1:
            for i in range(start, index + 1):
                resp, lines, octets = server.retr(i)
                msg_content = b'\r\n'.join(lines).decode('utf-8', 'ignore')
                msg = Parser().parsestr(msg_content)
                save_info(msg)
            new_sync = SyncLog(
                ptr=index + 1,
                has_view=False
            )
            db.session.add(new_sync)
            db.session.commit()
            info = '更新完成'
        else:
            info = '沒有需要更新的郵件'
    except Exception as e:
        # Top-level boundary: report the failure to the caller as text,
        # but first discard anything already added to the session.
        db.session.rollback()
        info = '更新失敗: %s' % e
    finally:
        if server is not None:
            try:
                server.quit()
            except (poplib.error_proto, OSError):
                # Best-effort close; the sync outcome is already decided.
                pass
    return info

域名解析管理

# coding: utf-8
import requests
from app.utils.personal_config import dns_login_token
from app.modles import RecordInfo
from app import db
from datetime import datetime


# URL template for the dnsapi.cn Record API; '{}' is filled with the action
# name used by the functions below ('List', 'Create', 'Remove').
records_url = 'https://dnsapi.cn/Record.{}'


def records_sync(domain_id='28921413'):
    """Mirror the provider's DNS record list into the local ``RecordInfo`` table.

    Records returned by ``Record.List`` that are enabled and not yet stored
    locally are inserted. Local rows whose ``sp_id`` no longer appears in
    the provider's list are deleted. All changes are committed together.

    :param domain_id: provider-side id of the domain to sync; defaults to
        the id previously hard-coded here.
    :return: ``'sync succeed'`` on success, ``'sync failed'`` when the HTTP
        status is not 200 or the API status code is not ``'1'``.
    """
    data = {
        'login_token': dns_login_token,
        'format': 'json',
        'domain_id': domain_id
    }
    res = requests.post(records_url.format('List'), data=data)
    if res.status_code != 200:
        return 'sync failed'

    res_data = res.json()
    # API status code '1' means 'Action completed successful'.
    if (res_data.get('status') or {}).get('code') != '1':
        return 'sync failed'

    records_data = res_data.get('records')
    remote_ids = set()
    local_rows = db.session.query(RecordInfo.sp_id).group_by(RecordInfo.sp_id).all()
    local_ids = set(row[0] for row in local_rows)

    for record in records_data:
        remote_ids.add(record['id'])
        if RecordInfo.query.filter_by(sp_id=record['id']).first():
            continue
        if record['enabled'] != '1':
            continue
        new_row = RecordInfo()
        new_row.sp_id = record['id']
        new_row.domain_name = res_data['domain']['name']
        new_row.name = record['name']
        new_row.type = record['type']
        new_row.value = record['value']
        new_row.updated_time = record['updated_on']
        new_row.ttl = record['ttl']
        new_row.use_status = True
        new_row.monitor_status = 'unknown'
        db.session.add(new_row)

    # Drop local rows the provider no longer reports.
    for stale_id in local_ids - remote_ids:
        stale = RecordInfo.query.filter_by(sp_id=stale_id).first()
        if stale:
            db.session.delete(stale)

    db.session.commit()
    return 'sync succeed'


def records_add(name, value, record_type, domain_name='zhxfei.com'):
    """Create a DNS record at the provider and, on success, store it locally.

    :param name: sub-domain part of the record.
    :param value: record value.
    :param record_type: record type string sent as ``record_type``.
    :param domain_name: domain name saved on the local row.
    :return: the ``status.message`` text from the API response.
    """
    data = {
        'login_token': dns_login_token,
        'format': 'json',
        'sub_domain': name,
        'record_type': record_type,
        # The API expects the literal line name "默认" (default line); the
        # previous literal contained a stray pinyin annotation.
        'record_line': '默认',
        'value': value,
        'domain_id': '28921413'
    }
    res = requests.post(records_url.format('Create'), data=data)
    # Parse the body once instead of on every access.
    payload = res.json()
    if res.status_code == 200 and payload['status']['code'] == '1':
        record = payload['record']
        records = RecordInfo()
        records.sp_id = record['id']
        records.name = name
        records.domain_name = domain_name
        records.type = record_type
        records.value = value
        records.updated_time = datetime.now()
        records.ttl = '600'
        records.use_status = True
        records.monitor_status = 'unknown'
        db.session.add(records)
        db.session.commit()
    return payload['status']['message']


def record_delete(record_id):
    """Ask the provider to remove one DNS record.

    Only the remote ``Record.Remove`` call is made here; no local row is
    touched by this function.

    :param record_id: provider-side id of the record to remove.
    :return: the ``status.message`` text from the API response.
    """
    payload = dict(
        login_token=dns_login_token,
        format='json',
        record_id=record_id,
        domain_id='28921413',
    )
    response = requests.post(records_url.format('Remove'), data=payload)
    status = response.json()['status']
    return status['message']


def record_modify(record_id):
    """Placeholder for modifying a DNS record; not implemented yet.

    :param record_id: id of the record that would be modified (unused).
    :return: None
    """
    return None

資產管理
關于Ansible-CMDB的介紹和使用請看github

本人使用的服務器,目前主要是阿里云和騰訊云,我需要做一些服務器信息的收集,如我需要知道服務器的過期時間、公網帶寬的大小等等,主要也是對服務器的信息通過api收集,之后通過datatable進行展示

以騰訊云為例,參見其API文檔

實現

# coding: utf-8
from QcloudApi.qcloudapi import QcloudApi
import requests


def req_url_generate():
    """Build one signed ``DescribeInstances`` request URL per region.

    Credentials and the region list are imported from
    ``app.utils.personal_config`` at call time.

    :return: list of URLs, one for each entry of ``region_lst``, in order.
    """
    from app.utils.personal_config import tencent_secret_id, tencent_secret_key, region_lst

    api_config = dict(
        secretId=tencent_secret_id,
        secretKey=tencent_secret_key,
        method='get',
    )
    request_params = dict(SignatureMethod='HmacSHA1')

    urls = []
    for region_name in region_lst:
        # The same config dict is reused; only the region changes per pass.
        api_config['Region'] = region_name
        client = QcloudApi('cvm', api_config)
        urls.append(client.generateUrl('DescribeInstances', request_params))
    return urls


def parse_message_dict(dct):
    """Flatten a dict into ``"key: value;key: value"`` text.

    :param dct: mapping whose keys and values are rendered with ``str``.
    :return: the pairs joined by ``';'`` in iteration order; ``''`` for an
        empty mapping.
    """
    return ';'.join(
        str(key) + ': ' + str(val)
        for key, val in dct.items()
    )


def get_tx_vps_data():
    """Collect instance records from every region and flatten their fields.

    Each URL from ``req_url_generate`` is fetched. For responses whose
    ``code`` is 0, every instance dict in ``instanceSet`` is rewritten in
    place: list values become comma-joined text, dict values are rendered
    by ``parse_message_dict``, and ``sp_name`` is set to ``'tencent'``.

    :return: list of the flattened instance dicts from all regions.
    """
    res_lst = []
    for req_url in req_url_generate():
        res = requests.get(req_url, timeout=3)
        result = res.json()
        if result.get('code') != 0:
            continue
        instances = result['instanceSet']
        for sp in instances:
            # Only existing keys are reassigned, so the dict size is stable
            # while iterating; a snapshot is taken anyway for clarity.
            for k, v in list(sp.items()):
                if isinstance(v, list):
                    # str() each item so non-string entries do not make
                    # join() raise TypeError.
                    sp[k] = ','.join(str(item) for item in v)
                elif isinstance(v, dict):
                    sp[k] = parse_message_dict(v)
            sp['sp_name'] = 'tencent'
        res_lst.extend(instances)
    return res_lst


def get_tx_vps_data_final():
    """Return the flattened instance records as a new list.

    Field filtering is currently disabled, so every record from
    ``get_tx_vps_data`` is passed through unchanged.

    :return: a new list holding the same record dicts.
    """
    # Disabled whitelist kept for reference: cpu, wanIpSet, createTime,
    # status, os, zoneName, mem, deadlineTime, lanIp, bandwidth, sp_name.
    # NOTE(review): the disabled list had no comma between 'lanIp' and
    # 'bandwidth'; fix that if the filter is ever re-enabled.
    return list(get_tx_vps_data())

命令執行
主要是使用的Ansible,對Ansible本文就不過多介紹,我使用它的最主要的原因是十分輕量,它直接使用ssh進行操作,而不是像puppet/saltstack等其他自動化運維工具,需要在每臺機器上裝Agent

值得注意的是Ansible的api與Ansible的版本相關,Ansible 2.0與Ansible 1.0的差距也是蠻大的,相比1.0新的版本要復雜得多

本人對Ansible 2.0 api的module部分進行了封裝,需要注意的是,目前還沒有使用動態的inventory文件,之后應該會實現


#!/usr/bin/env python

# import json, logging
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase


#   logging = logging.getLevelName()


class ResultCallback(CallbackBase):
    """Callback that collects per-task results in dictionaries.

    Every result is stored under the key ``"<host name> job_<n>"`` where
    ``n`` is a counter incremented for each result received. Results are
    kept in ``result_host_all`` and in the bucket matching their outcome
    (ok / failed / unreachable). Non-empty ``stdout_lines`` of ok results
    and ``stderr_lines`` of failed results are stored separately.
    """

    def __init__(self, *args, **kwargs):
        super(ResultCallback, self).__init__(*args, **kwargs)
        # Running counter used to make result keys unique.
        self.job_id = 0
        # Outcome buckets, all keyed by "<host name> job_<n>".
        self.result_host_all = {}
        self.result_host_ok = {}
        self.result_host_failed = {}
        self.result_host_unreachable = {}
        # Captured output lines plus flags telling whether any were seen.
        self.result_host_stdout_lines = {}
        self.result_host_stderr_lines = {}
        self.result_has_stdout_lines = False
        self.result_has_stderr_lines = False

    def _store_job_result(self, result, bucket):
        """Record *result* in ``result_host_all`` and *bucket*.

        :return: tuple ``(key, payload)`` used for the stored entry.
        """
        self.job_id += 1
        key = result._host.get_name() + ' job_' + str(self.job_id)
        payload = result._result
        self.result_host_all[key] = payload
        bucket[key] = payload
        return key, payload

    def v2_runner_on_ok(self, result, **kwargs):
        key, payload = self._store_job_result(result, self.result_host_ok)
        stdout_lines = payload.get('stdout_lines')
        if stdout_lines:
            self.result_has_stdout_lines = True
            self.result_host_stdout_lines[key] = stdout_lines

    def v2_runner_on_failed(self, result, ignore_errors=False):
        key, payload = self._store_job_result(result, self.result_host_failed)
        stderr_lines = payload.get('stderr_lines')
        if stderr_lines:
            self.result_has_stderr_lines = True
            self.result_host_stderr_lines[key] = stderr_lines

    def v2_runner_on_unreachable(self, result):
        self._store_job_result(result, self.result_host_unreachable)


def _parse_task(task_lst):
    tasks = []
    if task_lst:
        for task in task_lst:
            module = task.get('module')
            args = task.get('args')
            tasks.append(dict(action=dict(module=module, args=args)))
    return tasks


class AnsibleRun(object):
    """Thin wrapper that runs ad-hoc module tasks and exposes the results.

    The inventory is read from ``/etc/ansible/hosts``. Results are
    gathered by the callback object passed in, or by a fresh
    ``ResultCallback`` when none is given.
    """

    Options = namedtuple('Options',
                         ['connection',
                          'module_path',
                          'forks',
                          'become',
                          'become_method',
                          'become_user',
                          'check',
                          'diff',
                          'sudo',
                          'timeout'])

    def __init__(self, hosts, result_callback=None):
        """
        :param hosts: host pattern or list of hosts the play targets.
        :param result_callback: optional callback object that collects
            results; must expose the ``result_host_*`` / ``result_has_*``
            attributes read by ``get_result``.
        """
        self.loader = DataLoader()
        self.options = AnsibleRun.Options(connection='ssh',
                                          module_path='../../env/lib/python3.5/site-packages/ansible/modules/',
                                          forks=100,
                                          sudo='yes',
                                          become=None,
                                          become_method=None,
                                          become_user='root',
                                          check=False,
                                          diff=False,
                                          timeout=3)
        self.passwords = dict(vault_pass='secret')
        self.hosts = hosts
        self.inventory = InventoryManager(loader=self.loader, sources=['/etc/ansible/hosts'])
        self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)
        self.result_callback = result_callback if result_callback else ResultCallback()

    def module_run(self, task_lst):
        """Run the given module tasks as a single play.

        ``task_lst`` is a list of dicts, for example::

            [
                {'module': 'your_self_module', 'args': 'args=sssss'},
                {'module': 'shell', 'args': 'ifconfig'},
            ]

        Results are delivered to ``self.result_callback``; read them back
        with ``get_result``.

        :param task_lst: list of ``{'module': ..., 'args': ...}`` dicts.
        :return: None
        """
        tasks = _parse_task(task_lst)
        play_source = dict(
            name="Ansible Play",
            hosts=self.hosts,
            gather_facts='no',
            tasks=tasks
        )

        play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)

        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                options=self.options,
                passwords=self.passwords,
                # Use our own callback instead of the ``default`` plugin.
                stdout_callback=self.result_callback,
            )
            tqm.run(play)
        finally:
            # Always release the queue manager, even if the run raised.
            if tqm is not None:
                tqm.cleanup()

    def play_book_run(self):
        """Placeholder for playbook execution; not implemented yet."""
        pass

    def get_result(self, result_type='all'):
        """Return one category of collected results.

        Both the long names (``'result_all'``, ``'result_ok'``,
        ``'result_failed'``, ``'result_unreachable'``,
        ``'result_stdout_lines'``, ``'result_stderr_lines'``) and their
        short forms without the ``result_`` prefix are accepted, so the
        default ``'all'`` works.

        :param result_type: category name, long or short form.
        :return: the matching dict from the callback. For the stdout and
            stderr categories ``None`` is returned when no such lines were
            captured.
        :raises AssertionError: if ``result_type`` is not a known category.
        """
        params_allow_lst = ['result_all',
                            'result_ok',
                            'result_stdout_lines',
                            'result_stderr_lines',
                            'result_failed',
                            'result_unreachable']
        if not str(result_type).startswith('result_'):
            result_type = 'result_' + str(result_type)
        if result_type not in params_allow_lst:
            # Raised explicitly (same exception type as before) so the
            # check also survives ``python -O``, which strips ``assert``.
            raise AssertionError('result_type must in {params_allow_lst}'.format(
                params_allow_lst=params_allow_lst))

        callback = self.result_callback
        if result_type == 'result_stdout_lines':
            if callback.result_has_stdout_lines:
                return callback.result_host_stdout_lines
            return None
        if result_type == 'result_stderr_lines':
            if callback.result_has_stderr_lines:
                return callback.result_host_stderr_lines
            return None

        attr_by_type = {
            'result_all': 'result_host_all',
            'result_ok': 'result_host_ok',
            'result_failed': 'result_host_failed',
            'result_unreachable': 'result_host_unreachable',
        }
        return getattr(callback, attr_by_type[result_type])


def test():
    """Manual smoke test: run a cron task on localhost and print results.

    Every result category is printed in turn.
    """
    client = AnsibleRun('localhost')
    cron_task = {
        'module': 'cron',
        'args': "name='just a test echo' job='echo hello world' minute='*/1'"
    }
    client.module_run([cron_task])

    categories = (
        'result_all',
        'result_ok',
        'result_stdout_lines',
        'result_failed',
        'result_stderr_lines',
        'result_unreachable',
    )
    for category in categories:
        print(client.get_result(category))

之后在Flask的視圖函數中直接調用:


def _format_output_section(title, results):
    """Render one titled block of per-job output lines as text.

    :param title: heading text, including its trailing newline.
    :param results: mapping of job key -> list of output lines.
    :return: the heading followed by each job's indented lines and a
        separator line.
    """
    separator = '---' * 30 + '\n'
    parts = [title]
    for host_job, lines in results.items():
        indented = '\n'.join(' ' * 4 + line for line in lines)
        parts.append(host_job + '\n' * 2 + indented + '\n')
        parts.append(separator)
    return ''.join(parts)


@admin.route('/command/exec/', methods=['GET', 'POST'])
@login_req
def command_exec():
    """Run a submitted shell command on the selected hosts and show the output.

    On a valid submission that passes ``is_sec``, the command is executed
    through ``AnsibleRun`` with the ``shell`` module. Stdout, stderr and
    unreachable results are rendered into the form's result field, and
    the per-category counts are passed to the template as ``res_count``.
    """
    form = CommandCommitForm()
    form.result.render_kw['rows'] = 10
    # Stays None unless a command was actually executed. This is a plain
    # local: the former ``global res_count`` after this assignment is a
    # SyntaxError in Python 3.
    res_count = None
    if form.validate_on_submit():
        host = form.data['host']
        host_name_lst = [h for k, h in host_type if k in host]
        command = form.data['content']
        # NOTE(review): the command is user input run via the shell module;
        # safety relies entirely on is_sec() - confirm its filtering.
        if is_sec(command):
            runner = AnsibleRun(host_name_lst)
            runner.module_run([
                {
                    'module': 'shell',
                    'args': command
                }
            ])
            # get_result returns None when nothing was captured.
            res_stdout = runner.get_result('result_stdout_lines') or {}
            res_stderr = runner.get_result('result_stderr_lines') or {}
            res_unreachable = runner.get_result('result_unreachable') or {}

            sections = []
            if res_stdout:
                sections.append(_format_output_section('正確輸出:\n', res_stdout))
            if res_stderr:
                sections.append(_format_output_section('錯誤輸出:\n', res_stderr))
            if res_unreachable:
                sections.append('不可達輸出:\n')
                for host_job, detail in res_unreachable.items():
                    sections.append(host_job + '\n' + '    ' + detail.get('msg', '') + '\n')
                    sections.append('---' * 30 + '\n')

            form.result.data = ''.join(sections)
            form.result.render_kw['rows'] = 40
            res_count = (len(res_stdout), len(res_stderr), len(res_unreachable))
        else:
            flash('command not allowed', 'failed')
    return render_template('admin/command_exec.html', form=form, res_count=res_count)


目前測試,在Ansible上執行批量處理,雖然頁面是阻塞加載,頁面響應時間在4s左右,還能接受,以后還能再優化。

完整代碼見這里:My-Admin

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Spring Cloud為開發人員提供了快速構建分布式系統中一些常見模式的工具(例如配置管理,服務發現,斷路器,智...
    卡卡羅2017閱讀 136,568評論 19 139
  • 22年12月更新:個人網站關停,如果仍舊對舊教程有興趣參考 Github 的markdown內容[https://...
    tangyefei閱讀 35,408評論 22 257
  • # Python 資源大全中文版 我想很多程序員應該記得 GitHub 上有一個 Awesome - XXX 系列...
    aimaile閱讀 26,835評論 6 427
  • Python 資源大全中文版 awesome-python[https://github.com/vinta/aw...
    萬色星辰閱讀 9,932評論 0 255
  • Android 自定義View的各種姿勢1 Activity的顯示之ViewRootImpl詳解 Activity...
    passiontim閱讀 179,052評論 25 709

友情鏈接更多精彩內容