https://juejin.im/book/5afc2e5f6fb9a07a9b362527/
Basic tips:
String
Expand
The capacity will expand as two times speed within
1m
, otherwise it will increase1m
every time until the max capacity(512m
).Example
1
2
3
4
5
6
7
8
9
10
11
12
13> setex name 5 codehole # 5s equals:set+expire
> get name
"codehole"
... # wait for 5s
> get name
(nil)
> incrby age -5
(integer) 31
> set codehole 9223372036854775807 # Long.Max
OK
> incr codehole
(error) ERR increment or decrement would overflow...Tips
1 bit =8> 1 byte =much> string
So 1 bit =much> string, like bitmap structure.
list
Time Complexity
insert and delete O(1) find O(n)
Example :
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17> rpush books python java golang
(integer) 3
> lindex books 1 # O(n)
"java"
> lrange books 0 -1 # get all emelents,O(n)
1) "python"
2) "java"
3) "golang"
> ltrim books 1 -1 # O(n) -1 means the first last position
OK
> lrange books 0 -1
1) "java"
2) "golang"
> ltrim books 1 0 # Actually it empty the whole list
OK
> llen books
(integer) 0...Implement
The list is implemented by a quicklist, which will use ziplist as small size and change to quicklist when the size grow enough.
The quick list link different ziplist by
prev
andnext
as list.
Hash
Application scene
Lock
How do client do when set lock failed?
Throw exception and notify user to try again after a while.
Sleep then try again.
Not for the situation such as the too many message with too many colide cause one dead lock eventually kill the whole threads.
Move the request to dealy queue and try agin
Delay Queue
rpush/lpush
while(pop) {get}
, question is the queue will pop in loop which will cause the performance question.
There are two ways to solve this question:sleep(1)
or blocking read by usingblpop, brpop
.
Howerver, when the connection closed(Server always will close the connection which is idle thread), blpop/brpop will throw exception, so just use it carefully.How to implement the delay queue
message seralize as
zset
value while expire time as the score, then use mulityply thread to get the expire task to handle.value from
zrem
return decide the current instance has caught the task.meanwhile, u must catch the handle_msg to prevent the particular task being processed dead cause the circle exist not as supposed.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import redis.clients.jedis.Jedis;
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}
// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>() {
}.getType();
private Jedis jedis;
private String queueKey;
public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}
public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
task.msg = msg;
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
}
public void loop() {
while (!Thread.interrupted()) {
// 只取一条
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
if (values.isEmpty()) {
try {
Thread.sleep(500); // 歇会继续
} catch (InterruptedException e) {
break;
}
continue;
}
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) { // 抢到了
TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg);
}
}
}
public void handleMsg(T msg) {
System.out.println(msg);
}
public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");
Thread producer = new Thread() {
public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}
};
Thread consumer = new Thread() {
public void run() {
queue.loop();
}
};
producer.start();
consumer.start();
try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}- combine zrangebyscore and zerm as atomic operationa on the server to prevent the waste some threads get task but not competed success by using zrem.
HyperLogLog
UV situation
pfadd
&pfcount
Sparse Matrix converted to Dense Matrix when the data up to the threshold value.
https://juejin.im/book/5afc2e5f6fb9a07a9b362527/section/5b336548e51d4558a426ff56
BloomFiler
Distinct: remove old news which has been read