返回全部 Skills

zarr-python

数据处理

分块N维数组用于云存储(Zarr-Python 3)。压缩数组、并行I/O、通过fsspec支持S3/GCS,与NumPy/Dask/Xarray兼容,适用于大规模科学计算流程。

2

下载量

AI SkillHub 能力展示图

安装方式

命令行安装

在项目根目录执行以下命令,完成 Skill 安装。

npx bzskills add k-dense-ai/scientific-agent-skills --skill zarr-python

skill.md

name: zarr-python
description: 分块N维数组用于云存储(Zarr-Python 3)。压缩数组、并行I/O、通过fsspec支持S3/GCS,与NumPy/Dask/Xarray兼容,适用于大规模科学计算流程。
allowed-tools: Read Write Edit Bash
license: MIT license
compatibility: Requires Python 3.12+ and zarr 3.x. Cloud I/O needs zarr[remote] plus pinned s3fs or gcsfs. Legacy Zarr v2 workflows need exact 2.x pins on older Python.
metadata: {"version": "1.1", "skill-author": "K-Dense Inc."}

Zarr Python

概述

Zarr 是一个 Python 库,用于存储大型 N 维数组,支持分块和压缩。应用本技能可实现高效的并行 I/O、云原生工作流,以及与 NumPy、Dask 和 Xarray 的无缝集成。

当前上游版本: zarr 3.2.1(发布于 2026-05-05)。文档:zarr.readthedocs.io。新数组默认使用 Zarr 格式 3;若需向后兼容,请设置 zarr_format=2。Zarr 3.2 增加了矩形分块,并继续完善 v3 编解码器管线。本技能是 社区指南,由 K-Dense Inc. 维护,并非官方 zarr-developers 软件包。

快速入门

安装

uv pip install "zarr==3.2.1"

当前稳定的 Zarr-Python 需要 Python 3.12+ 和 NumPy 2.0+。对于远程存储(S3、GCS、HTTP),请在项目锁定文件中指定可选额外/后端:

uv pip install "zarr[remote]==3.2.1" "s3fs==2026.4.0" "gcsfs==2026.5.0"

仅当您的项目拥有已提交的锁定文件和兼容性测试时,才使用版本范围如 zarr>=3,<4。对于 Zarr-Python 2 / Python 3.10–3.11 工作流,请从 support-v2 发行说明中选择确切的 zarr==2.x.y 补丁版本,并提交生成的锁定文件。

基础数组创建

import zarr
import numpy as np

# 创建一个带有分块和压缩的 2D 数组
z = zarr.create_array(
    store="data/my_array.zarr",
    shape=(10000, 10000),
    chunks=(1000, 1000),
    dtype="f4"
)

# 使用 NumPy 风格索引写入数据
z[:, :] = np.random.random((10000, 10000))

# 读取数据
data = z[0:100, 0:100]  # 返回 NumPy 数组

核心操作

创建数组

Zarr 提供了多个便捷函数用于创建数组:

# 创建空数组
z = zarr.zeros(shape=(10000, 10000), chunks=(1000, 1000), dtype='f4',
               store='data.zarr')

# 创建填充数组
z = zarr.ones((5000, 5000), chunks=(500, 500))
z = zarr.full((1000, 1000), fill_value=42, chunks=(100, 100))

# 从现有数据创建
data = np.arange(10000).reshape(100, 100)
z = zarr.array(data, chunks=(10, 10), store='data.zarr')

# 仿照另一个数组创建
z2 = zarr.zeros_like(z)  # 匹配 z 的形状、分块和 dtype

打开现有数组

# 打开数组(默认为读写模式)
z = zarr.open_array('data.zarr', mode='r+')

# 只读模式
z = zarr.open_array('data.zarr', mode='r')

# open() 函数会自动检测数组或组
z = zarr.open('data.zarr')  # 返回 Array 或 Group

读写数据

Zarr 数组支持类似 NumPy 的索引:

