Перейти к основному содержимому
Перейти к основному содержимому

Интеграция dbt и ClickHouse

dbt (инструмент преобразования данных) позволяет аналитическим инженерам преобразовывать данные в своих хранилищах, просто написав операторы SELECT. dbt обрабатывает материализацию этих операторов SELECT в объекты в базе данных в виде таблиц и представлений - выполняя T из Извлечь Загрузить и Преобразовать (ELT). Пользователи могут создать модель, определенную оператором SELECT.

Внутри dbt эти модели могут ссылаться друг на друга и накладываться для создания более высокоуровневых концепций. Шаблонный SQL, необходимый для подключения моделей, автоматически генерируется. Более того, dbt определяет зависимости между моделями и гарантирует, что они создаются в соответствующем порядке с помощью ориентированного ациклического графа (DAG).

Dbt совместим с ClickHouse через плагин, поддерживаемый ClickHouse. Мы описываем процесс подключения ClickHouse с простым примером, основанным на общедоступном наборе данных IMDB. Мы также подчеркиваем некоторые ограничения текущего соединителя.

Концепции

dbt вводит концепцию модели. Это определяется как SQL-оператор, потенциально объединяющий множество таблиц. Модель может быть "материализована" несколькими способами. Материализация представляет собой стратегию сборки для выборки модели. Код за материализацией — это шаблонный SQL, который оборачивает ваш запрос SELECT в оператор для создания новых или обновления существующих связей.

dbt предоставляет 4 типа материализации:

  • view (по умолчанию): Модель создается как представление в базе данных.
  • table: Модель создается как таблица в базе данных.
  • ephemeral: Модель не строится напрямую в базе данных, а вместо этого объединяется в зависимые модели в виде общих табличных выражений.
  • incremental: Модель изначально материализуется как таблица, а в последующих запусках dbt вставляет новые строки и обновляет измененные строки в таблице.

Дополнительный синтаксис и условия определяют, как эти модели должны обновляться, если их основная информация изменяется. dbt в целом рекомендует начинать с материализации представлений, пока производительность не станет проблемой. Материализация таблиц обеспечивает улучшение производительности во время выполнения запросов, захватывая результаты запроса модели в виде таблицы за счет увеличения объема хранения. Инкрементный подход строится на этом дальше, позволяя захватывать последующие обновления основной информации в целевой таблице.

Текущий плагин для ClickHouse поддерживает view, table, ephemeral и incremental материализации. Плагин также поддерживает dbt snapshots и seeds, которые мы исследуем в этом руководстве.

Для следующих руководств мы предполагаем, что у вас есть доступ к экземпляру ClickHouse.

Настройка dbt и плагина ClickHouse

dbt

Мы предполагаем использование CLI dbt для следующих примеров. Пользователи также могут рассмотреть возможность использования dbt Cloud, который предлагает веб-ориентированную интегрированную среду разработки (IDE), позволяя пользователям редактировать и запускать проекты.

dbt предлагает несколько вариантов установки CLI. Следуйте инструкциям, описанным здесь. На этом этапе установите только dbt-core. Мы рекомендуем использовать pip.

Важно: Следующее протестировано под python 3.9.

Плагин ClickHouse

Установите плагин ClickHouse для dbt:

Подготовка ClickHouse

dbt отлично справляется с моделированием высоко реляционных данных. Для примера мы предоставляем небольшой набор данных IMDB со следующим реляционным схемой. Этот набор данных поступает из хранилища реляционных наборов данных. Это тривиально по сравнению с общими схемами, используемыми с dbt, но представляет собой управляемый образец:

Схема таблиц IMDB

Мы используем подмножество этих таблиц, как показано.

Создайте следующие таблицы:

примечание

Колонка created_at для таблицы roles по умолчанию имеет значение now(). Мы используем это позже для определения инкрементных обновлений для наших моделей - смотрите Инкрементные модели.

Мы используем функцию s3, чтобы прочитать исходные данные из общедоступных конечных точек для вставки данных. Выполните следующие команды для заполнения таблиц:

Время выполнения этих команд может варьироваться в зависимости от вашей пропускной способности, но каждая из них должна занять всего несколько секунд для завершения. Выполните следующий запрос, чтобы вычислить сводку для каждого актера, отсортированную по количеству появлений в фильмах, и подтвердить, что данные были загружены успешно:

Ответ должен выглядеть следующим образом:

В следующих руководствах мы преобразуем этот запрос в модель - материализуем его в ClickHouse как представление и таблицу dbt.

