# This file is part of TALER # (C) 2016 INRIA # # TALER is free software; you can redistribute it and/or modify it under the # terms of the GNU Affero General Public License as published by the Free Software # Foundation; either version 3, or (at your option) any later version. # # TALER is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along with # TALER; see the file COPYING. If not, see # # @author Florian Dold """ Parse GNUnet-style configurations in pure Python """ import logging import collections import os import weakref import sys import re LOGGER = logging.getLogger(__name__) __all__ = ["TalerConfig"] TALER_DATADIR = None try: # not clear if this is a good idea ... from talerpaths import TALER_DATADIR as t TALER_DATADIR = t except ImportError: pass class ConfigurationError(Exception): pass class ExpansionSyntaxError(Exception): pass def expand(var, getter): """ Do shell-style parameter expansion. Supported syntax: - ${X} - ${X:-Y} - $X """ pos = 0 result = "" while pos != -1: start = var.find("$", pos) if start == -1: break if var[start:].startswith("${"): balance = 1 end = start + 2 while balance > 0 and end < len(var): balance += {"{": 1, "}": -1}.get(var[end], 0) end += 1 if balance != 0: raise ExpansionSyntaxError("unbalanced parentheses") piece = var[start+2:end-1] if piece.find(":-") > 0: varname, alt = piece.split(":-", 1) replace = getter(varname) if replace is None: replace = expand(alt, getter) else: varname = piece replace = getter(varname) if replace is None: replace = var[start:end] else: end = start + 2 while end < len(var) and var[start+1:end+1].isalnum(): end += 1 varname = var[start+1:end] replace = getter(varname) if replace is None: replace = var[start:end] result = result + replace pos = end return result + var[pos:] class OptionDict(collections.defaultdict): def __init__(self, config, section_name): self.config = weakref.ref(config) self.section_name = section_name super().__init__() def __missing__(self, key): entry = Entry(self.config(), self.section_name, key) self[key] = entry return entry def __getitem__(self, chunk): return super().__getitem__(chunk.lower()) def __setitem__(self, chunk, value): super().__setitem__(chunk.lower(), value) class SectionDict(collections.defaultdict): def __missing__(self, key): value = OptionDict(self, key) self[key] = value return value def __getitem__(self, chunk): return super().__getitem__(chunk.lower()) def __setitem__(self, chunk, value): super().__setitem__(chunk.lower(), value) class Entry: def __init__(self, config, section, option, **kwargs): self.value = kwargs.get("value") self.filename = kwargs.get("filename") self.lineno = kwargs.get("lineno") self.section = section self.option = option self.config = weakref.ref(config) def __repr__(self): return "" \ % (self.section, self.option, repr(self.value),) def __str__(self): return self.value def value_string(self, default=None, required=False, warn=False): if required and self.value is None: raise ConfigurationError("Missing required option '%s' in section '%s'" \ % (self.option.upper(), self.section.upper())) if self.value is None: if warn: if default is not None: LOGGER.warning("Configuration is missing option '%s' in section '%s',\ falling back to '%s'", self.option, self.section, default) else: LOGGER.warning("Configuration ** is missing option '%s' in section '%s'", self.option.upper(), self.section.upper()) return default return self.value def value_int(self, default=None, required=False, warn=False): value = self.value_string(default, warn, required) if value is None: return None try: return int(value) except ValueError: raise ConfigurationError("Expected number for option '%s' in section '%s'" \ % (self.option.upper(), self.section.upper())) def _getsubst(self, key): value = self.config()["paths"][key].value if value is not None: return value value = os.environ.get(key) if value is not None: return value return None def value_filename(self, default=None, required=False, warn=False): value = self.value_string(default, required, warn) if value is None: return None return expand(value, self._getsubst) def location(self): if self.filename is None or self.lineno is None: return "" return "%s:%s" % (self.filename, self.lineno) class TalerConfig: """ One loaded taler configuration, including base configuration files and included files. """ def __init__(self): """ Initialize an empty configuration """ self.sections = SectionDict() # defaults != config file: the first is the 'base' # whereas the second overrides things from the first. @staticmethod def from_file(filename=None, load_defaults=True): cfg = TalerConfig() if filename is None: xdg = os.environ.get("XDG_CONFIG_HOME") if xdg: filename = os.path.join(xdg, "taler.conf") else: filename = os.path.expanduser("~/.config/taler.conf") if load_defaults: cfg.load_defaults() cfg.load_file(filename) return cfg def value_string(self, section, option, **kwargs): return self.sections[section][option].value_string( kwargs.get("default"), kwargs.get("required"), kwargs.get("warn")) def value_filename(self, section, option, **kwargs): return self.sections[section][option].value_filename( kwargs.get("default"), kwargs.get("required"), kwargs.get("warn")) def value_int(self, section, option, **kwargs): return self.sections[section][option].value_int( kwargs.get("default"), kwargs.get("required"), kwargs.get("warn")) def load_defaults(self): base_dir = os.environ.get("TALER_BASE_CONFIG") if base_dir: self.load_dir(base_dir) return prefix = os.environ.get("TALER_PREFIX") if prefix: tmp = os.path.split(os.path.normpath(prefix)) if re.match("lib", tmp[1]): prefix = tmp[0] self.load_dir(os.path.join(prefix, "share/taler/config.d")) return if TALER_DATADIR: self.load_dir(os.path.join(TALER_DATADIR, "share/taler/config.d")) return LOGGER.warning("no base directory found") @staticmethod def from_env(*args, **kwargs): """ Load configuration from environment variable TALER_CONFIG_FILE or from default location if the variable is not set. """ filename = os.environ.get("TALER_CONFIG_FILE") return TalerConfig.from_file(filename, *args, **kwargs) def load_dir(self, dirname): try: files = os.listdir(dirname) except FileNotFoundError: LOGGER.warning("can't read config directory '%s'", dirname) return for file in files: if not file.endswith(".conf"): continue self.load_file(os.path.join(dirname, file)) def load_file(self, filename): sections = self.sections try: with open(filename, "r") as file: lineno = 0 current_section = None for line in file: lineno += 1 line = line.strip() if line == "": # empty line continue if line.startswith("#"): # comment continue if line.startswith("["): if not line.endswith("]"): LOGGER.error("invalid section header in line %s: %s", lineno, repr(line)) section_name = line.strip("[]").strip().strip('"') current_section = section_name continue if current_section is None: LOGGER.error("option outside of section in line %s: %s", lineno, repr(line)) continue pair = line.split("=", 1) if len(pair) != 2: LOGGER.error("invalid option in line %s: %s", lineno, repr(line)) key = pair[0].strip() value = pair[1].strip() if value.startswith('"'): value = value[1:] if not value.endswith('"'): LOGGER.error("mismatched quotes in line %s: %s", lineno, repr(line)) else: value = value[:-1] entry = Entry(self.sections, current_section, key, value=value, filename=filename, lineno=lineno) sections[current_section][key] = entry except FileNotFoundError: LOGGER.error("Configuration file (%s) not found", filename) sys.exit(3) def dump(self): for kv_section in self.sections.items(): print("[%s]" % (kv_section[1].section_name,)) for kv_option in kv_section[1].items(): print("%s = %s # %s" % \ (kv_option[1].option, kv_option[1].value, kv_option[1].location())) def __getitem__(self, chunk): if isinstance(chunk, str): return self.sections[chunk] raise TypeError("index must be string") if __name__ == "__main__": import argparse PARSER = argparse.ArgumentParser() PARSER.add_argument("--section", "-s", dest="section", default=None, metavar="SECTION") PARSER.add_argument("--option", "-o", dest="option", default=None, metavar="OPTION") PARSER.add_argument("--config", "-c", dest="config", default=None, metavar="FILE") PARSER.add_argument("--filename", "-f", dest="expand_filename", default=False, action='store_true') ARGS = PARSER.parse_args() TC = TalerConfig.from_file(ARGS.config) if ARGS.section is not None and ARGS.option is not None: if ARGS.expand_filename: X = TC.value_filename(ARGS.section, ARGS.option) else: X = TC.value_string(ARGS.section, ARGS.option) if X is not None: print(X) else: TC.dump()