RocketMQ 5.2.0安装指南及与Python和C通信实现详解

文章目录

  • 前言
  • 实现
  • 第一步
  • 第二步
  • 第三步
  • 第四步
  • 其他参考

  • 前言

    记录一下rocket5.2.0安装以及和python和c++通信的踩坑史。
    由于只是简单实现,c++部分的实现可能并不完美。

    linux:ubuntu20.04 2核2g 有公网ip
    rocketmq:5.2.0 (发现一定要看官网的说明,大多博客都是安装的4.x版本的)
    c++ :g++4.9.3 调用的库rocket版本:rocketmq-client-cpp-2.2.0
    python:3.11.5 调用的库rocket版本:rocketmq-client-cpp-2.0.0
    Pypi上推荐python版本是3.10 我这里用的3.11版本也可以通信

    rocketmq官网:https://rocketmq.apache.org/
    rocketmq-python客户端:https://github.com/apache/rocketmq-client-python
    rocketmq-cpp客户端:https://github.com/apache/rocketmq-client-cpp

    客户端总览:https://github.com/apache/rocketmq-clients/tree/master

    发现有一些bug,有的rocket官方文档的各个相互之间更新的不及时,在客户端总览上说什么python还在施工,其实早就可以通信了,
    还有在python客户端上的让下载的动态链接库是2.0.0版本的,而在c++客户端上没有明确说明,看release已经可以下到2.3.0版本的了。

    当时我先运行的python,后来再下的c++,虽然两者的动态链接库版本不一样,但还是可以正常通信的(暂时)。


    第1张图 显示的是c++的生产者发,python的消费者收

    第2张图 显示的是python的生产者发 c++的消费者收

    实现

    若有官网下载太慢的话,可用sftp方法(这里的目录上的)。

    若是遇见防火墙未开启,见下文开启,常用的用来通信的网口有:9876,10911,10909,10912

    # 开放防火墙端口
    firewall-cmd --zone=public --add-port=10911/tcp --permanent
    # 更新
    firewall-cmd --reload
    # 查看
    firewall-cmd --list-port
    

    步骤:
    1.安装roctetmq 和测试rocket
    2.安装python 和测试python
    3.安装c++
    4.测试通信

    这里事前我不知道使用vscode远程是可以不需要在云服务器上装vscode 我还装了一下

    装vscode

    # 方式1 通过snap安装
    ## Snap 是 Ubuntu 和其他 Linux 发行版中常用的包管理系统
    sudo apt-get install snapd
    snap --version
    sudo snap install --classic code
    code --version
    # 方式2 通过apt安装
    ## 更新系统包列表
    sudo apt update
    ## 安装依赖包
    sudo apt install software-properties-common apt-transport-https wget
    ## 导入Microsoft GPG key 可分成两步
    wget -q https://packages.microsoft.com/keys/microsoft.asc -O- | sudo apt-key add -
    #wget https://packages.microsoft.com/keys/microsoft.asc -O microsoft.asc
    #sudo apt-key add microsoft.asc 
    ## 添加VS Code repository
    sudo add-apt-repository "deb [arch=amd64] https://packages.microsoft.com/repos/vscode stable main"
    ## 安装VS Code
    sudo apt update
    sudo apt install code
    code --version
    

    这里我还安装anconda来管理python的环境
    装anconda 这里选择版本

    # 下载Anaconda安装脚本:
    wget https://repo.anaconda.com/archive/Anaconda3-2024.06-1-Linux-x86_64.sh
    # 运行安装脚本
    bash Anaconda3-2024.06-1-Linux-x86_64.sh
    # 选择安装位置(默认位置通常是/home/your_username/anaconda3)是否初始化 直接yes
    # 激活安装
    source ~/.bashrc
    conda --version
    

    第一步

    安装roctetmq
    github上release版本:https://github.com/apache/rocketmq/tags
    官网上release版本:https://rocketmq.apache.org/zh/download

    参考:【RocketMQ】安装RocketMQ5.2.0(单机版)
    及官方文档

    # 安装Java RocketMQ是用Java编写的,因此首先需要在你的Linux系统上安装Java
    sudo apt update
    sudo apt install openjdk-8-jdk
    java -version
    # 下载并解压RocketMQ 虽然我这里已经有源码的5.3.0版本 但我编译时不成功,选择了已经有编译过的二进制文件的5.2.0版本
    wget https://dist.apache.org/repos/dist/release/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip
    unzip rocketmq-all-5.2.0-bin-release.zip
    cd rocketmq-all-5.2.0-bin-release
    cd bin
    # 启动mqnamesrv #The Name Server boot success...  成功了
    sh mqnamesrv &
    # 启动broker # rocket-proxy startup successfully 
    sh mqbroker -n localhost:9876 --enable-proxy &
    # 查看jps 出现一个nameserver 和 一个proxy 即启动成功
    jps
    # 测试 生产者发
    sh tools.sh org.apache.rocketmq.example.quickstart.Producer
    # 测试 消费者收
    sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
    # 关闭
    sh mqshutdown broker
    sh mqshutdown namesrv
    
    

    测试通过会发这样的数据

    注:
    1.好多博客上讲的什么autoCreateTopicEnable=true 都是4.x版本上的,注意和5.x版本的区别在于5.x版本是--enable-proxy ,一定要在启动broker时加上,也是自动创建话题的作用。
    2.如果这里的broker.conf(注意相对位置)

    nano rocketmq-all-5.2.0-bin-release/conf/broker.conf
    


    不增加一行namesrvAddr = localhost:9876的话,在启动broker时 一定要增加这行-n localhost:9876
    这里的localhost 为回环地址,也是本地地址,即自己发给自己,可以改成公网IP 这样大家可以通过公网来通信(防火墙开的话)

    3.若是服务器的内存过小,是会启动失败的,因为默认rocketmq的启动内存很大,要修改三个文件,我这里只有2g 所以要修改。
    会报类似于这样的错:

    bin文件夹下

    nano runserver.sh
    

    将原来的8g 什么大小的 我都改成了64m (大家可参考自己的内存大小配置)

    nano runbroker.sh
    

    tools.sh
    


    5.其他修改
    增加环境变量

    nano ~/.bashrc
    

    增加了两行,使得可以用$ROCKETMQ_HOME来表示rocketmq的地址

    export ROCKETMQ_HOME=/root/rocketmq-all-5.2.0-bin-release
    export PATH=$ROCKETMQ_HOME/bin:$PATH
    


    更新环境变量

    source ~/.bashrc
    

    之后在任意目录就可以运行

    # 运行
    mqnamesrv &
    mqbroker -n localhost:9876 --enable-proxy &
    # 关闭
    mqshutdown broker
    mqshutdown namesrv
    


    6.
    这个

    nohup: ignoring input and appending output to 'nohup.out'
    

    OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
    OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
    OpenJDK 64-Bit Server VM warning: MaxNewSize (65536k) is equal to or greater than the entire heap (65536k).  A new max generation size of 65472k will be used.
    

    不算报错可以不用管

    第二步

    这里按照官方文档

    # 选择debian,ubuntu是debian下的
    wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
    sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
    

    不装的话会报这样的错。

    Traceback (most recent call last):
      File "/root/study/producer.py", line 2, in <module>
        from rocketmq.client import Producer, Message
      File "/root/anaconda3/envs/myrl/lib/python3.11/site-packages/rocketmq/client.py", line 24, in <module>
        from .ffi import (
      File "/root/anaconda3/envs/myrl/lib/python3.11/site-packages/rocketmq/ffi.py", line 40, in <module>
        raise ImportError('rocketmq dynamic library not found')
    ImportError: rocketmq dynamic library not found
    

    Pypi安装

    # 我这里是事先装的anconda
    conda create -n mymq python=3.11.5
    conda activate mq
    pip install rocketmq-client
    

    测试通信
    用的也是官网的例子
    producer.py 生产者

    from rocketmq.client import Producer, Message
    
    producer = Producer('PID-XXX')
    producer.set_name_server_address('127.0.0.1:9876')
    producer.start()
    
    msg = Message('YOUR-TOPIC')
    msg.set_keys('XXX')
    msg.set_tags('XXX')
    msg.set_body('XXXX')
    ret = producer.send_sync(msg)
    print(ret.status, ret.msg_id, ret.offset)
    producer.shutdown()
    

    consumer.py 消费者

    import time
    
    from rocketmq.client import PushConsumer, ConsumeStatus
    
    
    def callback(msg):
        print(msg.id, msg.body)
        return ConsumeStatus.CONSUME_SUCCESS
    
    
    consumer = PushConsumer('CID_XXX')
    consumer.set_name_server_address('127.0.0.1:9876')
    consumer.subscribe('YOUR-TOPIC', callback)
    consumer.start()
    
    while True:
        time.sleep(3600)
    
    consumer.shutdown()
    
    

    注:
    1.两处的topic名字得一样,YOUR-TOPIC
    2.地址得一样127.0.0.1:9876
    3.要namesrv 和broker 开着才能运行,先开启生产者,后开启消费者。

    第三步

    我是先按照这个将c++的环境先装好
    Linux系统VsCode 配置C/C++环境

    #include <iostream>
    using namespace std
    int main() {
        cout << "Hello, World!" << endl;
        return 0;
    }
    

    测试通过后
    可以按照官网的教程装
    我是直接在release版本中下了一个二进制的版本

    cd /root/
    # 下载
    wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.2.0/rocketmq-client-cpp-2.2.0-bin-release.tar.gz
    # 解压
    tar -xzf rocketmq-client-cpp-2.2.0-bin-release.tar.gz
    

    这里就出现了下载了两个版本的rocketmq-client-cpp的问题,因为我暂时也不知道如何将c++导入之前python下的rocketmq-client-cpp-2.0.0,只知道之前python装的应该在/usr/local/include/rocketmq文件夹下。
    所以这里将就用了。。

    代码:
    测试代码参考RocketMQ介绍(三)——RocketMQ编程示例,或者我下的二进制版本的rocketmq-client-cpp这里的文件夹下有example。

    生产者 test_producer.cpp
    将topic改成和python一样的YOUR-TOPIC
    address 改成一样的127.0.0.1:9876

    /*
    * Description: Simple Producer demo
    */
     
    #include <unistd.h>
    #include <stdlib.h>
    #include <iostream>
    #include <string>
    #include "CProducer.h"
    #include "CMessage.h"
    #include "CSendResult.h"
     
    using namespace std;
     
    // send message
    void StartSendMessage(CProducer *producer)
    {
        CSendResult result;
     
        // create message and set some values for it
        CMessage *msg = CreateMessage("YOUR-TOPIC");
        SetMessageTags(msg, "Test_Tag");
        SetMessageKeys(msg, "Test_Keys");
        
        for (int i = 0; i < 10; i++)
        {
            // construct different body
            string strMessageBody = "this is body number-" + to_string(i);
            
            SetMessageBody(msg, strMessageBody.c_str());
            // send message
            SendMessageSync(producer, msg, &result);
     
            cout << "send message[" << i << "], result status:" << (int)result.sendStatus << ", msgBody:" << strMessageBody << endl;
            usleep(1000000);
        }
     
        // destroy message
        DestroyMessage(msg);
    }
     
    int main(int argc, char *argv[])
    {
        cout << "Producer Initializing....." << endl;
     
        // create producer and set some values for it
        CProducer *producer = CreateProducer("Group_producer");
        SetProducerNameServerAddress(producer, "127.0.0.1:9876");
        // start producer
        StartProducer(producer);
        cout << "Producer start....." << endl;
        // send message
        StartSendMessage(producer);
        // shutdown producer
        ShutdownProducer(producer);
        // destroy producer
        DestroyProducer(producer);
        cout << "Producer Shutdown!" << endl;
        
        return 0;
    }
     
    

    消费者test_consumer.cpp

    /*
    * Description: Simple push consumer demo
    */
     
    #include <unistd.h>
    #include <stdlib.h>
    #include <iostream>
    #include <string>
    #include "CPushConsumer.h"
    #include "CMessageExt.h"
     
    using namespace std;
     
    // consume message
    int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt)
    {
        cout << "[Consume Message] " << "MsgTopic:" << GetMessageTopic(msgExt) << ", MsgTags:" << GetMessageTags(msgExt)
            << ", MsgKeys:" << GetMessageKeys(msgExt) << ", MsgBody:" << GetMessageBody(msgExt) << endl;
     
        return E_CONSUME_SUCCESS;
    }
     
    int main(int argc, char *argv[])
    {
        cout << "Push consumer Initializing...." << endl;
        // create push consumer and set some values for it
        CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test");
        SetPushConsumerNameServerAddress(consumer, "127.0.0.1:9876");
        Subscribe(consumer, "YOUR-TOPIC", "*");
        // register message callback
        RegisterMessageCallback(consumer, doConsumeMessage);
        // start push consumer
        StartPushConsumer(consumer);
        cout << "Push consumer start, and listening message within 1min..." << endl;
        for (int i = 0; i < 6; i++)
        {
            cout << "Already Running: " << (i * 10) << "S" << endl;
            usleep(10000000);
        }
        // shutdown push consumer
        ShutdownPushConsumer(consumer);
        // destroy push consumer
        DestroyPushConsumer(consumer);
        cout << "PushConsumer Shutdown!" << endl;
        
        return 0;
    }
     
    
    # 启动生产者 通过外链库
    cd "/root/study/" && g++ test_producer.cpp -o test_producer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_producer
    # 启动消费者 通过外链库
    cd "/root/study/" && g++ test_consumer.cpp -o test_consumer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_consumer
    

    第四步

    测试通信
    先开启mqnamesrv & 和mqbroker -n localhost:9876 –enable-proxy &
    之后就可以使用python和c++通信了
    注 :这里的地址 得看你自己实际的地址

    # 开启python的生产者
    /root/anaconda3/envs/mymq/bin/python /root/study/producer.py
    # 开启C++的消费者
    cd "/root/study/" && g++ test_consumer.cpp -o test_consumer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_consumer
    

    或者

    # 开启C++的生产者
    cd "/root/study/" && g++ test_producer.cpp -o test_producer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_producer
    # 开启python的消费者
    /root/anaconda3/envs/mymq/bin/python /root/study/consumer.py
    

    结果如开头所示:

    其他参考

    关于可视化配置还没搞
    可参考:
    spring-cloud alibaba 学习——linux 环境 rocketmq4.9.*安装
    linux 安装rocketmq并使用
    Windows环境下RocketMQ的安装及配置(图文详解)
    Linux>>linux下安装RocketMQ
    【Linux】RocketMQ 部署(二进制方式)

    作者:荒野火狐

    物联沃分享整理
    物联沃-IOTWORD物联网 » RocketMQ 5.2.0安装指南及与Python和C通信实现详解

    发表回复