"""Manage the filesystem provided by your OS.

In essence, an `OSFS` is a thin layer over the `io` and `os` modules
of the Python standard library.
"""

from __future__ import absolute_import, print_function, unicode_literals

import sys
import typing

import errno
import io
import itertools
import logging
import os
import platform
import shutil
import six
import stat
import tempfile

try:
    from os import scandir
except ImportError:
    try:
        from scandir import scandir  # type: ignore
    except ImportError:  # pragma: no cover
        scandir = None  # type: ignore  # pragma: no cover

try:
    from os import sendfile
except ImportError:
    try:
        from sendfile import sendfile  # type: ignore
    except ImportError:
        sendfile = None  # type: ignore  # pragma: no cover

from . import errors
from ._fscompat import fsdecode, fsencode, fspath
from ._url_tools import url_quote
from .base import FS
from .copy import copy_modified_time
from .enums import ResourceType
from .error_tools import convert_os_errors
from .errors import FileExpected, NoURL
from .info import Info
from .mode import Mode, validate_open_mode
from .path import basename, dirname
from .permissions import Permissions

if typing.TYPE_CHECKING:
    from typing import (
        IO,
        Any,
        BinaryIO,
        Collection,
        Dict,
        Iterator,
        List,
        Optional,
        SupportsInt,
        Text,
        Tuple,
    )

    from .base import _OpendirFactory
    from .info import RawInfo
    from .subfs import SubFS

    _O = typing.TypeVar("_O", bound="OSFS")


log = logging.getLogger("fs.osfs")


_WINDOWS_PLATFORM = platform.system() == "Windows"


