Examples

These examples demonstrate how to use the Logstash pipeline parser and give an overview of what it can do. The output of each example is included so you can see what the code produces.

Initialization

First, initialize a Pipeline from a string:

from logstash_pipeline_parser import Pipeline

data = r"""
    input {
      syslog {
        port => 5014
      }
    }
"""

pipeline = Pipeline(data)

Alternatively, initialize the pipeline from a file. The path can be passed as a string or as a pathlib.Path:

from logstash_pipeline_parser import Pipeline
from pathlib import Path

# string parameter
pipeline = Pipeline.from_file("/some/path/to/pipeline.conf")

# Path parameter
path = Path("/some/path/to/pipeline.conf")
pipeline = Pipeline.from_file(path)

Parsing

Let’s parse some beats input.

from logstash_pipeline_parser import Pipeline

data = r"""
    input {
      beats {
        host => "0.0.0.0"
        port => 5044
        client_inactivity_timeout => 3600
        include_codec_tag => true
        enrich => [source_metadata, ssl_peer_metadata]
        ssl => true
        ssl_key => "/some/path/my.key"
        id => "input_beats"
      }
    }
"""

ast = Pipeline(data).parse()

This produces the following nested list:

from ipaddress import IPv4Address
from pathlib import Path

[
    ["input",[
        ["beats", [
            ["host", [IPv4Address("0.0.0.0")]],
            ["port", [5044]],
            ["client_inactivity_timeout", [3600]],
            ["include_codec_tag", [True]],
            ["enrich", [
                ["source_metadata", "ssl_peer_metadata"]
            ]],
            ["ssl", [True]],
            ["ssl_key", [Path("/some/path/my.key")]],
            ["id", ["input_beats"]]
        ]]
    ]]
]
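
The nested list above can be awkward to query directly. The following is a minimal sketch of a lookup helper that works on plain Python lists and does not call the parser; plugin_options is a hypothetical name that is not part of logstash_pipeline_parser, and the AST literal is copied from the output above.

```python
from ipaddress import IPv4Address
from pathlib import Path

# AST literal copied from the parsing output shown above.
ast = [
    ["input", [
        ["beats", [
            ["host", [IPv4Address("0.0.0.0")]],
            ["port", [5044]],
            ["client_inactivity_timeout", [3600]],
            ["include_codec_tag", [True]],
            ["enrich", [["source_metadata", "ssl_peer_metadata"]]],
            ["ssl", [True]],
            ["ssl_key", [Path("/some/path/my.key")]],
            ["id", ["input_beats"]],
        ]],
    ]],
]


def plugin_options(ast, section, plugin):
    """Return {option: value} for the first matching plugin (hypothetical helper)."""
    for section_name, plugins in ast:
        if section_name != section:
            continue
        for plugin_name, options in plugins:
            if plugin_name == plugin:
                # Each option has the shape [name, [value]]; unwrap the inner list.
                return {name: values[0] for name, values in options}
    raise KeyError(f"{section}/{plugin} not found")


options = plugin_options(ast, "input", "beats")
print(options["port"])    # 5044
print(options["enrich"])  # ['source_metadata', 'ssl_peer_metadata']
```

The helper assumes every option wraps exactly one value, as in the output above; other pipelines may need a more general traversal.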

Note

The parser automatically casts Boolean values, numbers, IPv4/IPv6 addresses, and filesystem paths to the corresponding Python types.
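
Because the values arrive as typed objects rather than strings, they can be used directly. The short sketch below uses only the standard library types that appear in the output (ipaddress and pathlib); the values are written out by hand here instead of being taken from a real parse.

```python
from ipaddress import IPv4Address, ip_address
from pathlib import Path

# Values as they appear in the parsed output, written out by hand.
host = IPv4Address("0.0.0.0")
ssl_key = Path("/some/path/my.key")
port = 5044

print(host.is_unspecified)        # True: 0.0.0.0 means "all interfaces"
print(ssl_key.suffix)             # .key
print(port + 1)                   # 5045: a real int, not the string "5044"
print(ip_address("::1").version)  # 6: IPv6 addresses get their own type
```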

Types

Suppose that:
  1. “include_codec_tag” should be returned as a plain str instead of a bool,
  2. “ssl_key” should be returned as a plain str instead of a pathlib.Path,
  3. “host” should be returned as an instance of a custom MyHost class,
  4. “enrich” should return only its first value.

from logstash_pipeline_parser import Pipeline
from typing import Any

def return_first(data: list) -> str:
    # return only the first item of a list value
    return data[0]

class MyHost:

    def __init__(self, data: Any) -> None:
        self.data = data

    def __repr__(self) -> str:
        return f"MyHost(data={self.data!r})"

pipeline = Pipeline(data)

# add new types
pipeline.add_type('include_codec_tag', str)
pipeline.add_type('host', MyHost)
pipeline.add_type("enrich", return_first)

# remove default type
pipeline.remove_type('ssl_key')

ast = pipeline.parse()

These conversions are contrived and serve only to illustrate the API. The parsing result is:

[
  ["input", [
    ["beats", [
      ["host", [MyHost(data=IPv4Address("0.0.0.0"))]],
      ["port", [5044]],
      ["client_inactivity_timeout", [3600]],
      ["include_codec_tag", ["True"]],
      ["enrich", ["source_metadata"]],
      ["ssl", [True]],
      ["ssl_key", ["/some/path/my.key"]],
      ["id", ["input_beats"]]
    ]]
  ]]
]
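
The result above is consistent with each registered callable receiving the value the parser would otherwise return. That reading is inferred from the output, not from the library's source, so treat the following as a mental model only. The sketch applies the same three callables to hand-written default values and does not use the parser; the overrides dict is a hypothetical stand-in for the add_type calls.

```python
from ipaddress import IPv4Address
from typing import Any


def return_first(data: list) -> str:
    return data[0]


class MyHost:
    def __init__(self, data: Any) -> None:
        self.data = data

    def __repr__(self) -> str:
        return f"MyHost(data={self.data!r})"


# Default values, as shown in the first parsing result.
default_values = {
    "host": IPv4Address("0.0.0.0"),
    "include_codec_tag": True,
    "enrich": ["source_metadata", "ssl_peer_metadata"],
}

# Hypothetical stand-in for the three add_type() calls.
overrides = {"host": MyHost, "include_codec_tag": str, "enrich": return_first}

converted = {name: overrides[name](value) for name, value in default_values.items()}
print(converted)
```

Here str(True) yields "True" and return_first picks "source_metadata", matching the values in the result above.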