Compare commits

2 Commits

Author SHA1 Message Date
Pedro Melo
54c2964390 Prepare Client.pm to use Lookupd
Signed-off-by: Pedro Melo <melo@simplicidade.org>
2014-09-19 13:53:47 +01:00
Pedro Melo
bb2aa6e247 Added initial Lookupd client code
Signed-off-by: Pedro Melo <melo@simplicidade.org>
2014-09-19 13:53:34 +01:00
3 changed files with 197 additions and 1 deletions

50
examples/lookupd_monitor.pl Executable file
View File

@@ -0,0 +1,50 @@
#!/usr/bin/env perl
use strict;
use warnings;
use FindBin;
use lib "$FindBin::Bin/../lib";
use AnyEvent;
use AnyEvent::NSQ::Lookupd;
use Getopt::Long;
my ($help, $verbose, $interval);
GetOptions('help' => \$help, 'verbose' => \$verbose, 'interval=i' => \$interval) or usage();
usage() if $help;
my ($topic, @nsqlookupds) = @ARGV;
usage("topic is a required parameter") unless $topic and length($topic);
usage("at least one lookupd address is required") unless @nsqlookupds;
my $cv = AE::cv;
my $l = AnyEvent::NSQ::Lookupd->new(
topic => $topic,
pooling_interval => $interval,
lookupd_http_addresses => \@nsqlookupds,
add_nsqd_cb => sub { print "$topic: added $_[0], version $_[1]{version}\n" },
drop_nsqd_cb => sub { print "$topic: dropped $_[0], version $_[1]{version}\n" },
);
$cv->recv;
sub usage {
print "Error: @_\n" if @_;
print <<" EOU";
Usage: lookupd_monitor.pl [options] topic lookup_addresses...
Monitors a set of nsqlookupd's and lists all nsqd's that produce a particular topic.
Options:
--help or -h Prints this message and exits
--interval=INT or -i=INT Sets the polling interval
EOU
exit(1);
}

View File

@@ -10,6 +10,7 @@ use AnyEvent;
use AnyEvent::Socket (); use AnyEvent::Socket ();
use Carp 'croak'; use Carp 'croak';
use AnyEvent::NSQ::Connection; use AnyEvent::NSQ::Connection;
use AnyEvent::NSQ::Lookupd;
#### Public API #### Public API
@@ -109,7 +110,18 @@ sub _start_nsqd_connections {
} }
## nsqlookupd support - not there yet ## nsqlookupd support - not there yet
sub _start_lookupd_poolers { } sub _start_lookupd_poolers {
my ($self) = @_;
## FIXME: Given that topic is required, maybe this should move to Reader?
$self->{lookupd_poller} = AnyEvent::NSQ::Lookupd->new(
topic => $self->{topic},
lookupd_http_addresses => $self->{lookupd_http_addresses},
add_nsqd_cb => sub { $self->_start_nsqd_connection($_[2], nsqd_id => $_[1]) },
drop_nsqd_cb => sub { $self->_drop_nsqd_connection($_[1]) },
);
}
#### nsqd pool connection management #### nsqd pool connection management
@@ -138,6 +150,11 @@ sub _start_nsqd_connection {
return; return;
} }
## drop all connections for a specific nsqd
sub _drop_nsqd_connection {
}
## return one connection that is connected ## return one connection that is connected
sub _random_connected_conn { sub _random_connected_conn {
## FIXME: yeah, Sony-style random going on :) ## FIXME: yeah, Sony-style random going on :)

129
lib/AnyEvent/NSQ/Lookupd.pm Normal file
View File

@@ -0,0 +1,129 @@
package AnyEvent::NSQ::Lookupd;
# ABSTRACT: nsqlookupd pooler
# VERSION
# AUTHORITY
use strict;
use warnings;
use AnyEvent;
use AnyEvent::HTTP;
sub new {
my ($class, %args) = @_;
my $self = bless { producers => {}, sources => {}, pooling_interval => 60 }, $class;
for my $p (qw( topic lookupd_http_addresses add_nsqd_cb drop_nsqd_cb )) {
next unless exists $args{$p} and defined $args{$p};
$self->{$p} = delete $args{$p};
croak(qq{FATAL: parameter '$p' must be a CodeRef}) if $p =~ m{_cb$} and ref($self->{$p}) ne 'CODE';
}
$self->{pooling_interval} = delete($args{pooling_interval})
if exists $args{pooling_interval} and $args{pooling_interval} > 0;
$self->{lookupd_http_addresses} = [$self->{lookupd_http_addresses}]
unless ref($self->{lookupd_http_addresses}) eq 'ARRAY';
$self->_start_poller;
return $self;
}
sub poll {
my ($self) = @_;
my $urls = $self->_get_polling_urls;
for my $url (@$urls) {
http_get $url, sub { $self->_parse_lookup_response($url, $_[0]) if defined $_[0] };
}
return;
}
sub start {
my ($self);
$self->{poller_timer} = AE::timer 0, $self->{pooling_interval}, sub { $self->poll() };
}
sub stop { delete $_[0]->{poller_timer} }
sub is_running { return exists $_[0]->{poller_timer} }
#### The actual work...
sub _get_polling_urls {
my ($self) = @_;
my @urls;
for my $address (@{ $self->{lookupd_http_addresses} }) {
$address = "http://$address" unless $address =~ m/^https?:/;
push @urls, "$address/lookup?topic=$self->{topic}";
}
return \@urls;
}
sub _parse_lookup_response {
my ($self, $source, $body) = @_;
my $json = eval { JSON::XS::decode_json($body) };
return unless $json;
return unless $json->{status_code} and $json->{status_code} == 200;
$self->_update_nsqd_list_for_source($source, $json->{data});
}
sub _update_nsqd_list_for_source {
my ($self, $source, $data) = @_;
my $source_data = $self->{sources}{$source} = {};
for my $producer (@{ $data->{producers} || [] }) {
my $bcast_addr = $producer->{broadcast_address};
my $tcp_port = $producer->{tcp_port};
next unless $bcast_addr and $tcp_port;
my $nsqd_id = "$bcast_addr:$tcp_port";
$source_data->{$nsqd_id} = {
nsqd_id => $nsqd_id,
info => $producer,
};
}
$self->_merge_sources();
}
sub _merge_sources {
my ($self) = @_;
my %merged_producers;
for my $source_producers (values %{ $self->{sources} }) {
for my $producer (values %$source_producers) {
$merged_producers{ $producer->{nsqd_id} } = $producer;
}
}
$self->_detect_changes_producers(\%merged_producers);
}
sub _detect_changes_producers {
my ($self, $merged_producers) = @_;
my $producers = $self->{producers};
## detect dropped nsqd's
for my $nsqd_id (keys %$producers) {
next if exists $merged_producers->{$nsqd_id};
$self->{drop_nsqd_cb}->($self, $nsqd_id, delete $producers->{$nsqd_id});
}
## add the new ones
for my $nsqd_id (keys %$merged_producers) {
next if exists $producers->{$nsqd_id}
$self->{add_nsqd_cb}->($self, $nsqd_id, $producers->{$nsqd_id} = $merged_producers->{$nsqd_id});
}
}
1;