Valiotti Analytics — построение аналитики для мобильных и digital-стартапов
    DataMarathon.ru — семидневный интенсив в области аналитики для начинающих
11 заметок с тегом

sql

Нормализация данных через запрос в SQL

Время чтения текста – 8 минут

Главный принцип анализа данных GIGO (от англ. garbage in — garbage out, дословный перевод «мусор на входе — мусор на выходе») говорит нам о том, что ошибки во входных данных всегда приводят к неверным результатам анализа. От того, насколько хорошо подготовлены данные, зависят результаты всей вашей работы.

Например, перед нами стоит задача подготовить выборку для использования в алгоритме машинного обучения (модели k-NN, k-means, логической регрессии и др). Признаки в исходном наборе данных могут быть в разном масштабе, как, например, возраст и рост человека. Это может привести к некорректной работе алгоритма. Такого рода данные нужно предварительно масштабировать.

В данном материале мы рассмотрим способы масштабирования данных через запрос в SQL: масштабирование методом min-max, min-max для произвольного диапазона и z-score нормализация. Для каждого из методов мы подготовили по два примера написания запроса — один с помощью подзапроса SELECT, а второй используя оконную функцию OVER().

Для работы возьмем таблицу students с данными о росте учащихся.

name height
Иван 174
Петр 181
Денис 199
Ксения 158
Сергей 179
Ольга 165
Юлия 152
Кирилл 188
Антон 177
Софья 165

Min-Max масштабирование

Подход min-max масштабирования заключается в том, что данные масштабируются до фиксированного диапазона, который обычно составляет от 0 до 1. В данном случае мы получим все данные в одном масштабе, что исключит влияние выбросов на выводы.

Выполним масштабирование по формуле:

Умножаем числитель на 1.0, чтобы в результате получилось число с плавающей точкой.

SQL-запрос с подзапросом:

SELECT height, 
       1.0 * (height-t1.min_height)/(t1.max_height - t1.min_height) AS scaled_minmax
  FROM students, 
      (SELECT min(height) as min_height, 
              max(height) as max_height 
         FROM students
      ) as t1;

SQL-запрос с оконной функцией:

SELECT height, 
       (height - MIN(height) OVER ()) * 1.0 / (MAX(height) OVER () - MIN(height) OVER ()) AS scaled_minmax
  FROM students;

В результате мы получим переменные в диапазоне [0...1], где за 0 принят рост самого невысокого учащегося, а 1 рост самого высокого.

name height scaled_minmax
Иван 174 0.46809
Петр 181 0.61702
Денис 199 1
Ксения 158 0.12766
Сергей 179 0.57447
Ольга 165 0.2766
Юлия 152 0
Кирилл 188 0.76596
Антон 177 0.53191
Софья 165 0.2766

Масштабирование для заданного диапазона

Вариант min-max нормализации для произвольных значений. Не всегда, когда речь идет о масштабировании данных, диапазон значений находится в промежутке между 0 и 1.
Формула для вычисления в этом случае такая:

Это даст нам возможность масштабировать данные к произвольной шкале. В нашем примере пусть а=10.0, а b=20.0.

SQL-запрос с подзапросом:

SELECT height, 
       ((height - min_height) * (20.0 - 10.0) / (max_height - min_height)) + 10 AS scaled_ab
  FROM students,
      (SELECT MAX(height) as max_height, 
              MIN(height) as min_height
         FROM students  
      ) t1;

SQL-запрос с оконной функцией:

SELECT height, 
       ((height - MIN(height) OVER() ) * (20.0 - 10.0) / (MAX(height) OVER() - MIN(height) OVER())) + 10.0 AS scaled_ab
  FROM students;

Получаем аналогичные результаты, что и в предыдущем методе, но данные распределены в диапазоне от 10 до 20.

name height scaled_ab
Иван 174 14.68085
Петр 181 16.17021
Денис 199 20
Ксения 158 11.2766
Сергей 179 15.74468
Ольга 165 12.76596
Юлия 152 10
Кирилл 188 17.65957
Антон 177 15.31915
Софья 165 12.76596

Нормализация с помощью z-score

В результате z-score нормализации данные будут масштабированы таким образом, чтобы они имели свойства стандартного нормального распределения — среднее (μ) равно 0, а стандартное отклонение (σ) равно 1.

Вычисляется z-score по формуле:

SQL-запрос с подзапросом:

SELECT height, 
       (height - t1.mean) * 1.0 / t1.sigma AS zscore
  FROM students,
      (SELECT AVG(height) AS mean, 
              STDDEV(height) AS sigma
         FROM students
        ) t1;

