3 минут чтения
6 сентября 2020 г.
Создаём материализованное представление в Clickhouse
В этот раз разберёмся, как с помощью Python передавать в Clickhouse данные по рекламным кампаниям и построим агрегат, используя материализованное представление.
Для чего нам материализованные представления? Часто Clickhouse используется для работы с огромными объемами данных, а время получения ответа на запрос к таблице с сырыми данными постоянно растёт. Стандартно, чтобы решить такую задачу эффективным способом, чаще всего используют ETL-процессы или создают таблицы агрегатов, что не очень удобно, ведь их необходимо регулярно пересчитывать. Clickhouse обладает встроенной и эффективной возможностью для решения задачи — материализованными представлениями.
Материализованные представления физически хранят и обновляют данные на диске в соответствии с запросом SELECT, на основе которого представление было создано. При вставке данных в искомую таблицу SELECT преобразовывает данные и вставляет их в представление.
Настройка машины
Наш скрипт на Python из предыдущих материалов необходимо подключить к Clickhouse — он будет отправлять запросы, поэтому нужно открыть несколько портов. В Dashboard AWS переходим в Network & Security — Security Groups. Наша машина входит в группу launch-wizard-1. Переходим в неё и смотрим на Inbound rules: нам нужно добавить правила как на скриншоте.
Настройка Clickhouse
Теперь настроим Clickhouse. Отредактируем файл config.xml в редакторе nano:
cd /etc/clickhouse-server
sudo nano config.xml
Воспользуйтесь мануалом по горячим клавишам, если тоже не сразу поняли, как выйти из nano.
Раскоментируем строку
чтобы доступ к базе данных был с любого IP-адреса:
Создание таблицы и материализованного представления
Зайдём в клиент и создадим нашу базу данных, в которой впоследствии создадим таблицы:
CREATE DATABASE db1
USE db1
Мы проиллюстрируем всё тот же пример сбора данных с Facebook. Информация по кампаниям может часто обновляться, и мы, в целях упражнения, хотим создать материализованное представление, которое будет автоматически пересчитывать агрегаты на основе собранных данных по затратам. Таблица в Clickhouse будет практически такой же, как DataFrame из прошлого материала. В качестве движка таблицы используем CollapsingMergeTree: он будет удалять дубликаты по ключу сортировки:
CREATE TABLE facebook_insights(
campaign_id UInt64,
clicks UInt32,
spend Float32,
impressions UInt32,
date_start Date,
date_stop Date,
sign Int8
) ENGINE = CollapsingMergeTree
ORDER BY (date_start, date_stop)
И сразу создадим материализованное представление:
CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
AS
SELECT campaign_id,
date_start,
sum(spend * sign) as spent,
sum(impressions * sign) as impressions,
sum(clicks * sign) as clicks
FROM facebook_insights
GROUP BY date_start, campaign_id
Подробности рецепта можно посмотреть в блоге Clickhouse.
К сожалению, в Clickhouse UPDATE отсутствует, поэтому необходимо придумывать некоторые ухищрения. Мы воспользовались рецептом от команды Яндекса для обходного пути команды UPDATE. Идея состоит в том, чтобы в начале вставить строки, которые уже были в таблице с отрицательным Sign, а затем использовать Sign для сторнирования. Следуя этому рецепту старые данные не будут учитываться при суммировании.
Скрипт
Начнём писать скрипт. Понадобится новая библиотека – clickhouse_driver, позволяющая отправлять запросы к Clickhouse из скрипта на Python:
В материале приведена только доработка скрипта, описанного в статье «Собираем данные по рекламным кампаниям в Facebook«. Всё будет работать, если вы просто вставите код из текущего материала в скрипт предыдущего.
from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors
Объект класса Client позволит отправлять запросы методом execute(). В host вводим свой public dns, user ставим default, port – 9000 и в database базу данных для подключения.
client = Client(host=’ec1-2-34-56-78.us-east-2.compute.amazonaws.com’, user=’default’, password=’ ‘, port=’9000′, database=’db1’)
Чтобы удостовериться, что всё нормально, можно написать следующий запрос, который должен вывести наименования всех баз данных на сервере:
client.execute(‘SHOW DATABASES’)
В случае успеха получим на экране такой список:
[(‘_temporary_and_external_tables’,), (‘db1’,), (‘default’,), (‘system’,)]
Пусть, например, мы хотим рассматривать данные за последние три дня. Получим эти даты библиотекой datetime и переведём в нужный формат методом strftime():
date_start = datetime.now() — timedelta(days=3)
date_end = datetime.now() — timedelta(days=1)
date_start_str = date_start.strftime(«%Y-%m-%d»)
date_end_str = date_end.strftime(«%Y-%m-%d»)
Напишем вот такой запрос, получающий все колонки таблицы за это время:
SQL_select = f»select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > ‘{date_start_str}’ AND date_start < '{date_end_str}'"
[/code_snippet]
<p>И выполним запрос, поместив информацию в список <span class="inline-code">old_data_list</span>. А затем поменяем всем <span class="inline-code">sign</span> на -1 и добавим в <span class="inline-code">new_data_list</span>:</p>
[code_snippet]
new_data_list = []
old_data_list = []
old_data_list = client.execute(SQL_select)
for elem in old_data_list:
elem = list(elem)
elem[len(elem) — 1] = -1
new_data_list.append(elem)
Наконец, напишем наш алгоритм: вставляем те же самые данные с sign = −1, оптимизируем для удаления дубликатов движком CollapsingMergeTree и выполняем INSERT новых данных со знаком sign = 1.
SQL_query = ‘INSERT INTO facebook_insights VALUES’
client.execute(SQL_query, new_data_list)
SQL_optimize = «OPTIMIZE TABLE facebook_insights»
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
client.execute(SQL_query, [[insight_campaign_id_list[i],
insight_clicks_list[i],
insight_spend_list[i],
insight_impressions_list[i],
datetime.strptime(insight_date_start_list[i], ‘%Y-%m-%d’).date(),
datetime.strptime(insight_date_start_list[i], ‘%Y-%m-%d’).date(),
1]])
client.execute(SQL_optimize)
Вернёмся в Clickhouse. Выполним SELECT * FROM facebook_insights LIMIT 20, чтобы посмотреть первые 20 строк таблицы:
И SELECT * FROM fb_aggregated LIMIT 20, чтобы проверить наше представление:
Отлично! Мы сделали материализованное представление — теперь новые данные, поступающие в таблицу facebook_insights будут поступать и в материализованное представление fb_aggregated и каждый раз пересчитываться благодаря SummingMergeTree. При этом трюк с sign позволяет отлавливать уже обработанные записи и не допускать их суммирования, а CollapsingMergeTree — чистить дубликаты.
[ Рекомендации ]
Читайте также
4 минут чтения
21 января 2021
[ Связаться ]
Давайте раскроем потенциал вашего бизнеса вместе
Заполните форму на бесплатную консультацию