帮助中心/最新通知

质量为本、客户为根、勇于拼搏、务实创新

< 返回文章列表

【服务器相关】redis 集群批量操作实现

发表时间:2025-09-24 16:09:00 小编:主机乐-Yutio

 Redis 集群无法直接对分布在不同节点上的多个 key 执行批量操作命令(如 mget、pipeline 等)。这是因为 Redis 将集群划分为 16384 个哈希槽(编号 0~16383),不同的 key 会被划分到不同的槽中,而各个槽又分布在不同的节点上。不过,Jedis 客户端提供了计算 key 所属 slot 的方法,以及 slot 和节点之间的映射关系;通过这两个数据,就可以计算出每个 key 所在的节点,然后按节点分组,使用 pipeline 批量获取数据。具体代码如下:

初始化 JedisCluster 类


/**
 * Spring configuration that builds the {@link JedisCluster} client from application properties.
 */
@Configuration
public class JedisClusterConfig {

    /** Comma-separated {@code host:port} list, injected from {@code spring.redis.cluster.nodes}. */
    @Value("${spring.redis.cluster.nodes}")
    private String clusterNodes;

    /** Timeout value handed to the {@link JedisCluster} constructor. */
    @Value("${spring.redis.cache.commandTimeout}")
    private Integer commandTimeout;

    /**
     * Parses {@link #clusterNodes} and creates the cluster client.
     *
     * @return a {@link JedisCluster} seeded with every configured node
     * @throws IllegalArgumentException if an entry is not of the form {@code host:port}, if the
     *         port is not a number, or if no node is configured at all
     */
    @Bean
    public JedisCluster getJedisCluster() {
        Set<HostAndPort> nodes = new HashSet<>();
        for (String ipPort : clusterNodes.split(",")) {
            String entry = ipPort.trim();
            if (entry.isEmpty()) {
                // Tolerate stray commas such as "a:1,,b:2" instead of failing on them.
                continue;
            }
            String[] ipPortPair = entry.split(":");
            if (ipPortPair.length < 2) {
                throw new IllegalArgumentException(
                        "Invalid redis cluster node '" + entry + "', expected host:port");
            }
            String host = ipPortPair[0].trim();
            int port = Integer.parseInt(ipPortPair[1].trim());
            nodes.add(new HostAndPort(host, port));
        }
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("spring.redis.cluster.nodes contains no host:port entry");
        }
        return new JedisCluster(nodes, commandTimeout);
    }
}

工具类 JedisClusterUtil


