官网

https://help.aliyun.com/document_detail/444759.html

定义

消费者是 Apache RocketMQ 中用来接收并处理消息的运行实体。 消费者通常被集成在业务系统中,从 Apache RocketMQ 服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
在消息消费端,可以定义如下传输行为:

  • 消费者身份:消费者必须关联一个指定的消费者分组,以获取分组内统一定义的行为配置和消费状态
  • 消费者类型:Apache RocketMQ 面向不同的开发场景提供了多样的消费者类型,包括PushConsumer类型、SimpleConsumer类型、PullConsumer类型(仅推荐流处理场景使用)等。
  • 消费者本地运行配置:消费者根据不同的消费者类型,控制消费者客户端本地的运行配置。例如消费者客户端的线程数,消费并发度等,实现不同的传输效果。

Apache RocketMQ 的消费者处理消息时主要经过以下阶段:消息获取—>消息处理—>消费状态提交。
在这里插入图片描述

功能概述

在这里插入图片描述

消费者分类

官网:https://help.aliyun.com/document_detail/444764.html?spm=a2c4g.444763.0.0.6dfc51022FWeT8
rocket-client-java 提供了不同的消费者类型: PushConsumer 、SimpleConsumer,这两种消费者相互变更类型不影响当前消息队列RocketMQ版资源的使用和业务处理。
rocket-client存在PullConsumer,不能与上面两种类型混用,会出现问题
在这里插入图片描述

消费重试

官网:https://help.aliyun.com/document_detail/440356.html?spm=a2c4g.444763.0.0.2fe11288TmQsbG
在这里插入图片描述

消费负载均衡

  • 消息粒度负载均衡:PushConsumer和SimpleConsumer唯一的负载策略
  • 队列粒度负载均衡:PullConsumer默认负载策略

消息粒度负载均衡

https://help.aliyun.com/document_detail/444763.html?spm=a2c4g.444768.0.0.36115b23x9MDEB

策略原理

消息粒度负载均衡策略中,同一消费者分组内的多个消费者将按照消息粒度平均分摊主题中的所有消息,即同一个队列中的消息,可被平均分配给多个消费者共同消费。
在这里插入图片描述
消息粒度负载均衡策略保证同一个队列的消息可以被多个消费者共同处理,但是该策略使用的消息分配算法结果是随机的,并不能指定消息被哪一个特定的消费者处理。

消息粒度的负载均衡机制,是基于内部的单条消息确认语义实现的。消费者获取某条消息后,服务端会将该消息加锁,保证这条消息对其他消费者不可见,直到该消息消费成功或消费超时。因此,即使多个消费者同时消费同一队列的消息,服务端也可保证消息不会被多个消费者重复消费。

顺序消息负载机制

在顺序消息中,消息的顺序性指的是同一消息组内的多个消息之间的先后顺序。因此,顺序消息场景下,消息粒度负载均衡策略还需要保证同一消息组内的消息,按照服务端存储的先后顺序进行消费。不同消费者处理同一个消息组内的消息时,会严格按照先后顺序锁定消息状态,确保同一消息组的消息串行消费。
在这里插入图片描述

Consumer创建过程

准确来说Producer和Consumer的创建的基本流程都是这样,只不过在一些方法的实现细节上有差异
在这里插入图片描述

  1. ClientManagerImpl.clearIdleRpcClients():每一个endPoint(包含broker对应的endpoint)都对应一个RpcClient(与endPoint进行通信的实例),保存在rpcClientTable中,如果RpcClient长时间空闲,从rpcClientTable中移除并调用对应的shutdown方法释放资源。
  2. Client.doHeartbeat():跟topicRouteCache缓存中的broker发送心跳信息
  3. Client.doStats():PushConsumerImpl实现了这个方法,重置一些统计数据,并打印日志
  4. Client.syncSettings():同步配置信息给topicRouteCache中缓存的broker
  5. ClientImpl.updateRouteCache()是对ClientImpl.fetchTopicRoute()的包装
  6. ClientImpl.fetchTopicRoute():遍历Topic集合,向创建client时的endPoint发送请求获取对应的Broker的MessageQueue信息;并将与本地缓存不一致的路由信息保存到topicRouteCache中。

获取topic对应的路由信息

  1. 向客户端配置的endpoints请求路由信息
  2. 更新本地的路由信息
   private ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
   		// 通过GRPC获取topic对应的路由信息
        final ListenableFuture<TopicRouteData> future0 = fetchTopicRoute0(topic);
        final ListenableFuture<TopicRouteData> future = Futures.transformAsync(future0,
        	// 保存路由信息到本地
            topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), MoreExecutors.directExecutor());
        Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
            @Override
            public void onSuccess(TopicRouteData topicRouteData) {
                log.info("Fetch topic route successfully, clientId={}, topic={}, topicRouteData={}", clientId,
                    topic, topicRouteData);
            }

            @Override
            public void onFailure(Throwable t) {
                log.error("Failed to fetch topic route, clientId={}, topic={}", clientId, topic, t);
            }
        }, MoreExecutors.directExecutor());
        return future;
    }

保存请求得到的路由信息

  1. 获取路由信息中的endpoints
  2. 与缓存中的endpoints做对比,得到新添加的endpoints
  3. 同步配置到新添加的endpoints,同步完成后更新本地的路由缓存
 public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String topic,
    TopicRouteData topicRouteData) throws ClientException {
        final Set<Endpoints> routeEndpoints = topicRouteData
            .getMessageQueues().stream()
            .map(mq -> mq.getBroker().getEndpoints())
            .collect(Collectors.toSet());
        final Set<Endpoints> existRouteEndpoints = getTotalRouteEndpoints();
        final Set<Endpoints> newEndpoints = new HashSet<>(Sets.difference(routeEndpoints, existRouteEndpoints));
        List<ListenableFuture<?>> futures = new ArrayList<>();
        for (Endpoints endpoints : newEndpoints) {
            final ClientSessionImpl clientSession = getClientSession(endpoints);
            futures.add(clientSession.syncSettings());
        }
        final ListenableFuture<?> future = Futures.allAsList(futures);
        return Futures.transform(future, (Function<Object, TopicRouteData>) input -> {
            topicRouteCache.put(topic, topicRouteData);
            onTopicRouteDataUpdate0(topic, topicRouteData);
            return topicRouteData;
        }, MoreExecutors.directExecutor());
    }
Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