Эффективное логирование в Python

Время чтения текста – 5 минут

В Python существует встроенный модуль logging, который позволяет журналировать этапы выполнения программы. Логирование полезно когда, например, нужно оставить большой скрипт сбора / обработки данных на длительное время, а в случае возникновения непредвиденных ошибок выяснить, с чем они могут быть связаны. Анализ логов позволяет быстро и эффективно выявлять проблемные места в коде, но для удобного использования модуля следует написать несколько функций по взаимодействию с ним и вынести их в отдельный файл — сегодня мы этим и займёмся.

Пишем логгер

Создадим файл loggers.py. Для начала импортируем модули и задаём пару значений по умолчанию — директорию для файла с логом и наименование конфигурационного файла, содержащего шаблоны логирования. Его мы опишем следом.

import os
import json
import logging
import logging.config

FOLDER_LOG = "log"
LOGGING_CONFIG_FILE = 'loggers.json'

Опишем функцию для создания папки с логом: она принимает наименование для папки, но по умолчанию будет называть её «log». Директорию создаём при помощи модуля os и только в том случае, если такой директории ещё не существует.

def create_log_folder(folder=FOLDER_LOG):
    if not os.path.exists(folder):
        os.mkdir(folder)

Теперь опишем функцию создания нового логгера по заданному шаблону. Функция должна создать директорию для логирования, открыть конфигурационный файл и достать нужный шаблон. Затем по шаблону при помощи модуля logging создаём новый логгер:

def get_logger(name, template='default'):
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        dict_config = json.load(f)
        dict_config["loggers"][name] = dict_config["loggers"][template]
    logging.config.dictConfig(dict_config)
    return logging.getLogger(name)

Для удобства опишем ещё одну функцию — получение стандартного лога. Она ничего не принимает и нужна только для инициализации лога с шаблоном default:

def get_default_logger():
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        logging.config.dictConfig(json.load(f))

    return logging.getLogger("default")

Описываем конфигурационный файл

Создадим по соседству файл loggers.json — он будет содержать настройки логгера. Внутри указываем такие настройки, как версию логгера, форматы логирования для разных уровней, наименование выходного файла и его максимальный размер:

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(processName)-10s - %(name)-10s - %(levelname)-8s - %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "default"
        },
        "rotating_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "level": "DEBUG",
            "formatter": "default",
            "filename": "log/main.log",
            "maxBytes": 10485760,
            "backupCount": 20
        }
    },
    "loggers": {
        "default": {
            "handlers": ["console", "rotating_file"],
            "level": "DEBUG"
        }
    }
}

Использование логгера

Теперь давайте представим, что вы выгружаете данные по API и складываете их в базу данных на примере нашего материала про транзакции в SQLAlchemy. Рассмотрим заключительную часть кода: добавим строку с инициализацией стандартного логгера и изменим код так, чтобы сначала в лог выводился offset, затем в случае успеха предложение «Successfully inserted data», а в случае ошибки выводилась сама ошибка и предложение: «Error: tried to insert data but got an error».

logger = get_logger('main')

offset = 0
subs_count = get_subs_count(group_id)

while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            logger.info(f"{offset} / {subs_count}")
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError("This is a test errror"))
            transaction.commit()
            logger.info(f"Successfully inserted data")
        except Exception as E:
            transaction.rollback()
            logger.error(f"Error: tried to insert {df} but got an error: {E}")
    time.sleep(1)
    offset += 10

Теперь во время работы программы будет отображаться такой вывод, который также будет записан в файл main.log папки log в директории проекта. После завершения работы программы можно исследовать логи, посмотреть, на каких offset возникли проблемы, какие данные не удалось вставить и прочитать текст ошибки:

 Нет комментариев    717   2021   Analytics Engineering   python   логирование

Бот для преобразования данных из Coinkeeper

Время чтения текста – 6 минут

Coinkeeper — кроссплатформенное приложение для учёта финансов. Внутри можно выпустить виртуальную банковскую карту Visa с бесплатным годовым обслуживанием, которая будет присылать уведомления, если вы тратите больше, чем запланировали. Помимо уведомлений, приложение ведёт историю трат и позволяет выгрузить сводный отчёт в формате csv. Данные, которое выгружает приложение ещё не готовы к анализу и выглядят так:

