哪个zookeeper是master节点?
哪个zookeeper是master节点?
这里不区分主从,任何节点都可以用,默认连接第一个,集群状态由集群去维护,应用不关心,通过zookeeper客户端依赖包去完成读写操作。
随着分布式系统的流行,现在许多服务都需要24小时不间断工作,如果某台机器挂了,我们希望能有其他机器顶替它继续工作。这类问题通常采用master-slave模式:正常情况下由主机提供服务,备用机器监听主机状态;当主机发生异常时,会从备用机器中选出一台代替主机提供服务,这个选举过程就是master选举。
由于zookeeper能保证同一时刻只有一个主节点(即使出现网络问题,zookeeper集群自身也能处理),而且借助zookeeper的通知机制,客户端可以监听节点的变化,所以可以考虑采用zookeeper实现master选举。其实现原理图如下:
上图中,左侧是zookeeper节点,master节点保存当前master的机器,server节点保存可用的备用机器,每个服务器在创建的时候会进行选举,即每个服务器同时去创建master节点,只有一个机器能创建成功,创建成功的机器则被选为master,其他机器会同时监听master节点的变化,当主机器挂了时,master节点会被删除,其他机器会重新进行master选举。
用zkclient实现master选举
/**
 * Simulated machine ("user center") that competes for the master node.
 *
 * <p>A plain mutable value holder carrying a machine id and a machine name.
 * It is {@link Serializable}; presumably so that it can be stored as node
 * data through a serializing ZooKeeper client (confirm against the caller).
 *
 * @author 全恒
 */
public class UserCenter implements Serializable {

    private static final long serialVersionUID = -1776114173857775665L;

    /** Machine id. */
    private int mc_id;

    /** Machine name. */
    private String mc_name;

    /**
     * @return the machine id
     */
    public int getMc_id() {
        return this.mc_id;
    }

    /**
     * @param mc_id the machine id to store
     */
    public void setMc_id(int mc_id) {
        this.mc_id = mc_id;
    }

    /**
     * @return the machine name, or {@code null} if none was set
     */
    public String getMc_name() {
        return this.mc_name;
    }

    /**
     * @param mc_name the machine name to store
     */
    public void setMc_name(String mc_name) {
        this.mc_name = mc_name;
    }

    /**
     * Renders the instance as {@code UserCenter{mc_id=<id>, mc_name='<name>'}}.
     *
     * @return the textual form of this machine
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("UserCenter{");
        text.append("mc_id=").append(mc_id);
        text.append(", mc_name='").append(mc_name).append('\'');
        text.append('}');
        return text.toString();
    }
}
/**
 * Master election service built on ZkClient.
 *
 * <p>Every participant tries to create the same ephemeral node
 * ({@code /master}). The one whose create call succeeds becomes master; the
 * others remember the current master and re-run the election when the node
 * is deleted.
 *
 * <p>To simulate failures, a participant that wins schedules the release of
 * the node two seconds later.
 *
 * @author 全恒
 */
public class MasterSelector {
    private ZkClient zkClient;
    private final static String MASTER_PATH = "/master"; // node every participant races to create
    private IZkDataListener dataListener; // listens for changes of the master node
    private UserCenter server; // the machine this selector represents
    // Written from the caller thread, the scheduler thread and the listener
    // callback, hence volatile.
    private volatile UserCenter master; // the machine currently known as master
    private volatile boolean isRunning = false;
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    /**
     * Creates a selector for one participating machine.
     *
     * @param server   the machine that takes part in the election
     * @param zkClient the ZooKeeper client used for all node operations
     */
    public MasterSelector(UserCenter server, ZkClient zkClient) {
        System.out.println("[" + server + "] 去争抢master权限");
        this.server = server;
        this.zkClient = zkClient;
        this.dataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                // data changes of the master node are not relevant for the election
            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                // the master node was deleted: run the election again
                chooseMaster();
            }
        };
    }

    /**
     * Starts taking part in the election. Does nothing if already started.
     */
    public void start() {
        if (!isRunning) {
            isRunning = true;
            // subscribe first so that a deletion of the master node is noticed
            zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
            chooseMaster();
        }
    }

    /**
     * Stops taking part in the election and releases the master node if this
     * machine currently holds it. Does nothing if already stopped.
     */
    public void stop() {
        if (isRunning) {
            isRunning = false;
            // stop the timer that simulates failures
            scheduledExecutorService.shutdown();
            // cancel the subscription
            zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
            releaseMaster();
        }
    }

    /**
     * Runs one election round: tries to create the ephemeral master node and,
     * if another machine already holds it, records that machine as master.
     */
    private void chooseMaster() {
        if (!isRunning) {
            System.out.println("当前服务没有启动");
            return;
        }
        try {
            zkClient.createEphemeral(MASTER_PATH, server);
            master = server; // the create succeeded, so this machine is the master
            System.out.println(master + "->我现在已经是master,你们要听我的");
            // simulate a failure of the master: release the node after 2 seconds
            scheduledExecutorService.schedule(() -> {
                releaseMaster(); // release the node (simulated failure)
            }, 2, TimeUnit.SECONDS);
        } catch (ZkNodeExistsException e) {
            // the master node already exists
            UserCenter userCenter = zkClient.readData(MASTER_PATH, true);
            if (userCenter == null) {
                // the node disappeared between the failed create and the read: retry
                System.out.println("启动操作:");
                chooseMaster();
            } else {
                master = userCenter;
            }
        }
    }

    /**
     * Deletes the master node, but only when this machine is the master
     * (simulates a failure of the master).
     */
    private void releaseMaster() {
        if (checkMaster()) {
            zkClient.delete(MASTER_PATH);
        }
    }

    /**
     * Checks whether the master node currently holds this machine.
     *
     * @return {@code true} if the master node exists and its machine name
     *         equals this machine's name; {@code false} otherwise, including
     *         when no master node exists at the moment
     */
    private boolean checkMaster() {
        // Read with returnNullIfPathNotExists=true: the node is regularly absent
        // (between elections, or when stop() runs after a release), and that
        // must mean "not master" rather than an exception.
        UserCenter userCenter = zkClient.readData(MASTER_PATH, true);
        if (userCenter == null) {
            return false;
        }
        if (userCenter.getMc_name().equals(server.getMc_name())) {
            master = userCenter;
            return true;
        }
        return false;
    }
}
/**
 * Simulates several machines competing for the master role.
 *
 * <p>Starts ten selectors, one per second, each with its own ZooKeeper
 * client, then stops all selectors and closes all clients.
 *
 * @author 全恒
 */
