2016年3月19日土曜日

N元連立1次方程式の解法プログラム(マルチスレッド版)

前、Fock-Joinフレームワークを使ったフィボナッチ数列を書いたとき、
連立方程式に応用したらもっとはやく計算できるんじゃね?と思ったので、作ってみた
連立方程式は前回同様クラメルの公式。今更だけど、連立方程式は掃き出し法が最強だと思う。。

前回との演算の違い

前の行列式を求めるロジックはこんな感じ
※総和を返すと書いているが、厳密には総和ではなく (-1)j * a0j の係数が付きます

これを単一スレッドで処理すればそりゃ遅くなる。
そこで今回は上図の水色部分をすべてスレッドにして平行処理させようと思う


プログラム

まずはマルチタスクにする行列式を求めるタスククラスから
継承元はRecursiveTask。型パラメータは戻値になるので行列式を返すDoubleで
メソッドはこの3つ
メソッド名効果
コンストラクタ行列式計算用の行列を設定
compute()並行処理内容。めっちゃ行列式の演算スレッド生成
演算ちょっと楽にするために3元まではサラス法や
余因子展開の行列を作るメソッド余因子行列をつくる
注目はcompute()のdefaultに入ったとき。
このときに余因子の数だけ並行処理して余因子の行列式を求める
import java.util.concurrent.RecursiveTask;

/** 行列式を求めるタスククラス. */
class Determinant extends RecursiveTask<Double> {
  /** 行列式を求める対象の行列 */
  final double[][] matrix;
 
  /** 行列式のコンストラクタ
   * @param matrix 行列
   */
  Determinant(double[][] matrix) {
    this.matrix = matrix;
  }
 
  /**
   * 並列処理
   * 3×3行列まではサラス法
   * それ以降は余因子展開から求めていく
   */
  @Override
  protected Double compute() {
    double ret = 0;
    switch (this.matrix.length) {
      case 1:
        ret = matrix[0][0];
        break;
 
      case 2:
        ret = matrix[0][0] * matrix[1][1]
          - matrix[1][0] * matrix[0][1];
        break;
        
      case 3:
        ret = matrix[0][0] * matrix[1][1] * matrix[2][2] 
          + matrix[1][0] * matrix[2][1] * matrix[0][2]
          + matrix[2][0] * matrix[0][1] * matrix[1][2]
          - matrix[0][2] * matrix[1][1] * matrix[2][0]
          - matrix[1][0] * matrix[0][1] * matrix[2][2]
          - matrix[2][1] * matrix[1][2] * matrix[0][0];
        break;
        
      default:
        Determinant[] dets = new Determinant[matrix.length];
        for (int j = 0; j < matrix.length; j++) {
          dets[j] = new Determinant(getLaplaceMarix(this.matrix, 0, j));
          if(j == 0) {
            int sign = ((j % 2) == 0) ? 1 : -1;
            ret += sign * matrix[j][0] * dets[j].compute();
          } else {
            dets[j].fork();
          }
        }
 
        for (int j = 1; j < matrix.length; j++) {
          int sign = ((j % 2) == 0) ? 1 : -1;
          ret += sign * matrix[j][0] * dets[j].join();
        }
 
        break;
      }
      return ret;
    }
 
  /**
   * 余因子行列を生成
   * @param matrix 元の行列
   * @param line   縦
   * @param col  横
   * @return 余因子行列
   */
  static double[][] getLaplaceMarix(double[][] matrix, int line, int col) {
    int len = matrix.length;
     // 余因子展開で使う行列
    double laplace_marix[][] = new double[len - 1][len - 1];
 
    for(int i = 0, y = 0; i < len; i++) {
      if(i == col) continue;
      for(int j = 0, x = 0; j < len; j++){
        if(j == line)continue;
        laplace_marix[y][x] = matrix[i][j];
        x++;
      }
      y++;
    }
    return laplace_marix;
  }
}
で、この行列式演算タスククラスを使って、連立方程式を組み立てるのはメイン文
必要な行列式のタスク数はN+1個(分母+解の個数分)なので、Fork-Joinのプールにそれぞれをサブミットで並行処理させてく
サブミットの戻値であるFutureを保持し、演算終了後に結果を取得し演算を返す
フィボナッチ数列を求めるときに使ってたインボークだと、1つの行列式を求めるのには問題ないが、今回はN+1つの行列式すべてを平行に動かすので、サブミットを使用
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ExecutionException;

public class Main {
  // メイン処理
  public static void main(String[] args) {
    // 連立方程式を行列化
    final double matrix[][] = {
        // x1 x2 x3 x4 b ※bは移行してるわけではないので、右辺のbを書けばOK
        { 2.0, 1.0, 3.0, 4.0, 2.0 }, // 式1: 2x + y + 3z + 4w = 2
        { 3.0, 2.0, 5.0, 2.0, 12.0 }, // 式2: 3x + 2y + 5z + 2w = 12
        { 3.0, 4.0, 1.0, -1.0, 4.0 }, // 式3: 3x + 4y + z - 1w = 4
        { -1.0, -3.0, 1.0, 3.0, -1.0 } // 式4:-1x - 3y + z + 3w = -1
    };
 
    // 演算の準備
    final ForkJoinPool pool = new ForkJoinPool();
    final java.util.List<ForkJoinTask<Double>> ret = new java.util.ArrayList<>();
 
    // 行列式の演算開始( 分母 -> 分子の末尾から降順 )
    for (int i = matrix.length; i >= 0; i--) {
      double[][] calc_matrix = changeColumn(matrix, i, matrix.length);
      Determinant task = new Determinant(calc_matrix);
      ret.add(pool.submit(task));
    }
 
    // 結果の出力
    try {
      // とりあえず共通の分母を先に取得
      double denominator = ret.remove(0).get();
 
      // 分子を昇順で取得し、解を計算・表示させていく
      java.util.Collections.reverse(ret);
      for (ForkJoinTask<Double> task : ret) {
        double result = task.get() / denominator;
        System.out.println("result = " + result);
      }
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
 
    // プールは最後に終了させる
    pool.shutdown();
  }
 
  // 行列の列(col1とcol2)を入れ替えた新しい行列を生成するメソッド
  static double[][] changeColumn(double[][] matrix, int col1, int col2) {
    // 列の入れ替え用に配列をコピーする
    double[][] temp_matrix = new double[matrix.length][];
    int j = 0;
    for (double[] m : matrix) temp_matrix[j++] = m.clone();
 
    // 列の入れ替え
    int n = matrix.length;
    for (int i = 0; i < n; i++) {
      double temp = temp_matrix[i][col1];
      temp_matrix[i][col1] = temp_matrix[i][col2];
      temp_matrix[i][col2] = temp;
    }
    return temp_matrix;
  }
}


演算処理結果

どんぐらいはやくなったのか実験
2~13元の1次連立方程式を用意し、それぞれの計算時間を計測した
使用した計算式はこんな感じ
結果
n元
方程式
シングル
スレッド[sec]
マルチ
スレッド[sec]
20.000 0.015
30.000 0.000
40.000 0.000
50.000 0.000
60.000 0.015
70.000 0.015
80.016 0.016
90.172 0.079
101.844 0.919
11 21.81714.468
12284.669139.296
134110.5932399.709
グラフ化すると、
図:マルチ・シングル処理における連立方程式解速度


10元くらいまではそんなに変わらないけど、11から大体半分くらいの速度になるね
15元くらいになると、半日かけても処理が終わらなかった。やはり、高い元を有する連立方程式はやはり掃き出し法が最強か。。

0 件のコメント:

コメントを投稿