Азат Шарипов сделал скрипт обработки данных в пригодный для Tableau вид и подготовил Tableau Public книгу, а Рома Бунин в рамках своего проекта «Переверстка» переработал дашборд.

Мы решили тоже поучаствовать, и с нашей стороны Елизавета Мазурова сделала чат-бота.

Чат-бот крутой! Помимо того, что он может как и прежде отдавать обратно .csv-файл, он позволяет автоматизировать рутину по обновлению отчета через Google-таблицы. Как, наверное, многие помнят, Tableau Public может работать на гугл-таблицах или csv файлах, но не разрешает подключение к данным. Бот умный: он создаст за вас гугл-таблицу и когда вы повторно отправите ему новый файл обновит ее.

Использование бота

Перейдите в диалог с ботом и введите команду /start — в ответе бот расскажет немного о себе. Для продолжения работы нажмите на кнопку «Начать».

Сразу после можно отправить csv-файл, выгруженный из Coinkeeper:

Выберите тип файла — csv или таблицу в Google Spreadsheets.

В случае выбора csv-файла бот пришлёт его:

А в случае ссылки в первый раз нужно будет пройти небольшую регистрацию — указать почту и наименование для файла.

Затем бот пришлёт ссылку на файл:

Скрипт преобразовал данные, и таблицу можно указать в качестве источника данных в Tableau. А благодаря тому, что в случае загрузки нового файла создаётся не новая таблица, а обновляется старая, отчёт в Tableau тоже обновится. В результате открывается возможность еженедельно присылать боту новую таблицу и сразу переходить в обновлённый отчёт.

 Нет комментариев    119   2021   Analytics Engineering   coinkeeper   google analytics   tableau   telegram

Python и тексты нового альбома Земфиры: анализируем суть песен

Время чтения текста – 18 минут

Неделю назад вышёл первый за 8 лет студийный альбом Земфиры «Бордерлайн». К работе помимо рок-певицы приложили руку разные люди, в том числе и её родственники — рифф для песни «таблетки» написал её племянник из Лондона. Альбом получился разнообразным: например, песня «остин» посвящена главному персонажу игры Homescapes российской студии Playrix (кстати, посмотрите свежие Бизнес-секреты с братьями Бухманами, там они тоже про это рассказывают) — Земфире нравится игра, и для трека она связалась со студией. А сингл «крым» был написан в качестве саундтрека к новой картине соратницы Земфиры — Ренаты Литвиновой.

Послушать альбом в Apple Music / Яндекс.Музыке / Spotify

Тем не менее, дух всего альбома довольно мрачен — в песнях часто повторяются слова «боль», «ад», «бесишь» и прочие по смыслу. Мы решили провести разведочный анализ нового альбома, а затем при помощи модели Word2Vec и косинусной меры посмотреть на семантическую близость песен между собой и вычислить общее настроение альбома.

Для тех, кому скучно читать про подготовку данных и шаги анализа можно перейти сразу к результатам.

Подготовка данных

Для начала работы напишем скрипт обработки данных. Цель скрипта — из множества текстовых файлов, в каждом из которых лежит по песне, собрать единую csv-таблицу. При этом текст треков очищаем от знаков пунктуации и ненужных слов.

import pandas as pd
import re
import string
import pymorphy2
from nltk.corpus import stopwords

Создаём морфологический анализатор и расширяем список всего, что нужно отбросить:

morph = pymorphy2.MorphAnalyzer()
stopwords_list = stopwords.words('russian')
stopwords_list.extend(['куплет', 'это', 'я', 'мы', 'ты', 'припев', 'аутро', 'предприпев', 'lyrics', '1', '2', '3', 'то'])
string.punctuation += '—'

Названия песен приведены на английском — создадим словарь для перевода на русский и словарь, из которого позднее сделаем таблицу:

result_dict = dict()

songs_dict = {
    'snow':'снег идёт',
    'crimea':'крым',
    'mother':'мама',
    'ostin':'остин',
    'abuse':'абьюз',
    'wait_for_me':'жди меня',
    'tom':'том',
    'come_on':'камон',
    'coat':'пальто',
    'this_summer':'этим летом',
    'ok':'ок',
    'pills':'таблетки'
}