# 写入整个数组
z[:] = 42

# 写入切片
z[0, :] = np.arange(100)
z[10:20, 50:60] = np.random.random((10, 10))

# 读取数据(返回 NumPy 数组)
data = z[0:100, 0:100]
row = z[5, :]

# 高级索引
z.vindex[[0, 5, 10], [2, 8, 15]]  # 坐标索引
z.oindex[0:10, [5, 10, 15]]       # 正交索引
z.blocks[0, 0]                     # 块/分块索引

调整大小和追加

# 调整数组大小(v3:将形状作为元组传递)
z.resize((15000, 15000))

# 沿某个轴追加数据
z.append(np.random.random((1000, 10000)), axis=0)  # 添加行

分块策略

分块对性能至关重要。根据访问模式选择分块大小和形状。

分块大小指南

  • 最小分块大小:建议 1 MB 以获得最佳性能
  • 平衡:较大的分块 = 较少的元数据操作;较小的分块 = 更好的并行访问
  • 内存考虑:压缩时整个分块必须能放入内存
# 配置分块大小(目标约为每个分块 1MB)
# 对于 float32 数据:1MB = 262,144 个元素 = 512×512 数组
z = zarr.zeros(
    shape=(10000, 10000),
    chunks=(512, 512),  # ~1MB 分块
    dtype='f4'
)

使分块与访问模式对齐

关键:分块形状会显著影响性能,取决于数据的访问方式。

# 如果频繁按行访问(第一个维度)
z = zarr.zeros((10000, 10000), chunks=(10, 10000))  # 分块跨越列

# 如果频繁按列访问(第二个维度)
z = zarr.zeros((10000, 10000), chunks=(10000, 10))  # 分块跨越行

# 混合访问模式(平衡方法)
z = zarr.zeros((10000, 10000), chunks=(1000, 1000))  # 方形分块

性能示例:对于 (200, 200, 200) 的数组,沿第一个维度读取:

  • 使用分块 (1, 200, 200):约 107ms
  • 使用分块 (200, 200, 1):约 1.65ms(快 65 倍!)

矩形分块和分片

Zarr 3.2 支持不均匀网格的 矩形分块。当某个维度具有可变瓦片大小时,传递嵌套的分块长度:

z = zarr.create_array(
    store="rectilinear.zarr",
    shape=(60, 100),
    chunks=([10, 20, 30], [50, 50]),
    dtype="f4",
)

当数组包含数百万个小分块时,使用 分片 将分块分组为更大的存储对象:

# 创建带有分片的数组
z = zarr.create_array(
    store='data.zarr',
    shape=(100000, 100000),
    chunks=(100, 100),  # 用于访问的小分块
    shards=(1000, 1000),  # 每个分片包含 100 个分块
    dtype='f4'
)

好处

  • 减少因数百万个小文件带来的文件系统开销
  • 提升云存储性能(更少的对象请求)
  • 避免文件系统块大小浪费

重要:写入前整个分片必须能放入内存。

压缩

Zarr 对每个分块应用压缩,以在保持快速访问的同时减少存储空间。

配置压缩

from zarr.codecs import BloscCodec, BloscShuffle, GzipCodec

# 默认:Blosc with Zstandard
z = zarr.zeros((1000, 1000), chunks=(100, 100))  # 使用默认压缩

# 配置 Blosc 压缩
z = zarr.create_array(
    store='data.zarr',
    shape=(1000, 1000),
    chunks=(100, 100),
    dtype='f4',
    compressors=BloscCodec(cname='zstd', clevel=5, shuffle=BloscShuffle.bitshuffle)
)

# 可用的 Blosc 压缩器:'blosclz', 'lz4', 'lz4hc', 'snappy', 'zlib', 'zstd'

# 使用 Gzip 压缩
z = zarr.create_array(
    store='data.zarr',
    shape=(1000, 1000),
    chunks=(100, 100),
    dtype='f4',
    compressors=GzipCodec(level=6)
)

