Транзакции в SQLAlchemy

Время чтения текста – 5 минут

Транзакция — последовательность действий, связанных с базой данных. Их основная польза заключается в том, что при возникновении какой-то ошибки или достижении других нужных условий всю транзакцию можно отменить, и все изменения, примененные к базе данных, будут отменены. Сегодня мы напишем небольшой скрипт, который при помощи транзакций SQLAlchemy пишет информацию о подписчиках сообщества в базу данных MySQL, а при возникновении ошибки отменяет текущую транзакцию.

Сбор информации об участниках через VK API

Для начала напишем пару маленьких функций — первая будет возвращать число подписчиков сообщества, а вторая — отправлять запрос и формировать датафрейм с информацией о подписчиках сообщества.

Подробнее о том, как получить токен, можно прочитать в материале «Собираем данные по рекламным кампаниям ВКонтакте»

from sqlalchemy import create_engine
import pandas as pd
import requests
import time

token = '42hj2ehd3djdournf48fjurhf9r9o2eurnf48fjurhf9r9734'
group_id = 'leftjoin'

Чтобы узнать число подписчиков достаточно отправить метод groups.getMembers с любыми параметрами — в ответе всегда возвращается количество в поле count.

def get_subs_count(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id
    }).json()['response']['count']
    return count

Для примера будем брать имена, id, фамилии подписчиков, некоторую расширенную информацию и получать только по 10 подписчиков за раз, чтобы рассмотреть работу транзакций детально — каждые 10 подписчиков будут вставляться одной транзакцией. Введём дополнительное поле offset, чтобы знать, в какой итерации добавлены строки.

def get_subs_info(group_id, offset):
    response = requests.get('https://api.vk.com/method/groups.getMembers', params={
        'access_token':token,
        'v':5.103,
        'group_id':group_id,
        'offset':offset,
        'count':10,
        'fields':'sex, has_mobile, relation, can_post'
    }).json()['response']['items']
    df = pd.DataFrame(response)
    df['offset'] = offset
    return df

Транзакции

Наконец, можем подсоединиться к базе данных при помощи SQLAlchemy:

engine = create_engine('mysql+mysqlconnector://' +
                           'root' + ':' + '' + '@' +
                           'localhost' + '/' +
                           'transaction', echo=False)

У транзакций всегда должно быть начало — begin, и конец — commit. В случае, если произошла какая-то ошибка, можно сделать откат — rollback. Сперва получаем число подписчиков сообщество, и в каждой итерации цикла при помощи контекстного менеджера with ... as создаём новое подключение. Сразу после объявляем начало транзакции по этому подключению и с обработчиком исключений пробуем получить информацию о десяти подписчиках через функцию get_subs_info. Вставляем полученный датафрейм в таблицу методом to_sql и завершаем транзакцию при помощи метода commit(). В случае, если возникла какая-то ошибка — печатаем её на экран и отменяем транзакцию.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Чтобы протестировать работу транзакций слегка обновим последний блок кода — добавим вызов ошибки ValueError после вставки данных в базу, если текущий offset равен 10.

offset = 0
subs_count = get_subs_count(group_id)
while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError)
            transaction.commit()
        except Exception as E:
            print(E)
            transaction.rollback()
    time.sleep(1)
    offset += 10

Как и планировалось, данные за итерацию с offset = 10 не занесены в таблицу. Несмотря на то, что ошибка возникла уже после добавления новых данных, транзакция была прервана методом rollback() и завершение транзакции было отменено.

 Нет комментариев    284   18 дн   Analytics Engineering   python   sql   sqlalchemy

Сбор информации о подписчиках Telegram-канала

Время чтения текста – 5 минут

На 2021 год боты в Telegram так и не имеют метода, позволяющего получать информацию о подписчиках канала. Тем не менее, существует достаточно сложное в освоении Telegram API и построенная на нём библиотека Telethon. Сегодня мы посмотрим, как при помощи библиотеки выгрузить информацию о подписчиках своего канала.

Создание приложения

Для начала необходимо создать приложение, через которое будут отправляться запросы к API. Перейдите на https://my.telegram.org и авторизуйтесь в Telegram-аккаунте:

После успешной авторизации перейдите на страницу API development tools:

Заполните все поля и жмите на создание приложения:

Из полученной конфигурации нам необходим app api_id и app api_hash:

Запрос к API

Импортируем telethon — он поможет сформировать запрос, и pandas — полученный ответ мы запишем в DataFrame.

from telethon import TelegramClient
import pandas as pd

Вводим api_id, api_hash, наш номер телефона и ссылку на канал, информацию о подписчиках которого хотим получить. Доступ к информации о подписчиках есть только у администраторов канала.

api_id = 1234567
api_hash = '1b42hj25kd8jw42b234kwj242c'
phone = '+71234567890'
channel_href = 'https://t.me/leftjoin'

Создаём новую сессию — вместо session_name можно подставить любое другое название. Методы в библиотеке работают асинхронно, поэтому ответа от них требуется ожидать:

client = TelegramClient('session_name', api_id, api_hash)
client = await client.start()
dialogs = await client.get_dialogs()

Собираем все каналы текущего пользователя. Из ссылки забираем часть с именем канала и вытаскиваем из словаря нужный:

channels = {d.entity.username: d.entity
            for d in dialogs
            if d.is_channel}
my_channel = channel_href.split('/')[-1]
channel = channels[my_channel]

Подписчиков, доступ к которым не ограничен приватностью, можно получить методом get_participants. С 20 июля 2018 года Telegram установил ограничение в 200 подписчиков для вызова метода, и установка параметра aggressive на True поможет получить всех подписчиков за раз.

members_telethon_list = await client.get_participants(channel, aggressive=True)

Из полученных библиотечных структур извлекаем информацию о пользователях — их имена и телефоны:

username_list = [member.username for member in members_telethon_list]
first_name_list = [member.first_name for member in members_telethon_list]
last_name_list = [member.last_name for member in members_telethon_list]
phone_list = [member.phone for member in members_telethon_list]

Из четырёх списков собираем DataFrame и пишем его в csv-таблицу:

df = pd.DataFrame()
df['username'] = username_list
df['first_name'] = first_name_list
df['last_name'] = last_name_list
df['phone'] = phone_list
df.to_csv('subscribers.csv', index=False)

Результат работы — такая таблица:

 Нет комментариев    396   22 дн   Analytics Engineering   python   telegram   telethon

Матемаркетинг: современный облачный Data Stack

Время чтения текста – 1 минута

С 9 по 13 ноября в онлайн-формате прошёл Матемаркетинг — крупнейшая конференция по маркетинговой аналитике в России, и в этом году мне посчастливилось стать одним из спикеров. Я выступил с двумя докладами, в этом материале обсудим первый — о современном облачном Data Stack.

Внутри объясняю подход к проектированию аналитической инфраструктуры, обосновываю использование Clickhouse при построении облачной аналитики и рассказываю о его же нюансах и говорю про Redash с точки зрения инструмента для визуализации.

 Нет комментариев    160   1 мес   Analytics Engineering   clickhouse   Data Analytics   data stack   reda

Робот для автоматизированного просмотра Instagram на Python и Selenium

Время чтения текста – 13 минут

Недавно мы начали вести Instagram — подписывайтесь, чтобы не пропустить контент, которого нет в блоге и Telegram!

Многие из нас ежедневно заходят в Instagram, чтобы посмотреть истории друзей и полистать ленту постов и рекомендаций. Предлагаем действенный способ сохранить своё время — напишем на Python и Selenium робота, который возьмёт на себя рутинную задачу проверки свежих новостей друзей и подсчитает число новых историй и входящих сообщений.

Авторизация в аккаунт

При переходе в браузерную версию сайта, нас встречает такое окно:

