# TTS Aligner Inference

In this notebook, we will walk through how to perform inference on a **[RAD-TTS Aligner](https://arxiv.org/abs/2108.10447)** checkpoint. This tutorial will cover everything from preprocessing input text and audio to generating token duration predictions and alignments. We will be visualizing and examining these steps as we go.

We will also show an example of how you can use the alignments generated by the text/audio embeddings to perform **phoneme disambiguation** of a word with multiple possible pronunciations.

This tutorial uses a pretrained Aligner checkpoint from NGC and a sample from [LJSpeech](https://keithito.com/LJ-Speech-Dataset/). You should also be able to substitute your own model checkpoint and samples into the code shown, if you wish.

## License

> Copyright 2022 NVIDIA. All Rights Reserved.
>
> Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
>
> `http://www.apache.org/licenses/LICENSE-2.0`
>
> Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

In [None]:
"""
You can either run this notebook locally (if you have all the dependencies and a GPU) or on Google Colab.
Instructions for setting up Colab are as follows:
1. Open a new Python 3 notebook.
2. Import this notebook from GitHub (File -> Upload Notebook -> "GITHUB" tab -> copy/paste GitHub URL)
3. Connect to an instance with a GPU (Runtime -> Change runtime type -> select "GPU" for hardware accelerator)
4. Run this cell to set up dependencies.
"""
BRANCH = 'r1.17.0'
# If you're using Colab and not running locally, uncomment the lines below and run this cell.
# !apt-get install sox libsndfile1 ffmpeg
# !pip install wget text-unidecode
# !python -m pip install git+https://github.com/NVIDIA/NeMo.git@$BRANCH#egg=nemo_toolkit[all]


We'll need to import some libraries for loading audio, plotting various data, and of course for loading the model.

In [None]:
# Start with some imports so we can visualize alignments and load the checkpoint
%matplotlib inline
import matplotlib.pyplot as plt
import IPython.display as ipd

import librosa
import soundfile as sf
import torch

from nemo.collections.tts.models import AlignerModel

## Setup

Let's start by loading the checkpoint from NGC. You can find the model card [here](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/nemo/models/tts_en_radtts_aligner).

In [None]:
# Set device (GPU or CPU)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load the ARPABET Aligner model checkpoint, then move it to the chosen device and switch to eval mode
aligner = AlignerModel.from_pretrained("tts_en_radtts_aligner")
aligner = aligner.to(device).eval()

# This should be set to whatever sample rate your model was trained on
target_sr = 22050

Now we'll load an audio file and input the corresponding transcript. The audio file will be resampled to the `target_sr` given above.

This example uses the first sample from the NVIDIA test split of [LJSpeech](https://keithito.com/LJ-Speech-Dataset/), which is file `LJ023-0089.wav`. You can use whatever you'd like, of course, but this tutorial will refer to this sample specifically for a concrete example.

In [None]:
!wget https://multilangaudiosamples.s3.us-east-2.amazonaws.com/LJ023-0089.wav

In [None]:
# This tutorial uses a sample from the NVIDIA test split of LJSpeech.
audio_path = "./LJ023-0089.wav"
text_raw = "That is not only my accusation."

# Load audio and resample if necessary
audio_data, orig_sr = sf.read(audio_path)
if orig_sr != target_sr:
    audio_data = librosa.resample(audio_data, orig_sr=orig_sr, target_sr=target_sr)

# Let's double-check that everything matches up!
print(f"Duration (s): {len(audio_data)/target_sr}")
print("Transcript:")
print(text_raw)
ipd.Audio(audio_data, rate=target_sr)

### Audio Preprocessing

The Aligner model takes in a mel spectrogram as input, so we'll need to convert our audio signal before we can evaluate it. The trained model has a preprocessor that will do this for us once we find the audio data length.

In [None]:
# Retrieve audio length for the model's preprocessor
audio_len = torch.tensor(audio_data.shape[0], device=device).long()

# Need to unsqueeze the audio data and audio_len to simulate a batch size of 1
audio = torch.tensor(audio_data, dtype=torch.float, device=device).unsqueeze(0)
audio_len = audio_len.unsqueeze(0)  # Already a tensor, so there is no need to re-wrap it
print(f"Audio batch shape: {audio.shape}")
print(f"Audio length shape: {audio_len.shape}\n")

# Generate the spectrogram!
spec, spec_len = aligner.preprocessor(input_signal=audio, length=audio_len)
print(f"Spec batch shape: {spec.shape}")
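
As a rough sanity check on the spectrogram shape, the number of mel frames can be estimated from the number of audio samples. The sketch below assumes a centered STFT with a hop length of 256 samples. That value is common for 22.05 kHz TTS models, but here it is an assumption rather than something read from this checkpoint. `estimate_num_frames` is a hypothetical helper, not part of NeMo.

```python
def estimate_num_frames(num_samples: int, hop_length: int = 256) -> int:
    """Estimate the number of frames produced by a centered STFT.

    hop_length=256 is an assumed value; check your model's preprocessor config.
    """
    return 1 + num_samples // hop_length


# One second of audio at 22050 Hz (toy input, not the tutorial sample)
frames_for_one_second = estimate_num_frames(22050)
print(frames_for_one_second)
```

If the estimate is far from `spec.shape[-1]`, the assumed hop length or sample rate probably does not match the checkpoint.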

Let's take a look at the spectrogram to make sure it's been loaded correctly.

In [None]:
# Plot the spectrogram
plt.figure(figsize=(15,5))
_ = plt.pcolormesh(spec[0].cpu().numpy(), cmap='viridis')

If the above looks like a spectrogram, we can move on to text preprocessing.

### Text Preprocessing

Now, we need to preprocess the text to be passed in to the model. This involves normalization, as well as conversion of the words in the transcript to phonemes where possible. Out-of-vocabulary (OOV) words, as well as words with multiple pronunciations, are left unconverted and kept as graphemes.

Let's take a look at these steps, one at a time.

In [None]:
# First, a standard English normalization of the text.
# We set punct_post_process=True to preserve words with apostrophes; otherwise they get split.
text_normalized = aligner.normalizer.normalize(text_raw, punct_post_process=True)
print(text_normalized)

At this point, we could normally just run the normalized text through the model's `tokenizer`, which would run G2P (grapheme to phoneme) conversion and spit out text tokens to pass into the model directly. But just to illustrate what happens within the tokenizer, let's take a look at its G2P step.

*(If you are writing your own inference script, you can leave the code in this next cell out entirely, as it's purely illustrative.)*

In [None]:
# The intermediate G2P step.
# This part is usually hidden behind just calling `tokenizer()`, but we show it here to illustrate what happens.
text_g2p = aligner.tokenizer.g2p(text_normalized)
print(text_g2p)
print(f"Length: {len(text_g2p)}")

We can see that some words have been converted to phonemes (e.g. "not" turned into `["N", "AA1", "T"]`), while some have stayed as graphemes (e.g. "that" is still `["t", "h", "a", "t"]`). As mentioned above, this is because only words with a single known pronunciation are converted; words with multiple possible pronunciations are left as graphemes. CMUdict lists two for "that": `"DH AE1 T"` and `"DH AH0 T"`.
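
The conversion rule described above can be illustrated with a toy sketch. This is not NeMo's G2P implementation: `TOY_DICT` is a tiny hypothetical dictionary in the style of CMUdict, and `toy_g2p` is a made-up helper that converts a word only when it has exactly one known pronunciation.

```python
# Hypothetical miniature pronunciation dictionary (CMUdict-style entries).
TOY_DICT = {
    "that": [["DH", "AE1", "T"], ["DH", "AH0", "T"]],  # ambiguous: two entries
    "not": [["N", "AA1", "T"]],                        # unambiguous: one entry
}


def toy_g2p(words):
    """Convert words to phonemes when unambiguous, else keep them as graphemes."""
    output = []
    for index, word in enumerate(words):
        if index > 0:
            output.append(" ")  # keep word boundaries as space tokens
        pronunciations = TOY_DICT.get(word.lower(), [])
        if len(pronunciations) == 1:
            output.extend(pronunciations[0])
        else:
            # OOV (no entries) or ambiguous (several entries): keep the letters
            output.extend(list(word.lower()))
    return output


toy_result = toy_g2p(["That", "not", "zyx"])
print(toy_result)
```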

The next cell shows what we'd normally run right after we normalize the text. This gets us our text tokens.

In [None]:
# The tokenizer runs G2P and then encodes each token.
text_tokens = aligner.tokenizer(text_normalized)
print(text_tokens)
print(f"Length: {len(text_tokens)}")

# We need these to be torch tensors with a batch dimension before passing them in as input, of course
text = torch.tensor(text_tokens, device=device).unsqueeze(0).long()
text_len = torch.tensor(len(text_tokens), device=device).unsqueeze(0).long()
print("\nAfter unsqueezing...")
print(f"Text input shape: {text.shape}")
print(f"Text length shape: {text_len.shape}")

The length increases by 2 if `pad_with_space` was set for the model, which it was for this checkpoint. For ease of lining the results up later, let's update `text_g2p` to reflect this.

In [None]:
# Update text_g2p with spaces
text_g2p.insert(0, ' ')
text_g2p.append(' ')

Now we have our audio data and encoded text!

## Inference: Alignments and Phoneme Disambiguation

Now that we have the audio and tokenized text, we can pass it through the trained model and get an alignment between the two inputs.

In [None]:
# Run the aligner!
with torch.no_grad():
    attn_soft_tensor, attn_logprob_tensor = aligner(spec=spec, spec_len=spec_len, text=text, text_len=text_len)

# "Unbatch" the results
attn_soft = attn_soft_tensor[0, 0, :, :].data.cpu().numpy()
attn_logprob = attn_logprob_tensor[0, 0, :, :].data.cpu().numpy()

print(f"Dimensions should be (spec_len={spec_len[0].data}, text_len={text_len[0].data}) for both:")
print(f"Soft attention matrix shape: {attn_soft.shape}")
print(f"Log prob matrix shape: {attn_logprob.shape}")

### Visualizing the Alignments

Now that we have the soft alignments, we can take a look at how the model matches up text tokens and audio input based on the attention matrix generated. This should roughly be a **diagonal line running monotonically from the top left to the bottom right**, since later tokens align with later spectrogram frames.

In the following cell, we transpose the **soft attention matrix** before plotting it in order to show it more "naturally," that is, with the text along the vertical edge (Y-axis) and an increase in the X-axis (left-to-right) value corresponding with moving forward in time through the spectrogram.

In [None]:
# Visualize soft attention matrix.
fig, ax = plt.subplots(figsize=(12,5))
_ = ax.imshow(attn_soft.transpose(), origin='upper', aspect='auto')
_ = ax.set_yticks(range(len(text_g2p)))
_ = ax.set_yticklabels(text_g2p) # To show the text labels

The above is a soft attention matrix, so we can see that it is somewhat noisy.

We can calculate a **hard attention matrix** to get more concrete predictions for the durations of each grapheme/phoneme. The next plot should be much sharper. We'll show the spectrogram again so we get a rough idea of what alignments match up with what spectrogram features.

In [None]:
# Import helper function to calculate hard attention
from nemo.collections.tts.parts.utils.helpers import binarize_attention

attn_hard_tensor = binarize_attention(attn_soft_tensor, text_len, spec_len)
attn_hard = attn_hard_tensor[0, 0, :, :].data.cpu().numpy()
print(f"Hard attention matrix shape: {attn_hard.shape}") # This should be the same as the soft attn matrix shape!

# Now, let's plot the hard attention matrix.
fig, ax = plt.subplots(2, 1, figsize=(12,10))
_ = ax[0].imshow(attn_hard.transpose(), origin='upper', aspect='auto')
_ = ax[0].set_yticks(range(len(text_g2p)))
_ = ax[0].set_yticklabels(text_g2p) # To show the text labels

# This is the same spectrogram as before, but we show it here just for comparison
_ = ax[1].pcolormesh(spec[0].cpu().numpy(), cmap='viridis')
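
To give an intuition for how a soft attention matrix can be turned into a hard one, here is a simplified, Viterbi-style monotonic alignment search in plain Python. It is a sketch only: `monotonic_alignment` is a hypothetical helper, the input is a toy matrix, and NeMo's `binarize_attention` may differ in its details. The sketch assumes there are at least as many frames as tokens.

```python
import math


def monotonic_alignment(soft_attn, eps=1e-12):
    """Return the token index assigned to each frame.

    soft_attn[t][k] is the soft attention weight of frame t on token k.
    The path starts on the first token, ends on the last token, and at each
    frame either stays on the same token or advances by exactly one.
    """
    n_frames, n_tokens = len(soft_attn), len(soft_attn[0])
    neg_inf = float("-inf")
    # score[t][k]: best cumulative log-probability of a path ending at (t, k)
    score = [[neg_inf] * n_tokens for _ in range(n_frames)]
    back = [[0] * n_tokens for _ in range(n_frames)]
    score[0][0] = math.log(soft_attn[0][0] + eps)

    for t in range(1, n_frames):
        for k in range(n_tokens):
            stay = score[t - 1][k]
            advance = score[t - 1][k - 1] if k > 0 else neg_inf
            if advance > stay:
                best, back[t][k] = advance, k - 1
            else:
                best, back[t][k] = stay, k
            if best > neg_inf:
                score[t][k] = best + math.log(soft_attn[t][k] + eps)

    # Backtrack from the last frame, which must sit on the last token
    path = [0] * n_frames
    k = n_tokens - 1
    for t in range(n_frames - 1, -1, -1):
        path[t] = k
        k = back[t][k]
    return path


# Toy soft attention: 4 frames (rows) by 3 tokens (columns)
toy_soft = [
    [0.8, 0.1, 0.1],
    [0.6, 0.3, 0.1],
    [0.2, 0.7, 0.1],
    [0.1, 0.3, 0.6],
]
toy_path = monotonic_alignment(toy_soft)
print(toy_path)
```

Each frame ends up assigned to exactly one token, which is why the hard attention plot looks much sharper than the soft one.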

### Calculating Token Durations

To get the duration (in frames) of each token, we would get the hard attention matrix, then sum up the number of frames that correspond to each token. Luckily, there is a function in the Aligner's encoder module that does this for us!

In [None]:
# Call function to calculate each token's duration in frames
durations = aligner.alignment_encoder.get_durations(attn_soft_tensor, text_len, spec_len).int()

# Let's match them up. (text_g2p already includes the leading and trailing pad spaces, so the two line up.)
durations_sum = 0
for t, d in zip(text_g2p, durations[0]):
    print(f"'{t}' duration: {d}")
    durations_sum += d

# The following should be equal.
print(f"Total number of frames: {spec_len.item()}")
print(f"Sum of durations: {durations_sum}")
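
Conceptually, a token's duration is the number of frames that the hard alignment assigns to it. The sketch below counts frames per token from a toy frame-to-token path and converts the counts to seconds. Both helpers are hypothetical, and the hop length of 256 samples is an assumption; use the value from your model's preprocessor config.

```python
from collections import Counter


def durations_from_path(path, num_tokens):
    """Count how many frames are assigned to each token index."""
    counts = Counter(path)
    return [counts.get(token, 0) for token in range(num_tokens)]


def frames_to_seconds(num_frames, hop_length=256, sample_rate=22050):
    """Convert a duration in frames to seconds (hop_length is an assumed value)."""
    return num_frames * hop_length / sample_rate


# Toy alignment: frame index -> token index
toy_frame_path = [0, 0, 1, 2, 2, 2]
toy_durations = durations_from_path(toy_frame_path, num_tokens=3)
toy_seconds = [frames_to_seconds(d) for d in toy_durations]
print(toy_durations)
print(toy_seconds)
```

As in the cell above, the durations should sum to the total number of frames.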

### Phoneme Disambiguation via Embedding Distance

Remember how some words were not converted in the G2P step, and were kept as graphemes because they had multiple possible pronunciations? It turns out that we can also use a trained Aligner model to make predictions for phoneme disambiguation!

We can do this by:

1. Generating **one text input per possible pronunciation** (e.g. one sentence with `"DH AE1 T"` and one with `"DH AH0 T"`)
2. **Running inference** on each (with the same spectrogram)
3. Calculating the **distance between the text/spectrogram embeddings**
4. Seeing **which disambiguation tokens are closer to the spectrogram**, as determined by the model.

---

Let's get started with our example! As a reminder, the original sentence we've used for this tutorial is:
```
That is not only my accusation.
```

In this sentence, "that," "is," and "accusation" have multiple entries in CMUdict. Each has two possible pronunciations, which means to disambiguate everything, we'd use six inputs:
```
# Disambiguate "that":
DH AE1 T i s N AA1 T OW1 N L IY0 M AY1 a c c u s a t i o n .
DH AH0 T i s N AA1 T OW1 N L IY0 M AY1 a c c u s a t i o n .

# Disambiguate "is":
t h a t IH1 Z N AA1 T OW1 N L IY0 M AY1 a c c u s a t i o n .
t h a t IH0 Z N AA1 T OW1 N L IY0 M AY1 a c c u s a t i o n .

# Disambiguate "accusation":
t h a t i s N AA1 T OW1 N L IY0 M AY1 AE2 K Y AH0 Z EY1 SH AH0 N .
t h a t i s N AA1 T OW1 N L IY0 M AY1 AE2 K Y UW0 Z EY1 SH AH0 N .
```
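
Candidate inputs like the ones listed above could also be built programmatically. The following is a minimal sketch with a hypothetical helper, `build_candidates`, which replaces a span of grapheme tokens with each candidate pronunciation in turn; the token list is a shortened toy example.

```python
def build_candidates(tokens, start, length, pronunciations):
    """Return one token list per pronunciation, replacing tokens[start:start + length]."""
    return [
        tokens[:start] + list(pronunciation) + tokens[start + length:]
        for pronunciation in pronunciations
    ]


toy_tokens = ["t", "h", "a", "t", " ", "i", "s"]
toy_candidates = build_candidates(
    toy_tokens,
    start=0,   # index of the first letter of "that"
    length=4,  # number of letters in "that"
    pronunciations=[["DH", "AE1", "T"], ["DH", "AH0", "T"]],
)
for candidate in toy_candidates:
    print(candidate)
```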

For brevity's sake, let's just disambiguate the word `that`. To create our two candidate inputs, we'll use the `text_g2p` that we generated earlier but cut out the letters from "that" and replace them with our possible pronunciations, then run them through the `EnglishPhonemesTokenizer`'s `encode_from_g2p()` function.

In [None]:
### (1) Generate one text input per possible pronunciation

# Construct our two candidate sentences by replacing "t" "h" "a" "t" with two phonemic possibilities
that1 = ["DH", "AE1", "T"]
that2 = ["DH", "AH0", "T"]
pron1_g2p = that1 + text_g2p[5:-1] # Chop off trailing space, the tokenizer will add it
pron2_g2p = that2 + text_g2p[5:-1] # Ditto.
print("=== Text ===")
print(pron1_g2p)
print(pron2_g2p)

# Tokenize!
pron1_tokens = aligner.tokenizer.encode_from_g2p(pron1_g2p)
pron2_tokens = aligner.tokenizer.encode_from_g2p(pron2_g2p)
print("\n=== Tokens ===")
print(pron1_tokens)
print(pron2_tokens)

# Create a batch
disamb_text = torch.tensor([pron1_tokens, pron2_tokens], device=device).long()
disamb_text_len = torch.tensor([len(pron1_tokens), len(pron2_tokens)], device=device).long()
print("\n=== Text/Text Length Tensor Shapes ===")
print(disamb_text.shape)
print(disamb_text_len.shape)

And again, we'll insert a space at the beginning and a space at the end because `pad_with_space` is set to True in the tokenizer.

In [None]:
# Insert spaces to reflect the padded token vector
pron1_g2p.insert(0, ' ')
pron1_g2p.append(' ')
print(len(pron1_g2p))

pron2_g2p.insert(0, ' ')
pron2_g2p.append(' ')
print(len(pron2_g2p))

Note that we have picked an example where both disambiguations have the same tokenized length (which will be the case most of the time). If you have a case where the two pronunciations have different lengths, you may need to perform some padding to get the batch to line up.
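
If the candidates do have different lengths, one possible approach is to right-pad the shorter token lists and keep the true lengths alongside them. This is a sketch only: `pad_token_batch` is a hypothetical helper, and `pad_id=0` is a placeholder, so use your tokenizer's actual padding ID.

```python
def pad_token_batch(token_lists, pad_id):
    """Right-pad token lists to a common length and return the original lengths."""
    lengths = [len(tokens) for tokens in token_lists]
    max_len = max(lengths)
    padded = [tokens + [pad_id] * (max_len - len(tokens)) for tokens in token_lists]
    return padded, lengths


# Toy token IDs; pad_id=0 is a placeholder value
toy_padded, toy_lengths = pad_token_batch([[5, 6, 7], [5, 6, 7, 8, 9]], pad_id=0)
print(toy_padded)
print(toy_lengths)
```

The padded lists and the lengths can then be turned into tensors in the same way as `disamb_text` and `disamb_text_len`.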

Let's run inference on the new inputs. These two text inputs are candidates for the same spectrogram, so we'll duplicate the spectrogram input.

In [None]:
### (2) Run inference on each candidate

# Duplicate spec and spec_len to match the two text inputs
spec_2 = spec.repeat([2, 1, 1])
spec_len_2 = spec_len.repeat([2])

# Inference with two inputs
with torch.no_grad():
    disamb_attn_soft_tensor, _ = aligner(
        spec=spec_2,
        spec_len=spec_len_2,
        text=disamb_text,
        text_len=disamb_text_len,
    )

# "Unbatch" the results
disamb_attn_soft = disamb_attn_soft_tensor[:, 0, :, :].data.cpu().numpy()
print(f"Dimensions should be (2, spec_len={spec_len_2[0].data}, text_len={max(disamb_text_len.data)}):")
print(f"Soft attention matrix shape: {disamb_attn_soft.shape}")

Next, we retrieve the L2 distance matrix between the text embeddings and the spectrogram embeddings. The alignment encoder has a function called `get_dist()` that calculates $(\texttt{text_emb[i]} - \texttt{spec_emb[j]})^2$ for all pairs of text tokens and spectrogram timesteps; taking the square root of those values gives the L2 distance matrix.

(Note that darker = smaller distance, so we should see a dark diagonal of a similar shape to the lines above.)

In [None]:
### (3) Calculate distance between text and spectrogram embeddings for each candidate

# Housekeeping: we first need to get the text embedding from the Aligner encoder
disamb_text_embs = aligner.embed(disamb_text).transpose(1,2)

# Run the Aligner encoder to get the distances between the key (text) and query (spectrogram) embeddings.
square_dists = aligner.alignment_encoder.get_dist(keys=disamb_text_embs, queries=spec_2)
l2_dists = square_dists.sqrt()

# We can plot the L2 distances now
l2_dists_data = l2_dists.data.cpu().numpy()
fig, ax = plt.subplots(2, 1, figsize=(12,10))

# Here, we trim the first and last time steps (the zero-padding)
_ = ax[0].imshow(l2_dists_data[0, 1:-1].transpose(), origin='upper', aspect='auto')
_ = ax[0].set_yticks(range(len(pron1_g2p)))
_ = ax[0].set_yticklabels(pron1_g2p) # To show the text labels
_ = ax[0].set_title("\"DH AE1 T\" Candidate - Embedding L2 Distance Matrix")

_ = ax[1].imshow(l2_dists_data[1, 1:-1].transpose(), origin='upper', aspect='auto')
_ = ax[1].set_yticks(range(len(pron2_g2p)))
_ = ax[1].set_yticklabels(pron2_g2p)
_ = ax[1].set_title("\"DH AH0 T\" Candidate - Embedding L2 Distance Matrix")
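
For intuition, the pairwise distance computation can be sketched in plain Python on toy embeddings. `pairwise_l2` is a hypothetical helper rather than the NeMo function, and the real encoder works on learned, projected embeddings instead of the raw 2-D vectors used here.

```python
import math


def pairwise_l2(text_emb, spec_emb):
    """Return dists[j][i], the L2 distance between spec frame j and text token i."""
    return [
        [
            math.sqrt(sum((s - t) ** 2 for s, t in zip(frame, token)))
            for token in text_emb
        ]
        for frame in spec_emb
    ]


# Toy 2-D embeddings: 2 text tokens and 3 spectrogram frames
toy_text_emb = [[0.0, 0.0], [3.0, 4.0]]
toy_spec_emb = [[0.0, 0.0], [3.0, 0.0], [3.0, 4.0]]
toy_dists = pairwise_l2(toy_text_emb, toy_spec_emb)
for row in toy_dists:
    print(row)
```

The small values along the diagonal of the toy result play the same role as the dark diagonal in the plots.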

The last step is to calculate the average distance between the text tokens for "that" and their corresponding audio frames. **We expect that the candidate pronunciation that's the closest to the audio should be the most representative of the actual speech.**

To do this, we need to get each token's durations, which will let us isolate only the (predicted) frames that correspond to `DH AE1 T` and `DH AH0 T` respectively. Then, the Aligner's encoder has a function called `get_mean_distance_for_word()` that will calculate the average distance over the frames corresponding only to the tokens in the word.

In [None]:
### (4) Check which disambiguation is closer to the speech

# Get durations like before; the batch size of 2 shouldn't change how we call the function.
disamb_durations = aligner.alignment_encoder.get_durations(
    disamb_attn_soft_tensor,
    disamb_text_len,
    spec_len_2,
).int()

# Retrieve the average embedding distances for each pronunciation of "that"
that1_mean_dist = aligner.alignment_encoder.get_mean_distance_for_word(
    l2_dists=l2_dists[0],
    durs=disamb_durations[0],
    start_token=1,  # Remember to account for space padding
    num_tokens=len(that1),
)
that2_mean_dist = aligner.alignment_encoder.get_mean_distance_for_word(
    l2_dists=l2_dists[1],
    durs=disamb_durations[1],
    start_token=1,  # Here as well
    num_tokens=len(that2),
)

print(f"Average distance for {that1}: {that1_mean_dist}")
print(f"Average distance for {that2}: {that2_mean_dist}")
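
The per-word average can be sketched as follows: walk through the word's tokens, and for each token average the distances over the frames that the durations assign to it. This is an illustrative plain-Python version with a hypothetical helper name and toy values; the NeMo function may differ in its details.

```python
def mean_distance_for_word(dists, durations, start_token, num_tokens):
    """Average dists[frame][token] over the frames assigned to a word's tokens.

    dists is indexed as [frame][token]; durations[k] is the frame count of token k.
    Assumes the word covers at least one frame.
    """
    frame = sum(durations[:start_token])  # first frame that belongs to the word
    total, count = 0.0, 0
    for token in range(start_token, start_token + num_tokens):
        for _ in range(durations[token]):
            total += dists[frame][token]
            frame += 1
            count += 1
    return total / count


# Toy distance matrix: 4 frames (rows) by 3 tokens (columns)
toy_word_dists = [
    [1.0, 9.0, 9.0],
    [9.0, 2.0, 9.0],
    [9.0, 4.0, 9.0],
    [9.0, 9.0, 3.0],
]
toy_word_durs = [1, 2, 1]  # frames per token
toy_word_mean = mean_distance_for_word(toy_word_dists, toy_word_durs, start_token=1, num_tokens=1)
print(toy_word_mean)
```

Computing this value for each candidate and keeping the smallest one corresponds to step (4).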

And we're done!

**With the average distance for `DH AE1 T` being about 377 and the average distance for `DH AH0 T` being about 403, we can pick `DH AE1 T` as the better match.**

As an exercise, try editing the blocks of code above to disambiguate "accusation" (`AE2 K Y AH0 Z EY1 SH AH0 N` versus `AE2 K Y UW0 Z EY1 SH AH0 N`).

#### Addendum

As a side note, there is also a function called `get_mean_dist_by_durations()` that will match up the distances between each token and its corresponding spectrogram frame (using the previously-calculated durations), then calculate the mean over the batch.

The whole-sentence average may not tell us very much here because we just want to know which pronunciation is closest to what's being said for a specific word, but it's there if you need it as an extra metric!

In [None]:
# Mean distance over the batch:
mean_dists = aligner.alignment_encoder.get_mean_dist_by_durations(
    dist=l2_dists.to('cpu'),
    durations=disamb_durations.to('cpu'),
)
print(mean_dists)

## Resources

- For more information about the Aligner architecture, check out the [RAD-TTS Aligner paper](https://arxiv.org/abs/2108.10447).
- If you would like to run disambiguation on a large batch of sentences, try out the [Aligner disambiguation example script](https://github.com/NVIDIA/NeMo/blob/main/examples/tts/aligner_heteronym_disambiguation.py).