Блог об аналитике, визуализации данных, data science и BI

    Valiotti Analytics — построение аналитики для мобильных и digital-стартапов

Гайд по современным BI-системам

Время чтения текста – 4 минуты

В новой серии постов постараемся подробно изучить различные BI-системы на популярной группе датасетов SuperStore Sales. В основе данных — продажи и прибыль сетевого ритейлера в долларах.

В следующем посте обсудим постановку реальной задачи, которая могла бы стоять при подготовке дашборда на основе датасета и спроектируем макет эффективного дашборда, отвечающего на поставленные вопросы. В рамках описания задачи укажем желаемую цветовую гамму для того, чтобы сравнение дашбордов было максимально консистентным.

Затем, используя каждый инструмент, построим дашборд, который позволит принимать эффективные управленческие решения на основе данных. При подготовке дашборда постараемся подключать экспертов индустрии и учтём их комментарии.

Ниже перечислен перечень BI-систем и инструментов для работы, с данными, которые хотелось бы опробовать и описать опыт построения дашборда. Приглашаю тех, кто желает поучаствовать в решении данной задачи написать мне в Telegram — @valiotti. Разумеется, авторство дашборда будет указано. Проект некоммерческий, но полезный для сравнения современных систем для аналитики независимо от квадрантов Gartner.

Сейчас в планах подготовить материалы о следующих инструментах:

Бесплатные (Open source):

  • Metabase
  • Redash
  • Apache Superset
  • Dash / Plotly

Бесплатные (cloud):

  • Google Studio
  • Yandex Datalens

Платные (cloud):

  • Mode
  • Cluvio
  • Holistic
  • Chartio
  • Periscope
  • DeltaDNA
  • Klipfolio
  • Count.co

Платные:

  • PowerBI
  • Tableau
  • Looker
  • Excel
  • Alteryx
  • Qlik Sense
  • Qlik View

Итоговая цель — оценить системы по нескольким внутренним критериям:

  • порог входа в инструмент (1 — супер сложно, 10 — легко)
  • функциональность инструмента (1 — очень бедный функционал, 10 — сложно что-то добавить)
  • удобство пользования (1 — очень неудобно, 10 — супер удобно)
  • соответствие результата задаче (1 — совсем не попали в желаемый макет, 10 — очень близко к описанию и макету)
  • визуальная составляющая (1 — выглядит непривлекательно, 10 — визуально привлекательный дашборд)

На основе полученных внутренних оценок будет рассчитана интегральная взвешенная оценка для инструмента.

Параллельно, результаты работы будут представлены в Telegram-канале @leftjoin, и подписчики также смогут высказать свое мнение относительно полученного результата.
В итоге каждый инструмент будет описан точкой на плоскости, а сама плоскость будет поделена на 4 части.

По мере написания новых материалов в цикле этот пост будет обновляться: будут добавляться ссылки на посты и оценки.

Создаём дашборд на Bootstrap (Часть 2)

Время чтения текста – 16 минут

В последнем материале мы подготовили базовый макет дашборда при помощи библиотеки dash-bootstrap-components с двумя графиками: scatter plot и российской картой, которые подробно разбирали ранее. Сегодня продолжим наполнять дашборд информацией: встроим в него таблицы и фильтр данных по пивоварням.

Получение таблиц

Сами таблицы будем описывать в макете в файле application.py, но информацию, которую они отображают лаконичнее будет получить в отдельном модуле. Создадим файл get_tables.py: в нём будет функция, передающая готовую таблицу класса Table библиотеки dbc в application.py. В этом материале мы опишем только таблицу лучших пивоварен России, но на GithHub будут представлены все три.

В таблицах по заведениям и пивоварням мы реализуем фильтр по городам, но изначально города в собранных с Untappd данных записаны на латинице. Для запросов мы будем переводить русскоязычные наименования городов на английский при помощи библиотеки Google Translate. Кроме того, одни и те же города могут называться по-разному — например, «Москва» на латинице где-то записана как «Moskva», а где-то как «Moscow». Для этого дополнительно настроим маппинг наименований города и заранее создадим словарь с корректными наименованиями основных городов. Он пригодится в самом конце.

import pandas as pd
import dash_bootstrap_components as dbc
from clickhouse_driver import Client
import numpy as np
from googletrans import Translator

translator = Translator()

client = Client(host='12.34.56.78', user='default', password='', port='9000', database='')

city_names = {
   'Moskva': 'Москва',
   'Moscow': 'Москва',
   'СПБ': 'Санкт-Петербург',
   'Saint Petersburg': 'Санкт-Петербург',
   'St Petersburg': 'Санкт-Петербург',
   'Nizhnij Novgorod': 'Нижний Новгород',
   'Tula': 'Тула',
   'Nizhniy Novgorod': 'Нижний Новгород',
}

Таблица лучших пивоварен