Подключение к ClickHouse

  1. Создайте проект dbt. В этом случае мы назовем его по нашему источнику imdb. При запросе выберите clickhouse в качестве источника базы данных.

  2. cd в папку вашего проекта:

  3. На этом этапе вам потребуется текстовый редактор на ваш выбор. В следующих примерах мы используем популярный VS Code. Открыв директорию IMDB, вы должны увидеть коллекцию файлов yml и sql:

    Новый проект dbt
  4. Обновите файл dbt_project.yml, чтобы указать нашу первую модель - actor_summary и установить профиль на clickhouse_imdb.

    Профиль dbt Профиль dbt
  5. Далее нам нужно предоставить dbt с деталями подключения к нашему экземпляру ClickHouse. Добавьте следующее в ваш ~/.dbt/profiles.yml.

    Обратите внимание на необходимость изменить пользователя и пароль. Есть дополнительные доступные настройки, задокументированные здесь.

  6. Из директории IMDB выполните команду dbt debug, чтобы подтвердить, может ли dbt подключиться к ClickHouse.

    Подтвердите, что ответ включает Тест соединения: [OK соединение в порядке], что указывает на успешное соединение.

Создание простой материализации представления

При использовании материализации представления модель перестраивается как представление при каждом выполнении через оператор CREATE VIEW AS в ClickHouse. Это не требует дополнительного хранения данных, но будет медленнее для выполнения запросов, чем материализации таблиц.

  1. Из папки imdb удалите директорию models/example:

  2. Создайте новый файл в папке actors внутри папки models. Здесь мы создаем файлы, каждый из которых представляет модель актера:

  3. Создайте файлы schema.yml и actor_summary.sql в папке models/actors.

    Файл schema.yml определяет наши таблицы. Они будут доступны для использования в макросах. Измените models/actors/schema.yml, чтобы содержать следующий контент:

    actors_summary.sql определяет нашу фактическую модель. Обратите внимание, что в функции конфигурации мы также запрашиваем модель, чтобы материализовать ее как представление в ClickHouse. Наши таблицы ссылаются из файла schema.yml через функцию source, т.е. source('imdb', 'movies') ссылается на таблицу movies в базе данных imdb. Измените models/actors/actors_summary.sql, чтобы содержать следующий контент:

    Обратите внимание, как мы включаем колонку updated_at в нашем окончательном actor_summary. Мы используем это позже для инкрементных материализаций.

  4. Из директории imdb выполните команду dbt run.

  5. dbt представит модель как представление в ClickHouse, как запрашивалось. Мы теперь можем запросить это представление напрямую. Это представление будет создано в базе данных imdb_dbt - это определяется параметром схемы в файле ~/.dbt/profiles.yml под профилем clickhouse_imdb.

    Запросив это представление, мы можем воспроизвести результаты нашего предыдущего запроса с более простым синтаксисом:

Создание материализации таблицы

В предыдущем примере наша модель была материализована как представление. Хотя это может предложить достаточную производительность для некоторых запросов, более сложные SELECT или часто выполняемые запросы могут быть лучше материализованы как таблица. Эта материализация полезна для моделей, которые будут запрашиваться инструментами BI, чтобы обеспечить пользователям более быстрый опыт. Это фактически вызывает хранение результатов запроса в виде новой таблицы с сопутствующими накладными расходами по хранению - фактически выполняется INSERT TO SELECT. Обратите внимание, что эта таблица будет воссоздана каждый раз, т.е. она не инкрементальна. Большие наборы результатов могут, следовательно, приводить к длительным временам выполнения - смотрите Ограничения dbt.

  1. Измените файл actors_summary.sql, установив параметр materialized в table. Обратите внимание, как определен ORDER BY, и обратите внимание, что мы используем движок таблицы MergeTree:

  2. Из директории imdb выполните команду dbt run. Это выполнение может занять немного больше времени - около 10 секунд на большинстве машин.

  3. Подтвердите создание таблицы imdb_dbt.actor_summary:

    Вы должны увидеть таблицу с соответствующими типами данных:

  4. Подтвердите, что результаты из этой таблицы соответствуют предыдущим ответам. Обратите внимание на заметное улучшение во времени отклика теперь, когда модель является таблицей:

    Не стесняйтесь выполнять другие запросы к этой модели. Например, какие актеры имеют самые высокие рейтинги в фильмах с более чем 5 появлениями?

Создание инкрементной материализации

