22 заметки с тегом

python

Частотный словарь и биграммы по постам инвесторов

Время чтения текста – 16 минут

Тинькофф Инвестиции — сервис от Тинькофф Банка для инвестирования на Московской и Санкт-Петербургской биржах. Внутри сервиса есть социальная сеть «Пульс», где инвесторы любого уровня могут делиться своими опытом, мыслями и планами, комментировать и оценивать чужие посты. Сегодня решим такую задачу:
построим частотный словарь и биграммы по постам пользователей, разделив их по объёму портфеля, чтобы понять, чем отличаются посты людей с разным объёмом инвестиций.

Лента по ценной бумаге в Пульсе выглядит вот так:

У каждого инвестора есть личный профиль. Внутри отображается объём портфеля, статистика прироста за год и число сделок за последний месяц.

Схема в Clickhouse

Для биграмм и частотного словаря достаточно собрать только тексты постов, логины пользователей и объёмы портфеля, но ради спортивного интереса будем хранить ещё и рост портфеля, число сделок человека за месяц и количество оценок под его постом. Для хранения данных получится две таблицы posts и users:

CREATE TABLE tinkoff.posts
(
    `login` String,
    `post` String,
    `likes` Int16
)
ENGINE = MergeTree
ORDER BY login

 CREATE TABLE tinkoff.users
(
    `login` String,
    `volume_prefix` String,
    `volume` String,
    `year_stats_prefix` String,
    `year_stats` String
)
ENGINE = MergeTree()
ORDER BY login

В таблице с пользователями volume_prefix — это префикс «до» или «от», стоящий в объёме портфеля, а volume — сам объём портфеля. Соответственно years_stats_prefix обозначает, портфель за год упал или вырос, а year_stats — на сколько он упал или вырос. Такая схема из двух таблиц с ключом сортировки таблиц по полю login позволит их соединить позднее.

Пишем парсер постов

У Пульса нет своего API, поэтому для парсинга постов будем использовать Selenium.

Мы уже писали про то, как парсить сайты с прокруткой при помощи Selenium

Нам понадобятся следующие библиотеки:

from selenium import webdriver
import time
from webdriver_manager.chrome import ChromeDriverManager
from clickhouse_driver import Client
from bs4 import BeautifulSoup as bs
import pandas as pd
import requests
import os
from lxml import html
import re

Сразу составим список интересующих ценных бумаг: это акции Сбербанка, Газпрома, Яндекса, Лукойла, MailRu, Аэрофлота, Киви, ВТБ, Детского Мира и Ленты. Для каждой бумаги будем переходить на страницу с постами, в которых она упоминается и проматывать страницу, пока длина страницы не станет более 300000: это около одной недели.

driver = webdriver.Chrome(ChromeDriverManager().install())

securities = ['SBER', 'GAZP', 'YNDX', 'LKOH', 'MAIL', 'AFLT', 'QIWI', 'VTBR', 'DSKY', 'LNTA']
        
for security in securities:
    try:
        print(security)
        driver.get(f'https://www.tinkoff.ru/invest/stocks/{security}/pulse/')
        
        page_length = driver.execute_script("return document.body.scrollHeight")
        while page_length < 300000:
            driver.execute_script(f"window.scrollTo(0, {page_length - 1000});")
            page_length = driver.execute_script("return document.body.scrollHeight")

После забираем себе весь код полученной страницы и извлекаем нужную информацию: логины, текст постов и число лайков. Формируем из них DataFrame и записываем его в папку data.

source_data = driver.page_source
        soup = bs(source_data, 'lxml')
        
        posts = soup.find_all('div', {'class':'PulsePostCollapsed__text_1ypMP'})
        logins = soup.find_all('div', {'class':'PulsePostAuthor__nicknameLink_19Aca'})
        likes = soup.find_all('div', {'class':'PulsePostBody__likes_3qcu0'})
        
        logins = [login.text for login in logins]
        posts = [post.text for post in posts]
        likes = [like.text.split()[0] for like in likes]
        
        df_posts = pd.DataFrame()
        df_posts['login'] = logins
        df_posts['post'] = posts
        df_posts['likes'] = likes
        
        df_posts.to_csv(f'data/{security}.csv', index=False)
        
        print(f'SAVED {security}')
    except Exception as E:
        print(E)

После того, как нужные посты собраны, отправим их в таблицу posts в Clickhouse. При помощи модуля os переходим в директорию data и собираем в список all_files названия всех файлов в ней — это все csv-таблицы, которые мы спарсили. Затем по очереди читаем файл в DataFrame и вставляем в posts.

client = Client(host='', user='', password='', port='9000', database='tinkoff')

os.chdir('data')
all_files = os.listdir()

for file in all_files:
    df = pd.read_csv(file)
    client.execute("INSERT INTO posts VALUES", df.to_dict('records'))

Собираем информацию о профилях

Чтобы собрать все профили, получим уникальный список логинов из базы:

flatten = lambda t: [item for sublist in t for item in sublist]
logins = flatten(client.execute("SELECT DISTINCT login FROM posts"))

Получать посты будем request-запросом, без Selenium, ведь ничего листать уже не нужно. Но иконка падения или роста портфеля за год не является текстом и получить её нельзя, зато внутри CSS-стилей можно увидеть её цвет — его мы и будем сохранять себе.

Поэтому опишем такую функцию: она примет объект soup и извлечёт цвет иконки.

headers = {'accept': '*/*',
           'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'}

def up_or_down_color(soup):
    string = str(soup.find('div', {'class':'Icon__container_3u7WK'}))
    start_index = string.find('style')
    color = string[start_index + 13:start_index + 20]
    return color

А теперь напишем парсер, который проходит по списку логинов, для каждого отправляет запрос и собирает всю статистику профиля. Причём если цвет иконки роста зеленый, то в поле year_stats_prefix добавим «+», иначе «-». В конце сделаем паузу на 0.2 секунды на всякий случай, чтобы не напороться на неявные ограничения.

session = requests.Session()

login_list = []
volume_list = []
volume_prefix_list = []
year_stats_list = []
year_stats_prefix_list = []

