Spark LDA (Scala version)

This article introduces how to use Spark LDA, and implements a document-inference method for models trained with EMLDAOptimizer.

Outline

  1. A brief introduction to LDA
  2. Spark LDA code example
  3. Computing document similarity
  4. Inferring the topic distribution of a new document

1. A brief introduction to LDA

LDA is a kind of topic model (topic modeling): as the name suggests, it lets us recover the topics of a collection of documents. More precisely, LDA assumes that each document is a mixture of multiple topics, and that each topic is represented by a probability distribution over words.
LDA defines a generative process for each document, with the parameters \alpha and \eta set in advance. The steps are as follows:

(a) sample the topic distribution \theta_j of document j from Dirichlet(\alpha);

(b) for each word in the document:

  • sample the topic z_{j,w} of the w-th word of document j from Multinomial(\theta_j)
  • sample the word distribution \beta_{w,k} of topic z_{j,w} from Dirichlet(\eta)
  • sample the word x_{j,w} from Multinomial(\beta_{w,k})

Algorithm input: a collection of documents, the number of topics K, and the hyperparameters \alpha, \eta
Algorithm output:
  1. the word probability distribution of each topic
  2. the topic probability distribution of each document
  3. the vocabulary
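
Written compactly, the generative process above is:

\theta_j \sim \mathrm{Dirichlet}(\alpha),\quad \beta_{\cdot,k} \sim \mathrm{Dirichlet}(\eta),\quad z_{j,w} \sim \mathrm{Multinomial}(\theta_j),\quad x_{j,w} \sim \mathrm{Multinomial}(\beta_{\cdot,z_{j,w}})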

2. Spark LDA code example

First, the usage scenario: the dataset contains a number of documents (shown below).
These documents are already tokenized, and each one carries an index. The index lets us look up a given document's vector later on.

1 文化,閱讀,視頻,燃文,平臺(tái),總會(huì)計(jì)師,閱讀網(wǎng),文書(shū),協(xié)會(huì),鯨魚(yú),領(lǐng)先,小說(shuō)網(wǎng),小說(shuō),新華
2 建設(shè)工程,稅務(wù)局,welcome,平臺(tái),確認(rèn),電子,選擇,nginx,招標(biāo)網(wǎng),發(fā)票,國(guó)稅,增值稅
3 數(shù)字化,人力,會(huì)議,行健,導(dǎo)航,在線,powered,系統(tǒng),美味,星空網(wǎng),discuz,上網(wǎng),支持,遠(yuǎn)程,管家,訪問(wèn),商控,teamviewer,餐飲
……

The rest of the work falls into the following steps:

  • text preprocessing (the text must be turned into vectors)
  • LDA model training
  • extracting each topic's word distribution and each document's topic distribution

2.1 Text preprocessing

Text preprocessing involves tokenization, filtering out certain words, and vectorization. Spark has classes for each of these, e.g. RegexTokenizer/Tokenizer, StopWordsRemover, and CountVectorizer/CountVectorizerModel.

import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

  private case class User(
    user_id: Long,
    user: String,
    text: String)

  private def preprocess(
      sc: SparkContext,
      documentPaths: String,
      vocabSize: Int,
      stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {

    val sqlContext = SQLContext.getOrCreate(sc)
    import sqlContext.implicits._

    // One document per line in each text file.
    println("convert to dataframe")
    val srcDF = sc.textFile(documentPaths).repartition(800)
      .map(_.split("\t"))
      .map { line =>
        val _text = line(2).split(",").mkString(" ")
        User(line(0).toLong, line(1), _text)
      }.toDF()

    val tokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")
    val wordsData = tokenizer.transform(srcDF)

    println("stopWordsRemover")
    val stopword_list = sc.textFile(stopwordFile).collect()
    val stopWordsRemover = new StopWordsRemover()
      .setInputCol("words")
      .setOutputCol("tokens")
      .setStopWords(stopword_list)
    val removed_data = stopWordsRemover.transform(wordsData)

    println("countVectorizer fit")
    val cvModel = new CountVectorizer()
      .setInputCol("tokens")
      .setOutputCol("features")
      .setVocabSize(vocabSize)
      .fit(removed_data)
    val raw_documents = cvModel.transform(removed_data)
    raw_documents.show()

    val documents = raw_documents.select("user_id", "features")
      .repartition(800)
      .map { case Row(user_id: Long, features: Vector) => (user_id, features) }

    println("return vectors")
    (documents,
      cvModel.vocabulary, // vocabulary
      documents.map(_._2.numActives).sum().toLong) // total token count
  }

Because RegexTokenizer/Tokenizer, StopWordsRemover, etc. require DataFrames, the RDD has to be converted into a DataFrame during this step. There are two ways to do this; see "Interoperating with RDDs" in the Spark SQL guide for details. Here we use the first method, "Inferring the Schema Using Reflection" (the User case class above).
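
For contrast, a minimal sketch of the second method (programmatically specifying the schema), mirroring the preprocess code above, in case the case-class route does not fit:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // sketch: build the schema by hand instead of inferring it from a case class
    val schema = StructType(Seq(
      StructField("user_id", LongType),
      StructField("user", StringType),
      StructField("text", StringType)))
    val rowRDD = sc.textFile(documentPaths).map(_.split("\t"))
      .map(a => Row(a(0).toLong, a(1), a(2).split(",").mkString(" ")))
    val srcDF = sqlContext.createDataFrame(rowRDD, schema)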

2.2 Training the LDA model

Spark's machine learning (ML) library is split into two packages, spark.ml and spark.mllib. The main difference is that spark.ml offers a DataFrame-based API while spark.mllib offers an RDD-based API. Here we use spark.mllib.

It divides into two packages:

spark.mllib contains the original API built on top of RDDs.
spark.ml provides higher-level API built on top of DataFrames for constructing ML pipelines.

LDA is an unsupervised learning method, so a few parameters have to be set before training:

LDA takes in a collection of documents as vectors of word counts and the following parameters (set using the builder pattern):

- k: Number of topics (i.e., cluster centers)
- optimizer: Optimizer to use for learning the LDA model, either EMLDAOptimizer or OnlineLDAOptimizer
- docConcentration: Dirichlet parameter for prior over documents’ distributions over topics. Larger values encourage smoother inferred distributions.
- topicConcentration: Dirichlet parameter for prior over topics’ distributions over terms (words). Larger values encourage smoother inferred distributions.
- maxIterations: Limit on the number of iterations.
- checkpointInterval: If using checkpointing (set in the Spark configuration), this parameter specifies the frequency with which checkpoints will be created. If maxIterations is large, using checkpointing can help reduce shuffle file sizes on disk and help with failure recovery.

Because the online optimizer consumed too much memory on the client (driver) in our case, we use EMLDAOptimizer.
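
For reference, a minimal sketch of choosing between the two optimizers (optimizerName is a hypothetical config value, actualCorpusSize is the document count computed in run() below; the miniBatchFraction formula follows the Spark LDAExample):

    import org.apache.spark.mllib.clustering.{EMLDAOptimizer, OnlineLDAOptimizer}

    // pick an optimizer by name; this article uses "em"
    val optimizer = optimizerName.toLowerCase match {
      case "em" => new EMLDAOptimizer
      case "online" => new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
      case other => throw new IllegalArgumentException(s"Unknown optimizer: $other")
    }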

Training goes through the following steps:

  • set the parameters
  • train the model
  • print the topics and save the model

import org.apache.spark.mllib.clustering.{DistributedLDAModel, EMLDAOptimizer, LDA}

  private case class Params(
      input:String = "hdfs_path_to_corpus/part-00[0-2]*",
      k: Int = 200,
      maxIterations: Int = 100,
      docConcentration: Double = -1,
      topicConcentration: Double = -1,
      vocabSize: Int = 140000,
      stopwordFile: String = "",
      checkpointDir: String = "hdfs_path_to_save_model/",
      checkpointInterval: Int = 10)
      
  def main(args:Array[String]) {
    println(s"defaultParams")
    val defaultParams = Params()
    println(s"run")
    run(defaultParams)
  }
  
  private def run(params: Params) {
    println(s"create context")
    val conf = new SparkConf().setAppName(s"LDA_zsm")
    conf.set("spark.driver.maxResultSize", "4g")
    conf.set("spark.default.parallelism","800")
    conf.set("spark.ui.retainedJobs", "10")
    conf.set("spark.ui.retainedStages", "10")
    conf.set("spark.shuffle.consolidateFiles", "true")
    val sc = new SparkContext(conf)

    // Load documents, and prepare them for LDA.
    val preprocessStart = System.nanoTime()
    println(s"preprocess data")
    val (corpus, vocabArray, actualNumTokens) =
      preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
    println(s"get count num")
    val actualCorpusSize = corpus.count()
    val actualVocabSize = vocabArray.size
    val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9

    println()
    println(s"Corpus summary:")
    println(s"\t Training set size: $actualCorpusSize documents")
    println(s"\t Vocabulary size: $actualVocabSize terms")
    println(s"\t Preprocessing time: $preprocessElapsed sec")
    println()

    // Run LDA. 
    val lda = new LDA()
    println(s"choose optimizer")
    val optimizer = new EMLDAOptimizer
    
    println(s"set params")
    lda.setOptimizer(optimizer)
      .setK(params.k)
      .setMaxIterations(params.maxIterations)
      .setDocConcentration(params.docConcentration)
      .setTopicConcentration(params.topicConcentration)
      .setCheckpointInterval(params.checkpointInterval)
    if (params.checkpointDir.nonEmpty) {
      sc.setCheckpointDir(params.checkpointDir)
    }
    val startTime = System.nanoTime()
    println(s"training")
    val ldaModel = lda.run(corpus)
    val elapsed = (System.nanoTime() - startTime) / 1e9

    println(s"Finished training LDA model.  Summary:")
    println(s"\t Training time: $elapsed sec")

    // Print topics and save model
    val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 100)
    val topics = topicIndices.map { case (terms, termWeights) =>
      terms.zip(termWeights).map { case (term, weight) => (vocabArray(term.toInt), weight) }
    }
    println(s"${params.k} topics:")
    topics.zipWithIndex.foreach { case (topic, i) =>
      println(s"TOPIC $i")
      topic.foreach { case (term, weight) =>
        println(s"$term\t$weight")
      }
      println()
    }
    
    val voca_path = params.checkpointDir.concat("vocab")
    println(s"save vocab model")
    sc.parallelize(vocabArray).saveAsTextFile(voca_path)
    val model_path = params.checkpointDir.concat("model")
    println(s"save lda model")
    ldaModel.save(sc, model_path) // with EMLDAOptimizer this saves a DistributedLDAModel

    // load model
    val distLDAModel = DistributedLDAModel.load(sc, model_path)
    val topicDistributionMatrix = distLDAModel.topicDistributions
    val topicMatrix = distLDAModel.topicsMatrix

    sc.stop()
  }

Notes:

- lda.run(corpus): corpus has type RDD[(doc_id, countVector)]
- ldaModel.describeTopics(maxTermsPerTopic = 100): for each topic, returns [(term_id, term_weight), ...]
- distLDAModel.topicDistributions: the topic probability distribution of each document, as (doc_id, vector) pairs
- distLDAModel.topicsMatrix: the word probability distribution of each topic, as a matrix of size (W, K)
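
As a quick illustration of these outputs, here is a sketch that saves every document's topic distribution to text (output_path is a hypothetical placeholder):

    // sketch: persist topicDistributions as "doc_id <tab> p1,p2,...,pK" lines
    distLDAModel.topicDistributions
      .map { case (docId, dist) => s"$docId\t${dist.toArray.mkString(",")}" }
      .saveAsTextFile(output_path)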

3. Computing document similarity

The LDA model ends up giving us each document's topic probability distribution. Seen from another angle, that topic distribution is a vector representation of the document, so we can use it to find similar documents.

Here we measure document similarity with cosine similarity:

sim = \frac{\textbf{a}\cdot\textbf{b}}{\left|\textbf{a}\right|\left|\textbf{b}\right|}

During the computation we use ElementwiseProduct to multiply two vectors element by element.

import org.apache.spark.mllib.clustering.DistributedLDAModel
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.{Vector, Vectors}

    val distLDAModel = DistributedLDAModel.load(sc, model_path)
    val topicDistributionMatrix = distLDAModel.topicDistributions.sortBy(_._1)

    // sample some test documents
    val nums: List[Long] = List(2304096L, 14334195L, 2110749L, 114645L, 16623381L, 21556887L,
      5709227L, 21360395L, 6431072L, 13329823L, 2657615L, 333413L, 11031474L)
    val testSamples = topicDistributionMatrix
      .filter { case (doc_id, vector) => nums.contains(doc_id) }
      .collect()
    for (_sample <- testSamples) {
      val testId = _sample._1
      val testVector = _sample._2
      val transformer = new ElementwiseProduct(testVector)
      // elementwise product followed by a sum is the dot product; divide by the norms for cosine
      topicDistributionMatrix
        .map { case (doc_id, vector) =>
          val value = transformer.transform(vector).toArray.sum /
            (Vectors.norm(vector, 2) * Vectors.norm(testVector, 2))
          (testId, doc_id, value)
        }
        .sortBy(_._3, ascending = false)
        .take(20).foreach(println)
    }
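
Since the elementwise product followed by a sum is just a dot product, the transformer is not strictly necessary; an equivalent sketch using the mllib Vector imports above:

    // sketch: cosine similarity of two mllib vectors, computed directly
    def cosine(a: Vector, b: Vector): Double = {
      val dot = a.toArray.zip(b.toArray).map { case (x, y) => x * y }.sum
      dot / (Vectors.norm(a, 2) * Vectors.norm(b, 2))
    }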

A note here: Spark also provides a method to infer a new document's topic probability distribution, as follows:

val distLDAModel = DistributedLDAModel.load(sc, model_path)

val vector = distLDAModel.toLocal.topicDistribution(new_document) // new_document is a term-frequency vector

But there is a problem: this method uses online variational Bayes (VB) inference, whereas our training used the EM/MAP approach. In theory both converge to the same optimum, but in practice the topic distributions inferred this way for new documents look quite different from the distributions in the training set.
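
One quick way to see the mismatch (a sketch; docId and termCountsOfDoc are hypothetical placeholders for a training document's id and its term-count vector):

    // compare the EM training-time distribution with online-VB inference for the same document
    val trainDist = distLDAModel.topicDistributions.lookup(docId).head
    val vbDist = distLDAModel.toLocal.topicDistribution(termCountsOfDoc)
    trainDist.toArray.zip(vbDist.toArray).foreach { case (a, b) => println(f"$a%.4f vs $b%.4f") }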

4. Inferring the topic distribution of a new document

Before solving for a new document's topic distribution, it helps to understand the training process; Spark uses EMLDAOptimizer. Formulas follow, but you do not need the full derivation, only the meaning of each variable.

The variables are:

  • \gamma_{wjk} = P(z=k|x=w, d=j)
  • N_{wj}: the number of times word w appears in document j
  • N_{wk}=\sum_j{N_{wj}\gamma_{wjk}}
  • N_{kj}=\sum_w{N_{wj}\gamma_{wjk}}
  • N_k=\sum_w N_{wk}
  • N_j=\sum_k N_{kj}
  • \hat\beta_{wk}: the probability of each word under each topic
  • \hat\theta_{kj}: the probability of each topic in each document

The training procedure is:

  • initialize \gamma_{wjk}, and compute N_{kj}, N_{j}, N_{wk}, N_{k}

  • for i in maxIterations:

    1. compute \gamma_{wjk} from formula (1)
    2. recompute N_{kj}, N_{j}, N_{wk}, N_{k}
    3. compute \hat\beta_{wk}, \hat\theta_{kj} from formula (2)

(1)
\gamma_{wjk} \propto \frac{(N_{wk}+\eta-1)(N_{kj}+\alpha-1)}{(N_k+W\eta-W)}
(2)
\hat\beta_{wk} = \frac{N_{wk}+\eta-1}{N_k+W\eta-W}
\hat\theta_{kj} = \frac{N_{kj}+\alpha-1}{N_j+K\alpha-K}

After training we have \hat\beta_{wk} and \hat\theta_{kj}; in addition, we know W, K, and \alpha.

Next question: to infer \hat\theta_{k(j+1)} for a new document j+1, what data do we need?

When the new document j+1 arrives, we can obtain its term counts N_{w(j+1)}.

  • initialize \gamma_{w(j+1)k}, and compute N_{k(j+1)}, N_{j+1}
  • iterate:

i. update \gamma_{w(j+1)k} using formula (1), with \hat\beta_{wk} taken from training:
\gamma_{w(j+1)k} \propto \hat\beta_{wk}(N_{k(j+1)}+\alpha-1)

ii. recompute N_{k(j+1)}, N_{j+1}

iii. compute \hat\theta_{k(j+1)} from formula (2)

The main code:

import breeze.linalg.{normalize, sum, DenseMatrix => BDM, DenseVector => BDV}
import breeze.numerics.{abs => BNabs}
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
import scala.util.Random

  private def docTopicDistribution(
    docId: Long,
    termCounts: Vector,     // term-count vector of the new document
    W: Int,                 // vocabulary size
    K: Int,                 // number of topics
    alpha: BDV[Double],     // docConcentration as a Breeze vector
    topicMatrixToArray: Array[Double]): DenseVector = {
    // topicMatrixToArray is assumed to be topicsMatrix transposed to (K x W) and then
    // flattened, so the K topic weights for word w start at offset w*K

    val alpha1 = alpha :- 1.0
    val Kalpha1 = alpha1 :* K.toDouble

    // Initialize the parameters
    val randomSeed = docId
    var N_j = BDV.zeros[Double](K)
    var meanThetaChange = 1D
    val gamma = BDM.zeros[Double](K, W)
    var w = 0
    while (w < W){
      val random = new Random(randomSeed + w*K)
      val gamma_w = normalize(BDV.fill[Double](K)(random.nextDouble()), 1.0)
      gamma(::, w) := gamma_w
      w += 1
    }

    var k = 0
    while (k < K){
      val gamma_k = gamma(k, ::).t
      N_j(k) = gamma_k dot BDV(termCounts.toArray)
      k += 1
    }

    val random_theta = new Random(randomSeed)
    val theta = normalize(BDV.fill[Double](K)(random_theta.nextDouble()), 1.0)
    var t = 0
    while (t < 20 && meanThetaChange > 1e-7){
      // E step, gamma
      val _mu = N_j + alpha1
      w = 0
      while (w < W){
        val beta_w = BDV(topicMatrixToArray.slice(w*K, (w+1)*K))
        val gamma_w = beta_w :* _mu
        val sum_gamma = sum(gamma_w)
        gamma_w :/= sum_gamma
        gamma(::, w) := gamma_w
        w += 1
      }

      // N_kj, N_j
      k = 0
      while (k < K){
        N_j(k) = gamma(k, ::).t dot BDV(termCounts.toArray)
        k += 1
      }
      val sum_j = sum(N_j)

      // M step, theta
      val lasttheta = theta.copy
      val sumKalpha1 = Kalpha1 :+ sum_j
      theta := (N_j + alpha1) :/ sumKalpha1
      meanThetaChange = sum(BNabs(theta - lasttheta)) / K.toDouble
      println("======================================")
      println(docId, t, meanThetaChange)
      t += 1
    }
    //val inds = argtopk(theta, 20).toArray
    //val values = inds.map(i => i.toString + "," + theta(i).toString)
    //values.mkString("|") + "|||" + t.toString

    Vectors.fromBreeze(theta).asInstanceOf[DenseVector]
  }
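
A hedged usage sketch for the function above (newDocVector is a hypothetical term-count vector built with the same vocabulary used in training):

    // sketch: infer one new document's topic distribution with the EM-trained model
    val distLDAModel = DistributedLDAModel.load(sc, model_path)
    val K = distLDAModel.k
    val W = distLDAModel.vocabSize
    val alpha = BDV(distLDAModel.docConcentration.toArray)
    // transpose topicsMatrix to (K x W) so each word's K weights are contiguous, as assumed above
    val topicArr = distLDAModel.topicsMatrix.transpose.toArray
    val theta = docTopicDistribution(0L, newDocVector, W, K, alpha, topicArr)
    println(theta.toArray.zipWithIndex.sortBy(-_._1).take(10).mkString("\n"))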