@six.python_2_unicode_compatible
class OSFS(FS):
    """Create an OSFS.

    Examples:
        >>> current_directory_fs = OSFS('.')
        >>> home_fs = OSFS('~/')
        >>> windows_system32_fs = OSFS('c://system32')

    """

    def __init__(
        self,
        root_path,  # type: Text
        create=False,  # type: bool
        create_mode=0o777,  # type: SupportsInt
        expand_vars=True,  # type: bool
    ):
        # type: (...) -> None
        """Create an OSFS instance.

        Arguments:
            root_path (str or ~os.PathLike): An OS path or path-like object
                to the location on your HD you wish to manage.
            create (bool): Set to `True` to create the root directory if it
                does not already exist, otherwise the directory should exist
                prior to creating the ``OSFS`` instance (defaults to `False`).
            create_mode (int): The permissions that will be used to create
                the directory if ``create`` is `True` and the path doesn't
                exist, defaults to ``0o777``.
            expand_vars(bool): If `True` (the default) environment variables
                of the form ``~``, ``$name`` or ``${name}`` will be expanded.

        Raises:
            `fs.errors.CreateFailed`: If ``root_path`` does not
                exist, or could not be created.

        """
        super(OSFS, self).__init__()
        if isinstance(root_path, bytes):
            root_path = fsdecode(root_path)
        self.root_path = root_path
        _drive, _root_path = os.path.splitdrive(fsdecode(fspath(root_path)))
        _root_path = _drive + (_root_path or "/") if _drive else _root_path
        _root_path = os.path.expanduser(
            os.path.expandvars(_root_path) if expand_vars else _root_path
        )
        _root_path = os.path.normpath(os.path.abspath(_root_path))
        self._root_path = _root_path

        if create:
            try:
                if not os.path.isdir(_root_path):
                    os.makedirs(_root_path, mode=int(create_mode))
            except OSError as error:
                raise errors.CreateFailed(
                    "unable to create {} ({})".format(root_path, error), error
                )
        else:
            if not os.path.isdir(_root_path):
                message = "root path '{}' does not exist".format(_root_path)
                raise errors.CreateFailed(message)

        _meta = self._meta = {
            "network": False,
            "read_only": False,
            "supports_rename": True,
            "thread_safe": True,
            "unicode_paths": os.path.supports_unicode_filenames,
            "virtual": False,
        }

        try:
            # https://stackoverflow.com/questions/7870041/check-if-file-system-is-case-insensitive-in-python
            # I don't know of a better way of detecting case insensitivity of a
            # filesystem
            with tempfile.NamedTemporaryFile(prefix="TmP") as _tmp_file:
                _meta["case_insensitive"] = os.path.exists(_tmp_file.name.lower())
        except Exception:
            if platform.system() != "Darwin":
                _meta["case_insensitive"] = os.path.normcase("Aa") == "aa"

        if _WINDOWS_PLATFORM:  # pragma: no cover
            _meta["invalid_path_chars"] = (
                "".join(six.unichr(n) for n in range(31)) + '\\:*?"<>|'
            )
        else:
            _meta["invalid_path_chars"] = "\0"

            if "PC_PATH_MAX" in os.pathconf_names:
                try:
                    _meta["max_sys_path_length"] = os.pathconf(
                        fsencode(_root_path), os.pathconf_names["PC_PATH_MAX"]
                    )
                except OSError:  # pragma: no cover
                    # The above fails with nfs mounts on OSX. Go figure.
                    pass

    def __repr__(self):
        # type: () -> str
        _fmt = "{}({!r})"
        _class_name = self.__class__.__name__
        return _fmt.format(_class_name, self.root_path)

    def __str__(self):
        # type: () -> str
        fmt = "<{} '{}'>"
        _class_name = self.__class__.__name__
        return fmt.format(_class_name.lower(), self.root_path)

    def _to_sys_path(self, path):
        # type: (Text) -> bytes
        """Convert a FS path to a path on the OS."""
        sys_path = fsencode(
            os.path.join(self._root_path, path.lstrip("/").replace("/", os.sep))
        )
        return sys_path

    @classmethod
    def _make_details_from_stat(cls, stat_result):
        # type: (os.stat_result) -> Dict[Text, object]
        """Make a *details* info dict from an `os.stat_result` object."""
        details = {
            "_write": ["accessed", "modified"],
            "accessed": stat_result.st_atime,
            "modified": stat_result.st_mtime,
            "size": stat_result.st_size,
            "type": int(cls._get_type_from_stat(stat_result)),
        }
        # On other Unix systems (such as FreeBSD), the following
        # attributes may be available (but may be only filled out if
        # root tries to use them):
        details["created"] = getattr(stat_result, "st_birthtime", None)
        ctime_key = "created" if _WINDOWS_PLATFORM else "metadata_changed"
        details[ctime_key] = stat_result.st_ctime
        return details

    @classmethod
    def _make_access_from_stat(cls, stat_result):
        # type: (os.stat_result) -> Dict[Text, object]
        """Make an *access* info dict from an `os.stat_result` object."""
        access = {}  # type: Dict[Text, object]
        access["permissions"] = Permissions(mode=stat_result.st_mode).dump()
        access["gid"] = gid = stat_result.st_gid
        access["uid"] = uid = stat_result.st_uid
        if not _WINDOWS_PLATFORM:
            import grp
            import pwd

            try:
                access["group"] = grp.getgrgid(gid).gr_name
            except KeyError:  # pragma: no cover
                pass

            try:
                access["user"] = pwd.getpwuid(uid).pw_name
            except KeyError:  # pragma: no cover
                pass
        return access

    STAT_TO_RESOURCE_TYPE = {
        stat.S_IFDIR: ResourceType.directory,
        stat.S_IFCHR: ResourceType.character,
        stat.S_IFBLK: ResourceType.block_special_file,
        stat.S_IFREG: ResourceType.file,
        stat.S_IFIFO: ResourceType.fifo,
        stat.S_IFLNK: ResourceType.symlink,
        stat.S_IFSOCK: ResourceType.socket,
    }

    @classmethod
    def _get_type_from_stat(cls, _stat):
        # type: (os.stat_result) -> ResourceType
        """Get the resource type from an `os.stat_result` object."""
        st_mode = _stat.st_mode
        st_type = stat.S_IFMT(st_mode)
        return cls.STAT_TO_RESOURCE_TYPE.get(st_type, ResourceType.unknown)

    # --------------------------------------------------------
    # Required Methods
    # --------------------------------------------------------

    def _gettarget(self, sys_path):
        # type: (Text) -> Optional[Text]
        if hasattr(os, "readlink"):
            try:
                if _WINDOWS_PLATFORM:  # pragma: no cover
                    return os.readlink(sys_path)
                else:
                    return fsdecode(os.readlink(fsencode(sys_path)))
            except OSError:
                pass
        return None

    def _make_link_info(self, sys_path):
        # type: (Text) -> Dict[Text, object]
        _target = self._gettarget(sys_path)
        return {"target": _target}

    def getinfo(self, path, namespaces=None):
        # type: (Text, Optional[Collection[Text]]) -> Info
        self.check()
        namespaces = namespaces or ()
        _path = self.validatepath(path)
        sys_path = self.getsyspath(_path)
        _lstat = None
        with convert_os_errors("getinfo", path):
            _stat = os.stat(fsencode(sys_path))
            if "lstat" in namespaces:
                _lstat = os.lstat(fsencode(sys_path))

        info = {
            "basic": {"name": basename(_path), "is_dir": stat.S_ISDIR(_stat.st_mode)}
        }
        if "details" in namespaces:
            info["details"] = self._make_details_from_stat(_stat)
        if "stat" in namespaces:
            info["stat"] = {
                k: getattr(_stat, k) for k in dir(_stat) if k.startswith("st_")
            }
        if "lstat" in namespaces:
            info["lstat"] = {
                k: getattr(_lstat, k) for k in dir(_lstat) if k.startswith("st_")
            }
        if "link" in namespaces:
            info["link"] = self._make_link_info(sys_path)
        if "access" in namespaces:
            info["access"] = self._make_access_from_stat(_stat)

        return Info(info)

    def listdir(self, path):
        # type: (Text) -> List[Text]
        self.check()
        _path = self.validatepath(path)
        sys_path = self._to_sys_path(_path)
        with convert_os_errors("listdir", path, directory=True):
            names = os.listdir(fsencode(sys_path))
        return [fsdecode(name) for name in names]
        # return names

    def makedir(
        self,  # type: _O
        path,  # type: Text
        permissions=None,  # type: Optional[Permissions]
        recreate=False,  # type: bool
    ):
        # type: (...) -> SubFS[_O]
        self.check()
        mode = Permissions.get_mode(permissions)
        _path = self.validatepath(path)
        sys_path = self._to_sys_path(_path)
        with convert_os_errors("makedir", path, directory=True):
            try:
                os.mkdir(sys_path, mode)
            except OSError as error:
                if error.errno == errno.ENOENT:
                    raise errors.ResourceNotFound(path)
                elif error.errno == errno.EEXIST and recreate:
                    pass
                else:
                    raise
            return self.opendir(_path)

    def openbin(self, path, mode="r", buffering=-1, **options):
        # type: (Text, Text, int, **Any) -> BinaryIO
        _mode = Mode(mode)
        _mode.validate_bin()
        self.check()
        _path = self.validatepath(path)
        if _path == "/":
            raise errors.FileExpected(path)
        sys_path = self._to_sys_path(_path)
        with convert_os_errors("openbin", path):
            if six.PY2 and _mode.exclusive:
                sys_path = os.open(sys_path, os.O_RDWR | os.O_CREAT | os.O_EXCL)
            binary_file = io.open(
                sys_path, mode=_mode.to_platform_bin(), buffering=buffering, **options
            )
        return binary_file  # type: ignore

    def remove(self, path):
        # type: (Text) -> None
        self.check()
        _path = self.validatepath(path)
        sys_path = self._to_sys_path(_path)
        with convert_os_errors("remove", path):
            try:
                os.remove(sys_path)
            except OSError as error:
                if error.errno == errno.EACCES and sys.platform == "win32":
                    # sometimes windows says this for attempts to remove a dir
                    if os.path.isdir(sys_path):  # pragma: no cover
                        raise errors.FileExpected(path)
                if error.errno == errno.EPERM and sys.platform == "darwin":
                    # sometimes OSX says this for attempts to remove a dir
                    if os.path.isdir(sys_path):  # pragma: no cover
                        raise errors.FileExpected(path)
                raise

    def removedir(self, path):
        # type: (Text) -> None
        self.check()
        _path = self.validatepath(path)
        if _path == "/":
            raise errors.RemoveRootError()
        sys_path = self._to_sys_path(path)
        with convert_os_errors("removedir", path, directory=True):
            os.rmdir(sys_path)

    # --------------------------------------------------------
    # Optional Methods
    # --------------------------------------------------------

    # --- Type hint for opendir ------------------------------

    if typing.TYPE_CHECKING:

        def opendir(self, path, factory=None):
            # type: (_O, Text, Optional[_OpendirFactory]) -> SubFS[_O]
            pass

    # --- Backport of os.sendfile for Python < 3.8 -----------

    def _check_copy(self, src_path, dst_path, overwrite=False):
        # validate individual paths
        _src_path = self.validatepath(src_path)
        _dst_path = self.validatepath(dst_path)
        # check src_path exists and is a file
        if self.gettype(src_path) is not ResourceType.file:
            raise errors.FileExpected(src_path)
        # check dst_path does not exist if we are not overwriting
        if not overwrite and self.exists(_dst_path):
            raise errors.DestinationExists(dst_path)
        # check parent dir of _dst_path exists and is a directory
        if self.gettype(dirname(dst_path)) is not ResourceType.directory:
            raise errors.DirectoryExpected(dirname(dst_path))
        return _src_path, _dst_path

    if sys.version_info[:2] < (3, 8) and sendfile is not None:

        _sendfile_error_codes = {
            errno.EIO,
            errno.EINVAL,
            errno.ENOSYS,
            errno.EBADF,
            errno.ENOTSOCK,
            errno.EOPNOTSUPP,
        }

        # PyPy doesn't define ENOTSUP so we have to add it conditionally.
        if hasattr(errno, "ENOTSUP"):
            _sendfile_error_codes.add(errno.ENOTSUP)

        def copy(self, src_path, dst_path, overwrite=False, preserve_time=False):
            # type: (Text, Text, bool, bool) -> None
            with self._lock:
                # validate and canonicalise paths
                _src_path, _dst_path = self._check_copy(src_path, dst_path, overwrite)
                _src_sys, _dst_sys = (
                    self.getsyspath(_src_path),
                    self.getsyspath(_dst_path),
                )
                # attempt using sendfile
                try:
                    # initialise variables to pass to sendfile
                    # open files to obtain a file descriptor
                    with io.open(_src_sys, "r") as src:
                        with io.open(_dst_sys, "w") as dst:
                            fd_src, fd_dst = src.fileno(), dst.fileno()
                            sent = maxsize = os.fstat(fd_src).st_size
                            offset = 0
                            while sent > 0:
                                sent = sendfile(fd_dst, fd_src, offset, maxsize)
                                offset += sent
                    if preserve_time:
                        copy_modified_time(self, src_path, self, dst_path)
                except OSError as e:
                    # the error is not a simple "sendfile not supported" error
                    if e.errno not in self._sendfile_error_codes:
                        raise
                    # fallback using the shutil implementation
                    shutil.copy2(_src_sys, _dst_sys)

    else:

        def copy(self, src_path, dst_path, overwrite=False, preserve_time=False):
            # type: (Text, Text, bool, bool) -> None
            with self._lock:
                _src_path, _dst_path = self._check_copy(src_path, dst_path, overwrite)
                shutil.copy2(self.getsyspath(_src_path), self.getsyspath(_dst_path))

    # --- Backport of os.scandir for Python < 3.5 ------------

    if scandir:

        def _scandir(self, path, namespaces=None):
            # type: (Text, Optional[Collection[Text]]) -> Iterator[Info]
            self.check()
            namespaces = namespaces or ()
            requires_stat = not {"details", "stat", "access"}.isdisjoint(namespaces)
            _path = self.validatepath(path)
            if _WINDOWS_PLATFORM:
                sys_path = os.path.join(
                    self._root_path, path.lstrip("/").replace("/", os.sep)
                )
            else:
                sys_path = self._to_sys_path(_path)  # type: ignore
            with convert_os_errors("scandir", path, directory=True):
                scandir_iter = scandir(sys_path)
                try:
                    for dir_entry in scandir_iter:
                        info = {
                            "basic": {
                                "name": fsdecode(dir_entry.name),
                                "is_dir": dir_entry.is_dir(),
                            }
                        }
                        if requires_stat:
                            stat_result = dir_entry.stat()
                            if "details" in namespaces:
                                info["details"] = self._make_details_from_stat(
                                    stat_result
                                )
                            if "stat" in namespaces:
                                info["stat"] = {
                                    k: getattr(stat_result, k)
                                    for k in dir(stat_result)
                                    if k.startswith("st_")
                                }
                            if "access" in namespaces:
                                info["access"] = self._make_access_from_stat(
                                    stat_result
                                )
                        if "lstat" in namespaces:
                            lstat_result = dir_entry.stat(follow_symlinks=False)
                            info["lstat"] = {
                                k: getattr(lstat_result, k)
                                for k in dir(lstat_result)
                                if k.startswith("st_")
                            }
                        if "link" in namespaces:
                            info["link"] = self._make_link_info(
                                os.path.join(sys_path, dir_entry.name)
                            )

                        yield Info(info)
                finally:
                    if sys.version_info >= (3, 6):
                        scandir_iter.close()

    else:

        def _scandir(self, path, namespaces=None):
            # type: (Text, Optional[Collection[Text]]) -> Iterator[Info]
            self.check()
            namespaces = namespaces or ()
            _path = self.validatepath(path)
            sys_path = self.getsyspath(_path)
            with convert_os_errors("scandir", path, directory=True):
                for entry_name in os.listdir(sys_path):
                    _entry_name = fsdecode(entry_name)
                    entry_path = os.path.join(sys_path, _entry_name)
                    stat_result = os.stat(fsencode(entry_path))
                    info = {
                        "basic": {
                            "name": _entry_name,
                            "is_dir": stat.S_ISDIR(stat_result.st_mode),
                        }
                    }  # type: Dict[Text, Dict[Text, Any]]
                    if "details" in namespaces:
                        info["details"] = self._make_details_from_stat(stat_result)
                    if "stat" in namespaces:
                        info["stat"] = {
                            k: getattr(stat_result, k)
                            for k in dir(stat_result)
                            if k.startswith("st_")
                        }
                    if "lstat" in namespaces:
                        lstat_result = os.lstat(entry_path)
                        info["lstat"] = {
                            k: getattr(lstat_result, k)
                            for k in dir(lstat_result)
                            if k.startswith("st_")
                        }
                    if "link" in namespaces:
                        info["link"] = self._make_link_info(
                            os.path.join(sys_path, entry_name)
                        )
                    if "access" in namespaces:
                        info["access"] = self._make_access_from_stat(stat_result)

                    yield Info(info)

    def scandir(
        self,
        path,  # type: Text
        namespaces=None,  # type: Optional[Collection[Text]]
        page=None,  # type: Optional[Tuple[int, int]]
    ):
        # type: (...) -> Iterator[Info]
        iter_info = self._scandir(path, namespaces=namespaces)
        if page is not None:
            start, end = page
            iter_info = itertools.islice(iter_info, start, end)
        return iter_info

    # --- Miscellaneous --------------------------------------

    def getsyspath(self, path):
        # type: (Text) -> Text
        sys_path = os.path.join(self._root_path, path.lstrip("/").replace("/", os.sep))
        return sys_path

    def geturl(self, path, purpose="download"):
        # type: (Text, Text) -> Text
        sys_path = self.getsyspath(path)
        if purpose == "download":
            return "file://" + sys_path
        elif purpose == "fs":
            url_path = url_quote(sys_path)
            return "osfs://" + url_path
        else:
            raise NoURL(path, purpose)

    def gettype(self, path):
        # type: (Text) -> ResourceType
        self.check()
        sys_path = self._to_sys_path(path)
        with convert_os_errors("gettype", path):
            stat = os.stat(sys_path)
        resource_type = self._get_type_from_stat(stat)
        return resource_type

    def islink(self, path):
        # type: (Text) -> bool
        self.check()
        _path = self.validatepath(path)
        sys_path = self._to_sys_path(_path)
        if not self.exists(path):
            raise errors.ResourceNotFound(path)
        with convert_os_errors("islink", path):
            return os.path.islink(sys_path)

    def open(
        self,
        path,  # type: Text
        mode="r",  # type: Text
        buffering=-1,  # type: int
        encoding=None,  # type: Optional[Text]
        errors=None,  # type: Optional[Text]
        newline="",  # type: Text
        line_buffering=False,  # type: bool
        **options  # type: Any
    ):
        # type: (...) -> IO
        _mode = Mode(mode)
        validate_open_mode(mode)
        self.check()
        _path = self.validatepath(path)
        if _path == "/":
            raise FileExpected(path)
        sys_path = self._to_sys_path(_path)
        with convert_os_errors("open", path):
            if six.PY2 and _mode.exclusive:
                sys_path = os.open(sys_path, os.O_RDWR | os.O_CREAT | os.O_EXCL)
            _encoding = encoding or "utf-8"
            return io.open(
                sys_path,
                mode=_mode.to_platform(),
                buffering=buffering,
                encoding=None if _mode.binary else _encoding,
                errors=errors,
                newline=None if _mode.binary else newline,
                **options
            )

    def setinfo(self, path, info):
        # type: (Text, RawInfo) -> None
        self.check()
        _path = self.validatepath(path)
        sys_path = self._to_sys_path(_path)
        if not os.path.exists(sys_path):
            raise errors.ResourceNotFound(path)
        if "details" in info:
            details = info["details"]
            if "accessed" in details or "modified" in details:
                _accessed = typing.cast(float, details.get("accessed"))
                _modified = typing.cast(float, details.get("modified", _accessed))
                accessed = float(_modified if _accessed is None else _accessed)
                modified = float(_modified)
                if accessed is not None or modified is not None:
                    with convert_os_errors("setinfo", path):
                        os.utime(sys_path, (accessed, modified))

    def validatepath(self, path):
        # type: (Text) -> Text
        """Check path may be encoded, in addition to usual checks."""
        try:
            fsencode(path)
        except UnicodeEncodeError as error:
            raise errors.InvalidCharsInPath(
                path,
                msg="path '{path}' could not be encoded for the filesystem (check LANG"
                " env var); {error}".format(path=path, error=error),
            )
        return super(OSFS, self).validatepath(path)