count = 0
for login in logins:
    print(count, '/', len(logins))
    
    try:
        count += 1
        if login == 'blocked_user':
            continue

        url = f'https://www.tinkoff.ru/invest/social/profile/{login}'
        request = session.get(url, headers=headers)
        soup = bs(request.content, 'lxml')

        try:
            login = soup.find('div', {'class':'ProfileHeader__nickname_1oynx'}).text
            volume = soup.find('span', {'class':'Money__money_3_Tn4'}).text
            _to = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('До')
            _from = soup.find('div', {'class':'ProfileHeader__statistics_11-DO'}).text.find('От')
            year_stats = soup.find('div', {'class':'ProfileHeader__statisticsItem_1HPLt'}).text
            color = up_or_down_color(soup)
        except AttributeError as E:
            print(login, E)
            continue
        volume_list.append(volume)
        login_list.append(login)

        if _to == -1:
            volume_prefix_list.append('до')
        else:
            volume_prefix_list.append('от')

        year_stats_list.append(re.findall(r'\d+.+', year_stats)[0])

        if color == '#22a053':
            year_stats_prefix_list.append('-')
        elif color == '#dd5656':
            year_stats_prefix_list.append('+')
        else:
            year_stats_prefix_list.append('')
    except Exception as E:
        print(E)
        continue
    time.sleep(0.2)

Собираем все аккаунты и статистику по ним в DataFrame. Их тоже сохраним себе в базу.

df_users = pd.DataFrame()
df_users['login'] = login_list
df_users['volume_prefix'] = volume_prefix_list
df_users['volume'] = volume_list
df_users['year_stats_prefix'] = year_stats_prefix_list
df_users['year_stats'] = year_stats_list

client.execute("INSERT INTO users VALUES", df_users.to_dict('records'))

А теперь сделаем LEFT JOIN таблицы с постами к таблице с пользователями, чтобы у каждой строки с постом была ещё статистика по аккаунту автора. Запишем результат в DataFrame.

posts_with_users = client.execute('''
    SELECT login, post, likes, volume_prefix, volume, year_stats_prefix, year_stats FROM posts
    LEFT JOIN users
    ON posts.login = users.login
''')
posts_with_users_df = pd.DataFrame(posts_with_users, columns=['login', 'post', 'likes', 'volume_prefix', 'volume', 'year_stats_prefix', 'year_stats'])

Полученный результат будет выглядеть так:

Частотный словарь и биграммы

Для начала составим частотный словарь по постам без разделений на группы.

posts_with_users_df.post.str.split(expand=True).stack().value_counts()

Получим, что предлоги и союзы превалируют над остальными словами:

Значит, их нужно убрать из сета данных. Но даже в том случае, если данные будут очищены, получим что-то подобное. Такие данные ни о чём не говорят, и никаких выводов сделать не удастся.

В таком случае попробуем построить биграммы. Одна биграмма — последовательность из двух элементов, то есть два слова, стоящие рядом друг с другом. Существует много алгоритмов построения n-грамм разной степени оптимизации, мы воспользуемся встроенной функцией в nltk и разберём пример построения биграмм для одной группы. Первым делом импортируем дополнительные библиотеки, загружаем stopwords для русского языка и чистим данные. В список стоп-слов вносим дополнительные: среди них будут и тикеры акций, которые встречаются в каждом посте.

import nltk
from nltk.corpus import stopwords
from pymystem3 import Mystem
from string import punctuation
import unicodedata
import collections

nltk.download("stopwords")
nltk.download('punkt')
russian_stopwords = stopwords.words("russian")
append_stopword = ['это', 'sber', 'акция', 'компания', 'aflt', 'gazp', 'yndx', 'lkoh', 'mail', 'год', 'рынок', 'https', 'млрд', 'руб', 'www', 'кв']
russian_stopwords.extend(append_stopword)

Опишем функцию для подготовки текста, которая переведёт все слова в нижний регистр, приведёт к нормальной форме, удалит стоп-слова и пунктуацию:

mystem = Mystem() 

def preprocess_text(text):
    tokens = mystem.lemmatize(text.lower())
    tokens = [token for token in tokens if token not in russian_stopwords\
              and token != " " \
              and token.strip() not in punctuation]
    
    text = " ".join(tokens)
    
    return text

posts_with_users_df.post = posts_with_users_df.post.apply(preprocess_text)

Для примера возьмём посты группы инвесторов с объёмом портфеля до 10 тысяч рублей и построим биграммы, а затем выведем самые частые:

up_to_10k_df = posts_with_users_df[(posts_with_users_df['volume_prefix'] == 'от') & (posts_with_users_df['volume'] == '10 000 ₽')]

up_to_10k_counts = collections.Counter()
for sent in up_to_10k_df["post"]:
    words = nltk.word_tokenize(sent)
    up_to_10k_counts.update(nltk.bigrams(words))
up_to_10k_counts.most_common()

Получаем такой список:

Результаты исследования биграмм

В группе с объёмом портфеля до 10 и 100 тысяч руб. инвесторы чаще пишут о личном опыте и полученной прибыли: на это указывают биграммы «чистая прибыль» и «финансовый результат».

До 10 000 руб:

  1. добрый утро, 44
  2. цена нефть, 36
  3. неквалифицированный инвестор, 36
  4. чистый прибыль, 32
  5. шапка профиль, 30
  6. московский биржа, 30
  7. совет директор, 28

До 100 000 руб:

  1. чистый прибыль, 80
  2. финансовый результат, 67
  3. добрый утро, 66
  4. индекс мосбиржа, 63
  5. цена нефть, 58
  6. квартал 2020, 42
  7. мочь становиться 41

В группе до 500 тысяч руб. впервые появляются биграммы со словами «подписываться», «выкладывать», «новость» — инвесторы с такими характеристиками портфеля часто заводят собственные блоги об инвестировании и продвигают их через посты в Пульсе.

До 500 000 руб:

  1. чистый прибыль, 169
  2. квартал 2020, 154
  3. отчетность 3, 113
  4. выкладывать новость, 113
  5. 🤝подписываться, 80
  6. подписываться выкладывать, 80
  7. публиковать отчёт, 80
  8. цена нефть, 76
  9. колво бумага, 69
  10. вес портфель, 68

