tech.guitarrapc.cóm

Technical updates

PowerShell による同期処理、非同期処理、並列処理 を考えてみる

PowerShellの一番つらいところは、非同期なCmdletやキーワード*1が用意されていないことです。

そこで前回の記事で作成したコードを使って、同期、非同期、並列の3つのパターンに関して見てみましょう。

拙作のPowerShellによるDeployライブラリvalentiaでも大枠は同様に同期、非同期、並列に処理を行っています。

同期処理

サンプルリポジトリは前回同様に以下です。

guitarrapc/PS-WaybackMachineAvailavility | GitHub

まずはコードから見てみましょう。

function Get-WaybackMachineAvailavility
{
    [CmdletBinding()]
    Param
    (
        # Input an uri you want to search.
        [Parameter(
            Mandatory = 1,
            ValueFromPipeline,
            ValueFromPipelineByPropertyName,
            Position=0)]
        [string[]]
        $urls,


        # Input timestamp to obtain closed date you want. Make sure as format 'yyyyMMddHHmmss' or 'yyyy' or 'yyyyMM' or 'yyyyMMdd' or else.('2006' will tring to obtain closed to 2006)
        [Parameter(
            Mandatory = 0,
            Position=1)]
        [string]
        $timestamp,

        # Invoke request with async
        [switch]
        $async
    )

    Begin
    {
        # base settings for query
        $private:baseUri = "http://archive.org/wayback/available"
        $private:baseQuery = "?url="
        $private:timestampQuery = "&timestamp="
    }
    Process
    {
        foreach($url in $urls)
        {
            # build query
            $private:query = "$baseQuery{0}" -f $url | where {$_}

                # validate timestamp parameter for query
                if (-not [string]::IsNullOrWhiteSpace($PSBoundParameters.timestamp))
                {
                    $private:trimTimestampQuery = $PSBoundParameters.timestamp | where {$_}
                    $private:query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
                }

            # build query uri
            $private:queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

            # invoke request
            Write-Verbose ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
            Write-Verbose ("Whole query string '{0}'" -f $queryUri)

            # using Invoke-RestMethod
            $private:task = Invoke-RestMethod -Method Get -Uri $queryUri -UserAgent ("PowerShell {0}" -f $PSVersionTable.PSVersion)

            # get reuslt
            $private:result =  $task.archived_snapshots.closest

            # create sorted hashtable to create object
            $obj = [ordered]@{
                available = $result.available
                status = $result.status
                timestamp = $result.timestamp
                url = $result.url
                queryInformation = @{
                    url = $url
                    queryUri = $queryUri
                }
            }

            # create PSObject to output
            $output = New-Object -TypeName PSObject -Property $obj
            $output
        }
    }
}

解説

Begin{} Process{} End{}について

まず目につくのが、Begin{} Process{}です。

これらは、 Param()に記述している$urlsのパラメータValueFromPipelineValueFromPipelineByPropertyNameを処理するために必要です。

ValueFromPipelineValueFromPipelineByPropertyNameは、対象のパラメータをパイプラインから受けることが可能であることを宣言しています。

  • ValueFromPipeline : パイプラインからの入力を自分に割り当てる
  • ValueFromPipelineByPropertyName : パイプラインからの入力でプロパティ名が合致したものを自分に割り当てる

パイプライン越しの入力とは以下のような入力を指します。

# Synchronous pipeline invokation
"http://tech.guitarrapc.com","http://neue.cc" | Get-WaybackMachineAvailavility

このパイプラインからの入力で、繰り返し処理するために利用するのが、Begin{} Process{} End{}です。

  • Begin句の処理は初回に一度だけ実行される
  • 一方で、Process句の処理はパイプラインのたびに実行される
  • そしてここでは使っていませんが、End{}は、全てのProcess{}句の完了後、1回だけ実行される

Begin{} Process{} End{}を用いることで、 パイプラインの入力に関しては自動的に繰り返し実行されます。

Begin{}処理の内容

ここでは、 クエリの基本となる変数を定めています。

Process{}処理の内容

foreach の利用

foreachを利用しています。

これは、 Cmdletにurlsパラメータを直接した場合、Begin{} Process{} End{}では繰り返し処理されないためです。

つまり以下の指定にした場合、 Begin{} Process{} End{}では初めの1アドレスしか処理されません。

