Java项目中RocketMQ集成指南

文章目录

  • 1.调整MQ的配置
  • 1.进入bin目录
  • 2.关闭broker和namesrv
  • 3.查看进程确认关闭
  • 4.编辑配置文件broker.conf,配置brokerIP1
  • 5.开放端口10911
  • 6.重新启动
  • 1.进入bin目录
  • 2.启动mqnamesrv和mqbroker
  • 1.启动 NameServer 并将输出重定向到 mqnamesrv.log
  • 2.**启动 Broker 并将输出重定向到** **mqbroker.log**
  • 3.**实时监控 NameServer 的日志文件**
  • 4.**实时监控 Broker 的日志文件**
  • 5.查看进程
  • 2.项目集成MQ
  • 1.domain引入mq依赖
  • 2.sun-club-application-mq 引入domain依赖,用于消费mq
  • 3.sun-club-starter 引入mq层
  • 4.application.yml 配置mq
  • 5.SubjectController.java
  • 1.依赖注入 RocketMQTemplate
  • 2.编写controller,作为消息生产者
  • 6.TestConsumer.java 测试消费
  • 5.测试
  • 3.点赞业务优化为MQ处理
  • 1.SubjectLikedMessage.java 点赞消息实体
  • 2.sun-club-domain 同步点赞数据
  • 1.SubjectLikedDomainService.java
  • 2.SubjectLikedDomainServiceImpl.java
  • 3.add方法逻辑修改
  • 4.测试
  • 1.调整MQ的配置

    1.进入bin目录
    cd /usr/local/soft/rocketmq-all-4.8.0-bin-release/bin
    
    2.关闭broker和namesrv
    sh mqshutdown broker && sh mqshutdown namesrv
    

    CleanShot 2024-07-12 at 12.20.25@2x

    3.查看进程确认关闭
    ps -ef | grep NamesrvStartup && ps -ef | grep BrokerStartup
    

    CleanShot 2024-07-12 at 12.21.38@2x

    4.编辑配置文件broker.conf,配置brokerIP1
    vim /usr/local/soft/rocketmq-all-4.8.0-bin-release/conf/broker.conf
    
    # NameServer 地址(开端口)
    namesrvAddr=
    
    # brokerIP1 指定了 Broker 对外提供服务的 IP 地址
    brokerIP1=
    
    # listenPort 指定了 Broker 监听客户端连接的端口(开端口)
    listenPort=10911
    
    # 当这个选项设置为 true 时,如果客户端尝试向一个不存在的主题发送消息,Broker 会自动创建这个主题
    autoCreateTopicEnable=true
    
    5.开放端口10911
    systemctl start firewalld && firewall-cmd --permanent --add-port=10911/tcp && firewall-cmd --reload && firewall-cmd --query-port=10911/tcp
    

    CleanShot 2024-07-12 at 12.52.09@2x

    6.重新启动
    1.进入bin目录
    cd /usr/local/soft/rocketmq-all-4.8.0-bin-release/bin
    
    2.启动mqnamesrv和mqbroker
    1.启动 NameServer 并将输出重定向到 mqnamesrv.log
    nohup sh mqnamesrv > mqnamesrv.log 2>&1 &
    
    2.启动 Broker 并将输出重定向到 mqbroker.log
    nohup sh mqbroker -c ../conf/broker.conf > mqbroker.log 2>&1 &
    
    3.实时监控 NameServer 的日志文件
    tail -f mqnamesrv.log &
    

    CleanShot 2024-07-12 at 12.40.03@2x

    4.实时监控 Broker 的日志文件
    tail -f mqbroker.log &
    
    5.查看进程
    ps -ef | grep NamesrvStartup && ps -ef | grep BrokerStartup
    

    CleanShot 2024-07-12 at 12.57.04@2x

    2.项目集成MQ

    1.domain引入mq依赖
            <!-- rocketmq -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.1.1</version>
            </dependency>
    

    CleanShot 2024-07-12 at 13.05.02@2x

    2.sun-club-application-mq 引入domain依赖,用于消费mq
            <!-- 引入domain层 -->
            <dependency>
                <groupId>com.sun.club</groupId>
                <artifactId>sun-club-domain</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
    

    CleanShot 2024-07-12 at 13.07.58@2x

    3.sun-club-starter 引入mq层
            <!-- 引入mq层 -->
            <dependency>
                <groupId>com.sun.club</groupId>
                <artifactId>sun-club-application-mq</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
    

    CleanShot 2024-07-12 at 13.11.35@2x

    4.application.yml 配置mq
    # mq配置
    rocketmq:
      name-server:  # 作用是服务注册和发现,会自动发现broker
      producer:
        group: test-group
    
    

    CleanShot 2024-07-12 at 13.16.40@2x

    5.SubjectController.java
    1.依赖注入 RocketMQTemplate
        @Resource
        private RocketMQTemplate rocketMQTemplate;
    
    2.编写controller,作为消息生产者
        /**
         * 测试mq发送
         * @return
         */
        @GetMapping("/pushMessage")
        public Result<Boolean> pushMessage(@Param("id") int id) {
            rocketMQTemplate.convertAndSend("first-topic", "hello " + id);
            return Result.ok();
        }
    

    CleanShot 2024-07-12 at 13.34.21@2x

    6.TestConsumer.java 测试消费
    package com.sunxiansheng.subject.application.mq;
    
    import com.sun.media.jfxmedia.logging.Logger;
    import groovy.util.logging.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    /**
     * Description:
     * @Author sun
     * @Create 2024/7/12 13:24
     * @Version 1.0
     */
    @Component
    // topic:主题,就是生产者那里指定的主题
    // consumerGroup:消费组,在application.yml文件中配置的
    // RocketMQListener<String>:这里的泛型就是消息的类型
    @RocketMQMessageListener(topic = "first-topic", consumerGroup = "test-group")
    @Slf4j
    public class TestConsumer implements RocketMQListener<String> {
    
        private static final org.slf4j.Logger log = LoggerFactory.getLogger(TestConsumer.class);
    
        /**
         * 对消息进行消费
         * @param s
         */
        @Override
        public void onMessage(String s) {
            log.info("接受到消息了:{}", s);
        }
    
    }
    

    CleanShot 2024-07-12 at 13.34.57@2x

    5.测试

    CleanShot 2024-07-12 at 13.35.13@2x

    CleanShot 2024-07-12 at 13.35.21@2x

    3.点赞业务优化为MQ处理

    1.SubjectLikedMessage.java 点赞消息实体
    package com.sunxiansheng.subject.domain.entity;
    
    import lombok.Data;
    import lombok.experimental.Accessors;
    
    import java.io.Serializable;
    
    /**
     * 题目点赞消息
     */
    @Data
    @Accessors(chain = true) // 支持链式调用
    public class SubjectLikedMessage implements Serializable {
    
        /**
         * 题目id
         */
        private Long subjectId;
    
        /**
         * 点赞人id
         */
        private String likeUserId;
    
        /**
         * 点赞状态 1点赞 0不点赞
         */
        private Integer status;
    
    }
    
    2.sun-club-domain 同步点赞数据
    1.SubjectLikedDomainService.java
        /**
         * MQ同步点赞数据
         * @param subjectLikedBO
         */
        void syncLikedByMsg(SubjectLikedBO subjectLikedBO);
    
    2.SubjectLikedDomainServiceImpl.java
        @Override
        public void syncLikedByMsg(SubjectLikedBO subjectLikedBO) {
            SubjectLiked subjectLiked = new SubjectLiked();
            subjectLiked.setSubjectId(subjectLikedBO.getSubjectId());
            subjectLiked.setLikeUserId(subjectLikedBO.getLikeUserId());
            subjectLiked.setStatus(subjectLikedBO.getStatus());
            subjectLiked.setIsDeleted(IsDeleteFlagEnum.UN_DELETED.getCode());
            subjectLikedService.insert(subjectLiked);
        }
    
    3.add方法逻辑修改

    CleanShot 2024-07-12 at 14.33.20@2x

    4.测试

    CleanShot 2024-07-12 at 14.37.16@2x

    CleanShot 2024-07-12 at 14.37.25@2x

    作者:S-X-S

    物联沃分享整理
    物联沃-IOTWORD物联网 » Java项目中RocketMQ集成指南

    发表回复