大数据去除重复--实战(一)
最近快过年了,来了一个紧急任务,加班加点的一周,终于上线了。也没多少时间去研究出去重复数据的算法,上一篇文章的算法,理论是可以的!但是由于我采用的行迭代的方式,JVM 会出现栈的深度溢出,我就换了一种方式,这里再次介绍给大家:
回顾一下题目:超过内存限制的URL,去除重复数据!
我的方法是根据hashCode 范围进行分组。比如文件A,假设有1亿行,A作为原始文件,然后循环读取每一行,根据hashCode 值分成两部分文件。初始我以int 的最大值和最小值为边界,0 作为分界线开始分,hashCode 大于0的放右边A-right,小于0的放一边,A-left
假设第一次分割成 A-right , A-left 两个 文件,范围分别是2^31-0 和 0-2^31,然后记录两个文件hashCode 的范围,作为下一次分割条件,然后继续判断A-right 和A-left 文件是否是内存范围,迭代此方法,直到分割成内存能够读取的文件A-N,然后读入内存进行除重复,追加进汇总文件。
这里临时画个图解释(图有点丑,谅解~。~):
这里我先临时写了一个生成文件的类,数据格式:www.+随即数字+.com 的形式
package com.file;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.Random;
/**
* 生成内存不能一次读取的文件
* @author Ran
*
*/
public class InitFiles {
private final static String PREFIX = "www.";
private final static String SUFFIX = ".com";
private final static Random RANDOM = new Random();
// 每一次写入的行数,方便监控
private final static int STRING_LINE_NUM = 1000*100*4;
// 随即生成的数字 ,假设是网站地址
public static final int RANDOM_NUM = Integer.MAX_VALUE;
// 缓冲区大小
static final int CACHE_SIZE = 1024*1024;
// 生成文件的大小,这里就2G吧
static final long FILE_MAX_LENGTH = 1024*1024*1024*2l;
// 记录总行数
static long file_lines = 0;
public static void main(String[] args) {
// 看看运行状态
printMem();
long begin = System.currentTimeMillis();
writeFile("bigDate.txt");
long result = System.currentTimeMillis()-begin;
System.out.println("总行数"+file_lines);
System.out.println("生成文件总时间:"+result+"毫秒");
}
/**
* 循环写入文件,这里临时写死,生成的大概是1G左右的文件
*/
public static void writeFile(String fileName){
Writer writer = null;
File file = null;
// 停止标志
boolean isWrite = true;
try {
file = new File(fileName);
writer = new BufferedWriter(new FileWriter(file),CACHE_SIZE);
while(isWrite){
writer.write(initStr());
if(file.length() > FILE_MAX_LENGTH){
isWrite = false;
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally{
try {
if(writer != null){
writer.flush();
writer.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 生成字符串,大量字符串 写入,会快一些
* @return
*/
public static String initStr(){
StringBuilder sb = new StringBuilder(1000000);
int i = 0;
// 默认10W行 一次写入
while(i<STRING_LINE_NUM){
sb.append(PREFIX);
sb.append(getContext());
sb.append(SUFFIX);
sb.append("\r\n");
i++;
}
file_lines += i;
return sb.toString();
}
/**
* 生成随机数,作为网站中间地址
* 参数 自己调节,测试方便
* @return
*/
public static String getContext(){
String context = String.valueOf(RANDOM.nextInt(RANDOM_NUM));
return context;
}
/**
* 每次写入时 信息
*/
public static void printMem(){
print("已用内存:"+currRuntime.totalMemory()/1024/1024+" MB");
print("最大内存:"+currRuntime.maxMemory()/1024/1024+" MB");
print("可用内存:"+currRuntime.freeMemory()/1024/1024+" MB");
}
/**
* 方便我测试 是否打印,影响速度
* @param str
*/
public static void print(String str){
System.out.println(str);
}
}
生成2G 文件,我这里花了:
总行数:136800000 行
生成文件总时间:75812毫秒
下面是除去重复的类代码:
package com.files;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PartFile {
// 内存监控
final static Runtime currRuntime = Runtime.getRuntime ();
// 最小的空闲空间,额,可以用来 智能控制,- -考虑到GC ,暂时没用
final static long MEMERY_LIMIT = 1024 * 1024 * 3;
// 内存限制,我内存最多容纳的文件大小
static final long FILE_LIMIT_SIZE = 1024*1024*20;
// 文件写入缓冲区 ,我默认1M
static final int CACHE_SIZE = 1024*1024;
// 默认文件后缀
static final String FILE_SUFFIX = ".txt";
// 临时分割的文件目录,可以删除~。~
static final String FILE_PREFIX = "test/";
// 汇总的文件名
static final String REQUST_FILE_NAME = "resultFile.txt";
// 存放大文件 引用,以及分割位置
static List<ChildFile> bigChildFiles = new ArrayList<ChildFile>();
// 存放小文件的,驱除重复数据
static Map<String,String> fileLinesMap = new HashMap<String,String>(10000);
public static void main(String[] args) {
long begin = System.currentTimeMillis();
new PartFile().partFile(new File("bigData.txt"),
Integer.MAX_VALUE,Integer.MIN_VALUE);
long result = System.currentTimeMillis()-begin;
System.out.println("除去重复时间为:"+result +" 毫秒");
}
// 按hashCode 范围分割
public void partFile(File origFile,long maxNum,long minNum) {
String line = null;
long hashCode = 0;
long max_left_hashCode = 0;
long min_left_hashCode = 0;
long max_right_hashCode = 0;
long min_right_hashCode = 0;
BufferedWriter rightWriter = null;
BufferedWriter leftWriter = null;
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(origFile));
long midNum = (maxNum+minNum)/2;
// 以文件hashCode 范围作为子文件名
File leftFile = new File(FILE_PREFIX+minNum+"_"+midNum+FILE_SUFFIX);
File rightFile = new File(FILE_PREFIX+midNum +"_"+maxNum+FILE_SUFFIX);
leftWriter = new BufferedWriter(new FileWriter(leftFile),CACHE_SIZE);
rightWriter = new BufferedWriter(new FileWriter(rightFile),CACHE_SIZE);
ChildFile leftChild = new ChildFile(leftFile);
ChildFile rightChild = new ChildFile(rightFile);
// 字符串 组合写入也行
// StringBuilder leftStr = new StringBuilder(100000);
// StringBuilder rightStr = new StringBuilder(100000);
// hashCode 的范围作为分割线
while ((line = reader.readLine()) != null) {
hashCode = line.hashCode();
if (hashCode > midNum) {
if(max_right_hashCode < hashCode || max_right_hashCode == 0){
max_right_hashCode = hashCode;
}else if(min_right_hashCode > hashCode || min_right_hashCode == 0){
min_right_hashCode = hashCode;
}
// 按行写入缓存
writeToFile(rightWriter, line);
}else {
if(max_left_hashCode < hashCode || max_left_hashCode == 0){
max_left_hashCode = hashCode;
}else if(min_left_hashCode > hashCode || min_left_hashCode == 0){
min_left_hashCode = hashCode;
}
writeToFile(leftWriter, line);
}
}
// 保存子文件信息
leftChild.setHashCode(min_left_hashCode, max_left_hashCode);
rightChild.setHashCode(min_right_hashCode, max_right_hashCode);
closeWriter(rightWriter);
closeWriter(leftWriter);
closeReader(reader);
// 删除原始文件,保留最原始的文件
if(!origFile.getName().equals("bigData.txt")){
origFile.delete();
}
// 分析子文件信息,是否写入或者迭代
analyseChildFile(rightChild, leftChild);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
// 分析子文件信息
public void analyseChildFile(ChildFile rightChild,ChildFile leftChild){
// 将分割后 还是大于内存的文件保存 继续分割
File rightFile = rightChild.getChildFile();
if(isSurpassFileSize(rightFile)){
bigChildFiles.add(rightChild);
}else if(rightFile.length()>0){
orderAndWriteToFiles(rightFile);
}
File leftFile = leftChild.getChildFile();
if(isSurpassFileSize(leftFile)){
bigChildFiles.add(leftChild);
}else if(leftFile.length()>0){
orderAndWriteToFiles(leftFile);
}
// 未超出直接内存排序,写入文件,超出继续分割,从末尾开始,不易栈深度溢出
if(bigChildFiles.size() > 0 ){
ChildFile e = bigChildFiles.get(bigChildFiles.size()-1);
bigChildFiles.remove(e);
// 迭代分割
partFile(e.getChildFile(), e.getMaxHashCode(), e.getMinHashCode());
}
}
// 将小文件读到内存排序除重复
public void orderAndWriteToFiles(File file){
BufferedReader reader = null;
String line = null;
BufferedWriter totalWriter = null;
StringBuilder sb = new StringBuilder(1000000);
try {
totalWriter = new BufferedWriter(new FileWriter(REQUST_FILE_NAME,true),CACHE_SIZE);
reader = new BufferedReader(new FileReader(file));
while((line = reader.readLine()) != null){
if(!fileLinesMap.containsKey(line)){
fileLinesMap.put(line, null);
sb.append(line+"\r\n");
//totalWriter.write(line+"\r\n");
}
}
totalWriter.write(sb.toString());
fileLinesMap.clear();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}finally{
closeReader(reader);
closeWriter(totalWriter);
// 删除子文件
file.delete();
}
}
// 判断该文件是否超过 内存限制
public boolean isSurpassFileSize(File file){
return FILE_LIMIT_SIZE < file.length();
}
// 将数据写入文件
public void writeToFile(BufferedWriter writer, String writeInfo) {
try {
writer.write(writeInfo+"\r\n");
} catch (IOException e) {
e.printStackTrace();
}
}
// 关闭流
public void closeReader(Reader reader) {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 关闭流
public void closeWriter(Writer writer) {
if (writer != null) {
try {
writer.flush();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 内部类,记录子文件信息
class ChildFile{
// 文件 和 内容 hash 分布
File childFile;
long maxHashCode;
long minHashCode;
public ChildFile(File childFile){
this.childFile = childFile;
}
public ChildFile(File childFile, long maxHashCode, long minHashCode) {
super();
this.childFile = childFile;
this.maxHashCode = maxHashCode;
this.minHashCode = minHashCode;
}
public File getChildFile() {
return childFile;
}
public void setChildFile(File childFile) {
this.childFile = childFile;
}
public long getMaxHashCode() {
return maxHashCode;
}
public void setMaxHashCode(long maxHashCode) {
this.maxHashCode = maxHashCode;
}
public long getMinHashCode() {
return minHashCode;
}
public void setMinHashCode(long minHashCode) {
this.minHashCode = minHashCode;
}
public void setHashCode(long minHashCode,long maxHashCode){
this.setMaxHashCode(maxHashCode);
this.setMinHashCode(minHashCode);
}
}
}
除去重复时间为:1228984 毫秒,20 多分钟
我的老电脑,总内存小,JVM 才几十M,大一些应该会快。
方法分析:
1.采用hashCode 范围迭代分割的方式,可以分割成内存可以容纳的小文件,然后完成功能
2.我们发现每次迭代,相当于重复读取里面的文件,然后再进行分割,这样浪费了很多时间,那么有没有更好的方式呢? 我们可以这样设计,假设我们知道文件的总大小,已经大概的行数,比如2G,1亿行,我们一开始就分配区间,在分配完全均匀的情况下,1亿行数据,最多占用1亿个空间,那么可以这样分配,用hashCode 的范围,也就是Integer的最大值和最小值进行模拟分配。分配范围 根据内存进行,比如:读取第一行的的hashCode 值为100,那么,我们可以分配到1-1000000,(这里以100W 为单位),也就是说只要hashCode 范围在这个区间的都分配到这里,同理,读到任何一个hashCode 值,除以单位(100W),就能找到你的区间,比如hasCode 是 2345678,那么就是200W-300W的区间。这里有些区间可能空的,就删除了,有些区间很多,就用上面的迭代,空间足够 就直接写入汇总文件。当然区间单位的颗粒度划分,根据内存和数据总量 自己弄,这样下来就会一次读取 ,就能尽量的分配均匀,就不会大量迭代读取浪费时间了。
3.我们发现分割的时候是直接写入,没有进行任何排序或者其他操作,如果我们在分割的时候保存一部分到集合内存,发现有重复的,就不写入分割文件了,如果写入量超过集合限制了,就清空集合,这样能保证单个小文件 一次达到除重复的效果,大文件部分除重复,减少子文件的数据量,如果重复数据较多,可以采用,极端的情况下完全不重复,那么集合会浪费你的空间,并且除重复的时候会浪费你的时间,这里自己对数据进行整体考虑。
4.这里内存的控制是我测试进行控制,用JDK 的方法进行内存监控,因为涉及到回收频率 以及时间上的问题,我没有动态的对集合进行监控,占了多少内存,还剩多少内存等等,可以尝试。
小结:
1.首先很感谢Jason.Hao 的的帮助,主动去实践,才会发现一些理论和实践的差距,更加体会深刻。
2.在程序设计,以及一些异常控制上没多做处理,仅仅测试一下理论功能。
3.这是根据hashCode 范围分离的方法,如果有其他方法或者建议,欢迎大家共同实践哦
最后我的新书spring 到了,兴奋啊。关于《spring in action 》 我大概看了一下,主要是介绍spring 的一些使用技巧,和一些常用功能,也就是说教你如何使用,使用方便。
关于《spring 核心技术内幕》 里面主要对spring 的一些架构设计进行分析,包括bean 加载, Ioc,AOP等,也就是说能加深你对源码的一些理解,稍微深一点。记得还有一本关于 精通AOP的书名字忘记了,有好的,欢迎分享,推荐哦。