Опишем несколько функций. Первая читает целиком песню из файла и удаляет переносы строки, вторая очищает текст от ненужных символов и слов, а третья при помощи морфологического анализатора pymorphy2 приводит слова к нормальной форме. Модуль pymorphy2 не всегда хорошо справляется с неоднозначностью — для слов «ад» и «рай» потребуется дополнительная обработка.

def read_song(filename):
    f = open(f'{filename}.txt', 'r').read()
    f = f.replace('\n', ' ')
    return f

def clean_string(text):
    text = re.split(' |:|\.|\(|\)|,|"|;|/|\n|\t|-|\?|\[|\]|!', text)
    text = ' '.join([word for word in text if word not in string.punctuation])
    text = text.lower()
    text = ' '.join([word for word in text.split() if word not in stopwords_list])
    return text

def string_to_normal_form(string):
    string_lst = string.split()
    for i in range(len(string_lst)):
        string_lst[i] = morph.parse(string_lst[i])[0].normal_form
        if (string_lst[i] == 'аду'):
            string_lst[i] = 'ад'
        if (string_lst[i] == 'рая'):
            string_lst[i] = 'рай'
    string = ' '.join(string_lst)
    return string

Проходим по каждой песне и читаем файл с соответствующим названием:

name_list = []
text_list = []
for song, name in songs_dict.items():
    text = string_to_normal_form(clean_string(read_song(song)))
    name_list.append(name)
    text_list.append(text)

Затем объединяем всё в DataFrame и сохраняем в виде csv-файла.

df = pd.DataFrame()
df['name'] = name_list
df['text'] = text_list
df['time'] = [290, 220, 187, 270, 330, 196, 207, 188, 269, 189, 245, 244]
df.to_csv('borderline.csv', index=False)

Результат:

Облако слов по всему альбому

Начнём анализ с построения облака слов — оно отобразит, какие слова чаще всего встречаются в песнях. Импортируем нужные библиотеки, читаем csv-файл и устанавливаем конфигурации:

import nltk
from wordcloud import WordCloud
import pandas as pd
import matplotlib.pyplot as plt
from nltk import word_tokenize, ngrams

%matplotlib inline
nltk.download('punkt')
df = pd.read_csv('borderline.csv')

Теперь создаём новую фигуру, устанавливаем параметры оформления и при помощи библиотеки wordcloud отображаем слова с размером прямо пропорциональным частоте упоминания слова. Над каждым графиком дополнительно указываем название песни.

fig = plt.figure()
fig.patch.set_facecolor('white')
plt.subplots_adjust(wspace=0.3, hspace=0.2)
i = 1
for name, text in zip(df.name, df.text):
    tokens = word_tokenize(text)
    text_raw = " ".join(tokens)
    wordcloud = WordCloud(colormap='PuBu', background_color='white', contour_width=10).generate(text_raw)
    plt.subplot(4, 3, i, label=name,frame_on=True)
    plt.tick_params(labelsize=10)
    plt.imshow(wordcloud)
    plt.axis("off")
    plt.title(name,fontdict={'fontsize':7,'color':'grey'},y=0.93)
    plt.tick_params(labelsize=10)
    i += 1

EDA текстов альбома

Теперь проанализируем тексты песен — импортируем библиотеки для работы с данными и визуализации:

import plotly.graph_objects as go
import plotly.figure_factory as ff
from scipy import spatial
import collections
import pymorphy2
import gensim

morph = pymorphy2.MorphAnalyzer()

Сначала посчитаем число слов в каждой песне, число уникальных слов и процентное соотношение:

songs = []
total = []
uniq = []
percent = []

for song, text in zip(df.name, df.text):
    songs.append(song)
    total.append(len(text.split()))
    uniq.append(len(set(text.split())))
    percent.append(round(len(set(text.split())) / len(text.split()), 2) * 100)

А теперь составим из этого DataFrame и дополнительно посчитаем число слов в минуту для каждой песни:

df_words = pd.DataFrame()
df_words['song'] = songs
df_words['total words'] = total
df_words['uniq words'] = uniq
df_words['percent'] = percent
df_words['time'] = df['time']
df_words['words per minute'] = round(total / (df['time'] // 60))
df_words = df_words[::-1]

Данные хорошо бы визуализировать — построим две столбиковые диаграммы: одну для числа слов в песне, а другую для числа слов в минуту.

colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(62,142,231,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='📝 Всего слов',
           text=df_words['total words'],
           textposition='auto',
           x=df_words.song,
           y=df_words['total words'],
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),),
    go.Bar(name='🌀 Уникальных слов',
           text=df_words['uniq words'].astype(str) + '<br>'+ df_words.percent.astype(int).astype(str) + '%' ,
           textposition='inside',
           x=df_words.song,
           y=df_words['uniq words'],
           textfont_color='white',
           marker_color=colors_2,
           marker=dict(line=dict(width=0)),),
])

fig.update_layout(barmode='group')

fig.update_layout(
    title = 
        {'text':'<b>Соотношение числа уникальных слов к общему количеству</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
)
fig.update_layout(legend=dict(
    yanchor="top",
    xanchor="right",
))

fig.show()
colors_1 = ['rgba(101,181,205,255)'] * 12
colors_2 = ['rgba(238,85,59,255)'] * 12

fig = go.Figure(data=[
    go.Bar(name='⏱️ Длина трека, мин.',
           text=round(df_words['time'] / 60, 1),
           textposition='auto',
           x=df_words.song,
           y=-df_words['time'] // 60,
           marker_color=colors_1,
           marker=dict(line=dict(width=0)),
          ),
    go.Bar(name='🔄 Слов в минуту',
           text=df_words['words per minute'],
           textposition='auto',
           x=df_words.song,
           y=df_words['words per minute'],
           marker_color=colors_2,
           textfont_color='white',
           marker=dict(line=dict(width=0)),
          ),
])

fig.update_layout(barmode='overlay')

fig.update_layout(
    title = 
        {'text':'<b>Длина трека и число слов в минуту</b><br><span style="color:#666666"></span>'},
    showlegend = True,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)'
)


fig.show()

Работа с Word2Vec моделью

При помощи модуля gensim загружаем модель, указывая на бинарный файл:

model = gensim.models.KeyedVectors.load_word2vec_format('model.bin', binary=True)

Для материала мы использовали готовую обученную на Национальном Корпусе Русского Языка модель от сообщества RusVectōrēs

Модель Word2Vec основана на нейронных сетях и позволяет представлять слова в виде векторов, учитывая семантическую составляющую. Это означает, что если мы возьмём два слова — например, «мама» и «папа», представим их в виде двух векторов и посчитаем косинус, значения будет близко к 1. Аналогично, у двух слов, не имеющих ничего общего по смыслу косинусная мера близка к 0.

Опишем функцию get_vector: она будет принимать список слов, распознавать для каждого часть речи, а затем получать и суммировать вектора — так мы сможем находить вектора не для одного слова, а для целых предложений и текстов.

def get_vector(word_list):
    vector = 0
    for word in word_list:
        pos = morph.parse(word)[0].tag.POS
        if pos == 'INFN':
            pos = 'VERB'
        if pos in ['ADJF', 'PRCL', 'ADVB', 'NPRO']:
            pos = 'NOUN'
        if word and pos:
            try:
                word_pos = word + '_' + pos
                this_vector = model.word_vec(word_pos)
                vector += this_vector
            except KeyError:
                continue
    return vector

Для каждой песни находим вектор и собираем соответствующий столбец в DataFrame:

vec_list = []
for word in df['text']:
    vec_list.append(get_vector(word.split()))
df['vector'] = vec_list

Теперь сравним вектора между собой, посчитав их косинусную близость. Те песни, у которых косинусная метрика выше 0,5 запомним отдельно — так мы получим самые близкие пары песен. Данные о сравнении векторов запишем в двумерный список result.

similar = dict()
result = []
for song_1, vector_1 in zip(df.name, df.vector):
    sub_list = []
    for song_2, vector_2 in zip(df.name.iloc[::-1], df.vector.iloc[::-1]):
        res = 1 - spatial.distance.cosine(vector_1, vector_2)
        if res > 0.5 and song_1 != song_2 and (song_1 + ' / ' + song_2 not in similar.keys() and song_2 + ' / ' + song_1 not in similar.keys()):
            similar[song_1 + ' / ' + song_2] = round(res, 2)
        sub_list.append(round(res, 2))
    result.append(sub_list)

