# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Built-in WideNDeep model classes."""

import tensorflow.compat.v2 as tf

from keras import activations
from keras import backend
from keras import layers as layer_module
from keras.engine import base_layer
from keras.engine import data_adapter
from keras.engine import training as keras_training
from keras.utils import generic_utils

# isort: off
from tensorflow.python.util import deprecation
from tensorflow.python.util.tf_export import keras_export


@keras_export(
    "keras.experimental.WideDeepModel",
    v1=["keras.experimental.WideDeepModel", "keras.models.WideDeepModel"],
)
@deprecation.deprecated_endpoints("keras.experimental.WideDeepModel")
class WideDeepModel(keras_training.Model):
    r"""Wide & Deep Model for regression and classification problems.

    This model jointly trains a linear and a dnn model. The outputs of the
    two sub-models are summed and then passed through the optional
    `activation`.

    Example:

    ```python
    linear_model = LinearModel()
    dnn_model = keras.Sequential([keras.layers.Dense(units=64),
                                 keras.layers.Dense(units=1)])
    combined_model = WideDeepModel(linear_model, dnn_model)
    combined_model.compile(optimizer=['sgd', 'adam'],
                           loss='mse', metrics=['mse'])
    # define dnn_inputs and linear_inputs as separate numpy arrays or
    # a single numpy array if dnn_inputs is same as linear_inputs.
    combined_model.fit([linear_inputs, dnn_inputs], y, epochs=epochs)
    # or define a single `tf.data.Dataset` that contains a single tensor or
    # separate tensors for dnn_inputs and linear_inputs.
    dataset = tf.data.Dataset.from_tensors(([linear_inputs, dnn_inputs], y))
    combined_model.fit(dataset, epochs=epochs)
    ```

    Both linear and dnn model can be pre-compiled and trained separately
    before jointly training:

    Example:
    ```python
    linear_model = LinearModel()
    linear_model.compile('adagrad', 'mse')
    linear_model.fit(linear_inputs, y, epochs=epochs)
    dnn_model = keras.Sequential([keras.layers.Dense(units=1)])
    dnn_model.compile('rmsprop', 'mse')
    dnn_model.fit(dnn_inputs, y, epochs=epochs)
    combined_model = WideDeepModel(linear_model, dnn_model)
    combined_model.compile(optimizer=['sgd', 'adam'],
                           loss='mse', metrics=['mse'])
    combined_model.fit([linear_inputs, dnn_inputs], y, epochs=epochs)
    ```

    """

    def __init__(self, linear_model, dnn_model, activation=None, **kwargs):
        """Create a Wide & Deep Model.

        Args:
          linear_model: a premade LinearModel, its output must match the output
            of the dnn model.
          dnn_model: a `tf.keras.Model`, its output must match the output of the
            linear model.
          activation: Activation function. Set it to None to maintain a linear
            activation.
          **kwargs: The keyword arguments that are passed on to
            BaseLayer.__init__. Allowed keyword arguments include `name`.
        """
        super().__init__(**kwargs)
        # Record usage of this premade model in the "WideDeep" gauge cell.
        base_layer.keras_premade_model_gauge.get_cell("WideDeep").set(True)
        self.linear_model = linear_model
        self.dnn_model = dnn_model
        self.activation = activations.get(activation)

    def call(self, inputs, training=None):
        """Run both sub-models and combine their outputs.

        Args:
          inputs: Either a list/tuple of exactly two elements, interpreted as
            `(linear_inputs, dnn_inputs)`, or any other value, which is fed
            unchanged to both the linear and the dnn model.
          training: Forwarded to the dnn model only when that model expects a
            `training` argument. When None, the backend learning phase is
            used instead.

        Returns:
          The structure-wise sum of the linear and dnn outputs, with
          `self.activation` mapped over it when an activation is set.
        """
        if not isinstance(inputs, (tuple, list)) or len(inputs) != 2:
            # Anything that is not a 2-element list/tuple is shared by both
            # sub-models.
            linear_inputs = dnn_inputs = inputs
        else:
            linear_inputs, dnn_inputs = inputs
        linear_output = self.linear_model(linear_inputs)

        if self.dnn_model._expects_training_arg:
            if training is None:
                training = backend.learning_phase()
            dnn_output = self.dnn_model(dnn_inputs, training=training)
        else:
            dnn_output = self.dnn_model(dnn_inputs)
        # Both outputs must share the same nested structure for the
        # element-wise addition below.
        output = tf.nest.map_structure(
            lambda x, y: (x + y), linear_output, dnn_output
        )
        if self.activation:
            return tf.nest.map_structure(self.activation, output)
        return output

    # This does not support gradient scaling and LossScaleOptimizer.
    def train_step(self, data):
        """Run one training step on a batch of data.

        When `self.optimizer` is a list or tuple, its first element updates
        the linear model's trainable variables and its second element updates
        the dnn model's trainable variables. Otherwise the single optimizer
        updates all trainable variables of this model.

        Args:
          data: A batch that `data_adapter.unpack_x_y_sample_weight` can split
            into `(x, y, sample_weight)`.

        Returns:
          A dict mapping each metric name to its current result.
        """
        x, y, sample_weight = data_adapter.unpack_x_y_sample_weight(data)
        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)
            loss = self.compiled_loss(
                y, y_pred, sample_weight, regularization_losses=self.losses
            )
        self.compiled_metrics.update_state(y, y_pred, sample_weight)

        if isinstance(self.optimizer, (list, tuple)):
            linear_vars = self.linear_model.trainable_variables
            dnn_vars = self.dnn_model.trainable_variables
            # A single gradient call over the nested (linear, dnn) structure
            # returns gradients in the same nested structure.
            linear_grads, dnn_grads = tape.gradient(
                loss, (linear_vars, dnn_vars)
            )

            linear_optimizer = self.optimizer[0]
            dnn_optimizer = self.optimizer[1]
            linear_optimizer.apply_gradients(zip(linear_grads, linear_vars))
            dnn_optimizer.apply_gradients(zip(dnn_grads, dnn_vars))
        else:
            trainable_variables = self.trainable_variables
            grads = tape.gradient(loss, trainable_variables)
            self.optimizer.apply_gradients(zip(grads, trainable_variables))

        return {m.name: m.result() for m in self.metrics}

    def _make_train_function(self):
        """Build `self.train_function` with per-sub-model optimizer updates.

        The function is (re)built when no train function exists yet or when
        the loss/weighted-metric sub-graphs were recompiled. When
        `self.optimizer` is a list or tuple, its first element produces the
        updates for the linear model's weights and its second element the
        updates for the dnn model's weights; otherwise the same optimizer is
        used for both.
        """
        # Only needed for graph mode and model_to_estimator.
        has_recompiled = self._recompile_weights_loss_and_weighted_metrics()
        self._check_trainable_weights_consistency()
        # If we have re-compiled the loss/weighted metric sub-graphs then create
        # train function even if one exists already. This is because
        # `_feed_sample_weights` list has been updated on re-compile.
        if getattr(self, "train_function", None) is None or has_recompiled:
            # Restore the compiled trainable state.
            current_trainable_state = self._get_trainable_state()
            self._set_trainable_state(self._compiled_trainable_state)

            inputs = (
                self._feed_inputs
                + self._feed_targets
                + self._feed_sample_weights
            )
            # Feed the learning phase only when it is symbolic rather than a
            # fixed int.
            if not isinstance(backend.symbolic_learning_phase(), int):
                inputs += [backend.symbolic_learning_phase()]

            if isinstance(self.optimizer, (list, tuple)):
                linear_optimizer = self.optimizer[0]
                dnn_optimizer = self.optimizer[1]
            else:
                linear_optimizer = self.optimizer
                dnn_optimizer = self.optimizer

            with backend.get_graph().as_default():
                with backend.name_scope("training"):
                    # Training updates
                    updates = []
                    linear_updates = linear_optimizer.get_updates(
                        params=self.linear_model.trainable_weights,
                        loss=self.total_loss,
                    )
                    updates += linear_updates
                    dnn_updates = dnn_optimizer.get_updates(
                        params=self.dnn_model.trainable_weights,
                        loss=self.total_loss,
                    )
                    updates += dnn_updates
                    # Unconditional updates
                    updates += self.get_updates_for(None)
                    # Conditional updates relevant to this model
                    updates += self.get_updates_for(self.inputs)

                metrics = self._get_training_eval_metrics()
                # Only metrics that already carry a `_call_result` contribute
                # an output tensor.
                metrics_tensors = [
                    m._call_result
                    for m in metrics
                    if hasattr(m, "_call_result")
                ]

            with backend.name_scope("training"):
                # Gets loss and metrics. Updates weights at each call.
                fn = backend.function(
                    inputs,
                    [self.total_loss] + metrics_tensors,
                    updates=updates,
                    name="train_function",
                    **self._function_kwargs
                )
                self.train_function = fn

            # Restore the current trainable state
            self._set_trainable_state(current_trainable_state)

    def get_config(self):
        """Return the config of this model as a dict.

        Returns:
          The base `Layer` config extended with the serialized `linear_model`,
          `dnn_model` and `activation`. On a key clash the model-specific
          entries take precedence over the base config.
        """
        linear_config = generic_utils.serialize_keras_object(self.linear_model)
        dnn_config = generic_utils.serialize_keras_object(self.dnn_model)
        config = {
            "linear_model": linear_config,
            "dnn_model": dnn_config,
            "activation": activations.serialize(self.activation),
        }
        base_config = base_layer.Layer.get_config(self)
        return {**base_config, **config}

    @classmethod
    def from_config(cls, config, custom_objects=None):
        """Create a `WideDeepModel` from a config dict.

        Args:
          config: A dict with the keys `linear_model` and `dnn_model`, and
            optionally `activation`; any remaining entries are passed to the
            constructor as keyword arguments. The dict is not modified.
          custom_objects: Optional custom objects forwarded to the
            deserialization of the sub-models and the activation.

        Returns:
          A new instance of `cls`.

        Raises:
          KeyError: If `linear_model` or `dnn_model` is missing from `config`.
        """
        # Pop from a shallow copy so the caller's dict keeps all of its keys
        # and can be reused.
        config = dict(config)
        linear_config = config.pop("linear_model")
        linear_model = layer_module.deserialize(linear_config, custom_objects)
        dnn_config = config.pop("dnn_model")
        dnn_model = layer_module.deserialize(dnn_config, custom_objects)
        activation = activations.deserialize(
            config.pop("activation", None), custom_objects=custom_objects
        )
        return cls(
            linear_model=linear_model,
            dnn_model=dnn_model,
            activation=activation,
            **config
        )
