Главная Веб-разработка Как настроить интеграцию между Great Expectations и Impala для работы с большими данными

Как настроить интеграцию между Great Expectations и Impala для работы с большими данными

от admin

Рассказываем, как мы заставили GX подружиться с Impala.

18 открытий52 показов

Всем привет, меня зовут Ольга Вишницкая, и я работаю главным аналитиком данных в одном из департаментов Газпромбанка. Работа эта, помимо прочего, сводится к постоянному наблюдению за новыми инструментами, призванными хоть немного облегчить жизнь тому, кто имеет дело с массивами данных. Мы тестируем, пробуем, отбрасываем неудачное и внедряем полезное.

В какой-то момент наши коллеги, отвечающие за качество данных, обратили внимание на Great Expectations (GX). Библиотека многообещающая: умеет валидировать, профилировать и даже составлять документацию автоматически. Однако есть одно «но»: Impala, наш основной SQL-движок для Hadoop, официально не поддерживается.

Мы решили схитрить и пустить данные через pandas DataFrame — инструмент, с которым GX ладит отлично. Тестовые прогонки показали обнадеживающие результаты, но когда дело дошло до настоящих объемов, стало ясно: DataFrame справляется с 15 000 строк за раз, а дальше — дробление, тормоза, потеря части функционала. Итог: сутки на обработку вместо минут, которые нужны Impala.

Вывод напрашивался сам собой: надо заставить GX работать с Impala напрямую. Теоретически это возможно — Impala ведь оперирует теми же данными куда быстрее. Практически же — GX о подобной интеграции ничего не знает. Документация хранит молчание. Пришлось экспериментировать.

После нескольких попыток, ошибок и обходных маневров мы нашли решение. Им и хочу поделиться.

Примечание: в статье описана работа с GX версии 0.17.19. Вышли новые релизы, но общий подход и логика решений остались прежними.

Настройка проекта: подключаем Impala и кастомные проверки

Подключение к базе данных

Перед тем, как приступить к работе, необходимо настроить подключение к Impala и организовать структуру проекта. Начнем с самого основного — соединения с базой данных:

			from sqlalchemy import create_engine engine = create_engine('impala://host:port/database') 		

Для этого используем функцию create_engine из SQLAlchemy, которая позволяет установить связь с Impala. Более детальную информацию о возможных параметрах конфигурации можно найти в официальной документации.

Добавляем кастомные проверки

Теперь пришло время интегрировать собственные проверки. К сожалению, GX не предоставляет прямого механизма подключения проверок из произвольных папок — в документации указаны лишь способы через plugins или GitHub. Однако решение оказалось довольно простым:

  1. Создаем папку для пользовательских проверок в директории с основным кодом GX;
  2. Подключаем нужные проверки в коде, указав путь к ним:
			from .custom_expectations.expect_column_values_to_match_regex_impala import (     ExpectColumnValuesToMatchRegexImpala ) 		

Так, мы обеспечиваем корректное подключение кастомных правил в систему, позволяя использовать их наряду со стандартными инструментами Great Expectations.

Основные проблемы, с которыми мы столкнулись

Проблема №1: Регулярные выражения

Первым камнем преткновения стала попытка использовать проверки на регулярные выражения — expect_column_values_to_match_regex. При запуске на Impala они попросту не срабатывали, так как некоторые выражения Regex не поддерживались для этого диалекта.

Ошибки выглядели так:

			'ImpalaDialect' object has no attribute 'dialect' Regex is not supported for dialect <impala.sqlalchemy.ImpalaDialect object at 0x7f721f793ca0> 		

У нас было два пути:

  1. Сделать pull request с изменениями в библиотеку GX. Но это означало бы долгое ожидание и, вероятно, последующие сложности с обновлениями версии в банковской инфраструктуре.
  2. Создать свою собственную проверку, игнорируя диалект и подгоняя регулярные выражения под Impala. Именно этот путь мы и выбрали.

Решение

Сначала копируем исходный файл проверки регулярных выражений в свою директорию. Не забываем изменить его название, а также поменять наименование проверки во всех местах, где она указана. В нашем случае мы добавили суффикс _impala:

			map_metric = "column_values.match_regex_impala" 		

