Skip to content

Commit 1aab960

Browse files
committed
Cloud storage FileSystem Accessor refactor
1 parent 437d926 commit 1aab960

File tree

13 files changed

+419
-350
lines changed

13 files changed

+419
-350
lines changed

common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,7 @@ protected static InputStream wrapInputStream(InputStream instream){
100100
protected static OutputStream getOutputStreamByPath(String path, OutputStream out) throws IOException{
101101
return CompressEncoder.getOutputStreamByCompressType(path,out);
102102
}
103-
protected OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
104-
OutputStream outputStream;
105-
if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
106-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
107-
String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
108-
outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
109-
}else {
110-
outputStream = new ByteArrayOutputStream();
111-
}
112-
return outputStream;
113-
}
103+
114104

115105
@Override
116106
public void init(DataCollectionMeta meta){

core/src/main/java/com/robin/core/base/util/Const.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,10 @@ public enum FILESYSTEM{
393393
S3("s3"),
394394
ALIYUN("oss"),
395395
TENCENT("cos"),
396-
QINIU("qiniu");
396+
QINIU("qiniu"),
397+
BAIDU_BOS("bos"),
398+
HUAWEI_OBS("obs"),
399+
MINIO("minio");
397400
private String value;
398401
FILESYSTEM(String value){
399402
this.value=value;

core/src/main/java/com/robin/core/base/util/ResourceConst.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,4 +196,18 @@ public String getValue() {
196196
}
197197

198198
}
199+
/**
 * Configuration keys for the OBS file system accessor.
 * Each constant carries the key string returned by {@link #getValue()}.
 * NOTE(review): {@code ENDPOIN} and {@code ACESSSKEYID} are misspelled; the names are
 * kept as-is because renaming them would break existing references.
 */
public enum OBSPARAM {
    ENDPOIN("endpoint"),
    REGION("region"),
    ACESSSKEYID("accessKeyId"),
    SECURITYACCESSKEY("securityAccessKey");

    // Enum constants are shared singletons, so the key must never be reassigned.
    private final String value;

    OBSPARAM(String value) {
        this.value = value;
    }

    /**
     * @return the configuration key string for this parameter
     */
    public String getValue() {
        return value;
    }
}
199213
}

hadooptool/pom.xml

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@
278278
<version>0.9.6</version>
279279
<optional>true</optional>
280280
</dependency>
281+
<!-- FileSystem dependency begin -->
281282
<dependency>
282283
<groupId>com.qcloud</groupId>
283284
<artifactId>cos_api</artifactId>
@@ -289,17 +290,20 @@
289290
<version>3.17.0</version>
290291
</dependency>
291292
<dependency>
292-
<groupId>org.mongodb</groupId>
293-
<artifactId>mongo-java-driver</artifactId>
294-
<version>3.2.2</version>
295-
<optional>true</optional>
293+
<groupId>com.huaweicloud</groupId>
294+
<artifactId>esdk-obs-java-bundle</artifactId>
295+
<version>3.23.9</version>
296+
<exclusions>
297+
<exclusion>
298+
<groupId>org.apache.logging.log4j</groupId>
299+
<artifactId>log4j-core</artifactId>
300+
</exclusion>
301+
<exclusion>
302+
<groupId>org.apache.logging.log4j</groupId>
303+
<artifactId>log4j-api</artifactId>
304+
</exclusion>
305+
</exclusions>
296306
</dependency>
297-
<dependency>
298-
<groupId>org.apache.calcite</groupId>
299-
<artifactId>calcite-core</artifactId>
300-
<optional>true</optional>
301-
</dependency>
302-
<!-- s3 import -->
303307
<dependency>
304308
<groupId>software.amazon.awssdk</groupId>
305309
<artifactId>s3</artifactId>
@@ -355,6 +359,20 @@
355359
<version>8.4.0</version>
356360
<optional>true</optional>
357361
</dependency>
362+
<!-- FileSystem dependency end -->
363+
<dependency>
364+
<groupId>org.mongodb</groupId>
365+
<artifactId>mongo-java-driver</artifactId>
366+
<version>3.2.2</version>
367+
<optional>true</optional>
368+
</dependency>
369+
<dependency>
370+
<groupId>org.apache.calcite</groupId>
371+
<artifactId>calcite-core</artifactId>
372+
<optional>true</optional>
373+
</dependency>
374+
<!-- s3 import -->
375+
358376

359377
</dependencies>
360378

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.robin.comm.fileaccess.fs;
2+
3+
import com.robin.core.base.util.ResourceConst;
4+
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
5+
import com.robin.core.fileaccess.meta.DataCollectionMeta;
6+
import com.robin.core.fileaccess.util.ResourceUtil;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.apache.commons.lang3.tuple.Pair;
9+
import org.springframework.util.ObjectUtils;
10+
11+
import java.io.*;
12+
import java.nio.file.Files;
13+
import java.nio.file.Paths;
14+
15+
/**
16+
* Cloud Storage FileSystemAccessor Abstract super class,not singleton,must init individual
17+
*/
18+
@Slf4j
19+
public abstract class AbstractCloudStorageFileSystemAccessor extends AbstractFileSystemAccessor {
20+
protected String bucketName;
21+
@Override
22+
public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
23+
InputStream inputStream = getInputStreamByConfig(meta);
24+
return Pair.of(getReaderByPath(resourcePath, inputStream, meta.getEncode()),inputStream);
25+
}
26+
27+
@Override
28+
public Pair<BufferedWriter, OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
29+
OutputStream outputStream = getOutputStream(meta);
30+
return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream);
31+
}
32+
33+
@Override
34+
public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
35+
return getOutputStream(meta);
36+
}
37+
38+
@Override
39+
public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
40+
return getOutputStreamByPath(resourcePath, getOutputStream(meta));
41+
}
42+
43+
@Override
44+
public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
45+
InputStream inputStream = getInputStreamByConfig(meta);
46+
return getInputStreamByPath(resourcePath, inputStream);
47+
}
48+
49+
@Override
50+
public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
51+
return getInputStreamByConfig(meta);
52+
}
53+
@Override
54+
public void finishWrite(DataCollectionMeta meta,OutputStream outputStream) {
55+
try{
56+
putObject(getBucketName(meta),meta,outputStream);
57+
}catch (IOException ex){
58+
log.error("{}",ex.getMessage());
59+
}
60+
}
61+
62+
63+
protected InputStream getInputStreamByConfig(DataCollectionMeta meta) {
64+
return getObject(getBucketName(meta), meta.getPath());
65+
}
66+
protected String getBucketName(DataCollectionMeta meta) {
67+
return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString();
68+
}
69+
protected OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
70+
OutputStream outputStream;
71+
if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
72+
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
73+
String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
74+
outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
75+
}else {
76+
outputStream = new ByteArrayOutputStream();
77+
}
78+
return outputStream;
79+
}
80+
protected abstract boolean putObject(String bucketName,DataCollectionMeta meta,OutputStream outputStream) throws IOException;
81+
protected abstract InputStream getObject(String bucketName,String objectName);
82+
}

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/BOSFileSystemAccessor.java

