summaryrefslogtreecommitdiffstats
path: root/jenkins_jobs/parser.py
diff options
context:
space:
mode:
Diffstat (limited to 'jenkins_jobs/parser.py')
-rw-r--r--jenkins_jobs/parser.py289
1 files changed, 158 insertions, 131 deletions
diff --git a/jenkins_jobs/parser.py b/jenkins_jobs/parser.py
index 667ce426..e2ef6f15 100644
--- a/jenkins_jobs/parser.py
+++ b/jenkins_jobs/parser.py
@@ -29,9 +29,7 @@ from jenkins_jobs.formatter import deep_format
import jenkins_jobs.local_yaml as local_yaml
from jenkins_jobs import utils
-__all__ = [
- "YamlParser"
-]
+__all__ = ["YamlParser"]
logger = logging.getLogger(__name__)
@@ -45,8 +43,7 @@ def matches(what, glob_patterns):
:arg iterable glob_patterns: glob patterns to match (list, tuple, set,
etc.)
"""
- return any(fnmatch.fnmatch(what, glob_pattern)
- for glob_pattern in glob_patterns)
+ return any(fnmatch.fnmatch(what, glob_pattern) for glob_pattern in glob_patterns)
def combination_matches(combination, match_combinations):
@@ -79,28 +76,32 @@ class YamlParser(object):
self.views = []
self.jjb_config = jjb_config
- self.keep_desc = jjb_config.yamlparser['keep_descriptions']
- self.path = jjb_config.yamlparser['include_path']
+ self.keep_desc = jjb_config.yamlparser["keep_descriptions"]
+ self.path = jjb_config.yamlparser["include_path"]
def load_files(self, fn):
# handle deprecated behavior, and check that it's not a file like
# object as these may implement the '__iter__' attribute.
- if not hasattr(fn, '__iter__') or hasattr(fn, 'read'):
+ if not hasattr(fn, "__iter__") or hasattr(fn, "read"):
logger.warning(
- 'Passing single elements for the `fn` argument in '
- 'Builder.load_files is deprecated. Please update your code '
- 'to use a list as support for automatic conversion will be '
- 'removed in a future version.')
+ "Passing single elements for the `fn` argument in "
+ "Builder.load_files is deprecated. Please update your code "
+ "to use a list as support for automatic conversion will be "
+ "removed in a future version."
+ )
fn = [fn]
files_to_process = []
for path in fn:
- if not hasattr(path, 'read') and os.path.isdir(path):
- files_to_process.extend([os.path.join(path, f)
- for f in sorted(os.listdir(path))
- if (f.endswith('.yml') or
- f.endswith('.yaml'))])
+ if not hasattr(path, "read") and os.path.isdir(path):
+ files_to_process.extend(
+ [
+ os.path.join(path, f)
+ for f in sorted(os.listdir(path))
+ if (f.endswith(".yml") or f.endswith(".yaml"))
+ ]
+ )
else:
files_to_process.append(path)
@@ -108,41 +109,45 @@ class YamlParser(object):
# definitions of macros and templates when loading all from top-level
unique_files = []
for f in files_to_process:
- if hasattr(f, 'read'):
+ if hasattr(f, "read"):
unique_files.append(f)
continue
rpf = os.path.realpath(f)
if rpf not in unique_files:
unique_files.append(rpf)
else:
- logger.warning("File '%s' already added as '%s', ignoring "
- "reference to avoid duplicating yaml "
- "definitions." % (f, rpf))
+ logger.warning(
+ "File '%s' already added as '%s', ignoring "
+ "reference to avoid duplicating yaml "
+ "definitions." % (f, rpf)
+ )
for in_file in unique_files:
# use of ask-for-permissions instead of ask-for-forgiveness
# performs better when low use cases.
- if hasattr(in_file, 'name'):
+ if hasattr(in_file, "name"):
fname = in_file.name
else:
fname = in_file
logger.debug("Parsing YAML file {0}".format(fname))
- if hasattr(in_file, 'read'):
+ if hasattr(in_file, "read"):
self._parse_fp(in_file)
else:
self.parse(in_file)
def _parse_fp(self, fp):
# wrap provided file streams to ensure correct encoding used
- data = local_yaml.load(utils.wrap_stream(fp),
- self.jjb_config.yamlparser['retain_anchors'],
- search_path=self.path)
+ data = local_yaml.load(
+ utils.wrap_stream(fp),
+ self.jjb_config.yamlparser["retain_anchors"],
+ search_path=self.path,
+ )
if data:
if not isinstance(data, list):
raise JenkinsJobsException(
"The topmost collection in file '{fname}' must be a list,"
- " not a {cls}".format(fname=getattr(fp, 'name', fp),
- cls=type(data)))
+ " not a {cls}".format(fname=getattr(fp, "name", fp), cls=type(data))
+ )
for item in data:
cls, dfn = next(iter(item.items()))
group = self.data.get(cls, {})
@@ -153,41 +158,43 @@ class YamlParser(object):
n = v
break
# Syntax error
- raise JenkinsJobsException("Syntax error, for item "
- "named '{0}'. Missing indent?"
- .format(n))
+ raise JenkinsJobsException(
+ "Syntax error, for item "
+ "named '{0}'. Missing indent?".format(n)
+ )
# allow any entry to specify an id that can also be used
- _id = dfn.get('id', dfn['name'])
+ _id = dfn.get("id", dfn["name"])
if _id in group:
self._handle_dups(
"Duplicate entry found in '{0}: '{1}' already "
- "defined".format(fp.name, _id))
+ "defined".format(fp.name, _id)
+ )
group[_id] = dfn
self.data[cls] = group
def parse(self, fn):
- with io.open(fn, 'r', encoding='utf-8') as fp:
+ with io.open(fn, "r", encoding="utf-8") as fp:
self._parse_fp(fp)
def _handle_dups(self, message):
- if not self.jjb_config.yamlparser['allow_duplicates']:
+ if not self.jjb_config.yamlparser["allow_duplicates"]:
logger.error(message)
raise JenkinsJobsException(message)
else:
logger.warning(message)
def _getJob(self, name):
- job = self.data.get('job', {}).get(name, None)
+ job = self.data.get("job", {}).get(name, None)
if not job:
return job
return self._applyDefaults(job)
def _getJobGroup(self, name):
- return self.data.get('job-group', {}).get(name, None)
+ return self.data.get("job-group", {}).get(name, None)
def _getJobTemplate(self, name):
- job = self.data.get('job-template', {}).get(name, None)
+ job = self.data.get("job-template", {}).get(name, None)
if not job:
return job
return self._applyDefaults(job)
@@ -196,12 +203,12 @@ class YamlParser(object):
if override_dict is None:
override_dict = {}
- whichdefaults = data.get('defaults', 'global')
- defaults = copy.deepcopy(self.data.get('defaults',
- {}).get(whichdefaults, {}))
- if defaults == {} and whichdefaults != 'global':
- raise JenkinsJobsException("Unknown defaults set: '{0}'"
- .format(whichdefaults))
+ whichdefaults = data.get("defaults", "global")
+ defaults = copy.deepcopy(self.data.get("defaults", {}).get(whichdefaults, {}))
+ if defaults == {} and whichdefaults != "global":
+ raise JenkinsJobsException(
+ "Unknown defaults set: '{0}'".format(whichdefaults)
+ )
for key in override_dict.keys():
if key in defaults.keys():
@@ -216,53 +223,52 @@ class YamlParser(object):
if self.keep_desc:
description = job.get("description", None)
else:
- description = job.get("description", '')
+ description = job.get("description", "")
if description is not None:
- job["description"] = description + \
- self._get_managed_string().lstrip()
+ job["description"] = description + self._get_managed_string().lstrip()
def _getfullname(self, data):
- if 'folder' in data:
- return "%s/%s" % (data['folder'], data['name'])
+ if "folder" in data:
+ return "%s/%s" % (data["folder"], data["name"])
- return data['name']
+ return data["name"]
def expandYaml(self, registry, jobs_glob=None):
changed = True
while changed:
changed = False
for module in registry.modules:
- if hasattr(module, 'handle_data'):
+ if hasattr(module, "handle_data"):
if module.handle_data(self.data):
changed = True
- for job in self.data.get('job', {}).values():
+ for job in self.data.get("job", {}).values():
job = self._applyDefaults(job)
- job['name'] = self._getfullname(job)
+ job["name"] = self._getfullname(job)
- if jobs_glob and not matches(job['name'], jobs_glob):
- logger.debug("Ignoring job {0}".format(job['name']))
+ if jobs_glob and not matches(job["name"], jobs_glob):
+ logger.debug("Ignoring job {0}".format(job["name"]))
continue
- logger.debug("Expanding job '{0}'".format(job['name']))
+ logger.debug("Expanding job '{0}'".format(job["name"]))
self._formatDescription(job)
self.jobs.append(job)
- for view in self.data.get('view', {}).values():
- view['name'] = self._getfullname(view)
+ for view in self.data.get("view", {}).values():
+ view["name"] = self._getfullname(view)
- if jobs_glob and not matches(view['name'], jobs_glob):
- logger.debug("Ignoring view {0}".format(view['name']))
+ if jobs_glob and not matches(view["name"], jobs_glob):
+ logger.debug("Ignoring view {0}".format(view["name"]))
continue
- logger.debug("Expanding view '{0}'".format(view['name']))
+ logger.debug("Expanding view '{0}'".format(view["name"]))
self._formatDescription(view)
self.views.append(view)
- for project in self.data.get('project', {}).values():
- logger.debug("Expanding project '{0}'".format(project['name']))
+ for project in self.data.get("project", {}).values():
+ logger.debug("Expanding project '{0}'".format(project["name"]))
# use a set to check for duplicate job references in projects
seen = set()
- for jobspec in project.get('jobs', []):
+ for jobspec in project.get("jobs", []):
if isinstance(jobspec, dict):
# Singleton dict containing dict of job-specific params
jobname, jobparams = next(iter(jobspec.items()))
@@ -275,18 +281,20 @@ class YamlParser(object):
if job:
# Just naming an existing defined job
if jobname in seen:
- self._handle_dups("Duplicate job '{0}' specified "
- "for project '{1}'"
- .format(jobname, project['name']))
+ self._handle_dups(
+ "Duplicate job '{0}' specified "
+ "for project '{1}'".format(jobname, project["name"])
+ )
seen.add(jobname)
continue
# see if it's a job group
group = self._getJobGroup(jobname)
if group:
- for group_jobspec in group['jobs']:
+ for group_jobspec in group["jobs"]:
if isinstance(group_jobspec, dict):
- group_jobname, group_jobparams = \
- next(iter(group_jobspec.items()))
+ group_jobname, group_jobparams = next(
+ iter(group_jobspec.items())
+ )
if not isinstance(group_jobparams, dict):
group_jobparams = {}
else:
@@ -297,8 +305,10 @@ class YamlParser(object):
if group_jobname in seen:
self._handle_dups(
"Duplicate job '{0}' specified for "
- "project '{1}'".format(group_jobname,
- project['name']))
+ "project '{1}'".format(
+ group_jobname, project["name"]
+ )
+ )
seen.add(group_jobname)
continue
template = self._getJobTemplate(group_jobname)
@@ -308,10 +318,9 @@ class YamlParser(object):
d.update(group)
d.update(group_jobparams)
# Except name, since the group's name is not useful
- d['name'] = project['name']
+ d["name"] = project["name"]
if template:
- self._expandYamlForTemplateJob(d, template,
- jobs_glob)
+ self._expandYamlForTemplateJob(d, template, jobs_glob)
continue
# see if it's a template
template = self._getJobTemplate(jobname)
@@ -320,11 +329,12 @@ class YamlParser(object):
d.update(jobparams)
self._expandYamlForTemplateJob(d, template, jobs_glob)
else:
- raise JenkinsJobsException("Failed to find suitable "
- "template named '{0}'"
- .format(jobname))
+ raise JenkinsJobsException(
+ "Failed to find suitable "
+ "template named '{0}'".format(jobname)
+ )
- for viewspec in project.get('views', []):
+ for viewspec in project.get("views", []):
if isinstance(viewspec, dict):
# Singleton dict containing dict of view-specific params
viewname, viewparams = next(iter(viewspec.items()))
@@ -337,18 +347,20 @@ class YamlParser(object):
if view:
# Just naming an existing defined view
if viewname in seen:
- self._handle_dups("Duplicate view '{0}' specified "
- "for project '{1}'"
- .format(viewname, project['name']))
+ self._handle_dups(
+ "Duplicate view '{0}' specified "
+ "for project '{1}'".format(viewname, project["name"])
+ )
seen.add(viewname)
continue
# see if it's a view group
group = self._getViewGroup(viewname)
if group:
- for group_viewspec in group['views']:
+ for group_viewspec in group["views"]:
if isinstance(group_viewspec, dict):
- group_viewname, group_viewparams = \
- next(iter(group_viewspec.items()))
+ group_viewname, group_viewparams = next(
+ iter(group_viewspec.items())
+ )
if not isinstance(group_viewparams, dict):
group_viewparams = {}
else:
@@ -359,8 +371,10 @@ class YamlParser(object):
if group_viewname in seen:
self._handle_dups(
"Duplicate view '{0}' specified for "
- "project '{1}'".format(group_viewname,
- project['name']))
+ "project '{1}'".format(
+ group_viewname, project["name"]
+ )
+ )
seen.add(group_viewname)
continue
template = self._getViewTemplate(group_viewname)
@@ -370,10 +384,9 @@ class YamlParser(object):
d.update(group)
d.update(group_viewparams)
# Except name, since the group's name is not useful
- d['name'] = project['name']
+ d["name"] = project["name"]
if template:
- self._expandYamlForTemplateView(
- d, template, jobs_glob)
+ self._expandYamlForTemplateView(d, template, jobs_glob)
continue
# see if it's a template
template = self._getViewTemplate(viewname)
@@ -382,41 +395,46 @@ class YamlParser(object):
d.update(viewparams)
self._expandYamlForTemplateView(d, template, jobs_glob)
else:
- raise JenkinsJobsException("Failed to find suitable "
- "template named '{0}'"
- .format(viewname))
+ raise JenkinsJobsException(
+ "Failed to find suitable "
+ "template named '{0}'".format(viewname)
+ )
# check for duplicate generated jobs
seen = set()
# walk the list in reverse so that last definition wins
for job in self.jobs[::-1]:
- if job['name'] in seen:
- self._handle_dups("Duplicate definitions for job '{0}' "
- "specified".format(job['name']))
+ if job["name"] in seen:
+ self._handle_dups(
+ "Duplicate definitions for job '{0}' "
+ "specified".format(job["name"])
+ )
self.jobs.remove(job)
- seen.add(job['name'])
+ seen.add(job["name"])
# check for duplicate generated views
seen_views = set()
# walk the list in reverse so that last definition wins
for view in self.views[::-1]:
- if view['name'] in seen_views:
- self._handle_dups("Duplicate definitions for view '{0}' "
- "specified".format(view['name']))
+ if view["name"] in seen_views:
+ self._handle_dups(
+ "Duplicate definitions for view '{0}' "
+ "specified".format(view["name"])
+ )
self.views.remove(view)
- seen_views.add(view['name'])
+ seen_views.add(view["name"])
return self.jobs, self.views
def _expandYamlForTemplateJob(self, project, template, jobs_glob=None):
dimensions = []
- template_name = template['name']
+ template_name = template["name"]
# reject keys that are not useful during yaml expansion
- for k in ['jobs']:
+ for k in ["jobs"]:
project.pop(k)
- excludes = project.pop('exclude', [])
+ excludes = project.pop("exclude", [])
for (k, v) in project.items():
- tmpk = '{{{0}}}'.format(k)
+ tmpk = "{{{0}}}".format(k)
if tmpk not in template_name:
continue
if type(v) == list:
@@ -429,7 +447,7 @@ class YamlParser(object):
for values in itertools.product(*dimensions):
params = copy.deepcopy(project)
params = self._applyDefaults(params, template)
- params['template-name'] = re.sub(r'({|})', r'\1\1', template_name)
+ params["template-name"] = re.sub(r"({|})", r"\1\1", template_name)
try:
expanded_values = {}
@@ -441,29 +459,32 @@ class YamlParser(object):
else:
expanded_values[k] = v
except TypeError:
- project_name = project.pop('name')
+ project_name = project.pop("name")
logger.error(
"Exception thrown while expanding template '%s' for "
"project '%s', with expansion arguments of:\n%s\n"
"Original project input variables for template:\n%s\n"
"Most likely the inputs have items indented incorrectly "
"to describe how they should be applied.\n\nNote yaml "
- "'null' is mapped to python's 'None'", template_name,
+ "'null' is mapped to python's 'None'",
+ template_name,
project_name,
- "".join(local_yaml.dump({k: v}, default_flow_style=False)
- for (k, v) in values),
- local_yaml.dump(project, default_flow_style=False))
+ "".join(
+ local_yaml.dump({k: v}, default_flow_style=False)
+ for (k, v) in values
+ ),
+ local_yaml.dump(project, default_flow_style=False),
+ )
raise
params.update(expanded_values)
try:
params = deep_format(params, params)
except Exception:
- logging.error(
- "Failure formatting params '%s' with itself", params)
+ logging.error("Failure formatting params '%s' with itself", params)
raise
if combination_matches(params, excludes):
- logger.debug('Excluding combination %s', str(params))
+ logger.debug("Excluding combination %s", str(params))
continue
for key in template.keys():
@@ -472,16 +493,22 @@ class YamlParser(object):
try:
expanded = deep_format(
- template, params,
- self.jjb_config.yamlparser['allow_empty_variables'])
+ template,
+ params,
+ self.jjb_config.yamlparser["allow_empty_variables"],
+ )
except Exception:
logging.error(
"Failure formatting template '%s', containing '%s' with "
- "params '%s'", template_name, template, params)
+ "params '%s'",
+ template_name,
+ template,
+ params,
+ )
raise
- expanded['name'] = self._getfullname(expanded)
+ expanded["name"] = self._getfullname(expanded)
- job_name = expanded.get('name')
+ job_name = expanded.get("name")
if jobs_glob and not matches(job_name, jobs_glob):
continue
@@ -495,29 +522,29 @@ class YamlParser(object):
# Views related
def _getView(self, name):
- view = self.data.get('view', {}).get(name, None)
+ view = self.data.get("view", {}).get(name, None)
if not view:
return view
return self._applyDefaults(view)
def _getViewGroup(self, name):
- return self.data.get('view-group', {}).get(name, None)
+ return self.data.get("view-group", {}).get(name, None)
def _getViewTemplate(self, name):
- view = self.data.get('view-template', {}).get(name, None)
+ view = self.data.get("view-template", {}).get(name, None)
if not view:
return view
return self._applyDefaults(view)
def _expandYamlForTemplateView(self, project, template, views_glob=None):
dimensions = []
- template_name = template['name']
+ template_name = template["name"]
# reject keys that are not useful during yaml expansion
- for k in ['views']:
+ for k in ["views"]:
project.pop(k)
- excludes = project.pop('exclude', [])
+ excludes = project.pop("exclude", [])
for (k, v) in project.items():
- tmpk = '{{{0}}}'.format(k)
+ tmpk = "{{{0}}}".format(k)
if tmpk not in template_name:
continue
if type(v) == list:
@@ -543,19 +570,19 @@ class YamlParser(object):
params.update(expanded_values)
params = deep_format(params, params)
if combination_matches(params, excludes):
- logger.debug('Excluding combination %s', str(params))
+ logger.debug("Excluding combination %s", str(params))
continue
for key in template.keys():
if key not in params:
params[key] = template[key]
- params['template-name'] = template_name
+ params["template-name"] = template_name
expanded = deep_format(
- template, params,
- self.jjb_config.yamlparser['allow_empty_variables'])
+ template, params, self.jjb_config.yamlparser["allow_empty_variables"]
+ )
- view_name = expanded.get('name')
+ view_name = expanded.get("name")
if views_glob and not matches(view_name, views_glob):
continue