etl.cleaning

Removing irrelevant, redundant, or noisy information from the data, such as stop words or special characters.

etl.cleaning.char module

A collection of modules for cleaning data at the character level. For example: whitespace, accent characters, and unprintable characters.

Copyright (c) 2024-present Upstage Co., Ltd. Apache-2.0 license

etl.cleaning.char.cleaning___char___remove_accent(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', *args, **kwargs) → pyspark.rdd.RDD

Strips accents from a piece of text.

  • input: café résumé

  • output: cafe resume

Code is from facebookresearch/cc_net https://github.com/facebookresearch/cc_net/blob/main/cc_net/text_normalizer.py

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed.

  • subset (str) – A subset or column to consider. Defaults to ‘text’.

Returns:

The processed data with accents removed.
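
The per-text logic can be sketched in plain Python as follows. This is a minimal illustration of the usual accent-stripping approach (decompose, then drop combining marks), not the library's own code; the function name strip_accents is chosen here for illustration.

```python
import unicodedata


def strip_accents(text: str) -> str:
    """Drop combining marks after canonical decomposition (NFD)."""
    decomposed = unicodedata.normalize("NFD", text)
    # Category "Mn" (nonspacing mark) covers the combining accents.
    return "".join(ch for ch in decomposed if unicodedata.category(ch) != "Mn")


print(strip_accents("café résumé"))  # cafe resume
```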

etl.cleaning.char.cleaning___char___normalize_whitespace(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', *args, **kwargs) → pyspark.rdd.RDD

Normalizes whitespace:

  • Strips leading and trailing whitespace.

  • Replaces each run of consecutive whitespace with a single space, excluding \n and \r characters.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed.

  • subset (str) – A subset or column to consider. Defaults to ‘text’.

Returns:

The processed data with normalized whitespace.

Return type:

RDD
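
A rough plain-Python sketch of the two rules described above, applied to a single string. The regular expression and the function name are assumptions made for illustration; the library may implement the rules differently.

```python
import re

# Runs of whitespace that contain neither "\n" nor "\r".
_INLINE_WS = re.compile(r"[^\S\r\n]+")


def normalize_whitespace(text: str) -> str:
    # 1) strip both ends, 2) collapse each inline whitespace run to one space.
    return _INLINE_WS.sub(" ", text.strip())


print(repr(normalize_whitespace("  hello \t  world\nnext   line  ")))
# 'hello world\nnext line'
```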

etl.cleaning.char.cleaning___char___remove_unprintable(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset='text', *args, **kwargs) → pyspark.rdd.RDD

Removes all non-printable characters.

Code is from facebookresearch/cc_net https://github.com/facebookresearch/cc_net/blob/main/cc_net/text_normalizer.py

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed.

  • subset (str) – A subset or column to consider. Defaults to ‘text’.

Returns:

The processed data with unprintable characters removed.

Return type:

RDD
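
As a hedged sketch, one way to drop non-printable characters from a string is shown below. Which code points count as non-printable is an assumption here (the C0 controls 0-31 plus DEL and the C1 controls 127-159); check the referenced cc_net source for the exact set the library uses.

```python
# Assumed set of non-printable code points: 0-31 and 127-159.
# Note that this set also contains "\t", "\n" and "\r".
_NON_PRINTING = {cp: None for cp in [*range(0, 32), *range(127, 160)]}


def remove_unprintable(text: str) -> str:
    # str.translate deletes every character mapped to None.
    return text.translate(_NON_PRINTING)


print(remove_unprintable("a\x00b\x7fc"))  # abc
```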

etl.cleaning.document module

A collection of modules for cleaning data at the document level.

etl.cleaning.document.cleaning___document___split_by_word(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', word_per_chunk: int = 100, delimiter: str = ' ', *args, **kwargs) → pyspark.rdd.RDD

Split documents into smaller chunks by word.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

  • word_per_chunk (int, optional) – Number of words per chunk. Defaults to 100.

  • delimiter (str, optional) – Delimiter to split the text. Defaults to “ “.

Returns:

The processed data with documents split into smaller chunks.

Return type:

RDD

Raises:

ValueError – If word_per_chunk is not a positive integer.

Examples

  • word_per_chunk = 2

  • delimiter = “ “

  • input

    text

    “hello world, how are you?”

  • output

    text

    “hello world,”

    “how are”

    “you?”

Caveats:
  • NO normalization is done here!
    • Whitespace normalization is not applied.

    • Running other normalization steps before this one is recommended.

  • All the keys from the original row are copied to all the new rows created.
    • id is not unique anymore.

    • Make sure id is assigned after this step.
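
The chunking and the key-copying caveat can be illustrated with a plain-Python sketch that works on one row represented as a dict. The function name split_row_by_word and the dict-based row are assumptions for illustration only, not the library's implementation.

```python
from typing import Dict, List


def split_row_by_word(row: Dict, subset: str = "text",
                      word_per_chunk: int = 100,
                      delimiter: str = " ") -> List[Dict]:
    if not isinstance(word_per_chunk, int) or word_per_chunk <= 0:
        raise ValueError("word_per_chunk must be a positive integer")
    words = row[subset].split(delimiter)
    chunks = [
        delimiter.join(words[i:i + word_per_chunk])
        for i in range(0, len(words), word_per_chunk)
    ]
    # Every other key is copied unchanged, so values such as "id" repeat.
    return [{**row, subset: chunk} for chunk in chunks]


rows = split_row_by_word({"id": 7, "text": "hello world, how are you?"},
                         word_per_chunk=2)
for r in rows:
    print(r)
# {'id': 7, 'text': 'hello world,'}
# {'id': 7, 'text': 'how are'}
# {'id': 7, 'text': 'you?'}
```

All three output rows carry the same id, which is why ids should be assigned after this step.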

etl.cleaning.html module

A collection of modules for cleaning data that includes HTML.

etl.cleaning.html.cleaning___html___extract_plain_text(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', use_trafilatura: bool = False, *args, **kwargs) → pyspark.rdd.RDD

Extracts plain text from HTML.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

  • use_trafilatura (bool, optional) – Whether to use trafilatura instead of html2text. Defaults to False.

Returns:

The plain text extracted from HTML.

Caveats:
  • html2text adds a double newline after each paragraph, which is not handled at this point.

  • trafilatura is offered only as an option, not as the default, because it does not seem to extract plain text well in some cases.

    • [OK] Case:

      import html2text
      import trafilatura

      text = "<body><h1>My First Heading</h1><p>My first paragraph.</p></body>"

      # html2text
      html2text.html2text(text)
      # returns '# My First Heading\n\nMy first paragraph.\n\n'

      # trafilatura
      trafilatura.html2txt(text)
      # returns 'My First HeadingMy first paragraph.'

    • [ERROR] Case (trafilatura removes all the text):

      text = "<p>hello <br> nice to meet you.</p>"

      # html2text
      html2text.html2text(text)
      # returns 'hello  \nnice to meet you.\n\n'

      # trafilatura
      trafilatura.html2txt(text)
      # returns ''
      

etl.cleaning.korean module

This module is only for Korean text data.

class etl.cleaning.korean.KoreanType(value)

Bases: IntEnum

An enumeration.

etl.cleaning.korean.cleaning___korean___filter_by_ratio(spark, data: RDD | DataFrame, subset: str = 'text', filter_type: str = 'word', korean_ratio: float = 0.5, *args, **kwargs) → RDD

Filters out text whose ratio of Korean, computed excluding spaces, is below korean_ratio.

Code is adapted from EleutherAI/dps https://github.com/EleutherAI/dps/blob/master/dps/spark/prep/korean_prep.py#L52

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed. It can be either an RDD or a DataFrame.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

  • filter_type (str, optional) – The type of filtering to be applied. Can be ‘char’ or ‘word’. Defaults to ‘word’.

  • korean_ratio (float, optional) – The minimum ratio of Korean characters or words required for a text to survive the filtering. Defaults to 0.5.

Returns:

The filtered data with its Korean ratio.

Raises:

ValueError – If the filter_type is not ‘char’ or ‘word’, or if the korean_ratio is not between 0 and 1.

Examples

With korean_ratio = 0.5

text

“한국어가 포함 비율이 50% 이상인 경우만 남김”

  • filter_type = ‘char’ -> [survive!]
    • Korean characters: 17

    • Non-Korean characters: 3

    • Total characters: 20

    • Korean character ratio: 17 / 20 > 0.5 -> True

  • filter_type = ‘word’ -> [survive!]
    • Korean words: 6

    • Non-Korean words: 1

    • Total words: 7

    • Korean word ratio: 6 / 7 > 0.5 -> True

text

“korean including 비율이 50% 미만인 경우 제거”

  • filter_type = ‘char’ -> [remove!]
    • Korean characters: 10

    • Non-Korean characters: 18

    • Total characters: 28

    • Korean character ratio: 10 / 28 > 0.5 -> False

  • filter_type = ‘word’ -> [survive!]
    • Korean words: 4

    • Non-Korean words: 3

    • Total words: 7

    • Korean word ratio: 4 / 7 > 0.5 -> True

Note

  • The regex that counts Korean text does not work properly around non-word characters.
    • e.g. 안녕”하세요 is counted as 2 Korean words - [“안녕”, “하세요”]
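
To make the two filter types concrete, here is a simplified plain-Python sketch of the ratio computation. It rests on assumptions that may differ from the library: only complete Hangul syllables (U+AC00 to U+D7A3) count as Korean, and a whitespace-separated word counts as Korean if it contains at least one such syllable. It does not reproduce the regex behaviour described in the note above.

```python
import re

# Assumption: only complete Hangul syllables count as Korean.
HANGUL = re.compile(r"[\uac00-\ud7a3]")


def korean_ratio(text: str, filter_type: str = "word") -> float:
    if filter_type == "char":
        # Spaces are excluded from the character count.
        units = [ch for ch in text if not ch.isspace()]
    elif filter_type == "word":
        units = text.split()
    else:
        raise ValueError("filter_type must be 'char' or 'word'")
    if not units:
        return 0.0
    korean = sum(1 for unit in units if HANGUL.search(unit))
    return korean / len(units)


text = "한국어가 포함 비율이 50% 이상인 경우만 남김"
print(korean_ratio(text, "char"))  # 0.85
```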

etl.cleaning.korean.cleaning___korean___reduce_emoticon(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str | List[str] = 'text', num_repeats: int = 2, *args, **kwargs) → pyspark.rdd.RDD

Reduces repeated Korean emoticon characters.

It performs the following steps:

  1. Splits a complete Korean character into individual characters, but only when it sits between a preceding jaum (consonant) and a following moum (vowel).

    • e.g. (unchanged) ㅋㅋ킄ㅋㅋㅋ -> ㅋㅋ킄ㅋㅋㅋ

    • e.g. (split) ㅋㅋ쿠ㅜㅜㅜ -> ㅋㅋㅋㅜㅜㅜㅜ

  2. Reduces repeating Korean characters.
    • e.g. ㅋㅋㅋㅋㅋ -> ㅋㅋ

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed. It can be either an RDD or a DataFrame.

  • subset (str, optional) – A subset or columns to consider. Defaults to ‘text’.

  • num_repeats (int, optional) – The number of repeats kept for each run of repeating characters. Defaults to 2.

Returns:

The processed data with reduced emoticon Korean characters.

Return type:

RDD
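
Step 2 (reducing repeats) can be sketched on a single string as shown below. This covers only the repeat reduction, not the splitting in step 1. The character ranges (Hangul compatibility jamo and complete syllables) and the function name are assumptions for illustration.

```python
import re


def reduce_repeats(text: str, num_repeats: int = 2) -> str:
    if num_repeats < 1:
        raise ValueError("num_repeats must be at least 1")
    # One Korean character followed by num_repeats or more copies of itself.
    # Assumed ranges: compatibility jamo (U+3131-U+3163) and syllables.
    pattern = re.compile(rf"([\u3131-\u3163\uac00-\ud7a3])\1{{{num_repeats},}}")
    # Keep exactly num_repeats copies of the repeated character.
    return pattern.sub(lambda m: m.group(1) * num_repeats, text)


print(reduce_repeats("ㅋㅋㅋㅋㅋ"))  # ㅋㅋ
```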

Note

[ potential risk of splitting complete Korean characters ]

Splitting emoticon characters into individual characters is risky, so only one case is kept: a complete Korean character between a jaum and a moum. Other cases were also added at first, but were removed because of the risk.

etl.cleaning.length module

Filtering based on length.

etl.cleaning.length.cleaning___length___char_len_filter(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', min_len: int = None, max_len: int = None, *args, **kwargs) → pyspark.rdd.RDD

Filters the data by character length.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

  • min_len (int, optional) – The minimum length of characters to filter. If None, there is no minimum length.

  • max_len (int, optional) – The maximum length of characters to filter. If None, there is no maximum length.

Returns:

The filtered data as an RDD.

Raises:

ValueError – If both min_len and max_len are None.

Note

  • min_len <= len <= max_len

  • min_len and max_len can not be None at the same time.

  • If min_len is None, then only the maximum length is considered.

  • If max_len is None, then only the minimum length is considered.
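
The rules in the note translate into a small predicate. The following plain-Python sketch (function name chosen here for illustration) shows the check for one string; the library applies the equivalent test to each row.

```python
from typing import Optional


def char_len_in_range(text: str, min_len: Optional[int] = None,
                      max_len: Optional[int] = None) -> bool:
    if min_len is None and max_len is None:
        raise ValueError("min_len and max_len cannot both be None")
    n = len(text)
    if min_len is not None and n < min_len:
        return False
    if max_len is not None and n > max_len:
        return False
    return True


print(char_len_in_range("hello", min_len=3, max_len=5))  # True
print(char_len_in_range("hello", max_len=4))             # False
```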

etl.cleaning.length.cleaning___length___word_len_filter(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset='text', min_len: int = None, max_len: int = None, *args, **kwargs)

Filters the data by word length, keeping rows where min_len <= len <= max_len.

  • If min_len is None, then len <= max_len.

  • If max_len is None, then len >= min_len.

Parameters:
  • subset – column to filter

  • min_len – minimum length to filter

  • max_len – maximum length to filter
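
A minimal sketch of the word-level check, assuming that "word length" means the number of whitespace-separated words in the text. That reading is an assumption, as is the function name.

```python
from typing import Optional


def word_len_in_range(text: str, min_len: Optional[int] = None,
                      max_len: Optional[int] = None) -> bool:
    # Assumption: words are separated by whitespace.
    n = len(text.split())
    lower_ok = min_len is None or n >= min_len
    upper_ok = max_len is None or n <= max_len
    return lower_ok and upper_ok


print(word_len_in_range("hello world, how are you?", min_len=2, max_len=5))  # True
```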

etl.cleaning.number module

etl.cleaning.number.cleaning___number___normalize(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', assign_number: int = 0, *args, **kwargs) → pyspark.rdd.RDD

Converts every digit to the assigned number (e.g. 0).

Code is from facebookresearch/cc_net https://github.com/facebookresearch/cc_net/blob/main/cc_net/text_normalizer.py

Examples

  • input

    text

    1234

    1234.5678

  • output

    text

    0000

    0000.0000

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed. It can be either an RDD or a DataFrame.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

  • assign_number (int, optional) – The number to assign. Default is 0.

Returns:

The normalized data.

Raises:

AssertionError – If assign_number is not between 0 and 9 (inclusive).
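
The digit replacement can be sketched for one string as shown below. It is an illustrative plain-Python version with a function name chosen here, not the library's code.

```python
import re


def normalize_digits(text: str, assign_number: int = 0) -> str:
    assert 0 <= assign_number <= 9, "assign_number must be between 0 and 9"
    # Replace every single digit; other characters such as "." are kept.
    return re.sub(r"\d", str(assign_number), text)


print(normalize_digits("1234.5678"))  # 0000.0000
```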

etl.cleaning.table module

etl.cleaning.table.cleaning___table___merge_col_vertical(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, col1: str = None, col2: str = None, merge_col_name: str = 'merge_col', *args, **kwargs) → pyspark.rdd.RDD

Merges two columns vertically into one column.

Example

Before:

  col1  col2  species
  1     2     duck
  3     4     duck
  5     6     ducky

After calling cleaning___table___merge_col_vertical(...) (here with merge_col_name = 'number'):

  number  species
  1       duck
  3       duck
  5       ducky
  2       duck
  4       duck
  6       ducky

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed. It can be either an RDD or a DataFrame.

  • col1 (str) – The name of the first column to merge.

  • col2 (str) – The name of the second column to merge.

  • merge_col_name (str, optional) – The name of the merged column.

Returns:

The processed data with the merged column.

Raises:

ValueError – If col1 or col2 is not specified.
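
The vertical merge can be illustrated on a list of dict rows. This is a plain-Python sketch with an illustrative function name; in Spark the row order of the result may not be guaranteed, whereas this sketch emits all col1 rows first and then all col2 rows.

```python
from typing import Dict, List, Optional


def merge_col_vertical(rows: List[Dict], col1: Optional[str] = None,
                       col2: Optional[str] = None,
                       merge_col_name: str = "merge_col") -> List[Dict]:
    if col1 is None or col2 is None:
        raise ValueError("col1 and col2 must both be specified")
    merged = []
    for col in (col1, col2):
        for row in rows:
            # Keep every column except the two being merged.
            rest = {k: v for k, v in row.items() if k not in (col1, col2)}
            merged.append({merge_col_name: row[col], **rest})
    return merged


table = [
    {"col1": 1, "col2": 2, "species": "duck"},
    {"col1": 3, "col2": 4, "species": "duck"},
    {"col1": 5, "col2": 6, "species": "ducky"},
]
for row in merge_col_vertical(table, "col1", "col2", merge_col_name="number"):
    print(row)
```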

etl.cleaning.unicode module

etl.cleaning.unicode.cleaning___unicode___remove_punct(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', *args, **kwargs) → pyspark.rdd.RDD

Removes all Unicode punctuation.

Code is from facebookresearch/cc_net https://github.com/facebookresearch/cc_net/blob/main/cc_net/text_normalizer.py

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed. It can be either an RDD or a DataFrame.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

Returns:

The cleaned data.

etl.cleaning.unicode.cleaning___unicode___replace_punct(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset: str = 'text', *args, **kwargs) → pyspark.rdd.RDD

Replaces all Unicode punctuation.

Code is from facebookresearch/cc_net https://github.com/facebookresearch/cc_net/blob/main/cc_net/text_normalizer.py

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed. It can be either an RDD or a DataFrame.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

Returns:

The cleaned data.
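
The difference between replacing and removing Unicode punctuation can be sketched with a small lookup table. The table below is a hypothetical, illustrative subset that maps a few full-width marks to ASCII; the complete table lives in the referenced cc_net source and is not reproduced here. The function names are also chosen for illustration.

```python
import re

# Hypothetical subset for illustration; the real table is much longer.
PUNCT_MAP = {
    "，": ",", "。": ".", "：": ":", "；": ";",
    "？": "?", "！": "!", "（": "(", "）": ")",
}
_PUNCT_RE = re.compile("[" + "".join(map(re.escape, PUNCT_MAP)) + "]")


def replace_punct(text: str) -> str:
    # Swap each listed mark for its ASCII counterpart.
    return "".join(PUNCT_MAP.get(ch, ch) for ch in text)


def remove_punct(text: str) -> str:
    # Drop each listed mark entirely.
    return _PUNCT_RE.sub("", text)


print(replace_punct("你好，世界！"))  # 你好,世界!
print(remove_punct("你好，世界！"))   # 你好世界
```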

etl.cleaning.unicode.cleaning___unicode___normalize(self, spark, data: pyspark.rdd.RDD | pyspark.sql.dataframe.DataFrame, subset='text', *args, **kwargs)

Normalizes the Unicode text.

Parameters:
  • spark (SparkSession) – The Spark session object.

  • data (Union[RDD, DataFrame]) – The input data to be processed. It can be either an RDD or a DataFrame.

  • subset (str, optional) – A subset or column to consider. Defaults to ‘text’.

Returns:

The cleaned data.
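
The documentation does not state which normalization form is applied, so the sketch below takes the form as a parameter and defaults to NFC purely as an assumption. It shows what Unicode normalization does to one string using the standard library.

```python
import unicodedata


def normalize_unicode(text: str, form: str = "NFC") -> str:
    # form is one of "NFC", "NFD", "NFKC", "NFKD"; NFC is assumed here.
    return unicodedata.normalize(form, text)


decomposed = "e\u0301"  # "e" followed by a combining acute accent
composed = normalize_unicode(decomposed)
print(len(decomposed), len(composed))  # 2 1
```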