Собираем топ-10 аккаунтов Instagram по теме аналитики и машинного обучения

Время чтения текста – 11 минут

В некоторых телеграм-каналах (раз, два) уже говорилось про другие интересные паблики в телеграме, однако по Instagram такого топа пока не было. Вероятно, это не самая популярная сеть для контента в нашей индустрии, тем не менее, можно проверить эту гипотезу, используя Python и данные. В этом материале рассказываем, как собрать данные по аккаунтам Instagram без API.

Метод сбора данных
Instagram API не позволит вам просто так собирать данные о других пользователях, но есть и другой метод. Можно отправить такой request-запрос:

https://instagram.com/leftjoin/?__a=1

И получить в ответе JSON-объект со всей информацией о пользователе, которую можно посмотреть самому: имя аккаунта, количество постов, подписок и подписчиков, а также первые десять постов с информацией про них: количество лайков, комментарии и прочее. Именно на таких request-запросах устроена библиотека pyInstagram.

Схема данных
Будем собирать данные в три таблицы Clickhouse: пользователи, посты и комментарии. В таблицу пользователей собираем всю информацию о них: идентификатор, наименование аккаунта, имя и фамилия человека, описание профиля, количество подписок и подписчиков, количество постов, суммарное количество комментариев и лайков, наличие верификации, география пользователя и ссылки на аватарку и Facebook.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблицу с постами сохраняем автора поста, идентификатор записи, текст, количество комментариев и прочее. is_ad, is_album и is_video — поля, проверяющие, является ли запись рекламной, «каруселью» изображений или видеозаписью.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблице с комментариями храним отдельно каждый комментарий к записи с автором и текстом.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Скрипт
Из библиотеки pyInstagram нам понадобятся классы Account, Media, WebAgent и Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Создаем экземпляр класса WebAgent — он необходим для вызова некоторых методов и обновления аккаунтов. В начале нам нужно иметь хотя бы названия профилей пользователей, информацию о которых мы хотим собрать, поэтому отправим другой request-запрос для поиска пользователей по ключевым словам, их список ниже в фрагменте кода. В выдаче будут аккаунты, у которых название или описание профиля совпало с ключевым словом.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='54.227.137.142', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Проходим по всем ключевым словам и собираем все аккаунты. Так как в списке могли образоваться дубликаты, переведём список в множество и обратно в список.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))

Теперь проходим по списку аккаунтов, и если аккаунта с таким наименованием ещё не было в базе, то получаем расширенную информацию о нём. Для этого пробуем создать экземпляр класса Account, передав username параметром. После при помощи объекта agent обновляем информацию об аккаунте. Будем собирать только первые 100 постов, чтобы сбор не задерживался. Создадим список media_list — он при помощи метода get_media будет хранить код каждого поста, который затем можно будет получить при помощи класса Media.


Сбор медиа аккаунта

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0

Мы начинаем с постов и комментариев, потому что для занесения в базу нового пользователя нам нужно подсчитать сперва суммарное количество комментариев и лайков в его аккаунте. Практически все интересующие поля являются атрибутами класса Media.


Сбор постов пользователя

for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Чтобы собрать комментарии необходимо вызвать метод get_comments и передать параметром экземпляр класса Media.


Сбор комментариев из поста

comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)


Наконец, когда все посты и комментарии пройдены, можем занести информацию о пользователе.

Сбор информации о пользователе

user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)

Результаты
Таким методом нам удалось собрать 500 пользователей, 20 тысяч постов и 40 тысяч комментариев. Теперь можем написать простой запрос к базе и получить топ-10 Instagram-аккаунтов по теме аналитики и машинного обучения за последнее время:

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

А вот и приятный бонус, для тех, кто искал на какие аккаунты в Instagram подписаться по релевантной тематике:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

Полный код проекта доступен на GitHub

 Нет комментариев    104   3 дн   clickhouse   Data Analytics   Data Engineering   instagram   python

Анализ рынка вакансий аналитики и BI: дашборд в Tableau

Время чтения текста – 16 минут

По данным рейтинга SimilarWeb, hh.ru — третий по популярности сайт о трудоустройстве в мире. В одном из разговоров с Ромой Буниным у нас появилась идея сделать совместный проект: собрать данные из открытого HeadHunter API и визуализировать их при помощи Tableau Public. Нам захотелось понять, как меняется зарплата в зависимости от указанных в вакансии навыков, наименования позиции и сравнить, как обстоят дела в Москве, Санкт-Петербурге и регионах.

Как мы собирали данные?

Схема данных основана на коротком представлении вакансии, которую возвращает метод GET /vacancies. Из представления собираются следующие поля: тип вакансии, идентификатор, премиальность вакансии, необходимость прохождения тестирования, адрес компании, информация о зарплате, график работы и другие. Соответствующий CREATE-запрос для таблицы:


Запрос создания таблицы vacancies_short

