java爬蟲多線程redis隊列(爬取國美網站的商品信息)

前面那篇爬蟲文章用的是單線程沒有用到其它一些比較提高效率的工具比較遺憾,所以今天做了一個比較全面的爬蟲。首先謝謝 @天不生我萬古長這位小伙伴的留言,不然還真有點懶了。因為上班所以也只能利用周末的時間來寫了。其實這次構思了很久。本來是想爬淘寶的商品信息,但是遇到了一個坑就是ssl的證書驗證,這里糾結了半天終于繞過去了。但是由于淘寶的限制比較嚴,ip直接被限制訪問了。我也很無語,如果同樣有小伙伴遇到了https請求的證書驗證通過不了,建議去看一下這一篇博客,感覺寫的不錯。http://blog.csdn.net/u014256984/article/details/73330573 這里主要講的就是通過java代碼獲取證書文件,然后將證書文件放入到jdk下面,具體我就不細說了。說一說今天的重點。

首先說一下我的目標頁面。國美的搜索頁
國美搜索頁面.png
搜索列表頁.png

技術點

httpClient Jsoup 這些都是爬蟲最基本的,就不老生常談了。這里我說一說用的新的技術點,以及新的技術點遇到了哪些坑。

  • redis 以及redis的隊列應用
    這里用redis主要的作用就是保存需要解析的url 以及已經解析過的url兩個隊列。這里我遇到最多的問題,就是用多線程執(zhí)行的時候出現redis鏈接重置的問題。網上查了一下也沒有一個統一的答案,我也只是根據控制臺輸出的錯誤信息感覺可能是在多線程執(zhí)行的時候,redis創(chuàng)建了多次連接。為什么會創(chuàng)建多次連接就會出現重置的問題。我的猜測就是因為redis本省是不支持windows的,只是微軟在打了補丁的情況下才支持。這可能有一點影響。這方面我也沒有去深究。我的解決方案就是創(chuàng)建一個redis的單例模式。

  • mongodb
    首先說一下為什么要用mongodb

    1. mongodb是非關系型數據庫。
    2. mongodb相對于關系型數據庫他的效率要高很多很多。
    3. mongodb存儲數據理論上是沒有上限的,當然這是理論。
    4. mongodb4.5以后是天生自帶連接池的。
  • 線程池
    在處理多線程的問題的時候,如果創(chuàng)建一個線程池管理線程。其實這里的效果是非常好的。但是好是好用,坑卻特別多,一定要注意對于有些數據進行操作的時候要進行枷鎖的操作,為了保證數據的準確性。

說了這么多也感覺有點詞窮了,還是上代碼。

  • redis的隊列創(chuàng)建
package com.xdl.redisUtil;

import redis.clients.jedis.Jedis;

/**
 * 
* @ClassName: redisqueue
* redis隊列
* @author liangchu
* @date 2018-1-6 上午11:52:44 
*
 */
public class RedisQueue {
    // 這是單例
    private static  Jedis jedis = RedisSingleton.getJedisInstance();
    
    /*public RedisQueue(){
        //連接本地的 Redis 服務    
        jedis = RedisSingleton.getJedisInstance();
        
    }*/
    
    //將未訪問的url加入到toVisit表中(使用的是尾插法)
    public static void addToVisit(String url) {
        jedis.rpush("toVisit", url);
    }

    //將未訪問的url彈出進行解析
    public static String getToVisit() {
        return jedis.lpop("toVisit");
    }

    //將已經解析過的url添加到已訪問隊列中
    public static void addVisited(String url) {
        jedis.rpush("visited", url);
    }

    //判斷待訪問url隊列是否為空
    public static boolean toVisitIsEmpty() {
        Long length = jedis.llen("toVisit");

        if (length == 0) {
            return true;
        } else {
            return false;
        }
    }
    
    
}

package com.xdl.redisUtil;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

import com.mongodb.MongoClient;

public class MultithreadCrawler {

