Data ingestion


This document covers:

  • Different options for ingesting data into MergeTree tables and trade-offs involved
  • How the Kafka table engine works
  • What materialized views are
  • Examples of a full schema setup

Using INSERTs for ingestion

Like any database system, ClickHouse allows using INSERTs to load data.

Each INSERT creates a new part in ClickHouse, which carries significant overhead. In a busy system, this leads to errors once the number of active parts exceeds the parts_to_throw_insert MergeTree table setting (default 300).
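
To see how close a table is to this limit, the number of active parts per partition can be inspected in the system.parts system table. The query below is a minimal sketch; the ordering and the LIMIT are illustrative choices, not part of the original setup.

```sql
-- Count active parts per partition; partitions with the most parts come first.
SELECT
    database,
    table,
    partition,
    count() AS active_parts
FROM system.parts
WHERE active
GROUP BY database, table, partition
ORDER BY active_parts DESC
LIMIT 10;
```

Partitions whose active_parts value approaches the configured limit are the ones at risk of rejecting INSERTs.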

ClickHouse provides a number of options to make INSERT-based ingestion still work. These come with their own trade-offs and consistency problems, and they require the ClickHouse cluster to always be accessible.
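
As one illustration of such an option, ClickHouse supports asynchronous inserts, where the server batches many small INSERTs before writing a part. The sketch below assumes a hypothetical table named insert_demo that is not part of the PostHog schema; it is not necessarily one of the options the original list referred to.

```sql
-- Hypothetical table used only for this illustration.
CREATE TABLE insert_demo
(
    team_id Int64,
    type String
)
ENGINE = MergeTree
ORDER BY team_id;

-- async_insert = 1 asks the server to buffer rows from small INSERTs
-- and write them together instead of creating one part per INSERT.
-- wait_for_async_insert = 1 makes the client wait until the buffer is flushed.
INSERT INTO insert_demo
SETTINGS async_insert = 1, wait_for_async_insert = 1
VALUES (1, 'example_warning');
```

The client still needs a reachable ClickHouse server for every INSERT, which is the availability trade-off mentioned above.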

Why we ingest via Kafka tables

We instead rely on the Kafka table engine to handle ingestion into ClickHouse.

The benefits are:

  • Resiliency: Kafka handles sudden spikes in traffic and ClickHouse cluster unavailability gracefully
  • PostHog already uses Kafka throughout the app, making it a safe technical choice

It also has minimal memory overhead and allows us to temporarily stop ingestion at any time by removing the tables in question.
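
A minimal sketch of pausing and resuming ingestion, assuming the ingestion_warnings_mv materialized view from the example schema later in this document. It uses DETACH rather than DROP, which is an assumption about how "removing" might be done; a Kafka engine table only consumes messages while a materialized view is attached to it.

```sql
-- Pause ingestion: with no materialized view attached,
-- the Kafka engine table stops consuming from the topic.
DETACH TABLE ingestion_warnings_mv;

-- Resume ingestion: consumption continues from the offsets
-- last committed for the consumer group.
ATTACH TABLE ingestion_warnings_mv;
```

While the view is detached, messages accumulate in Kafka rather than being lost, subject to the topic's retention settings.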

How Kafka tables work

Kafka engine tables act as Kafka consumers in a given consumer group. Selecting from that table advances the consumer offsets.

A Kafka table on its own does nothing beyond allowing data to be queried from Kafka. It needs to be paired with other tables for ingestion to work.

Important note: Given Kafka engine tables operate like consumers, querying data from them moves the offsets for the consumer group forward. Doing this while ingesting data may cause data loss, and has been disallowed by default on the latest ClickHouse versions.

Example Kafka engine table:

SQL
CREATE TABLE kafka_ingestion_warnings
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR CODEC(ZSTD(3)),
    timestamp DateTime64(6, 'UTC')
)
ENGINE = Kafka('kafka:9092', 'clickhouse_ingestion_warnings_test', 'group1', 'JSONEachRow');
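
As the important note above says, selecting directly from a Kafka engine table is disallowed by default on recent ClickHouse versions. For debugging, it can be re-enabled per query with the stream_like_engine_allow_direct_select setting. The sketch below is for illustration only: it advances the consumer group offsets, so the rows it returns are not ingested, and ClickHouse rejects the query when a materialized view is attached to the table even with the setting enabled.

```sql
-- Debugging only: reads (and consumes) up to 5 messages from the topic.
SELECT *
FROM kafka_ingestion_warnings
LIMIT 5
SETTINGS stream_like_engine_allow_direct_select = 1;
```

A safer pattern for inspection is a separate Kafka engine table with its own consumer group name, so that debugging reads do not move the offsets used for ingestion.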

It is important to send correctly formatted messages to the topic the Kafka table reads from. ClickHouse assumes that messages in the topic match the table's format and schema. If they do not, the consumer may stall, depending on the value of kafka_skip_broken_messages, which breaks ingestion.
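
A sketch of setting kafka_skip_broken_messages, using the SETTINGS form of the Kafka engine definition. The table name, the consumer group name, and the value 10 are hypothetical and chosen only for illustration.

```sql
-- Same columns as kafka_ingestion_warnings, but tolerant of malformed messages.
CREATE TABLE kafka_ingestion_warnings_tolerant
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR,
    timestamp DateTime64(6, 'UTC')
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'clickhouse_ingestion_warnings_test',
    kafka_group_name = 'group1_tolerant',   -- hypothetical consumer group
    kafka_format = 'JSONEachRow',
    -- Skip up to 10 unparseable messages per block instead of failing.
    kafka_skip_broken_messages = 10;
```

With the default value of 0, a single unparseable message is treated as an error. Skipped messages are dropped silently, so a non-zero value trades data completeness for availability.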

Beyond just skipping broken messages, it's also possible to set up a dead letter queue system for these in ClickHouse. You can read more about doing so in this Altinity blog post.
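
One possible shape for such a dead letter queue, which is not necessarily the approach described in the blog post, uses the kafka_handle_error_mode = 'stream' setting. In this mode, parsing errors are exposed through the _error and _raw_message virtual columns instead of failing the block. All table and consumer group names below are hypothetical.

```sql
-- Kafka table that surfaces parsing errors as virtual columns.
CREATE TABLE kafka_ingestion_warnings_stream
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR,
    timestamp DateTime64(6, 'UTC')
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'clickhouse_ingestion_warnings_test',
    kafka_group_name = 'group1_dlq_demo',
    kafka_format = 'JSONEachRow',
    kafka_handle_error_mode = 'stream';

-- Storage for messages that failed to parse.
CREATE TABLE ingestion_warnings_errors
(
    raw_message String,
    error String,
    kafka_partition UInt64,
    kafka_offset UInt64,
    received_at DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY (kafka_partition, kafka_offset);

-- Route only the broken messages to the errors table.
CREATE MATERIALIZED VIEW ingestion_warnings_errors_mv
TO ingestion_warnings_errors
AS SELECT
    _raw_message AS raw_message,
    _error AS error,
    _partition AS kafka_partition,
    _offset AS kafka_offset
FROM kafka_ingestion_warnings_stream
WHERE length(_error) > 0;
```

In a setup like this, the materialized view that feeds the main table would filter on length(_error) = 0 so that broken rows are not ingested with default values.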

Materialized views

Materialized views in ClickHouse can be thought of as triggers - they react to new blocks being INSERTed into source tables and allow transforming and piping that data to other tables.
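
The trigger-like behavior can be seen in a small, self-contained sketch. The mv_demo_* names are hypothetical and unrelated to the PostHog schema.

```sql
-- Source table that receives INSERTs.
CREATE TABLE mv_demo_source
(
    team_id Int64,
    type String
)
ENGINE = MergeTree
ORDER BY team_id;

-- Target table that the materialized view writes into.
CREATE TABLE mv_demo_counts
(
    team_id Int64,
    warnings UInt64
)
ENGINE = SummingMergeTree
ORDER BY team_id;

-- The SELECT runs over each newly inserted block of mv_demo_source,
-- not over the whole table, and its result is inserted into mv_demo_counts.
CREATE MATERIALIZED VIEW mv_demo_mv
TO mv_demo_counts
AS SELECT
    team_id,
    count() AS warnings
FROM mv_demo_source
GROUP BY team_id;

INSERT INTO mv_demo_source VALUES (1, 'a'), (1, 'b'), (2, 'a');

-- Aggregate at query time, since background merges may not have run yet.
-- Returns (1, 2) and (2, 1).
SELECT team_id, sum(warnings) AS warnings
FROM mv_demo_counts
GROUP BY team_id
ORDER BY team_id;
```

Rows that were already in the source table before the view was created are not processed, which is one of the gotchas mentioned below.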

Materialized views come with a lot of gotchas. A great resource for learning more about them is this presentation.

Example schema - reading and writing ingestion events

Consider the following sharded table schema together with kafka_ingestion_warnings:

SQL
-- Local table on each shard: stores the ingested data
CREATE TABLE sharded_ingestion_warnings
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR CODEC(ZSTD(3)),
    timestamp DateTime64(6, 'UTC'),
    _timestamp DateTime,
    _offset UInt64,
    _partition UInt64
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/posthog.sharded_ingestion_warnings', '{replica}')
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (team_id, toHour(timestamp), type, source, timestamp);

-- Distributed table: fields queries and routes each write to a random shard
CREATE TABLE ingestion_warnings ON CLUSTER 'posthog'
(
    team_id Int64,
    source LowCardinality(VARCHAR),
    type VARCHAR,
    details VARCHAR CODEC(ZSTD(3)),
    timestamp DateTime64(6, 'UTC'),
    _timestamp DateTime,
    _offset UInt64,
    _partition UInt64
)
ENGINE = Distributed('posthog', 'posthog', 'sharded_ingestion_warnings', rand());

-- Materialized view: moves rows from the Kafka table into the distributed table
CREATE MATERIALIZED VIEW ingestion_warnings_mv
TO posthog.ingestion_warnings
AS SELECT
    team_id,
    source,
    type,
    details,
    timestamp,
    _timestamp,
    _offset,
    _partition
FROM posthog.kafka_ingestion_warnings;

In this schema:

  • sharded_ingestion_warnings MergeTree is responsible for storing the ingested data
  • ingestion_warnings table is responsible for fielding queries and distributing writes to sharded_ingestion_warnings tables across shards
  • ingestion_warnings_mv regularly polls kafka_ingestion_warnings and pushes the data to ingestion_warnings distributed table
    • Note: it also forwards _timestamp, _offset, and _partition virtual columns containing Kafka message metadata so they can be stored and used during debugging.
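
As a sketch of how the stored Kafka metadata might be used during debugging, the query below summarizes what has been ingested from each Kafka partition. The choice of aggregates is illustrative.

```sql
-- Per Kafka partition: offset range ingested, row count, and the
-- Kafka timestamp of the most recent message.
SELECT
    _partition,
    min(_offset) AS first_offset,
    max(_offset) AS last_offset,
    count() AS row_count,
    max(_timestamp) AS latest_message_time
FROM ingestion_warnings
GROUP BY _partition
ORDER BY _partition;
```

Comparing last_offset with the latest offset on the Kafka side gives a rough view of ingestion lag for each partition.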

Example schema visualized

This is the same schema visualized in a ClickHouse cluster with 2 shards and 1 replica each:

[Diagram: a ClickHouse cluster with two shards (Shard 1, Replica 1 and Shard 2, Replica 1). Each shard contains a kafka_ingestion_warnings table (Kafka table engine), an ingestion_warnings_mv table (Materialized view), an ingestion_warnings table (Distributed table engine), and a sharded_ingestion_warnings table (ReplicatedMergeTree table engine). The Kafka tables on both shards read from the clickhouse_events_proto topic in Kafka; the remaining arrows between tables are labeled "pushes data to".]

Further reading

Next in the ClickHouse manual: Working with JSON