Таблица, о которой идёт речь в материале, будет показывать топ-10 лучших российских пивоварен с изменением рейтинга. То есть мы сравниваем данные за два периода: [30 дней назад; сегодня] и [60 дней назад; 30 дней назад] и смотрим, как менялось место пивоварни в рейтинге. Соответственно, мы опишем следующие колонки: место в рейтинге, название пивоварни, ассортимент сортов пива, рейтинг пивоварни на untappd, изменение места и количество чекинов у этой пивоварни.
Опишем функцию get_top_russian_breweries, которая отправляет запрос к Clickhouse, получает общий топ пивоварен России, формирует данные и возвращает готовый для вывода DataFrame. Отправим два запроса — топ пивоварен за последние 30 дней и топ пивоварен за предыдущие 30 дней. Следующий запрос будет отбирать лучшие пивоварни, основываясь на количестве отзывов о пиве данной пивоварни.


Забираем данные из базы

def get_top_russian_breweries(checkins_n=250):
   top_n_brewery_today = client.execute(f'''
      SELECT  rt.brewery_id,
              rt.brewery_name,
              beer_pure_average_mult_count/count_for_that_brewery as avg_rating,
              count_for_that_brewery as checkins FROM (
      SELECT           
              brewery_id,
              dictGet('breweries', 'brewery_name', toUInt64(brewery_id)) as brewery_name,
              sum(rating_score) AS beer_pure_average_mult_count,
              count(rating_score) AS count_for_that_brewery
          FROM beer_reviews t1
          ANY LEFT JOIN venues AS t2 ON t1.venue_id = t2.venue_id
          WHERE isNotNull(venue_id) AND (created_at >= (today() - 30)) AND (venue_country = 'Россия') 
          GROUP BY           
              brewery_id,
              brewery_name) rt
      WHERE (checkins>={checkins_n})
      ORDER BY avg_rating DESC
      LIMIT 10
      '''
   )

top_n_brewery_n_days = client.execute(f'''
  SELECT  rt.brewery_id,
          rt.brewery_name,
          beer_pure_average_mult_count/count_for_that_brewery as avg_rating,
          count_for_that_brewery as checkins FROM (
  SELECT           
          brewery_id,
          dictGet('breweries', 'brewery_name', toUInt64(brewery_id)) as brewery_name,
          sum(rating_score) AS beer_pure_average_mult_count,
          count(rating_score) AS count_for_that_brewery
      FROM beer_reviews t1
      ANY LEFT JOIN venues AS t2 ON t1.venue_id = t2.venue_id
      WHERE isNotNull(venue_id) AND (created_at >= (today() - 60) AND created_at <= (today() - 30)) AND (venue_country = 'Россия')
      GROUP BY           
          brewery_id,
          brewery_name) rt
  WHERE (checkins>={checkins_n})
  ORDER BY avg_rating DESC
  LIMIT 10
  '''
)

Формируем из полученных строк два DataFrame:

top_n = len(top_n_brewery_today)
column_names = ['brewery_id', 'brewery_name', 'avg_rating', 'checkins']

top_n_brewery_today_df = pd.DataFrame(top_n_brewery_today, columns=column_names).replace(np.nan, 0)
top_n_brewery_today_df['brewery_pure_average'] = round(top_n_brewery_today_df.avg_rating, 2)
top_n_brewery_today_df['brewery_rank'] = list(range(1, top_n + 1))

top_n_brewery_n_days = pd.DataFrame(top_n_brewery_n_days, columns=column_names).replace(np.nan, 0)
top_n_brewery_n_days['brewery_pure_average'] = round(top_n_brewery_n_days.avg_rating, 2)
top_n_brewery_n_days['brewery_rank'] = list(range(1, len(top_n_brewery_n_days) + 1))

А затем в итераторе считаем, как изменилось место за последнее время у пивоварни. Обработаем исключение на случай, если 60 дней назад этой пивоварни в нашей базе ещё не было.

rank_was_list = []
for brewery_id in top_n_brewery_today_df.brewery_id:
   try:
       rank_was_list.append(
           top_n_brewery_n_days[top_n_brewery_n_days.brewery_id == brewery_id].brewery_rank.item())
   except ValueError:
       rank_was_list.append('–')
top_n_brewery_today_df['rank_was'] = rank_was_list

Теперь пройдёмся по полученным колонкам с текущими местами и изменениями. Если они не пустые, то при положительном изменении добавим к записи стрелочку вверх. При отрицательном — стрелочку вниз.

diff_rank_list = []
for rank_was, rank_now in zip(top_n_brewery_today_df['rank_was'], top_n_brewery_today_df['brewery_rank']):
   if rank_was != '–':
       difference = rank_was - rank_now
       if difference > 0:
           diff_rank_list.append(f'↑ +{difference}')
       elif difference < 0:
           diff_rank_list.append(f'↓ {difference}')
       else:
           diff_rank_list.append('–')
   else:
       diff_rank_list.append(rank_was)

