image

SpringCloud分布式微服务后端开发学习笔记

  • WORDS 89168

Java后端微服务分布式学习笔记

目前主流的微服务技术栈

  • SpringCloud + Feign
  • SpringCloudAlibaba + Feign
  • SpringCloudAlibaba + Dubbo

微服务拆分注意事项

  1. 不同微服务模块,不要重复开发相同业务
  2. 微服务数据库独立,不要访问其他微服务的数据库
  3. 微服务将自己的业务暴露为接口,供其他微服务调用

提供者与消费者

  • 服务提供者:一次操作中,被其他微服务调用的服务
  • 服务消费者:一次操作中,调用其他微服务的服务

SpringCloud

RestTemplate

RestTemplate是由 Spring提供的一个同步阻塞式执行 http请求的工具类,相对于传统的 httpClient来说,代码封装度更高、使用更简洁、并且对 REST风格的请求提供了更好的支持。

在IOC容器中注册组件

@Bean
public RestTemplate restTemplate(){
    return new RestTemplate();
}

发送GET请求并把返回参数封装为对象

String url = "http://localhost:8081/user/"
User user = restTemplate.getForObject(url, User.class);

Eureka注册中心

  • EurekaServer:服务端、注册中心
  • EurekaClient:客户端

在使用Eureka注册中心后,每个微服务启动后都会向eureka注册自己的信息。eureka会保存这些信息同时消费者可以通过服务名称向eureka拉取服务信息。

如果有多个服务器提供者,那么会利用负载均衡进行挑选,选中后发起远程调用。

服务提供者会每隔30秒向Eureka注册中心发送心跳请求,进行续约。心跳不正常的会被剔除。

搭建注册中心

创建一个新Module,引入 Eureka-Server依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

创建主类

@SpringBootApplication
// 必需 开启eureka注册中心
@EnableEurekaServer
public class EurekaApplication {
    public static void main(String[] args) {<dependency>
            <groupId>org.mariadb.jdbc</groupId>
            <artifactId>mariadb-java-client</artifactId>
        </dependency>
        SpringApplication.run(EurekaApplication.class, args);
    }
}

编写配置文件

server:
  port: 10000
spring:
  application:
    # 服务名称
    name: eureka-server
eureka:
  # eureka也需要向自己注册
  client:
    service-url:
      # 注册中心地址
      defaultZone: http://localhost:10000/eureka

服务注册

给微服务模块引入 Eureka-Client依赖@EnableFeignClients(clients = {UserClient.class})

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

更改配置文件

spring:
  application:
  	# 服务名称
  	name: userservice
eureka:
  # 客户端
  client:
  	# 注册中心地址
    service-url:
      defaultZone: http://localhost:10000/eureka

服务拉取

使用了Eureka注册中心后,需要对服务进行远程调用时需要使用服务名称来代替服务的IP和端口。如果存在多少服务时还可以进行负载均衡操作。

// 使用服务名替代 ip和端口
String url = "http://userservice/user/" + order.getUserId();
User user = restTemplate.getForObject(url, User.class);

开启负载均衡

@Bean
// 为 RestTemplate开启负载均衡 对所有请求都有效 默认使用轮询算法
@LoadBalancedselect * from t_recruitment_aticle;
public RestTemplate restTemplate(){
    return new RestTemplate();
}

Ribbon负载均衡

Ribbon的负载均衡规则是由 IRule的接口来定义的,每一个子接口就是一个规则

常见的IRule规则

规则类 规则说明
RoundRobinRule 通过轮询服务列表来选择服务器,默认规则
AvailabilityFilteringRule 对这两种服务器进行忽略<br />1. 默认情况下,这台服务器三次连接失败就会被设置为短路状态,短路状态会持续30秒。如果再次连接失败那么短路状态会成倍增加<br />2.并发数过高的服务器。如果一个服务器的并发连接数量过高,配置了该规则的服务器也会将其忽略。并发连接数的上限可以由客户端进行配置。
WeightedResponseTimeRule 为每一个服务器设置一个权重值,服务器响应时间越长,权重就越小。规则会随机选择服务器,但权重值会影响服务器的选择
ZoneAvoidanceRule 以区域可用的服务器为基础进行服务器的选择。以Zone对服务器进行分类,再对一个Zone内的服务列表进行轮询。
BestAvailableRule 忽略短路的服务器,选择并发数较低的服务器
RandomRule 随机选择一个可用的服务器
RetryRule 重试机制的选择逻辑

自定义负载均衡规则

  1. 通过IOC容器注入,作用于全局,不能针对单个 service自定义

    Ribbon负载均衡规则的实现类都是继承的 IRule这个接口 只需要向IOC中注入IRule接口实现类的Bean即可实现自定义负载均衡规则

    // 使用随机规则
    @Bean
    public IRule randomRule(){
        return new RandomRule();
    }
    
    // 使用轮询规则
    @Bean
    public IRule roundRobinRule(){
        return new RoundRobinRule();
    }
    
  2. 通过配置文件自定义,可以针对单个 service配置不同的规则

    # 针对单个service
    userservice:
      ribbon:
        # 负载均衡实现类
        NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule # 轮询
    

饥饿加载

Ribbon默认和 SpringMVC一样采用了懒加载策略,第一次请求进来时才会去创建。会造成第一次访问耗时较长。开启饥饿加载后回去项目启动时就创建。

ribbon:
  eager-load:
    # 开启饥饿加载
    enabled: true
    # 需要饥饿加载的服务 数组类型
    clients: 
      - userservice

Nacos注册中心

nacos是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。官网文档

nacos现在分为两个版本开发。2.x的版本比 1.x版本提供了更多的特性和性能提升,同时完全兼容 1.x版本的客户端注册。

单机安装使用

使用官方最新的 1.4.5版本

  1. 下载源码包

    wget https://github.com/alibaba/nacos/releases/download/1.4.5/nacos-server-1.4.5.zip
    # 解压
    unzip nacos-server-1.4.5.zip && cd nacos
    
  2. 创建 nacos的工作数据库。使用 nacos/conf/nacos-mysql.sql文件创建数据库

  3. 修改配置文件

    vim conf/application.properties
    
    # 修改监听的端口
    ### Default web server port: 
    server.port=8848
    
    # 修改数据库连接信息
    # 数据库的类型
    spring.datasource.platform=mysql
    # 数据库数量 如果是集群则填写真实数量
    db.num=1
    ### Connect URL of DB:
    db.url.0=jdbc:mysql://localhost:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
    db.user.0=
    db.password.0=
    
  4. 修改启动配置

    # nacos默认以集群方式启动 这里修改为单机启动
    vim bin/startup.sh
    
    # 将 MODE 的值修改为 standalone
    export MODE="standalone"
    
  5. 启动 nacos

    sh bin/startup.sh
    
    # 可以查看一下log 看看是否启动成功
    cat logs/start.out
    

使用 http://localhost:8848/nacos即可访问管理面板,默认账户和密码都是 nacos

Nacos服务注册

由于 nacos属于 SpringCloudAlibaba,所以需要先在父工程里面引入 SpringCloudAlibaba的依赖来进行统一的版本管理。Mavne地址

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-dependencies</artifactId>
    <version>2.2.5.RELEASE</version>
    <type>pom</type>
</dependency>

在客户端引入 nacos的服务注册依赖 Maven地址

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

修改客户端的配置文件

spring:
  cloud:
    nacos:
      # nacos的服务端地址
      server-addr: localhost:8848

Nacos服务分级

  • 一级是服务:例如userservice
  • 二级是集群:例如杭州或上海
  • 三级是实例:某个集群里面部署了userservice的服务器

服务跨集群调用问题

服务调用应该尽可能选择本地集群的服务。只有本地集群不可访问时,再选择跨集群访问。

自定义实例集群

spring:@EnableFeignClients(clients = {UserClient.class})
  cloud:
    nacos:
      # nacos的服务地址
      server-addr: localhost:8848
      discovery:
        # 集群名字
        cluster-name: HZ

Nacso负载均衡策略

如果需要让 Ribbon遵循 Nacos的规则优先访问本地集群内的服务器,那么需要将 Ribbon的负责均衡策略设置为 NacosRule

使用 Bean注入或者配置文件均可

userservice:
  ribbon:
    # 负载均衡实现类
    NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule # Nacos负载均衡策略

NacosRule的默认策略是优先本集群访问,在本集群的服务列表里面做随机查询。如果本集群内没有实例可用,那么才会跨集群访问同时抛出警告。

通过权重负载均衡

Nacos控制中心找到服务列表,点击后面的编辑按钮即可设置权重。权重范围 1-0.1数字越大权重越高。

如果权重设置为 0则不会被访问。

环境隔离-namespace

namespaceNacos中用来做环境隔离的,可以将开发、测试或者其他环境隔离开来。

Nacos的环境隔离级别

namespace > group > service > 集群 > 实例

Nacos控制中心命名空间中即可新建环境,环境创建完成后需要修改服务的配置文件。

不同命名空间中的服务不互通

spring:  
  cloud:
    nacos:
      server-addr: localhost:8848
      discovery:
        cluster-name: HZ
        # 命名空间的id
        namespace: 1de58ca1-3295-49b8-96a9-d23fd71826c4

Nacos注册中心

Nacos支持服务列表变更的消息推送模式,服务列表更新比较及时。

Nacos中服务列表分为临时实例和非临时实例

  • 临时实例:定时向Nacos注册中心发送心跳监测,如果超时未发送那么会直接从服务列表中剔除
  • 非临时实例:不用要求定时发送心跳监测。Nacos会主动询问健康状态,如果超时未响应那么会把这个实例标记为不健康状态,不会直接剔除。
# 服务实例可以通过修改配置文件告诉Nacos注册中心自己是不是临时实例
discovery:
  cluster-name: HZ
  # 命名空间的id
  namespace: 1de58ca1-3295-49b8-96a9-d23fd71826c4
  # 实例是否为临时实例 false:否 true:是
  ephemeral: false

Nacos配置管理

Nacos的统一配置管理可以对微服务的配置进行统一的管理和热更新。

统一配置管理

Nacos控制中心配置管理菜单找到配置列表,然后新建配置

  • Data ID:配置文件的名称,命名方式为{service}-{profile}.yaml || properties。比如 userservice-dev.yaml表示 userservicedev环境下的 yaml配置文件。
  • 配置格式:yamlproperties均可

微服务配置拉取

SpringBoot项目启动时默认会先读取本地的 bootstrap.yml配置文件和 application.yml配置文件(bootstrap.yml是在项目启动时就会读取的,优先级最高),而后在创建IOC容器,注入Bean。

如果需要使用 Nacos的配置管理的话,需要先引入 Nacos的配置管理依赖。同时SpringBoot的启动流程

变更为 bootstrap.yml > Nacos配置 > application.yml > IOC容器 > Bean

当Spring拿到Nacos的远程配置后会合本地的 application.yml进行合并,远程配置的优先级更高

  1. 服务引入依赖

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    
  2. 在服务中新建 bootstrap.yml文件

    spring:
      application:
        # 服务名称
        name: userservice
      profiles:
      	# 环境配置文件
        active: dev
      cloud:
        nacos:
          # nacos服务地址
          server-addr: localhost:8848
          config:
          	# 配置文件的后缀
            file-extension: yaml
    
    # 这里的 服务名称 环境 配置文件后缀 要根据nacos注册中心中配置列表中的配置项来
    

配置热更新

如果是使用 @Value注解完成对配置文件中属性的注入时,那么需要为注入此属性的类加上 @RefreshScope注解来实现配置热更新。

使用 @ConfigurationProperties注解实现属性注入则无需任何操作,配置文件有变动时,属性会自动刷新。

多环境配置共享

服务在启动时会从 nacos读取多个配置文件

  • [service-name]-[profile].yaml 指定服务名和profile环境的配置文件
  • [service-name].yaml 只指定服务名的配置文件,这个文件在任何环境下都会被加载。需要多环境共享的配置可以放在这里面。

配置文件的优先级:服务名-profile.yaml > 服务名.yaml > 本地配置

Nacos集群搭建

通过修改端口模拟集群搭建,使用 8848、8849、8850三个端口。每个 nacos节点都必须使用同一个数据库或者同一个数据库集群。

安装启动

  1. 修改配置文件(所有节点都要修改)

    # 使集群配置文件可用
    mv cluster.conf.example cluster.conf
    
    vim cluster.conf
    
    # 在配置文件中填入所以nacos节点的地址
    
    #it is ip
    #example
    127.0.0.1:8848
    127.0.0.1:8849
    127.0.0.1:8850
    
    # 将nacos的启动方式更改为集群启动
    vim bin/startup.sh
    
    # 将 MODE 的值修改为 cluster
    export MODE="cluster"
    
  2. 修改单个nacos节点的端口号

    vim conf/application.properties
    
    # 修改为需要用的端口号
    server.port=8848
    
    # 在Linux可能出现端口号修改后不生效的情况 需要修改一下 startup.sh 启动脚本
    vim bin/startup.sh
    
    JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=optional:${CUSTOM_SEARCH_LOCATIONS}"
    # 将上面一行更改为下面的值
    JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=${CUSTOM_SEARCH_LOCATIONS}"
    
  3. 启动所有nacos节点

使用Nginx进行负载均衡

通过 Nginx的反向代理功能,对 nacos集群进行负载均衡。

安装 Nginx

# 我使用的是ArchLinux
sudo pacman -S nginx

修改 Nginx配置文件

sudo vim /etc/nginx/nginx.conf


# nginx配置文件

# nacos的集群地址 用于负载均衡
upstream nacos {
    server 127.0.0.1:8848;
    server 127.0.0.1:8849;
    server 127.0.0.1:8850;
}

server {
    listen       7000; # 监听7000端口
    server_name  localhost;

	# 转发所有指向 localhost:7000/nacos 的请求到 nacos集群
    location /nacos{
        proxy_pass http://nacos;
    }
}

Nginx启动成功后需要将所有服务中nacos的服务地址设置为Nginx的反代地址

Feign远程调用

Spring官方文档

Feign是一个声明式Http客户端 官方地址,更加优雅的实现Http调用。同时 Feign也集成了 Ribbon能够自动实现负载均衡。

使用Feign

引入依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

使用Feign

// 1. 在启动主类上添加 @EnableFeignClients 注解
@SpringBootApplication
@EnableFeignClients
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}

// 2. 创建xxxxClient接口 注解和SpringMVC的的注解是一样的 只不过一个是接收参数 一个是发送参数
// 需要调用的服务名称
@FeignClient("userservice")
public interface UserClient {
    // 通过服务名称 请求路径 和请求参数发送http请求 并将请求结果自动封装
    @GetMapping("/user/{id}")
    public User getUser(@PathVariable("id") Long id);
}



// 就和调用DAO层的接口一样调用
Order order = orderMapper.findById(orderId);
User user = userClient.getUser(order.getUserId());

Feign自定义配置

