在C++中怎样创建消息队列连接,发送消息,消息中间件采用ActiveMQ

要通过以太网发送报文,用消息队列发布订阅的方式发送,现在不知道怎么创建连接,怎么发布报文

ActiveMQ.h

#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Long.h>
#include <decaf/lang/System.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
#include<boost/asio.hpp>
#include<boost/thread.hpp>

using namespace decaf::lang;
using namespace cms;
using namespace decaf::util::concurrent;
class MQProducer //: public Runnable
{
public:
    static void init();
    static MQProducer* instance();
    int sendMessage(const std::string& msg);
private:
    MQProducer(const std::string& brokerURI, bool useTopic = false, bool sessionTransacted = false);
    virtual ~MQProducer();
    void close();
    virtual void run();
    MQProducer(const MQProducer&);
    MQProducer& operator=(const MQProducer&);
    void cleanup();
    void doSendMessage(const string& msg);
    void getHostIP();
    
private:
    Connection* m_pConnection;
    Session* m_pSession;
    Destination* m_pDestination;
    MessageProducer* m_pProducer;
    bool m_bUseTopic;
    bool m_bSessionTransacted;
    std::string m_brokerURI;
    boost::asio::io_service m_ioservice;
    boost::asio::io_service::work m_work;
    std::string m_server;
    static MQProducer* s_pMQProducer;
};

ActiveMQ.cpp

#include <Utils/ActiveMQ.h>

using namespace activemq::core;
using namespace decaf::util;

MQProducer* MQProducer::s_pMQProducer = NULL; 

MQProducer* MQProducer::instance()
{
    return s_pMQProducer;
}

//有点特殊的单子,确保多线程前,调用了init。
void MQProducer::init()
{
    if (s_pMQProducer == NULL)
    {
        string serverip = "127.0.0.1:16161";
         std::string brokerURI =
            "failover:(tcp://"+ serverip+
            "?wireFormat=openwire"
            //        "&transport.useInactivityMonitor=false"
            //        "&connection.alwaysSyncSend=true"
            "&connection.useAsyncSend=true"
            //        "?transport.commandTracingEnabled=true"
            //        "&transport.tcpTracingEnabled=true"
            //        "&wireFormat.tightEncodingEnabled=true"
            ")";
        s_pMQProducer = new MQProducer(brokerURI, true, false);
    }
}

MQProducer::MQProducer(const std::string& brokerURI, bool useTopic, bool sessionTransacted )
    : m_pConnection(NULL)
    , m_pSession(NULL)
    , m_pDestination(NULL)
    , m_pProducer(NULL)
    , m_bUseTopic(useTopic)
    , m_bSessionTransacted(sessionTransacted)
    , m_brokerURI(brokerURI)
    , m_work(m_ioservice)
    , m_server("")
    {
        activemq::library::ActiveMQCPP::initializeLibrary();
        this->run();
    }

MQProducer::~MQProducer() 
{
    cleanup();
    activemq::library::ActiveMQCPP::shutdownLibrary();
}

void MQProducer::close() 
{
    this->cleanup();
}

void MQProducer::run() 
{
    try 
    {
        // Create a ConnectionFactory
        auto_ptr<ConnectionFactory> connectionFactory(ConnectionFactory::createCMSConnectionFactory(m_brokerURI));
        //ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory(m_brokerURI);

        // Create a Connection
        //connectionFactory->setConnectResponseTimeout(30000);
        m_pConnection = connectionFactory->createConnection();
        m_pConnection->start();

        // Create a Session
        if (this->m_bSessionTransacted) 
        {
            m_pSession = m_pConnection->createSession(Session::SESSION_TRANSACTED);
        }
        else 
        {
            m_pSession = m_pConnection->createSession(Session::AUTO_ACKNOWLEDGE);
        }

        // Create the destination (Topic or Queue)
        if (m_bUseTopic)
        {
            m_pDestination = m_pSession->createTopic("alert");
        }
        else 
        {
            m_pDestination = m_pSession->createQueue("alert");
        }

        // Create a MessageProducer from the Session to the Topic or Queue
        m_pProducer = m_pSession->createProducer(m_pDestination);
        m_pProducer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
    }
    catch (CMSException& e) 
    {
        e.printStackTrace();
    }
}

int MQProducer::sendMessage(const string& msg)
{
    //m_ioservice.post(boost::bind(&utils::MQProducer::doSendMessage, this, msg));
    doSendMessage(msg);
    return 0;
}

void MQProducer::doSendMessage(const string& msg)
{
    try
    {
       //生产者怎么封装休息,消费者那里就要怎么解析消息,这里用的是MapMessage字典方式的消息
        std::auto_ptr<MapMessage> message(m_pSession->createMapMessage());
        message->setString("Instance", "来自BJ");
        message->setString("Severity", "ERROR");
        string text(msg);
        text = boost::locale::conv::between(text, "UTF-8", "GBK");
        message->setString("Message", text);
        m_pProducer->send(message.get());
    }
    catch (CMSException& e)
    {
        e.printStackTrace();
    }
}

void MQProducer::cleanup()
{
    if (m_pConnection != NULL) 
    {
        try 
        {
            m_pConnection->close();
        }
        catch (cms::CMSException& ex) 
        {
            ex.printStackTrace();
        }
    }

    // Destroy resources.
    try {
        delete m_pDestination;
        m_pDestination = NULL;
        delete m_pProducer;
        m_pProducer = NULL;
        delete m_pSession;
        m_pSession = NULL;
        delete m_pConnection;
        m_pConnection = NULL;
    }
    catch (CMSException& e) 
    {
        e.printStackTrace();
    }
}

大概这样,未必能编译过。不过也应该差不多了。
main.cpp

#include <ActiveMQ.h>
int main(int argc, char** argv)
{
    MQProducer::init();
    !MQProducer::instance() ? 0 : utils::MQProducer::instance()->sendMessage("test");
}

C++ ActiveMQ实现通讯
https://blog.csdn.net/lwcloud/article/details/78326879