首先什么是观察者模式,可以看看我之前的设计模式的文章
确定一下,要有观察者,要有被观察者,然后要被观察者触发事件,事件发生之后,观察者触发相应的事件发生
了解了基本概念,我们来看看zookeeper是什么情况
zookeeper也是类似观察者一样,我们先把本机信息注册进入服务器,然后设置一个watch方法,这个在zookeeper节点发生变化的时候通知对应的客户端,触发对应的方法
这里先注册服务,如何向zookeeper进行注册呢
package cn.cutter.demo.hadoop.zookeeper;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;/** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: TimeQueryServer * @Author: xiaof * @Description: 利用zookeeper来进行分布式时间查询 * @Date: 2019/4/2 19:37 * @Version: 1.0 */public class TimeQueryServer { private ZooKeeper zooKeeper; // 构造zk客户端连接 public void connectZK() throws Exception{ zooKeeper = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, null); } // 注册服务器信息 public void registerServerInfo(String hostname,String port) throws Exception{ /** * 先判断注册节点的父节点是否存在,如果不存在,则创建 */ Stat stat = zooKeeper.exists("/servers", false); if(stat==null){ zooKeeper.create("/servers", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 注册服务器数据到zk的约定注册节点下 String create = zooKeeper.create("/servers/server", (hostname+":"+port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname+" 服务器向zk注册信息成功,注册的节点为:" + create); }}
如果注入了服务,那么我们为了监控这个服务的存在,那么是不是应该也模拟一个服务?
好,这里我们就做一个时钟同步的服务,用消费线程不断请求服务,并获取当前时间
package cn.cutter.demo.hadoop.zookeeper;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.IOException;import java.io.InputStream;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Date;import java.util.Iterator;/** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: TimeQueryService * @Author: xiaof * @Description: ${description} * @Date: 2019/4/2 19:43 * @Version: 1.0 */public class TimeQueryService extends Thread { private static final Log log = LogFactory.getLog(TimeQueryService.class); int port = 0; public TimeQueryService(int port) { this.port = port; } @Override public void run() { //1.创建信道选择器 Selector selector = null; //不断读取字符,只有读到换行我们才进行输出// StringBuffer stringBuffer = new StringBuffer(); try { selector = Selector.open(); //2.创建对应端口的监听 //2.1 创建通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //2.2 socket 对象绑定端口 socket() 获取与此通道关联的服务器套接字 serverSocketChannel.socket().bind(new InetSocketAddress(port)); //2.3 设置为非阻塞 serverSocketChannel.configureBlocking(false); //注册到对应的选择器,读取信息 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } //3.轮询获取信息 while (true) { //获取socket对象 //获取准备好的信道总数 if (!selector.isOpen()) { System.out.println("is close over"); break; } try { if (selector.select(3000) == 0) { continue; //下一次循环 } } catch (IOException e) { e.printStackTrace(); } //获取信道 IteratorkeyIterable = selector.selectedKeys().iterator(); while (keyIterable.hasNext()) { //6.遍历键集,判断键类型,执行相应的操作 SelectionKey selectionKey = keyIterable.next(); //判断键类型,执行相应操作 if (selectionKey.isAcceptable()) { try { //从key中获取对应信道 //接受数据 SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept(); //并设置成非阻塞 socketChannel.configureBlocking(false); //从新注册,修改状态 socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(1024)); } catch (IOException e) { e.printStackTrace(); } } if (selectionKey.isReadable()) { //读取数据 SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //获取当前的附加对象。 ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); //判断是是否断开连接 int count = 0; while (true) { try { if (!((count = socketChannel.read(byteBuffer)) != 0 && count != -1 && selectionKey.isValid())) { if(count == -1) { //关闭通道 socketChannel.close(); } break; } } catch (IOException e) {// e.printStackTrace(); try { //如果读取数据会抛出异常,那么就断定通道已经被客户端关闭 socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } System.out.println("无法读取数据!"); break; } //判断是否有换行// byteBuffer.flip(); byte msg[] = byteBuffer.array(); boolean isOver = false; int i = byteBuffer.position() - count; for (; i < byteBuffer.position(); ++i) { //判断是否有换行 if (byteBuffer.get(i) == '\r' || byteBuffer.get(i) == '\n') { //输出 //先压缩数据 byteBuffer.flip(); byte out[] = new byte[byteBuffer.limit()]; byteBuffer.get(out, 0, out.length); log.info(new String(out)); //设置成可以读和可写状态 byteBuffer.compact(); byteBuffer.clear(); isOver = true; } } if (isOver == true) {// interestOps(SelectionKey.OP_READ);的意思其实就是用同一个KEY重新注册 selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); break; } } if (count == -1) { //如果是-1 ,那么就关闭客户端 try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } else {// selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } //告知此键是否有效。 if (selectionKey.isValid() && selectionKey.isWritable()) { //获取当前的附加对象。// ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment(); // 清空,并写入数据// byteBuffer.clear(); byte smsBytes[] = (new Date().toString() + "\n").getBytes(); ByteBuffer byteBuffer = ByteBuffer.wrap(smsBytes);// byteBuffer.put(smsBytes); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //写入数据// System.out.println(new String(byteBuffer.array())); while (byteBuffer.hasRemaining()) { //输出数据 try { socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); } } //判断是否判断是否有待处理数据 if (!byteBuffer.hasRemaining()) { //数据清理干净 selectionKey.interestOps(SelectionKey.OP_READ); } //压缩此缓冲区将缓冲区的当前位置和界限之间的字节(如果有)复制到缓冲区的开始处。 // 即将索引 p = position() 处的字节复制到索引 0 处,将索引 p + 1 处的字节复制到索引 1 处,依此类推,直到将索引 limit() - 1 处的字节复制到索引 // n = limit() - 1 - p 处。然后将缓冲区的位置设置为 n+1,并将其界限设置为其容量。如果已定义了标记,则丢弃它。 //将缓冲区的位置设置为复制的字节数,而不是零,以便调用此方法后可以紧接着调用另一个相对 put 方法。 //从缓冲区写入数据之后调用此方法,以防写入不完整。例如,以下循环语句通过 buf 缓冲区将字节从一个信道复制到另一个信道: byteBuffer.compact(); } //执行操作的时候,移除避免下一次循环干扰// 原因是Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中 keyIterable.remove(); } } }}
说实话,这个服务端当时花了好大的力气写完的,mmp,因为客户端进行可以不关闭通道直接kill,导致服务端并不知道对端已经离线,到时候服务端不断再进行空轮训,一旦进行read就抛出io异常!!
好了,服务和注册写完了,那么我们注册一把呗
@Test public void test1() throws Exception { TimeQueryServer timeQueryServer = new TimeQueryServer(); // 构造zk客户端连接 timeQueryServer.connectZK(); // 注册服务器信息 timeQueryServer.registerServerInfo("192.168.1.7", "8888"); // 启动业务线程开始处理业务 new TimeQueryService(Integer.parseInt("8888")).start(); while(true) { Thread.sleep(200000);// System.out.println(".."); } }
不要在意那个null异常,那是因为我爸watch设置为null的原因
服务端写完了,我们再考虑写一波客户端
package cn.cutter.demo.hadoop.zookeeper;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import java.net.InetSocketAddress;import java.net.SocketException;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.util.ArrayList;import java.util.List;import java.util.Random;/** * @ProjectName: cutter-point * @Package: cn.cutter.demo.hadoop.zookeeper * @ClassName: Consumer * @Author: xiaof * @Description: ${description} * @Date: 2019/4/3 14:11 * @Version: 1.0 */public class Consumer { // 定义一个list用于存放最新的在线服务器列表 private volatile ArrayListonlineServers = new ArrayList<>(); // 构造zk连接对象 ZooKeeper zk = null; // 构造zk客户端连接 public void connectZK() throws Exception { zk = new ZooKeeper("192.168.1.4:2181,192.168.1.4:2182,192.168.1.4:2183", 2000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected && event.getType() == Event.EventType.NodeChildrenChanged) { try { // 事件回调逻辑中,再次查询zk上的在线服务器节点即可,查询逻辑中又再次注册了子节点变化事件监听 getOnlineServers(); } catch (Exception e) { e.printStackTrace(); } } } }); } // 查询在线服务器列表 public void getOnlineServers() throws Exception { List children = zk.getChildren("/servers", true); ArrayList servers = new ArrayList<>(); for (String child : children) { byte[] data = zk.getData("/servers/" + child, false, null); String serverInfo = new String(data); servers.add(serverInfo); } onlineServers = servers; System.out.println("查询了一次zk,当前在线的服务器有:" + servers); } public void sendRequest() throws Exception { Random random = new Random(); while (true) { try { // 挑选一台当前在线的服务器 int nextInt = random.nextInt(onlineServers.size()); String server = onlineServers.get(nextInt); String hostname = server.split(":")[0]; int port = Integer.parseInt(server.split(":")[1]); System.out.println("本次请求挑选的服务器为:" + server); //2.打开socket信道,设置成非阻塞模式 SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); //3.尝试建立连接,然后轮询,判定,连接是否完全建立 int times = 0; if(!socketChannel.connect(new InetSocketAddress(hostname, port))) { while(!socketChannel.finishConnect()) {// System.out.println(times++ + ". "); } } //4.创建相应的buffer缓冲 ByteBuffer writeBuffer = ByteBuffer.wrap("test\n".getBytes()); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int totalBytesRcvd = 0; int bytesRcvd; //5.向socket信道发送数据,然后尝试读取数据 socketChannel.write(writeBuffer); //读取数据 if((bytesRcvd = socketChannel.read(readBuffer)) == -1) { //这种非阻塞模式,如果读取不到数据是会返回0的,如果是-1该通道已到达流的末尾 throw new SocketException("连接关闭??"); } //不停尝试获取数据,这是因为服务端数据反馈太慢了??? while (bytesRcvd == 0) { bytesRcvd = socketChannel.read(readBuffer); } //6.输出 readBuffer.flip(); byte reads[] = new byte[readBuffer.limit()]; readBuffer.get(reads, 0, reads.length); System.out.println("收到信息:" + new String(reads)); //7.关闭信道 socketChannel.close(); Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Consumer consumer = new Consumer(); // 构造zk连接对象 consumer.connectZK(); // 查询在线服务器列表 consumer.getOnlineServers(); // 处理业务(向一台服务器发送时间查询请求) consumer.sendRequest(); }}
启动客户端:
为了体现zookeeper监控服务是否在线的操作,我们多起几个服务端,然后监控客户端的信息展示
我们再起一个
接下来我们kill掉8888端口的进程
我们当前在线可以看到只有2个节点了,我们上zk看看,确实只有2个了
到这里zookeeper的上下线的判断已经完成,我最近再自学大数据的东西,想向大数据进军一波,欢迎大家一起探讨大数据的学习。