需求背景

在JAVA应用开发过程中,越来越多的项目采用了微服务架构,而采用微服务架构最直接作用是可以实现业务层解耦,有利于研发团队可以从业务层面进行划分,比如某几个人的小团队负责某几个微服务,总之,从业务角度来讲的话,让专业的人做专业的事。而同时由于部署了微服务后,经常需要保证业务的高可用,就有了同一服务多机部署的概念,而有的服务在任务处理的时候,可能需要保证任务处理的顺序性,在同一服务多机的时候,保证任务的顺序性其实是一个比较复杂的问题,比如说经常会用的邮件通知服务,邮件通知时,如果不按照业务的顺序进行通知的话,可能会造成一定错误。(比如一个告警邮件:第一个邮件:告警,第二个邮件解除告警,在这种情况下,如果顺序错乱会导致业务上造成误会甚至错误)

现实案例

某某产品中的邮件服务,需要多机部署,而同时需要保证邮件发送的顺序性。
目前架构中邮件服务的实现基本原理:各个服务根据需要,将需要发送的邮件内容,提交到redis队列,再由邮件服务进行轮询获取发送。

难点分析

但目前没有进行多机部署,无法保证服务的高可用,需要对其进行多机部署改造。而多机部署后,又涉及到邮件的实际发送顺序问题,所以在高可用的同时,仍需要保证业务的单一顺序性。对此在多机的同时,为其增加主备的功能,在多机的情况下,通过选主的方式选出主节点即实际工作节点,当主节点发生宕机的时候,再进行一次选主,选出另外一个工作节点。对于这种业务需要保证单一顺序的服务模块,通过主备的方式进行实现。

主备设计

由于目前系统中没有引入zookpeer分布式协调工具,所以对于选主目前通过redis来进行实现。

  1. 对于每一个JVM进程为其分配唯一的NODE_ID,启动后通过heartbeat机制定时(间隔20S)地在redis设值(key:jvm_process_NODE_ID_heartbeat value:NODE_ID ttl:30S)
  2. 系统启动后,主动地定时地去争取master lock,并获取其master状态;
  3. 当前进程如果争取master lock成功,则将自身的NODE_ID,设为value,并将过期时间设为30S;
  4. 如果master lock已被占有,根据其NODE_ID进行判断 ,如果是自身,则延长key存活时间,如果不是自身,则获取其value,判断value所指的NODE_ID的节点是否还存活(通过心跳去检查另外的节点),如果不存活,直接通过cas操作将其改为当前进程的NODE_ID,cas操作成功则抢主成功,反之则失败。
  5. 当获取节点状态(要么master 要么slave)后,则触发master-slave事件。

pom文件

这里需要用到redis,引入以下依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.11.4</version>
        </dependency>

心跳机制

通过java进程定时地去设值redis key的方式去维持进程的心跳,这里为了方便redis的操作,使用了redisson client工具类。
为了方便地使用通过单例的方式进行获取使用,同时将引入到spring 中,交由spring管理并初始化

public class JVMProcessHeartbeat {
    private static final Logger log = LoggerFactory.getLogger(JVMProcessHeartbeat.class);
    /**
     * 进程唯一id
     */
    private static final String NODE_ID = SystemConstant.NODE_ID;

    /**
     * 维护心跳的key值格式
     */
    private static final String FORMAT = "jvm_process_%s_heartbeat";

    /**
     * 根据NODE_ID生成当前进程的key
     */
    private static final String HEARTBEAT_KEY = String.format(FORMAT, NODE_ID);

    /**
     * redis操作工具
     */
    private RedissonClient redissonClient;

    /**
     * 定时维持心跳的线程池
     */
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * 单例模式设计心跳
     */
    private static JVMProcessHeartbeat jvmProcessHeartbeat;

    /**
     * DCL单例实现
     *
     * @param redissonClient
     * @return
     */
    public static JVMProcessHeartbeat getInstance(RedissonClient redissonClient) {
        if (jvmProcessHeartbeat == null) {
            synchronized (JVMProcessHeartbeat.class) {
                if (jvmProcessHeartbeat == null) {
                    jvmProcessHeartbeat = new JVMProcessHeartbeat(redissonClient);
                }
            }
        }
        return jvmProcessHeartbeat;
    }

    private JVMProcessHeartbeat(RedissonClient redissonClient) {
        this.redissonClient = redissonClient;
        init();
    }

    /**
     * 初始化心跳维护线程
     */
    private void init() {
        BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder().namingPattern("heartbeat").build();
        scheduledExecutorService = new ScheduledThreadPoolExecutor(1, basicThreadFactory);
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            RBucket<String> bucket = redissonClient.getBucket(HEARTBEAT_KEY);
            bucket.set(NODE_ID, 30, TimeUnit.SECONDS);
            log.debug("keep heart by redis,node id = [{}]",NODE_ID);
        }, 1, 15, TimeUnit.SECONDS);
        Runtime.getRuntime().addShutdownHook(new Thread(this::stopHeartbeat));
        log.info("JVMProcessHeartbeat init successful");
    }


    /**
     * 停止心跳,关闭线程池
     */
    private void stopHeartbeat() {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdown();
        }
        log.info("JVMProcessHeartbeat stop!");
    }


    /**
     * 检查指定的节点是否在线
     *
     * @param nodeId
     * @return
     */
    public boolean checkOnline(final String nodeId) {
        String key = String.format(FORMAT, nodeId);
        RBucket<String> bucket = redissonClient.getBucket(key);
        if (bucket != null && bucket.isExists()) {
            return true;
        }
        return false;
    }

}

