1.業(yè)務(wù)數(shù)據(jù)的采集

canal
2.Canal的使用
1.添加一個普通用戶
CREATE USER canal IDENTIFIED BY 'canal123'; 創(chuàng)建一個用戶
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 添加權(quán)限
FLUSH PRIVILEGES; 刷新
如果失敗:
修改密碼的安全權(quán)限
set global validate_password_policy=LOW;
set global validate_password_length=6;
2. 修改MySQL的配置
canal的原理是基于mysql binlog技術(shù),所以這里一定需要開啟mysql的binlog寫入功能,建議配置binlog模式為row
vi /etc/my.cnf
[mysqld]
log-bin=mysql-bin #添加這一行就ok
binlog-format=ROW #選擇row模式
server_id=1 #配置mysql replaction需要定義,不能和canal的slaveId重復(fù)
service mysqld restart 重啟
3.修改Canal的配置文件
cd /bigdata/canal/
vi canal.properties
#canal跟kafka整合,將數(shù)據(jù)發(fā)送到kafka
canal.serverMode = kafka
#指定kafka broker地址
canal.mq.servers = linux03:9092,linux04:9092,linux05:9092
#數(shù)據(jù)發(fā)送kafka失敗重試次數(shù)
canal.mq.retries = 10
修改canal的實例配置文件
vi conf/example/instance.properties
#mysql數(shù)據(jù)庫的地址
canal.instance.master.address=127.0.0.1:3306
#mysql用戶名
canal.instance.dbUsername=canal
#mysql密碼
canal.instance.dbPassword=canal123
#注釋掉使用默認的topic(將數(shù)據(jù)寫入到默認的Topic)
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#使用動態(tài)topic,將db1數(shù)據(jù)庫的tb1表發(fā)送到kafka的topic1的topic
#將hhh數(shù)據(jù)庫的user表發(fā)送到kafka的user1的topic
canal.mq.dynamicTopic=topic1:db1\\.tb1,user1:hhh\\.user
4.啟動Canal
bin/startup.sh
5.問題的處理
啟動不起來
1.刪除mysql目錄下的數(shù)據(jù)
rm -rf /var/lib/mysql
mysqld --initialize --user=mysql
3.join案例
2021-01-26 09:00:01,user1,tuan001,手機,product005,9.9,create
2021-01-26 09:00:08,user2,tuan001,手機,product005,9.9,join
2021-01-26 09:00:08,user2,tuan001,手機,product005,9.9,join
2021-01-26 09:00:08,tuan001,3,手機,product005,9.9,success
2021-01-26 09:00:10,user5,tuan002,電腦,product009,9.9,create
2021-01-26 09:00:08,user3,tuan002,電腦,product009,9.9,join
2021-01-26 09:00:08,user4,tuan002,電腦,product009,9.9,join
2021-01-26 09:00:08,tuan002,3,電腦,product009,9.9,success
2021-01-26 09:00:01,user1,tuan003,服裝,product007,9.9,create
2021-01-26 09:00:08,user2,tuan003,電腦,product009,9.9,join
2021-01-26 09:00:01,user2,tuan004,日用品,product010,9.9,create
2021-01-26 10:00:01,tuan004,日用品,product010,9.9,fail
1.從今天凌晨開始,到現(xiàn)在有多少個團,各種團的狀態(tài)
2.參與開團的人數(shù)和次數(shù)
3.通過各個分類成交的開團金額
public class FlinkUtilsV2 {
private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
String topics = parameters.getRequired("kafka.topics");
String groupId = parameters.getRequired("group.id");
return createKafkaDataStream(parameters, topics, groupId, clazz);
}
public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
String groupId = parameters.getRequired("group.id");
return createKafkaDataStream(parameters, topics, groupId, clazz);
}
public static <T> DataStream<T> createKafkaDataStream(ParameterTool parameters, String topics, String groupId, Class<? extends DeserializationSchema<T>> clazz) throws Exception {
//將ParameterTool的參數(shù)設(shè)置成全局的參數(shù)
env.getConfig().setGlobalJobParameters(parameters);
//開啟checkpoint
env.enableCheckpointing(parameters.getLong("checkpoint.interval", 10000L), CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointingMode(CheckpointConfig.DEFAULT_MODE);
//重啟策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(parameters.getInt("restart.times", 10), Time.seconds(5)));
//設(shè)置statebackend
String path = parameters.get("state.backend.path");
if(path != null) {
//最好的方式將setStateBackend配置到Flink的全局配置文件中flink-conf.yaml
env.setStateBackend(new FsStateBackend(path));
}
//設(shè)置cancel任務(wù)不用刪除checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(3);
//String topics = parameters.getRequired("kafka.topics");
List<String> topicList = Arrays.asList(topics.split(","));
Properties properties = parameters.getProperties();
properties.setProperty("group.id", groupId);
//創(chuàng)建FlinkKafkaConsumer
FlinkKafkaConsumer<T> kafkaConsumer = new FlinkKafkaConsumer<T>(
topicList,
clazz.newInstance(),
properties
);
return env.addSource(kafkaConsumer);
}
public static StreamExecutionEnvironment getEnv() {
return env;
}
/**
 * POJO mirroring a row of the "ordermain" table as delivered by canal.
 * The {@code type} field records the originating database operation
 * (INSERT or UPDATE) and is not a table column.
 */
public class OrderMain {

    private Long oid;
    private Date create_time;
    private Double total_money;
    private int status;
    private Date update_time;
    private String province;
    private String city;
    // database operation type: INSERT or UPDATE
    private String type;

    public Long getOid() { return oid; }

    public void setOid(Long oid) { this.oid = oid; }

    public Date getCreate_time() { return create_time; }

    public void setCreate_time(Date create_time) { this.create_time = create_time; }

    public Double getTotal_money() { return total_money; }

    public void setTotal_money(Double total_money) { this.total_money = total_money; }

    public int getStatus() { return status; }

    public void setStatus(int status) { this.status = status; }

    public Date getUpdate_time() { return update_time; }

    public void setUpdate_time(Date update_time) { this.update_time = update_time; }

    public String getProvince() { return province; }

    public void setProvince(String province) { this.province = province; }

    public String getCity() { return city; }

    public void setCity(String city) { this.city = city; }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    @Override
    public String toString() {
        // Builds the exact same representation as the original concatenation.
        StringBuilder sb = new StringBuilder("OrderMain{");
        sb.append("oid=").append(oid);
        sb.append(", create_time=").append(create_time);
        sb.append(", total_money=").append(total_money);
        sb.append(", status=").append(status);
        sb.append(", update_time=").append(update_time);
        sb.append(", province='").append(province).append('\'');
        sb.append(", city='").append(city).append('\'');
        sb.append(", type='").append(type).append('\'');
        return sb.append('}').toString();
    }
}
/**
 * POJO mirroring a row of the "orderdetail" table as delivered by canal.
 * The {@code type} field records the originating database operation
 * (INSERT or UPDATE) and is not a table column.
 */
public class OrderDetail {

    private Long id;
    private Long order_id;
    private int category_id;
    private String categoryName;
    private Long sku;
    private Double money;
    private int amount;
    private Date create_time;
    private Date update_time;
    // database operation type: INSERT or UPDATE
    private String type;

    public Long getId() { return id; }

    public void setId(Long id) { this.id = id; }

    public Long getOrder_id() { return order_id; }

    public void setOrder_id(Long order_id) { this.order_id = order_id; }

    public int getCategory_id() { return category_id; }

    public void setCategory_id(int category_id) { this.category_id = category_id; }

    public Long getSku() { return sku; }

    public void setSku(Long sku) { this.sku = sku; }

    public Double getMoney() { return money; }

    public void setMoney(Double money) { this.money = money; }

    public int getAmount() { return amount; }

    public void setAmount(int amount) { this.amount = amount; }

    public Date getCreate_time() { return create_time; }

    public void setCreate_time(Date create_time) { this.create_time = create_time; }

    public Date getUpdate_time() { return update_time; }

    public void setUpdate_time(Date update_time) { this.update_time = update_time; }

    public String getType() { return type; }

    public void setType(String type) { this.type = type; }

    public String getCategoryName() { return categoryName; }

    public void setCategoryName(String categoryName) { this.categoryName = categoryName; }

    @Override
    public String toString() {
        // Builds the exact same representation as the original concatenation.
        StringBuilder sb = new StringBuilder("OrderDetail{");
        sb.append("id=").append(id);
        sb.append(", order_id=").append(order_id);
        sb.append(", category_id=").append(category_id);
        sb.append(", categoryName='").append(categoryName).append('\'');
        sb.append(", sku=").append(sku);
        sb.append(", money=").append(money);
        sb.append(", amount=").append(amount);
        sb.append(", create_time=").append(create_time);
        sb.append(", update_time=").append(update_time);
        sb.append(", type='").append(type).append('\'');
        return sb.append('}').toString();
    }
}
/**
 * Event-time windowed left outer join of the order-detail stream (left side)
 * with the order-main stream (right side), both consumed from Kafka as canal
 * JSON change records. args[0] is the path to a properties file for
 * FlinkUtilsV2 (kafka.topics, group.id, checkpoint settings, ...).
 */
public class OrderJoin {

    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
        // Use EventTime as the stream time characteristic
        FlinkUtilsV2.getEnv().setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> orderMainLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g1", SimpleStringSchema.class);
        DataStream<String> orderDetailLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "orderdetail", "g1", SimpleStringSchema.class);
        // Parse canal JSON lines into OrderMain objects; only INSERT/UPDATE
        // records are kept, each "data" array element becomes one object.
        SingleOutputStreamOperator<OrderMain> orderMainDataStream = orderMainLinesDataStream.process(new ProcessFunction<String, OrderMain>() {
            @Override
            public void processElement(String line, Context ctx, Collector<OrderMain> out) throws Exception {
                // behaves like flatMap + filter in one operator
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderMain orderMain = jsonArray.getObject(i, OrderMain.class);
                            orderMain.setType(type); // record the operation type
                            out.collect(orderMain);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // NOTE(review): malformed records are silently dropped here;
                    // they should at least be logged or routed to a side output.
                }
            }
        });
        // Parse canal JSON lines into OrderDetail objects (same pattern as above)
        SingleOutputStreamOperator<OrderDetail> orderDetailDataStream = orderDetailLinesDataStream.process(new ProcessFunction<String, OrderDetail>() {
            @Override
            public void processElement(String line, Context ctx, Collector<OrderDetail> out) throws Exception {
                // behaves like flatMap + filter in one operator
                try {
                    JSONObject jsonObject = JSON.parseObject(line);
                    String type = jsonObject.getString("type");
                    if (type.equals("INSERT") || type.equals("UPDATE")) {
                        JSONArray jsonArray = jsonObject.getJSONArray("data");
                        for (int i = 0; i < jsonArray.size(); i++) {
                            OrderDetail orderDetail = jsonArray.getObject(i, OrderDetail.class);
                            orderDetail.setType(type); // record the operation type
                            out.collect(orderDetail);
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    // NOTE(review): malformed records are silently dropped here;
                    // they should at least be logged or routed to a side output.
                }
            }
        });
        int delaySeconds = 2;
        // Extract the event time and generate watermarks with a 2s
        // out-of-orderness bound, using each record's create_time.
        SingleOutputStreamOperator<OrderMain> orderMainStreamWithWaterMark = orderMainDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderMain>(Time.seconds(delaySeconds)) {
            @Override
            public long extractTimestamp(OrderMain element) {
                return element.getCreate_time().getTime();
            }
        });
        SingleOutputStreamOperator<OrderDetail> orderDetailStreamWithWaterMark = orderDetailDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderDetail>(Time.seconds(delaySeconds)) {
            @Override
            public long extractTimestamp(OrderDetail element) {
                return element.getCreate_time().getTime();
            }
        });
        // Left outer join via coGroup, with the order-detail table as the left
        // side: detail.order_id == main.oid, over 5s tumbling event-time windows.
        DataStream<Tuple2<OrderDetail, OrderMain>> joined = orderDetailStreamWithWaterMark.coGroup(orderMainStreamWithWaterMark)
                .where(new KeySelector<OrderDetail, Long>() {
                    @Override
                    public Long getKey(OrderDetail value) throws Exception {
                        return value.getOrder_id();
                    }
                })
                .equalTo(new KeySelector<OrderMain, Long>() {
                    @Override
                    public Long getKey(OrderMain value) throws Exception {
                        return value.getOid();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail, OrderMain>>() {
                    @Override
                    public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {
                        for (OrderDetail orderDetail : first) {
                            boolean isJoined = false;
                            for (OrderMain orderMain : second) {
                                out.collect(Tuple2.of(orderDetail, orderMain));
                                isJoined = true;
                            }
                            // No matching main record in this window: emit with null
                            // (left outer join semantics).
                            if (!isJoined) {
                                out.collect(Tuple2.of(orderDetail, null));
                            }
                        }
                    }
                });
        joined.print();
        FlinkUtilsV2.getEnv().execute();
    }
}
public class OrderJoinAdv {
/**
 * Advanced variant of the order join: in addition to the windowed left outer
 * join, order-detail records that arrive too late for their window are caught
 * via a side output, and any detail that found no matching main record is
 * enriched by querying MySQL directly. args[0] is a properties file path.
 */
public static void main(String[] args) throws Exception {
    ParameterTool parameters = ParameterTool.fromPropertiesFile(args[0]);
    FlinkUtilsV2.getEnv().setParallelism(1);
    // Use EventTime as the stream time characteristic
    FlinkUtilsV2.getEnv().setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<String> orderMainLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "ordermain", "g1", SimpleStringSchema.class);
    DataStream<String> orderDetailLinesDataStream = FlinkUtilsV2.createKafkaDataStream(parameters, "orderdetail", "g1", SimpleStringSchema.class);
    // Parse canal JSON lines into OrderMain objects (INSERT/UPDATE only)
    SingleOutputStreamOperator<OrderMain> orderMainDataStream = orderMainLinesDataStream.process(new ProcessFunction<String, OrderMain>() {
        @Override
        public void processElement(String line, Context ctx, Collector<OrderMain> out) throws Exception {
            // behaves like flatMap + filter in one operator
            try {
                JSONObject jsonObject = JSON.parseObject(line);
                String type = jsonObject.getString("type");
                if (type.equals("INSERT") || type.equals("UPDATE")) {
                    JSONArray jsonArray = jsonObject.getJSONArray("data");
                    for (int i = 0; i < jsonArray.size(); i++) {
                        OrderMain orderMain = jsonArray.getObject(i, OrderMain.class);
                        orderMain.setType(type); // record the operation type
                        out.collect(orderMain);
                    }
                }
            } catch (Exception e) {
                //e.printStackTrace();
                // NOTE(review): malformed records are silently dropped; log them.
            }
        }
    });
    // Parse canal JSON lines into OrderDetail objects (same pattern)
    SingleOutputStreamOperator<OrderDetail> orderDetailDataStream = orderDetailLinesDataStream.process(new ProcessFunction<String, OrderDetail>() {
        @Override
        public void processElement(String line, Context ctx, Collector<OrderDetail> out) throws Exception {
            // behaves like flatMap + filter in one operator
            try {
                JSONObject jsonObject = JSON.parseObject(line);
                String type = jsonObject.getString("type");
                if (type.equals("INSERT") || type.equals("UPDATE")) {
                    JSONArray jsonArray = jsonObject.getJSONArray("data");
                    for (int i = 0; i < jsonArray.size(); i++) {
                        OrderDetail orderDetail = jsonArray.getObject(i, OrderDetail.class);
                        orderDetail.setType(type); // record the operation type
                        out.collect(orderDetail);
                    }
                }
            } catch (Exception e) {
                //e.printStackTrace();
                // NOTE(review): malformed records are silently dropped; log them.
            }
        }
    });
    int delaySeconds = 2;
    int windowSize = 5;
    // Extract event time and generate watermarks (2s out-of-orderness bound)
    SingleOutputStreamOperator<OrderMain> orderMainStreamWithWaterMark = orderMainDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderMain>(Time.seconds(delaySeconds)) {
        @Override
        public long extractTimestamp(OrderMain element) {
            return element.getCreate_time().getTime();
        }
    });
    SingleOutputStreamOperator<OrderDetail> orderDetailStreamWithWaterMark = orderDetailDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderDetail>(Time.seconds(delaySeconds)) {
        @Override
        public long extractTimestamp(OrderDetail element) {
            return element.getCreate_time().getTime();
        }
    });
    // Output tag for order-detail records that arrive after their window closed
    OutputTag<OrderDetail> lateTag = new OutputTag<OrderDetail>("late-date") {};
    // Window the left table separately (same size as the coGroup window) so
    // its late records can be captured via the side output.
    SingleOutputStreamOperator<OrderDetail> orderDetailWithWindow = orderDetailStreamWithWaterMark.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
            .sideOutputLateData(lateTag) // tag the late records
            .apply(new AllWindowFunction<OrderDetail, OrderDetail, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<OrderDetail> values, Collector<OrderDetail> out) throws Exception {
                    // identity window function: just re-emit the elements
                    for (OrderDetail value : values) {
                        out.collect(value);
                    }
                }
            });
    // Retrieve the late records from the side output
    DataStream<OrderDetail> lateOrderDetailStream = orderDetailWithWindow.getSideOutput(lateTag);
    // Because late order-detail records are expected to be rare, async I/O is
    // not needed; a plain RichMapFunction is enough. The main record is left
    // null here and filled in by the MySQL lookup further downstream.
    SingleOutputStreamOperator<Tuple2<OrderDetail, OrderMain>> lateOrderDetailAndOrderMain = lateOrderDetailStream.map(new RichMapFunction<OrderDetail, Tuple2<OrderDetail, OrderMain>>() {
        @Override
        public Tuple2<OrderDetail, OrderMain> map(OrderDetail detail) throws Exception {
            return Tuple2.of(detail, null);
        }
    });
    // Left outer join via coGroup, with the order-detail table as the left side
    DataStream<Tuple2<OrderDetail, OrderMain>> joined = orderDetailWithWindow.coGroup(orderMainStreamWithWaterMark)
            .where(new KeySelector<OrderDetail, Long>() {
                @Override
                public Long getKey(OrderDetail value) throws Exception {
                    return value.getOrder_id();
                }
            })
            .equalTo(new KeySelector<OrderMain, Long>() {
                @Override
                public Long getKey(OrderMain value) throws Exception {
                    return value.getOid();
                }
            })
            .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
            .apply(new CoGroupFunction<OrderDetail, OrderMain, Tuple2<OrderDetail, OrderMain>>() {
                @Override
                public void coGroup(Iterable<OrderDetail> first, Iterable<OrderMain> second, Collector<Tuple2<OrderDetail, OrderMain>> out) throws Exception {
                    for (OrderDetail orderDetail : first) {
                        boolean isJoined = false;
                        for (OrderMain orderMain : second) {
                            out.collect(Tuple2.of(orderDetail, orderMain));
                            isJoined = true;
                        }
                        // no match in this window: emit with null (left outer join)
                        if (!isJoined) {
                            out.collect(Tuple2.of(orderDetail, null));
                        }
                    }
                }
            });
    // Union the joined stream with the late-record stream, then fill in any
    // missing OrderMain by querying MySQL directly.
    joined.union(lateOrderDetailAndOrderMain).map(new RichMapFunction<Tuple2<OrderDetail, OrderMain>, Tuple2<OrderDetail, OrderMain>>() {
        private transient Connection connection;
        @Override
        public void open(Configuration parameters) throws Exception {
            // One JDBC connection per task instance.
            // NOTE(review): credentials are hard-coded; move them into the
            // job properties instead.
            connection = DriverManager.getConnection("jdbc:mysql://172.16.100.100:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
        }
        @Override
        public Tuple2<OrderDetail, OrderMain> map(Tuple2<OrderDetail, OrderMain> tp) throws Exception {
            // For every record that did NOT join with the main table, look the
            // main record up in the database.
            if (tp.f1 == null) {
                tp.f1 = queryOrderMainFromMySQL(tp.f0.getOrder_id(), connection);
            }
            return tp;
        }
        @Override
        public void close() throws Exception {
            // NOTE(review): the JDBC connection is never closed here — this
            // leaks a connection per task; should call connection.close().
        }
    }).print();
    FlinkUtilsV2.getEnv().execute();
}
/**
 * Looks up the order-main row for the given order id.
 *
 * @param order_id   primary key (oid) of the ordermain row to fetch
 * @param connection open JDBC connection, owned by the caller
 * @return a populated OrderMain, or null when no row matches
 * @throws Exception if the query fails
 */
private static OrderMain queryOrderMainFromMySQL(Long order_id, Connection connection) throws Exception {
    // Fix: the original SQL was missing the FROM keyword.
    // try-with-resources closes the statement and result set (previously leaked).
    try (PreparedStatement preparedStatement =
                 connection.prepareStatement("SELECT * FROM ordermain WHERE oid = ?")) {
        // bind the query parameter
        preparedStatement.setLong(1, order_id);
        try (ResultSet resultSet = preparedStatement.executeQuery()) {
            // Fix: the cursor must be advanced with next() before reading columns;
            // the original read columns on an unpositioned ResultSet.
            if (!resultSet.next()) {
                return null; // no matching order (keeps left-outer-join semantics)
            }
            OrderMain orderMain = new OrderMain();
            orderMain.setOid(resultSet.getLong("oid"));
            orderMain.setStatus(resultSet.getInt("status"));
            // Fix: these columns were read but never set on the result object.
            orderMain.setCreate_time(resultSet.getDate("create_time"));
            orderMain.setTotal_money(resultSet.getDouble("total_money"));
            return orderMain;
        }
    }
}
}