etl.pipeline

ETL interface: the interface that users interact with.

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

class etl.pipeline.ETLPipeline

ETL Pipeline.

This class represents an ETL (Extract, Transform, Load) pipeline. It provides methods for managing and executing ETL processes.

registry

The registry of ETL processes.

Type:

ETLRegistry

Examples

>>> etl_pipeline = ETLPipeline()
>>> etl_pipeline.status()
>>> etl_pipeline.search('data_ingestion', 'ufl')
>>> spark, data = etl_pipeline.sample()
>>> config = Config.default()
>>> etl_pipeline.run(config=config)

status(self)

Get the status of the registry.

Returns:

The status of the registry.

Return type:

str

Raises:

None

Examples

>>> etl_pipeline = ETLPipeline()
>>> etl_pipeline.status()
'If you need details of ETL Registry use `etl_pipeline.search()`'

Note

This method does not show detailed information. It only shows information about categories.

search(self, category=None, sub_category=None)

Get detailed status of the registry by searching.

Use it to find the category, sub-category, and ETL name of each registered ETL process.

Parameters:
  • category (str, optional) – The category to filter the search results. Defaults to None.

  • sub_category (str, optional) – The sub-category to filter the search results. Defaults to None.

Returns:

A list of search results matching the specified category and sub-category.

Return type:

list

Examples

Return every ETL process

>>> etl_pipeline.search()

Only the selected category

>>> etl_pipeline.search('data_ingestion')
>>> etl_pipeline.search(category='data_ingestion')

Only the selected category and sub-category

>>> etl_pipeline.search('data_ingestion', 'ufl')
>>> etl_pipeline.search(category='data_ingestion', sub_category='ufl')
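The filtering that `search()` performs can be pictured as matching each registry key against the optional category and sub-category. This is a minimal sketch, assuming keys follow the `category___sub_category___etl_name` pattern seen in the default `sample_etl` value; `search_registry` and the example keys are hypothetical, not the library's implementation.

```python
from typing import List, Optional, Tuple


def search_registry(
    etl_names: List[str],
    category: Optional[str] = None,
    sub_category: Optional[str] = None,
) -> List[Tuple[str, str, str]]:
    """Filter registry keys by category and sub-category (hypothetical sketch).

    Assumes keys follow 'category___sub_category___etl_name'. A filter left
    as None matches everything, mirroring the documented defaults.
    """
    results = []
    for name in etl_names:
        cat, sub, etl = name.split("___", 2)
        if category is not None and cat != category:
            continue
        if sub_category is not None and sub != sub_category:
            continue
        results.append((cat, sub, etl))
    return results


# Hypothetical registry keys, for illustration only
names = [
    "data_ingestion___test___generate_fake_ufl",
    "data_ingestion___ufl___load_jsonl",
    "deduplication___exact___drop_duplicates",
]
print(search_registry(names, "data_ingestion", "ufl"))
# [('data_ingestion', 'ufl', 'load_jsonl')]
```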

get(self, key)

Get the ETL class from the registry.
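`get()` returns the ETL class itself rather than an instance. The toy registry below is a minimal sketch of that lookup; `ToyRegistry`, `register()`, `GenerateFakeUFL`, and the `KeyError` raised for unknown keys are all assumptions, and the real `ETLRegistry` may behave differently.

```python
class ToyRegistry:
    """Hypothetical stand-in for ETLRegistry: maps registry keys to ETL classes."""

    def __init__(self) -> None:
        self._etls = {}

    def register(self, key: str, etl_cls: type) -> type:
        """Store a class under a key and return it unchanged."""
        self._etls[key] = etl_cls
        return etl_cls

    def get(self, key: str) -> type:
        """Return the registered class itself (not an instance)."""
        if key not in self._etls:
            raise KeyError(f"no ETL registered under {key!r}")
        return self._etls[key]


class GenerateFakeUFL:  # hypothetical ETL class
    pass


registry = ToyRegistry()
registry.register("data_ingestion___test___generate_fake_ufl", GenerateFakeUFL)
etl_cls = registry.get("data_ingestion___test___generate_fake_ufl")
print(etl_cls.__name__)  # GenerateFakeUFL
```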

setup_spark_conf(self, config, verbose=False)

Set up the Spark configuration from `config`.

The AWS credential setup log is not affected by `verbose`; this is by design.
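The design note means that some messages are gated by `verbose` while the AWS credential message is always logged. A minimal sketch of that split, assuming hypothetical `spark` and `aws` config keys and a hypothetical `setup_spark_conf_sketch` function that returns its log lines instead of printing them:

```python
from typing import Any, Dict, List, Tuple


def setup_spark_conf_sketch(
    config: Dict[str, Any], verbose: bool = False
) -> Tuple[Dict[str, str], List[str]]:
    """Hypothetical sketch of verbose-gated logging during Spark setup.

    The 'spark' and 'aws' config keys are assumptions. General messages are
    logged only when verbose is True; the AWS credential message is logged
    either way, mirroring the documented design.
    """
    logs: List[str] = []
    spark_conf = dict(config.get("spark", {}))
    if verbose:
        logs.append(f"applying {len(spark_conf)} Spark setting(s)")
    if config.get("aws"):
        # Not gated by `verbose`, by design
        logs.append("AWS credentials set")
    return spark_conf, logs


conf, logs = setup_spark_conf_sketch({"spark": {"spark.driver.memory": "4g"}, "aws": True})
print(logs)  # ['AWS credentials set']
```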

sample(self, n=100, config=None, sample_etl='data_ingestion___test___generate_fake_ufl', verbose=False)

Get the spark session and sample data.

Use this method to test the ETL pipeline quickly without writing a config.

Parameters:
  • n (int) – The number of rows to generate. Default is 100.

  • config (Union[str, dict, OmegaConf]) – Config for the ETL. Default is None.

  • sample_etl (str) – The name of the sample ETL process. Default is “data_ingestion___test___generate_fake_ufl”.

  • verbose (bool) – If True, print the status. Default is False.

Returns:

The Spark session and the sampled data.

Return type:

Tuple[SparkSession, DataFrame]
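The default `sample_etl` value is a full registry key. Assuming keys are joined with a triple underscore as `category___sub_category___etl_name` (inferred from that default, not documented here), a hypothetical helper can split it into its parts:

```python
def parse_etl_name(name: str) -> dict:
    """Split a registry key into its parts (hypothetical helper).

    Assumes the 'category___sub_category___etl_name' convention, with a
    triple underscore as the separator.
    """
    category, sub_category, etl_name = name.split("___", 2)
    return {"category": category, "sub_category": sub_category, "etl_name": etl_name}


print(parse_etl_name("data_ingestion___test___generate_fake_ufl"))
# {'category': 'data_ingestion', 'sub_category': 'test', 'etl_name': 'generate_fake_ufl'}
```

Under that assumption, the default sample ETL belongs to the `data_ingestion` category and the `test` sub-category.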

run(self, config: str | dict | DictConfig | OmegaConf | Path, verbose=False, cache=False, emr=False, *args, **kwargs)

Runs the ETL process.

Parameters:
  • config (Union[str, dict, OmegaConf]) – config for the ETL:
    • str: path to the config file
    • dict: config dict
    • OmegaConf: config object

  • verbose (bool) – if True, print the status of the ETL pipeline:
    • the verbose setting is also applied to the ETL process
    • the ETL process's own verbose setting takes precedence over this one

  • cache (bool) – if True, cache every stage of the ETL process

  • emr (bool) – if True, run the ETL process on EMR
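The precedence rule for `verbose` can be stated as a one-line function. This is a minimal sketch assuming that an ETL process with no verbose setting of its own is represented by `None`; `resolve_verbose` is a hypothetical name, not part of the library.

```python
from typing import Optional


def resolve_verbose(pipeline_verbose: bool, etl_verbose: Optional[bool] = None) -> bool:
    """Return the effective verbose flag for one ETL process (hypothetical).

    The pipeline-level flag is the default; when the ETL process sets its
    own value (anything other than None), that value wins.
    """
    return pipeline_verbose if etl_verbose is None else etl_verbose


print(resolve_verbose(True))         # True: inherited from the pipeline
print(resolve_verbose(True, False))  # False: the ETL process overrides
```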

run_emr(self, config: str | dict | DictConfig | OmegaConf | Path, verbose=False, cache=False, *args, **kwargs)

Runs the ETL process on an EMR cluster.

Parameters:
  • config (Union[str, dict, OmegaConf]) – config for the ETL:
    • str: path to the config file
    • dict: config dict
    • OmegaConf: config object

  • verbose (bool) – if True, print the status of the ETL pipeline:
    • the verbose setting is also applied to the ETL process
    • the ETL process's own verbose setting takes precedence over this one

  • cache (bool) – if True, cache every stage of the ETL process

Returns:

  • None in place of the Spark session

  • Config: the config
    • the data would normally be returned in this position, but it is not needed for EMR

Return type:

Tuple[None, Config]
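Because `run_emr()` returns `None` where a Spark session would otherwise be, code that unpacks the result should not assume a live local session. A minimal sketch, with a plain dict standing in for `Config` and `unpack_emr_result` as a hypothetical helper name:

```python
from typing import Any, Optional, Tuple


def unpack_emr_result(result: Tuple[Optional[Any], Any]) -> Any:
    """Unpack the (None, config) pair documented for run_emr() (hypothetical helper)."""
    spark, config = result
    if spark is not None:
        raise ValueError("expected no local Spark session from an EMR run")
    return config


# A plain dict stands in for the Config object here
config = unpack_emr_result((None, {"example": True}))
print(config)  # {'example': True}
```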