类型 作用 说明
feign.Loger.Level 修改Feign的日志级别 四种不同的日志级别:NONE、BASIC、HEADERS、FULL
feign.codec.Decoder 响应结果解析器 解析http请求的结果,类似于MVC的请求参数转化器
feign.codec.Encoder 请求参数转换 转换请求参数格式,便于发送
feign.Contract 支持的注解格式 默认是SpringMVC的注解
feign.Retryer 失败重试机制 默认没有,会使用Ribbon的重试
配置文件修改
  • 全局生效

    feign:
      client:
        config:
          # default全局配置
          default:
            # 日志级别
            logger-level: full
    
  • 局部生效

    feign:
      client:
        config:
          # 针对单个服务配置
          userservice:
            logger-level: fill
    
Java代码方式

编写Feign的配置类

public class FeignConfig {
    // 使用bean注入日志级别
    @Bean
    public Logger.Level logLevel(){
        return Logger.Level.FULL;
    }
}

修改配置

// 全局生效 修改启动主类的 @EnableFeignClients 注解
@EnableFeignClients(defaultConfiguration = FeignConfig.class)

// 局部生效 修改Client类的 @FeignClient 注解
@FeignClient(value = "userservice", configuration = FeignConfig.class)

Feign的性能优化

Feign底层的请求客户端实现:

  • URLConnection:默认使用,JDK提供的不支持连接池
  • HttpClient:Apache提供的支持连接池
  • OKHttp:一个高性能http客户端支持连接池

使用 OKHttp替换默认的 URLConnection,首先引入依赖

<dependency>
    <groupId>io.github.openfeign</groupId>
    <artifactId>feign-okhttp</artifactId>
</dependency>

然后修改配置文件

feign:
  okhttp:
    enabled: true
  httpclient:
    enabled: false

Feign的最佳实践

  1. 继承:给消费者的 FeignClient和服务者的 Controller定义一个统一的接口作为标准。

    • 服务之间耦合度高
    • 父接口的参数映射不会被继承
  2. 抽取:由服务提供者将FeignClient、POJO、默认配置等抽取成独立的模块,当消费者需要使用时只需要引入依赖即可。避免重复开发。

    当把 Feign抽取成独立模块后,其中定义的模块不在 SpringBoot的包扫描范围内,有两种方法解决。

    1. 指定FeignClient所在的包

      @EnableFeignClients(basePackages = "cm.zeroxn.feign.clients")
      
    2. 指定FeignClient的类

      @EnableFeignClients(clients = {UserClient.class})
      

统一网关Gateway

Spring官方文档

网关的作用

  1. 对用户请求做身份认证、权限校验
  2. 将用户请求路由对对应的微服务,并实现负载均衡
  3. 对用户请求做限流

在SpringCloud中网关的实现包括两种:

  • gateway:基于Spring5提供的 WebFlux响应式编程的实现,能够比传统的 Servlet容器阻塞式编程承受更高的并发。
  • zuul:基于 Servlet的阻塞式编程

搭建使用

首先引入 nacosgateway的依赖,gateway网关需要从 nacos拉取服务列表

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!-- 如果使用负载均衡报503错误,那么还需要引入负载均衡依赖 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>

然后创建 application.yml配置文件

server:
  # 网关对外暴露的端口
  port: 10001
