我们将创建一个简单的RocketMQ封装库,包含消息发送和接收的基本功能。然后将其打包为JAR文件供其他部门使用。 步骤: 1. 创建Maven项目 2. 添加依赖(RocketMQ客户端) 3. 编写封装代码 4. 配置Maven打包插件 5. 打包生成JAR 6. 发布JAR到Maven仓库(可选)或直接提供JAR文件 下面详细说明每一步。

Java 项目封装功能为 Jar 包的完整指南

下面是将 RocketMQ 发送接收功能封装为 Jar 包并提供给其他部门使用的详细流程:

一、项目结构与设计

推荐项目结构

text

rocketmq-client-wrapper/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/
│   │   │       └── company/
│   │   │           └── mq/
│   │   │               ├── config/
│   │   │               │   └── MQConfig.java
│   │   │               ├── consumer/
│   │   │               │   ├── MessageListener.java
│   │   │               │   └── MQConsumer.java
│   │   │               ├── producer/
│   │   │               │   └── MQProducer.java
│   │   │               └── exception/
│   │   │                   └── MQException.java
│   │   └── resources/
│   │       └── META-INF/
│   │           └── services/  # SPI 配置目录
│   └── test/
│       └── java/              # 单元测试
├── pom.xml                    # Maven 配置
└── README.md                  # 使用文档

二、详细实现步骤

1. 创建 Maven 项目

<!-- pom.xml -->
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.company.mq</groupId>
    <artifactId>rocketmq-client-wrapper</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <java.version>1.8</java.version>
        <rocketmq.version>4.9.4</rocketmq.version>
    </properties>

    <dependencies>
        <!-- RocketMQ 客户端 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        
        <!-- 日志框架 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            
            <!-- 打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            
            <!-- 生成源码Jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. 实现核心功能

配置类

java

// MQConfig.java
package com.company.mq.config;

public class MQConfig {
    private String namesrvAddr = "localhost:9876";
    private String producerGroup = "DEFAULT_PRODUCER_GROUP";
    private String consumerGroup = "DEFAULT_CONSUMER_GROUP";
    
    // 省略getter/setter
}
生产者封装

java

// MQProducer.java
package com.company.mq.producer;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import com.company.mq.config.MQConfig;
import com.company.mq.exception.MQException;

public class MQProducer {
    private DefaultMQProducer producer;
    private final MQConfig config;

    public MQProducer(MQConfig config) {
        this.config = config;
    }

    public void start() throws MQException {
        try {
            producer = new DefaultMQProducer(config.getProducerGroup());
            producer.setNamesrvAddr(config.getNamesrvAddr());
            producer.start();
        } catch (Exception e) {
            throw new MQException("Failed to start producer", e);
        }
    }

    public SendResult send(String topic, String tags, String keys, byte[] body) 
        throws MQException {
        try {
            Message msg = new Message(topic, tags, keys, body);
            return producer.send(msg);
        } catch (Exception e) {
            throw new MQException("Failed to send message", e);
        }
    }

    public void shutdown() {
        if (producer != null) {
            producer.shutdown();
        }
    }
}
消费者封装

java

// MessageListener.java
package com.company.mq.consumer;

public interface MessageListener {
    void onMessage(String topic, String tags, String keys, byte[] body);
}

java

// MQConsumer.java
package com.company.mq.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;
import com.company.mq.config.MQConfig;
import com.company.mq.exception.MQException;

public class MQConsumer {
    private DefaultMQPushConsumer consumer;
    private final MQConfig config;
    private final MessageListener messageListener;

    public MQConsumer(MQConfig config, MessageListener messageListener) {
        this.config = config;
        this.messageListener = messageListener;
    }

