diff options
Diffstat (limited to 'pyfakefs/fake_filesystem.py')
-rw-r--r-- | pyfakefs/fake_filesystem.py | 4588 |
1 files changed, 904 insertions, 3684 deletions
diff --git a/pyfakefs/fake_filesystem.py b/pyfakefs/fake_filesystem.py index 29bd1ba..9f170d1 100644 --- a/pyfakefs/fake_filesystem.py +++ b/pyfakefs/fake_filesystem.py @@ -14,24 +14,11 @@ """A fake filesystem implementation for unit testing. -:Includes: - * :py:class:`FakeFile`: Provides the appearance of a real file. - * :py:class:`FakeDirectory`: Provides the appearance of a real directory. - * :py:class:`FakeFilesystem`: Provides the appearance of a real directory - hierarchy. - * :py:class:`FakeOsModule`: Uses :py:class:`FakeFilesystem` to provide a - fake :py:mod:`os` module replacement. - * :py:class:`FakeIoModule`: Uses :py:class:`FakeFilesystem` to provide a - fake ``io`` module replacement. - * :py:class:`FakePathModule`: Faked ``os.path`` module replacement. - * :py:class:`FakeFileOpen`: Faked ``file()`` and ``open()`` function - replacements. - :Usage: ->>> from pyfakefs import fake_filesystem +>>> from pyfakefs import fake_filesystem, fake_os >>> filesystem = fake_filesystem.FakeFilesystem() ->>> os_module = fake_filesystem.FakeOsModule(filesystem) +>>> os_module = fake_os.FakeOsModule(filesystem) >>> pathname = '/a/new/dir/new-file' Create a new file object, creating parent directory objects as needed: @@ -95,69 +82,48 @@ True """ import errno import heapq -import io -import locale import os import random import sys -import traceback -import uuid -from collections import namedtuple +import tempfile +from collections import namedtuple, OrderedDict from doctest import TestResults from enum import Enum from stat import ( - S_IFREG, S_IFDIR, S_ISLNK, S_IFMT, S_ISDIR, S_IFLNK, S_ISREG, S_IFSOCK + S_IFREG, + S_IFDIR, + S_ISLNK, + S_IFMT, + S_ISDIR, + S_IFLNK, + S_ISREG, ) -from types import ModuleType, TracebackType from typing import ( - List, Optional, Callable, Union, Any, Dict, Tuple, cast, AnyStr, overload, - NoReturn, ClassVar, IO, Iterator, TextIO, Type + List, + Optional, + Callable, + Union, + Any, + Dict, + Tuple, + cast, + AnyStr, + overload, + NoReturn, ) -from pyfakefs.deprecator import Deprecator -from pyfakefs.extra_packages import use_scandir -from pyfakefs.fake_scandir import scandir, walk, ScanDirIter + +from pyfakefs import fake_file, fake_path, fake_io, fake_os, helpers, fake_open +from pyfakefs.fake_file import AnyFileWrapper, AnyFile from pyfakefs.helpers import ( - FakeStatResult, BinaryBufferIO, TextBufferIO, - is_int_type, is_byte_string, is_unicode_string, make_string_path, - IS_PYPY, to_string, matching_string, real_encoding, now, AnyPath, to_bytes -) -from pyfakefs import __version__ # noqa: F401 for upwards compatibility - -PERM_READ = 0o400 # Read permission bit. -PERM_WRITE = 0o200 # Write permission bit. -PERM_EXE = 0o100 # Execute permission bit. -PERM_DEF = 0o777 # Default permission bits. -PERM_DEF_FILE = 0o666 # Default permission bits (regular file) -PERM_ALL = 0o7777 # All permission bits. - -_OpenModes = namedtuple( - '_OpenModes', - 'must_exist can_read can_write truncate append must_not_exist' + is_int_type, + make_string_path, + to_string, + matching_string, + AnyPath, + AnyString, ) -_OPEN_MODE_MAP = { - # mode name:(file must exist, can read, can write, - # truncate, append, must not exist) - 'r': (True, True, False, False, False, False), - 'w': (False, False, True, True, False, False), - 'a': (False, False, True, False, True, False), - 'r+': (True, True, True, False, False, False), - 'w+': (False, True, True, True, False, False), - 'a+': (False, True, True, False, True, False), - 'x': (False, False, True, False, False, True), - 'x+': (False, True, True, False, False, True) -} - -AnyFileWrapper = Union[ - "FakeFileWrapper", "FakeDirWrapper", - "StandardStreamWrapper", "FakePipeWrapper" -] - -AnyString = Union[str, bytes] - -AnyFile = Union["FakeFile", "FakeDirectory"] - -if sys.platform.startswith('linux'): +if sys.platform.startswith("linux"): # on newer Linux system, the default maximum recursion depth is 40 # we ignore older systems here _MAX_LINK_DEPTH = 40 @@ -165,726 +131,47 @@ else: # on MacOS and Windows, the maximum recursion depth is 32 _MAX_LINK_DEPTH = 32 -NR_STD_STREAMS = 3 -if sys.platform == 'win32': - USER_ID = 1 - GROUP_ID = 1 -else: - USER_ID = os.getuid() - GROUP_ID = os.getgid() - class OSType(Enum): """Defines the real or simulated OS of the underlying file system.""" + LINUX = "linux" MACOS = "macos" WINDOWS = "windows" -class PatchMode(Enum): - """Defines if patching shall be on, off, or in automatic mode. - Currently only used for `patch_open_code` option. - """ - OFF = 1 - AUTO = 2 - ON = 3 - - -def set_uid(uid: int) -> None: - """Set the global user id. This is used as st_uid for new files - and to differentiate between a normal user and the root user (uid 0). - For the root user, some permission restrictions are ignored. - - Args: - uid: (int) the user ID of the user calling the file system functions. - """ - global USER_ID - USER_ID = uid - - -def set_gid(gid: int) -> None: - """Set the global group id. This is only used to set st_gid for new files, - no permision checks are performed. - - Args: - gid: (int) the group ID of the user calling the file system functions. - """ - global GROUP_ID - GROUP_ID = gid - - -def reset_ids() -> None: - """Set the global user ID and group ID back to default values.""" - if sys.platform == 'win32': - set_uid(1) - set_gid(1) - else: - set_uid(os.getuid()) - set_gid(os.getgid()) - - -def is_root() -> bool: - """Return True if the current user is the root user.""" - return USER_ID == 0 - - -class FakeLargeFileIoException(Exception): - """Exception thrown on unsupported operations for fake large files. - Fake large files have a size with no real content. - """ - - def __init__(self, file_path: str) -> None: - super(FakeLargeFileIoException, self).__init__( - 'Read and write operations not supported for ' - 'fake large file: %s' % file_path) - - -def _copy_module(old: ModuleType) -> ModuleType: - """Recompiles and creates new module object.""" - saved = sys.modules.pop(old.__name__, None) - new = __import__(old.__name__) - if saved is not None: - sys.modules[old.__name__] = saved - return new - - -class FakeFile: - """Provides the appearance of a real file. - - Attributes currently faked out: - * `st_mode`: user-specified, otherwise S_IFREG - * `st_ctime`: the time.time() timestamp of the file change time (updated - each time a file's attributes is modified). - * `st_atime`: the time.time() timestamp when the file was last accessed. - * `st_mtime`: the time.time() timestamp when the file was last modified. - * `st_size`: the size of the file - * `st_nlink`: the number of hard links to the file - * `st_ino`: the inode number - a unique number identifying the file - * `st_dev`: a unique number identifying the (fake) file system device - the file belongs to - * `st_uid`: always set to USER_ID, which can be changed globally using - `set_uid` - * `st_gid`: always set to GROUP_ID, which can be changed globally using - `set_gid` - - .. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the - real file system depends on the used file system (for example it is - only 1s for HFS+ and older Linux file systems, but much higher for - ext4 and NTFS). This is currently ignored by pyfakefs, which uses - the resolution of `time.time()`. - - Under Windows, `st_atime` is not updated for performance reasons by - default. pyfakefs never updates `st_atime` under Windows, assuming - the default setting. - """ - stat_types = ( - 'st_mode', 'st_ino', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', - 'st_size', 'st_atime', 'st_mtime', 'st_ctime', - 'st_atime_ns', 'st_mtime_ns', 'st_ctime_ns' - ) - - def __init__(self, name: AnyStr, - st_mode: int = S_IFREG | PERM_DEF_FILE, - contents: Optional[AnyStr] = None, - filesystem: Optional["FakeFilesystem"] = None, - encoding: Optional[str] = None, - errors: Optional[str] = None, - side_effect: Optional[Callable[["FakeFile"], None]] = None): - """ - Args: - name: Name of the file/directory, without parent path information - st_mode: The stat.S_IF* constant representing the file type (i.e. - stat.S_IFREG, stat.S_IFDIR), and the file permissions. - If no file type is set (e.g. permission flags only), a - regular file type is assumed. - contents: The contents of the filesystem object; should be a string - or byte object for regular files, and a dict of other - FakeFile or FakeDirectory objects wih the file names as - keys for FakeDirectory objects - filesystem: The fake filesystem where the file is created. - encoding: If contents is a unicode string, the encoding used - for serialization. - errors: The error mode used for encoding/decoding errors. - side_effect: function handle that is executed when file is written, - must accept the file object as an argument. - """ - # to be backwards compatible regarding argument order, we raise on None - if filesystem is None: - raise ValueError('filesystem shall not be None') - self.filesystem: FakeFilesystem = filesystem - self._side_effect: Optional[Callable] = side_effect - self.name: AnyStr = name # type: ignore[assignment] - self.stat_result = FakeStatResult( - filesystem.is_windows_fs, USER_ID, GROUP_ID, now()) - if st_mode >> 12 == 0: - st_mode |= S_IFREG - self.stat_result.st_mode = st_mode - self.st_size: int = 0 - self.encoding: Optional[str] = real_encoding(encoding) - self.errors: str = errors or 'strict' - self._byte_contents: Optional[bytes] = self._encode_contents(contents) - self.stat_result.st_size = ( - len(self._byte_contents) if self._byte_contents is not None else 0) - self.epoch: int = 0 - self.parent_dir: Optional[FakeDirectory] = None - # Linux specific: extended file system attributes - self.xattr: Dict = {} - self.opened_as: AnyString = '' - - @property - def byte_contents(self) -> Optional[bytes]: - """Return the contents as raw byte array.""" - return self._byte_contents - - @property - def contents(self) -> Optional[str]: - """Return the contents as string with the original encoding.""" - if isinstance(self.byte_contents, bytes): - return self.byte_contents.decode( - self.encoding or locale.getpreferredencoding(False), - errors=self.errors) - return None - - @property - def st_ctime(self) -> float: - """Return the creation time of the fake file.""" - return self.stat_result.st_ctime - - @st_ctime.setter - def st_ctime(self, val: float) -> None: - """Set the creation time of the fake file.""" - self.stat_result.st_ctime = val - - @property - def st_atime(self) -> float: - """Return the access time of the fake file.""" - return self.stat_result.st_atime - - @st_atime.setter - def st_atime(self, val: float) -> None: - """Set the access time of the fake file.""" - self.stat_result.st_atime = val - - @property - def st_mtime(self) -> float: - """Return the modification time of the fake file.""" - return self.stat_result.st_mtime - - @st_mtime.setter - def st_mtime(self, val: float) -> None: - """Set the modification time of the fake file.""" - self.stat_result.st_mtime = val - - def set_large_file_size(self, st_size: int) -> None: - """Sets the self.st_size attribute and replaces self.content with None. - - Provided specifically to simulate very large files without regards - to their content (which wouldn't fit in memory). - Note that read/write operations with such a file raise - :py:class:`FakeLargeFileIoException`. - - Args: - st_size: (int) The desired file size - - Raises: - OSError: if the st_size is not a non-negative integer, - or if st_size exceeds the available file system space - """ - self._check_positive_int(st_size) - if self.st_size: - self.size = 0 - if self.filesystem: - self.filesystem.change_disk_usage(st_size, self.name, self.st_dev) - self.st_size = st_size - self._byte_contents = None - - def _check_positive_int(self, size: int) -> None: - # the size should be an positive integer value - if not is_int_type(size) or size < 0: - self.filesystem.raise_os_error(errno.ENOSPC, self.name) - - def is_large_file(self) -> bool: - """Return `True` if this file was initialized with size - but no contents. - """ - return self._byte_contents is None - - def _encode_contents( - self, contents: Union[str, bytes, None]) -> Optional[bytes]: - if is_unicode_string(contents): - contents = bytes( - cast(str, contents), - self.encoding or locale.getpreferredencoding(False), - self.errors) - return cast(bytes, contents) - - def set_initial_contents(self, contents: AnyStr) -> bool: - """Sets the file contents and size. - Called internally after initial file creation. - - Args: - contents: string, new content of file. - - Returns: - True if the contents have been changed. - - Raises: - OSError: if the st_size is not a non-negative integer, - or if st_size exceeds the available file system space - """ - byte_contents = self._encode_contents(contents) - changed = self._byte_contents != byte_contents - st_size = len(byte_contents) if byte_contents else 0 - - current_size = self.st_size or 0 - self.filesystem.change_disk_usage( - st_size - current_size, self.name, self.st_dev) - self._byte_contents = byte_contents - self.st_size = st_size - self.epoch += 1 - return changed - - def set_contents(self, contents: AnyStr, - encoding: Optional[str] = None) -> bool: - """Sets the file contents and size and increases the modification time. - Also executes the side_effects if available. - - Args: - contents: (str, bytes) new content of file. - encoding: (str) the encoding to be used for writing the contents - if they are a unicode string. - If not given, the locale preferred encoding is used. - - Returns: - True if the contents have been changed. - - Raises: - OSError: if `st_size` is not a non-negative integer, - or if it exceeds the available file system space. - """ - self.encoding = real_encoding(encoding) - changed = self.set_initial_contents(contents) - if self._side_effect is not None: - self._side_effect(self) - return changed - - @property - def size(self) -> int: - """Return the size in bytes of the file contents. - """ - return self.st_size - - @size.setter - def size(self, st_size: int) -> None: - """Resizes file content, padding with nulls if new size exceeds the - old size. - - Args: - st_size: The desired size for the file. - - Raises: - OSError: if the st_size arg is not a non-negative integer - or if st_size exceeds the available file system space - """ - - self._check_positive_int(st_size) - current_size = self.st_size or 0 - self.filesystem.change_disk_usage( - st_size - current_size, self.name, self.st_dev) - if self._byte_contents: - if st_size < current_size: - self._byte_contents = self._byte_contents[:st_size] - else: - self._byte_contents += b'\0' * (st_size - current_size) - self.st_size = st_size - self.epoch += 1 - - @property - def path(self) -> AnyStr: - """Return the full path of the current object.""" - names: List[AnyStr] = [] - obj: Optional[FakeFile] = self - while obj: - names.insert( - 0, matching_string(self.name, obj.name)) # type: ignore - obj = obj.parent_dir - sep = self.filesystem.get_path_separator(names[0]) - if names[0] == sep: - names.pop(0) - dir_path = sep.join(names) - drive = self.filesystem.splitdrive(dir_path)[0] - # if a Windows path already starts with a drive or UNC path, - # no extra separator is needed - if not drive: - dir_path = sep + dir_path - else: - dir_path = sep.join(names) - return self.filesystem.absnormpath(dir_path) - - @Deprecator('property path') - def GetPath(self): - return self.path - - @Deprecator('property size') - def GetSize(self): - return self.size - - @Deprecator('property size') - def SetSize(self, value): - self.size = value - - @Deprecator('property st_atime') - def SetATime(self, st_atime): - """Set the self.st_atime attribute. - - Args: - st_atime: The desired access time. - """ - self.st_atime = st_atime - - @Deprecator('property st_mtime') - def SetMTime(self, st_mtime): - """Set the self.st_mtime attribute. - - Args: - st_mtime: The desired modification time. - """ - self.st_mtime = st_mtime - - @Deprecator('property st_ctime') - def SetCTime(self, st_ctime): - """Set the self.st_ctime attribute. - - Args: - st_ctime: The desired creation time. - """ - self.st_ctime = st_ctime - - def __getattr__(self, item: str) -> Any: - """Forward some properties to stat_result.""" - if item in self.stat_types: - return getattr(self.stat_result, item) - return super().__getattribute__(item) - - def __setattr__(self, key: str, value: Any) -> None: - """Forward some properties to stat_result.""" - if key in self.stat_types: - return setattr(self.stat_result, key, value) - return super().__setattr__(key, value) - - def __str__(self) -> str: - return '%r(%o)' % (self.name, self.st_mode) - - @Deprecator('st_ino') - def SetIno(self, st_ino): - """Set the self.st_ino attribute. - Note that a unique inode is assigned automatically to a new fake file. - This function does not guarantee uniqueness and should be used with - caution. - - Args: - st_ino: (int) The desired inode. - """ - self.st_ino = st_ino - - -class FakeNullFile(FakeFile): - def __init__(self, filesystem: "FakeFilesystem") -> None: - devnull = 'nul' if filesystem.is_windows_fs else '/dev/null' - super(FakeNullFile, self).__init__( - devnull, filesystem=filesystem, contents='') - - @property - def byte_contents(self) -> bytes: - return b'' - - def set_initial_contents(self, contents: AnyStr) -> bool: - return False - - -Deprecator.add(FakeFile, FakeFile.set_large_file_size, 'SetLargeFileSize') -Deprecator.add(FakeFile, FakeFile.set_contents, 'SetContents') -Deprecator.add(FakeFile, FakeFile.is_large_file, 'IsLargeFile') - - -class FakeFileFromRealFile(FakeFile): - """Represents a fake file copied from the real file system. - - The contents of the file are read on demand only. - """ - - def __init__(self, file_path: str, filesystem: "FakeFilesystem", - side_effect: Optional[Callable] = None) -> None: - """ - Args: - file_path: Path to the existing file. - filesystem: The fake filesystem where the file is created. - - Raises: - OSError: if the file does not exist in the real file system. - OSError: if the file already exists in the fake file system. - """ - super().__init__( - name=os.path.basename(file_path), filesystem=filesystem, - side_effect=side_effect) - self.contents_read = False - - @property - def byte_contents(self) -> Optional[bytes]: - if not self.contents_read: - self.contents_read = True - with io.open(self.file_path, 'rb') as f: - self._byte_contents = f.read() - # On MacOS and BSD, the above io.open() updates atime on the real file - self.st_atime = os.stat(self.file_path).st_atime - return self._byte_contents - - def set_contents(self, contents, encoding=None): - self.contents_read = True - super(FakeFileFromRealFile, self).set_contents(contents, encoding) - - def is_large_file(self): - """The contents are never faked.""" - return False - - -class FakeDirectory(FakeFile): - """Provides the appearance of a real directory.""" - - def __init__(self, name: str, perm_bits: int = PERM_DEF, - filesystem: Optional["FakeFilesystem"] = None): - """ - Args: - name: name of the file/directory, without parent path information - perm_bits: permission bits. defaults to 0o777. - filesystem: if set, the fake filesystem where the directory - is created - """ - FakeFile.__init__( - self, name, S_IFDIR | perm_bits, '', filesystem=filesystem) - # directories have the link count of contained entries, - # including '.' and '..' - self.st_nlink += 1 - self._entries: Dict[str, AnyFile] = {} - - def set_contents(self, contents: AnyStr, - encoding: Optional[str] = None) -> bool: - raise self.filesystem.raise_os_error(errno.EISDIR, self.path) - - @property - def entries(self) -> Dict[str, FakeFile]: - """Return the list of contained directory entries.""" - return self._entries - - @property - def ordered_dirs(self) -> List[str]: - """Return the list of contained directory entry names ordered by - creation order. - """ - return [item[0] for item in sorted( - self._entries.items(), key=lambda entry: entry[1].st_ino)] - - def add_entry(self, path_object: FakeFile) -> None: - """Adds a child FakeFile to this directory. - - Args: - path_object: FakeFile instance to add as a child of this directory. - - Raises: - OSError: if the directory has no write permission (Posix only) - OSError: if the file or directory to be added already exists - """ - if (not is_root() and not self.st_mode & PERM_WRITE and - not self.filesystem.is_windows_fs): - raise OSError(errno.EACCES, 'Permission Denied', self.path) - - path_object_name: str = to_string(path_object.name) - if path_object_name in self.entries: - self.filesystem.raise_os_error(errno.EEXIST, self.path) - - self._entries[path_object_name] = path_object - path_object.parent_dir = self - if path_object.st_ino is None: - self.filesystem.last_ino += 1 - path_object.st_ino = self.filesystem.last_ino - self.st_nlink += 1 - path_object.st_nlink += 1 - path_object.st_dev = self.st_dev - if path_object.st_nlink == 1: - self.filesystem.change_disk_usage( - path_object.size, path_object.name, self.st_dev) - - def get_entry(self, pathname_name: str) -> AnyFile: - """Retrieves the specified child file or directory entry. - - Args: - pathname_name: The basename of the child object to retrieve. - - Returns: - The fake file or directory object. - - Raises: - KeyError: if no child exists by the specified name. - """ - pathname_name = self._normalized_entryname(pathname_name) - return self.entries[to_string(pathname_name)] - - def _normalized_entryname(self, pathname_name: str) -> str: - if not self.filesystem.is_case_sensitive: - matching_names = [name for name in self.entries - if name.lower() == pathname_name.lower()] - if matching_names: - pathname_name = matching_names[0] - return pathname_name - - def remove_entry(self, pathname_name: str, recursive: bool = True) -> None: - """Removes the specified child file or directory. - - Args: - pathname_name: Basename of the child object to remove. - recursive: If True (default), the entries in contained directories - are deleted first. Used to propagate removal errors - (e.g. permission problems) from contained entries. - - Raises: - KeyError: if no child exists by the specified name. - OSError: if user lacks permission to delete the file, - or (Windows only) the file is open. - """ - pathname_name = self._normalized_entryname(pathname_name) - entry = self.get_entry(pathname_name) - if self.filesystem.is_windows_fs: - if entry.st_mode & PERM_WRITE == 0: - self.filesystem.raise_os_error(errno.EACCES, pathname_name) - if self.filesystem.has_open_file(entry): - self.filesystem.raise_os_error(errno.EACCES, pathname_name) - else: - if (not is_root() and (self.st_mode & (PERM_WRITE | PERM_EXE) != - PERM_WRITE | PERM_EXE)): - self.filesystem.raise_os_error(errno.EACCES, pathname_name) - - if recursive and isinstance(entry, FakeDirectory): - while entry.entries: - entry.remove_entry(list(entry.entries)[0]) - elif entry.st_nlink == 1: - self.filesystem.change_disk_usage( - -entry.size, pathname_name, entry.st_dev) - - self.st_nlink -= 1 - entry.st_nlink -= 1 - assert entry.st_nlink >= 0 - - del self.entries[to_string(pathname_name)] - - @property - def size(self) -> int: - """Return the total size of all files contained in this directory tree. - """ - return sum([item[1].size for item in self.entries.items()]) - - @size.setter - def size(self, st_size: int) -> None: - """Setting the size is an error for a directory.""" - raise self.filesystem.raise_os_error(errno.EISDIR, self.path) - - @Deprecator('property size') - def GetSize(self): - return self.size - - def has_parent_object(self, dir_object: "FakeDirectory") -> bool: - """Return `True` if dir_object is a direct or indirect parent - directory, or if both are the same object.""" - obj: Optional[FakeDirectory] = self - while obj: - if obj == dir_object: - return True - obj = obj.parent_dir - return False - - def __str__(self) -> str: - description = super(FakeDirectory, self).__str__() + ':\n' - for item in self.entries: - item_desc = self.entries[item].__str__() - for line in item_desc.split('\n'): - if line: - description = description + ' ' + line + '\n' - return description - - -Deprecator.add(FakeDirectory, FakeDirectory.add_entry, 'AddEntry') -Deprecator.add(FakeDirectory, FakeDirectory.get_entry, 'GetEntry') -Deprecator.add(FakeDirectory, FakeDirectory.set_contents, 'SetContents') -Deprecator.add(FakeDirectory, FakeDirectory.remove_entry, 'RemoveEntry') - - -class FakeDirectoryFromRealDirectory(FakeDirectory): - """Represents a fake directory copied from the real file system. - - The contents of the directory are read on demand only. - """ - - def __init__(self, source_path: AnyPath, filesystem: "FakeFilesystem", - read_only: bool, target_path: Optional[AnyPath] = None): - """ - Args: - source_path: Full directory path. - filesystem: The fake filesystem where the directory is created. - read_only: If set, all files under the directory are treated - as read-only, e.g. a write access raises an exception; - otherwise, writing to the files changes the fake files - only as usually. - target_path: If given, the target path of the directory, - otherwise the target is the same as `source_path`. - - Raises: - OSError: if the directory does not exist in the real file system - """ - target_path = target_path or source_path - real_stat = os.stat(source_path) - super(FakeDirectoryFromRealDirectory, self).__init__( - name=to_string(os.path.split(target_path)[1]), - perm_bits=real_stat.st_mode, - filesystem=filesystem) - - self.st_ctime = real_stat.st_ctime - self.st_atime = real_stat.st_atime - self.st_mtime = real_stat.st_mtime - self.st_gid = real_stat.st_gid - self.st_uid = real_stat.st_uid - self.source_path = source_path # type: ignore - self.read_only = read_only - self.contents_read = False - - @property - def entries(self) -> Dict[str, FakeFile]: - """Return the list of contained directory entries, loading them - if not already loaded.""" - if not self.contents_read: - self.contents_read = True - base = self.path - for entry in os.listdir(self.source_path): - source_path = os.path.join(self.source_path, entry) - target_path = os.path.join(base, entry) # type: ignore - if os.path.islink(source_path): - self.filesystem.add_real_symlink(source_path, target_path) - elif os.path.isdir(source_path): - self.filesystem.add_real_directory( - source_path, self.read_only, target_path=target_path) - else: - self.filesystem.add_real_file( - source_path, self.read_only, target_path=target_path) - return self._entries - - @property - def size(self) -> int: - # we cannot get the size until the contents are loaded - if not self.contents_read: - return 0 - return super(FakeDirectoryFromRealDirectory, self).size - - @size.setter - def size(self, st_size: int) -> None: - raise self.filesystem.raise_os_error(errno.EISDIR, self.path) +# definitions for backwards compatibility +FakeFile = fake_file.FakeFile +FakeNullFile = fake_file.FakeNullFile +FakeFileFromRealFile = fake_file.FakeFileFromRealFile +FakeDirectory = fake_file.FakeDirectory +FakeDirectoryFromRealDirectory = fake_file.FakeDirectoryFromRealDirectory +FakeFileWrapper = fake_file.FakeFileWrapper +StandardStreamWrapper = fake_file.StandardStreamWrapper +FakeDirWrapper = fake_file.FakeDirWrapper +FakePipeWrapper = fake_file.FakePipeWrapper + +FakePathModule = fake_path.FakePathModule +FakeOsModule = fake_os.FakeOsModule +FakeFileOpen = fake_open.FakeFileOpen +FakeIoModule = fake_io.FakeIoModule +if sys.platform != "win32": + FakeFcntlModule = fake_io.FakeFcntlModule +PatchMode = fake_io.PatchMode + +is_root = helpers.is_root +get_uid = helpers.get_uid +set_uid = helpers.set_uid +get_gid = helpers.get_gid +set_gid = helpers.set_gid +reset_ids = helpers.reset_ids + +PERM_READ = helpers.PERM_READ +PERM_WRITE = helpers.PERM_WRITE +PERM_EXE = helpers.PERM_EXE +PERM_DEF = helpers.PERM_DEF +PERM_DEF_FILE = helpers.PERM_DEF_FILE +PERM_ALL = helpers.PERM_ALL class FakeFilesystem: @@ -897,20 +184,31 @@ class FakeFilesystem: is_macos: `True` under MacOS, or if we are faking it. is_case_sensitive: `True` if a case-sensitive file system is assumed. root: The root :py:class:`FakeDirectory` entry of the file system. - cwd: The current working directory path. umask: The umask used for newly created files, see `os.umask`. patcher: Holds the Patcher object if created from it. Allows access to the patcher object if using the pytest fs fixture. + patch_open_code: Defines how `io.open_code` will be patched; + patching can be on, off, or in automatic mode. + shuffle_listdir_results: If `True`, `os.listdir` will not sort the + results to match the real file system behavior. """ - def __init__(self, path_separator: str = os.path.sep, - total_size: int = None, - patcher: Any = None) -> None: + def __init__( + self, + path_separator: str = os.path.sep, + total_size: Optional[int] = None, + patcher: Any = None, + create_temp_dir: bool = False, + ) -> None: """ Args: path_separator: optional substitute for os.path.sep total_size: if not None, the total size in bytes of the root filesystem. + patcher: the Patcher instance if created from the Patcher + create_temp_dir: If True, a temp directory is created on initialization. + Under Posix, if the temp directory is not `/tmp`, a link to the temp + path is additionally created at `/tmp`. Example usage to use the same path separator under all systems: @@ -920,6 +218,7 @@ class FakeFilesystem: self.path_separator: str = path_separator self.alternative_path_separator: Optional[str] = os.path.altsep self.patcher = patcher + self.create_temp_dir = create_temp_dir if path_separator != os.sep: self.alternative_path_separator = None @@ -927,20 +226,20 @@ class FakeFilesystem: # Windows fs on non-Windows systems and vice verse; # is it used to support drive letters, UNC paths and some other # Windows-specific features - self.is_windows_fs = sys.platform == 'win32' + self._is_windows_fs = sys.platform == "win32" # can be used to test some MacOS-specific behavior under other systems - self.is_macos = sys.platform == 'darwin' + self._is_macos = sys.platform == "darwin" # is_case_sensitive can be used to test pyfakefs for case-sensitive # file systems on non-case-sensitive systems and vice verse - self.is_case_sensitive = not (self.is_windows_fs or self.is_macos) + self.is_case_sensitive: bool = not (self.is_windows_fs or self._is_macos) - self.root = FakeDirectory(self.path_separator, filesystem=self) - self.cwd = self.root.name + self.root: FakeDirectory + self._cwd = "" # We can't query the current value without changing it: - self.umask = os.umask(0o22) + self.umask: int = os.umask(0o22) os.umask(self.umask) # A list of open file objects. Their position in the list is their @@ -949,12 +248,12 @@ class FakeFilesystem: # A heap containing all free positions in self.open_files list self._free_fd_heap: List[int] = [] # last used numbers for inodes (st_ino) and devices (st_dev) - self.last_ino = 0 - self.last_dev = 0 - self.mount_points: Dict[AnyString, Dict] = {} - self.add_mount_point(self.root.name, total_size) - self._add_standard_streams() - self.dev_null = FakeNullFile(self) + self.last_ino: int = 0 + self.last_dev: int = 0 + self.mount_points: Dict[AnyString, Dict] = OrderedDict() + self.dev_null: Any = None + self.reset(total_size=total_size, init_pathlib=False) + # set from outside if needed self.patch_open_code = PatchMode.OFF self.shuffle_listdir_results = False @@ -964,38 +263,105 @@ class FakeFilesystem: return not self.is_windows_fs and not self.is_macos @property + def is_windows_fs(self) -> bool: + return self._is_windows_fs + + @is_windows_fs.setter + def is_windows_fs(self, value: bool) -> None: + if self._is_windows_fs != value: + self._is_windows_fs = value + self.reset() + FakePathModule.reset(self) + + @property + def is_macos(self) -> bool: + return self._is_macos + + @is_macos.setter + def is_macos(self, value: bool) -> None: + if self._is_macos != value: + self._is_macos = value + self.reset() + FakePathModule.reset(self) + + @property + def cwd(self) -> str: + """Return the current working directory of the fake filesystem.""" + return self._cwd + + @cwd.setter + def cwd(self, value: str) -> None: + """Set the current working directory of the fake filesystem. + Make sure a new drive or share is auto-mounted under Windows. + """ + self._cwd = value + self._auto_mount_drive_if_needed(value) + + @property + def root_dir(self) -> FakeDirectory: + """Return the root directory, which represents "/" under POSIX, + and the current drive under Windows.""" + if self.is_windows_fs: + return self._mount_point_dir_for_cwd() + return self.root + + @property + def root_dir_name(self) -> str: + """Return the root directory name, which is "/" under POSIX, + and the root path of the current drive under Windows.""" + root_dir = to_string(self.root_dir.name) + if not root_dir.endswith(self.path_separator): + return root_dir + self.path_separator + return root_dir + + @property def os(self) -> OSType: """Return the real or simulated type of operating system.""" - return (OSType.WINDOWS if self.is_windows_fs else - OSType.MACOS if self.is_macos else OSType.LINUX) + return ( + OSType.WINDOWS + if self.is_windows_fs + else OSType.MACOS + if self.is_macos + else OSType.LINUX + ) @os.setter def os(self, value: OSType) -> None: """Set the simulated type of operating system underlying the fake file system.""" - self.is_windows_fs = value == OSType.WINDOWS - self.is_macos = value == OSType.MACOS + self._is_windows_fs = value == OSType.WINDOWS + self._is_macos = value == OSType.MACOS self.is_case_sensitive = value == OSType.LINUX - self.path_separator = '\\' if value == OSType.WINDOWS else '/' - self.alternative_path_separator = ('/' if value == OSType.WINDOWS - else None) + self.path_separator = "\\" if value == OSType.WINDOWS else "/" + self.alternative_path_separator = "/" if value == OSType.WINDOWS else None self.reset() FakePathModule.reset(self) - def reset(self, total_size: Optional[int] = None): + def reset(self, total_size: Optional[int] = None, init_pathlib: bool = True): """Remove all file system contents and reset the root.""" self.root = FakeDirectory(self.path_separator, filesystem=self) - self.cwd = self.root.name - self.open_files = [] - self._free_fd_heap = [] + self.dev_null = FakeNullFile(self) + self.open_files.clear() + self._free_fd_heap.clear() self.last_ino = 0 self.last_dev = 0 - self.mount_points = {} - self.add_mount_point(self.root.name, total_size) + self.mount_points.clear() + self._add_root_mount_point(total_size) self._add_standard_streams() - from pyfakefs import fake_pathlib - fake_pathlib.init_module(self) + if self.create_temp_dir: + self._create_temp_dir() + if init_pathlib: + from pyfakefs import fake_pathlib + + fake_pathlib.init_module(self) + + def _add_root_mount_point(self, total_size): + mount_point = "C:" if self.is_windows_fs else self.path_separator + self._cwd = mount_point + if not self.cwd.endswith(self.path_separator): + self._cwd += self.path_separator + self.add_mount_point(mount_point, total_size) def pause(self) -> None: """Pause the patching of the file system modules until `resume` is @@ -1009,8 +375,10 @@ class FakeFilesystem: RuntimeError: if the file system was not created by a Patcher. """ if self.patcher is None: - raise RuntimeError('pause() can only be called from a fake file ' - 'system object created by a Patcher object') + raise RuntimeError( + "pause() can only be called from a fake file " + "system object created by a Patcher object" + ) self.patcher.pause() def resume(self) -> None: @@ -1022,8 +390,10 @@ class FakeFilesystem: RuntimeError: if the file system has not been created by `Patcher`. """ if self.patcher is None: - raise RuntimeError('resume() can only be called from a fake file ' - 'system object created by a Patcher object') + raise RuntimeError( + "resume() can only be called from a fake file " + "system object created by a Patcher object" + ) self.patcher.resume() def clear_cache(self) -> None: @@ -1032,14 +402,14 @@ class FakeFilesystem: self.patcher.clear_cache() def line_separator(self) -> str: - return '\r\n' if self.is_windows_fs else '\n' - - def _error_message(self, err_no: int) -> str: - return os.strerror(err_no) + ' in the fake filesystem' - - def raise_os_error(self, err_no: int, - filename: Optional[AnyString] = None, - winerror: Optional[int] = None) -> NoReturn: + return "\r\n" if self.is_windows_fs else "\n" + + def raise_os_error( + self, + err_no: int, + filename: Optional[AnyString] = None, + winerror: Optional[int] = None, + ) -> NoReturn: """Raises OSError. The error message is constructed from the given error code and shall start with the error string issued in the real system. @@ -1052,9 +422,8 @@ class FakeFilesystem: filename: The name of the affected file, if any. winerror: Windows only - the specific Windows error code. """ - message = self._error_message(err_no) - if (winerror is not None and sys.platform == 'win32' and - self.is_windows_fs): + message = os.strerror(err_no) + " in the fake filesystem" + if winerror is not None and sys.platform == "win32" and self.is_windows_fs: raise OSError(err_no, message, filename, winerror) raise OSError(err_no, message, filename) @@ -1062,20 +431,22 @@ class FakeFilesystem: """Return the path separator as the same type as path""" return matching_string(path, self.path_separator) - def _alternative_path_separator( - self, path: AnyStr) -> Optional[AnyStr]: + def _alternative_path_separator(self, path: AnyStr) -> Optional[AnyStr]: """Return the alternative path separator as the same type as path""" return matching_string(path, self.alternative_path_separator) - def _starts_with_sep(self, path: AnyStr) -> bool: + def starts_with_sep(self, path: AnyStr) -> bool: """Return True if path starts with a path separator.""" sep = self.get_path_separator(path) altsep = self._alternative_path_separator(path) - return (path.startswith(sep) or altsep is not None and - path.startswith(altsep)) - - def add_mount_point(self, path: AnyStr, - total_size: Optional[int] = None) -> Dict: + return path.startswith(sep) or altsep is not None and path.startswith(altsep) + + def add_mount_point( + self, + path: AnyStr, + total_size: Optional[int] = None, + can_exist: bool = False, + ) -> Dict: """Add a new mount point for a filesystem device. The mount point gets a new unique device number. @@ -1085,36 +456,75 @@ class FakeFilesystem: total_size: The new total size of the added filesystem device in bytes. Defaults to infinite size. + can_exist: If True, no error is raised if the mount point + already exists + Returns: The newly created mount point dict. Raises: - OSError: if trying to mount an existing mount point again. + OSError: if trying to mount an existing mount point again, + and `can_exist` is False. """ - path = self.absnormpath(path) - if path in self.mount_points: - self.raise_os_error(errno.EEXIST, path) + path = self.normpath(self.normcase(path)) + for mount_point in self.mount_points: + if ( + self.is_case_sensitive + and path == matching_string(path, mount_point) + or not self.is_case_sensitive + and path.lower() == matching_string(path, mount_point.lower()) + ): + if can_exist: + return self.mount_points[mount_point] + self.raise_os_error(errno.EEXIST, path) + self.last_dev += 1 self.mount_points[path] = { - 'idev': self.last_dev, 'total_size': total_size, 'used_size': 0 + "idev": self.last_dev, + "total_size": total_size, + "used_size": 0, } - # special handling for root path: has been created before - if path == self.root.name: + if path == matching_string(path, self.root.name): + # special handling for root path: has been created before root_dir = self.root self.last_ino += 1 root_dir.st_ino = self.last_ino else: - root_dir = self.create_dir(path) + root_dir = self._create_mount_point_dir(path) root_dir.st_dev = self.last_dev return self.mount_points[path] - def _auto_mount_drive_if_needed(self, path: AnyStr, - force: bool = False) -> Optional[Dict]: - if (self.is_windows_fs and - (force or not self._mount_point_for_path(path))): + def _create_mount_point_dir(self, directory_path: AnyPath) -> FakeDirectory: + """A version of `create_dir` for the mount point directory creation, + which avoids circular calls and unneeded checks. + """ + dir_path = self.make_string_path(directory_path) + path_components = self._path_components(dir_path) + current_dir = self.root + + new_dirs = [] + for component in [to_string(p) for p in path_components]: + directory = self._directory_content(current_dir, to_string(component))[1] + if not directory: + new_dir = FakeDirectory(component, filesystem=self) + new_dirs.append(new_dir) + current_dir.add_entry(new_dir) + current_dir = new_dir + else: + current_dir = cast(FakeDirectory, directory) + + for new_dir in new_dirs: + new_dir.st_mode = S_IFDIR | helpers.PERM_DEF + + return current_dir + + def _auto_mount_drive_if_needed(self, path: AnyStr) -> Optional[Dict]: + """Windows only: if `path` is located on an unmounted drive or UNC + mount point, the drive/mount point is added to the mount points.""" + if self.is_windows_fs: drive = self.splitdrive(path)[0] if drive: - return self.add_mount_point(path=drive) + return self.add_mount_point(path=drive, can_exist=True) return None def _mount_point_for_path(self, path: AnyStr) -> Dict: @@ -1122,7 +532,7 @@ class FakeFilesystem: for mount_path in self.mount_points: if path == matching_string(path, mount_path): return self.mount_points[mount_path] - mount_path = matching_string(path, '') + mount_path = matching_string(path, "") drive = self.splitdrive(path)[0] for root_path in self.mount_points: root_path = matching_string(path, root_path) @@ -1132,18 +542,42 @@ class FakeFilesystem: mount_path = root_path if mount_path: return self.mount_points[to_string(mount_path)] - mount_point = self._auto_mount_drive_if_needed(path, force=True) + mount_point = self._auto_mount_drive_if_needed(path) assert mount_point return mount_point + def _mount_point_dir_for_cwd(self) -> FakeDirectory: + """Return the fake directory object of the mount point where the + current working directory points to.""" + + def object_from_path(file_path) -> FakeDirectory: + path_components = self._path_components(file_path) + target = self.root + for component in path_components: + target = cast(FakeDirectory, target.get_entry(component)) + return target + + path = to_string(self.cwd) + for mount_path in self.mount_points: + if path == to_string(mount_path): + return object_from_path(mount_path) + mount_path = "" + drive = to_string(self.splitdrive(path)[0]) + for root_path in self.mount_points: + str_root_path = to_string(root_path) + if drive and not str_root_path.startswith(drive): + continue + if path.startswith(str_root_path) and len(str_root_path) > len(mount_path): + mount_path = root_path + return object_from_path(mount_path) + def _mount_point_for_device(self, idev: int) -> Optional[Dict]: for mount_point in self.mount_points.values(): - if mount_point['idev'] == idev: + if mount_point["idev"] == idev: return mount_point return None - def get_disk_usage( - self, path: AnyStr = None) -> Tuple[int, int, int]: + def get_disk_usage(self, path: Optional[AnyStr] = None) -> Tuple[int, int, int]: """Return the total, used and free disk space in bytes as named tuple, or placeholder values simulating unlimited space if not set. @@ -1154,21 +588,21 @@ class FakeFilesystem: `path` resides. Defaults to the root path (e.g. '/' on Unix systems). """ - DiskUsage = namedtuple('DiskUsage', 'total, used, free') + DiskUsage = namedtuple("DiskUsage", "total, used, free") if path is None: - mount_point = self.mount_points[self.root.name] + mount_point = next(iter(self.mount_points.values())) else: - mount_point = self._mount_point_for_path(path) - if mount_point and mount_point['total_size'] is not None: - return DiskUsage(mount_point['total_size'], - mount_point['used_size'], - mount_point['total_size'] - - mount_point['used_size']) - return DiskUsage( - 1024 * 1024 * 1024 * 1024, 0, 1024 * 1024 * 1024 * 1024) - - def set_disk_usage( - self, total_size: int, path: Optional[AnyStr] = None) -> None: + file_path = make_string_path(path) + mount_point = self._mount_point_for_path(file_path) + if mount_point and mount_point["total_size"] is not None: + return DiskUsage( + mount_point["total_size"], + mount_point["used_size"], + mount_point["total_size"] - mount_point["used_size"], + ) + return DiskUsage(1024 * 1024 * 1024 * 1024, 0, 1024 * 1024 * 1024 * 1024) + + def set_disk_usage(self, total_size: int, path: Optional[AnyStr] = None) -> None: """Changes the total size of the file system, preserving the used space. Example usage: set the size of an auto-mounted Windows drive. @@ -1183,16 +617,20 @@ class FakeFilesystem: Raises: OSError: if the new space is smaller than the used size. """ - file_path: AnyStr = (path if path is not None # type: ignore - else self.root.name) + file_path: AnyStr = ( + path if path is not None else self.root_dir_name # type: ignore + ) mount_point = self._mount_point_for_path(file_path) - if (mount_point['total_size'] is not None and - mount_point['used_size'] > total_size): + if ( + mount_point["total_size"] is not None + and mount_point["used_size"] > total_size + ): self.raise_os_error(errno.ENOSPC, path) - mount_point['total_size'] = total_size + mount_point["total_size"] = total_size - def change_disk_usage(self, usage_change: int, - file_path: AnyStr, st_dev: int) -> None: + def change_disk_usage( + self, usage_change: int, file_path: AnyStr, st_dev: int + ) -> None: """Change the used disk space by the given amount. Args: @@ -1208,14 +646,13 @@ class FakeFilesystem: """ mount_point = self._mount_point_for_device(st_dev) if mount_point: - total_size = mount_point['total_size'] + total_size = mount_point["total_size"] if total_size is not None: - if total_size - mount_point['used_size'] < usage_change: + if total_size - mount_point["used_size"] < usage_change: self.raise_os_error(errno.ENOSPC, file_path) - mount_point['used_size'] += usage_change + mount_point["used_size"] += usage_change - def stat(self, entry_path: AnyStr, - follow_symlinks: bool = True): + def stat(self, entry_path: AnyStr, follow_symlinks: bool = True): """Return the os.stat-like tuple for the FakeFile object of entry_path. Args: @@ -1232,8 +669,11 @@ class FakeFilesystem: # stat should return the tuple representing return value of os.stat try: file_object = self.resolve( - entry_path, follow_symlinks, - allow_fd=True, check_read_perm=False) + entry_path, + follow_symlinks, + allow_fd=True, + check_read_perm=False, + ) except TypeError: file_object = self.resolve(entry_path) if not is_root(): @@ -1243,15 +683,18 @@ class FakeFilesystem: self.get_object(parent_dir.path) # type: ignore[arg-type] self.raise_for_filepath_ending_with_separator( - entry_path, file_object, follow_symlinks) + entry_path, file_object, follow_symlinks + ) return file_object.stat_result.copy() def raise_for_filepath_ending_with_separator( - self, entry_path: AnyStr, - file_object: FakeFile, - follow_symlinks: bool = True, - macos_handling: bool = False) -> None: + self, + entry_path: AnyStr, + file_object: FakeFile, + follow_symlinks: bool = True, + macos_handling: bool = False, + ) -> None: if self.ends_with_path_separator(entry_path): if S_ISLNK(file_object.st_mode): try: @@ -1271,12 +714,16 @@ class FakeFilesystem: else: is_error = not S_ISDIR(file_object.st_mode) if is_error: - error_nr = (errno.EINVAL if self.is_windows_fs - else errno.ENOTDIR) + error_nr = errno.EINVAL if self.is_windows_fs else errno.ENOTDIR self.raise_os_error(error_nr, entry_path) - def chmod(self, path: AnyStr, mode: int, - follow_symlinks: bool = True) -> None: + def chmod( + self, + path: AnyStr, + mode: int, + follow_symlinks: bool = True, + force_unix_mode: bool = False, + ) -> None: """Change the permissions of a file as encoded in integer mode. Args: @@ -1284,22 +731,31 @@ class FakeFilesystem: mode: (int) Permissions. follow_symlinks: If `False` and `path` points to a symlink, the link itself is affected instead of the linked object. + force_unix_mode: if True and run under Windows, the mode is not + adapted for Windows to allow making dirs unreadable """ - file_object = self.resolve(path, follow_symlinks, allow_fd=True) - if self.is_windows_fs: - if mode & PERM_WRITE: + file_object = self.resolve( + path, follow_symlinks, allow_fd=True, check_owner=True + ) + if self.is_windows_fs and not force_unix_mode: + if mode & helpers.PERM_WRITE: file_object.st_mode = file_object.st_mode | 0o222 else: file_object.st_mode = file_object.st_mode & 0o777555 else: - file_object.st_mode = ((file_object.st_mode & ~PERM_ALL) | - (mode & PERM_ALL)) - file_object.st_ctime = now() - - def utime(self, path: AnyStr, - times: Optional[Tuple[Union[int, float], Union[int, float]]] = - None, *, ns: Optional[Tuple[int, int]] = None, - follow_symlinks: bool = True) -> None: + file_object.st_mode = (file_object.st_mode & ~helpers.PERM_ALL) | ( + mode & helpers.PERM_ALL + ) + file_object.st_ctime = helpers.now() + + def utime( + self, + path: AnyStr, + times: Optional[Tuple[Union[int, float], Union[int, float]]] = None, + *, + ns: Optional[Tuple[int, int]] = None, + follow_symlinks: bool = True, + ) -> None: """Change the access and modified times of a file. Args: @@ -1325,50 +781,37 @@ class FakeFilesystem: if times is not None: for file_time in times: if not isinstance(file_time, (int, float)): - raise TypeError('atime and mtime must be numbers') + raise TypeError("atime and mtime must be numbers") file_object.st_atime = times[0] file_object.st_mtime = times[1] elif ns is not None: for file_time in ns: if not isinstance(file_time, int): - raise TypeError('atime and mtime must be ints') + raise TypeError("atime and mtime must be ints") file_object.st_atime_ns = ns[0] file_object.st_mtime_ns = ns[1] else: - current_time = now() + current_time = helpers.now() file_object.st_atime = current_time file_object.st_mtime = current_time + @staticmethod def _handle_utime_arg_errors( - self, ns: Optional[Tuple[int, int]], - times: Optional[Tuple[Union[int, float], Union[int, float]]]): + ns: Optional[Tuple[int, int]], + times: Optional[Tuple[Union[int, float], Union[int, float]]], + ): if times is not None and ns is not None: raise ValueError( - "utime: you may specify either 'times' or 'ns' but not both") + "utime: you may specify either 'times' or 'ns' but not both" + ) if times is not None and len(times) != 2: - raise TypeError( - "utime: 'times' must be either a tuple of two ints or None") + raise TypeError("utime: 'times' must be either a tuple of two ints or None") if ns is not None and len(ns) != 2: raise TypeError("utime: 'ns' must be a tuple of two ints") - @Deprecator - def SetIno(self, path, st_ino): - """Set the self.st_ino attribute of file at 'path'. - Note that a unique inode is assigned automatically to a new fake file. - Using this function does not guarantee uniqueness and should used - with caution. - - Args: - path: Path to file. - st_ino: The desired inode. - """ - self.get_object(path).st_ino = st_ino - - def _add_open_file( - self, - file_obj: AnyFileWrapper) -> int: + def _add_open_file(self, file_obj: AnyFileWrapper) -> int: """Add file_obj to the list of open files on the filesystem. Used internally to manage open files. @@ -1415,7 +858,7 @@ class FakeFilesystem: Open file object. """ if not is_int_type(file_des): - raise TypeError('an integer is required') + raise TypeError("an integer is required") valid = file_des < len(self.open_files) if valid: file_list = self.open_files[file_des] @@ -1432,8 +875,9 @@ class FakeFilesystem: Returns: `True` if the file is open. """ - return (file_object in [wrappers[0].get_object() - for wrappers in self.open_files if wrappers]) + return file_object in [ + wrappers[0].get_object() for wrappers in self.open_files if wrappers + ] def _normalize_path_sep(self, path: AnyStr) -> AnyStr: alt_sep = self._alternative_path_separator(path) @@ -1482,16 +926,21 @@ class FakeFilesystem: drive, path_str = self.splitdrive(path_str) sep = self.get_path_separator(path_str) is_absolute_path = path_str.startswith(sep) - path_components: List[AnyStr] = path_str.split(sep) - collapsed_path_components: List[AnyStr] = [] - dot = matching_string(path_str, '.') - dotdot = matching_string(path_str, '..') + path_components: List[AnyStr] = path_str.split( + sep + ) # pytype: disable=invalid-annotation + collapsed_path_components: List[ + AnyStr + ] = [] # pytype: disable=invalid-annotation + dot = matching_string(path_str, ".") + dotdot = matching_string(path_str, "..") for component in path_components: if (not component) or (component == dot): continue if component == dotdot: if collapsed_path_components and ( - collapsed_path_components[-1] != dotdot): + collapsed_path_components[-1] != dotdot + ): # Remove an up-reference: directory/.. collapsed_path_components.pop() continue @@ -1520,17 +969,21 @@ class FakeFilesystem: def components_to_path(): if len(path_components) > len(normalized_components): normalized_components.extend( - to_string(p) for p in path_components[len( - normalized_components):]) + to_string(p) for p in path_components[len(normalized_components) :] + ) sep = self.path_separator normalized_path = sep.join(normalized_components) - if (self._starts_with_sep(path) - and not self._starts_with_sep(normalized_path)): + if self.starts_with_sep(path) and not self.starts_with_sep(normalized_path): normalized_path = sep + normalized_path + if len(normalized_path) == 2 and self.starts_with_drive_letter( + normalized_path + ): + normalized_path += sep return normalized_path if self.is_case_sensitive or not path: return path + path = self.replace_windows_root(path) path_components = self._path_components(path) normalized_components = [] current_dir = self.root @@ -1538,11 +991,13 @@ class FakeFilesystem: if not isinstance(current_dir, FakeDirectory): return components_to_path() dir_name, directory = self._directory_content( - current_dir, to_string(component)) + current_dir, to_string(component) + ) if directory is None or ( - isinstance(directory, FakeDirectory) and - directory._byte_contents is None and - directory.st_size == 0): + isinstance(directory, FakeDirectory) + and directory._byte_contents is None + and directory.st_size == 0 + ): return components_to_path() current_dir = cast(FakeDirectory, directory) normalized_components.append(dir_name) @@ -1564,17 +1019,18 @@ class FakeFilesystem: path = self.normcase(path) cwd = matching_string(path, self.cwd) if not path: - path = matching_string(path, self.path_separator) - if path == matching_string(path, '.'): + path = self.get_path_separator(path) + if path == matching_string(path, "."): path = cwd elif not self._starts_with_root_path(path): # Prefix relative paths with cwd, if cwd is not root. root_name = matching_string(path, self.root.name) - empty = matching_string(path, '') + empty = matching_string(path, "") path = self.get_path_separator(path).join( - (cwd != root_name and cwd or empty, path)) - if path == matching_string(path, '.'): - path = cwd + (cwd != root_name and cwd or empty, path) + ) + else: + path = self.replace_windows_root(path) return self.normpath(path) def splitpath(self, path: AnyStr) -> Tuple[AnyStr, AnyStr]: @@ -1596,7 +1052,7 @@ class FakeFilesystem: seps = sep if alt_sep is None else sep + alt_sep drive, path = self.splitdrive(path) i = len(path) - while i and path[i-1] not in seps: + while i and path[i - 1] not in seps: i -= 1 head, tail = path[:i], path[i:] # now tail has no slashes # remove trailing slashes from head, unless it's all slashes @@ -1623,8 +1079,7 @@ class FakeFilesystem: norm_str = self.normcase(path_str) sep = self.get_path_separator(path_str) # UNC path_str handling - if (norm_str[0:2] == sep * 2) and ( - norm_str[2:3] != sep): + if (norm_str[0:2] == sep * 2) and (norm_str[2:3] != sep): # UNC path_str handling - splits off the mount point # instead of the drive sep_index = norm_str.find(sep, 2) @@ -1636,12 +1091,67 @@ class FakeFilesystem: if sep_index2 == -1: sep_index2 = len(path_str) return path_str[:sep_index2], path_str[sep_index2:] - if path_str[1:2] == matching_string(path_str, ':'): + if path_str[1:2] == matching_string(path_str, ":"): return path_str[:2], path_str[2:] return path_str[:0], path_str - def _join_paths_with_drive_support( - self, *all_paths: AnyStr) -> AnyStr: + def splitroot(self, path: AnyStr): + """Split a pathname into drive, root and tail. + Implementation taken from ntpath and posixpath. + """ + p = os.fspath(path) + if isinstance(p, bytes): + sep = self.path_separator.encode() + altsep = None + if self.alternative_path_separator: + altsep = self.alternative_path_separator.encode() + colon = b":" + unc_prefix = b"\\\\?\\UNC\\" + empty = b"" + else: + sep = self.path_separator + altsep = self.alternative_path_separator + colon = ":" + unc_prefix = "\\\\?\\UNC\\" + empty = "" + if self.is_windows_fs: + normp = p.replace(altsep, sep) if altsep else p + if normp[:1] == sep: + if normp[1:2] == sep: + # UNC drives, e.g. \\server\share or \\?\UNC\server\share + # Device drives, e.g. \\.\device or \\?\device + start = 8 if normp[:8].upper() == unc_prefix else 2 + index = normp.find(sep, start) + if index == -1: + return p, empty, empty + index2 = normp.find(sep, index + 1) + if index2 == -1: + return p, empty, empty + return p[:index2], p[index2 : index2 + 1], p[index2 + 1 :] + else: + # Relative path with root, e.g. \Windows + return empty, p[:1], p[1:] + elif normp[1:2] == colon: + if normp[2:3] == sep: + # Absolute drive-letter path, e.g. X:\Windows + return p[:2], p[2:3], p[3:] + else: + # Relative path with drive, e.g. X:Windows + return p[:2], empty, p[2:] + else: + # Relative path, e.g. Windows + return empty, empty, p + else: + if p[:1] != sep: + # Relative path, e.g.: 'foo' + return empty, empty, p + elif p[1:2] != sep or p[2:3] == sep: + # Absolute path, e.g.: '/foo', '///foo', '////foo', etc. + return empty, sep, p[1:] + else: + return empty, p[:2], p[2:] + + def _join_paths_with_drive_support(self, *all_paths: AnyStr) -> AnyStr: """Taken from Python 3.5 os.path.join() code in ntpath.py and slightly adapted""" base_path = all_paths[0] @@ -1658,8 +1168,7 @@ class FakeFilesystem: result_path = path_part continue elif drive_part and drive_part != result_drive: - if (self.is_case_sensitive or - drive_part.lower() != result_drive.lower()): + if self.is_case_sensitive or drive_part.lower() != result_drive.lower(): # Different drives => ignore the first path entirely result_drive = drive_part result_path = path_part @@ -1671,9 +1180,13 @@ class FakeFilesystem: result_path = result_path + sep result_path = result_path + path_part # add separator between UNC and non-absolute path - colon = matching_string(base_path, ':') - if (result_path and result_path[:1] not in seps and - result_drive and result_drive[-1:] != colon): + colon = matching_string(base_path, ":") + if ( + result_path + and result_path[:1] not in seps + and result_drive + and result_drive[-1:] != colon + ): return result_drive + sep + result_path return result_drive + result_path @@ -1699,18 +1212,19 @@ class FakeFilesystem: # An absolute path joined_path_segments = [path_segment] else: - if (joined_path_segments and - not joined_path_segments[-1].endswith(sep)): + if joined_path_segments and not joined_path_segments[-1].endswith(sep): joined_path_segments.append(sep) if path_segment: joined_path_segments.append(path_segment) - return matching_string(file_paths[0], '').join(joined_path_segments) + return matching_string(file_paths[0], "").join(joined_path_segments) @overload - def _path_components(self, path: str) -> List[str]: ... + def _path_components(self, path: str) -> List[str]: + ... @overload - def _path_components(self, path: bytes) -> List[bytes]: ... + def _path_components(self, path: bytes) -> List[bytes]: + ... def _path_components(self, path: AnyStr) -> List[AnyStr]: """Breaks the path into a list of component names. @@ -1751,7 +1265,7 @@ class FakeFilesystem: path_components.insert(0, drive) return path_components - def _starts_with_drive_letter(self, file_path: AnyStr) -> bool: + def starts_with_drive_letter(self, file_path: AnyStr) -> bool: """Return True if file_path starts with a drive letter. Args: @@ -1761,12 +1275,11 @@ class FakeFilesystem: `True` if drive letter support is enabled in the filesystem and the path starts with a drive letter. """ - colon = matching_string(file_path, ':') - if (len(file_path) >= 2 and - file_path[:1].isalpha and file_path[1:2] == colon): + colon = matching_string(file_path, ":") + if len(file_path) >= 2 and file_path[0:1].isalpha() and file_path[1:2] == colon: if self.is_windows_fs: return True - if os.name == 'nt': + if os.name == "nt": # special case if we are emulating Posix under Windows # check if the path exists because it has been mapped in # this is not foolproof, but handles most cases @@ -1780,17 +1293,55 @@ class FakeFilesystem: def _starts_with_root_path(self, file_path: AnyStr) -> bool: root_name = matching_string(file_path, self.root.name) file_path = self._normalize_path_sep(file_path) - return (file_path.startswith(root_name) or - not self.is_case_sensitive and file_path.lower().startswith( - root_name.lower()) or - self._starts_with_drive_letter(file_path)) + return ( + file_path.startswith(root_name) + or not self.is_case_sensitive + and file_path.lower().startswith(root_name.lower()) + or self.starts_with_drive_letter(file_path) + ) + + def replace_windows_root(self, path: AnyStr) -> AnyStr: + """In windows, if a path starts with a single separator, + it points to the root dir of the current mount point, usually a + drive - replace it with that mount point path to get the real path. + """ + if path and self.is_windows_fs and self.root_dir: + sep = self.get_path_separator(path) + # ignore UNC paths + if path[0:1] == sep and (len(path) == 1 or path[1:2] != sep): + # check if we already have a mount point for that path + for root_path in self.mount_points: + root_path = matching_string(path, root_path) + if path.startswith(root_path): + return path + # must be a pointer to the current drive - replace it + mount_point = matching_string(path, self.root_dir_name) + path = mount_point + path[1:] + return path def _is_root_path(self, file_path: AnyStr) -> bool: root_name = matching_string(file_path, self.root.name) - return (file_path == root_name or not self.is_case_sensitive and - file_path.lower() == root_name.lower() or - 2 <= len(file_path) <= 3 and - self._starts_with_drive_letter(file_path)) + return file_path == root_name or self.is_mount_point(file_path) + + def is_mount_point(self, file_path: AnyStr) -> bool: + """Return `True` if `file_path` points to a mount point.""" + for mount_point in self.mount_points: + mount_point = matching_string(file_path, mount_point) + if ( + file_path == mount_point + or not self.is_case_sensitive + and file_path.lower() == mount_point.lower() + ): + return True + if ( + self.is_windows_fs + and len(file_path) == 3 + and len(mount_point) == 2 + and self.starts_with_drive_letter(file_path) + and file_path[:2].lower() == mount_point.lower() + ): + return True + return False def ends_with_path_separator(self, path: Union[int, AnyPath]) -> bool: """Return True if ``file_path`` ends with a valid path separator.""" @@ -1801,26 +1352,28 @@ class FakeFilesystem: return False sep = self.get_path_separator(file_path) altsep = self._alternative_path_separator(file_path) - return (file_path not in (sep, altsep) and - (file_path.endswith(sep) or - altsep is not None and file_path.endswith(altsep))) + return file_path not in (sep, altsep) and ( + file_path.endswith(sep) or altsep is not None and file_path.endswith(altsep) + ) def is_filepath_ending_with_separator(self, path: AnyStr) -> bool: if not self.ends_with_path_separator(path): return False return self.isfile(self._path_without_trailing_separators(path)) - def _directory_content(self, directory: FakeDirectory, - component: str) -> Tuple[Optional[str], - Optional[AnyFile]]: + def _directory_content( + self, directory: FakeDirectory, component: str + ) -> Tuple[Optional[str], Optional[AnyFile]]: if not isinstance(directory, FakeDirectory): return None, None if component in directory.entries: return component, directory.entries[component] if not self.is_case_sensitive: - matching_content = [(subdir, directory.entries[subdir]) for - subdir in directory.entries - if subdir.lower() == component.lower()] + matching_content = [ + (subdir, directory.entries[subdir]) + for subdir in directory.entries + if subdir.lower() == component.lower() + ] if matching_content: return matching_content[0] @@ -1841,7 +1394,7 @@ class FakeFilesystem: """ if check_link and self.islink(file_path): return True - path = to_string(make_string_path(file_path)) + path = to_string(self.make_string_path(file_path)) if path is None: raise TypeError if not path: @@ -1854,21 +1407,19 @@ class FakeFilesystem: path = self.resolve_path(path) except OSError: return False - if path == self.root.name: + if self._is_root_path(path): return True path_components: List[str] = self._path_components(path) current_dir = self.root for component in path_components: - directory = self._directory_content( - current_dir, to_string(component))[1] + directory = self._directory_content(current_dir, to_string(component))[1] if directory is None: return False current_dir = cast(FakeDirectory, directory) return True - def resolve_path(self, - file_path: AnyStr, allow_fd: bool = False) -> AnyStr: + def resolve_path(self, file_path: AnyStr, allow_fd: bool = False) -> AnyStr: """Follow a path, resolving symlinks. ResolvePath traverses the filesystem along the specified file path, @@ -1911,23 +1462,29 @@ class FakeFilesystem: path = make_string_path(file_path) if path is None: # file.open(None) raises TypeError, so mimic that. - raise TypeError('Expected file system path string, received None') + raise TypeError("Expected file system path string, received None") if not path or not self._valid_relative_path(path): # file.open('') raises OSError, so mimic that, and validate that # all parts of a relative path exist. self.raise_os_error(errno.ENOENT, path) path = self.absnormpath(self._original_path(path)) + path = self.replace_windows_root(path) if self._is_root_path(path): return path if path == matching_string(path, self.dev_null.name): return path path_components = self._path_components(path) resolved_components = self._resolve_components(path_components) - return self._components_to_path(resolved_components) + path = self._components_to_path(resolved_components) + # after resolving links, we have to check again for Windows root + return self.replace_windows_root(path) # pytype: disable=bad-return-type def _components_to_path(self, component_folders): - sep = (self.get_path_separator(component_folders[0]) - if component_folders else self.path_separator) + sep = ( + self.get_path_separator(component_folders[0]) + if component_folders + else self.path_separator + ) path = sep.join(component_folders) if not self._starts_with_root_path(path): path = sep + path @@ -1957,9 +1514,10 @@ class FakeFilesystem: # check. It is just a quick hack to prevent us from looping # forever on cycles. if link_depth > _MAX_LINK_DEPTH: - self.raise_os_error(errno.ELOOP, - self._components_to_path( - resolved_components)) + self.raise_os_error( + errno.ELOOP, + self._components_to_path(resolved_components), + ) link_path = self._follow_link(resolved_components, directory) # Following the link might result in the complete replacement @@ -1976,16 +1534,14 @@ class FakeFilesystem: def _valid_relative_path(self, file_path: AnyStr) -> bool: if self.is_windows_fs: return True - slash_dotdot = matching_string( - file_path, self.path_separator + '..') + slash_dotdot = matching_string(file_path, self.path_separator + "..") while file_path and slash_dotdot in file_path: - file_path = file_path[:file_path.rfind(slash_dotdot)] + file_path = file_path[: file_path.rfind(slash_dotdot)] if not self.exists(self.absnormpath(file_path)): return False return True - def _follow_link(self, link_path_components: List[str], - link: AnyFile) -> str: + def _follow_link(self, link_path_components: List[str], link: AnyFile) -> str: """Follow a link w.r.t. a path resolved so far. The component is either a real file, which is a no-op, or a @@ -2011,7 +1567,7 @@ class FakeFilesystem: link_path = link.contents if link_path is not None: # ignore UNC prefix for local files - if self.is_windows_fs and link_path.startswith('\\\\?\\'): + if self.is_windows_fs and link_path.startswith("\\\\?\\"): link_path = link_path[4:] sep = self.get_path_separator(link_path) # For links to absolute paths, we want to throw out everything @@ -2028,12 +1584,15 @@ class FakeFilesystem: link_path = sep.join(components) # Don't call self.NormalizePath(), as we don't want to prepend # self.cwd. - return self.normpath(link_path) + return self.normpath(link_path) # pytype: disable=bad-return-type raise ValueError("Invalid link") - def get_object_from_normpath(self, - file_path: AnyPath, - check_read_perm: bool = True) -> AnyFile: + def get_object_from_normpath( + self, + file_path: AnyPath, + check_read_perm: bool = True, + check_owner: bool = False, + ) -> AnyFile: """Search for the specified filesystem object within the fake filesystem. @@ -2042,6 +1601,9 @@ class FakeFilesystem: path that has already been normalized/resolved. check_read_perm: If True, raises OSError if a parent directory does not have read permission + check_owner: If True, and check_read_perm is also True, + only checks read permission if the current user id is + different from the file object user id Returns: The FakeFile object corresponding to file_path. @@ -2062,22 +1624,34 @@ class FakeFilesystem: for component in path_components: if S_ISLNK(target.st_mode): if target.contents: - target = cast(FakeDirectory, - self.resolve(target.contents)) + target = cast(FakeDirectory, self.resolve(target.contents)) if not S_ISDIR(target.st_mode): if not self.is_windows_fs: self.raise_os_error(errno.ENOTDIR, path) self.raise_os_error(errno.ENOENT, path) target = target.get_entry(component) # type: ignore - if (not is_root() and check_read_perm and target and - not target.st_mode & PERM_READ): + if ( + not is_root() + and check_read_perm + and target + and not self._can_read(target, check_owner) + ): self.raise_os_error(errno.EACCES, target.path) except KeyError: self.raise_os_error(errno.ENOENT, path) return target - def get_object(self, file_path: AnyPath, - check_read_perm: bool = True) -> FakeFile: + @staticmethod + def _can_read(target, owner_can_read): + if target.st_uid == helpers.get_uid(): + if owner_can_read or target.st_mode & 0o400: + return True + if target.st_gid == get_gid(): + if target.st_mode & 0o040: + return True + return target.st_mode & 0o004 + + def get_object(self, file_path: AnyPath, check_read_perm: bool = True) -> FakeFile: """Search for the specified filesystem object within the fake filesystem. @@ -2096,10 +1670,14 @@ class FakeFilesystem: path = self.absnormpath(self._original_path(path)) return self.get_object_from_normpath(path, check_read_perm) - def resolve(self, file_path: AnyStr, - follow_symlinks: bool = True, - allow_fd: bool = False, - check_read_perm: bool = True) -> FakeFile: + def resolve( + self, + file_path: AnyStr, + follow_symlinks: bool = True, + allow_fd: bool = False, + check_read_perm: bool = True, + check_owner: bool = False, + ) -> FakeFile: """Search for the specified filesystem object, resolving all links. Args: @@ -2109,6 +1687,9 @@ class FakeFilesystem: allow_fd: If `True`, `file_path` may be an open file descriptor check_read_perm: If True, raises OSError if a parent directory does not have read permission + check_owner: If True, and check_read_perm is also True, + only checks read permission if the current user id is + different from the file object user id Returns: The FakeFile object corresponding to `file_path`. @@ -2119,12 +1700,14 @@ class FakeFilesystem: if isinstance(file_path, int): if allow_fd: return self.get_open_file(file_path).get_object() - raise TypeError('path should be string, bytes or ' - 'os.PathLike, not int') + raise TypeError("path should be string, bytes or " "os.PathLike, not int") if follow_symlinks: - return self.get_object_from_normpath(self.resolve_path( - file_path, check_read_perm), check_read_perm) + return self.get_object_from_normpath( + self.resolve_path(file_path, allow_fd), + check_read_perm, + check_owner, + ) return self.lresolve(file_path) def lresolve(self, path: AnyPath) -> FakeFile: @@ -2151,7 +1734,7 @@ class FakeFilesystem: # remove trailing separator path_str = self._path_without_trailing_separators(path_str) - if path_str == matching_string(path_str, '.'): + if path_str == matching_string(path_str, "."): path_str = matching_string(path_str, self.cwd) path_str = self._original_path(path_str) @@ -2165,12 +1748,16 @@ class FakeFilesystem: if not self.is_windows_fs and isinstance(parent_obj, FakeFile): self.raise_os_error(errno.ENOTDIR, path_str) self.raise_os_error(errno.ENOENT, path_str) - if not parent_obj.st_mode & PERM_READ: + if not parent_obj.st_mode & helpers.PERM_READ: self.raise_os_error(errno.EACCES, parent_directory) - return (parent_obj.get_entry(to_string(child_name)) if child_name - else parent_obj) + return ( + parent_obj.get_entry(to_string(child_name)) + if child_name + else parent_obj + ) except KeyError: - self.raise_os_error(errno.ENOENT, path_str) + pass + raise OSError(errno.ENOENT, path_str) def add_object(self, file_path: AnyStr, file_object: AnyFile) -> None: """Add a fake file or directory into the filesystem at file_path. @@ -2184,7 +1771,7 @@ class FakeFilesystem: directory. """ if not file_path: - target_directory = self.root + target_directory = self.root_dir else: target_directory = cast(FakeDirectory, self.resolve(file_path)) if not S_ISDIR(target_directory.st_mode): @@ -2192,9 +1779,12 @@ class FakeFilesystem: self.raise_os_error(error, file_path) target_directory.add_entry(file_object) - def rename(self, old_file_path: AnyPath, - new_file_path: AnyPath, - force_replace: bool = False) -> None: + def rename( + self, + old_file_path: AnyPath, + new_file_path: AnyPath, + force_replace: bool = False, + ) -> None: """Renames a FakeFile object at old_file_path to new_file_path, preserving all properties. @@ -2231,13 +1821,12 @@ class FakeFilesystem: old_object = self.lresolve(old_path) if not self.is_windows_fs: - self._handle_posix_dir_link_errors( - new_path, old_path, ends_with_sep) + self._handle_posix_dir_link_errors(new_path, old_path, ends_with_sep) if self.exists(new_path, check_link=True): renamed_path = self._rename_to_existing_path( - force_replace, new_path, old_path, - old_object, ends_with_sep) + force_replace, new_path, old_path, old_object, ends_with_sep + ) if renamed_path is None: return @@ -2254,8 +1843,8 @@ class FakeFilesystem: self.raise_os_error(errno.EXDEV, old_path) if not S_ISDIR(new_dir_object.st_mode): self.raise_os_error( - errno.EACCES if self.is_windows_fs else errno.ENOTDIR, - new_path) + errno.EACCES if self.is_windows_fs else errno.ENOTDIR, new_path + ) if new_dir_object.has_parent_object(old_object): self.raise_os_error(errno.EINVAL, new_path) @@ -2266,8 +1855,11 @@ class FakeFilesystem: old_dir_object.remove_entry(old_name, recursive=False) object_to_rename.name = new_name new_name = new_dir_object._normalized_entryname(new_name) - old_entry = (new_dir_object.get_entry(new_name) - if new_name in new_dir_object.entries else None) + old_entry = ( + new_dir_object.get_entry(new_name) + if new_name in new_dir_object.entries + else None + ) try: if old_entry: # in case of overwriting remove the old entry first @@ -2285,31 +1877,45 @@ class FakeFilesystem: # note that the check for trailing sep has to be done earlier if self.islink(path): if not self.exists(path): - error = (errno.ENOENT if self.is_macos else - errno.EINVAL if self.is_windows_fs else errno.ENOTDIR) + error = ( + errno.ENOENT + if self.is_macos + else errno.EINVAL + if self.is_windows_fs + else errno.ENOTDIR + ) self.raise_os_error(error, path) - def _handle_posix_dir_link_errors(self, new_file_path: AnyStr, - old_file_path: AnyStr, - ends_with_sep: bool) -> None: - if (self.isdir(old_file_path, follow_symlinks=False) and - self.islink(new_file_path)): + def _handle_posix_dir_link_errors( + self, new_file_path: AnyStr, old_file_path: AnyStr, ends_with_sep: bool + ) -> None: + if self.isdir(old_file_path, follow_symlinks=False) and self.islink( + new_file_path + ): self.raise_os_error(errno.ENOTDIR, new_file_path) - if (self.isdir(new_file_path, follow_symlinks=False) and - self.islink(old_file_path)): + if self.isdir(new_file_path, follow_symlinks=False) and self.islink( + old_file_path + ): if ends_with_sep and self.is_macos: return error = errno.ENOTDIR if ends_with_sep else errno.EISDIR self.raise_os_error(error, new_file_path) - if (ends_with_sep and self.islink(old_file_path) and - old_file_path == new_file_path and not self.is_windows_fs): + if ( + ends_with_sep + and self.islink(old_file_path) + and old_file_path == new_file_path + and not self.is_windows_fs + ): self.raise_os_error(errno.ENOTDIR, new_file_path) - def _rename_to_existing_path(self, force_replace: bool, - new_file_path: AnyStr, - old_file_path: AnyStr, - old_object: FakeFile, - ends_with_sep: bool) -> Optional[AnyStr]: + def _rename_to_existing_path( + self, + force_replace: bool, + new_file_path: AnyStr, + old_file_path: AnyStr, + old_object: FakeFile, + ends_with_sep: bool, + ) -> Optional[AnyStr]: new_object = self.get_object(new_file_path) if old_file_path == new_file_path: if not S_ISLNK(new_object.st_mode) and ends_with_sep: @@ -2321,8 +1927,12 @@ class FakeFilesystem: return self._rename_same_object(new_file_path, old_file_path) if S_ISDIR(new_object.st_mode) or S_ISLNK(new_object.st_mode): self._handle_rename_error_for_dir_or_link( - force_replace, new_file_path, - new_object, old_object, ends_with_sep) + force_replace, + new_file_path, + new_object, + old_object, + ends_with_sep, + ) elif S_ISDIR(old_object.st_mode): error = errno.EEXIST if self.is_windows_fs else errno.ENOTDIR self.raise_os_error(error, new_file_path) @@ -2332,11 +1942,14 @@ class FakeFilesystem: self.remove_object(new_file_path) return new_file_path - def _handle_rename_error_for_dir_or_link(self, force_replace: bool, - new_file_path: AnyStr, - new_object: FakeFile, - old_object: FakeFile, - ends_with_sep: bool) -> None: + def _handle_rename_error_for_dir_or_link( + self, + force_replace: bool, + new_file_path: AnyStr, + new_object: FakeFile, + old_object: FakeFile, + ends_with_sep: bool, + ) -> None: if self.is_windows_fs: if force_replace: self.raise_os_error(errno.EACCES, new_file_path) @@ -2344,37 +1957,41 @@ class FakeFilesystem: self.raise_os_error(errno.EEXIST, new_file_path) if not S_ISLNK(new_object.st_mode): if new_object.entries: - if (not S_ISLNK(old_object.st_mode) or - not ends_with_sep or not self.is_macos): + if ( + not S_ISLNK(old_object.st_mode) + or not ends_with_sep + or not self.is_macos + ): self.raise_os_error(errno.ENOTEMPTY, new_file_path) if S_ISREG(old_object.st_mode): self.raise_os_error(errno.EISDIR, new_file_path) - def _rename_same_object(self, new_file_path: AnyStr, - old_file_path: AnyStr) -> Optional[AnyStr]: + def _rename_same_object( + self, new_file_path: AnyStr, old_file_path: AnyStr + ) -> Optional[AnyStr]: do_rename = old_file_path.lower() == new_file_path.lower() if not do_rename: try: real_old_path = self.resolve_path(old_file_path) original_old_path = self._original_path(real_old_path) real_new_path = self.resolve_path(new_file_path) - if (real_new_path == original_old_path and - (new_file_path == real_old_path) == - (new_file_path.lower() == - real_old_path.lower())): - real_object = self.resolve(old_file_path, - follow_symlinks=False) - do_rename = (os.path.basename(old_file_path) == - real_object.name or not self.is_macos) + if real_new_path == original_old_path and ( + new_file_path == real_old_path + ) == (new_file_path.lower() == real_old_path.lower()): + real_object = self.resolve(old_file_path, follow_symlinks=False) + do_rename = ( + os.path.basename(old_file_path) == real_object.name + or not self.is_macos + ) else: - do_rename = (real_new_path.lower() == - real_old_path.lower()) + do_rename = real_new_path.lower() == real_old_path.lower() if do_rename: # only case is changed in case-insensitive file # system - do the rename parent, file_name = self.splitpath(new_file_path) new_file_path = self.joinpaths( - self._original_path(parent), file_name) + self._original_path(parent), file_name + ) except OSError: # ResolvePath may fail due to symlink loop issues or # similar - in this case just assume the paths @@ -2411,11 +2028,12 @@ class FakeFilesystem: def make_string_path(self, path: AnyPath) -> AnyStr: path_str = make_string_path(path) os_sep = matching_string(path_str, os.sep) - fake_sep = matching_string(path_str, self.path_separator) + fake_sep = self.get_path_separator(path_str) return path_str.replace(os_sep, fake_sep) # type: ignore[return-value] - def create_dir(self, directory_path: AnyPath, - perm_bits: int = PERM_DEF) -> FakeDirectory: + def create_dir( + self, directory_path: AnyPath, perm_bits: int = helpers.PERM_DEF + ) -> FakeDirectory: """Create `directory_path`, and all the parent directories. Helper method to set up your test faster. @@ -2433,19 +2051,19 @@ class FakeFilesystem: dir_path = self.make_string_path(directory_path) dir_path = self.absnormpath(dir_path) self._auto_mount_drive_if_needed(dir_path) - if (self.exists(dir_path, check_link=True) and - dir_path not in self.mount_points): + if self.exists(dir_path, check_link=True) and dir_path not in self.mount_points: self.raise_os_error(errno.EEXIST, dir_path) path_components = self._path_components(dir_path) current_dir = self.root new_dirs = [] for component in [to_string(p) for p in path_components]: - directory = self._directory_content( - current_dir, to_string(component))[1] + directory = self._directory_content(current_dir, to_string(component))[1] if not directory: new_dir = FakeDirectory(component, filesystem=self) new_dirs.append(new_dir) + if self.is_windows_fs and current_dir == self.root: + current_dir = self.root_dir current_dir.add_entry(new_dir) current_dir = new_dir else: @@ -2464,15 +2082,18 @@ class FakeFilesystem: return current_dir - def create_file(self, file_path: AnyPath, - st_mode: int = S_IFREG | PERM_DEF_FILE, - contents: AnyString = '', - st_size: Optional[int] = None, - create_missing_dirs: bool = True, - apply_umask: bool = False, - encoding: Optional[str] = None, - errors: Optional[str] = None, - side_effect: Optional[Callable] = None) -> FakeFile: + def create_file( + self, + file_path: AnyPath, + st_mode: int = S_IFREG | helpers.PERM_DEF_FILE, + contents: AnyString = "", + st_size: Optional[int] = None, + create_missing_dirs: bool = True, + apply_umask: bool = False, + encoding: Optional[str] = None, + errors: Optional[str] = None, + side_effect: Optional[Callable] = None, + ) -> FakeFile: """Create `file_path`, including all the parent directories along the way. @@ -2503,12 +2124,23 @@ class FakeFilesystem: OSError: if the containing directory is required and missing. """ return self.create_file_internally( - file_path, st_mode, contents, st_size, create_missing_dirs, - apply_umask, encoding, errors, side_effect=side_effect) + file_path, + st_mode, + contents, + st_size, + create_missing_dirs, + apply_umask, + encoding, + errors, + side_effect=side_effect, + ) - def add_real_file(self, source_path: AnyPath, - read_only: bool = True, - target_path: Optional[AnyPath] = None) -> FakeFile: + def add_real_file( + self, + source_path: AnyPath, + read_only: bool = True, + target_path: Optional[AnyPath] = None, + ) -> FakeFile: """Create `file_path`, including all the parent directories along the way, for an existing real file. The contents of the real file are read only on demand. @@ -2536,20 +2168,19 @@ class FakeFilesystem: target_path = target_path or source_path source_path_str = make_string_path(source_path) real_stat = os.stat(source_path_str) - fake_file = self.create_file_internally(target_path, - read_from_real_fs=True) + fake_file = self.create_file_internally(target_path, read_from_real_fs=True) # for read-only mode, remove the write/executable permission bits fake_file.stat_result.set_from_stat_result(real_stat) if read_only: fake_file.st_mode &= 0o777444 fake_file.file_path = source_path_str - self.change_disk_usage(fake_file.size, fake_file.name, - fake_file.st_dev) + self.change_disk_usage(fake_file.size, fake_file.name, fake_file.st_dev) return fake_file - def add_real_symlink(self, source_path: AnyPath, - target_path: Optional[AnyPath] = None) -> FakeFile: + def add_real_symlink( + self, source_path: AnyPath, target_path: Optional[AnyPath] = None + ) -> FakeFile: """Create a symlink at source_path (or target_path, if given). It will point to the same path as the symlink on the real filesystem. Relative symlinks will point relative to their new location. Absolute symlinks @@ -2570,10 +2201,8 @@ class FakeFilesystem: OSError: if the directory already exists in the fake file system. """ source_path_str = make_string_path(source_path) # TODO: add test - source_path_str = self._path_without_trailing_separators( - source_path_str) - if (not os.path.exists(source_path_str) and - not os.path.islink(source_path_str)): + source_path_str = self._path_without_trailing_separators(source_path_str) + if not os.path.exists(source_path_str) and not os.path.islink(source_path_str): self.raise_os_error(errno.ENOENT, source_path_str) target = os.readlink(source_path_str) @@ -2584,10 +2213,12 @@ class FakeFilesystem: return self.create_symlink(source_path_str, target) def add_real_directory( - self, source_path: AnyPath, - read_only: bool = True, - lazy_read: bool = True, - target_path: Optional[AnyPath] = None) -> FakeDirectory: + self, + source_path: AnyPath, + read_only: bool = True, + lazy_read: bool = True, + target_path: Optional[AnyPath] = None, + ) -> FakeDirectory: """Create a fake directory corresponding to the real directory at the specified path. Add entries in the fake directory corresponding to the entries in the real directory. Symlinks are supported. @@ -2616,26 +2247,29 @@ class FakeFilesystem: OSError: if the directory already exists in the fake file system. """ source_path_str = make_string_path(source_path) # TODO: add test - source_path_str = self._path_without_trailing_separators( - source_path_str) + source_path_str = self._path_without_trailing_separators(source_path_str) if not os.path.exists(source_path_str): self.raise_os_error(errno.ENOENT, source_path_str) - target_path = target_path or source_path_str + target_path_str = make_string_path(target_path or source_path_str) + self._auto_mount_drive_if_needed(target_path_str) new_dir: FakeDirectory if lazy_read: - parent_path = os.path.split(target_path)[0] + parent_path = os.path.split(target_path_str)[0] if self.exists(parent_path): parent_dir = self.get_object(parent_path) else: parent_dir = self.create_dir(parent_path) new_dir = FakeDirectoryFromRealDirectory( - source_path_str, self, read_only, target_path) + source_path_str, self, read_only, target_path_str + ) parent_dir.add_entry(new_dir) else: - new_dir = self.create_dir(target_path) + new_dir = self.create_dir(target_path_str) for base, _, files in os.walk(source_path_str): - new_base = os.path.join(new_dir.path, # type: ignore[arg-type] - os.path.relpath(base, source_path_str)) + new_base = os.path.join( + new_dir.path, # type: ignore[arg-type] + os.path.relpath(base, source_path_str), + ) for fileEntry in os.listdir(base): abs_fileEntry = os.path.join(base, fileEntry) @@ -2643,19 +2277,23 @@ class FakeFilesystem: continue self.add_real_symlink( - abs_fileEntry, os.path.join(new_base, fileEntry)) + abs_fileEntry, os.path.join(new_base, fileEntry) + ) for fileEntry in files: path = os.path.join(base, fileEntry) if os.path.islink(path): continue - self.add_real_file(path, - read_only, - os.path.join(new_base, fileEntry)) + self.add_real_file( + path, read_only, os.path.join(new_base, fileEntry) + ) return new_dir - def add_real_paths(self, path_list: List[AnyStr], - read_only: bool = True, - lazy_dir_read: bool = True) -> None: + def add_real_paths( + self, + path_list: List[AnyStr], + read_only: bool = True, + lazy_dir_read: bool = True, + ) -> None: """This convenience method adds multiple files and/or directories from the real file system to the fake file system. See `add_real_file()` and `add_real_directory()`. @@ -2683,16 +2321,18 @@ class FakeFilesystem: self.add_real_file(path, read_only) def create_file_internally( - self, file_path: AnyPath, - st_mode: int = S_IFREG | PERM_DEF_FILE, - contents: AnyString = '', - st_size: Optional[int] = None, - create_missing_dirs: bool = True, - apply_umask: bool = False, - encoding: Optional[str] = None, - errors: Optional[str] = None, - read_from_real_fs: bool = False, - side_effect: Optional[Callable] = None) -> FakeFile: + self, + file_path: AnyPath, + st_mode: int = S_IFREG | helpers.PERM_DEF_FILE, + contents: AnyString = "", + st_size: Optional[int] = None, + create_missing_dirs: bool = True, + apply_umask: bool = False, + encoding: Optional[str] = None, + errors: Optional[str] = None, + read_from_real_fs: bool = False, + side_effect: Optional[Callable] = None, + ) -> FakeFile: """Internal fake file creator that supports both normal fake files and fake files based on real files. @@ -2719,7 +2359,8 @@ class FakeFilesystem: path = self.absnormpath(path) if not is_int_type(st_mode): raise TypeError( - 'st_mode must be of int type - did you mean to set contents?') + "st_mode must be of int type - did you mean to set contents?" + ) if self.exists(path, check_link=True): self.raise_os_error(errno.EEXIST, path) @@ -2730,27 +2371,33 @@ class FakeFilesystem: if not self.exists(parent_directory): if not create_missing_dirs: self.raise_os_error(errno.ENOENT, parent_directory) - self.create_dir(parent_directory) + parent_directory = matching_string( + path, self.create_dir(parent_directory).path # type: ignore + ) else: parent_directory = self._original_path(parent_directory) if apply_umask: st_mode &= ~self.umask file_object: FakeFile if read_from_real_fs: - file_object = FakeFileFromRealFile(to_string(path), - filesystem=self, - side_effect=side_effect) + file_object = FakeFileFromRealFile( + to_string(path), filesystem=self, side_effect=side_effect + ) else: - file_object = FakeFile(new_file, st_mode, filesystem=self, - encoding=encoding, errors=errors, - side_effect=side_effect) + file_object = FakeFile( + new_file, + st_mode, + filesystem=self, + encoding=encoding, + errors=errors, + side_effect=side_effect, + ) self.add_object(parent_directory, file_object) if st_size is None and contents is None: - contents = '' - if (not read_from_real_fs and - (contents is not None or st_size is not None)): + contents = "" + if not read_from_real_fs and (contents is not None or st_size is not None): try: if st_size is not None: file_object.set_large_file_size(st_size) @@ -2762,9 +2409,12 @@ class FakeFilesystem: return file_object - def create_symlink(self, file_path: AnyPath, - link_target: AnyPath, - create_missing_dirs: bool = True) -> FakeFile: + def create_symlink( + self, + file_path: AnyPath, + link_target: AnyPath, + create_missing_dirs: bool = True, + ) -> FakeFile: """Create the specified symlink, pointed at the specified link target. Args: @@ -2794,8 +2444,9 @@ class FakeFilesystem: if self.is_windows_fs: self.raise_os_error(errno.EINVAL, link_target_path) if not self.exists( - self._path_without_trailing_separators(link_path), - check_link=True): + self._path_without_trailing_separators(link_path), + check_link=True, + ): self.raise_os_error(errno.ENOENT, link_target_path) if self.is_macos: # to avoid EEXIST exception, remove the link @@ -2809,14 +2460,19 @@ class FakeFilesystem: if not self.islink(link_path): link_path = self.resolve_path(link_path) return self.create_file_internally( - link_path, st_mode=S_IFLNK | PERM_DEF, + link_path, + st_mode=S_IFLNK | helpers.PERM_DEF, contents=link_target_path, - create_missing_dirs=create_missing_dirs) + create_missing_dirs=create_missing_dirs, + ) - def create_link(self, old_path: AnyPath, - new_path: AnyPath, - follow_symlinks: bool = True, - create_missing_dirs: bool = True) -> FakeFile: + def create_link( + self, + old_path: AnyPath, + new_path: AnyPath, + follow_symlinks: bool = True, + create_missing_dirs: bool = True, + ) -> FakeFile: """Create a hard link at new_path, pointing at old_path. Args: @@ -2841,8 +2497,7 @@ class FakeFilesystem: if self.exists(new_path_normalized, check_link=True): self.raise_os_error(errno.EEXIST, new_path_str) - new_parent_directory, new_basename = self.splitpath( - new_path_normalized) + new_parent_directory, new_basename = self.splitpath(new_path_normalized) if not new_parent_directory: new_parent_directory = matching_string(new_path_str, self.cwd) @@ -2861,15 +2516,14 @@ class FakeFilesystem: # Retrieve the target file try: - old_file = self.resolve(old_path_str, - follow_symlinks=follow_symlinks) + old_file = self.resolve(old_path_str, follow_symlinks=follow_symlinks) except OSError: self.raise_os_error(errno.ENOENT, old_path_str) if old_file.st_mode & S_IFDIR: self.raise_os_error( - errno.EACCES if self.is_windows_fs - else errno.EPERM, old_path_str + errno.EACCES if self.is_windows_fs else errno.EPERM, + old_path_str, ) # abuse the name field to control the filename of the @@ -2878,9 +2532,12 @@ class FakeFilesystem: self.add_object(new_parent_directory, old_file) return old_file - def link(self, old_path: AnyPath, - new_path: AnyPath, - follow_symlinks: bool = True) -> FakeFile: + def link( + self, + old_path: AnyPath, + new_path: AnyPath, + follow_symlinks: bool = True, + ) -> FakeFile: """Create a hard link at new_path, pointing at old_path. Args: @@ -2897,8 +2554,9 @@ class FakeFilesystem: OSError: if old_path is a directory. OSError: if the parent directory doesn't exist. """ - return self.create_link(old_path, new_path, follow_symlinks, - create_missing_dirs=False) + return self.create_link( + old_path, new_path, follow_symlinks, create_missing_dirs=False + ) def _is_circular_link(self, link_obj: FakeFile) -> bool: try: @@ -2947,7 +2605,7 @@ class FakeFilesystem: assert link_obj.contents return link_obj.contents - def makedir(self, dir_path: AnyPath, mode: int = PERM_DEF) -> None: + def makedir(self, dir_path: AnyPath, mode: int = helpers.PERM_DEF) -> None: """Create a leaf Fake directory. Args: @@ -2964,14 +2622,14 @@ class FakeFilesystem: ends_with_sep = self.ends_with_path_separator(dir_name) dir_name = self._path_without_trailing_separators(dir_name) if not dir_name: - self.raise_os_error(errno.ENOENT, '') + self.raise_os_error(errno.ENOENT, "") if self.is_windows_fs: dir_name = self.absnormpath(dir_name) parent_dir, _ = self.splitpath(dir_name) if parent_dir: base_dir = self.normpath(parent_dir) - ellipsis = matching_string(parent_dir, self.path_separator + '..') + ellipsis = matching_string(parent_dir, self.path_separator + "..") if parent_dir.endswith(ellipsis) and not self.is_windows_fs: base_dir, dummy_dotdot, _ = parent_dir.partition(ellipsis) if not self.exists(base_dir): @@ -2979,7 +2637,7 @@ class FakeFilesystem: dir_name = self.absnormpath(dir_name) if self.exists(dir_name, check_link=True): - if self.is_windows_fs and dir_name == self.path_separator: + if self.is_windows_fs and dir_name == self.root_dir_name: error_nr = errno.EACCES else: error_nr = errno.EEXIST @@ -2992,16 +2650,17 @@ class FakeFilesystem: self.add_object( to_string(head), - FakeDirectory(to_string(tail), mode & ~self.umask, - filesystem=self)) + FakeDirectory(to_string(tail), mode & ~self.umask, filesystem=self), + ) def _path_without_trailing_separators(self, path: AnyStr) -> AnyStr: while self.ends_with_path_separator(path): path = path[:-1] return path - def makedirs(self, dir_name: AnyStr, mode: int = PERM_DEF, - exist_ok: bool = False) -> None: + def makedirs( + self, dir_name: AnyStr, mode: int = helpers.PERM_DEF, exist_ok: bool = False + ) -> None: """Create a leaf Fake directory and create any non-existent parent dirs. @@ -3018,12 +2677,15 @@ class FakeFilesystem: or as per :py:meth:`create_dir`. """ if not dir_name: - self.raise_os_error(errno.ENOENT, '') + self.raise_os_error(errno.ENOENT, "") ends_with_sep = self.ends_with_path_separator(dir_name) dir_name = self.absnormpath(dir_name) - if (ends_with_sep and self.is_macos and - self.exists(dir_name, check_link=True) and - not self.exists(dir_name)): + if ( + ends_with_sep + and self.is_macos + and self.exists(dir_name, check_link=True) + and not self.exists(dir_name) + ): # to avoid EEXIST exception, remove the link self.remove_object(dir_name) @@ -3032,29 +2694,33 @@ class FakeFilesystem: # Raise a permission denied error if the first existing directory # is not writeable. - current_dir = self.root + current_dir = self.root_dir for component in path_components: - if (not hasattr(current_dir, "entries") or - component not in current_dir.entries): + if ( + not hasattr(current_dir, "entries") + or component not in current_dir.entries + ): break else: - current_dir = cast(FakeDirectory, - current_dir.entries[component]) + current_dir = cast(FakeDirectory, current_dir.entries[component]) try: self.create_dir(dir_name, mode & ~self.umask) except OSError as e: if e.errno == errno.EACCES: # permission denied - propagate exception raise - if (not exist_ok or - not isinstance(self.resolve(dir_name), FakeDirectory)): + if not exist_ok or not isinstance(self.resolve(dir_name), FakeDirectory): if self.is_windows_fs and e.errno == errno.ENOTDIR: e.errno = errno.ENOENT self.raise_os_error(e.errno, e.filename) - def _is_of_type(self, path: AnyPath, st_flag: int, - follow_symlinks: bool = True, - check_read_perm: bool = True) -> bool: + def _is_of_type( + self, + path: AnyPath, + st_flag: int, + follow_symlinks: bool = True, + check_read_perm: bool = True, + ) -> bool: """Helper function to implement isdir(), islink(), etc. See the stat(2) man page for valid stat.S_I* flag values @@ -3075,11 +2741,13 @@ class FakeFilesystem: raise TypeError file_path = make_string_path(path) try: - obj = self.resolve(file_path, follow_symlinks, - check_read_perm=check_read_perm) + obj = self.resolve( + file_path, follow_symlinks, check_read_perm=check_read_perm + ) if obj: self.raise_for_filepath_ending_with_separator( - file_path, obj, macos_handling=not follow_symlinks) + file_path, obj, macos_handling=not follow_symlinks + ) return S_IFMT(obj.st_mode) == st_flag except OSError: return False @@ -3111,8 +2779,7 @@ class FakeFilesystem: Raises: TypeError: if path is None. """ - return self._is_of_type(path, S_IFREG, follow_symlinks, - check_read_perm=False) + return self._is_of_type(path, S_IFREG, follow_symlinks, check_read_perm=False) def islink(self, path: AnyPath) -> bool: """Determine if path identifies a symbolic link. @@ -3128,13 +2795,23 @@ class FakeFilesystem: """ return self._is_of_type(path, S_IFLNK, follow_symlinks=False) - def confirmdir(self, target_directory: AnyStr) -> FakeDirectory: + if sys.version_info >= (3, 12): + + def isjunction(self, path: AnyPath) -> bool: + """Returns False. Junctions are never faked.""" + return False + + def confirmdir( + self, target_directory: AnyStr, check_owner: bool = False + ) -> FakeDirectory: """Test that the target is actually a directory, raising OSError if not. Args: target_directory: Path to the target directory within the fake filesystem. + check_owner: If True, only checks read permission if the current + user id is different from the file object user id Returns: The FakeDirectory object corresponding to target_directory. @@ -3142,7 +2819,10 @@ class FakeFilesystem: Raises: OSError: if the target is not a directory. """ - directory = cast(FakeDirectory, self.resolve(target_directory)) + directory = cast( + FakeDirectory, + self.resolve(target_directory, check_owner=check_owner), + ) if not directory.st_mode & S_IFDIR: self.raise_os_error(errno.ENOTDIR, target_directory, 267) return directory @@ -3175,7 +2855,7 @@ class FakeFilesystem: error = errno.EISDIR self.raise_os_error(error, norm_path) - if path.endswith(matching_string(path, self.path_separator)): + if path.endswith(self.get_path_separator(path)): if self.is_windows_fs: error = errno.EACCES elif self.is_macos: @@ -3188,8 +2868,7 @@ class FakeFilesystem: self.remove_object(norm_path) - def rmdir(self, target_directory: AnyStr, - allow_symlink: bool = False) -> None: + def rmdir(self, target_directory: AnyStr, allow_symlink: bool = False) -> None: """Remove a leaf Fake directory. Args: @@ -3203,19 +2882,19 @@ class FakeFilesystem: OSError: if removal failed per FakeFilesystem.RemoveObject. Cannot remove '.'. """ - if target_directory == matching_string(target_directory, '.'): + if target_directory == matching_string(target_directory, "."): error_nr = errno.EACCES if self.is_windows_fs else errno.EINVAL self.raise_os_error(error_nr, target_directory) ends_with_sep = self.ends_with_path_separator(target_directory) target_directory = self.absnormpath(target_directory) - if self.confirmdir(target_directory): + if self.confirmdir(target_directory, check_owner=True): if not self.is_windows_fs and self.islink(target_directory): if allow_symlink: return if not ends_with_sep or not self.is_macos: self.raise_os_error(errno.ENOTDIR, target_directory) - dir_object = self.resolve(target_directory) + dir_object = self.resolve(target_directory, check_owner=True) if dir_object.entries: self.raise_os_error(errno.ENOTEMPTY, target_directory) self.remove_object(target_directory) @@ -3243,2501 +2922,42 @@ class FakeFilesystem: return directory_contents # type: ignore[return-value] def __str__(self) -> str: - return str(self.root) + return str(self.root_dir) def _add_standard_streams(self) -> None: self._add_open_file(StandardStreamWrapper(sys.stdin)) self._add_open_file(StandardStreamWrapper(sys.stdout)) self._add_open_file(StandardStreamWrapper(sys.stderr)) - -Deprecator.add(FakeFilesystem, FakeFilesystem.get_disk_usage, 'GetDiskUsage') -Deprecator.add(FakeFilesystem, FakeFilesystem.set_disk_usage, 'SetDiskUsage') -Deprecator.add(FakeFilesystem, - FakeFilesystem.change_disk_usage, 'ChangeDiskUsage') -Deprecator.add(FakeFilesystem, FakeFilesystem.add_mount_point, 'AddMountPoint') -Deprecator.add(FakeFilesystem, FakeFilesystem.stat, 'GetStat') -Deprecator.add(FakeFilesystem, FakeFilesystem.chmod, 'ChangeMode') -Deprecator.add(FakeFilesystem, FakeFilesystem.utime, 'UpdateTime') -Deprecator.add(FakeFilesystem, FakeFilesystem._add_open_file, 'AddOpenFile') -Deprecator.add(FakeFilesystem, - FakeFilesystem._close_open_file, 'CloseOpenFile') -Deprecator.add(FakeFilesystem, FakeFilesystem.has_open_file, 'HasOpenFile') -Deprecator.add(FakeFilesystem, FakeFilesystem.get_open_file, 'GetOpenFile') -Deprecator.add(FakeFilesystem, - FakeFilesystem.normcase, 'NormalizePathSeparator') -Deprecator.add(FakeFilesystem, FakeFilesystem.normpath, 'CollapsePath') -Deprecator.add(FakeFilesystem, FakeFilesystem._original_path, 'NormalizeCase') -Deprecator.add(FakeFilesystem, FakeFilesystem.absnormpath, 'NormalizePath') -Deprecator.add(FakeFilesystem, FakeFilesystem.splitpath, 'SplitPath') -Deprecator.add(FakeFilesystem, FakeFilesystem.splitdrive, 'SplitDrive') -Deprecator.add(FakeFilesystem, FakeFilesystem.joinpaths, 'JoinPaths') -Deprecator.add(FakeFilesystem, - FakeFilesystem._path_components, 'GetPathComponents') -Deprecator.add(FakeFilesystem, FakeFilesystem._starts_with_drive_letter, - 'StartsWithDriveLetter') -Deprecator.add(FakeFilesystem, FakeFilesystem.exists, 'Exists') -Deprecator.add(FakeFilesystem, FakeFilesystem.resolve_path, 'ResolvePath') -Deprecator.add(FakeFilesystem, FakeFilesystem.get_object_from_normpath, - 'GetObjectFromNormalizedPath') -Deprecator.add(FakeFilesystem, FakeFilesystem.get_object, 'GetObject') -Deprecator.add(FakeFilesystem, FakeFilesystem.resolve, 'ResolveObject') -Deprecator.add(FakeFilesystem, FakeFilesystem.lresolve, 'LResolveObject') -Deprecator.add(FakeFilesystem, FakeFilesystem.add_object, 'AddObject') -Deprecator.add(FakeFilesystem, FakeFilesystem.remove_object, 'RemoveObject') -Deprecator.add(FakeFilesystem, FakeFilesystem.rename, 'RenameObject') -Deprecator.add(FakeFilesystem, FakeFilesystem.create_dir, 'CreateDirectory') -Deprecator.add(FakeFilesystem, FakeFilesystem.create_file, 'CreateFile') -Deprecator.add(FakeFilesystem, FakeFilesystem.create_symlink, 'CreateLink') -Deprecator.add(FakeFilesystem, FakeFilesystem.link, 'CreateHardLink') -Deprecator.add(FakeFilesystem, FakeFilesystem.readlink, 'ReadLink') -Deprecator.add(FakeFilesystem, FakeFilesystem.makedir, 'MakeDirectory') -Deprecator.add(FakeFilesystem, FakeFilesystem.makedirs, 'MakeDirectories') -Deprecator.add(FakeFilesystem, FakeFilesystem.isdir, 'IsDir') -Deprecator.add(FakeFilesystem, FakeFilesystem.isfile, 'IsFile') -Deprecator.add(FakeFilesystem, FakeFilesystem.islink, 'IsLink') -Deprecator.add(FakeFilesystem, FakeFilesystem.confirmdir, 'ConfirmDir') -Deprecator.add(FakeFilesystem, FakeFilesystem.remove, 'RemoveFile') -Deprecator.add(FakeFilesystem, FakeFilesystem.rmdir, 'RemoveDirectory') -Deprecator.add(FakeFilesystem, FakeFilesystem.listdir, 'ListDir') - - -class FakePathModule: - """Faked os.path module replacement. - - FakePathModule should *only* be instantiated by FakeOsModule. See the - FakeOsModule docstring for details. - """ - _OS_PATH_COPY: Any = _copy_module(os.path) - - devnull: ClassVar[str] = '' - sep: ClassVar[str] = '' - altsep: ClassVar[Optional[str]] = None - linesep: ClassVar[str] = '' - pathsep: ClassVar[str] = '' - - @staticmethod - def dir() -> List[str]: - """Return the list of patched function names. Used for patching - functions imported from the module. - """ - return [ - 'abspath', 'dirname', 'exists', 'expanduser', 'getatime', - 'getctime', 'getmtime', 'getsize', 'isabs', 'isdir', 'isfile', - 'islink', 'ismount', 'join', 'lexists', 'normcase', 'normpath', - 'realpath', 'relpath', 'split', 'splitdrive', 'samefile' - ] - - def __init__(self, filesystem: FakeFilesystem, os_module: 'FakeOsModule'): - """Init. - - Args: - filesystem: FakeFilesystem used to provide file system information - """ - self.filesystem = filesystem - self._os_path = self._OS_PATH_COPY - self._os_path.os = self.os = os_module # type: ignore[attr-defined] - self.reset(filesystem) - - @classmethod - def reset(cls, filesystem: FakeFilesystem) -> None: - cls.sep = filesystem.path_separator - cls.altsep = filesystem.alternative_path_separator - cls.linesep = filesystem.line_separator() - cls.devnull = 'nul' if filesystem.is_windows_fs else '/dev/null' - cls.pathsep = ';' if filesystem.is_windows_fs else ':' - - def exists(self, path: AnyStr) -> bool: - """Determine whether the file object exists within the fake filesystem. - - Args: - path: The path to the file object. - - Returns: - (bool) `True` if the file exists. - """ - return self.filesystem.exists(path) - - def lexists(self, path: AnyStr) -> bool: - """Test whether a path exists. Returns True for broken symbolic links. - - Args: - path: path to the symlink object. - - Returns: - bool (if file exists). - """ - return self.filesystem.exists(path, check_link=True) - - def getsize(self, path: AnyStr): - """Return the file object size in bytes. - - Args: - path: path to the file object. - - Returns: - file size in bytes. - """ - file_obj = self.filesystem.resolve(path) - if (self.filesystem.ends_with_path_separator(path) and - S_IFMT(file_obj.st_mode) != S_IFDIR): - error_nr = (errno.EINVAL if self.filesystem.is_windows_fs - else errno.ENOTDIR) - self.filesystem.raise_os_error(error_nr, path) - return file_obj.st_size - - def isabs(self, path: AnyStr) -> bool: - """Return True if path is an absolute pathname.""" - if self.filesystem.is_windows_fs: - path = self.splitdrive(path)[1] - path = make_string_path(path) - return self.filesystem._starts_with_sep(path) - - def isdir(self, path: AnyStr) -> bool: - """Determine if path identifies a directory.""" - return self.filesystem.isdir(path) - - def isfile(self, path: AnyStr) -> bool: - """Determine if path identifies a regular file.""" - return self.filesystem.isfile(path) - - def islink(self, path: AnyStr) -> bool: - """Determine if path identifies a symbolic link. - - Args: - path: Path to filesystem object. - - Returns: - `True` if path points to a symbolic link. - - Raises: - TypeError: if path is None. - """ - return self.filesystem.islink(path) - - def getmtime(self, path: AnyStr) -> float: - """Returns the modification time of the fake file. - - Args: - path: the path to fake file. - - Returns: - (int, float) the modification time of the fake file - in number of seconds since the epoch. - - Raises: - OSError: if the file does not exist. - """ - try: - file_obj = self.filesystem.resolve(path) - return file_obj.st_mtime - except OSError: - self.filesystem.raise_os_error(errno.ENOENT, winerror=3) - - def getatime(self, path: AnyStr) -> float: - """Returns the last access time of the fake file. - - Note: Access time is not set automatically in fake filesystem - on access. - - Args: - path: the path to fake file. - - Returns: - (int, float) the access time of the fake file in number of seconds - since the epoch. - - Raises: - OSError: if the file does not exist. - """ - try: - file_obj = self.filesystem.resolve(path) - except OSError: - self.filesystem.raise_os_error(errno.ENOENT) - return file_obj.st_atime - - def getctime(self, path: AnyStr) -> float: - """Returns the creation time of the fake file. - - Args: - path: the path to fake file. - - Returns: - (int, float) the creation time of the fake file in number of - seconds since the epoch. - - Raises: - OSError: if the file does not exist. - """ - try: - file_obj = self.filesystem.resolve(path) - except OSError: - self.filesystem.raise_os_error(errno.ENOENT) - return file_obj.st_ctime - - def abspath(self, path: AnyStr) -> AnyStr: - """Return the absolute version of a path.""" - - def getcwd(): - """Return the current working directory.""" - # pylint: disable=undefined-variable - if isinstance(path, bytes): - return self.os.getcwdb() - else: - return self.os.getcwd() - - path = make_string_path(path) - if not self.isabs(path): - path = self.join(getcwd(), path) - elif (self.filesystem.is_windows_fs and - self.filesystem._starts_with_sep(path)): - cwd = getcwd() - if self.filesystem._starts_with_drive_letter(cwd): - path = self.join(cwd[:2], path) - return self.normpath(path) - - def join(self, *p: AnyStr) -> AnyStr: - """Return the completed path with a separator of the parts.""" - return self.filesystem.joinpaths(*p) - - def split(self, path: AnyStr) -> Tuple[AnyStr, AnyStr]: - """Split the path into the directory and the filename of the path. - """ - return self.filesystem.splitpath(path) - - def splitdrive(self, path: AnyStr) -> Tuple[AnyStr, AnyStr]: - """Split the path into the drive part and the rest of the path, if - supported.""" - return self.filesystem.splitdrive(path) - - def normpath(self, path: AnyStr) -> AnyStr: - """Normalize path, eliminating double slashes, etc.""" - return self.filesystem.normpath(path) - - def normcase(self, path: AnyStr) -> AnyStr: - """Convert to lower case under windows, replaces additional path - separator.""" - path = self.filesystem.normcase(path) - if self.filesystem.is_windows_fs: - path = path.lower() - return path - - def relpath(self, path: AnyStr, start: Optional[AnyStr] = None) -> AnyStr: - """We mostly rely on the native implementation and adapt the - path separator.""" - if not path: - raise ValueError("no path specified") - path = make_string_path(path) - if start is not None: - start = make_string_path(start) - else: - start = matching_string(path, self.filesystem.cwd) - system_sep = matching_string(path, self._os_path.sep) - if self.filesystem.alternative_path_separator is not None: - altsep = matching_string( - path, self.filesystem.alternative_path_separator) - path = path.replace(altsep, system_sep) - start = start.replace(altsep, system_sep) - sep = matching_string(path, self.filesystem.path_separator) - path = path.replace(sep, system_sep) - start = start.replace(sep, system_sep) - path = self._os_path.relpath(path, start) - return path.replace(system_sep, sep) - - def realpath(self, filename: AnyStr, strict: bool = None) -> AnyStr: - """Return the canonical path of the specified filename, eliminating any - symbolic links encountered in the path. - """ - if strict is not None and sys.version_info < (3, 10): - raise TypeError("realpath() got an unexpected " - "keyword argument 'strict'") - if strict: - # raises in strict mode if the file does not exist - self.filesystem.resolve(filename) - if self.filesystem.is_windows_fs: - return self.abspath(filename) - filename = make_string_path(filename) - path, ok = self._join_real_path(filename[:0], filename, {}) - path = self.abspath(path) - return path - - def samefile(self, path1: AnyStr, path2: AnyStr) -> bool: - """Return whether path1 and path2 point to the same file. - - Args: - path1: first file path or path object (Python >=3.6) - path2: second file path or path object (Python >=3.6) - - Raises: - OSError: if one of the paths does not point to an existing - file system object. - """ - stat1 = self.filesystem.stat(path1) - stat2 = self.filesystem.stat(path2) - return (stat1.st_ino == stat2.st_ino and - stat1.st_dev == stat2.st_dev) - - @overload - def _join_real_path( - self, path: str, - rest: str, - seen: Dict[str, Optional[str]]) -> Tuple[str, bool]: ... - - @overload - def _join_real_path( - self, path: bytes, - rest: bytes, - seen: Dict[bytes, Optional[bytes]]) -> Tuple[bytes, bool]: ... - - def _join_real_path( - self, path: AnyStr, - rest: AnyStr, - seen: Dict[AnyStr, Optional[AnyStr]]) -> Tuple[AnyStr, bool]: - """Join two paths, normalizing and eliminating any symbolic links - encountered in the second path. - Taken from Python source and adapted. - """ - curdir = matching_string(path, '.') - pardir = matching_string(path, '..') - - sep = self.filesystem.get_path_separator(path) - if self.isabs(rest): - rest = rest[1:] - path = sep - - while rest: - name, _, rest = rest.partition(sep) - if not name or name == curdir: - # current dir - continue - if name == pardir: - # parent dir - if path: - path, name = self.filesystem.splitpath(path) - if name == pardir: - path = self.filesystem.joinpaths(path, pardir, pardir) - else: - path = pardir - continue - newpath = self.filesystem.joinpaths(path, name) - if not self.filesystem.islink(newpath): - path = newpath - continue - # Resolve the symbolic link - if newpath in seen: - # Already seen this path - seen_path = seen[newpath] - if seen_path is not None: - # use cached value - path = seen_path - continue - # The symlink is not resolved, so we must have a symlink loop. - # Return already resolved part + rest of the path unchanged. - return self.filesystem.joinpaths(newpath, rest), False - seen[newpath] = None # not resolved symlink - path, ok = self._join_real_path( - path, matching_string(path, self.filesystem.readlink( - newpath)), seen) - if not ok: - return self.filesystem.joinpaths(path, rest), False - seen[newpath] = path # resolved symlink - return path, True - - def dirname(self, path: AnyStr) -> AnyStr: - """Returns the first part of the result of `split()`.""" - return self.split(path)[0] - - def expanduser(self, path: AnyStr) -> AnyStr: - """Return the argument with an initial component of ~ or ~user - replaced by that user's home directory. - """ - path = self._os_path.expanduser(path) - return path.replace( - matching_string(path, self._os_path.sep), - matching_string(path, self.sep)) - - def ismount(self, path: AnyStr) -> bool: - """Return true if the given path is a mount point. - - Args: - path: Path to filesystem object to be checked - - Returns: - `True` if path is a mount point added to the fake file system. - Under Windows also returns True for drive and UNC roots - (independent of their existence). - """ - if not path: - return False - path_str = to_string(make_string_path(path)) - normed_path = self.filesystem.absnormpath(path_str) - sep = self.filesystem.path_separator - if self.filesystem.is_windows_fs: - path_seps: Union[Tuple[str, Optional[str]], Tuple[str]] - if self.filesystem.alternative_path_separator is not None: - path_seps = ( - sep, self.filesystem.alternative_path_separator - ) - else: - path_seps = (sep,) - drive, rest = self.filesystem.splitdrive(normed_path) - if drive and drive[:1] in path_seps: - return (not rest) or (rest in path_seps) - if rest in path_seps: - return True - for mount_point in self.filesystem.mount_points: - if (to_string(normed_path).rstrip(sep) == - to_string(mount_point).rstrip(sep)): - return True - return False - - def __getattr__(self, name: str) -> Any: - """Forwards any non-faked calls to the real os.path.""" - return getattr(self._os_path, name) - - -class FakeOsModule: - """Uses FakeFilesystem to provide a fake os module replacement. - - Do not create os.path separately from os, as there is a necessary circular - dependency between os and os.path to replicate the behavior of the standard - Python modules. What you want to do is to just let FakeOsModule take care - of `os.path` setup itself. - - # You always want to do this. - filesystem = fake_filesystem.FakeFilesystem() - my_os_module = fake_filesystem.FakeOsModule(filesystem) - """ - - @staticmethod - def dir() -> List[str]: - """Return the list of patched function names. Used for patching - functions imported from the module. - """ - _dir = [ - 'access', 'chdir', 'chmod', 'chown', 'close', 'fstat', 'fsync', - 'getcwd', 'lchmod', 'link', 'listdir', 'lstat', 'makedirs', - 'mkdir', 'mknod', 'open', 'read', 'readlink', 'remove', - 'removedirs', 'rename', 'rmdir', 'stat', 'symlink', 'umask', - 'unlink', 'utime', 'walk', 'write', 'getcwdb', 'replace' - ] - if sys.platform.startswith('linux'): - _dir += [ - 'fdatasync', 'getxattr', 'listxattr', - 'removexattr', 'setxattr' - ] - if use_scandir: - _dir += ['scandir'] - return _dir - - def __init__(self, filesystem: FakeFilesystem): - """Also exposes self.path (to fake os.path). - - Args: - filesystem: FakeFilesystem used to provide file system information - """ - self.filesystem = filesystem - self._os_module: Any = os - self.path = FakePathModule(self.filesystem, self) - - @property - def devnull(self) -> str: - return self.path.devnull - - @property - def sep(self) -> str: - return self.path.sep - - @property - def altsep(self) -> Optional[str]: - return self.path.altsep - - @property - def linesep(self) -> str: - return self.path.linesep - - @property - def pathsep(self) -> str: - return self.path.pathsep - - def fdopen(self, fd: int, *args: Any, **kwargs: Any) -> AnyFileWrapper: - """Redirector to open() builtin function. - - Args: - fd: The file descriptor of the file to open. - *args: Pass through args. - **kwargs: Pass through kwargs. - - Returns: - File object corresponding to file_des. - - Raises: - TypeError: if file descriptor is not an integer. - """ - if not is_int_type(fd): - raise TypeError('an integer is required') - return FakeFileOpen(self.filesystem)(fd, *args, **kwargs) - - def _umask(self) -> int: - """Return the current umask.""" - if self.filesystem.is_windows_fs: - # windows always returns 0 - it has no real notion of umask - return 0 - if sys.platform == 'win32': - # if we are testing Unix under Windows we assume a default mask - return 0o002 - else: - # under Unix, we return the real umask; - # as there is no pure getter for umask, so we have to first - # set a mode to get the previous one and then re-set that - mask = os.umask(0) - os.umask(mask) - return mask - - def open(self, path: AnyStr, flags: int, mode: Optional[int] = None, *, - dir_fd: Optional[int] = None) -> int: - """Return the file descriptor for a FakeFile. - - Args: - path: the path to the file - flags: low-level bits to indicate io operation - mode: bits to define default permissions - Note: only basic modes are supported, OS-specific modes are - ignored - dir_fd: If not `None`, the file descriptor of a directory, - with `file_path` being relative to this directory. - - Returns: - A file descriptor. - - Raises: - OSError: if the path cannot be found - ValueError: if invalid mode is given - NotImplementedError: if `os.O_EXCL` is used without `os.O_CREAT` - """ - path = self._path_with_dir_fd(path, self.open, dir_fd) - if mode is None: - if self.filesystem.is_windows_fs: - mode = 0o666 - else: - mode = 0o777 & ~self._umask() - - has_tmpfile_flag = (hasattr(os, 'O_TMPFILE') and - flags & getattr(os, 'O_TMPFILE')) - open_modes = _OpenModes( - must_exist=not flags & os.O_CREAT and not has_tmpfile_flag, - can_read=not flags & os.O_WRONLY, - can_write=flags & (os.O_RDWR | os.O_WRONLY) != 0, - truncate=flags & os.O_TRUNC != 0, - append=flags & os.O_APPEND != 0, - must_not_exist=flags & os.O_EXCL != 0 - ) - if open_modes.must_not_exist and open_modes.must_exist: - raise NotImplementedError( - 'O_EXCL without O_CREAT mode is not supported') - if has_tmpfile_flag: - # this is a workaround for tempfiles that do not have a filename - # as we do not support this directly, we just add a unique filename - # and set the file to delete on close - path = self.filesystem.joinpaths( - path, matching_string(path, str(uuid.uuid4()))) - - if (not self.filesystem.is_windows_fs and - self.filesystem.exists(path)): - # handle opening directory - only allowed under Posix - # with read-only mode - obj = self.filesystem.resolve(path) - if isinstance(obj, FakeDirectory): - if ((not open_modes.must_exist and - not self.filesystem.is_macos) - or open_modes.can_write): - self.filesystem.raise_os_error(errno.EISDIR, path) - dir_wrapper = FakeDirWrapper(obj, path, self.filesystem) - file_des = self.filesystem._add_open_file(dir_wrapper) - dir_wrapper.filedes = file_des - return file_des - - # low level open is always binary - str_flags = 'b' - delete_on_close = has_tmpfile_flag - if hasattr(os, 'O_TEMPORARY'): - delete_on_close = flags & os.O_TEMPORARY == os.O_TEMPORARY - fake_file = FakeFileOpen( - self.filesystem, delete_on_close=delete_on_close, raw_io=True)( - path, str_flags, open_modes=open_modes) - assert not isinstance(fake_file, StandardStreamWrapper) - if fake_file.file_object != self.filesystem.dev_null: - self.chmod(path, mode) - return fake_file.fileno() - - def close(self, fd: int) -> None: - """Close a file descriptor. - - Args: - fd: An integer file descriptor for the file object requested. - - Raises: - OSError: bad file descriptor. - TypeError: if file descriptor is not an integer. - """ - file_handle = self.filesystem.get_open_file(fd) - file_handle.close() - - def read(self, fd: int, n: int) -> bytes: - """Read number of bytes from a file descriptor, returns bytes read. - - Args: - fd: An integer file descriptor for the file object requested. - n: Number of bytes to read from file. - - Returns: - Bytes read from file. - - Raises: - OSError: bad file descriptor. - TypeError: if file descriptor is not an integer. - """ - file_handle = self.filesystem.get_open_file(fd) - if isinstance(file_handle, FakeFileWrapper): - file_handle.raw_io = True - if isinstance(file_handle, FakeDirWrapper): - self.filesystem.raise_os_error(errno.EBADF, file_handle.file_path) - return file_handle.read(n) - - def write(self, fd: int, contents: bytes) -> int: - """Write string to file descriptor, returns number of bytes written. - - Args: - fd: An integer file descriptor for the file object requested. - contents: String of bytes to write to file. - - Returns: - Number of bytes written. - - Raises: - OSError: bad file descriptor. - TypeError: if file descriptor is not an integer. - """ - file_handle = cast(FakeFileWrapper, self.filesystem.get_open_file(fd)) - if isinstance(file_handle, FakeDirWrapper): - self.filesystem.raise_os_error(errno.EBADF, file_handle.file_path) - - if isinstance(file_handle, FakePipeWrapper): - return file_handle.write(contents) - - file_handle.raw_io = True - file_handle._sync_io() - file_handle.update_flush_pos() - file_handle.write(contents) - file_handle.flush() - return len(contents) - - def pipe(self) -> Tuple[int, int]: - read_fd, write_fd = os.pipe() - read_wrapper = FakePipeWrapper(self.filesystem, read_fd, False) - file_des = self.filesystem._add_open_file(read_wrapper) - read_wrapper.filedes = file_des - write_wrapper = FakePipeWrapper(self.filesystem, write_fd, True) - file_des = self.filesystem._add_open_file(write_wrapper) - write_wrapper.filedes = file_des - return read_wrapper.filedes, write_wrapper.filedes - - @staticmethod - def stat_float_times(newvalue: Optional[bool] = None) -> bool: - """Determine whether a file's time stamps are reported as floats - or ints. - - Calling without arguments returns the current value. The value is - shared by all instances of FakeOsModule. - - Args: - newvalue: If `True`, mtime, ctime, atime are reported as floats. - Otherwise, they are returned as ints (rounding down). - """ - return FakeStatResult.stat_float_times(newvalue) - - def fstat(self, fd: int) -> FakeStatResult: - """Return the os.stat-like tuple for the FakeFile object of file_des. - - Args: - fd: The file descriptor of filesystem object to retrieve. - - Returns: - The FakeStatResult object corresponding to entry_path. - - Raises: - OSError: if the filesystem object doesn't exist. - """ - # stat should return the tuple representing return value of os.stat - file_object = self.filesystem.get_open_file(fd).get_object() - assert isinstance(file_object, FakeFile) - return file_object.stat_result.copy() - - def umask(self, mask: int) -> int: - """Change the current umask. - - Args: - mask: (int) The new umask value. - - Returns: - The old umask. - - Raises: - TypeError: if new_mask is of an invalid type. - """ - if not is_int_type(mask): - raise TypeError('an integer is required') - old_umask = self.filesystem.umask - self.filesystem.umask = mask - return old_umask - - def chdir(self, path: AnyStr) -> None: - """Change current working directory to target directory. - - Args: - path: The path to new current working directory. - - Raises: - OSError: if user lacks permission to enter the argument directory - or if the target is not a directory. - """ - try: - path = self.filesystem.resolve_path( - path, allow_fd=True) - except OSError as exc: - if self.filesystem.is_macos and exc.errno == errno.EBADF: - raise OSError(errno.ENOTDIR, "Not a directory: " + str(path)) - raise - self.filesystem.confirmdir(path) - directory = self.filesystem.resolve(path) - # A full implementation would check permissions all the way - # up the tree. - if not is_root() and not directory.st_mode | PERM_EXE: - self.filesystem.raise_os_error(errno.EACCES, directory.name) - self.filesystem.cwd = path # type: ignore[assignment] - - def getcwd(self) -> str: - """Return current working directory.""" - return to_string(self.filesystem.cwd) - - def getcwdb(self) -> bytes: - """Return current working directory as bytes.""" - return to_bytes(self.filesystem.cwd) - - def listdir(self, path: AnyStr) -> List[AnyStr]: - """Return a list of file names in target_directory. - - Args: - path: Path to the target directory within the fake - filesystem. - - Returns: - A list of file names within the target directory in arbitrary - order. - - Raises: - OSError: if the target is not a directory. - """ - return self.filesystem.listdir(path) - - XATTR_CREATE = 1 - XATTR_REPLACE = 2 - - def getxattr(self, path: AnyStr, attribute: AnyString, *, - follow_symlinks: bool = True) -> Optional[bytes]: - """Return the value of the given extended filesystem attribute for - `path`. - - Args: - path: File path, file descriptor or path-like object (for - Python >= 3.6). - attribute: (str or bytes) The attribute name. - follow_symlinks: (bool) If True (the default), symlinks in the - path are traversed. - - Returns: - The contents of the extended attribute as bytes or None if - the attribute does not exist. - - Raises: - OSError: if the path does not exist. - """ - if not self.filesystem.is_linux: - raise AttributeError( - "module 'os' has no attribute 'getxattr'") - - if isinstance(attribute, bytes): - attribute = attribute.decode(sys.getfilesystemencoding()) - file_obj = self.filesystem.resolve(path, follow_symlinks, - allow_fd=True) - return file_obj.xattr.get(attribute) - - def listxattr(self, path: Optional[AnyStr] = None, *, - follow_symlinks: bool = True) -> List[str]: - """Return a list of the extended filesystem attributes on `path`. - - Args: - path: File path, file descriptor or path-like object (for - Python >= 3.6). If None, the current directory is used. - follow_symlinks: (bool) If True (the default), symlinks in the - path are traversed. - - Returns: - A list of all attribute names for the given path as str. - - Raises: - OSError: if the path does not exist. - """ - if not self.filesystem.is_linux: - raise AttributeError( - "module 'os' has no attribute 'listxattr'") - - path_str = self.filesystem.cwd if path is None else path - file_obj = self.filesystem.resolve( - cast(AnyStr, path_str), follow_symlinks, allow_fd=True) - return list(file_obj.xattr.keys()) - - def removexattr(self, path: AnyStr, attribute: AnyString, *, - follow_symlinks: bool = True) -> None: - """Removes the extended filesystem attribute attribute from `path`. - - Args: - path: File path, file descriptor or path-like object (for - Python >= 3.6). - attribute: (str or bytes) The attribute name. - follow_symlinks: (bool) If True (the default), symlinks in the - path are traversed. - - Raises: - OSError: if the path does not exist. - """ - if not self.filesystem.is_linux: - raise AttributeError( - "module 'os' has no attribute 'removexattr'") - - if isinstance(attribute, bytes): - attribute = attribute.decode(sys.getfilesystemencoding()) - file_obj = self.filesystem.resolve(path, follow_symlinks, - allow_fd=True) - if attribute in file_obj.xattr: - del file_obj.xattr[attribute] - - def setxattr(self, path: AnyStr, attribute: AnyString, value: bytes, - flags: int = 0, *, follow_symlinks: bool = True) -> None: - """Sets the value of the given extended filesystem attribute for - `path`. - - Args: - path: File path, file descriptor or path-like object (for - Python >= 3.6). - attribute: The attribute name (str or bytes). - value: (byte-like) The value to be set. - follow_symlinks: (bool) If True (the default), symlinks in the - path are traversed. - - Raises: - OSError: if the path does not exist. - TypeError: if `value` is not a byte-like object. - """ - if not self.filesystem.is_linux: - raise AttributeError( - "module 'os' has no attribute 'setxattr'") - - if isinstance(attribute, bytes): - attribute = attribute.decode(sys.getfilesystemencoding()) - if not is_byte_string(value): - raise TypeError('a bytes-like object is required') - file_obj = self.filesystem.resolve(path, follow_symlinks, - allow_fd=True) - exists = attribute in file_obj.xattr - if exists and flags == self.XATTR_CREATE: - self.filesystem.raise_os_error(errno.ENODATA, file_obj.path) - if not exists and flags == self.XATTR_REPLACE: - self.filesystem.raise_os_error(errno.EEXIST, file_obj.path) - file_obj.xattr[attribute] = value - - def scandir(self, path: str = '.') -> ScanDirIter: - """Return an iterator of DirEntry objects corresponding to the - entries in the directory given by path. - - Args: - path: Path to the target directory within the fake filesystem. - - Returns: - An iterator to an unsorted list of os.DirEntry objects for - each entry in path. - - Raises: - OSError: if the target is not a directory. - """ - return scandir(self.filesystem, path) - - def walk(self, top: AnyStr, topdown: bool = True, - onerror: Optional[bool] = None, - followlinks: bool = False): - """Perform an os.walk operation over the fake filesystem. - - Args: - top: The root directory from which to begin walk. - topdown: Determines whether to return the tuples with the root as - the first entry (`True`) or as the last, after all the child - directory tuples (`False`). - onerror: If not `None`, function which will be called to handle the - `os.error` instance provided when `os.listdir()` fails. - followlinks: If `True`, symbolic links are followed. - - Yields: - (path, directories, nondirectories) for top and each of its - subdirectories. See the documentation for the builtin os module - for further details. - """ - return walk(self.filesystem, top, topdown, onerror, followlinks) - - def readlink(self, path: AnyStr, dir_fd: Optional[int] = None) -> str: - """Read the target of a symlink. - - Args: - path: Symlink to read the target of. - dir_fd: If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - - Returns: - the string representing the path to which the symbolic link points. - - Raises: - TypeError: if `path` is None - OSError: (with errno=ENOENT) if path is not a valid path, or - (with errno=EINVAL) if path is valid, but is not a symlink - """ - path = self._path_with_dir_fd(path, self.readlink, dir_fd) - return self.filesystem.readlink(path) - - def stat(self, path: AnyStr, *, dir_fd: Optional[int] = None, - follow_symlinks: bool = True) -> FakeStatResult: - """Return the os.stat-like tuple for the FakeFile object of entry_path. - - Args: - path: path to filesystem object to retrieve. - dir_fd: (int) If not `None`, the file descriptor of a directory, - with `entry_path` being relative to this directory. - follow_symlinks: (bool) If `False` and `entry_path` points to a - symlink, the link itself is changed instead of the linked - object. - - Returns: - The FakeStatResult object corresponding to entry_path. - - Raises: - OSError: if the filesystem object doesn't exist. - """ - path = self._path_with_dir_fd(path, self.stat, dir_fd) - return self.filesystem.stat(path, follow_symlinks) - - def lstat(self, path: AnyStr, *, - dir_fd: Optional[int] = None) -> FakeStatResult: - """Return the os.stat-like tuple for entry_path, not following symlinks. - - Args: - path: path to filesystem object to retrieve. - dir_fd: If not `None`, the file descriptor of a directory, with - `path` being relative to this directory. - - Returns: - the FakeStatResult object corresponding to `path`. - - Raises: - OSError: if the filesystem object doesn't exist. - """ - # stat should return the tuple representing return value of os.stat - path = self._path_with_dir_fd(path, self.lstat, dir_fd) - return self.filesystem.stat(path, follow_symlinks=False) - - def remove(self, path: AnyStr, dir_fd: Optional[int] = None) -> None: - """Remove the FakeFile object at the specified file path. - - Args: - path: Path to file to be removed. - dir_fd: If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - - Raises: - OSError: if path points to a directory. - OSError: if path does not exist. - OSError: if removal failed. - """ - path = self._path_with_dir_fd(path, self.remove, dir_fd) - self.filesystem.remove(path) - - def unlink(self, path: AnyStr, *, dir_fd: Optional[int] = None) -> None: - """Remove the FakeFile object at the specified file path. - - Args: - path: Path to file to be removed. - dir_fd: If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - - Raises: - OSError: if path points to a directory. - OSError: if path does not exist. - OSError: if removal failed. - """ - path = self._path_with_dir_fd(path, self.unlink, dir_fd) - self.filesystem.remove(path) - - def rename(self, src: AnyStr, dst: AnyStr, *, - src_dir_fd: Optional[int] = None, - dst_dir_fd: Optional[int] = None) -> None: - """Rename a FakeFile object at old_file_path to new_file_path, - preserving all properties. - Also replaces existing new_file_path object, if one existed - (Unix only). - - Args: - src: Path to filesystem object to rename. - dst: Path to where the filesystem object will live - after this call. - src_dir_fd: If not `None`, the file descriptor of a directory, - with `src` being relative to this directory. - dst_dir_fd: If not `None`, the file descriptor of a directory, - with `dst` being relative to this directory. - - Raises: - OSError: if old_file_path does not exist. - OSError: if new_file_path is an existing directory. - OSError: if new_file_path is an existing file (Windows only) - OSError: if new_file_path is an existing file and could not - be removed (Unix) - OSError: if `dirname(new_file)` does not exist - OSError: if the file would be moved to another filesystem - (e.g. mount point) - """ - src = self._path_with_dir_fd(src, self.rename, src_dir_fd) - dst = self._path_with_dir_fd(dst, self.rename, dst_dir_fd) - self.filesystem.rename(src, dst) - - def replace(self, src: AnyStr, dst: AnyStr, *, - src_dir_fd: Optional[int] = None, - dst_dir_fd: Optional[int] = None) -> None: - """Renames a FakeFile object at old_file_path to new_file_path, - preserving all properties. - Also replaces existing new_file_path object, if one existed. - - Arg - src: Path to filesystem object to rename. - dst: Path to where the filesystem object will live - after this call. - src_dir_fd: If not `None`, the file descriptor of a directory, - with `src` being relative to this directory. - dst_dir_fd: If not `None`, the file descriptor of a directory, - with `dst` being relative to this directory. - - Raises: - OSError: if old_file_path does not exist. - OSError: if new_file_path is an existing directory. - OSError: if new_file_path is an existing file and could - not be removed - OSError: if `dirname(new_file)` does not exist - OSError: if the file would be moved to another filesystem - (e.g. mount point) - """ - src = self._path_with_dir_fd(src, self.rename, src_dir_fd) - dst = self._path_with_dir_fd(dst, self.rename, dst_dir_fd) - self.filesystem.rename(src, dst, force_replace=True) - - def rmdir(self, path: AnyStr, *, dir_fd: Optional[int] = None) -> None: - """Remove a leaf Fake directory. - - Args: - path: (str) Name of directory to remove. - dir_fd: If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - - Raises: - OSError: if `path` does not exist or is not a directory, - or as per FakeFilesystem.remove_object. Cannot remove '.'. - """ - path = self._path_with_dir_fd(path, self.rmdir, dir_fd) - self.filesystem.rmdir(path) - - def removedirs(self, name: AnyStr) -> None: - """Remove a leaf fake directory and all empty intermediate ones. - - Args: - name: the directory to be removed. - - Raises: - OSError: if target_directory does not exist or is not a directory. - OSError: if target_directory is not empty. - """ - name = self.filesystem.absnormpath(name) - directory = self.filesystem.confirmdir(name) - if directory.entries: - self.filesystem.raise_os_error( - errno.ENOTEMPTY, self.path.basename(name)) - else: - self.rmdir(name) - head, tail = self.path.split(name) - if not tail: - head, tail = self.path.split(head) - while head and tail: - head_dir = self.filesystem.confirmdir(head) - if head_dir.entries: - break - # only the top-level dir may not be a symlink - self.filesystem.rmdir(head, allow_symlink=True) - head, tail = self.path.split(head) - - def mkdir(self, path: AnyStr, mode: int = PERM_DEF, *, - dir_fd: Optional[int] = None) -> None: - """Create a leaf Fake directory. - - Args: - path: (str) Name of directory to create. - Relative paths are assumed to be relative to '/'. - mode: (int) Mode to create directory with. This argument defaults - to 0o777. The umask is applied to this mode. - dir_fd: If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - - Raises: - OSError: if the directory name is invalid or parent directory is - read only or as per FakeFilesystem.add_object. - """ - path = self._path_with_dir_fd(path, self.mkdir, dir_fd) - try: - self.filesystem.makedir(path, mode) - except OSError as e: - if e.errno == errno.EACCES: - self.filesystem.raise_os_error(e.errno, path) - raise - - def makedirs(self, name: AnyStr, mode: int = PERM_DEF, - exist_ok: bool = None) -> None: - """Create a leaf Fake directory + create any non-existent parent dirs. - - Args: - name: (str) Name of directory to create. - mode: (int) Mode to create directory (and any necessary parent - directories) with. This argument defaults to 0o777. - The umask is applied to this mode. - exist_ok: (boolean) If exist_ok is False (the default), an OSError - is raised if the target directory already exists. - - Raises: - OSError: if the directory already exists and exist_ok=False, or as - per :py:meth:`FakeFilesystem.create_dir`. - """ - if exist_ok is None: - exist_ok = False - self.filesystem.makedirs(name, mode, exist_ok) - - def _path_with_dir_fd(self, path: AnyStr, fct: Callable, - dir_fd: Optional[int]) -> AnyStr: - """Return the path considering dir_fd. Raise on invalid parameters.""" - try: - path = make_string_path(path) - except TypeError: - # the error is handled later - path = path - if dir_fd is not None: - # check if fd is supported for the built-in real function - real_fct = getattr(os, fct.__name__) - if real_fct not in self.supports_dir_fd: - raise NotImplementedError( - 'dir_fd unavailable on this platform') - if isinstance(path, int): - raise ValueError("%s: Can't specify dir_fd without " - "matching path_str" % fct.__name__) - if not self.path.isabs(path): - open_file = self.filesystem.get_open_file(dir_fd) - return self.path.join( # type: ignore[type-var, return-value] - cast(FakeFile, open_file.get_object()).path, path) - return path - - def truncate(self, path: AnyStr, length: int) -> None: - """Truncate the file corresponding to path, so that it is - length bytes in size. If length is larger than the current size, - the file is filled up with zero bytes. - - Args: - path: (str or int) Path to the file, or an integer file - descriptor for the file object. - length: (int) Length of the file after truncating it. - - Raises: - OSError: if the file does not exist or the file descriptor is - invalid. - """ - file_object = self.filesystem.resolve(path, allow_fd=True) - file_object.size = length - - def ftruncate(self, fd: int, length: int) -> None: - """Truncate the file corresponding to fd, so that it is - length bytes in size. If length is larger than the current size, - the file is filled up with zero bytes. - - Args: - fd: (int) File descriptor for the file object. - length: (int) Maximum length of the file after truncating it. - - Raises: - OSError: if the file descriptor is invalid - """ - file_object = self.filesystem.get_open_file(fd).get_object() - if isinstance(file_object, FakeFileWrapper): - file_object.size = length - else: - raise OSError(errno.EBADF, 'Invalid file descriptor') - - def access(self, path: AnyStr, mode: int, *, - dir_fd: Optional[int] = None, - effective_ids: bool = False, - follow_symlinks: bool = True) -> bool: - """Check if a file exists and has the specified permissions. - - Args: - path: (str) Path to the file. - mode: (int) Permissions represented as a bitwise-OR combination of - os.F_OK, os.R_OK, os.W_OK, and os.X_OK. - dir_fd: If not `None`, the file descriptor of a directory, with - `path` being relative to this directory. - effective_ids: (bool) Unused. Only here to match the signature. - follow_symlinks: (bool) If `False` and `path` points to a symlink, - the link itself is queried instead of the linked object. - - Returns: - bool, `True` if file is accessible, `False` otherwise. - """ - if effective_ids and self.filesystem.is_windows_fs: - raise NotImplementedError( - 'access: effective_ids unavailable on this platform') - path = self._path_with_dir_fd(path, self.access, dir_fd) - try: - stat_result = self.stat(path, follow_symlinks=follow_symlinks) - except OSError as os_error: - if os_error.errno == errno.ENOENT: - return False - raise - if is_root(): - mode &= ~os.W_OK - return (mode & ((stat_result.st_mode >> 6) & 7)) == mode - - def chmod(self, path: AnyStr, mode: int, *, - dir_fd: Optional[int] = None, - follow_symlinks: bool = True) -> None: - """Change the permissions of a file as encoded in integer mode. - - Args: - path: (str) Path to the file. - mode: (int) Permissions. - dir_fd: If not `None`, the file descriptor of a directory, with - `path` being relative to this directory. - follow_symlinks: (bool) If `False` and `path` points to a symlink, - the link itself is queried instead of the linked object. - """ - if (not follow_symlinks and - (os.chmod not in os.supports_follow_symlinks or IS_PYPY)): - raise NotImplementedError( - "`follow_symlinks` for chmod() is not available " - "on this system") - path = self._path_with_dir_fd(path, self.chmod, dir_fd) - self.filesystem.chmod(path, mode, follow_symlinks) - - def lchmod(self, path: AnyStr, mode: int) -> None: - """Change the permissions of a file as encoded in integer mode. - If the file is a link, the permissions of the link are changed. - - Args: - path: (str) Path to the file. - mode: (int) Permissions. - """ - if self.filesystem.is_windows_fs: - raise NameError("name 'lchmod' is not defined") - self.filesystem.chmod(path, mode, follow_symlinks=False) - - def utime(self, path: AnyStr, - times: Optional[Tuple[Union[int, float], Union[int, float]]] = - None, ns: Optional[Tuple[int, int]] = None, - dir_fd: Optional[int] = None, - follow_symlinks: bool = True) -> None: - """Change the access and modified times of a file. - - Args: - path: (str) Path to the file. - times: 2-tuple of int or float numbers, of the form (atime, mtime) - which is used to set the access and modified times in seconds. - If None, both times are set to the current time. - ns: 2-tuple of int numbers, of the form (atime, mtime) which is - used to set the access and modified times in nanoseconds. - If None, both times are set to the current time. - dir_fd: If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - follow_symlinks: (bool) If `False` and `path` points to a symlink, - the link itself is queried instead of the linked object. - - Raises: - TypeError: If anything other than the expected types is - specified in the passed `times` or `ns` tuple, - or if the tuple length is not equal to 2. - ValueError: If both times and ns are specified. - """ - path = self._path_with_dir_fd(path, self.utime, dir_fd) - self.filesystem.utime( - path, times=times, ns=ns, follow_symlinks=follow_symlinks) - - def chown(self, path: AnyStr, uid: int, gid: int, *, - dir_fd: Optional[int] = None, - follow_symlinks: bool = True) -> None: - """Set ownership of a faked file. - - Args: - path: (str) Path to the file or directory. - uid: (int) Numeric uid to set the file or directory to. - gid: (int) Numeric gid to set the file or directory to. - dir_fd: (int) If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - follow_symlinks: (bool) If `False` and path points to a symlink, - the link itself is changed instead of the linked object. - - Raises: - OSError: if path does not exist. - - `None` is also allowed for `uid` and `gid`. This permits `os.rename` - to use `os.chown` even when the source file `uid` and `gid` are - `None` (unset). - """ - path = self._path_with_dir_fd(path, self.chown, dir_fd) - file_object = self.filesystem.resolve( - path, follow_symlinks, allow_fd=True) - if not ((is_int_type(uid) or uid is None) and - (is_int_type(gid) or gid is None)): - raise TypeError("An integer is required") - if uid != -1: - file_object.st_uid = uid - if gid != -1: - file_object.st_gid = gid - - def mknod(self, path: AnyStr, mode: Optional[int] = None, - device: int = 0, *, - dir_fd: Optional[int] = None) -> None: - """Create a filesystem node named 'filename'. - - Does not support device special files or named pipes as the real os - module does. - - Args: - path: (str) Name of the file to create - mode: (int) Permissions to use and type of file to be created. - Default permissions are 0o666. Only the stat.S_IFREG file type - is supported by the fake implementation. The umask is applied - to this mode. - device: not supported in fake implementation - dir_fd: If not `None`, the file descriptor of a directory, - with `path` being relative to this directory. - - Raises: - OSError: if called with unsupported options or the file can not be - created. - """ - if self.filesystem.is_windows_fs: - raise AttributeError("module 'os' has no attribute 'mknode'") - if mode is None: - # note that a default value of 0o600 without a device type is - # documented - this is not how it seems to work - mode = S_IFREG | 0o600 - if device or not mode & S_IFREG and not is_root(): - self.filesystem.raise_os_error(errno.EPERM) - - path = self._path_with_dir_fd(path, self.mknod, dir_fd) - head, tail = self.path.split(path) - if not tail: - if self.filesystem.exists(head, check_link=True): - self.filesystem.raise_os_error(errno.EEXIST, path) - self.filesystem.raise_os_error(errno.ENOENT, path) - if tail in (matching_string(tail, '.'), matching_string(tail, '..')): - self.filesystem.raise_os_error(errno.ENOENT, path) - if self.filesystem.exists(path, check_link=True): - self.filesystem.raise_os_error(errno.EEXIST, path) - self.filesystem.add_object(head, FakeFile( - tail, mode & ~self.filesystem.umask, - filesystem=self.filesystem)) - - def symlink(self, src: AnyStr, dst: AnyStr, *, - dir_fd: Optional[int] = None) -> None: - """Creates the specified symlink, pointed at the specified link target. - - Args: - src: The target of the symlink. - dst: Path to the symlink to create. - dir_fd: If not `None`, the file descriptor of a directory, - with `src` being relative to this directory. - - Raises: - OSError: if the file already exists. - """ - src = self._path_with_dir_fd(src, self.symlink, dir_fd) - self.filesystem.create_symlink( - dst, src, create_missing_dirs=False) - - def link(self, src: AnyStr, dst: AnyStr, *, - src_dir_fd: Optional[int] = None, - dst_dir_fd: Optional[int] = None) -> None: - """Create a hard link at new_path, pointing at old_path. - - Args: - src: An existing path to the target file. - dst: The destination path to create a new link at. - src_dir_fd: If not `None`, the file descriptor of a directory, - with `src` being relative to this directory. - dst_dir_fd: If not `None`, the file descriptor of a directory, - with `dst` being relative to this directory. - - Raises: - OSError: if something already exists at new_path. - OSError: if the parent directory doesn't exist. - """ - src = self._path_with_dir_fd(src, self.link, src_dir_fd) - dst = self._path_with_dir_fd(dst, self.link, dst_dir_fd) - self.filesystem.link(src, dst) - - def fsync(self, fd: int) -> None: - """Perform fsync for a fake file (in other words, do nothing). - - Args: - fd: The file descriptor of the open file. - - Raises: - OSError: file_des is an invalid file descriptor. - TypeError: file_des is not an integer. - """ - # Throw an error if file_des isn't valid - if 0 <= fd < NR_STD_STREAMS: - self.filesystem.raise_os_error(errno.EINVAL) - file_object = cast(FakeFileWrapper, self.filesystem.get_open_file(fd)) - if self.filesystem.is_windows_fs: - if (not hasattr(file_object, 'allow_update') or - not file_object.allow_update): - self.filesystem.raise_os_error( - errno.EBADF, file_object.file_path) - - def fdatasync(self, fd: int) -> None: - """Perform fdatasync for a fake file (in other words, do nothing). - - Args: - fd: The file descriptor of the open file. - - Raises: - OSError: `fd` is an invalid file descriptor. - TypeError: `fd` is not an integer. - """ - if self.filesystem.is_windows_fs or self.filesystem.is_macos: - raise AttributeError("module 'os' has no attribute 'fdatasync'") - # Throw an error if file_des isn't valid - if 0 <= fd < NR_STD_STREAMS: - self.filesystem.raise_os_error(errno.EINVAL) - self.filesystem.get_open_file(fd) - - def sendfile(self, fd_out: int, fd_in: int, - offset: int, count: int) -> int: - """Copy count bytes from file descriptor fd_in to file descriptor - fd_out starting at offset. - - Args: - fd_out: The file descriptor of the destination file. - fd_in: The file descriptor of the source file. - offset: The offset in bytes where to start the copy in the - source file. If `None` (Linux only), copying is started at - the current position, and the position is updated. - count: The number of bytes to copy. If 0, all remaining bytes - are copied (MacOs only). - - Raises: - OSError: If `fd_in` or `fd_out` is an invalid file descriptor. - TypeError: If `fd_in` or `fd_out` is not an integer. - TypeError: If `offset` is None under MacOs. - """ - if self.filesystem.is_windows_fs: - raise AttributeError("module 'os' has no attribute 'sendfile'") - if 0 <= fd_in < NR_STD_STREAMS: - self.filesystem.raise_os_error(errno.EINVAL) - if 0 <= fd_out < NR_STD_STREAMS: - self.filesystem.raise_os_error(errno.EINVAL) - source = cast(FakeFileWrapper, self.filesystem.get_open_file(fd_in)) - dest = cast(FakeFileWrapper, self.filesystem.get_open_file(fd_out)) - if self.filesystem.is_macos: - if dest.get_object().stat_result.st_mode & 0o777000 != S_IFSOCK: - raise OSError('Socket operation on non-socket') - if offset is None: - if self.filesystem.is_macos: - raise TypeError('None is not a valid offset') - contents = source.read(count) - else: - position = source.tell() - source.seek(offset) - if count == 0 and self.filesystem.is_macos: - contents = source.read() - else: - contents = source.read(count) - source.seek(position) - if contents: - written = dest.write(contents) - dest.flush() - return written - return 0 - - def __getattr__(self, name: str) -> Any: - """Forwards any unfaked calls to the standard os module.""" - return getattr(self._os_module, name) - - -class FakeIoModule: - """Uses FakeFilesystem to provide a fake io module replacement. - - Currently only used to wrap `io.open()` which is an alias to `open()`. - - You need a fake_filesystem to use this: - filesystem = fake_filesystem.FakeFilesystem() - my_io_module = fake_filesystem.FakeIoModule(filesystem) - """ - - @staticmethod - def dir() -> List[str]: - """Return the list of patched function names. Used for patching - functions imported from the module. - """ - _dir = ['open'] - if sys.version_info >= (3, 8): - _dir.append('open_code') - return _dir - - def __init__(self, filesystem: FakeFilesystem): - """ - Args: - filesystem: FakeFilesystem used to provide file system information. - """ - self.filesystem = filesystem - self.skip_names: List[str] = [] - self._io_module = io - - def open(self, file: Union[AnyStr, int], - mode: str = 'r', buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - closefd: bool = True, - opener: Optional[Callable] = None) -> Union[AnyFileWrapper, - IO[Any]]: - """Redirect the call to FakeFileOpen. - See FakeFileOpen.call() for description. - """ - # workaround for built-in open called from skipped modules (see #552) - # as open is not imported explicitly, we cannot patch it for - # specific modules; instead we check if the caller is a skipped - # module (should work in most cases) - stack = traceback.extract_stack(limit=2) - module_name = os.path.splitext(stack[0].filename)[0] - module_name = module_name.replace(os.sep, '.') - if any([module_name == sn or module_name.endswith('.' + sn) - for sn in self.skip_names]): - return io.open(file, mode, buffering, encoding, errors, - newline, closefd, opener) - fake_open = FakeFileOpen(self.filesystem) - return fake_open(file, mode, buffering, encoding, errors, - newline, closefd, opener) - - if sys.version_info >= (3, 8): - def open_code(self, path): - """Redirect the call to open. Note that the behavior of the real - function may be overridden by an earlier call to the - PyFile_SetOpenCodeHook(). This behavior is not reproduced here. - """ - if not isinstance(path, str): - raise TypeError( - "open_code() argument 'path' must be str, not int") - patch_mode = self.filesystem.patch_open_code - if (patch_mode == PatchMode.AUTO and self.filesystem.exists(path) - or patch_mode == PatchMode.ON): - return self.open(path, mode='rb') - # mostly this is used for compiled code - - # don't patch these, as the files are probably in the real fs - return self._io_module.open_code(path) - - def __getattr__(self, name): - """Forwards any unfaked calls to the standard io module.""" - return getattr(self._io_module, name) - - -if sys.platform != 'win32': - import fcntl - - class FakeFcntlModule: - """Replaces the fcntl module. Only valid under Linux/MacOS, - currently just mocks the functionality away. - """ - - @staticmethod - def dir() -> List[str]: - """Return the list of patched function names. Used for patching - functions imported from the module. - """ - return ['fcntl', 'ioctl', 'flock', 'lockf'] - - def __init__(self, filesystem: FakeFilesystem): - """ - Args: - filesystem: FakeFilesystem used to provide file system - information (currently not used). - """ - self.filesystem = filesystem - self._fcntl_module = fcntl - - def fcntl(self, fd: int, cmd: int, arg: int = 0) -> Union[int, bytes]: - return 0 - - def ioctl(self, fd: int, request: int, arg: int = 0, - mutate_flag: bool = True) -> Union[int, bytes]: - return 0 - - def flock(self, fd: int, operation: int) -> None: - pass - - def lockf(self, fd: int, cmd: int, len: int = 0, - start: int = 0, whence=0) -> Any: - pass - - def __getattr__(self, name): - """Forwards any unfaked calls to the standard fcntl module.""" - return getattr(self._fcntl_module, name) - - -class FakeFileWrapper: - """Wrapper for a stream object for use by a FakeFile object. - - If the wrapper has any data written to it, it will propagate to - the FakeFile object on close() or flush(). - """ - - def __init__(self, file_object: FakeFile, - file_path: AnyStr, - update: bool, read: bool, append: bool, delete_on_close: bool, - filesystem: FakeFilesystem, - newline: Optional[str], binary: bool, closefd: bool, - encoding: Optional[str], errors: Optional[str], - buffering: int, raw_io: bool, is_stream: bool = False): - self.file_object = file_object - self.file_path = file_path # type: ignore[var-annotated] - self._append = append - self._read = read - self.allow_update = update - self._closefd = closefd - self._file_epoch = file_object.epoch - self.raw_io = raw_io - self._binary = binary - self.is_stream = is_stream - self._changed = False - self._buffer_size = buffering - if self._buffer_size == 0 and not binary: - raise ValueError("can't have unbuffered text I/O") - # buffer_size is ignored in text mode - elif self._buffer_size == -1 or not binary: - self._buffer_size = io.DEFAULT_BUFFER_SIZE - self._use_line_buffer = not binary and buffering == 1 - - contents = file_object.byte_contents - self._encoding = encoding or locale.getpreferredencoding(False) - errors = errors or 'strict' - self._io: Union[BinaryBufferIO, TextBufferIO] = ( - BinaryBufferIO(contents) if binary - else TextBufferIO(contents, encoding=encoding, - newline=newline, errors=errors) - ) - self._read_whence = 0 - self._read_seek = 0 - self._flush_pos = 0 - if contents: - self._flush_pos = len(contents) - if update: - if not append: - self._io.seek(0) - else: - self._io.seek(self._flush_pos) - self._read_seek = self._io.tell() - - if delete_on_close: - assert filesystem, 'delete_on_close=True requires filesystem' - self._filesystem = filesystem - self.delete_on_close = delete_on_close - # override, don't modify FakeFile.name, as FakeFilesystem expects - # it to be the file name only, no directories. - self.name = file_object.opened_as - self.filedes: Optional[int] = None - - def __enter__(self) -> 'FakeFileWrapper': - """To support usage of this fake file with the 'with' statement.""" - return self - - def __exit__(self, exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType] - ) -> None: - """To support usage of this fake file with the 'with' statement.""" - self.close() - - def _raise(self, message: str) -> NoReturn: - if self.raw_io: - self._filesystem.raise_os_error(errno.EBADF, self.file_path) - raise io.UnsupportedOperation(message) - - def get_object(self) -> FakeFile: - """Return the FakeFile object that is wrapped by the current instance. - """ - return self.file_object - - def fileno(self) -> int: - """Return the file descriptor of the file object.""" - if self.filedes is not None: - return self.filedes - raise OSError(errno.EBADF, 'Invalid file descriptor') - - def close(self) -> None: - """Close the file.""" - # ignore closing a closed file - if not self._is_open(): - return - - # for raw io, all writes are flushed immediately - if self.allow_update and not self.raw_io: - self.flush() - if self._filesystem.is_windows_fs and self._changed: - self.file_object.st_mtime = now() - - assert self.filedes is not None - if self._closefd: - self._filesystem._close_open_file(self.filedes) - else: - open_files = self._filesystem.open_files[self.filedes] - assert open_files is not None - open_files.remove(self) - if self.delete_on_close: - self._filesystem.remove_object( - self.get_object().path) # type: ignore[arg-type] - - @property - def closed(self) -> bool: - """Simulate the `closed` attribute on file.""" - return not self._is_open() - - def _try_flush(self, old_pos: int) -> None: - """Try to flush and reset the position if it fails.""" - flush_pos = self._flush_pos - try: - self.flush() - except OSError: - # write failed - reset to previous position - self._io.seek(old_pos) - self._io.truncate() - self._flush_pos = flush_pos - raise - - def flush(self) -> None: - """Flush file contents to 'disk'.""" - self._check_open_file() - if self.allow_update and not self.is_stream: - contents = self._io.getvalue() - if self._append: - self._sync_io() - old_contents = self.file_object.byte_contents - assert old_contents is not None - contents = old_contents + contents[self._flush_pos:] - self._set_stream_contents(contents) - else: - self._io.flush() - changed = self.file_object.set_contents(contents, self._encoding) - self.update_flush_pos() - if changed: - if self._filesystem.is_windows_fs: - self._changed = True - else: - current_time = now() - self.file_object.st_ctime = current_time - self.file_object.st_mtime = current_time - self._file_epoch = self.file_object.epoch - - if not self.is_stream: - self._flush_related_files() - - def update_flush_pos(self) -> None: - self._flush_pos = self._io.tell() - - def _flush_related_files(self) -> None: - for open_files in self._filesystem.open_files[3:]: - if open_files is not None: - for open_file in open_files: - if (open_file is not self and - isinstance(open_file, FakeFileWrapper) and - self.file_object == open_file.file_object and - not open_file._append): - open_file._sync_io() - - def seek(self, offset: int, whence: int = 0) -> None: - """Move read/write pointer in 'file'.""" - self._check_open_file() - if not self._append: - self._io.seek(offset, whence) - else: - self._read_seek = offset - self._read_whence = whence - if not self.is_stream: - self.flush() - - def tell(self) -> int: - """Return the file's current position. - - Returns: - int, file's current position in bytes. - """ - self._check_open_file() - if not self.is_stream: - self.flush() - - if not self._append: - return self._io.tell() - if self._read_whence: - write_seek = self._io.tell() - self._io.seek(self._read_seek, self._read_whence) - self._read_seek = self._io.tell() - self._read_whence = 0 - self._io.seek(write_seek) - return self._read_seek - - def _sync_io(self) -> None: - """Update the stream with changes to the file object contents.""" - if self._file_epoch == self.file_object.epoch: - return - - contents = self.file_object.byte_contents - assert contents is not None - self._set_stream_contents(contents) - self._file_epoch = self.file_object.epoch - - def _set_stream_contents(self, contents: bytes) -> None: - whence = self._io.tell() - self._io.seek(0) - self._io.truncate() - self._io.putvalue(contents) - if not self._append: - self._io.seek(whence) - - def _read_wrappers(self, name: str) -> Callable: - """Wrap a stream attribute in a read wrapper. - - Returns a read_wrapper which tracks our own read pointer since the - stream object has no concept of a different read and write pointer. - - Args: - name: The name of the attribute to wrap. Should be a read call. - - Returns: - The read_wrapper function. - """ - io_attr = getattr(self._io, name) - - def read_wrapper(*args, **kwargs): - """Wrap all read calls to the stream object. - - We do this to track the read pointer separate from the write - pointer. Anything that wants to read from the stream object - while we're in append mode goes through this. - - Args: - *args: pass through args - **kwargs: pass through kwargs - Returns: - Wrapped stream object method - """ - self._io.seek(self._read_seek, self._read_whence) - ret_value = io_attr(*args, **kwargs) - self._read_seek = self._io.tell() - self._read_whence = 0 - self._io.seek(0, 2) - return ret_value - - return read_wrapper - - def _other_wrapper(self, name: str) -> Callable: - """Wrap a stream attribute in an other_wrapper. - - Args: - name: the name of the stream attribute to wrap. - - Returns: - other_wrapper which is described below. - """ - io_attr = getattr(self._io, name) - - def other_wrapper(*args, **kwargs): - """Wrap all other calls to the stream Object. - - We do this to track changes to the write pointer. Anything that - moves the write pointer in a file open for appending should move - the read pointer as well. - - Args: - *args: Pass through args. - **kwargs: Pass through kwargs. - - Returns: - Wrapped stream object method. - """ - write_seek = self._io.tell() - ret_value = io_attr(*args, **kwargs) - if write_seek != self._io.tell(): - self._read_seek = self._io.tell() - self._read_whence = 0 - - return ret_value - - return other_wrapper - - def _write_wrapper(self, name: str) -> Callable: - """Wrap a stream attribute in a write_wrapper. - - Args: - name: the name of the stream attribute to wrap. - - Returns: - write_wrapper which is described below. - """ - io_attr = getattr(self._io, name) - - def write_wrapper(*args, **kwargs): - """Wrap all other calls to the stream Object. - - We do this to track changes to the write pointer. Anything that - moves the write pointer in a file open for appending should move - the read pointer as well. - - Args: - *args: Pass through args. - **kwargs: Pass through kwargs. - - Returns: - Wrapped stream object method. - """ - old_pos = self._io.tell() - ret_value = io_attr(*args, **kwargs) - new_pos = self._io.tell() - - # if the buffer size is exceeded, we flush - use_line_buf = self._use_line_buffer and '\n' in args[0] - if new_pos - self._flush_pos > self._buffer_size or use_line_buf: - flush_all = (new_pos - old_pos > self._buffer_size or - use_line_buf) - # if the current write does not exceed the buffer size, - # we revert to the previous position and flush that, - # otherwise we flush all - if not flush_all: - self._io.seek(old_pos) - self._io.truncate() - self._try_flush(old_pos) - if not flush_all: - ret_value = io_attr(*args, **kwargs) - if self._append: - self._read_seek = self._io.tell() - self._read_whence = 0 - return ret_value - - return write_wrapper - - def _adapt_size_for_related_files(self, size: int) -> None: - for open_files in self._filesystem.open_files[3:]: - if open_files is not None: - for open_file in open_files: - if (open_file is not self and - isinstance(open_file, FakeFileWrapper) and - self.file_object == open_file.file_object and - cast(FakeFileWrapper, open_file)._append): - open_file._read_seek += size - - def _truncate_wrapper(self) -> Callable: - """Wrap truncate() to allow flush after truncate. - - Returns: - Wrapper which is described below. - """ - io_attr = getattr(self._io, 'truncate') - - def truncate_wrapper(*args, **kwargs): - """Wrap truncate call to call flush after truncate.""" - if self._append: - self._io.seek(self._read_seek, self._read_whence) - size = io_attr(*args, **kwargs) - self.flush() - if not self.is_stream: - self.file_object.size = size - buffer_size = len(self._io.getvalue()) - if buffer_size < size: - self._io.seek(buffer_size) - self._io.putvalue(b'\0' * (size - buffer_size)) - self.file_object.set_contents( - self._io.getvalue(), self._encoding) - self._flush_pos = size - self._adapt_size_for_related_files(size - buffer_size) - - self.flush() - return size - - return truncate_wrapper - - def size(self) -> int: - """Return the content size in bytes of the wrapped file.""" - return self.file_object.st_size - - def __getattr__(self, name: str) -> Any: - if self.file_object.is_large_file(): - raise FakeLargeFileIoException(self.file_path) - - reading = name.startswith('read') or name == 'next' - truncate = name == 'truncate' - writing = name.startswith('write') or truncate - - if reading or writing: - self._check_open_file() - if not self._read and reading: - return self._read_error() - if not self.allow_update and writing: - return self._write_error() - - if reading: - self._sync_io() - if not self.is_stream: - self.flush() - if not self._filesystem.is_windows_fs: - self.file_object.st_atime = now() - if truncate: - return self._truncate_wrapper() - if self._append: - if reading: - return self._read_wrappers(name) - elif not writing: - return self._other_wrapper(name) - if writing: - return self._write_wrapper(name) - - return getattr(self._io, name) - - def _read_error(self) -> Callable: - def read_error(*args, **kwargs): - """Throw an error unless the argument is zero.""" - if args and args[0] == 0: - if self._filesystem.is_windows_fs and self.raw_io: - return b'' if self._binary else u'' - self._raise('File is not open for reading.') - - return read_error - - def _write_error(self) -> Callable: - def write_error(*args, **kwargs): - """Throw an error.""" - if self.raw_io: - if (self._filesystem.is_windows_fs and args - and len(args[0]) == 0): - return 0 - self._raise('File is not open for writing.') - - return write_error - - def _is_open(self) -> bool: - if (self.filedes is not None and - self.filedes < len(self._filesystem.open_files)): - open_files = self._filesystem.open_files[self.filedes] - if open_files is not None and self in open_files: - return True - return False - - def _check_open_file(self) -> None: - if not self.is_stream and not self._is_open(): - raise ValueError('I/O operation on closed file') - - def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]: - if not self._read: - self._raise('File is not open for reading') - return self._io.__iter__() - - def __next__(self): - if not self._read: - self._raise('File is not open for reading') - return next(self._io) - - -class StandardStreamWrapper: - """Wrapper for a system standard stream to be used in open files list. - """ - - def __init__(self, stream_object: TextIO): - self._stream_object = stream_object - self.filedes: Optional[int] = None - - def get_object(self) -> TextIO: - return self._stream_object - - def fileno(self) -> int: - """Return the file descriptor of the wrapped standard stream.""" - if self.filedes is not None: - return self.filedes - raise OSError(errno.EBADF, 'Invalid file descriptor') - - def read(self, n: int = -1) -> bytes: - return cast(bytes, self._stream_object.read()) - - def close(self) -> None: - """We do not support closing standard streams.""" - pass - - def is_stream(self) -> bool: - return True - - -class FakeDirWrapper: - """Wrapper for a FakeDirectory object to be used in open files list. - """ - - def __init__(self, file_object: FakeDirectory, - file_path: AnyString, filesystem: FakeFilesystem): - self.file_object = file_object - self.file_path = file_path - self._filesystem = filesystem - self.filedes: Optional[int] = None - - def get_object(self) -> FakeDirectory: - """Return the FakeFile object that is wrapped by the current instance. - """ - return self.file_object - - def fileno(self) -> int: - """Return the file descriptor of the file object.""" - if self.filedes is not None: - return self.filedes - raise OSError(errno.EBADF, 'Invalid file descriptor') - - def close(self) -> None: - """Close the directory.""" - assert self.filedes is not None - self._filesystem._close_open_file(self.filedes) - - -class FakePipeWrapper: - """Wrapper for a read or write descriptor of a real pipe object to be - used in open files list. - """ - - def __init__(self, filesystem: FakeFilesystem, - fd: int, can_write: bool, mode: str = ''): - self._filesystem = filesystem - self.fd = fd # the real file descriptor - self.can_write = can_write - self.file_object = None - self.filedes: Optional[int] = None - self.real_file = None - if mode: - self.real_file = open(fd, mode) - - def __enter__(self) -> 'FakePipeWrapper': - """To support usage of this fake pipe with the 'with' statement.""" - return self - - def __exit__(self, exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType] - ) -> None: - """To support usage of this fake pipe with the 'with' statement.""" - self.close() - - def get_object(self) -> None: - return self.file_object - - def fileno(self) -> int: - """Return the fake file descriptor of the pipe object.""" - if self.filedes is not None: - return self.filedes - raise OSError(errno.EBADF, 'Invalid file descriptor') - - def read(self, numBytes: int = -1) -> bytes: - """Read from the real pipe.""" - if self.real_file: - return self.real_file.read(numBytes) - return os.read(self.fd, numBytes) - - def flush(self) -> None: - """Flush the real pipe?""" - pass - - def write(self, contents: bytes) -> int: - """Write to the real pipe.""" - if self.real_file: - return self.real_file.write(contents) - return os.write(self.fd, contents) - - def close(self) -> None: - """Close the pipe descriptor.""" - assert self.filedes is not None - open_files = self._filesystem.open_files[self.filedes] - assert open_files is not None - open_files.remove(self) - if self.real_file: - self.real_file.close() - else: - os.close(self.fd) - - def readable(self) -> bool: - """The pipe end can either be readable or writable.""" - return not self.can_write - - def writable(self) -> bool: - """The pipe end can either be readable or writable.""" - return self.can_write - - def seekable(self) -> bool: - """A pipe is not seekable.""" - return False - - -Deprecator.add(FakeFileWrapper, FakeFileWrapper.get_object, 'GetObject') -Deprecator.add(FakeFileWrapper, FakeFileWrapper.size, 'Size') - - -class FakeFileOpen: - """Faked `file()` and `open()` function replacements. - - Returns FakeFile objects in a FakeFilesystem in place of the `file()` - or `open()` function. - """ - __name__ = 'FakeFileOpen' - - def __init__(self, filesystem: FakeFilesystem, - delete_on_close: bool = False, raw_io: bool = False): - """ - Args: - filesystem: FakeFilesystem used to provide file system information - delete_on_close: optional boolean, deletes file on close() - """ - self.filesystem = filesystem - self._delete_on_close = delete_on_close - self.raw_io = raw_io - - def __call__(self, *args: Any, **kwargs: Any) -> AnyFileWrapper: - """Redirects calls to file() or open() to appropriate method.""" - return self.call(*args, **kwargs) - - def call(self, file_: Union[AnyStr, int], - mode: str = 'r', - buffering: int = -1, - encoding: Optional[str] = None, - errors: Optional[str] = None, - newline: Optional[str] = None, - closefd: bool = True, - opener: Any = None, - open_modes: Optional[_OpenModes] = None) -> AnyFileWrapper: - """Return a file-like object with the contents of the target - file object. - - Args: - file_: Path to target file or a file descriptor. - mode: Additional file modes (all modes in `open()` are supported). - buffering: the buffer size used for writing. Data will only be - flushed if buffer size is exceeded. The default (-1) uses a - system specific default buffer size. Text line mode (e.g. - buffering=1 in text mode) is not supported. - encoding: The encoding used to encode unicode strings / decode - bytes. - errors: (str) Defines how encoding errors are handled. - newline: Controls universal newlines, passed to stream object. - closefd: If a file descriptor rather than file name is passed, - and this is set to `False`, then the file descriptor is kept - open when file is closed. - opener: not supported. - open_modes: Modes for opening files if called from low-level API. - - Returns: - A file-like object containing the contents of the target file. - - Raises: - OSError depending on Python version / call mode: - - if the target object is a directory - - on an invalid path - - if the file does not exist when it should - - if the file exists but should not - - if permission is denied - ValueError: for an invalid mode or mode combination - """ - binary = 'b' in mode - - if binary and encoding: - raise ValueError("binary mode doesn't take an encoding argument") - - newline, open_modes = self._handle_file_mode(mode, newline, open_modes) - - file_object, file_path, filedes, real_path = self._handle_file_arg( - file_) - if file_object is None and file_path is None: - # file must be a fake pipe wrapper, find it... - if (filedes is None or - len(self.filesystem.open_files) <= filedes or - not self.filesystem.open_files[filedes]): - raise OSError(errno.EBADF, 'invalid pipe file descriptor') - wrappers = self.filesystem.open_files[filedes] - assert wrappers is not None - existing_wrapper = wrappers[0] - assert isinstance(existing_wrapper, FakePipeWrapper) - wrapper = FakePipeWrapper(self.filesystem, existing_wrapper.fd, - existing_wrapper.can_write, mode) - file_des = self.filesystem._add_open_file(wrapper) - wrapper.filedes = file_des - return wrapper - - assert file_path is not None - if not filedes: - closefd = True - - if (open_modes.must_not_exist and - (file_object or self.filesystem.islink(file_path) and - not self.filesystem.is_windows_fs)): - self.filesystem.raise_os_error(errno.EEXIST, file_path) - - assert real_path is not None - file_object = self._init_file_object(file_object, - file_path, open_modes, - real_path) - - if S_ISDIR(file_object.st_mode): - if self.filesystem.is_windows_fs: - self.filesystem.raise_os_error(errno.EACCES, file_path) - else: - self.filesystem.raise_os_error(errno.EISDIR, file_path) - - # If you print obj.name, the argument to open() must be printed. - # Not the abspath, not the filename, but the actual argument. - file_object.opened_as = file_path - if open_modes.truncate: - current_time = now() - file_object.st_mtime = current_time - if not self.filesystem.is_windows_fs: - file_object.st_ctime = current_time - - fakefile = FakeFileWrapper(file_object, - file_path, - update=open_modes.can_write, - read=open_modes.can_read, - append=open_modes.append, - delete_on_close=self._delete_on_close, - filesystem=self.filesystem, - newline=newline, - binary=binary, - closefd=closefd, - encoding=encoding, - errors=errors, - buffering=buffering, - raw_io=self.raw_io) - if filedes is not None: - fakefile.filedes = filedes - # replace the file wrapper - open_files_list = self.filesystem.open_files[filedes] - assert open_files_list is not None - open_files_list.append(fakefile) - else: - fakefile.filedes = self.filesystem._add_open_file(fakefile) - return fakefile - - def _init_file_object(self, file_object: Optional[FakeFile], - file_path: AnyStr, - open_modes: _OpenModes, - real_path: AnyString) -> FakeFile: - if file_object: - if (not is_root() and - ((open_modes.can_read and - not file_object.st_mode & PERM_READ) - or (open_modes.can_write and - not file_object.st_mode & PERM_WRITE))): - self.filesystem.raise_os_error(errno.EACCES, file_path) - if open_modes.can_write: - if open_modes.truncate: - file_object.set_contents('') - else: - if open_modes.must_exist: - self.filesystem.raise_os_error(errno.ENOENT, file_path) - if self.filesystem.islink(file_path): - link_object = self.filesystem.resolve(file_path, - follow_symlinks=False) - assert link_object.contents is not None - target_path = cast(AnyStr, link_object.contents) - else: - target_path = file_path - if self.filesystem.ends_with_path_separator(target_path): - error = ( - errno.EINVAL if self.filesystem.is_windows_fs - else errno.ENOENT if self.filesystem.is_macos - else errno.EISDIR - ) - self.filesystem.raise_os_error(error, file_path) - file_object = self.filesystem.create_file_internally( - real_path, create_missing_dirs=False, - apply_umask=True) - return file_object - - def _handle_file_arg(self, file_: Union[AnyStr, int]) -> Tuple[ - Optional[FakeFile], Optional[AnyStr], - Optional[int], Optional[AnyStr]]: - file_object = None - if isinstance(file_, int): - # opening a file descriptor - filedes: int = file_ - wrapper = self.filesystem.get_open_file(filedes) - if isinstance(wrapper, FakePipeWrapper): - return None, None, filedes, None - if isinstance(wrapper, FakeFileWrapper): - self._delete_on_close = wrapper.delete_on_close - file_object = cast(FakeFile, self.filesystem.get_open_file( - filedes).get_object()) - assert file_object is not None - path = file_object.name - return file_object, cast(AnyStr, path), filedes, cast(AnyStr, path) - - # open a file file by path - file_path = cast(AnyStr, file_) - if file_path == self.filesystem.dev_null.name: - file_object = self.filesystem.dev_null - real_path = file_path - else: - real_path = self.filesystem.resolve_path(file_path) - if self.filesystem.exists(file_path): - file_object = self.filesystem.get_object_from_normpath( - real_path, check_read_perm=False) - return file_object, file_path, None, real_path - - def _handle_file_mode( - self, mode: str, - newline: Optional[str], - open_modes: Optional[_OpenModes]) -> Tuple[Optional[str], - _OpenModes]: - orig_modes = mode # Save original modes for error messages. - # Normalize modes. Handle 't' and 'U'. - if 'b' in mode and 't' in mode: - raise ValueError('Invalid mode: ' + mode) - mode = mode.replace('t', '').replace('b', '') - mode = mode.replace('rU', 'r').replace('U', 'r') - if not self.raw_io: - if mode not in _OPEN_MODE_MAP: - raise ValueError('Invalid mode: %r' % orig_modes) - open_modes = _OpenModes(*_OPEN_MODE_MAP[mode]) - assert open_modes is not None - return newline, open_modes + def _create_temp_dir(self): + # the temp directory is assumed to exist at least in `tempfile`, + # so we create it here for convenience + temp_dir = tempfile.gettempdir() + if not self.exists(temp_dir): + self.create_dir(temp_dir) + if sys.platform != "win32" and not self.exists("/tmp"): + # under Posix, we also create a link in /tmp if the path does not exist + self.create_symlink("/tmp", temp_dir) + # reset the used size to 0 to avoid having the link size counted + # which would make disk size tests more complicated + next(iter(self.mount_points.values()))["used_size"] = 0 def _run_doctest() -> TestResults: import doctest import pyfakefs + return doctest.testmod(pyfakefs.fake_filesystem) -if __name__ == '__main__': +def __getattr__(name): + # backwards compatibility for read access to globals moved to helpers + if name == "USER_ID": + return helpers.USER_ID + if name == "GROUP_ID": + return helpers.GROUP_ID + raise AttributeError(f"No attribute {name!r}.") + + +if __name__ == "__main__": _run_doctest() |