Refactor Connection creation in Reader:
* easier to understand; * added support for client_id and hostname from Reader => Connection. Signed-off-by: Pedro Melo <melo@simplicidade.org>
This commit is contained in:
@@ -78,38 +78,41 @@ sub _start_nsqd_connection {
|
||||
my ($host, $port) = AnyEvent::Socket::parse_hostport($nsqd_tcp_address, 4150); ## 4150 is the default port for nsqd
|
||||
croak(qq{FATAL: could not parse '$nsqd_tcp_address' as a valid address/port combination}) unless $host and $port;
|
||||
|
||||
$conns->{$nsqd_tcp_address}{conn} = AnyEvent::NSQ::Connection->new(
|
||||
host => $host,
|
||||
port => $port,
|
||||
error_cb => $self->{error_cb},
|
||||
connect_cb => sub {
|
||||
my ($conn) = @_;
|
||||
$conn->identify(
|
||||
sub {
|
||||
$conn->subscribe(
|
||||
$self->{topic},
|
||||
$self->{channel},
|
||||
sub {
|
||||
## FIXME: bless it with the future AnyEvent::NSQ::Message
|
||||
my $msg = $_[1];
|
||||
my %conn = (host => $host, port => $port);
|
||||
for my $arg (qw( client_id hostname error_cb )) {
|
||||
$conn{$arg} = $self->{$arg} if exists $self->{$arg};
|
||||
}
|
||||
|
||||
my $action = $self->{message_cb}->($self, $msg);
|
||||
$conn{connect_cb} = sub {
|
||||
my ($conn) = @_;
|
||||
$conn->identify(
|
||||
sub {
|
||||
$conn->subscribe(
|
||||
$self->{topic},
|
||||
$self->{channel},
|
||||
sub {
|
||||
## FIXME: bless it with the future AnyEvent::NSQ::Message
|
||||
my $msg = $_[1];
|
||||
|
||||
if (not defined $action) { $conn->mark_as_done_msg($_[1]) }
|
||||
elsif ($action >= 0) { $conn->requeue_msg($_[1], $action) }
|
||||
else { $conn->touch_msg($_[1], -$action) }
|
||||
}
|
||||
);
|
||||
my $action = $self->{message_cb}->($self, $msg);
|
||||
|
||||
$conn->ready($self->{ready_count} || int(($_[1]{max_rdy_count} || 2000) / 10));
|
||||
}
|
||||
);
|
||||
},
|
||||
disconnect_cb => sub {
|
||||
## FIXME: deal with reconnects here
|
||||
$self->{disconnect_cb}->(@_) if $self->{disconnect_cb};
|
||||
},
|
||||
);
|
||||
if (not defined $action) { $conn->mark_as_done_msg($_[1]) }
|
||||
elsif ($action >= 0) { $conn->requeue_msg($_[1], $action) }
|
||||
else { $conn->touch_msg($_[1], -$action) }
|
||||
}
|
||||
);
|
||||
|
||||
$conn->ready($self->{ready_count} || int(($_[1]{max_rdy_count} || 2000) / 10));
|
||||
}
|
||||
);
|
||||
};
|
||||
|
||||
$conn{disconnect_cb} = sub {
|
||||
## FIXME: deal with reconnects here
|
||||
$self->{disconnect_cb}->(@_) if $self->{disconnect_cb};
|
||||
};
|
||||
|
||||
$conns->{$nsqd_tcp_address}{conn} = AnyEvent::NSQ::Connection->new(%conn);
|
||||
$conns->{$nsqd_tcp_address}{state} = 'connecting';
|
||||
|
||||
return;
|
||||
|
||||
Reference in New Issue
Block a user