Source code for configirl

# -*- coding: utf-8 -*-

"""
The MIT License (MIT)

Copyright 2019 Sanhe Hu <https://github.com/MacHu-GWU/configirl-project>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

This is a python config management tool to manage config parameter in
centralized place. The purpose of this tool is to avoid maintain complex
config/paramater handling logic in shell script, cloudformation, terraform
and any other devops tools. Instead, we manage that in Python.

Since Python is a full featured general programming language and it is
available on any Mac / Linux machine.

It allows different DevOps tools to easily talk to each other via JSON.

This library implemented in pure Python with no dependencies.
"""

from __future__ import print_function

__version__ = "0.0.9"
__short_description__ = "Centralized Config Management Tool."
__license__ = "MIT"
__author__ = "Sanhe Hu"
__author_email__ = "husanhe@gmail.com"
__github_username__ = "MacHu-GWU"

import os
import re
import sys
import json
import copy
import inspect
from collections import OrderedDict

if sys.version_info.major >= 3 and sys.version_info.minor >= 5:  # pragma: no cover
    from typing import Dict


def strip_comment_line_with_symbol(line, start):
    """
    Strip comments from line string.
    """
    parts = line.split(start)
    counts = [len(re.findall(r'(?:^|[^"\\]|(?:\\\\|\\")+)(")', part))
              for part in parts]
    total = 0
    for nr, count in enumerate(counts):
        total += count
        if total % 2 == 0:
            return start.join(parts[:nr + 1]).rstrip()
    else:  # pragma: no cover
        return line.rstrip()


def strip_comments(string, comment_symbols=frozenset(('#', '//'))):
    """
    Strip comments from json string.

    :param string: A string containing json with comments started by comment_symbols.
    :param comment_symbols: Iterable of symbols that start a line comment (default # or //).
    :return: The string with the comments removed.
    """
    lines = string.splitlines()
    for k in range(len(lines)):
        for symbol in comment_symbols:
            lines[k] = strip_comment_line_with_symbol(lines[k], start=symbol)
    return '\n'.join(lines)


def read_text(abspath, encoding="utf-8"):
    """
    :type abspath: str
    :type encoding: str
    :rtype: str
    """
    with open(abspath, "rb") as f:
        return f.read().decode(encoding)


def write_text(text, abspath, encoding="utf-8"):
    """
    :type text: str
    :type abspath: str
    :type encoding: str
    :rtype: None
    """
    with open(abspath, "wb") as f:
        return f.write(text.encode(encoding))


def json_loads(text):
    """
    :rtype: dict
    """
    return json.loads(strip_comments(text))


def json_dumps(data):
    """
    :rtype: str
    """
    return json.dumps(data, indent=4, sort_keys=False, ensure_ascii=False)


def json_load(path):  # pragma: no cover
    with open(path, "rb") as f:
        return json_loads(f.read().decode("utf-8"))


def json_dump(data, path, overwrite=False):  # pragma: no cover
    if not overwrite:
        if os.path.exists(path):
            raise EnvironmentError("%s already exists!" % path)
    with open(path, "wb") as f:
        f.write(json_dumps(data).encode("utf-8"))


def add_metaclass(metaclass):  # pragma: no cover
    """
    Class decorator for creating a class with a metaclass.

    This method is copied from six.py
    """

    def wrapper(cls):
        orig_vars = cls.__dict__.copy()
        slots = orig_vars.get('__slots__')
        if slots is not None:
            if isinstance(slots, str):
                slots = [slots]
            for slots_var in slots:
                orig_vars.pop(slots_var)
        orig_vars.pop('__dict__', None)
        orig_vars.pop('__weakref__', None)
        if hasattr(cls, '__qualname__'):
            orig_vars['__qualname__'] = cls.__qualname__
        return metaclass(cls.__name__, cls.__bases__, orig_vars)

    return wrapper


try:
    # python3 renamed copy_reg to copyreg
    import copyreg
except ImportError:  # pragma: no cover
    import copy_reg as copyreg


