Qt多線程中基于隊(duì)列實(shí)現(xiàn)網(wǎng)絡(luò)通信

基本思想

  1. 在線程中創(chuàng)建QTcpSocket對(duì)象實(shí)例;
  2. 在線程類中采用隊(duì)列的方式動(dòng)態(tài)添加需要執(zhí)行的網(wǎng)絡(luò)命令類實(shí)例;
  3. 為命令執(zhí)行線程定義空閑、數(shù)據(jù)發(fā)送和數(shù)據(jù)讀取三個(gè)狀態(tài);
  4. 將網(wǎng)絡(luò)反饋數(shù)據(jù)以信號(hào)的方式發(fā)送給命令類進(jìn)行解析;

命令執(zhí)行線程類

tcpcmdexecthread.h

#ifndef CMDEXECTHREADABSTRACT_H
#define CMDEXECTHREADABSTRACT_H

#include <QThread>
#include <QTcpSocket>
#include <QMutex>
class CommandAbstract;
class TcpCmdExecThread : public QThread
{
    Q_OBJECT
public:
    enum State{    //命令執(zhí)行狀態(tài)
        E_Idle = 0,//線程空閑
        E_Sending, //數(shù)據(jù)發(fā)送
        E_Reading  //數(shù)據(jù)讀取
    };
    TcpCmdExecThread(QObject *parent = 0);
    ~TcpCmdExecThread();
    void run();
    //追加要執(zhí)行的網(wǎng)絡(luò)命令到線程的執(zhí)行隊(duì)列中
    void appendCmd(CommandAbstract* cmd);
signals:
    //對(duì)外通知網(wǎng)絡(luò)反饋數(shù)據(jù)
    void response(const QByteArray& data);
public slots:
    void onSocketError(QAbstractSocket::SocketError error);
private:
    //準(zhǔn)備下一個(gè)命令,如果成功則返回true,失敗則返回false
    bool prepareNextCmd();
private:
    QList<CommandAbstract*> m_cmdQueue;//命令執(zhí)行隊(duì)列
    CommandAbstract* m_currentCmd;//當(dāng)前命令
    QTcpSocket* m_tcpSocket;
    State m_state;//命令執(zhí)行線程的狀態(tài)
    QMutex m_mutex;
};
#endif // CMDEXECTHREADABSTRACT_H

tcpcmdexecthread.cpp

#include "tcpcmdexecthread.h"
#include "commandabstract.h"
#include <QDebug>
#include <QMutexLocker>
TcpCmdExecThread::TcpCmdExecThread(QObject *parent):
    QThread(parent),
    m_currentCmd(NULL),
    m_tcpSocket(NULL),
    m_state(E_Idle)
{

}

TcpCmdExecThread::~TcpCmdExecThread()
{

}

void TcpCmdExecThread::appendCmd(CommandAbstract *cmd)
{
    QMutexLocker locker(&m_mutex);
    m_cmdQueue.append(cmd);
}

void TcpCmdExecThread::run()
{
    if(m_tcpSocket)
        m_tcpSocket->deleteLater();

    m_tcpSocket = new QTcpSocket;
    connect(m_tcpSocket,SIGNAL(error(QAbstractSocket::SocketError)),
            this,SLOT(onSocketError(QAbstractSocket::SocketError)));
    QString host = QString("127.0.0.1");
    int port = 9090;
    m_tcpSocket->connectToHost(host,port);
    if(!m_tcpSocket->waitForConnected())
    {
        return ;
    }
    qDebug()<<__LINE__<<__FUNCTION__<<"Tcp connected.";
    while(!isInterruptionRequested())//調(diào)用requestInterruption()可退出線程
    {
        QThread::msleep(25);
        switch(m_state)
        {
        case E_Idle:
        {
            QThread::msleep(10);
            if(m_cmdQueue.length() > 0)
            {
                bool success = prepareNextCmd();
                if(success)
                    m_state =  E_Sending;
            }
        }break;
        case E_Sending:
        {
            qDebug()<<__LINE__<<__FUNCTION__<<"m_state =  E_Sending";
            QByteArray sendingData = m_currentCmd->getSendingData();
            m_tcpSocket->write(sendingData);
            qDebug()<<__LINE__<<__FUNCTION__<<"sendingData = "<<sendingData;
            m_tcpSocket->waitForReadyRead();
            m_state =  E_Reading;

        }break;
        case E_Reading:
        {
            qDebug()<<__LINE__<<__FUNCTION__<<"m_state =  E_Reading";
            QByteArray readData = m_tcpSocket->readAll();
            //信號(hào)response采用BlockingQueuedConnection方式與命令槽函數(shù)parseResponseData連接,
            //則當(dāng)parseResponseData函數(shù)執(zhí)行完畢之后,線程才會(huì)繼續(xù)往后執(zhí)行
            emit response(readData);
            m_currentCmd->deleteLater();
            m_state =  E_Idle;
        }break;
        default:{

        }
        }
    }

}

void TcpCmdExecThread::onSocketError(QAbstractSocket::SocketError error)
{
    qDebug()<<__LINE__<<__FUNCTION__<<"error:"<<(int)error;
}

bool TcpCmdExecThread::prepareNextCmd()
{
    if(m_cmdQueue.length() > 0){
        m_currentCmd = m_cmdQueue.takeFirst();
        connect(this,SIGNAL(response(QByteArray)),
                m_currentCmd,SLOT(parseResponseData(QByteArray)),Qt::BlockingQueuedConnection);
        return true;
    }
    return false;
}

