RocketMQ 5.2.0安装指南及与Python和C通信实现详解
文章目录
前言
记录一下rocket5.2.0安装以及和python和c++通信的踩坑史。
由于只是简单实现,c++部分的实现可能并不完美。
linux:ubuntu20.04 2核2g 有公网ip
rocketmq:5.2.0 (发现一定要看官网的说明,大多博客都是安装的4.x版本的)
c++ :g++4.9.3 调用的库rocket版本:rocketmq-client-cpp-2.2.0
python:3.11.5 调用的库rocket版本:rocketmq-client-cpp-2.0.0
Pypi上推荐python版本是3.10 我这里用的3.11版本也可以通信
rocketmq官网:https://rocketmq.apache.org/
rocketmq-python客户端:https://github.com/apache/rocketmq-client-python
rocketmq-cpp客户端:https://github.com/apache/rocketmq-client-cpp
客户端总览:https://github.com/apache/rocketmq-clients/tree/master
发现有一些bug,有的rocket官方文档的各个相互之间更新的不及时,在客户端总览上说什么python还在施工,其实早就可以通信了,
还有在python客户端上的让下载的动态链接库是2.0.0版本的,而在c++客户端上没有明确说明,看release已经可以下到2.3.0版本的了。
当时我先运行的python,后来再下的c++,虽然两者的动态链接库版本不一样,但还是可以正常通信的(暂时)。
第1张图 显示的是c++的生产者发,python的消费者收
第2张图 显示的是python的生产者发 c++的消费者收
实现
若有官网下载太慢的话,可用sftp方法(这里的目录上的)。
若是遇见防火墙未开启,见下文开启,常用的用来通信的网口有:9876,10911,10909,10912
# 开放防火墙端口
firewall-cmd --zone=public --add-port=10911/tcp --permanent
# 更新
firewall-cmd --reload
# 查看
firewall-cmd --list-port
步骤:
1.安装roctetmq 和测试rocket
2.安装python 和测试python
3.安装c++
4.测试通信
这里事前我不知道使用vscode远程是可以不需要在云服务器上装vscode 我还装了一下
装vscode
# 方式1 通过snap安装
## Snap 是 Ubuntu 和其他 Linux 发行版中常用的包管理系统
sudo apt-get install snapd
snap --version
sudo snap install --classic code
code --version
# 方式2 通过apt安装
## 更新系统包列表
sudo apt update
## 安装依赖包
sudo apt install software-properties-common apt-transport-https wget
## 导入Microsoft GPG key 可分成两步
wget -q https://packages.microsoft.com/keys/microsoft.asc -O- | sudo apt-key add -
#wget https://packages.microsoft.com/keys/microsoft.asc -O microsoft.asc
#sudo apt-key add microsoft.asc
## 添加VS Code repository
sudo add-apt-repository "deb [arch=amd64] https://packages.microsoft.com/repos/vscode stable main"
## 安装VS Code
sudo apt update
sudo apt install code
code --version
这里我还安装anconda来管理python的环境
装anconda 这里选择版本
# 下载Anaconda安装脚本:
wget https://repo.anaconda.com/archive/Anaconda3-2024.06-1-Linux-x86_64.sh
# 运行安装脚本
bash Anaconda3-2024.06-1-Linux-x86_64.sh
# 选择安装位置(默认位置通常是/home/your_username/anaconda3)是否初始化 直接yes
# 激活安装
source ~/.bashrc
conda --version
第一步
安装roctetmq
github上release版本:https://github.com/apache/rocketmq/tags
官网上release版本:https://rocketmq.apache.org/zh/download
参考:【RocketMQ】安装RocketMQ5.2.0(单机版)
及官方文档
# 安装Java RocketMQ是用Java编写的,因此首先需要在你的Linux系统上安装Java
sudo apt update
sudo apt install openjdk-8-jdk
java -version
# 下载并解压RocketMQ 虽然我这里已经有源码的5.3.0版本 但我编译时不成功,选择了已经有编译过的二进制文件的5.2.0版本
wget https://dist.apache.org/repos/dist/release/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip
unzip rocketmq-all-5.2.0-bin-release.zip
cd rocketmq-all-5.2.0-bin-release
cd bin
# 启动mqnamesrv #The Name Server boot success... 成功了
sh mqnamesrv &
# 启动broker # rocket-proxy startup successfully
sh mqbroker -n localhost:9876 --enable-proxy &
# 查看jps 出现一个nameserver 和 一个proxy 即启动成功
jps
# 测试 生产者发
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
# 测试 消费者收
sh tools.sh org.apache.rocketmq.example.quickstart.Consumer
# 关闭
sh mqshutdown broker
sh mqshutdown namesrv
测试通过会发这样的数据
注:
1.好多博客上讲的什么autoCreateTopicEnable=true
都是4.x版本上的,注意和5.x版本的区别在于5.x版本是--enable-proxy
,一定要在启动broker时加上,也是自动创建话题的作用。
2.如果这里的broker.conf(注意相对位置)
nano rocketmq-all-5.2.0-bin-release/conf/broker.conf
不增加一行namesrvAddr = localhost:9876
的话,在启动broker时 一定要增加这行-n localhost:9876
这里的localhost 为回环地址,也是本地地址,即自己发给自己,可以改成公网IP 这样大家可以通过公网来通信(防火墙开的话)
3.若是服务器的内存过小,是会启动失败的,因为默认rocketmq的启动内存很大,要修改三个文件,我这里只有2g 所以要修改。
会报类似于这样的错:
bin文件夹下
nano runserver.sh
将原来的8g 什么大小的 我都改成了64m (大家可参考自己的内存大小配置)
nano runbroker.sh
tools.sh
5.其他修改
增加环境变量
nano ~/.bashrc
增加了两行,使得可以用$ROCKETMQ_HOME来表示rocketmq的地址
export ROCKETMQ_HOME=/root/rocketmq-all-5.2.0-bin-release
export PATH=$ROCKETMQ_HOME/bin:$PATH
更新环境变量
source ~/.bashrc
之后在任意目录就可以运行
# 运行
mqnamesrv &
mqbroker -n localhost:9876 --enable-proxy &
# 关闭
mqshutdown broker
mqshutdown namesrv
6.
这个
nohup: ignoring input and appending output to 'nohup.out'
和
OpenJDK 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
OpenJDK 64-Bit Server VM warning: MaxNewSize (65536k) is equal to or greater than the entire heap (65536k). A new max generation size of 65472k will be used.
不算报错可以不用管
第二步
这里按照官方文档
# 选择debian,ubuntu是debian下的
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
sudo dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
不装的话会报这样的错。
Traceback (most recent call last):
File "/root/study/producer.py", line 2, in <module>
from rocketmq.client import Producer, Message
File "/root/anaconda3/envs/myrl/lib/python3.11/site-packages/rocketmq/client.py", line 24, in <module>
from .ffi import (
File "/root/anaconda3/envs/myrl/lib/python3.11/site-packages/rocketmq/ffi.py", line 40, in <module>
raise ImportError('rocketmq dynamic library not found')
ImportError: rocketmq dynamic library not found
Pypi安装
# 我这里是事先装的anconda
conda create -n mymq python=3.11.5
conda activate mq
pip install rocketmq-client
测试通信
用的也是官网的例子
producer.py 生产者
from rocketmq.client import Producer, Message
producer = Producer('PID-XXX')
producer.set_name_server_address('127.0.0.1:9876')
producer.start()
msg = Message('YOUR-TOPIC')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body('XXXX')
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
consumer.py 消费者
import time
from rocketmq.client import PushConsumer, ConsumeStatus
def callback(msg):
print(msg.id, msg.body)
return ConsumeStatus.CONSUME_SUCCESS
consumer = PushConsumer('CID_XXX')
consumer.set_name_server_address('127.0.0.1:9876')
consumer.subscribe('YOUR-TOPIC', callback)
consumer.start()
while True:
time.sleep(3600)
consumer.shutdown()
注:
1.两处的topic名字得一样,YOUR-TOPIC
2.地址得一样127.0.0.1:9876
3.要namesrv 和broker 开着才能运行,先开启生产者,后开启消费者。
第三步
我是先按照这个将c++的环境先装好
Linux系统VsCode 配置C/C++环境
#include <iostream>
using namespace std
int main() {
cout << "Hello, World!" << endl;
return 0;
}
测试通过后
可以按照官网的教程装
我是直接在release版本中下了一个二进制的版本
cd /root/
# 下载
wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.2.0/rocketmq-client-cpp-2.2.0-bin-release.tar.gz
# 解压
tar -xzf rocketmq-client-cpp-2.2.0-bin-release.tar.gz
这里就出现了下载了两个版本的rocketmq-client-cpp的问题,因为我暂时也不知道如何将c++导入之前python下的rocketmq-client-cpp-2.0.0,只知道之前python装的应该在/usr/local/include/rocketmq
文件夹下。
所以这里将就用了。。
代码:
测试代码参考RocketMQ介绍(三)——RocketMQ编程示例,或者我下的二进制版本的rocketmq-client-cpp这里的文件夹下有example。
生产者 test_producer.cpp
将topic改成和python一样的YOUR-TOPIC
address 改成一样的127.0.0.1:9876
/*
* Description: Simple Producer demo
*/
#include <unistd.h>
#include <stdlib.h>
#include <iostream>
#include <string>
#include "CProducer.h"
#include "CMessage.h"
#include "CSendResult.h"
using namespace std;
// send message
void StartSendMessage(CProducer *producer)
{
CSendResult result;
// create message and set some values for it
CMessage *msg = CreateMessage("YOUR-TOPIC");
SetMessageTags(msg, "Test_Tag");
SetMessageKeys(msg, "Test_Keys");
for (int i = 0; i < 10; i++)
{
// construct different body
string strMessageBody = "this is body number-" + to_string(i);
SetMessageBody(msg, strMessageBody.c_str());
// send message
SendMessageSync(producer, msg, &result);
cout << "send message[" << i << "], result status:" << (int)result.sendStatus << ", msgBody:" << strMessageBody << endl;
usleep(1000000);
}
// destroy message
DestroyMessage(msg);
}
int main(int argc, char *argv[])
{
cout << "Producer Initializing....." << endl;
// create producer and set some values for it
CProducer *producer = CreateProducer("Group_producer");
SetProducerNameServerAddress(producer, "127.0.0.1:9876");
// start producer
StartProducer(producer);
cout << "Producer start....." << endl;
// send message
StartSendMessage(producer);
// shutdown producer
ShutdownProducer(producer);
// destroy producer
DestroyProducer(producer);
cout << "Producer Shutdown!" << endl;
return 0;
}
消费者test_consumer.cpp
/*
* Description: Simple push consumer demo
*/
#include <unistd.h>
#include <stdlib.h>
#include <iostream>
#include <string>
#include "CPushConsumer.h"
#include "CMessageExt.h"
using namespace std;
// consume message
int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt)
{
cout << "[Consume Message] " << "MsgTopic:" << GetMessageTopic(msgExt) << ", MsgTags:" << GetMessageTags(msgExt)
<< ", MsgKeys:" << GetMessageKeys(msgExt) << ", MsgBody:" << GetMessageBody(msgExt) << endl;
return E_CONSUME_SUCCESS;
}
int main(int argc, char *argv[])
{
cout << "Push consumer Initializing...." << endl;
// create push consumer and set some values for it
CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test");
SetPushConsumerNameServerAddress(consumer, "127.0.0.1:9876");
Subscribe(consumer, "YOUR-TOPIC", "*");
// register message callback
RegisterMessageCallback(consumer, doConsumeMessage);
// start push consumer
StartPushConsumer(consumer);
cout << "Push consumer start, and listening message within 1min..." << endl;
for (int i = 0; i < 6; i++)
{
cout << "Already Running: " << (i * 10) << "S" << endl;
usleep(10000000);
}
// shutdown push consumer
ShutdownPushConsumer(consumer);
// destroy push consumer
DestroyPushConsumer(consumer);
cout << "PushConsumer Shutdown!" << endl;
return 0;
}
# 启动生产者 通过外链库
cd "/root/study/" && g++ test_producer.cpp -o test_producer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_producer
# 启动消费者 通过外链库
cd "/root/study/" && g++ test_consumer.cpp -o test_consumer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_consumer
第四步
测试通信
先开启mqnamesrv & 和mqbroker -n localhost:9876 –enable-proxy &
之后就可以使用python和c++通信了
注 :这里的地址 得看你自己实际的地址
# 开启python的生产者
/root/anaconda3/envs/mymq/bin/python /root/study/producer.py
# 开启C++的消费者
cd "/root/study/" && g++ test_consumer.cpp -o test_consumer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_consumer
或者
# 开启C++的生产者
cd "/root/study/" && g++ test_producer.cpp -o test_producer -I/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/include -L/root/rocketmq-client-cpp-2.2.0-bin-release/centos6/rocketmq-client-cpp/lib -lrocketmq && ./test_producer
# 开启python的消费者
/root/anaconda3/envs/mymq/bin/python /root/study/consumer.py
结果如开头所示:
其他参考
关于可视化配置还没搞
可参考:
spring-cloud alibaba 学习——linux 环境 rocketmq4.9.*安装
linux 安装rocketmq并使用
Windows环境下RocketMQ的安装及配置(图文详解)
Linux>>linux下安装RocketMQ
【Linux】RocketMQ 部署(二进制方式)
作者:荒野火狐