应对高并发量的数据汇聚方案
在统计文章的阅读数,点赞量或者高并发量的数据统计解决方案。
基本思路:
汇聚层:
1、希望所有的外部请求都将数据保存到汇聚层就可结束,类似于消息中间件。
汇聚的重点在于数据的可聚合性,比如文章的点赞数量,访问数量等等。
若请求过来的时候,发现汇聚层中有同一篇文章的数据,那么可以直接将此次的数据和已有数据进行结合(相加),那么就可以将相同文章的多次请求在内存中进行逻辑聚合。可以大大的减少数据库持久压力。
2、希望请求过来的时候,有数据的FIFO特性,就是数据的先入先出策略,不能够让一个请求昨天进来,而今天都还没有处理。
请求数量是无限制的,可以设置持久化线程数量,比如2个或者可用cpu数量等等。
具体实现代码为:
package com.hengyunsoft.converge;
import java.util.concurrent.atomic.AtomicInteger;
public class ConvergeEntiy {
private AtomicInteger lock = new AtomicInteger(0);
private AtomicInteger count = new AtomicInteger(0);
private Integer key;
public ConvergeEntiy(int count,Integer key) {
this.count.set(count);
this.key = key;
}
/**
* 对应的key标示
* @return
*/
public Integer getKey() {
return key;
}
/**
* 对此entity加锁,cas操作
* 若次所依据被移除,则将永远不能再进行加锁 {@link #tryRemoveLock(int)} {@link #removeLock()}
* @return 是否加锁成功,若加锁成功则必须配备一个{@link #unlock()}.进行解锁
*
*/
public boolean lock() {
int lockCount = lock.get();
while(lockCount >=0 ) {
if(!lock.compareAndSet(lockCount, lockCount+1)) {
lockCount = lock.get();
} else {
return true;
}
}
return false;
}
/**
* 解锁 与 {@link #lock()} 配对使用
*/
public void unlock() {
lock.decrementAndGet();
}
/**
* 统计当前对象上面有多少锁
* @return
*/
public int lockCount() {
return lock.get();
}
/**
* 尝试移除锁
* @param tryNum 尝试次数
* @return 是否移除成功
*/
public boolean tryRemoveLock(int tryNum) {
for (int i = 0; i < tryNum; i++) {
if(lock.compareAndSet(0, -1)) {
return true;
}
}
return false;
}
/**
* 将保存在此对象上面的值进行累加
* @param num
* @return 累加后的值
*/
public int addCount(int num) {
return count.addAndGet(num);
}
/**
* 移除锁
*/
public void removeLock() {
while(!this.lock.compareAndSet(0, -1)) ;
}
/**
* 获取值
* @return
*/
public int getCount() {
return count.get();
}
}
package com.hengyunsoft.converge;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class ConvergeMap {
private ConcurrentHashMap<Integer, ConvergeEntiy> converge = new ConcurrentHashMap<>();
/**
* 放入数据(就是生产数据)
* @param key 生产者标示,对应的就是将数据累与当前存在的值累加标示
* @param num 累加值
* @return 是与现有数据累加汇聚还是新增。若是累加汇聚就返回false,新增返回true
*/
public boolean produce(Integer key,int num) {
while(true) {
ConvergeEntiy entiy = converge.get(key);
if(entiy == null || !entiy.lock()) {
//没有数据获取或者对已有数据的锁获取失败
//构建新的数据放入聚合map
ConvergeEntiy oldEntity = converge.putIfAbsent(key, new ConvergeEntiy(num,key));
if(oldEntity != null) {
//已经存在,直接在存在的基础之上进行增加
if(oldEntity.lock()) {
//锁获取成功
oldEntity.addCount(num);
oldEntity.unlock();
return false; //聚合
}
//不成功则进行循环
} else {
//
return true; //新增
}
} else {
entiy.addCount(num);
entiy.unlock();
return false;//聚合
}
}
}
/**
* 在指定的keys中找到最容易消费的值进行返回
* 最容易是指在这些keys中,并发最小的
* @param keys
* @return
*/
public KVObject consumer(Integer... keys) {
List<ConvergeEntiy> entitys = new ArrayList<>(keys.length);
for (int i = 0; i < keys.length; i++) {
Integer key = keys[i];
ConvergeEntiy entity = this.converge.get(key);
if(entity != null) {
entitys.add(entity);
}
}
if(entitys.isEmpty()) {
return null;
}
Collections.sort(entitys, (r,j)->{
return Integer.compare(r.lockCount(),j.lockCount());
});
ConvergeEntiy entity = entitys.get(0);
this.converge.remove(entity.getKey()); //取加锁最少的那一个 等待时间就最少
entity.removeLock();
return new KVObject(entity.getKey(), entity.getCount());
}
public int size() {
return converge.size();
}
}
实体类
package com.hengyunsoft.converge;
public class KVObject {
private Integer key;
private Integer value;
public KVObject(Integer key, Integer value) {
super();
this.key = key;
this.value = value;
}
public Integer getKey() {
return key;
}
public Integer getValue() {
return value;
}
@Override
public String toString() {
return "KVObject [key=" + key + ", value=" + value + "]";
}
}
构建一个生产者消费者上下文,使得他们都在一个代码中实现,对外提供两个api接口
package com.hengyunsoft.converge;
import java.util.ArrayList;
import java.util.function.Consumer;
public class ProduceConsumer {
/**
* 保存聚合(汇聚)数据,用map加快数据的检索速度。因为在汇聚的时候,需要将老数据检索出来进行操作。
* 故将大量用到数据的检索。map是o(1)
*/
ConvergeMap convergeMap = new ConvergeMap();
/**
* 保持数据的存储顺序。因为map不具备数据的放入顺序。故用队列来进行数据的顺序记录
* 方便消费的时候满足数据的FIFO(先讲先出)策略。
*/
ProduceQueue produceQueue = new ProduceQueue();
public void increment(Integer key) throws InterruptedException {
if(key == null) {
return ;
}
boolean isNewAdd = convergeMap.produce(key, 1);
if(isNewAdd) {
//是新增数据,而不是在已有的数据之上进行增加
produceQueue.put(key);
}
}
public void consumer(Consumer<KVObject> consumer) throws InterruptedException {
int size = produceQueue.size();
int max = size == 0? 1:size>4?4:size;
ArrayList<Integer> keys = new ArrayList<Integer>(max);
for (int i = 0; i < max; i++) {
keys.add(produceQueue.poll());
}
for (int i = 0; i < max; i++) {
KVObject kv = convergeMap.consumer(keys.toArray(new Integer[0]));
if(kv == null) {
break ;
}
consumer.accept(kv);
keys.remove(kv.getKey());
}
}
public String test() {
return produceQueue.size() + " " + convergeMap.size();
}
}
构建一个消费队列,只是简单的封装了LinkedBlockingQueue,你也可以直接就实现LinkedBlockingQueue而不需要下面这个类。
package com.hengyunsoft.converge;
import java.io.Serializable;
import java.util.concurrent.LinkedBlockingQueue;
public class ProduceQueue implements Serializable {
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
public void put(Integer value) throws InterruptedException {
queue.put(value);
}
public Integer poll() throws InterruptedException {
return queue.take();
}
public int size() {
return queue.size();
}
}
测试类为: 以下测试请求线程18个,而持久消费线程为1个。看看请求线程全部的累计值与消费线程的消费值是否相等。
package poi.test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import com.hengyunsoft.converge.ProduceConsumer;
public class ProduceConsumerTest {
static volatile long consumerSum = 0;
public static void main(String[] args) throws InterruptedException {
final ProduceConsumer produceConsumer = new ProduceConsumer();
ExecutorService consumer = Executors.newSingleThreadExecutor();
final int produceThreadNum = 18;
final CountDownLatch countDownLatch = new CountDownLatch(produceThreadNum);
consumer.execute(()->{
while(true) {
try {
produceConsumer.consumer(r->{
consumerSum += r.getValue();
System.out.println(consumerSum);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
ExecutorService produces = Executors.newFixedThreadPool(produceThreadNum);
final AtomicLong atomicLong = new AtomicLong(0);
for (int i = 0; i < produceThreadNum; i++) {
produces.execute(()->{
int j=10000;
while((--j)>=0) {
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
atomicLong.incrementAndGet();
try {
produceConsumer.increment(ThreadLocalRandom.current().nextInt(10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.countDown();
System.out.println("finish");
});
}
Thread.sleep(10000);
countDownLatch.await();
produces.shutdown();
System.out.println("produceSum "+atomicLong.get());
System.out.println(produceConsumer.test());
System.out.println("consumerSum "+consumerSum);
consumer.shutdown();
}
}