Позднее Ctrl + ↑

Сбор информации о подписчиках Telegram-канала

Время чтения текста – 6 минут

На 2021 год боты в Telegram так и не имеют метода, позволяющего получать информацию о подписчиках канала. Тем не менее, существует достаточно сложное в освоении Telegram API и построенная на нём библиотека Telethon. Сегодня мы посмотрим, как при помощи библиотеки выгрузить информацию о подписчиках своего канала.

Создание приложения

Для начала необходимо создать приложение, через которое будут отправляться запросы к API. Перейдите на https://my.telegram.org и авторизуйтесь в Telegram-аккаунте:

После успешной авторизации перейдите на страницу API development tools:

Заполните все поля и жмите на создание приложения:

Из полученной конфигурации нам необходим app api_id и app api_hash:

Запрос к API

Импортируем telethon — он поможет сформировать запрос, и pandas — полученный ответ мы запишем в DataFrame.

from telethon import TelegramClient
import pandas as pd

Вводим api_id, api_hash, наш номер телефона и ссылку на канал, информацию о подписчиках которого хотим получить. Доступ к информации о подписчиках есть только у администраторов канала.

api_id = 1234567
api_hash = '1b42hj25kd8jw42b234kwj242c'
phone = '+71234567890'
channel_href = 'https://t.me/leftjoin'

Создаём новую сессию — вместо session_name можно подставить любое другое название. Методы в библиотеке работают асинхронно, поэтому ответа от них требуется ожидать:

client = TelegramClient('session_name', api_id, api_hash)
client = await client.start()
dialogs = await client.get_dialogs()

Собираем все каналы текущего пользователя. Из ссылки забираем часть с именем канала и вытаскиваем из словаря нужный:

channels = {d.entity.username: d.entity
            for d in dialogs
            if d.is_channel}
my_channel = channel_href.split('/')[-1]
channel = channels[my_channel]

Подписчиков, доступ к которым не ограничен приватностью, можно получить методом get_participants. С 20 июля 2018 года Telegram установил ограничение в 200 подписчиков для вызова метода, и установка параметра aggressive на True поможет получить всех подписчиков за раз.

members_telethon_list = await client.get_participants(channel, aggressive=True)

Из полученных библиотечных структур извлекаем информацию о пользователях — их имена и телефоны:

username_list = [member.username for member in members_telethon_list]
first_name_list = [member.first_name for member in members_telethon_list]
last_name_list = [member.last_name for member in members_telethon_list]
phone_list = [member.phone for member in members_telethon_list]

Из четырёх списков собираем DataFrame и пишем его в csv-таблицу:

df = pd.DataFrame()
df['username'] = username_list
df['first_name'] = first_name_list
df['last_name'] = last_name_list
df['phone'] = phone_list
df.to_csv('subscribers.csv', index=False)

Результат работы — такая таблица:

Для запуска в Jupyter Notebook описанный ниже код можно просто вставить в ячейку, но при запуске из Python-файла будет такая ошибка:

SyntaxError: 'await' outside function

Устранить проблему можно, записав весь код в асинхронную функцию. Целиком выглядеть код будет так:

from telethon import TelegramClient
import pandas as pd
import asyncio

async def main():
        api_id = 1234567
        api_hash = '1b42hj25kd8jw42b234kwj242c'
        phone = '+71234567890'
        channel_href = 'https://t.me/leftjoin'

	client = TelegramClient('session_name', api_id, api_hash)
	client = await client.start()
	dialogs = await client.get_dialogs()

	channels = {d.entity.username: d.entity
				for d in dialogs
				if d.is_channel}
	my_channel = channel_href.split('/')[-1]
	channel = channels[my_channel]

	members_telethon_list = await client.get_participants(channel, aggressive=True)

	username_list = [member.username for member in members_telethon_list]
	first_name_list = [member.first_name for member in members_telethon_list]
	last_name_list = [member.last_name for member in members_telethon_list]
	phone_list = [member.phone for member in members_telethon_list]

	df = pd.DataFrame()
	df['username'] = username_list
	df['first_name'] = first_name_list
	df['last_name'] = last_name_list
	df['phone'] = phone_list
	df.to_csv('subscribers.csv', index=False)

