四、Azkaban各種類型的Job編寫

一、概述

原生的 Azkaban 支持的plugin類型有以下這些:

  1. command:Linux shell命令行任務(wù)
  2. gobblin:通用數(shù)據(jù)采集工具
  3. hadoopJava:運(yùn)行hadoopMR任務(wù)
  4. java:原生java任務(wù)
  5. hive:支持執(zhí)行hiveSQL
  6. pig:pig腳本任務(wù)
  7. spark:spark任務(wù)
  8. hdfsToTeradata:把數(shù)據(jù)從hdfs導(dǎo)入Teradata
  9. teradataToHdfs:把數(shù)據(jù)從Teradata導(dǎo)入hdfs

其中最簡單而且最常用的是command類型,我們在上一篇文章中已經(jīng)描述了如何編寫一個command的job任務(wù)。如果使用command類型,效果其實(shí)跟在本地執(zhí)行Linux shell命令一樣,這樣的話,還不如把shell放到crontable 中運(yùn)行。所以我們把重點(diǎn)放到Azkaban支持的比較常用的四種類型:java、hadoopJava、hive、spark

二、java類型

1、代碼編寫:MyJavaJob.java

package com.dataeye.java;

public class MyJavaJob {

    public static void main(String[] args) {
        System.out.println("#################################");
        System.out.println("####  MyJavaJob class exec... ###");
        System.out.println("#################################");
    }

}

2、打包成jar文件:使用maven或者eclipse導(dǎo)出為jar文件

3、編寫job文件:java.job

type=javaprocess