Get-WaybackMachineAvailavility -urls "http://tech.guitarrapc.com","http://neue.cc"

そのためforeachを使ってurlsパラメータに直接複数のアドレスを指定しても処理ができるようにしています。

クエリの生成

残りは、 クエリの生成とInvoke-RestMethodでのJSON取得です。

余り使っているのを見かけないのですが、 PowerShellでURIを生成するときは、以下のワンライナーが便利です。

(@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

こうすることで、パーツに指定した$baseUri$queryが空白だった場合でも正常にURIが生成できます。

Invoke-RestMethod

Invoke-RestMethodは、 Invoke-WebRequestと違い、返却されたJSONを解釈するように内部でごちゃごちゃ処理してくれています。*2

PowerShell 3.0では、対象のJSONによってはヘッダが途中までしか読めないなどRSSリーダーとして利用するには致命的なバグがありました。

が、 PowerShell 4.0でバグが修正され安心して利用できるようになっています。

後は、JSONが格納されたプロパティを指定してあげるだけです。

カスタムオブジェクト合成

最後にカスタムオブジェクトを生成しています。

簡単ですね。要領はこのやり方で残りの処理を見てみましょう。

非同期処理

メインテーマの非同期処理です。

最近になって、 Hey, Scripting Guy! Blog - Weekend Scripter: Max Out PowerShell in a Little Bit of Time—Part 2でも、runspaceを利用した非同期処理に関して紹介されるようになりました。

拙作のvalentiaでも、同様の手法を採用しています。

では、まずは非同期でのコードを見てみましょう。

function Get-WaybackMachineAvailavilityAsync
{
    [CmdletBinding()]
    Param
    (
        # Input an uri you want to search.
        [Parameter(
            Mandatory = 1,
            Position=0)]
        [string[]]
        $urls,


        # Input timestamp to obtain closed date you want. Make sure as format 'yyyyMMddHHmmss' or 'yyyy' or 'yyyyMM' or 'yyyyMMdd' or else.('2006' will tring to obtain closed to 2006)
        [Parameter(
            Mandatory = 0,
            Position=1)]
        [string]
        $timestamp
    )

    try
    {
        # create Runspace
        Write-Debug ("creating runspace for powershell")
        $private:sessionstate = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
        $private:minPoolSize = $maxPoolSize = 50 # 50 runspaces
        $private:runspacePool = [runspacefactory]::CreateRunspacePool($minPoolSize, $maxPoolSize,  $sessionstate, $Host) # create Runspace Pool
        $runspacePool.ApartmentState = "STA" # only STA mode supports
        $runspacePool.Open() # open pool


        # start process
        foreach ($url in $urls)
        {
            Write-Debug ("start creating command for '{0}'" -f $url)
            $command = {
                [CmdletBinding()]
                param
                (
                    [parameter(
                        mandatory,
                        position = 0)]
                    [string]
                    $url,

                    [parameter(
                        mandatory = 0,
                        position = 1)]
                    [int]
                    $timestamp,

                    [parameter(
                        mandatory = 0,
                        position = 2)]
                    [string]
                    $VerbosePreference
                )

                # change ErrorActionPreference
                Write-Debug "set continue with error as http client requires dispose when method done."
                $private:originalErrorActionPreference = $ErrorActionPreference
                $ErrorActionPreference = "Continue"

                # base settings for query
                $private:baseUri = "http://archive.org/wayback/available"
                $private:baseQuery = "?url="
                $private:timestampQuery = "&timestamp="

                # build query
                $private:query = "{0}{1}" -f $baseQuery, $url | where {$_}

                    # validate timestamp parameter for query
                    if (-not [string]::IsNullOrWhiteSpace($timestamp))
                    {
                        $private:trimTimestampQuery = $timestamp | where {$_}
                        $private:query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
                    }

                # build query uri
                $private:queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

                # Load Assembly to use HttpClient
                try
                {
                    Add-Type -AssemblyName System.Net.Http
                }
                catch
                {
                }

                # new HttpClient
                $httpClient = New-Object -TypeName System.Net.Http.HttpClient
                $httpClient.BaseAddress = $private:baseUri

                # invoke http client request
                Write-Verbose ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
                Write-Verbose ("Whole query string '{0}'" -f $queryUri)
                $private:task = $httpClient.GetStringAsync($queryUri)
                $task.wait()

                # return result
                $task

                # dispose HttpClient
                $httpClient.Dispose()

                # reverse ErrorActionPreference
                $ErrorActionPreference = $originalErrorActionPreference
            }

            # Verbose settings for Async Command inside
            Write-Debug "set VerbosePreference inside Asynchronous execution"
            if ($PSBoundParameters.Verbose.IsPresent)
            {
                $private:verbose = "continue"
            }
            else
            {
                $private:verbose = $VerbosePreference
            }

            # Main Invokation
            Write-Debug "start asynchronous invokation"
            $private:powershell = [PowerShell]::Create().AddScript($command).AddArgument($url).AddArgument($timestamp).AddArgument($verbose)
            $powershell.RunspacePool = $runspacePool
            [array]$private:RunspaceCollection += New-Object -TypeName PSObject -Property @{
                Runspace = $powershell.BeginInvoke();
                powershell = $powershell
            }
        }


        # check process result
        Write-Debug "check asynchronos execution has done"
        while (($runspaceCollection.RunSpace | sort IsCompleted -Unique).IsCompleted -ne $true)
        {
            sleep -Milliseconds 5
        }

        # get process result and end powershell session
        Write-Debug "obtain process result"
        foreach ($runspace in $runspaceCollection)
        {
            # obtain Asynchronos command result
            $private:task = $runspace.powershell.EndInvoke($runspace.Runspace)

            # show result
            if ($task.IsCompleted)
            {
                # get reuslt
                $private:result = ($task.Result | ConvertFrom-Json).archived_snapshots.closest
                # create sorted hashtable to create object
                $private:obj = [ordered]@{
                    available = $result.available
                    status = $result.status
                    timestamp = $result.timestamp
                    url = $result.url
                    queryInformation = @{
                        url = $url
                        queryUri = $queryUri
                    }
                }

                # create PSObject to output
                $private:output = New-Object -TypeName PSObject -Property $obj

                # return result into host
                $output
            }

            # Dispose pipeline
            $runspace.powershell.Dispose()
        }
    }
    finally
    {
        # Dispose Runspace
        $runspacePool.Dispose()
    }
}

解説

本題の非同期処理について説明します。非同期処理の流れです。

  1. RunspacePoolの生成
  2. RunspacePoolのオープン
  3. ターゲットごとに処理開始(非同期)
  4. ターゲットごとに実行するコマンドを生成(非同期)
  5. ターゲットごとにPowerShellインスタンスを生成(非同期)
  6. ターゲットごとに生成したPowerShellインスタンスへコマンドや引数を渡す(非同期)
  7. ターゲットごとに生成したPowerShellインスタンスへRunspaceを割り当てる(非同期)
  8. ターゲットごとに生成したPowerShellインスタンスを実行(非同期)
  9. 実行したコマンドの状態を監視(非同期)
  10. PowerShellインスタンスごとにコマンド実行結果を取得
  11. 生成したPowerShellインスタンスとRunspaceのクリーンアップ

Begin{} Process{} End{}について

パイプラインからのストリーム入力は1本です。そのためPipelineを使って非同期に処理できません。

これが、非同期処理でBegin{} Process{} End{}を用いたパイプライン入力をサポートしない理由です。

非同期のポイントは RunspacePoolの生成

try
{
    # create Runspace
    Write-Debug ("creating runspace for powershell")
    $private:sessionstate = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
    $private:minPoolSize = $maxPoolSize = 50 # 50 runspaces
    $private:runspacePool = [runspacefactory]::CreateRunspacePool($minPoolSize, $maxPoolSize,  $sessionstate, $Host) # create Runspace Pool
    $runspacePool.ApartmentState = "STA" # only STA mode supports

PowerShellでの非同期処理のポイントは[RunspaceFactory]を生成してマルチスレッドを実現することにあります。

単純にRunspaceを生成するだけなら、以下で行えます。

$RunspacePool = [RunspaceFactory]::CreateRunspacePool()

が、SessionStatePoolSizeを指定することでRunspacePoolサイズのハンドルが可能です。

CreateRunspacePool Methodについて、詳しくはDeveloper Networkをどうぞ。

Developer Network - RunspaceFactory.CreateRunspacePool Method.ASPX)

また、ここで生成したRunspacePoolは処理の成功如何にかかわりなく、破棄が必要なため、try{}finally{}としています。

RunspacePoolのオープン

$runspacePool.Open() # open pool

必要なサイズのRunspaceを生成したら、.Open()します。

コラム : Job は非同期ではない

時々、「Jobを使った非同期?」という謎の記事を見ることがありますが、これには日頃疑念を抱いています。

PowerShell 2.0において、Jobの考えや手法が導入されましたが、これを利用して非同期処理することは困難を極めます。なぜならStart-Job-AsJobが行っているのは、バックグラウンドに別のRunspaceを1つ生成して、バックグラウンド処理を行っているだけです。

つまり、Jobはマルチスレッドでないため、表面上ホスト処理を止めないだけで処理自体の高速化はされません。非同期でないことは計測すれば一目両全です。

もちろん、使い方次第で、通常の処理をバックグラウンドに回すことでロスをなくし高速化を果たすことは可能です。Jobの使いどころはそれほど多くありませんが、ハマルと気軽に使えて便利です。

foreach で 各urlを処理する

# start process
foreach ($url in $urls)
{

RunSpaceに入れた処理は、バックグラウンドで行われます。

そのため、foreachで順次実行してもそれほどのロスにはなりません。*3

Runspaceで実行する Command の生成

Runspaceには、PowerShellインスタンスへの.AddScript()メソッドで処理を差し込みます。

この.AddScript()メソッドは、ScriptBlockを受けられるので、実行したいScriptBlockを生成しましょう。

クエリの生成

Write-Debug ("start creating command for '{0}'" -f $url)
$command = {
[CmdletBinding()]
param
(
    [parameter(
        mandatory,
        position = 0)]
    [string]
    $url,

    [parameter(
        mandatory = 0,
        position = 1)]
    [int]
    $timestamp,

    [parameter(
        mandatory = 0,
        position = 2)]
    [string]
    $VerbosePreference
)

# change ErrorActionPreference
Write-Debug "set continue with error as http client requires dispose when method done."
$private:originalErrorActionPreference = $ErrorActionPreference
$ErrorActionPreference = "Continue"

# base settings for query
$private:baseUri = "http://archive.org/wayback/available"
$private:baseQuery = "?url="
$private:timestampQuery = "&timestamp="

# build query
$private:query = "{0}{1}" -f $baseQuery, $url | where {$_}

    # validate timestamp parameter for query
    if (-not [string]::IsNullOrWhiteSpace($timestamp))
    {
        $private:trimTimestampQuery = $timestamp | where {$_}
        $private:query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
    }

# build query uri
$private:queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

先の同期処理の内容同様に、クエリを生成しています。

但し、対象のurlは、それぞれ違うので、 param()でパラメータを指定しています。 仮にparam()を利用しなかった場合は、args[0]などで代りに受けることが可能です。

HttpClientの利用

# Load Assembly to use HttpClient
try
{
    Add-Type -AssemblyName System.Net.Http
}
catch
{
}

# new HttpClient
$httpClient = New-Object -TypeName System.Net.Http.HttpClient
$httpClient.BaseAddress = $private:baseUri

# invoke http client request
Write-Verbose ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
Write-Verbose ("Whole query string '{0}'" -f $queryUri)
$private:task = $httpClient.GetStringAsync($queryUri)
$task.wait()

# return result
$task

# dispose HttpClient
$httpClient.Dispose()

# reverse ErrorActionPreference
$ErrorActionPreference = $originalErrorActionPreference

せっかくなのでここでは、.NETに追加されたHttpClientをPowerShellで利用しています。

もうWebRequestより各段に便利! です。が、PowerShellにasync/awaitがないのでメソッドがもったいないような気もしないではないけど便利です。

HttpClinetの処理後は、生成したインスタンスを.Dispose()しておきます。*4

ErrorPreferenceの変更と差し戻し

先のクエリ生成前の冒頭で、ErrorPreferenceをContinueに明示的に指定しています。

これは、非同期のどの処理でもエラーが出ても継続させたいからです。

もしエラーで止める場合は、Stopを指定して、try{}catch{}などで補足可能です。

# change ErrorActionPreference
Write-Debug "set continue with error as http client requires dispose when method done."
$private:originalErrorActionPreference = $ErrorActionPreference
$ErrorActionPreference = "Continue"

コマンドの最後で、元のErrorPreferenceに戻しています。

# reverse ErrorActionPreference
$ErrorActionPreference = $originalErrorActionPreference

非同期で処理するコマンドでもVerboseで動作させる

# Verbose settings for Async Command inside
Write-Debug "set VerbosePreference inside Asynchronous execution"
if ($PSBoundParameters.Verbose.IsPresent)
{
    $private:verbose = "continue"
}
else
{
    $private:verbose = $VerbosePreference
}

例え、Get-WaybackMachineAvailavilityAsync-Verboseスイッチ付きで実行しても、Runspaceで実行するコマンドがVerboseにならないのは自明です。

上記の処理で、Get-WaybackMachineAvailavilityAsync-Verboseスイッチが利用された時の状態を取得しています。

非同期で処理するPowershellインスタンスを生成と実行

# Main Invokation
Write-Debug "start asynchronous invokation"
$private:powershell = [PowerShell]::Create().AddScript($command).AddArgument($url).AddArgument($timestamp).AddArgument($verbose)
$powershell.RunspacePool = $runspacePool
[array]$private:RunspaceCollection += New-Object -TypeName PSObject -Property @{
    Runspace = $powershell.BeginInvoke();
    powershell = $powershell
}

ここが、非同期実行のメイン処理です。*5

非同期で処理するコマンドにScriptBlockと引数とVerboseを含める

生成したRunspacePoolを実行するPowerShell Instanceを生成します。

[PowerShell]::Create()

ここに.AddScript()メソッドで、先ほど生成したScriptBlockを差し込みます。

[PowerShell]::Create().AddScript($command)

さらに.AddArgument()メソッドにて.AddScript()で差し込んだScriptBlockに渡したいパラメータを渡します。

[PowerShell]::Create().AddScript($command).AddArgument($url).AddArgument($timestamp).AddArgument($verbose)

コマンドの生成後は、生成したPowerShellインスタンスにRunspacePoolを割り当てます。

$powershell.RunspacePool = $runspacePool

準備ができたPowershellインスタンスを実行する際は、.BeginInvoke()メソッドを利用します。

また後々、PowerShellインスタンスを.Dispose()するために、オブジェクトで保持します。

New-Object -TypeName PSObject -Property @{
    Runspace = $powershell.BeginInvoke();
    powershell = $powershell
}

この処理は、各urlに対して行われます。全urlでの実行結果を受けるため、ここでは[array]で受けています。もはや[System.Collection]ArrayListHashTableは産廃*6らしいのでPowerShellとしては辛いですね。

[array]$private:RunspaceCollection += New-Object -TypeName PSObject -Property @{
    Runspace = $powershell.BeginInvoke();
    powershell = $powershell
}

非同期処理の状態を監視する

# check process result
Write-Debug "check asynchronos execution has done"
while (($runspaceCollection.RunSpace | sort IsCompleted -Unique).IsCompleted -ne $true)
{
    sleep -Milliseconds 5
}

非同期処理の管理は、先ほどコマンド実行結果したオブジェクトのIsCompletedプロパティで判断できます。

ここでは、全体をユニークソートして、全てが$trueになったかで判定しています。

なお、Select -UniqueGet-Uniqueは本当に使えないのでお気を付けください。*7

非同期処理の結果取得

# get process result and end powershell session
Write-Debug "obtain process result"
foreach ($runspace in $runspaceCollection)
{
    # obtain Asynchronos command result
    $private:task = $runspace.powershell.EndInvoke($runspace.Runspace)

    # show result
    if ($task.IsCompleted)
    {
        # get reuslt
        $private:result = ($task.Result | ConvertFrom-Json).archived_snapshots.closest
        # create sorted hashtable to create object
        $private:obj = [ordered]@{
            available = $result.available
            status = $result.status
            timestamp = $result.timestamp
            url = $result.url
            queryInformation = @{
                url = $url
                queryUri = $queryUri
            }
        }

        # create PSObject to output
        $private:output = New-Object -TypeName PSObject -Property $obj

        # return result into host
        $output
    }

    # Dispose pipeline
    $runspace.powershell.Dispose()
}

ここで、非同期処理を行った結果をホストに取得しています。

各RunSpace結果を順次取得する

foreach ($runspace in $runspaceCollection)

各Urlに対して行った結果を格納したRunspaceCollection変数からRunspaceを取得します。

.EndInvoke()することでコマンド実行結果を取得する

# obtain Asynchronos command result
$private:task = $runspace.powershell.EndInvoke($runspace.Runspace)

コマンドの実行は、.BeginInvoke()メソッドでした。

コマンドの実行結果取得は、.EndInvoke()メソッドです。

このメソッド実行時に、通常のホスト画面でコマンドを実行したように、非同期で各RunSpaceにて実行したPowerShellインスタンス結果が取得できます。

コマンド実行後の取得結果JSONをカスタムオブジェクトに格納する

# show result
if ($task.IsCompleted)
{
    # get reuslt
    $private:result = ($task.Result | ConvertFrom-Json).archived_snapshots.closest
    # create sorted hashtable to create object
    $private:obj = [ordered]@{
        available = $result.available
        status = $result.status
        timestamp = $result.timestamp
        url = $result.url
        queryInformation = @{
            url = $url
            queryUri = $queryUri
        }
    }

    # create PSObject to output
    $private:output = New-Object -TypeName PSObject -Property $obj

    # return result into host
    $output
}

同期処理では、Invoke-RestMethodを利用したので、JSON結果が自動的にオブジェクトに変換されました。

が、今回は、HttpClientを利用しているので、自前で変換します。

JSONからオブジェクト変換は、PowerShell 3.0から追加されたConvertFrom-Jsonが便利です。

PowerShellインスタンスの破棄

# Dispose pipeline
$runspace.powershell.Dispose()

RunSpaceごとに取得結果をオブジェクトに合成、出力したらもうPowerShellインスタンスは不要です。

.Dispose()して挙げましょう。

これで、PowerShellインスタンスによる処理が終了したので、try{}も完了です。

Finally{}とRunspacePoolの破棄

finally
{
    # Dispose Runspace
    $runspacePool.Dispose()
}

生成したRunSpacePoolも、全ての処理が完了したら破棄します。

finally{}にしておくことで、PowerShellインスタンスのエラーが起こっても生成したRunspacePoolは破棄されます。

並列処理

いよいよ最後です。

まずはコードから見てみましょう。

workflow Get-WaybackMachineAvailavilityParallel
{
    [CmdletBinding()]
    Param
    (
        # Input an uri you want to search.
        [Parameter(
            Mandatory = 1,
            Position=0)]
        [string[]]
        $urls,


        # Input timestamp to obtain closed date you want. Make sure as format 'yyyyMMddHHmmss' or 'yyyy' or 'yyyyMM' or 'yyyyMMdd' or else.('2006' will tring to obtain closed to 2006)
        [Parameter(
            Mandatory = 0,
            Position=1)]
        [string]
        $timestamp
    )

    # base settings for query
    $baseUri = "http://archive.org/wayback/available"
    $baseQuery = "?url="
    $timestampQuery = "&timestamp="


    # start process
    foreach -parallel ($url in $urls)
    {
        Write-Debug ("start creating command for '{0}'" -f $url)

        # build query
        $query = "$baseQuery{0}" -f ($url | where {$_})

        # validate timestamp parameter for query
        if (-not [string]::IsNullOrWhiteSpace($timestamp))
        {
            $trimTimestampQuery = $timestamp | where {$_}
            $query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
        }

        # build query uri
        $queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

        # invoke request
        Write-Verbose -Message ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
        Write-Verbose -Message ("Whole query string '{0}'" -f $queryUri)

        # using Invoke-RestMethod
        $task = Invoke-RestMethod -Method Get -Uri $queryUri -UserAgent ("PowerShell {0}" -f $PSVersionTable.PSVersion)

        # get reuslt
        $result =  $task.archived_snapshots.closest

        # create sorted hashtable to create object
        $obj = [ordered]@{
            available = $result.available
            status = $result.status
            timestamp = $result.timestamp
            url = $result.url
            queryInformation = @{
                url = $url
                queryUri = $queryUri
            }
        }

        # create PSObject to output
        $output = New-Object -TypeName PSObject -Property $obj
        $output
    }
}

解説

並列処理には、Workflowを利用します。

Workflowは、構文はほぼfunctionと変わらず利用できて便利です

が、非同期ではなく並列処理なので劇的な速度改善にはなりません。くれぐれも過信されないようにご注意ください。

Begin{} Process{} End{}について

Workflowなのでそもそもこれらのキーワードは使えません。

また、非同期同様の理由でいずれにしても不可です。

クエリの基本変数を設定

    # base settings for query
    $baseUri = "http://archive.org/wayback/available"
    $baseQuery = "?url="
    $timestampQuery = "&timestamp="

クエリの基本となる変数を定めています。

Workflow の foreach -parallel を利用した並列処理

foreach -parallel ($url in $urls)

Workflowに関しては、以前にもPowerShell における Windows Workflow Foundation 4.0 (WF) 利用のすすめとして記事にしています。

ここで記載した、 foreach -parallelが、並列処理となります。

-parallelパラメータを指定すると、シングルスレッドですが並列に順不同で5本実行されます。

非同期処理は、処理対象はforeachで渡していたので順次実行です。ここも今回紹介した並列実行と非同期実行では異なります。

詳しくは、縄神様の解説をご参照ください。

てすとぶろぐ - ワークフロー上のアクティビティを非同期に複数動作させるには てすとぶろぐ - PowerShell 3.0 の ForEach –parallel はマルチスレッドではない

foreach -parallelの中身は、同期処理と同様

Write-Debug ("start creating command for '{0}'" -f $url)

# build query
$query = "$baseQuery{0}" -f ($url | where {$_})

# validate timestamp parameter for query
if (-not [string]::IsNullOrWhiteSpace($timestamp))
{
    $trimTimestampQuery = $timestamp | where {$_}
    $query = "$query{0}{1}" -f $timestampQuery, $trimTimestampQuery
}

# build query uri
$queryUri = (@($baseUri,$query) | where { $_ } | % { ([string]$_).Trim('/') } | where { $_ } ) -join '/'

# invoke request
Write-Verbose -Message ("trying to collect availability of Wayback Time machine for uri '{0}' from API '{1}'" -f $url, $baseUri)
Write-Verbose -Message ("Whole query string '{0}'" -f $queryUri)

# using Invoke-RestMethod
$task = Invoke-RestMethod -Method Get -Uri $queryUri -UserAgent ("PowerShell {0}" -f $PSVersionTable.PSVersion)

# get reuslt
$result =  $task.archived_snapshots.closest

# create sorted hashtable to create object
$obj = [ordered]@{
    available = $result.available
    status = $result.status
    timestamp = $result.timestamp
    url = $result.url
    queryInformation = @{
        url = $url
        queryUri = $queryUri
    }
}

# create PSObject to output
$output = New-Object -TypeName PSObject -Property $obj
$output

もはや同期処理と変わりません。

今回は、Workflowの制限に当たるCmdletも利用しなかったので、InlineScript{}も利用していません。

このfunctionとの互換性で並列実行が可能になるのが、PowerShell Workflowのメリットの1つですね。*8

まとめ

PowerShellにおける非同期実行はRunspaceを利用することが現在の主流です。

よりよいやり方は常に模索していますが、何かヒントがあればうかがえると喜びます。

対象の数による処理速度の違いは次の通りです。

処理 対象処理速度(少) 対象処理速度(多) 備考
同期 X 対象の数に比例して時間がかかる
非同期 対象が多くてもRunspaceに割り当てて最大限非同期に処理するため、対象が多いと最速
並列 並列5本ずつ実行する。

セッションの保持の違いによる、リモートホストへの実行速度

リモートホストへの実行の場合、セッションの持ち方の違いで、セッションを持ったままの同期/並列、とセッションが毎回切れる非同期で少し違いがあります。

処理 処理速度(初回) 処理速度(2回目) 備考
同期 セッションを維持していた場合、2回目以降は接続は高速
非同期 初回の同期処理とほぼ同程度。2回目以降も初回とずっと変わらない*9
並列 初回の接続は同期処理より若干時間がかかるが、二回目以降はローカル並の爆速

*1:C#のasync/awaitなど

*2:言い方悪い

*3:数にもよりますが

*4:usingないの辛いです

*5:あえて1つのfunctionに押し込めたせい読みにくい

*6:要出典

*7:この2ついらない

*8:状態保持や、アクティビティの自由な組み合わせこそが一番のメリットです

*9:セッションは毎回切れるので速度に変化がでない