# Data storage or what is a MergeTree - Handbook

This document covers the answers to the following questions:

-   How data is stored on disk for MergeTree engine family tables
-   What are `parts`, `granules` and `marks`
-   How and why choosing the correct `ORDER BY` and `PARTITION BY` in table definitions affects query performance
-   How to use `EXPLAIN` to understand what ClickHouse is doing
-   Difference between `PREWHERE` and `WHERE`
-   Data compression

## Introduction to MergeTree

[Why is ClickHouse so fast?](https://clickhouse.com/docs/en/faq/general/why-clickhouse-is-so-fast/) states:

> ClickHouse was initially built as a prototype to do just a single task well: to filter and aggregate data as fast as possible.

Rather than force all possible tasks to be solved by singular tools, ClickHouse provides specialized "engines" that each solve specific problems.

MergeTree engine family tables are intended for ingesting large amounts of data, storing that data efficiently, and running analytical queries on it.

## How MergeTree stores data

Consider the following (simplified) table for storing sensor events:

SQL

PostHog AI

```sql
CREATE TABLE sensor_values (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = MergeTree()
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192
```

Data for this table would be stored in **parts**, each part a separate directory on disk. Data for a given part is always sorted by the order set in `ORDER BY` statement and compressed.

Parts can be `Wide` or `Compact` depending on its size. We'll be mostly dealing with `Wide` parts as part of day-to-day operations.

`Wide` parts are large and store each column in a separate binary data file, which are sorted and compressed.

ClickHouse also stores a sparse index for the part. A collection of rows with size equal to the `index_granularity` setting is called a **granule**. For every **granule**, the primary index stores a **mark** containing the value of the `ORDER BY` statement as well as a pointer to where that **mark** is located in each data file.

> 💡 For better performance when running queries, it is not recommended to set `index_granularity` too low. The default value for engines in the `MergeTree` family is 8192. An implication of this is that accessing data by primary key (in this case the `ORDER BY` clause is equivalent to the primary key) will not read just one row, but rather up to `index_granularity` number of rows. This is acceptable given ClickHouse is meant to perform well with aggregations, rather than point lookups.

Diving deeper into data-on-disk for a Wide part

This assumes you're using a docker-based ClickHouse installation and have `clickhouse-client` running

##### Seeding data

SQL

PostHog AI

```sql
INSERT INTO sensor_values
SELECT *
FROM generateRandom('timestamp DateTime, site_id UInt8, event VARCHAR, uuid UUID, metric_value Int32', NULL, 10)
LIMIT 200000000
```

##### Looking at part data

[`system.parts`](https://clickhouse.com/docs/en/operations/system-tables/parts/) table contains a lot of metadata about every part.

To find out what type each part is, its size, and where on disk it's located, you can run the following query:

SQL

PostHog AI

```sql
SELECT
    name,
    part_type,
    rows,
    marks,
    formatReadableSize(bytes_on_disk),
    formatReadableSize(data_compressed_bytes),
    formatReadableSize(data_uncompressed_bytes),
    formatReadableSize(marks_bytes),
    path
FROM system.parts
WHERE active and table = 'sensor_values'
FORMAT Vertical
```

The result might look something like this:

PostHog AI

```
Row 1:
──────
name:                                        all_12_17_1
part_type:                                   Wide
rows:                                        6291270
marks:                                       769
formatReadableSize(bytes_on_disk):           476.07 MiB
formatReadableSize(data_compressed_bytes):   475.92 MiB
formatReadableSize(data_uncompressed_bytes): 474.00 MiB
formatReadableSize(marks_bytes):             90.12 KiB
path:                                        /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
```

##### Inspecting data on disk

Terminal

PostHog AI

```bash
⟩ docker exec -it posthog_clickhouse_1 ls -lhS /var/lib/clickhouse/store/267/267cd730-33ca-4e43-8a84-e4f0786e364b/all_12_17_1/
total 477M
-rw-r----- 1 clickhouse clickhouse 308M Nov  2 07:33 event.bin
-rw-r----- 1 clickhouse clickhouse  97M Nov  2 07:33 uuid.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov  2 07:33 metric_value.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov  2 07:33 timestamp.bin
-rw-r----- 1 clickhouse clickhouse  25M Nov  2 07:33 site_id.bin
-rw-r----- 1 clickhouse clickhouse  58K Nov  2 07:33 primary.idx
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 event.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 metric_value.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 site_id.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 timestamp.mrk2
-rw-r----- 1 clickhouse clickhouse  19K Nov  2 07:33 uuid.mrk2
-rw-r----- 1 clickhouse clickhouse  494 Nov  2 07:33 checksums.txt
-rw-r----- 1 clickhouse clickhouse  123 Nov  2 07:33 columns.txt
-rw-r----- 1 clickhouse clickhouse   10 Nov  2 07:33 default_compression_codec.txt
-rw-r----- 1 clickhouse clickhouse    7 Nov  2 07:33 count.txt
```

What are these files?

-   For every column, there's a `{column_name}.bin` file, containing the compressed (LZ4 compression by default) data for that column. These take up most of the space.
-   For every column, there's a `{column_name}.mrk2` file, contains an index with data to locate each granule in `{column_name}.bin` file
-   `primary.idx` contains information on ORDER BY column values for each granule. This is loaded into memory during queries.
-   `checksums.txt`, `columns.txt`, `default_compression_codec.txt` and `count.txt` contain metadata about this part.

You can read more on the exact structure of these files and how they're used in [ClickHouse Index Design documentation](https://clickhouse.com/docs/en/guides/improving-query-performance/sparse-primary-indexes/sparse-primary-indexes-design/).

## What does the Merge stand for?

In every system, data must be ingested and kept up-to-date somehow. When data is inserted into MergeTree tables, each insert creates one or multiple parts for the data inserted.

As having a lot of small files would be disadvantageous for many reasons from query performance to storage, ClickHouse regularly **merges** small parts together until they [reach a maximum size](https://clickhouse.com/docs/en/operations/settings/merge-tree-settings/#max-bytes-to-merge-at-max-space-in-pool).

The merge combines the two parts into a new one. This is similar to how [merge sort](https://en.wikipedia.org/wiki/Merge_sort) works and atomically replaces the two source parts.

Merges can be monitored using the [`system.merges` table](https://clickhouse.com/docs/en/operations/system-tables/merges/).

## Query execution

### Aggregation supported by `ORDER BY`

Our `sensor_values` table is set up in a way that queries similar to the following are really fast to execute.

SQL

PostHog AI

```sql
SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
```

Executing this reports:

PostHog AI

```
20 rows in set. Elapsed: 0.042 sec. Processed 90.11 thousand rows, 3.54 MB (2.13 million rows/s., 83.60 MB/s.)
```

Why can it be fast? Because ClickHouse:

1.  leverages the table `ORDER BY` clause (`ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)`) to skip reading a lot of data
2.  is fast and efficient about I/O and aggregation

Let's dig into how the primary index for this query is used by using [`EXPLAIN`](https://clickhouse.com/docs/en/sql-reference/statements/explain/).

SQL

PostHog AI

```sql
EXPLAIN indexes=1, header=1 SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
FORMAT LineAsString
```

Show full \`EXPLAIN\` output

PostHog AI

```
Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
        event String
        total_metric_value Int64
  Limit (preliminary LIMIT (without OFFSET))
  Header: toStartOfDay(timestamp) DateTime
          event String
          sum(metric_value) Int64
    Sorting (Sorting for ORDER BY)
    Header: toStartOfDay(timestamp) DateTime
            event String
            sum(metric_value) Int64
      Expression (Before ORDER BY)
      Header: toStartOfDay(timestamp) DateTime
              event String
              sum(metric_value) Int64
        Aggregating
        Header: toStartOfDay(timestamp) DateTime
                event String
                sum(metric_value) Int64
          Expression (Before GROUP BY)
          Header: event String
                  metric_value Int32
                  toStartOfDay(timestamp) DateTime
            Filter (WHERE)
            Header: timestamp DateTime
                    event String
                    metric_value Int32
              SettingQuotaAndLimits (Set limits and quota after reading from storage)
              Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                      timestamp DateTime
                      site_id UInt32
                      event String
                      metric_value Int32
                ReadFromMergeTree
                Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                        timestamp DateTime
                        site_id UInt32
                        event String
                        metric_value Int32
                Indexes:
                  PrimaryKey
                    Keys:
                      site_id
                      toStartOfDay(timestamp)
                    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
                    Parts: 2/2
                    Granules: 11/24415
```

The full output of explain is obtuse, but the most important part is also the most deeply nested one:

PostHog AI

```
ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
        timestamp DateTime
        site_id UInt32
        event String
        metric_value Int32
Indexes:
    PrimaryKey
    Keys:
        site_id
        toStartOfDay(timestamp)
    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
    Parts: 2/2
    Granules: 11/24415
```

At the start of the query, ClickHouse loaded the primary index of each part into memory. From this output, we know that the query first used the primary key to filter based on `site_id` and `timestamp` values stored in the index. This allowed it to know that only 11 out of 24415 granules (0.05%) contained any relevant data.

From there it read those 11 granules (11 \* 8192 rows) worth of data from `timestamp`, `side_id`, `event` and `metric_value` columns and did the rest of filtering and aggregation on that data alone.

See [this documentation](https://kb.altinity.com/engines/mergetree-table-engine-family/pick-keys/) for a guide on how to choose `ORDER BY`.

### "Point queries" not supported by `ORDER BY`

Consider this query:

SQL

PostHog AI

```sql
SELECT * FROM sensor_values WHERE uuid = '69028f26-768f-afef-1816-521b22d281ca'
```

Executing this query reports:

PostHog AI

```
1 row in set. Elapsed: 0.703 sec. Processed 200.00 million rows, 3.20 GB (304.43 million rows/s., 4.87 GB/s.)
```

While the overall execution time of this query is not bad thanks to fast I/O, it needed to read 2200x the amount of data from disk. As the dataset size or column sizes increase, this performance would get dramatically worse.

Why is this query slower? Because our `ORDER BY` does not support fast filtering by `uuid` and ClickHouse needs to read the whole table to find a single record *and* read all columns.

ClickHouse provides some ways to make this faster (e.g. [Projections](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#projections)) but in general these require extra disk space or have other trade-offs.

Thus, it's important to make sure the ClickHouse schema is aligned with queries that are being executed.

## `PARTITION BY`

Another tool to make queries faster is `PARTITION BY`. Consider the updated table definition:

SQL

PostHog AI

```sql
CREATE TABLE sensor_values (
    timestamp DateTime,
    site_id UInt32,
    event VARCHAR,
    uuid UUID,
    metric_value Int32
)
ENGINE = MergeTree()
PARTITION BY intDiv(toYear(timestamp), 10)
ORDER BY (site_id, toStartOfDay(timestamp), event, uuid)
SETTINGS index_granularity = 8192
```

Here, ClickHouse would generate one **partition** per 10 years of data, allowing to skip reading even the primary index in some cases.

In the underlying data, each **part** would belong to a single **partition** and only parts within a partition would get merged.

One additional benefit of partitioning by a derivate of timestamp is that if most queries touch recent data, you can also set up rules to automatically move older parts and partitions to cheaper storage or drop them entirely.

### Query analysis

Let's use an identical query as before to explain with the new dataset:

SQL

PostHog AI

```sql
SELECT
    toStartOfDay(timestamp),
    event,
    sum(metric_value) as total_metric_value
FROM sensor_values
WHERE site_id = 233 AND timestamp > '2010-01-01' and timestamp < '2023-01-01'
GROUP BY toStartOfDay(timestamp), event
ORDER BY total_metric_value DESC
LIMIT 20
```

Show full \`EXPLAIN\` output

PostHog AI

```
Expression (Projection)
Header: toStartOfDay(timestamp) DateTime
        event String
        total_metric_value Int64
  Limit (preliminary LIMIT (without OFFSET))
  Header: toStartOfDay(timestamp) DateTime
          event String
          sum(metric_value) Int64
    Sorting (Sorting for ORDER BY)
    Header: toStartOfDay(timestamp) DateTime
            event String
            sum(metric_value) Int64
      Expression (Before ORDER BY)
      Header: toStartOfDay(timestamp) DateTime
              event String
              sum(metric_value) Int64
        Aggregating
        Header: toStartOfDay(timestamp) DateTime
                event String
                sum(metric_value) Int64
          Expression (Before GROUP BY)
          Header: event String
                  metric_value Int32
                  toStartOfDay(timestamp) DateTime
            Filter (WHERE)
            Header: timestamp DateTime
                    event String
                    metric_value Int32
              SettingQuotaAndLimits (Set limits and quota after reading from storage)
              Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                      timestamp DateTime
                      site_id UInt32
                      event String
                      metric_value Int32
                ReadFromMergeTree
                Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
                        timestamp DateTime
                        site_id UInt32
                        event String
                        metric_value Int32
                Indexes:
                  MinMax
                    Keys:
                      timestamp
                    Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
                    Parts: 2/14
                    Granules: 3589/24421
                  Partition
                    Keys:
                      intDiv(toYear(timestamp), 10)
                    Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
                    Parts: 2/2
                    Granules: 3589/3589
                  PrimaryKey
                    Keys:
                      site_id
                      toStartOfDay(timestamp)
                    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
                    Parts: 2/2
                    Granules: 12/3589
```

The relevant part of EXPLAIN is again nested deep within:

PostHog AI

```
ReadFromMergeTree
Header: and(greater(timestamp, '2010-01-01'), less(timestamp, '2023-01-01')) UInt8
        timestamp DateTime
        site_id UInt32
        event String
        metric_value Int32
Indexes:
  MinMax
    Keys:
      timestamp
    Condition: and(and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))), and((timestamp in (-Inf, 1672531199]), (timestamp in [1262304001, +Inf))))
    Parts: 2/14
    Granules: 3589/24421
  Partition
    Keys:
      intDiv(toYear(timestamp), 10)
    Condition: and(and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))), and((intDiv(toYear(timestamp), 10) in (-Inf, 202]), (intDiv(toYear(timestamp), 10) in [201, +Inf))))
    Parts: 2/2
    Granules: 3589/3589
  PrimaryKey
    Keys:
      site_id
      toStartOfDay(timestamp)
    Condition: and(and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf))), and((site_id in [233, 233]), and((toStartOfDay(timestamp) in (-Inf, 1672531200]), (toStartOfDay(timestamp) in [1262304000, +Inf)))))
    Parts: 2/2
    Granules: 12/3589
