Add --stats support for consumer:

Gives stats over the last 10 seconds, plus global rate.

Use with http_publisher.pl to test your consumer performance.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
This commit is contained in:
Pedro Melo
2014-07-21 05:47:33 +01:00
parent b69bd4a19b
commit d807d361f3

View File

@@ -7,9 +7,10 @@ use lib "$FindBin::Bin/../lib";
use AnyEvent;
use AnyEvent::NSQ::Reader;
use Getopt::Long;
use Time::HiRes qw( gettimeofday tv_interval );
my ($topic, $channel, $help, $verbose, $print);
GetOptions('help' => \$help, 'verbose' => \$verbose, 'print' => \$print) or usage();
my ($topic, $channel, $help, $verbose, $print, $stats);
GetOptions('help' => \$help, 'verbose' => \$verbose, 'print' => \$print, 'stats' => \$stats) or usage();
usage() if $help;
($topic, $channel) = @ARGV;
@@ -20,21 +21,46 @@ my $cv = AE::cv;
## return undef => mark_as_done_msg()
my $message_cb = $print ? sub { print "$_[1]{message}\n"; return } : sub {return};
my $t = 0;
my $t = my $p = 0;
my $r = AnyEvent::NSQ::Reader->new(
topic => $topic,
channel => $channel,
nsqd_tcp_addresses => '127.0.0.1',
client_id => "${channel}_consumer/pid_$$",
message_cb => sub { $t++; $message_cb->(@_, $t) },
message_cb => sub { $t++; $p++; $message_cb->(@_) },
error_cb => sub { warn "$_[1]\n" if $verbose },
disconnect_cb => sub { warn "Disconnected after $t total messages... exiting...\n" if $verbose; $cv->send },
);
my $term_sgn = AE::signal TERM => sub { $r->disconnect };
my $int_sgn = AE::signal INT => sub { $r->disconnect };
if ($stats) {
warn "Stats enabled, printed every 10 seconds\n";
my $t0 = my $p0 = [gettimeofday];
$stats = AE::timer 10, 10, sub {
my $now = [gettimeofday];
my $elapsed_t = tv_interval($t0, $now);
my $elapsed_p = tv_interval($p0, $now);
my $uptime = '';
my ($h, $m);
if ($h = int($elapsed_t / 3600)) { $uptime .= "${h}h" }
if ($m = int(($elapsed_t - $h * 3600) / 60)) { $uptime .= "${m}m" }
$uptime .= int($elapsed_t - $h * 3600 - $m * 60) . 's';
warn sprintf(
'Stats: %0.3f mesgs/sec for the past %0.3f seconds, total messages %d, global rate %0.3f, uptime %s%s',
$p / $elapsed_p,
$elapsed_p, $t, $t / $elapsed_t, $uptime,"\n"
);
$p = 0;
$p0 = $now;
};
}
my $term_sgn = AE::signal TERM => sub { $r->disconnect; undef $stats };
my $int_sgn = AE::signal INT => sub { $r->disconnect; undef $stats };
$cv->recv;