class Sentinel(object):  # pragma: no cover
    _existing_instances = {}

    def __init__(self, name):
        super(Sentinel, self).__init__()
        self._name = name
        self._existing_instances[self._name] = self

    def __repr__(self):
        return "<{0}>".format(self._name)

    def __getnewargs__(self):
        return (self._name,)

    def __new__(cls, name, obj_id=None):  # obj_id is for compatibility with previous versions
        existing_instance = cls._existing_instances.get(name)
        if existing_instance is not None:
            return existing_instance
        return super(Sentinel, cls).__new__(cls)


# obj_id is for compat. with prev. versions
def _sentinel_unpickler(name, obj_id=None):
    if name in Sentinel._existing_instances:
        return Sentinel._existing_instances[name]
    return Sentinel(name)


def _sentinel_pickler(sentinel):
    return _sentinel_unpickler, sentinel.__getnewargs__()


copyreg.pickle(Sentinel, _sentinel_pickler, _sentinel_unpickler)

NOTHING = Sentinel("NOTHING")
REQUIRED = Sentinel("REQUIRED")
OPTIONAL = Sentinel("OPTIONAL")


class ValueNotSetError(Exception):
    """
    Raises when trying to get value of a field that have not set value before.
    """
    pass


class DerivableSetValueError(Exception):
    """
    Raises when trying to set value for Derivable Field.
    """
    pass


#
class Field(object):
    """
    Base class for config value field.

    :type dont_dump: bool
    :param dont_dump: if true, then you can't get the value if ``check_dont_dump = True``
        in :meth:`BaseConfigClass.to_dict` and :meth:`BaseConfigClass.to_json`.
        this prevent from writing sensitive information to file

    :type printable: bool
    :param printable: if False, then it will not be displayed with

    :type cache: bool
    :param cache: only available for :class:`Derivable` if True,
        then it will cache derived value.
    """
    _creation_index = 0

    def __init__(self,
                 default=NOTHING,
                 dont_dump=False,
                 printable=True,
                 cache=False):
        self.name = None
        self._value = default
        self.dont_dump = dont_dump  # type: bool
        self.printable = printable  # type: bool
        self.cache = cache  # type: bool

        self._config_object = NOTHING  # type: BaseConfigClass
        self._creation_index = Field._creation_index  # type: int
        Field._creation_index += 1

        self._getter_method = NOTHING  # type: callable

    def __repr__(self):
        return "{}(name={!r}, value={!r})".format(self.__class__.__name__, self.name, self._value)

    def set_value(self, value):
        raise DerivableSetValueError(
            "Derivable.set_value() method should never bee called")

    def _get_value(self, **kwargs):
        """
        Config Value Type specified.
        """
        raise NotImplementedError

    def get_value(self,
                  check_dont_dump=False,
                  check_printable=False,
                  **kwargs):
        """
        Since the derivable

        :param config_instance:
        :param check_dont_dump:
        :param check_printable:
        :return:

        **CN Doc**

        对于 Constant Field:

        - 如果: self.value = NOTHING, 同时 .set_value(...) 方法从来没有被调用过.
        - 如果: self.value 不等于 NOTHING, 说明 .set_value(...) 方法被吊用过, 则
            返回 self.value

        对于 Derivable Field:

        - 如果: self._getter_method() 没有成功
        """
        if self._config_object is NOTHING:
            raise AttributeError("Field.get_value() can't be called without "
                                 "initialized.")
        if check_dont_dump:
            if self.dont_dump:
                raise DontDumpError("doesn't allow to dump `{}` field".format(self.name))
        if check_printable:
            if not self.printable:
                return "***HIDDEN***"

        return self._get_value(**kwargs)

    def get_value_from_env(self, prefix=""):
        """
        Use config value stored in environment variables. This usually used
        for computation server that doesn't come with the config file. Since
        config file with sensitive information may not easy to manage. A common
        use case is AWS Lambda Function.

        :param prefix: a prefix append left to the config field name. For exmaple,
            if the config field is ``PROJECT_NAME``, and the prefix is ``MY_PROJECT_``,
            then it will read value from ``MY_PROJECT_PROJECT_NAME``.
        """
        return os.environ[prefix + self.name]

    def get_value_for_lbd(self, prefix=""):
        """
        Smartly decide where should read config value from.
        """
        if self._config_object.is_aws_lambda_runtime():
            return self.get_value_from_env(prefix=prefix)
        else:
            return self.get_value()

    def _validate_method(self, config_object, value):
        return True

    def validator(self, method):
        """
        a decorator to bind validate method.

        :type method: callable
        :param method: a callable function like ``method(self, value)``
            that take ``self`` as first parameters representing the config object.
            ``value`` as second parameters to represent the value you want to validate.
        """
        self._validate_method = method

    def validate(self, *args, **kwargs):
        """
        An abstract method executes the validator method.
        """
        self._validate_method(self._config_object, self.get_value())


