Spark 與數(shù)據(jù)的機器學習

機器學習可以從數(shù)據(jù)中得到有用的見解. 目標是縱觀Spark MLlib,采用合適的算法從數(shù)據(jù)集中生成見解。對于 Twitter的數(shù)據(jù)集, 采用非監(jiān)督集群算法來區(qū)分Apache
Spark-相關(guān)的tweets . 初始輸入是混合在一起的tweets. 首先提取相關(guān)特性, 然后在數(shù)據(jù)集中使用機器學習算法 , 最后評估結(jié)果和性能.

本章重點如下:

?了解 Spark MLlib 模塊及其算法,還有典型的機器學習流程 .

? 預處理 所采集的Twitter 數(shù)據(jù)集提取相關(guān)特性, 應用非監(jiān)督集群算法識別Apache Spark-
相關(guān)的tweets. 然后, 評估得到的模型和結(jié)果.

? 描述Spark 機器學習的流水線.

Spark MLlib 在應用架構(gòu)中的位置

先集中關(guān)注分析層,準確一點說是機器學習. 這是批處理和流處理數(shù)據(jù)學習的基礎(chǔ),它們只是推測的規(guī)則不同。

下圖指出了在分析層處理探索式數(shù)據(jù)分析的工具 Spark SQL和Pandas外,還有機器學習模塊.

4-1 Spark MLlib in Architect

Spark MLlib 算法分類

Spark MLlib 是一個更新很快的模塊,新的算法不斷地引入到 Spark中.

下圖提供了 Spark MLlib 算法的高層通覽,并根據(jù)傳統(tǒng)機器學習技術(shù)的體系或數(shù)據(jù)的連續(xù)性進行了分組:

4-2 Spark MLlib 算法分類

根據(jù)數(shù)據(jù)的類型,將 Spark MLlib 算法分成兩欄, 性質(zhì)無序或者數(shù)量連續(xù)的 . 我們將數(shù)據(jù)區(qū)分為無序的性質(zhì)數(shù)據(jù)和數(shù)量連續(xù)的數(shù)據(jù)。一個性質(zhì)數(shù)據(jù)的例子:在給定大氣壓,溫度,云的類型和呈現(xiàn), 天氣是晴朗,干燥,多雨,或者陰天時
時,預測她是否穿著男性上裝,這是離散的值。 另一方面, 給定了位置,住房面積,房間數(shù), 我們想預測房屋的價格 , 房地產(chǎn)可以通過線性回歸預測。
本例中,討論數(shù)量連續(xù)的數(shù)據(jù)。

水平分組反映了所使用的機器學習算法的類型。監(jiān)督和非監(jiān)督機器學習取決于訓練的數(shù)據(jù)是否標注。非監(jiān)督學習的挑戰(zhàn)是對非標注數(shù)據(jù)使用學習算法。目標是發(fā)現(xiàn)輸入中隱含的結(jié)構(gòu)。
而監(jiān)督式學習的數(shù)據(jù)是標注過的。 重點是對連續(xù)數(shù)據(jù)的回歸預測以及離散數(shù)據(jù)的分類.
機器學習的一個重要門類是推薦系統(tǒng),主要是利用了協(xié)同過濾技術(shù)。 Amazon 和 Netflix 有自己非常強大的 推薦系統(tǒng).

隨機梯度下降是適合于Spark分布計算的機器學習優(yōu)化技術(shù)之一。對于處理大量的文本,Spark提供了特性提取和轉(zhuǎn)換的重要的庫例如: TF-IDF , Word2Vec, standard scaler, 和normalizer.

監(jiān)督和非監(jiān)督式學習

深入研究一下Spark MLlib 提供的傳統(tǒng)的機器學習算法 . 監(jiān)督和非監(jiān)督機器學習取決于訓練的數(shù)據(jù)是否標注. 區(qū)別無序和連續(xù)取決于數(shù)據(jù)的離散或連續(xù).
下圖解釋了 Spark MLlib 監(jiān)督和非監(jiān)督時算法及預處理技術(shù) :


4-3 監(jiān)督和非監(jiān)督式學習

下面時Spakr MLlib中的監(jiān)督和非監(jiān)督算法以及與預處理技術(shù):

? Clustering 聚類: 一個非監(jiān)督式機器學習技術(shù),數(shù)據(jù)是沒有標注的,目的是數(shù)據(jù)中提取結(jié)構(gòu):


∞ K-Means:  從 K個不同聚類中進行數(shù)據(jù)分片


∞ Gaussian  Mixture: 基于組件的最大化后驗概率聚類  



∞ Power Iteration Clustering(PIC): 根據(jù)圖頂點的兩兩邊的相似程度聚類
 



∞ Latent  Dirichlet  Allocation  (LDA):  用于將文本文檔聚類成話題 




∞ Streaming  K-Means:  使用窗口函數(shù)將進入的數(shù)據(jù)流動態(tài)的進行K-Means聚類
  

? Dimensionality Reduction: 目的在減少特性的數(shù)量. 基本上, 用于削減數(shù)據(jù)噪音并關(guān)注關(guān)鍵特性:

∞ Singular  Value  Decomposition  (SVD):  將矩陣中的數(shù)據(jù)分割成簡單的有意義的小片,把初始矩陣生成3個矩陣.
∞ Principal  Component  Analysis  (PCA):  以子空間的低維數(shù)據(jù)逼近高維數(shù)據(jù)集..

? Regression and Classification: 當分類器將結(jié)果分成類時,回歸使用標注過的訓練數(shù)據(jù)來預測輸出結(jié)果。分類的變量是離散無序的,回歸的變量是連續(xù)而有序的
:

∞ Linear  Regression  Models  (linear  regression,  logistic  regression,
and  support  vector  machines): 線性回歸算法可以表達為凸優(yōu)化問題,目的是基于有權(quán)重變量的向量使目標函數(shù)最小化.  目標函數(shù)通過函數(shù)的正則部分控制函數(shù)的復雜性,通過函數(shù)的損失部分控制函數(shù)的誤差.

∞ Naive  Bayes: 基于給定標簽的條件概率來預測,基本假設(shè)是變量的相互獨立性。

∞ Decision  Trees:  它執(zhí)行了特性空間的遞歸二元劃分,為了最好的劃分分割需要將樹狀節(jié)點的信息增益最大化。 


