6 заметок с тегом

AWS

How-to: модель GPT-2 для получения логических выводов с помощью Amazon SageMaker

Время чтения текста – 23 минуты

Не так давно, на конференции Linq мы представили генератор твитов в стиле Илона Маска. Для его создания мы взяли готовую модель GPT-2 Medium и дообучили/стилизовали ее на своем датасете. По-английски такой процесс называется fine-tuning. Модель требует достаточное количество ресурсов для этого, так что не каждый может это сделать локально на своем ПК. Однако, вопрос решается, если использовать, например, Google Colab. Мы же дообучали модель на платформе Kaggle.

Но после этого появляется новая задача — модель нужно развернуть, чтобы создать полноценный сервис. Конечно, существует множество различных решений. Ранее, при создании генератора телеграм-постов в стиле Артемия Лебедева мы обращались к сервису Yandex DataSphere, а после даже написали небольшой гайд о том, как там развернуть модель. Теперь же мы расскажем о развертывании модели GPT-2 для получения логических выводов в режиме реального времени (Real-time inference) с помощью Amazon SageMaker.

  1. Заходим в аккаунт AWS.
  1. Так как мы хотим задеплоить стилизованную модель, нам необходимо загрузить в облако необходимые файлы. В поиске вбиваем S3, выбираем первый сервис.

  1. Нажимаем Create bucket.

  1. Далее проводим конфигурацию бакета: вводим его имя, выбираем регион, настраиваем права доступа и т. д. Для простоты достаточно ввести данные только в графе General configuration.

  1. После конфигурирования нажимаем Create bucket.

  1. Затем нас перебросит на страницу Amazon S3, где можно будет увидеть что-то подобное:

  1. Далее надо загрузить сами файлы модели. Нажимаем на имя созданного бакета и в открывшемся окне нажимаем Upload.

  1. Перетаскиваем архив с файлами модели в нужную область или нажимаем Add files. При необходимости можно выстроить иерархию внутри бакета путем создания папок с помощью Add folders. Здесь важно отметить, что файлы модели должны быть в архиве с расширением tar.gz.

  1. Нажимаем Upload и ждем завершение загрузки.

  1. После успешной загрузки архива, перейдем непосредственно к деплою модели. В поиске вбиваем SageMaker, выбираем первый сервис.

  1. Для работы с с этим сервисом необходимо предварительно настроить SageMaker Domain, для этого нажимаем Get Started на баннере New to Sagemaker?.

  1. Для простой конфигурации 1 пользователя выбираем Quick setup и нажимаем Set up SageMaker Domain.

  1. Заполняем имя пользователя и настраиваем роль для исполнения. Для этого можем создать новую роль и указать в ней то, к каким бакетам S3 у пользователя будет доступ. Для простоты дадим доступ ко всем бакетам.

  1. Нажимаем Submit.

  1. Придется немного подождать, пока SageMaker Domain и пользователь будут сконфигурированы.

  1. После завершения настройки, среди пользователей появится созданный нами и можно будет запустить Studio, нажав на Launch app. SageMaker Studio — IDE, позволяющая работать работать с Jupyter ноутбуками в облаке AWS.

  1. Тут тоже придется немного подождать.

  1. Наконец, мы попадем в SageMaker Studio. Переключаясь между вкладками с помощью панели слева, можно:
    • Просмотреть рабочий репозиторий, где будут храниться ноутбуки и прочие файлы;
    • Просмотреть запущенные инстансы и приложения, Kernel и Terminal Sessions;
    • Работать с Git репозиторием;
    • Управлять ресурсами SageMaker;
    • Устанавливать разрешения для Jupyter ноутбуков.

  1. Отдельно выделим SageMaker JumpStart. Этот сервис предлагает предварительно обученные модели с открытым исходным кодом для широкого спектра задач. Вы можете обучить и настроить эти модели перед тем как развернуть их. JumpStart также предоставляет шаблоны решений для настройки инфраструктуры для распространенных случаев использования и исполняемые ноутбуки для машинного обучения с помощью SageMaker.

  1. Несмотря на наличие готовых решений, для деплоя нашей fine-tuned модели GPT-2 мы создадим новый ноутбук, где пропишем все, что нам нужно. Для этого нажмем на + в голубом прямоугольнике сверху слева. Откроется вкладка Launcher, пролистаем вниз до секции Notebooks and compute resources и выберем там Notebook Python 3.

  1. Придется немного подождать, прежде чем ядро ноутбука будет готово к работе.
  1. Наконец, можно писать код.

  1. Отдельно стоит отметить, что можно выбрать инстанс, на котором будет выполняться ноутбук. Например, если для вашей модели нужно больше ресурсов, вы запросто сможете переключиться. Но стоит помнить, что и платить придется соответственно.

  1. Во время работы с ноутбуком вы платите за время его использования с учетом типа выбранного инстанса.
  1. Для простого деплоя нашей модели можем воспользоваться готовой конфигурацией от Hugging Face. Нажимаем на кнопку Deploy, выбираем там Amazon SageMaker, выбираем задачу (в нашем случае это Text Generation) и конфигурацию (в нашем случае это AWS), копируем код в наш ноутбук.

  1. Так как мы используем свою дообученную модель, а не готовую из репозитория Hugging Face, нам надо сделать небольшие изменения в коде. Комментируем в словаре hub строку с ключом ‘HF_MODEL_ID’ и в конструкторе HuggingFaceModel добавляем ключ model_data, куда пишем путь до нашего архива с файлами модели:
# Hub Model configuration. https://huggingface.co/models
hub = {
	# 'HF_MODEL_ID':'gpt2-medium',
	'HF_TASK':'text-generation'
}
 
# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
	transformers_version='4.17.0',
	pytorch_version='1.10.2',
	py_version='py38',
	env=hub,
	role=role, 
      model_data='s3://my-bucket-for-gpt2/gpt2-medium-musk.tar.gz',
)
  1. В методе deploy объекта huggingface_model мы можем выбрать, на каком инстансе произойдет развертывание нашей модели, указав его в параметре instance_type. Большинство инстансов может быть недоступно в связи с отсутствием нужных квот и их придется запрашивать в поддержке AWS. В этом случае вы увидите подобную ошибку:

  1. Если модель была успешно создана и развернута (для этого придется немного подождать), то можно вызвать метод predict.

  1. Для того, чтобы обращаться к инстансу извне AWS, придется создать Access key.
  • В поиске вбиваем IAM, выбираем первый сервис.

  • В открывшемся окне выбираем вкладку User и нажимаем на имя пользователя, под которым мы работаем.

  • Переходим на вкладку Security credentials и нажимаем Create access key.

  • Копируем Access key ID и Secret access key и сохраняем их в надежном месте.
  1. Далее нужно узнать имя созданного эндпоинта с моделью. В студии на левой панели выбираем вкладку SageMaker resources, выбираем ресурс Endpoints и дважды кликаем по имени нашего эндпоинта. Откроется вкладка с деталями, откуда мы сможем скопировать его имя.

  1. Теперь напишем код для обращения к модели извне.
import boto3
import json
import time
 
endpoint_name = '<my_endpoint_name>'
aws_access_key_id = '<my_aws_access_key_id>'
aws_secret_access_key = '<my_aws_secret_access_key>'
 
sagemaker_runtime = boto3.client(
    "sagemaker-runtime", 
    region_name='us-east-1',
    aws_access_key_id=aws_access_key_id, 
    aws_secret_access_key=aws_secret_access_key
)
 
data = {
    "inputs": "Weed is",
}
 
response = sagemaker_runtime.invoke_endpoint(
    EndpointName=endpoint_name, 
    ContentType='application/json',
    Body=json.dumps(data, ensure_ascii=False).encode('utf8')
)
 
print(response['Body'].read().decode('utf-8'))

