redis学习

在配置好redis集群后启动redis服务器,使用redis-server.exe redis.windows.conf就可以看到如下信息,代表redis客户端已经启动了,

redis学习

如果要练习客户端命令等,需要另外启动一个命令窗口,同样在同一个目录里,输入命令Redis-cli.exe就可以连接上了,如下:

redis学习


redis是一种高级的key:value存储系统,其中value支持五种数据类型:

1.字符串(strings)
2.字符串列表(lists)
3.字符串集合(sets)
4.有序字符串集合(sorted sets)

5.哈希(hashes)

1.key不要太长,尽量不要超过1024字节,这不仅消耗内存,而且会降低查找的效率;
2.key也不要太短,太短的话,key的可读性会降低;

3.在一个项目中,key最好使用统一的命名模式。

redis的字符串类型:String类型,如果redis不使用持久化那么它的功能就会跟memcached比较像。

String 类型是比较简单的类型,所以可以把它用来存储图片的二进制信息等等。

String类型也可以进行数值操作如下:

redis学习

在遇到数值操作时,redis会将字符串类型转换成数值。

由于INCR等指令本身就具有原子操作的特性,所以我们完全可以利用redis的INCR、INCRBY、DECR、DECRBY等指令来实现原子计数的效果,假如,在某种场景下有3个客户端同时读取了mynum的值(值为2),然后对其同时进行了加1的操作,那么,最后mynum的值一定是5。不少网站都利用redis的这个特性来实现业务上的统计计数需求。

lists类型:

redis底层使用的是list列表是链表类型而不是数组类型,在进行插入的时候比较简单,但是在进行定位的时候,时间复杂度会比较高。lists的常用操作包括LPUSH、RPUSH、LRANGE等。我们可以用LPUSH在lists的左侧插入一个新元素,用RPUSH在lists的右侧插入一个新元素,用LRANGE命令从lists中指定一个范围来提取元素。如下图:

redis学习

1.我们可以利用lists来实现一个消息队列,而且可以确保先后顺序,不必像MySQL那样还需要通过ORDER BY来进行排序。
2.利用LRANGE还可以很方便的实现分页的功能。

3.在博客系统中,每片博文的评论也可以存入一个单独的list中。

redis数据结构 – 集合

redis的集合,是一种无序的集合,集合中的元素没有先后顺序。集合相关的操作也很丰富,如添加新元素、删除已有元素、取交集、取并集、取差集等。

redis学习

redis数据结构 – 有序集合

有序集合中的每个元素都关联一个序号(score),这便是排序的依据。

我们都将redis中的有序集合叫做zsets,这是因为在redis中,有序集合相关的操作指令都是以z开头的,比如zrange、zadd、zrevrange、zrangebyscore等等

redis学习

redis数据结构 – 哈希

hashes存的是字符串和字符串值之间的映射,比如一个用户要存储其全名、姓氏、年龄等等,就很适合使用哈希

redis学习

redis的持久化的两种方式:RDB(Redis DataBase)和AOF(Append Only File)

RDB,简而言之,就是在不同的时间点,将redis存储的数据生成快照并存储到磁盘等介质上;

AOF,则是换了一个角度来实现持久化,那就是将redis执行过的所有写指令记录下来,在下次redis重新启动时,只要把这些写指令从前到后再重复执行一遍,就可以实现数据恢复了。

RDB持久化:

RDB方式,是将redis某一时刻的数据持久化到磁盘中,是一种快照式的持久化方法。

redis在进行数据持久化的过程中,会先将数据写入到一个临时文件中,待持久化过程都结束了,才会用这个临时文件替换上次持久化好的文件。正是这种特性,让我们可以随时来进行备份,因为快照文件总是完整可用的。

对于RDB方式,redis会单独创建(fork)一个子进程来进行持久化,而主进程是不会进行任何IO操作的,这样就确保了redis极高的性能。

如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。

虽然RDB有不少优点,但它的缺点也是不容忽视的。如果你对数据的完整性非常敏感,那么RDB方式就不太适合你,因为即使你每5分钟都持久化一次,当redis故障时,仍然会有近5分钟的数据丢失。所以,redis还提供了另一种持久化方式,那就是AOF。

AOF方式:

AOF方式是将执行过的写指令记录下来,在数据恢复时按照从前到后的顺序再将指令都执行一遍。

我们通过配置redis.conf中的appendonly yes就可以打开AOF功能。如果有写操作(如SET等),redis就会被追加到AOF文件的末尾。

