The Parts

Let's take a moment to talk about the StackTach.v3 components and establish some terminology.

  • The source application (aka: your application) publishes notifications to the queue.
  • Yagi consumes these notifications from the queue and passes them on to a chain of Yagi Handlers.
  • One of those handlers, Winchester, takes the distilled notifications (called events) and stores them in the MySQL database.
  • Streams may be created or processed as new events flow into the system. This can result in new events or notifications being generated.
  • Any new notifications can be published back into the queue for subsequent processing via the Notabene pipeline handler.
  • ... and the sequence repeats itself.

In order to get notifications from OpenStack, you'll need to put the following lines in your service configuration files.

# OpenStack deployments before Kilo should use the old-style:
# Kilo and beyond use the "messaging" entry-point:

Where "monitor" is the name of the queue you wish to send the notifications to. When you are configuring Yagi, you'll need to ensure the queue name prefix matches what you've defined here.
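Those comment lines refer to oslo.messaging's standard notification options. As a sketch of what the full fragment typically looks like, assuming the stock option names (verify them against your release's documentation; the values here are illustrative):

```ini
# nova.conf (or the config file of whichever service should emit notifications)

# OpenStack deployments before Kilo should use the old-style driver:
notification_driver = nova.openstack.common.notifier.rpc_notifier
notification_topics = monitor

# Kilo and beyond use the "messaging" entry-point:
# notification_driver = messaging
# notification_topics = monitor
```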

Consuming Notifications with Yagi

You're going to need a way to get those notifications out of your queuing system. That's what Yagi does. Yagi reads notifications from one place and spits them out somewhere else. It's highly configurable and battle-tested for large-scale deployments.

You launch the yagi-event process with the following command:

yagi-event --config yagi.conf

where yagi.conf is the name of your yagi configuration file.

A sample Yagi config file
[global]
verbose = False
debug = True
update_timer = 10

[event_worker]
event_driver = yagi.broker.rabbit.Broker

[rabbit_broker]
host = localhost
user = guest
password = guest
port = 5672
vhost = /
poll_delay = 0.5

[consumers]
queues = monitor.info

[consumer:monitor.info]
apps = winchester.yagi_handler.WinchesterHandler, yagi.handler.shoebox_handler.ShoeboxHandler
exchange = monitor
exchange_type = topic
routing_key = monitor.info
durable = True
max_messages = 100

The important part of this configuration is the [event_worker] section. This says we want to use the RabbitMQ data source. The RabbitMQ connectivity information is stored in the [rabbit_broker] section. The name of each RabbitMQ queue to consume from is specified in the [consumers] section. For every queue you define there, you will need a [consumer:<queue_name>] section. This last section is where the real magic happens. Beyond defining the exchange, routing_key and durability characteristics, it defines the chain of Yagi Handlers that will run on every notification that gets consumed.
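As a rough sketch of what a handler in that chain looks like: yagi handlers subclass yagi.handler.BaseHandler and receive batches of notifications. The base class is stubbed out below so the example is self-contained, and EventTypeCounter is a hypothetical handler, not one that ships with Yagi; check the yagi source for the exact interface.

```python
class BaseHandler(object):
    """Stand-in for yagi.handler.BaseHandler (stubbed for this sketch)."""
    CONFIG_SECTION = "DEFAULT"

    def handle_messages(self, messages, env):
        raise NotImplementedError


class EventTypeCounter(BaseHandler):
    """Hypothetical handler: tallies notifications by event_type."""
    CONFIG_SECTION = "counter"

    def __init__(self):
        self.counts = {}

    def handle_messages(self, messages, env):
        # In real yagi each message wraps a notification body; plain dicts
        # are used here to keep the sketch self-contained.
        for notification in messages:
            event_type = notification.get('event_type', 'unknown')
            self.counts[event_type] = self.counts.get(event_type, 0) + 1


handler = EventTypeCounter()
handler.handle_messages(
    [{'event_type': 'compute.instance.create.end'},
     {'event_type': 'compute.instance.create.end'},
     {'event_type': 'compute.instance.delete.end'}],
    env={})
print(handler.counts)
```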

You can write your own Yagi handlers if you like, but there are a number that ship with StackTach.v3 to do some interesting things. The most important of these is the winchester.yagi_handler:WinchesterHandler. This handler is your entry point into StackTach.v3 stream processing. But first, we need to convert those messy notifications into events ...

Distilling Notifications to Events

Now we have notifications coming into Winchester. But, as we hinted at above, we need to take the larger notification and distill it down into a more manageable event. The stack-distiller module makes this happen. Within StackTach.v3, this is part of winchester.yagi_handler:WinchesterHandler.

A notification is a large, nested JSON data structure. But we don't need all of that data for stream processing. In fact, we generally only require a few Traits from the notification. That's what distilling does. It pulls out the important traits, scrubs the data and uses that. Distillations are done via the distillation configuration file (specified in winchester.conf).
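Conceptually, distilling is "try a list of dotted field paths against the notification and keep the first one that resolves". The sketch below mimics that idea in plain Python; it is only an illustration, the real logic lives in the stack-distiller package.

```python
def lookup(notification, dotted_path):
    """Resolve a 'payload.instance_id'-style path against a nested dict."""
    value = notification
    for key in dotted_path.split('.'):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


def distill(notification, trait_fields):
    """trait_fields maps trait name -> ordered list of candidate fields."""
    event = {}
    for trait, fields in trait_fields.items():
        for field in fields:
            value = lookup(notification, field)
            if value is not None:
                event[trait] = value
                break  # first field that resolves wins
    return event


notification = {
    '_context_request_id': 'req-d68b36e0',
    'event_type': 'compute.instance.create.end',
    'payload': {'tenant_id': '7c150a59', 'instance_id': '9f9d01b9'},
}
event = distill(notification, {
    'event_type': ['event_type'],
    'request_id': ['_context_request_id'],
    'tenant_id': ['payload.tenant_id', '_context_project_id'],
    'instance_id': ['payload.instance_uuid', 'payload.instance_id'],
})
print(event)
```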

Only timestamp and event_type are required traits.

A sample notification
{
  '_context_is_admin': True,
  '_context_project_id': u'7c150a59fe714e6f9263774af9688f0e',
  '_context_quota_class': None,
  '_context_read_deleted': u'no',
  '_context_remote_address': u'',
  '_context_request_id': u'req-d68b36e0-9233-467f-9afb-d81435d64d66',
  '_context_roles': [u'admin'],
  '_context_timestamp': u'2012-05-08T20:23:41.425105',
  '_context_user_id': u'1e3ce043029547f1a61c1996d1a531a2',
  'event_type': u'compute.instance.create.end',
  'message_id': u'dae6f69c-00e0-41c0-b371-41ec3b7f4451',
  'priority': u'INFO',
  'publisher_id': u'compute.vagrant-precise',
  'timestamp': u'2012-05-08 20:23:48.028195',
  'payload': {
      'created_at': u'2012-05-08 20:23:41',
      'deleted_at': u'',
      'disk_gb': 0,
      'display_name': u'testme',
      'fixed_ips': [{'address': u'',
                     'floating_ips': [],
                     'meta': {},
                     'type': u'fixed',
                     'version': 4}],
      'image_ref_url': u'',
      'instance_id': u'9f9d01b9-4a58-4271-9e27-398b21ab20d1',
      'instance_type': u'm1.tiny',
      'instance_type_id': 2,
      'launched_at': u'2012-05-08 20:23:47.985999',
      'memory_mb': 512,
      'state': u'active',
      'state_description': u'',
      'tenant_id': u'7c150a59fe714e6f9263774af9688f0e',
      'user_id': u'1e3ce043029547f1a61c1996d1a531a2',
      'reservation_id': u'1e3ce043029547f1a61c1996d1a531a3',
      'vcpus': 1,
      'root_gb': 0,
      'ephemeral_gb': 0,
      'host': u'compute-host-name',
      'availability_zone': u'1e3ce043029547f1a61c1996d1a531a4',
      'os_type': u'linux?',
      'architecture': u'x86',
      'image_ref': u'UUID',
      'kernel_id': u'1e3ce043029547f1a61c1996d1a531a5',
      'ramdisk_id': u'1e3ce043029547f1a61c1996d1a531a6',
  }
}

A typical distiller file might look like this

A sample stack-distiller.yaml
- event_type: compute.instance.*
  traits:
    tenant_id:
      fields:
        - payload.tenant_id
        - _context_project_id
    user_id:
      fields: payload.user_id
    request_id:
      fields: _context_request_id
    instance_id:
      fields:
        - payload.instance_uuid
        - payload.instance_id
        - exception.kwargs.uuid
        - instance.uuid

For details on the distiller configuration grammar, see the associated docs. Note that the distiller config file is in YAML format.

When processed, the notification would be converted into an event that looks like this:

A sample event
{
  'event_type': 'compute.instance.create.end',
  'timestamp': datetime(2012, 5, 8, 20, 23, 48, 28195),
  'tenant_id': '7c150a59fe714e6f9263774af9688f0e',
  'user_id': '1e3ce043029547f1a61c1996d1a531a2',
  'request_id': 'req-d68b36e0-9233-467f-9afb-d81435d64d66',
  'instance_id': '9f9d01b9-4a58-4271-9e27-398b21ab20d1'
}

We call event key/value pairs Traits. This will be important when we start defining Streams.

Differences between a notification and an event

  • Notifications support nested data and lists; events are flat key/value Traits.
  • Notification strings have no maximum length; event string traits are limited to 255 characters.
  • Both support floats and integers; booleans are coerced to integers in events.
  • Both support millisecond-resolution datetimes, and events can be matched against datetime ranges.


Defining Streams

Streams are the key to StackTach.v3. You should have a good understanding of the lifecycle of a stream and how to define one. So let's start with some basics ...

As events come into Winchester, they are sorted into streams. Any trait in the event can be used to differentiate which stream they go into. One event can go in to many streams. These traits are called distinguishing_traits since they are used to distinguish one stream from another. All of this is defined in the winchester configuration file.
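The sorting step can be pictured as computing a stream key from the distinguishing traits and bucketing events by that key. In this toy sketch a plain dict stands in for Winchester's MySQL-backed stream store:

```python
def stream_key(event, distinguishing_traits):
    # A stream's identity is the tuple of (trait, value) pairs.
    return tuple((trait, event.get(trait)) for trait in distinguishing_traits)


streams = {}
events = [
    {'event_type': 'compute.instance.create.start', 'request_id': 'req-1'},
    {'event_type': 'compute.instance.create.end', 'request_id': 'req-1'},
    {'event_type': 'compute.instance.delete.start', 'request_id': 'req-2'},
]
for event in events:
    # Events with the same request_id land in the same stream bucket.
    streams.setdefault(stream_key(event, ['request_id']), []).append(event)

print(len(streams))  # two streams: one per request_id
```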

A bare-bones Winchester config file.

database:
    url: mysql://winchester:testpasswd@localhost/winchester

distiller_config: event_definitions.yaml
trigger_definitions: triggers.yaml
pipeline_config: pipelines.yaml

pipeline_handlers:
    logger: winchester.pipeline_handler:LoggingHandler
    usage: winchester.pipeline_handler:UsageHandler
    notabene: winchester.pipeline_handler:NotabeneHandler

The first thing you'll notice is the database connection string. But then you'll notice that the Winchester module needs three other configuration files. The distiller config file we've already covered. The other two require a little more explanation. They define your Triggers and your Pipelines.

We left that little detail out when we were explaining Yagi previously. But the WinchesterHandler needs to know where your winchester config file lives. You define this by adding a [winchester] section to your yagi config file.

[winchester]
config_file = winchester.yaml

Triggers and Pipelines

The trigger definition file tells winchester:

  • into which streams to place incoming events
  • which events to accept into a stream
  • when a stream is ready for processing
  • when a stream expires
  • how to process ready streams
  • how to process expired streams

Like the distiller config file, the trigger definition config file is YAML based.

Sample trigger def file
- name: my_trigger
  distinguished_by:
    - request_id
  expiration: "$last + 1h"
  fire_pipeline: "my_fire_pipeline"
  expire_pipeline: "my_expire_pipeline"
  match_criteria:
    - event_type:
        - compute.instance.*
  fire_criteria:
    - event_type: compute.instance.*.end

The match criteria state the traits that need to match for an event to be included in this stream. The most common criterion is event_type, but you could also limit event inclusion by region, server, time range, etc. Depending on your application, it's generally better to be more inclusive of events initially.

Streams are buckets that collect events. The bucket the event goes in is determined by the distinguishing traits you define. Generally these are traits that have a somewhat constrained set of values. For example, instance_id, request_id, user_id, tenant_id, region, server, ip_address ... are all good choices. Timestamp is generally not a good distinguishing trait since it varies so greatly. You would end up with a different stream for every incoming event and each stream would only have one event in it. Not very useful. Also, you can define multiple distinguishing traits. For example: region and the "day" portion of the timestamp. This would produce one stream for each region for each day of the month. If you had five regions, you'd end up with 5*31 stream buckets. The choices are limitless.
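The region-plus-day example can be sketched as a key function; each distinct key corresponds to one stream bucket (the region names here are made up):

```python
from datetime import datetime


def bucket(event):
    # One stream per (region, day-of-timestamp) pair.
    return (event['region'], event['timestamp'].day)


events = [
    {'region': 'DFW', 'timestamp': datetime(2014, 3, 1, 10, 0)},
    {'region': 'DFW', 'timestamp': datetime(2014, 3, 1, 23, 59)},
    {'region': 'ORD', 'timestamp': datetime(2014, 3, 1, 10, 0)},
    {'region': 'DFW', 'timestamp': datetime(2014, 3, 2, 0, 1)},
]
keys = {bucket(event) for event in events}
print(sorted(keys))  # three buckets: DFW day 1, DFW day 2, ORD day 1
```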

At some point you have to do something with the data in your buckets. This is what the fire criteria define. You can make time-based firing criteria (such as 2 hours past the last collected event) or trait-based criteria (such as "when you see the 'foo' event"). Wildcards are permitted in matching criteria. Time-based firings are defined with the "expiration" setting. There is a simple grammar for defining how much time has to elapse for an expiry to occur. We will go into detail on this later. For real-time stream processing, it's best to keep these expiries short or stick with trait-based firing criteria. Expiries = lag.
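As a sketch of the "$last + 1h" arithmetic: the stream expires at the timestamp of its most recent event plus the configured offset. The real expiration grammar is richer than this; the function below only illustrates the idea.

```python
from datetime import datetime, timedelta


def expiry(last_event_time, offset=timedelta(hours=1)):
    # "$last + 1h": one hour after the stream's most recent event.
    return last_event_time + offset


last = datetime(2014, 3, 1, 12, 0, 0)   # timestamp of last collected event
now = datetime(2014, 3, 1, 13, 30, 0)   # current time in the worker loop
print(now >= expiry(last))              # True: the stream has expired
```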

Finally, we define the pipelines that will process the streams when they fire or expire. Pipelines are sets of pipeline handlers that do the processing. A pipeline handler is called with all the events in that stream, in the temporal order they were generated. A pipeline handler does not need to concern itself with querying the database; it has all that it needs. Out of the box, StackTach.v3 comes with a collection of pipeline handlers for computing OpenStack usage for billing as well as re-publishing new notifications back into the queue. More are constantly being added, and writing your own pipeline handlers is trivial. But more on that later.

You can define different pipelines for streams that fire and streams that expire. In the trigger definition file you simply give the name of the pipeline. Your winchester config file points to the pipeline configuration file that lists the pipeline handlers to run.

Sample pipeline definition file

my_fire_pipeline:
    - logger
    - usage
    - name: notabene
      config:
          host: localhost
          user: guest
          password: guest
          exchange_type: topic
          env_keys:
              - usage_notifications

my_expire_pipeline:
    - logger
    - usage

You can see the name of each of the pipeline handlers here. These will be called in the order they appear. You'll note that the notabene handler takes a set of parameters. These are passed into the handler when the pipeline is being processed. Back in the winchester configuration file you'll see where these handler names are mapped to the Python code that implements them. You can add your own handlers there.

Stream States

During pipeline processing, each handler is called to process the events in the stream. A handler has three methods: handle_events(), commit() and rollback(). The handle_events() method is called for each handler in the order they're defined. If they all succeed, the commit() method of each handler is called; otherwise, the rollback() method of each handler is called. No irreversible work should be performed in handle_events(): pre-compute and stage the data there, but don't act on it until the commit() call. In the case of errors, the handle_events() method could be called many times, so, to ensure at-most-once behaviour, non-reversible operations such as publishing new notifications, emitting metrics and sending emails should be reserved for commit(). rollback() is a last chance for you to unwind any work you may have performed.
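The lifecycle can be sketched as follows. The driver loop and the NotifierHandler are simplified stand-ins, not winchester's actual pipeline worker; only the method names come from the text above.

```python
class NotifierHandler(object):
    """Stages notifications in handle_events(); only sends them in commit()."""

    def __init__(self):
        self.pending = []
        self.sent = []

    def handle_events(self, events, env):
        # Pre-compute the work, but perform no irreversible actions here:
        # this method may be retried on pipeline errors.
        self.pending = [e['event_type'] + '.verified' for e in events]
        return events

    def commit(self):
        # Non-reversible work (publish, email, metrics) happens here.
        self.sent.extend(self.pending)

    def rollback(self):
        # Last chance to unwind any staged work.
        self.pending = []


def process(stream_events, handlers):
    """Run handle_events() on every handler, then commit all or roll back all."""
    try:
        for handler in handlers:
            stream_events = handler.handle_events(stream_events, env={})
    except Exception:
        for handler in handlers:
            handler.rollback()
        return False
    for handler in handlers:
        handler.commit()
    return True


handler = NotifierHandler()
ok = process([{'event_type': 'compute.instance.exists'}], [handler])
print(ok, handler.sent)
```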

Stream debugging

As you've seen, there are a lot of moving parts to high-volume stream processing. Dealing with thousands of events per second and monitoring many stream definitions is hard. We have added some special debugging tools to StackTach.v3 to help with this. There are better ways of debugging streams than turning your log output to DEBUG and sifting through the volumes of output. We recommend you keep your winchester log levels at INFO and turn on debugging on a per-stream basis. This is done in your trigger definition file.

- name: test_trigger
  debug_level: 2
  distinguished_by:
    - instance_id
    - timestamp: "day"

The debug_level setting will enable special logging output for this stream only. The higher the number, the more detail (1 or 2 currently). Once enabled, and the yagi-event and pipeline-workers have been restarted, you will start to see log output like this:

yagi.stats[INFO line: 44] messages_sent:100|c
[INFO line: 193] Update timer elapsed: 11 seconds
[INFO line: 198]      Sent 1100 messages from
[INFO line: 200]      Sent 1100 total messages
[INFO line: 203]      Messages per second: 100.000000
winchester.trigger_manager[INFO line: 195] Received 116 notifications. Saved 103 events.
winchester.debugging[INFO line: 155] ---- Trigger Definition: test_trigger ----
winchester.debugging[INFO line: 140] Fire Criteria: 397 checks, 2 passed
winchester.debugging[INFO line: 144]  - Not in timerange = 1
winchester.debugging[INFO line: 144]  - Wrong event type = 295
winchester.debugging[INFO line: 144]  - Not enough matching criteria = 99
winchester.debugging[INFO line: 140] Match Criteria: 207 checks, 200 passed
winchester.debugging[INFO line: 144]  - No matching criteria = 1
winchester.debugging[INFO line: 144]  - not 'instance_id' = 2
winchester.debugging[INFO line: 144]  - Wrong event type = 4
winchester.debugging[INFO line: 148] Counter "New stream" = 58
winchester.debugging[INFO line: 148] Counter "Added events" = 100
winchester.debugging[INFO line: 148] Counter "Ready to fire" = 1
winchester.debugging[INFO line: 161] ----------------------------

The first lines are from yagi itself and tell you how many events were digested over the last N seconds. Assuming this number is greater than zero, we can move on to stream debugging.

The winchester.debugging lines tell you how the fire and match criteria are progressing. In this case, 397 firing criteria checks were made and only 2 passed. If your debug level is 2, you will get a breakdown of the reasons the checks failed. You can use this information to review your trigger definitions and see if something could be wrong. Additionally, the matching criteria results are detailed. In this case we see that, of 207 events checked, 200 were acceptable; the details on the 7 rejected are listed below it. Finally, some "counters" are supplied on the stream processing in general: 58 new streams were created on this pass, 100 new events were added to various "test_trigger" streams, and 1 stream is ready to fire.

By selectively turning on per-stream debugging, you can quickly find processing problems and ignore a lot of log noise.

The debugging counters and messages you may see include:

  • Pre-commit successful: the handle_events() call was successful for this pipeline. +1 per pipeline, not per handler.
  • Pipeline error: a handle_events() call failed. Details in the log files.
  • Commit successful: the commit() calls for the pipeline succeeded. +1 per handler.
  • Commit error: a commit() call for the pipeline failed. +1 per handler.
  • Rollback successful: the rollback() calls for the pipeline succeeded. +1 per handler.
  • Rollback failed: a rollback() call for the pipeline failed. +1 per handler.
  • Locked: attempted to fire/expire a stream that was locked in the db.
  • Unknown trigger def '[trigger]': could not find the trigger definition in order to fire/expire this stream.
  • Unknown pipeline '[pipeline]': could not find the fire/expire pipeline for this stream.
  • No fire pipeline for '[stream name]': a fire pipeline was not defined for this stream.
  • No expire pipeline for '[stream name]': an expire pipeline was not defined for this stream.
  • Streams fired: the number of streams fired during this cycle.
  • Streams expired: the number of streams expired during this cycle.
  • New stream: new streams created for this trigger definition during this cycle.
  • Ready to fire: number of streams for this trigger definition ready to fire this cycle.
  • Added events: number of events added to this trigger definition during this cycle.
  • Match failed: not '[trait name]': the event did not match this trigger def because it was missing a needed distinguishing trait.
  • Match: Passed: some criteria matched for adding this event to a stream.
  • Match: No matching criteria: no criteria matched to add this event to a stream.
  • Fire: Not enough matching criteria: the number of fire criteria matches < criteria number (default 1).
  • Fire: Passed: enough criteria matched for this stream to fire.
  • Match/Fire: not [trait name]: criteria match failed because the named trait is not present in the event.
  • Match/Fire: == failed: criteria match failed because the named trait value != expected.
  • Match/Fire: > failed: criteria match failed because the named trait value <= expected.
  • Match/Fire: < failed: criteria match failed because the named trait value >= expected.
  • Match/Fire: Criterion match() fall-thru: criteria match failed because of an undefined comparison operation.
  • Match/Fire: Time: not '[trait name]': time criteria match failed because the trait is not present in the event.
  • Match/Fire: Time: no referenced trait: time criteria match failed because the time expression is bad.
  • Match/Fire: Time: not in timerange: time criteria match failed because the time value was not in the required range.
  • Match/Fire: Wrong event type: criteria match failed because the event is the wrong type.
  • Match/Fire: Time: No timestamp trait: criteria match failed because the referenced timestamp trait doesn't exist.
  • Match/Fire: Time: Not time yet: criteria match failed because 'timestamp' is not in the defined time ranges.

Winchester Pipeline Handlers

Winchester comes with a set of stock pipeline handlers for the most popular OpenStack operations.

The UsageHandler

The UsageHandler is a pipeline handler for determining the daily usage of every instance within an OpenStack Nova deployment. The usage handler is cells-aware, so it can support large deployments.

The usage handler requires a stream per instance per day. It triggers when the compute.instance.exists event is seen. Audit notifications should be enabled within Nova. See the samples for an example of a usage stream definition.

Once triggered, the usage handler will compare the daily transactional events for every instance against the various .exists records for that instance. If nothing happens to an instance within that 24-hour period, an end-of-day .exists notification is sent from Nova. Nova operations that change the launched_at date for an instance will issue additional .exists records. These include create, delete, resize and rebuild operations. If the transactional events for the instance match the values in the .exists event, a compute.instance.exists.verified notification is created, otherwise a compute.instance.exists.failed and/or compute.instance.exists.warnings notifications are created. When coupled with the NotabeneHandler, these new notifications can be republished to the queue for subsequent processing.
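A stripped-down sketch of one such verification, the "conflicting trait values" check (error code U2 in the table below), assuming string launched_at traits; the real UsageHandler checks many more traits and time ranges:

```python
def verify_exists(exists_event, transactional_events):
    """Return the notification type to emit and an optional error string."""
    for event in transactional_events:
        if 'launched_at' in event and \
                event['launched_at'] != exists_event.get('launched_at'):
            # A transactional event disagrees with the .exists record.
            return ('compute.instance.exists.failed',
                    "U2: Conflicting 'launched_at' values")
    return ('compute.instance.exists.verified', None)


exists = {'event_type': 'compute.instance.exists',
          'launched_at': '2012-05-08 20:23:47'}
matching = [{'event_type': 'compute.instance.create.end',
             'launched_at': '2012-05-08 20:23:47'}]
conflicting = [{'event_type': 'compute.instance.rebuild.end',
                'launched_at': '2012-05-09 00:00:00'}]
print(verify_exists(exists, matching)[0])
print(verify_exists(exists, conflicting)[0])
```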

The schemas of these new notifications are as follows.

A compute.instance.exists.verified notification:

    {
      'event_type': human readable name of event (eg: foo.blah.zoo)
      'message_id': unique message id (uuid)
      'timestamp': datetime this notification was generated at source
      'stream_id': stream id
      'original_message_id': message_id of .exists event
      'payload': {
        'audit_period_beginning': start datetime of audit period
        'audit_period_ending': ending datetime of audit period
        'launched_at': datetime this instance was launched
        'deleted_at': datetime this instance was deleted
        'instance_id': instance uuid
        'tenant_id': tenant id
        'display_name': instance display name
        'instance_type': instance flavor type description
        'instance_flavor_id': instance flavor type id
        'state': instance vm power state
        'state_description': human readable instance vm power state
        'bandwidth': {
          'public': {
            'bw_in': incoming bandwidth
            'bw_out': outgoing bandwidth
          }
        }
        'image_meta': {
          'org.openstack__1__architecture': image architecture
          'org.openstack__1__os_version': image version
          'org.openstack__1__os_distro': image distribution
          'org.rackspace__1__options': service provider specific (opt)
        }
      }
    }

A compute.instance.exists.failed notification adds error details; its payload is identical to the .verified payload above:

    {
      'event_type': human readable name of event (eg: foo.blah.zoo)
      'message_id': unique message id (uuid)
      'timestamp': datetime this notification was generated at source
      'stream_id': stream id
      'original_message_id': message_id of .exists event
      'error': human readable explanation for verification failure
      'error_code': numeric error code (see below)
      'payload': { ... same as compute.instance.exists.verified ... }
    }

Tests currently performed by the UsageHandler include:

  • U1: ".exists has no launched_at value." We received a .exists event that has no launched_at value set.
  • U2: "Conflicting '[trait]' values ('value1' != 'value2')." A trait in the .exists record does not match the value of the related transactional event.
  • U3: ".exists state not 'deleted' but .exists deleted_at is set." Nova says the instance isn't deleted, but the deleted_at trait is set.
  • U4: ".exists deleted_at less than .exists launched_at." The deleted_at trait is earlier than when the instance was launched.
  • U5: ".exists deleted_at in audit period, but no matching .deleted event found." The deleted_at trait falls within the last 24hrs, but we didn't receive any .deleted events in that time frame.
  • U6: ".deleted events found but .exists has no deleted_at value." We received transactional .deleted events, but the deleted_at trait in the .exists event is not defined.
  • U7: "Multiple .delete.end events." We should only get one compute.instance.delete.end event.
  • U8: ".exists launched_at in audit period, but no related events found." We received a .exists event that has the launched_at trait within the last 24hrs, but there were no transactional events in that time frame.
Warnings are reported with a compute.instance.exists.warnings notification:

    {
      'event_type': human readable name of event (eg: foo.blah.zoo)
      'message_id': unique message id (uuid)
      'timestamp': datetime this notification was generated at source
      'instance_id': instance uuid
      'stream_id': stream id
      'warnings': [list of human readable warning messages]
    }

The NotabeneHandler

The NotabeneHandler takes any new notifications (not events) it finds in the pipeline environment and publishes them to the specified RabbitMQ exchange. The handler looks for configured key/values in the pipeline environment (which is passed into each handler on the handle_events() call).

In your pipeline definition, you can set the configuration for the NotabeneHandler as shown below. Note how the environment variable keys are defined by the env_keys value. This can be a list of keys. Any new notifications this handler finds in those variables will get published to the RabbitMQ exchange specified in the rest of the configuration. The queue_name is also critical so we know which topic to publish to. In OpenStack, the routing key is the queue name. The notabene handler does connection pooling to the various queues, so specifying many different servers is not expensive.

Because these environment keys have to be set before the notabene handler is called, it has to be one of the last handlers in the pipeline. The UsageHandler adds new notifications to the usage_notifications key. If the notabene handler is not part of the pipeline, these new notifications are dropped when the pipeline is finished.
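The hand-off through the pipeline environment can be sketched like this. Both handlers are simplified stand-ins, and a plain list stands in for the RabbitMQ publish:

```python
published = []


def usage_handler(events, env):
    # An upstream handler stores its new notifications under an agreed key.
    env.setdefault('usage_notifications', []).append(
        {'event_type': 'compute.instance.exists.verified'})


def notabene_handler(events, env, env_keys=('usage_notifications',)):
    # A notabene-style handler publishes whatever it finds under env_keys.
    # It must run after the handlers that populate those keys.
    for key in env_keys:
        for notification in env.get(key, []):
            published.append(notification)  # stands in for a RabbitMQ publish


env = {}
usage_handler([], env)
notabene_handler([], env)
print(len(published))
```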

my_fire_pipeline:
    - logger
    - usage
    - name: notabene
      config:
          host: localhost
          user: guest
          password: guest
          port: 5672
          vhost: /
          library: librabbitmq
          exchange: nova
          exchange_type: topic
          env_keys:
              - usage_notifications