利用大數(shù)據(jù)技術(shù)分析新浪財(cái)經(jīng)美股行情

學(xué)習(xí)大數(shù)據(jù)技術(shù)有一段時(shí)間了,之前也寫(xiě)過(guò)一些零零散散的博客,作為自己學(xué)習(xí)的一些記錄,不過(guò)每篇博客都只是涵蓋部分技術(shù)。這次想寫(xiě)一篇比較完整的博客,記錄一個(gè)完整的項(xiàng)目從頭到尾生產(chǎn)的過(guò)程,也是對(duì)自己學(xué)習(xí)的一個(gè)總結(jié)

廢話(huà)不多說(shuō),直入正題

這次的項(xiàng)目涉及了兩條流程

一條是離線(xiàn)處理。爬蟲(chóng)爬到股票數(shù)據(jù)后,先交給 Map Reduce 清洗一下,生成格式化的數(shù)據(jù),然后倒入 hive 進(jìn)行分析,之后交給 sqoop 導(dǎo)出至 mysql 并用 echarts 可視化展現(xiàn)


離線(xiàn)處理

另一條是實(shí)時(shí)處理。爬蟲(chóng)一直爬取數(shù)據(jù),flume 監(jiān)控爬蟲(chóng)爬下來(lái)的文件所在目錄,并不斷傳送給 kafka,spark streaming 會(huì)定期從 kafka 那里拿到數(shù)據(jù),實(shí)時(shí)分析并將數(shù)據(jù)保存到 mysql,最后可視化。


實(shí)時(shí)處理

離線(xiàn)流程

網(wǎng)頁(yè)結(jié)構(gòu)分析

本次爬取 新浪財(cái)經(jīng)美股實(shí)時(shí)行情,頁(yè)面長(zhǎng)這樣

新浪財(cái)經(jīng)美股實(shí)時(shí)行情

F12,打開(kāi)開(kāi)發(fā)者工具,選擇 network 面板,F(xiàn)5 刷新頁(yè)面,找到股票的 json 數(shù)據(jù)的 api 接口。

這是 api 接口

不同的網(wǎng)站尋找 api 接口的方式不太一樣,給大家一個(gè)小訣竅,一般的接口都是 xhr 或 script 類(lèi)型,而且它的 url 后面一般都會(huì)跟著一個(gè) page 參數(shù),代表著這是第幾頁(yè)

雙擊 url 之后來(lái)到了一個(gè)新的頁(yè)面

股票的 json 格式數(shù)據(jù)

這里可以看到返回的數(shù)據(jù)不是標(biāo)準(zhǔn)的 json 格式,前面跟著一串 IO.XSRV2.CallbackList['QGNtUNkM_FleaeT1'] ,而且我們也可以在 url 里面看到這一串字符,現(xiàn)在在 url 里他刪掉,結(jié)果就變成了下面這樣子。

基本標(biāo)準(zhǔn)的 json 格式數(shù)據(jù)

現(xiàn)在數(shù)據(jù)的格式基本標(biāo)準(zhǔn)了,只不過(guò)最前面多了兩對(duì)小括號(hào),我們?cè)跁?huì)在爬蟲(chóng)程序里面去掉它。根據(jù)上面拿到的的 url ,開(kāi)始編寫(xiě)我們的爬蟲(chóng)。

爬取數(shù)據(jù)

爬蟲(chóng)程序我寫(xiě)了兩種方案,一種是用 python 語(yǔ)言寫(xiě)的;還有一種是使用 java 語(yǔ)言實(shí)現(xiàn)的 webmagic 框架寫(xiě)的,由于篇幅問(wèn)題,python 的方案就不在這篇博客里面采用了,以后可能會(huì)單開(kāi)一篇博客介紹 python 版的股票爬蟲(chóng)。

WebMagic 是一個(gè)國(guó)人寫(xiě)的簡(jiǎn)單靈活的Java爬蟲(chóng)框架。

要使用 webmagic ,首先下載它的依賴(lài)包 webmagic-0.7.3-all.tar.gz
在 eclipse 里面新建一個(gè) Java Project,在工程根目錄下新建一個(gè)文件夾,將依賴(lài)包解壓至文件夾中,全選之后添加到 Build Path

Add to Build Path

然后就可以寫(xiě)爬蟲(chóng)代碼了

private Site site = Site.me().setDomain("stock.finance.sina.com.cn").setSleepTime(2000)
            .setUserAgent("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0");

首先構(gòu)建要請(qǐng)求的 Site ,這里注意一點(diǎn),代碼最上面的 setDomain("stock.finance.sina.com.cn") 里面的這個(gè) "stock.finance.sina.com.cn" 就是下圖中的 Host,同時(shí)也是爬蟲(chóng)程序下載的網(wǎng)頁(yè)所存放的目錄

Host

然后編寫(xiě)一個(gè)方法,過(guò)濾掉前面提到的 json 數(shù)據(jù)外邊的兩對(duì)小括號(hào)

public String regexJson(String source) {      
//用于去掉包裹 json 數(shù)據(jù)的兩對(duì)小括號(hào)
        String regex = "\\{.*\\}";
        String result;
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(source);
        if (matcher.find()) {
            result = matcher.group(0);
        } else {
            result = null;
        }
        return result;
}

在 process 方法中編寫(xiě)爬蟲(chóng)邏輯

@Override
public void process(Page page) {
    // TODO Auto-generated method stub
    page.putField("sixty", regexJson(page.getJson().toString()));
}

主方法

public static void main(String[] args) {
        String url_init = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
    "/US_CategoryService.getList?num=60&sort=&asc=0&page=1";
        String url_pattern = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
            "/US_CategoryService.getList?num=60&sort=&asc=0&page=";
        String output = "/data/edu1/tmp/";
        QueueScheduler scheduler = new QueueScheduler();
        Spider spider = Spider.create(new GetStock())
                .addUrl(url_init)
                .setScheduler(scheduler)
                .addPipeline(new JsonFilePipeline(output))
                .addPipeline(new ConsolePipeline());
        for (int i = 1; i < 140; i++) {
            Request request = new Request();
            request.setUrl(url_pattern + i);
            scheduler.push(request, spider);
        }
        spider.thread(50).run();
}

