Source code for troposphere_mate.core.orchestration

# -*- coding: utf-8 -*-

"""
Implement a Orchestration Framework.
"""

try:
    from typing import List, Tuple, Dict, Type
except:
    pass

import attr
from collections import OrderedDict
from pathlib_mate import PathCls as Path

from .mate import AWSObject, Template
from .canned import Canned


[docs]def resolve_pipeline(plan): """ :type plan: List[Tuple[str, str]] :param plan: [(can_id, tag), ...] :rtype: List[Tuple[List[str], str]]] """ pipeline_change_set = list() job = ([], None) previous_env = None for tier_name, tier_env in plan: if tier_env != previous_env: pipeline_change_set.append(job) previous_env = tier_env job = ([tier_name, ], tier_env) else: job[0].append(tier_name) pipeline_change_set.append(job) pipeline_change_set = pipeline_change_set[1:] dct = dict() pipeline = list() for tier_list, tier_env in pipeline_change_set: if tier_env in dct: dct[tier_env].extend(tier_list) else: dct[tier_env] = tier_list pipeline.append((list(dct[tier_env]), tier_env)) return pipeline
class ResourceFilter(object): def __init__(self, allowed_stack_id_list): self.allowed_stack_id_list = allowed_stack_id_list def filter(self, resource, template): """ Check if we want to keep this resource in the cloudformation. If ``True``, we keep it. if ``False`` we call ``Template.remove_resource(resource)`` to remove it, :type resource: AWSObject :type template: Template :rtype: bool """ # if resource. if resource.resource_type == "AWS::CloudFormation::Stack": if resource.title in self.allowed_stack_id_list: return True else: return False else: return True
[docs]@attr.s class CanLabel(object): """ A wrapper around a ``troposphere_mate.Canned``. It defines the metadata about the ``Canned`` **中文文档** 在 ``Canned`` 之外的进一层包装. ``logic_id`` 是当 ``Canned`` 封装的 Template 会 被作为 Nested Stack 时起作用的. 因为 ``troposphere`` 实现的 Template 可能在其他 Template 中作为 ``AWS::CloudFormation::Stack`` Resource 使用. 作为 Nested Stack 是不知道 Master Stack 中的 Resource Logic Id 的. ``filename`` 则是指定了实体文件的文件名. 因为 ``Template`` 本身只关注模板数据, 不关注模板文件. CanLabel 实现了 Y 轴上的编排. """ logic_id = attr.ib() # type: str can_class = attr.ib() # type: Type[Canned] filename = attr.ib() # type: str
[docs]@attr.s class ConfigData(object): """ **中文文档** 一串的 CanLabel (本质上是一串原子的 Nested Stack, 要么该 Stack 中的资源被全部 创建, 要么全部不被创建) 构成了一个架构的设计. 而这个架构的设计可能被部署到不同的环境中, 在不同的环境中, 配置数据可能不同, 实际被部署的 Nested Stack 的数量也可能不同. ConfigData 提供了在不同环境下 (用 env_tag 做区分) 的配置数据. ConfigData 实现了 X 轴上的编排. """ env_tag = attr.ib() # type: str data = attr.ib() # type: dict
@attr.s class Note(object): can_id = attr.ib() # type: str env_tag = attr.ib() # type: str # ---
[docs]@attr.s class TemplateFile(object): """ **中文文档** 包含了 ``troposphere_mate.Template`` 的实例 以及实际的文件路径 (绝对路径) """ template = attr.ib() # type: Template filepath = attr.ib() # type: str @filepath.validator def check_filepath(self, attribute, value): if not Path(value).is_absolute(): raise ValueError( "You have to use absolute path for 'TemplateFile.filepath`!") def make_file(self, json_or_yml="json"): self.template.to_file(self.filepath, json_or_yml=json_or_yml)
[docs]@attr.s class ExecutionJob(object): """ **中文文档** 每个 ExecutionJob 对应一次 ``aws cloudformation deploy`` 命令的执行. 本质上一个 ExecutionJob 包含了一串最终的 Template 文件实体. 所以我们需要知道 Master Template 的路径, 以及所有的 Template 的数据以及路径. """ master_can = attr.ib() # type: Canned master_template_path = attr.ib() # type: str template_file_list = attr.ib() # type: List[TemplateFile] def execute(self): self.master_can.dump_shell_script_json_config_file() self.master_can.dump_cloudformation_json_config_file() for template_file in self.template_file_list: template_file.make_file(json_or_yml="json")
[docs]class Orchestration(object): """ **中文文档** Orchestration 的本质是对 CanLabel 和 ConfigData 进行编排. 使用: ``CanLabel.logic_id`` 和 ``ConfigData.env_tag`` 指定了编排中的某个最小单元, 通过指定云架构部署的顺序, 最终实现编排. """ def __init__(self, master_canlabel_id, canlabel_list, config_data_list, notes): """ :type master_canlabel_id: str :type canlabel_list: List[CanLabel] :type config_data_list: List[ConfigData] :type notes: List[Note] """ self.master_canlabel_id = master_canlabel_id # type: str self.canlabel_mapper = OrderedDict([ (canlabel.logic_id, canlabel) for canlabel in canlabel_list ]) # type: Dict[str, CanLabel] self.config_data_mapper = OrderedDict([ (config_data.env_tag, config_data) for config_data in config_data_list ]) # type: Dict[str, ConfigData] self.notes = notes # type: List[Note] # print(self.canlabel_mapper[self.master_canlabel_id]) def plan(self, temp_dir): pipeline = resolve_pipeline([ (note.can_id, note.env_tag) for note in self.notes ]) nested_can_mapper = dict() # type: Dict[str, Canned] returned_list = list() STOP_AT_IND = 4 counter = 0 for can_id_list, env_tag in pipeline: counter += 1 deploy_workspace_dir = Path( temp_dir, "{}-{}".format(str(counter).zfill(3), env_tag)) deploy_workspace_dir.mkdir(parents=True, exist_ok=True) returned_list.append(deploy_workspace_dir) template_file_list = list() config_data = self.config_data_mapper[env_tag].data master_can_label = self.canlabel_mapper[self.master_canlabel_id] master_can = master_can_label.can_class(**config_data) master_can.CONFIG_DIR = deploy_workspace_dir.abspath master_can.create_template() master_template_path = Path( deploy_workspace_dir, master_can_label.filename) template_file_list.append( TemplateFile( template=master_can.template, filepath=master_template_path, ) ) # construct resource filter # based on two # 1. The current execution job's ``CanLabel.logic_id`` (Nested Stack Resource Logic Id) # 2. Environment specified config data's ``TIER_LIST_TO_DEPLOY`` allowed_stack_id_list = [ resource_id for resource_id in can_id_list if resource_id in master_can.TIER_LIST_TO_DEPLOY.get_value() ] r_filter = ResourceFilter(allowed_stack_id_list) # remove ignored stacks for resource_id, resource in list(master_can.template.resources.items()): keep_this_flag = r_filter.filter(resource, master_can.template) if not keep_this_flag: master_can.template.remove_resource(resource) else: if resource_id in self.canlabel_mapper: nested_canlabel = self.canlabel_mapper[resource_id] nested_can = nested_canlabel.can_class(**config_data) nested_can.create_template() nested_can_mapper[resource_id] = nested_can template_file = TemplateFile( template=nested_can.template, filepath=Path(deploy_workspace_dir, nested_canlabel.filename) ) template_file_list.append(template_file) # construct ExecutionJob print("=" * 10) print(can_id_list, env_tag) master_can.dump_cloudformation_json_config_file() for template_file in template_file_list: template_file.make_file(json_or_yml="json") # break # if STOP_AT_IND == counter: # break return returned_list