8 заметок с тегом

clickhouse

Частотный словарь и биграммы по постам инвесторов

Время чтения текста – 16 минут

Тинькофф Инвестиции — сервис от Тинькофф Банка для инвестирования на Московской и Санкт-Петербургской биржах. Внутри сервиса есть социальная сеть «Пульс», где инвесторы любого уровня могут делиться своими опытом, мыслями и планами, комментировать и оценивать чужие посты. Сегодня решим такую задачу:
построим частотный словарь и биграммы по постам пользователей, разделив их по объёму портфеля, чтобы понять, чем отличаются посты людей с разным объёмом инвестиций.

Лента по ценной бумаге в Пульсе выглядит вот так:

У каждого инвестора есть личный профиль. Внутри отображается объём портфеля, статистика прироста за год и число сделок за последний месяц.

Схема в Clickhouse

Для биграмм и частотного словаря достаточно собрать только тексты постов, логины пользователей и объёмы портфеля, но ради спортивного интереса будем хранить ещё и рост портфеля, число сделок человека за месяц и количество оценок под его постом. Для хранения данных получится две таблицы posts и users:

CREATE TABLE tinkoff.posts
(
    `login` String,
    `post` String,
    `likes` Int16
)
ENGINE = MergeTree
ORDER BY login

 CREATE TABLE tinkoff.users
(
    `login` String,
    `volume_prefix` String,
    `volume` String,
    `year_stats_prefix` String,
    `year_stats` String
)
ENGINE = MergeTree()
ORDER BY login

В таблице с пользователями volume_prefix — это префикс «до» или «от», стоящий в объёме портфеля, а volume — сам объём портфеля. Соответственно years_stats_prefix обозначает, портфель за год упал или вырос, а year_stats — на сколько он упал или вырос. Такая схема из двух таблиц с ключом сортировки таблиц по полю login позволит их соединить позднее.

Пишем парсер постов

У Пульса нет своего API, поэтому для парсинга постов будем использовать Selenium.

Мы уже писали про то, как парсить сайты с прокруткой при помощи Selenium

Нам понадобятся следующие библиотеки:

from selenium import webdriver
import time
from webdriver_manager.chrome import ChromeDriverManager
from clickhouse_driver import Client
from bs4 import BeautifulSoup as bs
import pandas as pd
import requests
import os
from lxml import html
import re

Сразу составим список интересующих ценных бумаг: это акции Сбербанка, Газпрома, Яндекса, Лукойла, MailRu, Аэрофлота, Киви, ВТБ, Детского Мира и Ленты. Для каждой бумаги будем переходить на страницу с постами, в которых она упоминается и проматывать страницу, пока длина страницы не станет более 300000: это около одной недели.

driver = webdriver.Chrome(ChromeDriverManager().install())

securities = ['SBER', 'GAZP', 'YNDX', 'LKOH', 'MAIL', 'AFLT', 'QIWI', 'VTBR', 'DSKY', 'LNTA']
        
for security in securities:
    try:
        print(security)
        driver.get(f'https://www.tinkoff.ru/invest/stocks/{security}/pulse/')
        
        page_length = driver.execute_script("return document.body.scrollHeight")
        while page_length < 300000:
            driver.execute_script(f"window.scrollTo(0, {page_length - 1000});")
            page_length = driver.execute_script("return document.body.scrollHeight")

После забираем себе весь код полученной страницы и извлекаем нужную информацию: логины, текст постов и число лайков. Формируем из них DataFrame и записываем его в папку data.

source_data = driver.page_source
        soup = bs(source_data, 'lxml')
        
        posts = soup.find_all('div', {'class':'PulsePostCollapsed__text_1ypMP'})
        logins = soup.find_all('div', {'class':'PulsePostAuthor__nicknameLink_19Aca'})
        likes = soup.find_all('div', {'class':'PulsePostBody__likes_3qcu0'})
        
        logins = [login.text for login in logins]
        posts = [post.text for post in posts]
        likes = [like.text.split()[0] for like in likes]
        
        df_posts = pd.DataFrame()
        df_posts['login'] = logins
        df_posts['post'] = posts
        df_posts['likes'] = likes
        
        df_posts.to_csv(f'data/{security}.csv', index=False)
        
        print(f'SAVED {security}')
    except Exception as E:
        print(E)

После того, как нужные посты собраны, отправим их в таблицу posts в Clickhouse. При помощи модуля os переходим в директорию data и собираем в список all_files названия всех файлов в ней — это все csv-таблицы, которые мы спарсили. Затем по очереди читаем файл в DataFrame и вставляем в posts.

client = Client(host='', user='', password='', port='9000', database='tinkoff')

os.chdir('data')
all_files = os.listdir()

for file in all_files:
    df = pd.read_csv(file)
    client.execute("INSERT INTO posts VALUES", df.to_dict('records'))

Собираем информацию о профилях

Чтобы собрать все профили, получим уникальный список логинов из базы:

flatten = lambda t: [item for sublist in t for item in sublist]
logins = flatten(client.execute("SELECT DISTINCT login FROM posts"))

Получать посты будем request-запросом, без Selenium, ведь ничего листать уже не нужно. Но иконка падения или роста портфеля за год не является текстом и получить её нельзя, зато внутри CSS-стилей можно увидеть её цвет — его мы и будем сохранять себе.

Поэтому опишем такую функцию: она примет объект soup и извлечёт цвет иконки.

headers = {'accept': '*/*',
           'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'}

