MySQL MMM利用時に自動的に再実行するClass::DBI用のプラグイン

MySQL MMM利用時に自動的に再実行するClass::DBI用のプラグイン

 MySQL MMMはなかなかよさげなのですが、writerの切り替え時に実行中のSQLがあるとエラーになってしまい困るわけですよ。

 AutoCommitで使っているので失敗したら再実行すればいいので、SQLを発行してるをフックしてやればなんとかなりそうかな? と思ってClass::DBISQLが実行してるあたりをごにょごにょするプラグインを作ってみた。

 まあイマドキClass::DBIでもない気もしますが稼働中のものですし、以前Class::DBI::Plugin::Iteratorを書いたときにも似たようなことをしてたので簡単にできそうだったってのもあります。


package Class::DBI::Plugin::AutoRetry;
use strict;
use warnings;

our $EXECUTE_RETRY_COUNT = 20;
our $CONNECT_RETRY_COUNT = 30;
our $WAIT_SEC = 2;

use Carp ();

our @EXPORT = qw(
    _chk_retry_errmsg
    _croak
    _auto_increment_value
    _insert_row
    delete
    update

    _mk_db_closure
);

sub _chk_retry_errmsg {
    my $self = shift;
    my $errmsg = $_[0];
    $errmsg =~ s/^DBD::mysql::[sd]t execute failed: //;
    if (   $errmsg =~ /^MySQL server has gone away/
        or $errmsg =~ /^Query execution was interrupted/
        or $errmsg =~ /^Lost connection to MySQL server/
        or $errmsg =~ /^The MySQL server is running with the --read-only option/
        or $errmsg =~ /^Server shutdown in progress/
    ) {
        sleep($WAIT_SEC);
        return 1;
    }
}

sub import {
    my $class = shift;
    my %options = map lc $_, @_;
    my $pkg   = caller(0);
    no strict 'refs';

    foreach my $meth (@EXPORT) {
        *{"$pkg\::$meth"} = \&$meth;
    }

    my $sth_to_objects = $pkg->can('sth_to_objects');
    my $scalar_sth_to_objects = sub {
        my $res;
        my $retry_count = $EXECUTE_RETRY_COUNT;
        {
            $res = eval { $sth_to_objects->(@_) };
            if ($@) {
                if ($_[0]->_chk_retry_errmsg($@) and --$retry_count) {
                    redo;
                }

                die $@;
            }
        }
        return $res;
    };
    my $array_sth_to_objects = sub {
        my @res;
        my $retry_count = $EXECUTE_RETRY_COUNT;
        {
            @res = eval { $sth_to_objects->(@_) };
            if ($@) {
                if ($_[0]->_chk_retry_errmsg($@) and --$retry_count) {
                    redo;
                }

                die $@;
            }
        }
        return @res;
    };

    delete ${"$pkg\::"}{sth_to_objects} if exists ${"$pkg\::"}{sth_to_objects};
    *{"$pkg\::_sth_to_objects"} = *{"$pkg\::sth_to_objects"} = sub {
        wantarray? $array_sth_to_objects->(@_): $scalar_sth_to_objects->(@_);
    };
}

## for Class::DBI
sub _croak {
    my ($self, $message, %info) = @_;
    if ($self->_chk_retry_errmsg($info{err})) {
        $message = $info{err};
    }

    Carp::croak($message || $self);
}

sub _auto_increment_value {
    my $self = shift;
    my $sth = shift;
    my $dbh = $sth->{Database};

    # Try to do this in a standard method. Fall back to MySQL/SQLite
    # specific versions. TODO remove these when last_insert_id is more
    # widespread.
    # Note: I don't believe the last_insert_id can be zero. We need to
    # switch to defined() checks if it can.
    my $id = $dbh->last_insert_id(undef, undef, $self->table, undef)    # std
        || $dbh->{mysql_insertid}                                         # mysql
        || eval { $dbh->func('last_insert_rowid') }
        or $self->_croak("Can't get last insert id");
    return $id;
}

sub _insert_row {
    my $self = shift;
    my $data = shift;

    my $retry_count = $EXECUTE_RETRY_COUNT;
    {
        eval {
            my @columns = keys %$data;
            my $sth     = $self->sql_MakeNewObj(
                join(', ', @columns),
                join(', ', map $self->_column_placeholder($_), @columns),
            );
            $self->_bind_param($sth, \@columns);
            $sth->execute(values %$data);
            my @primary_columns = $self->primary_columns;
            $data->{ $primary_columns[0] } = $self->_auto_increment_value($sth)
                if @primary_columns == 1
                && !defined $data->{ $primary_columns[0] };
        };
        if ($@) {
            if ($self->_chk_retry_errmsg($@) and --$retry_count) {
                redo;
            }

            my $class = ref $self;
            return $self->_db_error(
                msg    => "Can't insert new $class: $@",
                err    => $@,
                method => 'insert'
            );
        }
    }

    return 1;
}