В предыдущем примере была создана таблица для материализации модели. Эта таблица будет реконструироваться для каждого выполнения dbt. Это может быть непрактично и крайне дорого для больших наборов результатов или сложных преобразований. Чтобы решить эту проблему и сократить время сборки, dbt предлагает инкрементные материализации. Это позволяет dbt вставлять или обновлять записи в таблице с последнего выполнения, что делает его подходящим для данных в стиле событий. Внутри создается временная таблица со всеми обновленными записями, а затем все нетронутые записи, а также обновленные записи вставляются в новую целевую таблицу. Это приводит к аналогичным ограничениям для больших наборов результатов, как и для модели таблицы.

Чтобы преодолеть эти ограничения для больших наборов, плагин поддерживает режим «inserts_only», где все обновления вставляются в целевую таблицу без создания временной таблицы (подробности ниже).

Чтобы проиллюстрировать этот пример, мы добавим актера "Clicky McClickHouse", который появится в невероятных 910 фильмах - обеспечивая, что он появится в большем количестве фильмов, чем даже Mel Blanc.

  1. Сначала мы модифицируем нашу модель, чтобы она была инкрементного типа. Это дополнение требует:

    1. unique_key - Чтобы гарантировать, что плагин может уникально идентифицировать строки, мы должны предоставить unique_key - в данном случае поле id из нашего запроса подойдет. Это гарантирует, что у нас не будет дубликатов строк в нашей материализованной таблице. Более подробно о ограничениях уникальности смотрите здесь.
    2. Инкрементный фильтр - Нам также нужно сообщить dbt, как он должен определять, какие строки изменились в инкрементном запуске. Это достигается путем предоставления дельта-выражения. Обычно это включает временные метки для событийных данных; поэтому наше поле timestamp updated_at. Эта колонка, которая по умолчанию имеет значение now() при вставке строк, позволяет идентифицировать новые роли. Кроме того, нам нужно определить альтернативный случай, когда добавляются новые актеры. Используя переменную {{this}}, чтобы обозначить существующую материализованную таблицу, это дает нам выражение where id > (select max(id) from {{ this }}) or updated_at > (select max(updated_at) from {{this}}). Мы встраиваем это внутри условия {% if is_incremental() %}, гарантируя, что оно будет использоваться только в инкрементных запусках, а не при первом создании таблицы. Более подробно о фильтрации строк для инкрементных моделей смотрите в этом обсуждении в документации dbt.

    Обновите файл actor_summary.sql следующим образом:

    Обратите внимание, что наша модель будет реагировать только на обновления и добавления в таблицы roles и actors. Чтобы реагировать на все таблицы, пользователям рекомендуется разбить эту модель на несколько под-моделей - каждая со своими инкрементными критериями. Эти модели, в свою очередь, могут быть ссылочными и соединяться. Для получения дополнительных сведений о перекрестных ссылках моделей см. здесь.

  2. Выполните dbt run и подтвердите результаты полученной таблицы:

  3. Теперь мы добавим данные в нашу модель, чтобы проиллюстрировать инкрементное обновление. Добавим нашего актера "Clicky McClickHouse" в таблицу actors:

  4. Давайте сделаем так, чтобы "Clicky" снялся в 910 случайных фильмах:

  5. Подтвердите, что он теперь действительно актер с наибольшим числом появлений, выполнив запрос к основной таблице источника и игнорируя любые модели dbt:

  6. Выполните dbt run и подтвердите, что наша модель была обновлена и соответствует приведенным выше результатам:

Внутреннее устройство

Мы можем определить выполненные операторы, чтобы достичь вышеописанного инкрементального обновления, запросив журнал запросов ClickHouse.

Отрегулируйте вышеуказанный запрос на период выполнения. Мы оставляем проверку результатов пользователю, но подчеркиваем общую стратегию, используемую плагином для выполнения инкрементных обновлений:

  1. Плагин создает временную таблицу actor_sumary__dbt_tmp. Измененные строки поступают в эту таблицу.
  2. Создается новая таблица actor_summary_new. Строки из старой таблицы также поступают из старой в новую, с проверкой, чтобы удостовериться, что идентификаторы строк не существуют во временной таблице. Это эффективно обрабатывает обновления и дубликаты.
  3. Результаты из временной таблицы поступают в новую таблицу actor_summary:
  4. Наконец, новая таблица атомарно заменяет старую версию с помощью оператора EXCHANGE TABLES. Старая и временная таблицы в свою очередь удаляются.

