12 заметок с тегом

clickhouse

Три способа рассчитать накопленную сумму в SQL

Время чтения текста – 7 минут

Расчет накопленной (или кумулятивной, что то же самое) суммы SQL — это очень распространенный запрос, который часто используют в анализе финансов, динамики прибыли и прочих показателей компании. В сегодняшней статье вы узнаете, что такое накопленная сумма и как можно написать SQL-запрос для ее вычисления.

Если вы вдруг являетесь начинающим пользователем SQL, то давайте, как в школьной задаче, поймем, что нам дано и что нам необходимо найти. Накопленная сумма — это совокупная сумма предыдущих чисел в столбце. Давайте посмотрим на пример ниже, чтобы точно знать, какой результат мы ожидаем увидеть в итоге. Итак, существует таблица leftjoin.daily_sales_sample, в которой есть всего два столбца date и revenue. По столбцу revenue нам нужно рассчитать накопленную сумму и записать результат в отдельный столбец.

Что у нас есть?

Date Revenue
10.11.2021 1200
11.11.2021 1600
12.11.2021 800
13.11.2021 3000

Что мы хотим найти?

Date Revenue Cumulative Revenue
10.11.2021 1200 1200 ↓
11.11.2021 1600 2800↓
12.11.2021 800 3600 ↓
13.11.2021 3000 6600

На графике две этих переменных выглядят следующим образом:

Итак, без лишних слов, давайте приступать к решению задачи.

Способ 1 — Идеальный — Используем оконные функции

Итак, если в базе данных можно пользоваться оконными функциями, то жизнь хороша и прекрасна. С их помощью можно написать простой запрос, который будет суммировать значения из столбца revenue по мере увеличения даты и сразу вернет нам таблицу с кумулятивной суммой в столбце, который мы назвали total.

SELECT
	date,
	revenue,
	SUM(revenue) OVER (ORDER BY date asc) as total
FROM leftjoin.daily_sales_sample 
ORDER BY date;

Способ 2 — Хитрый — Решение без оконных функций

Вполне возможно, что вам понадобится решить такую задачу без использования оконных функций. К примеру, если вы используете MySQL (до 8 версии) или любую другую БД, в которой оконных функций нет. Тогда решение задачи чуть усложняется. Однако, вы ведь знаете, что нет ничего невозможного?
Чтобы провернуть все то же самое без оконных функций, нужно использовать INNER JOIN для присоединения таблицы к себе самой. Так, к каждой строке таблицы мы присоединяем строки, которые соответствуют всем предыдущим датам до текущей даты включительно. В нашем примере, для 10 ноября — 10 ноября, для 11 ноября — 10 и 11 ноября и так далее. Промежуточный запрос будет выглядеть вот так:

SELECT * 
FROM leftjoin.daily_sales_sample ds1 
INNER JOIN leftjoin.daily_sales_sample ds2 on ds1.date>=ds2.date
ORDER BY ds1.date, ds2.date;

А его результат:

Date 1 Revenue 1 Date 2 Revenue 2
10.11.2021 1200 10.11.2021 1200
11.11.2021 1600 10.11.2021 1200
11.11.2021 1600 11.11.2021 1600
12.11.2021 800 10.11.2021 1200
12.11.2021 800 11.11.2021 1600
12.11.2021 800 12.11.2021 800
13.11.2021 300 10.11.2021 1200
13.11.2021 300 11.11.2021 1600
13.11.2021 300 12.11.2021 800
13.11.2021 300 13.11.2021 300

А затем, нужно просуммировать прибыли, группируя их по каждой дате. Если собрать все в единый запрос, то он будет выглядеть вот так:

SELECT
	ds1.date,
	ds1.revenue,
	SUM(ds2.revenue) as total
FROM leftjoin.daily_sales_sample ds1 
INNER JOIN leftjoin.daily_sales_sample ds2 on ds1.date>=ds2.date
GROUP BY ds1.date, ds1.revenue
ORDER BY ds1.date;