Наконец, разметим итоговый DataFrame и вставим в него колонку с текущим местом. При этом у топ-3 будет отображаться эмодзи с золотым кубком.

df = top_n_brewery_today_df[['brewery_name', 'avg_rating', 'checkins']].round(2)
df.insert(2, 'Изменение', diff_rank_list)
df.columns = ['НАЗВАНИЕ', 'РЕЙТИНГ', 'ИЗМЕНЕНИЕ', 'ЧЕКИНОВ']
df.insert(0, 'МЕСТО',
         list('🏆 ' + str(i) if i in [1, 2, 3] else str(i) for i in range(1, len(df) + 1)))

return df

#Выбор пивоварен с фильтром по городам
Одна из функций нашего дашборда — просмотр топа пивоварен по конкретному городу. Для корректной работы напишем скрипт, который для каждого из списка российских городов получает топ пивоварен по числу чекинов и записывает данные по каждому городу в отдельные csv-файлы. В сущности, он мало чем отличается от предыдущего — рассмотрим главные отличия.

Прежде всего, функция принимает конкретный город. Мы уже отметили, что города в базе данных записаны на латинице — поэтому сначала переводим наименование города. В случае с Санкт-Петербургом, Нижним Новгородом и Пермью придётся перевести вручную: например, Санкт-Петербург переводится в Google Translate как St. Petersburg вместо ожидаемого Saint Petersburg.

ru_city = venue_city
if ru_city == 'Санкт-Петербург':
   en_city = 'Saint Petersburg'
elif ru_city == 'Нижний Новгород':
   en_city = 'Nizhnij Novgorod'
elif ru_city == 'Пермь':
   en_city = 'Perm'
else:
   en_city = translator.translate(ru_city, dest='en').text

Следующее отличие — запрос к базе. Нам нужно добавить в него условие совпадения по городу, чтобы получать чекины только в запрошенном городе:

WHERE (rt.venue_city='{ru_city}' OR rt.venue_city='{en_city}')

Наконец, сформированный DataFrame мы не возвращаем, а сохраняем в директорию data/cities.

df = top_n_brewery_today_df[['brewery_name', 'venue_city', 'avg_rating', 'checkins']].round(2)
df.insert(3, 'Изменение', diff_rank_list)
df.columns = ['НАЗВАНИЕ', 'ГОРОД', 'РЕЙТИНГ', 'ИЗМЕНЕНИЕ', 'ЧЕКИНОВ']
df.to_csv(f'data/cities/{en_city}.csv', index=False)  # saving DF
print(f'{en_city}.csv updated!')

Обновление таблиц по расписанию

Наш дашборд будет использовать библиотеку apscheduler для вызова последней функции по расписанию и обновления таблиц по городам. Следующие строки добавим в файл application.py: scheduler будет обновлять данные для каждого города из списка all_cities ежедневно в 13:30 по МСК.

from apscheduler.schedulers.background import BackgroundScheduler
from get_tables import update_best_breweries

all_cities = sorted(['Москва', 'Сергиев Посад', 'Санкт-Петербург', 'Владимир',
             'Красная Пахра', 'Воронеж', 'Екатеринбург', 'Ярославль', 'Казань',
             'Ростов-на-Дону', 'Краснодар', 'Тула', 'Курск', 'Пермь', 'Нижний Новгород'])

scheduler = BackgroundScheduler()
@scheduler.scheduled_job('cron', hour=10, misfire_grace_time=30)
def update_data():
   for city in all_cities:
       update_best_breweries(city)
scheduler.start()

Формирование таблицы

Наконец, опишем заключительную функцию get_top_russian_breweries_table(venue_city, checkins_n=250) — она будет принимать город, количество чекинов и будет возвращать сформированную таблицу dbc. Второй параметр — checkins_n будет отсеивать пивоварни, у которых чекинов меньше значения этой переменной. Если город не указан, сразу вызываем ранее описанную get_top_russian_breweries(checkins_n) — она вернёт общую статистику за последнее время. В противном случае снова переводим города на латиницу.

if venue_city == None: 
   selected_df = get_top_russian_breweries(checkins_n)
else: 
   ru_city = venue_city
   if ru_city == 'Санкт-Петербург':
       en_city = 'Saint Petersburg'
   elif ru_city == 'Нижний Новгород':
       en_city = 'Nizhnij Novgorod'
   elif ru_city == 'Пермь':
       en_city = 'Perm'
   else:
       en_city = translator.translate(ru_city, dest='en').text

Читаем все строки из таблицы с нужным городом и проверяем количество чекинов каждой пивоварни. В самом начале материала мы завели словарь city_names. При помощи функции map() мы пишем лямбда-выражение, которое возвращает значение ключа словаря city_names только если входной аргумент из колонки df[‘ГОРОД’] совпадает с каким-либо из ключей в city_names. В случае, если совпадения не будет возвращает просто x во избежание np.Nan.

