aboutsummaryrefslogtreecommitdiff
path: root/pyfakefs/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyfakefs/helpers.py')
-rw-r--r--pyfakefs/helpers.py226
1 files changed, 144 insertions, 82 deletions
diff --git a/pyfakefs/helpers.py b/pyfakefs/helpers.py
index ba75d2a..5d5d590 100644
--- a/pyfakefs/helpers.py
+++ b/pyfakefs/helpers.py
@@ -22,12 +22,75 @@ from copy import copy
from stat import S_IFLNK
from typing import Union, Optional, Any, AnyStr, overload, cast
-IS_PYPY = platform.python_implementation() == 'PyPy'
-IS_WIN = sys.platform == 'win32'
-IN_DOCKER = os.path.exists('/.dockerenv')
-
+AnyString = Union[str, bytes]
AnyPath = Union[AnyStr, os.PathLike]
+IS_PYPY = platform.python_implementation() == "PyPy"
+IS_WIN = sys.platform == "win32"
+IN_DOCKER = os.path.exists("/.dockerenv")
+
+PERM_READ = 0o400 # Read permission bit.
+PERM_WRITE = 0o200 # Write permission bit.
+PERM_EXE = 0o100 # Execute permission bit.
+PERM_DEF = 0o777 # Default permission bits.
+PERM_DEF_FILE = 0o666 # Default permission bits (regular file)
+PERM_ALL = 0o7777 # All permission bits.
+
+if sys.platform == "win32":
+ USER_ID = 1
+ GROUP_ID = 1
+else:
+ USER_ID = os.getuid()
+ GROUP_ID = os.getgid()
+
+
+def get_uid() -> int:
+ """Get the global user id. Same as ``os.getuid()``"""
+ return USER_ID
+
+
+def set_uid(uid: int) -> None:
+ """Set the global user id. This is used as st_uid for new files
+ and to differentiate between a normal user and the root user (uid 0).
+ For the root user, some permission restrictions are ignored.
+
+ Args:
+ uid: (int) the user ID of the user calling the file system functions.
+ """
+ global USER_ID
+ USER_ID = uid
+
+
+def get_gid() -> int:
+ """Get the global group id. Same as ``os.getgid()``"""
+ return GROUP_ID
+
+
+def set_gid(gid: int) -> None:
+ """Set the global group id. This is only used to set st_gid for new files,
+ no permission checks are performed.
+
+ Args:
+ gid: (int) the group ID of the user calling the file system functions.
+ """
+ global GROUP_ID
+ GROUP_ID = gid
+
+
+def reset_ids() -> None:
+ """Set the global user ID and group ID back to default values."""
+ if sys.platform == "win32":
+ set_uid(1)
+ set_gid(1)
+ else:
+ set_uid(os.getuid())
+ set_gid(os.getgid())
+
+
+def is_root() -> bool:
+ """Return True if the current user is the root user."""
+ return USER_ID == 0
+
def is_int_type(val: Any) -> bool:
"""Return True if `val` is of integer type."""
@@ -37,30 +100,32 @@ def is_int_type(val: Any) -> bool:
def is_byte_string(val: Any) -> bool:
"""Return True if `val` is a bytes-like object, False for a unicode
string."""
- return not hasattr(val, 'encode')
+ return not hasattr(val, "encode")
def is_unicode_string(val: Any) -> bool:
"""Return True if `val` is a unicode string, False for a bytes-like
object."""
- return hasattr(val, 'encode')
+ return hasattr(val, "encode")
@overload
-def make_string_path(dir_name: AnyStr) -> AnyStr: ...
+def make_string_path(dir_name: AnyStr) -> AnyStr:
+ ...
@overload
-def make_string_path(dir_name: os.PathLike) -> str: ...
+def make_string_path(dir_name: os.PathLike) -> str:
+ ...
def make_string_path(dir_name: AnyPath) -> AnyStr:
- return cast(AnyStr, os.fspath(dir_name))
+ return cast(AnyStr, os.fspath(dir_name)) # pytype: disable=invalid-annotation
def to_string(path: Union[AnyStr, Union[str, bytes]]) -> str:
"""Return the string representation of a byte string using the preferred
- encoding, or the string itself if path is a str."""
+ encoding, or the string itself if path is a str."""
if isinstance(path, bytes):
return path.decode(locale.getpreferredencoding(False))
return path
@@ -68,7 +133,7 @@ def to_string(path: Union[AnyStr, Union[str, bytes]]) -> str:
def to_bytes(path: Union[AnyStr, Union[str, bytes]]) -> bytes:
"""Return the bytes representation of a string using the preferred
- encoding, or the byte string itself if path is a byte string."""
+ encoding, or the byte string itself if path is a byte string."""
if isinstance(path, str):
return bytes(path, locale.getpreferredencoding(False))
return path
@@ -93,19 +158,23 @@ def now():
@overload
-def matching_string(matched: bytes, string: AnyStr) -> bytes: ...
+def matching_string(matched: bytes, string: AnyStr) -> bytes:
+ ...
@overload
-def matching_string(matched: str, string: AnyStr) -> str: ...
+def matching_string(matched: str, string: AnyStr) -> str:
+ ...
@overload
-def matching_string(matched: AnyStr, string: None) -> None: ...
+def matching_string(matched: AnyStr, string: None) -> None:
+ ...
def matching_string( # type: ignore[misc]
- matched: AnyStr, string: Optional[AnyStr]) -> Optional[AnyStr]:
+ matched: AnyStr, string: Optional[AnyStr]
+) -> Optional[AnyString]:
"""Return the string as byte or unicode depending
on the type of matched, assuming string is an ASCII string.
"""
@@ -113,7 +182,7 @@ def matching_string( # type: ignore[misc]
return string
if isinstance(matched, bytes) and isinstance(string, str):
return string.encode(locale.getpreferredencoding(False))
- return string
+ return string # pytype: disable=bad-return-type
class FakeStatResult:
@@ -121,11 +190,14 @@ class FakeStatResult:
This is needed as `os.stat_result` has no possibility to set
nanosecond times directly.
"""
- _stat_float_times: bool = True
- def __init__(self, is_windows: bool, user_id: int, group_id: int,
- initial_time: Optional[float] = None):
- self._use_float: Optional[bool] = None
+ def __init__(
+ self,
+ is_windows: bool,
+ user_id: int,
+ group_id: int,
+ initial_time: Optional[float] = None,
+ ):
self.st_mode: int = 0
self.st_ino: Optional[int] = None
self.st_dev: int = 0
@@ -138,29 +210,19 @@ class FakeStatResult:
self._st_mtime_ns: int = self._st_atime_ns
self._st_ctime_ns: int = self._st_atime_ns
- @property
- def use_float(self) -> bool:
- if self._use_float is None:
- return self.stat_float_times()
- return self._use_float
-
- @use_float.setter
- def use_float(self, val: bool) -> None:
- self._use_float = val
-
def __eq__(self, other: Any) -> bool:
return (
- isinstance(other, FakeStatResult) and
- self._st_atime_ns == other._st_atime_ns and
- self._st_ctime_ns == other._st_ctime_ns and
- self._st_mtime_ns == other._st_mtime_ns and
- self.st_size == other.st_size and
- self.st_gid == other.st_gid and
- self.st_uid == other.st_uid and
- self.st_nlink == other.st_nlink and
- self.st_dev == other.st_dev and
- self.st_ino == other.st_ino and
- self.st_mode == other.st_mode
+ isinstance(other, FakeStatResult)
+ and self._st_atime_ns == other._st_atime_ns
+ and self._st_ctime_ns == other._st_ctime_ns
+ and self._st_mtime_ns == other._st_mtime_ns
+ and self.st_size == other.st_size
+ and self.st_gid == other.st_gid
+ and self.st_uid == other.st_uid
+ and self.st_nlink == other.st_nlink
+ and self.st_dev == other.st_dev
+ and self.st_ino == other.st_ino
+ and self.st_mode == other.st_mode
)
def __ne__(self, other: Any) -> bool:
@@ -171,7 +233,6 @@ class FakeStatResult:
behavior of the real os.stat_result.
"""
stat_result = copy(self)
- stat_result.use_float = self.use_float
return stat_result
def set_from_stat_result(self, stat_result: os.stat_result) -> None:
@@ -187,27 +248,10 @@ class FakeStatResult:
self._st_mtime_ns = stat_result.st_mtime_ns
self._st_ctime_ns = stat_result.st_ctime_ns
- @classmethod
- def stat_float_times(cls, newvalue: Optional[bool] = None) -> bool:
- """Determine whether a file's time stamps are reported as floats
- or ints.
-
- Calling without arguments returns the current value.
- The value is shared by all instances of FakeOsModule.
-
- Args:
- newvalue: If `True`, mtime, ctime, atime are reported as floats.
- Otherwise, they are returned as ints (rounding down).
- """
- if newvalue is not None:
- cls._stat_float_times = bool(newvalue)
- return cls._stat_float_times
-
@property
def st_ctime(self) -> Union[int, float]:
"""Return the creation time in seconds."""
- ctime = self._st_ctime_ns / 1e9
- return ctime if self.use_float else int(ctime)
+ return self._st_ctime_ns / 1e9
@st_ctime.setter
def st_ctime(self, val: Union[int, float]) -> None:
@@ -217,8 +261,7 @@ class FakeStatResult:
@property
def st_atime(self) -> Union[int, float]:
"""Return the access time in seconds."""
- atime = self._st_atime_ns / 1e9
- return atime if self.use_float else int(atime)
+ return self._st_atime_ns / 1e9
@st_atime.setter
def st_atime(self, val: Union[int, float]) -> None:
@@ -228,8 +271,7 @@ class FakeStatResult:
@property
def st_mtime(self) -> Union[int, float]:
"""Return the modification time in seconds."""
- mtime = self._st_mtime_ns / 1e9
- return mtime if self.use_float else int(mtime)
+ return self._st_mtime_ns / 1e9
@st_mtime.setter
def st_mtime(self, val: Union[int, float]) -> None:
@@ -247,27 +289,45 @@ class FakeStatResult:
self._st_size = val
@property
+ def st_blocks(self) -> int:
+ """Return the number of 512-byte blocks allocated for the file.
+ Assumes a page size of 4096 (matches most systems).
+ Ignores that this may not be available under some systems,
+ and that the result may differ if the file has holes.
+ """
+ if self.is_windows:
+ raise AttributeError("'os.stat_result' object has no attribute 'st_blocks'")
+ page_size = 4096
+ blocks_in_page = page_size // 512
+ pages = self._st_size // page_size
+ if self._st_size % page_size:
+ pages += 1
+ return pages * blocks_in_page
+
+ @property
def st_file_attributes(self) -> int:
if not self.is_windows:
- raise AttributeError("module 'os.stat_result' "
- "has no attribute 'st_file_attributes'")
+ raise AttributeError(
+ "module 'os.stat_result' " "has no attribute 'st_file_attributes'"
+ )
mode = 0
st_mode = self.st_mode
if st_mode & stat.S_IFDIR:
- mode |= stat.FILE_ATTRIBUTE_DIRECTORY
+ mode |= stat.FILE_ATTRIBUTE_DIRECTORY # type:ignore[attr-defined]
if st_mode & stat.S_IFREG:
- mode |= stat.FILE_ATTRIBUTE_NORMAL
+ mode |= stat.FILE_ATTRIBUTE_NORMAL # type:ignore[attr-defined]
if st_mode & (stat.S_IFCHR | stat.S_IFBLK):
- mode |= stat.FILE_ATTRIBUTE_DEVICE
+ mode |= stat.FILE_ATTRIBUTE_DEVICE # type:ignore[attr-defined]
if st_mode & stat.S_IFLNK:
- mode |= stat.FILE_ATTRIBUTE_REPARSE_POINT
+ mode |= stat.FILE_ATTRIBUTE_REPARSE_POINT # type:ignore
return mode
@property
def st_reparse_tag(self) -> int:
if not self.is_windows or sys.version_info < (3, 8):
- raise AttributeError("module 'os.stat_result' "
- "has no attribute 'st_reparse_tag'")
+ raise AttributeError(
+ "module 'os.stat_result' " "has no attribute 'st_reparse_tag'"
+ )
if self.st_mode & stat.S_IFLNK:
return stat.IO_REPARSE_TAG_SYMLINK # type: ignore[attr-defined]
return 0
@@ -297,7 +357,7 @@ class FakeStatResult:
return int(self.st_mtime)
if item == stat.ST_CTIME:
return int(self.st_ctime)
- raise ValueError('Invalid item')
+ raise ValueError("Invalid item")
@property
def st_atime_ns(self) -> int:
@@ -334,21 +394,23 @@ class BinaryBufferIO(io.BytesIO):
"""Stream class that handles byte contents for files."""
def __init__(self, contents: Optional[bytes]):
- super().__init__(contents or b'')
+ super().__init__(contents or b"")
def putvalue(self, value: bytes) -> None:
self.write(value)
class TextBufferIO(io.TextIOWrapper):
- """Stream class that handles Python string contents for files.
- """
-
- def __init__(self, contents: Optional[bytes] = None,
- newline: Optional[str] = None,
- encoding: Optional[str] = None,
- errors: str = 'strict'):
- self._bytestream = io.BytesIO(contents or b'')
+ """Stream class that handles Python string contents for files."""
+
+ def __init__(
+ self,
+ contents: Optional[bytes] = None,
+ newline: Optional[str] = None,
+ encoding: Optional[str] = None,
+ errors: str = "strict",
+ ):
+ self._bytestream = io.BytesIO(contents or b"")
super().__init__(self._bytestream, encoding, errors, newline)
def getvalue(self) -> bytes: