Update publish() on both Connection and Writer to use MPUB if need be:

We accept a list of messages now, before the trailing optional callback.

If multiple messages are present, we switch to MPUB.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
This commit is contained in:
Pedro Melo
2014-07-21 05:13:36 +01:00
parent 1ff696d616
commit 0d8f13b950
2 changed files with 21 additions and 11 deletions

View File

@@ -130,15 +130,23 @@ sub subscribe {
} }
sub publish { sub publish {
my ($self, $topic, $data, $cb) = @_; my ($self, $topic, @data) = @_;
return unless my $hdl = $self->{handle}; return unless my $hdl = $self->{handle};
$cb = sub { } my $cb;
unless $cb; $cb = pop @data if ref($data[-1]) eq 'CODE' or !defined($data[-1]);
return unless @data;
$hdl->push_write("PUB $topic\012" . pack('N', length($data)) . $data); my $body = join('', map { pack('N', length($_)) . $_ } @data);
$self->_on_next_success_frame($cb); if (@data == 1) {
$hdl->push_write("PUB $topic\012$body");
$self->_on_next_success_frame($cb);
}
else {
$hdl->push_write("MPUB $topic\012" . pack('N', length($body)) . pack('N', scalar(@data)) . $body);
$self->_on_next_success_frame($cb);
}
return; return;
} }

View File

@@ -12,19 +12,21 @@ use parent 'AnyEvent::NSQ::Client';
#### Producer API #### Producer API
## Publish a single message - callback is only called if we succedd ## Publish a single or multiple message - callback is only called if we succedd
sub publish { sub publish {
my ($self, $topic, $data, $cb) = @_; my ($self, $topic, @data) = @_;
my $conn = $self->_random_connected_conn; my $conn = $self->_random_connected_conn;
croak "ERROR: there no active connections at this moment," unless $conn; croak "ERROR: there no active connections at this moment," unless $conn;
$cb = sub { $cb->($self, $topic, $data, @_) } if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
if $cb; my $cb = pop @data;
push @data, sub { $cb->($self, $topic, \@data, @_) }
if $cb;
}
return $conn->publish($topic, $data, $cb); return $conn->publish($topic, @data);
} }
1; 1;