def up_or_down_color(soup):
    string = str(soup.find('div', {'class':'Icon__container_3u7WK'}))
    start_index = string.find('style')
    color = string[start_index + 13:start_index + 20]
    return color

А теперь напишем парсер, который проходит по списку логинов, для каждого отправляет запрос и собирает всю статистику профиля. Причём если цвет иконки роста зеленый, то в поле year_stats_prefix добавим «+», иначе «-». В конце сделаем паузу на 0.2 секунды на всякий случай, чтобы не напороться на неявные ограничения.

session = requests.Session()

login_list = []
volume_list = []
volume_prefix_list = []
year_stats_list = []
year_stats_prefix_list = []

count = 0
for login in logins:
    print(count, '/', len(logins))
    
    try:
        count += 1
        if login == 'blocked_user':
            continue

        url = f'https://www.tinkoff.ru/invest/social/profile/{login}'
        request = session.get(url, headers=headers)
        soup = bs(request.content, 'lxml')

        try:
            login = soup.find('div', {'class':'ProfileHeader__nickname_1oynx'}).text
            volume = soup.find('span', {'class':'Money__money_3_Tn4'}).text
            _to = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('До')
            _from = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('От')
            year_stats = soup.find('div', {'class':'ProfileHeader__statisticsItem_1HPLt'}).text
            color = up_or_down_color(soup)
        except AttributeError as E:
            print(login, E)
            continue
        volume_list.append(volume)
        login_list.append(login)

        if _to == -1:
            volume_prefix_list.append('до')
        else:
            volume_prefix_list.append('от')

        year_stats_list.append(re.findall(r'\d+.+', year_stats)[0])

        if color == '#22a053':
            year_stats_prefix_list.append('-')
        elif color == '#dd5656':
            year_stats_prefix_list.append('+')
        else:
            year_stats_prefix_list.append('')
    except Exception as E:
        print(E)
        continue
    time.sleep(0.2)

Собираем все аккаунты и статистику по ним в DataFrame. Их тоже сохраним себе в базу.

df_users = pd.DataFrame()
df_users['login'] = login_list
df_users['volume_prefix'] = volume_prefix_list
df_users['volume'] = volume_list
df_users['year_stats_prefix'] = year_stats_prefix_list
df_users['year_stats'] = year_stats_list

client.execute("INSERT INTO users VALUES", df_users.to_dict('records'))

А теперь сделаем LEFT JOIN таблицы с постами к таблице с пользователями, чтобы у каждой строки с постом была ещё статистика по аккаунту автора. Запишем результат в DataFrame.

posts_with_users = client.execute('''
    SELECT login, post, likes, volume_prefix, volume, year_stats_prefix, year_stats FROM posts
    LEFT JOIN users
    ON posts.login = users.login
''')
posts_with_users_df = pd.DataFrame(posts_with_users, columns=['login', 'post', 'likes', 'volume_prefix', 'volume', 'year_stats_prefix', 'year_stats'])

Полученный результат будет выглядеть так:

Частотный словарь и биграммы

Для начала составим частотный словарь по постам без разделений на группы.

posts_with_users_df.post.str.split(expand=True).stack().value_counts()

Получим, что предлоги и союзы превалируют над остальными словами:

Значит, их нужно убрать из сета данных. Но даже в том случае, если данные будут очищены, получим что-то подобное. Такие данные ни о чём не говорят, и никаких выводов сделать не удастся.

В таком случае попробуем построить биграммы. Одна биграмма — последовательность из двух элементов, то есть два слова, стоящие рядом друг с другом. Существует много алгоритмов построения n-грамм разной степени оптимизации, мы воспользуемся встроенной функцией в nltk и разберём пример построения биграмм для одной группы. Первым делом импортируем дополнительные библиотеки, загружаем stopwords для русского языка и чистим данные. В список стоп-слов вносим дополнительные: среди них будут и тикеры акций, которые встречаются в каждом посте.

import nltk
from nltk.corpus import stopwords
from pymystem3 import Mystem
from string import punctuation
import unicodedata
import collections

nltk.download("stopwords")
nltk.download('punkt')
russian_stopwords = stopwords.words("russian")
append_stopword = ['это', 'sber', 'акция', 'компания', 'aflt', 'gazp', 'yndx', 'lkoh', 'mail', 'год', 'рынок', 'https', 'млрд', 'руб', 'www', 'кв']
russian_stopwords.extend(append_stopword)

Опишем функцию для подготовки текста, которая переведёт все слова в нижний регистр, приведёт к нормальной форме, удалит стоп-слова и пунктуацию:

mystem = Mystem() 

def preprocess_text(text):
    tokens = mystem.lemmatize(text.lower())
    tokens = [token for token in tokens if token not in russian_stopwords\
              and token != " " \
              and token.strip() not in punctuation]
    
    text = " ".join(tokens)
    
    return text

posts_with_users_df.post = posts_with_users_df.post.apply(preprocess_text)

Для примера возьмём посты группы инвесторов с объёмом портфеля до 10 тысяч рублей и построим биграммы, а затем выведем самые частые:

up_to_10k_df = posts_with_users_df[(posts_with_users_df['volume_prefix'] == 'от') & (posts_with_users_df['volume'] == '10 000 ₽')]

up_to_10k_counts = collections.Counter()
for sent in up_to_10k_df["post"]:
    words = nltk.word_tokenize(sent)
    up_to_10k_counts.update(nltk.bigrams(words))
up_to_10k_counts.most_common()

Получаем такой список:

