Flink Batch Processing with DataSet

Flink supports not only real-time stream processing but also batch processing; batch processing can be viewed as a special case of stream processing.

1. Built-in data sources of DataSet

File-based data sources:

readTextFile(path) / TextInputFormat: reads a file line by line and returns each line as a String.

readTextFileWithValue(path) / TextValueInputFormat: reads a file line by line and returns each line as a StringValue. StringValue is Flink's mutable, serializable wrapper around String, which can improve performance to some degree.

readCsvFile(path) / CsvInputFormat: parses files whose fields are separated by commas (or another character); returns tuples or POJOs.

readFileOfPrimitives(path, Class) / PrimitiveInputFormat

readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat: similar to readCsvFile, but returns primitive types instead of tuples.

readSequenceFile(Key, Value, path) / SequenceFileInputFormat: reads a SequenceFile and returns Tuple2<Key, Value>.
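As a small sketch of the CSV source: the file layout below (a header row plus id, name, score columns) and the sample path are assumptions for illustration, not from the original text.

```scala
import org.apache.flink.api.scala._

object CsvSourceExample {
  // Parse a CSV file of (id, name, score) rows into typed tuples.
  // The column layout and the header row are assumed for this sketch.
  def readScores(env: ExecutionEnvironment, path: String): DataSet[(Int, String, Double)] =
    env.readCsvFile[(Int, String, Double)](
      path,
      fieldDelimiter = ",",   // the default delimiter, shown for clarity
      ignoreFirstLine = true  // skip the header row
    )

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    readScores(env, "D:\\scores.csv").print()
  }
}
```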

Collection-based data sources:

fromCollection(Collection)

fromCollection(Iterator, Class)

fromElements(T ...)

fromParallelCollection(SplittableIterator, Class)

generateSequence(from, to)
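A few of these collection sources can be tried in a handful of lines; the sample data in this sketch is made up for illustration.

```scala
import org.apache.flink.api.scala._

object CollectionSources {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // fromElements: build a DataSet from varargs
    val letters: DataSet[String] = env.fromElements("a", "b", "c")

    // fromCollection: build a DataSet from any Scala collection
    val nums: DataSet[Int] = env.fromCollection(List(1, 2, 3))

    // generateSequence: a parallel DataSet of the numbers 1..5
    val seq: DataSet[Long] = env.generateSequence(1, 5)

    println(letters.count())                  // number of elements: 3
    println(nums.map(_ * 2).collect().sorted) // doubled values: 2, 4, 6
    println(seq.reduce(_ + _).collect().head) // 1 + 2 + ... + 5 = 15
  }
}
```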

Generic data sources:

readFile(inputFormat, path) / FileInputFormat

createInput(inputFormat) / InputFormat

File data source

The introductory example is based on a file data source. If we need to read a folder recursively, we can also set a parameter that enables recursive enumeration of its nested subdirectories.

```scala
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration

object BatchOperate {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\count.txt"
    val outPut = "D:\\data\\result2"

    val configuration: Configuration = new Configuration()
    // Enable recursive enumeration of nested input directories
    configuration.setBoolean("recursive.file.enumeration", true)

    // Get the program entry point: ExecutionEnvironment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(inputPath).withParameters(configuration)

    // Import the implicit conversions
    import org.apache.flink.api.scala._
    val value: AggregateDataSet[(String, Int)] =
      text.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)
    value.writeAsText("d:\\datas\\result.txt").setParallelism(1)
    env.execute("batch word count")
  }
}
```

Collection data source

```scala
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment}

object DataSetSource {
  def main(args: Array[String]): Unit = {
    // Get the batch program entry point: ExecutionEnvironment
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    // Create a DataSet from a collection
    val myArray = Array("hello world", "spark flink")
    val collectionSet: DataSet[String] = environment.fromCollection(myArray)
    val result: AggregateDataSet[(String, Int)] =
      collectionSet.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)
    result.setParallelism(1).print()
    // result.writeAsText("c:\\HELLO.TXT")
    // print() already triggers execution, so no extra environment.execute() is needed
  }
}
```

2. DataSet operators

Official operator documentation:

https://ci.apache.org/projects/flink/flink-docs-master/dev/batch/dataset_transformations.html

DataSet transformation operators

Map: takes one element and returns one element; useful for cleaning and conversion steps in between.

FlatMap: takes one element and returns zero, one, or more elements.

MapPartition: like map, but processes one partition of data at a time. If the processing needs an external resource such as a database connection, prefer MapPartition so the resource is acquired once per partition rather than once per record.

Filter: evaluates a predicate on each incoming element and keeps only the elements that satisfy it.

Reduce: aggregates the dataset by combining the current element with the value returned by the previous reduce call, producing a new value.

Aggregate: built-in aggregations such as sum, max, and min.

Distinct: returns the deduplicated elements of a dataset, e.g. data.distinct().

Join: inner join.

OuterJoin: outer join.
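The operators above can be chained; the following minimal sketch wires several of them together (the word data here is invented for illustration):

```scala
import org.apache.flink.api.scala._

object TransformationDemo {
  // FlatMap, Distinct, Filter and Map chained together:
  // split lines into words, deduplicate, drop "hello", upper-case the rest.
  def frameworks(env: ExecutionEnvironment, lines: String*): DataSet[String] =
    env.fromCollection(lines)
      .flatMap(_.split(" "))   // one line in, many words out
      .distinct()              // drop duplicate words
      .filter(_ != "hello")    // keep only the framework names
      .map(_.toUpperCase)      // one element in, one element out

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val result = frameworks(env, "hello flink", "hello spark", "hello flink")
    result.print()
    // Reduce: combine elements pairwise, here summing the name lengths
    println(result.map(_.length).reduce(_ + _).collect().head)
  }
}
```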

Example 1: saving data to a database with mapPartition

Step 1: add the MySQL JDBC driver dependency

```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
```

Step 2: create the MySQL database and table

```sql
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE='' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

CREATE DATABASE /*!32312 IF NOT EXISTS*/ `flink_db` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `flink_db`;

/* Table structure for table `user` */

DROP TABLE IF EXISTS `user`;

CREATE TABLE `user` (
  `id` int(10) NOT NULL AUTO_INCREMENT,
  `name` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
```

Step 3: code

```scala
import java.sql.{DriverManager, PreparedStatement}
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object MapPartition2MySql {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataset: DataSet[String] =
      environment.fromElements("1 zhangsan", "2 lisi", "3 wangwu")
    sourceDataset.mapPartition(part => {
      // One connection and one prepared statement per partition, not per record
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_db", "root", "123456")
      val statement: PreparedStatement =
        conn.prepareStatement("insert into user (id, name) values (?, ?)")
      part.map(x => {
        statement.setInt(1, x.split(" ")(0).toInt)
        statement.setString(2, x.split(" ")(1))
        statement.execute()
      })
    }).print()
  }
}
```

Example 2: join operations

The left outer join, right outer join, and full outer join operators let us join two DataSets on a condition we specify.

```scala
import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable.ListBuffer

object BatchDemoOuterJoinScala {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val data1 = ListBuffer[Tuple2[Int, String]]()
    data1.append((1, "zs"))
    data1.append((2, "ls"))
    data1.append((3, "ww"))
    val data2 = ListBuffer[Tuple2[Int, String]]()
    data2.append((1, "beijing"))
    data2.append((2, "shanghai"))
    data2.append((4, "guangzhou"))
    val text1 = env.fromCollection(data1)
    val text2 = env.fromCollection(data2)

    // Left outer join: second is null for unmatched left records
    text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("===============================")

    // Right outer join: first is null for unmatched right records
    text1.rightOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()

    println("===============================")

    // Full outer join: either side may be null
    text1.fullOuterJoin(text2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) {
        (second._1, "null", second._2)
      } else if (second == null) {
        (first._1, first._2, "null")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }
}
```

DataSet partition operators

Rebalance: rebalances/repartitions the dataset to eliminate data skew.

Hash-Partition: partitions the dataset by the hash of the specified key, via partitionByHash().

Range-Partition: range-partitions the dataset by the specified key, via partitionByRange().

Custom Partitioning: user-defined partitioning; implement the Partitioner interface and call partitionCustom(partitioner, "someKey") or partitionCustom(partitioner, 0).

In Flink batch processing, the partition operators are mainly rebalance, partitionByHash, partitionByRange, and partitionCustom.
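A sketch of partitionByHash and partitionByRange, counting via mapPartition how many records land in each partition (the sample pairs are invented):

```scala
import org.apache.flink.api.scala._

object PartitionDemo {
  // Count how many records end up in each partition after repartitioning.
  def partitionSizes(data: DataSet[(Int, String)]): (Seq[Int], Seq[Int]) = {
    // Hash-partition by field 0: equal keys always land in the same partition
    val hashed = data.partitionByHash(0).mapPartition(it => Iterator(it.size)).collect()
    // Range-partition by field 0: keys are split into contiguous ranges
    val ranged = data.partitionByRange(0).mapPartition(it => Iterator(it.size)).collect()
    (hashed, ranged)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val pairs = env.fromElements((1, "a"), (2, "b"), (1, "c"), (3, "d"))
    val (hashed, ranged) = partitionSizes(pairs)
    println(s"records per hash partition:  $hashed")
    println(s"records per range partition: $ranged")
  }
}
```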

```scala
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkPartition {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] =
      environment.fromElements("hello world", "spark flink", "hive sqoop")
    // rebalance() redistributes the filtered data evenly across partitions
    val filterSet: DataSet[String] = sourceDataSet
      .filter(x => x.contains("hello"))
      .rebalance()
    filterSet.print()
  }
}
```

Custom partitioning

Step 1: write a custom partitioner class in Scala

```scala
import org.apache.flink.api.common.functions.Partitioner

class MyPartitioner2 extends Partitioner[String] {
  override def partition(word: String, num: Int): Int = {
    println("number of partitions: " + num)
    if (word.contains("hello")) {
      println("partition 0")
      0
    } else {
      println("partition 1")
      1
    }
  }
}
```

Step 2: use the partitioner

```scala
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FlinkCustomerPartition {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism; if unset, the number of CPU cores is used by default
    environment.setParallelism(2)
    import org.apache.flink.api.scala._
    // Get the DataSet
    val sourceDataSet: DataSet[String] =
      environment.fromElements("hello world", "spark flink", "hello world", "hive hadoop")
    val result: DataSet[String] = sourceDataSet.partitionCustom(new MyPartitioner2, x => x + "")
    val value: DataSet[String] = result.map(x => {
      println("record: " + x + ", thread id: " + Thread.currentThread().getId)
      x
    })
    value.print()
  }
}
```

DataSet sink operators

1. writeAsText() / TextOutputFormat: writes elements line by line as strings, obtained by calling each element's toString() method.

2. writeAsFormattedText() / TextOutputFormat: writes elements line by line as strings, obtained by calling a user-defined format() method for each element.

3. writeAsCsv(...) / CsvOutputFormat: writes tuples to a comma-separated file; the row and field delimiters are configurable, and each field value comes from the object's toString() method.

4. print() / printToErr() / print(String msg) / printToErr(String msg) (note: avoid these in production; use sampled printing or logging instead).

5. write() / FileOutputFormat

6. output() / OutputFormat: the generic output method for non-file sinks, such as storing results in a database.
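As a sketch of the CSV sink (the output path and sample records below are assumptions), writeAsCsv with parallelism 1 produces a single output file rather than one file per parallel task:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object SinkDemo {
  // Write (name, age) tuples as comma-separated lines to outPath.
  def writeCsv(env: ExecutionEnvironment, outPath: String): Unit = {
    val data: DataSet[(String, Int)] = env.fromElements(("zs", 20), ("ls", 30))
    data.writeAsCsv(outPath, "\n", ",", WriteMode.OVERWRITE)
      .setParallelism(1) // a single output file instead of one per parallel task
    env.execute("csv sink")
  }

  def main(args: Array[String]): Unit = {
    writeCsv(ExecutionEnvironment.getExecutionEnvironment, "D:\\data\\out.csv")
  }
}
```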

3. Parameter passing in DataSet

DataSet code often needs parameters. We can pass them through a constructor, through the withParameters method, or through the ExecutionConfig (global job parameters).

1. Passing parameters through a constructor

```scala
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
    // The filter keyword is passed through the constructor
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilterFunction("test"))
    filterSet.print()
  }
}

class MyFilterFunction(parameter: String) extends FilterFunction[String] {
  override def filter(t: String): Boolean = t.contains(parameter)
}
```

2. Passing parameters with withParameters

```scala
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
    val configuration = new Configuration()
    configuration.setString("parameterKey", "test")
    // withParameters makes the Configuration available in the rich function's open()
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilter).withParameters(configuration)
    filterSet.print()
  }
}

class MyFilter extends RichFilterFunction[String] {
  var value: String = ""
  override def open(parameters: Configuration): Unit = {
    value = parameters.getString("parameterKey", "defaultValue")
  }
  override def filter(t: String): Boolean = t.contains(value)
}
```

3. Passing global job parameters

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

object FlinkParameter {
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration()
    configuration.setString("parameterKey", "test")
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Register the configuration as a global job parameter
    env.getConfig.setGlobalJobParameters(configuration)
    import org.apache.flink.api.scala._
    val sourceSet: DataSet[String] = env.fromElements("hello world", "abc test")
    val filterSet: DataSet[String] = sourceSet.filter(new MyFilter)
    filterSet.print()
  }
}

class MyFilter extends RichFilterFunction[String] {
  var value: String = ""
  override def open(parameters: Configuration): Unit = {
    val globalParams: ExecutionConfig.GlobalJobParameters =
      getRuntimeContext.getExecutionConfig.getGlobalJobParameters
    val globalConf: Configuration = globalParams.asInstanceOf[Configuration]
    value = globalConf.getString("parameterKey", "test")
  }
  override def filter(t: String): Boolean = t.contains(value)
}
```

4. Flink DataSet connectors

1. File system connector

To read data from file systems, Flink has built-in support for the following:

| File system | Schema | Notes |
| --- | --- | --- |
| HDFS | hdfs:// | HDFS file system |
| S3 | s3:// | Supported via the Hadoop file system implementation |
| MapR | maprfs:// | The user needs to add a jar |
| Alluxio | alluxio:// | Supported via the Hadoop file system implementation |

Note: Flink allows the use of any file system that implements the org.apache.hadoop.fs.FileSystem interface, e.g. S3, the Google Cloud Storage Connector for Hadoop, Alluxio, XtreemFS, FTP, and others.

Flink is compatible with the Apache Hadoop MapReduce interfaces, which allows reusing code written for Hadoop MapReduce:

Use Hadoop Writable data types

Use any Hadoop InputFormat as a DataSource (via Flink's built-in HadoopInputFormat)

Use any Hadoop OutputFormat as a DataSink (via Flink's built-in HadoopOutputFormat)

Use a Hadoop Mapper as a FlatMapFunction

Use a Hadoop Reducer as a GroupReduceFunction

2. Reading data from HBase with Flink

Flink can also integrate directly with HBase, using HBase as a Flink source or sink.

Step 1: create the HBase table and insert data

```
create 'hbasesource','f1'
put 'hbasesource','0001','f1:name','zhangsan'
put 'hbasesource','0002','f1:age','18'
```

Step 2: add the integration dependencies

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.11</artifactId>
    <version>1.8.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.0-cdh5.14.2</version>
</dependency>
```

Step 3: read data from HBase with Flink

```scala
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkReadHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val hbaseData: DataSet[tuple.Tuple2[String, String]] =
      environment.createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
        override def configure(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set(HConstants.ZOOKEEPER_QUORUM, "node01,node02,node03")
          conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
          val conn: Connection = ConnectionFactory.createConnection(conf)
          table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
          scan = new Scan() {
            // setStartRow(Bytes.toBytes("1001"))
            // setStopRow(Bytes.toBytes("1004"))
            addFamily(Bytes.toBytes("f1"))
          }
        }

        override def getScanner: Scan = scan

        override def getTableName: String = "hbasesource"

        override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
          val rowkey: String = Bytes.toString(result.getRow)
          val sb = new StringBuffer()
          for (cell: Cell <- result.rawCells()) {
            val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            sb.append(value).append(",")
          }
          // Drop the trailing comma
          val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
          val tuple2 = new org.apache.flink.api.java.tuple.Tuple2[String, String]
          tuple2.setField(rowkey, 0)
          tuple2.setField(valueString, 1)
          tuple2
        }
      })
    hbaseData.print()
  }
}
```

3. Writing data to HBase with Flink

Flink can also integrate with HBase to write data into it. There are two approaches:

  1. Implement the OutputFormat interface

  2. Extend RichSinkFunction and override its methods

```scala
import java.util

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object FlinkWriteHBase {
  def main(args: Array[String]): Unit = {
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = environment.fromElements("01,zhangsan,28", "02,lisi,30")
    sourceDataSet.output(new HBaseOutputFormat)
    environment.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "node01"
  val port = "2181"
  var conn: Connection = null

  override def configure(configuration: Configuration): Unit = {}

  override def open(i: Int, i1: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
  }

  override def writeRecord(it: String): Unit = {
    val tableName: TableName = TableName.valueOf("hbasesource")
    val cf1 = "f1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    val putList: util.ArrayList[Put] = new util.ArrayList[Put]
    putList.add(put)
    // Use a buffered mutator: data is flushed to HBase once the buffer reaches 1 MB
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // Set the write buffer size
    params.writeBufferSize(1024 * 1024)
    val mutator: BufferedMutator = conn.getBufferedMutator(params)
    mutator.mutate(putList)
    mutator.flush()
    putList.clear()
  }

  override def close(): Unit = {
    if (null != conn) {
      conn.close()
    }
  }
}
```

? Copyright belongs to the author. Contact the author for reprinting or content cooperation.