From d807d361f33487a58e1ec328c26eaa0ad2145c02 Mon Sep 17 00:00:00 2001 From: Pedro Melo Date: Mon, 21 Jul 2014 05:47:33 +0100 Subject: [PATCH] Add --stats support for consumer: Gives stats over the last 10 seconds, plus global rate. Use with http_publisher.pl to test your consumer performance. Signed-off-by: Pedro Melo --- examples/consumer.pl | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/examples/consumer.pl b/examples/consumer.pl index a4dd610..be4751e 100755 --- a/examples/consumer.pl +++ b/examples/consumer.pl @@ -7,9 +7,10 @@ use lib "$FindBin::Bin/../lib"; use AnyEvent; use AnyEvent::NSQ::Reader; use Getopt::Long; +use Time::HiRes qw( gettimeofday tv_interval ); -my ($topic, $channel, $help, $verbose, $print); -GetOptions('help' => \$help, 'verbose' => \$verbose, 'print' => \$print) or usage(); +my ($topic, $channel, $help, $verbose, $print, $stats); +GetOptions('help' => \$help, 'verbose' => \$verbose, 'print' => \$print, 'stats' => \$stats) or usage(); usage() if $help; ($topic, $channel) = @ARGV; @@ -20,21 +21,46 @@ my $cv = AE::cv; ## return undef => mark_as_done_msg() my $message_cb = $print ? sub { print "$_[1]{message}\n"; return } : sub {return}; -my $t = 0; +my $t = my $p = 0; my $r = AnyEvent::NSQ::Reader->new( topic => $topic, channel => $channel, nsqd_tcp_addresses => '127.0.0.1', client_id => "${channel}_consumer/pid_$$", - message_cb => sub { $t++; $message_cb->(@_, $t) }, + message_cb => sub { $t++; $p++; $message_cb->(@_) }, error_cb => sub { warn "$_[1]\n" if $verbose }, disconnect_cb => sub { warn "Disconnected after $t total messages... exiting...\n" if $verbose; $cv->send }, ); -my $term_sgn = AE::signal TERM => sub { $r->disconnect }; -my $int_sgn = AE::signal INT => sub { $r->disconnect }; +if ($stats) { + warn "Stats enabled, printed every 10 seconds\n"; + my $t0 = my $p0 = [gettimeofday]; + $stats = AE::timer 10, 10, sub { + my $now = [gettimeofday]; + my $elapsed_t = tv_interval($t0, $now); + my $elapsed_p = tv_interval($p0, $now); + + my $uptime = ''; + my ($h, $m); + if ($h = int($elapsed_t / 3600)) { $uptime .= "${h}h" } + if ($m = int(($elapsed_t - $h * 3600) / 60)) { $uptime .= "${m}m" } + $uptime .= int($elapsed_t - $h * 3600 - $m * 60) . 's'; + + warn sprintf( + 'Stats: %0.3f mesgs/sec for the past %0.3f seconds, total messages %d, global rate %0.3f, uptime %s%s', + $p / $elapsed_p, + $elapsed_p, $t, $t / $elapsed_t, $uptime,"\n" + ); + + $p = 0; + $p0 = $now; + }; +} + +my $term_sgn = AE::signal TERM => sub { $r->disconnect; undef $stats }; +my $int_sgn = AE::signal INT => sub { $r->disconnect; undef $stats }; $cv->recv;