В биграммах группы с объёмом портфеля до и от 1 миллиона руб. появляется «фьючерс», что логично — это сложный инструмент, который обычно не рекомендуется новичкам. Кроме того, в постах группы проходит больше обсуждений отчётностей компаний — это биграммы «финансовый отчетность», «опубликовать финансовый», «отчетность мсфо».

До 1 000 000 руб:

  1. 3 квартал, 183
  2. квартал 2020, 157
  3. фьчерс утро, 110
  4. финансовый отчетность, 107
  5. опубликовать финансовый, 104
  6. чистый прибыль, 75
  7. наш биржа, 72
  8. отчетность мсфо, 69
  9. цена нефть, 67
  10. операционный результат, 61
  11. ноябрьский фьючерс, 54
  12. азиатский площадка, 51

От 1 000 000 руб:

  1. октябрь опубликовывать, 186
  2. 3 квартал, 168
  3. квартал 2020, 168
  4. финансовый отчетность, 159,
  5. опубликовывать финансовый, 95
  6. чистый прибыль, 94
  7. операционный результат, 86
  8. целевой цена, 74
  9. опубликовывать операционный, 63
  10. цена повышать, 60

Создаём дашборд на Bootstrap (Часть 2)

Время чтения текста – 16 минут

В последнем материале мы подготовили базовый макет дашборда при помощи библиотеки dash-bootstrap-components с двумя графиками: scatter plot и российской картой, которые подробно разбирали ранее. Сегодня продолжим наполнять дашборд информацией: встроим в него таблицы и фильтр данных по пивоварням.

Получение таблиц

Сами таблицы будем описывать в макете в файле application.py, но информацию, которую они отображают лаконичнее будет получить в отдельном модуле. Создадим файл get_tables.py: в нём будет функция, передающая готовую таблицу класса Table библиотеки dbc в application.py. В этом материале мы опишем только таблицу лучших пивоварен России, но на GithHub будут представлены все три.

В таблицах по заведениям и пивоварням мы реализуем фильтр по городам, но изначально города в собранных с Untappd данных записаны на латинице. Для запросов мы будем переводить русскоязычные наименования городов на английский при помощи библиотеки Google Translate. Кроме того, одни и те же города могут называться по-разному — например, «Москва» на латинице где-то записана как «Moskva», а где-то как «Moscow». Для этого дополнительно настроим маппинг наименований города и заранее создадим словарь с корректными наименованиями основных городов. Он пригодится в самом конце.

import pandas as pd
import dash_bootstrap_components as dbc
from clickhouse_driver import Client
import numpy as np
from googletrans import Translator

translator = Translator()

client = Client(host='12.34.56.78', user='default', password='', port='9000', database='')

city_names = {
   'Moskva': 'Москва',
   'Moscow': 'Москва',
   'СПБ': 'Санкт-Петербург',
   'Saint Petersburg': 'Санкт-Петербург',
   'St Petersburg': 'Санкт-Петербург',
   'Nizhnij Novgorod': 'Нижний Новгород',
   'Tula': 'Тула',
   'Nizhniy Novgorod': 'Нижний Новгород',
}

Таблица лучших пивоварен

Таблица, о которой идёт речь в материале, будет показывать топ-10 лучших российских пивоварен с изменением рейтинга. То есть мы сравниваем данные за два периода: [30 дней назад; сегодня] и [60 дней назад; 30 дней назад] и смотрим, как менялось место пивоварни в рейтинге. Соответственно, мы опишем следующие колонки: место в рейтинге, название пивоварни, ассортимент сортов пива, рейтинг пивоварни на untappd, изменение места и количество чекинов у этой пивоварни.
Опишем функцию get_top_russian_breweries, которая отправляет запрос к Clickhouse, получает общий топ пивоварен России, формирует данные и возвращает готовый для вывода DataFrame. Отправим два запроса — топ пивоварен за последние 30 дней и топ пивоварен за предыдущие 30 дней. Следующий запрос будет отбирать лучшие пивоварни, основываясь на количестве отзывов о пиве данной пивоварни.


Забираем данные из базы

def get_top_russian_breweries(checkins_n=250):
   top_n_brewery_today = client.execute(f'''
      SELECT  rt.brewery_id,
              rt.brewery_name,
              beer_pure_average_mult_count/count_for_that_brewery as avg_rating,
              count_for_that_brewery as checkins FROM (
      SELECT           
              brewery_id,
              dictGet('breweries', 'brewery_name', toUInt64(brewery_id)) as brewery_name,
              sum(rating_score) AS beer_pure_average_mult_count,
              count(rating_score) AS count_for_that_brewery
          FROM beer_reviews t1
          ANY LEFT JOIN venues AS t2 ON t1.venue_id = t2.venue_id
          WHERE isNotNull(venue_id) AND (created_at >= (today() - 30)) AND (venue_country = 'Россия') 
          GROUP BY           
              brewery_id,
              brewery_name) rt
      WHERE (checkins>={checkins_n})
      ORDER BY avg_rating DESC
      LIMIT 10
      '''
   )

top_n_brewery_n_days = client.execute(f'''
  SELECT  rt.brewery_id,
          rt.brewery_name,
          beer_pure_average_mult_count/count_for_that_brewery as avg_rating,
          count_for_that_brewery as checkins FROM (
  SELECT           
          brewery_id,
          dictGet('breweries', 'brewery_name', toUInt64(brewery_id)) as brewery_name,
          sum(rating_score) AS beer_pure_average_mult_count,
          count(rating_score) AS count_for_that_brewery
      FROM beer_reviews t1
      ANY LEFT JOIN venues AS t2 ON t1.venue_id = t2.venue_id
      WHERE isNotNull(venue_id) AND (created_at >= (today() - 60) AND created_at <= (today() - 30)) AND (venue_country = 'Россия')
      GROUP BY           
          brewery_id,
          brewery_name) rt
  WHERE (checkins>={checkins_n})
  ORDER BY avg_rating DESC
  LIMIT 10
  '''
)

Формируем из полученных строк два DataFrame:

top_n = len(top_n_brewery_today)
column_names = ['brewery_id', 'brewery_name', 'avg_rating', 'checkins']

top_n_brewery_today_df = pd.DataFrame(top_n_brewery_today, columns=column_names).replace(np.nan, 0)
top_n_brewery_today_df['brewery_pure_average'] = round(top_n_brewery_today_df.avg_rating, 2)
top_n_brewery_today_df['brewery_rank'] = list(range(1, top_n + 1))

