???? Zookeeper部署与使用

1 分布式安装部署

配置服务器编号

在.../zkData文件目录下创建myid文件,并写入服务器编号

[root@hadoop101 apache-zookeeper-3.5.10-bin]# cd zkData/
[root@hadoop101 zkData]# 1 > myid
bash: 1: 未找到命令...
[root@hadoop101 zkData]# echo 1 > myid

其他的两台服务器分别写为2 3

增加zoo.cfg集群配置参数
##############cluster##############
server.1=192.168.60.101:2888:3888
server.2=192.168.60.102:2888:3888
server.3=192.168.60.103:2888:3888

⏰ 配置参数解读 server.A=B:C:D

  • A是一个数字,表示是第几号服务器。

    Zookeeper启动的时候会先读取dataDir目录下的myid文件,拿到里面的数据与zoo.cfg里面的配置信息对比从而判断到底是那个server

  • B是这个服务器的ip地址。

  • C是这个服务器与Leader服务器交换同步副本信息的端口号。

  • D是当Leader宕机后,这个服务器与其他服务器通信交换选举信息的端口号。

2 启动集群服务器

????先启动一台服务器,会发现由于半数机制,Zookeeper没有启动成功

[root@hadoop101 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.5.10-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@hadoop101 bin]# jps
33904 QuorumPeerMain
34066 Jps
[root@hadoop101 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.5.10-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.

????这时候再启动二号机,一号机就会变成Follower,二号机成为Leader

[root@hadoop101 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.5.10-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

3 ⭐启动集群客户端和命令

./zkcli.sh

⏰shell命令

  • 显示所有操作: help

  • 查看当前znode中包含的内容:ls

  • 查看当前znode中包含的详细内容:ls2

  • 创建普通节点:create

    [zk: localhost:2181(CONNECTED) 5] create /sanguo
    Created /sanguo
    [zk: localhost:2181(CONNECTED) 6] create /sanguo/shuguo "liubei"
    Created /sanguo/shuguo
    [zk: localhost:2181(CONNECTED) 7] get /sanguo
    null
    [zk: localhost:2181(CONNECTED) 8] get /sanguo/shuguo 
    liubei
    
  • 获取节点内容:get

  • 创建短暂节点:create -e,客户端退出后则消失:

    [zk: localhost:2181(CONNECTED) 9] create -e /sanguo/wuguo "sunquan"
    Created /sanguo/wuguo
    [zk: localhost:2181(CONNECTED) 10] get /sanguo/wuguo
    sunquan
    [zk: localhost:2181(CONNECTED) 11] quit
    
    [zk: localhost:2181(CONNECTED) 0] ls /sanguo 
    [shuguo]
    
  • 创建带序号的节点:create -s

    [zk: localhost:2181(CONNECTED) 0] ls /sanguo 
    [shuguo]
    [zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao"
    Created /sanguo/weiguo
    [zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo "caocao"
    Created /sanguo/weiguo0000000003
    [zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo "caocao"
    Created /sanguo/weiguo0000000004
    [zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo "caocao"
    Created /sanguo/weiguo0000000005
    [zk: localhost:2181(CONNECTED) 5] ls /sanguo 
    [shuguo, weiguo, weiguo0000000003, weiguo0000000004, weiguo0000000005]
    

    会自动为节点添加编号,没有则从0开始,否则编号顺序添加后有几个节点则编号就为几

  • 修改节点的值:set

    [zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
    [zk: localhost:2181(CONNECTED) 7] get /sanguo/weiguo
    weiguo             weiguo0000000003   weiguo0000000004   weiguo0000000005   
    [zk: localhost:2181(CONNECTED) 7] get /sanguo/weiguo
    simayi
    
  • 节点值的变化监听:get -w

    [zk: localhost:2181(CONNECTED) 4] get -w /sanguo
    null
    [zk: localhost:2181(CONNECTED) 5] 
    WATCHER::
    
    WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
    

    watch监听只能生效一次

  • 节点路径监听:ls -w

    [zk: localhost:2181(CONNECTED) 8] ls -w /sanguo
    [shuguo, weiguo, weiguo0000000003, weiguo0000000004, weiguo0000000005, wuguo]
    [zk: localhost:2181(CONNECTED) 9] 
    WATCHER::
    
    WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
    
    

    如果只创建节点而不存值的话是监听不到的

  • 删除节点:delete

    [zk: localhost:2181(CONNECTED) 11] delete /sanguo/weiguo
    weiguo             weiguo0000000003   weiguo0000000004   weiguo0000000005   
    [zk: localhost:2181(CONNECTED) 11] delete /sanguo/weiguo
    
  • 递归删除:deleteall

    [zk: localhost:2181(CONNECTED) 12] delete /sanguo
    Node not empty: /sanguo
    [zk: localhost:2181(CONNECTED) 13] rmr /sanguo
    The command 'rmr' has been deprecated. Please use 'deleteall' instead.
    [zk: localhost:2181(CONNECTED) 14] deleteall /sanguo
    Node does not exist: /sanguo
    [zk: localhost:2181(CONNECTED) 15] ls /
    [zookeeper]
    

4 ????️API的使用

引入依赖
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.8.1</version>
    </dependency>

</dependencies>
创建客户端

使用构造函数创建Zookeeper客户端对象,参数为:

  1. ip地址+端口号(集群之间使用逗号隔开,不能添加空格)
  2. 会话超时时间
  3. watcher匿名内部类,包含一个process方法,即一个监听器对象
public class ZookeeperTest {

    // Zookeeper客户端器的ip和端口号
    private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
    // 会话超时时间
    private int sessionTimeOut = 2000;


    @Test
    public void test1() throws IOException {
        ZooKeeper zkCli = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}
20:08:20.001 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181 sessionTimeout=2000 watcher=com.hikaru.ZookeeperTest$1@50cbc42f
创建节点
public class ZookeeperTest {

    // Zookeeper客户端器的ip和端口号
    private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
    // 会话超时时间
    private int sessionTimeOut = 2000;

    private ZooKeeper zkCli;

    @Before
    public void test1() throws IOException {
        zkCli = new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }

    @Test
    public void createNodeTest() throws KeeperException, InterruptedException {
        String path = zkCli.create("/zhanguo", "ZhenTianXingCun".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(path);
    }
}
[zk: localhost:2181(CONNECTED) 11] get /zhanguo 
ZhenTianXingCun
获取子节点并监听数据变化

????查询节点

    @Test
    public void getDataAndWatchTest() throws KeeperException, InterruptedException {
        List<String> children = zkCli.getChildren("/", false);
        for(String child : children) {
            System.out.println(child);
        }
    }
zookeeper
zhanguo

????查询并监控

@Test
public void getDataAndWatchTest() throws KeeperException, InterruptedException {
    List<String> children = zkCli.getChildren("/", true);
    for(String child : children) {
        System.out.println(child);
    }

    Thread.sleep(Long.MAX_VALUE);
}
21:09:45.568 [main-SendThread(192.168.60.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got notification session id: 0x100001a97080003
21:09:45.569 [main-SendThread(192.168.60.101:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/ for session id 0x100001a97080003
判断节点是否存在
@Test
public void exist() throws KeeperException, InterruptedException {
    Stat exists = zkCli.exists("/zhanguo", false);
    System.out.println(exists);
}

????️服务器节点动态上下线案例分析

  • 需求:客户端能实时洞察到服务器上下线的变化

  • ????分析:相对于Zookeeper集群,服务器端和客户端都是“客户端”:服务端需要向集群写数据,客户端则需要监视数据

image-20230131212756287

  • 服务器启动的时候就去注册信息,此时在Zookeeper创建的都是临时节点
  • 客户端启动就去getChildren,获取当前在线服务器列表,并注册监听(注册实际上就是创建一个节点)
  • 服务器下线的时候就会通知客户端

✒️代码实现

如上面所说的,这里的服务器和客户端是相对于我们开发人员接触到的分布式微服务而言的,下面实现的本质是依靠Zookeeper客户端实现server注册(本质即在Zookeeper集群创建节点)和数据监听(本质监听Zookeeper集群)

  • 服务器注册代码实现,比较简单不再赘述,需要注意一点创建的节点应该是带序号的临时节点
public class DistributeServer {
    // Zookeeper客户端器的ip和端口号
    private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
    // 会话超时时间
    private int sessionTimeOut = 2000;

    private ZooKeeper zkCli;

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeServer distributeServer = new DistributeServer();
        // 1 server连接集群
        distributeServer.connect();

        // 2 server注册
        distributeServer.register(args);

        // 3 业务逻辑
        distributeServer.services();
    }

    private void services() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void register(String[] hosts) throws KeeperException, InterruptedException {
        for(String host : hosts) {
            zkCli.create("/servers/server", host.getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }
    }

    private void connect() throws IOException {
        this.zkCli =  new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {

            }
        });
    }
}

  • 客户端监听代码实现
    1. 首先客户端连接Zookeeper集群,并进行注册监听
    2. 注册监听会首先利用Zookeeper客户端获取并监听持久节点/services下面的子节点,子节点的值对应一台服务器,如此便获取到全部节点信息
    3. 每当节点信息发生变动则会触发process()方法,继续注册一次监听并实时获取全部节点信息(这样做的目的是监听只能生效一次)
public class DistributeClient {
    // Zookeeper客户端器的ip和端口号
    private String connectString = "192.168.60.101:2181,192.168.60.102:2181,192.168.60.103:2181";
    // 会话超时时间
    private int sessionTimeOut = 3000;

    private ZooKeeper zkCli;


    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        DistributeClient distributeClient = new DistributeClient();
        // 1 客户端连接集群
        distributeClient.connect();

        // 2 Client注册监听
        distributeClient.watch();

        // 3 业务逻辑
        distributeClient.services();
    }

    private void services() throws InterruptedException {
        Thread.sleep(Long.MAX_VALUE);
    }

    private void watch() throws KeeperException, InterruptedException {
        List<String> children = zkCli.getChildren("/servers", true);

        List<String> serversOnline = new ArrayList<>();

        for(String child : children) {
            byte[] host = zkCli.getData("/servers/" + child, false, null);
            serversOnline.add(new String(host));
        }

        System.out.println(serversOnline);
    }

    private void connect() throws IOException {
        this.zkCli =  new ZooKeeper(connectString, sessionTimeOut, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    watch();
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}