業(yè)務(wù)詳解
處理數(shù)據(jù)主要包括的是從互聯(lián)網(wǎng)上采集來(lái)的數(shù)據(jù),包括常見(jiàn)的新聞、微博、論壇、貼吧、博客、微博、微信等信源,對(duì)原始格式進(jìn)行實(shí)時(shí)響應(yīng)處理,以期望滿足業(yè)務(wù)系統(tǒng)使用,這便是當(dāng)前數(shù)據(jù)流中數(shù)據(jù)處理的服務(wù)宗旨。
技術(shù)點(diǎn)概覽
利用廣播變量動(dòng)態(tài)更新規(guī)則
- 業(yè)務(wù)場(chǎng)景
數(shù)據(jù)流中會(huì)動(dòng)態(tài)實(shí)時(shí)加載規(guī)則表,而這種實(shí)時(shí)性要求并不強(qiáng)烈,可以是分鐘級(jí)別、小時(shí)級(jí)別;所以沒(méi)有必要浪費(fèi)大量資源對(duì)于每一條數(shù)據(jù)都進(jìn)行一次數(shù)據(jù)庫(kù)請(qǐng)求。所以適合的應(yīng)用場(chǎng)景是以固定的頻率更新各個(gè)算子中的規(guī)則,當(dāng)然這種規(guī)則所占用的內(nèi)存消耗是比較小的。 - 技術(shù)組件 - BroadcastVariable
前置條件: Flink 中多并行度的情況下,每個(gè)算子或者不同算子運(yùn)行所在的 Slot 不一致,這就導(dǎo)致它們不會(huì)共享同一個(gè)內(nèi)存,也就不可以通過(guò)靜態(tài)變量的方式去獲取這些共享變量值。
BroadcastVariable: 可以理解是一個(gè)公共的共享變量(可能是固定不變的數(shù)據(jù)集合,也可能是動(dòng)態(tài)變化的數(shù)據(jù)集合),在作業(yè)中將該共享變量廣播出去,然后下游的所有任務(wù)都可以獲取到該共享變量,這樣就可以不用將這個(gè)變量拷貝到下游的每個(gè)任務(wù)中。
使用方式:
1: 動(dòng)態(tài)讀取規(guī)則
繼承 RichSourceFunction,對(duì)應(yīng)以下實(shí)現(xiàn):
public class GetHostSourceMappingFunction extends RichSourceFunction<Set<HostSourceMappingEntity>> {
private static final Logger logger =
LoggerFactory.getLogger(GetHostSourceMappingFunction.class);
private volatile boolean isRunning = true;
private volatile String key = null;
private volatile String connector = null;
BaseConfigEntity configEntity;
// 這里的實(shí)例不可以是單例,該算子會(huì)有自己的生命周期
private JedisClusterUtil jedisClusterUtil;
public static final String [] TYPE_ARRAY = {
FieldConstants.NEWS,
FieldConstants.MEDIA
};
public GetHostSourceMappingFunction(BaseConfigEntity configEntity) {
this.configEntity = configEntity;
}
@Override
public void open(Configuration parameters) throws Exception{
connector = configEntity.getRedis().getConnector();
key = JedisClusterUtil.combineKey(configEntity.getRedis().getHostSourceMappingKey(),configEntity.getRedis());
jedisClusterUtil = new JedisClusterUtil(configEntity.getRedis());
}
@Override
public void run(SourceContext<Set<HostSourceMappingEntity>> sourceContext) throws Exception {
while (isRunning) {
Set<HostSourceMappingEntity> set = new CopyOnWriteArraySet<>();
long start = System.currentTimeMillis();
for (String type:TYPE_ARRAY) {
try {
String result = jedisClusterUtil.getValue(key + connector + type);
if(StringUtils.isNotEmpty(result)){
set.addAll(JSONArray.parseArray(result,HostSourceMappingEntity.class));
}
}catch(Exception e){
logger.error("Maybe occur NNllException ,because the type does not exist,and ignore it",e);
}
}
logger.info("select host source mapping from redis cluster, the rule size :{}, " +
"the used time(ms) is :{}",
set.size(),(System.currentTimeMillis() - start));
// 往下游發(fā)送數(shù)據(jù)集
sourceContext.collect(set);
//以固定的頻率進(jìn)行規(guī)則的動(dòng)態(tài)更新
Thread.sleep(configEntity.getRedis().getInterval());
}
}
@Override
public void cancel() {
try{
super.close();
// 進(jìn)行連接的釋放
jedisClusterUtil.close();
}catch (Exception e) {
logger.error("runException:{}", e);
}
isRunning = false;
}
}
- 主數(shù)據(jù)關(guān)聯(lián)規(guī)則數(shù)據(jù)
DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingDataStream = env.addSource(new GetHostSourceMappingFunction(configEntity)).
setParallelism(configEntity.getFlink().getRedisSourceParallelism());
// main stream connect rule stream;可以認(rèn)為是大數(shù)據(jù)流和小數(shù)據(jù)流的join操作
SingleOutputStreamOperator<JSONObject> hostSourceFormatDataStream = HostSourceMappingConnectOperator.
formatSourceConnectOperator(blackFilterDataStream, hostSourceMappingDataStream).
setParallelism(configEntity.getFlink().getMediumParallelism());
- 數(shù)據(jù)處理
- processElement 事件處理方法
- processBroadcastElement 廣播處理方法
- 廣播狀態(tài)中的事件順序可能因任務(wù)而異:盡管廣播流的數(shù)據(jù)元保證所有數(shù)據(jù)元將(最終)轉(zhuǎn)到所有下游任務(wù),但數(shù)據(jù)元可能以不同的順序到達(dá)每個(gè)任務(wù)。因此,每個(gè)傳入數(shù)據(jù)元的狀態(tài)更新不得取決于傳入事件的順序。言外之意,1和2方法沒(méi)有先后執(zhí)行之分, 在 processElement方法中對(duì)于獲取的廣播變量要做好異常值判定,因?yàn)楹芸赡躳rocessBroadcastElement中還沒(méi)有進(jìn)行初始化操作。詳情代碼如下:
public class HostSourceMappingConnectOperator {
/** logger */
private static final Logger logger = LoggerFactory.getLogger(HostSourceMappingConnectOperator.class);
private static String HOST_SOURCE_MAPPING_NAME = "host_source_mapping_name";
final static MapStateDescriptor<String, Set<HostSourceMappingEntity>> HOST_SOURCE_MAPPING;
static {
HOST_SOURCE_MAPPING = new MapStateDescriptor<>(
HOST_SOURCE_MAPPING_NAME,
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Set<HostSourceMappingEntity>>(){}));
}
public static SingleOutputStreamOperator<JSONObject> formatSourceConnectOperator(DataStream<JSONObject> jsonObjectDataStream,
DataStreamSource<Set<HostSourceMappingEntity>> hostSourceMappingStream){
SingleOutputStreamOperator<JSONObject> filterDataConnectStream = jsonObjectDataStream.connect(hostSourceMappingStream.broadcast(HOST_SOURCE_MAPPING))
.process(new BroadcastProcessFunction<JSONObject, Set<HostSourceMappingEntity>, JSONObject>() {
/**
main stream 處理
*/
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
ReadOnlyBroadcastState<String, Set<HostSourceMappingEntity>> broadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
Set<HostSourceMappingEntity> hostSourceSet = broadcastState.get(HOST_SOURCE_MAPPING_NAME);
if (!FieldConstants.WEI_BO.equals(value.getString(FieldConstants.DOC_TYPE)) &&
!FieldConstants.WEI_XIN.equals(value.getString(FieldConstants.DOC_TYPE)) &&
!CollectionUtils.isEmpty(hostSourceSet)) {
HostSourceMappingEntity targetEntity = null;
for (HostSourceMappingEntity hostSourceMappingEntity : hostSourceSet) {
if (value.getString(FieldConstants.URL).contains(hostSourceMappingEntity.getHost())
&& hostSourceMappingEntity.getHost().length() > (targetEntity == null ? 0 : targetEntity.getHost().length())) {
targetEntity = hostSourceMappingEntity;
}
}
if (targetEntity != null) {
logger.info("[target host mapping]" + targetEntity);
value.put(FieldConstants.SOURCE, targetEntity.getTargetSource());
value.put(FieldConstants.DOC_TYPE, targetEntity.getTargetDocType());
}
}else{
if(CollectionUtils.isEmpty(hostSourceSet)) {
logger.warn("[hostSource format] broadcast load lazy !!!!!!");
}
}
out.collect(value);
}
/**
處理廣播變量數(shù)據(jù)
*/
@Override
public void processBroadcastElement(Set<HostSourceMappingEntity> value, Context ctx, Collector<JSONObject> out) throws Exception {
if (CollectionUtil.isNullOrEmpty(value)) {
return;
}
BroadcastState<String, Set<HostSourceMappingEntity>> hostSourceMappingBroadcastState = ctx.getBroadcastState(HOST_SOURCE_MAPPING);
hostSourceMappingBroadcastState.put(HOST_SOURCE_MAPPING_NAME, value);
}
}).name(FLINK_OPERATOR_HOST_SOURCE_MAPPING_FROM_REDIS);
return filterDataConnectStream;
}
}
-
整體的數(shù)據(jù)流圖片段
廣播變量的應(yīng)用
參數(shù)的解析和配置文件規(guī)范化設(shè)置
- 關(guān)于配置文件,相信大家很少再去使用properties,取而代之的是YAML,其良好的層次結(jié)構(gòu),以及和Java Bean的映射,讓大家愛(ài)不釋手。舉例如下:
#kafka
kafka:
kafkaZk: "172.24.4.18:2181,172.24.4.19:2181,172.24.4.20:2181"
kafkaTopic: "mf-dev-hl"
kafkaBrokerList: "172.24.2.78:9092,172.24.2.79:9092,172.24.2.80:9092"
groupId: "mf-dev-hl"
autoOffsetReset: "earliest"
fetchMessageMaxBytes: 4194304
#redis cluster
redis:
ipPort: "172.24.4.18:7000,172.24.4.18:7001,172.24.4.18:7002,172.24.4.19:7000,172.24.4.19:7001,172.24.4.19:7002"
businessId: mf
spamWebSiteKey: spam_website
connector: _
areaCategoryMappingKey: area_mapping_dict
mediaRegionMappingKey: media_region
#es cluster
es:
clusterName: cluster_index
nodes: "192.168.x.x,192.168.x.x"
tcpPorts: "9300,9300"
httpPorts: "9200"
flink:
parallelism: 1
appName: Test1
- 關(guān)于命令行解析的使用
Flink 官方提供ParameterTool應(yīng)具有較好的實(shí)用性,在官方最佳實(shí)踐篇也具有較好的解釋;refer:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/best_practices.html
在這里我使用的是: apache.commons.cli,也具有極好的使用性和易用性;代碼片段如下:
private static Options initOptions(){
Options options = new Options();
options.addOption(Option.builder(ARGS_TYPE).hasArg(true).required(true).desc("the logistic type in [hl,zhxg,bfd]").build());
options.addOption(Option.builder(ARGS_ENV).hasArg(true).required(true).desc("the env in [dev,test,prod]").build());
return options;
}
public static CommandLine getCommandLineWithCheck(String[] args){
CommandLine cmd = null;
try {
CommandLineParser parser = new DefaultParser();
cmd = parser.parse( initOptions(), args);
} catch (ParseException e) {
logger.error("args parse exception, and invoke system exit function",e);
}finally {
if(cmd == null){
logger.error("參數(shù)解析異常,退出系統(tǒng)");
System.exit(-1);
}
if(!ARGS_TYPE_VALUES.contains(cmd.getOptionValue(ARGS_TYPE))){
logger.error("the type must in " + ARGS_TYPE_VALUES);
System.exit(-1);
}
if(!ARGS_ENV_VALUES.contains(cmd.getOptionValue(ARGS_ENV))){
logger.error("the env must in " + ARGS_ENV_VALUES);
System.exit(-1);
}
}
return cmd;
}
關(guān)于TaskManger、slot(task slot)、parallelism的認(rèn)知
- TaskManager: 獨(dú)立的JVM進(jìn)程,其并行能力由slot個(gè)數(shù)決定。配置文件中默認(rèn)如下:taskmanager.numberOfTaskSlots: 1, 每個(gè)Taskmanager由一個(gè)slot組成,即其并行度為1。該 slot獨(dú)享內(nèi)存空間,如果有多個(gè)slot,那么均分Taskmanager內(nèi)存空間。
-
slot:在flink中, slot是資源組的含義。那么他有以下幾個(gè)特性:
Flink內(nèi)部圖.png
- TaskManager 是從 JobManager 處接收需要部署的 Task,任務(wù)能配置的最大并行度由 TaskManager 上可用的 Slot 決定。
- 每個(gè)任務(wù)代表分配給任務(wù)槽的一組資源,Slot 在 Flink 里面可以認(rèn)為是資源組,F(xiàn)link 將每個(gè)任務(wù)分成子任務(wù)并且將這些子任務(wù)分配到 Slot 中,這樣就可以并行的執(zhí)行程序。
- TaskManager 的一個(gè) Slot 代表一個(gè)可用線程,該線程具有固定的內(nèi)存,注意 Slot 只對(duì)內(nèi)存隔離,沒(méi)有對(duì) CPU 隔離。
- 默認(rèn)情況下,F(xiàn)link 允許子任務(wù)共享 Slot,即使它們是不同 Task 的 subtask,只要它們來(lái)自相同的 Job,這種共享模式可以大大的提高資源利用率。
parallelism: 對(duì)應(yīng)的算子或者Task的并發(fā)執(zhí)行能力。
-
slot 和 parallelism的區(qū)別
-
Slot 是指 TaskManager 最大能并發(fā)執(zhí)行的能力
slot.png
上圖表示每個(gè) TaskManager 中含有3個(gè)slot,在 Flink on yarn 中的Per Job模式下, taskManager的個(gè)數(shù)是動(dòng)態(tài)計(jì)算出來(lái)的,依據(jù)operator的最大并行度(max(p));計(jì)算公式:ceil( max(p) / numberOfTaskSlots);
-
-
parallelism 是指 TaskManager 實(shí)際使用的并發(fā)能力
parallelism.png
在并行度為1條件下,只會(huì)使用其中一個(gè) slot。接下來(lái)以實(shí)驗(yàn)來(lái)作此說(shuō)明。
- 實(shí)驗(yàn)環(huán)境
參數(shù)說(shuō)明:-ys:每個(gè)taskManger對(duì)應(yīng)的Slot個(gè)數(shù)
-ytm: 單個(gè)TaskMananger占用的內(nèi)存大小
-yn: --yarncontainer Number of YARN container to allocate
(=Number of Task Managers);對(duì)于Per Job模式是不起作用的,為啦作區(qū)分度,下述的日志以 -yn=80
程序啟動(dòng)日志顯示:
2020-02-20 00:02:47,157 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument yn is deprecated in will be ignored.
2020-02-20 00:02:47,279 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=4096, numberTaskManagers=80, slotsPerTaskManager=5}