    public void start(String topic, String tags) throws MQException {
        try {
            consumer = new DefaultMQPushConsumer(config.getConsumerGroup());
            consumer.setNamesrvAddr(config.getNamesrvAddr());
            consumer.subscribe(topic, tags);
            
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                for (MessageExt msg : msgs) {
                    messageListener.onMessage(
                        msg.getTopic(),
                        msg.getTags(),
                        msg.getKeys(),
                        msg.getBody()
                    );
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            
            consumer.start();
        } catch (Exception e) {
            throw new MQException("Failed to start consumer", e);
        }
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

3. 添加文档注释和日志

java

/**
 * RocketMQ 生产者封装
 * 
 * 使用示例:
 * MQConfig config = new MQConfig();
 * config.setNamesrvAddr("192.168.1.100:9876");
 * 
 * MQProducer producer = new MQProducer(config);
 * producer.start();
 * 
 * SendResult result = producer.send("ORDER_TOPIC", "CREATE", "order123", "订单内容".getBytes());
 * 
 * producer.shutdown();
 */
public class MQProducer {
    // ...
}

4. 创建 SPI 扩展点(可选)

在 src/main/resources/META-INF/services/ 创建文件:

text

# 文件名:com.company.mq.consumer.MessageListener
# 内容:com.company.mq.consumer.CustomListenerImpl

三、构建与打包

1. 构建 JAR 包

bash

mvn clean package

生成的 JAR 文件位于:

text

target/rocketmq-client-wrapper-1.0.0.jar
target/rocketmq-client-wrapper-1.0.0-sources.jar # 源码Jar

2. 生成 JavaDoc

bash

mvn javadoc:javadoc

生成的文档位于:

text

target/site/apidocs/

四、提供文档与使用示例

README.md 示例

markdown

# RocketMQ 客户端封装库

## 功能概述
封装 RocketMQ 生产者和消费者操作,简化使用流程

## 快速开始

### 添加依赖
```xml
<dependency>
    <groupId>com.company.mq</groupId>
    <artifactId>rocketmq-client-wrapper</artifactId>
    <version>1.0.0</version>
</dependency>
```

### 生产者示例
```java
MQConfig config = new MQConfig();
config.setNamesrvAddr("192.168.1.100:9876");
config.setProducerGroup("ORDER_PRODUCER_GROUP");

MQProducer producer = new MQProducer(config);
producer.start();

// 发送消息
SendResult result = producer.send(
    "ORDER_TOPIC", 
    "CREATE_ORDER", 
    "order_123", 
    "订单内容".getBytes()
);

producer.shutdown();
```

### 消费者示例
```java
MQConfig config = new MQConfig();
config.setNamesrvAddr("192.168.1.100:9876");
config.setConsumerGroup("ORDER_CONSUMER_GROUP");

MQConsumer consumer = new MQConsumer(config, (topic, tags, keys, body) -> {
    System.out.println("收到消息: " + new String(body));
});

// 订阅所有标签的消息
consumer.start("ORDER_TOPIC", "*");
```

## 配置参数
| 参数名 | 默认值 | 说明 |
|--------|--------|------|
| namesrvAddr | localhost:9876 | NameServer 地址 |
| producerGroup | DEFAULT_PRODUCER_GROUP | 生产者组 |
| consumerGroup | DEFAULT_CONSUMER_GROUP | 消费者组 |

## 异常处理
所有异常都封装为 `MQException`,建议捕获处理:
```java
try {
    producer.send(...);
} catch (MQException e) {
    logger.error("消息发送失败", e);
    // 处理异常
}
```

五、分发与使用

1. 分发方式

方式一:直接提供 JAR 文件
  • 将以下文件打包提供给其他部门:

    • rocketmq-client-wrapper-1.0.0.jar

    • rocketmq-client-wrapper-1.0.0-sources.jar (可选)

    • javadoc/ 目录 (可选)

方式二:部署到 Maven 私服
  1. 在 pom.xml 中添加部署配置:

    xml

    <distributionManagement>
        <repository>
            <id>company-nexus</id>
            <url>http://nexus.company.com/repository/maven-releases</url>
        </repository>
    </distributionManagement>
  2. 执行部署命令:

    bash

    mvn clean deploy

2. 其他部门使用方式

作为依赖引入

xml

<dependency>
    <groupId>com.company.mq</groupId>
    <artifactId>rocketmq-client-wrapper</artifactId>
    <version>1.0.0</version>
</dependency>
基础使用示例

java

public class OrderService {
    private MQProducer producer;
    
    @PostConstruct
    public void init() {
        MQConfig config = new MQConfig();
        config.setNamesrvAddr("mq.company.com:9876");
        
        producer = new MQProducer(config);
        producer.start();
    }
    
    public void createOrder(Order order) {
        try {
            String json = objectMapper.writeValueAsString(order);
            producer.send("ORDER_TOPIC", "CREATE", order.getId(), json.getBytes());
        } catch (MQException e) {
            // 处理异常
        }
    }
    
    @PreDestroy
    public void destroy() {
        producer.shutdown();
    }
}

六、最佳实践建议

  1. 版本管理

    • 使用语义化版本控制 (SemVer)

    • 每次更新递增版本号

    • 维护 CHANGELOG.md 记录变更

  2. 兼容性设计

    • 保持公共 API 向后兼容

    • 使用 @Deprecated 标记即将废弃的方法

    • 提供迁移指南

  3. 日志增强

    java

    public class MQProducer {
        private static final Logger logger = LoggerFactory.getLogger(MQProducer.class);
        
        public SendResult send(...) throws MQException {
            long start = System.currentTimeMillis();
            try {
                // ...
                logger.debug("Message sent successfully: {}", result.getMsgId());
                return result;
            } catch (Exception e) {
                logger.error("Failed to send message to {}:{}", topic, tags, e);
                throw e;
            } finally {
                logger.info("Send execution time: {}ms", System.currentTimeMillis() - start);
            }
        }
    }
  4. 配置扩展

    java

    public class MQConfig {
        // 添加高级配置
        private int producerRetryTimes = 3;
        private int consumerThreadMin = 20;
        private int consumerThreadMax = 64;
        
        // 添加自定义配置
        private Map<String, String> customProperties = new HashMap<>();
    }
  5. 性能优化

    • 使用对象池管理 Producer/Consumer 实例

    • 支持批量消息发送

    • 提供异步发送接口

  6. 健康检查

    java

    public class MQHealthChecker {
        public boolean checkProducerHealth(MQProducer producer) {
            // 实现健康检查逻辑
        }
        
        public boolean checkConsumerHealth(MQConsumer consumer) {
            // 实现健康检查逻辑
        }
    }

七、常见问题解决

问题1:类冲突

现象:NoSuchMethodError/ClassNotFoundException
解决

xml

<dependency>
    <groupId>com.company.mq</groupId>
    <artifactId>rocketmq-client-wrapper</artifactId>
    <version>1.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </exclusion>
    </exclusions>
</dependency>

问题2:配置不生效

解决:提供配置示例和默认值

java

public class MQConfig {
    public MQConfig() {
        // 从环境变量读取配置
        String namesrv = System.getenv("ROCKETMQ_NAMESRV_ADDR");
        if (namesrv != null) {
            this.namesrvAddr = namesrv;
        }
    }
}

问题3:资源未释放

解决:提供关闭钩子

java

public class MQProducer {
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (producer != null) {
                producer.shutdown();
            }
        }));
    }
}

总结

封装 Jar 包的关键步骤:

  1. 设计清晰API:定义简单易用的接口

  2. 完善错误处理:统一异常封装

  3. 详细文档:提供使用示例和配置说明

  4. 严格测试:确保核心功能稳定

  5. 版本管理:遵循语义化版本规范

  6. 兼容性保障:保持API向后兼容

  7. 分发机制:提供Maven仓库或直接JAR分发

通过良好封装的Jar包,其他团队可以快速集成RocketMQ能力,而无需关心底层实现细节,提高开发效率和系统稳定性。

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