# This file is part of TALER
# (C) 2016 INRIA
#
# TALER is free software; you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free Software
# Foundation; either version 3, or (at your option) any later version.
#
# TALER is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
#
# @author Florian Dold

"""
Parse GNUnet-style configurations in pure Python
"""

# FIXME: make sure that autovivification of config entries
# does not leave garbage behind (use weakrefs!)

import collections
import logging
import os
import weakref

logger = logging.getLogger(__name__)

__all__ = ["TalerConfig"]

taler_datadir = None
try:
    # not clear if this is a good idea ...
    from talerpaths import taler_datadir as t
    taler_datadir = t
except ImportError:
    pass


class ConfigurationError(Exception):
    """Raised when a required option is missing or has an unusable value."""


class ExpansionSyntaxError(Exception):
    """Raised by expand() when a "${" is never closed."""


def expand(s, getter):
    """
    Do shell-style parameter expansion.

    Supported syntax:
     - ${X}
     - ${X:-Y}
     - $X

    `getter` is called with a variable name and returns its value, or None
    if the variable is unknown.  Unknown variables are left in the output
    verbatim, except for the ${X:-Y} form where the (recursively expanded)
    alternative Y is used instead.  Values returned by `getter` are inserted
    as-is; they are not expanded again.

    Raises ExpansionSyntaxError if a "${" has no matching "}".
    """
    pieces = []
    pos = 0
    while True:
        start = s.find("$", pos)
        if start == -1:
            break
        # Keep the literal text between the previous match and this "$".
        pieces.append(s[pos:start])
        if s.startswith("${", start):
            # Find the matching "}", allowing nested braces in the
            # alternative of ${X:-Y}.
            balance = 1
            end = start + 2
            while balance > 0 and end < len(s):
                balance += {"{": 1, "}": -1}.get(s[end], 0)
                end += 1
            if balance != 0:
                raise ExpansionSyntaxError("unbalanced parentheses")
            piece = s[start + 2:end - 1]
            # "> 0" (not ">= 0"): the variable name must be non-empty.
            if piece.find(":-") > 0:
                varname, alt = piece.split(":-", 1)
                replace = getter(varname)
                if replace is None:
                    replace = expand(alt, getter)
            else:
                replace = getter(piece)
                if replace is None:
                    replace = s[start:end]
        else:
            # Unbraced form: the name is the longest run of alphanumerics
            # and underscores following the "$".
            end = start + 1
            while end < len(s) and (s[end].isalnum() or s[end] == "_"):
                end += 1
            if end == start + 1:
                # No name follows the "$": emit it together with the next
                # character (if any) untouched.
                end = min(start + 2, len(s))
                replace = s[start:end]
            else:
                replace = getter(s[start + 1:end])
                if replace is None:
                    replace = s[start:end]
        pieces.append(replace)
        pos = end
    pieces.append(s[pos:])
    return "".join(pieces)


class OptionDict(collections.defaultdict):
    """
    Options of one section, keyed case-insensitively.

    Looking up a missing option creates (and stores) an Entry without value.
    """

    def __init__(self, config, section_name):
        # Weak reference, so the section does not keep its container alive.
        self.config = weakref.ref(config)
        self.section_name = section_name
        super().__init__()

    def __missing__(self, key):
        e = Entry(self.config(), self.section_name, key)
        self[key] = e
        return e

    def __getitem__(self, key):
        return super().__getitem__(key.lower())

    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)


class SectionDict(collections.defaultdict):
    """
    Sections of a configuration, keyed case-insensitively.

    Looking up a missing section creates (and stores) an empty OptionDict.
    """

    def __init__(self):
        super().__init__()

    def __missing__(self, key):
        v = OptionDict(self, key)
        self[key] = v
        return v

    def __getitem__(self, key):
        return super().__getitem__(key.lower())

    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)


