Parser Base Classes¶
As with the collectors, n6 already comes with some tools that relieve the developer of some of the work needed when implementing a new parser, such as modifying a routing key.
There are three classes to choose from, one for each of the parser types: BaseParser, AggregatedEventParser and BlackListParser.
Their names speak for themselves. All of them can be found in the n6datasources.parsers.base module in the N6DataSources library.
(Truth be told, there are more than just three, but those others are considered legacy and should not be used in any new code.)
Configuration¶
Each parser has its configuration. Simple parsers will have only one configuration option, prefetch_count (setting it to "1" should work in most basic cases). The content of the configuration file could look like so:
[MyOwnParser]
prefetch_count = 1
In the n6 source code repository, the templates of all configuration files for the collectors and parsers are stored in the N6DataSources/n6datasources/data/conf directory; their names start with 60_, followed by the source provider name, and are suffixed with .conf (e.g., 60_example.conf). If a source has a collector and a corresponding parser then, by convention, the configuration for both should be kept in the same file.
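For instance, a 60_example.conf file covering both components could look more or less like this (the collector's section name and its option are made up for illustration; only the parser's prefetch_count option comes from the description above):
[ExampleCollector]
# hypothetical collector-specific options...
url = https://www.example.com/feed

[ExampleParser]
prefetch_count = 1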
Implementation based on BaseParser¶
When implementing a parser using BaseParser as the parent class, we need to specify some class attributes as well as implement the parse method. BaseParser has more methods that ease implementing some special corner cases, but for a generic source implementing parse and setting the attributes described below should be sufficient.
There are two (or three, see the section Versioning parsers) attributes we care about:
- default_binding_key - the routing key by which the parser will draw its data from the queue. Typically, it consists of two parts joined with the "." character: the first part is the source provider name and the second one is the source channel. So, for the example-provider source provider and the example-channel source channel, it would look like so: default_binding_key = "example-provider.example-channel".
- constant_items - a dictionary of items that are constant for all of the output events. Most parsers will have a dictionary of at least three items: restriction, confidence and category.
The parse method takes only one positional argument (apart from self): data (a dict containing, among others, the actual input data), and, as we can have multiple events from one chunk of data (e.g., CSV-formatted input with one event per row), it yields consecutive events as dict-like objects.
Remember that data contains stuff that was taken from the input queue, so there are some additional properties like timestamp and so on.
Most importantly, the actual input content is stored under the raw key (typically, it is exactly what was sent by the collector as its output data body).
Example data:
[
    {
        "properties.app_id": null,
        "properties.cluster_id": null,
        "properties.content_encoding": null,
        "properties.content_type": null,
        "properties.correlation_id": null,
        "properties.delivery_mode": null,
        "properties.expiration": null,
        "properties.message_id": "0123456789abcdef0123456789abcdef",
        "properties.priority": null,
        "properties.reply_to": null,
        "properties.timestamp": "2019-11-10 10:14:00",
        "properties.type": "foo...bar...",
        "properties.user_id": null,
        "raw": "[{\"tag\": \"example.com,some1,2019-10-11T23:59:59\"}]",
        "raw_format_version_tag": null,
        "source": "example-provider.example-channel"
    },
    {
        "properties.app_id": null,
        "properties.cluster_id": null,
        "properties.content_encoding": null,
        "properties.content_type": null,
        "properties.correlation_id": null,
        "properties.delivery_mode": null,
        "properties.expiration": null,
        "properties.message_id": "0123456789abcdef0123456789abcdef",
        "properties.priority": null,
        "properties.reply_to": null,
        "properties.timestamp": "2019-11-10 10:14:03",
        "properties.type": "foo...bar...",
        "properties.user_id": null,
        "raw": "[{\"tag\": \"example.org,some2,2019-10-12T01:02:03\"}]",
        "raw_format_version_tag": null,
        "source": "example-provider.example-channel"
    }
]
So a very simple implementation could look like this:
import json

from n6datasources.parsers.base import BaseParser


class MyOwnParser(BaseParser):

    default_binding_key = "example-provider.example-channel"
    constant_items = {
        'restriction': 'public',
        'confidence': 'medium',
        'category': 'server-exploit',
    }

    def parse(self, data):
        # Here we assume data['raw'] contains a JSON-encoded list of event
        # objects, each with a 'tag' made of comma-separated fqdn, name and time.
        raw = json.loads(data['raw'])
        for event in raw:
            with self.new_record_dict(data) as parsed:
                tag_parts = event['tag'].split(',')
                parsed['fqdn'] = tag_parts[0]
                parsed['name'] = tag_parts[1]
                parsed['time'] = tag_parts[2]
                yield parsed
In the example, we use the helper method new_record_dict, which creates an instance of n6lib.record_dict.RecordDict (a dict-like mapping class with some validation and adjustment capabilities added).
It is used with the with statement, as it sets the exception handling callback to the handle_parse_error method of BaseParser, which can be overridden by derived classes (for example, a class deriving from SkipParseExceptionMixin will suppress most errors...).
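If a feed regularly contains malformed entries that should be skipped rather than abort processing, such a mixin can be combined with BaseParser. The sketch below is only illustrative; in particular, the exact name and import location of the mixin (here assumed to be importable from n6datasources.parsers.base) should be verified against that module:
import json

# The import location of SkipParseExceptionMixin is an assumption.
from n6datasources.parsers.base import BaseParser, SkipParseExceptionMixin


class MyTolerantParser(SkipParseExceptionMixin, BaseParser):

    # Same attributes and parse() logic as in the previous example; the only
    # difference is the mixin, thanks to which most per-event parse errors
    # cause the offending event to be skipped instead of failing the batch.
    default_binding_key = 'example-provider.example-channel'
    constant_items = {
        'restriction': 'public',
        'confidence': 'medium',
        'category': 'server-exploit',
    }

    def parse(self, data):
        for event in json.loads(data['raw']):
            with self.new_record_dict(data) as parsed:
                parsed['fqdn'], parsed['name'], parsed['time'] = event['tag'].split(',')
                yield parsed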
Implementation based on the BlackListParser¶
Implementing a blacklist parser works mostly the same as implementing a generic one; there are just some additional class attributes to specify if needed.
A blacklist event has some private data added for the Comparator module to use, and we need to help the parser find the values it needs.
The value we need to find is the time of the event. There are three attributes we can set: bl_current_time_regex_group, bl_current_time_regex and bl_current_time_format.
If none of them is set by the subclass, the time will be taken from the properties.timestamp key in the data. If the collector's input is an e-mail, the time will be taken from the mail_time key of the dictionary stored under the meta key in the top-level data dictionary; if it is a website, the time will be taken from the http_last_modified key of the same dictionary (it is the responsibility of the appropriate collectors to place that data in the meta header of the AMQP message being sent to the RabbitMQ queue).
If bl_current_time_regex is specified, the parser will search for a match in data['raw']. If a match is found, it will capture the time using the regex group named by bl_current_time_regex_group, which defaults to 'datetime'. Lastly, the parser assumes that the data extracted this way is in the ISO format. If that is not the case, bl_current_time_format should be set to the value to be used as the format when calling datetime.strptime on the extracted string.
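To make this more concrete, here is a minimal sketch of a blacklist parser. The feed header, the regular expression and all concrete values are made up for illustration; also, whether bl_current_time_regex should be a compiled pattern or a plain string (and whether it should match bytes or text) should be checked against the BlackListParser base class:
import re

from n6datasources.parsers.base import BlackListParser


class MyBlackListParser(BlackListParser):

    default_binding_key = 'example-provider.example-channel'
    constant_items = {
        'restriction': 'public',
        'confidence': 'low',
        'category': 'server-exploit',
    }

    # Hypothetical feed header: "# Last updated: 10/11/2019 10:14"
    bl_current_time_regex = re.compile(
        r'Last updated:\s*'
        r'(?P<datetime>\d{2}/\d{2}/\d{4} \d{2}:\d{2})')
    # 'datetime' is already the default group name; set here only for clarity.
    bl_current_time_regex_group = 'datetime'
    # The captured value is not in the ISO format, so a strptime() format is given.
    bl_current_time_format = '%d/%m/%Y %H:%M'

    def parse(self, data):
        # per-event parsing, as with BaseParser...
        pass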
Implementation based on the AggregatedEventParser¶
Implementation is the same as with BaseParser, except that the parser needs to create a _group key for the Aggregator module, so the group_id_components class attribute needs to be provided. See the section on high frequency data sources.
Example:
class ExampleHiFreqParser(AggregatedEventParser):

    default_binding_key = 'example-provider.example-channel'
    constant_items = {
        'restriction': 'public',
        'confidence': 'low',
        'category': 'tor',
    }
    group_id_components = 'ip', 'dport', 'proto'

    def parse(self, data):
        # implementation here...
        pass
Versioning parsers¶
A little disclaimer about the default_binding_key's value. We said before that the key consists of two parts separated with the "." character, but sometimes there can be three. The full format of the default binding key looks like so: {source provider}.{source channel}.{raw_format_version_tag}, with the provision that the last part is optional.
So what is this raw_format_version_tag?
When the format of the data provided by some external source changes, we need to implement a new collector or modify the old one. However, for the Archiver's sake (it can recover our data, but the data are kept in the raw form produced by the collectors, so they need to be parsed again upon recovery), we need to keep the old parser unchanged. So we need some way to tell n6 that the source is the same, just in a newer version of its format. That's when the raw_format_version_tag comes into play.
If needed, we set the version tag inside our new (or modified) collector (as the raw_format_version_tag attribute) and then add it to the default_binding_key of our new parser. So, for example, if we used the raw_format_version_tag in the YYYYMM format (which is a handy convention, implying that the source does not change its format often), then, after the source has changed, the collector would look like so:
# Modified collector
class ExampleSourceCollector(BaseCollector):

    raw_type = 'stream'
    raw_format_version_tag = '201912'
    config_spec = '''
    config spec here
    '''

    # methods implementation below skipped
The parsers:
# Old parser implementation
class ExampleSourceParser(BaseParser):

    default_binding_key = 'example-provider.example-channel'

    # implementation skipped


# New parser
class ExampleSource201912Parser(BaseParser):

    default_binding_key = 'example-provider.example-channel.201912'

    # implementation skipped
Remarks¶
This document is just a draft and only covers the most basic implementation. To learn more about how parsers work and to see some examples, look into the base.py module inside the "parsers" directory, or at the implementation of any of the parsers that can be found there.