網(wǎng)絡(luò)命令基類

commandabstract.h

#ifndef COMMANDABSTRACT_H
#define COMMANDABSTRACT_H

#include <QObject>
#include <QVariantHash>

class CommandAbstract : public QObject
{
    Q_OBJECT
public:
    explicit CommandAbstract(QObject *parent = 0);
    //接口:把命令參數(shù)組織成要發(fā)送給服務(wù)器的數(shù)據(jù)m_sendingData,其中包含操作類型cmdOperType
    virtual void prepareSendingData(const QVariantHash& cmdArgs) = 0;
    //獲取要發(fā)送給服務(wù)器的數(shù)據(jù)m_sendingData
    virtual QByteArray getSendingData() const;
    virtual void setSendingData(const QByteArray& data);
    void setCmdIndex(int index);
    void setOperType(int operType);
signals:
    //通過信號(hào)對(duì)外通知命令執(zhí)行數(shù)據(jù)m_parsedResponseData
    void infoResultData(QVariantHash& parsedResponseData);
public slots:
    //接口:解析來自服務(wù)器的響應(yīng)數(shù)據(jù),保存到m_parsedResponseData中
    virtual void parseResponseData(const QByteArray& data) = 0;
protected:
    int getCmdIndex() const;
    int getCmdOperType() const;
private:
    QVariantHash m_cmdData;//命令參數(shù)
    QByteArray m_sendingData;//最終發(fā)送給服務(wù)器的命令數(shù)據(jù)
    QVariantHash m_parsedResponseData;//解析之后的服務(wù)器響應(yīng)數(shù)據(jù)
    int m_cmdIndex;
    int m_cmdOperType;
};
Q_DECLARE_METATYPE(CommandAbstract*)
#endif // COMMANDABSTRACT_H

commandabstract.cpp

#include "commandabstract.h"
CommandAbstract::CommandAbstract(QObject *parent) :
    QObject(parent),
    m_cmdIndex(0),
    m_cmdOperType(0)
{

}

QByteArray CommandAbstract::getSendingData() const
{
    return m_sendingData;
}

void CommandAbstract::setSendingData(const QByteArray &data)
{
    m_sendingData = data;
}

void CommandAbstract::setCmdIndex(int index)
{
    m_cmdIndex = index;
}

void CommandAbstract::setOperType(int operType)
{
    m_cmdOperType = operType;
}

int CommandAbstract::getCmdIndex() const
{
    return m_cmdIndex;
}

int CommandAbstract::getCmdOperType() const
{
    return m_cmdOperType;
}

使用方法

void MainWindow::on_pushButtonSendCmd_clicked()
{
    m_tcpCmdExecThread->start();
    QVariantHash cmdArgs;
    cmdArgs.insert("key1","cmd1 data1");
    cmdArgs.insert("key2","cmd1 data2");//data1和data2是模擬要發(fā)送給服務(wù)器的命令數(shù)據(jù)

    CommandAbstract* cmd1 = new DemoCommand();//向服務(wù)程序發(fā)送的第一個(gè)命令
    //命令對(duì)象cmd解析完成之后,發(fā)送信號(hào)將數(shù)據(jù)顯示到界面
    connect(cmd1,SIGNAL(infoResultData(QVariantHash&)),
            this,SLOT(onInfoResultData(QVariantHash&)));
    cmd1->setCmdIndex(1);
    cmd1->setOperType(1);
    //準(zhǔn)備發(fā)送數(shù)據(jù)
    cmd1->prepareSendingData(cmdArgs);
    //追加命令到線程的命令隊(duì)列中
    m_tcpCmdExecThread->appendCmd(cmd1);


    cmdArgs.insert("key1","cmd2 data1");
    cmdArgs.insert("key2","cmd2 data2");
    CommandAbstract* cmd2 = new DemoCommand();//向服務(wù)程序發(fā)送的第二個(gè)命令
    //命令對(duì)象cmd解析完成之后,發(fā)送信號(hào)將數(shù)據(jù)顯示到界面
    connect(cmd2,SIGNAL(infoResultData(QVariantHash&)),
            this,SLOT(onInfoResultData(QVariantHash&)));
    cmd2->setCmdIndex(1);
    cmd2->setOperType(1);
    //準(zhǔn)備發(fā)送數(shù)據(jù)
    cmd2->prepareSendingData(cmdArgs);
    //追加命令到線程的命令隊(duì)列中
    m_tcpCmdExecThread->appendCmd(cmd2);
}

//顯示解析后的網(wǎng)絡(luò)數(shù)據(jù)到界面
void MainWindow::onInfoResultData(QVariantHash &parsedResponseData)
{
    QString dataFromServer = parsedResponseData.value("response").toString();
    ui->textEdit->append(dataFromServer);
}

運(yùn)行效果

客戶程序.png

服務(wù)程序.png

源代碼下載

說明:源代碼中的commandmodeserver.exe為本例中的服務(wù)端程序,本例中的客戶程序會(huì)與它進(jìn)行連接,提供服務(wù)端數(shù)據(jù)響應(yīng)。
百度網(wǎng)盤分享地址:
鏈接:https://pan.baidu.com/s/1K3c_OJqfHsXpRBSiinMqkA
提取碼:z8ix

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容