Spring Boot整合redis實(shí)現(xiàn)隊(duì)列存儲(chǔ)微服務(wù)

本文介紹Spring Boot整合Redis實(shí)現(xiàn)隊(duì)列存儲(chǔ)。隊(duì)列存儲(chǔ)通常以Rest微服務(wù)形式提供服務(wù)接口,所以Spring Boot+Redis是一個(gè)理想選型。

典型的應(yīng)用場(chǎng)景,比如爬蟲系統(tǒng)中任務(wù)列表的存儲(chǔ),各個(gè)爬蟲子進(jìn)(線)程獨(dú)立、主動(dòng)訪問(wèn)該隊(duì)列獲取URLs,并支持批量獲取。

  • Step1:
    Spring Boot工程的Maven中添加依賴:
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-redis</artifactId>
</dependency>

本文使用SpringBoot 1.5.2.RELEASE。

  • Step2
    Application.java入口定義必要的Bean:
@SpringBootApplication(scanBasePackages = { 
        "", "" })
public class Application implements CommandLineRunner {

    @Autowired private JedisConnectionFactory jedisConnFactory;
    
    @Bean
    public StringRedisTemplate redisTemplate() {

        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(jedisConnFactory);
        return redisTemplate;
    }
    
    @Bean
    public QueueService queueService() {
        return new QueueServiceSDRImpl(redisTemplate());
    }
    
    public static void main(String[] args) throws InterruptedException {

        SpringApplication app = new SpringApplication(Application.class);
        app.setBannerMode(Banner.Mode.CONSOLE);
        app.setWebEnvironment(true);
        app.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Project: running...");
    }
}

在此只定義一個(gè)StringRedisTemplate,至于保存對(duì)象的需求可以手動(dòng)轉(zhuǎn)成json存儲(chǔ)。

  • Step3:定義QueueService接口:
public interface QueueService {

    /**
     * 取N條URL隊(duì)列數(shù)據(jù)
     * @param fullTaskName
     * @param numbersOfURL
     * @return
     */
    public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL);
    
    /**
     * URL隊(duì)列入隊(duì)
     * @param webURLList
     * @return
     */
    public Long enQueue(String fullTaskName, String... webURLJSONStringArray);
    
    /**
     * URL隊(duì)列長(zhǎng)度
     * @param fullTaskName
     * @return
     */
    public Long queueSize(String fullTaskName);
    
    /**
     * 清空URL隊(duì)列
     * @param fullTaskName
     * @return
     */
    public void queueDump(String fullTaskName);
    
    /**
     * 是否已訪問(wèn)過(guò)
     * @param fullTaskName
     * @param url
     * @return
     */
    public Boolean hasVisit(String fullTaskName, String url);
    
    /**
     * 保存鏈接對(duì)象
     * @param fullTaskName
     * @param url
     */
    public Long saveURL(String fullTaskName, String... visitedLinkArray);
    
}
  • Step4:QueueServiceSDRImpl.java的具體實(shí)現(xiàn):
public class QueueServiceSDRImpl implements QueueService {

    private StringRedisTemplate redisTemplate;

    private static String HEAD_HISTORY = "HIST:";
    private static String HEAD_QUEUE = "QUEUE:";

    public QueueServiceSDRImpl(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public List<BasicWebURL> fetchN(String fullTaskName, Long numbersOfURL) {

        List<Object> results = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection connection) throws DataAccessException {
                StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
                for (int i = 0; i < numbersOfURL; i++) {
                    stringRedisConn.lPop(HEAD_QUEUE.concat(fullTaskName));
                }
                return null; 
            }
        });
        return results.stream().filter(obj -> obj != null).map(obj -> JSONObject.parseObject(obj.toString(), BasicWebURL.class)).collect(Collectors.toList());
    }

    @Override
    public Long enQueue(String fullTaskName, String... webURLJSONStringArray) {
        Long result = -1L;
        BoundListOperations<String, String> opt = redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName));
        try {
            opt.rightPushAll(webURLJSONStringArray);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return result;
    }

    @Override
    public Long queueSize(String fullTaskName) {
        return redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName)).size();
    }

    @Override
    public void queueDump(String fullTaskName) {
        redisTemplate.boundListOps(HEAD_QUEUE.concat(fullTaskName)).expire(1, TimeUnit.MILLISECONDS);
        redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).expire(1, TimeUnit.MILLISECONDS);
    }

    @Override
    public Boolean hasVisit(String fullTaskName, String url) {
    
        Boolean hasVisit = false;
        try {
            hasVisit = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).isMember(url);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return hasVisit;
    }

    @Override
    public Long saveURL(String fullTaskName, String... visitedLinkArray) {
    
        Long result = -1L;
        try {
            result = redisTemplate.boundSetOps(HEAD_HISTORY.concat(fullTaskName)).add(visitedLinkArray);
        } catch (JedisException e) {
            e.printStackTrace();
        }
        return result;
    }

}

