MapReduce任務輸出到redis中

主要包括redis連接池,以及繼承FileOutputFormat並重寫其getRecordWriter方法。

redis連接池

/**
 * Static helper that owns one {@link JedisPool} and hands out {@link Jedis}
 * connections borrowed from it.
 *
 * <p>The pool is built once, during class initialization, from the classpath
 * resource {@code /database_config.properties}. The properties read are
 * {@code redisHost}, {@code redisPort}, {@code redisPassword},
 * {@code redisMaxTotal}, {@code redisMaxIdle}, {@code redisMinIdle},
 * {@code redisMaxWaitMillis} and {@code redisTestOnBorrow}.
 *
 * <p>NOTE(review): {@code log} and {@code exit} are not declared in this class;
 * presumably they come from a logger field/annotation and a static import of
 * {@code System.exit} elsewhere in the file - confirm.
 */
public final class RedisHelper {

    /** Classpath location of the Redis configuration file. */
    private static final String CONFIG_RESOURCE = "/database_config.properties";

    /** Timeout handed to the {@link JedisPool} constructor, in milliseconds. */
    private static final int TIMEOUT_MILLIS = 2000;

    private static JedisPool jedisPool;

    static {
        init();
    }

    /**
     * Borrows a connection from the pool.
     *
     * <p>No class-level lock is taken here: the pool itself is documented as
     * thread-safe, and holding a lock while waiting for a free connection would
     * serialize every caller behind the slowest borrower.
     *
     * @return a connection that must be given back with {@link #close(Jedis)},
     *         or {@code null} if the pool was never created
     */
    public static Jedis getJedis() {
        if (jedisPool == null) {
            return null;
        }
        return jedisPool.getResource();
    }

    /**
     * Releases a Jedis connection; a {@code null} argument is ignored.
     *
     * @param jedis the connection to release, may be {@code null}
     */
    public static void close(final Jedis jedis) {
        if (jedis != null) {
            jedis.close();
        }
    }

    /**
     * Destroys the underlying pool if it was created.
     */
    public static void destroy() {
        if (jedisPool != null) {
            jedisPool.destroy();
        }
    }

    /**
     * Stores {@code value} under {@code key} using the given connection.
     *
     * @param jedis connection to use; must not be {@code null}
     * @param key   Redis key
     * @param value value to store
     */
    public static void putValue(Jedis jedis, String key, String value) {
        jedis.set(key, value);
    }

    /**
     * Loads the configuration file and creates the pool.
     *
     * <p>Any configuration problem - the resource is missing, cannot be read,
     * or a numeric property is absent or malformed - is logged and then handled
     * by {@code exit(1)}, instead of escaping the static initializer as an
     * unchecked exception.
     */
    private static void init() {
        Properties prop = new Properties();
        try (InputStream in = RedisHelper.class.getResourceAsStream(CONFIG_RESOURCE)) {
            // getResourceAsStream returns null (it does not throw) when the resource is absent.
            if (in == null) {
                throw new IOException("Classpath resource " + CONFIG_RESOURCE + " not found");
            }
            prop.load(in);

            String redisHost = prop.getProperty("redisHost");
            int redisPort = Integer.parseInt(prop.getProperty("redisPort"));
            String redisPassword = prop.getProperty("redisPassword");

            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(Integer.parseInt(prop.getProperty("redisMaxTotal")));
            config.setMaxIdle(Integer.parseInt(prop.getProperty("redisMaxIdle")));
            config.setMinIdle(Integer.parseInt(prop.getProperty("redisMinIdle")));
            config.setMaxWaitMillis(Long.parseLong(prop.getProperty("redisMaxWaitMillis")));
            config.setTestOnBorrow(Boolean.parseBoolean(prop.getProperty("redisTestOnBorrow")));

            jedisPool = new JedisPool(config, redisHost, redisPort, TIMEOUT_MILLIS, redisPassword);
        } catch (IOException | NumberFormatException e) {
            // parseInt/parseLong throw NumberFormatException for a missing (null) or malformed value.
            log.error("Failed to initialize redis parameters. ", e);
            exit(1);
        }
    }

    private RedisHelper() {
    }
}

重寫FileOutputFormat(繼承該類並重寫getRecordWriter方法)

/**
 * Output format whose record writer stores every output record in Redis
 * through {@link RedisHelper}.
 *
 * <p>NOTE(review): this class still extends {@code FileOutputFormat}, so the
 * inherited output-path checks and committer presumably still apply even
 * though no file is written by the record writer - confirm.
 *
 * @param <K> output key type; stored as {@code key.toString()}
 * @param <V> output value type; stored as {@code value.toString()}
 */
public class ResultOutputFormat<K, V> extends FileOutputFormat<K, V> {

    /**
     * Custom {@link RecordWriter}: each record produced by the reduce phase is
     * written to Redis as a string key/value pair.
     */
    protected static class RedisRecordWriter<K, V> extends RecordWriter<K, V> {

        private final Jedis jedis;

        /**
         * @param jedis connection used for all writes; released in
         *              {@link #close(TaskAttemptContext)}
         */
        public RedisRecordWriter(Jedis jedis) {
            this.jedis = jedis;
        }

        /**
         * Stores one record. Records with a {@code null} key or value are skipped.
         */
        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            if (key == null || value == null) {
                return;
            }
            RedisHelper.putValue(jedis, key.toString(), value.toString());
        }

        /**
         * Releases the Redis connection held by this writer.
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            RedisHelper.close(jedis); // release the connection
        }
    }

    /**
     * Creates a writer bound to a connection obtained from {@link RedisHelper}.
     *
     * @throws IOException if no Redis connection could be obtained; failing here
     *                     avoids a later NullPointerException on the first write
     */
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Jedis jedis = RedisHelper.getJedis();
        if (jedis == null) {
            throw new IOException("Unable to obtain a Redis connection: the Jedis pool is not initialized");
        }
        return new RedisRecordWriter<>(jedis);
    }
}

然後在job設置處寫入:
// Register ResultOutputFormat as the output format of the job (driver-side job setup).
job.setOutputFormatClass(ResultOutputFormat.class);

完美~~

©著作權歸作者所有,轉載或內容合作請聯繫作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳並發布,文章內容僅代表作者本人觀點,簡書係信息發布平臺,僅提供信息存儲服務。

友情鏈接 更多精彩內容