    /**
     * @throws Exception 
     * @throws InterruptedException 
     * @Title: main
     * @Description: TODO(這里用一句話描述這個方法的作用)
     * @param @param args    參數
     * @return void 返回類型
     * @author  liangchu
     * @date 2018-1-6 下午12:19:53 
     * @throws
     */
    public static void main(String[] args) throws  Exception {
        
     //拿到種子鏈接 這里主要從這幾個方面抓取數據
        List<String> strings = new ArrayList<String>();
        strings.add("手機");
        strings.add("男裝");
        strings.add("女裝");
        strings.add("電腦");
        strings.add("相機");
        strings.add("食品");
        //將種子鏈接寫進redis數據庫的待抓取列表
        for (String url : strings) {
            RedisQueue.addToVisit("http://search.gome.com.cn/search?question="+url+"&searchType=goods&page=1");
        }
        //創(chuàng)建一個收集線程的列表
        List<Thread> threadList = new ArrayList<Thread>();
        //創(chuàng)建線程的個數
        int threadNum = 5;
        // mongodb連接
        MongoClient mongo = new MongoClient("127.0.0.1", 27017);
        RunThread run = new RunThread();
        run.setThreads(threadNum,mongo);
        //創(chuàng)建5個線程,并對其進行收集
        for (int i = 0; i < threadNum; i++) {
            Thread thread = new Thread(run);
            thread.start();
            threadList.add(thread);
        }
        //main線程需要等待所有子線程退出
        while (threadList.size() > 0) {
            Thread child = threadList.remove(0);
            child.join();
        }
    }   
}

  • run函數
package com.xdl.redisUtil;

import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.MongoClient;

public class RunThread extends Thread {
    MongoClient mongo = null;
    //線程計數器需要對所有線程可見,是共享變量
    int threads = 0;
    //redis隊列的對象,也是所有對象共享的變量
    //創(chuàng)建線程鎖
    private static Object lock = new Object();
    public void setThreads(int threads,MongoClient mongo) {
        this.threads = threads;
        this.mongo = mongo;
    }

    @SuppressWarnings("deprecation")
    public void parseToVisitUrltoRedis() throws Exception {
        //用來保存新提取出來的url列表(此變量不應是共享變量,我們把它變?yōu)槊總€線程的私有變量)
        //我們應該知道的是在Java中哪些變量在線程之間是不共享的,參考資料:
        List<String> urlList = new ArrayList<String>();
        boolean flag = true;
        while (flag) {
            //從爬蟲隊列中取出待抓取的url
            if (!RedisQueue.toVisitIsEmpty()) {
                String url = RedisQueue.getToVisit();
                /**
                 * 對此url進行解析,提取出新的url列表
                 * 解析出來的url順便就寫進urlList中了
                 *
                 * 在這個過程中不要求保證同步,每個線程都負責解析自己所屬的url,解析完成
                 * 之后將url寫入自己的urlList之中,當在解析過程中發(fā)生阻塞,則切換到其他
                 * 線程,保證程序的高并發(fā)性。
                 */
                
             // 創(chuàng)建httpclient實例  
                CloseableHttpClient httpClient = HttpClients.createDefault();  
                // 創(chuàng)建httpget實例  
                HttpGet httpGet = new HttpGet(url); 
                // 執(zhí)行http get 請求  
                CloseableHttpResponse response = null;  
                response = httpClient.execute(httpGet);  
                HttpEntity entity = response.getEntity();// 獲取返回實體  
                // EntityUtils.toString(entity,"utf-8");//獲取網頁內容,指定編碼  
                String html = EntityUtils.toString(entity, "UTF-8");  
                response.close();  
                httpClient.close();
                Document doc = Jsoup.parse(html);                               
                // 獲取產品列表信息
                Element elementP =  doc.getElementById("product-box");
                // 獲取產品列
                Elements elements = elementP.select("li[class=product-item]")
                        .select("div[class=item-tab-warp]");
               
                // 下一頁的信息就存入redis隊列當中 做下一次分析的url鏈接所用
                // 如果這個沒有數據這個線程就退出
                if(elements.size() <=0){
                    flag = false;
                    return ;
                }
                for (Element element : elements) {
                    // 獲取產品價格
                    String price = element.select("div[class=item-tab]").select("div[class=item-price-info]")
                            .select("p[class=item-price]")
                            .select("span[class=price asynPrice]").text();
                    // 獲取產品名稱 和產品鏈接
                    String producthref = element.select("p[class=item-name]")
                            .select("a[class=emcodeItem item-link]").attr("href");
                    String productTitle = element.select("p[class=item-name]")
                            .select("a[class=emcodeItem item-link]").attr("title");
                    // 評價人數
                    String productStatus = element.select("p[class=item-comment-dispatching]")
                            .select("a[class=comment]").text();
                    // 經營品牌
                    String product = element.select("p[class=item-shop]")
                            .text();
                    // 將這些信息存入mogondb中   
                    DB  db =  mongo.getDB("taobao");
                    DBCollection emp = db.getCollection("productinfo");
                    DBObject obj = new BasicDBObject();
                    obj.put("productTitle", productTitle);
                    obj.put("producthref", producthref);
                    obj.put("productStatus", productStatus);
                    obj.put("product", product);
                    obj.put("price", price);
                    emp.insert(obj);
                    // 這里我也糾結了好久要不要關,如果關了就會報錯 所以最后就沒關了如果各位有好的解決方案 記得告訴我O(∩_∩)O
                    //mongo.close(); 
                }
                // 這里是獲取它的下一頁,然后將下一頁的連接加入到redis隊列當中
               int page = Integer.parseInt(url.substring(url.lastIndexOf("=")+1))+1;
               String redisToVisit = url.substring(0, url.lastIndexOf("=")+1)+page;
               if(page >5){
                   flag = false;
                   return;
               }
                /**
                 * 在此同步塊中主要進行提取出來的url的寫操作,必須是同步操作,保證一個同
                 * 一時間只有一個線程在對Redis數據庫進行寫操作。
                 */
                synchronized(lock){
                    // 加入到redis隊列中
                    RedisQueue.addToVisit(redisToVisit);
                }
            } else {
                //在改變線程計數器的值的時候必須保證線程的同步性
                synchronized (lock) {
                    //等待線程數的計數器的計數器減1
                    threads--;
                    //如果仍然有其他線程在活動,則通知此線程進行等待
                    if (threads > 0) {
                        /*調用線程的wait方法會將此線程掛起,直到有其他線程調用notify\
                        notifyAll將此線程進行喚醒*/
                        wait();
                        threads++;
                    } else {
                        //如果其他的線程都在等待,說明待抓取隊列已空,則通知所有線程進行退出
                        notifyAll();
                        return;
                    }
                }
            }
        }
    }