爬蟲(chóng)運(yùn)行結(jié)束后,會(huì)在 /data/edu1/tmp/stock.finance.sina.com.cn 下面生成許多 json 文件,查看某一個(gè)文件,可以看到里面的 json 字符串。

json 文件內(nèi)容

接下來(lái)我們把這些文件上傳到 hdfs 上面,然后開(kāi)始編寫(xiě) MapReduce 程序清洗臟數(shù)據(jù)

hadoop fs -put /data/edu1/tmp/stock.finance.sina.com.cn/* /mystock/in

MapReduce清洗

數(shù)據(jù)清洗從名字上也看的出就是把“臟”的數(shù)據(jù)“洗掉”,指發(fā)現(xiàn)并糾正數(shù)據(jù)文件中可識(shí)別的錯(cuò)誤的最后一道程序,包括檢查數(shù)據(jù)一致性,處理無(wú)效值和缺失值等。

格式化之后的 json

這里我們把 json 格式的數(shù)據(jù)最終洗成可以直接導(dǎo)入 hive 的以 '\t' 為分隔符文本格式。而且 json 數(shù)據(jù)中有的字段會(huì)有缺失的現(xiàn)象出現(xiàn),所以我們還要填補(bǔ)空值,保持?jǐn)?shù)據(jù)的一致性

這里我們用到了阿里的 fastjson 庫(kù)來(lái)解析 json

map 代碼:

@Override
protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String initJsonString = value.toString();
            JSONObject initJson = JSONObject.parseObject(initJsonString);
            if (!initJsonString.contains("data")) {
                return;
            }

            JSONObject myjson = initJson.getJSONObject("sixty");

            JSONArray data = myjson.getJSONArray("data");

            for (int i = 0; i < data.size(); i++) {
                JSONObject stock = data.getJSONObject(i);
                String name = stock.getString("name").trim().equals("")?"null":stock.getString("name").trim().replaceAll("\r|\n|\t", "");
                String cname = stock.getString("cname").trim().equals("")?"null":stock.getString("cname").trim().replaceAll("\r|\n|\t", "");
                String category = stock.getString("category");
                if (category == null || category.equals("")) {
                    category = "null";
                } else {
                    category = category.toString().trim().replaceAll("\r|\n|\t", "");
                }
                System.out.println(category);
                String symbol = stock.getString("symbol").trim().equals("")?"null":stock.getString("symbol").trim().replaceAll("\r|\n|\t", "");
                String price = stock.getString("price").trim().equals("")?"null":stock.getString("price").trim().replaceAll("\r|\n|\t", "");
                String diff = stock.getString("diff").trim().equals("")?"null":stock.getString("diff").trim().replaceAll("\r|\n|\t", "");
                String chg = stock.getString("chg").trim().equals("")?"null":stock.getString("chg").trim().replaceAll("\r|\n|\t", "");
                String preclose = stock.getString("preclose").equals("")?"null":stock.getString("preclose").trim().replaceAll("\r|\n|\t", "");
                String open = stock.getString("open").trim().equals("")?"null":stock.getString("open").trim().replaceAll("\r|\n|\t", "");
                String high = stock.getString("high").trim().equals("")?"null":stock.getString("high").trim().replaceAll("\r|\n|\t", "");
                String low = stock.getString("low").trim().equals("")?"null":stock.getString("low").trim().replaceAll("\r|\n|\t", "");
                String amplitude = stock.getString("amplitude").trim().equals("")?"null":stock.getString("amplitude").trim().replaceAll("\r|\n|\t", "");
                String volume = stock.getString("volume").trim().equals("")?"null":stock.getString("volume").trim().replaceAll("\r|\n|\t", "");
                String mktcap = stock.getString("mktcap").trim().equals("")?"null":stock.getString("mktcap").trim().replaceAll("\r|\n|\t", "");
                String pe = stock.getString("pe");
                if (pe == null || pe.equals("")) {
                    pe = "null";
                } else {
                    pe = pe.trim().replaceAll("\r|\n|\t", "");
                }
                String market = stock.getString("market").trim().equals("")?"null":stock.getString("market").trim().replaceAll("\r|\n|\t", "");
                String category_id = stock.getString("category_id");
                if (category_id == null || category_id.equals("")) {
                    category_id = "null";
                } else {
                    category_id = category_id.toString().trim().replaceAll("\r|\n|\t", "");
                }
                StringBuffer sb = new StringBuffer();

                sb.append(name);            sb.append("\t");
                sb.append(cname);           sb.append("\t");
                sb.append(category);        sb.append("\t");
                sb.append(symbol);          sb.append("\t");
                sb.append(price);           sb.append("\t");
                sb.append(diff);            sb.append("\t");
                sb.append(chg);             sb.append("\t");
                sb.append(preclose);        sb.append("\t");
                sb.append(open);            sb.append("\t");
                sb.append(high);            sb.append("\t");
                sb.append(low);             sb.append("\t");
                sb.append(amplitude);       sb.append("\t");
                sb.append(volume);          sb.append("\t");
                sb.append(mktcap);          sb.append("\t");
                sb.append(pe);              sb.append("\t");
                sb.append(market);          sb.append("\t");
                sb.append(category_id);
                String result = sb.toString();
                context.write(new Text(result), new Text());
            }
}

這里解釋一下每個(gè)字段的含義

字段 含義
name # 英文名稱(chēng)
cname # 中文名稱(chēng)
category # 行業(yè)板塊
symbol # 代碼
price # 最新價(jià)
diff # 漲跌額
chg # 漲跌幅
preclose # 昨收
open # 今開(kāi)盤(pán)
high # 最高價(jià)
low # 最低價(jià)
amplitude # 振幅
volume # 成交量
mktcap # 市值(億)
pe # 市盈率
market # 上市地
category_id # 板塊ID

main 方法:

    public static void main(String[] args) throws IOException,ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
            job.setJobName("QingXiStock");
            job.setJarByClass(QingXiStock.class);

            job.setMapperClass(doMapper.class);
            //job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            Path in = new Path("hdfs://localhost:9000//mystock/in");
            Path out = new Path("hdfs://localhost:9000//mystock/out");
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

清洗結(jié)束后,會(huì)在 hdfs 的 /mystock/out 目錄下生成新文件

清洗后的數(shù)據(jù)

導(dǎo)入 Hive ,進(jìn)行分析

數(shù)據(jù)已經(jīng)沒(méi)問(wèn)題了,接下來(lái)可以直接導(dǎo)入 hive 來(lái)進(jìn)行分析

首先進(jìn)入 hive ,新建一個(gè)數(shù)據(jù)庫(kù)

create database sina;

然后新建一個(gè)外部表 stock ,之所以新建一個(gè)外部表,是因?yàn)橥獠勘聿粫?huì)移動(dòng)數(shù)據(jù),它只是存放元數(shù)據(jù),當(dāng)外部表刪除后,只是刪除了元數(shù)據(jù),而數(shù)據(jù)不會(huì)被刪掉,所以相對(duì)更安全

create external table if not exists stock (
    > name string,
    > cname string,
    > category string,
    > symbol string,
    > price float,
    > diff float,
    > chg float,
    > preclose float,
    > open float,
    > high float,
    > low float,
    > amplitude string,
    > volume bigint,
    > mktcap bigint,
    > pe float,
    > market string,
    > category_id int
    > ) row format delimited
    > fields terminated by '\t'
    > lines terminated by '\n'
    > stored as textfile
    > location '/mystock/out/';

接下來(lái)查看一下前十條數(shù)據(jù)

select * form stock limit 10;

前十條數(shù)據(jù)

查看一下各個(gè)板塊包含的股票數(shù)量

select category,count(category) as num from stock group by category order by num desc;
各個(gè)股票板塊的股票數(shù)量

查看市值最高的十支股票

select cname,mktcap from stock order by mktcap desc limit 10;
市值最高的十支股票

查看各個(gè)上市地區(qū)的股票數(shù)量

select market,count(market) as num from stock group by market order by num desc;
各個(gè)上市地區(qū)的股票數(shù)量

查看成交量最高的十支股票

select cname,symbol,volume from stock order by volume desc limit 10;
成交量最高的十支股票

查看市盈率最高的十支股票

select cname,pe,symbol from stock order by pe desc limit 10;
市盈率最高的十支股票

漲跌幅最高的十支股票

select cname,symbol,chg from stock order by chg desc limit 10;
漲跌幅最高的十支股票

查看數(shù)據(jù)的總量一共有多少

select count(*) from stock;

8141條

Sqoop 導(dǎo)出至 MySQL

hive 分析完之后,接下來(lái)使用 sqoop 將 hive 中的數(shù)據(jù)導(dǎo)出到 mysql ,因?yàn)槲覀兊臄?shù)據(jù)量只有 8000 多條,所以這里直接導(dǎo)出整個(gè)表。

首先在 mysql 里面新建一個(gè)數(shù)據(jù)庫(kù) sina

create database if not exists sina default charset utf8 collate utf8_general_ci;

進(jìn)入數(shù)據(jù)庫(kù)后新建一個(gè)表

create table if not exists stock (
    -> name varchar(100),
    -> cname varchar(100),
    -> category varchar(100),
    -> symbol varchar(50),
    -> price float,
    -> diff float,
    -> chg float,
    -> preclose float,
    -> open float,
    -> high float,
    -> low float,
    -> amplitude varchar(50),
    -> volume bigint,
    -> mktcap bigint,
    -> pe float,
    -> market varchar(10),
    -> category_id int(10)
    -> ) default charset=utf8;

然后執(zhí)行 sqoop 導(dǎo)出命令

sqoop export \
> --connect jdbc:mysql://localhost:3306/sina?characterEncoding=UTF-8 \
> --username root \
> --password strongs \
> --table stock \
> --export-dir /user/hive/warehouse/sina.db/stock/part-r-00000 \
> --input-fields-terminated-by '\t'

導(dǎo)出過(guò)程執(zhí)行完畢后,查看一下 mysql 中的數(shù)據(jù)

select * from stock limit 10;

mysql 中的數(shù)據(jù)

查看一下數(shù)據(jù)總量,可以看到和 hive 中的一樣,是 8141 條


8141條

數(shù)據(jù)可視化

可視化過(guò)程我之前寫(xiě)過(guò)一篇博客 利用ECharts可視化mysql數(shù)據(jù)庫(kù)中的數(shù)據(jù) 和這次的道理差不多,只不過(guò)這次的 echarts 用到了 ajax 動(dòng)態(tài)加載,而且所有的請(qǐng)求都?xì)w到了一個(gè) servlet ,所以這里只貼一下 echarts 和 servlet 的代碼

工程目錄

可視化工程目錄

ServletBase.java 是一個(gè) servlet 抽象類(lèi),我們的 servlet 需要繼承它,然后實(shí)現(xiàn)里面的方法

package my.servlet;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.Method;

/**
 * Created by teaGod on 2017/12/7.
 */