ok,相信這一塊已經(jīng)講的很明白啦,有啦這個(gè)基礎(chǔ),便可以進(jìn)行以下的內(nèi)容。
關(guān)于Operator Chains的理解
為了更高效地分布式執(zhí)行,F(xiàn)link會(huì)盡可能地將operator的subtask鏈接(chain)在一起形成task。每個(gè)task在一個(gè)線程中執(zhí)行。將operators鏈接成task是非常有效的優(yōu)化:它能減少線程之間的切換,減少消息的序列化/反序列化,減少數(shù)據(jù)在緩沖區(qū)的交換,減少了延遲的同時(shí)提高整體的吞吐量。
下圖便是進(jìn)行啦一次算子的合并:

那么算子合并的條件是什么哪?
- 上下游的并行度一致
- 下游節(jié)點(diǎn)的入度為1 (也就是說(shuō)下游節(jié)點(diǎn)沒(méi)有來(lái)自其他節(jié)點(diǎn)的輸入)
- 上下游節(jié)點(diǎn)都在同一個(gè) slot group 中(下面會(huì)解釋 slot group)
- 下游節(jié)點(diǎn)的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認(rèn)是ALWAYS)
- 上游節(jié)點(diǎn)的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認(rèn)是HEAD)
- 兩個(gè)節(jié)點(diǎn)間數(shù)據(jù)分區(qū)方式是 forward(參考理解數(shù)據(jù)流的分區(qū))
- 用戶沒(méi)有禁用 chain
可以認(rèn)為效果等同于將所有operator的實(shí)現(xiàn)都封裝于一個(gè)大的方法體中串行執(zhí)行,的確是提供啦極大的靈活性。一套代碼即實(shí)現(xiàn)啦算子的合并,也可實(shí)現(xiàn)算子的拆分。 這里有一篇不錯(cuò)的文章可以參照原理實(shí)現(xiàn):http://wuchong.me/blog/2016/05/09/flink-internals-understanding-execution-resources/#
BackPressure的原理
一旦提起storm、spark streaming、Flink三種常用的數(shù)據(jù)流框架,最惹人吐槽的便是storm的反壓機(jī)制,storm是通過(guò)監(jiān)控bolt的負(fù)載,暴力的在spout端進(jìn)行數(shù)據(jù)的暫停消費(fèi)。而Flink的反壓機(jī)制這是基于由下游往上游動(dòng)態(tài)傳遞反壓信息。
未完待續(xù)......



