Python實現(xiàn)一條基于POS算法的區(qū)塊鏈

最新內(nèi)容會更新在主站深入淺出區(qū)塊鏈社區(qū)
原文鏈接:Python實現(xiàn)一條基于POS算法的區(qū)塊鏈

區(qū)塊鏈中的共識算法

在比特幣公鏈架構(gòu)解析中,就曾提到過為了實現(xiàn)去中介化的設(shè)計,比特幣設(shè)計了一套共識協(xié)議,并通過此協(xié)議來保證系統(tǒng)的穩(wěn)定性和防攻擊性。 并且我們知道,截止目前使用最廣泛,也是最被大家接受的共識算法,是我們先前介紹過的POW(proof of work)工作量證明算法。目前市值排名前二的比特幣和以太坊也是采用的此算法。

雖然POW共識算法取得了巨大的成功,但對它的質(zhì)疑也從來未曾停止過。 其中最主要的一個原因就是電力消耗。據(jù)不完全統(tǒng)計,基于POW的挖礦機制所消耗的電量是非常巨大的,甚至比絕大多數(shù)國家耗電量還要多。這對我們的資源造成了極大的浪費,此外隨著比特大陸等公司的強勢崛起,造成了算力的高度集中。

基于以上種種原因,更多的共識算法被提出來 POS、DPOS、BPFT等等。 今天我們就來認(rèn)識POS(proof of stake)算法。

Proof of stake,譯為權(quán)益證明。你可能已經(jīng)猜到了,權(quán)益證明簡單理解就是擁有更多token的人,有更大的概率獲得記賬權(quán)利,然后獲得獎勵。 這個概率具體有多大呢? 下面我們在代碼實現(xiàn)中會展示,分析也放在后面。 當(dāng)然,POS是會比POW更好嗎? 會更去中心化嗎? 現(xiàn)在看來未必,所以我們這里也不去對比誰優(yōu)誰劣。 我們站在中立的角度,單純的來討論討論POS這種算法。

代碼實戰(zhàn)

生成一個Block

既然要實現(xiàn)POS算法,那么就難免要生成一條鏈,鏈又是由一個個Block生成的,所以下面我們首先來看看如何生成Block,當(dāng)然在前面的內(nèi)容里面,關(guān)于如何生成Block,以及交易、UTXO等等都已經(jīng)介紹過了。由于今天我們的核心是實現(xiàn)POS,所以關(guān)于Block的生成,我們就用最簡單的實現(xiàn)方式,好讓大家把目光聚焦在核心的內(nèi)容上面。

我們用三個方法來實現(xiàn)生成一個合法的區(qū)塊

  • calculate_hash 計算區(qū)塊的hash值
  • is_block_valid 校驗區(qū)塊是否合法
  • generate_block 生成一個區(qū)塊

from hashlib import sha256
from datetime import datetime

def generate_block(oldblock, bpm, address):
    """

    :param oldblock:
    :param bpm:
    :param address:
    :return:
    """
    newblock = {
        "Index": oldblock["Index"] + 1,
        "BPM": bpm,
        "Timestamp": str(datetime.now()),
        "PrevHash": oldblock["Hash"],
        "Validator": address
    }

    newblock["Hash"] = calculate_hash(newblock)
    return newblock


def calculate_hash(block):
    record = "".join([
        str(block["Index"]),
        str(block["BPM"]),
        block["Timestamp"],
        block["PrevHash"]
    ])

    return sha256(record.encode()).hexdigest()


def is_block_valid(newblock, oldblock):
    """

    :param newblock:
    :param oldblock:
    :return:
    """

    if oldblock["Index"] + 1 != newblock["Index"]:
        return False

    if oldblock["Hash"] != newblock["PrevHash"]:
        return False

    if calculate_hash(newblock) != newblock["Hash"]:
        return False

    return True

這里為了更靈活,我們沒有用類的實現(xiàn)方式,直接采用函數(shù)來實現(xiàn)了Block生成,相信很容易看懂。

創(chuàng)建一個TCP服務(wù)器

由于我們需要用權(quán)益證明算法來選擇記賬人,所以需要從很多Node(節(jié)點)中選擇記賬人,也就是需要一個server讓節(jié)點鏈接上來,同時要同步信息給節(jié)點。因此需要一個TCP長鏈接。