classpath=./lib/*,${azkaban.home}/lib/*

java.class=com.dataeye.java.MyJavaJob

4、組成一個完整的運(yùn)行包
新建一個目錄,在該目錄下創(chuàng)建一個lib文件夾,把第二步打包的jar文件放到這里,把job文件放到和lib文件夾同一級的目錄下,如下所示:


完整的運(yùn)行包

5、打包成zip文件

把lib目錄和job文件打包成zip文件,如下的java.zip:

zip文件

6、提交運(yùn)行,過程跟之前文章介紹的步驟一樣,不再詳述,執(zhí)行結(jié)果如下:

執(zhí)行結(jié)果

從輸出日志可以看出,代碼已經(jīng)正常執(zhí)行。

以上是java類型的任務(wù)編寫和執(zhí)行的過程。接下來介紹其他任務(wù)編寫的時候,只會介紹代碼的編寫和job的編寫,其他過程與上面的一致。

三、hadoopJava類型

1、數(shù)據(jù)準(zhǔn)備

以下內(nèi)容是運(yùn)行wordcount任務(wù)時需要的輸入文件input.txt:

1   Ross    male    33  3674
2   Julie   male    42  2019
3   Gloria  female  45  3567
4   Carol   female  36  2813
5   Malcolm male    42  2856
6   Joan    female  22  2235
7   Niki    female  27  3682
8   Betty   female  20  3001
9   Linda   male    21  2511
10  Whitney male    35  3075
11  Lily    male    27  3645
12  Fred    female  39  2202
13  Gary    male    28  3925
14  William female  38  2056
15  Charles male    48  2981
16  Michael male    25  2606
17  Karl    female  32  2260
18  Barbara male    39  2743
19  Elizabeth   female  26  2726
20  Helen   female  47  2457
21  Katharine   male    45  3638
22  Lee female  43  3050
23  Ann male    35  2874
24  Diana   male    37  3929
25  Fiona   female  45  2955
26  Bob female  21  3382
27  John    male    48  3677
28  Thomas  female  22  2784
29  Dean    male    38  2266
30  Paul    female  31  2679

把input.txt文件拷貝到hdfs的 /data/yann/input 目錄下

2、代碼準(zhǔn)備:

package azkaban.jobtype.examples.java;

import azkaban.jobtype.javautils.AbstractHadoopJob;
import azkaban.utils.Props;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;

public class WordCount extends AbstractHadoopJob
{
  private static final Logger logger = Logger.getLogger(WordCount.class);
  private final String inputPath;
  private final String outputPath;
  private boolean forceOutputOverrite;

  public WordCount(String name, Props props)
  {
    super(name, props);
    this.inputPath = props.getString("input.path");
    this.outputPath = props.getString("output.path");
    this.forceOutputOverrite = props.getBoolean("force.output.overwrite", false);
  }

  public void run()
    throws Exception
  {
    logger.info(String.format("Starting %s", new Object[] { getClass().getSimpleName() }));

    JobConf jobconf = getJobConf();
    jobconf.setJarByClass(WordCount.class);

    jobconf.setOutputKeyClass(Text.class);
    jobconf.setOutputValueClass(IntWritable.class);

    jobconf.setMapperClass(Map.class);
    jobconf.setReducerClass(Reduce.class);

    jobconf.setInputFormat(TextInputFormat.class);
    jobconf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.addInputPath(jobconf, new Path(this.inputPath));
    FileOutputFormat.setOutputPath(jobconf, new Path(this.outputPath));

    if (this.forceOutputOverrite)
    {
      FileSystem fs = FileOutputFormat.getOutputPath(jobconf).getFileSystem(jobconf);
      fs.delete(FileOutputFormat.getOutputPath(jobconf), true);
    }

    super.run();
  }

  public static class Map extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable>
  {
    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    private long numRecords = 0L;

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException
    {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        this.word.set(tokenizer.nextToken());
        output.collect(this.word, one);
        reporter.incrCounter(Counters.INPUT_WORDS, 1L);
      }

      if (++this.numRecords % 100L == 0L)
        reporter.setStatus("Finished processing " + this.numRecords + " records " + "from the input file");
    }

    static enum Counters
    {
      INPUT_WORDS;
    }
  }

  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable>
  {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException
    {
      int sum = 0;
      while (values.hasNext()) {
        sum += ((IntWritable)values.next()).get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
}

3、編寫job文件

wordcount.job文件內(nèi)容如下:

type=hadoopJava

job.extend=false

job.class=azkaban.jobtype.examples.java.WordCount

classpath=./lib/*,${azkaban.home}/lib/*

force.output.overwrite=true

input.path=/data/yann/input

output.path=/data/yann/output

這樣hadoopJava類型的任務(wù)已經(jīng)完成,打包提交到Azkaban中執(zhí)行即可

四、hive類型

1、編寫 hive.sql文件

use azkaban;

INSERT OVERWRITE TABLE 
 user_table1 PARTITION (day_p='2017-02-08') 
SELECT appid,uid,country,province,city 
 FROM user_table0 where adType=1;

以上是標(biāo)準(zhǔn)的hive的sql腳本,首先切換到azkaban數(shù)據(jù)庫,然后把user_table0 的數(shù)據(jù)插入到user_table1 表的指定day_p分區(qū)。需要先準(zhǔn)備好 user_table0 和 user_table1 表結(jié)構(gòu)和數(shù)據(jù)。

編寫完成后,把文件放入 res 文件夾中。

2、編寫hive.job文件

type=hive

user.to.proxy=azkaban

classpath=./lib/*,${azkaban.home}/lib/*

azk.hive.action=execute.query

hive.script=res/hive.sql

關(guān)鍵的參數(shù)是 hive.script,該參數(shù)指定使用的sql腳本在 res目錄下的hive.sql文件

五、spark類型

spark任務(wù)有兩種運(yùn)行方式,一種是command類型,另一種是spark類型

首先準(zhǔn)備好spark任務(wù)的代碼

package com.dataeye.template.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext}

object WordCount {
  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage:WordCount <hdfs_file>")
      System.exit(1)
    }

    System.out.println("get first param ==> " + args(0))
    System.out.println("get second param ==> " + args(1))

    /** spark 2.0的方式
      * val spark = SparkSession.builder().appName("WordCount").getOrCreate()
      */
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
    val spark = new SQLContext(sc)
    val file = spark.sparkContext.textFile(args(0))
    val wordCounts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    // 數(shù)據(jù)collect 到driver端打印
    wordCounts.collect().foreach(println _)
  }
}

然后準(zhǔn)備數(shù)據(jù),數(shù)據(jù)就使用前面hadoopJava中的數(shù)據(jù)即可。

最后打包成jar文件:spark-template-1.0-SNAPSHOT.jar

1、command類型

command類型的配置方式比較簡單,spark.job文件如下:

type=command

command=${spark.home}/bin/spark-submit --master yarn-cluster --class com.dataeye.template.spark.WordCount lib/spark-template-1.0-SNAPSHOT.jar   hdfs://de-hdfs/data/yann/info.txt   paramtest

2、spark類型

type=spark

master=yarn-cluster
execution-jar=lib/spark-template-1.0-SNAPSHOT.jar
class=com.dataeye.template.spark.WordCount
params=hdfs://de-hdfs/data/yann/info.txt  paramtest

以上就是Azkaban支持的幾種常用的任務(wù)類型。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容