Например, для наименования «СПБ» в колонке df[‘ГОРОД’] вернётся значение «Санкт-Петербург», так как такой ключ есть в city_names. Для «Воронеж» название таким и останется, так как совпадающий ключ не найден. В конце удаляем возможные дубликаты из DataFrame, добавляем колонку с номером места пивоварни и забираем себе первые 10 строк — это и будет топ-10 пивоварен по нужному городу.

df = pd.read_csv(f'data/cities/{en_city}.csv')
df = df.loc[df['ЧЕКИНОВ'] >= checkins_n]
df['ГОРОД'] = df['ГОРОД'].map(lambda x: city_names[x] if (x in city_names) else x)
df.drop_duplicates(subset=['НАЗВАНИЕ', 'ГОРОД'], keep='first', inplace=True) 
df.insert(0, 'МЕСТО', list('🏆 ' + str(i) if i in [1, 2, 3] else str(i) for i in range(1, len(df) + 1)))
selected_df = df.head(10)

Вне зависимости от того, получали мы DataFrame общей функцией get_top_russian_breweries() или по конкретному городу, собираем таблицу, задаём стили и возвращаем готовый dbc-объект.


Вёрстка в Dash Bootstrap Components

table = dbc.Table.from_dataframe(selected_df, striped=False,
                                bordered=False, hover=True,
                                size='sm',
                                style={'background-color': '#ffffff',
                                       'font-family': 'Proxima Nova Regular',
                                       'text-align':'center',
                                       'fontSize': '12px'},
                                className='table borderless'
                                )

return table

Структура вёрстки

Опишем в application.py слайдер, таблицу и Dropdown-фильтр с выбором города.

О вёрстке дашборда при помощи Dash Bootstrap Components мы говорили в предыдущем материале цикла

checkins_slider_tab_1 = dbc.CardBody(
                           dbc.FormGroup(
                               [
                                   html.H6('Количество чекинов', style={'text-align': 'center'})),
                                   dcc.Slider(
                                       id='checkin_n_tab_1',
                                       min=0,
                                       max=250,
                                       step=25,
                                       value=250,  
                                       loading_state={'is_loading': True},
                                       marks={i: i for i in list(range(0, 251, 25))}
                                   ),
                               ],
                           ),
                           style={'max-height': '80px', 
                                  'padding-top': '25px'
                                  }
                       )

top_breweries = dbc.Card(
       [
           dbc.CardBody(
               [
                   dbc.FormGroup(
                       [
                           html.H6('Фильтр городов', style={'text-align': 'center'}),
                           dcc.Dropdown(
                               id='city_menu',
                               options=[{'label': i, 'value': i} for i in all_cities],
                               multi=False,
                               placeholder='Выберите город',
                               style={'font-family': 'Proxima Nova Regular'}
                           ),
                       ],
                   ),
                   html.P(id="tab-1-content", className="card-text"),
               ],
           ),
   ],
)

И для обновления таблицы по фильтру и слайдеру с минимальным количеством чекинов опишем callback с вызовом get_top_russian_breweries_table(city, checkin_n):

@app.callback(
   Output("tab-1-content", "children"), [Input("city_menu", "value"),
                                         Input("checkin_n_tab_1", "value")]
)
def table_content(city, checkin_n):
   return get_top_russian_breweries_table(city, checkin_n)

Готово! Напомню, в материале описан пример создания только одной таблицы. На данный момент дашборд помимо лучших пивоварен выдаёт лучшие и худшие сорта пива, а также средний рейтинг пива по регионам и отношение количества чекинов каждой пивоварни к её средней оценке.

Полный код проекта доступен на GitHub

Собираем топ-10 аккаунтов Instagram по теме аналитики и машинного обучения

Время чтения текста – 11 минут

В некоторых телеграм-каналах (раз, два) уже говорилось про другие интересные паблики в телеграме, однако по Instagram такого топа пока не было. Вероятно, это не самая популярная сеть для контента в нашей индустрии, тем не менее, можно проверить эту гипотезу, используя Python и данные. В этом материале рассказываем, как собрать данные по аккаунтам Instagram без API.

Метод сбора данных
Instagram API не позволит вам просто так собирать данные о других пользователях, но есть и другой метод. Можно отправить такой request-запрос:

https://instagram.com/leftjoin/?__a=1

И получить в ответе JSON-объект со всей информацией о пользователе, которую можно посмотреть самому: имя аккаунта, количество постов, подписок и подписчиков, а также первые десять постов с информацией про них: количество лайков, комментарии и прочее. Именно на таких request-запросах устроена библиотека pyInstagram.