∞ Ensembles  of  trees  (Random  Forests  and  Gradient-Boosted  Trees):
樹集成算法結(jié)合了多種決策樹模型來構(gòu)建一個高性能的模型,它們對分類和回歸是非常直觀和成功的。
  

? Isotonic Regression保序回歸: 最小化所給數(shù)據(jù)和可觀測響應的均方根誤差.

其他學習算法

Spark MLlib 還提供了很多其它的算法,廣泛地說有三種其它的機器學習算法:
推薦系統(tǒng), 優(yōu)化算法, 和特征提取.

4-4 Spark MLlib 中的其他算法

下面當前是 MLlib 中的其它算法:

? Collaborative filtering: 是推薦系統(tǒng)的基礎(chǔ),創(chuàng)建用戶-物品關(guān)聯(lián)矩陣,目標是填充差異?;谄渌脩襞c物品的關(guān)聯(lián)評分,為沒有評分的目標用戶推薦物品。在分布式計算中, ALS (short for Alternating Least Square)是最成功的算法之一:


∞ Alternating  Least  Squares交替最小二乘法:  矩陣分解技術(shù)采用了隱性反饋,時間效應和置信水平,把大規(guī)模的用戶物品矩陣分解成低維的用戶物品因子,通過替換因子最小化二次損失函數(shù)。

? Feature extraction and transformation: 這些是大規(guī)模文檔處理的基礎(chǔ),包括如下技術(shù):

∞ Term  Frequency:  搜素引擎使用Search  engines  use  TF-IDF 對語料庫中的文檔進行等級評分。機器學習中用它來判斷一個詞在文檔或語料庫中的重要性.  詞頻統(tǒng)計所確定的權(quán)重取決于它在語料庫中出現(xiàn)的頻率。 詞頻本身可能存在錯誤導向比如過分強調(diào)了類似 the,  of,  or  and  等這樣無意義的信息詞匯.  逆文檔頻率提供了特異性或者信息量的度量,詞匯在語料庫中所有文檔是否常見。  
∞ Word2Vec:  包括了兩個模型 Skip-Gram 和Continuous
 Bag  of  Word.  Skip-Gram  預測了一個給定詞匯的鄰近詞,  基于詞匯的滑動窗口; Continuous  Bag  of
Words 預測了所給定鄰近詞匯的當前詞是什么.  

∞ Standard  Scaler:  作為預處理的一部分,數(shù)據(jù)集經(jīng)常通過平均去除和方差縮放來標準化.  我們在訓練數(shù)據(jù)上計算平均和標準偏差,并應用同樣的變形來測試數(shù)據(jù).  


∞ Normalizer:  在縮放樣本時需要規(guī)范化. 對于點積或核心方法這樣的二次型非常有用。  

∞ Feature  selection: 通過選擇模型中最相關(guān)的特征來降低向量空間的維度。  


∞ Chi-Square  Selector:  一個統(tǒng)計方法來測量兩個事件的獨立性。

? Optimization: 這些 Spark MLlib 優(yōu)化算法聚焦在各種梯度下降的技術(shù)。 Spark 在分布式集群上提供了非常有效的梯度下降的實現(xiàn),通過本地極小的迭代完成梯度的快速下降。迭代所有的數(shù)據(jù)變量是計算密集型的:

∞ Stochastic  Gradient  Descent:  最小化目標函數(shù)即
微分函數(shù)的和.  隨機梯度下降僅使用了一個訓練數(shù)據(jù)的抽樣來更新一個特殊迭代的參數(shù),用來解決大規(guī)模稀疏機器學習問題例如文本分類.
  

? Limited-memory BFGS (L-BFGS): 文如其名,L-BFGS使用了有限內(nèi)存,適合于Spark MLlib 實現(xiàn)的分布式優(yōu)化算法。

Spark MLlib data types

MLlib 支持4種數(shù)據(jù)類型: local vector, labeled point, local matrix,
and distributed matrix. Spark MLlib 算法廣泛地使用了這些數(shù)據(jù)類型:

? Local vector: 位于單機,可以是緊密或稀疏的:



∞ Dense  vector 是傳統(tǒng)的 doubles 數(shù)組.  例如[5.0,  0.0,  1.0,  7.0].
  


∞ Sparse  vector  使用整數(shù)和double值.  稀疏向量  [5.0,  0.0,  1.0,  7.0]  應該是 (4,
[0,  2,  3],  [5.0,  1.0,  7.0]),表明了向量的維數(shù).

這是 PySpark中一個使用本地向量的例子:

import  numpy  as  np
import  scipy.sparse  as  sps
from  pyspark.mllib.linalg  import  Vectors

#  NumPy  array  for  dense  vector.
dvect1  =  np.array([5.0,  0.0,  1.0,  7.0])
#  Python  list  for  dense  vector.
dvect2  =  [5.0,  0.0,  1.0,  7.0]
#  SparseVector  creation
svect1  =  Vectors.sparse(4,  [0,  2,  3],  [5.0,  1.0,  7.0])
#  Sparse  vector  using  a  single-column  SciPy  csc_matrix
svect2  =  sps.csc_matrix((np.array([5.0,  1.0,  7.0]),  np.array([0,
2,  3])),  shape  =  (4,  1))

? Labeled point. 一個標注點是監(jiān)督式學習中的一個有標簽的緊密或稀疏向量. 在二元標簽中, 0.0 代表負值, 1.0 代表正值.

這有一個PySpark中標注點的例子:

 from  pyspark.mllib.linalg  import  SparseVector
from  pyspark.mllib.regression  import  LabeledPoint

#  Labeled  point  with  a  positive  label  and  a  dense  feature  vector.
lp_pos  =  LabeledPoint(1.0,  [5.0,  0.0,  1.0,  7.0])

#  Labeled  point  with  a  negative  label  and  a  sparse  feature  vector.
lp_neg  =  LabeledPoint(0.0,  SparseVector(4,  [0,  2,  3],  [5.0,  1.0,
7.0]))

? Local Matrix: 本地矩陣位于單機上,擁有整型索引和double的值.
這是一個 PySpark中本地矩陣的例子:

from  pyspark.mllib.linalg  import  Matrix,  Matrices

