Cluster Management

This guide covers how to manage ClickHouse clusters with Housekeeper, including cluster-aware migrations and distributed deployment strategies.

Overview

ClickHouse clusters enable horizontal scaling by distributing data and queries across multiple nodes. Housekeeper provides comprehensive cluster support through:

  • ON CLUSTER clause injection for distributed DDL operations
  • Cluster-aware client configuration
  • Distributed migration execution

Client Configuration

Basic Cluster Setup

Configure Housekeeper to work with your ClickHouse cluster:

# housekeeper.yaml
clickhouse:
  version: "25.7"
  config_dir: "db/config.d"
  cluster: "production_cluster"

entrypoint: db/main.sql
dir: db/migrations

Client Options

import "github.com/pseudomuto/housekeeper/pkg/clickhouse"

// Create cluster-aware client
client, err := clickhouse.NewClientWithOptions(ctx, dsn, clickhouse.ClientOptions{
    Cluster: "production_cluster",
})

ON CLUSTER Operations

Automatic Injection

When a cluster is configured, Housekeeper automatically adds ON CLUSTER to all DDL operations:

-- Original schema
CREATE DATABASE analytics ENGINE = Atomic;
CREATE TABLE analytics.events (...) ENGINE = MergeTree();

-- Generated migration with cluster
CREATE DATABASE analytics ON CLUSTER production_cluster ENGINE = Atomic;
CREATE TABLE analytics.events ON CLUSTER production_cluster (...) ENGINE = MergeTree();

Manual ON CLUSTER

You can also specify clusters directly in your schema files:

-- Explicit cluster specification
CREATE DATABASE analytics ON CLUSTER my_cluster ENGINE = Atomic;

CREATE TABLE analytics.events ON CLUSTER my_cluster (
    id UInt64,
    timestamp DateTime,
    data String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
ORDER BY (id, timestamp);

Replicated Tables

ReplicatedMergeTree Setup

For distributed setups, use ReplicatedMergeTree engines:

CREATE TABLE analytics.events ON CLUSTER production_cluster (
    id UInt64,
    timestamp DateTime DEFAULT now(),
    event_type LowCardinality(String),
    user_id UInt64,
    data Map(String, String)
) 
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, timestamp)
SETTINGS index_granularity = 8192;
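
The {shard} and {replica} placeholders in the replication path are substituted from each node's macros configuration. As a quick sanity check (a sketch; macro names depend on your deployment), you can inspect the values a node will use:

-- Show the macro values this node substitutes for {shard} and {replica}
SELECT macro, substitution FROM system.macros;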

Distributed Tables

Create distributed tables for cluster-wide querying:

CREATE TABLE analytics.events_distributed ON CLUSTER production_cluster AS analytics.events
ENGINE = Distributed(production_cluster, analytics, events, rand());
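
Queries against the distributed table fan out to every shard and merge the results, so applications read cluster-wide data through a single table. For example (illustrative only, using the events table defined above):

-- Aggregates across all shards via the Distributed engine
SELECT event_type, count() AS events
FROM analytics.events_distributed
GROUP BY event_type
ORDER BY events DESC;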

Global Objects in Clusters

Role Management Across Clusters

Roles are global objects: they belong to the server rather than to any single database. When a cluster is configured, Housekeeper automatically adds ON CLUSTER clauses to role operations so every node ends up with the same definitions:

-- Your schema definition
CREATE ROLE IF NOT EXISTS analytics_reader;
GRANT SELECT ON analytics.* TO analytics_reader;

-- Generated migration with cluster configured
CREATE ROLE IF NOT EXISTS `analytics_reader` ON CLUSTER `production_cluster`;
GRANT `SELECT` ON `analytics`.* TO `analytics_reader` ON CLUSTER `production_cluster`;

Global Object Synchronization

All global objects (roles, users, settings profiles) must be synchronized across the cluster:

  • Consistent State: Ensure all nodes have the same role definitions
  • Migration Order: Global objects are processed first to be available cluster-wide
  • Access Control: Role permissions apply uniformly across all cluster nodes
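
One way to confirm that role definitions actually reached every node is to query the system tables on all replicas at once. The query below is a sketch that assumes the production_cluster name used throughout this guide:

-- List roles on every replica; differences indicate an out-of-sync node
SELECT hostName() AS host, name AS role
FROM clusterAllReplicas('production_cluster', system.roles)
ORDER BY host, role;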

For detailed role management patterns, see the Role Management guide.

ClickHouse Configuration

Cluster Definition

Configure your ClickHouse cluster in config.xml:

<clickhouse>
    <remote_servers>
        <production_cluster>
            <shard>
                <replica>
                    <host>clickhouse-01</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-02</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>clickhouse-03</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>clickhouse-04</host>
                    <port>9000</port>
                </replica>
            </shard>
        </production_cluster>
    </remote_servers>
</clickhouse>
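
After reloading the configuration, confirm that ClickHouse sees the expected topology (a quick check, independent of Housekeeper):

-- One row per replica; shard_num and replica_num should match remote_servers
SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'production_cluster';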

ZooKeeper Integration

For ReplicatedMergeTree tables, configure ZooKeeper:

<clickhouse>
    <zookeeper>
        <node>
            <host>zk-01</host>
            <port>2181</port>
        </node>
        <node>
            <host>zk-02</host>
            <port>2181</port>
        </node>
        <node>
            <host>zk-03</host>
            <port>2181</port>
        </node>
    </zookeeper>
</clickhouse>
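
To verify that the nodes can actually reach the coordination service, query the system.zookeeper table (it requires a WHERE clause on path; the root path here is only an example):

-- Fails if ZooKeeper/Keeper is unreachable; otherwise lists top-level znodes
SELECT name FROM system.zookeeper WHERE path = '/';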

Migration Strategies

Cluster-Wide Migrations

When a cluster is configured in the project, every migration Housekeeper generates includes the ON CLUSTER clause:

# Generate cluster-aware migration (cluster configuration comes from project config)
housekeeper diff

Safe Migration Practices

  1. Test on Staging: Always test migrations on a staging cluster first
  2. Backup Critical Data: Ensure backups before major schema changes
  3. Monitor Replication: Watch replication lag during migrations (see the query below)
  4. Gradual Rollout: Consider rolling migrations shard by shard for large changes
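
For the replication check in particular, the absolute_delay column in system.replicas gives a quick view of lag while a migration is running (a sketch; adjust the filter to your environment):

-- Replicas that are lagging or read-only need attention before continuing
SELECT database, table, is_readonly, absolute_delay, queue_size
FROM system.replicas
WHERE absolute_delay > 0 OR is_readonly
ORDER BY absolute_delay DESC;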

Docker Integration

Cluster Testing

Use Docker for local cluster testing:

import "github.com/pseudomuto/housekeeper/pkg/docker"

// Start cluster-enabled container
container := docker.NewWithOptions(docker.DockerOptions{
    Version:   "25.7",
    ConfigDir: "./config.d", // Contains cluster configuration
})

container.Start()
defer container.Stop()

Configuration Directory

Mount cluster configuration for Docker containers:

<!-- config.d/_clickhouse.xml -->
<clickhouse>
    <remote_servers>
        <test_cluster>
            <shard>
                <replica>
                    <host>127.0.0.1</host>
                    <port>9000</port>
                </replica>
            </shard>
        </test_cluster>
    </remote_servers>
</clickhouse>

Best Practices

Schema Design

  1. Use Replicated Engines: Always use ReplicatedMergeTree for production clusters
  2. Consistent Sharding: Choose appropriate sharding keys for even distribution
  3. Partition Strategy: Align partitioning with your query patterns
  4. Avoid Cross-Shard JOINs: Design schemas to minimize cross-shard operations

Migration Safety

  1. Cluster Validation: Verify cluster health before migrations (see the query after this list)
  2. Lock Coordination: Be aware of distributed DDL locks
  3. Timeout Configuration: Set appropriate timeouts for cluster operations
  4. Error Handling: Implement retry logic for transient cluster issues
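
For the cluster validation step, system.clusters exposes per-replica error counters that make a pre-flight health check straightforward (a sketch using the cluster name from this guide):

-- A non-zero errors_count means a replica has been failing recently
SELECT shard_num, replica_num, host_name, errors_count, estimated_recovery_time
FROM system.clusters
WHERE cluster = 'production_cluster' AND errors_count > 0;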

Monitoring

  1. Replication Lag: Monitor system.replicas table
  2. Distributed DDL: Check system.distributed_ddl_queue
  3. Cluster Health: Monitor node availability and connectivity
  4. Migration Status: Track migration application across all nodes

Troubleshooting

Common Issues

  1. DDL Timeout: Increase the distributed_ddl_task_timeout setting (see the example below)
  2. Replication Issues: Check ZooKeeper connectivity and disk space
  3. Cluster Connectivity: Verify network connectivity between nodes
  4. Permission Errors: Ensure proper cluster permissions
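
For DDL timeouts specifically, the relevant settings can be raised for the session that applies the migration (the values shown are only illustrative):

-- Allow distributed DDL statements up to 10 minutes before timing out
SET distributed_ddl_task_timeout = 600;

-- Return pending replicas as NULL status instead of throwing on timeout
SET distributed_ddl_output_mode = 'null_status_on_timeout';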

Debug Commands

-- Check cluster configuration
SELECT * FROM system.clusters WHERE cluster = 'production_cluster';

-- Monitor distributed DDL
SELECT * FROM system.distributed_ddl_queue;

-- Check replication status
SELECT * FROM system.replicas WHERE database = 'analytics';

Example: Complete Cluster Setup

-- Create cluster-aware database
CREATE DATABASE analytics ON CLUSTER production_cluster 
ENGINE = Atomic 
COMMENT 'Analytics database for production cluster';

-- Create replicated table
CREATE TABLE analytics.events ON CLUSTER production_cluster (
    id UInt64,
    timestamp DateTime DEFAULT now(),
    event_type LowCardinality(String),
    user_id UInt64,
    session_id String,
    properties Map(String, String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (user_id, timestamp)
SETTINGS index_granularity = 8192;

-- Create distributed table for querying
CREATE TABLE analytics.events_distributed ON CLUSTER production_cluster AS analytics.events
ENGINE = Distributed(production_cluster, analytics, events, sipHash64(user_id));

-- Create materialized view with replication (aggregate states keep the
-- uniq(user_id) count correct when parts are merged)
CREATE MATERIALIZED VIEW analytics.mv_daily_stats ON CLUSTER production_cluster
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/daily_stats', '{replica}')
ORDER BY date
AS SELECT
    toDate(timestamp) AS date,
    countState() AS events,
    uniqState(user_id) AS users
FROM analytics.events
GROUP BY date;
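
Because the view stores aggregate states, reads should finalize them with the -Merge combinators (a usage sketch):

-- Finalize the aggregate states when reading daily stats
SELECT
    date,
    countMerge(events) AS events,
    uniqMerge(users) AS users
FROM analytics.mv_daily_stats
GROUP BY date
ORDER BY date;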

This setup provides a robust, scalable ClickHouse cluster with proper replication, distribution, and migration support through Housekeeper.