# This file is part of TALER
# (C) 2016 INRIA
#
# TALER is free software; you can redistribute it and/or modify it under the
# terms of the GNU Affero General Public License as published by the Free Software
# Foundation; either version 3, or (at your option) any later version.
#
# TALER is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# TALER; see the file COPYING. If not, see
#
# @author Florian Dold
"""
Parse GNUnet-style configurations in pure Python
"""
# FIXME: make sure that autovivification of config entries
# does not leave garbage behind (use weakrefs!)
import logging
import collections
import os
import weakref
logger = logging.getLogger(__name__)
__all__ = ["TalerConfig"]
taler_datadir = None
try:
# not clear if this is a good idea ...
from talerpaths import taler_datadir as t
taler_datadir = t
except ImportError:
pass
class ConfigurationError(Exception):
pass
class ExpansionSyntaxError(Exception):
pass
def expand(s, getter):
"""
Do shell-style parameter expansion.
Supported syntax:
- ${X}
- ${X:-Y}
- $X
"""
pos = 0
result = ""
while pos != -1:
start = s.find("$", pos)
if start == -1:
break
if s[start:].startswith("${"):
balance = 1
end = start + 2
while balance > 0 and end < len(s):
balance += {"{": 1, "}": -1}.get(s[end], 0)
end += 1
if balance != 0:
raise ExpansionSyntaxError("unbalanced parentheses")
piece = s[start+2:end-1]
if piece.find(":-") > 0:
varname, alt = piece.split(":-", 1)
replace = getter(varname)
if replace is None:
replace = expand(alt, getter)
else:
varname = piece
replace = getter(varname)
if replace is None:
replace = s[start:end]
else:
end = start + 2
while end < len(s) and s[start+1:end+1].isalnum():
end += 1
varname = s[start+1:end]
replace = getter(varname)
if replace is None:
replace = s[start:end]
result = result + replace
pos = end
return result + s[pos:]
class OptionDict(collections.defaultdict):
def __init__(self, config, section_name):
self.config = weakref.ref(config)
self.section_name = section_name
super().__init__()
def __missing__(self, key):
e = Entry(self.config(), self.section_name, key)
self[key] = e
return e
def __getitem__(self, slice):
return super().__getitem__(slice.lower())
def __setitem__(self, slice, value):
super().__setitem__(slice.lower(), value)
class SectionDict(collections.defaultdict):
def __init__(self):
super().__init__()
def __missing__(self, key):
v = OptionDict(self, key)
self[key] = v
return v
def __getitem__(self, slice):
return super().__getitem__(slice.lower())
def __setitem__(self, slice, value):
super().__setitem__(slice.lower(), value)
class Entry:
def __init__(self, config, section, option, value=None, filename=None, lineno=None):
self.value = value
self.filename = filename
self.lineno = lineno
self.section = section
self.option = option
self.config = weakref.ref(config)
def __repr__(self):
return "" % (self.section, self.option, repr(self.value),)
def __str__(self):
return self.value
def value_string(self, default=None, warn=False, required=False):
if required and self.value is None:
raise ConfigurationError("Missing required option '%s' in section '%s'" % (self.option.upper(), self.section.upper()))
if self.value is None:
if warn:
if default is not None:
logger.warn("Configuration is missing option '%s' in section '%s', falling back to '%s'",
self.option, self.section, default)
else:
logger.warn("Configuration is missing option '%s' in section '%s'", self.option.upper(), self.section.upper())
return default
return self.value
def value_int(self, default=None, warn=False, required=False):
v = self.value_string(default, warn, required)
if v is None:
return None
try:
return int(v)
except ValueError:
raise ConfigurationError("Expected number for option '%s' in section '%s'" % (self.option.upper(), self.section.upper()))
def _getsubst(self, key):
x = self.config()["paths"][key].value
if x is not None:
return x
x = os.environ.get(key)
if x is not None:
return x
return None
def value_filename(self, default=None, warn=False, required=False):
v = self.value_string(default, warn, required)
if v is None:
return None
return expand(v, lambda x: self._getsubst(x))
def location(self):
if self.filename is None or self.lineno is None:
return ""
return "%s:%s" % (self.filename, self.lineno)
class TalerConfig:
"""
One loaded taler configuration, including base configuration
files and included files.
"""
def __init__(self):
"""
Initialize an empty configuration
"""
self.sections = SectionDict()
@staticmethod
def from_file(filename=None, load_defaults=True):
cfg = TalerConfig()
if filename is None:
xdg = os.environ.get("XDG_CONFIG_HOME")
if xdg:
filename = os.path.join(xdg, "taler.conf")
else:
filename = os.path.expanduser("~/.config/taler.conf")
if load_defaults:
cfg.load_defaults()
cfg.load_file(filename)
return cfg
def value_string(self, section, option, default=None, required=None, warn=False):
return self.sections[section][option].value_string(default, required, warn)
def value_filename(self, section, option, default=None, required=None, warn=False):
return self.sections[section][option].value_filename(default, required, warn)
def value_int(self, section, option, default=None, required=None, warn=False):
return self.sections[section][option].value_int(default, required, warn)
def load_defaults(self):
base_dir = os.environ.get("TALER_BASE_CONFIG")
if base_dir:
self.load_dir(base_dir)
return
prefix = os.environ.get("TALER_PREFIX")
if prefix:
self.load_dir(os.path.join(prefix, "share/taler/config.d"))
return
if taler_datadir:
self.load_dir(os.path.join(taler_datadir, "share/taler/config.d"))
return
logger.warn("no base directory found")
@staticmethod
def from_env(*args, **kwargs):
"""
Load configuration from environment variable TALER_CONFIG_FILE
or from default location if the variable is not set.
"""
filename = os.environ.get("TALER_CONFIG_FILE")
return TalerConfig.from_file(filename, *args, **kwargs)
def load_dir(self, dirname):
try:
files = os.listdir(dirname)
except FileNotFoundError:
logger.warn("can't read config directory '%s'", dirname)
return
for file in files:
if not file.endswith(".conf"):
continue
self.load_file(os.path.join(dirname, file))
def load_file(self, filename):
sections = self.sections
with open(filename, "r") as file:
lineno = 0
current_section = None
for line in file:
lineno += 1
line = line.strip()
if len(line) == 0:
# empty line
continue
if line.startswith("#"):
# comment
continue
if line.startswith("["):
if not line.endswith("]"):
logger.error("invalid section header in line %s: %s", lineno, repr(line))
section_name = line.strip("[]").strip().strip('"')
current_section = section_name
continue
if current_section is None:
logger.error("option outside of section in line %s: %s", lineno, repr(line))
continue
kv = line.split("=", 1)
if len(kv) != 2:
logger.error("invalid option in line %s: %s", lineno, repr(line))
key = kv[0].strip()
value = kv[1].strip()
if value.startswith('"'):
value = value[1:]
if not value.endswith('"'):
logger.error("mismatched quotes in line %s: %s", lineno, repr(line))
else:
value = value[:-1]
e = Entry(self.sections, current_section, key, value, filename, lineno)
sections[current_section][key] = e
def dump(self):
for section_name, section in self.sections.items():
print("[%s]" % (section.section_name,))
for option_name, e in section.items():
print("%s = %s # %s" % (e.option, e.value, e.location()))
def __getitem__(self, slice):
if isinstance(slice, str):
return self.sections[slice]
raise TypeError("index must be string")
if __name__ == "__main__":
import sys
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--section", "-s", dest="section", default=None, metavar="SECTION")
parser.add_argument("--option", "-o", dest="option", default=None, metavar="OPTION")
parser.add_argument("--config", "-c", dest="config", default=None, metavar="FILE")
parser.add_argument("--filename", "-f", dest="expand_filename", default=False, action='store_true')
args = parser.parse_args()
tc = TalerConfig.from_file(args.config)
if args.section is not None and args.option is not None:
if args.expand_filename:
x = tc.value_filename(args.section, args.option)
else:
x = tc.value_string(args.section, args.option)
if x is not None:
print(x)
else:
tc.dump()