40 заметок с тегом

python

Позднее Ctrl + ↑

Обнаружение статистических выбросов в Python

Время чтения текста – 7 минут

Параллельно с выходом материала «Обнаружение выбросов в R» предлагаем посмотреть, как те же методы обнаружения выбросов реализовать в Python.

Данные

Для наглядности эксперимента возьмём тот же пакет данных mpg — скачать его в виде csv-таблицы можно с GitHub. Импортируем библиотеки и читаем таблицу в DataFrame:

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

df = pd.read_csv('mpg.csv')

Минимальные и максимальные значения

Тут всё просто. Выводим описание всего датасета методом describe():

df.describe()

Гистограмма

Такой график тоже можно построить в одну строку, используя внутренние средства библиотеки pandas:

df.hwy.plot(kind='hist', density=1, bins=20, stacked=False, alpha=.5, color='grey')

Box plot

В случае ящика с усами далеко идти тоже не приходится — в pandas есть метод и для этого:

_, bp = df.hwy.plot.box(return_type='both')

Получим точки с графика и выведем их в таблице, используя объект bp:

outliers = [flier.get_ydata() for flier in bp["fliers"]][0]
df[df.hwy.isin(outliers)]

Процентили

При помощи метода quantile получаем соответствующую нижнюю и верхнюю границы, а затем выводим всё, что выходит за их рамки:

lower_bound = df.hwy.quantile(q=0.025)
upper_bound = df.hwy.quantile(q=0.975)
df[(df.hwy < lower_bound) | (df.hwy > upper_bound)]

Фильтр Хэмпеля

Мы используем реализацию фильтра Хэмпеля, найденную на StackOverflow

Опишем функцию, которая заменяет на nan все значения, у которых разница с медианой больше, чем три медианных абсолютных отклонения.

def hampel(vals_orig):
    vals = vals_orig.copy()    
    difference = np.abs(vals.median()-vals)
    median_abs_deviation = difference.median()
    threshold = 3 * median_abs_deviation
    outlier_idx = difference > threshold
    vals[outlier_idx] = np.nan
    return(vals)

И применим к нашему набору данных:

hampel(df.hwy)

0      29.0
1      29.0
2      31.0
3      30.0
4      26.0
       ... 
229    28.0
230    29.0
231    26.0
232    26.0
233    26.0
Name: hwy, Length: 234, dtype: float64

В выводе нет nan-значений, а значит и выбросов фильтр Хэмпеля не обнаружил.

Тест Граббса

Автор реализации теста Граббса и теста Рознера для Python

Опишем три функции: первая находит значение критерия Граббса и максимальное значение в наборе данных, вторая — критическое значение с учётом объёма выборки и уровня значимости, а третья проверяет, является ли значение с максимальным индексом выбросом:

import numpy as np
from scipy import stats

def grubbs_stat(y):
    std_dev = np.std(y)
    avg_y = np.mean(y)
    abs_val_minus_avg = abs(y - avg_y)
    max_of_deviations = max(abs_val_minus_avg)
    max_ind = np.argmax(abs_val_minus_avg)
    Gcal = max_of_deviations / std_dev
    print(f"Grubbs Statistics Value: {Gcal}")
    return Gcal, max_ind

def calculate_critical_value(size, alpha):
    t_dist = stats.t.ppf(1 - alpha / (2 * size), size - 2)
    numerator = (size - 1) * np.sqrt(np.square(t_dist))
    denominator = np.sqrt(size) * np.sqrt(size - 2 + np.square(t_dist))
    critical_value = numerator / denominator
    print(f"Grubbs Critical Value: {critical_value}")
    return critical_value

def check_G_values(Gs, Gc, inp, max_index):
    if Gs > Gc:
        print(f"{inp[max_index]} is an outlier")
    else:
        print(f"{inp[max_index]} is not an outlier")

Заменим значение в 34 строке на 212:

df.hwy[34] = 212

И выполним три функции:

Gcritical = calculate_critical_value(len(df.hwy), 0.05)
Gstat, max_index = grubbs_stat(df.hwy)
check_G_values(Gstat, Gcritical, df.hwy, max_index)

Grubbs Critical Value: 3.652090929984981
Grubbs Statistics Value: 13.745808761040397
212 is an outlier

Тест Рознера

Для теста Рознера достаточно дописать одну функцию, которая принимает набор данных, уровень значимости и число потенциальных выбросов:

def ESD_test(input_series, alpha, max_outliers):
    for iteration in range(max_outliers):
        Gcritical = calculate_critical_value(len(input_series), alpha)
        Gstat, max_index = grubbs_stat(input_series)
        check_G_values(Gstat, Gcritical, input_series, max_index)
        input_series = np.delete(input_series, max_index)

Используя функцию на нашем наборе данных получаем, что значение 212 является выбросом, а 44 — нет:

ESD_test(np.array(df.hwy), 0.05, 3)

Grubbs Critical Value: 3.652090929984981
Grubbs Statistics Value: 13.745808761040408
212 is an outlier
Grubbs Critical Value: 3.6508358337727187
Grubbs Statistics Value: 3.455960616168714
44 is not an outlier
Grubbs Critical Value: 3.649574509044683
Grubbs Statistics Value: 3.5561478280392245
44 is not an outlier

Python и тексты нового альбома Земфиры: анализируем суть песен

Время чтения текста – 18 минут

Неделю назад вышёл первый за 8 лет студийный альбом Земфиры «Бордерлайн». К работе помимо рок-певицы приложили руку разные люди, в том числе и её родственники — рифф для песни «таблетки» написал её племянник из Лондона. Альбом получился разнообразным: например, песня «остин» посвящена главному персонажу игры Homescapes российской студии Playrix (кстати, посмотрите свежие Бизнес-секреты с братьями Бухманами, там они тоже про это рассказывают) — Земфире нравится игра, и для трека она связалась со студией. А сингл «крым» был написан в качестве саундтрека к новой картине соратницы Земфиры — Ренаты Литвиновой.

Послушать альбом в Apple Music / Яндекс.Музыке / Spotify

Тем не менее, дух всего альбома довольно мрачен — в песнях часто повторяются слова «боль», «ад», «бесишь» и прочие по смыслу. Мы решили провести разведочный анализ нового альбома, а затем при помощи модели Word2Vec и косинусной меры посмотреть на семантическую близость песен между собой и вычислить общее настроение альбома.

Для тех, кому скучно читать про подготовку данных и шаги анализа можно перейти сразу к результатам.

Подготовка данных

Для начала работы напишем скрипт обработки данных. Цель скрипта — из множества текстовых файлов, в каждом из которых лежит по песне, собрать единую csv-таблицу. При этом текст треков очищаем от знаков пунктуации и ненужных слов.

import pandas as pd
import re
import string
import pymorphy2
from nltk.corpus import stopwords

Создаём морфологический анализатор и расширяем список всего, что нужно отбросить:

morph = pymorphy2.MorphAnalyzer()
stopwords_list = stopwords.words('russian')
stopwords_list.extend(['куплет', 'это', 'я', 'мы', 'ты', 'припев', 'аутро', 'предприпев', 'lyrics', '1', '2', '3', 'то'])
string.punctuation += '—'

Названия песен приведены на английском — создадим словарь для перевода на русский и словарь, из которого позднее сделаем таблицу:

result_dict = dict()

songs_dict = {
    'snow':'снег идёт',
    'crimea':'крым',
    'mother':'мама',
    'ostin':'остин',
    'abuse':'абьюз',
    'wait_for_me':'жди меня',
    'tom':'том',
    'come_on':'камон',
    'coat':'пальто',
    'this_summer':'этим летом',
    'ok':'ок',
    'pills':'таблетки'
}

Опишем несколько функций. Первая читает целиком песню из файла и удаляет переносы строки, вторая очищает текст от ненужных символов и слов, а третья при помощи морфологического анализатора pymorphy2 приводит слова к нормальной форме. Модуль pymorphy2 не всегда хорошо справляется с неоднозначностью — для слов «ад» и «рай» потребуется дополнительная обработка.

