這篇文章主要介紹如何使用Python爬取股票數(shù)據(jù)和實(shí)現(xiàn)數(shù)據(jù)接口。
源碼下載地址:https://github.com/Lonely7th/TsSpiderServer
1.定時(shí)抓取和解析數(shù)據(jù)
這次我們抓取的目標(biāo)是網(wǎng)易財(cái)經(jīng)的股票板塊,我們首先分析一下鏈接http://quotes.money.163.com/trade/lsjysj_603088.html?year=2018&season=1。按照鏈接的格式,我們拼接好股票代碼、年份和季度:
url = "http://quotes.money.163.com/trade/lsjysj_" + key + ".html?year=" + year + "&season=" + season
拼接好鏈接后,使用requests庫(kù)獲取頁(yè)面的內(nèi)容:
requests.get(url)
self.parse_pager(content.content, item["code"])
考慮到網(wǎng)絡(luò)請(qǐng)求可能會(huì)失敗,我們?cè)谡?qǐng)求失敗時(shí)設(shè)置多次重新請(qǐng)求(最多8次),如果多次請(qǐng)求后仍然失敗,則將請(qǐng)求的相關(guān)內(nèi)容存儲(chǔ)到error_logs中:
# 請(qǐng)求失敗后重新請(qǐng)求(最多8次)
max_try = 8
for tries in range(max_try):
try:
content = requests.get(url)
self.parse_pager(content.content, item["code"])
break
except Exception:
if tries < (max_try - 1):
sleep(2)
continue
else:
add_error_logs("crawl_error", "501", key)
獲取到頁(yè)面內(nèi)容后,我們先來(lái)分析頁(yè)面結(jié)構(gòu)(圖1),我們需要的數(shù)據(jù)大概是以這樣的格式存在的:tr標(biāo)簽表示股票某一天的行情,tr標(biāo)簽下的td標(biāo)簽表示當(dāng)前行情的詳細(xì)數(shù)據(jù):
使用BeautifulSoup庫(kù)對(duì)頁(yè)面進(jìn)行解析,soup.select("div.inner_box tr")會(huì)以列表的形勢(shì)返回div.inner_box下的所有tr標(biāo)簽:
soup = bs4.BeautifulSoup(content, "lxml")
parse_list = soup.select("div.inner_box tr")
[x.string for x in item.select("td")]會(huì)將tr標(biāo)簽下的內(nèi)容組合成一個(gè)數(shù)組data,這個(gè)數(shù)組就是我們要抓取的數(shù)據(jù):
data = [x.string for x in item.select("td")]
每次解析頁(yè)面時(shí),我們都會(huì)從數(shù)據(jù)庫(kù)中取出當(dāng)前股票已經(jīng)存在的數(shù)據(jù),用于判斷待插入數(shù)據(jù)是否已經(jīng)存在數(shù)據(jù)庫(kù)中。這樣做可以及時(shí)補(bǔ)全數(shù)據(jù),并且避免數(shù)據(jù)重復(fù)插入。
if price["cur_timer"]not in timer_list:
self.dm.add_tk_item(key, price)
由于股票數(shù)據(jù)是頻繁變動(dòng)的,這就要求我們定時(shí)對(duì)數(shù)據(jù)進(jìn)行更新,這里我們編寫(xiě)一個(gè)定時(shí)器來(lái)實(shí)現(xiàn)定時(shí)更新數(shù)據(jù)的功能:
timer = threading.Timer(time_interval, fun_timer)
timer.start()
我們?cè)O(shè)置每天16點(diǎn)更新數(shù)據(jù):
if (hour =="16" or hour =="20")and minute =="00":
dc = ENDataCrawl()
dc.start_crawl()
sleep(time_interval)
rm = RedisManager()
rm.update_data()
2.存儲(chǔ)數(shù)據(jù)到MongoDB
這里我們使用MongoDB來(lái)存儲(chǔ)數(shù)據(jù),MongoDB作為一個(gè)面向文檔存儲(chǔ)的數(shù)據(jù)庫(kù),操作起來(lái)相對(duì)比較簡(jiǎn)單和容易。在編寫(xiě)代碼之前,我們需要先進(jìn)行安裝 MongoDB安裝教程,此外python操作MongoDB需要用到pymongo庫(kù),命令行下輸入pip install pymongo安裝即可。
安裝完成后,我們開(kāi)始編寫(xiě)MongoDB相關(guān)的代碼,新建DBManager類(lèi)用于管理數(shù)據(jù)庫(kù)相關(guān)操作:
class DBManager:
def __init__(self, table_name):
# 指定端口和地址
self.client = MongoClient(mod_config.get_config("database", "dbhost"), int(mod_config.get_config("database", "dbport")))
# 選擇數(shù)據(jù)庫(kù)
self.db = self.client[mod_config.get_config("database", "dbname")]
self.table = self.db[table_name]
在DBManager類(lèi)中,我們最常用到的有add_tk_item方法,這個(gè)方法會(huì)根據(jù)tk_code(股票代碼),將最新的數(shù)據(jù)插入到price_list中。
def add_tk_item(self, tk_code, price_item):
return self.table.update_one({'code': tk_code}, {"$push": {"price_list": price_item}})*
以及find_by_id方法,這個(gè)方法會(huì)根據(jù)tk_code查詢(xún)相應(yīng)的股票信息。當(dāng)我們需要對(duì)Cursor進(jìn)行長(zhǎng)時(shí)間循環(huán)遍歷時(shí),應(yīng)該將no_cursor_timeout設(shè)置為true。
def find_by_id(self, tk_code, request={}):
if tk_code:
request["code"] = tk_code
return self.table.find_one(request)
else:
# 數(shù)據(jù)量較大時(shí)避免CursorNotFoundException
return self.table.find({}, no_cursor_timeout=True)*
3.緩存數(shù)據(jù)到Redis
為了提升響應(yīng)速度,我們使用Redis對(duì)數(shù)據(jù)進(jìn)行緩存,redis作為一個(gè)key-value存儲(chǔ)系統(tǒng),具有極高的性能。跟之前一樣我們需要先安裝Redis Redis安裝教程,然后為python安裝redis庫(kù),使用pip install redis命令。
接下來(lái)我們創(chuàng)建RedisManager類(lèi)用于管理Redis的相關(guān)操作:
class RedisManager:
def __init__(self):
self.pool = redis.ConnectionPool(host=mod_config.get_config("redis", "redis_host"), port=mod_config.get_config("redis", "redis_port"), decode_responses=True)
self.r = redis.Redis(connection_pool=self.pool)
update_data方法用于將MongoDB的數(shù)據(jù)同步到Redis,每次系統(tǒng)執(zhí)行完爬取業(yè)務(wù)后都會(huì)調(diào)用該方法:
def update_data(self):
# 將mongodb中的數(shù)據(jù)同步到redis中
dm = DBManager("tk_details")
code_list = dm.find_by_id("")
for item in code_list:
try:
code = item["code"][:6]
_result = dm.find_by_id(item["code"])
sorted_result = sorted(_result["price_list"], cmp=cmp_datetime, key=operator.itemgetter("cur_timer"))
self.r.set(code, sorted_result)
except Exception:
add_error_logs("redis_error", "501", item["code"])
continue
4.配置Nginx和數(shù)據(jù)接口
由于我們只有一個(gè)簡(jiǎn)單的數(shù)據(jù)接口,所以選擇使用Nginx,Nginx 作為一個(gè)高性能的 Web 和反向代理服務(wù)器,具有簡(jiǎn)潔高效,占用資源少等優(yōu)點(diǎn)??紤]到很多開(kāi)發(fā)者習(xí)慣在Windows下調(diào)試代碼,我們先在Windows系統(tǒng)中安裝Nginx windows下安裝nginx(Windows下Nginx是以應(yīng)用的形式運(yùn)行的,這可能也是很多人不愿意在Windows下運(yùn)行Nginx的原因)。
配置好Nginx后我們開(kāi)始編寫(xiě)數(shù)據(jù)接口,start_api_tkdata方法會(huì)開(kāi)啟一個(gè)監(jiān)聽(tīng),用于響應(yīng)Nginx的請(qǐng)求:
def start_api_tkdata():
WSGIServer(myapp, bindAddress=(mod_config.get_config("server", "server_host"), int(mod_config.get_config("server", "tk_data_port")))).run()
myapp方法每次收到請(qǐng)求時(shí),都會(huì)對(duì)請(qǐng)求的格式和參數(shù)進(jìn)行校驗(yàn),校驗(yàn)通過(guò)后則從Redis中獲取數(shù)據(jù)以json格式返回。
start_response('200 OK', [('Content-Type', 'text/plain')])
result_json["data"] =str(result).replace("u'", "'")
result_json["tk_code"] =str(list_query[i +1])
return [json.dumps(result_json)]
編寫(xiě)完數(shù)據(jù)接口后,我們?cè)诒緳C(jī)啟動(dòng)Nginx,在瀏覽器中輸入http://127.0.0.1:9002/tkdata?code=600008,可以看到如下結(jié)果(圖2):
到此為止,我們的股票爬蟲(chóng)和數(shù)據(jù)接口就已經(jīng)完成了,我們還可以在現(xiàn)有的基礎(chǔ)上做一些優(yōu)化,例如:
1.爬取數(shù)據(jù)時(shí)使用多線程和多進(jìn)程。
2.添加更多的數(shù)據(jù)接口,添加均線、Macd、Boll等指標(biāo)數(shù)據(jù),這些數(shù)據(jù)可以由收盤(pán)價(jià)計(jì)算得到。
3.添加數(shù)據(jù)檢測(cè)和日志管理模塊,如果你打算將這套系統(tǒng)用在生產(chǎn)環(huán)境中,這些模塊是必須要有的。