# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Framework of debug wrapper sessions.

A debug wrapper session is a wrapper around a TensorFlow Python Session.
The wrapper preserves the Session interface, most importantly the run() method,
while providing abilities to:
a) Intercept a run() call to a wrapped session and insert debug tensor watches
   according to externally-specified debug URLs.

b) Release control to an external (i.e., non-Session) object before and after
   the run() call, so that the external object can perform actions such as
   launching a UI to let users inspect the intermediate tensors and partition
   graphs from the run() call.

c) (To be implemented in a future CL) Enter an instruction loop to let an
   external object (e.g., remote client) launch run() and cont() calls
   remotely.

*** The lifetime of a debug wrapper session: ***

1) The wrapper session is created by calling the constructor with a
   wrapped (normal) session as the argument:
     wrapper = FooDebugWrapperSession(sess)
   wherein FooDebugWrapperSession is a concrete subclass implementing the
   abstract BaseDebugWrapperSession class below.

2) Near the end of the constructor call, the on_session_init() callback is
   invoked, with a OnSessionInitRequest object as the argument. The object
   carries the wrapped (normal) session object.

3) The callback handles the request and returns a OnSessionInitResponse
   object with an action field, directing the wrapper session what to do next.

If the action field in the OnSessionInitResponse is PROCEED, the constructor
returns. Control is released back to the caller of the constructor, which can
invoke run() method of wrapper session with the same syntax as a non-wrapped
session, e.g.,:
  wrapper.run(fetches, feed_dict=feeds, options=run_options)

Below, A1 - A2 is the lifetime of a wrapper run() call if the action is
PROCEED:

A1) Right at the start of each run() call, the on_run_start() callback is
    invoked, with an OnRunStartRequest object carrying information such as
    the fetches, the feed dict, the run options and run metadata used in
    this run call, along with a count of how many run calls has occurred
    on this wrapper session. The callback then returns an OnRunStartResponse
    object, of which the action field directs what the wrapper session
    actually will do of the run() call.

    If the action is DEBUG_RUN, a debugged (tensor-watched) run will ensue,
    with the debug URLs supplied in the debug_urls field of the response.
    These can be file:// or grpc:// URLs, for example.

    If the action is NON_DEBUG_RUN, a non-debug (normal) run will ensue.

A2) Right before the run() returns, the on_run_end() callback is invoked,
    with an OnRunEndRequest object as the argument, which carries information
    including the actual action performed in the wrapper run() call and the
    run_metadata from the run() call.

However, if the action field in OnSessionInitResponse is
REMOTE_INSTR_LOOP, the constructor will automatically invoke an instruction loop
that gives the control to a remote caller.

In the remote instruction loop, the following steps will happen:

B1) Callback on_instr_start() is invoked. The callback will return an
    OnInstrStartResponse object with an action field which can order one of
    the following actions:
        i) a run() call with fetches, feeds and debug_urls specified.
       ii) exit the instruction loop.

B2) The wrapper session carries out the action specified above.

B3) If still in the instruction loop, the wrapper session invokes the
    on_instr_end() callback. After the on_instr_end() callback returns, jump
    back to B1.

TODO(cais): Implemented the instruction loop in B1 - B3.