top_n_brewery_n_days = pd.DataFrame(top_n_brewery_n_days, columns=column_names).replace(np.nan, 0)
top_n_brewery_n_days['brewery_pure_average'] = round(top_n_brewery_n_days.avg_rating, 2)
top_n_brewery_n_days['brewery_rank'] = list(range(1, len(top_n_brewery_n_days) + 1))

А затем в итераторе считаем, как изменилось место за последнее время у пивоварни. Обработаем исключение на случай, если 60 дней назад этой пивоварни в нашей базе ещё не было.

rank_was_list = []
for brewery_id in top_n_brewery_today_df.brewery_id:
   try:
       rank_was_list.append(
           top_n_brewery_n_days[top_n_brewery_n_days.brewery_id == brewery_id].brewery_rank.item())
   except ValueError:
       rank_was_list.append('–')
top_n_brewery_today_df['rank_was'] = rank_was_list

Теперь пройдёмся по полученным колонкам с текущими местами и изменениями. Если они не пустые, то при положительном изменении добавим к записи стрелочку вверх. При отрицательном — стрелочку вниз.

diff_rank_list = []
for rank_was, rank_now in zip(top_n_brewery_today_df['rank_was'], top_n_brewery_today_df['brewery_rank']):
   if rank_was != '–':
       difference = rank_was - rank_now
       if difference > 0:
           diff_rank_list.append(f'↑ +{difference}')
       elif difference < 0:
           diff_rank_list.append(f'↓ {difference}')
       else:
           diff_rank_list.append('–')
   else:
       diff_rank_list.append(rank_was)

Наконец, разметим итоговый DataFrame и вставим в него колонку с текущим местом. При этом у топ-3 будет отображаться эмодзи с золотым кубком.

df = top_n_brewery_today_df[['brewery_name', 'avg_rating', 'checkins']].round(2)
df.insert(2, 'Изменение', diff_rank_list)
df.columns = ['НАЗВАНИЕ', 'РЕЙТИНГ', 'ИЗМЕНЕНИЕ', 'ЧЕКИНОВ']
df.insert(0, 'МЕСТО',
         list('🏆 ' + str(i) if i in [1, 2, 3] else str(i) for i in range(1, len(df) + 1)))

return df

#Выбор пивоварен с фильтром по городам
Одна из функций нашего дашборда — просмотр топа пивоварен по конкретному городу. Для корректной работы напишем скрипт, который для каждого из списка российских городов получает топ пивоварен по числу чекинов и записывает данные по каждому городу в отдельные csv-файлы. В сущности, он мало чем отличается от предыдущего — рассмотрим главные отличия.

Прежде всего, функция принимает конкретный город. Мы уже отметили, что города в базе данных записаны на латинице — поэтому сначала переводим наименование города. В случае с Санкт-Петербургом, Нижним Новгородом и Пермью придётся перевести вручную: например, Санкт-Петербург переводится в Google Translate как St. Petersburg вместо ожидаемого Saint Petersburg.

ru_city = venue_city
if ru_city == 'Санкт-Петербург':
   en_city = 'Saint Petersburg'
elif ru_city == 'Нижний Новгород':
   en_city = 'Nizhnij Novgorod'
elif ru_city == 'Пермь':
   en_city = 'Perm'
else:
   en_city = translator.translate(ru_city, dest='en').text

Следующее отличие — запрос к базе. Нам нужно добавить в него условие совпадения по городу, чтобы получать чекины только в запрошенном городе:

WHERE (rt.venue_city='{ru_city}' OR rt.venue_city='{en_city}')

Наконец, сформированный DataFrame мы не возвращаем, а сохраняем в директорию data/cities.

df = top_n_brewery_today_df[['brewery_name', 'venue_city', 'avg_rating', 'checkins']].round(2)
df.insert(3, 'Изменение', diff_rank_list)
df.columns = ['НАЗВАНИЕ', 'ГОРОД', 'РЕЙТИНГ', 'ИЗМЕНЕНИЕ', 'ЧЕКИНОВ']
df.to_csv(f'data/cities/{en_city}.csv', index=False)  # saving DF
print(f'{en_city}.csv updated!')

Обновление таблиц по расписанию

Наш дашборд будет использовать библиотеку apscheduler для вызова последней функции по расписанию и обновления таблиц по городам. Следующие строки добавим в файл application.py: scheduler будет обновлять данные для каждого города из списка all_cities ежедневно в 13:30 по МСК.

from apscheduler.schedulers.background import BackgroundScheduler
from get_tables import update_best_breweries

all_cities = sorted(['Москва', 'Сергиев Посад', 'Санкт-Петербург', 'Владимир',
             'Красная Пахра', 'Воронеж', 'Екатеринбург', 'Ярославль', 'Казань',
             'Ростов-на-Дону', 'Краснодар', 'Тула', 'Курск', 'Пермь', 'Нижний Новгород'])

scheduler = BackgroundScheduler()
@scheduler.scheduled_job('cron', hour=10, misfire_grace_time=30)
def update_data():
   for city in all_cities:
       update_best_breweries(city)
scheduler.start()

Формирование таблицы

Наконец, опишем заключительную функцию get_top_russian_breweries_table(venue_city, checkins_n=250) — она будет принимать город, количество чекинов и будет возвращать сформированную таблицу dbc. Второй параметр — checkins_n будет отсеивать пивоварни, у которых чекинов меньше значения этой переменной. Если город не указан, сразу вызываем ранее описанную get_top_russian_breweries(checkins_n) — она вернёт общую статистику за последнее время. В противном случае снова переводим города на латиницу.

if venue_city == None: 
   selected_df = get_top_russian_breweries(checkins_n)
else: 
   ru_city = venue_city
   if ru_city == 'Санкт-Петербург':
       en_city = 'Saint Petersburg'
   elif ru_city == 'Нижний Новгород':
       en_city = 'Nizhnij Novgorod'
   elif ru_city == 'Пермь':
       en_city = 'Perm'
   else:
       en_city = translator.translate(ru_city, dest='en').text