@WebServlet(name = "ServletBase")
public abstract class ServletBase extends HttpServlet {
    private static final long serialVersionUID = 1L;

    @Override
    public void service(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        String methodName = request.getParameter("cmd");
        if(methodName==null || methodName.trim().equals("")){
            methodName="execute";
        }
        try{
            Method method = this.getClass()
                    .getMethod(methodName,
                            HttpServletRequest.class,
                            HttpServletResponse.class);
            method.invoke(this,request,response);
        }catch(Exception e){
            throw new RuntimeException(e);
        }
    }

    public abstract void execute(HttpServletRequest request, HttpServletResponse response)
            throws Exception;
}

ServletStock.java 根據(jù)各個(gè) jsp 傳過(guò)來(lái)的 opt 參數(shù)來(lái)確定執(zhí)行哪些邏輯

package my.servlet;

import my.entity.*;
import my.manager.StockManager;
import net.sf.json.JSONArray;
import org.apache.log4j.Logger;

import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;

/**
 * Created by teaGod on 2017/12/7.
 */
@WebServlet(name = "ServletStock", value = "/ServletStock")
public class ServletStock extends ServletBase {
    private static final long serialVersionUID = 1L;

    public static Logger logger = Logger.getLogger(ServletStock.class);

    @Override
    public void execute(HttpServletRequest request, HttpServletResponse response) throws Exception {

        response.setContentType("text/html;charset=utf-8");

        String opt = request.getParameter("opt");
        StockManager stockManager = new StockManager();

        if (opt.equals("marketCount")) {
            List<MarketCount> list;
            list = stockManager.getMarketCountList();
            writeJson(list, response);
        } else if (opt.equals("categoryCount")) {
            List<CategoryCount> list;
            list = stockManager.getCategoryCountList();
            writeJson(list, response);
        } else if (opt.equals("cnameMktnum")) {
            List<CnameMktnum> list;
            list = stockManager.getCnameMktnumList();
            writeJson(list, response);
        } else if (opt.equals("marketSumchg")) {
            List<MarketSumchg> list;
            list = stockManager.getMarketSumchgList();
            writeJson(list, response);
        } else if (opt.equals("cnameVolume")) {
            List<CnameVolume> list;
            list = stockManager.getCnameVolumeList();
            writeJson(list, response);
        } else if (opt.equals("cnameHigh")) {
              List<CnameHigh> list;
              list = stockManager.getCnameHighList();
              writeJson(list, response);
         }
    }

