處理大量數(shù)據(jù)的一種方法(C++)

處理場(chǎng)景:三臺(tái)線陣相機(jī)或者多臺(tái)其他傳感器與工控機(jī)連著,通過(guò)調(diào)用相機(jī)的SDK控制相機(jī),例如同時(shí)開(kāi)關(guān),同時(shí)從相機(jī)中獲取數(shù)據(jù)。然后采集軟件接收到相機(jī)數(shù)據(jù)后,將數(shù)據(jù)保存到硬盤(pán)。

應(yīng)用場(chǎng)景

處理的問(wèn)題:如果三臺(tái)相機(jī)發(fā)送數(shù)據(jù)頻率很快,并且數(shù)據(jù)量很大的話,如何快速將數(shù)據(jù)寫(xiě)入到硬盤(pán)里或者對(duì)數(shù)據(jù)的其他處理,從而保證內(nèi)存不增長(zhǎng)?

解決方案簡(jiǎn)述:每收到一坨相機(jī)的數(shù)據(jù),就將其從尾部壓入隊(duì)列;同時(shí)有另外一個(gè)線程在等著處理這個(gè)隊(duì)列,只要這個(gè)隊(duì)列有數(shù)據(jù),就從頭部彈出一個(gè)數(shù)據(jù),然后對(duì)這個(gè)數(shù)據(jù)進(jìn)行相應(yīng)的處理。這里存在一個(gè)問(wèn)題:如果相機(jī)頻率太快,也就是說(shuō)隊(duì)列會(huì)漲得非??欤瑵q數(shù)據(jù)的速度大于處理數(shù)據(jù)的速度,內(nèi)存就會(huì)上漲,直至崩潰。處理數(shù)據(jù),例如將相機(jī)數(shù)據(jù)保存到硬盤(pán),如果硬盤(pán)寫(xiě)數(shù)據(jù)的速度很慢,就會(huì)出現(xiàn)內(nèi)存漲爆的情況,這個(gè)時(shí)候可以考慮用多線程進(jìn)行處理,從隊(duì)列彈出一個(gè)數(shù)據(jù)后,馬上被一個(gè)線程接上,這樣可以顯著提升CPU的利用率,加快數(shù)據(jù)處理速度。

隊(duì)列緩存

簡(jiǎn)要代碼如下:

需特別注意代碼中的注釋。

//頭文件代碼
#include <iostream>
#include <windows.h>
#include <queue>
#include <mutex>
#include <future>
#include "SThreadPool.h"
struct CameraData
{
    int height;
    int width;
    int datasize;
    std::unique_ptr<char[]> data;
    UINT64 timestamps;
};

class Sensor
{
public:
    Sensor();
    ~Sensor();
    //**獲取相機(jī)數(shù)據(jù)的回調(diào),并在回調(diào)中將相機(jī)數(shù)據(jù)壓入容器中**//
    friend int WINAPI s_ManagaGetImageCallBack(void* _context, void* _system, void* _data);

    //從隊(duì)列頭部彈出數(shù)據(jù)并進(jìn)行處理//
    void t_ProcessDataQueue(void* para);

    void SaveImage(std::shared_ptr<CameraData> data);
    friend void t_SaveCameraImage(void* para, std::shared_ptr<CameraData> data);
private:

    //用來(lái)存儲(chǔ)三臺(tái)相機(jī)數(shù)據(jù)的容器//
    std::mutex m_DataMtx_;
    std::queue<std::shared_ptr<CameraData>> m_QueCamData_;
    std::condition_variable m_DataVar_;
    bool m_bQueExit_ = false;
    std::shared_ptr<SThreadPool> m_Pool_;
    std::future<void> m_fuProcessDataQue_;
};

實(shí)現(xiàn)文件中,重點(diǎn)關(guān)注函數(shù)t_ProcessDataQueue(void* para);同時(shí)這段代碼還包括一種有效退出類內(nèi)線程的方式,請(qǐng)看析構(gòu)函數(shù)部分。

//實(shí)現(xiàn)文件
Sensor::Sensor()
{
    m_fuProcessDataQue_ = std::async(std::launch::async, &Sensor::t_ProcessDataQueue, this, nullptr);

    unsigned long hardwares = std::thread::hardware_concurrency();
    m_Pool_ = std::make_shared<SThreadPool>(hardwares);
}

Sensor::~Sensor()
{
    {
        std::unique_lock<std::mutex> locker(m_DataMtx_);
        m_bQueExit_ = true;
    }
    m_DataVar_.notify_one();
    m_fuProcessDataQue_.get();
}

int WINAPI s_ManagaGetImageCallBack(void* _context, void* _system, void* _data)
{
    Sensor* pThis = (Sensor*)_context;
    std::shared_ptr<CameraData> data = std::make_shared<CameraData>();

    //將從相機(jī)返回出來(lái)的數(shù)據(jù)組裝成CameraData.....
    {
        std::unique_lock<std::mutex> lock(pThis->m_DataMtx_);
        pThis->m_QueCamData_.push(data);
    }
    pThis->m_DataVar_.notify_one();//通知另外一個(gè)線程進(jìn)行數(shù)據(jù)的處理
    return 0;
}

void Sensor::t_ProcessDataQueue(void* para)
{
    std::shared_ptr<CameraData> tempdata;


    std::unique_lock<std::mutex> locker(m_DataMtx_);
    while (true)
    {
        try
        {
            if (!m_bQueExit_) m_DataVar_.wait(locker);
            locker.unlock();
            while (true)
            {
                locker.lock();
                if (m_QueCamData_.empty())
                {
                    std::cout << "camera set queue is empty.\n";
                    break;
                }
                else
                {
                    tempdata = m_QueCamData_.front();
                    m_QueCamData_.pop();
                }
                locker.unlock();

                //process  camera data//
                //01 單線程方式存數(shù)據(jù)
                SaveImage(tempdata);

                //02 多線程的方式存數(shù)據(jù)//
                m_Pool_->enqueue(t_SaveCameraImage, this, tempdata);
            }
        }
        catch (const std::exception& e)
        {
            std::cout << e.what() << "\n";
        }


        if (m_bQueExit_)
        {
            break;
        }
    }
}


void Sensor::SaveImage(std::shared_ptr<CameraData> data)
{
    //保存相機(jī)
    //do want you want
}

void t_SaveCameraImage(void* para, std::shared_ptr<CameraData> data)
{
    //保存相機(jī)數(shù)據(jù)
    //do want you want
}

作為數(shù)據(jù)采集程序,這段代碼非常適用。任何傳感器的數(shù)據(jù)我都能用一個(gè)容器先接著,然后另外一個(gè)線程處理從這個(gè)容器中彈出來(lái)的數(shù)據(jù)。如果采用回調(diào)的方式獲取傳感器數(shù)據(jù),這種方式還不堵塞回調(diào)。既然作為一種通用的方式,這段代碼當(dāng)然可以用模板類進(jìn)行改進(jìn)。筆者嘗試著寫(xiě)了一個(gè)模板類,目前測(cè)試有效,下一篇文章公布模板代碼。

歡迎大牛們與我進(jìn)行交流探討,如果對(duì)上述代碼有疑問(wèn)或者缺少某個(gè)頭文件(如SThreadPool,這是一個(gè)線程池模板類),請(qǐng)掃碼加我微信,備注“C++”。


WOODS

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容