Skip to content

Commit 9bc411e

Browse files
committed
using OffHeap ByteBuffer as FileSystem OutputStream
1 parent ac315ca commit 9bc411e

11 files changed

+37
-159
lines changed

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/AbstractCloudStorageFileSystemAccessor.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.robin.comm.fileaccess.util.ByteBufferInputStream;
44
import com.robin.comm.fileaccess.util.ByteBufferOutputStream;
5-
import com.robin.comm.fileaccess.util.ByteBufferSeekableInputStream;
65
import com.robin.core.base.exception.MissingConfigException;
76
import com.robin.core.base.exception.OperationNotSupportException;
87
import com.robin.core.base.util.ResourceConst;
@@ -27,7 +26,7 @@
2726
public abstract class AbstractCloudStorageFileSystemAccessor extends AbstractFileSystemAccessor {
2827
protected String bucketName;
2928
protected String tmpFilePath;
30-
private int defaultDumpSize = ResourceConst.DEFAULTDUMPEDOFFHEAPSIZE;
29+
private int dumpOffHeapSize = ResourceConst.DEFAULTDUMPEDOFFHEAPSIZE;
3130
private MemorySegment segment;
3231
private boolean useFileCache = false;
3332

@@ -111,10 +110,10 @@ protected synchronized OutputStream getOutputStream(DataCollectionMeta meta) thr
111110
throw new OperationNotSupportException("Off Heap Segment is still in used! try later");
112111
}
113112
if (!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.DUMPEDOFFHEAPSIZEKEY))) {
114-
defaultDumpSize = Integer.parseInt(meta.getResourceCfgMap().get(ResourceConst.DUMPEDOFFHEAPSIZEKEY).toString());
113+
dumpOffHeapSize = Integer.parseInt(meta.getResourceCfgMap().get(ResourceConst.DUMPEDOFFHEAPSIZEKEY).toString());
115114
}
116-
segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory(defaultDumpSize, this, new Thread() {});
117-
outputStream = new ByteBufferOutputStream(segment.getOffHeapBuffer());//new ByteArrayOutputStream();
115+
segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory(dumpOffHeapSize, this, new Thread() {});
116+
outputStream = new ByteBufferOutputStream(segment.getOffHeapBuffer());
118117
}
119118
return outputStream;
120119
}
@@ -132,6 +131,14 @@ protected boolean uploadStorage(String bucketName, DataCollectionMeta meta, Outp
132131
}
133132
} catch (IOException ex) {
134133
log.error("{}", ex.getMessage());
134+
}finally {
135+
try {
136+
if (!ObjectUtils.isEmpty(outputStream)) {
137+
outputStream.close();
138+
}
139+
}catch (IOException ex){
140+
141+
}
135142
}
136143
return false;
137144
}
@@ -142,7 +149,5 @@ protected String getContentType(DataCollectionMeta meta) {
142149

143150
protected abstract boolean putObject(String bucketName, DataCollectionMeta meta, InputStream inputStream, long size) throws IOException;
144151

145-
protected abstract boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException;
146-
147152
protected abstract InputStream getObject(String bucketName, String objectName);
148153
}

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/BOSFileSystemAccessor.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,24 +95,6 @@ protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStr
9595
return !ObjectUtils.isEmpty(result) && !ObjectUtils.isEmpty(result.getETag());
9696
}
9797

