由于需要做一個關(guān)于搜索的項目,數(shù)據(jù)來源于爬蟲爬取的文本數(shù)據(jù),那么就設(shè)計到一些業(yè)務(wù)問題的考慮。
首先是爬蟲的技術(shù)選型,考慮到海量的數(shù)據(jù),首先考慮的是Python的Scrapy框架,架構(gòu)圖如下:

原因當然是支持自動化爬取,只需要定義開始URL,以及解析數(shù)據(jù)的代碼和定義自己需要的Pipeline,就可以通過自己的Scheduler自動調(diào)度DownLoader爬取內(nèi)容并通過Pipeline輸出。
由于我使用的是Java爬蟲,所以可以使用一個開源的Java爬蟲框架WebMagic,一個類似于Scrapy的爬蟲框架,架構(gòu)圖如下:

爬蟲項目的主要要求主要有:
1.支持大規(guī)模數(shù)據(jù)量。
2.異步請求速度快
3.自動化調(diào)度
4.網(wǎng)頁內(nèi)容去重
5.不可重復爬取
6.敏感詞過濾
7.搜索性能高
關(guān)于前三個要求使用WebMagic就可以解決。
網(wǎng)頁內(nèi)容去重
關(guān)于網(wǎng)頁內(nèi)容去重,為什么要進行網(wǎng)頁內(nèi)容去重,在一些網(wǎng)站中,不同的URL可能爬取到的內(nèi)容可能是相同的,為了解決這種數(shù)據(jù)冗余,這里介紹幾種解決方案:
1.指紋碼對比。
最常見的去重方案是生成文檔的指紋碼。例如對一篇文章進行MD5加密生成一個字符串,我們可以認為這是文章的指紋碼,再和其他的文章指紋碼對比,一致則說明文章重復。但是這種方式是完全一致則是重復的,如果文章只是多了幾個標點符號,那仍舊被認為是重復的,這種方式并不合理。
2.布隆過濾器。
這種方式與url進行去重的方式一樣,使用在這里的話,也是對文章進行計算得到一個數(shù),再進行對比,缺點和方法1是一樣的,如果只有一點點不一樣,也會認為不重復,這種方式不合理。
3.KMP字符串匹配。
KMP算法是一種改進的字符串匹配算法。KMP算法的關(guān)鍵是利用匹配失敗后的信息,盡量減少模式串與主串的匹配次數(shù)以達到快速匹配的目的。能夠找到兩個文章有哪些是一樣的,哪些不一樣。這種方式能夠解決前而兩個方式的“只要一點不一樣就是不重復”的問題。但是它的時空復雜度太高了,不適合大數(shù)據(jù)量的重復比對。
還有一些其他的去重方式:最長公共子串、后綴數(shù)組、字典樹、DFA等等,但是這些方式的空間復雜度并不適合數(shù)據(jù)量較大的工業(yè)應(yīng)用場景。
4.SimHash
simhash是由Charikar 在2002年提出來的,為了便于理解盡量不使用數(shù)學公式,分為這幾步:
1、分詞,把需要判斷文本分詞形成這個文章的特征單詞。
2、hash,通過hash算法把每個詞變成hash值,比如“美國”通過hash算法計算為100101,“51區(qū)”通過hash算法計算為101011。這樣我們]的字符串就變成了一串串數(shù)字。
3、加權(quán),通過2步驟的hash生成結(jié)果,需要按照單詞的權(quán)重形成加權(quán)數(shù)字串,“美國”的hash值為“100101”,通過加權(quán)計算為“4 -4 -4 4 -4 4",“51區(qū)”計算為“5 -5 5 -5 5 5”。
4、 合并,把上而各個單詞算出來的序列值累加,變成只有一個序列串?!懊绹钡摹? -4 -4 4 -4 4",“51區(qū)”的“5 -5 5 -5 5 5”,把每一位進行累加,"4+5 -4-5 -4+5 4+-5 -4+5 4+5" ---> "9 -9 1 -1 1 9"
5、降維,把算出來的"9 -9 1 -1 1 9"變成01串,形成最終的simhash簽名。
簽名距離計算:
我們文木都轉(zhuǎn)換為simhash簽名,并轉(zhuǎn)換為long類型存儲,空間大大減少?,F(xiàn)在我們雖然解決了空間,但是如何計算兩個simhash的相似度呢?
我們通過海明距離(Hamming distance) 就可以計算出兩個simhash到底相似不相似。兩個simhash對應(yīng)二進制(01串)取值不同的數(shù)量稱為這兩個simhash的海明距離。
舉例如下:10101 和00110從第一位開始依次有第一位、第四、第五位不同,則海明距離為3。對于二進制字符串的a和b,海明距離為等于在a XOR b運算結(jié)果中1的個數(shù)(普遍算法)。
SimHash主要應(yīng)用于關(guān)于新聞內(nèi)容的去重。
5.基于flink的內(nèi)容去重。
Flink常見的去重方案:
1.基于狀態(tài)后端
2.基于HyperLogLog
3.基于布隆過濾器(BloomFilter)
4.基于BitMap
5.基于外部數(shù)據(jù)庫
1.基于狀態(tài)后端
Flink狀態(tài)后端的種類之一就是RocksDBStateBackend。他會將正在運行中的狀態(tài)數(shù)據(jù)保存在RocksDB數(shù)據(jù)庫中,該數(shù)據(jù)庫默認將數(shù)據(jù)存儲在TaskManager運行節(jié)點的數(shù)據(jù)目錄下。
public class MapStateDistinctFunction extends KeyedProcessFunction<String,Tuple2<String,Integer>,Tuple2<String,Integer>> {
private transient ValueState<Integer> counts;
@Override
public void open(Configuration parameters) throws Exception {
//我們設(shè)置 ValueState 的 TTL 的生命周期為24小時,到期自動清除狀態(tài)
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(org.apache.flink.api.common.time.Time.minutes(24 * 60)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
//設(shè)置 ValueState 的默認值
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("skuNum", Integer.class);
descriptor.enableTimeToLive(ttlConfig);
counts = getRuntimeContext().getState(descriptor);
super.open(parameters);
}
@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
String f0 = value.f0;
//如果不存在則新增
if(counts.value() == null){
counts.update(1);
}else{
//如果存在則加1
counts.update(counts.value()+1);
}
out.collect(Tuple2.of(f0, counts.value()));
}
}
邏輯:定義一個MapStateDistinctFunction類,該類繼承了KeyesProcessFunction。核心的處理邏輯在processElement中,當有一條數(shù)據(jù)經(jīng)過時,會在MapState中判斷這條數(shù)據(jù)是否已經(jīng)存在,如果不存在那么計數(shù)為1,如果存在,那么在原來的計數(shù)加1.
注意:這里定義了狀態(tài)的過期時間是24小時,在實際生產(chǎn)中大量的key會使得狀態(tài)膨脹,可以對存儲的key進行處理,比如使用加密方法把key加密成幾個字節(jié)進行在存儲。
2.基于HyperLogLog
HyperLogLog 是一種估計統(tǒng)計算法,被用來統(tǒng)計一個集合中不同數(shù)據(jù)的個數(shù),也就是我們所說的去重統(tǒng)計。HyperLogLog 算法是用于基數(shù)統(tǒng)計的算法,每個 HyperLogLog 鍵只需要花費 12 KB 內(nèi)存,就可以計算接近 2 的 64 方個不同元素的基數(shù)。HyperLogLog 適用于大數(shù)據(jù)量的統(tǒng)計,因為成本相對來說是更低的,最多也就占用 12KB 內(nèi)存。
在不需要100%精確的業(yè)務(wù)場景喜愛,可以使用這種方法進行統(tǒng)計,新增依賴:
<dependency>
<groupId>net.agkn</groupId>
<artifactId>hll</artifactId>
<version>1.6.0</version>
</dependency>
public class HyperLogLogDistinct implements AggregateFunction<Tuple2<String,Long>,HLL,Long> {
@Override
public HLL createAccumulator() {
return new HLL(14, 5);
}
@Override
public HLL add(Tuple2<String, Long> value, HLL accumulator) {
//value 為訪問記錄 <商品sku, 用戶id>
accumulator.addRaw(value.f1);
return accumulator;
}
@Override
public Long getResult(HLL accumulator) {
long cardinality = accumulator.cardinality();
return cardinality;
}
@Override
public HLL merge(HLL a, HLL b) {
a.union(b);
return a;
}
}
在上面的代碼中,addRaw 方法用于向 HyperLogLog 中插入元素。如果插入的元素非數(shù)值型的,則需要 hash 過后才能插入。accumulator.cardinality() 方法用于計算 HyperLogLog 中元素的基數(shù)。
需要注意的是,HyperLogLog 并不是精準的去重,如果業(yè)務(wù)場景追求 100% 正確,那么一定不要使用這種方法。
3.基于布隆過濾器(BloomFilter)
public class BloomFilterDistinct extends KeyedProcessFunction<Long, String, Long> {
private transient ValueState<BloomFilter> bloomState;
private transient ValueState<Long> countState;
@Override
public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
BloomFilter bloomFilter = bloomState.value();
Long skuCount = countState.value();
if(bloomFilter == null){
BloomFilter.create(Funnels.unencodedCharsFunnel(), 10000000);
}
if(skuCount == null){
skuCount = 0L;
}
if(!bloomFilter.mightContain(value)){
bloomFilter.put(value);
skuCount = skuCount + 1;
}
bloomState.update(bloomFilter);
countState.update(skuCount);
out.collect(countState.value());
}
}
使用 Guava 自帶的 BloomFilter,每當來一條數(shù)據(jù)時,就檢查 state 中的布隆過濾器中是否存在當前的 數(shù)據(jù),如果沒有則初始化,如果有則數(shù)量加 1。
4.基于BitMap
HyperLogLog 和 BloomFilter 雖然減少了存儲但是丟失了精度, 這在某些業(yè)務(wù)場景下是無法被接受的。下面的這種方法不僅可以減少存儲,而且還可以做到完全準確,那就是使用 BitMap。
Bit-Map 的基本思想是用一個 bit 位來標記某個元素對應(yīng)的 Value,而 Key 即是該元素。由于采用了 Bit 為單位來存儲數(shù)據(jù),因此可以大大節(jié)省存儲空間。
假設(shè)有這樣一個需求:在 20 億個隨機整數(shù)中找出某個數(shù) m 是否存在其中,并假設(shè) 32 位操作系統(tǒng),4G 內(nèi)存。在 Java 中,int 占 4 字節(jié),1 字節(jié) = 8 位(1 byte = 8 bit)
如果每個數(shù)字用 int 存儲,那就是 20 億個 int,因而占用的空間約為 (2000000000*4/1024/1024/1024)≈7.45G
如果按位存儲就不一樣了,20 億個數(shù)就是 20 億位,占用空間約為 (2000000000/8/1024/1024/1024)≈0.233G
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
<version>0.8.0</version>
</dependency>
public class BitMapDistinct implements AggregateFunction<Long, Roaring64NavigableMap,Long> {
@Override
public Roaring64NavigableMap createAccumulator() {
return new Roaring64NavigableMap();
}
@Override
public Roaring64NavigableMap add(Long value, Roaring64NavigableMap accumulator) {
accumulator.add(value);
return accumulator;
}
@Override
public Long getResult(Roaring64NavigableMap accumulator) {
return accumulator.getLongCardinality();
}
@Override
public Roaring64NavigableMap merge(Roaring64NavigableMap a, Roaring64NavigableMap b) {
return null;
}
}
在上述方法中,我們使用了 Roaring64NavigableMap,其是 BitMap 的一種實現(xiàn),然后我們的數(shù)據(jù)是每次被訪問的 數(shù)據(jù),把它直接添加到 Roaring64NavigableMap 中,最后通過 accumulator.getLongCardinality() 可以直接獲取結(jié)果。
5.基于外部數(shù)據(jù)庫
假如業(yè)務(wù)場景非常復雜,并且數(shù)據(jù)量很大。為了防止無限制的狀態(tài)膨脹,也不想維護龐大的 Flink 狀態(tài),可以采用外部存儲的方式,比如可以選擇使用 Redis 或者 HBase 存儲數(shù)據(jù),只需要設(shè)計好存儲的 Key 即可。同時使用外部數(shù)據(jù)庫進行存儲,我們不需要關(guān)心 Flink 任務(wù)重啟造成的狀態(tài)丟失問題,但是有可能會出現(xiàn)因為重啟恢復導致的數(shù)據(jù)多次發(fā)送,從而導致結(jié)果數(shù)據(jù)不準的問題。
爬取URL去重
為什么要進行URL去重?
我們?yōu)榱伺廊⌒阅鼙M量的高,一般采取分布式加多線程構(gòu)建爬蟲。
分布式意味著多臺機器,那么爬取的URL可能會重復,怎么解決這個問題,以下解決方案提供參考。
1.HashSet
使用java中的HashSet不能重復的特點去重。
優(yōu)點:容易理解。使用方便。
缺點:占用內(nèi)存大,性能較低
2.Redis去重
使用Redis的set進行去重。
優(yōu)點:速度快(Redis本身速度就很快),而且去重不會占用爬蟲服務(wù)器的資源,可以處理更大數(shù)據(jù)量的數(shù)據(jù)爬取。
缺點:需要準備Redis 服務(wù)器,增加開發(fā)和使用成本。
具體實現(xiàn)可以這樣構(gòu)建:定義一個queue和一個set結(jié)構(gòu),set結(jié)構(gòu)對URL進行去重,隊列存儲爬取URL的優(yōu)先級,多個機器共享所定義的Set和Queue,就可以實現(xiàn)不會對同個URL的爬取,而且對于一個大型的網(wǎng)站可以實現(xiàn)優(yōu)先爬取和后續(xù)爬取。此外使用多線程技術(shù)可以加快爬取速度。
Redis去重主流的去重方案,而且Scrapy集成Redis方便,WebMagic也是一樣。
3.布隆過濾器(BloomFilter)
-
使用布隆過濾器也可以實現(xiàn)去重。
優(yōu)點:占用的內(nèi)存要比使用HashSet要小的多,也話合大量數(shù)據(jù)的去重操作。
缺點:有誤判的可能。沒有重復可能會判定重復,但是重復數(shù)據(jù)一定會判定重復。
布隆過濾器(Bloom Filter)是由Burton Howard Bloom于1970年提出,它是一種space efficient的概率型數(shù)據(jù)結(jié)構(gòu),用于判斷一個元素是否在集合中。在垃圾郵件過濾的黑白名單方法、爬蟲(Crawler)的網(wǎng)址判重模塊中等等經(jīng)常被用到。
哈希表也能用于判斷元索是否在集合中,但是布隆過濾器只需要哈希表的1/8或1/4的空間復雜度就能完成相同的問題。布隆過濾器可以插入元素,但不可以刪除已有元素。其中的元素越多,誤報率越大,但是漏報是不可能的。
以下是一個布隆過濾器的實現(xiàn),可以參考:
public class BloomFilter {
/**
* BitSet 初始分配2^24個bit
*/
private static final int DEFAULT_SIZE = 1 << 24;
/**
* 不同哈希函數(shù)的種子,一般應(yīng)取質(zhì)數(shù)
*/
private static int[] seeds = new int[]{ 5, 7, 11, 13, 31, 37};
private final BitSet bits = new BitSet(DEFAULT_SIZE);
/**
* 哈希函數(shù)對象
*/
private SimpleHash[] func = new SimpleHash[seeds.length];
public BloomFilter() {
for (int i = 0; i < seeds.length; i++) {
func[i] = new SimpleHash(DEFAULT_SIZE, seeds[i]);
}
}
/**
* 將str標記到bits中
* @param str str
*/
public void add(String str) {
for (SimpleHash f : func) {
bits.set(f.hash(str), true);
}
}
/**
* 說明:這個求hash的算法是在網(wǎng)上隨便找的
* 判斷bits中是否已經(jīng)包含str
* @param str str
* @return boolean
*/
public boolean contains(String str) {
// null "" " "
if (StringUtils.isBlank(str)) {
return false;
}
boolean ret = true;
// 遍歷,該value對應(yīng)的值是否都在bitset
for (SimpleHash f : func) {
ret = ret && bits.get(f.hash(str));
}
return ret;
}
}
以下提供一個爬蟲模塊的實現(xiàn)架構(gòu):
zookeeper監(jiān)控爬蟲,使用Watcher節(jié)點上線與下線監(jiān)控,使用spring boot實現(xiàn)定時與節(jié)點信息郵件發(fā)送功能。

注:zookeeper節(jié)點為臨時節(jié)點。
如有更好的方案歡迎留言交流,如有錯誤歡迎指正。如果對您有用麻煩點個贊?。。?br>
參考鏈接:https://blog.csdn.net/u013411339/article/details/112756745