Elastic-job 介紹與使用

目標(biāo)

把定時(shí)任務(wù)通過集群的方式進(jìn)行管理調(diào)度,并采用分布式部署,保證系統(tǒng)的高可用,提高了容錯(cuò)。那么如何保證定時(shí)任務(wù)只在集群的某一個(gè)節(jié)點(diǎn)上執(zhí)行,或者一個(gè)任務(wù)如何拆分為多個(gè)獨(dú)立的任務(wù)項(xiàng),由分布式的機(jī)器去分別執(zhí)行, 眾多的定時(shí)任務(wù)如何統(tǒng)一管理,現(xiàn)在有很多成熟的分布式定時(shí)任務(wù)框架,都能很好的實(shí)現(xiàn)上述的功能。

基本概念

elastic-job 是由當(dāng)當(dāng)網(wǎng)基于quartz 二次開發(fā)之后的分布式調(diào)度解決方案 , 由兩個(gè)相對(duì)獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成 。
elastic-job主要的設(shè)計(jì)理念是無中心化的分布式定時(shí)調(diào)度框架,思路來源于Quartz的基于數(shù)據(jù)庫(kù)的高可用方案。但數(shù)據(jù)庫(kù)沒有分布式協(xié)調(diào)功能,所以在高可用方案的基礎(chǔ)上增加了彈性擴(kuò)容和數(shù)據(jù)分片的思路,以便于更大限度的利用分布式服務(wù)器的資源。

1. 分片

任務(wù)的分布式執(zhí)行,需要將一個(gè)任務(wù)拆分為多個(gè)獨(dú)立的任務(wù)項(xiàng),然后由分布式的服務(wù)器分別執(zhí)行某一個(gè)或幾個(gè)分片項(xiàng)。

例如:有一個(gè)遍歷數(shù)據(jù)庫(kù)某張表的作業(yè),現(xiàn)有2臺(tái)服務(wù)器。
為了快速的執(zhí)行作業(yè),那么每臺(tái)服務(wù)器應(yīng)執(zhí)行作業(yè)的50%。 為滿足此需求,可將作業(yè)分成2片,每臺(tái)服務(wù)器執(zhí)行1片。
作業(yè)遍歷數(shù)據(jù)的邏輯可以為:服務(wù)器A遍歷ID以奇數(shù)結(jié)尾的數(shù)據(jù);服務(wù)器B遍歷ID以偶數(shù)結(jié)尾的數(shù)據(jù)。
如果分成10片,則服務(wù)器A被分配到分片項(xiàng)0,1,2,3,4;服務(wù)器B被分配到分片項(xiàng)5,6,7,8,9。
作業(yè)遍歷數(shù)據(jù)的邏輯可以為:服務(wù)器A遍歷ID以0-4結(jié)尾的數(shù)據(jù);服務(wù)器B遍歷ID以5-9結(jié)尾的數(shù)據(jù)

2. 分片項(xiàng)與業(yè)務(wù)處理解耦

Elastic-Job并不直接提供數(shù)據(jù)處理的功能,框架只會(huì)將分片項(xiàng)分配至各個(gè)運(yùn)行中的作業(yè)服務(wù)器,開發(fā)者需要自行處理分片項(xiàng)與真實(shí)數(shù)據(jù)的對(duì)應(yīng)關(guān)系。以上面例子分成10片為例,框架只負(fù)責(zé)決定服務(wù)器分配到哪些分片項(xiàng),由作業(yè)分配策略決定,但是每個(gè)分片處理哪一部分?jǐn)?shù)據(jù),比如第一個(gè)分片處理id以0-4結(jié)尾的數(shù)據(jù),是由開發(fā)者去決定和處理的。

3. 中心化

xxl-job是中心化設(shè)計(jì),在xxl-job中,所有定時(shí)任務(wù)的執(zhí)行是在調(diào)度中心判斷作業(yè)到了執(zhí)行的時(shí)間,然后通知業(yè)務(wù)系統(tǒng)去執(zhí)行,即是作業(yè)節(jié)點(diǎn)并不知道自己應(yīng)該什么時(shí)候執(zhí)行定時(shí)任務(wù),只能通過調(diào)度中心去決定作業(yè)的執(zhí)行。缺點(diǎn)是部署麻煩。