"""

import abc
import re
import threading

import six

from tensorflow.core.protobuf import config_pb2
from tensorflow.python.client import session
from tensorflow.python.debug.lib import debug_utils
from tensorflow.python.framework import errors
from tensorflow.python.framework import ops
from tensorflow.python.platform import tf_logging
from tensorflow.python.training import monitored_session
from tensorflow.python.util import nest
from tensorflow.python.util.compat import collections_abc


# Helper function.
def _check_type(obj, expected_types):
  """Check if an object is of the expected type.

  Args:
    obj: The object being checked.
    expected_types: (`type` or an iterable of `type`s) The expected `type`(s)
      of obj.

  Raises:
      TypeError: If obj is not an instance of expected_type.
  """
  if not isinstance(obj, expected_types):
    raise TypeError("Expected type %s; got type %s" %
                    (expected_types, type(obj)))


class OnSessionInitRequest(object):
  """Request to an on-session-init callback.

  This callback is invoked during the __init__ call to a debug-wrapper session.
  """

  def __init__(self, sess):
    """Constructor.

    Args:
      sess: A tensorflow Session object.
    """

    _check_type(sess, (session.BaseSession, monitored_session.MonitoredSession))
    self.session = sess


class OnSessionInitAction(object):
  """Enum-like values for possible action to take on session init."""

  # Proceed, without special actions, in the wrapper session initialization.
  # What action the wrapper session performs next is determined by the caller
  # of the wrapper session. E.g., it can call run().
  PROCEED = "proceed"

  # Instead of letting the caller of the wrapper session determine what actions
  # the wrapper session will perform next, enter a loop to receive instructions
  # from a remote client.
  # For example, TensorBoard visual debugger can use this action so that it can
  # launch session.run() calls remotely.
  REMOTE_INSTR_LOOP = "remote_instr_loop"


class OnSessionInitResponse(object):
  """Response from an on-session-init callback."""

  def __init__(self, action):
    """Constructor.

    Args:
      action: (`OnSessionInitAction`) Debugger action to take on session init.
    """
    _check_type(action, str)
    self.action = action


class OnRunStartRequest(object):
  """Request to an on-run-start callback.

  This callback is invoked during a run() call of the debug-wrapper
  session, immediately after the run() call counter is incremented.
  """

  def __init__(self, fetches, feed_dict, run_options, run_metadata,
               run_call_count, is_callable_runner=False):
    """Constructor of `OnRunStartRequest`.

    Args:
      fetches: Fetch targets of the run() call.
      feed_dict: The feed dictionary to the run() call.
      run_options: RunOptions input to the run() call.
      run_metadata: RunMetadata input to the run() call.
        The above four arguments are identical to the input arguments to the
        run() method of a non-wrapped TensorFlow session.
      run_call_count: 1-based count of how many run calls (including this one)
        has been invoked.
      is_callable_runner: (bool) whether a runner returned by
        Session.make_callable is being run.
    """
    self.fetches = fetches
    self.feed_dict = feed_dict
    self.run_options = run_options
    self.run_metadata = run_metadata
    self.run_call_count = run_call_count
    self.is_callable_runner = is_callable_runner


class OnRunStartAction(object):
  """Enum-like values for possible action to take on start of a run() call."""

  # Run once with debug tensor-watching.
  DEBUG_RUN = "debug_run"

  # Run once with profiler.
  PROFILE_RUN = "profile_run"

  # Run without debug tensor-watching.
  NON_DEBUG_RUN = "non_debug_run"



class OnRunStartResponse(object):
  """Request from an on-run-start callback.

  The caller of the callback can use this response object to specify what
  action the debug-wrapper session actually takes on the run() call.
  """

  def __init__(self,
               action,
               debug_urls,
               debug_ops="DebugIdentity",
               node_name_regex_allowlist=None,
               op_type_regex_allowlist=None,
               tensor_dtype_regex_allowlist=None,
               tolerate_debug_op_creation_failures=False):
    """Constructor of `OnRunStartResponse`.

    Args:
      action: (`OnRunStartAction`) the action actually taken by the wrapped
        session for the run() call.
      debug_urls: (`list` of `str`) debug_urls used in watching the tensors
        during the run() call.
      debug_ops: (`str` or `list` of `str`) Debug op(s) to be used by the
        debugger.
      node_name_regex_allowlist: Regular-expression allowlist for node
        name.
      op_type_regex_allowlist: Regular-expression allowlist for op type.
      tensor_dtype_regex_allowlist: Regular-expression allowlist for tensor
        dtype.
      tolerate_debug_op_creation_failures: Whether debug op creation failures
        are to be tolerated.
    """

    _check_type(action, str)
    self.action = action

    _check_type(debug_urls, list)
    self.debug_urls = debug_urls

    self.debug_ops = debug_ops

    self.node_name_regex_allowlist = node_name_regex_allowlist
    self.op_type_regex_allowlist = op_type_regex_allowlist
    self.tensor_dtype_regex_allowlist = tensor_dtype_regex_allowlist
    self.tolerate_debug_op_creation_failures = (
        tolerate_debug_op_creation_failures)


class OnRunEndRequest(object):
  """Request to an on-run-end callback.

  The callback is invoked immediately before the wrapped run() call ends.
  """

  def __init__(self,
               performed_action,
               run_metadata=None,
               client_graph_def=None,
               tf_error=None):
    """Constructor for `OnRunEndRequest`.

    Args:
      performed_action: (`OnRunStartAction`) Actually-performed action by the
        debug-wrapper session.
      run_metadata: run_metadata output from the run() call (if any).
      client_graph_def: (GraphDef) GraphDef from the client side, i.e., from
        the python front end of TensorFlow. Can be obtained with
        session.graph.as_graph_def().
      tf_error: (errors.OpError subtypes) TensorFlow OpError that occurred
        during the run (if any).
    """

    _check_type(performed_action, str)
    self.performed_action = performed_action

    if run_metadata is not None:
      _check_type(run_metadata, config_pb2.RunMetadata)
    self.run_metadata = run_metadata
    self.client_graph_def = client_graph_def
    self.tf_error = tf_error


class OnRunEndResponse(object):
  """Response from an on-run-end callback."""

  def __init__(self):

    # Currently only a placeholder.
    pass


@six.add_metaclass(abc.ABCMeta)
class BaseDebugWrapperSession(session.SessionInterface):
  """Base class of debug-wrapper session classes.

  Concrete classes that inherit from this class need to implement the abstract
  methods such as on_session_init, on_run_start and on_run_end.
  """

  def __init__(self, sess, thread_name_filter=None,
               pass_through_operrors=False):
    """Constructor of `BaseDebugWrapperSession`.

    Args:
      sess: An (unwrapped) TensorFlow session instance. It should be a subtype
        of `BaseSession` or `tf.MonitoredSession`.
      thread_name_filter: Regular-expression filter (allowlist) for name(s) of
        thread(s) on which the wrapper session will be active. This regular
        expression is used in a start-anchored fashion on the thread name, i.e.,
        by applying the `match` method of the compiled pattern. The default
        `None` means that the wrapper session will be active on all threads.
        E.g., r"MainThread$", r"QueueRunnerThread.*".
      pass_through_operrors: If True, all captured OpErrors will be
        propagated.  By default this captures all OpErrors.

    Raises:
      ValueError: On invalid `OnSessionInitAction` value.
      NotImplementedError: If a non-DirectSession sess object is received.
    """

    _check_type(sess, (session.BaseSession, monitored_session.MonitoredSession))

    # The session being wrapped.
    self._sess = sess
    self._thread_name_filter_pattern = (re.compile(thread_name_filter)
                                        if thread_name_filter else None)
    # TODO(cais/kstevens): Unittest this pass through feature.
    self._pass_through_operrors = pass_through_operrors

    # Keeps track of number of run calls that have been performed on this
    # debug-wrapper session. The count can be used for purposes such as
    # displaying the state of the Session in a UI and determining a run
    # number-dependent debug URL.
    self._run_call_count = 0

    # Invoke on-session-init callback.
    response = self.on_session_init(OnSessionInitRequest(self._sess))
    _check_type(response, OnSessionInitResponse)

    if response.action == OnSessionInitAction.PROCEED:
      pass
    elif response.action == OnSessionInitAction.REMOTE_INSTR_LOOP:
      # TODO(cais): Implement REMOTE_INSTR_LOOP
      raise NotImplementedError(
          "OnSessionInitAction REMOTE_INSTR_LOOP has not been "
          "implemented.")
    else:
      raise ValueError(
          "Invalid OnSessionInitAction value: %s" % response.action)

    self._default_session_context_manager = None

    # A cache for callables created from CallableOptions.
    self._cached_callables_from_options = {}

  @property
  def graph(self):
    return self._sess.graph

  @property
  def graph_def(self):
    return self._sess.graph_def

  @property
  def sess_str(self):
    return self._sess.sess_str

  @property
  def session(self):
    return self._sess

  def run(self,
          fetches,
          feed_dict=None,
          options=None,
          run_metadata=None,
          callable_runner=None,
          callable_runner_args=None,
          callable_options=None):
    """Wrapper around Session.run() that inserts tensor watch options.

    Args:
      fetches: Same as the `fetches` arg to regular `Session.run()`.
      feed_dict: Same as the `feed_dict` arg to regular `Session.run()`.
      options: Same as the `options` arg to regular `Session.run()`.
      run_metadata: Same as the `run_metadata` arg to regular `Session.run()`.
      callable_runner: A `callable` returned by `Session.make_callable()`.
        If not `None`, `fetches` and `feed_dict` must both be `None`.
        Mutually exclusive with `callable_options`.
      callable_runner_args: An optional list of arguments to `callable_runner`
        or for `callable_options`.
      callable_options: An instance of `config_pb2.CallableOptions`, to be
        used with `Session._make_callable_from_options()`. Mutually exclusive
        with `callable_runner`.

    Returns:
      Simply forwards the output of the wrapped `Session.run()` call.

    Raises:
      ValueError: On invalid `OnRunStartAction` value. Or if `callable_runner`
        is not `None` and either or both of `fetches` and `feed_dict` is `None`.
    """
    if callable_runner and callable_options:
      raise ValueError(
          "callable_runner and callable_options are mutually exclusive, but "
          "are both specified in this call to BaseDebugWrapperSession.run().")

    if callable_runner and (fetches or feed_dict):
      raise ValueError(
          "callable_runner and fetches/feed_dict are mutually exclusive, "
          "but are used simultaneously.")
    elif callable_options and (fetches or feed_dict):
      raise ValueError(
          "callable_options and fetches/feed_dict are mutually exclusive, "
          "but are used simultaneously.")

    self.increment_run_call_count()

    def is_empty(x):
      """Check whether a possibly nested structure is empty."""
      if not nest.is_nested(x):
        return False
      if isinstance(x, collections_abc.Mapping):
        return is_empty(list(x.values()))
      for item in x:
        if not is_empty(item):
          return False
      return True

    empty_fetches = is_empty(fetches)
    if empty_fetches:
      tf_logging.info(
          "Due to empty fetches, tfdbg Session wrapper is letting a "
          "Session.run pass through without any debugging actions.")
    if self._is_disabled_thread() or empty_fetches:
      if callable_runner:
        return callable_runner(*callable_runner_args)
      elif callable_options:
        # pylint:disable=protected-access
        return self._sess._make_callable_from_options(
            callable_options)(*callable_runner_args)
        # pylint:enable=protected-access
      else:
        return self._sess.run(fetches,
                              feed_dict=feed_dict,
                              options=options,
                              run_metadata=run_metadata)

    # Invoke on-run-start callback and obtain response.
    run_start_resp = self.on_run_start(
        OnRunStartRequest(fetches, feed_dict, options, run_metadata,
                          self._run_call_count,
                          is_callable_runner=bool(callable_runner)))
    _check_type(run_start_resp, OnRunStartResponse)

    if run_start_resp.action == OnRunStartAction.DEBUG_RUN:
      retvals, run_end_req = self._run_with_debugging(
          run_start_resp, fetches, feed_dict, options, run_metadata,
          callable_runner, callable_runner_args, callable_options)
    elif run_start_resp.action == OnRunStartAction.PROFILE_RUN:
      retvals, run_end_req = self._run_with_profiling(
          run_start_resp, fetches, feed_dict, options, run_metadata,
          callable_runner, callable_runner_args, callable_options)
    elif run_start_resp.action == OnRunStartAction.NON_DEBUG_RUN:
      # Invoke run() method of the wrapped session.
      if callable_runner:
        retvals = callable_runner(*callable_runner_args)
      elif callable_options:
        # pylint:disable=protected-access
        callable_object = self._sess._make_callable_from_options(
            callable_options)
        # pylint:enable=protected-access
        retvals = callable_object(*callable_runner_args)
      else:
        retvals = self._sess.run(
            fetches,
            feed_dict=feed_dict,
            options=options,
            run_metadata=run_metadata)

      # Prepare arg for the on-run-end callback.
      run_end_req = OnRunEndRequest(run_start_resp.action)
    else:
      raise ValueError(
          "Invalid OnRunStartAction value: %s" % run_start_resp.action)

    # Invoke on-run-end callback and obtain response.
    run_end_resp = self.on_run_end(run_end_req)
    _check_type(run_end_resp, OnRunEndResponse)
    # Currently run_end_resp is only a placeholder. No action is taken on it.

    return retvals

  def _run_with_debugging(self,
                          run_start_resp,
                          fetches,
                          feed_dict,
                          options,
                          run_metadata,
                          callable_runner,
                          callable_runner_args,
                          callable_options):
    """Perform a session.run() or callable with debugging."""
    # Decorate RunOption to fill in debugger tensor watch specifications.
    decorated_run_options = None
    if callable_options:
      callable_options_id = id(callable_options)
      if callable_options_id not in self._cached_callables_from_options:
        # Make a copy of callable_options to avoid mutating it.
        new_callable_options = config_pb2.CallableOptions()
        new_callable_options.CopyFrom(callable_options)
        decorated_run_options = new_callable_options.run_options
    else:
      decorated_run_options = options or config_pb2.RunOptions()

    run_metadata = run_metadata or config_pb2.RunMetadata()

    if decorated_run_options:
      self._decorate_run_options_for_debug(
          decorated_run_options,
          run_start_resp.debug_urls,
          debug_ops=run_start_resp.debug_ops,
          node_name_regex_allowlist=(run_start_resp.node_name_regex_allowlist),
          op_type_regex_allowlist=run_start_resp.op_type_regex_allowlist,
          tensor_dtype_regex_allowlist=(
              run_start_resp.tensor_dtype_regex_allowlist),
          tolerate_debug_op_creation_failures=(
              run_start_resp.tolerate_debug_op_creation_failures))

    # Invoke the run() method of the wrapped Session. Catch any TensorFlow
    # runtime errors.
    tf_error = None
    try:
      if callable_runner:
        retvals = callable_runner(*callable_runner_args,
                                  options=decorated_run_options,
                                  run_metadata=run_metadata)
      elif callable_options:
        # pylint:disable=protected-access
        if callable_options_id in self._cached_callables_from_options:
          callable_object = self._cached_callables_from_options[
              callable_options_id]
        else:
          callable_object = self._sess._make_callable_from_options(
              new_callable_options)
          self._cached_callables_from_options[
              callable_options_id] = callable_object
        # pylint:enable=protected-access
        retvals = callable_object(
            *callable_runner_args, run_metadata=run_metadata)
      else:
        retvals = self._sess.run(fetches,
                                 feed_dict=feed_dict,
                                 options=decorated_run_options,
                                 run_metadata=run_metadata)
    except errors.OpError as op_error:
      if self._pass_through_operrors:
        raise op_error
      tf_error = op_error
      retvals = op_error

    return retvals, OnRunEndRequest(
        run_start_resp.action,
        run_metadata=run_metadata,
        client_graph_def=self._sess.graph.as_graph_def(),
        tf_error=tf_error)

  def _run_with_profiling(self,
                          run_start_resp,
                          fetches,
                          feed_dict,
                          options,
                          run_metadata,
                          callable_runner,
                          callable_runner_args,
                          callable_options):
    """Perform a session.run() or callable with profiling."""
    # Decorate RunOption to fill in debugger tensor watch specifications.
    decorated_run_options = None
    if callable_options:
      callable_options_id = id(callable_options)
      if callable_options_id not in self._cached_callables_from_options:
        # Make a copy of callable_options to avoid mutating it.
        new_callable_options = config_pb2.CallableOptions()
        new_callable_options.CopyFrom(callable_options)
        decorated_run_options = new_callable_options.run_options
    else:
      decorated_run_options = options or config_pb2.RunOptions()
    self._decorate_run_options_for_profile(decorated_run_options)

    run_metadata = run_metadata or config_pb2.RunMetadata()
    if callable_runner:
      retvals = callable_runner(*callable_runner_args,
                                options=decorated_run_options,
                                run_metadata=run_metadata)
    elif callable_options:
      # pylint:disable=protected-access
      callable_object = self._sess._make_callable_from_options(
          new_callable_options)
      # pylint:enable=protected-access
      retvals = callable_object(
          *callable_runner_args, run_metadata=run_metadata)
    else:
      retvals = self._sess.run(fetches,
                               feed_dict=feed_dict,
                               options=decorated_run_options,
                               run_metadata=run_metadata)
    return retvals, OnRunEndRequest(
        run_start_resp.action,
        run_metadata=run_metadata,
        client_graph_def=self._sess.graph.as_graph_def())

  def _is_disabled_thread(self):
    thread_name = threading.current_thread().name or ""
    return (self._thread_name_filter_pattern and
            not self._thread_name_filter_pattern.match(thread_name))

  def run_step_fn(self, step_fn):
    return step_fn(
        monitored_session.MonitoredSession.StepContext(self._sess, self.run))

  def partial_run_setup(self, fetches, feeds=None):
    """Sets up the feeds and fetches for partial runs in the session."""
    raise NotImplementedError(
        "partial_run_setup is not implemented for debug-wrapper sessions.")

  def partial_run(self, handle, fetches, feed_dict=None):
    raise NotImplementedError(
        "partial_run is not implemented for debug-wrapper sessions.")

  def list_devices(self, *args, **kwargs):
    return self._sess.list_devices(*args, **kwargs)

  def reset(self, *args, **kwargs):
    return self._sess.reset(*args, **kwargs)

  def make_callable(self,
                    fetches,
                    feed_list=None,
                    accept_options=False):
    runner = self._sess.make_callable(
        fetches, feed_list=feed_list, accept_options=True)
    def wrapped_runner(*runner_args, **kwargs):
      return self.run(None,
                      feed_dict=None,
                      options=kwargs.get("options", None),
                      run_metadata=kwargs.get("run_metadata", None),
                      callable_runner=runner,
                      callable_runner_args=runner_args)
    return wrapped_runner

  def _make_callable_from_options(self, callable_options):
    def wrapped_runner(*feed_values, **kwargs):
      return self.run(None,
                      run_metadata=kwargs.get("run_metadata", None),
                      callable_options=callable_options,
                      callable_runner_args=feed_values)
    return wrapped_runner

  @property
  def run_call_count(self):
    return self._run_call_count

  def increment_run_call_count(self):
    self._run_call_count += 1

  def _is_disk_usage_reset_each_run(self):
    """Indicates whether disk usage is reset after each Session.run.

    Subclasses that clean up the disk usage after every run should
    override this protected method.

    Returns:
      (`bool`) Whether the disk usage amount is reset to zero after
        each Session.run.
    """
    return False

  def _decorate_run_options_for_debug(
      self,
      run_options,
      debug_urls,
      debug_ops="DebugIdentity",
      node_name_regex_allowlist=None,
      op_type_regex_allowlist=None,
      tensor_dtype_regex_allowlist=None,
      tolerate_debug_op_creation_failures=False):
    """Modify a RunOptions object for debug tensor watching.

    Specifies request for outputting partition graphs. Adds
    debug_tensor_watch_opts with proper debug URLs.

    Args:
      run_options: (RunOptions) the modified RunOptions object.
      debug_urls: (list of str) debug URLs to be entered in run_options.
        debug_tensor_watch_opts.
      debug_ops: (str or list of str) debug op(s) to be used by the debugger.
      node_name_regex_allowlist: Regular-expression allowlist for node
        name.
      op_type_regex_allowlist: Regular-expression allowlist for op type.
      tensor_dtype_regex_allowlist: Regular-expression allowlist for tensor
        dtype.
      tolerate_debug_op_creation_failures: Whether debug op creation failures
        are to be tolerated.
    """

    run_options.output_partition_graphs = True
    debug_utils.watch_graph(
        run_options,
        self._sess.graph,
        debug_urls=debug_urls,
        debug_ops=debug_ops,
        node_name_regex_allowlist=node_name_regex_allowlist,
        op_type_regex_allowlist=op_type_regex_allowlist,
        tensor_dtype_regex_allowlist=tensor_dtype_regex_allowlist,
        tolerate_debug_op_creation_failures=tolerate_debug_op_creation_failures,
        reset_disk_byte_usage=(self._run_call_count == 1 or
                               self._is_disk_usage_reset_each_run()))

  def _decorate_run_options_for_profile(self, run_options):
    """Modify a RunOptions object for profiling TensorFlow graph execution.

    Args:
      run_options: (RunOptions) the modified RunOptions object.
    """

    run_options.trace_level = config_pb2.RunOptions.FULL_TRACE

  @abc.abstractmethod
  def on_session_init(self, request):
    """Callback invoked during construction of the debug-wrapper session.

    This is a blocking callback.
    The invocation happens right before the constructor ends.

    Args:
      request: (`OnSessionInitRequest`) callback request carrying information
        such as the session being wrapped.

    Returns:
      An instance of `OnSessionInitResponse`.
    """

  @abc.abstractmethod
  def on_run_start(self, request):
    """Callback invoked on run() calls to the debug-wrapper session.

    This is a blocking callback.
    The invocation happens after the wrapper's run() call is entered,
    after an increment of run call counter.

    Args:
      request: (`OnRunStartRequest`) callback request object carrying
        information about the run call such as the fetches, feed dict, run
        options, run metadata, and how many `run()` calls to this wrapper
        session have occurred.

    Returns:
      An instance of `OnRunStartResponse`, carrying information to
        debug URLs used to watch the tensors.
    """

  @abc.abstractmethod
  def on_run_end(self, request):
    """Callback invoked on run() calls to the debug-wrapper session.

    This is a blocking callback.
    The invocation happens right before the wrapper exits its run() call.

    Args:
      request: (`OnRunEndRequest`) callback request object carrying information
        such as the actual action performed by the session wrapper for the
        run() call.

    Returns:
      An instance of `OnRunStartResponse`.
    """

  def as_default(self):
    return ops.default_session(self)

  def __enter__(self):
    if self._default_session_context_manager is None:
      self._default_session_context_manager = self.as_default()
    return self._default_session_context_manager.__enter__()

  def __exit__(self, exec_type, exec_value, exec_tb):
    self._default_session_context_manager.__exit__(
        exec_type, exec_value, exec_tb)

  def __del__(self):
    if hasattr(self._sess, "__del__"):
      self._sess.__del__()

  def close(self):
    self._sess.close()

  # TODO(cais): Add _node_name_regex_allowlist and
  #   _node_op_type_regex_allowlist.

  def should_stop(self):
    if hasattr(self._sess, "should_stop"):
      return self._sess.should_stop()
    else:
      raise ValueError(
          "The wrapped session %r does not have a method called 'should_stop'. "
          "Do you intend to wrap a tf.MonitoredSession instead?" % self._sess)


class WatchOptions(object):
  """Type for return values of watch_fn."""

  def __init__(self,
               debug_ops=None,
               node_name_regex_allowlist=None,
               op_type_regex_allowlist=None,
               tensor_dtype_regex_allowlist=None,
               tolerate_debug_op_creation_failures=False):
    """Constructor of WatchOptions: Debug watch options.

    Used as return values of `watch_fn`s.

    Args:
      debug_ops: (`str` or `list of str`) Debug ops to be used.
      node_name_regex_allowlist: Regular-expression allowlist for node_name,
        e.g., `"(weight_[0-9]+|bias_.*)"`
      op_type_regex_allowlist: Regular-expression allowlist for the op type of
        nodes, e.g., `"(Variable|Add)"`.
        If both `node_name_regex_allowlist` and `op_type_regex_allowlist`
        are set, the two filtering operations will occur in a logical `AND`
        relation. In other words, a node will be included if and only if it
        hits both allowlists.
      tensor_dtype_regex_allowlist: Regular-expression allowlist for Tensor
        data type, e.g., `"^int.*"`.
        This allowlist operates in logical `AND` relations to the two allowlists
        above.
      tolerate_debug_op_creation_failures: (`bool`) whether debug op creation
        failures (e.g., due to dtype incompatibility) are to be tolerated by not
        throwing exceptions.
    """
    if debug_ops:
      self.debug_ops = debug_ops
    else:
      self.debug_ops = ["DebugIdentity"]
    self.node_name_regex_allowlist = node_name_regex_allowlist
    self.op_type_regex_allowlist = op_type_regex_allowlist
    self.tensor_dtype_regex_allowlist = tensor_dtype_regex_allowlist
    self.tolerate_debug_op_creation_failures = (
        tolerate_debug_op_creation_failures)

  def __repr__(self):
    return ("WatchOptions(debug_ops=%r, node_name_regex_allowlist=%r, "
            "op_type_regex_allowlist=%r, tensor_dtype_regex_allowlist=%r, "
            "tolerate_debug_op_creation_failures=%r)" %
            (self.debug_ops, self.node_name_regex_allowlist,
             self.op_type_regex_allowlist, self.tensor_dtype_regex_allowlist,
             self.tolerate_debug_op_creation_failures))


class NonInteractiveDebugWrapperSession(BaseDebugWrapperSession):
  """Base class for non-interactive (i.e., non-CLI) debug wrapper sessions."""

  def __init__(self, sess, watch_fn=None, thread_name_filter=None,
               pass_through_operrors=False):
    """Constructor of NonInteractiveDebugWrapperSession.

    Args:
      sess: The TensorFlow `Session` object being wrapped.
      watch_fn: (`Callable`) A Callable that maps the fetches and feeds of a
        debugged `Session.run()` call to `WatchOptions.`
        * Args:
          * `fetches`: the fetches to the `Session.run()` call.
          * `feeds`: the feeds to the `Session.run()` call.

        * Returns:
         (`tf_debug.WatchOptions`) An object containing debug options including
           the debug ops to use, the node names, op types and/or tensor data
           types to watch, etc. See the documentation of `tf_debug.WatchOptions`
           for more details.
      thread_name_filter: Regular-expression white list for threads on which the
        wrapper session will be active. See doc of `BaseDebugWrapperSession` for
        more details.
      pass_through_operrors: If true, all captured OpErrors will be
        propagated.  By default this captures all OpErrors.
    Raises:
       TypeError: If a non-None `watch_fn` is specified and it is not callable.
    """

    BaseDebugWrapperSession.__init__(
        self, sess, thread_name_filter=thread_name_filter,
        pass_through_operrors=pass_through_operrors)

    self._watch_fn = None
    if watch_fn is not None:
      if not callable(watch_fn):
        raise TypeError("watch_fn is not callable")
      self._watch_fn = watch_fn

  def on_session_init(self, request):
    """See doc of BaseDebugWrapperSession.on_run_start."""

    return OnSessionInitResponse(OnSessionInitAction.PROCEED)

  @abc.abstractmethod
  def prepare_run_debug_urls(self, fetches, feed_dict):
    """Abstract method to be implemented by concrete subclasses.

    This method prepares the run-specific debug URL(s).

    Args:
      fetches: Same as the `fetches` argument to `Session.run()`
      feed_dict: Same as the `feed_dict` argument to `Session.run()`

    Returns:
      debug_urls: (`str` or `list` of `str`) Debug URLs to be used in
        this `Session.run()` call.
    """

  def on_run_start(self, request):
    """See doc of BaseDebugWrapperSession.on_run_start."""

    debug_urls, watch_opts = self._prepare_run_watch_config(
        request.fetches, request.feed_dict)

    return OnRunStartResponse(
        OnRunStartAction.DEBUG_RUN,
        debug_urls,
        debug_ops=watch_opts.debug_ops,
        node_name_regex_allowlist=watch_opts.node_name_regex_allowlist,
        op_type_regex_allowlist=watch_opts.op_type_regex_allowlist,
        tensor_dtype_regex_allowlist=watch_opts.tensor_dtype_regex_allowlist,
        tolerate_debug_op_creation_failures=(
            watch_opts.tolerate_debug_op_creation_failures))

  def _prepare_run_watch_config(self, fetches, feed_dict):
    """Get the debug_urls, and node/op allowlists for the current run() call.

    Args:
      fetches: Same as the `fetches` argument to `Session.run()`.
      feed_dict: Same as the `feed_dict argument` to `Session.run()`.

    Returns:
      debug_urls: (str or list of str) Debug URLs for the current run() call.
        Currently, the list consists of only one URL that is a file:// URL.
      watch_options: (WatchOptions) The return value of a watch_fn, containing
        options including debug_ops, and allowlists.
    """

    debug_urls = self.prepare_run_debug_urls(fetches, feed_dict)
    if self._watch_fn is None:
      watch_options = WatchOptions()
    else:
      watch_options = self._watch_fn(fetches, feed_dict)
      if isinstance(watch_options, tuple):
        # For legacy return type (tuples).
        watch_options = WatchOptions(*watch_options)

    return debug_urls, watch_options

  def on_run_end(self, request):
    """See doc of BaseDebugWrapperSession.on_run_end."""

    return OnRunEndResponse()
