消息系统整合框架 Spring Cloud Stream

开课吧樵夫2021-11-26 15:05

  Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。通过使用Spring Cloud Stream,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。但是目前Spring Cloud Stream只支持RabbitMQ和Kafka的自动化配置。

  官网:

  https://spring.io/projects/spring-cloud

消息系统整合框架 Spring Cloud Stream

  程序模型

消息系统整合框架 Spring Cloud Stream

  应用程序的核心部分(Application Core)通过 inputs 与 outputs 管道,与中间件连接,而管道是通过绑定器 Binder 与中间件相绑定的。

  消息发送给一个主题的生产者

  复制工程consumer-8080.重命名为stream-kafka-consumer-8080.将启动类之外的所有代码全部删除。

  添加spring-cloud-stream-binder-kafka依赖。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

  创建生产者类

package com.javafamily.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
// 将MQ与生产者类通过消息管道相绑定
@EnableBinding(Source.class)
public class PetsProducer {
    // 必须使用byName方式的自动注入
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel channel;

    public String sendMessage(String msg) {
        // 通过消息管道发送消息,即将消息写入到消息管道,再通过消息管道写入到MQ
        channel.send(MessageBuilder.withPayload(msg).build());
        return msg;
    }
}

  创建处理器

package com.javafamily.controller;

import com.javafamily.producer.PetsProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PetsController {
    // 将生产者注入
    @Autowired
    private PetsProducer producer;

    @PostMapping("/msg/send")
    public String sendHandler(@RequestParam("message") String msg) {
        // 生产者发送消息
        return producer.sendMessage(msg);
    }
}

  修改配置文件

spring:
  application:
    name: familycloud-consumer-depart

  cloud:
    stream:
      kafka:
        binder:
          # 指定要连接的kafka集群
          brokers: kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092
          # 指定是否自动创建主题
          auto-create-topics: true
      bindings:
        # 指定要绑定的输出管道,及要输出到单管道中的消息主题及类型
        output:
          destination: names
          content-type: text/plain

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8000/eureka

  可以在官网查看kafka的quickstart方法:

  https://kafka.apache.org/quickstart

  运行程序,在postman中分别post:

  http://localhost:8080/msg/send?message=aaaaaaa

  http://localhost:8080/msg/send?message=bbbbbbb

  http://localhost:8080/msg/send?message=ccccccc

  查看结果:

消息系统整合框架 Spring Cloud Stream

  gitee:

  https://gitee.com/javainfamily/spring-cloud

  以上就是开课吧小编为大家整理发布的“消息系统整合框架 Spring Cloud Stream”一文,更多相关内容尽在开课吧广场Java教程频道。

消息系统整合框架 Spring Cloud Stream

免责声明:本站所提供的内容均来源于网友提供或网络搜集,由本站编辑整理,仅供个人研究、交流学习使用。如涉及版权问题,请联系本站管理员予以更改或删除。
有用
分享