from socketserver import BaseRequestHandler, ThreadingTCPServer

def run():
    # start a tcp server
    serv = ThreadingTCPServer(('', 9090), HandleConn)
    serv.serve_forever()

在這里我們用了python內(nèi)庫socketserver來創(chuàng)建了一個TCPServer。 需要注意的是,這里我們是采用的多線程的創(chuàng)建方式,這樣可以保證有多個客戶端同時連接上來,而不至于被阻塞。當(dāng)然,這里這個server也是存在問題的,那就是有多少個客戶端連接,就會創(chuàng)建多少個線程,更好的方式是創(chuàng)建一個線程池。由于這里是測試,所以就采用更簡單的方式了。

相信大家已經(jīng)看到了,在我們創(chuàng)建TCPServer的時候,使用到了HandleConn,但是我們還沒有定義,所以接下來我們就來定義一個HandleConn

消息處理器

下面我們來實現(xiàn)Handler函數(shù),Handler函數(shù)在跟Client Node通信的時候,需要我們的Node實現(xiàn)下面的功能

  • Node可以輸入balance(token數(shù)量) 也就是股權(quán)數(shù)目
  • Node需要能夠接收廣播,方便Server同步區(qū)塊以及記賬人信息
  • 添加自己到候選人名單 (候選人為持有token的人)
  • 輸入BPM生成Block
  • 驗證一個區(qū)塊的合法性

感覺任務(wù)還是蠻多的,接下來我們看代碼實現(xiàn)

import threading
from queue import Queue, Empty

# 定義變量
block_chain = []
temp_blocks = []
candidate_blocks = Queue()  # 創(chuàng)建隊列,用于線程間通信
announcements = Queue()
validators = {}

My_Lock = threading.Lock()

class HandleConn(BaseRequestHandler):
    def handle(self):
        print("Got connection from", self.client_address)

        # validator address
        self.request.send(b"Enter token balance:")
        balance = self.request.recv(8192)
        try:
            balance = int(balance)
        except Exception as e:
            print(e)

        t = str(datetime.now())
        address = sha256(t.encode()).hexdigest()
        validators[address] = balance
        print(validators)

        while True:
            announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,),
                                                 daemon=True)
            announce_winner_t.start()

            self.request.send(b"\nEnter a new BPM:")
            bpm = self.request.recv(8192)
            try:
                bpm = int(bpm)
            except Exception as e:
                print(e)
                del validators[address]
                break

            # with My_Lock:
            last_block = block_chain[-1]

            new_block = generate_block(last_block, bpm, address)

            if is_block_valid(new_block, last_block):
                print("new block is valid!")
                candidate_blocks.put(new_block)

            self.request.send(b"\nEnter a new BPM:\n")

            annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True)
            annouce_blockchain_t.start()

這段代碼,可能對大多數(shù)同學(xué)來說是有難度的,在這里我們采用了多線程的方式,同時為了能夠讓消息在線程間通信,我們使用了隊列。 這里使用隊列,也是為了我們的系統(tǒng)可以更好的拓展,后面如果可能,這一節(jié)的程序很容易拓展為分布式系統(tǒng)。 將多線程里面處理的任務(wù)拆分出去成獨立的服務(wù),然后用消息隊列進(jìn)行通信,就是一個簡單的分布式系統(tǒng)啦。(是不是很激動?)

由于這里有難度,所以代碼還是講一講吧

    # validator address
        self.request.send(b"Enter token balance:")
        balance = self.request.recv(8192)
        try:
            balance = int(balance)
        except Exception as e:
            print(e)

        t = str(datetime.now())
        address = sha256(t.encode()).hexdigest()
        validators[address] = balance
        print(validators)

這一段就是我們提到的Node 客戶端添加自己到候選人的代碼,每鏈接一個客戶端,就會添加一個候選人。 這里我們用添加的時間戳的hash來記錄候選人。 當(dāng)然也可以用其他的方式,比如我們代碼里面的client_address


announce_winner_t = threading.Thread(target=annouce_winner, args=(announcements, self.request,),
                                                daemon=True)
        announce_winner_t.start()