#  Dense  matrix  ((1.0,  2.0,  3.0),  (4.0,  5.0,  6.0))
dMatrix  =  Matrices.dense(2,  3,  [1,  2,  3,  4,  5,  6])

#  Sparse  matrix  ((9.0,  0.0),  (0.0,  8.0),  (0.0,  6.0))
sMatrix  =  Matrices.sparse(3,  2,  [0,  1,  3],  [0,  2,  1],  [9,  6,  8])

? Distributed Matrix: 充分利用 RDD的成熟性,分布式矩陣可以在集群中共享
. 有4種分布式矩陣類型: RowMatrix, IndexedRowMatrix,
CoordinateMatrix, and BlockMatrix:


∞ RowMatrix:  使用多個向量的一個  RDD,創(chuàng)建無意義索引的行分布式矩陣叫做  RowMatrix.  


∞ IndexedRowMatrix:  行索引是有意義的. 首先使用IndexedRow 類創(chuàng)建一個 帶索引行的RDDFirst,
再創(chuàng)建一個 IndexedRowMatrix.  



∞ CoordinateMatrix:  對于表達巨大而稀疏的矩陣非常有用。CoordinateMatrix 從MatrixEntry的RDD創(chuàng)建,用類型元組   (long,  long,  or float)來表示。  




∞ BlockMatrix:  從 子矩陣塊的RDDs創(chuàng)建,
子矩陣塊形如 ((blockRowIndex,  blockColIndex),
sub-matrix).

機器學習的工作流和數(shù)據(jù)流

除了算法, 機器學習還需要處理過程,我們將討論監(jiān)督和非監(jiān)督學習的典型流程和數(shù)據(jù)流.

監(jiān)督式學習工作流程

在監(jiān)督式學習中, 輸入的訓練數(shù)據(jù)集是標注過的. 一個重要的話數(shù)據(jù)實踐是分割輸入來訓練和測試, 以及嚴重相應的模式.完成監(jiān)督式學習有6個步驟:

? Collect the data: 這個步驟依賴于前面的章節(jié),保證數(shù)據(jù)正確的容量和顆粒度,使機器學習算法能夠提供可靠的答案.

? Preprocess the data: 通過抽樣檢查數(shù)據(jù)質(zhì)量,添補遺漏的數(shù)據(jù)值,縮放和規(guī)范化數(shù)據(jù)。同時,定義特征提取處理。典型地,在大文本數(shù)據(jù)集中,分詞,移除停詞,詞干提取 和 TF-IDF. 在監(jiān)督式學習中,分別將數(shù)據(jù)放入訓練和測試集。我門也實現(xiàn)了抽樣的各種策略, 為交叉檢驗分割數(shù)據(jù)集。

? Ready the data: 準備格式化的數(shù)據(jù)和算法所需的數(shù)據(jù)類型。在 Spark MLlib中, 包括 local vector, dense or sparse vectors, labeled points, local matrix, distributed matrix with row matrix, indexed row matrix, coordinate matrix, and block matrix.

? Model: 針對問題使用算法以及獲得最適合算法的評估結(jié)果,可能有多種算法適合同一問題; 它們的性能存儲在評估步驟中以便選擇性能最好的一個。 我門可以實現(xiàn)一個綜合方案或者模型組合來得到最好的結(jié)果。

? Optimize: 為了一些算法的參數(shù)優(yōu)化,需要運行網(wǎng)格搜索。這些9參數(shù)取決于訓練,測試和產(chǎn)品調(diào)優(yōu)的階段。
? Evaluate: 最終給模型打分,并綜合準確率,性能,可靠性,伸縮性 選擇最好的一個模型。 用性能最好的模型來測試數(shù)據(jù)來探明模型預測的準確性。一旦對調(diào)優(yōu)模型滿意,就可以到生產(chǎn)環(huán)境處理真正的數(shù)據(jù)了 .

監(jiān)督式機器學習的工作流程和數(shù)據(jù)流如下圖所示:

4-5 監(jiān)督式學習工作流程

非監(jiān)督式學習工作流程

與監(jiān)督式學習相對,非監(jiān)督式學習的初始數(shù)據(jù)使沒有標注的,這是真實生活的情形。通過聚類或降維算法提取數(shù)據(jù)中的結(jié)構(gòu), 在非監(jiān)督式學習中,不用分割數(shù)據(jù)到訓練和測試中,因為數(shù)據(jù)沒有標注,我們不能做任何預測。訓練數(shù)據(jù)的6個步驟與監(jiān)督式學習中的那些步驟相似。一旦模型訓練過了,將評估結(jié)果和調(diào)優(yōu)模型,然后發(fā)布到生產(chǎn)環(huán)境。

非監(jiān)督式學習是監(jiān)督式學習的初始步驟。 也即是說, 數(shù)據(jù)降維先于進入學習階段。

非監(jiān)督式機器學習的工作流程和數(shù)據(jù)流表達如下:


4-6 非監(jiān)督式學習工作流程

Twitter 數(shù)據(jù)集聚類

感受一下從Twitter提取到的數(shù)據(jù),理解數(shù)據(jù)結(jié)構(gòu),然后運行 K-Means 聚類算法 . 使用非監(jiān)督式的處理和數(shù)據(jù)流程,步驟如下:

  1. 組合所有的 tweet文件成一個 dataframe.

解析 tweets, 移除停詞,提取表情符號,提取URL, 并最終規(guī)范化詞 (如,轉(zhuǎn)化為小寫,移除標點符號和數(shù)字).

  1. 特征提取包括以下步驟:

∞ Tokenization:  將tweet的文本解析成單個的單詞或tokens  


∞ TF-IDF:  應用 TF-IDF  算法從文本分詞中創(chuàng)建特征向量  



∞ Hash  TF-IDF:  應用哈希函數(shù)的TF-IDF

4.運行 K-Means 聚類算法.

5.評估 K-Means聚類的結(jié)果:

∞ 界定 tweet 的成員關(guān)系 和聚類結(jié)果  

∞ 通過多維縮放和PCA算法執(zhí)行降維分析到兩維
  

∞ 繪制聚類
  

6.流水線:

∞ 調(diào)優(yōu)相關(guān)聚類K值數(shù)目

∞ 測量模型成本

∞ 選擇優(yōu)化的模型

