diff --git a/.gitignore b/.gitignore index 57a1574..6c86302 100644 --- a/.gitignore +++ b/.gitignore @@ -194,3 +194,4 @@ FakesAssemblies/ # Visual Studio 6 workspace options file *.opt +*.db diff --git a/PSParallel.sln b/PSParallel.sln index 334a019..6c6636f 100644 --- a/PSParallel.sln +++ b/PSParallel.sln @@ -21,6 +21,7 @@ EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "module", "module", "{9C879DF2-D1E2-4143-A95A-2374F8650F48}" ProjectSection(SolutionItems) = preProject module\en-US\about_PSParallel.Help.txt = module\en-US\about_PSParallel.Help.txt + module\en-US\PSParallel.dll-Help.xml = module\en-US\PSParallel.dll-Help.xml module\PSParallel.psd1 = module\PSParallel.psd1 EndProjectSection EndProject diff --git a/README.md b/README.md index bd66059..afe6ec8 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,9 @@ # PSParallel + Invoke scriptblocks in parallel runspaces -##Installation +## Installation + ```PowerShell Install-Module PSParallel ``` @@ -11,14 +13,15 @@ Install-Module PSParallel (1..255).Foreach{"192.168.0.$_"} | Invoke-Parallel { [PSCustomObject] @{IP=$_;Result=ping.exe -4 -a -w 20 $_}} ``` -Variables are captured from the parent session but functions are not. +Variables and functions are captured from the parent session. + +## Throttling -##Throttling To control the degree of parallelism, i.e. the number of concurrent runspaces, use the -ThrottleLimit parameter ```PowerShell # process lots of crash dumps -Get-ChildItem -recurce *.dmp | Invoke-Parallel -ThrottleLimit 64 -ProgressActivity "Processing dumps" { +Get-ChildItem -Recurse *.dmp | Invoke-Parallel -ThrottleLimit 64 -ProgressActivity "Processing dumps" { [PSCustomObject] @{ Dump=$_; Analysis = cdb.exe -z $_.fullname -c '"!analyze -v;q"' } ``` @@ -27,9 +30,11 @@ The overhead of spinning up new PowerShell classes is non-zero. Invoke-Parallel ![image](https://github.com/powercode/PSParallel/raw/master/images/Invoke-Parallel.png) -##Contributions +## Contributions + Pull requests and/or suggestions are more than welcome. -###Acknowlegementes +### Acknowledgements + The idea and the basis for the implementation comes from [RamblingCookieMonster](https://github.com/RamblingCookieMonster). -Cudos for that implementation also goes to Boe Prox(@proxb) and Sergei Vorobev(@xvorsx). \ No newline at end of file +Kudos for that implementation also goes to Boe Prox(@proxb) and Sergei Vorobev(@xvorsx). diff --git a/module/PSParallel.psd1 b/module/PSParallel.psd1 index 626174c..85a0627 100644 Binary files a/module/PSParallel.psd1 and b/module/PSParallel.psd1 differ diff --git a/module/en-US/PSParallel.dll-Help.xml b/module/en-US/PSParallel.dll-Help.xml index 2c825fa..931a439 100644 --- a/module/en-US/PSParallel.dll-Help.xml +++ b/module/en-US/PSParallel.dll-Help.xml @@ -1,14 +1,14 @@ - - + + - + - + Invoke-Parallel @@ -22,21 +22,21 @@ - Th + The cmdlet uses a RunspacePool an and invokes the provied scriptblock once for each input. + +To control the environment of the scriptblock, the ImportModule, ImportVariable and ImportFunction parameters can be use +d. - + Invoke-Parallel - + ScriptBlock - - + Specifies the operation that is performed on each input object. Enter a script block that describes the operation. ScriptBlock - - ParentProgressId @@ -44,99 +44,108 @@ Identifies the parent activity of the current activity. Use the value -1 if the current activity has no parent activity. Int32 - - ProgressId - Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of being displayed in a series. + Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than + one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of +being displayed in a series. Int32 - - ProgressActivity - Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose progress is being reported. + Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose pr +ogress is being reported. String - - - + ThrottleLimit - Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this parameter or enter a value of 0, the default value, 16, is used. + Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this +parameter or enter a value of 0, the default value, 16, is used. Int32 - - + + InitialSessionState + + The session state used by the runspaces when invoking ScriptBlock. This provides the functions, variables, drives, etc +available to the ScriptBlock. +By default, InitialSessionState.Create2() is used and the functions and variables from the current scope is then +imported. + + InitialSessionState + + InputObject - Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable that contains the objects, or type a command or expression that gets the objects. + Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable th +at contains the objects, or type a command or expression that gets the objects. PSObject - - Invoke-Parallel - + ScriptBlock - - + Specifies the operation that is performed on each input object. Enter a script block that describes the operation. - ScriptBlock - - + ScriptBlock - + ThrottleLimit - Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this parameter or enter a value of 0, the default value, 16, is used. + Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this +parameter or enter a value of 0, the default value, 16, is used. - Int32 - + Int32 - + InputObject - Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable that contains the objects, or type a command or expression that gets the objects. + Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable th +at contains the objects, or type a command or expression that gets the objects. - PSObject - - + PSObject - + NoProgress - Will now show progress from Invoke-Progress. Progress from the scriptblock will still be displayed. + Will not show progress from Invoke-Progress. Progress from the scriptblock will still be displayed. - SwitchParameter - + SwitchParameter + + + InitialSessionState + + The session state used by the runspaces when invoking ScriptBlock. This provides the functions, variables, drives, etc +available to the ScriptBlock. +By default, InitialSessionState.Create2() is used and the functions and variables from the current scope is then +imported. + + InitialSessionState - - + + ScriptBlock - - + Specifies the operation that is performed on each input object. Enter a script block that describes the operation. ScriptBlock ScriptBlock - - + ParentProgressId @@ -148,39 +157,40 @@ Int32 - - + ProgressId - Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of being displayed in a series. + Specifies an ID that distinguishes each progress bar from the others. Use this parameter when you are creating more than + one progress bar in a single command. If the progress bars do not have different IDs, they are superimposed instead of +being displayed in a series. Int32 Int32 - - + ProgressActivity - Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose progress is being reported. + Specifies the first line of progress text in the heading above the status bar. This text describes the activity whose pr +ogress is being reported. String String - - + - + ThrottleLimit - Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this parameter or enter a value of 0, the default value, 16, is used. + Specifies the maximum number of concurrent connections that can be established to run this command. If you omit this +parameter or enter a value of 0, the default value, 16, is used. Int32 @@ -189,23 +199,23 @@ - + InputObject - Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable that contains the objects, or type a command or expression that gets the objects. + Specifies the input objects. Invoke-Parallel runs the script block on each input object in parallel. Enter a variable th +at contains the objects, or type a command or expression that gets the objects. PSObject PSObject - - + NoProgress - Will now show progress from Invoke-Progress. Progress from the scriptblock will still be displayed. + Will not show progress from Invoke-Progress. Progress from the scriptblock will still be displayed. SwitchParameter @@ -214,46 +224,66 @@ - - - - + + InitialSessionState + + The session state used by the runspaces when invoking ScriptBlock. This provides the functions, variables, drives, etc +available to the ScriptBlock. +By default, InitialSessionState.Create2() is used and the functions and variables from the current scope is then +imported. + + InitialSessionState - System.Management.Automation.PSObject - + InitialSessionState + + + + Parameter8 - - - - - + + + + + + + - System.Object + System.Management.Automation.PSObject + + + + + + + + + - - + + - + -------------------------- EXAMPLE 1 -------------------------- PS C:\> - 1..256 | Invoke-Parallel {$ip = 192.168.0.$_; $res = ping.exe -v4 -w20 $ip; [PSCustomObject] @{IP=$ip;Res=$res}} + (1..255).ForEach{"192.168.0.$_"} | + Invoke-Parallel {$ip = [IPAddress]$_; $res = ping.exe -n 1 -4 -w 20 -a $_; [PSCustomObject] @{IP=$ip;Res=$res}} -ThrottleLimit 64 - - + This example pings all iP v4 addresses on a subnet, specifying Throttlelimit to 64, i.e. running up to 64 runspaces +in parallel. - + \ No newline at end of file diff --git a/scripts/Install.ps1 b/scripts/Install.ps1 index ff1c97f..ac7fa11 100644 --- a/scripts/Install.ps1 +++ b/scripts/Install.ps1 @@ -1,10 +1,18 @@ -param([string]$InstallDirectory) +$manPath = Get-ChildItem -recurse $PSScriptRoot/../module -include *.psd1 | Select-Object -first 1 +$man = Test-ModuleManifest $manPath -$rootDir = Split-Path (Split-Path $MyInvocation.MyCommand.Path) +$name = $man.Name +[string]$version = $man.Version +$moduleSourceDir = "$PSScriptRoot/$name" +$moduleDir = "~/documents/WindowsPowerShell/Modules/$name/$version/" + +[string]$rootDir = Resolve-Path $PSSCriptRoot/.. + +$InstallDirectory = $moduleDir if ('' -eq $InstallDirectory) { - $personalModules = Join-Path -Path ([Environment]::GetFolderPath('MyDocuments')) -ChildPath WindowsPowerShell\Modules + $personalModules = Join-Path -Path ([Environment]::GetFolderPath('MyDocuments')) -ChildPath WindowsPowerShell\Modules\ if (($env:PSModulePath -split ';') -notcontains $personalModules) { Write-Warning "$personalModules is not in `$env:PSModulePath" @@ -18,29 +26,36 @@ if ('' -eq $InstallDirectory) $InstallDirectory = Join-Path -Path $personalModules -ChildPath PSParallel } -if (!(Test-Path $InstallDirectory)) +if(-not (Test-Path $InstallDirectory)) { - $null = mkdir $InstallDirectory + $null = mkdir $InstallDirectory } - -$moduleFileList = @( - 'PSParallel.psd1' - 'en-US\PSParallel.dll-Help.xml' - 'en-US\about_PSParallel.Help.txt' - -) -$binaryFileList = 'src\PsParallel\bin\Release\PSParallel.dll' - - - -$binaryFileList | foreach { Copy-Item "$rootDir\$_" -Destination $InstallDirectory } -$moduleFileList | foreach {Copy-Item "$rootdir\module\$_" -Destination $InstallDirectory\$_ } +@( + 'module\PSParallel.psd1' + 'src\PsParallel\bin\Release\PSParallel.dll' +).Foreach{Copy-Item "$rootdir\$_" -Destination $InstallDirectory } + +$lang = @('en-us') + +$lang.Foreach{ + $lang = $_ + $langDir = "$InstallDirectory\$lang" + if(-not (Test-Path $langDir)) + { + $null = MkDir $langDir + } + + @( + 'PSParallel.dll-Help.xml' + 'about_PSParallel.Help.txt' + ).Foreach{Copy-Item "$rootDir\module\$lang\$_" -Destination $langDir} +} Get-ChildItem -Recurse -Path $InstallDirectory -$cert = Get-Item Cert:\CurrentUser\My\98D6087848D1213F20149ADFE698473429A9B15D -Get-ChildItem -File $InstallDirectory | Set-AuthenticodeSignature -Certificate $cert - - - \ No newline at end of file +$cert =Get-ChildItem cert:\CurrentUser\My -CodeSigningCert +if($cert) +{ + Get-ChildItem -File $InstallDirectory -Include *.dll,*.psd1 -Recurse | Set-AuthenticodeSignature -Certificate $cert -TimestampServer http://timestamp.verisign.com/scripts/timstamp.dll +} diff --git a/scripts/Publish-ToGallery.ps1 b/scripts/Publish-ToGallery.ps1 index 33a1b26..8aff037 100644 --- a/scripts/Publish-ToGallery.ps1 +++ b/scripts/Publish-ToGallery.ps1 @@ -1,11 +1,13 @@ +$manPath = Get-ChildItem -recurse $PSScriptRoot/../module -include *.psd1 | Select-Object -first 1 +$man = Test-ModuleManifest $manPath + +$name = $man.Name +[string]$version = $man.Version + $p = @{ - Name = "PSParallel" + Name = $name NuGetApiKey = $NuGetApiKey - LicenseUri = "https://github.com/powercode/PSParallel/blob/master/LICENSE" - IconUri = "https://github.com/powercode/PSParallel/blob/master/images/PSParallel_icon.png" - Tag = "Parallel","Runspace","Invoke","Foreach" - ReleaseNote = "Adding authenticode signature." - ProjectUri = "https://github.com/powercode/PSParallel" + RequiredVersion = $version } Publish-Module @p diff --git a/scripts/dbg.ps1 b/scripts/dbg.ps1 new file mode 100644 index 0000000..ed4c670 --- /dev/null +++ b/scripts/dbg.ps1 @@ -0,0 +1,58 @@ +param( + [int] $ThrottleLimit = 3, + [int] $Milliseconds = 500 +) + +Import-Module PSParallel -RequiredVersion 2.2.1 + +function new-philosopher { + param($name, [string[]] $treats) + [PSCustomObject] @{ + Name = $name + Treats = $treats + } +} + + +$philosopherData = @( + new-philosopher 'Immanuel Kant' 'was a real pissant','who where very rarely stable' + new-philosopher 'Heidegger' 'was a boozy beggar', 'Who could think you under the table' + new-philosopher 'David Hume' 'could out-consume Schopenhauer and Hegel' + new-philosopher 'Wittgenstein' 'was a beery swine', 'Who was just as sloshed as Schlegel' + new-philosopher 'John Stuart Mill' 'of his own free will', 'On half a pint of shandy was particularly ill' + new-philosopher 'Nietzsche' 'There''s nothing Nietzsche couldn''t teach ya', 'Bout the raising of the wrist.' + new-philosopher 'Plato' 'they say, could stick it away', 'Half a crate of whiskey every day' + new-philosopher 'Aristotle' 'was a bugger for the bottle' + new-philosopher 'Hobbes' 'was fond of his dram' + new-philosopher 'Rene Descartes' 'was a drunken fart:', 'I drink, therefore I am' + new-philosopher 'Socrates' 'is particularly missed','A lovely little thinker but a bugger when he''s pissed!' + ) + + +1..100 | invoke-parallel -Throttle $ThrottleLimit { + + + + $pd = $philosopherData[($_ -1)% $philosopherData.Count] + + 1..100 | ForEach-Object { + $op = switch($_ % 8) + { + 0 { 'sleeping' } + 1 { 'drinking' } + 2 { 'drinking' } + 3 { 'thinking' } + 4 { 'drinking' } + 5 { 'drinking' } + 6 { 'eating' } + 7 { 'drinking' } + } + + $status = $pd.Treats[$_ % $pd.Treats.Length] + + $name = $pd.Name + $currentOperation = "$name is currently $op" + Write-Progress -id $PSParallelProgressId -percent $_ -activity $pd.Name -Status $status -CurrentOperation $currentOperation + Start-Sleep -milliseconds ($Milliseconds + 100 * (Get-Random -Minimum 3 -Maximum 7)) + } +} \ No newline at end of file diff --git a/src/PSParallel/InvokeParallelCommand.cs b/src/PSParallel/InvokeParallelCommand.cs index f495fcf..42ab9b2 100644 --- a/src/PSParallel/InvokeParallelCommand.cs +++ b/src/PSParallel/InvokeParallelCommand.cs @@ -1,14 +1,15 @@ using System; -using System.CodeDom; using System.Collections.Generic; using System.Collections.ObjectModel; +using System.Diagnostics; using System.Linq; using System.Management.Automation; -using System.Management.Automation.Language; using System.Management.Automation.Runspaces; using System.Threading; -using Microsoft.PowerShell.Commands; +// ReSharper disable UnusedAutoPropertyAccessor.Global +// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global +// ReSharper disable MemberCanBePrivate.Global namespace PSParallel { [Alias("ipa")] @@ -18,190 +19,172 @@ public sealed class InvokeParallelCommand : PSCmdlet, IDisposable [Parameter(Mandatory = true, Position = 0)] public ScriptBlock ScriptBlock { get; set; } - [Alias("ppi")] [Parameter(ParameterSetName = "Progress")] + [Alias("ppi")] public int ParentProgressId { get; set; } = -1; - [Alias("pi")] [Parameter(ParameterSetName = "Progress")] + [Alias("pi")] public int ProgressId { get; set; } = 1000; - [Alias("pa")] [Parameter(ParameterSetName = "Progress")] + [Alias("pa")] [ValidateNotNullOrEmpty] public string ProgressActivity { get; set; } = "Invoke-Parallel"; [Parameter] - [ValidateRange(1,128)] + [ValidateRange(1, 128)] public int ThrottleLimit { get; set; } = 32; + [Parameter] + [AllowNull] + [Alias("iss")] + public InitialSessionState InitialSessionState { get; set; } + [Parameter(ValueFromPipeline = true, Mandatory = true)] public PSObject InputObject { get; set; } [Parameter(ParameterSetName = "NoProgress")] public SwitchParameter NoProgress { get; set; } - private readonly CancellationTokenSource m_cancelationTokenSource = new CancellationTokenSource(); - private PowershellPool m_powershellPool; - private InitialSessionState m_initialSessionState; - private ProgressManager m_progressManager; - - // this is only used when NoProgress is not specified - // Input is then captured in ProcessRecored and processed in EndProcessing - private List m_input; + private readonly CancellationTokenSource _cancelationTokenSource = new CancellationTokenSource(); + internal PowershellPool PowershellPool; - private static InitialSessionState GetSessionState(ScriptBlock scriptBlock, SessionState sessionState) + private static InitialSessionState GetSessionState(SessionState sessionState) { var initialSessionState = InitialSessionState.CreateDefault2(); - - CaptureVariables(scriptBlock, sessionState, initialSessionState); - // this will get invoked recursively - - var functions = GetFunctions(sessionState); - - CaptureFunctions(scriptBlock, initialSessionState, functions, new HashSet()); + CaptureVariables(sessionState, initialSessionState); + CaptureFunctions(sessionState, initialSessionState); return initialSessionState; } - private static IDictionary GetFunctions(SessionState sessionState) + private static IEnumerable GetFunctions(SessionState sessionState) { - var baseObject = (Dictionary.ValueCollection) sessionState.InvokeProvider.Item.Get("function:")[0].BaseObject; - return baseObject.ToDictionary(f=>f.Name); + try + { + var functionDrive = sessionState.InvokeProvider.Item.Get("function:"); + return (Dictionary.ValueCollection)functionDrive[0].BaseObject; + + } + catch (DriveNotFoundException) + { + return new FunctionInfo[] { }; + } } - private static void CaptureFunctions(ScriptBlock scriptBlock, InitialSessionState initialSessionState, - IDictionary functions, ISet processedFunctions) + private static IEnumerable GetVariables(SessionState sessionState) { - var commands = scriptBlock.Ast.FindAll((ast) => ast is CommandAst, true); - - var nonProcessedCommandNames = commands.Cast() - .Select(commandAst => commandAst.CommandElements[0].Extent.Text) - .Where(commandName => !processedFunctions.Contains(commandName)); - foreach (var commandName in nonProcessedCommandNames) + try { - - FunctionInfo functionInfo; - if (!functions.TryGetValue(commandName, out functionInfo)) - { - continue; - } - initialSessionState.Commands.Add(new SessionStateFunctionEntry(functionInfo.Name, functionInfo.Definition)); - processedFunctions.Add(commandName); - CaptureFunctions(functionInfo.ScriptBlock, initialSessionState, functions, processedFunctions); + string[] noTouchVariables = { "null", "true", "false", "Error" }; + var variables = sessionState.InvokeProvider.Item.Get("Variable:"); + var psVariables = (IEnumerable)variables[0].BaseObject; + + return psVariables.Where(p => !noTouchVariables.Contains(p.Name)); + } + catch (DriveNotFoundException) + { + return new PSVariable[] { }; } } + private static void CaptureFunctions(SessionState sessionState, InitialSessionState initialSessionState) + { + var functions = GetFunctions(sessionState); + foreach (var func in functions) + { + initialSessionState.Commands.Add(new SessionStateFunctionEntry(func.Name, func.Definition)); + } + } - private static void CaptureVariables(ScriptBlock scriptBlock, SessionState sessionState, - InitialSessionState initialSessionState) + private static void CaptureVariables(SessionState sessionState, InitialSessionState initialSessionState) { - var variables = scriptBlock.Ast.FindAll(ast => ast is VariableExpressionAst, true); - var varDict = new Dictionary(); - foreach (var ast in variables) + var variables = GetVariables(sessionState); + foreach (var variable in variables) { - var v = (VariableExpressionAst) ast; - var variableName = v.VariablePath.UserPath; - if (variableName == "_" || varDict.ContainsKey(variableName)) + var existing = initialSessionState.Variables[variable.Name].FirstOrDefault(); + if (existing != null) { - continue; + if ((existing.Options & (ScopedItemOptions.Constant | ScopedItemOptions.ReadOnly)) != ScopedItemOptions.None) + { + continue; + } + else + { + initialSessionState.Variables.Remove(existing.Name, existing.GetType()); + initialSessionState.Variables.Add(new SessionStateVariableEntry(variable.Name, variable.Value, variable.Description, variable.Options, variable.Attributes)); + } } - - var variable = sessionState.PSVariable.Get(variableName); - if (variable != null) + else { - var ssve = new SessionStateVariableEntry(variable.Name, variable.Value, - variable.Description, variable.Options, variable.Attributes); - varDict.Add(variableName, ssve); + initialSessionState.Variables.Add(new SessionStateVariableEntry(variable.Name, variable.Value, variable.Description, variable.Options, variable.Attributes)); } } + } - var prefs = new[] - { - "ErrorActionPreference", "DebugPreference", "VerbosePreference", "WarningPreference", - "ProgressPreference", "InformationPreference", "ConfirmPreference", "WhatIfPreference" - }; - foreach (var pref in prefs) + private void ValidateParameters() + { + if (NoProgress) { - var v = sessionState.PSVariable.Get(pref); - if (v != null) + var boundParameters = MyInvocation.BoundParameters; + foreach (var p in new[] { nameof(ProgressActivity), nameof(ParentProgressId), nameof(ProgressId) }) { - var ssve = new SessionStateVariableEntry(v.Name, v.Value, - v.Description, v.Options, v.Attributes); - varDict.Add(v.Name, ssve); + if (!boundParameters.ContainsKey(p)) continue; + var argumentException = new ArgumentException($"'{p}' must not be specified together with 'NoProgress'", p); + ThrowTerminatingError(new ErrorRecord(argumentException, "InvalidProgressParam", ErrorCategory.InvalidArgument, p)); } } - - initialSessionState.Variables.Add(varDict.Values); } - protected override void BeginProcessing() + InitialSessionState GetSessionState() { - m_initialSessionState = GetSessionState(ScriptBlock, SessionState); - m_powershellPool = new PowershellPool(ThrottleLimit,m_initialSessionState, m_cancelationTokenSource.Token); - m_powershellPool.Open(); - if (!NoProgress) + if (MyInvocation.BoundParameters.ContainsKey(nameof(InitialSessionState))) { - m_progressManager = new ProgressManager(ProgressId, ProgressActivity, $"Processing with {ThrottleLimit} workers", ParentProgressId); - m_input = new List(500); + if (InitialSessionState == null) + { + return InitialSessionState.Create(); + } + return InitialSessionState; } + return GetSessionState(base.SessionState); + } + + + private WorkerBase _worker; + protected override void BeginProcessing() + { + ValidateParameters(); + var iss = GetSessionState(); + PowershellPool = new PowershellPool(ThrottleLimit, iss, _cancelationTokenSource.Token); + _worker = NoProgress ? (WorkerBase) new NoProgressWorker(this) : new ProgressWorker(this); } + protected override void ProcessRecord() { - if(NoProgress) - { - m_powershellPool.AddInput(ScriptBlock, InputObject); - WriteOutputs(); - } - else - { - m_input.Add(InputObject); - } + _worker.ProcessRecord(InputObject); } protected override void EndProcessing() { - try { - if (!NoProgress) - { - m_progressManager.TotalCount = m_input.Count; - foreach (var i in m_input) - { - var pr = m_progressManager.GetCurrentProgressRecord($"Starting processing of {i}", m_powershellPool.ProcessedCount); - WriteProgress(pr); - m_powershellPool.AddInput(ScriptBlock, i); - WriteOutputs(); - } - } - while(!m_powershellPool.WaitForAllPowershellCompleted(100)) - { - var pr = m_progressManager.GetCurrentProgressRecord("All work queued. Waiting for remaining work to complete.", m_powershellPool.ProcessedCount); - WriteProgress(pr); - if (Stopping) - { - return; - } - WriteOutputs(); - } - WriteOutputs(); - } - finally - { - if(!NoProgress) - { - WriteProgress(m_progressManager.Completed()); - } - } + _worker.EndProcessing(); + } protected override void StopProcessing() { - m_cancelationTokenSource.Cancel(); + _cancelationTokenSource.Cancel(); + PowershellPool?.Stop(); } private void WriteOutputs() { - var streams = m_powershellPool.Streams; + Debug.WriteLine("Processing output"); + if (_cancelationTokenSource.IsCancellationRequested) + { + return; + } + var streams = PowershellPool.Streams; foreach (var o in streams.Output.ReadAll()) { WriteObject(o, false); @@ -227,21 +210,156 @@ private void WriteOutputs() { WriteVerbose(v.Message); } - foreach (var p in streams.Progress.ReadAll()) + _worker.WriteProgress(streams.ReadAllProgress()); + } + + public void Dispose() + { + PowershellPool?.Dispose(); + _cancelationTokenSource.Dispose(); + } + + + private abstract class WorkerBase + { + protected readonly InvokeParallelCommand Cmdlet; + protected readonly PowershellPool Pool; + protected bool Stopping => Cmdlet.Stopping; + protected void WriteOutputs() => Cmdlet.WriteOutputs(); + protected void WriteProgress(ProgressRecord record) => Cmdlet.WriteProgress(record); + public abstract void ProcessRecord(PSObject inputObject); + public abstract void EndProcessing(); + public abstract void WriteProgress(Collection progress); + protected ScriptBlock ScriptBlock => Cmdlet.ScriptBlock; + + protected WorkerBase(InvokeParallelCommand cmdlet) + { + Cmdlet = cmdlet; + Pool = cmdlet.PowershellPool; + } + } + + class NoProgressWorker : WorkerBase + { + public NoProgressWorker(InvokeParallelCommand cmdlet) : base(cmdlet) + { + } + + public override void ProcessRecord(PSObject inputObject) + { + while (!Pool.TryAddInput(Cmdlet.ScriptBlock, Cmdlet.InputObject)) + { + Cmdlet.WriteOutputs(); + } + } + + public override void EndProcessing() + { + while (!Pool.WaitForAllPowershellCompleted(100)) + { + if (Stopping) + { + return; + } + WriteOutputs(); + } + WriteOutputs(); + } + + public override void WriteProgress(Collection progress) { - if(!NoProgress) + foreach (var p in progress) { - p.ParentActivityId = m_progressManager.ActivityId; + base.WriteProgress(p); } - WriteProgress(p); } } - public void Dispose() + class ProgressWorker : WorkerBase { - m_powershellPool.Dispose(); - m_cancelationTokenSource.Dispose(); - } + readonly ProgressManager _progressManager; + private readonly List _input; + private int _lastEstimate = -1; + public ProgressWorker(InvokeParallelCommand cmdlet) : base(cmdlet) + { + _progressManager = new ProgressManager(cmdlet.ProgressId, cmdlet.ProgressActivity, $"Processing with {cmdlet.ThrottleLimit} workers", cmdlet.ParentProgressId); + _input = new List(500); + } + + public override void ProcessRecord(PSObject inputObject) + { + _input.Add(inputObject); + } + + public override void EndProcessing() + { + try + { + _progressManager.TotalCount = _input.Count; + var lastPercentComplete = -1; + foreach (var i in _input) + { + var processed = Pool.GetEstimatedProgressCount(); + _lastEstimate = processed; + _progressManager.SetCurrentOperation($"Starting processing of {i}"); + _progressManager.UpdateCurrentProgressRecord(processed); + var pr = _progressManager.ProgressRecord; + if (lastPercentComplete != pr.PercentComplete) + { + WriteProgress(pr); + lastPercentComplete = pr.PercentComplete; + } + + while (!Pool.TryAddInput(ScriptBlock, i)) + { + WriteOutputs(); + } + } + _progressManager.SetCurrentOperation("All work queued. Waiting for remaining work to complete."); + while (!Pool.WaitForAllPowershellCompleted(100)) + { + WriteProgressIfUpdated(); + if (Stopping) + { + return; + } + WriteOutputs(); + } + WriteOutputs(); + } + finally + { + _progressManager.UpdateCurrentProgressRecord(Pool.GetEstimatedProgressCount()); + WriteProgress(_progressManager.Completed()); + } + } + public override void WriteProgress(Collection progress) + { + foreach (var p in progress) + { + if (p.ActivityId != _progressManager.ActivityId) + { + p.ParentActivityId = _progressManager.ActivityId; + } + WriteProgress(p); + } + if (progress.Count > 0) + { + WriteProgressIfUpdated(); + } + } + + private void WriteProgressIfUpdated() + { + var estimatedCompletedCount = Pool.GetEstimatedProgressCount(); + if (_lastEstimate != estimatedCompletedCount) + { + _lastEstimate = estimatedCompletedCount; + _progressManager.UpdateCurrentProgressRecord(estimatedCompletedCount); + WriteProgress(_progressManager.ProgressRecord); + } + } + } } } diff --git a/src/PSParallel/PSParallel.csproj b/src/PSParallel/PSParallel.csproj index 75319f0..5a72cfb 100644 --- a/src/PSParallel/PSParallel.csproj +++ b/src/PSParallel/PSParallel.csproj @@ -33,6 +33,10 @@ false + + False + ..\..\..\..\..\..\..\Windows\Microsoft.NET\assembly\GAC_MSIL\Microsoft.PowerShell.Commands.Utility\v4.0_3.0.0.0__31bf3856ad364e35\Microsoft.PowerShell.Commands.Utility.dll + diff --git a/src/PSParallel/PowerShellPoolMember.cs b/src/PSParallel/PowerShellPoolMember.cs index ce7bf95..50265ae 100644 --- a/src/PSParallel/PowerShellPoolMember.cs +++ b/src/PSParallel/PowerShellPoolMember.cs @@ -1,62 +1,94 @@ using System; using System.Management.Automation; +using System.Management.Automation.Runspaces; using System.Threading; namespace PSParallel { class PowerShellPoolMember : IDisposable { - private readonly PowershellPool m_pool; - private readonly PowerShellPoolStreams m_poolStreams; - private PowerShell m_powerShell; - public PowerShell PowerShell => m_powerShell; - private readonly PSDataCollection m_input =new PSDataCollection(); - private PSDataCollection m_output; + private readonly PowershellPool _pool; + private readonly int _index; + private readonly InitialSessionState _initialSessionState; + private readonly PowerShellPoolStreams _poolStreams; + private PowerShell _powerShell; + public PowerShell PowerShell => _powerShell; + public int Index => _index ; + + private readonly PSDataCollection _input =new PSDataCollection(); + private PSDataCollection _output; + private int _percentComplete; + public int PercentComplete + { + get { return _percentComplete; } + set { _percentComplete = value; } + } + - public PowerShellPoolMember(PowershellPool pool) + public PowerShellPoolMember(PowershellPool pool, int index, InitialSessionState initialSessionState) { - m_pool = pool; - m_poolStreams = m_pool.Streams; - m_input.Complete(); - CreatePowerShell(); + _pool = pool; + _index = index; + _initialSessionState = initialSessionState; + _poolStreams = _pool.Streams; + _input.Complete(); + CreatePowerShell(initialSessionState); } private void PowerShellOnInvocationStateChanged(object sender, PSInvocationStateChangedEventArgs psInvocationStateChangedEventArgs) { switch (psInvocationStateChangedEventArgs.InvocationStateInfo.State) { - case PSInvocationState.Stopped: + ReleasePowerShell(); + _pool.ReportStopped(this); + break; case PSInvocationState.Completed: - case PSInvocationState.Failed: - ReturnPowerShell(m_powerShell); - CreatePowerShell(); - m_pool.ReportCompletion(this); - + case PSInvocationState.Failed: + ResetPowerShell(); + _pool.ReportAvailable(this); break; } } - private void CreatePowerShell() + private void CreatePowerShell(InitialSessionState initialSessionState) { - var powerShell = PowerShell.Create(); + var powerShell = PowerShell.Create(RunspaceMode.NewRunspace); + var runspace = RunspaceFactory.CreateRunspace(initialSessionState); + runspace.ApartmentState = ApartmentState.MTA; + powerShell.Runspace = runspace; + runspace.Open(); HookStreamEvents(powerShell.Streams); powerShell.InvocationStateChanged += PowerShellOnInvocationStateChanged; - m_powerShell = powerShell; - m_output = new PSDataCollection(); - m_output.DataAdded += OutputOnDataAdded; + _powerShell = powerShell; + _output = new PSDataCollection(); + _output.DataAdded += OutputOnDataAdded; + } + + public void ResetPowerShell() + { + UnhookStreamEvents(_powerShell.Streams); + _powerShell.Runspace.ResetRunspaceState(); + var runspace = _powerShell.Runspace; + _powerShell = PowerShell.Create(RunspaceMode.NewRunspace); + _powerShell.Runspace = runspace; + + HookStreamEvents(_powerShell.Streams); + _powerShell.InvocationStateChanged += PowerShellOnInvocationStateChanged; + _output = new PSDataCollection(); + _output.DataAdded += OutputOnDataAdded; } - private void ReturnPowerShell(PowerShell powershell) + private void ReleasePowerShell() { - UnhookStreamEvents(powershell.Streams); - powershell.InvocationStateChanged -= PowerShellOnInvocationStateChanged; - m_output.DataAdded -= OutputOnDataAdded; - powershell.Dispose(); + UnhookStreamEvents(_powerShell.Streams); + _powerShell.InvocationStateChanged -= PowerShellOnInvocationStateChanged; + _output.DataAdded -= OutputOnDataAdded; + _powerShell.Dispose(); } - private void HookStreamEvents(PSDataStreams streams) + private void HookStreamEvents(PSDataStreams streams) { streams.Debug.DataAdded += DebugOnDataAdded; streams.Error.DataAdded += ErrorOnDataAdded; @@ -80,65 +112,91 @@ private void UnhookStreamEvents(PSDataStreams streams) public void BeginInvoke(ScriptBlock scriptblock, PSObject inputObject) { - string command = $"param($_,$PSItem){scriptblock}"; - m_powerShell.AddScript(command) + _percentComplete = 0; + string command = $"param($_,$PSItem, $PSPArallelIndex,$PSParallelProgressId){scriptblock}"; + _powerShell.AddScript(command) .AddParameter("_", inputObject) - .AddParameter("PSItem", inputObject); - m_powerShell.BeginInvoke(m_input, m_output); + .AddParameter("PSItem", inputObject) + .AddParameter("PSParallelIndex", _index) + .AddParameter("PSParallelProgressId", _index + 1000); + _powerShell.BeginInvoke(_input, _output); } public void Dispose() { - if (m_powerShell != null) + var ps = _powerShell; + if (ps != null) { - UnhookStreamEvents(m_powerShell.Streams); + UnhookStreamEvents(ps.Streams); + ps.Runspace?.Dispose(); + ps.Dispose(); } - m_output.Dispose(); - m_input.Dispose(); - m_powerShell?.Dispose(); + _output.Dispose(); + _input.Dispose(); } private void OutputOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs) { var item = ((PSDataCollection)sender)[dataAddedEventArgs.Index]; - m_poolStreams.Output.Add(item); + _poolStreams.Output.Add(item); } private void InformationOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs) { var ir = ((PSDataCollection)sender)[dataAddedEventArgs.Index]; - m_poolStreams.Information.Add(ir); + _poolStreams.Information.Add(ir); } private void ProgressOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs) { - var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index]; - m_poolStreams.Progress.Add(record); + var psDataCollection = ((PSDataCollection) sender); + var record = psDataCollection[dataAddedEventArgs.Index]; + _poolStreams.AddProgress(record, _index); } private void ErrorOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs) { var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index]; - m_poolStreams.Error.Add(record); + _poolStreams.Error.Add(record); } private void DebugOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs) { var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index]; - m_poolStreams.Debug.Add(record); + _poolStreams.Debug.Add(record); } private void WarningOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs) { var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index]; - m_poolStreams.Warning.Add(record); + _poolStreams.Warning.Add(record); } private void VerboseOnDataAdded(object sender, DataAddedEventArgs dataAddedEventArgs) { var record = ((PSDataCollection)sender)[dataAddedEventArgs.Index]; - m_poolStreams.Verbose.Add(record); + _poolStreams.Verbose.Add(record); + } + + public void Stop() + { + if(_powerShell.InvocationStateInfo.State != PSInvocationState.Stopped) + { + UnhookStreamEvents(_powerShell.Streams); + _powerShell.BeginStop(OnStopped, null); + } + } + + private void OnStopped(IAsyncResult ar) + { + var ps = _powerShell; + if (ps == null) + { + return; + } + ps.EndStop(ar); + _powerShell = null; } } } \ No newline at end of file diff --git a/src/PSParallel/PowerShellPoolStreams.cs b/src/PSParallel/PowerShellPoolStreams.cs index 56e64e0..bcbe46b 100644 --- a/src/PSParallel/PowerShellPoolStreams.cs +++ b/src/PSParallel/PowerShellPoolStreams.cs @@ -1,4 +1,7 @@ using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; using System.Management.Automation; namespace PSParallel @@ -7,9 +10,9 @@ class PowerShellPoolStreams : IDisposable { public PSDataCollection Output { get; } = new PSDataCollection(100); public PSDataCollection Debug { get; } = new PSDataCollection(); - public PSDataCollection Progress { get; } = new PSDataCollection(); + private PSDataCollection Progress { get; } = new PSDataCollection(); public PSDataCollection Error { get; } = new PSDataCollection(); - public PSDataCollection Warning { get; } = new PSDataCollection(); + public PSDataCollection Warning { get; } = new PSDataCollection(); public PSDataCollection Information { get; } = new PSDataCollection(); public PSDataCollection Verbose { get; } = new PSDataCollection(); @@ -22,5 +25,52 @@ public void Dispose() Information.Dispose(); Verbose.Dispose(); } + + public void AddProgress(ProgressRecord progress, int index) + { + DoAddProgress(progress); + OnProgressChanged(progress.PercentComplete, index); + } + + public void ClearProgress(int index) + { + OnProgressChanged(0, index); + } + + protected void DoAddProgress(ProgressRecord progress) + { + Progress.Add(progress); + } + + protected virtual void OnProgressChanged(int progress, int index){} + + public Collection ReadAllProgress() + { + return Progress.ReadAll(); + } } + + class ProgressTrackingPowerShellPoolStreams : PowerShellPoolStreams + { + private readonly int _maxPoolSize; + private readonly int[] _poolProgress; + private int _currentProgress; + public ProgressTrackingPowerShellPoolStreams(int maxPoolSize) + { + _maxPoolSize = maxPoolSize; + _poolProgress = new int[maxPoolSize]; + } + + protected override void OnProgressChanged(int progress, int index) + { + lock(_poolProgress) { + _poolProgress[index] = progress; + _currentProgress = _poolProgress.Sum(); + } + } + + public int PoolPercentComplete => _currentProgress/_maxPoolSize; + + } + } \ No newline at end of file diff --git a/src/PSParallel/PowershellPool.cs b/src/PSParallel/PowershellPool.cs index 6031c3a..0412a26 100644 --- a/src/PSParallel/PowershellPool.cs +++ b/src/PSParallel/PowershellPool.cs @@ -1,99 +1,154 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Diagnostics.Contracts; using System.Management.Automation; +using System.Management.Automation.Host; using System.Management.Automation.Runspaces; using System.Threading; + namespace PSParallel { - class PowershellPool : IDisposable + sealed class PowershellPool : IDisposable { - private int m_busyCount; - private int m_processedCount; - private readonly CancellationToken m_cancellationToken; - private readonly RunspacePool m_runspacePool; - private readonly List m_poolMembers; - private readonly BlockingCollection m_availablePoolMembers = new BlockingCollection(new ConcurrentStack ()); + private readonly object _countLock = new object(); + private int _busyCount; + private readonly CancellationToken _cancellationToken; + private readonly List _poolMembers; + private readonly InitialSessionState _initialSessionState; + private readonly BlockingCollection _availablePoolMembers = new BlockingCollection(new ConcurrentQueue()); public readonly PowerShellPoolStreams Streams = new PowerShellPoolStreams(); - - public int ProcessedCount => m_processedCount; + private int _processedCount; public PowershellPool(int poolSize, InitialSessionState initialSessionState, CancellationToken cancellationToken) { - m_poolMembers= new List(poolSize); - m_processedCount = 0; - m_cancellationToken = cancellationToken; + _poolMembers= new List(poolSize); + _initialSessionState = initialSessionState; + _cancellationToken = cancellationToken; + + for (var i = 0; i < poolSize; i++) + { + var powerShellPoolMember = new PowerShellPoolMember(this, i+1, initialSessionState); + _poolMembers.Add(powerShellPoolMember); + _availablePoolMembers.Add(powerShellPoolMember); + } + } - for (int i = 0; i < poolSize; i++) + private int GetPartiallyProcessedCount() + { + var totalPercentComplete = 0; + var count = _poolMembers.Count; + for (int i = 0; i < count; ++i) { - var powerShellPoolMember = new PowerShellPoolMember(this); - m_poolMembers.Add(powerShellPoolMember); - m_availablePoolMembers.Add(powerShellPoolMember); + var percentComplete = _poolMembers[i].PercentComplete; + if (percentComplete < 0) + { + percentComplete = 0; + } + else if(percentComplete > 100) + { + percentComplete = 100; + } + totalPercentComplete += percentComplete; } - - m_runspacePool = RunspaceFactory.CreateRunspacePool(initialSessionState); - m_runspacePool.SetMaxRunspaces(poolSize); + var partiallyProcessedCount = totalPercentComplete / 100; + return partiallyProcessedCount; } - public void AddInput(ScriptBlock scriptblock,PSObject inputObject) + public int GetEstimatedProgressCount() { - var powerShell = WaitForAvailablePowershell(); - powerShell.BeginInvoke(scriptblock, inputObject); - Interlocked.Increment(ref m_busyCount); + lock(_countLock) { + return _processedCount + GetPartiallyProcessedCount(); + } } - public void Open() - { - m_runspacePool.Open(); + public bool TryAddInput(ScriptBlock scriptblock,PSObject inputObject) + { + PowerShellPoolMember poolMember; + if(!TryWaitForAvailablePowershell(100, out poolMember)) + { + return false; + } + + Interlocked.Increment(ref _busyCount); + poolMember.BeginInvoke(scriptblock, inputObject); + return true; } + + public bool WaitForAllPowershellCompleted(int timeoutMilliseconds) { - Contract.Requires(timeoutMilliseconds >=0); + Contract.Requires(timeoutMilliseconds >= 0); var startTicks = Environment.TickCount; var currendTicks = startTicks; while (currendTicks - startTicks < timeoutMilliseconds) { currendTicks = Environment.TickCount; - if (m_cancellationToken.IsCancellationRequested) + if (_cancellationToken.IsCancellationRequested) { return false; } - if (Interlocked.CompareExchange(ref m_busyCount, 0, 0) == 0) + if (Interlocked.CompareExchange(ref _busyCount, 0, 0) == 0) { return true; } Thread.Sleep(10); - } return false; } - private PowerShellPoolMember WaitForAvailablePowershell() + private bool TryWaitForAvailablePowershell(int milliseconds, out PowerShellPoolMember poolMember) { - var poolmember = m_availablePoolMembers.Take(m_cancellationToken); - poolmember.PowerShell.RunspacePool = m_runspacePool; - return poolmember; + if (!_availablePoolMembers.TryTake(out poolMember, milliseconds, _cancellationToken)) + { + _cancellationToken.ThrowIfCancellationRequested(); + Debug.WriteLine("WaitForAvailablePowershell - TryTake failed"); + poolMember = null; + return false; + } + return true; } public void Dispose() { - foreach (var pm in m_poolMembers) + Streams.Dispose(); + _availablePoolMembers.Dispose(); + } + + public void ReportAvailable(PowerShellPoolMember poolmember) + { + Interlocked.Decrement(ref _busyCount); + lock (_countLock) { - pm.Dispose(); + _processedCount++; + poolmember.PercentComplete = 0; + } + + poolmember.PercentComplete = 0; + while (!_availablePoolMembers.TryAdd(poolmember, 1000, _cancellationToken)) + { + _cancellationToken.ThrowIfCancellationRequested(); + Debug.WriteLine("WaitForAvailablePowershell - TryAdd failed"); } - Streams.Dispose(); - m_availablePoolMembers.Dispose(); } - public void ReportCompletion(PowerShellPoolMember poolmember) + public void ReportStopped(PowerShellPoolMember powerShellPoolMember) { - Interlocked.Decrement(ref m_busyCount); - Interlocked.Increment(ref m_processedCount); - m_availablePoolMembers.Add(poolmember); + Interlocked.Decrement(ref _busyCount); + } + + public void Stop() + { + _availablePoolMembers.CompleteAdding(); + foreach (var poolMember in _poolMembers) + { + poolMember.Stop(); + } + WaitForAllPowershellCompleted(5000); } } } \ No newline at end of file diff --git a/src/PSParallel/ProgressManager.cs b/src/PSParallel/ProgressManager.cs index c3b1d00..2291b91 100644 --- a/src/PSParallel/ProgressManager.cs +++ b/src/PSParallel/ProgressManager.cs @@ -1,56 +1,155 @@ -using System.Diagnostics; +using System; +using System.Diagnostics; using System.Management.Automation; - namespace PSParallel { class ProgressManager { public int TotalCount { get; set; } - private readonly ProgressRecord m_progressRecord; - private readonly Stopwatch m_stopwatch; + private ProgressRecord _progressRecord; + private readonly Stopwatch _stopwatch; + private string _currentOperation; public ProgressManager(int activityId, string activity, string statusDescription, int parentActivityId = -1, int totalCount = 0) { TotalCount = totalCount; - m_stopwatch = new Stopwatch(); - m_progressRecord = new ProgressRecord(activityId, activity, statusDescription) {ParentActivityId = parentActivityId}; + _stopwatch = new Stopwatch(); + _progressRecord = new ProgressRecord(activityId, activity, statusDescription) {ParentActivityId = parentActivityId}; } - public ProgressRecord GetCurrentProgressRecord(string currentOperation, int count) + + private void UpdateCurrentProgressRecordInternal(int count) { - if(!m_stopwatch.IsRunning && TotalCount > 0) + if (!_stopwatch.IsRunning && TotalCount > 0) { - m_stopwatch.Start(); + _stopwatch.Start(); } - m_progressRecord.RecordType = ProgressRecordType.Processing; - if(TotalCount > 0) - { - var percentComplete = GetPercentComplete(count); - if (percentComplete != m_progressRecord.PercentComplete) - { - m_progressRecord.PercentComplete = percentComplete; - m_progressRecord.SecondsRemaining = GetSecondsRemaining(count); - } - m_progressRecord.CurrentOperation = $"({count}/{TotalCount}) {currentOperation}"; + var current = TotalCount > 0 ? $"({count}/{TotalCount}) {_currentOperation}" : _currentOperation; + var pr = _progressRecord.Clone(); + pr.CurrentOperation = current; + pr.RecordType = ProgressRecordType.Processing; + if (TotalCount > 0) + { + pr.PercentComplete = GetPercentComplete(count); + pr.SecondsRemaining = GetSecondsRemaining(count); } - else - { - m_progressRecord.CurrentOperation = currentOperation; - } - return m_progressRecord; + _progressRecord = pr; + } + + public void SetCurrentOperation(string currentOperation) + { + _currentOperation = currentOperation; } + public void UpdateCurrentProgressRecord(int count) + { + + UpdateCurrentProgressRecordInternal(count); + } + + public ProgressRecord ProgressRecord => _progressRecord; + public ProgressRecord Completed() { - m_stopwatch.Reset(); + _stopwatch.Reset(); + _progressRecord = _progressRecord.WithRecordType(ProgressRecordType.Completed); + return _progressRecord; + } + + + private int GetSecondsRemaining(int count) + { + var secondsRemaining = count == 0 ? -1 : (int) ((TotalCount - count)*_stopwatch.ElapsedMilliseconds/1000/count); + return secondsRemaining; + } + + private int GetPercentComplete(int count) + { + var percentComplete = count*100/TotalCount; + return percentComplete; + } + + public int ActivityId => _progressRecord.ActivityId; + } + + + class ProgressProjector + { + private readonly Stopwatch _stopWatch; + private int _percentComplete; + public ProgressProjector() + { + _stopWatch = new Stopwatch(); + _percentComplete = -1; + } + + public void ReportProgress(int percentComplete) + { + if (percentComplete > 100) + { + percentComplete = 100; + } + _percentComplete = percentComplete; + } - m_progressRecord.RecordType = ProgressRecordType.Completed; - return m_progressRecord; + public bool IsValid => _percentComplete > 0 && _stopWatch.IsRunning; + public TimeSpan Elapsed => _stopWatch.Elapsed; + + public TimeSpan ProjectedTotalTime => new TimeSpan(Elapsed.Ticks * 100 / _percentComplete); + + public void Start() + { + _stopWatch.Start(); + _percentComplete = 0; + } + + public void Stop() + { + _stopWatch.Stop(); + } + } + + static class ProgressRecordExtension + { + static ProgressRecord CloneProgressRecord(ProgressRecord record) + { + return new ProgressRecord(record.ActivityId, record.Activity, record.StatusDescription) + { + CurrentOperation = record.CurrentOperation, + ParentActivityId = record.ParentActivityId, + SecondsRemaining = record.SecondsRemaining, + PercentComplete = record.PercentComplete, + RecordType = record.RecordType + }; + } + + public static ProgressRecord Clone(this ProgressRecord record) + { + return CloneProgressRecord(record); + } + + public static ProgressRecord WithCurrentOperation(this ProgressRecord record, string currentOperation) + { + var r = CloneProgressRecord(record); + r.CurrentOperation = currentOperation; + return r; + } + + public static ProgressRecord WithRecordType(this ProgressRecord record, ProgressRecordType recordType) + { + var r = CloneProgressRecord(record); + r.RecordType = recordType; + return r; + } + + public static ProgressRecord WithPercentCompleteAndSecondsRemaining(this ProgressRecord record, int percentComplete, int secondsRemaining) + { + var r = CloneProgressRecord(record); + r.PercentComplete = percentComplete; + r.SecondsRemaining = secondsRemaining; + return r; } - private int GetSecondsRemaining(int count) => count == 0 ? -1 : (int) ((TotalCount - count)*m_stopwatch.ElapsedMilliseconds/1000/count); - private int GetPercentComplete(int count) => count*100/TotalCount; - public int ActivityId => m_progressRecord.ActivityId; } } diff --git a/src/PSParallel/Properties/AssemblyInfo.cs b/src/PSParallel/Properties/AssemblyInfo.cs index 3a8773d..fe5f249 100644 --- a/src/PSParallel/Properties/AssemblyInfo.cs +++ b/src/PSParallel/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following @@ -10,7 +9,7 @@ [assembly: AssemblyConfiguration("")] [assembly: AssemblyCompany("")] [assembly: AssemblyProduct("PSParallell")] -[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyCopyright("Copyright © 2016")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] @@ -33,4 +32,4 @@ // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] [assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("2.2.3.0")] diff --git a/src/PSParallelTests/InvokeParallelTests.cs b/src/PSParallelTests/InvokeParallelTests.cs index 5f1e941..02b105f 100644 --- a/src/PSParallelTests/InvokeParallelTests.cs +++ b/src/PSParallelTests/InvokeParallelTests.cs @@ -1,5 +1,4 @@ using System; -using System.IO; using System.Linq; using System.Management.Automation; using System.Management.Automation.Runspaces; @@ -13,277 +12,394 @@ namespace PSParallelTests public sealed class InvokeParallelTests : IDisposable { readonly RunspacePool m_runspacePool; - + readonly InitialSessionState _iss; public InvokeParallelTests() - { + { + _iss = CreateInitialSessionState(); + m_runspacePool = RunspaceFactory.CreateRunspacePool(_iss); + m_runspacePool.SetMaxRunspaces(10); + m_runspacePool.Open(); + } + + private static InitialSessionState CreateInitialSessionState() + { var iss = InitialSessionState.Create(); - iss.LanguageMode = PSLanguageMode.FullLanguage; - iss.Commands.Add(new [] + iss.LanguageMode = PSLanguageMode.FullLanguage; + iss.Commands.Add(new[] { - new SessionStateCmdletEntry("Write-Error", typeof(WriteErrorCommand), null), - new SessionStateCmdletEntry("Write-Verbose", typeof(WriteVerboseCommand), null), - new SessionStateCmdletEntry("Write-Debug", typeof(WriteDebugCommand), null), - new SessionStateCmdletEntry("Write-Progress", typeof(WriteProgressCommand), null), - new SessionStateCmdletEntry("Write-Warning", typeof(WriteWarningCommand), null), + new SessionStateCmdletEntry("Write-Error", typeof(WriteErrorCommand), null), + new SessionStateCmdletEntry("Write-Verbose", typeof(WriteVerboseCommand), null), + new SessionStateCmdletEntry("Write-Debug", typeof(WriteDebugCommand), null), + new SessionStateCmdletEntry("Write-Progress", typeof(WriteProgressCommand), null), + new SessionStateCmdletEntry("Write-Warning", typeof(WriteWarningCommand), null), new SessionStateCmdletEntry("Write-Information", typeof(WriteInformationCommand), null), - new SessionStateCmdletEntry("Invoke-Parallel", typeof(InvokeParallelCommand), null), - }); - m_runspacePool = RunspaceFactory.CreateRunspacePool(iss); - m_runspacePool.SetMaxRunspaces(10); - m_runspacePool.Open(); + new SessionStateCmdletEntry("Invoke-Parallel", typeof(InvokeParallelCommand), null), + }); + iss.Providers.Add(new SessionStateProviderEntry("Function", typeof(FunctionProvider), null)); + iss.Providers.Add(new SessionStateProviderEntry("Variable", typeof(VariableProvider), null)); + return iss; } + [TestMethod] public void TestOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - - ps.AddCommand("Invoke-Parallel") - .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2")) - .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection {1,2,3,4,5}; - input.Complete(); - var output = ps.Invoke(input); - var sum = output.Aggregate(0, (a, b) => a + b); - Assert.AreEqual(30, sum); + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + + ps.AddCommand("Invoke-Parallel") + .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2")) + .AddParameter("ThrottleLimit", 1); + var input = new PSDataCollection {1,2,3,4,5}; + input.Complete(); + var output = ps.Invoke(input); + var sum = output.Aggregate(0, (a, b) => a + b); + Assert.AreEqual(30, sum); + } } [TestMethod] public void TestParallelOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - - ps.AddCommand("Invoke-Parallel") - .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2")) - .AddParameter("ThrottleLimit", 10); - var input = new PSDataCollection(Enumerable.Range(1,1000)); - input.Complete(); - var output = ps.Invoke(input); - var sum = output.Aggregate(0, (a, b) => a + b); - Assert.AreEqual(1001000, sum); + using (var ps = PowerShell.Create()) + { + //ps.RunspacePool = m_runspacePool; + + ps.AddCommand("Invoke-Parallel") + .AddParameter("ScriptBlock", ScriptBlock.Create("$_* 2")) + .AddParameter("ThrottleLimit", 10); + var input = new PSDataCollection(Enumerable.Range(1, 1000)); + input.Complete(); + var output = ps.Invoke(input); + var sum = output.Aggregate(0, (a, b) => a + b); + Assert.AreEqual(1001000, sum); + } } [TestMethod] public void TestVerboseOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("$VerbosePreference='Continue'", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel",false) + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("$VerbosePreference=[System.Management.Automation.ActionPreference]::Continue", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Verbose $_")) .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - Assert.IsFalse(ps.HadErrors, "We don't expect errors here"); - var vrb = ps.Streams.Verbose.ReadAll(); - Assert.IsTrue(vrb.Any(v=> v.Message == "1"), "Some verbose message should be '1'"); + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + Assert.IsFalse(ps.HadErrors, "We don't expect errors here"); + var vrb = ps.Streams.Verbose.ReadAll(); + Assert.IsTrue(vrb.Any(v => v.Message == "1"), "Some verbose message should be '1'"); + } } [TestMethod] public void TestNoVerboseOutputWithoutPreference() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) + using (var ps = PowerShell.Create()) + { + ps.Runspace = RunspaceFactory.CreateRunspace(); + ps.Runspace.Open(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Verbose $_")) .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - Assert.IsFalse(ps.HadErrors, "We don't expect errors here"); - var vrb = ps.Streams.Verbose.ReadAll(); - Assert.IsFalse(vrb.Any(v => v.Message == "1"), "No verbose message should be '1'"); + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + Assert.IsFalse(ps.HadErrors, "We don't expect errors here"); + var vrb = ps.Streams.Verbose.ReadAll(); + Assert.IsFalse(vrb.Any(v => v.Message == "1"), "No verbose message should be '1'"); + ps.Runspace.Dispose(); + } } [TestMethod] public void TestDebugOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("$DebugPreference='Continue'", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) - .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Debug $_")) - .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - Assert.IsFalse(ps.HadErrors, "We don't expect errors here"); - var dbg = ps.Streams.Debug.ReadAll(); - Assert.IsTrue(dbg.Any(d => d.Message == "1"), "Some debug message should be '1'"); + using (var ps = PowerShell.Create()) + { + using (var rs = RunspaceFactory.CreateRunspace(_iss)) + { + rs.Open(); + ps.Runspace = rs; + ps.AddScript("$DebugPreference=[System.Management.Automation.ActionPreference]::Continue", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Debug $_")) + .AddParameter("ThrottleLimit", 1); + var input = new PSDataCollection { 1, 2, 3, 4, 5 }; + input.Complete(); + ps.Invoke(input); + Assert.IsFalse(ps.HadErrors, "We don't expect errors here"); + var dbg = ps.Streams.Debug.ReadAll(); + Assert.IsTrue(dbg.Any(d => d.Message == "1"), "Some debug message should be '1'"); + } + } } [TestMethod] public void TestNoDebugOutputWithoutPreference() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Debug $_")) .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - var dbg = ps.Streams.Debug.ReadAll(); - Assert.IsFalse(dbg.Any(d => d.Message == "1"), "No debug message should be '1'"); + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + var dbg = ps.Streams.Debug.ReadAll(); + Assert.IsFalse(dbg.Any(d => d.Message == "1"), "No debug message should be '1'"); + } } [TestMethod] public void TestWarningOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("$WarningPreference='Continue'", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) + + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("$WarningPreference='Continue'", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Warning $_")) .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - var wrn = ps.Streams.Warning.ReadAll(); - Assert.IsTrue(wrn.Any(w => w.Message == "1"), "Some warning message should be '1'"); + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + var wrn = ps.Streams.Warning.ReadAll(); + Assert.IsTrue(wrn.Any(w => w.Message == "1"), "Some warning message should be '1'"); + } } [TestMethod] public void TestNoWarningOutputWithoutPreference() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("$WarningPreference='SilentlyContinue'", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("$WarningPreference='SilentlyContinue'", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Warning $_")) .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - var wrn = ps.Streams.Warning.ReadAll(); - Assert.IsFalse(wrn.Any(w => w.Message == "1"), "No warning message should be '1'"); + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + var wrn = ps.Streams.Warning.ReadAll(); + Assert.IsFalse(wrn.Any(w => w.Message == "1"), "No warning message should be '1'"); + } } [TestMethod] public void TestErrorOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("$ErrorActionPreference='Continue'", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("$ErrorActionPreference='Continue'", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Error -Message $_ -TargetObject $_")) .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - var err = ps.Streams.Error.ReadAll(); - Assert.IsTrue(err.Any(e => e.Exception.Message == "1"), "Some warning message should be '1'"); + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + var err = ps.Streams.Error.ReadAll(); + Assert.IsTrue(err.Any(e => e.Exception.Message == "1"), "Some warning message should be '1'"); + } } [TestMethod] public void TestNoErrorOutputWithoutPreference() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("$ErrorActionPreference='SilentlyContinue'", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("$ErrorActionPreference='SilentlyContinue'", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Error -message $_ -TargetObject $_")) .AddParameter("ThrottleLimit", 1); - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - var err = ps.Streams.Error.ReadAll(); - Assert.IsFalse(err.Any(e => e.Exception.Message == "1"), "No Error message should be '1'"); + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + var err = ps.Streams.Error.ReadAll(); + Assert.IsFalse(err.Any(e => e.Exception.Message == "1"), "No Error message should be '1'"); + } } [TestMethod] public void TestBinaryExpressionVariableCapture() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("[int]$x=10", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) - .AddParameter("ScriptBlock", ScriptBlock.Create("$x -eq 10")) - .AddParameter("ThrottleLimit", 1) - .AddParameter("InputObject", 1); - - var result = ps.Invoke().First(); - Assert.IsTrue(result); + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("[int]$x=10", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", ScriptBlock.Create("$x -eq 10")) + .AddParameter("ThrottleLimit", 1) + .AddParameter("InputObject", 1); + + var result = ps.Invoke().First(); + Assert.IsTrue(result); + } } [TestMethod] public void TestAssingmentExpressionVariableCapture() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - ps.AddScript("[int]$x=10;", false).Invoke(); - ps.Commands.Clear(); - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) - .AddParameter("ScriptBlock", ScriptBlock.Create("$y = $x * 5; $y")) - .AddParameter("ThrottleLimit", 1) - .AddParameter("InputObject", 1); - - var result = ps.Invoke().First(); - Assert.AreEqual(50, result); + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("[int]$x=10;", false).Invoke(); + ps.Commands.Clear(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", ScriptBlock.Create("$y = $x * 5; $y")) + .AddParameter("ThrottleLimit", 1) + .AddParameter("InputObject", 1); + + var result = ps.Invoke().First(); + Assert.AreEqual(50, result); + } } [TestMethod] public void TestProgressOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) - .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_")) - .AddParameter("ThrottleLimit", 1); - - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - var progress = ps.Streams.Progress.ReadAll(); - Assert.AreEqual(11, progress.Count(pr=>pr.Activity == "Invoke-Parallel" || pr.Activity == "Test")); + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("$ProgressPreference='Continue'", false).Invoke(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", + ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_")) + .AddParameter("ThrottleLimit", 1); + + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + var progress = ps.Streams.Progress.ReadAll(); + Assert.IsTrue(10 < progress.Count(pr => pr.Activity == "Invoke-Parallel" || pr.Activity == "Test")); + } + } + + [TestMethod] + public void TestProgressOutput2Workers() + { + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript("$ProgressPreference='Continue'", false).Invoke(); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", + ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_")) + .AddParameter("ThrottleLimit", 2); + + var input = new PSDataCollection { 1, 2, 3, 4, 5, 6, 7,8, 9,10 }; + input.Complete(); + ps.Invoke(input); + var progress = ps.Streams.Progress.ReadAll(); + Assert.IsTrue(19 <= progress.Count(pr => pr.Activity == "Invoke-Parallel" || pr.Activity == "Test")); + } } [TestMethod] public void TestNoProgressOutput() { - PowerShell ps = PowerShell.Create(); - ps.RunspacePool = m_runspacePool; - - ps.AddStatement() - .AddCommand("Invoke-Parallel", false) - .AddParameter("ScriptBlock", ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_")) - .AddParameter("ThrottleLimit", 1) - .AddParameter("NoProgress"); - - var input = new PSDataCollection { 1, 2, 3, 4, 5 }; - input.Complete(); - ps.Invoke(input); - var progress = ps.Streams.Progress.ReadAll(); - Assert.IsFalse( progress.Any(pr=>pr.Activity == "Invoke-Parallel")); - Assert.AreEqual(5, progress.Count(pr=>pr.Activity == "Test")); + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", + ScriptBlock.Create("Write-Progress -activity 'Test' -Status 'Status' -currentoperation $_")) + .AddParameter("ThrottleLimit", 1) + .AddParameter("NoProgress"); + + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + ps.Invoke(input); + var progress = ps.Streams.Progress.ReadAll(); + Assert.IsFalse(progress.Any(pr => pr.Activity == "Invoke-Parallel")); + Assert.AreEqual(5, progress.Count(pr => pr.Activity == "Test")); + } + } + + + [TestMethod] + public void TestFunctionCaptureOutput() + { + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript(@" +function foo($x) {return $x * 2} +", false); + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", ScriptBlock.Create("foo $_")) + .AddParameter("ThrottleLimit", 1) + .AddParameter("NoProgress"); + + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + var output = ps.Invoke(input); + var sum = output.Aggregate(0, (a, b) => a + b); + Assert.AreEqual(30, sum); + } } + [TestMethod] + public void TestRecursiveFunctionCaptureOutput() + { + using (var ps = PowerShell.Create()) + { + ps.RunspacePool = m_runspacePool; + ps.AddScript(@" +function foo($x) {return 2 * $x} +function bar($x) {return 3 * (foo $x)} +", false); + + ps.AddStatement() + .AddCommand("Invoke-Parallel", false) + .AddParameter("ScriptBlock", ScriptBlock.Create("bar $_")) + .AddParameter("ThrottleLimit", 1) + .AddParameter("NoProgress"); + + var input = new PSDataCollection {1, 2, 3, 4, 5}; + input.Complete(); + var output = ps.Invoke(input); + var sum = output.Aggregate(0, (a, b) => a + b); + Assert.AreEqual(90, sum); + } + } + + public void Dispose() { m_runspacePool.Dispose();