    private void writeJson(List list, HttpServletResponse response) throws IOException {
        JSONArray jsonArray = JSONArray.fromObject(list);
        //System.err.println(jsonArray);
        PrintWriter out = response.getWriter();
        out.print(jsonArray);
        out.flush();
        out.close();
    }
}

不同的上市地所占比例,可以看到選擇紐約證券交易所上市的股票已經(jīng)占了一半以上

不同的上市地所占比例

echarts 代碼

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>不同上市地所占比例</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">

<%@include file="../basic/cssjs.jsp"%>

<!-- 引入Jquery包 -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>

<!-- 引入Echarts3包 -->
<script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp"%>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">

            <div class="span3">
                <!-- left navbar start -->
                <%@include file="../basic/left.jsp"%>
                <!-- left navbar end -->
            </div>

            <!-- right content start -->
            <div class="span9">
                <div class="session well">餅狀圖可用來(lái)展現(xiàn)相對(duì)簡(jiǎn)單的比例構(gòu)成關(guān)系,讓觀者能從中熟悉某個(gè)項(xiàng)目與整個(gè)數(shù)據(jù)組間所存在的比例關(guān)系。</div>
                <div class="session">
                    <div id="main" style="width: 100%; height: 600px;"></div>
                </div>

            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">
// 基于準(zhǔn)備好的dom,初始化echarts實(shí)例
var myChart = echarts.init(document.getElementById('main'));
// 指定圖表的配置項(xiàng)和數(shù)據(jù)
myChart.setOption({
    title : {
        text: '不同上市地所占比例',
        x:'center'
    },
    tooltip : {
        trigger: 'item',
        formatter: '{a} <br/> : {c} (u0z1t8os%)'
    },
    legend: {
        orient: 'vertical',
        y:'100',
         data: []
    },
    toolbox: {
        show : true,
        feature : {
            mark : {show: true},
            dataView : {show: true, readOnly: false},
            restore : {show: true},
            saveAsImage : {show: true}
        }
    },
    series : [
        {
            name: '上市地之間比例',
            type: 'pie',
            radius : '55%',
            center: ['50%', '60%'],
            //data:[{}],
            data:[],
            itemStyle: {
                emphasis: {
                    shadowBlur: 10,
                    shadowOffsetX: 0,
                    shadowColor: 'rgba(0, 0, 0, 0.5)'
                }
            }
        }
    ]
});

//異步加載數(shù)據(jù)
var mapOnlyKey = [];
var mapKeyValue = [];
var mapOnlyValue = [];

    var info = {"opt":"marketCount"};
    $.post("../ServletStock", info, function(data){
            mapOnlyKey.length=0;
            mapKeyValue.length=0;
            mapOnlyValue.length=0;

            for(var i=0; i < data.length; i++){
                mapOnlyKey.push(data[i].market);
                mapKeyValue.push({"value":Math.round(data[i].count), "name": data[i].market});
                mapOnlyValue.push( data[i].count );
            }
            
              //console.log(mapOnlyKey);
                //console.log(mapKeyValue);
               // console.log(mapOnlyValue);
                //return false; 
                // 填入數(shù)據(jù)
                myChart.setOption({
                    legend: {
                        //類(lèi)別
                        data: mapOnlyKey
                    },
                    series: [{
                        // 根據(jù)名字對(duì)應(yīng)到相應(yīng)的系列
                        name: '數(shù)量',
                        data:mapKeyValue
                    }]
                });
            // 使用剛指定的配置項(xiàng)和數(shù)據(jù)顯示圖表。
            }, 'json');
</script>

不同上市地的漲跌幅統(tǒng)計(jì),可以看到紐約證券交易所的漲跌幅總量最大,但這也和在這里上市的股票最多有關(guān)系,不過(guò)每個(gè)上市地都是跌幅大于漲幅的,所以你可以看到股票并不是很容易就可以玩的溜的

不同上市地的漲跌幅統(tǒng)計(jì)

echarts 代碼:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>不同上市地區(qū)漲跌幅統(tǒng)計(jì)</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">

<%@include file="../basic/cssjs.jsp"%>

<!-- 引入Jquery包 -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>

