Spring Cloud Stream消息发送给多个主题的生产者

开课吧樵夫2021-11-30 16:03

  在上一篇文章中,我们把消息发送给了一个主题的生产者,本篇会演示将消息发送给多个主题的生产者。

  复制stream-kafka-8080工程,重命名为stream-kafka2-8080.

  首先要在producer文件夹下自定义一个Source接口。  

package com.javafamily.producer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * 自定义管道
 */
public interface CustomSource {
    String CHANNEL_NAME = "xxx";

    @Output(CustomSource.CHANNEL_NAME)
    MessageChannel output();
}

  之后将原有的发布者类PetsProducer进行修改。

package com.javafamily.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
// 将MQ与生产者类通过消息管道相绑定
@EnableBinding({Source.class, CustomSource.class})
public class PetsProducer {
    // 必须使用byName方式的自动注入
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel channel;

    @Autowired
    @Qualifier(CustomSource.CHANNEL_NAME)
    private MessageChannel customChannel;

    public String sendMessage(String msg) {
        // 将消息写入到两个管道,将会写入到两个主题
        channel.send(MessageBuilder.withPayload(msg).build());
        customChannel.send(MessageBuilder.withPayload(msg).build());
        return msg;
    }
}

  在配置文件中添加如下输出目标。

spring:
  cloud:
    stream:    
      bindings:
        xxx:
          destination: cities
          content-type: text/plain

  完成以上配置后,运行程序,在postman中分别post:

  http://localhost:8080/msg/send?message=ddddddd

  http://localhost:8080/msg/send?message=eeeeeee

  http://localhost:8080/msg/send?message=fffffff

 

SpringCloudStream消息发送给多个主题的生产者

  创建消息消费者

  Spring Cloud Stream提供了三种创建消费者的方式,这三种方式的都是在消费者类的“消费”方法上添加注解。只要有新的消息写入到了管道,该“消费”方法就会执行。只不过三种注解,其底层的实现方式不同。即当新消息到来后,触发“消费”方法去执行的实现方式不同。

  @PostConstruct:以发布/订阅方式实现

  @ServiceActivator:以新消息激活服务的方式实现

  @StreamListener:以监听方式实现

  @PostConstruct

  在consumer文件夹下创建PostConstructConsumer类。

package com.javafamily.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@EnableBinding(Sink.class)
public class PostConstructConsumer {
    @Autowired
    @Qualifier(Sink.INPUT)
    private SubscribableChannel channel;

    @PostConstruct
    public void printMessage() {
        channel.subscribe(msg -> {
            // MessageHeaders headers = msg.getHeaders();
            System.out.println(new String((byte[]) msg.getPayload()));
        });
    }
}

  添加配置文件。

spring:  
  cloud:
    stream:
      bindings:
        # 指定要绑定的输入管道,及要消费的管道中的消息主题
        input:
          destination: names

  @ServiceActivator

  在consumer文件夹下创建PostConstructConsumer类。

package com.javafamily.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ServiceActivatorConsumer {
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void printMessage(Object msg) {
        System.out.println(msg);
    }
}

  在运行程序前先将PostConstructConsumer类注释掉。

  @StreamListener

  在consumer文件夹下创建StreamListenerConsumer类。

package com.javafamily.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class StreamListenerConsumer {
    @StreamListener(Sink.INPUT)
    public void printMessage(Object msg) {
        System.out.println(msg);
    }
}

  gitee:

  https://gitee.com/javainfamily/spring-cloud

  以上就是开课吧小编为大家整理发布的“Spring Cloud Stream消息发送给多个主题的生产者”一文,更多相关内容尽在开课吧广场Java教程频道。

SpringCloudStream消息发送给多个主题的生产者

免责声明:本站所提供的内容均来源于网友提供或网络搜集,由本站编辑整理,仅供个人研究、交流学习使用。如涉及版权问题,请联系本站管理员予以更改或删除。
有用1
分享
全部评论(共 1 条评论)
登录 后可发表观点…
发表
  • 。。。
    不错,有点用
    2021-12-01 17:48:30
高并发编程训练营