def read_song(filename):
    f = open(f'{filename}.txt', 'r').read()
    f = f.replace('\n', ' ')
    return f

def clean_string(text):
    text = re.split(' |:|\.|\(|\)|,|"|;|/|\n|\t|-|\?|\[|\]|!', text)
    text = ' '.join([word for word in text if word not in string.punctuation])
    text = text.lower()
    text = ' '.join([word for word in text.split() if word not in stopwords_list])
    return text

def string_to_normal_form(string):
    string_lst = string.split()
    for i in range(len(string_lst)):
        string_lst[i] = morph.parse(string_lst[i])[0].normal_form
        if (string_lst[i] == 'аду'):
            string_lst[i] = 'ад'
        if (string_lst[i] == 'рая'):
            string_lst[i] = 'рай'
    string = ' '.join(string_lst)
    return string

Проходим по каждой песне и читаем файл с соответствующим названием:

name_list = []
text_list = []
for song, name in songs_dict.items():
    text = string_to_normal_form(clean_string(read_song(song)))
    name_list.append(name)
    text_list.append(text)

Затем объединяем всё в DataFrame и сохраняем в виде csv-файла.

df = pd.DataFrame()
df['name'] = name_list
df['text'] = text_list
df['time'] = [290, 220, 187, 270, 330, 196, 207, 188, 269, 189, 245, 244]
df.to_csv('borderline.csv', index=False)

Результат:

Облако слов по всему альбому

Начнём анализ с построения облака слов — оно отобразит, какие слова чаще всего встречаются в песнях. Импортируем нужные библиотеки, читаем csv-файл и устанавливаем конфигурации:

import nltk
from wordcloud import WordCloud
import pandas as pd
import matplotlib.pyplot as plt
from nltk import word_tokenize, ngrams

%matplotlib inline
nltk.download('punkt')
df = pd.read_csv('borderline.csv')

Теперь создаём новую фигуру, устанавливаем параметры оформления и при помощи библиотеки wordcloud отображаем слова с размером прямо пропорциональным частоте упоминания слова. Над каждым графиком дополнительно указываем название песни.

fig = plt.figure()
fig.patch.set_facecolor('white')
plt.subplots_adjust(wspace=0.3, hspace=0.2)
i = 1
for name, text in zip(df.name, df.text):
    tokens = word_tokenize(text)
    text_raw = " ".join(tokens)
    wordcloud = WordCloud(colormap='PuBu', background_color='white', contour_width=10).generate(text_raw)
    plt.subplot(4, 3, i, label=name,frame_on=True)
    plt.tick_params(labelsize=10)
    plt.imshow(wordcloud)
    plt.axis("off")
    plt.title(name,fontdict={'fontsize':7,'color':'grey'},y=0.93)
    plt.tick_params(labelsize=10)
    i += 1

EDA текстов альбома

Теперь проанализируем тексты песен — импортируем библиотеки для работы с данными и визуализации:

import plotly.graph_objects as go
import plotly.figure_factory as ff
from scipy import spatial
import collections
import pymorphy2
import gensim

morph = pymorphy2.MorphAnalyzer()

Сначала посчитаем число слов в каждой песне, число уникальных слов и процентное соотношение:

songs = []
total = []
uniq = []
percent = []

for song, text in zip(df.name, df.text):
    songs.append(song)
    total.append(len(text.split()))
    uniq.append(len(set(text.split())))
    percent.append(round(len(set(text.split())) / len(text.split()), 2) * 100)

А теперь составим из этого DataFrame и дополнительно посчитаем число слов в минуту для каждой песни:

df_words = pd.DataFrame()
df_words['song'] = songs
df_words['total words'] = total
df_words['uniq words'] = uniq
df_words['percent'] = percent
df_words['time'] = df['time']
df_words['words per minute'] = round(total / (df['time'] // 60))
df_words = df_words[::-1]

Данные хорошо бы визуализировать — построим две столбиковые диаграммы: одну для числа слов в песне, а другую для числа слов в минуту.

colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(62,142,231,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='📝 Всего слов',
           text=df_words['total words'],
           textposition='auto',
           x=df_words.song,
           y=df_words['total words'],
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),),
    go.Bar(name='🌀 Уникальных слов',
           text=df_words['uniq words'].astype(str) + '<br>'+ df_words.percent.astype(int).astype(str) + '%' ,
           textposition='inside',
           x=df_words.song,
           y=df_words['uniq words'],
           textfont_color='white',
           marker_color=colors_2,
           marker=dict(line=dict(width=0)),),
])

fig.update_layout(barmode='group')

fig.update_layout(
    title = 
        {'text':'<b>Соотношение числа уникальных слов к общему количеству</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
)
fig.update_layout(legend=dict(
    yanchor="top",
    xanchor="right",
))

fig.show()
colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(238,85,59,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='⏱️ Длина трека, мин.',
           text=round(df_words['time'] / 60, 1),
           textposition='auto',
           x=df_words.song,
           y=-df_words['time'] // 60,
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),
          ),
    go.Bar(name='🔄 Слов в минуту',
           text=df_words['words per minute'],
           textposition='auto',
           x=df_words.song,
           y=df_words['words per minute'],
           marker_color=colors_2,
           textfont_color='white',
           marker=dict(line=dict(width=0)),
          ),
])

fig.update_layout(barmode='overlay')