# 禁用压缩
z = zarr.create_array(
    store='data.zarr',
    shape=(1000, 1000),
    chunks=(100, 100),
    dtype='f4',
    compressors=None
)

压缩性能提示

  • Blosc(默认):快速压缩/解压,适合交互式工作负载
  • Zstandard:更好的压缩比,比 LZ4 稍慢
  • Gzip:最大压缩,性能较慢
  • LZ4:最快的压缩,较低压缩比
  • Shuffle:对数值数据启用 shuffle 过滤器以获得更好的压缩
# 数值科学数据的最佳选择
compressors=BloscCodec(cname='zstd', clevel=5, shuffle=BloscShuffle.bitshuffle)

# 速度最佳选择
compressors=BloscCodec(cname='lz4', clevel=1)

# 压缩比最佳选择
compressors=GzipCodec(level=9)

存储后端

Zarr 通过灵活的存储接口支持多种存储后端。

本地文件系统(默认)

from zarr.storage import LocalStore

# 显式创建存储
store = LocalStore('data/my_array.zarr')
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))

# 或使用字符串路径(自动创建 LocalStore)
z = zarr.open_array('data/my_array.zarr', mode='w', shape=(1000, 1000),
                    chunks=(100, 100))

内存存储

from zarr.storage import MemoryStore

# 创建内存存储
store = MemoryStore()
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))

# 数据仅在内存中存在,不会持久化

ZIP 文件存储

from zarr.storage import ZipStore

# 写入 ZIP 文件
store = ZipStore('data.zip', mode='w')
z = zarr.open_array(store=store, mode='w', shape=(1000, 1000), chunks=(100, 100))
z[:] = np.random.random((1000, 1000))
store.close()  # 重要:必须关闭 ZipStore

# 从 ZIP 文件读取
store = ZipStore('data.zip', mode='r')
z = zarr.open_array(store=store)
data = z[:]
store.close()

云存储(S3、GCS)

Zarr 3 通过 URI 字符串或 FsspecStore(优于旧的 S3Map/GCSMap)使用 fsspec 后端。

import zarr

# S3 — 推荐使用 IAM 角色/配置文件;fsspec 处理提供商凭证发现。
# 切勿在提示或笔记本中打印、记录或复制凭证值。
z = zarr.create_array(
    store="s3://my-bucket/path/to/array.zarr",
    shape=(1000, 1000),
    chunks=(100, 100),
    dtype="f4",
    storage_options={"anon": False},
)
z[:] = data

# GCS — 推荐使用工作负载身份或 gcloud 应用默认凭证。
z = zarr.open_array(
    "gs://my-bucket/path/to/array.zarr",
    mode="r",
    storage_options={"project": "my-project"},
)

# 显式存储(任意 fsspec 文件系统)
from zarr.storage import FsspecStore
store = FsspecStore.from_url("s3://my-bucket/data.zarr", storage_options={"anon": False})
root = zarr.open_group(store=store, mode="r+")

云后端通过提供商 SDK/fsspec 后端读取凭证。不要检查宽泛的 .env 文件;如果用户明确需要帮助调试认证,请询问已脱敏的配置,并仅读取他们允许的命名提供商变量。将所有 import zarrimport daskimport h5pyimport xarray 示例视为第三方包导入,而非捆绑的脚本文件。

云存储最佳实践

  • 使用合并元数据以减少延迟:zarr.consolidate_metadata(store)
  • 使分块大小与云对象大小对齐(通常 5-100 MB 最佳)
  • 使用 Dask 启用并行写入以处理大规模数据
  • 考虑分片以减少对象数量

组与层级结构

组以层次结构组织多个数组,类似于目录或 HDF5 组。

创建和使用组

# 创建根组
root = zarr.group(store='data/hierarchy.zarr')

# 创建子组
temperature = root.create_group('temperature')
precipitation = root.create_group('precipitation')

