RedisCluster集群模式下使用Pipeline批量操作-Java
在工作中无意中发现居然有些系统在完成某个功能(登录),竟然对redis产生近80次读写操作!对于这样子的读写操作,redis很快便导致系统延时过高,吞吐量低下;有没有什么办法或者redis指令可以将以上操作进行合并操作,提升系统性能和吞吐量呢?答案是:redis Pipeline(管道机制)eline指的是管道技术,指的是客户端允许将多个请求依次发给服务器,过程中而不需要等待请求的回复,在最后再一并读取结果即可。
问题来了
Pipeline功能居然不支持了
难道就放弃了
当然不了…….
那该如何
通过一下n步即可达到redis cluster也可以支持Pipeline。
1 2 3 4 5
| @Bean @ConfigurationProperties(prefix = "spring.redis.cluster") public RedisClusterProperties redisClusterProperties() { return new RedisClusterProperties(); }
|
1 2 3 4 5 6 7 8 9 10
| @Bean public JedisConnectionFactory jedisConnectionFactory() { Set<String> clusterNodes = clusterNodes(); JedisPoolConfig jedisPoolConfig = jedisPoolConfig(); JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(new RedisClusterConfiguration(clusterNodes), jedisPoolConfig); jedisConnectionFactory.setPassword(redisClusterProperties.getPassword()); jedisConnectionFactory.setDatabase(redisClusterProperties.getDatabase());
return jedisConnectionFactory; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Bean public JedisCluster redisCluster() { try { JedisCluster cluster = new JedisCluster( getHostAndPorts(), redisClusterProperties.getConnectionTimeout(), redisClusterProperties.getSoTimeout(), redisClusterProperties.getMaxAttempts(), redisClusterProperties.getPassword(), jedisConnectionFactory().getPoolConfig());
return cluster; } catch (Exception e) { logger.error("Redis配置初始化出现错误:", e); return null; } }
|
以上只是支持正常redis cluster的相关操作
- 定义
nodeMap
和slotHostMap
全局静态变量
1 2
| public static Map<String, JedisPool> nodeMap = new ConcurrentHashMap<>(); public static TreeMap<Long, String> slotHostMap = new TreeMap<>();
|
- 将以上 创建JedisCluster相关代码更改为如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Bean public JedisCluster redisCluster() { try { JedisCluster cluster = new JedisCluster( getHostAndPorts(), redisClusterProperties.getConnectionTimeout(), redisClusterProperties.getSoTimeout(), redisClusterProperties.getMaxAttempts(), redisClusterProperties.getPassword(), jedisConnectionFactory().getPoolConfig()); nodeMap = cluster.getClusterNodes(); String anyHost = nodeMap.keySet().iterator().next(); slotHostMap = getSlotHostMap(anyHost); return cluster; } catch (Exception e) { logger.error("Redis配置初始化出现错误:", e); return null; } }
|
1 2 3 4 5 6
| public static Jedis getJedis(String key) { //根据key获取key分布在哪个slot上 int slot = JedisClusterCRC16.getSlot(key); Map.Entry<Long, String> entry = slotHostMap.lowerEntry(Long.valueOf(slot)); return nodeMap.get(entry.getValue()).getResource(); }
|
Jedis是可以操作Pipeline的
1 2 3 4 5 6 7 8 9
| ...... Jedis jedis = getJedis("test"); Pipelined pipelined = jedis.pipelined(); for(int i=0; i<100; i++>){ pipelined.set("test" , i); }
pipelined.sync(); ......
|
辅助代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| private JedisPoolConfig jedisPoolConfig() { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(redisClusterProperties.getMaxTotal()); config.setMaxIdle(redisClusterProperties.getMaxIdle()); config.setMinIdle(redisClusterProperties.getMinIdle()); config.setMaxWaitMillis(redisClusterProperties.getMaxWaitMillis()); config.setTestOnCreate(redisClusterProperties.isTestOnCreate()); config.setTestOnBorrow(redisClusterProperties.isTestOnBorrow()); config.setTestOnReturn(redisClusterProperties.isTestOnReturn());
return config; }
private Set<String> clusterNodes() { Set<String> nodes = new HashSet<>(); String[] hostArray = redisClusterProperties.getClusterNodes().split(","); for (String host : hostArray) { nodes.add(host); }
return nodes; }
private Set<HostAndPort> getHostAndPorts() { Set<String> nodes = clusterNodes(); Set<HostAndPort> hostAndPortSet = new HashSet<>(); for (String node : nodes) { String[] ipAndPort = node.split(":"); hostAndPortSet.add(new HostAndPort(ipAndPort[0], Integer.parseInt(ipAndPort[1]))); } return hostAndPortSet; }
private TreeMap<Long, String> getSlotHostMap(String anyHostAndPortStr) { TreeMap<Long, String> tree = new TreeMap<>(); String[] parts = anyHostAndPortStr.split(":"); HostAndPort anyHostAndPort = new HostAndPort(parts[0], Integer.parseInt(parts[1])); JedisPool jedisPool = null; try { jedisPool = new JedisPool(jedisPoolConfig(), anyHostAndPort.getHost(), anyHostAndPort.getPort(), redisClusterProperties.getConnectionTimeout(), redisClusterProperties.getPassword()); List<Object> list = jedisPool.getResource().clusterSlots(); for (Object object : list) { List<Object> list1 = (List<Object>) object; List<Object> master = (List<Object>) list1.get(2); String hostAndPort = new String((byte[]) master.get(0)) + ":" + master.get(1); tree.put((Long) list1.get(0), hostAndPort); tree.put((Long) list1.get(1), hostAndPort); }
} catch (Exception e) { logger.error("getSlotHostMap出现异常:", e); } finally { if (!Objects.isNull(jedisPool)) { jedisPool.close(); } }
return tree; }
|