Схема данных
Будем собирать данные в три таблицы Clickhouse: пользователи, посты и комментарии. В таблицу пользователей собираем всю информацию о них: идентификатор, наименование аккаунта, имя и фамилия человека, описание профиля, количество подписок и подписчиков, количество постов, суммарное количество комментариев и лайков, наличие верификации, география пользователя и ссылки на аватарку и Facebook.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблицу с постами сохраняем автора поста, идентификатор записи, текст, количество комментариев и прочее. is_ad, is_album и is_video — поля, проверяющие, является ли запись рекламной, «каруселью» изображений или видеозаписью.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблице с комментариями храним отдельно каждый комментарий к записи с автором и текстом.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Скрипт
Из библиотеки pyInstagram нам понадобятся классы Account, Media, WebAgent и Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Создаем экземпляр класса WebAgent — он необходим для вызова некоторых методов и обновления аккаунтов. В начале нам нужно иметь хотя бы названия профилей пользователей, информацию о которых мы хотим собрать, поэтому отправим другой request-запрос для поиска пользователей по ключевым словам, их список ниже в фрагменте кода. В выдаче будут аккаунты, у которых название или описание профиля совпало с ключевым словом.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='12.34.56.789', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Проходим по всем ключевым словам и собираем все аккаунты. Так как в списке могли образоваться дубликаты, переведём список в множество и обратно в список.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))

Теперь проходим по списку аккаунтов, и если аккаунта с таким наименованием ещё не было в базе, то получаем расширенную информацию о нём. Для этого пробуем создать экземпляр класса Account, передав username параметром. После при помощи объекта agent обновляем информацию об аккаунте. Будем собирать только первые 100 постов, чтобы сбор не задерживался. Создадим список media_list — он при помощи метода get_media будет хранить код каждого поста, который затем можно будет получить при помощи класса Media.


Сбор медиа аккаунта

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0

Мы начинаем с постов и комментариев, потому что для занесения в базу нового пользователя нам нужно подсчитать сперва суммарное количество комментариев и лайков в его аккаунте. Практически все интересующие поля являются атрибутами класса Media.


Сбор постов пользователя

for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Чтобы собрать комментарии необходимо вызвать метод get_comments и передать параметром экземпляр класса Media.


Сбор комментариев из поста

comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)


Наконец, когда все посты и комментарии пройдены, можем занести информацию о пользователе.

Сбор информации о пользователе

user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)

Результаты
Таким методом нам удалось собрать 500 пользователей, 20 тысяч постов и 40 тысяч комментариев. Теперь можем написать простой запрос к базе и получить топ-10 Instagram-аккаунтов по теме аналитики и машинного обучения за последнее время:

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

А вот и приятный бонус, для тех, кто искал на какие аккаунты в Instagram подписаться по релевантной тематике:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

Полный код проекта доступен на GitHub

Доклады онлайн-конференции FutureData

Время чтения текста – 14 минут

C 8 по 9 сентября состоялась онлайн-конференция FutureData, в которой я принял участие. Вчера организаторы опубликовали записи докладов. Хочу поделиться своими наблюдениями и докладами, которые заинтересовали меня. Постарался собрать максимально релевантные скриншоты, но сразу извиняюсь за их качество, выдирал прямо из видео.

Featured Keynote: Automating Analysis
Спикер: Pat Hanrahan
В докладе профессор Стэнфордского университета и сооснователь Tableau рассуждает об использовании AI в аналитике. Доклад получился монотонным, и большую часть времени Pat обсуждает где мы сейчас, как мы используем AI, однако секция вопросов и ответов получилась интересная.

The Modern Data Stack: Past, Present, and Future
Спикер: Tristan Handy
Автор знаменитой публикации о руководстве по аналитике для основателя стартапа и создатель dbt рассуждает о том, как менялся современный data-stack с 2012 по 2020 год. Для меня доклад оказался наиболее интересным, особенно учитывая, что Tristan делает предсказания о том, что будет расти и развиваться в data-stack в ближайшее время.

Making Enterprise Data Timelier and More Reliable with Lakehouse Technology
Спикер: Matei Zaharia
Доклад главного технолога DataBricks. К сожалению, в докладе большие проблемы с аудио, но Matei рассматривает проблемы современного Data Lake, а дальше продвигает технологию DataBricks — DeltaLake. Как по мне, доклад получился рекламным, но послушать интересно.

How to Close the Analytic Divide
Спикер: Alan Jacobson
Chief Data Officer из Alteryx рассуждает о профессии Data Scientist и приводит статистику по зарплатам, в которой средняя зарплата специалиста по данным существенно выше, чем у остальных аналитиков. К слову, наше недавнее исследование с Ромой Буниным это подтверждает. Далее Alan обсуждает выручку компаний, находящихся на разных стадиях аналитического развития. Более развитые — (сюрприз!) растут быстрее. Отдельная часть доклада посвящена изменениям в трансформации к подходу к работе с данными, а в конце небольшое рекламное интро Alteryx. Доклад смотрится легко.

