Compare commits
6 Commits
nsqlookupd
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3fb375d10 | ||
|
|
1c5a3526fd | ||
|
|
f56271c965 | ||
|
|
681939809d | ||
|
|
dca911fee8 | ||
|
|
a08049ee0f |
@@ -38,15 +38,27 @@ sub publish {
|
||||
my $conn = $self->_random_connected_conn;
|
||||
croak "ERROR: there no active connections at this moment," unless $conn;
|
||||
|
||||
my @args;
|
||||
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
|
||||
my $cb = pop @data;
|
||||
push @data, sub { $cb->($self, $topic, \@data, @_) }
|
||||
if $cb;
|
||||
|
||||
if ($cb) {
|
||||
my @cb_data = @data;
|
||||
push @data, sub { $cb->($self, $topic, \@cb_data, @_) }
|
||||
}
|
||||
}
|
||||
|
||||
return $conn->publish($topic, @data);
|
||||
}
|
||||
|
||||
sub ready {
|
||||
my ($self, $ready_count) = @_;
|
||||
|
||||
$_->{conn}->ready($ready_count) for values %{ $self->{nsqd_conns} };
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
#### Argument parsing
|
||||
|
||||
|
||||
@@ -58,8 +58,8 @@ sub _identified {
|
||||
|
||||
## Keep the connection in the registry in case the user
|
||||
## wants to issue FIN/REQs inside the callback
|
||||
$self->{routing}->{$msg->{message_id}} = $conn;
|
||||
weaken( $self->{routing}->{$msg->{message_id}} );
|
||||
$self->{routing}->{ $msg->{message_id} } = $conn;
|
||||
weaken($self->{routing}->{ $msg->{message_id} });
|
||||
|
||||
$self->{message_cb}->($self, $msg);
|
||||
}
|
||||
@@ -71,7 +71,7 @@ sub _identified {
|
||||
sub mark_as_done_msg {
|
||||
my ($self, $msg) = @_;
|
||||
|
||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||
my $conn = $self->_find_message_connection($msg, {delete => 1});
|
||||
|
||||
$conn->mark_as_done_msg($msg);
|
||||
return 1;
|
||||
@@ -80,31 +80,32 @@ sub mark_as_done_msg {
|
||||
sub requeue_msg {
|
||||
my ($self, $msg, $delay) = @_;
|
||||
|
||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||
my $conn = $self->_find_message_connection($msg, {delete => 1});
|
||||
|
||||
$conn->requeue_msg($msg, $delay);
|
||||
return 1;
|
||||
}
|
||||
|
||||
sub touch_message {
|
||||
sub touch_msg {
|
||||
my ($self, $msg) = @_;
|
||||
|
||||
my $conn = $self->_find_and_delete_message_connection($msg);
|
||||
my $conn = $self->_find_message_connection($msg);
|
||||
|
||||
$conn->touch_msg($msg);
|
||||
return 1;
|
||||
}
|
||||
*touch_message = \&touch_msg;
|
||||
|
||||
sub _find_and_delete_message_connection {
|
||||
my ($self, $msg) = @_;
|
||||
sub _find_message_connection {
|
||||
my ($self, $msg, $opts) = @_;
|
||||
$opts = {} unless ref $opts;
|
||||
|
||||
my $id = ref($msg) ? $msg->{message_id} : $msg;
|
||||
|
||||
my $conn = delete($self->{routing}->{$id});
|
||||
my $conn = $self->{routing}{$id};
|
||||
delete($self->{routing}{$id}) if $opts->{delete};
|
||||
|
||||
if ( !$conn ) {
|
||||
croak "WARN: Could not find the connection to route msg $id";
|
||||
}
|
||||
croak "WARN: Could not find the connection to route msg $id" unless $conn;
|
||||
|
||||
return $conn;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user