Это визуализируется ниже:

инкрементные обновления dbt

Эта стратегия может встретить сложности на очень больших моделях. Для получения дополнительной информации смотрите Ограничения.

Стратегия добавления (режим только вставки)

Чтобы преодолеть ограничения больших наборов данных в инкрементальных моделях, плагин использует параметр конфигурации dbt incremental_strategy. Это может быть установлено в значение append. Когда это установлено, обновленные строки вставляются непосредственно в целевую таблицу (так называемую imdb_dbt.actor_summary), и временная таблица не создается. Примечание: режим только для добавления требует, чтобы ваши данные были неизменяемыми или чтобы дубликаты были приемлемы. Если вы хотите инкрементальную таблицу, которая поддерживает измененные строки, не используйте этот режим!

Чтобы проиллюстрировать этот режим, мы добавим еще одного нового актера и повторно выполним dbt run с incremental_strategy='append'.

  1. Настройте режим только добавления в actor_summary.sql:

  2. Давайте добавим еще одного знаменитого актера - Дэнни ДеВито

  3. Давайте сделаем так, чтобы Дэнни снялся в 920 случайных фильмах.

  4. Выполните dbt run и подтвердите, что Дэнни был добавлен в таблицу actor-summary

Обратите внимание, насколько быстрее прошло инкрементальное обновление по сравнению с вставкой "Clicky".

Проверка таблицы query_log опять показывает разницу между 2 инкрементальными запусками:

В этом запуске только новые строки добавляются прямо в таблицу imdb_dbt.actor_summary, и никакого создания таблицы не требуется.

Режим удаление+вставка (экспериментальный)

Исторически ClickHouse имел лишь ограниченную поддержку для обновлений и удалений, в виде асинхронных Mutations. Эти операции могут быть крайне ресурсоемкими и их следует в общем избегать.

ClickHouse 22.8 представил Легкие удаления. Эти опции в настоящее время экспериментальные, но предлагают более производительный способ удаления данных.

Этот режим можно настроить для модели через параметр incremental_strategy, то есть.

Эта стратегия работает непосредственно с таблицей целевой модели, поэтому если возникнет проблема во время операции, данные в инкрементальной модели, вероятно, будут находиться в недопустимом состоянии - атомарного обновления нет.

В целом, этот подход:

  1. Плагин создает временную таблицу actor_sumary__dbt_tmp. Измененные строки поступают в эту таблицу.
  2. Выполняется команда DELETE для текущей таблицы actor_summary. Строки удаляются по идентификатору из actor_sumary__dbt_tmp.
  3. Строки из actor_sumary__dbt_tmp вставляются в actor_summary с помощью команды INSERT INTO actor_summary SELECT * FROM actor_sumary__dbt_tmp.

Этот процесс показан ниже:

легкое удаление инкрементальное

Режим вставки_перезаписи (экспериментальный)

Выполняет следующие шаги:

  1. Создает промежуточную (временную) таблицу с такой же структурой, как и отношение инкрементальной модели: CREATE TABLE {staging} AS {target}.
  2. Вставляет только новые записи (сгенерированные с помощью SELECT) в промежуточную таблицу.
  3. Заменяет только новые партиции (присутствующие в промежуточной таблице) в целевой таблице.

Этот подход обладает следующими преимуществами:

  • Он быстрее, чем стратегия по умолчанию, поскольку не копирует всю таблицу.
  • Он безопаснее других стратегий, поскольку не изменяет оригинальную таблицу до тех пор, пока операция ВСТАВКИ не завершится успешно: в случае промежуточной ошибки оригинальная таблица не изменяется.
  • Он имплементирует «неизменяемость партиций» - лучшую практику в области обработки данных. Это упрощает инкрементальную и параллельную обработку данных, откаты и т.д.
вставка перезапись инкрементальная

Создание снимка

Снимки dbt позволяют записывать изменения в изменяемой модели с течением времени. Это, в свою очередь, позволяет выполнять запросы на определенный момент времени по моделям, где аналитики могут "ознакомиться с состоянием в прошлом" модели. Это достигается с помощью типа 2 «Медленно изменяющиеся измерения», где из столбцов from и to фиксируется, когда строка была действительна. Эта функциональность поддерживается плагином ClickHouse и демонстрируется ниже.

