Экспорт исторических данных Apple Health в Google Sheets

Время чтения текста – 9 минут

Для устройств на базе iOS и watchOS существует приложение Health, которое ежедневно записывает все данные о здоровье носителя и синхронизирует их со сторонними приложениями. Все эти данные в любой момент можно получить прямо из приложения в виде XML-документа. Сегодня мы выгрузим исторические данные о здоровье из приложения Apple Health, обработаем их и отправим в Google Sheets для анализа и визуализации в будущем.

Экспорт архива из приложения

Зайдите в приложение Health на iPhone. Нажмите на аватарку своего профиля в верхнем правом углу — откроется меню приложения.

Внизу нажмите на кнопку «Экспортировать медданные». Через некоторое время откроется меню экспорта — отправьте архив себе на компьютер любым способом, можно по AirDrop или даже по почте в письме самому себе. Из архива нужен только один файл — «экспорт.xml». Достаньте его и положите в папку с ноутбуком jupyter.

Парсер XML в DataFrame

При помощи библиотеки XML составляем дерево на основе документа из Health. Собирать в словарь будем следующие атрибуты: тип, единица измерения, дата создания, дата начала, дата конца, значение. Проходим по всему дереву и отправляем полученные значения атрибутов в records_dict.

from xml.etree import ElementTree
import pandas as pd
import datetime

tree = ElementTree.parse('экспорт.xml')
root = tree.getroot()
records = root.findall('Record')

records_dict = {
    'type':[],
    'unit':[],
    'creationDate':[],
    'startDate':[],
    'endDate':[],
    'value':[]
}

for record in records:
    for attribute in records_dict.keys():
        attribute_value = record.get(attribute)
        records_dict[attribute].append(attribute_value)

События записаны в нечитабельном виде — для перевода составим специальный словарь с нужными типами, где ключ — старое название, а значение — новое. Мы возьмём только 11 событий: минуты осознанности, дистанция на велосипеде, дистанция заплыва, дистанция ходьбы и бега, пройдено пролётов, пульс, пульс в покое, шаги, активная энергия, энергия покоя и средний пульс при ходьбе.

types_dict = {
    'HKCategoryTypeIdentifierMindfulSession': 'Mindful Session',
    'HKQuantityTypeIdentifierDistanceCycling': 'Cycling Distance',
    'HKQuantityTypeIdentifierDistanceSwimming': 'Swimming Distance',
    'HKQuantityTypeIdentifierDistanceWalkingRunning': 'Walking + Running Distance',
    'HKQuantityTypeIdentifierFlightsClimbed': 'Flights Climbed',
    'HKQuantityTypeIdentifierHeartRate': 'Heart Rate',
    'HKQuantityTypeIdentifierRestingHeartRate': 'Resting Heart Rate',
    'HKQuantityTypeIdentifierStepCount': 'Steps',
    'HKQuantityTypeIdentifierActiveEnergyBurned': 'Active Calories',
    'HKQuantityTypeIdentifierBasalEnergyBurned': 'Resting Calories',
    'HKQuantityTypeIdentifierWalkingHeartRateAverage': 'Walking Heart Rate Average'
}

Для минут осознанности в поле значения записей нет — мы сами посчитаем позже это поле как разницу даты окончания и начала события. Разница будет представлена как timedelta, поэтому напишем функцию перевода timedelta в минуты:

def td_to_m(td):
    seconds = td.seconds + td.days * 24 * 60 * 60
    return seconds // 60

Из словаря создаём DataFrame и задаём названия колонок. Оставляем только те 11 событий, которые есть в словаре types_dict и приводим все колонки к нужным типам данных:

df = pd.DataFrame(records_dict)
df.columns = ['type', 'unit', 'date', 'start', 'end', 'value']
df = df[df['type'].isin(types_dict.keys())]
df['value'] = df['value'].astype(float)
df['date'] = df['date'].astype('datetime64')
df['date'] = df['date'].dt.date
df['start'] = df['start'].astype('datetime64')
df['end'] = df['end'].astype('datetime64')
df['unit'] = df['unit'].astype(str)

