Skip to content

Commit ed5b791

Browse files
committed
Cloud storage FileSystem Accessor refactor
1 parent 1aab960 commit ed5b791

File tree

8 files changed

+12
-48
lines changed

8 files changed

+12
-48
lines changed

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/AbstractCloudStorageFileSystemAccessor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.robin.core.fileaccess.meta.DataCollectionMeta;
66
import com.robin.core.fileaccess.util.ResourceUtil;
77
import lombok.extern.slf4j.Slf4j;
8+
import org.apache.commons.io.FileUtils;
89
import org.apache.commons.lang3.tuple.Pair;
910
import org.springframework.util.ObjectUtils;
1011

@@ -18,6 +19,7 @@
1819
@Slf4j
1920
public abstract class AbstractCloudStorageFileSystemAccessor extends AbstractFileSystemAccessor {
2021
protected String bucketName;
22+
protected String tmpFilePath;
2123
@Override
2224
public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
2325
InputStream inputStream = getInputStreamByConfig(meta);
@@ -54,12 +56,13 @@ public InputStream getRawInputStream(DataCollectionMeta meta, String resourcePat
5456
public void finishWrite(DataCollectionMeta meta,OutputStream outputStream) {
5557
try{
5658
putObject(getBucketName(meta),meta,outputStream);
59+
if(!ObjectUtils.isEmpty(tmpFilePath)){
60+
FileUtils.deleteQuietly(new File(tmpFilePath));
61+
}
5762
}catch (IOException ex){
5863
log.error("{}",ex.getMessage());
5964
}
6065
}
61-
62-
6366
protected InputStream getInputStreamByConfig(DataCollectionMeta meta) {
6467
return getObject(getBucketName(meta), meta.getPath());
6568
}
@@ -70,7 +73,7 @@ protected OutputStream getOutputStream(DataCollectionMeta meta) throws IOExcepti
7073
OutputStream outputStream;
7174
if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
7275
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
73-
String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
76+
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
7477
outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
7578
}else {
7679
outputStream = new ByteArrayOutputStream();

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/BOSFileSystemAccessor.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ protected InputStream getObject(String bucketName,String objectName){
8888
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
8989
PutObjectResponse result;
9090
ObjectMetadata metadata=new ObjectMetadata();
91-
String tmpFilePath=null;
9291
if(!ObjectUtils.isEmpty(meta.getContent())){
9392
metadata.setContentType(meta.getContent().getContentType());
9493
}
@@ -98,14 +97,8 @@ protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputSt
9897
result = client.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata);
9998
}else{
10099
outputStream.close();
101-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
102-
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
103-
metadata.setContentLength(Files.size(Paths.get(tmpFilePath)));
104100
result=client.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata);
105101
}
106-
if(ObjectUtils.isEmpty(tmpFilePath)){
107-
FileUtils.deleteQuietly(new File(tmpFilePath));
108-
}
109102
return !ObjectUtils.isEmpty(result) && !ObjectUtils.isEmpty(result.getETag());
110103
}
111104

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/COSFileSystemAccessor.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throw
127127
String bucketName= getBucketName(meta);
128128
TransferManager transferManager=getManager();
129129
PutObjectRequest request;
130-
String tmpFilePath=null;
131130
ObjectMetadata objectMetadata = new ObjectMetadata();
132131
if(!ObjectUtils.isEmpty(meta.getContent())){
133132
objectMetadata.setContentType(meta.getContent().getContentType());
@@ -137,11 +136,8 @@ private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throw
137136
request = new PutObjectRequest(bucketName, meta.getPath(), new ByteArrayInputStream(((ByteArrayOutputStream)outputStream).toByteArray()),objectMetadata);
138137
}else{
139138
outputStream.close();
140-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
141-
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
142139
request=new PutObjectRequest(bucketName,meta.getPath(),new File(tmpFilePath));
143140
}
144-
145141
try {
146142
Upload upload = transferManager.upload(request, null);
147143
UploadResult result = upload.waitForUploadResult();
@@ -152,9 +148,6 @@ private boolean upload(DataCollectionMeta meta, OutputStream outputStream) throw
152148
if (!ObjectUtils.isEmpty(transferManager)) {
153149
transferManager.shutdownNow(true);
154150
}
155-
if(!ObjectUtils.isEmpty(tmpFilePath)){
156-
FileUtils.deleteQuietly(new File(tmpFilePath));
157-
}
158151
}
159152
return false;
160153
}

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/MinioFileSystemAccessor.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,11 @@ protected InputStream getObject(String bucketName,String objectName) {
7474
@Override
7575
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
7676
String contentType=!ObjectUtils.isEmpty(meta.getContent().getContentType())?meta.getContent().getContentType():"application/octet-stream";
77-
String tmpFilePath;
7877
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())){
7978
ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream;
8079
return MinioUtils.putBucket(client,getBucketName(meta),meta.getPath(),new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),byteArrayOutputStream.size(),contentType);
8180
}else{
8281
outputStream.close();
83-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
84-
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
8582
return MinioUtils.putBucket(client,getBucketName(meta),meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),Files.size(Paths.get(tmpFilePath)),contentType);
8683
}
8784
}

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OBSFileSystemAccessor.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.robin.core.base.util.ResourceConst;
1212
import com.robin.core.fileaccess.meta.DataCollectionMeta;
1313
import com.robin.core.fileaccess.util.ResourceUtil;
14+
import lombok.Getter;
1415
import org.springframework.util.Assert;
1516
import org.springframework.util.CollectionUtils;
1617
import org.springframework.util.ObjectUtils;
@@ -22,6 +23,7 @@
2223
/**
2324
* HUAWEI OBS FileSystemAccessor,must init individual
2425
*/
26+
@Getter
2527
public class OBSFileSystemAccessor extends AbstractCloudStorageFileSystemAccessor {
2628
private String endpoint;
2729
private String accessKeyId;
@@ -84,7 +86,6 @@ protected InputStream getObject(String bucketName, String objectName) {
8486

8587
@Override
8688
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
87-
String tmpFilePath;
8889
ObjectMetadata metadata=new ObjectMetadata();
8990
PutObjectResult result;
9091
if(!ObjectUtils.isEmpty(meta.getContent())){
@@ -96,9 +97,6 @@ protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputSt
9697
result=client.putObject(bucketName,meta.getPath(),new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata);
9798
}else{
9899
outputStream.close();
99-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
100-
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
101-
metadata.setContentLength(Files.size(Paths.get(tmpFilePath)));
102100
result=client.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata);
103101
}
104102
return result.getStatusCode()==200;

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/OSSFileSystemAccessor.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public class OSSFileSystemAccessor extends AbstractCloudStorageFileSystemAccesso
3535
private String region;
3636
private String accessKeyId;
3737
private String securityAccessKey;
38-
private String bucketName;
3938

4039
private OSSFileSystemAccessor(){
4140
this.identifier= Const.FILESYSTEM.ALIYUN.getValue();
@@ -91,7 +90,7 @@ protected InputStream getObject(String bucketName,String objectName){
9190
if (object.getResponse().isSuccessful()) {
9291
return object.getObjectContent();
9392
} else {
94-
throw new RuntimeException("objectName " + objectName + " can not get!");
93+
throw new MissingConfigException("objectName " + objectName + " can not get!");
9594
}
9695
}else{
9796
throw new MissingConfigException(" key "+objectName+" not in OSS bucket "+bucketName);
@@ -102,7 +101,6 @@ private Bucket createBucket(String bucketName){
102101
}
103102
protected boolean putObject(String bucketName,DataCollectionMeta meta,OutputStream outputStream) throws IOException{
104103
PutObjectResult result;
105-
String tmpFilePath=null;
106104
ObjectMetadata metadata=new ObjectMetadata();
107105
if(!ObjectUtils.isEmpty(meta.getContent())){
108106
metadata.setContentType(meta.getContent().getContentType());
@@ -113,16 +111,9 @@ protected boolean putObject(String bucketName,DataCollectionMeta meta,OutputStre
113111
result = ossClient.putObject(bucketName, meta.getPath(), new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),metadata);
114112
}else{
115113
outputStream.close();
116-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
117-
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
118-
metadata.setContentLength(Files.size(Paths.get(tmpFilePath)));
119114
result=ossClient.putObject(bucketName,meta.getPath(), Files.newInputStream(Paths.get(tmpFilePath)),metadata);
120115
}
121-
ResponseMessage message=result.getResponse();
122-
if(message.isSuccessful() && !ObjectUtils.isEmpty(tmpFilePath)){
123-
FileUtils.deleteQuietly(new File(tmpFilePath));
124-
}
125-
return message.isSuccessful();
116+
return result.getResponse().isSuccessful();
126117
}
127118

128119
public static class Builder{

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/QiniuFileSystemAccessor.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ public class QiniuFileSystemAccessor extends AbstractCloudStorageFileSystemAcces
4242
private String accessKey;
4343
private String secretKey;
4444
private Region region;
45-
private String bucketName;
4645
private Gson gson= GsonUtil.getGson();
4746
private String downDomain;
4847
private QiniuFileSystemAccessor(){
@@ -95,7 +94,7 @@ private boolean isKeyExist(String bucketName,String key) {
9594
try {
9695
FileInfo info = bucketManager.stat(bucketName, key);
9796
int status = info.status;
98-
return true;
97+
return status>0;
9998
} catch (QiniuException ex) {
10099
log.error("{}", ex.getMessage());
101100
}
@@ -112,14 +111,11 @@ private long getSize(String bucketName,String key) {
112111
}
113112
protected boolean putObject(String token,DataCollectionMeta meta,OutputStream outputStream) throws IOException{
114113
Response result;
115-
String tmpFilePath=null;
116114
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
117115
ByteArrayOutputStream byteArrayOutputStream=(ByteArrayOutputStream)outputStream;
118116
result= uploadManager.put(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()),byteArrayOutputStream.size(),meta.getPath(),token,null,meta.getContent().getContentType(),true);
119117
}else{
120118
outputStream.close();
121-
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
122-
tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
123119
long size=Files.size(Paths.get(tmpFilePath));
124120
result=uploadManager.put(Files.newInputStream(Paths.get(tmpFilePath)),size,meta.getPath(),token,null,meta.getContent().getContentType(),true);
125121
}

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/S3FileSystemAccessor.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,7 @@ protected InputStream getObject(String bucketName, String objectName) {
9494
return AwsUtils.getObject(client,bucketName,objectName);
9595
}
9696

97-
@Override
98-
public void finishWrite(DataCollectionMeta meta, OutputStream outputStream) {
99-
String bucketName = getBucketName(meta);
100-
ByteArrayOutputStream outputStream1=(ByteArrayOutputStream) outputStream;
101-
int size=outputStream1.size();
102-
String contentType=!ObjectUtils.isEmpty(meta.getContent())?meta.getContent().getContentType():"application/octet-stream";
103-
AwsUtils.put(client,bucketName,meta.getPath(),contentType,new ByteArrayInputStream(outputStream1.toByteArray()),new Long(size));
104-
}
97+
10598
public static class Builder{
10699
private S3FileSystemAccessor accessor;
107100
public static Builder builder(){

0 commit comments

Comments
 (0)