Spark數(shù)據(jù)傾斜解決方案四:使用隨機Key進行雙重聚合

在使用reduceByKey,groupByKey算子時,都是針對PairRDD進行操作,那么,我們就可以PairRDD的每個元素的Key加上一個隨機數(shù)前綴,這樣的話,之前存在的大量相同而導致數(shù)據(jù)傾斜問題的Key就會被重新打散,從而避免數(shù)據(jù)傾斜。


在進行第一輪聚合之前,先把原先的Key加上一個隨機數(shù)前綴(10以內(nèi)的就可以),然后對隨機的Key進行聚合操作,這是可以看到,之前相同的Key都會被分到一個Task中處理,現(xiàn)在的話,就會被分配到更多的Task中處理。第一輪聚合完成之后,再把每個Key的隨機前綴去掉,恢復成原始的樣子,最后進行一次全局聚合。

代碼實現(xiàn):

package cn.spark.core.common;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

/**
 * Data Skew Solution
 *
 */
public class DataSkew {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("DataSkew");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // create initial RDD
        JavaRDD<String> initRDD = sc.textFile(args[0]);

        // transform initRDD into pairRDD
        JavaPairRDD<String, Integer> pairRDD = initRDD.mapToPair(
                new PairFunction<String, String, Integer>() {

                    private static final long serialVersionUID = 2479906636617428526L;

                    @Override
                    public Tuple2<String, Integer> call(String line) throws Exception {
                        
                        String[] arr = line.split(",");
                        String key = arr[0];
                        Integer value = Integer.valueOf(arr[1]);
                        
                        return new Tuple2<String, Integer>(key, value);
                    }
                });
            
        // add random prefix from pairRDD
        JavaPairRDD<String, Integer> prePairRDD = pairRDD.mapToPair(
                new PairFunction<Tuple2<String,Integer>, String, Integer>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
                        
                        Random random = new Random();
                        int prefix = random.nextInt(10);
                        
                        String key = prefix+"_"+tuple._1;
                        
                        return new Tuple2<String, Integer>(key, tuple._2);
                    }
                });
        
        // reduceByKey 
        JavaPairRDD<String, Integer> tempPrePairRDD = prePairRDD.reduceByKey(
                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = 2021476568518204795L;

                    @Override
                    public Integer call(Integer value1, Integer value2) throws Exception {
                        
                        return value1 + value2;
                    }
                });
        
        // split Key
        JavaPairRDD<String, Integer> initPairRDD = tempPrePairRDD.mapToPair(
                new PairFunction<Tuple2<String,Integer>, String, Integer>() {

                    private static final long serialVersionUID = -178978937197684290L;

                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
                        
                        String key = tuple._1.split("_")[1];
                        
                        return new Tuple2<String, Integer>(key, tuple._2);
                    }
                }); 
        
        // reduceByKey
        JavaPairRDD<String, Integer> resultPairRDD = initPairRDD.reduceByKey(
                new Function2<Integer, Integer, Integer>() {

                    private static final long serialVersionUID = -815845668882788529L;

                    @Override
                    public Integer call(Integer value1, Integer value2) throws Exception {

                        return value1 + value2;
                    }
                });
        
        saveToMysql(resultPairRDD, args[1]);
        
        sc.close();

    }
    
    /**
     * save resultRDD to mysql
     * 
     * @param resultPairRDD
     */
    public static void saveToMysql(JavaPairRDD<String, Integer> resultPairRDD, String tableName) {
        
        // create SparkSession object
        SparkSession spark = SparkSession.builder().getOrCreate();
        
        // create RowRDD
        JavaRDD<Row> rowRDD = resultPairRDD.map(
                new Function<Tuple2<String,Integer>, Row>() {

                    private static final long serialVersionUID = 7659308133806959864L;

                    @Override
                    public Row call(Tuple2<String, Integer> tuple) throws Exception {
                        
                        return RowFactory.create(tuple._1, tuple._2);
                    }
                });
        
        // create Schema
        List<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("key", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("value", DataTypes.IntegerType, true);
        fields.add(field);
        StructType schema = DataTypes.createStructType(fields);
        
        // create DataFrame
        Dataset<Row> resultDF = spark.createDataFrame(rowRDD, schema);
        
        // save to mysql
        Properties properties = new Properties();
        properties.put("driver", "com.mysql.jdbc.Driver");
        properties.put("user", "root");
        properties.put("password", "hadoop");
        
        resultDF.write().mode("overwrite").jdbc("jdbc:mysql://localhost:3306", tableName, properties);
    }
    
}

spark-submit腳本:

spark-submit \
--class cn.spark.core.common.DataSkew \
--num-executors 1 \
--driver-memory 1000m \
--executor-memory 1000m \
--executor-cores 2 \
--driver-class-path /root/workspace/java/mysql-connector-java.jar \
--jars /root/workspace/java/mysql-connector-java.jar \
/root/workspace/java/spark-java-0.0.1-SNAPSHOT-jar-with-dependencies.jar hdfs:///temp/data/dataskew.txt retail_db.dataskew \

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 1.1、 分配更多資源 1.1.1、分配哪些資源? Executor的數(shù)量 每個Executor所能分配的CPU數(shù)...
    miss幸運閱讀 3,282評論 3 15
  • 1、 性能調(diào)優(yōu) 1.1、 分配更多資源 1.1.1、分配哪些資源? Executor的數(shù)量 每個Executor所...
    Frank_8942閱讀 4,824評論 2 36
  • 場景 數(shù)據(jù)傾斜解決方案與shuffle類性能調(diào)優(yōu) 分析 數(shù)據(jù)傾斜 有的時候,我們可能會遇到大數(shù)據(jù)計算中一個最棘手的...
    過江小卒閱讀 3,578評論 0 9
  • 本以為這輩子也就那樣了 你卻帶來了光 華服熠熠,唇角帶笑 把這個世界的牢籠撕碎 灌之以新的希望 蜷縮在角落 抬頭 ...
    云想衣裳花想容zz閱讀 437評論 0 0
  • 1 陽光暖洋洋的照射大地。春天的陽光沒有夏天的毒辣,懶洋洋的曬在身上很舒服。微風拂過,鳥兒在歌唱蝶兒在舞蹈,春天是...
    夷則五閱讀 366評論 1 3

友情鏈接更多精彩內(nèi)容