aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKapileshwar Singh <kapileshwar.singh@arm.com>2015-12-11 16:01:04 +0000
committerKapileshwar Singh <kapileshwar.singh@arm.com>2015-12-12 23:08:25 +0000
commitd437831e4e605bcc37d8ab80d12c8ae395f2b6aa (patch)
tree0f6a620abc12273bafc139bc583a71b427e7eaa2
parent03b31786ba6059157f625111615f12abd575853a (diff)
downloadbart-d437831e4e605bcc37d8ab80d12c8ae395f2b6aa.tar.gz
sched: functions: Moved from trappy.stats.SchedConf
Update the respective usage of SchedConf to that from bart.sched.functions Signed-off-by: Kapileshwar Singh <kapileshwar.singh@arm.com>
-rwxr-xr-xbart/sched/SchedAssert.py40
-rwxr-xr-xbart/sched/SchedMatrix.py14
-rwxr-xr-xbart/sched/SchedMultiAssert.py6
-rw-r--r--bart/sched/functions.py597
4 files changed, 627 insertions, 30 deletions
diff --git a/bart/sched/SchedAssert.py b/bart/sched/SchedAssert.py
index f9d19f4..92dbc4a 100755
--- a/bart/sched/SchedAssert.py
+++ b/bart/sched/SchedAssert.py
@@ -23,7 +23,7 @@ import trappy
import itertools
import math
from trappy.stats.Aggregator import MultiTriggerAggregator
-from trappy.stats import SchedConf as sconf
+from bart.sched import functions as sched_funcs
from bart.common import Utils
import numpy as np
@@ -77,7 +77,7 @@ class SchedAssert(object):
self._pid = self._validate_pid(pid)
self._aggs = {}
self._topology = topology
- self._triggers = sconf.sched_triggers(self._run, self._pid,
+ self._triggers = sched_funcs.sched_triggers(self._run, self._pid,
trappy.sched.SchedSwitch)
self.name = "{}-{}".format(self.execname, self._pid)
@@ -85,7 +85,7 @@ class SchedAssert(object):
"""Validate the passed pid argument"""
if not pid:
- pids = sconf.get_pids_for_process(self._run,
+ pids = sched_funcs.get_pids_for_process(self._run,
self.execname)
if len(pids) != 1:
@@ -98,7 +98,7 @@ class SchedAssert(object):
elif self.execname:
- pids = sconf.get_pids_for_process(self._run,
+ pids = sched_funcs.get_pids_for_process(self._run,
self.execname)
if pid not in pids:
raise RuntimeError(
@@ -106,7 +106,7 @@ class SchedAssert(object):
pid,
self.execname))
else:
- self.execname = sconf.get_task_name(self._run, pid)
+ self.execname = sched_funcs.get_task_name(self._run, pid)
return pid
@@ -172,7 +172,7 @@ class SchedAssert(object):
# Get the index of the node in the level
node_index = self._topology.get_index(level, node)
- agg = self._aggregator(sconf.residency_sum)
+ agg = self._aggregator(sched_funcs.residency_sum)
level_result = agg.aggregate(level=level, window=window)
node_value = level_result[node_index]
@@ -243,8 +243,8 @@ class SchedAssert(object):
:return: The first time the task ran across all the CPUs
"""
- agg = self._aggregator(sconf.first_time)
- result = agg.aggregate(level="all", value=sconf.TASK_RUNNING)
+ agg = self._aggregator(sched_funcs.first_time)
+ result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
return min(result[0])
def getEndTime(self):
@@ -253,9 +253,9 @@ class SchedAssert(object):
all the CPUs
"""
- agg = self._aggregator(sconf.first_time)
- agg = self._aggregator(sconf.last_time)
- result = agg.aggregate(level="all", value=sconf.TASK_RUNNING)
+ agg = self._aggregator(sched_funcs.first_time)
+ agg = self._aggregator(sched_funcs.last_time)
+ result = agg.aggregate(level="all", value=sched_funcs.TASK_RUNNING)
return max(result[0])
def _relax_switch_window(self, series, direction, window):
@@ -277,8 +277,8 @@ class SchedAssert(object):
even in the extended window
"""
- series = series[series == sconf.TASK_RUNNING]
- w_series = sconf.select_window(series, window)
+ series = series[series == sched_funcs.TASK_RUNNING]
+ w_series = sched_funcs.select_window(series, window)
start, stop = window
if direction == "left":
@@ -286,7 +286,7 @@ class SchedAssert(object):
return w_series.index.values[-1]
else:
start_time = self.getStartTime()
- w_series = sconf.select_window(
+ w_series = sched_funcs.select_window(
series,
window=(
start_time,
@@ -302,7 +302,7 @@ class SchedAssert(object):
return w_series.index.values[0]
else:
end_time = self.getEndTime()
- w_series = sconf.select_window(series, window=(stop, end_time))
+ w_series = sched_funcs.select_window(series, window=(stop, end_time))
if not len(w_series):
return None
@@ -344,7 +344,7 @@ class SchedAssert(object):
from_node_index = self._topology.get_index(level, from_node)
to_node_index = self._topology.get_index(level, to_node)
- agg = self._aggregator(sconf.csum)
+ agg = self._aggregator(sched_funcs.csum)
level_result = agg.aggregate(level=level)
from_node_result = level_result[from_node_index]
@@ -379,7 +379,7 @@ class SchedAssert(object):
.. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertRuntime`
"""
- agg = self._aggregator(sconf.residency_sum)
+ agg = self._aggregator(sched_funcs.residency_sum)
run_time = agg.aggregate(level="all", window=window)[0]
if percent:
@@ -470,7 +470,7 @@ class SchedAssert(object):
.. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertPeriod`
"""
- agg = self._aggregator(sconf.period)
+ agg = self._aggregator(sched_funcs.period)
deltas = agg.aggregate(level="all", window=window)[0]
if not len(deltas):
@@ -586,7 +586,7 @@ class SchedAssert(object):
.. seealso:: :mod:`bart.sched.SchedAssert.SchedAssert.assertFirstCPU`
"""
- agg = self._aggregator(sconf.first_cpu)
+ agg = self._aggregator(sched_funcs.first_cpu)
result = agg.aggregate(level="cpu", window=window)
result = list(itertools.chain.from_iterable(result))
@@ -619,7 +619,7 @@ class SchedAssert(object):
:mod:`bart.sched.SchedMultiAssert` class for plotting data
"""
- agg = self._aggregator(sconf.trace_event)
+ agg = self._aggregator(sched_funcs.trace_event)
result = agg.aggregate(level=level, window=window)
events = []
diff --git a/bart/sched/SchedMatrix.py b/bart/sched/SchedMatrix.py
index a01ad71..5088c7b 100755
--- a/bart/sched/SchedMatrix.py
+++ b/bart/sched/SchedMatrix.py
@@ -70,7 +70,7 @@ import trappy
import numpy as np
from trappy.stats.Aggregator import MultiTriggerAggregator
from trappy.stats.Correlator import Correlator
-from trappy.stats import SchedConf as sconf
+from bart.sched import functions as sched_funcs
from bart.common import Utils
POSITIVE_TOLERANCE = 0.80
@@ -150,7 +150,7 @@ class SchedMatrix(object):
trace,
topology,
execnames,
- aggfunc=sconf.csum):
+ aggfunc=sched_funcs.csum):
run = Utils.init_run(trace)
reference_run = Utils.init_run(reference_trace)
@@ -171,12 +171,12 @@ class SchedMatrix(object):
"""Populate the qualifying PIDs from the run"""
if len(self._execnames) == 1:
- return sconf.get_pids_for_process(run, self._execnames[0])
+ return sched_funcs.get_pids_for_process(run, self._execnames[0])
pids = []
for proc in self._execnames:
- pids += sconf.get_pids_for_process(run, proc)
+ pids += sched_funcs.get_pids_for_process(run, proc)
return list(set(pids))
@@ -190,7 +190,7 @@ class SchedMatrix(object):
reference_aggs.append(
MultiTriggerAggregator(
- sconf.sched_triggers(
+ sched_funcs.sched_triggers(
reference_run,
self._reference_pids[idx],
trappy.sched.SchedSwitch
@@ -200,7 +200,7 @@ class SchedMatrix(object):
aggs.append(
MultiTriggerAggregator(
- sconf.sched_triggers(
+ sched_funcs.sched_triggers(
run,
self._pids[idx],
trappy.sched.SchedSwitch
@@ -222,7 +222,7 @@ class SchedMatrix(object):
corr = Correlator(
ref_result,
test_result,
- corrfunc=sconf.binary_correlate,
+ corrfunc=sched_funcs.binary_correlate,
filter_gaps=True)
_, total = corr.correlate(level="cluster")
diff --git a/bart/sched/SchedMultiAssert.py b/bart/sched/SchedMultiAssert.py
index 94404d7..62de8bd 100755
--- a/bart/sched/SchedMultiAssert.py
+++ b/bart/sched/SchedMultiAssert.py
@@ -19,7 +19,7 @@ statistics aggregation framework"""
import re
import inspect
import trappy
-from trappy.stats import SchedConf as sconf
+from bart.sched import functions as sched_funcs
from bart.sched.SchedAssert import SchedAssert
from bart.common import Utils
@@ -167,12 +167,12 @@ class SchedMultiAssert(object):
"""Map the input execnames to PIDs"""
if len(self._execnames) == 1:
- return sconf.get_pids_for_process(self._run, self._execnames[0])
+ return sched_funcs.get_pids_for_process(self._run, self._execnames[0])
pids = []
for proc in self._execnames:
- pids += sconf.get_pids_for_process(self._run, proc)
+ pids += sched_funcs.get_pids_for_process(self._run, proc)
return list(set(pids))
diff --git a/bart/sched/functions.py b/bart/sched/functions.py
new file mode 100644
index 0000000..5353e39
--- /dev/null
+++ b/bart/sched/functions.py
@@ -0,0 +1,597 @@
+# Copyright 2015-2015 ARM Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Scheduler specific Functionality for the
+stats framework
+
+The Scheduler stats aggregation is based on a signal
+which is generated by the combination of two triggers
+from the events with the following parameters
+
+========================= ============ =============
+EVENT VALUE FILTERS
+========================= ============ =============
+:func:`sched_switch` 1 next_pid
+:func:`sched_switch` -1 prev_pid
+========================= ============ =============
+
+Both these Triggers are provided by the event
+:mod:`trappy.sched.SchedSwitch` which correspond to
+the :code:`sched_switch` unique word in the trace
+
+.. seealso:: :mod:`trappy.stats.Trigger.Trigger`
+
+Using the above information the following signals are
+generated.
+
+**EVENT SERIES**
+
+This is a combination of the two triggers as specified
+above and has alternating +/- 1 values and is merely
+a representation of the position in time when the process
+started or stopped running on a CPU
+
+**RESIDENCY SERIES**
+
+This series is a cumulative sum of the event series and
+is a representation of the continuous residency of the
+process on a CPU
+
+The pivot for the aggregators is the CPU on which the
+event occurred on. If N is the number of CPUs in the
+system, N signal for each CPU are generated. These signals
+can then be aggregated by specifying a Topology
+
+.. seealso:: :mod:`trappy.stats.Topology.Topology`
+"""
+
+import numpy as np
+from trappy.stats.Trigger import Trigger
+
+WINDOW_SIZE = 0.0001
+"""A control config for filter events. Some analyses
+may require ignoring of small interruptions"""
+
+# Trigger Values
+SCHED_SWITCH_IN = 1
+"""Value of the event when a task is **switch in**
+or scheduled on a CPU"""
+SCHED_SWITCH_OUT = -1
+"""Value of the event when a task is **switched out**
+or relinquishes a CPU"""
+NO_EVENT = 0
+"""Signifies no event on an event trace"""
+
+# Field Names
+CPU_FIELD = "__cpu"
+"""The column in the sched_switch event that
+indicates the CPU on which the event occurred
+"""
+NEXT_PID_FIELD = "next_pid"
+"""The column in the sched_switch event that
+indicates the PID of the next process to be scheduled
+"""
+PREV_PID_FIELD = "prev_pid"
+"""The column in the sched_switch event that
+indicates the PID of the process that was scheduled
+in
+"""
+TASK_RUNNING = 1
+"""The column in the sched_switch event that
+indicates the CPU on which the event occurred
+"""
+TASK_NOT_RUNNING = 0
+"""In a residency series, a zero indicates
+that the task is not running
+"""
+TIME_INVAL = -1
+"""Standard Value to indicate invalid time data"""
+SERIES_SANTIZED = "_sched_sanitized"
+"""A memoized flag which is set when an event series
+is checked for boundary conditions
+"""
+
+
+def sanitize_asymmetry(series, window=None):
+ """Sanitize the cases when a :code:`SWITCH_OUT`
+ happens before a :code:`SWITCH_IN`. (The case when
+ a process is already running before the trace started)
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+ """
+
+ if not hasattr(series, SERIES_SANTIZED):
+
+ events = series[series != 0]
+ if len(series) >= 2 and len(events):
+ if series.values[0] == SCHED_SWITCH_OUT:
+ series.values[0] = TASK_NOT_RUNNING
+
+ elif events.values[0] == SCHED_SWITCH_OUT:
+ series.values[0] = SCHED_SWITCH_IN
+ if window:
+ series.index.values[0] = window[0]
+
+ if series.values[-1] == SCHED_SWITCH_IN:
+ series.values[-1] = TASK_NOT_RUNNING
+
+ elif events.values[-1] == SCHED_SWITCH_IN:
+ series.values[-1] = SCHED_SWITCH_OUT
+ if window:
+ series.index.values[-1] = window[1]
+
+ # No point if the series just has one value and
+ # one event. We do not have sufficient data points
+ # for any calculation. We should Ideally never reach
+ # here.
+ elif len(series) == 1:
+ series.values[0] = 0
+
+ setattr(series, SERIES_SANTIZED, True)
+
+ return series
+
+
+def csum(series, window=None, filter_gaps=False):
+ """:func:`aggfunc` for the cumulative sum of the
+ input series data
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+
+ :param filter_gaps: If set, a process being switched out
+ for :mod:`bart.sched.functions.WINDOW_SIZE` is
+ ignored. This is helpful when small interruptions need
+ to be ignored to compare overall correlation
+ :type filter_gaps: bool
+ """
+
+ if filter_gaps:
+ series = filter_small_gaps(series)
+
+ series = series.cumsum()
+ return select_window(series, window)
+
+def filter_small_gaps(series):
+ """A helper function that does filtering of gaps
+ in residency series < :mod:`bart.sched.functions.WINDOW_SIZE`
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+ """
+
+ start = None
+ for index, value in series.iteritems():
+
+ if value == SCHED_SWITCH_IN:
+ if start == None:
+ continue
+
+ if index - start < WINDOW_SIZE:
+ series[start] = NO_EVENT
+ series[index] = NO_EVENT
+ start = None
+
+ if value == SCHED_SWITCH_OUT:
+ start = index
+
+ return series
+
+def first_cpu(series, window=None):
+ """:func:`aggfunc` to calculate the time of
+ the first switch in event in the series
+ This is returned as a vector of unit length
+ so that it can be aggregated and reduced across
+ nodes to find the first cpu of a task
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+ """
+ series = select_window(series, window)
+ series = series[series == SCHED_SWITCH_IN]
+ if len(series):
+ return [series.index.values[0]]
+ else:
+ return [float("inf")]
+
+def select_window(series, window):
+ """Helper Function to select a portion of
+ pandas time series
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+ """
+
+ if not window:
+ return series
+
+ start, stop = window
+ ix = series.index
+ selector = ((ix >= start) & (ix <= stop))
+ window_series = series[selector]
+ return window_series
+
+def residency_sum(series, window=None):
+ """:func:`aggfunc` to calculate the total
+ residency
+
+
+ The input series is processed for
+ intervals between a :mod:`bart.sched.functions.SCHED_SWITCH_OUT`
+ and :mod:`bart.sched.functions.SCHED_SWITCH_IN` to track
+ additive residency of a task
+
+ .. math::
+
+ S_{in} = i_{1}, i_{2}...i_{N} \\\\
+ S_{out} = o_{1}, o_{2}...o_{N} \\\\
+ R_{total} = \sum_{k}^{N}\Delta_k = \sum_{k}^{N}(o_{k} - i_{k})
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+
+ :return: A scalar float value
+ """
+
+ if not len(series):
+ return 0.0
+
+ org_series = series
+ series = select_window(series, window)
+ series = sanitize_asymmetry(series, window)
+
+ s_in = series[series == SCHED_SWITCH_IN]
+ s_out = series[series == SCHED_SWITCH_OUT]
+
+ if not (len(s_in) and len(s_out)):
+ try:
+ org_series = sanitize_asymmetry(org_series)
+ running = select_window(org_series.cumsum(), window)
+ if running.values[0] == TASK_RUNNING and running.values[-1] == TASK_RUNNING:
+ return window[1] - window[0]
+ except Exception,e:
+ pass
+
+ if len(s_in) != len(s_out):
+ raise RuntimeError(
+ "Unexpected Lengths: s_in={}, s_out={}".format(
+ len(s_in),
+ len(s_out)))
+ else:
+ return np.sum(s_out.index.values - s_in.index.values)
+
+
+def first_time(series, value, window=None):
+ """:func:`aggfunc` to:
+
+ - Return the first index where the
+ series == value
+
+ - If no such index is found
+ +inf is returned
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+
+ :return: A vector of Unit Length
+ """
+
+ series = select_window(series, window)
+ series = series[series == value]
+
+ if not len(series):
+ return [float("inf")]
+
+ return [series.index.values[0]]
+
+
+def period(series, align="start", window=None):
+ """This :func:`aggfunc` returns a tuple
+ of the average duration between two triggers:
+
+ - When :code:`align=start` the :code:`SCHED_IN`
+ trigger is used
+
+ - When :code:`align=end` the :code:`SCHED_OUT`
+ trigger is used
+
+
+ .. math::
+
+ E = e_{1}, e_{2}...e_{N} \\\\
+ T_p = \\frac{\sum_{j}^{\lfloor N/2 \\rfloor}(e_{2j + 1} - e_{2j})}{N}
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+
+ :return:
+ A list of deltas of successive starts/stops
+ of a task
+
+ """
+
+ series = select_window(series, window)
+ series = sanitize_asymmetry(series, window)
+
+ if align == "start":
+ series = series[series == SCHED_SWITCH_IN]
+ elif align == "end":
+ series = series[series == SCHED_SWITCH_OUT]
+
+ if len(series) % 2 == 0:
+ series = series[:1]
+
+ if not len(series):
+ return []
+
+ return list(np.diff(series.index.values))
+
+def last_time(series, value, window=None):
+ """:func:`aggfunc` to:
+
+ - The first index where the
+ series == value
+
+ - If no such index is found
+ :mod:`bart.sched.functions.TIME_INVAL`
+ is returned
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+
+ :return: A vector of Unit Length
+ """
+
+ series = select_window(series, window)
+ series = series[series == value]
+ if not len(series):
+ return [TIME_INVAL]
+
+ return [series.index.values[-1]]
+
+
+def binary_correlate(series_x, series_y):
+ """Helper function to Correlate binary Data
+
+ Both the series should have same indices
+
+ For binary time series data:
+
+ .. math::
+
+ \\alpha_{corr} = \\frac{N_{agree} - N_{disagree}}{N}
+
+ :param series_x: First time Series data
+ :type series_x: :mod:`pandas.Series`
+
+ :param series_y: Second time Series data
+ :type series_y: :mod:`pandas.Series`
+ """
+
+ if len(series_x) != len(series_y):
+ raise ValueError("Cannot compute binary correlation for \
+ unequal vectors")
+
+ agree = len(series_x[series_x == series_y])
+ disagree = len(series_x[series_x != series_y])
+
+ return (agree - disagree) / float(len(series_x))
+
+def get_pids_for_process(run, execname, cls=None):
+ """Get the PIDs for a given process
+
+ :param run: A run object with a sched_switch
+ event
+ :type run: :mod:`trappy.run.Run`
+
+ :param execname: The name of the process
+ :type execname: str
+
+ :param cls: The SchedSwitch event class (required if
+ a different event is to be used)
+ :type cls: :mod:`trappy.base.Base`
+
+ :return: The set of PIDs for the execname
+ """
+
+ if not cls:
+ try:
+ df = run.sched_switch.data_frame
+ except AttributeError:
+ raise ValueError("SchedSwitch event not found in run")
+ else:
+ event = getattr(run, cls.name)
+ df = event.data_frame
+
+ mask = df["next_comm"].apply(lambda x : True if x.startswith(execname) else False)
+ return list(np.unique(df[mask]["next_pid"].values))
+
+def get_task_name(run, pid, cls=None):
+ """Returns the execname for pid
+
+ :param run: A run object with a sched_switch
+ event
+ :type run: :mod:`trappy.run.Run`
+
+ :param pid: The PID of the process
+ :type pid: int
+
+ :param cls: The SchedSwitch event class (required if
+ a different event is to be used)
+ :type cls: :mod:`trappy.base.Base`
+
+ :return: The execname for the PID
+ """
+
+ if not cls:
+ try:
+ df = run.sched_switch.data_frame
+ except AttributeError:
+ raise ValueError("SchedSwitch event not found in run")
+ else:
+ event = getattr(run, cls.name)
+ df = event.data_frame
+
+ df = df[df["next_pid"] == pid]
+ if not len(df):
+ return ""
+ else:
+ return df["next_comm"].values[0]
+
+def sched_triggers(run, pid, sched_switch_class):
+ """Returns the list of sched_switch triggers
+
+ :param run: A run object with a sched_switch
+ event
+ :type run: :mod:`trappy.run.Run`
+
+ :param pid: The PID of the associated process
+ :type pid: int
+
+ :param sched_switch_class: The SchedSwitch event class
+ :type sched_switch_class: :mod:`trappy.base.Base`
+
+ :return: List of triggers, such that
+ ::
+
+ triggers[0] = switch_in_trigger
+ triggers[1] = switch_out_trigger
+ """
+
+ if not hasattr(run, "sched_switch"):
+ raise ValueError("SchedSwitch event not found in run")
+
+ triggers = []
+ triggers.append(sched_switch_in_trigger(run, pid, sched_switch_class))
+ triggers.append(sched_switch_out_trigger(run, pid, sched_switch_class))
+ return triggers
+
+def sched_switch_in_trigger(run, pid, sched_switch_class):
+ """
+ :param run: A run object with a sched_switch
+ event
+ :type run: :mod:`trappy.run.Run`
+
+ :param pid: The PID of the associated process
+ :type pid: int
+
+ :param sched_switch_class: The SchedSwitch event class
+ :type sched_switch_class: :mod:`trappy.base.Base`
+
+ :return: :mod:`trappy.stats.Trigger.Trigger` on
+ the SchedSwitch: IN for the given PID
+ """
+
+ task_in = {}
+ task_in[NEXT_PID_FIELD] = pid
+
+ return Trigger(run,
+ sched_switch_class, # trappy Event Class
+ task_in, # Filter Dictionary
+ SCHED_SWITCH_IN, # Trigger Value
+ CPU_FIELD) # Primary Pivot
+
+def sched_switch_out_trigger(run, pid, sched_switch_class):
+ """
+ :param run: A run object with a sched_switch
+ event
+ :type run: :mod:`trappy.run.Run`
+
+ :param pid: The PID of the associated process
+ :type pid: int
+
+ :param sched_switch_class: The SchedSwitch event class
+ :type sched_switch_class: :mod:`trappy.base.Base`
+
+ :return: :mod:`trappy.stats.Trigger.Trigger` on
+ the SchedSwitch: OUT for the given PID
+ """
+
+ task_out = {}
+ task_out[PREV_PID_FIELD] = pid
+
+ return Trigger(run,
+ sched_switch_class, # trappy Event Class
+ task_out, # Filter Dictionary
+ SCHED_SWITCH_OUT, # Trigger Value
+ CPU_FIELD) # Primary Pivot
+
+
+def trace_event(series, window=None):
+ """
+ :func:`aggfunc` to be used for plotting
+ the process residency data using
+ :mod:`trappy.plotter.EventPlot`
+
+ :param series: Input Time Series data
+ :type series: :mod:`pandas.Series`
+
+ :param window: A tuple indicating a time window
+ :type window: tuple
+
+ :return: A list of events
+ of the type:
+ ::
+
+ [
+ [start_time_1, stop_time_1],
+ [start_time_2, stop_time_2],
+ #
+ #
+ [start_time_N, stop_time_N],
+ ]
+ """
+ rects = []
+ series = select_window(series, window)
+ series = sanitize_asymmetry(series, window)
+
+ s_in = series[series == SCHED_SWITCH_IN]
+ s_out = series[series == SCHED_SWITCH_OUT]
+
+ if not len(s_in):
+ return rects
+
+ if len(s_in) != len(s_out):
+ raise RuntimeError(
+ "Unexpected Lengths: s_in={}, s_out={}".format(
+ len(s_in),
+ len(s_out)))
+
+ return np.column_stack((s_in.index.values, s_out.index.values))