# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Shared functions and classes for tfdbg command-line interface."""
import math

import numpy as np
import six

from tensorflow.python.debug.cli import command_parser
from tensorflow.python.debug.cli import debugger_cli_common
from tensorflow.python.debug.cli import tensor_format
from tensorflow.python.debug.lib import common
from tensorflow.python.framework import ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import gfile

RL = debugger_cli_common.RichLine

# Default threshold number of elements above which ellipses will be used
# when printing the value of the tensor.
DEFAULT_NDARRAY_DISPLAY_THRESHOLD = 2000

COLOR_BLACK = "black"
COLOR_BLUE = "blue"
COLOR_CYAN = "cyan"
COLOR_GRAY = "gray"
COLOR_GREEN = "green"
COLOR_MAGENTA = "magenta"
COLOR_RED = "red"
COLOR_WHITE = "white"
COLOR_YELLOW = "yellow"

TIME_UNIT_US = "us"
TIME_UNIT_MS = "ms"
TIME_UNIT_S = "s"
TIME_UNITS = [TIME_UNIT_US, TIME_UNIT_MS, TIME_UNIT_S]


def bytes_to_readable_str(num_bytes, include_b=False):
  """Generate a human-readable string representing number of bytes.

  The units B, kB, MB and GB are used.

  Args:
    num_bytes: (`int` or None) Number of bytes.
    include_b: (`bool`) Include the letter B at the end of the unit.

  Returns:
    (`str`) A string representing the number of bytes in a human-readable way,
      including a unit at the end.
  """

  if num_bytes is None:
    return str(num_bytes)
  if num_bytes < 1024:
    result = "%d" % num_bytes
  elif num_bytes < 1048576:
    result = "%.2fk" % (num_bytes / 1024.0)
  elif num_bytes < 1073741824:
    result = "%.2fM" % (num_bytes / 1048576.0)
  else:
    result = "%.2fG" % (num_bytes / 1073741824.0)

  if include_b:
    result += "B"
  return result


def time_to_readable_str(value_us, force_time_unit=None):
  """Convert time value to human-readable string.

  Args:
    value_us: time value in microseconds.
    force_time_unit: force the output to use the specified time unit. Must be
      in TIME_UNITS.

  Returns:
    Human-readable string representation of the time value.

  Raises:
    ValueError: if force_time_unit value is not in TIME_UNITS.
  """
  if not value_us:
    return "0"
  if force_time_unit:
    if force_time_unit not in TIME_UNITS:
      raise ValueError("Invalid time unit: %s" % force_time_unit)
    order = TIME_UNITS.index(force_time_unit)
    time_unit = force_time_unit
    return "{:.10g}{}".format(value_us / math.pow(10.0, 3*order), time_unit)
  else:
    order = min(len(TIME_UNITS) - 1, int(math.log(value_us, 10) / 3))
    time_unit = TIME_UNITS[order]
    return "{:.3g}{}".format(value_us / math.pow(10.0, 3*order), time_unit)


def parse_ranges_highlight(ranges_string):
  """Process ranges highlight string.

  Args:
    ranges_string: (str) A string representing a numerical range of a list of
      numerical ranges. See the help info of the -r flag of the print_tensor
      command for more details.

  Returns:
    An instance of tensor_format.HighlightOptions, if range_string is a valid
      representation of a range or a list of ranges.
  """

  ranges = None

  def ranges_filter(x):
    r = np.zeros(x.shape, dtype=bool)
    for range_start, range_end in ranges:
      r = np.logical_or(r, np.logical_and(x >= range_start, x <= range_end))

    return r

  if ranges_string:
    ranges = command_parser.parse_ranges(ranges_string)
    return tensor_format.HighlightOptions(
        ranges_filter, description=ranges_string)
  else:
    return None


def numpy_printoptions_from_screen_info(screen_info):
  if screen_info and "cols" in screen_info:
    return {"linewidth": screen_info["cols"]}
  else:
    return {}