SQL-запрос с оконной функцией:

SELECT height, 
       (height - AVG(height) OVER()) * 1.0 / STDDEV(height) OVER() AS z-score
  FROM students;

В результате мы сразу заметим выбросы, которые выходят за пределы стандартного отклонения.

name height zscore
Иван 174 0.01488
Петр 181 0.53582
Денис 199 1.87538
Ксения 158 -1.17583
Сергей 179 0.38698
Ольга 165 -0.65489
Юлия 152 -1.62235
Кирилл 188 1.05676
Антон 177 0.23814
Софья 165 -0.65489

Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

UNPIVOT данных с использованием CROSS JOIN

Время чтения текста – 5 минут

Зачастую мы получаем данные в предагрегированном виде, когда каждая отдельная колонка является посчитанной метрикой. По аналогии мы получаем подобный результат, когда строим сводную таблицу в Excel и используем некоторое количество фактов для агрегации. Но что делать, если нам нужно произвести обратную операцию — Unpivot?

Как поступить, если в датасете понадобилось трансформировать данные в реляционный вид? В Tableau есть фича Unpivot, которая сделает всё сама: если датасет построен из файла, достаточно выделить нужные колонки и нажать на кнопку «Pivot». А в некоторых диалектах SQL, например, в Transact, уже есть встроенные функции, которые тоже делают это сами.

Но в случае, если датасет построен на Custom SQL Query из базы данных, у которой в арсенале отсутствуют встроенные функции для трансформации в сводную и обратно, необходим какой-то другой подход, и Tableau порекомендует для такой таблицы:

ID a b c
1 a1 b1 c1
2 a2 b2 c2

Воспользоваться таким стандартным универсальным, но не очень эффективным решением:

select id, ‘a’ AS col, a AS value
from yourtable
union all
select id, ‘b’ AS col, b AS value
from yourtable
union all
select id, ‘c’ AS col, c AS value
from yourtable

И в результате получить таблицу вида:

id col value
1 a a1
2 a a2
1 b b1
2 b b2
1 c c1
2 c c2

Порой, когда мы работаем с физической таблицей и нам надо быстро получить результаты для двух-трех колонок, действительно, подобное решение можно быстро применить, не задумываясь. Однако в случае, когда вместо таблицы содержится, например, сложный подзапрос с несколькими джойнами и нужно сделать Pivot для 5+ колонок, подзапрос вызовется целых 5+ раз, согласитесь, не очень действенно считать одно и тоже неоднократно. Вместо этого можно воспользоваться рецептом с CROSS JOIN, найденным на просторах Stack Overflow:

select t.id,
c.col,
    case c.col
        when 'a' then a
        when 'b' then b
        when 'c' then c
    end as data
from yourtable t
cross join
(
    select 'a' as col
    union all select 'b'
    union all select 'c'
) c

Разберём запрос подробнее. CROSS JOIN — перекрёстное соединение, декартово произведение, или, проще говоря, произведение всех строк со всеми. За ненадобностью в синтаксисе CROSS JOIN отсутствует ON — мы объединяем не по какому-то конкретному полю две таблицы, а сразу по всем существующим строкам.

Сначала мы формируем таблицу со всеми колонками, предназначенными для преобразования в строки. В нашем случае это колонки a, b и c: поэтому мы сделали таблицу c, в которой будет колонка col со значениями a, b и c:

(
    select 'a' as col
    union all select 'b'
    union all select 'c'
) c

Выглядит она так:

col
a
b
c

Затем таблицы yourtable и c объединятся перекрестным соединением, а после мы возьмём поля id, col и в зависимости от того, как называется ячейка в col, подставим соответствующие данные в поле data.

select t.id,
c.col,
    case c.col
        when 'a' then a
        when 'b' then b
        when 'c' then c
    end as value
from yourtable t
cross join
(
    select 'a' as col
    union all select 'b'
    union all select 'c'
) c

В итоге получим ту же самую искомую таблицу, с которой уже можно удобно работать любым аналитическим инструментом:

id col value
1 a a1
2 a a2
1 b b1
2 b b2
1 c c1
2 c c2
 Нет комментариев    105   4 мес   mysql   query   sql   tableau   лайфхак

Конференция Coalesce от dbt: что посмотреть?

Время чтения текста – 4 минуты

С 7 по 11 декабря проходила конференция Coalesce, о которой я рассказывал ранее. В этом году все организаторы решили проводить конференции по 5 дней с кучей докладов.