# 在组内创建数组
temp_array = temperature.create_array(
    name='t2m',
    shape=(365, 720, 1440),
    chunks=(1, 720, 1440),
    dtype='f4'
)

precip_array = precipitation.create_array(
    name='prcp',
    shape=(365, 720, 1440),
    chunks=(1, 720, 1440),
    dtype='f4'
)

# 通过路径访问
array = root['temperature/t2m']

# 可视化层级结构
print(root.tree())
# 输出:
# /
#  ├── temperature
#  │   └── t2m (365, 720, 1440) f4
#  └── precipitation
#      └── prcp (365, 720, 1440) f4

组 API(v3)

使用 create_array / require_array(v3 中已移除 h5py 风格的 create_dataset / require_dataset):

root = zarr.group('data.zarr')
arr = root.create_array('my_data', shape=(1000, 1000), chunks=(100, 100), dtype='f4')

grp = root.require_group('subgroup')
arr2 = grp.require_array('array', shape=(500, 500), chunks=(50, 50), dtype='i4')

属性与元数据

使用属性向数组和组附加自定义元数据:

# 为数组添加属性
z = zarr.zeros((1000, 1000), chunks=(100, 100))
z.attrs['description'] = 'Temperature data in Kelvin'
z.attrs['units'] = 'K'
z.attrs['created'] = '2024-01-15'
z.attrs['processing_version'] = 2.1

# 属性以 JSON 格式存储
print(z.attrs['units'])  # 输出: K

# 为组添加属性
root = zarr.group('data.zarr')
root.attrs['project'] = 'Climate Analysis'
root.attrs['institution'] = 'Research Institute'

# 属性随数组/组持久化
z2 = zarr.open('data.zarr')
print(z2.attrs['description'])

重要:属性必须是 JSON 可序列化的(字符串、数字、列表、字典、布尔值、null)。

与 NumPy、Dask 和 Xarray 集成

NumPy 集成

Zarr 数组实现了 NumPy 数组接口:

import numpy as np
import zarr

z = zarr.zeros((1000, 1000), chunks=(100, 100))

# 直接使用 NumPy 函数
result = np.sum(z, axis=0)  # NumPy 对 Zarr 数组进行操作
mean = np.mean(z[:100, :100])

# 转换为 NumPy 数组
numpy_array = z[:]  # 将整个数组加载到内存中

Dask 集成

Dask 提供了对 Zarr 数组的惰性、并行计算:

import dask.array as da
import zarr

# 创建大型 Zarr 数组
z = zarr.open('data.zarr', mode='w', shape=(100000, 100000),
              chunks=(1000, 1000), dtype='f4')

# 加载为 Dask 数组(惰性,不加载数据)
dask_array = da.from_zarr('data.zarr')

# 执行计算(并行,内存外)
result = dask_array.mean(axis=0).compute()  # 并行计算

# 将 Dask 数组写入 Zarr
large_array = da.random.random((100000, 100000), chunks=(1000, 1000))
da.to_zarr(large_array, 'output.zarr')

好处

  • 处理大于内存的数据集
  • 跨分块自动并行计算
  • 利用分块存储实现高效 I/O

Xarray 集成

Xarray 提供了带有标签的多维数组以及 Zarr 后端:

import xarray as xr
import zarr

# 将 Zarr 存储打开为 Xarray 数据集(惰性加载)
ds = xr.open_zarr('data.zarr')

# 数据集包含坐标和元数据
print(ds)

# 访问变量
temperature = ds['temperature']

# 执行带标签的操作
subset = ds.sel(time='2024-01', lat=slice(30, 60))

# 将 Xarray 数据集写入 Zarr
ds.to_zarr('output.zarr')

# 从头创建带坐标的数据集
ds = xr.Dataset(
    {
        'temperature': (['time', 'lat', 'lon'], data),
        'precipitation': (['time', 'lat', 'lon'], data2)
    },
    coords={
        'time': pd.date_range('2024-01-01', periods=365),
        'lat': np.arange(-90, 91, 1),
        'lon': np.arange(-180, 180, 1)
    }
)
ds.to_zarr('climate_data.zarr')

