A Slowly Changing Dimension (SCD) is a dimension that stores and manages both current and historical data over time in a data warehouse. It is considered, and implemented as, one of the most critical ETL tasks in tracking the history of dimension records.
SCD Type 2 (SCD2) is a dimension that stores and manages current and historical data over time in a data warehouse. The purpose of an SCD2 is to preserve the history of changes. If a customer changes their address, for example, or any other attribute, an SCD2 allows analysts to link facts back to the customer and their attributes in the state they were in at the time of the fact event.
The following diagram illustrates a star schema with a `Sale` fact table and a `Customer` dimension table, which is managed as an SCD2 table.
Let's take a deeper look at the `Customer` dimension table schema. You can categorize the columns into the following groups (a sample of the resulting table is sketched after the list):
- Key: `customer_dim_key`, also called a surrogate key, has a unique value that is generated automatically. It is used as a foreign key for the `Sale` fact table.
- Attributes: `customer_id`, `first_name`, `last_name`, `city`, and `country` have business value used in business intelligence (BI) reports.
- SCD2 metadata: `eff_start_date`, `eff_end_date`, and `is_current` are designed to manage the state of the record. `eff_start_date` and `eff_end_date` contain the time period during which the record is effective.
- Metadata: `timestamp` is the actual time when the customer record was generated.
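As a minimal sketch of what this schema looks like in practice (the customer values, dates, and the `9999-12-31` open-end convention are illustrative assumptions, not requirements), here is the `Customer` dimension after one customer has moved from Berlin to Munich:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Customer 1042 changed city, so the old version was closed out
# (eff_end_date set, is_current = false) and a new current version opened.
customer_dim = spark.createDataFrame(
    [
        (1, 1042, "Anna", "Schmidt", "Berlin", "Germany",
         "2021-01-01 00:00:00", "2023-06-30 23:59:59", False),
        (2, 1042, "Anna", "Schmidt", "Munich", "Germany",
         "2023-07-01 00:00:00", "9999-12-31 23:59:59", True),
    ],
    "customer_dim_key INT, customer_id INT, first_name STRING, last_name STRING, "
    "city STRING, country STRING, eff_start_date STRING, eff_end_date STRING, "
    "is_current BOOLEAN",
)
customer_dim.show(truncate=False)
```

Effective dates are kept as strings here for brevity; a production table would store them as timestamps.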
Let's look into the code to see how you can build your own custom SCD2 class and apply changes in a quick and efficient manner. Let us call this class `ScdTwo`.
To find the changes, we will use the `__get_dataframe_hash()` method. This method returns a combined hash of the key columns and the data columns, and it is used to detect changes between the current data and the new incoming data. A minimal standalone sketch of the idea (with illustrative column names) follows:
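```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, sha2

spark = SparkSession.builder.getOrCreate()

keys, non_keys = ["customer_id"], ["city", "country"]

old_row = spark.createDataFrame([(1042, "Berlin", "Germany")],
                                "customer_id INT, city STRING, country STRING")
new_row = spark.createDataFrame([(1042, "Munich", "Germany")],
                                "customer_id INT, city STRING, country STRING")

def with_hashes(df):
    # Casting to string keeps the sketch safe for non-string key columns.
    return (df
            .withColumn("key_hash",
                        sha2(concat_ws("", *[col(c).cast("string") for c in keys]), 256))
            .withColumn("data_hash",
                        sha2(concat_ws("", *[col(c).cast("string") for c in non_keys]), 256)))

# Same key_hash but different data_hash: the record changed and
# needs a new SCD2 version.
with_hashes(old_row).select("key_hash", "data_hash").show(truncate=False)
with_hashes(new_row).select("key_hash", "data_hash").show(truncate=False)
```

Note that an empty separator means value pairs such as `("ab", "c")` and `("a", "bc")` hash identically; a visible delimiter such as `'||'` avoids that collision if your data could trigger it.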
We have another method called `_get_changes()`, which finds any new records, updated records, unchanged records, and deleted records. New records will have `__START_AT` set to the current datetime; updated records will have the `__END_AT` time of their previous version set to the current datetime. Any unchanged records will remain unchanged, with no start or end dates updated. Any records that were deleted by the source will be marked as expired, again with the end date set to the current datetime.
Below is the detailed code for your reference:
```python
from pyspark.sql.functions import col, concat_ws, expr, sha2


class ScdTwo():
    def __init__(self, current_data, incoming_changes, keys, non_keys,
                 update_timestamp, __END_AT_col="__END_AT",
                 __START_AT_col="__START_AT") -> None:
        self.current_data = current_data
        self.incoming_changes = incoming_changes
        self.keys = keys
        self.non_keys = non_keys
        self.__END_AT_col = __END_AT_col
        self.__START_AT_col = __START_AT_col
        self.hashLength = 256
        self.update_timestamp = update_timestamp

    def __get_dataframe_hash(self, df):
        # Hash the non-key columns and the key columns separately, then
        # combine the two hashes for whole-record comparison.
        hashed_df = (df
                     .withColumn("data_hash",
                                 sha2(concat_ws('', *[col(c) for c in self.non_keys]), self.hashLength))
                     .withColumn("key_hash",
                                 sha2(concat_ws('', *[col(c) for c in self.keys]), self.hashLength)))
        return hashed_df.withColumn("combined_hash",
                                    concat_ws('', hashed_df.key_hash, hashed_df.data_hash))

    def _get_changes(self):
        current_hashed = self.__get_dataframe_hash(self.current_data)
        incoming_hashed = self.__get_dataframe_hash(self.incoming_changes)
        current_key_hash = current_hashed.select("key_hash")
        existing_combined_hash = current_hashed.select("combined_hash")
        new_key_hash = incoming_hashed.select("key_hash")
        new_combined_hash = incoming_hashed.select("combined_hash")
        # Unchanged records: the combined hash appears on both sides
        # (a double subtract yields the intersection).
        matched_combined_hash = new_combined_hash.subtract(
            new_combined_hash.subtract(existing_combined_hash))
        matched_data = current_hashed.join(matched_combined_hash, "combined_hash", "inner")
        matched_key_hash = matched_data.select("key_hash")
        # Brand-new records: keys that appear only in the incoming data;
        # open them as of the update timestamp.
        brand_new_key_hash = new_key_hash.subtract(current_key_hash)
        brand_incoming_changes = (incoming_hashed
                                  .join(brand_new_key_hash, "key_hash", "inner")
                                  .withColumn(self.__START_AT_col,
                                              expr(f"cast('{self.update_timestamp}' as timestamp)")))
        # Deleted records: keys missing from the incoming data; expire them
        # one millisecond before the update timestamp.
        deleted_key_hash = current_key_hash.subtract(new_key_hash)
        records_marked_for_deletion = current_hashed.join(deleted_key_hash, "key_hash", "inner")
        deleted_data = records_marked_for_deletion.withColumn(
            self.__END_AT_col,
            expr(f"cast('{self.update_timestamp}' as timestamp) - interval 1 milliseconds"))
        # Updated records: keys present on both sides whose data hash changed;
        # the incoming version opens at the update timestamp.
        updated_key_hash = (current_key_hash
                            .join(new_key_hash, "key_hash", "inner")
                            .subtract(matched_key_hash))
        updated_data = (incoming_hashed
                        .join(updated_key_hash, "key_hash", "inner")
                        .withColumn(self.__START_AT_col,
                                    expr(f"cast('{self.update_timestamp}' as timestamp)")))
        # The previous version of each updated record is expired just
        # before the update timestamp.
        expired_data_prep = current_hashed.join(updated_key_hash, "key_hash", "inner")
        expired_data = expired_data_prep.withColumn(
            self.__END_AT_col,
            expr(f"cast('{self.update_timestamp}' as timestamp) - interval 1 milliseconds"))
        hash_cols = ("data_hash", "key_hash", "combined_hash")
        return (matched_data.drop(*hash_cols),
                brand_incoming_changes.drop(*hash_cols),
                deleted_data.drop(*hash_cols),
                updated_data.drop(*hash_cols),
                expired_data.drop(*hash_cols))
```
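Here is a hypothetical usage sketch; `current_df`, `incoming_df`, and the final union are assumptions for illustration rather than part of the class itself:

```python
# current_df: the existing dimension snapshot (already carrying the
# __START_AT / __END_AT columns); incoming_df: the latest source extract.
scd = ScdTwo(
    current_df,
    incoming_df,
    keys=["customer_id"],
    non_keys=["first_name", "last_name", "city", "country"],
    update_timestamp="2024-01-15 00:00:00",
)

unchanged, brand_new, deleted, updated, expired = scd._get_changes()

# One way to assemble the next snapshot: untouched rows, the expired and
# deleted versions, plus the freshly opened rows. allowMissingColumns
# (Spark 3.1+) fills __END_AT with null for rows that are still open.
next_snapshot = (unchanged
                 .unionByName(expired, allowMissingColumns=True)
                 .unionByName(deleted, allowMissingColumns=True)
                 .unionByName(brand_new, allowMissingColumns=True)
                 .unionByName(updated, allowMissingColumns=True))
```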
Conclusion
Implementing Slowly Changing Dimension Type 2 (SCD2) from scratch offers flexibility, customization, and a deep understanding of data management processes. While building your own SCD2 solution requires careful design and coding, it allows for logic tailored to specific business needs, including custom update strategies, historical tracking, and performance optimization.
Some key takeaways include:
- Data integrity: Accurate historical tracking ensures reliable data analysis.
- Customization: Business-specific rules can be integrated seamlessly.
- Scalability: Well-designed SCD2 implementations scale with growing datasets.
However, be mindful of potential challenges such as data skew, performance bottlenecks, and maintenance overhead. Combining best practices like partitioning, indexing, and incremental processing can help overcome these hurdles.