ProcessTracker - Python

Below is a deeper dive of the capabilities of the Python implementation ProcessTracker submodule.

Initialize Process Run

As referenced in the ::doc::Overview <overview.rst>, ProcessTracker must first be imported and then the process can be registered by setting the variables of the ProcessTracker object:

from processtracker import ProcessTracker
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local")\
                            .appName("Lahman Teams Load")\
                            .config("spark.executor.memory", "6gb")\
                            .enableHiveSupport()\
                            .getOrCreate()

process_run = ProcessTracker(process_name='Lahman Teams Load'
                                     , process_type='Stage Load'
                                     , actor_name='New ProcessTracker User'
                                     , tool_name='Spark'
                                     , sources='Lahman Baseball Dataset')

Those variables will be used to populate the data store backend as explained in the following table:

ProcessTracker object initialization variables
Variable Name Variable Description Reference Object Object Created If Not Exist?
process_name The name of the process being run. Must be unique. Process Yes
process_run_name An optional unique name given to an individual process run Process Tracking Yes
process_type The type of process being run. Optional if process already exists. Process Type Yes
actor_name The person or thing kicking off the process run. Recommended to be variable driven. :ref`actor_lkup` Yes
tool_name The tool being used to kick off the process run. Recommended to be variable driven. Tool Yes
sources Single or list of source names where data is read from. Optional. Source Yes
source_objects Dictionary of lists of source objects with source name as key where data is read from. Optional. Source Object, Process Source Object Yes
source_object_attributes Dictionary of lists of source object attributes with source and source object names as keys where data is read from. Optional. source_object_attribute Yes
targets Single or list of target names (alias of source) where data is written to. Optional. Source Yes
target_objects Dictionary of lists of target source_objects with target name as key where data is read from. Optional. Source Object, Process Target Object Yes
target_object_attributes Dictionary of lists of target object attributes with target and target object names as keys where data is read from. Optional. source_object_attribute Yes
config_location Location where Process Tracker configuration file can be found. If not set will use the system home directory and check under .process_tracker for the process_tracker_config.ini file. Optional. Configuration N/A
dataset_types Single or list of dataset category types. Will be associated with the process as well as any extracts, sources/targets, and source/target objects that are associated with the process. Dataset Type, Extract Dataset Type, Process Dataset Type, Source Dataset Type , Source Object Dataset Type No
schedule_frequency The general frequency at which the process should run. ScheduleFrequency No
process_tracking_id When recreating a Process Tracking instance, provide the process_tracking_id and it will re-instantiate as it was originally created, with the current status, objects, etc. Process Tracking, plus all other objects created on instantiation of ProcessTracker No

Once the instance has been instantiated, the rest of the options listed below become available.

Re-initializing An Instance

To set up a previous instance, pass the process tracking id to ProcessTracker. Instead of creating a new instance of the given process, it will retrieve the specific tracking record and all of it’s ancillary data.:

restored_process_run = ProcessTracker(process_tracking_id=123)

This should ideally only be used when a process is still running - like when switching between services within your cloud provider.

Change Process Run Status

Throughout the process run the process run’s status will need to be changed, usually to successful completion or to failure. ProcessTracker does allow for user defined process statuses, but the process run must finish with one of the system provided statuses if the process run is to work correctly with the rest of the system.

System provided statuses can be found at Process Status.:

process_run.change_run_status('completed')

Custom status types can be added either with the CLI tool or by entering the custom status in the change_run_status command. For instance:

process_run.change_run_status('my custom status')

On Hold Processes

Processes that are in the ‘on hold’ status are in that status because the max_concurrent_failures setting was reached. Currently, the only way to move a process out of ‘on hold’ is to manually change the last process run to ‘completed’ or some other status than ‘running’ or ‘failed’.

Triggering Errors

Errors are custom failure messages that can be pretty much anything one would want to track during a process run. They do not necessarily trigger a process run to fail.:

process_run.raise_run_error(error_type_name='Data Error'
                           , error_description='Data item out of bounds.')

This raises an error stating an item was out of bounds for what we normally look for, but doesn’t trigger the process run to fail because the hidden flag fail_run is defaulted to false. To fail a run set the flag to True.:

process_run.raise_run_error(error_type_name='Data Error'
                           , error_description='Data item out of bounds.'
                           , fail_run=True)

Another option for raising a run error is to set an end_date - this is if you want tighter control of the timestamps between ProcessTracker and any other logging you may have. This is not required because we are ideally talking about milliseconds between recording this error and writing to the log file.:

process_run.raise_run_error(error_type_name='Data Error'
                           , error_description='Data item out of bounds.'
                           , fail_run=True
                           , end_date=process_specific_datettime)

Auditing Processes

