MQTT 概述

MQTT 是基于 TCP/IP 协议栈构建的异步通信消息协议,是一种轻量级的发布/订阅(publish/subscribe)消息传输协议。它可以在不可靠的网络环境中运行和扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用 MQTT 协议,消息发送者与接收者在时间和空间上相互解耦,不受彼此限制。

Docker 部署 MQTT(采用docker-compose.yml)

# Compose file that runs a single Eclipse Mosquitto broker container named "mqtt".
# NOTE(review): recent Docker Compose releases reportedly ignore the top-level "version" key - confirm.
version: "3" 
services:
    mqtt:
        # No tag is given, so whatever the default tag resolves to is pulled.
        # NOTE(review): consider pinning a specific version for reproducible deployments.
        image: eclipse-mosquitto
        container_name: mqtt
        # NOTE(review): privileged mode gives the container extended host privileges;
        # confirm whether the broker actually needs it.
        privileged: true 
        ports: 
            # host:container. 1883 matches "listener 1883" in config/mosquitto.conf.
            - 1883:1883
            # NOTE(review): the mosquitto.conf shown in this tutorial defines no listener on 9001,
            # so this mapping presumably has nothing behind it - confirm.
            - 9001:9001
        volumes:
            # Bind mounts: broker configuration, persistence data and log files live on the host.
            - ./config:/mosquitto/config
            - ./data:/mosquitto/data
            - ./log:/mosquitto/log
  • 文件夹
    image

  • 创建 config/mosquitto.conf

# Persist broker state (retained messages, subscriptions) to disk.
persistence true
# Plain MQTT listener; matches the 1883 port mapping in docker-compose.yml.
listener 1883
# Directory for the persistence database (trailing slash kept for older broker versions).
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

# Disable anonymous access explicitly so every client must authenticate,
# regardless of the broker version's default.
allow_anonymous false
# Password file used to authenticate clients.
password_file /mosquitto/config/pwfile.conf
  • docker部署执行:docker compose up -d
  • 设置访问权限(用户名:admin,密码:admin123)
# Open an interactive shell inside the "mqtt" container (the container must be running).
docker exec -it mqtt sh
# Create the empty password file that mosquitto.conf references via password_file.
touch /mosquitto/config/pwfile.conf
# NOTE(review): mode 755 leaves the password file readable by every user, and -R has no
# effect on a regular file - consider a tighter mode after confirming what the broker requires.
chmod -R 755 /mosquitto/config/pwfile.conf
# Add user "admin" with password "admin123"; -b takes the password from the command line.
mosquitto_passwd -b /mosquitto/config/pwfile.conf admin admin123
  • 重启mqtt容器:docker compose restart

Springboot 整合

  • 依赖
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/>
    </parent>

    <dependencies>
        <!-- Spring Integration MQTT support; version managed by the Spring Boot parent -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <!-- Lombok (provides @Slf4j) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Spring Boot core and web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- fastjson: 1.2.83 carries the fix for the autoType deserialization
             vulnerabilities that affect earlier 1.2.x releases such as 1.2.62 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!-- HTTP client component -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
        </dependency>
        <!-- Test libraries: test scope keeps them out of the packaged application -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
  • 配置文件
# Broker URI in the form tcp://host:port
mqtt.host=tcp://127.0.0.1:1883
# Client identifier sent to the broker.
# NOTE(review): two applications connecting with the same id will presumably disconnect each other - confirm.
mqtt.clientId=mqttx_a071ba88
mqtt.username=admin
mqtt.password=admin123
# Topic subscribed to at startup
mqtt.topic=test_topic
# Connection timeout in SECONDS (passed to MqttConnectOptions#setConnectionTimeout)
mqtt.timeout=30
# Keep-alive interval in SECONDS (passed to MqttConnectOptions#setKeepAliveInterval)
mqtt.keepAlive=60
  • 配置类
@Slf4j
@Configuration
public class MyMqttConfiguration {
    @Value("${mqtt.host}")
    String broker;
    @Value("${mqtt.clientId}")
    String clientId;
    @Value("${mqtt.username}")
    String username;
    @Value("${mqtt.password}")
    String password;
    @Value("${mqtt.timeout}")
    Integer timeout;
    @Value("${mqtt.keepAlive}")
    Integer keepAlive;
    @Value("${mqtt.topic}")
    String topic;
    @Autowired
    MyHandle myHandle;

    @Bean
    public MyMqttClient myMqttClient(){
        MyMqttClient mqttClient = new MyMqttClient(broker, username, password, clientId, timeout, keepAlive,myHandle);
        for (int i = 0; i < 10; i++) {
            try {
                mqttClient.connect();
                mqttClient.subscribe(topic,0);
                return mqttClient;
            } catch (MqttException e) {
                log.error("MQTT connect exception,connect time = " + i);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            }
        }
        return mqttClient;
    }

}
  • 客户端
import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.util.ObjectUtils;

@Slf4j
public class MyMqttClient {
    private static MqttClient client;
    private String host;
    private String clientId;
    private String username;
    private String password;
    private Integer timeout;
    private Integer keepAlive;
    private MyHandle myHandle;

    public  MyMqttClient(){
        System.out.println("MyMqttClient空构造函数");
    }

