APIs
ClimaComms.ClimaComms — Module

ClimaComms

Abstracts the communications interface for the various CliMA components in order to:
- support different computational backends (CPUs, GPUs),
- enable the use of different backends as transports (MPI, SharedArrays, etc.), and
- transparently support single or double buffering for GPUs, depending on whether the transport has the ability to access GPU memory.
 
Loading
ClimaComms.@import_required_backends — Macro

ClimaComms.@import_required_backends

If the desired context is MPI (as determined by ClimaComms.context()), try loading MPI.jl. If the desired device is CUDA (as determined by ClimaComms.device()), try loading CUDA.jl.
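A typical pattern is to call the macro once at the top of a driver script, before constructing the context and device. This is a minimal sketch; the surrounding workflow is illustrative, not part of the API:

import ClimaComms
ClimaComms.@import_required_backends  # loads MPI.jl and/or CUDA.jl if the environment requests them

ctx = ClimaComms.context()
device = ClimaComms.device()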
ClimaComms.cuda_is_required — Function

cuda_is_required()

Returns a Bool indicating if CUDA should be loaded, based on ENV["CLIMACOMMS_DEVICE"]. See ClimaComms.device for more information.

cuda_is_required() && using CUDA

ClimaComms.mpi_is_required — Function

mpi_is_required()

Returns a Bool indicating if MPI should be loaded, based on ENV["CLIMACOMMS_CONTEXT"]. See ClimaComms.context for more information.

mpi_is_required() && using MPI

Devices
ClimaComms.AbstractDevice — Type

AbstractDevice

The base type for a device.
ClimaComms.AbstractCPUDevice — Type

AbstractCPUDevice()

Abstract device type for single-threaded and multi-threaded CPU runs.
ClimaComms.CPUSingleThreaded — Type

CPUSingleThreaded()

Use the CPU with a single thread.
ClimaComms.CPUMultiThreaded — Type

CPUMultiThreaded()

Use the CPU with multiple threads.
ClimaComms.CUDADevice — Type

CUDADevice()

Use the NVIDIA GPU accelerator.
ClimaComms.device — Function

ClimaComms.device()

Determine the device to use depending on the CLIMACOMMS_DEVICE environment variable.
Allowed values:
- CPU: single-threaded or multi-threaded depending on the number of threads,
- CPUSingleThreaded,
- CPUMultiThreaded,
- CUDA.
The default is CPU.
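For example (a minimal sketch, assuming the environment variable is read when ClimaComms.device() is called):

ENV["CLIMACOMMS_DEVICE"] = "CPUSingleThreaded"  # or "CPU", "CPUMultiThreaded", "CUDA"

import ClimaComms
device = ClimaComms.device()  # ClimaComms.CPUSingleThreaded() in this case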
ClimaComms.device_functional — Function

ClimaComms.device_functional(device)

Return true when the device is correctly set up.
ClimaComms.array_type — Function

ClimaComms.array_type(::AbstractDevice)

The base array type used by the specified device (currently Array or CuArray).
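For instance, array_type can be used to allocate data on whichever device was selected (a minimal sketch; the array contents are illustrative):

import ClimaComms
device = ClimaComms.device()
ArrayType = ClimaComms.array_type(device)  # Array for CPU devices, CuArray for CUDADevice

x = ArrayType(zeros(Float64, 10))  # data lives on the selected device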
ClimaComms.allowscalar — Function

allowscalar(f, ::AbstractDevice, args...; kwargs...)

Device-flexible version of CUDA.@allowscalar.

Lowers to f(args...) for CPU devices and CUDA.@allowscalar f(args...) for CUDA devices.
This is usefully written with closures via
allowscalar(device) do
    f()
end

ClimaComms.@time — Macro

@time device expr

Device-flexible @time.

Lowers to @time expr for CPU devices and CUDA.@time expr for CUDA devices.
ClimaComms.@elapsed — Macro

@elapsed device expr

Device-flexible @elapsed.

Lowers to @elapsed expr for CPU devices and CUDA.@elapsed expr for CUDA devices.
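Both macros wrap the same expression regardless of the device. A minimal sketch, assuming x is an array allocated via ClimaComms.array_type:

import ClimaComms
device = ClimaComms.device()
x = ClimaComms.array_type(device)(ones(10_000))

ClimaComms.@time device sum(x)          # prints timing; uses CUDA.@time on a CUDADevice
t = ClimaComms.@elapsed device sum(x)   # returns the elapsed time in seconds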
ClimaComms.@assert — Macro

@assert device cond [text]

Device-flexible @assert.

Lowers to @assert cond [text] for CPU devices and CUDA.@cuassert cond [text] for CUDA devices.
ClimaComms.@sync — Macro

@sync device expr

Device-flexible @sync.

Lowers to @sync expr for CPU devices and CUDA.@sync expr for CUDA devices.
An example use-case of this might be:
BenchmarkTools.@benchmark begin
    if ClimaComms.device() isa ClimaComms.CUDADevice
        CUDA.@sync begin
            launch_cuda_kernels_or_spawn_tasks!(...)
        end
    elseif ClimaComms.device() isa ClimaComms.CPUMultiThreaded
        Base.@sync begin
            launch_cuda_kernels_or_spawn_tasks!(...)
        end
    end
end

If the CPU version of the above example does not leverage spawned tasks (which require using Base.@sync or Threads.wait to synchronize), then you may want to simply use @cuda_sync.
ClimaComms.@cuda_sync — Macro

@cuda_sync device expr

Device-flexible CUDA.@sync.

Lowers to expr for CPU devices and CUDA.@sync expr for CUDA devices.
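A minimal sketch; do_work! is a hypothetical function standing in for a kernel launch or other asynchronous device work:

import ClimaComms
device = ClimaComms.device()

do_work!(x) = (x .= x .+ 1)  # hypothetical in-place operation

x = ClimaComms.array_type(device)(zeros(100))
ClimaComms.@cuda_sync device do_work!(x)  # waits for the GPU on CUDADevice, plain evaluation on CPU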
Adapt.adapt_structure — Method

Adapt.adapt_structure(::Type{<:AbstractArray}, device::AbstractDevice)

Adapt a given device to a device associated with the given array type.
Example
Adapt.adapt_structure(Array, ClimaComms.CUDADevice()) -> ClimaComms.CPUSingleThreaded()

By default, adapting to Array creates a CPUSingleThreaded device, and there is currently no way to convert to a CPUMultiThreaded device.
ClimaComms.@threaded — Macro

@threaded [device] [coarsen=...] [block_size=...] for ... end

Device-flexible generalization of Threads.@threads, which distributes the iterations of a for-loop across multiple threads, with the option to control thread coarsening and GPU kernel configuration. Coarsening makes each thread evaluate more than one iteration of the loop, which can improve performance by reducing the runtime overhead of launching additional threads (though too much coarsening worsens performance because it reduces parallelization). The device is either inferred by calling ClimaComms.device(), or it can be specified manually, with the following device-dependent behavior:
- When device is a CPUSingleThreaded(), the loop is evaluated as-is. This avoids the runtime overhead of calling Threads.@threads with a single thread, and, when the device type is statically inferrable, it also avoids compilation overhead.
- When device is a CPUMultiThreaded(), the loop is passed to Threads.@threads. This supports three different kinds of "schedulers" for determining how many iterations of the loop to evaluate in each thread:
  - (default) a "dynamic" scheduler that changes the number of iterations as new threads are launched,
  - a "static" scheduler that evaluates a fixed number of iterations per thread, and
  - a "greedy" scheduler that uses a small number of threads, continuously evaluating iterations in each thread until the loop is completed (only available as of Julia 1.11).
  Setting coarsen to :dynamic or :greedy launches threads with those schedulers. Setting it to :static or an integer value launches threads with static scheduling (using :static is similar to using 1, but slightly more performant). To read more about multi-threading, see the documentation for Threads.@threads.
- When device is a CUDADevice(), the loop is compiled with CUDA.@cuda and run with CUDA.@sync. Since CUDA launches all threads at the same time, only static scheduling can be used. Setting coarsen to any symbol causes each thread to evaluate a single iteration (default), and setting it to an integer value causes each thread to evaluate that number of iterations (the default is similar to using 1, but slightly more performant). If the total number of iterations in the loop is extremely large, the specified coarsening may require more threads than can be simultaneously launched on the GPU, in which case the amount of coarsening is automatically increased.

The optional argument block_size is also available for manually specifying the size of each block on a GPU. The default value of :auto sets the number of threads in each block to the largest possible value that permits a high GPU "occupancy" (the number of active thread warps in each multiprocessor executing the kernel). An integer can be used instead of :auto to override this default value. If the specified value exceeds the total number of threads, it is automatically decreased to avoid idle threads.
Any iterator with methods for firstindex, length, and getindex can be used in a @threaded loop. All lazy iterators from Base and Base.Iterators, such as zip, enumerate, Iterators.product, and generator expressions, are also compatible with @threaded. (Although these iterators do not define methods for getindex, they are automatically modified by threadable to support getindex.) Using multiple iterators with @threaded is equivalent to looping over a single Iterators.product, with the innermost iterator of the loop appearing first in the product, and the outermost iterator appearing last.
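A minimal usage sketch (the array, the coarsen value, and the loop body are illustrative):

import ClimaComms
device = ClimaComms.device()
a = ClimaComms.array_type(device)(zeros(10_000))

ClimaComms.@threaded device coarsen = 4 for i in axes(a, 1)
    a[i] = sqrt(i)  # coarsen controls how many iterations each thread handles
end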
NOTE: When a value in the body of the loop has a type that cannot be inferred by the compiler, an InvalidIRError will be thrown during compilation for a CUDADevice(). In particular, global variables are not inferrable, so @threaded must be wrapped in a function whenever it is used in the REPL:
julia> a = CUDA.CuArray{Int}(undef, 100); b = similar(a);
julia> threaded_copyto!(a, b) = ClimaComms.@threaded for i in axes(a, 1)
           a[i] = b[i]
       end
threaded_copyto! (generic function with 1 method)
julia> threaded_copyto!(a, b)
julia> ClimaComms.@threaded for i in axes(a, 1)
           a[i] = b[i]
       end
ERROR: InvalidIRError: ...

Moreover, type variables are not inferrable across function boundaries, so types used in a threaded loop cannot be precomputed before the loop:
julia> threaded_add_epsilon!(a) = ClimaComms.@threaded for i in axes(a, 1)
           FT = eltype(a)
           a[i] += eps(FT)
       end
threaded_add_epsilon! (generic function with 1 method)
julia> threaded_add_epsilon!(a)
julia> function threaded_add_epsilon!(a)
           FT = eltype(a)
           ClimaComms.@threaded for i in axes(a, 1)
               a[i] += eps(FT)
           end
       end
threaded_add_epsilon! (generic function with 1 method)
julia> threaded_add_epsilon!(a)
ERROR: InvalidIRError: ...

To fix other kinds of inference issues on GPUs, especially ones brought about by indexing into iterators with nonuniform element types, see UnrolledUtilities.jl.
ClimaComms.threaded — Function

threaded(f, device, itrs...; kwargs...)

Functional form of @threaded. If there are n iterators and f is a function of n arguments, the threaded function is similar to
@threaded device [kwargs...] for xₙ in itrs[n], ..., x₂ in itrs[2], x₁ in itrs[1]
    f(x₁, x₂, ..., xₙ)
end

On single-threaded CPU devices, the @threaded macro inlines the for-loop without any intermediate function calls, so that it has a lower latency than the threaded function. On other devices, the only difference between the macro and the function is that keyword argument symbols like :dynamic and :auto must be wrapped in Vals for the function.
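A minimal sketch of the functional form (the array and the keyword value are illustrative; note the Val-wrapped symbol):

import ClimaComms
device = ClimaComms.device()
a = ClimaComms.array_type(device)(zeros(1000))

ClimaComms.threaded(device, axes(a, 1); coarsen = Val(:static)) do i
    a[i] = i^2  # the do-block becomes the function applied to each index
end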
ClimaComms.threadable — Function

threadable(device, itr)

Modifies an iterator to ensure that it can be used in a @threaded loop. This will typically return itr or a ThreadableWrapper of itr.
ClimaComms.ThreadableWrapper — Type

ThreadableWrapper

Wrapper for an iterator from Base or Iterators that can be used in @threaded, with methods for firstindex, length, and getindex. The getindex method only supports linear indices between firstindex and firstindex + length - 1. For the ThreadableWrapper of Iterators.product, getindex converts each linear index to a Cartesian index using regular integer division on CPUs and Base.multiplicativeinverse on GPUs.
Contexts
ClimaComms.AbstractCommsContext — Type

AbstractCommsContext

The base type for a communications context. Each backend defines a concrete subtype of this.
ClimaComms.SingletonCommsContext — Type

SingletonCommsContext(device=device())

A singleton communications context, used for single-process runs. ClimaComms.AbstractCPUDevice and ClimaComms.CUDADevice device options are currently supported.
ClimaComms.MPICommsContext — Type

MPICommsContext()
MPICommsContext(device)
MPICommsContext(device, comm)

An MPI communications context, used for distributed runs. AbstractCPUDevice and CUDADevice device options are currently supported.
ClimaComms.AbstractGraphContext — Type

AbstractGraphContext

A context for communicating between processes in a graph.
ClimaComms.context — Function

ClimaComms.context(device=device())

Construct a default communication context.
By default, it will try to determine if it is running inside an MPI environment by checking whether MPI-related environment variables are set; if so, it will return an MPICommsContext; otherwise it will return a SingletonCommsContext.
Behavior can be overridden by setting the CLIMACOMMS_CONTEXT environment variable to either MPI or SINGLETON.
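For example (a minimal sketch; setting the environment variable is only needed to override the automatic detection):

ENV["CLIMACOMMS_CONTEXT"] = "SINGLETON"  # or "MPI"

import ClimaComms
ctx = ClimaComms.context()               # a SingletonCommsContext here
pid, nprocs = ClimaComms.init(ctx)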
ClimaComms.graph_context — Function

graph_context(context::AbstractCommsContext,
              sendarray, sendlengths, sendpids,
              recvarray, recvlengths, recvpids)

Construct a communication context for exchanging neighbor data via a graph.
Arguments:
- context: the communication context on which to construct the graph context.
- sendarray: array containing data to send
- sendlengths: list of lengths of data to send to each process ID
- sendpids: list of processor IDs to send to
- recvarray: array to receive data into
- recvlengths: list of lengths of data to receive from each process ID
- recvpids: list of processor IDs to receive from
This should return an AbstractGraphContext object.
Adapt.adapt_structure — Method

Adapt.adapt_structure(::Type{<:AbstractArray}, context::AbstractCommsContext)

Adapt a given context to a context with a device associated with the given array type.
Example
Adapt.adapt_structure(Array, ClimaComms.context(ClimaComms.CUDADevice())) -> ClimaComms.CPUSingleThreaded()

By default, adapting to Array creates a CPUSingleThreaded device, and there is currently no way to convert to a CPUMultiThreaded device.
Logging
ClimaComms.OnlyRootLogger — Function

OnlyRootLogger()
OnlyRootLogger(ctx::AbstractCommsContext)

Return a logger that silences non-root processes.
If no context is passed, obtain the default context via context.
ClimaComms.MPILogger — Function

MPILogger(context::AbstractCommsContext)
MPILogger(iostream, context)

Add a rank prefix before log messages.
Outputs to stdout if no IOStream is given.
ClimaComms.FileLogger — Function

FileLogger(context, log_dir; log_stdout = true, min_level = Logging.Info)

Log MPI ranks to different files within the log_dir.
The minimum logging level is set using min_level. If log_stdout = true, root process logs will be sent to stdout as well.
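One way to combine these loggers with Julia's logging system (a minimal sketch; the log directory name is illustrative):

import ClimaComms
import Logging

ctx = ClimaComms.context()
ClimaComms.init(ctx)

Logging.global_logger(ClimaComms.OnlyRootLogger(ctx))
@info "This message is only printed by the root process"

# Alternatively, write per-rank logs to files under "output/logs":
# Logging.global_logger(ClimaComms.FileLogger(ctx, "output/logs"))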
Context operations
ClimaComms.init — Function

(pid, nprocs) = init(ctx::AbstractCommsContext)

Perform any necessary initialization for the specified backend. Return a tuple of the processor ID and the number of participating processors.
ClimaComms.mypid — Function

mypid(ctx::AbstractCommsContext)

Return the processor ID.
ClimaComms.iamroot — Function

iamroot(ctx::AbstractCommsContext)

Return true if the calling processor is the root processor.
ClimaComms.nprocs — Function

nprocs(ctx::AbstractCommsContext)

Return the number of participating processors.
ClimaComms.abort — Function

abort(ctx::CC, status::Int) where {CC <: AbstractCommsContext}

Terminate the caller and all participating processors with the specified status.
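Taken together, these operations support the common startup pattern below (a minimal sketch):

import ClimaComms
ctx = ClimaComms.context()
(pid, nprocs) = ClimaComms.init(ctx)

if ClimaComms.iamroot(ctx)
    @info "Running with $nprocs process(es); root pid = $(ClimaComms.mypid(ctx))"
end

# ClimaComms.abort(ctx, 1) would terminate all processes with exit status 1.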
Collective operations
ClimaComms.barrier — Function

barrier(ctx::CC) where {CC <: AbstractCommsContext}

Perform a global synchronization across all participating processors.
ClimaComms.reduce — Function

reduce(ctx::CC, val, op) where {CC <: AbstractCommsContext}

Perform a reduction across all participating processors, using op as the reduction operator and val as this rank's reduction value. Return the result to the first processor only.
ClimaComms.reduce! — Function

reduce!(ctx::CC, sendbuf, recvbuf, op)
reduce!(ctx::CC, sendrecvbuf, op)

Performs elementwise reduction using the operator op on the buffer sendbuf, storing the result in the recvbuf of the process. If only one sendrecvbuf buffer is provided, then the operation is performed in-place.
ClimaComms.allreduce — Function

allreduce(ctx::CC, sendbuf, op)

Performs elementwise reduction using the operator op on the buffer sendbuf, allocating a new array for the result. sendbuf can also be a scalar, in which case recvbuf will be a value of the same type.
ClimaComms.allreduce! — Function

allreduce!(ctx::CC, sendbuf, recvbuf, op)
allreduce!(ctx::CC, sendrecvbuf, op)

Performs elementwise reduction using the operator op on the buffer sendbuf, storing the result in the recvbuf of all processes in the group. Allreduce! is equivalent to a Reduce! operation followed by a Bcast!, but can lead to better performance. If only one sendrecvbuf buffer is provided, then the operation is performed in-place.
ClimaComms.bcast — Function

bcast(ctx::AbstractCommsContext, object)

Broadcast object from the root process to all other processes. The value of object on non-root processes is ignored.
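A minimal sketch of the collective operations (the data is illustrative; with a SingletonCommsContext these calls simply act on the local values):

import ClimaComms
ctx = ClimaComms.context()
ClimaComms.init(ctx)

local_max = maximum(rand(100))
global_max = ClimaComms.allreduce(ctx, local_max, max)  # result available on every process
total = ClimaComms.reduce(ctx, local_max, +)            # result available on the root process only

params = ClimaComms.bcast(ctx, (; dt = 0.1))            # the root's value is sent to all processes
ClimaComms.barrier(ctx)                                 # synchronize before continuing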
Graph exchange
ClimaComms.start — Function

start(ctx::AbstractGraphContext)

Initiate graph data exchange.
ClimaComms.progress — Function

progress(ctx::AbstractGraphContext)

Drive communication. Call after start to ensure that communication proceeds asynchronously.
ClimaComms.finish — Function

finish(ctx::AbstractGraphContext)

Complete the communications step begun by start(). After this returns, data received from all neighbors will be available in the stage areas of each neighbor's receive buffer.
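These three functions are typically used around a graph context built with graph_context, so that communication can overlap with local work. A minimal sketch; the send/receive buffers, lengths, and pids are placeholders that must match the actual neighbor layout of the application:

import ClimaComms
ctx = ClimaComms.context()
ClimaComms.init(ctx)

# Placeholder buffers and neighbor lists; in practice these come from the domain decomposition.
gctx = ClimaComms.graph_context(ctx, sendarray, sendlengths, sendpids,
                                recvarray, recvlengths, recvpids)

ClimaComms.start(gctx)     # initiate the exchange
# ... do local work that does not depend on neighbor data ...
ClimaComms.progress(gctx)  # drive communication forward
ClimaComms.finish(gctx)    # wait until all neighbor data has been received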