Learn With Me: Great Expectations

Ever since I was introduced to Great Expectations (GX), I've wanted to find a way to incorporate it into a project. Ensuring the accuracy and validity of our data at rest is key to catching issues early and to maintaining trust in our business systems.
Before working on an official implementation, I decided I would go ahead and dive in just to explore what the GX library has to offer.
"GX is a Python library that provides a framework for describing the acceptable state of data and then validating that the data meets those criteria."
I chose to combine GX with one of my other favorite Python libraries: simple-salesforce. With Simple Salesforce you can run queries via the Salesforce API and transform the data into a Pandas DataFrame.
GX allows you to run expectations directly on a DataFrame, so this seemed like a quick way to get started. I went through the local setup steps, which I won't go into detail on here since they're well documented in the GX Quick Start. I then created a couple of functions: one to retrieve data from my Salesforce DevHub Org, and one to run a basic expectation validation.
There are a few authentication protocols available for Simple Salesforce. Here I'm using the basic username, password, and security token for ease of setup. I wrote a simple query that grabs all the contacts in the org and returns the records in a DataFrame.
import simple_salesforce
import pandas as pd

def get_contacts():
    # Authenticate to Salesforce
    # (SFDC_USERNAME, SFDC_PASSWORD, and SFDC_TOKEN are assumed to be defined elsewhere)
    sf = simple_salesforce.Salesforce(
        username=SFDC_USERNAME,
        password=SFDC_PASSWORD,
        security_token=SFDC_TOKEN
    )
    # Create a base query for the data you want to validate
    query = """
        SELECT Id, Name, Email, Phone, MobilePhone, MailingStreet, MailingCity, MailingState, MailingPostalCode, MailingCountry
        FROM Contact
    """
    # Turn the results into a DataFrame
    # (sf.query returns the first batch of results; sf.query_all follows pagination for larger orgs)
    data = sf.query(query)
    df = pd.DataFrame(data['records']).drop(['attributes'], axis=1)
    return df
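To make the DataFrame step a little more concrete, here is a minimal sketch that uses a hand-written payload in place of a live API call. The sample records and the `records_to_dataframe` helper are made up for illustration. Only the overall shape (a `records` list whose items each carry an `attributes` entry) mirrors what a Salesforce query returns.

```python
import pandas as pd

# Hypothetical payload shaped like a Salesforce query response (all values invented).
sample_response = {
    "totalSize": 2,
    "done": True,
    "records": [
        {
            "attributes": {"type": "Contact", "url": "/example/Contact/003EXAMPLE1"},
            "Id": "003EXAMPLE1",
            "Name": "Ada Example",
            "Phone": "555-0100",
        },
        {
            "attributes": {"type": "Contact", "url": "/example/Contact/003EXAMPLE2"},
            "Id": "003EXAMPLE2",
            "Name": "Bob Example",
            "Phone": None,  # a missing phone number comes back as null
        },
    ],
}


def records_to_dataframe(response):
    """Build a DataFrame from the 'records' list and drop the metadata column."""
    return pd.DataFrame(response["records"]).drop(columns=["attributes"])


df = records_to_dataframe(sample_response)
print(df.columns.tolist())
print(df["Phone"].isna().sum())
```

The `attributes` entry only describes the record (its type and URL), so dropping it leaves just the queried fields as columns.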
To run the validation we'll need the GX library and a data context configuration that supports the in-memory processing of the DataFrame.
import great_expectations as gx
import pandas as pd
import salesforce_data_retriever as sfdc
from great_expectations.data_context.types.base import (
    DataContextConfig,
    InMemoryStoreBackendDefaults
)
from great_expectations.data_context import EphemeralDataContext
There are a few configuration steps required to run our validation. Again, the GX documentation is great and explains what each of these components does and how they fit together, but for the sake of brevity I'm glossing over them.
We pull in our config and create a data context, add a Pandas data source to the context and organize it into a data asset, and then build a batch request on that asset. Next, we add an expectation suite to the context and instantiate a validator using our batch request and suite.
Finally, we are ready to use the validator and get our expectation result. I'm using the expectation expect_column_values_to_not_be_null and validating against the Contact.Phone field (see the GX Expectation Gallery for the full list of expectations).
def run_validation():
    # Set up the project configuration
    project_config = DataContextConfig(
        store_backend_defaults=InMemoryStoreBackendDefaults()
    )
    context = EphemeralDataContext(project_config=project_config)
    # Connect to your Data Source
    datasource = context.sources.add_pandas(name="salesforce_pandas_datasource")
    df = sfdc.get_contacts()
    asset_name = 'salesforce_contacts'
    datasource.add_dataframe_asset(name=asset_name, dataframe=df)
    # Organize into a Data Asset
    data_asset = context.get_datasource("salesforce_pandas_datasource").get_asset(asset_name)
    # Build a Batch Request with the asset
    batch_request = data_asset.build_batch_request()
    # Add an Expectation Suite to the Context
    context.add_or_update_expectation_suite(expectation_suite_name='salesforce_suite')
    # Create a Validator and run the Validation
    validator = context.get_validator(batch_request=batch_request, expectation_suite_name='salesforce_suite')
    expectation_validation_result = validator.expect_column_values_to_not_be_null(column="Phone")
    # Print out the results
    print(expectation_validation_result)
I have a few Contact records in the org that are missing phone numbers, so when I print out the result we get:
{
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "result": {
    "element_count": 56,
    "unexpected_count": 3,
    "unexpected_percent": 5.357142857142857,
    "partial_unexpected_list": [
      null,
      null,
      null
    ]
  },
  "meta": {},
  "success": false,
  "expectation_config": {
    "expectation_type": "expect_column_values_to_not_be_null",
    "meta": {},
    "kwargs": {
      "column": "Phone",
      "batch_id": "salesforce_pandas_datasource-salesforce_contacts"
    }
  }
}
You can see that, of the 56 Contact records returned by the query, 3 have null values for Phone. While this is a straightforward example, it's easy to imagine how quickly trust in an org's data would grow with recurring checks that validate the data against a variety of expectations and alert administrators and developers to violations.
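As a sanity check on how to read those numbers, the headline figures can be reproduced with plain Pandas. This is only a rough sketch, not how GX computes its results internally. The `null_summary` helper and the 56-row demo frame are invented to match the counts above.

```python
import pandas as pd


def null_summary(df, column):
    """Count nulls in a column and report them in the same terms as the result above."""
    element_count = len(df)
    unexpected_count = int(df[column].isna().sum())
    unexpected_percent = (
        100.0 * unexpected_count / element_count if element_count else 0.0
    )
    return {
        "element_count": element_count,
        "unexpected_count": unexpected_count,
        "unexpected_percent": unexpected_percent,
        "success": unexpected_count == 0,
    }


# Made-up frame: 53 contacts with a phone number, 3 without.
demo_df = pd.DataFrame({"Phone": ["555-0100"] * 53 + [None] * 3})
summary = null_summary(demo_df, "Phone")
print(summary)
```

The percentage is simply the unexpected count divided by the element count, and the expectation only succeeds when no nulls are found.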
As the complexity of a CRM grows, layering in validations is vital for catching bugs, identifying flawed user workflows, finding rogue automations, and more. I'm excited to continue exploring specifically how GX could be integrated with Salesforce. There are obvious limitations to the in-memory context, so it would be interesting to look into other methods for using Salesforce as a data source.
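As a starting point for the alerting idea, here is a small sketch that turns failed results into readable messages. It assumes each result is a plain dict with the same keys as the printed output above, so real GX result objects would likely need converting first. The `format_alerts` helper and the passing Email result are hypothetical.

```python
def format_alerts(results):
    """Return one message per failed expectation result."""
    alerts = []
    for result in results:
        if result["success"]:
            continue  # nothing to report for passing expectations
        config = result["expectation_config"]
        details = result["result"]
        alerts.append(
            f"{config['expectation_type']} failed on column "
            f"{config['kwargs']['column']}: "
            f"{details['unexpected_count']} of {details['element_count']} "
            "values unexpected"
        )
    return alerts


# Trimmed-down dicts: the first mirrors the Phone result above,
# the second is an invented passing result for Email.
sample_results = [
    {
        "success": False,
        "result": {"element_count": 56, "unexpected_count": 3},
        "expectation_config": {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": "Phone"},
        },
    },
    {
        "success": True,
        "result": {"element_count": 56, "unexpected_count": 0},
        "expectation_config": {
            "expectation_type": "expect_column_values_to_not_be_null",
            "kwargs": {"column": "Email"},
        },
    },
]

alerts = format_alerts(sample_results)
for message in alerts:
    print(message)
```

From here the messages could be sent wherever admins already look, such as email or chat, on whatever schedule the checks run.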