自定義sourceAndsink

連接kafka獲取數(shù)據(jù)

//flink流處理
StreamExecutionEnvironment exe = StreamExecutionEnvironment.getExecutionEnvironment();
        //將Properties創(chuàng)建出來
        Properties properties = new Properties();
        //將配置信息set到Properties
        properties.setProperty("bootstrap.servers","node4:9092");
        properties.setProperty("group.id","1");
        //使用FlinkKafkaConsumer011(自己的kafka版本[參數(shù)](z'ji't'y))
        FlinkKafkaConsumer011<String> jk = new FlinkKafkaConsumer011<>("jk", new SimpleStringSchema(),properties);
        //
        DataStreamSource<String> stringDataStreamSource = exe.addSource(jk);

MySQLsource

                      使用RichSourceFunction自定義source
public class MySqlSource extends RichSourceFunction<Tuple4<String, String, Double, Long>> {

    Connection conn;
    Statement stat;
    boolean flag = true;


    @Override
    //open內(nèi)寫連接方法
    public void open(Configuration parameters) throws Exception {
        //類反射mysqlDriver
        Class.forName("com.mysql.jdbc.Driver");
        //mysql的連接方式用戶密碼
        conn = DriverManager.getConnection("jdbc:mysql://node4:3306/1704e", "root", "123456");
        stat = conn.createStatement();
    }

    @Override
    //run內(nèi)寫具體的邏輯
    public void run(SourceContext<Tuple4<String, String, Double, Long>> ctx) throws Exception {
        while (flag) {
            ResultSet resultSet = stat.executeQuery("select * from t_dev where dev_state = 0");
            StringBuffer sb = new StringBuffer("(");
            int count = 0;
            while (resultSet.next()){
                count ++;
                long id = resultSet.getLong(1);
                String devId = resultSet.getString(2);
                String metric = resultSet.getString(3);
                double value = resultSet.getDouble(4);
                long timestamp = resultSet.getLong(5);
                sb.append(id+",");
                ctx.collect(Tuple4.of(devId, metric, value,timestamp));
            }
            String ids = sb.toString();
            ids = ids.substring(0, ids.length() - 1);
            ids = ids + ")";
            if(count != 0){
                String updateSql = "update t_dev set dev_state = 1 where id in "+ids;
                System.out.println(updateSql);
                stat.execute(updateSql);
            }
            Thread.sleep(10000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        if(conn != null){
            conn.close();
        }
    }
}

HBaseSink

                          //繼承RichSinkFunction
public class HBaseSink extends RichSinkFunction<Tuple6<String, String, Double, Double, Double, Long>> {

    private org.apache.hadoop.conf.Configuration conf;
    private Connection conn;
    private Table table;
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");


    @Override
    //open內(nèi)寫連接方法
    public void open(Configuration parameters) throws Exception {
        //創(chuàng)建Hbase連接
        conf = HBaseConfiguration.create();
        //配置信息
        conf.set("hbase.zookeeper.quorum","node4:2181");
        System.out.println("long...........");
        conn = ConnectionFactory.createConnection(conf);
        System.out.println("ok....................");
        //獲取表
        table = conn.getTable(TableName.valueOf("ns1:t_dev_data"));
    }

    @Override
    //invoke 具有邏輯實(shí)現(xiàn)
    public void invoke(Tuple6<String, String, Double, Double, Double, Long> value, Context context) throws Exception {
        String format = sdf.format(value.f5);
        String rowKeyStr = value.f0 + value.f1 + format;

        Put put = new Put(Bytes.toBytes(rowKeyStr));
        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("max"),Bytes.toBytes(value.f2));
        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("min"),Bytes.toBytes(value.f3));
        put.addColumn(Bytes.toBytes("f1"),Bytes.toBytes("avg"),Bytes.toBytes(value.f4));

        table.put(put);

    }

    @Override
    public void close() throws Exception {
        if(table != null) {
            table.close();
        }
        if(conn != null){
            conn.close();
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容