Но просто вставить логин, пароль и нажать на кнопку «Войти» недостаточно: впереди будет ещё два окна. Во-первых, предложение сохранить данные — здесь мы тактично жмём «Не сейчас». Instagram тщательно следит за каждым нашим действием и малейшие аномалии в поведении приводят к блокировке, поэтому любые предложения по сохранению данных будем на всякий случай пропускать.

Следующим препятствием будет предложение включить уведомление, которое мы тоже пропустим:

Первым делом импортируем библиотеки:

from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup as bs
import time
import random

И описываем функцию authorize — она будет принимать driver в качестве аргумента, отправлять в нужные поля логин и пароль, нажимать на кнопку «Войти», затем ждать десять секунд на загрузку страницы, нажимать на кнопку «Не сейчас», снова ждать загрузки страницы и пропускать уведомления:

def authorize(driver):
    username = 'login'
    password = 'password'
    driver.get('https://www.instagram.com')
    time.sleep(5)
    driver.find_element_by_name("username").send_keys(username)
    driver.find_element_by_name("password").send_keys(password)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('aOOlW   HoLwm ')[0].click()")

Новые сообщения

В Instagram могут прийти сообщения двух видов. В случае, если вы не подписаны на отправителя — придёт запрос на диалог. Если подписаны — придёт входящее сообщения. Оба случая обрабатываются по-разному. Число входящих сообщений можно получить с главной страницы — это число над иконкой бумажного самолётика:

А число запросов можно забрать текстом заголовка h5 из раздела «Сообщения». Сперва перейдём в этот раздел и попробуем найти строку с запросами на сообщение. Затем вернёмся на главную страницу и возьмём то самое число новых сообщений.

def messages_count(driver):
    driver.get('https://www.instagram.com/direct/inbox/')
    time.sleep(2)
    inbox = bs(driver.page_source)
    try:
        queries_text = inbox.find_all('h5')[0].text
    except Exception:
        queries_text = None
    driver.get('https://www.instagram.com')
    time.sleep(2)
    content = bs(driver.page_source)
    try:
        messages_count = int(content.find_all('div', attrs={'class':'KdEwV'})[0].text)
    except Exception:
        messages_count = 0
    return queries_text, messages_count

Подсчёт числа новых сторис

Все истории хранятся в одном блоке:

Это список с одинаковым классом, но в каждом элементе списка лежит ещё один div-блок. У новых историй это класс eebAO h_uhZ, у просмотренных — eebAO.

Ещё есть такая кнопка, которая показывает следующую пачку историй:

При этом Instagram динамически прогружает код страницы, и в нём не найти те элементы, которые вы не видите своими глазами. Поэтому мы возьмём первые 8 видимых новых историй, добавим в список, нажмём на кнопку «Показать следующие истории» и будем продолжать так, пока кнопка ещё отображается. А затем подсчитаем число уникальных элементов, чтобы избежать возможных дубликатов.

def get_stories_count(driver):
    stories_divs = []
    scroll = True
    while scroll:
        try:
            content = bs(driver.page_source)
            stories_divs.extend(content.find_all('div', attrs={'class':'eebAO h_uhZ'}))
            driver.execute_script("document.getElementsByClassName('  _6CZji oevZr  ')[0].click()")
            time.sleep(1)
        except Exception as E:
            scroll = False
    return len(set(stories_divs))

Просмотр сторис

Следующее, чем может заняться реальный пользователь после авторизации — просмотр свежих историй. Для того, чтобы зайти в блок историй, нужно просто нажать на кнопку класса OE3OK:

Есть еще две кнопки, о которых мы должны знать. Это кнопка для переключения на следующую историю — она в классе FhutL и кнопка закрытия блока историй — класс wpO6b. Пускай одна история будет отнимать у нас от 10 до 15 секунд, и с вероятностью 1/5 мы переключим на следующую. При этом зададим переменные counter и limit — пусть сейчас мы хотим посмотреть случайное число историй от 5 до 45, и если мы уже посмотрели столько, то выходим из функции и историй.

