Celery和Django的分布式自動化測試

為了實現(xiàn)快速高效使用計算集群解決大量測試用例管理和執(zhí)行的問題,基于Celery和Django的分布式自動化測試,其由API服務器層、用例管理層、任務調(diào)度層和任務執(zhí)行層組成四層架構(gòu),實現(xiàn)了定時調(diào)度測試、分布式執(zhí)行、失敗重試等功能。能夠快速部署和配置測試執(zhí)行節(jié)點,實現(xiàn)了充分利用計算集群資源、提高測試效率的目的。

自動化測試Celery工作原理:

Celery.png

Django Celery部署

  • 1. 安裝celery

首先,我們必須擁有一個broker消息隊列用于發(fā)送和接收消息。Celery官網(wǎng)給出了多個broker的備選方案:RabbitMQRedis、Database以及其他的消息中間件。我這邊使用的是Redis作為消息中間人。
django-celery-beat 定時任務
django-celery-results 存儲Celery任務結(jié)果第三方插件,我這邊是根據(jù)業(yè)務邏輯重新設計了數(shù)據(jù)結(jié)構(gòu)

pip install celery==5.0.5
pip install redis==3.5.3
pip install django-celery-beat==2.2.0
pip install django-celery-results==2.0.1
  • 2. 注冊APP
INSTALLED_APPS = [
    ....   
    'django_celery_beat',
    'django_celery_results',
]
  • 3. 配置settings.py
# 設置代理人broker
broker_url = f'redis://{HOST}:6379'
# 使用django orm 作為結(jié)果存儲
result_backend = 'django-db'
# celery 的啟動工作數(shù)量設置
worker_concurrency = 5
# 任務預取功能,就是每個工作的進程/線程在獲取任務的時候,會盡量多拿 n 個,以保證獲取的通訊成本可以壓縮。
worker_prefetch_multiplier = 5
# celery 的 worker 執(zhí)行多少個任務后進行重啟操作
worker_max_tasks_per_child = 100
# 禁用所有速度限制,如果網(wǎng)絡資源有限,不建議開足馬力。
worker_disable_rate_limits = True
# 指定任務接受的序列化類型
accept_content = ['json']
# 指定任務序列化方式
task_serializer = 'json'
# 指定結(jié)果序列化的方式
result_serializer = 'json'
# celery beat配置(周期性任務設置)
timezone = 'Asia/Shanghai'
enable_utc = False
beat_sync_every = 1
# settings USE_TZ=False時添加該選項,否啟動 django celery beat 的時候,出現(xiàn)這個錯誤TypeError: can't compare offset-naive and offset-aware datetimes
DJANGO_CELERY_BEAT_TZ_AWARE = False 
# 休眠最大秒數(shù)
beat_max_loop_interval = 300
beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
  • 4. 新增celery_tasks文件
"""目錄結(jié)構(gòu)"""
├── celery_tasks
│ ├── init.py
│ ├── celery.py
# celery.py
# 將Celery連接到應用程序
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# 為celery設置環(huán)境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ServerDjango.settings')
app = Celery('celery_tasks')
# 加載配置
app.config_from_envvar('DJANGO_SETTINGS_MODULE')
# 設置app自動加載任務
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


# init.py
from celery_tasks.celery import app as celery_app
__all__ = ['celery_app']

celery.py中設定了對settings.pyINSTALLED_APPSautodiscover_tasksCelery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件,找到標記為task的方法,將它們注冊為celery task需要注意的是,與一般的.py中實現(xiàn)celery不同,tasks.py必須建在各app的根目錄下,且不能隨意命名。

  • 例: tasks.py
from celery import shared_task

@shared_task
def tailf_log(channel_name, file_path):
    """跟蹤日志"""
    channel_layer = get_channel_layer()
    try:
        with open(file_path, encoding='utf-8') as f:
            while True:
                line = f.readline()
                if line:
                    async_to_sync(channel_layer.send)(
                        channel_name,
                        {
                            "type": "send.message",
                            "message": str(line)
                        }
                    )
                else:
                    time.sleep(0.5)
    except Exception as e:
        f.close()
        print(e)

  • 5. 分別啟動wokerbeat
celery -A celery_tasks worker -l info   # 啟動woker
celery -A celery_tasks beat -l info --scheduler django_celery_beat.schedulers.DatabaseScheduler #啟動beat 調(diào)度器使用數(shù)據(jù)庫
  • 依據(jù)現(xiàn)有業(yè)務邏輯增加了任務失敗重試機制、任務返回后計算下次任務執(zhí)行時間以及當前任務消耗時間功能。
import celery
from celery.schedules import crontab
from django_celery_beat.models import PeriodicTask
from ManageApps.my_tasks.models import UserTasks

class CeleryTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        task = UserTasks.objects.get(task_id=kwargs['task_id'])
        if kwargs["task_id"] and task.retry:
            # 失敗重試,默認300s
            self.retry(exc=exc, countdown=300, max_retries=1)
        return super(CeleryTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if kwargs["task_id"]:  # 關(guān)鍵字參數(shù)task_id, 判斷是否為定時任務
            task = _next_run_time(kwargs['task_id'])
            elapsed_time = (datetime.datetime.now() - task.last_run_at).total_seconds()
            UserTaskResult.objects.create(**{"task_id": kwargs['task_id'], "result_id": task_id,
                                             "elapsed": round(elapsed_time, 2), "status": status})
        return super(CeleryTask, self).after_return(status, retval, task_id, args, kwargs, einfo)

def _next_run_time(task_id):
    """計算任務下次運行時間"""
    per_task = PeriodicTask.objects.get(id=task_id)
    my_task = UserTasks.objects.get(task_id=task_id)
    if per_task.crontab_id and my_task.start_time:
        # 周期任務
        cron_obj = CrontabSchedule.objects.get(id=per_task.crontab_id)
        cron = crontab(minute=cron_obj.minute, hour=cron_obj.hour, day_of_week=cron_obj.day_of_week,
                       day_of_month=cron_obj.day_of_month, month_of_year=cron_obj.month_of_year)
        now = cron.now()  # 當前運行時間
        result = cron.remaining_delta(last_run_at=now)
        ends_in = (result[0] + result[1]).replace(tzinfo=None)
        my_task.start_time = ends_in
    elif per_task.interval_id and my_task.start_time:
        # 間隔任務
        interval = IntervalSchedule.objects.get(id=per_task.interval_id)
        offset = datetime.timedelta(minutes=+0)
        if interval.period == 'minutes':
            offset = datetime.timedelta(minutes=+interval.every)
        elif interval.period == 'days':
            offset = datetime.timedelta(days=+interval.every)
        elif interval.period == 'hours':
            offset = datetime.timedelta(hours=+interval.every)
        elif interval.period == 'seconds':
            offset = datetime.timedelta(seconds=+interval.every)
        elif interval.period == 'microseconds':
            offset = datetime.timedelta(microseconds=+interval.every)
        my_task.start_time = datetime.datetime.now() + offset
    else:
        # 第一次運行寫入當前時間
        my_task.start_time = datetime.datetime.now()
    my_task.save()
    return my_task
  • 參考django-celery-beat、django-celery-result二次設計任務模型
from django.db import models
from django_celery_beat.models import PeriodicTask
from django_celery_results.models import TaskResult, TASK_STATE_CHOICES

class UserTasks(models.Model):
    user = models.ForeignKey('user.User', on_delete=models.CASCADE, verbose_name='所屬用戶', help_text='所屬用戶',
                             null=True, blank=True)
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所屬任務', help_text='所屬任務',
                             null=True, blank=True)
    task_tags = models.CharField(max_length=255, null=True, blank=True, verbose_name='任務標簽', help_text='任務標簽')
    notice = models.SmallIntegerField(verbose_name='任務通知', help_text='任務通知')
    failfast = models.BooleanField(default=False, blank=True, verbose_name='錯誤停止測試機制', help_text='錯誤停止測試機制')
    retry = models.BooleanField(default=False, blank=True, verbose_name='重試機制', help_text='重試機制')
    task_type = models.BooleanField(default=False, blank=True, verbose_name='任務類型', help_text='任務類型')
    last_run_at = models.DateTimeField(blank=True, null=True, verbose_name='Last Run Datetime',
                                       help_text='計劃上次觸發(fā)任務運行的日期時間')
    start_time = models.DateTimeField(blank=True, null=True, verbose_name='Start Datetime',
                                      help_text='Datetime when the schedule should begin triggering the task to run',)

    class Meta:
        db_table = 'tb_user_tasks'
        verbose_name = '用戶任務'
        verbose_name_plural = verbose_name


class UserTaskResult(models.Model):
    result_id = models.CharField(max_length=255, null=True, blank=True, verbose_name='Result ID', help_text='結(jié)果ID')
    task = models.ForeignKey(to=PeriodicTask, on_delete=models.CASCADE, verbose_name='所屬任務', help_text='所屬任務',
                             null=True, blank=True)
    create_time = models.BigIntegerField(verbose_name="創(chuàng)建時間", help_text="創(chuàng)建時間")
    elapsed = models.FloatField(verbose_name="耗時/s", help_text="耗時/s", null=True, blank=True, default=0.00)
    status = models.CharField(max_length=50, default='PENDING', choices=TASK_STATE_CHOICES,
                              verbose_name='任務狀態(tài)',
                              help_text='Current state of the task being run')

    class Meta:
        db_table = 'tb_user_task_result'
        verbose_name = '用戶任務結(jié)果'
        verbose_name_plural = verbose_name

前端頁面

  • 任務管理
    image.png

    image.png
  • 任務統(tǒng)計
    image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容