import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
import h5py
from typing import Dict, Optional
import logging
logger = logging.getLogger(__name__)
from pegasusio import modalities, UnimodalData, CITESeqData, MultimodalData
def load_10x_h5_file_v2(h5_in: h5py.Group) -> MultimodalData:
"""Load 10x v2 format matrix from hdf5 file
Parameters
----------
h5_in : h5py.Group
An instance of h5py.Group class that is connected to a 10x v2 formatted hdf5 file.
Returns
-------
A MultimodalData object containing (genome, UnimodalData) pair per genome.
Examples
--------
>>> io.load_10x_h5_file_v2(h5_in)
"""
data = MultimodalData()
for genome in h5_in.keys():
group = h5_in[genome]
M, N = group["shape"][...]
mat = csr_matrix(
(
group["data"][...],
group["indices"][...],
group["indptr"][...],
),
shape=(N, M),
)
barcodes = group["barcodes"][...].astype(str)
ids = group["genes"][...].astype(str)
names = group["gene_names"][...].astype(str)
unidata = UnimodalData(
{"barcodekey": barcodes},
{"featurekey": names, "featureid": ids},
{"counts": mat},
{"modality": "rna", "genome": genome},
cur_matrix = "counts",
)
unidata.separate_channels()
data.add_data(unidata)
return data
def load_10x_h5_file_v3(
h5_in: h5py.Group,
genome: Optional[str] = None,
modality: Optional[str] = None,
) -> MultimodalData:
"""Load 10x v3 format matrix from hdf5 file, allowing detection of crispr and citeseq libraries
Parameters
----------
h5_in : h5py.Group
An instance of h5py.Group class that is connected to a 10x v3 formatted hdf5 file.
genome: ``str``, optional, default: ``None``
The genome reference name. Ignore if ``None`` or the h5 file contains multiple matrices.
modality: ``str``, optional, default: ``None``
The modality name. Ignore if ``None`` or the h5 file contains multiple matrices.
Returns
-------
A MultimodalData object containing (genome, UnimodalData) pair per genome.
Examples
--------
>>> io.load_10x_h5_file_v3(h5_in)
"""
M, N = h5_in["matrix/shape"][...]
bigmat = csr_matrix(
(
h5_in["matrix/data"][...],
h5_in["matrix/indices"][...],
h5_in["matrix/indptr"][...],
),
shape=(N, M),
)
barcodes = h5_in["matrix/barcodes"][...].astype(str)
df = pd.DataFrame(
data={
"genome": h5_in["matrix/features/genome"][...].astype(str),
"feature_type": h5_in["matrix/features/feature_type"][...].astype(str),
"id": h5_in["matrix/features/id"][...].astype(str),
"name": h5_in["matrix/features/name"][...].astype(str),
}
)
genomes = list(df["genome"].unique())
if "" in genomes:
genomes.remove("")
default_genome = genomes[0] if len(genomes) == 1 else None
data = MultimodalData()
gb = df.groupby(by=["genome", "feature_type"])
for name, group in gb:
barcode_metadata = {"barcodekey": barcodes}
feature_metadata = {
"featurekey": group["name"].values,
"featureid": group["id"].values,
}
mat = bigmat[:, gb.groups[name]]
cur_genome = name[0]
if gb.ngroups == 1 and genome is not None:
cur_genome = genome
elif name[0] == "" and default_genome is not None:
cur_genome = default_genome
cur_modality = "custom"
if (gb.ngroups == 1) and (modality is not None):
cur_modality = modality
elif name[1] == "Gene Expression":
cur_modality = "rna"
elif name[1] == "CRISPR Guide Capture":
cur_modality = "crispr"
elif name[1] == "Multiplexing Capture":
cur_modality = "hashing"
elif name[1] == "Antibody Capture":
cur_modality = "citeseq"
Class = CITESeqData if cur_modality == "citeseq" else UnimodalData
unidata = Class(
barcode_metadata,
feature_metadata,
{"counts": mat},
{"genome": cur_genome, "modality": cur_modality},
cur_matrix = "counts",
)
unidata.separate_channels()
data.add_data(unidata)
return data
def load_10x_h5_file(
input_h5: str,
genome: Optional[str] = None,
modality: Optional[str] = None,
) -> MultimodalData:
"""Load 10x format matrix (either v2 or v3) from hdf5 file
Parameters
----------
input_h5 : ``str``
The matrix in 10x v2 or v3 hdf5 format.
genome: ``str``, optional, default: ``None``
The genome reference name. Ignore for 10x v2 hdf5 format, and 10x v3 hdf5 file containing multiple matrices.
modality: ``str``, optional, default: ``None``
The modality name. Ignore for 10x v2 hdf5 format, and 10x v3 hdf5 file containing multiple matrices.
Returns
-------
A MultimodalData object containing (genome, UnimodalData) pair per genome.
Examples
--------
>>> io.load_10x_h5_file('example_10x.h5')
"""
data = None
with h5py.File(input_h5, "r") as h5_in:
data = load_10x_h5_file_v3(h5_in, genome, modality) if "matrix" in h5_in.keys() else load_10x_h5_file_v2(h5_in)
return data
[docs]
def read_molecule_info(molecule_info_h5: str) -> pd.DataFrame:
"""Load molecule info from hdf5 file as a pandas DataFrame. Only support cumulus_feature_barcoding format as the following structure:
- ``/barcode_idx``: Integer array of length ``n_mol`` (number of molecules). Each entry is the index of the molecule’s cell barcode, which can be found in ``/barcodes``;
- ``/barcodes``: String array of length ``n_cell`` (number of cell barcodes). Each entry is a cell barcode;
- ``/feature_idx``: Integer array of length ``n_mol``. Each entry is the index of the molecule’s feature name, which can be found in ``/features``;
- ``/features``: String array of length ``n_feature`` (number of features). Each entry is a feature name;
- ``/umi``: String array of length ``n_mol``. Each entry is the molecule’s UMI barcode;
- ``/count``: Integer array of length n_mol. Each entry is the molecule’s count of reads.
Parameters
----------
molecule_info_h5: ``str``
The hdf5 format file containing UMI information.
Returns
--------
A data frame with each row representing a UMI, and columns as:
- ``Barcode`` for cell barcodes
- ``Feature`` for feature names
- ``UMI`` for UMI barcodes
- ``Count`` for count of reads
Examples
--------
>>> io.read_molecule_info("molecule_info.h5")
"""
with h5py.File(molecule_info_h5, "r") as h5_in:
assert "features" in h5_in.keys() and isinstance(h5_in["features"], h5py.Dataset), "H5 format not supported! Only support Cumulus Feature Barcoding's hdf5 format."
df_mole = pd.DataFrame({
"Barcode": h5_in["barcodes"][:][h5_in["barcode_idx"][:]].astype(str),
"Feature": h5_in["features"][:][h5_in["feature_idx"][:]].astype(str),
"UMI": h5_in["umi"][:].astype(str),
"Count": h5_in["count"][:],
})
return df_mole
def load_loom_file(
input_loom: str, genome: str = None, modality: str = None
) -> MultimodalData:
"""Load count matrix from a LOOM file.
Parameters
----------
input_loom : `str`
The LOOM file, containing the count matrix.
genome : `str`, optional (default None)
The genome reference. If None, use "unknown" instead. If not None and input loom contains genome attribute, the attribute will be overwritten.
modality: `str`, optional (default None)
Modality. If None, use "rna" instead. If not None and input loom contains modality attribute, the attribute will be overwritten.
Returns
-------
A MultimodalData object containing a (genome, UmimodalData) pair.
Examples
--------
>>> io.load_loom_file('example.loom', genome = 'GRCh38')
"""
col_trans = {
"CellID": "barcodekey",
"obs_names": "barcodekey",
"cell_names": "barcodekey",
}
row_trans = {
"Gene": "featurekey",
"var_names": "featurekey",
"gene_names": "featurekey",
"Accession": "featureid",
"gene_ids": "featureid",
"ensembl_ids": "featureid",
}
import loompy
with loompy.connect(input_loom) as ds:
barcode_metadata = {}
barcode_multiarrays = {}
for key, arr in ds.col_attrs.items():
key = col_trans.get(key, key)
if arr.ndim == 1:
barcode_metadata[key] = arr
elif arr.ndim > 1:
barcode_multiarrays[key] = arr
else:
raise ValueError(
f"Detected column attribute '{key}' has ndim = {arr.ndim}!"
)
feature_metadata = {}
feature_multiarrays = {}
for key, arr in ds.row_attrs.items():
key = row_trans.get(key, key)
if arr.ndim == 1:
feature_metadata[key] = arr
elif arr.ndim > 1:
feature_multiarrays[key] = arr
else:
raise ValueError(
f"Detected row attribute '{key}' has ndim = {arr.ndim}!"
)
barcode_multigraphs = {}
for key, graph in ds.col_graphs.items():
barcode_multigraphs[key] = csr_matrix(graph)
feature_multigraphs = {}
for key, graph in ds.row_graphs.items():
feature_multigraphs[key] = csr_matrix(graph)
matrices = {}
for key, mat in ds.layers.items():
key = "X" if key == "" else key
matrices[key] = mat.sparse().T.tocsr()
metadata = dict(ds.attrs)
if genome is not None:
metadata["genome"] = genome
elif "genome" not in metadata:
metadata["genome"] = "unknown"
if modality is not None:
metadata["modality"] = modality
elif "modality" not in metadata:
if metadata.get("experiment_type", "none") in modalities:
metadata["modality"] = metadata.pop("experiment_type")
else:
metadata["modality"] = "rna"
unidata = UnimodalData(
barcode_metadata,
feature_metadata,
matrices,
metadata,
barcode_multiarrays,
feature_multiarrays,
barcode_multigraphs,
feature_multigraphs,
cur_matrix = "X",
)
unidata.separate_channels()
data = MultimodalData(unidata)
return data
def write_loom_file(data: MultimodalData, output_file: str) -> None:
"""Write a MultimodalData to loom file. Will assert data only contain one type of experiment. Use current matrix as the main matrix."""
keys = data.list_data()
if len(keys) > 1:
raise ValueError(f"Data contain multiple modalities: {','.join(keys)}!")
data.select_data(keys[0])
matrices = data.list_keys()
main_mat_key = data.current_matrix()
if len(matrices) == 0:
raise ValueError("Could not write empty matrix to a loom file!")
def _replace_slash(name: str) -> str:
"""Replace slash with |"""
if name.find("/") >= 0:
return name.replace("/", "|")
return name
def _process_attrs(
key_name: str, attrs: pd.DataFrame, attrs_multi: dict
) -> Dict[str, object]:
res_dict = {key_name: attrs.index.values}
for key in attrs.columns:
res_dict[_replace_slash(key)] = np.array(attrs[key].values)
for key, value in attrs_multi.items():
if (
value.ndim > 1
): # value.ndim == 1 refers to np.recarray, which will not be written to a loom file.
res_dict[_replace_slash(key)] = (
value if value.shape[1] > 1 else value[:, 0]
)
return res_dict
row_attrs = _process_attrs("Gene", data.var, data.varm)
col_attrs = _process_attrs("CellID", data.obs, data.obsm)
accession_key = (
"featureid"
if "featureid" in row_attrs
else ("gene_ids" if "gene_ids" in row_attrs else None)
)
if accession_key is not None:
row_attrs["Accession"] = row_attrs.pop(accession_key)
layers = {}
for matkey in matrices:
layers["" if matkey == main_mat_key else matkey] = data.get_matrix(matkey).T
file_attrs = {}
for key, value in data.uns.items():
if isinstance(value, str):
file_attrs[_replace_slash(key)] = value
import loompy
loompy.create(output_file, layers, row_attrs, col_attrs, file_attrs=file_attrs)
if len(data.varp) > 0 or len(data.obsp) > 0:
with loompy.connect(output_file) as ds:
for key, value in data.varp.items():
ds.row_graphs[_replace_slash(key)] = value
for key, value in data.obsp.items():
ds.col_graphs[_replace_slash(key)] = value
logger.info(f"{output_file} is written.")
def write_10x_h5(data: MultimodalData, output_file: str) -> None:
"""Follow 10x hdf5 format description (https://support.10xgenomics.com/single-cell-gene-expression/software/pipelines/latest/advanced/h5_matrices).
Restricted to GEX modality only.
Settings consistent with 10x cellranger's (https://github.com/10XGenomics/cellranger/blob/master/lib/python/cellranger/matrix.py#L428-L445).
"""
unidata = data._unidata
nmat = len(unidata.matrices)
compression_method = "gzip"
chunk_size = 80000
assert unidata.get_modality() == "rna", "Only Gene Expression count matrix is accepted!"
def _create_h5_string_dataset(group, name, data, shape=None, fillvalue=None):
"""Inspired by 10x cellranger create_hdf5_string_dataset function (https://github.com/10XGenomics/cellranger/blob/master/lib/python/cellranger/io.py#L323-L358)"""
kwargs = {
'chunks': (chunk_size,),
'maxshape': (None,),
'compression': compression_method,
'shuffle': True,
}
if data is None:
assert fillvalue is not None, "Either data or fillvalue must be specified!"
dtype = f"S{len(fillvalue)}"
group.create_dataset(
name=name,
dtype=dtype,
shape=shape,
fillvalue=fillvalue.encode('ascii', 'xmlcharrefreplace'),
**kwargs,
)
else:
fixed_len = max(len(x) for x in data)
if fixed_len == 0:
fixed_len = 1
dtype = f"S{fixed_len}"
group.create_dataset(
name=name,
data=np.vectorize(lambda s: s.encode('ascii', 'xmlcharrefreplace'))(data),
dtype=dtype,
**kwargs,
)
def _write_h5(data, output_file):
n_obs, n_feature = data.shape
genome = data.uns["genome"] if "genome" in data.uns.keys() else "Unknown"
feature_type = "Gene Expression"
with h5py.File(output_file, "w") as f:
grp = f.create_group("matrix")
# Cell barcodes
_create_h5_string_dataset(grp, "barcodes", data.obs_names.values)
# Raw counts.
X = data.X if isinstance(data.X, csr_matrix) else csr_matrix(data.X)
grp.create_dataset(
name="data",
data=X.data,
chunks=(chunk_size,),
maxshape=(None,),
compression=compression_method,
shuffle=True,
)
grp.create_dataset(
name="indices",
data=X.indices,
chunks=(chunk_size,),
maxshape=(None,),
compression=compression_method,
shuffle=True,
)
grp.create_dataset(
name="indptr",
data=X.indptr,
chunks=(chunk_size,),
maxshape=(None,),
compression=compression_method,
shuffle=True,
)
grp.create_dataset(
name="shape",
dtype=np.int32,
data=(n_feature, n_obs),
) # feature-by-barcode
feature_grp = grp.create_group("features")
feature_grp.create_dataset(name="_all_tag_keys", data=[b"genome"])
_create_h5_string_dataset(feature_grp, "feature_type", data=None, shape=(n_feature,), fillvalue=feature_type)
_create_h5_string_dataset(feature_grp, "genome", data=None, shape=(n_feature,), fillvalue=genome)
_create_h5_string_dataset(feature_grp, "id", data=data.var["featureid"].values)
_create_h5_string_dataset(feature_grp, "name", data=data.var_names.values)
if nmat == 1:
_write_h5(unidata, output_file)
else:
import os
path = os.path.dirname(os.path.abspath(output_file))
outname = os.path.basename(output_file).rstrip(".h5")
for mat_key in unidata.list_keys():
unidata.select_matrix(mat_key)
_write_h5(unidata, f"{path}/{outname}.{mat_key}.h5")