Рассказываем, как мы заставили GX подружиться с Impala.
18 открытий52 показов
Всем привет, меня зовут Ольга Вишницкая, и я работаю главным аналитиком данных в одном из департаментов Газпромбанка. Работа эта, помимо прочего, сводится к постоянному наблюдению за новыми инструментами, призванными хоть немного облегчить жизнь тому, кто имеет дело с массивами данных. Мы тестируем, пробуем, отбрасываем неудачное и внедряем полезное.
В какой-то момент наши коллеги, отвечающие за качество данных, обратили внимание на Great Expectations (GX). Библиотека многообещающая: умеет валидировать, профилировать и даже составлять документацию автоматически. Однако есть одно «но»: Impala, наш основной SQL-движок для Hadoop, официально не поддерживается.
Мы решили схитрить и пустить данные через pandas DataFrame — инструмент, с которым GX ладит отлично. Тестовые прогонки показали обнадеживающие результаты, но когда дело дошло до настоящих объемов, стало ясно: DataFrame справляется с 15 000 строк за раз, а дальше — дробление, тормоза, потеря части функционала. Итог: сутки на обработку вместо минут, которые нужны Impala.
Вывод напрашивался сам собой: надо заставить GX работать с Impala напрямую. Теоретически это возможно — Impala ведь оперирует теми же данными куда быстрее. Практически же — GX о подобной интеграции ничего не знает. Документация хранит молчание. Пришлось экспериментировать.
После нескольких попыток, ошибок и обходных маневров мы нашли решение. Им и хочу поделиться.
Примечание: в статье описана работа с GX версии 0.17.19. Вышли новые релизы, но общий подход и логика решений остались прежними.
Настройка проекта: подключаем Impala и кастомные проверки
Подключение к базе данных
Перед тем, как приступить к работе, необходимо настроить подключение к Impala и организовать структуру проекта. Начнем с самого основного — соединения с базой данных:
from sqlalchemy import create_engine engine = create_engine('impala://host:port/database')
Для этого используем функцию create_engine из SQLAlchemy, которая позволяет установить связь с Impala. Более детальную информацию о возможных параметрах конфигурации можно найти в официальной документации.
Добавляем кастомные проверки
Теперь пришло время интегрировать собственные проверки. К сожалению, GX не предоставляет прямого механизма подключения проверок из произвольных папок — в документации указаны лишь способы через plugins или GitHub. Однако решение оказалось довольно простым:
- Создаем папку для пользовательских проверок в директории с основным кодом GX;
- Подключаем нужные проверки в коде, указав путь к ним:
from .custom_expectations.expect_column_values_to_match_regex_impala import ( ExpectColumnValuesToMatchRegexImpala )
Так, мы обеспечиваем корректное подключение кастомных правил в систему, позволяя использовать их наряду со стандартными инструментами Great Expectations.
Основные проблемы, с которыми мы столкнулись
Проблема №1: Регулярные выражения
Первым камнем преткновения стала попытка использовать проверки на регулярные выражения — expect_column_values_to_match_regex. При запуске на Impala они попросту не срабатывали, так как некоторые выражения Regex не поддерживались для этого диалекта.
Ошибки выглядели так:
'ImpalaDialect' object has no attribute 'dialect' Regex is not supported for dialect <impala.sqlalchemy.ImpalaDialect object at 0x7f721f793ca0>
У нас было два пути:
- Сделать
pull request
с изменениями в библиотеку GX. Но это означало бы долгое ожидание и, вероятно, последующие сложности с обновлениями версии в банковской инфраструктуре. - Создать свою собственную проверку, игнорируя диалект и подгоняя регулярные выражения под Impala. Именно этот путь мы и выбрали.
Решение
Сначала копируем исходный файл проверки регулярных выражений в свою директорию. Не забываем изменить его название, а также поменять наименование проверки во всех местах, где она указана. В нашем случае мы добавили суффикс _impala
:
map_metric = "column_values.match_regex_impala"
В файле expect_column_values_to_match_regex_impala подключаем файл с проверкой:
from .column_values_match_regex_impala import (ColumnValuesMatchRegexImpala)
Главное — заменить все названия проверок на свои.
Далее копируем еще один файл в свою директорию, меняем его название. В самом файле модифицируем переменные condition_metric_name
и regex_expression. Метод SQLAlchemy использует функцию get_dialect_regex_expression
, которая вызывает ошибку диалекта, так как Impala не входит в поддерживаемые базы данных. Поэтому мы отключаем вызов этой функции и формируем наше регулярное выражение (переменная regex_expression
) под Impala, взяв за основу выражение из MySQL:
class ColumnValuesMatchRegexImpala(ColumnMapMetricProvider): condition_metric_name = "column_values.match_regex_impala" regex_expression = BinaryExpression( column, literal(regex), custom_op("REGEXP") )
Теперь формируем условие регулярного выражения с помощью класса BinaryExpression из SQLAlchemy, где:
column
— проверяемая колонкаsqlalchemy.literal(regex)
— регулярное выражениеsqlalchemy.custom_op("REGEXP")
— оператор
После выполнения этого выражения получается строка вида table_nm REGEXP ‘%abv’, в которой:
table_nm
— колонкаREGEXP
— оператор'%abv'
— регулярное выражение
Важно! Не забудьте добавить подключение BinaryExpression. Используйте literal(regex)
. Именно literal — без него не отработает.
from sqlalchemy.sql.elements import BinaryExpression, literal
Проблема №2: Зарезервированные слова
В процессе работы GX на Impala мы столкнулись с синтаксическими ошибками. Как оказалось, причина заключалась в том, что GX, при формировании запросов к базе данных, использует зарезервированные в Impala слова. Эти слова не могут быть использованы в качестве идентификаторов в базе данных. Вот примеры ошибок:
DBAPIError: (impala.error.HiveServer2Error) ParseException: Syntax error in line 2: FROM (SELECT sum(condition) AS unexpected_count ^ Encountered: A reserved word cannot be used as an identifier: condition Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, IDENTIFIER
Syntax error in line 3: ...d IN ('0') THEN CAST(1 AS NUMERIC) ELSE CAST(0 AS NUME... ^ Encountered: A reserved word cannot be used as an identifier: NUMERIC Expected: ARRAY, BIGINT, BINARY, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, REAL, FLOAT, INTEGER, MAP, SMALLINT, STRING, STRUCT, TIMESTAMP, TINYINT, VARCHAR
Решение
Для устранения данной проблемы необходимо заменить метку (label) на другое подходящее имя, которое не вызывает конфликтов с диалектом. Например, в первой ошибке ключевым словом было condition, и мы заменили его на другое, обновив все файлы, где использовалась эта проверка.
В случае с типом NUMERIC достаточно просто заменить его на INTEGER, который как раз и предложен в ошибке в качестве корректного варианта.
Вот пример изменений в коде:
Было:
# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later). count_case_statement: List[sqlalchemy.Label] = sa.case( ( unexpected_condition, sa.sql.expression.cast(1, sa.NUMERIC), ), else_=sa.sql.expression.cast(0, sa.NUMERIC), ).label("condition")
Стало:
# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later). count_case_statement: List[sqlalchemy.Label] = sa.case( ( unexpected_condition, sa.sql.expression.cast(1, sa.Integer), ), else_=sa.sql.expression.cast(0, sa.Integer), ).label("condition2")
Чтобы понять, в каких именно файлах необходимо внести изменения, мы обратились к логам. Логи содержат информацию о пути возникновения ошибок. Так мы находили проблемные файлы на GitHub, а затем прослеживали цепочку подключений, чтобы выяснить, откуда пришла функция проверки.
Проблема №3: Подзапросы в Impala
Следующая трудность, с которой мы столкнулись, касалась использования подзапросов. При работе с некоторыми проверками GX (например, ExpectColumnValuesToBeUnique) мы сталкивались с ошибкой:
AnalysisException: Subqueries are not supported in the select list.
Дело в том, что GX использует вложенные подзапросы в разделе SELECT, которые запрещены в Impala.
Решение
Эту проблему можно обойти, если вынести не поддерживаемый Impala подзапрос в отдельный запрос и его результат подставить в основной подзапрос.
За основу мы взяли запрос из переменной unexpected_condition в файле map_condition_auxilliary_methods.py, вывели его через print и с помощью методов конструктора запросов SqlAlchemy сформировали такой же запрос в поддерживаемом Impala формате.
Вот как выглядит исходный запрос:
SELECT `UnexpectedCountSubquery`.unexpected_count FROM (SELECT sum(condition2) AS unexpected_count FROM (SELECT CASE WHEN (product_cd IS NOT NULL AND product_cd IN (SELECT product_cd FROM (SELECT * FROM sbx_041.kva_xref_product_ge_test WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd HAVING count(product_cd) > %(count_1)s)) THEN CAST(%(param_1)s AS INTEGER) ELSE CAST(%(param_2)s AS INTEGER) END AS condition2 FROM (SELECT * FROM sbx_041.kva_xref_product_ge_test WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2) AS anon_1) AS `UnexpectedCountSubquery`
Из этого запроса нужно выделить подзапрос, который Impala не может обработать:
SELECT product_cd FROM (SELECT * FROM baza.test WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd HAVING count(product_cd) > %(count_1)s
С помощью метода SqlAlchemy этот запрос будет выглядеть так:
query = ( sa.select(sa.column(domain_kwargs['column'])) .select_from(selectable) .group_by(sa.column(domain_kwargs['column'])) .having(sa.func.count(sa.column(domain_kwargs['column'])) > 1) )
После выполнения мы получаем результат (назовем его условно result_impala) и подставляем в окончательный запрос:
SELECT "UnexpectedCountSubquery".unexpected_count FROM (SELECT sum(condition2) AS unexpected_count FROM (SELECT CASE WHEN product_cd IS NOT NULL AND product_cd IN (result_impala) THEN CAST(:param_1 AS INTEGER) ELSE CAST(:param_2 AS INTEGER) END AS condition2 FROM (SELECT * FROM baza.test WHERE product_open_dt = :product_open_dt_1) AS anon_2) AS anon_1) AS "UnexpectedCountSubquery"
Последовательность замены файлов
В процессе работы над проблемами, связанными с зарезервированными словами и подзапросами, нам понадобилось заменить несколько файлов. Вот полная последовательность этих изменений:
- Заменяем наименование проверок (чтобы они отличались от стандартных вариантов в GX) в подключении и классах: great_expectations/expectations/core/expect_column_values_to_be_unique.py
- Меняем только подключение в следующем файле: great_expectations/expectations/metrics/column_map_metrics/column_values_unique.py
- Выполняем изменения здесь: great_expectations/expecttions/metrics/map_metric_provider/column_map_metric_provider.py
- Также изменяем этот файл: great_expectations/expectations/metrics/map_metric_provider/map_metric_provider.py
- И, наконец, меняем файл, как описано в разделе «Проблема №1: Регулярные выражения»: great_expectations/expectations/metrics/map_metric_provider/ map_condition_auxilliary_methods.py
В результате наших поисков и экспериментов мы обрели решение, которое соединяет GX с Impala. Да, порой наш путь не был изысканным — подмена файлов библиотеки могла бы вызвать сомнения у стороннего наблюдателя. Но в тех случаях, когда нужно было получить работающий инструмент немедленно, этот способ оказался вполне оправданным. И если у вас есть мысли о том, как можно было бы обойтись без таких компромиссов, — смело делитесь ими в комментариях.