使用StreamingPro 快速構(gòu)建Spark SQL on CarbonData

前言

CarbonData已經(jīng)發(fā)布了1.0版本,變更還是很快的,這個(gè)版本已經(jīng)移除了kettle了,使得部署和使用 變得很簡(jiǎn)單,而且支持1.6+ ,2.0+等多個(gè)Spark版本。

StreamingPro可以使得你很簡(jiǎn)單通過(guò)一個(gè)命令就能體驗(yàn)Carbondata,并且支持Http/JDBC的訪(fǎng)問(wèn)形態(tài)。

下載Spark發(fā)行版

比如我下載后的版本是這個(gè): spark-1.6.3-bin-hadoop2.6。

下載StreamingPro

地址在這: https://pan.baidu.com/s/1eRO5Wga ,你會(huì)得到一個(gè)比較大的Jar包。

同時(shí)你需要到maven下載一個(gè) carbondata-spark-1.0.0-incubating.jar ,這個(gè)因?yàn)橐恍┨厥庠虿艜?huì)用到。

你需要一個(gè)數(shù)據(jù)庫(kù)

因?yàn)槲覀冇玫搅薍ive 的mysql,所以你需要準(zhǔn)備一個(gè)可以連接的數(shù)據(jù)庫(kù)。只要能連接就行。如果沒(méi)有,比如你是mac的話(huà),用

brew install mysql 

即可。然后brew services start mysql

創(chuàng)建一個(gè)數(shù)據(jù)庫(kù):

create database hive CHARACTER SET latin1

//如果數(shù)據(jù)庫(kù)包字符異常啥的,啟動(dòng)完streamingpro后到數(shù)據(jù)庫(kù)做如下更改:
alter table PARTITIONS convert to character set latin1;
alter table PARTITION_KEYS convert to character set latin1;

寫(xiě)一個(gè)hive-site.xml文件

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNoExist=true</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>你的mysql賬號(hào)</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>你的mysql密碼</value>
</property>

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>file:///tmp/user/hive/warehouse</value>
</property>

<property>
<name>hive.exec.scratchdir</name>
<value>file:///tmp/hive/scratchdir</value>
</property>

<property>
 <name>hive.metastore.uris</name>
 <value></value>
</property>

<property>
  <name>datanucleus.autoCreateSchema</name>
  <value>true</value>
</property>
</configuration>

可以啟動(dòng)了

//streamingpro jar包所處的目錄,
//里面新建一個(gè)query.json文件,里面放一個(gè)大括號(hào)就行 

SHome=/Users/allwefantasy/streamingpro

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name sql-interactive \
--jars /Users/allwefantasy/.m2/repository/org/apache/carbondata/carbondata-spark/1.0.0-incubating/carbondata-spark-1.0.0-incubating.jar \
--files $SHome/hive-site.xml \
--conf "spark.sql.hive.thriftServer.singleSession=true" \
$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar    \
-streaming.name sql-interactive    \
-streaming.job.file.path file://$SHome/query.json \
-streaming.platform spark   \
-streaming.rest true   \
-streaming.driver.port 9004   \
-streaming.spark.service true \
-streaming.thrift true \
-streaming.enableCarbonDataSupport true \
-streaming.enableHiveSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta

參數(shù)比較多。大家不用管他。 這樣http端口是9004, jdbc端口是 10000。
我們可以通過(guò)http創(chuàng)建一張表

//這里的sql是: CREATE TABLE IF NOT EXISTS test_table4(id string, name string, city string, age Int) STORED BY 'carbondata'

curl --request POST \
  --url http://127.0.0.1:9004/run/sql \
  --header 'cache-control: no-cache' \
  --header 'content-type: application/x-www-form-urlencoded' \
  --header 'postman-token: 731441ac-c398-9a1b-2f06-8725ddbe84cd' \
  --data 'sql=CREATE%20TABLE%20IF%20NOT%20EXISTS%20test_table4(id%20string%2C%20name%20string%2C%20city%20string%2C%20age%20Int)%20STORED%20BY%20'\''carbondata'\'''

寫(xiě)入數(shù)據(jù)前,我們建立一個(gè)sample.csv的文件,

id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35

然后將這個(gè)文件導(dǎo)入:

//實(shí)際SQL:LOAD DATA LOCAL INPATH  '/Users/allwefantasy/streamingpro/sample.csv'  INTO TABLE test_table4
curl --request POST \
  --url http://127.0.0.1:9004/run/sql \
  --header 'cache-control: no-cache' \
  --header 'content-type: application/x-www-form-urlencoded' \
  --header 'postman-token: 5eb19ab4-653c-d05f-29ab-6003d7e83755' \
  --data 'sql=LOAD%20DATA%20LOCAL%20INPATH%20%20'\''%2FUsers%2Fallwefantasy%2Fstreamingpro%2Fsample.csv'\''%20%20INTO%20TABLE%20test_table4'

這個(gè)使用我們可以用http查詢(xún):

//sql: SELECT * FROM test_table4
curl --request POST \
  --url http://127.0.0.1:9004/run/sql \
  --header 'cache-control: no-cache' \
  --header 'content-type: application/x-www-form-urlencoded' \
  --header 'postman-token: d99349ae-b226-8a4e-4d65-d92b1771c111' \
  --data 'sql=SELECT%20*%20FROM%20test_table4'

你也可以寫(xiě)一個(gè)jdbc程序:

object ScalaJdbcConnectSelect {

  def main(args: Array[String]) {
    // connect to the database named "mysql" on the localhost
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:hive2://localhost:10000/default"

    // there's probably a better way to do this
    var connection:Connection = null

    try {
      // make the connection
      Class.forName(driver)
      connection = DriverManager.getConnection(url)

      // create the statement, and run the select query
      val statement = connection.createStatement()
      val resultSet = statement.executeQuery("SELECT * FROM test_table4 ")
      while ( resultSet.next() ) {
        println(" city = "+ resultSet.getString("city") )
      }
    } catch {
      case e => e.printStackTrace
    }
    connection.close()
  }

}

完成。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容