默认的AOF持久化策略是每秒钟fsync一次(fsync是指把缓存中的写指令记录到磁盘中),因为在这种情况下,redis仍然可以保持很好的处理性能,即使redis故障,也只会丢失最近1秒钟的数据。

因为采用了追加方式,如果不做任何处理的话,AOF文件会变得越来越大,为此,redis提供了AOF文件重写(rewrite)机制,即当AOF文件的大小超过所设定的阈值时,redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集。举个例子或许更形象,假如我们调用了100次INCR指令,在AOF文件中就要存储100条指令,但这明显是很低效的,完全可以把这100条指令合并成一条SET指令,这就是重写机制的原理。

在进行AOF重写时,仍然是采用先写临时文件,全部完成后再替换的流程,所以断电、磁盘满等问题都不会影响AOF文件的可用性,这点大家可以放心。

AOF方式的另一个好处,我们通过一个“场景再现”来说明。某同学在操作redis时,不小心执行了FLUSHALL,导致redis内存中的数据全部被清空了,这是很悲剧的事情。不过这也不是世界末日,只要redis配置了AOF持久化方式,且AOF文件还没有被重写(rewrite),我们就可以用最快的速度暂停redis并编辑AOF文件,将最后一行的FLUSHALL命令删除,然后重启redis,就可以恢复redis的所有数据到FLUSHALL之前的状态了。是不是很神奇,这就是AOF持久化方式的好处之一。但是如果AOF文件已经被重写了,那就无法通过这种方法来恢复数据了。

虽然优点多多,但AOF方式也同样存在缺陷,比如在同样数据规模的情况下,AOF文件要比RDB文件的体积大。而且,AOF方式的恢复速度也要慢于RDB方式。

如果你直接执行BGREWRITEAOF命令,那么redis会生成一个全新的AOF文件,其中便包括了可以恢复现有数据的最少的命令集。

如果运气比较差,AOF文件出现了被写坏的情况,也不必过分担忧,redis并不会贸然加载这个有问题的AOF文件,而是报错退出。这时可以通过以下步骤来修复出错的文件:

1.备份被写坏的AOF文件
2.运行redis-check-aof –fix进行修复
3.用diff -u来看下两个文件的差异,确认问题点
4.重启redis,加载修复后的AOF文件

聊聊redis持久化 – AOF重写

AOF重写的内部运行原理,我们有必要了解一下。

在重写即将开始之际,redis会创建(fork)一个“重写子进程”,这个子进程会首先读取现有的AOF文件,并将其包含的指令进行分析压缩并写入到一个临时文件中。

与此同时,主工作进程会将新接收到的写指令一边累积到内存缓冲区中,一边继续写入到原有的AOF文件中,这样做是保证原有的AOF文件的可用性,避免在重写过程中出现意外。

当“重写子进程”完成重写工作后,它会给父进程发一个信号,父进程收到信号后就会将内存中缓存的写指令追加到新AOF文件中。

当追加结束后,redis就会用新AOF文件来代替旧AOF文件,之后再有新的写指令,就都会追加到新的AOF文件中了。

对于我们应该选择RDB还是AOF,官方的建议是两个同时使用。这样可以提供更可靠的持久化方案。

主从 – 用法

主从结构,一是为了纯粹的冗余备份,二是为了提升读性能,比如很消耗性能的SORT就可以由从服务器来承担。

redis的主从同步是异步进行的,这意味着主从同步不会影响主逻辑,也不会降低redis的处理性能。

主从架构中,可以考虑关闭主服务器的数据持久化功能,只让从服务器进行持久化,这样可以提高主服务器的处理性能。

从服务器会向主服务器发出SYNC指令,当主服务器接到此命令后,就会调用BGSAVE指令来创建一个子进程专门进行数据持久化工作,也就是将主服务器的数据写入RDB文件中。在数据持久化期间,主服务器将执行的写指令都缓存在内存中。

在BGSAVE指令执行完成后,主服务器会将持久化好的RDB文件发送给从服务器,从服务器接到此文件后会将其存储到磁盘上,然后再将其读取到内存中。这个动作完成后,主服务器会将这段时间缓存的写指令再以redis协议的格式发送给从服务器。

另外,要说的一点是,即使有多个从服务器同时发来SYNC指令,主服务器也只会执行一次BGSAVE,然后把持久化好的RDB文件发给多个下游。在redis2.8版本之前,如果从服务器与主服务器因某些原因断开连接的话,都会进行一次主从之间的全量的数据同步;而在2.8版本之后,redis支持了效率更高的增量同步策略,这大大降低了连接断开的恢复成本。

