Skip to content

Commit c6bb662

Browse files
authored
Merge pull request #15 from databrickslabs/feature/fix-feedback
Fixes following feedback from nfx
2 parents f7828fd + cf3786b commit c6bb662

File tree

6 files changed

+209
-180
lines changed

6 files changed

+209
-180
lines changed

docs/source/usage/DeltaSharing.rst

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@
22
DeltaSharing
33
==================
44

5-
From the provider JSON we can easily instantiate our Java Connector using the DeltaSharingFactory instance.
6-
DeltaSharingFactory provides create API that returns and instance of DeltaSharing object based on the provider JSON.
5+
From the provider JSON we can easily instantiate our Java Connector using the DeltaSharing object, based on the provider JSON.
76
DeltaSharing instance is used to get access ot the table reader and to interact with the delta sharing server.
87

98
Examples
@@ -12,22 +11,19 @@ Examples
1211
.. tabs::
1312
.. code-tab:: java
1413

15-
import com.databricks.labs.delta.sharing.java.DeltaSharingFactory;
1614
import com.databricks.labs.delta.sharing.java.DeltaSharing;
1715

18-
DeltaSharing sharing = DeltaSharingFactory
19-
.create(
16+
DeltaSharing sharing = DeltaSharing(
2017
providerJSON,
2118
"/dedicated/persisted/cache/location/"
2219
);
2320

2421

2522
.. code-tab:: scala
2623

27-
import com.databricks.labs.delta.sharing.java.DeltaSharingFactory
24+
import com.databricks.labs.delta.sharing.java.DeltaSharing
2825

29-
val sharing = DeltaSharingFactory
30-
.create(
26+
val sharing = DeltaSharing(
3127
providerJSON,
3228
"/dedicated/persisted/cache/location/"
3329
)

src/main/java/com/databricks/labs/delta/sharing/java/DeltaSharing.java

Lines changed: 121 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.databricks.labs.delta.sharing.java;
22

3+
import com.databricks.labs.delta.sharing.java.adaptor.DeltaSharingJsonProvider;
34
import com.databricks.labs.delta.sharing.java.format.parquet.TableReader;
45
import io.delta.sharing.spark.DeltaSharingFileSystem;
56
import io.delta.sharing.spark.DeltaSharingProfileProvider;
@@ -16,6 +17,7 @@
1617
import java.nio.file.OpenOption;
1718
import java.nio.file.Path;
1819
import java.nio.file.Paths;
20+
import java.util.HashMap;
1921
import java.util.LinkedList;
2022
import java.util.List;
2123
import java.util.Map;
@@ -31,30 +33,25 @@
3133
import scala.collection.Seq;
3234

3335
/**
34-
* A wrapper class for {@link io.delta.sharing.spark.DeltaSharingRestClient} instance. This class
35-
* ensures we have access to a temp directory where parquet files will be kept during the life of
36-
* the JVM. The temp directory has a destroy hook register as do all the files when they land in the
37-
* temp directory. The class keeps a runtime map of metadata for each file stored in the temp
38-
* directory. If the metadata has changed we will retrieve the new state of the file. If the
39-
* metadata has remained the same we will use the available local copy in the temp directory.
36+
* A wrapper class for {@link io.delta.sharing.spark.DeltaSharingRestClient}
37+
* instance. This class ensures we have access to a temp directory where parquet
38+
* files will be kept during the life of the JVM. The temp directory has a
39+
* destroy hook register as do all the files when they land in the temp
40+
* directory. The class keeps a runtime map of metadata for each file stored in
41+
* the temp directory. If the metadata has changed we will retrieve the new
42+
* state of the file. If the metadata has remained the same we will use the
43+
* available local copy in the temp directory.
4044
* <p/>
4145
*
4246
* @author Milos Colic
4347
* @since 0.1.0
4448
*/
4549
public class DeltaSharing {
46-
DeltaSharingProfileProvider profileProvider;
47-
DeltaSharingRestClient httpClient;
48-
Path checkpointPath;
49-
Path tempDir;
50-
Map<String, DeltaTableMetadata> metadataMap;
51-
52-
/**
53-
* Default constructor Construction of instances is delegated to a factory.
54-
*
55-
* @see DeltaSharingFactory
56-
*/
57-
public DeltaSharing() {}
50+
private DeltaSharingProfileProvider profileProvider;
51+
private DeltaSharingRestClient httpClient;
52+
private Path checkpointPath;
53+
private Path tempDir;
54+
private Map<String, DeltaTableMetadata> metadataMap;
5855

5956
/**
6057
* Getter for {@link DeltaSharing#profileProvider}.
@@ -71,12 +68,61 @@ public DeltaSharingRestClient getHttpClient() {
7168
}
7269

7370
/**
74-
* Adapter method for getting a List of all tables. Scala API returns a {@link Seq} and we require
75-
* a {@link List}.
71+
* Getter for checkpointPath.
72+
*/
73+
public Path getCheckpointPath() {
74+
return checkpointPath;
75+
}
76+
77+
/**
78+
* Constructor.
79+
*
80+
* @param profileProvider An instance of {@link DeltaSharingProfileProvider}.
81+
* @param checkpointPath An path to a temporary checkpoint location.
82+
* @throws IOException Transitive due to the call to
83+
* {@link Files#createTempDirectory(String, FileAttribute[])}.
84+
*/
85+
public DeltaSharing(final DeltaSharingProfileProvider profileProvider,
86+
final Path checkpointPath) throws IOException {
87+
88+
if (!Files.exists(checkpointPath)) {
89+
Files.createDirectory(checkpointPath);
90+
}
91+
Path tempDir = Files.createTempDirectory(checkpointPath, "delta_sharing");
92+
tempDir.toFile().deleteOnExit();
93+
94+
this.profileProvider = profileProvider;
95+
this.httpClient =
96+
new DeltaSharingRestClient(profileProvider, 120, 4, false);
97+
this.checkpointPath = checkpointPath;
98+
this.tempDir = tempDir;
99+
this.metadataMap = new HashMap<>();
100+
}
101+
102+
/**
103+
* Constructor.
104+
*
105+
* @param providerConf A valid JSON document corresponding to
106+
* {@link DeltaSharingProfileProvider}.
107+
* @param checkpointLocation A string containing a path to be used as a
108+
* checkpoint location.
109+
* @throws IOException Transitive due to the call to
110+
* {@link Files#createDirectories(Path, FileAttribute[])}.
111+
*/
112+
public DeltaSharing(final String providerConf,
113+
final String checkpointLocation) throws IOException {
114+
this(new DeltaSharingJsonProvider(providerConf),
115+
Paths.get(checkpointLocation));
116+
}
117+
118+
119+
/**
120+
* Adapter method for getting a List of all tables. Scala API returns a
121+
* {@link Seq} and we require a {@link List}.
76122
*
77123
* @return A list of all tables.
78-
* @implNote Suppress unnecessary local variable is done to remove warnings for a decoupled Scala
79-
* to Java conversion call and a return call.
124+
* @implNote Suppress unnecessary local variable is done to remove warnings
125+
* for a decoupled Scala to Java conversion call and a return call.
80126
*/
81127
@SuppressWarnings("UnnecessaryLocalVariable")
82128
public List<Table> listAllTables() {
@@ -86,49 +132,56 @@ public List<Table> listAllTables() {
86132
}
87133

88134
/**
89-
* Getter for {@link io.delta.sharing.spark.DeltaSharingRestClient#getMetadata(Table)}.
135+
* Getter for
136+
* {@link io.delta.sharing.spark.DeltaSharingRestClient#getMetadata(Table)}.
137+
*
138+
* @return DeltaTableMetadata
90139
*/
91140
public DeltaTableMetadata getMetadata(Table table) {
92141
return httpClient.getMetadata(table);
93142
}
94143

95144
/**
96-
* Getter for {@link io.delta.sharing.spark.DeltaSharingRestClient#getTableVersion(Table)}
145+
* Getter for
146+
* {@link io.delta.sharing.spark.DeltaSharingRestClient#getTableVersion(Table)}
97147
* (Table)}.
98148
*/
99149
public long getTableVersion(Table table) {
100150
return httpClient.getTableVersion(table);
101151
}
102152

103153
/**
104-
* Adapter method for getting a List of files belonging to a {@link Table}. Scala API returns a
105-
* {@link Seq} and we require a {@link List}.
154+
* Adapter method for getting a List of files belonging to a {@link Table}.
155+
* Scala API returns a {@link Seq} and we require a {@link List}.
106156
*
107157
* @return A list of files corresponding to a table.
108-
* @implNote Suppress unnecessary local variable is done to remove warnings for a decoupled Scala
109-
* to Java conversion call and a return call.
158+
* @implNote Suppress unnecessary local variable is done to remove warnings
159+
* for a decoupled Scala to Java conversion call and a return call.
110160
*/
111161
@SuppressWarnings("UnnecessaryLocalVariable")
112-
public List<AddFile> getFiles(Table table, List<String> predicates, Integer limit) {
113-
Seq<String> predicatesSeq =
114-
JavaConverters.asScalaIteratorConverter(predicates.iterator()).asScala().toSeq();
162+
public List<AddFile> getFiles(Table table, List<String> predicates,
163+
Integer limit) {
164+
Seq<String> predicatesSeq = JavaConverters
165+
.asScalaIteratorConverter(predicates.iterator()).asScala().toSeq();
115166
DeltaTableFiles deltaTableFiles;
116167
if (limit != null) {
117-
deltaTableFiles = httpClient.getFiles(table, predicatesSeq, Some$.MODULE$.apply(limit));
168+
deltaTableFiles =
169+
httpClient.getFiles(table, predicatesSeq, Some$.MODULE$.apply(limit));
118170
} else {
119-
deltaTableFiles = httpClient.getFiles(table, predicatesSeq, Option$.MODULE$.apply(null));
171+
deltaTableFiles = httpClient.getFiles(table, predicatesSeq,
172+
Option$.MODULE$.apply(null));
120173
}
121174
List<AddFile> files = JavaConverters.seqAsJavaList(deltaTableFiles.files());
122175
return files;
123176
}
124177

125178
/**
126-
* Adapter method for getting a List of files belonging to a {@link Table}. Scala API returns a
127-
* {@link Seq} and we require a {@link List}.
179+
* Adapter method for getting a List of files belonging to a {@link Table}.
180+
* Scala API returns a {@link Seq} and we require a {@link List}.
128181
*
129182
* @return A list of files corresponding to a table.
130-
* @implNote Suppress unnecessary local variable is done to remove warnings for a decoupled Scala
131-
* to Java conversion call and a return call.
183+
* @implNote Suppress unnecessary local variable is done to remove warnings
184+
* for a decoupled Scala to Java conversion call and a return call.
132185
*/
133186
public List<AddFile> getFiles(Table table, List<String> predicates) {
134187
return getFiles(table, predicates, null);
@@ -140,7 +193,8 @@ public List<AddFile> getFiles(Table table, List<String> predicates) {
140193
* @return A string representing coordinates of the table.
141194
*/
142195
public String getCoordinates(Table table) {
143-
Pattern pattern = Pattern.compile("[a-zA-Z0-9\\-_]*\\.[a-zA-Z0-9\\-_]*\\.[a-zA-Z0-9\\-_]*",
196+
Pattern pattern = Pattern.compile(
197+
"[a-zA-Z0-9\\-_]*\\.[a-zA-Z0-9\\-_]*\\.[a-zA-Z0-9\\-_]*",
144198
Pattern.CASE_INSENSITIVE);
145199
String coords = table.share() + "." + table.schema() + "." + table.name();
146200
Matcher matcher = pattern.matcher(coords);
@@ -153,7 +207,8 @@ public String getCoordinates(Table table) {
153207
}
154208

155209
/**
156-
* Getter for a temp file that will be stored in {@link DeltaSharing#checkpointPath}.
210+
* Getter for a temp file that will be stored in
211+
* {@link DeltaSharing#checkpointPath}.
157212
*
158213
* @param file File for which we are generating the checkpoint path for.
159214
* @return A fully qualified path for a checkpoint file copy.
@@ -185,8 +240,8 @@ private List<Path> writeCheckpointFiles(List<AddFile> files)
185240
throws IOException, URISyntaxException {
186241
List<Path> paths = new LinkedList<>();
187242
for (AddFile file : files) {
188-
FSDataInputStream stream =
189-
new FSDataInputStream(new InMemoryHttpInputStream(new URI(file.url())));
243+
FSDataInputStream stream = new FSDataInputStream(
244+
new InMemoryHttpInputStream(new URI(file.url())));
190245
Path path = getFileCheckpointPath(file);
191246
paths.add(path);
192247
Files.write(path, IOUtils.toByteArray(stream));
@@ -196,7 +251,8 @@ private List<Path> writeCheckpointFiles(List<AddFile> files)
196251
}
197252

198253
/**
199-
* Getter for a temp files that will be stored in {@link DeltaSharing#checkpointPath}.
254+
* Getter for a temp files that will be stored in
255+
* {@link DeltaSharing#checkpointPath}.
200256
*
201257
* @param files Files for which we are generating the checkpoint paths for.
202258
* @return A List of fully qualified paths for a checkpoint file copies.
@@ -211,13 +267,16 @@ private List<Path> getCheckpointPaths(List<AddFile> files) {
211267
}
212268

213269
/**
214-
* Resolves and constructs the {@link TableReader} instance associated with the table. It inspects
215-
* the available {@link DeltaSharing#metadataMap} and based on the metadata it re-fetches the
216-
* stale into the {@link DeltaSharing#checkpointPath} directory.
270+
* Resolves and constructs the {@link TableReader} instance associated with
271+
* the table. It inspects the available {@link DeltaSharing#metadataMap} and
272+
* based on the metadata it re-fetches the stale into the
273+
* {@link DeltaSharing#checkpointPath} directory.
217274
*
218275
* @param table Table whose reader is requested.
219-
* @return An instance of {@link TableReader} that will manage the reads from the table.
220-
* @throws IOException Transitive due to the call to {@link TableReader#TableReader(List)}.
276+
* @return An instance of {@link TableReader} that will manage the reads from
277+
* the table.
278+
* @throws IOException Transitive due to the call to
279+
* {@link TableReader#TableReader(List)}.
221280
*/
222281
@SuppressWarnings("UnnecessaryLocalVariable")
223282
public TableReader<GenericRecord> getTableReader(Table table)
@@ -247,13 +306,16 @@ public TableReader<GenericRecord> getTableReader(Table table)
247306
}
248307

249308
/**
250-
* A reader method that reads all the records from all the files that belong to a table.
309+
* A reader method that reads all the records from all the files that belong
310+
* to a table.
251311
*
252312
* @param table An instance of {@link Table} whose records we are reading.
253313
* @return A list of records from the table instance.
254-
* @throws IOException Transitive due to the call to {@link TableReader#read()}
314+
* @throws IOException Transitive due to the call to
315+
* {@link TableReader#read()}
255316
*/
256-
public List<GenericRecord> getAllRecords(Table table) throws IOException, URISyntaxException {
317+
public List<GenericRecord> getAllRecords(Table table)
318+
throws IOException, URISyntaxException {
257319
TableReader<GenericRecord> tableReader = getTableReader(table);
258320
List<GenericRecord> records = new LinkedList<>();
259321
GenericRecord currentRecord = tableReader.read();
@@ -265,16 +327,18 @@ public List<GenericRecord> getAllRecords(Table table) throws IOException, URISyn
265327
}
266328

267329
/**
268-
* A reader method that reads all the records from all the files that belong to a table. This call
269-
* will always create a new instance of the reader and will always return the same N records. For
270-
* full reads use {@link DeltaSharing#getTableReader(Table)} to access the reader and then use
271-
* {@link TableReader#readN(Integer)} to read blocks of records.
330+
* A reader method that reads all the records from all the files that belong
331+
* to a table. This call will always create a new instance of the reader and
332+
* will always return the same N records. For full reads use
333+
* {@link DeltaSharing#getTableReader(Table)} to access the reader and then
334+
* use {@link TableReader#readN(Integer)} to read blocks of records.
272335
*
273336
* @param table An instance of {@link Table} whose records we are reading.
274337
* @param numRec Number of records to be read at most.
275-
* @return A list of records from the table instance. If less records are available, only the
276-
* available records will be returned.
277-
* @throws IOException Transitive due to the call to {@link TableReader#read()}
338+
* @return A list of records from the table instance. If less records are
339+
* available, only the available records will be returned.
340+
* @throws IOException Transitive due to the call to
341+
* {@link TableReader#read()}
278342
*/
279343
public List<GenericRecord> getNumRecords(Table table, int numRec)
280344
throws IOException, URISyntaxException {

0 commit comments

Comments
 (0)