1.利用Redis-Replicator实现Redis-Cluster集群同步,源端备端都是三主三从,分布在三台CentOS7.6,4核8G的linux服务器上。在单机同步成功的基础上,开了三个线程,分别对源备的三个master进行点对点同步,结果源端1000w数据,备端只同步了300w不到,但是每个线程启动后立刻sleep 30s左右,1000w数据一个不少
public class MigrationExample {
public static void main(String[] args) throws IOException, URISyntaxException {
sync("redis://127.0.0.1:6379", "redis://127.0.0.1:6380");
}
/*
* Precondition:
* 1. Make sure the two redis version is same.
* 2. Make sure the single key-value is not very big.(highly recommend less then 1 MB)
*
* We running following steps to sync two redis.
* 1. Get rdb stream from source redis.
* 2. Convert source rdb stream to redis dump format.
* 3. Use Jedis RESTORE command to restore that dump format to target redis.
* 4. Get aof stream from source redis and sync to target redis.
*/
public static void sync(String sourceUri, String targetUri) throws IOException, URISyntaxException {
RedisURI suri = new RedisURI(sourceUri);
RedisURI turi = new RedisURI(targetUri);
final ExampleClient target = new ExampleClient(turi.getHost(), turi.getPort());
Configuration tconfig = Configuration.valueOf(turi);
if (tconfig.getAuthPassword() != null) {
Object auth = target.send(AUTH, tconfig.getAuthPassword().getBytes());
System.out.println("AUTH:" + auth);
}
final AtomicInteger dbnum = new AtomicInteger(-1);
Replicator r = dress(new RedisReplicator(suri));
r.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (event instanceof DumpKeyValuePair) {
DumpKeyValuePair dkv = (DumpKeyValuePair) event;
// Step1: select db
DB db = dkv.getDb();
int index;
if (db != null && (index = (int) db.getDbNumber()) != dbnum.get()) {
target.send(SELECT, toByteArray(index));
dbnum.set(index);
System.out.println("SELECT:" + index);
}
// Step2: restore dump data
if (dkv.getExpiredMs() == null) {
Object r = target.restore(dkv.getKey(), 0L, dkv.getValue(), true);
System.out.println(r);
} else {
long ms = dkv.getExpiredMs() - System.currentTimeMillis();
if (ms <= 0) return;
Object r = target.restore(dkv.getKey(), ms, dkv.getValue(), true);
System.out.println(r);
}
}
if (event instanceof DefaultCommand) {
// Step3: sync aof command
DefaultCommand dc = (DefaultCommand) event;
Object r = target.send(dc.getCommand(), dc.getArgs());
System.out.println(r);
}
}
});
r.addCloseListener(new CloseListener() {
@Override
public void handle(Replicator replicator) {
target.close();
}
});
r.open();
}
public static class ExampleClient implements Closeable {
private Jedis jedis;
public ExampleClient(final String host, final int port) {
DefaultJedisClientConfig.Builder config = DefaultJedisClientConfig.builder();
config.timeoutMillis(10000);
this.jedis = new Jedis(new HostAndPort(host, port), config.build());
}
public Object send(Protocol.Command cmd, final byte[]... args) {
Object r = jedis.sendCommand(cmd, args);
if (r instanceof byte[]) {
return Strings.toString(r);
} else {
return r;
}
}
public Object send(final byte[] cmd, final byte[]... args) {
return send(Protocol.Command.valueOf(Strings.toString(cmd).toUpperCase()), args);
}
public Object restore(byte[] key, long expired, byte[] dumped, boolean replace) {
if (!replace) {
return send(RESTORE, key, toByteArray(expired), dumped);
} else {
return send(RESTORE, key, toByteArray(expired), dumped, "REPLACE".getBytes());
}
}
@Override
public void close() {
if (jedis != null) {
jedis.close();
}
}
}
这段代码实现了单机源备同步,现在把它单独写在一个Thread类里的run方法中,有三台源备开三个线程同时同步,CPU百分之两百多,备端同步数据不全。想定位到哪里资源耗尽了导致程序异常或者让这三个线程能顺序执行,完成任务
引用chatGPT作答,从程序的CPU利用率超过100%的现象以及备端同步数据不全的情况,可以初步推断是多线程并发导致的资源竞争问题。具体的解决方案如下:
添加日志打印信息:在代码中添加日志打印信息,可以帮助我们了解到程序的运行情况,包括每个线程的执行情况,以及出现异常时的堆栈信息等。
添加锁机制:通过添加锁机制,可以避免多个线程同时访问同一个资源的问题。可以使用Java中的synchronized关键字或者Lock接口实现锁机制。
调整线程数:在多线程并发时,线程数的多少也会对程序的执行效率产生影响。可以适当调整线程数,使其在一定范围内,达到最优的性能。
调整Redis配置:在Redis中,可以通过修改配置文件的方式,调整内存使用、网络连接、并发数等参数,来优化Redis的性能。可以根据具体情况,适当调整Redis的配置。
优化代码:在程序编写时,需要注意代码的质量和效率。可以通过合理的算法设计、优化IO操作等方式,提高程序的执行效率,减少资源占用。同时,需要及时处理异常,避免程序出现崩溃等情况。