13 заметок с тегом

sql

Моделирование LTV в SQL

Время чтения текста – 13 минут

У большинства игровых и мобильных компаний имеется кривая Retention, ранее мы писали о том, что такое Retention и как его посчитать. Вкратце — это метрика, которая позволяет понять насколько хорошо продукт вовлекает пользователей в ежедневное использование. А ещё при помощи Retention и ARPDAU можно посчитать LTV (Lifetime Value), пожизненный доход с одного пользователя. Зная средний доход с пользователя за день и кривую Retention мы можем смоделировать ее и спрогнозировать LTV.

Для материала были взяты данные из одного реального игрового проекта. Нулевой день не отображен для того, чтобы видеть динамику в деталях

В сегодняшнем материале мы подробно разберём, как смоделировать LTV для 180 дней при помощи SQL и просто линейной регрессии.

Как посчитать LTV?

В общем случае формула LTV выглядит как ARPDAU умноженное на Lifetime — время жизни пользователя в проекте.

Посмотрим на классический график Retention:

Lifetime — это площадь фигуры под Retention:

Откуда взялись интегралы и площади можно подробнее узнать в этом материале

Значит, чтобы посчитать Lifetime, нужно взять интеграл от функции удержания по времени. Формула приобретает следующий вид:

Для описания кривой Retention лучше всего подходит степенная функция a*x^b. Вот как она выглядит в сравнении с кривой Retention:

При этом x — номер дня, a и b — параметры функции, которую мы построим при помощи линейной регрессии. Регрессия появилась неслучайно — эту степенную функцию можно привести к виду линейной функции, логарифмируя:

ln(a) — intercept, b — slope. Остаётся найти эти параметры — в линейной регрессии для этого используют метод наименьших квадратов. Lifetime — кумулятивная сумма прогноза за 180 дней. Посчитав её, остаётся умножить Lifetime на ARPDAU и получим LTV за 180 дней.

Строим LTV

Перейдём к практике. Для всех расчётов мы использовали данные одной игровой компании и СУБД PostgreSQL — в ней уже реализованы функции поиска параметров для линейной регрессии. Начнём с построения Retention: соберём общее количество пользователей в период с 1 марта по 1 апреля 2021 года — мы изучаем активность за один месяц:

--общее количество юзеров в когорте
with cohort as (
    select count(distinct id) as total_users_of_cohort
    from users
    where date(registration) between date '2021-03-01' and date '2021-03-30'
),

Теперь посмотрим, как ведут себя эти пользователи в последующие 90 дней:

--количество активных юзеров на 1ый день, 2ой, 3ий и тд. из когорты
active_users as (
    select date_part('day', activity.date - users.registration) as activity_day, 
               count(distinct users.id) as active_users_of_day
    from activity
    join users on activity.user_id = users.id
    where date(registration) between date '2021-03-01' and date '2021-03-30' 
    group by 1
    having date_part('day', activity.date - users.registration) between 1 and 90 --берем только первые 90 дней, остальные дни предсказываем.
),

Кривая Retention — отношение количества активных пользователей к размеру когорты текущего дня. В нашем случае она выглядит так:

По данным кривой посчитаем параметры для линейной регрессии. regr_slope(x, y) — функция для вычисления наклона регрессии, regr_intercept(x, y) — функция для вычисления перехвата по оси Y. Эти функции являются стандартными агрегатными функциями в PostgreSQL и для известных X и Y по методу наименьших квадратов.

Вернёмся к нашей формуле — мы получили линейное уравнение, и хотим найти коэффициенты линейной регрессии. Перехват по оси Y и коэффициент наклона можем найти по дефолтным для PostgreSQL функциям. Получается:

Подробнее о том, как работают функции intercept(x, y) и slope(x, y) можно почитать в этом мануале

Из свойства натурального логарифма следует, что:

Наклон считаем аналогичным образом:

Эти же вычисления запишем в подзапрос для расчёта коэффициентов регрессии:

--рассчитываем коэффициенты регрессии
coef as (
    select exp(regr_intercept(ln(activity), ln(activity_day))) as a, 
                regr_slope(ln(activity), ln(activity_day)) as b
    from(
                select activity_day,
                            active_users_of_day::real / total_users_of_cohort as activity
                from active_users 
                cross join cohort order by activity_day 
            )
),

И получим прогноз на 180 дней, подставив параметры в степенную функцию, описанную ранее. Заодно посчитаем Lifetime — кумулятивную сумму спрогнозированных данных. В подзапросе coef мы получим только два числа — параметр наклона и перехвата. Чтобы эти параметры были доступны каждой строке подзапроса lt, делаем cross join к coef:

lt as(
    select generate_series as activity_day,
               active_users_of_day::real/total_users_of_cohort as real_data,
               a*power(generate_series,b) as pred_data, 	 
               sum(a*power(generate_series,b)) over(order by generate_series) as cumulative_lt
    from generate_series(1,180,1)
    cross join coef
    join active_users on generate_series = activity_day::int
),

Сравним прогноз на 180 дней с Retention:

Наконец, считаем сам LTV — Lifetime, умноженный на ARPDAU. В нашем случае ARPDAU равняется $83.7:

select cumulative_lt as LT,
           cumulative_lt * 83.7 as LTV
from lt

Наконец, построим график LTV на 180 дней:

Весь запрос:

--общее количество юзеров в когорте
with cohort as (
    select count(*) as total_users_of_cohort
    from users
    where date(registration) between date '2021-03-01' and date '2021-03-30'
),
--количество активных юзеров на 1ый день, 2ой, 3ий и тд. из когорты
active_users as (
    select date_part('day', activity.date - users.registration) as activity_day, 
               count(distinct users.id) as active_users_of_day
    from activity
    join users on activity.user_id = users.id
    where date(registration) between date '2021-03-01' and date '2021-03-30' 
    group by 1
    having date_part('day', activity.date - users.registration) between 1 and 90 --берем только первые 90 дней, остальные дни предсказываем.
),
--рассчитываем коэффициенты регрессии
coef as (
    select exp(regr_intercept(ln(activity), ln(activity_day))) as a, 
                regr_slope(ln(activity), ln(activity_day)) as b
    from(
                select activity_day,
                            active_users_of_day::real / total_users_of_cohort as activity
                from active_users 
                cross join cohort order by activity_day 
            )
),
lt as(
    select generate_series as activity_day,
               active_users_of_day::real/total_users_of_cohort as real_data,
               a*power(generate_series,b) as pred_data, 	 
               sum(a*power(generate_series,b)) over(order by generate_series) as cumulative_lt
    from generate_series(1,180,1)
    cross join coef
    join active_users on generate_series = activity_day::int
),
select cumulative_lt as LT,
            cumulative_lt * 83.7 as LTV
from lt
 Нет комментариев    636   2 мес   Data Analytics   ltv   postgresql   sql

Тренинг по Clickhouse от Altinity

Время чтения текста – 7 минут

Буквально на днях закончил обучение Clickhouse от Altinity (101 Series Training). Для тех, кто только знакомится с Clickhouse Altinity предлагает базовый бесплатный тренинг: Data Warehouse Basics. Рекомендую начать с него, если планируете погружаться в обучение.

Сертификация от Altinity

Хочу поделиться своими впечатлениями об обучении и поделиться своим конспектом с тренинга.
Обучение стоит $500 и длится четыре дня по два часа, проводится в наше вечернее время (начиная с 19:00 GMT+3).

Сессия №1

Первый день в бОльшей степени повторяет пройденное в Data Warehouse Basics, однако в нем есть несколько новых идей, например о том, как можно получить полезную информацию о запросах из системных таблиц.

Например, такой query выдаст какие команды запущены и в каком они статусе:

SELECT command, is_done
FROM system.mutations
WHERE table = 'ontime'

Помимо этого, для меня было очень полезно узнать про компрессию колонок с использованием кодеков:

ALTER TABLE ontime
 MODIFY COLUMN TailNum LowCardinality(String) CODEC(ZSTD(1))

Для тех, кто начинает погружение в Clickhouse первый день будет супер-полезным в том, чтобы разобраться с движками таблиц и синатксисом их создания, партициями, вставкой данных (к примеру, напрямую из S3).