<!-- 引入Echarts3包 -->
<script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp"%>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">

            <div class="span3">
                <!-- left navbar start -->
                <%@include file="../basic/left.jsp"%>
                <!-- left navbar end -->
            </div>

            <!-- right content start -->
            <div class="span9">
                <div class="session well">折線(xiàn)圖適合用來(lái)展現(xiàn)某個(gè)項(xiàng)目的發(fā)展趨勢(shì),或展現(xiàn)并比較多個(gè)項(xiàng)目的發(fā)展趨勢(shì)。</div>
                <div class="session">
                    <div id="main" style="width: 100%; height: 600px;"></div>
                </div>

            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">
    var data = [];
    var markLineData = [];
    for ( var i = 1; i < data.length; i++) {
        markLineData.push([ {
            xAxis : i - 1,
            yAxis : data[i - 1],
            value : (data[i] + data[i - 1]).toFixed(2)
        }, {
            xAxis : i,
            yAxis : data[i]
        } ]);
    }
    option = {
        title : {
            text : '不同上市地漲跌幅統(tǒng)計(jì)',
            subtext : '單位:%'
        },
        tooltip : {
            trigger : 'axis'
        },
        toolbox: {
            show : true,
            feature : {
                mark : {show: true},
                dataView : {show: true, readOnly: false},
                magicType : {show: true, type: ['bar']},
                restore : {show: true},
                saveAsImage : {show: true}
            }
        },
        xAxis : {
            name: '上市地',
            data : []
        },
        yAxis : {},
        series : [ {
            type : 'line',
            data : data,
            markPoint : {
                data : [ {
                    type : 'max',
                    name : '最大值'
                }, {
                    type : 'min',
                    name : '最小值'
                } ]
            },
            markLine : {
                smooth : true,
                effect : {
                    show : true
                },
                distance : 10,
                label : {
                    normal : {
                        position : 'middle'
                    }
                },
                symbol : [ 'none', 'none' ],
                data : markLineData
            }
        } ]
    };
    var myChart = echarts.init(document.getElementById('main'));
    myChart.setOption(option);
    var mapOnlyKey = [];
    var mapKeyValue = [];
    var mapOnlyValue = [];
    var info = {
        "opt" : "marketSumchg"
    };
    $.post("../ServletStock", info, function(data) {
        mapOnlyKey.length = 0;
        mapKeyValue.length = 0;
        mapOnlyValue.length = 0;
        for ( var i = 0; i < data.length; i++) {
            mapOnlyKey.push(data[i].market);
            mapKeyValue.push({
                "value" : Math.round(data[i].sumchg),
                "name" : data[i].market
            });
            mapOnlyValue.push(data[i].sumchg);
        }
        myChart.setOption({
            legend : {
                data : mapOnlyKey
            },
            xAxis : [ {
                data : mapOnlyKey
            } ],
            series : [ {
                name : '百分比',
                data : mapKeyValue
            } ]
        });
    }, 'json');
</script>

最受歡迎的十大股票板塊,可以看到股權(quán)是排在第一的,排在第二的是銀行,軟件排在第五

最受歡迎的十大股票板塊

echarts 代碼:

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="utf-8">
<title>最受歡迎的十大股票板塊</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">

<%@include file="../basic/cssjs.jsp"%>

<!-- 引入Jquery包 -->
<script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>

<!-- 引入Echarts3包 -->
<script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp"%>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">

            <div class="span3">
                <!-- left navbar start -->
                <%@include file="../basic/left.jsp"%>
                <!-- left navbar end -->
            </div>

            <!-- right content start -->
            <div class="span9">
                <div class="session well">柱狀圖以坐標(biāo)軸上的長(zhǎng)方形元素作為變量,以此來(lái)達(dá)到展現(xiàn)并比較數(shù)據(jù)情況的目的</div>
                <div class="session">
                    <div id="main" style="width: 100%; height: 600px;"></div>
                </div>

            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">
        // 基于準(zhǔn)備好的dom,初始化echarts實(shí)例
        var myChart = echarts.init(document.getElementById('main'));
        // 指定圖表的配置項(xiàng)和數(shù)據(jù)
        myChart.setOption({
            title: {
                text: '最受歡迎的十大股票板塊'
            },
            tooltip: {
                show: true
            },
            legend: {
                data:[]
            },
            xAxis : [
                {
                    name: '行業(yè)板塊',
                    type : 'category',
                    data : []
                }
            ],
            yAxis : [
                {
                    name: '市值',
                    type : 'value'
                }
            ],
            series : [
                {
                    name:'數(shù)量',
                    type:'bar',
                    data: []
                }
            ]
        });
        
        // 異步加載數(shù)據(jù)
        var mapOnlyKey = [];
        var mapKeyValue = [];
        var mapOnlyValue = [];
        var info = {"opt": "categoryCount"};
        $.post("../ServletStock", info, function(data){
            mapOnlyKey.length=0;
            mapKeyValue.length=0;
            mapOnlyValue.length=0;
            for(var i=0; i < data.length; i++){
                mapOnlyKey.push( data[i].category);
                mapKeyValue.push({"value":Math.round(data[i].count), "name": data[i].category});
                mapOnlyValue.push( data[i].count );
            }
            console.log(mapOnlyKey);
            console.log(mapKeyValue);
            console.log(mapOnlyValue);
            
            // 填入數(shù)據(jù)
            myChart.setOption({
                legend: {
                    //類(lèi)別
                    data: mapOnlyKey
                },
                xAxis : [
                    {
                        data : mapOnlyKey
                    }
                ],
                series: [{
                    // 根據(jù)名字對(duì)應(yīng)到相應(yīng)的系列
                    name: '數(shù)量',
                    data: mapKeyValue
                }]
            });
        // 使用剛指定的配置項(xiàng)和數(shù)據(jù)顯示圖表。
        }, 'json');
        
    </script>

市值最高的十支股票,可以看到蘋(píng)果公司以超過(guò) 8000 億的數(shù)量領(lǐng)先,有個(gè)股票的名字 “HSBC Holdings, plc. Perpetual Sub Cap Secs” 比較長(zhǎng),把別的股票名字都蓋住了,阿里巴巴排在第十

市值最高的十支股票

成交量最高的十支股票


成交量最高的十支股票

到這里離線(xiàn)分析的流程就告一段落了,hadoop 適用于對(duì)時(shí)延要求不高的離線(xiàn)處理,而當(dāng)我們需要實(shí)時(shí)處理的時(shí)候,就需要用到 Storm 或者 Spark Streaming 了

實(shí)時(shí)流程

再放一遍前面的實(shí)時(shí)處理流程圖


實(shí)時(shí)處理

這次我們要實(shí)時(shí)爬取各只股票的最高價(jià),然后以 echarts 動(dòng)態(tài)圖的形式展現(xiàn)出來(lái),所以這次需要修改一下爬蟲(chóng)

爬取最高價(jià)

首先我們定義一個(gè)實(shí)體類(lèi),來(lái)封裝各只股票的名稱(chēng)與最高價(jià)

CnameHigh.java:

package my.webmagic;

