Wow, almost two months since I wrote
Day 2,
and more than one since the last post in this series… Time to bring
it to an end.
Today is less about Redis (indeed, it is hardly used at all), and more
about a concept: Polyglot Persistence, and about an implementation
that showcases the concept.
In fact, I spent most of my time browsing the documentation of
Node.js, the library/framework the authors used
to build the demo application.
Polyglot Persistence
Polyglot Persistence, the use of several kinds of storage systems in a
project, makes even more sense than Polyglot Programming (the
use of several languages in a project).
While languages are, by and large, equivalent in expressive power, and
mostly a matter of choice, culture, or comparative advantage (some
languages favour small teams, others large ones), storage systems are
sufficiently different that they are not interchangeable.
Once the idea of eventual consistency takes root, it is only a small
step to view the data as services available from a number of
sources, each optimised for its intended use (instead of a single,
default source that only partially meets the more specialised needs),
and with its own update cycles.
The problem, of course, is that it introduces several levels of
complexity: development, deployment, monitoring, and a dizzying range
of potential errors, failures, …
Polyglot Persistent Service
The implementation described in the book is small enough to fit in
fewer than 15 pages, yet rich enough to show what is possible.
The databases are (with the versions I used):
Redis 2.4.8
CouchDB 1.1.1
Neo4j Community 1.6.1
and the glue language is Node.js.
Redis
Redis is first used as the initial store for the data take-on. It
is then used to track the transfer of data between CouchDB and the
other databases, and finally to support auto-completion of band names.
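The autocompletion endpoint (/search in bands.js) asks Redis for the KEYS matching band-name: followed by the typed prefix and a *. The sketch below reproduces that lookup in plain JavaScript over an in-memory list of keys, so it runs without a server. The escapeGlob helper is my own addition, not part of the book's code: it assumes Redis glob patterns, where *, ? and [ ] are special and a backslash escapes them. Note also that KEYS walks the whole keyspace, which is fine for a demo but not for a busy server.

```javascript
// Hypothetical helpers mirroring the /search route of bands.js.
var PREFIX = 'band-name:';

// Escape the characters that Redis glob patterns treat specially,
// so that a typed prefix containing * or ? is matched literally.
function escapeGlob(text) {
  return text.replace(/[\\*?\[\]]/g, '\\$&');
}

// The pattern that would be passed to KEYS.
function searchPattern(term) {
  return PREFIX + escapeGlob(term) + '*';
}

// Local stand-in for KEYS: literal prefix match, then strip the prefix
// to get back the band names (as the route does with replace()).
function matchBandNames(keys, term) {
  return keys
    .filter(function (key) { return key.indexOf(PREFIX + term) === 0; })
    .map(function (key) { return key.substring(PREFIX.length); });
}

var keys = ['band-name:Nine Inch Nails', 'band-name:Nirvana', 'band-name:Blur'];
console.log(searchPattern('Ni'));                        // band-name:Ni*
console.log(JSON.stringify(matchBandNames(keys, 'Ni'))); // ["Nine Inch Nails","Nirvana"]
```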
CouchDB
CouchDB is intended as the System of Record (i.e. master database)
for the system. Data is meant to be loaded into CouchDB first, then
propagated to the other databases.
Besides that, it is not used much, and after the exercises, not used at all…
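The propagation out of CouchDB is driven by its changes feed: the watcher (watch_changes_continuous.js, whose code is not shown here) emits a change event per document, and the feeder only looks at data.doc. As a hedged sketch, assuming the watcher requests a continuous feed with include_docs=true (which would explain why data.doc is available), this is how such a request path could be built, and how the newline-delimited JSON stream could be split into change objects across arbitrary chunk boundaries. changesPath and createLineParser are hypothetical names.

```javascript
// Hypothetical helpers, not the book's watch_changes_continuous.js.

// Path of a continuous changes feed; feed, include_docs and since
// are standard CouchDB _changes parameters.
function changesPath(db, since) {
  return '/' + encodeURIComponent(db) + '/_changes?feed=continuous' +
    '&include_docs=true&since=' + encodeURIComponent(String(since || 0));
}

// The continuous feed sends one JSON object per line, plus blank
// heartbeat lines. HTTP chunks do not align with lines, so any
// incomplete trailing line is buffered until the next chunk.
function createLineParser(onChange) {
  var buffer = '';
  return function (chunk) {
    buffer += chunk;
    var lines = buffer.split('\n');
    buffer = lines.pop(); // the last piece may be an incomplete line
    lines.forEach(function (line) {
      if (line.trim() !== '') onChange(JSON.parse(line));
    });
  };
}

var parse = createLineParser(function (change) {
  console.log(change.seq, change.doc && change.doc.name);
});
parse('{"seq":1,"id":"Nirvana","doc":{"name":"Nir');
parse('vana"}}\n\n{"seq":2,"id":"Blur","doc":{"name":"Blur"}}\n');
// prints "1 Nirvana" then "2 Blur"
```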
Neo4j
Neo4j keeps a graph of bands, members, and instruments (or roles), and
their relationships.
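To make the graph model concrete, here is a sketch (plain JavaScript, no Neo4j involved) of the nodes and relationships that feedBandToNeo4j creates from one CouchDB band document: band, artist and role nodes (looked up in the bands, artists and roles indexes), member relationships from band to artist carrying the optional from and to dates, and plays relationships from artist to role. bandDocToGraph is a hypothetical name; the de-duplication by index, key and value imitates what lookupOrCreateNode achieves through the index and its Redis cache.

```javascript
// Hypothetical illustration of the graph built by feedBandToNeo4j.
function bandDocToGraph(band) {
  var nodes = {}, edges = [];

  // one node per index/key/value, as lookupOrCreateNode would ensure
  function node(index, key, value) {
    var id = index + ':' + key + ':' + value;
    if (!nodes[id]) {
      nodes[id] = { index: index };
      nodes[id][key] = value;
    }
    return id;
  }

  var bandId = node('bands', 'name', band.name);
  band.artists.forEach(function (artist) {
    var artistId = node('artists', 'name', artist.name);
    // membership dates become properties of the member relationship
    var props = {};
    if (artist.from) props.from = artist.from;
    if (artist.to) props.to = artist.to;
    edges.push({ start: bandId, end: artistId, type: 'member', data: props });
    artist.role.forEach(function (role) {
      edges.push({ start: artistId, end: node('roles', 'role', role), type: 'plays' });
    });
  });
  return { nodes: nodes, edges: edges };
}

var graph = bandDocToGraph({
  name: 'Nirvana',
  artists: [{ name: 'Kurt Cobain', role: ['Lead Vocals', 'Guitar'] }]
});
console.log(Object.keys(graph.nodes).length, graph.edges.length); // 4 3
```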
Node.js
Node.js is a framework/library for JavaScript based on the concept of
event-based programming (similar to, but perhaps more radical than,
Erlang). All I/O is done in continuation-passing style, which means
that whenever an I/O operation is initiated, one of the arguments is a
function to handle whatever the operation produces (or deal with the
errors).
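To make the style concrete, here is a minimal sketch contrasting a direct-style function with its continuation-passing equivalent, using Node's usual (error, result) callback convention. squareCPS and sumOfSquaresCPS are made-up names, and setImmediate merely stands in for real I/O. Chaining only two steps already requires nesting one callback inside the other, which is what makes scripts like populate_couch.js hard to modify.

```javascript
// Direct style: the result is returned to the caller.
function squareDirect(x) {
  return x * x;
}

// Continuation-passing style: the result is handed to a callback
// later, following the Node.js (error, result) convention.
function squareCPS(x, callback) {
  setImmediate(function () {
    if (typeof x !== 'number') {
      callback(new Error('not a number: ' + x));
    } else {
      callback(null, x * x);
    }
  });
}

// Sequencing two CPS calls: each step continues inside the previous
// callback, and errors must be passed along explicitly.
function sumOfSquaresCPS(a, b, callback) {
  squareCPS(a, function (err, a2) {
    if (err) return callback(err);
    squareCPS(b, function (err, b2) {
      if (err) return callback(err);
      callback(null, a2 + b2);
    });
  });
}

sumOfSquaresCPS(3, 4, function (err, result) {
  console.log(err ? err.message : result); // prints 25
});
```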
This is good from a performance point of view, but it is of course
more complex to design and code with. Still, it looks like a fun tool
to glue various servers together.
Book Code Fixes
I had to fix some of the code from the authors (nothing serious, and
all reported in the errata):
populate_couch.js: the trackLineCount function has an off-by-one
error. The check for completion should be totalBands <=
processedBands.
bands.js: the initialisation of membersQuery in the function
for the /band route has a syntax error. It should be
The book uses a now dated version of Neo4j, so the queries do not
work. The shortcut to access a node by index does not work anymore,
and the uniqueObject step has been replaced by dedup.
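For reference, this is how bands.js assembles the updated queries as strings: the index lookup is written g.idx("bands")[["name":...]], and duplicates are removed with dedup. In this sketch the band name is escaped before being spliced into the script; that escaping is my own addition (the book's code concatenates the raw name, so a name containing a double quote breaks the query), and it assumes Groovy double-quoted strings, where backslash, double quote and $ (interpolation) are special. groovyString and bandQueries are hypothetical names.

```javascript
// Hypothetical helper: quote a value as a Groovy double-quoted string literal.
function groovyString(value) {
  return '"' + String(value).replace(/[\\"$]/g, '\\$&') + '"';
}

// Same query shapes as the /band route in bands.js, with escaping added.
function bandQueries(bandName) {
  var prefix = 'g.idx("bands")[["name":' + groovyString(bandName) + ']]';
  return {
    // other bands sharing at least one member, without duplicates
    otherBands: prefix + '.out("member").in("member").dedup.name',
    // members whose membership has no end date
    currentMembers: prefix + '.outE("member").as("from")' +
      '.filter{it.to == null}.inV.as("name")' +
      '.table(new Table()){it.from}{it.name}.cap()'
  };
}

console.log(bandQueries('Nine Inch Nails').otherBands);
// g.idx("bands")[["name":"Nine Inch Nails"]].out("member").in("member").dedup.name
```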
I’m not sure what the second homework exercise was supposed to be
about: Neo4j already contains information about members and
memberships. Perhaps it dates from an early draft, before this
chapter’s code evolved into what it is now. In any case, the first
exercise involved plenty of Neo4j.
Adding Band Member’s start and end dates
The start and end dates of band memberships are sometimes
provided; the purpose of this exercise is to use this information.
Pre-populate
I load the start and end dates into their own key in Redis. The key
formats are from:bandName:artistName and to:bandName:artistName.
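The pre-population step can be read as a mapping from one TSV line to a list of Redis commands. This sketch reproduces that mapping as a pure function, using the same column positions as populateRedis (artist in column 2, band in 3, comma-separated roles in 4, from and to in 5 and 6). rowToRedisCommands is a hypothetical name, and the commands are returned as arrays instead of being sent to a Redis client; the sample row is made up.

```javascript
// Same role-splitting helper as in populateRedis.
function buildRoles(string) {
  var roles = string.split(',');
  if (roles.length === 1 && roles[0] === '') roles = [];
  return roles;
}

// Hypothetical pure version of the per-line logic of populateRedis.
function rowToRedisCommands(data) {
  var artist = data[2], band = data[3],
      roles = buildRoles(data[4] || ''),
      from = data[5], to = data[6],
      commands = [];
  if (!band || !artist) return commands; // such lines are skipped
  commands.push(['sadd', 'band:' + band, artist]);
  roles.forEach(function (role) {
    commands.push(['sadd', 'artist:' + band + ':' + artist, role]);
  });
  // membership dates get their own keys, only when present
  if (from) commands.push(['set', 'from:' + band + ':' + artist, from]);
  if (to) commands.push(['set', 'to:' + band + ':' + artist, to]);
  return commands;
}

var row = ['', '', 'An Artist', 'A Band', 'Vocals,Guitar', '1990', ''];
console.log(JSON.stringify(rowToRedisCommands(row)));
```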
Adding the information to CouchDB is not hard; the main difficulty is
to figure out how to modify the populate_couch.js script
(continuation-passing style is hard).
Eventually, I just reused the roleBatch (hence renamed
artistInfoBatch) to retrieve the roles as well as the from and to dates.
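What makes the reuse work is that EXEC returns one reply per queued command, in order. With three commands queued per artist (SMEMBERS for the roles, GET for from, GET for to), the flat reply can be consumed three entries at a time. Here is that logic isolated from Redis as a sketch; buildArtistInfoBatch and decodeArtistInfo are hypothetical names, and the reply array is simulated rather than coming from a server.

```javascript
// Three queued commands per artist: roles, start date, end date.
function buildArtistInfoBatch(bandName, artists) {
  var batch = [];
  artists.forEach(function (artistName) {
    var suffix = bandName + ':' + artistName;
    batch.push(['smembers', 'artist:' + suffix]);
    batch.push(['get', 'from:' + suffix]);
    batch.push(['get', 'to:' + suffix]);
  });
  return batch;
}

// EXEC returns one reply per queued command, in the same order,
// so the replies are consumed three at a time.
function decodeArtistInfo(artists, replies) {
  var i = 0;
  return artists.map(function (artistName) {
    var artist = { name: artistName, role: replies[i++] },
        from = replies[i++],
        to = replies[i++];
    if (from) artist.from = from; // GET replies null for a missing key
    if (to) artist.to = to;
    return artist;
  });
}

var artists = ['A', 'B'];
var replies = [['Bass'], '1990', null, [], null, '1995']; // simulated EXEC reply
console.log(JSON.stringify(decodeArtistInfo(artists, replies)));
// [{"name":"A","role":["Bass"],"from":"1990"},{"name":"B","role":[],"to":"1995"}]
```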
Neo4j was the hardest piece of the puzzle: I didn’t know, and could
not find any definitive documentation on, how to set relationship
properties at creation time. Eventually I found that adding them to
the data attribute passed at creation time did the trick (although
it still took me more time to understand how to use them).
The catch is that the neo4j_caching_client.js library
does not support adding properties to relationships, but it was easy
enough to modify this library to add this feature.
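The shape of the request is the part that matters. With the Neo4j 1.x REST API, a relationship is created by POSTing to the start node's relationships resource, with a JSON body holding to (the end node URL), type, and optionally data, the relationship properties; that is what the modified createRelationship sends. This sketch only builds the request instead of sending it; relationshipRequest is a hypothetical name, and the node URLs (on the default port 7474) are made up.

```javascript
// Hypothetical request builder mirroring the modified createRelationship.
function relationshipRequest(fromNodeUrl, toNodeUrl, type, props) {
  // keep only the path relative to the REST root, as the client does
  var fromPath = (fromNodeUrl || '').replace(/^.*?\/db\/data\//, '');
  var body = { to: toNodeUrl, type: type };
  if (props) body.data = props; // relationship properties
  return {
    method: 'POST',
    path: fromPath + '/relationships',
    body: JSON.stringify(body)
  };
}

var req = relationshipRequest(
  'http://localhost:7474/db/data/node/12',
  'http://localhost:7474/db/data/node/34',
  'member',
  { from: '1990' });
console.log(req.path); // node/12/relationships
console.log(req.body);
```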
To make use of the new data, I tried to differentiate between current
and old members of a band. I simply define a current member as one
whose to property is null.
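The rule itself fits in a few lines: a membership without a to date is current, any other is a past one. This sketch applies it on the application side to membership objects shaped like the rows produced by fromTableToObject (where a missing date is simply an absent property); partitionMembers is a hypothetical name. In bands.js, the same split is done inside Gremlin with filter{it.to == null} and filter{it.to != null}.

```javascript
// Hypothetical helper: split memberships into current and past ones.
// A membership is current when it has no end date ("to" null or absent).
function partitionMembers(members) {
  var result = { current: [], past: [] };
  members.forEach(function (m) {
    if (m.to == null) result.current.push(m); // == null also matches undefined
    else result.past.push(m);
  });
  return result;
}

var split = partitionMembers([
  { name: 'A', from: '1990' },
  { name: 'B', from: '1988', to: '1992' },
  { name: 'C', to: null }
]);
console.log(JSON.stringify(split.current.map(function (m) { return m.name; }))); // ["A","C"]
console.log(JSON.stringify(split.past.map(function (m) { return m.name; })));    // ["B"]
```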
Figuring out how to write a Gremlin query that extracted the information I
needed was challenging: the documentation is often sparse, and many
concepts are barely explained.
I found that I could collect nodes or relationships along a path by
naming them (with the step as), and then gather all of them in a
single row of a
Table.
I used this to get the from and to properties and the artist name
property in a single query. However, I spent some time tracking down a
bug in my filters where, apparently, members with a null to were not
returned as current members. I finally realised that when a given node
or relationship is given two different names, these names appear in
reverse order in the Table.
So in my case, the query:
g.idx("bands")[["name":"Nine Inch Nails"]].outE("member").as("from").as("to")
.filter{it.to != null}.inV.as("name")
.table(new Table()){it.to}{it.from}{it.name}.cap()
I give the relationship the names from and to, but use them in
reverse order in the Table closures. Is this the intended behaviour
or a bug? Does anybody know?
It seems like a common problem with some NoSQL databases: the query
language feels very much ad hoc, and not entirely sound or fully
thought through. Despite its many defects, SQL was at least based (if
sometimes remotely) on the relational calculus, which gave a precise
meaning to queries. It was further specified in different standards,
so that even its defects were fully clarified (XPath/XQuery is another
pretty well specified query language). When playing with NoSQL
databases that claim to have a query language, I often find it
difficult to go beyond the simpler examples, precisely because of this
linguistic fuzziness.
But I solved it for this case, so now I have my Table. It is an
object with two properties: columns is an array of column names, and
data is an array of arrays (each one being a row). To convert it
to an array of objects, I use the following code:
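Extracted from bands.js and laid out for readability (the only additions are comments), the conversion looks like this. The sample input is made up; its nesting (table[0][0]) mirrors how bands.js unwraps the Gremlin plugin's response before converting it.

```javascript
// Turn a Gremlin Table ({columns, data}) into an array of objects.
function fromTableToObject(columns, data) {
  var res = [];
  for (var i = 0; i < data.length; i++) {
    var obj = {};
    for (var j = 0; j < columns.length; j++) {
      // cells holding the string 'null' are skipped, so the property
      // is absent and a Mustache section such as {{#to}} is not rendered
      if (data[i][j] != 'null')
        obj[columns[j]] = data[i][j];
    }
    res.push(obj);
  }
  return res;
}

// bands.js unwraps the script result before converting it.
function convertGremlinTable(table) {
  return fromTableToObject(table[0][0].columns, table[0][0].data);
}

var sample = [[{
  columns: ['from', 'name'],
  data: [['1990', 'A'], ['null', 'B']]
}]];
console.log(JSON.stringify(convertGremlinTable(sample)));
// [{"from":"1990","name":"A"},{"name":"B"}]
```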
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material,
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose.
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/
var
  // The band data file name in tab-separated form
  tsvFileName = 'group_membership.tsv',
  // track how many file lines we've processed
  processedLines = 0,
  // standard libraries
  csv = require('csv'),
  redis = require('redis'),
  // database clients
  redis_client = redis.createClient(6379);

/**
 * A helper function that splits up the comma-separated list of roles and
 * converts it to an array. If no valid roles exist, return an empty array.
 * @param string the CSV to split into a role array
 */
function buildRoles(string) {
  var roles = string.split(',');
  if (roles.length === 1 && roles[0] === '') roles = [];
  return roles;
};

/**
 * Utility function that increments the total number
 * of lines (artists) processed and outputs every 1000.
 */
function trackLineCount() {
  if (++processedLines % 1000 === 0)
    console.log('Lines Processed: ' + processedLines);
}

/**
 * Does all heavy lifting. Loops through the CSV file
 * and populates Redis with the given values.
 */
function populateRedis() {
  csv().
  fromPath(tsvFileName, { delimiter: '\t', quote: '' }).
  on('data', function(data, index) {
    var
      artist = data[2],
      band = data[3],
      roles = buildRoles(data[4]),
      from = data[5],
      to = data[6];

    if (band === '' || artist === '') {
      trackLineCount();
      return true;
    }

    redis_client.sadd('band:' + band, artist);
    roles.forEach(function(role) {
      redis_client.sadd('artist:' + band + ':' + artist, role);
    });
    // membership start and end dates, stored only when present
    // (also skips lines where these columns are missing altogether)
    if (from) redis_client.set('from:' + band + ':' + artist, from);
    if (to) redis_client.set('to:' + band + ':' + artist, to);

    trackLineCount();
  }).
  on('end', function(total_lines) {
    console.log('Total Lines Processed: ' + processedLines);
    redis_client.quit();
  });
};

populateRedis();
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material,
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose.
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/
var
  // how many bands we expect to process
  totalBands = null,
  // and keep track of how many bands we have processed
  processedBands = 0,
  // The name of the couch database
  couchDBpath = '/bands',
  // standard libraries
  http = require('http'),
  redis = require('redis'),
  // database clients
  couchClient = http.createClient(5984, 'localhost'),
  redisClient = redis.createClient(6379);

/**
 * A helper function that builds a good CouchDB key
 * @param string the unicode string being keyified
 */
function couchKeyify(string) {
  // remove bad chars, and disallow starting with an underscore
  return string.
    replace(/[\t \?\#\\\-\+\.\,'"()*&!\/]+/g, '_').
    replace(/^_+/, '');
};

/*
 * Keep track of the number of bands processed, output every 1000 loaded,
 * and close the Redis client when we've loaded them all.
 */
function trackLineCount(increment) {
  processedBands += increment;

  // output once every 1000 lines
  if (processedBands % 1000 === 0)
    console.log('Bands Loaded: ' + processedBands);

  // close the Redis Client when complete
  if (totalBands <= processedBands) {
    console.log('Total Bands Loaded: ' + processedBands);
    redisClient.quit();
  }
};

/*
 * Post one or more documents into CouchDB.
 * @param url is where we POST to.
 * @param docString a stringified JSON document.
 * @param count the number of documents being inserted.
 */
function postDoc(url, docsString, count) {
  var request = couchClient.request(
    'POST',
    url,
    { 'Content-Type': 'application/json' });
  request.end(docsString);

  request.on('response', function(response) {
    if (response.statusCode == 201) trackLineCount(count);
  }).
  on('error', function(e) {
    console.log('postDoc Got error: ' + e.message);
  });
};

/*
 * Loop through all of the bands populated in Redis. We expect
 * the format of each key to be 'band:Band Name' having the value
 * as a set of artist names. The artists each have the list of roles
 * they play in each band, keyed by 'artist:Band Name:Artist Name'.
 * The band name, set of artists, and set of roles each artist plays
 * populates the CouchDB documents. eg:
 * { name:"Nirvana", artists:[{ name: "Kurt Cobain", roles:["Lead Vocals", "Guitar"] },...] }
 */
function populateBands() {
  // First, create the couch database
  couchClient.request('PUT', couchDBpath).end();

  redisClient.keys('band:*', function(error, bandKeys) {
    totalBands = bandKeys.length;
    var
      readBands = 0,
      bandsBatch = [];

    bandKeys.forEach(function(bandKey) {
      // substring of 'band:'.length gives us the band name
      var bandName = bandKey.substring(5);
      redisClient.smembers(bandKey, function(error, artists) {
        // batch the Redis calls to get all artists' information at once
        var artistInfoBatch = [];
        artists.forEach(function(artistName) {
          artistInfoBatch.push(['smembers', 'artist:' + bandName + ':' + artistName]);
          artistInfoBatch.push(['get', 'from:' + bandName + ':' + artistName]);
          artistInfoBatch.push(['get', 'to:' + bandName + ':' + artistName]);
        });

        // batch up each band member to find the roles they play
        redisClient.
          multi(artistInfoBatch).
          exec(function(err, artistInfo) {
            var
              i = 0,
              artistDocs = [];

            // build the artists sub-documents
            artists.forEach(function(artistName) {
              var
                artist = { name: artistName, role: artistInfo[i++] },
                from = artistInfo[i++],
                to = artistInfo[i++];
              if (from) artist['from'] = from;
              if (to) artist['to'] = to;
              artistDocs.push(artist);
            });

            // add this new band document to the batch to be executed later
            bandsBatch.push({
              _id: couchKeyify(bandName),
              name: bandName,
              artists: artistDocs
            });
            // keep track of the total number of bands read
            readBands++;

            // upload batches of 50 values to couch, or the remaining values left
            if (bandsBatch.length >= 50 || totalBands - readBands == 0) {
              postDoc(
                couchDBpath + '/_bulk_docs',
                JSON.stringify({ docs: bandsBatch }),
                bandsBatch.length);

              // empty out the batch array to be filled again
              bandsBatch = [];
            }
          });
      });
    });
  });
};

// expose couchKeyify function
exports.couchKeyify = couchKeyify;

// start populating bands if running as main script
if (!module.parent) {
  populateBands();
}
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material,
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose.
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/
var
  // standard libraries
  events = require('events'),
  esc = require('querystring').escape,
  redis = require('redis'),
  // custom libraries
  couch = require('./watch_changes_continuous.js'),
  neo4j = require('./neo4j_caching_client.js'),
  // database clients
  neo4jClient = neo4j.createClient({ limit: 10 }),
  couchWatcher = couch.createWatcher({ db: 'bands' }),
  redisClient = redis.createClient(6379);

// feed band information into redis for autocompleter
function feedBandToRedis(band) {
  redisClient.set('band-name:' + band.name, 1);
  band.artists.forEach(function(artist) {
    redisClient.set('artist-name:' + artist.name, 1);
    artist.role.forEach(function(role) {
      redisClient.set('role-name:' + role, 1);
    });
  });
}

/**
 * feed band membership and artist/role content from couch to neo4j.
 * @param band A band document from CouchDB.
 * @param progress EventEmitter to emit progress events.
 */
function feedBandToNeo4j(band, progress) {
  var
    lookup = neo4jClient.lookupOrCreateNode,
    relate = neo4jClient.createRelationship;

  lookup('bands', 'name', band.name, function(bandNode) {
    progress.emit('progress', 'band');
    band.artists.forEach(function(artist) {
      lookup('artists', 'name', artist.name, function(artistNode) {
        var props = {};
        progress.emit('progress', 'artist');
        if (artist.from) props.from = artist.from;
        if (artist.to) props.to = artist.to;
        relate(bandNode.self, artistNode.self, 'member', function() {
          progress.emit('progress', 'member');
        }, props);
        artist.role.forEach(function(role) {
          lookup('roles', 'role', role, function(roleNode) {
            progress.emit('progress', 'role');
            relate(artistNode.self, roleNode.self, 'plays', function() {
              progress.emit('progress', 'plays');
            });
          });
        });
      });
    });
  });
}

// process only interesting bands (ones with artists who have roles)
function processBand(band, progress) {
  // change this to true to process all bands
  var addBand = false;
  band.artists.forEach(function(artist) {
    if (artist.role.length)
      addBand = true;
  });
  if (addBand) {
    feedBandToRedis(band);
    feedBandToNeo4j(band, progress);
  }
}

// progress reporting measures (how much work has been done)
var
  stats = { doc: 0, band: 0, artist: 0, member: 0, role: 0, plays: 0 },
  progress = new events.EventEmitter(),
  timer = setInterval(function() { console.log(stats); }, 1000);
progress.on('progress', function(type) {
  stats[type] = (stats[type] || 0) + 1;
});

// start watching couch and processing bands as they come in
couchWatcher
  .on('change', function(data) {
    progress.emit('progress', 'doc');
    if (data.doc && data.doc.name)
      processBand(data.doc, progress);
  })
  .start();
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material,
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose.
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/
var
  http = require('http'),
  events = require('events'),
  esc = require('querystring').escape,
  redis = require('redis'),
  neo4j = require('./neo4j_driver.js');

exports.createClient = function(options) {
  options = options || {};

  var
    neo4jClient = neo4j.createClient(options),
    redisClient = redis.createClient(),
    pending = new events.EventEmitter();

  pending.setMaxListeners(0); // unlimited
  neo4jClient.expiry = options.expiry || 300; // default 5 min

  // Run a gremlin script against the server.
  neo4jClient.runGremlin = function(script, callback) {
    var path = ['ext/GremlinPlugin/graphdb/execute_script'];
    neo4jClient.post(path, { script: script }, callback);
  };

  // lookup a key/value node by index.
  neo4jClient.lookupNode = function(index, key, value, callback) {
    var path = ['index/node', esc(index), esc(key), esc(value)];
    neo4jClient.get(path, callback);
  };

  // create a key/value node and index it.
  neo4jClient.createNode = function(index, key, value, callback) {
    var input = {};
    input[key] = value;
    neo4jClient.post('node', input, function(obj) {
      var data = { uri: obj.self, key: key, value: value };
      neo4jClient.post(['index/node', esc(index)], data, callback);
    });
  }

  // lookup a node or create/index and cache it
  neo4jClient.lookupOrCreateNode = function(index, key, value, callback) {
    var
      cacheKey = 'lookup:' + index + ':' + key + ':' + value,
      ex = neo4jClient.expiry;

    // only one pending lookup for a given index/key/value allowed at a time
    if (!pending.listeners(cacheKey).length) {
      // check redis first
      redisClient.get(cacheKey, function(err, text) {
        if (!err && text) {
          // found in redis cache, use it and refresh
          pending.emit(cacheKey, JSON.parse(text));
          redisClient.expire(cacheKey, ex);
        } else {
          // missed redis cache, lookup in neo4j index
          neo4jClient.lookupNode(index, key, value, function(list, res) {
            if (list && list.length) {
              // found in index, use it and cache
              pending.emit(cacheKey, list[0]);
              redisClient.setex(cacheKey, ex, JSON.stringify(list[0]));
            } else {
              // missed index, create it and cache it
              neo4jClient.createNode(index, key, value, function(obj) {
                pending.emit(cacheKey, obj);
                redisClient.setex(cacheKey, ex, JSON.stringify(obj));
              });
            }
          });
        }
      });
    }
    pending.once(cacheKey, callback);
  }

  // create a relationship between two nodes
  neo4jClient.createRelationship = function(fromNode, toNode, type, callback, props) {
    var
      fromPath = (fromNode || '').replace(/^.*?\/db\/data\//, ''),
      rel = { to: toNode, type: type };
    if (props) rel.data = props;
    neo4jClient.post([fromPath, 'relationships'], rel, callback);
  }

  return neo4jClient;
}
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material,
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose.
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/
var
  port = 8080,
  jsonHeader = { 'Content-Type': 'application/json' },
  // standard libraries
  http = require('http'),
  redis = require('redis'),
  bricks = require('bricks'),
  mustache = require('mustache'),
  fs = require('fs'),
  // custom libraries
  couchUtil = require('./populate_couch.js'),
  neo4j = require('./neo4j_caching_client.js'),
  // database clients
  couchClient = http.createClient(5984, 'localhost'),
  neo4jClient = neo4j.createClient(),
  redisClient = redis.createClient(6379);

var gremlin = neo4jClient.runGremlin;

/**
 * A convenience function for wrapping the
 * reading of JSON response data chunks.
 * @param response A Node HTTP response object.
 * @param callback the function to populate and call on completion.
 */
function processBuffer(response, callback) {
  var buffer = '';
  response.on('data', function(chunk) {
    buffer += chunk;
  });
  response.on('end', function() {
    if (buffer === '') buffer = 'null';
    callback(JSON.parse(buffer));
  });
};

/*
 * Get a document from CouchDB and pass it to the callback,
 * or render "Value not found" if it does not exist.
 * @param path the path of the document to GET.
 * @param httpResponse the response used to render the error page.
 * @param callback called with the parsed document.
 */
function getCouchDoc(path, httpResponse, callback) {
  var request = couchClient.request('GET', path, jsonHeader);
  request.end();
  request.on('response', function(response) {
    if (response.statusCode != 200) {
      writeTemplate(httpResponse, '', { message: "Value not found" });
    } else {
      processBuffer(response, function(couchObj) {
        callback(couchObj);
      });
    }
  }).
  on('error', function(e) {
    console.log('getCouchDoc Got error: ' + e.message);
  });
};

/**
 * Wraps a block of HTML with a standard template. HTML lives in template.html.
 * @innerHtml populates the body of the template
 */
function htmlTemplate(innerHtml) {
  var file_data = fs.readFileSync('template.html', 'utf8');
  return file_data.replace("[[YIELD]]", innerHtml);
};

function writeTemplate(response, innerHtml, values) {
  response.write(mustache.to_html(htmlTemplate(innerHtml), values));
  response.end();
}

function convertGremlinTable(table) {
  return fromTableToObject(table[0][0].columns, table[0][0].data);
}

function fromTableToObject(columns, data) {
  var res = [];
  for (var i = 0; i < data.length; i++) {
    var obj = {};
    for (var j = 0; j < columns.length; j++) {
      if (data[i][j] != 'null')
        obj[columns[j]] = data[i][j];
    }
    res.push(obj);
  }
  return res;
}

// A Nodejs web app utility setup
var appServer = new bricks.appserver();

// attach request plugin to easily extract params
appServer.addRoute("^/", appServer.plugins.request);

/*
 * Just display a blank form if no band is given.
 */
appServer.addRoute("^/$", function(req, res) {
  writeTemplate(res, '', { message: "Find a band" });
});

/*
 * Accepts a band name and displays all artists in the band.
 * Also displays a list of suggested bands where at least
 * one artist has played at one time.
 */
/*
 * Gremlin: aliases introduced for the same node and/or
 * relationship appear to be used in reverse order in
 * Tables.
 */
appServer.addRoute("^/band$", function(req, res) {
  var
    bandName = req.param('name'),
    bandNodePath = '/bands/' + couchUtil.couchKeyify(bandName),
    queryPrefix = 'g.idx("bands")[["name":"' + bandName + '"]]',
    otherBandsQuery = queryPrefix + '.out("member").in("member").dedup.name',
    currentMembersQuery = queryPrefix + '.outE("member").as("from")' +
      '.filter{it.to == null}.inV.as("name")' +
      '.table(new Table()){it.from}{it.name}.cap()',
    otherMembersQuery = queryPrefix + '.outE("member").as("from").as("to")' +
      '.filter{it.to != null}.inV.as("name")' +
      '.table(new Table())' +
      '{it.to}{it.from}{it.name}.cap()';

  gremlin(otherBandsQuery, function(graphData) {
    gremlin(currentMembersQuery, function(currentMembers) {
      gremlin(otherMembersQuery, function(otherMembers) {
        var values = {
          band: bandName,
          bands: graphData,
          currents: convertGremlinTable(currentMembers),
          others: convertGremlinTable(otherMembers)
        };
        var body = '<h2>Current {{band}} Band Members</h2>';
        if (currentMembers[0][0].data.length > 0) {
          body += '<ul>{{#currents}}';
          body += '<li><a href="/artist?name={{name}}">{{name}}';
          body += '{{#from}} from {{from}}{{/from}}</a></li>';
          body += '{{/currents}}</ul>';
        } else {
          body += "<p>No current member (dead band?)</p>";
        }
        if (otherMembers[0][0].data.length > 0) {
          body += '<h3>Other members</h3>';
          body += '<ul>{{#others}}';
          body += '<li><a href="/artist?name={{name}}">{{name}}';
          body += '{{#from}} from {{from}}{{/from}}';
          body += '{{#to}} to {{to}}{{/to}}</a></li>';
          body += '{{/others}}</ul>';
        }
        body += '<h3>You may also like</h3>';
        body += '<ul>{{#bands}}';
        body += '<li><a href="/band?name={{.}}">{{.}}</a></li>';
        body += '{{/bands}}</ul>';
        writeTemplate(res, body, values);
      });
    });
  });
});

/*
 * Accepts an artist name and displays band and role information
 */
appServer.addRoute("^/artist$", function(req, res) {
  var
    artistName = req.param('name'),
    queryPrefix = 'g.idx("artists")[["name":"' + artistName + '"]]',
    rolesQuery = queryPrefix + '.out("plays").role.dedup',
    bandsQuery = queryPrefix +
      '.inE("member").as("to").as("from").outV.as("name")' +
      '.table(new Table()){it.from}{it.to}{it.name}.cap()';

  gremlin(rolesQuery, function(roles) {
    gremlin(bandsQuery, function(bands) {
      var values = { artist: artistName, roles: roles, bands: convertGremlinTable(bands) };
      var body = '<h3>{{artist}} Performs these Roles</h3>';
      body += '<ul>{{#roles}}';
      body += '<li>{{.}}</li>';
      body += '{{/roles}}</ul>';
      body += '<h3>Play in Bands</h3>';
      body += '<ul>{{#bands}}';
      body += '<li><a href="/band?name={{name}}">{{name}}';
      body += '{{#from}} from {{from}}{{/from}}';
      body += '{{#to}} to {{to}}{{/to}}</a></li>';
      body += '{{/bands}}</ul>';
      writeTemplate(res, body, values);
    });
  });
});

/*
 * A band name search. Used for autocompletion.
 */
appServer.addRoute("^/search$", function(req, res) {
  var query = req.param('term');

  redisClient.keys("band-name:" + query + "*", function(error, keys) {
    var bands = [];
    keys.forEach(function(key) {
      bands.push(key.replace("band-name:", ''));
    });
    res.write(JSON.stringify(bands));
    res.end();
  });
});

// catch all unknown routes with a 404
appServer.addRoute(".+", appServer.plugins.fourohfour);
appServer.addRoute(".+", appServer.plugins.loghandler, { section: "final" });

// start up the server
console.log("Starting Server on port " + port);
appServer.createServer().listen(port);
Add Music Samples
The book (in beta 5.0) suggested using Riak’s Luwak, but this
component has recently been removed, and there seems to be no
replacement at this time. So I went with MongoDB’s
GridFS instead. This is
a little more complex than a simple replacement of the client
libraries: MongoDB does not have an HTTP ReST API for GridFS, so I
need to stream the content of the file through the server.
Overview
To keep things simple, I load only one sample per band; the file name
must be the same as the CouchDB key, followed by ‘.mp3’.
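Since the sample's file name derives from the CouchDB key, the naming rule is just couchKeyify (copied verbatim from populate_couch.js) plus the extension. sampleFileName is a hypothetical helper name used for illustration.

```javascript
// couchKeyify, as in populate_couch.js: runs of awkward characters
// become a single underscore, and leading underscores are removed.
function couchKeyify(string) {
  return string
    .replace(/[\t \?\#\\\-\+\.\,'"()*&!\/]+/g, '_')
    .replace(/^_+/, '');
}

// Hypothetical helper: the GridFS file name expected for a band's sample.
function sampleFileName(bandName) {
  return couchKeyify(bandName) + '.mp3';
}

console.log(sampleFileName('Nine Inch Nails')); // Nine_Inch_Nails.mp3
console.log(sampleFileName("Guns N' Roses"));   // Guns_N_Roses.mp3
```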
To access MongoDB from Node.js, I use
node-mongodb-native,
which can be installed with npm. It has all the expected features of
a client, including GridFS support (with one caveat, see below).
To stream the file from the server, I use a dedicated port, for no
better reason than that
Brick.js, which the authors used to
build the service, was giving me trouble, while the standard http
module did not.
When displaying the band information, I check whether a file exists
with the same name as the band’s key: if it does, I add a link to the
dedicated streaming port, passing the key as a parameter:
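In bands.js the link comes from a Mustache template ('?band={{bandK}}'), which HTML-escapes the key but does not URL-encode it. Keys produced by couchKeyify can still contain characters such as %, = or & (they are not in its replacement set), so a safer variant, sketched here under that assumption, encodes the key with encodeURIComponent; the streaming server's url.parse(request.url, true) decodes it back. sampleLink is a hypothetical name.

```javascript
// Hypothetical helper: link to the dedicated streaming port for a band key.
function sampleLink(host, port, bandKey) {
  return 'http://' + host + ':' + port + '/?band=' + encodeURIComponent(bandKey);
}

console.log(sampleLink('localhost', 8089, 'Nine_Inch_Nails'));
// http://localhost:8089/?band=Nine_Inch_Nails
console.log(sampleLink('localhost', 8089, 'a=b%'));
// http://localhost:8089/?band=a%3Db%25
```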
Then, I create a new http server to send the music:
Streaming Files from GridFS
http.createServer(function(request, response) {
  var band = url.parse(request.url, true).query.band;
  mongoClient.open(function(err, db) {
    var gs = new mongodb.GridStore(db, band + '.mp3', "r");
    gs.open(function(err, gs) {
      if (err) {
        // no such sample: answer 404 instead of an empty 200
        response.writeHead(404);
        response.end();
        db.close();
        return;
      }
      console.log("streaming...");
      response.writeHead(200, {
        'Content-Type': 'audio/mpeg',
        // magic headers to stream mp3...
        'X-Pad': 'avoid browser bug',
        'Cache-Control': 'no-cache',
        'Content-Length': gs.length
      });
      // cannot use gridstore streams; somehow file always
      // truncated - load in memory instead
      // gs.stream(true).pipe(response);
      gs.read(gs.length, function(err, data) {
        response.write(data);
        response.end();
        db.close();
      });
    });
  });
}).listen(streamPort);
The only problem I had (but it took me a while to figure it out) was
that the stream support in the MongoDB client for GridFS content is
(as far as I can tell) defective: it will close the stream after just
one or two chunks’ worth of data
(issue on GitHub).
So instead I have to load the whole file into memory, then write it to
the response… Clearly not the best approach, but hey, it works!
/***
 * Excerpted from "Seven Databases in Seven Weeks",
 * published by The Pragmatic Bookshelf.
 * Copyrights apply to this code. It may not be used to create training material,
 * courses, books, articles, and the like. Contact us if you are in doubt.
 * We make no guarantees that this code is fit for any purpose.
 * Visit http://www.pragmaticprogrammer.com/titles/rwdata for more book information.
***/
var
  port = 8080,
  streamPort = 8089,
  host = 'localhost',
  jsonHeader = { 'Content-Type': 'application/json' },
  // standard libraries
  http = require('http'),
  redis = require('redis'),
  bricks = require('bricks'),
  mustache = require('mustache'),
  fs = require('fs'),
  url = require('url'),
  // mongodb
  mongodb = require('mongodb'),
  // custom libraries
  couchUtil = require('./populate_couch.js'),
  neo4j = require('./neo4j_caching_client.js'),
  // database clients
  couchClient = http.createClient(5984, 'localhost'),
  neo4jClient = neo4j.createClient(),
  redisClient = redis.createClient(6379),
  mongoClient = new mongodb.Db('music',
    new mongodb.Server('localhost', mongodb.Connection.DEFAULT_PORT, {}),
    { native_parser: true });

var gremlin = neo4jClient.runGremlin;

/**
 * A convenience function for wrapping the
 * reading of JSON response data chunks.
 * @param response A Node HTTP response object.
 * @param callback the function to populate and call on completion.
 */
function processBuffer(response, callback) {
  var buffer = '';
  response.on('data', function(chunk) {
    buffer += chunk;
  });
  response.on('end', function() {
    if (buffer === '') buffer = 'null';
    callback(JSON.parse(buffer));
  });
};

/*
 * Get a document from CouchDB and pass it to the callback,
 * or render "Value not found" if it does not exist.
 * @param path the path of the document to GET.
 * @param httpResponse the response used to render the error page.
 * @param callback called with the parsed document.
 */
function getCouchDoc(path, httpResponse, callback) {
  var request = couchClient.request('GET', path, jsonHeader);
  request.end();
  request.on('response', function(response) {
    if (response.statusCode != 200) {
      writeTemplate(httpResponse, '', { message: "Value not found" });
    } else {
      processBuffer(response, function(couchObj) {
        callback(couchObj);
      });
    }
  }).
  on('error', function(e) {
    console.log('getCouchDoc Got error: ' + e.message);
  });
};

/**
 * Wraps a block of HTML with a standard template. HTML lives in template.html.
 * @innerHtml populates the body of the template
 */
function htmlTemplate(innerHtml) {
  var file_data = fs.readFileSync('template.html', 'utf8');
  return file_data.replace("[[YIELD]]", innerHtml);
};

function writeTemplate(response, innerHtml, values) {
  response.write(mustache.to_html(htmlTemplate(innerHtml), values));
  response.end();
}

function convertGremlinTable(table) {
  return fromTableToObject(table[0][0].columns, table[0][0].data);
}

function fromTableToObject(columns, data) {
  var res = [];
  for (var i = 0; i < data.length; i++) {
    var obj = {};
    for (var j = 0; j < columns.length; j++) {
      if (data[i][j] != 'null')
        obj[columns[j]] = data[i][j];
    }
    res.push(obj);
  }
  return res;
}

// A Nodejs web app utility setup
var appServer = new bricks.appserver();

// attach request plugin to easily extract params
appServer.addRoute("^/", appServer.plugins.request);

/*
 * Just display a blank form if no band is given.
 */
appServer.addRoute("^/$", function(req, res) {
  writeTemplate(res, '', { message: "Find a band" });
});

/*
 * Accepts a band name and displays all artists in the band.
 * Also displays a list of suggested bands where at least
 * one artist has played at one time.
 */
/*
 * Gremlin: aliases introduced for the same node and/or
 * relationship appear to be used in reverse order in
 * Tables.
 */
appServer.addRoute("^/band$", function(req, res) {
  var
    bandName = req.param('name'),
    bandKey = couchUtil.couchKeyify(bandName),
    bandNodePath = '/bands/' + bandKey,
    queryPrefix = 'g.idx("bands")[["name":"' + bandName + '"]]',
    otherBandsQuery = queryPrefix + '.out("member").in("member").dedup.name',
    currentMembersQuery = queryPrefix + '.outE("member").as("from")' +
      '.filter{it.to == null}.inV.as("name")' +
      '.table(new Table()){it.from}{it.name}.cap()',
    otherMembersQuery = queryPrefix + '.outE("member").as("from").as("to")' +
      '.filter{it.to != null}.inV.as("name")' +
      '.table(new Table())' +
      '{it.to}{it.from}{it.name}.cap()';

  gremlin(otherBandsQuery, function(graphData) {
    gremlin(currentMembersQuery, function(currentMembers) {
      gremlin(otherMembersQuery, function(otherMembers) {
        var values = {
          band: bandName,
          bands: graphData,
          currents: convertGremlinTable(currentMembers),
          others: convertGremlinTable(otherMembers),
          bandK: bandKey
        };
        var body = '<h2>Current {{band}} Band Members</h2>';
        if (currentMembers[0][0].data.length > 0) {
          body += '<ul>{{#currents}}';
          body += '<li><a href="/artist?name={{name}}">{{name}}';
          body += '{{#from}} from {{from}}{{/from}}</a></li>';
          body += '{{/currents}}</ul>';
        } else {
          body += "<p>No current member (dead band?)</p>";
        }
        mongoClient.open(function(err, db) {
          // link to the sample only if a matching file exists in GridFS
          mongodb.GridStore.exist(db, bandKey + '.mp3', function(err, exist) {
            if (exist)
              body += '<a href="http://' + host + ':' + streamPort +
                '?band={{bandK}}">Sample</a>';
            if (otherMembers[0][0].data.length > 0) {
              body += '<h3>Other members</h3>';
              body += '<ul>{{#others}}';
              body += '<li><a href="/artist?name={{name}}">{{name}}';
              body += '{{#from}} from {{from}}{{/from}}';
              body += '{{#to}} to {{to}}{{/to}}</a></li>';
              body += '{{/others}}</ul>';
            }
            body += '<h3>You may also like</h3>';
            body += '<ul>{{#bands}}';
            body += '<li><a href="/band?name={{.}}">{{.}}</a></li>';
            body += '{{/bands}}</ul>';
            writeTemplate(res, body, values);
            db.close();
          });
        });
      });
    });
  });
});

/*
 * Accepts an artist name and displays band and role information
 */
appServer.addRoute("^/artist$", function(req, res) {
  var
    artistName = req.param('name'),
    queryPrefix = 'g.idx("artists")[["name":"' + artistName + '"]]',
    rolesQuery = queryPrefix + '.out("plays").role.dedup',
    bandsQuery = queryPrefix +
      '.inE("member").as("to").as("from").outV.as("name")' +
      '.table(new Table()){it.from}{it.to}{it.name}.cap()';

  gremlin(rolesQuery, function(roles) {
    gremlin(bandsQuery, function(bands) {
      var values = { artist: artistName, roles: roles, bands: convertGremlinTable(bands) };
      var body = '<h3>{{artist}} Performs these Roles</h3>';
      body += '<ul>{{#roles}}';
      body += '<li>{{.}}</li>';
      body += '{{/roles}}</ul>';
      body += '<h3>Play in Bands</h3>';
      body += '<ul>{{#bands}}';
      body += '<li><a href="/band?name={{name}}">{{name}}';
      body += '{{#from}} from {{from}}{{/from}}';
      body += '{{#to}} to {{to}}{{/to}}</a></li>';
      body += '{{/bands}}</ul>';
      writeTemplate(res, body, values);
    });
  });
});

/*
 * A band name search. Used for autocompletion.
 */
appServer.addRoute("^/search$", function(req, res) {
  var query = req.param('term');

  redisClient.keys("band-name:" + query + "*", function(error, keys) {
    var bands = [];
    keys.forEach(function(key) {
      bands.push(key.replace("band-name:", ''));
    });
    res.write(JSON.stringify(bands));
    res.end();
  });
});

// cannot seem to get bricks to stream data back
// use simple default http server
http.createServer(function(request, response) {
  var band = url.parse(request.url, true).query.band;
  mongoClient.open(function(err, db) {
    var gs = new mongodb.GridStore(db, band + '.mp3', "r");
    gs.open(function(err, gs) {
      if (err) {
        // no such sample: answer 404 instead of an empty 200
        response.writeHead(404);
        response.end();
        db.close();
        return;
      }
      console.log("streaming...");
      response.writeHead(200, {
        'Content-Type': 'audio/mpeg',
        // magic headers to stream mp3...
        'X-Pad': 'avoid browser bug',
        'Cache-Control': 'no-cache',
        'Content-Length': gs.length
      });
      // cannot use gridstore streams; somehow file always
      // truncated - load in memory instead
      // gs.stream(true).pipe(response);
      gs.read(gs.length, function(err, data) {
        response.write(data);
        response.end();
        db.close();
      });
    });
  });
}).listen(streamPort);

// catch all unknown routes with a 404
appServer.addRoute(".+", appServer.plugins.fourohfour);
appServer.addRoute(".+", appServer.plugins.loghandler, { section: "final" });

// start up the server
console.log("Starting Server on port " + port);
appServer.createServer().listen(port);
Wrapping Up
Well, that was a long day. I should have enjoyed it, but the lack of
maturity in some of the tools (Neo4j’s always evolving query language
and the GridFS streaming bug) caused hours of frustration. The main
cause, however, was missing knowledge: faced with an unexpected
behaviour, I had no idea whether it was a bug (find a workaround) or
an incorrect invocation (rework the query to correct it).
The exposition of polyglot persistence through the music information
service was pretty good, given the space constraints. Of course it
skipped the really ugly and tedious parts (how to incrementally keep
the databases in sync when the main records are updated, not merely
created); given the variation in data models, data manipulation (or
lack thereof) and querying across the different databases, this can
easily become a nightmare (especially if incremental updates are not
part of the initial design).
Another upcoming book, Big Data, takes
a very different approach (no updates, only appends). I look forward
to reading it.