    public void run() {
        //雖然run方法不能拋出異常,但是可以在run方法中進行try,catch
        try {
            parseToVisitUrltoRedis();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  • 主函數
package com.xdl.redisUtil;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;

import com.mongodb.MongoClient;

public class MultithreadCrawler {

    /**
     * @throws Exception 
     * @throws InterruptedException 
     * @Title: main
     * @Description: TODO(這里用一句話描述這個方法的作用)
     * @param @param args    參數
     * @return void 返回類型
     * @author  liangchu
     * @date 2018-1-6 下午12:19:53 
     * @throws
     */
    public static void main(String[] args) throws  Exception {
        
        //拿到種子鏈接 這里主要從 手機 服飾 電器 食品 這幾個大的方面來抓取
        List<String> strings = new ArrayList<String>();
        strings.add("手機");
        strings.add("男裝");
        strings.add("女裝");
        strings.add("電腦");
        strings.add("相機");
        strings.add("食品");
        //將種子鏈接寫進redis數據庫的待抓取列表
        for (String url : strings) {
            RedisQueue.addToVisit("http://search.gome.com.cn/search?question="+url+"&searchType=goods&page=1");
        }
        //創(chuàng)建一個收集線程的列表
        List<Thread> threadList = new ArrayList<Thread>();
        //創(chuàng)建線程的個數
        int threadNum = 1;
        MongoClient mongo = new MongoClient("127.0.0.1", 27017);
        RunThread run = new RunThread();
        run.setThreads(threadNum,mongo);
        //創(chuàng)建5個線程,并對其進行收集
        for (int i = 0; i < threadNum; i++) {
            Thread thread = new Thread(run);
            thread.start();
            threadList.add(thread);
        }
        //main線程需要等待所有子線程退出
        while (threadList.size() > 0) {
            Thread child = threadList.remove(0);
            child.join();
        }
    }   
}

商品信息列表.png

總結

不得不說加入了redis隊列和mongodb存儲數據 效率簡直要起飛了。15s不到就抓了1200條商品信息。因為有了上次的教訓不敢抓得太久,所以只抓取了1200條。如果有不怕封的小伙伴可以試試,當然后果是自負。O(∩∩)O,終于弄完了整整一天。下次加入quartz定時任務,這樣獲取股票,天氣,航班什么的都可以獲取實時的了。如果有需求的小伙伴可以留言,有時間一定完成。good night!!(*^_^*) 嘻嘻

?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容