def annouce_winner(announcements, request):
    """

    :param announcements:
    :param request:
    :return:
    """
    while True:
        try:
            msg = announcements.get(block=False)
            request.send(msg.encode())
            request.send(b'\n')
        except Empty:
            time.sleep(3)
            continue


然后接下來我們起了一個線程去廣播獲得記賬權(quán)的節(jié)點信息到所有節(jié)點。

self.request.send(b"\nEnter a new BPM:")
            bpm = self.request.recv(8192)
            try:
                bpm = int(bpm)
            except Exception as e:
                print(e)
                del validators[address]
                break

            # with My_Lock:
            last_block = block_chain[-1]

            new_block = generate_block(last_block, bpm, address)

            if is_block_valid(new_block, last_block):
                print("new block is valid!")
                candidate_blocks.put(new_block)

根據(jù)節(jié)點輸入的BPM值生成一個區(qū)塊,并校驗區(qū)塊的有效性。 將有效的區(qū)塊放到候選區(qū)塊當(dāng)中,等待記賬人將區(qū)塊添加到鏈上。

annouce_blockchain_t = threading.Thread(target=annouce_blockchain, args=(self.request,), daemon=True)
        annouce_blockchain_t.start()

def annouce_blockchain(request):
    """

    :param request:
    :return:
    """
    while True:
        time.sleep(30)
        with My_Lock:
            output = json.dumps(block_chain)
        try:
            request.send(output.encode())
            request.send(b'\n')
        except OSError:
            pass

最后起一個線程,同步區(qū)塊鏈到所有節(jié)點。

看完了,節(jié)點跟Server交互的部分,接下來是最重要的部分,

POS算法實現(xiàn)

def pick_winner(announcements):
    """
    選擇記賬人
    :param announcements:
    :return:
    """
    time.sleep(10)

    while True:
        with My_Lock:
            temp = temp_blocks

        lottery_pool = []  #

        if temp:
            for block in temp:
                if block["Validator"] not in lottery_pool:
                    set_validators = validators
                    k = set_validators.get(block["Validator"])
                    if k:
                        for i in range(k):
                            lottery_pool.append(block["Validator"])

            lottery_winner = choice(lottery_pool)
            print(lottery_winner)
            # add block of winner to blockchain and let all the other nodes known
            for block in temp:
                if block["Validator"] == lottery_winner:
                    with My_Lock:
                        block_chain.append(block)

                    # write message in queue.
                    msg = "\n{0} 贏得了記賬權(quán)利\n".format(lottery_winner)
                    announcements.put(msg)

                    break

        with My_Lock:
            temp_blocks.clear()

這里我們用pick_winner 來選擇記賬權(quán)利,我們根據(jù)token數(shù)量構(gòu)造了一個列表。 一個人獲得記賬權(quán)利的概率為:

p = mount['NodeA']/mount['All']

文字描述就是其token數(shù)目在總數(shù)中的占比。 比如總數(shù)有100個,他有10個,那么其獲得記賬權(quán)的概率就是0.1, 到這里核心的部分就寫的差不多了,接下來,我們來添加節(jié)點,開始測試吧

測試POS的記賬方式

在測試之前,起始還有一部分工作要做,前面我們的run方法需要完善下,代碼如下:

def run():
    # create a genesis block
    t = str(datetime.now())
    genesis_block = {
        "Index": 0,
        "Timestamp": t,
        "BPM": 0,
        "PrevHash": "",
        "Validator": ""
    }

    genesis_block["Hash"] = calculate_hash(genesis_block)
    print(genesis_block)
    block_chain.append(genesis_block)

    thread_canditate = threading.Thread(target=candidate, args=(candidate_blocks,), daemon=True)
    thread_pick = threading.Thread(target=pick_winner, args=(announcements,), daemon=True)

    thread_canditate.start()
    thread_pick.start()

    # start a tcp server
    serv = ThreadingTCPServer(('', 9090), HandleConn)
    serv.serve_forever()

def candidate(candidate_blocks):
    """

    :param candidate_blocks:
    :return:
    """
    while True:
        try:
            candi = candidate_blocks.get(block=False)
        except Empty:
            time.sleep(5)
            continue
        temp_blocks.append(candi)

