aboutsummaryrefslogtreecommitdiff
path: root/pyfakefs/fake_scandir.py
blob: cfe9f2f9df69cc97e5114ffcfe95e70ad27a68e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A fake implementation for the `scandir` function working with
FakeFilesystem.
Works with both the function integrated into the `os` module since Python 3.5
and the standalone function available in the standalone `scandir` python
package.
"""
import os
import sys

from pyfakefs.extra_packages import use_scandir_package
from pyfakefs.helpers import to_string, make_string_path

if sys.version_info >= (3, 6):
    BaseClass = os.PathLike
else:
    BaseClass = object


class DirEntry(BaseClass):
    """Emulates os.DirEntry. Note that we did not enforce keyword only
    arguments."""

    def __init__(self, filesystem):
        """Initialize the dir entry with unset values.

        Args:
            filesystem: the fake filesystem used for implementation.
        """
        self._filesystem = filesystem
        self.name = ""
        self.path = ""
        self._abspath = ""
        self._inode = None
        self._islink = False
        self._isdir = False
        self._statresult = None
        self._statresult_symlink = None

    def inode(self):
        """Return the inode number of the entry."""
        if self._inode is None:
            self.stat(follow_symlinks=False)
        return self._inode

    def is_dir(self, follow_symlinks=True):
        """Return True if this entry is a directory entry.

        Args:
            follow_symlinks: If True, also return True if this entry is a
                symlink pointing to a directory.

        Returns:
            True if this entry is an existing directory entry, or if
                follow_symlinks is set, and this entry points to an existing
                directory entry.
        """
        return self._isdir and (follow_symlinks or not self._islink)

    def is_file(self, follow_symlinks=True):
        """Return True if this entry is a regular file entry.

        Args:
            follow_symlinks: If True, also return True if this entry is a
                symlink pointing to a regular file.

        Returns:
            True if this entry is an existing file entry, or if
                follow_symlinks is set, and this entry points to an existing
                file entry.
        """
        return not self._isdir and (follow_symlinks or not self._islink)

    def is_symlink(self):
        """Return True if this entry is a symbolic link (even if broken)."""
        return self._islink

    def stat(self, follow_symlinks=True):
        """Return a stat_result object for this entry.

        Args:
            follow_symlinks: If False and the entry is a symlink, return the
                result for the symlink, otherwise for the object it points to.
        """
        if follow_symlinks:
            if self._statresult_symlink is None:
                file_object = self._filesystem.resolve(self._abspath)
                self._statresult_symlink = file_object.stat_result.copy()
                if self._filesystem.is_windows_fs:
                    self._statresult_symlink.st_nlink = 0
            return self._statresult_symlink

        if self._statresult is None:
            file_object = self._filesystem.lresolve(self._abspath)
            self._inode = file_object.st_ino
            self._statresult = file_object.stat_result.copy()
            if self._filesystem.is_windows_fs:
                self._statresult.st_nlink = 0
        return self._statresult

    if sys.version_info >= (3, 6):

        def __fspath__(self):
            return self.path

    if sys.version_info >= (3, 12):

        def is_junction(self) -> bool:
            """Return True if this entry is a junction.
            Junctions are not a part of posix semantic."""
            if not self._filesystem.is_windows_fs:
                return False
            file_object = self._filesystem.resolve(self._abspath)
            return file_object.is_junction


class ScanDirIter:
    """Iterator for DirEntry objects returned from `scandir()`
    function."""

    def __init__(self, filesystem, path):
        self.filesystem = filesystem
        if isinstance(path, int):
            if not use_scandir_package and (
                sys.version_info < (3, 7) or self.filesystem.is_windows_fs
            ):
                raise NotImplementedError(
                    "scandir does not support file descriptor " "path argument"
                )
            self.abspath = self.filesystem.absnormpath(
                self.filesystem.get_open_file(path).get_object().path
            )
            self.path = ""
        else:
            path = make_string_path(path)
            self.abspath = self.filesystem.absnormpath(path)
            self.path = to_string(path)
        entries = self.filesystem.confirmdir(self.abspath).entries
        self.entry_iter = iter(entries)

    def __iter__(self):
        return self

    def __next__(self):
        entry = self.entry_iter.__next__()
        dir_entry = DirEntry(self.filesystem)
        dir_entry.name = entry
        dir_entry.path = self.filesystem.joinpaths(self.path, dir_entry.name)
        dir_entry._abspath = self.filesystem.joinpaths(self.abspath, dir_entry.name)
        dir_entry._isdir = self.filesystem.isdir(dir_entry._abspath)
        dir_entry._islink = self.filesystem.islink(dir_entry._abspath)
        return dir_entry

    if sys.version_info >= (3, 6):

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()

        def close(self):
            pass


def scandir(filesystem, path=""):
    """Return an iterator of DirEntry objects corresponding to the entries
    in the directory given by path.

    Args:
        filesystem: The fake filesystem used for implementation
        path: Path to the target directory within the fake filesystem.

    Returns:
        an iterator to an unsorted list of os.DirEntry objects for
        each entry in path.

    Raises:
        OSError: if the target is not a directory.
    """
    return ScanDirIter(filesystem, path)


def _classify_directory_contents(filesystem, root):
    """Classify contents of a directory as files/directories.

    Args:
        filesystem: The fake filesystem used for implementation
        root: (str) Directory to examine.

    Returns:
        (tuple) A tuple consisting of three values: the directory examined,
        a list containing all of the directory entries, and a list
        containing all of the non-directory entries.
        (This is the same format as returned by the `os.walk` generator.)

    Raises:
        Nothing on its own, but be ready to catch exceptions generated by
        underlying mechanisms like `os.listdir`.
    """
    dirs = []
    files = []
    for entry in filesystem.listdir(root):
        if filesystem.isdir(filesystem.joinpaths(root, entry)):
            dirs.append(entry)
        else:
            files.append(entry)
    return root, dirs, files


def walk(filesystem, top, topdown=True, onerror=None, followlinks=False):
    """Perform an os.walk operation over the fake filesystem.

    Args:
        filesystem: The fake filesystem used for implementation
        top: The root directory from which to begin walk.
        topdown: Determines whether to return the tuples with the root as
            the first entry (`True`) or as the last, after all the child
            directory tuples (`False`).
      onerror: If not `None`, function which will be called to handle the
            `os.error` instance provided when `os.listdir()` fails.
      followlinks: If `True`, symbolic links are followed.

    Yields:
        (path, directories, nondirectories) for top and each of its
        subdirectories.  See the documentation for the builtin os module
        for further details.
    """

    def do_walk(top_dir, top_most=False):
        if not top_most and not followlinks and filesystem.islink(top_dir):
            return
        try:
            top_contents = _classify_directory_contents(filesystem, top_dir)
        except OSError as exc:
            top_contents = None
            if onerror is not None:
                onerror(exc)

        if top_contents is not None:
            if topdown:
                yield top_contents

            for directory in top_contents[1]:
                path = filesystem.joinpaths(top_dir, directory)
                if not followlinks and filesystem.islink(path):
                    continue
                for contents in do_walk(path):
                    yield contents
            if not topdown:
                yield top_contents

    return do_walk(to_string(top), top_most=True)


class FakeScanDirModule:
    """Uses FakeFilesystem to provide a fake `scandir` module replacement.

    .. Note:: The ``scandir`` function is a part of the standard ``os`` module
      since Python 3.5. This class handles the separate ``scandir`` module
      that is available on pypi.

    You need a fake_filesystem to use this:
    `filesystem = fake_filesystem.FakeFilesystem()`
    `fake_scandir_module = fake_filesystem.FakeScanDirModule(filesystem)`
    """

    @staticmethod
    def dir():
        """Return the list of patched function names. Used for patching
        functions imported from the module.
        """
        return "scandir", "walk"

    def __init__(self, filesystem):
        self.filesystem = filesystem

    def scandir(self, path="."):
        """Return an iterator of DirEntry objects corresponding to the entries
        in the directory given by path.

        Args:
            path: Path to the target directory within the fake filesystem.

        Returns:
            an iterator to an unsorted list of os.DirEntry objects for
            each entry in path.

        Raises:
            OSError: if the target is not a directory.
        """
        return scandir(self.filesystem, path)

    def walk(self, top, topdown=True, onerror=None, followlinks=False):
        """Perform a walk operation over the fake filesystem.

        Args:
            top: The root directory from which to begin walk.
            topdown: Determines whether to return the tuples with the root as
                the first entry (`True`) or as the last, after all the child
                directory tuples (`False`).
          onerror: If not `None`, function which will be called to handle the
                `os.error` instance provided when `os.listdir()` fails.
          followlinks: If `True`, symbolic links are followed.

        Yields:
            (path, directories, nondirectories) for top and each of its
            subdirectories.  See the documentation for the builtin os module
            for further details.
        """
        return walk(self.filesystem, top, topdown, onerror, followlinks)