def format_tensor(tensor,
                  tensor_name,
                  np_printoptions,
                  print_all=False,
                  tensor_slicing=None,
                  highlight_options=None,
                  include_numeric_summary=False,
                  write_path=None):
  """Generate formatted str to represent a tensor or its slices.

  Args:
    tensor: (numpy ndarray) The tensor value.
    tensor_name: (str) Name of the tensor, e.g., the tensor's debug watch key.
    np_printoptions: (dict) Numpy tensor formatting options.
    print_all: (bool) Whether the tensor is to be displayed in its entirety,
      instead of printing ellipses, even if its number of elements exceeds
      the default numpy display threshold.
      (Note: Even if this is set to true, the screen output can still be cut
       off by the UI frontend if it consist of more lines than the frontend
       can handle.)
    tensor_slicing: (str or None) Slicing of the tensor, e.g., "[:, 1]". If
      None, no slicing will be performed on the tensor.
    highlight_options: (tensor_format.HighlightOptions) options to highlight
      elements of the tensor. See the doc of tensor_format.format_tensor()
      for more details.
    include_numeric_summary: Whether a text summary of the numeric values (if
      applicable) will be included.
    write_path: A path to save the tensor value (after any slicing) to
      (optional). `numpy.save()` is used to save the value.

  Returns:
    An instance of `debugger_cli_common.RichTextLines` representing the
    (potentially sliced) tensor.
  """

  if tensor_slicing:
    # Validate the indexing.
    value = command_parser.evaluate_tensor_slice(tensor, tensor_slicing)
    sliced_name = tensor_name + tensor_slicing
  else:
    value = tensor
    sliced_name = tensor_name

  auxiliary_message = None
  if write_path:
    with gfile.Open(write_path, "wb") as output_file:
      np.save(output_file, value)
    line = debugger_cli_common.RichLine("Saved value to: ")
    line += debugger_cli_common.RichLine(write_path, font_attr="bold")
    line += " (%sB)" % bytes_to_readable_str(gfile.Stat(write_path).length)
    auxiliary_message = debugger_cli_common.rich_text_lines_from_rich_line_list(
        [line, debugger_cli_common.RichLine("")])

  if print_all:
    np_printoptions["threshold"] = value.size
  else:
    np_printoptions["threshold"] = DEFAULT_NDARRAY_DISPLAY_THRESHOLD

  return tensor_format.format_tensor(
      value,
      sliced_name,
      include_metadata=True,
      include_numeric_summary=include_numeric_summary,
      auxiliary_message=auxiliary_message,
      np_printoptions=np_printoptions,
      highlight_options=highlight_options)


def error(msg):
  """Generate a RichTextLines output for error.

  Args:
    msg: (str) The error message.

  Returns:
    (debugger_cli_common.RichTextLines) A representation of the error message
      for screen output.
  """

  return debugger_cli_common.rich_text_lines_from_rich_line_list([
      RL("ERROR: " + msg, COLOR_RED)])


def _recommend_command(command, description, indent=2, create_link=False):
  """Generate a RichTextLines object that describes a recommended command.

  Args:
    command: (str) The command to recommend.
    description: (str) A description of what the command does.
    indent: (int) How many spaces to indent in the beginning.
    create_link: (bool) Whether a command link is to be applied to the command
      string.

  Returns:
    (RichTextLines) Formatted text (with font attributes) for recommending the
      command.
  """

  indent_str = " " * indent

  if create_link:
    font_attr = [debugger_cli_common.MenuItem("", command), "bold"]
  else:
    font_attr = "bold"

  lines = [RL(indent_str) + RL(command, font_attr) + ":",
           indent_str + "  " + description]

  return debugger_cli_common.rich_text_lines_from_rich_line_list(lines)


def get_tfdbg_logo():
  """Make an ASCII representation of the tfdbg logo."""

  lines = [
      "",
      "TTTTTT FFFF DDD  BBBB   GGG ",
      "  TT   F    D  D B   B G    ",
      "  TT   FFF  D  D BBBB  G  GG",
      "  TT   F    D  D B   B G   G",
      "  TT   F    DDD  BBBB   GGG ",
      "",
  ]
  return debugger_cli_common.RichTextLines(lines)


_HORIZONTAL_BAR = "======================================"


