Skip to content

Commit 3d7957a

Browse files
committed
Cloud storage FileSystem Accessor refactor
1 parent b5b1aa6 commit 3d7957a

File tree

12 files changed

+326
-31
lines changed

12 files changed

+326
-31
lines changed

common/src/main/java/com/robin/comm/dal/holder/RecordWriterHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class RecordWriterHolder extends AbstractResourceHolder {
2222
@Override
2323
public void init(DataCollectionMeta colmeta) throws Exception {
2424
if(writer!=null || busyTag){
25-
throw new OperationInWorkException("last Opertaion Writer already Exists.May not be shutdown Propery");
25+
throw new OperationInWorkException("last Operation Writer already Exists.May not be shutdown Property");
2626
}
2727
URI uri=new URI(colmeta.getPath());
2828
String schema=uri.getScheme();

common/src/main/java/com/robin/core/fileaccess/fs/IFileSystemAccessor.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,62 @@
55

66
import java.io.*;
77

8+
/**
9+
* Common interface for all file system accessors; implemented by the Local/VFS/HDFS and cloud storage accessors.
10+
*/
811
public interface IFileSystemAccessor {
12+
/**
13+
* Gets a BufferedReader or an InputStream for the resource, with compress format support.
14+
* @param meta
15+
* @param resourcePath
16+
* @return Pair(BufferedReader,InputStream)
17+
* @throws IOException
18+
*/
919
Pair<BufferedReader,InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException;
20+
21+
/**
22+
* Gets a BufferedWriter or an OutputStream for the resource, with compress format support.
23+
* @param meta
24+
* @param resourcePath
25+
* @return
26+
* @throws IOException
27+
*/
1028
Pair<BufferedWriter,OutputStream> getOutResourceByWriter(DataCollectionMeta meta, String resourcePath) throws IOException;
29+
30+
/**
31+
* Gets an OutputStream for the resource, ignoring the compress format.
32+
* @param meta
33+
* @param resourcePath
34+
* @return
35+
* @throws IOException
36+
*/
1137
OutputStream getRawOutputStream(DataCollectionMeta meta, String resourcePath) throws IOException;
38+
39+
/**
40+
* get OutputStream with compress format support
41+
* @param meta
42+
* @param resourcePath
43+
* @return
44+
* @throws IOException
45+
*/
1246
OutputStream getOutResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException;
47+
48+
/**
49+
* get InputStream with compress format support
50+
* @param meta
51+
* @param resourcePath
52+
* @return
53+
* @throws IOException
54+
*/
1355
InputStream getInResourceByStream(DataCollectionMeta meta, String resourcePath) throws IOException;
56+
57+
/**
58+
* Gets an InputStream for the resource, ignoring the compress format.
59+
* @param meta
60+
* @param resourcePath
61+
* @return
62+
* @throws IOException
63+
*/
1464
InputStream getRawInputStream(DataCollectionMeta meta, String resourcePath) throws IOException;
1565
boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException;
1666
long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException;

core/src/main/java/com/robin/core/base/util/ResourceConst.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,19 @@ public enum OBSPARAM{
208208
this.value=value;
209209
}
210210

211+
public String getValue() {
212+
return value;
213+
}
214+
}
215+
public enum GCSPARAM {
216+
CREDENTIALSFILE("credentialsFile"),
217+
SCOPES("scopes"),
218+
SELFLINK("selfLink");
219+
private String value;
220+
GCSPARAM(String value){
221+
this.value=value;
222+
}
223+
211224
public String getValue() {
212225
return value;
213226
}

hadooptool/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,12 @@
372372
<version>8.4.0</version>
373373
<optional>true</optional>
374374
</dependency>
375+
<dependency>
376+
<groupId>com.google.cloud</groupId>
377+
<artifactId>google-cloud-storage</artifactId>
378+
<version>2.27.0</version>
379+
<optional>true</optional>
380+
</dependency>
375381
<!-- FileSystem dependency end -->
376382
<dependency>
377383
<groupId>org.mongodb</groupId>

hadooptool/src/main/java/com/robin/comm/fileaccess/fs/AbstractCloudStorageFileSystemAccessor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
public abstract class AbstractCloudStorageFileSystemAccessor extends AbstractFileSystemAccessor {
2121
protected String bucketName;
2222
protected String tmpFilePath;
23+
2324
@Override
2425
public Pair<BufferedReader, InputStream> getInResourceByReader(DataCollectionMeta meta, String resourcePath) throws IOException {
2526
InputStream inputStream = getInputStreamByConfig(meta);
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package com.robin.comm.fileaccess.fs;
2+
3+
import com.google.auth.oauth2.GoogleCredentials;
4+
import com.google.cloud.storage.*;
5+
import com.google.common.collect.Lists;
6+
import com.robin.core.base.exception.MissingConfigException;
7+
import com.robin.core.base.util.ResourceConst;
8+
import com.robin.core.fileaccess.meta.DataCollectionMeta;
9+
import lombok.NonNull;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.springframework.util.Assert;
12+
import org.springframework.util.CollectionUtils;
13+
import org.springframework.util.ObjectUtils;
14+
15+
import java.io.*;
16+
import java.nio.channels.Channels;
17+
import java.nio.file.Paths;
18+
import java.util.List;
19+
/**
20+
* Google Cloud Storage FileSystemAccessor,must init individual
21+
*/
22+
@Slf4j
23+
public class GCSFileSystemAccessor extends AbstractCloudStorageFileSystemAccessor{
24+
private GoogleCredentials credentials;
25+
private String credentialsFile;
26+
private List<String> scopes;
27+
private Storage storage;
28+
29+
@Override
30+
public void init(DataCollectionMeta meta) {
31+
Assert.notNull(meta.getResourceCfgMap().get(ResourceConst.GCSPARAM.CREDENTIALSFILE.getValue()), "must provide credentialsFile");
32+
credentialsFile=meta.getResourceCfgMap().get(ResourceConst.GCSPARAM.CREDENTIALSFILE.getValue()).toString();
33+
if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.GCSPARAM.SCOPES.getValue()))){
34+
scopes= Lists.newArrayList(meta.getResourceCfgMap().get(ResourceConst.GCSPARAM.SCOPES.getValue()).toString().split(","));
35+
}
36+
try{
37+
credentials=GoogleCredentials.fromStream(new FileInputStream(credentialsFile));
38+
if(!CollectionUtils.isEmpty(scopes)){
39+
credentials.createScoped(scopes);
40+
}
41+
storage= StorageOptions.newBuilder().setCredentials(credentials).build().getService();
42+
}catch (IOException ex){
43+
log.error("{}",ex.getMessage());
44+
}
45+
46+
}
47+
public void init() {
48+
Assert.notNull(credentialsFile, "must provide credentialsFile");
49+
try{
50+
credentials=GoogleCredentials.fromStream(new FileInputStream(credentialsFile));
51+
if(!CollectionUtils.isEmpty(scopes)){
52+
credentials.createScoped(scopes);
53+
}
54+
storage= StorageOptions.newBuilder().setCredentials(credentials).build().getService();
55+
}catch (IOException ex){
56+
log.error("{}",ex.getMessage());
57+
}
58+
}
59+
60+
@Override
61+
public boolean exists(DataCollectionMeta meta, String resourcePath) throws IOException {
62+
checkStorage(meta);
63+
BlobId blobId=BlobId.of(getBucketName(meta),meta.getPath());
64+
Blob blob=storage.get(blobId);
65+
return blob.exists();
66+
}
67+
68+
@Override
69+
public long getInputStreamSize(DataCollectionMeta meta, String resourcePath) throws IOException {
70+
checkStorage(meta);
71+
BlobId blobId=BlobId.of(getBucketName(meta),meta.getPath());
72+
Blob blob=storage.get(blobId);
73+
if(blob.exists()){
74+
return blob.getSize();
75+
}
76+
return 0;
77+
}
78+
79+
@Override
80+
protected boolean putObject(String bucketName, DataCollectionMeta meta, OutputStream outputStream) throws IOException {
81+
checkStorage(meta);
82+
BlobId blobId=BlobId.of(getBucketName(meta),meta.getPath());
83+
String contentType=!ObjectUtils.isEmpty(meta.getContent().getContentType())?meta.getContent().getContentType():"application/octet-stream";
84+
BlobInfo blobInfo= BlobInfo.newBuilder(blobId).setContentType(contentType).build();
85+
Blob blob;
86+
if(ByteArrayOutputStream.class.isAssignableFrom(outputStream.getClass())) {
87+
ByteArrayOutputStream byteArrayOutputStream = (ByteArrayOutputStream) outputStream;
88+
blob = storage.create(blobInfo, byteArrayOutputStream.toByteArray());
89+
}else{
90+
outputStream.close();
91+
blob=storage.createFrom(blobInfo,Paths.get(tmpFilePath));
92+
}
93+
meta.getResourceCfgMap().put(ResourceConst.GCSPARAM.SELFLINK.getValue(),blob.getSelfLink());
94+
return !ObjectUtils.isEmpty(blob.getEtag());
95+
}
96+
97+
98+
99+
@Override
100+
protected InputStream getObject(String bucketName, String objectName) {
101+
if(ObjectUtils.isEmpty(bucketName)){
102+
throw new MissingConfigException("bucketName "+bucketName+" does not exists!");
103+
}
104+
BlobId blobId=BlobId.of(bucketName,objectName);
105+
Blob blob=storage.get(blobId);
106+
if(blob.exists()){
107+
return Channels.newInputStream(blob.reader());
108+
}else {
109+
throw new MissingConfigException("objectName " + objectName + " can not get!");
110+
}
111+
}
112+
private void checkStorage(DataCollectionMeta meta) {
113+
Assert.notNull(storage,"storage not initialized");
114+
Bucket bucket=storage.get(getBucketName(meta));
115+
if(ObjectUtils.isEmpty(bucket)){
116+
throw new MissingConfigException("bucketName "+getBucketName(meta)+" does not exists!");
117+
}
118+
}
119+
120+
public static class Builder{
121+
private GCSFileSystemAccessor accessor;
122+
public static S3FileSystemAccessor.Builder builder(){
123+
return new S3FileSystemAccessor.Builder();
124+
}
125+
public Builder(){
126+
accessor=new GCSFileSystemAccessor();
127+
}
128+
public GCSFileSystemAccessor.Builder credentialsFile(String credentialsFile){
129+
accessor.credentialsFile=credentialsFile;
130+
return this;
131+
}
132+
public GCSFileSystemAccessor.Builder scopes(@NonNull String scopes){
133+
Assert.notNull(scopes,"must provided scopes");
134+
accessor.scopes=Lists.newArrayList(scopes.split(","));
135+
return this;
136+
}
137+
138+
public GCSFileSystemAccessor.Builder bucket(String bucketName){
139+
accessor.bucketName=bucketName;
140+
return this;
141+
}
142+
public GCSFileSystemAccessor.Builder withMetaConfig(DataCollectionMeta meta){
143+
accessor.init(meta);
144+
return this;
145+
}
146+
public GCSFileSystemAccessor build(){
147+
if(!ObjectUtils.isEmpty(accessor.credentials)){
148+
accessor.init();
149+
}
150+
return accessor;
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)