etl.data_ingestion
Facilitates loading data from various sources (e.g., datasets on the HuggingFace Hub and parquet/csv/arrow files in local storage) into a preferred format.
etl.data_ingestion.arrow module
Load Arrow. Supports loading a HuggingFace dataset saved in Arrow format directly into a Spark RDD.
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.arrow.data_ingestion___arrow___hf2raw(self, spark, path: str | List[str], sample_n: int = -1, arrow_partition_mb_size: int = -1, raw_partition_mb_size: int = 256, repartition: int = -1, seed: int = 42, verbose: bool = True, *args, **kwargs) → pyspark.rdd.RDD
Directly loads a HuggingFace dataset saved in Arrow format and converts it to raw format, with each record as a dictionary.
- Parameters:
spark (SparkSession) – The Spark session object.
path (Union[str, List[str]]) – The path of the arrow folders.
sample_n (int, optional) – The number of arrow files to sample. Defaults to -1, which loads all arrow files.
arrow_partition_mb_size (int, optional) – The size of each arrow partition in MB. Defaults to -1. If arrow_partition_mb_size is -1, the arrow files are repartitioned by the number of arrow files. Setting a positive size assumes that arrow file sizes are evenly distributed, so when arrow file sizes are skewed, the default (-1) is recommended.
raw_partition_mb_size (int, optional) – The size of each raw partition in MB. Defaults to 256. Only used when repartition is -1.
repartition (int, optional) – Manually sets the number of partitions, in which case raw_partition_mb_size is ignored. Defaults to -1.
seed (int, optional) – The seed for sampling. Defaults to 42.
verbose (bool, optional) – Whether to print the information of the dataset. Defaults to True.
- Returns:
The RDD containing the raw data in dictionary format.
- Return type:
RDD
Examples
>>> import datasets
>>> dataset = datasets.load_dataset('ducky')
>>> dataset.save_to_disk('your/path/to/ducky')
>>> data_ingestion___arrow___hf2raw()(spark, 'your/path/to/ducky')
- Caveats:
Arrow paths are repartitioned by the number of arrow files.
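The interaction between sample_n, seed, repartition, and raw_partition_mb_size can be pictured with the plain-Python sketch below. It is an illustration under stated assumptions, not Dataverse's implementation: the helper name plan_arrow_ingestion is hypothetical, file sizes are passed in rather than read from disk, and the raw partition count is assumed to be the total selected size divided by raw_partition_mb_size, rounded up.

```python
import math
import random
from typing import Dict, List, Tuple


def plan_arrow_ingestion(
    arrow_sizes_mb: Dict[str, float],
    sample_n: int = -1,
    raw_partition_mb_size: int = 256,
    repartition: int = -1,
    seed: int = 42,
) -> Tuple[List[str], int]:
    """Hypothetical helper: pick arrow files and a raw partition count.

    Assumption (not taken from Dataverse): the raw partition count is
    ceil(total selected size / raw_partition_mb_size) unless
    repartition is set explicitly.
    """
    paths = sorted(arrow_sizes_mb)
    # Sample arrow files reproducibly; -1 keeps every file.
    if sample_n != -1 and sample_n < len(paths):
        paths = sorted(random.Random(seed).sample(paths, sample_n))
    # An explicit repartition value overrides the size-based estimate.
    if repartition != -1:
        return paths, repartition
    total_mb = sum(arrow_sizes_mb[p] for p in paths)
    return paths, max(1, math.ceil(total_mb / raw_partition_mb_size))


sizes = {"data-00000.arrow": 500.0, "data-00001.arrow": 500.0, "data-00002.arrow": 100.0}
print(plan_arrow_ingestion(sizes))                 # all 3 files, ceil(1100 / 256) = 5
print(plan_arrow_ingestion(sizes, repartition=8))  # all 3 files, 8 partitions
print(plan_arrow_ingestion(sizes, sample_n=2))     # 2 files chosen with seed 42
```

Seeding a dedicated random.Random instance keeps the sample reproducible without touching Python's global random state.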
etl.data_ingestion.common_crawl module
Loads Common Crawl data from a dump ID and segment files.
The code is adapted, with some modifications, from facebookresearch/cc_net (https://github.com/facebookresearch/cc_net) and migrated to Dataverse.
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.common_crawl.data_ingestion___common_crawl___wet2raw(self, spark, wet_path: str, segment_n: int = -1, repartition=20, seed: int = 42, verbose=True, *args, **kwargs) → pyspark.rdd.RDD
Load WET files and convert them to raw format as a dictionary.
What is WET? WET files store the plain text extracted from the data stored in WARC files.
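To make the WET format concrete, the sketch below parses the records of an already decompressed WET file into dictionaries. It is a simplified illustration, not the cc_net or Dataverse parser: it assumes each record starts with a WARC/1.0 line followed by header lines, a blank line, and the extracted text, and it keeps only conversion records. The function names and the output keys (url, date, text) are hypothetical.

```python
from typing import Dict, Iterable, Iterator, List, Optional


def _to_record(block: List[str]) -> Optional[Dict[str, str]]:
    """Turn the lines of one WARC record into a dict, or None if it is not text."""
    headers: Dict[str, str] = {}
    i = 0
    # Header lines run until the first blank line.
    while i < len(block) and block[i].strip():
        key, _, value = block[i].partition(":")
        headers[key.strip()] = value.strip()
        i += 1
    if headers.get("WARC-Type") != "conversion":
        return None  # e.g. the leading warcinfo record
    return {
        "url": headers.get("WARC-Target-URI", ""),
        "date": headers.get("WARC-Date", ""),
        "text": "\n".join(block[i + 1:]).strip(),
    }


def parse_wet_records(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per conversion record of a decompressed WET file."""
    block: List[str] = []
    for line in lines:
        line = line.rstrip("\r\n")
        if line.strip() == "WARC/1.0":
            # A new record starts, so flush the one collected so far.
            record = _to_record(block) if block else None
            if record is not None:
                yield record
            block = []
        else:
            block.append(line)
    record = _to_record(block) if block else None
    if record is not None:
        yield record


sample = """WARC/1.0
WARC-Type: warcinfo
Content-Length: 10

isPartOf: CC-MAIN-2023-23

WARC/1.0
WARC-Type: conversion
WARC-Target-URI: http://example.com/
WARC-Date: 2023-06-01T00:00:00Z
Content-Type: text/plain

Hello WET.
Second line.
"""
records = list(parse_wet_records(sample.splitlines()))
print(records)
```

Splitting on WARC/1.0 lines is a shortcut; a stricter parser would use each record's Content-Length header instead.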
- Parameters:
spark – The Spark session.
wet_path – The path to the folder containing the WET files. The folder is searched recursively for all *.wet and *.gz files, so you don’t need to specify the path to each WET file.
segment_n – The number of segments to load, used for sampling. One segment is about 1 GB. Set to -1 (default) to load all segments.
repartition – The number of partitions.
seed – The random seed.
verbose – Whether to print the information of the dataset.
- Returns:
The RDD containing the converted raw data.
- Return type:
RDD
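The recursive search over wet_path and the segment_n sampling described above could look roughly like the following pure-Python sketch. The helper name find_wet_segments is hypothetical, and treating each matching file as one segment is an assumption made for illustration.

```python
import random
import tempfile
from pathlib import Path
from typing import List


def find_wet_segments(wet_path: str, segment_n: int = -1, seed: int = 42) -> List[str]:
    """Hypothetical helper: recursively collect *.wet / *.gz files, optionally sampled."""
    root = Path(wet_path)
    # rglob walks every sub-directory, so nested folder layouts are covered.
    files = sorted(
        str(p) for p in root.rglob("*") if p.is_file() and p.suffix in {".wet", ".gz"}
    )
    if segment_n == -1 or segment_n >= len(files):
        return files
    # Seeded sampling keeps the chosen subset reproducible across runs.
    return sorted(random.Random(seed).sample(files, segment_n))


with tempfile.TemporaryDirectory() as tmp:
    for rel in ["a.wet", "nested/b.warc.wet.gz", "nested/deeper/c.wet", "notes.txt"]:
        target = Path(tmp, rel)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text("")
    all_files = find_wet_segments(tmp)
    sampled = find_wet_segments(tmp, segment_n=2)
    print(len(all_files), len(sampled))  # 3 2
```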
- etl.data_ingestion.common_crawl.data_ingestion___common_crawl___dump2raw(self, spark, dump: str, segment_n: int = -1, repartition: int = 20, use_cache: bool = True, cache_dir: str = None, seed: int = 42, verbose: bool = True, *args, **kwargs) → pyspark.rdd.RDD
Ingests data from Common Crawl dump and converts it to raw format.
- Parameters:
spark (SparkSession) – The Spark session.
dump (str) – The dump ID of the Common Crawl. For example, ‘2023-23’.
segment_n (int, optional) – The number of segments to load. Default is -1, which loads all segments. Note that one segment is about 1GB.
repartition (int, optional) – The number of partitions. Default is 20.
use_cache (bool, optional) – Whether to use the cache. Default is True. Set to False to save disk space, because the cache can be large: a single WET dump is about 10 TB.
cache_dir (str, optional) – The directory where the cached dataset is saved.
seed (int, optional) – The random seed. Default is 42.
verbose (bool, optional) – Whether to print the information of the dataset. Default is True.
- Returns:
The RDD containing the processed data.
- Return type:
RDD
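The dump argument is a Common Crawl crawl ID such as '2023-23'. For each crawl, Common Crawl publishes a wet.paths.gz listing with one relative path per WET file, and prefixing each path with https://data.commoncrawl.org/ gives its download URL. The sketch below only builds those URLs and a local cache location and downloads nothing. The helper names, the default cache directory, and the cache layout are hypothetical, not Dataverse's.

```python
from pathlib import Path
from typing import Optional

CC_HOST = "https://data.commoncrawl.org/"


def wet_paths_url(dump: str) -> str:
    """URL of the listing of all WET files for a dump ID such as '2023-23'."""
    return f"{CC_HOST}crawl-data/CC-MAIN-{dump}/wet.paths.gz"


def segment_url(relative_path: str) -> str:
    """Each line of wet.paths.gz is a path relative to the Common Crawl host."""
    return CC_HOST + relative_path.strip()


def cache_path(dump: str, relative_path: str, cache_dir: Optional[str] = None) -> Path:
    """Hypothetical cache layout: <cache_dir>/CC-MAIN-<dump>/<file name>."""
    base = Path(cache_dir) if cache_dir else Path.home() / ".cache" / "common_crawl"
    return base / f"CC-MAIN-{dump}" / Path(relative_path.strip()).name


print(wet_paths_url("2023-23"))
# https://data.commoncrawl.org/crawl-data/CC-MAIN-2023-23/wet.paths.gz

# Placeholder listing line; real lines name actual segment directories and files.
rel = "crawl-data/CC-MAIN-2023-23/segments/SEGMENT/wet/example.warc.wet.gz\n"
print(segment_url(rel))
print(cache_path("2023-23", rel, "/tmp/cc"))
```

Caching each segment by file name is what makes use_cache trade disk space for avoiding repeated downloads.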
- etl.data_ingestion.common_crawl.data_ingestion___common_crawl___raw2ufl(self, spark, data: pyspark.rdd.RDD, *args, **kwargs)
Converts raw-format data to UFL using a custom template.
- Parameters:
spark (SparkSession) – The Spark session.
data (RDD) – The input data.
- Returns:
The converted data in UFL format.
etl.data_ingestion.csv module
Load CSV data
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.csv.data_ingestion___csv___csv2raw(self, spark, path: str | List[str], repartition: int = 20, verbose: bool = True, *args, **kwargs) → pyspark.rdd.RDD
Converts CSV data to raw RDD.
- Parameters:
spark (SparkSession) – The Spark session.
path (Union[str, List[str]]) – The path(s) to the CSV file(s).
repartition (int, optional) – The number of partitions for the RDD. Defaults to 20.
verbose (bool, optional) – Whether to print the information of the dataset. Defaults to True.
- Returns:
The raw RDD containing the CSV data.
- Return type:
RDD
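Conceptually, the raw format here is one dictionary per CSV row, keyed by the header. The standard-library sketch below shows that mapping and a naive split into repartition chunks. It stands in for the Spark-based function and is not its implementation; the helper name csv_to_raw_partitions is hypothetical.

```python
import csv
import io
from typing import Dict, List


def csv_to_raw_partitions(csv_text: str, repartition: int = 20) -> List[List[Dict[str, str]]]:
    """Hypothetical helper: parse CSV rows into dicts and deal them into partitions."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    partitions: List[List[Dict[str, str]]] = [[] for _ in range(repartition)]
    # Round-robin assignment keeps partition sizes within one row of each other.
    for i, row in enumerate(rows):
        partitions[i % repartition].append(row)
    return partitions


data = "id,text\n1,hello\n2,world\n3,again\n"
parts = csv_to_raw_partitions(data, repartition=2)
print(parts)
# [[{'id': '1', 'text': 'hello'}, {'id': '3', 'text': 'again'}], [{'id': '2', 'text': 'world'}]]
```

Note that csv.DictReader yields every value as a string, so numeric columns need explicit conversion downstream.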
etl.data_ingestion.cultura_x module
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.cultura_x.data_ingestion___cultura_x___raw2ufl(self, spark, ufl: pyspark.rdd.RDD, *args, **kwargs)
Converts raw-format data to UFL using a custom template.
- Parameters:
spark (SparkSession) – The Spark session object.
ufl (RDD) – The input RDD in raw format.
- Returns:
The transformed data in UFL format.
- Return type:
RDD
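A raw-to-UFL template is essentially a per-record mapping. The sketch below assumes a CulturaX-style record with text, timestamp, url, and source fields, and a UFL record with id, name, text, and meta keys where meta is a JSON string. Both layouts are assumptions for illustration, and cultura_x_to_ufl is a hypothetical name rather than the template Dataverse applies.

```python
import json
import uuid
from typing import Any, Dict


def cultura_x_to_ufl(row: Dict[str, Any], name: str = "cultura_x") -> Dict[str, str]:
    """Hypothetical template: keep text, move every other field into JSON meta."""
    meta = {k: v for k, v in row.items() if k != "text"}
    return {
        "id": uuid.uuid4().hex,  # fresh unique ID per record
        "name": name,
        "text": row.get("text", ""),
        "meta": json.dumps(meta, ensure_ascii=False),
    }


raw = {"text": "Xin chào", "timestamp": "2021-01-01", "url": "https://example.com", "source": "mC4"}
ufl = cultura_x_to_ufl(raw)
print(ufl["name"], ufl["text"], json.loads(ufl["meta"])["source"])  # cultura_x Xin chào mC4
```

On an RDD, a per-record function like this would typically be applied with rdd.map(cultura_x_to_ufl).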
etl.data_ingestion.huggingface module
Load HuggingFace data
This is used only to load a HuggingFace dataset without any reformatting.
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.huggingface.data_ingestion___huggingface___hf2raw(self, spark, name_or_path: str | List[str], split: int = None, from_disk: bool = False, repartition: int = 20, verbose: bool = True, *args, **kwargs) → pyspark.rdd.RDD
Convert a HuggingFace dataset to raw format as a dictionary.
- Parameters:
spark (SparkSession) – The Spark session.
name_or_path (Union[str, List[str]]) – The name or path of the HuggingFace dataset.
split (int, optional) – The split of the dataset. Defaults to None.
from_disk (bool, optional) – Whether to load from disk. Defaults to False. No split is allowed when from_disk is True.
repartition (int, optional) – The number of partitions. Defaults to 20.
verbose (bool, optional) – Whether to print the information of the dataset. Defaults to True.
- Returns:
The converted dataset as an RDD of dictionaries.
- Return type:
RDD
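The from_disk rule (no split when loading from disk) can be expressed as a small argument check. The sketch below is hypothetical: resolve_hf_sources only validates and normalizes the arguments, and the returned loader labels are illustrative. It types split as a string such as 'train', which is what datasets.load_dataset expects; datasets.load_from_disk reads back a whole dataset written by save_to_disk and takes no split argument.

```python
from typing import List, Optional, Tuple, Union


def resolve_hf_sources(
    name_or_path: Union[str, List[str]],
    split: Optional[str] = None,
    from_disk: bool = False,
) -> List[Tuple[str, str, Optional[str]]]:
    """Hypothetical check: return (loader, source, split) triples for each input."""
    if from_disk and split is not None:
        raise ValueError("split is not allowed when from_disk is True")
    sources = [name_or_path] if isinstance(name_or_path, str) else list(name_or_path)
    # Datasets saved with save_to_disk are read back whole; hub datasets can take a split.
    loader = "load_from_disk" if from_disk else "load_dataset"
    return [(loader, source, split) for source in sources]


print(resolve_hf_sources("ducky", split="train"))
# [('load_dataset', 'ducky', 'train')]
print(resolve_hf_sources(["a/path", "b/path"], from_disk=True))
# [('load_from_disk', 'a/path', None), ('load_from_disk', 'b/path', None)]
```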
etl.data_ingestion.parquet module
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.parquet.data_ingestion___parquet___pq2raw(self, spark, path: str | List[str], repartition=20, *args, **kwargs) → pyspark.rdd.RDD
Reads parquet files into an RDD and repartitions it.
- Parameters:
spark (SparkSession) – The Spark session.
path (str or list) – The path of the parquet files.
repartition (int) – The number of partitions. Defaults to 20.
- Returns:
The repartitioned RDD containing the data from the parquet files.
- Return type:
RDD
etl.data_ingestion.red_pajama module
Supported datasets:
https://huggingface.co/datasets/togethercomputer/RedPajama-Data-1T
https://huggingface.co/datasets/togethercomputer/RedPajama-Data-1T-Sample
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.red_pajama.data_ingestion___red_pajama___parquet2ufl(self, spark, input_paths, repartition=20, *args, **kwargs)
Converts parquet files to UFL.
- etl.data_ingestion.red_pajama.data_ingestion___red_pajama___hf2ufl(self, spark, name_or_path: str | List[str] = 'togethercomputer/RedPajama-Data-1T-Sample', split=None, from_disk=False, repartition=20, verbose=True, *args, **kwargs)
Converts a HuggingFace dataset to UFL.
- Parameters:
spark (SparkSession) – The Spark session.
name_or_path (str or list) – The name or path of the HuggingFace dataset.
split (str) – The split of the dataset.
from_disk (bool) – Whether to load from disk. No split is allowed when from_disk is True.
repartition (int) – The number of partitions.
verbose (bool) – Whether to print the information of the dataset.
- etl.data_ingestion.red_pajama.data_ingestion___red_pajama___hf2raw(self, spark, name_or_path: str | List[str] = 'togethercomputer/RedPajama-Data-1T-Sample', split=None, repartition=20, verbose=True, *args, **kwargs)
Converts a HuggingFace dataset to raw format, with each record as a dictionary.
- Parameters:
spark (SparkSession) – The Spark session.
name_or_path (str or list) – The name or path of the HuggingFace dataset.
split (str) – The split of the dataset.
repartition (int) – The number of partitions.
verbose (bool) – Whether to print the information of the dataset.
- etl.data_ingestion.red_pajama.data_ingestion___red_pajama___raw2ufl_templatev1(self, spark, ufl, *args, **kwargs)
Converts raw-format data to UFL using a custom template.
etl.data_ingestion.slim_pajama module
Supported datasets: https://huggingface.co/datasets/cerebras/SlimPajama-627B
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.slim_pajama.data_ingestion___slim_pajama___parquet2ufl(self, spark, input_paths, repartition=20, *args, **kwargs)
Converts parquet files to UFL.
- etl.data_ingestion.slim_pajama.data_ingestion___slim_pajama___hf2ufl(self, spark, name_or_path: str | List[str] = 'cerebras/SlimPajama-627B', split=None, from_disk=False, repartition=20, verbose=True, *args, **kwargs)
Converts a HuggingFace dataset to UFL.
- Parameters:
spark (SparkSession) – The Spark session.
name_or_path (str or list) – The name or path of the HuggingFace dataset.
split (str) – The split of the dataset.
from_disk (bool) – Whether to load from disk. No split is allowed when from_disk is True.
repartition (int) – The number of partitions.
verbose (bool) – Whether to print the information of the dataset.
etl.data_ingestion.test module
Special-purpose module for creating fake data for testing or debugging.
Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license
- etl.data_ingestion.test.data_ingestion___test___generate_fake_ufl(self, spark, n: int = 100, repartition: int = 20, verbose: bool = True, *args, **kwargs) → pyspark.rdd.RDD
Generate fake data for testing or debugging.
- Parameters:
spark (SparkSession) – The Spark session object.
n (int, optional) – The number of fake records to generate. Default is 100.
repartition (int, optional) – The number of partitions. Default is 20.
verbose (bool, optional) – Whether to print the information of the dataset. Default is True.
- Returns:
The generated fake data RDD.
- Return type:
RDD
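For local unit tests without a Spark cluster, a comparable generator can be written in plain Python. The sketch assumes a UFL record with id, name, text, and meta keys (an assumption, not taken from this page) and uses a seeded random.Random so runs are reproducible. The function generate_fake_ufl_records and its seed parameter are hypothetical and not part of the documented function.

```python
import json
import random
import string
from typing import Dict, List


def generate_fake_ufl_records(n: int = 100, seed: int = 0) -> List[Dict[str, str]]:
    """Hypothetical plain-Python stand-in: n fake UFL-style records."""
    rng = random.Random(seed)
    records = []
    for i in range(n):
        # Five random lowercase "words" of 3 to 8 letters each.
        words = [
            "".join(rng.choices(string.ascii_lowercase, k=rng.randint(3, 8)))
            for _ in range(5)
        ]
        records.append({
            "id": f"fake-{i}",
            "name": "test_fake_ufl",
            "text": " ".join(words),
            "meta": json.dumps({"index": i}),
        })
    return records


fake = generate_fake_ufl_records(n=3)
print(len(fake), fake[0]["name"])  # 3 test_fake_ufl
```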