fig.update_layout(
    title = 
        {'text':'<b>Длина трека и число слов в минуту</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)'
)


fig.show()

Работа с Word2Vec моделью

При помощи модуля gensim загружаем модель, указывая на бинарный файл:

model = gensim.models.KeyedVectors.load_word2vec_format('model.bin', binary=True)

Для материала мы использовали готовую обученную на Национальном Корпусе Русского Языка модель от сообщества RusVectōrēs

Модель Word2Vec основана на нейронных сетях и позволяет представлять слова в виде векторов, учитывая семантическую составляющую. Это означает, что если мы возьмём два слова — например, «мама» и «папа», представим их в виде двух векторов и посчитаем косинус, значения будет близко к 1. Аналогично, у двух слов, не имеющих ничего общего по смыслу косинусная мера близка к 0.

Опишем функцию get_vector: она будет принимать список слов, распознавать для каждого часть речи, а затем получать и суммировать вектора — так мы сможем находить вектора не для одного слова, а для целых предложений и текстов.

def get_vector(word_list):
    vector = 0
    for word in word_list:
        pos = morph.parse(word)[0].tag.POS
        if pos == 'INFN':
            pos = 'VERB'
        if pos in ['ADJF', 'PRCL', 'ADVB', 'NPRO']:
            pos = 'NOUN'
        if word and pos:
            try:
                word_pos = word + '_' + pos
                this_vector = model.word_vec(word_pos)
                vector += this_vector
            except KeyError:
                continue
    return vector

Для каждой песни находим вектор и собираем соответствующий столбец в DataFrame:

vec_list = []
for word in df['text']:
    vec_list.append(get_vector(word.split()))
df['vector'] = vec_list

Теперь сравним вектора между собой, посчитав их косинусную близость. Те песни, у которых косинусная метрика выше 0,5 запомним отдельно — так мы получим самые близкие пары песен. Данные о сравнении векторов запишем в двумерный список result.

similar = dict()
result = []
for song_1, vector_1 in zip(df.name, df.vector):
    sub_list = []
    for song_2, vector_2 in zip(df.name.iloc[::-1], df.vector.iloc[::-1]):
        res = 1 - spatial.distance.cosine(vector_1, vector_2)
        if res > 0.5 and song_1 != song_2 and (song_1 + ' / ' + song_2 not in similar.keys() and song_2 + ' / ' + song_1 not in similar.keys()):
            similar[song_1 + ' / ' + song_2] = round(res, 2)
        sub_list.append(round(res, 2))
    result.append(sub_list)

Самые похожие треки соберём в отдельный DataFrame:

df_top_sim = pd.DataFrame()
df_top_sim['name'] = list(similar.keys())
df_top_sim['value'] = list(similar.values())
df_top_sim.sort_values(by='value', ascending=False)

И построим такой же bar chart:

colors = ['rgba(101,181,205,255)'] * 5

fig = go.Figure([go.Bar(x=df_top_sim['name'],
                        y=df_top_sim['value'],
                        marker_color=colors,
                        width=[0.4,0.4,0.4,0.4,0.4],
                        text=df_top_sim['value'],
                        textfont_color='white',
                        textposition='auto')])

fig.update_layout(
    title = 
        {'text':'<b>Топ-5 схожих песен</b><br><span style="color:#666666"></span>'},
    showlegend = False,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
    xaxis={'categoryorder':'total descending'}
)

fig.show()

Имея вектор каждой песни, давайте посчитаем вектор всего альбома — сложим вектора песен. Затем для такого вектора при помощи модели получим самые близкие по духу и смыслу слова.

def get_word_from_tlist(lst):
    for word in lst:
        word = word[0].split('_')[0]
        print(word, end=' ')

vec_sum = 0
for vec in df.vector:
    vec_sum += vec
sim_word = model.similar_by_vector(vec_sum)
get_word_from_tlist(sim_word)

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Наверное, это ключевой результат и описание альбома Земфиры всего лишь в 10 словах.

Наконец, построим общую тепловую карту, каждая ячейка которой — результат сравнения косинусной мерой текстов двух треков.

colorscale=[[0.0, "rgba(255,255,255,255)"],
            [0.1, "rgba(229,232,237,255)"],
            [0.2, "rgba(216,222,232,255)"],
            [0.3, "rgba(205,214,228,255)"],
            [0.4, "rgba(182,195,218,255)"],
            [0.5, "rgba(159,178,209,255)"],
            [0.6, "rgba(137,161,200,255)"],
            [0.7, "rgba(107,137,188,255)"],
            [0.8, "rgba(96,129,184,255)"],
            [1.0, "rgba(76,114,176,255)"]]

font_colors = ['black']
x = list(df.name.iloc[::-1])
y = list(df.name)
fig = ff.create_annotated_heatmap(result, x=x, y=y, colorscale=colorscale, font_colors=font_colors)
fig.show()

Результаты анализа и интерпретация данных

Давайте ещё раз посмотрим на всё, что у нас получилось — начнём с облака слов. Нетрудно заметить, что у слов «боль», «невозможно», «сорваться», «растерзаны», «сложно», «терпеть», «любить» размер весьма приличный — всё потому, что такие слова встречаются часто на протяжении всего текста песен:

Одной из самых «разнообразных» песен оказался сингл «крым» — в нём 74% уникальных слов. А в песне «снег идёт» слов совсем мало, поэтому большинство — 82% уникальны. Самой большой песней в альбоме получился трек «таблетки» — суммарно там около 150 слов.

Как было выяснено на прошлом графике, самый «динамичный» трек — «таблетки», целых 37 слов в минуту — практически по слову на каждые две секунды. А самый длинный трек — «абъюз», в нём же и согласно предыдущему графику практически самый низкий процент уникальных слов — 46%.

Топ-5 самых семантически похожих пар текстов:

Ещё мы получили вектор всего альбома и подобрали самые близкие слова. Только посмотрите на них — «тьма», «тоска», «плакать», «горе», «печаль», «сердце» — это же ведь и есть тот перечень слов, который характеризует лирику Земфиры!

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Финал — тепловая карта. По визуализации заметно, что практически все песни достаточно схожи между собой — косинусная мера у многих пар превышает значение в 0.4.

Выводы

В материале мы провели EDA всего текста нового альбома и при помощи предобученной модели Word2Vec доказали гипотезу — большинство песен «бордерлайна» пронизывают довольно мрачные и тексты. И это нормально, ведь Земфиру мы любим именно за искренность и прямолинейность.

Экспорт исторических данных Apple Health в Google Sheets

Время чтения текста – 9 минут

Для устройств на базе iOS и watchOS существует приложение Health, которое ежедневно записывает все данные о здоровье носителя и синхронизирует их со сторонними приложениями. Все эти данные в любой момент можно получить прямо из приложения в виде XML-документа. Сегодня мы выгрузим исторические данные о здоровье из приложения Apple Health, обработаем их и отправим в Google Sheets для анализа и визуализации в будущем.

Экспорт архива из приложения

Зайдите в приложение Health на iPhone. Нажмите на аватарку своего профиля в верхнем правом углу — откроется меню приложения.

Внизу нажмите на кнопку «Экспортировать медданные». Через некоторое время откроется меню экспорта — отправьте архив себе на компьютер любым способом, можно по AirDrop или даже по почте в письме самому себе. Из архива нужен только один файл — «экспорт.xml». Достаньте его и положите в папку с ноутбуком jupyter.

Парсер XML в DataFrame

При помощи библиотеки XML составляем дерево на основе документа из Health. Собирать в словарь будем следующие атрибуты: тип, единица измерения, дата создания, дата начала, дата конца, значение. Проходим по всему дереву и отправляем полученные значения атрибутов в records_dict.

from xml.etree import ElementTree
import pandas as pd
import datetime

tree = ElementTree.parse('экспорт.xml')
root = tree.getroot()
records = root.findall('Record')

records_dict = {
    'type':[],
    'unit':[],
    'creationDate':[],
    'startDate':[],
    'endDate':[],
    'value':[]
}

for record in records:
    for attribute in records_dict.keys():
        attribute_value = record.get(attribute)
        records_dict[attribute].append(attribute_value)

События записаны в нечитабельном виде — для перевода составим специальный словарь с нужными типами, где ключ — старое название, а значение — новое. Мы возьмём только 11 событий: минуты осознанности, дистанция на велосипеде, дистанция заплыва, дистанция ходьбы и бега, пройдено пролётов, пульс, пульс в покое, шаги, активная энергия, энергия покоя и средний пульс при ходьбе.

types_dict = {
    'HKCategoryTypeIdentifierMindfulSession': 'Mindful Session',
    'HKQuantityTypeIdentifierDistanceCycling': 'Cycling Distance',
    'HKQuantityTypeIdentifierDistanceSwimming': 'Swimming Distance',
    'HKQuantityTypeIdentifierDistanceWalkingRunning': 'Walking + Running Distance',
    'HKQuantityTypeIdentifierFlightsClimbed': 'Flights Climbed',
    'HKQuantityTypeIdentifierHeartRate': 'Heart Rate',
    'HKQuantityTypeIdentifierRestingHeartRate': 'Resting Heart Rate',
    'HKQuantityTypeIdentifierStepCount': 'Steps',
    'HKQuantityTypeIdentifierActiveEnergyBurned': 'Active Calories',
    'HKQuantityTypeIdentifierBasalEnergyBurned': 'Resting Calories',
    'HKQuantityTypeIdentifierWalkingHeartRateAverage': 'Walking Heart Rate Average'
}

Для минут осознанности в поле значения записей нет — мы сами посчитаем позже это поле как разницу даты окончания и начала события. Разница будет представлена как timedelta, поэтому напишем функцию перевода timedelta в минуты:

def td_to_m(td):
    seconds = td.seconds + td.days * 24 * 60 * 60
    return seconds // 60

Из словаря создаём DataFrame и задаём названия колонок. Оставляем только те 11 событий, которые есть в словаре types_dict и приводим все колонки к нужным типам данных:

df = pd.DataFrame(records_dict)
df.columns = ['type', 'unit', 'date', 'start', 'end', 'value']
df = df[df['type'].isin(types_dict.keys())]
df['value'] = df['value'].astype(float)
df['date'] = df['date'].astype('datetime64')
df['date'] = df['date'].dt.date
df['start'] = df['start'].astype('datetime64')
df['end'] = df['end'].astype('datetime64')
df['unit'] = df['unit'].astype(str)

Данные Health при экспорте никак не группируются — мы сделаем это самостоятельно. DataFrame можно поделить на три: в первом будут события, у которых единица измерения «количество в минуту» — для таких событий нужно искать среднее значение. В другой группе будут минуты осознанности — считаем число минут в каждой записи и суммируем. В последней группе находятся все остальные записи, связанные с количественными событиями — шаги, дистанция ходьбы и бега и так далее. Их тоже суммируем.

df_1 = df[df['unit'] == 'count/min']
df_1 = df_1.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'mean'})