Способ 3 — Специфический — Решение с помощью массивов в ClickHouse

Если вы используете Clickhouse, то в этой системе есть специальная функция, которая может помочь рассчитать кумулятивную сумму. Для начала, нам нужно преобразовать все столбцы таблицы в массивы и рассчитать показатель «Moving Sum» для столбца revenue.

SELECT groupArray(date) dates, groupArray(revenue) as revs, 
groupArrayMovingSum(revenue) AS total
FROM (SELECT date, revenue FROM leftjoin.daily_sales_sample
	  ORDER BY date)

Спасибо Дмитрию Титову из Altinity за комментарий про сортировку в подзапросе

Так, мы получим три массива значений:

dates revs total
[’10.11.2021’,’11.11.2021’,’12.11.2021’,’13.11.2021’] [1200, 1600, 800, 300] [1200, 2800, 3600, 3900]

Но три массива, которые записаны в ячейки — это не то, что мы хотим получить, хотя значения этих массивов уже абсолютно соответствуют искомому результату. Теперь массивы нужно привести обратно к табличному виду с помощью функции ARRAY JOIN.

SELECT dates, revs, total FROM
(SELECT groupArray(date) dates, groupArray(revenue) as revs, 
groupArrayMovingSum(revenue) AS total
FROM (SELECT date, revenue FROM leftjoin.daily_sales_sample
	  ORDER BY date)) as t
ARRAY JOIN dates, revs, total;

Бонус — Оконные функции в Clickhouse

Если вам не хочется иметь дело с массивами, что иногда и правда бывает затратно по времени, то есть еще один вариант решения задачи. Можно использовать оконные функции, например функцию runningAccumulate(), которая суммирует значения всех ячеек с первой до текущей.

SELECT date, runningAccumulate(revenue)
  FROM 
  (
    SELECT date, sumState(revenue) AS revenue
    FROM leftjoin.daily_sales_sample
    GROUP BY date 
    ORDER BY date ASC
  )
ORDER BY date

Если вы столкнетесь с необходимостью рассчитать кумулятивную сумму в SQL, то теперь вы сможете решить эту задачу, в какой бы системе управления баз данных ни была организована работа :)

 1 комментарий    1405   1 мес   clickhouse   mysql   postgresql   sql

Тренинг по Clickhouse от Altinity

Время чтения текста – 7 минут

Буквально на днях закончил обучение Clickhouse от Altinity (101 Series Training). Для тех, кто только знакомится с Clickhouse Altinity предлагает базовый бесплатный тренинг: Data Warehouse Basics. Рекомендую начать с него, если планируете погружаться в обучение.

Сертификация от Altinity

Хочу поделиться своими впечатлениями об обучении и поделиться своим конспектом с тренинга.
Обучение стоит $500 и длится четыре дня по два часа, проводится в наше вечернее время (начиная с 19:00 GMT+3).

Сессия №1

Первый день в бОльшей степени повторяет пройденное в Data Warehouse Basics, однако в нем есть несколько новых идей, например о том, как можно получить полезную информацию о запросах из системных таблиц.

Например, такой query выдаст какие команды запущены и в каком они статусе:

SELECT command, is_done
FROM system.mutations
WHERE table = 'ontime'

Помимо этого, для меня было очень полезно узнать про компрессию колонок с использованием кодеков:

ALTER TABLE ontime
 MODIFY COLUMN TailNum LowCardinality(String) CODEC(ZSTD(1))

Для тех, кто начинает погружение в Clickhouse первый день будет супер-полезным в том, чтобы разобраться с движками таблиц и синатксисом их создания, партициями, вставкой данных (к примеру, напрямую из S3).

