Source code for iotrelay

'''

Copyright (c) 2017, Emmanuel Levijarvi
All rights reserved.
License BSD

iotrelay.py provides an application and a framework for passing data
between data sources and handlers.
'''
import argparse
import datetime
from collections import defaultdict
import pkg_resources
import os
import sys
import signal
import threading
import logging
try:
    import configparser
except ImportError:
    import ConfigParser as configparser

logger = logging.getLogger()

DEFAULT_CONFIG = os.path.join(os.path.expanduser("~"), '.iotrelay.cfg')
GROUP = 'iotrelay'
version = "1.2.2"


class Error(Exception):
    pass


class PluginError(Error):
    pass


class ConfigParser(configparser.SafeConfigParser):
    '''ConfigParser is a subclass of the standard library's ConfigParser.

    It adds the ability to parse an option as a list. See the Python
    Standard Library Documentation for more.
    '''
    @staticmethod
    def _make_list(value, sep):
        return [v.strip() for v in value.split(sep)]

    def getlist(self, section, option, sep=','):
        '''Parse an option as a list, optionally specifying the separator
        by specifying the sep argument.
        '''
        return self._make_list(self.get(section, option), sep)


[docs]class Reading(object): '''Reading provides a container for passing a datum,or "Reading", between sources and handlers. Attributes: reading_type (str): value (str): timestamp (datetime.datetime): series_key (str): ''' def __init__(self, reading_type, value, timestamp=None, series_key=None, tags=None): self.reading_type = reading_type self.value = value self.series_key = series_key if series_key is None: self.series_key = reading_type if timestamp is None: self.timestamp = datetime.datetime.utcnow() else: self.timestamp = timestamp if tags is None: self.tags = {} else: self.tags = tags self.iteration = False def __iter__(self): return self def next(self): return self.__next__() def __next__(self): if self.iteration: raise StopIteration self.iteration = True return self def __str__(self): return "{0}: {1!s}, {2!s}".format(self.timestamp.isoformat(), self.series_key, self.value) def __repr__(self): return ("Reading({self.reading_type!r}, {self.value!r}, " "{self.timestamp!r}, {self.series_key!r})".format(self=self))
class Relay(object): def __init__(self, config): self.config = config self.stop_event = threading.Event() signal.signal(signal.SIGTERM, self.stop) signal.signal(signal.SIGINT, self.stop) self.sources = [] self.handlers = defaultdict(list) def load_plugins(self): logger.debug('IoTrelay loading plugins') for entrypoint in pkg_resources.iter_entry_points(group=GROUP): plugin_name = entrypoint.dist.project_name try: plugin_config = dict(self.config.items(plugin_name)) except configparser.NoSectionError: logging.warning('Plugin: {0} not loaded. It has not been ' 'configured.'.format(plugin_name)) continue try: tags = dict(self.config.items('{}:tags'.format(plugin_name))) except configparser.NoSectionError: pass else: plugin_config['tags'] = tags plugin = entrypoint.load()(plugin_config) if entrypoint.name == 'handler': try: reading_types = self.config.getlist(plugin_name, 'reading types') except configparser.NoOptionError: msg = "No 'reading types' specified for handler {0}" logger.warning(msg.format(plugin_name)) else: for reading_type in reading_types: self.handlers[reading_type].append(plugin) elif entrypoint.name == 'source': self.sources.append(plugin) logger.debug("Plugin: {0}.{1}, loaded".format(plugin_name, entrypoint.name)) def stop(self, signum, stack): self.stop_event.set() def run(self): while not self.stop_event.is_set(): for source in self.sources: try: readings = source.get_readings() except PluginError as e: logger.error('Unable to read from source. {0}'.format(e)) continue if readings is None: continue for reading in readings: if reading is None: e = 'None reading was received from {}'.format(source) logger.error(e) continue for handler in self.handlers.get(reading.reading_type, []): if reading.value is None: logger.warning('None value from {0}'.format( reading.series_key)) continue try: handler.set_reading(reading) except PluginError as e: logger.error('Unable to send data {0}'.format(e)) self.stop_event.wait(2) def flush_handlers(self): flushed = set() for handler_list in self.handlers.values(): for handler in handler_list: if handler in flushed: continue try: handler.flush() except AttributeError: pass flushed.add(handler) def main(): parser = argparse.ArgumentParser(description="Internet of Things Relay") parser.add_argument('-c', '--config-file', help="Configuration Filename") parser.add_argument('--log-level', help="Log Level", default='info', choices=('debug', 'info', 'warning', 'info')) args = parser.parse_args() logging.basicConfig(format='%(asctime)s %(message)s', level=args.log_level.upper()) config = ConfigParser() if args.config_file is None: config_file = DEFAULT_CONFIG else: config_file = args.config_file try: f = open(config_file, 'r') except IOError as e: logger.critical("Cannot open config file {0}. {1}.".format(e.filename, e.strerror)) sys.exit(1) with f: config.readfp(f) r = Relay(config) r.load_plugins() r.run() r.flush_handlers() if __name__ == "__main__": main()