summaryrefslogtreecommitdiffstats
path: root/roles/fedmsg
diff options
context:
space:
mode:
authorRicky Elrod <codeblock@fedoraproject.org>2017-08-10 15:49:30 +0000
committerRicky Elrod <codeblock@fedoraproject.org>2017-08-10 15:49:30 +0000
commitc421c8b4a19f6f298c8464091b91cf84b3395886 (patch)
treeff303885f3b6ce62b1483fe3074e826568c4a981 /roles/fedmsg
parent6832c75c58356d101c64eb9f232e5a204f3ec65c (diff)
downloadansible-c421c8b4a19f6f298c8464091b91cf84b3395886.tar.gz
ansible-c421c8b4a19f6f298c8464091b91cf84b3395886.tar.xz
ansible-c421c8b4a19f6f298c8464091b91cf84b3395886.zip
nuke old fedmsg hotfix
Signed-off-by: Ricky Elrod <codeblock@fedoraproject.org>
Diffstat (limited to 'roles/fedmsg')
-rw-r--r--roles/fedmsg/hotfix/files/consumers-init.py295
-rw-r--r--roles/fedmsg/hotfix/tasks/main.yml9
2 files changed, 0 insertions, 304 deletions
diff --git a/roles/fedmsg/hotfix/files/consumers-init.py b/roles/fedmsg/hotfix/files/consumers-init.py
deleted file mode 100644
index 2e679f015..000000000
--- a/roles/fedmsg/hotfix/files/consumers-init.py
+++ /dev/null
@@ -1,295 +0,0 @@
-# This file is part of fedmsg.
-# Copyright (C) 2012 - 2014 Red Hat, Inc.
-#
-# fedmsg is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# fedmsg is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with fedmsg; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Ralph Bean <rbean@redhat.com>
-#
-
-import inspect
-import json
-import logging
-import os
-import psutil
-import requests
-import threading
-import time
-
-import moksha.hub.api.consumer
-
-import fedmsg.crypto
-import fedmsg.encoding
-from fedmsg.replay import check_for_replay
-
-
-class FedmsgConsumer(moksha.hub.api.consumer.Consumer):
- """ Base class for fedmsg consumers.
-
- The fedmsg consumption API is really just a thin wrapper over moksha.
- Moksha expects consumers to:
-
- * Declare themselves on the moksha.consumers python entry-point.
- * Declare a ``consume(...)`` method.
- * Specify a ``topic``.
-
- All this class does in addition to moksha is:
-
- * Provide a mechanism for disabling/enabling consumers by configuration
- in a consistent way (namely, by use of ``config_key``).
-
- If you set ``validate_signatures = False`` on your consumer, it will
- be exempt from global validation rules. Messages will not be
- checked for authenticity before being handed off to your consume
- method. This is handy if you're developing or building a
- special-case consumer. The consumer used by ``fedmsg-relay``
- (described in :doc:`commands`) sets ``validate_signatures = False``
- so that it can transparently forward along everything and let the
- terminal endpoints decide whether or not to consume particular
- messages.
-
- * Provide a mechanism for automatically validating fedmsg messages
- with :mod:`fedmsg.crypto`.
-
- * Provide a mechanism to play back messages that haven't been received
- by the hub even though emitted. To make use of this feature, you have
- to set ``replay_name`` to some string corresponding to an endpoint in
- the ``replay_endpoints`` dict in the configuration.
-
- You must set ``config_key`` to some string. A config value by
- that name must be True in the config parsed by :mod:`fedmsg.config`
- in order for the consumer to be activated.
- """
-
- validate_signatures = None
- config_key = None
-
- def __init__(self, hub):
- module = inspect.getmodule(self).__name__
- name = self.__class__.__name__
- self.log = logging.getLogger("fedmsg")
-
- if not self.config_key:
- raise ValueError("%s:%s must declare a 'config_key'" % (
- module, name))
-
- self.log.debug("%s is %r" % (
- self.config_key, hub.config.get(self.config_key)
- ))
-
- if not hub.config.get(self.config_key, False):
- self.log.info('* disabled by config - %s:%s' % (module, name))
- return
-
- self.log.info(' enabled by config - %s:%s' % (module, name))
-
- # This call "completes" registration of this consumer with the hub.
- super(FedmsgConsumer, self).__init__(hub)
-
- # Now, re-get our logger to override the one moksha assigns us.
- self.log = logging.getLogger("fedmsg")
-
- if self.validate_signatures is None:
- self.validate_signatures = self.hub.config['validate_signatures']
-
- if hasattr(self, "replay_name"):
- self.name_to_seq_id = {}
- if self.replay_name in self.hub.config.get("replay_endpoints", {}):
- self.name_to_seq_id[self.replay_name] = -1
-
- # Check if we have a status file to see if we have a backlog or not.
- # Create its directory if it doesn't exist.
- self.status_directory = self.hub.config.get('status_directory')
- self.status_filename, self.status_lock = None, None
- if self.status_directory:
-
- # Extract proc name and handle differences between py2.6 and py2.7
- proc_name = current_proc().name
- if callable(proc_name):
- proc_name = proc_name()
-
- self.status_filename = os.path.join(
- self.status_directory, proc_name, type(self).__name__)
-
- topmost_directory, _ = self.status_filename.rsplit('/', 1)
- if not os.path.exists(topmost_directory):
- os.makedirs(topmost_directory)
-
- self.datagrepper_url = self.hub.config.get('datagrepper_url')
- if self.status_filename and self.datagrepper_url:
- # First, try to read in the status from a previous run and fire off
- # a thread to set up our workload.
- self.log.info("Backlog handling setup. status: %r, url: %r" % (
- self.status_filename, self.datagrepper_url))
- self.status_lock = threading.Lock()
- try:
- with self.status_lock:
- with open(self.status_filename, 'r') as f:
- data = f.read()
- moksha.hub.reactor.reactor.callInThread(self._backlog, data)
- except IOError as e:
- self.log.info(e)
- else:
- self.log.info("No backlog handling. status: %r, url: %r" % (
- self.status_filename, self.datagrepper_url))
-
- def _backlog(self, data):
- """Find all the datagrepper messages between 'then' and 'now'.
-
- Put those on our work queue.
-
- Should be called in a thread so as not to block the hub at startup.
- """
-
- try:
- data = json.loads(data)
- except ValueError as e:
- self.log.info("Status contents are %r" % data)
- self.log.exception(e)
- self.log.info("Skipping backlog retrieval.")
- return
-
- last = data['message']['body']
- if isinstance(last, str):
- last = json.loads(last)
-
- then = last['timestamp']
- now = int(time.time())
-
- retrieved = 0
- for message in self.get_datagrepper_results(then, now):
-
- # Take the messages from datagrepper and remove any keys that were
- # artificially added to the message. The presence of these would
- # otherwise cause message crypto validation to fail.
- for artificial_key in ('source_name', 'source_version'):
- if artificial_key in message:
- del message[artificial_key]
-
- # Also, we expect the timestamp to be an 'int'
- message['timestamp'] = int(message['timestamp'])
-
- if message['msg_id'] != last['msg_id']:
- retrieved = retrieved + 1
- self.incoming.put(dict(body=message, topic=message['topic']))
- else:
- self.log.warning("Already seen %r; Skipping." % last['msg_id'])
-
- self.log.info("Retrieved %i messages from datagrepper." % retrieved)
-
- def get_datagrepper_results(self, then, now):
- def _make_query(page=1):
- return requests.get(self.datagrepper_url, params=dict(
- rows_per_page=100, page=page, start=then, end=now, order='asc'
- )).json()
-
- # Grab the first page of results
- data = _make_query()
-
- # Grab and smash subsequent pages if there are any
- interesting_topics = self.topic
- if not isinstance(interesting_topics, list):
- interesting_topics = [interesting_topics]
-
- for page in range(1, data['pages'] + 1):
- self.log.info("Retrieving datagrepper page %i of %i" % (
- page, data['pages']))
- data = _make_query(page=page)
-
- for message in data['raw_messages']:
- for topic in interesting_topics:
- if message['topic'].startswith(topic[:-1]):
- yield message
- break
-
- def validate(self, message):
- """ This needs to raise an exception, caught by moksha. """
- if hasattr(message, '__json__'):
- message = message.__json__()
- if isinstance(message['body'], str):
- message['body'] = json.loads(message['body'])
-
- # Massage STOMP messages into a more compatible format.
- if 'topic' not in message['body']:
- message['body'] = {
- 'topic': message.get('topic'),
- 'msg': message['body'],
- }
-
- # If we're not validating, then everything is valid.
- # If this is turned on globally, our child class can override it.
- if not self.validate_signatures:
- return
-
- # We assume these match inside fedmsg.crypto, so we should enforce it.
- if not message['topic'] == message['body']['topic']:
- raise RuntimeWarning("Topic envelope mismatch.")
-
- if not fedmsg.crypto.validate(message['body'], **self.hub.config):
- raise RuntimeWarning("Failed to authn message.")
-
- def _consume(self, message):
-
- try:
- self.validate(message)
- except RuntimeWarning as e:
- self.log.warn("Received invalid message {0}".format(e))
- return
-
- # Pass along headers if present. May be useful to filters or
- # fedmsg.meta routines.
- if 'headers' in message.__dict__ and 'body' in message.__dict__:
- message['body']['headers'] = message['headers']
-
- if hasattr(self, "replay_name"):
- for m in check_for_replay(
- self.replay_name, self.name_to_seq_id,
- message, self.hub.config):
-
- try:
- self.validate(m)
- super(FedmsgConsumer, self)._consume(m)
- except RuntimeWarning as e:
- self.log.warn("Received invalid message {}".format(e))
- else:
- super(FedmsgConsumer, self)._consume(message)
-
- def pre_consume(self, message):
- self.save_status(dict(
- message=message,
- status='pre',
- ))
-
- def post_consume(self, message):
- self.save_status(dict(
- message=message,
- status='post',
- ))
-
- def save_status(self, data):
- if self.status_filename and self.status_lock:
- with self.status_lock:
- with open(self.status_filename, 'w') as f:
- f.write(fedmsg.encoding.dumps(data))
-
-
-def current_proc():
- mypid = os.getpid()
-
- for proc in psutil.process_iter():
- if proc.pid == mypid:
- return proc
-
- # This should be impossible.
- raise ValueError("Could not find process %r" % mypid)
diff --git a/roles/fedmsg/hotfix/tasks/main.yml b/roles/fedmsg/hotfix/tasks/main.yml
deleted file mode 100644
index 6d75292fd..000000000
--- a/roles/fedmsg/hotfix/tasks/main.yml
+++ /dev/null
@@ -1,9 +0,0 @@
----
-- name: "[HOTFIX] Fix consumers/__init__.py"
- copy: src=consumers-init.py dest=/usr/lib/python2.7/site-packages/fedmsg/consumers/__init__.py
- tags:
- - fedmsg/gateway
- - hotfix
- when: inventory_hostname.startswith(('proxy11', 'proxy12', 'proxy13'))
- notify:
- - restart fedmsg-gateway