3 changes: 2 additions & 1 deletion .gitignore
@@ -1,4 +1,5 @@
**/.DS_Store
.env
.idea
.venv/
__pycache__/
__pycache__/
44 changes: 44 additions & 0 deletions README-CRATEDB.md
@@ -0,0 +1,44 @@
# CrateDB Model Context Protocol Server

## About

This CrateDB MCP Server is based on the PostgreSQL Model Context Protocol (PG-MCP) Server.
Because [pg-mcp] uses [asyncpg], which speaks the PostgreSQL wire protocol, it can also be used with CrateDB.

`server/resources/schema.py` received a few adjustments to compensate for metadata features that CrateDB does not provide; nothing serious.
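
For orientation, the snippet below shows how [asyncpg] can talk to CrateDB over the PostgreSQL wire protocol, and what a metadata query looks like without PostgreSQL-only helpers such as `obj_description()`. It is a minimal sketch, not part of this repository; the connection string mirrors the one used in the usage section below.

```python
import asyncio

import asyncpg


async def main():
    # CrateDB serves the PostgreSQL wire protocol on port 5432,
    # so a plain PostgreSQL DSN works with asyncpg.
    conn = await asyncpg.connect("postgresql://crate@localhost:5432/doc")
    try:
        # PostgreSQL helpers such as obj_description() are not available in
        # CrateDB, but plain information_schema queries work, similar in
        # spirit to the adjusted queries in server/resources/schema.py.
        rows = await conn.fetch(
            """
            SELECT schema_name
            FROM information_schema.schemata
            WHERE schema_name NOT IN ('information_schema', 'pg_catalog', 'sys')
            ORDER BY schema_name
            """
        )
        print([row["schema_name"] for row in rows])
    finally:
        await conn.close()


asyncio.run(main())
```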

## Usage

Start CrateDB.
```shell
docker run --rm \
--name=cratedb --publish=4200:4200 --publish=5432:5432 \
--env=CRATE_HEAP_SIZE=2g crate/crate:nightly \
-Cdiscovery.type=single-node
```

Initialize Python environment.
```shell
git clone https://github.com/crate-workbench/pg-mcp --branch=cratedb
cd pg-mcp
uv venv --python 3.13 --seed .venv
uv sync --frozen
```

Run MCP server and test program.
```shell
uv run -m server.app
uv run test.py "postgresql://crate@localhost/doc"
```

Run example Claude session (untested).
```shell
export DATABASE_URL=postgresql://crate@localhost
export ANTHROPIC_API_KEY=...
uv run -m client.claude_cli "Give me 5 Austrian mountains (querying specific tables, like sys.summits)"
```
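
To sanity-check the data behind that prompt without involving Claude, the same question can be answered directly against CrateDB. A sketch assuming the built-in `sys.summits` table; the column names and the `'AT'` country code are assumptions and may need adjusting.

```python
import asyncio

import asyncpg


async def main():
    conn = await asyncpg.connect("postgresql://crate@localhost/doc")
    try:
        # sys.summits ships with CrateDB; the mountain/height/country columns
        # and the 'AT' country code are assumptions, adjust them if they differ.
        rows = await conn.fetch(
            "SELECT mountain, height FROM sys.summits "
            "WHERE country = $1 ORDER BY height DESC LIMIT 5",
            "AT",
        )
        for row in rows:
            print(f"{row['mountain']}: {row['height']} m")
    finally:
        await conn.close()


asyncio.run(main())
```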


[asyncpg]: https://pypi.org/project/asyncpg
[pg-mcp]: https://github.com/stuzero/pg-mcp
112 changes: 22 additions & 90 deletions server/resources/schema.py
@@ -18,8 +18,7 @@ async def db_info(conn_id: str):
# Get all non-system schemas
schemas_query = """
SELECT
schema_name,
obj_description(pg_namespace.oid) as description
schema_name
FROM information_schema.schemata
JOIN pg_namespace ON pg_namespace.nspname = schema_name
WHERE
@@ -45,9 +44,7 @@ async def db_info(conn_id: str):
# Get all tables in the schema
tables_query = """
SELECT
t.table_name,
obj_description(format('"%s"."%s"', t.table_schema, t.table_name)::regclass::oid) as description,
pg_stat_get_tuples_inserted(format('"%s"."%s"', t.table_schema, t.table_name)::regclass::oid) as row_count
t.table_name
FROM information_schema.tables t
WHERE
t.table_schema = $1
@@ -76,16 +73,15 @@ async def db_info(conn_id: str):
c.column_name,
c.data_type,
c.is_nullable,
c.column_default,
col_description(format('"%s"."%s"', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) as description
c.column_default
FROM information_schema.columns c
WHERE
c.table_schema = $1 AND
c.table_name = $2
ORDER BY c.ordinal_position
"""
columns = await execute_query(columns_query, conn_id, [schema_name, table_name])

# Get constraints for this table to identify primary keys, etc.
constraints_query = """
SELECT
@@ -97,18 +93,15 @@ async def db_info(conn_id: str):
WHEN c.contype = 'f' THEN 'FOREIGN KEY'
WHEN c.contype = 'c' THEN 'CHECK'
ELSE 'OTHER'
END as constraint_type_desc,
ARRAY_AGG(col.attname ORDER BY u.attposition) as column_names
END as constraint_type_desc
FROM
pg_constraint c
JOIN
pg_namespace n ON n.oid = c.connamespace
JOIN
pg_class t ON t.oid = c.conrelid
LEFT JOIN
LATERAL unnest(c.conkey) WITH ORDINALITY AS u(attnum, attposition) ON TRUE
LEFT JOIN
pg_attribute col ON col.attrelid = t.oid AND col.attnum = u.attnum
pg_attribute col ON col.attrelid = t.oid
WHERE
n.nspname = $1
AND t.relname = $2
@@ -118,7 +111,7 @@ async def db_info(conn_id: str):
c.contype, c.conname
"""
constraints = await execute_query(constraints_query, conn_id, [schema_name, table_name])

# Process columns and add constraint information
for column in columns:
column_name = column['column_name']
@@ -128,56 +121,22 @@ async def db_info(conn_id: str):
for constraint in constraints:
if column_name in constraint.get('column_names', []):
column_constraints.append(constraint['constraint_type_desc'])

# Add column info
column_info = {
"name": column_name,
"type": column['data_type'],
"nullable": column['is_nullable'] == 'YES',
"default": column['column_default'],
"description": column['description'],
"constraints": column_constraints
}

table_info["columns"].append(column_info)

# Process foreign key constraints
foreign_keys_query = """
SELECT
c.conname as constraint_name,
ARRAY_AGG(col.attname ORDER BY u.attposition) as column_names,
nr.nspname as referenced_schema,
ref_table.relname as referenced_table,
ARRAY_AGG(ref_col.attname ORDER BY u2.attposition) as referenced_columns
FROM
pg_constraint c
JOIN
pg_namespace n ON n.oid = c.connamespace
JOIN
pg_class t ON t.oid = c.conrelid
JOIN
pg_class ref_table ON ref_table.oid = c.confrelid
JOIN
pg_namespace nr ON nr.oid = ref_table.relnamespace
LEFT JOIN
LATERAL unnest(c.conkey) WITH ORDINALITY AS u(attnum, attposition) ON TRUE
LEFT JOIN
pg_attribute col ON col.attrelid = t.oid AND col.attnum = u.attnum
LEFT JOIN
LATERAL unnest(c.confkey) WITH ORDINALITY AS u2(attnum, attposition) ON TRUE
LEFT JOIN
pg_attribute ref_col ON ref_col.attrelid = c.confrelid AND ref_col.attnum = u2.attnum
WHERE
n.nspname = $1
AND t.relname = $2
AND c.contype = 'f'
GROUP BY
c.conname, nr.nspname, ref_table.relname
ORDER BY
c.conname
"""
foreign_keys = await execute_query(foreign_keys_query, conn_id, [schema_name, table_name])

# CrateDB does not provide foreign key constraints.
foreign_keys = []

for fk in foreign_keys:
fk_info = {
"name": fk['constraint_name'],
@@ -201,8 +160,7 @@ async def list_schemas(conn_id: str):
"""List all non-system schemas in the database."""
query = """
SELECT
schema_name,
obj_description(pg_namespace.oid) as description
schema_name
FROM information_schema.schemata
JOIN pg_namespace ON pg_namespace.nspname = schema_name
WHERE
@@ -217,9 +175,7 @@ async def list_schema_tables(conn_id: str, schema: str):
"""List all tables in a specific schema with their descriptions."""
query = """
SELECT
t.table_name,
obj_description(format('"%s"."%s"', t.table_schema, t.table_name)::regclass::oid) as description,
pg_stat_get_tuples_inserted(format('"%s"."%s"', t.table_schema, t.table_name)::regclass::oid) as total_rows
t.table_name
FROM information_schema.tables t
WHERE
t.table_schema = $1
@@ -236,8 +192,7 @@ async def get_table_columns(conn_id: str, schema: str, table: str):
c.column_name,
c.data_type,
c.is_nullable,
c.column_default,
col_description(format('"%s"."%s"', c.table_schema, c.table_name)::regclass::oid, c.ordinal_position) as description
c.column_default
FROM information_schema.columns c
WHERE
c.table_schema = $1 AND
@@ -252,10 +207,7 @@ async def get_table_indexes(conn_id: str, schema: str, table: str):
query = """
SELECT
i.relname as index_name,
pg_get_indexdef(i.oid) as index_definition,
obj_description(i.oid) as description,
am.amname as index_type,
ARRAY_AGG(a.attname ORDER BY k.i) as column_names,
ix.indisunique as is_unique,
ix.indisprimary as is_primary,
ix.indisexclusion as is_exclusion
@@ -270,9 +222,7 @@ async def get_table_indexes(conn_id: str, schema: str, table: str):
JOIN
pg_am am ON i.relam = am.oid
LEFT JOIN
LATERAL unnest(ix.indkey) WITH ORDINALITY AS k(attnum, i) ON TRUE
LEFT JOIN
pg_attribute a ON a.attrelid = t.oid AND a.attnum = k.attnum
pg_attribute a ON a.attrelid = t.oid
WHERE
n.nspname = $1
AND t.relname = $2
@@ -299,14 +249,11 @@ async def get_table_constraints(conn_id: str, schema: str, table: str):
WHEN c.contype = 'x' THEN 'EXCLUSION'
ELSE 'OTHER'
END as constraint_type_desc,
obj_description(c.oid) as description,
pg_get_constraintdef(c.oid) as definition,
CASE
WHEN c.contype = 'f' THEN
(SELECT nspname FROM pg_namespace WHERE oid = ref_table.relnamespace) || '.' || ref_table.relname
ELSE NULL
END as referenced_table,
ARRAY_AGG(col.attname ORDER BY u.attposition) as column_names
END as referenced_table
FROM
pg_constraint c
JOIN
@@ -316,9 +263,7 @@ async def get_table_constraints(conn_id: str, schema: str, table: str):
LEFT JOIN
pg_class ref_table ON ref_table.oid = c.confrelid
LEFT JOIN
LATERAL unnest(c.conkey) WITH ORDINALITY AS u(attnum, attposition) ON TRUE
LEFT JOIN
pg_attribute col ON col.attrelid = t.oid AND col.attnum = u.attnum
pg_attribute col ON col.attrelid = t.oid
WHERE
n.nspname = $1
AND t.relname = $2
@@ -335,8 +280,6 @@ async def get_index_details(conn_id: str, schema: str, table: str, index: str):
query = """
SELECT
i.relname as index_name,
pg_get_indexdef(i.oid) as index_definition,
obj_description(i.oid) as description,
am.amname as index_type,
ix.indisunique as is_unique,
ix.indisprimary as is_primary,
@@ -345,9 +288,7 @@ async def get_index_details(conn_id: str, schema: str, table: str, index: str):
ix.indisclustered as is_clustered,
ix.indisvalid as is_valid,
i.relpages as pages,
i.reltuples as rows,
ARRAY_AGG(a.attname ORDER BY k.i) as column_names,
ARRAY_AGG(pg_get_indexdef(i.oid, k.i, false) ORDER BY k.i) as column_expressions
i.reltuples as rows
FROM
pg_index ix
JOIN
@@ -359,9 +300,7 @@ async def get_index_details(conn_id: str, schema: str, table: str, index: str):
JOIN
pg_am am ON i.relam = am.oid
LEFT JOIN
LATERAL unnest(ix.indkey) WITH ORDINALITY AS k(attnum, i) ON TRUE
LEFT JOIN
pg_attribute a ON a.attrelid = t.oid AND a.attnum = k.attnum
pg_attribute a ON a.attrelid = t.oid
WHERE
n.nspname = $1
AND t.relname = $2
@@ -389,17 +328,14 @@ async def get_constraint_details(conn_id: str, schema: str, table: str, constrai
WHEN c.contype = 'x' THEN 'EXCLUSION'
ELSE 'OTHER'
END as constraint_type_desc,
obj_description(c.oid) as description,
pg_get_constraintdef(c.oid) as definition,
CASE
WHEN c.contype = 'f' THEN
(SELECT nspname FROM pg_namespace WHERE oid = ref_table.relnamespace) || '.' || ref_table.relname
ELSE NULL
END as referenced_table,
ARRAY_AGG(col.attname ORDER BY u.attposition) as column_names,
CASE
WHEN c.contype = 'f' THEN
ARRAY_AGG(ref_col.attname ORDER BY u2.attposition)
ARRAY_AGG(ref_col.attname)
ELSE NULL
END as referenced_columns
FROM
@@ -411,13 +347,9 @@ async def get_constraint_details(conn_id: str, schema: str, table: str, constrai
LEFT JOIN
pg_class ref_table ON ref_table.oid = c.confrelid
LEFT JOIN
LATERAL unnest(c.conkey) WITH ORDINALITY AS u(attnum, attposition) ON TRUE
LEFT JOIN
pg_attribute col ON col.attrelid = t.oid AND col.attnum = u.attnum
LEFT JOIN
LATERAL unnest(c.confkey) WITH ORDINALITY AS u2(attnum, attposition) ON c.contype = 'f'
pg_attribute col ON col.attrelid = t.oid
LEFT JOIN
pg_attribute ref_col ON c.contype = 'f' AND ref_col.attrelid = c.confrelid AND ref_col.attnum = u2.attnum
pg_attribute ref_col ON c.contype = 'f' AND ref_col.attrelid = c.confrelid
WHERE
n.nspname = $1
AND t.relname = $2
2 changes: 1 addition & 1 deletion server/tools/query.py
@@ -78,7 +78,7 @@ async def pg_explain(query: str, conn_id: str, params=None):
Complete JSON-formatted execution plan
"""
# Prepend EXPLAIN to the query
explain_query = f"EXPLAIN (FORMAT JSON) {query}"
explain_query = f"EXPLAIN {query}"

# Execute the explain query
result = await execute_query(explain_query, conn_id, params)
4 changes: 2 additions & 2 deletions test.py
@@ -233,7 +233,7 @@ async def run(connection_string: str | None):
print(f"Error fetching extensions: {e}")

# Find a schema with tables to test table resources
for schema_idx, schema in enumerate(schemas_data[:3]):
for schema_idx, schema in enumerate(schemas_data[:10]):
schema_name = schema.get('schema_name')

print(f"\nTesting tables for schema '{schema_name}'...")
@@ -292,7 +292,7 @@ async def run(connection_string: str | None):
break

# Test disconnect tool if available
break # Exit schema loop once we've found a table
# break # Exit schema loop once we've found a table
Reviewer: Can the `break` be removed?

Author: It can be removed, sure. I didn't want to change too much compared to the original, which is why it is just commented out.

Author: Please also note that this PR is just for visibility purposes. It will probably never be merged, at least not in its current form.

except json.JSONDecodeError:
print(f"Error parsing schemas: {content_item.text[:100]}")
