2c4ccca6883211ebb6edd017c2d2eca2

SpringCloud Stream 消息驱动

屏蔽底层消息中间件的差异 统一消息的编程模型(没有什么是套一层接口解决不了的~)

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,

像RabbitMQ有exchange,kafka有Topic和Partitions分区,

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

image-20220102154028129

image-20220102154111230

image-20220102154155055

Stream总体架构图

image-20220102154639569

发送和接收消息流程

生产者

  • pom.xml 引入依赖

    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
  • yaml配置

    server:
    port: 8801

    spring:
    application:
    name: cloud-stream-provider

    rabbitmq:
    host: 106.14.154.114
    port: 5672
    username: admin
    password: 123456
    cloud:
    stream:
    binders: # 在此处配置要绑定的rabbitmq的服务信息;
    defaultRabbit: # 表示定义的名称,用于于binding整合
    type: rabbit # 消息组件类型
    bindings: # 服务的整合处理
    output: # 这个名字是一个通道的名称
    destination: studyExchange # 表示要使用的Exchange名称定义
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    binder: defaultRabbit # 设置要绑定的消息服务的具体设置

    eureka:
    client: # 客户端进行Eureka注册的配置
    service-url:
    defaultZone: http://localhost:7001/eureka,http://localhost:7002/eureka
    instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    # instance-id: send-8801.com # 在信息列表时显示主机名称
    # prefer-ip-address: true # 访问的路径变为IP地址
  • 发送消息

    Controller

    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
    return messageProvider.send();
    }

    messageProvider.send()

    package com.atguigu.springcloud.service.impl;

    import com.atguigu.springcloud.service.IMessageProvider;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.integration.support.MessageBuilderFactory;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.integration.support.MessageBuilder;
    import javax.annotation.Resource;
    import org.springframework.cloud.stream.messaging.Source;

    import javax.annotation.Resource;
    import java.util.UUID;

    @EnableBinding(Source.class) //定义消息的推送管道 Source 是推送方 Sink 是接收方
    public class MessageProviderImpl implements IMessageProvider
    {
    @Resource
    private MessageChannel output; // 消息发送管道

    @Override
    public String send()
    {
    String serial = UUID.randomUUID().toString();
    output.send(MessageBuilder.withPayload(serial).build());
    System.out.println("*****serial: "+serial);
    return null;
    }
    }

  • 调用接口 向RabbitMQ中发送消息

    image-20220102155630310

http://localhost:8801/sendMessage

消费者

pom.xml

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

yaml

server:
port: 8802

spring:
application:
name: cloud-stream-consumer


rabbitmq:
host: 106.14.154.114
port: 5672
username: admin
password: 123456
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
# environment: # 设置rabbitmq的相关的环境配置
# spring:
# rabbitmq:
# host: localhost
# port: 5672
# username: guest
# password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置

Controller

package com.atguigu.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@EnableBinding(Sink.class) //表明绑定接收方
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;


@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println("消费者1号,----->接受到的消息: "+message.getPayload()+"\t port: "+serverPort);
}
}

分组消费与持久化

先给出结论 不同组是订阅模式 会重复消费 同一组中所有单位只有一个能消费

设置分组

bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: atguiguA # 设置消费者的分组

验证: 设置统一分组 每个消费者消费不同的消息

image-20220103110049010

image-20220103110102009

设置不同分组 每个消费者消费相同消息:

image-20220103110232984

image-20220103110243685

持久化

给出结论: 当消费者宕机之后 设置分组的消费者可消费 还在交换机中的消息

验证

都设置不同分组

image-20220103110531191

image-20220103110543622

每个消费者消费同一消息

设置相同分组:

image-20220103110818482

后启动的没抢到消息~~

image-20220103110842411


SpringCloud Sleuth 链路追踪

引出: 在微服务框架中,一个由客户端发起的请求在后端系统中会经过多个不同的的服务节点调用来协同产生最后的请求结果,每一个前段请求都会形成一条复杂的分布式服务调用链路,链路中的任何一环出现高延时或错误都会引起整个请求最后的失败。

挑明说吧: 就是查看每条路线和在该路线下所花的时间

image-20220103111333415

使用步骤

  • 【zipkin】下载安装启动 端口: http://localhost:9411

  • 客户端服务端分别导入依赖

    <!--包含了sleuth+zipkin-->
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
    </dependency>
  • Yaml配置


    server:
    port: 8001

    spring:
    application:
    name: cloud-payment-service
    zipkin:
    base-url: http://localhost:9411
    sleuth:
    sampler:

    #采样率值介于 0 到 1 之间,1 则表示全部采集
    probability: 1
  • 测试即可

  • image-20220103113720880

image-20220103113800132