INSERT INTO sdata
SELECT * FROM s3(
 'https://s3.us-east-1.amazonaws.com/d1-altinity/data/sdata*.csv.gz',
 'aws_access_key_id',
 'aws_secret_access_key',
 'Parquet',
 'DevId Int32, Type String, MDate Date, MDatetime
DateTime, Value Float64')

Сессия №2

Второй день мне представляется максимально насыщенным и полезным, потому что в рамках него Robert из Altinity подробно рассказывает про агрегирующие функции в Clickhouse и про создание материализованных представлений (подробно по шагам разбирается схема создания материализованного представления).

Отдельное внимание устройству джойнов в Clickhouse

Мне было супер-полезно узнать про типы индексов в CH

Сессия №3

В рамках третьего дня коллеги делятся знаниями о том как работать с Kafka, JSON-объектами, которые хранятся в таблицах.
Интересно было узнать, что работа с типами данных массив в Clickhouse очень похоже на работу с массивами в Python:

WITH [1, 2, 4] AS array
SELECT
 array[1] AS First,
 array[2] AS Second,
 array[3] AS Third,
 array[-1] AS Last,
 length(array) AS Length

И при работе с массивами крутая фича это ARRAY JOIN, который «разворачивает» массив в плоскую реляционную таблицу:

Clickhouse позволяет эффективно взаимодействовать с JSON-объектами, которые хранятся в таблице:

-- Get a JSON string value
SELECT JSONExtractString(row, 'request') AS request
FROM log_row LIMIT 3
-- Get a JSON numeric value
SELECT JSONExtractInt(row, 'status') AS status
FROM log_row LIMIT 3

На примере этого кусочка кода отдельно извлекаются элементы JSON-массива ’request’ и ’status’.

Их можно сложить в ту же таблицу:

ALTER TABLE log_row
 ADD COLUMN
status Int16 DEFAULT
 JSONExtractInt(row, 'status')
ALTER TABLE log_row
UPDATE status = status WHERE 1 = 1

Сессия №4

А на заключительный четвертый день оставлена самая трудная тема с моей точки зрения: построение шардированных и реплицированных кластеров, построение запросов на распределенных серверах Clickhouse.

Отдельный респект Altinity за отличную подборку лабораторных заданий в ходе обучения.

Ссылки:

 Нет комментариев    602   5 мес   clickhouse   sql

Парсим вакансии для аналитиков из Indeed

Время чтения текста – 8 минут

В этом материале мы расскажем, как парсить вакансии с сайта Indeed. Indeed — это крупнейший в мире поисковик вакансий. Этим текстом мы начинаем большой проект по анализу и визуализации показателей оплаты труда в области Data Science в разных странах.
Подобный анализ рынка вакансий, но только в России, мы проводили в материале Анализ рынка вакансий аналитики и BI: дашборд в Tableau, когда парсили данные с сайта HeadHunter.

А еще у нас можно почитать материал Парсим данные каталога сайта, используя Beautiful Soup и Selenium

Импорт библиотек
Библиотека fake_useragent имитирует реальный User-Agent, чтобы преодолеть защиту сайта от парсинга. Таким образом мы сможем пройти проверку HTTP заголовка User-Agent.
Модуль urllib.parse разбирает URL-адрес на компоненты и записывает его как кортеж. Он пригодится для перехода на карточки вакансий. BeautifulSoup поможет разобраться в структуре html-страницы и добыть нужную нам информацию.

import requests
from datetime import timedelta, datetime
import urllib.parse
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import pandas as pd
import time
from lxml.html import fromstring
from clickhouse_driver import Client
from clickhouse_driver import errors
import numpy as np
from funcs import check_title, get_skills_row, parse_salary, get_sheetname, create_table

Создадим таблицу в Clickhouse
Данные, которые мы собираемся собрать, будем хранить в базе Clickhouse.

create_table = '''CREATE TABLE if not exists indeed.vacancies (
    row_idx UInt16,
    query_string String,
    country String,
    title String,
    company String,
    city String,
    job_added Date,
    easy_apply UInt8,
    company_rating Nullable(Float32),
    remote UInt8,
    job_id String,
    job_link String,
    sheet String,
    skills String,
    added_date Date,
    month_salary_from_USD Float64,
    month_salary_to_USD Float64,
    year_salary_from_USD Float64,
    year_salary_to_USD Float64,
)
ENGINE = ReplacingMergeTree
SETTINGS index_granularity = 8192'''

Обход блокировок
Нам нужно обойти защиту Indeed и избежать блокировки по IP. Для этого используем анонимные прокси адреса на сайте free-proxy-list.net. Как собрать свежие прокси, мы писали в нашем предыдущем тексте «Пишем парсер свежих прокси на Python для Selenium». Прокси адреса мы запишем в массив, который понадобится в момент обращения к Indeed, когда запрос будет проверять User-Agent.

Данный метод удаляет IP из списка с прокси в том случае, если ответ от Indeed через него так и не пришел.

def remove_proxy_from_list_and_update_if_required(proxy):
    global _proxies
    _proxies.remove(proxy)
    if len(_proxies) == 0:
        update_proxy_list()

Функция, используя прокси, возвращает нам страницу Indeed, из которой мы впоследствии спарсим данные.

def get_page(updated_url, session):
    proxy = get_proxy()
    proxy_dict = {"http": proxy, "https": proxy}
    logger.info(f'try with proxy: {proxy}')
    try:
        session.proxies = proxy_dict
        return session.get(updated_url, timeout=15)
    except (requests.exceptions.RequestException, requests.exceptions.ProxyError, requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout, requests.exceptions.SSLError,
            requests.exceptions.ConnectionError, url_ex.MaxRetryError, ConnectionResetError,
            socket.timeout, url_ex.ReadTimeoutError):
        remove_proxy_from_list_and_update_if_required(proxy)
        logger.info(f'try with proxy {proxy}')
        return get_page(updated_url, session)

Методы для парсера
Искомые данные нужно будет искать по тегам и атрибутам верстки с помощью BeautifulSoup. Мы заранее собрали ключевые слова, которые нас будут интересовать в вакансиях, и подготовили с ними отдельный датасет.

В карточках вакансий нет точной даты публикации, указано лишь сколько дней назад она была опубликована. Сохраним точную дату публикации в традиционном формате с помощью timedelta.

def raw_date_to_str(raw_date):
    raw_date = raw_date.lower()
    if '+' in raw_date or "более" in raw_date:
        delta = timedelta(days=32)
        return (datetime.now() - delta).strftime("%Y-%m-%d")
    else:
        parts = raw_date.split()
        for part in parts:
            if part.isdigit():
                delta = timedelta(days=part.isdigit())
                return (datetime.now() - delta).strftime("%Y-%m-%d")
    return ""

Сохраним id вакансии в системе Indeed. Подставляя id в URL страницы, мы сможем получить доступ к полному описанию вакансий.

def get_job_id_from_card(card):
    try:
        return card['id'].split('_')[1]
    except:
        return ""

Данный метод соберет названия вакансий.

def get_title_from_card(card):
    try:
        job_title = card.find('a', {'class': 'jobtitle'}).text
        return job_title.replace('\n', '')
    except:
        return ''

Аналогичным образом напишем методы, которые будут собирать данные о названии компании, времени публикации объявления, местоположении работодателя и рейтинге работодателя на портале.

URL сайта Indeed пишется для разных стран по-разному. Для США это будет просто indeed.com, а локализации для других стран получают префиксом xx.indeed.com. Список с префиксами мы собрали в массив заранее из https://opensource.indeedeng.io/api-documentation/docs/supported-countries/ списка Indeed.

def get_link_from_card(card, card_country):
    try:
        if card_country == 'us':
            return f"https://indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
        else:
            return f"https://{card_country}.indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
    except:
        return ""

Спарсим описание вакансии, которое можно найти по тегу ’summary’. Именно там содержатся требования, которые предъявляют к кандидату.

def get_summary_from_card_and_transform_to_skills(card):
    try:
        smr = card.find('div', {'class': 'summary'}).text
        return get_skills_row(smr)
    except:
        return ""
Необходимые hard-skills из описания вакансий будем сверять со списком 'skills'. 
skills = ["python", "tableau", "etl", "power bi", "d3.js", "qlik", "qlikview", "qliksense",
          "redash", "metabase", "numpy", "pandas", "congos", "superset", "matplotlib", "plotly",
          "airflow", "spark", "luigi", "machine learning", "amplitude", "sql", "nosql", "clickhouse",
          'sas', "hadoop", "pytorch", "tensorflow", "bash", "scala", "git", "aws", "docker",
          "linux", "kafka", "nifi", "ozzie", "ssas", "ssis", "redis", 'olap', ' r ', 'bigquery', 'api', 'excel']

Эта функция разобьет ’summary’ на слова пробелом и проверит их на соответствие нашему списку. В датасет будут возвращаться совпадения с нашим списком hard-skills.

def get_skills_row(summary):
    summary = summary.lower()
    row = []
    for sk in skills:
        if sk in summary:
            row.append(sk)
    return ','.join(row)

На выходе мы получим таблицу с примерно 30 тысячами строк.

Полный код проекта можно посмотреть в нашем репозитории на GitHub.

Матемаркетинг: современный облачный Data Stack

Время чтения текста – 1 минута

С 9 по 13 ноября в онлайн-формате прошёл Матемаркетинг — крупнейшая конференция по маркетинговой аналитике в России, и в этом году мне посчастливилось стать одним из спикеров. Я выступил с двумя докладами, в этом материале обсудим первый — о современном облачном Data Stack.

Внутри объясняю подход к проектированию аналитической инфраструктуры, обосновываю использование Clickhouse при построении облачной аналитики и рассказываю о его же нюансах и говорю про Redash с точки зрения инструмента для визуализации.

Частотный словарь и биграммы по постам инвесторов

Время чтения текста – 16 минут

Тинькофф Инвестиции — сервис от Тинькофф Банка для инвестирования на Московской и Санкт-Петербургской биржах. Внутри сервиса есть социальная сеть «Пульс», где инвесторы любого уровня могут делиться своими опытом, мыслями и планами, комментировать и оценивать чужие посты. Сегодня решим такую задачу:
построим частотный словарь и биграммы по постам пользователей, разделив их по объёму портфеля, чтобы понять, чем отличаются посты людей с разным объёмом инвестиций.

Лента по ценной бумаге в Пульсе выглядит вот так:

У каждого инвестора есть личный профиль. Внутри отображается объём портфеля, статистика прироста за год и число сделок за последний месяц.

Схема в Clickhouse

Для биграмм и частотного словаря достаточно собрать только тексты постов, логины пользователей и объёмы портфеля, но ради спортивного интереса будем хранить ещё и рост портфеля, число сделок человека за месяц и количество оценок под его постом. Для хранения данных получится две таблицы posts и users:

CREATE TABLE tinkoff.posts
(
    `login` String,
    `post` String,
    `likes` Int16
)
ENGINE = MergeTree
ORDER BY login

 CREATE TABLE tinkoff.users
(
    `login` String,
    `volume_prefix` String,
    `volume` String,
    `year_stats_prefix` String,
    `year_stats` String
)
ENGINE = MergeTree()
ORDER BY login

В таблице с пользователями volume_prefix — это префикс «до» или «от», стоящий в объёме портфеля, а volume — сам объём портфеля. Соответственно years_stats_prefix обозначает, портфель за год упал или вырос, а year_stats — на сколько он упал или вырос. Такая схема из двух таблиц с ключом сортировки таблиц по полю login позволит их соединить позднее.

Пишем парсер постов

У Пульса нет своего API, поэтому для парсинга постов будем использовать Selenium.

Мы уже писали про то, как парсить сайты с прокруткой при помощи Selenium

Нам понадобятся следующие библиотеки:

from selenium import webdriver
import time
from webdriver_manager.chrome import ChromeDriverManager
from clickhouse_driver import Client
from bs4 import BeautifulSoup as bs
import pandas as pd
import requests
import os
from lxml import html
import re

Сразу составим список интересующих ценных бумаг: это акции Сбербанка, Газпрома, Яндекса, Лукойла, MailRu, Аэрофлота, Киви, ВТБ, Детского Мира и Ленты. Для каждой бумаги будем переходить на страницу с постами, в которых она упоминается и проматывать страницу, пока длина страницы не станет более 300000: это около одной недели.

driver = webdriver.Chrome(ChromeDriverManager().install())

securities = ['SBER', 'GAZP', 'YNDX', 'LKOH', 'MAIL', 'AFLT', 'QIWI', 'VTBR', 'DSKY', 'LNTA']
        
for security in securities:
    try:
        print(security)
        driver.get(f'https://www.tinkoff.ru/invest/stocks/{security}/pulse/')
        
        page_length = driver.execute_script("return document.body.scrollHeight")
        while page_length < 300000:
            driver.execute_script(f"window.scrollTo(0, {page_length - 1000});")
            page_length = driver.execute_script("return document.body.scrollHeight")

После забираем себе весь код полученной страницы и извлекаем нужную информацию: логины, текст постов и число лайков. Формируем из них DataFrame и записываем его в папку data.

source_data = driver.page_source
        soup = bs(source_data, 'lxml')
        
        posts = soup.find_all('div', {'class':'PulsePostCollapsed__text_1ypMP'})
        logins = soup.find_all('div', {'class':'PulsePostAuthor__nicknameLink_19Aca'})
        likes = soup.find_all('div', {'class':'PulsePostBody__likes_3qcu0'})
        
        logins = [login.text for login in logins]
        posts = [post.text for post in posts]
        likes = [like.text.split()[0] for like in likes]
        
        df_posts = pd.DataFrame()
        df_posts['login'] = logins
        df_posts['post'] = posts
        df_posts['likes'] = likes
        
        df_posts.to_csv(f'data/{security}.csv', index=False)
        
        print(f'SAVED {security}')
    except Exception as E:
        print(E)

После того, как нужные посты собраны, отправим их в таблицу posts в Clickhouse. При помощи модуля os переходим в директорию data и собираем в список all_files названия всех файлов в ней — это все csv-таблицы, которые мы спарсили. Затем по очереди читаем файл в DataFrame и вставляем в posts.

client = Client(host='', user='', password='', port='9000', database='tinkoff')

os.chdir('data')
all_files = os.listdir()

for file in all_files:
    df = pd.read_csv(file)
    client.execute("INSERT INTO posts VALUES", df.to_dict('records'))

Собираем информацию о профилях

Чтобы собрать все профили, получим уникальный список логинов из базы:

flatten = lambda t: [item for sublist in t for item in sublist]
logins = flatten(client.execute("SELECT DISTINCT login FROM posts"))

Получать посты будем request-запросом, без Selenium, ведь ничего листать уже не нужно. Но иконка падения или роста портфеля за год не является текстом и получить её нельзя, зато внутри CSS-стилей можно увидеть её цвет — его мы и будем сохранять себе.

Поэтому опишем такую функцию: она примет объект soup и извлечёт цвет иконки.

headers = {'accept': '*/*',
           'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'}

def up_or_down_color(soup):
    string = str(soup.find('div', {'class':'Icon__container_3u7WK'}))
    start_index = string.find('style')
    color = string[start_index + 13:start_index + 20]
    return color

А теперь напишем парсер, который проходит по списку логинов, для каждого отправляет запрос и собирает всю статистику профиля. Причём если цвет иконки роста зеленый, то в поле year_stats_prefix добавим «+», иначе «-». В конце сделаем паузу на 0.2 секунды на всякий случай, чтобы не напороться на неявные ограничения.

session = requests.Session()

login_list = []
volume_list = []
volume_prefix_list = []
year_stats_list = []
year_stats_prefix_list = []

count = 0
for login in logins:
    print(count, '/', len(logins))
    
    try:
        count += 1
        if login == 'blocked_user':
            continue

        url = f'https://www.tinkoff.ru/invest/social/profile/{login}'
        request = session.get(url, headers=headers)
        soup = bs(request.content, 'lxml')

        try:
            login = soup.find('div', {'class':'ProfileHeader__nickname_1oynx'}).text
            volume = soup.find('span', {'class':'Money__money_3_Tn4'}).text
            _to = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('До')
            _from = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('От')
            year_stats = soup.find('div', {'class':'ProfileHeader__statisticsItem_1HPLt'}).text
            color = up_or_down_color(soup)
        except AttributeError as E:
            print(login, E)
            continue
        volume_list.append(volume)
        login_list.append(login)

        if _to == -1:
            volume_prefix_list.append('до')
        else:
            volume_prefix_list.append('от')

        year_stats_list.append(re.findall(r'\d+.+', year_stats)[0])

        if color == '#22a053':
            year_stats_prefix_list.append('-')
        elif color == '#dd5656':
            year_stats_prefix_list.append('+')
        else:
            year_stats_prefix_list.append('')
    except Exception as E:
        print(E)
        continue
    time.sleep(0.2)

Собираем все аккаунты и статистику по ним в DataFrame. Их тоже сохраним себе в базу.

df_users = pd.DataFrame()
df_users['login'] = login_list
df_users['volume_prefix'] = volume_prefix_list
df_users['volume'] = volume_list
df_users['year_stats_prefix'] = year_stats_prefix_list
df_users['year_stats'] = year_stats_list

client.execute("INSERT INTO users VALUES", df_users.to_dict('records'))

А теперь сделаем LEFT JOIN таблицы с постами к таблице с пользователями, чтобы у каждой строки с постом была ещё статистика по аккаунту автора. Запишем результат в DataFrame.

posts_with_users = client.execute('''
    SELECT login, post, likes, volume_prefix, volume, year_stats_prefix, year_stats FROM posts
    LEFT JOIN users
    ON posts.login = users.login
''')
posts_with_users_df = pd.DataFrame(posts_with_users, columns=['login', 'post', 'likes', 'volume_prefix', 'volume', 'year_stats_prefix', 'year_stats'])

Полученный результат будет выглядеть так:

Частотный словарь и биграммы

Для начала составим частотный словарь по постам без разделений на группы.

posts_with_users_df.post.str.split(expand=True).stack().value_counts()

Получим, что предлоги и союзы превалируют над остальными словами:

Значит, их нужно убрать из сета данных. Но даже в том случае, если данные будут очищены, получим что-то подобное. Такие данные ни о чём не говорят, и никаких выводов сделать не удастся.

В таком случае попробуем построить биграммы. Одна биграмма — последовательность из двух элементов, то есть два слова, стоящие рядом друг с другом. Существует много алгоритмов построения n-грамм разной степени оптимизации, мы воспользуемся встроенной функцией в nltk и разберём пример построения биграмм для одной группы. Первым делом импортируем дополнительные библиотеки, загружаем stopwords для русского языка и чистим данные. В список стоп-слов вносим дополнительные: среди них будут и тикеры акций, которые встречаются в каждом посте.

import nltk
from nltk.corpus import stopwords
from pymystem3 import Mystem
from string import punctuation
import unicodedata
import collections

nltk.download("stopwords")
nltk.download('punkt')
russian_stopwords = stopwords.words("russian")
append_stopword = ['это', 'sber', 'акция', 'компания', 'aflt', 'gazp', 'yndx', 'lkoh', 'mail', 'год', 'рынок', 'https', 'млрд', 'руб', 'www', 'кв']
russian_stopwords.extend(append_stopword)

Опишем функцию для подготовки текста, которая переведёт все слова в нижний регистр, приведёт к нормальной форме, удалит стоп-слова и пунктуацию:

mystem = Mystem() 

def preprocess_text(text):
    tokens = mystem.lemmatize(text.lower())
    tokens = [token for token in tokens if token not in russian_stopwords\
              and token != " " \
              and token.strip() not in punctuation]
    
    text = " ".join(tokens)
    
    return text

posts_with_users_df.post = posts_with_users_df.post.apply(preprocess_text)

Для примера возьмём посты группы инвесторов с объёмом портфеля до 10 тысяч рублей и построим биграммы, а затем выведем самые частые:

up_to_10k_df = posts_with_users_df[(posts_with_users_df['volume_prefix'] == 'от') & (posts_with_users_df['volume'] == '10 000 ₽')]

up_to_10k_counts = collections.Counter()
for sent in up_to_10k_df["post"]:
    words = nltk.word_tokenize(sent)
    up_to_10k_counts.update(nltk.bigrams(words))
up_to_10k_counts.most_common()

Получаем такой список:

Результаты исследования биграмм

В группе с объёмом портфеля до 10 и 100 тысяч руб. инвесторы чаще пишут о личном опыте и полученной прибыли: на это указывают биграммы «чистая прибыль» и «финансовый результат».

До 10 000 руб:

  1. добрый утро, 44
  2. цена нефть, 36
  3. неквалифицированный инвестор, 36
  4. чистый прибыль, 32
  5. шапка профиль, 30
  6. московский биржа, 30
  7. совет директор, 28

До 100 000 руб:

  1. чистый прибыль, 80
  2. финансовый результат, 67
  3. добрый утро, 66
  4. индекс мосбиржа, 63
  5. цена нефть, 58
  6. квартал 2020, 42
  7. мочь становиться 41

В группе до 500 тысяч руб. впервые появляются биграммы со словами «подписываться», «выкладывать», «новость» — инвесторы с такими характеристиками портфеля часто заводят собственные блоги об инвестировании и продвигают их через посты в Пульсе.

До 500 000 руб:

  1. чистый прибыль, 169
  2. квартал 2020, 154
  3. отчетность 3, 113
  4. выкладывать новость, 113
  5. 🤝подписываться, 80
  6. подписываться выкладывать, 80
  7. публиковать отчёт, 80
  8. цена нефть, 76
  9. колво бумага, 69
  10. вес портфель, 68

В биграммах группы с объёмом портфеля до и от 1 миллиона руб. появляется «фьючерс», что логично — это сложный инструмент, который обычно не рекомендуется новичкам. Кроме того, в постах группы проходит больше обсуждений отчётностей компаний — это биграммы «финансовый отчетность», «опубликовать финансовый», «отчетность мсфо».

До 1 000 000 руб:

  1. 3 квартал, 183
  2. квартал 2020, 157
  3. фьчерс утро, 110
  4. финансовый отчетность, 107
  5. опубликовать финансовый, 104
  6. чистый прибыль, 75
  7. наш биржа, 72
  8. отчетность мсфо, 69
  9. цена нефть, 67
  10. операционный результат, 61
  11. ноябрьский фьючерс, 54
  12. азиатский площадка, 51

От 1 000 000 руб:

  1. октябрь опубликовывать, 186
  2. 3 квартал, 168
  3. квартал 2020, 168
  4. финансовый отчетность, 159,
  5. опубликовывать финансовый, 95
  6. чистый прибыль, 94
  7. операционный результат, 86
  8. целевой цена, 74
  9. опубликовывать операционный, 63
  10. цена повышать, 60
 2 комментария    91   2020   clickhouse   Data Analytics   data science   nltk   python
Ранее Ctrl + ↓