Through the needle’s eye: Data science in production

Data science is often associated with quick-and-dirty data analysis and machine learning solutions that do not follow software engineering practices. Many regard data science as a Swiss army knife that combines incompatible data and software components in impromptu ways.

A data science project usually starts with web scraping or reading data from data lakes. The collected data need to be formatted and pre-processed with multiple tools (command-line utilities, shell scripts, Python scripts, data cleaners etc.), and then loaded into an interactive environment (be it a notebook or an IDE) for exploration. After trying out several strategies for data analysis, a data scientist presents her insights as graphs or communicates the results to management on paper or with slideshows. A successful data model can then be handed over to backend engineers who re-implement it in production. The process is messy, but it does not have to be like that!

These days most software startups use an agile development strategy, i.e. they release early and release often. Therefore, there is little time to move through the standard process of designing data algorithms. The new approaches developed by a data scientist should be deployed directly into production. To some backend engineers this might sound like a recipe for disaster, but it is not necessarily so, as long as we, data scientists, adhere to a few simple practices.

These days most software startups use an agile development strategy, i.e. they release early and release often. Therefore, there is little time to move through the standard process of designing data algorithms.

Make code not words

The question of programming languages evokes extreme emotions in most developers. Data scientists tend to choose languages that offer short development cycles (no compilation, dynamic typing etc.) and high-quality data munging libraries. Software engineers, on the other hand, prefer languages that offer reliability, performance and reproducible build tools, and that provide high-quality web frameworks.

These requirements are not necessarily exclusive, and you may find a few languages that fulfill the needs of data scientists and engineers alike. Personally, I would choose Python, but there are other popular choices (for example, Scala). Therefore, I would recommend that aspiring data scientists invest their time in learning one of these languages together with a versatile editor (like Atom), even if it means leaving the comfort zone of data-aware IDEs and interactive notebooks (unfortunately, executing Jupyter notebooks in production is still an open problem).

That said, a serverless approach such as AWS Lambda offers an alternative to a one-language-to-rule-them-all strategy. By providing encapsulated environments, lambdas stay language-agnostic, provide a coherent interface for data interchange and can easily be combined with functions implemented in other languages. Although very tempting, such microservice architectures remain challenging in terms of maintainability, and monolithic single-language architectures are often preferred.

I would recommend that aspiring data scientists invest their time in learning one of these languages together with a versatile editor (like Atom), even if it means leaving the comfort zone of data-aware IDEs and interactive notebooks.

In the world of coding styles

No matter what language you choose, you absolutely must adhere to common coding conventions. These conventions specify how classes, functions and variables should be named (lowercase, UPPERCASE or camelCase, with or without underscores etc.), how to indent code blocks, and where and in which order to include headers and import external libraries. If you don’t know these conventions, ask your colleagues; they will most likely be able to point you to the coding style guidelines (such as PEP 8 for Python devs) or even give you configuration files for automatic checkers called linters.

Linters (such as pylint, pep8 etc.) are usually integrated into your editor (or relevant plugins can be installed separately), and they go through your code as you type (or when you save a file), highlighting any departures from the coding conventions.
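As a toy illustration (the function names here are made up), the first definition below breaks several PEP 8 conventions that a linter such as pylint or flake8 would flag, while the second follows them:

# Flagged by a PEP8-aware linter: camelCase function name, ambiguous argument name 'l'.
def ComputeMeanValue(l):
    return sum(l)/len(l)


# Conventional style: snake_case name, descriptive argument, spaces around operators.
def compute_mean_value(values):
    return sum(values) / len(values)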

Adherence to coding conventions improves the readability and maintainability of the code. Your future self will also thank your present self for keeping the code clean and consistent.

Code peer review – on the shoulders of giants

While working on data projects, we often neglect tracking changes in the code. I don’t mean the old-fashioned but widely practiced copying of files to folders with version names, but rather using a dedicated version control system. Failure to use such a system can cause severe problems with the reproducibility of your analysis workflow (for example, you might not be able to recover the parameters that gave your best prediction results). A good version control system such as Git circumvents these problems. Although there are several others, Git is the industry standard, and it also powers the popular social platform for coders called GitHub.

GitHub is a platform for collaborative code editing (it is not limited to computer code and can just as easily be used for documents, 3D models, graphics etc.). One of its killer features is the so-called pull request (PR) model, which allows any developer to propose changes to a project she does not own. These changes can later be reviewed by other contributors, who either suggest corrections or accept the PR and merge it into the main (master) branch 🎉🎉🎉. This is a common model for the development of many open-source libraries (Git itself was originally created for Linux kernel development) and it is also widely adopted in the tech industry.

The essential step in the process is the code peer review, i.e. an independent check of the code by another developer. Such reviews have many advantages. Most importantly, they allow you to catch bugs early. Secondly, they enforce the coding conventions. Lastly, they are a form of peer learning that makes developers learn from each other.

[Code peer review] is a form of peer learning that makes developers learn from each other.

Therefore, next time make sure to ask a developer who did not contribute to your patch (such as a backend engineer) to review your code. Also, when you are asked to do the same, never refuse (even when you think that the code is outside your area of expertise) and be diligent about your review!

Test-driven development is not an oxymoron

The term “test-driven development”, or TDD, might ring a bell. You might have heard that some people write tests even before implementing the functionality. This is not a legend.

Although you don’t have to go all-in on TDD, testing, and in particular unit testing, drastically improves code quality even if the tests are written after the function. You might think it a waste of time, but believe me, it will save you more time than any autocompletion feature or new array computing library. The “unit” part of unit testing means that you test a single feature of your code in isolation from all the other functions. In practice, you check the outputs of a function for sample arguments; these outputs might be a return value, data stored in a database, or a call to another function. Mocking libraries come in handy here: they let you replace some functions with “mock-ups” whose calls can easily be inspected and compared to the expected ones. Writing good test cases might be an art in itself, but I think you already practice it when you test your data algorithms or machine learning models with minimal, artificial inputs. But don’t forget the corner cases!

You might think it [unit testing] a waste of time, but believe me, it will save you more time than any autocompletion feature or new array computing library.
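As a minimal sketch (the function, its arguments and the mocked objects are all hypothetical), a unit test using pytest and the standard library’s unittest.mock could look like this:

from unittest import mock

# Hypothetical function under test: scores a user and stores the result.
def score_and_store(user_features, model, db):
    score = model.predict(user_features)
    db.save_score(user_features["user_id"], score)
    return score

def test_score_and_store_saves_prediction():
    # Replace the model and the database with mocks we can inspect.
    model = mock.Mock()
    model.predict.return_value = 0.8
    db = mock.Mock()

    result = score_and_store({"user_id": 42, "age": 31}, model, db)

    assert result == 0.8
    model.predict.assert_called_once_with({"user_id": 42, "age": 31})
    db.save_score.assert_called_once_with(42, 0.8)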

Continuous integration is better than free lunch

Continuous integration is a system that runs the whole test suite on each commit, usually on a dedicated server in a clean environment. It therefore allows you to detect bugs introduced by your change (called regressions) before they are pushed to production. Continuous integration is not a way to catch you red-handed, but rather to release you from worries about releasing untested code. If your code coverage is close to 100% (it is an asymptote, so don’t ever hope to reach it, and you should not trust code coverage blindly), you can be almost sure that your change will not break the application.

Continuous integration is not a way to catch you red-handed, but rather to release you from worries about releasing untested code.

Personally, I treat unit tests as a piece of documentation – they document the protocol under which the software components should interact and under what conditions they may be run. Continuous integration is a way to enforce the protocol in new commits before they are merged.

If some bugs still manage to slip through the narrow filter of unit testing and continuous integration, make sure to reproduce them in a well-tailored test case before fixing them, so you can catch them early next time.
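For example, if a crash in production turned out to be triggered by an empty input, a small regression test (hypothetical function and test names) pins the fix down so the bug cannot silently return:

def count_requests_per_ip(requests):
    """Hypothetical helper that once crashed when `requests` was empty."""
    counts = {}
    for request in requests:
        counts[request["ip"]] = counts.get(request["ip"], 0) + 1
    return counts


def test_count_requests_per_ip_handles_empty_input():
    # Regression test: an empty request list must not raise and yields no counts.
    assert count_requests_per_ip([]) == {}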

Continuous integration systems are tightly coupled with build systems, which compile and package your code for distribution and sometimes even deploy it to production. Build systems create reproducible environments by obtaining the code and its dependencies in well-defined versions from reliable sources. Any problem in production can then be traced back to a particular version of the code or its dependencies. These days this is usually achieved by distributing the code as Docker containers.

Quality comes first

Yes, it’s still about testing. Quality is of paramount importance for all systems running in production. If you deploy a machine learning model to production, you must make sure that it will handle its peak traffic, that it will not introduce security vulnerabilities or data leaks, and that it will run under all conditions and with all kinds of possible inputs (especially if its inputs are exposed to the external world through a public API). Even the best programmer, the best data scientist and the best continuous integration system will not ensure the total reliability of a data algorithm or any other code. Therefore, testing and monitoring production systems are essential. A few important approaches can be listed:

  • dogfooding – using your own product and checking all possible corner cases is vital. Organize frequent dogfooding sessions and invite people from outside your team.
  • error handling – make sure that you have a system in place that will catch any unhandled exceptions raised in production and report them back to you (Sentry is a popular and proven solution; see the sketch after this list).
  • stress testing – try to test your application under realistic traffic (this could be simulated traffic or a replay of historical data).
  • performance monitoring – monitor all API calls, compute resources and database operations, and identify bottlenecks quickly.
  • system monitoring – this is mostly for dev ops, but you should be aware of any potential points of failure in the system (database crashes, compute node overload, memory leaks, hardware failures).
  • security monitoring – although this comes last, it should probably be the first thing to think about. If you handle sensitive data (even as trivial as usernames), you must know that there are ways bad actors can retrieve it without your permission and use it to target your users. Therefore, make sure to run live protection over your application. There are myriad solutions, but my favourite is obviously Sqreen.
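As a minimal sketch of the error-handling point above (assuming the sentry_sdk package and a placeholder DSN; run_model is a hypothetical stand-in for your prediction code):

import sentry_sdk

# Initialize once at application start-up; the DSN below is a placeholder.
sentry_sdk.init(dsn="https://examplePublicKey@o0.ingest.sentry.io/0")

def run_model(payload):
    # Hypothetical model call that can fail on malformed input.
    return 1.0 / payload["weight"]

def predict(payload):
    try:
        return run_model(payload)
    except Exception as exc:
        # Report the exception with its stack trace, then degrade gracefully.
        sentry_sdk.capture_exception(exc)
        return None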

Even the best programmer, the best data scientist and the best continuous integration system will not ensure the total reliability of a data algorithm or any other code. Therefore, testing and monitoring production systems are essential.

Communication is the new oil

No matter whether you are already familiar with all the points above and have run thousands of systems in production, you should always communicate with your colleagues and clients. Make sure that they know about the new data algorithm that you released today or are planning to release (and try to explain in simple terms how it works). Be the first to catch a potential vulnerability in the code and communicate it to the security owners (if your company does not have a dedicated security team, talk to your CTO or senior engineers; don’t discuss it in public). If you realise that your recent release may contain a loophole or introduce a memory leak, don’t wait to report it to your colleagues. Make sure to explain the magnitude and possible consequences of a data leak in your application to the team, clients and users. And most importantly, remember that everyone needs to learn and everybody makes mistakes; admitting it to yourself and others leads you towards mastery.

[…] everyone needs to learn and everybody makes mistakes; admitting it to yourself and others leads you towards mastery.

Acknowledgment and disclaimer

This advice comes from my experience as a software instructor at Carpentries bootcamps and Advanced Python Programming schools, from my contributions to open-source software (such as NumPy, matplotlib or, recently, pandas), but also from my recent adventure as a senior data scientist / backend engineer at Sqreen! Nonetheless, the content of this post reflects solely my views and was not endorsed or edited by other members of these communities.

If you have any feedback on the blog post or find an error, please contact me ASAP!

Streaming data with Amazon Kinesis

I wrote this blog post while working at Sqreen, a startup that develops Software-as-a-Service (SaaS) solutions to protect web applications from cyber attacks. This post summarizes the streaming technology used to analyse attacks in real time.

Introduction

At Sqreen we use the Amazon Kinesis service to process data from our agents in near real time. This kind of processing has recently become popular with the appearance of general-purpose platforms that support it (such as Apache Kafka). Since these platforms deal with a stream of data, such processing is commonly called “stream processing”. It is a departure from the old model of analytics that runs the analysis in batches (hence the name “batch processing”) rather than online. The main differences between the two approaches are:

  • stream processing deals with data that are punctual in time, i.e. with events generated at specific points in time, whereas batch processing is applied to data batches representing larger slices of time (for example, data stored in databases),
  • stream processing analyses data online, i.e. most often almost immediately after it arrives, whereas batch processing waits for the data collection to finish (that moment can be defined arbitrarily, for example, the end of the day) and analyses it off-line,
  • data analysed by stream processing is unbounded, i.e. it does not have a specific end, whereas batches are bounded, i.e. they have a well-defined window.

Streams as distributed logs

Platforms such as Apache Kafka provide streams that receive data from event sources (producers) and pass them down to consumers, which in turn can forward them to other streams. In essence, streams are similar to message queues, but they support multiple consumers that process the same messages in parallel (as in the publish-subscribe messaging model) and they keep old messages even after they have been delivered to the consumers. They are a kind of append-only event log (Figure 1). Logs are most commonly associated with the flat files sitting in the /var/log directory, meant to be read by a human. Streams are different: they are logs optimized for storing and provisioning binary data (which could be text, but also fragments of images, sensor readings, etc.). This log-like design allows new consumers to be added or removed at any point without any impact on the remaining consumers. Consumers can also start reading from the stream at any offset (any message in the past).


Figure 1 A sketch of a stream. New events are appended at the left of the stream-log and are consumed by the consumers from right to left starting with any offset.
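To make the analogy concrete, here is a toy in-memory sketch of such a log (an illustration only, not how Kafka or Kinesis are implemented): producers can only append, and each consumer tracks its own read offset.

class ToyStream:
    """A toy append-only log: producers append, consumers read from an offset."""

    def __init__(self):
        self._events = []

    def append(self, event):
        self._events.append(event)

    def read_from(self, offset):
        """Return all events from the given offset onwards."""
        return self._events[offset:]

stream = ToyStream()
stream.append({"ip": "8.8.8.8", "ts": "2018-09-06T08:04:27"})
stream.append({"ip": "8.8.8.9", "ts": "2018-09-06T08:04:30"})

# Two independent consumers with their own offsets see the same events.
print(stream.read_from(0))  # both events
print(stream.read_from(1))  # only the second event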

When events arrive at high frequency, a single machine may not keep up with processing them. In this case, both streams and their consumers can be distributed by partitioning the source events (Figure 2). Such partitioning is done on a key that is simply part of the logged messages.


Figure 2 Events emitted from the source (producer) are forwarded to the stream. In this case, the stream is distributed into two shards: an event is sent only to a single shard depending on the partition key that is part of the message (here the IP address). Messages from each shard are handled independently by different consumers.
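The routing sketched in Figure 2 can be pictured as hashing the partition key onto one of the shards. The snippet below is a simplification; Kinesis actually maps an MD5 hash of the key onto per-shard hash-key ranges.

import hashlib

def shard_for(partition_key, num_shards):
    """Simplified routing: hash the key and map it onto one of the shards."""
    digest = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards

# All messages with the same key land on the same shard.
print(shard_for("8.8.8.8", 2))
print(shard_for("8.8.8.9", 2))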

Streaming applications

Streams have found applications in many problems. They are commonly used for real-time data analytics (such as streams of tweets), for replicating databases (for both performance and reliability reasons), for real-time monitoring and detection of special events (such as fraud detection), and for building data-intensive systems that require different representations of the same data (for example, databases for operations, indexes for fast queries, and data warehouses for running batch analyses).

Amazon Kinesis Data Streams (which we will simply call Kinesis) is a managed service that provides a streaming platform. It includes solutions for stream storage and an API to implement producers and consumers. Amazon charges per hour for each stream partition (called a shard in Kinesis) and per volume of data flowing through the stream.

Goal

The goal of this tutorial is to familiarize you with stream processing using Amazon Kinesis. In particular, we will implement a simple producer-stream-consumer pipeline that counts the number of requests in consecutive, one-minute-long time windows. We will apply this pipeline to simulated data, but it could easily be extended to work with real websites. This is precisely one of the applications we use Kinesis for at Sqreen (more about it below).

We will demonstrate stream processing using the Jupyter notebook. You can download the notebook from here and execute it on your computer (for instructions, see Jupyter documentation). Alternatively, you can copy-paste the code examples directly to your Python interpreter.

Requirements

To install the dependencies, run the following command at the command line (i.e. in the shell):

$ pip install awscli boto python-dateutil

Configure AWS credentials

To connect to AWS, you must first create your credentials (you will get them from the AWS Console). Then, simply configure them using the following command:

$ aws configure --profile blogpost-kinesis

blogpost-kinesis is the name of the profile you will use for this tutorial. When prompted, copy-paste the access key ID and secret access key obtained from the AWS Management Console. For instructions, check the relevant section of the AWS User Guide.

Creating a stream

Let’s create our first stream. You can do it either using the AWS Console or the API; we will use the second approach. First, we need to define the name of the stream, the region in which to create it, and the profile to use for our AWS credentials (you can set aws_profile to None if you use the default profile).

stream_name = 'blogpost-word-stream'
region = 'eu-west-1'
aws_profile = 'blogpost-kinesis'

Now we can use the boto library to create the stream and wait until it becomes active.

import boto
from boto.kinesis.exceptions import ResourceInUseException
import os
import time

if aws_profile:
    os.environ['AWS_PROFILE'] = aws_profile

# connect to the kinesis
kinesis = boto.kinesis.connect_to_region(region)

def get_status():
    r = kinesis.describe_stream(stream_name)
    description = r.get('StreamDescription')
    status = description.get('StreamStatus')
    return status

def create_stream(stream_name):
    try:
        # create the stream with a single shard
        kinesis.create_stream(stream_name, 1)
        print('stream {} created in region {}'.format(stream_name, region))
    except ResourceInUseException:
        print('stream {} already exists in region {}'.format(stream_name, region))

    # wait for the stream to become active
    while get_status() != 'ACTIVE':
        time.sleep(1)
    print('stream {} is active'.format(stream_name))

Running the code generates the following output:

create_stream(stream_name)

stream blogpost-word-stream created in region eu-west-1
stream blogpost-word-stream is active

Putting data into streams

To have an operational stream processing chain, we need a source of messages (a producer in AWS terminology) and a receiver (a consumer) that will obtain and process the messages. We will first define the producer.

import datetime
import time
import threading
from boto.kinesis.exceptions import ResourceNotFoundException

class KinesisProducer(threading.Thread):
    """Producer class for AWS Kinesis streams

    This class will emit records with the IP addresses as partition key and
    the emission timestamps as data"""

    def __init__(self, stream_name, sleep_interval=None, ip_addr='8.8.8.8'):
        self.stream_name = stream_name
        self.sleep_interval = sleep_interval
        self.ip_addr = ip_addr
        super().__init__()

    def put_record(self):
        """put a single record to the stream"""
        timestamp = datetime.datetime.utcnow()
        part_key = self.ip_addr
        data = timestamp.isoformat()

        kinesis.put_record(self.stream_name, data, part_key)

    def run_continuously(self):
        """put a record at regular intervals"""
        while True:
            self.put_record()
            time.sleep(self.sleep_interval)

    def run(self):
        """run the producer"""
        try:
            if self.sleep_interval:
                self.run_continuously()
            else:
                self.put_record()
        except ResourceNotFoundException:
            print('stream {} not found. Exiting'.format(self.stream_name))

Note that we used the IP address as the partition key and the timestamp as the data. In Kinesis, you are almost completely free to choose whatever you want for the data, as long as it can be serialised to a binary format and stays within the record size limit. If you need to emit larger data, you can split it into several messages. The partition key must be a string shorter than 256 characters; it is used to determine which shard the data is sent to (Figure 2). All data that should be processed together must use the same partition key, otherwise it may be forwarded to another shard.
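If you want to attach more context than a plain timestamp, you can, for example, serialize a small dictionary to JSON before putting it on the stream. The sketch below reuses the kinesis connection and the put_record call from the producer above; the field names are made up.

import json
import datetime

def put_request_event(stream_name, ip_addr, path):
    """Sketch: emit a JSON-encoded record with the IP address as partition key."""
    payload = {
        "timestamp": datetime.datetime.utcnow().isoformat(),
        "path": path,
    }
    data = json.dumps(payload)  # must stay within the record size limit
    kinesis.put_record(stream_name, data, ip_addr)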

Note that we implemented the KinesisProducer as a Python thread, such that it can run in the background and won’t block the Python interpreter. This way we can continue executing Python instructions.

Now we create two producers with different IP addresses and different intervals between consecutive messages.

producer1 = KinesisProducer(stream_name, sleep_interval=2, ip_addr='8.8.8.8')
producer2 = KinesisProducer(stream_name, sleep_interval=5, ip_addr='8.8.8.9')
producer1.start()
producer2.start()

Sqreen’s Security Automation feature allows one to monitor traffic on a website and set conditions under which a given client should be blocked (such as trying to read the same page too many times). To implement this feature, we run similar event sources that inform the stream about the IP addresses from which requests are emitted, together with the timestamps of the requests (Figure 3).

Consuming from a stream

Consumers receive the messages from the stream and process them. Their output can be messages forwarded to another stream, files saved on a filesystem (or Amazon S3 storage), or records stored in a database. Consumers can also keep local state, which makes them uniquely suited to working on a stream of similar data and quickly calculating values from it.

Defining a consumer

First, let’s define a generic consumer, which will consist of a run method polling for new events from the Kinesis stream and a process_records method that will process the event data and produce any side effects (i.e. forwarding the results to another stream or committing them to a database). The process_records method is not implemented in this generic base class; it needs to be implemented in the sub-classes (see below).

from boto.kinesis.exceptions import ProvisionedThroughputExceededException
import datetime

class KinesisConsumer:
    """Generic Consumer for Amazon Kinesis Streams"""
    def __init__(self, stream_name, shard_id, iterator_type,
                 worker_time=30, sleep_interval=0.5):

        self.stream_name = stream_name
        self.shard_id = str(shard_id)
        self.iterator_type = iterator_type
        self.worker_time = worker_time
        self.sleep_interval = sleep_interval

    def process_records(self, records):
        """the main logic of the Consumer that needs to be implemented"""
        raise NotImplementedError

    @staticmethod
    def iter_records(records):
        for record in records:
            part_key = record['PartitionKey']
            data = record['Data']
            yield part_key, data

    def run(self):
        """poll stream for new records and pass them to process_records method"""
        response = kinesis.get_shard_iterator(self.stream_name,
            self.shard_id, self.iterator_type)

        next_iterator = response['ShardIterator']

        start = datetime.datetime.now()
        finish = start + datetime.timedelta(seconds=self.worker_time)

        while finish > datetime.datetime.now():
            try:
                response = kinesis.get_records(next_iterator, limit=25)

                records = response['Records']

                if records:
                    self.process_records(records)

                next_iterator = response['NextShardIterator']
                time.sleep(self.sleep_interval)
            except ProvisionedThroughputExceededException:
                # back off briefly when the shard's read throughput is exceeded
                time.sleep(1)

Implementing the processing logic

Note that each stream can have many consumers, all of which receive all the messages and process them independently. Now we will implement a process_records method that simply prints the received messages to the standard output. We do this by sub-classing KinesisConsumer.

class EchoConsumer(KinesisConsumer):
    """Consumer that echoes received data to standard output"""
    def process_records(self, records):
        """print the partition key and data of each incoming record"""
        for part_key, data in self.iter_records(records):
            print(part_key, ":", data)

We attach the consumer to our stream. To do that, we need to pass the shard ID and the position in the stream at which to start processing messages. For the latter, we can choose between the newest (LATEST) and the oldest (TRIM_HORIZON) record in the stream. Note that the default retention period for messages in Kinesis streams is 24 hours; it can be extended up to 168 hours at an additional cost.

Streams are partitioned into separate “sub-streams” (called shards) that receive messages from the same source. The target shard for each message is determined from the partition key. Each consumer can read from one or more shards, but at least one consumer must be associated with every shard, otherwise some messages will be lost. Since we only use one shard in this example, we can directly pass the default shard ID. If you need to configure more than one shard (to increase the throughput), you will need to query the stream for the IDs of all active shards using the API. For the sake of this tutorial, we will assume that we have only a single shard (this is clearly the case, since we created the stream with a single shard, see the call to kinesis.create_stream above).
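If you do configure several shards, you can list their IDs with the same describe_stream call we used earlier to check the stream status (a sketch based on that response format, ignoring pagination for streams with many shards):

def get_shard_ids(stream_name):
    """Return the IDs of all shards currently in the stream."""
    r = kinesis.describe_stream(stream_name)
    shards = r['StreamDescription']['Shards']
    return [shard['ShardId'] for shard in shards]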

shard_id = 'shardId-000000000000'
iterator_type = 'LATEST'
worker = EchoConsumer(stream_name, shard_id, iterator_type, worker_time=10)

Now, let’s run the consumer and observe the output:

worker.run()

8.8.8.8 : 2018-09-06T08:04:27.796125
8.8.8.8 : 2018-09-06T08:04:29.877330
8.8.8.9 : 2018-09-06T08:04:30.895562
8.8.8.8 : 2018-09-06T08:04:31.963790
8.8.8.8 : 2018-09-06T08:04:34.015333

As expected, the consumer printed all received records with their partition keys (IP addresses) and data (timestamps).

Event aggregation

Finally, we can implement a consumer with some non-trivial logic. The goal of this consumer is to count the number of requests from each particular IP in a specific time window (here one minute). Again, we will subclass KinesisConsumer and re-implement the process_records method. In addition, we will define one extra helper method, print_counters, that simply dumps the current counts to the standard output. In practice, we would forward the outputs of such processing to another stream for further analysis (filtering, detection of atypical events etc.) or store them in a database. This is part of what actually happens in Sqreen’s Security Automation pipeline (see below).

from collections import defaultdict, Counter
from dateutil import parser
from operator import itemgetter

class CounterConsumer(KinesisConsumer):
    """Consumer that counts IP occurences in 1-minute time buckets"""

    def __init__(self, stream_name, shard_id, iterator_type, worker_time):
        self.time_buckets = defaultdict(Counter)
        sleep_interval = 20 # seconds
        super().__init__(stream_name, shard_id, iterator_type, worker_time, sleep_interval)

    def print_counters(self):
        """helper method to show counting results"""

        now = datetime.datetime.utcnow()
        print("##### Last run at {}".format(now))
        for timestamp, ip_counts in self.time_buckets.items():
            # sort counts with respect to the IP address
            ip_counts = sorted(ip_counts.items(), key=itemgetter(0))
            print(timestamp, ':', list(ip_counts))

    def process_records(self, records):
        for ip_addr, timestamp_str in self.iter_records(records):
            timestamp = parser.parse(timestamp_str)
            timestamp = timestamp.replace(second=0, microsecond=0)
            self.time_buckets[timestamp][ip_addr] += 1
        self.print_counters() 

Let’s test the consumer:

worker = CounterConsumer(stream_name, shard_id, iterator_type, worker_time=120)
worker.run()
##### Last run at 2018-09-06 08:04:56.468067
2018-09-06 08:04:00 : [('8.8.8.8', 9), ('8.8.8.9', 4)]
##### Last run at 2018-09-06 08:05:16.563615
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 8), ('8.8.8.9', 3)]
##### Last run at 2018-09-06 08:05:36.670241
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 17), ('8.8.8.9', 7)]
##### Last run at 2018-09-06 08:05:56.775192
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 27), ('8.8.8.9', 11)]
##### Last run at 2018-09-06 08:06:16.881760
2018-09-06 08:04:00 : [('8.8.8.8', 11), ('8.8.8.9', 4)]
2018-09-06 08:05:00 : [('8.8.8.8', 29), ('8.8.8.9', 12)]
2018-09-06 08:06:00 : [('8.8.8.8', 8), ('8.8.8.9', 3)]

The lines prefixed by the hash signs ##### mark the results of a single run of the consumer. Since the consumer is executed each time new events arrive, the lines show the updated state of the time_buckets cache. Each line starts with the timestamp denoting the beginning of the time bucket (the bucket ends at the beginning of the next one, i.e. the windows do not overlap), followed by the list of IP address and count pairs. Every time the consumer runs, the values are updated, so the counts increase. If new requests arrive at a time not covered by any existing bucket, a new bucket is added and counting starts from zero for that bucket. The effect is roughly what we tried to achieve.

How is streaming used at Sqreen?

At Sqreen we use Kinesis streams intensively, especially in the feature called Security Automation. Security Automation is a real-time analytics framework that allows users to control traffic on their servers based on well-defined criteria (called playbooks).


Figure 3 A simplified sketch of Sqreen’s streaming pipeline.

Our pipeline consists of several streams and associated consumers (Figure 3). The events are produced by agents that sit in the web apps of Sqreen users. They contain basic information about the connection (source IP etc.) and any extra details relevant to the business logic of the user’s application. These events are consumed by a consumer that filters them and forwards them to the Log stream. The Detection consumer consumes from the Log stream, applies playbooks, detects anomalies (for example, too many requests from a single IP) and generates a response (for example, notifying the owner of the web app or blocking the IP). In parallel, the messages from the Log stream are consumed by the Counter consumer, which performs an aggregation similar to the one demonstrated in this tutorial. The aggregated data are then stored in a database and exposed in the form of a graph. This approach, in which data is processed in parallel in different ways to obtain different views, is typical for stream processing. Note that the Detection and Counter consumers read from the Log stream at different offsets and do not interfere with each other (for example, if one consumer crashes or builds up a significant backlog, the other consumer is not affected). At Sqreen this design allows us to attach multiple actions to the messages coming from user web apps (IP blocking, notifications, logging, monitoring etc.).

Conclusions

We demonstrated how to use Amazon Kinesis on a request-counting example. Although the example was simplified, it contained the basic components of all stream processors: two producers, a stream (with a single shard) and one consumer. You can easily take this example and adapt it to your needs.

One important limitation of the present CounterConsumer is that it keeps all counting windows in memory and prints them at each run of the consumer. In a real application, we would save only the completed windows to the database and remove them from the time_buckets cache. This is not a trivial problem, because we can never be sure whether some events will arrive late, for example, due to a network delay or a temporary network outage.
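One pragmatic compromise (a sketch only, not what runs in production at Sqreen) is to flush a bucket once it is older than the window length plus a grace period that tolerates moderately late events:

import datetime

def flush_completed_buckets(time_buckets, now,
                            grace_period=datetime.timedelta(minutes=2)):
    """Pop buckets old enough that late events are unlikely to arrive."""
    completed = {}
    for bucket_start in list(time_buckets):
        bucket_end = bucket_start + datetime.timedelta(minutes=1)
        if now - bucket_end > grace_period:
            # Old enough: persist (e.g. to a database) and drop from the cache.
            completed[bucket_start] = time_buckets.pop(bucket_start)
    return completed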

Another extension of CounterConsumer would be to allow for an overlap between the windows. Such an overlap would smooth the counts and make the pipeline more responsive, because the end user would not have to wait for a full window to complete before seeing a new event added to the counts.
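A sketch of such overlapping (sliding) windows: with one-minute windows starting every 20 seconds, a single event is counted in every window that covers its timestamp.

import datetime

def overlapping_buckets(timestamp, window=datetime.timedelta(minutes=1),
                        hop=datetime.timedelta(seconds=20)):
    """Return the start times of all hop-aligned windows covering the timestamp."""
    epoch = datetime.datetime(1970, 1, 1)
    # Align to the most recent hop boundary, then walk back one window length.
    latest_start = epoch + hop * ((timestamp - epoch) // hop)
    starts = []
    start = latest_start
    while start > timestamp - window:
        starts.append(start)
        start -= hop
    return starts

# An event would then be counted as:
#     for start in overlapping_buckets(timestamp):
#         time_buckets[start][ip_addr] += 1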

Last but not least, we did not cover the important topic of spawning new consumers when an existing consumer fails or when we want to increase the number of shards. Similarly, we did not talk about checkpointing, which allows the consumer state to be recovered after a crash. These are non-trivial problems, but they can be handled by the Amazon Kinesis Client Library (KCL), which is based on a Java-based orchestrator called MultiLangDaemon. We will look into running stream consumer processes with the KCL in a follow-up blog post.

Cleaning up

We can delete the stream at the end of the exercise to minimize AWS costs (you will be charged for each stream-hour whether you use the created stream or not).

kinesis.delete_stream(stream_name)

stream blogpost-word-stream not found. Exiting
stream blogpost-word-stream not found. Exiting

The two messages are printed by the producers, which no longer find the stream and have to exit.

Further reading

[1] Jay Kreps, The Log: What every software engineer should know about real-time data’s unifying abstraction, 2013, blog post

[2] Martin Kleppmann, Designing Data-Intensive Applications, O’Reilly Media, 2017

[3] Martin Kleppmann, Making Sense of Stream Processing, O’Reilly Media, 2016, read online

Giving presentations with IPython notebook

The IPython notebook has become a very popular tool for programming short scripts in Python, interactive computing, sharing code, teaching and even demonstrations. Its advantage is the possibility of combining Python code with graphics, HTML, videos and even interactive JavaScript objects in a single notebook. With this functionality it can also serve as a great presentation tool.

Continue reading “Giving presentations with IPython notebook”

6 steps to migrate your scientific scripts to Python 3

Python 3 has been around for some time (the most recent stable version is Python 3.2), but until now it was not widely adopted by the scientific community. One of the reasons was that basic scientific Python libraries such as NumPy and SciPy had not been ported to Python 3. Since this is no longer the case, there is no reason anymore to resist migrating to Python 3 (you can find the pros and cons on the Python website).

In this guide I am going to describe some tips that I learnt while making my scripts compatible with Python 3. There is nothing to be afraid of – the procedure is actually quite easy and very rewarding (it is like a glimpse into the future of Python!).
Continue reading “6 steps to migrate your scientific scripts to Python 3”

Scientific computing with GAE and PiCloud

Google App Engine (GAE) is a great platform for learning web programming and testing out new ideas. It is free and offers great functionality, such as the Channel API (basically WebSockets). Deployment is as easy as clicking a button (on a Mac) or running a Python script (on Linux). Best of all, you can program in Python and offer an easy end-user web interface without time-consuming installation, dependencies and nerves. Continue reading “Scientific computing with GAE and PiCloud”

MNS 2008/09

The Model of Neural Systems programming course will start on Monday, October 27th. It will be given by Robert Schmidt and me. The first programming assignments are available on the course webpage. See you all on Monday!