Compare commits

6 Commits

Author SHA1 Message Date
Pedro Melo
f3fb375d10 Fix memory leak, circular reference on publish()
Reported, analised and fixed by Tiago Quintela and Nuno Mota.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-02-28 07:49:45 +00:00
Pedro Melo
1c5a3526fd Tidy
Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-01-02 12:46:29 +00:00
Pedro Melo
f56271c965 Refactor _find_and_delete_message_connection as _find_message_connection:
Not all commands need the delete part, touch_msg for example

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-01-02 12:46:23 +00:00
Pedro Melo
681939809d Rename touch_message to touch_msg, for consistency:
Kept the old name around for now...

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2015-01-02 12:45:22 +00:00
Pedro Melo
dca911fee8 Tidy
Signed-off-by: Pedro Melo <melo@simplicidade.org>
2014-10-23 07:08:45 +01:00
Pedro Melo
a08049ee0f Add ready() to ::Client
Allow clients to update the ready count dynamically

Signed-off-by: Pedro Melo <melo@simplicidade.org>
2014-10-23 07:08:41 +01:00
2 changed files with 32 additions and 19 deletions

View File

@@ -38,15 +38,27 @@ sub publish {
my $conn = $self->_random_connected_conn; my $conn = $self->_random_connected_conn;
croak "ERROR: there no active connections at this moment," unless $conn; croak "ERROR: there no active connections at this moment," unless $conn;
my @args;
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) { if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
my $cb = pop @data; my $cb = pop @data;
push @data, sub { $cb->($self, $topic, \@data, @_) }
if $cb; if ($cb) {
my @cb_data = @data;
push @data, sub { $cb->($self, $topic, \@cb_data, @_) }
}
} }
return $conn->publish($topic, @data); return $conn->publish($topic, @data);
} }
sub ready {
my ($self, $ready_count) = @_;
$_->{conn}->ready($ready_count) for values %{ $self->{nsqd_conns} };
return;
}
#### Argument parsing #### Argument parsing
@@ -57,8 +69,8 @@ sub _parse_args {
$self->{disconnect_cb} = delete($args->{disconnect_cb}) || sub { }; $self->{disconnect_cb} = delete($args->{disconnect_cb}) || sub { };
$self->{error_cb} = delete($args->{error_cb}) || sub { croak($_[1]) }; $self->{error_cb} = delete($args->{error_cb}) || sub { croak($_[1]) };
$self->{connect_cb} = delete($args->{connect_cb}) || sub { }; $self->{connect_cb} = delete($args->{connect_cb}) || sub { };
$self->{identify_cb} = delete($args->{identify_cb}) || sub { }; $self->{identify_cb} = delete($args->{identify_cb}) || sub { };
for my $arg (qw( client_id hostname connect_timeout )) { for my $arg (qw( client_id hostname connect_timeout )) {
$self->{$arg} = delete($args->{$arg}) if exists $args->{$arg}; $self->{$arg} = delete($args->{$arg}) if exists $args->{$arg};

View File

@@ -58,8 +58,8 @@ sub _identified {
## Keep the connection in the registry in case the user ## Keep the connection in the registry in case the user
## wants to issue FIN/REQs inside the callback ## wants to issue FIN/REQs inside the callback
$self->{routing}->{$msg->{message_id}} = $conn; $self->{routing}->{ $msg->{message_id} } = $conn;
weaken( $self->{routing}->{$msg->{message_id}} ); weaken($self->{routing}->{ $msg->{message_id} });
$self->{message_cb}->($self, $msg); $self->{message_cb}->($self, $msg);
} }
@@ -71,40 +71,41 @@ sub _identified {
sub mark_as_done_msg { sub mark_as_done_msg {
my ($self, $msg) = @_; my ($self, $msg) = @_;
my $conn = $self->_find_and_delete_message_connection($msg); my $conn = $self->_find_message_connection($msg, {delete => 1});
$conn->mark_as_done_msg($msg); $conn->mark_as_done_msg($msg);
return 1; return 1;
} }
sub requeue_msg { sub requeue_msg {
my ($self, $msg, $delay) = @_; my ($self, $msg, $delay) = @_;
my $conn = $self->_find_and_delete_message_connection($msg); my $conn = $self->_find_message_connection($msg, {delete => 1});
$conn->requeue_msg($msg, $delay); $conn->requeue_msg($msg, $delay);
return 1; return 1;
} }
sub touch_message { sub touch_msg {
my ($self, $msg) = @_; my ($self, $msg) = @_;
my $conn = $self->_find_and_delete_message_connection($msg); my $conn = $self->_find_message_connection($msg);
$conn->touch_msg($msg); $conn->touch_msg($msg);
return 1; return 1;
} }
*touch_message = \&touch_msg;
sub _find_and_delete_message_connection { sub _find_message_connection {
my ($self, $msg) = @_; my ($self, $msg, $opts) = @_;
$opts = {} unless ref $opts;
my $id = ref($msg) ? $msg->{message_id} : $msg; my $id = ref($msg) ? $msg->{message_id} : $msg;
my $conn = delete($self->{routing}->{$id}); my $conn = $self->{routing}{$id};
delete($self->{routing}{$id}) if $opts->{delete};
if ( !$conn ) {
croak "WARN: Could not find the connection to route msg $id"; croak "WARN: Could not find the connection to route msg $id" unless $conn;
}
return $conn; return $conn;
} }