資深程序員:十行Python代碼教你爬取抖音視頻!

環(huán)境說明

環(huán)境:

python 3.7.1

centos 7.4

pip 10.0.1

部署

[root@localhost ~]# python3.7 --version
Python 3.7.1
[root@localhost ~]#

[root@localhost ~]# pip3 install douyin

有時候因為網(wǎng)絡(luò)原因會安裝失敗,這時重新執(zhí)行上面的命令即可,直到安裝完成。

導(dǎo)入douyin模塊

[root@localhost ~]# python3.7
>>>import douyin
>>>

導(dǎo)入如果報錯的話,可能douyin模塊沒有安裝成功。

下面我們開始爬…爬抖音小視頻和音樂咯

[root@localhost douyin]# python3.7 dou.py

幾分鐘后…我們來看看爬的成果
可以看到視頻配的音樂被存儲成了 mp3 格式的文件,抖音視頻存儲成了 mp4 文件。

嗯…不錯,哈哈。

py腳本

作者說,能爬抖音上所有熱門話題和音樂下的相關(guān)視頻都爬取到,并且將爬到的視頻下載下來,同時還要把視頻所配的音樂也單獨(dú)下載下來,不僅如此,所有視頻的相關(guān)信息如發(fā)布人、點(diǎn)贊數(shù)、評論數(shù)、發(fā)布時間、發(fā)布人、發(fā)布地點(diǎn)等等信息都需要爬取下來,并存儲到 MongoDB 數(shù)據(jù)庫。

import douyin
from douyin.structures import Topic, Music

# 定義視頻下載、音頻下載、MongoDB 存儲的處理器
video_file_handler = douyin.handlers.VideoFileHandler(folder='./videos')
music_file_handler = douyin.handlers.MusicFileHandler(folder='./musics')
#mongo_handler = douyin.handlers.MongoHandler()
# 定義下載器,并將三個處理器當(dāng)做參數(shù)傳遞
#downloader = douyin.downloaders.VideoDownloader([mongo_handler, video_file_handler, music_
file_handler])
downloader = douyin.downloaders.VideoDownloader([video_file_handler, music_file_handler])
# 循環(huán)爬取抖音熱榜信息并下載存儲
for result in douyin.hot.trend():
    for item in result.data:
        # 爬取熱門話題和熱門音樂下面的所有視頻,每個話題或音樂最多爬取 10 個相關(guān)視頻。
        downloader.download(item.videos(max=10))

由于我這里沒有mongodb所以,把這mongodb相關(guān)的配置給注釋掉了。

代碼解讀

本庫依賴的其他庫有:

  • aiohttp:利用它可以完成異步數(shù)據(jù)下載,加快下載速度
  • dateparser:利用它可以完成任意格式日期的轉(zhuǎn)化
  • motor:利用它可以完成異步 MongoDB 存儲,加快存儲速度
  • requests:利用它可以完成最基本的 HTTP 請求模擬
  • tqdm:利用它可以進(jìn)行進(jìn)度條的展示

數(shù)據(jù)結(jié)構(gòu)定義

如果要做一個庫的話,一個很重要的點(diǎn)就是對一些關(guān)鍵的信息進(jìn)行結(jié)構(gòu)化的定義,使用面向?qū)ο蟮乃季S對某些對象進(jìn)行封裝,抖音的爬取也不例外。

在抖音中,其實有很多種對象,比如視頻、音樂、話題、用戶、評論等等,它們之間通過某種關(guān)系聯(lián)系在一起,例如視頻中使用了某個配樂,那么視頻和音樂就存在使用關(guān)系;比如用戶發(fā)布了視頻,那么用戶和視頻就存在發(fā)布關(guān)系,我們可以使用面向?qū)ο蟮乃季S對每個對象進(jìn)行封裝,比如視頻的話,就可以定義成如下結(jié)構(gòu):

class Video(Base):
    def __init__(self, **kwargs):
        """
        init video object
        :param kwargs:
        """
        super().__init__()
        self.id = kwargs.get('id')
        self.desc = kwargs.get('desc')
        self.author = kwargs.get('author')
        self.music = kwargs.get('music')
        self.like_count = kwargs.get('like_count')
        self.comment_count = kwargs.get('comment_count')
        self.share_count = kwargs.get('share_count')
        self.hot_count = kwargs.get('hot_count')
        ...
        self.address = kwargs.get('address')

    def __repr__(self):
        """
        video to str
        :return: str
        """
        return '<Video: <%s, %s>>' % (self.id, self.desc[:10].strip() if self.desc else None)

