"""Abstraction for sessions in various clouds."""

import logging
import os

from fiona.path import parse_path, UnparsedPath

log = logging.getLogger(__name__)

try:
    import boto3
except ImportError:
    log.debug("Could not import boto3, continuing with reduced functionality.")
    boto3 = None


class Session:
    """Base for classes that configure access to secured resources.

    Attributes
    ----------
    credentials : dict
        Keys and values for session credentials.

    Notes
    -----
    This class is not intended to be instantiated.

    """

    @classmethod
    def hascreds(cls, config):
        """Determine if the given configuration has proper credentials

        Parameters
        ----------
        cls : class
            A Session class.
        config : dict
            GDAL configuration as a dict.

        Returns
        -------
        bool

        """
        return NotImplemented

    def get_credential_options(self):
        """Get credentials as GDAL configuration options

        Returns
        -------
        dict

        """
        return NotImplemented

    @staticmethod
    def from_foreign_session(session, cls=None):
        """Create a session object matching the foreign `session`.

        Parameters
        ----------
        session : obj
            A foreign session object.
        cls : Session class, optional
            The class to return.

        Returns
        -------
        Session

        """
        if not cls:
            return DummySession()
        else:
            return cls(session)

    @staticmethod
    def cls_from_path(path):
        """Find the session class suited to the data at `path`.

        Parameters
        ----------
        path : str
            A dataset path or identifier.

        Returns
        -------
        class

        """
        if not path:
            return DummySession

        path = parse_path(path)

        if isinstance(path, UnparsedPath) or path.is_local:
            return DummySession

        elif (
            path.scheme == "s3" or "amazonaws.com" in path.path
        ) and "X-Amz-Signature" not in path.path:
            if boto3 is not None:
                return AWSSession
            else:
                log.info("boto3 not available, falling back to a DummySession.")
                return DummySession

        elif path.scheme == "oss" or "aliyuncs.com" in path.path:
            return OSSSession

        elif path.path.startswith("/vsiswift/"):
            return SwiftSession

        elif path.scheme == "az":
            return AzureSession

        # This factory can be extended to other cloud providers here.
        # elif path.scheme == "cumulonimbus":  # for example.
        #     return CumulonimbusSession(*args, **kwargs)

        else:
            return DummySession

    @staticmethod
    def from_path(path, *args, **kwargs):
        """Create a session object suited to the data at `path`.

        Parameters
        ----------
        path : str
            A dataset path or identifier.
        args : sequence
            Positional arguments for the foreign session constructor.
        kwargs : dict
            Keyword arguments for the foreign session constructor.

        Returns
        -------
        Session

        """
        return Session.cls_from_path(path)(*args, **kwargs)

    @staticmethod
    def aws_or_dummy(*args, **kwargs):
        """Create an AWSSession if boto3 is available, else DummySession
        Parameters
        ----------
        path : str
            A dataset path or identifier.
        args : sequence
            Positional arguments for the foreign session constructor.
        kwargs : dict
            Keyword arguments for the foreign session constructor.
        Returns
        -------
        Session
        """
        if boto3 is not None:
            return AWSSession(*args, **kwargs)
        else:
            return DummySession(*args, **kwargs)

    @staticmethod
    def from_environ(*args, **kwargs):
        """Create a session object suited to the environment.
        Parameters
        ----------
        path : str
            A dataset path or identifier.
        args : sequence
            Positional arguments for the foreign session constructor.
        kwargs : dict
            Keyword arguments for the foreign session constructor.
        Returns
        -------
        Session
        """
        try:
            session = Session.aws_or_dummy(*args, **kwargs)
            session.credentials
        except RuntimeError:
            log.warning(
                "Credentials in environment have expired. Creating a DummySession."
            )
            session = DummySession(*args, **kwargs)
        return session


class DummySession(Session):
    """A dummy session.

    Attributes
    ----------
    credentials : dict
        The session credentials.

    """

    def __init__(self, *args, **kwargs):
        self._session = None
        self.credentials = {}

    @classmethod
    def hascreds(cls, config):
        """Determine if the given configuration has proper credentials

        Parameters
        ----------
        cls : class
            A Session class.
        config : dict
            GDAL configuration as a dict.

        Returns
        -------
        bool

        """
        return True

    def get_credential_options(self):
        """Get credentials as GDAL configuration options

        Returns
        -------
        dict

        """
        return {}


class AWSSession(Session):
    """Configures access to secured resources stored in AWS S3.
    """

    def __init__(
        self,
        session=None,
        aws_unsigned=False,
        aws_access_key_id=None,
        aws_secret_access_key=None,
        aws_session_token=None,
        region_name=None,
        profile_name=None,
        endpoint_url=None,
        requester_pays=False,
    ):
        """Create a new AWS session

        Parameters
        ----------
        session : optional
            A boto3 session object.
        aws_unsigned : bool, optional (default: False)
            If True, requests will be unsigned.
        aws_access_key_id : str, optional
            An access key id, as per boto3.
        aws_secret_access_key : str, optional
            A secret access key, as per boto3.
        aws_session_token : str, optional
            A session token, as per boto3.
        region_name : str, optional
            A region name, as per boto3.
        profile_name : str, optional
            A shared credentials profile name, as per boto3.
        endpoint_url: str, optional
            An endpoint_url, as per GDAL's AWS_S3_ENPOINT
        requester_pays : bool, optional
            True if the requester agrees to pay transfer costs (default:
            False)

        """
        if session:
            self._session = session
        else:
            self._session = boto3.Session(
                aws_access_key_id=aws_access_key_id,
                aws_secret_access_key=aws_secret_access_key,
                aws_session_token=aws_session_token,
                region_name=region_name,
                profile_name=profile_name)

        self.requester_pays = requester_pays
        self.unsigned = bool(os.getenv("AWS_NO_SIGN_REQUEST", aws_unsigned))
        self.endpoint_url = endpoint_url
        self._creds = (
            self._session.get_credentials()
            if not self.unsigned and self._session
            else None
        )

    @classmethod
    def hascreds(cls, config):
        """Determine if the given configuration has proper credentials

        Parameters
        ----------
        cls : class
            A Session class.
        config : dict
            GDAL configuration as a dict.

        Returns
        -------
        bool

        """
        return {"AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"}.issubset(config.keys())

    @property
    def credentials(self):
        """The session credentials as a dict"""
        res = {}
        if self._creds:  # pragma: no branch
            frozen_creds = self._creds.get_frozen_credentials()
            if frozen_creds.access_key:  # pragma: no branch
                res["aws_access_key_id"] = frozen_creds.access_key
            if frozen_creds.secret_key:  # pragma: no branch
                res["aws_secret_access_key"] = frozen_creds.secret_key
            if frozen_creds.token:
                res["aws_session_token"] = frozen_creds.token
        if self._session.region_name:
            res["aws_region"] = self._session.region_name
        if self.requester_pays:
            res["aws_request_payer"] = "requester"
        if self.endpoint_url:
            res["aws_s3_endpoint"] = self.endpoint_url
        return res

    def get_credential_options(self):
        """Get credentials as GDAL configuration options

        Returns
        -------
        dict

        """
        if self.unsigned:
            opts = {"AWS_NO_SIGN_REQUEST": "YES"}
            if "aws_region" in self.credentials:
                opts["AWS_REGION"] = self.credentials["aws_region"]
            return opts
        else:
            return {k.upper(): v for k, v in self.credentials.items()}


class GSSession(Session):
    """Configures access to secured resources stored in Google Cloud Storage
    """
    def __init__(self, google_application_credentials=None):
        """Create new Google Cloude Storage session

        Parameters
        ----------
        google_application_credentials: string
            Path to the google application credentials JSON file.

        """
        self._creds = {}
        if google_application_credentials is not None:
            self._creds['google_application_credentials'] = google_application_credentials

    @classmethod
    def hascreds(cls, config):
        """Determine if the given configuration has proper credentials

        Parameters
        ----------
        cls : class
            A Session class.
        config : dict
            GDAL configuration as a dict.

        Returns
        -------
        bool

        """
        return 'GOOGLE_APPLICATION_CREDENTIALS' in config

    @property
    def credentials(self):
        """The session credentials as a dict"""
        return self._creds

    def get_credential_options(self):
        """Get credentials as GDAL configuration options

        Returns
        -------
        dict

        """
        return {k.upper(): v for k, v in self.credentials.items()}


class OSSSession(Session):
    """Configures access to secured resources stored in Alibaba Cloud OSS."""

    def __init__(
        self, oss_access_key_id=None, oss_secret_access_key=None, oss_endpoint=None
    ):
        """Create new Alibaba Cloud OSS session

        Parameters
        ----------
        oss_access_key_id: string, optional (default: None)
            An access key id
        oss_secret_access_key: string, optional (default: None)
            An secret access key
        oss_endpoint: string, optional (default: None)
            the region attached to the bucket

        """
        self._creds = {
            "oss_access_key_id": oss_access_key_id,
            "oss_secret_access_key": oss_secret_access_key,
            "oss_endpoint": oss_endpoint,
        }

    @classmethod
    def hascreds(cls, config):
        """Determine if the given configuration has proper credentials

        Parameters
        ----------
        cls : class
            A Session class.
        config : dict
            GDAL configuration as a dict.

        Returns
        -------
        bool

        """
        return {"OSS_ACCESS_KEY_ID", "OSS_SECRET_ACCESS_KEY"}.issubset(config.keys())

    @property
    def credentials(self):
        """The session credentials as a dict"""
        return self._creds

    def get_credential_options(self):
        """Get credentials as GDAL configuration options

        Returns
        -------
        dict

        """
        return {k.upper(): v for k, v in self.credentials.items()}


