Commit 9bb9cfb

[SDP] PipelinesHandler and Pipeline Commands
1 parent dbc9c67 commit 9bb9cfb

5 files changed (+42 −25 lines)


docs/declarative-pipelines/GraphRegistrationContext.md

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ toDataflowGraph: DataflowGraph
 
 `toDataflowGraph` is used when:
 
-* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [startRun](PipelinesHandler.md#startRun)
+* `PipelinesHandler` ([Spark Connect]({{ book.spark_connect }})) is requested to [start a pipeline run](PipelinesHandler.md#startRun)
 
 ## Tables { #tables }
 
docs/declarative-pipelines/PipelineExecution.md

Lines changed: 2 additions & 2 deletions
@@ -18,13 +18,13 @@
 runPipeline(): Unit
 ```
 
-`runPipeline` [starts the pipeline](#startPipeline) and requests the [PipelineExecution](PipelineUpdateContext.md#pipelineExecution) (of this [PipelineUpdateContext](#context)) to [wait for the execution to complete](#awaitCompletion).
+`runPipeline` [starts this pipeline](#startPipeline) and requests the [PipelineExecution](PipelineUpdateContext.md#pipelineExecution) (of this [PipelineUpdateContext](#context)) to [wait for the execution to complete](#awaitCompletion).
 
 ---
 
 `runPipeline` is used when:
 
-* `PipelinesHandler` is requested to [startRun](PipelinesHandler.md#startRun) (for [Spark Connect]({{ book.spark_connect }}))
+* `PipelinesHandler` is requested to [start a pipeline run](PipelinesHandler.md#startRun)
 
 ## Start Pipeline { #startPipeline }
 
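The start-then-await contract described above is easy to picture with a toy model. The following Scala sketch uses made-up names (`ToyPipelineExecution`, the `running` future) and is only an illustration of the contract, not Spark's actual `PipelineExecution`:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Toy stand-in for the start-then-await contract: startPipeline kicks off an
// asynchronous update, awaitCompletion blocks on it, runPipeline does both.
class ToyPipelineExecution {
  @volatile private var running: Option[Future[Unit]] = None

  def startPipeline(): Unit =
    running = Some(Future {
      println("resolving the dataflow graph and executing flows...")
    })

  def awaitCompletion(): Unit =
    running.foreach(Await.result(_, Duration.Inf))

  def runPipeline(): Unit = {
    startPipeline()    // kick off the update
    awaitCompletion()  // block until it completes
  }
}
```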
docs/declarative-pipelines/PipelinesHandler.md

Lines changed: 35 additions & 22 deletions
@@ -2,6 +2,8 @@
 
 `PipelinesHandler` is used to [handle pipeline commands](#handlePipelinesCommand) in [Spark Connect]({{ book.spark_connect }}) ([SparkConnectPlanner]({{ book.spark_connect }}/server/SparkConnectPlanner), precisely).
 
+`PipelinesHandler` acts as a bridge between the Python and SQL "frontends" and the Spark Connect server (where pipeline execution happens).
+
 ## Handle Pipelines Command { #handlePipelinesCommand }
 
 ```scala
@@ -14,14 +16,14 @@ handlePipelinesCommand(
 
 `handlePipelinesCommand` handles the given pipeline `cmd` command.
 
-| PipelineCommand | Description |
-|-----------------|-------------|
-| `CREATE_DATAFLOW_GRAPH` | [Creates a new Dataflow Graph](#createDataflowGraph) |
-| `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) |
-| `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) |
-| `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) |
-| `START_RUN` | [Starts a pipeline](#START_RUN) |
-| `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) |
+| PipelineCommand | Description | Initiator |
+|-----------------|-------------|-----------|
+| `CREATE_DATAFLOW_GRAPH` | [Creates a new dataflow graph](#CREATE_DATAFLOW_GRAPH) | [pyspark.pipelines.spark_connect_pipeline](#create_dataflow_graph) |
+| `DROP_DATAFLOW_GRAPH` | [Drops a pipeline](#DROP_DATAFLOW_GRAPH) | |
+| `DEFINE_DATASET` | [Defines a dataset](#DEFINE_DATASET) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_dataset) |
+| `DEFINE_FLOW` | [Defines a flow](#DEFINE_FLOW) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_flow) |
+| `START_RUN` | [Starts a pipeline run](#START_RUN) | [pyspark.pipelines.spark_connect_pipeline](#start_run) |
+| `DEFINE_SQL_GRAPH_ELEMENTS` | [DEFINE_SQL_GRAPH_ELEMENTS](#DEFINE_SQL_GRAPH_ELEMENTS) | [SparkConnectGraphElementRegistry](SparkConnectGraphElementRegistry.md#register_sql) |
 
 `handlePipelinesCommand` reports an `UnsupportedOperationException` for incorrect commands:
 
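Taken together, the table and the `UnsupportedOperationException` note describe one big match over the command type. Below is a toy model of that dispatch, with a simplified command ADT standing in for the real `proto.PipelineCommand` messages; all of it is an illustrative sketch, not the actual handler:

```scala
// Toy model of the dispatch in the table above. The ADT stands in for the
// real proto.PipelineCommand; handler bodies are print stubs.
sealed trait PipelineCommand
case class CreateDataflowGraph(defaultCatalog: String) extends PipelineCommand
case class DropDataflowGraph(graphId: String) extends PipelineCommand
case class DefineDataset(graphId: String, name: String) extends PipelineCommand
case class DefineFlow(graphId: String, name: String) extends PipelineCommand
case class DefineSqlGraphElements(graphId: String, sqlText: String) extends PipelineCommand
case class Unsupported(name: String) extends PipelineCommand
case class StartRun(graphId: String, dryRun: Boolean) extends PipelineCommand

def handlePipelinesCommand(cmd: PipelineCommand): Unit = cmd match {
  case CreateDataflowGraph(catalog) =>
    println(s"created a dataflow graph in $catalog (graph ID sent back)")
  case DropDataflowGraph(graphId) =>
    println(s"dropped pipeline $graphId")
  case DefineDataset(_, name) =>
    println(s"Define pipelines dataset cmd received: $name")
  case DefineFlow(_, name) =>
    println(s"Define pipelines flow cmd received: $name")
  case DefineSqlGraphElements(graphId, _) =>
    println(s"registered SQL graph elements with graph $graphId")
  case StartRun(graphId, dryRun) =>
    println(s"Start pipeline cmd received: graph=$graphId dryRun=$dryRun")
  case Unsupported(name) =>
    throw new UnsupportedOperationException(s"$name not supported")
}
```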
@@ -33,9 +35,13 @@ handlePipelinesCommand(
 
 `handlePipelinesCommand` is used when:
 
-* `SparkConnectPlanner` is requested to `handlePipelineCommand` (for `PIPELINE_COMMAND` command)
+* `SparkConnectPlanner` ([Spark Connect]({{ book.spark_connect }}/server/SparkConnectPlanner)) is requested to `handlePipelineCommand` (for `PIPELINE_COMMAND` command)
+
+### CREATE_DATAFLOW_GRAPH { #CREATE_DATAFLOW_GRAPH }
 
-### Define Dataset Command { #DEFINE_DATASET }
+`handlePipelinesCommand` [creates a dataflow graph](#createDataflowGraph) and sends the graph ID back.
+
+### DEFINE_DATASET { #DEFINE_DATASET }
 
 `handlePipelinesCommand` prints out the following INFO message to the logs:
 
@@ -45,7 +51,7 @@ Define pipelines dataset cmd received: [cmd]
 
 `handlePipelinesCommand` [defines a dataset](#defineDataset).
 
-### Define Flow Command { #DEFINE_FLOW }
+### DEFINE_FLOW { #DEFINE_FLOW }
 
 `handlePipelinesCommand` prints out the following INFO message to the logs:
 
@@ -55,7 +61,17 @@ Define pipelines flow cmd received: [cmd]
 
 `handlePipelinesCommand` [defines a flow](#defineFlow).
 
-### Start Pipeline { #startRun }
+### START_RUN { #START_RUN }
+
+`handlePipelinesCommand` prints out the following INFO message to the logs:
+
+```text
+Start pipeline cmd received: [cmd]
+```
+
+`handlePipelinesCommand` [starts a pipeline run](#startRun).
+
+## Start Pipeline Run { #startRun }
 
 ```scala
 startRun(
@@ -64,21 +80,18 @@ startRun(
 sessionHolder: SessionHolder): Unit
 ```
 
-`startRun` prints out the following INFO message to the logs:
-
-```text
-Start pipeline cmd received: [cmd]
-```
+??? note "`START_RUN` Pipeline Command"
+    `startRun` is used when `PipelinesHandler` is requested to handle the [proto.PipelineCommand.CommandTypeCase.START_RUN](#START_RUN) command.
 
 `startRun` finds the [GraphRegistrationContext](GraphRegistrationContext.md) by `dataflowGraphId` in the [DataflowGraphRegistry](DataflowGraphRegistry.md) (in the given `SessionHolder`).
 
 `startRun` creates a `PipelineEventSender` to send pipeline events back to the Spark Connect client (_Python pipeline runtime_).
 
 `startRun` creates a [PipelineUpdateContextImpl](PipelineUpdateContextImpl.md) (with the `PipelineEventSender`).
 
-In the end, `startRun` requests the `PipelineUpdateContextImpl` for the [PipelineExecution](PipelineExecution.md) to [runPipeline](PipelineExecution.md#runPipeline) or [dryRunPipeline](PipelineExecution.md#dryRunPipeline) for `dry-run` or `run` command, respectively.
+In the end, `startRun` requests the `PipelineUpdateContextImpl` for the [PipelineExecution](PipelineUpdateContext.md#pipelineExecution) to [run a pipeline](PipelineExecution.md#runPipeline) or [dry-run a pipeline](PipelineExecution.md#dryRunPipeline) for the `run` or `dry-run` command, respectively.
 
-### Create Dataflow Graph { #createDataflowGraph }
+## Create Dataflow Graph { #createDataflowGraph }
 
 ```scala
 createDataflowGraph(
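The `startRun` steps documented in the hunk above (look up the graph, wire an event sender into an update context, then run or dry-run) can be sketched as follows. Every type here is a simplified stand-in for the real Spark Connect class of the same name:

```scala
// Toy walkthrough of the startRun steps; all types are illustrative stubs.
case class GraphRegistrationContext(dataflowGraphId: String)

class PipelineEventSender {
  // The real sender ships pipeline events back to the Spark Connect client
  // (the Python pipeline runtime); this stub just prints them.
  def send(event: String): Unit = println(event)
}

class PipelineExecution(sender: PipelineEventSender) {
  def runPipeline(): Unit = sender.send("pipeline run complete")
  def dryRunPipeline(): Unit = sender.send("dry run: graph validated, nothing executed")
}

def startRun(
    dataflowGraphId: String,
    dryRun: Boolean,
    registry: Map[String, GraphRegistrationContext]): Unit = {
  // Step 1: find the GraphRegistrationContext by its dataflow graph ID.
  val graph = registry.getOrElse(
    dataflowGraphId, sys.error(s"Dataflow graph not found: $dataflowGraphId"))
  // Steps 2 and 3: create the event sender and the update context around it.
  val sender = new PipelineEventSender
  val execution = new PipelineExecution(sender)
  // Step 4: dry-run or run, depending on the command.
  sender.send(s"starting run for graph ${graph.dataflowGraphId}")
  if (dryRun) execution.dryRunPipeline() else execution.runPipeline()
}
```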
@@ -90,7 +103,7 @@ createDataflowGraph(
 
 `createDataflowGraph` returns the ID of the created dataflow graph.
 
-### defineSqlGraphElements { #defineSqlGraphElements }
+## defineSqlGraphElements { #defineSqlGraphElements }
 
 ```scala
 defineSqlGraphElements(
@@ -100,7 +113,7 @@ defineSqlGraphElements(
 
 `defineSqlGraphElements`...FIXME
 
-### Define Dataset (Table or View) { #defineDataset }
+## Define Dataset (Table or View) { #defineDataset }
 
 ```scala
 defineDataset(
@@ -123,7 +136,7 @@ For unknown types, `defineDataset` reports an `IllegalArgumentException`:
 Unknown dataset type: [type]
 ```
 
-### Define Flow { #defineFlow }
+## Define Flow { #defineFlow }
 
 ```scala
 defineFlow(
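The "Define Dataset (Table or View)" section above ends with an `IllegalArgumentException` for unknown dataset types. A toy sketch of that type dispatch follows; the three dataset-type names are assumptions, while the error message is taken verbatim from the docs:

```scala
// Toy sketch of defineDataset's dispatch on the dataset type.
sealed trait DatasetType
case object MaterializedView extends DatasetType
case object Table extends DatasetType
case object TemporaryView extends DatasetType
case class Unknown(tpe: String) extends DatasetType

def defineDataset(name: String, datasetType: DatasetType): Unit =
  datasetType match {
    case MaterializedView | Table =>
      println(s"registered table $name with the dataflow graph")
    case TemporaryView =>
      println(s"registered view $name with the dataflow graph")
    case Unknown(tpe) =>
      throw new IllegalArgumentException(s"Unknown dataset type: $tpe")
  }
```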
docs/declarative-pipelines/UnresolvedFlow.md

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+# UnresolvedFlow
+
+`UnresolvedFlow` is...FIXME

docs/declarative-pipelines/index.md

Lines changed: 1 addition & 0 deletions
@@ -152,6 +152,7 @@ Pipelines elements are defined in SQL files included as `definitions` in a [pipe
 Supported SQL statements:
 
 * [CREATE FLOW AS INSERT INTO BY NAME](../sql/SparkSqlAstBuilder.md#visitCreatePipelineInsertIntoFlow)
+* ...
 
 ## Demo: Create Virtual Environment for Python Client
 
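For the one statement listed above, a hypothetical example of the shape a `CREATE FLOW ... AS INSERT INTO ... BY NAME` statement could take when submitted through a Spark session. The exact grammar lives in `SparkSqlAstBuilder#visitCreatePipelineInsertIntoFlow`; the flow, table, and source names here are made up, and `spark` is assumed to be a `SparkSession` in scope:

```scala
// Hypothetical CREATE FLOW statement; names and query are illustrative only.
spark.sql(
  """CREATE FLOW raw_to_bronze AS
    |INSERT INTO bronze_events BY NAME
    |SELECT * FROM raw_events
    |""".stripMargin)
```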