Source code for logstash_pipeline_parser.pipeline

from collections.abc import Callable
from collections.abc import Generator
from pathlib import Path
from typing import Any
from typing import Optional
from typing import Self

from .tree import AST


class Pipeline:
    """
    A class representing the Logstash pipeline.

    :param pipeline: The pipeline definition
    :type pipeline: str
    """

    def __init__(self, pipeline: str) -> None:
        self._ast: AST = AST()
        self._data: str = pipeline

        # Options whose values are parsed as pathlib.Path
        self._ast \
            .add_type("aggregate_maps_path", Path) \
            .add_type("cacert", Path) \
            .add_type("ca_file", Path) \
            .add_type("client_cert", Path) \
            .add_type("client_key", Path) \
            .add_type("database", Path) \
            .add_type("dictionary_path", Path) \
            .add_type("jaas_path", Path) \
            .add_type("jdbc_driver_library", Path) \
            .add_type("jdbc_password_filepath", Path) \
            .add_type("json_key_file", Path) \
            .add_type("kerberos_config", Path) \
            .add_type("keystore", Path) \
            .add_type("mib_paths", Path) \
            .add_type("network_path", Path) \
            .add_type("path", Path) \
            .add_type("private_key", Path) \
            .add_type("processed_db_path", Path) \
            .add_type("public_key", Path) \
            .add_type("schema_registry_ssl_keystore_location", Path) \
            .add_type("schema_registry_ssl_truststore_location", Path) \
            .add_type("send_nsca_config", Path) \
            .add_type("ssl_cacert", Path) \
            .add_type("ssl_cert", Path) \
            .add_type("ssl_certificate", Path) \
            .add_type("ssl_certificate_path", Path) \
            .add_type("ssl_key", Path) \
            .add_type("ssl_keystore_location", Path) \
            .add_type("ssl_keystore_path", Path) \
            .add_type("ssl_truststore_location", Path) \
            .add_type("ssl_truststore_path", Path) \
            .add_type("statement_filepath", Path) \
            .add_type("template", Path) \
            .add_type("template_file", Path) \
            .add_type("truststore", Path)

    @classmethod
    def from_file(cls, path: str | Path) -> Self:
        """
        Instantiates the class from a file.

        :param path: Path to the file
        :type path: str | pathlib.Path
        :rtype: Pipeline

        For example from a string path:

        .. code-block:: python

            from logstash_pipeline_parser import Pipeline

            pipeline = Pipeline.from_file("/some/path/to/pipeline.conf")

        Or from a :py:class:`pathlib.Path`:

        .. code-block:: python

            from logstash_pipeline_parser import Pipeline
            from pathlib import Path

            path = Path("/some/path/to/pipeline.conf")
            pipeline = Pipeline.from_file(path)
        """
        if isinstance(path, str):
            path = Path(path)

        return cls(path.read_text())

    def add_type(self, name: str, new_type: type[Any] | Callable[[Any], Any]) -> Self:
        """
        Adds a new type.

        :param name: Type name
        :type name: str
        :param new_type: New type for the given name.
        :type new_type: type[typing.Any] | typing.Callable[[typing.Any], typing.Any]
        :rtype: Pipeline

        For example with a built-in type:

        .. code-block:: python

            from logstash_pipeline_parser import Pipeline

            Pipeline("").add_type("port", str)

        Or with a custom class:

        .. code-block:: python

            from logstash_pipeline_parser import Pipeline

            class MyPortType:
                pass

            Pipeline("").add_type("port", MyPortType)

        .. note:: Please see :ref:`examples-type` for more examples.
        """
        self._ast.add_type(name, new_type)

        return self

    def remove_type(self, name: str) -> Self:
        """
        Removes a type.

        :param name: Type name
        :type name: str
        :rtype: Pipeline

        For example:

        .. code-block:: python

            from logstash_pipeline_parser import Pipeline

            Pipeline("").remove_type("port")
        """
        self._ast.remove_type(name)

        return self

    def get_types(self) -> dict[str, type[Any] | Callable[[Any], Any]]:
        """
        Returns all defined types.

        :return: All names as keys and their types as values.
        :rtype: dict

        The predefined types are:

        .. list-table::
           :header-rows: 1
           :widths: auto

           * - Name
             - Type/Callable
           * - aggregate_maps_path
             - :py:class:`pathlib.Path`
           * - cacert
             - :py:class:`pathlib.Path`
           * - ca_file
             - :py:class:`pathlib.Path`
           * - client_cert
             - :py:class:`pathlib.Path`
           * - client_key
             - :py:class:`pathlib.Path`
           * - database
             - :py:class:`pathlib.Path`
           * - dictionary_path
             - :py:class:`pathlib.Path`
           * - jaas_path
             - :py:class:`pathlib.Path`
           * - jdbc_driver_library
             - :py:class:`pathlib.Path`
           * - jdbc_password_filepath
             - :py:class:`pathlib.Path`
           * - json_key_file
             - :py:class:`pathlib.Path`
           * - kerberos_config
             - :py:class:`pathlib.Path`
           * - keystore
             - :py:class:`pathlib.Path`
           * - mib_paths
             - :py:class:`pathlib.Path`
           * - network_path
             - :py:class:`pathlib.Path`
           * - path
             - :py:class:`pathlib.Path`
           * - private_key
             - :py:class:`pathlib.Path`
           * - processed_db_path
             - :py:class:`pathlib.Path`
           * - public_key
             - :py:class:`pathlib.Path`
           * - schema_registry_ssl_keystore_location
             - :py:class:`pathlib.Path`
           * - schema_registry_ssl_truststore_location
             - :py:class:`pathlib.Path`
           * - send_nsca_config
             - :py:class:`pathlib.Path`
           * - ssl_cacert
             - :py:class:`pathlib.Path`
           * - ssl_cert
             - :py:class:`pathlib.Path`
           * - ssl_certificate
             - :py:class:`pathlib.Path`
           * - ssl_certificate_path
             - :py:class:`pathlib.Path`
           * - ssl_key
             - :py:class:`pathlib.Path`
           * - ssl_keystore_location
             - :py:class:`pathlib.Path`
           * - ssl_keystore_path
             - :py:class:`pathlib.Path`
           * - ssl_truststore_location
             - :py:class:`pathlib.Path`
           * - ssl_truststore_path
             - :py:class:`pathlib.Path`
           * - statement_filepath
             - :py:class:`pathlib.Path`
           * - template
             - :py:class:`pathlib.Path`
           * - template_file
             - :py:class:`pathlib.Path`
           * - truststore
             - :py:class:`pathlib.Path`
        """
        return self._ast.get_types()

    def parse(self) -> list:
        """
        Creates an `Abstract syntax tree <https://en.wikipedia.org/wiki/Abstract_syntax_tree>`_
        from the input data.

        :return: Parsed tree
        :rtype: list

        It can parse all kinds of plugins, conditions and data types.
        For example, this input:

        .. code-block:: python

            from logstash_pipeline_parser import Pipeline

            data = r\"""
                 input {
                   beats {
                     host => "0.0.0.0"
                     port => 5044
                     client_inactivity_timeout => 3600
                     include_codec_tag => true
                     enrich => [source_metadata, ssl_peer_metadata]
                     ssl => true
                     ssl_key => "/some/path/my.key"
                     id => "input_beats"
                   }
                 }
            \"""

            ast = Pipeline(data).parse()

        will produce this list:

        .. code-block:: python

            from ipaddress import IPv4Address
            from pathlib import Path

            [
                ["input", [
                    ["beats", [
                        ["host", [IPv4Address("0.0.0.0")]],
                        ["port", [5044]],
                        ["client_inactivity_timeout", [3600]],
                        ["include_codec_tag", [True]],
                        ["enrich", [
                            ["source_metadata", "ssl_peer_metadata"]
                        ]],
                        ["ssl", [True]],
                        ["ssl_key", [Path("/some/path/my.key")]],
                        ["id", ["input_beats"]]
                    ]]
                ]]
            ]
        """
        return self._ast.parse_config(self._data)

    def search(self, key: str) -> Generator[tuple[str, Any], None, None]:
        """
        Yields the searched keys and their values from the tree.

        :param key: Key name to search for
        :type key: str
        :return: Found values in the form tuple[key, value]
        :rtype: collections.abc.Generator[tuple[str, typing.Any], None, None]

        The key can also contain the wildcard ``*``. For example, ``"output.*.hosts"``
        will return (if the pipeline definition contains them):

        .. code-block:: console

            - ("output.elasticsearch.hosts", ["127.0.0.1:9200", "127.0.0.2:9200"])
            - ("output.logstash.hosts", "127.0.0.1:9801")

        .. note:: Please see :ref:`examples-search` for more examples.
        """
        for element in self._ast.parse_config(self._data):
            yield from Pipeline._recursive_search(key.split("."), element)

    # thx Francois Garillot
    # https://stackoverflow.com/a/8848959
    @staticmethod
    def _matcher(_l1: list, _l2: list) -> bool:
        # Returns True when the two key paths match, where "*" stands for any
        # (possibly empty) run of path segments; the comparison is symmetric.
        if not _l1:
            return _l2 == [] or _l2 == ["*"]

        if _l2 == [] or _l2[0] == "*":
            return Pipeline._matcher(_l2, _l1)

        if _l1[0] == "*":
            return Pipeline._matcher(_l1, _l2[1:]) or Pipeline._matcher(_l1[1:], _l2)

        if _l1[0] == _l2[0]:
            return Pipeline._matcher(_l1[1:], _l2[1:])
        else:
            return False

    @staticmethod
    def _recursive_search(key: list, element: list, actual_key: Optional[list] = None) -> list[tuple[str, Any]]:
        # Walks the parsed tree depth-first, extending the dotted key path on
        # every ["name", value] node and collecting (path, value) tuples whose
        # path matches the searched key.
        if actual_key is None:
            actual_key = []

        actual_key: list = actual_key.copy()
        _matched: list[tuple[str, Any]] = []

        if len(element) == 2 and isinstance(element[0], str):
            actual_key.append(element[0])

        if Pipeline._matcher(actual_key, key):
            _matched.append((".".join(actual_key), element[1]))
        else:
            # No match yet: descend into the list children of this node.
            for child in element:
                if not isinstance(child, list):
                    continue

                _matched += Pipeline._recursive_search(key, child, actual_key)

        return _matched
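

# ---------------------------------------------------------------------------
# A minimal end-to-end sketch of how the class above might be used, assuming
# the package is importable as "logstash_pipeline_parser" (as in the parse()
# docstring). The pipeline definition below is illustrative only and is not
# part of the library.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from logstash_pipeline_parser import Pipeline

    data = r"""
    input {
      beats {
        port => 5044
        ssl_key => "/some/path/my.key"
      }
    }
    """

    pipeline = Pipeline(data)

    # Full abstract syntax tree, in the nested-list form shown in Pipeline.parse().
    ast = pipeline.parse()
    print(ast)

    # Wildcard search over the tree; expected to yield ("input.beats.port", [5044]).
    for found_key, value in pipeline.search("input.*.port"):
        print(found_key, value)

    # Register a different conversion for "port" values (as in the add_type()
    # docstring) and search again; the values should now come back as strings.
    pipeline.add_type("port", str)
    for found_key, value in pipeline.search("input.*.port"):
        print(found_key, value)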