diff options
Diffstat (limited to 'pyfakefs/helpers.py')
-rw-r--r-- | pyfakefs/helpers.py | 226 |
1 files changed, 144 insertions, 82 deletions
diff --git a/pyfakefs/helpers.py b/pyfakefs/helpers.py index ba75d2a..5d5d590 100644 --- a/pyfakefs/helpers.py +++ b/pyfakefs/helpers.py @@ -22,12 +22,75 @@ from copy import copy from stat import S_IFLNK from typing import Union, Optional, Any, AnyStr, overload, cast -IS_PYPY = platform.python_implementation() == 'PyPy' -IS_WIN = sys.platform == 'win32' -IN_DOCKER = os.path.exists('/.dockerenv') - +AnyString = Union[str, bytes] AnyPath = Union[AnyStr, os.PathLike] +IS_PYPY = platform.python_implementation() == "PyPy" +IS_WIN = sys.platform == "win32" +IN_DOCKER = os.path.exists("/.dockerenv") + +PERM_READ = 0o400 # Read permission bit. +PERM_WRITE = 0o200 # Write permission bit. +PERM_EXE = 0o100 # Execute permission bit. +PERM_DEF = 0o777 # Default permission bits. +PERM_DEF_FILE = 0o666 # Default permission bits (regular file) +PERM_ALL = 0o7777 # All permission bits. + +if sys.platform == "win32": + USER_ID = 1 + GROUP_ID = 1 +else: + USER_ID = os.getuid() + GROUP_ID = os.getgid() + + +def get_uid() -> int: + """Get the global user id. Same as ``os.getuid()``""" + return USER_ID + + +def set_uid(uid: int) -> None: + """Set the global user id. This is used as st_uid for new files + and to differentiate between a normal user and the root user (uid 0). + For the root user, some permission restrictions are ignored. + + Args: + uid: (int) the user ID of the user calling the file system functions. + """ + global USER_ID + USER_ID = uid + + +def get_gid() -> int: + """Get the global group id. Same as ``os.getgid()``""" + return GROUP_ID + + +def set_gid(gid: int) -> None: + """Set the global group id. This is only used to set st_gid for new files, + no permission checks are performed. + + Args: + gid: (int) the group ID of the user calling the file system functions. + """ + global GROUP_ID + GROUP_ID = gid + + +def reset_ids() -> None: + """Set the global user ID and group ID back to default values.""" + if sys.platform == "win32": + set_uid(1) + set_gid(1) + else: + set_uid(os.getuid()) + set_gid(os.getgid()) + + +def is_root() -> bool: + """Return True if the current user is the root user.""" + return USER_ID == 0 + def is_int_type(val: Any) -> bool: """Return True if `val` is of integer type.""" @@ -37,30 +100,32 @@ def is_int_type(val: Any) -> bool: def is_byte_string(val: Any) -> bool: """Return True if `val` is a bytes-like object, False for a unicode string.""" - return not hasattr(val, 'encode') + return not hasattr(val, "encode") def is_unicode_string(val: Any) -> bool: """Return True if `val` is a unicode string, False for a bytes-like object.""" - return hasattr(val, 'encode') + return hasattr(val, "encode") @overload -def make_string_path(dir_name: AnyStr) -> AnyStr: ... +def make_string_path(dir_name: AnyStr) -> AnyStr: + ... @overload -def make_string_path(dir_name: os.PathLike) -> str: ... +def make_string_path(dir_name: os.PathLike) -> str: + ... def make_string_path(dir_name: AnyPath) -> AnyStr: - return cast(AnyStr, os.fspath(dir_name)) + return cast(AnyStr, os.fspath(dir_name)) # pytype: disable=invalid-annotation def to_string(path: Union[AnyStr, Union[str, bytes]]) -> str: """Return the string representation of a byte string using the preferred - encoding, or the string itself if path is a str.""" + encoding, or the string itself if path is a str.""" if isinstance(path, bytes): return path.decode(locale.getpreferredencoding(False)) return path @@ -68,7 +133,7 @@ def to_string(path: Union[AnyStr, Union[str, bytes]]) -> str: def to_bytes(path: Union[AnyStr, Union[str, bytes]]) -> bytes: """Return the bytes representation of a string using the preferred - encoding, or the byte string itself if path is a byte string.""" + encoding, or the byte string itself if path is a byte string.""" if isinstance(path, str): return bytes(path, locale.getpreferredencoding(False)) return path @@ -93,19 +158,23 @@ def now(): @overload -def matching_string(matched: bytes, string: AnyStr) -> bytes: ... +def matching_string(matched: bytes, string: AnyStr) -> bytes: + ... @overload -def matching_string(matched: str, string: AnyStr) -> str: ... +def matching_string(matched: str, string: AnyStr) -> str: + ... @overload -def matching_string(matched: AnyStr, string: None) -> None: ... +def matching_string(matched: AnyStr, string: None) -> None: + ... def matching_string( # type: ignore[misc] - matched: AnyStr, string: Optional[AnyStr]) -> Optional[AnyStr]: + matched: AnyStr, string: Optional[AnyStr] +) -> Optional[AnyString]: """Return the string as byte or unicode depending on the type of matched, assuming string is an ASCII string. """ @@ -113,7 +182,7 @@ def matching_string( # type: ignore[misc] return string if isinstance(matched, bytes) and isinstance(string, str): return string.encode(locale.getpreferredencoding(False)) - return string + return string # pytype: disable=bad-return-type class FakeStatResult: @@ -121,11 +190,14 @@ class FakeStatResult: This is needed as `os.stat_result` has no possibility to set nanosecond times directly. """ - _stat_float_times: bool = True - def __init__(self, is_windows: bool, user_id: int, group_id: int, - initial_time: Optional[float] = None): - self._use_float: Optional[bool] = None + def __init__( + self, + is_windows: bool, + user_id: int, + group_id: int, + initial_time: Optional[float] = None, + ): self.st_mode: int = 0 self.st_ino: Optional[int] = None self.st_dev: int = 0 @@ -138,29 +210,19 @@ class FakeStatResult: self._st_mtime_ns: int = self._st_atime_ns self._st_ctime_ns: int = self._st_atime_ns - @property - def use_float(self) -> bool: - if self._use_float is None: - return self.stat_float_times() - return self._use_float - - @use_float.setter - def use_float(self, val: bool) -> None: - self._use_float = val - def __eq__(self, other: Any) -> bool: return ( - isinstance(other, FakeStatResult) and - self._st_atime_ns == other._st_atime_ns and - self._st_ctime_ns == other._st_ctime_ns and - self._st_mtime_ns == other._st_mtime_ns and - self.st_size == other.st_size and - self.st_gid == other.st_gid and - self.st_uid == other.st_uid and - self.st_nlink == other.st_nlink and - self.st_dev == other.st_dev and - self.st_ino == other.st_ino and - self.st_mode == other.st_mode + isinstance(other, FakeStatResult) + and self._st_atime_ns == other._st_atime_ns + and self._st_ctime_ns == other._st_ctime_ns + and self._st_mtime_ns == other._st_mtime_ns + and self.st_size == other.st_size + and self.st_gid == other.st_gid + and self.st_uid == other.st_uid + and self.st_nlink == other.st_nlink + and self.st_dev == other.st_dev + and self.st_ino == other.st_ino + and self.st_mode == other.st_mode ) def __ne__(self, other: Any) -> bool: @@ -171,7 +233,6 @@ class FakeStatResult: behavior of the real os.stat_result. """ stat_result = copy(self) - stat_result.use_float = self.use_float return stat_result def set_from_stat_result(self, stat_result: os.stat_result) -> None: @@ -187,27 +248,10 @@ class FakeStatResult: self._st_mtime_ns = stat_result.st_mtime_ns self._st_ctime_ns = stat_result.st_ctime_ns - @classmethod - def stat_float_times(cls, newvalue: Optional[bool] = None) -> bool: - """Determine whether a file's time stamps are reported as floats - or ints. - - Calling without arguments returns the current value. - The value is shared by all instances of FakeOsModule. - - Args: - newvalue: If `True`, mtime, ctime, atime are reported as floats. - Otherwise, they are returned as ints (rounding down). - """ - if newvalue is not None: - cls._stat_float_times = bool(newvalue) - return cls._stat_float_times - @property def st_ctime(self) -> Union[int, float]: """Return the creation time in seconds.""" - ctime = self._st_ctime_ns / 1e9 - return ctime if self.use_float else int(ctime) + return self._st_ctime_ns / 1e9 @st_ctime.setter def st_ctime(self, val: Union[int, float]) -> None: @@ -217,8 +261,7 @@ class FakeStatResult: @property def st_atime(self) -> Union[int, float]: """Return the access time in seconds.""" - atime = self._st_atime_ns / 1e9 - return atime if self.use_float else int(atime) + return self._st_atime_ns / 1e9 @st_atime.setter def st_atime(self, val: Union[int, float]) -> None: @@ -228,8 +271,7 @@ class FakeStatResult: @property def st_mtime(self) -> Union[int, float]: """Return the modification time in seconds.""" - mtime = self._st_mtime_ns / 1e9 - return mtime if self.use_float else int(mtime) + return self._st_mtime_ns / 1e9 @st_mtime.setter def st_mtime(self, val: Union[int, float]) -> None: @@ -247,27 +289,45 @@ class FakeStatResult: self._st_size = val @property + def st_blocks(self) -> int: + """Return the number of 512-byte blocks allocated for the file. + Assumes a page size of 4096 (matches most systems). + Ignores that this may not be available under some systems, + and that the result may differ if the file has holes. + """ + if self.is_windows: + raise AttributeError("'os.stat_result' object has no attribute 'st_blocks'") + page_size = 4096 + blocks_in_page = page_size // 512 + pages = self._st_size // page_size + if self._st_size % page_size: + pages += 1 + return pages * blocks_in_page + + @property def st_file_attributes(self) -> int: if not self.is_windows: - raise AttributeError("module 'os.stat_result' " - "has no attribute 'st_file_attributes'") + raise AttributeError( + "module 'os.stat_result' " "has no attribute 'st_file_attributes'" + ) mode = 0 st_mode = self.st_mode if st_mode & stat.S_IFDIR: - mode |= stat.FILE_ATTRIBUTE_DIRECTORY + mode |= stat.FILE_ATTRIBUTE_DIRECTORY # type:ignore[attr-defined] if st_mode & stat.S_IFREG: - mode |= stat.FILE_ATTRIBUTE_NORMAL + mode |= stat.FILE_ATTRIBUTE_NORMAL # type:ignore[attr-defined] if st_mode & (stat.S_IFCHR | stat.S_IFBLK): - mode |= stat.FILE_ATTRIBUTE_DEVICE + mode |= stat.FILE_ATTRIBUTE_DEVICE # type:ignore[attr-defined] if st_mode & stat.S_IFLNK: - mode |= stat.FILE_ATTRIBUTE_REPARSE_POINT + mode |= stat.FILE_ATTRIBUTE_REPARSE_POINT # type:ignore return mode @property def st_reparse_tag(self) -> int: if not self.is_windows or sys.version_info < (3, 8): - raise AttributeError("module 'os.stat_result' " - "has no attribute 'st_reparse_tag'") + raise AttributeError( + "module 'os.stat_result' " "has no attribute 'st_reparse_tag'" + ) if self.st_mode & stat.S_IFLNK: return stat.IO_REPARSE_TAG_SYMLINK # type: ignore[attr-defined] return 0 @@ -297,7 +357,7 @@ class FakeStatResult: return int(self.st_mtime) if item == stat.ST_CTIME: return int(self.st_ctime) - raise ValueError('Invalid item') + raise ValueError("Invalid item") @property def st_atime_ns(self) -> int: @@ -334,21 +394,23 @@ class BinaryBufferIO(io.BytesIO): """Stream class that handles byte contents for files.""" def __init__(self, contents: Optional[bytes]): - super().__init__(contents or b'') + super().__init__(contents or b"") def putvalue(self, value: bytes) -> None: self.write(value) class TextBufferIO(io.TextIOWrapper): - """Stream class that handles Python string contents for files. - """ - - def __init__(self, contents: Optional[bytes] = None, - newline: Optional[str] = None, - encoding: Optional[str] = None, - errors: str = 'strict'): - self._bytestream = io.BytesIO(contents or b'') + """Stream class that handles Python string contents for files.""" + + def __init__( + self, + contents: Optional[bytes] = None, + newline: Optional[str] = None, + encoding: Optional[str] = None, + errors: str = "strict", + ): + self._bytestream = io.BytesIO(contents or b"") super().__init__(self._bytestream, encoding, errors, newline) def getvalue(self) -> bytes: |