1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# -*- coding: UTF-8 -*-
# Copyright 2014 Red Hat, Inc.
# Part of clufter project
# Licensed under GPLv2+ (a copy included | http://gnu.org/licenses/gpl-2.0.txt)
"""Command context, i.e., state distributed along filters chain"""
__author__ = "Jan Pokorný <jpokorny @at@ Red Hat .dot. com>"
import logging
from collections import MutableMapping
from .error import ClufterError
from .utils import isinstanceexcept
from .utils_prog import TweakedDict
log = logging.getLogger(__name__)
class CommandContextError(ClufterError):
pass
class CommandContextBase(TweakedDict):
"""Object representing command context"""
def __init__(self, initial=None, parent=None, **kwargs):
super(CommandContextBase, self).__init__(initial=initial, **kwargs)
if parent is not None:
self._parent = parent
@property
def anabasis(self):
"""Traverse nested contexts hierarchy upwards"""
cur = self
while True:
yield cur
if cur is cur._parent:
break
cur = cur._parent
@property
def parent(self):
return self._parent
def __setitem__(self, key, value):
# XXX value could be also any valid dict constructor argument
if any(getattr(p, '_notaint', False) for p in self.anabasis):
raise RuntimeError("Cannot set item in notaint context")
self._dict[key] = CommandContextBase(initial=value, parent=self) \
if isinstanceexcept(value, MutableMapping,
CommandContextBase) \
else value
class CommandContext(CommandContextBase):
class notaint_context(CommandContextBase.notaint_context):
def __init__(self, self_outer, exit_off):
super(self.__class__, self).__init__(self_outer, exit_off)
self._fc = self_outer['__filter_context__'] \
.prevented_taint(exit_off)
def __enter__(self):
super(self.__class__, self).__enter__()
self._fc.__enter__()
def __exit__(self, *exc):
self._fc.__exit__()
super(self.__class__, self).__exit__()
def __init__(self, *args, **kwargs):
# filter_context ... where global arguments for filters to be stored
# filters ... where filter instance + arguments hybrid is stored
super(CommandContext, self).__init__(*args, **kwargs)
# could be cycle up to self if not bypassed
self['__filter_context__'] = CommandContextBase()
self['__filters__'] = CommandContextBase()
@staticmethod
def _wrapping_nested_context(obj):
class wrapped(CommandContextBase):
def __getattribute__(self, name):
if name == 'ctxt_wrapped':
return obj
elif name == 'ctxt_set':
ret = lambda self, **kwargs: self.update(kwargs)
elif name.startswith('ctxt_'):
# by convention, ctxt_* methods are using second
# argument to pass the respective (nested) context
ret = obj.__getattribute__(name)
if callable(ret):
ret = \
lambda *args, **kwargs: \
obj.__getattribute__(name)(self, *args, **kwargs)
else:
try:
ret = super(wrapped, self).__getattribute__(name)
except AttributeError:
ret = obj.__getattribute__(name)
return ret
def __setattribute__(self, name, value):
obj.__setattribute__(name, value)
return wrapped
def ensure_filter(self, flt):
existing, key = self['__filters__'], flt.__class__.name
ret = existing.get(key, None)
if ret is not None:
assert id(ret.ctxt_wrapped) == id(flt)
else:
ret = self._wrapping_nested_context(flt)
ret = existing[key] = ret(parent=self['__filter_context__'])
return ret
def ensure_filters(self, flts):
return map(self.ensure_filter, flts)
def filter(self, which=None):
if which is not None:
ret = self['__filters__'][which]
else:
ret = self['__filter_context__']
return ret
|