這里將一些關(guān)鍵的屬性定義成 Video 類的一部分,包括 id 索引、desc 描述、author 發(fā)布人、music 配樂等等,其中 author 和 music 并不是簡單的字符串的形式,它也是單獨(dú)定義的數(shù)據(jù)結(jié)構(gòu),比如 author 就是 User 類型的對象,而 User 的定義又是如下結(jié)構(gòu):

class User(Base):

    def __init__(self, **kwargs):
        """
        init user object
        :param kwargs:
        """
        super().__init__()
        self.id = kwargs.get('id')
        self.gender = kwargs.get('gender')
        self.name = kwargs.get('name')
        self.create_time = kwargs.get('create_time')
        self.birthday = kwargs.get('birthday')
        ...

    def __repr__(self):
        """
        user to str
        :return:
        """
        return '<User: <%s, %s>>' % (self.alias, self.name)

所以說,通過屬性之間的關(guān)聯(lián),我們就可以將不同的對象關(guān)聯(lián)起來,這樣顯得邏輯架構(gòu)清晰,而且我們也不用一個個單獨(dú)維護(hù)字典來存儲了,其實這就和 Scrapy 里面的 Item 的定義是類似的。

請求和重試

實現(xiàn)爬取的過程就不必多說了,這里面其實用到的就是最簡單的抓包技巧,使用 Charles 直接進(jìn)行抓包即可。抓包之后便可以觀察到對應(yīng)的接口請求,然后進(jìn)行模擬即可。

所以問題就來了,難道我要一個接口寫一個請求方法嗎?另外還要配置 Headers、超時時間等等的內(nèi)容,那豈不是太費(fèi)勁了,所以,我們可以將請求的方法進(jìn)行單獨(dú)的封裝,這里我定義了一個 fetch 方法:

def _fetch(url, **kwargs):
    """
    fetch api response
    :param url: fetch url
    :param kwargs: other requests params
    :return: json of response
    """
    response = requests.get(url, **kwargs)
    if response.status_code != 200:
        raise requests.ConnectionError('Expected status code 200, but got {}'.format(response.status_code))
    return response.json()

這個方法留了一個必要參數(shù),即 url,另外其他的配置我留成了 kwargs,也就是可以任意傳遞,傳遞之后,它會依次傳遞給 requests 的請求方法,然后這里還做了異常處理,如果成功請求,即可返回正常的請求結(jié)果。

定義了這個方法,在其他的調(diào)用方法里面我們只需要單獨(dú)調(diào)用這個 fetch 方法即可,而不需要再去關(guān)心異常處理,返回類型了。

好,那么定義好了請求之后,如果出現(xiàn)了請求失敗怎么辦呢?按照常規(guī)的方法,我們可能就會在外面套一層方法,然后記錄調(diào)用 fetch 方法請求失敗的次數(shù),然后重新調(diào)用 fetch 方法進(jìn)行重試,但這里可以告訴大家一個更好用的庫,叫做 retrying,使用它我們可以通過定義一個裝飾器來完成重試的操作。

比如我可以使用 retry 裝飾器這么裝飾 fetch 方法:

from retrying import retry

@retry(stop_max_attempt_number=retry_max_number, wait_random_min=retry_min_random_wait,
           wait_random_max=retry_max_random_wait, retry_on_exception=need_retry)
def _fetch(url, **kwargs):
    pass

這里使用了裝飾器的四個參數(shù):

  • stop_max_attempt_number:最大重試次數(shù),如果重試次數(shù)達(dá)到該次數(shù)則放棄重試
  • wait_random_min:下次重試之前隨機(jī)等待時間的最小值
  • wait_random_max:下次重試之前隨機(jī)等待時間的最大值
  • retry_on_exception:判斷出現(xiàn)了怎樣的異常才重試

這里 retry_on_exception 參數(shù)指定了一個方法,叫做 need_retry,方法定義如下:

def need_retry(exception):
    """
    need to retry
    :param exception:
    :return:
    """
    result = isinstance(exception, (requests.ConnectionError, requests.ReadTimeout))
    if result:
        print('Exception', type(exception), 'occurred, retrying...')
    return result

這里判斷了如果是 requests 的 ConnectionError 和 ReadTimeout 異常的話,就會拋出異常進(jìn)行重試,否則不予重試。

所以,這樣我們就實現(xiàn)了請求的封裝和自動重試,是不是非常 Pythonic?

下載處理器的設(shè)計