С одной стороны это плюс — ощущение, что информации много и можно выбрать, что интересно. С другой стороны такое количество информации несколько изматывает, потому что часто по названию доклада не очень понятно насколько он окажется полезным и интересным. Мне все же кажется, что более трех дней для конференции это много, т. к. интерес аудитории теряется, да и необходимость заниматься своими личными и профессиональными делами не может испариться из-за события, которое хоть и в онлайне, но занимает твое внимание.

Однако мне удалось посмотреть большую часть докладов, кое-что пролистывая. Для начала коротко в целом о впечатлениях: очень круто изучать доклады с подобной конференции как Coalesce, потому что речь идет в основном о современных инструментах и облачных решениях. Почти в каждом докладе можно услышать про Redshift / BigQuery / Snowflake, а с точки зрения BI: Mode / Tableau / Looker / Metabase. В центре всего, разумеется, dbt.

Мой шорт-лист докладов, которые рекомендую изучить:

  1. dbt 101 — вводный доклад и интро в то, что такое dbt и как его используют
  2. Kimball in the context of the modern data warehouse: what’s worth keeping, and what’s not — интересный и очень-очень спорный доклад, который вызвал массу вопросов в slack dbt. Вкратце, автор предлагает перейти на «широкие» аналитические таблицы и отказаться от нормальных форм всюду.
  3. Building a robust data pipeline with dbt, Airflow, and Great Expectations — в докладе про небезынтересный инструмент greatexpectations, суть которого в валидации данных
  4. Orchestrating dbt with Dagster — мне было несколько скучновато слушать, но если хочется познакомиться с Dagster — самое то
  5. Supercharging your data team — ребята сделали обертку к dbt, назвали dbt executor 9000 и рассказывают о нем
  6. Presenting: SQLFluff — про очень классную штуку SQLFluff, которая автоматически редактирует SQL-код согласно канонам
  7. Quickstart your analytics with Fivetran dbt packages — из доклада можно узнать, что такое Fivetran и как его используют совместно с dbt
  8. Perfect complements: Using dbt with Looker for effective data governance — про взаимодействие dbt и looker, про различия и схожие части инструментов

Итоги прохождения курса по dbt

Время чтения текста – 4 минуты

Недавно прошёл курс по dbt от команды dbt. Курс классный, в нем много практики. Я использовал Google BigQuery и публичные датасеты от dbt для решения описанных примеров, а в обучающих материалах все построено на Snowflake.

В целом, узнал много нового и полезного о dbt, кратко summary:

  1. Во введении ребята объясняют роль Analytics Engineer, о котором так много разговоров и ссылаются на их пост блога
  2. Дается исчерпывающая информация о том, как подключить dbt к хранилищу и .git
  3. В dbt довольно тривиальными запросами реализовано тестирование данных на предмет уникальности и соответствия значениям. Это реально базовые SQL-запросы, которые проверяют наличие или отсутствие поля или значений. И тут интересно следующее: когда пишешь самостоятельно похожие запросы иногда думаешь, что во всем остальном мире так никто не делает, ну, к примеру, как в запросе ниже. А оказывается еще как делают, вот даже публично внутри dbt все эти тесты так и реализованы. И, кстати, крайне удобно, что SQL-код каждого теста можно изучить и скомпилировать.
SELECT sum(amount) FROM ... HAVING sum(amount) > 0
  1. Круто и удобно формируется документация и DAG (Directed Acyclic Graph), который показывает все шаги преобразований модели
  2. Поскольку dbt построен на Liquid и использовании Jinja (движок шаблонов в Python), то можно делать всякие невероятные вещи вроде написания внутреннего макроса (читай, условный операторы, циклы или создание функций) и применять этот макрос для автоматизации однотипных частей запроса. Это прям вау 🙂
  3. Многие вещи уже придуманы и разработаны коммьюнити, поэтому существует dbt hub, через который можно подключить интересующие пакеты и не изобретать велосипед.
  4. Отдельного упоминания достойны алгоритмы формирования инкрементального наполнения таблиц и создания снэпшотов. Для одного из проектов абсолютно такой же алгоритм по созданию снэпшотов с date_form / date_to мне доводилось проектировать самостоятельно.
    Было приятно увидеть, что у ребят из dbt это работает абсолютно аналогичным образом.
  1. Разумеется, используя Jinja и dbt, можно автоматизировать построение аналитических запросов, это так и называется Analyses. Скомпилированный код запроса, можно имплементировать в любимую BI-систему и наслаждаться результатами.

Общие впечатления очень положительные: dbt ждет большое будущее и развитие, ведь коммьюнити растет вместе с возможностями и ресурсами компании. Ждем коннекторов к другим СУБД помимо PostgreSQL, BigQuery, Snowflake, Redshift.

Ранее Ctrl + ↓