ProcessTracker - Python

Below is a deeper dive into the capabilities of the ProcessTracker submodule in the Python implementation.

Initialize Process Run

As referenced in the Overview (overview.rst), ProcessTracker must first be imported. The process run is then registered by passing the variables below when creating the ProcessTracker object:

from processtracker import ProcessTracker
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local")\
                            .appName("Lahman Teams Load")\
                            .config("spark.executor.memory", "6gb")\
                            .enableHiveSupport()\
                            .getOrCreate()

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , sources='Lahman Baseball Dataset')

Those variables will be used to populate the data store backend as explained in the following table:

ProcessTracker object initialization variables
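==============  =================================================================================================  ====================================  ===================
Variable Name   Variable Description                                                                               Reference Object                      Created If Missing?
==============  =================================================================================================  ====================================  ===================
process_name    The name of the process being run. Must be unique.                                                 Process                               Yes
process_type    The type of process being run.                                                                     Process Type                          Yes
actor_name      The person or thing kicking off the process run. Recommended to be variable driven.                Actor                                 Yes
tool_name       The tool being used to kick off the process run. Recommended to be variable driven.                Tool                                  Yes
sources         Single or list of source names where data is read from. Optional.                                  Source                                Yes
source_objects  Dictionary of lists of source objects with source name as key where data is read from. Optional.   Source Object, Process Source Object  Yes
targets         Single or list of target names (alias of source) where data is written to. Optional.               Source                                Yes
target_objects  Dictionary of lists of target objects with target name as key where data is written to. Optional.  Source Object, Process Target Object  Yes
==============  =================================================================================================  ====================================  ===================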

Once the instance has been instantiated, the rest of the options listed below become available.

Change Process Run Status

During a process run, the run’s status will need to change, usually to successful completion or to failure. ProcessTracker allows user-defined process statuses, but the process run must finish with one of the system-provided statuses if it is to work correctly with the rest of the system.

System-provided statuses are listed in Process Status. For example:

process_run.change_run_status('completed')

Custom status types can be added either with the CLI tool or by entering the custom status in the change_run_status command. For instance:

process_run.change_run_status('my custom status')
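
One way to make sure a run always ends in a system-provided status is to wrap the work in a try/except. The sketch below is illustrative only: RecordingRun is a hypothetical stand-in for a ProcessTracker instance so the example runs on its own, run_with_status is a hypothetical helper, and the 'failed' status name is an assumption that should be checked against Process Status.

```python
class RecordingRun:
    """Hypothetical stand-in for a ProcessTracker instance (not the real class)."""

    def __init__(self):
        self.statuses = []

    def change_run_status(self, new_status):
        # The real method updates the data store; here we only record the call.
        self.statuses.append(new_status)


def run_with_status(process_run, work):
    """Run work() and finish the run with a final status either way."""
    try:
        work()
    except Exception:
        # 'failed' is assumed to be a system-provided status name.
        process_run.change_run_status('failed')
        raise
    else:
        process_run.change_run_status('completed')


process_run = RecordingRun()
run_with_status(process_run, lambda: None)
print(process_run.statuses)
```

With a real ProcessTracker instance, the same helper could be passed process_run directly, since it only relies on change_run_status.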

Triggering Errors

Errors are custom failure messages that can record anything worth tracking during a process run. Raising one does not necessarily cause the process run to fail:

process_run.raise_run_error(error_type_name='Data Error'
                           , error_description='Data item out of bounds.')

This raises an error stating that an item was outside the expected bounds, but it doesn’t cause the process run to fail because the optional fail_run flag defaults to False. To fail the run, set the flag to True:

process_run.raise_run_error(error_type_name='Data Error'
                           , error_description='Data item out of bounds.'
                           , fail_run=True)

Another option when raising a run error is to set an end_date, which gives tighter control over the timestamps shared between ProcessTracker and any other logging you may have. This is not required, because ideally only milliseconds separate recording the error and writing to the log file:

process_run.raise_run_error(error_type_name='Data Error'
                           , error_description='Data item out of bounds.'
                           , fail_run=True
                           , end_date=process_specific_datetime)

Auditing Processes

Auditing is a key feature of the ProcessTracker framework. Here are the available auditing options.

Setting Data Low/High Dates

It is important to know the date range of the data being processed by a run. This is where the low/high dates come into play. The low date is the lowest date available in the data being processed, and the high date is the highest. If audit dates are not provided with the data, the extract datetime can be used instead. If neither is available, this audit option can’t be used:

process_run.set_process_run_low_high_dates(low_date=extract_low_datetime
                                          , high_date=extract_high_datetime)

If a lower or higher datetime is registered, it is compared with the previously registered datetimes, and the lower of the two low dates and the higher of the two high dates are kept. While this can be set in a loop, it is recommended to find the low and high dates in the set before calling set_process_run_low_high_dates(), because each call performs an insert/update.
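
As a minimal sketch of that recommendation, the low and high dates can be computed once per batch and passed in a single call. The helper names find_low_high and merge_low_high and the sample audit_dates are hypothetical; merge_low_high only illustrates the comparison rule described above and is not the library’s implementation.

```python
from datetime import datetime


def find_low_high(datetimes):
    """Return the (lowest, highest) datetime from an iterable."""
    values = list(datetimes)
    return min(values), max(values)


def merge_low_high(current, candidate):
    """Keep the lower of the two low dates and the higher of the two high dates.

    current is None when nothing has been registered yet.
    """
    if current is None:
        return candidate
    return min(current[0], candidate[0]), max(current[1], candidate[1])


# Hypothetical audit dates pulled from the data being processed.
audit_dates = [datetime(2019, 1, 3), datetime(2019, 1, 1), datetime(2019, 2, 1)]
extract_low_datetime, extract_high_datetime = find_low_high(audit_dates)

# One call per batch instead of one call per record:
# process_run.set_process_run_low_high_dates(low_date=extract_low_datetime
#                                           , high_date=extract_high_datetime)
```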

Setting Record Count

It is important to know how many records the process and process run have processed. This can aid capacity and resource planning, especially if the information is tracked over time:

process_run.set_process_run_record_count(num_records=10000)

set_process_run_record_count does two things:
  • sets the process run record’s total record count (wiping out the previous value)
  • sets the process’ total record count (cumulative)

It is recommended that the number of records be determined, either per extract file or as a cumulative total, before setting the record count.
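
A small sketch of the cumulative approach: gather the count for each extract file first, then make one call with the total. The helper total_record_count and the sample counts are hypothetical.

```python
def total_record_count(counts_per_extract):
    """Sum per-extract record counts into one cumulative total."""
    return sum(counts_per_extract.values())


# Hypothetical per-file counts gathered while reading each extract.
counts_per_extract = {
    'my_file_2019_01_01.csv': 4000,
    'my_file_2019_02_01.csv': 6000,
}
num_records = total_record_count(counts_per_extract)

# A single call then records the total for the run:
# process_run.set_process_run_record_count(num_records=num_records)
```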

Tracking Process Sources

Processes can have sources associated for auditing purposes. There are two methods for tracking sources - source level and source object level.

Source Level

Source level tracking can be done by including a single source name or list of source names on process initialization. For example::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , sources='Lahman Baseball Dataset')

For multiple sources::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , sources=['Lahman Baseball Dataset', 'Another Baseball Dataset'])

Source Object Level

Source object level tracking is done in a similar way. Whether there is a single source object, multiple source objects, or multiple sources with one or more objects each, source object level tracking is done via a dictionary of lists:

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , source_objects={"Lahman Baseball Dataset": ["Team.csv", "Player.csv"]})

For multiple sources::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , source_objects={"Lahman Baseball Dataset": ["Team.csv", "Player.csv"]
                                               , "Another Baseball Dataset": ["Team", "Player"]})

Note that sources is not set. The sources variable will be ignored if source_objects is set.
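
If the source objects are known as a flat list of (source, object) pairs, the dictionary of lists can be built before initialization. This is only a sketch: group_objects_by_source and the sample pairs are hypothetical and are not part of ProcessTracker.

```python
from collections import defaultdict


def group_objects_by_source(pairs):
    """Group (source_name, object_name) pairs into a dictionary of lists."""
    grouped = defaultdict(list)
    for source_name, object_name in pairs:
        grouped[source_name].append(object_name)
    return dict(grouped)


pairs = [
    ('Lahman Baseball Dataset', 'Team.csv'),
    ('Lahman Baseball Dataset', 'Player.csv'),
    ('Another Baseball Dataset', 'Team'),
]
source_objects = group_objects_by_source(pairs)
```

The resulting dictionary has the shape expected by the source_objects variable; the same approach applies to target_objects.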

Tracking Process Targets

Processes can have targets associated for auditing purposes. There are two methods for tracking targets - target level and target object level. Remember target is just an alias of source. All targets and sources are stored in the Source table.

Target Level

Target level tracking can be done by including a single target name or list of target names on process initialization. For example::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , targets='My Baseball Datastore')

For multiple targets::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , targets=['My Baseball Datastore', 'A Different Baseball Datastore'])

Target Object Level

Target object level tracking is done in a similar way. Whether there is a single target object, multiple target objects, or multiple targets with one or more objects each, target object level tracking is done via a dictionary of lists:

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , target_objects={"My Baseball Datastore": ["team", "player"]})

For multiple targets::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , target_objects={"My Baseball Datastore": ["team", "player"]
                                               , "A Different Baseball Datastore": ["Team", "Player"]})

Note that targets is not set. The targets variable will be ignored if target_objects is set.

Process Extracts

The other element of processing data is the extract files that may be used within a process or passed between processes. Extract tracking is optional and can be skipped if extract files are not used. Extracts are always associated with a process run, which is why the extract functionality is primarily tied to the ProcessTracker submodule.

Finding Extracts

Extract files can be found in a few different ways. Finders return extracts in ‘ready’ state by default; other statuses can be searched for by adding the status variable. Finders only return extract files that have been registered in ProcessTracker.

By Filename

Full Filename

So let’s say that you know that there is a specific file that needs processing. You can search for a specific file by:

process_run.find_extracts_by_filename(filename='my_file.csv')

This will return the ExtractTracking object, which includes the location of the file.

Partial Filename

Let’s say that you know that the files you are looking for match a specific pattern, for example::

my_file_2019_01_01.csv
my_file_2019_02_01.csv
...

Instead of looking for each file one at a time, you can use the partial filename::

process_run.find_extracts_by_filename(filename='my_file')

This will return the ExtractTracking objects, which include the location of each file. This function is greedy, meaning it will return ANY registered file with ‘my_file’ in the filename. For instance::

my_file_2019_01_01.csv
this_is_my_file.xls
2019-01-01-my_file.csv
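
The greedy behaviour can be pictured as a plain substring test. The sketch below assumes "contains" semantics, as the list above suggests; matches_partial_filename and the registered list are hypothetical and do not reflect how the finder queries the data store.

```python
def matches_partial_filename(filenames, partial):
    """Return every filename that contains the partial name anywhere."""
    return [name for name in filenames if partial in name]


# Hypothetical set of registered extract filenames.
registered = [
    'my_file_2019_01_01.csv',
    'this_is_my_file.xls',
    '2019-01-01-my_file.csv',
    'other_file.csv',
]
matched = matches_partial_filename(registered, 'my_file')
print(matched)
```

A more specific partial name narrows the result, so it is worth choosing the longest fragment that all wanted files share.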

By Location

Locations are the filepaths where extract files are stored. These can be local, a network drive, or a cloud directory:

process_run.find_extracts_by_location(location='My Location')

The location name is used and the ExtractTracking object(s) are returned.

By Process

If a parent process, or any other process, produces files that this process will use, that process’ name can be used to find any ready extracts::

process_run.find_extracts_by_process(extract_process_name='My Super Cool Process')

This will find all extract files associated with that process that are in ‘ready’ state and return their ExtractTracking objects.

Finding Extracts By Other Statuses

All finder methods have a status variable with a default of ‘ready’. To search by another status type, just modify the variable::

process_run.find_extracts_by_location(location='My Location', status='completed')

The status type must exist in Extract Status.

Registering Extracts

If your process creates extract files, they will need to be registered. They can be registered one at a time, as noted in ExtractTracker, or with one of the helper methods below.

By Location

This will attempt to access the given location and find all files stored there. Files that are not already registered will be registered; files that are already registered will be ignored:

process_run.register_extracts_by_location(location_path='/path/to/files')

Currently, this only supports local filepaths.
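
Conceptually, the helper scans a local directory and skips anything already registered. The following is a minimal sketch of that idea using only the standard library; find_unregistered_files and registered_names are hypothetical names, and this is not the library’s actual implementation.

```python
from pathlib import Path
import tempfile


def find_unregistered_files(location_path, registered_names):
    """List files directly under location_path whose names are not yet registered."""
    return sorted(
        entry.name
        for entry in Path(location_path).iterdir()
        if entry.is_file() and entry.name not in registered_names
    )


# Build a throwaway local directory so the example is self-contained.
with tempfile.TemporaryDirectory() as location_path:
    for name in ('a.csv', 'b.csv'):
        (Path(location_path) / name).write_text('id\n1\n')
    # 'a.csv' is treated as already registered, so only 'b.csv' is new.
    new_files = find_unregistered_files(location_path, registered_names={'a.csv'})
```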

By Process

This method is explained over in ExtractTracker.

Bulk Extract Update

Extracts can also be updated in bulk. Each of the finder methods returns a list of extract file objects. Passing that list to the bulk_change_extract_status method will associate those extracts with the process and update their status in bulk:

process_run.bulk_change_extract_status(extracts=extract_list, extract_status="loading")

Please note that if any extracts in the list depend on each other and a parent dependency has not been loaded, the process will currently fail in order to protect data continuity.