Hot Analytics — Handle with Care
Спикер: Gian Merlino
Co-Founder и CTO Imply приводит сравнение hot & cold data (намек на Snowflake?). Затем — демонстрация некоторой BI от Imply с простеньким интерфейсом и реализованным drag-n-drop. Далее Gian рассказывает о возможных аналитических архитектурах и затрагивает тему Druid, на которой построен Imply.

Анализ рынка вакансий аналитики и BI: дашборд в Tableau

Время чтения текста – 16 минут

По данным рейтинга SimilarWeb, hh.ru — третий по популярности сайт о трудоустройстве в мире. В одном из разговоров с Ромой Буниным у нас появилась идея сделать совместный проект: собрать данные из открытого HeadHunter API и визуализировать их при помощи Tableau Public. Нам захотелось понять, как меняется зарплата в зависимости от указанных в вакансии навыков, наименования позиции и сравнить, как обстоят дела в Москве, Санкт-Петербурге и регионах.

Как мы собирали данные?

Схема данных основана на коротком представлении вакансии, которую возвращает метод GET /vacancies. Из представления собираются следующие поля: тип вакансии, идентификатор, премиальность вакансии, необходимость прохождения тестирования, адрес компании, информация о зарплате, график работы и другие. Соответствующий CREATE-запрос для таблицы:


Запрос создания таблицы vacancies_short

