aboutsummaryrefslogtreecommitdiff
path: root/pyfakefs/fake_open.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyfakefs/fake_open.py')
-rw-r--r--pyfakefs/fake_open.py361
1 files changed, 361 insertions, 0 deletions
diff --git a/pyfakefs/fake_open.py b/pyfakefs/fake_open.py
new file mode 100644
index 0000000..912ada9
--- /dev/null
+++ b/pyfakefs/fake_open.py
@@ -0,0 +1,361 @@
+# Copyright 2009 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""A fake open() function replacement. See ``fake_filesystem`` for usage.
+"""
+import errno
+import os
+import sys
+from collections import namedtuple
+from stat import (
+ S_ISDIR,
+)
+from typing import (
+ Optional,
+ Union,
+ Any,
+ Tuple,
+ cast,
+ AnyStr,
+ TYPE_CHECKING,
+)
+
+from pyfakefs import helpers
+from pyfakefs.fake_file import (
+ FakePipeWrapper,
+ FakeFileWrapper,
+ FakeFile,
+ AnyFileWrapper,
+)
+from pyfakefs.helpers import (
+ AnyString,
+ is_root,
+ PERM_READ,
+ PERM_WRITE,
+)
+
+if TYPE_CHECKING:
+ from pyfakefs.fake_filesystem import FakeFilesystem
+
+
+_OpenModes = namedtuple(
+ "_OpenModes",
+ "must_exist can_read can_write truncate append must_not_exist",
+)
+
+_OPEN_MODE_MAP = {
+ # mode name:(file must exist, can read, can write,
+ # truncate, append, must not exist)
+ "r": (True, True, False, False, False, False),
+ "w": (False, False, True, True, False, False),
+ "a": (False, False, True, False, True, False),
+ "r+": (True, True, True, False, False, False),
+ "w+": (False, True, True, True, False, False),
+ "a+": (False, True, True, False, True, False),
+ "x": (False, False, True, False, False, True),
+ "x+": (False, True, True, False, False, True),
+}
+
+
+class FakeFileOpen:
+ """Faked `file()` and `open()` function replacements.
+
+ Returns FakeFile objects in a FakeFilesystem in place of the `file()`
+ or `open()` function.
+ """
+
+ __name__ = "FakeFileOpen"
+
+ def __init__(
+ self,
+ filesystem: "FakeFilesystem",
+ delete_on_close: bool = False,
+ raw_io: bool = False,
+ ):
+ """
+ Args:
+ filesystem: FakeFilesystem used to provide file system information
+ delete_on_close: optional boolean, deletes file on close()
+ """
+ self.filesystem = filesystem
+ self._delete_on_close = delete_on_close
+ self.raw_io = raw_io
+
+ def __call__(self, *args: Any, **kwargs: Any) -> AnyFileWrapper:
+ """Redirects calls to file() or open() to appropriate method."""
+ return self.call(*args, **kwargs)
+
+ def call(
+ self,
+ file_: Union[AnyStr, int],
+ mode: str = "r",
+ buffering: int = -1,
+ encoding: Optional[str] = None,
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ closefd: bool = True,
+ opener: Any = None,
+ open_modes: Optional[_OpenModes] = None,
+ ) -> AnyFileWrapper:
+ """Return a file-like object with the contents of the target
+ file object.
+
+ Args:
+ file_: Path to target file or a file descriptor.
+ mode: Additional file modes (all modes in `open()` are supported).
+ buffering: the buffer size used for writing. Data will only be
+ flushed if buffer size is exceeded. The default (-1) uses a
+ system specific default buffer size. Text line mode (e.g.
+ buffering=1 in text mode) is not supported.
+ encoding: The encoding used to encode unicode strings / decode
+ bytes.
+ errors: (str) Defines how encoding errors are handled.
+ newline: Controls universal newlines, passed to stream object.
+ closefd: If a file descriptor rather than file name is passed,
+ and this is set to `False`, then the file descriptor is kept
+ open when file is closed.
+ opener: an optional function object that will be called with
+ `file_` and the open flags (derived from `mode`) and returns
+ a file descriptor.
+ open_modes: Modes for opening files if called from low-level API.
+
+ Returns:
+ A file-like object containing the contents of the target file.
+
+ Raises:
+ OSError depending on Python version / call mode:
+ - if the target object is a directory
+ - on an invalid path
+ - if the file does not exist when it should
+ - if the file exists but should not
+ - if permission is denied
+ ValueError: for an invalid mode or mode combination
+ """
+ binary = "b" in mode
+
+ if binary and encoding:
+ raise ValueError("binary mode doesn't take an encoding argument")
+
+ newline, open_modes = self._handle_file_mode(mode, newline, open_modes)
+
+ # the pathlib opener is defined in a Path instance that may not be
+ # patched under some circumstances; as it just calls standard open(),
+ # we may ignore it, as it would not change the behavior
+ if opener is not None and opener.__module__ != "pathlib":
+ # opener shall return a file descriptor, which will be handled
+ # here as if directly passed
+ file_ = opener(file_, self._open_flags_from_open_modes(open_modes))
+
+ file_object, file_path, filedes, real_path = self._handle_file_arg(file_)
+ if file_object is None and file_path is None:
+ # file must be a fake pipe wrapper, find it...
+ if (
+ filedes is None
+ or len(self.filesystem.open_files) <= filedes
+ or not self.filesystem.open_files[filedes]
+ ):
+ raise OSError(errno.EBADF, "invalid pipe file descriptor")
+ wrappers = self.filesystem.open_files[filedes]
+ assert wrappers is not None
+ existing_wrapper = wrappers[0]
+ assert isinstance(existing_wrapper, FakePipeWrapper)
+ wrapper = FakePipeWrapper(
+ self.filesystem,
+ existing_wrapper.fd,
+ existing_wrapper.can_write,
+ mode,
+ )
+ file_des = self.filesystem._add_open_file(wrapper)
+ wrapper.filedes = file_des
+ return wrapper
+
+ assert file_path is not None
+ if not filedes:
+ closefd = True
+
+ if (
+ not opener
+ and open_modes.must_not_exist
+ and (
+ file_object
+ or self.filesystem.islink(file_path)
+ and not self.filesystem.is_windows_fs
+ )
+ ):
+ self.filesystem.raise_os_error(errno.EEXIST, file_path)
+
+ assert real_path is not None
+ file_object = self._init_file_object(
+ file_object, file_path, open_modes, real_path
+ )
+
+ if S_ISDIR(file_object.st_mode):
+ if self.filesystem.is_windows_fs:
+ self.filesystem.raise_os_error(errno.EACCES, file_path)
+ else:
+ self.filesystem.raise_os_error(errno.EISDIR, file_path)
+
+ # If you print obj.name, the argument to open() must be printed.
+ # Not the abspath, not the filename, but the actual argument.
+ file_object.opened_as = file_path
+ if open_modes.truncate:
+ current_time = helpers.now()
+ file_object.st_mtime = current_time
+ if not self.filesystem.is_windows_fs:
+ file_object.st_ctime = current_time
+
+ fakefile = FakeFileWrapper(
+ file_object,
+ file_path,
+ update=open_modes.can_write,
+ read=open_modes.can_read,
+ append=open_modes.append,
+ delete_on_close=self._delete_on_close,
+ filesystem=self.filesystem,
+ newline=newline,
+ binary=binary,
+ closefd=closefd,
+ encoding=encoding,
+ errors=errors,
+ buffering=buffering,
+ raw_io=self.raw_io,
+ )
+ if filedes is not None:
+ fakefile.filedes = filedes
+ # replace the file wrapper
+ open_files_list = self.filesystem.open_files[filedes]
+ assert open_files_list is not None
+ open_files_list.append(fakefile)
+ else:
+ fakefile.filedes = self.filesystem._add_open_file(fakefile)
+ return fakefile
+
+ @staticmethod
+ def _open_flags_from_open_modes(open_modes: _OpenModes) -> int:
+ flags = 0
+ if open_modes.can_read and open_modes.can_write:
+ flags |= os.O_RDWR
+ elif open_modes.can_read:
+ flags |= os.O_RDONLY
+ elif open_modes.can_write:
+ flags |= os.O_WRONLY
+
+ if open_modes.append:
+ flags |= os.O_APPEND
+ if open_modes.truncate:
+ flags |= os.O_TRUNC
+ if not open_modes.must_exist and open_modes.can_write:
+ flags |= os.O_CREAT
+ if open_modes.must_not_exist and open_modes.can_write:
+ flags |= os.O_EXCL
+ return flags
+
+ def _init_file_object(
+ self,
+ file_object: Optional[FakeFile],
+ file_path: AnyStr,
+ open_modes: _OpenModes,
+ real_path: AnyString,
+ ) -> FakeFile:
+ if file_object:
+ if not is_root() and (
+ (open_modes.can_read and not file_object.st_mode & PERM_READ)
+ or (open_modes.can_write and not file_object.st_mode & PERM_WRITE)
+ ):
+ self.filesystem.raise_os_error(errno.EACCES, file_path)
+ if open_modes.can_write:
+ if open_modes.truncate:
+ file_object.set_contents("")
+ else:
+ if open_modes.must_exist:
+ self.filesystem.raise_os_error(errno.ENOENT, file_path)
+ if self.filesystem.islink(file_path):
+ link_object = self.filesystem.resolve(file_path, follow_symlinks=False)
+ assert link_object.contents is not None
+ target_path = cast(
+ AnyStr, link_object.contents
+ ) # pytype: disable=invalid-annotation
+ else:
+ target_path = file_path
+ if self.filesystem.ends_with_path_separator(target_path):
+ error = (
+ errno.EINVAL
+ if self.filesystem.is_windows_fs
+ else errno.ENOENT
+ if self.filesystem.is_macos
+ else errno.EISDIR
+ )
+ self.filesystem.raise_os_error(error, file_path)
+ file_object = self.filesystem.create_file_internally(
+ real_path, create_missing_dirs=False, apply_umask=True
+ )
+ return file_object
+
+ def _handle_file_arg(
+ self, file_: Union[AnyStr, int]
+ ) -> Tuple[Optional[FakeFile], Optional[AnyStr], Optional[int], Optional[AnyStr]]:
+ file_object = None
+ if isinstance(file_, int):
+ # opening a file descriptor
+ filedes: int = file_
+ wrapper = self.filesystem.get_open_file(filedes)
+ if isinstance(wrapper, FakePipeWrapper):
+ return None, None, filedes, None
+ if isinstance(wrapper, FakeFileWrapper):
+ self._delete_on_close = wrapper.delete_on_close
+ file_object = cast(
+ FakeFile, self.filesystem.get_open_file(filedes).get_object()
+ )
+ assert file_object is not None
+ path = file_object.name
+ return (
+ file_object,
+ cast(AnyStr, path), # pytype: disable=invalid-annotation
+ filedes,
+ cast(AnyStr, path), # pytype: disable=invalid-annotation
+ )
+
+ # open a file file by path
+ file_path = cast(AnyStr, file_) # pytype: disable=invalid-annotation
+ if file_path == self.filesystem.dev_null.name:
+ file_object = self.filesystem.dev_null
+ real_path = file_path
+ else:
+ real_path = self.filesystem.resolve_path(file_path)
+ if self.filesystem.exists(file_path):
+ file_object = self.filesystem.get_object_from_normpath(
+ real_path, check_read_perm=False
+ )
+ return file_object, file_path, None, real_path
+
+ def _handle_file_mode(
+ self,
+ mode: str,
+ newline: Optional[str],
+ open_modes: Optional[_OpenModes],
+ ) -> Tuple[Optional[str], _OpenModes]:
+ orig_modes = mode # Save original modes for error messages.
+ # Normalize modes. Handle 't' and 'U'.
+ if ("b" in mode and "t" in mode) or (
+ sys.version_info > (3, 10) and "U" in mode
+ ):
+ raise ValueError("Invalid mode: " + mode)
+ mode = mode.replace("t", "").replace("b", "")
+ mode = mode.replace("rU", "r").replace("U", "r")
+ if not self.raw_io:
+ if mode not in _OPEN_MODE_MAP:
+ raise ValueError("Invalid mode: %r" % orig_modes)
+ open_modes = _OpenModes(*_OPEN_MODE_MAP[mode])
+ assert open_modes is not None
+ return newline, open_modes