39 заметок с тегом

python

Позднее Ctrl + ↑

Парсим вакансии для аналитиков из Indeed

Время чтения текста – 8 минут

В этом материале мы расскажем, как парсить вакансии с сайта Indeed. Indeed — это крупнейший в мире поисковик вакансий. Этим текстом мы начинаем большой проект по анализу и визуализации показателей оплаты труда в области Data Science в разных странах.
Подобный анализ рынка вакансий, но только в России, мы проводили в материале Анализ рынка вакансий аналитики и BI: дашборд в Tableau, когда парсили данные с сайта HeadHunter.

А еще у нас можно почитать материал Парсим данные каталога сайта, используя Beautiful Soup и Selenium

Импорт библиотек
Библиотека fake_useragent имитирует реальный User-Agent, чтобы преодолеть защиту сайта от парсинга. Таким образом мы сможем пройти проверку HTTP заголовка User-Agent.
Модуль urllib.parse разбирает URL-адрес на компоненты и записывает его как кортеж. Он пригодится для перехода на карточки вакансий. BeautifulSoup поможет разобраться в структуре html-страницы и добыть нужную нам информацию.

import requests
from datetime import timedelta, datetime
import urllib.parse
from fake_useragent import UserAgent
from bs4 import BeautifulSoup
import pandas as pd
import time
from lxml.html import fromstring
from clickhouse_driver import Client
from clickhouse_driver import errors
import numpy as np
from funcs import check_title, get_skills_row, parse_salary, get_sheetname, create_table

Создадим таблицу в Clickhouse
Данные, которые мы собираемся собрать, будем хранить в базе Clickhouse.

create_table = '''CREATE TABLE if not exists indeed.vacancies (
    row_idx UInt16,
    query_string String,
    country String,
    title String,
    company String,
    city String,
    job_added Date,
    easy_apply UInt8,
    company_rating Nullable(Float32),
    remote UInt8,
    job_id String,
    job_link String,
    sheet String,
    skills String,
    added_date Date,
    month_salary_from_USD Float64,
    month_salary_to_USD Float64,
    year_salary_from_USD Float64,
    year_salary_to_USD Float64,
)
ENGINE = ReplacingMergeTree
SETTINGS index_granularity = 8192'''

Обход блокировок
Нам нужно обойти защиту Indeed и избежать блокировки по IP. Для этого используем анонимные прокси адреса на сайте free-proxy-list.net. Как собрать свежие прокси, мы писали в нашем предыдущем тексте «Пишем парсер свежих прокси на Python для Selenium». Прокси адреса мы запишем в массив, который понадобится в момент обращения к Indeed, когда запрос будет проверять User-Agent.

Данный метод удаляет IP из списка с прокси в том случае, если ответ от Indeed через него так и не пришел.

def remove_proxy_from_list_and_update_if_required(proxy):
    global _proxies
    _proxies.remove(proxy)
    if len(_proxies) == 0:
        update_proxy_list()

Функция, используя прокси, возвращает нам страницу Indeed, из которой мы впоследствии спарсим данные.

def get_page(updated_url, session):
    proxy = get_proxy()
    proxy_dict = {"http": proxy, "https": proxy}
    logger.info(f'try with proxy: {proxy}')
    try:
        session.proxies = proxy_dict
        return session.get(updated_url, timeout=15)
    except (requests.exceptions.RequestException, requests.exceptions.ProxyError, requests.exceptions.ConnectTimeout,
            requests.exceptions.ReadTimeout, requests.exceptions.SSLError,
            requests.exceptions.ConnectionError, url_ex.MaxRetryError, ConnectionResetError,
            socket.timeout, url_ex.ReadTimeoutError):
        remove_proxy_from_list_and_update_if_required(proxy)
        logger.info(f'try with proxy {proxy}')
        return get_page(updated_url, session)

Методы для парсера
Искомые данные нужно будет искать по тегам и атрибутам верстки с помощью BeautifulSoup. Мы заранее собрали ключевые слова, которые нас будут интересовать в вакансиях, и подготовили с ними отдельный датасет.

В карточках вакансий нет точной даты публикации, указано лишь сколько дней назад она была опубликована. Сохраним точную дату публикации в традиционном формате с помощью timedelta.

def raw_date_to_str(raw_date):
    raw_date = raw_date.lower()
    if '+' in raw_date or "более" in raw_date:
        delta = timedelta(days=32)
        return (datetime.now() - delta).strftime("%Y-%m-%d")
    else:
        parts = raw_date.split()
        for part in parts:
            if part.isdigit():
                delta = timedelta(days=part.isdigit())
                return (datetime.now() - delta).strftime("%Y-%m-%d")
    return ""

Сохраним id вакансии в системе Indeed. Подставляя id в URL страницы, мы сможем получить доступ к полному описанию вакансий.

def get_job_id_from_card(card):
    try:
        return card['id'].split('_')[1]
    except:
        return ""

Данный метод соберет названия вакансий.

def get_title_from_card(card):
    try:
        job_title = card.find('a', {'class': 'jobtitle'}).text
        return job_title.replace('\n', '')
    except:
        return ''

Аналогичным образом напишем методы, которые будут собирать данные о названии компании, времени публикации объявления, местоположении работодателя и рейтинге работодателя на портале.

URL сайта Indeed пишется для разных стран по-разному. Для США это будет просто indeed.com, а локализации для других стран получают префиксом xx.indeed.com. Список с префиксами мы собрали в массив заранее из https://opensource.indeedeng.io/api-documentation/docs/supported-countries/ списка Indeed.

def get_link_from_card(card, card_country):
    try:
        if card_country == 'us':
            return f"https://indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
        else:
            return f"https://{card_country}.indeed.com{card.find('a', {'class': 'jobtitle'})['href']}"
    except:
        return ""

Спарсим описание вакансии, которое можно найти по тегу ’summary’. Именно там содержатся требования, которые предъявляют к кандидату.

def get_summary_from_card_and_transform_to_skills(card):
    try:
        smr = card.find('div', {'class': 'summary'}).text
        return get_skills_row(smr)
    except:
        return ""
Необходимые hard-skills из описания вакансий будем сверять со списком 'skills'. 
skills = ["python", "tableau", "etl", "power bi", "d3.js", "qlik", "qlikview", "qliksense",
          "redash", "metabase", "numpy", "pandas", "congos", "superset", "matplotlib", "plotly",
          "airflow", "spark", "luigi", "machine learning", "amplitude", "sql", "nosql", "clickhouse",
          'sas', "hadoop", "pytorch", "tensorflow", "bash", "scala", "git", "aws", "docker",
          "linux", "kafka", "nifi", "ozzie", "ssas", "ssis", "redis", 'olap', ' r ', 'bigquery', 'api', 'excel']

Эта функция разобьет ’summary’ на слова пробелом и проверит их на соответствие нашему списку. В датасет будут возвращаться совпадения с нашим списком hard-skills.

def get_skills_row(summary):
    summary = summary.lower()
    row = []
    for sk in skills:
        if sk in summary:
            row.append(sk)
    return ','.join(row)

На выходе мы получим таблицу с примерно 30 тысячами строк.

Полный код проекта можно посмотреть в нашем репозитории на GitHub.

В Python 3.10 появился pattern matching

Время чтения текста – 11 минут

Этот материал — перевод статьи «How to use structural pattern matching in Python»

В новом релизе Python 3.10 появились операторы case/match, которые отвечают за реализацию в языке синтаксиса pattern-matching.

Python, несмотря на его простоту и популярность, в отличие от других языков, не имел отдельной формы управления потоком (form of flow control) — способа взять значение и элегантно сопоставить его с одним из множества возможных условий. В C и C++ эта функция реализована конструкцией switch/case, а в Rust она называется pattern matching.

