前段時間開始做股票數(shù)據(jù)分析的業(yè)余項目,希望能提高自己對大型數(shù)據(jù)量的處理能力。目前大致的想法是用python的tushare模塊獲取數(shù)據(jù),用Java的框架做發(fā)布。
1.Tushare模塊的說明
tushare是國內(nèi)金融從業(yè)者@JimmyTu(挖地兔)搜集國內(nèi)各個公開渠道的股票數(shù)據(jù)接口,并把這些接口整理后用python寫的一個模塊。文檔地址為:http://tushare.org/index.html
首先說明一下tushare獲取到的數(shù)據(jù)格式,這里以分筆歷史紀錄交易為例:
import tushare as ts
df = ts.get_tick_data('600848',date='2014-01-09')
df.head(4)
time price change volume amount type
0 15:00:00 6.05 -- 8 4840 賣盤
1 14:59:55 6.05 -- 50 30250 賣盤
2 14:59:35 6.05 -- 20 12100 賣盤
3 14:59:30 6.05 -0.01 165 99825 賣盤
請求得到的結(jié)果是pandas模塊中的一種基本數(shù)據(jù)結(jié)構(gòu)類——DataFrame,這是一種類似二維表的數(shù)據(jù)結(jié)構(gòu)。DataFrame類提供了多種數(shù)據(jù)處理、存儲的方法,其中也包括了將數(shù)據(jù)存入數(shù)據(jù)庫的to_sql方法,更多用法可查閱pandas的官方文檔。
2.數(shù)據(jù)庫設計
目前每日分筆數(shù)據(jù)采用動態(tài)建表的方式保存,即將每天所有股票的分筆數(shù)據(jù)存儲在一張以當天日期命名的表中,并且間隔一定時間周期后再分庫。
3.代碼
以下為一個初步的使用多線程請求歷史分筆記錄的python3代碼
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 2 12:37:43 2017
@author: jerry
"""
import tushare as ts
import time
import queue
import threading
import pandas as ps
from sqlalchemy import create_engine
THREADS_NUM = 25 # 采集線程數(shù)
THREADS_EXITFLAG = 0 # 線程退出標志
TICKS_DATA_DATE = '2017-01-04' # 指定采集日期
MYSQL_ENGINE = 'mysql://root:pwd@ip:port/dbname?charset=utf8'
class GetStockData(threading.Thread):
def __init__(self, threadID, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.q = q
def run(self):
print ('線程%s開始下載' % (self.threadID))
self._process_data()
def _process_data(self):
engine = create_engine(MYSQL_ENGINE)
while not THREADS_EXITFLAG:
if not self.q.empty():
code = self.q.get()
remain_num = self.q.qsize()
tick_data = get_tick(code, TICKS_DATA_DATE)
#根據(jù)當請求的股票當日停牌時,返回的數(shù)據(jù)有三行
if len(tick_data) > 3:
save_to_mysql(TICKS_DATA_DATE, tick_data, engine, code, remain_num)
time.sleep(0.05)
else:
break
def get_stock_basics():
"""
獲取當日股票列表
Return
--------
DataFrame
"""
basics = ts.get_stock_basics()
return basics
def get_tick(stockCode=None, date=None):
"""
根據(jù)股票列表的股票代碼獲取當日/指定日期歷史分筆
Return
--------
DataFrame
"""
tick_data = ''
if date != None and date != '':
tick_data = ts.get_tick_data(stockCode, date)
else:
tick_data = ts.get_today_ticks(stockCode)
if not tick_data.dropna(axis=0, how='any', thresh=None).empty:
tick_data.insert(0, 'code', stockCode) #插入股票代碼字段
return tick_data
def save_to_mysql(tablename=None, data=None, engine=None, code=None, num=None):
"""
保存獲取的數(shù)據(jù)到MySQL數(shù)據(jù)庫中
Return
--------
"""
for i in range(3):
try:
data.to_sql(tablename, engine, if_exists='append')
print('save %s %s' % (code, num))
break
except BaseException as e:
print ('Save Error %s ' % (code))
return
def main():
# reload(sys)
# sys.setdefaultencoding('utf8')
stock_codes = get_stock_basics()
threads = []
try:
"""
根據(jù)股票代碼列表創(chuàng)建隊列
"""
stocks = queue.Queue(len(stock_codes))
for code in stock_codes.index:
code = str(code)
if (len(code) != 6):
code = (6 - len(code)) * '0' + code
stocks.put(code)
"""
創(chuàng)建并運行線程
"""
for n in range(THREADS_NUM):
thread = GetStockData(n, stocks)
thread.start()
threads.append(thread)
while not stocks.empty():
pass
print ('數(shù)據(jù)請求完畢。')
THREADS_EXITFLAG = 1
for t in threads:
t.join()
except BaseException as e:
print ('Error', e)
return
if __name__ == '__main__':
print ('開始請求%s的數(shù)據(jù)' % (TICKS_DATA_DATE))
main()
代碼很簡單,請求當日的股票列表,并將列表中的股票代碼放入隊列中,爾后開啟指定數(shù)量的線程并根據(jù)隊列中的股票代碼請求數(shù)據(jù),請求完畢后將數(shù)據(jù)保存至mysql數(shù)據(jù)庫。
當然,以上只是個初步版本,還需要修改一些紕漏之處,并增加日志等功能。