脚本宝典收集整理的这篇文章主要介绍了ZooKeeper-B站黑马程序员,多多三连,脚本宝典觉得挺不错的,现在分享给大家,也给大家做个参考。
分布式项目的配置总管理处
对于分布式项目修改共享数据时加入锁管理(同一时间只能有一个服务对数据进更改)
最常见的功能,作为注册中心使用.
Zookeeper是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构
这里面的每一个结点都被称为ZNode,每个节点都会保存自己的数据和节点信息
节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下
节点可以分为四大类
PERSISTENT 持久化节点
EPHEMRAL 临时节点 : -e
PERSISTENT_SENQUENTIAL 持久化顺序节点 : -s
EPHEMRAL_SEQUENTIAL 临时顺序节点 : -es
在安装目录的bin目录下
启动Zookeeper服务 : ./zkServer.sh start
查看Zookeeper服务状态 : ./zkServer.sh statu
停止Zookeeper服务 : ./zkServer.sh stop
重启Zookeeper服务 : ./zkServer.sh restart
连接ZooKeeper服务端
./zkCli.sh -server ip:port
断开客户端连接
quIT
设置节点的值
set /节点path value
查看帮助命令
help
删除单个节点
delete /节点path
显示指定目录下的节点
ls 目录
删除带有子节点的节点
deleteall /节点path
创建节点
create /节点path value
获取节点的值
get /节点path
创建临时节点
create -e /节点path value
创建顺序节点
create -s /节点path value
查询节点详细信息
ls -s /节点path
节点详细信息
czxid : 节点被创建的事务ID
dataversion : 数据版本号
ctime : 创建时间
aclversion : 权限版本号
mzxid : 最后一次被更新的事务ID
ephemeralOwner : 用于临时节点 ,代表临时节点的事务ID,如果为持久节点则为0
pzxid : 子节点列表最后一次被更新的事务ID
dataLength : 节点存储的数据长度
cversion : 子节点的版本号
numChildren : 当前节点的子节点数
Curator是apache ZooKeeper的Java客户端库
建立连接
添加节点
删除节点
修改节点
查询节点
Watch时间监听
分布式锁实现
/**
* Create a new client
*
* @param connectString list of servers to connect to "127.0.0.1:2181'127.0.0.2:8182"
* @param sessionTimeoutMs session timeout
* @param connectionTimeoutMs connection timeout
* @param retrypolicy retry policy to use
* @return client
*/
//重试策略1 每间隔x一共重试x次 每隔3秒连接一次一共连接10次
RetryPolicy r = new ExponentialBackoffRetry(3000, 10);
//第一种方式
CuratorFramework zookeeperClient = CuratorFrameworkFactory.newClient("116.62.71.234:2181",
60 * 1000,
15 * 1000,
r);
//开启连接
zookeeperClient.start();
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
namespace : 创建根节点itheima(为了方便,不用之后每次进行客户端操作都写/根节点path)
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 创建节点
* 1.基本创建 create().forPath("")
* 2.创建节点,带有数据 create.forPath("",data.getBytes())
* 3.设置节点的类型 create.withMode(CreateMode.ENUM).forPath("")
* 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
*/
/**
* 基本创建
* 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
*/
String path = zookeeperClient.create().forPath("/app1");
System.out.PRintln(path);
//关闭连接
zookeeperClient.close();
}
如果没有节点信息,则信息默认为客户端ip
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 创建节点
* 1.基本创建 create().forPath("")
* 2.创建节点,带有数据 create.forPath("",data.getBytes())
* 3.设置节点的类型 create.withMode(CreateMode.ENUM).forPath("")
* 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
*/
/**
* 基本创建,带有数据
* 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
*/
String path = zookeeperClient.create().forPath("/app2","hehe".getBytes());
System.out.println(path);
//关闭连接
zookeeperClient.close();
}
默认为持久化类型
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 创建节点
* 1.基本创建 create().forPath("")
* 2.创建节点,带有数据 create.forPath("",data.getBytes())
* 3.设置节点的类型 create.withMode(CreateMode.ENUM).forPath("")
* 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
*/
/**
* 创建节点带有类型
* 默认类型:持久化
*/
String path = zookeeperClient.create().withMode(CreateMode.EPHEMERAL).forPath("/apP3","hehe".getBytes());
System.out.println(path);
//关闭连接 app3就会被释放
zookeeperClient.close();
}
ZooKeeper如果父节点不存在无法创建子节点
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 创建节点
* 1.基本创建 create().forPath("")
* 2.创建节点,带有数据 create.forPath("",data.getBytes())
* 3.设置节点的类型 create.withMode(CreateMode.ENUM).forPath("")
* 4.创建多级节点 create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
*/
/**
* 创建多级节点
* 父节点不存在无法直接创建子节点
* creatingParentsIfNeeded
*/
String path = zookeeperClient.create().creatingParentsIfNeeded().forPath("/app4/p1","hehe".getBytes());
System.out.println(path);
//关闭连接 app3就会被释放
zookeeperClient.close();
}
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 查询节点
* 1.查询数据 get : getData().forPath()
* 2.查询子节点 ls getChildren.forPath();
* 3.查询节点状态信息 ; ls -s getData().storingStatIn(status).forPath("/app1")
*/
byte[] bytes = zookeeperClient.getData().forPath("/app1");
System.out.println(new String(bytes));
//关闭连接
zookeeperClient.close();
}
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 查询节点
* 1.查询数据 get : getData().forPath()
* 2.查询子节点 ls getChildren.forPath();
* 3.查询节点状态信息 ; ls -s getData().storingStatIn(status).forPath("/app1")
*/
List<String> strings = zookeeperClient.getChildren().forPath("/app1");
//关闭连接
zookeeperClient.close();
}
返回的结果需要封装到一个JavaBean中
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 查询节点
* 1.查询数据 get : getData().forPath()
* 2.查询子节点 ls getChildren.forPath();
* 3.查询节点状态信息 ; ls -s getData().storingStatIn(status).forPath("/app1")
*/
/**
* Stat是一个Bean将返回的节点的详细信息封装到bean中
*/
Stat status = new Stat();
zookeeperClient.getData().storingStatIn(status).forPath("/app1");
//关闭连接
zookeeperClient.close();
}
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 修改节点数据
* 1.修改数据
* 2.根据版本修改
*/
zookeeperClient.setData().forPath("/app1","itcast".getBytes());
//关闭连接
zookeeperClient.close();
}
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 修改节点数据
* 1.修改数据
* 2.根据版本修改
*/
/**
* 根据版本修改前需要先查出版本,查询出的版本和传入的版本号需要一致才能修改成功,目的就是让其他客户端或者其他线程不干扰我.
*/
Stat status = new Stat();
zookeeperClient.getData().storingStatIn(status).forPath("/app1");
//查询出版本
int version = status.getVersion();
System.out.println(version);
zookeeperClient.setData().withVersion(3).forPath("/app1","itcast".getBytes());
//关闭连接
zookeeperClient.close();
}
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 删除节点 : delete deleteall
* 1.删除单个节点 delete().forPath();
* 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
* 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
* 4.删除回调 zookeeperClient.delete().guaranteed().inBackground(
*/
zookeeperClient.delete().forPath("/app1");
//关闭连接
zookeeperClient.close();
}
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 删除节点 : delete deleteall
* 1.删除单个节点 delete().forPath();
* 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
* 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
* 4.删除回调 zookeeperClient.delete().guaranteed().inBackground(
*/
zookeeperClient.delete().deletingChildrenIfNeeded().forPath("/app1");
//关闭连接
zookeeperClient.close();
}
guaranteed();
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 删除节点 : delete deleteall
* 1.删除单个节点 delete().forPath();
* 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
* 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
* 4.删除回调 zookeeperClient.delete().guaranteed().inBackground(
*/
zookeeperClient.delete().guaranteed().forPath("/app1");
//关闭连接
zookeeperClient.close();
}
inBackground(()->{})
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* 删除节点 : delete deleteall
* 1.删除单个节点 delete().forPath();
* 2.删除带有子节点的节点 delete().deletingChildrenIfNeeded().forPath("/app1");
* 3.必须成功的删除 delete().guaranteed().forPath("/app1"); 为了防止网络抖动,本质就是重试.
* 4.删除回调 zookeeperClient.delete().guaranteed().inBackground(
*/
zookeeperClient.delete().guaranteed().inBackground(new BackgroundCallback(){
@override
public void processResult(CuratorFramework client, Curatorevent event) throws Exception {
System.out.println("我被删除了");
System.out.println(event);
}
}).forPath("/app1");
//关闭连接
zookeeperClient.close();
}
ZooKeeper允许用户在指定节点上注册一些Watchr,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知到感兴趣的客户端上去,该机制时ZooKeeper实现分布式协调服务的重要特性.
ZooKeeper中引入了watcher机制来实现了发布/订阅功能.能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化的时候,会通知所有订阅者.
ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便需要开发人员自己反复注册Watcher,比较繁琐.
Curator引入了Cache来实现对ZooKeeper服务端事件的监听.
ZooKeeper提供了三种Watcher;
NodeCache : 只是监听了某一特定节点
PathChildrenCache : 监控一个ZNode的子节点
TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache组合.
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* NodeCache : 给指定一个节点注册监听器
*/
//1.创建NodeCache对象
NodeCache nodeCache = new NodeCache(zookeeperClient, "/app1");
//2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了");
//获取修改节点后的数据
byte[] data = nodeCache.getcurrentData().getData();
System.out.println(new String(data));
}
});
//3.开启监听,如果设置为true,则开启监听时,加载缓存数据
nodeCache.start();;
//while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示
//关闭连接
zookeeperClient.close();
}
参数event可以获得修改的详细信息
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* PathChildrenCache : 监听某个节点的字节点
*/
//1.创建PathChildrenCache对象
PathChildrenCache p = new PathChildrenCache(zookeeperClient,"/app2",true);
//2.注册监听
p.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("子节点发送变化了");
System.out.println(event);
//监听子节点的数据变更,并且拿到变更后的数据
//1.获取类型
PathChildrenCacheEvent.Type type = event.getType();
//2.判断类型是否为update
if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("数据变了");
byte[] data = event.getData().getData();
System.out.println(new String(data));
}
}
});
//3.开启监听,如果设置为true,则开启监听时,加载缓存数据
p.start();
//while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示
//关闭连接
zookeeperClient.close();
}
public static void main(String[] args) throws Exception {
//第二种连接方式:链式编程
CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.234:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
namespace("itheima").build();
//开启连接
zookeeperClient.start();
/**
* TreeCache : 监听某个节点及其子节点
*/
//1.创建NodeCache对象
TreeCache treeCache = new TreeCache(zookeeperClient, "/app1");
//2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("结点变化了");
System.out.println(event);
}
});
//3.开启监听,如果设置为true,则开启监听时,加载缓存数据
treeCache.start();
//关闭连接
zookeeperClient.close();
//while (true){} //保证线程不结束 , web环境不需要加,这里是主方法演示
}
在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者lock的方式来解决多线程见的代码同步问题,这时多线程的运行都是运行在同一 JVM下,没有任何问题
但当我们的应用时分布式集群工作的情况下,属于JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题
那么久需要一种更加高级的锁机制,来处理跨机器的进程之间的数据同步问题---这就是分布式锁.
核心思想 : 当客户端要获取锁,则创建节点,使用完锁则删除该节点
客户端获取锁时,在lock节点下创建临时顺序节点
然后分别获取lock下面的所有子节点,客户端获取到所有子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁.
如果发现自己创建的节点并非lock所有子节点中最小的,说明还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件.
如果发现比自己小得那个节点被删除,则客户端的Watcher会受到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取比自己小的一个节点并注册监听.
在Curator中有五种锁方案
Runnable
public class Ticket12306 implements Runnable {
private int tickets = 10;
//第二种连接方式:链式编程
private static CuratorFramework zookeeperClient = CuratorFrameworkFactory.builder().
connectString("116.62.71.235:2181").
sessionTimeoutMs(60 * 1000).
connectionTimeoutMs(15 * 1000).
retryPolicy(new ExponentialBackoffRetry(3000, 10)).
build();
private static InterProcessMutex lock = new InterProcessMutex(zookeeperClient,"/lock") ;
@Override
public void run() {
while (true){
//获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if(tickets > 0){
System.out.println(Thread.currentThread().getName()+":"+tickets);
Thread.sleep(100);
tickets--;
}
} catch (Exception e) {
e.printStackTrace();
}finally {
//释放锁
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
Main
public class Lock12306 {
public static void main(String[] args) {
Ticket12306 t = new Ticket12306();
Thread t1 = new Thread(t,"携程");
Thread t2 = new Thread(t,"飞猪");
t1.start();
t2.start();
}
}
Leader选举 :
与创建单机环境类似
修改conf目录下的zoo_sample.CFg为zoo.cfg
修改data目录为指定目录,每个集群成员分别配置
在data目录下创建myid文件,内容分别为1,2,3
分别在conf目录下的zoo.cfg文件加入如下内容
server.1=192.168.149.135:2881:3881
server.2=192.168.149.135:2882:3882
server.3=192.168.149.135:2883:3883
解释F1a;server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口,搭建伪集群,端口可以写成127.0.0.1
Leader领导者
处理事务请求(增删改)
集群内部各个服务器的调度者
Follower跟随者
处理客户端非事务请求(查),转发事务请求给Leader服务器.
参数Leader选举投票
Observer观察者
处理客户端非事务请求,转发事务请求给Leader服务器.
以上是脚本宝典为你收集整理的ZooKeeper-B站黑马程序员,多多三连全部内容,希望文章能够帮你解决ZooKeeper-B站黑马程序员,多多三连所遇到的问题。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。