Disclaimer: I am still learning this material myself, so some of my understanding may be shallow or even contain mistakes. Please read critically, and leave a comment or contact me directly if you spot an error.
Week 3 outline: 1)Week 2 review; 2)Design Pattern 1: In-mapper Combining; 3)Design Pattern 2: Pairs vs Stripes; 4)Writable (definition and related Java code); 5)Design Pattern 3: Order Inversion
Keywords: In-mapper Combining; commutative; associative; Pairs; Stripes; Term Co-occurrence; serialization; Writable; Order Inversion
Overview of Previous Lecture
1)Motivation of MapReduce
2)Data Structures in MapReduce: (key, value) pairs
3)Map and Reduce Functions
4)Hadoop MapReduce Programming
4.1)Mapper
4.2)Reducer
4.3)Combiner
4.4)Partitioner
4.5)Driver
Question: what does the combiner function do, and what are the rules for using it?
1)To minimize the data transferred between map and reduce tasks
2)Combiner function is run on the map output
3)Both input and output data types must be consistent with the output of mapper (or input of reducer)
4)But Hadoop does not guarantee how many times it will call the combiner function for a particular map output record
4.1)It is just an optimization
4.2)The number of calls (even zero) must not affect the output of the Reducers
5)Applicable to problems whose reduce operation is commutative and associative (note: only when both properties hold can the reducer be reused as the combiner; see PPT p.13)
(記?。?)Combiners and reducers share same method signature因此Sometimes, reducers can serve as combiners;2)Remember: combiner are optional optimizations,所以Should not affect algorithm correctness,以及May be run 0, 1, or multiple times)
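The "run 0, 1, or multiple times" rule can be checked concretely for a sum-style reducer: because addition is commutative and associative, pre-aggregating any subset of the values before the reducer never changes the result. A minimal plain-Java sketch (the class name `CombinerSafety` is made up for illustration):

```java
import java.util.List;

// Why a SUM reducer can double as a combiner: addition is commutative
// and associative, so merging any subset of values early (0, 1, or many
// combiner passes) leaves the final reduce result unchanged.
public class CombinerSafety {
    public static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;   // order and grouping do not matter
        return sum;
    }
}
```

Feeding the reducer the raw values `[1, 1, 1, 1, 1]` or the combiner-merged values `[2, 3]` yields the same sum, which is exactly why word count tolerates any number of combiner invocations.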
Design Pattern 1: In-mapper Combining
When processing data, twice the data means twice the running time, while twice the resources can mean half the running time (think of two machines processing in parallel). But data synchronization requires communication, and communication kills performance. So we improve efficiency by aggregating locally first and only then shipping the results to the Reducers over HTTP. We already know the combiner is an optimization option that runs on the mapper node, but its data path is: mapper output (memory) -> disk -> combiner processing (memory). Because the intermediate data passes through disk, throughput is limited by I/O bandwidth and efficiency suffers. Is there an algorithm design that performs the aggregation directly in memory? Yes: in-mapper combining.
Let's use Word Count as the running example:

The pseudocode in Figure 1 feeds the mapper output directly to the reducers, so hundreds or thousands of (term t, count 1) intermediate records are shipped to the reducers. Given limited network bandwidth, this drags down MapReduce performance.

So Figure 2 instead builds an ASSOCIATIVEARRAY H, sums every occurrence of term t within one document into H{t}, and then emits (term t, count H{t}).
Can we drop the combiner here?
No. Each map call processes one document and builds a fresh ASSOCIATIVEARRAY for its terms. A mapper node may process many documents in one task (say n documents), and term t may appear in each of them, producing n partial counts for term t. The combiner is still needed to merge those n partial counts.

Since the combiner is still not efficient enough, Figure 3 uses in-mapper combining: create one global ASSOCIATIVEARRAY up front, and as documents stream in, keep summing each term t's occurrences into H{t} of that single array.
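The Figure 3 logic can be sketched in plain Java (the class and method names here are illustrative, not the Hadoop API): one map keeps state across all map calls, and the totals are emitted only once at the end.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of in-mapper combining for word count:
// a single associative array (counts) is preserved across all
// map() calls, and the aggregated pairs are emitted once at the end.
public class InMapperWordCount {
    private final Map<String, Integer> counts = new HashMap<>();

    // Called once per input document, like map().
    public void map(String document) {
        for (String term : document.split("\\s+")) {
            counts.merge(term, 1, Integer::sum);   // sum directly in memory
        }
    }

    // Called once at the end, like cleanup(): emit (term, total) pairs.
    public Map<String, Integer> close() {
        return counts;
    }
}
```

After mapping the documents "a b a" and "b c", `close()` returns one total per distinct term instead of one record per occurrence, which is exactly the traffic reduction the pattern is after.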
This lets us summarize the characteristics of in-mapper combining:
1)Fold the functionality of the combiner into the mapper by preserving state across multiple map calls
2)Advantages
2.1)Speed (far less data crosses the network; for word count, a mapper sends a reducer only one pair per distinct term)
2.2)It's faster than actual combiners (in-mapper combining happens entirely in memory, whereas a combiner must first spill the map output from memory to disk and read it back into memory to combine)
3)Disadvantages
3.1)Explicit memory management required
3.2)Potential for order-dependent bugs
Next, an averaging example explains in-mapper combining in more detail:

In Figure 4 the mapper output is fed straight to the reducer with no combiner. Could we reuse the reducer as a combiner here?
No: a reducer can serve as a combiner only when the operation is commutative and associative.
(Note: the following is a deliberately wrong example, showing that using a combiner requires the operation to be commutative and associative.)
class MAPPER
  method MAP(string t, integer r)
    EMIT(string t, integer r)
class COMBINER
  method COMBINE(string t, integers [r1, r2, ...])
    sum ← 0
    count ← 0
    for all integer r ∈ integers [r1, r2, ...] do
      sum ← sum + r
      count ← count + 1
    r_avg ← sum / count
    EMIT(string t, integer r_avg)
class REDUCER
  method REDUCE(string t, integers [r1_avg, r2_avg, ...])
    sum ← 0
    count ← 0
    for all integer r_avg ∈ integers [r1_avg, r2_avg, ...] do
      sum ← sum + r_avg
      count ← count + 1
    r_all_avg ← sum / count
    EMIT(string t, integer r_all_avg)
The problem: if each mapper node's combiner has already computed the average for string t, the reducer first receives per-mapper averages, then sums those averages and averages again, which is incorrect. For example: Mean(1, 2, 3, 4, 5) != Mean(Mean(1, 2), Mean(3, 4, 5))
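That counterexample can be checked numerically with a small sketch (class name `MeanNotAssociative` is made up for illustration): running the "average" reducer as a combiner changes the answer.

```java
// Averaging is neither associative nor safe to pre-aggregate:
// a mean of means weights the groups, not the individual values.
public class MeanNotAssociative {
    public static double mean(double... xs) {
        double sum = 0;
        for (double x : xs) sum += x;
        return sum / xs.length;
    }

    public static void main(String[] args) {
        double direct = mean(1, 2, 3, 4, 5);                   // 15 / 5 = 3.0
        double viaCombiner = mean(mean(1, 2), mean(3, 4, 5));  // mean(1.5, 4.0) = 2.75
        System.out.println(direct + " != " + viaCombiner);
    }
}
```

The direct mean is 3.0 while the combined-then-averaged value is 2.75, so the combiner would have silently corrupted the result.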

Then, can we use the approach of Figure 5?
No. After the combiner runs, the data type has changed, so the combiner's output no longer matches the reducer's input type. The MapReduce contract requires that a combiner's input and output types both be consistent with the mapper's output (i.e., the reducer's input). So Figure 5 is wrong.

Figure 6 is correct, but running a combiner is still not that efficient. Can we convert it into the more efficient in-mapper combining? Yes, see Figure 7.

Question: how do we implement an in-mapper combiner in MapReduce?
1)Lifecycle: setup -> map -> cleanup
1.1)setup(): called once at the beginning of the task
1.2)map(): do the map
1.3)cleanup(): called once at the end of the task
1.4)We do not invoke these functions; the framework calls them
2)In-mapper Combining:
2.1)Use setup() to initialize the state-preserving data structure
2.2)Use cleanup() to emit the final key-value pairs
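The lifecycle above can be mimicked in plain Java so the call order is visible (this is a sketch, not the Hadoop API: in real Hadoop code you would override `setup()`, `map()` and `cleanup()` on `org.apache.hadoop.mapreduce.Mapper`, and the framework's `run()` loop invokes them).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the mapper lifecycle: setup once, map per record, cleanup once.
public class LifecycleSketch {
    private Map<String, Integer> state;                 // initialized in setup()
    private final List<String> emitted = new ArrayList<>();

    void setup() { state = new HashMap<>(); }           // called once, first

    void map(String word) { state.merge(word, 1, Integer::sum); }

    void cleanup() {                                    // called once, last: emit totals
        state.forEach((k, v) -> emitted.add(k + ":" + v));
    }

    // Mirrors the framework's run(): our code never calls the hooks directly.
    public List<String> run(List<String> records) {
        setup();
        for (String r : records) map(r);
        cleanup();
        return emitted;
    }
}
```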

Design Pattern 2: Pairs vs Stripes
Term Co-occurrence Computation
1)Term co-occurrence matrix for a text collection
1.1)M = N x N matrix (N = vocabulary size)
1.2)Mij: number of times i and j co-occur in some context (for concreteness, let’s say context = sentence)
1.3)specific instance of a large counting problem
1.3.1)A large event space (number of terms)
1.3.2)A large number of observations (the collection itself)
1.3.3)Goal: keep track of interesting statistics about the events
2)Basic approach
2.1)Mappers generate partial counts
2.2)Reducers aggregate partial counts
Which approaches can we use to compute term co-occurrence?
1)Pairs
2)Stripes
Pairs:
1)Each mapper takes a sentence
1.1)Generate all co-occurring term pairs
1.2)For all pairs, emit (a, b) → count
2)Reducers sum up counts associated with these pairs
3)Use combiners
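The pairs mapper logic above (for each sentence, emit (a, b) → count for every co-occurring pair) can be sketched in plain Java; the class and method names are illustrative, and "context" is taken to be the sentence, as in the notes.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the "pairs" mapper: for one sentence, count every ordered
// co-occurring pair (w, u) with u != w. A real Hadoop mapper would emit
// each pair with count 1 and let combiners/reducers sum them.
public class PairsApproach {
    public static Map<String, Integer> mapSentence(String sentence) {
        String[] terms = sentence.split("\\s+");
        Map<String, Integer> pairs = new HashMap<>();
        for (String w : terms) {
            for (String u : terms) {
                if (!w.equals(u)) {
                    pairs.merge("(" + w + "," + u + ")", 1, Integer::sum);
                }
            }
        }
        return pairs;
    }
}
```

Note how quickly the output grows: an n-word sentence yields up to n·(n-1) pair records, which is the sort/shuffle burden listed under the disadvantages.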

Pairs’ Advantages
1)Easy to implement, easy to understand
Pairs’ Disadvantages
1)Lots of pairs to sort and shuffle around (upper bound?)
2)Not many opportunities for combiners to work (compared with single-word counts in word count, the same word pair recurs far less often within one mapper node, so combiners have fewer chances to help)
Stripes:


In the figure above, the MAPPER's outer loop takes each word w as the key and creates a new ASSOCIATIVEARRAY H (to store the stripe); the inner loop finds each neighbor word u of w and increments its count in H{u}. Finally it emits (Term w, Stripe H). The REDUCER sums, neighbor by neighbor, all the stripes received for key w, producing one final output of the form key: w, stripe: Hf. (For example, two mappers each send the reducer a→{b:1; d:5; e:3} and a→{b:1; c:2; d:2; f:2}; the reducer aggregates them into a→{b:2; c:2; d:7; e:3; f:2}.)
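The stripe aggregation in that example is an element-wise sum of associative arrays, which can be sketched as (class name `StripesReducer` is made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the stripes reducer: merge the stripes received for one key
// by summing counts neighbor-by-neighbor.
public class StripesReducer {
    @SafeVarargs
    public static Map<String, Integer> merge(Map<String, Integer>... stripes) {
        Map<String, Integer> out = new HashMap<>();
        for (Map<String, Integer> h : stripes) {
            // Element-wise sum: absent neighbors are inserted, present ones added.
            h.forEach((neighbor, count) -> out.merge(neighbor, count, Integer::sum));
        }
        return out;
    }
}
```

Merging {b:1, d:5, e:3} with {b:1, c:2, d:2, f:2} reproduces the {b:2, c:2, d:7, e:3, f:2} result from the text.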
Stripes’ Advantages
1)Far less sorting and shuffling of key-value pairs
2)Can make better use of combiners
Stripes’ Disadvantages
1)More difficult to implement
2)Underlying object more heavyweight
3)Fundamental limitation in terms of size of event space
Pairs vs. Stripes:
1)The pairs approach
1.1)Keep track of each term co-occurrence separately
1.2)Generates a large number of intermediate key-value pairs
1.3)The benefit from combiners is limited, as it is less likely for a mapper to process multiple occurrences of a word
2)The stripe approach
2.1)Keep track of all terms that co-occur with the same term
2.2)Generates fewer and shorter intermediate keys
2.3)The framework has less sorting to do
2.4)Greatly benefits from combiners, as the key space is the vocabulary
2.5)More efficient, but may suffer from memory problem
3)These two design patterns are broadly useful and frequently observed in a variety of applications
3.1)Text processing, data mining, and bioinformatics
Compared with Pairs, Stripes shows markedly better time efficiency on large datasets.
Question: how do we implement "Pairs" and "Stripes" in MapReduce?
First, a word on serialization: the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage.
Requirements:
1)Compact:To make efficient use of storage space
2)Fast:The overhead in reading and writing of data is minimal
3)Extensible:We can transparently read data written in an older format
4)Interoperable: We can read or write persistent data using different languages
Next, the Writable interface: a Writable is a serializable object that implements a simple, efficient serialization protocol
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
Requirements of the Writable interface:
1)All values must implement interface Writable
2)All keys must implement interface WritableComparable
3)context.write(WritableComparable, Writable)
3.1)You cannot use Java primitives here!
How Writable is used in Pairs and Stripes:
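A pair key for the Pairs approach would follow the Writable contract shown above: `write()` serializes the fields and `readFields()` restores them in the same order. The sketch below (the type `TextPair` and its helpers are hypothetical) uses only `java.io` so the round trip runs without Hadoop; a real Hadoop key would also implement `WritableComparable` by adding `compareTo()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// A Writable-style pair of strings, e.g. the key (computer, science).
public class TextPair {
    private String first;
    private String second;

    public TextPair() {}   // Writables need a no-arg constructor for deserialization
    public TextPair(String first, String second) { this.first = first; this.second = second; }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeUTF(second);
    }

    public void readFields(DataInput in) throws IOException {
        first = in.readUTF();    // must read fields in the order they were written
        second = in.readUTF();
    }

    public String getFirst() { return first; }
    public String getSecond() { return second; }

    // Demonstration helper: serialize to bytes, then deserialize a fresh copy.
    public static TextPair roundTrip(TextPair p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            p.write(new DataOutputStream(bytes));
            TextPair copy = new TextPair();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

For the Stripes approach the value type would instead serialize a whole associative array (Hadoop ships `MapWritable` for this), but the write/readFields symmetry is the same.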


Design Pattern 3: Order Inversion
Beyond word counting, we may want not only the number of times a word pair such as (computer, science) occurs, but also its relative frequency among all pairs that start with computer, i.e., (computer, *). So we first need the total count of all pairs (computer, *) (denote it N(wi, w') summed over w'), then divide the count of (computer, science) (denote it N(wi, wj)) by that total. See the figure below.

注意:1)N(·,·) is the number of times a co-occurring word pair is observed
2)The denominator is called the marginal
There are two ways to compute the relative frequencies (the computation happens in the reducer): 1)Stripes; 2)Pairs
1)Stripes: (the issue with this approach is memory, because all stripe operations must happen in memory)
In the reducer, the counts of all words that co-occur with the conditioning variable (wi) are available in the associative array. Hence, the sum of all those counts gives the marginal.
a → {b1:3, b2 :12, b3 :7, b4 :1, … }
f(b1|a) = 3 / (3 + 12 + 7 + 1 + …)
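The stripes computation above can be sketched directly: the marginal is just the sum of every count in the stripe, available in one pass over the associative array (class name `StripeFrequencies` is made up for illustration).

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of relative frequencies from one stripe: the marginal is the
// sum of all counts, so f(u|w) = H{u} / sum of H's values.
public class StripeFrequencies {
    public static Map<String, Double> relativeFrequencies(Map<String, Integer> stripe) {
        double marginal = stripe.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Double> freq = new HashMap<>();
        stripe.forEach((word, count) -> freq.put(word, count / marginal));
        return freq;
    }
}
```

For the stripe {b1:3, b2:12, b3:7, b4:1} the marginal is 23, giving f(b1|a) = 3/23, matching the formula above.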
2)Pairs:
The reducer receives the pairs directly and computes relative frequencies. But there is a problem: the denominator is only known after all (wi, *) pairs have been seen; before that, no frequency can be computed.
There are two solutions:
2.1) Fortunately, as for the mapper, also the reducer can preserve state across multiple keys. We can buffer in memory all the words that co-occur with wi and their counts. This is basically building the associative array in the stripes method. (Similar to the stripes approach; the problem is again memory.)
This works only if the pairs are ordered so that all pairs sharing the same left word wi arrive together: they are sorted first by (wi, *) and then by (wi, wj), and while scanning all the (wi, ·) pairs we see every pair for wi, at which point the denominator becomes known. (All of this happens in memory.)
There is one more problem: reducer 1 might fetch (wi, wj) while reducer 2 fetches (wi, wq), which would make the frequencies wrong (wrong denominators). So we must define our own partitioner so that a single reducer receives all the (wi, *) pairs. The partitioner hashes only the left word wi: because equal wi values produce the same hash, every (wi, *) pair is fetched by the same reducer.
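That partitioning rule can be sketched as follows (in real Hadoop this logic would live in a `Partitioner.getPartition()` override; the class name here is hypothetical):

```java
// Route a pair key (wi, wj) by the hash of the LEFT word only, so every
// (wi, *) pair lands on the same reducer partition.
public class LeftWordPartitioner {
    public static int getPartition(String leftWord, int numReducers) {
        // Mask off the sign bit so the modulus is never negative.
        return (leftWord.hashCode() & Integer.MAX_VALUE) % numReducers;
    }
}
```

Since the right word never enters the hash, (computer, science) and (computer, vision) always map to the same partition, which is exactly the guarantee the frequency computation needs.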
2.2)Send the already-aggregated marginal to the reducer first (e.g., (pair(wi, *), all_count)), then send each individual pair with its count (pair(wi, wu), count) for the frequency computation (i.e., frequency_of_(wi, wu) = count / all_count). The key here is to properly sequence the data presented to the reducers, which is exactly what the "order inversion" pattern does:
(1)The mapper:
(1.1)additionally emits a “special” key of the form (wi, ?)
(1.2)The value associated with the special key is one; it represents the word pair's contribution to the marginal
(1.3)Using combiners, these partial marginal counts will be aggregated before being sent to the reducers
(2)The reducer:
(2.1)We must make sure that the special key-value pairs are processed before any other key-value pairs where the left word is wi (define sort order)
(2.2)We also need to guarantee that all pairs associated with the same word are sent to the same reducer (use partitioner)
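With those two guarantees in place, the reducer side of order inversion reduces to a tiny amount of state. A plain-Java sketch (class name made up; the input map stands in for the already-sorted stream of key-value pairs a reducer would see):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the order-inversion reducer: the sort order puts the special
// key (wi, *) before every (wi, wj), so the reducer sees the marginal
// first, stores it (a single integer of state), and then divides each
// joint count as it streams past.
public class OrderInversionReducer {
    public static Map<String, Double> reduce(LinkedHashMap<String, Integer> sortedPairs) {
        Map<String, Double> freq = new HashMap<>();
        double marginal = 0;
        for (Map.Entry<String, Integer> e : sortedPairs.entrySet()) {
            if (e.getKey().endsWith(",*)")) {
                marginal = e.getValue();                  // special key arrives first
            } else {
                freq.put(e.getKey(), e.getValue() / marginal);
            }
        }
        return freq;
    }
}
```

Note there is no buffering of joint counts: only the marginal is held, which is why this variant has no scalability bottleneck.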

Characteristics of the second method:
(1)Memory requirements:
(1.1)Minimal, because only the marginal (an integer) needs to be stored
(1.2)No buffering of individual co-occurring words
(1.3)No scalability bottleneck
(2)Key ingredients for order inversion
(2.1)Emit a special key-value pair to capture the marginal
(2.2)Control the sort order of the intermediate key, so that the special key-value pair is processed first
(2.3)Define a custom partitioner for routing intermediate key-value pairs
Order Inversion:
(1)Common design pattern
(1.1)Computing relative frequencies requires marginal counts
(1.2)But marginal cannot be computed until you see all counts
(1.3)Buffering is a bad idea!
(1.4)Trick: getting the marginal counts to arrive at the reducer before the joint counts
(2)Optimizations
(2.1)Apply in-memory combining pattern to accumulate marginal counts