И протестируем:

  1. Стоит отметить, что если следовать описанным выше шагам, то модель будет использовать для генерации параметры по умолчанию. Чтобы добавить кастомную логику загрузки модели, пред- и постобработки данных, предсказания, можно создать файл inference.py в студии рядом с вашим ноутбуком и там переопределить нужные вам методы. Подробнее о них можно почитать тут.

  • Чтобы этот скрипт использовался при развертывании модели, в конструкторе HuggingFaceModel нужно добавить еще один параметр:
huggingface_model = HuggingFaceModel(
	transformers_version='4.17.0',
	pytorch_version='1.10.2',
	py_version='py38',
	env=hub,
	role=role, 
      model_data='s3://my-bucket-for-gpt2/gpt2-medium-musk.tar.gz',
      entry_point='inference.py'
)
  • Разумеется, для уже созданных эндпоинтов такое изменение не будет учтено. Нужно будет заново задеплоить модель.
  • Приведем пример файла inference.py, который можно использовать для модели GPT-2:
import json
import torch
from transformers import GPT2Config, GPT2Tokenizer, GPT2LMHeadModel
 
def model_fn(model_dir):
    configuration = GPT2Config.from_pretrained(model_dir, output_hidden_states=False)
    tokenizer = GPT2Tokenizer.from_pretrained(
        model_dir,
        bos_token='<|sos|>', 
        eos_token='<|eos|>', 
        pad_token='<|pad|>'
    )
    model = GPT2LMHeadModel.from_pretrained(model_dir, config=configuration)
    model.resize_token_embeddings(len(tokenizer))
    model.eval()
    return (model, tokenizer)
 
def input_fn(request_body, request_content_type):
    if request_content_type == "application/json":
        request = json.loads(request_body)
    else:
        request = request_body
    return request
 
def predict_fn(data, model_tokenizer):
    model, tokenizer = model_tokenizer
 
    inputs = data.pop("inputs", "")
    max_length = data.pop("max_length", 50)
 
    input_ids = torch.tensor(tokenizer.encode(f'<|sos|>{inputs}')).unsqueeze(0)
    outputs = model.generate(
                input_ids, 
                max_length=max_length,
                bos_token_id=tokenizer.bos_token_id,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id, 
                do_sample=True,
                top_k=0,
                top_p=0.95,
                no_repeat_ngram_size=4
    )
    decoded_output = tokenizer.decode(outputs[0])
 
    return {"decoded_output": decoded_output}
  1. В конце работы с ноутбуком в студии нужно будет обязательно вырубить все используемые для этого ресурсы. К сожалению, при простом закрытии вкладки со студией, ресурсы не освобождаются, поэтому приходится это делать самостоятельно. В противном случае, с вас будет списываться плата за их использование. Итак, вырубить все ненужное можно в самой студии, выбрав на панели слева вкладку Running Terminal and Kernels.

  • После закрытия ноутбука проверить то, что все ресурсы освобождены, можно на странице Amazon SageMaker. Для этого нужно будет нажать на имя пользователя и посмотреть на статус вашего приложения, тип которого KernelGateway. Статус должен быть Deleted.

  1. После того, как вы перестанете нуждаться в развернутой модели, нужно будет удалить эндпоинт. Если вы не освободили ресурсы, используемые ноутбуком в студии, то это можно будет сделать прямо оттуда, прописав строку:
predictor.delete_endpoint()
  • Иначе вы можете удалить эндпоинт, перейдя на страницу сервиса Amazon SageMaker. Там на левой панели нужно будет выбрать вкладку Inference, в выпадающем списке нажать Endpoints, затем справа выбрать нужный эндпоинт, нажать Actions и Delete.

  • Также можно будет удалить созданные модели, перейдя в Inference→Models, и конфигурации эндпоинтов, перейдя в Inference→Enpoint Configurations.

Итак, мы рассказали о том, как развертывать стилизованную модель GPT-2 для получения логических выводов в режиме реального времени (Real-time inference) с помощью Amazon SageMaker. Стоит отметить, что существует несколько вариантов развертывания, каждый из которых имеет свои особенности, например, асинхронность, пакетная обработка, наличие холодного старта, т.д. Использование того или иного варианта зависит от поставленных требований.