Данные Health при экспорте никак не группируются — мы сделаем это самостоятельно. DataFrame можно поделить на три: в первом будут события, у которых единица измерения «количество в минуту» — для таких событий нужно искать среднее значение. В другой группе будут минуты осознанности — считаем число минут в каждой записи и суммируем. В последней группе находятся все остальные записи, связанные с количественными событиями — шаги, дистанция ходьбы и бега и так далее. Их тоже суммируем.

df_1 = df[df['unit'] == 'count/min']
df_1 = df_1.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'mean'})

df_2 = df[df['type'] == 'HKCategoryTypeIdentifierMindfulSession']
df_2['value'] = df_2['end'] - df_2['start']
df_2['value'] = df_2['value'].map(td_to_m)
df_2 = df_2.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                     'end':'max',
                                                                     'value':'sum'})
df_3 = df[(df['unit'] != 'count/min') & (df['type'] != 'HKCategoryTypeIdentifierMindfulSession')]
df_3 = df_3.groupby(by=['date', 'type', 'unit'], as_index=False).agg({'start':'min',
                                                                      'end':'max',
                                                                      'value':'sum'})
df = pd.concat([df_1, df_2, df_3])

Дату создания записи переводим в строковый тип. Все наименования типов событий заменяем согласно словарю types_dict. В переменную dates записываем все уникальные даты.

df['date'] = df['date'].astype(str)
df['type'] = df['type'].apply(lambda x: types_dict[x])
dates = df['date'].unique()

В результате нужен словарь с колонкой даты и отдельной колонкой под каждое из 11 событий:

result = {
    'date': [],
    'Steps': [],
    'Walking + Running Distance': [],
    'Swimming Distance': [],
    'Cycling Distance': [],
    'Resting Calories': [],
    'Active Calories': [],
    'Flights Climbed': [],
    'Heart Rate': [],
    'Resting Heart Rate': [],
    'Walking Heart Rate Average': [],
    'Mindful Session': []
}

Проходим по каждой дате и получаем кусок DataFrame за эту дату. Добавляем её в словарь и проходим по каждому ключу, пробуя добавить значение:

for date in dates:
    part = df[df['date'] == date]
    result['date'].append(date)
    for key in result.keys():
        if key == 'date':
            continue
        else:
            field = 'value'
        try:
            result[key].append(part[part['type'] == key][field].values[0])
        except IndexError:
            result[key].append(None)

Из полученного словаря создаём DataFrame, округляем всё до двух знаков после запятой и сортируем по дате:

result_df = pd.DataFrame(result)
result_df = result_df.round(2)
result_df = result_df.sort_values(by='date')

В результате получается такая таблица с историческими данными по 11 событиям:

Экспорт DataFrame в Google Sheets

Для экспорта в Google Docs необходим сервисный аккаунт и json-файл с ключом. О том, как его получить, мы писали в материале «Собираем данные по рекламным кампаниям ВКонтакте»

Создайте новый документ в Google Sheets. Весь DataFrame можно вставить одним действием при помощи методов библиотеки gspread. Импортируйте её, а также укажите идентификатор документа и json-файл с ключом. В методе get_worksheet указывается порядковый номер листа в файле начиная с нуля.

import pandas as pd
import gspread
from gspread_dataframe import set_with_dataframe
gc = gspread.service_account(filename='serviceAccount.json')
sh = gc.open_by_key('1osKA63LQkUC0FC0eIZ63jEJwn1TeIkUvqCV6ur')
worksheet = sh.get_worksheet(0)

В итоге в Google Spreadsheets появится такая таблица:

А в следующем материале посмотрим, как наладить ежедневный экспорт данных Здоровья в эту таблицу при помощи шорткатов и Google AppScript!

 Нет комментариев    124   1 д   Analytics Engineering   apple health   Data Analytics   pandas   python

Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

 Нет комментариев    275   20 дн   Analytics Engineering   python   sql   sqlalchemy

Сбор информации о подписчиках Telegram-канала

Время чтения текста – 5 минут

На 2021 год боты в Telegram так и не имеют метода, позволяющего получать информацию о подписчиках канала. Тем не менее, существует достаточно сложное в освоении Telegram API и построенная на нём библиотека Telethon. Сегодня мы посмотрим, как при помощи библиотеки выгрузить информацию о подписчиках своего канала.

Создание приложения

Для начала необходимо создать приложение, через которое будут отправляться запросы к API. Перейдите на https://my.telegram.org и авторизуйтесь в Telegram-аккаунте:

После успешной авторизации перейдите на страницу API development tools:

Заполните все поля и жмите на создание приложения:

Из полученной конфигурации нам необходим app api_id и app api_hash:

Запрос к API

Импортируем telethon — он поможет сформировать запрос, и pandas — полученный ответ мы запишем в DataFrame.

from telethon import TelegramClient
import pandas as pd

Вводим api_id, api_hash, наш номер телефона и ссылку на канал, информацию о подписчиках которого хотим получить. Доступ к информации о подписчиках есть только у администраторов канала.

api_id = 1234567
api_hash = '1b42hj25kd8jw42b234kwj242c'
phone = '+71234567890'
channel_href = 'https://t.me/leftjoin'

Создаём новую сессию — вместо session_name можно подставить любое другое название. Методы в библиотеке работают асинхронно, поэтому ответа от них требуется ожидать:

client = TelegramClient('session_name', api_id, api_hash)
client = await client.start()
dialogs = await client.get_dialogs()

Собираем все каналы текущего пользователя. Из ссылки забираем часть с именем канала и вытаскиваем из словаря нужный:

channels = {d.entity.username: d.entity
            for d in dialogs
            if d.is_channel}
my_channel = channel_href.split('/')[-1]
channel = channels[my_channel]

Подписчиков, доступ к которым не ограничен приватностью, можно получить методом get_participants. С 20 июля 2018 года Telegram установил ограничение в 200 подписчиков для вызова метода, и установка параметра aggressive на True поможет получить всех подписчиков за раз.

members_telethon_list = await client.get_participants(channel, aggressive=True)

Из полученных библиотечных структур извлекаем информацию о пользователях — их имена и телефоны:

username_list = [member.username for member in members_telethon_list]
first_name_list = [member.first_name for member in members_telethon_list]
last_name_list = [member.last_name for member in members_telethon_list]
phone_list = [member.phone for member in members_telethon_list]

Из четырёх списков собираем DataFrame и пишем его в csv-таблицу:

df = pd.DataFrame()
df['username'] = username_list
df['first_name'] = first_name_list
df['last_name'] = last_name_list
df['phone'] = phone_list
df.to_csv('subscribers.csv', index=False)

Результат работы — такая таблица:

 Нет комментариев    379   24 дн   Analytics Engineering   python   telegram   telethon

Матемаркетинг: современный облачный Data Stack

Время чтения текста – 1 минута

С 9 по 13 ноября в онлайн-формате прошёл Матемаркетинг — крупнейшая конференция по маркетинговой аналитике в России, и в этом году мне посчастливилось стать одним из спикеров. Я выступил с двумя докладами, в этом материале обсудим первый — о современном облачном Data Stack.

Внутри объясняю подход к проектированию аналитической инфраструктуры, обосновываю использование Clickhouse при построении облачной аналитики и рассказываю о его же нюансах и говорю про Redash с точки зрения инструмента для визуализации.

 Нет комментариев    109   1 мес   Analytics Engineering   clickhouse   Data Analytics   data stack   reda

Робот для автоматизированного просмотра Instagram на Python и Selenium