在QueueServiceSDRImpl中實(shí)現(xiàn)了兩種隊(duì)列(庫(kù)),庫(kù)的value分別是List和Set,特性對(duì)應(yīng)java中的List(有序)和Set(查重)各自特性。fullTaskName為Spring封裝的Redis存儲(chǔ)中的key對(duì)象。

  • Step5,最后看一下QueueController如何暴露服務(wù)接口:
@RestController
@RequestMapping()
public class QueueController {

    @Autowired QueueService queueService;
    
    /**
     * Fetch n BasicWebURLs.
     * @param request
     * @param fullTaskName
     * @param numbersOfURL
     * @return
     */
    @GetMapping("/queue/{fullTaskName}")
    public JSONObject webURL(HttpServletRequest request, 
            @PathVariable String fullTaskName, 
            @RequestParam(defaultValue="10", required=false) Long numbersOfURL) {
        
        JSONObject jo = new JSONObject();
        if(numbersOfURL > 0) {
            jo.put("popLength", numbersOfURL);
            jo.put("data", queueService.fetchN(fullTaskName, numbersOfURL));
        }else{
            jo.put("popLength", 0);
            jo.put("data", Lists.newArrayList());
        }
        jo.put("stillHas", queueService.queueSize(fullTaskName));
        return jo;
    }
    
    /**
     * 入隊(duì)
     * @param request
     * @param fullTaskName
     * @param body
     * @return
     */
    @PostMapping("/queue/{fullTaskName}")
    public Long enQueue(HttpServletRequest request, @PathVariable String fullTaskName, @RequestBody String body) {

        JSONObject jo = JSONObject.parseObject(body);
        if(jo != null){
            JSONArray webURLList = jo.getJSONArray("webURLs");
            if(!webURLList.isEmpty()) {
                String [] jsonArray = webURLList.stream().map(item -> item.toString()).toArray(String[]::new);
                return queueService.enQueue(fullTaskName, jsonArray);
            }
        }
        return -1L;
    }
    
    /**
     * 
     * @param request
     * @param fullTaskName
     * @return
     */
    @DeleteMapping("/queue/{fullTaskName}")
    public Integer queueDump(HttpServletRequest request, @PathVariable String fullTaskName) {
        queueService.queueDump(fullTaskName);
        return 1;
    }
    
}

以及在前文所述爬蟲系統(tǒng)場(chǎng)景中,用作查重的接口:

@RestController
@RequestMapping("/link")
@Getter
@Setter
public class VisitedLinkController {

    @Autowired QueueService queueService;
    
    @GetMapping("/{fullTaskName}")
    public String webURL(HttpServletRequest request, 
            @PathVariable String fullTaskName, 
            @RequestParam(defaultValue="", required = false) String link) {
        return queueService.hasVisit(fullTaskName, link) ? "y" : "n";
    }

    /**
     * 加入訪問(wèn)歷史
     * @param request
     * @param fullTaskName
     * @param body
     * @return
     */
    @PostMapping("/{fullTaskName}")
    public Boolean visitLinks(HttpServletRequest request, 
            @PathVariable String fullTaskName, @RequestBody String body) {
        
        JSONObject jo = JSONObject.parseObject(body);
        if(jo != null){
            JSONArray webURLList = jo.getJSONArray("visitedLinks");
            if(!webURLList.isEmpty()) {
                String [] jsonArray = webURLList.stream().map(item -> item.toString()).toArray(String[]::new);
                queueService.saveURL(fullTaskName, jsonArray);
            }
        }
        return true;
    }
    
}

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,525評(píng)論 19 139
  • Spring Boot 參考指南 介紹 轉(zhuǎn)載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,261評(píng)論 6 342
  • 昨晚和今晚把《小女生,職場(chǎng)修行記》看完,昨晚看到1.00。(熊貓眼)。我這人吧就是看書特死勁(或者說(shuō)特投入吧)。...
    seven小小柒閱讀 239評(píng)論 0 1
  • 兩個(gè)人若能成為好朋友,那一定是兩個(gè)心靈之間的互相認(rèn)可。 1. 小A和小B是室友,一起上學(xué)、一起吃飯、一起逛街、一起...
    一顆小香豬閱讀 436評(píng)論 0 2
  • 然后輸入@"[^"]*[\u4E00-\u9FA5]+[^"\n]*" 就可方便查找的所有字符串.
    一個(gè)開(kāi)發(fā)者_(dá)閱讀 1,556評(píng)論 1 2

友情鏈接更多精彩內(nèi)容