文章目录
  1. 1. RedisCluster集群模式下使用Pipeline批量操作-Java
    1. 1.1. 问题来了
    2. 1.2. 难道就放弃了
    3. 1.3. 那该如何
    4. 1.4. 辅助代码

RedisCluster集群模式下使用Pipeline批量操作-Java

在工作中无意中发现居然有些系统在完成某个功能(登录),竟然对redis产生近80次读写操作!对于这样子的读写操作,redis很快便导致系统延时过高,吞吐量低下;有没有什么办法或者redis指令可以将以上操作进行合并操作,提升系统性能和吞吐量呢?答案是:redis Pipeline(管道机制)eline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。

问题来了

Pipeline功能居然不支持了

难道就放弃了

当然不了…….

那该如何

通过一下n步即可达到redis cluster也可以支持Pipeline。

  • 读取redis初始化配置
1
2
3
4
5
@Bean
@ConfigurationProperties(prefix = "spring.redis.cluster")
public RedisClusterProperties redisClusterProperties() {
return new RedisClusterProperties();
}
  • 创建JedisConnectionFactory
1
2
3
4
5
6
7
8
9
10
@Bean
public JedisConnectionFactory jedisConnectionFactory() {
Set<String> clusterNodes = clusterNodes();
JedisPoolConfig jedisPoolConfig = jedisPoolConfig();
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(new RedisClusterConfiguration(clusterNodes), jedisPoolConfig);
jedisConnectionFactory.setPassword(redisClusterProperties.getPassword());
jedisConnectionFactory.setDatabase(redisClusterProperties.getDatabase());

return jedisConnectionFactory;
}
  • 创建JedisCluster
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Bean
public JedisCluster redisCluster() {
try {
JedisCluster cluster = new JedisCluster(
getHostAndPorts(),
redisClusterProperties.getConnectionTimeout(),
redisClusterProperties.getSoTimeout(),
redisClusterProperties.getMaxAttempts(),
redisClusterProperties.getPassword(), jedisConnectionFactory().getPoolConfig());

return cluster;
} catch (Exception e) {
logger.error("Redis配置初始化出现错误:", e);
return null;
}
}

以上只是支持正常redis cluster的相关操作

  • 定义 nodeMapslotHostMap全局静态变量
1
2
public static Map<String, JedisPool> nodeMap = new ConcurrentHashMap<>();
public static TreeMap<Long, String> slotHostMap = new TreeMap<>();
  • 将以上 创建JedisCluster相关代码更改为如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Bean
public JedisCluster redisCluster() {
try {
JedisCluster cluster = new JedisCluster(
getHostAndPorts(),
redisClusterProperties.getConnectionTimeout(),
redisClusterProperties.getSoTimeout(),
redisClusterProperties.getMaxAttempts(),
redisClusterProperties.getPassword(), jedisConnectionFactory().getPoolConfig());

nodeMap = cluster.getClusterNodes();
String anyHost = nodeMap.keySet().iterator().next();
slotHostMap = getSlotHostMap(anyHost);

return cluster;
} catch (Exception e) {
logger.error("Redis配置初始化出现错误:", e);
return null;
}
}
  • Jedis对象返回
1
2
3
4
5
6
public static Jedis getJedis(String key) {
//根据key获取key分布在哪个slot上
int slot = JedisClusterCRC16.getSlot(key);
Map.Entry<Long, String> entry = slotHostMap.lowerEntry(Long.valueOf(slot));
return nodeMap.get(entry.getValue()).getResource();
}

Jedis是可以操作Pipeline的

  • 操作实例
1
2
3
4
5
6
7
8
9
......
Jedis jedis = getJedis("test");
Pipelined pipelined = jedis.pipelined();
for(int i=0; i<100; i++>){
pipelined.set("test" , i);
}

pipelined.sync();
......

辅助代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private JedisPoolConfig jedisPoolConfig() {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(redisClusterProperties.getMaxTotal());
config.setMaxIdle(redisClusterProperties.getMaxIdle());
config.setMinIdle(redisClusterProperties.getMinIdle());
config.setMaxWaitMillis(redisClusterProperties.getMaxWaitMillis());
config.setTestOnCreate(redisClusterProperties.isTestOnCreate());
config.setTestOnBorrow(redisClusterProperties.isTestOnBorrow());
config.setTestOnReturn(redisClusterProperties.isTestOnReturn());

return config;
}

private Set<String> clusterNodes() {
Set<String> nodes = new HashSet<>();
String[] hostArray = redisClusterProperties.getClusterNodes().split(",");
for (String host : hostArray) {
nodes.add(host);
}

return nodes;
}

private Set<HostAndPort> getHostAndPorts() {
Set<String> nodes = clusterNodes();
Set<HostAndPort> hostAndPortSet = new HashSet<>();
for (String node : nodes) {
String[] ipAndPort = node.split(":");
hostAndPortSet.add(new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
}
return hostAndPortSet;
}

private TreeMap<Long, String> getSlotHostMap(String anyHostAndPortStr) {
TreeMap<Long, String> tree = new TreeMap<>();
String[] parts = anyHostAndPortStr.split(":");
HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1]));
JedisPool jedisPool = null;
try {
jedisPool = new JedisPool(jedisPoolConfig(), anyHostAndPort.getHost(), anyHostAndPort.getPort(), redisClusterProperties.getConnectionTimeout(), redisClusterProperties.getPassword());
List<Object> list = jedisPool.getResource().clusterSlots();
for (Object object : list) {
List<Object> list1 = (List<Object>) object;
List<Object> master = (List<Object>) list1.get(2);
String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1);
tree.put((Long) list1.get(0), hostAndPort);
tree.put((Long) list1.get(1), hostAndPort);
}

} catch (Exception e) {
logger.error("getSlotHostMap出现异常:", e);
} finally {
if (!Objects.isNull(jedisPool)) {
jedisPool.close();
}
}

return tree;
}
文章目录
  1. 1. RedisCluster集群模式下使用Pipeline批量操作-Java
    1. 1.1. 问题来了
    2. 1.2. 难道就放弃了
    3. 1.3. 那该如何
    4. 1.4. 辅助代码