5 заметок с тегом

clickhouse

Обрабатываем нажатие кнопки в Selenium

В материале «Парсим данные, используя Buetiful Soup и Selenium» мы уже рассмотрели, как быть, когда данные на странице динамически подгружаются при скролле страницы. Но бывают ситуации, когда новые данные можно получить, только нажав на кнопку «Показать ещё» — сегодня узнаем, как через Selenium сымитировать нажатие кнопки для полного открытия страницы, соберём идентификаторы пива, оценки к каждому продукту и отправим данные в Clickhouse

Структура страницы

Возьмём случайную пивоварню — у неё 105 чекинов, то есть, отзывов. Страница с чекинами пивоварни показывает не более 25 чекинов и выглядит так:

Если попробуем промотать в самый низ, столкнёмся с той самой кнопкой, мешающей нам взять все 105 за раз:

Мы поступим так: выясним, к какому классу относится элемент кнопки и будем на неё нажимать, пока это возможно. Так как Selenium запускает браузер, следующая кнопка «Показать ещё» может не успеть прогрузиться, поэтому между нажатиями поставим интервал в пару секунд. Как только страница раскроется полностью — мы возьмём её содержимое и распарсим нужные данные из чекинов. Зайдём в код страницы и найдём кнопку — она относится к классу more_checkins.

У кнопки есть свойства стиля, а именно — display. В случае, если кнопка должна отображаться, display принимает значение block. Но когда промотаем страницу до самого конца, кнопку не нужно будет показывать, ведь открывать больше нечего — поэтому display кнопки примет значение none. В случае, если мы запросим у кнопки display и вернётся none будем знать, что открывать больше нечего и можно перестать жать на кнопку.

Пишем код

Начнём с импорта библиотек:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client

Chromedriver, необходимый для запуска браузера через Selenium, можно установить с официальной страницы

Подключимся к базе данных, зададим cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

О том, как запускать Selenium с cookies можно прочитать в материале «Парсим данные каталога сайта, используя Beautiful Soup и Selenium». Нам нужен параметр untappd_user_v3_e.

Так как мы планируем работать с пивоварнями, у которых более сотни тысяч чекинов, может оказаться так, что страница будет чересчур тяжёлой, и нагрузка на машину будет огромна. Чтобы этого избежать, отключим всё лишнее, а затем подключим cookie для авторизации:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

Напишем функцию, которая принимает ссылку, переходит по ней, полностью раскрывает страницу и возвращает нам soup, который можно будет распарсить. Получим display кнопки и запишем в переменную more_checkins: пока он не равен none будем нажимать на кнопку и снова получать её display. Сделаем интервал в две секунды между нажатиями, чтобы подождать прогрузку страницы. Как только будет получена вся страница, переведём её в soup библиотекой bs4.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup

Напишем следующую функцию: она тоже будет принимать url страницы, передавать его в get_html_page, получать soup и парсить его. Функция вернёт запакованные списки с идентификатором пива и оценкой к нему.

О том, как парсить элементы страницы мы уже говорили в материале «Парсим данные каталога сайта, используя Beautiful Soup».

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Заполняю списки')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)

Наконец, напишем вызов функций по пивоварням. В материале «Использование словарей в Clickhouse на примере данных Untappd» мы уже рассмотрели, как получить список идентификаторов российских пивоварен — обратимся к нему через таблицу в Clickhouse

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

Если посмотрим на brewery_list, то узнаем, что данные вернулись в неудобном формате: это список кортежей.

Небольшое лямбда-выражение позволит его «выпрямить»:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

Работать с таким списком значительно комфортнее:

Для каждой пивоварни в списке сформируем url — он состоит из стандартной ссылки и идентификатора пивоварни в конце. Отправим url в функцию parse_html_page, которая сама вызовет get_html_page и вернёт списки с beer_id и rating_score. Так как два списка вернутся упакованными можем пройти по ним итератором, сформировав кортеж и отправив его в Clickhouse.

