Source code for artifacts.registry

# -*- coding: utf-8 -*-
"""The artifact definitions registry."""

from artifacts import definitions
from artifacts import errors
from artifacts import source_type


[docs] class ArtifactDefinitionsRegistry(object): """Artifact definitions registry.""" _source_type_classes = { definitions.TYPE_INDICATOR_ARTIFACT_GROUP: source_type.ArtifactGroupSourceType, definitions.TYPE_INDICATOR_COMMAND: source_type.CommandSourceType, definitions.TYPE_INDICATOR_DIRECTORY: source_type.DirectorySourceType, definitions.TYPE_INDICATOR_FILE: source_type.FileSourceType, definitions.TYPE_INDICATOR_PATH: source_type.PathSourceType, definitions.TYPE_INDICATOR_WINDOWS_REGISTRY_KEY: source_type.WindowsRegistryKeySourceType, definitions.TYPE_INDICATOR_WINDOWS_REGISTRY_VALUE: source_type.WindowsRegistryValueSourceType, definitions.TYPE_INDICATOR_WMI_QUERY: source_type.WMIQuerySourceType} def __init__(self): """Initializes an artifact definitions registry.""" super(ArtifactDefinitionsRegistry, self).__init__() self._artifact_definitions_by_alias = {} self._artifact_definitions_by_name = {} self._artifact_name_references = set() self._defined_artifact_names = set()
[docs] @classmethod def CreateSourceType(cls, type_indicator, attributes): """Creates a source type object. Args: type_indicator (str): source type indicator. attributes (dict[str, object]): source attributes. Returns: SourceType: a source type. Raises: FormatError: if the type indicator is not set or unsupported, or if required attributes are missing. """ if type_indicator not in cls._source_type_classes: raise errors.FormatError( f'Unsupported type indicator: {type_indicator:s}.') return cls._source_type_classes[type_indicator](**attributes)
[docs] def DeregisterDefinition(self, artifact_definition): """Deregisters an artifact definition. Artifact definitions are identified based on their lower case name. Args: artifact_definition (ArtifactDefinition): an artifact definition. Raises: KeyError: if an artifact definition is not set for the corresponding name. """ artifact_definition_name = artifact_definition.name.lower() if artifact_definition_name not in self._artifact_definitions_by_name: raise KeyError(( f'Artifact definition not set for name: ' f'{artifact_definition.name:s}.')) for alias in artifact_definition.aliases: if alias.lower() not in self._artifact_definitions_by_alias: raise KeyError(f'Artifact definition not set for alias: {alias:s}.') del self._artifact_definitions_by_name[artifact_definition_name] for alias in artifact_definition.aliases: del self._artifact_definitions_by_alias[alias.lower()]
[docs] @classmethod def DeregisterSourceType(cls, source_type_class): """Deregisters a source type. Source types are identified based on their type indicator. Args: source_type_class (type): source type. Raises: KeyError: if a source type is not set for the corresponding type indicator. """ if source_type_class.TYPE_INDICATOR not in cls._source_type_classes: raise KeyError(( f'Source type not set for type: ' f'{source_type_class.TYPE_INDICATOR:s}.')) del cls._source_type_classes[source_type_class.TYPE_INDICATOR]
[docs] def GetDefinitionByAlias(self, alias): """Retrieves a specific artifact definition by alias. Args: alias (str): alias of the artifact definition. Returns: ArtifactDefinition: an artifact definition or None if not available. """ if not alias: return None return self._artifact_definitions_by_alias.get(alias.lower(), None)
[docs] def GetDefinitionByName(self, name): """Retrieves a specific artifact definition by name. Args: name (str): name of the artifact definition. Returns: ArtifactDefinition: an artifact definition or None if not available. """ if not name: return None return self._artifact_definitions_by_name.get(name.lower(), None)
[docs] def GetDefinitions(self): """Retrieves the artifact definitions. Returns: list[ArtifactDefinition]: artifact definitions. """ return self._artifact_definitions_by_name.values()
[docs] def GetUndefinedArtifacts(self): """Retrieves the names of undefined artifacts used by artifact groups. Returns: set[str]: undefined artifacts names. """ return self._artifact_name_references - self._defined_artifact_names
[docs] def RegisterDefinition(self, artifact_definition): """Registers an artifact definition. Artifact definitions are identified based on their lower case name. Args: artifact_definition (ArtifactDefinition): an artifact definition. Raises: KeyError: if artifact definition is already set for the corresponding name or alias. """ artifact_definition_name = artifact_definition.name.lower() if artifact_definition_name in self._artifact_definitions_by_name: raise KeyError(( f'Artifact definition already set for name: ' f'{artifact_definition.name:s}.')) for alias in artifact_definition.aliases: alias_lower = alias.lower() if alias_lower in self._artifact_definitions_by_alias: raise KeyError(f'Artifact definition already set for alias: {alias:s}.') if alias_lower in self._artifact_definitions_by_name: raise KeyError( f'Artifact definition alias: {alias:s} already used as name.') self._artifact_definitions_by_name[artifact_definition_name] = ( artifact_definition) self._defined_artifact_names.add(artifact_definition.name) for alias in artifact_definition.aliases: self._artifact_definitions_by_alias[alias.lower()] = artifact_definition for source in artifact_definition.sources: if source.type_indicator == definitions.TYPE_INDICATOR_ARTIFACT_GROUP: self._artifact_name_references.update(source.names)
[docs] @classmethod def RegisterSourceType(cls, source_type_class): """Registers a source type. Source types are identified based on their type indicator. Args: source_type_class (type): source type. Raises: KeyError: if source types is already set for the corresponding type indicator. """ if source_type_class.TYPE_INDICATOR in cls._source_type_classes: raise KeyError(( f'Source type already set for type: ' f'{source_type_class.TYPE_INDICATOR:s}.')) cls._source_type_classes[source_type_class.TYPE_INDICATOR] = ( source_type_class)
[docs] @classmethod def RegisterSourceTypes(cls, source_type_classes): """Registers source types. Source types are identified based on their type indicator. Args: source_type_classes (list[type]): source types. """ for source_type_class in source_type_classes: cls.RegisterSourceType(source_type_class)
[docs] def ReadFromDirectory(self, artifacts_reader, path, extension='yaml'): """Reads artifact definitions into the registry from files in a directory. This function does not recurse sub directories. Args: artifacts_reader (ArtifactsReader): an artifacts reader. path (str): path of the directory to read from. extension (Optional[str]): extension of the filenames to read. Raises: KeyError: if a duplicate artifact definition is encountered. """ for artifact_definition in artifacts_reader.ReadDirectory( path, extension=extension): self.RegisterDefinition(artifact_definition)
[docs] def ReadFromFile(self, artifacts_reader, filename): """Reads artifact definitions into the registry from a file. Args: artifacts_reader (ArtifactsReader): an artifacts reader. filename (str): name of the file to read from. """ for artifact_definition in artifacts_reader.ReadFile(filename): self.RegisterDefinition(artifact_definition)
[docs] def ReadFileObject(self, artifacts_reader, file_object): """Reads artifact definitions into the registry from a file-like object. Args: artifacts_reader (ArtifactsReader): an artifacts reader. file_object (file): file-like object to read from. """ for artifact_definition in artifacts_reader.ReadFileObject(file_object): self.RegisterDefinition(artifact_definition)