文章同時(shí)發(fā)布在個(gè)人博客:基于SQL的實(shí)時(shí)股票分析

概述
這次分享的是在給某券商做實(shí)時(shí)流處理案例,基于我們公司的StreamSQL流處理功能和他們的業(yè)務(wù)數(shù)據(jù),做的股票實(shí)時(shí)分析功能。
主要功能,采集DBF實(shí)時(shí)股票交易數(shù)據(jù)并寫入消息隊(duì)列,使用StreamSQL組件,實(shí)時(shí)接受消息隊(duì)列中數(shù)據(jù),并進(jìn)行統(tǒng)計(jì)分析。
做的流處理功能有:
- 保存實(shí)時(shí)明細(xì)數(shù)據(jù),每天4個(gè)小時(shí)的時(shí)間,約為500-600萬條。
- 計(jì)算當(dāng)前成交額排名靠前的股票(實(shí)際效果跟同花順看到的成交額一致)
- 實(shí)時(shí)計(jì)算5分鐘內(nèi),成交額靠前股票

</a> 流程架構(gòu)圖[/caption]
由于時(shí)間限制,并沒有做更深入的功能。其實(shí)結(jié)合券商其他數(shù)據(jù)和實(shí)時(shí)數(shù)據(jù),可以做很多有價(jià)值的實(shí)時(shí)分析(例如:離線數(shù)據(jù)模型訓(xùn)練,實(shí)時(shí)放量股票和模型數(shù)據(jù)結(jié)合分析做股票推薦),給券商產(chǎn)品提供基礎(chǔ)數(shù)據(jù)支撐。
Stream SQL介紹
Transwarp Stream是星環(huán)專為企業(yè)級(jí)用戶打造的流計(jì)算引擎,主要應(yīng)用于實(shí)時(shí)性較強(qiáng)的應(yīng)用場(chǎng)景。比如,金融行業(yè)需要對(duì)市場(chǎng)波動(dòng)進(jìn)行實(shí)時(shí)預(yù)警;交通行業(yè)需要將卡口數(shù)據(jù)實(shí)時(shí)入庫,并在線使用圖像識(shí)別鑒別套牌車進(jìn)行預(yù)警等;銀行業(yè)務(wù)需要在線分析業(yè)務(wù),及時(shí)鑒別欺詐等違規(guī)行為;采用復(fù)雜物聯(lián)網(wǎng)的行業(yè),如機(jī)場(chǎng)、風(fēng)電等,需要將大量傳感器數(shù)據(jù)進(jìn)行實(shí)時(shí)分析和數(shù)據(jù)挖掘。
企業(yè)級(jí)用戶往往對(duì)流處理產(chǎn)品在實(shí)時(shí)性、吞吐量、高可用性、易用性、安全性和穩(wěn)定性等方面有著極其苛刻的要求。星環(huán)憑借自身強(qiáng)大的技術(shù)實(shí)力,以及國內(nèi)最多最復(fù)雜的流處理案例經(jīng)驗(yàn),開發(fā)出滿足這些苛刻要求的Transwarp Stream流計(jì)算引擎:

