Long time, no post. Today we talk about distributed real-time systems, logging systems and 'The Cloud'

*dusts off the computer monitor, flicks a few switches and the hum of memory banks energizing can be heard in the background*

Good lord, has it been this long since I wrote a blog post? Wow.

Sorry about that folks.

Anyway, I've been doing a lot of things lately that are sort of game-related, but actually a little more mainstream. And I thought I would share some of it with you guys out there.

So, a little background, for you to get up to speed.

I shipped Dragon Age: Inquisition. I'm immensely proud of what we were able to accomplish on that title. A lot of firsts for me. And an amazing team that I had the luck to work with.

But times change. Family wanted to be back on the west coast. BC, specifically. Apparently once it's in your blood, it's there for life.

Anyway, the fine folks at Demonware (demonware.net) were gracious enough to offer me a position there and boy howdy, has it been a learning experience.

One of the things I'm working on is real-time event ingestion. That's not something I'll be talking about here (at least, not for a while), but I would like to talk about a couple of technologies I've come across that might be of interest to the general public, as well as some of the fine folks at Demonware.

So, quick poll - how many people are familiar with metrics and event logging? Show of hands? Yeah, not a lot of my readers would be familiar with that. I was in the same boat. Coming from a game dev background, logging is something you do when you're debugging a build. But 'logging' for me meant printfs all over the place. Or maybe something a little fancier in our scripts for manipulating assets.

What if I told you there was something else you could use, something that would persist that data? It's not a deep store, but retention is in the 'days' range. And what if I told you there were a number of open source tools out there that could do exactly that? There are! The one we're going to look at is called 'Kafka'. It's from Apache (you know, the folks who make that web server thing). You can find it here: http://kafka.apache.org/

No, I'm not going to go into how to install/set it up. That's a topic that already has enough coverage. So go, Google that, do some digging, maybe even set up an instance. I'll wait.

Cool, isn't it? The things we're going to be focusing on here are Kafka Topics and Producers.

Just to reiterate: a Topic is a container for related messages sent to Kafka. So you could create a topic in Kafka called 'Failed.build.asset' and store, in each message, a text blob representing the asset that failed to build, the machine that failed to build it, the time and any error information you wanted to persist. The thing that sends that data to Kafka is called a 'producer'.
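
As a concrete (and completely made-up) example, the text blob for that hypothetical 'Failed.build.asset' topic could be a small JSON payload built in Python. The field names here are invented purely for illustration:

import json
import socket
import time

# A hypothetical payload for a 'Failed.build.asset' topic. These field
# names are made up for illustration - use whatever your build pipeline
# actually needs to persist.
failed_build_event = json.dumps({
    "asset": "characters/hero/hero_mesh.fbx",
    "machine": socket.gethostname(),
    "timestamp": time.time(),
    "error": "Texture reference not found: hero_diffuse.tga"
})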

And I'm going to show you how to build one in Python.

I've created a git repository here https://github.com/Nuclearfossil/storm-tutorial for those playing along at home. The script is called datapusher.py and you can find it in the python folder.

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

import time
import random
import logging

logging.basicConfig(level=logging.INFO)

pronouns_possessive = [ "my", "your", "his", "her", ...]
nouns_singular = ["attorney-general", "brother-in-law", ...]
nouns_plural = ["attorneys-general", "brothers-in-law", ...]
verbs_singular = ["become", "catch", "drink", "drive", ...]
verbs_plural = ["becomes", "catches", "drinks", "drives", ...]
infinitives = ["to make a pie.", "for no apparent reason.", ...]

# Make sure to change 'hostname' below.
broker = "hostname:9092"

logging.info("Connection to Kafka = {0}".format(broker))

client = KafkaClient(broker)
producer = SimpleProducer(client)
index = 0
logging.info("Sending strings")

client.ensure_topic_exists("sentenceTutorial")

while index < 10000:
    index += 1
    sentence = "{0} {1} {2} {3} {4}".format(random.choice(pronouns_possessive),
        random.choice(nouns_singular),
        random.choice(verbs_plural).lower(),
        random.choice(nouns_singular + nouns_plural).lower(),  # pick the object from either noun list
        random.choice(infinitives))
    producer.send_messages('sentenceTutorial', sentence)
    logging.info("sent {0} of 10000. Message: {1}".format(index, sentence))
    time.sleep(5)

So, aside from a pretty crappy sentence generator (feel free to update it), the meat of the code lives inside the KafkaClient and the SimpleProducer interfaces. The broker is Kafka parlance for the server that actually receives and stores the messages; it's what we point the client at. Under the hood, Kafka relies on Zookeeper to coordinate its brokers (you can look that up). I think I'm going to do an article on that at some point, as it's interesting tech as well. For now, think of Zookeeper as a distributed registry (like the registry in Windows) for apps.

We push a message into the Kafka topic "sentenceTutorial" via the send_messages() method. We're not limited to sending one message at a time, but the messages in a single call do all have to go to the same topic.
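
For example, here's a quick sketch of batching with the same SimpleProducer instance from the script above (if memory serves, send_messages() takes any number of payloads after the topic name):

# All three payloads go to the one 'sentenceTutorial' topic in a single call.
producer.send_messages('sentenceTutorial',
                       'first message in the batch',
                       'second message in the batch',
                       'third message in the batch')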

So, what's in Kafka now? In the topic "sentenceTutorial" there will be a new string pushed once every 5 seconds. Kafka ships with tools for printing out what's in a topic, so read up on the kafka-console-consumer.sh script in Kafka's bin directory.

Alright. Now we have something putting content into Kafka, and now we want to consume that data. And by consume, I mean read: reading doesn't actually remove content from Kafka. The data naturally expires based on a retention window defined in the Kafka setup.
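
Before we get to Storm, here's a minimal consumer sketch using the same kafka-python library as datapusher.py, just so you can see the round trip. This assumes the older SimpleConsumer API that matches the KafkaClient/SimpleProducer style above; newer versions of the library do this differently:

from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer

# Make sure to change 'hostname' below, same as in datapusher.py.
client = KafkaClient("hostname:9092")

# 'tutorial-group' is just an arbitrary consumer group name for this sketch.
consumer = SimpleConsumer(client, "tutorial-group", "sentenceTutorial")

# Each item is an offset/message pair; the payload lives in message.value.
for offset_and_message in consumer:
    print(offset_and_message.message.value)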

So, this is where Apache Storm comes into the mix. There are a bunch of other real-time systems available, but Storm has the longest legs on it. That, and so far it's been the most reliable.

So, in the java folder of the storm-tutorial repo, I have a stand-alone storm project that will also (I think) work in a clustered environment.

I'll leave it as an exercise for the reader to examine the pom.xml file to see how I've set up the project for Maven. It's as simple and straightforward as you can make a Maven project.

Now for the actual code. This will end up being a starter point for more tutorials, so it's best to start off simple. This is nothing more than a main class with a single class for doing the processing.

So, what does Storm 'do'? From the Apache Storm website:
"Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use!"

You need to have a cluster set up for doing any distributed processing, but we can simulate that on a single machine (and we can also debug it better).

How does it work? It's essentially a network topology that defines data 'Spouts' and processing 'Bolts' that transform the data. You use the topology framework to define the network of these Spouts and Bolts.

Here's an example from the repo:

// The TopologyBuilder is what defines the topology for the storm
// processing 'network'
TopologyBuilder builder = new TopologyBuilder();

// This next section defines the configuration information for the Kafka
// spout.
GlobalPartitionInformation hostsAndPartition = new GlobalPartitionInformation();

// Make sure to change 'hostname' below
hostsAndPartition.addPartition(0, new Broker("hostname", 9092));
BrokerHosts brokerHosts = new StaticHosts(hostsAndPartition);
SpoutConfig config = new SpoutConfig(brokerHosts, "sentenceTutorial", "/demonwaretutorial", "987654321");

// defining a scheme here does some default data transformation.
// if we don't define it, the data comes across as raw binary.
config.scheme = new SchemeAsMultiScheme(new StringScheme());

// This is where we define a spout in our topology
builder.setSpout("spout", new KafkaSpout(config), 1);

// This is where we define our bolt in the topology and what data it receives
builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("spout");

What we're left with, defined in the builder instance, is a topology that takes data from the Kafka spout "spout" and feeds it into a 'PrinterBolt' bolt.

Note the 'StringScheme' that is bound to config.scheme. In essence, it does some preliminary transformation of the incoming data (turning the raw bytes into strings). I'll dig into that in a later tutorial as well (as I'm still figuring it out).

That PrinterBolt looks like this:

public static class PrinterBolt extends BaseBasicBolt {
    static Logger logger = Logger.getLogger(PrinterBolt.class);
    /**
     * This execute method reads the data ingested by the spout and simply
     * logs it.
     * It's a very straightforward bolt.
     *
     * @param input
     * @param collector
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String msg = input.getString(0);
        logger.info(String.format("Received message: %s", msg));
    }
    /**
     * Currently this does nothing.
     * @param declarer
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

I recommend stepping through the execute method to see what is in the Tuple that is passed in. The long and short of it is, it's a list of named fields, and in this case we're getting the first field as a string.

One last thing. In the main function, we also do this:

if (args != null && args.length > 0) {
    // So, if we're in here, we're running the topology on a cluster.
    conf.setNumWorkers(3);
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
    // if we're here, we're running stand-alone.
    conf.setMaxTaskParallelism(3);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("tutorial01", conf, builder.createTopology());
    Thread.sleep(100000);
    cluster.killTopology("tutorial01");
    cluster.shutdown();
}

Like the comments say, the first block is for running in a distributed cluster mode. The second part runs it in stand-alone mode.

So, feel free to give the project a go. Make sure you change all the instances of 'hostname' in the code to point at your Kafka instance to get this running. I'll eventually make this far more configurable for both the Java and Python codebases.

This was a lot. There also isn't a lot of theory in here. I think I'll tackle that in the next blog post.

Enjoy and looking forward to the comments!