Auditing is a key feature of the ProcessTracker framework. Here are the available auditing options.

Setting Data Low/High Dates

It is important to know the data range of the data that is being processed by a run. This is where the low/high dates comes to play. The low date is the lowest date available from the data being processed. The high date is the highest date avilable from the data being processed. If audit dates are not provided with the data then the extract datetime can be utilized. If neither are available, then this audit option can’t really be used.:

process_run.set_process_run_low_high_dates(low_date=extract_low_datetime
                                          , high_date=extract_high_datetime)

If a lower or higher datetime is registered, the previous datetimes will be compared and whichever is lower of the two low dates and higher of the two high dates will be kept. While this can be set via loop, it is recommended to find the low and high dates in the set before calling set_process_run_low_high_dates() as it does make a insert/update per call.

Setting Record Count

It is important to know how many records the process and process run have processed. This can aid capacity and resource planning, especially if the information is tracked over time.:

process_run.set_process_run_record_count(num_records=10000)
set_process_run_record_count does two things:
  • sets the process run record’s total record count (wiping out the previous
value)
  • sets the process’ total record count (cumulative)

It is recommended that the number of records be determined on a per extract file or a cumulative total before setting the record count.

Dataset Types

Dataset types are high level categories for the type(s) of data the process and ancilliary objects like extracts, sources/targets, source objects/target objects, etc. can be associated to. Once the variable dataset_types is set there is nothing else required to associate the process and other entities associated to the process to the dataset type(s).

Tracking Process Sources

Processes can have sources associated for auditing purposes. There are two methods for tracking sources - source level and source object level.

Source Level

Source level tracking can be done by including a single source name or list of source names on process initialization. For example::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , sources='Lahman Baseball Dataset')

For multiple sources::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , sources={'Lahman Baseball Dataset', 'Another Baseball Dataset'})

Source Object Level

Source Object level tracking is done in a similar way as above. Regardless of being a single source object, multiple source objects, or multiple sources with single or multiple objects, source object level tracking is done via a dictionary of lists.:

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , source_objects={"Lahman Baseball Dataset": ["Team.csv", "Player.csv"]}

For multiple sources::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , source_objects={"Lahman Baseball Dataset": ["Team.csv", "Player.csv"]
                                               , "Another Baseball Dataset": ["Team", "Player"]}

Note that sources is not set. The sources variable will be ignored if source_objects is set.

Tracking Process Targets

Processes can have targets associated for auditing purposes. There are two methods for tracking targets - target level and target object level. Remember target is just an alias of source. All targets and sources are stored in the Source table.

Target Level

Target level tracking can be done by including a single target name or list of target names on process initialization. For example::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , targets='My Baseball Datastore')

For multiple targets::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , targets={'My Baseball Datastore', 'A Different Baseball Datastore'})

Target Object Level

Target Object level tracking is done in a similar way as above. Regardless of being a single target object, multiple target objects, or multiple targets with single or multiple targets, target object level tracking is done via a dictionary of lists.:

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , target_objects={"My Baseball Datastore": ["team", "player"]}

For multiple targets::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                             , process_type='Stage Load'
                             , actor_name='New ProcessTracker User'
                             , tool_name='Spark'
                             , target_objects={"My Baseball Datastore": ["team", "player"]
                                               , "A Different Baseball Datastore": ["Team", "Player"]}

Note that targets is not set. The targets variable will be ignored if target_objects is set.

Process Extracts

The other element to processing data is the extract files that may be used in the process or between processes. Note that using this is not required if extract files are not used. Extracts are always associated with a process run, which is why the extract functionality is primarily tied to the ProcessTracker submodule.

Finding Extracts

Extract files can be found in a few different ways. Finders will return extracts in ‘ready’ state by default. Other statuses can be searched for if required by adding the status variable. The finders also will only return extract files that have been registered in ProcessTracker.

By Filename

Full Filename

So let’s say that you know that there is a specific file that needs processing. You can search for a specific file by:

process_run.find_extracts_by_filename(filename='my_file.csv')

This will return the ExtractTracking object, which includes the location of the file.

Partial Filename

Let’s say that you know that the files you are looking for match a specific pattern, for example::

my_file_2019_01_01.csv
my_file_2019_02_01.csv
...

Instead of looking for each file one at a time, you can use the partial filename::

process_run.find_extracts_by_filename(filename='my_file_')

This will return the ExtractTracking object, which included the location of the file. This function is greedy meaning it will return ANY files with ‘my_file’ in the filename. For instance::

my_file_2019_01_01.csv
this_is_my_file.xls
2019-01-01-my_file.csv

By Location

Locations are the filepaths where extract files are stored. These can be local, a network drive, or a cloud directory.:

process_run.find_extracts_by_location(location='My Location')

