import datetime
from io import BytesIO
import re
from warnings import catch_warnings

import numpy as np
import pytest

from pandas import (
    CategoricalIndex,
    DataFrame,
    HDFStore,
    MultiIndex,
    _testing as tm,
    date_range,
    read_hdf,
)
from pandas.tests.io.pytables.common import (
    ensure_clean_path,
    ensure_clean_store,
)

from pandas.io.pytables import (
    Term,
    _maybe_adjust_name,
)

pytestmark = pytest.mark.single_cpu


def test_pass_spec_to_storer(setup_path):

    df = tm.makeDataFrame()

    with ensure_clean_store(setup_path) as store:
        store.put("df", df)
        msg = (
            "cannot pass a column specification when reading a Fixed format "
            "store. this store must be selected in its entirety"
        )
        with pytest.raises(TypeError, match=msg):
            store.select("df", columns=["A"])
        msg = (
            "cannot pass a where specification when reading from a Fixed "
            "format store. this store must be selected in its entirety"
        )
        with pytest.raises(TypeError, match=msg):
            store.select("df", where=[("columns=A")])


def test_table_index_incompatible_dtypes(setup_path):
    df1 = DataFrame({"a": [1, 2, 3]})
    df2 = DataFrame({"a": [4, 5, 6]}, index=date_range("1/1/2000", periods=3))

    with ensure_clean_store(setup_path) as store:
        store.put("frame", df1, format="table")
        msg = re.escape("incompatible kind in col [integer - datetime64]")
        with pytest.raises(TypeError, match=msg):
            store.put("frame", df2, format="table", append=True)


def test_unimplemented_dtypes_table_columns(setup_path):

    with ensure_clean_store(setup_path) as store:

        dtypes = [("date", datetime.date(2001, 1, 2))]

        # currently not supported dtypes ####
        for n, f in dtypes:
            df = tm.makeDataFrame()
            df[n] = f
            msg = re.escape(f"[{n}] is not implemented as a table column")
            with pytest.raises(TypeError, match=msg):
                store.append(f"df1_{n}", df)

    # frame
    df = tm.makeDataFrame()
    df["obj1"] = "foo"
    df["obj2"] = "bar"
    df["datetime1"] = datetime.date(2001, 1, 2)
    df = df._consolidate()._convert(datetime=True)

    with ensure_clean_store(setup_path) as store:
        # this fails because we have a date in the object block......
        msg = re.escape(
            """Cannot serialize the column [datetime1]
because its data contents are not [string] but [date] object dtype"""
        )
        with pytest.raises(TypeError, match=msg):
            store.append("df_unimplemented", df)


def test_invalid_terms(setup_path):

    with ensure_clean_store(setup_path) as store:

        with catch_warnings(record=True):

            df = tm.makeTimeDataFrame()
            df["string"] = "foo"
            df.loc[df.index[0:4], "string"] = "bar"

            store.put("df", df, format="table")

            # some invalid terms
            msg = re.escape(
                "__init__() missing 1 required positional argument: 'where'"
            )
            with pytest.raises(TypeError, match=msg):
                Term()

            # more invalid
            msg = re.escape(
                "cannot process expression [df.index[3]], "
                "[2000-01-06 00:00:00] is not a valid condition"
            )
            with pytest.raises(ValueError, match=msg):
                store.select("df", "df.index[3]")

            msg = "invalid syntax"
            with pytest.raises(SyntaxError, match=msg):
                store.select("df", "index>")

    # from the docs
    with ensure_clean_path(setup_path) as path:
        dfq = DataFrame(
            np.random.randn(10, 4),
            columns=list("ABCD"),
            index=date_range("20130101", periods=10),
        )
        dfq.to_hdf(path, "dfq", format="table", data_columns=True)

        # check ok
        read_hdf(path, "dfq", where="index>Timestamp('20130104') & columns=['A', 'B']")
        read_hdf(path, "dfq", where="A>0 or C>0")

    # catch the invalid reference
    with ensure_clean_path(setup_path) as path:
        dfq = DataFrame(
            np.random.randn(10, 4),
            columns=list("ABCD"),
            index=date_range("20130101", periods=10),
        )
        dfq.to_hdf(path, "dfq", format="table")

        msg = (
            r"The passed where expression: A>0 or C>0\n\s*"
            r"contains an invalid variable reference\n\s*"
            r"all of the variable references must be a reference to\n\s*"
            r"an axis \(e.g. 'index' or 'columns'\), or a data_column\n\s*"
            r"The currently defined references are: index,columns\n"
        )
        with pytest.raises(ValueError, match=msg):
            read_hdf(path, "dfq", where="A>0 or C>0")


def test_append_with_diff_col_name_types_raises_value_error(setup_path):
    df = DataFrame(np.random.randn(10, 1))
    df2 = DataFrame({"a": np.random.randn(10)})
    df3 = DataFrame({(1, 2): np.random.randn(10)})
    df4 = DataFrame({("1", 2): np.random.randn(10)})
    df5 = DataFrame({("1", 2, object): np.random.randn(10)})

    with ensure_clean_store(setup_path) as store:
        name = f"df_{tm.rands(10)}"
        store.append(name, df)

        for d in (df2, df3, df4, df5):
            msg = re.escape(
                "cannot match existing table structure for [0] on appending data"
            )
            with pytest.raises(ValueError, match=msg):
                store.append(name, d)


def test_invalid_complib(setup_path):
    df = DataFrame(np.random.rand(4, 5), index=list("abcd"), columns=list("ABCDE"))
    with tm.ensure_clean(setup_path) as path:
        msg = r"complib only supports \[.*\] compression."
        with pytest.raises(ValueError, match=msg):
            df.to_hdf(path, "df", complib="foolib")


@pytest.mark.parametrize(
    "idx",
    [
        date_range("2019", freq="D", periods=3, tz="UTC"),
        CategoricalIndex(list("abc")),
    ],
)
def test_to_hdf_multiindex_extension_dtype(idx, setup_path):
    # GH 7775
    mi = MultiIndex.from_arrays([idx, idx])
    df = DataFrame(0, index=mi, columns=["a"])
    with ensure_clean_path(setup_path) as path:
        with pytest.raises(NotImplementedError, match="Saving a MultiIndex"):
            df.to_hdf(path, "df")


def test_unsuppored_hdf_file_error(datapath):
    # GH 9539
    data_path = datapath("io", "data", "legacy_hdf/incompatible_dataset.h5")
    message = (
        r"Dataset\(s\) incompatible with Pandas data types, "
        "not table, or no datasets found in HDF5 file."
    )

    with pytest.raises(ValueError, match=message):
        read_hdf(data_path)


def test_read_hdf_errors(setup_path):
    df = DataFrame(np.random.rand(4, 5), index=list("abcd"), columns=list("ABCDE"))

    with ensure_clean_path(setup_path) as path:
        msg = r"File [\S]* does not exist"
        with pytest.raises(OSError, match=msg):
            read_hdf(path, "key")

        df.to_hdf(path, "df")
        store = HDFStore(path, mode="r")
        store.close()

        msg = "The HDFStore must be open for reading."
        with pytest.raises(OSError, match=msg):
            read_hdf(store, "df")


def test_read_hdf_generic_buffer_errors():
    msg = "Support for generic buffers has not been implemented."
    with pytest.raises(NotImplementedError, match=msg):
        read_hdf(BytesIO(b""), "df")


@pytest.mark.parametrize("bad_version", [(1, 2), (1,), [], "12", "123"])
def test_maybe_adjust_name_bad_version_raises(bad_version):
    msg = "Version is incorrect, expected sequence of 3 integers"
    with pytest.raises(ValueError, match=msg):
        _maybe_adjust_name("values_block_0", version=bad_version)