Изящных способов реализовать это в Python, кроме как воспользоваться конструкцией if/elif/else и поиском по словарю, до этого момента не существовало. Оба способа работают, но из-за своей громоздкости они могли затруднить читабельность кода.

За последние годы были предприняты несколько попыток включить синтаксис типа switch/case в Python, но все они провалились. Это первая реализация структурного сопоставления шаблонов (structural pattern matching), которая сейчас доступна только в версии для разработчиков.

Введение в pattern matching на Python

Структурное сопоставление шаблонов (structural pattern matching) вводит оператор match/case, который работает по той же схеме, что и switch/case. Оператор проверяет объект на соответствие одному или нескольким шаблонам и, если совпадение найдено, выполняет действие.

match command:
    case "quit":
        quit()
    case "reset":
        reset()
    case unknown_command:
        print (f"Unknown command'{unknown_command}'")

За каждым выражением case следует шаблон для сопоставления. В данном примере сверху вниз идет сопоставление строк с оператором, и если такое сопоставление найдено, оператор выполняется. Также можно захватить все или часть совпадения и повторно использовать их. В нашем примере в случае с шаблоном сопоставления unknown_command мы использовали его повторно внутри f-строки.

Сопоставление переменных с помощью pattern matching

Если вы хотите сопоставить значение с константами, то константы следует отнести к полям класса:

class Command:
    QUIT = 0
    RESET = 1

command = 0

match command:
    case Command.QUIT:
        quit()
    case Command.RESET:
        reset()

Если вы попробуете сделать это не прибегая к классам, например, так:

QUIT = 0
RESET = 1

command = 0
match command:
    case QUIT:
        quit()
    case RESET:
        reset()

Получите в ответ ошибку, связанную с тем, что имя не относится к известному паттерну:

name capture 'QUIT' makes remaining patterns unreachable

Сопоставление нескольких элементов с помощью pattern matching

Pattern matching используется не только как замена поиска по словарю. Оно используется для описания самой структуры того, что вы хотите сопоставить. Таким образом, вы можете выполнять сопоставления на основе количества сопоставляемых элементов или их комбинаций.

Вот более сложный пример. Здесь пользователь вводит команду, за которой, возможно, следует имя файла:

command = input()
match command.split():
    case ["quit"]:
        quit()
    case ["load", filename]:
        load_from(filename)
    case ["save", filename]:
        save_to(filename)
    case _:
        print (f"Command '{command}' not understood")

Давайте рассмотрим варианты case по порядку:

  • case [’quit’]: проверяет, соответствует ли то, что мы сопоставляем, списку только с элементом ’quit’, полученных после разделения введенных данных с помощью split().
  • case [’load’, filename]: проверяет, является ли первый разделенный элемент строкой ’load’, и следует ли за ней вторая строка. Если вторая строка есть, то вторая строка сохраняется в переменной filename и используется для дальнейшей работы. Аналогично проверяется case [«save», filename]:.
  • case _: это совпадение с подстановочным знаком (wildcard match). Происходит совпадение, если до этого момента не происходило никакого другого совпадения. Обратите внимание, что символ нижнего подчеркивания ( _ ) ни к чему не привязан, в данном случае нижнее подчеркивание используется как сигнал команде match, что рассматриваемый случай является подстановочным знаком (wildcard). (Вот почему мы ссылаемся на команду переменной в теле блока case, ведь ничего не было захвачено.)

Шаблоны в structural pattern matching

