From 32315b55cebde0b71ef4a8edc960c32bb1d1cae7 Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Wed, 4 Jun 2025 18:04:19 +0100 Subject: [PATCH 1/9] First version of iceberg-risingwave.md --- docs/integrate/etl/iceberg-risingwave.md | 290 +++++++++++++++++++++++ docs/integrate/etl/index.md | 5 + 2 files changed, 295 insertions(+) create mode 100644 docs/integrate/etl/iceberg-risingwave.md diff --git a/docs/integrate/etl/iceberg-risingwave.md b/docs/integrate/etl/iceberg-risingwave.md new file mode 100644 index 00000000..9e4ab43a --- /dev/null +++ b/docs/integrate/etl/iceberg-risingwave.md @@ -0,0 +1,290 @@ +(iceberg-risingwave)= +# Stream processing from Iceberg tables to CrateDB using RisingWave + +[RisingWave] is a stream processing platform that allows configuring data sources, views on that data, and destinations where results are materialized. + +This guide aims to show you an example with data coming from an Iceberg table and aggregations materialized in real-time in CrateDB. + +## Environment setup + +For this example we will spin up 3 containers using [Podman] + +Let's first start a [Minio] instance: + +```bash +podman run -d --name minio -p 9000:9000 -p 9001:9001 \ + -e MINIO_ROOT_USER=minioadmin \ + -e MINIO_ROOT_PASSWORD=minioadmin \ + quay.io/minio/minio server /data --console-address ":9001" +``` + +And let's create a bucket called `warehouse`, for this point a browser to http://localhost:9001 , login with `minioadmin` / `minioadmin` , and click on "Create bucket", enter "warehouse", and click again on "Create bucket". + +Then we will spin up an instance of RisingWave: + +```bash +podman run -d --name risingwave -it -p 4566:4566 -p 5691:5691 docker.io/risingwavelabs/risingwave:v2.4.0 single_node +``` + +And finally an instance of CrateDB: + +```bash +podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:5.10.7 -Cdiscovery.type=single-node +``` + +We will need 3 consoles for this demonstration. + +On the first console we will use [PyIceberg] and [IPython] to create an Iceberg table, and later we will add data and see how aggregations materialize in CrateDB in real-time. + +On the second console we will do the RisingWave and CrateDB setups, and we will leave a Python script running for the streaming of changes. + +And on the 3rd console we will review how data appears in CrateDB. + +## Creating an Iceberg table + +Let's start on the first console. +We use a Python script to create an Iceberg table on the bucket we created earlier on Minio, and as we want to keep things simple, we will use an ephemeral in-memory catalog. + +```bash +pip install pyiceberg pyarrow s3fs +ipython3 +``` + +```python +import re +from datetime import datetime + +import pyarrow as pa +from pyiceberg.catalog.sql import SqlCatalog +from pyiceberg.schema import Schema +from pyiceberg.types import DoubleType, LongType, NestedField, TimestampType + +catalog = SqlCatalog( + "default", + **{ + "uri": "sqlite:///:memory:", + "warehouse": f"s3://warehouse", + "s3.endpoint": "http://localhost:9000", + "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO", + "s3.access-key-id": "minioadmin", + "s3.secret-access-key": "minioadmin", + }, +) + +schema = Schema( + NestedField(field_id=1, name="sensor_id", field_type=LongType()), + NestedField(field_id=2, name="ts", field_type=TimestampType()), + NestedField(field_id=3, name="reading", field_type=DoubleType()), +) + +catalog.create_namespace("db") + +table = catalog.create_table( + identifier="db.sensors_readings", schema=schema, properties={"format-version": "2"} +) + + +def create_risingwave_compatible_metadata(table, version): + metadata_location = table.metadata_location + metadata_dir = metadata_location.rsplit("/", 1)[0] + version_hint_path = f"{metadata_dir}/version-hint.text" + output_file = table.io.new_output(version_hint_path) + with output_file.create(overwrite=True) as f: + f.write(version.encode("utf-8")) + v1_metadata_path = f"{metadata_dir}/v{version}.metadata.json" + input_file = table.io.new_input(metadata_location) + with input_file.open() as f_in: + content = f_in.read() + output_file = table.io.new_output(v1_metadata_path) + with output_file.create() as f_out: + f_out.write(content) + + +create_risingwave_compatible_metadata(table, "1") +``` + +## RisingWave and CrateDB setups + +Let's now switch to the second console. + +To interact with both CrateDB and RisingWave we will use the `psql` command line utility, let's install it: + +```bash +sudo apt-get install -y postgresql-client +``` + +Now let's connect first to CrateDB to create a table where we will keep the average reading for each sensor: + +```bash +psql -h localhost -U crate +``` + +```sql +CREATE TABLE public.average_sensor_readings ( + sensor_id BIGINT PRIMARY KEY, + average_reading DOUBLE +); +``` + +Ctrl+D + +Now we need to tell RisingWave to source the data from the Iceberg table: + +```bash +psql -h localhost -p 4566 -d dev -U root +``` + +```sql +CREATE SOURCE sensors_readings +WITH ( + connector = 'iceberg', + database.name='db.db', + warehouse.path='s3://warehouse/', + table.name='sensors_readings', + s3.endpoint = 'http://host.containers.internal:9000', + s3.access.key = 'minioadmin', + s3.secret.key = 'minioadmin', + s3.region = 'minio' +); +``` + +And to materialize the averages: + +```sql +CREATE MATERIALIZED VIEW average_sensor_readings AS +SELECT + sensor_id, + AVG(reading) AS average_reading +FROM sensors_readings +GROUP BY sensor_id; +``` + +Ctrl+D + +And we will now install some dependencies: + +```bash +pip install pandas records sqlalchemy-cratedb +pip install psycopg2-binary +pip install --upgrade packaging +pip install --upgrade 'risingwave-py @ git+https://github.com/risingwavelabs/risingwave-py.git@833ca13041cb73cd96fa5cb1c898db2a558d5d8c' +``` + +And kick off the Python script that will keep CrateDB up-to-date in real-time: + +```bash +cat <>cratedb_event_handler.py +``` + +```python +import threading + +import pandas as pd +import records +from risingwave import OutputFormat, RisingWave, RisingWaveConnOptions + +rw = RisingWave( + RisingWaveConnOptions.from_connection_info( + host="localhost", port=4566, user="root", password="root", database="dev" + ) +) + + +def cratedb_event_handler(event: pd.DataFrame): + cratedb = records.Database("crate://", echo=True) + for _, row in event.iterrows(): + if row["op"] == "Insert" or row["op"] == "UpdateInsert": + cratedb.query( + "INSERT INTO public.average_sensor_readings (sensor_id,average_reading) VALUES (:sensor_id,:average_reading);", + **dict( + sensor_id=row["sensor_id"], + average_reading=row["average_reading"], + ), + ) + if row["op"] == "Delete" or row["op"] == "UpdateDelete": + cratedb.query( + "DELETE FROM public.average_sensor_readings WHERE sensor_id=:sensor_id;", + **dict(sensor_id=row["sensor_id"]), + ) + + +def subscribe_average_exam_scores_change(): + rw.on_change( + subscribe_from="average_sensor_readings", + handler=cratedb_event_handler, + output_format=OutputFormat.DATAFRAME, + ) + + +threading.Thread(target=subscribe_average_exam_scores_change).start() +``` + +```bash +EOF + +python cratedb_event_handler.py +``` + +## Adding some data and seeing results materialize in real-time + +Let's go back to the first console and run: + +```python +data = pa.Table.from_pydict( + { + "sensor_id": [1, 1], + "ts": [ + datetime.strptime("2025-05-14 14:00", "%Y-%m-%d %H:%M"), + datetime.strptime("2025-05-14 15:00", "%Y-%m-%d %H:%M"), + ], + "reading": [1.2, 3.4], + } +) + +table.append(data) + +create_risingwave_compatible_metadata(table, "2") +``` + +Now let's go to the third console. +Let connect to CrateDB: + +```bash +psql -h localhost -U crate +``` + +And let's inspect the average_sensor_readings table: + +```sql +SELECT * FROM public.average_sensor_readings; +``` + +The average for sensor 1 is 2.3 + +Let's go back to the first console and run: + +```python +data = pa.Table.from_pydict( + { + "sensor_id": [1, 1], + "ts": [ + datetime.strptime("2025-06-04 14:00", "%Y-%m-%d %H:%M"), + datetime.strptime("2025-06-04 15:00", "%Y-%m-%d %H:%M"), + ], + "reading": [5.6, 7.8], + } +) + +table.append(data) + +create_risingwave_compatible_metadata(table, "3") +``` + +If now we check again average_sensor_readings from the 3rd console we will see the average has already changed to 4.5 + + +[RisingWave]: https://github.com/risingwavelabs/risingwave +[Podman]: https://github.com/containers/podman +[Minio]: https://github.com/minio/minio +[PyIceberg]: https://github.com/apache/iceberg-python +[IPython]: https://github.com/ipython/ipython diff --git a/docs/integrate/etl/index.md b/docs/integrate/etl/index.md index 8daf72d6..800c1e74 100644 --- a/docs/integrate/etl/index.md +++ b/docs/integrate/etl/index.md @@ -57,6 +57,11 @@ Tutorials and resources about configuring the managed variants, Astro and CrateD - [CrateDB Apache Hop Bulk Loader transform] +## Apache Iceberg / RisingWave + +- [Stream processing from Iceberg tables to CrateDB using RisingWave] + + ## Apache Kafka :::{div} - {ref}`kafka-connect` From 33a6833e4611f94951722af263a31bed66705940 Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Wed, 4 Jun 2025 18:10:51 +0100 Subject: [PATCH 2/9] Fix ref to new article in index --- docs/integrate/etl/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrate/etl/index.md b/docs/integrate/etl/index.md index 800c1e74..5b3ce20f 100644 --- a/docs/integrate/etl/index.md +++ b/docs/integrate/etl/index.md @@ -59,7 +59,7 @@ Tutorials and resources about configuring the managed variants, Astro and CrateD ## Apache Iceberg / RisingWave -- [Stream processing from Iceberg tables to CrateDB using RisingWave] +- {ref}`iceberg-risingwave` ## Apache Kafka From 790f080d342b4ac8903f928e174f9b6043cd3f3a Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Wed, 4 Jun 2025 18:12:44 +0100 Subject: [PATCH 3/9] Fixing issues found by coderabbitai --- docs/integrate/etl/iceberg-risingwave.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/integrate/etl/iceberg-risingwave.md b/docs/integrate/etl/iceberg-risingwave.md index 9e4ab43a..74410b5b 100644 --- a/docs/integrate/etl/iceberg-risingwave.md +++ b/docs/integrate/etl/iceberg-risingwave.md @@ -7,7 +7,7 @@ This guide aims to show you an example with data coming from an Iceberg table an ## Environment setup -For this example we will spin up 3 containers using [Podman] +For this example, we will spin up 3 containers using [Podman] Let's first start a [Minio] instance: @@ -208,7 +208,7 @@ def cratedb_event_handler(event: pd.DataFrame): ) -def subscribe_average_exam_scores_change(): +def subscribe_average_sensor_readings_change(): rw.on_change( subscribe_from="average_sensor_readings", handler=cratedb_event_handler, @@ -216,7 +216,7 @@ def subscribe_average_exam_scores_change(): ) -threading.Thread(target=subscribe_average_exam_scores_change).start() +threading.Thread(target=subscribe_average_sensor_readings_change).start() ``` ```bash From 7ec3584be6cae8482551ce6cd8e7128885b3bc0d Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Wed, 4 Jun 2025 18:19:27 +0100 Subject: [PATCH 4/9] prettier --- docs/integrate/etl/iceberg-risingwave.md | 40 +++++++++++++++--------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/docs/integrate/etl/iceberg-risingwave.md b/docs/integrate/etl/iceberg-risingwave.md index 74410b5b..f5436206 100644 --- a/docs/integrate/etl/iceberg-risingwave.md +++ b/docs/integrate/etl/iceberg-risingwave.md @@ -1,9 +1,12 @@ (iceberg-risingwave)= + # Stream processing from Iceberg tables to CrateDB using RisingWave -[RisingWave] is a stream processing platform that allows configuring data sources, views on that data, and destinations where results are materialized. +[RisingWave] is a stream processing platform that allows configuring data +sources, views on that data, and destinations where results are materialized. -This guide aims to show you an example with data coming from an Iceberg table and aggregations materialized in real-time in CrateDB. +This guide aims to show you an example with data coming from an Iceberg table +and aggregations materialized in real-time in CrateDB. ## Environment setup @@ -18,7 +21,9 @@ podman run -d --name minio -p 9000:9000 -p 9001:9001 \ quay.io/minio/minio server /data --console-address ":9001" ``` -And let's create a bucket called `warehouse`, for this point a browser to http://localhost:9001 , login with `minioadmin` / `minioadmin` , and click on "Create bucket", enter "warehouse", and click again on "Create bucket". +And let's create a bucket called `warehouse`, for this point a browser to +http://localhost:9001 , login with `minioadmin` / `minioadmin` , and click on +"Create bucket", enter "warehouse", and click again on "Create bucket". Then we will spin up an instance of RisingWave: @@ -34,16 +39,20 @@ podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE We will need 3 consoles for this demonstration. -On the first console we will use [PyIceberg] and [IPython] to create an Iceberg table, and later we will add data and see how aggregations materialize in CrateDB in real-time. +On the first console we will use [PyIceberg] and [IPython] to create an Iceberg +table, and later we will add data and see how aggregations materialize in +CrateDB in real-time. -On the second console we will do the RisingWave and CrateDB setups, and we will leave a Python script running for the streaming of changes. +On the second console we will do the RisingWave and CrateDB setups, and we will +leave a Python script running for the streaming of changes. And on the 3rd console we will review how data appears in CrateDB. ## Creating an Iceberg table -Let's start on the first console. -We use a Python script to create an Iceberg table on the bucket we created earlier on Minio, and as we want to keep things simple, we will use an ephemeral in-memory catalog. +Let's start on the first console. We use a Python script to create an Iceberg +table on the bucket we created earlier on Minio, and as we want to keep things +simple, we will use an ephemeral in-memory catalog. ```bash pip install pyiceberg pyarrow s3fs @@ -107,13 +116,15 @@ create_risingwave_compatible_metadata(table, "1") Let's now switch to the second console. -To interact with both CrateDB and RisingWave we will use the `psql` command line utility, let's install it: +To interact with both CrateDB and RisingWave we will use the `psql` command line +utility, let's install it: ```bash sudo apt-get install -y postgresql-client ``` -Now let's connect first to CrateDB to create a table where we will keep the average reading for each sensor: +Now let's connect first to CrateDB to create a table where we will keep the +average reading for each sensor: ```bash psql -h localhost -U crate @@ -137,7 +148,7 @@ psql -h localhost -p 4566 -d dev -U root ```sql CREATE SOURCE sensors_readings WITH ( - connector = 'iceberg', + connector = 'iceberg', database.name='db.db', warehouse.path='s3://warehouse/', table.name='sensors_readings', @@ -147,7 +158,7 @@ WITH ( s3.region = 'minio' ); ``` - + And to materialize the averages: ```sql @@ -246,8 +257,7 @@ table.append(data) create_risingwave_compatible_metadata(table, "2") ``` -Now let's go to the third console. -Let connect to CrateDB: +Now let's go to the third console. Let connect to CrateDB: ```bash psql -h localhost -U crate @@ -280,8 +290,8 @@ table.append(data) create_risingwave_compatible_metadata(table, "3") ``` -If now we check again average_sensor_readings from the 3rd console we will see the average has already changed to 4.5 - +If now we check again average_sensor_readings from the 3rd console we will see +the average has already changed to 4.5 [RisingWave]: https://github.com/risingwavelabs/risingwave [Podman]: https://github.com/containers/podman From 1f40ac1aec651acbd2e0312909fa89d39c943eba Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Thu, 5 Jun 2025 09:06:02 +0100 Subject: [PATCH 5/9] Attempt at fixing toctree --- docs/integrate/etl/index.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/integrate/etl/index.md b/docs/integrate/etl/index.md index 5b3ce20f..93d070b0 100644 --- a/docs/integrate/etl/index.md +++ b/docs/integrate/etl/index.md @@ -58,8 +58,15 @@ Tutorials and resources about configuring the managed variants, Astro and CrateD ## Apache Iceberg / RisingWave - +:::{div} - {ref}`iceberg-risingwave` +::: + +```{toctree} +:hidden: + +iceberg-risingwave +``` ## Apache Kafka From 33a84622232a4dc86b54eb320e8fc7324bf5b809 Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Thu, 5 Jun 2025 13:41:55 +0100 Subject: [PATCH 6/9] Quote link to Minio console in localhost to avoid link checker failure Co-authored-by: Andreas Motl --- docs/integrate/etl/iceberg-risingwave.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrate/etl/iceberg-risingwave.md b/docs/integrate/etl/iceberg-risingwave.md index f5436206..138da9d1 100644 --- a/docs/integrate/etl/iceberg-risingwave.md +++ b/docs/integrate/etl/iceberg-risingwave.md @@ -22,7 +22,7 @@ podman run -d --name minio -p 9000:9000 -p 9001:9001 \ ``` And let's create a bucket called `warehouse`, for this point a browser to -http://localhost:9001 , login with `minioadmin` / `minioadmin` , and click on +`http://localhost:9001` , login with `minioadmin` / `minioadmin` , and click on "Create bucket", enter "warehouse", and click again on "Create bucket". Then we will spin up an instance of RisingWave: From 24d84972e648ae73887654569227438995cda2a4 Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Fri, 6 Jun 2025 15:54:15 +0100 Subject: [PATCH 7/9] Replace tabs with spaces in SQL code blocks --- docs/integrate/etl/iceberg-risingwave.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/integrate/etl/iceberg-risingwave.md b/docs/integrate/etl/iceberg-risingwave.md index 138da9d1..b93d6107 100644 --- a/docs/integrate/etl/iceberg-risingwave.md +++ b/docs/integrate/etl/iceberg-risingwave.md @@ -132,8 +132,8 @@ psql -h localhost -U crate ```sql CREATE TABLE public.average_sensor_readings ( - sensor_id BIGINT PRIMARY KEY, - average_reading DOUBLE + sensor_id BIGINT PRIMARY KEY, + average_reading DOUBLE ); ``` @@ -150,12 +150,12 @@ CREATE SOURCE sensors_readings WITH ( connector = 'iceberg', database.name='db.db', - warehouse.path='s3://warehouse/', + warehouse.path='s3://warehouse/', table.name='sensors_readings', - s3.endpoint = 'http://host.containers.internal:9000', + s3.endpoint = 'http://host.containers.internal:9000', s3.access.key = 'minioadmin', s3.secret.key = 'minioadmin', - s3.region = 'minio' + s3.region = 'minio' ); ``` From 1d2d8b3667842346032a9da50609eea81c08320c Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Fri, 6 Jun 2025 16:00:03 +0100 Subject: [PATCH 8/9] Fix link to new tutorial --- docs/integrate/risingwave/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrate/risingwave/index.md b/docs/integrate/risingwave/index.md index cba8c717..1f4ce9a0 100644 --- a/docs/integrate/risingwave/index.md +++ b/docs/integrate/risingwave/index.md @@ -80,7 +80,7 @@ referenced below. ::: - An example with data coming from an Apache Iceberg table and aggregations materialized in real-time in CrateDB, using RisingWave. - See [Stream processing from Iceberg tables to CrateDB using RisingWave]. + See {ref}`iceberg-risingwave`. :::{note} We are tracking interoperability issues per [Tool: RisingWave] and appreciate From 38e8d10df5495d966ff02de59a380babfda47c54 Mon Sep 17 00:00:00 2001 From: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> Date: Fri, 6 Jun 2025 16:15:46 +0100 Subject: [PATCH 9/9] Apply some suggestions from CodeRabbit --- docs/integrate/etl/iceberg-risingwave.md | 31 ++++++++++++------------ 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/docs/integrate/etl/iceberg-risingwave.md b/docs/integrate/etl/iceberg-risingwave.md index b93d6107..1ac4dafb 100644 --- a/docs/integrate/etl/iceberg-risingwave.md +++ b/docs/integrate/etl/iceberg-risingwave.md @@ -21,9 +21,9 @@ podman run -d --name minio -p 9000:9000 -p 9001:9001 \ quay.io/minio/minio server /data --console-address ":9001" ``` -And let's create a bucket called `warehouse`, for this point a browser to -`http://localhost:9001` , login with `minioadmin` / `minioadmin` , and click on -"Create bucket", enter "warehouse", and click again on "Create bucket". +Now let's create a bucket called `warehouse`, for this point a browser to +`http://localhost:9001`, log in with `minioadmin`/`minioadmin`, click +"Create bucket", name it `warehouse`, and click again on "Create bucket". Then we will spin up an instance of RisingWave: @@ -31,26 +31,26 @@ Then we will spin up an instance of RisingWave: podman run -d --name risingwave -it -p 4566:4566 -p 5691:5691 docker.io/risingwavelabs/risingwave:v2.4.0 single_node ``` -And finally an instance of CrateDB: +And finally, an instance of CrateDB: ```bash podman run -d --name cratedb --publish=4200:4200 --publish=5432:5432 --env CRATE_HEAP_SIZE=1g docker.io/crate/crate:5.10.7 -Cdiscovery.type=single-node ``` -We will need 3 consoles for this demonstration. +We will need three terminals for this demonstration. -On the first console we will use [PyIceberg] and [IPython] to create an Iceberg +On the first terminal, we will use [PyIceberg] and [IPython] to create an Iceberg table, and later we will add data and see how aggregations materialize in CrateDB in real-time. -On the second console we will do the RisingWave and CrateDB setups, and we will +On the second terminal, we will do the RisingWave and CrateDB setups, and we will leave a Python script running for the streaming of changes. -And on the 3rd console we will review how data appears in CrateDB. +And on the third terminal, we will review how data appears in CrateDB. ## Creating an Iceberg table -Let's start on the first console. We use a Python script to create an Iceberg +Let's start on the first terminal. We use a Python script to create an Iceberg table on the bucket we created earlier on Minio, and as we want to keep things simple, we will use an ephemeral in-memory catalog. @@ -60,7 +60,6 @@ ipython3 ``` ```python -import re from datetime import datetime import pyarrow as pa @@ -114,7 +113,7 @@ create_risingwave_compatible_metadata(table, "1") ## RisingWave and CrateDB setups -Let's now switch to the second console. +Let's now switch to the second terminal. To interact with both CrateDB and RisingWave we will use the `psql` command line utility, let's install it: @@ -238,7 +237,7 @@ python cratedb_event_handler.py ## Adding some data and seeing results materialize in real-time -Let's go back to the first console and run: +Let's go back to the first terminal and run: ```python data = pa.Table.from_pydict( @@ -257,7 +256,7 @@ table.append(data) create_risingwave_compatible_metadata(table, "2") ``` -Now let's go to the third console. Let connect to CrateDB: +Now let's go to the third terminal. Let connect to CrateDB: ```bash psql -h localhost -U crate @@ -271,7 +270,7 @@ SELECT * FROM public.average_sensor_readings; The average for sensor 1 is 2.3 -Let's go back to the first console and run: +Let's go back to the first terminal and run: ```python data = pa.Table.from_pydict( @@ -290,8 +289,8 @@ table.append(data) create_risingwave_compatible_metadata(table, "3") ``` -If now we check again average_sensor_readings from the 3rd console we will see -the average has already changed to 4.5 +If we now check `average_sensor_readings` from the third terminal, we will see +that the average has already changed to 4.5. [RisingWave]: https://github.com/risingwavelabs/risingwave [Podman]: https://github.com/containers/podman