1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# -*- coding: utf-8 -*-
from datetime import datetime
import requests
import fedmsg.consumers
import fedfind.release
from sqlalchemy import exc
import autocloud
from autocloud.models import init_model, ComposeDetails, ComposeJobDetails
from autocloud.producer import publish_to_fedmsg
from autocloud.utils import is_valid_image, produce_jobs
import logging
log = logging.getLogger("fedmsg")
DEBUG = autocloud.DEBUG
class AutoCloudConsumer(fedmsg.consumers.FedmsgConsumer):
"""
Fedmsg consumer for Autocloud
"""
if DEBUG:
topic = [
'org.fedoraproject.dev.__main__.pungi.compose.status.change'
]
else:
topic = [
'org.fedoraproject.prod.pungi.compose.status.change'
]
config_key = 'autocloud.consumer.enabled'
def __init__(self, *args, **kwargs):
self.supported_archs = [arch for arch, _ in ComposeJobDetails.ARCH_TYPES]
log.info("Autocloud Consumer is ready for action.")
super(AutoCloudConsumer, self).__init__(*args, **kwargs)
def consume(self, msg):
""" This is called when we receive a message matching the topic. """
log.info('Received %r %r' % (msg['topic'], msg['body']['msg_id']))
STATUS_F = ('FINISHED_INCOMPLETE', 'FINISHED',)
VARIANTS_F = ('CloudImages',)
images = []
compose_db_update = False
msg_body = msg['body']
status = msg_body['msg']['status']
compose_images_json = None
if status in STATUS_F:
location = msg_body['msg']['location']
json_metadata = '{}/metadata/images.json'.format(location)
resp = requests.get(json_metadata)
compose_images_json = getattr(resp, 'json', False)
if compose_images_json is not None:
compose_images_json = compose_images_json()
compose_images = compose_images_json['payload']['images']
compose_details = compose_images_json['payload']['compose']
compose_images = dict((variant, compose_images[variant])
for variant in VARIANTS_F
if variant in compose_images)
compose_id = compose_details['id']
rel = fedfind.release.get_release(cid=compose_id)
release = rel.release
compose_details.update({'release': release})
compose_images_variants = [variant for variant in VARIANTS_F
if variant in compose_images]
for variant in compose_images_variants:
compose_image = compose_images[variant]
for arch, payload in compose_image.iteritems():
if arch not in self.supported_archs:
continue
for item in payload:
relative_path = item['path']
if not is_valid_image(relative_path):
continue
absolute_path = '{}/{}'.format(location, relative_path)
item.update({
'compose': compose_details,
'absolute_path': absolute_path,
})
images.append(item)
compose_db_update = True
if compose_db_update:
session = init_model()
compose_date = datetime.strptime(compose_details['date'], '%Y%m%d')
try:
cd = ComposeDetails(
date=compose_date,
compose_id=compose_details['id'],
respin=compose_details['respin'],
type=compose_details['type'],
status=u'q',
location=location,
)
session.add(cd)
session.commit()
compose_details.update({
'status': 'queued',
'compose_job_id': cd.id,
})
publish_to_fedmsg(topic='compose.queued',
**compose_details)
except exc.IntegrityError:
session.rollback()
cd = session.query(ComposeDetails).filter_by(
compose_id=compose_details['id']).first()
log.info('Compose already exists %s: %s' % (
compose_details['id'],
cd.id
))
session.close()
num_images = len(images)
for pos, image in enumerate(images):
image.update({'pos': (pos+1, num_images)})
produce_jobs(images)
|