if __name__ == '__main__':
	loop = asyncio.get_event_loop()
	loop.run_until_complete(main())
 28 комментариев    5671   2021   Analytics Engineering   python   telegram   telethon

Матемаркетинг: современный облачный Data Stack

Время чтения текста – 1 минута

С 9 по 13 ноября в онлайн-формате прошёл Матемаркетинг — крупнейшая конференция по маркетинговой аналитике в России, и в этом году мне посчастливилось стать одним из спикеров. Я выступил с двумя докладами, в этом материале обсудим первый — о современном облачном Data Stack.

Внутри объясняю подход к проектированию аналитической инфраструктуры, обосновываю использование Clickhouse при построении облачной аналитики и рассказываю о его же нюансах и говорю про Redash с точки зрения инструмента для визуализации.

 Нет комментариев    26   2021   Analytics Engineering   clickhouse   Data Analytics   data stack   reda

Робот для автоматизированного просмотра Instagram на Python и Selenium

Время чтения текста – 13 минут

Недавно мы начали вести Instagram — подписывайтесь, чтобы не пропустить контент, которого нет в блоге и Telegram!

Многие из нас ежедневно заходят в Instagram, чтобы посмотреть истории друзей и полистать ленту постов и рекомендаций. Предлагаем действенный способ сохранить своё время — напишем на Python и Selenium робота, который возьмёт на себя рутинную задачу проверки свежих новостей друзей и подсчитает число новых историй и входящих сообщений.

Авторизация в аккаунт

При переходе в браузерную версию сайта, нас встречает такое окно:

Но просто вставить логин, пароль и нажать на кнопку «Войти» недостаточно: впереди будет ещё два окна. Во-первых, предложение сохранить данные — здесь мы тактично жмём «Не сейчас». Instagram тщательно следит за каждым нашим действием и малейшие аномалии в поведении приводят к блокировке, поэтому любые предложения по сохранению данных будем на всякий случай пропускать.

Следующим препятствием будет предложение включить уведомление, которое мы тоже пропустим:

Первым делом импортируем библиотеки:

from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup as bs
import time
import random

И описываем функцию authorize — она будет принимать driver в качестве аргумента, отправлять в нужные поля логин и пароль, нажимать на кнопку «Войти», затем ждать десять секунд на загрузку страницы, нажимать на кнопку «Не сейчас», снова ждать загрузки страницы и пропускать уведомления:

def authorize(driver):
    username = 'login'
    password = 'password'
    driver.get('https://www.instagram.com')
    time.sleep(5)
    driver.find_element_by_name("username").send_keys(username)
    driver.find_element_by_name("password").send_keys(password)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[0].click()")
    time.sleep(10)
    driver.execute_script("document.getElementsByClassName('aOOlW   HoLwm ')[0].click()")

Новые сообщения

В Instagram могут прийти сообщения двух видов. В случае, если вы не подписаны на отправителя — придёт запрос на диалог. Если подписаны — придёт входящее сообщения. Оба случая обрабатываются по-разному. Число входящих сообщений можно получить с главной страницы — это число над иконкой бумажного самолётика:

А число запросов можно забрать текстом заголовка h5 из раздела «Сообщения». Сперва перейдём в этот раздел и попробуем найти строку с запросами на сообщение. Затем вернёмся на главную страницу и возьмём то самое число новых сообщений.

def messages_count(driver):
    driver.get('https://www.instagram.com/direct/inbox/')
    time.sleep(2)
    inbox = bs(driver.page_source)
    try:
        queries_text = inbox.find_all('h5')[0].text
    except Exception:
        queries_text = None
    driver.get('https://www.instagram.com')
    time.sleep(2)
    content = bs(driver.page_source)
    try:
        messages_count = int(content.find_all('div', attrs={'class':'KdEwV'})[0].text)
    except Exception:
        messages_count = 0
    return queries_text, messages_count

Подсчёт числа новых сторис

Все истории хранятся в одном блоке:

Это список с одинаковым классом, но в каждом элементе списка лежит ещё один div-блок. У новых историй это класс eebAO h_uhZ, у просмотренных — eebAO.

Ещё есть такая кнопка, которая показывает следующую пачку историй:

