Commit dbc9c67

[SDP] SQL and Python support (esp. CreateFlowCommand)
1 parent 73909f3 commit dbc9c67

6 files changed (+143, -47 lines)


docs/declarative-pipelines/GraphRegistrationContext.md

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ registerFlow(
* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [handle DEFINE_FLOW command](PipelinesHandler.md#defineFlow)
* `SqlGraphRegistrationContext` is requested to [process the following SQL commands](SqlGraphRegistrationContext.md#processSqlQuery):
-* `CreateFlowCommand`
+* [CreateFlowCommand](../logical-operators/CreateFlowCommand.md)
* `CreateMaterializedViewAsSelect`
* `CreateView`
* `CreateStreamingTableAsSelect`

docs/declarative-pipelines/SparkConnectGraphElementRegistry.md

Lines changed: 24 additions & 1 deletion
@@ -2,6 +2,8 @@
`SparkConnectGraphElementRegistry` is a [GraphElementRegistry](GraphElementRegistry.md).

+`SparkConnectGraphElementRegistry` acts as a communication bridge between Spark Declarative Pipelines' Python execution environment and Spark Connect Server (with [PipelinesHandler](PipelinesHandler.md)).
+
## Creating Instance

`SparkConnectGraphElementRegistry` takes the following to be created:
@@ -28,4 +30,25 @@
`register_dataset` makes sure that the given `Dataset` is either `MaterializedView`, `StreamingTable` or `TemporaryView`.

-`register_dataset` requests this [SparkSession](#spark) to [execute](#execute_command) a `PipelineCommand.DefineDataset`.
+`register_dataset` requests this [SparkConnectClient](#spark) to [execute](#execute_command) a `PipelineCommand.DefineDataset` command.
+
+!!! note "PipelinesHandler"
+    `DefineDataset` commands are handled by [PipelinesHandler](PipelinesHandler.md#defineDataset) on Spark Connect Server.
+
+## register_flow { #register_flow }
+
+??? note "GraphElementRegistry"
+
+    ```py
+    register_flow(
+      self,
+      flow: Flow
+    ) -> None
+    ```
+
+    `register_flow` is part of the [GraphElementRegistry](GraphElementRegistry.md#register_flow) abstraction.
+
+`register_flow` requests this [SparkConnectClient](#spark) to [execute](#execute_command) a `PipelineCommand.DefineFlow` command.
+
+!!! note "PipelinesHandler"
+    `DefineFlow` commands are handled by [PipelinesHandler](PipelinesHandler.md#defineFlow) on Spark Connect Server.
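A minimal, illustrative Python sketch of how such a Spark Connect-backed registry turns dataset and flow definitions into `DefineDataset`/`DefineFlow` commands for [PipelinesHandler](PipelinesHandler.md) follows. It is not the Spark source: the `Dataset`/`Flow` stand-ins, the command payload field names and the stub client are assumptions for illustration only.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Dataset:
    name: str
    dataset_type: str  # e.g. "MATERIALIZED_VIEW", "TABLE", "TEMPORARY_VIEW"


@dataclass
class Flow:
    name: str                                # single-part flow name
    target: str                              # dataset the flow writes to
    sql_conf: Dict[str, str] = field(default_factory=dict)


class StubSparkConnectClient:
    """Stands in for the Spark Connect client that ships commands to the server,
    where PipelinesHandler registers them in the dataflow graph."""

    def execute_command(self, command: Dict[str, Any]) -> None:
        print("sending pipeline command:", command)


class GraphElementRegistrySketch:
    def __init__(self, client: StubSparkConnectClient, dataflow_graph_id: str) -> None:
        self._client = client
        self._dataflow_graph_id = dataflow_graph_id

    def register_dataset(self, dataset: Dataset) -> None:
        # Wrap the dataset definition into a DefineDataset-like command.
        self._client.execute_command({
            "define_dataset": {
                "dataflow_graph_id": self._dataflow_graph_id,
                "dataset_name": dataset.name,
                "dataset_type": dataset.dataset_type,
            }
        })

    def register_flow(self, flow: Flow) -> None:
        # Wrap the flow definition into a DefineFlow-like command.
        self._client.execute_command({
            "define_flow": {
                "dataflow_graph_id": self._dataflow_graph_id,
                "flow_name": flow.name,
                "target_dataset_name": flow.target,
                "sql_conf": flow.sql_conf,
            }
        })


registry = GraphElementRegistrySketch(StubSparkConnectClient(), dataflow_graph_id="graph-1")
registry.register_dataset(Dataset(name="events", dataset_type="TABLE"))
registry.register_flow(Flow(name="events_flow", target="events"))
```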

docs/declarative-pipelines/SqlGraphRegistrationContext.md

Lines changed: 20 additions & 1 deletion
@@ -51,7 +51,7 @@ processSqlQuery(
* `CreateMaterializedViewAsSelect`
* `CreateStreamingTableAsSelect`
* `CreateStreamingTable`
-* `CreateFlowCommand`
+* [CreateFlowCommand](#CreateFlowCommand)

### splitSqlFileIntoQueries { #splitSqlFileIntoQueries }
@@ -63,3 +63,22 @@ splitSqlFileIntoQueries(
```

`splitSqlFileIntoQueries`...FIXME
+
+## CreateFlowCommand { #CreateFlowCommand }
+
+[CreateFlowCommand](../logical-operators/CreateFlowCommand.md) logical commands are handled by `CreateFlowHandler`.
+
+A flow name must be a single-part name (it is resolved against the current pipelines catalog and database).
+
+The [flowOperation](../logical-operators/CreateFlowCommand.md#flowOperation) of a [CreateFlowCommand](../logical-operators/CreateFlowCommand.md) command must be an [InsertIntoStatement](../logical-operators/InsertIntoStatement.md).
+
+!!! note
+    Only `INSERT INTO ... BY NAME` flows are supported in [Spark Declarative Pipelines](index.md).
+
+    `INSERT OVERWRITE` flows are not supported.
+
+    `IF NOT EXISTS` is not supported for flows.
+
+    Neither a partition spec nor a user-specified schema can be specified.
+
+In the end, `CreateFlowHandler` requests this [GraphRegistrationContext](#graphRegistrationContext) to [register](GraphRegistrationContext.md#registerFlow) an [UnresolvedFlow](UnresolvedFlow.md).
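The checks above can be summarized with a small, non-Spark Python sketch; the dataclass fields and the function below are illustrative stand-ins for `InsertIntoStatement` and `CreateFlowHandler`, not the actual Spark classes.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class InsertIntoStatementSketch:
    table: str
    by_name: bool = True
    overwrite: bool = False
    if_not_exists: bool = False
    partition_spec: Dict[str, str] = field(default_factory=dict)
    user_specified_cols: List[str] = field(default_factory=list)


def validate_create_flow(flow_name_parts: List[str], op: InsertIntoStatementSketch) -> None:
    """Raise on any condition a CREATE FLOW handler would reject."""
    if len(flow_name_parts) != 1:
        raise ValueError("flow name must be a single-part name")
    if not op.by_name:
        raise ValueError("only INSERT INTO ... BY NAME flows are supported")
    if op.overwrite:
        raise ValueError("INSERT OVERWRITE flows are not supported")
    if op.if_not_exists:
        raise ValueError("IF NOT EXISTS is not supported for flows")
    if op.partition_spec:
        raise ValueError("partition specs are not supported for flows")
    if op.user_specified_cols:
        raise ValueError("user-specified schemas are not supported for flows")


# A valid flow definition passes all the checks.
validate_create_flow(["events_flow"], InsertIntoStatementSketch(table="events"))
```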

docs/declarative-pipelines/index.md

Lines changed: 64 additions & 44 deletions
@@ -4,7 +4,7 @@ subtitle: ⚠️ 4.1.0-SNAPSHOT
# Declarative Pipelines

-**Spark Declarative Pipelines (SDP)** is a declarative framework for building ETL pipelines on Apache Spark using Python or SQL.
+**Spark Declarative Pipelines (SDP)** is a declarative framework for building ETL pipelines on Apache Spark using [Python](#python) or [SQL](#sql).

??? warning "Apache Spark 4.1.0-SNAPSHOT"
    Declarative Pipelines framework is only available in the development branch of Apache Spark 4.1.0-SNAPSHOT.
@@ -28,22 +28,18 @@ subtitle: ⚠️ 4.1.0-SNAPSHOT
Type --help for more information.
```

-Streaming flows are backed by streaming sources, and batch flows are backed by batch sources.
+A Declarative Pipelines project is configured using a [pipeline specification file](#pipeline-specification-file) and executed with the [spark-pipelines](#spark-pipelines) shell script.
+
+In the pipeline specification file, Declarative Pipelines developers include definitions of tables, views and flows (transformations) in Python and SQL. An SDP project can use both languages simultaneously.

Declarative Pipelines uses [Python decorators](#python-decorators) to describe tables, views and flows, declaratively.

+Streaming flows are backed by streaming sources, and batch flows are backed by batch sources.
+
[DataflowGraph](DataflowGraph.md) is the core graph structure in Declarative Pipelines.

Once described, a pipeline can be [started](PipelineExecution.md#runPipeline) (on a [PipelineExecution](PipelineExecution.md)).

-## Python Import Alias Convention
-
-As of this [Commit 6ab0df9]({{ spark.commit }}/6ab0df9287c5a9ce49769612c2bb0a1daab83bee), the convention to alias the import of Declarative Pipelines in Python is `dp` (from `sdp`).
-
-```python
-from pyspark import pipelines as dp
-```
-
## Pipeline Specification File

The heart of a Declarative Pipelines project is a pipeline specification file (in YAML format).
@@ -68,12 +64,45 @@ definitions:
include: transformations/**/*.sql
```

-## Python Decorators for Tables and Flows { #python-decorators }
+## spark-pipelines Shell Script { #spark-pipelines }

-Declarative Pipelines uses the following [Python decorators](https://peps.python.org/pep-0318/) to describe tables and views:
+The `spark-pipelines` shell script is used to launch [org.apache.spark.deploy.SparkPipelines](SparkPipelines.md).

-* [@dp.materialized_view](#materialized_view) for materialized views
-* [@dp.table](#table) for streaming and batch tables
+## Dataset Types
+
+Declarative Pipelines supports the following dataset types:
+
+* **Materialized Views** (datasets) that are published to a catalog
+* **Tables** that are published to a catalog
+* **Views** that are not published to a catalog
+
+## Spark Connect Only { #spark-connect }
+
+Declarative Pipelines currently only supports Spark Connect.
+
+```console
+$ ./bin/spark-pipelines --conf spark.api.mode=xxx
+...
+25/08/03 12:33:57 INFO SparkPipelines: --spark.api.mode must be 'connect'. Declarative Pipelines currently only supports Spark Connect.
+Exception in thread "main" org.apache.spark.SparkUserAppException: User application exited with 1
+  at org.apache.spark.deploy.SparkPipelines$$anon$1.handle(SparkPipelines.scala:73)
+  at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:169)
+  at org.apache.spark.deploy.SparkPipelines$$anon$1.<init>(SparkPipelines.scala:58)
+  at org.apache.spark.deploy.SparkPipelines$.splitArgs(SparkPipelines.scala:57)
+  at org.apache.spark.deploy.SparkPipelines$.constructSparkSubmitArgs(SparkPipelines.scala:43)
+  at org.apache.spark.deploy.SparkPipelines$.main(SparkPipelines.scala:37)
+  at org.apache.spark.deploy.SparkPipelines.main(SparkPipelines.scala)
+```
+
+## Python
+
+### Python Import Alias Convention
+
+As of this [Commit 6ab0df9]({{ spark.commit }}/6ab0df9287c5a9ce49769612c2bb0a1daab83bee), the convention to alias the import of Declarative Pipelines in Python is `dp` (from `sdp`).
+
+```python
+from pyspark import pipelines as dp
+```

### pyspark.pipelines Python Module { #pyspark_pipelines }
@@ -91,6 +120,13 @@ Use the following import in your Python code:
from pyspark import pipelines as dp
```

+### Python Decorators { #python-decorators }
+
+Declarative Pipelines uses the following [Python decorators](https://peps.python.org/pep-0318/) to describe tables and views:
+
+* [@dp.materialized_view](#materialized_view) for materialized views
+* [@dp.table](#table) for streaming and batch tables
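A minimal usage sketch of the two decorators follows. The dataset names and queries are made up for illustration, and the exact decorator signatures may differ in the Apache Spark 4.1.0-SNAPSHOT Python client.

```python
from pyspark import pipelines as dp
from pyspark.sql import SparkSession

# In a real SDP project the pipeline runtime provides the session.
spark = SparkSession.builder.getOrCreate()


@dp.materialized_view
def daily_orders():
    # Materialized view recomputed from a (hypothetical) orders table.
    return spark.read.table("orders").groupBy("order_date").count()


@dp.table
def raw_events():
    # Streaming table backed by a (hypothetical) rate source.
    return spark.readStream.format("rate").load()
```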
### @dp.append_flow { #append_flow }

### @dp.create_streaming_table { #create_streaming_table }
@@ -103,6 +139,20 @@ Creates a [MaterializedView](MaterializedView.md) (for a table whose contents ar
### @dp.temporary_view { #temporary_view }

+[Registers](GraphElementRegistry.md#register_dataset) a `TemporaryView` dataset and a [Flow](Flow.md) in the [GraphElementRegistry](GraphElementRegistry.md#register_flow).
+
+## SQL
+
+Spark Declarative Pipelines supports the SQL language for defining pipelines.
+
+Pipeline elements are defined in SQL files included as `definitions` in the [pipeline specification file](#pipeline-specification-file).
+
+[SqlGraphRegistrationContext](SqlGraphRegistrationContext.md) is used on Spark Connect Server to handle SQL statements (from SQL definition files and [Python decorators](#python-decorators)).
+
+Supported SQL statements:
+
+* [CREATE FLOW AS INSERT INTO BY NAME](../sql/SparkSqlAstBuilder.md#visitCreatePipelineInsertIntoFlow)
+
## Demo: Create Virtual Environment for Python Client

```shell
@@ -379,36 +429,6 @@ spark-warehouse
3 directories, 4 files
```

-## Spark Connect Only { #spark-connect }
-
-Declarative Pipelines currently only supports Spark Connect.
-
-```console
-$ ./bin/spark-pipelines --conf spark.api.mode=xxx
-...
-25/08/03 12:33:57 INFO SparkPipelines: --spark.api.mode must be 'connect'. Declarative Pipelines currently only supports Spark Connect.
-Exception in thread "main" org.apache.spark.SparkUserAppException: User application exited with 1
-  at org.apache.spark.deploy.SparkPipelines$$anon$1.handle(SparkPipelines.scala:73)
-  at org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:169)
-  at org.apache.spark.deploy.SparkPipelines$$anon$1.<init>(SparkPipelines.scala:58)
-  at org.apache.spark.deploy.SparkPipelines$.splitArgs(SparkPipelines.scala:57)
-  at org.apache.spark.deploy.SparkPipelines$.constructSparkSubmitArgs(SparkPipelines.scala:43)
-  at org.apache.spark.deploy.SparkPipelines$.main(SparkPipelines.scala:37)
-  at org.apache.spark.deploy.SparkPipelines.main(SparkPipelines.scala)
-```
-
-## spark-pipelines Shell Script { #spark-pipelines }
-
-`spark-pipelines` shell script is used to launch [org.apache.spark.deploy.SparkPipelines](SparkPipelines.md).
-
-## Dataset Types
-
-Declarative Pipelines supports the following dataset types:
-
-* **Materialized Views** (datasets) that are published to a catalog
-* **Table** that are published to a catalog
-* **Views** that are not published to a catalog
-
## Demo: Scala API

### Step 1. Register Dataflow Graph
docs/logical-operators/CreateFlowCommand.md

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
---
title: CreateFlowCommand
---

# CreateFlowCommand Binary Logical Operator

`CreateFlowCommand` is a `BinaryCommand` logical operator that represents [CREATE FLOW ... AS INSERT INTO ... BY NAME](../sql/SparkSqlAstBuilder.md#visitCreatePipelineInsertIntoFlow) SQL statements in [Spark Declarative Pipelines](../declarative-pipelines/index.md).

`CreateFlowCommand` is handled by [SqlGraphRegistrationContext](../declarative-pipelines/SqlGraphRegistrationContext.md#CreateFlowCommand).

The `Pipelines` execution planning strategy is used to prevent direct execution of Spark Declarative Pipelines' SQL statements.

## Creating Instance

`CreateFlowCommand` takes the following to be created:

* <span id="name"> Name (`UnresolvedIdentifier` leaf logical operator)
* <span id="flowOperation"> Flow operation ([InsertIntoStatement](InsertIntoStatement.md) unary logical operator)
* <span id="comment"> Comment (optional)

`CreateFlowCommand` is created when:

* `SparkSqlAstBuilder` is requested to [parse a CREATE FLOW AS INSERT INTO BY NAME SQL statement](../sql/SparkSqlAstBuilder.md#visitCreatePipelineInsertIntoFlow)
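For illustration only, a small Python sketch of the operator's three constituents (the real operator is a Scala `BinaryCommand`; the class and field names below are stand-ins, not Spark classes):

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class UnresolvedIdentifierSketch:
    name_parts: List[str]          # a flow name must be a single-part name


@dataclass
class InsertIntoStatementSketch:
    table: str
    by_name: bool = True           # only INSERT INTO ... BY NAME is supported


@dataclass
class CreateFlowCommandSketch:
    name: UnresolvedIdentifierSketch            # the Name constituent
    flow_operation: InsertIntoStatementSketch   # the Flow operation constituent
    comment: Optional[str] = None               # the optional Comment constituent


# CREATE FLOW events_flow AS INSERT INTO events BY NAME
cmd = CreateFlowCommandSketch(
    name=UnresolvedIdentifierSketch(["events_flow"]),
    flow_operation=InsertIntoStatementSketch(table="events"),
)
```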

docs/sql/SparkSqlAstBuilder.md

Lines changed: 11 additions & 0 deletions
@@ -125,6 +125,17 @@ Creates a [CreateTable](../logical-operators/CreateTable.md)
ANTLR labeled alternative: `#createHiveTable`

+### CREATE FLOW AS INSERT INTO BY NAME { #visitCreatePipelineInsertIntoFlow }
+
+Creates a [CreateFlowCommand](../logical-operators/CreateFlowCommand.md) logical operator for the `CREATE FLOW` SQL statement:
+
+```sql
+CREATE FLOW [ flow_name ]
+AS INSERT INTO [ destination_name ] BY NAME
+```
+
+ANTLR labeled alternative: `#createPipelineInsertIntoFlow`
+
### CREATE TABLE { #visitCreateTable }

Creates a [CreateTempViewUsing](../logical-operators/CreateTempViewUsing.md) logical operator for `CREATE TEMPORARY VIEW USING` or falls back to [AstBuilder](AstBuilder.md#visitCreateTable) (to create either a [CreateTableAsSelect](../logical-operators/CreateTableAsSelect.md) or a [CreateTable](../logical-operators/CreateTable.md))
