HandbookProduct EngineeringInfrastructureClickhouse

ClickHouse Cloud

We are using ClickHouse Cloud as our main data warehouse for storing and querying observability data such as traces, observations, and scores.

We have two organizations in place: Langfuse GmbH and Langfuse HIPAA. The former contains separate warehouses for our staging, EU, and US accounts. The latter is a dedicated HIPAA-compliant organization for the HIPAA cloud region.

Terminology

  • Organization: The top-level entity in ClickHouse Cloud that contains multiple warehouses. Support tickets and billing are managed at the organization level.
  • Warehouse: A logical group of services that share a common data-storage and ClickHouse-Keeper configuration. I.e. warehouses allow Compute-Compute separation.
  • Service: A set of replicas that share a common endpoint. Services can be scaled horizontally and vertically.
  • Node/Replica: An individual instance within a Service. Matches to a Kubernetes Pod.
  • Part: A logical “file” on disk. ClickHouse merges parts into larger ones over time. Initially, we write “Compact” parts where all data resides within the same file which makes writing and reading the entirety of the part cheaper. Over time and as parts grow, they are moved to “Wide” parts where each column is separated into its own file.
  • Granule: The lowest indexing entity in ClickHouse. It acts like a pointer to a set of rows within a part.

Common Concepts

In this section, we cover some common concepts that we use in Langfuse and their implications.

ReplacingMergeTrees

All tracing tables use the ReplacingMergeTree engine. This allows us to perform updates by writing a new row with a higher version number and the same sorting key and have them automatically be deduplicated in the background. As we allow updates on traces, observations, and scores, this is a central feature.

The deduplication in ReplacingMergeTrees happens eventually and there is no guarantee for it to ever happen. Therefore, we perform additional read-side deduplication for most queries to return consistent results.

There are three main ways to perform deduplication on read that we use:

  • FINAL modifier: This automatically performs the full deduplication that ClickHouse runs on merges. Especially for larger tables, this can be very expensive and slow with a high memory consumption. In addition, this blocks the use of data skipping indexes on older ClickHouse versions.
  • ORDER BY event_ts LIMIT 1 BY …: If we ensure that the sorting order avoids duplicates on updates (i.e. only sorts by time and event_ts), we can use the LIMIT 1 BY clause to return only one row per identifier. This is often the cheapest to compute and makes use of data skipping indexes, but requires careful ordering of the tables. We also observed gotchas where the timestamp shifted between updates which may cause seemingly random results.
  • argMax/argMin functions: These functions allow us to select the row with the highest/lowest version for each sorting key. This relies on a group-by call on the common identifier, e.g. GROUP BY project_id, trace_id. We rarely use this as it has a high memory consumption as it needs to keep the full result set in memory before returning.

Inserts

ClickHouse benefits heavily from large inserts as each insert creates an initial data part that needs to be merged later in the background. Therefore, we have multiple batch mechanisms in place to accumulate data before initiating an insert.

The worker container stores all ClickHouse writes into the ClickHouseWriter in an in-memory buffer. From there, we regularly flush the data to ClickHouse in configurable batch sizes and intervals. In addition, we use async_inserts on ClickHouse that collect data server-side before acknowledging the insert and writing it to disk.

Performance Considerations

Requesting Multiple Columns

Due to the columnar layout on disk, especially in Wide Parts, adding additional columns to a query becomes more expensive. This holds especially true for large binary columns like input, output, and metadata. Therefore, we have multiple optimizations in the code that allow an opt-out of those columns when requesting table data. Make sure to actively consider which columns are required for the use-case and limit your queries to those.

Deletions

Deletions produce a mutation of the affected parts in ClickHouse. Initially, a new _row_exists column is being written that requires a full column rewrite for Wide Parts and a full part rewrite for Compact Parts. Therefore, we attempt to run deletions asynchronously in the background and have multiple optimizations to batch and rate-limit deletions based on the system state.

Single Record Retrievals

The smallest addressable unit in ClickHouse is a granule which usually holds 8192 rows. Therefore, each single row retrieval reads at least that many rows. Where possible, we should use batch-processing to run queries on multiple records at once.

Joins

ClickHouse joins rarely optimize the queries to their full potential. If you observe runtime issues on large projects, it usually makes sense to investigate whether the following apply:

  • Can we push down filters into the left and the right side of the table?
  • Are there more suitable join algorithms (Clickhouse join docs)?
  • Is the smaller relation on the right side of the join?

Operations

ClickHouse support a cluster mode and a single instance mode. Those have an effect on DDL queries, as a ON CLUSTER keyword is required to achieve replication in a multi-node setup, but throws errors on single-instances without a cluster config.

For simplicity in the application, we replicate our migration scripts to support both. The migration being executed depends on the CLICKHOUSE_MIGRATION_CLUSTER_ENABLED environment variable. It defaults to true and must be set explicitly to false to disable cluster mode.

If you create a new migration, you will have to add four files:

packages/shared/clickhouse/migrations/clustered/<name>.up.sql
packages/shared/clickhouse/migrations/clustered/<name>.down.sql
packages/shared/clickhouse/migrations/unclustered/<name>.up.sql
packages/shared/clickhouse/migrations/unclustered/<name>.down.sql

The unclustered mode is tested in our CI and the clustered mode on staging, i.e. we should observe if any of the migrations you’ve added are invalid.

Also review our guidelines on ClickHouse database migrations in the repo.

Doc References

Was this page helpful?