df_2 = df[df['type'] == 'HKCategoryTypeIdentifierMindfulSession']
df_2['value'] = df_2['end'] - df_2['start']
df_2['value'] = df_2['value'].map(td_to_m)
df_2 = df_2.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                     'end':'max',
                                                                     'value':'sum'})
df_3 = df[(df['unit'] != 'count/min') & (df['type'] != 'HKCategoryTypeIdentifierMindfulSession')]
df_3 = df_3.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'sum'})
df = pd.concat([df_1, df_2, df_3])

Дату создания записи переводим в строковый тип. Все наименования типов событий заменяем согласно словарю types_dict. В переменную dates записываем все уникальные даты.

df['date'] = df['date'].astype(str)
df['type'] = df['type'].apply(lambda x: types_dict[x])
dates = df['date'].unique()

В результате нужен словарь с колонкой даты и отдельной колонкой под каждое из 11 событий:

result = {
    'date': [],
    'Steps': [],
    'Walking + Running Distance': [],
    'Swimming Distance': [],
    'Cycling Distance': [],
    'Resting Calories': [],
    'Active Calories': [],
    'Flights Climbed': [],
    'Heart Rate': [],
    'Resting Heart Rate': [],
    'Walking Heart Rate Average': [],
    'Mindful Session': []
}

