一、题目
生成10亿个long随机数正整数,把它写入一个文件里。然后实现一个函数 fetch(int k,int n)。(fetch函数的输出结果是这10亿个正整数中从小到大中第k个开始(不包含第k个)往后取n个数。)
给定内存为1G(可为2G)。
二、题目分析
(1)首先生成10亿个long随机正整数,可考虑使用ThreadLocalRandom和多线程生成随机数。由于全部数据内存占用10几G,需要分批写入文件。(一个数据一行,行末为\n)
(2)fetch函数的实现:
1.先对随机数进行外部排序。由于随机数文件较大,无法一次性读取全部数据进行排序,所以必须对随机数文件进行分割成多个完成数据排序的小文件,然后通过多路归并实现外部排序。
2.然后实现fetch函数,输出结果
因此本文主要针对外部排序的文件分割部分进行说明,至于多路归并和fetch函数的实现本文暂不开展。
三、程序设计
本文主要针对大文件切割的程序设计进行分析。 (为了方便描述,后文将大文件切割分为read、parse、sort、write四个工序来描述)
设计思路:通过BufferedReader的readLine()方法读取每一行数据为String(read),通过parseLong()将String转换为long(parse),存放在一个long[]数组里。当装满long[]时通过Araay.sort()排序(sort),将排序好的long[]按行输出(write)。(long[]大小自行设定)通过多次循环操作实现大文件分割。为了加快效率,我用一个线程执行read、parse,另一个线程执行sort、write,两线程间用BlockingQueue交流数据。
由于内存开销很大,而且由于过大的内存开销,很容易就堆满了,且毫无效率可言,所以必须优化。优化思路是通过duox多线程进行read,一个线程处理parse、sort、write**。**如图:
read部分:使用readLine()10亿个数据要读10亿次,因此考虑采用RandomAccessFile和多线程结合进行读取,根据偏移量进行分次读取,每次读取32M(这个量是比较快而且不容易出现堆满的)。读取的字节数据存放在byte[]数据里,这时会出现一个新问题:每次读取的数据的末尾不一定是以“\n”结束,那么必定有个随机数被分割了!
由于采用多线程进行IO读操作,因此为了解决随机数被分割问题费了点心思。
主要思路:每完成一次read(b, 0, length)之后,往后继续read()一个byte,直到遇到第一个10;同时还要判断每次read起始部分是否为完整的一个随机数,从byte[0]开始判断直到遇见第一个10。注意临界条件:第一组数据和最后一组数据的处理。这样才能在parse的时候数据时完整的。由于代码不小心删了,这里就提供一个思路。
write部分:将long数据用BufferWriter按行写出为字符,这样的效率比较低,且占用内存较多。后来考虑到分割的文件是临时文件,fetch函数使用完之后就删除了。所以考虑使用DataOutputStream包装BufferOutputStream输出为一个个8字节的long,这样减少了一半以上的文件大小,且能提高输出效率。(ps:这算是一个不错的想法)
历经千辛万苦,跑了20几分钟才分割完数据,还是太慢了。
反思:方案一有许多不足之处。
采用多线程进行I/O操作并不一定会提高效率,有时反而会影响效率。因为一个磁盘一个时间段内只能进行一个I/O操作,如果通过多线程进行I/O操作,可能造成每次I/O是磁头寻道的偏移量较大,也就是寻道时间长,反而增加了I/O时间。
其次parse部分。将byte转换为String,每次新的一个String都会占用常量池。为了避免使用String,需直接将byte[]数据转换为long。于是乎想到了迭代计算,同时参考parseLong()的源码,进行优化。
优化:
如果想要提高效率,多线程的使用时必须的,那么如何使用多线程很关键。后经高人点播:既然大文件分割分为read、parse、sort、write四个部分,而且电脑是四核(二核四线程),那么一个部分用一个线程进行操作,形成一条流水线,流水线上的数据通过BlockingQueue来传递,这样可以提高CPU的利用率。(这个流水线模式是确定的,因此后文按照不同工序的优化过程来描述)
read部分: 这里将每次读取的数据通过BlockingQueue直接传递给parse线程(后续parse部分给出解决随机数分割的问题的方法)。为了减少写出文件的数量,我尽可能的将spiltSize设置大(实际上这个方式并没有充分利用流水线模式)。由于每次read时间较长,后置的线程会先处于阻塞状态。
由于缺乏对磁盘IO的理解,我局限的认为一次性读取的数据越大(取32M),减少I/O次数而提高效率,同时我又想保证每次写出的数据也越多越好,这样也可以减少后续归并的路数。因此将每次读取文件大小尽可能调大。
通过对磁盘IO的了解:影响磁盘的关键因数是磁盘服务时间,即磁盘完成一个I/O请求所花费的时间,它由寻道时间、旋转延迟和数据传输时间三部分构成。其中寻道时间、旋转延迟是占主要的,数据传输时间可以忽略。由于磁盘上每个扇区512byte,而操作系统的文件系统不是一个扇区一个扇区的来读数据,所以有了block(块)的概念,它是一个块一个块的读取的,块(block)是基本的数据传输单元(一般的操作系统block size为4k)。那么在磁盘上的同样存储位置,JAVA进行1024次4k的IO请求和进行1次4M的IO请求,在磁盘服务时间应该是差不多的(不知道这么理解对不对,如有不对之处请指出)。那么减少每次read的大小(保证为block size 的整数倍),保证流水线一直处于运行状态,提高CPU的利用率。
ps:操作系统层对于IO的影响这边暂不考虑了。
针对这个问题做了优化。
(1)使用FileChannel包装(FileInputStream)来进行读文件,每次读取文件大小spiltSize为8k。(由于电脑较差,每次运行结果波动很大,不能判断哪种spiltSize取多少最好,经测试8k比4k好,16k比8k略差。这个地方有待继续论证,但不影响整体的设计思路。FileChannel包装也可以包装RandomAccessFile,还没测试出FileInputStream和RandomAccessFile哪个好,有高手可以指出)
(2)一开始根据10亿随机文件的大小来划分总共read的循环次数,考虑的源文件来源可能是网络传输,并不能识别源文件实际大小,无法判断实际分割的小文件的个数,因此取消了
1 | 1` |
for(int i=0;i<spiltNum;i++)
1 | 1` |
来判定循环,而改用
1 | 1` |
while(true)
1 | 1` |
的死循环,直到源文件读取完毕退出,同时向下游发送一个size=0的数组,作为结束的标志,这种处理方式提高了程序的健壮性。(值得推广)
代码优化如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 1//方案二
2public void run() {
3 /*
4 *这里也可以包装RandomAccessFile
5 *采用try-with-resource使代码更简洁
6 */
7 try(
8 FileInputStream fis = new FileInputStream(file);
9
10 FileChannel fc = fis.getChannel();
11 ){
12 ByteBuffer bb= ByteBuffer.allocate(spiltSize);//spiltSize
13 long startPosition ;
14 int read;
15 int k=0;
16 byte[] temp;
17 while(true) {
18 startPosition =(long)k*(long)spiltSize;
19 read=fc.read(bb, startPosition);
20 if(read!=-1) {
21 temp =new byte[read];
22 bb.flip();
23 bb.get(temp);
24 bb.clear();
25 bq.put(temp);
26 k++;
27 }else {
28 bq.put(new byte[0]);
29 break;
30 }
31 }
32 }catch(IOException e) {
33 e.printStackTrace();
34 }catch(InterruptedException e1) {
35 e1.printStackTrace();
36 }
37 }
38
** parse部分**:将byte[]数组直接转换为long。借鉴parseLong()方法也是直接将byte[]转化为long,我通过迭代计算将byte[]数组转换为long,同时用位运算计算乘法更快。
一开始由于不知道byte[]中可以parse出多少long数据,无法声明一个固定大小的数组,所以我这里考虑用List,这样增加的拆装与包装的过程。在read部分优化之后,考虑到每次读取数据为8k,需要积累一定数量的long数据再输出比较好好,所以通过自定义一个固定大小的long[]数组来存储parse后的long型数据。这样可以避免使用LIst<Long>,一定程度减小了内存开销和提高了性能。
随机数切割问题我们可以用 long l=0L;和l=(l<<3)+(l<<1)+(b[j]-'0');利用循环计算出一个long(详见代码)。之前我采用多线程进行读操作的方式,所以无法保证read是按顺序进行,必须考虑随机数切割问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 1 //方案二,方案一使用List,这里不给出代码
2 class Parse implements Runnable{
3 private BlockingQueue<byte[]> bq;
4 private BlockingQueue<long[]> bq1;
5 private int num ;//自定义存储long数据的个数
6 Parse(BlockingQueue<byte[]> bq,BlockingQueue<long[]> bq1,int num){
7 this.bq=bq;
8 this.bq1=bq1;
9 this.num=num;
10 }
11 public void run() {
12 byte[] b;
13 int count=0;
14 long l=0L;//byte[]转为long型数据,临时
15 long[] temp =new long[num];
16 try {
17 while(true) {
18 b=bq.take();
19 if(b.length==0) {
20 long[] temp2= new long[count];
21 System.arraycopy(temp, 0,temp2,0,count);
22 bq1.put(temp2);
23 bq1.put(new long[0]);
24 break;
25 }else {
26 for(int j=0,len=b.length;j<len;j++) {
27 if(b[j]!='\n') {//byte[]转为long
28 //l=l*10+(long)(b[j]-48);优化前
29 l=(l<<3)+(l<<1)+(b[j]-'0');//优化后
30 }else{
31 temp[count]=l;
32 count++;
33 l=0L;
34 if(count==num) {
35 bq1.put(temp);
36 count=0;
37 temp =new long[num];
38 }
39 }
40 }
41 }
42 }
43 } catch (InterruptedException e) {
44 // TODO Auto-generated catch block
45 e.printStackTrace();
46 }
47 }
48
sort部分:这个部分没啥问题,sort部分是4个部分中运行速度最快的,直接给代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1//方案二
2 class Sort implements Runnable{
3 private BlockingQueue<long[]> bq1;
4 private BlockingQueue<long[]> bq2;
5 Sort(BlockingQueue<long[]> bq1,BlockingQueue<long[]> bq2){
6 this.bq1=bq1;
7 this.bq2=bq2;
8 }
9 public void run() {
10 long[] longs;
11 try {
12 while(true) {
13 longs=bq1.take();
14 if(longs.length==0) {
15 bq2.put(new long[0]);
16 break;
17 }else {
18 Arrays.sort(longs);
19 bq2.put(longs);
20 }
21 }
22 } catch (InterruptedException e) {
23 // TODO Auto-generated catch block
24 e.printStackTrace();
25 }
26 }
27}
28
write部分:方案一write部分的写操作是通过遍历一个个写出long,效率比较低,造成一定的堵塞状态,需考虑其他IO方式,一次性写出较多的数据。后通过FileChannel 和 ByteBuffer写数据,具体使用方法可以上网查阅NIO的API。以缓冲块的方式写出数据比一个一个写明显快多了。(还是要注意临界问题)因为ByteBuffer的大小是固定的,所以put()之后,BufferBuffer的limit=capacity,因为最后一次的数据可能小于capacity,因此得重新设置limit,保证write出的数据正确。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 1 //方案一
2 /*
3 long[] l=bq2.take();
4 DataOutputStream dos = new DataOutputStream(new BufferedOutputStream (new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\文件"+i+".txt"))));
5 for(long a:l) {
6 dos.writeLong(a);
7 }
8 dos.close();
9 */
10 //方案二
11 class WriteData implements Runnable{
12 private BlockingQueue<long[]> bq2;
13 private File tempFolder;//临时文件夹
14 private CountDownLatch end;//告诉主线程可以继续运行,主要用于计时
15 private int num ;//自定义存储long数据的个数
16 private long[] longs;
17 private int count =0;
18 WriteData(BlockingQueue<long[]> bq2,File tempFolder,CountDownLatch end,int num){
19 this.bq2=bq2;
20 this.end=end;
21 this.tempFolder=tempFolder;
22 this.num=num;
23 }
24 public void run() {
25 try {
26 ByteBuffer bb= ByteBuffer.allocate(num*8);
27 while(true) {
28 longs=bq2.take();
29 bb.asLongBuffer().put(longs);
30 if(longs.length==num) {
31 try(
32 FileOutputStream fos = new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\文件"+count+".txt"));
33 FileChannel fc = fos.getChannel();
34 ){
35 fc.write(bb);
36 bb.clear();
37 count++;
38 }catch (IOException e) {
39 e.printStackTrace();
40 }
41 }else if(longs.length!=0) {
42 bb.limit(longs.length*8);
43 try(
44 FileOutputStream fos = new FileOutputStream(new File(tempFolder.getAbsolutePath()+"\\文件"+count+".txt"));
45 FileChannel fc = fos.getChannel();
46 ){
47 fc.write(bb);
48 bb.clear();
49 }catch (IOException e) {
50 e.printStackTrace();
51 }
52 }else {
53 end.countDown();
54 break;
55 }
56 }
57 }catch (Exception e) {
58 // TODO Auto-generated catch block
59 e.printStackTrace();
60 }
61 }
62}
63
总结:
保证read和write能够不间断的占用磁盘活动,提高IO速度是提高程序性能的关键。通过监控各个阶段的BlockingQueue阻塞情况得出read<parse>sort<write(>表示前者单次循环用时大于后者,<反之)。由于parse部分消耗时间比较多,read的部分可能存在堵塞,需要合理设置BlockingQueue的大小,保证read不会堵塞。由于read操作比write操作频繁,两线程会同时争抢磁盘IO操作,在系统层面调度可能造成不能及时write,导致数据不能及时写出,这样会导致内存开销增加。我考虑是不是利用锁或者同一个线程来控制IO操作,保证write能及时将数据写出。
同时方案二中还存在内存开销较大的情况,GC次数较多,影响IO效率。如:循环中不断new byte[]和new long[]。这里需要考虑内存复用,减少内存开销。这对整体的性能有很大的影响。除了优化GC次数外,过大的内存开销也会降低磁盘IO的速率(原因本人暂不了解,有高人可以指点一下),因此内存复用,降低内存开销是必须的。 这里其实还可以尝试优化一下parse和sort,平衡parse和sort的耗时,保证流水线更加流畅。理论上read和write的总用时为这个程序最快速度,那么剩下的就是提高IO速度了。(也就是IO为这个程序的瓶颈)
内存复用以及在应用层控制IO:
首先是内存复用问题。由于开始时只顾着实现程序,没有考虑内存开销问题,大量的new来创建数组。虽然程序能够正确运行,但是由于内存开销不断增大,整个程序的性能下降。
解决思路:将数组比作盘子,数据比作食物。如果使用new的方式创建盘子(数组),食物被吃完了盘子就等GC把它当垃圾回收了。不断创建和销毁盘子是消耗性能的。因此将被清洗干净的盘子重新利用起来保证内存复用是很重要的。具体怎么实现呢?我的思路是先分析某一种盘子(如byte[]数组)在程序运行中同一时刻最大的使用量,盘子(如byte[]数组)的最大使用量为BlockingQueue中的数量加上线程中处理的数量。(这样的话需要对BlockingQueue的大小进行限制,这需要根据程序实际运行情况来设置大小。)因此一开始创建最大需求量的盘子,通过循环使用来减少内存开销。
具体实现如下,以read部分代码为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 1class ReadData implements Runnable{
2 private File file;
3 private BlockingQueue<byte[]> bq;
4 private int spiltSize;
5 private LinkedList<byte[]> list;//装盘子的容器
6 ReadData(File file,BlockingQueue<byte[]> bq,int spiltSize){
7 this.spiltSize=spiltSize;
8 this.file=file;
9 this.bq=bq;
10 }
11 /*
12 *init()初始化byte[]数组并存在LinkedList里面
13 *而后通过temp =list.remove()从头取数组和list.add(temp)从尾加数组
14 *这种的方式保证List里的盘子总数量不变,且第一个被使用的数组内的数据处理完之前不会复用这个数组
15 *其实也可以通过get(index)的方式循环取数组,但是这种方式计算开销会略大一点
16 */
17 public void init() {//初始化盘子
18 list = new LinkedList<byte[]>();
19 for(int i=0;i<30;i++) {
20 byte[] b = new byte[spiltSize];
21 list.add(b);
22 }
23 }
24 public void run() {
25 try(
26 RandomAccessFile raf = new RandomAccessFile(file,"r");
27 FileChannel fc = raf.getChannel();
28 ){
29 ByteBuffer bb= ByteBuffer.allocate(spiltSize);
30 long startPosition ;
31 int read;
32 int k=0;
33 byte[] temp;
34 while(true) {
35 startPosition =(long)k*(long)spiltSize;
36 read=fc.read(bb, startPosition);
37 if(read!=spiltSize) {//最后的临界情况单独考虑
38 temp =new byte[read];
39 bb.flip();
40 bb.get(temp);
41 bb.clear();
42 bq.put(temp);
43 bq.put(new byte[0]);
44 break;
45 }else {
46 temp =list.remove();//取盘子
47 bb.flip();
48 bb.get(temp);
49 bb.clear();
50 bq.put(temp);
51 k++;
52 list.add(temp); //将盘子放在队尾,等待复用
53 }
54 }
55 }catch(IOException e) {
56 e.printStackTrace();
57 }catch(InterruptedException e1) {
58 e1.printStackTrace();
59 }
60 }
61}
62
这里有个不足之处,需要根据程序的运行情况,手动设置一个合理的总盘子数。可能需要一个更灵活的方法。
应用层控制IO
由于windows系统层对IO的调度存在不确定性,write进程的IO请求有时候可能会滞后,为保证write进程的数据能及时写出,考虑采用synchronized方式对read()和write()进行上锁(切记保证正确性的情况下,synchronized代码块越小越好)。
1 | 1` |
synchronized(lock){read=fc.read(bb, startPosition);}
1 | 1` |
和
1 | 1` |
synchronized(lock){fc.write(bb);}
1 | 1` |
。运行的结果与不用锁的情况(优化内存开销后的方案二)比起来略好一点或者说更稳定一些,这可能是硬件和操作系统的问题。按理说增加获取锁的开销,性能应该差一些。(这里称之为****双线程有锁控制IO**和**双线程无锁控制IO**,这里是读和写线程分开)
另一种方式只用一个线程来控制所有IO操作。
一种想法是通过**单线程无脑执行IO操作****。每一次的read和write作为任务投递给一个线程执行,用一个BlockingQueue装着。这样线程拿到任务就无脑执行了。当然这里还有一个问题:因为投递任务的速度很快,而且read任务和write任务不是一个数量级(20000:1),会造成write任务被read任务阻塞情况出现。由于BlockingQueue一致处于阻塞状态,执行一个任务加一个任务,write任务一定正常执行,所以性能上并没有什么大的影响(总不至于20000次put的都是read任务吧,这概率得多小)。还有一个问题就是程序结束问题!因为添加read任务的人(主线程)不知道read任务的总数是多少,这时候会put无效的read任务,这里只能通过read任务的执行情况的反馈来告知主线程什么时候停止任务。
1
2
3
4
5
6
7
8
9
10 1BlockingQueue<Runnable> bq3 = new LinkedBlockingQueue<Runnable>(200);//这queue的大小可以调整
2while(true) {
3 if(rd.read!=-1) {//源文件读完后表示读任务结束
4 bq3.put(rd);
5 else {
6 bq3.remove(rd);//将多余的无效任务删除
7 break;
8 }
9}
10
当sort完成后投递write任务,由最后一次write任务让程序结束运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14 1 while(true) {
2 longs=bq1.take();
3 if(longs.length==0) {//没有数据时
4 bq2.put(new long[0]);
5 bq3.put(wd);
6 break;
7 }else {
8 Arrays.sort(longs);
9 bq2.put(longs);
10 bq3.put(wd);
11 }
12 }
13 }
14
还有一种****单线程有序执行IO任务****,就是执行任务的线程一边执行一边一天添加任务。因为只有执行任务的线程才能第一时间直到任务啥时候结束。当然效率上肯定比无脑执行任务那种方式差一点。(这里用类内部的方法来模拟任务)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 1public void run() {
2 try (
3 RandomAccessFile raf = new RandomAccessFile(file,"r");
4 FileChannel fc = raf.getChannel();
5 ){
6 while(flag) { //flag初始化为true,最后一次写结束后flag=false
7 if(read!=-1) {
8 readData(fc);
9 if(Rcount%1000==0&&bq2.size()>0) {//循环一定次数后再判断bq2中是否有数据需要写出
10 writeData();
11 }
12 }else {
13 writeData();
14 }
15 }
16 } catch (Exception e) {
17 // TODO Auto-generated catch block
18 e.printStackTrace();
19 }
20 }
21
通过对以上几种IO控制的方案进行对比(**双线程有锁控制IO**、**双线程无锁控制IO**、**单线程无脑执行IO操作**、**单线程有序执行IO任务**),其中****单线程无脑执行IO操作****运行出最快333s,平均IO速率将近80M/每秒。(读文件大小18.5G,写文件总大小7.45G)。由于电脑硬件老化的缘故,每次执行的结果相差较大,出现几十秒的差距。因此从代码层面来讲,**双线程无锁控制IO**性能也是可以的,只是将IO调度的控制交由操作系统处理了。不过由于执行IO的调度方式不同,导致他们的运行时内存占用也不同。(后续进行更充分的测试补充结论)
总结:
JAVA不同的IO类有不同的特点,根据不同情况进行合理选择,提高IO速度。在对IO进行优化时,需理解磁盘的物理结构和工作原理,避免走入误区。(一开始没找到问题根源所在,乱用多线程优化,浪费了大量的时间)每次读取数据大块文件时,以4k的整数倍比较好。
充分利用多线程,提高CPU利用率,通过多线程进行异步操作,就像“烧开水泡茶”一样。
在对数据的处理时,应该尽量减少内存的开支。new一个对象是比较消耗性能的,应该尽量复用内存,减少GC次数。务必避免大量使用String,这个很占内存。有些计算使用位运算更快。总之就是减少计算,减少内存开支,当然正确性是前提。
在保证正确性的前提下,提高程序的健壮性和可读性也很重要。同时,设计程序首先想着不是如何修改功能,而是如何扩展功能。
不足之处:
parse的耗时较长,sort的耗时较小,如果将parse部分的计算量匀一部分到sort这样可以提高CPU的利用率,减少堵塞情况。
内存复用的方式不够好,应该设计一个更灵活的方法。
IO是否还有提升空间?尝试使用内存映射的方式进行写操作,但是由于写文件总大小较大,内存占用(堆外内存)上升,IO性能下降。
程序中BlockingQueue的大小设置、每次读写文件的大小对程序性能的影响还没有明确的一个结论。对于后续的多路归并,文件数是越少越好,所以这个地方还需要研究。
本人作为刚入门的新人,第一次写博客,不足之处还望各位指正。希望各位大佬指点指点