class DontDumpError(Exception):
    """
    Raises when trying to dump a ``dont_dump=True`` config value.
    """
    pass


class Constant(Field):
    """
    Constant Value Field.
    """

    def set_value(self, value):
        self._value = value

    def _get_value(self, **kwargs):
        if self._value is NOTHING:
            raise ValueNotSetError(
                "{}.{} has not set a value yet!".format(
                    self._config_object.__class__.__name__, self.name
                )
            )
        return self._value


class Derivable(Field):
    """
    Derivable Value Field.
    """

    def getter(self, method):
        self._getter_method = method

    def _get_value(self, **kwargs):
        if self._getter_method is NOTHING:
            raise NotImplementedError(
                "{}.{} getter method is not implemented, "
                "use @{}.getter to decorate a getter function.".format(
                    self._config_object.__class__.__name__, self.name, self.name
                ))

        try:
            if self.cache:
                if self._value is NOTHING:
                    self._value = self._getter_method(self._config_object, **kwargs)
                return self._value
            else:
                return self._getter_method(self._config_object, **kwargs)
        except ValueNotSetError as e:  # dependent constant value not set yet
            raise ValueNotSetError(
                "can't get {}.{}, because: {}".format(
                    self._config_object.__class__.__name__, self.name, e
                )
            )
        except Exception as e:
            raise e


def is_instance_or_subclass(val, class_):
    """Return True if ``val`` is either a subclass or instance of ``class_``."""
    try:
        return issubclass(val, class_)
    except TypeError:
        return isinstance(val, class_)


def _get_fields(attrs, field_class, pop=False, ordered=False):
    """Get fields from a class. If ordered=True, fields will sorted by creation index.
    :param attrs: Mapping of class attributes
    :param type field_class: Base field class
    :param bool pop: Remove matching fields
    """
    fields = [
        (field_name, field_value)
        for field_name, field_value in attrs.items()
        if is_instance_or_subclass(field_value, field_class)
    ]
    if pop:  # pragma: no cover
        for field_name, _ in fields:
            del attrs[field_name]
    if ordered:
        fields.sort(key=lambda pair: pair[1]._creation_index)
    return fields


def _get_fields_by_mro(klass, field_class, ordered=False):
    """Collect fields from a class, following its method resolution order. The
    class itself is excluded from the search; only its parents are checked. Get
    fields from ``_declared_fields`` if available, else use ``__dict__``.
    :param type klass: Class whose fields to retrieve
    :param type field_class: Base field class
    """
    mro = inspect.getmro(klass)
    # Loop over mro in reverse to maintain correct order of fields
    return sum(
        (
            _get_fields(
                getattr(base, "_declared_fields", base.__dict__),
                field_class,
                ordered=ordered,
            )
            for base in mro[:0:-1]
        ),
        [],
    )