@Componentpublic class JedisClusterUtil {@Autowiredprivate JedisCluster jedisCluster;@Resource(name = "redisTemplate4Json")protected RedisTemplate<String, Object> redisTemplate;/** * ZSet批量查询 * @param keys * @return */public List<Object> batchZRange(List<String> keys) {List<Object> resList = new ArrayList<>();if (keys == null || keys.size() == 0) {return resList;}if (keys.size() == 1) {BoundZSetOperations<String, Object> operations = redisTemplate.boundZSetOps(keys.get(0));Set<Object> set = operations.reverseRange(0, 0);resList.add(set.iterator().next());return resList;}Map<JedisPool, List<String>> jedisPoolMap = getJedisPool(keys);List<String> keyList;JedisPool currentJedisPool = null;Pipeline currentPipeline;List<Object> res = new ArrayList<>();Map<String, Object> resultMap = new HashMap<>();//执行for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {Jedis jedis = null;try {currentJedisPool = entry.getKey();keyList = entry.getValue();//获取pipelinejedis = currentJedisPool.getResource();currentPipeline = jedis.pipelined();for (String key : keyList) {currentPipeline.zrevrange(key, 0, 0);}//从pipeline中获取结果res = currentPipeline.syncAndReturnAll();currentPipeline.close();for (int i = 0; i < keyList.size(); i++) {if (null == res.get(i)) {resultMap.put(keyList.get(i), null);} else {Set<Object> set = (Set<Object>) res.get(i);if (null == set || set.isEmpty()) {resultMap.put(keyList.get(i), null);} else {byte[] byteStr = set.iterator().next().toString().getBytes();Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr);resultMap.put(keyList.get(i), obj);}}}} catch (Exception e) {e.printStackTrace();} finally {returnResource(jedis, currentJedisPool);}}resList = sortList(keys, resultMap);return resList;}/** * Value批量查询 * @param keys * @return */public List<Object> batchGet(List<String> keys){List<Object> resList = new ArrayList<>();if (keys == null || keys.size() == 0) {return resList;}if (keys.size() == 1) {BoundValueOperations<String, Object> 
operations = redisTemplate.boundValueOps(keys.get(0));resList.add(operations.get());return resList;}Map<JedisPool, List<String>> jedisPoolMap = getJedisPool(keys);List<String> keyList;JedisPool currentJedisPool = null;Pipeline currentPipeline;List<Object> res = new ArrayList<>();Map<String, Object> resultMap = new HashMap<>();for (Map.Entry<JedisPool, List<String>> entry : jedisPoolMap.entrySet()) {Jedis jedis = null;try {currentJedisPool = entry.getKey();keyList = entry.getValue();//获取pipelinejedis = currentJedisPool.getResource();currentPipeline = jedis.pipelined();for (String key : keyList) {currentPipeline.get(key);}//从pipeline中获取结果res = currentPipeline.syncAndReturnAll();currentPipeline.close();for (int i = 0; i < keyList.size(); i++) {if (null == res.get(i)) {resultMap.put(keyList.get(i), null);} else {byte[] byteStr = keyList.get(i).toString().getBytes();Object obj = redisTemplate.getDefaultSerializer().deserialize(byteStr);resultMap.put(keyList.get(i), obj);}}} catch (Exception e) {e.printStackTrace();} finally {returnResource(jedis, currentJedisPool);}}resList = sortList(keys, resultMap);return resList;}private Map<JedisPool, List<String>> getJedisPool(List<String> keys){//JedisCluster继承了BinaryJedisCluster//BinaryJedisCluster的JedisClusterConnectionHandler属性//里面有JedisClusterInfoCache,根据这一条继承链,可以获取到JedisClusterInfoCache//从而获取slot和JedisPool直接的映射MetaObject metaObject = SystemMetaObject.forObject(jedisCluster);JedisClusterInfoCache cache = (JedisClusterInfoCache) metaObject.getValue("connectionHandler.cache");//保存地址+端口和命令的映射Map<JedisPool, List<String>> jedisPoolMap = new HashMap<>();JedisPool currentJedisPool = null;List<String> keyList;for (String key : keys) {//计算哈希槽int crc = JedisClusterCRC16.getSlot(key);//通过哈希槽获取节点的连接currentJedisPool = cache.getSlotPool(crc);//由于JedisPool作为value保存在JedisClusterInfoCache中的一个map对象中,每个节点的//JedisPool在map的初始化阶段就是确定的和唯一的,所以获取到的每个节点的JedisPool都是一样//的,可以作为map的keyif (jedisPoolMap.containsKey(currentJedisPool)) 
{jedisPoolMap.get(currentJedisPool).add(key);} else {keyList = new ArrayList<>();keyList.add(key);jedisPoolMap.put(currentJedisPool, keyList);}}return jedisPoolMap;}private List<Object> sortList(List<String> keys, Map<String, Object> params) {List<Object> resultList = new ArrayList<>();Iterator<String> it = keys.iterator();while (it.hasNext()) {String key = it.next();resultList.add(params.get(key));}return resultList;}/** * 释放jedis资源 * * @param jedis */public void returnResource(Jedis jedis, JedisPool jedisPool) {if (jedis != null && jedisPool != null) {jedisPool.returnResource(jedis);}}

 注意:使用完毕后一定要释放 jedis 资源(归还到连接池),否则连接池中的连接会被耗尽,后续请求将一直阻塞等待可用连接,造成卡死现象。

到此,这篇关于 redis 集群批量操作实现的文章就介绍到这了。更多相关 redis 集群批量操作的内容,请搜索以前的文章或继续浏览下面的相关文章,希望大家以后多多支持!


联系我们
返回顶部