def watch_stories(driver):
    watching = True
    counter = 0
    limit = random.randint(5, 45)
    driver.execute_script("document.getElementsByClassName('OE3OK ')[0].click()")
    try:
        while watching:
            time.sleep(random.randint(10, 15))
            if random.randint(1, 5) == 5:
                driver.execute_script("document.getElementsByClassName('FhutL')[0].click()")
            counter += 1
            if counter > limit:
                driver.execute_script("document.getElementsByClassName('wpO6b ')[1].click()")
                watching = False
    except Exception as E:
        print(E)
        watching = False

Скроллинг ленты

После просмотра актуальных историй можно поскроллить ленту — это действие ничем не отличается от классического скроллинга страниц в Selenium. Запоминаем последнюю доступную длину страницы, скроллим до неё, ожидаем прогрузки, получаем новую. Прекратим просматривать ленту в двух случаях — если в random.randint() сгенерировалась единица или если лента кончилась.

def scroll_feed(driver):
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height

Просмотр рекомендуемых аккаунтов

Instagram в заглавной странице сам рекомендует нам для подписки некоторые аккаунты. Выглядит она так:

И на ней тоже придётся скроллить, чтобы дойти до конца. Заходим на страницу и ожидаем 5 секунд прогрузки, затем снова получаем длину страницы и скроллим вниз. Выходим тоже с вероятностью 1/10 или если страница кончилась, но ещё с вероятностью 1/2 подписываемся на некоторые из первых 100 аккаунтов рекомендаций:

def scroll_recomendations(driver):
   driver.get('https://www.instagram.com/explore/people/suggested/')
    time.sleep(5)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height
        if random.randint(0, 1):
            try:
                driver.execute_script(f"document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[{random.randint(1,100)}].click()")
            except Exception as E:
                print(E)

Просмотр рекомендуемых постов

Помимо ленты, которая сформирована из наших подписок, Instagram собирает ленту рекомендаций. Туда входят все посты, которые потенциально могут вам понравиться — мы просто пройдём вниз по этой ленте. Выйдем с вероятностью 1/5 или когда кончится, чтобы долго не засиживаться.

def scroll_explore(driver):
    driver.get('https://www.instagram.com/explore')
    time.sleep(3)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 5) == 1:
            scrolling = False
        last_height = new_height

Итог

Теперь можно собрать все функции вместе — создаём новый driver, проводим авторизацию, считаем число новых сторис и сообщений, просматриваем сторис, переходим в рекомендуемые подписки и листаем ленту. В конце печатаем полученные данные — число новых сообщений, запросов и историй друзей.

driver = webdriver.Chrome(ChromeDriverManager().install())
authorize(driver)
queries_text, messages_count = messages_count(driver)
stories_count = get_stories_count(driver)
watch_stories(driver)
scroll_recomendations(driver)
scroll_feed(driver)
scroll_explore(driver)

if queries_text is not None:
    print(queries_text)
else:
    print('Нет новых запросов на диалог')
print('Новых сообщений:', messages_count)

print('Новых историй:', stories_count)
 Нет комментариев    344   1 мес   analysis   Analytics Engineering   instagram   python   selenium

Парсинг целевой аудитории ВКонтакте

Время чтения текста – 7 минут

При размещении рекламы некоторые площадки в настройках аудитории позволяют загрузить список конкретных людей, которые увидят рекламу. Для парсинга id по конкретным пабликам существуют специальные инструменты, но куда интереснее (и дешевле) сделать это собственноручно при помощи Python и VK API. Сегодня расскажем, как для рекламной кампании LEFTJOIN мы спарсили целевую аудиторию и загрузили её в рекламный кабинет.

В материале «Собираем данные по рекламным кампаниям ВКонтакте» подробно описан процесс получения токена пользователя для VK API

Парсинг пользователей

Для отправки запросов потребуется токен пользователя и список пабликов, чьих участников мы хотим получить. Мы собрали около 30 сообществ, посвящённых аналитике, BI-инструментам и Data Science.

import requests
import time