Время чтения текста – 13 минут

Недавно мы начали вести Instagram — подписывайтесь, чтобы не пропустить контент, которого нет в блоге и Telegram!

Многие из нас ежедневно заходят в Instagram, чтобы посмотреть истории друзей и полистать ленту постов и рекомендаций. Предлагаем действенный способ сохранить своё время — напишем на Python и Selenium робота, который возьмёт на себя рутинную задачу проверки свежих новостей друзей и подсчитает число новых историй и входящих сообщений.

Авторизация в аккаунт

При переходе в браузерную версию сайта, нас встречает такое окно:

Но просто вставить логин, пароль и нажать на кнопку «Войти» недостаточно: впереди будет ещё два окна. Во-первых, предложение сохранить данные — здесь мы тактично жмём «Не сейчас». Instagram тщательно следит за каждым нашим действием и малейшие аномалии в поведении приводят к блокировке, поэтому любые предложения по сохранению данных будем на всякий случай пропускать.

Следующим препятствием будет предложение включить уведомление, которое мы тоже пропустим:

Первым делом импортируем библиотеки:

from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup as bs
import time
import random

И описываем функцию authorize — она будет принимать driver в качестве аргумента, отправлять в нужные поля логин и пароль, нажимать на кнопку «Войти», затем ждать десять секунд на загрузку страницы, нажимать на кнопку «Не сейчас», снова ждать загрузки страницы и пропускать уведомления:

def authorize(driver):
    username = 'login'
    password = 'password'
    driver.get('https://www.instagram.com')
    time.sleep(5)
    driver.find_element_by_name("username").send_keys(username)
    driver.find_element_by_name("password").send_keys(password)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('aOOlW   HoLwm ')[0].click()")

Новые сообщения

В Instagram могут прийти сообщения двух видов. В случае, если вы не подписаны на отправителя — придёт запрос на диалог. Если подписаны — придёт входящее сообщения. Оба случая обрабатываются по-разному. Число входящих сообщений можно получить с главной страницы — это число над иконкой бумажного самолётика:

А число запросов можно забрать текстом заголовка h5 из раздела «Сообщения». Сперва перейдём в этот раздел и попробуем найти строку с запросами на сообщение. Затем вернёмся на главную страницу и возьмём то самое число новых сообщений.

def messages_count(driver):
    driver.get('https://www.instagram.com/direct/inbox/')
    time.sleep(2)
    inbox = bs(driver.page_source)
    try:
        queries_text = inbox.find_all('h5')[0].text
    except Exception:
        queries_text = None
    driver.get('https://www.instagram.com')
    time.sleep(2)
    content = bs(driver.page_source)
    try:
        messages_count = int(content.find_all('div', attrs={'class':'KdEwV'})[0].text)
    except Exception:
        messages_count = 0
    return queries_text, messages_count

Подсчёт числа новых сторис

Все истории хранятся в одном блоке:

Это список с одинаковым классом, но в каждом элементе списка лежит ещё один div-блок. У новых историй это класс eebAO h_uhZ, у просмотренных — eebAO.

Ещё есть такая кнопка, которая показывает следующую пачку историй:

При этом Instagram динамически прогружает код страницы, и в нём не найти те элементы, которые вы не видите своими глазами. Поэтому мы возьмём первые 8 видимых новых историй, добавим в список, нажмём на кнопку «Показать следующие истории» и будем продолжать так, пока кнопка ещё отображается. А затем подсчитаем число уникальных элементов, чтобы избежать возможных дубликатов.

def get_stories_count(driver):
    stories_divs = []
    scroll = True
    while scroll:
        try:
            content = bs(driver.page_source)
            stories_divs.extend(content.find_all('div', attrs={'class':'eebAO h_uhZ'}))
            driver.execute_script("document.getElementsByClassName('  _6CZji oevZr  ')[0].click()")
            time.sleep(1)
        except Exception as E:
            scroll = False
    return len(set(stories_divs))

