python協(xié)程,多協(xié)程,多進(jìn)程,多線程,多進(jìn)程調(diào)用協(xié)程的例子

1.笨栗子就是對(duì)多進(jìn)程中調(diào)用協(xié)程,
pool.apply_async(asyncio.get_event_loop().run_until_complete(Url().save_url_to_redis()), (i,))
以及多進(jìn)程和協(xié)程之間的關(guān)系:

  • pool.apply_async(asyncio.get_event_loop().run_until_complete(Url().save_url_to_redis()), (i,)) # 多進(jìn)程調(diào)用協(xié)程 ,將協(xié)程函數(shù)重復(fù)執(zhí)行三次,
  • 在這里的話就將 url_count=20000 重復(fù)執(zhí)行存入redis 3次,最后就導(dǎo)致總共存入的條數(shù)就是 url_count=20000 的進(jìn)程倍數(shù)
  • 總結(jié): 多進(jìn)程不要將要存入或者寫(xiě)入的數(shù)據(jù)放在函數(shù)里面,要結(jié)合redis 做隊(duì)列來(lái)分發(fā)任務(wù),不然任務(wù)就重復(fù)了
    # 協(xié)程: 也是一樣的,要將參數(shù)放在redis,rpop彈出一個(gè)參數(shù)給一個(gè)協(xié)程或者進(jìn)程去消費(fèi)
# -*- coding: utf-8 -*-
"""
 @Time   : 2020/6/30 15:54 
 @Athor   : LinXiao
 @功能   :
"""
# ------------------------------
import asyncio
import multiprocessing
import threading
import time
import timeit
from pprint import pprint

import aioredis
from loguru import logger
from redis import Redis


class Url():
    def __int__(self):
        self.do_conuts=20
        # self.session_counts=2
        # self.url_count=25

    async def redis_connect(self):
        self.host="127.0.0.1"
        self.port="6379"
        # self.password = "",
        self.db=6
        try:
            self.redis=await aioredis.create_redis_pool(
                (self.host,
                 self.port),
                # password=self.password,
                db=self.db
            )
            logger.info(f"redis connection successfully")
            return True

        except Exception as e:
            logger.error(f"redis connection error: {e.args}")
            raise e

    async def close(self):

        self.redis.close()
        await self.redis.wait_closed()

    async def save_url_to_redis_single(self):  # 單協(xié)程
        stop_flag=False
        await self.redis_connect()
        url_count=63
        while True:
            for i in range(1, url_count + 1):
                url=f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
                await self.redis.lpush("redis_connect_urls", url)
                logger.info(f'push {i} to redis')
                if i == url_count:
                    await self.close()
                    # break
            await self.close()
            break

    async def save_url_to_redis_multi(self, n):  # 多協(xié)程
        stop_flag=False
        await self.redis_connect()
        url_count=6300
        while True:
            for i in range(1, url_count + 1):
                url=f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
                await self.redis.lpush("redis_connect_urls", url)
                logger.info(f'push {i} to redis')
                logger.info(f"task No.{n} 第{i}頁(yè) to redis")

                if i == url_count:
                    # await self.close()
                    break
            # await self.close()
            break

    async def multiasyico_test(self, n):
        for i in range(3):
            pprint(i)
        print(
            "-----------------------------------------------------------------------------------------------------------")

    async def start(self):
        await self.redis_connect()
        asy_count=5  # 協(xié)程數(shù)
        tasks=[self.save_url_to_redis_multi(n + 1) for n in range(asy_count)]
        await asyncio.gather(*tasks)
        await self.close()

    def main(self):
        loop=asyncio.get_event_loop()
        loop.run_until_complete(self.start())  # 多協(xié)程
        # loop.run_until_complete(self.save_url_to_redis())
        # loop.close()


def save_url_to_redis_2():
    redis=Redis(db=9)
    url_count=63
    # while True:
    for i in range(1, url_count + 1):
        url=f"https://explorer-web.api.btc.com/v1/eth/txns/0?page={i}&size=150"
        redis.lpush("redis_connect_urls", url)
        logger.info(f'push {i} to redis')
        # logger.info(f"task No.{} 第{i}頁(yè) to redis")

        # if i == url_count:
        #     break
        # break


def multiPro_test():
    # for i in range(10):
    # pprint(i)
    print("-----------------------------------------------------------------------------------------------------------")


if __name__ == '__main__':

    # 在這里的話就將 url_count=20000 重復(fù)執(zhí)行存入redis 3次,最后就導(dǎo)致總共存入的條數(shù)就是 url_count=20000 的進(jìn)程倍數(shù)
    # 總結(jié): 多進(jìn)程不要將要存入或者寫(xiě)入的數(shù)據(jù)放在函數(shù)里面,要結(jié)合redis 做隊(duì)列來(lái)分發(fā)任務(wù),不然任務(wù)就重復(fù)了
    # 協(xié)程: 也是一樣的,要將參數(shù)放在redis,rpop彈出一個(gè)參數(shù)給一個(gè)協(xié)程或者進(jìn)程去消費(fèi)

    start=timeit.default_timer()

    # 多線程
    # if __name__ == '__main__':
    #     login()
    try:
        i=0
        # 開(kāi)啟線程數(shù)目
        tasks_number=10  # 這里將一個(gè)函數(shù)重復(fù)執(zhí)行三次
        print('測(cè)試啟動(dòng)')
        while i < tasks_number:
            print(f"tasks_number  is {i}")
            t=threading.Thread(target=multiPro_test)
            t.start()
            i+=1
        # time2=time.clock()
        # times=time2 - time1

    except Exception as e:
        print(e)

    # 單協(xié)程
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(Url().save_url_to_redis_single())
    # loop.close()

    # 多協(xié)程
    # Url().main()       # 3.6690529

    # 多進(jìn)程
    # process_count=3
    # pool=multiprocessing.Pool(process_count)
    # for i in range(process_count):
    #     # pool.apply_async(asyncio.get_event_loop().run_until_complete(Url().save_url_to_redis()), (i,))  # 多進(jìn)程調(diào)用協(xié)程 ,將協(xié)程函數(shù)重復(fù)執(zhí)行三次,
    #     pool.apply_async(multiPro_test(), (i,))  # 多進(jìn)程調(diào)用普通函數(shù)
    # pool.close()
    # pool.join()

    # 多協(xié)程 和 多進(jìn)程 寫(xiě)入 redis的時(shí)候 會(huì)將數(shù)據(jù)按照協(xié)程數(shù)和進(jìn)程數(shù)加倍寫(xiě)入!!!!

    # 多進(jìn)程中調(diào)用多協(xié)程:  ! 還有問(wèn)題!
    # process_count=3
    # eth = Url().main()
    # pool=multiprocessing.Pool(process_count)
    # for i in range(process_count):
    #     pool.apply_async(asyncio.get_event_loop().run_until_complete(eth), (i,)) # 多進(jìn)程調(diào)用協(xié)程 ,將協(xié)程函數(shù)重復(fù)執(zhí)行三次,
    # pool.close()
    # pool.join()

    # end=timeit.default_timer()
    # print('Running time: %s Seconds' % (end - start))


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容