Refactor _start_recv_frames into smaller methods:

* added _process_incoming_frame to pick the correct frame type processor;
* added _process_success_frame, _process_error_frame and
  _process_message_frame:
  * they should return undef to give up;
  * or any truth value to keep reading frames.
* renamed _on_success_frame to _on_next_success_frame;
* we no longer die on unknown frame types, we just ignore them.

Signed-off-by: Pedro Melo <melo@simplicidade.org>
This commit is contained in:
Pedro Melo
2014-07-20 17:43:47 +01:00
parent 36c67c9586
commit d7a830f0f8

View File

@@ -94,7 +94,7 @@ sub identify {
$hdl->push_write(pack('N', length($data)));
$hdl->push_write($data);
$self->_on_success_frame(sub { $cb->($self, $self->{identify_info} = $_[1]) });
$self->_on_next_success_frame(sub { $cb->($self, $self->{identify_info} = $_[1]) });
return;
}
@@ -106,7 +106,7 @@ sub subscribe {
$self->{message_cb} = $cb;
$hdl->push_write("SUB $topic $chan\012");
$self->_on_success_frame(sub { }); ## We don't care about the success ok
$self->_on_next_success_frame(sub { }); ## We don't care about the success ok, and errors will kill us
return;
}
@@ -238,10 +238,9 @@ sub _send_magic_identifier { $_[0]{handle}->push_write(' V2') }
sub _start_recv_frames {
my ($self) = @_;
my $hdl = $self->{handle};
my $err_cb = $self->{error_cb};
my @read_frame; ## on a separate line, we need it circular
@read_frame = (
my @push_read_setup;
@push_read_setup = (
chunk => 8,
sub {
my ($size, $frame_type) = unpack('NN', $_[1]);
@@ -249,36 +248,74 @@ sub _start_recv_frames {
chunk => $size - 4, ## remove size of frame_type...
sub {
my ($msg) = $_[1];
# print STDERR ">>>> FRAME $size, $frame_type";
if ($frame_type == 0) { ## OK frame
# print STDERR ", success: $msg\n";
my $info = {};
my $action = $self->_process_incoming_frame($frame_type, $msg);
return $hdl->push_read(@push_read_setup) if $action;
$self->_force_disconnect;
}
);
}
);
## Start with first frame...
$hdl->push_read(@push_read_setup);
}
## Decide which type of frame we have, and take care of it
sub _process_incoming_frame {
my ($self, $frame_type, $msg) = @_;
my $res;
if ($frame_type == 0) { $res = $self->_process_success_frame($msg) }
elsif ($frame_type == 1) { $res = $self->_process_error_frame($msg) }
elsif ($frame_type == 2) { $res = $self->_process_message_frame($msg) }
# no else, be liberal in what you accept and all of that...
return $res;
}
## Proces success frames, both plain OK, JSON-encoded and _heartbeat_ success messages
sub _process_success_frame {
my ($self, $msg) = @_;
if ($msg eq '_heartbeat_') {
$self->nop;
}
else {
if ($msg ne 'OK') {
$info = eval { JSON::XS::decode_json($msg) };
my $info = eval { JSON::XS::decode_json($msg) };
unless ($info) {
$err_cb->($self, qq{unexpected/invalid JSON response '$msg'});
$self->_force_disconnect;
@read_frame = ();
$self->_log_error(qq{unexpected/invalid JSON response '$msg'});
return;
}
my $cb = shift @{ $self->{success_cb_queue} || [] };
$cb->($self, $info) if $cb;
}
}
return 'keep_reading_frames';
}
elsif ($frame_type == 1) { ## error frame
# print STDERR ", error: $msg\n";
$self->{error_cb}->($self, qq{received error '$msg'});
$self->_force_disconnect;
@read_frame = ();
## Manage queue of pending success frame handlers
sub _on_next_success_frame { push @{ $_[0]->{success_cb_queue} }, $_[1] }
## Processing of error messages, just signal and
sub _process_error_frame {
my ($self, $msg) = @_;
$self->_log_error(qq{received error frame '$msg'});
return;
}
elsif ($frame_type == 2) { ## message frame
## Process regular message frames, callback, and deal with RDY state
sub _process_message_frame {
my ($self, $msg) = @_;
my ($t1, $t2, $attempts, $message_id) = unpack('NNnA16', substr($msg, 0, 26, ''));
$msg = {
attempts => $attempts,
@@ -286,38 +323,16 @@ sub _start_recv_frames {
tstamp => ($t2 | ($t1 << 32)),
message => $msg,
};
# print STDERR ", msg: $attempts, $message_id, $msg\n";
$self->{in_flight}++;
$self->{message_cb}->($self, $msg) if $self->{message_cb};
## FIXME: this logic was more of infered than learned, but I remember seeing 25% somewhere
# print STDERR ">>>> READY CHECK? $self->{in_flight} / $self->{ready_count}\n";
## FIXME: move RDY state processing to Reader
$self->ready($self->{ready_count})
if $self->{ready_count} and $self->{in_flight} / $self->{ready_count} > .25;
}
else {
$err_cb->($self, qq{unexpected frame type '$frame_type'});
$self->_force_disconnect;
@read_frame = ();
return;
}
## ... and keep reading those frames
# print STDERR ">>>> RESET FRAME READER\n";
$hdl->push_read(@read_frame);
}
);
}
);
## Start with first frame...
$hdl->push_read(@read_frame);
}
sub _on_success_frame {
my ($self, $cb) = @_;
push @{ $self->{success_cb_queue} }, $cb;
return 'keep_reading_frames';
}