好处

  • 命名维度和坐标
  • 基于标签的索引和选择
  • 与 pandas 集成处理时间序列
  • 类似于 NetCDF 的接口,气候/地理空间科学家熟悉

并行计算与线程安全

Zarr 内部使用异步 I/O。针对远程存储或 Dask 密集型工作负载调整并发度:

import zarr

# 较高的值可提升远程吞吐量;较低的值可减少压力
# 当 Dask 已经提供许多工作线程时。
zarr.config.set({
    "async.concurrency": 8,
    "threading.max_workers": 8,
})

旧的 synchronizer 参数(ThreadSynchronizerProcessSynchronizer)在 Zarr-Python 3 中不可用。请使用以下模式:

  • 读取: 始终跨线程/进程安全。
  • 写入: 当每个工作线程写入 非重叠分块 时安全;大多数存储支持原子分块写入。
  • 重叠写入: 在同步器恢复之前,通过外部协调(文件锁、工作流设计)。

对于 Dask 密集型工作负载,估计总并发 I/O 约为 dask_threads × async.concurrency,如果存储或内存饱和,则降低 Zarr 的并发设置。

合并元数据

对于包含许多数组的层级存储,将元数据合并到单个文件中以减少 I/O 操作:

import zarr

# 在创建数组/组之后
root = zarr.group('data.zarr')
# ... 创建多个数组/组 ...

# 合并元数据
zarr.consolidate_metadata('data.zarr')

# 使用合并元数据打开(更快,尤其对云存储)
root = zarr.open_consolidated('data.zarr')

好处

  • 将元数据读取操作从 N 次(每个数组一次)减少到 1 次
  • 对云存储至关重要(减少延迟)
  • 加快 tree() 操作和组遍历

注意事项

  • 如果数组更新后未重新合并,元数据可能过时
  • 不适合频繁更新的数据集
  • 多写入者场景可能导致读取不一致

性能优化

最佳性能清单

  1. 分块大小:每个分块目标 1-10 MB
   # 对于 float32:1MB = 262,144 个元素
   chunks = (512, 512)  # 512×512×4 字节 = ~1MB
  1. 分块形状:与访问模式对齐
   # 按行访问 → 分块跨越列:(小, 大)
   # 按列访问 → 分块跨越行:(大, 小)
   # 随机访问 → 平衡:(中, 中)
  1. 压缩:根据工作负载选择
   # 交互式/快速:BloscCodec(cname='lz4')
   # 平衡:BloscCodec(cname='zstd', clevel=5)
   # 最大压缩:GzipCodec(level=9)
  1. 存储后端:与环境匹配
   # 本地:LocalStore(默认)
   # 云:fsspec URI 或 FsspecStore + 合并元数据
   # 临时:MemoryStore
  1. 分片:用于大规模数据集
   # 当有数百万个小分块时
   shards=(10*chunk_size, 10*chunk_size)
  1. 并行 I/O:对大操作使用 Dask
   import dask.array as da
   dask_array = da.from_zarr('data.zarr')
   result = dask_array.compute(scheduler='threads', num_workers=8)

性能分析与调试

# 打印详细的数组信息
print(z.info)

# 输出包括:
# - 类型、形状、分块、dtype
# - 序列化器和压缩器
# - 存储大小(压缩 vs 未压缩)
# - 存储位置

# 检查存储大小
print(f"压缩后大小: {z.nbytes_stored / 1e6:.2f} MB")
print(f"未压缩大小: {z.nbytes / 1e6:.2f} MB")
print(f"压缩比: {z.nbytes / z.nbytes_stored:.2f}x")

常见模式与最佳实践

模式:时间序列数据