def get_run_start_intro(run_call_count,
                        fetches,
                        feed_dict,
                        tensor_filters,
                        is_callable_runner=False):
  """Generate formatted intro for run-start UI.

  Args:
    run_call_count: (int) Run call counter.
    fetches: Fetches of the `Session.run()` call. See doc of `Session.run()`
      for more details.
    feed_dict: Feeds to the `Session.run()` call. See doc of `Session.run()`
      for more details.
    tensor_filters: (dict) A dict from tensor-filter name to tensor-filter
      callable.
    is_callable_runner: (bool) whether a runner returned by
        Session.make_callable is being run.

  Returns:
    (RichTextLines) Formatted intro message about the `Session.run()` call.
  """

  fetch_lines = common.get_flattened_names(fetches)

  if not feed_dict:
    feed_dict_lines = [debugger_cli_common.RichLine("  (Empty)")]
  else:
    feed_dict_lines = []
    for feed_key in feed_dict:
      feed_key_name = common.get_graph_element_name(feed_key)
      feed_dict_line = debugger_cli_common.RichLine("  ")
      feed_dict_line += debugger_cli_common.RichLine(
          feed_key_name,
          debugger_cli_common.MenuItem(None, "pf '%s'" % feed_key_name))
      # Surround the name string with quotes, because feed_key_name may contain
      # spaces in some cases, e.g., SparseTensors.
      feed_dict_lines.append(feed_dict_line)
  feed_dict_lines = debugger_cli_common.rich_text_lines_from_rich_line_list(
      feed_dict_lines)

  out = debugger_cli_common.RichTextLines(_HORIZONTAL_BAR)
  if is_callable_runner:
    out.append("Running a runner returned by Session.make_callable()")
  else:
    out.append("Session.run() call #%d:" % run_call_count)
    out.append("")
    out.append("Fetch(es):")
    out.extend(debugger_cli_common.RichTextLines(
        ["  " + line for line in fetch_lines]))
    out.append("")
    out.append("Feed dict:")
    out.extend(feed_dict_lines)
  out.append(_HORIZONTAL_BAR)
  out.append("")
  out.append("Select one of the following commands to proceed ---->")

  out.extend(
      _recommend_command(
          "run",
          "Execute the run() call with debug tensor-watching",
          create_link=True))
  out.extend(
      _recommend_command(
          "run -n",
          "Execute the run() call without debug tensor-watching",
          create_link=True))
  out.extend(
      _recommend_command(
          "run -t <T>",
          "Execute run() calls (T - 1) times without debugging, then "
          "execute run() once more with debugging and drop back to the CLI"))
  out.extend(
      _recommend_command(
          "run -f <filter_name>",
          "Keep executing run() calls until a dumped tensor passes a given, "
          "registered filter (conditional breakpoint mode)"))

  more_lines = ["    Registered filter(s):"]
  if tensor_filters:
    filter_names = []
    for filter_name in tensor_filters:
      filter_names.append(filter_name)
      command_menu_node = debugger_cli_common.MenuItem(
          "", "run -f %s" % filter_name)
      more_lines.append(RL("        * ") + RL(filter_name, command_menu_node))
  else:
    more_lines.append("        (None)")

  out.extend(
      debugger_cli_common.rich_text_lines_from_rich_line_list(more_lines))

  out.append("")

  out.append_rich_line(RL("For more details, see ") +
                       RL("help.", debugger_cli_common.MenuItem("", "help")) +
                       ".")
  out.append("")

  # Make main menu for the run-start intro.
  menu = debugger_cli_common.Menu()
  menu.append(debugger_cli_common.MenuItem("run", "run"))
  menu.append(debugger_cli_common.MenuItem("exit", "exit"))
  out.annotations[debugger_cli_common.MAIN_MENU_KEY] = menu

  return out


def get_run_short_description(run_call_count,
                              fetches,
                              feed_dict,
                              is_callable_runner=False):
  """Get a short description of the run() call.

  Args:
    run_call_count: (int) Run call counter.
    fetches: Fetches of the `Session.run()` call. See doc of `Session.run()`
      for more details.
    feed_dict: Feeds to the `Session.run()` call. See doc of `Session.run()`
      for more details.
    is_callable_runner: (bool) whether a runner returned by
        Session.make_callable is being run.

  Returns:
    (str) A short description of the run() call, including information about
      the fetche(s) and feed(s).
  """
  if is_callable_runner:
    return "runner from make_callable()"

  description = "run #%d: " % run_call_count

  if isinstance(fetches, (ops.Tensor, ops.Operation, variables.Variable)):
    description += "1 fetch (%s); " % common.get_graph_element_name(fetches)
  else:
    # Could be (nested) list, tuple, dict or namedtuple.
    num_fetches = len(common.get_flattened_names(fetches))
    if num_fetches > 1:
      description += "%d fetches; " % num_fetches
    else:
      description += "%d fetch; " % num_fetches

  if not feed_dict:
    description += "0 feeds"
  else:
    if len(feed_dict) == 1:
      for key in feed_dict:
        description += "1 feed (%s)" % (
            key if isinstance(key, six.string_types) or not hasattr(key, "name")
            else key.name)
    else:
      description += "%d feeds" % len(feed_dict)

  return description


def get_error_intro(tf_error):
  """Generate formatted intro for TensorFlow run-time error.

  Args:
    tf_error: (errors.OpError) TensorFlow run-time error object.

  Returns:
    (RichTextLines) Formatted intro message about the run-time OpError, with
      sample commands for debugging.
  """

  if hasattr(tf_error, "op") and hasattr(tf_error.op, "name"):
    op_name = tf_error.op.name
  else:
    op_name = None

  intro_lines = [
      "--------------------------------------",
      RL("!!! An error occurred during the run !!!", "blink"),
      "",
  ]

  out = debugger_cli_common.rich_text_lines_from_rich_line_list(intro_lines)

  if op_name is not None:
    out.extend(debugger_cli_common.RichTextLines(
        ["You may use the following commands to debug:"]))
    out.extend(
        _recommend_command("ni -a -d -t %s" % op_name,
                           "Inspect information about the failing op.",
                           create_link=True))
    out.extend(
        _recommend_command("li -r %s" % op_name,
                           "List inputs to the failing op, recursively.",
                           create_link=True))

    out.extend(
        _recommend_command(
            "lt",
            "List all tensors dumped during the failing run() call.",
            create_link=True))
  else:
    out.extend(debugger_cli_common.RichTextLines([
        "WARNING: Cannot determine the name of the op that caused the error."]))

  more_lines = [
      "",
      "Op name:    %s" % op_name,
      "Error type: " + str(type(tf_error)),
      "",
      "Details:",
      str(tf_error),
      "",
      "--------------------------------------",
      "",
  ]

  out.extend(debugger_cli_common.RichTextLines(more_lines))

  return out