class Entry:
    """
    A single configuration option.

    `value` is None for entries that were created by a lookup of an option
    that is not set.  `filename` and `lineno` record where the option was
    read from, if known.  `config` is held as a weak reference.
    """

    def __init__(self, config, section, option, value=None, filename=None, lineno=None):
        self.value = value
        self.filename = filename
        self.lineno = lineno
        self.section = section
        self.option = option
        self.config = weakref.ref(config)

    def __repr__(self):
        return "<Entry section=%s, option=%s, value=%s>" % (
            self.section, self.option, repr(self.value),)

    def __str__(self):
        # __str__ must return a string, even for unset entries.
        return "" if self.value is None else self.value

    def value_string(self, default=None, warn=False, required=False):
        """
        Return the raw value of the option, or `default` if it is not set.

        If `required` is true, a missing option raises ConfigurationError.
        If `warn` is true, a missing option is logged as a warning.
        """
        if required and self.value is None:
            raise ConfigurationError(
                "Missing required option '%s' in section '%s'"
                % (self.option.upper(), self.section.upper()))
        if self.value is None:
            if warn:
                if default is not None:
                    logger.warning(
                        "Configuration is missing option '%s' in section '%s', "
                        "falling back to '%s'",
                        self.option.upper(), self.section.upper(), default)
                else:
                    logger.warning(
                        "Configuration is missing option '%s' in section '%s'",
                        self.option.upper(), self.section.upper())
            return default
        return self.value

    def value_int(self, default=None, warn=False, required=False):
        """
        Return the value of the option converted with int().

        Returns None if the option is not set and no default is given.
        Raises ConfigurationError if the value is not a number.
        """
        v = self.value_string(default, warn, required)
        if v is None:
            return None
        try:
            return int(v)
        except ValueError as err:
            raise ConfigurationError(
                "Expected number for option '%s' in section '%s'"
                % (self.option.upper(), self.section.upper())) from err

    def _getsubst(self, key):
        """
        Resolve a variable for expand(): the [paths] section of the
        configuration takes precedence over the process environment.
        """
        x = self.config()["paths"][key].value
        if x is not None:
            return x
        return os.environ.get(key)

    def value_filename(self, default=None, warn=False, required=False):
        """
        Return the value of the option with $-variables expanded.

        Returns None if the option is not set and no default is given.
        """
        v = self.value_string(default, warn, required)
        if v is None:
            return None
        return expand(v, self._getsubst)

    def location(self):
        """
        Return "filename:lineno" for the option, or "" if unknown.
        """
        if self.filename is None or self.lineno is None:
            return ""
        return "%s:%s" % (self.filename, self.lineno)