# 将时间序列存储为第一个维度
# 这样可以高效地追加新的时间步
z = zarr.open('timeseries.zarr', mode='a',
              shape=(0, 720, 1440),  # 以 0 个时间步开始
              chunks=(1, 720, 1440),  # 每个分块一个时间步
              dtype='f4')

# 追加新的时间步
new_data = np.random.random((1, 720, 1440))
z.append(new_data, axis=0)

模式:大型矩阵操作

import dask.array as da

# 在 Zarr 中创建大型矩阵
z = zarr.open('matrix.zarr', mode='w',
              shape=(100000, 100000),
              chunks=(1000, 1000),
              dtype='f8')

# 使用 Dask 进行并行计算
dask_z = da.from_zarr('matrix.zarr')
result = (dask_z @ dask_z.T).compute()  # 并行矩阵乘法

模式:云原生工作流

import zarr

path = "s3://my-bucket/data.zarr"
z = zarr.create_array(
    store=path,
    shape=(10000, 10000),
    chunks=(500, 500),
    dtype="f4",
    storage_options={"anon": False},
)
z[:] = data

zarr.consolidate_metadata(path)
z_read = zarr.open_consolidated(path, storage_options={"anon": False})
subset = z_read[0:100, 0:100]

模式:格式转换

# HDF5 到 Zarr
import h5py
import zarr

with h5py.File('data.h5', 'r') as h5:
    dataset = h5['dataset_name']
    z = zarr.array(dataset[:],
                   chunks=(1000, 1000),
                   store='data.zarr')

# NumPy 到 Zarr
import numpy as np
data = np.load('data.npy')
z = zarr.array(data, chunks='auto', store='data.zarr')

# Zarr 到 NetCDF(通过 Xarray)
import xarray as xr
ds = xr.open_zarr('data.zarr')
ds.to_netcdf('data.nc')

常见问题与解决方案

问题:性能慢

诊断:检查分块大小和对齐

print(z.chunks)  # 分块大小是否合适?
print(z.info)    # 检查压缩比

解决方案

  • 将分块大小增加到 1-10 MB
  • 使分块与访问模式对齐
  • 尝试不同的压缩编解码器
  • 使用 Dask 进行并行操作

问题:高内存使用

原因:将整个数组或大分块加载到内存中

解决方案

# 不要加载整个数组
# 坏:data = z[:]
# 好:逐块处理
for i in range(0, z.shape[0], 1000):
    chunk = z[i:i+1000, :]
    process(chunk)

# 或使用 Dask 进行自动分块
import dask.array as da
dask_z = da.from_zarr('data.zarr')
result = dask_z.mean().compute()  # 逐块处理

问题:云存储延迟

解决方案

# 1. 合并元数据
zarr.consolidate_metadata(store)
z = zarr.open_consolidated(store)

# 2. 使用合适的分块大小(云存储为 5-100 MB)
chunks = (2000, 2000)  # 云存储使用更大的分块

# 3. 启用分片
shards = (10000, 10000)  # 将许多分块分组

问题:并发写入冲突

解决方案:设计工作流,使每个进程/线程写入独立的分块。Zarr-Python 3 尚不支持 ThreadSynchronizer / ProcessSynchronizer;请参阅 references/v3_migration.md

附加资源

捆绑的参考资料

文件内容
references/api_reference.md函数签名、存储、编解码器、索引
references/v3_migration.mdZarr-Python 2→3 的 breaking changes 及 WIP 功能

官方上游

  • 文档:https://zarr.readthedocs.io/en/stable/
  • 3.0 迁移指南:https://zarr.readthedocs.io/en/stable/user-guide/v3_migration/
  • 存储后端:https://zarr.readthedocs.io/en/stable/user-guide/storage/
  • Zarr 规范:https://zarr-specs.readthedocs.io/
  • GitHub:https://github.com/zarr-developers/zarr-python
  • 开发者聊天:https://ossci.zulipchat.com/#narrow/channel/423692-Zarr-Python

相关库: XarrayDaskNumCodecs