Проходим по каждой дате и получаем кусок DataFrame за эту дату. Добавляем её в словарь и проходим по каждому ключу, пробуя добавить значение:

for date in dates:
    part = df[df['date'] == date]
    result['date'].append(date)
    for key in result.keys():
        if key == 'date':
            continue
        else:
            field = 'value'
        try:
            result[key].append(part[part['type'] == key][field].values[0])
        except IndexError:
            result[key].append(None)

Из полученного словаря создаём DataFrame, округляем всё до двух знаков после запятой и сортируем по дате:

result_df = pd.DataFrame(result)
result_df = result_df.round(2)
result_df = result_df.sort_values(by='date')

В результате получается такая таблица с историческими данными по 11 событиям:

Экспорт DataFrame в Google Sheets

Для экспорта в Google Docs необходим сервисный аккаунт и json-файл с ключом. О том, как его получить, мы писали в материале «Собираем данные по рекламным кампаниям ВКонтакте»

Создайте новый документ в Google Sheets. Весь DataFrame можно вставить одним действием при помощи методов библиотеки gspread. Импортируйте её, а также укажите идентификатор документа и json-файл с ключом. В методе get_worksheet указывается порядковый номер листа в файле начиная с нуля.

import pandas as pd
import gspread
from gspread_dataframe import set_with_dataframe
gc = gspread.service_account(filename='serviceAccount.json')
sh = gc.open_by_key('1osKA63LQkUC0FC0eIZ63jEJwn1TeIkUvqCV6ur')
worksheet = sh.get_worksheet(0)

В итоге в Google Spreadsheets появится такая таблица:

А в следующем материале посмотрим, как наладить ежедневный экспорт данных Здоровья в эту таблицу при помощи шорткатов и Google AppScript!

Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

 Нет комментариев    130   9 мес   Analytics Engineering   python   sql   sqlalchemy

Сбор информации о подписчиках Telegram-канала

Время чтения текста – 6 минут

На 2021 год боты в Telegram так и не имеют метода, позволяющего получать информацию о подписчиках канала. Тем не менее, существует достаточно сложное в освоении Telegram API и построенная на нём библиотека Telethon. Сегодня мы посмотрим, как при помощи библиотеки выгрузить информацию о подписчиках своего канала.