Просмотр сторис

Следующее, чем может заняться реальный пользователь после авторизации — просмотр свежих историй. Для того, чтобы зайти в блок историй, нужно просто нажать на кнопку класса OE3OK:

Есть еще две кнопки, о которых мы должны знать. Это кнопка для переключения на следующую историю — она в классе FhutL и кнопка закрытия блока историй — класс wpO6b. Пускай одна история будет отнимать у нас от 10 до 15 секунд, и с вероятностью 1/5 мы переключим на следующую. При этом зададим переменные counter и limit — пусть сейчас мы хотим посмотреть случайное число историй от 5 до 45, и если мы уже посмотрели столько, то выходим из функции и историй.

def watch_stories(driver):
    watching = True
    counter = 0
    limit = random.randint(5, 45)
    driver.execute_script("document.getElementsByClassName('OE3OK ')[0].click()")
    try:
        while watching:
            time.sleep(random.randint(10, 15))
            if random.randint(1, 5) == 5:
                driver.execute_script("document.getElementsByClassName('FhutL')[0].click()")
            counter += 1
            if counter > limit:
                driver.execute_script("document.getElementsByClassName('wpO6b ')[1].click()")
                watching = False
    except Exception as E:
        print(E)
        watching = False

Скроллинг ленты

После просмотра актуальных историй можно поскроллить ленту — это действие ничем не отличается от классического скроллинга страниц в Selenium. Запоминаем последнюю доступную длину страницы, скроллим до неё, ожидаем прогрузки, получаем новую. Прекратим просматривать ленту в двух случаях — если в random.randint() сгенерировалась единица или если лента кончилась.

def scroll_feed(driver):
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height

Просмотр рекомендуемых аккаунтов

Instagram в заглавной странице сам рекомендует нам для подписки некоторые аккаунты. Выглядит она так:

И на ней тоже придётся скроллить, чтобы дойти до конца. Заходим на страницу и ожидаем 5 секунд прогрузки, затем снова получаем длину страницы и скроллим вниз. Выходим тоже с вероятностью 1/10 или если страница кончилась, но ещё с вероятностью 1/2 подписываемся на некоторые из первых 100 аккаунтов рекомендаций:

def scroll_recomendations(driver):
   driver.get('https://www.instagram.com/explore/people/suggested/')
    time.sleep(5)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height
        if random.randint(0, 1):
            try:
                driver.execute_script(f"document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[{random.randint(1,100)}].click()")
            except Exception as E:
                print(E)

Просмотр рекомендуемых постов

Помимо ленты, которая сформирована из наших подписок, Instagram собирает ленту рекомендаций. Туда входят все посты, которые потенциально могут вам понравиться — мы просто пройдём вниз по этой ленте. Выйдем с вероятностью 1/5 или когда кончится, чтобы долго не засиживаться.

def scroll_explore(driver):
    driver.get('https://www.instagram.com/explore')
    time.sleep(3)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 5) == 1:
            scrolling = False
        last_height = new_height

Итог

Теперь можно собрать все функции вместе — создаём новый driver, проводим авторизацию, считаем число новых сторис и сообщений, просматриваем сторис, переходим в рекомендуемые подписки и листаем ленту. В конце печатаем полученные данные — число новых сообщений, запросов и историй друзей.

driver = webdriver.Chrome(ChromeDriverManager().install())
authorize(driver)
queries_text, messages_count = messages_count(driver)
stories_count = get_stories_count(driver)
watch_stories(driver)
scroll_recomendations(driver)
scroll_feed(driver)
scroll_explore(driver)

if queries_text is not None:
    print(queries_text)
else:
    print('Нет новых запросов на диалог')
print('Новых сообщений:', messages_count)

print('Новых историй:', stories_count)
 Нет комментариев    269   1 мес   analysis   Analytics Engineering   instagram   python   selenium
Ранее Ctrl + ↓