class SwiftSession(Session):
    """Configures access to secured resources stored in OpenStack Swift Object Storage."""

    def __init__(
        self,
        session=None,
        swift_storage_url=None,
        swift_auth_token=None,
        swift_auth_v1_url=None,
        swift_user=None,
        swift_key=None,
    ):
        """Create new OpenStack Swift Object Storage Session.

        Three methods are possible:
            1. Create session by the swiftclient library.
            2. The SWIFT_STORAGE_URL and SWIFT_AUTH_TOKEN (this method
               is recommended by GDAL docs).
            3. The SWIFT_AUTH_V1_URL, SWIFT_USER and SWIFT_KEY (This
               depends on the swiftclient library).

        Parameters
        ----------
        session: optional
            A swiftclient connection object
        swift_storage_url:
            the storage URL
        swift_auth_token:
            the value of the x-auth-token authorization token
        swift_storage_url: string, optional
            authentication URL
        swift_user: string, optional
            user name to authenticate as
        swift_key: string, optional
            key/password to authenticate with

        Examples
        --------
        >>> import rasterio
        >>> from rasterio.session import SwiftSession
        >>> fp = '/vsiswift/bucket/key.tif'
        >>> conn = Connection(
        ...     authurl='http://127.0.0.1:7777/auth/v1.0',
        ...     user='test:tester',
        ...     key='testing'
        ... )
        >>> session = SwiftSession(conn)
        >>> with rasterio.Env(session):
        >>>     with rasterio.open(fp) as src:
        >>>         print(src.profile)

        """
        if swift_storage_url and swift_auth_token:
            self._creds = {
                "swift_storage_url": swift_storage_url,
                "swift_auth_token": swift_auth_token,
            }
        else:
            from swiftclient.client import Connection

            if session:
                self._session = session
            else:
                self._session = Connection(
                    authurl=swift_auth_v1_url, user=swift_user, key=swift_key
                )
            self._creds = {
                "swift_storage_url": self._session.get_auth()[0],
                "swift_auth_token": self._session.get_auth()[1],
            }

    @classmethod
    def hascreds(cls, config):
        """Determine if the given configuration has proper credentials

        Parameters
        ----------
        cls : class
            A Session class.
        config : dict
            GDAL configuration as a dict.

        Returns
        -------
        bool

        """
        return {"SWIFT_STORAGE_URL", "SWIFT_AUTH_TOKEN"}.issubset(config.keys())

    @property
    def credentials(self):
        """The session credentials as a dict"""
        return self._creds

    def get_credential_options(self):
        """Get credentials as GDAL configuration options

        Returns
        -------
        dict

        """
        return {k.upper(): v for k, v in self.credentials.items()}


class AzureSession(Session):
    """Configures access to secured resources stored in Microsoft Azure Blob Storage."""

    def __init__(
        self,
        azure_storage_connection_string=None,
        azure_storage_account=None,
        azure_storage_access_key=None,
        azure_unsigned=False,
    ):
        """Create new Microsoft Azure Blob Storage session

        Parameters
        ----------
        azure_storage_connection_string: string
            A connection string contains both an account name and a secret key.
        azure_storage_account: string
            An account name
        azure_storage_access_key: string
            A secret key
        azure_unsigned : bool, optional (default: False)
            If True, requests will be unsigned.

        """
        self.unsigned = bool(os.getenv("AZURE_NO_SIGN_REQUEST", azure_unsigned))
        self.storage_account = os.getenv("AZURE_STORAGE_ACCOUNT", azure_storage_account)

        if azure_storage_connection_string:
            self._creds = {
                "azure_storage_connection_string": azure_storage_connection_string
            }
        elif not self.unsigned:
            self._creds = {
                "azure_storage_account": self.storage_account,
                "azure_storage_access_key": azure_storage_access_key,
            }
        else:
            self._creds = {"azure_storage_account": self.storage_account}

    @classmethod
    def hascreds(cls, config):
        """Determine if the given configuration has proper credentials

        Parameters
        ----------
        cls : class
            A Session class.
        config : dict
            GDAL configuration as a dict.

        Returns
        -------
        bool

        """
        return (
            "AZURE_STORAGE_CONNECTION_STRING" in config
            or {"AZURE_STORAGE_ACCOUNT", "AZURE_STORAGE_ACCESS_KEY"}.issubset(
                config.keys()
            )
            or {"AZURE_STORAGE_ACCOUNT", "AZURE_NO_SIGN_REQUEST"}.issubset(
                config.keys()
            )
        )

    @property
    def credentials(self):
        """The session credentials as a dict"""
        return self._creds

    def get_credential_options(self):
        """Get credentials as GDAL configuration options

        Returns
        -------
        dict

        """
        if self.unsigned:
            return {
                "AZURE_NO_SIGN_REQUEST": "YES",
                "AZURE_STORAGE_ACCOUNT": self.storage_account,
            }
        else:
            return {k.upper(): v for k, v in self.credentials.items()}