В файле expect_column_values_to_match_regex_impala подключаем файл с проверкой:

			from .column_values_match_regex_impala import (ColumnValuesMatchRegexImpala) 		

Главное — заменить все названия проверок на свои.

Далее копируем еще один файл в свою директорию, меняем его название. В самом файле модифицируем переменные condition_metric_name и regex_expression. Метод SQLAlchemy использует функцию get_dialect_regex_expression, которая вызывает ошибку диалекта, так как Impala не входит в поддерживаемые базы данных. Поэтому мы отключаем вызов этой функции и формируем наше регулярное выражение (переменная regex_expression) под Impala, взяв за основу выражение из MySQL:

			class ColumnValuesMatchRegexImpala(ColumnMapMetricProvider):     condition_metric_name = "column_values.match_regex_impala"     regex_expression = BinaryExpression(         column,          literal(regex),          custom_op("REGEXP")     ) 		

Теперь формируем условие регулярного выражения с помощью класса BinaryExpression из SQLAlchemy, где:

  • column — проверяемая колонка
  • sqlalchemy.literal(regex) — регулярное выражение
  • sqlalchemy.custom_op("REGEXP") — оператор

После выполнения этого выражения получается строка вида table_nm REGEXP ‘%abv’, в которой:

  • table_nm — колонка
  • REGEXP — оператор
  • '%abv' — регулярное выражение

Важно! Не забудьте добавить подключение BinaryExpression. Используйте literal(regex). Именно literal — без него не отработает.

			from sqlalchemy.sql.elements import BinaryExpression, literal 		

Проблема №2: Зарезервированные слова