class TalerConfig:
    """
    One loaded taler configuration, including base configuration
    files and included files.
    """

    def __init__(self):
        """
        Initialize an empty configuration
        """
        self.sections = SectionDict()

    @staticmethod
    def from_file(filename=None, load_defaults=True):
        """
        Load a configuration from `filename`.

        Without a filename, "taler.conf" in $XDG_CONFIG_HOME (or in
        ~/.config if that variable is unset or empty) is used.  If
        `load_defaults` is true, the base configuration is loaded first,
        so that the file can override it.
        """
        cfg = TalerConfig()
        if filename is None:
            xdg = os.environ.get("XDG_CONFIG_HOME")
            if xdg:
                filename = os.path.join(xdg, "taler.conf")
            else:
                filename = os.path.expanduser("~/.config/taler.conf")
        if load_defaults:
            cfg.load_defaults()
        cfg.load_file(filename)
        return cfg

    # The three accessors below pass their flags by keyword: Entry's
    # methods take (default, warn, required), i.e. not in the order used
    # by the signatures here.

    def value_string(self, section, option, default=None, required=None, warn=False):
        """
        Return the raw value of `option` in `section`; see Entry.value_string.
        """
        return self.sections[section][option].value_string(
            default, warn=warn, required=required)

    def value_filename(self, section, option, default=None, required=None, warn=False):
        """
        Return the $-expanded value of `option` in `section`; see
        Entry.value_filename.
        """
        return self.sections[section][option].value_filename(
            default, warn=warn, required=required)

    def value_int(self, section, option, default=None, required=None, warn=False):
        """
        Return the value of `option` in `section` as an int; see
        Entry.value_int.
        """
        return self.sections[section][option].value_int(
            default, warn=warn, required=required)

    def load_defaults(self):
        """
        Load the base configuration directory.

        The first of these that is set is used: $TALER_BASE_CONFIG,
        $TALER_PREFIX/share/taler/config.d, and share/taler/config.d below
        the data directory imported from talerpaths.
        """
        base_dir = os.environ.get("TALER_BASE_CONFIG")
        if base_dir:
            self.load_dir(base_dir)
            return
        prefix = os.environ.get("TALER_PREFIX")
        if prefix:
            self.load_dir(os.path.join(prefix, "share/taler/config.d"))
            return
        if taler_datadir:
            self.load_dir(os.path.join(taler_datadir, "share/taler/config.d"))
            return
        logger.warning("no base directory found")

    @staticmethod
    def from_env(*args, **kwargs):
        """
        Load configuration from environment variable TALER_CONFIG_FILE
        or from default location if the variable is not set.
        """
        filename = os.environ.get("TALER_CONFIG_FILE")
        return TalerConfig.from_file(filename, *args, **kwargs)

    def load_dir(self, dirname):
        """
        Load every "*.conf" file in `dirname`.

        A missing directory is logged as a warning and otherwise ignored.
        """
        try:
            files = os.listdir(dirname)
        except FileNotFoundError:
            logger.warning("can't read config directory '%s'", dirname)
            return
        # Sorted, so that the result does not depend on directory order
        # when several files set the same option.
        for file in sorted(files):
            if not file.endswith(".conf"):
                continue
            self.load_file(os.path.join(dirname, file))

    def load_file(self, filename):
        """
        Parse `filename` and merge its options into this configuration.

        Options that are already set are overwritten.  Malformed lines are
        logged as errors and skipped.
        """
        sections = self.sections
        with open(filename, "r") as file:
            current_section = None
            for lineno, line in enumerate(file, start=1):
                line = line.strip()
                if not line:
                    # empty line
                    continue
                if line.startswith("#"):
                    # comment
                    continue
                if line.startswith("["):
                    if not line.endswith("]"):
                        logger.error("invalid section header in line %s: %s", lineno, repr(line))
                    section_name = line.strip("[]").strip().strip('"')
                    current_section = section_name
                    continue
                if current_section is None:
                    logger.error("option outside of section in line %s: %s", lineno, repr(line))
                    continue
                kv = line.split("=", 1)
                if len(kv) != 2:
                    logger.error("invalid option in line %s: %s", lineno, repr(line))
                    # There is no value to store for this line.
                    continue
                key = kv[0].strip()
                value = kv[1].strip()
                if value.startswith('"'):
                    value = value[1:]
                    if not value.endswith('"'):
                        logger.error("mismatched quotes in line %s: %s", lineno, repr(line))
                    else:
                        value = value[:-1]
                e = Entry(self.sections, current_section, key, value, filename, lineno)
                sections[current_section][key] = e

    def dump(self):
        """
        Print all sections and options, with their origin, to stdout.
        """
        for section in self.sections.values():
            print("[%s]" % (section.section_name,))
            for e in section.values():
                print("%s = %s # %s" % (e.option, e.value, e.location()))

    def __getitem__(self, key):
        """
        Return the options of the section named `key`.
        """
        if isinstance(key, str):
            return self.sections[key]
        raise TypeError("index must be string")


if __name__ == "__main__":
    import sys
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--section", "-s", dest="section", default=None, metavar="SECTION")
    parser.add_argument("--option", "-o", dest="option", default=None, metavar="OPTION")
    parser.add_argument("--config", "-c", dest="config", default=None, metavar="FILE")
    parser.add_argument("--filename", "-f", dest="expand_filename", default=False, action='store_true')
    args = parser.parse_args()

    tc = TalerConfig.from_file(args.config)

    if args.section is not None and args.option is not None:
        if args.expand_filename:
            x = tc.value_filename(args.section, args.option)
        else:
            x = tc.value_string(args.section, args.option)
        if x is not None:
            print(x)
    else:
        tc.dump()