Самые похожие треки соберём в отдельный DataFrame:

df_top_sim = pd.DataFrame()
df_top_sim['name'] = list(similar.keys())
df_top_sim['value'] = list(similar.values())
df_top_sim.sort_values(by='value', ascending=False)

И построим такой же bar chart:

colors = ['rgba(101,181,205,255)'] * 5

fig = go.Figure([go.Bar(x=df_top_sim['name'],
                        y=df_top_sim['value'],
                        marker_color=colors,
                        width=[0.4,0.4,0.4,0.4,0.4],
                        text=df_top_sim['value'],
                        textfont_color='white',
                        textposition='auto')])

fig.update_layout(
    title = 
        {'text':'<b>Топ-5 схожих песен</b><br><span style="color:#666666"></span>'},
    showlegend = False,
    height=650,
    font={
        'family':'Open Sans, light',
        'color':'black',
        'size':14
    },
    plot_bgcolor='rgba(0,0,0,0)',
    xaxis={'categoryorder':'total descending'}
)

fig.show()

Имея вектор каждой песни, давайте посчитаем вектор всего альбома — сложим вектора песен. Затем для такого вектора при помощи модели получим самые близкие по духу и смыслу слова.

def get_word_from_tlist(lst):
    for word in lst:
        word = word[0].split('_')[0]
        print(word, end=' ')

vec_sum = 0
for vec in df.vector:
    vec_sum += vec
sim_word = model.similar_by_vector(vec_sum)
get_word_from_tlist(sim_word)

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Наверное, это ключевой результат и описание альбома Земфиры всего лишь в 10 словах.

Наконец, построим общую тепловую карту, каждая ячейка которой — результат сравнения косинусной мерой текстов двух треков.

colorscale=[[0.0, "rgba(255,255,255,255)"],
            [0.1, "rgba(229,232,237,255)"],
            [0.2, "rgba(216,222,232,255)"],
            [0.3, "rgba(205,214,228,255)"],
            [0.4, "rgba(182,195,218,255)"],
            [0.5, "rgba(159,178,209,255)"],
            [0.6, "rgba(137,161,200,255)"],
            [0.7, "rgba(107,137,188,255)"],
            [0.8, "rgba(96,129,184,255)"],
            [1.0, "rgba(76,114,176,255)"]]

font_colors = ['black']
x = list(df.name.iloc[::-1])
y = list(df.name)
fig = ff.create_annotated_heatmap(result, x=x, y=y, colorscale=colorscale, font_colors=font_colors)
fig.show()

Результаты анализа и интерпретация данных

Давайте ещё раз посмотрим на всё, что у нас получилось — начнём с облака слов. Нетрудно заметить, что у слов «боль», «невозможно», «сорваться», «растерзаны», «сложно», «терпеть», «любить» размер весьма приличный — всё потому, что такие слова встречаются часто на протяжении всего текста песен:

Одной из самых «разнообразных» песен оказался сингл «крым» — в нём 74% уникальных слов. А в песне «снег идёт» слов совсем мало, поэтому большинство — 82% уникальны. Самой большой песней в альбоме получился трек «таблетки» — суммарно там около 150 слов.

Как было выяснено на прошлом графике, самый «динамичный» трек — «таблетки», целых 37 слов в минуту — практически по слову на каждые две секунды. А самый длинный трек — «абъюз», в нём же и согласно предыдущему графику практически самый низкий процент уникальных слов — 46%.

Топ-5 самых семантически похожих пар текстов:

Ещё мы получили вектор всего альбома и подобрали самые близкие слова. Только посмотрите на них — «тьма», «тоска», «плакать», «горе», «печаль», «сердце» — это же ведь и есть тот перечень слов, который характеризует лирику Земфиры!

небо тоска тьма пламень плакать горе печаль сердце солнце мрак

Финал — тепловая карта. По визуализации заметно, что практически все песни достаточно схожи между собой — косинусная мера у многих пар превышает значение в 0.4.

