Wakatta!

Like Eureka!, only cooler

Seven Databases in Seven Weeks Redis Day 3

Wow, almost two months since I wrote Day 2, and more than one since the last post in this series… Time to bring it to an end.

Today is less about Redis (indeed, it is hardly used at all), and more about a concept: Polyglot Persistence, and about an implementation that showcases the concept.

In fact, I spent most of my time browsing the documentation of Node.js, the library/framework the authors used to build the demo application.

Polyglot Persistence

Polyglot Persistence, the use of several kinds of storage systems in a project, makes even more sense than Polyglot Programming (the use of several languages in a project).

While languages are, by and large, equivalent in expressive power, and mostly a matter of choice, culture, or comparative advantage (some languages favour small teams, others large ones), storage systems are sufficiently different that they are not interchangeable.

Once the idea of eventual consistency takes root, it is a small step to view data as services available from a number of sources, each optimised for its intended use (instead of a single default source that only partially meets the more specialised needs), and each with its own update cycle.

The problem, of course, is that it introduces several levels of complexity: development, deployment, monitoring, and a dizzying range of potential errors, failures, …

Polyglot Persistent Service

The implementation described in the book is small enough to fit in fewer than 15 pages, yet rich enough to show what is possible.

The databases are (with the versions I used):

  • Redis 2.4.8
  • CouchDB 1.1.1
  • Neo4j Community 1.6.1

and the glue language is Node.js.

Redis

Redis is first used as the initial store for the initial data load. It is then used to track the transfer of data between CouchDB and the other databases, and finally to support auto-completion of band names.

CouchDB

CouchDB is intended as the System of Record (i.e. the master database) for the system. Data is meant to be loaded into CouchDB first, then propagated to the other databases.

Besides that, it is not used much, and after the exercises, not at all…

Neo4j

Neo4j keeps a graph of bands, members, and instruments (or roles), and their relationships.

Node.js

Node.js is a framework/library for JavaScript based on the concept of event-based programming (similar to, but perhaps more radical than, Erlang). All I/O is done in continuation-passing style, which means that whenever an I/O operation is initiated, one of its arguments is a function to handle whatever the operation produces (or to deal with the errors).

This is good from a performance point of view, but it is of course more complex to design and code with. Still, it looks like a fun tool to glue various servers together.
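To make continuation-passing style concrete, here is a minimal sketch using only Node’s built-in fs module. The names readJson and readBoth are illustrative, not part of the book’s code. The function never returns the file’s content; it hands the result (or an error) to the callback it was given, following Node’s error-first (err, result) convention. Sequencing two operations means nesting the second one inside the first one’s callback, which is exactly what makes the book’s scripts hard to modify.

```javascript
var fs = require('fs');

// CPS: instead of returning a value, pass it to the continuation `callback`.
function readJson(file, callback) {
  fs.readFile(file, 'utf8', function (err, text) {
    if (err) return callback(err);        // I/O failure: hand it on
    var obj;
    try {
      obj = JSON.parse(text);             // parsing can throw...
    } catch (parseErr) {
      return callback(parseErr);          // ...so report it the same way
    }
    callback(null, obj);                  // success: no error, parsed value
  });
}

// Doing two reads in sequence: the second starts inside the first's callback.
function readBoth(fileA, fileB, callback) {
  readJson(fileA, function (err, a) {
    if (err) return callback(err);
    readJson(fileB, function (err, b) {
      if (err) return callback(err);
      callback(null, [a, b]);
    });
  });
}
```

Every error path has to be forwarded explicitly at each level of nesting; forgetting one silently drops the failure.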

Book Code Fixes

I had to fix some of the code from the authors (nothing serious, and all reported in the errata):

  • populate_couch.js: the trackLineCount function has an off-by-one error; the completion check should be totalBands <= processedBands.
  • bands.js: the initialisation of membersQuery in the handler for the /band route has a syntax error. It should be:
membersQuery = 'g.V[[name:"'+bandName+'"]]' 
             + '.out("member").in("member").uniqueObject.name';

Updating the Code

The book was written for an older version of Neo4j, so some of its queries no longer work: the shortcut to access a node by index is gone, and the uniqueObject step has been replaced by dedup.

Here are the updated relevant portions:

/band Route
membersQuery = 'g.idx("bands")[["name":"'+bandName+'"]]'
             + '.out("member").in("member").dedup.name';

and

/artist Route
rolesQuery = 'g.idx("artists")[["name":"'+artistName+'"]].out("plays").role.dedup';
bandsQuery = 'g.idx("artists")[["name":"'+artistName+'"]].in("member").name.dedup';
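Both routes splice the name typed by the user straight into the Gremlin (Groovy) script, so a band name containing a double quote breaks the query, and a crafted one could inject Groovy code. The sketch below escapes the value as a Groovy double-quoted string literal before building the index lookup. groovyString and indexLookup are hypothetical helpers, not part of the book’s code, and they do not handle control characters such as newlines. Passing the name as a script parameter, where the server supports it, would be cleaner still.

```javascript
// Hypothetical helper: quote a value as a Groovy double-quoted string literal.
function groovyString(value) {
  return '"' + String(value)
    .replace(/\\/g, '\\\\')   // backslashes first, so later escapes are not doubled
    .replace(/"/g, '\\"')     // a bare double quote would end the literal
    .replace(/\$/g, '\\$$')   // "$" starts GString interpolation; "$$" inserts a literal "$"
    + '"';
}

// Hypothetical helper: the index lookup prefix shared by the /band and /artist routes.
function indexLookup(index, name) {
  return 'g.idx(' + groovyString(index) + ')[["name":' + groovyString(name) + ']]';
}

// Example: the /band members query for a name containing an apostrophe.
var bandName = 'Guns N\' Roses';
var membersQuery = indexLookup('bands', bandName)
                 + '.out("member").in("member").dedup.name';
```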

Exercises

I’m not sure what the second homework exercise was supposed to be about: Neo4j already contains information about members and memberships. Perhaps it dates from an early draft, before this chapter’s code evolved into what it is now. In any case, the first exercise involved enough Neo4j.

Adding Band Members’ Start and End Dates

The start and end dates of band memberships are sometimes provided; the purpose of this exercise is to use this information.

Pre-populate

I load the start and end dates into their own keys in Redis. The key formats are from:bandName:artistName and to:bandName:artistName.

First I take the data from the relevant columns:

Extracting Data
var
  artist = data[2],
  band = data[3],
  roles = buildRoles(data[4]),
  from = data[5],
  to = data[6];

Then, if they’re not empty, I create the keys in Redis:

Updating Redis
if (from != '')
  redis_client.set('from:' + band + ':' + artist, from);
if (to != '')
  redis_client.set('to:' + band + ':' + artist, to);
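The whole key layout can be summarised as a pure function that maps one TSV row to the Redis commands it produces. commandsForRow is a hypothetical helper, not part of pre_populate.js; it uses the same column positions as the script, but it tests from and to for truthiness rather than != '', so a row with missing trailing columns (undefined) does not store the string "undefined". It also drops every empty role string, which is slightly stricter than buildRoles.

```javascript
// Hypothetical helper: the Redis commands one TSV row turns into.
//   band:<band>             -> set of artist names
//   artist:<band>:<artist>  -> set of roles
//   from:<band>:<artist>    -> start date, only if present
//   to:<band>:<artist>      -> end date, only if present
function commandsForRow(data) {
  var artist = data[2],
      band = data[3],
      roles = (data[4] || '').split(',').filter(function (r) { return r !== ''; }),
      from = data[5],
      to = data[6];

  if (!band || !artist) return [];   // skip incomplete rows, as the script does

  var cmds = [['sadd', 'band:' + band, artist]];
  roles.forEach(function (role) {
    cmds.push(['sadd', 'artist:' + band + ':' + artist, role]);
  });
  if (from) cmds.push(['set', 'from:' + band + ':' + artist, from]);
  if (to) cmds.push(['set', 'to:' + band + ':' + artist, to]);
  return cmds;
}
```

The result is in the same array-of-commands form that populate_couch.js passes to redisClient.multi, so a row could be written with redis_client.multi(commandsForRow(data)).exec(callback).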

CouchDB

Adding the information to CouchDB is not hard; the main difficulty is figuring out how to modify the populate_couch.js script (continuation-passing style is hard).

Eventually, I simply reused the roleBatch (hence renamed artistInfoBatch) to retrieve the roles as well as the from and to dates.

Retrieving the Information
var artistInfoBatch = [];
artists.forEach(function(artistName) {
  artistInfoBatch.push([
    'smembers',
    'artist:' + bandName + ':' + artistName
  ]);
  artistInfoBatch.push([
    'get',
    'from:' + bandName + ':' + artistName
  ]);
  artistInfoBatch.push([
    'get',
    'to:' + bandName + ':' + artistName
  ]);
});

Then putting it into CouchDB is trivial:

Building Documents
artists.forEach( function(artistName) {
  var artist = { name: artistName, role : artistInfo[i++] },
      from = artistInfo[i++],
      to = artistInfo[i++];
  if (from)
    artist['from'] = from;
  if (to)
    artist['to'] = to;
  artistDocs.push(artist);
});
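The snippet above relies on a counter i and an artistDocs array defined in the enclosing callback. As a self-contained sketch, the same regrouping can be written as a pure function. buildArtistDocs is a hypothetical name; it assumes the batch was built with exactly three commands per artist, in the order smembers (roles), get (from), get (to), as in artistInfoBatch above. Redis GET replies null for a missing key, so absent dates are simply left out of the document.

```javascript
// Hypothetical helper: turn the flat MULTI reply back into artist sub-documents.
// artistInfo holds 3 entries per artist: [roles, from, to, roles, from, to, ...]
function buildArtistDocs(artists, artistInfo) {
  return artists.map(function (name, k) {
    var roles = artistInfo[3 * k],
        from = artistInfo[3 * k + 1],
        to = artistInfo[3 * k + 2],
        doc = { name: name, role: roles };   // "role" is the field graph_sync.js reads
    if (from) doc.from = from;               // GET gives null for a missing key
    if (to) doc.to = to;
    return doc;
  });
}
```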

Neo4j

Neo4j was the hardest piece of the puzzle: I didn’t know, and could not find any definitive documentation on, how to set relationship properties at creation time. Eventually I found that adding them to the data attribute passed at creation time did the trick (although it still took me more time to understand how to use them).

The catch is that the neo4j_caching_client.js library does not support adding properties to relationships, but it was easy enough to modify the library to add this feature.

Relationship properties
neo4jClient.createRelationship = function(fromNode, toNode, type, callback, props) {
  var fromPath = (fromNode || '').replace(/^.*?\/db\/data\//, ''),
      rel = { to: toNode, type: type };
  if (props)
    rel.data = props;
  neo4jClient.post(
    [fromPath, 'relationships'], rel, callback
  );
}
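For reference, Neo4j’s REST API creates a relationship by POSTing a JSON body with to, type and an optional data object to the start node’s relationships URL; that is the request createRelationship above ends up sending. The sketch below only builds that request without sending it, which makes the payload easy to inspect. relationshipRequest is a hypothetical helper, and it assumes the REST root is /db/data/, the prefix the regular expression in createRelationship strips. Unlike the modified library, it omits data when props is empty (graph_sync.js always passes an object, possibly {}).

```javascript
// Hypothetical helper: path and JSON body of a "create relationship" REST call.
function relationshipRequest(fromNode, toNode, type, props) {
  var body = { to: toNode, type: type };
  if (props && Object.keys(props).length) body.data = props;   // relationship properties
  return {
    path: '/db/data/' + (fromNode || '').replace(/^.*?\/db\/data\//, '') + '/relationships',
    body: JSON.stringify(body)
  };
}

var req = relationshipRequest('http://localhost:7474/db/data/node/1',
                              'http://localhost:7474/db/data/node/2',
                              'member', { from: '1988' });
```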

Then the relevant properties can be passed to this function from the graph_sync.js script:

Passing from and to properties
var props = {};
progress.emit('progress', 'artist');
if (artist.from)
  props.from = artist.from;
if (artist.to)
  props.to = artist.to;
relate(bandNode.self, artistNode.self, 'member', function(){
  progress.emit('progress', 'member');
}, props);

Using the new data

To make use of the new data, I tried to differentiate between current and old members of a band. I simply define a current member as one whose to property is null.
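With that definition, the split could also be done in JavaScript once the members have been converted to plain objects (for example by the convertGremlinTable function shown later), fetching all members with a single query instead of two. This is an alternative I am sketching, not what bands.js does; partitionMembers is a hypothetical helper. Since the conversion drops 'null' cells, a missing to and an explicit null are treated the same way.

```javascript
// Hypothetical helper: split members into current (no "to" date) and former ones.
function partitionMembers(members) {
  var current = [], former = [];
  members.forEach(function (m) {
    (m.to == null ? current : former).push(m);   // == null matches null and undefined
  });
  return { current: current, former: former };
}
```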

Figuring out how to write a Gremlin query that extracted the information I needed was challenging: the documentation is often sparse, and many concepts are barely explained.

I found that I could collect nodes or relationships along a path by naming them (with the as step), and then gather all of them into a single row of a Table. I used this to get the from and to properties and the artist’s name property in a single query. However, I spent some time tracking down a bug in my filters: apparently, members whose to property was null were not being returned as current members. I finally realised that when a node or relationship is given two different names, these names appear in reverse order in the Table.

So in my case, the query:

g.idx("bands")[["name":"Nine Inch Nails"]].outE("member").as("from").as("to")
.filter{it.to != null}.inV.as("name")
.table(new Table()){it.to}{it.from}{it.name}.cap()

I give the names from and to to the relationship, but use them in reverse order in the Table closures. Is this the intended behaviour, or a bug? Does anybody know?

It seems to be a common problem with some NoSQL databases: the query language feels very much ad hoc, and not entirely sound or fully thought through. Despite its many defects, SQL was at least based (if sometimes remotely) on the relational calculus, which gave queries a precise meaning. It was further specified in successive standards, so that even its defects are precisely defined (XPath/XQuery is another well-specified query language). When playing with NoSQL databases that claim to have a query language, I often find it difficult to go beyond the simpler examples, precisely because of this linguistic fuzziness.

But I solved it for this case, so now I have my Table. The Gremlin plugin returns it wrapped in nested arrays (hence table[0][0] below); the Table itself is an object with two properties: columns, an array of column names, and data, an array of arrays (one per row). To convert it into an array of objects, I use the following code:

Convert Table data
function convertGremlinTable(table) {
    return fromTableToObject(table[0][0].columns, table[0][0].data);
}

function fromTableToObject(columns, data) {
    var res = [];
    for (var i = 0; i < data.length; i++) {
        var obj = {};
        for (var j = 0; j < columns.length; j++) {
            if (data[i][j] != 'null')
                obj[columns[j]] = data[i][j];
        }
        res.push(obj);
    }

    return res;
}
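The same conversion can be written more compactly with Array.prototype.map and forEach. tableToObjects is a hypothetical name for this variant. Like the original, it assumes the Table sits at table[0][0] and drops cells equal to the string 'null' (which is how the original code expects missing values to arrive); unlike the original, it also drops genuine null or undefined cells.

```javascript
// Variant of convertGremlinTable: one object per row, keyed by column name.
function tableToObjects(table) {
  var t = table[0][0];                        // the Table inside the nested reply
  return t.data.map(function (row) {
    var obj = {};
    t.columns.forEach(function (col, j) {
      if (row[j] !== 'null' && row[j] != null) obj[col] = row[j];   // skip missing values
    });
    return obj;
  });
}
```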

The rest of the code is just the nested Node.js callbacks, and the formatting using Mustache templates (which was pretty cool and easy to use).

Full Code

(pre_populate.js)
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/

var
  // The band data file name in tab-separated form
  tsvFileName = 'group_membership.tsv',
  // track how many file lines we've processed
  processedLines = 0,

  // standard libraries
  csv = require('csv'),
  redis = require('redis'),

  // database clients
  redis_client = redis.createClient(6379);

/**
 * A helper function that splits up the comma-separated list of roles and
 * converts it to an array. If no valid roles exist, return an empty array.
 * @param string the CSV to split into a role array
 */
function buildRoles( string ) {
  var roles = string.split(',');
  if(roles.length === 1 && roles[0] === '') roles = [];
  return roles;
};

/**
 * Utility function that increments the total number
 * of lines (artists) processed and outputs every 1000.
 */
function trackLineCount() {
  if( ++processedLines % 1000 === 0 )
    console.log('Lines Processed: ' + processedLines);
}

/**
 * Does all heavy lifting. Loops through the CSV file
 * and populate Redis with the given values.
 */
function populateRedis() {
  csv().
  fromPath( tsvFileName, { delimiter: '\t', quote: '' }).
  on('data', function(data, index) {
    var
      artist = data[2],
      band = data[3],
      roles = buildRoles(data[4]),
      from = data[5],
      to = data[6];

    if( band === '' || artist === '' ) {
      trackLineCount();
      return true;
    }

    redis_client.sadd('band:' + band, artist);
    roles.forEach(function(role) {
      redis_client.sadd('artist:' + band + ':' + artist, role);
    });

    if (from != '')
      redis_client.set('from:' + band + ':' + artist, from);

    if (to != '')
      redis_client.set('to:' + band + ':' + artist, to);

    trackLineCount();
  }).
  on('end', function(total_lines)
  {
    console.log('Total Lines Processed: ' + processedLines);
    redis_client.quit();
  });
};

populateRedis();
(populate_couch.js)
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/

var
  // how many bands we expect to process
  totalBands = null,
  // and keep track of how many bands we have processed
  processedBands = 0,
  // The name of the couch database
  couchDBpath = '/bands',

  // standard libraries
  http = require('http'),
  redis = require('redis'),

  // database clients
  couchClient = http.createClient(5984, 'localhost'),
  redisClient = redis.createClient(6379);

/**
 * A helper function that builds a good CouchDB key
 * @param string the unicode string being keyified
 */
function couchKeyify( string )
{
  // remove bad chars, and disallow starting with an underscore
  return string.
    replace(/[\t \?\#\\\-\+\.\,'"()*&!\/]+/g, '_').
    replace(/^_+/, '');
};

/*
 * Keep track of the number of bands processed, output every 1000 loaded,
 * and close the Redis client when we've loaded them all.
 */
function trackLineCount( increment ) {

  processedBands += increment;

  // output once every 1000 lines
  if(processedBands % 1000 === 0)
    console.log('Bands Loaded: ' + processedBands);

  // close the Redis Client when complete
  if(totalBands <= processedBands) {
    console.log('Total Bands Loaded: ' + processedBands);
    redisClient.quit();
  }
};

/*
 * Post one or more documents into CouchDB.
 * @param url is where we POST to.
 * @param docsString a stringified JSON document.
 * @param count the number of documents being inserted.
 */
function postDoc( url, docsString, count ) {

  var request = couchClient.request(
    'POST',
    url,
    { 'Content-Type' : 'application/json' });
  request.end( docsString );

  request.on('response', function(response) {
    if(response.statusCode == 201)
      trackLineCount( count );
  }).
  on('error', function(e) {
    console.log('postDoc Got error: ' + e.message);
  });
};

/*
 * Loop through all of the bands populated in Redis. We expect
 * the format of each key to be 'band:Band Name' having the value
 * as a set of artist names. The artists each have the list of roles
 * they play in each band, keyed by 'artist:Band Name:Artist Name'.
 * The band name, set of artists, and set of roles each artist plays
 * populates the CouchDB documents. eg:
  {
    name:"Nirvana",
    artists:[{
      name: "Kurt Cobain",
      role:["Lead Vocals", "Guitar"]
    },...]
  }
 */
function populateBands() {

  // First, create the couch database
  couchClient.request('PUT', couchDBpath).end();

  redisClient.keys('band:*', function(error, bandKeys) {
    totalBands = bandKeys.length;
    var
      readBands = 0,
      bandsBatch = [];

    bandKeys.forEach(function(bandKey) {
      // substring of 'band:'.length gives us the band name
      var bandName = bandKey.substring(5);

      redisClient.smembers(bandKey, function(error, artists) {
        // batch the Redis calls to get all artists' information at once
        var artistInfoBatch = [];
        artists.forEach(function(artistName) {
          artistInfoBatch.push([
            'smembers',
            'artist:' + bandName + ':' + artistName
          ]);
          artistInfoBatch.push([
            'get',
            'from:' + bandName + ':' + artistName
          ]);
          artistInfoBatch.push([
            'get',
            'to:' + bandName + ':' + artistName
          ]);
        });

        // batch up each band member to find the roles they play
        redisClient.
          multi(artistInfoBatch).
          exec(function(err, artistInfo)
          {
            var
              i = 0,
              artistDocs = [];

            // build the artists sub-documents
            artists.forEach( function(artistName) {
              var artist = { name: artistName, role : artistInfo[i++] },
                  from = artistInfo[i++],
                  to = artistInfo[i++];
              if (from)
                artist['from'] = from;
              if (to)
                artist['to'] = to;
              artistDocs.push(artist);
            });

            // add this new band document to the batch to be executed later
            bandsBatch.push({
              _id: couchKeyify( bandName ),
              name: bandName,
              artists: artistDocs
            });
            // keep track of the total number of bands read
            readBands++;

            // upload batches of 50 values to couch, or the remaining values left
            if( bandsBatch.length >= 50 || totalBands - readBands == 0) {
              postDoc(
                couchDBpath+'/_bulk_docs',
                JSON.stringify({ docs : bandsBatch }),
                bandsBatch.length);

              // empty out the batch array to be filled again
              bandsBatch = [];
            }
          }
        );
      });
    });
  });
};

// expose couchKeyify function
exports.couchKeyify = couchKeyify;

// start populating bands if running as main script
if(!module.parent) {
  populateBands();
}
(graph_sync.js)
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/

var
  // standard libraries
  events = require('events'),
  esc = require('querystring').escape,
  redis = require('redis'),

  // custom libraries
  couch = require('./watch_changes_continuous.js'),
  neo4j = require('./neo4j_caching_client.js'),

  // database clients
  neo4jClient = neo4j.createClient({
    limit: 10
  }),
  couchWatcher = couch.createWatcher({
    db: 'bands'
  }),
  redisClient = redis.createClient(6379);

// feed band information into redis for autocompleter
function feedBandToRedis(band) {
  redisClient.set('band-name:' + band.name, 1);
  band.artists.forEach(function(artist) {
    redisClient.set('artist-name:' + artist.name, 1);
    artist.role.forEach(function(role){
      redisClient.set('role-name:' + role, 1);
    });
  });
}

/**
 * feed band membership and artist/role content from couch to neo4j.
 * @param band A band document from CouchDB.
 * @param progress EventEmitter to emit progress events.
 */
function feedBandToNeo4j(band, progress) {
  var
    lookup = neo4jClient.lookupOrCreateNode,
    relate = neo4jClient.createRelationship;

  lookup('bands', 'name', band.name, function(bandNode) {
    progress.emit('progress', 'band');
    band.artists.forEach(function(artist) {
      lookup('artists', 'name', artist.name, function(artistNode){
        var props = {};
        progress.emit('progress', 'artist');
        if (artist.from)
          props.from = artist.from;
        if (artist.to)
          props.to = artist.to;
        relate(bandNode.self, artistNode.self, 'member', function(){
          progress.emit('progress', 'member');
        }, props);
        artist.role.forEach(function(role){
          lookup('roles', 'role', role, function(roleNode){
            progress.emit('progress', 'role');
            relate(artistNode.self, roleNode.self, 'plays', function(){
              progress.emit('progress', 'plays');
            });
          });
        });
      });
    });
  });
}

// process only interesting bands (ones with artists who have roles)
function processBand(band, progress) {
  // change this to true to process all bands
  var addBand = false;
  band.artists.forEach(function(artist){
    if (artist.role.length)
      addBand = true;
  });
  if (addBand) {
    feedBandToRedis(band);
    feedBandToNeo4j(band, progress);
  }
}

// progress reporting measures (how much work has been done)
var
  stats = { doc:0, band:0, artist:0, member:0, role:0, plays:0 },
  progress = new events.EventEmitter(),
  timer = setInterval(function() {
    console.log(stats);
  }, 1000);

progress.on('progress', function(type) {
  stats[type] = (stats[type] || 0) + 1;
});

// start watching couch and processing bands as they come in
couchWatcher
  .on('change', function(data){
    progress.emit('progress', 'doc');
    if (data.doc && data.doc.name)
      processBand(data.doc, progress);
  })
  .start();
(neo4j_caching_client.js)
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/

var
  http = require('http'),
  events = require('events'),
  esc = require('querystring').escape,
  redis = require('redis'),
  neo4j = require('./neo4j_driver.js');

exports.createClient = function(options) {
  options = options || {};

  var
    neo4jClient = neo4j.createClient(options),
    redisClient = redis.createClient(),
    pending = new events.EventEmitter();

  pending.setMaxListeners(0); // unlimited

  neo4jClient.expiry = options.expiry || 300; // default 5 min

  // Run a gremlin script against the server.
  neo4jClient.runGremlin = function(script, callback) {
    var path = ['ext/GremlinPlugin/graphdb/execute_script'];
    neo4jClient.post(path, { script : script }, callback);
  };

  // lookup a key/value node by index.
  neo4jClient.lookupNode = function(index, key, value, callback) {
    var path = ['index/node', esc(index), esc(key), esc(value)];
    neo4jClient.get(path, callback);
  };

  // create a key/value node and index it.
  neo4jClient.createNode = function(index, key, value, callback) {
    var input = {};
    input[key] = value;
    neo4jClient.post('node', input, function(obj){
      var data = { uri: obj.self, key: key, value: value };
      neo4jClient.post(['index/node', esc(index)], data, callback);
    });
  }

  // lookup a node or create/index and cache it
  neo4jClient.lookupOrCreateNode = function(index, key, value, callback) {

    var
      cacheKey = 'lookup:' + index + ':' + key + ':' + value,
      ex = neo4jClient.expiry;

    // only one pending lookup for a given index/key/value allowed at a time
    if (!pending.listeners(cacheKey).length) {

      // check redis first
      redisClient.get(cacheKey, function(err, text){
        if (!err && text) {
          // found in redis cache, use it and refresh
          pending.emit(cacheKey, JSON.parse(text));
          redisClient.expire(cacheKey, ex);
        } else {
          // missed redis cache, lookup in neo4j index
          neo4jClient.lookupNode(index, key, value, function(list, res){
            if (list && list.length) {
              // found in index, use it and cache
              pending.emit(cacheKey, list[0]);
              redisClient.setex(cacheKey, ex, JSON.stringify(list[0]));
            } else {
              // missed index, create it and cache it
              neo4jClient.createNode(index, key, value, function(obj){
                pending.emit(cacheKey, obj);
                redisClient.setex(cacheKey, ex, JSON.stringify(obj));
              });
            }
          });
        }
      });

    }

    pending.once(cacheKey, callback);

  }

  // create a relationship between two nodes
  neo4jClient.createRelationship = function(fromNode, toNode, type, callback, props) {
    var fromPath = (fromNode || '').replace(/^.*?\/db\/data\//, ''),
        rel = { to: toNode, type: type };
    if (props)
      rel.data = props;
    neo4jClient.post(
      [fromPath, 'relationships'], rel, callback
    );
  }

  return neo4jClient;

}
(bands.js)
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/

var
  port = 8080,
  jsonHeader = {'Content-Type':'application/json'},

  // standard libraries
  http = require('http'),
  redis = require('redis'),
  bricks = require('bricks'),
  mustache = require('mustache'),
  fs = require('fs'),

  // custom libraries
  couchUtil = require('./populate_couch.js'),
  neo4j = require('./neo4j_caching_client.js'),

  // database clients
  couchClient = http.createClient(5984, 'localhost'),
  neo4jClient = neo4j.createClient(),
  redisClient = redis.createClient(6379);

var
  gremlin = neo4jClient.runGremlin;

/**
 * A convenience function for wrapping the
 * reading of JSON response data chunks.
 * @param response A Node HTTP response object.
 * @param callback the function to populate and call on completion.
 */
function processBuffer( response, callback )
{
  var buffer = '';
  response.on('data', function(chunk) {
    buffer += chunk;
  });
  response.on('end', function() {
    if(buffer === '') buffer = 'null';
    callback( JSON.parse(buffer) );
  });
};

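/*
 * Fetch a single document from CouchDB and pass the parsed JSON to callback.
 * Renders a "Value not found" page if CouchDB does not answer with 200.
 * @param path the document path, e.g. /bands/Nirvana.
 * @param httpResponse the response of the web request being served.
 * @param callback the function called with the parsed document.
 */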
function getCouchDoc( path, httpResponse, callback )
{
  var request = couchClient.request( 'GET', path, jsonHeader );
  request.end();
  request.on('response', function( response ) {
    if( response.statusCode != 200 ) {
      writeTemplate( httpResponse, '', { message: "Value not found" } );
    } else {
      processBuffer( response, function( couchObj ) {
        callback( couchObj );
      });
    }
  }).
  on('error', function(e) {
    console.log('getCouchDoc Got error: ' + e.message);
  });
};

/**
 * Wraps a block of HTML with a standard template. HTML lives in template.html.
 * @innerHtml populates the body of the template
 */
function htmlTemplate( innerHtml )
{
  var file_data = fs.readFileSync( 'template.html', 'utf8' );
  return file_data.replace("[[YIELD]]", innerHtml);
};

function writeTemplate( response, innerHtml, values )
{
  response.write( mustache.to_html( htmlTemplate( innerHtml ), values ));
  response.end();
}

function convertGremlinTable(table) {
    return fromTableToObject(table[0][0].columns, table[0][0].data);
}

function fromTableToObject(columns, data) {
    var res = [];
    for (var i = 0; i < data.length; i++) {
        var obj = {};
        for (var j = 0; j < columns.length; j++) {
            if (data[i][j] != 'null')
                obj[columns[j]] = data[i][j];
        }
        res.push(obj);
    }

    return res;
}


// A Nodejs web app utility setup
var appServer = new bricks.appserver();

// attach request plugin to easily extract params
appServer.addRoute("^/", appServer.plugins.request);

/*
 * Just display a blank form if no band is given.
 */
appServer.addRoute("^/$", function(req, res) {
  writeTemplate( res, '', { message: "Find a band" } );
});

/*
 * Accepts a band name and displays all artists in the band.
 * Also displays a list of suggested bands where at least
 * one artist has played at one time.
 */

/*
 * Gremlin: aliases introduced for the same node and/or
 * relationship appear to be used in reverse order in
 * Tables.
 */
appServer.addRoute("^/band$", function(req, res) {
    var bandName = req.param('name'),
      bandNodePath = '/bands/' + couchUtil.couchKeyify( bandName ),
      queryPrefix = 'g.idx("bands")[["name":"'+bandName+'"]]',
      otherBandsQuery = queryPrefix
            + '.out("member").in("member").dedup.name',
      currentMembersQuery = queryPrefix
            + '.outE("member").as("from")'
            + '.filter{it.to == null}.inV.as("name")'
            + '.table(new Table()){it.from}{it.name}.cap()',
      otherMembersQuery = queryPrefix
            + '.outE("member").as("from").as("to")'
            + '.filter{it.to != null}.inV.as("name")'
            + '.table(new Table())'
            + '{it.to}{it.from}{it.name}.cap()';

    gremlin(otherBandsQuery, function(graphData) {
        gremlin(currentMembersQuery, function(currentMembers) {
            gremlin(otherMembersQuery, function(otherMembers) {
                var values = {band: bandName, bands: graphData,
                             currents: convertGremlinTable(currentMembers),
                             others: convertGremlinTable(otherMembers)};
                var body = '<h2>Current {{band}} Band Members</h2>';
                if (currentMembers[0][0].data.length > 0) {
                    body += '<ul>{{#currents}}';
                    body += '<li><a href="/artist?name={{name}}">{{name}}';
                    body += '{{#from}} from {{from}}{{/from}}</a></li>';
                    body += '{{/currents}}</ul>';
                } else {
                    body += "<p>No current member (dead band?)</p>";
                }
                if (otherMembers[0][0].data.length > 0) {
                    body += '<h3>Other members</h3>';
                    body += '<ul>{{#others}}';
                    body += '<li><a href="/artist?name={{name}}">{{name}}';
                    body += '{{#from}} from {{from}}{{/from}}';
                    body += '{{#to}} to {{to}}{{/to}}</a></li>';
                    body += '{{/others}}</ul>';
                }
                body += '<h3>You may also like</h3>';
                body += '<ul>{{#bands}}';
                body += '<li><a href="/band?name={{.}}">{{.}}</a></li>';
                body += '{{/bands}}</ul>';

                writeTemplate( res, body, values );
            });
        });
    });
});

/*
 * Accepts an artist name and displays band and role information
 */
appServer.addRoute("^/artist$", function(req, res) {
    var artistName = req.param('name'),
        queryPrefix = 'g.idx("artists")[["name":"'+artistName+'"]]',
        rolesQuery = queryPrefix +'.out("plays").role.dedup',
        bandsQuery = 'g.idx("artists")[["name":"'+artistName+'"]]'
            + '.inE("member").as("to").as("from").outV.as("name")'
            + '.table(new Table()){it.from}{it.to}{it.name}.cap()';

    gremlin( rolesQuery, function(roles) {
        gremlin( bandsQuery, function(bands) {
            var values = { artist: artistName, roles: roles,
                           bands: convertGremlinTable(bands) };

            var body = '<h3>{{artist}} Performs these Roles</h3>';
            body += '<ul>{{#roles}}';
            body += '<li>{{.}}</li>';
            body += '{{/roles}}</ul>';
            body += '<h3>Play in Bands</h3>';
            body += '<ul>{{#bands}}';
            body += '<li><a href="/band?name={{name}}">{{name}}';
            body += '{{#from}} from {{from}}{{/from}}';
            body += '{{#to}} to {{to}}{{/to}}</a></li>';
            body += '{{/bands}}</ul>';

            writeTemplate( res, body, values );
        });
    });
});

/*
 * A band name search. Used for autocompletion.
 */
appServer.addRoute("^/search$", function(req, res) {
  var query = req.param('term');

  redisClient.keys("band-name:"+query+"*", function(error, keys) {
    var bands = [];
    keys.forEach(function(key){
      bands.push(key.replace("band-name:", ''));
    });
    res.write( JSON.stringify(bands) );
    res.end();
  });
});

// catch all unknown routes with a 404
appServer.addRoute(".+", appServer.plugins.fourohfour);
appServer.addRoute(".+", appServer.plugins.loghandler, { section: "final" });

// start up the server
console.log("Starting Server on port " + port);
appServer.createServer().listen(port);
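The routes above build their Gremlin queries by pasting the raw name parameter inside a double-quoted Groovy string, so a band name containing a double quote breaks the query, and a $ would be read by Groovy as the start of an interpolation. Below is a minimal sketch of an escaping helper, assuming the Neo4j Gremlin plugin evaluates the query as Groovy source; escapeGroovyString and bandIndexQuery are hypothetical names, not part of the book's code.

```javascript
// Hypothetical helper: escape a value for use inside a Groovy double-quoted
// string literal. Backslashes are escaped first so that the backslashes
// added by the later replacements are not doubled.
function escapeGroovyString(value) {
  return String(value)
    .replace(/\\/g, '\\\\')  // backslash    -> \\
    .replace(/"/g, '\\"')    // double quote -> \"
    .replace(/\$/g, '\\$');  // dollar sign would otherwise start a GString interpolation
}

// Same query prefix as in the /band route, with the name escaped.
function bandIndexQuery(bandName) {
  return 'g.idx("bands")[["name":"' + escapeGroovyString(bandName) + '"]]';
}

console.log(bandIndexQuery('The "Band"'));
// g.idx("bands")[["name":"The \"Band\""]]
```

The same helper would apply to the artist queries; single quotes (as in many band names) need no escaping inside a double-quoted Groovy string.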

Add Music Samples

The book (in beta 5.0) suggested using Riak's Luwak, but this component has recently been removed, and there seems to be no replacement at this time. So I went with MongoDB's GridFS instead. This is a little more complex than simply swapping client libraries: MongoDB does not offer an HTTP REST API for GridFS, so I need to stream the content of the file through the server.

Overview

To keep things simple, I load only one sample per band; the file name must be the band's CouchDB key, followed by '.mp3'.

To access MongoDB from Node.js, I use node-mongodb-native, which can be installed with npm. It has all the expected features of a client, including GridFS support (with one caveat, see below).

To stream the file from the server, I use a dedicated port, for no better reason than that Bricks.js, which the authors used to build the service, was giving me trouble, while the standard http module did not.

When displaying the band information, I check whether a file exists with the same name as the band's key: if it does, I add a link to the dedicated streaming port, passing the key as a parameter:

Adding Sample Link
mongoClient.open(function(err, db) {
  mongodb.GridStore.exist(db, bandKey + '.mp3', function(err, exist) {
    if (exist)
      body += '<a href="http://'+host+':'+streamPort+'?band={{bandK}}">Sample</a>';
    // ... rest of the page, then writeTemplate( res, body, values ) and db.close()
  });
});
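Mustache's {{bandK}} only HTML-escapes the key; it does not URL-encode it. That is harmless as long as CouchDB keys contain only URL-safe characters, which I have not verified for every band. A sketch of building the same link with encodeURIComponent instead; sampleLink is a hypothetical helper name, and host and port are passed in rather than read from the globals.

```javascript
// Hypothetical helper: build the "Sample" link for a band key.
// encodeURIComponent makes the key safe inside a query string (it also
// encodes " < > &, so the result is safe inside a double-quoted attribute);
// Mustache's {{...}} only HTML-escapes and does not URL-encode.
function sampleLink(host, streamPort, bandKey) {
  const href = 'http://' + host + ':' + streamPort +
               '/?band=' + encodeURIComponent(bandKey);
  return '<a href="' + href + '">Sample</a>';
}

console.log(sampleLink('localhost', 8089, 'Pink_Floyd'));
// <a href="http://localhost:8089/?band=Pink_Floyd">Sample</a>
```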

Then, I create a new http server to send the music:

Streaming Files from GridFS
http.createServer(function(request, response) {
    var band = url.parse(request.url, true).query.band;
    mongoClient.open(function(err, db) {
        if (err) {
            response.writeHead(500);
            return response.end();
        }
        var gs = new mongodb.GridStore(db, band + '.mp3', "r");
        gs.open(function(err, gs) {
            if (err) {
                // e.g. no sample stored under that name
                response.writeHead(404);
                response.end();
                return db.close();
            }
            console.log("streaming...");
            response.writeHead(200, {
                'Content-Type': 'audio/mpeg',
                // magic headers to stream mp3...
                'X-Pad': 'avoid browser bug',
                'Cache-Control': 'no-cache',
                'Content-Length': gs.length});
// cannot use gridstore streams; somehow file always
// truncated - load in memory instead
//            gs.stream(true).pipe(response);
            gs.read(gs.length, function(err, data) {
                if (!err) response.write(data);
                response.end();
                db.close();
            });
        });
    });
}).listen(streamPort);
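The streaming server builds the GridFS file name straight from the query string, so a request without a band parameter ends up looking for "undefined.mp3". A small guard, sketched below with the hypothetical helper sampleFileName, returns null in that case so the server can answer with an error status instead; it keeps the key + '.mp3' naming convention and the same url.parse call as the server.

```javascript
const url = require('url');

// Hypothetical helper: derive the GridFS file name for a request such as
// "/?band=Some_Key", following the "<CouchDB key>.mp3" convention.
// Returns null when the band parameter is missing, empty or repeated.
function sampleFileName(requestUrl) {
  const band = url.parse(requestUrl, true).query.band;
  if (typeof band !== 'string' || band.length === 0) {
    return null; // caller should answer 400/404 instead of opening "undefined.mp3"
  }
  return band + '.mp3';
}

console.log(sampleFileName('/?band=Nirvana')); // Nirvana.mp3
console.log(sampleFileName('/'));              // null
```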

The only problem I had (and it took me a while to figure it out) was that the MongoDB client's stream support for GridFS content is, as far as I can tell, defective: it closes the stream after just one or two chunks' worth of data (see the issue on GitHub).

So instead I have to load the whole file into memory, then write it to the response… Clearly not the best approach, but hey, it works!
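For reference, the streaming version would pipe a readable stream into the response, sending the file chunk by chunk instead of holding it all in memory. Later releases of the official driver expose GridFSBucket, whose openDownloadStreamByName method returns such a stream; I have not tried it with this setup. The sketch below shows only the piping pattern with Node's stream.pipeline; the in-memory chunks and the fake response are stand-ins so it runs without MongoDB, and serveMp3 is a hypothetical name.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Send an MP3 from any readable stream (for example a GridFS download
// stream) to an HTTP response, chunk by chunk, instead of buffering the
// whole file. pipeline() handles backpressure and reports the first error.
function serveMp3(source, response, length) {
  response.writeHead(200, {
    'Content-Type': 'audio/mpeg',
    'Content-Length': length
  });
  pipeline(source, response, (err) => {
    if (err) console.error('streaming failed: ' + err.message);
  });
}

// Stand-ins so the sketch runs without MongoDB or a browser:
// three in-memory "chunks" and a response that records what it receives.
const chunks = [Buffer.from('ID3'), Buffer.from('abc'), Buffer.from('xyz')];
const received = [];
const fakeResponse = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk);
    callback();
  }
});
fakeResponse.writeHead = (status, headers) => {
  fakeResponse.status = status;
  fakeResponse.headers = headers;
};

fakeResponse.on('finish', () => {
  console.log(fakeResponse.status, Buffer.concat(received).toString()); // 200 ID3abcxyz
});
serveMp3(Readable.from(chunks), fakeResponse, 9);
```

With a real http.ServerResponse the same function applies unchanged, since the response is itself a writable stream.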

Full Code

(bands.js)
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material, 
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose. 
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/

var
  port = 8080,
  streamPort = 8089,
  host = 'localhost',
  jsonHeader = {'Content-Type':'application/json'},

  // standard libraries
  http = require('http'),
  redis = require('redis'),
  bricks = require('bricks'),
  mustache = require('mustache'),
  fs = require('fs'),
  url = require('url'),

  // mongodb
  mongodb = require('mongodb'),

  // custom libraries
  couchUtil = require('./populate_couch.js'),
  neo4j = require('./neo4j_caching_client.js'),

  // database clients
  couchClient = http.createClient(5984, 'localhost'),
  neo4jClient = neo4j.createClient(),
  redisClient = redis.createClient(6379),

  mongoClient =
    new mongodb.Db('music',
                   new mongodb.Server('localhost',
                                      mongodb.Connection.DEFAULT_PORT, {}),
                   {native_parser:true});

var
  gremlin = neo4jClient.runGremlin;

/**
 * A convenience function for wrapping the
 * reading of JSON response data chunks.
 * @param response A Node HTTP response object.
 * @param callback the function to populate and call on completion.
 */
function processBuffer( response, callback )
{
  var buffer = '';
  response.on('data', function(chunk) {
    buffer += chunk;
  });
  response.on('end', function() {
    if(buffer === '') buffer = 'null';
    callback( JSON.parse(buffer) );
  });
};

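/*
 * GET a single document from CouchDB and pass the parsed JSON to callback.
 * Writes a "Value not found" page if CouchDB does not answer 200.
 * @param path the document path, e.g. '/bands/' + key.
 * @param httpResponse the response of the page being rendered.
 * @param callback the function to call with the parsed document.
 */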
function getCouchDoc( path, httpResponse, callback )
{
  var request = couchClient.request( 'GET', path, jsonHeader );
  request.end();
  request.on('response', function( response ) {
    if( response.statusCode != 200 ) {
      writeTemplate( httpResponse, '', { message: "Value not found" } );
    } else {
      processBuffer( response, function( couchObj ) {
        callback( couchObj );
      });
    }
  }).
  on('error', function(e) {
    console.log('getCouchDoc got error: ' + e.message);
  });
};

/**
 * Wraps a block of HTML with a standard template. HTML lives in template.html.
 * @param innerHtml populates the body of the template
 */
function htmlTemplate( innerHtml )
{
  var file_data = fs.readFileSync( 'template.html', 'utf8' );
  return file_data.replace("[[YIELD]]", innerHtml);
};

function writeTemplate( response, innerHtml, values )
{
  response.write( mustache.to_html( htmlTemplate( innerHtml ), values ));
  response.end();
}

function convertGremlinTable(table) {
    return fromTableToObject(table[0][0].columns, table[0][0].data);
}

function fromTableToObject(columns, data) {
    var res = [];
    for (var i = 0; i < data.length; i++) {
        var obj = {};
        for (var j = 0; j < columns.length; j++) {
            if (data[i][j] != 'null')
                obj[columns[j]] = data[i][j];
        }
        res.push(obj);
    }

    return res;
}


// A Nodejs web app utility setup
var appServer = new bricks.appserver();

// attach request plugin to easily extract params
appServer.addRoute("^/", appServer.plugins.request);

/*
 * Just display a blank form if no band is given.
 */
appServer.addRoute("^/$", function(req, res) {
  writeTemplate( res, '', { message: "Find a band" } );
});

/*
 * Accepts a band name and displays all artists in the band.
 * Also displays a list of suggested bands where at least
 * one artist has played at one time.
 */

/*
 * Gremlin: aliases introduced for the same node and/or
 * relationship appear to be used in reverse order in
 * Tables.
 */
appServer.addRoute("^/band$", function(req, res) {
    var bandName = req.param('name'),
      bandKey = couchUtil.couchKeyify( bandName ),
      bandNodePath = '/bands/' + bandKey,
      queryPrefix = 'g.idx("bands")[["name":"'+bandName+'"]]',
      otherBandsQuery = queryPrefix
            + '.out("member").in("member").dedup.name',
      currentMembersQuery = queryPrefix
            + '.outE("member").as("from")'
            + '.filter{it.to == null}.inV.as("name")'
            + '.table(new Table()){it.from}{it.name}.cap()',
      otherMembersQuery = queryPrefix
            + '.outE("member").as("from").as("to")'
            + '.filter{it.to != null}.inV.as("name")'
            + '.table(new Table())'
            + '{it.to}{it.from}{it.name}.cap()';

    gremlin(otherBandsQuery, function(graphData) {
        gremlin(currentMembersQuery, function(currentMembers) {
            gremlin(otherMembersQuery, function(otherMembers) {
                var values = {band: bandName, bands: graphData,
                             currents: convertGremlinTable(currentMembers),
                             others: convertGremlinTable(otherMembers),
                             bandK: bandKey};
                var body = '<h2>Current {{band}} Band Members</h2>';
                if (currentMembers[0][0].data.length > 0) {
                    body += '<ul>{{#currents}}';
                    body += '<li><a href="/artist?name={{name}}">{{name}}';
                    body += '{{#from}} from {{from}}{{/from}}</a></li>';
                    body += '{{/currents}}</ul>';
                } else {
                    body += "<p>No current member (dead band?)</p>";
                }

                mongoClient.open(function(err, db) {
                    mongodb.GridStore.exist(db, bandKey + '.mp3', function(err, exist) {
                        if (exist)
                            body += '<a href="http://'+host+':'+streamPort+'?band={{bandK}}">Sample</a>';
                        if (otherMembers[0][0].data.length > 0) {
                            body += '<h3>Other members</h3>';
                            body += '<ul>{{#others}}';
                            body += '<li><a href="/artist?name={{name}}">{{name}}';
                            body += '{{#from}} from {{from}}{{/from}}';
                            body += '{{#to}} to {{to}}{{/to}}</a></li>';
                            body += '{{/others}}</ul>';
                        }
                        body += '<h3>You may also like</h3>';
                        body += '<ul>{{#bands}}';
                        body += '<li><a href="/band?name={{.}}">{{.}}</a></li>';
                        body += '{{/bands}}</ul>';

                        writeTemplate( res, body, values );
                        db.close();
                    });
                });
            });
        });
    });
});

/*
 * Accepts an artist name and displays band and role information
 */
appServer.addRoute("^/artist$", function(req, res) {
    var artistName = req.param('name'),
        queryPrefix = 'g.idx("artists")[["name":"'+artistName+'"]]',
        rolesQuery = queryPrefix +'.out("plays").role.dedup',
        bandsQuery = 'g.idx("artists")[["name":"'+artistName+'"]]'
            + '.inE("member").as("to").as("from").outV.as("name")'
            + '.table(new Table()){it.from}{it.to}{it.name}.cap()';

    gremlin( rolesQuery, function(roles) {
        gremlin( bandsQuery, function(bands) {
            var values = { artist: artistName, roles: roles,
                           bands: convertGremlinTable(bands) };

            var body = '<h3>{{artist}} Performs these Roles</h3>';
            body += '<ul>{{#roles}}';
            body += '<li>{{.}}</li>';
            body += '{{/roles}}</ul>';
            body += '<h3>Play in Bands</h3>';
            body += '<ul>{{#bands}}';
            body += '<li><a href="/band?name={{name}}">{{name}}';
            body += '{{#from}} from {{from}}{{/from}}';
            body += '{{#to}} to {{to}}{{/to}}</a></li>';
            body += '{{/bands}}</ul>';

            writeTemplate( res, body, values );
        });
    });
});

/*
 * A band name search. Used for autocompletion.
 */
appServer.addRoute("^/search$", function(req, res) {
  var query = req.param('term');

  redisClient.keys("band-name:"+query+"*", function(error, keys) {
    var bands = [];
    keys.forEach(function(key){
      bands.push(key.replace("band-name:", ''));
    });
    res.write( JSON.stringify(bands) );
    res.end();
  });
});

// cannot seem to get bricks to stream data back
// use simple default http server
http.createServer(function(request, response) {
    var band = url.parse(request.url, true).query.band;
    mongoClient.open(function(err, db) {
        if (err) {
            response.writeHead(500);
            return response.end();
        }
        var gs = new mongodb.GridStore(db, band + '.mp3', "r");
        gs.open(function(err, gs) {
            if (err) {
                // e.g. no sample stored under that name
                response.writeHead(404);
                response.end();
                return db.close();
            }
            console.log("streaming...");
            response.writeHead(200, {
                'Content-Type': 'audio/mpeg',
                // magic headers to stream mp3...
                'X-Pad': 'avoid browser bug',
                'Cache-Control': 'no-cache',
                'Content-Length': gs.length});
// cannot use gridstore streams; somehow file always
// truncated - load in memory instead
//            gs.stream(true).pipe(response);
            gs.read(gs.length, function(err, data) {
                if (!err) response.write(data);
                response.end();
                db.close();
            });
        });
    });
}).listen(streamPort);

// catch all unknown routes with a 404
appServer.addRoute(".+", appServer.plugins.fourohfour);
appServer.addRoute(".+", appServer.plugins.loghandler, { section: "final" });

// start up the server
console.log("Starting Server on port " + port);
appServer.createServer().listen(port);
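The /search route passes the user's term straight into a KEYS pattern, so characters such as *, ? or [ in the term are treated as glob syntax rather than matched literally. Redis glob patterns accept a backslash to escape these characters. A sketch of a pattern builder that does so; bandSearchPattern is a hypothetical name, and the "band-name:" prefix is the one used by the route.

```javascript
// Hypothetical helper: escape the characters that Redis KEYS treats as
// glob syntax (* ? [ ] and the backslash itself) so that the search term
// is matched literally, then append the trailing * used for prefix search.
function bandSearchPattern(term) {
  const escaped = String(term).replace(/[\\*?\[\]]/g, '\\$&');
  return 'band-name:' + escaped + '*';
}

console.log(bandSearchPattern('AC/DC')); // band-name:AC/DC*
console.log(bandSearchPattern('Tool?')); // band-name:Tool\?*
```

In the route, redisClient.keys(bandSearchPattern(query), ...) would replace the string concatenation. Note also that KEYS walks the whole keyspace, which the Redis documentation advises against in production; it is acceptable for a demo of this size.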

Wrapping Up

Well, that was a long day. I should have enjoyed it, but the immaturity of some of the tools (Neo4j's ever-evolving query language and the GridFS streaming bug) caused hours of frustration. The main cause, however, was my own lack of knowledge: faced with unexpected behaviour, I had no idea whether it was a bug (find a workaround) or an incorrect invocation (rework the query to correct it).

The exposition of polyglot persistence through the music information service was pretty good, given the space constraints. Of course, it skipped the really ugly and tedious parts (how to incrementally keep the databases in sync when the master records are updated, not merely created); given the differences in data models, data manipulation (or lack thereof) and querying between the databases, this can easily become a nightmare, especially if incremental updates are not part of the initial design.
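One common starting point for such incremental updates (not covered in the book) is CouchDB's _changes feed: ask for the changes since a stored sequence number, re-propagate the changed documents, remove the deleted ones downstream, then store the new last_seq. The sketch below only does the bookkeeping for one batch, assuming the CouchDB 1.x response shape with integer sequence numbers; planSync is a hypothetical name, and persisting nextSince (for instance in Redis) and the actual propagation to Neo4j and Redis are left out.

```javascript
// Hypothetical helper: split one batch from CouchDB's _changes feed
// (GET /bands/_changes?since=N) into documents to (re)propagate and
// documents to remove downstream, plus the checkpoint for the next call.
function planSync(changes) {
  const upserts = [];
  const deletions = [];
  changes.results.forEach((row) => {
    if (row.deleted) {
      deletions.push(row.id);
    } else {
      upserts.push(row.id);
    }
  });
  return { upserts, deletions, nextSince: changes.last_seq };
}

// Example batch in the CouchDB 1.x shape (made-up ids and revisions).
const batch = {
  results: [
    { seq: 12, id: 'Nirvana', changes: [{ rev: '2-a' }] },
    { seq: 13, id: 'Old_Band', changes: [{ rev: '3-b' }], deleted: true }
  ],
  last_seq: 13
};
console.log(JSON.stringify(planSync(batch)));
// {"upserts":["Nirvana"],"deletions":["Old_Band"],"nextSince":13}
```

This covers only the "what changed" half of the problem; mapping each changed document onto the different data models downstream is where the real difficulty lies.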

Another upcoming book, Big Data, takes a very different approach (no updates, only appends). I look forward to reading it.
