# Copyright 2022 DeepMind Technologies Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Didactic example of an autoregressive Transformer-based language model.

Glossary of shapes:
- B: Batch size.
- T: Sequence length.
- D: Model embedding size.
- H: Number of attention heads.
- V: Vocabulary size.

Forked from: haiku.examples.transformer.model
"""

import collections
import dataclasses
from typing import Callable, List, Optional

import chex
import haiku as hk
import jax
import jax.numpy as jnp
import numpy as np
from tracr.transformer import attention

# hk.Modules are not always callable: github.com/deepmind/dm-haiku/issues/52
# Ideally, we'd want a type:
# CallableHaikuModule = Intersection[Callable[..., jax.Array], hk.Module]
# But Intersection does not exist (yet): github.com/python/typing/issues/213
CallableHaikuModule = Callable[..., jax.Array]


@chex.dataclass
class TransformerOutput:
  layer_outputs: List[jax.Array]  # [B, T, D]
  residuals: List[jax.Array]  # [B, T, D]
  attn_logits: List[jax.Array]  # [B, H, T, T]
  output: jax.Array  # [B, T, D]
  input_embeddings: jax.Array  # [B, T, D]


@dataclasses.dataclass
class TransformerConfig:
  num_heads: int
  num_layers: int
  key_size: int
  mlp_hidden_size: int
  dropout_rate: float
  activation_function: Callable[[jax.Array], jax.Array] = jax.nn.gelu
  layer_norm: bool = True
  causal: bool = False


@dataclasses.dataclass
class Transformer(hk.Module):
  """A transformer stack."""

  config: TransformerConfig
  name: Optional[str] = None

  def __call__(
      self,
      embeddings: jax.Array,  # [B, T, D]
      mask: jax.Array,  # [B, T]
      *,
      use_dropout: bool = True,
  ) -> TransformerOutput:
    """Transforms input embedding sequences to output embedding sequences."""

    def layer_norm(x: jax.Array) -> jax.Array:
      """Applies a unique LayerNorm to x with default settings."""
      if self.config.layer_norm:
        return hk.LayerNorm(axis=-1, create_scale=True, create_offset=True)(x)
      return x

    initializer = hk.initializers.VarianceScaling(2 / self.config.num_layers)
    dropout_rate = self.config.dropout_rate if use_dropout else 0.
    _, seq_len, model_size = embeddings.shape

    # Compute causal mask for autoregressive sequence modelling.
    mask = mask[:, None, None, :]  # [B, H=1, T'=1, T]
    mask = mask.repeat(seq_len, axis=2)  # [B, H=1, T, T]

    if self.config.causal:
      causal_mask = np.ones((1, 1, seq_len, seq_len))  # [B=1, H=1, T, T]
      causal_mask = np.tril(causal_mask)
      mask = mask * causal_mask  # [B, H=1, T, T]

    # Set up activation collection.
    collected = collections.defaultdict(list)

    def collect(**kwargs):
      for k, v in kwargs.items():
        collected[k].append(v)

    residual = embeddings
    for layer in range(self.config.num_layers):
      with hk.experimental.name_scope(f"layer_{layer}"):
        # First the attention block.
        attn_block = attention.MultiHeadAttention(
            num_heads=self.config.num_heads,
            key_size=self.config.key_size,
            model_size=model_size,
            w_init=initializer,
            name="attn")
        attn_in = layer_norm(residual)
        attn_out = attn_block(attn_in, attn_in, attn_in, mask=mask)
        attn_out, attn_logits = attn_out.out, attn_out.logits
        if dropout_rate > 0:
          attn_out = hk.dropout(hk.next_rng_key(), dropout_rate, attn_out)
        residual = residual + attn_out

        collect(
            residuals=residual, layer_outputs=attn_out, attn_logits=attn_logits)

        # Then the dense block.
        with hk.experimental.name_scope("mlp"):
          dense_block = hk.Sequential([
              hk.Linear(
                  self.config.mlp_hidden_size,
                  w_init=initializer,
                  name="linear_1"),
              self.config.activation_function,
              hk.Linear(model_size, w_init=initializer, name="linear_2"),
          ])
        dense_in = layer_norm(residual)
        dense_out = dense_block(dense_in)
        if dropout_rate > 0:
          dense_out = hk.dropout(hk.next_rng_key(), dropout_rate, dense_out)
        residual = residual + dense_out

        collect(residuals=residual, layer_outputs=dense_out)

    return TransformerOutput(
        residuals=collected["residuals"],
        layer_outputs=collected["layer_outputs"],
        attn_logits=collected["attn_logits"],
        output=layer_norm(residual),
        input_embeddings=embeddings,
    )


@chex.dataclass
class CompiledTransformerModelOutput:
  transformer_output: TransformerOutput
  unembedded_output: jax.Array  # [B, T]


@dataclasses.dataclass
class CompiledTransformerModel(hk.Module):
  """A transformer model with one-hot embeddings."""
  transformer: Transformer
  token_embed: CallableHaikuModule
  position_embed: CallableHaikuModule
  unembed: CallableHaikuModule
  use_unembed_argmax: bool
  pad_token: Optional[int] = None

  def embed(self, tokens: jax.Array) -> jax.Array:
    token_embeddings = self.token_embed(tokens)
    positional_embeddings = self.position_embed(jnp.indices(tokens.shape)[-1])
    return token_embeddings + positional_embeddings  # [B, T, D]

  def __call__(
      self,
      tokens: jax.Array,
      use_dropout: bool = True,
  ) -> CompiledTransformerModelOutput:
    """Embed tokens, pass through model, and unembed output."""
    if self.pad_token is None:
      input_mask = jnp.ones_like(tokens)
    else:
      input_mask = (tokens != self.pad_token)
    input_embeddings = self.embed(tokens)

    transformer_output = self.transformer(
        input_embeddings,
        input_mask,
        use_dropout=use_dropout,
    )
    return CompiledTransformerModelOutput(
        transformer_output=transformer_output,
        unembedded_output=self.unembed(
            transformer_output.output,
            use_unembed_argmax=self.use_unembed_argmax,
        ),
    )