redis学习
在配置好redis集群后启动redis服务器,使用redis-server.exe redis.windows.conf就可以看到如下信息,代表redis客户端已经启动了,
如果要练习客户端命令等,需要另外启动一个命令窗口,同样在同一个目录里,输入命令Redis-cli.exe就可以连接上了,如下:
redis是一种高级的key:value存储系统,其中value支持五种数据类型:
1.字符串(strings)
2.字符串列表(lists)
3.字符串集合(sets)
4.有序字符串集合(sorted sets)
5.哈希(hashes)
1.key不要太长,尽量不要超过1024字节,这不仅消耗内存,而且会降低查找的效率;
2.key也不要太短,太短的话,key的可读性会降低;
3.在一个项目中,key最好使用统一的命名模式。
redis的字符串类型:String类型,如果redis不使用持久化那么它的功能就会跟memcached比较像。
String 类型是比较简单的类型,所以可以把它用来存储图片的二进制信息等等。
String类型也可以进行数值操作如下:
在遇到数值操作时,redis会将字符串类型转换成数值。
由于INCR等指令本身就具有原子操作的特性,所以我们完全可以利用redis的INCR、INCRBY、DECR、DECRBY等指令来实现原子计数的效果,假如,在某种场景下有3个客户端同时读取了mynum的值(值为2),然后对其同时进行了加1的操作,那么,最后mynum的值一定是5。不少网站都利用redis的这个特性来实现业务上的统计计数需求。
lists类型:
redis底层使用的是list列表是链表类型而不是数组类型,在进行插入的时候比较简单,但是在进行定位的时候,时间复杂度会比较高。lists的常用操作包括LPUSH、RPUSH、LRANGE等。我们可以用LPUSH在lists的左侧插入一个新元素,用RPUSH在lists的右侧插入一个新元素,用LRANGE命令从lists中指定一个范围来提取元素。如下图:
1.我们可以利用lists来实现一个消息队列,而且可以确保先后顺序,不必像MySQL那样还需要通过ORDER BY来进行排序。
2.利用LRANGE还可以很方便的实现分页的功能。
3.在博客系统中,每片博文的评论也可以存入一个单独的list中。
redis数据结构 – 集合
redis的集合,是一种无序的集合,集合中的元素没有先后顺序。集合相关的操作也很丰富,如添加新元素、删除已有元素、取交集、取并集、取差集等。
redis数据结构 – 有序集合
有序集合中的每个元素都关联一个序号(score),这便是排序的依据。
我们都将redis中的有序集合叫做zsets,这是因为在redis中,有序集合相关的操作指令都是以z开头的,比如zrange、zadd、zrevrange、zrangebyscore等等
redis数据结构 – 哈希
hashes存的是字符串和字符串值之间的映射,比如一个用户要存储其全名、姓氏、年龄等等,就很适合使用哈希
redis的持久化的两种方式:RDB(Redis DataBase)和AOF(Append Only File)
RDB,简而言之,就是在不同的时间点,将redis存储的数据生成快照并存储到磁盘等介质上;
AOF,则是换了一个角度来实现持久化,那就是将redis执行过的所有写指令记录下来,在下次redis重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复了。
RDB持久化:
RDB方式,是将redis某一时刻的数据持久化到磁盘中,是一种快照式的持久化方法。
redis在进行数据持久化的过程中,会先将数据写入到一个临时文件中,待持久化过程都结束了,才会用这个临时文件替换上次持久化好的文件。正是这种特性,让我们可以随时来进行备份,因为快照文件总是完整可用的。
对于RDB方式,redis会单独创建(fork)一个子进程来进行持久化,而主进程是不会进行任何IO操作的,这样就确保了redis极高的性能。
如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。
虽然RDB有不少优点,但它的缺点也是不容忽视的。如果你对数据的完整性非常敏感,那么RDB方式就不太适合你,因为即使你每5分钟都持久化一次,当redis故障时,仍然会有近5分钟的数据丢失。所以,redis还提供了另一种持久化方式,那就是AOF。
AOF方式:
AOF方式是将执行过的写指令记录下来,在数据恢复时按照从前到后的顺序再将指令都执行一遍。
我们通过配置redis.conf中的appendonly yes就可以打开AOF功能。如果有写操作(如SET等),redis就会被追加到AOF文件的末尾。
默认的AOF持久化策略是每秒钟fsync一次(fsync是指把缓存中的写指令记录到磁盘中),因为在这种情况下,redis仍然可以保持很好的处理性能,即使redis故障,也只会丢失最近1秒钟的数据。
因为采用了追加方式,如果不做任何处理的话,AOF文件会变得越来越大,为此,redis提供了AOF文件重写(rewrite)机制,即当AOF文件的大小超过所设定的阈值时,redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集。举个例子或许更形象,假如我们调用了100次INCR指令,在AOF文件中就要存储100条指令,但这明显是很低效的,完全可以把这100条指令合并成一条SET指令,这就是重写机制的原理。
在进行AOF重写时,仍然是采用先写临时文件,全部完成后再替换的流程,所以断电、磁盘满等问题都不会影响AOF文件的可用性,这点大家可以放心。
AOF方式的另一个好处,我们通过一个“场景再现”来说明。某同学在操作redis时,不小心执行了FLUSHALL,导致redis内存中的数据全部被清空了,这是很悲剧的事情。不过这也不是世界末日,只要redis配置了AOF持久化方式,且AOF文件还没有被重写(rewrite),我们就可以用最快的速度暂停redis并编辑AOF文件,将最后一行的FLUSHALL命令删除,然后重启redis,就可以恢复redis的所有数据到FLUSHALL之前的状态了。是不是很神奇,这就是AOF持久化方式的好处之一。但是如果AOF文件已经被重写了,那就无法通过这种方法来恢复数据了。
虽然优点多多,但AOF方式也同样存在缺陷,比如在同样数据规模的情况下,AOF文件要比RDB文件的体积大。而且,AOF方式的恢复速度也要慢于RDB方式。
如果你直接执行BGREWRITEAOF命令,那么redis会生成一个全新的AOF文件,其中便包括了可以恢复现有数据的最少的命令集。
如果运气比较差,AOF文件出现了被写坏的情况,也不必过分担忧,redis并不会贸然加载这个有问题的AOF文件,而是报错退出。这时可以通过以下步骤来修复出错的文件:
1.备份被写坏的AOF文件
2.运行redis-check-aof –fix进行修复
3.用diff -u来看下两个文件的差异,确认问题点
4.重启redis,加载修复后的AOF文件
AOF重写的内部运行原理,我们有必要了解一下。
在重写即将开始之际,redis会创建(fork)一个“重写子进程”,这个子进程会首先读取现有的AOF文件,并将其包含的指令进行分析压缩并写入到一个临时文件中。
与此同时,主工作进程会将新接收到的写指令一边累积到内存缓冲区中,一边继续写入到原有的AOF文件中,这样做是保证原有的AOF文件的可用性,避免在重写过程中出现意外。
当“重写子进程”完成重写工作后,它会给父进程发一个信号,父进程收到信号后就会将内存中缓存的写指令追加到新AOF文件中。
当追加结束后,redis就会用新AOF文件来代替旧AOF文件,之后再有新的写指令,就都会追加到新的AOF文件中了。
对于我们应该选择RDB还是AOF,官方的建议是两个同时使用。这样可以提供更可靠的持久化方案。
主从 – 用法
主从结构,一是为了纯粹的冗余备份,二是为了提升读性能,比如很消耗性能的SORT就可以由从服务器来承担。
redis的主从同步是异步进行的,这意味着主从同步不会影响主逻辑,也不会降低redis的处理性能。
主从架构中,可以考虑关闭主服务器的数据持久化功能,只让从服务器进行持久化,这样可以提高主服务器的处理性能。
从服务器会向主服务器发出SYNC指令,当主服务器接到此命令后,就会调用BGSAVE指令来创建一个子进程专门进行数据持久化工作,也就是将主服务器的数据写入RDB文件中。在数据持久化期间,主服务器将执行的写指令都缓存在内存中。
在BGSAVE指令执行完成后,主服务器会将持久化好的RDB文件发送给从服务器,从服务器接到此文件后会将其存储到磁盘上,然后再将其读取到内存中。这个动作完成后,主服务器会将这段时间缓存的写指令再以redis协议的格式发送给从服务器。
另外,要说的一点是,即使有多个从服务器同时发来SYNC指令,主服务器也只会执行一次BGSAVE,然后把持久化好的RDB文件发给多个下游。在redis2.8版本之前,如果从服务器与主服务器因某些原因断开连接的话,都会进行一次主从之间的全量的数据同步;而在2.8版本之后,redis支持了效率更高的增量同步策略,这大大降低了连接断开的恢复成本。
主服务器会在内存中维护一个缓冲区,缓冲区中存储着将要发给从服务器的内容。从服务器在与主服务器出现网络瞬断之后,从服务器会尝试再次与主服务器连接,一旦连接成功,从服务器就会把“希望同步的主服务器ID”和“希望请求的数据的偏移位置(replication offset)”发送出去。主服务器接收到这样的同步请求后,首先会验证主服务器ID是否和自己的ID匹配,其次会检查“请求的偏移位置”是否存在于自己的缓冲区中,如果两者都满足的话,主服务器就会向从服务器发送增量内容。
增量同步功能,需要服务器端支持全新的PSYNC指令。这个指令,只有在redis-2.8之后才具有。
在聊redis事务处理之前,要先和大家介绍四个redis指令,即MULTI、EXEC、DISCARD、WATCH。这四个指令构成了redis事务处理的基础。
1.MULTI用来组装一个事务;
2.EXEC用来执行一个事务;
3.DISCARD用来取消一个事务;
4.WATCH用来监视一些key,一旦这些key在事务执行之前被改变,则取消事务的执行。
我们看到了QUEUED的字样,这表示我们在用MULTI组装事务时,每一个命令都会进入到内存队列中缓存起来,如果出现QUEUED则表示我们这个命令成功插入了缓存队列,在将来执行EXEC时,这些被QUEUED的命令都会被组装成一个事务来执行。
对于事务的执行来说,如果redis开启了AOF持久化的话,那么一旦事务被成功执行,事务中的命令就会通过write命令一次性写到磁盘中去,如果在向磁盘中写的过程中恰好出现断电、硬件故障等问题,那么就可能出现只有部分命令进行了AOF持久化,这时AOF文件就会出现不完整的情况,这时,我们可以使用redis-check-aof工具来修复这一问题,这个工具会将AOF文件中不完整的信息移除,确保AOF文件完整可用。
1.调用EXEC之前的错误
2.调用EXEC之后的错误
调用EXEC之前的错误redis拒绝执行
调用EXEC之后的错误redis不会理睬这些错误,而是继续向下执行事务中的其他命令
WATCH相当于乐观锁:
WATCH本身的作用是“监视key是否被改动过”,而且支持同时监视多个key,只要还没真正触发事务,WATCH都会尽职尽责的监视,一旦发现某个key被修改了,在执行EXEC时就会返回nil,表示事务无法触发
redis的使用命令:
三、常用命令
1)连接操作命令
quit:关闭连接(connection)
auth:简单密码认证
help cmd: 查看cmd帮助,例如:help quit
2)持久化
save:将数据同步保存到磁盘
bgsave:将数据异步保存到磁盘
lastsave:返回上次成功将数据保存到磁盘的Unix时戳
shundown:将数据同步保存到磁盘,然后关闭服务
3)远程服务控制
info:提供服务器的信息和统计
monitor:实时转储收到的请求
slaveof:改变复制策略设置
config:在运行时配置Redis服务器
4)对value操作的命令
exists(key):确认一个key是否存在
del(key):删除一个key
type(key):返回值的类型
keys(pattern):返回满足给定pattern的所有key
randomkey:随机返回key空间的一个
keyrename(oldname, newname):重命名key
dbsize:返回当前数据库中key的数目
expire:设定一个key的活动时间(s)
ttl:获得一个key的活动时间
select(index):按索引查询
move(key, dbindex):移动当前数据库中的key到dbindex数据库
flushdb:删除当前选择数据库中的所有key
flushall:删除所有数据库中的所有key
5)String
set(key, value):给数据库中名称为key的string赋予值value
get(key):返回数据库中名称为key的string的value
getset(key, value):给名称为key的string赋予上一次的value
mget(key1, key2,…, key N):返回库中多个string的value
setnx(key, value):添加string,名称为key,值为value
setex(key, time, value):向库中添加string,设定过期时间time
mset(key N, value N):批量设置多个string的值
msetnx(key N, value N):如果所有名称为key i的string都不存在
incr(key):名称为key的string增1操作
incrby(key, integer):名称为key的string增加integer
decr(key):名称为key的string减1操作
decrby(key, integer):名称为key的string减少integer
append(key, value):名称为key的string的值附加value
substr(key, start, end):返回名称为key的string的value的子串
6)List
rpush(key, value):在名称为key的list尾添加一个值为value的元素
lpush(key, value):在名称为key的list头添加一个值为value的 元素
llen(key):返回名称为key的list的长度
lrange(key, start, end):返回名称为key的list中start至end之间的元素
ltrim(key, start, end):截取名称为key的list
lindex(key, index):返回名称为key的list中index位置的元素
lset(key, index, value):给名称为key的list中index位置的元素赋值
lrem(key, count, value):删除count个key的list中值为value的元素
lpop(key):返回并删除名称为key的list中的首元素
rpop(key):返回并删除名称为key的list中的尾元素
blpop(key1, key2,… key N, timeout):lpop命令的block版本。
brpop(key1, key2,… key N, timeout):rpop的block版本。
rpoplpush(srckey, dstkey):返回并删除名称为srckey的list的尾元素,
并将该元素添加到名称为dstkey的list的头部
7)Set
sadd(key, member):向名称为key的set中添加元素member
srem(key, member) :删除名称为key的set中的元素member
spop(key) :随机返回并删除名称为key的set中一个元素
smove(srckey, dstkey, member) :移到集合元素
scard(key) :返回名称为key的set的基数
sismember(key, member) :member是否是名称为key的set的元素
sinter(key1, key2,…key N) :求交集
sinterstore(dstkey, (keys)) :求交集并将交集保存到dstkey的集合
sunion(key1, (keys)) :求并集
sunionstore(dstkey, (keys)) :求并集并将并集保存到dstkey的集合
sdiff(key1, (keys)) :求差集
sdiffstore(dstkey, (keys)) :求差集并将差集保存到dstkey的集合
smembers(key) :返回名称为key的set的所有元素
srandmember(key) :随机返回名称为key的set的一个元素
8)Hash
hset(key, field, value):向名称为key的hash中添加元素field
hget(key, field):返回名称为key的hash中field对应的value
hmget(key, (fields)):返回名称为key的hash中field i对应的value
hmset(key, (fields)):向名称为key的hash中添加元素field
hincrby(key, field, integer):将名称为key的hash中field的value增加integer
hexists(key, field):名称为key的hash中是否存在键为field的域
hdel(key, field):删除名称为key的hash中键为field的域
hlen(key):返回名称为key的hash中元素个数
hkeys(key):返回名称为key的hash中所有键
hvals(key):返回名称为key的hash中所有键对应的value
hgetall(key):返回名称为key的hash中所有的键(field)及其对应的value
在java中配置使用:
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
*
* @author Administrator
*
*/
public class JRedisClient {
private static String RESULT_OK = "OK";
private static Set<HostAndPort> clusterNodes = null;
private static JedisPoolConfig config = null;
private static JedisCluster jedisCluster=null;
private JRedisClient() {
super();
}
private static JRedisClient jredisClient=null;
public static synchronized JRedisClient getInstance() {
if (jredisClient == null) {
jredisClient = new JRedisClient();
jredisClient.clusterInit();
}
return jredisClient;
}
/**
* @param args
*/
public static void main(String[] args) {
JRedisClient jredisClient =JRedisClient.getInstance();
jredisClient.set("test1", "hello world1");
jredisClient.set("test2", "hello world2");
jredisClient.set("test3", "hello world3");
jredisClient.set("test4", "hello world4");
}
/**
* 配置Redis分布式节点
*/
private void genClusterNode() {
clusterNodes = new HashSet<HostAndPort>();
clusterNodes.add(new HostAndPort("192.168.1.8", 7030));
clusterNodes.add(new HostAndPort("192.168.1.8", 7031));
clusterNodes.add(new HostAndPort("192.168.1.8", 7032));
clusterNodes.add(new HostAndPort("192.168.1.8", 7033));
clusterNodes.add(new HostAndPort("192.168.1.8", 7034));
clusterNodes.add(new HostAndPort("192.168.1.8", 7035));
// clusterNodes.add(new HostAndPort("192.168.199.95", 7021));
// clusterNodes.add(new HostAndPort("192.168.199.95", 7022));
// clusterNodes.add(new HostAndPort("192.168.199.95", 7023));
// clusterNodes.add(new HostAndPort("192.168.199.95", 7024));
// clusterNodes.add(new HostAndPort("192.168.199.95", 7025));
// clusterNodes.add(new HostAndPort("192.168.199.95", 7026));
}
/**
* 实列化JRedis 线程池相关配置
*/
private void genJedisConfig() {
config = new JedisPoolConfig();
//pool 最大线程数
config.setMaxTotal(100000);
// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
config.setMaxIdle(100);
//表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWaitMillis(180);
// 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true);
}
/**
* JRedis分布式初始化
*/
private void clusterInit() {
if(jedisCluster == null){
/* 配置Redis分布式节点 **/
genClusterNode();
/* 实列化JRedis 线程池相关配置 **/
genJedisConfig();
jedisCluster = new JedisCluster(clusterNodes, 5000, config);
}
}
/**
* 字符串类型数据存储
* @param key
* @param value
* @return
*/
public boolean set(String key, String value)
{
return this.set(key,0,value);
}
/**
* 字符串数据类型存储并设置有效期
* @param key
* @param value
* @return
*/
public boolean set(String key, int seconds,String value)
{
boolean isOk = false;
String returnResult = "";
if(seconds>0){
returnResult= jedisCluster.setex(key, seconds, value);
}else{
returnResult = jedisCluster.set(key, value);
}
if(RESULT_OK.equals(returnResult)){
isOk = true;
}
return isOk;
}
/**
* 获取存储的字符串类型数据
* @param key
* @return
*/
public String get(String key) {
return jedisCluster.get(key);
}
/**
* Map<String,String>类型数据存储
* @param key
* @param value
* @return
*/
public boolean hmset(String key, Map<String,String> hashMap){
if(hashMap.keySet().size() > 0) {
return this.hmset(key,0, hashMap);
}else {
return false;
}
}
/**
* Map<String,String>类型数据存储,并设置有效期
* @param key
* @param value
* @return
*/
public boolean hmset(String key,int seconds, Map<String,String> hashMap){
boolean isOk = false;
String returnResult = "";
returnResult = jedisCluster.hmset(key, hashMap);
if(seconds>0){
this.expire(key, seconds);
}
if(RESULT_OK.equals(returnResult)){
isOk = true;
}
return isOk;
}
/**
* 获取Map中的指定KEY值
* @param key
* @param field
* @return String
*/
public String hget(String key,String field){
return jedisCluster.hget(key, field);
}
/**
* 获取指定key中Map
* @param key
* @return Map
*/
public Map hget(String key){
return jedisCluster.hgetAll(key);
}
/**
* 计数递增
* @param key
* @return
*/
public long incr(String key){
return jedisCluster.incr(key);
}
/**
* 计数递减
* @param key
* @return
*/
public long decr(String key){
return jedisCluster.decr(key);
}
/**
* 设置KEY的有效期
* @param key
* @param seconds
* @return
*/
public long expire(String key,int seconds){
return jedisCluster.expire(key, seconds);
}
/**
* 删除指定KEY
* @param key
* @return
*/
public long del(String key){
return jedisCluster.del(key);
}
/**
* 存储List<String>数据
* @param key
* @param seconds
* @param list
*/
public long list(String key,List<String> list){
return this.list(key, 0, list);
}
/**
* 存储List<String>数据并设置有效期
* @param key
* @param seconds
* @param list
*/
public long list(String key,int seconds,List<String> list){
long returnResult = 0;
String[] strArray = null;
if(list!=null&&!list.isEmpty()){
strArray = new String[list.size()];
for(int i=0;i<list.size();i++){
strArray[i]=list.get(i).toString();
}
}
returnResult = jedisCluster.lpush(key,strArray);
if(seconds>0){
this.expire(key, seconds);
}
return returnResult;
}
/**
* 获取List<String>
* @param key
* @param start
* @param end
* @return
*/
public List<String> lrange(String key,long start,long end){
return jedisCluster.lrange(key, start, end);
}
/**
* KEYS 获取String数据类型的值
* @param pattern
* @return
*/
public TreeSet<String> keys(String pattern){
TreeSet<String> keys = new TreeSet<String>();
Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();
for(String k : clusterNodes.keySet()){
JedisPool jp = clusterNodes.get(k);
Jedis connection = jp.getResource();
try {
keys.addAll(connection.keys(pattern));
} catch(Exception e){
e.printStackTrace();
} finally{
connection.close();//用完一定要close这个链接!!!
}
}
return keys;
}
}
使用:
public void LoadGoods() {
ArrayList<GoodsBook> books = mBookManager.getGoods();
HashMap<String, String> map = new HashMap<String, String>();
for (GoodsBook book : books) {
JSONArray jsonArray = JSONArray.fromObject(book);
String bkString = jsonArray.toString();
map.put(book.getId(), bkString);
}
mJRedisClient.hmset("books", map);
}