Результаты исследования биграмм

В группе с объёмом портфеля до 10 и 100 тысяч руб. инвесторы чаще пишут о личном опыте и полученной прибыли: на это указывают биграммы «чистая прибыль» и «финансовый результат».

До 10 000 руб:

  1. добрый утро, 44
  2. цена нефть, 36
  3. неквалифицированный инвестор, 36
  4. чистый прибыль, 32
  5. шапка профиль, 30
  6. московский биржа, 30
  7. совет директор, 28

До 100 000 руб:

  1. чистый прибыль, 80
  2. финансовый результат, 67
  3. добрый утро, 66
  4. индекс мосбиржа, 63
  5. цена нефть, 58
  6. квартал 2020, 42
  7. мочь становиться 41

В группе до 500 тысяч руб. впервые появляются биграммы со словами «подписываться», «выкладывать», «новость» — инвесторы с такими характеристиками портфеля часто заводят собственные блоги об инвестировании и продвигают их через посты в Пульсе.

До 500 000 руб:

  1. чистый прибыль, 169
  2. квартал 2020, 154
  3. отчетность 3, 113
  4. выкладывать новость, 113
  5. 🤝подписываться, 80
  6. подписываться выкладывать, 80
  7. публиковать отчёт, 80
  8. цена нефть, 76
  9. колво бумага, 69
  10. вес портфель, 68

В биграммах группы с объёмом портфеля до и от 1 миллиона руб. появляется «фьючерс», что логично — это сложный инструмент, который обычно не рекомендуется новичкам. Кроме того, в постах группы проходит больше обсуждений отчётностей компаний — это биграммы «финансовый отчетность», «опубликовать финансовый», «отчетность мсфо».

До 1 000 000 руб:

  1. 3 квартал, 183
  2. квартал 2020, 157
  3. фьчерс утро, 110
  4. финансовый отчетность, 107
  5. опубликовать финансовый, 104
  6. чистый прибыль, 75
  7. наш биржа, 72
  8. отчетность мсфо, 69
  9. цена нефть, 67
  10. операционный результат, 61
  11. ноябрьский фьючерс, 54
  12. азиатский площадка, 51

От 1 000 000 руб:

  1. октябрь опубликовывать, 186
  2. 3 квартал, 168
  3. квартал 2020, 168
  4. финансовый отчетность, 159,
  5. опубликовывать финансовый, 95
  6. чистый прибыль, 94
  7. операционный результат, 86
  8. целевой цена, 74
  9. опубликовывать операционный, 63
  10. цена повышать, 60
 Нет комментариев    177   12 дн   clickhouse   Data Analytics   data science   nltk   python

Собираем топ-10 аккаунтов Instagram по теме аналитики и машинного обучения

Время чтения текста – 11 минут

В некоторых телеграм-каналах (раз, два) уже говорилось про другие интересные паблики в телеграме, однако по Instagram такого топа пока не было. Вероятно, это не самая популярная сеть для контента в нашей индустрии, тем не менее, можно проверить эту гипотезу, используя Python и данные. В этом материале рассказываем, как собрать данные по аккаунтам Instagram без API.

Метод сбора данных
Instagram API не позволит вам просто так собирать данные о других пользователях, но есть и другой метод. Можно отправить такой request-запрос:

https://instagram.com/leftjoin/?__a=1

И получить в ответе JSON-объект со всей информацией о пользователе, которую можно посмотреть самому: имя аккаунта, количество постов, подписок и подписчиков, а также первые десять постов с информацией про них: количество лайков, комментарии и прочее. Именно на таких request-запросах устроена библиотека pyInstagram.

Схема данных
Будем собирать данные в три таблицы Clickhouse: пользователи, посты и комментарии. В таблицу пользователей собираем всю информацию о них: идентификатор, наименование аккаунта, имя и фамилия человека, описание профиля, количество подписок и подписчиков, количество постов, суммарное количество комментариев и лайков, наличие верификации, география пользователя и ссылки на аватарку и Facebook.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблицу с постами сохраняем автора поста, идентификатор записи, текст, количество комментариев и прочее. is_ad, is_album и is_video — поля, проверяющие, является ли запись рекламной, «каруселью» изображений или видеозаписью.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблице с комментариями храним отдельно каждый комментарий к записи с автором и текстом.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Скрипт
Из библиотеки pyInstagram нам понадобятся классы Account, Media, WebAgent и Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Создаем экземпляр класса WebAgent — он необходим для вызова некоторых методов и обновления аккаунтов. В начале нам нужно иметь хотя бы названия профилей пользователей, информацию о которых мы хотим собрать, поэтому отправим другой request-запрос для поиска пользователей по ключевым словам, их список ниже в фрагменте кода. В выдаче будут аккаунты, у которых название или описание профиля совпало с ключевым словом.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='12.34.56.789', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Проходим по всем ключевым словам и собираем все аккаунты. Так как в списке могли образоваться дубликаты, переведём список в множество и обратно в список.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))

Теперь проходим по списку аккаунтов, и если аккаунта с таким наименованием ещё не было в базе, то получаем расширенную информацию о нём. Для этого пробуем создать экземпляр класса Account, передав username параметром. После при помощи объекта agent обновляем информацию об аккаунте. Будем собирать только первые 100 постов, чтобы сбор не задерживался. Создадим список media_list — он при помощи метода get_media будет хранить код каждого поста, который затем можно будет получить при помощи класса Media.


Сбор медиа аккаунта

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0