spring:
  application:
    name: gateway
  cloud:
    nacos:
      # nacos的服务地址
      server-addr: localhost:8848
    gateway:
      # gateway路由
      routes:
        # 服务id 必须唯一
        - id: user-service
          # 路由的目标地址 前缀http则表示固定 lb表示负载均衡
          # 会通过服务名从nacos中拉取服务列表
          uri: lb://userservice
          # 判断路由的规则 断言成功则将这次请求转发到对应的微服务
          predicates:
            # 路径断言 由对应的断言工厂来处理 判断路径是否是 /user 下的请求  如果是则将请求转发到userservice
            - Path=/user/**
        - id: order-service
          uri: lb://orderservice
          predicates:
            - Path=/order/**

最后创建一个启动主类,启动项目即可

路由断言工厂 GatewayPredicate

网关路由可以配置的内容包括:

  • 路由id:路由唯一标识
  • uri:路由的目的地,支持lb和http两种 lb:负载均衡 http:固定
  • predicates:路由断言,判断请求是否符合规则,符合规则就将这次请求转发到路由目的地
  • filters:路由过滤器,处理请求或响应

Spring提供了11种基本的Predicate工厂

名称 说明 示例
After 是某个时间点之后的请求 - After=2037-01-20T17:42:47.789-07:00[America/Denver]
Before 是某个时间点之前的请求 - Before=2037-01-20T17:42:47.789-07:00[Asia/Shanghai]
Between 在某两个时间点之前的请求 - Between=2037-01-20T17:42:47.789-07:00[America/Denver],2037-04-22T17:42:47.789-07:00[America/Denver]
Cookie 请求必须包含某些Cookie - Cookie=tokenxxxx
Header 请求必须包含某些Header - Header=origin
Host 请求必须是某个host(域名) - Host=**.zeroxn.com,xxxxxxx
Method 请求方式必须是指定方式 - Method=GET,POST
Path 请求路径必须符合指定规则 - Path=/user/{id},/user/**
Query 请求参数必须包含指定参数 - Query=name,id
RemoteAddr 请求者的IP必须是指定范围 - RemoteAddr=10.10.10.1/24
Weight 权重处理

路由过滤器 GatewayFilter

GatewayFilter是网关中提供的一种过滤器,可以对进入网关的请求和微服务返回的响应做处理。

Spring提供了31种不同的路由过滤器工厂

名称 说明
AddRequestHeader 给当前请求添加一个请求头
RemoveRequestHeader 移除当前请求中的一个请求头
AddResponseHeader 给当前请求响应结果添加一个响应头
RemoveResponseHeader 移除当前请求响应结果中的一个响应头
RequestRateLimiter 限制请求的流量

全部过滤器

给单个服务添加过滤器

gateway:
  routes:
    - id: user-service
      uri: lb://userservice
      predicates:
        - Path=/user/**
      # 给所以指向userservice的请求添加过滤器
      filters:
        # 给所以请求都添加一个请求标头 Text=hello 这里的 ", " 是 "=" 号的意思
        - AddRequestHeader=Text, hello

默认过滤器

gateway:
  routes:
    - id: user-service
      uri: lb://userservice
      predicates:
        - Path=/user/**
    - id: order-service
      uri: lb://orderservice
      predicates:
        - Path=/order/**
  # 默认过滤器 针对所以请求都生效
  default-filters:
    - AddRequestHeader=message, world

全局过滤器 GlobalFilter

全局过滤器的作用是处理一切进入网关的请求和微服务响应,与过滤器的效果一样。

区别在于 GatewayFilter通过配置定义,处理逻辑是写死的。而 GlobalFilter的逻辑需要自己代码实现。

// 可以通过注解修改过滤器的优先级 也可以通过继承 Ordered接口修改 数字越小优先级越高
//@Order(-1)

// 注入到Spring容器 自定义的全局过滤器需要实现 GlobalFilter 接口
@Component
public class LoginFilter implements GlobalFilter, Ordered {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 获取请求
        ServerHttpRequest request = exchange.getRequest();
        // 获取请求参数
        MultiValueMap<String, String> queryParams = request.getQueryParams();
        // 获取身份认证参数并判断
        String auth = queryParams.getFirst("auth");
        if("admin".equals(auth)){
            // 通过则放行
            return chain.filter(exchange);
        }
        // 设置响应的响应码为401
        exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
        // 结束请求
        return exchange.getResponse().setComplete();
    }

    @Override
    public int getOrder() {
        // 优先级为 -1
        return -1;
    }
}

过滤器的执行顺序

请求进入网关被路由后,会将当前路由过滤器的DefaultFilter、GlobalFilter合并到一个过滤器链中,排序后依次执行。

GlobalFilter全局过滤器的 order值由我们自己指定,而路由过滤器和 DefaultFilterorder值是由 Spring指定的,默认按照声明顺序从 1递增。

当过滤器的 order值一样时,会按照 defaultFilter > 路由过滤器>GlobalFilter的顺序执行

跨域问题处理

Gateway提供了针对跨域请求的处理,只需要修改配置文件即可

spring:
  cloud:
    gateway: 
      # 全局跨域配置
      globalcors:
       # 针对 options 请求的统一处理
        add-to-simple-url-handler-mapping: true
        cors-configurations:
          # 拦截所有请求
          '[/**]':
            # 允许哪些源站跨域
            allowedOrigins:
              - 'http://localhost:4017'
              - 'http://zeroxn.com'
            # 允许跨域的请求方式
            allowedMethods:
              - 'GET'
              - 'PUT'
              - 'POST'
              - 'DELETE'
              - 'OPTIONS'
            # 允许在请求中携带的头信息
            allowedHeaders: '*'
            # 是否允许携带Cookie
            allowCredentials: true
            # 跨域有效时间 超时会重新检测
            maxAge: 360000

服务异步通信(MQ)

同步调用存在的问题:

  • 耦合度高:每次加入新的需求,都需要修改原来的代码
  • 性能下降:调用者需要等待服务提供者响应,调用链过长那么响应时间会叠加增长
  • 资源浪费:调用链中的每个服务都在等待响应,不能释放请求占用的资源。高并发场景下极度浪费系统资源
  • 级联失败:如果服务提供者出现问题,那么所有调用方都会出问题

异步调用方案:

异步调用的常见实现是事件驱动模式
  • 优点:

    1. 服务解藕
    2. 性能提升,吞吐量提高
    3. 不用担心级联失败
    4. 流量削峰
  • 缺点:

    1. 依赖于Broker的可靠性和吞吐能力
    2. 架构如果过于复杂,业务不好追踪管理

什么是MQ

MQ(MessageQueue),消息队列,字面意思就是存放消息的队列。也就是事件驱动架构中的 Broker

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit Apache Alibaba Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP、XMPP、SMTP、STOMP OpenWire、STOMP、REST、XMPP、AMQP 私有协议 私有协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微妙级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

RabbitMQ快速入门

官网地址

RabbitMQ的结构和概念

PubLisher -> exchange -> quene -> consumer

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • quene:消息队列负责缓存消息
  • virtualhost:虚拟主机,各个虚拟主机之间不互通
安装RabbitMQ

Docker镜像地址

# hostname 主机名称 集群部署需要
# RABBITMQ_DEFAULT_USER 用户名
# RABBITMQ_DEFAULT_PASS 用户密码
# 15672 RabbitMQ的后台端口
# 5672 RabbitMQ的服务端口

docker run -d --hostname rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3-management
常见消息模型
  • 基本消息队列(BasicQuene)
  • 工作消息队列(WorkQuene)
  • 发布订阅(Publish,Subscribe),又根据交换机类型不同分为三种:
    1. Fanout Exchange:广播
    2. Direct Exchange:路由
    3. Topic Exchange:主题

官方的Demo是基于基本消息队模型来实现的,只包含三个角色:

  • Publisher:消息发布者,将消息发送到队列中
  • quene:消息队列,接受并缓存消息
  • consumer:订阅队列,处理队列中的消息
基本消息发送和接受

消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息
// 设置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("admin");
// 创建连接
Connection connection = factory.newConnection();
// 创建Channel
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare("demo.quene", false, false, false, null);
String message = "hello demo2";
// 向队列发送消息
channel.basicPublish("", "demo.quene", null, message.getBytes());
System.out.println("消息发送成功:" + message);

消息接收流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者和队列绑定
// 设置连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("admin");
// 创建连接
Connection connection = factory.newConnection();
// 创建Channel
Channel channel = connection.createChannel();
// 创建队列  在接收着还创建队列是为了防止消息发送者还未创建队列
channel.queueDeclare("demo.quene", false, false, false, null);
// 接收到消息后的回调处理
DeliverCallback deliverCallback = ((s, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("接受到消息:" + message);
});
channel.basicConsume("demo.quene", deliverCallback, c -> {});

SpringAMQP

AMQP,即 Advanced Message Queuing Protocol一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。 基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

Spring AMQP 项目将核心 Spring 概念应用于基于 AMQP 的消息传递解决方案的开发。提供一个“模板”作为发送和接收消息的高级抽象。我们还为消息驱动的 POJO 提供支持。这些库促进了 AMQP 资源的管理,同时促进了依赖注入和声明性配置的使用。spring-amqp是基础抽象,spring-rabbit是默认实现。

基础使用

使用 SpringAMQP完成上面 RabbitMQ的入门案例

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

修改配置文件

spring:
  rabbitmq:
  	# 连接地址
    host: localhost
    # 端口
    port: 5672
    # 虚拟主机
    virtual-host: /
    # 用户名
    username: root
    # 密码
    password: admin

消息发布者

// 实现ApplicationRunner接口 在容器启动后就自动运行
@Component
public class RabbitRunner implements ApplicationRunner {
    // 使用Spring提供的 Rabbit消息发布模板
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        String queueName = "demo.queue";
        String message = "SpringAMQP";
        // 只需要队列名称和消息内容即可发布消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

消息订阅者

@Component
public class MyRabbitListener {
    // 订阅名为 demo.quene 的消息队列 队列中新发布消息后会调用下面的方法处理
    @RabbitListener(queues = "demo.quene")
    public void listenerRabbit(String message){
        System.out.println("接受到消息:" + message);
    }
}

Work Queue (工作队列)

Work Queue工作队列,可以提高消息的处理速度,避免队列消息堆积。多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。

消费预取限制,通过修改配置文件,可以控制消费者预取消息的上限

spring:
  rabbitmq:
    listener:
      simple:
        # 每次只获取一条消息 处理完后再获取下一条
        prefetch: 1
// 同一个消息队列设置两个消费者 设置休眠时间 让休眠时间短的多处理消息

@Component
public class MyRabbitListener {
    @RabbitListener(queues = "demo.queue")
    public void listenerRabbit1(String message) throws InterruptedException {
        System.out.println("消费者1接收到消息:" + message);
        Thread.sleep(20);
    }
    @RabbitListener(queues = "demo.queue")
    public void listenerRabbit2(String message) throws InterruptedException {
        System.out.println("消费者2接收到消息:" + message);
        Thread.sleep(200);
    }
}

发布(Publish)、订阅(Subscribe)

发布订阅模式允许将同一条消息发送给多个消费者,实现方式是加入了 exchange

exchange负责消息路由,只负责消息的转发而不是存储。转发失败消息会丢失。

常见的 exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题
FanoutExchange

会将接收到的消息路由给每一个跟其绑定的 quene

consumer服务声明 FountExchangeQuene和绑定关系对象 Binding

@Configuration
public class FanoutExchangeConfig {
  
    // 创建一个 FanoutExchange
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("demo.fanout.exchange");
    }
  
    // 创建两个 Queue
    @Bean
    public Queue exchangeQueue1(){
        return new Queue("fanout.queue1");
    }
    @Bean
    public Queue exchangeQueue2(){
        return new Queue("fanout.queue2");
    }
  
    // 将两个 Queue 绑定到 FanoutExchange
    @Bean
    public Binding fanoutBindingQueue1(Queue exchangeQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(exchangeQueue1).to(fanoutExchange);
    }
    @Bean
    public Binding fanoutBindingQueue2(Queue exchangeQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(exchangeQueue2).to(fanoutExchange);
    }
}

订阅两个 queue

@RabbitListener(queues = "fanout.queue1")
public void listenerFanoutQueue1(String message){
    System.out.println("queue1接收到消息:" + message);
}

@RabbitListener(queues = "fanout.queue2")
public void listenerFanoutQueue2(String message){
    System.out.println("queue2接收到消息:" + message);
}

发布消息

@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void run(ApplicationArguments args) {
    String exchangeName = "demo.fanout.exchange";
    String message = "FanoutExchange";
    // 发布者直接向 exchange 发布消息即可 两个 queue 都会接收到消息
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}
DirectExchange

会将接收到的消息根据规则路由到指定的 Quene,每一个和 DirectExchage关联的 Quene都需要设置一个 bindingKey

发布者发布消息时,通过指定消息的 RoutingKey来告诉 Exchange需要将这条消息路由到哪个 Quene

使用 @RabbitListener声明 ExchangeQueue和它们之间的绑定关系。

@RabbitListener(bindings = @QueueBinding(
    		// 声明 queue 并指定name
            value = @Queue(name = "direct.queue1"),
    		// 声明 exchangge 并指定name和类型
            exchange = @Exchange(name = "demo.direct", type = ExchangeTypes.DIRECT),
    		// 指定 queue 的 bindingKey,可以设置多个
            key = {"dev", "prod"}
    ))
public void listenerDirectQueue1(String message){
    System.out.println("dev Queue收到消息:" + message);
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "demo.direct", type = ExchangeTypes.DIRECT),
    key = {"test", "prod"}
))
public void listenerDirectQueue2(String message){
    System.out.println("test Queue收到消息:" + message);
}

发布消息

@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void run(ApplicationArguments args) {
    String exchangeName = "demo.direct";
    // 指定 routingKey 发布
    rabbitTemplate.convertAndSend(exchangeName, "test", "hello, testQueue");
    rabbitTemplate.convertAndSend(exchangeName, "dev", "hello, devQueue");
    rabbitTemplate.convertAndSend(exchangeName, "prod", "hello, prodQueue");
}
TopicExchange

TopicExchangeDirectExchange的功能类似,区别在于 routingKey必须是多个单词,单词之间使用 .分割

QueneExchange指定 BindingKey时可以使用通配符:

  • #:0个或多个单词
  • *:一个单词
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue1"),
    // 指定类型为 Topic
    exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
    // 绑定 dev.下的所有key
    key = "dev.#"
))
public void listenerDirectQueue1(String message){
    System.out.println("dev Queue收到消息:" + message);
}
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "topic.queue2"),
    exchange = @Exchange(name = "demo.topic", type = ExchangeTypes.TOPIC),
    key = "test.#"
))
public void listenerDirectQueue2(String message){
    System.out.println("test Queue收到消息:" + message);
}

发布消息

public void run(ApplicationArguments args) {
    String exchangeName = "demo.topic";
    // 这里的消息会被 test.# 的queue接收
    rabbitTemplate.convertAndSend(exchangeName, "test.news.info", "hello, testQueue");
}

消息转换器

SpringAMQP发送方法中,可以发送的消息类型为 ObjectSpring提供了类似于 SpringMVC的消息转换器来完成类的序列化。

消息对象的处理是由 org.springframework.amqp.support.converter.MessageConverter来完成的。默认实现是 SimpleMessageConverter,基于JDK提供的 ObjectOutputStream实现,而不是 SpringMVC的使用 jackson实现。这种实现方式的弊端在于性能较差、会对对象数据进行Base64编码,增大了数据传输的损耗。

SpringAMQP提供了基于 Jaskson实现的消息转换器,如果需要使用只需要注入Bean即可

<!-- 引入Jackson依赖 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
// 注入bean
@Bean
public MessageConverter gsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

接收消息也同理,在方法的入参使用消息发布时对应的类型即可。

Elasticsearch

Elasticsearch是一个开源的分布式搜索引擎,结合 KibanaLogstashBeats,被统称为 elastic stackELK环境。广泛应用在日志数据分析、实时监控等领域。

ElasticsearchElastic Stack的核心,负载存储、搜索和分析数据

  • Kibana:数据可视化
  • ElasticSearch:存储、计算和搜索数据
  • Logstash、Beats:数据抓取

正向索引和倒排索引

传统数据库采用正向索引。搜索数据时会逐行扫描数据判断是否符合搜索条件,符合则放进结果集、不符合则丢弃。

elasticsearch使用倒排索引:

倒排索引会先对文档进行分词,对词条创建索引并记录词条所在的文档信息。

  • 文档 document:每条数据就是一个文档
  • 词条 term:文档按照词义分成的词语

发起搜索时会先对搜索关键字进行分词得到词条,再通过词条去 elasticsearch建立的词条索引列表里面获取所关联的文档id,得到文档id之后再通过id去查询对应的文档并存入结果集。

基础知识

  • 文档:可以是数据库中的一条商品数据,一个新闻信息。文档数据会被序列化为json格式后存储在 elasticsearch中。
  • 索引(index):相同类型的文档的集合
  • 映射(mapping):索引中文档的字段约束信息,类似表的结构约束
MySQL Elasticsearch 说明
Table Index 索引,文档的集合。类似于表
Row Docuemnt 文档,一条条的Json数据。类似于表中的行
Column Field 字段,Json文档中的字段,类似于行中的列
Schema Mapping 映射,是索引中文档的约束,例如字段类型约束
SQL DSL elasticsearch提供的Json风格的请求语句,用来操作elasticsearch

安装Elasticsearch

参考 Elastic官方文档

拉取 Elasticsearch镜像

docker pull docker.elastic.co/elasticsearch/elasticsearch:8.7.0

在docker里面创建一个新网络组,用于连接 Kibana

docker network create elastic

启动 Elasticsearch的Docker容器

可选设置:

-e ES_JAVA_OPTS="-Xms1g -Xmx1g"    设置JVM虚拟机的内存占用大小
-e ENROLLMENT_TOKEN="<token>"   使用token加入现有的集群
# 挂载数据和插件目录到本地

docker run --name elasticsearch --net elastic -p 9200:9200 -it -v ./data:/usr/share/elasticsearch/data -v ./plugins:/usr/share/elasticsearch/plugins docker.elastic.co/elasticsearch/elasticsearch:8.7.0


# 命令运行后 在终端会输出默认生成的密码和连接Kibana需要的令牌 令牌30分钟内有效

# 启动时可能会遇到 max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144] 报错,这是由于vm.max_map_count太小导致的

sudo vim /etc/sysctl.conf
# ArchLinux
sudo vim /etc/sysctl.d/99-sysctl.conf

# 插入这一行
vm.max_map_count = 262144

# 应用更改
sudo sysctl --system 

重置 Elasticsearch密码

docker exec -it es01 /usr/share/elasticsearch/bin/elasticsearch-reset-password

复制安全证书,用户使用SSL加密

# 复制到当前路径
docker cp es01:/usr/share/elasticsearch/config/certs/http_ca.crt .

验证证书是否可用

curl --cacert http_ca.crt -u elastic https://localhost:9200

# 回车后会出现提示输入用户名密码

安装Kibana

参考 Kibana官方文档

拉取镜像

docker pull docker.elastic.co/kibana/kibana:8.7.0

启动 Kibana容器

# --net 使用 elasticsearch 所在的网络组

docker run --name kibana --net elastic -p 5601:5601 -v ./data:/usr/share/kibana/data  docker.elastic.co/kibana/kibana:8.7.0

使用 Docker-Compose

version: '2'
services:
  kibana:
    image: docker.elastic.co/kibana/kibana:8.7.0
    volumes:
      # 将配置文件目录和数据目录挂载到本地
      - ./config:/usr/share/kibana/config
      - ./data:/usr/share/kibana/data
    environment:
      # 主机名
      SERVER_NAME: kibana.example.org
      # 需要连接的ES集群 es01是docker的容器名字 可以单个也可可以多个
      ELASTICSEARCH_HOSTS: '["http://es01:9200","http://es02:9200","http://es03:9200"]'
    ports:
      - 5601:5601

启动完成之后打开 http://localhost:5601,输入 Elasticsearch启动时输出的连接 Kibana的Token来建立连接。需要的 Code在启动 Kibana的控制台会输出。默认的用户名 elastic,密码使用 Elasticsearch输出的密码。

安装Ik分词器

官方仓库地址

进入 Elasticsearch在本地的插件挂载目录,下载Ik分词器

cd ~/Documents/docker/ELK/elasticsearch/plugins

# 下载对应的版本
wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v8.7.0/elasticsearch-analysis-ik-8.7.0.zip

# 解压
mkdir ik
unzip -d ./ik elasticsearch-analysis-ik-8.7.0.zip

# 重启
docker restart elasticsearch

Ik分词器包含两种模式:

  • ik_smart:最少拆分,分到最大的词之后不会再往下拆分
  • ik_max_word:最细拆分,拆分出所有可能的词
// 测试分词是否可用 在kibana的 Dev Tools 中测试

POST /_analyze
{
  "text": "今天天气很不错",
  "analyzer": "ik_smart"
}

分词器的扩展和停用词典

可以通过编辑Ik分词器的 IKAnalyzer.cfg.xml来实现扩展分词和停用某些字的分词

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
        <comment>IK Analyzer 扩展配置</comment>
        <!--用户可以在这里配置自己的扩展字典 在配置文件同级目录下 -->
        <entry key="ext_dict">ext.dic</entry>
         <!--用户可以在这里配置自己的扩展停止词字典-->
        <entry key="ext_stopwords">stopword.dic</entry>
        <!--用户可以在这里配置远程扩展字典 -->
        <!-- <entry key="remote_ext_dict">words_location</entry> -->
        <!--用户可以在这里配置远程扩展停止词字典-->
        <!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

添加扩展词和停用词都只需要在对应的文件中添加词语即可,一个词语占一行

添加完成后需要重启 Elasticsearch

索引库操作

mapping属性

mapping是对索引库中文档的约束,常用的 mapping属性有:

  • type:字段数据类型:
    • 字符串:
      • text:可被分词的文本
      • keyword:关键字、精确值
    • 数值:long、integer、short、byte、double、float
    • 布尔:boolean
    • 日期:date
    • 对象:object
  • index:是否创建索引、默认为true
  • analyzer:使用哪种分词器
  • properties:字段的子字段(object对象的属性)
  • geo_point:由经纬度确定的一个点(32.4564564, 42.1234562)
  • geo_shape:由多个经纬度组成的复杂几何图形
创建索引库

Elasticsearch通过 Restful请求操作索引库和文档,请求内容用DSL语句表示。

可以通过 copy_to属性将当前字段的值拷贝到指定的字段,适用于需要对都多个字段进行搜索的情况。

// 创建索引库
PUT /student
{
  "mappings": {
    "properties": {
      "message": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "email": {
        "type": "keyword",
        "index": false,
        // 这个字段的值会被拷贝到message字段
        "copy_to": "message"
      },
      "name": {
        "type": "object",
        "properties": {
          "firstName": {
            "type": "keyword"
          },
          "lastName": {
            "type": "keyword"
          }
        }
      }
    }
  }
}
查看、删除索引库
  • 查看索引库

    GET /索引库名
    GET /student
    
  • 删除索引库

    DELETE /索引库名
    DELETE /student
    
修改索引库

Elasticsearch中索引库和 mapping创建之后就无法修改,但是可以添加新的字段

// 添加新字段
// student 是需要添加字段的索引库名
PUT /student/_mapping
{
  "properties": {
    // 这里必须是一个索引库中不存在的新字段
    "age": {
      "type": "integer",
      "index": false
    }
  }
}

文档操作

添加文档
// 向 student 索引库添加一条id为1的数据

POST /student/_doc/1
{
  "age": 8,
  "message": "三年级二班",
  "email": "lisi@admin.com",
  "name": {
    "firstName": "四",
    "lastName": "李"
  }
}
查询、删除文档
# 查询文档
GET /索引库名/_doc/文档id
GET /student/_doc/1

# 删除文档
DELETE /索引库名/_doc/文档id
DELETE /student/_doc/1
修改文档
  1. 全量修改,通过传入的id先删除旧文档,再保存新文档

    // 如果传入的文档id在索引库中不存在 那么会直接新增文档 同时返回码变成201
    
    PUT /student/_doc/2
    {
      "age": 10,
      "message": "三年级四班",
      "email": "zhangsan@admin.com",
      "name": {
        "firstName": "三",
        "lastName": "张"
      }
    }
    
  2. 局部修改文档字段

    // 只会更新文档的指定字段
    
    POST /student/_update/1
    {
      "doc": {
        "age": 12,
        "name": {
          "lastName": "张"
        }
      }
    }
    

JavaClient操作索引库

Elasticsearch提供了不同语言的客户端用来操作 Elasticsearch,主要作用就是组装DSL语句并通过http请求发送给 Elasticsearch服务器。官方文档

引入 lasticsearch-java依赖和 jackson-databind依赖

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.7.0</version>
</dependency>
<!-- SpringBoot项目无需引入 -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.12.3</version>
</dependency>
建立连接
BasicCredentialsProvider credsProv = new BasicCredentialsProvider();
// 设置账号密码验证
credsProv.setCredentials(
    AuthScope.ANY, new UsernamePasswordCredentials("elastic", "_T4HUl1L5GOVsXY4Wx9h")
);
RestClient restClient = RestClient
    // 连接地址和连接端口
    .builder(new HttpHost("10.10.10.10", 9200))
    .setHttpClientConfigCallback(hc -> hc.setDefaultCredentialsProvider(credsProv))
    .build();
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport);

建立连接如果报ClassNotFoundException: jakarta.json.spi.JsonProvider错误,则需要引入下面的依赖

<!-- SpringBoot项目会自动管理依赖版本 但是ES的Java API需要2.0.1或之后的版本 -->

<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.0.1</version>
</dependency>

Elasticsearch8.0以上版本默认开启SSL验证,为了开发环境方便选择关闭

# 从容器中复制配置文件到当前目录
docker cp elasticsearch:/usr/share/elasticsearch/config/elasticsearch.yml ./
# 编辑配置文件
vim elasticsearch.yml
xpack.security.http.ssl:
  # 这里设置为false
  enabled: false
  keystore.path: certs/http.p12
# 复制更改完的配置文件到容器
docker cp ./elasticsearch.yml elasticsearch:/usr/share/elasticsearch/config/
# 重启容器
docker restart elasticsearch

# elasticsearch设置完之后还需要更改kibana的配置文件 将节点的连接地址更改为http类型 否则会无法连接
创建索引库
Map<String, Property> indexMap = new HashMap<>();
		// 设置索引库的字段
        indexMap.put("id", Property.of(property ->
                property.keyword(KeywordProperty.of(keyword ->
                        keyword.index(true)))
                )
        );
        indexMap.put("search", Property.of(property ->
                property.text(TextProperty.of(text ->
                        text.index(true).analyzer("ik_smart")))
                )
        );
		// 分别设置了字段名称 字段类型 是否索引 分词器 拷贝字段
        indexMap.put("name", Property.of(property ->
                property.text(TextProperty.of(text ->
                        text.index(true).analyzer("ik_smart")
                                .copyTo("search")))
                )
        );
        indexMap.put("address", Property.of(property ->
                property.keyword(KeywordProperty.of(keyword ->
                        keyword.index(false)))
                )
        );
        indexMap.put("price", Property.of(property ->
                property.integer(IntegerNumberProperty.of(integer ->
                        integer.index(true)))
                )
        );
        indexMap.put("score", Property.of(property ->
                    property.integer(IntegerNumberProperty.of(integer ->
                            integer.index(true)))
                )
        );
        indexMap.put("brand", Property.of(property ->
                    property.keyword(KeywordProperty.of(keyword ->
                            keyword.index(true).copyTo("search")))
                )
        );
        indexMap.put("city", Property.of(property ->
                    property.keyword(KeywordProperty.of(keyword ->
                            keyword.index(true).copyTo("search")))
                )
        );
        indexMap.put("starName", Property.of(property ->
                    property.keyword(KeywordProperty.of(keyword ->
                            keyword.index(true).copyTo("search")))
                )
        );
        indexMap.put("business", Property.of(property ->
                    property.keyword(KeywordProperty.of(keyword ->
                            keyword.index(true).copyTo("search")))
                )
        );
        indexMap.put("location", Property.of(property ->
                property.geoPoint(new GeoPointProperty.Builder().build())
                )
        );
        indexMap.put("pic", Property.of(property ->
                property.keyword(KeywordProperty.of(keyword ->
                        keyword.index(false))))
        );
// elasticsearchClient参考建立连接 indices()是针对索引库的操作 request.index("hotel")指定索引库名称
// mappings -> mappings.properties(indexMap) 指定字段映射
CreateIndexResponse response = elasticsearchClient.indices().create(request -> request.index("hotel")
                                                                    .mappings(mappings -> mappings.properties(indexMap)));
// 为true则创建成功
System.out.println(response.acknowledged());
删除索引库、判断索引库是否存在
// 删除索引库
DeleteIndexResponse response = elasticsearchClient.indices().delete(request ->
        request.index("hotel"));
// true 删除成功
System.out.println(response.acknowledged());

//判断索引库是否存在
BooleanResponse exists = elasticsearchClient.indices().exists(request ->
        request.index("hotel"));
// true存在 false不存在
System.out.println(exists.value());

JavaClient操作文档

添加文档
HotelDoc hotelDoc = new HotelDoc(hotelService.getById(36934));
// 通过泛型指定文档类
IndexRequest<HotelDoc> indexRequest = new IndexRequest.Builder<HotelDoc>()
    	// 设置索引库 文档内容和id 如果不设置id那么ES会自动生产随机id
        .index("hotel").document(hotelDoc).id(hotelDoc.getId().toString()).build();
// 通过 index() 来操作文档
IndexResponse index = elasticsearchClient.index(indexRequest);
System.out.println(index.toString());
查询文档
// request类型是GetRequest
GetResponse<HotelDoc> response = elasticsearchClient.get(request ->
        request.index("hotel").id("36934"), HotelDoc.class);
System.out.println(response);
修改文档

修改文档有两种方式:

  • 全量更新:使用添加文档的 index()方法即可,会删除旧文档再添加新文档
  • 局部更新:只更新文档中的部分字段
// 将需要更新的字段使用Map装起来
Map<String, String> map = new HashMap<>();
map.put("city", "重庆");
// 两个泛型 一个是原始数据类型 一个是需要更新的部分字段类型
UpdateRequest<HotelDoc, Map<String, String>> request = new UpdateRequest
    .Builder<HotelDoc, Map<String, String>>().index("hotel").id("36934").doc(map).build();
UpdateResponse<HotelDoc> res = elasticsearchClient.update(request, HotelDoc.class);
System.out.println(res);
删除文档
// request类型是DeleteRequest
DeleteResponse response = elasticsearchClient.delete(request ->
        request.index("hotel").id("36934"));
System.out.println(response);
批量添加文档
// 需要添加的对象
List<Hotel> hotelList = hotelService.list();
// 需要添加的文档列表
List<BulkOperation> bulkOperationList = new ArrayList<>();
hotelList.forEach(hotel -> {
    HotelDoc hotelDoc = new HotelDoc(hotel);
    // 设置文档id和文档内容
    CreateOperation<HotelDoc>  createOperation = new CreateOperation.Builder<HotelDoc>()
        .id(hotelDoc.getId().toString())
        .document(hotelDoc)
        .build();
    // 建立文档
    BulkOperation bulkOperation = new BulkOperation.Builder().create(createOperation).build();
    // 添加到文档列表
    bulkOperationList.add(bulkOperation);
});
// 发送请求 批量添加
BulkResponse response = elasticsearchClient.bulk(request ->
        request.index("hotel").operations(bulkOperationList));
System.out.println(response);

DSL查询文档

DSL Query的分类

  • 查询所有:查询出所有数据

  • 全文检索(full text)查询:利用分词器对输入内容进行分词,然后去索引库中匹配

    • match_query
    • multi_match_query
  • 精确查询:通过精确词条值查找数据,一般是查找keyword、数值、日期等

    • ids:通过id查询
    • range:根据数值范围做查询
    • term:通过数据的值查询
  • 地理查询:通过经纬度查询

    • geo_distance:查询到指定中心点距离小于某个值内的所有文档
    • geo_bounding_box:geo_point值落在某个范围内的所有文档
  • 复合查询:复合查询可以组装合并上述查询条件

    • bool

    • function_score:算分函数查询,控制文档排名。会与 query_score运算,得到新算分。

      常见的算分函数有:

      • weight:给定一个常量值作为函数结果
      • field_value_factor:用文档中的某个字段作为函数结果
      • random_score:使用一个随机值做为函数结果
      • script_score:自定义计算公式,结果作为函数结果
基本查询

查询所有

// 查询 hotel 索引库中的所有数据
GET /hotel/_search
{
  "query": {
    "match_all": {}
  }
}

match查询,单字段查询

GET /hotel/_search
{
  "query": {
    "match": {
      // 查询字段和查询值
      "search": "希尔顿上海"
    }
  }
}

multi_match多字段查询,同时查询多个字段

GET /hotel/_search
{
  "query": {
    "multi_match": {
      // 查询值
      "query": "上海希尔顿",
      // 需要查询的字段
      "fields": ["name", "city", "business"]
    }
  }
}
精确查询

精确查询不会对搜索条件进行分词

term精确匹配查询

GET /hotel/_search
{
  "query": {
    "term": {
      // 只返回 city = 上海 的文档
      "city": {
        "value": "上海"
      }
    }
  }
}

range范围查询

GET /hotel/_search
{
  "query": {
    "range": {
      // 需要查询的字段
      "price": {
        // 大于等于 gt:大于
        "gte": 200,
        // 小于等于 lt:小于
        "lte": 400
      }
    }
  }
}
地理查询

地理矩形范围查询

GET /hotel/_search
{
  "query": {
    "geo_bounding_box": {
      // 查询的字段
      "location": {
        // 矩形的左上角坐标
        "top_left": {
          "lat": 31.1,
          "lon": 121.1
        },
        // 矩形的右下角坐标
        "bottom_right": {
          "lat": 30.9,
          "lon": 121.7
        }
      }
    }
  }
}

指定距离查询

GET /hotel/_search
{
  "query": {
    "geo_distance": {
      // 到中心点的距离
      "distance": "5km",
      // 查询的字段
      "location": {
        // 中心点的纬度
        "lat": 31.01,
        // 中心点的经度
        "lon": 121.2
      }
    }
  }
}
复合查询

算分函数查询

GET /hotel/_search
{
  "query": {
    "function_score": {
      // 普通查询
      "query": {
        "match": {
          "search": "上海外滩"
        }
      },
      "functions": [{
          // 过滤器 只对符合条件的值进行处理
          "filter": {
            // 精确匹配
            "term": {
              "id": 434082
            }
          },
          // 指定算分函数
          "weight": 10
        }],
      // 算分函数与 query_score的运算方式
      // multiply:相乘默认,replace:算分函数的值替换query_score的值
      // sum、avg、max、min
      "boost_mode": "multiply"
    }
  }
}
布尔查询

布尔查询是一个或多个查询子句的组合,组合方式有:

  • must:必须匹配每个子查询
  • should:选择性匹配子查询
  • must_not:必须不匹配,不参与算分
  • filter:必须匹配,不参与算分
GET /hotel/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "name": "如家"
          }
        }
      ],
      "must_not": [
        {
          "range": {
            "price": {
              "gte": 400
            }
          }
        }
      ],
      "filter": [
        {
          "geo_distance": {
            "distance": "10km",
            "location": {
              "lat": 31.21,
              "lon": 121.5
            }
          }
        }
      ]
    }
  }
}

// 单条件版本 会参与算分 性能不如上面
GET /hotel/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "name": "如家"
          }
        },
        {
          "range": {
            "price": {
              "lte": 400
            }
          }
        },
        {
          "geo_distance": {
            "distance": "10km",
            "location": {
              "lat": 31.21,
              "lon": 121.5
            }
          }
        }
      ]
    }
  }
}
搜索结果处理
  • 排序

    elasticsearch支持对搜索结果排序,默认是根据相关度算分 _score来排序。支持排序的字段类型有 keyword、数值类型、地理坐标类型、日期类型等

    GET /hotel/_search
    {
      "query": {
        "match_all": {}
      },
      // 对搜索结果排序 先按评分降序排序 评分相同则按价格升序排序
      "sort": [
        {
          "score": {
            "order": "desc"
          }
        },
        {
          "price": {
            "order": "asc"
          }
        }
      ]
    }
    
    GET /hotel/_search
    {
      "query": {
        "match_all": {}
      },
      // 根据目标距离进行排序
      "sort": [
        {
          "_geo_distance": {
            // 字段
            "location": {
              // 依据经纬度
              "lat": 31.2521784,
              "lon": 121.4890408
            },
            // 升序排序并指定单位为 km
            "order": "asc",
            "unit": "km"
          }
        }
      ]
    }
    
  • 分页

    elasticsearch在搜索时默认只返回前10条数据。通过 fromsize参数控制分页结果

    GET /hotel/_search
    {
      "query": {
        "match_all": {}
      },
      "sort": [
        {
          "price": "asc"
        }
      ],
      // 分页开始的位置
      "from": 10,
      // 每一页的记录条数
      "size": 30
    }
    

    深度分页问题, 假如我现在想查询从990-1000的这10条文档

    1. 首先在每个集群节点的数据分片上排序并查询前1000条文档
    2. 将所有节点的结果聚合,重新排序选出前1000条文档
    3. 从这1000条文档中,选取从990开始的10条文档

    搜索页数越深,结果集就越大。ES设定结果集查询的上限是10000

    • from + size:支持随机翻页,会有深度分页问题
    • after search:分页时需要排序,使用当前页最后一条文档的值获取下一页数据。没有深度分页问题,不支持随机分页。
  • 高亮

    在搜索结果中把搜索关键字突出显示。将搜索结果中的关键字用标签标记出来,在页面中给标签添加css样式。

    GET /hotel/_search
    {
      "query": {
        "match": {
          "search": "如家"
        }
      },
      "highlight": {
        "fields": {
          "name": {
            // 关闭字段匹配 默认情况下 只有高亮字段和搜索字段一样才会高亮
            "require_field_match": "false", 
            // 前置标签 默认 <em>
            "pre_tags": "<em>",
            // 后置标签
            "post_tags": "</em>"
          }
        }
      }
    }
    

JavaClient查询文档

基本查询
// 查询所有数据
SearchResponse<HotelDoc> response = elasticsearchClient.search(request ->
        request.index("hotel").query(query -> query.matchAll(a -> a.queryName(""))), HotelDoc.class);
// 获取文档列表
List<Hit<HotelDoc>> hits = response.hits().hits();
hits.forEach(hit -> {
    HotelDoc hotelDoc = hit.source();
    System.out.println(hotelDoc);
});
全文检索查询
// 单字段查询
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
        .index("hotel")
        .query(query -> query
                .match(m -> m
                        .field("search")
                        .query("上海如家"))),
        HotelDoc.class);
List<Hit<HotelDoc>> hits = response.hits().hits();

// 多字段查询
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
         .index("hotel")
         .query(query -> query
         .multiMatch(m -> m
                         .fields("name", "city")
                         .query("上海希尔顿"))),
         HotelDoc.class);
List<Hit<HotelDoc>> hits = response.hits().hits();

// 精确查询
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
        .index("hotel")
        .query(query -> query
                .term(t -> t
                        .field("city")
                        .value("北京"))),
        HotelDoc.class);

// 范围查询
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
        .index("hotel")
        .query(query -> query
                .range(r -> r
                        .field("price")
                        .gte(JsonData.of(400))
                        .lte(JsonData.of(1000)))),
        HotelDoc.class);
复合查询
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
        .index("hotel")
        .query(query -> query.bool(b -> b
                        .must(q -> q.term(t -> t.field("name").value("如家")))
                        .mustNot(q -> q.range(r -> r.field("price").gte(JsonData.of(400))))
                        .filter(f -> f.geoDistance(g -> g.field("location").distance("10km")
                                .location(l -> l.text("31.21, 121.5"))))
                )),
        HotelDoc.class);
排序、分页和高亮
// 查询5条 按照价格倒序排序
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
                .index("hotel")
                .query(query -> query.match(m -> m.field("name").query("希尔顿")))
                .from(0)
                .size(5)
                .sort(s -> s.field(f -> f.field("price").order(SortOrder.Desc)))
                .highlight(h -> h.requireFieldMatch(false).fields("name", v -> v.preTags("<em>").postTags("</em>"))),,
        HotelDoc.class);
List<Hit<HotelDoc>> hits = response.hits().hits();
hits.forEach(hit -> {
    HotelDoc hotelDoc = hit.source();
    // 高亮结果
    System.out.println(hit.highlight());
    System.out.println(hotelDoc);
});

数据聚合

聚合可以实现对文档数据的统计、分析和运算。常见聚合有三类:

  • 桶(Bucket)聚合:对文档做分组

    • TermAggregation:按照文档字段值分组(字段不能被分词)
    • Date Histogram:按照日期阶梯分组,例如一周或一个月为一组
  • 度量(Metric)聚合:用于计算一些值

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • 管道(pipeline)聚合:以其他聚合的结果为基础做聚合

DSL实现Bucket聚合

默认情况下,Bucket聚合会统计 Bucket内的文档数量,按照数量进项降序排序

GET /hotel/_search
{
  // 不显示文档
  "size": 0,
  // 聚合
  "aggs": {
    // 聚合名称
    "brandAgg": {
      // 聚合类型
      "terms": {
        // 按照字段进行聚合
        "field": "brand",
        // 每次显示20条
        "size": 20,
        // 自定义排序 按照 _count 文档数量进行升序排序
        "order": {
          "_count": "asc"
        }
      }
    }
  }
}

根据条件限定聚合范围

GET /hotel/_search
{
  "query": {
    // 限定对地域为上海的酒店进行聚合
    "term": {
      "city": "上海"
    }
  }, 
  "size": 0,
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}
DSL实现Metrics聚合
GET /hotel/_search
{
  "size": 0,
  // 先进行Bucket聚合
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20,
        // 通过Metrics聚合的avg字段对Bucket进行排序
        "order": {
          "scoreAgg.avg": "desc"
        }
      },
      // 子聚合 针对每一个Bucket内文档的score字段 求出最高、最低、平均评分
      "aggs": {
        "scoreAgg": {
          "stats": {
            "field": "score"
          }
        }
      }
    }
  }
}
JavaClient实现聚合
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
        .index("hotel")
        .size(0)
        .aggregations("bucketAgg", a -> a.terms(t -> t.field("brand").size(20))), HotelDoc.class);
// 获取Bucket桶数据
Aggregate aggregate = response.aggregations().get("buckAgg");
// 解析数据
List<StringTermsBucket> bucketList = aggregate.sterms().buckets().array();
bucketList.forEach(bucket -> {
    System.out.println(bucket.key().stringValue());
});

自动补全

拼音分词器

如果需要按照字母自动补全,需要使用拼音分词器 Github仓库

安装步骤和 IK分词器一致

自定义分词器

Elasticsearch中分词器 analyzer的组成包含三部分:

  • character filters:在tokeninzer之前对文本进行处理
  • tokenizer:将文本按照一定的规则切割成词条 term
  • tokenizer filter:将 tokenizer输出的词条做进一步处理
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": {
        // 添加自定义分词器
        "smart_anlyzer": {
          // 先使用ik分词器处理 再使用自定义的pinyin过滤器处理
          "tokenizer": "ik_smart",
          "filter": "py"
        }
      },
      "filter": {
        // 自定义过滤器
        "py": {
          // 使用pinyin分词器
          "type": "pinyin",
          // 设置可配置项
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        // 创建倒排索引库使用自定义分词器
        "analyzer": "smart_anlyzer",
        // 搜索使用ik分词器
        "search_analyzer": "ik_smart"
      }
    }
  }tong bu
}

拼音分词器适合在创建倒排索引的时候使用,如果在搜索的时候使用,那么会搜出同音字

Completion Suggester查询

Elasticsearch提供了 Completion Suggester查询来实现自动补全功能。会匹配以用户输入内容开头的词条并返回

  • 参与补全查询的字段必须是 completion类型
  • 字段的内容一般是用来补全的多个词条形成的数组
GET /test/_search
{
  "suggest": {
    "title_suggest": {
      // 需要自动补全的值
      "text": "s",
      "completion": {
        // 查询的字段和查询返回的数量
        "field": "title",
        // 结果去重
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}
JavaClient实现自动补全
SearchResponse<HotelDoc> response = elasticsearchClient.search(request -> request
        .suggest(s -> s.suggesters("suggest", f -> f
                .text("sd")
                .completion(c -> c
                        .field("suggestion")
                        .size(10)
                        .skipDuplicates(true)))), HotelDoc.class);

// 解析响应结果 这里的 suggest 和前面发送请求的 key 必须一致
List<Suggestion<HotelDoc>> suggestionList = response.suggest().get("suggest");
suggestionList.forEach(suggestion -> {
    List<CompletionSuggestOption<HotelDoc>> options = suggestion.completion().options();
    options.forEach(op -> {
        // 自动补全返回的词条
        System.out.println(op.text());
        // 文档数据
        HotelDoc source = op.source();
        System.out.println(source);
    });
});

数据同步

  • 同步调用:往数据库进行CRUD操作时同时对 Elasticsearch也操作一次
  • 异步通知:使用消息队列。管理服务向消息队列中发布消息,搜索服务监听到消息后执行对应的操作
  • 监听binlog:监听 Mysql数据库的 binlog实现数据同步
MQ实现数据同步
// 声明交换机和路由的绑定关系和路由key
@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("hotel.topic");
    }
    @Bean
    public Queue insertQueue(){
        return new Queue("hotel.insert.queue");
    }
    @Bean
    public Queue deleteQueue(){
        return new Queue("hotel.delete.queue");
    }
    @Bean
    public Binding insertQueueBinding(Queue insertQueue, TopicExchange topicExchange){
        return BindingBuilder.bind(insertQueue).to(topicExchange).with("hotel.insert.key");
    }
    @Bean
    public Binding deleteQueueBinding(Queue deleteQueue, TopicExchange topicExchange){
        return BindingBuilder.bind(deleteQueue).to(topicExchange).with("hotel.delete.key");
    }
}
// 发生不同的操作后向不同queue中发送消息

@PostMapping
public void saveHotel(@RequestBody Hotel hotel){
    hotelService.save(hotel);
    rabbitTemplate.convertAndSend("hotel.topic", "hotel.insert.key", hotel.getId());
}

@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
    hotelService.removeById(id);
    rabbitTemplate.convertAndSend("hotel.topic", "hotel.delete.key", id);
}
// 监听队列
@Component
public class RabbitMQListener {
    private final HotelService hotelService;
    public RabbitMQListener(HotelService hotelService){
        this.hotelService = hotelService;
    }
    @RabbitListener(queues = "hotel.insert.queue")
    public void listenerHotelInsertOrUpdateById(Long id){
        System.out.println("从消息队列中监听到更新消息:" + id);
        // 通过id从数据库中获取数据 然后添加
        hotelService.insertOrUpdateHotelById(id);
    }
    @RabbitListener(queues = "hotel.delete.queue")
    public void listenerHotelDeleteQueue(Long id){
        System.out.println("从消息队列中监听到删除消息:" + id);
        hotelService.deleteHotelById(id);
    }
}

Elasticsearch集群

单机的 Elasticsearch做数据存储,必然面临两个问题:

  • 海量数据存储问题:将索引库从逻辑上拆分为N个分片 shard,存储到多个节点上
  • 单点故障问题:将单点数据在不同节点备份 replice

Elasticsearch集群中节点的职责划分

节点类型 配置参数 默认值 节点职责
master eligible node.master true 备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求
data node.data true 数据节点:存储数据、搜索、聚合、CRUD
ingest node.ingest true 数据存储之前的预处理
coordinating 上面3个参数都为flase则为coordinating节点 路由请求到其他节点并合并其他节点处理的结果,返回给用户

Elasticsearch中每个节点都有自己不同的职责划分,建议在集群部署时,每个节点都是独立的角色

默认情况下,每个节点都是 master eligible节点。一旦主节点宕机,其他节点会选举一个主节点。当主节点与其他节点发生网络故障时,可能发生脑裂问题

为了避免脑裂,节点当选主节点需要选票超过 (eligible节点数量 + 1) / 2,因此 1eligible节点的数量最好是奇数。

分布式存储

当新增文档时,应该保存到不同分片,保证数据均衡。Elasticsearch会通过 hash算法来计算文档应该存储到哪个分片

shard = hash(_routing) % number_shards
  • _routing默认是文档的 id
  • 算法和分片数量有关,因此索引库一旦创建,分片数量便不能再修改

新增文档流程:

获取文档id -> hash运算,得到要存储的位置 -> 将请求路由到对应分片 -> 分片保存文档 -> 同步文档给备份副本分片

分布式查询

Elasticsearch的查询分为两个阶段:

  • scatter phase:分散阶段,会把请求分发到每一个分片
  • gather phase:聚集阶段,汇总所有数据节点的搜索结果,并处理为结果集返回给用户

故障转移

集群的 master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机中节点的数据迁移到其他节点,确保数据安全。

微服务保护

微服务调用链路中的某个服务故障,引起整个链路中的所有微服务都不可用,这就是雪崩问题。解决方式常见有四种:

  • 超时处理:设定超时时间,请求超过一定时间没有响应就返回错误信息
  • 舱壁模式:限定每个业务能使用的线程数,避免耗尽整个 tomcat资源,也叫做线程隔离
  • 熔断降级:由断路器统计业务执行的异常比例,如果超过阈值则会熔断该业务,拦截访问该业务的一切请求
  • 流量控制:限制业务访问的QPS,避免服务因流量的突增而故障

服务保护技术对比:

Sentinel Hystrix
隔离策略 信号量隔离 线程池隔离/信号量隔离
熔断降级策略 基于慢调用比例或异常比例 基于失败比例
实时指标实现 滑动窗口 滑动窗口(基于RxJava)
规则配置 支持多种数据源 支持多种数据源
扩展性 多个扩展点 插件的形式
基于注解的支持 支持 支持
限流 基于QPS,支持基于调用关系的限流 有限的支持
流量整形 支持慢启动、匀速排队模式 不支持
系统自适应保护 支持 不支持
控制台 开箱即用,可配置规则、秒级监控等 不支持

使用Sentinel

Sentinel是阿里巴巴开源的一款微服务流量控制组件,官方文档。具有以下特性:

  • 丰富的应用场景:Sentinel承接了阿里巴巴近10年的高并发核心场景,例如秒杀、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等
  • 完备的实时监控:提供实时的监控功能,能够看到单台机器秒级数据和集群汇总运行情况
  • 广泛的开源生态:
  • 完善的SPI扩展点

安装控制台

下载最新发行版本

wget https://github.com/alibaba/Sentinel/releases/download/1.8.6/sentinel-dashboard-1.8.6.jar

启动 jar包,默认监听 8080端口。账号密码都是 sentinel

java -jar sentinel-dashboard-1.8.6.jar

# 修改端口 server.port
# 修改用户名 sentinel.dashboard.auth.username
# 修改密码 sentinel.dashboard.auth.password

微服务整合

引入依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

配置控制台地址

spring: 
  cloud:
    sentinel:
      transport:
        dashboard: 10.10.10.10:8080

访问微服务的任意端点即可触发监控

限流规则

簇点链路

簇点链路就是项目内的调用链路,链路中被监控的每个接口就是一个资源。默认情况下 sentinel会监控 SpringMVC的每一个端点。

流控、熔断等都是针对簇点链路中的资源来设置的

限流规则高级选项中有三种流控模式:

  • 直接:统计当前资源的请求,触发阈值时对对应资源直接限流

  • 关联:统计与当前资源相关的另一个资源,触发阈值时,对当前资源限流

    使用场景:当修改业务触发阈值时,对查询业务进行限流。

  • 链路:统计从指定链路访问到本资源的请求,触发阈值时,对指定链路限流

    Sentinel默认只标记 Controller中的方法为资源,如果要标记其他方法,需要使用 @SentinelResource注解。同时需要关闭 context整合,否则会导致链路模式的流控失效

    spring:
      cloud:
        sentinel:
          web-context-unify: false
    

流控效果

流控效果是指请求达到阈值时应该采取的措施,包括三种:

  • 加速失败:达到阈值后,新的请求会被立即抛出 FlowException异常,默认方式

  • Warm Up:预热模式,对超出阈值的请求同样是拒绝并抛出异常,这种模式阈值会动态变化,从一个较小值逐渐增加到最大阈值

    是应对服务冷启动的一种方案。请求阈值的初始值是 threshold/coldFactor,持续指定时长后,逐渐提高到 threshold值。coldFactor的默认值是3

  • 排队等待:让所有的请求按照先后次序排队执行,两个请求的间隔不能小于指定时长

    让所有的请求进入一个队列中,然后按照阈值允许的时间间隔依次执行。后来的请求必须前面的执行完成,如果请求预期的等待时间超出最大时长,则会被拒绝

热点参数限流

热点参数限流是分别统计参数值相同的请求,判断是否超过 QPS阈值。热点参数限流对于默认的 SpringMVC资源是无效的,需要使用 @SentinelResource注解

降级和熔断

Feign整合Sentinel

修改配置文件,开启 FeignSentinel功能

feign:
  sentinel:
    enabled: true

FeignClient编写失败后的降级逻辑

  1. 使用 FallbackClass,无法对远程调用的异常做处理
  2. 使用 FallbackFactory,可以对远程调用的异常做处理

使用 FallbackFactory

@Slf4j
public class UserClientFallbackFactory implements FallbackFactory<UserClient> {
    @Override
    public UserClient create(Throwable throwable) {
        return new UserClient() {
            @Override
            // 调用异常返回一个空用户
            public User getUser(Long id) {
                log.error("查询用户异常,用户id:" + id, throwable);
                return new User();
            }
        };
    }
}
// 做为Bean注入到IOC容器中
@Bean
public UserClientFallbackFactory userClientFallbackFactory(){
    return new UserClientFallbackFactory();
}
// 声明需要使用的 fallbackFactory
@FeignClient(value = "userservice", fallbackFactory = UserClientFallbackFactory.class)
public interface UserClient {
    @GetMapping("/user/{id}")
    User getUser(@PathVariable("id") Long id);
}

记录一下Feign开启Sentinel启动出现了无法注入的问题 原因在于版本不兼容 将SpringCloud的版本更改为Hoxton.SR9后启动成功

线程隔离

线程隔离有两种方式实现:

  • 线程池隔离,适用于低扇出场景
    • 优点:支持主动超时、异步调用
    • 缺点:线程的额外开销较大
  • 信号量隔离(Sentinel默认采用),适用于高频调用、高扇出场景
    • 优点:轻量级、无额外开销
    • 缺点:不支持主动超时和异步调用

Sentinel添加限流规则时阈值类型选择线程数即可实现线程隔离。线程数是该资源能使用 tomcat线程数的最大值。

熔断降级

熔断降级是解决雪崩问题的重要手段。其思路是有断路器统计服务调用的异常比例、慢请求比例,如果超出阈值则会熔断该服务。拦截访问该服务的一切请求。当服务恢复后,又会自动放行访问该服务的请求

断路器有三个状态:

  • Closed:不拦截任何请求,会记录请求的响应时间,达到失败阈值后进入 Open状态
  • Open:拦截所有请求,快速失败
  • Half-Open:熔断时间结束后进入 Half-Open状态,这时候断路器会放行一次请求,如果请求失败那么继续进入 Open状态,成功则进入 Closed状态

断路器的熔断策略有三种:

  • 慢调用:请求的响应时长大于指定时长就被认定为慢请求。在指定时间内,如果请求数量超过设定的最小数量,慢调用比例大于设定的阈值,则触发熔断
  • 异常比例或者异常数:统计指定时间内的调用,如果调用次数超过指定次数,并且出现异常的比例达到设定的比例阈值(或超过指定异常数)则触发熔断

授权规则

授权规则可以对调用方方来源做控制,有白名单和黑名单两种模式:

  • 白名单:来源 origin在白名单内的调用者可以访问
  • 黑名单:来源 origin在黑名单内的调用者不可访问

Sentinel通过 RequestOriginParser接口来获取请求的来源

// 继承接口 实现代码逻辑
@Component
public class HandlerOriginParser implements RequestOriginParser {
    @Override
    public String parseOrigin(HttpServletRequest httpServletRequest) {
        String origin = httpServletRequest.getHeader("origin");
        if(StringUtils.isEmpty(origin)){
            return "default";
        }
        return origin;
    }
}
# 网关的配置文件里面添加统一的请求头
filters:
  - AddRequestHeader=origin, gateway

自定义异常结果

默认情况下,发生降级、限流、授权拦截时,都会抛出异常给调用方。如果需要自定义异常时的返回结果,需要实现 BlockExceptionHandler接口

BlockException的子类,应对不同的场景

异常 场景
FlowException 限流异常
ParamFlowException 热点参数限流的异常
DegradeException 降级异常
AuthorityException 授权规则异常
SystemBlockException 系统规则异常
// 继承BlockExceptionHandler接口 实现自定义异常处理
@Component
@Slf4j
public class SentinelExceptionHandler implements BlockExceptionHandler {
    @Override
    public void handle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, BlockException e) throws Exception {
        log.warn(e.getMessage());
        String message = "未知异常";
        int status = 429;
        if(e instanceof FlowException){
            message = "请求限流";
        } else if (e instanceof ParamFlowException) {
            message = "热点参数限流";
        } else if (e instanceof DegradeException) {
            message = "请求降级";
        } else if (e instanceof AuthorityException) {
            message = "没有权限访问";
            status = 401;
        }
        JSONObject json = new JSONObject();
        json.put("message", message);
        json.put("status", status);
        httpServletResponse.setStatus(status);
        httpServletResponse.setContentType("application/json");
        httpServletResponse.getWriter().write(json.toJSONString());
    }
}

规则持久化

Sentinel的控制台规则管理有三种模式:

  • 原始模式:默认模式,将规则保存在内存中,重启丢失。
  • pull模式:控制台将配置的规则推送到 Sentinel客户端,客户端会将配置规则保存到本地文件或数据库中。然后会定时去本地文件或数据库中查询,更新本地规则
  • push模式:控制台将配置规则推送到远程配置中心,例如 Nacos、Zookeeper等。Sentinel客户端监听配置中心配置变更的推送消息,完成配置更新

分布式事务

在分布式系统下,一个业务跨越多个服务或数据源,每个服务都是一个分支事务,要保证所有分支事务的最终状态一致,这就是分布式事务。

理论基础

CAP定理,分布式系统有三个指标:

  • Consistency(一致性):用户访问分布式系统中的任意节点,得到的数据必须一致
  • Availability(可用性):用户访问集群中的健康节点,必须能得到响应,而不是超时或拒绝
  • Partition(分区):因为网络故障或其他原因导致分布式系统中的部分节点与其他节点失去连接,形成独立分区
  • Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务

但是分布式系统无法同时满足这三个指标,这个结论就叫CAP定理

BASE理论是对CAP的一种解决思路,包含三个思想:

  • Basically Available(基本可用):分布式系统出现故障时,允许损失部分可用性,即保证核心可用
  • Soft State(软件态):在一定时间内,允许出现中间状态 ,比如临时的不一致状态
  • Eventually Consistent(最终一致性):虽然无法保证强一致性,但在软件态结束后,最终达到数据一致。

分布式事务最大的问题是各个子事务的一致性问题

  • AP模式:隔子事务分别执行和提交,允许出现结果不一致, 然后采用弥补措施恢复数据,实现最终一致
  • CP模式:各个子事务执行后互相等待,同时提交、同时回滚,达成强一致。但在事务等待过程中,处于弱可用状态

在分布式事务中,子事务也被成为分支事务,有关联的各个分支事务在一起称为全局事务

Seata简介

Seata是一个开源的分布式事务管理框架官网地址

Seata事务管理中有三个重要的角色:

  • TC(Transaction Coordinator)- 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚
  • TM(Transaction Manageer)- 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务
  • RM(Resource Manager)- 资源管理器:管理分支事务处理的资源,与 TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚

Seata提供了四种不同的分布式事务解决方案:

  • XA模式:强一致性分阶段事务模式,牺牲一定的可用性,无业务入侵
  • TCC模式:最终一致的分阶段事务模式,有业务侵入
  • AT模式:最终一致的分阶段事务模式,无业务侵入,也是 Seata的默认模式
  • SAGA模式:长事务模式,有业务侵入
XA AT TCC SAGA
一致性 强一致 弱一致 弱一致 最终一致
隔离性 完全隔离 基区全局锁隔离 基于资源预留隔离 无隔离
代码侵入 有、要编写三个接口 有、要编写状态机和补偿业务
性能 非常好 非常好
场景 对一致性、隔离型有高要求的业务 基于关系型数据库的大多数分布式事务场景都可以 对性能要求较高的业务、有非关系型数据库要参与的事务 业务流程长、业务流程多,参与者包含其他公司的服务,无法提供TCC模式要求的三个接口

Seata-Server部署

前往发布页下载需要的版本,步骤参考官方文档

wget https://github.com/seata/seata/releases/download/v1.6.1/seata-server-1.6.1.zip
# 解压缩
unzip seata-server-1.6.1.zip

修改配置文件,模式选择 db,注册中心和配置中心都使用 nacos

# 复制需要用的配置
cat conf/application.example.yml

vim conf/application.yml
# application.yml

seata:
  # 配置中心
  config:
    # support: nacos, consul, apollo, zk, etcd3
    type: nacos
    nacos:
      server-addr: 10.10.10.10:8848
      group : "SEATA_GROUP"
      namespace: ""
      username: "nacos"
      password: "nacos"
      # 远程配置的 id
      dataId: "seataServer.properties"
  # 注册中心
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    type: nacos
    nacos:
      # 在nacos中显示的服务名称
      application: seata-tc-server
      server-addr: 10.10.10.10:8848
      group : "DEFAULT_GROUP"
      namespace: ""
      # 集群地址
      cluster: default
      username: "nacos"
      password: "nacos"
    # support: file 、 db 、 redis
    # 使用数据库模式
    mode: db
    db:
      # 配置数据源
      datasource: druid
      db-type: mysql
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://10.10.10.10:3307/seata?rewriteBatchedStatements=true
      user: root
      password: admin
      min-conn: 10
      max-conn: 100
      global-table: global_table
      branch-table: branch_table
      lock-table: lock_table
      distributed-lock-table: distributed_lock
      query-limit: 1000
      max-wait: 5000
    # 注册端口
    server:
      service-port: 8091

建立数据库和数据表,使用 Mysql数据库

create database seata;
use seata;

建表语句在 seata/script/server/db目录下,直接执行即可

启动 seata-server

./bin/seata-server.sh

# 现在seata在启动之后默认会使用docker的网络 需要在启动参数里面指定host地址 本机ip或者0.0.0.0都可
./bin/seata-server.sh -h 10.10.10.10

# 查看日志 是否启动成功
cat logs/start.out

集成Seata

引入依赖

<properties>
    <seata.version>1.6.1</seata.version>
</properties>

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <!--版本较低,因此排除-->
        <exclusion>
            <artifactId>seata-spring-boot-starter</artifactId>
            <groupId>io.seata</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- 使用和服务端对应的版本 -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>${seata.version}</version>
</dependency>

修改配置文件

seata:
  registry:
  	# nacos注册中心
    type: nacos
    nacos:
      server-addr: 10.10.10.10:8848
      # 命名空间
      namespace: ""
      group: DEFAULT_GROUP
      # seata-server端在nacos中的名称
      application: seata-tc-server
      # 集群地址
      cluster: default

XA模式

XA规范是 X/Open组织定义的分布式事务处理(DTP)标准,XA规范描述了全局的 TM与局部的 RM之间的接口,几乎所有的主流数据库都对 XA规范提供了支持。XA模式具备强一致性并且没有代码侵入

RM向TM报告自己的执行状态,TM根据每个RM的报告来决定事务是提交还是回滚

在XA规范里面这里的RM就是数据库

SeataXA模式做了一些调整:

RM变成了每个微服务,TM向TC请求开启全局事务然后去调用分支的RM,每个RM向TC注册分支事务和报告事务状态。所有分支事务执行完毕后,TM会向TC请求提交还是回滚事务,TC接收到请求后会去检查每个分支事务的状态,如果全部成功那么通知所有RM提交事务,有一个失败则回滚事务

使用XA模式

# 给每个参与事务的微服务的seata都设置为XA模式
seata:
  data-source-proxy-mode: XA
// 在开启全局事务的方法上使用 @GlobalTransactional 注解
@GlobalTransactional
public Long create(Order order) {
    // 创建订单
    orderMapper.insert(order);
    try {
        // 扣用户余额
        accountClient.deduct(order.getUserId(), order.getMoney());
        // 扣库存
        storageClient.deduct(order.getCommodityCode(), order.getCount());

    } catch (FeignException e) {
        log.error("下单失败,原因:{}", e.contentUTF8(), e);
        throw new RuntimeException(e.contentUTF8(), e);
    }
    return order.getId();
}

AT模式

AT模式同样是分阶段提交的事务模型,不过弥补了 XA模式中资源锁定周期过长的缺陷,官方文档解析

AT模式的总体流程和XA模式一致。也是由TM去请求TC开启全局事务,然后调用分支的RM。每个RM会先向TC注册分支事务,然后直接执行SQL语句并提交事务,提交完成之后记录更新前的的快照 undo log并向TC报告分支事务状态。所有分支事务执行完毕后,TM会向TC请求提交还是回滚事务,TC接收到请求后会去检查每个分支事务的状态,如果全部成功则使用异步线程去删除备份的快照,存在失败则会恢复所有分支事务备份的快照数据

  • XA模式一阶段不提交事务,锁定资源。AT模式直接提交不锁定资源
  • XA模式依赖数据库的回滚机制,AT默认利用快照实现回滚
  • XA模式强一致,AT模式最终一致

AT模式的脏写问题

线程一拿到数据库锁后保存了快照,数据更新直接提交了事务释放了数据库锁。这时线程二在线程一提交事务后拿到了数据库锁,对刚才线程一更新的数据进行了再次更新。如果线程一的全局事务发生回滚的话,那么会用线程一所保存的快照对数据进行恢复导致线程二的更新无效,这就发生了脏写问题。

AT模式的写隔离

为了应对脏写问题,Seata在AT模式引入了全局锁的概念,由 TC记录当前正在操作某行数据的事务,该事务持有全局锁,具有执行权

还是以上面的例子为例,线程一在执行业务SQL时会先尝试向TC获取全局锁,获取成功才会提交。在线程一的全局事务没有结束之前,线程二针对该行数据的更新会因为无法获取到全局锁而阻塞,这时如果线程一需要恢复快照则需要等待线程二执行完毕来获取数据库锁,这就产生了死锁问题。为了避免思索问题,等待全局锁的时间最多300MS,超时则会回滚事务并释放数据库锁

使用AT模式

在参与事务的每个微服务所关联数据库中创建 undo_log表,用来记录快照

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

修改配置文件,每个参与事务的微服务都需要修改

seata:
  # 使用AT模式
  data-source-proxy-mode: AT

TCC模式

TCC模式与AT模式非常相似,每个阶段都是独立事务,不同的是TCC模式是通过代码来实现数据恢复的,需要实现三个方法

  • Try:资源的检测和预留
  • Confirm:完成资源操作业务,要求Try成功Confirm一定要能成功
  • Cancel:预留资源释放,可以理解为Try的反向操作

Try、Confirm、Cancel都是在RM里面执行的操作,Confirm对应提交、Cancel对应回滚操作

优点

  • 直接提交事务,释放数据库锁,性能好
  • 无需生成快照、无需全局锁、性能好
  • 不依赖数据库事务,可以用于非事务型数据库

缺点

  • 有代码侵入,需要人为编写Try、Confirm和Cancel接口
  • 软状态,事务是最终一致
  • 需要考虑Confirm和Cancel的失败情况和幂等性

TCC的空回滚和业务悬挂

当TM调用RM时,其中有一个分支发生了阻塞没有执行 Try逻辑,TM发现阻塞超过一定时间后就会向TC报超时错误同时回滚全局事务,其中因为阻塞没有执行 Try逻辑的RM就会发生空回滚。

一个业务发生了空回滚以后,如果还能继续执行 Try就会发生业务悬挂。应当阻止执行空回滚后的 Try操作

使用TCC模式

根据业务需求创建一个表用来存储 Try逻辑冻结的资源

CREATE TABLE `account_freeze_tbl`  (
  `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL comment '全局事务id',
  `user_id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL comment '用户id',
  `freeze_money` int(11) UNSIGNED NULL DEFAULT 0 comment '冻结金额',
  `state` int(1) NULL DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel',
  PRIMARY KEY (`xid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;

编写TCC接口

// 声明使用TCC模式
@LocalTCC
public interface AccountTCCService {
    // name:Try语句 commitMethod:confirm语句 rollbackMethod:cancel语句
    @TwoPhaseBusinessAction(name = "deduct", commitMethod = "confirm", rollbackMethod = "cancel")
    // BusinessActionContextParameter可以将参数放入 BusinessActionContext 作用域中 然后在confirm和cancel语句中也能获取
    void deduct(@BusinessActionContextParameter("userId") String userId,
                @BusinessActionContextParameter("money") int money);
    boolean confirm(BusinessActionContext context);
    boolean cancel(BusinessActionContext context);
}
@Service
public class AccountTCCServiceImpl implements AccountTCCService {
    private final AccountMapper accountMapper;
    private final AccountFreezeMapper accountFreezeMapper;

    public AccountTCCServiceImpl(AccountMapper accountMapper, AccountFreezeMapper accountFreezeMapper){
        this.accountMapper = accountMapper;
        this.accountFreezeMapper = accountFreezeMapper;
    }
    @Override
    public void deduct(String userId, int money) {
        String xid = RootContext.getXID();
        // 防止业务悬挂 只要业务发生过CANCEL操作就会留下记录
        if (accountFreezeMapper.selectById(xid) != null){
            return;
        }
        // 业务逻辑
        accountMapper.deduct(userId, money);
        AccountFreeze accountFreeze = new AccountFreeze(xid, userId, money, AccountFreeze.State.TRY);
        // 冻结金额 保存到数据库用于后面回滚
        accountFreezeMapper.insert(accountFreeze);
    }

    @Override
    public boolean confirm(BusinessActionContext context) {
        // 全局事务提交后的执行逻辑 删除对应事务id的冻结金额
        String xid = context.getXid();
        int count = accountFreezeMapper.deleteById(xid);
        return count == 1;
    }

    @Override
    public boolean cancel(BusinessActionContext context) {
        String xid = context.getXid();
        AccountFreeze freeze = accountFreezeMapper.selectById(xid);
        // 防止空回滚 new一个新的AccountFreeze对象并将状态设置为 CANCEL
        if (freeze == null){
            String userId = context.getActionContext("userId", String.class);
            AccountFreeze accountFreeze = new AccountFreeze(xid, userId, 0, AccountFreeze.State.CANCEL);
            accountFreezeMapper.insert(accountFreeze);
            return true;
        }
        // 防止幂等性
        if(freeze.getState() == AccountFreeze.State.CANCEL){
            return true;
        }
        // 回滚操作 更新金额到Try语句执行前
        String userId = freeze.getUserId();
        Integer money = freeze.getFreezeMoney();
        accountMapper.refund(userId, money);
        // 回滚后 将freeze的冻结金额设为0 状态更改为CANCEL 然后更新到数据库
        freeze.setFreezeMoney(0);
        freeze.setState(AccountFreeze.State.CANCEL);
        int count = accountFreezeMapper.updateById(freeze);
        return count == 1;
    }
}

Saga模式

Saga模式是 Seata提供的长事务解决方案,也分为两个阶段:

  • 一阶段:RM直接提交本地事务
  • 二阶段:成功则什么都不做,失败则通过编写补偿业务来回滚

优点

  • 事务参与者可以基于事件驱动实现异步调用,吞吐性能更好
  • 一阶段RM直接提交事务,不会长时间占用数据库锁和没有全局锁,性能好
  • 不用编写TCC中的三个阶段,实现简单

缺点

  • 异步驱动存在时效性问题,数据软状态时间不确定
  • 没有锁和事务隔离,存在脏写

分布式缓存

单点 Redis的问题:

  • 数据丢失问题

    实现 Redis数据持久化

  • 存储能力问题

    搭建分片集群,利用插槽机制实现动态扩容

  • 并发能力问题

    搭建主从集群,实现读写分离

  • 故障恢复问题

    利用 Redis哨兵,实现健康检测和自动恢复

Redis持久化

RDB AOF
持久化方式 定时对整个内存做快照 记录每一次执行的命令
数据完整性 不完整,两次备份之间会丢失 相对完整,取决去策略
文件大小 会有压缩,文件体积小 记录命令,体积大
宕机恢复速度 很快
数据恢复优先级 低,完整性不如AOF
系统资源占用 高,大量CPU和内存消耗 低,但重写AOF文件会占用资源
使用场景 更快的启动速度和数分钟的数据丢失 数据安全性要求较高

RDB持久化

RDB全程 Redis Database Backup file,也被叫做 Redis数据快照。负责把内存中的所有数据记录到磁盘中。当 Redis实例故障重启后,从磁盘中读取快照文件,恢复数据

Redis在停止服务时默认会执行一次 RDB持久化操作,这时使用的使用的是 save命令

  • save:暂停主进程,写入内存数据到rdb文件,会阻塞所有请求
  • bgsave:后台保存,fork主进程得到子进程,子进程共享主进程的内存数据,然后写入到rdb文件

如果想要实现在后台自动备份,可以修改 Redis的配置文件 redis.conf

sudo vim /etc/redis/redis.conf

# 默认是 save "" 表示禁用RDB
# 取消注释 save "" 编写自己的备份规则
# 900秒内有一个key被修改,则执行 bgsave
save 900 1
save 300 10
save 60 10000

# 其他配置
# RDB保存时是否压缩 压缩会占用大量CPU资源
rdbcompression yes
# RDB备份文件的名字
dbfilename dump.rdb
# RDB备份文件的路径
dir /var/lib/redis/

AOF持久化

AOF全称为 Append Only File(追加文件)。Redis处理的每一个写命令都会记录在 AOF文件中,可以看做类似于 Mysql的日志文件

AOF默认是关闭的,可以修改 redis.conf来开启 AOF功能

sudo vim /etc/redis/redis.conf

# 修改配置
# 开启AOF
appendonly yes
# AOF的文件名称
appendfilename "appendonlf.aof"
# AOF的文件路径
appenddirname "/var/lib/redis"
# 写入AOF文件的频率
appendfsync everysec # 先放入AOF缓冲区 每隔1秒写入一次 默认方案
# always 每执行一次写命令就记录一次
# no 先放入AOF缓冲区 由操作系统决定何时写入

因为是记录命令,AOF文件会比 RDB文件大的多。而且 AOF会记录对同一个 Key的多次写操作,可以通过 bgrewriteaof命令让 AOF文件执行重写功能。

可以通过修改配置文件中触发阈值配置让 Redis去自动重写 AOF文件

# 增长超过多少百分比触发重写
auto-aof-rewrite-percentage 100
# AOF文件体积最小多大以上触发重写
auto-aof-rewrite-min-size 64mb

在实际生产环境中,RDB可以和 AOF结合使用

Redis主从

搭建Redis主从架构

为了在高并发场景下进一步提高 Redis的并发能力,就可以搭建主从集群,实现读写分离

使用 Docker Compose搭建一个一主二从的读写分离集群

# 创建对应的工作目录 redis-master redis-replica1/2
# 工作目录内再创建 data conf目录 存放数据和配置文件

# 主节点配置文件
vim redis-master/conf/redis.conf

----------------------------------------
# 端口号
port 7001
bind 0.0.0.0
# 不开启appendonly备份模式
appendonly no
# 日志配置
loglevel notice
# 持久化文件保存目录
dir /data
# 持久化保存文件名
dbfilename dump.rdb
# 节点密码
requirepass admin
# 声明自己的ip地址与端口号 使用物理机的外网端口
slave-announce-ip 10.10.10.10
slave-announce-port 7001
# 主节点连接密码
masterauth admin
----------------------------------------

# 从节点同理
vim redis-replica1/conf/redis.conf

----------------------------------------
# 端口号
port 7002
bind 0.0.0.0
# 不开启appendonly备份模式
appendonly no
# 日志配置
loglevel notice
# 持久化文件保存目录
dir /data
# 持久化保存文件名
dbfilename dump.rdb
# 节点密码
requirepass admin
# 主节点的配置 docker中可以使用容器名字访问
replicaof redis-master 7001
# 声明自己的ip地址与端口号
slave-announce-ip 10.10.10.10
slave-announce-port 7002
# 主节点连接密码
masterauth admin
----------------------------------------
# 创建一个虚拟网络 供redis使用
docker network create redis-net
version: '3'
services:
  redis-master:
    image: redis
    container_name: redis-master
    networks:
      - redis-net
    ports:
      - 7001:7001
    volumes:
      - ./redis-master/data:/data
      - ./redis-master/conf/redis.conf:/etc/redis/redis.conf
    command: redis-server /etc/redis/redis.conf
  redis-replica1:
    image: redis
    container_name: redis-replica1
    networks:
      - redis-net
    ports:
      - 7002:7002
    volumes:
      - ./redis-replica1/data:/data
      - ./redis-replica1/conf/redis.conf:/etc/redis/redis.conf
    command: redis-server /etc/redis/redis.conf
  redis-replica2:
    image: redis
    container_name: redis-replica2
    networks:
      - redis-net
    ports:
      - 7003:7003
    volumes:
      - ./redis-replica2/data:/data
      - ./redis-replica2/conf/redis.conf:/etc/redis/redis.conf
    command: redis-server /etc/redis/redis.conf
networks:
  redis-net:
    external: true
    name: redis-net
# 启动
docker compose up -d
# 验证一下 从节点如果连接成功会有显示
docker logs redis-master

数据同步原理

Redis主从同步的第一次同步是全量同步,从节点会带上 replidoffset去请求主节点,主节点判断 replid是否和自己一致,如果不一致则触发全量同步,这时主节点会执行 bgsave,生成 RDB文件并发送给从节点,从节点清空本地数据加载 RDB文件。在 bgsave执行期间,主节点会记录所有的命令并放在缓冲区 repl_baklog,等从节点数据加载完毕之后再将命令发送给从节点,从节点执行命令实现数据同步

  • Replication Id:简称replid,是数据集的标记,id一致则说明是同一数据集。每一个 master都有唯一的 replidslave则会继承 master节点的 replid
  • offset:偏移量,随着记录在 repl_baklog中的数据增多而逐渐增大。slave完成同步时也会记录当前同步的 offset,如果 slaveoffset小于 masteroffset,说明 slave数据落后于 master,需要更新

如果主节点判断 replid一致的话就触发增量同步,主节点会去 repl_baklog缓存区中查找从节点 offset之后的数据并发送给从节点

如果从节点的宕机时间过久,主节点中 repl_baklog覆盖了最早的数据,那么从节点只能再次全量同步

主从同步优化

  • master中配置 repl-diskless-sync yes启动无磁盘同步,避免全量同步时还需要先将数据写入到磁盘

  • Redis单节点上的内存占用不要太大,减少 RDB导致的过多磁盘IO

  • 适当提高 repl_baklog的大小,发现从节点宕机时尽快恢复,尽可能避免全量同步

  • 限制一个 master上的从节点数量,如果从节点过多,则可以采用主 -> 从 -> 从链式结构

    从节点将 replicaof设置为另一个从节点的地址和端口即可实现链式同步

Redis哨兵

哨兵的作用和原理

Redis提供了哨兵 Sentinel机制来实现主从集群的自动故障恢复

  • 监控:哨兵会不断检查 masterreplica是否正常工作
  • 自动故障恢复:如果 master故障,哨兵会将一个 replica提升为 master,故障实例恢复之后也以新的 master为主
  • 哨兵充当 Redis客户端的服务发现来源,当集群发生故障转移时,会将最新消息推送给 Redis的客户端

哨兵 Sentinel基于心跳机制监测服务状态,每隔1秒向集群的中每个实例发送 ping命令

  • 主观下线:某个 Sentinel节点发现某个实例未在规定时间响应,则任务该实例主观下线
  • 客观下线:若超过指定数量 quorumSentinel都认为该实例主观下线,则该实例客观下线

一旦发现 master故障,Sentinel需要在 replica中选择一个作为新的 master

  • 首先会判断 replica节点与 master节点断开时间长短,如果超出指定值则会排除该 replica节点
  • 然后判断 replica节点的 slave-priority值,越小优先级越高,如果是0则不参加选举
  • 如果 slave-priority一样,则判断 replica节点的 offset值,越大则数据越新,优先级越高
  • 最后是判断 replica节点的运行 id大小,越小优先级越高

选中了其中一个新的 replica为新的 master后,就会开始故障转移

  • Sentinel给备选的 replica节点发送 replicaof no one命令,让该节点成为 master
  • Sentinel给其他所有 replica节点发送 replicaof xxxxx xxxx命令,让这些 replica成为新的 master的从节点

搭建哨兵集群

创建 sentinel.conf配置文件

vim sentinel.conf

# 配置
port 27001
# 这里的ip是redis-master的ip地址
sentinel monitor mymaster 10.10.10.10 7001 2
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000
# 主库的连接密码
sentinel auth-pass mymaster admin
dir /data

创建集群文件夹

mkdir s1 s2 s3
mkdir s1/data s2/data s3/data
# 将配置文件复制到每个目录各一份 确保修改端口

使用 Docker Compose部署

version: '3'
services:
  redis-sentinel1:
    image: redis
    container_name: redis-sentinel1
    command: redis-sentinel /etc/redis/sentinel.conf
    networks:
      - redis-net
    # 映射配置文件和数据目录
    volumes:
      - ./s1/sentinel.conf:/etc/redis/sentinel.conf
      - ./s1/data:/data
    ports:
      - "27001:27001"
  redis-sentinel2:
    image: redis
    container_name: redis-sentinel2
    command: redis-sentinel /etc/redis/sentinel.conf
    networks:
      - redis-net
    volumes:
      - ./s2/sentinel.conf:/etc/redis/sentinel.conf
      - ./s2/data:/data
    ports:
      - "27002:27002"
  redis-sentinel3:
    image: redis
    container_name: redis-sentinel3
    command: redis-sentinel /etc/redis/sentinel.conf
    networks:
      - redis-net
    volumes:
      - ./s3/sentinel.conf:/etc/redis/sentinel.conf
      - ./s3/data:/data
    ports:
      - "27003:27003"
# 和redis主从集群共用网络
networks:
  redis-net:
    external: true
    name: redis-net
# 启动 redis 哨兵集群
docker compose up -d
# 查看log 是否和redis-master成功建立连接
docker logs redis-sentinel1

RedisTemplate的哨兵模式

Sentinel集群监管下的 Redis主从集群,其节点会因为自动故障转移而发生变化,Redis的客户端需要及时更新连接信息。Spring提供的 RedisTemplate底层利用了 lettuce实现了节点的感知和自动切换。

修改配置文件

spring:
  redis:
    # 配置redis的哨兵集群
    sentinel:
      master: mymaster # 指定master名称 需要和sentinel配置文件中的一致
      nodes:
        - 10.10.10.10:27001
        - 10.10.10.10:27002
        - 10.10.10.10:27003

配置主从读写分离

// ReadFrom是Redis的读取策略 枚举类
// MASTER:从主节点读取
// MASTER_PREDERRED:优先从主节点读取,主节点不可用才读取从节点
// REPLICA:从从节点读取
// REPLICA_PREFRRED:优先从节点读取,所有从节点不可用才读取主节点

@Bean
public LettuceClientConfigurationBuilderCustomizer configurationBuilderCustomizer(){
    return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
}

Redis分片集群

主从和哨兵可以解决高可用、高并发读的问题。分片集群可以解决海量数据存储问题、高并发写的问题。分片集群的特征:

  • 集群中有多个 master,每个 master保存不同的数据
  • 每个 master都可以有多个 replica节点
  • master之间通过 ping监测彼此健康状态

集群搭建

使用 Docker搭建 Redis分片集群

version: '3'
services:
  redis-1:
    image: redis
    container_name: redis-1
    networks:
      - redis-net
    ports:
      - 7001:7001
      # 17001是redis集群的总线通信端口  规则是redis端口 + 10000
      - 17001:17001
    command: redis-server --cluster-enabled yes --cluster-config-file redis-1.conf --port 7001 --bind 0.0.0.0
  redis-2:
    image: redis
    container_name: redis-2
    networks:
      - redis-net
    ports:
      - 7002:7002
      - 17002:17002
    command: redis-server --cluster-enabled yes --cluster-config-file redis-2.conf --port 7002 --bind 0.0.0.0
  redis-3:
    image: redis
    container_name: redis-3
    networks:
      - redis-net
    ports:
      - 7003:7003
      - 17003:17003
    command: redis-server --cluster-enabled yes --cluster-config-file redis-3.conf --port 7003 --bind 0.0.0.0
  redis-4:
    image: redis
    container_name: redis-4
    networks:
      - redis-net
    ports:
      - 8001:8001
      - 18001:18001
    command: redis-server --cluster-enabled yes --cluster-config-file redis-4.conf --port 8001 --bind 0.0.0.0
  redis-5:
    image: redis
    container_name: redis-5
    networks:
      - redis-net
    ports:
      - 8002:8002
      - 18002:18002
    command: redis-server --cluster-enabled yes --cluster-config-file redis-5.conf --port 8002 --bind 0.0.0.0
  redis-6:
    image: redis
    container_name: redis-6
    networks:
      - redis-net
    ports:
      - 8003:8003
      - 18003:18003
    command: redis-server --cluster-enabled yes --cluster-config-file redis-6.conf --port 8003 --bind 0.0.0.0
networks:
  redis-net:
    external: true
    name: redis-net

测试集群是否可用

# 进入容器
docker exec -it redis-1 /bin/bash
# 登陆redis
redis-cli -c -p 7001
# 查看集群节点
cluster nodes

散列插槽

Redis会把每一个 master节点映射到 0~1638316384个插槽。在分片集群中,数据 key不是与节点绑定,而是与插槽绑定。Redis会根据 key的有效部分计算插槽值

  • key中包含 {},且 {}中不为空,则 {}中的是有效部分
  • key中不包含 {},整个 key都是有效部分

集群伸缩

添加节点

通过 redis-cli --cluster命令来操作 Redis集群

# 添加一个节点到集群 需要新节点的ip 端口 和集群中一个旧节点的ip和端口
redis-cli --cluster add-node newIp:newPort oldIp:oldPort
删除节点

故障转移

Redis集群具备自动主从、故障切换,不需要哨兵

手动故障转移

使用 cluster failover命令可以手动让集群中的某个 master宕机,切换到这个 master节点的 replica节点,实现手动故障转移和数据迁移

手动的 failover支持三种不同模式:

  • 缺省:默认流程
  • force:忽略对数据的一致性校验
  • takeover:直接替换,忽略数据一致性、master状态和其他master意见

RedisTemplate访问分片集群

RedisTemplate底层基于 lettuce实现了分片集群的支持,步骤与哨兵模式基本一致

只需要修改配置文件

spring:
  redis:
    # 配置集群地址
    cluster:
      nodes:
        - 10.10.10.10:7001
        - 10.10.10.10:7002
        - 10.10.10.10:7003
        - 10.10.10.10:8001
        - 10.10.10.10:8002
        - 10.10.10.10:8003

同哨兵模式的配置一样,也需要配置读写分离

多级缓存

传统的缓存策略是请求到达 Servlet容器后,先查询 RedisRedis中没有命中再查询数据库。这就存在几个问题:

  • 请求要经过 Servlet容器,传统的 Tomcat性能会成为整个系统的瓶颈
  • Redis缓存失效时,会对数据库产生冲击

多级缓存就是充分利用请求处理的每个环节,分别添加缓存,提升服务性能

浏览器客户端缓存 -> Nginx本地缓存 -> Redis缓存 -> Tomcat进程缓存 -> 查询数据库

JVM进程缓存(本地进程缓存)

缓存可以大量减少数据库的访问,减轻数据库压力、提升服务响应性能,可以把缓存分为两类:

  • 分布式缓存,例如 Redis
    • 优点:可共享、容量大、可靠性好
    • 缺点:访问有网络开销
    • 场景:缓存数据量较大、可靠性要求较高、需要共享
  • 进程本地缓存,例如 HashMap、GuavaCache
    • 优点:读取本地内存、无网络开销、速度快
    • 缺点:存储容量有限、可靠性较低、无法共享
    • 场景:性能要求较高、缓存数据量较小

使用 Caffeine作为本地缓存,官方地址

// 创建缓存对象
Cache<String, String> cache = Caffeine.newBuilder().build();
// 添加数据到缓存
cache.put("num", "123");
// 获取数据 不存在则返回null
String num = cache.getIfPresent("num");
System.out.println(num);
// 获取数据 不存在则执行对应的处理逻辑 key对应第一个参数值
String one = cache.get("one", key -> {
    return "1";
});
System.out.println(one);

设置缓存过期策略

Cache<String, String> cache = Caffeine.newBuilder()
    // 设置缓存大小上限为 1
    .maximumSize(1)
    .build();

Cache<String, String> cache = Caffeine.newBuilder()
    // 设置缓存有效期为10秒钟
    .expireAfterWrite(Duration.ofSeconds(1)) 
    .build();

在默认情况下,当一个缓存元素过期的时候,Caffeine不会自动立即将其清理和驱逐。而是在一次读或写操作后或者在空闲时间完成对失效数据的驱逐。

// 实践
@Bean
public Cache<Long, Item> itemCache() {
    return Caffeine.newBuilder()
        .initialCapacity(100)
        .maximumSize(10000)
        .build();
}

@GetMapping("/{id}")
public Item findById(@PathVariable("id") Long id){
    return itemCache.get(id, key -> itemService.query()
           .ne("status", 3).eq("id", key)
           .one());
}

Lua语法

官方网站

vim hello.lua
# 写入 print('Hello World')
# 运行
lua hello.lua

变量和循环

数据类型 说明
nil 表示一个无效值(在条件表达式中等同于false)
boolean 布尔值:true和false
number 双精度类型的实浮点数
string 字符串,双引号或者单引号表示
function 由C或者Lua编写的函数
table 表(table)其实是一个关联数组,索引可以是数字、字符串或表类型。table的创建通过构造表达式完成,最简单的是 {},创建一个空表

Lua在声明变量的时候,不需要指定数据类型

local str = "123"
local num = 123
local bl = true
-- 声明数组 Lua的数组下标从1开始
local arrs = {1, 2, 3}
-- 声明table
local object = {name="AB", age=18}

访问 table

-- 访问数组
print(arrs[1])
-- 访问table
print(object.age)

遍历数组

-- 遍历 arrs 数组 index:下标 value:数组中对应index的值 do:循环开始 end:循环结束
for index, value in ipairs(arrs) do
    print(index, value)
end

遍历 table

for key, value in pairs(object) do
    print(key, value)
end

条件控制、函数

定义函数的语法

function 函数名(param1, param2...)
    return 返回值
end

条件控制

if(布尔表达式)
then
    -- 为true时执行
else
    -- 为false时执行
end

Lua中布尔表达式的逻辑运算基于英文单词

&& -> and || -> or ! -> not

使用多级缓存

OpenResty

OpenResty是一个基于 Nginx的高性能Web平台,用于方便的搭建能处理超高并发、扩展性极高的动态Web应用、Web服务和动态网关。官方地址

  • 具备Nginx的完整功能
  • 基于Lua语言进行扩展,集成大量Lua库、模块
  • 允许使用Lua自定义业务逻辑、自定义库
安装

ArchLinux系统可以直接通过 yay安装

yay -S openresty

# 安装后 openresty的安装目录在 /opt/openresty

其他系统安装查看官方教程

快速入门

修改 openrestynginx的配置文件,加载 Lua模块

sudo vim /opt/openresty/nginx/conf/nginx.conf

# 在http代码块里面 添加下面两行
lua_package_path "/opt/openresty/lualib/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";

使用 Lua代码返回响应结果

# 监听 /api/item 请求
location /api/item {
	# 默认的响应类型
	default_type application/json
	# 响应结果由 lua/item.lua 文件决定
	content_by_lua_file lua/item.lua
}
请求参数处理
参数格式 示例 解析代码示例
路径占位符 /item/1001 location ~ /item/(\d+){ } 正则表达式匹配<br />local id = ngx.var[1] 参数会被保存到 var数组中
请求头 id:1001 local headers = ngx.req.get_headers()
Get请求参数 ?id=1001 local getParams = ngx.req.get_uri_args()
Post表单参数 id=1001 ngx.req.read_body() 读取请求体<br />local postParams = ngx.req.get_post_args()
Json参数 {"id": 1001} ngx.req.read_body()<br />local json = ngx.req.get_body_data()
# 通过正则匹配 /api/item/ 路径后面的id参数 然后交给item.lua文件处理
location ~ /api/item/(\d+) {
    default_type application/json;
    content_by_lua_file lua/item.lua;
}
-- 获取路径参数
local id = ngx.var[1]

-- 直接将获取到的id返回
ngx.say('{"id": '..id..'}')

查询Tomcat

通过 Lua查询 Tomcat需要发送 Http请求,这里单独抽取一个方法出来,作为模块放入 openrestylualib文件夹中

sudo vim /opt/openresty/lualib/request.lua
local function http_get(path, params)
    -- capture发送http请求 这里的path不能写ip和端口 只能写一个路径 这个路径会被nginx内部的server监听 需要再反向代理一次
    local response = ngx.location.capture(path, {
        method = ngx.HTTP_GET, -- get请求
        arg = params -- 请求参数
    })
    -- 判断response是否存在 存在则直接返回响应结果 不存在则记录日志 返回404
    if(not response) then
        ngx.log(ngx.ERR, "http error, path:", path, ", args: ", params)
        ngx.exit(404)
    end
    return response.body
end

-- 导出方法
local _M = {
    http_get = http_get
}
return _M
# 路径的反向代理示例
# 将通过 capture 发送的指向 /server 路径的请求代理到指定ip地址和端口
location /server/ {
    proxy_pass http://10.10.10.176:8081/;
}

重写刚才的 item.lua文件

-- 导入函数库
-- 自己封装的发送请求函数库
local request = require 'request'
-- openresty提供的json序列化和反序列化函数库 encode 序列化  decode 反序列化
local cjson = require 'cjson'

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJson = request.http_get("/server/item/" .. id, nil)

-- 查询库存信息
local stockJson = request.http_get("/server/item/stock/" .. id, nil)

-- 反序列化json 合并数据
local item = cjson.decode(itemJson)
local stock = cjson.decode(stockJson)

item.stock = stock.stock
item.sold = stock.sold

-- 序列化商品信息并返回结果
ngx.say(cjson.encode(item))

针对后端的 server集群改造 nginx.conf文件

# 使用负载均衡
upstream server-cluster{
    # 通过对请求uri使用hash算法 确保同一个请求永远打到同一台服务器 提升后端本地缓存的命中率
    hash $request_uri;
    server 10.10.10.176:8081;
    server 10.10.10.176:8082;
}

location /server/ {
	# 负载均衡
    proxy_pass http://server-cluster/;
}

查询Redis缓存

冷启动与缓存预热:

  • 冷启动:服务刚刚启动时,Redis中并没有缓存,如果所有商品都在第一次查询后添加缓存,可能会给数据库带来较大压力
  • 缓存预热:在实际开发中,可以在项目启动时将某些热点数据提前查询并保存到 Redis

如果数据量较少,启动时可以将所有数据都放入缓存中

// 实现 ApplicationRunner 接口 在Spring的IOC容器加载完成后会立即执行 run 方法
@Component
public class RedisInitRunner implements ApplicationRunner {
    private final StringRedisTemplate redisTemplate;
    private final ItemService itemService;
    private final ItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public RedisInitRunner(StringRedisTemplate redisTemplate, ItemService itemService, ItemStockService stockService){
        this.redisTemplate = redisTemplate;
        this.itemService = itemService;
        this.stockService = stockService;
    }
    // 项目启动后 将数据库中的所有数据都放入redis中做缓存
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 查询商品信息
        List<Item> itemList = itemService.list();
        itemList.forEach(item -> {
            try {
                // 序列化为json
                String itemJson = MAPPER.writeValueAsString(item);
                // 针对商品信息和商品库存使用不同的key
                redisTemplate.opsForValue().set("item:id:" + item.getId(), itemJson);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        });
        // 查询库存信息
        List<ItemStock> stockList = stockService.list();
        stockList.forEach(stock -> {
            try {
                String stockJson = MAPPER.writeValueAsString(stock);
                redisTemplate.opsForValue().set("stock:id:" + stock.getId(), stockJson);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        });
    }
}

使用 openresty提供的 redis模块查询 redis,封装一个 redis工具函数

-- 导入redis模块
local redis = require 'resty.redis'
local red = redis:new()
-- 设置超时时间
red:set_timeouts(1000, 1000, 1000)

-- 查询redis的方法
local function redis_value_get(ip, port, passwd, key)
    -- 建立连接
    local ok, err = red:connect(ip, port)
    if not ok then
        ngx.log(ngx.ERR, "建立redis连接失败", err)
        return nil
    end
    local ok, err = red:auth(passwd)
    if not ok then
        ngx.log(ngx.ERR, "redis密码错误", err)
        return nil
    end
    local response, err = red:get(key)
    -- 判断查询是否成功
    if not response then
        ngx.log(ngx.ERR, "查询redis失败", err, ", key = ", key)
    end
    -- 判断查询结果是否为空
    if response == ngx.null then
        response = nil
        ngx.log(ngx.WARN, "查询redis数据为空,key = ", key)
    end
    -- 将连接放入连接池 闲置是时间10s 最大连接数100
    red:set_keepalive(10000, 100)
    return response
end

-- 导出方法
local _M = {
    redis_value_get = redis_value_get
}

return _M

修改 item.lua中的查询逻辑,redis中查询为空时再查询 tomcat

-- 导入函数库
local request = require 'request'
local redisutil = require 'redisutil'
local cjson = require 'cjson'

-- 抽取成方法
local function query_item(key, path, params)
    local response = redisutil.redis_value_get("10.10.10.10", 6379, "admin", key)
    -- 如果response == nil 那么就会去查询tomcat
    if not response then
        response = request.http_get(path, params)
    end
    return response
end

-- 获取路径参数
local id = ngx.var[1]

-- 查询商品信息
local itemJson = query_item("item:id:" .. id, "/server/item/" .. id, nil)

-- 查询库存信息
local stockJson = query_item("stock:id:" .. id, "/server/item/stock/" .. id, nil)

Nginx本地缓存

OpenrestyNginx提供了 shard dict的功能,可以在 nginx多个 worker之间共享数据,实现缓存功能

开启 shard dict,在 nginx.confhttp段配置内添加一行

# 使用名称为 item_cache 的shard dict 大小为150MB
lua_shared_dict item_cache 150m;

Lua代码内操作本地缓存对象

-- 获取本地缓存
local item_cache = ngx.shared.item_cache

-- 查询方法
local function query_item(key, time, path, params)
    -- 先查询本地缓存 为空再查询redis
    local response = item_cache:get(key)
    if not response then
        response = redisutil.redis_value_get("10.10.10.10", 6379, "admin", key)
        if not response then
            response = request.http_get(path, params)
        end
        -- 通过redis或者tomcat查询到数据后 将数据保存到本地缓存中
        item_cache:set(key, response, time)
    end
    return response
end

缓存同步

缓存同步策略:

  • 设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
    • 优点:简单、方便
    • 缺点:时效性差,容易出现缓存不一致问题
    • 场景:更新频率较低,时效性要求不高的业务
  • 同步双写:在修改数据库的同时,直接修改缓存
    • 优点:时效性强,缓存与数据库强一致
    • 缺点:有代码侵入,耦合度高
    • 场景:对一致性、时效性要求较高的缓存数据
  • 异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
    • 优点:低耦合度,可以同时通知多个缓存服务
    • 缺点:时效性一般,可能存在中间不一致状态
    • 场景:时效性要求一般,有多个服务需要同步

使用Canal

Canal是阿里巴巴基于数据库增量日志解析,提供增量数据订阅&消费。Github地址

Canal的原理是基于 Mysql的主从同步来实现的,把自己伪装成 Mysqlslave节点,监听主节点的 binary log变化。再将数据变化的消息通知给 Canal客户端

需要先开启 Mysql的主从同步功能

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-do-db=item # 针对哪个库开启binlog功能 这里只需要item库
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 唯一id

新增一个用于 Canal同步的用户

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

使用 Docker启动 Canal

docker pull canal/canal-server:latest

# 启动容器
docker run -p 11111:11111 --name canal \
-e canal.destinations=canal \ # 集群名称
-e canal.instance.master.address=10.10.10.10:3307  \ # 主库地址
-e canal.instance.dbUsername=canal  \ # 主从同步的用户名和密码
-e canal.instance.dbPassword=canal  \
-e canal.instance.connectionCharset=UTF-8 \
-e canal.instance.tsdb.enable=true \
-e canal.instance.gtidon=false  \
-e canal.instance.filter.regex='item\\..*' \ # 监听的数据库
-d canal/canal-server:latest

# 查看日志
docker logs canal

SpringBoot中集成 Canal

MQ常见问题

还没学,学了再写

死信交换机

消息堆积及惰性队列

MQ集群

关联文章

0 条评论