98-
@Override
99-
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
100-
PutObjectResponse result;
101-
ObjectMetadata metadata=new ObjectMetadata();
102-
if(!ObjectUtils.isEmpty(meta.getContent())){
103-
metadata.setContentType(meta.getContent().getContentType());
104-
}
105-
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
106-
ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream;
107-
metadata.setContentLength(byteArrayOutputStream.size());
108-
result = client.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata);
109-
}else{
110-
outputStream.close();
111-
result=client.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata);
112-
}
113-
return !ObjectUtils.isEmpty(result) && !ObjectUtils.isEmpty(result.getETag());
114-
}
115-
11698
public static class Builder{
11799
private BOSFileSystemAccessor accessor;
118100
public Builder(){

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,6 @@ protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStr
137137
return false;
138138
}
139139

140-
@Override
141-
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
142-
throw new OperationNotSupportException("putObject replaced in cos filesystem");
143-
}
144-
145140
public static class Builder{
146141
private COSFileSystemAccessor accessor;
147142
public Builder(){

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/GCSFileSystemAccessor.java

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
import org.springframework.util.CollectionUtils;
1313
import org.springframework.util.ObjectUtils;
1414

15-
import java.io.*;
15+
import java.io.FileInputStream;
16+
import java.io.IOException;
17+
import java.io.InputStream;
1618
import java.nio.channels.Channels;
17-
import java.nio.file.Paths;
1819
import java.util.List;
1920
/**
2021
* Google Cloud Storage FileSystemAccessor,must init individual
@@ -77,24 +78,6 @@ public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) thr
7778
return 0;
7879
}
7980

80-
@Override
81-
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
82-
checkStorage(meta);
83-
BlobId blobId=BlobId.of(getBucketName(meta),meta.getPath());
84-
String contentType=!ObjectUtils.isEmpty(meta.getContent().getContentType())?meta.getContent().getContentType():"application/octet-stream";
85-
BlobInfo blobInfo= BlobInfo.newBuilder(blobId).setContentType(contentType).build();
86-
Blob blob;
87-
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
88-
ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream) outputStream;
89-
blob = storage.create(blobInfo, byteArrayOutputStream.toByteArray());
90-
}else{
91-
outputStream.close();
92-
blob=storage.createFrom(blobInfo,Paths.get(tmpFilePath));
93-
}
94-
meta.getResourceCfgMap().put(ResourceConst.GCSPARAM.SELFLINK.getValue(),blob.getSelfLink());
95-
return !ObjectUtils.isEmpty(blob.getEtag());
96-
}
97-
9881
@Override
9982
protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStream inputStream,long size) throws IOException {
10083
checkStorage(meta);

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/MinioFileSystemAccessor.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@
1212
import org.springframework.util.CollectionUtils;
1313
import org.springframework.util.ObjectUtils;
1414

15-
import java.io.*;
16-
import java.nio.file.Files;
17-
import java.nio.file.Paths;
15+
import java.io.IOException;
16+
import java.io.InputStream;
1817

1918
/**
2019
* Minio FileSystemAccessor,must init individual
@@ -72,18 +71,6 @@ protected InputStream getObject(String bucketName,String objectName) {
7271
}
7372
}
7473

75-
@Override
76-
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
77-
String contentType=!ObjectUtils.isEmpty(meta.getContent().getContentType())?meta.getContent().getContentType():"application/octet-stream";
78-
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())){
79-
ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream;
80-
return MinioUtils.putBucket(client,getBucketName(meta),meta.getPath(),new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),byteArrayOutputStream.size(),contentType);
81-
}else{
82-
outputStream.close();
83-
return MinioUtils.putBucket(client,getBucketName(meta),meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),Files.size(Paths.get(tmpFilePath)),contentType);
84-
}
85-
}
86-
8774
@Override
8875
protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStream inputStream,long size) throws IOException {
8976
return MinioUtils.putBucket(client,getBucketName(meta),meta.getPath(),inputStream,size,getContentType(meta));

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OBSFileSystemAccessor.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
import org.springframework.util.CollectionUtils;
1616
import org.springframework.util.ObjectUtils;
1717

18-
import java.io.*;
19-
import java.nio.file.Files;
20-
import java.nio.file.Paths;
18+
import java.io.IOException;
19+
import java.io.InputStream;
2120

2221
/**
2322
* HUAWEI OBS FileSystemAccessor,must init individual
@@ -92,24 +91,6 @@ protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStr
9291
return result.getStatusCode()==200;
9392
}
9493

95-
@Override
96-
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
97-
ObjectMetadata metadata=new ObjectMetadata();
98-
PutObjectResult result;
99-
if(!ObjectUtils.isEmpty(meta.getContent())){
100-
metadata.setContentType(meta.getContent().getContentType());
101-
}
102-
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
103-
ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream) outputStream;
104-
metadata.setContentLength(Long.valueOf(byteArrayOutputStream.size()));
105-
result=client.putObject(bucketName,meta.getPath(),new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata);
106-
}else{
107-
outputStream.close();
108-
result=client.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata);
109-
}
110-
return result.getStatusCode()==200;
111-
}
112-
11394
public static class Builder {
11495
private OBSFileSystemAccessor accessor;
11596

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import com.aliyun.oss.OSSClientBuilder;
55
import com.aliyun.oss.common.auth.CredentialsProvider;
66
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
7-
import com.aliyun.oss.common.comm.ResponseMessage;
87
import com.aliyun.oss.model.Bucket;
98
import com.aliyun.oss.model.OSSObject;
109
import com.aliyun.oss.model.ObjectMetadata;
@@ -13,16 +12,13 @@
1312
import com.robin.core.base.util.Const;
1413
import com.robin.core.base.util.ResourceConst;
1514
import com.robin.core.fileaccess.meta.DataCollectionMeta;
16-
import com.robin.core.fileaccess.util.ResourceUtil;
1715
import lombok.Getter;
18-
import org.apache.commons.io.FileUtils;
1916
import org.springframework.util.Assert;
2017
import org.springframework.util.CollectionUtils;
2118
import org.springframework.util.ObjectUtils;
2219

23-
import java.io.*;
24-
import java.nio.file.Files;
25-
import java.nio.file.Paths;
20+
import java.io.IOException;
21+
import java.io.InputStream;
2622

2723
/**
2824
* Aliyun OSS FileSystemAccessor
@@ -109,23 +105,6 @@ protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStr
109105
return result.getResponse().isSuccessful();
110106
}
111107

112-
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException{
113-
PutObjectResult result;
114-
ObjectMetadata metadata=new ObjectMetadata();
115-
if(!ObjectUtils.isEmpty(meta.getContent())){
116-
metadata.setContentType(meta.getContent().getContentType());
117-
}
118-
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
119-
ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream;
120-
metadata.setContentLength(byteArrayOutputStream.size());
121-
result = ossClient.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata);
122-
}else{
123-
outputStream.close();
124-
result=ossClient.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata);
125-
}
126-
return result.getResponse().isSuccessful();
127-
}
128-
129108
public static class Builder{
130109
private OSSFileSystemAccessor accessor;
131110
public Builder(){

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,17 @@
1414
import com.robin.core.base.util.Const;
1515
import com.robin.core.base.util.ResourceConst;
1616
import com.robin.core.fileaccess.meta.DataCollectionMeta;
17-
import com.robin.core.fileaccess.util.ResourceUtil;
1817
import lombok.Getter;
1918
import lombok.extern.slf4j.Slf4j;
2019
import org.springframework.lang.NonNull;
2120
import org.springframework.util.Assert;
2221
import org.springframework.util.CollectionUtils;
2322
import org.springframework.util.ObjectUtils;
2423

25-
import java.io.*;
24+
import java.io.IOException;
25+
import java.io.InputStream;
2626
import java.net.URL;
2727
import java.net.URLEncoder;
28-
import java.nio.file.Files;
29-
import java.nio.file.Paths;
3028

3129

3230
/**
@@ -42,7 +40,7 @@ public class QiniuFileSystemAccessor extends AbstractCloudStorageFileSystemAcces
4240
private String accessKey;
4341
private String secretKey;
4442
private Region region;
45-
private Gson gson= GsonUtil.getGson();
43+
private final Gson gson= GsonUtil.getGson();
4644
private String downDomain;
4745
private QiniuFileSystemAccessor(){
4846
this.identifier= Const.FILESYSTEM.QINIU.getValue();
@@ -56,8 +54,8 @@ public void init(DataCollectionMeta meta) {
5654
Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.ACESSSKEY.getValue()),"must provide accessKey");
5755
Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.SECURITYKEY.getValue()),"must provide securityKey");
5856
Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.DOWNDOMAIN.getValue()),"must provide downDomain");
59-
String accessKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.ACESSSKEY.getValue()).toString();
60-
String secretKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.SECURITYKEY.getValue()).toString();
57+
accessKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.ACESSSKEY.getValue()).toString();
58+
secretKey=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.SECURITYKEY.getValue()).toString();
6159
domain=meta.getResourceCfgMap().get(ResourceConst.QINIUPARAM.DOMAIN.getValue()).toString();
6260
auth= Auth.create(accessKey,secretKey);
6361
region=Region.autoRegion();
@@ -109,32 +107,12 @@ private long getSize(String bucketName,String key) {
109107
}
110108
return 0L;
111109
}
112-
protected boolean putObject(String token,DataCollectionMeta meta,OutputStream outputStream) throws IOException{
113-
Response result;
114-
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
115-
ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream;
116-
result= uploadManager.put(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),byteArrayOutputStream.size(),meta.getPath(),token,null,meta.getContent().getContentType(),true);
117-
}else{
118-
outputStream.close();
119-
long size=Files.size(Paths.get(tmpFilePath));
120-
result=uploadManager.put(Files.newInputStream(Paths.get(tmpFilePath)),size,meta.getPath(),token,null,meta.getContent().getContentType(),true);
121-
}
122-
DefaultPutRet putRet=gson.fromJson(result.bodyString(),DefaultPutRet.class);
123-
if(!ObjectUtils.isEmpty(putRet)){
124-
return true;
125-
}
126-
return false;
127-
}
128-
129110
@Override
130111
protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStream inputStream,long size) throws IOException {
131112
String token=auth.uploadToken(bucketName,meta.getPath());
132113
Response result=uploadManager.put(inputStream,size,meta.getPath(),token,null,getContentType(meta),true);
133114
DefaultPutRet putRet=gson.fromJson(result.bodyString(),DefaultPutRet.class);
134-
if(!ObjectUtils.isEmpty(putRet)){
135-
return true;
136-
}
137-
return false;
115+
return !ObjectUtils.isEmpty(putRet);
138116
}
139117

140118
protected InputStream getObject(@NonNull String bucketName, @NonNull String key) {

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.robin.core.base.util.Const;
66
import com.robin.core.base.util.ResourceConst;
77
import com.robin.core.fileaccess.meta.DataCollectionMeta;
8-
import com.robin.core.fileaccess.util.ResourceUtil;
98
import com.robin.dfs.aws.AwsUtils;
109
import lombok.Getter;
1110
import lombok.extern.slf4j.Slf4j;
@@ -16,9 +15,8 @@
1615
import software.amazon.awssdk.services.s3.S3AsyncClient;
1716
import software.amazon.awssdk.services.s3.S3Client;
1817

19-
import java.io.*;
20-
import java.nio.file.Files;
21-
import java.nio.file.Paths;
18+
import java.io.IOException;
19+
import java.io.InputStream;
2220

2321
/**
2422
* Amazon AWS FileSystemAccessor
@@ -44,8 +42,8 @@ public void init(DataCollectionMeta meta) {
4442
if (!CollectionUtils.isEmpty(meta.getResourceCfgMap())) {
4543
if (meta.getResourceCfgMap().containsKey(ResourceConst.S3PARAM.ACCESSKEY.getValue()) &&
4644
meta.getResourceCfgMap().containsKey(ResourceConst.S3PARAM.SECRET.getValue())) {
47-
Object regionName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.REGION.getValue());
48-
region = ObjectUtils.isEmpty(regionName) ? Region.US_EAST_1 : Region.of(regionName.toString());
45+
regionName = meta.getResourceCfgMap().get(ResourceConst.S3PARAM.REGION.getValue()).toString();
46+
region = ObjectUtils.isEmpty(regionName) ? Region.US_EAST_1 : Region.of(regionName);
4947
client = AwsUtils.getClientByCredential(region, meta.getResourceCfgMap().get(ResourceConst.S3PARAM.ACCESSKEY.getValue()).toString(), meta.getResourceCfgMap().get(ResourceConst.S3PARAM.SECRET.getValue()).toString());
5048
asyncClient = AwsUtils.getAsyncClientByCredential(region, meta.getResourceCfgMap().get(ResourceConst.S3PARAM.ACCESSKEY.getValue()).toString(), meta.getResourceCfgMap().get(ResourceConst.S3PARAM.SECRET.getValue()).toString());
5149
}else{
@@ -78,22 +76,6 @@ protected boolean putObject(String bucketName, DataCollectionMeta meta, InputStr
7876
return AwsUtils.put(client,bucketName,meta.getPath(),getContentType(meta),inputStream,size);
7977
}
8078

81-
@Override
82-
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
83-
String tmpFilePath;
84-
String contentType=!ObjectUtils.isEmpty(meta.getContent())?meta.getContent().getContentType():"application/octet-stream";
85-
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
86-
ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream) outputStream;
87-
return AwsUtils.put(client,bucketName,meta.getPath(),contentType,new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),new Long(byteArrayOutputStream.size()));
88-
}else{
89-
outputStream.close();
90-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
91-
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
92-
long size= Files.size(Paths.get(tmpFilePath));
93-
return AwsUtils.put(client,bucketName,meta.getPath(),contentType,Files.newInputStream(Paths.get(tmpFilePath)),size);
94-
}
95-
}
96-
9779
@Override
9880
protected InputStream getObject(String bucketName, String objectName) {
9981
return AwsUtils.getObject(client,bucketName,objectName);

hadooptool/src/main/java/com/robin/dfs/aws/AwsUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@
2727

2828
@Slf4j
2929
public class AwsUtils {
30+
private AwsUtils(){
31+
32+
}
3033
public static S3Client getHttpClient() {
3134
return S3Client.builder().httpClientBuilder(ApacheHttpClient.builder()).build();
3235
}

0 commit comments

Comments
 (0)