```

What this tells us is that ClickHouse:

1.  First leverages an internal MinMax index on `timestamp` to whittle down the number of parts to 2/14 and granules to 3589/24421
2.  Then it tries to filter via the partition key but this doesn't narrow things down further
3.  Then, it loads and leverages the Primary key as before to narrow data down to 12 granules.
4.  Lastly reads, filters and aggregates data in those 12 granules

The benefit here is that it could skip reading the primary key index for most of the parts that did not contain relevant data. If and how much this speeds up the query however depends on the size of the dataset.

### Choosing a good `PARTITION BY`

Use partitions wisely - each INSERT should ideally only touch 1-2 partitions and too many partitions will cause issues around replication or prove useless for filtering.

Loading the primary index/marks file might not be the bottleneck you expect, so be sure to benchmark different schemas against each other.

See the following Altinity documentation for more guidance:

-   [How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table](https://kb.altinity.com/engines/mergetree-table-engine-family/pick-keys/)
-   [How much is too much? ClickHouse limitations](https://kb.altinity.com/altinity-kb-schema-design/how-much-is-too-much/)

## Other notes on MergeTree

### Data is expensive to update

Updating data in ClickHouse is expensive and analogous to a schema migration.

For example, to update an event's properties, ClickHouse frequently needs to:

-   Scan all the data to find what parts contain the relevant data. This isn't often covered by `ORDER BY` and thus quite expensive.
-   Rewrite the *whole* part (including any columns) - this could be potentially up to 150GB of data rewritten for a single update.

This makes things operationally hard. We mitigate this by:

-   Writing duplicated rows for new data, using other table engines (e.g. ReplacingMergeTree) and accounting for this duplication in our queries.
-   Batching up GDPR or other data deletions and doing them on a schedule rather than immediately.

### No query planner

ClickHouse doesn't have a query planner in the sense PostgreSQL or other databases do.

On the one hand, you often end up fighting the query planner in other databases. If we know how ClickHouse works internally and can develop that into intuition for how SQL is executed, we're well-equipped to deal with performance issues as they arise.

On the other, this means that we'll need to be careful writing SQL as small changes can have huge performance implications.

Examples:

-   For best performance, ClickHouse requires you "push" predicates in WHERE clauses into sub-queries rather than filtering at the outermost query.
-   In the `sensor_values` queries above, the execution plan would have been *slightly* more optimal if the filter condition on `toYear(timestamp)` rather than `timestamp`.

One notable exception to "no query planner" is that ClickHouse often pushes predicates from `WHERE` into `PREWHERE`. Filters in `PREWHERE` are executed first and ClickHouse moves columns it thinks are "cheaper" or "more selective" into it. However putting the wrong column (e.g. a fat column containing JSON) in `PREWHERE` can cause performance to tank.

Read more on `PREWHERE` in the [ClickHouse docs](https://clickhouse.com/docs/en/sql-reference/statements/select/prewhere/).

### Data compression

Compression means that if subsequent column values of a given column are often similar or identical, the data compresses really well. At PostHog we frequently see uncompressed / compressed ratios of 20x-40x for JSON columns and 300x-2000x for sparse small columns.

Compression ratios have direct impact on query performance: I/O is often the bottleneck, meaning that highly compressed data can be read faster from disk at the cost of more CPU work for decompression.

By default columns are compressed by the `LZ4` algorithm. We've found good success using `ZSTD(3)` for storing JSON columns - see [benchmarks](https://github.com/PostHog/posthog/issues/10616) for more information.

Another tip is to use ClickHouse's `LowCardinality` data type modifier on schemas where a given column will store values with low cardinality i.e. the total number of values is low. An example of this would be "country name".

### Weak JOIN support

ClickHouse excels at aggregating data from a single table at a time. If you however have a query with JOINs or subqueries, the right-hand-side of the JOIN would be loaded into memory first. Thus, you should always have the bigger table on the left side of left-hand-side!

This means that at scale JOINs can kill performance. Read more on the effect of removing JOINs from our events database here:

-   [https://github.com/PostHog/posthog/issues/7962](https://github.com/PostHog/posthog/issues/7962)
-   [https://github.com/PostHog/product-internal/pull/240](https://github.com/PostHog/product-internal/pull/240)

## Suggested reading

-   [ClickHouse MergeTree docs](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/)
-   [Why is ClickHouse so fast?](https://clickhouse.com/docs/en/faq/general/why-clickhouse-is-so-fast/)
-   [Overview of ClickHouse Architecture](https://clickhouse.com/docs/en/development/architecture/)
-   [ClickHouse Index Design](https://clickhouse.com/docs/en/guides/improving-query-performance/sparse-primary-indexes/sparse-primary-indexes-design/)
-   [How much is too much? ClickHouse limitations](https://kb.altinity.com/altinity-kb-schema-design/how-much-is-too-much/)
-   [How to pick an ORDER BY / PRIMARY KEY / PARTITION BY for the MergeTree-family table](https://kb.altinity.com/engines/mergetree-table-engine-family/pick-keys/)

Next in the ClickHouse manual: [Data replication](/handbook/engineering/clickhouse/replication.md)

### Community questions

Ask a question

### Was this page useful?

HelpfulCould be better