Commit 33d0363

hlcianfagna and amotl authored

RisingWave: Tutorial about Apache Iceberg -> CrateDB (#207)

- First version of iceberg-risingwave.md
- Fix ref to new article in index
- Fixing issues found by coderabbitai
- prettier
- Attempt at fixing toctree
- Quote link to Minio console in localhost to avoid link checker failure
- Replace tabs with spaces in SQL code blocks
- Fix link to new tutorial
- Apply some suggestions from CodeRabbit

Co-authored-by: Andreas Motl <andreas.motl@crate.io>

1 parent 6c3f483 commit 33d0363

File tree: 3 files changed, +312 −1 lines

Lines changed: 299 additions & 0 deletions (new file)
(iceberg-risingwave)=

# Stream processing from Iceberg tables to CrateDB using RisingWave

[RisingWave] is a stream processing platform that allows configuring data
sources, views on that data, and destinations where results are materialized.

This guide shows an example with data coming from an Iceberg table
and aggregations materialized in real time in CrateDB.

## Environment setup

For this example, we will spin up three containers using [Podman].

Let's first start a [Minio] instance:

```bash
podman run -d --name minio -p 9000:9000 -p 9001:9001 \
  -e MINIO_ROOT_USER=minioadmin \
  -e MINIO_ROOT_PASSWORD=minioadmin \
  quay.io/minio/minio server /data --console-address ":9001"
```

Now let's create a bucket called `warehouse`. To do so, point a browser to
`http://localhost:9001`, log in with `minioadmin`/`minioadmin`, click
"Create bucket", name it `warehouse`, and click "Create bucket" again.

Then we will spin up an instance of RisingWave:

```bash
podman run -d --name risingwave -it -p 4566:4566 -p 5691:5691 docker.io/risingwavelabs/risingwave:v2.4.0 single_node
```

And finally, an instance of CrateDB:

```bash
podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:5.10.7 -Cdiscovery.type=single-node
```

We will need three terminals for this demonstration.

On the first terminal, we will use [PyIceberg] and [IPython] to create an Iceberg
table; later, we will add data and see how aggregations materialize in
CrateDB in real time.

On the second terminal, we will do the RisingWave and CrateDB setups, and we will
leave a Python script running to stream the changes.

And on the third terminal, we will review how data appears in CrateDB.

## Creating an Iceberg table

Let's start on the first terminal. We use a Python script to create an Iceberg
table on the `warehouse` bucket we created earlier on Minio, and, as we want to
keep things simple, we will use an ephemeral in-memory catalog.

```bash
pip install pyiceberg pyarrow s3fs
ipython3
```

```python
from datetime import datetime

import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, LongType, NestedField, TimestampType

catalog = SqlCatalog(
    "default",
    **{
        "uri": "sqlite:///:memory:",
        "warehouse": "s3://warehouse",
        "s3.endpoint": "http://localhost:9000",
        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
    },
)

schema = Schema(
    NestedField(field_id=1, name="sensor_id", field_type=LongType()),
    NestedField(field_id=2, name="ts", field_type=TimestampType()),
    NestedField(field_id=3, name="reading", field_type=DoubleType()),
)

catalog.create_namespace("db")

table = catalog.create_table(
    identifier="db.sensors_readings", schema=schema, properties={"format-version": "2"}
)


def create_risingwave_compatible_metadata(table, version):
    # RisingWave discovers the current metadata file through a
    # `version-hint.text` file and `v<N>.metadata.json` naming, so we copy
    # the latest metadata file written by PyIceberg into that layout.
    metadata_location = table.metadata_location
    metadata_dir = metadata_location.rsplit("/", 1)[0]
    version_hint_path = f"{metadata_dir}/version-hint.text"
    output_file = table.io.new_output(version_hint_path)
    with output_file.create(overwrite=True) as f:
        f.write(version.encode("utf-8"))
    versioned_metadata_path = f"{metadata_dir}/v{version}.metadata.json"
    input_file = table.io.new_input(metadata_location)
    with input_file.open() as f_in:
        content = f_in.read()
    output_file = table.io.new_output(versioned_metadata_path)
    with output_file.create() as f_out:
        f_out.write(content)


create_risingwave_compatible_metadata(table, "1")
```
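
The helper copies metadata because PyIceberg writes metadata files under
names like `00000-<uuid>.metadata.json`, while RisingWave looks for a
`version-hint.text` file next to `v<N>.metadata.json` files. A minimal,
self-contained sketch of the path derivation (the example location below is
made up for illustration):

```python
def version_hint_path(metadata_location: str) -> str:
    """Derive the version-hint.text path next to an Iceberg metadata file."""
    # Strip the metadata file name, keep the metadata directory.
    metadata_dir = metadata_location.rsplit("/", 1)[0]
    return f"{metadata_dir}/version-hint.text"


# Hypothetical metadata location of the kind PyIceberg writes:
location = "s3://warehouse/db/sensors_readings/metadata/00000-1a2b3c.metadata.json"
print(version_hint_path(location))
# → s3://warehouse/db/sensors_readings/metadata/version-hint.text
```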

## RisingWave and CrateDB setups

Let's now switch to the second terminal.

To interact with both CrateDB and RisingWave, we will use the `psql`
command-line utility. Let's install it:

```bash
sudo apt-get install -y postgresql-client
```

Now let's first connect to CrateDB to create a table where we will keep the
average reading for each sensor:

```bash
psql -h localhost -U crate
```

```sql
CREATE TABLE public.average_sensor_readings (
    sensor_id BIGINT PRIMARY KEY,
    average_reading DOUBLE
);
```

Press Ctrl+D to exit `psql`.

Now we need to tell RisingWave to source the data from the Iceberg table:

```bash
psql -h localhost -p 4566 -d dev -U root
```

```sql
CREATE SOURCE sensors_readings
WITH (
    connector = 'iceberg',
    database.name = 'db.db',
    warehouse.path = 's3://warehouse/',
    table.name = 'sensors_readings',
    s3.endpoint = 'http://host.containers.internal:9000',
    s3.access.key = 'minioadmin',
    s3.secret.key = 'minioadmin',
    s3.region = 'minio'
);
```

And to materialize the averages:

```sql
CREATE MATERIALIZED VIEW average_sensor_readings AS
SELECT
    sensor_id,
    AVG(reading) AS average_reading
FROM sensors_readings
GROUP BY sensor_id;
```

Press Ctrl+D to exit `psql`.

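
Optionally, before disconnecting, you can sanity-check the setup from the same
RisingWave `psql` session. The statements below are illustrative; the last
query will only return rows once data has been ingested:

```sql
-- Should list the Iceberg-backed source and the materialized view:
SHOW SOURCES;
SHOW MATERIALIZED VIEWS;

-- Once data arrives, the view can be queried directly:
SELECT * FROM average_sensor_readings;
```
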
And we will now install some dependencies:

```bash
pip install pandas records sqlalchemy-cratedb
pip install psycopg2-binary
pip install --upgrade packaging
pip install --upgrade 'risingwave-py @ git+https://github.com/risingwavelabs/risingwave-py.git@833ca13041cb73cd96fa5cb1c898db2a558d5d8c'
```

And kick off the Python script that will keep CrateDB up-to-date in real time:

```bash
cat <<'EOF' > cratedb_event_handler.py
```

```python
import threading

import pandas as pd
import records
from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions

rw = RisingWave(
    RisingWaveConnOptions.from_connection_info(
        host="localhost", port=4566, user="root", password="root", database="dev"
    )
)


def cratedb_event_handler(event: pd.DataFrame):
    # Replay each change event from the materialized view into CrateDB.
    cratedb = records.Database("crate://", echo=True)
    for _, row in event.iterrows():
        if row["op"] in ("Insert", "UpdateInsert"):
            cratedb.query(
                "INSERT INTO public.average_sensor_readings (sensor_id, average_reading) VALUES (:sensor_id, :average_reading);",
                sensor_id=row["sensor_id"],
                average_reading=row["average_reading"],
            )
        if row["op"] in ("Delete", "UpdateDelete"):
            cratedb.query(
                "DELETE FROM public.average_sensor_readings WHERE sensor_id = :sensor_id;",
                sensor_id=row["sensor_id"],
            )


def subscribe_average_sensor_readings_change():
    rw.on_change(
        subscribe_from="average_sensor_readings",
        handler=cratedb_event_handler,
        output_format=OutputFormat.DATAFRAME,
    )


threading.Thread(target=subscribe_average_sensor_readings_change).start()
```

```bash
EOF

python cratedb_event_handler.py
```
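
The handler is driven by change events from the subscription: an update to the
materialized view arrives as a delete/insert pair. As a rough, standalone
illustration of the routing logic (the sample rows below are made up, not
actual RisingWave output):

```python
# Hypothetical change batch mimicking an update to one sensor's average.
events = [
    {"sensor_id": 1, "average_reading": 2.3, "op": "UpdateDelete"},
    {"sensor_id": 1, "average_reading": 4.5, "op": "UpdateInsert"},
]

statements = []
for row in events:
    if row["op"] in ("Insert", "UpdateInsert"):
        statements.append(("INSERT", row["sensor_id"], row["average_reading"]))
    elif row["op"] in ("Delete", "UpdateDelete"):
        statements.append(("DELETE", row["sensor_id"], None))

print(statements)
# → [('DELETE', 1, None), ('INSERT', 1, 4.5)]
```

The old row is deleted before the new average is inserted, which keeps the
CrateDB table at exactly one row per sensor.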

## Adding some data and seeing results materialize in real-time

Let's go back to the first terminal and run:

```python
data = pa.Table.from_pydict(
    {
        "sensor_id": [1, 1],
        "ts": [
            datetime.strptime("2025-05-14 14:00", "%Y-%m-%d %H:%M"),
            datetime.strptime("2025-05-14 15:00", "%Y-%m-%d %H:%M"),
        ],
        "reading": [1.2, 3.4],
    }
)

table.append(data)

create_risingwave_compatible_metadata(table, "2")
```

Now let's go to the third terminal and connect to CrateDB:

```bash
psql -h localhost -U crate
```

And let's inspect the `average_sensor_readings` table:

```sql
SELECT * FROM public.average_sensor_readings;
```

The average for sensor 1 is 2.3.

Let's go back to the first terminal and run:

```python
data = pa.Table.from_pydict(
    {
        "sensor_id": [1, 1],
        "ts": [
            datetime.strptime("2025-06-04 14:00", "%Y-%m-%d %H:%M"),
            datetime.strptime("2025-06-04 15:00", "%Y-%m-%d %H:%M"),
        ],
        "reading": [5.6, 7.8],
    }
)

table.append(data)

create_risingwave_compatible_metadata(table, "3")
```

If we now check `average_sensor_readings` from the third terminal, we will see
that the average has already changed to 4.5.
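
As a quick sanity check on those numbers: the first batch averages to 2.3, and
all four readings together average to 4.5.

```python
batch_1 = [1.2, 3.4]  # first append
batch_2 = [5.6, 7.8]  # second append

avg_first = sum(batch_1) / len(batch_1)
avg_all = sum(batch_1 + batch_2) / len(batch_1 + batch_2)

print(round(avg_first, 1), round(avg_all, 1))
# → 2.3 4.5
```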

[RisingWave]: https://github.com/risingwavelabs/risingwave
[Podman]: https://github.com/containers/podman
[Minio]: https://github.com/minio/minio
[PyIceberg]: https://github.com/apache/iceberg-python
[IPython]: https://github.com/ipython/ipython

docs/integrate/etl/index.md

Lines changed: 12 additions & 0 deletions

````diff
@@ -57,6 +57,18 @@ Tutorials and resources about configuring the managed variants, Astro and CrateD
 - [CrateDB Apache Hop Bulk Loader transform]


+## Apache Iceberg / RisingWave
+:::{div}
+- {ref}`iceberg-risingwave`
+:::
+
+```{toctree}
+:hidden:
+
+iceberg-risingwave
+```
+
+
 ## Apache Kafka
 :::{div}
 - {ref}`kafka-connect`
````

docs/integrate/risingwave/index.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -80,7 +80,7 @@ referenced below.
 :::
 - An example with data coming from an Apache Iceberg table and aggregations
   materialized in real-time in CrateDB, using RisingWave.
-  See [Stream processing from Iceberg tables to CrateDB using RisingWave].
+  See {ref}`iceberg-risingwave`.

 :::{note}
 We are tracking interoperability issues per [Tool: RisingWave] and appreciate
```
