Steven's Knowledge

Getting Started

Run Materialize locally, create a SQL-defined materialized view over a Kafka topic that stays fresh

This page boots Materialize with a Kafka source and shows the "streaming database" model end-to-end. By the end you'll have a SQL view that updates as events arrive in Kafka — no application code.

Kafka Streams and Flink SQL equivalents appear at the end of this page for comparison. The Patterns page covers windowing, watermarks, and joins in more depth.

Bring Up Kafka + Materialize

# docker-compose.yml
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports: ["9092:9092"]
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
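      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092   # resolvable only inside the Compose network
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"
      # Single broker, so internal topics need replication factor 1. The transaction
      # log matters here: Materialize's Kafka sink writes with transactions.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1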

  materialize:
    image: materialize/materialized:latest
    depends_on: [kafka]
    ports:
      - "6875:6875"   # SQL (Postgres wire protocol)
      - "6878:6878"   # HTTP admin

Start the stack, then connect with psql:

docker compose up -d

# Materialize speaks the Postgres protocol, so any Postgres client works
psql -h localhost -p 6875 -U materialize materialize

Create a Kafka Source

In Materialize:

-- Define how to connect to Kafka
CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'kafka:9092');

-- Create a source that pulls from a Kafka topic
CREATE SOURCE clicks
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'clicks')
  FORMAT JSON
  ENVELOPE NONE;

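Now clicks is a relation in Materialize, continuously fed by the Kafka topic. With FORMAT JSON, each message is decoded into a single jsonb column named data, so queries pull fields out with data->>'field'.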

Materialized Views Over the Stream

-- Clicks per user, last 5 minutes, kept fresh.
-- Temporal filter: a click counts until mz_now() passes its ts + 5 minutes.
CREATE MATERIALIZED VIEW clicks_per_user AS
SELECT
  data->>'user_id' AS user_id,
  COUNT(*) AS clicks
FROM clicks
WHERE mz_now() <= (data->>'ts')::timestamptz + INTERVAL '5 minutes'
GROUP BY data->>'user_id';

-- Top-10 most active users right now
CREATE MATERIALIZED VIEW top_users AS
SELECT user_id, clicks
FROM clicks_per_user
ORDER BY clicks DESC
LIMIT 10;

The views maintain themselves. As Kafka receives events, Materialize incrementally updates the views.
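Here is a toy in-memory model of clicks_per_user that shows what "incrementally" means. It assumes the temporal filter reads as "a click counts while mz_now() <= ts + 5 minutes". It is not how Materialize is built (its engine is based on differential dataflow). It only mimics the observable behavior: each click is applied once as +1 on arrival and once as -1 when it ages out, so no step rescans the topic. The class name SlidingClickCount is hypothetical.

```python
import heapq
from collections import Counter
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)


class SlidingClickCount:
    """Toy model of clicks_per_user: COUNT(*) per user_id under the temporal
    filter mz_now() <= ts + WINDOW. Each click is applied once as +1 when it
    arrives and once as -1 when it expires; nothing is recomputed."""

    def __init__(self):
        self.counts = Counter()
        self.expiry = []  # min-heap of (expires_at, seq, user_id)
        self.seq = 0      # unique tie-breaker, so the heap never compares user_ids

    def insert(self, user_id, ts):
        self.counts[user_id] += 1
        heapq.heappush(self.expiry, (ts + WINDOW, self.seq, user_id))
        self.seq += 1

    def advance(self, now):
        # Retract every click for which now <= ts + WINDOW no longer holds.
        while self.expiry and self.expiry[0][0] < now:
            _, _, user_id = heapq.heappop(self.expiry)
            self.counts[user_id] -= 1
            if self.counts[user_id] == 0:
                del self.counts[user_id]

    def snapshot(self):
        return dict(self.counts)


t0 = datetime(2025, 5, 21, 14, 0, tzinfo=timezone.utc)
view = SlidingClickCount()
for user, secs in [("alice", 0), ("bob", 5), ("alice", 10), ("alice", 15)]:
    view.insert(user, t0 + timedelta(seconds=secs))
view.advance(t0 + timedelta(seconds=20))
print(view.snapshot())  # {'alice': 3, 'bob': 1}
view.advance(t0 + timedelta(minutes=5, seconds=7))
print(view.snapshot())  # {'alice': 2}: the 14:00:00 and 14:00:05 clicks expired
```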

Produce Some Events

In another terminal, produce sample events. The ts values use the current UTC time so they fall inside the views' 5-minute window; hard-coded past timestamps would be filtered out. The -T flag disables TTY allocation so the heredoc can be piped into the container:

NOW=$(date -u +%Y-%m-%dT%H:%M:%SZ)
docker compose exec -T kafka kafka-console-producer \
  --bootstrap-server kafka:9092 --topic clicks <<EOF
{"user_id": "alice", "ts": "$NOW", "url": "/page1"}
{"user_id": "bob",   "ts": "$NOW", "url": "/page2"}
{"user_id": "alice", "ts": "$NOW", "url": "/page3"}
{"user_id": "alice", "ts": "$NOW", "url": "/page1"}
EOF
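To produce more than a handful of events, a small script can generate the JSON lines with a current ts and pipe them into the same console producer. This is a sketch. The file name gen_clicks.py, the user list, and the URL pattern are made up for illustration.

```python
import json
from datetime import datetime, timezone


def click_lines(n, users=("alice", "bob", "carol"), now=None):
    """Build n JSON click events, cycling through users and three URLs.
    Every event is stamped with the current UTC time (or `now`), so it lands
    inside the views' 5-minute temporal filter."""
    if now is None:
        now = datetime.now(timezone.utc)
    ts = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    return [
        json.dumps({"user_id": users[i % len(users)], "ts": ts, "url": f"/page{i % 3 + 1}"})
        for i in range(n)
    ]


if __name__ == "__main__":
    # One event per line, the format kafka-console-producer reads from stdin.
    # Usage (hypothetical file name):
    #   python3 gen_clicks.py | docker compose exec -T kafka kafka-console-producer \
    #     --bootstrap-server kafka:9092 --topic clicks
    for line in click_lines(100):
        print(line)
```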

Query the View

Back in the Materialize SQL session:

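SELECT * FROM top_users ORDER BY clicks DESC;

-- Expected output (the view's own ORDER BY only selects the top 10; it does not order query results):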
-- user_id | clicks
-- --------+--------
-- alice   |   3
-- bob     |   1

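Produce more events and query again: the view already reflects them, and each click drops out of the counts five minutes after its ts. There are no batch jobs to schedule, no consumer code to write, and no application-side state to manage.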
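The top_users view is a Top-K over clicks_per_user. The plain-Python model below (function name hypothetical) shows what "fresh" means for it: when new clicks change the counts, users enter and leave the result. It uses k=2 so an eviction is visible with only three users. Ties are broken by user_id purely to keep the model deterministic.

```python
def top_k(counts, k=10):
    """Model of SELECT user_id, clicks ... ORDER BY clicks DESC LIMIT k over a
    dict of per-user counts. Ties are broken by user_id only to make this
    model deterministic; the SQL view makes no such promise."""
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return ranked[:k]


before = {"alice": 3, "bob": 1}
after = {"alice": 3, "bob": 1, "carol": 5}  # carol's clicks arrive
print(top_k(before, k=2))  # [('alice', 3), ('bob', 1)]
print(top_k(after, k=2))   # [('carol', 5), ('alice', 3)]: bob is evicted
```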

Subscribe to Changes

The killer feature: SUBSCRIBE streams updates to the view as they happen:

COPY (SUBSCRIBE TO top_users) TO STDOUT;

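Each change to top_users now arrives as rows carrying mz_timestamp and mz_diff columns ahead of the view's own columns. mz_diff is +1 for a row entering the result and -1 for a row leaving it, so an updated count shows up as a retraction of the old row plus an insertion of the new one. Your app can hold this connection open and react in real time with push notifications, dashboard updates, and so on.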
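A client consuming SUBSCRIBE output can keep its own copy of the result by adding each row's mz_diff to a multiset. This is a minimal sketch, assuming rows arrive as plain tuples (mz_timestamp, mz_diff, user_id, clicks) in timestamp order. How the rows are read off the connection (for example, with a Postgres driver) is left out, and the timestamps below are invented.

```python
from collections import Counter
from itertools import groupby


def apply_updates(state, updates):
    """Fold SUBSCRIBE-style rows (mz_timestamp, mz_diff, user_id, clicks) into
    `state`, a multiset (Counter) of current rows. Rows are assumed to arrive
    in mz_timestamp order; all rows sharing a timestamp are applied together,
    and a snapshot is yielded only after the whole batch, when it is consistent."""
    for ts, batch in groupby(updates, key=lambda row: row[0]):
        for _, diff, user_id, clicks in batch:
            row = (user_id, clicks)
            state[row] += diff
            if state[row] == 0:
                del state[row]
        yield ts, sorted(state)


# Hypothetical update stream: alice's count goes from 3 to 4 at timestamp 200.
updates = [
    (100, 1, "alice", 3),
    (100, 1, "bob", 1),
    (200, -1, "alice", 3),
    (200, 1, "alice", 4),
]
for ts, rows in apply_updates(Counter(), updates):
    print(ts, rows)
# 100 [('alice', 3), ('bob', 1)]
# 200 [('alice', 4), ('bob', 1)]
```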

Join Streams

Multiple sources, joined live:

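-- CDC topic: messages must carry a key. UPSERT keeps the latest value per key,
-- and a null value (tombstone) deletes that key.
CREATE SOURCE users
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'users-cdc')
  FORMAT JSON ENVELOPE UPSERT;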

-- clicks has only the jsonb column data, so user_id is extracted from it
CREATE MATERIALIZED VIEW clicks_with_user AS
SELECT
  c.data->>'user_id' AS user_id,
  u.data->>'name' AS name,
  u.data->>'tier' AS tier,
  COUNT(*) AS clicks
FROM clicks c
JOIN users u ON (c.data->>'user_id') = (u.data->>'id')
WHERE mz_now() <= (c.data->>'ts')::timestamptz + INTERVAL '1 hour'
GROUP BY c.data->>'user_id', u.data->>'name', u.data->>'tier';

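Now clicks_with_user joins click events with the latest user state and stays fresh as either side changes: a new click bumps a count, and an update to a user's name or tier rewrites that user's row. In a conventional database you would have to re-run the join on a schedule, or write triggers or consumer code to keep a precomputed result current.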
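The view relies on two behaviors: upsert state for users and an inner join against it. Both can be modeled in a few lines of Python. This is a sketch. The message keys u1 and u2 and the user rows are hypothetical, and the model omits the view's 1-hour temporal filter.

```python
from collections import Counter


def apply_upsert(users, key, value):
    """ENVELOPE UPSERT semantics: the latest value for a message key wins,
    and a None value (a Kafka tombstone) deletes the key."""
    if value is None:
        users.pop(key, None)
    else:
        users[key] = value


def clicks_with_user(click_user_ids, users):
    """Inner join of per-user click counts with the current user rows on
    user_id = data->>'id'. Clicks for unknown users drop out, as in SQL."""
    by_id = {row["id"]: row for row in users.values()}
    counts = Counter(click_user_ids)
    return sorted(
        (uid, by_id[uid]["name"], by_id[uid]["tier"], n)
        for uid, n in counts.items()
        if uid in by_id
    )


users = {}
apply_upsert(users, "u1", {"id": "alice", "name": "Alice", "tier": "free"})
apply_upsert(users, "u2", {"id": "bob", "name": "Bob", "tier": "pro"})
clicks = ["alice", "bob", "alice"]
print(clicks_with_user(clicks, users))
# [('alice', 'Alice', 'free', 2), ('bob', 'Bob', 'pro', 1)]
apply_upsert(users, "u1", {"id": "alice", "name": "Alice", "tier": "pro"})  # alice upgrades
apply_upsert(users, "u2", None)                                              # bob is deleted
print(clicks_with_user(clicks, users))
# [('alice', 'Alice', 'pro', 2)]
```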

Sink Output Back

For downstream consumers:

CREATE SINK top_users_sink
  FROM top_users
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'top-users-output')
  KEY (user_id)
  FORMAT JSON
  ENVELOPE UPSERT;

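Materialize writes each change to top_users into the top-users-output topic as it happens. An updated row becomes a record keyed by user_id that carries the new value, and a row that leaves the view becomes a tombstone (null value). Downstream services consume the topic like any other.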
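How does a stream of +1/-1 changes become upsert records? Here is a simplified model, assuming the changes for one timestamp are available as (mz_diff, row) pairs keyed by user_id. A key that gains a new row is written with that row as its value, and a key that only loses rows becomes a tombstone. Materialize's sink does this internally; the function here is illustrative, not its API.

```python
def to_upsert_records(updates):
    """Collapse one timestamp's (mz_diff, row) updates into keyed upsert
    records: a key with a +1 row maps to that new row, and a key with only
    -1 rows maps to None, i.e. a tombstone."""
    records = {}
    for diff, row in updates:
        key = row["user_id"]
        if diff > 0:
            records[key] = row          # the new value for the key wins
        elif key not in records:
            records[key] = None         # a deletion, unless a new value appears
    return records


# Hypothetical batch: alice's count changes 3 -> 4, and bob leaves top_users.
batch = [
    (-1, {"user_id": "alice", "clicks": 3}),
    (1, {"user_id": "alice", "clicks": 4}),
    (-1, {"user_id": "bob", "clicks": 1}),
]
print(to_upsert_records(batch))
# {'alice': {'user_id': 'alice', 'clicks': 4}, 'bob': None}
```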

Equivalent: Kafka Streams (JVM)

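// ClickEvent (with getUserId()), clickEventSerde, and props are assumed to be defined elsewhere.
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

// Count clicks per user in 5-minute tumbling windows
StreamsBuilder builder = new StreamsBuilder();
builder.stream("clicks", Consumed.with(Serdes.String(), clickEventSerde))
  .groupBy((k, v) -> v.getUserId(), Grouped.with(Serdes.String(), clickEventSerde))
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
  .count()
  .toStream()
  // Unwrap Windowed<String> keys so the output topic gets plain String keys
  .map((window, count) -> KeyValue.pair(window.key(), count))
  .to("clicks-per-user-5m", Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();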

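This is similar but not identical. TimeWindows produces fixed 5-minute buckets aligned to the epoch (tumbling windows), while the Materialize view counts a sliding "last 5 minutes". You also write JVM code, run a JVM process, and manage state in RocksDB stores embedded in your app.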
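Two clicks two seconds apart show the difference between the window shapes. This is a sketch with hypothetical helper names. tumbling_window mimics epoch-aligned 5-minute buckets, and in_last_five_minutes mimics the Materialize view's sliding filter.

```python
from datetime import datetime, timedelta, timezone

SIZE = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def tumbling_window(ts):
    """[start, end) of the epoch-aligned 5-minute bucket containing ts."""
    start = EPOCH + ((ts - EPOCH) // SIZE) * SIZE
    return start, start + SIZE


def in_last_five_minutes(ts, now):
    """Sliding check used by the Materialize view: now <= ts + 5 minutes."""
    return now <= ts + SIZE


a = datetime(2025, 5, 21, 14, 4, 59, tzinfo=timezone.utc)
b = datetime(2025, 5, 21, 14, 5, 1, tzinfo=timezone.utc)
# Tumbling: the two clicks land in different buckets.
print(tumbling_window(a)[0].time(), tumbling_window(b)[0].time())  # 14:00:00 14:05:00
# Sliding: at 14:06 both clicks still count toward "the last 5 minutes".
now = datetime(2025, 5, 21, 14, 6, tzinfo=timezone.utc)
print(in_last_five_minutes(a, now), in_last_five_minutes(b, now))  # True True
```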

Equivalent: Flink SQL

-- Run in Flink SQL Gateway
CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  ts TIMESTAMP_LTZ(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH ('connector' = 'kafka', 'topic' = 'clicks', ...);

-- Per-user counts per 5-minute tumbling window (no top-10 ranking)
CREATE TABLE clicks_per_user_5m (
  user_id STRING,
  clicks BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka', 'topic' = 'clicks-per-user-5m', ...);

INSERT INTO clicks_per_user_5m
SELECT user_id, COUNT(*) AS clicks
FROM clicks
GROUP BY user_id, TUMBLE(ts, INTERVAL '5' MINUTES);

Flink carries a heavier operational load: a cluster of JobManagers and TaskManagers, a state backend, and checkpoint storage. In exchange, its SQL covers similar ground, and its RocksDB state backend can hold state larger than memory.
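The WATERMARK clause is what lets Flink finalize a tumbling window. The watermark here is the largest ts seen minus 5 seconds. Once it passes a window's end, that window's counts are emitted, and events arriving later for it are dropped by default. Below is a toy single-partition model under those assumptions. Flink's exact firing boundary and its per-partition watermark alignment are simplified away, and run_tumbling is a hypothetical name.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone

SIZE = timedelta(minutes=5)
DELAY = timedelta(seconds=5)  # WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def run_tumbling(events):
    """Replay (user_id, ts) events in arrival order through 5-minute tumbling
    windows driven by a bounded-out-of-orderness watermark (max ts - DELAY).
    Returns (emitted, dropped): per-window counts in firing order, and late
    events whose window had already fired."""
    windows = {}       # window start -> Counter of user_id
    watermark = None
    emitted, dropped = [], []
    for user_id, ts in events:
        start = EPOCH + ((ts - EPOCH) // SIZE) * SIZE
        if watermark is not None and start + SIZE <= watermark:
            dropped.append((user_id, ts))  # too late: its window already fired
            continue
        windows.setdefault(start, Counter())[user_id] += 1
        candidate = ts - DELAY
        watermark = candidate if watermark is None else max(watermark, candidate)
        # Fire every window whose end the watermark has reached.
        for s in sorted(w for w in windows if w + SIZE <= watermark):
            for uid, n in sorted(windows.pop(s).items()):
                emitted.append((s, uid, n))
    return emitted, dropped


t0 = datetime(2025, 5, 21, 14, 0, tzinfo=timezone.utc)
events = [
    ("alice", t0 + timedelta(seconds=10)),
    ("bob", t0 + timedelta(minutes=1)),
    ("alice", t0 + timedelta(minutes=4)),
    ("alice", t0 + timedelta(minutes=5, seconds=3)),
    ("bob", t0 + timedelta(minutes=5, seconds=6)),  # watermark reaches 14:05:01
    ("carol", t0 + timedelta(minutes=3)),           # arrives after 14:00 window fired
]
emitted, dropped = run_tumbling(events)
# The 14:00 window fires with alice=2 and bob=1 once bob's 14:05:06 click
# moves the watermark past 14:05; carol's 14:03:00 click is then dropped.
print(emitted)
print(dropped)
```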

Tear Down

docker compose down -v

What's Next

You now have a SQL view that Materialize keeps up to date from a Kafka stream. Next:

  • Patterns — windowing, watermarks, joins, state, exactly-once, CDC
  • Best Practices — state sizing, observability, recovery, latency budgets
