Jenkinsのpipelineを使うと、Jenkinsfileとしてジョブの定義・連携をGroovyで柔軟に記述できるようになります。そこでは例えば「変数を用いてノードやステージを横断する連携を実装する」「いろいろな外部スクリプトを実行して最適な実行条件を分析する」といった処理を容易に実装できます。
今回はその具体例として、pytestのテスト実行時間の最適化を行うJenkinsfileを題材にします。
pytestのテスト並列実行の時間を最適化する
対象は、pytestの2並列実行のジョブです。具体的には、Jenkins Pipelineで指定されたgitリポジトリから必要なファイルをチェックアウトし、その中のpytestのテストケースをノードサーバで実行して、結果をプッシュ& Jenkins Test Report出力する処理とします。実現にあたっては、(仮にテストケースが長大であるとして)ノードサーバを2つ用意し、テストを2分割してそれぞれのノードで並列実行させることで、テスト実行時間の短縮を狙います。
このテストの2分割ですが、旧来の静的なジョブ定義では、テストケース数などで事前に静的分割せざるを得ない場合が多いです。一方でJenkins Pipelineを用いると、テスト実行時間が均等に分割されるように、テストケースを動的に2分割する処理を容易に実装できます。
Jenkinsfileの実装
そのJenkinsfileの具体例を示します。ここでは以下を実装しています。
- Prepareステージで、create_test_selection_file.py(実装例は後述)をシェル実行し、その標準出力をtest_selectionに格納します。
- Testステージで、2つのノード:node1、node2にて、pytestのテストケースを並行実行します。pytestには、前ステージで得たtest_selectionを引数として指定します。
- Reportステージで、pytestが生成したテストレポートファイルのプッシュと、Jenkins Test Reportへの出力を行います。
まとめると外に用意した集計スクリプトを呼び出して、ノードで実行するテストケースを動的に選択しています。
Jenkinsfileの実装例:
pipeline { environment { test_selection = "" } agent any stages { stage('Prepare') { steps { script { def output = sh ( script: 'python create_test_selection_file.py', returnStdout: true ).trim() test_selection = output.split(',') } } } stage('Test') { steps { parallel 'automation test':{ node('node1') { checkout scm sh "pytest ${test_selection[0]} --junitxml=testresult/result1.xml" sh "python push_test_report.py testresult" } node('node2') { checkout scm sh "pytest ${test_selection[1]} --junitxml=testresult/result2.xml" sh "python push_test_report.py testresult" } } } } stage('Report'){ steps { git branch: 'main', url: 'git://hoge/integration-test.git' } post { always { junit 'intagration_test/testresult/*.xml' } } } } }
create_test_selection_file.pyの実装
(主題ではないため簡易的なサンプルですが)例えば以下のようなものになります。
import glob import xml.etree.ElementTree as ET import numpy as np NUMBER_OF_TESTNODE = 2 def select_test_by_time(test_files, resultfiles): test_result = {} all_time = 0.0 for file in resultfiles: root = ET.parse(file).getroot() all_time += float(root.attrib['time']) for testcase in root.iter('testcase'): test_result[testcase.attrib['file']] = test_result.get(testcase.attrib['file'], 0) + float(testcase.attrib['time']) testsuite_time = 0.0 node_no = 0 output = "" for key, value in test_result.items(): testsuite_time += value output += key + ' ' if testsuite_time > all_time / NUMBER_OF_TESTNODE and node_no < NUMBER_OF_TESTNODE - 1: testsuite_time = 0 output += "," node_no += 1 output = output.replace('\\','/') print(output) def select_test_by_number(test_files): test_group = list(np.array_split(test_files, NUMBER_OF_TESTNODE)) output = '' for tests in test_group: output += ' '.join(tests) + ',' output = output.replace('\\','/') print(output) def create_test_selection_file(): test_file = 'test/test*.py' input_file = 'testresult/*.xml' test_files = glob.glob(test_file) resultfiles = glob.glob(input_file) if resultfiles: select_test_by_time(test_files, resultfiles) else: select_test_by_number(glob.glob(test_file)) if __name__ == '__main__': create_test_selection_file()