支持主从切换的Bean

对于需要进行主从切换的bean,将其生命周期划分为以下几个阶段

  1. 初始化(init)
  2. turnMaster(切换为主)
  3. turnSlave(切换为从)
public interface MasterSlaveSwitchBean {

    /**
     * 初始伦
     */
    default void init() {

    }

    /**
     * 切换为master为的触发事件
     */
    default void turnMaster() {

    }

    /**
     * 切换为slave后的触发事件
     */
    default void turnSlave() {

    }

    /**
     * 通过静态方法的方式将其进行注册
     *
     * @param applicationContext
     * @param masterSlaveSwitchBean
     */
    static void supportMasterSlave(ApplicationContext applicationContext, MasterSlaveSwitchBean masterSlaveSwitchBean) {
        ProcessMasterSelector masterSelector = null;
        MasterSlaveNamespace masterSlaveNamespace = masterSlaveSwitchBean.getClass().getAnnotation(MasterSlaveNamespace.class);
        String namespace = Optional.ofNullable(masterSlaveNamespace).map(MasterSlaveNamespace::value).orElse(SystemConstant.DEFAULT_MASTER_SLAVE_NAMESPACE);
        try {
            masterSelector = applicationContext.getBean(namespace, ProcessMasterSelector.class);
        } catch (Exception e) {
        }
        if (masterSelector == null) {
            //如果是非集群状态下,则先init初始化,再运行turnMaster
            masterSlaveSwitchBean.init();
            masterSlaveSwitchBean.turnMaster();
        } else {
            //将其注册上去
            masterSelector.register(new MasterSlaveSwitchBeanDecorator(masterSlaveSwitchBean));
        }
    }
}

为了更方便、更准确地控制支持主从切换bean的生命周期,为其添加一个包装类MasterSlaveSwitchBeanDecorator,重点关注其内部的boolean类型的属性 init 和 start
init方法在整个进程运行期间,只会被调用一次,而turnMaster 和 turnSlave则会根据切换可能被多次调用,这里有到了装饰者模式,实现如下 :

public class MasterSlaveSwitchBeanDecorator implements MasterSlaveSwitchBean {
    /**
     * 初始化标识
     */
    @Getter
    private boolean inited = false;
    /**
     * 是否已经运行
     */
    @Getter
    private boolean started = false;

    /**
     * 具体的clusterBootstrapBean
     */
    @Getter
    private MasterSlaveSwitchBean masterSlaveSwitchBean;

    public MasterSlaveSwitchBeanDecorator(MasterSlaveSwitchBean masterSlaveSwitchBean) {
        this.masterSlaveSwitchBean = masterSlaveSwitchBean;
    }
    @Override
    public void init() {
        long startTime = System.currentTimeMillis();
        log.info("start handle [{}] init method", masterSlaveSwitchBean.getClass().getSimpleName());
        masterSlaveSwitchBean.init();
        inited = true;
        log.info("end handle [{}] init method,cost time [{}] seconds", masterSlaveSwitchBean.getClass().getSimpleName(), (System.currentTimeMillis() - startTime) / 1000);
    }
    @Override
    public void turnMaster() {
        if (started) {
            return;
        }
        log.info("start handle [{}] turn master method", masterSlaveSwitchBean.getClass().getSimpleName());
        masterSlaveSwitchBean.turnMaster();
        started = true;
    }
    @Override
    public void turnSlave() {
        if (!started) {
            return;
        }
        log.info("start handle [{}] turn slave method", masterSlaveSwitchBean.getClass().getSimpleName());
        masterSlaveSwitchBean.turnSlave();
        started = false;
    }
}

为了更方便地在代码中使用masterSlaveSwitchBean,为其添加一个抽象类,bean只需要继承这个抽象类,就能够被master selector动态地控制进行事件的触发。
实现者只需要继承该类即可

public abstract class AbstractMasterSlaveSwitchBean implements ApplicationContextAware, MasterSlaveSwitchBean {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        MasterSlaveSwitchBean.supportMasterSlave(applicationContext, this);
    }
}

选主器(MasterSelector)

选主流程

  1. 系统启动后,定时地去维护一个选主的key,如果这个key的value值是自己的话,则自身就是master节点
  2. 当完成一次状态维护后,触发事件(根据当前注册上来的bean状态信息去执行具体的业务逻辑(三个方法 init turnMaster turnSlave))
    具体实现代码相对较多,已放至github,欢迎大家对不足之处进行指正。
    github地址:https://github.com/873098424/redisMasterSelctor.git