Мы начинаем с постов и комментариев, потому что для занесения в базу нового пользователя нам нужно подсчитать сперва суммарное количество комментариев и лайков в его аккаунте. Практически все интересующие поля являются атрибутами класса Media.


Сбор постов пользователя

for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Чтобы собрать комментарии необходимо вызвать метод get_comments и передать параметром экземпляр класса Media.


Сбор комментариев из поста

comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)


Наконец, когда все посты и комментарии пройдены, можем занести информацию о пользователе.

Сбор информации о пользователе

user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)

Результаты
Таким методом нам удалось собрать 500 пользователей, 20 тысяч постов и 40 тысяч комментариев. Теперь можем написать простой запрос к базе и получить топ-10 Instagram-аккаунтов по теме аналитики и машинного обучения за последнее время:

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

А вот и приятный бонус, для тех, кто искал на какие аккаунты в Instagram подписаться по релевантной тематике:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

Полный код проекта доступен на GitHub

Анализ рынка вакансий аналитики и BI: дашборд в Tableau

Время чтения текста – 16 минут

По данным рейтинга SimilarWeb, hh.ru — третий по популярности сайт о трудоустройстве в мире. В одном из разговоров с Ромой Буниным у нас появилась идея сделать совместный проект: собрать данные из открытого HeadHunter API и визуализировать их при помощи Tableau Public. Нам захотелось понять, как меняется зарплата в зависимости от указанных в вакансии навыков, наименования позиции и сравнить, как обстоят дела в Москве, Санкт-Петербурге и регионах.

Как мы собирали данные?

Схема данных основана на коротком представлении вакансии, которую возвращает метод GET /vacancies. Из представления собираются следующие поля: тип вакансии, идентификатор, премиальность вакансии, необходимость прохождения тестирования, адрес компании, информация о зарплате, график работы и другие. Соответствующий CREATE-запрос для таблицы:


Запрос создания таблицы vacancies_short

