aboutsummaryrefslogtreecommitdiff
path: root/tests/test_iostream.cpp
blob: 1be0655dfb00424415501c6b57d08af765ebc8d9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
    tests/test_iostream.cpp -- Usage of scoped_output_redirect

    Copyright (c) 2017 Henry F. Schreiner

    All rights reserved. Use of this source code is governed by a
    BSD-style license that can be found in the LICENSE file.
*/

#if defined(_MSC_VER) && _MSC_VER < 1910  // VS 2015's MSVC
#  pragma warning(disable: 4702) // unreachable code in system header (xatomic.h(382))
#endif

#include <pybind11/iostream.h>
#include "pybind11_tests.h"
#include <atomic>
#include <iostream>
#include <thread>


void noisy_function(std::string msg, bool flush) {

    std::cout << msg;
    if (flush)
        std::cout << std::flush;
}

void noisy_funct_dual(std::string msg, std::string emsg) {
    std::cout << msg;
    std::cerr << emsg;
}

// object to manage C++ thread
// simply repeatedly write to std::cerr until stopped
// redirect is called at some point to test the safety of scoped_estream_redirect
struct TestThread {
    TestThread() : t_{nullptr}, stop_{false} {
        auto thread_f = [this] {
            while (!stop_) {
                std::cout << "x" << std::flush;
                std::this_thread::sleep_for(std::chrono::microseconds(50));
            } };
        t_ = new std::thread(std::move(thread_f));
    }

    ~TestThread() {
        delete t_;
    }

    void stop() { stop_ = true; }

    void join() {
        py::gil_scoped_release gil_lock;
        t_->join();
    }

    void sleep() {
        py::gil_scoped_release gil_lock;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    std::thread * t_;
    std::atomic<bool> stop_;
};


TEST_SUBMODULE(iostream, m) {

    add_ostream_redirect(m);

    // test_evals

    m.def("captured_output_default", [](std::string msg) {
        py::scoped_ostream_redirect redir;
        std::cout << msg << std::flush;
    });

    m.def("captured_output", [](std::string msg) {
        py::scoped_ostream_redirect redir(std::cout, py::module_::import("sys").attr("stdout"));
        std::cout << msg << std::flush;
    });

    m.def("guard_output", &noisy_function,
            py::call_guard<py::scoped_ostream_redirect>(),
            py::arg("msg"), py::arg("flush")=true);

    m.def("captured_err", [](std::string msg) {
        py::scoped_ostream_redirect redir(std::cerr, py::module_::import("sys").attr("stderr"));
        std::cerr << msg << std::flush;
    });

    m.def("noisy_function", &noisy_function, py::arg("msg"), py::arg("flush") = true);

    m.def("dual_guard", &noisy_funct_dual,
            py::call_guard<py::scoped_ostream_redirect, py::scoped_estream_redirect>(),
            py::arg("msg"), py::arg("emsg"));

    m.def("raw_output", [](std::string msg) {
        std::cout << msg << std::flush;
    });

    m.def("raw_err", [](std::string msg) {
        std::cerr << msg << std::flush;
    });

    m.def("captured_dual", [](std::string msg, std::string emsg) {
        py::scoped_ostream_redirect redirout(std::cout, py::module_::import("sys").attr("stdout"));
        py::scoped_ostream_redirect redirerr(std::cerr, py::module_::import("sys").attr("stderr"));
        std::cout << msg << std::flush;
        std::cerr << emsg << std::flush;
    });

    py::class_<TestThread>(m, "TestThread")
        .def(py::init<>())
        .def("stop", &TestThread::stop)
        .def("join", &TestThread::join)
        .def("sleep", &TestThread::sleep);
}