diff options
Diffstat (limited to 'pyfakefs/fake_file.py')
-rw-r--r-- | pyfakefs/fake_file.py | 1316 |
1 files changed, 1316 insertions, 0 deletions
diff --git a/pyfakefs/fake_file.py b/pyfakefs/fake_file.py new file mode 100644 index 0000000..5bc3119 --- /dev/null +++ b/pyfakefs/fake_file.py @@ -0,0 +1,1316 @@ +# Copyright 2009 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Fake implementations for different file objects. +""" +import errno +import io +import locale +import os +import sys +from stat import ( + S_IFREG, + S_IFDIR, +) +from types import TracebackType +from typing import ( + List, + Optional, + Callable, + Union, + Any, + Dict, + cast, + AnyStr, + NoReturn, + Iterator, + TextIO, + Type, + TYPE_CHECKING, +) + +from pyfakefs import helpers +from pyfakefs.helpers import ( + FakeStatResult, + BinaryBufferIO, + TextBufferIO, + is_int_type, + is_unicode_string, + to_string, + matching_string, + real_encoding, + AnyPath, + AnyString, +) + +if TYPE_CHECKING: + from pyfakefs.fake_filesystem import FakeFilesystem + +AnyFileWrapper = Union[ + "FakeFileWrapper", + "FakeDirWrapper", + "StandardStreamWrapper", + "FakePipeWrapper", +] +AnyFile = Union["FakeFile", "FakeDirectory"] + + +class FakeLargeFileIoException(Exception): + """Exception thrown on unsupported operations for fake large files. + Fake large files have a size with no real content. + """ + + def __init__(self, file_path: str) -> None: + super(FakeLargeFileIoException, self).__init__( + "Read and write operations not supported for " + "fake large file: %s" % file_path + ) + + +class FakeFile: + """Provides the appearance of a real file. + + Attributes currently faked out: + * `st_mode`: user-specified, otherwise S_IFREG + * `st_ctime`: the time.time() timestamp of the file change time (updated + each time a file's attributes is modified). + * `st_atime`: the time.time() timestamp when the file was last accessed. + * `st_mtime`: the time.time() timestamp when the file was last modified. + * `st_size`: the size of the file + * `st_nlink`: the number of hard links to the file + * `st_ino`: the inode number - a unique number identifying the file + * `st_dev`: a unique number identifying the (fake) file system device + the file belongs to + * `st_uid`: always set to USER_ID, which can be changed globally using + `set_uid` + * `st_gid`: always set to GROUP_ID, which can be changed globally using + `set_gid` + + .. note:: The resolution for `st_ctime`, `st_mtime` and `st_atime` in the + real file system depends on the used file system (for example it is + only 1s for HFS+ and older Linux file systems, but much higher for + ext4 and NTFS). This is currently ignored by pyfakefs, which uses + the resolution of `time.time()`. + + Under Windows, `st_atime` is not updated for performance reasons by + default. pyfakefs never updates `st_atime` under Windows, assuming + the default setting. + """ + + stat_types = ( + "st_mode", + "st_ino", + "st_dev", + "st_nlink", + "st_uid", + "st_gid", + "st_size", + "st_atime", + "st_mtime", + "st_ctime", + "st_atime_ns", + "st_mtime_ns", + "st_ctime_ns", + ) + + def __init__( + self, + name: AnyStr, + st_mode: int = S_IFREG | helpers.PERM_DEF_FILE, + contents: Optional[AnyStr] = None, + filesystem: Optional["FakeFilesystem"] = None, + encoding: Optional[str] = None, + errors: Optional[str] = None, + side_effect: Optional[Callable[["FakeFile"], None]] = None, + ): + """ + Args: + name: Name of the file/directory, without parent path information + st_mode: The stat.S_IF* constant representing the file type (i.e. + stat.S_IFREG, stat.S_IFDIR), and the file permissions. + If no file type is set (e.g. permission flags only), a + regular file type is assumed. + contents: The contents of the filesystem object; should be a string + or byte object for regular files, and a dict of other + FakeFile or FakeDirectory objects with the file names as + keys for FakeDirectory objects + filesystem: The fake filesystem where the file is created. + encoding: If contents is a unicode string, the encoding used + for serialization. + errors: The error mode used for encoding/decoding errors. + side_effect: function handle that is executed when file is written, + must accept the file object as an argument. + """ + # to be backwards compatible regarding argument order, we raise on None + if filesystem is None: + raise ValueError("filesystem shall not be None") + self.filesystem: "FakeFilesystem" = filesystem + self._side_effect: Optional[Callable] = side_effect + self.name: AnyStr = name # type: ignore[assignment] + self.stat_result = FakeStatResult( + filesystem.is_windows_fs, + helpers.get_uid(), + helpers.get_gid(), + helpers.now(), + ) + if st_mode >> 12 == 0: + st_mode |= S_IFREG + self.stat_result.st_mode = st_mode + self.st_size: int = 0 + self.encoding: Optional[str] = real_encoding(encoding) + self.errors: str = errors or "strict" + self._byte_contents: Optional[bytes] = self._encode_contents(contents) + self.stat_result.st_size = ( + len(self._byte_contents) if self._byte_contents is not None else 0 + ) + self.epoch: int = 0 + self.parent_dir: Optional[FakeDirectory] = None + # Linux specific: extended file system attributes + self.xattr: Dict = {} + self.opened_as: AnyString = "" + + @property + def byte_contents(self) -> Optional[bytes]: + """Return the contents as raw byte array.""" + return self._byte_contents + + @property + def contents(self) -> Optional[str]: + """Return the contents as string with the original encoding.""" + if isinstance(self.byte_contents, bytes): + return self.byte_contents.decode( + self.encoding or locale.getpreferredencoding(False), + errors=self.errors, + ) + return None + + @property + def st_ctime(self) -> float: + """Return the creation time of the fake file.""" + return self.stat_result.st_ctime + + @st_ctime.setter + def st_ctime(self, val: float) -> None: + """Set the creation time of the fake file.""" + self.stat_result.st_ctime = val + + @property + def st_atime(self) -> float: + """Return the access time of the fake file.""" + return self.stat_result.st_atime + + @st_atime.setter + def st_atime(self, val: float) -> None: + """Set the access time of the fake file.""" + self.stat_result.st_atime = val + + @property + def st_mtime(self) -> float: + """Return the modification time of the fake file.""" + return self.stat_result.st_mtime + + @st_mtime.setter + def st_mtime(self, val: float) -> None: + """Set the modification time of the fake file.""" + self.stat_result.st_mtime = val + + def set_large_file_size(self, st_size: int) -> None: + """Sets the self.st_size attribute and replaces self.content with None. + + Provided specifically to simulate very large files without regards + to their content (which wouldn't fit in memory). + Note that read/write operations with such a file raise + :py:class:`FakeLargeFileIoException`. + + Args: + st_size: (int) The desired file size + + Raises: + OSError: if the st_size is not a non-negative integer, + or if st_size exceeds the available file system space + """ + self._check_positive_int(st_size) + if self.st_size: + self.size = 0 + if self.filesystem: + self.filesystem.change_disk_usage(st_size, self.name, self.st_dev) + self.st_size = st_size + self._byte_contents = None + + def _check_positive_int(self, size: int) -> None: + # the size should be an positive integer value + if not is_int_type(size) or size < 0: + self.filesystem.raise_os_error(errno.ENOSPC, self.name) + + def is_large_file(self) -> bool: + """Return `True` if this file was initialized with size + but no contents. + """ + return self._byte_contents is None + + def _encode_contents(self, contents: Union[str, bytes, None]) -> Optional[bytes]: + if is_unicode_string(contents): + contents = bytes( + cast(str, contents), + self.encoding or locale.getpreferredencoding(False), + self.errors, + ) + return cast(bytes, contents) + + def set_initial_contents(self, contents: AnyStr) -> bool: + """Sets the file contents and size. + Called internally after initial file creation. + + Args: + contents: string, new content of file. + + Returns: + True if the contents have been changed. + + Raises: + OSError: if the st_size is not a non-negative integer, + or if st_size exceeds the available file system space + """ + byte_contents = self._encode_contents(contents) + changed = self._byte_contents != byte_contents + st_size = len(byte_contents) if byte_contents else 0 + + current_size = self.st_size or 0 + self.filesystem.change_disk_usage( + st_size - current_size, self.name, self.st_dev + ) + self._byte_contents = byte_contents + self.st_size = st_size + self.epoch += 1 + return changed + + def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool: + """Sets the file contents and size and increases the modification time. + Also executes the side_effects if available. + + Args: + contents: (str, bytes) new content of file. + encoding: (str) the encoding to be used for writing the contents + if they are a unicode string. + If not given, the locale preferred encoding is used. + + Returns: + True if the contents have been changed. + + Raises: + OSError: if `st_size` is not a non-negative integer, + or if it exceeds the available file system space. + """ + self.encoding = real_encoding(encoding) + changed = self.set_initial_contents(contents) + if self._side_effect is not None: + self._side_effect(self) + return changed + + @property + def size(self) -> int: + """Return the size in bytes of the file contents.""" + return self.st_size + + @size.setter + def size(self, st_size: int) -> None: + """Resizes file content, padding with nulls if new size exceeds the + old size. + + Args: + st_size: The desired size for the file. + + Raises: + OSError: if the st_size arg is not a non-negative integer + or if st_size exceeds the available file system space + """ + + self._check_positive_int(st_size) + current_size = self.st_size or 0 + self.filesystem.change_disk_usage( + st_size - current_size, self.name, self.st_dev + ) + if self._byte_contents: + if st_size < current_size: + self._byte_contents = self._byte_contents[:st_size] + else: + self._byte_contents += b"\0" * (st_size - current_size) + self.st_size = st_size + self.epoch += 1 + + @property + def path(self) -> AnyStr: + """Return the full path of the current object.""" + names: List[AnyStr] = [] # pytype: disable=invalid-annotation + obj: Optional[FakeFile] = self + while obj: + names.insert(0, matching_string(self.name, obj.name)) # type: ignore + obj = obj.parent_dir + sep = self.filesystem.get_path_separator(names[0]) + if names[0] == sep: + names.pop(0) + dir_path = sep.join(names) + drive = self.filesystem.splitdrive(dir_path)[0] + # if a Windows path already starts with a drive or UNC path, + # no extra separator is needed + if not drive: + dir_path = sep + dir_path + else: + dir_path = sep.join(names) + return self.filesystem.absnormpath(dir_path) + + if sys.version_info >= (3, 12): + + @property + def is_junction(self) -> bool: + return self.filesystem.isjunction(self.path) + + def __getattr__(self, item: str) -> Any: + """Forward some properties to stat_result.""" + if item in self.stat_types: + return getattr(self.stat_result, item) + return super().__getattribute__(item) + + def __setattr__(self, key: str, value: Any) -> None: + """Forward some properties to stat_result.""" + if key in self.stat_types: + return setattr(self.stat_result, key, value) + return super().__setattr__(key, value) + + def __str__(self) -> str: + return "%r(%o)" % (self.name, self.st_mode) + + +class FakeNullFile(FakeFile): + def __init__(self, filesystem: "FakeFilesystem") -> None: + devnull = "nul" if filesystem.is_windows_fs else "/dev/null" + super(FakeNullFile, self).__init__(devnull, filesystem=filesystem, contents="") + + @property + def byte_contents(self) -> bytes: + return b"" + + def set_initial_contents(self, contents: AnyStr) -> bool: + return False + + +class FakeFileFromRealFile(FakeFile): + """Represents a fake file copied from the real file system. + + The contents of the file are read on demand only. + """ + + def __init__( + self, + file_path: str, + filesystem: "FakeFilesystem", + side_effect: Optional[Callable] = None, + ) -> None: + """ + Args: + file_path: Path to the existing file. + filesystem: The fake filesystem where the file is created. + + Raises: + OSError: if the file does not exist in the real file system. + OSError: if the file already exists in the fake file system. + """ + super().__init__( + name=os.path.basename(file_path), + filesystem=filesystem, + side_effect=side_effect, + ) + self.contents_read = False + + @property + def byte_contents(self) -> Optional[bytes]: + if not self.contents_read: + self.contents_read = True + with io.open(self.file_path, "rb") as f: + self._byte_contents = f.read() + # On MacOS and BSD, the above io.open() updates atime on the real file + self.st_atime = os.stat(self.file_path).st_atime + return self._byte_contents + + def set_contents(self, contents, encoding=None): + self.contents_read = True + super(FakeFileFromRealFile, self).set_contents(contents, encoding) + + def is_large_file(self): + """The contents are never faked.""" + return False + + +class FakeDirectory(FakeFile): + """Provides the appearance of a real directory.""" + + def __init__( + self, + name: str, + perm_bits: int = helpers.PERM_DEF, + filesystem: Optional["FakeFilesystem"] = None, + ): + """ + Args: + name: name of the file/directory, without parent path information + perm_bits: permission bits. defaults to 0o777. + filesystem: if set, the fake filesystem where the directory + is created + """ + FakeFile.__init__(self, name, S_IFDIR | perm_bits, "", filesystem=filesystem) + # directories have the link count of contained entries, + # including '.' and '..' + self.st_nlink += 1 + self._entries: Dict[str, AnyFile] = {} + + def set_contents(self, contents: AnyStr, encoding: Optional[str] = None) -> bool: + raise self.filesystem.raise_os_error(errno.EISDIR, self.path) + + @property + def entries(self) -> Dict[str, FakeFile]: + """Return the list of contained directory entries.""" + return self._entries + + @property + def ordered_dirs(self) -> List[str]: + """Return the list of contained directory entry names ordered by + creation order. + """ + return [ + item[0] + for item in sorted(self._entries.items(), key=lambda entry: entry[1].st_ino) + ] + + def add_entry(self, path_object: FakeFile) -> None: + """Adds a child FakeFile to this directory. + + Args: + path_object: FakeFile instance to add as a child of this directory. + + Raises: + OSError: if the directory has no write permission (Posix only) + OSError: if the file or directory to be added already exists + """ + if ( + not helpers.is_root() + and not self.st_mode & helpers.PERM_WRITE + and not self.filesystem.is_windows_fs + ): + raise OSError(errno.EACCES, "Permission Denied", self.path) + + path_object_name: str = to_string(path_object.name) + if path_object_name in self.entries: + self.filesystem.raise_os_error(errno.EEXIST, self.path) + + self._entries[path_object_name] = path_object + path_object.parent_dir = self + if path_object.st_ino is None: + self.filesystem.last_ino += 1 + path_object.st_ino = self.filesystem.last_ino + self.st_nlink += 1 + path_object.st_nlink += 1 + path_object.st_dev = self.st_dev + if path_object.st_nlink == 1: + self.filesystem.change_disk_usage( + path_object.size, path_object.name, self.st_dev + ) + + def get_entry(self, pathname_name: str) -> AnyFile: + """Retrieves the specified child file or directory entry. + + Args: + pathname_name: The basename of the child object to retrieve. + + Returns: + The fake file or directory object. + + Raises: + KeyError: if no child exists by the specified name. + """ + pathname_name = self._normalized_entryname(pathname_name) + return self.entries[to_string(pathname_name)] + + def _normalized_entryname(self, pathname_name: str) -> str: + if not self.filesystem.is_case_sensitive: + matching_names = [ + name for name in self.entries if name.lower() == pathname_name.lower() + ] + if matching_names: + pathname_name = matching_names[0] + return pathname_name + + def remove_entry(self, pathname_name: str, recursive: bool = True) -> None: + """Removes the specified child file or directory. + + Args: + pathname_name: Basename of the child object to remove. + recursive: If True (default), the entries in contained directories + are deleted first. Used to propagate removal errors + (e.g. permission problems) from contained entries. + + Raises: + KeyError: if no child exists by the specified name. + OSError: if user lacks permission to delete the file, + or (Windows only) the file is open. + """ + pathname_name = self._normalized_entryname(pathname_name) + entry = self.get_entry(pathname_name) + if self.filesystem.is_windows_fs: + if entry.st_mode & helpers.PERM_WRITE == 0: + self.filesystem.raise_os_error(errno.EACCES, pathname_name) + if self.filesystem.has_open_file(entry): + self.filesystem.raise_os_error(errno.EACCES, pathname_name) + else: + if not helpers.is_root() and ( + self.st_mode & (helpers.PERM_WRITE | helpers.PERM_EXE) + != helpers.PERM_WRITE | helpers.PERM_EXE + ): + self.filesystem.raise_os_error(errno.EACCES, pathname_name) + + if recursive and isinstance(entry, FakeDirectory): + while entry.entries: + entry.remove_entry(list(entry.entries)[0]) + elif entry.st_nlink == 1: + self.filesystem.change_disk_usage(-entry.size, pathname_name, entry.st_dev) + + self.st_nlink -= 1 + entry.st_nlink -= 1 + assert entry.st_nlink >= 0 + + del self.entries[to_string(pathname_name)] + + @property + def size(self) -> int: + """Return the total size of all files contained + in this directory tree. + """ + return sum([item[1].size for item in self.entries.items()]) + + @size.setter + def size(self, st_size: int) -> None: + """Setting the size is an error for a directory.""" + raise self.filesystem.raise_os_error(errno.EISDIR, self.path) + + def has_parent_object(self, dir_object: "FakeDirectory") -> bool: + """Return `True` if dir_object is a direct or indirect parent + directory, or if both are the same object.""" + obj: Optional[FakeDirectory] = self + while obj: + if obj == dir_object: + return True + obj = obj.parent_dir + return False + + def __str__(self) -> str: + description = super(FakeDirectory, self).__str__() + ":\n" + for item in self.entries: + item_desc = self.entries[item].__str__() + for line in item_desc.split("\n"): + if line: + description = description + " " + line + "\n" + return description + + +class FakeDirectoryFromRealDirectory(FakeDirectory): + """Represents a fake directory copied from the real file system. + + The contents of the directory are read on demand only. + """ + + def __init__( + self, + source_path: AnyPath, + filesystem: "FakeFilesystem", + read_only: bool, + target_path: Optional[AnyPath] = None, + ): + """ + Args: + source_path: Full directory path. + filesystem: The fake filesystem where the directory is created. + read_only: If set, all files under the directory are treated + as read-only, e.g. a write access raises an exception; + otherwise, writing to the files changes the fake files + only as usually. + target_path: If given, the target path of the directory, + otherwise the target is the same as `source_path`. + + Raises: + OSError: if the directory does not exist in the real file system + """ + target_path = target_path or source_path + real_stat = os.stat(source_path) + super(FakeDirectoryFromRealDirectory, self).__init__( + name=to_string(os.path.split(target_path)[1]), + perm_bits=real_stat.st_mode, + filesystem=filesystem, + ) + + self.st_ctime = real_stat.st_ctime + self.st_atime = real_stat.st_atime + self.st_mtime = real_stat.st_mtime + self.st_gid = real_stat.st_gid + self.st_uid = real_stat.st_uid + self.source_path = source_path # type: ignore + self.read_only = read_only + self.contents_read = False + + @property + def entries(self) -> Dict[str, FakeFile]: + """Return the list of contained directory entries, loading them + if not already loaded.""" + if not self.contents_read: + self.contents_read = True + base = self.path + for entry in os.listdir(self.source_path): + source_path = os.path.join(self.source_path, entry) + target_path = os.path.join(base, entry) # type: ignore + if os.path.islink(source_path): + self.filesystem.add_real_symlink(source_path, target_path) + elif os.path.isdir(source_path): + self.filesystem.add_real_directory( + source_path, self.read_only, target_path=target_path + ) + else: + self.filesystem.add_real_file( + source_path, self.read_only, target_path=target_path + ) + return self._entries + + @property + def size(self) -> int: + # we cannot get the size until the contents are loaded + if not self.contents_read: + return 0 + return super(FakeDirectoryFromRealDirectory, self).size + + @size.setter + def size(self, st_size: int) -> None: + raise self.filesystem.raise_os_error(errno.EISDIR, self.path) + + +class FakeFileWrapper: + """Wrapper for a stream object for use by a FakeFile object. + + If the wrapper has any data written to it, it will propagate to + the FakeFile object on close() or flush(). + """ + + def __init__( + self, + file_object: FakeFile, + file_path: AnyStr, + update: bool, + read: bool, + append: bool, + delete_on_close: bool, + filesystem: "FakeFilesystem", + newline: Optional[str], + binary: bool, + closefd: bool, + encoding: Optional[str], + errors: Optional[str], + buffering: int, + raw_io: bool, + is_stream: bool = False, + ): + self.file_object = file_object + self.file_path = file_path # type: ignore[var-annotated] + self._append = append + self._read = read + self.allow_update = update + self._closefd = closefd + self._file_epoch = file_object.epoch + self.raw_io = raw_io + self._binary = binary + self.is_stream = is_stream + self._changed = False + self._buffer_size = buffering + if self._buffer_size == 0 and not binary: + raise ValueError("can't have unbuffered text I/O") + # buffer_size is ignored in text mode + elif self._buffer_size == -1 or not binary: + self._buffer_size = io.DEFAULT_BUFFER_SIZE + self._use_line_buffer = not binary and buffering == 1 + + contents = file_object.byte_contents + self._encoding = encoding or locale.getpreferredencoding(False) + errors = errors or "strict" + self._io: Union[BinaryBufferIO, TextBufferIO] = ( + BinaryBufferIO(contents) + if binary + else TextBufferIO( + contents, encoding=encoding, newline=newline, errors=errors + ) + ) + self._read_whence = 0 + self._read_seek = 0 + self._flush_pos = 0 + if contents: + self._flush_pos = len(contents) + if update: + if not append: + self._io.seek(0) + else: + self._io.seek(self._flush_pos) + self._read_seek = self._io.tell() + + if delete_on_close: + assert filesystem, "delete_on_close=True requires filesystem" + self._filesystem = filesystem + self.delete_on_close = delete_on_close + # override, don't modify FakeFile.name, as FakeFilesystem expects + # it to be the file name only, no directories. + self.name = file_object.opened_as + self.filedes: Optional[int] = None + + def __enter__(self) -> "FakeFileWrapper": + """To support usage of this fake file with the 'with' statement.""" + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + """To support usage of this fake file with the 'with' statement.""" + self.close() + + def _raise(self, message: str) -> NoReturn: + if self.raw_io: + self._filesystem.raise_os_error(errno.EBADF, self.file_path) + raise io.UnsupportedOperation(message) + + def get_object(self) -> FakeFile: + """Return the FakeFile object that is wrapped + by the current instance. + """ + return self.file_object + + def fileno(self) -> int: + """Return the file descriptor of the file object.""" + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, "Invalid file descriptor") + + def close(self) -> None: + """Close the file.""" + # ignore closing a closed file + if not self._is_open(): + return + + # for raw io, all writes are flushed immediately + if self.allow_update and not self.raw_io: + self.flush() + if self._filesystem.is_windows_fs and self._changed: + self.file_object.st_mtime = helpers.now() + + assert self.filedes is not None + if self._closefd: + self._filesystem._close_open_file(self.filedes) + else: + open_files = self._filesystem.open_files[self.filedes] + assert open_files is not None + open_files.remove(self) + if self.delete_on_close: + self._filesystem.remove_object( + self.get_object().path # type: ignore[arg-type] + ) + + @property + def closed(self) -> bool: + """Simulate the `closed` attribute on file.""" + return not self._is_open() + + def _try_flush(self, old_pos: int) -> None: + """Try to flush and reset the position if it fails.""" + flush_pos = self._flush_pos + try: + self.flush() + except OSError: + # write failed - reset to previous position + self._io.seek(old_pos) + self._io.truncate() + self._flush_pos = flush_pos + raise + + def flush(self) -> None: + """Flush file contents to 'disk'.""" + self._check_open_file() + if self.allow_update and not self.is_stream: + contents = self._io.getvalue() + if self._append: + self._sync_io() + old_contents = self.file_object.byte_contents + assert old_contents is not None + contents = old_contents + contents[self._flush_pos :] + self._set_stream_contents(contents) + else: + self._io.flush() + changed = self.file_object.set_contents(contents, self._encoding) + self.update_flush_pos() + if changed: + if self._filesystem.is_windows_fs: + self._changed = True + else: + current_time = helpers.now() + self.file_object.st_ctime = current_time + self.file_object.st_mtime = current_time + self._file_epoch = self.file_object.epoch + + if not self.is_stream: + self._flush_related_files() + + def update_flush_pos(self) -> None: + self._flush_pos = self._io.tell() + + def _flush_related_files(self) -> None: + for open_files in self._filesystem.open_files[3:]: + if open_files is not None: + for open_file in open_files: + if ( + open_file is not self + and isinstance(open_file, FakeFileWrapper) + and self.file_object == open_file.file_object + and not open_file._append + ): + open_file._sync_io() + + def seek(self, offset: int, whence: int = 0) -> None: + """Move read/write pointer in 'file'.""" + self._check_open_file() + if not self._append: + self._io.seek(offset, whence) + else: + self._read_seek = offset + self._read_whence = whence + if not self.is_stream: + self.flush() + + def tell(self) -> int: + """Return the file's current position. + + Returns: + int, file's current position in bytes. + """ + self._check_open_file() + if not self.is_stream: + self.flush() + + if not self._append: + return self._io.tell() + if self._read_whence: + write_seek = self._io.tell() + self._io.seek(self._read_seek, self._read_whence) + self._read_seek = self._io.tell() + self._read_whence = 0 + self._io.seek(write_seek) + return self._read_seek + + def _sync_io(self) -> None: + """Update the stream with changes to the file object contents.""" + if self._file_epoch == self.file_object.epoch: + return + + contents = self.file_object.byte_contents + assert contents is not None + self._set_stream_contents(contents) + self._file_epoch = self.file_object.epoch + + def _set_stream_contents(self, contents: bytes) -> None: + whence = self._io.tell() + self._io.seek(0) + self._io.truncate() + self._io.putvalue(contents) + if not self._append: + self._io.seek(whence) + + def _read_wrappers(self, name: str) -> Callable: + """Wrap a stream attribute in a read wrapper. + + Returns a read_wrapper which tracks our own read pointer since the + stream object has no concept of a different read and write pointer. + + Args: + name: The name of the attribute to wrap. Should be a read call. + + Returns: + The read_wrapper function. + """ + io_attr = getattr(self._io, name) + + def read_wrapper(*args, **kwargs): + """Wrap all read calls to the stream object. + + We do this to track the read pointer separate from the write + pointer. Anything that wants to read from the stream object + while we're in append mode goes through this. + + Args: + *args: pass through args + **kwargs: pass through kwargs + Returns: + Wrapped stream object method + """ + self._io.seek(self._read_seek, self._read_whence) + ret_value = io_attr(*args, **kwargs) + self._read_seek = self._io.tell() + self._read_whence = 0 + self._io.seek(0, 2) + return ret_value + + return read_wrapper + + def _other_wrapper(self, name: str) -> Callable: + """Wrap a stream attribute in an other_wrapper. + + Args: + name: the name of the stream attribute to wrap. + + Returns: + other_wrapper which is described below. + """ + io_attr = getattr(self._io, name) + + def other_wrapper(*args, **kwargs): + """Wrap all other calls to the stream Object. + + We do this to track changes to the write pointer. Anything that + moves the write pointer in a file open for appending should move + the read pointer as well. + + Args: + *args: Pass through args. + **kwargs: Pass through kwargs. + + Returns: + Wrapped stream object method. + """ + write_seek = self._io.tell() + ret_value = io_attr(*args, **kwargs) + if write_seek != self._io.tell(): + self._read_seek = self._io.tell() + self._read_whence = 0 + + return ret_value + + return other_wrapper + + def _write_wrapper(self, name: str) -> Callable: + """Wrap a stream attribute in a write_wrapper. + + Args: + name: the name of the stream attribute to wrap. + + Returns: + write_wrapper which is described below. + """ + io_attr = getattr(self._io, name) + + def write_wrapper(*args, **kwargs): + """Wrap all other calls to the stream Object. + + We do this to track changes to the write pointer. Anything that + moves the write pointer in a file open for appending should move + the read pointer as well. + + Args: + *args: Pass through args. + **kwargs: Pass through kwargs. + + Returns: + Wrapped stream object method. + """ + old_pos = self._io.tell() + ret_value = io_attr(*args, **kwargs) + new_pos = self._io.tell() + + # if the buffer size is exceeded, we flush + use_line_buf = self._use_line_buffer and "\n" in args[0] + if new_pos - self._flush_pos > self._buffer_size or use_line_buf: + flush_all = new_pos - old_pos > self._buffer_size or use_line_buf + # if the current write does not exceed the buffer size, + # we revert to the previous position and flush that, + # otherwise we flush all + if not flush_all: + self._io.seek(old_pos) + self._io.truncate() + self._try_flush(old_pos) + if not flush_all: + ret_value = io_attr(*args, **kwargs) + if self._append: + self._read_seek = self._io.tell() + self._read_whence = 0 + return ret_value + + return write_wrapper + + def _adapt_size_for_related_files(self, size: int) -> None: + for open_files in self._filesystem.open_files[3:]: + if open_files is not None: + for open_file in open_files: + if ( + open_file is not self + and isinstance(open_file, FakeFileWrapper) + and self.file_object == open_file.file_object + and cast(FakeFileWrapper, open_file)._append + ): + open_file._read_seek += size + + def _truncate_wrapper(self) -> Callable: + """Wrap truncate() to allow flush after truncate. + + Returns: + Wrapper which is described below. + """ + io_attr = self._io.truncate + + def truncate_wrapper(*args, **kwargs): + """Wrap truncate call to call flush after truncate.""" + if self._append: + self._io.seek(self._read_seek, self._read_whence) + size = io_attr(*args, **kwargs) + self.flush() + if not self.is_stream: + self.file_object.size = size + buffer_size = len(self._io.getvalue()) + if buffer_size < size: + self._io.seek(buffer_size) + self._io.putvalue(b"\0" * (size - buffer_size)) + self.file_object.set_contents(self._io.getvalue(), self._encoding) + self._flush_pos = size + self._adapt_size_for_related_files(size - buffer_size) + + self.flush() + return size + + return truncate_wrapper + + def size(self) -> int: + """Return the content size in bytes of the wrapped file.""" + return self.file_object.st_size + + def __getattr__(self, name: str) -> Any: + if self.file_object.is_large_file(): + raise FakeLargeFileIoException(self.file_path) + + reading = name.startswith("read") or name == "next" + truncate = name == "truncate" + writing = name.startswith("write") or truncate + + if reading or writing: + self._check_open_file() + if not self._read and reading: + return self._read_error() + if not self.allow_update and writing: + return self._write_error() + + if reading: + self._sync_io() + if not self.is_stream: + self.flush() + if not self._filesystem.is_windows_fs: + self.file_object.st_atime = helpers.now() + if truncate: + return self._truncate_wrapper() + if self._append: + if reading: + return self._read_wrappers(name) + elif not writing: + return self._other_wrapper(name) + if writing: + return self._write_wrapper(name) + + return getattr(self._io, name) + + def _read_error(self) -> Callable: + def read_error(*args, **kwargs): + """Throw an error unless the argument is zero.""" + if args and args[0] == 0: + if self._filesystem.is_windows_fs and self.raw_io: + return b"" if self._binary else "" + self._raise("File is not open for reading.") + + return read_error + + def _write_error(self) -> Callable: + def write_error(*args, **kwargs): + """Throw an error.""" + if self.raw_io: + if self._filesystem.is_windows_fs and args and len(args[0]) == 0: + return 0 + self._raise("File is not open for writing.") + + return write_error + + def _is_open(self) -> bool: + if self.filedes is not None and self.filedes < len(self._filesystem.open_files): + open_files = self._filesystem.open_files[self.filedes] + if open_files is not None and self in open_files: + return True + return False + + def _check_open_file(self) -> None: + if not self.is_stream and not self._is_open(): + raise ValueError("I/O operation on closed file") + + def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]: + if not self._read: + self._raise("File is not open for reading") + return self._io.__iter__() + + def __next__(self): + if not self._read: + self._raise("File is not open for reading") + return next(self._io) + + +class StandardStreamWrapper: + """Wrapper for a system standard stream to be used in open files list.""" + + def __init__(self, stream_object: TextIO): + self._stream_object = stream_object + self.filedes: Optional[int] = None + + def get_object(self) -> TextIO: + return self._stream_object + + def fileno(self) -> int: + """Return the file descriptor of the wrapped standard stream.""" + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, "Invalid file descriptor") + + def read(self, n: int = -1) -> bytes: + return cast(bytes, self._stream_object.read()) + + def close(self) -> None: + """We do not support closing standard streams.""" + + def is_stream(self) -> bool: + return True + + +class FakeDirWrapper: + """Wrapper for a FakeDirectory object to be used in open files list.""" + + def __init__( + self, + file_object: FakeDirectory, + file_path: AnyString, + filesystem: "FakeFilesystem", + ): + self.file_object = file_object + self.file_path = file_path + self._filesystem = filesystem + self.filedes: Optional[int] = None + + def get_object(self) -> FakeDirectory: + """Return the FakeFile object that is wrapped by the current + instance.""" + return self.file_object + + def fileno(self) -> int: + """Return the file descriptor of the file object.""" + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, "Invalid file descriptor") + + def close(self) -> None: + """Close the directory.""" + assert self.filedes is not None + self._filesystem._close_open_file(self.filedes) + + +class FakePipeWrapper: + """Wrapper for a read or write descriptor of a real pipe object to be + used in open files list. + """ + + def __init__( + self, + filesystem: "FakeFilesystem", + fd: int, + can_write: bool, + mode: str = "", + ): + self._filesystem = filesystem + self.fd = fd # the real file descriptor + self.can_write = can_write + self.file_object = None + self.filedes: Optional[int] = None + self.real_file = None + if mode: + self.real_file = open(fd, mode) + + def __enter__(self) -> "FakePipeWrapper": + """To support usage of this fake pipe with the 'with' statement.""" + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + """To support usage of this fake pipe with the 'with' statement.""" + self.close() + + def get_object(self) -> None: + return self.file_object + + def fileno(self) -> int: + """Return the fake file descriptor of the pipe object.""" + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, "Invalid file descriptor") + + def read(self, numBytes: int = -1) -> bytes: + """Read from the real pipe.""" + if self.real_file: + return self.real_file.read(numBytes) # pytype: disable=bad-return-type + return os.read(self.fd, numBytes) + + def flush(self) -> None: + """Flush the real pipe?""" + + def write(self, contents: bytes) -> int: + """Write to the real pipe.""" + if self.real_file: + return self.real_file.write(contents) + return os.write(self.fd, contents) + + def close(self) -> None: + """Close the pipe descriptor.""" + assert self.filedes is not None + open_files = self._filesystem.open_files[self.filedes] + assert open_files is not None + open_files.remove(self) + if self.real_file: + self.real_file.close() + else: + os.close(self.fd) + + def readable(self) -> bool: + """The pipe end can either be readable or writable.""" + return not self.can_write + + def writable(self) -> bool: + """The pipe end can either be readable or writable.""" + return self.can_write + + def seekable(self) -> bool: + """A pipe is not seekable.""" + return False |