PIGATO - an high-performance microservices framework based on ZeroMQ
PIGATO - an high-performance Node.js microservices framework based on ZeroMQ
PIGATO aims to offer an high-performance, reliable, scalable and extensible service-oriented framework supporting multiple programming languages: Node.js/Io.js and Ruby.
Supported Programming Languages
Start a Broker
node examples/broker
1) echo : simple echo request-reply
node examples/echo/worker
node examples/echo/client
2) stocks : get stocks data from yahoo
node examples/stocks/worker
node examples/stocks/client
More examples
PIGATO-EXAMPLES : a collection of multi-purpose useful examples.
PIGATO-PERF : a command-line tool to test PIGATO performances in different scenarios. i
PIGATO.Broker(addr, conf)
addr
- Broker address (string, i.e: 'tcp://*:12345') conf
- configuration override (type=object, i.e { concurrency: 20 })
onStart
: function to be called when the Broker start onStop
: function to be called when the Broker stopSimply starts up a broker.
var Broker = require('./../index').Broker;
var broker = new Broker("tcp://*:55555");
broker.start(function() {
console.log("Broker started");
});
start
: on Client startstop
: on Client stopPIGATO.Worker(addr, serviceName, conf)
addr
- Broker address (type=string, i.e: 'tcp://localhost:12345') serviceName
- service implemented by the Worker (type=string, i.e: 'echo')
conf
- configuration override (type=object, i.e { concurrency: 20 })
concurrency
- sets max number of concurrent requests (type=int, -1 = no limit)onConnect
: function to be called when the Worker connects to the BrokeronDisconnnect
: function to be called when the Worker disconnects from the Brokeron
Worker receives request
events with 3 arguments:
data
- data sent from the Client (type=string/object/array).reply
- extended writable stream (type=object)opts
- client request options (type=object)reply
writable stream exposes also following methods and attributes:
write()
- sends partial data to the Client end()
- sends last data to the Client and completes/closes current Requestreject()
- rejects a Request.heartbeat()
- forces sending heartbeat to the Brokeractive()
- returns the status of the Request (type=boolean). A Request becomes inactive when the Worker disconnects from the Broker or it has been discarded by the Client or the Client disconnects from the Broker. This is useful for long running tasks so the Worker can monitor whether or not continue processing a Request.ended
- tells if the Request has been ended (type=boolean).Example
var worker = new PIGATO.Worker('tcp://localhost:12345', 'my-service');
worker.start();
worker.on('request', function(data, reply, copts) {
for (var i = 0; i < 1000; i++) {
reply.write('PARTIAL DATA ' + i);
}
reply.end('FINAL DATA');
});
// or
worker.on('request', function(data, reply, copts) {
fs.createReadStream(data).pipe(reply);
});
Worker may also specify whether the reply should be cached and the cache timeout in milliseconds
Example
worker.on('request', function(data, reply) {
reply.opts.cache = 1000; // cache reply for 1 second
reply.end('FINAL DATA');
});
Worker can change concurrency level updating its configuration. This information is carried with the heartbeat message.
Example
worker.conf.concurrency = 2;
Take note: due to the framing protocol of zmq
only the data supplied to response.end(data)
will be given to the client's final callback.
start
: on Worker startstop
: on Worker stopconnect
: on Worker connectdisconnect
: on Worker disconnectPIGATO.Client(addr, conf)
addr
- Broker address (type=string, i.e: 'tcp://localhost:12345') conf
autostart
: automatically starts the Client (type=boolean, default=false)onConnect
: function to be called when the Client connects to the BrokeronDisconnnect
: function to be called when the Client disconnects from the Brokerstart
Start the Client
request
Send a Request
serviceName
- name of the Service we wish to connect to (type=string)data
- data to give to the Service (type=string/object/buffer)opts
- options for the Request (type=object)
timeout
: timeout in milliseconds (type=number, default=60000, -1 for infinite timeout)retry
: if a Worker dies before replying, the Request is automatically requeued. (type=number, values=0|1, default=0)nocache
: skip Broker's cacheworkerId
: ID of the Worker that must handle the Request (type=string)Example
var client = new PIGATO.Client('tcp://localhost:12345');
client.start()
client.request('my-service', { foo: 'bar' }, { timeout: 120000 })
.on('data', function(data) {
console.log("DATA", data);
})
.on('end', function() {
console.log("END");
});
// or
client.request('my-service', 'foo', { timeout: 120000 }).pipe(process.stdout);
Clients may also make request with partial and final callbacks instead of using streams.
serviceName
data
partialCallback(err, data)
- called whenever the request does not end but emits datafinalCallback(err, data)
- called when the request will emit no more dataopts
Example
client.request('my-service', 'foo', function (err, data) {
// frames sent prior to final frame
console.log('PARTIAL', data);
}, function (err, data) {
// this is the final frame sent
console.log('FINAL', data);
}, { timeout: 30000 });
start
: on Client startstop
: on Client stopconnect
: on Client connectdisconnect
: on Client disconnectCore services are a set of Services that interact with a Broker via a dedicated PUB/SUB channel to extend its core functionalities.
var broker = new PIGATO.Broker(bhost);
var csrv = new PIGATO.services.ExampleCoreService(bhost, {
intch: broker.conf.intch // internal pub/sub channel
});
broker.start();
csrv.start();
PIGATO.services.Directory
Directory service ($dir) replies to Requests with the list of available Workers for a selected service.
Example
// Broadcast a message to all Workers that offer 'echo' Service
client.request(
'$dir', 'echo', undefined,
function(err, workers) {
workers.forEach(function(wid) {
client.request('echo', 'foo', { workerId: wid });
});
}
);
inproc
socket the broker must become active before any queued messages.