Подробнее про другие механизмы деплоя с помощью Amazon SageMaker читайте тут.

Деплой дашборда на виртуальной машине Amazon EC2

Время чтения текста – 4 минуты

Мы уже рассказывали о том, как развернуть дашборд с помощью сервиса Elastic Beanstalk от Amazon Web Services. В этом материале расскажем как развертывать дашборды на виртуальной машине Amazon EC2.

Подготовка

Начало работы с платформой AWS и создание сервера мы описали в материале Устанавливаем Clickhouse на AWS. Проект дашборда был подготовлен в предыдущей заметке Деплой дашборда на AWS Elastic Beanstalk. Все файлы можно скачать из нашего репозитория на GitHub.

Работа с терминалом

Подключитесь к вашему серверу на EC2 через терминал, используя SSH-ключ.
Из домашней директории копируем архив с необходимыми файлами на сервер командой scp:

scp -i /home/user/.ssh/ssh_key.pem /home/user/brewery_dashboard.zip ubuntu@api.sample.ru:/home/ubuntu/

Распаковываем архив с помощью команды unzip, указав директорию:

unzip -d /home/ubuntu/brewery_dashboard brewery_dashboard.zip

После этого в каталоге появится папка /brewery_dashboard/, в которой среди прочих будет текстовый файл requirements.txt. В нем находятся все библиотеки Python, которые нужны для корректной работы дашборда. Устанавливаем их следующей командой:

pip install -r requirements.txt

Запускаем дашборд

Создаем сервисный файл brewery.service в системной папке /etc/systemd/system:

sudo touch brewery.service

В нем прописываем всю необходимую информацию для деплоя нашего дашборда. Текстовый редактор вызывается следующей командой:

sudo nano brewery.service

В WorkingDirectory указываем папку, в которой находятся файлы проекта, а в ExecStart команду для запуска:

[Unit]
Description=Brewery Dashboard
After=network.target

[Service]
User=ubuntu
Group=www-data
WorkingDirectory=/home/ubuntu/brewery_dashboard/
ExecStart=/usr/bin/gunicorn3 --workers 3 --bind 0.0.0.0:8083 application:application

Запускаем brewery.service следующей командой:

sudo systemctl start brewery.service

И проверяем успешность запуска:

sudo systemctl status brewery.service

Система должна ответить, что все хорошо:

Теперь дашборд доступен по публичному адресу сервера с указанием порта . Можно открыть его в браузере или вставить на любой сайт с помощью тега <iframe>:

<ifrаme id='igraph' scrolling='no' style='border:none;'seamless='seamless' src='http://54.227.137.142:8083/' height='1100' width='800'></ifrаme>

Деплой дашборда на AWS Elastic Beanstalk

Время чтения текста – 7 минут

Если под рукой имеется машина на Amazon Web Services и стоит задача развернуть веб-приложение, можно воспользоваться сервисом Elastic Beanstalk от AWS: он позволяет развертывать приложения под другими сервисами от Amazon, включая EC2.

Готовим приложение

В материале «Делаем дашборд с параметром на Python» мы создали проект с двумя файлами: application.py — скрипт с генерацией локального дашборда и get_plots.py — скрипт, возвращающий scatter plot с пивоварнями Untappd из материала «Строим scatter plot по пивоварням Untappd». Немного подкорректируем файл application.py: чтобы приложение запускалось на Elastic Beanstalk, app.server в конце файла присвоим переменной application. Должно получиться вот так:

application = app.server

if __name__ == '__main__':
   application.run(debug=True, port=8080)

Перед тем, как развернуть приложение, нужно собрать его в архив. В архиве должны присутствовать все необходимые файлы, включая requirements.txt — перечень зависимостей приложения. В нём перечислены пакеты и версии, необходимые для запуска приложения. Чтобы его создать, достаточно в директории с проектом и окружением ввести команду pip freeze и отправить вывод в файл:

pip freeze > requirements.txt

Теперь соберём архив. В unix для архивации и сжатия предусмотрена встроенная утилита zip.

zip deploy_v0 application.py get_plots.py requirements.txt

Создаём приложение и окружение

Переходим на Elastic Beanstalk в раздел «Applications». Жмём на «Create a new application».

В открывшейся странице заполняем наименование приложения и описание. Ниже предлагается присвоить приложению теги для упрощенной категоризации ресурсов. Формат вводимого тега похож на словарь Python: это пара ключ — значение, ключ должен быть уникален. После заполнения данных жмём на оранжевую кнопку «Create».

Сразу после нам покажут список окружений для приложения: изначально он пустой, поэтому нажимаем на «Create a new environment».

Так как мы работаем с веб-приложением, выбираем окружение веб-сервера:

После предлагают ввести информацию о приложении, включая домен. Можно ввести свой домен, если таковой будет свободен:

Следом выбираем платформу веб-приложения. Наше написано на Python.

Теперь загружаем само приложение: так как код мы уже написали, выбираем «Upload your code» и прикрепляем файл с архивом. После жмём «Create environment».

Следом откроется окно с логами создания окружения. Пару минут придётся подождать.

Если все сделали правильно, увидим экран с галочкой и подписью «OK»: это означает, что наше приложение успешно загружено и доступно. Если захотим загрузить новую версию, достаточно пересобрать архив с файлами и загрузить его по кнопке «Upload and deploy».

По ссылке, представленной выше можем пройти на сайт, где лежит дашборд. При помощи тега <iframe> этот дашборд можно также встроить на другой сайт:

<iframe id="igraph" scrolling="no" style="border:none;"seamless="seamless" src="http://dashboardleftjoin-env.eba-qxzgfj64.us-east-2.elasticbeanstalk.com" height="1100" width="800"></iframe>

В итоге получим такой дашборд на сайте:

Полный код проекта на GitHub

Обрабатываем нажатие кнопки в Selenium

Время чтения текста – 10 минут

В материале «Парсим данные, используя Buetiful Soup и Selenium» мы уже рассмотрели, как быть, когда данные на странице динамически подгружаются при скролле страницы. Но бывают ситуации, когда новые данные можно получить, только нажав на кнопку «Показать ещё» — сегодня узнаем, как через Selenium сымитировать нажатие кнопки для полного открытия страницы, соберём идентификаторы пива, оценки к каждому продукту и отправим данные в Clickhouse

Структура страницы

Возьмём случайную пивоварню — у неё 105 чекинов, то есть, отзывов. Страница с чекинами пивоварни показывает не более 25 чекинов и выглядит так:

Если попробуем промотать в самый низ, столкнёмся с той самой кнопкой, мешающей нам взять все 105 за раз:

Мы поступим так: выясним, к какому классу относится элемент кнопки и будем на неё нажимать, пока это возможно. Так как Selenium запускает браузер, следующая кнопка «Показать ещё» может не успеть прогрузиться, поэтому между нажатиями поставим интервал в пару секунд. Как только страница раскроется полностью — мы возьмём её содержимое и распарсим нужные данные из чекинов. Зайдём в код страницы и найдём кнопку — она относится к классу more_checkins.

У кнопки есть свойства стиля, а именно — display. В случае, если кнопка должна отображаться, display принимает значение block. Но когда промотаем страницу до самого конца, кнопку не нужно будет показывать, ведь открывать больше нечего — поэтому display кнопки примет значение none. В случае, если мы запросим у кнопки display и вернётся none будем знать, что открывать больше нечего и можно перестать жать на кнопку.

Пишем код

Начнём с импорта библиотек:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client

Chromedriver, необходимый для запуска браузера через Selenium, можно установить с официальной страницы

Подключимся к базе данных, зададим cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

