Compare commits
6 Commits
nsqlookupd
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3fb375d10 | ||
|
|
1c5a3526fd | ||
|
|
f56271c965 | ||
|
|
681939809d | ||
|
|
dca911fee8 | ||
|
|
a08049ee0f |
@@ -1,50 +0,0 @@
|
||||
#!/usr/bin/env perl
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
use FindBin;
|
||||
use lib "$FindBin::Bin/../lib";
|
||||
use AnyEvent;
|
||||
use AnyEvent::NSQ::Lookupd;
|
||||
use Getopt::Long;
|
||||
|
||||
my ($help, $verbose, $interval);
|
||||
GetOptions('help' => \$help, 'verbose' => \$verbose, 'interval=i' => \$interval) or usage();
|
||||
usage() if $help;
|
||||
|
||||
my ($topic, @nsqlookupds) = @ARGV;
|
||||
usage("topic is a required parameter") unless $topic and length($topic);
|
||||
usage("at least one lookupd address is required") unless @nsqlookupds;
|
||||
|
||||
my $cv = AE::cv;
|
||||
|
||||
my $l = AnyEvent::NSQ::Lookupd->new(
|
||||
topic => $topic,
|
||||
pooling_interval => $interval,
|
||||
|
||||
lookupd_http_addresses => \@nsqlookupds,
|
||||
|
||||
add_nsqd_cb => sub { print "$topic: added $_[0], version $_[1]{version}\n" },
|
||||
drop_nsqd_cb => sub { print "$topic: dropped $_[0], version $_[1]{version}\n" },
|
||||
);
|
||||
|
||||
$cv->recv;
|
||||
|
||||
|
||||
sub usage {
|
||||
print "Error: @_\n" if @_;
|
||||
|
||||
print <<" EOU";
|
||||
Usage: lookupd_monitor.pl [options] topic lookup_addresses...
|
||||
|
||||
Monitors a set of nsqlookupd's and lists all nsqd's that produce a particular topic.
|
||||
|
||||
Options:
|
||||
|
||||
--help or -h Prints this message and exits
|
||||
|
||||
--interval=INT or -i=INT Sets the polling interval
|
||||
EOU
|
||||
|
||||
exit(1);
|
||||
}
|
||||
@@ -10,7 +10,6 @@ use AnyEvent;
|
||||
use AnyEvent::Socket ();
|
||||
use Carp 'croak';
|
||||
use AnyEvent::NSQ::Connection;
|
||||
use AnyEvent::NSQ::Lookupd;
|
||||
|
||||
#### Public API
|
||||
|
||||
@@ -39,15 +38,27 @@ sub publish {
|
||||
my $conn = $self->_random_connected_conn;
|
||||
croak "ERROR: there no active connections at this moment," unless $conn;
|
||||
|
||||
my @args;
|
||||
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
|
||||
my $cb = pop @data;
|
||||
push @data, sub { $cb->($self, $topic, \@data, @_) }
|
||||
if $cb;
|
||||
|
||||
if ($cb) {
|
||||
my @cb_data = @data;
|
||||
push @data, sub { $cb->($self, $topic, \@cb_data, @_) }
|
||||
}
|
||||
}
|
||||
|
||||
return $conn->publish($topic, @data);
|
||||
}
|
||||
|
||||
sub ready {
|
||||
my ($self, $ready_count) = @_;
|
||||
|
||||
$_->{conn}->ready($ready_count) for values %{ $self->{nsqd_conns} };
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
#### Argument parsing
|
||||
|
||||
@@ -58,8 +69,8 @@ sub _parse_args {
|
||||
$self->{disconnect_cb} = delete($args->{disconnect_cb}) || sub { };
|
||||
$self->{error_cb} = delete($args->{error_cb}) || sub { croak($_[1]) };
|
||||
|
||||
$self->{connect_cb} = delete($args->{connect_cb}) || sub { };
|
||||
$self->{identify_cb} = delete($args->{identify_cb}) || sub { };
|
||||
$self->{connect_cb} = delete($args->{connect_cb}) || sub { };
|
||||
$self->{identify_cb} = delete($args->{identify_cb}) || sub { };
|
||||
|
||||
for my $arg (qw( client_id hostname connect_timeout )) {
|
||||
$self->{$arg} = delete($args->{$arg}) if exists $args->{$arg};
|
||||
@@ -110,18 +121,7 @@ sub _start_nsqd_connections {
|
||||
}
|
||||
|
||||
## nsqlookupd support - not there yet
|
||||
sub _start_lookupd_poolers {
|
||||
my ($self) = @_;
|
||||
|
||||
## FIXME: Given that topic is required, maybe this should move to Reader?
|
||||
$self->{lookupd_poller} = AnyEvent::NSQ::Lookupd->new(
|
||||
topic => $self->{topic},
|
||||
lookupd_http_addresses => $self->{lookupd_http_addresses},
|
||||
|
||||
add_nsqd_cb => sub { $self->_start_nsqd_connection($_[2], nsqd_id => $_[1]) },
|
||||
drop_nsqd_cb => sub { $self->_drop_nsqd_connection($_[1]) },
|
||||
);
|
||||
}
|
||||
sub _start_lookupd_poolers { }
|
||||
|
||||
|
||||
#### nsqd pool connection management
|
||||
@@ -150,11 +150,6 @@ sub _start_nsqd_connection {
|
||||
return;
|
||||
}
|
||||
|
||||
## drop all connections for a specific nsqd
|
||||
sub _drop_nsqd_connection {
|
||||
|
||||
}
|
||||
|
||||
## return one connection that is connected
|
||||
sub _random_connected_conn {
|
||||
## FIXME: yeah, Sony-style random going on :)
|
||||
|
||||
@@ -1,129 +0,0 @@
|
||||
package AnyEvent::NSQ::Lookupd;
|
||||
|
||||
# ABSTRACT: nsqlookupd pooler
|
||||
# VERSION
|
||||
# AUTHORITY
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
use AnyEvent;
|
||||
use AnyEvent::HTTP;
|
||||
|
||||
sub new {
|
||||
my ($class, %args) = @_;
|
||||
my $self = bless { producers => {}, sources => {}, pooling_interval => 60 }, $class;
|
||||
|
||||
for my $p (qw( topic lookupd_http_addresses add_nsqd_cb drop_nsqd_cb )) {
|
||||
next unless exists $args{$p} and defined $args{$p};
|
||||
$self->{$p} = delete $args{$p};
|
||||
croak(qq{FATAL: parameter '$p' must be a CodeRef}) if $p =~ m{_cb$} and ref($self->{$p}) ne 'CODE';
|
||||
}
|
||||
|
||||
$self->{pooling_interval} = delete($args{pooling_interval})
|
||||
if exists $args{pooling_interval} and $args{pooling_interval} > 0;
|
||||
|
||||
$self->{lookupd_http_addresses} = [$self->{lookupd_http_addresses}]
|
||||
unless ref($self->{lookupd_http_addresses}) eq 'ARRAY';
|
||||
|
||||
$self->_start_poller;
|
||||
|
||||
return $self;
|
||||
}
|
||||
|
||||
sub poll {
|
||||
my ($self) = @_;
|
||||
my $urls = $self->_get_polling_urls;
|
||||
|
||||
for my $url (@$urls) {
|
||||
http_get $url, sub { $self->_parse_lookup_response($url, $_[0]) if defined $_[0] };
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
sub start {
|
||||
my ($self);
|
||||
|
||||
$self->{poller_timer} = AE::timer 0, $self->{pooling_interval}, sub { $self->poll() };
|
||||
}
|
||||
|
||||
sub stop { delete $_[0]->{poller_timer} }
|
||||
|
||||
sub is_running { return exists $_[0]->{poller_timer} }
|
||||
|
||||
|
||||
#### The actual work...
|
||||
|
||||
sub _get_polling_urls {
|
||||
my ($self) = @_;
|
||||
|
||||
my @urls;
|
||||
for my $address (@{ $self->{lookupd_http_addresses} }) {
|
||||
$address = "http://$address" unless $address =~ m/^https?:/;
|
||||
push @urls, "$address/lookup?topic=$self->{topic}";
|
||||
}
|
||||
|
||||
return \@urls;
|
||||
}
|
||||
|
||||
sub _parse_lookup_response {
|
||||
my ($self, $source, $body) = @_;
|
||||
|
||||
my $json = eval { JSON::XS::decode_json($body) };
|
||||
return unless $json;
|
||||
return unless $json->{status_code} and $json->{status_code} == 200;
|
||||
|
||||
$self->_update_nsqd_list_for_source($source, $json->{data});
|
||||
}
|
||||
|
||||
sub _update_nsqd_list_for_source {
|
||||
my ($self, $source, $data) = @_;
|
||||
my $source_data = $self->{sources}{$source} = {};
|
||||
|
||||
for my $producer (@{ $data->{producers} || [] }) {
|
||||
my $bcast_addr = $producer->{broadcast_address};
|
||||
my $tcp_port = $producer->{tcp_port};
|
||||
next unless $bcast_addr and $tcp_port;
|
||||
|
||||
my $nsqd_id = "$bcast_addr:$tcp_port";
|
||||
|
||||
$source_data->{$nsqd_id} = {
|
||||
nsqd_id => $nsqd_id,
|
||||
info => $producer,
|
||||
};
|
||||
}
|
||||
|
||||
$self->_merge_sources();
|
||||
}
|
||||
|
||||
sub _merge_sources {
|
||||
my ($self) = @_;
|
||||
|
||||
my %merged_producers;
|
||||
for my $source_producers (values %{ $self->{sources} }) {
|
||||
for my $producer (values %$source_producers) {
|
||||
$merged_producers{ $producer->{nsqd_id} } = $producer;
|
||||
}
|
||||
}
|
||||
|
||||
$self->_detect_changes_producers(\%merged_producers);
|
||||
}
|
||||
|
||||
sub _detect_changes_producers {
|
||||
my ($self, $merged_producers) = @_;
|
||||
my $producers = $self->{producers};
|
||||
|
||||
## detect dropped nsqd's
|
||||
for my $nsqd_id (keys %$producers) {
|
||||
next if exists $merged_producers->{$nsqd_id};
|
||||
$self->{drop_nsqd_cb}->($self, $nsqd_id, delete $producers->{$nsqd_id});
|
||||
}
|
||||
|
||||
## add the new ones
|
||||
for my $nsqd_id (keys %$merged_producers) {
|
||||
next if exists $producers->{$nsqd_id}
|
||||
$self->{add_nsqd_cb}->($self, $nsqd_id, $producers->{$nsqd_id} = $merged_producers->{$nsqd_id});
|
||||
}
|
||||
}
|
||||
|
||||
1;
|
||||
@@ -58,8 +58,8 @@ sub _identified {
|
||||
|
||||
## Keep the connection in the registry in case the user
|
||||
## wants to issue FIN/REQs inside the callback
|
||||
$self->{routing}->{$msg->{message_id}} = $conn;
|
||||
weaken( $self->{routing}->{$msg->{message_id}} );
|
||||
$self->{routing}->{ $msg->{message_id} } = $conn;
|
||||
weaken($self->{routing}->{ $msg->{message_id} });
|
||||
|
||||
$self->{message_cb}->($self, $msg);
|
||||
}
|
||||
@@ -71,40 +71,41 @@ sub _identified {
|
||||
sub mark_as_done_msg {
|
||||
my ($self, $msg) = @_;
|
||||
|
||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||
my $conn = $self->_find_message_connection($msg, {delete => 1});
|
||||
|
||||
$conn->mark_as_done_msg($msg);
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
sub requeue_msg {
|
||||
my ($self, $msg, $delay) = @_;
|
||||
|
||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||
my $conn = $self->_find_message_connection($msg, {delete => 1});
|
||||
|
||||
$conn->requeue_msg($msg, $delay);
|
||||
return 1;
|
||||
}
|
||||
|
||||
sub touch_message {
|
||||
sub touch_msg {
|
||||
my ($self, $msg) = @_;
|
||||
|
||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||
|
||||
my $conn = $self->_find_message_connection($msg);
|
||||
|
||||
$conn->touch_msg($msg);
|
||||
return 1;
|
||||
}
|
||||
*touch_message = \&touch_msg;
|
||||
|
||||
sub _find_and_delete_message_connection {
|
||||
my ($self, $msg) = @_;
|
||||
sub _find_message_connection {
|
||||
my ($self, $msg, $opts) = @_;
|
||||
$opts = {} unless ref $opts;
|
||||
|
||||
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
||||
|
||||
my $conn = delete($self->{routing}->{$id});
|
||||
|
||||
if ( !$conn ) {
|
||||
croak "WARN: Could not find the connection to route msg $id";
|
||||
}
|
||||
my $conn = $self->{routing}{$id};
|
||||
delete($self->{routing}{$id}) if $opts->{delete};
|
||||
|
||||
croak "WARN: Could not find the connection to route msg $id" unless $conn;
|
||||
|
||||
return $conn;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user