前言
专注于为中小企业提供成都做网站、成都网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业西湖免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了近1000家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。
前两篇讲了Zookeeper的特性、客户端使用和集群原理,因为 Zookeeper 是分布式系统中很常见的一个基础系统。 而且问的话常问的就是说 zookeeper 的使用场景是什么? 看你知道不知道一些基本的使用场景。 但是其实 Zookeeper 挖深了自然是可以问的很深很深的。本文主要来聊聊 Zookeeper 主要的几个使用场景。
分布式集群管理
分布式集群管理的需求
架构设计
节点结构
服务状态信息
功能实现
数据生成与上报
主动查询
被动通知
关键示例代码
- package com.niuh.os;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import org.I0Itec.zkclient.ZkClient;
- import java.lang.instrument.Instrumentation;
- import java.lang.management.ManagementFactory;
- import java.lang.management.MemoryUsage;
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- public class Agent {
- private static Agent ourInstance = new Agent();
- private String server = "127.0.0.1:2181";
- private ZkClient zkClient;
- private static final String rootPath = "/niuh-manger";
- private static final String servicePath = rootPath + "/service";
- private String nodePath; ///niuh-manger/service0000001 当前节点路径
- private Thread stateThread;
- public static Agent getInstance() {
- return ourInstance;
- }
- private Agent() {
- }
- // javaagent 数据监控
- public static void premain(String args, Instrumentation instrumentation) {
- Agent.getInstance().init();
- }
- public void init() {
- zkClient = new ZkClient(server, 5000, 10000);
- System.out.println("zk连接成功" + server);
- // 创建根节点
- buildRoot();
- // 创建临时节点
- createServerNode();
- // 启动更新的线程
- stateThread = new Thread(() -> {
- while (true) {
- updateServerNode();
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }, "zk_stateThread");
- stateThread.setDaemon(true);
- stateThread.start();
- }
- // 数据写到 当前的临时节点中去
- public void updateServerNode() {
- zkClient.writeData(nodePath, getOsInfo());
- }
- // 生成服务节点
- public void createServerNode() {
- nodePath = zkClient.createEphemeralSequential(servicePath, getOsInfo());
- System.out.println("创建节点:" + nodePath);
- }
- // 更新服务节点状态
- public String getOsInfo() {
- OsBean bean = new OsBean();
- bean.lastUpdateTime = System.currentTimeMillis();
- bean.ip = getLocalIp();
- bean.cpu = CPUMonitorCalc.getInstance().getProcessCpu();
- MemoryUsage memoryUsag = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
- bean.usedMemorySize = memoryUsag.getUsed() / 1024 / 1024;
- bean.usableMemorySize = memoryUsag.getMax() / 1024 / 1024;
- bean.pid = ManagementFactory.getRuntimeMXBean().getName();
- ObjectMapper mapper = new ObjectMapper();
- try {
- return mapper.writeValueAsString(bean);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
- public static String getLocalIp() {
- InetAddress addr = null;
- try {
- addr = InetAddress.getLocalHost();
- } catch (UnknownHostException e) {
- throw new RuntimeException(e);
- }
- return addr.getHostAddress();
- }
- public void buildRoot() {
- if (!zkClient.exists(rootPath)) {
- zkClient.createPersistent(rootPath);
- }
- }
- }
实现效果
启动参数设置
运行测试用例:
- package com.niuh.test;
- import com.niuh.os.Agent;
- import org.junit.Ignore;
- import org.junit.Test;
- public class AgentTest {
- @Test
- @Ignore
- public void initTest() {
- Agent.premain(null, null);
- runCPU(2); //20% 占用
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- //
- private void runCPU(int count) {
- for (int i = 0; i < count; i++) {
- new Thread(() -> {
- while (true) {
- long bac = 1000000;
- bac = bac >> 1;
- }
- }).start();
- ;
- }
- }
- }
控制台输出:
- CPU 报警...22.55120088850181
- CPU 报警...46.06592086097357CPU 报警...47.87206766163349CPU 报警...49.49176420213768CPU 报警...48.967942479969004CPU 报警...49.193921607021565CPU 报警...48.806604284784676CPU 报警...48.63229912951865CPU 报警...49.34509647972038CPU 报警...47.07551108884401CPU 报警...49.18489236134496CPU 报警...49.903007346777066CPU 报警...49.28868795953268// 关闭测试用例服务已下线:OsBean{ip='192.168.43.11', cpu=49.28868795953268, usedMemorySize=56, usableMemorySize=3641, pid='47192@hejianhui', lastUpdateTime=1602056208842}
本Demo不适用在生产环境,示例Demo涉及组件zookeeper-agent、zookeeper-web。源代码提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式注册中心
在单体式服务中,通常是由多个客户端去调用一个服务,只要在客户端中配置唯一服务节点地址即可,当升级到分布式后,服务节点变多,像一线大厂服务节点更是上万之多,这么多节点不可能手动配置在客户端,这里就需要一个中间服务,专门用于帮助客户端发现服务节点,即许多技术书籍经常提到的服务发现。
一个完整的注册中心涵盖以下功能特性:
Dubbo 对 Zookeeper的使用
阿里著名的开源项目Dubbo 是一个基于JAVA的RCP框架,其中必不可少的注册中心可基于多种第三方组件实现,但其官方推荐的还是Zookeeper作为注册中心服务。
Dubbo Zookeeper注册中心存储结构
节点说明
流程说明
示例Demo
服务端代码
- package com.niuh.zk.dubbo;
- import com.alibaba.dubbo.config.ApplicationConfig;
- import com.alibaba.dubbo.config.ProtocolConfig;
- import com.alibaba.dubbo.config.RegistryConfig;
- import com.alibaba.dubbo.config.ServiceConfig;
- import java.io.IOException;
- public class Server {
- public void openServer(int port) {
- // 构建应用
- ApplicationConfig config = new ApplicationConfig();
- config.setName("simple-app");
- // 通信协议
- ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", port);
- protocolConfig.setThreads(200);
- ServiceConfig
serviceConfig = new ServiceConfig(); - serviceConfig.setApplication(config);
- serviceConfig.setProtocol(protocolConfig);
- serviceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
- serviceConfig.setInterface(UserService.class);
- UserServiceImpl ref = new UserServiceImpl();
- serviceConfig.setRef(ref);
- //开始提供服务 开张做生意
- serviceConfig.export();
- System.out.println("服务已开启!端口:"+serviceConfig.getExportedUrls().get(0).getPort());
- ref.setPort(serviceConfig.getExportedUrls().get(0).getPort());
- }
- public static void main(String[] args) throws IOException {
- new Server().openServer(-1);
- System.in.read();
- }
- }
客户端代码
- package com.niuh.zk.dubbo;
- import com.alibaba.dubbo.config.ApplicationConfig;
- import com.alibaba.dubbo.config.ReferenceConfig;
- import com.alibaba.dubbo.config.RegistryConfig;
- import java.io.IOException;
- public class Client { UserService service; // URL 远程服务的调用地址 public UserService buildService(String url) { ApplicationConfig config = new ApplicationConfig("young-app");
- // 构建一个引用对象 ReferenceConfig
referenceConfig = new ReferenceConfig (); referenceConfig.setApplication(config); - referenceConfig.setInterface(UserService.class); // referenceConfig.setUrl(url); referenceConfig.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
- referenceConfig.setTimeout(5000);
- // 透明化 this.service = referenceConfig.get(); return service;
- } static int i = 0;
- public static void main(String[] args) throws IOException { Client client1 = new Client(); client1.buildService("");
- String cmd; while (!(cmd = read()).equals("exit")) {
- UserVo u = client1.service.getUser(Integer.parseInt(cmd)); System.out.println(u); } } private static String read() throws IOException {
- byte[] b = new byte[1024];
- int size = System.in.read(b);
- return new String(b, 0, size).trim();
- }}
查询 zk 实际存储内容:
- /dubbo
- /dubbo/com.niuh.zk.dubbo.UserService/dubbo/com.niuh.zk.dubbo.UserService/configurators/dubbo/com.niuh.zk.dubbo.UserService/routers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo/com.niuh.zk.dubbo.UserService/providers/dubbo://192.168.43.11:20880/com.niuh.zk.dubbo.UserService?anyhost=true&application=simple-app&dubbo=2.6.2&generic=false&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=48302&side=provider&threads=200×tamp=1602057895881/dubbo/com.niuh.zk.dubbo.UserService/consumers/dubbo/com.niuh.zk.dubbo.UserService/consumers/consumer://192.168.43.11com.niuh.zk.dubbo.UserService?application=young-app&category=consumers&check=false&dubbo=2.6.2&interface=com.niuh.zk.dubbo.UserService&methods=getUser&pid=49036&side=consumer&timeout=5000×tamp=1602075359549
示例Demo涉及组件zookeeper-dubbo。源代码提交在 github:https://github.com/Niuh-Frame/niuh-zookeeper。
分布式JOB
分布式JOB需求
多个服务节点只允许其中一个主节点运行JOB任务。
当主节点挂掉后能自动切换主节点,继续执行JOB任务。
架构设计
node结构
选举流程
服务启动:
子节点删除事件触发:
示例Demo
- package com.niuh.zookeeper.master;
- import org.I0Itec.zkclient.ZkClient;
- import java.util.Map;
- import java.util.stream.Collectors;
- public class MasterResolve {
- private String server = "127.0.0.1:2181";
- private ZkClient zkClient;
- private static final String rootPath = "/niuh-master";
- private static final String servicePath = rootPath + "/service";
- private String nodePath;
- private volatile boolean master = false;
- private static MasterResolve resolve;
- private MasterResolve() {
- zkClient = new ZkClient(server, 2000, 5000);
- buildRoot(); createServerNode(); } public static MasterResolve getInstance() {
- if (resolve == null) {
- resolve= new MasterResolve();
- } return resolve;
- } // 构建根节点
- public void buildRoot() {
- if (!zkClient.exists(rootPath)) {
- zkClient.createPersistent(rootPath);
- }
- }
- // 创建server节点
- public void createServerNode() {
- nodePath = zkClient.createEphemeralSequential(servicePath, "slave");
- System.out.println("创建service节点:" + nodePath);
- initMaster();
- initListener();
- }
- private void initMaster() {
- boolean existMaster = zkClient.getChildren(rootPath)
- .stream()
- .map(p -> rootPath + "/" + p)
- .map(p -> zkClient.readData(p))
- .anyMatch(d -> "master".equals(d));
- if (!existMaster) {
- doElection();
- System.out.println("当前当选master");
- }
- }
- private void initListener() {
- zkClient.subscribeChildChanges(rootPath, (parentPath, currentChilds) -> {
- doElection();// 执行选举
- });
- }
- // 执行选举
- public void doElection() {
- Map
childData = zkClient.getChildren(rootPath) - .stream()
- .map(p -> rootPath + "/" + p)
- .collect(Collectors.toMap(p -> p, p -> zkClient.readData(p)));
- if (childData.containsValue("master")) {
- return;
- }
- childData.keySet().stream().sorted().findFirst().ifPresent(p -> {
- if (p.equals(nodePath)) { // 设置最小值序号为master 节点
- zkClient.writeData(nodePath, "master");
- master = true;
- System.out.println("当前当选master" + nodePath);
- }
- });
- }
- public static boolean isMaster() {
- return getInstance().master;
- }
- }
示例Demo涉及组件zookeeper-master。源代码提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。
分布式锁
锁的的基本概念
开发中锁的概念并不陌生,通过锁可以实现在多个线程或多个进程间在争抢资源时,能够合理的分配置资源的所有权。在单体应用中我们可以通过 synchronized 或 ReentrantLock 来实现锁。但在分布式系统中,仅仅是加synchronized 是不够的,需要借助第三组件来实现。比如一些简单的做法是使用关系型数据行级锁来实现不同进程之间的互斥,但大型分布式系统的性能瓶颈往往集中在数据库操作上。为了提高性能得采用如Redis、Zookeeper之内的组件实现分布式锁。
共享锁:也称作只读锁,当一方获得共享锁之后,其它方也可以获得共享锁。但其只允许读取。在共享锁全部释放之前,其它方不能获得写锁。
排它锁:也称作读写锁,获得排它锁后,可以进行数据的读写。在其释放之前,其它方不能获得任何锁。
锁的获取
某银行账户,可以同时进行帐户信息的读取,但读取期间不能修改帐户数据。其账户ID为:888
获得读锁流程
数据结构
获得写锁
释放锁
读取完毕后,手动删除临时节点,如果获锁期间宕机,则会在会话失效后自动删除。
关于羊群效应
在等待锁获得期间,所有等待节点都在监听 Lock节点,一但lock 节点变更所有等待节点都会被触发,然后在同时反查Lock 子节点。如果等待对例过大会使用Zookeeper承受非常大的流量压力。
为了改善这种情况,可以采用监听链表的方式,每个等待队列只监听前一个节点,如果前一个节点释放锁的时候,才会被触发通知。这样就形成了一个监听链表。
示例Demo
- package com.niuh.zookeeper.lock;
- import org.I0Itec.zkclient.IZkDataListener;
- import org.I0Itec.zkclient.ZkClient;
- import java.util.List;
- import java.util.stream.Collectors;
- public class ZookeeperLock {
- private String server = "127.0.0.1:2181";
- private ZkClient zkClient;
- private static final String rootPath = "/niuh-lock1";
- public ZookeeperLock() {
- zkClient = new ZkClient(server, 5000, 20000);
- buildRoot(); } // 构建根节点
- public void buildRoot() {
- if (!zkClient.exists(rootPath)) {
- zkClient.createPersistent(rootPath);
- }
- }
- // 获取锁
- public Lock lock(String lockId, long timeout) {
- // 创建临时节点
- Lock lockNode = createLockNode(lockId);
- lockNode = tryActiveLock(lockNode);// 尝试激活锁
- if (!lockNode.isActive()) {
- try {
- synchronized (lockNode) {
- lockNode.wait(timeout); // 线程锁住
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- if (!lockNode.isActive()) {
- throw new RuntimeException(" lock timeout");
- }
- return lockNode;
- }
- // 释放锁
- public void unlock(Lock lock) {
- if (lock.isActive()) {
- zkClient.delete(lock.getPath());
- }
- }
- // 尝试激活锁
- private Lock tryActiveLock(Lock lockNode) {
- // 获取根节点下面所有的子节点
- List
list = zkClient.getChildren(rootPath) - .stream()
- .sorted()
- .map(p -> rootPath + "/" + p)
- .collect(Collectors.toList()); // 判断当前是否为最小节点
- String firstNodePath = list.get(0);
- // 最小节点是不是当前节点
- if (firstNodePath.equals(lockNode.getPath())) {
- lockNode.setActive(true);
- } else {
- String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
- zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- }
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- // 事件处理 与心跳 在同一个线程,如果Debug时占用太多时间,将导致本节点被删除,从而影响锁逻辑。
- System.out.println("节点删除:" + dataPath);
- Lock lock = tryActiveLock(lockNode);
- synchronized (lockNode) {
- if (lock.isActive()) {
- lockNode.notify(); // 释放了
- }
- }
- zkClient.unsubscribeDataChanges(upNodePath, this);
- }
- });
- }
- return lockNode;
- }
- public Lock createLockNode(String lockId) {
- String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
- return new Lock(lockId, nodePath);
- }
- }
示例Demo涉及组件zookeeper-lock。源代码提交在 github :https://github.com/Niuh-Frame/niuh-zookeeper。
分享文章:面试官:Zookeeper了解吗?说说都有哪些使用场景?
新闻来源:http://www.mswzjz.cn/qtweb/news39/515939.html
攀枝花网站建设、攀枝花网站运维推广公司-贝锐智能,是专注品牌与效果的网络营销公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 贝锐智能