public class MasterChooseTest {
    private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
            "192.168.123.45:2181,192.168.123.174:2181";

    /**
     * Runs the simulation.
     *
     * @param args unused
     * @throws IOException kept for compatibility with existing callers
     */
    public static void main(String[] args) throws IOException {
        List<MasterSelector> selectorLists = new ArrayList<>();
        // Track the clients so that their connections can be closed at the end.
        List<ZkClient> zkClients = new ArrayList<>();
        boolean interrupted = false;
        try {
            for (int i = 0; i < 10; i++) {
                ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000,
                        5000, new SerializableSerializer());
                zkClients.add(zkClient);
                UserCenter userCenter = new UserCenter();
                userCenter.setMc_id(i);
                userCenter.setMc_name("客户端:" + i);
                MasterSelector selector = new MasterSelector(userCenter, zkClient);
                selectorLists.add(selector);
                selector.start(); // trigger the election
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Remember the interruption; it is restored only after cleanup so
            // that the ZooKeeper calls below are not aborted by a set flag.
            interrupted = true;
        } finally {
            try {
                for (MasterSelector selector : selectorLists) {
                    selector.stop();
                }
            } finally {
                // Close every connection even if stopping a selector failed.
                for (ZkClient zkClient : zkClients) {
                    try {
                        zkClient.close();
                    } catch (RuntimeException e) {
                        e.printStackTrace();
                    }
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
用curator实现master选举
/**
 * Master election demo based on Curator's {@code LeaderSelector} recipe.
 *
 * <p>The process keeps competing for leadership on
 * {@code /curator_master_path}; each time it is elected it holds leadership
 * for two seconds and then gives it up again.
 */
public class MasterSelector {
    private final static String CONNECTSTRING = "192.168.123.38:2181,192.168.123.55:2181," +
            "192.168.123.45:2181,192.168.123.174:2181";
    private final static String MASTER_PATH = "/curator_master_path";

    /**
     * Connects to ZooKeeper, joins the election and blocks until the main
     * thread is interrupted.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectString(CONNECTSTRING)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        // The client has to be started before a recipe can use it.
        curatorFramework.start();

        @SuppressWarnings("resource")
        LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListenerAdapter() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                // Leadership is held for as long as this method runs.
                System.out.println("获得leader成功");
                TimeUnit.SECONDS.sleep(2);
            }
        });
        leaderSelector.autoRequeue(); // rejoin the election after giving up leadership
        leaderSelector.start(); // start the election

        // The election runs on background threads, so keep the main thread
        // alive; otherwise main would return right after start().
        // NOTE(review): Curator's worker threads are believed to be daemon
        // threads, in which case the JVM would exit immediately - confirm.
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            leaderSelector.close();
            curatorFramework.close();
            Thread.currentThread().interrupt();
        }
    }
}
采用基于ZooKeeper的Leader Latch实现Master选举。具体步骤如下:
在分布式系统中,多台机器同时在ZooKeeper的同一节点下创建临时顺序节点(由Curator提供的LeaderLatch完成),其中序号最小的节点所对应的机器成为Master节点,从而保证同一时刻只有一个Master。
使用Curator提供的方法判断当前节点是否为Master节点。
下面是一个示例代码,使用Spring定时任务实现定时任务逻辑,来源于网络,有一部分代码需要调整参数和节点路径:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class RegularTask {
private static final Logger log = LoggerFactory.getLogger(RegularTask.class);
//zk集群,可自行调整参数和节点路径
private static final String ZOOKEEPER_STR = "127.0.0.1:2181";
private static CuratorFramework curatorFramework;
private static LeaderLatch leaderLatch;
static {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
curatorFramework = CuratorFrameworkFactory.newClient(ZOOKEEPER_STR, retryPolicy);
curatorFramework.start();
leaderLatch = new LeaderLatch(curatorFramework, "/regulartask");
try {
leaderLatch.start();
} catch (Exception e) {
log.error("LeaderLatch start error:{}", e);
}
}
@Lazy
@Scheduled(cron = "0 0/1 * * * ?")
public void send() {
try {
//判断是否为master节点
if (!leaderLatch.hasLeadership()) {
log.info("current machine is not a leader");
return;
}
//TODO 定时任务逻辑
} catch (Exception e) {
log.error("regulartask run error:{}", e);
} finally {
try {
if (leaderLatch != null) {
leaderLatch.close();
}
} catch (IOException e) {
log.error("leaderLatch close error:{}", e);
e.printStackTrace();
}
}
}
}