Move publish() public API to Client, even Readers get to publish sometimes

Signed-off-by: Pedro Melo <melo@simplicidade.org>
This commit is contained in:
Pedro Melo
2014-07-21 05:16:17 +01:00
parent 0d8f13b950
commit cfa7a7b4a5
2 changed files with 19 additions and 18 deletions

View File

@@ -11,7 +11,9 @@ use AnyEvent::Socket ();
use Carp 'croak';
use AnyEvent::NSQ::Connection;
#### Public API
## constructor
sub new {
my ($class, %args) = @_;
my $self = bless {}, $class;
@@ -22,12 +24,29 @@ sub new {
return $self;
}
## disconnect from our pool of nsqd connection
sub disconnect {
my ($self, $cb) = @_;
$_->{conn}->disconnect($cb) for values %{ $self->{nsqd_conns} };
}
## Publish a single or multiple message - callback is only called if we succedd
sub publish {
my ($self, $topic, @data) = @_;
my $conn = $self->_random_connected_conn;
croak "ERROR: there no active connections at this moment," unless $conn;
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
my $cb = pop @data;
push @data, sub { $cb->($self, $topic, \@data, @_) }
if $cb;
}
return $conn->publish($topic, @data);
}
#### Argument parsing

View File

@@ -10,23 +10,5 @@ use Carp 'croak';
use parent 'AnyEvent::NSQ::Client';
#### Producer API
## Publish a single or multiple message - callback is only called if we succedd
sub publish {
my ($self, $topic, @data) = @_;
my $conn = $self->_random_connected_conn;
croak "ERROR: there no active connections at this moment," unless $conn;
if (ref($data[-1]) eq 'CODE' or !defined($data[-1])) {
my $cb = pop @data;
push @data, sub { $cb->($self, $topic, \@data, @_) }
if $cb;
}
return $conn->publish($topic, @data);
}
1;