by R.I. Pienaar | Jan 1, 2013 | Code
Most Nagios systems do a lot of forking, especially those built around something like NRPE where each check is a connection to be made to a remote system. On one hand I like NRPE in that it puts the check logic on the nodes using a standard plugin format and provides a fairly re-usable configuration file, but on the other hand the fact that the Nagios machine has to do all this forking has never sat well with me.
In the past I’ve shown one way to scale checks by aggregating all results for a specific check into one result, but this is not always a good fit as pointed out in that post. I’ve now built a system that uses the same underlying MCollective infrastructure as in the previous post but without the aggregation.
I have a pair of Nagios nodes – one in the UK and one in France – and they are on quite low spec VMs doing around 400 checks each. The problems I have are:
- The machines are constantly loaded from all the forking; one would sit at a load average of 1.5 almost all the time
- They use a lot of RAM and it’s quite spikey; when something is wrong especially, I’d have a lot of checks running concurrently, so the machines have to be bigger than I want them
- The check frequency is quite low in the usual Nagios manner; sometimes 10 minutes can go by without a check
- The check results do not represent a point in time; I have no idea how the check results of node1 relate to those of node2 as they can be taken anywhere in the last 10 minutes
These are standard Nagios complaints – there are many more – but these specifically are what I wanted to address right now with the system I am showing here.
Probably not a surprise, but the solution is built on MCollective. It uses the existing MCollective NRPE agent and the existing queueing infrastructure to push the forking to each individual node – they would do this anyway for every NRPE check – then reads the results off a queue and spools them into the Nagios command file as passive results. Internally it splits the traditional MCollective request-response system into an async processing system using the technique I blogged about before.
The system is made up of a few components:
- The Scheduler takes care of publishing requests for checks
- MCollective and the middleware provides AAA and transport
- The nodes all run the MCollective NRPE agent, which puts their replies on the Queue
- The Receiver reads the results from the Queue and writes them to the Nagios command file
The Scheduler
The scheduler daemon is written using the excellent Rufus Scheduler gem – if you do not know it you should definitely check it out, it solves many, many problems. Rufus allows me to create simple checks on intervals like 60s, and I combine these checks with MCollective filters to create a simple check configuration as below:
nrpe 'check_bacula_main', '6h', 'bacula::node monitored_by=monitor1'
nrpe 'check_disks', '60s', 'monitored_by=monitor1'
nrpe 'check_greylistd', '60s', 'greylistd monitored_by=monitor1'
nrpe 'check_load', '60s', 'monitored_by=monitor1'
nrpe 'check_mailq', '60s', 'monitored_by=monitor1'
nrpe 'check_mongodb', '60s', 'mongodb monitored_by=monitor1'
nrpe 'check_mysql', '60s', 'mysql::server monitored_by=monitor1'
nrpe 'check_pki', '60m', 'monitored_by=monitor1'
nrpe 'check_swap', '60s', 'monitored_by=monitor1'
nrpe 'check_totalprocs', '60s', 'monitored_by=monitor1'
nrpe 'check_zombieprocs', '60s', 'monitored_by=monitor1'
Taking the first line, it says: run the check_bacula_main NRPE check every 6 hours on machines with the bacula::node Puppet class and the fact monitored_by=monitor1. I already had the monitored_by fact to assist in building my Nagios configs using a simple search based approach in Puppet.
When the scheduler starts it will log:
W, [2012-12-31T22:10:12.186789 #32043] WARN -- : activemq.rb:96:in `on_connecting' TCP Connection attempt 0 to stomp://nagios@stomp.example.net:6163
W, [2012-12-31T22:10:12.193405 #32043] WARN -- : activemq.rb:101:in `on_connected' Conncted to stomp://nagios@stomp.example.net:6163
I, [2012-12-31T22:10:12.196387 #32043] INFO -- : scheduler.rb:23:in `nrpe' Adding a job for check_bacula_main every 6h matching 'bacula::node monitored_by=monitor1', first in 19709s
I, [2012-12-31T22:10:12.196632 #32043] INFO -- : scheduler.rb:23:in `nrpe' Adding a job for check_disks every 60s matching 'monitored_by=monitor1', first in 57s
I, [2012-12-31T22:10:12.197173 #32043] INFO -- : scheduler.rb:23:in `nrpe' Adding a job for check_load every 60s matching 'monitored_by=monitor1', first in 23s
I, [2012-12-31T22:10:35.326301 #32043] INFO -- : scheduler.rb:26:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'
You can see it reads the file and schedules the first run of each check at a random interval between now and the check’s interval window; this spreads out the checks.
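The splay is just a random first_in offset inside the check’s own interval window. Here is a condensed sketch of the idea using the same Rufus calls as the scheduler code in the earlier August post – the puts stands in for publishing the real MCollective request:

require 'rubygems'
require 'rufus/scheduler'

scheduler = Rufus::Scheduler.start_new

def schedule_nrpe(scheduler, command, interval)
  # start somewhere random inside the first interval window to splay the checks
  first_in = "%ss" % rand(Rufus.parse_time_string(interval))

  scheduler.every(interval, :first_in => first_in, :blocking => true) do
    # the real scheduler publishes an MCollective NRPE request for the command here
    puts "would publish a request for #{command}"
  end
end

schedule_nrpe(scheduler, "check_load", "60s")
scheduler.join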
The Receiver
The receiver has almost no config; it just needs to know what queue to read and where your Nagios command file lives. It logs:
I, [2013-01-01T11:49:38.295661 #23628] INFO -- : mnrpes.rb:35:in `daemonize' Starting in the background
W, [2013-01-01T11:49:38.302045 #23631] WARN -- : activemq.rb:96:in `on_connecting' TCP Connection attempt 0 to stomp://nagios@stomp.example.net:6163
W, [2013-01-01T11:49:38.310853 #23631] WARN -- : activemq.rb:101:in `on_connected' Conncted to stomp://nagios@stomp.example.net:6163
I, [2013-01-01T11:49:38.310980 #23631] INFO -- : receiver.rb:16:in `subscribe' Subscribing to /queue/mcollective.nagios_passive_results_monitor1
I, [2013-01-01T11:49:41.572362 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357040981] PROCESS_SERVICE_CHECK_RESULT;node1.example.net;mongodb;0;OK: connected, databases admin local my_db puppet mcollective
I, [2013-01-01T11:49:42.509061 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357040982] PROCESS_SERVICE_CHECK_RESULT;node2.example.net;zombieprocs;0;PROCS OK: 0 processes with STATE = Z
I, [2013-01-01T11:49:42.510574 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357040982] PROCESS_SERVICE_CHECK_RESULT;node3.example.net;zombieprocs;0;PROCS OK: 1 process with STATE = Z
As the results get pushed to Nagios I see the following in its logs:
[1357042122] EXTERNAL COMMAND: PROCESS_SERVICE_CHECK_RESULT;node1.example.net;zombieprocs;0;PROCS OK: 0 processes with STATE = Z
[1357042124] PASSIVE SERVICE CHECK: node1.example.net;zombieprocs;0;PROCS OK: 0 processes with STATE = Z
Did it solve my problems?
I listed the set of problems I wanted to solve so it’s worth evaluating if I did solve them properly.
Less load and RAM use on the Nagios nodes
My Nagios nodes have gone from load averages of 1.5 to 0.1 or 0.0 – they are doing almost nothing. They use a lot less RAM and I have removed some of the RAM from one of them and given it to my Jenkins VM instead, which was a huge win. The scheduler and receiver are quite light on resources as you can see below:
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
nagios 9757 0.4 1.8 130132 36060 ? S 2012 3:41 ruby /usr/bin/mnrpes-receiver --pid=/var/run/mnrpes/mnrpes-receiver.pid --config=/etc/mnrpes/mnrpes-receiver.cfg
nagios 9902 0.3 1.4 120056 27612 ? Sl 2012 2:22 ruby /usr/bin/mnrpes-scheduler --pid=/var/run/mnrpes/mnrpes-scheduler.pid --config=/etc/mnrpes/mnrpes-scheduler.cfg
On the RAM side I now never get a pile-up of many concurrent checks. I do have stale detection enabled on my Nagios service template, so if something breaks in the scheduler/receiver/broker triplet Nagios will still try a traditional active check to see what’s going on, but that’s bearable.
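For reference, the stale detection boils down to freshness checking on the passive services – something along these lines, where the threshold and fallback check are illustrative values rather than my actual template:

define service {
  use                     generic-service
  host_name               node1.example.net
  service_description     load
  passive_checks_enabled  1
  active_checks_enabled   0     ; normally only passive results arrive
  check_freshness         1     ; if no passive result within the threshold...
  freshness_threshold     600   ; ...Nagios runs the check_command as a normal active check
  check_command           check_nrpe!check_load
}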
Check frequency too low
With this system I could do my checks every 10 seconds without any problems; I settled on 60 seconds as that’s perfect for me. Rufus Scheduler does a great job of managing that, and the requests from the scheduler are effectively fire and forget as long as the broker is up.
Results are spread over 10 minutes
The problem of the load results for node1 and node2 having no temporal correlation is gone too: because I use MCollective’s parallel nature, all the load checks happen at the same time:
Here is the publisher:
I, [2013-01-01T12:00:14.296455 #20661] INFO -- : scheduler.rb:26:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'
…and the receiver:
I, [2013-01-01T12:00:14.380981 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node1.example.net;load;0;OK - load average: 0.92, 0.54, 0.42|load1=0.920;9.000;10.000;0; load5=0.540;8.000;9.000;0; load15=0.420;7.000;8.000;0;
I, [2013-01-01T12:00:14.383875 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node2.example.net;load;0;OK - load average: 0.00, 0.00, 0.00|load1=0.000;1.500;2.000;0; load5=0.000;1.500;2.000;0; load15=0.000;1.500;2.000;0;
I, [2013-01-01T12:00:14.387427 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node3.example.net;load;0;OK - load average: 0.02, 0.07, 0.07|load1=0.020;1.500;2.000;0; load5=0.070;1.500;2.000;0; load15=0.070;1.500;2.000;0;
I, [2013-01-01T12:00:14.388754 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node4.example.net;load;0;OK - load average: 0.07, 0.02, 0.00|load1=0.070;1.500;2.000;0; load5=0.020;1.500;2.000;0; load15=0.000;1.500;2.000;0;
I, [2013-01-01T12:00:14.404650 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node5.example.net;load;0;OK - load average: 0.03, 0.09, 0.04|load1=0.030;1.500;2.000;0; load5=0.090;1.500;2.000;0; load15=0.040;1.500;2.000;0;
I, [2013-01-01T12:00:14.405689 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node6.example.net;load;0;OK - load average: 0.06, 0.06, 0.07|load1=0.060;3.000;4.000;0; load5=0.060;3.000;4.000;0; load15=0.070;3.000;4.000;0;
I, [2013-01-01T12:00:14.489590 #23631] INFO -- : receiver.rb:34:in `receive_and_submit' Submitting passive data to nagios: [1357041614] PROCESS_SERVICE_CHECK_RESULT;node7.example.net;load;0;OK - load average: 0.06, 0.14, 0.14|load1=0.060;1.500;2.000;0; load5=0.140;1.500;2.000;0; load15=0.140;1.500;2.000;0;
All the results are from the same second, win.
Conclusion
So the scaling issues on my small site are solved and I think the way this is built will work for many people. The code is on GitHub and requires MCollective 2.2.0 or newer.
Having reused the MCollective and Rufus libraries for all the legwork – logging, daemonizing, broker connectivity, addressing and security – I was able to build this in a very short time. The total code base is only 237 lines excluding packaging etc., which is a really low number of lines for what it does.
by R.I. Pienaar | Aug 31, 2012 | Code
Yesterday I mentioned on Twitter that I was playing with the MongoDB pub/sub features and that it worked quite well for my needs.
What I didn’t mention was that the documentation and blog posts were a bit all over the show and the Ruby examples I saw didn’t actually do what they said they did, so in this post I’ll show working code and some of the basic approaches I took to deal with per-consumer destinations etc.
Why?
So why would anyone want to use MongoDB as a queue or indeed MongoDB at all since everyone knows it’s unusable and does not save any data ever and kills kittens?
Actually MongoDB is a really nice database, but like most NoSQL databases the thing to know about it is what shortcuts it takes with your data to do its magic. Knowing this you have to evaluate its suitability for your specific problem, and if it’s not suitable, don’t use it.
It’s fast and has a flexible query system to search over arbitrarily structured JSON data. Yes, it has some interesting ideas about data durability, but this is well known by now and if your needs match its features it’s not bad.
For shops with needs well suited to MongoDB who might want to add some queueing ability, it can be daunting to bring in new tech like RabbitMQ or ActiveMQ because it brings new unknowns and requires an investment in more monitoring, training and learning by making mistakes. If you already have a Mongo instance and know its quirks, using it for a queue might not be such a terrible thing.
Additionally MongoDB is really easy to get going and generally I find for my work loads it just works with little maintenance required.
So my interest in its queueing abilities lies in providing a simpler ‘getting started’ experience for MCollective. The new MCollective has pluggable discovery which works really well when discovering against a MongoDB cache of registration data, so it would be nice if a simple starter edition setup could include both the queue and the discovery data in one simple bit of software.
There are other options of course, like Redis, and I’ll evaluate them, but of the various options MongoDB is the only one that comes with both pubsub and searching/querying capabilities that do what I need, isn’t written in Java and has OS packages easily available for most distros.
Background
In MongoDB when you do a find on a collection the returned result set is a Cursor. Cursors can have a number of modes or flags associated with them. Further it has something called Capped Collections that are fixed size and rotate old data out when they fill up.
The combination of some of these Cursor flags and Capped Collections enables a kind of tail -f behavior that works like a queue.
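To play along you need a capped collection to act as the queue; a quick one-off setup might look like this (the size matches the consumer code further down and is otherwise arbitrary):

require 'rubygems'
require 'mongo'

# create a small capped collection called commands to act as the queue
db = Mongo::Connection.new("localhost").db("queue")
db.create_collection("commands", :capped => true, :size => 10240)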
A cursor usually returns nil when you reach the end of your results, as can be seen here:
>> coll = db.collection('commands')
=> Mongo::DB:0x7fa1ae005f58 ....>
>> cursor = coll.find()
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.skip(cursor.count)
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.next_document
=> nil
Here we opened a collection and did a find. We moved to the end of the results and fetched the next result, which immediately returned nil indicating there’s nothing new.
Let’s see how we can change the behavior of this cursor so that instead of returning immediately it will block for a while waiting for a new document, and only return nil after a timeout if nothing new was found:
>> cursor = coll.find()
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.skip(cursor.count)
=> Mongo::Cursor:0x3fd0d6f61184 ....>
>> cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
=> 2
>> cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)
=> 34
>> loop { puts "#{Time.now}> Tailing...."; p cursor.next_document }
Fri Aug 31 13:40:19 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:21 +0100 2012> Tailing....
nil
Fri Aug 31 13:40:23 +0100 2012> Tailing....
nil
Now instead of immediately returning nil it will wait 2 to 3 seconds at the end of the collection in case new data arrives.
So this is your consumer of the queue – called commands here – and anyone who saves data into the collection is a producer. It’s quite light on resources on both the client and the MongoDB server; on a fairly low spec VM I was easily able to run 50+ consumers, a MongoDB instance and some producers.
MongoDB calls this feature Tailable Cursors. The thing the Ruby docs don’t tell you – and that the Ruby library does not do for you – is to set the Mongo::Cursor::OP_QUERY_AWAIT_DATA option as above. Without this option it will still return nil immediately, and the example code works around the resulting busy loop with a sleep. That proposed sleeping solution makes it pretty pointless as a high performance queue, but the Mongo::Cursor::OP_QUERY_AWAIT_DATA option sorts that out.
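For contrast, without OP_QUERY_AWAIT_DATA you are stuck with the polling pattern from those examples – roughly the sketch below, where process is just a placeholder and you either burn CPU or add latency:

# tailable cursor without OP_QUERY_AWAIT_DATA: next_document returns nil
# immediately at the end of the data, so the loop has to poll
loop do
  if doc = cursor.next_document
    process(doc)
  else
    sleep 1   # without the sleep this spins at 100% CPU; with it you add latency
  end
end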
A simple message structure
In my use case I have to be able to send messages to all consumers or sometimes just to a specific consumer. In other middleware you do this with different queue names or perhaps headers and then do selective subscribes to the queue picking off just the messages you are interested in.
I chose to use a single capped collection and use a structure similar to middleware headers to identify message targets:
{"headers" : {"target" : "all"},
"payload" : "data"} |
{"headers" : {"target" : "all"},
"payload" : "data"}
{"headers" : {"target" : "some.consumer"},
"payload" : "data"} |
{"headers" : {"target" : "some.consumer"},
"payload" : "data"}
The two examples show different target headers: in the first I am targeting everyone consuming the queue and in the second just a specific consumer. The payload can be anything – text, hashes, whatever your needs are.
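The producer side is just a plain insert into the capped collection with the right headers – a minimal sketch, with a made up payload:

require 'rubygems'
require 'mongo'

coll = Mongo::Connection.new("localhost").db("queue").collection("commands")

# broadcast to every consumer tailing the queue
coll.insert({"headers" => {"target" => "all"},
             "payload" => "run_backups"})

# direct a message at a single consumer
coll.insert({"headers" => {"target" => "example.com"},
             "payload" => "run_backups"})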
Let’s look at a consumer that has a consumer name and is interested in messages directed at it or at all consumers:
@consumer_identity = "example.com"
@database = "queue"
@collection = "commands"
def get_collection
@db ||= Mongo::Connection.new().db(@database)
until @db.connection.active?
puts ">>> Retrying database connection..."
@db.connection.close
@db.connection.connect
sleep 0.5 unless @db.connection.active?
end
unless @db.collections.include?(@collection)
coll = @db.create_collection(@collection, :capped => true, :size => 10240)
else
coll = @db.collection(@collection)
end
coll
end
loop do
begin
cursor = get_collection.find({"headers.target" => {"$in" => [@consumer_identity, "all"]}})
# ignore old stuff
cursor.skip(cursor.count)
# blocking tail reads
cursor.add_option(Mongo::Cursor::OP_QUERY_TAILABLE)
cursor.add_option(Mongo::Cursor::OP_QUERY_AWAIT_DATA)
begin
# fetch all the docs forever
loop do
if doc = cursor.next_document
p doc["payload"]
end
end
rescue Mongo::OperationFailure => e
puts ">>> Cursor closed: %s (%s)" % [e.to_s, e.class]
end
rescue Mongo::ConnectionFailure
puts ">>> DB connection failed"
end
end
In the find call we set up a Cursor for all messages matching “all” or our own identity. You can now simply publish data with the correct headers to target specific consumers or all consumers. The two loops will forever attempt to reconnect to a failed database and forever read whatever new messages arrive after connecting.
Using this method it’s really easy to come up with all kinds of addressing modes for your queue. For example you could give work a job name and combine it with the target header to create sets of named consumers that all receive only the commands matching the work they’re able to do.
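As a hypothetical example of such an addressing mode, a consumer that only does deploys could filter on an extra job header – the job header is my own invention here, not part of the structure above:

# only messages that are deploy jobs and aimed at us or at everyone
cursor = get_collection.find(
  {"headers.job"    => "deploy",
   "headers.target" => {"$in" => [@consumer_identity, "all"]}})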
Results
As I said initially, I did all this to test an MCollective connector that uses MongoDB as middleware. It worked surprisingly well and I have both broadcast and directed modes working:
$ mco ping
.
.
---- ping statistics ----
15 replies max: 57.94 min: 48.56 avg: 54.72
I’ll try out some other options for a small site or starter edition middleware and post follow up blog posts.
I’ll say I’ve been very surprised by how well this worked though. The connector is a bit complex and awkward because of how thread safety is handled in the MongoDB Ruby driver, but overall it’s not been a big problem to solve a pretty complex use case with this.
Specifically, I noted that performance didn’t degrade hugely with 50 nodes connected or with larger payloads, which is very nice.
by R.I. Pienaar | Aug 19, 2012 | Code
This is a post in a series of posts I am doing about MCollective 2.0 and later.
Overview
The kind of application I tend to show with MCollective is very request-response orientated. You request some info from nodes and it shows you the data as they reply. This is not the typical thing people do with middleware though; instead they create receivers for event streams, processing those into databases or using the middleware as a job queue.
The MCollective libraries can be used to build similar applications and today I’ll show a basic use case for this. It’s generally really easy to create a consumer for a job queue using middleware, as covered in my recent series of blog posts. It’s much harder when you want to support multiple middleware brokers, pluggable payload encryption and different serializers, and add some Authentication, Authorization and Auditing into the mix – soon it becomes a huge undertaking.
MCollective already has a rich set of plugins for all of this, so it would be great if you could reuse these to save yourself some time.
Request, but reply elsewhere
One of the features we added in 2.0.0 is awareness in the core MCollective libraries of the classical reply-to behaviour common to middleware brokers. Every request specifies a reply-to target and the nodes will send their replies there; this is how we get replies back from nodes, and if the broker supports it this is typically done using temporary private queues.
But it’s not restricted to this; let’s see how you can use this feature from the command line. First we’ll set up a listener on a specific queue using my stomp-irb application.
% stomp-irb -s stomp -p 6163
Interactive Ruby shell for STOMP
info> Attempting to connect to stomp://rip@stomp:6163
info> Connected to stomp://rip@stomp:6163
Type 'help' for usage instructions
>> subscribe :queue, "mcollective.nagios_passive_results"
Current Subscriptions:
/queue/mcollective.nagios_passive_results
=> nil
>>
We’re now receiving all messages on /queue/mcollective.nagios_passive_results; let’s see how we get all our machines to send some data there:
% mco rpc nrpe runcommand command=check_load --reply-to=/queue/mcollective.nagios_passive_results
Request sent with id: 61dcd7c8c4a354198289606fb55d5480 replies to /queue/mcollective.nagios_passive_results
Note this client recognised that you’re never going to get replies, so it just publishes the request(s) and shows you the outcome. It’s real quick and doesn’t wait for or care about the results.
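Incidentally you can do the same from Ruby – the reply_to accessor on an RPC client is what the scheduler later in this post uses, roughly like this:

require 'mcollective'
include MCollective::RPC

nrpe = rpcclient("nrpe")

# send all replies to the queue our listener is subscribed to instead of back to us
nrpe.reply_to = "/queue/mcollective.nagios_passive_results"

nrpe.runcommand(:command => "check_load")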
And over in our stomp-irb we should see many messages like this one:
<<stomp>> BAh7CzoJYm9keSIB1QQIewg6CWRhdGF7CToNZXhpdGNvZGVpADoMY29tbWFu
ZCIPY2hlY2tfbG9hZDoLb3V0cHV0IihPSyAtIGxvYWQgYXZlcmFnZTogMC44
MiwgMC43NSwgMC43MToNcGVyZmRhdGEiV2xvYWQxPTAuODIwOzEuNTAwOzIu
MDAwOzA7IGxvYWQ1PTAuNzUwOzEuNTAwOzIuMDAwOzA7IGxvYWQxNT0wLjcx
MDsxLjUwMDsyLjAwMDswOyA6D3N0YXR1c2NvZGVpADoOc3RhdHVzbXNnIgdP
SzoOcmVxdWVzdGlkIiU2MWRjZDdjOGM0YTM1NDE5ODI4OTYwNmZiNTVkNTQ4
MDoMbXNndGltZWwrBwjRMFA6DXNlbmRlcmlkIgl0d3AxOgloYXNoIgGvbVdV
V0RXaTd6a04xRWYrM0RRUWQzUldsYjJINTltMUdWYkRBdWhVamJFaEhrOGJl
Ykd1Q1daMnRaZ3VBCmx3MW5DeXhtT2xWK3RpbzlCNFBMbnhoTStvV3Z6OEo4
SVNiYTA4a2lzK3BVTVZ0cGxiL0ZPRVlMVWFPRQp5K2QvRGY3N2I2TTdGaGtJ
RUxtR2hONHdnZTMxdU4rL3hlVHpRenE0M0lJNE5CVkpRTTg9CjoQc2VuZGVy
YWdlbnQiCW5ycGU=
What you’re looking at is a Base64 encoded, serialized MCollective reply message. In this case the reply is signed using an SSL key for authenticity and has the whole MCollective reply in it.
MCollective to Nagios Passive Check bridge
So as you might have guessed from the use of the NRPE plugin and the queue name I chose, the next step is to connect the MCollective NRPE results to Nagios using its passive check interface:
require 'mcollective'
require 'pp'
# where the nagios command socket is
NAGIOSCMD = "/var/log/nagios/rw/nagios.cmd"
# to mcollective this is a client, load the client config and
# inform the security system we are a client
MCollective::Applications.load_config
MCollective::PluginManager["security_plugin"].initiated_by = :client
# connect to the middleware and subscribe
connector = MCollective::PluginManager["connector_plugin"]
connector.connect
connector.connection.subscribe("/queue/mcollective.nagios_passive_results")
# consume all the things...
loop do
# get a mcollective Message object and configure it as a reply
work = connector.receive
work.type = :reply
# decode it, this will go via the MCollective security system
# and validate SSL etcetc
work.decode!
# Now we have the NRPE result, just save it to nagios
result = work.payload
data = result[:body][:data]
unless data[:perfdata] == ""
output = "%s|%s" % [data[:output], data[:perfdata]]
else
output = data[:output]
end
passive_check = "[%d] PROCESS_SERVICE_CHECK_RESULT;%s;%s;%d;%s" % [result[:msgtime], result[:senderid], data[:command].gsub("check_", ""), data[:exitcode], output]
begin
File.open(NAGIOSCMD, "w") {|nagios| nagios.puts passive_check }
rescue => e
puts "Could not write to #{NAGIOSCMD}: %s: %s" % [e.class, e.to_s]
end
end
This code connects to the middleware using the MCollective Connector Plugin, subscribes to the specified queue and consumes the messages.
You’ll note there is very little here that’s actually middleware related – we’re just using the MCollective libraries. The beauty of this code is that if we later wish to employ a different middleware, a different security system, or configure our middleware connections to use TLS to ActiveMQ, nothing has to change here. All the hard stuff is done in MCollective config and libraries.
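For example, the choice of connector and security plugin lives in the MCollective client.cfg rather than in this script – roughly along these lines for an ActiveMQ and SSL setup, with illustrative values:

connector        = activemq
securityprovider = ssl
plugin.activemq.pool.size = 1
plugin.activemq.pool.1.host = stomp.example.net
plugin.activemq.pool.1.port = 6163
plugin.activemq.pool.1.user = nagios
plugin.activemq.pool.1.password = secret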
In this specific case I am using the SSL plugin for MCollective, so the message is signed and no one can tamper with the results in a MITM attack on the monitoring system. This came for free – I didn’t have to write any code to get this ability, just use MCollective.
Scheduling Nagios Checks and scaling them with MCollective
Now that we have a way to receive check results from the network, let’s look at how we can initiate checks. I’ll use the very awesome Rufus Scheduler gem for this.
I want to create something simple that reads a simple config file of checks and repeatedly requests my nodes – possibly matching MCollective filters – to do NRPE checks. Here’s a sample checks file:
nrpe "check_load", "1m", "monitored_by=monitor1"
nrpe "check_swap", "1m", "monitored_by=monitor1"
nrpe "check_disks", "1m", "monitored_by=monitor1"
nrpe "check_bacula_main", "6h", "bacula::node monitored_by=monitor1" |
nrpe "check_load", "1m", "monitored_by=monitor1"
nrpe "check_swap", "1m", "monitored_by=monitor1"
nrpe "check_disks", "1m", "monitored_by=monitor1"
nrpe "check_bacula_main", "6h", "bacula::node monitored_by=monitor1"
This will check load, swap and disks on all machines monitored by this monitoring box and do a Bacula backup check on machines that have the bacula::node class included via Puppet.
Here’s a simple bit of code that takes the above file and schedules the checks:
require 'rubygems'
require 'mcollective'
require 'rufus/scheduler'
# (ab)use mcollective logger...
Log = MCollective::Log
class Scheduler
include MCollective::RPC
def initialize(destination, checks)
@destination = destination
@jobs = []
@scheduler = Rufus::Scheduler.start_new
@nrpe = rpcclient("nrpe")
# this is where the magic happens, send all the results to the receiver...
@nrpe.reply_to = destination
instance_eval(File.read(checks))
end
# helper to schedule checks, this will create rufus jobs that does NRPE requests
def nrpe(command, interval, filter=nil)
options = {:first_in => "%ss" % rand(Rufus.parse_time_string(interval)),
:blocking => true}
Log.info("Adding a job for %s every %s matching '%s', first in %s" % [command, interval, filter, options[:first_in]])
@jobs << @scheduler.every(interval.to_s, options) do
Log.info("Publishing request for %s with filter '%s'" % [command, filter])
@nrpe.reset_filter
@nrpe.filter = parse_filter(filter)
@nrpe.runcommand(:command => command.to_s)
end
end
def parse_filter(filter)
new_filter = MCollective::Util.empty_filter
return new_filter unless filter
filter.split(" ").each do |filter|
begin
fact_parsed = MCollective::Util.parse_fact_string(filter)
new_filter["fact"] << fact_parsed
rescue
new_filter["cf_class"] << filter
end
end
new_filter
end
def join
@scheduler.join
end
end
s = Scheduler.new("/queue/mcollective.nagios_passive_results", "checks.txt")
s.join
When I run it I get:
% ruby schedule.rb
info 2012/08/19 13:06:46: activemq.rb:96:in `on_connecting' TCP Connection attempt 0 to stomp://nagios@stomp:6163
info 2012/08/19 13:06:46: activemq.rb:101:in `on_connected' Conncted to stomp://nagios@stomp:6163
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_load every 1m matching 'monitored_by=monitor1', first in 36s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_swap every 1m matching 'monitored_by=monitor1', first in 44s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_disks every 1m matching 'monitored_by=monitor1', first in 43s
info 2012/08/19 13:06:46: schedule.rb:25:in `nrpe' Adding a job for check_bacula_main every 6h matching 'bacula::node monitored_by=monitor1', first in 496s
info 2012/08/19 13:07:22: schedule.rb:28:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'
info 2012/08/19 13:07:29: schedule.rb:28:in `nrpe' Publishing request for check_disks with filter 'monitored_by=monitor1'
info 2012/08/19 13:07:30: schedule.rb:28:in `nrpe' Publishing request for check_swap with filter 'monitored_by=monitor1'
info 2012/08/19 13:08:22: schedule.rb:28:in `nrpe' Publishing request for check_load with filter 'monitored_by=monitor1'
All the checks are loaded, they are splayed a bit so they don’t cause a thundering herd, and you can see the schedule is honoured. In my Nagios logs I can see the passive results being submitted by the receiver.
MCollective NRPE Scaler
So taking these ideas I’ve knocked up a project that does this with somewhat better code than above; it’s still in progress and I’ll blog about it later. For now you can check out the code on GitHub – it includes all of the above but better integrated, and should serve as a more complete example than I can realistically post in a blog post.
There are many advantages to this method that come specifically from combining MCollective and Nagios. The Nagios scheduler visits hosts one by one, meaning you get a moving view of status over a 5 minute resolution. Using MCollective to request the check on all your hosts means you get a 1 second resolution – all the load averages Nagios sees are from the same narrow time period. Receiving results on a queue has scaling benefits, and the MCollective libraries are already multi-broker aware and support failover to standby brokers, which means this isn’t a single point of failure.
Conclusion
So we’ve seen that we can reuse much of the MCollective internals and plugin system to set up a dedicated receiver of MCollective produced data, and I’ve shown a simple use case where we’re requesting data from our managed nodes.
Today what I showed kept the request-response model but split the traditional MCollective client into two. One part scheduling requests and another part processing results. These parts could even be on different machines.
We can take this further and simply connect two bits of code together, flowing arbitrary data between them while securing the communications using the MCollective protocol. A follow up blog post will look at that.
by R.I. Pienaar | Jul 23, 2012 | Code
This is a post in a series of posts I am doing about MCollective 2.0 and later.
We’ve discussed Direct Addressing Mode before and today I’ll show one of the new features this mode enables.
Overview
MCollective is very fast, which is usually great. Sometimes though, when you’re restarting webservers, the speed and concurrency can be a problem. Restarting all your webservers at the same time is generally a bad idea.
In the past the general way to work around this was to use a fact like cluster=a to cut your server estate into named groups and then only address them based on that. This worked OK but was clearly not the best possible outcome.
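That approach looked something like the below – one invocation per named group, run by hand with a pause in between, where cluster is an illustrative fact name:

% mco service restart httpd -F cluster=a
% mco service restart httpd -F cluster=b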
Apart from this the concurrency also meant that once a request is sent you cannot ^C out of it. Any mistake made is final and processing cannot be interrupted.
Since MCollective 2.0 has the ability to address nodes directly without broadcasting, it has become much easier to come up with a good solution to these problems. You can now construct RPC requests targeted at hundreds of nodes but ask MCollective to communicate with them in smaller batches with a configurable sleep in between batches. You can ^C at any time and only batches that have already received requests will be affected.
Using on the CLI
Using this feature on the CLI is pretty simple, all RPC clients have some new CLI options:
% mco service restart httpd --batch 10 --batch-sleep 2
Discovering hosts using the mongo method .... 26
* [============================================================> ] 26 / 26
.
.
.
Finished processing 26 / 26 hosts in 6897.66 ms
What you will see when running it on the CLI is that the progress bar advances in groups of 10, pauses 2 seconds and then does the next 10. In this case you could ^C at any time and only the machines in earlier batches plus the 10 in the current batch will have restarted; future nodes would not yet be affected in any way.
Under the hood MCollective detects that you want to do batching, forces the system into Direct Addressing Mode and makes batches of requests. The request id stays the same throughout, auditing works, results work exactly as before and display behaviour does not change apart from progressing in steps.
Using in code
Naturally you can also use this from your own code, here’s a simple script that does the same thing as above.
#!/usr/bin/ruby
require 'mcollective'
include MCollective::RPC
svcs = rpcclient("service")
svcs.batch_size = 10
svcs.batch_sleep_time = 2
printrpc svcs.restart(:service => "httpd")
The key lines are the batch_size and batch_sleep_time settings, which have the same behaviour as --batch and --batch-sleep.
by R.I. Pienaar | Jul 6, 2012 | Code
This is a post in a series of posts I am doing about MCollective 2.0 and later.
In my previous post I detailed how you can extend the scope of the information MCollective has available to it about a node using Data Plugins. Those were node-side plugins; today we’ll look at ones that run on the client.
Background
Using the network as your source of truth works for a certain style of application, but as I pointed out in an earlier post there are kinds of application where that is not appropriate. If you want to build a deployer that rolls out the next version of your software you probably want to provide it with a list of nodes rather than have it discover against the network – this way you know a deploy failed because a node is down, rather than the node simply not being discovered.
These plugins give you the freedom to discover against anything that can return a list of nodes with MCollective identities. Examples are databases, CMDBs, something like Noah or Zookeeper etc.
To get this to work requires Direct Addressing; I’ll recap an example from the linked post:
c = rpcclient("service")
c.discover :nodes => File.readline("hosts.txt").map {|i| i.chomp}
printrpc c.restart(:service => "httpd") |
c = rpcclient("service")
c.discover :nodes => File.readline("hosts.txt").map {|i| i.chomp}
printrpc c.restart(:service => "httpd")
In this example MCollective reads hosts.txt, uses that as the source of truth and attempts to communicate only with the hosts discovered against that file. This, as was covered in the previous post, is in stark contrast with MCollective 1.x which had no choice but to use the network as the source of truth.
Building on this we’ve created a plugin system that abstracts this away into plugins you can use on the CLI, the web etc. – once activated, MCollective usage on the CLI and any existing code can use these plugins without code changes.
Using Discovery Plugins
Using these plugins works the same as discovery always has; in fact, as of version 2.1.0, if you use MCollective you’re already using one of these plugins. Let’s see:
% mco rpc rpcutil ping
Discovering hosts using the mc method for 2 second(s) .... 26
* [============================================================> ] 26 / 26
.
.
---- rpcutil#ping call stats ----
Nodes: 26 / 26
Pass / Fail: 26 / 0
Start Time: Fri Jul 06 09:47:06 +0100 2012
Discovery Time: 2002.07ms
Agent Time: 311.14ms
Total Time: 2313.21ms
Notice the discovery message says it is using the “mc” method, this is the traditional broadcast mode as before, it’s the default mode and will remain the default mode.
Let’s look at the generic usage of the hosts.txt above:
% mco rpc rpcutil ping --nodes hosts.txt -v
Discovering hosts using the flatfile method .... 9
* [============================================================> ] 9 / 9
.
.
---- rpcutil#ping call stats ----
Nodes: 9 / 9
Pass / Fail: 9 / 0
Start Time: Fri Jul 06 09:48:15 +0100 2012
Discovery Time: 0.40ms
Agent Time: 34.62ms
Total Time: 35.01ms
Note the change in the discovery message: it is now using the flatfile discovery method and doesn’t have a timeout. Take a look at the Discovery Time statistic – the flatfile example took a fraction of a second vs the usual 2 seconds spent discovering.
There’s a longer form of the above command:
% mco rpc rpcutil ping --disc-method flatfile --disc-option hosts.txt
Discovering hosts using the flatfile method .... 9
.
.
So you can pick a discovery method and they can take options. You can figure out what plugins you have available to you using the plugin application:
% mco plugin doc
Please specify a plugin. Available plugins are:
.
.
Discovery Methods:
flatfile Flatfile based discovery for node identities
mc MCollective Broadcast based discovery
mongo MongoDB based discovery for databases built using registration
puppetdb PuppetDB based discovery
And more information about a plugin can be seen:
% mco plugin doc mc
MCollective Broadcast based discovery
Author: R.I.Pienaar <rip@devco.net>
Version: 0.1
License: ASL 2.0
Timeout: 2
Home Page: http://marionette-collective.org/
DISCOVERY METHOD CAPABILITIES:
Filter based on configuration management classes
Filter based on system facts
Filter based on mcollective identity
Filter based on mcollective agents
Compound filters combining classes and facts
The discovery methods have capabilities that declare what they can do. The flatfile one for example has no idea about classes, facts etc., so its capabilities would only be identity filters.
If you decide to always use a different plugin than mc as your discovery source you can set it in client.cfg:
default_discovery_method = mongo
The RPC API can obviously also choose a method and supply options; the code below forces the flatfile mode:
c = rpcclient("service")
c.discovery_method = "flatfile"
c.discovery_options << "hosts.txt"
printrpc c.restart(:service => "httpd") |
c = rpcclient("service")
c.discovery_method = "flatfile"
c.discovery_options << "hosts.txt"
printrpc c.restart(:service => "httpd")
This has the same effect as mco rpc service restart service=httpd --dm=flatfile --do=hosts.txt
Writing a Plugin
We’ll look at the simplest plugin, which is the flatfile one. This plugin ships with MCollective but it’s a good example.
This plugin will let you issue commands like:
% mco service restart httpd
% mco service restart httpd -I some.host
% mco service restart httpd -I /domain/ -I /otherdomain/
So your basic identity filters with regular expression support or just all hosts.
module MCollective
class Discovery
class Flatfile
def self.discover(filter, timeout, limit=0, client=nil)
unless client.options[:discovery_options].empty?
file = client.options[:discovery_options].first
else
raise "The flatfile discovery method needs a path to a text file"
end
raise "Cannot read the file %s specified as discovery source" % file unless File.readable?(file)
discovered = []
hosts = File.readlines(file).map{|l| l.chomp}
unless filter["identity"].empty?
filter["identity"].each do |identity|
identity = Regexp.new(identity.gsub("\/", "")) if identity.match("^/")
if identity.is_a?(Regexp)
discovered = hosts.grep(identity)
elsif hosts.include?(identity)
discovered << identity
end
end
else
discovered = hosts
end
discovered
end
end
end
end
Past the basic boilerplate at the top of discover, the first block deals with the discovery options. You’ll notice discovery_options is an array, so users can call --disc-option many times and each call just gets appended to the array; we take a single flat file and raise if you didn’t pass a file or if the file can’t be read.
Next we set up an empty array that the selected nodes will go into and read all the hosts found in the file.
We then check whether anything was passed in the identity filter; if not, the discovered list is simply set to all the hosts in the file. The filters are arrays, so if multiple -I options were passed you will have multiple entries here and we loop over them all. You do not need to worry about someone accidentally setting a class filter as MCollective will know from the DDL that you are incapable of doing class filters and will simply not call your plugin with those.
The body of the loop just does regular expression matching or exact matching over the host list, and anything that matches gets added to the discovered list.
In the end we just return the list of discovered nodes; you do not need to worry about duplicates, sorting or anything like that.
As there is automatic documentation generation and input validation, you need to create a DDL file that describes the plugin and the data it can accept and return. Here’s the DDL for this plugin:
metadata :name => "flatfile",
:description => "Flatfile based discovery for node identities",
:author => "R.I.Pienaar <rip@devco.net>",
:license => "ASL 2.0",
:version => "0.1",
:url => "http://marionette-collective.org/",
:timeout => 0
discovery do
capabilities :identity
end
The metadata block is familiar – set the timeout to 0 if there’s no timeout and then MCollective will not inform the user about a timeout in the discovery message. The discovery block declares the capabilities; possible capabilities are :classes, :facts, :identity, :agents and :compound. Technically :compound isn’t usable by your plugins as MCollective will force the mc plugin when you use any -S filters, since those might contain references to data plugins that have to be resolved using the nodes as the source of truth.
Finally, store the files in a directory layout like below and you can package it into an RPM or a Deb:
% tree flatfile
flatfile
└── discovery
    ├── flatfile.ddl
    └── flatfile.rb
% cd flatfile
% mco plugin package
Created package mcollective-flatfile-discovery
% ls -l *rpm
-rw-rw-r-- 1 rip rip 2893 Jul 6 10:20 mcollective-flatfile-discovery-0.1-1.noarch.rpm
Install this plugin on all your clients and it will be available to use; if you do not want to use the packages, just dump the files in $libdir/discovery/.
Available Plugins
There are a few plugins available now, you saw the mc and flatfile ones here.
If you use the MongoDB based discovery system there is a fully capable discovery plugin that can work against a local MongoDB instance. This plugin has all the capabilities possible with full regular expression support and full sub collective support. I use this as my default discovery method now.
We’re also working on a PuppetDB one; it is not quite ready to publish as I am waiting for PuppetDB to get wildcard support. And finally there is a community plugin that discovers using Elastic Search.
Conclusion
These plugins conclude the big rework done on MCollective discovery. You can now mix and match any source of truth you like, even ones we as MCollective developers are not aware of, as you can write your own plugin.
Use the network when appropriate, use databases or flat files when appropriate and you can switch freely between modes during the life of a single application.
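As a rough sketch of what that can look like in a single script – assuming the standard service agent and the plugins shown above – you might deploy against a fixed list of hosts and then fall back to broadcast discovery for an ad-hoc query:

require 'mcollective'
include MCollective::RPC

c = rpcclient("service")

# act on a fixed list of nodes read from a file
c.discovery_method = "flatfile"
c.discovery_options << "hosts.txt"
printrpc c.restart(:service => "httpd")

# switch back to broadcast discovery for an ad-hoc status query
c.reset
c.discovery_options.clear
c.discovery_method = "mc"
printrpc c.status(:service => "httpd")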
Using these plugins is fun as they can be extremely fast. The short 1 minute video embedded below (click here if it’s not shown) shows the mc, puppetdb and mongodb plugins in action.
Version 2.1.0 made these plugins available, we’re looking to bump the Production branch to support these soon.