CREATE TABLE headhunter.vacancies_short
(
    `added_at` DateTime,
    `query_string` String,
    `type` String,
    `level` String,
    `direction` String,
    `vacancy_id` UInt64,
    `premium` UInt8,
    `has_test` UInt8,
    `response_url` String,
    `address_city` String,
    `address_street` String,
    `address_building` String,
    `address_description` String,
    `address_lat` String,
    `address_lng` String,
    `address_raw` String,
    `address_metro_stations` String,
    `alternate_url` String,
    `apply_alternate_url` String,
    `department_id` String,
    `department_name` String,
    `salary_from` Nullable(Float64),
    `salary_to` Nullable(Float64),
    `salary_currency` String,
    `salary_gross` Nullable(UInt8),
    `name` String,
    `insider_interview_id` Nullable(UInt64),
    `insider_interview_url` String,
    `area_url` String,
    `area_id` UInt64,
    `area_name` String,
    `url` String,
    `published_at` DateTime,
    `employer_url` String,
    `employer_alternate_url` String,
    `employer_logo_urls_90` String,
    `employer_logo_urls_240` String,
    `employer_logo_urls_original` String,
    `employer_name` String,
    `employer_id` UInt64,
    `response_letter_required` UInt8,
    `type_id` String,
    `type_name` String,
    `archived` UInt8,
    `schedule_id` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY vacancy_id

Первый скрипт собирает данные с HeadHunter по API и отправляет их в Clickhouse. Он использует следующие библиотеки:

import requests
from clickhouse_driver import Client
from datetime import datetime
import pandas as pd
import re

Далее загружаем таблицу с запросами и подключаемся к CH:

queries = pd.read_csv('hh_data.csv')
client = Client(host='1.234.567.890', user='default', password='', port='9000', database='headhunter')

Таблица queries хранит список поисковых запросов. Она содержит следующие колонки: тип запроса, уровень вакансии для поиска, направление вакансии и саму поисковую фразу. В строку с запросом можно помещать логические операторы: например, чтобы найти вакансии, в которых должны присутствовать ключевые слова «Python», «data» и «анализ» между ними можно указать логическое «И».

Не всегда вакансии в выдаче соответствуют ожиданиям: случайно в базу могут попасть повара, маркетологи и администраторы магазина. Чтобы этого не произошло, опишем функцию check_name(name) — она будет принимать наименование вакансии и возвращать True в случае, если вакансия не подошла по названию.

def check_name(name):
    bad_names = [r'курьер', r'грузчик', r'врач', r'менеджер по закупу',
           r'менеджер по продажам', r'оператор', r'повар', r'продавец',
          r'директор магазина', r'директор по продажам', r'директор по маркетингу',
          r'кабельщик', r'начальник отдела продаж', r'заместитель', r'администратор магазина', 
          r'категорийный', r'аудитор', r'юрист', r'контент', r'супервайзер', r'стажер-ученик', 
          r'су-шеф', r'маркетолог$', r'региональный', r'ревизор', r'экономист', r'ветеринар', 
          r'торговый', r'клиентский', r'начальник цеха', r'территориальный', r'переводчик', 
          r'маркетолог /', r'маркетолог по']
    for item in bad_names:
        if re.match(item, name):
            return True

Затем объявляем бесконечный цикл — мы собираем данные без перерыва. Идём по DataFrame queries и сразу забираем оттуда тип вакансии, уровень, направление и поисковый запрос в отдельные переменные. Сначала по ключевому слову отправляем один запрос к методу /GET vacancies и получаем количество страниц. После идём от нулевой до последней страницы, отправляем те же запросы и заполняем список vacancies_from_response с полученными в выдаче короткими представлениями всех вакансий. В параметрах указываем 10 вакансий на страницу — больше ограничения HH API получить не позволяют. Так как мы не указали параметр area, API возвращает вакансии по всему миру.

while True:
   for query_type, level, direction, query_string in zip(queries['Тип'], queries['Уровень'], queries['Направление'], queries['Ключевое слово']):
           print(f'ключевое слово: {query_string}')
           url = 'https://api.hh.ru/vacancies'
           par = {'text': query_string, 'per_page':'10', 'page':0}
           r = requests.get(url, params=par).json()
           added_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
           pages = r['pages']
           found = r['found']
           vacancies_from_response = []

           for i in range(0, pages + 1):
               par = {'text': query_string, 'per_page':'10', 'page':i}
               r = requests.get(url, params=par).json()
               try:
                   vacancies_from_response.append(r['items'])
               except Exception as E:
                   continue

Теперь проходим по каждой вакансии на каждой странице двойным итератором. Сперва отправим запрос к Clickhouse и проверим, нет ли уже в базе вакансии с таким идентификатором и таким поисковым запросом. Если проверка пройдена — проверяем название вакансии. В случае неудачи переходим к следующей.

for item in vacancies_from_response:
               for vacancy in item:
                   if client.execute(f"SELECT count(1) FROM vacancies_short WHERE vacancy_id={vacancy['id']} AND query_string='{query_string}'")[0][0] == 0:
                       name = vacancy['name'].replace("'","").replace('"','')
                       if check_name(name):
                           continue

Теперь проходим по вакансии и собираем все нужные поля. В случае отсутствия некоторых данных будем отправлять пустые строки:


Код для сбора данных о вакансии

vacancy_id = vacancy['id']
                       is_premium = int(vacancy['premium'])
                       has_test = int(vacancy['has_test'])
                       response_url = vacancy['response_url']
                       try:
                           address_city = vacancy['address']['city']
                           address_street = vacancy['address']['street']
                           address_building = vacancy['address']['building']
                           address_description = vacancy['address']['description']
                           address_lat = vacancy['address']['lat']
                           address_lng = vacancy['address']['lng']
                           address_raw = vacancy['address']['raw']
                           address_metro_stations = str(vacancy['address']['metro_stations']).replace("'",'"')
                       except TypeError:
                           address_city = ""
                           address_street = ""
                           address_building = ""
                           address_description = ""
                           address_lat = ""
                           address_lng = ""
                           address_raw = ""
                           address_metro_stations = ""
                       alternate_url = vacancy['alternate_url']
                       apply_alternate_url = vacancy['apply_alternate_url']
                       try:
                           department_id = vacancy['department']['id']
                       except TypeError as E:
                           department_id = ""
                       try:
                           department_name = vacancy['department']['name']
                       except TypeError as E:
                           department_name = ""
                       try:
                           salary_from = vacancy['salary']['from']
                       except TypeError as E:
                           salary_from = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_to = vacancy['salary']['to']
                       except TypeError as E:
                           salary_to = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_currency = vacancy['salary']['currency']
                       except TypeError as E:
                           salary_currency = ""
                       try:
                           salary_gross = int(vacancy['salary']['gross'])
                       except TypeError as E:
                           salary_gross = "cast(Null as Nullable(UInt8))"
                       try:
                           insider_interview_id = vacancy['insider_interview']['id']
                       except TypeError:
                           insider_interview_id = "cast(Null as Nullable(UInt64))"
                       try:
                           insider_interview_url = vacancy['insider_interview']['url']
                       except TypeError:
                           insider_interview_url = ""
                       area_url = vacancy['area']['url']
                       area_id = vacancy['area']['id']
                       area_name = vacancy['area']['name']
                       url = vacancy['url']
                       published_at = vacancy['published_at']
                       published_at = datetime.strptime(published_at,'%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S')
                       try:
                           employer_url = vacancy['employer']['url']
                       except Exception as E:
                           print(E)
                           employer_url = ""
                       try:
                           employer_alternate_url = vacancy['employer']['alternate_url']
                       except Exception as E:
                           print(E)
                           employer_alternate_url = ""
                       try:
                           employer_logo_urls_90 = vacancy['employer']['logo_urls']['90']
                           employer_logo_urls_240 = vacancy['employer']['logo_urls']['240']
                           employer_logo_urls_original = vacancy['employer']['logo_urls']['original']
                       except Exception as E:
                           print(E)
                           employer_logo_urls_90 = ""
                           employer_logo_urls_240 = ""
                           employer_logo_urls_original = ""
                       employer_name = vacancy['employer']['name'].replace("'","").replace('"','')
                       try:
                           employer_id = vacancy['employer']['id']
                       except Exception as E:
                           print(E)
                       response_letter_required = int(vacancy['response_letter_required'])
                       type_id = vacancy['type']['id']
                       type_name = vacancy['type']['name']
                       is_archived = int(vacancy['archived'])

Последнее поле — график работы. В случае, если вакансия подразумевает вахтовый метод работы она нам точно не подходит.

try:
    schedule = vacancy['schedule']['id']
except Exception as E:
    print(E)
    schedule = ''"
if schedule == 'flyInFlyOut':
    continue

Теперь формируем список из полученных переменных, заменяем в нём None-значения на пустые строки во избежании конфликтов с Clickhouse и вставляем строку в таблицу.

vacancies_short_list = [added_at, query_string, query_type, level, direction, vacancy_id, is_premium, has_test, response_url, address_city, address_street, address_building, address_description, address_lat, address_lng, address_raw, address_metro_stations, alternate_url, apply_alternate_url, department_id, department_name,
salary_from, salary_to, salary_currency, salary_gross, insider_interview_id, insider_interview_url, area_url, area_name, url, published_at, employer_url, employer_logo_urls_90, employer_logo_urls_240,  employer_name, employer_id, response_letter_required, type_id, type_name, is_archived, schedule]
for index, item in enumerate(vacancies_short_list):
    if item is None:
        vacancies_short_list[index] = ""
tuple_to_insert = tuple(vacancies_short_list)
print(tuple_to_insert)
client.execute(f'INSERT INTO vacancies_short VALUES {tuple_to_insert}')

Как подключили Tableau к данным?

Tableau Public не умеет работать с базами данных, поэтому мы написали коннектор Clickhouse к Google Sheets. Он использует библиотеки gspread и oauth2client для авторизации в Google Spreadsheets API и библиотеку schedule для ежедневной работы по графику.

Работа с Google Spreadseets API подробно разобрана в материале «Собираем данные по рекламным кампаниям ВКонтакте»

import schedule
from clickhouse_driver import Client
import gspread
import pandas as pd
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime

scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
client = Client(host='54.227.137.142', user='default', password='', port='9000', database='headhunter')
creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
gc = gspread.authorize(creds)

Опишем функцию update_sheet() — она будет брать все данные из Clickhouse и вставлять их в таблицу Google Docs.

def update_sheet():
   print('Updating cell at', datetime.now())
   columns = []
   for item in client.execute('describe table headhunter.vacancies_short'):
       columns.append(item[0])
   vacancies = client.execute('SELECT * FROM headhunter.vacancies_short')
   df_vacancies = pd.DataFrame(vacancies, columns=columns)
   df_vacancies.to_csv('vacancies_short.csv', index=False)
   content = open('vacancies_short.csv', 'r').read()
   gc.import_csv('1ZWS2kqraPa4i72hzp0noU02SrYVo0teD7KZ0c3hl-UI', content.encode('utf-8'))

Чтобы скрипт запускался в 16:00 по МСК каждый день используем библиотеку schedule:

schedule.every().day.at("13:00").do(update_sheet)
while True:
   schedule.run_pending()

А что в результате?

Рома построил на полученных данных дашборд.

И в youtube-ролике рассказывает о том, как эффективно использовать дашборд

Инсайты, которые можно извлечь из дашборда

  1. Аналитики с навыком бизнес-аналитики востребованы на рынке больше всего: по такому запросу нашлось больше всего вакансий. Тем не менее, средняя зарплата выше у продуктовых аналитиков и аналитиков BI.
  2. В Москве средние зарплаты выше на 10-30 тысяч рублей, чем в Санкт-Петербурге и на 30-40 тысячи рублей, чем в регионах. Там же работы нашлось больше всего в России.
  3. Самые высокооплачиваемые должности: руководитель отдела аналитики (в среднем, 110 тыс. руб. в месяц), инженер баз данных (138 тыс. руб. в месяц) и директор по машинному обучению (250 тыс. руб. в месяц).
  4. Самые полезные навыки на рынке — владение Python c библиотеками pandas и numpy, Tableau, Power BI, Etl и Spark. Вакансий с такими требованиями больше и зарплаты в них указаны выше прочих. Для Python-программистов знание matplotlib ценится на рынке выше, чем владение plotly.

Полный код проекта доступен на GitHub

Обрабатываем нажатие кнопки в Selenium

Время чтения текста – 10 минут

В материале «Парсим данные, используя Buetiful Soup и Selenium» мы уже рассмотрели, как быть, когда данные на странице динамически подгружаются при скролле страницы. Но бывают ситуации, когда новые данные можно получить, только нажав на кнопку «Показать ещё» — сегодня узнаем, как через Selenium сымитировать нажатие кнопки для полного открытия страницы, соберём идентификаторы пива, оценки к каждому продукту и отправим данные в Clickhouse

Структура страницы

Возьмём случайную пивоварню — у неё 105 чекинов, то есть, отзывов. Страница с чекинами пивоварни показывает не более 25 чекинов и выглядит так:

Если попробуем промотать в самый низ, столкнёмся с той самой кнопкой, мешающей нам взять все 105 за раз:

Мы поступим так: выясним, к какому классу относится элемент кнопки и будем на неё нажимать, пока это возможно. Так как Selenium запускает браузер, следующая кнопка «Показать ещё» может не успеть прогрузиться, поэтому между нажатиями поставим интервал в пару секунд. Как только страница раскроется полностью — мы возьмём её содержимое и распарсим нужные данные из чекинов. Зайдём в код страницы и найдём кнопку — она относится к классу more_checkins.

У кнопки есть свойства стиля, а именно — display. В случае, если кнопка должна отображаться, display принимает значение block. Но когда промотаем страницу до самого конца, кнопку не нужно будет показывать, ведь открывать больше нечего — поэтому display кнопки примет значение none. В случае, если мы запросим у кнопки display и вернётся none будем знать, что открывать больше нечего и можно перестать жать на кнопку.

Пишем код

Начнём с импорта библиотек:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client

Chromedriver, необходимый для запуска браузера через Selenium, можно установить с официальной страницы

Подключимся к базе данных, зададим cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

О том, как запускать Selenium с cookies можно прочитать в материале «Парсим данные каталога сайта, используя Beautiful Soup и Selenium». Нам нужен параметр untappd_user_v3_e.

Так как мы планируем работать с пивоварнями, у которых более сотни тысяч чекинов, может оказаться так, что страница будет чересчур тяжёлой, и нагрузка на машину будет огромна. Чтобы этого избежать, отключим всё лишнее, а затем подключим cookie для авторизации:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

Напишем функцию, которая принимает ссылку, переходит по ней, полностью раскрывает страницу и возвращает нам soup, который можно будет распарсить. Получим display кнопки и запишем в переменную more_checkins: пока он не равен none будем нажимать на кнопку и снова получать её display. Сделаем интервал в две секунды между нажатиями, чтобы подождать прогрузку страницы. Как только будет получена вся страница, переведём её в soup библиотекой bs4.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup

Напишем следующую функцию: она тоже будет принимать url страницы, передавать его в get_html_page, получать soup и парсить его. Функция вернёт запакованные списки с идентификатором пива и оценкой к нему.

О том, как парсить элементы страницы мы уже говорили в материале «Парсим данные каталога сайта, используя Beautiful Soup».

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Заполняю списки')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)

Наконец, напишем вызов функций по пивоварням. В материале «Использование словарей в Clickhouse на примере данных Untappd» мы уже рассмотрели, как получить список идентификаторов российских пивоварен — обратимся к нему через таблицу в Clickhouse

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

Если посмотрим на brewery_list, то узнаем, что данные вернулись в неудобном формате: это список кортежей.

Небольшое лямбда-выражение позволит его «выпрямить»:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

Работать с таким списком значительно комфортнее:

Для каждой пивоварни в списке сформируем url — он состоит из стандартной ссылки и идентификатора пивоварни в конце. Отправим url в функцию parse_html_page, которая сама вызовет get_html_page и вернёт списки с beer_id и rating_score. Так как два списка вернутся упакованными можем пройти по ним итератором, сформировав кортеж и отправив его в Clickhouse.

for brewery_id in brewery_list:
    print('Беру пивоварню с id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

С кнопками на этом всё. Постепенно мы формируем отличный датасет для последующего анализа, который рассмотрим в следующем цикле статей, как только завершим сбор данных — возможно, через месяц.

Использование словарей в Clickhouse на примере данных Untappd

Время чтения текста – 15 минут

В Clickhouse реализована возможность использования внутренних и внешних словарей, которые могут быть альтернативой JOIN (которые, к сожалению, не всегда здорово работают). Словари хранят информацию в памяти и к ним можно обратиться командой dictGet. Рассмотрим как создать словарь в Clickhouse и как его можно использовать в запросах.

Будем изучать функционал на примере данных из API Untappd. Untappd — социальная сеть любителей крафтового пива. Мы сфокусируемся на чекинах российких крафтовых пивоварен, начнем собирать информацию о них, чтобы в следующих постах проанализировать данные и сделать некоторые выводы. В рамках этого поста разберем получение мета-информации о российских пивоварнях на Untappd, а полученные данные сохраним в словаре Clickhouse.

Собираем данные с Untappd

Для обращений к API нужны client_id и  client_secret_key — их можно получить, создав приложение. Для этого переходим в раздел создания приложения в документации и указываем некоторые данные:

После отправления заявки нужно будет подождать некоторое время: от 1 до 3 недель.

import requests
import pandas as pd
import time

Отправлять запросы к API будем через requests, а в pandas посмотрим на результаты и выгрузим в csv, чтобы отправить в словарь Clickhouse. У Untappd строгие ограничения на количество запросов: всего в час можно отправить 100 запросов, поэтому будем библиотекой time ставить скрипт в ожидание на 38 секунд, чтобы число запросов в час не превосходило 100.

client_id = 'ваш_client_id'
client_secret = 'ваш_client_secret'
all_brewery_of_russia = []

Мы хотим собрать всю тысячу российских пивоварен. Один запрос к методу Brewery Search позволяет получить до 50 пивоварен. При поиске вручную на сайте Untappd по слову «Russia» сайт выдаст 3369 пивоварен:

Проверим это: пролистаем страницу до самого низа и откроем код страницы.

Каждая полученная пивоварня в поиске находится в классе beer-item. Значит, можем в поиске посчитать количество упоминаний beer-item:

И выясняем, что на самом деле их здесь ровно 1000, а не 3369. По запросу Russia в выборку попадают и американские пивоварни, а некоторые были удалены. Значит, придётся отправить 20 запросов, будем получать по 50 пивоварен за раз:

for offset in range(0, 1000, 50):
    try:
        print('offset = ', offset)
        print('осталось:', 1000 - offset, '\n')
        response = requests.get(f'https://api.untappd.com/v4/search/brewery?client_id={client_id}&client_secret={client_secret}',
                               params={
                                   'q':'Russia',
                                   'offset':offset,
                                   'limit':50
                               })
        item = response.json()
        print(item, '\n')
        all_brewery_of_russia.append(item)
        time.sleep(37)
    except Exception:
        print(Exception)
        continue

В параметрах метод Brewery Search принимает q — строку, по которой будем осуществлять поиск на сервисе. Укажем в ней «Russia», чтобы получить все пивоварни, связанные с Россией. Другой параметр — offset — отвечает за смещение. Получив первые 50 пивоварен мы смещаемся на 50 строк в поиске, чтобы получить следующие 50 пивоварен. limit отвечает за количество получаемых пивоварен и не может быть больше 50.
Преобразовываем ответ в формат json и добавляем полученные данные в список all_brewery_of_russia. Объект item будет содержать такие данные:

Но в полученных данных могли затесаться и пивоварни других стран. Отфильтруем их: пройдём итератором по всему списку all_brewery_of_russia и добавим в итоговый только те пивоварни, у которых параметр country_name принимает значение Russia.

brew_list = []
for element in all_brewery_of_russia:
    brew = element['response']['brewery']
    for i in range(brew['count']):
        if brew['items'][i]['brewery']['country_name'] == 'Russia':
            brew_list.append(brew['items'][i])

Посмотрим на первый элемент списка brew_list:

print(brew_list[0])

Соберём из списка DataFrame с колонками brewery_id, beer_count, brewery_name, brewery_slug, brewery_page_url, brewery_city, lat и  lng. Получим в отдельные списки данные из  brewery_list:

df = pd.DataFrame()
brewery_id_list = []
beer_count_list = []
brewery_name_list = []
brewery_slug_list = []
brewery_page_url_list = []
brewery_location_city = []
brewery_location_lat = []
brewery_location_lng = []
for brewery in brew_list:
    brewery_id_list.append(brewery['brewery']['brewery_id'])
    beer_count_list.append(brewery['brewery']['beer_count'])
    brewery_name_list.append(brewery['brewery']['brewery_name'])
    brewery_slug_list.append(brewery['brewery']['brewery_slug'])
    brewery_page_url_list.append(brewery['brewery']['brewery_page_url'])
 brewery_location_city.append(brewery['brewery']['location']['brewery_city'])
    brewery_location_lat.append(brewery['brewery']['location']['lat'])
    brewery_location_lng.append(brewery['brewery']['location']['lng'])

И отправим их в DataFrame:

df['brewery_id'] = brewery_id_list
df['beer_count'] = beer_count_list
df['brewery_name'] = brewery_name_list
df['brewery_slug'] = brewery_slug_list
df['brewery_page_url'] = brewery_page_url_list
df['brewery_city'] = brewery_location_city
df['brewery_lat'] = brewery_location_lat
df['brewery_lng'] = brewery_location_lng

Посмотрим, как выглядит наша таблица:

df.head()

Отсортируем значения по  brewery_id и выгрузим таблицу в формате csv без столбца с индексами и заголовков колонок:

df = df.sort_values(by='brewery_id')
df.to_csv('brewery_data.csv', index=False, header=False)

Создаём словарь Clickhouse

Словари для Clickhouse можно создавать по-разному. Мы попробуем задать его структуру в xml-файле, настроить конфигурационные файлы сервера и обращаться к нему через клиент. Наш xml будет иметь следующую структуру:

Со всеми способами создания словарей можно ознакомиться в документации

<yandex>
<dictionary>
        <name>breweries</name>
        <source>
                <file>
                        <path>/home/ubuntu/brewery_data.csv</path>
                        <format>CSV</format>
                </file>
        </source>
        <layout>
                <flat />
        </layout>
        <structure>
                <id>
                        <name>brewery_id</name>
                </id>
                <attribute>
                        <name>beer_count</name>
                        <type>UInt64</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_name</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_slug</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_page_url</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_city</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lat</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lng</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
        </structure>
        <lifetime>300</lifetime>
</dictionary>
</yandex>

Под  идёт имя словаря. В  указываем свойства колонок. Под тегом идёт ключевое поле, а под тегом укажем путь и формат файла. Скоро мы положим его в папку /home/ubuntu, поэтому так и укажем.

Загрузим нашу csv-таблицу и xml-файл на сервер, это можно сделать, например, по ftp через FileZilla. В одном из материалов мы учились ставить Clickhouse на бесплатную машину от Amazon, в этот раз будем работать там же. В FileZilla заходим в настройки SFTP и добавляем файл с ключом:

И подключаемся к серверу по адресу, который указан в консоли EC2 машины на AWS. Укажем протокол SFTP, свой Host и в качестве User — Ubuntu:

В случае перезагрузки машины через консоль Public DNS мог измениться

После подсоединения мы попадём в папку /home/ubuntu сервера. Положим файлы туда же. Теперь подключимся по SSH через Termius. Чтобы Clickhouse увидел файл со структурой словаря, его нужно положить в папку /etc/clickhouse-server:

О том, как подключаться в серверу на AWS через SSH-клиент мы рассказывали в материале «Устанавливаем Clickhouse на AWS»

sudo mv breweries_dictionary.xml /etc/clickhouse server/

Переходим в конфигурационный файл:

cd /etc/clickhouse-server
sudo nano config.xml

Нам нужен тег  — он указывает путь к файлу, который описывает структуру словарей. Укажем путь к нашему xml:

<dictionaries_config>/etc/clickhouse-server/breweries_dictionary.xml</dictionaries_config>

Сохраняем файл и запускаем клиент Clickhouse:

clickhouse client

Проверим, что наш словарь действительно загрузился:

SELECT * FROM system.dictionaries\G

В случае успеха получим подобное:

Напишем запрос к функции dictGet, чтобы получить название пивоварни под ID 999. Указываем первым аргументом наименование словаря, затем поле, значение которого хотим получить и ID.

SELECT dictGet('breweries', 'brewery_name', toUInt64(999))

Если сделаем всё правильно, то выясним, что под ID 999 находится Балтика:

Аналогичным образом удобно использовать функцию, когда в таблице с фактами хранится только ID измерения для получения понятного наименования.

Ранее Ctrl + ↓