В процессе работы GX на Impala мы столкнулись с синтаксическими ошибками. Как оказалось, причина заключалась в том, что GX, при формировании запросов к базе данных, использует зарезервированные в Impala слова. Эти слова не могут быть использованы в качестве идентификаторов в базе данных. Вот примеры ошибок:

			DBAPIError: (impala.error.HiveServer2Error) ParseException: Syntax error in line 2: FROM (SELECT sum(condition) AS unexpected_count                 ^ Encountered: A reserved word cannot be used as an identifier: condition Expected: ALL, CASE, CAST, DATE, DEFAULT, DISTINCT, EXISTS, FALSE, IF, INTERVAL, LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, IDENTIFIER 		
			Syntax error in line 3: ...d IN ('0') THEN CAST(1 AS NUMERIC) ELSE CAST(0 AS NUME...                                   ^ Encountered: A reserved word cannot be used as an identifier: NUMERIC Expected: ARRAY, BIGINT, BINARY, BOOLEAN, CHAR, DATE, DATETIME, DECIMAL, REAL, FLOAT, INTEGER, MAP, SMALLINT, STRING, STRUCT, TIMESTAMP, TINYINT, VARCHAR 		

Решение

Для устранения данной проблемы необходимо заменить метку (label) на другое подходящее имя, которое не вызывает конфликтов с диалектом. Например, в первой ошибке ключевым словом было condition, и мы заменили его на другое, обновив все файлы, где использовалась эта проверка.

Читать также:
Python venv: что такое виртуальное окружение и как им пользоваться

В случае с типом NUMERIC достаточно просто заменить его на INTEGER, который как раз и предложен в ошибке в качестве корректного варианта.

Вот пример изменений в коде:

Было:

			# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later). count_case_statement: List[sqlalchemy.Label] = sa.case(     (         unexpected_condition,         sa.sql.expression.cast(1, sa.NUMERIC),     ),     else_=sa.sql.expression.cast(0, sa.NUMERIC), ).label("condition") 		

Стало:

			# The integral values are cast to SQL Numeric in order to avoid a bug in AWS Redshift (converted to integer later). count_case_statement: List[sqlalchemy.Label] = sa.case(     (         unexpected_condition,         sa.sql.expression.cast(1, sa.Integer),     ),     else_=sa.sql.expression.cast(0, sa.Integer), ).label("condition2") 		

Чтобы понять, в каких именно файлах необходимо внести изменения, мы обратились к логам. Логи содержат информацию о пути возникновения ошибок. Так мы находили проблемные файлы на GitHub, а затем прослеживали цепочку подключений, чтобы выяснить, откуда пришла функция проверки.

Проблема №3: Подзапросы в Impala

Следующая трудность, с которой мы столкнулись, касалась использования подзапросов. При работе с некоторыми проверками GX (например, ExpectColumnValuesToBeUnique) мы сталкивались с ошибкой:

			AnalysisException: Subqueries are not supported in the select list. 		

Дело в том, что GX использует вложенные подзапросы в разделе SELECT, которые запрещены в Impala.

Решение

Эту проблему можно обойти, если вынести не поддерживаемый Impala подзапрос в отдельный запрос и его результат подставить в основной подзапрос.

За основу мы взяли запрос из переменной unexpected_condition в файле map_condition_auxilliary_methods.py, вывели его через print и с помощью методов конструктора запросов SqlAlchemy сформировали такой же запрос в поддерживаемом Impala формате.

Вот как выглядит исходный запрос:

			SELECT `UnexpectedCountSubquery`.unexpected_count FROM (SELECT sum(condition2) AS unexpected_count FROM (SELECT CASE WHEN (product_cd IS NOT NULL AND product_cd IN  (SELECT product_cd FROM (SELECT * FROM sbx_041.kva_xref_product_ge_test WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd HAVING count(product_cd) > %(count_1)s))  THEN CAST(%(param_1)s AS INTEGER)  ELSE CAST(%(param_2)s AS INTEGER) END AS condition2 FROM (SELECT * FROM sbx_041.kva_xref_product_ge_test WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2) AS anon_1)  AS `UnexpectedCountSubquery` 		

Из этого запроса нужно выделить подзапрос, который Impala не может обработать:

			SELECT product_cd FROM (SELECT * FROM baza.test WHERE product_open_dt = %(product_open_dt_1)s) AS anon_2 GROUP BY product_cd HAVING count(product_cd) > %(count_1)s 		

С помощью метода SqlAlchemy этот запрос будет выглядеть так:

			query = (     sa.select(sa.column(domain_kwargs['column']))     .select_from(selectable)     .group_by(sa.column(domain_kwargs['column']))     .having(sa.func.count(sa.column(domain_kwargs['column'])) > 1) ) 		

После выполнения мы получаем результат (назовем его условно result_impala) и подставляем в окончательный запрос:

			SELECT "UnexpectedCountSubquery".unexpected_count FROM (SELECT sum(condition2) AS unexpected_count FROM (SELECT CASE WHEN product_cd IS NOT NULL AND product_cd IN (result_impala)  THEN CAST(:param_1 AS INTEGER)  ELSE CAST(:param_2 AS INTEGER) END AS condition2 FROM (SELECT * FROM baza.test WHERE product_open_dt = :product_open_dt_1) AS anon_2) AS anon_1)  AS "UnexpectedCountSubquery" 		

Последовательность замены файлов

В процессе работы над проблемами, связанными с зарезервированными словами и подзапросами, нам понадобилось заменить несколько файлов. Вот полная последовательность этих изменений:

  1. Заменяем наименование проверок (чтобы они отличались от стандартных вариантов в GX) в подключении и классах: great_expectations/expectations/core/expect_column_values_to_be_unique.py
  2. Меняем только подключение в следующем файле: great_expectations/expectations/metrics/column_map_metrics/column_values_unique.py 
  3. Выполняем изменения здесь: great_expectations/expecttions/metrics/map_metric_provider/column_map_metric_provider.py
  4. Также изменяем этот файл: great_expectations/expectations/metrics/map_metric_provider/map_metric_provider.py 
  5. И, наконец, меняем файл, как описано в разделе «Проблема №1: Регулярные выражения»: great_expectations/expectations/metrics/map_metric_provider/ map_condition_auxilliary_methods.py

В результате наших поисков и экспериментов мы обрели решение, которое соединяет GX с Impala. Да, порой наш путь не был изысканным — подмена файлов библиотеки могла бы вызвать сомнения у стороннего наблюдателя. Но в тех случаях, когда нужно было получить работающий инструмент немедленно, этот способ оказался вполне оправданным. И если у вас есть мысли о том, как можно было бы обойтись без таких компромиссов, — смело делитесь ими в комментариях.

Похожие статьи