Создание приложения

Для начала необходимо создать приложение, через которое будут отправляться запросы к API. Перейдите на https://my.telegram.org и авторизуйтесь в Telegram-аккаунте:

После успешной авторизации перейдите на страницу API development tools:

Заполните все поля и жмите на создание приложения:

Из полученной конфигурации нам необходим app api_id и app api_hash:

Запрос к API

Импортируем telethon — он поможет сформировать запрос, и pandas — полученный ответ мы запишем в DataFrame.

from telethon import TelegramClient
import pandas as pd

Вводим api_id, api_hash, наш номер телефона и ссылку на канал, информацию о подписчиках которого хотим получить. Доступ к информации о подписчиках есть только у администраторов канала.

api_id = 1234567
api_hash = '1b42hj25kd8jw42b234kwj242c'
phone = '+71234567890'
channel_href = 'https://t.me/leftjoin'

Создаём новую сессию — вместо session_name можно подставить любое другое название. Методы в библиотеке работают асинхронно, поэтому ответа от них требуется ожидать:

client = TelegramClient('session_name', api_id, api_hash)
client = await client.start()
dialogs = await client.get_dialogs()

Собираем все каналы текущего пользователя. Из ссылки забираем часть с именем канала и вытаскиваем из словаря нужный:

channels = {d.entity.username: d.entity
            for d in dialogs
            if d.is_channel}
my_channel = channel_href.split('/')[-1]
channel = channels[my_channel]

Подписчиков, доступ к которым не ограничен приватностью, можно получить методом get_participants. С 20 июля 2018 года Telegram установил ограничение в 200 подписчиков для вызова метода, и установка параметра aggressive на True поможет получить всех подписчиков за раз.

members_telethon_list = await client.get_participants(channel, aggressive=True)

Из полученных библиотечных структур извлекаем информацию о пользователях — их имена и телефоны:

username_list = [member.username for member in members_telethon_list]
first_name_list = [member.first_name for member in members_telethon_list]
last_name_list = [member.last_name for member in members_telethon_list]
phone_list = [member.phone for member in members_telethon_list]

Из четырёх списков собираем DataFrame и пишем его в csv-таблицу:

df = pd.DataFrame()
df['username'] = username_list
df['first_name'] = first_name_list
df['last_name'] = last_name_list
df['phone'] = phone_list
df.to_csv('subscribers.csv', index=False)

Результат работы — такая таблица:

Для запуска в Jupyter Notebook описанный ниже код можно просто вставить в ячейку, но при запуске из Python-файла будет такая ошибка:

SyntaxError: 'await' outside function

Устранить проблему можно, записав весь код в асинхронную функцию. Целиком выглядеть код будет так:

from telethon import TelegramClient
import pandas as pd
import asyncio

async def main():
        api_id = 1234567
        api_hash = '1b42hj25kd8jw42b234kwj242c'
        phone = '+71234567890'
        channel_href = 'https://t.me/leftjoin'

	client = TelegramClient('session_name', api_id, api_hash)
	client = await client.start()
	dialogs = await client.get_dialogs()

	channels = {d.entity.username: d.entity
				for d in dialogs
				if d.is_channel}
	my_channel = channel_href.split('/')[-1]
	channel = channels[my_channel]

	members_telethon_list = await client.get_participants(channel, aggressive=True)

	username_list = [member.username for member in members_telethon_list]
	first_name_list = [member.first_name for member in members_telethon_list]
	last_name_list = [member.last_name for member in members_telethon_list]
	phone_list = [member.phone for member in members_telethon_list]

	df = pd.DataFrame()
	df['username'] = username_list
	df['first_name'] = first_name_list
	df['last_name'] = last_name_list
	df['phone'] = phone_list
	df.to_csv('subscribers.csv', index=False)

if __name__ == '__main__':
	loop = asyncio.get_event_loop()
	loop.run_until_complete(main())
 9 комментариев    630   9 мес   Analytics Engineering   python   telegram   telethon
Ранее Ctrl + ↓