Commit 48351d1

Feat: Airflow uses kubernetes_executor as default executor, git sync dags and add tutorials (#109)
Signed-off-by: xingcan-ltc <xingcan.hu@hotmail.com>
1 parent 83a893e commit 48351d1

File tree

12 files changed: +176 −102 lines

catalog/airflow/apps/airflow.app/README.md

Lines changed: 11 additions & 33 deletions

````diff
@@ -12,56 +12,34 @@ Airflow is deployed via the KDP web interface.

 ##### 2.2.1 Create a DAG file

-The following is a simple DAG file; copy the content below into a file named demo.py.
-
-```python
-from datetime import datetime
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.operators.bash import BashOperator
-
-with DAG(dag_id="demo", start_date=datetime(2024, 5, 20), schedule="0 0 * * *") as dag:
-
-    hello = BashOperator(task_id="hello", bash_command="echo hello")
-
-    @task()
-    def airflow():
-        print("airflow")
-
-    hello >> airflow()
-```
-
-##### 2.2.2 Copy demo.py into the airflow scheduler and worker pods with kubectl
-
-```shell
-kubectl cp demo.py airflow-scheduler-7cf5ddb9c5-fql6x:/opt/airflow/dags -n kdp-data
-kubectl cp demo.py airflow-worker-584b97f6cb-8gxq8:/opt/airflow/dags -n kdp-data
-```
-
-Note: replace `kdp-data` with your namespace, `airflow-scheduler-7cf5ddb9c5-fql6x` with your scheduler pod name, and `airflow-worker-584b97f6cb-8gxq8` with your worker pod name.
-
 ##### 2.2.3 Access the airflow web UI from a browser

-The airflow web UI can be reached through the configured ingress address, or via kubectl port-forward; the port-forward command is shown below.
+The airflow web UI can be reached through the configured ingress address (http://airflow-web-kdp-data.kdp-e2e.io/home), or via kubectl port-forward; the port-forward command is shown below.

 ```shell
 kubectl port-forward svc/airflow-webserver -n kdp-data 8080:8080
 ```

 The default login user/password is `admin/admin`.

-##### 2.2.4 View DAGs and task execution
+##### 2.2.4 Configure DAG files
+
+DAG files are stored in a git repository; the default installation is configured to pull DAG files from a git repository, and you can edit those files to change your DAGs.
+You can also fork the repository, modify the DAG files, and push them to your fork; then change the DAG repo, branch, etc. in the KDP application settings and update the application.
+
+##### 2.2.5 Run a DAG
+
+DAGs are paused by default and must be started manually. Activate the DAG named `hello_airflow` by clicking the toggle next to its name. This DAG runs once a day, and after activation it automatically catches up on yesterday's run.
+You can also trigger it manually: click the `Trigger DAG` button to the right of the `hello_airflow` DAG.

-The currently configured scheduler scan interval is 1 minute, and the demo DAG will appear on the web page. Click the `Trigger DAG` button on the right to trigger a run.

 ### 3. Troubleshooting common problems

 #### 3.1. DAG execution failed

 Causes and troubleshooting:

-1. Check that demo.py exists under `/opt/airflow/dags` in both the scheduler and worker pods;
+1. Check whether DAG code sync succeeded by inspecting the logs: `kubectl logs -l component=scheduler,release=airflow -c git-sync -n kdp-data`
 2. Review the log output of the scheduler and worker pods.

 ### 4. Appendix
````
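The catch-up behaviour described above (unpausing `hello_airflow` triggers a run for yesterday) follows Airflow's data-interval model: a run for a daily interval is only created once that interval has fully elapsed. A minimal stdlib sketch of the idea, assuming a daily `0 0 * * *` schedule and catch-up from the start date (an illustration, not Airflow's actual implementation):

```python
from datetime import date, timedelta

def missed_daily_intervals(start: date, activated_on: date):
    """Data intervals a daily DAG would catch up on when unpaused.

    A run for the interval [d, d+1) is created only once d+1 has passed,
    so unpausing the DAG today yields a catch-up run for yesterday.
    """
    day = timedelta(days=1)
    d = start
    intervals = []
    while d + day <= activated_on:
        intervals.append((d, d + day))
        d += day
    return intervals

# Unpausing on May 22 a DAG that started May 20 backfills two intervals;
# the most recent one covers "yesterday" (May 21 to May 22).
runs = missed_daily_intervals(date(2024, 5, 20), date(2024, 5, 22))
print(runs[-1])
```

The real scheduler applies the DAG's `catchup` setting to decide whether all missed intervals or only the most recent one are backfilled.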

catalog/airflow/apps/airflow.app/app.yaml

Lines changed: 2 additions & 4 deletions

```diff
@@ -10,8 +10,6 @@ metadata:
 spec:
   type: airflow
   properties:
-    chartVersion: v1.1.0-1.14.0
-    images:
-      airflow:
-        tag: 2.9.1
+    scheduler:
+      replicas: 1
```

Lines changed: 19 additions & 44 deletions

````diff
@@ -1,74 +1,49 @@
 ### 1. Application Description

-Airflow™ is a platform created by the community to programmatically author, schedule and monitor workflows.
+Airflow™ is a platform for programmatically authoring, scheduling, and monitoring workflows.

 ### 2. Quick Start

-#### 2.1 Deploy
+#### 2.1 Deployment

-Airflow is deployed by using KDP web.
+Airflow is deployed via the KDP web interface.

-#### 2.2. Practice
+#### 2.2 Practical Usage

-##### 2.2.1 Create DAG file
+##### 2.2.1 Creating DAG Files

-Here is a simple DAG file, copy the following content to the demo.py file.
+##### 2.2.3 Accessing Airflow Web via Browser

-```python
-from datetime import datetime
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.operators.bash import BashOperator
-
-with DAG(dag_id="demo", start_date=datetime(2024, 5, 20), schedule="0 0 * * *") as dag:
-
-    hello = BashOperator(task_id="hello", bash_command="echo hello")
-
-    @task()
-    def airflow():
-        print("airflow")
-
-    hello >> airflow()
-```
-
-##### 2.2.2 Copy DAG file to airflow scheduler and worker pod
+You can access the Airflow web interface through the configured ingress (http://airflow-web-kdp-data.kdp-e2e.io/home) or by using kubectl port-forward, as shown in the following command:

 ```shell
-kubectl cp demo.py airflow-scheduler-7cf5ddb9c5-fql6x:/opt/airflow/dags -n kdp-data
-kubectl cp demo.py airflow-worker-584b97f6cb-8gxq8:/opt/airflow/dags -n kdp-data
+kubectl port-forward svc/airflow-webserver -n kdp-data 8080:8080
 ```

-Note: When using this example, please replace `kdp-data` with your namespace, `airflow-scheduler-7cf5ddb9c5-fql6x` with your scheduler pod name, and `airflow-worker-584b97f6cb-8gxq8` with your worker pod name.
-
-##### 2.2.3 Visit airflow web
+The default login username/password is admin/admin.

-Visit the airflow web UI via the ingress address, or using kubectl port-forward, as follows:

-```shell
-kubectl port-forward svc/airflow-webserver -n kdp-data 8080:8080
-```
+##### 2.2.4 Configuring DAG Files

-Default login user/password is `admin/admin`
+DAG files are stored in a Git repository. The default installation configuration pulls DAG files from a Git repository, which you can modify to change the DAGs. Alternatively, you can fork the repository, modify the DAG files, and commit them to your fork; then change the DAG repository, branch, etc. in the KDP application settings and update the application.

-#### 2.2.4 View DAG and task execution
+##### 2.2.5 Running DAGs

-The currently configured scheduler scan frequency is 1 minute, and the demo DAG can be seen on the web page. Click the `Trigger DAG` button on the right to trigger DAG execution.
+DAGs are paused by default and need to be started manually. Activate the DAG named `hello_airflow` by clicking the toggle next to its name. This DAG runs once a day and automatically catches up on yesterday's run after activation. You can also trigger it manually by clicking the `Trigger DAG` button to the right of the `hello_airflow` DAG.

 ### 3. FAQ

-#### 3.1. DAG execution failed
+#### 3.1. DAG Execution Failure

-Reasons and results:
+Causes and Troubleshooting:

-1. Check whether the demo.py file exists in the `/opt/airflow/dags` directory of the scheduler and worker pods;
-2. View the output information of the scheduler and worker pod logs.
+- Check whether DAG code synchronization succeeded by inspecting the logs: `kubectl logs -l component=scheduler,release=airflow -c git-sync -n kdp-data`
+- Review the log output of the scheduler and worker pods.

 ### 4. Appendix

-#### 4.1. Concept
+#### 4.1. Concept Introduction

 **DAG**

-A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.
+A Directed Acyclic Graph, the basic unit for describing workflows in Airflow.
````
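The removed demo DAG wired its tasks with `hello >> airflow()`. In Airflow, `>>` just records a dependency edge between operators; runnable order then follows from a topological sort of the graph. A minimal stdlib sketch of that idea (an illustration of the concept, not Airflow's API):

```python
class Task:
    """A tiny stand-in for an Airflow operator: `a >> b` means b runs after a."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def __rshift__(self, other):
        other.upstream.append(self)
        return other  # returning the right-hand side lets chains like a >> b >> c work

def run_order(tasks):
    """Topologically sort tasks so every task appears after all of its upstreams."""
    ordered, seen = [], set()

    def visit(t):
        if t.task_id in seen:
            return
        seen.add(t.task_id)
        for up in t.upstream:
            visit(up)
        ordered.append(t)

    for t in tasks:
        visit(t)
    return [t.task_id for t in ordered]

hello = Task("hello")
airflow = Task("airflow")
hello >> airflow
print(run_order([airflow]))  # ['hello', 'airflow']
```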

catalog/airflow/x-definitions/app-airflow.cue

Lines changed: 40 additions & 19 deletions

```diff
@@ -1,9 +1,4 @@
-import (
-	"strconv"
-	"strings"
-)
-
-import ("encoding/json")
+import "strings"

 airflow: {
 	annotations: {}
@@ -65,6 +60,16 @@ template: {
 	values: {
 		defaultAirflowRepository: _imageRegistry + "apache/airflow"
 		defaultAirflowTag: parameter.images.airflow.tag
+		executor: "KubernetesExecutor"
+		dags: {
+			gitSync: {
+				enabled: true
+				repo: parameter.dags.gitSync.repo
+				branch: parameter.dags.gitSync.branch
+				rev: parameter.dags.gitSync.rev
+				subPath: parameter.dags.gitSync.subPath
+			}
+		}
 		config: {
 			core: {
 				"default_timezone": "Asia/Shanghai"
@@ -198,10 +203,6 @@ template: {
 			enabled: false
 		}
 	}
-	redis: {
-		enabled: true
-		terminationGracePeriodSeconds: 30
-	}
 	ingress: {
 		web: {
 			enabled: true
@@ -210,18 +211,17 @@ template: {
 				name: "airflow-web-" + context.namespace + "." + context["ingress.root_domain"]
 			},
 		]
-		ingressClassName: "kong"
 	}
 }
 images: {
 	statsd: {
 		repository: _imageRegistry + "prometheus/statsd-exporter"
-		tag: "v0.26.1"
 	}
-	redis: {
-		repository: _imageRegistry + "redis"
-		tag: "7-bookworm"
+	gitSync: {
+		repository: _imageRegistry + "git-sync/git-sync"
+		tag: "v4.2.3"
 	}
 }
@@ -246,8 +246,29 @@ template: {
 		mysqlSecret: string
 	}
 }
-// +ui:title=Webserver
+
+// +ui:title=DAG Configuration
 // +ui:order=2
+dags: {
+	// +ui:description=Git sync configuration (required)
+	gitSync: {
+		// +ui:description=Git repository URL. Mirror repository: https://gitee.com/linktime-cloud/example-datasets.git
+		// +ui:order=1
+		repo: *"https://github.com/linktimecloud/example-datasets.git" | string
+		// +ui:description=Git repository branch
+		// +ui:order=2
+		branch: *"airflow" | string
+		// +ui:description=Git commit ID
+		// +ui:order=3
+		rev: *"HEAD" | string
+		// +ui:description=DAG code directory (use an empty string for the repository root)
+		// +ui:order=4
+		subPath: *"dags" | string
+	}
+}
+
+// +ui:title=Webserver
+// +ui:order=3
 webserver: {
 	// +ui:description=Resource specification
 	// +ui:order=1
@@ -288,7 +309,7 @@ template: {
 	replicas: *1 | int
 }
 // +ui:title=Scheduler
-// +ui:order=3
+// +ui:order=4
 scheduler: {
 	// +ui:description=Resource specification
 	// +ui:order=1
@@ -329,7 +350,7 @@ template: {
 	replicas: *1 | int
 }
 // +ui:title=Workers
-// +ui:order=4
+// +ui:order=5
 workers: {
 	// +ui:description=Resource specification
 	// +ui:order=1
@@ -380,7 +401,7 @@ template: {
 images: {
 	airflow: {
 		// +ui:options={"disabled":true}
-		tag: *"2.9.1" | string
+		tag: *"v1.0.0-2.9.1" | string
 	}
 }
```
Lines changed: 51 additions & 0 deletions

# Batch Job Scheduling for Hive SQL with Apache Airflow

# 1. Introduction

Apache Airflow is an open-source platform for orchestrating and automating batch jobs, allowing for the easy creation, scheduling, monitoring, and management of workflows. Airflow supports Hive SQL, enabling the effortless execution of Hive SQL tasks.

Apache Airflow uses Directed Acyclic Graphs (DAGs) to represent workflows, which consist of task nodes and their dependencies. Task nodes can be Python operations, shell operations, SQL operations, and more. Airflow supports various executors, including the Local executor, Celery executor, Kubernetes executor, and others.

This article introduces how to write Hive SQL tasks using `pyhive` and execute them with the Apache Airflow Kubernetes executor.

# 2. Writing a Hive SQL DAG

The code is available on [Github](https://github.com/linktimecloud/example-datasets/blob/airflow/dags/hive-sql-example.py) or [Gitee](https://gitee.com/linktime-cloud/example-datasets/blob/airflow/dags/hive-sql-example.py).

This code is a DAG (Directed Acyclic Graph) written with the Apache Airflow framework, designed to automate data processing tasks. It performs two tasks: creating a Hive table and inserting data, then identifying the top-scoring student in each subject.

# 3. Running the DAG

## 3.1 Component Dependencies

The following components need to be installed in KDP:
- mysql
- airflow
- zookeeper
- hdfs
- hive (hive metastore, hive server)
- hue, httpfs-gateway (optional)

## 3.2 Scheduling Jobs

After installing Airflow with default parameters in KDP, log in to the Airflow web interface using the username `admin` and password `admin`.

Start the DAG named `hive-sql-example`.

![Airflow Web Interface](./images/airflow01.png)

Upon successful execution, the results can be viewed through the Hue web interface. Alternatively, you can refer to the `hive-server2` Quick Start guide to connect to Hive Server2 using beeline and view the results.

![Hue Web Interface](./images/airflow02.png)
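The DAG's second task finds the top-scoring student in each subject. The actual Hive SQL lives in the linked repository; purely as an illustration, the grouped-max logic it presumably implements looks like this in plain Python:

```python
def top_student_per_subject(scores):
    """Reduce (student, subject, score) rows to {subject: (student, best_score)},
    keeping the first student seen in case of a tie."""
    best = {}
    for student, subject, score in scores:
        if subject not in best or score > best[subject][1]:
            best[subject] = (student, score)
    return best

# Hypothetical sample rows standing in for the Hive table's contents.
rows = [
    ("alice", "math", 92),
    ("bob", "math", 88),
    ("bob", "physics", 95),
    ("alice", "physics", 90),
]
print(top_student_per_subject(rows))  # {'math': ('alice', 92), 'physics': ('bob', 95)}
```

In Hive SQL the same result would typically come from a `GROUP BY subject` with a window function or a join against the per-subject maximum.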

docs/en/user-tutorials/tutorials.md

Lines changed: 3 additions & 1 deletion

```diff
@@ -19,4 +19,6 @@ Users can refer to the following scenario tutorials to practice how to do data i
 * [How to integrate with Kafka on KDP](./integration-kafka-with-int-ext-comps.md)
 * [Data Import from External HDFS to KDP HDFS](./import-from-hdfs-to-hdfs.md)
 * [Exploring data using Airbyte/ClickHouse/Superset](./exploring-data-using-airbyte-clickhouse-superset.md)
-* More...
+* [Batch Job Scheduling for Hive SQL with Apache Airflow](./batch-job-scheduling-for-hive-sql-with-apache-airflow.md)
+* More...
```