if __name__ == '__main__':
    run()

添加節(jié)點連接到TCPServer

為了充分減少程序的復(fù)雜性,tcp client我們這里就不實現(xiàn)了,可以放在后面拓展部分。 畢竟我們這個系統(tǒng)是很容易擴展的,后面我們拆分了多線程的部分,在實現(xiàn)tcp client就是一個完整的分布式系統(tǒng)了。

所以,我們這里用linux自帶的命令 nc,不知道nc怎么用的同學(xué)可以google或者 man nc


image
  • 啟動服務(wù) 運行 python pos.py
  • 打開3個終端
  • 分別輸入下面命令
    • nc localhost 9090

終端如果輸出

Enter token balance: 

說明你client已經(jīng)鏈接服務(wù)器ok啦.

測試POS的記賬方式

接下來依次按照提示操作。 balance可以按心情來操作,因為這里是測試,我們輸入100,
緊接著會提示輸入BPM,我們前面提到過,輸入BPM是為了生成Block,那么就輸入吧,隨便輸入個9. ok, 接下來就稍等片刻,等待記賬。
輸出如同所示


image

依次在不同的終端,根據(jù)提示輸入數(shù)字,等待消息同步。

生成區(qū)塊鏈

下面是我這邊獲得的3個block信息。


image

總結(jié)

在上面的代碼中,我們實現(xiàn)了一個完整的基于POS算法記賬的鏈,當(dāng)然這里有許多值得擴展與改進(jìn)的地方。

  • python中多線程開銷比較大,可以改成協(xié)程的方式
  • TCP建立的長鏈接是基于TCPServer,是中心化的方式,可以改成P2P對等網(wǎng)絡(luò)
  • 鏈的信息不夠完整
  • 系統(tǒng)可以拓展成分布式,讓其更健壯

大概列了以上幾點,其他還有很多可以拓展的地方,感興趣的朋友可以先玩玩, 后者等到我們后面的教程。 (廣告打的措手不及,哈哈)

當(dāng)然了,語言不是重點,所以在這里,我也實現(xiàn)了go語言的版本源碼地址

go語言的實現(xiàn)感覺要更好理解一點,也顯得要優(yōu)雅一點。這也是為什么go語言在分布式領(lǐng)域要更搶手的原因之一吧!

項目地址

參考

本文來自深入淺出區(qū)塊鏈作者 Magic_陳,他的專欄專注區(qū)塊鏈底層技術(shù)開發(fā),P2P網(wǎng)絡(luò)、加密技術(shù)、MerkleTree、DAG、DHT等等,另外對于分布式系統(tǒng)的學(xué)習(xí)也很有幫助。歡迎大家交流。
深入淺出區(qū)塊鏈 - 系統(tǒng)學(xué)習(xí)區(qū)塊鏈,打造最好的區(qū)塊鏈技術(shù)博客。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 巴比特旗下時戳資本近日發(fā)布了《區(qū)塊鏈公鏈項目研究報告》。作為時戳資本區(qū)塊鏈行業(yè)研究報告系列03,這份最新的報告主要...
    shenciyou閱讀 2,642評論 1 10
  • 我是一個急性子,因為吃虧上當(dāng)?shù)亩嗔耍袁F(xiàn)在不愛幫人墊錢買東西了,除非在我們的交情里,你是一個物質(zhì)守恒的人。 也許...
    靜夜與空白閱讀 499評論 1 1
  • 戊戌春月,萬物復(fù)蘇,是一個乘船游湖的好時節(jié),今天我就將和二年級的小朋友們一起在課堂里去解決游湖中所遇到的一...
    老毛子的想象力閱讀 560評論 0 2
  • 來源:技術(shù)鄰 張應(yīng)遷 屢冠全球!江蘇14座跨江大橋美不勝收! 1.江陰長江大橋:位于江蘇省靖江市與江陰市間,是我國...
    技術(shù)汪閱讀 7,427評論 0 1
  • “說的這么容易?那你來做??!” “我沒做過,不會,我怎么做?” “那你就不要指指點點,這個不對,那個不滿意,你要你...
    紫嵐兮閱讀 221評論 0 0

友情鏈接更多精彩內(nèi)容