summaryrefslogtreecommitdiffstats
path: root/utils.py
blob: c457bc16964a779f4b96bb0359f52054f6a0bcb4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: UTF-8 -*-
# Copyright 2014 Red Hat, Inc.
# Part of clufter project
# Licensed under GPLv2 (a copy included | http://gnu.org/licenses/gpl-2.0.txt)
"""Various helpers"""
__author__ = "Jan Pokorný <jpokorny @at@ Red Hat .dot. com>"

import os
import sys
from subprocess import Popen

from .error import ClufterError


head_tail = lambda x=None, *y: (x, x if x is None else y)
filtervars = lambda src, which: \
             dict((x, src[x]) for x in which if x in src)
filtervarsdef = lambda src, which: \
                dict((x, src[x]) for x in which if src.get(x, None))
filtervarspop = lambda src, which: \
                dict((x, src.pop(x)) for x in which if x in src)
apply_preserving_depth = \
    lambda action: \
        lambda item: \
            type(item)([apply_preserving_depth(action)(i) for i in item]) \
            if isinstance(item, (tuple, list)) else action(item)
apply_aggregation_preserving_depth = \
    lambda agg_fn: \
        lambda item: \
            agg_fn([apply_aggregation_preserving_depth(agg_fn)(i)
                    for i in item]) \
            if isinstance(item, (tuple, list)) else item
apply_aggregation_preserving_passing_depth = \
    lambda agg_fn, d=0: \
        lambda item: \
            agg_fn([apply_aggregation_preserving_passing_depth(agg_fn, d+1)(i)
                    for i in item], d+1) \
            if isinstance(item, (tuple, list)) else item
# name comes from Haskell
# note: always returns list even for input tuple
# note: when returning from recursion, should always observe scalar or list(?)
apply_intercalate = apply_aggregation_preserving_depth(
    lambda i:
        reduce(lambda a, b: a + (b if isinstance(b, list) else [b]),
               i, [])
)

zipped_outlier = type('zipped_outlier', (tuple, ), {})
# NOTE: automatically shortens the longer counterpart in the pair
#       to the length of the bigger one
apply_loose_zip_preserving_depth = \
    lambda a, b: \
        (type(a) if type(a) == type(b) else type(a))(
            [apply_loose_zip_preserving_depth(*p) for p in zip(a, b)]
        ) if isinstance(a, (tuple, list)) == isinstance(b, (tuple, list)) \
          == True else zipped_outlier([a, b])
# as previous, but with length checking of some sort
apply_strict_zip_preserving_depth = \
    lambda a, b: \
        (type(a) if type(a) == type(b) else type(a))(
            [apply_strict_zip_preserving_depth(*p) for p in zip(a, b)]
        ) if isinstance(a, (tuple, list)) == isinstance(b, (tuple, list)) \
          == True and len(a) == len(b) else zipped_outlier([a, b])


def which(name, *where):
    """Mimic `which' UNIX utility"""
    where = tuple(os.path.abspath(i) for i in where)
    if 'PATH' in os.environ:
        path = tuple(i for i in os.environ['PATH'].split(os.pathsep)
                     if len(i.strip()))
    else:
        path = ()
    for p in where + path:
        check = os.path.join(p, name)
        if os.path.exists(check):
            return check
    else:
        return None


def func_defaults_varnames(func):
    """Using introspection, get a dict of kwargs' defaults + all arg names"""
    func_varnames = func.func_code.co_varnames
    func_defaults = dict(zip(
        func_varnames[-len(func.func_defaults):],
        func.func_defaults
    ))
    return func_defaults, func_varnames


class OneoffWrappedStdinPopen(object):
    """Singleton to watch for atmost one use of stdin in Popen context"""
    def __init__(self):
        self._used = False

    def __call__(self, args, **kwargs):
        if not 'stdin' in kwargs and '-' in args:
            if self._used:
                raise ClufterError(self, 'repeated use detected')
            kwargs['stdin'] = sys.stdin
            # only the first '-' substituted
            args[args.index('-')] = '/dev/stdin'
            self._used |= True
        return Popen(args, **kwargs)

OneoffWrappedStdinPopen = OneoffWrappedStdinPopen()


# Inspired by http://stackoverflow.com/a/1383402
class classproperty(property):
    def __init__(self, fnc):
        property.__init__(self, classmethod(fnc))

    def __get__(self, this, owner):
        return self.fget.__get__(None, owner)()


class hybridproperty(property):
    def __init__(self, fnc):
        property.__init__(self, classmethod(fnc))

    def __get__(self, this, owner):
        return self.fget.__get__(None, this if this else owner)()