Шаблоны могут быть простыми значениями или содержать более сложную логику сопоставления.
Вот несколько примеров:

  • case ’a’: сопоставить с единственным значением ’a’.
  • case [’a’,’b’]: сопоставить с коллекцией (collection) [’a’,’b’].
  • case [’a’, value1]: сопоставить с коллекцией, в которой два значения, и поместить второе значение в переменную value1.
  • case [’a’, *values]: сопоставить с коллекцией, в которой как минимум одно значение. Остальные значения, если они есть, хранить в values. Обратите внимание, что вы можете включить только один элемент со звездочкой в шаблон.
  • case (’a’|’b’|’c’): Оператор or, он же |, может использоваться для обработки нескольких обращений в одном блоке case. Здесь мы сопоставляем ’a’, ’b’, или ’c’.
  • case (’a’|’b’|’c’) as letter: То же, что и выше, за исключением того, что теперь мы помещаем соответствующий элемент в переменную letter.
  • case [’a’, value] if : Переменная связывается только если expression истинно. Переменные, которые мы хотим связать, можно использовать в . Например, если мы используем if value in valid_values, то case будет действительным только в том случае, если захваченное значение value был на самом деле в коллекции valid_values.
  • case [’z’, _]: будет соответствовать любая коллекция элементов, которая начинается с ’z’.

Сопоставление с объектами с помощью pattern matching

Самая продвинутая функция pattern matching в Python — это возможность сопоставлять объекты с определенными свойствами. Рассмотрим приложение, в котором мы работаем с объектом media_object. Этот объект мы хотим преобразовать в файл .jpg и вернуть из функции.

match media_object:
    case Image(type="jpg"):
        # Return as-is
        return media_object
    case Image(type="png") | Image(type="gif"):
        return render_as(media_object, "jpg")
    case Video():
        raise ValueError("Can't extract frames from video yet")
    case other_type:
        raise Exception(f"Media type {media_object} can't be handled yet")

В каждом из описанных выше case мы ищем объект определенного типа, иногда с определенными атрибутами. В первом case ищем соответствие объекта Image , у которого type атрибутирован как ’jpg’. Во втором case идет сопоставление, если type соответствует ’png’> или ’gif’. В третьем case идет проверка на соответствие объекта типу Video, при этом атрибут не имеет значения. И в последнем случае мы получаем все, что не было выбрано ранее.

Вы также можете выполнять захват с сопоставлением объектов:

match media_object:
    case Image(type=media_type):
        print (f"Image of type {media_type}")

Эффективное использование pattern matching

Ключевой момент при работе с match/case в Python заключается в написании шаблонов, в которых будет описана структура того, с чем вы хотите работать. Простые тесты на константы хороши, но если это все, что вы делаете, то лучше просто сделать поиск по словарю. Настоящая ценность структурного сопоставления с шаблоном (structural pattern matching) в Python заключается в возможности сопоставления с шаблоном объекта, а не только с каким-то одним объектом или даже с их набором.

Еще одна важная деталь, которую нужно иметь в виду, это порядок написания сопоставлений. То, какие сопоставления вы проверите в первую очередь, повлияет на эффективность и точность вашего сопоставления в целом. Размещайте наиболее конкретные сопоставления на первом месте, а наиболее общие — на последнем.

В конечном счете, если у вас есть проблема, которую можно решить с помощью if/elif/else или поиска по словарю, то используйте их вместо match/case. Pattern matching является мощным, но не универсальным решением. Используйте его, когда это наиболее целесообразно.

Подробнее с документацией по pattern matching в Python (PEP 622) можно ознакомиться тут.

 2 комментария    847   5 мес   english   python   перевод

Пишем скрипт для автоматизации коммитов GitHub

Время чтения текста – 9 минут

У GitHub есть API, позволяющий делать всё, что можно сделать руками: создавать репозитории, делать коммиты файлов и переключаться между ветками. Не все используют GUI при работе с системой контроля версий, и для коммита файлов приходится вводить не одну команду в терминал, а смена веток нередко приводит к запутанности действий. Сегодня мы поэкспериментируем с GitHub API и напишем скрипт, который сам собирает все файлы текущей директории раз в час и отправляет в отдельную ветку на GitHub при помощи get, post и put-запросов к методу /repos.

Получение access token

Для работы с GitHub API нужно получить токен, который выдаётся каждому пользователю в настройках. Для создания нового токена нажимаем на «Generate new token»:

В процессе работы с GitHub API токен может просрочиться и в ответ придёт ошибка «Bad credentials». Для решения проблемы можно создать новый токен или получить токен приложения

Следом нужно оставить название для токена и отметить области доступа ключу. Для наших целей достаточно дать доступ только к разделу repo.

После в зелёном окошке появится токен:

Отправляем запросы

Подключим нужные библиотеки и введём некоторые константные значения: логин, токен, название для репозитория и ветки, в которую в автоматическом режиме будут отправляться файлы:

import requests
import base64
import json
import os

username = 'leftjoin'
repo = 'leftjoin'
token = "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"
new_branch_name = 'automatic'

Создадим новый репозиторий post-запросом. В поле data передаем название репозитория, в auth — логин и токен:

r = requests.post('https://api.github.com/user/repos',
                  data=json.dumps({'name':repo}),
                  auth=(username, token),
                  headers={"Content-Type": "application/json"})

Теперь загрузим файл README.md, в котором будет строка “Hello, world!”. Для этого строку нужно перекодировать в base64 — GitHub принимает данные в такой кодировке:

content = "Hello, world!"

b_content = content.encode('utf-8')
base64_content = base64.b64encode(b_content)
base64_content_str = base64_content.decode('utf-8')

Теперь создадим коммит в формате словаря: он состоит из пути (пока его оставим пустым), сообщения коммита и содержания:

f = {'path':'',
     'message': 'Automatic update',
     'content': base64_content_str}

Отправим файл в репозиторий put-запросом, передав путь до README.md прямо в ссылке запроса:

f_resp = requests.put(f'https://api.github.com/repos/{username}/{repo}/contents/README.md',
                auth=(username, token),
                headers={ "Content-Type": "application/json" },
                data=json.dumps(f))

Теперь в репозитории в ветке main есть один файл README.md, который содержит строку «Hello, world!»:

Работа с ветками

Так как наш скрипт подразумевает работу в фоновом режиме, не всегда будет возможность контролировать, какие версии файлов он отправляет. Чтобы не засорять основную ветку будем отправлять все файлы в специальную, из которой при надобности можно будет достать обновления. Логика такая: проверяем, есть ли в репозитории такая ветка и, если нет, создаём её. Затем получаем все файлы текущей директории, построчно считываем, перекодируем в base64, коммитим и отправляем в ветку для автоматических обновлений.

Начнём с функции, которая определяет, существует ли ветка с таким названием. Она отправит запрос на получение всех веток репозитория, и вернёт False, если ветки не существует:

def branch_exist(branch_name):
    branches = requests.get(f'https://api.github.com/repos/{username}/{repo}/git/refs').json()
    try:
        for branch in branches:
            if branch['ref'] == 'refs/heads/' + branch_name:
                return True
        return False
    except Exception:
        return False

У каждой ветки, файлов и репозиториев на GitHub есть уникальный идентификатор, получаемый путём хэш-суммы алгоритмом SHA. Он необходим как для модификации файлов, так и для смены веток — опишем функцию, которая по названию ветки получает SHA для неё:

def get_branch_sha(branch_name):
    try:
        branches = requests.get(f'https://api.github.com/repos/{username}/{repo}/git/refs').json()
        for branch in branches:
            sha = None
            if branch['ref'] == 'refs/heads/' + branch_name:
                sha = branch['object']['sha']
            return sha
    except Exception:
        return None

Следом опишем функцию создания новой ветки с названием из переменной new_branch_name:

def create_branch():
    main_branch_sha = get_branch_sha('main')
    requests.post(f'https://api.github.com/repos/{username}/{repo}/git/refs',
             auth=(username, token),
             data=json.dumps({
                 'ref':f'refs/heads/{new_branch_name}',
                 'sha':main_branch_sha
             })).json()

Для модификации файлов тоже необходимо знать его идентификатор:

def get_sha(path):
    r = requests.get(f'https://api.github.com/repos/{username}/{repo}/contents/{path}',
                    auth=(username, token),
                    data=json.dumps({
                        'branch': new_branch_name
                    }))
    sha = r.json()['sha']
    return sha

Наконец, опишем ряд главных функций — первая по аналогии с примером из начала материала принимает содержимое файла и формирует коммит в отдельную ветку, а вторая отправляет файл в указанный репозиторий:

def make_file_and_commit(content, sha=None):
    b_content = content.encode('utf-8')
    base64_content = base64.b64encode(b_content)
    base64_content_str = base64_content.decode('utf-8')
    f = {'path':'',
     'message': 'Automatic update',
     'content': base64_content_str,
     'sha':sha,
     'branch': new_branch_name}
    return f

def send_file(path, data):
    f_resp = requests.put(f'https://api.github.com/repos/{username}/{repo}/contents/{path}',
                    auth=(username, token),
                    headers={ "Content-Type": "application/json" },
                    data=json.dumps(data))
    print(f_resp.json())
    return f_resp

И ещё две функции напоследок: первая нужна, потому что мы должны знать, модифицируем мы уже существующий файл или новый. В случае с новым нет необходимости получать SHA для файла — его просто не существует. Вторая функция считывает всё содержимое файла в строку.

def file_exist(path):
    r = requests.get(f'https://api.github.com/repos/{username}/{repo}/contents/{path}',
                    auth=(username, token))
    return r.ok

def file_reader(path):
    lines = ""
    with open(path, 'r') as f:
        for line in f:
            lines += line
    return lines

Соберём всё вместе в функцию main(). Проверяем, существует ли ветка для автоматических коммитов, затем получаем все файлы директории, проверяем их существование в репозитории и отправляем:

def main():
    if not branch_exist(new_branch_name):
        create_branch()
    files = [os.path.join(dp, f) for dp, dn, fn in os.walk(os.path.expanduser(".")) for f in fn]
    for path in files:
        print(path)
        sha = None
        if file_exist(path):
            sha = get_sha(path)
        lines = file_reader(path)
        f = make_file_and_commit(lines, sha)
        r = send_file(path, f)

Перевести скрипт в автоматический режим может помочь библиотека schedule, которую мы уже использовали в материале «Анализ рынка вакансий аналитики и BI: дашборд в Tableau»

import schedule

schedule.every().hour.do(main)

while True:
    schedule.run_pending()

Вот и всё: мы написали скрипт, который самостоятельно каждый час отправляет все файлы текущей директории в ветку на GitHub.

 Нет комментариев    133   6 мес   api   github   python

Эффективное логирование в Python

Время чтения текста – 5 минут

В Python существует встроенный модуль logging, который позволяет журналировать этапы выполнения программы. Логирование полезно когда, например, нужно оставить большой скрипт сбора / обработки данных на длительное время, а в случае возникновения непредвиденных ошибок выяснить, с чем они могут быть связаны. Анализ логов позволяет быстро и эффективно выявлять проблемные места в коде, но для удобного использования модуля следует написать несколько функций по взаимодействию с ним и вынести их в отдельный файл — сегодня мы этим и займёмся.

Пишем логгер

Создадим файл loggers.py. Для начала импортируем модули и задаём пару значений по умолчанию — директорию для файла с логом и наименование конфигурационного файла, содержащего шаблоны логирования. Его мы опишем следом.

import os
import json
import logging
import logging.config

FOLDER_LOG = "log"
LOGGING_CONFIG_FILE = 'loggers.json'

Опишем функцию для создания папки с логом: она принимает наименование для папки, но по умолчанию будет называть её «log». Директорию создаём при помощи модуля os и только в том случае, если такой директории ещё не существует.

def create_log_folder(folder=FOLDER_LOG):
    if not os.path.exists(folder):
        os.mkdir(folder)

Теперь опишем функцию создания нового логгера по заданному шаблону. Функция должна создать директорию для логирования, открыть конфигурационный файл и достать нужный шаблон. Затем по шаблону при помощи модуля logging создаём новый логгер:

def get_logger(name, template='default'):
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        dict_config = json.load(f)
        dict_config["loggers"][name] = dict_config["loggers"][template]
    logging.config.dictConfig(dict_config)
    return logging.getLogger(name)

Для удобства опишем ещё одну функцию — получение стандартного лога. Она ничего не принимает и нужна только для инициализации лога с шаблоном default:

def get_default_logger():
    create_log_folder()
    with open(LOGGING_CONFIG_FILE, "r") as f:
        logging.config.dictConfig(json.load(f))

    return logging.getLogger("default")

Описываем конфигурационный файл

Создадим по соседству файл loggers.json — он будет содержать настройки логгера. Внутри указываем такие настройки, как версию логгера, форматы логирования для разных уровней, наименование выходного файла и его максимальный размер:

{
    "version": 1,
    "disable_existing_loggers": false,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(processName)-10s - %(name)-10s - %(levelname)-8s - %(message)s"
        }
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "default"
        },
        "rotating_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "level": "DEBUG",
            "formatter": "default",
            "filename": "log/main.log",
            "maxBytes": 10485760,
            "backupCount": 20
        }
    },
    "loggers": {
        "default": {
            "handlers": ["console", "rotating_file"],
            "level": "DEBUG"
        }
    }
}

Использование логгера

Теперь давайте представим, что вы выгружаете данные по API и складываете их в базу данных на примере нашего материала про транзакции в SQLAlchemy. Рассмотрим заключительную часть кода: добавим строку с инициализацией стандартного логгера и изменим код так, чтобы сначала в лог выводился offset, затем в случае успеха предложение «Successfully inserted data», а в случае ошибки выводилась сама ошибка и предложение: «Error: tried to insert data but got an error».

logger = get_logger('main')

offset = 0
subs_count = get_subs_count(group_id)

while offset < subs_count:
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            logger.info(f"{offset} / {subs_count}")
            df = get_subs_info(group_id, offset)
            df.to_sql('subscribers', con=conn, if_exists='append', index=False)
            if offset == 10:
                raise(ValueError("This is a test errror"))
            transaction.commit()
            logger.info(f"Successfully inserted data")
        except Exception as E:
            transaction.rollback()
            logger.error(f"Error: tried to insert {df} but got an error: {E}")
    time.sleep(1)
    offset += 10

Теперь во время работы программы будет отображаться такой вывод, который также будет записан в файл main.log папки log в директории проекта. После завершения работы программы можно исследовать логи, посмотреть, на каких offset возникли проблемы, какие данные не удалось вставить и прочитать текст ошибки:

Обнаружение статистических выбросов в Python

Время чтения текста – 7 минут

Параллельно с выходом материала «Обнаружение выбросов в R» предлагаем посмотреть, как те же методы обнаружения выбросов реализовать в Python.

Данные

Для наглядности эксперимента возьмём тот же пакет данных mpg — скачать его в виде csv-таблицы можно с GitHub. Импортируем библиотеки и читаем таблицу в DataFrame:

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

df = pd.read_csv('mpg.csv')

Минимальные и максимальные значения

Тут всё просто. Выводим описание всего датасета методом describe():

df.describe()

Гистограмма

Такой график тоже можно построить в одну строку, используя внутренние средства библиотеки pandas:

df.hwy.plot(kind='hist', density=1, bins=20, stacked=False, alpha=.5, color='grey')

Box plot

В случае ящика с усами далеко идти тоже не приходится — в pandas есть метод и для этого:

_, bp = df.hwy.plot.box(return_type='both')

Получим точки с графика и выведем их в таблице, используя объект bp:

outliers = [flier.get_ydata() for flier in bp["fliers"]][0]
df[df.hwy.isin(outliers)]

Процентили

При помощи метода quantile получаем соответствующую нижнюю и верхнюю границы, а затем выводим всё, что выходит за их рамки:

lower_bound = df.hwy.quantile(q=0.025)
upper_bound = df.hwy.quantile(q=0.975)
df[(df.hwy < lower_bound) | (df.hwy > upper_bound)]

Фильтр Хэмпеля

Мы используем реализацию фильтра Хэмпеля, найденную на StackOverflow

Опишем функцию, которая заменяет на nan все значения, у которых разница с медианой больше, чем три медианных абсолютных отклонения.

def hampel(vals_orig):
    vals = vals_orig.copy()    
    difference = np.abs(vals.median()-vals)
    median_abs_deviation = difference.median()
    threshold = 3 * median_abs_deviation
    outlier_idx = difference > threshold
    vals[outlier_idx] = np.nan
    return(vals)

И применим к нашему набору данных:

hampel(df.hwy)

0      29.0
1      29.0
2      31.0
3      30.0
4      26.0
       ... 
229    28.0
230    29.0
231    26.0
232    26.0
233    26.0
Name: hwy, Length: 234, dtype: float64

В выводе нет nan-значений, а значит и выбросов фильтр Хэмпеля не обнаружил.

Тест Граббса

Автор реализации теста Граббса и теста Рознера для Python

Опишем три функции: первая находит значение критерия Граббса и максимальное значение в наборе данных, вторая — критическое значение с учётом объёма выборки и уровня значимости, а третья проверяет, является ли значение с максимальным индексом выбросом:

import numpy as np
from scipy import stats

def grubbs_stat(y):
    std_dev = np.std(y)
    avg_y = np.mean(y)
    abs_val_minus_avg = abs(y - avg_y)
    max_of_deviations = max(abs_val_minus_avg)
    max_ind = np.argmax(abs_val_minus_avg)
    Gcal = max_of_deviations / std_dev
    print(f"Grubbs Statistics Value: {Gcal}")
    return Gcal, max_ind

def calculate_critical_value(size, alpha):
    t_dist = stats.t.ppf(1 - alpha / (2 * size), size - 2)
    numerator = (size - 1) * np.sqrt(np.square(t_dist))
    denominator = np.sqrt(size) * np.sqrt(size - 2 + np.square(t_dist))
    critical_value = numerator / denominator
    print(f"Grubbs Critical Value: {critical_value}")
    return critical_value

def check_G_values(Gs, Gc, inp, max_index):
    if Gs > Gc:
        print(f"{inp[max_index]} is an outlier")
    else:
        print(f"{inp[max_index]} is not an outlier")

Заменим значение в 34 строке на 212:

df.hwy[34] = 212

И выполним три функции:

Gcritical = calculate_critical_value(len(df.hwy), 0.05)
Gstat, max_index = grubbs_stat(df.hwy)
check_G_values(Gstat, Gcritical, df.hwy, max_index)

Grubbs Critical Value: 3.652090929984981
Grubbs Statistics Value: 13.745808761040397
212 is an outlier

Тест Рознера

Для теста Рознера достаточно дописать одну функцию, которая принимает набор данных, уровень значимости и число потенциальных выбросов:

def ESD_test(input_series, alpha, max_outliers):
    for iteration in range(max_outliers):
        Gcritical = calculate_critical_value(len(input_series), alpha)
        Gstat, max_index = grubbs_stat(input_series)
        check_G_values(Gstat, Gcritical, input_series, max_index)
        input_series = np.delete(input_series, max_index)

Используя функцию на нашем наборе данных получаем, что значение 212 является выбросом, а 44 — нет:

ESD_test(np.array(df.hwy), 0.05, 3)

Grubbs Critical Value: 3.652090929984981
Grubbs Statistics Value: 13.745808761040408
212 is an outlier
Grubbs Critical Value: 3.6508358337727187
Grubbs Statistics Value: 3.455960616168714
44 is not an outlier
Grubbs Critical Value: 3.649574509044683
Grubbs Statistics Value: 3.5561478280392245
44 is not an outlier
Ранее Ctrl + ↓