Выводы

В материале мы провели EDA всего текста нового альбома и при помощи предобученной модели Word2Vec доказали гипотезу — большинство песен «бордерлайна» пронизывают довольно мрачные и тексты. И это нормально, ведь Земфиру мы любим именно за искренность и прямолинейность.

 1 комментарий    1251   2021   analysis   Analytics Engineering   Data Analytics   plotly   python   бордерлайн   земфира

Экспорт исторических данных Apple Health в Google Sheets

Время чтения текста – 9 минут

Для устройств на базе iOS и watchOS существует приложение Health, которое ежедневно записывает все данные о здоровье носителя и синхронизирует их со сторонними приложениями. Все эти данные в любой момент можно получить прямо из приложения в виде XML-документа. Сегодня мы выгрузим исторические данные о здоровье из приложения Apple Health, обработаем их и отправим в Google Sheets для анализа и визуализации в будущем.

Экспорт архива из приложения

Зайдите в приложение Health на iPhone. Нажмите на аватарку своего профиля в верхнем правом углу — откроется меню приложения.

Внизу нажмите на кнопку «Экспортировать медданные». Через некоторое время откроется меню экспорта — отправьте архив себе на компьютер любым способом, можно по AirDrop или даже по почте в письме самому себе. Из архива нужен только один файл — «экспорт.xml». Достаньте его и положите в папку с ноутбуком jupyter.

Парсер XML в DataFrame

При помощи библиотеки XML составляем дерево на основе документа из Health. Собирать в словарь будем следующие атрибуты: тип, единица измерения, дата создания, дата начала, дата конца, значение. Проходим по всему дереву и отправляем полученные значения атрибутов в records_dict.

from xml.etree import ElementTree
import pandas as pd
import datetime

tree = ElementTree.parse('экспорт.xml')
root = tree.getroot()
records = root.findall('Record')

records_dict = {
    'type':[],
    'unit':[],
    'creationDate':[],
    'startDate':[],
    'endDate':[],
    'value':[]
}

for record in records:
    for attribute in records_dict.keys():
        attribute_value = record.get(attribute)
        records_dict[attribute].append(attribute_value)

События записаны в нечитабельном виде — для перевода составим специальный словарь с нужными типами, где ключ — старое название, а значение — новое. Мы возьмём только 11 событий: минуты осознанности, дистанция на велосипеде, дистанция заплыва, дистанция ходьбы и бега, пройдено пролётов, пульс, пульс в покое, шаги, активная энергия, энергия покоя и средний пульс при ходьбе.

types_dict = {
    'HKCategoryTypeIdentifierMindfulSession': 'Mindful Session',
    'HKQuantityTypeIdentifierDistanceCycling': 'Cycling Distance',
    'HKQuantityTypeIdentifierDistanceSwimming': 'Swimming Distance',
    'HKQuantityTypeIdentifierDistanceWalkingRunning': 'Walking + Running Distance',
    'HKQuantityTypeIdentifierFlightsClimbed': 'Flights Climbed',
    'HKQuantityTypeIdentifierHeartRate': 'Heart Rate',
    'HKQuantityTypeIdentifierRestingHeartRate': 'Resting Heart Rate',
    'HKQuantityTypeIdentifierStepCount': 'Steps',
    'HKQuantityTypeIdentifierActiveEnergyBurned': 'Active Calories',
    'HKQuantityTypeIdentifierBasalEnergyBurned': 'Resting Calories',
    'HKQuantityTypeIdentifierWalkingHeartRateAverage': 'Walking Heart Rate Average'
}

Для минут осознанности в поле значения записей нет — мы сами посчитаем позже это поле как разницу даты окончания и начала события. Разница будет представлена как timedelta, поэтому напишем функцию перевода timedelta в минуты:

def td_to_m(td):
    seconds = td.seconds + td.days * 24 * 60 * 60
    return seconds // 60

Из словаря создаём DataFrame и задаём названия колонок. Оставляем только те 11 событий, которые есть в словаре types_dict и приводим все колонки к нужным типам данных:

df = pd.DataFrame(records_dict)
df.columns = ['type', 'unit', 'date', 'start', 'end', 'value']
df = df[df['type'].isin(types_dict.keys())]
df['value'] = df['value'].astype(float)
df['date'] = df['date'].astype('datetime64')
df['date'] = df['date'].dt.date
df['start'] = df['start'].astype('datetime64')
df['end'] = df['end'].astype('datetime64')
df['unit'] = df['unit'].astype(str)

Данные Health при экспорте никак не группируются — мы сделаем это самостоятельно. DataFrame можно поделить на три: в первом будут события, у которых единица измерения «количество в минуту» — для таких событий нужно искать среднее значение. В другой группе будут минуты осознанности — считаем число минут в каждой записи и суммируем. В последней группе находятся все остальные записи, связанные с количественными событиями — шаги, дистанция ходьбы и бега и так далее. Их тоже суммируем.

df_1 = df[df['unit'] == 'count/min']
df_1 = df_1.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'mean'})

df_2 = df[df['type'] == 'HKCategoryTypeIdentifierMindfulSession']
df_2['value'] = df_2['end'] - df_2['start']
df_2['value'] = df_2['value'].map(td_to_m)
df_2 = df_2.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                     'end':'max',
                                                                     'value':'sum'})
df_3 = df[(df['unit'] != 'count/min') & (df['type'] != 'HKCategoryTypeIdentifierMindfulSession')]
df_3 = df_3.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'sum'})
df = pd.concat([df_1, df_2, df_3])

Дату создания записи переводим в строковый тип. Все наименования типов событий заменяем согласно словарю types_dict. В переменную dates записываем все уникальные даты.

df['date'] = df['date'].astype(str)
df['type'] = df['type'].apply(lambda x: types_dict[x])
dates = df['date'].unique()

В результате нужен словарь с колонкой даты и отдельной колонкой под каждое из 11 событий:

result = {
    'date': [],
    'Steps': [],
    'Walking + Running Distance': [],
    'Swimming Distance': [],
    'Cycling Distance': [],
    'Resting Calories': [],
    'Active Calories': [],
    'Flights Climbed': [],
    'Heart Rate': [],
    'Resting Heart Rate': [],
    'Walking Heart Rate Average': [],
    'Mindful Session': []
}

Проходим по каждой дате и получаем кусок DataFrame за эту дату. Добавляем её в словарь и проходим по каждому ключу, пробуя добавить значение:

for date in dates:
    part = df[df['date'] == date]
    result['date'].append(date)
    for key in result.keys():
        if key == 'date':
            continue
        else:
            field = 'value'
        try:
            result[key].append(part[part['type'] == key][field].values[0])
        except IndexError:
            result[key].append(None)

Из полученного словаря создаём DataFrame, округляем всё до двух знаков после запятой и сортируем по дате:

result_df = pd.DataFrame(result)
result_df = result_df.round(2)
result_df = result_df.sort_values(by='date')

В результате получается такая таблица с историческими данными по 11 событиям:

Экспорт DataFrame в Google Sheets

Для экспорта в Google Docs необходим сервисный аккаунт и json-файл с ключом. О том, как его получить, мы писали в материале «Собираем данные по рекламным кампаниям ВКонтакте»

Создайте новый документ в Google Sheets. Весь DataFrame можно вставить одним действием при помощи методов библиотеки gspread. Импортируйте её, а также укажите идентификатор документа и json-файл с ключом. В методе get_worksheet указывается порядковый номер листа в файле начиная с нуля.

import pandas as pd
import gspread
from gspread_dataframe import set_with_dataframe
gc = gspread.service_account(filename='serviceAccount.json')
sh = gc.open_by_key('1osKA63LQkUC0FC0eIZ63jEJwn1TeIkUvqCV6ur')
worksheet = sh.get_worksheet(0)

В итоге в Google Spreadsheets появится такая таблица:

А в следующем материале посмотрим, как наладить ежедневный экспорт данных Здоровья в эту таблицу при помощи шорткатов и Google AppScript!

 Нет комментариев    252   2021   Analytics Engineering   apple health   Data Analytics   pandas   python

Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

 Нет комментариев    329   2021   Analytics Engineering   python   sql   sqlalchemy
Ранее Ctrl + ↓