sub delete {
    my $self = shift;
    return $self->_search_delete(@_) if not ref $self;
    $self->remove_from_object_index;
    $self->call_trigger('before_delete');

    my $retry_count = $EXECUTE_RETRY_COUNT;
    {
        eval { $self->sql_DeleteMe->execute($self->id) };
        if ($@) {
            if ($self->_chk_retry_errmsg($@) and --$retry_count) {
                redo;
            }

            return $self->_db_error(
                msg    => "Can't delete $self: $@",
                err    => $@,
                method => 'delete'
            );
        }
    }

    $self->call_trigger('after_delete');
    undef %$self;
    bless $self, 'Class::DBI::Object::Has::Been::Deleted';
    return 1;
}

sub update {
    my $self  = shift;
    my $class = ref($self)
        or return $self->_croak("Can't call update as a class method");

    $self->call_trigger('before_update');
    return -1 unless my @changed_cols = $self->is_changed;
    $self->call_trigger('deflate_for_update');
    my @primary_columns = $self->primary_columns;

    my $rows;
    my $retry_count = $EXECUTE_RETRY_COUNT;
    {
        my $sth             = $self->sql_update($self->_update_line);
        $class->_bind_param($sth, \@changed_cols);

        $rows = eval { $sth->execute($self->_update_vals, $self->id); };
        if ($@) {
            if ($self->_chk_retry_errmsg($@) and --$retry_count) {
                redo;
            }

            return $self->_db_error(
                msg    => "Can't update $self: $@",
                err    => $@,
                method => 'update'
            );
        }
    }

    # enable this once new fixed DBD::SQLite is released:
    if (0 and $rows != 1) {    # should always only update one row
        $self->_croak("Can't update $self: row not found") if $rows == 0;
        $self->_croak("Can't update $self: updated more than one row");
    }

    $self->call_trigger('after_update', discard_columns => \@changed_cols);

    # delete columns that changed (in case adding to DB modifies them again)
    $self->_attribute_delete(@changed_cols);
    delete $self->{__Changed};
    return 1;
}


## for Ima::DBI
sub _mk_db_closure {
    my ($class, $dsn, $user, $pass, $attr) = @_;
        $attr ||= {};
    
    my $dbh;
    my $process_id = $$;
    return sub {
        # set the PID in a private cache key to prevent us
        # from sharing one with the parent after fork.  This
        # is better than disconnecting the existing $dbh since
        # the parent may still need the connection open.  Note
        # that forking code also needs to set InactiveDestroy
        # on all open handles in the child or the connection
        # will be broken during DESTROY.
        $attr->{private_cache_key_pid} = $$;

                # reopen if this is a new process or if the connection
                # is bad
        if ($process_id != $$ or not ($dbh && $dbh->FETCH('Active') && $dbh->ping)) {
            my $retry_count = $CONNECT_RETRY_COUNT;
            {
                eval {
                    $dbh = DBI->connect_cached($dsn, $user, $pass, $attr);
                };
                if ($@) {
                    if ($@ =~ /Can't connect to MySQL server/ and --$retry_count) {
                        sleep($WAIT_SEC);
                        redo;
                    }

                    die $@;
                }
            }

            $process_id = $$;
        }

        return $dbh;
    };

}


1;

 更新系(insert/update/delete)については元々eval{}でエラーチェックしていたので、その部分を修正する形で対応。元メソッドを呼ばないのはtriggerを複数実行すると問題がありそうな気がしたから。

 set_sql()を使って自前で更新系を書いているときは_chk_retry_errmsg()を呼び出して自分でリトライするように修正する必要があるけれど。

 参照系は必ずsth_to_objects()を呼び出してるので、そこでエラーのチェックして再実行するようにしました。

 can()で元のメソッドを保存してるのは、Class::DBI::Plugin::Iteratorがsth_to_objects()を書き換えてるから。

 呼び出し元のコンテキストによって戻り値が違うので、それに合わせるようにしたのですが……もっといい方法はないのかな?

 ちなみに全件をfetchした結果を返しているので、途中でfetchに失敗した場合はSQLを再発行して全件再取得してる――はず。

 ひょっとしたらconnect()の$attrに{mysql_auto_reconnect => 1}を渡す必要があるかもしれません。

(いちおう渡してる)

 _auto_increment_value()は本家でも$sthからデータベースハンドルを取得するようにしたほうがいいんじゃないかなー、とかちょっと思った。

 アクセスが多いサイトでこんなことをすると死にそうな気もしますが、更新系があまり走らないようなサイトだとこんなでもなんとかなるんじゃないのかなー、みたいな。