Skip to content

Commit 217f749

Browse files
committed
optimize code
1 parent 460febf commit 217f749

File tree

21 files changed

+748
-133
lines changed

21 files changed

+748
-133
lines changed

common/src/main/java/com/robin/core/fileaccess/fs/AbstractFileSystemAccessor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,16 @@
1515
*/
1616
package com.robin.core.fileaccess.fs;
1717

18+
import com.robin.core.base.util.ResourceConst;
1819
import com.robin.core.compress.util.CompressDecoder;
1920
import com.robin.core.compress.util.CompressEncoder;
2021
import com.robin.core.fileaccess.meta.DataCollectionMeta;
22+
import com.robin.core.fileaccess.util.ResourceUtil;
23+
import org.springframework.util.ObjectUtils;
2124

2225
import java.io.*;
26+
import java.nio.file.Files;
27+
import java.nio.file.Paths;
2328

2429
/**
2530
* abstract resource system access Utils (Local/Hdfs/ApacheVFS(including ftp sftp)/S3/Tencent cloud/aliyun)
@@ -95,6 +100,17 @@ protected static InputStream wrapInputStream(InputStream instream){
95100
protected static OutputStream getOutputStreamByPath(String path, OutputStream out) throws IOException{
96101
return CompressEncoder.getOutputStreamByCompressType(path,out);
97102
}
103+
protected OutputStream getOutputStream(DataCollectionMeta meta) throws IOException {
104+
OutputStream outputStream;
105+
if(!ObjectUtils.isEmpty(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG)) && "true".equalsIgnoreCase(meta.getResourceCfgMap().get(ResourceConst.USETMPFILETAG).toString())){
106+
String tmpPath = com.robin.core.base.util.FileUtils.getWorkingPath(meta);
107+
String tmpFilePath = tmpPath + ResourceUtil.getProcessFileName(meta.getPath());
108+
outputStream= Files.newOutputStream(Paths.get(tmpFilePath));
109+
}else {
110+
outputStream = new ByteArrayOutputStream();
111+
}
112+
return outputStream;
113+
}
98114

99115
@Override
100116
public void init(DataCollectionMeta meta){

common/src/main/java/com/robin/core/fileaccess/iterator/TextFileIteratorFactory.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colm
3838
IResourceIterator iterator=getIter(colmeta);
3939
return iterator;
4040
}
41+
public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor) throws IOException{
42+
IResourceIterator iterator=getIter(colmeta,accessor);
43+
return iterator;
44+
}
4145
public static AbstractFileIterator getProcessReaderIterator(DataCollectionMeta colmeta, AbstractFileSystemAccessor utils){
4246
AbstractFileIterator iterator=null;
4347
String fileType=colmeta.getFileFormat();
@@ -67,9 +71,9 @@ public static IResourceIterator getProcessIteratorByType(DataCollectionMeta colm
6771
return iterator;
6872
}
6973
public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colmeta,InputStream in) throws IOException{
70-
List<String> suffixList=new ArrayList<>();
71-
FileUtils.parseFileFormat(colmeta.getPath(),suffixList);
72-
String fileFormat=suffixList.get(0);
74+
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
75+
colmeta.setContent(content);
76+
String fileFormat=content.getFileFormat();
7377
if(StringUtils.isEmpty(colmeta.getFileFormat())){
7478
colmeta.setFileFormat(fileFormat);
7579
}
@@ -80,7 +84,7 @@ public static IResourceIterator getProcessIteratorByPath(DataCollectionMeta colm
8084
}
8185
private static IResourceIterator getIter(DataCollectionMeta colmeta) throws MissingConfigException {
8286
IResourceIterator iterator=null;
83-
String fileType=colmeta.getFileFormat();
87+
String fileType = getFileType(colmeta);
8488

8589
Class<? extends IResourceIterator> iterclass=fileIterMap.get(fileType);
8690
try {
@@ -93,6 +97,32 @@ private static IResourceIterator getIter(DataCollectionMeta colmeta) throws Miss
9397
}
9498
return iterator;
9599
}
100+
private static IResourceIterator getIter(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor) throws MissingConfigException {
101+
IResourceIterator iterator=null;
102+
String fileType = getFileType(colmeta);
103+
104+
Class<? extends IResourceIterator> iterclass=fileIterMap.get(fileType);
105+
try {
106+
if (!ObjectUtils.isEmpty(iterclass)) {
107+
iterator = iterclass.getConstructor(DataCollectionMeta.class,AbstractFileSystemAccessor.class).newInstance(colmeta,accessor);
108+
}
109+
iterator.beforeProcess();
110+
}catch (Exception ex){
111+
throw new MissingConfigException(ex);
112+
}
113+
return iterator;
114+
}
115+
116+
private static String getFileType(DataCollectionMeta colmeta) {
117+
String fileType= colmeta.getFileFormat();
118+
if(ObjectUtils.isEmpty(fileType)){
119+
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
120+
colmeta.setContent(content);
121+
fileType=content.getFileFormat();
122+
}
123+
return fileType;
124+
}
125+
96126
private static void discoverIterator(){
97127
ServiceLoader.load(IResourceIterator.class).iterator().forEachRemaining(i->{
98128
if(AbstractFileIterator.class.isAssignableFrom(i.getClass()))

common/src/main/java/com/robin/core/fileaccess/writer/AbstractFileWriter.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ protected AbstractFileWriter(DataCollectionMeta colmeta){
6262
}
6363
checkAccessUtil(colmeta.getPath());
6464
}
65+
protected AbstractFileWriter(DataCollectionMeta colmeta,AbstractFileSystemAccessor accessor){
66+
this.colmeta=colmeta;
67+
formatter=DateTimeFormatter.ofPattern(colmeta.getDefaultTimestampFormat());
68+
for (DataSetColumnMeta meta:colmeta.getColumnList()) {
69+
columnList.add(meta.getColumnName());
70+
columnMap.put(meta.getColumnName(), meta.getColumnType());
71+
}
72+
accessUtil=accessor;
73+
}
74+
6575
@Override
6676
public void setWriter(BufferedWriter writer){
6777
this.writer=writer;
@@ -161,9 +171,11 @@ protected String getOutputPath(String url){
161171
return url;
162172
}
163173
protected Const.CompressType getCompressType(){
164-
List<String> fileSuffix=new ArrayList<>();
165-
FileUtils.parseFileFormat(getOutputPath(colmeta.getPath()),fileSuffix);
166-
return FileUtils.getFileCompressType(fileSuffix);
174+
if(ObjectUtils.isEmpty(colmeta.getContent())) {
175+
FileUtils.FileContent content = FileUtils.parseFile(colmeta.getPath());
176+
colmeta.setContent(content);
177+
}
178+
return colmeta.getContent().getCompressType();
167179
}
168180

169181
@Override

common/src/main/java/com/robin/core/fileaccess/writer/TextFileWriterFactory.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.robin.core.fileaccess.writer;
1717

1818
import com.robin.core.base.util.FileUtils;
19+
import com.robin.core.fileaccess.fs.AbstractFileSystemAccessor;
1920
import com.robin.core.fileaccess.meta.DataCollectionMeta;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
@@ -39,6 +40,11 @@ public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, Buffer
3940
fileWriter.setWriter(writer);
4041
return fileWriter;
4142
}
43+
public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, BufferedWriter writer,AbstractFileSystemAccessor accessor) throws IOException {
44+
IResourceWriter fileWriter = getWriterByType(colmeta,accessor);
45+
fileWriter.setWriter(writer);
46+
return fileWriter;
47+
}
4248

4349
public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta, OutputStream writer) throws IOException{
4450
IResourceWriter fileWriter = getWriterByType(colmeta);
@@ -47,6 +53,13 @@ public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta,
4753
}
4854
return fileWriter;
4955
}
56+
public static IResourceWriter getOutputStreamByType(DataCollectionMeta colmeta, OutputStream writer,AbstractFileSystemAccessor accessor) throws IOException{
57+
IResourceWriter fileWriter = getWriterByType(colmeta,accessor);
58+
if(writer!=null) {
59+
fileWriter.setOutputStream(writer);
60+
}
61+
return fileWriter;
62+
}
5063

5164
public static IResourceWriter getWriterByPath(DataCollectionMeta colmeta, OutputStream writer) throws IOException{
5265
IResourceWriter fileWriter = getWriterByType(colmeta);
@@ -57,8 +70,8 @@ public static IResourceWriter getWriterByPath(DataCollectionMeta colmeta, Output
5770
public static IResourceWriter getWriterByType(DataCollectionMeta colmeta) throws IOException {
5871
IResourceWriter fileWriter = null;
5972
try {
73+
String fileSuffix = getFileSuffix(colmeta);
6074

61-
String fileSuffix=colmeta.getFileFormat();
6275
Class<? extends IResourceWriter> writerClass=fileWriterMap.get(fileSuffix);
6376
if (!ObjectUtils.isEmpty(writerClass)) {
6477
fileWriter = writerClass.getConstructor(DataCollectionMeta.class).newInstance(colmeta);
@@ -70,6 +83,33 @@ public static IResourceWriter getWriterByType(DataCollectionMeta colmeta) throws
7083
}
7184
return fileWriter;
7285
}
86+
public static IResourceWriter getWriterByType(DataCollectionMeta colmeta, AbstractFileSystemAccessor accessor) throws IOException {
87+
IResourceWriter fileWriter = null;
88+
try {
89+
String fileSuffix = getFileSuffix(colmeta);
90+
91+
Class<? extends IResourceWriter> writerClass=fileWriterMap.get(fileSuffix);
92+
if (!ObjectUtils.isEmpty(writerClass)) {
93+
fileWriter = writerClass.getConstructor(DataCollectionMeta.class,AbstractFileSystemAccessor.class).newInstance(colmeta,accessor);
94+
logger.info("using resource writer {}",writerClass.getCanonicalName());
95+
}
96+
97+
} catch (Exception ex) {
98+
throw new IOException(ex);
99+
}
100+
return fileWriter;
101+
}
102+
103+
private static String getFileSuffix(DataCollectionMeta colmeta) {
104+
String fileSuffix= colmeta.getFileFormat();
105+
if(ObjectUtils.isEmpty(fileSuffix)){
106+
FileUtils.FileContent content=FileUtils.parseFile(colmeta.getPath());
107+
colmeta.setContent(content);
108+
fileSuffix=content.getFileFormat();
109+
}
110+
return fileSuffix;
111+
}
112+
73113
private static void discoverIterator(Map<String,Class<? extends IResourceWriter>> fileIterMap){
74114
ServiceLoader.load(IResourceWriter.class).iterator().forEachRemaining(i->{
75115
if(AbstractFileWriter.class.isAssignableFrom(i.getClass()))

core/src/main/java/com/robin/core/base/util/Const.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,8 @@ public enum FILESYSTEM{
390390
HDFS("hdfs"),
391391
S3("s3"),
392392
ALIYUN("oss"),
393-
TENCENT("cos");
393+
TENCENT("cos"),
394+
QINIU("qiniu");
394395
private String value;
395396
FILESYSTEM(String value){
396397
this.value=value;

core/src/main/java/com/robin/core/base/util/FileUtils.java

Lines changed: 81 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import cn.hutool.core.io.FileUtil;
44
import com.google.common.collect.Lists;
55
import com.robin.core.fileaccess.meta.DataCollectionMeta;
6+
import lombok.Data;
67
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.util.Assert;
9+
import org.springframework.util.MimeTypeUtils;
710
import org.springframework.util.ObjectUtils;
811

912
import java.io.File;
@@ -25,46 +28,89 @@ public class FileUtils {
2528
public static final List<Const.CompressType> compressTypeEnum =Collections.unmodifiableList(Lists.newArrayList(Const.CompressType.COMPRESS_TYPE_GZ, Const.CompressType.COMPRESS_TYPE_LZO,
2629
Const.CompressType.COMPRESS_TYPE_BZ2, Const.CompressType.COMPRESS_TYPE_SNAPPY, Const.CompressType.COMPRESS_TYPE_ZIP, Const.CompressType.COMPRESS_TYPE_LZMA,Const.CompressType.COMPRESS_TYPE_LZ4,Const.CompressType.COMPRESS_TYPE_ZSTD,Const.CompressType.COMPRESS_TYPE_BROTLI,Const.CompressType.COMPRESS_TYPE_XZ));
2730

31+
private static final Map<String,String> contentTypeMap=new HashMap<>();
32+
33+
static {
34+
ResourceBundle bundle=ResourceBundle.getBundle("contenttype");
35+
if(!ObjectUtils.isEmpty(bundle)){
36+
Iterator<String> iter=bundle.keySet().iterator();
37+
while (iter.hasNext()){
38+
String key=iter.next();
39+
contentTypeMap.put(key,bundle.getString(key));
40+
}
41+
}
42+
}
43+
2844
private FileUtils(){
2945

3046
}
31-
public static String parseFileFormat(String path, List<String> suffix) {
32-
String filePath = null;
33-
if (suffix == null) {
34-
suffix = new ArrayList<>();
47+
48+
public static FileContent parseFile(String path){
49+
Assert.isTrue(!ObjectUtils.isEmpty(path),"path must not be null!");
50+
FileContent fileContent=new FileContent();
51+
int pos=path.lastIndexOf(File.separator);
52+
if(pos==-1){
53+
pos=path.lastIndexOf("/");
3554
}
36-
if (path != null && !path.trim().isEmpty()) {
37-
int pos = path.lastIndexOf(File.separator);
38-
if (pos == -1) {
39-
pos = path.lastIndexOf("/");
40-
}
41-
if (pos != -1) {
42-
String fileName = path.substring(pos + 1);
43-
String[] arr = fileName.split("\\.");
44-
int lastpos = fileName.lastIndexOf(".");
45-
filePath = fileName.substring(0, lastpos);
46-
for (int i = arr.length - 1; i > 0; i--) {
47-
suffix.add(arr[i]);
55+
if(pos!=-1){
56+
String fileName=path.substring(pos+1);
57+
String filePath=path.substring(0,pos);
58+
fileContent.setFilePath(filePath);
59+
String[] parts=fileName.split("\\.");
60+
List<String> sepParts=new ArrayList<>();
61+
for(int i=parts.length-1;i>0;i--){
62+
if(Const.CompressType.COMPRESS_TYPE_NONE.equals(fileContent.getCompressType())){
63+
Const.CompressType compressType=getFileCompressType(parts[i]);
64+
if(!Const.CompressType.COMPRESS_TYPE_NONE.equals(compressType)) {
65+
fileContent.setCompressType(compressType);
66+
if (contentTypeMap.containsKey(parts[i].toLowerCase())) {
67+
fileContent.setContentType(contentTypeMap.get(parts[i].toLowerCase()));
68+
}
69+
}else{
70+
parseFileFormat(fileContent,parts[i],sepParts);
71+
}
72+
}else {
73+
parseFileFormat(fileContent,parts[i],sepParts);
4874
}
4975
}
76+
sepParts.add(parts[0]);
77+
Collections.reverse(sepParts);
78+
fileContent.setFileName(StringUtils.join(sepParts,"."));
5079
}
51-
return filePath;
80+
return fileContent;
5281
}
53-
54-
public static Const.CompressType getFileCompressType(List<String> suffixList) {
55-
Const.CompressType type = Const.CompressType.COMPRESS_TYPE_NONE;
56-
if (!suffixList.isEmpty() && avaiableCompressSuffixs.contains(suffixList.get(0).toLowerCase())) {
57-
type = compressTypeEnum.get(avaiableCompressSuffixs.indexOf(suffixList.get(0)));
82+
private static void parseFileFormat(FileContent content,String suffix,List<String> sepParts){
83+
if(ObjectUtils.isEmpty(content.getFileFormat())){
84+
content.setFileFormat(suffix);
85+
if(ObjectUtils.isEmpty(content.getContentType())){
86+
if(contentTypeMap.containsKey(suffix.toLowerCase())){
87+
content.setContentType(contentTypeMap.get(suffix.toLowerCase()));
88+
}
89+
}
90+
}else{
91+
sepParts.add(suffix);
5892
}
59-
return type;
6093
}
94+
95+
6196
public static Const.CompressType getFileCompressType(String suffix) {
6297
Const.CompressType type = Const.CompressType.COMPRESS_TYPE_NONE;
6398
if (suffix!=null && !suffix.isEmpty() && avaiableCompressSuffixs.contains(suffix.toLowerCase())) {
6499
type = compressTypeEnum.get(avaiableCompressSuffixs.indexOf(suffix.toLowerCase()));
65100
}
66101
return type;
67102
}
103+
public static String getContentType(String fileFormat){
104+
return contentTypeMap.containsKey(fileFormat.toLowerCase())?contentTypeMap.get(fileFormat.toLowerCase()):null;
105+
}
106+
public static String getContentType(DataCollectionMeta meta){
107+
String fileType=meta.getFileFormat();
108+
if(ObjectUtils.isEmpty(fileType)){
109+
FileUtils.FileContent content=FileUtils.parseFile(meta.getPath());
110+
fileType=content.getFileFormat();
111+
}
112+
return getContentType(fileType);
113+
}
68114
public static boolean mkDirWithGroupAndUser(String path,String group,String user) {
69115
FileUtil.mkdir(path);
70116
Path filePath= Paths.get(path);
@@ -111,4 +157,16 @@ public static String getWorkingPath(DataCollectionMeta meta){
111157
? meta.getResourceCfgMap().get(ResourceConst.WORKINGPATHPARAM).toString()
112158
: org.apache.commons.io.FileUtils.getTempDirectoryPath();
113159
}
160+
@Data
161+
public static class FileContent{
162+
private String fileName;
163+
private String filePath;
164+
private String fileFormat;
165+
private String contentType;
166+
private Const.CompressType compressType= Const.CompressType.COMPRESS_TYPE_NONE;
167+
}
168+
public static void main(String[] args){
169+
String path="file:///e:/tmp/test/test1.avro.cs.lz4";
170+
System.out.println(parseFile(path));
171+
}
114172
}

core/src/main/java/com/robin/core/base/util/ResourceConst.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,19 @@ public String getValue() {
154154
return value;
155155
}
156156
}
157+
public enum QINIUPARAM{
158+
DOMAIN("domain"),
159+
REGION("region"),
160+
ACESSSKEY("accessKey"),
161+
SECURITYKEY("securityKey");
162+
private String value;
163+
QINIUPARAM(String value){
164+
this.value=value;
165+
}
166+
167+
public String getValue() {
168+
return value;
169+
}
170+
171+
}
157172
}

core/src/main/java/com/robin/core/compress/util/CompressDecoder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,8 @@ private CompressDecoder(){
2929
}
3030
public static InputStream getInputStreamByCompressType(String path, InputStream rawstream) throws IOException{
3131
InputStream inputStream;
32-
List<String> suffixList=new ArrayList<>();
33-
FileUtils.parseFileFormat(path,suffixList);
34-
Const.CompressType type=FileUtils.getFileCompressType(suffixList);
32+
FileUtils.FileContent content=FileUtils.parseFile(path);
33+
Const.CompressType type=content.getCompressType();
3534
switch (type){
3635
case COMPRESS_TYPE_GZ:
3736
inputStream=new GZIPInputStream(wrapInputStream(rawstream));

0 commit comments

Comments
 (0)