Compare commits

6 Commits

Author SHA1 Message Date
Pedro Melo
f3fb375d10 Fix memory leak, circular reference on publish()
Reported, analised and fixed by Tiago Quintela and Nuno Mota.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-02-28 07:49:45 +00:00
Pedro Melo
1c5a3526fd Tidy
Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-01-02 12:46:29 +00:00
Pedro Melo
f56271c965 Refactor _find_and_delete_message_connection as _find_message_connection:
Not all commands need the delete part, touch_msg for example

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-01-02 12:46:23 +00:00
Pedro Melo
681939809d Rename touch_message to touch_msg, for consistency:
Kept the old name around for now...

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-01-02 12:45:22 +00:00
Pedro Melo
dca911fee8 Tidy
Signed-off-by: Pedro Melo <melo@simplicidade.org>
2014-10-23 07:08:45 +01:00
Pedro Melo
a08049ee0f Add ready() to ::Client
Allow clients to update the ready count dynamically

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2014-10-23 07:08:41 +01:00
2 changed files with 32 additions and 19 deletions

View File

@@ -38,15 +38,27 @@ sub publish {
my $conn = $self->_random_connected_conn;
croak "ERROR: there no active connections at this moment," unless $conn;
my @args;
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
my $cb = pop @data;
push @data, sub { $cb->($self, $topic, \@data, @_) }
if $cb;
if ($cb) {
my @cb_data = @data;
push @data, sub { $cb->($self, $topic, \@cb_data, @_) }
}
}
return $conn->publish($topic, @data);
}
sub ready {
my ($self, $ready_count) = @_;
$_->{conn}->ready($ready_count) for values %{ $self->{nsqd_conns} };
return;
}
#### Argument parsing

View File

@@ -71,7 +71,7 @@ sub _identified {
sub mark_as_done_msg {
my ($self, $msg) = @_;
my $conn = $self->_find_and_delete_message_connection($msg);
my $conn = $self->_find_message_connection($msg, {delete => 1});
$conn->mark_as_done_msg($msg);
return 1;
@@ -80,31 +80,32 @@ sub mark_as_done_msg {
sub requeue_msg {
my ($self, $msg, $delay) = @_;
my $conn = $self->_find_and_delete_message_connection($msg);
my $conn = $self->_find_message_connection($msg, {delete => 1});
$conn->requeue_msg($msg, $delay);
return 1;
}
sub touch_message {
sub touch_msg {
my ($self, $msg) = @_;
my $conn = $self->_find_and_delete_message_connection($msg);
my $conn = $self->_find_message_connection($msg);
$conn->touch_msg($msg);
return 1;
}
*touch_message = \&touch_msg;
sub _find_and_delete_message_connection {
my ($self, $msg) = @_;
sub _find_message_connection {
my ($self, $msg, $opts) = @_;
$opts = {} unless ref $opts;
my $id = ref($msg) ? $msg->{message_id} : $msg;
my $conn = delete($self->{routing}->{$id});
my $conn = $self->{routing}{$id};
delete($self->{routing}{$id}) if $opts->{delete};
if ( !$conn ) {
croak "WARN: Could not find the connection to route msg $id";
}
croak "WARN: Could not find the connection to route msg $id" unless $conn;
return $conn;
}