""" Utilities for processing text. """

import html
import math
import random
import re
from pathlib import Path

import ftfy
from huggingface_hub import hf_hub_download
from unidecode import unidecode

# Replacement candidates for the "<person>" tag used in CC12M captions; the
# counts act as weights when one of them is sampled at random.
person_token = [("a person", 282265), ("someone", 121194), ("somebody", 12219)]

# Placeholder wrapped around characters that must survive the punctuation pass
# (dots in numbers, apostrophes in contractions, slashes in dates); the
# matching post_process_* functions restore the original characters.
temp_token = "xtokx"


class HashtagProcessor:
    """Split concatenated hashtag text into words using Wikipedia word frequencies."""

    def __init__(self):
        wiki_word_frequency = hf_hub_download(
            "dalle-mini/dalle-mini", filename="enwiki-words-frequency.txt"
        )
        self._word_cost = (
            l.split()[0] for l in Path(wiki_word_frequency).read_text().splitlines()
        )
        # The word list is assumed to be sorted by decreasing frequency, so the
        # cost of a word is the log of its 1-based rank (frequent words are cheap).
        self._word_cost = {
            str(k): math.log(float(i + 1)) for i, k in enumerate(self._word_cost)
        }
        self._max_word = max(len(x) for x in self._word_cost.keys())
        self._SPLIT_RE = re.compile("[^a-zA-Z0-9']+")

    def __call__(self, s):
        """Uses dynamic programming to infer the location of spaces in a string without spaces."""
        l = [self._split(x) for x in self._SPLIT_RE.split(s)]
        return " ".join([item for sublist in l for item in sublist])

    def _split(self, s):
        # Find the best match for the first i characters, assuming the costs
        # for the first i - 1 characters have already been computed.
        # Returns a pair (match_cost, match_length).
        def best_match(i):
            candidates = enumerate(reversed(cost[max(0, i - self._max_word) : i]))
            return min(
                (c + self._word_cost.get(s[i - k - 1 : i].lower(), 9e999), k + 1)
                for k, c in candidates
            )

        # Build the cost array
        cost = [0]
        for i in range(1, len(s) + 1):
            c, k = best_match(i)
            cost.append(c)

        # Backtrack to recover the minimal-cost split
        out = []
        i = len(s)
        while i > 0:
            c, k = best_match(i)
            assert c == cost[i]
            newToken = True
            if not s[i - k : i] == "'":  # ignore a lone apostrophe
                if len(out) > 0:
                    # re-attach a split "'s" or split digits to the previous token
                    if out[-1] == "'s" or (
                        s[i - 1].isdigit() and out[-1][0].isdigit()
                    ):
                        out[-1] = s[i - k : i] + out[-1]
                        newToken = False

            if newToken:
                out.append(s[i - k : i])

            i -= k

        return reversed(out)
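

# Illustrative usage of HashtagProcessor (hypothetical output: the exact split
# depends on the downloaded word-frequency file, and constructing the processor
# requires access to the "dalle-mini/dalle-mini" repo on the Hugging Face Hub):
#
#   processor = HashtagProcessor()
#   processor("ilovenewyork")  # -> "i love new york"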


def replace_person_token(t):
    "Used for CC12M"
    # Collapse runs of "<person>" tags (optionally separated by commas/"and")
    # into "people", then replace any remaining single tag with a random
    # person word, weighted by the counts in `person_token`.
    t = re.sub(r"<person>([,\s]*(and)*[,\s]*<person>)+", " people ", t)
    while "<person>" in t:
        t = t.replace(
            "<person>", f" {random.choices(*tuple(zip(*person_token)))[0]} ", 1
        )
    return t
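

# Illustrative behaviour (the single-tag replacement is sampled at random, and
# the extra spaces introduced here are collapsed later by remove_extra_spaces):
#
#   replace_person_token("<person> riding a bike")
#   # -> " someone  riding a bike"  (or "a person" / "somebody")
#   replace_person_token("<person>, <person> and <person> at the park")
#   # -> " people  at the park"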


def fix_html(t):
    # Unescape twice to handle doubly-encoded entities such as "&amp;amp;".
    return html.unescape(html.unescape(t))


def replace_punctuation_with_commas(t):
    return re.sub(r"[()[\].,|:;?!=+~\-\/{}]", ",", t)


def simplify_quotes(t):
    return re.sub("""['"`]""", ' " ', t)


def merge_quotes(t):
    return re.sub(r'(\s*"+\s*)+', ' " ', t)


def remove_comma_numbers(t):
    def _f(t):
        return re.sub(r"(\d),(\d{3})", r"\1\2", t)

    # Applied twice so that numbers with several groups, e.g. "1,234,567",
    # are fully collapsed.
    return _f(_f(t))


def pre_process_dot_numbers(t):
    # Protect dots between word characters (decimals, abbreviations) so that
    # replace_punctuation_with_commas does not turn them into commas.
    return re.sub(r"(\w)\.(\w)", rf"\1{temp_token}dot{temp_token}\2", t)


def post_process_dot_numbers(t):
    return re.sub(f"{temp_token}dot{temp_token}", ".", t)


def pre_process_quotes(t):
    # Protect apostrophes in common contractions ('s, 't, 'd, 'm, 'll, 're, 've).
    return re.sub(
        r"'(?=([stdm]|(ll)|(re)|(ve))\b)", f"{temp_token}quote{temp_token}", t
    )


def post_process_quotes(t):
    return re.sub(f"{temp_token}quote{temp_token}", "'", t)


def pre_process_dates(t):
    # Protect slashes between digits (dates, fractions).
    return re.sub(r"(\d)/(\d)", rf"\1{temp_token}slash{temp_token}\2", t)


def post_process_dates(t):
    return re.sub(f"{temp_token}slash{temp_token}", "/", t)


def merge_commas(t):
    return re.sub(r"(\s*,+\s*)+", ", ", t)


def add_space_after_commas(t):
    return re.sub(",", ", ", t)


def handle_special_chars(t):
    "Handle special characters"
    # Replace dashes between words with a space
    t = re.sub(r"(\w)-(\w)", r"\1 \2", t)
    # Pad %, &, /, $ and * with spaces so they become separate tokens
    return re.sub(r"([%&\/$*])", r" \1 ", t)


def expand_hashtags(t, hashtag_processor):
    "Remove # and try to split words"
    return re.sub(r"#(\w+)", lambda m: hashtag_processor(m.group(1)), t)


_re_ignore_chars = r"[_#\\]"


def ignore_chars(t):
    "Ignore useless characters"
    return re.sub(_re_ignore_chars, " ", t)


def remove_extra_spaces(t):
    "Remove extra spaces (including \t and \n)"
    return re.sub(r"\s+", " ", t)


def remove_repeating_chars(t):
    "If the same character is present 4+ times (not 3 because of roman 'VIII'), replace with single instance"
    return re.sub(r"(\D)(\1{3,})", r"\1", t)


def remove_urls(t):
    return re.sub(r"http\S+", "", t)


def remove_html_tags(t):
    return re.sub("<[^<]+?>", "", t)


def remove_first_last_commas(t):
    t = t.strip()
    t = t[:-1] if t and t[-1] == "," else t
    t = t[1:] if t and t[0] == "," else t
    return t.strip()


def remove_wiki_ref(t):
    # Strip leading/trailing "[12]"-style reference markers.
    t = re.sub(r"\A\s*\[\d+\]", "", t)
    return re.sub(r"\[\d+\]\s*\Z", "", t)


class TextNormalizer:
    "Normalize text"

    def __init__(self):
        self._hashtag_processor = HashtagProcessor()

    def __call__(self, t):
        # fix common encoding and HTML-entity issues
        t = ftfy.fix_text(t)
        t = fix_html(t)
        # transliterate to ASCII and lower case
        t = unidecode(t)
        t = t.lower()
        # replace <person> tags (CC12M)
        t = replace_person_token(t)
        # remove wiki references, leftover markup and URLs
        t = remove_wiki_ref(t)
        t = remove_html_tags(t)
        t = remove_urls(t)
        # normalize numbers, then protect dots, quotes and slashes from the
        # punctuation pass below
        t = remove_comma_numbers(t)
        t = pre_process_dot_numbers(t)
        t = pre_process_quotes(t)
        t = pre_process_dates(t)
        # handle special characters and hashtags
        t = handle_special_chars(t)
        t = expand_hashtags(t, self._hashtag_processor)
        t = ignore_chars(t)
        # simplify punctuation, then restore the protected characters
        t = simplify_quotes(t)
        t = replace_punctuation_with_commas(t)
        t = post_process_dot_numbers(t)
        t = post_process_quotes(t)
        t = post_process_dates(t)
        # final cleanup
        t = remove_repeating_chars(t)
        t = merge_quotes(t)
        t = merge_commas(t)
        t = remove_extra_spaces(t)
        t = remove_first_last_commas(t)
        return f" {t}"