主服务器会在内存中维护一个缓冲区,缓冲区中存储着将要发给从服务器的内容。从服务器在与主服务器出现网络瞬断之后,从服务器会尝试再次与主服务器连接,一旦连接成功,从服务器就会把“希望同步的主服务器ID”和“希望请求的数据的偏移位置(replication offset)”发送出去。主服务器接收到这样的同步请求后,首先会验证主服务器ID是否和自己的ID匹配,其次会检查“请求的偏移位置”是否存在于自己的缓冲区中,如果两者都满足的话,主服务器就会向从服务器发送增量内容。

增量同步功能,需要服务器端支持全新的PSYNC指令。这个指令,只有在redis-2.8之后才具有。

在聊redis事务处理之前,要先和大家介绍四个redis指令,即MULTI、EXEC、DISCARD、WATCH。这四个指令构成了redis事务处理的基础。

1.MULTI用来组装一个事务;
2.EXEC用来执行一个事务;
3.DISCARD用来取消一个事务;

4.WATCH用来监视一些key,一旦这些key在事务执行之前被改变,则取消事务的执行。

redis学习

我们看到了QUEUED的字样,这表示我们在用MULTI组装事务时,每一个命令都会进入到内存队列中缓存起来,如果出现QUEUED则表示我们这个命令成功插入了缓存队列,在将来执行EXEC时,这些被QUEUED的命令都会被组装成一个事务来执行。

对于事务的执行来说,如果redis开启了AOF持久化的话,那么一旦事务被成功执行,事务中的命令就会通过write命令一次性写到磁盘中去,如果在向磁盘中写的过程中恰好出现断电、硬件故障等问题,那么就可能出现只有部分命令进行了AOF持久化,这时AOF文件就会出现不完整的情况,这时,我们可以使用redis-check-aof工具来修复这一问题,这个工具会将AOF文件中不完整的信息移除,确保AOF文件完整可用。

1.调用EXEC之前的错误

2.调用EXEC之后的错误

调用EXEC之前的错误redis拒绝执行

调用EXEC之后的错误redis不会理睬这些错误,而是继续向下执行事务中的其他命令

WATCH相当于乐观锁:

WATCH本身的作用是“监视key是否被改动过”,而且支持同时监视多个key,只要还没真正触发事务,WATCH都会尽职尽责的监视,一旦发现某个key被修改了,在执行EXEC时就会返回nil,表示事务无法触发

redis学习

redis的使用命令:

三、常用命令
    1)连接操作命令
    quit:关闭连接(connection)
    auth:简单密码认证
    help cmd: 查看cmd帮助,例如:help quit
    
    2)持久化
    save:将数据同步保存到磁盘
    bgsave:将数据异步保存到磁盘
    lastsave:返回上次成功将数据保存到磁盘的Unix时戳
    shundown:将数据同步保存到磁盘,然后关闭服务
    
    3)远程服务控制
    info:提供服务器的信息和统计
    monitor:实时转储收到的请求
    slaveof:改变复制策略设置
    config:在运行时配置Redis服务器
    
    4)对value操作的命令
    exists(key):确认一个key是否存在
    del(key):删除一个key
    type(key):返回值的类型
    keys(pattern):返回满足给定pattern的所有key
    randomkey:随机返回key空间的一个
    keyrename(oldname, newname):重命名key
    dbsize:返回当前数据库中key的数目
    expire:设定一个key的活动时间(s)
    ttl:获得一个key的活动时间
    select(index):按索引查询
    move(key, dbindex):移动当前数据库中的key到dbindex数据库
    flushdb:删除当前选择数据库中的所有key
    flushall:删除所有数据库中的所有key
    
    5)String
    set(key, value):给数据库中名称为key的string赋予值value
    get(key):返回数据库中名称为key的string的value
    getset(key, value):给名称为key的string赋予上一次的value
    mget(key1, key2,…, key N):返回库中多个string的value
    setnx(key, value):添加string,名称为key,值为value
    setex(key, time, value):向库中添加string,设定过期时间time
    mset(key N, value N):批量设置多个string的值
    msetnx(key N, value N):如果所有名称为key i的string都不存在
    incr(key):名称为key的string增1操作
    incrby(key, integer):名称为key的string增加integer
    decr(key):名称为key的string减1操作
    decrby(key, integer):名称为key的string减少integer
    append(key, value):名称为key的string的值附加value
    substr(key, start, end):返回名称为key的string的value的子串
    
    6)List 
    rpush(key, value):在名称为key的list尾添加一个值为value的元素
    lpush(key, value):在名称为key的list头添加一个值为value的 元素
    llen(key):返回名称为key的list的长度
    lrange(key, start, end):返回名称为key的list中start至end之间的元素
    ltrim(key, start, end):截取名称为key的list
    lindex(key, index):返回名称为key的list中index位置的元素
    lset(key, index, value):给名称为key的list中index位置的元素赋值
    lrem(key, count, value):删除count个key的list中值为value的元素
    lpop(key):返回并删除名称为key的list中的首元素
    rpop(key):返回并删除名称为key的list中的尾元素
    blpop(key1, key2,… key N, timeout):lpop命令的block版本。
    brpop(key1, key2,… key N, timeout):rpop的block版本。
    rpoplpush(srckey, dstkey):返回并删除名称为srckey的list的尾元素,

              并将该元素添加到名称为dstkey的list的头部
    
    7)Set
    sadd(key, member):向名称为key的set中添加元素member
    srem(key, member) :删除名称为key的set中的元素member
    spop(key) :随机返回并删除名称为key的set中一个元素
    smove(srckey, dstkey, member) :移到集合元素
    scard(key) :返回名称为key的set的基数
    sismember(key, member) :member是否是名称为key的set的元素
    sinter(key1, key2,…key N) :求交集
    sinterstore(dstkey, (keys)) :求交集并将交集保存到dstkey的集合
    sunion(key1, (keys)) :求并集
    sunionstore(dstkey, (keys)) :求并集并将并集保存到dstkey的集合
    sdiff(key1, (keys)) :求差集
    sdiffstore(dstkey, (keys)) :求差集并将差集保存到dstkey的集合
    smembers(key) :返回名称为key的set的所有元素
    srandmember(key) :随机返回名称为key的set的一个元素
    
    8)Hash
    hset(key, field, value):向名称为key的hash中添加元素field
    hget(key, field):返回名称为key的hash中field对应的value
    hmget(key, (fields)):返回名称为key的hash中field i对应的value
    hmset(key, (fields)):向名称为key的hash中添加元素field 
    hincrby(key, field, integer):将名称为key的hash中field的value增加integer
    hexists(key, field):名称为key的hash中是否存在键为field的域
    hdel(key, field):删除名称为key的hash中键为field的域
    hlen(key):返回名称为key的hash中元素个数
    hkeys(key):返回名称为key的hash中所有键
    hvals(key):返回名称为key的hash中所有键对应的value
    hgetall(key):返回名称为key的hash中所有的键(field)及其对应的value


在java中配置使用:

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;


import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


/**
 * 
 * @author Administrator
 *
 */
public class JRedisClient {

    private static String RESULT_OK = "OK";
private static Set<HostAndPort> clusterNodes = null;
private static JedisPoolConfig config = null;
private static JedisCluster jedisCluster=null;

private JRedisClient() {
super();
}  
    private static JRedisClient jredisClient=null;  


    public static synchronized  JRedisClient getInstance() {  
         if (jredisClient == null) {    
        jredisClient = new JRedisClient();  
        jredisClient.clusterInit();
         } 
         return jredisClient;  
    }  

/**
* @param args
*/
public static void main(String[] args) {

JRedisClient jredisClient =JRedisClient.getInstance();

jredisClient.set("test1", "hello world1");
jredisClient.set("test2", "hello world2");
jredisClient.set("test3", "hello world3");
jredisClient.set("test4", "hello world4");
}


/**
* 配置Redis分布式节点
*/
private void genClusterNode() {
    clusterNodes = new HashSet<HostAndPort>();
    clusterNodes.add(new HostAndPort("192.168.1.8", 7030));
    clusterNodes.add(new HostAndPort("192.168.1.8", 7031));
    clusterNodes.add(new HostAndPort("192.168.1.8", 7032));
    clusterNodes.add(new HostAndPort("192.168.1.8", 7033));
    clusterNodes.add(new HostAndPort("192.168.1.8", 7034));
    clusterNodes.add(new HostAndPort("192.168.1.8", 7035));


    
//     clusterNodes.add(new HostAndPort("192.168.199.95", 7021));
//     clusterNodes.add(new HostAndPort("192.168.199.95", 7022));
//     clusterNodes.add(new HostAndPort("192.168.199.95", 7023));
//     clusterNodes.add(new HostAndPort("192.168.199.95", 7024));
//     clusterNodes.add(new HostAndPort("192.168.199.95", 7025));
//     clusterNodes.add(new HostAndPort("192.168.199.95", 7026));
}

/**
* 实列化JRedis 线程池相关配置
*/
private void genJedisConfig() {
config = new JedisPoolConfig();
//pool 最大线程数
     config.setMaxTotal(100000);
     // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
     config.setMaxIdle(100);
     //表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
     config.setMaxWaitMillis(180);
     // 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
     config.setTestOnBorrow(true);
}