Lines changed: 41 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,33 @@
44
import com.baidubce.services.bos.BosClient;
55
import com.baidubce.services.bos.BosClientConfiguration;
66
import com.baidubce.services.bos.model.BosObject;
7+
import com.baidubce.services.bos.model.ObjectMetadata;
8+
import com.baidubce.services.bos.model.PutObjectResponse;
79
import com.robin.core.base.exception.MissingConfigException;
10+
import com.robin.core.base.util.Const;
811
import com.robin.core.base.util.ResourceConst;
9-
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
1012
import com.robin.core.fileaccess.meta.DataCollectionMeta;
13+
import com.robin.core.fileaccess.util.ResourceUtil;
1114
import lombok.Getter;
12-
import lombok.extern.slf4j.Slf4j;
13-
import org.apache.commons.lang3.tuple.Pair;
15+
import org.apache.commons.io.FileUtils;
1416
import org.springframework.util.Assert;
1517
import org.springframework.util.CollectionUtils;
1618
import org.springframework.util.ObjectUtils;
1719

1820
import java.io.*;
21+
import java.nio.file.Files;
22+
import java.nio.file.Paths;
1923

20-
@Slf4j
2124
@Getter
2225
@SuppressWarnings("unused")
23-
public class BOSFileSystemAccessor extends AbstractFileSystemAccessor {
26+
public class BOSFileSystemAccessor extends AbstractCloudStorageFileSystemAccessor {
2427
private String endpoint;
2528
private String accessKeyId;
2629
private String securityAccessKey;
27-
private String bucketName;
2830
private BosClient client;
31+
private BOSFileSystemAccessor(){
32+
this.identifier= Const.FILESYSTEM.BAIDU_BOS.getValue();
33+
}
2934

3035
@Override
3136
public void init(DataCollectionMeta meta) {
@@ -52,63 +57,21 @@ public void init(){
5257
client=new BosClient(config);
5358
}
5459

55-
@Override
56-
public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
57-
InputStream inputStream = getInputStreamByConfig(meta);
58-
return Pair.of(getReaderByPath(resourcePath, inputStream, meta.getEncode()),inputStream);
59-
}
60-
61-
@Override
62-
public Pair<BufferedWriter, OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException {
63-
OutputStream outputStream = getOutputStream(meta);
64-
return Pair.of(getWriterByPath(meta.getPath(), outputStream, meta.getEncode()),outputStream);
65-
}
66-
67-
@Override
68-
public OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
69-
return getOutputStream(meta);
70-
}
71-
72-
@Override
73-
public OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
74-
return getOutputStreamByPath(resourcePath, getOutputStream(meta));
75-
}
76-
77-
@Override
78-
public InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException {
79-
InputStream inputStream = getInputStreamByConfig(meta);
80-
return getInputStreamByPath(resourcePath, inputStream);
81-
}
82-
83-
@Override
84-
public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException {
85-
return getInputStreamByConfig(meta);
86-
}
87-
8860
@Override
8961
public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException {
90-
String bucketName= getBucketName(meta);
91-
return client.doesObjectExist(bucketName,resourcePath);
62+
return client.doesObjectExist(getBucketName(meta),resourcePath);
9263
}
9364

