etl.data_ingestion

Loads data from various sources (for example, datasets on the Hugging Face Hub, or Parquet, CSV, and Arrow files in local storage) into a preferred format.

etl.data_ingestion.arrow module

Load Arrow data. Supports loading a HuggingFace dataset saved in Arrow format directly into a Spark DataFrame.

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.arrow.data_ingestion___arrow___hf2raw(self, spark, path: str | List[str], sample_n: int = -1, arrow_partition_mb_size: int = -1, raw_partition_mb_size: int = 256, repartition: int = -1, seed: int = 42, verbose: bool = True, *args, **kwargs) pyspark.rdd.RDD

Directly loads a HuggingFace dataset saved in Arrow format into raw format, with each record represented as a dictionary.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • path (Union[str, List[str]]) – The path of the arrow folders.

  • sample_n (int, optional) – The number of arrow files to be sampled. Defaults to -1. If sample_n is -1, all arrow files will be loaded.

  • arrow_partition_mb_size (int, optional) – The size of each arrow partition in MB. Defaults to -1. If arrow_partition_mb_size is -1, the arrow files are repartitioned by the number of arrow files. This assumes that arrow file size is evenly distributed. When there is data skew in arrow file size, it is recommended to use the default (-1).

  • raw_partition_mb_size (int, optional) – The size of each raw partition in MB. Defaults to 256. This is activated only when repartition is -1.

  • repartition (int, optional) – Manually choose the number of partitions. Defaults to -1.

  • seed (int, optional) – The seed for sampling. Defaults to 42.

  • verbose (bool, optional) – Whether to print the information of the dataset. Defaults to True.

Returns:

The RDD containing the raw data in dictionary format.

Return type:

RDD

Examples

>>> import datasets
>>> dataset = datasets.load_dataset('ducky')
>>> dataset.save_to_disk('your/path/to/ducky')
>>> data_ingestion___arrow___hf2raw()(spark, 'your/path/to/ducky')

Caveats:

Arrow paths are repartitioned by the number of arrow files.
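
The partitioning rule described for arrow_partition_mb_size can be sketched in plain Python. This is a minimal illustration, not the library's implementation: the helper name estimate_arrow_partitions and the ceiling-division rule for the MB-based case are assumptions. Only the -1 behaviour (one partition per arrow file) is taken from the description above.

```python
import math
from typing import List


def estimate_arrow_partitions(
    file_sizes_bytes: List[int], arrow_partition_mb_size: int = -1
) -> int:
    """Hypothetical helper: how many partitions to use for a set of arrow files."""
    if not file_sizes_bytes:
        return 0
    if arrow_partition_mb_size == -1:
        # Documented behaviour: one partition per arrow file.
        return len(file_sizes_bytes)
    if arrow_partition_mb_size <= 0:
        raise ValueError("arrow_partition_mb_size must be -1 or a positive integer")
    # Assumed rule: enough partitions that each holds the target size on average.
    total_mb = sum(file_sizes_bytes) / (1024 * 1024)
    return max(1, math.ceil(total_mb / arrow_partition_mb_size))
```

With four 100 MB files, the default gives one partition per file, while a 256 MB target groups them into fewer partitions.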

etl.data_ingestion.common_crawl module

Load Common Crawl data from dump-id & segment files

The code is adapted from facebookresearch/cc_net, with some modifications: https://github.com/facebookresearch/cc_net

This is a migration of the code to Dataverse.

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.common_crawl.data_ingestion___common_crawl___wet2raw(self, spark, wet_path: str, segment_n: int = -1, repartition=20, seed: int = 42, verbose=True, *args, **kwargs) pyspark.rdd.RDD

Load WET files and convert them to raw format as a dictionary.

[ What is WET? ] WET files store the plain text extracted from the data stored in WARC files.

Parameters:
  • spark – The Spark session.

  • wet_path – The path to the folder that contains the WET format files. The folder is searched recursively for all *.wet and *.gz files, so you don’t need to specify the path to each WET file.

  • segment_n – The number of segments to load. This is a sampling parameter. One segment is about 1GB. Set as -1 (default) to load all the segments.

  • repartition – The number of partitions.

  • seed – The random seed.

  • verbose – Whether to print the information of the dataset.

Returns:

The RDD containing the converted raw data.

Return type:

RDD
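
The recursive file search and segment sampling described for wet_path, segment_n, and seed might look roughly like the sketch below. It uses only the standard library. The helper name find_wet_files is hypothetical, and the choice to sort before sampling (so that a given seed always selects the same files) is an assumption rather than documented behaviour.

```python
import random
from pathlib import Path
from typing import List


def find_wet_files(wet_path: str, segment_n: int = -1, seed: int = 42) -> List[str]:
    """Hypothetical helper: collect *.wet and *.gz files under wet_path."""
    if segment_n < -1:
        raise ValueError("segment_n must be -1 (all segments) or non-negative")
    # Walk the folder recursively and keep only *.wet and *.gz files.
    files = sorted(
        str(p)
        for p in Path(wet_path).rglob("*")
        if p.is_file() and p.suffix in {".wet", ".gz"}
    )
    if segment_n == -1 or segment_n >= len(files):
        return files
    # Seeded sampling so repeated runs pick the same segments.
    rng = random.Random(seed)
    return sorted(rng.sample(files, segment_n))
```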

etl.data_ingestion.common_crawl.data_ingestion___common_crawl___dump2raw(self, spark, dump: str, segment_n: int = -1, repartition: int = 20, use_cache: bool = True, cache_dir: str = None, seed: int = 42, verbose: bool = True, *args, **kwargs) pyspark.rdd.RDD

Ingests data from Common Crawl dump and converts it to raw format.

Parameters:
  • spark (SparkSession) – The Spark session.

  • dump (str) – The dump ID of the Common Crawl. For example, ‘2023-23’.

  • segment_n (int, optional) – The number of segments to load. Default is -1, which loads all segments. Note that one segment is about 1GB.

  • repartition (int, optional) – The number of partitions. Default is 20.

  • use_cache (bool, optional) – Whether to use the cache. Default is True. If you want to save disk space, set this to False, because the cache can be large. For reference, one WET dump is about 10TB.

  • cache_dir (str, optional) – The cache path to save the dataset.

  • seed (int, optional) – The random seed. Default is 42.

  • verbose (bool, optional) – Whether to print the information of the dataset. Default is True.

Returns:

The RDD containing the processed data.

Return type:

RDD
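
As an illustration of how a dump ID such as ‘2023-23’ relates to the segment list of a crawl, the sketch below builds the address of the public WET path listing. The helper name wet_paths_url is hypothetical, and the host and path layout reflect the public Common Crawl data layout as an assumption. How this function resolves a dump internally is not stated here.

```python
import re

# Assumed public Common Crawl host; not taken from this module's documentation.
CC_ROOT = "https://data.commoncrawl.org"


def wet_paths_url(dump: str) -> str:
    """Hypothetical helper: URL of the WET path listing for a dump ID like '2023-23'."""
    if not re.fullmatch(r"\d{4}-\d{2}", dump):
        raise ValueError(f"dump must look like 'YYYY-WW', got {dump!r}")
    return f"{CC_ROOT}/crawl-data/CC-MAIN-{dump}/wet.paths.gz"
```

Each line of that listing names one WET segment, which is the unit that segment_n samples from.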

etl.data_ingestion.common_crawl.data_ingestion___common_crawl___raw2ufl(self, spark, data: pyspark.rdd.RDD, *args, **kwargs)

Converts raw format to UFL with custom template.

Parameters:
  • spark (SparkSession) – The Spark session.

  • data (RDD) – The input data.

Returns:

The converted data in UFL format.
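
A raw-to-UFL conversion is essentially a per-record mapping. The sketch below shows one possible shape of such a template. The UFL field names (id, name, text, meta), the raw key raw_content, and the helper name raw_to_ufl are all assumptions made for illustration; the actual template used by this function is not documented here.

```python
import json
import uuid
from typing import Any, Dict


def raw_to_ufl(
    raw: Dict[str, Any], name: str = "common_crawl", text_key: str = "raw_content"
) -> Dict[str, str]:
    """Hypothetical template: map one raw record to a UFL-style record."""
    # Everything except the text body is kept as serialized metadata.
    meta = {key: value for key, value in raw.items() if key != text_key}
    return {
        "id": uuid.uuid4().hex,  # fresh unique identifier per record
        "name": name,  # dataset label
        "text": raw.get(text_key, ""),
        "meta": json.dumps(meta, sort_keys=True),
    }
```

On an RDD of raw dictionaries, a function like this would typically be applied with data.map(raw_to_ufl).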

etl.data_ingestion.csv module

Load CSV data

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.csv.data_ingestion___csv___csv2raw(self, spark, path: str | List[str], repartition: int = 20, verbose: bool = True, *args, **kwargs) pyspark.rdd.RDD

Converts CSV data to raw RDD.

Parameters:
  • spark (SparkSession) – The Spark session.

  • path (Union[str, List[str]]) – The path(s) to the CSV file(s).

  • repartition (int, optional) – The number of partitions for the RDD. Defaults to 20.

  • verbose (bool, optional) – Whether to print the information of the dataset.

Returns:

The raw RDD containing the CSV data.

Return type:

RDD
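
To make the notion of raw format concrete, the sketch below turns CSV rows into dictionaries keyed by column name, using only the standard library. It is an illustration of the record shape, not of this function's Spark-based loading. The helper name csv_rows_as_dicts and the assumption that the first row is a header are hypothetical.

```python
import csv
import io
from typing import Dict, Iterator, TextIO


def csv_rows_as_dicts(handle: TextIO) -> Iterator[Dict[str, str]]:
    """Hypothetical helper: yield each CSV row as a column-name -> value dict."""
    # DictReader takes the field names from the first row of the file.
    for row in csv.DictReader(handle):
        yield dict(row)


# Small in-memory CSV standing in for a file on disk.
sample = io.StringIO("id,text\n1,hello\n2,world\n")
rows = list(csv_rows_as_dicts(sample))
```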

etl.data_ingestion.cultura_x module

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.cultura_x.data_ingestion___cultura_x___raw2ufl(self, spark, ufl: pyspark.rdd.RDD, *args, **kwargs)

Converts raw format to UFL with custom template.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • ufl (RDD) – The input RDD in raw format.

Returns:

The transformed RDD in UFL format.

Return type:

RDD

etl.data_ingestion.huggingface module

Load Huggingface data

This is used only to load a HuggingFace dataset without any reformatting.

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.huggingface.data_ingestion___huggingface___hf2raw(self, spark, name_or_path: str | List[str], split: int = None, from_disk: bool = False, repartition: int = 20, verbose: bool = True, *args, **kwargs) pyspark.rdd.RDD

Convert a HuggingFace dataset to raw format as a dictionary.

Parameters:
  • spark (SparkSession) – The Spark session.

  • name_or_path (Union[str, List[str]]) – The name or path of the HuggingFace dataset.

  • split (int, optional) – The split of the dataset. Defaults to None.

  • from_disk (bool, optional) – Whether to load from disk. Defaults to False. No split is allowed when from_disk is True.

  • repartition (int, optional) – The number of partitions. Defaults to 20.

  • verbose (bool, optional) – Whether to print the information of the dataset. Defaults to True.

Returns:

The converted dataset as an RDD of dictionaries.

Return type:

RDD
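
The interaction between from_disk and split can be sketched as a small argument check. The helper name plan_hf_load is hypothetical, and the mapping onto datasets.load_from_disk and datasets.load_dataset is an assumption about how such a loader is usually built. The sketch only returns which call would be made, so it runs without the datasets package installed.

```python
from typing import Any, Dict, Optional, Tuple


def plan_hf_load(
    name_or_path: str, split: Optional[str] = None, from_disk: bool = False
) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper: choose the datasets call and its keyword arguments."""
    if from_disk:
        # A dataset saved to disk is loaded as-is, so a split cannot be selected.
        if split is not None:
            raise ValueError("split is not allowed when from_disk is True")
        return "load_from_disk", {"dataset_path": name_or_path}
    return "load_dataset", {"path": name_or_path, "split": split}
```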

etl.data_ingestion.parquet module

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.parquet.data_ingestion___parquet___pq2raw(self, spark, path: str | List[str], repartition=20, *args, **kwargs) pyspark.rdd.RDD

Reads parquet files into an RDD and repartitions it.

Parameters:
  • spark (SparkSession) – The Spark session.

  • path (str or list) – The path of the parquet files.

  • repartition (int) – The number of partitions.

Returns:

The repartitioned RDD containing the data from the parquet files.

Return type:

RDD

etl.data_ingestion.red_pajama module

Supported datasets: https://huggingface.co/datasets/togethercomputer/RedPajama-Data-1T https://huggingface.co/datasets/togethercomputer/RedPajama-Data-1T-Sample

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.red_pajama.data_ingestion___red_pajama___parquet2ufl(self, spark, input_paths, repartition=20, *args, **kwargs)

Converts a parquet file to UFL.

etl.data_ingestion.red_pajama.data_ingestion___red_pajama___hf2ufl(self, spark, name_or_path: str | List[str] = 'togethercomputer/RedPajama-Data-1T-Sample', split=None, from_disk=False, repartition=20, verbose=True, *args, **kwargs)

Converts a HuggingFace dataset to UFL.

Parameters:
  • spark (SparkSession) – The Spark session.

  • name_or_path (str or list) – The name or path of the HuggingFace dataset.

  • split (str) – The split of the dataset.

  • from_disk (bool) – Whether to load from disk. No split is allowed when from_disk is True.

  • repartition (int) – The number of partitions.

  • verbose (bool) – Whether to print the information of the dataset.

etl.data_ingestion.red_pajama.data_ingestion___red_pajama___hf2raw(self, spark, name_or_path: str | List[str] = 'togethercomputer/RedPajama-Data-1T-Sample', split=None, repartition=20, verbose=True, *args, **kwargs)

Converts a HuggingFace dataset to raw format as a dictionary.

Parameters:
  • spark (SparkSession) – The Spark session.

  • name_or_path (str or list) – The name or path of the HuggingFace dataset.

  • split (str) – The split of the dataset.

  • repartition (int) – The number of partitions.

  • verbose (bool) – Whether to print the information of the dataset.

etl.data_ingestion.red_pajama.data_ingestion___red_pajama___raw2ufl_templatev1(self, spark, ufl, *args, **kwargs)

Converts raw format to UFL with custom template.

etl.data_ingestion.slim_pajama module

Supported datasets: https://huggingface.co/datasets/cerebras/SlimPajama-627B

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.slim_pajama.data_ingestion___slim_pajama___parquet2ufl(self, spark, input_paths, repartition=20, *args, **kwargs)

Converts a parquet file to UFL.

etl.data_ingestion.slim_pajama.data_ingestion___slim_pajama___hf2ufl(self, spark, name_or_path: str | List[str] = 'cerebras/SlimPajama-627B', split=None, from_disk=False, repartition=20, verbose=True, *args, **kwargs)

Converts a HuggingFace dataset to UFL.

Parameters:
  • spark (SparkSession) – The Spark session.

  • name_or_path (str or list) – The name or path of the HuggingFace dataset.

  • split (str) – The split of the dataset.

  • from_disk (bool) – Whether to load from disk. No split is allowed when from_disk is True.

  • repartition (int) – The number of partitions.

  • verbose (bool) – Whether to print the information of the dataset.

etl.data_ingestion.test module

Special-purpose module that creates fake data for testing or debugging.

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.data_ingestion.test.data_ingestion___test___generate_fake_ufl(self, spark, n: int = 100, repartition: int = 20, verbose: bool = True, *args, **kwargs) pyspark.rdd.RDD

Generate fake data for testing or debugging.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • n (int, optional) – The number of records to generate. Default is 100.

  • repartition (int, optional) – The number of partitions. Default is 20.

  • verbose (bool, optional) – Whether to print the information of the dataset. Default is True.

Returns:

The generated fake data RDD.

Return type:

RDD
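
For quick local experiments without Spark, fake records of a similar kind can be produced with the standard library alone. The sketch below is not this function's implementation: the helper name generate_fake_records, the record fields (id, name, text), the word list, and the seed parameter are all assumptions introduced for illustration.

```python
import random
import uuid
from typing import Dict, List

# Arbitrary vocabulary used to build placeholder text.
WORDS = ["data", "spark", "arrow", "parquet", "crawl", "token", "model", "text"]


def generate_fake_records(n: int = 100, seed: int = 42) -> List[Dict[str, str]]:
    """Hypothetical helper: build n reproducible fake records."""
    rng = random.Random(seed)
    records = []
    for _ in range(n):
        # Each record gets 5 to 15 random words as its text body.
        text = " ".join(rng.choice(WORDS) for _ in range(rng.randint(5, 15)))
        records.append(
            {
                "id": uuid.UUID(int=rng.getrandbits(128)).hex,  # seeded, so repeatable
                "name": "fake",
                "text": text,
            }
        )
    return records
```

A list like this could be handed to spark.sparkContext.parallelize to obtain an RDD with the desired number of partitions.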