    /**
     * JRedis分布式初始化
     */
private void clusterInit() {
if(jedisCluster == null){
/*  配置Redis分布式节点 **/
    genClusterNode();
    /* 实列化JRedis 线程池相关配置 **/
    genJedisConfig();
    
    jedisCluster = new JedisCluster(clusterNodes, 5000, config);
}
}


/**
* 字符串类型数据存储
* @param key
* @param value
* @return
*/
public boolean  set(String key, String value)
    {
return this.set(key,0,value);
}

/**
* 字符串数据类型存储并设置有效期
* @param key
* @param value
* @return
*/
public boolean  set(String key, int seconds,String value)
    {
boolean isOk = false;
String returnResult = "";
if(seconds>0){
returnResult= jedisCluster.setex(key, seconds, value);
}else{
returnResult = jedisCluster.set(key, value);
}
if(RESULT_OK.equals(returnResult)){
isOk = true;
}
return isOk;
}
    
/**
* 获取存储的字符串类型数据
* @param key
* @return
*/
public String get(String key) {
return jedisCluster.get(key);
}

/**
* Map<String,String>类型数据存储
* @param key
* @param value
* @return
*/
public boolean hmset(String key, Map<String,String> hashMap){
if(hashMap.keySet().size() > 0) {
return this.hmset(key,0, hashMap);
}else {
return false;
}
}

/**
* Map<String,String>类型数据存储,并设置有效期
* @param key
* @param value
* @return
*/
public boolean hmset(String key,int seconds, Map<String,String> hashMap){
boolean isOk = false;
String returnResult = "";
returnResult = jedisCluster.hmset(key, hashMap);
if(seconds>0){
this.expire(key, seconds);
}
if(RESULT_OK.equals(returnResult)){
isOk = true;
}
return isOk;
}

/**
* 获取Map中的指定KEY值
* @param key
* @param field
* @return String
*/
public String  hget(String key,String field){
return jedisCluster.hget(key, field);
}

/**
* 获取指定key中Map
* @param key
* @return Map
*/
public Map  hget(String key){
return jedisCluster.hgetAll(key);
}

/**
* 计数递增
* @param key
* @return
*/
public long incr(String key){

return jedisCluster.incr(key);
}

/**
* 计数递减
* @param key
* @return
*/
public long decr(String key){
return jedisCluster.decr(key);
}



/**
* 设置KEY的有效期
* @param key
* @param seconds
* @return
*/
public long expire(String key,int seconds){
return jedisCluster.expire(key, seconds);
}

/**
* 删除指定KEY
* @param key
* @return
*/
public long del(String key){
return jedisCluster.del(key);
}

/**
* 存储List<String>数据
* @param key
* @param seconds
* @param list
*/
public long list(String key,List<String> list){
return this.list(key, 0, list);

}

/**
* 存储List<String>数据并设置有效期
* @param key
* @param seconds
* @param list
*/
public long list(String key,int seconds,List<String> list){
long returnResult = 0;
String[] strArray = null;
if(list!=null&&!list.isEmpty()){
strArray = new String[list.size()];
for(int i=0;i<list.size();i++){
strArray[i]=list.get(i).toString();
}
}
returnResult = jedisCluster.lpush(key,strArray);
if(seconds>0){
this.expire(key, seconds);
}
return returnResult;
}

/**
* 获取List<String>
* @param key
* @param start
* @param end
* @return
*/
public List<String> lrange(String key,long start,long end){
return jedisCluster.lrange(key, start, end);
}

/**
* KEYS 获取String数据类型的值
* @param pattern
* @return
*/
    public TreeSet<String> keys(String pattern){  
        TreeSet<String> keys = new TreeSet<String>();  
        Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();  
        for(String k : clusterNodes.keySet()){  
            JedisPool jp = clusterNodes.get(k);  
            Jedis connection = jp.getResource();  
            try {  
                keys.addAll(connection.keys(pattern));  
            } catch(Exception e){  
            e.printStackTrace();
            } finally{  
                connection.close();//用完一定要close这个链接!!!  
            }  
        }  
        return keys;  
    }  
}



使用:

public void LoadGoods() {
ArrayList<GoodsBook> books = mBookManager.getGoods();
HashMap<String, String> map = new HashMap<String, String>();
for (GoodsBook book : books) {
JSONArray jsonArray = JSONArray.fromObject(book);
String bkString = jsonArray.toString();
map.put(book.getId(), bkString);
}

mJRedisClient.hmset("books", map);

}