О том, как запускать Selenium с cookies можно прочитать в материале «Парсим данные каталога сайта, используя Beautiful Soup и Selenium». Нам нужен параметр untappd_user_v3_e.

Так как мы планируем работать с пивоварнями, у которых более сотни тысяч чекинов, может оказаться так, что страница будет чересчур тяжёлой, и нагрузка на машину будет огромна. Чтобы этого избежать, отключим всё лишнее, а затем подключим cookie для авторизации:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

Напишем функцию, которая принимает ссылку, переходит по ней, полностью раскрывает страницу и возвращает нам soup, который можно будет распарсить. Получим display кнопки и запишем в переменную more_checkins: пока он не равен none будем нажимать на кнопку и снова получать её display. Сделаем интервал в две секунды между нажатиями, чтобы подождать прогрузку страницы. Как только будет получена вся страница, переведём её в soup библиотекой bs4.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup

Напишем следующую функцию: она тоже будет принимать url страницы, передавать его в get_html_page, получать soup и парсить его. Функция вернёт запакованные списки с идентификатором пива и оценкой к нему.

О том, как парсить элементы страницы мы уже говорили в материале «Парсим данные каталога сайта, используя Beautiful Soup».

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Заполняю списки')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)

Наконец, напишем вызов функций по пивоварням. В материале «Использование словарей в Clickhouse на примере данных Untappd» мы уже рассмотрели, как получить список идентификаторов российских пивоварен — обратимся к нему через таблицу в Clickhouse

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

Если посмотрим на brewery_list, то узнаем, что данные вернулись в неудобном формате: это список кортежей.

Небольшое лямбда-выражение позволит его «выпрямить»:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

Работать с таким списком значительно комфортнее:

Для каждой пивоварни в списке сформируем url — он состоит из стандартной ссылки и идентификатора пивоварни в конце. Отправим url в функцию parse_html_page, которая сама вызовет get_html_page и вернёт списки с beer_id и rating_score. Так как два списка вернутся упакованными можем пройти по ним итератором, сформировав кортеж и отправив его в Clickhouse.