public class CnameHigh {
    private String cname;
    private Float high;
    public CnameHigh() {
        super();
    }
    public CnameHigh(String cname, Float high) {
        super();
        this.cname = cname;
        this.high = high;
    }
    public String getCname() {
        return cname;
    }
    public void setCname(String cname) {
        this.cname = cname;
    }
    public Float getHigh() {
        return high;
    }
    public void setHigh(Float high) {
        this.high = high;
    }
    @Override
    public String toString() {
        return cname + ":" + high;
    }
    
}

這里我們?cè)O(shè)置了爬蟲(chóng)每爬一次就睡 1500 毫秒,比之前的爬蟲(chóng)少睡了 500 毫秒,因?yàn)槲以O(shè)置的 echarts 動(dòng)態(tài)圖是每 2000 毫秒刷新一次,所以至少要保證數(shù)據(jù)更新的速度要比顯示的速度快。

    private Site site = Site.me().setDomain("stock.finance.sina.com.cn").setSleepTime(1500)
            .setUserAgent("Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:39.0) Gecko/20100101 Firefox/39.0");

在 process 方法中修改爬蟲(chóng)的邏輯,這次我們不需要全部的 json 數(shù)據(jù),只需要其中的 cname(股票名稱(chēng)) 和 high(最高價(jià)) 字段,所以需要在爬蟲(chóng)代碼中解析 json 數(shù)據(jù),將這兩個(gè)字段取出來(lái)

public void process(Page page) {
//      page.putField("sixty", regexJson(page.getJson().toString()));
        String stockJson = regexJson(page.getJson().toString());
        if (!stockJson.contains("data") || !stockJson.contains("high")) {
            return;
        }
        JSONObject myjson = JSON.parseObject(stockJson);
        
        JSONArray dataArray = myjson.getJSONArray("data");
        
        ArrayList<CnameHigh> highList = new ArrayList<>();
        
        String cname;
        Float high;
        for (int i = 0; i < dataArray.size(); i++) {
            JSONObject jsonObject = dataArray.getJSONObject(i);
            cname = jsonObject.getString("cname");
            high = jsonObject.getFloatValue("high");
            CnameHigh cnameHigh = new CnameHigh(cname, high);
            highList.add(cnameHigh);
        }
        page.putField("high", highList);
}

在 main 方法里我們修改了 url 里的 num 參數(shù),由原來(lái)的每次爬 60 條數(shù)據(jù)改成每次爬 20 條數(shù)據(jù),這是為了讓爬蟲(chóng)跑的時(shí)間長(zhǎng)一點(diǎn),以使動(dòng)態(tài)圖可以顯示的久一點(diǎn)。此外,我們還修改了輸出目錄,然后我們將原來(lái)的 JsonFilePipeline 改成了 FilePipeline ,這樣可以減少 Spark Streaming 中的實(shí)時(shí)處理代碼量

public static void main(String[] args) {
        String url_init = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
    "/US_CategoryService.getList?num=20&sort=&asc=0&page=1";
        String url_pattern = "http://stock.finance.sina.com.cn/usstock/api/jsonp.php/" + 
            "/US_CategoryService.getList?num=20&sort=&asc=0&page=";
        String output = "/data/edu6/tmp/";
        QueueScheduler scheduler = new QueueScheduler();
        Spider spider = Spider.create(new GetStockHigh())
                .addUrl(url_init)
                .setScheduler(scheduler)
                .addPipeline(new FilePipeline(output))
                .addPipeline(new ConsolePipeline());
        for (int i = 1; i < 418; i++) {
            Request request = new Request();
            request.setUrl(url_pattern + i);
            scheduler.push(request, spider);
        }
        spider.thread(1).run();
}

先讓爬蟲(chóng)跑一下看看得到的數(shù)據(jù)是什么樣子

爬蟲(chóng)下載到的包含股票名稱(chēng)和最高價(jià)的文件

可以看到我們爬到的文件最前面兩行不是我們需要的,所以我們需要在 Spark Streaming 中過(guò)濾掉,然后將剩下的股票名稱(chēng)和最高價(jià)存到數(shù)據(jù)庫(kù)里面,同時(shí)用 echarts 實(shí)時(shí)展示

為了過(guò)濾掉前兩行,我們發(fā)現(xiàn)了一個(gè)規(guī)律,如果以 ":" 將每行文本分割,第一行被分成三段,第二行被分成一段,而其余我們需要的數(shù)據(jù)都被分成了兩段。以這個(gè)規(guī)律,在 Spark Streaming 中編寫(xiě)代碼來(lái)過(guò)濾出我們需要的數(shù)據(jù)。

編寫(xiě) Spark Streaming

在 Spark Streaming 中我們要做的工作就是不斷地讀取 kafka 傳送過(guò)來(lái)的數(shù)據(jù),過(guò)濾后存到 mysql 里面

首先定義 kafka 的配置

val sparkConf = new SparkConf().setAppName("StockMonitor").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint("checkpoint")
val topics = Set("flumesendkafka")
val brokers = "localhost:9092"
val zkQuorum = "localhost:2181"
val kafkaParams = Map[String, String](
        "metadata.broker.list" -> brokers,
        "serializer.class" -> "kafka.serializer.StringEncoder")

然后定義數(shù)據(jù)庫(kù)的連接配置

val db_host = "localhost"
val db_name = "sina"
val db_user = "root"
val db_passwd = "strongs"
val db_connection_str = "jdbc:mysql://" + db_host + ":3306/" + db_name + "?user=" + db_user + "&password=" + db_passwd

然后定義 Dstream

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

最后對(duì) Dstream 執(zhí)行計(jì)算,這里注意一點(diǎn),我們是對(duì)每個(gè) RDD 分區(qū)建立一個(gè)數(shù)據(jù)庫(kù)連接。因?yàn)槿绻麑?duì)每一行建立一個(gè)連接的話(huà),頻繁的新建和關(guān)閉數(shù)據(jù)庫(kù)連接對(duì)系統(tǒng)開(kāi)銷(xiāo)很大,影響實(shí)時(shí)處理的速度;而對(duì)直接每個(gè) RDD 建立一個(gè)連接的話(huà)又會(huì)報(bào)不能序列化的異常。