在 Twitter數(shù)據(jù)集上應用Scikit-Learn

Python 有自己的 Scikit-Learn 機器學習庫,是最可靠直觀和健壯的工具之一 。 使用Pandas 和 Scikit-Learn運行預處理和非監(jiān)督式學習。 在用Spark MLlib 完成聚類之前,使用Scikit-Learn來探索數(shù)據(jù)的抽樣是非常有益的。我們混合了
7,540 tweets, 它包含了與Apache Spark,Python相關(guān)的tweets, 即將到來的總統(tǒng)選舉: 希拉里克林頓 和 唐納德 ,一些時尚相關(guān)的 tweets , Lady Gaga 和Justin Bieber的音樂. 在Twitter 數(shù)據(jù)集上使用
Scikit-Learn 并運行K-Means 聚類算法。

先將樣本數(shù)據(jù)加載到 一個 Pandas dataframe:

import  pandas  as  pd
 
csv_in  =  'C:\\Users\\Amit\\Documents\\IPython  Notebooks\\AN00_Data\\
unq_tweetstxt.csv'
  
twts_df01  =  pd.read_csv(csv_in,  sep  =';',  encoding='utf-8')

In  [24]:
csv(csv_in,  sep  =';',  encoding='utf-8')
In  [24]:

twts_df01.count()
Out[24]:
Unnamed:  0 7540
id  7540
created_at  7540
user_id 7540
user_name   7538
tweet_text  7540
dtype:  int64

#
#  Introspecting  the  tweets  text
#
In  [82]:

twtstxt_ls01[6910:6920]
Out[82]:
['RT  @deroach_Ismoke:  I  am  NOT  voting  for  #hilaryclinton  http://t.co/jaZZpcHkkJ',
'RT  @AnimalRightsJen:  #HilaryClinton  What  do  Bernie  Sanders  and Donald  Trump  Have  in  Common?:  He  has  so  far  been  th...  http://t.co/t2YRcGCh6…',
'I  understand  why  Bill  was  out  banging  other  chicks.......      I mean
look  at  what  he  is  married  to.....\n@HilaryClinton',
'#HilaryClinton  What  do  Bernie  Sanders  and  Donald  Trump  Have  in Common?:  He  has  so  far  been  th...  http://t.co/t2YRcGCh67  #Tcot #UniteBlue']

先從Tweets 的文本中做一個特征提取,使用一個有10000特征和英文停詞的TF-IDF 矢量器將數(shù)據(jù)集向量化:

In  [37]:

print("Extracting  features  from  the  training  dataset  using  a  sparse vectorizer")
t0  =  time()
Extracting  features  from  the  training  dataset  using  a  sparse
vectorizer
In  [38]:

vectorizer  =  TfidfVectorizer(max_df=0.5,  max_features=10000,
min_df=2,  stop_words='english',use_idf=True)
X  =  vectorizer.fit_transform(twtstxt_ls01)
#
#  Output  of  the  TFIDF  Feature  vectorizer
#
print("done  in  %fs"  %  (time()  -  t0))
print("n_samples:  %d,  n_features:  %d"  %  X.shape)
print()
done  in  5.232165s
n_samples:  7540,  n_features:  6638

數(shù)據(jù)集被分成擁有6638特征的7540個抽樣, 形成稀疏矩陣給 K-Means 聚類算法 ,初始選擇7個聚類和最多100次迭代:

In  [47]:

km  =  KMeans(n_clusters=7,  init='k-means++',  max_iter=100,  n_init=1,
verbose=1)

print("Clustering  sparse  data  with  %s"  %  km)
t0  =  time()
km.fit(X)
print("done  in  %0.3fs"  %  (time()  -  t0))

Clustering  sparse  data  with  KMeans(copy_x=True,  init='k-means++',  max_iter=100,  n_clusters=7,  n_init=1,
n_jobs=1,  precompute_distances='auto',  random_state=None,
tol=0.0001,verbose=1)
Initialization  complete
Iteration   0,  inertia  13635.141
Iteration   1,  inertia  6943.485
Iteration   2,  inertia  6924.093
Iteration   3,  inertia  6915.004
Iteration   4,  inertia  6909.212
Iteration   5,  inertia  6903.848
Iteration   6,  inertia  6888.606
Iteration   7,  inertia  6863.226
Iteration   8,  inertia  6860.026
Iteration   9,  inertia  6859.338
Iteration  10,  inertia  6859.213
Iteration  11,  inertia  6859.102
Iteration  12,  inertia  6859.080
Iteration  13,  inertia  6859.060
Iteration  14,  inertia  6859.047
Iteration  15,  inertia  6859.039
Iteration  16,  inertia  6859.032
Iteration  17,  inertia  6859.031
Iteration  18,  inertia  6859.029
Converged  at  iteration  18
done  in  1.701s

在18次迭代后 K-Means聚類算法收斂,根據(jù)相應的關(guān)鍵詞看一下7個聚類的結(jié)果 . Clusters 0 和6 是關(guān)于音樂和時尚的 Justin Bieber 和Lady Gaga 相關(guān)的tweets.
Clusters 1 和5 是與美國總統(tǒng)大選 Donald Trump和 Hilary Clinton相關(guān)的tweets. Clusters 2 和3 是我們感興趣的Apache Spark 和Python. Cluster 4 包含了 Thailand相關(guān)的 tweets:

#
#  Introspect  top  terms  per  cluster
#

In  [49]:

print("Top  terms  per  cluster:")
order_centroids  =  km.cluster_centers_.argsort()[:,  ::-1]
terms  =  vectorizer.get_feature_names()
for  i  in  range(7):
    print("Cluster  %d:"  %  i,  end='')
    for  ind  in  order_centroids[i,  :20]:
        print('  %s'  %  terms[ind],  end='')
        print()