Читаем все строки из таблицы с нужным городом и проверяем количество чекинов каждой пивоварни. В самом начале материала мы завели словарь city_names. При помощи функции map() мы пишем лямбда-выражение, которое возвращает значение ключа словаря city_names только если входной аргумент из колонки df[‘ГОРОД’] совпадает с каким-либо из ключей в city_names. В случае, если совпадения не будет возвращает просто x во избежание np.Nan.

Например, для наименования «СПБ» в колонке df[‘ГОРОД’] вернётся значение «Санкт-Петербург», так как такой ключ есть в city_names. Для «Воронеж» название таким и останется, так как совпадающий ключ не найден. В конце удаляем возможные дубликаты из DataFrame, добавляем колонку с номером места пивоварни и забираем себе первые 10 строк — это и будет топ-10 пивоварен по нужному городу.

df = pd.read_csv(f'data/cities/{en_city}.csv')
df = df.loc[df['ЧЕКИНОВ'] >= checkins_n]
df['ГОРОД'] = df['ГОРОД'].map(lambda x: city_names[x] if (x in city_names) else x)
df.drop_duplicates(subset=['НАЗВАНИЕ', 'ГОРОД'], keep='first', inplace=True) 
df.insert(0, 'МЕСТО', list('🏆 ' + str(i) if i in [1, 2, 3] else str(i) for i in range(1, len(df) + 1)))
selected_df = df.head(10)

Вне зависимости от того, получали мы DataFrame общей функцией get_top_russian_breweries() или по конкретному городу, собираем таблицу, задаём стили и возвращаем готовый dbc-объект.


Вёрстка в Dash Bootstrap Components

table = dbc.Table.from_dataframe(selected_df, striped=False,
                                bordered=False, hover=True,
                                size='sm',
                                style={'background-color': '#ffffff',
                                       'font-family': 'Proxima Nova Regular',
                                       'text-align':'center',
                                       'fontSize': '12px'},
                                className='table borderless'
                                )

return table

Структура вёрстки

Опишем в application.py слайдер, таблицу и Dropdown-фильтр с выбором города.

О вёрстке дашборда при помощи Dash Bootstrap Components мы говорили в предыдущем материале цикла

checkins_slider_tab_1 = dbc.CardBody(
                           dbc.FormGroup(
                               [
                                   html.H6('Количество чекинов', style={'text-align': 'center'})),
                                   dcc.Slider(
                                       id='checkin_n_tab_1',
                                       min=0,
                                       max=250,
                                       step=25,
                                       value=250,  
                                       loading_state={'is_loading': True},
                                       marks={i: i for i in list(range(0, 251, 25))}
                                   ),
                               ],
                           ),
                           style={'max-height': '80px', 
                                  'padding-top': '25px'
                                  }
                       )

top_breweries = dbc.Card(
       [
           dbc.CardBody(
               [
                   dbc.FormGroup(
                       [
                           html.H6('Фильтр городов', style={'text-align': 'center'}),
                           dcc.Dropdown(
                               id='city_menu',
                               options=[{'label': i, 'value': i} for i in all_cities],
                               multi=False,
                               placeholder='Выберите город',
                               style={'font-family': 'Proxima Nova Regular'}
                           ),
                       ],
                   ),
                   html.P(id="tab-1-content", className="card-text"),
               ],
           ),
   ],
)

И для обновления таблицы по фильтру и слайдеру с минимальным количеством чекинов опишем callback с вызовом get_top_russian_breweries_table(city, checkin_n):

@app.callback(
   Output("tab-1-content", "children"), [Input("city_menu", "value"),
                                         Input("checkin_n_tab_1", "value")]
)
def table_content(city, checkin_n):
   return get_top_russian_breweries_table(city, checkin_n)

Готово! Напомню, в материале описан пример создания только одной таблицы. На данный момент дашборд помимо лучших пивоварен выдаёт лучшие и худшие сорта пива, а также средний рейтинг пива по регионам и отношение количества чекинов каждой пивоварни к её средней оценке.

Полный код проекта доступен на GitHub

Собираем топ-10 аккаунтов Instagram по теме аналитики и машинного обучения

Время чтения текста – 11 минут

В некоторых телеграм-каналах (раз, два) уже говорилось про другие интересные паблики в телеграме, однако по Instagram такого топа пока не было. Вероятно, это не самая популярная сеть для контента в нашей индустрии, тем не менее, можно проверить эту гипотезу, используя Python и данные. В этом материале рассказываем, как собрать данные по аккаунтам Instagram без API.

Метод сбора данных
Instagram API не позволит вам просто так собирать данные о других пользователях, но есть и другой метод. Можно отправить такой request-запрос:

https://instagram.com/leftjoin/?__a=1

И получить в ответе JSON-объект со всей информацией о пользователе, которую можно посмотреть самому: имя аккаунта, количество постов, подписок и подписчиков, а также первые десять постов с информацией про них: количество лайков, комментарии и прочее. Именно на таких request-запросах устроена библиотека pyInstagram.

Схема данных
Будем собирать данные в три таблицы Clickhouse: пользователи, посты и комментарии. В таблицу пользователей собираем всю информацию о них: идентификатор, наименование аккаунта, имя и фамилия человека, описание профиля, количество подписок и подписчиков, количество постов, суммарное количество комментариев и лайков, наличие верификации, география пользователя и ссылки на аватарку и Facebook.