為了下載視頻,我們需要設(shè)計一個下載處理器來下載已經(jīng)爬取到的視頻鏈接,所以下載處理器的輸入就是一批批的視頻鏈接,下載器接收到這些鏈接,會將其進(jìn)行下載處理,并將視頻存儲到對應(yīng)的位置,另外也可以完成一些信息存儲操作。

  • 在設(shè)計時,下載處理器的要求有兩個,一個是保證高速的下載,另一個就是可擴(kuò)展性要強(qiáng),下面我們分別來針對這兩個特點(diǎn)進(jìn)行設(shè)計:
    高速下載,為了實現(xiàn)高速的下載,要么可以使用多線程或多進(jìn)程,要么可以用異步下載,很明顯,后者是更有優(yōu)勢的。
  • 擴(kuò)展性強(qiáng),下載處理器要能下載音頻、視頻,另外還可以支持?jǐn)?shù)據(jù)庫等存儲,所以為了解耦合,我們可以將視頻下載、音頻下載、數(shù)據(jù)庫存儲的功能獨(dú)立出來,下載處理器只負(fù)責(zé)視頻鏈接的主要邏輯處理和分配即可。

為了實現(xiàn)高速下載,這里我們可以使用 aiohttp 庫來完成,另外異步下載我們也不能一下子下載太多,不然網(wǎng)絡(luò)波動太大,所以我們可以設(shè)置 batch 式下載,可以避免同時大量的請求和網(wǎng)絡(luò)擁塞,主要的下載函數(shù)如下:

def download(self, inputs):
    """
    download video or video lists
    :param data:
    :return:
    """
    if isinstance(inputs, types.GeneratorType):
        temps = []
        for result in inputs:
            print('Processing', result, '...')
            temps.append(result)
            if len(temps) == self.batch:
                self.process_items(temps)
                temps = []
    else:
        inputs = inputs if isinstance(inputs, list) else [inputs]
        self.process_items(inputs)

這個 download 方法設(shè)計了多種數(shù)據(jù)接收類型,可以接收一個生成器,也可以接收單個或列表形式的視頻對象數(shù)據(jù),接著調(diào)用了 process_items 方法進(jìn)行了異步下載,其方法實現(xiàn)如下:

def process_items(self, objs):
    """
    process items
    :param objs: objs
    :return:
    """
    # define progress bar
    with tqdm(total=len(objs)) as self.bar:
        # init event loop
        loop = asyncio.get_event_loop()
        # get num of batches
        total_step = int(math.ceil(len(objs) / self.batch))
        # for every batch
        for step in range(total_step):
            start, end = step * self.batch, (step + 1) * self.batch
            print('Processing %d-%d of files' % (start + 1, end))
            # get batch of objs
            objs_batch = objs[start: end]
            # define tasks and run loop
            tasks = [asyncio.ensure_future(self.process_item(obj)) for obj in objs_batch]
            for task in tasks:
                task.add_done_callback(self.update_progress)
            loop.run_until_complete(asyncio.wait(tasks))

這里使用了 asyncio 實現(xiàn)了異步處理,并通過對視頻鏈接進(jìn)行分批處理保證了流量的穩(wěn)定性,另外還使用了 tqdm 實現(xiàn)了進(jìn)度條的顯示。

我們可以看到,真正的處理下載的方法是 process_item,這里面會調(diào)用視頻下載、音頻下載、數(shù)據(jù)庫存儲的一些組件來完成處理,由于我們使用了 asyncio 進(jìn)行了異步處理,所以 process_item 也需要是一個支持異步處理的方法,定義如下:

async def process_item(self, obj):
    """
    process item
    :param obj: single obj
    :return:
    """
    if isinstance(obj, Video):
        print('Processing', obj, '...')
        for handler in self.handlers:
            if isinstance(handler, Handler):
                await handler.process(obj)

這里我們可以看到,真正的處理邏輯都在一個個 handler 里面,我們將每個單獨(dú)的功能進(jìn)行了抽離,定義成了一個個 Handler,這樣可以實現(xiàn)良好的解耦合,如果我們要增加和關(guān)閉某些功能,只需要配置不同的 Handler 即可,而不需要去改動代碼,這也是設(shè)計模式的一個解耦思想,類似工廠模式。

Handler 的設(shè)計

剛才我們講了,Handler 就負(fù)責(zé)一個個具體功能的實現(xiàn),比如視頻下載、音頻下載、數(shù)據(jù)存儲等等,所以我們可以將它們定義成不同的 Handler,而視頻下載、音頻下載又都是文件下載,所以又可以利用繼承的思想設(shè)計一個文件下載的 Handler,定義如下:

from os.path import join, exists
from os import makedirs
from douyin.handlers import Handler
from douyin.utils.type import mime_to_ext
import aiohttp