При этом Instagram динамически прогружает код страницы, и в нём не найти те элементы, которые вы не видите своими глазами. Поэтому мы возьмём первые 8 видимых новых историй, добавим в список, нажмём на кнопку «Показать следующие истории» и будем продолжать так, пока кнопка ещё отображается. А затем подсчитаем число уникальных элементов, чтобы избежать возможных дубликатов.

def get_stories_count(driver):
    stories_divs = []
    scroll = True
    while scroll:
        try:
            content = bs(driver.page_source)
            stories_divs.extend(content.find_all('div', attrs={'class':'eebAO h_uhZ'}))
            driver.execute_script("document.getElementsByClassName('  _6CZji oevZr  ')[0].click()")
            time.sleep(1)
        except Exception as E:
            scroll = False
    return len(set(stories_divs))

Просмотр сторис

Следующее, чем может заняться реальный пользователь после авторизации — просмотр свежих историй. Для того, чтобы зайти в блок историй, нужно просто нажать на кнопку класса OE3OK:

Есть еще две кнопки, о которых мы должны знать. Это кнопка для переключения на следующую историю — она в классе FhutL и кнопка закрытия блока историй — класс wpO6b. Пускай одна история будет отнимать у нас от 10 до 15 секунд, и с вероятностью 1/5 мы переключим на следующую. При этом зададим переменные counter и limit — пусть сейчас мы хотим посмотреть случайное число историй от 5 до 45, и если мы уже посмотрели столько, то выходим из функции и историй.

def watch_stories(driver):
    watching = True
    counter = 0
    limit = random.randint(5, 45)
    driver.execute_script("document.getElementsByClassName('OE3OK ')[0].click()")
    try:
        while watching:
            time.sleep(random.randint(10, 15))
            if random.randint(1, 5) == 5:
                driver.execute_script("document.getElementsByClassName('FhutL')[0].click()")
            counter += 1
            if counter > limit:
                driver.execute_script("document.getElementsByClassName('wpO6b ')[1].click()")
                watching = False
    except Exception as E:
        print(E)
        watching = False

Скроллинг ленты

После просмотра актуальных историй можно поскроллить ленту — это действие ничем не отличается от классического скроллинга страниц в Selenium. Запоминаем последнюю доступную длину страницы, скроллим до неё, ожидаем прогрузки, получаем новую. Прекратим просматривать ленту в двух случаях — если в random.randint() сгенерировалась единица или если лента кончилась.

def scroll_feed(driver):
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height

Просмотр рекомендуемых аккаунтов

Instagram в заглавной странице сам рекомендует нам для подписки некоторые аккаунты. Выглядит она так:

И на ней тоже придётся скроллить, чтобы дойти до конца. Заходим на страницу и ожидаем 5 секунд прогрузки, затем снова получаем длину страницы и скроллим вниз. Выходим тоже с вероятностью 1/10 или если страница кончилась, но ещё с вероятностью 1/2 подписываемся на некоторые из первых 100 аккаунтов рекомендаций:

def scroll_recomendations(driver):
   driver.get('https://www.instagram.com/explore/people/suggested/')
    time.sleep(5)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 10) == 1:
            scrolling = False
        last_height = new_height
        if random.randint(0, 1):
            try:
                driver.execute_script(f"document.getElementsByClassName('sqdOP  L3NKy   y3zKF     ')[{random.randint(1,100)}].click()")
            except Exception as E:
                print(E)

Просмотр рекомендуемых постов

Помимо ленты, которая сформирована из наших подписок, Instagram собирает ленту рекомендаций. Туда входят все посты, которые потенциально могут вам понравиться — мы просто пройдём вниз по этой ленте. Выйдем с вероятностью 1/5 или когда кончится, чтобы долго не засиживаться.

def scroll_explore(driver):
    driver.get('https://www.instagram.com/explore')
    time.sleep(3)
    scrolling = True
    last_height = driver.execute_script("return document.body.scrollHeight")
    while scrolling:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(random.randint(4,10))
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height or random.randint(1, 5) == 1:
            scrolling = False
        last_height = new_height

Итог

Теперь можно собрать все функции вместе — создаём новый driver, проводим авторизацию, считаем число новых сторис и сообщений, просматриваем сторис, переходим в рекомендуемые подписки и листаем ленту. В конце печатаем полученные данные — число новых сообщений, запросов и историй друзей.

driver = webdriver.Chrome(ChromeDriverManager().install())
authorize(driver)
queries_text, messages_count = messages_count(driver)
stories_count = get_stories_count(driver)
watch_stories(driver)
scroll_recomendations(driver)
scroll_feed(driver)
scroll_explore(driver)

if queries_text is not None:
    print(queries_text)
else:
    print('Нет новых запросов на диалог')
print('Новых сообщений:', messages_count)

print('Новых историй:', stories_count)
 Нет комментариев    179   2021   analysis   Analytics Engineering   instagram   python   selenium

Парсинг целевой аудитории ВКонтакте

Время чтения текста – 7 минут

При размещении рекламы некоторые площадки в настройках аудитории позволяют загрузить список конкретных людей, которые увидят рекламу. Для парсинга id по конкретным пабликам существуют специальные инструменты, но куда интереснее (и дешевле) сделать это собственноручно при помощи Python и VK API. Сегодня расскажем, как для рекламной кампании LEFTJOIN мы спарсили целевую аудиторию и загрузили её в рекламный кабинет.

В материале «Собираем данные по рекламным кампаниям ВКонтакте» подробно описан процесс получения токена пользователя для VK API

Парсинг пользователей

Для отправки запросов потребуется токен пользователя и список пабликов, чьих участников мы хотим получить. Мы собрали около 30 сообществ, посвящённых аналитике, BI-инструментам и Data Science.

import requests
import time

group_list =  ['datacampus', '185023286', 'data_mining_in_action', '223456', '187222444', 'nta_ds_ai', 'business__intelligence', 'club1981711', 'datascience', 'ozonmasters', 'businessanalysts', 'datamining.team', 'club.shad', '174278716', 'sqlex', 'sql_helper', 'odssib', 'sapbi', 'sql_learn', 'hsespbcareer', 'smartdata', 'pomoshch_s_spss', 'dwhexpert', 'k0d_ds', 'sql_ex_ru', 'datascience_ai', 'data_club', 'mashinnoe_obuchenie_ai_big_data', 'womeninbigdata', 'introstats', 'smartdata', 'data_mining_in_action', 'dlschool_mipt']

token = 'ваш_токен'

Запрос на получение участников сообщества к API ВКонтакте вернёт максимум 1000 строк — для получения последующих тысяч потребуется смещать параметр offset на единицу. Но нужно знать, до какого момента это делать — поэтому опишем функцию, которая принимает id сообщества, получает информацию о числе участников сообщества и возвращает максимальное значение для offset — отношение числа участников к 1000, ведь мы можем получить ровно тысячу человек за раз.

def get_offset(group_id):
    count = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':0,
            'fields':'last_seen'
        }).json()['response']['count']
    return count // 1000

Следующим этапом опишем функцию, которая принимает id сообщества, собирает в один список id всех подписчиков и возвращает его. Для этого отправляем запросы на получение 1000 человек, пока не кончается offset, вносим данные в список и возвращаем его. Проходя по каждому человеку дополнительно проверяем дату его последнего посещения социальной сети — если он не заходил с середины ноября, добавлять его не будем. Время указывается в формате unixtime.

def get_users(group_id):
    good_id_list = []
    offset = 0
    max_offset = get_offset(group_id)
    while offset < max_offset:
        response = requests.get('https://api.vk.com/method/groups.getMembers', params={
            'access_token':token,
            'v':5.103,
            'group_id': group_id,
            'sort':'id_desc',
            'offset':offset,
            'fields':'last_seen'
        }).json()['response']
        offset += 1
        for item in response['items']:
            try:
                if item['last_seen']['time'] >= 1605571200:
                    good_id_list.append(item['id'])
            except Exception as E:
                continue
    return good_id_list

Теперь пройдём по всем сообществам из списка и для каждого соберём участников, а затем внесём их в общий список all_users. В конце переводим сначала список в множество, а затем опять в список, чтобы избавиться от возможных дубликатов: одни и те же люди могли быть участниками разных пабликов. Лишним не будет после каждого паблика приостановить работу программы на секунду, чтобы не столкнуться с ограничениями на число запросов.

all_users = []

for group in group_list:
    print(group)
    try:
        users = get_users(group)
        all_users.extend(users)
        time.sleep(1)
    except KeyError as E:
        print(group, E)
        continue

all_users = list(set(all_users))

Последним шагом записываем каждого пользователя в файл с новой строки.

with open('users.txt', 'w') as f:
    for item in all_users:
        f.write("%s\n" % item)

Аудитория в рекламном кабинете из файла

Переходим в свой рекламный кабинет ВКонтакте и заходим во вкладку «Ретаргетинг». Там будем кнопка «Создать аудиторию»:

После нажатия на неё откроется новое окно, где можно будет выбрать в качестве источника файл и указать название для аудитории:

После загрузки пройдёт несколько секунд и аудитория будет доступна. Первые минут 10 будет указано, что аудитория слишком мала: это не так и панель вскоре обновится, если в вашей аудитории действительно более 100 человек.

Итоги

Сравним среднюю стоимость привлечённого в наше сообщество участника в объявлении с автоматической настройкой аудитории и в объявлении, аудиторию для которого мы спарсили. В первом случае получаем среднюю стоимость в 52,4 рубля, а во втором — в 33,2 рубля. Подбор качественной аудитории при помощи методов парсинга данных из ВКонтакте помог снизить среднюю стоимость на 37%.

Для рекламной кампании мы подготовили такой пост (нажмите на картинку, чтобы перейти к нему):

 3 комментария    681   2020   Analytics Engineering   api   python   vk   vk api

Итоги прохождения курса по dbt

Время чтения текста – 4 минуты

Недавно прошёл курс по dbt от команды dbt. Курс классный, в нем много практики. Я использовал Google BigQuery и публичные датасеты от dbt для решения описанных примеров, а в обучающих материалах все построено на Snowflake.

В целом, узнал много нового и полезного о dbt, кратко summary:

  1. Во введении ребята объясняют роль Analytics Engineer, о котором так много разговоров и ссылаются на их пост блога
  2. Дается исчерпывающая информация о том, как подключить dbt к хранилищу и .git
  3. В dbt довольно тривиальными запросами реализовано тестирование данных на предмет уникальности и соответствия значениям. Это реально базовые SQL-запросы, которые проверяют наличие или отсутствие поля или значений. И тут интересно следующее: когда пишешь самостоятельно похожие запросы иногда думаешь, что во всем остальном мире так никто не делает, ну, к примеру, как в запросе ниже. А оказывается еще как делают, вот даже публично внутри dbt все эти тесты так и реализованы. И, кстати, крайне удобно, что SQL-код каждого теста можно изучить и скомпилировать.
SELECT sum(amount) FROM ... HAVING sum(amount) > 0
  1. Круто и удобно формируется документация и DAG (Directed Acyclic Graph), который показывает все шаги преобразований модели
  2. Поскольку dbt построен на Liquid и использовании Jinja (движок шаблонов в Python), то можно делать всякие невероятные вещи вроде написания внутреннего макроса (читай, условный операторы, циклы или создание функций) и применять этот макрос для автоматизации однотипных частей запроса. Это прям вау 🙂
  3. Многие вещи уже придуманы и разработаны коммьюнити, поэтому существует dbt hub, через который можно подключить интересующие пакеты и не изобретать велосипед.
  4. Отдельного упоминания достойны алгоритмы формирования инкрементального наполнения таблиц и создания снэпшотов. Для одного из проектов абсолютно такой же алгоритм по созданию снэпшотов с date_form / date_to мне доводилось проектировать самостоятельно.
    Было приятно увидеть, что у ребят из dbt это работает абсолютно аналогичным образом.
  1. Разумеется, используя Jinja и dbt, можно автоматизировать построение аналитических запросов, это так и называется Analyses. Скомпилированный код запроса, можно имплементировать в любимую BI-систему и наслаждаться результатами.

Общие впечатления очень положительные: dbt ждет большое будущее и развитие, ведь коммьюнити растет вместе с возможностями и ресурсами компании. Ждем коннекторов к другим СУБД помимо PostgreSQL, BigQuery, Snowflake, Redshift.

 Нет комментариев    56   2020   Analytics Engineering   dbt   sql
Ранее Ctrl + ↓