Skip to content

Commit 2c1aced

Browse files
committed
optimize code
1 parent c5c58a1 commit 2c1aced

File tree

6 files changed

+25
-31
lines changed

6 files changed

+25
-31
lines changed

common/src/test/java/com/robin/comm/test/TestExcelOperation.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,10 @@ public void testGenerate() throws Exception {
5151
AbstractResIterator iterator = new AbstractResIterator() {
5252
Map<String, Object> map = new HashMap<>();
5353
int row = 0;
54-
55-
@Override
56-
public void init() {
57-
58-
}
59-
6054
@Override
6155
public void beforeProcess() {
6256

6357
}
64-
6558
@Override
6659
public void afterProcess() {
6760

core/src/main/java/com/robin/core/compress/util/CompressDecoder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static InputStream getInputStreamByCompressType(String path, InputStream
5656
inputStream=new LZ4FrameInputStream(wrapInputStream(rawstream));
5757
break;
5858
case COMPRESS_TYPE_LZMA:
59-
inputStream=new LZMAInputStream(wrapInputStream(rawstream));
59+
inputStream=new XZInputStream(wrapInputStream(rawstream));
6060
break;
6161
case COMPRESS_TYPE_ZSTD:
6262
inputStream=new ZstdCompressorInputStream(wrapInputStream(rawstream));

core/src/main/java/com/robin/core/compress/util/CompressEncoder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.commons.compress.compressors.zstandard.ZstdCompressorOutputStream;
1212
import org.tukaani.xz.LZMA2Options;
1313
import org.tukaani.xz.LZMAOutputStream;
14+
import org.tukaani.xz.XZOutputStream;
1415
import org.xerial.snappy.SnappyOutputStream;
1516

1617
import java.io.BufferedOutputStream;
@@ -64,7 +65,7 @@ public static OutputStream getOutputStreamByCompressType(String path,OutputStrea
6465
outputStream=new LZ4FrameOutputStream(wrapOutputStream(rawstream));
6566
break;
6667
case COMPRESS_TYPE_LZMA:
67-
outputStream=new LZMAOutputStream(wrapOutputStream(rawstream),new LZMA2Options(),false);
68+
outputStream=new XZOutputStream(wrapOutputStream(rawstream),new LZMA2Options());
6869
break;
6970
case COMPRESS_TYPE_ZSTD:
7071
outputStream=new ZstdCompressorOutputStream(wrapOutputStream(rawstream));

hadooptool/src/main/java/com/robin/comm/fileaccess/iterator/ParquetFileIterator.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,11 @@ public void beforeProcess() {
7272
if (colmeta.getResourceCfgMap().containsKey("file.useAvroEncode") && "true".equalsIgnoreCase(colmeta.getResourceCfgMap().get("file.useAvroEncode").toString())) {
7373
useAvroEncode = true;
7474
}
75+
7576
if (Const.FILESYSTEM.HDFS.getValue().equals(colmeta.getFsType())) {
7677
conf = new HDFSUtil(colmeta).getConfig();
77-
if (colmeta.getColumnList().isEmpty()) {
78-
ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build();
79-
ParquetFileReader ireader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(colmeta.getPath()), conf), options);
80-
ParquetMetadata meta = ireader.getFooter();
81-
msgtype = meta.getFileMetaData().getSchema();
82-
parseSchemaByType();
83-
} else {
84-
schema = AvroUtils.getSchemaFromMeta(colmeta);
85-
}
78+
file=HadoopInputFile.fromPath(new Path(colmeta.getPath()), conf);
79+
getSchema(file,false);
8680
if (!useAvroEncode) {
8781
ParquetReader.Builder<Map<String, Object>> builder = ParquetReader.builder(new CustomReadSupport(), new Path(ResourceUtil.getProcessPath(colmeta.getPath()))).withConf(conf);
8882
ireader = builder.build();
@@ -111,30 +105,35 @@ public void beforeProcess() {
111105
file = ParquetUtil.makeInputFile(seekableInputStream);
112106
}
113107
}
114-
if (colmeta.getColumnList().isEmpty()) {
115-
ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build();
116-
ParquetFileReader ireader = ParquetFileReader.open(file, options);
117-
ParquetMetadata meta = ireader.getFooter();
118-
msgtype = meta.getFileMetaData().getSchema();
119-
parseSchemaByType();
120-
//read footer and schema,must return header
121-
file.newStream().seek(0L);
122-
} else {
123-
schema = AvroUtils.getSchemaFromMeta(colmeta);
124-
}
125-
fields = schema.getFields();
108+
getSchema(file,true);
126109
if (!useAvroEncode) {
127110
ireader = CustomParquetReader.builder(file, colmeta).build();
128111
} else {
129112
preader = AvroParquetReader.<GenericData.Record>builder(file).build();
130113
}
131114
}
115+
fields = schema.getFields();
132116
} catch (Exception ex) {
133117
logger.error("{}", ex.getMessage());
134118
}
135119

136120
}
137121

122+
private void getSchema(InputFile file,boolean seekFrist) throws IOException {
123+
if (colmeta.getColumnList().isEmpty()) {
124+
ParquetReadOptions options = ParquetReadOptions.builder().withMetadataFilter(ParquetMetadataConverter.NO_FILTER).build();
125+
ParquetFileReader ireader = ParquetFileReader.open(file, options);
126+
ParquetMetadata meta = ireader.getFooter();
127+
msgtype = meta.getFileMetaData().getSchema();
128+
parseSchemaByType();
129+
if(seekFrist) {
130+
file.newStream().seek(0L);
131+
}
132+
} else {
133+
schema = AvroUtils.getSchemaFromMeta(colmeta);
134+
}
135+
}
136+
138137
@Override
139138
public boolean hasNext() {
140139
try {

hadooptool/src/main/java/com/robin/comm/fileaccess/writer/ProtoBufFileWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.robin.core.fileaccess.writer.AbstractFileWriter;
1414
import org.springframework.util.CollectionUtils;
1515
import org.springframework.util.ObjectUtils;
16+
import org.tukaani.xz.LZMAOutputStream;
1617

1718
import javax.naming.OperationNotSupportedException;
1819
import java.io.IOException;

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@
127127
<svn.version>1.8.9</svn.version>
128128
<lzocore.version>1.0.4</lzocore.version>
129129
<snappy.version>1.1.2.6</snappy.version>
130-
<xz.version>1.6</xz.version>
130+
<xz.version>1.9</xz.version>
131131
<cassandradriver.version>3.6.0</cassandradriver.version>
132132
<graal.version>1.0.0-rc7</graal.version>
133133
<calcite.version>1.21.0</calcite.version>

0 commit comments

Comments
 (0)