class FileHandler(Handler):

    def __init__(self, folder):
        """
        init save folder
        :param folder:
        """
        super().__init__()
        self.folder = folder
        if not exists(self.folder):
            makedirs(self.folder)

    async def _process(self, obj, **kwargs):
        """
        download to file
        :param url: resource url
        :param name: save name
        :param kwargs:
        :return:
        """
        print('Downloading', obj, '...')
        kwargs.update({'ssl': False})
        kwargs.update({'timeout': 10})
        async with aiohttp.ClientSession() as session:
            async with session.get(obj.play_url, **kwargs) as response:
                if response.status == 200:
                    extension = mime_to_ext(response.headers.get('Content-Type'))
                    full_path = join(self.folder, '%s.%s' % (obj.id, extension))
                    with open(full_path, 'wb') as f:
                        f.write(await response.content.read())
                    print('Downloaded file to', full_path)
                else:
                    print('Cannot download %s, response status %s' % (obj.id, response.status))

    async def process(self, obj, **kwargs):
        """
        process obj
        :param obj:
        :param kwargs:
        :return:
        """
        return await self._process(obj, **kwargs)

這里我們還是使用了 aiohttp,因為在下載處理器中需要 Handler 支持異步操作,這里下載的時候就是直接請求了文件鏈接,然后判斷了文件的類型,并完成了文件保存。

視頻下載的 Handler 只需要繼承當(dāng)前的 FileHandler 即可:

from douyin.handlers import FileHandler
from douyin.structures import Video

class VideoFileHandler(FileHandler):

    async def process(self, obj, **kwargs):
        """
        process video obj
        :param obj:
        :param kwargs:
        :return:
        """
        if isinstance(obj, Video):
            return await self._process(obj, **kwargs)

這里其實就是加了類別判斷,確保數(shù)據(jù)類型的一致性,當(dāng)然音頻下載也是一樣的。

異步 MongoDB 存儲

上面介紹了視頻和音頻處理的 Handler,另外還有一個存儲的 Handler 沒有介紹,那就是 MongoDB 存儲,平常我們可能習(xí)慣使用 PyMongo 來完成存儲,但這里我們?yōu)榱思铀?,需要支持異步操作,所以這里有一個可以實現(xiàn)異步 MongoDB 存儲的庫,叫做 Motor,其實使用的方法差不太多,MongoDB 的連接對象不再是 PyMongo 的 MongoClient 了,而是 Motor 的 AsyncIOMotorClient,其他的配置基本類似。

在存儲時使用的是 update_one 方法并開啟了 upsert 參數(shù),這樣可以做到存在即更新,不存在即插入的功能,保證數(shù)據(jù)的不重復(fù)性。

整個 MongoDB 存儲的 Handler 定義如下:

from douyin.handlers import Handler
from motor.motor_asyncio import AsyncIOMotorClient
from douyin.structures import *

class MongoHandler(Handler):

    def __init__(self, conn_uri=None, db='douyin'):
        """
        init save folder
        :param folder:
        """
        super().__init__()
        if not conn_uri:
            conn_uri = 'localhost'
        self.client = AsyncIOMotorClient(conn_uri)
        self.db = self.client[db]

    async def process(self, obj, **kwargs):
        """
        download to file
        :param url: resource url
        :param name: save name
        :param kwargs:
        :return:
        """
        collection_name = 'default'
        if isinstance(obj, Video):
            collection_name = 'videos'
        elif isinstance(obj, Music):
            collection_name = 'musics'
        collection = self.db[collection_name]
        # save to mongodb
        print('Saving', obj, 'to mongodb...')
        if await collection.update_one({'id': obj.id}, {'$set': obj.json()}, upsert=True):
            print('Saved', obj, 'to mongodb successfully')
        else:
            print('Error occurred while saving', obj)

可以看到我們在類中定義了 AsyncIOMotorClient 對象,并暴露了 conn_uri 連接字符串和 db 數(shù)據(jù)庫名稱,可以在聲明 MongoHandler 類的時候指定 MongoDB 的鏈接地址和數(shù)據(jù)庫名。

同樣的 process 方法,這里使用 await 修飾了 update_one 方法,完成了異步 MongoDB 存儲。

很多小伙伴在學(xué)習(xí)Python的過程中往往因為沒有資料或者沒人指導(dǎo)從而導(dǎo)致自己不想學(xué)下去了,因此我特意準(zhǔn)備了大量的PDF書籍、視頻教程,都免費(fèi)送給大家!不管你是零基礎(chǔ)還是有基礎(chǔ)都可以獲取到自己相對應(yīng)的學(xué)習(xí)禮包!包括Python軟件工具和2019最新入門到實戰(zhàn)教程,(https://url.cn/59RWE1Z)復(fù)制到瀏覽器打開!

好,以上便是 douyin 庫的所有的關(guān)鍵部分介紹,這部分內(nèi)容可以幫助大家理解這個庫的核心部分實現(xiàn),另外可能對設(shè)計模式、面向?qū)ο笏季S以及一些實用庫的使用有一定的幫助。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容