etl.pipeline
ETL Interface: the interface that users interact with directly.
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- class etl.pipeline.ETLPipeline
ETL Pipeline.
This class represents an ETL (Extract, Transform, Load) pipeline. It provides methods for managing and executing ETL processes.
- registry
The registry of ETL processes.
- Type:
Examples
>>> etl_pipeline = ETLPipeline()
>>> etl_pipeline.status()
>>> etl_pipeline.search('data_ingestion', 'ufl')
>>> spark, data = etl_pipeline.sample()
>>> config = Config.default()
>>> etl_pipeline.run(config=config)
- status(self)
Get the status of the registry.
- Returns:
The status of the registry.
- Return type:
str
- Raises:
None –
Examples
>>> etl_pipeline = ETLPipeline()
>>> etl_pipeline.status()
'If you need details of ETL Registry use `etl_pipeline.search()`'
Note
This method does not show detailed information. It only shows information about categories.
- search(self, category=None, sub_category=None)
Get detailed status of the registry by searching.
Use this method to look up the category, sub_category, and etl_name of registered ETL processes.
- Parameters:
category (str, optional) – The category to filter the search results. Defaults to None.
sub_category (str, optional) – The sub-category to filter the search results. Defaults to None.
- Returns:
A list of search results matching the specified category and sub-category.
- Return type:
list
Examples
Return every ETL
>>> etl_pipeline.search()
Only selected category
>>> etl_pipeline.search('data_ingestion')
>>> etl_pipeline.search(category='data_ingestion')
Only selected category & sub_category
>>> etl_pipeline.search('data_ingestion', 'ufl')
>>> etl_pipeline.search(category='data_ingestion', sub_category='ufl')
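The filtering that `search()` describes can be sketched with a toy registry. This is only an illustration, not the library's implementation. It assumes that registry keys have the form `category___sub_category___etl_name`, which is inferred from the default `sample_etl` name `data_ingestion___test___generate_fake_ufl`. The helpers `split_key` and `search_keys`, and every key except that default, are hypothetical.

```python
from typing import List, Optional, Tuple

# Assumption: the triple-underscore separator is inferred from the default
# sample_etl name 'data_ingestion___test___generate_fake_ufl'.
SEPARATOR = "___"

# Toy registry keys. Only the first one appears in this documentation;
# the other two are hypothetical and exist purely for illustration.
REGISTRY_KEYS = [
    "data_ingestion___test___generate_fake_ufl",
    "data_ingestion___ufl___hypothetical_loader",
    "cleaning___hypothetical___strip_whitespace",
]


def split_key(key: str) -> Tuple[str, str, str]:
    """Split a key into (category, sub_category, etl_name)."""
    category, sub_category, etl_name = key.split(SEPARATOR, 2)
    return category, sub_category, etl_name


def search_keys(
    keys: List[str],
    category: Optional[str] = None,
    sub_category: Optional[str] = None,
) -> List[str]:
    """Return the keys matching the given filters; None means no filter."""
    results = []
    for key in keys:
        key_category, key_sub_category, _ = split_key(key)
        if category is not None and key_category != category:
            continue
        if sub_category is not None and key_sub_category != sub_category:
            continue
        results.append(key)
    return results


if __name__ == "__main__":
    print(search_keys(REGISTRY_KEYS))                            # every key
    print(search_keys(REGISTRY_KEYS, "data_ingestion"))          # one category
    print(search_keys(REGISTRY_KEYS, "data_ingestion", "test"))  # category and sub_category
```

As in the examples above, leaving both filters at None returns everything, and each filter that is set narrows the result further.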
- get(self, key)
Get the ETL class registered under key.
- setup_spark_conf(self, config, verbose=False)
Set up the Spark configuration. By design, the AWS credential setting log is not affected by verbose.
- sample(self, n=100, config=None, sample_etl='data_ingestion___test___generate_fake_ufl', verbose=False)
Get the spark session and sample data.
Use this function to test the ETL pipeline quickly without config.
- Parameters:
n (int) – The number of sample records to generate. Default is 100.
config (Union[str, dict, OmegaConf]) – Config for the ETL. Default is None.
sample_etl (str) – The name of the sample ETL process. Default is “data_ingestion___test___generate_fake_ufl”.
verbose (bool) – If True, print the status. Default is False.
- Returns:
The Spark session and the sampled data.
- Return type:
Tuple[SparkSession, DataFrame]
- run(self, config: str | dict | DictConfig | OmegaConf | Path, verbose=False, cache=False, emr=False, *args, **kwargs)
Runs the ETL process.
- Parameters:
config (Union[str, dict, OmegaConf]) – Config for the ETL: a str (path to the config file), a dict (config dict), or an OmegaConf (config object).
verbose (bool) – If True, print the status of the ETL pipeline. The setting is also applied to the ETL process, but a verbose setting on the ETL process itself takes precedence.
cache (bool) – If True, cache every stage of the ETL process.
emr (bool) – If True, run the ETL process on EMR.
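The precedence rule for `verbose` can be illustrated with a small sketch. It assumes that each ETL process carries its own argument dict, which may or may not contain a `verbose` entry. Both the `resolve_verbose` helper and that dict layout are hypothetical and not taken from the library.

```python
from typing import Any, Dict


def resolve_verbose(pipeline_verbose: bool, etl_args: Dict[str, Any]) -> bool:
    """Return the verbose flag a single ETL process would use.

    Hypothetical helper: the ETL process's own 'verbose' entry wins when
    present; otherwise the pipeline-level flag passed to run() applies.
    """
    return bool(etl_args.get("verbose", pipeline_verbose))


if __name__ == "__main__":
    # No per-process setting: the pipeline-level flag is used.
    print(resolve_verbose(True, {}))
    # Per-process setting present: it overrides the pipeline-level flag.
    print(resolve_verbose(True, {"verbose": False}))
```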
- run_emr(self, config: str | dict | DictConfig | OmegaConf | Path, verbose=False, cache=False, *args, **kwargs)
Runs the ETL process on an EMR cluster.
- Parameters:
config (Union[str, dict, OmegaConf]) – Config for the ETL: a str (path to the config file), a dict (config dict), or an OmegaConf (config object).
verbose (bool) – If True, print the status of the ETL pipeline. The setting is also applied to the ETL process, but a verbose setting on the ETL process itself takes precedence.
cache (bool) – If True, cache every stage of the ETL process.
- Returns:
None in place of the Spark session, and the config. Originally the data was returned, but that is not necessary for EMR.
- Return type:
None, Config
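Both `run()` and `run_emr()` accept `config` in several forms. The sketch below shows one way such an argument could be classified before loading, using only the standard library. The `describe_config` helper and the file name `config.yaml` are hypothetical. The library's actual handling of OmegaConf objects is not shown here, so anything that is neither a path nor a dict is simply treated as an already-built config object.

```python
from pathlib import Path
from typing import Any


def describe_config(config: Any) -> str:
    """Classify a config argument by the forms listed in the parameters."""
    if isinstance(config, (str, Path)):
        # A str or Path is interpreted as the path to a config file.
        return "path"
    if isinstance(config, dict):
        # A plain dict is used as the config content itself.
        return "dict"
    # Assumption: anything else is an already-built config object.
    return "object"


if __name__ == "__main__":
    print(describe_config("config.yaml"))
    print(describe_config(Path("config.yaml")))
    print(describe_config({"etl": []}))
```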