dstream.foreachRDD {rdd =>
          rdd.foreachPartition {partitionOfRdd =>
               var conn: Connection = DriverManager.getConnection(db_connection_str)
               
                val lines = partitionOfRdd.filter(_.split(':').length == 2)     //過(guò)濾出我們需要的數(shù)據(jù)
                try {
                    lines.foreach ( line => {
                      var strs = line.mkString.split(':')
                      
                      if (strs.length == 2) {
                        var ps: PreparedStatement = null
                        val sql = "insert into cnamehigh (cname, high) values (?, ?)"
                        val cname = strs(0)
                        val high = strs(1).toFloat
                        println("cname:" + cname + ",high:" + high)
                          
                        ps = conn.prepareStatement(sql)
                        ps.setString(1, cname)
                        ps.setFloat(2, high)
                        ps.execute()
                        if (ps != null) {
                            ps.close()
                        }
                      }
                    })
                } catch {
                    case e: Exception => println("MySQL Exception")// todo: handle error
                } finally {
                    if (conn != null) {
                        conn.close()
                    }
                }
            }
}

完整代碼 StockMonitor.scala:

package my.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.DriverManager

object StockMonitor {
    def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("StockMonitor").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.checkpoint("checkpoint")
        val topics = Set("flumesendkafka")
        val brokers = "localhost:9092"
        val zkQuorum = "localhost:2181"
        val kafkaParams = Map[String, String](
            "metadata.broker.list" -> brokers,
            "serializer.class" -> "kafka.serializer.StringEncoder")
        val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
        
        val db_host = "localhost"
        val db_name = "sina"
        val db_user = "root"
        val db_passwd = "strongs"
        val db_connection_str = "jdbc:mysql://" + db_host + ":3306/" + db_name + "?user=" + db_user + "&password=" + db_passwd
        
        dstream.foreachRDD {rdd =>
          rdd.foreachPartition {partitionOfRdd =>
               var conn: Connection = DriverManager.getConnection(db_connection_str)
               
                val lines = partitionOfRdd.filter(_.split(':').length == 2)     //過(guò)濾出我們需要的數(shù)據(jù)
                try {
                    lines.foreach ( line => {
                      var strs = line.mkString.split(':')
                      
                      if (strs.length == 2) {
                        var ps: PreparedStatement = null
                        val sql = "insert into cnamehigh (cname, high) values (?, ?)"
                        val cname = strs(0)
                        val high = strs(1).toFloat
                        println("cname:" + cname + ",high:" + high)
                          
                        ps = conn.prepareStatement(sql)
                        ps.setString(1, cname)
                        ps.setFloat(2, high)
                        ps.execute()
                        if (ps != null) {
                            ps.close()
                        }
                      }
                    })
                } catch {
                    case e: Exception => println("MySQL Exception")// todo: handle error
                } finally {
                    if (conn != null) {
                        conn.close()
                    }
                }
            }
        }
        
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

使用 Kafka 對(duì) SparkStreaming 進(jìn)行測(cè)試

開(kāi)啟 kafka 之前要先開(kāi)啟 zookeeper

/apps/zookeeper/bin/zkServer.sh start

開(kāi)啟 kafka

/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties

然后另開(kāi)一個(gè)終端在 kafka 中新建一個(gè) topic

/apps/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --topic flumesendkafka --partitions 1

查看剛才新建的 topic

/apps/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
剛剛新建的 topic

然后打開(kāi) mysql 切換到 sina 數(shù)據(jù)庫(kù),新建一個(gè)表,這里我們新增了一個(gè)自增的 id 字段,因?yàn)樵谧隹梢暬臅r(shí)候,要一直顯示最新的數(shù)據(jù),這時(shí)我們就可以按照 id 來(lái)降序查找,以保證每次查到的數(shù)據(jù)都不同

create table cnamehigh (id int not null auto_increment, cname varchar(100), high float, primary key(id));

查看一下數(shù)據(jù)庫(kù),可以看到里面還沒(méi)有數(shù)據(jù)

select * from cnamehigh;

查看數(shù)據(jù)庫(kù)

我們先用控制臺(tái)來(lái)當(dāng) kafka 的 producer,模擬輸入一些數(shù)據(jù),看看能不能正確的插入到 mysql 中

運(yùn)行 spark streaming

另開(kāi)一個(gè)終端,新建一個(gè) console producer

/apps/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flumesendkafka

然后輸入一些模擬數(shù)據(jù)

其中只有第一行是正確的數(shù)據(jù)

在 spark streaming 的控制臺(tái)中看到,只輸出了第一行數(shù)據(jù)

spark streaming 輸出結(jié)果

而且在 mysql 中也可以看到新增進(jìn)來(lái)的數(shù)據(jù)

mysql 中新增的數(shù)據(jù)

好了,因?yàn)閯偛诺谋聿迦肓诵碌臄?shù)據(jù),所以我們把剛才的表刪掉,重新創(chuàng)建一個(gè)相同的表備用

create table cnamehigh1 like cnamehigh;
drop table cnamehigh;
alter table cnamehigh1 rename cnamehigh;

接下來(lái)我們配置 flume

Flume 配置

/data/edu1 目錄下新建一個(gè) flume 配置文件

vim /data/edu1/spooldir_mem_kafka.conf

將下列配置填寫(xiě)進(jìn)去

agent1.sources = src
agent1.channels = ch
agent1.sinks = des

agent1.sources.src.type = spooldir
agent1.sources.src.restart = true
agent1.sources.src.spoolDir = /data/edu6/tmp/stock.finance.sina.com.cn

agent1.channels.ch.type = memory

agent1.sinks.des.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.des.brokerList = localhost:9092
agent1.sinks.des.topic = flumesendkafka
agent1.sinks.des.batchSize = 1
agent1.sinks.des.requiredAcks = 1

agent1.sources.src.channels = ch
agent1.sinks.des.channel = ch

這里我們?cè)O(shè)置 flume 監(jiān)控的目錄為新爬蟲(chóng)的輸出目錄
然后設(shè)置 batchSize = 1 是為了讓數(shù)據(jù)庫(kù)更新的及時(shí)一點(diǎn),以便我們可以觀測(cè)到動(dòng)態(tài)圖的變化

最后一點(diǎn)工作就是 echarts 動(dòng)態(tài)圖的完成