INSERT INTO sdata
SELECT * FROM s3(
 'https://s3.us-east-1.amazonaws.com/d1-altinity/data/sdata*.csv.gz',
 'aws_access_key_id',
 'aws_secret_access_key',
 'Parquet',
 'DevId Int32, Type String, MDate Date, MDatetime
DateTime, Value Float64')

Сессия №2

Второй день мне представляется максимально насыщенным и полезным, потому что в рамках него Robert из Altinity подробно рассказывает про агрегирующие функции в Clickhouse и про создание материализованных представлений (подробно по шагам разбирается схема создания материализованного представления).

Отдельное внимание устройству джойнов в Clickhouse

Мне было супер-полезно узнать про типы индексов в CH

Сессия №3

В рамках третьего дня коллеги делятся знаниями о том как работать с Kafka, JSON-объектами, которые хранятся в таблицах.
Интересно было узнать, что работа с типами данных массив в Clickhouse очень похоже на работу с массивами в Python:

WITH [1, 2, 4] AS array
SELECT
 array[1] AS First,
 array[2] AS Second,
 array[3] AS Third,
 array[-1] AS Last,
 length(array) AS Length

И при работе с массивами крутая фича это ARRAY JOIN, который «разворачивает» массив в плоскую реляционную таблицу:

Clickhouse позволяет эффективно взаимодействовать с JSON-объектами, которые хранятся в таблице:

-- Get a JSON string value
SELECT JSONExtractString(row, 'request') AS request
FROM log_row LIMIT 3
-- Get a JSON numeric value
SELECT JSONExtractInt(row, 'status') AS status
FROM log_row LIMIT 3

На примере этого кусочка кода отдельно извлекаются элементы JSON-массива ’request’ и ’status’.

Их можно сложить в ту же таблицу:

ALTER TABLE log_row
 ADD COLUMN
status Int16 DEFAULT
 JSONExtractInt(row, 'status')
ALTER TABLE log_row
UPDATE status = status WHERE 1 = 1

Сессия №4

А на заключительный четвертый день оставлена самая трудная тема с моей точки зрения: построение шардированных и реплицированных кластеров, построение запросов на распределенных серверах Clickhouse.

Отдельный респект Altinity за отличную подборку лабораторных заданий в ходе обучения.

Ссылки:

 Нет комментариев    542   2 мес   clickhouse   sql

Нормализация данных через запрос в SQL

Время чтения текста – 8 минут

Главный принцип анализа данных GIGO (от англ. garbage in — garbage out, дословный перевод «мусор на входе — мусор на выходе») говорит нам о том, что ошибки во входных данных всегда приводят к неверным результатам анализа. От того, насколько хорошо подготовлены данные, зависят результаты всей вашей работы.

Например, перед нами стоит задача подготовить выборку для использования в алгоритме машинного обучения (модели k-NN, k-means, логической регрессии и др). Признаки в исходном наборе данных могут быть в разном масштабе, как, например, возраст и рост человека. Это может привести к некорректной работе алгоритма. Такого рода данные нужно предварительно масштабировать.

В данном материале мы рассмотрим способы масштабирования данных через запрос в SQL: масштабирование методом min-max, min-max для произвольного диапазона и z-score нормализация. Для каждого из методов мы подготовили по два примера написания запроса — один с помощью подзапроса SELECT, а второй используя оконную функцию OVER().

Для работы возьмем таблицу students с данными о росте учащихся.

name height
Иван 174
Петр 181
Денис 199
Ксения 158
Сергей 179
Ольга 165
Юлия 152
Кирилл 188
Антон 177
Софья 165

Min-Max масштабирование

Подход min-max масштабирования заключается в том, что данные масштабируются до фиксированного диапазона, который обычно составляет от 0 до 1. В данном случае мы получим все данные в одном масштабе, что исключит влияние выбросов на выводы.

Выполним масштабирование по формуле:

Умножаем числитель на 1.0, чтобы в результате получилось число с плавающей точкой.

SQL-запрос с подзапросом:

SELECT height, 
       1.0 * (height-t1.min_height)/(t1.max_height - t1.min_height) AS scaled_minmax
  FROM students, 
      (SELECT min(height) as min_height, 
              max(height) as max_height 
         FROM students
      ) as t1;

SQL-запрос с оконной функцией:

SELECT height, 
       (height - MIN(height) OVER ()) * 1.0 / (MAX(height) OVER () - MIN(height) OVER ()) AS scaled_minmax
  FROM students;

В результате мы получим переменные в диапазоне [0...1], где за 0 принят рост самого невысокого учащегося, а 1 рост самого высокого.

name height scaled_minmax
Иван 174 0.46809
Петр 181 0.61702
Денис 199 1
Ксения 158 0.12766
Сергей 179 0.57447
Ольга 165 0.2766
Юлия 152 0
Кирилл 188 0.76596
Антон 177 0.53191
Софья 165 0.2766

Масштабирование для заданного диапазона

Вариант min-max нормализации для произвольных значений. Не всегда, когда речь идет о масштабировании данных, диапазон значений находится в промежутке между 0 и 1.
Формула для вычисления в этом случае такая:

Это даст нам возможность масштабировать данные к произвольной шкале. В нашем примере пусть а=10.0, а b=20.0.

SQL-запрос с подзапросом:

SELECT height, 
       ((height - min_height) * (20.0 - 10.0) / (max_height - min_height)) + 10 AS scaled_ab
  FROM students,
      (SELECT MAX(height) as max_height, 
              MIN(height) as min_height
         FROM students  
      ) t1;

SQL-запрос с оконной функцией:

SELECT height, 
       ((height - MIN(height) OVER() ) * (20.0 - 10.0) / (MAX(height) OVER() - MIN(height) OVER())) + 10.0 AS scaled_ab
  FROM students;

Получаем аналогичные результаты, что и в предыдущем методе, но данные распределены в диапазоне от 10 до 20.

name height scaled_ab
Иван 174 14.68085
Петр 181 16.17021
Денис 199 20
Ксения 158 11.2766
Сергей 179 15.74468
Ольга 165 12.76596
Юлия 152 10
Кирилл 188 17.65957
Антон 177 15.31915
Софья 165 12.76596

Нормализация с помощью z-score

В результате z-score нормализации данные будут масштабированы таким образом, чтобы они имели свойства стандартного нормального распределения — среднее (μ) равно 0, а стандартное отклонение (σ) равно 1.

Вычисляется z-score по формуле:

SQL-запрос с подзапросом:

SELECT height, 
       (height - t1.mean) * 1.0 / t1.sigma AS zscore
  FROM students,
      (SELECT AVG(height) AS mean, 
              STDDEV(height) AS sigma
         FROM students
        ) t1;

SQL-запрос с оконной функцией:

SELECT height, 
       (height - AVG(height) OVER()) * 1.0 / STDDEV(height) OVER() AS z-score
  FROM students;

В результате мы сразу заметим выбросы, которые выходят за пределы стандартного отклонения.

name height zscore
Иван 174 0.01488
Петр 181 0.53582
Денис 199 1.87538
Ксения 158 -1.17583
Сергей 179 0.38698
Ольга 165 -0.65489
Юлия 152 -1.62235
Кирилл 188 1.05676
Антон 177 0.23814
Софья 165 -0.65489

Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

UNPIVOT данных с использованием CROSS JOIN

Время чтения текста – 5 минут

Зачастую мы получаем данные в предагрегированном виде, когда каждая отдельная колонка является посчитанной метрикой. По аналогии мы получаем подобный результат, когда строим сводную таблицу в Excel и используем некоторое количество фактов для агрегации. Но что делать, если нам нужно произвести обратную операцию — Unpivot?

Как поступить, если в датасете понадобилось трансформировать данные в реляционный вид? В Tableau есть фича Unpivot, которая сделает всё сама: если датасет построен из файла, достаточно выделить нужные колонки и нажать на кнопку «Pivot». А в некоторых диалектах SQL, например, в Transact, уже есть встроенные функции, которые тоже делают это сами.

Но в случае, если датасет построен на Custom SQL Query из базы данных, у которой в арсенале отсутствуют встроенные функции для трансформации в сводную и обратно, необходим какой-то другой подход, и Tableau порекомендует для такой таблицы:

ID a b c
1 a1 b1 c1
2 a2 b2 c2

Воспользоваться таким стандартным универсальным, но не очень эффективным решением:

select id, ‘a’ AS col, a AS value
from yourtable
union all
select id, ‘b’ AS col, b AS value
from yourtable
union all
select id, ‘c’ AS col, c AS value
from yourtable

И в результате получить таблицу вида:

id col value
1 a a1
2 a a2
1 b b1
2 b b2
1 c c1
2 c c2

Порой, когда мы работаем с физической таблицей и нам надо быстро получить результаты для двух-трех колонок, действительно, подобное решение можно быстро применить, не задумываясь. Однако в случае, когда вместо таблицы содержится, например, сложный подзапрос с несколькими джойнами и нужно сделать Pivot для 5+ колонок, подзапрос вызовется целых 5+ раз, согласитесь, не очень действенно считать одно и тоже неоднократно. Вместо этого можно воспользоваться рецептом с CROSS JOIN, найденным на просторах Stack Overflow:

select t.id,
c.col,
    case c.col
        when 'a' then a
        when 'b' then b
        when 'c' then c
    end as data
from yourtable t
cross join
(
    select 'a' as col
    union all select 'b'
    union all select 'c'
) c

Разберём запрос подробнее. CROSS JOIN — перекрёстное соединение, декартово произведение, или, проще говоря, произведение всех строк со всеми. За ненадобностью в синтаксисе CROSS JOIN отсутствует ON — мы объединяем не по какому-то конкретному полю две таблицы, а сразу по всем существующим строкам.

Сначала мы формируем таблицу со всеми колонками, предназначенными для преобразования в строки. В нашем случае это колонки a, b и c: поэтому мы сделали таблицу c, в которой будет колонка col со значениями a, b и c:

(
    select 'a' as col
    union all select 'b'
    union all select 'c'
) c

Выглядит она так:

col
a
b
c

Затем таблицы yourtable и c объединятся перекрестным соединением, а после мы возьмём поля id, col и в зависимости от того, как называется ячейка в col, подставим соответствующие данные в поле data.

select t.id,
c.col,
    case c.col
        when 'a' then a
        when 'b' then b
        when 'c' then c
    end as value
from yourtable t
cross join
(
    select 'a' as col
    union all select 'b'
    union all select 'c'
) c

В итоге получим ту же самую искомую таблицу, с которой уже можно удобно работать любым аналитическим инструментом:

id col value
1 a a1
2 a a2
1 b b1
2 b b2
1 c c1
2 c c2
 Нет комментариев    121   9 мес   mysql   query   sql   tableau   лайфхак
Ранее Ctrl + ↓