Top  terms  per  cluster:
Cluster  0:  justinbieber  love  mean  rt  follow  thank  hi https
whatdoyoumean  video  wanna  hear  whatdoyoumeanviral  rorykramer  happy  lol making  person  dream  justin
Cluster  1:  donaldtrump  hilaryclinton  rt  https  trump2016
realdonaldtrump  trump  gop  amp  justinbieber  president  clinton  emails oy8ltkstze  tcot  like  berniesanders  hilary  people  email
Cluster  2:  bigdata  apachespark  hadoop  analytics  rt  spark  training chennai  ibm  datascience  apache  processing  cloudera  mapreduce  data  sap https  vora  transforming  development
Cluster  3:  apachespark  python  https  rt  spark  data  amp  databricks  using new  learn  hadoop  ibm  big  apache  continuumio  bluemix  learning  join  open
Cluster  4:  ernestsgantt  simbata3  jdhm2015  elsahel12  phuketdailynews dreamintentions  beyhiveinfrance  almtorta18  civipartnership  9_a_6
25whu72ep0  k7erhvu7wn  fdmxxxcm3h  osxuh2fxnt  5o5rmb0xhp  jnbgkqn0dj ovap57ujdh  dtzsz3lb6x  sunnysai12345  sdcvulih6g
Cluster  5:  trump  donald  donaldtrump  starbucks  trumpquote
trumpforpresident  oy8ltkstze  https  zfns7pxysx  silly  goy  stump trump2016  news  jeremy  coffee  corbyn  ok7vc8aetz  rt  tonight
Cluster  6:  ladygaga  gaga  lady  rt  https  love  follow  horror  cd  story ahshotel  american  japan  hotel  human  trafficking  music  fashion  diet queen  ahs

我們將通過畫圖來可視化結(jié)果。由于我們有6638個特征的7540個抽樣,很難多維可視化,所以通過MDS算法來降維描繪 :

import  matplotlib.pyplot  as  plt
import  matplotlib  as  mpl
from  sklearn.manifold  import  MDS

MDS()

#
#  Bring  down  the  MDS  to  two  dimensions  (components)  as  we  will  plot
#  the  clusters
#
mds  =  MDS(n_components=2,  dissimilarity="precomputed",  random_state=1)

pos  =  mds.fit_transform(dist) #  shape  (n_components,  n_samples)

xs,  ys  =  pos[:,  0],  pos[:,  1]

In  [67]:

#
#  Set  up  colors  per  clusters  using  a  dict
#
cluster_colors  =  {0:  '#1b9e77',  1:  '#d95f02',  2:  '#7570b3',  3:
'#e7298a',  4:  '#66a61e',  5:  '#9990b3',  6:  '#e8888a'}

#
#set  up  cluster  names  using  a  dict
#
cluster_names  =  {0:  'Music,  Pop',
1:  'USA  Politics,  Election',
2:  'BigData,  Spark',
3:  'Spark,  Python',
4:  'Thailand',
5:  'USA  Politics,  Election',
6:  'Music,  Pop'}
In  [115]:
#
#  ipython  magic  to  show  the  matplotlib  plots  inline
#
%matplotlib  inline

#
#  Create  data  frame  which  includes  MDS  results,  cluster  numbers  and
tweet  texts  to  be  displayed
#
df  =  pd.DataFrame(dict(x=xs,  y=ys,  label=clusters,  txt=twtstxt_ls02_
utf8))
ix_start  =  2000
ix_stop =  2050
df01  =  df[ix_start:ix_stop]

print(df01[['label','txt']])
print(len(df01))
print()

#  Group  by  cluster

groups  =  df.groupby('label')
groups01  =  df01.groupby('label')

#  Set  up  the  plot

fig,  ax  =  plt.subplots(figsize=(17,  10))
ax.margins(0.05)

#
#  Build  the  plot  object
#
for  name,  group  in  groups01:
ax.plot(group.x,  group.y,  marker='o',  linestyle='',  ms=12,
label=cluster_names[name],  color=cluster_colors[name],
mec='none')
ax.set_aspect('auto')
ax.tick_params(\
axis=  'x', #  settings  for  x-axis
which='both',   #
bottom='off',   #
top='off',  #
labelbottom='off')
ax.tick_params(\
axis=  'y', #  settings  for  y-axis
which='both',   #
left='off', #
top='off',  #
labelleft='off')

ax.legend(numpoints=1)  #
#
#  Add  label  in  x,y  position  with  tweet  text
#
for  i  in  range(ix_start,  ix_stop):
ax.text(df01.ix[i]['x'],  df01.ix[i]['y'],  df01.ix[i]['txt'],
size=10)

plt.show()  #  Display  the  plot


label   text
2000    2   b'RT  @BigDataTechCon:  '
2001    3   b"@4Quant  's  presentat"
2002    2   b'Cassandra  Summit  201'

這是Cluster 2的圖, 由藍點表示Big Data 和 Spark; Cluster 3, 由紅點表示Spark 和Python,以及相關(guān)的 tweets 內(nèi)容抽樣 :

4-7 tweet 聚類

利用 Scikit-Learn 的處理結(jié)果,已經(jīng)探索到數(shù)據(jù)的一些好的見解,現(xiàn)在關(guān)注在Twitter數(shù)據(jù)集上執(zhí)行 Spark MLlib .

預處理數(shù)據(jù)集

為了準備在數(shù)據(jù)集上運行聚類算法,現(xiàn)在聚焦特征提取和工程化。我們實例化Spark Context,讀取 Twitter 數(shù)據(jù)集到一個 Spark dataframe. 然后,對tweet文本數(shù)據(jù)連續(xù)分詞,應用哈希詞頻算法到 tokens, 并最終應用 Inverse Document Frequency 算法,重新縮放數(shù)據(jù) 。代碼如下:

In  [3]:
#
#  Read  csv  in  a  Panda  DF
#
#
import  pandas  as  pd
csv_in  =  '/home/an/spark/spark-1.5.0-bin-hadoop2.6/examples/AN_Spark/data/unq_tweetstxt.csv'
pddf_in  =  pd.read_csv(csv_in,  index_col=None,  header=0,  sep=';',encoding='utf-8')

In  [4]:

sqlContext  =  SQLContext(sc)

In  [5]:

#
#  Convert  a  Panda  DF  to  a  Spark  DF
#
#

spdf_02  =  sqlContext.createDataFrame(pddf_in[['id',  'user_id',  'user_name',  'tweet_text']])

In  [8]:

spdf_02.show()

In  [7]: 
spdf_02.take(3)

Out[7]:

[Row(id=638830426971181057,  user_id=3276255125,  user_name=u'True
Equality',  tweet_text=u'ernestsgantt:  BeyHiveInFrance:  9_A_6:
dreamintentions:  elsahel12:  simbata3:  JDHM2015:  almtorta18:
dreamintentions:\u2026  http://t.co/VpD7FoqMr0'),
Row(id=638830426727911424,  user_id=3276255125,  user_name=u'True
Equality',  tweet_text=u'ernestsgantt:  BeyHiveInFrance:
PhuketDailyNews:  dreamintentions:  elsahel12:  simbata3:  JDHM2015:
almtorta18:  CiviPa\u2026  http://t.co/VpD7FoqMr0'),
Row(id=638830425402556417,  user_id=3276255125,  user_name=u'True
Equality',  tweet_text=u'ernestsgantt:  BeyHiveInFrance:  9_A_6:
ernestsgantt:  elsahel12:  simbata3:  JDHM2015:  almtorta18:
CiviPartnership:  dr\u2026  http://t.co/EMDOn8chPK')]

In  [9]:

from  pyspark.ml.feature  import  HashingTF,  IDF,  Tokenizer

In  [10]:

#
#  Tokenize  the  tweet_text
#
tokenizer  =  Tokenizer(inputCol="tweet_text",  outputCol="tokens")
tokensData  =  tokenizer.transform(spdf_02)

In  [11]:

tokensData.take(1)

Out[11]:

[Row(id=638830426971181057,  user_id=3276255125,  user_name=u'True Equality',  tweet_text=u'ernestsgantt:  BeyHiveInFrance:9_A_6:  dreamintentions:  elsahel12:  simbata3:  JDHM2015:almtorta18:  dreamintentions:\u2026  http://t.co/VpD7FoqMr0',
tokens=[u'ernestsgantt:',  u'beyhiveinfrance:',  u'9_a_6:',
u'dreamintentions:',  u'elsahel12:',  u'simbata3:',  u'jdhm2015:',u'almtorta18:',  u'dreamintentions:\u2026',  u'http://t.co/vpd7foqmr0'])]

In  [14]:
#
#  Apply  Hashing  TF  to  the  tokens
#
hashingTF  =  HashingTF(inputCol="tokens",  outputCol="rawFeatures",
numFeatures=2000)
featuresData  =  hashingTF.transform(tokensData)

In  [15]:

featuresData.take(1)

Out[15]:

[Row(id=638830426971181057,  user_id=3276255125,  user_name=u'True Equality',  tweet_text=u'ernestsgantt:  BeyHiveInFrance:9_A_6:  dreamintentions:  elsahel12:  simbata3:  JDHM2015:almtorta18:  dreamintentions:\u2026  http://t.co/VpD7FoqMr0',
tokens=[u'ernestsgantt:',  u'beyhiveinfrance:',  u'9_a_6:',
u'dreamintentions:',  u'elsahel12:',  u'simbata3:',  u'jdhm2015:',u'almtorta18:',  u'dreamintentions:\u2026',  u'http://t.co/vpd7foqmr0'],
rawFeatures=SparseVector(2000,  {74:  1.0,  97:  1.0,  100:  1.0,  160:  1.0,185:  1.0,  742:  1.0,  856:  1.0,  991:  1.0,  1383:  1.0,  1620:  1.0}))]

In  [16]:

#
#  Apply  IDF  to  the  raw  features  and  rescale  the  data
#
idf  =  IDF(inputCol="rawFeatures",  outputCol="features")
idfModel  =  idf.fit(featuresData)
rescaledData  =  idfModel.transform(featuresData)

for  features  in  rescaledData.select("features").take(3):
    print(features)

In  [17]:

rescaledData.take(2)

Out[17]:

[Row(id=638830426971181057,  user_id=3276255125,  user_name=u'True Equality',  tweet_text=u'ernestsgantt:  BeyHiveInFrance:9_A_6:  dreamintentions:  elsahel12:  simbata3:  JDHM2015:almtorta18:  dreamintentions:\u2026  http://t.co/VpD7FoqMr0',
tokens=[u'ernestsgantt:',  u'beyhiveinfrance:',  u'9_a_6:',
u'dreamintentions:',  u'elsahel12:',  u'simbata3:',  u'jdhm2015:',
u'almtorta18:',  u'dreamintentions:\u2026',  u'http://t.co/vpd7foqmr0'],
rawFeatures=SparseVector(2000,  {74:  1.0,  97:  1.0,  100:  1.0,  160:
1.0,  185:  1.0,  742:  1.0,  856:  1.0,  991:  1.0,  1383:  1.0,  1620:  1.0}),
features=SparseVector(2000,  {74:  2.6762,  97:  1.8625,  100:  2.6384,  160:
2.9985,  185:  2.7481,  742:  5.5269,  856:  4.1406,  991:  2.9518,  1383:
4.694,  1620:  3.073})),
Row(id=638830426727911424,  user_id=3276255125,  user_name=u'True
Equality',  tweet_text=u'ernestsgantt:  BeyHiveInFrance:
PhuketDailyNews:  dreamintentions:  elsahel12:  simbata3:
JDHM2015:  almtorta18:  CiviPa\u2026  http://t.co/VpD7FoqMr0',
tokens=[u'ernestsgantt:',  u'beyhiveinfrance:',  u'phuketdailynews:',u'dreamintentions:',  u'elsahel12:',  u'simbata3:',  u'jdhm2015:',u'almtorta18:',  u'civipa\u2026',  u'http://t.co/vpd7foqmr0'],
rawFeatures=SparseVector(2000,  {74:  1.0,  97:  1.0,  100:  1.0,  160:1.0,  185:  1.0,  460:  1.0,  987:  1.0,  991:  1.0,  1383:  1.0,  1620:  1.0}),
features=SparseVector(2000,  {74:  2.6762,  97:  1.8625,  100:  2.6384,160:  2.9985,  185:  2.7481,  460:  6.4432,  987:  2.9959,  991:  2.9518,  1383:4.694,  1620:  3.073}))]

In  [21]:

rs_pddf  =  rescaledData.toPandas()

In  [22]:

rs_pddf.count()

Out[22]:

id  7540
user_id 7540
user_name   7540
tweet_text  7540
tokens  7540
rawFeatures 7540
features    7540
dtype:  int64


In  [27]:

feat_lst  =  rs_pddf.features.tolist()

In  [28]:
feat_lst[:2]

Out[28]:

[SparseVector(2000,  {74:  2.6762,  97:  1.8625,  100:  2.6384, 160:  2.9985,
185:  2.7481,  742:  5.5269,  856:  4.1406,  991:  2.9518,  1383: 4.694,  1620:
3.073}),
SparseVector(2000,  {74:  2.6762,  97:  1.8625,  100:  2.6384, 160:  2.9985,
185:  2.7481,  460:  6.4432,  987:  2.9959,  991:  2.9518,  1383: 4.694,  1620:
3.073})]

運行聚類算法

在Twitter數(shù)據(jù)集上運行 K-Means 算法, 作為非標簽的tweets, 我們希望看到Apache Spark tweets 形成一個聚類。 遵從以前的步驟, 特征的 TF-IDF 稀疏向量轉(zhuǎn)化為一個 RDD 將被輸入到 Spark MLlib 程序。我們初始化 K-Means 模型為 5 聚類, 10 次迭代:

In  [32]:

from  pyspark.mllib.clustering  import  KMeans,  KMeansModel
from  numpy  import  array
from  math  import  sqrt

In  [34]:

#  Load  and  parse  the  data


in_Data  =  sc.parallelize(feat_lst)

In  [35]:

in_Data.take(3)

Out[35]:

[SparseVector(2000,  {74:  2.6762,  97:  1.8625,  100:  2.6384, 160:  2.9985,185:  2.7481,  742:  5.5269,  856:  4.1406,  991:  2.9518,  1383: 4.694,  1620:3.073}),
SparseVector(2000,  {74:  2.6762,  97:  1.8625,  100:  2.6384, 160:  2.9985,185:  2.7481,  460:  6.4432,  987:  2.9959,  991:  2.9518,  1383: 4.694,  1620:3.073}),  
SparseVector(2000,  {20:  4.3534,  74:  2.6762,  97:  1.8625,  100:  5.2768,185:  2.7481,  856:  4.1406,  991:  2.9518,  1039:  3.073,  1620:  3.073,  1864:4.6377})]

In  [37]:

in_Data.count()

Out[37]:

7540

In  [38]:

#  Build  the  model  (cluster  the  data)

clusters  =  KMeans.train(in_Data,  5,  maxIterations=10,
runs=10,  initializationMode="random")

In  [53]:

#  Evaluate  clustering  by  computing  Within  Set  Sum  of  Squared  Errors

def  error(point):
    center  =  clusters.centers[clusters.predict(point)]
    return  sqrt(sum([x**2  for  x  in  (point  -  center)]))

WSSSE  =  in_Data.map(lambda  point:  error(point)).reduce(lambda  x,  y:  x +  y)
print("Within  Set  Sum  of  Squared  Error  =  "  +  str(WSSSE))

評估模型和結(jié)果

聚類算法調(diào)優(yōu)的一個方式是改變聚類的個數(shù)并驗證輸出. 檢查這些聚類,感受一下目前的聚類結(jié)果:

In  [43]:

cluster_membership  =  in_Data.map(lambda  x:  clusters.predict(x))

In  [54]:  
cluster_idx  =  cluster_membership.zipWithIndex()

In  [55]:

type(cluster_idx)

Out[55]:

pyspark.rdd.PipelinedRDD

In  [58]:

cluster_idx.take(20)

Out[58]:

[(3, 0),
(3, 1),
(3, 2),
(3, 3),
(3, 4),
(3, 5),
(1, 6),
(3, 7),
(3, 8),
(3, 9),
(3, 10),
(3, 11),
(3, 12),
(3, 13),
(3, 14),
(1, 15),
(3, 16),
(3, 17),
(1, 18),
(1, 19)]

In  [59]:

cluster_df  =  cluster_idx.toDF()

In  [65]:
pddf_with_cluster  =  pd.concat([pddf_in,  cluster_pddf],axis=1)

In  [76]:

pddf_with_cluster._1.unique()

Out[76]:

array([3,  1,  4,  0,  2])

In  [79]:

pddf_with_cluster[pddf_with_cluster['_1']  ==  0].head(10)

Out[79]:
Unnamed:  0 id  created_at  user_id user_name   tweet_text  _1
_2
6227    3   642418116819988480  Fri  Sep  11  19:23:09  +0000  2015
49693598    Ajinkya  Kale   RT  @bigdata:  Distributed  Matrix  Computations
i...    0   6227
6257    45  642391207205859328  Fri  Sep  11  17:36:13  +0000  2015
937467860   Angela  Bassa   [Auto]  I'm  reading  ""Distributed  Matrix
Comput...   0   6257
6297    119 642348577147064320  Fri  Sep  11  14:46:49  +0000
2015    18318677    Ben  Lorica Distributed  Matrix  Computations  in  @
ApacheSpar...   0   6297
In  [80]:

pddf_with_cluster[pddf_with_cluster['_1']  ==  1].head(10)

Out[80]:
Unnamed:  0 id  created_at  user_id user_name   tweet_text  _1
_2
6   6   638830419090079746  Tue  Sep  01  21:46:55  +0000  2015
2241040634  Massimo  Carrisi    Python:Python:  Removing  \xa0  from
string?  -  I  ...  16
15  17  638830380578045953  Tue  Sep  01  21:46:46  +0000  2015
57699376    Rafael  Monnerat    RT  @ramalhoorg:  Noite  de  autógrafos  do
Fluent  ... 115
18  41  638830280988426250  Tue  Sep  01  21:46:22  +0000  2015
951081582   Jack  Baldwin   RT  @cloudaus:  We  are  3/4  full!  2-day  @
swcarpen... 1   18
19  42  638830276626399232  Tue  Sep  01  21:46:21  +0000  2015
6525302 Masayoshi  Nakamura PynamoDB  #AWS  #DynamoDB  #Python
http://...  1   19
20  43  638830213288235008  Tue  Sep  01  21:46:06  +0000  2015
3153874869  Baltimore  Python   Flexx:  Python  UI  tookit  based  on  web
technolog...    1   20
21  44  638830117645516800  Tue  Sep  01  21:45:43  +0000  2015
48474625    Radio  Free  Denali Hmm,  emerge  --depclean  wants  to  remove
somethi...  1   21
22  46  638829977014636544  Tue  Sep  01  21:45:10  +0000  2015
154915461   Luciano  Ramalho    Noite  de  autógrafos  do  Fluent  Python  no
Garoa  ...  122
23  47  638829882928070656  Tue  Sep  01  21:44:47  +0000  2015
917320920   bsbafflesbrains @DanSWright  Harper  channeling  Monty
Python.  "...   1   23
24  48  638829868679954432  Tue  Sep  01  21:44:44  +0000  2015
134280898   Lannick  Technology RT  @SergeyKalnish:  I  am  #hiring:
Senior  Back  e...  1   24
25  49  638829707484508161  Tue  Sep  01  21:44:05  +0000  2015
2839203454  Joshua  Jones   RT  @LindseyPelas:  Surviving  Monty  Python
in  Fl...   1   25
In  [81]:

pddf_with_cluster[pddf_with_cluster['_1']  ==  2].head(10)

Out[81]:
Unnamed:  0 id  created_at  user_id user_name   tweet_text  _1
_2
7280    688 639056941592014848  Wed  Sep  02  12:47:02  +0000  2015
2735137484  Chris   A  true  gay  icon  when  will  @ladygaga  @Madonna  @...
2   7280
In  [82]:

pddf_with_cluster[pddf_with_cluster['_1']  ==  3].head(10)

Out[82]:
Unnamed:  0 id  created_at  user_id user_name   tweet_text  _1
_2
0   0   638830426971181057  Tue  Sep  01  21:46:57  +0000  2015
3276255125  True  Equality  ernestsgantt:  BeyHiveInFrance:  9_A_6:
dreamint... 3   0
1   1   638830426727911424  Tue  Sep  01  21:46:57  +0000  2015
3276255125  True  Equality  ernestsgantt:  BeyHiveInFrance:
PhuketDailyNews...  3   1
2   2   638830425402556417  Tue  Sep  01  21:46:56  +0000  2015
3276255125  True  Equality  ernestsgantt:  BeyHiveInFrance:  9_A_6:
ernestsg... 3   2
3   3   638830424563716097  Tue  Sep  01  21:46:56  +0000  2015
3276255125  True  Equality  ernestsgantt:  BeyHiveInFrance:
PhuketDailyNews...  3   3
4   4   638830422256816132  Tue  Sep  01  21:46:56  +0000  2015
3276255125  True  Equality  ernestsgantt:  elsahel12:  9_A_6:
dreamintention...   3   4
5   5   638830420159655936  Tue  Sep  01  21:46:55  +0000  2015
3276255125  True  Equality  ernestsgantt:  BeyHiveInFrance:
PhuketDailyNews...  3   5
7   7   638830418330980352  Tue  Sep  01  21:46:55  +0000  2015
3276255125  True  Equality  ernestsgantt:  elsahel12:  9_A_6:
dreamintention...   3   7
8   8   638830397648822272  Tue  Sep  01  21:46:50  +0000  2015
3276255125  True  Equality  ernestsgantt:  BeyHiveInFrance:
PhuketDailyNews...  3   8
9   9   638830395375529984  Tue  Sep  01  21:46:49  +0000  2015
3276255125  True  Equality  ernestsgantt:  elsahel12:  9_A_6:
dreamintention...   3   9
10  10  638830392389177344  Tue  Sep  01  21:46:49  +0000  2015
3276255125  True  Equality  ernestsgantt:  BeyHiveInFrance:
PhuketDailyNews...  3   10
In  [83]:

pddf_with_cluster[pddf_with_cluster['_1']  ==  4].head(10)

Out[83]:
Unnamed:  0 id  created_at  user_id user_name   tweet_text  _1
_2
1361    882 642648214454317056  Sat  Sep  12  10:37:28  +0000  2015
27415756    Raymond  Enisuoh    LA  Chosen  For  US  2024  Olympic  Bid  -
LA2016  See...  4   1361
1363    885 642647848744583168  Sat  Sep  12  10:36:01  +0000  2015
27415756    Raymond  Enisuoh    Prison  See:  https://t.co/x3EKAExeFi  …  …  …
…  …  ...   41363
5412    11  640480770369286144  Sun  Sep  06  11:04:49  +0000  2015
3242403023  Donald  Trump  2016 "  igiboooy!  @  Starbucks  https://t.
co/97wdL... 4   5412
5428    27  640477140660518912  Sun  Sep  06  10:50:24  +0000  2015
3242403023  Donald  Trump  2016 "   @  Starbucks  https://t.co/
wsEYFIefk7  "  -  D...  4   5428
5455    61  640469542272110592  Sun  Sep  06  10:20:12  +0000  2015
3242403023  Donald  Trump  2016 "  starbucks  @  Starbucks  Mam  Plaza
https://t.co... 4   5455
5456    62  640469541370372096  Sun  Sep  06  10:20:12  +0000  2015
3242403023  Donald  Trump  2016 "  Aaahhh  the  pumpkin  spice  latte  is
back,  fall...  4   5456
5457    63  640469539524898817  Sun  Sep  06  10:20:12  +0000  2015
3242403023  Donald  Trump  2016 "  RT  kayyleighferry:  Oh  my  goddd
Harry  Potter  ...  45457
5458    64  640469537176031232  Sun  Sep  06  10:20:11  +0000  2015
3242403023  Donald  Trump  2016 "  Starbucks  https://t.co/3xYYXlwNkf
"  -  Donald... 4   5458

我們以部分樣本tweet映射成5個聚類. Cluster 0 關(guān)于 Spark. Cluster 1
關(guān)于 Python. Cluster 2 關(guān)于 Lady Gaga. Cluster 3 關(guān)于Thailand's Phuket
新聞. Cluster 4 關(guān)于 Donald Trump.

構(gòu)建機器學習流水線

我們希望當優(yōu)化最佳參數(shù)來獲得最好執(zhí)行模型時,能夠組合特征提取,準備活動,訓練,測試,和預測活動。
在 Spark MLlib 中實現(xiàn)了強大的機器學習流水線,以5行代碼準確地捕獲了下面的tweet:

4-8 構(gòu)建機器學習流水線

Spark ML 流水線從 Python's Scikit-Learn 中得到了靈感,創(chuàng)建了簡潔數(shù)據(jù)連續(xù)轉(zhuǎn)換的聲明式語句可以快速地發(fā)布可調(diào)的模型。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容