Этот пример предполагает, что вы завершили Создание инкрементной таблицы модели. Убедитесь, что ваш файл actor_summary.sql не устанавливает inserts_only=True. Ваш models/actor_summary.sql должен выглядеть следующим образом:

  1. Создайте файл actor_summary в каталоге снимков.

  2. Обновите содержимое файла actor_summary.sql следующим содержимым:

Несколько замечаний относительно этого содержания:

  • Запрос select определяет результаты, которые вы хотите фиксировать с течением времени. Функция ref используется для ссылки на ранее созданную модель actor_summary.
  • Мы требуем столбец временной метки, чтобы указать изменения записей. Наш столбец updated_at (смотрите Создание инкрементной таблицы модели) может быть использован здесь. Параметр strategy указывает на использование временной метки для обозначения обновлений, а параметр updated_at указывает, какой столбец использовать. Если этого не имеется в вашей модели, вы можете использовать альтернативно стратегию проверки. Это значительно менее эффективно и требует от пользователя указать список столбцов для сравнения. dbt сравнивает текущие и исторические значения этих столбцов, фиксируя любые изменения (или ничего не делая, если они идентичны).
  1. Выполните команду dbt snapshot.

Обратите внимание, что в базе данных (db) был создана таблица actor_summary_snapshot (определяемая параметром target_schema).

  1. Изучая эти данные, вы увидите, как dbt включил столбцы dbt_valid_from и dbt_valid_to. Последний имеет значения, установленные в null. Последующие запуски обновят это.

  2. Сделаем так, чтобы наш любимый актер Clicky McClickHouse появился в еще 10 фильмах.

  3. Повторно выполните команду dbt run из каталога imdb. Это обновит инкрементальную модель. После завершения запустите dbt snapshot, чтобы зафиксировать изменения.

  4. Если мы теперь запросим наш снимок, обратите внимание, что у нас есть 2 строки для Clicky McClickHouse. Наша предыдущая запись теперь имеет значение dbt_valid_to. Наша новая запись фиксируется с тем же значением в столбце dbt_valid_from, и значение dbt_valid_to равно null. Если бы у нас были новые строки, они также были бы добавлены в снимок.

Для получения дополнительных сведений о снимках dbt см. здесь.

Использование семян

dbt предоставляет возможность загружать данные из CSV файлов. Эта возможность не предназначена для загрузки больших экспортов базы данных и больше подходит для небольших файлов, обычно используемых для кодовых таблиц и словарей, например, для сопоставления кодов стран с названиями стран. Для простого примера мы генерируем и затем загружаем список кодов жанров, используя функционал семян.

  1. Мы генерируем список кодов жанров из нашего существующего набора данных. В каталоге dbt используйте clickhouse-client, чтобы создать файл seeds/genre_codes.csv:

  2. Выполните команду dbt seed. Это создаст новую таблицу genre_codes в нашей базе данных imdb_dbt (как определено в нашей конфигурации схемы) с строками из нашего CSV файла.

  3. Подтвердите, что они были загружены:

Ограничения

Текущий плагин ClickHouse для dbt имеет несколько ограничений, о которых пользователям следует знать:

  1. Плагин в настоящее время материализует модели как таблицы, используя INSERT TO SELECT. Это фактически означает дублирование данных. Очень большие наборы данных (PB) могут привести к крайне длительному времени выполнения, что делает некоторые модели непригодными. Старайтесь минимизировать количество строк, возвращаемых любым запросом, используя GROUP BY, где это возможно. Предпочитайте модели, которые суммируют данные, по сравнению с теми, которые просто выполняют преобразование, сохраняя количество строк источника.
  2. Чтобы использовать распределенные таблицы для представления модели, пользователи должны вручную создавать исходные реплицированные таблицы на каждом узле. Распределенная таблица, в свою очередь, может быть создана поверх этих таблиц. Плагин не управляет созданием кластера.
  3. Когда dbt создает отношение (таблицу/представление) в базе данных, он обычно создает его как: {{ database }}.{{ schema }}.{{ table/view id }}. ClickHouse не имеет понятия схем. Поэтому плагин использует {{schema}}.{{ table/view id }}, где schema - это база данных ClickHouse.

Дополнительная информация

Предыдущие руководства лишь слегка касаются функционала dbt. Пользователям рекомендуется прочитать отличную документацию dbt.

Дополнительная конфигурация для плагина описана здесь.

Fivetran

Коннектор dbt-clickhouse также доступен для использования в преобразованиях Fivetran, что позволяет безшовную интеграцию и возможности преобразования непосредственно в платформе Fivetran с использованием dbt.