更多StreamSQL產(chǎn)品的介紹,可以參考星環(huán)科技Transwarp Stream:業(yè)界SQL支持最強(qiáng) 可實(shí)時(shí)數(shù)據(jù)挖掘的流計(jì)算引擎
數(shù)據(jù)采集
股票實(shí)時(shí)數(shù)據(jù)交易數(shù)據(jù)一般會(huì)存入DBF文件,這個(gè)格式證券行業(yè)已經(jīng)沿用20多年。
數(shù)據(jù)采集部分,包含功能:
- 定時(shí)解析dbf文件(判斷dbf文件是否變化)
- 寫入kafka消息隊(duì)列,為后續(xù)streamSQL提供數(shù)據(jù)
注意,需要引入解析dbf格式的jar包,dbf.jar
部分代碼如下:
InputStream fis = null;
// 讀取文件的輸入流
fis = new FileInputStream(path);
// 根據(jù)輸入流初始化一個(gè)DBFReader實(shí)例,用來讀取DBF文件信息
DBFReader reader = new DBFReader(fis);
reader.setCharactersetName("gbk");
// 調(diào)用DBFReader對(duì)實(shí)例方法得到path文件中字段的個(gè)數(shù)
int fieldsCount = reader.getFieldCount();
// 取出字段信息
// for (int i = 0; i < fieldsCount; i++) {
// DBFField field = reader.getField(i);
// // logger.info(field.getName() + "\t");
// }
Object[] rowValues;
int num = 1;
String time = null;
A: while ((rowValues = reader.nextRecord()) != null) {
//提出DBF中的當(dāng)前時(shí)間,同時(shí)判斷文件是否修改
if (num == 1 && rowValues[0].equals("000000")) {
Double t = (Double) rowValues[7];
time = rowValues[1] + "" + t.intValue();
if (now == null) {
now = time;
continue A;
} else if (now.equals(time)) {
break A;
} else {
now = time;
continue A;
}
}
num++;
StringBuffer message = new StringBuffer();
message.append(time + ",");
for (int i = 0; i < rowValues.length; i++) {
message.append(rowValues[i] + ",");
}
logger.info(message.toString());
producer.send(new KeyedMessage<Integer, String>(topic, message.toString()));
}
流處理
流處理部分,使用StreamSQL組件,編寫SQL完成。
明細(xì)保存StreamJOB
由于明細(xì)后續(xù)需要做統(tǒng)計(jì)分析,所以把數(shù)據(jù)存儲(chǔ)在基于內(nèi)存和SSD存儲(chǔ)的列式存儲(chǔ)holodesk組件中,語句
create streamjob holo_detail_stream_job as ("insert into holo_stream_zq_detail select * from stream_demo") JOBPROPERTIES('stream.number.receivers'='4');
5分鐘成交量StreamJOB
基于明細(xì)數(shù)據(jù),實(shí)時(shí)計(jì)算5分鐘內(nèi)成交額
create streamjob holo_count_stream_job as ("insert into hb_stream_holo select concat(TDH_TODATE(created,'yyyyMMddHHmmss','yyyyMMdd'),row_number() OVER(ORDER BY cjl desc)),created,hqzqdm,HQZQJC,cjl from (select max(created) as created,hqzqdm,HQZQJC,sum(HQZJCJ*HQCJBS) as cjl from (select created,hqzqdm,HQZQJC,HQZJCJ,HQCJBS from holo_stream_zq_detail union select created,hqzqdm,HQZQJC,HQZJCJ,HQCJBS from stream_demo)holo_stream_zq_detail where TDH_TODATE(created,'yyyyMMddHHmmss','yyyy-MM-dd HH:mm:ss:SSS')>CAST(sysdate-TO_MINUTE_INTERVAL(5) AS STRING) group by hqzqdm,HQZQJC order by cjl desc limit 20) t") JOBPROPERTIES('stream.number.receivers'='4');
start streamjob holo_count_stream_job;
計(jì)算結(jié)果展示
- 計(jì)算時(shí),只采集了深圳A股和創(chuàng)業(yè)板的數(shù)據(jù),計(jì)算當(dāng)前成交額排名靠前的股票,實(shí)際效果跟同花順看到的成交額一致。
- 實(shí)時(shí)計(jì)算5分鐘內(nèi),成交額靠前股票。這個(gè)在同花順也沒有統(tǒng)計(jì),有一點(diǎn)點(diǎn)的參考價(jià)值,基于這個(gè),還可以做實(shí)時(shí)放量分析。

</a>
<a rel="attachment wp-att-186">
</a>
報(bào)表展示
有了實(shí)時(shí)分析結(jié)果,再結(jié)合報(bào)表工具的股價(jià)圖,做了實(shí)現(xiàn)的展現(xiàn)(圖中數(shù)據(jù),缺失盤高盤底數(shù)據(jù))。
<a rel="attachment wp-att-185">
</a>
總結(jié)
基于StreamSQL對(duì)SQL的完整支持,和實(shí)時(shí)性、吞吐量、高可用性、易用性等特性,讓實(shí)時(shí)分析變的更簡(jiǎn)單。簡(jiǎn)單的實(shí)現(xiàn)數(shù)據(jù)采集工作,根據(jù)需求,隨時(shí)調(diào)整統(tǒng)計(jì)SQL,就可以完成實(shí)時(shí)的分析。
當(dāng)然這次案例,讓我對(duì)股票有了更深的認(rèn)識(shí)。
參考鏈接
更多StreamSQL產(chǎn)品的介紹,可以參考