    public MyMqttClient(String host, String username, String password, String clientId, Integer timeOut, Integer keepAlive,MyHandle myHandle) {
        System.out.println("MyMqttClient全参构造");
        this.host = host;
        this.username = username;
        this.password = password;
        this.clientId = clientId;
        this.timeout = timeOut;
        this.keepAlive = keepAlive;
        this.myHandle = myHandle;
    }

    public static MqttClient getClient() {
        return client;
    }

    public static void setClient(MqttClient client) {
        MyMqttClient.client = client;
    }

    /**
     * 设置mqtt连接参数
     */
     public MqttConnectOptions setMqttConnectOptions(String username,String password,Integer timeout, Integer keepAlive){
         MqttConnectOptions options = new MqttConnectOptions();
         options.setUserName(username);
         options.setPassword(password.toCharArray());
         options.setConnectionTimeout(timeout);
         options.setKeepAliveInterval(keepAlive);
         options.setCleanSession(true);
         options.setAutomaticReconnect(true);
         return options;
     }

    /**
     * 连接mqtt服务端
     */
    public void connect() throws MqttException {
        if(client == null){
            client = new MqttClient(host,clientId,new MemoryPersistence());
            client.setCallback(new MyMqttCallback(MyMqttClient.this,this.myHandle));
        }
        MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepAlive);
        if(!client.isConnected()){
            client.connect(mqttConnectOptions);
        }else{
            client.disconnect();
            client.connect(mqttConnectOptions);
        }
        log.info("MQTT connect success");
    }

    /**
     * 断开连接
     * @throws MqttException
     */
    public void disconnect()throws MqttException{
        if(null!=client && client.isConnected()){
            client.disconnect();;
        }
    }
    /**
     * 发布,qos默认为0,非持久化
     */
     public void publish(String pushMessage,String topic,int qos){
         publish(pushMessage, topic, qos, false);
     }

    /**
     * 发布消息
     *
     * @param pushMessage
     * @param topic
     * @param qos
     * @param retained:留存
     */
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(pushMessage.getBytes());
        mqttMessage.setQos(qos);
        mqttMessage.setRetained(retained);
        MqttTopic mqttTopic = MyMqttClient.getClient().getTopic(topic);
        if(ObjectUtils.isEmpty(mqttTopic)){
            log.error("主题不存在");
        }
        synchronized (this){
            try{
                MqttDeliveryToken mqttDeliveryToken = mqttTopic.publish(mqttMessage);
                mqttDeliveryToken.waitForCompletion(1000L);
            }catch (MqttException e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 订阅
     *
     * @param topic
     * @param qos
     */
    public void subscribe(String topic, int qos) {
        try {
            MyMqttClient.getClient().subscribe(topic, qos);
            log.info("订阅主题:"+topic+"成功!");
        } catch (MqttException e) {
            log.error("订阅主题:"+topic+"失败!",e);
        }
    }
    /**
     * 取消订阅
     */
    public void cleanTopic(String topic){
        if(!ObjectUtils.isEmpty(client) && client.isConnected()){
            try{
                client.unsubscribe(topic);
            }catch (MqttException e){
                log.error("取消订阅失败!"+e);
            }
        }else{
            log.info("主题不存在或未连接!");
        }
    }
}
  • 回调类(消息发送和接收时响应)
@Slf4j
public class MyMqttCallback implements MqttCallbackExtended {
    private MyMqttClient myMqttClient;
    private MyHandle myHandle;
    public MyMqttCallback(MyMqttClient myMqttClient,MyHandle myHandle) {
        this.myMqttClient = myMqttClient;
        this.myHandle = myHandle;
    }

    /**
     * 连接完成
     * @param reconnect
     * @param serverURI
     */
    @Override
    public void connectComplete(boolean reconnect,String serverURI) {
        log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");
        //订阅主题(可以在这里订阅主题)
        try {
            MyMqttClient.getClient().subscribe("topic1");
        } catch (MqttException e) {
            log.error("主题订阅失败");
        }
    }

    /**
     * 连接丢失 进行重连操作
     * @param throwable
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.warn("mqtt connectionLost >>> 5S之后尝试重连: {}", throwable.getMessage());
        long reconnectTimes = 1;
        while (true){
            try{
                Thread.sleep(5000);
            }catch (InterruptedException ignored){}
            try{
                if(MyMqttClient.getClient().isConnected()){ // 已连接
                    return;
                }
                reconnectTimes+=1;
                log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);
                MyMqttClient.getClient().reconnect();
            }catch (MqttException e){
                log.error("mqtt断链异常",e);
            }
        }
    }

    /**
     * 订阅者收到消息之后执行
     * @param topic
     * @param mqttMessage
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));
        myHandle.handle(topic,mqttMessage);
    }

    /**
     * * 消息到达后
     * subscribe后,执行的回调函数
     * publish后,配送完成后回调的方法
     *
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        System.out.println("接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用");
        log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
    }
}
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * Service that processes messages received from the MQTT broker.
 */
@Service
@Slf4j
public class MyHandle {
    /**
     * Handles one received message; currently it only logs the topic and the message.
     *
     * <p>NOTE(review): {@code @Async} only takes effect when async support is enabled
     * in the application (e.g. {@code @EnableAsync}), which is not shown here - confirm.
     *
     * @param topic   topic the message arrived on
     * @param message the received message
     */
    @Async
    public void handle(String topic, MqttMessage message) {
        log.info("处理消息主题:{} 信息:{}", topic, message);
    }
}