博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【zookeeper】4、利用zookeeper,借助观察模式,判断服务器的上下线
阅读量:5360 次
发布时间:2019-06-15

本文共 14389 字,大约阅读时间需要 47 分钟。

 首先什么是观察者模式,可以看看我之前的设计模式的文章

 

确定一下,要有观察者,要有被观察者,然后要被观察者触发事件,事件发生之后,观察者触发相应的事件发生

了解了基本概念,我们来看看zookeeper是什么情况

zookeeper也是类似观察者一样,我们先把本机信息注册进入服务器,然后设置一个watch方法,这个在zookeeper节点发生变化的时候通知对应的客户端,触发对应的方法

这里先注册服务,如何向zookeeper进行注册呢

 

package cn.cutter.demo.hadoop.zookeeper;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: TimeQueryServer * @Author: xiaof * @Description: 利用zookeeper来进行分布式时间查询 * @Date: 2019/4/2 19:37 * @Version: 1.0 */public class TimeQueryServer {    private ZooKeeper zooKeeper;    // 构造zk客户端连接    public void connectZK() throws Exception{        zooKeeper = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, null);    }    // 注册服务器信息    public void registerServerInfo(String hostname,String port) throws Exception{        /**         * 先判断注册节点的父节点是否存在,如果不存在,则创建         */        Stat stat = zooKeeper.exists("/servers", false);        if(stat==null){            zooKeeper.create("/servers", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);        }        // 注册服务器数据到zk的约定注册节点下        String create = zooKeeper.create("/servers/server", (hostname+":"+port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);        System.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create);    }}

 

如果注入了服务,那么我们为了监控这个服务的存在,那么是不是应该也模拟一个服务?

好,这里我们就做一个时钟同步的服务,用消费线程不断请求服务,并获取当前时间

package cn.cutter.demo.hadoop.zookeeper;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.IOException;import java.io.InputStream;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Date;import java.util.Iterator;/** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: TimeQueryService * @Author: xiaof * @Description: ${description} * @Date: 2019/4/2 19:43 * @Version: 1.0 */public class TimeQueryService extends Thread {    private static final Log log = LogFactory.getLog(TimeQueryService.class);    int port = 0;    public TimeQueryService(int port) {        this.port = port;    }    @Override    public void run() {        //1.创建信道选择器        Selector selector = null;        //不断读取字符,只有读到换行我们才进行输出//        StringBuffer stringBuffer = new StringBuffer();        try {            selector = Selector.open();            //2.创建对应端口的监听            //2.1 创建通道            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();            //2.2 socket 对象绑定端口  socket() 获取与此通道关联的服务器套接字            serverSocketChannel.socket().bind(new InetSocketAddress(port));            //2.3 设置为非阻塞            serverSocketChannel.configureBlocking(false);            //注册到对应的选择器,读取信息            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        } catch (IOException e) {            e.printStackTrace();        }        //3.轮询获取信息        while (true) {            //获取socket对象            //获取准备好的信道总数            if (!selector.isOpen()) {                System.out.println("is close over");                break;            }            try {                if (selector.select(3000) == 0) {                    continue; //下一次循环                }            } catch (IOException e) {                e.printStackTrace();            }            //获取信道            Iterator
keyIterable = selector.selectedKeys().iterator(); while (keyIterable.hasNext()) { //6.遍历键集,判断键类型,执行相应的操作 SelectionKey selectionKey = keyIterable.next(); //判断键类型,执行相应操作 if (selectionKey.isAcceptable()) { try { //从key中获取对应信道 //接受数据 SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept(); //并设置成非阻塞 socketChannel.configureBlocking(false); //从新注册,修改状态 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } catch (IOException e) { e.printStackTrace(); } } if (selectionKey.isReadable()) { //读取数据 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //获取当前的附加对象。 ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); //判断是是否断开连接 int count = 0; while (true) { try { if (!((count = socketChannel.read(byteBuffer)) != 0 && count != -1 && selectionKey.isValid())) { if(count == -1) { //关闭通道 socketChannel.close(); } break; } } catch (IOException e) {// e.printStackTrace(); try { //如果读取数据会抛出异常,那么就断定通道已经被客户端关闭 socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } System.out.println("无法读取数据!"); break; } //判断是否有换行// byteBuffer.flip(); byte msg[] = byteBuffer.array(); boolean isOver = false; int i = byteBuffer.position() - count; for (; i < byteBuffer.position(); ++i) { //判断是否有换行 if (byteBuffer.get(i) == '\r' || byteBuffer.get(i) == '\n') { //输出 //先压缩数据 byteBuffer.flip(); byte out[] = new byte[byteBuffer.limit()]; byteBuffer.get(out, 0, out.length); log.info(new String(out)); //设置成可以读和可写状态 byteBuffer.compact(); byteBuffer.clear(); isOver = true; } } if (isOver == true) {// interestOps(SelectionKey.OP_READ);的意思其实就是用同一个KEY重新注册 selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); break; } } if (count == -1) { //如果是-1 ,那么就关闭客户端 try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } else {// selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } //告知此键是否有效。 if (selectionKey.isValid() && selectionKey.isWritable()) { //获取当前的附加对象。// ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); // 清空,并写入数据// byteBuffer.clear(); byte smsBytes[] = (new Date().toString() + "\n").getBytes(); ByteBuffer byteBuffer = ByteBuffer.wrap(smsBytes);// byteBuffer.put(smsBytes); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //写入数据// System.out.println(new String(byteBuffer.array())); while (byteBuffer.hasRemaining()) { //输出数据 try { socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); } } //判断是否判断是否有待处理数据 if (!byteBuffer.hasRemaining()) { //数据清理干净 selectionKey.interestOps(SelectionKey.OP_READ); } //压缩此缓冲区将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。 // 即将索引 p = position() 处的字节复制到索引 0 处,将索引 p + 1 处的字节复制到索引 1 处,依此类推,直到将索引 limit() - 1 处的字节复制到索引 // n = limit() - 1 - p 处。然后将缓冲区的位置设置为 n+1,并将其界限设置为其容量。如果已定义了标记,则丢弃它。 //将缓冲区的位置设置为复制的字节数,而不是零,以便调用此方法后可以紧接着调用另一个相对 put 方法。 //从缓冲区写入数据之后调用此方法,以防写入不完整。例如,以下循环语句通过 buf 缓冲区将字节从一个信道复制到另一个信道: byteBuffer.compact(); } //执行操作的时候,移除避免下一次循环干扰// 原因是Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中 keyIterable.remove(); } } }}

 

说实话,这个服务端当时花了好大的力气写完的,mmp,因为客户端进行可以不关闭通道直接kill,导致服务端并不知道对端已经离线,到时候服务端不断再进行空轮训,一旦进行read就抛出io异常!!

好了,服务和注册写完了,那么我们注册一把呗

 

@Test    public void test1() throws Exception {        TimeQueryServer timeQueryServer = new TimeQueryServer();        // 构造zk客户端连接        timeQueryServer.connectZK();        // 注册服务器信息        timeQueryServer.registerServerInfo("192.168.1.7", "8888");        // 启动业务线程开始处理业务        new TimeQueryService(Integer.parseInt("8888")).start();        while(true) {            Thread.sleep(200000);//            System.out.println("..");        }    }

 

 不要在意那个null异常,那是因为我爸watch设置为null的原因

 

服务端写完了,我们再考虑写一波客户端

package cn.cutter.demo.hadoop.zookeeper;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.net.InetSocketAddress;import java.net.SocketException;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.util.ArrayList;import java.util.List;import java.util.Random;/** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: Consumer * @Author: xiaof * @Description: ${description} * @Date: 2019/4/3 14:11 * @Version: 1.0 */public class Consumer {    // 定义一个list用于存放最新的在线服务器列表    private volatile ArrayList
onlineServers = new ArrayList<>(); // 构造zk连接对象 ZooKeeper zk = null; // 构造zk客户端连接 public void connectZK() throws Exception { zk = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected && event.getType() == Event.EventType.NodeChildrenChanged) { try { // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查询在线服务器列表 public void getOnlineServers() throws Exception { List
children = zk.getChildren("/servers", true); ArrayList
servers = new ArrayList<>(); for (String child : children) { byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查询了一次zk,当前在线的服务器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑选一台当前在线的服务器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次请求挑选的服务器为:" + server); //2.打开socket信道,设置成非阻塞模式 SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); //3.尝试建立连接,然后轮询,判定,连接是否完全建立 int times = 0; if(!socketChannel.connect(new InetSocketAddress(hostname, port))) { while(!socketChannel.finishConnect()) {// System.out.println(times++ + ". "); } } //4.创建相应的buffer缓冲 ByteBuffer writeBuffer = ByteBuffer.wrap("test\n".getBytes()); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int totalBytesRcvd = 0; int bytesRcvd; //5.向socket信道发送数据,然后尝试读取数据 socketChannel.write(writeBuffer); //读取数据 if((bytesRcvd = socketChannel.read(readBuffer)) == -1) { //这种非阻塞模式,如果读取不到数据是会返回0的,如果是-1该通道已到达流的末尾 throw new SocketException("连接关闭??"); } //不停尝试获取数据,这是因为服务端数据反馈太慢了??? while (bytesRcvd == 0) { bytesRcvd = socketChannel.read(readBuffer); } //6.输出 readBuffer.flip(); byte reads[] = new byte[readBuffer.limit()]; readBuffer.get(reads, 0, reads.length); System.out.println("收到信息:" + new String(reads)); //7.关闭信道 socketChannel.close(); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 构造zk连接对象 consumer.connectZK(); // 查询在线服务器列表 consumer.getOnlineServers(); // 处理业务(向一台服务器发送时间查询请求) consumer.sendRequest(); }}

启动客户端:

 

 

 

 为了体现zookeeper监控服务是否在线的操作,我们多起几个服务端,然后监控客户端的信息展示

 

 我们再起一个

 

 

 

 

 接下来我们kill掉8888端口的进程

 

 我们当前在线可以看到只有2个节点了,我们上zk看看,确实只有2个了

 

 

到这里zookeeper的上下线的判断已经完成,我最近再自学大数据的东西,想向大数据进军一波,欢迎大家一起探讨大数据的学习。

 

转载于:https://www.cnblogs.com/cutter-point/p/10651293.html

你可能感兴趣的文章
Vue 2.x + Webpack 3.x + Nodejs 多页面项目框架(上篇——纯前端多页面)
查看>>
我的PHP学习之路
查看>>
【题解】luogu p2340 奶牛会展
查看>>
对PostgreSQL的 SPI_prepare 的理解。
查看>>
解决响应式布局下兼容性的问题
查看>>
使用DBCP连接池对连接进行管理
查看>>
【洛谷】【堆+模拟】P2278 操作系统
查看>>
hdu3307 欧拉函数
查看>>
Spring Bean InitializingBean和DisposableBean实例
查看>>
[容斥][dp][快速幂] Jzoj P5862 孤独
查看>>
Lucene 学习之二:数值类型的索引和范围查询分析
查看>>
软件开发工作模型
查看>>
Java基础之字符串匹配大全
查看>>
面向对象
查看>>
lintcode83- Single Number II- midium
查看>>
移动端 响应式、自适应、适配 实现方法分析(和其他基础知识拓展)
查看>>
使用vue的v-model自定义 checkbox组件
查看>>
[工具] Sublime Text 使用指南
查看>>
Web服务器的原理
查看>>
#10015 灯泡(无向图连通性+二分)
查看>>