for brewery_id in brewery_list:
    print('Беру пивоварню с id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

С кнопками на этом всё. Постепенно мы формируем отличный датасет для последующего анализа, который рассмотрим в следующем цикле статей, как только завершим сбор данных — возможно, через месяц.

Использование словарей в Clickhouse на примере данных Untappd

В Clickhouse реализована возможность использования внутренних и внешних словарей, которые могут быть альтернативой JOIN (которые, к сожалению, не всегда здорово работают). Словари хранят информацию в памяти и к ним можно обратиться командой dictGet. Рассмотрим как создать словарь в Clickhouse и как его можно использовать в запросах.

Будем изучать функционал на примере данных из API Untappd. Untappd — социальная сеть любителей крафтового пива. Мы сфокусируемся на чекинах российких крафтовых пивоварен, начнем собирать информацию о них, чтобы в следующих постах проанализировать данные и сделать некоторые выводы. В рамках этого поста разберем получение мета-информации о российских пивоварнях на Untappd, а полученные данные сохраним в словаре Clickhouse.

Собираем данные с Untappd

Для обращений к API нужны client_id и  client_secret_key — их можно получить, создав приложение. Для этого переходим в раздел создания приложения в документации и указываем некоторые данные:

После отправления заявки нужно будет подождать некоторое время: от 1 до 3 недель.

import requests
import pandas as pd
import time

Отправлять запросы к API будем через requests, а в pandas посмотрим на результаты и выгрузим в csv, чтобы отправить в словарь Clickhouse. У Untappd строгие ограничения на количество запросов: всего в час можно отправить 100 запросов, поэтому будем библиотекой time ставить скрипт в ожидание на 38 секунд, чтобы число запросов в час не превосходило 100.

client_id = 'ваш_client_id'
client_secret = 'ваш_client_secret'
all_brewery_of_russia = []

Мы хотим собрать всю тысячу российских пивоварен. Один запрос к методу Brewery Search позволяет получить до 50 пивоварен. При поиске вручную на сайте Untappd по слову «Russia» сайт выдаст 3369 пивоварен:

Проверим это: пролистаем страницу до самого низа и откроем код страницы.

Каждая полученная пивоварня в поиске находится в классе beer-item. Значит, можем в поиске посчитать количество упоминаний beer-item:

И выясняем, что на самом деле их здесь ровно 1000, а не 3369. По запросу Russia в выборку попадают и американские пивоварни, а некоторые были удалены. Значит, придётся отправить 20 запросов, будем получать по 50 пивоварен за раз:

for offset in range(0, 1000, 50):
    try:
        print('offset = ', offset)
        print('осталось:', 1000 - offset, '\n')
        response = requests.get(f'https://api.untappd.com/v4/search/brewery?client_id={client_id}&client_secret={client_secret}',
                               params={
                                   'q':'Russia',
                                   'offset':offset,
                                   'limit':50
                               })
        item = response.json()
        print(item, '\n')
        all_brewery_of_russia.append(item)
        time.sleep(37)
    except Exception:
        print(Exception)
        continue

В параметрах метод Brewery Search принимает q — строку, по которой будем осуществлять поиск на сервисе. Укажем в ней «Russia», чтобы получить все пивоварни, связанные с Россией. Другой параметр — offset — отвечает за смещение. Получив первые 50 пивоварен мы смещаемся на 50 строк в поиске, чтобы получить следующие 50 пивоварен. limit отвечает за количество получаемых пивоварен и не может быть больше 50.
Преобразовываем ответ в формат json и добавляем полученные данные в список all_brewery_of_russia. Объект item будет содержать такие данные:

Но в полученных данных могли затесаться и пивоварни других стран. Отфильтруем их: пройдём итератором по всему списку all_brewery_of_russia и добавим в итоговый только те пивоварни, у которых параметр country_name принимает значение Russia.

brew_list = []
for element in all_brewery_of_russia:
    brew = element['response']['brewery']
    for i in range(brew['count']):
        if brew['items'][i]['brewery']['country_name'] == 'Russia':
            brew_list.append(brew['items'][i])

Посмотрим на первый элемент списка brew_list:

print(brew_list[0])

Соберём из списка DataFrame с колонками brewery_id, beer_count, brewery_name, brewery_slug, brewery_page_url, brewery_city, lat и  lng. Получим в отдельные списки данные из  brewery_list:

df = pd.DataFrame()
brewery_id_list = []
beer_count_list = []
brewery_name_list = []
brewery_slug_list = []
brewery_page_url_list = []
brewery_location_city = []
brewery_location_lat = []
brewery_location_lng = []
for brewery in brew_list:
    brewery_id_list.append(brewery['brewery']['brewery_id'])
    beer_count_list.append(brewery['brewery']['beer_count'])
    brewery_name_list.append(brewery['brewery']['brewery_name'])
    brewery_slug_list.append(brewery['brewery']['brewery_slug'])
    brewery_page_url_list.append(brewery['brewery']['brewery_page_url'])
 brewery_location_city.append(brewery['brewery']['location']['brewery_city'])
    brewery_location_lat.append(brewery['brewery']['location']['lat'])
    brewery_location_lng.append(brewery['brewery']['location']['lng'])

И отправим их в DataFrame:

df['brewery_id'] = brewery_id_list
df['beer_count'] = beer_count_list
df['brewery_name'] = brewery_name_list
df['brewery_slug'] = brewery_slug_list
df['brewery_page_url'] = brewery_page_url_list
df['brewery_city'] = brewery_location_city
df['brewery_lat'] = brewery_location_lat
df['brewery_lng'] = brewery_location_lng

Посмотрим, как выглядит наша таблица:

df.head()

Отсортируем значения по  brewery_id и выгрузим таблицу в формате csv без столбца с индексами и заголовков колонок:

df = df.sort_values(by='brewery_id')
df.to_csv('brewery_data.csv', index=False, header=False)

Создаём словарь Clickhouse

Словари для Clickhouse можно создавать по-разному. Мы попробуем задать его структуру в xml-файле, настроить конфигурационные файлы сервера и обращаться к нему через клиент. Наш xml будет иметь следующую структуру:

Со всеми способами создания словарей можно ознакомиться в документации

<yandex>
<dictionary>
        <name>breweries</name>
        <source>
                <file>
                        <path>/home/ubuntu/brewery_data.csv</path>
                        <format>CSV</format>
                </file>
        </source>
        <layout>
                <flat />
        </layout>
        <structure>
                <id>
                        <name>brewery_id</name>
                </id>
                <attribute>
                        <name>beer_count</name>
                        <type>UInt64</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_name</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_slug</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_page_url</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_city</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lat</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lng</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
        </structure>
        <lifetime>300</lifetime>
</dictionary>
</yandex>

Под  идёт имя словаря. В  указываем свойства колонок. Под тегом идёт ключевое поле, а под тегом укажем путь и формат файла. Скоро мы положим его в папку /home/ubuntu, поэтому так и укажем.

Загрузим нашу csv-таблицу и xml-файл на сервер, это можно сделать, например, по ftp через FileZilla. В одном из материалов мы учились ставить Clickhouse на бесплатную машину от Amazon, в этот раз будем работать там же. В FileZilla заходим в настройки SFTP и добавляем файл с ключом:

И подключаемся к серверу по адресу, который указан в консоли EC2 машины на AWS. Укажем протокол SFTP, свой Host и в качестве User — Ubuntu:

В случае перезагрузки машины через консоль Public DNS мог измениться

После подсоединения мы попадём в папку /home/ubuntu сервера. Положим файлы туда же. Теперь подключимся по SSH через Termius. Чтобы Clickhouse увидел файл со структурой словаря, его нужно положить в папку /etc/clickhouse-server:

О том, как подключаться в серверу на AWS через SSH-клиент мы рассказывали в материале «Устанавливаем Clickhouse на AWS»

sudo mv breweries_dictionary.xml /etc/clickhouse server/

Переходим в конфигурационный файл:

cd /etc/clickhouse-server
sudo nano config.xml

Нам нужен тег  — он указывает путь к файлу, который описывает структуру словарей. Укажем путь к нашему xml:

<dictionaries_config>/etc/clickhouse-server/breweries_dictionary.xml</dictionaries_config>

Сохраняем файл и запускаем клиент Clickhouse:

clickhouse client

Проверим, что наш словарь действительно загрузился:

SELECT * FROM system.dictionaries\G

В случае успеха получим подобное:

Напишем запрос к функции dictGet, чтобы получить название пивоварни под ID 999. Указываем первым аргументом наименование словаря, затем поле, значение которого хотим получить и ID.

SELECT dictGet('breweries', 'brewery_name', toUInt64(999))

Если сделаем всё правильно, то выясним, что под ID 999 находится Балтика:

Аналогичным образом удобно использовать функцию, когда в таблице с фактами хранится только ID измерения для получения понятного наименования.

Создаём материализованное представление в Clickhouse

В этот раз разберёмся, как с помощью Python передавать в Clickhouse данные по рекламным кампаниям и построим агрегат, используя материализованное представление.
Для чего нам материализованные представления? Часто Clickhouse используется для работы с огромными объемами данных, а время получения ответа на запрос к таблице с сырыми данными постоянно растёт. Стандартно, чтобы решить такую задачу эффективным способом, чаще всего используют ETL-процессы или создают таблицы агрегатов, что не очень удобно, ведь их необходимо регулярно пересчитывать. Clickhouse обладает встроенной и эффективной возможностью для решения задачи — материализованными представлениями.
Материализованные представления физически хранят и обновляют данные на диске в соответствии с запросом SELECT, на основе которого представление было создано. При вставке данных в искомую таблицу SELECT преобразовывает данные и вставляет их в представление.

Настройка машины
Наш скрипт на Python из предыдущих материалов необходимо подключить к Clickhouse — он будет отправлять запросы, поэтому нужно открыть несколько портов. В Dashboard AWS переходим в Network & Security — Security Groups. Наша машина входит в группу launch-wizard-1. Переходим в неё и смотрим на Inbound rules: нам нужно добавить правила как на скриншоте.

Настройка Clickhouse
Теперь настроим Clickhouse. Отредактируем файл config.xml в редакторе nano:

cd /etc/clickhouse-server
sudo nano config.xml

Воспользуйтесь мануалом по горячим клавишам, если тоже не сразу поняли, как выйти из nano.

Раскоментируем строку

<listen_host>0.0.0.0</listen_host>

чтобы доступ к базе данных был с любого IP-адреса:

Создание таблицы и материализованного представления
Зайдём в клиент и создадим нашу базу данных, в которой впоследствии создадим таблицы:

CREATE DATABASE db1
USE db1

Мы проиллюстрируем всё тот же пример сбора данных с Facebook. Информация по кампаниям может часто обновляться, и мы, в целях упражнения, хотим создать материализованное представление, которое будет автоматически пересчитывать агрегаты на основе собранных данных по затратам. Таблица в Clickhouse будет практически такой же, как DataFrame из прошлого материала. В качестве движка таблицы используем ReplacingMergeTree: он будет удалять дубликаты по ключу сортировки:

CREATE TABLE facebook_insights(
	campaign_id UInt64,
	clicks UInt32,
	spend Float32,
	impressions UInt32,
	date_start Date,
	date_stop	 Date,
	sign Int8
) ENGINE = ReplacingMergeTree
ORDER BY (date_start, date_stop)

И сразу создадим материализованное представление:

CREATE MATERIALIZED VIEW fb_aggregated
ENGINE = SummingMergeTree()
ORDER BY date_start
	AS
	SELECT campaign_id,
		      date_start,
		      sum(spend * sign) as spent,
		      sum(impressions * sign) as impressions,
		      sum(clicks * sign) as clicks
	FROM facebook_insights
	GROUP BY date_start, campaign_id

Подробности рецепта можно посмотреть в блоге Clickhouse.

К сожалению, в Clickhouse UPDATE отсутствует, поэтому необходимо придумывать некоторые ухищрения. Мы воспользовались рецептом от команды Яндекса для обходного пути команды UPDATE. Идея состоит в том, чтобы в начале вставить строки, которые уже были в таблице с отрицательным Sign, а затем использовать Sign для сторнирования. Следуя этому рецепту старые данные не будут учитываться при суммировании.

Скрипт
Начнём писать скрипт. Понадобится новая библиотека — clickhouse_driver, позволяющая отправлять запросы к Clickhouse из скрипта на Python:

В материале приведена только доработка скрипта, описанного в статье «Собираем данные по рекламным кампаниям в Facebook». Всё будет работать, если вы просто вставите код из текущего материала в скрипт предыдущего.

from datetime import datetime, timedelta
from clickhouse_driver import Client
from clickhouse_driver import errors

Объект класса Client позволит отправлять запросы методом execute(). В host вводим свой public dns, user ставим default, port — 9000 и в database базу данных для подключения.

client = Client(host='ec1-2-34-56-78.us-east-2.compute.amazonaws.com', user='default', password=' ', port='9000', database='db1')

Чтобы удостовериться, что всё нормально, можно написать следующий запрос, который должен вывести наименования всех баз данных на сервере:

client.execute('SHOW DATABASES')

В случае успеха получим на экране такой список:

[('_temporary_and_external_tables',), ('db1',), ('default',), ('system',)]

Пусть, например, мы хотим рассматривать данные за последние три дня. Получим эти даты библиотекой datetime и переведём в нужный формат методом strftime():

date_start = datetime.now() - timedelta(days=3)
date_end = datetime.now() - timedelta(days=1)
date_start_str = date_start.strftime("%Y-%m-%d")
date_end_str = date_end.strftime("%Y-%m-%d")

Напишем вот такой запрос, получающий все колонки таблицы за это время:

SQL_select = f"select campaign_id, clicks, spend, impressions, date_start, date_stop, sign from facebook_insights where date_start > '{date_start_str}' AND date_start < '{date_end_str}'"

И выполним запрос, поместив информацию в список old_data_list. А затем поменяем всем sign на -1 и добавим в new_data_list:

new_data_list = []
old_data_list = []
old_data_list = client.execute(SQL_select)

for elem in old_data_list:
    elem = list(elem)
    elem[len(elem) - 1] = -1
    new_data_list.append(elem)

Наконец, напишем наш алгоритм: вставляем те же самые данные с sign = −1, оптимизируем для удаления дубликатов движком ReplacingMergeTree и выполняем INSERT новых данных со знаком sign = 1.

SQL_query = 'INSERT INTO facebook_insights VALUES'
client.execute(SQL_query, new_data_list)
SQL_optimize = "OPTIMIZE TABLE facebook_insights"
client.execute(SQL_optimize)
for i in range(len(insight_campaign_id_list)):
    client.execute(SQL_query, [[insight_campaign_id_list[i],
                                insight_clicks_list[i],
                                insight_spend_list[i],
                                insight_impressions_list[i],
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                datetime.strptime(insight_date_start_list[i], '%Y-%m-%d').date(),
                                1]])
    client.execute(SQL_optimize)

Вернёмся в Clickhouse. Выполним SELECT * FROM facebook_insights LIMIT 20, чтобы посмотреть первые 20 строк таблицы:

И SELECT * FROM fb_aggregated LIMIT 20, чтобы проверить наше представление:

Отлично! Мы сделали материализованное представление — теперь новые данные, поступающие в таблицу facebook_insights будут поступать и в материализованное представление fb_aggregated и каждый раз пересчитываться благодаря SummingMergeTree. При этом трюк с sign позволяет отлавливать уже обработанные записи и не допускать их суммирования, а ReplacingMergeTree — чистить дубликаты.

Устанавливаем Clickhouse на AWS

Сегодня поработаем с Clickhouse — поставим его на личную бесплатную машину с Amazon Web Services.

Аккаунт на AWS и машина на Ubuntu
Проще всего Clickhouse установить из deb пакетов на машину под управлением Ubuntu. Сегодня вовсе необязательно иметь такую под рукой — достаточно создать годовой бесплатный аккаунт на Amazon Web Services, который выделит нам нужную машину. Переходим на https://aws.amazon.com и создаём аккаунт и попадаем в Dashboard. В разделе «Build a solution» идём в «Launch a virtual machine» и подбираем виртуальную машину. Нам подойдёт та, на которой установлен Ubuntu Server.

Заодно создаём key pair — пара состоит из публичного ключа AWS и приватного ключа-файла. Последний нужно сохранить у себя на компьютере для подключения к машине.

После откроется консоль EC2, где уже будет запущен наш Instance виртуальной машины. У него есть public dns — сохраняем его.

Подключаемся через Termius
Подключаться к виртуальной машине будем через протокол удалённого доступа ssh. Есть много клиентов, поддерживающих этот протокол — я буду использовать Termius. Жмём на «+ NEW HOST» и заполняем информацию.
В поле address вводим наш public dns, который заранее сохранили. В Username вводим «ubuntu», поле пароля оставляем пустым. Чтобы заполнить Key нужно добавить новый ключ — то есть, указать путь к файлу с расширением .pem, который мы получили при создании новой виртуальной машины. Должно получиться что-то такое:

Сразу после авторизации подключаемся к машине и получаем новый экран с консолью:

Для начала установим Clickhouse — это быстро. Выполним следующую команду, чтобы добавить репозиторий Clickhouse к нашим APT репозиториям:

Подробнее со всеми способами установки Clickhouse можно ознакомиться в документации

echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | sudo tee /etc/apt/sources.list.d/clickhouse.list

Обязательно обновляем пакеты:

sudo apt-get update

Наконец, установим клиент и сервер командой:

sudo apt-get install -y clickhouse-server clickhouse-client

Готово! Клиент и сервер Clickhouse установлены на виртуальной машине. Запустим сервер, чтобы подсоединиться позже через клиент:

sudo service clickhouse-server start

И проверим успешность запуска командой:

sudo service clickhouse-server status

Получим примерно такой результат:

Clickhouse установлен! Для подключения к клиенту необходимо ввести:

clickhouse client

Чтобы убедиться, что всё работает как надо, документация предлагает ввести запрос

SELECT 1

Вот, что мы должны получить:

Готово! А в следующем материале поработаем в связке Python + Clickhouse: вернёмся к нашему скрипту, который получает информацию по рекламным кампаниям и сделаем так, чтобы эти данные собирались в таблицу и материализованное представление.

 2 комментария    34   1 мес   Amazon Web Services   AWS   clickhouse   Data analytics

Clickhouse в качестве consumer для Amazon MSK

Disclaimer: заметка носит технический характер, поэтому может быть интересна меньшему числу лиц с бизнес-бэкграундом.

В этом блоге еще ни разу не затрагивалась тема Clickhouse, одной из самых быстрых баз данных от компании Яндекс. Краткая справка без деталей: Clickhouse — наиболее эффективно написанная СУБД колоночного типа с точки зрения программного кода, информация о СУБД довольно подробно описана в документации и во множестве видео на Youtube (раз, два, три).

В своей практике последние четыре года я использовал Clickhouse в качестве аналитика и эксперта по построению аналитической отчетности. В основном для решения задачи визуализации отчетности / отчетов с параметрами / дашбордов использовался Redash как наиболее удобный интерфейс для доступа к данным Clickhouse.
Однако совсем недавно в Looker, о котором я рассказывал ранее, появилась возможность подключить Clickhouse в качестве источника данных. Следует заметить, что подключение к Clickhouse в Tableau существует уже довольно давно.

Архитектура аналитического сервиса, в основе которого лежит Clickhouse, в основном облачная. И в рассматриваемой задаче было именно так. Предположим, у вас существует выделенный instance EC2 в Amazon (на который вы установили Clickhouse) и отдельный Kafka-кластер (решение Amazon MSK).

Задача: подключить Clickhouse в качестве consumer для получения информации с брокеров вашего кластера Kafka. На самом деле, в документации на сайте Amazon MSK довольно подробно описано как именно подключаться к кластеру Kafka, не буду дублировать эту информацию. В моем случае гайд помог: топики создавались продюсером с машины, на которой установлен Clickhouse и с неё читались консюмером.

Но возникла проблема: при подключении Clickhouse к Kafka в качестве консюмера, происходила следующая ошибка:

020.02.02 18:01:56.209132 [ 46 ] {e7124cd5-2144-4b1d-bd49-8a410cdbd607} <Error> executeQuery: std::exception. Code: 1001, type: cppkafka::HandleException, e.what() = Local: Timed out, version = 20.1.2.4 (official build) (from 127.0.0.1:46586) (in query: SELECT * FROM events), Stack trace (when copying this message, always include the lines below):

Продолжительное время я искал информацию в документации Clickhouse о том, что может вызывать эту ошибку, но не смог ничего найти. Следующей мыслью стала проверка работы локального брокера Kafka с той же машины. Установил клиент Kafka, подключил Clickhouse, отправил данные в топик и Clickhouse с легкостью их прочитал, т. е. консюмер Clickhouse работает с локальным брокером, а значит и вовсе работает.

Поговорив со всеми своими знакомыми экспертами в области инфраструктуры и Clickhouse (Вася, Макс, привет!), с ходу мы не смогли определить в чем проблема. Проверили firewall, настройки сети, все было открыто. Подтверждалось также тем, что с локальной машины можно было отправить в топик удаленного брокера Kafka сообщения командой bin/kafka-console-producer.sh и прочитать оттуда же bin/kafka-console-consumer.sh.

Затем мне пришла в голову мысль обратиться к главному гуру и разработчику Clickhouse — Алексею Миловидову. Алексей с радостью постарался ответить на возникшие вопросы и предложил ряд гипотез, которые мы проверили (трассировку сетевых подключений и т. п.), однако и после более низкоуровневого аудита проблему локализовать не удалось. Тогда Алексей посоветовал обратиться к Михаилу Филимонову из компании Altinity. Михаил оказался очень отзывчивым экспертом и одну за другой предлагал гипотезы для проверки (параллельно подсказывая как именно будет лучше их проверить).

В итоге в ходе совместных усилий мы обнаружили, что проблема возникает у библиотеки librdkafka, так как другой пакет kafkacat, который использует эту же библиотеку отваливается от подключения к брокеру с той же проблемой (Local: timed out).

После изучения подключения через bin/kafka-console-consumer.sh и параметров подключения, Михаил посоветовал добавить такую строку в /etc/clickhouse-server/config.xml:

<kafka><security_protocol>ssl</security_protocol></kafka>

И, о чудо! Clickhouse подключился к кластеру и вытянул необходимые данные с брокера.

Надеюсь, этот рецепт и мой опыт позволит вам сэкономить время и силы на изучение похожей проблемы :)

 Нет комментариев    55   4 мес   clickhouse   Data engineering   expert   troubleshooting   yandex