CREATE TABLE instagram.users
(
    `added_at` DateTime,
    `user_id` UInt64,
    `user_name` String,
    `full_name` String,
    `base_url` String,
    `biography` String,
    `followers_count` UInt64,
    `follows_count` UInt64,
    `media_count` UInt64,
    `total_comments` UInt64,
    `total_likes` UInt64,
    `is_verified` UInt8,
    `country_block` UInt8,
    `profile_pic_url` Nullable(String),
    `profile_pic_url_hd` Nullable(String),
    `fb_page` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблицу с постами сохраняем автора поста, идентификатор записи, текст, количество комментариев и прочее. is_ad, is_album и is_video — поля, проверяющие, является ли запись рекламной, «каруселью» изображений или видеозаписью.

CREATE TABLE instagram.posts
(
    `added_at` DateTime,
    `owner` String,
    `post_id` UInt64,
    `caption` Nullable(String),
    `code` String,
    `comments_count` UInt64,
    `comments_disabled` UInt8,
    `created_at` DateTime,
    `display_url` String,
    `is_ad` UInt8,
    `is_album` UInt8,
    `is_video` UInt8,
    `likes_count` UInt64,
    `location` Nullable(String),
    `recources` Array(String),
    `video_url` Nullable(String)
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

В таблице с комментариями храним отдельно каждый комментарий к записи с автором и текстом.

CREATE TABLE instagram.comments
(
    `added_at` DateTime,
    `comment_id` UInt64,
    `post_id` UInt64,
    `comment_owner` String,
    `comment_text` String
)
ENGINE = ReplacingMergeTree
ORDER BY added_at

Скрипт
Из библиотеки pyInstagram нам понадобятся классы Account, Media, WebAgent и Comment.

from instagram import Account, Media, WebAgent, Comment
from datetime import datetime
from clickhouse_driver import Client
import requests
import pandas as pd

Создаем экземпляр класса WebAgent — он необходим для вызова некоторых методов и обновления аккаунтов. В начале нам нужно иметь хотя бы названия профилей пользователей, информацию о которых мы хотим собрать, поэтому отправим другой request-запрос для поиска пользователей по ключевым словам, их список ниже в фрагменте кода. В выдаче будут аккаунты, у которых название или описание профиля совпало с ключевым словом.

agent = WebAgent()
queries_list = ['machine learning', 'data science', 'data analytics', 'analytics', 'business intelligence',
                'data engineering', 'computer science', 'big data', 'artificial intelligence',
                'deep learning', 'data scientist','machine learning engineer', 'data engineer']
client = Client(host='12.34.56.789', user='default', password='', port='9000', database='instagram')
url = 'https://www.instagram.com/web/search/topsearch/?context=user&count=0'

Проходим по всем ключевым словам и собираем все аккаунты. Так как в списке могли образоваться дубликаты, переведём список в множество и обратно в список.

response_list = []
for query in queries_list:
    response = requests.get(url, params={
        'query': query
    }).json()
    response_list.extend(response['users'])
instagram_pages_list = []
for item in response_list:
    instagram_pages_list.append(item['user']['username'])
instagram_pages_list = list(set(instagram_pages_list))

Теперь проходим по списку аккаунтов, и если аккаунта с таким наименованием ещё не было в базе, то получаем расширенную информацию о нём. Для этого пробуем создать экземпляр класса Account, передав username параметром. После при помощи объекта agent обновляем информацию об аккаунте. Будем собирать только первые 100 постов, чтобы сбор не задерживался. Создадим список media_list — он при помощи метода get_media будет хранить код каждого поста, который затем можно будет получить при помощи класса Media.


Сбор медиа аккаунта

all_posts_list = []
username_count = 0
for username in instagram_pages_list:
    if client.execute(f"SELECT count(1) FROM users WHERE user_name='{username}'")[0][0] == 0:
        print('username:', username_count, '/', len(instagram_pages_list))
        username_count += 1
        account_total_likes = 0
        account_total_comments = 0
        try:
            account = Account(username)
        except Exception as E:
            print(E)
            continue
        try:
            agent.update(account)
        except Exception as E:
            print(E)
            continue
        if account.media_count < 100:
            post_count = account.media_count
        else:
            post_count = 100
        print(account, post_count)
        media_list, _ = agent.get_media(account, count=post_count, delay=1)
        count = 0

Мы начинаем с постов и комментариев, потому что для занесения в базу нового пользователя нам нужно подсчитать сперва суммарное количество комментариев и лайков в его аккаунте. Практически все интересующие поля являются атрибутами класса Media.


Сбор постов пользователя

for media_code in media_list:
            if client.execute(f"SELECT count(1) FROM posts WHERE code='{media_code}'")[0][0] == 0:
                print('posts:', count, '/', len(media_list))
                count += 1

                post_insert_list = []
                post = Media(media_code)
                agent.update(post)
                post_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(str(post.owner))
                post_insert_list.append(post.id)
                if post.caption is not None:
                    post_insert_list.append(post.caption.replace("'","").replace('"', ''))
                else:
                    post_insert_list.append("")
                post_insert_list.append(post.code)
                post_insert_list.append(post.comments_count)
                post_insert_list.append(int(post.comments_disabled))
                post_insert_list.append(datetime.fromtimestamp(post.date).strftime('%Y-%m-%d %H:%M:%S'))
                post_insert_list.append(post.display_url)
                try:
                    post_insert_list.append(int(post.is_ad))
                except TypeError:
                    post_insert_list.append('cast(Null as Nullable(UInt8))')
                post_insert_list.append(int(post.is_album))
                post_insert_list.append(int(post.is_video))
                post_insert_list.append(post.likes_count)
                if post.location is not None:
                    post_insert_list.append(post.location)
                else:
                    post_insert_list.append('')
                post_insert_list.append(post.resources)
                if post.video_url is not None:
                    post_insert_list.append(post.video_url)
                else:
                    post_insert_list.append('')
                account_total_likes += post.likes_count
                account_total_comments += post.comments_count
                try:
                    client.execute(f'''
                        INSERT INTO posts VALUES {tuple(post_insert_list)}
                    ''')
                except Exception as E:
                    print('posts:')
                    print(E)
                    print(post_insert_list)

Чтобы собрать комментарии необходимо вызвать метод get_comments и передать параметром экземпляр класса Media.


Сбор комментариев из поста

comments = agent.get_comments(media=post)
                for comment_id in comments[0]:
                    comment_insert_list = []
                    comment = Comment(comment_id)
                    comment_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                    comment_insert_list.append(comment.id)
                    comment_insert_list.append(post.id)
                    comment_insert_list.append(str(comment.owner))
                    comment_insert_list.append(comment.text.replace("'","").replace('"', ''))
                    try:
                        client.execute(f'''
                            INSERT INTO comments VALUES {tuple(comment_insert_list)}
                        ''')
                    except Exception as E:
                        print('comments:')
                        print(E)
                        print(comment_insert_list)


Наконец, когда все посты и комментарии пройдены, можем занести информацию о пользователе.

Сбор информации о пользователе

user_insert_list = []
        user_insert_list.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        user_insert_list.append(account.id)
        user_insert_list.append(account.username)
        user_insert_list.append(account.full_name)
        user_insert_list.append(account.base_url)
        user_insert_list.append(account.biography)
        user_insert_list.append(account.followers_count)
        user_insert_list.append(account.follows_count)
        user_insert_list.append(account.media_count)
        user_insert_list.append(account_total_comments)
        user_insert_list.append(account_total_likes)
        user_insert_list.append(int(account.is_verified))
        user_insert_list.append(int(account.country_block))
        user_insert_list.append(account.profile_pic_url)
        user_insert_list.append(account.profile_pic_url_hd)
        if account.fb_page is not None:
            user_insert_list.append(account.fb_page)
        else:
            user_insert_list.append('')
        try:
            client.execute(f'''
                INSERT INTO users VALUES {tuple(user_insert_list)}
            ''')
        except Exception as E:
            print('users:')
            print(E)
            print(user_insert_list)

Результаты
Таким методом нам удалось собрать 500 пользователей, 20 тысяч постов и 40 тысяч комментариев. Теперь можем написать простой запрос к базе и получить топ-10 Instagram-аккаунтов по теме аналитики и машинного обучения за последнее время:

SELECT *
FROM users
ORDER BY followers_count DESC
LIMIT 10

А вот и приятный бонус, для тех, кто искал на какие аккаунты в Instagram подписаться по релевантной тематике:

  1. @ai_machine_learning
  2. @neuralnine
  3. @datascienceinfo
  4. @compscistuff
  5. @computersciencelife
  6. @welcome.ai
  7. @papa_programmer
  8. @data_science_learn
  9. @neuralnet.ai
  10. @techno_thinkers

Полный код проекта доступен на GitHub

Обзор библиотеки pandas-profiling на примере датасета Superstore Sales

Время чтения текста – 10 минут

Перед тем как работать с данными, необходимо составить представление, с чем мы имеем дело. В материале будем рассматривать датасет SuperStore Sales, а именно его лист Orders. В нём собраны данные о покупках клиентов канадского интернет-супермаркета: идентификаторы заказа, товаров, клиента, тип доставки, цены, категории и названия продуктов и прочее. Подробнее с датасетом можно ознакомиться на GitHub. Например, если мы создадим из датасета DataFrame, можем воспользоваться стандартным методом describe() библиотеки pandas для описания данных:

import pandas as pd

df = pd.read_csv('superstore_sales_orders.csv', decimal=',')
df.describe(include='all')

И во многих случаях получим такую кашу:

Код библиотеки доступен на GitHub

Если постараться и потратить время, можно извлечь полезную информацию. Например, можем узнать, что люди чаще выбирают «Regular air» в качестве доставки или что большинство заказов поступило из провинции Онтарио. Тем не менее, есть и другое решение, которое подробнее и качественнее описывает датасет — библиотека pandas-profiling. Вы отдаёте ей DataFrame, а она генерирует html-страницу с подробным описанием сета данных:

import pandas_profiling
profile = pandas_profiling.ProfileReport(df)
profile.to_file("output.html")

Всего Pandas Profiling возвращает 6 разделов: обзор датасета, переменные, отношения и корреляцию между ними, количество пропущенных значений и примеры из датасета.

Web-версия отчёта доступна по ссылке

Обзор данных

Рассмотрим первый подраздел — «Overview». Библиотека собрала следующую статистику: количество переменных, наблюдений, пропущенных ячеек, дубликатов и общий вес файла. В колонке Variable types описаны типы переменных: здесь 12 качественных и 9 числовых.

В подразделе «Reproduction» собрана техническая информация библиотеки: сколько времени занял анализ сета данных, версия библиотеки и прочее.

А подраздел «Warnings» сообщает о возможных проблемах в структуре датасета: сейчас он, например, предупреждает, что у поля «Order Date» — слишком большое количество уникальных значений.

Переменные

Двигаемся ниже. В этом разделе содержится подробное описание каждой переменной: сколько возможных уникальных значений она принимает, сколько значений пропущено, сколько памяти занимает поле. Справа от статистики присутствует гистограмма с распределением значений поля.

При нажатии на Toggle details откроется расширенная информация: квартили, медиана и прочая полезная описательная статистика. В остальных вкладках находятся гистограмма из основного экрана, топ-10 значений по частоте и экстремальные значения.

Отношения переменных

В этом разделе визуализированы отношения переменных при помощи hexbin plot: выглядит это не очень очевидно и понятно. Особенно усугубляет положение отсутствие легенды к графику.

Корреляция переменных

В этом разделе представлена по-разному посчитананя корреляция переменных: например, первым указано r-value Пирсона. Заметно, что переменная Profit положительно коррелирует с переменной Sales. При нажатии на Toggle correlation descriptions открывается подробное пояснение к каждому коэффициенту.

Пропущенные значения

Тут всё просто — bar chart, матрица и дендрограмма с количеством заполненных полей в каждой переменной. Заметно, что в колонке Product Base Margin отсутствуют три значения.

Примеры

И, наконец, последний раздел представляет первые и последние 10 значений в качестве примера кусков сета данных — аналог метода head() из pandas.

Что в итоге?

Библиотека уделяет больше внимания статистике, чем pandas: можно получить подробную описательную статистику по каждой переменной, посмотреть, как коррелируют между собой столбцы датасета. В совокупности с генерацией простого и удобного интерфейса библиотека строит полноценный отчёт по датасету, уже на основании которого можно делать выводы и сформировать представление о данных.
И всё же, у библиотеки есть и минусы. На генерацию отчётов к громадным датасетам может уйти много времени вплоть до нескольких часов. Это безусловно хороший инструмент для автоматического проектирования, но он не может сделать полноценный анализ за вас и добавить больше деталей в графики. Кроме того, если вы только начали практиковаться с анализом данных лучше будет начать с pandas — это закрепит ваши навыки и придаст уверенности при работе с данными.

Пишем клиент для нового API nalog.ru

Время чтения текста – 6 минут

Ранее в блоге мы рассказывали, как благодаря открытому API можно собирать данные от ФНС по нашим чекам из магазинов, обращаясь к приложению налоговой. С прошлой недели способ стал нерабочим: ФНС обновили методы приложения. Сегодня напишем свой клиент для nalog.ru, который проходит авторизацию и отправляет чеки на проверку.

Авторизация

Прежде чем начать пользоваться приложением, наш профиль необходимо авторизовать. В отличии от предыдущей версии, текущая требует прохождение капчи для авторизации по мобильному телефону — такой способ нам не подходит. Проще всего будет входить в профиль по ИНН и паролю от личного кабинета налогоплательщика. Для хранения этих данных создадим файл credentials.env. Переменную CLIENT_SECRET зададим согласно коду: она отвечает за авторизацию приложения. А ИНН и пароль подставляем свои.

INN = 
PASSWORD = 
CLIENT_SECRET=IyvrAbKt9h/8p6a7QPh8gpkXYQ4=

Теперь создадим файл nalog_python.py, в котором будет описан клиент. Библиотека dotenv используется для загрузки нашего логина и пароля из .env файла.

import os
import json
import requests
from dotenv import load_dotenv

Опишем класс NalogRuPython, и начнём с глобальных переменных класса. Здесь перечисляем headers, необходимые для отправки запроса.

class NalogRuPython:
    HOST = 'irkkt-mobile.nalog.ru:8888'
    DEVICE_OS = 'iOS'
    CLIENT_VERSION = '2.9.0'
    DEVICE_ID = '7C82010F-16CC-446B-8F66-FC4080C66521'
    ACCEPT = '*/*'
    USER_AGENT = 'billchecker/2.9.0 (iPhone; iOS 13.6; Scale/2.00)'
    ACCEPT_LANGUAGE = 'ru-RU;q=1, en-US;q=0.9'

В конструкторе класса прочитаем данные из нашего окружения методом load_dotenv и вызовем метод set_session_id для получения __session_id, который сейчас опишем. Идентификатор сессии необходим для отправки прочих запросов к серверу налоговой, поэтому его получаем первым делом.

def __init__(self):
        load_dotenv()
        self.__session_id = None
        self.set_session_id()

Метод set_session_id проводит авторизацию пользователя по ИНН и паролю от личного кабинета налогоплательщика и ничего не возвращает, только задаёт значение переменной __session_id. Отправляем по указанному в глобальных переменных HOST запрос с нашими данными от аккаунта и получаем идентификатор сессии в ответе.

def set_session_id(self) -> None:
        if os.getenv('CLIENT_SECRET') is None:
            raise ValueError('OS environments not content "CLIENT_SECRET"')
        if os.getenv('INN') is None:
            raise ValueError('OS environments not content "INN"')
        if os.getenv('PASSWORD') is None:
            raise ValueError('OS environments not content "PASSWORD"')

        url = f'https://{self.HOST}/v2/mobile/users/lkfl/auth'
        payload = {
            'inn': os.getenv('INN'),
            'client_secret': os.getenv('CLIENT_SECRET'),
            'password': os.getenv('PASSWORD')
        }
        headers = {
            'Host': self.HOST,
            'Accept': self.ACCEPT,
            'Device-OS': self.DEVICE_OS,
            'Device-Id': self.DEVICE_ID,
            'clientVersion': self.CLIENT_VERSION,
            'Accept-Language': self.ACCEPT_LANGUAGE,
            'User-Agent': self.USER_AGENT,
        }

        resp = requests.post(url, json=payload, headers=headers)
        self.__session_id = resp.json()['sessionId']

Получение идентификатора чека

Следующий шаг — получение ticket_id. Прежде чем отправить сам чек, необходимо получить его идентификатор для проверки. Опишем метод _get_ticket_id, который принимает строку с расшифрованным QR-кодом чека, отправляет соответствующий запрос на сервер и возвращает идентификатор для этой строки. В headers помимо указания глобальных переменных появился также __session_id, который требует метод /v2/ticket.

def _get_ticket_id(self, qr: str) -> str:
        url = f'https://{self.HOST}/v2/ticket'
        payload = {'qr': qr}
        headers = {
            'Host': self.HOST,
            'Accept': self.ACCEPT,
            'Device-OS': self.DEVICE_OS,
            'Device-Id': self.DEVICE_ID,
            'clientVersion': self.CLIENT_VERSION,
            'Accept-Language': self.ACCEPT_LANGUAGE,
            'sessionId': self.__session_id,
            'User-Agent': self.USER_AGENT,
        }
        resp = requests.post(url, json=payload, headers=headers)
        return resp.json()["id"]

Расшифровка чека

Последний шаг — проверка чека. Формируем по ticket_id запрос к серверу и получаем подробную расшифровку чека в ответе. На этом клиент полностью описан и готов к работе.

def get_ticket(self, qr: str) -> dict:
        ticket_id = self._get_ticket_id(qr)
        url = f'https://{self.HOST}/v2/tickets/{ticket_id}'
        headers = {
            'Host': self.HOST,
            'sessionId': self.__session_id,
            'Device-OS': self.DEVICE_OS,
            'clientVersion': self.CLIENT_VERSION,
            'Device-Id': self.DEVICE_ID,
            'Accept': self.ACCEPT,
            'User-Agent': self.USER_AGENT,
            'Accept-Language': self.ACCEPT_LANGUAGE,
        }
        resp = requests.get(url, headers=headers)
        return resp.json()

Наконец, для удобства опишем пример работы клиента. Создадим экземпляр класса NalogRuPython, зададим для примера строку QR-кода и получим расшифрованный ticket, который затем напечатаем на экране.

if __name__ == '__main__':
    client = NalogRuPython()
    qr_code = "t=20200727T1117&s=4850.00&fn=9287440300634471&i=13571&fp=3730902192&n=1"
    ticket = client.get_ticket(qr_code)
    print(json.dumps(ticket, indent=4))

Клиент можно использовать и в своих скриптах: для этого нужно импортировать класс, создать экземпляр и, как и в примере, вызвать метод get_ticket.

from nalog_python import NalogRuPython

client = NalogRuPython()
qr_code = "t=20200727T1117&s=4850.00&fn=9287440300634471&i=13571&fp=3730902192&n=1"
ticket = client.get_ticket(qr_code)

Полный код проекта на GitHub

 19 комментариев    1746   3 мес   Data Analytics   nalog.ru   python
Ранее Ctrl + ↓