for brewery_id in brewery_list:
    print('Беру пивоварню с id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

С кнопками на этом всё. Постепенно мы формируем отличный датасет для последующего анализа, который рассмотрим в следующем цикле статей, как только завершим сбор данных — возможно, через месяц.

Создаём материализованное представление в Clickhouse

Время чтения текста – 10 минут

В этот раз разберёмся, как с помощью Python передавать в Clickhouse данные по рекламным кампаниям и построим агрегат, используя материализованное представление.
Для чего нам материализованные представления? Часто Clickhouse используется для работы с огромными объемами данных, а время получения ответа на запрос к таблице с сырыми данными постоянно растёт. Стандартно, чтобы решить такую задачу эффективным способом, чаще всего используют ETL-процессы или создают таблицы агрегатов, что не очень удобно, ведь их необходимо регулярно пересчитывать. Clickhouse обладает встроенной и эффективной возможностью для решения задачи — материализованными представлениями.
Материализованные представления физически хранят и обновляют данные на диске в соответствии с запросом SELECT, на основе которого представление было создано. При вставке данных в искомую таблицу SELECT преобразовывает данные и вставляет их в представление.

Настройка машины
Наш скрипт на Python из предыдущих материалов необходимо подключить к Clickhouse — он будет отправлять запросы, поэтому нужно открыть несколько портов. В Dashboard AWS переходим в Network & Security — Security Groups. Наша машина входит в группу launch-wizard-1. Переходим в неё и смотрим на Inbound rules: нам нужно добавить правила как на скриншоте.

Настройка Clickhouse
Теперь настроим Clickhouse. Отредактируем файл config.xml в редакторе nano:

cd /etc/clickhouse-server
sudo nano config.xml

Воспользуйтесь мануалом по горячим клавишам, если тоже не сразу поняли, как выйти из nano.

Раскоментируем строку

<listen_host>0.0.0.0</listen_host>

чтобы доступ к базе данных был с любого IP-адреса:

Создание таблицы и материализованного представления
Зайдём в клиент и создадим нашу базу данных, в которой впоследствии создадим таблицы:

CREATE DATABASE db1
USE db1

Мы проиллюстрируем всё тот же пример сбора данных с Facebook. Информация по кампаниям может часто обновляться, и мы, в целях упражнения, хотим создать материализованное представление, которое будет автоматически пересчитывать агрегаты на основе собранных данных по затратам. Таблица в Clickhouse будет практически такой же, как DataFrame из прошлого материала. В качестве движка таблицы используем CollapsingMergeTree: он будет удалять дубликаты по ключу сортировки:

CREATE TABLE facebook_insights(
	campaign_id UInt64,
	clicks UInt32,
	spend Float32,
	impressions UInt32,
	date_start Date,
	date_stop	 Date,
	sign Int8
) ENGINE = CollapsingMergeTree
ORDER BY (date_start, date_stop)

И сразу создадим материализованное представление:

CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
	AS
	SELECT campaign_id,
		      date_start,
		      sum(spend * sign) as spent,
		      sum(impressions * sign) as impressions,
		      sum(clicks * sign) as clicks
	FROM facebook_insights
	GROUP BY date_start, campaign_id

Подробности рецепта можно посмотреть в блоге Clickhouse.

К сожалению, в Clickhouse UPDATE отсутствует, поэтому необходимо придумывать некоторые ухищрения. Мы воспользовались рецептом от команды Яндекса для обходного пути команды UPDATE. Идея состоит в том, чтобы в начале вставить строки, которые уже были в таблице с отрицательным Sign, а затем использовать Sign для сторнирования. Следуя этому рецепту старые данные не будут учитываться при суммировании.

Скрипт
Начнём писать скрипт. Понадобится новая библиотека — clickhouse_driver, позволяющая отправлять запросы к Clickhouse из скрипта на Python:

В материале приведена только доработка скрипта, описанного в статье «Собираем данные по рекламным кампаниям в Facebook». Всё будет работать, если вы просто вставите код из текущего материала в скрипт предыдущего.

from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors

Объект класса Client позволит отправлять запросы методом execute(). В host вводим свой public dns, user ставим default, port — 9000 и в database базу данных для подключения.

client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password=' ', port='9000', database='db1')

Чтобы удостовериться, что всё нормально, можно написать следующий запрос, который должен вывести наименования всех баз данных на сервере:

client.execute('SHOW DATABASES')

В случае успеха получим на экране такой список:

[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]

Пусть, например, мы хотим рассматривать данные за последние три дня. Получим эти даты библиотекой datetime и переведём в нужный формат методом strftime():

date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")

Напишем вот такой запрос, получающий все колонки таблицы за это время:

SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"

И выполним запрос, поместив информацию в список old_data_list. А затем поменяем всем sign на -1 и добавим в new_data_list:

new_data_list = []
old_data_list = []
old_data_list = client.execute(SQL_select)

for elem in old_data_list:
    elem = list(elem)
    elem[len(elem) - 1] = -1
    new_data_list.append(elem)

Наконец, напишем наш алгоритм: вставляем те же самые данные с sign = −1, оптимизируем для удаления дубликатов движком CollapsingMergeTree и выполняем INSERT новых данных со знаком sign = 1.

SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
    client.execute(SQL_optimize)

Вернёмся в Clickhouse. Выполним SELECT * FROM facebook_insights LIMIT 20, чтобы посмотреть первые 20 строк таблицы:

И SELECT * FROM fb_aggregated LIMIT 20, чтобы проверить наше представление:

Отлично! Мы сделали материализованное представление — теперь новые данные, поступающие в таблицу facebook_insights будут поступать и в материализованное представление fb_aggregated и каждый раз пересчитываться благодаря SummingMergeTree. При этом трюк с sign позволяет отлавливать уже обработанные записи и не допускать их суммирования, а CollapsingMergeTree — чистить дубликаты.

Ранее Ctrl + ↓