Compare commits
6 Commits
nsqlookupd
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3fb375d10 | ||
|
|
1c5a3526fd | ||
|
|
f56271c965 | ||
|
|
681939809d | ||
|
|
dca911fee8 | ||
|
|
a08049ee0f |
@@ -38,15 +38,27 @@ sub publish {
|
|||||||
my $conn = $self->_random_connected_conn;
|
my $conn = $self->_random_connected_conn;
|
||||||
croak "ERROR: there no active connections at this moment," unless $conn;
|
croak "ERROR: there no active connections at this moment," unless $conn;
|
||||||
|
|
||||||
|
my @args;
|
||||||
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
|
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
|
||||||
my $cb = pop @data;
|
my $cb = pop @data;
|
||||||
push @data, sub { $cb->($self, $topic, \@data, @_) }
|
|
||||||
if $cb;
|
if ($cb) {
|
||||||
|
my @cb_data = @data;
|
||||||
|
push @data, sub { $cb->($self, $topic, \@cb_data, @_) }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return $conn->publish($topic, @data);
|
return $conn->publish($topic, @data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sub ready {
|
||||||
|
my ($self, $ready_count) = @_;
|
||||||
|
|
||||||
|
$_->{conn}->ready($ready_count) for values %{ $self->{nsqd_conns} };
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#### Argument parsing
|
#### Argument parsing
|
||||||
|
|
||||||
@@ -57,8 +69,8 @@ sub _parse_args {
|
|||||||
$self->{disconnect_cb} = delete($args->{disconnect_cb}) || sub { };
|
$self->{disconnect_cb} = delete($args->{disconnect_cb}) || sub { };
|
||||||
$self->{error_cb} = delete($args->{error_cb}) || sub { croak($_[1]) };
|
$self->{error_cb} = delete($args->{error_cb}) || sub { croak($_[1]) };
|
||||||
|
|
||||||
$self->{connect_cb} = delete($args->{connect_cb}) || sub { };
|
$self->{connect_cb} = delete($args->{connect_cb}) || sub { };
|
||||||
$self->{identify_cb} = delete($args->{identify_cb}) || sub { };
|
$self->{identify_cb} = delete($args->{identify_cb}) || sub { };
|
||||||
|
|
||||||
for my $arg (qw( client_id hostname connect_timeout )) {
|
for my $arg (qw( client_id hostname connect_timeout )) {
|
||||||
$self->{$arg} = delete($args->{$arg}) if exists $args->{$arg};
|
$self->{$arg} = delete($args->{$arg}) if exists $args->{$arg};
|
||||||
|
|||||||
@@ -58,8 +58,8 @@ sub _identified {
|
|||||||
|
|
||||||
## Keep the connection in the registry in case the user
|
## Keep the connection in the registry in case the user
|
||||||
## wants to issue FIN/REQs inside the callback
|
## wants to issue FIN/REQs inside the callback
|
||||||
$self->{routing}->{$msg->{message_id}} = $conn;
|
$self->{routing}->{ $msg->{message_id} } = $conn;
|
||||||
weaken( $self->{routing}->{$msg->{message_id}} );
|
weaken($self->{routing}->{ $msg->{message_id} });
|
||||||
|
|
||||||
$self->{message_cb}->($self, $msg);
|
$self->{message_cb}->($self, $msg);
|
||||||
}
|
}
|
||||||
@@ -71,40 +71,41 @@ sub _identified {
|
|||||||
sub mark_as_done_msg {
|
sub mark_as_done_msg {
|
||||||
my ($self, $msg) = @_;
|
my ($self, $msg) = @_;
|
||||||
|
|
||||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
my $conn = $self->_find_message_connection($msg, {delete => 1});
|
||||||
|
|
||||||
$conn->mark_as_done_msg($msg);
|
$conn->mark_as_done_msg($msg);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub requeue_msg {
|
sub requeue_msg {
|
||||||
my ($self, $msg, $delay) = @_;
|
my ($self, $msg, $delay) = @_;
|
||||||
|
|
||||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
my $conn = $self->_find_message_connection($msg, {delete => 1});
|
||||||
|
|
||||||
$conn->requeue_msg($msg, $delay);
|
$conn->requeue_msg($msg, $delay);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
sub touch_message {
|
sub touch_msg {
|
||||||
my ($self, $msg) = @_;
|
my ($self, $msg) = @_;
|
||||||
|
|
||||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
my $conn = $self->_find_message_connection($msg);
|
||||||
|
|
||||||
$conn->touch_msg($msg);
|
$conn->touch_msg($msg);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
*touch_message = \&touch_msg;
|
||||||
|
|
||||||
sub _find_and_delete_message_connection {
|
sub _find_message_connection {
|
||||||
my ($self, $msg) = @_;
|
my ($self, $msg, $opts) = @_;
|
||||||
|
$opts = {} unless ref $opts;
|
||||||
|
|
||||||
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
||||||
|
|
||||||
my $conn = delete($self->{routing}->{$id});
|
my $conn = $self->{routing}{$id};
|
||||||
|
delete($self->{routing}{$id}) if $opts->{delete};
|
||||||
if ( !$conn ) {
|
|
||||||
croak "WARN: Could not find the connection to route msg $id";
|
croak "WARN: Could not find the connection to route msg $id" unless $conn;
|
||||||
}
|
|
||||||
|
|
||||||
return $conn;
|
return $conn;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user