CREATE TABLE headhunter.vacancies_short
(
    `added_at` DateTime,
    `query_string` String,
    `type` String,
    `level` String,
    `direction` String,
    `vacancy_id` UInt64,
    `premium` UInt8,
    `has_test` UInt8,
    `response_url` String,
    `address_city` String,
    `address_street` String,
    `address_building` String,
    `address_description` String,
    `address_lat` String,
    `address_lng` String,
    `address_raw` String,
    `address_metro_stations` String,
    `alternate_url` String,
    `apply_alternate_url` String,
    `department_id` String,
    `department_name` String,
    `salary_from` Nullable(Float64),
    `salary_to` Nullable(Float64),
    `salary_currency` String,
    `salary_gross` Nullable(UInt8),
    `name` String,
    `insider_interview_id` Nullable(UInt64),
    `insider_interview_url` String,
    `area_url` String,
    `area_id` UInt64,
    `area_name` String,
    `url` String,
    `published_at` DateTime,
    `employer_url` String,
    `employer_alternate_url` String,
    `employer_logo_urls_90` String,
    `employer_logo_urls_240` String,
    `employer_logo_urls_original` String,
    `employer_name` String,
    `employer_id` UInt64,
    `response_letter_required` UInt8,
    `type_id` String,
    `type_name` String,
    `archived` UInt8,
    `schedule_id` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY vacancy_id

Первый скрипт собирает данные с HeadHunter по API и отправляет их в Clickhouse. Он использует следующие библиотеки:

import requests
from clickhouse_driver import Client
from datetime import datetime
import pandas as pd
import re

Далее загружаем таблицу с запросами и подключаемся к CH:

queries = pd.read_csv('hh_data.csv')
client = Client(host='1.234.567.890', user='default', password='', port='9000', database='headhunter')

Таблица queries хранит список поисковых запросов. Она содержит следующие колонки: тип запроса, уровень вакансии для поиска, направление вакансии и саму поисковую фразу. В строку с запросом можно помещать логические операторы: например, чтобы найти вакансии, в которых должны присутствовать ключевые слова «Python», «data» и «анализ» между ними можно указать логическое «И».

Не всегда вакансии в выдаче соответствуют ожиданиям: случайно в базу могут попасть повара, маркетологи и администраторы магазина. Чтобы этого не произошло, опишем функцию check_name(name) — она будет принимать наименование вакансии и возвращать True в случае, если вакансия не подошла по названию.

def check_name(name):
    bad_names = [r'курьер', r'грузчик', r'врач', r'менеджер по закупу',
           r'менеджер по продажам', r'оператор', r'повар', r'продавец',
          r'директор магазина', r'директор по продажам', r'директор по маркетингу',
          r'кабельщик', r'начальник отдела продаж', r'заместитель', r'администратор магазина', 
          r'категорийный', r'аудитор', r'юрист', r'контент', r'супервайзер', r'стажер-ученик', 
          r'су-шеф', r'маркетолог$', r'региональный', r'ревизор', r'экономист', r'ветеринар', 
          r'торговый', r'клиентский', r'начальник цеха', r'территориальный', r'переводчик', 
          r'маркетолог /', r'маркетолог по']
    for item in bad_names:
        if re.match(item, name):
            return True

Затем объявляем бесконечный цикл — мы собираем данные без перерыва. Идём по DataFrame queries и сразу забираем оттуда тип вакансии, уровень, направление и поисковый запрос в отдельные переменные. Сначала по ключевому слову отправляем один запрос к методу /GET vacancies и получаем количество страниц. После идём от нулевой до последней страницы, отправляем те же запросы и заполняем список vacancies_from_response с полученными в выдаче короткими представлениями всех вакансий. В параметрах указываем 10 вакансий на страницу — больше ограничения HH API получить не позволяют. Так как мы не указали параметр area, API возвращает вакансии по всему миру.

while True:
   for query_type, level, direction, query_string in zip(queries['Тип'], queries['Уровень'], queries['Направление'], queries['Ключевое слово']):
           print(f'ключевое слово: {query_string}')
           url = 'https://api.hh.ru/vacancies'
           par = {'text': query_string, 'per_page':'10', 'page':0}
           r = requests.get(url, params=par).json()
           added_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
           pages = r['pages']
           found = r['found']
           vacancies_from_response = []

           for i in range(0, pages + 1):
               par = {'text': query_string, 'per_page':'10', 'page':i}
               r = requests.get(url, params=par).json()
               try:
                   vacancies_from_response.append(r['items'])
               except Exception as E:
                   continue

Теперь проходим по каждой вакансии на каждой странице двойным итератором. Сперва отправим запрос к Clickhouse и проверим, нет ли уже в базе вакансии с таким идентификатором и таким поисковым запросом. Если проверка пройдена — проверяем название вакансии. В случае неудачи переходим к следующей.

for item in vacancies_from_response:
               for vacancy in item:
                   if client.execute(f"SELECT count(1) FROM vacancies_short WHERE vacancy_id={vacancy['id']} AND query_string='{query_string}'")[0][0] == 0:
                       name = vacancy['name'].replace("'","").replace('"','')
                       if check_name(name):
                           continue

Теперь проходим по вакансии и собираем все нужные поля. В случае отсутствия некоторых данных будем отправлять пустые строки:


Код для сбора данных о вакансии

vacancy_id = vacancy['id']
                       is_premium = int(vacancy['premium'])
                       has_test = int(vacancy['has_test'])
                       response_url = vacancy['response_url']
                       try:
                           address_city = vacancy['address']['city']
                           address_street = vacancy['address']['street']
                           address_building = vacancy['address']['building']
                           address_description = vacancy['address']['description']
                           address_lat = vacancy['address']['lat']
                           address_lng = vacancy['address']['lng']
                           address_raw = vacancy['address']['raw']
                           address_metro_stations = str(vacancy['address']['metro_stations']).replace("'",'"')
                       except TypeError:
                           address_city = ""
                           address_street = ""
                           address_building = ""
                           address_description = ""
                           address_lat = ""
                           address_lng = ""
                           address_raw = ""
                           address_metro_stations = ""
                       alternate_url = vacancy['alternate_url']
                       apply_alternate_url = vacancy['apply_alternate_url']
                       try:
                           department_id = vacancy['department']['id']
                       except TypeError as E:
                           department_id = ""
                       try:
                           department_name = vacancy['department']['name']
                       except TypeError as E:
                           department_name = ""
                       try:
                           salary_from = vacancy['salary']['from']
                       except TypeError as E:
                           salary_from = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_to = vacancy['salary']['to']
                       except TypeError as E:
                           salary_to = "cast(Null as Nullable(UInt64))"
                       try:
                           salary_currency = vacancy['salary']['currency']
                       except TypeError as E:
                           salary_currency = ""
                       try:
                           salary_gross = int(vacancy['salary']['gross'])
                       except TypeError as E:
                           salary_gross = "cast(Null as Nullable(UInt8))"
                       try:
                           insider_interview_id = vacancy['insider_interview']['id']
                       except TypeError:
                           insider_interview_id = "cast(Null as Nullable(UInt64))"
                       try:
                           insider_interview_url = vacancy['insider_interview']['url']
                       except TypeError:
                           insider_interview_url = ""
                       area_url = vacancy['area']['url']
                       area_id = vacancy['area']['id']
                       area_name = vacancy['area']['name']
                       url = vacancy['url']
                       published_at = vacancy['published_at']
                       published_at = datetime.strptime(published_at,'%Y-%m-%dT%H:%M:%S%z').strftime('%Y-%m-%d %H:%M:%S')
                       try:
                           employer_url = vacancy['employer']['url']
                       except Exception as E:
                           print(E)
                           employer_url = ""
                       try:
                           employer_alternate_url = vacancy['employer']['alternate_url']
                       except Exception as E:
                           print(E)
                           employer_alternate_url = ""
                       try:
                           employer_logo_urls_90 = vacancy['employer']['logo_urls']['90']
                           employer_logo_urls_240 = vacancy['employer']['logo_urls']['240']
                           employer_logo_urls_original = vacancy['employer']['logo_urls']['original']
                       except Exception as E:
                           print(E)
                           employer_logo_urls_90 = ""
                           employer_logo_urls_240 = ""
                           employer_logo_urls_original = ""
                       employer_name = vacancy['employer']['name'].replace("'","").replace('"','')
                       try:
                           employer_id = vacancy['employer']['id']
                       except Exception as E:
                           print(E)
                       response_letter_required = int(vacancy['response_letter_required'])
                       type_id = vacancy['type']['id']
                       type_name = vacancy['type']['name']
                       is_archived = int(vacancy['archived'])

Последнее поле — график работы. В случае, если вакансия подразумевает вахтовый метод работы она нам точно не подходит.

try:
    schedule = vacancy['schedule']['id']
except Exception as E:
    print(E)
    schedule = ''"
if schedule == 'flyInFlyOut':
    continue

Теперь формируем список из полученных переменных, заменяем в нём None-значения на пустые строки во избежании конфликтов с Clickhouse и вставляем строку в таблицу.

vacancies_short_list = [added_at, query_string, query_type, level, direction, vacancy_id, is_premium, has_test, response_url, address_city, address_street, address_building, address_description, address_lat, address_lng, address_raw, address_metro_stations, alternate_url, apply_alternate_url, department_id, department_name,
salary_from, salary_to, salary_currency, salary_gross, insider_interview_id, insider_interview_url, area_url, area_name, url, published_at, employer_url, employer_logo_urls_90, employer_logo_urls_240,  employer_name, employer_id, response_letter_required, type_id, type_name, is_archived, schedule]
for index, item in enumerate(vacancies_short_list):
    if item is None:
        vacancies_short_list[index] = ""
tuple_to_insert = tuple(vacancies_short_list)
print(tuple_to_insert)
client.execute(f'INSERT INTO vacancies_short VALUES {tuple_to_insert}')

Как подключили Tableau к данным?

Tableau Public не умеет работать с базами данных, поэтому мы написали коннектор Clickhouse к Google Sheets. Он использует библиотеки gspread и oauth2client для авторизации в Google Spreadsheets API и библиотеку schedule для ежедневной работы по графику.

Работа с Google Spreadseets API подробно разобрана в материале «Собираем данные по рекламным кампаниям ВКонтакте»

import schedule
from clickhouse_driver import Client
import gspread
import pandas as pd
from oauth2client.service_account import ServiceAccountCredentials
from datetime import datetime

scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
client = Client(host='54.227.137.142', user='default', password='', port='9000', database='headhunter')
creds = ServiceAccountCredentials.from_json_keyfile_name('credentials.json', scope)
gc = gspread.authorize(creds)

Опишем функцию update_sheet() — она будет брать все данные из Clickhouse и вставлять их в таблицу Google Docs.

def update_sheet():
   print('Updating cell at', datetime.now())
   columns = []
   for item in client.execute('describe table headhunter.vacancies_short'):
       columns.append(item[0])
   vacancies = client.execute('SELECT * FROM headhunter.vacancies_short')
   df_vacancies = pd.DataFrame(vacancies, columns=columns)
   df_vacancies.to_csv('vacancies_short.csv', index=False)
   content = open('vacancies_short.csv', 'r').read()
   gc.import_csv('1ZWS2kqraPa4i72hzp0noU02SrYVo0teD7KZ0c3hl-UI', content.encode('utf-8'))

Чтобы скрипт запускался в 16:00 по МСК каждый день используем библиотеку schedule:

schedule.every().day.at("13:00").do(update_sheet)
while True:
   schedule.run_pending()

А что в результате?

Рома построил на полученных данных дашборд.

И в youtube-ролике рассказывает о том, как эффективно использовать дашборд

Инсайты, которые можно извлечь из дашборда

  1. Аналитики с навыком бизнес-аналитики востребованы на рынке больше всего: по такому запросу нашлось больше всего вакансий. Тем не менее, средняя зарплата выше у продуктовых аналитиков и аналитиков BI.
  2. В Москве средние зарплаты выше на 10-30 тысяч рублей, чем в Санкт-Петербурге и на 30-40 тысячи рублей, чем в регионах. Там же работы нашлось больше всего в России.
  3. Самые высокооплачиваемые должности: руководитель отдела аналитики (в среднем, 110 тыс. руб. в месяц), инженер баз данных (138 тыс. руб. в месяц) и директор по машинному обучению (250 тыс. руб. в месяц).
  4. Самые полезные навыки на рынке — владение Python c библиотеками pandas и numpy, Tableau, Power BI, Etl и Spark. Вакансий с такими требованиями больше и зарплаты в них указаны выше прочих. Для Python-программистов знание matplotlib ценится на рынке выше, чем владение plotly.

Полный код проекта доступен на GitHub

Ранее Ctrl + ↓