APIs

ClimaComms.ClimaComms - Module
ClimaComms

Abstracts the communications interface for the various CliMA components in order to:

  • support different computational backends (CPUs, GPUs)
  • enable the use of different backends as transports (MPI, SharedArrays, etc.), and
  • transparently support single or double buffering for GPUs, depending on whether the transport has the ability to access GPU memory.
source

Loading

ClimaComms.@import_required_backends - Macro
ClimaComms.@import_required_backends

If the desired context is MPI (as determined by ClimaComms.context()), try loading MPI.jl. If the desired device is CUDA (as determined by ClimaComms.device()), try loading CUDA.jl.
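
For example, a driver script might begin as follows (a minimal sketch; MPI.jl and CUDA.jl are only loaded if the configured context or device actually requires them):

import ClimaComms
ClimaComms.@import_required_backends  # loads MPI.jl and/or CUDA.jl when needed

device = ClimaComms.device()
context = ClimaComms.context(device)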

source

Devices

ClimaComms.device - Function
ClimaComms.device()

Determine the device to use depending on the CLIMACOMMS_DEVICE environment variable.

Allowed values:

  • CPU, single-threaded or multi-threaded depending on the number of threads;
  • CPUSingleThreaded,
  • CPUMultiThreaded,
  • CUDA.

The default is CPU.
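
For example, to force a single-threaded CPU run (a minimal sketch; the environment variable must be set before ClimaComms.device() is first called, and the printed form of the result is indicative):

julia> ENV["CLIMACOMMS_DEVICE"] = "CPUSingleThreaded";

julia> ClimaComms.device()
ClimaComms.CPUSingleThreaded()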

source
ClimaComms.array_type - Function
ClimaComms.array_type(::AbstractDevice)

The base array type used by the specified device (currently Array or CuArray).
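
For instance (a minimal sketch):

julia> ClimaComms.array_type(ClimaComms.CPUSingleThreaded())
Array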

source
ClimaComms.allowscalar - Function
allowscalar(f, ::AbstractDevice, args...; kwargs...)

Device-flexible version of CUDA.@allowscalar.

Lowers to

f(args...)

for CPU devices and

CUDA.@allowscalar f(args...)

for CUDA devices.

This is conveniently written using do-block syntax:

allowscalar(device) do
    f()
end
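
For example, to read a single element of a device array without triggering a scalar-indexing error on GPUs (a minimal sketch; x is assumed to be an array of the type returned by array_type(device)):

first_value = ClimaComms.allowscalar(device) do
    x[1]
end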
source
ClimaComms.@time - Macro
@time device expr

Device-flexible @time.

Lowers to

@time expr

for CPU devices and

CUDA.@time expr

for CUDA devices.
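
For example (a minimal sketch; x is assumed to be an array allocated on the device):

device = ClimaComms.device()
ClimaComms.@time device sum(x)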

source
ClimaComms.@elapsed - Macro
@elapsed device expr

Device-flexible @elapsed.

Lowers to

@elapsed expr

for CPU devices and

CUDA.@elapsed expr

for CUDA devices.

source
ClimaComms.@assert - Macro
@assert device cond [text]

Device-flexible @assert.

Lowers to

@assert cond [text]

for CPU devices and

CUDA.@cuassert cond [text]

for CUDA devices.

source
ClimaComms.@sync - Macro
@sync device expr

Device-flexible @sync.

Lowers to

@sync expr

for CPU devices and

CUDA.@sync expr

for CUDA devices.

An example use-case of this might be:

BenchmarkTools.@benchmark begin
    if ClimaComms.device() isa ClimaComms.CUDADevice
        CUDA.@sync begin
            launch_cuda_kernels_or_spawn_tasks!(...)
        end
    elseif ClimaComms.device() isa ClimaComms.CPUMultiThreaded
        Base.@sync begin
            launch_cuda_kernels_or_spawn_tasks!(...)
        end
    end
end

If the CPU version of the above example does not leverage spawned tasks (which require a Base.@sync block or explicit wait calls to synchronize), then you may want to simply use @cuda_sync.

source
ClimaComms.@cuda_sync - Macro
@cuda_sync device expr

Device-flexible CUDA.@sync.

Lowers to

expr

for CPU devices and

CUDA.@sync expr

for CUDA devices.
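
A typical use is to make sure asynchronous GPU work has completed before proceeding (a minimal sketch; launch_kernels! is a hypothetical function that launches GPU kernels):

ClimaComms.@cuda_sync device launch_kernels!(x)
# on a CUDADevice, this returns only after the kernels have finished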

source
Adapt.adapt_structure - Method
Adapt.adapt_structure(::Type{<:AbstractArray}, device::AbstractDevice)

Adapt a given device to a device associated with the given array type.

Example

Adapt.adapt_structure(Array, ClimaComms.CUDADevice()) -> ClimaComms.CPUSingleThreaded()
Note

By default, adapting to Array creates a CPUSingleThreaded device, and there is currently no way to convert to a CPUMultiThreaded device.

source
ClimaComms.@threaded - Macro
@threaded [device] [coarsen=...] [block_size=...] for ... end

Device-flexible generalization of Threads.@threads, which distributes the iterations of a for-loop across multiple threads, with the option to control thread coarsening and GPU kernel configuration. Coarsening makes each thread evaluate more than one iteration of the loop, which can improve performance by reducing the runtime overhead of launching additional threads (though too much coarsening worsens performance because it reduces parallelization). The device is either inferred by calling ClimaComms.device(), or it can be specified manually, with the following device-dependent behavior:

  • When device is a CPUSingleThreaded(), the loop is evaluated as-is. This avoids the runtime overhead of calling Threads.@threads with a single thread, and, when the device type is statically inferrable, it also avoids compilation overhead.

  • When device is a CPUMultiThreaded(), the loop is passed to Threads.@threads. This supports three different kinds of "schedulers" for determining how many iterations of the loop to evaluate in each thread:

    1. (default) a "dynamic" scheduler that changes the number of iterations as new threads are launched,
    2. a "static" scheduler that evaluates a fixed number of iterations per thread, and
    3. a "greedy" scheduler that uses a small number of threads, continuously evaluating iterations in each thread until the loop is completed (only available as of Julia 1.11).

    Setting coarsen to :dynamic or :greedy launches threads with those schedulers. Setting it to :static or an integer value launches threads with static scheduling (using :static is similar to using 1, but slightly more performant). To read more about multi-threading, see the documentation for Threads.@threads.

  • When device is a CUDADevice(), the loop is compiled with CUDA.@cuda and run with CUDA.@sync. Since CUDA launches all threads at the same time, only static scheduling can be used. Setting coarsen to any symbol causes each thread to evaluate a single iteration (default), and setting it to an integer value causes each thread to evaluate that number of iterations (the default is similar to using 1, but slightly more performant). If the total number of iterations in the loop is extremely large, the specified coarsening may require more threads than can be simultaneously launched on the GPU, in which case the amount of coarsening is automatically increased.

    The optional argument block_size is also available for manually specifying the size of each block on a GPU. The default value of :auto sets the number of threads in each block to the largest possible value that permits a high GPU "occupancy" (the number of active thread warps in each multiprocessor executing the kernel). An integer can be used instead of :auto to override this default value. If the specified value exceeds the total number of threads, it is automatically decreased to avoid idle threads.

Any iterator with methods for firstindex, length, and getindex can be used in a @threaded loop. All lazy iterators from Base and Base.Iterators, such as zip, enumerate, Iterators.product, and generator expressions, are also compatible with @threaded. (Although these iterators do not define methods for getindex, they are automatically modified by threadable to support getindex.) Using multiple iterators with @threaded is equivalent to looping over a single Iterators.product, with the innermost iterator of the loop appearing first in the product, and the outermost iterator appearing last.
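
For example, a threaded loop over two index ranges, wrapped in a function so that it can also compile for a CUDADevice() (a minimal sketch; outer_sum! is a hypothetical helper, and a, b, and c are assumed to be arrays on the current device):

outer_sum!(c, a, b) = ClimaComms.@threaded for i in axes(a, 1), j in axes(b, 1)
    c[i, j] = a[i] + b[j]
end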

NOTE: When a value in the body of the loop has a type that cannot be inferred by the compiler, an InvalidIRError will be thrown during compilation for a CUDADevice(). In particular, global variables are not inferrable, so @threaded must be wrapped in a function whenever it is used in the REPL:

julia> a = CUDA.CuArray{Int}(undef, 100); b = similar(a);

julia> threaded_copyto!(a, b) = ClimaComms.@threaded for i in axes(a, 1)
           a[i] = b[i]
       end
threaded_copyto! (generic function with 1 method)

julia> threaded_copyto!(a, b)

julia> ClimaComms.@threaded for i in axes(a, 1)
           a[i] = b[i]
       end
ERROR: InvalidIRError: ...

Moreover, type variables are not inferrable across function boundaries, so types used in a threaded loop cannot be precomputed before the loop:

julia> threaded_add_epsilon!(a) = ClimaComms.@threaded for i in axes(a, 1)
           FT = eltype(a)
           a[i] += eps(FT)
       end
threaded_add_epsilon! (generic function with 1 method)

julia> threaded_add_epsilon!(a)

julia> function threaded_add_epsilon!(a)
           FT = eltype(a)
           ClimaComms.@threaded for i in axes(a, 1)
               a[i] += eps(FT)
           end
       end
threaded_add_epsilon! (generic function with 1 method)

julia> threaded_add_epsilon!(a)
ERROR: InvalidIRError: ...

To fix other kinds of inference issues on GPUs, especially ones brought about by indexing into iterators with nonuniform element types, see UnrolledUtilities.jl.

source
ClimaComms.threaded - Function
threaded(f, device, itrs...; kwargs...)

Functional form of @threaded. If there are n iterators and f is a function of n arguments, the threaded function is similar to

@threaded device [kwargs...] for xₙ in itrs[n], ..., x₂ in itrs[2], x₁ in itrs[1]
    f(x₁, x₂, ..., xₙ)
end

On single-threaded CPU devices, the @threaded macro inlines the for-loop without any intermediate function calls, so that it has a lower latency than the threaded function. On other devices, the only difference between the macro and the function is that keyword argument symbols like :dynamic and :auto must be wrapped in Vals for the function.
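
For instance, a simple loop can be written functionally as (a minimal sketch; note that the keyword symbol is wrapped in a Val):

ClimaComms.threaded(device, eachindex(a); coarsen = Val(:static)) do i
    a[i] += 1
end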

source
ClimaComms.threadable - Function
threadable(device, itr)

Modifies an iterator to ensure that it can be used in a @threaded loop. This will typically return itr or a ThreadableWrapper of itr.

source
ClimaComms.ThreadableWrapper - Type
ThreadableWrapper

Wrapper for an iterator from Base or Iterators that can be used in @threaded, with methods for firstindex, length, and getindex. The getindex method only supports linear indices between firstindex and firstindex + length - 1. For the ThreadableWrapper of Iterators.product, getindex converts each linear index to a Cartesian index using regular integer division on CPUs and Base.multiplicativeinverse on GPUs.

source

Contexts

ClimaComms.context - Function
ClimaComms.context(device=device())

Construct a default communication context.

By default, it will try to determine if it is running inside an MPI environment (e.g., whether MPI launcher environment variables are set); if so, it will return an MPICommsContext; otherwise it will return a SingletonCommsContext.

Behavior can be overridden by setting the CLIMACOMMS_CONTEXT environment variable to either MPI or SINGLETON.
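
For example, to force a single-process context even when MPI launcher variables are present (a minimal sketch):

julia> ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON";

julia> ClimaComms.context() isa ClimaComms.SingletonCommsContext
true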

source
ClimaComms.graph_context - Function
graph_context(context::AbstractCommsContext,
              sendarray, sendlengths, sendpids,
              recvarray, recvlengths, recvpids)

Construct a communication context for exchanging neighbor data via a graph.

Arguments:

  • context: the communication context on which to construct the graph context.
  • sendarray: array containing data to send
  • sendlengths: list of lengths of data to send to each process ID
  • sendpids: list of processor IDs to send to
  • recvarray: array to receive data into
  • recvlengths: list of lengths of data to receive from each process ID
  • recvpids: list of processor IDs to receive from

This should return an AbstractGraphContext object.
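
A typical neighbor exchange built on this looks like the following (a minimal sketch, assuming a single neighboring process with ID 2; buffer sizes are illustrative, data is an existing device array, and the exchange is driven by the start, progress, and finish functions described under "Graph exchange" below):

send_buf = similar(data, 10)   # data staged for the neighbor
recv_buf = similar(data, 10)   # data received from the neighbor

graph_ctx = ClimaComms.graph_context(
    context,
    send_buf, [length(send_buf)], [2],
    recv_buf, [length(recv_buf)], [2],
)

ClimaComms.start(graph_ctx)      # begin the asynchronous exchange
ClimaComms.progress(graph_ctx)   # drive communication
ClimaComms.finish(graph_ctx)     # block until recv_buf has been filled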

source
Adapt.adapt_structure - Method
Adapt.adapt_structure(::Type{<:AbstractArray}, context::AbstractCommsContext)

Adapt a given context to a context with a device associated with the given array type.

Example

Adapt.adapt_structure(Array, ClimaComms.context(ClimaComms.CUDADevice())) -> ClimaComms.SingletonCommsContext(ClimaComms.CPUSingleThreaded())
Note

By default, adapting to Array creates a CPUSingleThreaded device, and there is currently no way to convert to a CPUMultiThreaded device.

source

Logging

ClimaComms.OnlyRootLogger - Function
OnlyRootLogger()
OnlyRootLogger(ctx::AbstractCommsContext)

Return a logger that silences non-root processes.

If no context is passed, obtain the default context via context.
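
For example, to silence log messages from all but the root process (a minimal sketch):

using Logging
Logging.global_logger(ClimaComms.OnlyRootLogger(context))
@info "Only the root process prints this message"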

source
ClimaComms.MPILogger - Function
MPILogger(context::AbstractCommsContext)
MPILogger(iostream, context)

Add a rank prefix before log messages.

Outputs to stdout if no IOStream is given.

source
ClimaComms.FileLogger - Function
FileLogger(context, log_dir; log_stdout = true, min_level = Logging.Info)

Log each MPI rank to a different file within log_dir.

The minimum logging level is set using min_level. If log_stdout = true, root process logs will be sent to stdout as well.
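
For example (a minimal sketch; "output/logs" is a hypothetical directory, and each rank writes to its own file within it):

using Logging
logger = ClimaComms.FileLogger(context, "output/logs")
Logging.with_logger(logger) do
    @info "Written to this rank's log file (and to stdout on the root process)"
end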

source

Context operations

ClimaComms.init - Function
(pid, nprocs) = init(ctx::AbstractCommsContext)

Perform any necessary initialization for the specified backend. Return a tuple of the processor ID and the number of participating processors.
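
For example (a minimal sketch):

context = ClimaComms.context()
(pid, nprocs) = ClimaComms.init(context)
ClimaComms.iamroot(context) && @info "Running with $nprocs processes"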

source
ClimaComms.iamroot - Function
iamroot(ctx::AbstractCommsContext)

Return true if the calling processor is the root processor.

source
ClimaComms.nprocs - Function
nprocs(ctx::AbstractCommsContext)

Return the number of participating processors.

source
ClimaComms.abort - Function
abort(ctx::CC, status::Int) where {CC <: AbstractCommsContext}

Terminate the caller and all participating processors with the specified status.

source

Collective operations

ClimaComms.barrier - Function
barrier(ctx::CC) where {CC <: AbstractCommsContext}

Perform a global synchronization across all participating processors.

source
ClimaComms.reduce - Function
reduce(ctx::CC, val, op) where {CC <: AbstractCommsContext}

Perform a reduction across all participating processors, using op as the reduction operator and val as this rank's reduction value. Return the result to the first processor only.
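
For example, computing a global maximum (a minimal sketch; local_max is a hypothetical per-rank value):

global_max = ClimaComms.reduce(context, local_max, max)
# global_max is only meaningful on the root process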

source
ClimaComms.reduce! - Function
reduce!(ctx::CC, sendbuf, recvbuf, op)
reduce!(ctx::CC, sendrecvbuf, op)

Performs elementwise reduction using the operator op on the buffer sendbuf, storing the result in recvbuf on the root process. If only one sendrecvbuf buffer is provided, then the operation is performed in-place.

source
ClimaComms.allreduce - Function
allreduce(ctx::CC, sendbuf, op)

Performs elementwise reduction using the operator op on the buffer sendbuf, allocating a new array for the result. sendbuf can also be a scalar, in which case recvbuf will be a value of the same type.
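
For example, summing a per-rank scalar across all processes (a minimal sketch; local_energy is a hypothetical per-rank value):

total_energy = ClimaComms.allreduce(context, local_energy, +)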

source
ClimaComms.allreduce! - Function
allreduce!(ctx::CC, sendbuf, recvbuf, op)
allreduce!(ctx::CC, sendrecvbuf, op)

Performs elementwise reduction using the operator op on the buffer sendbuf, storing the result in the recvbuf of all processes in the group. Allreduce! is equivalent to a Reduce! operation followed by a Bcast!, but can lead to better performance. If only one sendrecvbuf buffer is provided, then the operation is performed in-place.

source
ClimaComms.bcast - Function
bcast(ctx::AbstractCommsContext, object)

Broadcast object from the root process to all other processes. The value of object on non-root processes is ignored.
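
A common pattern is to read input on the root process only and then broadcast it (a minimal sketch; read_config is a hypothetical function):

config = ClimaComms.iamroot(context) ? read_config("config.toml") : nothing
config = ClimaComms.bcast(context, config)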

source

Graph exchange

ClimaComms.progress - Function
progress(ctx::AbstractGraphContext)

Drive communication. Call after start to ensure that communication proceeds asynchronously.

source
ClimaComms.finish - Function
finish(ctx::AbstractGraphContext)

Complete the communications step begun by start(). After this returns, data received from all neighbors will be available in the stage areas of each neighbor's receive buffer.

source