4. 去中心化

elastic-job是去中心化設(shè)計(jì),作業(yè)調(diào)度中心節(jié)點(diǎn),各個(gè)作業(yè)節(jié)點(diǎn)是自治的,作業(yè)框架的程序在到達(dá)相應(yīng)時(shí)間點(diǎn)時(shí)各自觸發(fā)調(diào)度,缺點(diǎn)是可能會(huì)存在各個(gè)作業(yè)服務(wù)器的時(shí)間不一致的問題。

使用

1. 引入maven依賴

        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>

        <dependency>
            <artifactId>elastic-job-lite-core</artifactId>
            <groupId>com.dangdang</groupId>
            <version>2.1.5</version>
        </dependency>

2. 配置注冊(cè)中心

    <reg:zookeeper id="regCenter" server-lists="192.168.3.191:2181" namespace="elastic-job-zookeeper" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />

image.png

3. 事件追蹤(可選)

    <bean id="elasticJobLog" class="com.alibaba.druid.pool.DruidDataSource" init-method="init"  destroy-method="close">
        <!--<property name="driverClassName" value="${event.rdb.driver}"/>-->
        <property name="url" value="${event.rdb.url}"/>
        <property name="username" value="${event.rdb.username}"/>
        <property name="password" value="${event.rdb.password}"/>
    </bean>

4.作業(yè)開發(fā)

package com.isuwang.soa.crm.dbc.action.shareCrm

import java.util.{Date, Optional}
...

@Transactional(value = "crm", rollbackFor = Array(classOf[Exception]))
class FxxkUpdateRecordAction() extends Action[Unit] with SimpleJob{

  override def preCheck: Unit = {}

  override def action: Unit = {
    val beginTime = System.currentTimeMillis()
    rangeConditions.append(rangeCondition)
    searchQuery.rangeConditions(rangeConditions)

    getDatas(fXRecords)

    // 同步
    fXRecords.foreach(x => {
      val tripList = getTableColumnValue(x)
      tripList.foreach(trip => {
        try {
          executeUpdate(trip._4, trip, x)
        } catch {
          case e: Throwable => {
            logger.error(e.getMessage, e)
            logger.info("=====>紛享更新回訪記錄失敗內(nèi)容id:{}", x._id)
          }
        }
      })
    })
    logger.info(s"====>${getClass.getName}耗時(shí):{}秒", (System.currentTimeMillis() - beginTime) / 1000)
  }
...
  override def execute(shardingContext: ShardingContext): Unit = action
}
    <job:simple id="FxxkUpdateRecordAction" class="com.isuwang.soa.crm.dbc.action.shareCrm.FxxkUpdateRecordAction" registry-center-ref="regCenter" sharding-total-count="1" cron="0 40 * * * ? "  failover="true" description="每晚定時(shí)統(tǒng)計(jì)獲取紛享CRM當(dāng)日更新的回訪記錄" overwrite="true"  event-trace-rdb-data-source="elasticJobLog" />

failover:是否開啟任務(wù)執(zhí)行失效轉(zhuǎn)移,開啟表示如果作業(yè)在一次任務(wù)執(zhí)行中途宕機(jī),允許將該次未完成的任務(wù)在另一作業(yè)節(jié)點(diǎn)上補(bǔ)償執(zhí)行
description:作業(yè)描述
overwrite:本地配置是否可覆蓋注冊(cè)中心配置,如果可覆蓋,每次啟動(dòng)作業(yè)都以本地配置為準(zhǔn)
event-trace-rdb-data-source:作業(yè)事件追蹤的數(shù)據(jù)源Bean引用

實(shí)現(xiàn)原理

1. 作業(yè)啟動(dòng)

image.png

2. 作業(yè)執(zhí)行

image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容