group_list =  ['datacampus', '185023286', 'data_mining_in_action', '223456', '187222444', 'nta_ds_ai', 'business__intelligence', 'club1981711', 'datascience', 'ozonmasters', 'businessanalysts', 'datamining.team', 'club.shad', '174278716', 'sqlex', 'sql_helper', 'odssib', 'sapbi', 'sql_learn', 'hsespbcareer', 'smartdata', 'pomoshch_s_spss', 'dwhexpert', 'k0d_ds', 'sql_ex_ru', 'datascience_ai', 'data_club', 'mashinnoe_obuchenie_ai_big_data', 'womeninbigdata', 'introstats', 'smartdata', 'data_mining_in_action', 'dlschool_mipt']

token = 'ваш_токен'

Запрос на получение участников сообщества к API ВКонтакте вернёт максимум 1000 строк — для получения последующих тысяч потребуется смещать параметр offset на единицу. Но нужно знать, до какого момента это делать — поэтому опишем функцию, которая принимает id сообщества, получает информацию о числе участников сообщества и возвращает максимальное значение для offset — отношение числа участников к 1000, ведь мы можем получить ровно тысячу человек за раз.

def get_offset(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':0,
            'fields':'last_seen'
        }).json()['response']['count']
    return count // 1000

Следующим этапом опишем функцию, которая принимает id сообщества, собирает в один список id всех подписчиков и возвращает его. Для этого отправляем запросы на получение 1000 человек, пока не кончается offset, вносим данные в список и возвращаем его. Проходя по каждому человеку дополнительно проверяем дату его последнего посещения социальной сети — если он не заходил с середины ноября, добавлять его не будем. Время указывается в формате unixtime.

def get_users(group_id):
    good_id_list = []
    offset = 0
    max_offset = get_offset(group_id)
    while offset < max_offset:
        response = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':offset,
            'fields':'last_seen'
        }).json()['response']
        offset += 1
        for item in response['items']:
            try:
                if item['last_seen']['time'] >= 1605571200:
                    good_id_list.append(item['id'])
            except Exception as E:
                continue
    return good_id_list

Теперь пройдём по всем сообществам из списка и для каждого соберём участников, а затем внесём их в общий список all_users. В конце переводим сначала список в множество, а затем опять в список, чтобы избавиться от возможных дубликатов: одни и те же люди могли быть участниками разных пабликов. Лишним не будет после каждого паблика приостановить работу программы на секунду, чтобы не столкнуться с ограничениями на число запросов.

all_users = []

for group in group_list:
    print(group)
    try:
        users = get_users(group)
        all_users.extend(users)
        time.sleep(1)
    except KeyError as E:
        print(group, E)
        continue

all_users = list(set(all_users))

Последним шагом записываем каждого пользователя в файл с новой строки.

with open('users.txt', 'w') as f:
    for item in all_users:
        f.write("%s\n" % item)

Аудитория в рекламном кабинете из файла

Переходим в свой рекламный кабинет ВКонтакте и заходим во вкладку «Ретаргетинг». Там будем кнопка «Создать аудиторию»:

После нажатия на неё откроется новое окно, где можно будет выбрать в качестве источника файл и указать название для аудитории:

После загрузки пройдёт несколько секунд и аудитория будет доступна. Первые минут 10 будет указано, что аудитория слишком мала: это не так и панель вскоре обновится, если в вашей аудитории действительно более 100 человек.

Итоги

Сравним среднюю стоимость привлечённого в наше сообщество участника в объявлении с автоматической настройкой аудитории и в объявлении, аудиторию для которого мы спарсили. В первом случае получаем среднюю стоимость в 52,4 рубля, а во втором — в 33,2 рубля. Подбор качественной аудитории при помощи методов парсинга данных из ВКонтакте помог снизить среднюю стоимость на 37%.

Для рекламной кампании мы подготовили такой пост (нажмите на картинку, чтобы перейти к нему):

 2 комментария    260   2 мес   Analytics Engineering   api   python   vk   vk api
Ранее Ctrl + ↓