class ConfigMeta(type):
    def __new__(cls, name, bases, attrs):
        cls_fields = _get_fields(attrs, Field, pop=False, ordered=True)
        klass = super(ConfigMeta, cls).__new__(cls, name, bases, attrs)
        inherited_fields = _get_fields_by_mro(klass, Field, ordered=True)

        # Assign _declared_fields on class
        klass._declared_fields = OrderedDict(inherited_fields + cls_fields)
        klass._constant_fields = OrderedDict([
            (name, field)
            for name, field in klass._declared_fields.items()
            if isinstance(field, Constant)
        ])
        klass._deriable_fields = OrderedDict([
            (name, field)
            for name, field in klass._declared_fields.items()
            if isinstance(field, Derivable)
        ])
        for name, field in klass._declared_fields.items():
            field.name = name
        return klass


class BaseConfigClass(object):
    """

    - :attr:`BaseConfigClass._declared_fields`:
    - :attr:`BaseConfigClass._constant_fields`:
    - :attr:`BaseConfigClass._deriable_fields`:
    """
    _declared_fields = OrderedDict()  # type: Dict[str: Field]
    _constant_fields = OrderedDict()  # type: Dict[str: Constant]
    _deriable_fields = OrderedDict()  # type: Dict[str: Derivable]

    # --- constructor method
    def __init__(self, **kwargs):
        self.__pre_init_hook()
        for name, field in self._declared_fields.items():
            if name in kwargs:
                field.set_value(kwargs[name])
            field._config_object = self

    def __pre_init_hook(self):
        """
        All declared fields is a mutable :class:`Field` Instance, defined
        in Class level. So when you creating a new instance, the class
        level fields have to be deep copied to the config instance.
        """
        self._declared_fields = OrderedDict([
            (attr, copy.deepcopy(field))
            for attr, field in self._declared_fields.items()
        ])
        self._constant_fields = OrderedDict([
            (attr, self._declared_fields[attr])
            for attr, _ in self._constant_fields.items()
        ])
        self._deriable_fields = OrderedDict([
            (attr, self._declared_fields[attr])
            for attr, _ in self._deriable_fields.items()
        ])

        for attr, field in self._declared_fields.items():
            setattr(self, attr, field)

    @classmethod
    def from_dict(cls, dct):
        """
        Only read constant config variables from json file.

        :type dct: dict
        :rtype: BaseConfigClass
        """
        config = cls()
        for key, value in dct.items():
            if key in config._constant_fields:
                config._constant_fields[key].set_value(value)
        return config

    @classmethod
    def from_json(cls, json_str):
        """
        :type json_str: str

        :rtype: BaseConfigClass
        """
        return cls.from_dict(json.loads(strip_comments(json_str)))

    def update(self, dct):
        """
        Update constance config values from dictionary.
        Only those fields defines as Constant value will be loaded.
        Fields don't belong to this config definition will not be loaded.

        :type dct: dict

        :rtype: None
        """
        for key, value in dct.items():
            if key in self._constant_fields:
                self._constant_fields[key].set_value(value)

    def update_from_raw_json_file(self):
        """
        Update constant config values from the :attr:`BaseConfigClass.CONFIG_RAW_JSON_FILE`.
        """
        dct = json.loads(strip_comments(read_text(self.CONFIG_RAW_JSON_FILE)))
        self.update(dct)

    def update_from_env_var(self, prefix):
        """
        Update constant config values from environment variables.

        :type prefix: str
        :param prefix: a prefix used in all related environment variable.
        """
        dct = {
            key.replace(prefix, "", 1): value
            for key, value in os.environ.items()
            if key.replace(prefix, "", 1)
        }
        self.update(dct)

    def to_dict(self,
                check_dont_dump=True,
                check_printable=False,
                ignore_na=False,
                prefix=""):
        """
        Dump config values to dictionary.

        :type check_dont_dump: bool
        :param check_dont_dump: if True, then it will check if a field has
            a True value ``dont_dump`` flag, then :class:`DontDumpError` error
            is raised.

        :type check_printable: bool
        :param check_printable: if True, then it will check if a field has
            a False value ``printable`` flag, then it returns **HIDDEN**.

        :type ignore_na: bool
        :param ignore_na: if True, then :class:`ValueNotSetError` error will be
            ignored.

        :type prefix: str
        :param prefix: a prefix that appended to the left of every field

        :rtype: dict
        """
        dct = OrderedDict()
        for attr, value in self._declared_fields.items():
            key = prefix + attr
            try:
                dct[key] = value.get_value(
                    check_dont_dump=check_dont_dump,
                    check_printable=check_printable,
                )
            except DontDumpError:
                pass
            except ValueNotSetError as e:
                if ignore_na:
                    pass
                else:
                    raise e
            except Exception as e:
                raise e
        return dct

    def to_json(self,
                check_dont_dump=True,
                check_printable=False,
                ignore_na=False,
                prefix=""):
        """
        Dump config values to json.

        :type check_dont_dump: bool
        :param check_dont_dump: if True, then it will check if a field has
            a True value ``dont_dump`` flag, then :class:`DontDumpError` error
            is raised.

        :type check_printable: bool
        :param check_printable: if True, then it will check if a field has
            a False value ``printable`` flag, then it returns **HIDDEN**.

        :type ignore_na: bool
        :param ignore_na: if True, then :class:`ValueNotSetError` error will be
            ignored.

        :type prefix: str
        :param prefix: a prefix that appended to the left of every field

        :rtype: str
        """
        return json.dumps(
            self.to_dict(
                check_dont_dump=check_dont_dump,
                check_printable=check_printable,
                ignore_na=ignore_na,
                prefix=prefix,
            ),
            indent=4, sort_keys=False,
        )

    def __repr__(self):
        return "Config({})".format(
            self.to_json(check_dont_dump=False, check_printable=True)
        )

    def pprint(self):
        print(self.__repr__())

    def validate(self):
        for field in self._declared_fields.values():
            field.validate()

    # --- Runtime Detection ---
    @classmethod
    def is_aws_ec2_amz_linux_runtime(cls):  # pragma: no cover
        if os.environ["HOME"].endswith("ec2-user"):
            return True
        else:
            return False

    @classmethod
    def is_aws_ec2_redhat_runtime(cls):  # pragma: no cover
        if os.environ["HOME"].endswith("ec2-user"):
            return True
        else:
            return False

    @classmethod
    def is_aws_ec2_freebsd_runtime(cls):  # pragma: no cover
        if os.environ["HOME"].endswith("ec2-user"):
            return True
        else:
            return False

    @classmethod
    def is_aws_lambda_runtime(cls):  # pragma: no cover
        """
        Ref: https://docs.aws.amazon.com/lambda/latest/dg/lambda-environment-variables.html
        """
        if "AWS_LAMBDA_FUNCTION_NAME" in os.environ:
            return True
        else:
            return False

    @classmethod
    def is_aws_code_build_runtime(cls):  # pragma: no cover
        """
        Ref: https://docs.aws.amazon.com/codebuild/latest/userguide/build-env-ref-env-vars.html
        """
        if "CODEBUILD_BUILD_ID" in os.environ:
            return True
        else:
            return False

    # CI
    @classmethod
    def is_ci_runtime(cls):  # pragma: no cover
        if "CI" in os.environ:
            if os.environ["CI"]:
                return True
            else:
                return False
        else:
            return False

    @classmethod
    def is_circle_ci_runtime(cls):  # pragma: no cover
        """
        Ref: https://circleci.com/docs/2.0/env-vars/#built-in-environment-variables
        :return:
        """
        if "CIRCLECI" in os.environ:
            if os.environ["CIRCLECI"]:
                return True
            else:
                return False
        else:
            return False

    @classmethod
    def is_travis_ci_runtime(cls):  # pragma: no cover
        """
        Ref: https://docs.travis-ci.com/user/environment-variables/#default-environment-variables
        """
        if "TRAVIS" in os.environ:
            if os.environ["TRAVIS"]:
                return True
            else:
                return False
        else:
            return False

    @classmethod
    def is_gitlab_ci_runtime(cls):  # pragma: no cover
        """
        Ref: https://docs.gitlab.com/ee/ci/variables/
        """
        if "GITLAB_CI" in os.environ:
            if os.environ["GITLAB_CI"]:
                return True
            else:
                return False
        else:
            return False

    # --- config file path management
    CONFIG_DIR = NOTHING  # type: str

    def _join_config_dir(self, filename):
        if self.CONFIG_DIR is NOTHING:
            raise ValueError("You have to specify `{}.CONFIG_DIR`!".format(
                self.__class__.__name__))
        if not os.path.exists(self.CONFIG_DIR):
            raise ValueError("`{}.CONFIG_DIR` ('{}') doesn't exist!".format(
                self.__class__.__name__, self.CONFIG_DIR))
        return os.path.join(self.CONFIG_DIR, filename)

    @property
    def CONFIG_RAW_JSON_FILE(self):
        return self._join_config_dir("config-raw.json")

    @property
    def CONFIG_FINAL_JSON_FILE_FOR_PYTHON(self):
        return self._join_config_dir("config-final-for-python.json")

    @property
    def CONFIG_FINAL_JSON_FILE_FOR_SHELL_SCRIPT(self):
        return self._join_config_dir("config-final-for-shell-script.json")

    @property
    def CONFIG_FINAL_JSON_FILE_FOR_CLOUDFORMATION(self):
        return self._join_config_dir("config-final-for-cloudformation.json")

    @property
    def CONFIG_FINAL_JSON_FILE_FOR_SAM(self):
        return self._join_config_dir("config-final-for-sam.json")

    @property
    def CONFIG_FINAL_JSON_FILE_FOR_SERVERLESS(self):
        return self._join_config_dir("config-final-for-serverless.json")

    @property
    def CONFIG_FINAL_JSON_FILE_FOR_TERRAFORM(self):
        return self._join_config_dir("config-final-for-terraform.json")

    # --- Custom logic for different devops tools
    def to_python_json_config_data(self):
        return self.to_dict()

    def to_shell_script_config_data(self):
        return self.to_dict()

    def to_cloudformation_config_data(self):
        def to_big_camel_case(text):
            return "".join([
                word[0].upper() + word[1:].lower()
                for word in text.split("_")
            ])

        return OrderedDict([
            (to_big_camel_case(key), value)
            for key, value in self.to_dict().items()
        ])

    def to_sam_config_data(self):
        return self.to_dict()

    def to_serverless_config_data(self):
        return self.to_dict()

    def to_terraform_config_data(self):
        return self.to_dict()

    def _dump_for_xxx_config_file(self,
                                  to_config_data_meth,
                                  config_json_file_path):
        json_str = json_dumps(to_config_data_meth())
        write_text(json_str, config_json_file_path)

    def dump_python_json_config_file(self):
        self._dump_for_xxx_config_file(
            self.to_python_json_config_data,
            self.CONFIG_FINAL_JSON_FILE_FOR_PYTHON,
        )

    def dump_shell_script_json_config_file(self):
        self._dump_for_xxx_config_file(
            self.to_shell_script_config_data,
            self.CONFIG_FINAL_JSON_FILE_FOR_SHELL_SCRIPT,
        )

    def dump_cloudformation_json_config_file(self):
        self._dump_for_xxx_config_file(
            self.to_cloudformation_config_data,
            self.CONFIG_FINAL_JSON_FILE_FOR_CLOUDFORMATION,
        )

    def dump_sam_json_config_file(self):
        self._dump_for_xxx_config_file(
            self.to_sam_config_data,
            self.CONFIG_FINAL_JSON_FILE_FOR_SAM,
        )

    def dump_serverless_json_config_file(self):
        self._dump_for_xxx_config_file(
            self.to_serverless_config_data,
            self.CONFIG_FINAL_JSON_FILE_FOR_SERVERLESS,
        )

    def dump_terraform_json_config_file(self):
        self._dump_for_xxx_config_file(
            self.to_terraform_config_data,
            self.CONFIG_FINAL_JSON_FILE_FOR_TERRAFORM,
        )


@add_metaclass(ConfigMeta)
class ConfigClass(BaseConfigClass):
    pass


__all__ = ["ConfigClass", "Constant", "Derivable"]