# Source code for hamilton.data_quality.base
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import abc
import dataclasses
import enum
import inspect
import logging
from typing import Any
logger = logging.getLogger(__name__)
class DataValidationError(Exception):
    """Error raised when failing data validations must stop execution.

    Within this module it is raised by ``act_fail_bulk`` with the list of
    formatted failure messages as its single argument.
    """
class DataValidationLevel(enum.Enum):
    """Importance level a validator is configured with.

    ``DataValidator.__init__`` builds a member from the lowercase string value,
    e.g. ``DataValidationLevel("warn")``.
    """

    WARN = "warn"
    FAIL = "fail"
@dataclasses.dataclass
class ValidationResult:
    """Outcome produced by running one validator against a dataset."""

    # Whether or not the dataset passed the validation.
    passes: bool
    # Error message on failure, success message otherwise.
    message: str
    # Any extra diagnostics information needed, free-form. Built through a
    # default_factory so each instance gets its own dict.
    diagnostics: dict[str, Any] = dataclasses.field(default_factory=dict)
class DataValidator(abc.ABC):
    """Abstract base for a data quality validator.

    Instances are used by the `data_quality` operator. Subclasses must supply
    ``applies_to``, ``description``, ``name`` and ``validate``.
    """

    def __init__(self, importance: str):
        """Store the importance level for this validator.

        :param importance: string value of a ``DataValidationLevel`` member
            (``"warn"`` or ``"fail"``).
        :raises ValueError: if the string is not a valid level value (raised by
            the enum lookup).
        """
        self._importance = DataValidationLevel(importance)

    @property
    def importance(self) -> DataValidationLevel:
        """The ``DataValidationLevel`` parsed from the constructor argument."""
        return self._importance

    @abc.abstractmethod
    def applies_to(self, datatype: type[type]) -> bool:
        """Report whether this validator can be run on the given datatype.

        :param datatype: the type to check.
        :return: True if it can be run on the specified type, False otherwise.
        """

    @abc.abstractmethod
    def description(self) -> str:
        """Describe what this validator checks.

        For example: `Checks whether the entire dataset lies between 0 and 1.`
        The description should be able to draw on internal state (e.g.
        constructor arguments).

        :return: The description of the validator as a string.
        """

    @classmethod
    @abc.abstractmethod
    def name(cls) -> str:
        """Return the name for this validator."""

    @abc.abstractmethod
    def validate(self, dataset: Any) -> ValidationResult:
        """Run the validation against the dataset.

        :param dataset: dataset to validate.
        :return: The result of validation.
        """
[docs]
class AsyncDataValidator(DataValidator, abc.ABC):
    """Abstract base for a data quality validator whose ``validate`` is async.

    Use this when validation requires async operations (e.g. async database
    queries, async API calls). Must be used with AsyncDriver.
    """

    @abc.abstractmethod
    async def validate(self, dataset: Any) -> ValidationResult:
        """Run the validation asynchronously.

        :param dataset: dataset to validate.
        :return: The result of validation.
        """
def is_async_validator(validator: DataValidator) -> bool:
    """Tell whether the validator's ``validate`` method is a coroutine function.

    :param validator: The validator to inspect.
    :return: True if ``validator.validate`` is async, False otherwise.
    """
    validate_method = validator.validate
    return inspect.iscoroutinefunction(validate_method)
def act_warn(node_name: str, validation_result: ValidationResult, validator: DataValidator):
    """Default action for a validation result at the warn level.

    Logs a warning through the module logger when the validation did not pass;
    does nothing otherwise. Note that this might move at some point so it can be
    made configurable, but for now it lives here.

    :param node_name: the name of the node we are validating.
    :param validation_result: the result of the validation.
    :param validator: the validator object that produced the result.
    """
    if validation_result.passes:
        # Nothing to report for a passing validation.
        return
    warning_text = _create_error_string(node_name, validation_result, validator)
    logger.warning(warning_text)
def _create_error_string(node_name, validation_result, validator):
    """Format the failure message for one validator on one node.

    :param node_name: the name of the node that was validated.
    :param validation_result: object exposing ``message`` and ``diagnostics``.
    :param validator: object exposing a ``name()`` callable.
    :return: a single-line string with the node/validator tag, the result's
        message and its diagnostics.
    """
    tag = f"[{node_name}:{validator.name()}]"
    failure_message = validation_result.message
    diagnostics = validation_result.diagnostics
    return (
        f"{tag} validator failed. Message was: {failure_message}. "
        f"Diagnostic information is: {diagnostics}."
    )
def act_fail_bulk(node_name: str, failures: list[tuple[ValidationResult, DataValidator]]):
    """Default action for validation results at the fail level.

    Every non-passing result is formatted, logged as an error and collected; if
    anything was collected, one exception is raised carrying all the messages.
    Note that this might move at some point so it can be made configurable, but
    for now it lives here.

    :param node_name: the name of the node we are validating.
    :param failures: list of tuples of (validation_result, validator).
    :raises DataValidationError: if any result in ``failures`` did not pass.
    """
    collected = []
    for result, checker in failures:
        if result.passes:
            continue
        detail = _create_error_string(node_name, result, checker)
        line = f"{detail}\n"
        # Log each failure as it is found so the messages print nicely even
        # though they are also attached to the raised exception.
        logger.error(line)
        collected.append(line)
    if not collected:
        return
    raise DataValidationError(collected)
class BaseDefaultValidator(DataValidator, abc.ABC):
    """Abstract base for a default validator.

    Default validators each take a single argument passed to the decorator
    check_output, which can therefore delegate to several of them. This is an
    internal abstraction to allow for easy creation of validators.
    """

    def __init__(self, importance: str):
        """Forward the importance string to ``DataValidator.__init__``.

        :param importance: string value of the importance level.
        """
        super().__init__(importance)

    @classmethod
    @abc.abstractmethod
    def applies_to(cls, datatype: type[type]) -> bool:
        """Report whether this validator class can be run on the given datatype.

        Declared as a classmethod here, so it can be called without an instance.

        :param datatype: the type to check.
        :return: True if applicable, False otherwise.
        """

    @abc.abstractmethod
    def description(self) -> str:
        """Return a description of what this validator checks."""

    @abc.abstractmethod
    def validate(self, data: Any) -> ValidationResult:
        """Run the validation.

        :param data: the data to validate.
        :return: The result of validation.
        """

    @classmethod
    @abc.abstractmethod
    def arg(cls) -> str:
        """Return the string that represents this validator's argument.

        @check_output() will be passed a series of kwargs, each one of which
        corresponds to one of these default validators. Note the limitation of
        allowing just a single argument.

        :return: The argument that this needs.
        """

    @classmethod
    def name(cls) -> str:
        """Return the validator name, derived as ``"<arg>_validator"``."""
        argument = cls.arg()
        return f"{argument}_validator"
[docs]
class AsyncBaseDefaultValidator(BaseDefaultValidator, abc.ABC):
    """Abstract base for a default validator whose ``validate`` is async.

    Async variant of BaseDefaultValidator for validators that require async
    operations. Must be used with AsyncDriver.
    """

    @abc.abstractmethod
    async def validate(self, data: Any) -> ValidationResult:
        """Run the validation asynchronously.

        :param data: the data to validate.
        :return: The result of validation.
        """