9465
@Override
9566
public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException {
96-
String bucketName= getBucketName(meta);
9767
if(exists(meta,resourcePath)){
98-
BosObject object=client.getObject(bucketName,resourcePath);
68+
BosObject object=client.getObject(getBucketName(meta),resourcePath);
9969
return object.getObjectMetadata().getContentLength();
10070
}
10171
return 0;
10272
}
103-
private InputStream getInputStreamByConfig(DataCollectionMeta meta) {
104-
String bucketName= getBucketName(meta);
105-
String objectName= meta.getPath();
106-
return getObject(bucketName,objectName);
107-
}
108-
private String getBucketName(DataCollectionMeta meta) {
109-
return !ObjectUtils.isEmpty(bucketName)?bucketName:meta.getResourceCfgMap().get(ResourceConst.BUCKETNAME).toString();
110-
}
111-
private InputStream getObject(String bucketName,String objectName){
73+
74+
protected InputStream getObject(String bucketName,String objectName){
11275
if(client.doesObjectExist(bucketName,objectName)) {
11376
BosObject object = client.getObject(bucketName, objectName);
11477
if (!ObjectUtils.isEmpty(object)) {
@@ -120,6 +83,32 @@ private InputStream getObject(String bucketName,String objectName){
12083
throw new MissingConfigException(" key "+objectName+" not in OSS bucket "+bucketName);
12184
}
12285
}
86+
87+
@Override
88+
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
89+
PutObjectResponse result;
90+
ObjectMetadata metadata=new ObjectMetadata();
91+
String tmpFilePath=null;
92+
if(!ObjectUtils.isEmpty(meta.getContent())){
93+
metadata.setContentType(meta.getContent().getContentType());
94+
}
95+
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
96+
ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream;
97+
metadata.setContentLength(byteArrayOutputStream.size());
98+
result = client.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata);
99+
}else{
100+
outputStream.close();
101+
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
102+
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
103+
metadata.setContentLength(Files.size(Paths.get(tmpFilePath)));
104+
result=client.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata);
105+
}
106+
if(ObjectUtils.isEmpty(tmpFilePath)){
107+
FileUtils.deleteQuietly(new File(tmpFilePath));
108+
}
109+
return !ObjectUtils.isEmpty(result) && !ObjectUtils.isEmpty(result.getETag());
110+
}
111+
123112
public static class Builder{
124113
private BOSFileSystemAccessor accessor;
125114
public Builder(){

0 commit comments

Comments
 (0)