Commit 3836807

use off-heap memory to store temporary Parquet and ORC files to avoid GC
1 parent ed5b791 commit 3836807

9 files changed (+274, -33 lines)

hadooptool/pom.xml

Lines changed: 35 additions & 16 deletions

@@ -14,18 +14,18 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <aws.java.sdk.version>2.20.43</aws.java.sdk.version>
+        <flink.version>1.20.0</flink.version>
+        <cos.version>5.6.24</cos.version>
+        <bos.version></bos.version>
+        <cos.version>5.6.24</cos.version>
+        <obs.version>3.23.9</obs.version>
+        <qiniu.version>[7.16.0, 7.16.99]</qiniu.version>
+        <oss.version>3.17.0</oss.version>
+        <bce.version>0.10.353</bce.version>
+        <cassandra.version>3.11.12</cassandra.version>
+        <flink.version>1.20.0</flink.version>
     </properties>
-    <dependencyManagement>
-        <dependencies>
-            <dependency>
-                <groupId>software.amazon.awssdk</groupId>
-                <artifactId>bom</artifactId>
-                <version>${aws.java.sdk.version}</version>
-                <type>pom</type>
-                <scope>import</scope>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
+

     <dependencies>
         <dependency>
@@ -128,6 +128,7 @@
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
+            <optional>true</optional>
         </dependency>

         <dependency>
@@ -282,17 +283,20 @@
         <dependency>
             <groupId>com.qcloud</groupId>
             <artifactId>cos_api</artifactId>
-            <version>5.6.24</version>
+            <version>${cos.version}</version>
+            <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>com.aliyun.oss</groupId>
             <artifactId>aliyun-sdk-oss</artifactId>
-            <version>3.17.0</version>
+            <version>${oss.version}</version>
+            <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>com.huaweicloud</groupId>
             <artifactId>esdk-obs-java-bundle</artifactId>
-            <version>3.23.9</version>
+            <version>${obs.version}</version>
+            <optional>true</optional>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.logging.log4j</groupId>
@@ -307,6 +311,8 @@
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>s3</artifactId>
+            <version>${aws.java.sdk.version}</version>
+            <optional>true</optional>
             <exclusions>
                 <exclusion>
                     <groupId>software.amazon.awssdk</groupId>
@@ -321,14 +327,20 @@
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>sso</artifactId>
+            <version>${aws.java.sdk.version}</version>
+            <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>ssooidc</artifactId>
+            <version>${aws.java.sdk.version}</version>
+            <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
             <artifactId>apache-client</artifactId>
+            <version>${aws.java.sdk.version}</version>
+            <optional>true</optional>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>
@@ -339,13 +351,14 @@
         <dependency>
             <groupId>com.qiniu</groupId>
             <artifactId>qiniu-java-sdk</artifactId>
-            <version>[7.16.0, 7.16.99]</version>
+            <version>${qiniu.version}</version>
             <optional>true</optional>
         </dependency>
         <dependency>
             <groupId>com.baidubce</groupId>
             <artifactId>bce-java-sdk</artifactId>
-            <version>0.10.353</version>
+            <version>${bce.version}</version>
+            <optional>true</optional>
             <exclusions>
                 <exclusion>
                     <groupId>org.eclipse.paho</groupId>
@@ -372,6 +385,12 @@
             <optional>true</optional>
         </dependency>
         <!-- s3 import -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <optional>true</optional>
+        </dependency>


     </dependencies>
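Note on the pom changes: with the software.amazon.awssdk BOM import removed from dependencyManagement, each AWS SDK artifact (s3, sso, ssooidc, apache-client) must now pin ${aws.java.sdk.version} explicitly, and the previously hard-coded cloud SDK versions move into properties. The properties block declares cos.version and flink.version twice; Maven resolves duplicate properties to the last declaration, so the build still works, but the repeats (and the empty, unreferenced bos.version) look unintentional.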

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/AbstractCloudStorageFileSystemAccessor.java

Lines changed: 7 additions & 0 deletions

@@ -69,6 +69,13 @@ protected InputStream getInputStreamByConfig(DataCollectionMeta meta) {
     protected String getBucketName(DataCollectionMeta meta) {
         return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString();
     }
+
+    /**
+     * Cloud storage currently only supports ingesting an InputStream, so buffer the data in a ByteArrayOutputStream or a temporary file first.
+     * @param meta
+     * @return
+     * @throws IOException
+     */
     protected OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
         OutputStream outputStream;
         if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
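The diff cuts off inside getOutputStream. As a rough, generic sketch of the buffering decision the new javadoc describes (the config key, class, and method names here are assumptions for illustration, not the project's actual API):

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

public final class BufferChoiceDemo {
    // hypothetical: mirrors the USETMPFILETAG branch visible in the diff above
    static OutputStream openBuffer(Map<String, Object> cfg) throws IOException {
        Object useTmp = cfg.get("useTmpFile"); // stands in for ResourceConst.USETMPFILETAG
        if (useTmp != null && "true".equalsIgnoreCase(useTmp.toString())) {
            // spill to a local temp file; the upload later re-reads it as an InputStream
            File tmp = File.createTempFile("cloudupload", ".tmp");
            tmp.deleteOnExit();
            return new FileOutputStream(tmp);
        }
        // otherwise accumulate in memory; the upload wraps the byte[] in a ByteArrayInputStream
        return new ByteArrayOutputStream();
    }
}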

hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java

Lines changed: 23 additions & 7 deletions

@@ -1,9 +1,8 @@
 package com.robin.comm.fileaccess.iterator;

+import com.robin.comm.fileaccess.util.ByteBufferSeekableInputStream;
 import com.robin.comm.fileaccess.util.ParquetUtil;
-import com.robin.comm.fileaccess.util.SeekableInputStream;
 import com.robin.core.base.util.Const;
-import com.robin.core.base.util.IOUtils;
 import com.robin.core.fileaccess.iterator.AbstractFileIterator;
 import com.robin.core.fileaccess.meta.DataCollectionMeta;
 import com.robin.core.fileaccess.util.AvroUtils;
@@ -13,6 +12,9 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
@@ -22,19 +24,20 @@
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.LocalInputFile;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 import org.apache.slider.server.appmaster.management.Timestamp;
 import org.springframework.util.ObjectUtils;

-import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
@@ -50,6 +53,7 @@ public class ParquetFileIterator extends AbstractFileIterator {
     private ParquetReader<Map<String, Object>> ireader;
     private boolean useAvroEncode = false;
     private static final long maxSize = Integer.MAX_VALUE;
+    private MemorySegment segment;

     public ParquetFileIterator() {
         identifier = Const.FILEFORMATSTR.PARQUET.getValue();
@@ -100,9 +104,17 @@ public void beforeProcess() {
             copyToLocal(tmpFile, instream);
             file = new LocalInputFile(Paths.get(new URI(tmpFilePath)));
         } else {
-            ByteArrayOutputStream byteout = new ByteArrayOutputStream((int) size);
-            IOUtils.copyBytes(instream, byteout, 8000);
-            SeekableInputStream seekableInputStream = new SeekableInputStream(byteout.toByteArray());
+            // use Flink memory utils to dump the file content into off-heap memory
+            segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory((int) size, this, new Thread() {});
+            ByteBuffer byteBuffer = segment.getOffHeapBuffer();
+            ReadableByteChannel channel = Channels.newChannel(instream);
+            IOUtils.readFully(channel, byteBuffer);
+            byteBuffer.position(0);
+            channel.close();
+            ByteBufferSeekableInputStream seekableInputStream = new ByteBufferSeekableInputStream(byteBuffer);
+            /*ByteArrayOutputStream byteout = new ByteArrayOutputStream((int) size);
+            IOUtils.copy(instream, byteout, 4096);
+            ByteArraySeekableInputStream seekableInputStream = new ByteArraySeekableInputStream(byteout.toByteArray());*/
             file = ParquetUtil.makeInputFile(seekableInputStream);
         }
     }
@@ -210,6 +222,10 @@ public void close() throws IOException {
         if (!ObjectUtils.isEmpty(preader)) {
             preader.close();
         }
+        // free the off-heap memory
+        if (!ObjectUtils.isEmpty(segment)) {
+            segment.free();
+        }

         if (!ObjectUtils.isEmpty(tmpFile)) {
             FileUtils.deleteQuietly(tmpFile);
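Pulled out of the iterator, the new buffering path is: size an off-heap segment to the file length, drain the stream into the segment's ByteBuffer through a channel, rewind, and hand the buffer to a seekable stream; close() must later free the segment, because off-heap memory is invisible to the garbage collector. Below is a minimal, self-contained sketch of that pattern (OffHeapBufferDemo and readToOffHeap are illustrative names, and the empty lambda plays the same no-op cleanup role as the diff's new Thread() {}):

import org.apache.commons.io.IOUtils;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;

public final class OffHeapBufferDemo {
    /**
     * Reads the whole stream into a Flink off-heap segment; the caller owns
     * the segment and must call segment.free() when finished with it.
     */
    static MemorySegment readToOffHeap(InputStream in, int size, Object owner) throws IOException {
        MemorySegment segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory(size, owner, () -> {});
        ByteBuffer buffer = segment.getOffHeapBuffer();
        try (ReadableByteChannel channel = Channels.newChannel(in)) {
            IOUtils.readFully(channel, buffer); // fills the buffer up to its limit
        }
        buffer.position(0); // rewind so readers start at byte 0
        return segment;
    }
}

Because the buffer lives outside the heap, a large Parquet file no longer inflates GC scan and pause times the way the old ByteArrayOutputStream copy did; the trade-off is manual lifetime management, which is why close() now calls segment.free() explicitly.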

hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetStreamIterator.java

Lines changed: 2 additions & 2 deletions

@@ -1,7 +1,7 @@
 package com.robin.comm.fileaccess.iterator;

 import com.robin.comm.fileaccess.util.ParquetUtil;
-import com.robin.comm.fileaccess.util.SeekableInputStream;
+import com.robin.comm.fileaccess.util.ByteArraySeekableInputStream;
 import com.robin.core.base.util.Const;
 import com.robin.core.base.util.IOUtils;
 import com.robin.core.fileaccess.iterator.AbstractFileIterator;
@@ -58,7 +58,7 @@ public void beforeProcess() {
         //seek remote file to local tmp
         ByteArrayOutputStream byteout=new ByteArrayOutputStream(instream.available());
         IOUtils.copyBytes(instream,byteout,8000);
-        com.robin.comm.fileaccess.util.SeekableInputStream seekableInputStream=new SeekableInputStream(byteout.toByteArray());
+        ByteArraySeekableInputStream seekableInputStream=new ByteArraySeekableInputStream(byteout.toByteArray());
         preader = AvroParquetReader
                 .<GenericData.Record>builder(ParquetUtil.makeInputFile(seekableInputStream)).withConf(conf).build();
         fields = schema.getFields();

hadooptool/src/main/java/com/robin/comm/fileaccess/util/SeekableInputStream.java renamed to hadooptool/src/main/java/com/robin/comm/fileaccess/util/ByteArraySeekableInputStream.java

Lines changed: 6 additions & 3 deletions

@@ -6,14 +6,17 @@
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;


-public class SeekableInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {
+public class ByteArraySeekableInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {

-
-    public SeekableInputStream(byte[] buf) {
+    public ByteArraySeekableInputStream(byte[] buf) {
         super(buf);
     }
+    public ByteArraySeekableInputStream(ByteBuffer byteBuffer){
+        super(byteBuffer.array());
+    }


     @Override
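One caveat with the new ByteBuffer constructor: ByteBuffer.array() only works for heap-backed buffers, and a direct (off-heap) buffer throws UnsupportedOperationException, since hasArray() is false for it. That is presumably why the off-heap path in ParquetFileIterator wraps its buffer in the dedicated ByteBufferSeekableInputStream below rather than this class.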
hadooptool/src/main/java/com/robin/comm/fileaccess/util/ByteBufferSeekableInputStream.java (new file)

Lines changed: 101 additions & 0 deletions

package com.robin.comm.fileaccess.util;

import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class ByteBufferSeekableInputStream extends InputStream implements Seekable, PositionedReadable {
    private final ByteBuffer byteBuffer;

    public ByteBufferSeekableInputStream(ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
    }

    @Override
    public int read() throws IOException {
        if (byteBuffer.remaining() == 0) {
            return -1;
        }
        return byteBuffer.get() & 0xff;
    }

    @Override
    public int available() throws IOException {
        return byteBuffer.remaining();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (byteBuffer.remaining() == 0) {
            return -1;
        }
        if (len > byteBuffer.remaining()) {
            len = byteBuffer.remaining();
        }
        byteBuffer.get(b, off, len);
        return len;
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    @Override
    public long skip(long n) throws IOException {
        // clamp to the bytes actually left in the buffer
        if (n > byteBuffer.remaining()) {
            n = byteBuffer.remaining();
        }
        if (n <= 0) {
            return 0;
        }
        byteBuffer.position(byteBuffer.position() + (int) n);
        return n;
    }

    @Override
    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
        long oldPos = getPos();
        int nread = -1;
        try {
            seek(position);
            nread = read(buffer, offset, length);
        } finally {
            seek(oldPos);
        }
        return nread;
    }

    @Override
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
        int nread = 0;
        while (nread < length) {
            int nbytes = read(position + nread, buffer, offset + nread, length - nread);
            if (nbytes < 0) {
                throw new EOFException("End of file reached before reading fully.");
            }
            nread += nbytes;
        }
    }

    @Override
    public void readFully(long position, byte[] buffer) throws IOException {
        readFully(position, buffer, 0, buffer.length);
    }

    @Override
    public void seek(long pos) throws IOException {
        byteBuffer.position((int) pos);
    }

    @Override
    public long getPos() throws IOException {
        return byteBuffer.position();
    }

    @Override
    public boolean seekToNewSource(long l) throws IOException {
        return false;
    }
}
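A quick usage sketch for the new stream (SeekableDemo is a hypothetical class; a heap buffer stands in for the off-heap one, since the class accepts any ByteBuffer):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SeekableDemo {
    public static void main(String[] args) throws Exception {
        ByteBuffer buf = ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8));
        try (ByteBufferSeekableInputStream in = new ByteBufferSeekableInputStream(buf)) {
            byte[] word = new byte[5];
            in.readFully(6, word); // positioned read: restores the stream position afterwards
            System.out.println(new String(word, StandardCharsets.UTF_8)); // prints "world"

            in.seek(0); // Seekable: jump back to the start
            System.out.println((char) in.read()); // prints 'h'
        }
    }
}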

hadooptool/src/main/java/com/robin/comm/fileaccess/util/MockInputStream.java

Lines changed: 1 addition & 1 deletion

@@ -9,7 +9,7 @@ public class MockInputStream extends FSDataInputStream {
     MockFileSystem fs;

     public MockInputStream(MockFileSystem fs, byte[] streamBytes) throws IOException {
-        super(new SeekableInputStream(streamBytes));
+        super(new ByteArraySeekableInputStream(streamBytes));
         this.fs = fs;
     }
1515