echarts 動(dòng)態(tài)圖

可視化的部分已經(jīng)花了很大的篇幅講過(guò)了,這里就不啰嗦了,直接貼上 echarts 動(dòng)態(tài)圖的代碼

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <meta charset="utf-8">
    <title>股票最高價(jià)實(shí)時(shí)統(tǒng)計(jì)</title>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    
    <%@include file="../basic/cssjs.jsp" %>
    
     <!-- 引入Jquery包 -->
    <script type="text/javascript" src="../js/jquery-2.1.4.min.js"></script>
    
    <!-- 引入Echarts3包 -->
    <script type="text/javascript" src="../js/echarts.min.js"></script>

</head>

<body>

    <%@include file="../basic/top.jsp" %>

    <!-- content start -->
    <div class="container-fluid">
        <div class="row-fluid">
        
            
            
            <!-- right content start -->
            <div class="span12">
                <div class="session">
                    <div id="main" style="width: 100%;height:600px;"></div>
                </div>
            </div>
            <!-- right content end -->
        </div>
    </div>
    <!-- content end -->
</body>
</html>
<script type="text/javascript">

    var data = [];
    var markLineData = [];
    for ( var i = 1; i < data.length; i++) {
        markLineData.push([ {
            xAxis : i - 1,
            yAxis : data[i - 1],
            value : (data[i] + data[i - 1]).toFixed(2)
        }, {
            xAxis : i,
            yAxis : data[i]
        } ]);
    }
    option = {
        title : {
            text : '股票最高價(jià)實(shí)時(shí)統(tǒng)計(jì)',
        },
        tooltip : {
            trigger : 'axis'
        },
        xAxis : {
            name: '股票名稱(chēng)',
            data : (function (){
                var now = new Date();
                var res = [];
                var len = 10;
                while (len--) {
                    res.push("");
                }
                return res;
            })()
        },
        yAxis: [
                {
                    type: 'value',
                    scale: true,
                    name: '最高價(jià)',
                    max: 500,
                    min: 0,
                    boundaryGap: [0.2, 0.2]
                }
            ],
        dataZoom: [
                   {
                       id: 'dataZoomX',
                       type: 'slider',
                       xAxisIndex: [0],
                       filterMode: 'filter'
                   },
               ],
        series : [ {
            type : 'line',
            data : (function (){
                var res = [];
                var len = 0;
                while (len < 10) {
                    res.push(0);
                    len++;
                }
                return res;
            })(),
            markPoint : {
                data : [ {
                    type : 'max',
                    name : '最大值'
                }, {
                    type : 'min',
                    name : '最小值'
                } ]
            },
            markLine : {
                smooth : true,
                effect : {
                    show : true
                },
                distance : 10,
                label : {
                    normal : {
                        position : 'middle'
                    }
                },
                symbol : [ 'none', 'none' ],
                data : markLineData
            }
        } ]
    };
    setInterval(function () {//實(shí)現(xiàn)定時(shí)訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)添加地方1
    var myChart = echarts.init(document.getElementById('main'));
    myChart.setOption(option);
    var mapOnlyKey = [];
    var mapKeyValue = [];
    var mapOnlyValue = [];
    var info = {
        "opt" : "cnameHigh"
    };
    $.post("../ServletStock", info, function(data) {
        mapOnlyKey.length = 0;
        mapKeyValue.length = 0;
        mapOnlyValue.length = 0;
        
        
        for ( var i = 0; i < data.length; i++) {
            mapOnlyKey.push(data[i].cname);
            mapKeyValue.push({
                "value" : Math.round(data[i].high),
                "name" : data[i].cname
            });
            mapOnlyValue.push(data[i].high);
        }
        
        var data1 = option.series[0].data;
        data1.shift();
        data1.push(mapOnlyValue.shift());
        
        option.xAxis.data.shift();
        option.xAxis.data.push(mapOnlyKey.shift());
        
        myChart.setOption(option);
    
    }, 'json')
    }, 2000);//實(shí)現(xiàn)定時(shí)訪(fǎng)問(wèn)數(shù)據(jù)庫(kù)添加地方2
</script>

不過(guò)有一點(diǎn)需要注意, dao 層的 sql 我是這么寫(xiě)的,以保證每次查到的數(shù)據(jù)都不同

select cname,high from cnamehigh order by id desc limit 1;

但是因?yàn)閿?shù)據(jù)庫(kù)更新的比較快,所以我們每查一次可能 id 已經(jīng)漲了幾十上百了,所以嚴(yán)格來(lái)說(shuō)這也不太算實(shí)時(shí),不過(guò)道理還是一樣的

所有準(zhǔn)備工作都做完后,最后就是讓工程跑起來(lái),終于到了激動(dòng)人心的時(shí)刻!

工程執(zhí)行的順序依次為:開(kāi)啟可視化=>開(kāi)啟kafka=>開(kāi)啟spark streaming=>開(kāi)啟flume=>開(kāi)啟爬蟲(chóng)程序

開(kāi)啟可視化

開(kāi)啟可視化

可以看到動(dòng)態(tài)圖還沒(méi)有變化

動(dòng)態(tài)圖

開(kāi)啟 kafka

/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties

開(kāi)啟 spark streaming

開(kāi)啟 spark streaming

開(kāi)啟 flume

flume-ng agent -c /data/edu1/ -f /data/edu1/spooldir_mem_kafka.conf -n agent1 -Dflume.root.logger=DEBUG,console

最后開(kāi)啟爬蟲(chóng)

開(kāi)啟爬蟲(chóng)

實(shí)時(shí)的動(dòng)態(tài)圖

股票最高價(jià)實(shí)時(shí)統(tǒng)計(jì).gif

總結(jié)

經(jīng)歷了四個(gè)月的大數(shù)據(jù)學(xué)習(xí),我學(xué)到了很多有趣的東西,其中既有對(duì)已有知識(shí)的鞏固,也領(lǐng)略到了大數(shù)據(jù)這個(gè)新興行業(yè)的魅力。感謝各位老師的悉心指導(dǎo),還有各位小伙伴的互相交流,希望大家一直保持著旺盛的好奇心與求知欲,永遠(yuǎn)年輕。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容