Stateful Collectors¶
Some collectors need to preserve their state between consecutive runs. The usual case is that we want to remember some information that allows us to identify the data we have already downloaded as to not download it again and make a mess out of our database with redundant data.
The most important thing to note here is that each collector is used as an OS process. So it can be run to download some data, then end its life; then it is supposed to be run again – by some scheduler like e.g. cron – after a few hours to check if some new data have been added to the source. So now we know that we cannot store our state in the process memory as it will be deallocated when the process ends. And so we need some kind of persistent storage.
Implementation¶
N6DataSources
’s module n6datasources.collectors.base
comes with a class mixin StatefulCollectorMixin
which saves the state as a pickle (so it can be almost any Python object) inside the n6’s state directory.
StatefulCollectorMixin
consist of 4 methods and an __init__
.
We will care only about 3 of them:
save_state(self, state)
- saves the given state as a pickle in the state directory.load_state(self)
- tries to load and returns a previously saved state from the state directory. If it failed it callsmake_default_state
(see below) to obtain the default state.make_default_state(self)
- creates the default state object of the collector. The default implementation just returnsNone
.
This class is a mixin, so it does not modify the run_collection
method of
a collector in any way whatsoever. So it is your responsibility to call
load_state
before starting collecting data and save_state
just before
the exiting (or after each state update, this method ensures that in case
of the collector’s crash, we will recover at point right before the error has occurred).
Note that StatefulCollectorMixin
does some initialization in __init__
which is later used by the load_state
and save_state
methods.
So make sure to call it (typically just using the super()
technique).
What does init do?¶
You can skip this part and jump to the example, but if you are curious:
__init__
basically just sets the path of the state file by joining
state_dir
path with the generated state’s file name.
How does the object know the path of the state directory? From the
config
attribute under the state_dir
key. So make sure it
is specified in the configuration.
Example¶
class MyStatefullCollector(StatefulCollectorMixin, BaseCollector):
XXX
raw_type = 'stream'
config_spec = '''
[my_statefull_collector]
state_dir :: path
url :: str
'''
def make_default_state(self):
return {}
def run(self):
self.state = self.load_state()
super(MyStatefullCollector, self).run() # <- main activity of the collector
self.save_state(self.state)
def start_publishing(self):
output_rk, output_data_body, output_prop_kwargs = self.get_output_components()
self.publish_output(output_rk, output_data_body, output_prop_kwargs)
self.inner_stop()
def get_source_channel(self, **processed_data):
return 'example-channel'
def get_output_data_body(self, **kwargs):
data = self._collect_data()
return data
def _collect_data(self):
data = self._download(url=self.config['url'], only_newer_than=self.state.get('last_utc'))
self.state['last_utc'] = data.time_utc
return data
def _download(self, url, only_newer_than):
# download some data newer than the `only_newer_than` time
In the example above a state is a simple dictionary (note that an
empty dictionary is returned by the above implementation of
make_default_state
). Here we load the state in an extended version of
the LegacyQueuedBase
’s method run
.
Note that this example is focused on showing how the state
works and should not be treated
as working-kind example cause of lack of implementation in some methods (e.g. _download()).
In the _collect_data
(called in get_output_data_body
, which is, in turn,
called in start_publishing
), we update our state with the just
downloaded data. In this case, it is just the creation time of the
processed chunk, so we will not try to download older bits of data.
In the case of this collector, only when the communication with RabbitMQ
has been properly closed, we save our state (see again our extended
version of the run
method).
Remarks¶
Some base classes (but not BaseCollector
) already implement state management by themselves;
in those cases you do not need to derive from StatefulCollectorMixin
.
Instead, read their documentation.