CREATE TABLE headhunter.vacancies_short
(
    `added_at` DateTime,
    `query_string` String,
    `type` String,
    `level` String,
    `direction` String,
    `vacancy_id` UInt64,
    `premium` UInt8,
    `has_test` UInt8,
    `response_url` String,
    `address_city` String,
    `address_street` String,
    `address_building` String,
    `address_description` String,
    `address_lat` String,
    `address_lng` String,
    `address_raw` String,
    `address_metro_stations` String,
    `alternate_url` String,
    `apply_alternate_url` String,
    `department_id` String,
    `department_name` String,
    `salary_from` Nullable(Float64),
    `salary_to` Nullable(Float64),
    `salary_currency` String,
    `salary_gross` Nullable(UInt8),
    `name` String,
    `insider_interview_id` Nullable(UInt64),
    `insider_interview_url` String,
    `area_url` String,
    `area_id` UInt64,
    `area_name` String,
    `url` String,
    `published_at` DateTime,
    `employer_url` String,
    `employer_alternate_url` String,
    `employer_logo_urls_90` String,
    `employer_logo_urls_240` String,
    `employer_logo_urls_original` String,
    `employer_name` String,
    `employer_id` UInt64,
    `response_letter_required` UInt8,
    `type_id` String,
    `type_name` String,
    `archived` UInt8,
    `schedule_id` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY vacancy_id

Первый скрипт собирает данные с HeadHunter по API и отправляет их в Clickhouse. Он использует следующие библиотеки:

import requests
from clickhouse_driver import Client
from datetime import datetime
import pandas as pd
import re

Далее загружаем таблицу с запросами и подключаемся к CH:

queries = pd.read_csv('hh_data.csv')
client = Client(host='1.234.567.890', user='default', password='', port='9000', database='headhunter')

Таблица queries хранит список поисковых запросов. Она содержит следующие колонки: тип запроса, уровень вакансии для поиска, направление вакансии и саму поисковую фразу. В строку с запросом можно помещать логические операторы: например, чтобы найти вакансии, в которых должны присутствовать ключевые слова «Python», «data» и «анализ» между ними можно указать логическое «И».

Не всегда вакансии в выдаче соответствуют ожиданиям: случайно в базу могут попасть повара, маркетологи и администраторы магазина. Чтобы этого не произошло, опишем функцию check_name(name) — она будет принимать наименование вакансии и возвращать True в случае, если вакансия не подошла по названию.

def check_name(name):
    bad_names = [r'курьер', r'грузчик', r'врач', r'менеджер по закупу',
           r'менеджер по продажам', r'оператор', r'повар', r'продавец',
          r'директор магазина', r'директор по продажам', r'директор по маркетингу',
          r'кабельщик', r'начальник отдела продаж', r'заместитель', r'администратор магазина', 
          r'категорийный', r'аудитор', r'юрист', r'контент', r'супервайзер', r'стажер-ученик', 
          r'су-шеф', r'маркетолог$', r'региональный', r'ревизор', r'экономист', r'ветеринар', 
          r'торговый', r'клиентский', r'начальник цеха', r'территориальный', r'переводчик', 
          r'маркетолог /', r'маркетолог по']
    for item in bad_names:
        if re.match(item, name):
            return True

Затем объявляем бесконечный цикл — мы собираем данные без перерыва. Идём по DataFrame queries и сразу забираем оттуда тип вакансии, уровень, направление и поисковый запрос в отдельные переменные. Сначала по ключевому слову отправляем один запрос к методу /GET vacancies и получаем количество страниц. После идём от нулевой до последней страницы, отправляем те же запросы и заполняем список vacancies_from_response с полученными в выдаче короткими представлениями всех вакансий. В параметрах указываем 10 вакансий на страницу — больше ограничения HH API получить не позволяют. Так как мы не указали параметр area, API возвращает вакансии по всему миру.

while True:
   for query_type, level, direction, query_string in zip(queries['Тип'], queries['Уровень'], queries['Направление'], queries['Ключевое слово']):
           print(f'ключевое слово: {query_string}')
           url = 'https://api.hh.ru/vacancies'
           par = {'text': query_string, 'per_page':'10', 'page':0}
           r = requests.get(url, params=par).json()
           added_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
           pages = r['pages']
           found = r['found']
           vacancies_from_response = []

           for i in range(0, pages + 1):
               par = {'text': query_string, 'per_page':'10', 'page':i}
               r = requests.get(url, params=par).json()
               try:
                   vacancies_from_response.append(r['items'])
               except Exception as E:
                   continue

Теперь проходим по каждой вакансии на каждой странице двойным итератором. Сперва отправим запрос к Clickhouse и проверим, нет ли уже в базе вакансии с таким идентификатором и таким поисковым запросом. Если проверка пройдена — проверяем название вакансии. В случае неудачи переходим к следующей.

for item in vacancies_from_response:
               for vacancy in item:
                   if client.execute(f"SELECT count(1) FROM vacancies_short WHERE vacancy_id={vacancy['id']} AND query_string='{query_string}'")[0][0] == 0:
                       name = vacancy['name'].replace("'","").replace('"','')
                       if check_name(name):
                           continue

Теперь проходим по вакансии и собираем все нужные поля. В случае отсутствия некоторых данных будем отправлять пустые строки:


Код для сбора данных о вакансии

vacancy_id = vacancy['id']
                       is_premium = int(vacancy['premium'])
                       has_test = int(vacancy['has_test'])
                       response_url = vacancy['response_url']
                       try:
                           address_city = vacancy['address']['city']
                           address_street = vacancy['address']['street']
                           address_building = vacancy['address']['building']
                           address_description = vacancy['address']['description']
                           address_lat = vacancy['address']['lat']
                           address_lng = vacancy['address']['lng']
                           address_raw = vacancy['address']['raw']
                           address_metro_stations = str(vacancy['address']['metro_stations']).replace("'",'"')
                       except TypeError:
                           address_city = ""
                           address_street = ""
                           address_building = ""
                           address_description = ""
                           address_lat = ""
                           address_lng = ""
                           address_raw = ""
                           address_metro_stations = ""
                       alternate_url = vacancy['alternate_url']
                       apply_alternate_url = vacancy['apply_alternate_url']
                       try:
                           department_id = vacancy['department']['id']
                       except TypeError as E:
                           department_id = ""
                       try:
                           department_name = vacancy['department']['name']
                       except TypeError as E:
                           department_name = ""
                       try:
                           salary_from = vacancy['salary']['from']
                       except TypeError as E:
                           salary_from = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_to = vacancy['salary']['to']
                       except TypeError as E:
                           salary_to = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_currency = vacancy['salary']['currency']
                       except TypeError as E:
                           salary_currency = ""
                       try:
                           salary_gross = int(vacancy['salary']['gross'])
                       except TypeError as E:
                           salary_gross = "cast(Null as Nullable(UInt8))"
                       try:
                           insider_interview_id = vacancy['insider_interview']['id']
                       except TypeError:
                           insider_interview_id = "cast(Null as Nullable(UInt64))"
                       try:
                           insider_interview_url = vacancy['insider_interview']['url']
                       except TypeError:
                           insider_interview_url = ""
                       area_url = vacancy['area']['url']
                       area_id = vacancy['area']['id']
                       area_name = vacancy['area']['name']
                       url = vacancy['url']
                       published_at = vacancy['published_at']
                       published_at = datetime.strptime(published_at,'%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S')
                       try:
                           employer_url = vacancy['employer']['url']
                       except Exception as E:
                           print(E)
                           employer_url = ""
                       try:
                           employer_alternate_url = vacancy['employer']['alternate_url']
                       except Exception as E:
                           print(E)
                           employer_alternate_url = ""
                       try:
                           employer_logo_urls_90 = vacancy['employer']['logo_urls']['90']
                           employer_logo_urls_240 = vacancy['employer']['logo_urls']['240']
                           employer_logo_urls_original = vacancy['employer']['logo_urls']['original']
                       except Exception as E:
                           print(E)
                           employer_logo_urls_90 = ""
                           employer_logo_urls_240 = ""
                           employer_logo_urls_original = ""
                       employer_name = vacancy['employer']['name'].replace("'","").replace('"','')
                       try:
                           employer_id = vacancy['employer']['id']
                       except Exception as E:
                           print(E)
                       response_letter_required = int(vacancy['response_letter_required'])
                       type_id = vacancy['type']['id']
                       type_name = vacancy['type']['name']
                       is_archived = int(vacancy['archived'])

Последнее поле — график работы. В случае, если вакансия подразумевает вахтовый метод работы она нам точно не подходит.

try:
    schedule = vacancy['schedule']['id']
except Exception as E:
    print(E)
    schedule = ''"
if schedule == 'flyInFlyOut':
    continue

Теперь формируем список из полученных переменных, заменяем в нём None-значения на пустые строки во избежании конфликтов с Clickhouse и вставляем строку в таблицу.

vacancies_short_list = [added_at, query_string, query_type, level, direction, vacancy_id, is_premium, has_test, response_url, address_city, address_street, address_building, address_description, address_lat, address_lng, address_raw, address_metro_stations, alternate_url, apply_alternate_url, department_id, department_name,
salary_from, salary_to, salary_currency, salary_gross, insider_interview_id, insider_interview_url, area_url, area_name, url, published_at, employer_url, employer_logo_urls_90, employer_logo_urls_240,  employer_name, employer_id, response_letter_required, type_id, type_name, is_archived, schedule]
for index, item in enumerate(vacancies_short_list):
    if item is None:
        vacancies_short_list[index] = ""
tuple_to_insert = tuple(vacancies_short_list)
print(tuple_to_insert)
client.execute(f'INSERT INTO vacancies_short VALUES {tuple_to_insert}')

Как подключили Tableau к данным?

Tableau Public не умеет работать с базами данных, поэтому мы написали коннектор Clickhouse к Google Sheets. Он использует библиотеки gspread и oauth2client для авторизации в Google Spreadsheets API и библиотеку schedule для ежедневной работы по графику.

Работа с Google Spreadseets API подробно разобрана в материале «Собираем данные по рекламным кампаниям ВКонтакте»

import schedule
from clickhouse_driver import Client
import gspread
import pandas as pd
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime

scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
client = Client(host='54.227.137.142', user='default', password='', port='9000', database='headhunter')
creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
gc = gspread.authorize(creds)

Опишем функцию update_sheet() — она будет брать все данные из Clickhouse и вставлять их в таблицу Google Docs.

def update_sheet():
   print('Updating cell at', datetime.now())
   columns = []
   for item in client.execute('describe table headhunter.vacancies_short'):
       columns.append(item[0])
   vacancies = client.execute('SELECT * FROM headhunter.vacancies_short')
   df_vacancies = pd.DataFrame(vacancies, columns=columns)
   df_vacancies.to_csv('vacancies_short.csv', index=False)
   content = open('vacancies_short.csv', 'r').read()
   gc.import_csv('1ZWS2kqraPa4i72hzp0noU02SrYVo0teD7KZ0c3hl-UI', content.encode('utf-8'))

Чтобы скрипт запускался в 16:00 по МСК каждый день используем библиотеку schedule:

schedule.every().day.at("13:00").do(update_sheet)
while True:
   schedule.run_pending()

А что в результате?

Рома построил на полученных данных дашборд.

И в youtube-ролике рассказывает о том, как эффективно использовать дашборд

Инсайты, которые можно извлечь из дашборда

  1. Аналитики с навыком бизнес-аналитики востребованы на рынке больше всего: по такому запросу нашлось больше всего вакансий. Тем не менее, средняя зарплата выше у продуктовых аналитиков и аналитиков BI.
  2. В Москве средние зарплаты выше на 10-30 тысяч рублей, чем в Санкт-Петербурге и на 30-40 тысячи рублей, чем в регионах. Там же работы нашлось больше всего в России.
  3. Самые высокооплачиваемые должности: руководитель отдела аналитики (в среднем, 110 тыс. руб. в месяц), инженер баз данных (138 тыс. руб. в месяц) и директор по машинному обучению (250 тыс. руб. в месяц).
  4. Самые полезные навыки на рынке — владение Python c библиотеками pandas и numpy, Tableau, Power BI, Etl и Spark. Вакансий с такими требованиями больше и зарплаты в них указаны выше прочих. Для Python-программистов знание matplotlib ценится на рынке выше, чем владение plotly.

Полный код проекта доступен на GitHub

 1 комментарий    654   8 дн   bi   BI-инструменты   clickhouse   Data Analytics   Data Engineering   headhunter

Семантический анализ мнений о поправках к Конституции на основе данных ВКонтакте

Время чтения текста – 14 минут

Сегодня поработаем с открытыми данными из ВКонтакте и получим семантическую оценку на популярное и актуальное событие — поправки к Конституции Российской Федерации.

Обзор методов API

Воспользуемся методом newsfeed.search: он позволяет получить до тысячи последних постов из новостной ленты по ключевому слову. В результате приходит много полей: среди них идентификаторы записи и пользователя или сообщества, текст поста, количество лайков, комментарии, приложения, геопозиция и прочее. Нас интересуют только идентификаторы и текст.
Для аналитики пригодится расширенная информация об авторе поста: его город, пол и возраст можно получить методом users.get, причём в запросе будем отправлять сразу до тысячи пользователей.

Создаём таблицы в Clickhouse

Данные нужно будет где-то хранить, в качестве СУБД подойдёт Clickhouse. Создадим две таблицы: для постов и для пользователей. В первой будем хранить идентификаторы и текст поста, во второй — данные о пользователе: его id, пол, возраст и город. Движок ReplacingMergeTree() будет удалять дубликаты.

Мы уже писали о том, как установить Clickhouse на бесплатную машину AWS, создавать в нём внешние словари и материализованные представления

CREATE TABLE vk_posts(
   post_id UInt64,
   post_date DateTime,
   owner_id UInt64,
   from_id UInt64,
   text String
) ENGINE ReplacingMergeTree()
ORDER BY post_date

CREATE TABLE vk_users(
   user_id UInt64,
   user_sex Nullable(UInt8),
   user_city String,
   user_age Nullable(UInt16)
) ENGINE ReplacingMergeTree()
ORDER BY user_id

Сбор постов через API ВКонтакте

Перейдём к написанию скрипта. Импортируем библиотеки и задаём несколько константных значений:

В материале «Собираем данные по рекламным кампаниям ВКонтакте» подробно описан процесс получения токена пользователя для VK API

from clickhouse_driver import Client
from datetime import datetime
import requests
import pandas as pd
import time

token = 'your_token'
version = 5.103
client = Client(host='ec1-23-456-789-1011.us-east-2.compute.amazonaws.com', user='default', password='', port='9000', database='default')      
data_list = []
start_from = 0
query_string = 'конституция'

Опишем функцию get_and_insert_info_by_user — она будет принимать список идентификаторов пользователей, получать расширенную информацию о них и отправлять в таблицу vk_users. Так как параметр user_ids метода принимает список как строку, переводим структуру в тип str и отсекаем квадратные скобки. Многие пользователи скрывают пол, возраст или город — в таком случае вставляет Nullable значения. Для получения возраста берём текущий год и вычитаем год из даты рождения, если он представлен — проверку делаем регулярным выражением по четырём цифрам.


Функция get_and_insert_info_by_user

def get_and_insert_info_by_user(users):
    try:
        r = requests.get('https://api.vk.com/method/users.get', params={
            'access_token':token,
            'v':version,
            'user_ids':str(users)[1:-2],
            'fields':'sex, city, bdate'
        }).json()['response']
        for user in r:
            user_list = []
            user_list.append(user['id'])
            if client.execute(f"SELECT count(1) FROM vk_users where user_id={user['id']}")[0][0] == 0:
                print(user['id'])
                try:
                    user_list.append(user['sex'])
                except Exception:
                    user_list.append('cast(Null as Nullable(UInt8))')
                try:
                    user_list.append(user['city']['title'])
                except Exception:
                    user_list.append('')
                try:
                    now = datetime.now()
    			    year = item.split('.')[-1]
    			    if re.match(r'\d\d\d\d', year):
        		        age = now.year - int(year)
			    	   user_list.append(age)
                except Exception:
                    user_list.append('cast(Null as Nullable(UInt16))')
                user_insert_tuple = tuple(user_list)
                client.execute(f'INSERT INTO vk_users VALUES {user_insert_tuple}')
    except KeyError:
        pass


Наш скрипт будет работать в вечном цикле, чтобы постоянно добирать новые данные, ведь мы можем получать только тысячу последних. Метод newsfeed.search за раз возвращает двести постов, так что нужно вызывать его пять раз подряд и собирать все ответы.


Цикл сбора новых постов

while True:
    for i in range(5):
        r = requests.get('https://api.vk.com/method/newsfeed.search', params={
            'access_token':token,
            'v':version,
            'q':query_string,
            'count':200,
            'start_from': start_from
        })
        data_list.append(r.json()['response'])
        try:
            start_from = r.json()['response']['next_from']
        except KeyError:
            pass

Полученные в ответе данные можно распарсить. В ВКонтакте у пользователей id всегда положительный, а у сообществ идёт со знаком минус. Чтобы получить данные только от пользователей, будем собирать только те, где from_id больше нуля. Следующая проверка — на отсутствие текста в посте, такие нам тоже не нужны. Наконец, будем собирать данные только если таких ещё нет — для этого обращаемся к таблице vk_posts по текущему id. В конце приостановим скрипт на 180 секунд, чтобы дождаться новых постов и не столкнуться с ограничениями по запросам VK API.


Занесение новых данных в Clickhouse

user_ids = []
    for data in data_list:
        for data_item in data['items']:
            if data_item['from_id'] > 0:
                post_list = []
                if not data_item['text']:
                    continue
                if client.execute(f"SELECT count(1) FROM vk_posts WHERE post_id={data_item['id']} AND from_id={data_item['from_id']}")[0][0] == 0:
                    user_ids.append(data_item['from_id'])
                    date = datetime.fromtimestamp(data_item['date'])
                    date = datetime.strftime(date, '%Y-%m-%d %H:%M:%S')
                    post_list.append(date)
                    post_list.append(data_item['id'])
                    post_list.append(data_item['owner_id'])
                    post_list.append(data_item['from_id'])
post_list.append(data_item['text'].replace("'","").replace('"','').replace("\n",""))
                    post_list.append(query_string)
                    post_tuple = tuple(post_list)
                    print(post_list)
                    try:
                        client.execute(f'INSERT INTO vk_posts VALUES {post_tuple}')
                    except Exception as E:
                        print('!!!!! try to insert into vk_post but got', E)
    try:
        get_and_insert_info_by_user(user_ids)
    except Exception as E:
        print("Try to insert user list:", user_ids, "but got:", E)
    time.sleep(180)

Анализ постов через Dostoevsky

Этот скрипт мы оставили работать на неделю: за это время он набрал почти 20000 постов из ВКонтакте, в которых упоминается ключевое слово «конституция». Напишем второй скрипт — для аналитики и визуализации данных. Для начала соберём данные из таблицы, сформируем DataFrame и для каждого поста получим значения тональности: насколько он положителен, отрицателен и нейтрален. Для оценки тональности текста будем использовать библиотеку Dostoevsky.

from dostoevsky.tokenization import RegexTokenizer
from dostoevsky.models import FastTextSocialNetworkModel
from clickhouse_driver import Client
import pandas as pd
client = Client(host='ec1-23-456-789-1011.us-east-2.compute.amazonaws.com', user='default', password='', port='9000', database='default')

Простым запросом содержимое всей таблицы с постами занесём в переменную vk_posts. Пройдём все посты, выберем те посты, где есть текст помимо пробелов и положим их в DataFrame.

vk_posts = client.execute('SELECT * FROM vk_posts')
list_of_posts = []
list_of_ids = []
for post in vk_posts:
    if str(post[-2]).replace(" ", ""):
        list_of_posts.append(str(post[-2]).replace("\n",""))
        list_of_ids.append(int(post[2]))
df_posts = pd.DataFrame()
df_posts['post'] = list_of_posts
df_posts['id'] = list_of_ids

Обходим моделью весь список постов с текстом и получаем к оценку тональности для каждой записи.

tokenizer = RegexTokenizer()
model = FastTextSocialNetworkModel(tokenizer=tokenizer)
sentiment_list = []
results = model.predict(list_of_posts, k=2)
for sentiment in results:
    sentiment_list.append(sentiment)

Для каждой строки в DataFrame заведём ещё три колонки: насколько запись положительна, отрицательна и нейтральна. В случае, если по одному из трёх параметров ничего не вернулось, будем заносить ноль.

neutral_list = []
negative_list = []
positive_list = []
speech_list = []
skip_list = []
for sentiment in sentiment_list:
    neutral = sentiment.get('neutral')
    negative = sentiment.get('negative')
    positive = sentiment.get('positive')
    if neutral is None:
        neutral_list.append(0)
    else:
        neutral_list.append(sentiment.get('neutral'))
    if negative is None:
        negative_list.append(0)
    else:
        negative_list.append(sentiment.get('negative'))
    if positive is None:
        positive_list.append(0)
    else:
        positive_list.append(sentiment.get('positive'))
df_posts['neutral'] = neutral_list
df_posts['negative'] = negative_list
df_posts['positive'] = positive_list

Посмотрим, как выглядит наш DataFrame теперь:

Можем посмотреть примеры самых негативных постов:

df_posts[df_posts.negative > 0.9]

Нашей таблице не хватает данных об авторах постов. Возьмём их из таблицы vk_users и сольём обе таблицы по полю «id».

vk_users = client.execute('SELECT * FROM vk_users')
vk_user_ids_list = []
vk_user_sex_list = []
vk_user_city_list = []
vk_user_age_list = []
for user in vk_users:
    vk_user_ids_list.append(user[0])
    vk_user_sex_list.append(user[1])
    vk_user_city_list.append(user[2])
    vk_user_age_list.append(user[3])
df_users = pd.DataFrame()
df_users['id'] = vk_user_ids_list
df_users['sex'] = vk_user_sex_list
df_users['city'] = vk_user_city_list
df_users['age'] = vk_user_age_list
df = df_posts.merge(df_users, on='id')

Теперь таблица выглядит так:

Анализируем графики от plotly

В материале «Как построить красивый waterfall chart в Python?» мы уже строили графики библиотекой plotly

Для начала посчитаем процентное соотношение постов с положительной, отрицательной и нейтральной тональностью: пройдём все три столбца и подсчитаем для каждого случая строки, отличные от нуля. Затем проделаем то же самое для разных возрастных категорий и половых принадлежностей.

Из графика следует, что 46% постов по запросу «конституция» за последнюю неделю имеют негативный окрас. Другие 52% высказываются нейтрально. Чуть позже узнаем, насколько мнения в интернете совпадают с официальными результатами голосования.

Заметно, что доля положительных постов среди мужской аудитории составляет 2%, среди женской — вдвое больше, 4%. Впрочем, негативных постов в обоих группах практически поровну: 47% среди мужской и 44% среди женской.

Наконец, оценка постов по возрастным группам: больше всего доля позитивного текста наблюдается в группе 18 — 25 лет, это 3%. Меньше всего позитивных постов в группе до 18 лет, но это может происходить и в связи с тем, что многие пользователи моложе 18 лет предпочитают скрывать возраст, и точные данные по такой группе получить не удастся. Негативных постов во всех группах кроме 18 — 25 поровну: 46%.
Заметно, что на всех трёх графиках данные распределены приблизительно одинаково. Это говорит о том, что за последнюю неделю практически половина всех постов по ключевому слову «конституция» в новостной ленте ВКонтакте имела негативный окрас.

 Нет комментариев    863   2 мес   Data Analytics   Data Engineering   plotly

Обрабатываем нажатие кнопки в Selenium

Время чтения текста – 10 минут

В материале «Парсим данные, используя Buetiful Soup и Selenium» мы уже рассмотрели, как быть, когда данные на странице динамически подгружаются при скролле страницы. Но бывают ситуации, когда новые данные можно получить, только нажав на кнопку «Показать ещё» — сегодня узнаем, как через Selenium сымитировать нажатие кнопки для полного открытия страницы, соберём идентификаторы пива, оценки к каждому продукту и отправим данные в Clickhouse

Структура страницы

Возьмём случайную пивоварню — у неё 105 чекинов, то есть, отзывов. Страница с чекинами пивоварни показывает не более 25 чекинов и выглядит так:

Если попробуем промотать в самый низ, столкнёмся с той самой кнопкой, мешающей нам взять все 105 за раз:

Мы поступим так: выясним, к какому классу относится элемент кнопки и будем на неё нажимать, пока это возможно. Так как Selenium запускает браузер, следующая кнопка «Показать ещё» может не успеть прогрузиться, поэтому между нажатиями поставим интервал в пару секунд. Как только страница раскроется полностью — мы возьмём её содержимое и распарсим нужные данные из чекинов. Зайдём в код страницы и найдём кнопку — она относится к классу more_checkins.

У кнопки есть свойства стиля, а именно — display. В случае, если кнопка должна отображаться, display принимает значение block. Но когда промотаем страницу до самого конца, кнопку не нужно будет показывать, ведь открывать больше нечего — поэтому display кнопки примет значение none. В случае, если мы запросим у кнопки display и вернётся none будем знать, что открывать больше нечего и можно перестать жать на кнопку.

Пишем код

Начнём с импорта библиотек:

import time
from selenium import webdriver
from bs4 import BeautifulSoup as bs
import re
from datetime import datetime
from clickhouse_driver import Client

Chromedriver, необходимый для запуска браузера через Selenium, можно установить с официальной страницы

Подключимся к базе данных, зададим cookies:

client = Client(host='ec1-23-456-789-10.us-east-2.compute.amazonaws.com', user='', password='', port='9000', database='')
count = 0
cookies = {
    'domain':'untappd.com',
    'expiry':1594072726,
    'httpOnly':True,
    'name':'untappd_user_v3_e',
    'path':'/',
    'secure':False,
    'value':'your_value'
}

О том, как запускать Selenium с cookies можно прочитать в материале «Парсим данные каталога сайта, используя Beautiful Soup и Selenium». Нам нужен параметр untappd_user_v3_e.

Так как мы планируем работать с пивоварнями, у которых более сотни тысяч чекинов, может оказаться так, что страница будет чересчур тяжёлой, и нагрузка на машину будет огромна. Чтобы этого избежать, отключим всё лишнее, а затем подключим cookie для авторизации:

options = webdriver.ChromeOptions()
prefs = {'profile.default_content_setting_values': {'images': 2, 
                            'plugins': 2, 'fullscreen': 2}}
options.add_experimental_option('prefs', prefs)
options.add_argument("start-maximized")
options.add_argument("disable-infobars")
options.add_argument("--disable-extensions")
driver = webdriver.Chrome(options=options)
driver.get('https://untappd.com/TooSunnyBrewery')
driver.add_cookie(cookies)

Напишем функцию, которая принимает ссылку, переходит по ней, полностью раскрывает страницу и возвращает нам soup, который можно будет распарсить. Получим display кнопки и запишем в переменную more_checkins: пока он не равен none будем нажимать на кнопку и снова получать её display. Сделаем интервал в две секунды между нажатиями, чтобы подождать прогрузку страницы. Как только будет получена вся страница, переведём её в soup библиотекой bs4.

def get_html_page(url):
    driver.get(url)
    driver.maximize_window()
    more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
    print(more_checkins)
    while more_checkins != "none":
        driver.execute_script("document.getElementsByClassName('more_checkins_logged')[0].click()")
        time.sleep(2)
        more_checkins = driver.execute_script("var more_checkins=document.getElementsByClassName('more_checkins_logged')[0].style.display;return more_checkins;")
        print(more_checkins)
    source_data = driver.page_source
    soup = bs(source_data, 'lxml')
    return soup

Напишем следующую функцию: она тоже будет принимать url страницы, передавать его в get_html_page, получать soup и парсить его. Функция вернёт запакованные списки с идентификатором пива и оценкой к нему.

О том, как парсить элементы страницы мы уже говорили в материале «Парсим данные каталога сайта, используя Beautiful Soup».

def parse_html_page(url):
    soup = get_html_page(url)
    brewery_id = soup.find_all('a', {'class':'label',
                                     'href':re.compile('https://untappd.com/brewery/*')})[0]['href'][28:]
    items = soup.find_all('div', {'class':'item',
                                  'id':re.compile('checkin_*')})
    checkin_rating_list = []
    beer_id_list = []
    count = 0
    print('Заполняю списки')
    for checkin in items:
        print(count, '/', len(items))
        try:
            checkin_rating_list.append(float(checkin.find('div', {'class':'caps'})['data-rating']))
        except Exception:
            checkin_rating_list.append('cast(Null as Nullable(Float32))')
        try:
            beer_id_list.append(int(checkin.find('a', {'class':'label'})['href'][-7:]))
        except Exception:
            beer_id_list.append('cast(Null as Nullable(UInt64))')
        count += 1 
    return zip(checkin_rating_list, beer_id_list)

Наконец, напишем вызов функций по пивоварням. В материале «Использование словарей в Clickhouse на примере данных Untappd» мы уже рассмотрели, как получить список идентификаторов российских пивоварен — обратимся к нему через таблицу в Clickhouse

brewery_list = client.execute('SELECT brewery_id FROM brewery_info')

Если посмотрим на brewery_list, то узнаем, что данные вернулись в неудобном формате: это список кортежей.

Небольшое лямбда-выражение позволит его «выпрямить»:

flatten = lambda lst: [item for sublist in lst for item in sublist]
brewery_list = flatten(brewery_list)

Работать с таким списком значительно комфортнее:

Для каждой пивоварни в списке сформируем url — он состоит из стандартной ссылки и идентификатора пивоварни в конце. Отправим url в функцию parse_html_page, которая сама вызовет get_html_page и вернёт списки с beer_id и rating_score. Так как два списка вернутся упакованными можем пройти по ним итератором, сформировав кортеж и отправив его в Clickhouse.

for brewery_id in brewery_list:
    print('Беру пивоварню с id', brewery_id, count, '/', len(brewery_list))
    url = 'https://untappd.com/brewery/' + str(brewery_id)
    returned_checkins = parse_html_page(url)
    for rating, beer_id in returned_checkins:
        tuple_to_insert = (rating, beer_id)
        try:
            client.execute(f'INSERT INTO beer_reviews VALUES {tuple_to_insert}')
        except errors.ServerException as E:
            print(E)
    count += 1

С кнопками на этом всё. Постепенно мы формируем отличный датасет для последующего анализа, который рассмотрим в следующем цикле статей, как только завершим сбор данных — возможно, через месяц.

 Нет комментариев    33   3 мес   Amazon Web Services   AWS   clickhouse   Data Engineering   python

Использование словарей в Clickhouse на примере данных Untappd

Время чтения текста – 15 минут

В Clickhouse реализована возможность использования внутренних и внешних словарей, которые могут быть альтернативой JOIN (которые, к сожалению, не всегда здорово работают). Словари хранят информацию в памяти и к ним можно обратиться командой dictGet. Рассмотрим как создать словарь в Clickhouse и как его можно использовать в запросах.

Будем изучать функционал на примере данных из API Untappd. Untappd — социальная сеть любителей крафтового пива. Мы сфокусируемся на чекинах российких крафтовых пивоварен, начнем собирать информацию о них, чтобы в следующих постах проанализировать данные и сделать некоторые выводы. В рамках этого поста разберем получение мета-информации о российских пивоварнях на Untappd, а полученные данные сохраним в словаре Clickhouse.

Собираем данные с Untappd

Для обращений к API нужны client_id и  client_secret_key — их можно получить, создав приложение. Для этого переходим в раздел создания приложения в документации и указываем некоторые данные:

После отправления заявки нужно будет подождать некоторое время: от 1 до 3 недель.

import requests
import pandas as pd
import time

Отправлять запросы к API будем через requests, а в pandas посмотрим на результаты и выгрузим в csv, чтобы отправить в словарь Clickhouse. У Untappd строгие ограничения на количество запросов: всего в час можно отправить 100 запросов, поэтому будем библиотекой time ставить скрипт в ожидание на 38 секунд, чтобы число запросов в час не превосходило 100.

client_id = 'ваш_client_id'
client_secret = 'ваш_client_secret'
all_brewery_of_russia = []

Мы хотим собрать всю тысячу российских пивоварен. Один запрос к методу Brewery Search позволяет получить до 50 пивоварен. При поиске вручную на сайте Untappd по слову «Russia» сайт выдаст 3369 пивоварен:

Проверим это: пролистаем страницу до самого низа и откроем код страницы.

Каждая полученная пивоварня в поиске находится в классе beer-item. Значит, можем в поиске посчитать количество упоминаний beer-item:

И выясняем, что на самом деле их здесь ровно 1000, а не 3369. По запросу Russia в выборку попадают и американские пивоварни, а некоторые были удалены. Значит, придётся отправить 20 запросов, будем получать по 50 пивоварен за раз:

for offset in range(0, 1000, 50):
    try:
        print('offset = ', offset)
        print('осталось:', 1000 - offset, '\n')
        response = requests.get(f'https://api.untappd.com/v4/search/brewery?client_id={client_id}&client_secret={client_secret}',
                               params={
                                   'q':'Russia',
                                   'offset':offset,
                                   'limit':50
                               })
        item = response.json()
        print(item, '\n')
        all_brewery_of_russia.append(item)
        time.sleep(37)
    except Exception:
        print(Exception)
        continue

В параметрах метод Brewery Search принимает q — строку, по которой будем осуществлять поиск на сервисе. Укажем в ней «Russia», чтобы получить все пивоварни, связанные с Россией. Другой параметр — offset — отвечает за смещение. Получив первые 50 пивоварен мы смещаемся на 50 строк в поиске, чтобы получить следующие 50 пивоварен. limit отвечает за количество получаемых пивоварен и не может быть больше 50.
Преобразовываем ответ в формат json и добавляем полученные данные в список all_brewery_of_russia. Объект item будет содержать такие данные:

Но в полученных данных могли затесаться и пивоварни других стран. Отфильтруем их: пройдём итератором по всему списку all_brewery_of_russia и добавим в итоговый только те пивоварни, у которых параметр country_name принимает значение Russia.

brew_list = []
for element in all_brewery_of_russia:
    brew = element['response']['brewery']
    for i in range(brew['count']):
        if brew['items'][i]['brewery']['country_name'] == 'Russia':
            brew_list.append(brew['items'][i])

Посмотрим на первый элемент списка brew_list:

print(brew_list[0])

Соберём из списка DataFrame с колонками brewery_id, beer_count, brewery_name, brewery_slug, brewery_page_url, brewery_city, lat и  lng. Получим в отдельные списки данные из  brewery_list:

df = pd.DataFrame()
brewery_id_list = []
beer_count_list = []
brewery_name_list = []
brewery_slug_list = []
brewery_page_url_list = []
brewery_location_city = []
brewery_location_lat = []
brewery_location_lng = []
for brewery in brew_list:
    brewery_id_list.append(brewery['brewery']['brewery_id'])
    beer_count_list.append(brewery['brewery']['beer_count'])
    brewery_name_list.append(brewery['brewery']['brewery_name'])
    brewery_slug_list.append(brewery['brewery']['brewery_slug'])
    brewery_page_url_list.append(brewery['brewery']['brewery_page_url'])
 brewery_location_city.append(brewery['brewery']['location']['brewery_city'])
    brewery_location_lat.append(brewery['brewery']['location']['lat'])
    brewery_location_lng.append(brewery['brewery']['location']['lng'])

И отправим их в DataFrame:

df['brewery_id'] = brewery_id_list
df['beer_count'] = beer_count_list
df['brewery_name'] = brewery_name_list
df['brewery_slug'] = brewery_slug_list
df['brewery_page_url'] = brewery_page_url_list
df['brewery_city'] = brewery_location_city
df['brewery_lat'] = brewery_location_lat
df['brewery_lng'] = brewery_location_lng

Посмотрим, как выглядит наша таблица:

df.head()

Отсортируем значения по  brewery_id и выгрузим таблицу в формате csv без столбца с индексами и заголовков колонок:

df = df.sort_values(by='brewery_id')
df.to_csv('brewery_data.csv', index=False, header=False)

Создаём словарь Clickhouse

Словари для Clickhouse можно создавать по-разному. Мы попробуем задать его структуру в xml-файле, настроить конфигурационные файлы сервера и обращаться к нему через клиент. Наш xml будет иметь следующую структуру:

Со всеми способами создания словарей можно ознакомиться в документации

<yandex>
<dictionary>
        <name>breweries</name>
        <source>
                <file>
                        <path>/home/ubuntu/brewery_data.csv</path>
                        <format>CSV</format>
                </file>
        </source>
        <layout>
                <flat />
        </layout>
        <structure>
                <id>
                        <name>brewery_id</name>
                </id>
                <attribute>
                        <name>beer_count</name>
                        <type>UInt64</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_name</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_slug</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_page_url</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>brewery_city</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lat</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
                <attribute>
                        <name>lng</name>
                        <type>String</type>
                        <null_value>Null</null_value>
                </attribute>
        </structure>
        <lifetime>300</lifetime>
</dictionary>
</yandex>

Под  идёт имя словаря. В  указываем свойства колонок. Под тегом идёт ключевое поле, а под тегом укажем путь и формат файла. Скоро мы положим его в папку /home/ubuntu, поэтому так и укажем.

Загрузим нашу csv-таблицу и xml-файл на сервер, это можно сделать, например, по ftp через FileZilla. В одном из материалов мы учились ставить Clickhouse на бесплатную машину от Amazon, в этот раз будем работать там же. В FileZilla заходим в настройки SFTP и добавляем файл с ключом:

И подключаемся к серверу по адресу, который указан в консоли EC2 машины на AWS. Укажем протокол SFTP, свой Host и в качестве User — Ubuntu:

В случае перезагрузки машины через консоль Public DNS мог измениться

После подсоединения мы попадём в папку /home/ubuntu сервера. Положим файлы туда же. Теперь подключимся по SSH через Termius. Чтобы Clickhouse увидел файл со структурой словаря, его нужно положить в папку /etc/clickhouse-server:

О том, как подключаться в серверу на AWS через SSH-клиент мы рассказывали в материале «Устанавливаем Clickhouse на AWS»

sudo mv breweries_dictionary.xml /etc/clickhouse server/

Переходим в конфигурационный файл:

cd /etc/clickhouse-server
sudo nano config.xml

Нам нужен тег  — он указывает путь к файлу, который описывает структуру словарей. Укажем путь к нашему xml:

<dictionaries_config>/etc/clickhouse-server/breweries_dictionary.xml</dictionaries_config>

Сохраняем файл и запускаем клиент Clickhouse:

clickhouse client

Проверим, что наш словарь действительно загрузился:

SELECT * FROM system.dictionaries\G

В случае успеха получим подобное:

Напишем запрос к функции dictGet, чтобы получить название пивоварни под ID 999. Указываем первым аргументом наименование словаря, затем поле, значение которого хотим получить и ID.

SELECT dictGet('breweries', 'brewery_name', toUInt64(999))

Если сделаем всё правильно, то выясним, что под ID 999 находится Балтика:

Аналогичным образом удобно использовать функцию, когда в таблице с фактами хранится только ID измерения для получения понятного наименования.

 Нет комментариев    97   3 мес   Amazon Web Services   clickhouse   Data Analytics   Data Engineering   python
Ранее Ctrl + ↓