Commit b5b1aa6

optimize code
1 parent 3836807 commit b5b1aa6

File tree

6 files changed (+88, -33 lines)


core/src/main/java/com/robin/core/base/util/ResourceConst.java

Lines changed: 2 additions & 0 deletions

@@ -5,6 +5,8 @@ public class ResourceConst {
     public static final String WORKINGPATHPARAM="output.workingPath";
     public static final String USETMPFILETAG="output.usingTmpFiles";
     public static final String BUCKETNAME="bucketName";
+    public static final String ALLOWOFFHEAPKEY="allowOffHeapMemLimit";
+    public static final Double ALLOWOUFHEAPMEMLIMIT=4000.0;

     public enum IngestType {
         TYPE_HDFS(1L,"HDFS"),
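
Note: ALLOWOFFHEAPKEY lets a caller override the default free-memory threshold (ALLOWOUFHEAPMEMLIMIT, 4000 MB) per collection through the resource config map. A minimal sketch of the lookup the two iterators below perform; the plain HashMap stands in for DataCollectionMeta.getResourceCfgMap(), and OffHeapLimitDemo is an illustrative name:

    import java.util.HashMap;
    import java.util.Map;

    public class OffHeapLimitDemo {
        public static void main(String[] args) {
            Map<String, Object> resourceCfg = new HashMap<>();
            // value under ResourceConst.ALLOWOFFHEAPKEY, overriding the 4000 MB default
            resourceCfg.put("allowOffHeapMemLimit", "2048");

            double limit = 4000.0; // ResourceConst.ALLOWOUFHEAPMEMLIMIT
            if (!resourceCfg.isEmpty() && resourceCfg.containsKey("allowOffHeapMemLimit")) {
                limit = Double.parseDouble(resourceCfg.get("allowOffHeapMemLimit").toString());
            }
            System.out.println("off-heap dump allowed above " + limit + " MB free memory");
        }
    }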

hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/OrcFileIterator.java

Lines changed: 27 additions & 9 deletions

@@ -1,14 +1,16 @@
 package com.robin.comm.fileaccess.iterator;

 import com.robin.comm.fileaccess.util.MockFileSystem;
+import com.robin.comm.utils.SysUtils;
 import com.robin.core.base.util.Const;
-import com.robin.core.base.util.IOUtils;
 import com.robin.core.base.util.ResourceConst;
 import com.robin.core.fileaccess.iterator.AbstractFileIterator;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.util.ResourceUtil;
 import com.robin.hadoop.hdfs.HDFSUtil;
 import org.apache.commons.io.FileUtils;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -20,10 +22,12 @@
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;

-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.List;
@@ -35,13 +39,18 @@ public class OrcFileIterator extends AbstractFileIterator {
     private TypeDescription schema;
     private RecordReader rows ;
     private VectorizedRowBatch batch ;
-    private List<String> fieldNames;
+    private MemorySegment segment;
+    private Double allowOffHeapDumpLimit= ResourceConst.ALLOWOUFHEAPMEMLIMIT;
+
     public OrcFileIterator(){
         identifier= Const.FILEFORMATSTR.ORC.getValue();
     }
     public OrcFileIterator(DataCollectionMeta colmeta) {
         super(colmeta);
         identifier= Const.FILEFORMATSTR.ORC.getValue();
+        if(!CollectionUtils.isEmpty(colmeta.getResourceCfgMap()) && colmeta.getResourceCfgMap().containsKey(ResourceConst.ALLOWOFFHEAPKEY)){
+            allowOffHeapDumpLimit=Double.parseDouble(colmeta.getResourceCfgMap().get(ResourceConst.ALLOWOFFHEAPKEY).toString());
+        }
     }
     private final Map<String,Object> valueMap=new HashMap<>();
     int maxRow=-1;
@@ -137,11 +146,18 @@ public void beforeProcess() {
             fs=FileSystem.get(new Configuration());
             readPath=new File(readPath).toURI().toString();
         }else {
-            instream = accessUtil.getInResourceByStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
-            if (instream.available() < Integer.MAX_VALUE) {
-                ByteArrayOutputStream byteout = new ByteArrayOutputStream();
-                IOUtils.copyBytes(instream, byteout, 8064);
-                fs = new MockFileSystem(conf, byteout.toByteArray());
+            instream = accessUtil.getRawInputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
+            long size = instream.available();
+            Double freeMemory= SysUtils.getFreeMemory();
+            if (size < Integer.MAX_VALUE && freeMemory>allowOffHeapDumpLimit) {
+                //use flink memory utils to use offHeapMemory to dump file content
+                segment= MemorySegmentFactory.allocateOffHeapUnsafeMemory((int)size,this,new Thread(){});
+                ByteBuffer byteBuffer=segment.getOffHeapBuffer();
+                try(ReadableByteChannel channel= Channels.newChannel(instream)) {
+                    org.apache.commons.io.IOUtils.readFully(channel, byteBuffer);
+                    byteBuffer.position(0);
+                }
+                fs=new MockFileSystem(conf,byteBuffer);
             } else {
                 String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(colmeta);
                 String tmpFilePath = "file:///" + tmpPath + ResourceUtil.getProcessFileName(colmeta.getPath());
@@ -154,7 +170,6 @@ public void beforeProcess() {
         }
         oreader =OrcFile.createReader(new Path(readPath),OrcFile.readerOptions(conf).filesystem(fs));
         schema= oreader.getSchema();
-        fieldNames=schema.getFieldNames();
         rows= oreader.rows();
         fields=schema.getChildren();
         batch= oreader.getSchema().createRowBatch();
@@ -177,6 +192,9 @@ public void close() throws IOException {
         if(!ObjectUtils.isEmpty(oreader)){
             oreader.close();
         }
+        if(!ObjectUtils.isEmpty(segment)){
+            segment.free();
+        }
         if(!ObjectUtils.isEmpty(tmpFile)){
             FileUtils.deleteQuietly(tmpFile);
         }
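
Note: the beforeProcess() change replaces the on-heap ByteArrayOutputStream copy with an off-heap dump: size the buffer from instream.available(), allocate a Flink MemorySegment, drain the stream through an NIO channel, rewind, and hand the buffer to MockFileSystem (the segment is freed later in close()). A JDK-only sketch of the same pattern, with ByteBuffer.allocateDirect as a stand-in for MemorySegmentFactory.allocateOffHeapUnsafeMemory; note that InputStream.available() is documented as an estimate, so this approach relies on the access utility returning a stream that reports its full length:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;

    public final class OffHeapDumpDemo {
        // Drain a stream of known size into a direct (off-heap) buffer and rewind
        // it for readers, mirroring the try-with-resources block in the hunk above.
        static ByteBuffer dumpToDirectBuffer(InputStream in, int size) throws IOException {
            ByteBuffer buf = ByteBuffer.allocateDirect(size);
            try (ReadableByteChannel channel = Channels.newChannel(in)) {
                while (buf.hasRemaining() && channel.read(buf) >= 0) {
                    // loop until the buffer is full or the stream ends
                }
            }
            buf.position(0); // same rewind as byteBuffer.position(0) above
            return buf;
        }
    }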

hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java

Lines changed: 24 additions & 18 deletions

@@ -2,7 +2,9 @@

 import com.robin.comm.fileaccess.util.ByteBufferSeekableInputStream;
 import com.robin.comm.fileaccess.util.ParquetUtil;
+import com.robin.comm.utils.SysUtils;
 import com.robin.core.base.util.Const;
+import com.robin.core.base.util.ResourceConst;
 import com.robin.core.fileaccess.iterator.AbstractFileIterator;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.util.AvroUtils;
@@ -29,6 +31,7 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.apache.slider.server.appmaster.management.Timestamp;
+import org.springframework.util.CollectionUtils;
 import org.springframework.util.ObjectUtils;

 import java.io.File;
@@ -54,6 +57,7 @@ public class ParquetFileIterator extends AbstractFileIterator {
     private boolean useAvroEncode = false;
     private static final long maxSize = Integer.MAX_VALUE;
     private MemorySegment segment;
+    private Double allowOffHeapDumpLimit= ResourceConst.ALLOWOUFHEAPMEMLIMIT;

     public ParquetFileIterator() {
         identifier = Const.FILEFORMATSTR.PARQUET.getValue();
@@ -62,6 +66,9 @@ public ParquetFileIterator() {
     public ParquetFileIterator(DataCollectionMeta colmeta) {
         super(colmeta);
         identifier = Const.FILEFORMATSTR.PARQUET.getValue();
+        if(!CollectionUtils.isEmpty(colmeta.getResourceCfgMap()) && colmeta.getResourceCfgMap().containsKey(ResourceConst.ALLOWOFFHEAPKEY)){
+            allowOffHeapDumpLimit=Double.parseDouble(colmeta.getResourceCfgMap().get(ResourceConst.ALLOWOFFHEAPKEY).toString());
+        }
     }

     private List<Schema.Field> fields;
@@ -95,9 +102,10 @@ public void beforeProcess() {
             file = new LocalInputFile(Paths.get(colmeta.getPath()));
         } else {
             instream = accessUtil.getRawInputStream(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
-            long size = instream.available();//accessUtil.getInputStreamSize(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
-            //file size too large ,can not store in ByteArrayOutputStream
-            if (size >= maxSize) {
+            long size = instream.available();
+            Double freeMemory= SysUtils.getFreeMemory();
+            //file size too large ,can not store in ByteBuffer or freeMemory too low
+            if (size >= maxSize || freeMemory<allowOffHeapDumpLimit) {
                 String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(colmeta);
                 String tmpFilePath = "file:///" + tmpPath + ResourceUtil.getProcessFileName(colmeta.getPath());
                 tmpFile = new File(new URL(tmpFilePath).toURI());
@@ -107,15 +115,12 @@ public void beforeProcess() {
                 //use flink memory utils to use offHeapMemory to dump file content
                 segment= MemorySegmentFactory.allocateOffHeapUnsafeMemory((int)size,this,new Thread(){});
                 ByteBuffer byteBuffer=segment.getOffHeapBuffer();
-                ReadableByteChannel channel= Channels.newChannel(instream);
-                IOUtils.readFully(channel,byteBuffer);
-                byteBuffer.position(0);
-                channel.close();
-                ByteBufferSeekableInputStream seekableInputStream=new ByteBufferSeekableInputStream(byteBuffer);
-                /*ByteArrayOutputStream byteout = new ByteArrayOutputStream((int) size);
-                IOUtils.copy(instream, byteout, 4096);
-                ByteArraySeekableInputStream seekableInputStream = new ByteArraySeekableInputStream(byteout.toByteArray());*/
-                file = ParquetUtil.makeInputFile(seekableInputStream);
+                try(ReadableByteChannel channel= Channels.newChannel(instream)) {
+                    IOUtils.readFully(channel, byteBuffer);
+                    byteBuffer.position(0);
+                    ByteBufferSeekableInputStream seekableInputStream = new ByteBufferSeekableInputStream(byteBuffer);
+                    file = ParquetUtil.makeInputFile(seekableInputStream);
+                }
             }
         }
         getSchema(file,true);
@@ -135,12 +140,13 @@ public void beforeProcess() {
     private void getSchema(InputFile file,boolean seekFrist) throws IOException {
         if (colmeta.getColumnList().isEmpty()) {
             ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build();
-            ParquetFileReader ireader = ParquetFileReader.open(file, options);
-            ParquetMetadata meta = ireader.getFooter();
-            msgtype = meta.getFileMetaData().getSchema();
-            parseSchemaByType();
-            if(seekFrist) {
-                file.newStream().seek(0L);
+            try(ParquetFileReader ireader = ParquetFileReader.open(file, options)) {
+                ParquetMetadata meta = ireader.getFooter();
+                msgtype = meta.getFileMetaData().getSchema();
+                parseSchemaByType();
+                if (seekFrist) {
+                    file.newStream().seek(0L);
+                }
             }
         } else {
             schema = AvroUtils.getSchemaFromMeta(colmeta);
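
Note: the getSchema() rewrite wraps the footer read in try-with-resources — ParquetFileReader implements Closeable, so the underlying stream is released even when schema parsing throws. A condensed sketch of the same pattern (FooterSchemaDemo and readSchema are illustrative names):

    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.io.InputFile;
    import org.apache.parquet.schema.MessageType;
    import java.io.IOException;

    public final class FooterSchemaDemo {
        // Read only the footer to obtain the schema, closing the reader even on error.
        static MessageType readSchema(InputFile file) throws IOException {
            ParquetReadOptions options = ParquetReadOptions.builder()
                    .withMetadataFilter(ParquetMetadataConverter.NO_FILTER)
                    .build();
            try (ParquetFileReader reader = ParquetFileReader.open(file, options)) {
                ParquetMetadata meta = reader.getFooter();
                return meta.getFileMetaData().getSchema();
            }
        }
    }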

hadooptool/src/main/java/com/robin/comm/fileaccess/util/MockFileSystem.java

Lines changed: 8 additions & 6 deletions

@@ -5,18 +5,20 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
+import org.springframework.util.ObjectUtils;

 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;

 //mock filesystem support parquet and orc file read from outside of hadoop filesystem
 public class MockFileSystem extends FileSystem {
-    byte[] streamBytes;
+    private ByteBuffer byteBuffer;
     final List<MockInputStream> streams = new ArrayList<>();
     OutputStream outputStream;
     DataCollectionMeta colmeta;
@@ -29,9 +31,9 @@ public URI getUri() {
             throw new IllegalArgumentException("bad uri", e);
         }
     }
-    public MockFileSystem(Configuration conf, byte[] streamBytes) {
+    public MockFileSystem(Configuration conf,ByteBuffer byteBuffer){
         setConf(conf);
-        this.streamBytes = streamBytes;
+        this.byteBuffer=byteBuffer;
     }
     public MockFileSystem(DataCollectionMeta colmeta,OutputStream outputStream) {
         this.outputStream=outputStream;
@@ -40,14 +42,14 @@ public MockFileSystem(DataCollectionMeta colmeta,OutputStream outputStream) {

     @Override
     public FSDataInputStream open(Path f) throws IOException {
-        MockInputStream result = new MockInputStream(this, streamBytes);
+        MockInputStream result =new MockInputStream(this,byteBuffer);
         streams.add(result);
         return result;
     }

     @Override
     public FSDataInputStream open(Path path, int i) throws IOException {
-        MockInputStream result = new MockInputStream(this, streamBytes);
+        MockInputStream result =new MockInputStream(this,byteBuffer);
         streams.add(result);
         return result;
     }
@@ -80,7 +82,7 @@ public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOExcept

     @Override
     public FileStatus getFileStatus(Path path) throws IOException {
-        return new FileStatus(streamBytes.length, false, 1, 4096, 0, path);
+        return new FileStatus(byteBuffer.capacity(), false, 1, 4096, 0, path);
     }
     void removeStream(MockInputStream stream) {
         streams.remove(stream);
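
Note: with the ByteBuffer-backed constructor, a whole file held in memory can be served to the ORC reader with no real filesystem behind it — MockFileSystem.open() ignores the path and always wraps the buffer. A hedged sketch condensed from the OrcFileIterator hunk above ("/in-memory.orc" and the class/method names are illustrative):

    import com.robin.comm.fileaccess.util.MockFileSystem;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    public final class InMemoryOrcDemo {
        // Open an ORC Reader over file bytes already dumped into a ByteBuffer.
        static Reader openFromBuffer(Configuration conf, ByteBuffer byteBuffer) throws IOException {
            FileSystem fs = new MockFileSystem(conf, byteBuffer);
            return OrcFile.createReader(new Path("/in-memory.orc"),
                    OrcFile.readerOptions(conf).filesystem(fs));
        }
    }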

hadooptool/src/main/java/com/robin/comm/fileaccess/util/MockInputStream.java

Lines changed: 5 additions & 0 deletions

@@ -4,6 +4,7 @@

 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;

 public class MockInputStream extends FSDataInputStream {
     MockFileSystem fs;
@@ -12,6 +13,10 @@ public MockInputStream(MockFileSystem fs, byte[] streamBytes) throws IOException
         super(new ByteArraySeekableInputStream(streamBytes));
         this.fs = fs;
     }
+    public MockInputStream(MockFileSystem fs, ByteBuffer byteBuffer){
+        super(new ByteBufferSeekableInputStream(byteBuffer));
+        this.fs=fs;
+    }

     public MockInputStream(InputStream in) {
         super(in);
hadooptool/src/main/java/com/robin/comm/utils/SysUtils.java

Lines changed: 22 additions & 0 deletions

@@ -0,0 +1,22 @@
+package com.robin.comm.utils;
+
+import java.lang.management.ManagementFactory;
+import com.sun.management.OperatingSystemMXBean;
+import java.text.DecimalFormat;
+
+public class SysUtils {
+    private static OperatingSystemMXBean systemMXBean=(OperatingSystemMXBean)ManagementFactory.getOperatingSystemMXBean();
+    private static DecimalFormat format=new DecimalFormat("#.##");
+    private SysUtils(){
+    }
+    public static Double getFreeMemory(){
+        return systemMXBean.getFreePhysicalMemorySize()/1024.0/1024;
+    }
+    public static Double getTotalMemory(){
+        return systemMXBean.getTotalPhysicalMemorySize()/1024.0/1024;
+    }
+    public static void main(String[] args){
+        System.out.println(SysUtils.getFreeMemory());
+        System.out.println(SysUtils.getTotalMemory());
+    }
+}
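
Note: SysUtils reports free and total physical memory in MB via com.sun.management.OperatingSystemMXBean. On JDK 14+ the getFreePhysicalMemorySize()/getTotalPhysicalMemorySize() accessors are deprecated in favor of getFreeMemorySize()/getTotalMemorySize(); the DecimalFormat field is currently unused. A minimal sketch of the gate the iterators build on it (OffHeapGateDemo is an illustrative name):

    import com.robin.comm.utils.SysUtils;
    import com.robin.core.base.util.ResourceConst;

    public final class OffHeapGateDemo {
        public static void main(String[] args) {
            double freeMb = SysUtils.getFreeMemory();          // free physical memory, MB
            double limit = ResourceConst.ALLOWOUFHEAPMEMLIMIT; // 4000 MB default
            System.out.println("free=" + freeMb + " MB, off-heap dump allowed: " + (freeMb > limit));
        }
    }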
