1010import io .delta .sharing .spark .model .DeltaTableFiles ;
1111import io .delta .sharing .spark .model .DeltaTableMetadata ;
1212import io .delta .sharing .spark .model .Table ;
13+ import org .apache .avro .generic .GenericRecord ;
14+ import org .apache .commons .io .IOUtils ;
15+ import org .apache .hadoop .conf .Configuration ;
16+ import org .apache .hadoop .fs .FSDataInputStream ;
17+ import scala .Option$ ;
18+ import scala .Some$ ;
19+ import scala .collection .JavaConverters ;
20+ import scala .collection .Seq ;
21+
1322import java .io .IOException ;
1423import java .net .URI ;
1524import java .net .URISyntaxException ;
2433import java .util .Map ;
2534import java .util .regex .Matcher ;
2635import java .util .regex .Pattern ;
27- import org .apache .avro .generic .GenericRecord ;
28- import org .apache .commons .io .IOUtils ;
29- import org .apache .hadoop .conf .Configuration ;
30- import org .apache .hadoop .fs .FSDataInputStream ;
31- import scala .Option$ ;
32- import scala .Some$ ;
33- import scala .collection .JavaConverters ;
34- import scala .collection .Seq ;
3536
3637/**
3738 * A wrapper class for {@link io.delta.sharing.spark.DeltaSharingRestClient}
@@ -78,12 +79,12 @@ public Path getCheckpointPath() {
7879 * Constructor.
7980 *
8081 * @param profileProvider An instance of {@link DeltaSharingProfileProvider}.
81- * @param checkpointPath An path to a temporary checkpoint location.
82+ * @param checkpointPath A path to a temporary checkpoint location.
8283 * @throws IOException Transitive due to the call to
83- * {@link Files#createTempDirectory(String, FileAttribute[])}.
84+ * {@link Files#createTempDirectory(String, FileAttribute[])}.
8485 */
8586 public DeltaSharing (final DeltaSharingProfileProvider profileProvider ,
86- final Path checkpointPath ) throws IOException {
87+ final Path checkpointPath ) throws IOException {
8788
8889 if (!Files .exists (checkpointPath )) {
8990 Files .createDirectory (checkpointPath );
@@ -102,15 +103,15 @@ public DeltaSharing(final DeltaSharingProfileProvider profileProvider,
102103 /**
103104 * Constructor.
104105 *
105- * @param providerConf A valid JSON document corresponding to
106- * {@link DeltaSharingProfileProvider}.
106+ * @param providerConf A valid JSON document corresponding to
107+ * {@link DeltaSharingProfileProvider}.
107108 * @param checkpointLocation A string containing a path to be used as a
108- * checkpoint location.
109+ * checkpoint location.
109110 * @throws IOException Transitive due to the call to
110- * {@link Files#createDirectories(Path, FileAttribute[])}.
111+ * {@link Files#createDirectories(Path, FileAttribute[])}.
111112 */
112113 public DeltaSharing (final String providerConf ,
113- final String checkpointLocation ) throws IOException {
114+ final String checkpointLocation ) throws IOException {
114115 this (new DeltaSharingJsonProvider (providerConf ),
115116 Paths .get (checkpointLocation ));
116117 }
@@ -122,7 +123,7 @@ public DeltaSharing(final String providerConf,
122123 *
123124 * @return A list of all tables.
124125 * @implNote The "unnecessary local variable" warning is suppressed
125- * for a decoupled Scala to Java conversion call and a return call.
126+ * for a decoupled Scala to Java conversion call and a return call.
126127 */
127128 @ SuppressWarnings ("UnnecessaryLocalVariable" )
128129 public List <Table > listAllTables () {
@@ -156,11 +157,11 @@ public long getTableVersion(Table table) {
156157 *
157158 * @return A list of files corresponding to a table.
158159 * @implNote The "unnecessary local variable" warning is suppressed
159- * for a decoupled Scala to Java conversion call and a return call.
160+ * for a decoupled Scala to Java conversion call and a return call.
160161 */
161162 @ SuppressWarnings ("UnnecessaryLocalVariable" )
162163 public List <AddFile > getFiles (Table table , List <String > predicates ,
163- Integer limit ) {
164+ Integer limit ) {
164165 Seq <String > predicatesSeq = JavaConverters
165166 .asScalaIteratorConverter (predicates .iterator ()).asScala ().toSeq ();
166167 DeltaTableFiles deltaTableFiles ;
@@ -181,7 +182,7 @@ public List<AddFile> getFiles(Table table, List<String> predicates,
181182 *
182183 * @return A list of files corresponding to a table.
183184 * @implNote Suppress unnecessary local variable is done to remove warnings
184- * for a decoupled Scala to Java conversion call and a return call.
185+ * for a decoupled Scala to Java conversion call and a return call.
185186 */
186187 public List <AddFile > getFiles (Table table , List <String > predicates ) {
187188 return getFiles (table , predicates , null );
@@ -199,11 +200,11 @@ public String getCoordinates(Table table) {
199200 String coords = table .share () + "." + table .schema () + "." + table .name ();
200201 Matcher matcher = pattern .matcher (coords );
201202 boolean matchFound = matcher .find ();
202- if (matchFound ) {
203- return coords ;
204- } else {
203+ if (!matchFound ) {
205204 throw new IllegalArgumentException ("Invalid format for coordinates" );
206205 }
206+
207+ return coords ;
207208 }
208209
209210 /**
@@ -218,13 +219,13 @@ private Path getFileCheckpointPath(AddFile file) {
218219 Pattern pattern = Pattern .compile ("[a-zA-Z0-9]*" , Pattern .CASE_INSENSITIVE );
219220 Matcher matcher = pattern .matcher (fileId );
220221 boolean matchFound = matcher .find ();
221- if (matchFound ) {
222- String path = String .format ("%s/%s.parquet" , this .tempDir , file .id ());
223- return Paths .get (path );
224- } else {
222+ if (!matchFound ) {
225223 throw new IllegalArgumentException (
226224 "Invalid format for file id. The id contains special characters." );
227225 }
226+
227+ String path = String .format ("%s/%s.parquet" , this .tempDir , file .id ());
228+ return Paths .get (path );
228229 }
229230
230231 /**
@@ -234,7 +235,7 @@ private Path getFileCheckpointPath(AddFile file) {
234235 * @param files Files for which we are generating the checkpoint file copies.
235236 * @return A fully qualified path for a checkpoint file copy.
236237 * @throws IOException Transitive exception due to the call to
237- * {@link Files#write(Path, byte[], OpenOption...)}.
238+ * {@link Files#write(Path, byte[], OpenOption...)}.
238239 */
239240 private List <Path > writeCheckpointFiles (List <AddFile > files )
240241 throws IOException , URISyntaxException {
@@ -274,9 +275,9 @@ private List<Path> getCheckpointPaths(List<AddFile> files) {
274275 *
275276 * @param table Table whose reader is requested.
276277 * @return An instance of {@link TableReader} that will manage the reads from
277- * the table.
278+ * the table.
278279 * @throws IOException Transitive due to the call to
279- * {@link TableReader#TableReader(List)}.
280+ * {@link TableReader#TableReader(List)}.
280281 */
281282 @ SuppressWarnings ("UnnecessaryLocalVariable" )
282283 public TableReader <GenericRecord > getTableReader (Table table )
@@ -286,8 +287,27 @@ public TableReader<GenericRecord> getTableReader(Table table)
286287 fs .setConf (new Configuration ());
287288 }
288289 String uniqueRef = getCoordinates (table );
289- List <Path > paths ;
290290 DeltaTableMetadata newMetadata = this .getMetadata (table );
291+
292+ List <Path > paths = getPaths (uniqueRef , files , newMetadata );
293+
294+ TableReader <GenericRecord > tableReader = new TableReader <>(paths );
295+ return tableReader ;
296+ }
297+
298+ /**
299+ * Fetches the list of file paths from the checkpoint location.
300+ * Files whose metadata has drifted are updated.
301+ *
302+ * @param uniqueRef Reference via table coordinates.
303+ * @param files A list of add files for the table.
304+ * @param newMetadata A new value of the metadata for the table.
305+ * @return A list of paths to checkpoint files.
306+ * @throws IOException Read/Write errors can occur if temp directory has been altered outside the JVM.
307+ * @throws URISyntaxException URI exception can be thrown by writeCheckpointFiles method call.
308+ */
309+ private List <Path > getPaths (String uniqueRef , List <AddFile > files , DeltaTableMetadata newMetadata ) throws IOException , URISyntaxException {
310+ List <Path > paths ;
291311 if (this .metadataMap .containsKey (uniqueRef )) {
292312 DeltaTableMetadata metadata = this .metadataMap .get (uniqueRef );
293313 if (!newMetadata .equals (metadata )) {
@@ -300,9 +320,7 @@ public TableReader<GenericRecord> getTableReader(Table table)
300320 paths = writeCheckpointFiles (files );
301321 this .metadataMap .put (uniqueRef , newMetadata );
302322 }
303-
304- TableReader <GenericRecord > tableReader = new TableReader <>(paths );
305- return tableReader ;
323+ return paths ;
306324 }
307325
308326 /**
@@ -312,7 +330,7 @@ public TableReader<GenericRecord> getTableReader(Table table)
312330 * @param table An instance of {@link Table} whose records we are reading.
313331 * @return A list of records from the table instance.
314332 * @throws IOException Transitive due to the call to
315- * {@link TableReader#read()}
333+ * {@link TableReader#read()}
316334 */
317335 public List <GenericRecord > getAllRecords (Table table )
318336 throws IOException , URISyntaxException {
@@ -333,12 +351,12 @@ public List<GenericRecord> getAllRecords(Table table)
333351 * {@link DeltaSharing#getTableReader(Table)} to access the reader and then
334352 * use {@link TableReader#readN(Integer)} to read blocks of records.
335353 *
336- * @param table An instance of {@link Table} whose records we are reading.
354+ * @param table An instance of {@link Table} whose records we are reading.
337355 * @param numRec Number of records to be read at most.
338356 * @return A list of records from the table instance. If less records are
339- * available, only the available records will be returned.
357+ * available, only the available records will be returned.
340358 * @throws IOException Transitive due to the call to
341- * {@link TableReader#read()}
359+ * {@link TableReader#read()}
342360 */
343361 public List <GenericRecord > getNumRecords (Table table , int numRec )
344362 throws IOException , URISyntaxException {
0 commit comments