The location name is used and the ExtractTracking object(s) are returned.

By Process

If the process has a parent process that creates files for it, or there is a process that produces files that will be used then the parent process’ name can be used to find any ready extracts::

process_run.find_extracts_by_process(extract_process_name='My Super Cool Process')

This will find all extract files associated to that process that are in ‘ready’ state and return their ExtractTracking objects.

Finding Extracts By Other Statuses

All finder methods have a status variable with a default of ‘ready’. To search by another status type, just modify the variable::

process_run.find_extracts_by_location(location='My Location', status='completed')

The status type must exist in Extract Status.

Registering Extracts

If your process is creating extract files, they will need to be registered. They can either be registered one at a time as noted in ExtractTracker or one of the below helper methods.

By Location

This will attempt to access the given location and find all files stored there. If the files are not already registered they will be processed, otherwise they will be ignored.:

process_run.register_extracts_by_location(location_path='/path/to/files')

Currently, this only supports local filepaths.

By Process

This method is explained over in ExtractTracker.

Bulk Extract Update

Extracts can also be processed in bulk. If you use one of the lookup functions, it returns a list of extract file objects. Passing that list to the bulk_change_extract_status method will associate those extracts with the process and bulk update their status.:

process_run.bulk_change_extract_status(extracts=extract_list, extract_status="loading")

Please note, that while going through the list if any of the extracts are interdependent of each other and the parent dependency has not been loaded, the process will currently fail to protect data continuity.

Process Helpers

There are several helpers for ProcessTracker objects to assist in working with other components associated with the ProcessTracker instance. Those objects are:

  • actor
  • process_type
  • process
  • sources
  • targets
  • tool

To use attributes of the objects, call the attribute like so::

process_run = ProcessTracker(process_name='Lahman Teams Load'
                                     , process_type='Stage Load'
                                     , actor_name='New ProcessTracker User'
                                     , tool_name='Spark'
                                     , sources='Lahman Baseball Dataset')

process_run.actor.actor_name # To get the actor_name attribute of actor object associated with process_run.

Finding Process Contacts

To find a process’ contacts as well as that process’ source contacts, there is a lookup function that will return the name and email of the contact as well as if the contact is for the process itself or that process’ source.:

process_run.find_process_contacts(process_id)

Finding Process Source Attributes

Processes usually have sources that they work with to pull data out into either extracts or for further processing. To find the source attributes that a process will use there is a helper function.:

process_run.find_process_source_attributes(process_id)

This will return a list with the following:

process_source_attributes returned
Attribute Name Attribute Description
source_name The name of the source system
source_type The type of the source system
source_object_name The object (i.e. table or dataset) name from the source system
source_object_attribute_name The attribute (i.e. column) name from the source object
is_key Is the attribute part of the source object’s key?
is_filter Is the attribute used for record version comparison?
is_partition Is the attribute used for the partitioning of the data set (i.e. if the filetype is Parquet is the attribute used for the file partition)?

Please note, if the attributes are not registered either during the initialization of the process or thru direct interaction of the data store, no data will be returned.

Finding Process Target Attributes

Processes usually have targets that they work with to push data into or for further processing. To find the target attributes that a process will use there is a helper function.:

process_run.find_process_target_attributes(process_id)

This will return a list with the following:

process_target_attributes returned
Attribute Name Attribute Description
target_name The name of the target source system
target_type The type of the target source system
target_object_name The object (i.e. table or dataset) name from the target source system
target_object_attribute_name The attribute (i.e. column) name from the target source object
is_key Is the attribute part of the source object’s key?
is_filter Is the attribute used for record version comparison?
is_partition Is the attribute used for the partitioning of the data set (i.e. if the filetype is Parquet is the attribute used for the file partition)?

Please note, if the attributes are not registered either during hte initialization of the process or thru direct interaction of the data store, no data will be returned.

Finding Processes By Schedule Frequency

Processes can have an optional schedule frequency. To find all processes of a given frequency there is a helper function.:

process_run.find_process_by_schedule_frequency(frequency="hourly")

This will return a list of process objects with the given frequency.

Finding Process Filters

Processes will be querying data from given sources. The source data will likely be filtered on specific criteria. To retrieve the process’ filters, use the find_process_filters method.:

process_run.find_process_filters(process=process_id)

This will return a list of attributes and their requisite filters.

filter attributes provided
Attribute name Attribute description
source_name The name of the source system
source_object_name The name of the object (i.e. table or data set) within/from the source system
source_object_attribute_name The attribute (i.e. column) name from the object in within/from the source system
filter_type_code Based on the defaults in Filter Type, the type of filter being applied to the source object attribute
filter_value_numeric The value being filtered by, provided the attribute is numeric.
filter_value_string The value being filtered by, provided the attribute is string.