Skip to content

API Reference

Core Classes

DataStreamer

chelombus.streamer.data_streamer.DataStreamer

Class for streaming large datasets in manageable chunks.

Source code in chelombus/streamer/data_streamer.py
class DataStreamer:
    """
    Class for streaming large datasets in manageable chunks.
    """
    def parse_input(self, input_path: str, chunksize: Optional[int] = None, verbose: int = 0, smiles_col: int = 0) -> Iterator[List[str]]:
        """
        Reads input data from a file or a directory of files and yields the data in chunks.

        This method processes each file provided by the input path, which can be either a single file
        or a directory containing multiple files. Lines in plain-text files are split and the column at
        `smiles_col` is used as the SMILES string. SDF/SD files are handled in a streaming fashion via
        RDKit and converted to SMILES on the fly. For each file, the extracted SMILES strings are placed
        in a buffer until it reaches `chunksize`, at which point the buffer is yielded and cleared. If
        `chunksize` is None or there are remaining lines after processing a file, those are yielded as a
        final chunk.

        Note:
            The buffer is flushed at the end of every file, so a yielded chunk never mixes
            SMILES from two different files (and may therefore be shorter than `chunksize`).

        Args:
            input_path (str): The path to a file or directory containing input files.
            chunksize (Optional[int]): The number of lines to include in each yielded chunk. If None, 
                the entire file content is yielded as a single chunk.
            verbose (int): Level of verbosity. Default is 0.
            smiles_col (int): Column index for text inputs in which SMILES appear among several columns.
                Ignored for SDF inputs where SMILES strings are generated from molecule blocks.

        Yields:
            List[str]: A list of lines from the input file(s), where the list length will be equal to 
            `chunksize` except possibly for the final chunk.
        """ 
        # Sanity check
        if not os.path.exists(input_path):
            raise FileNotFoundError(f"Data source {input_path} not found.")

        buffer: List[str] = []

        def _flush_buffer() -> Iterator[List[str]]:
            # Yield a copy so clearing the buffer does not mutate the chunk
            # already handed to the caller.
            if buffer:
                yield buffer[:]
                buffer.clear()

        # NOTE(review): _process_input presumably expands a directory into the
        # paths of the files it contains and passes a single file path through
        # unchanged -- confirm against its definition.
        for file_path in _process_input(input_paths=input_path):
            if verbose == 1:
                print("Processing file:", file_path)

            # Dispatch on extension: SDF/SD files need RDKit parsing, anything
            # else is treated as whitespace-delimited text.
            file_ext = os.path.splitext(file_path)[1].lower()
            if file_ext in {".sdf", ".sd"}:
                records = self._stream_sdf(file_path)
            else:
                records = self._stream_text(file_path, smiles_col)

            for smiles in records:
                buffer.append(smiles)
                if chunksize is not None and len(buffer) == int(chunksize):
                    yield buffer[:]
                    buffer.clear()

            # Process remaining items in the buffer for the current file
            yield from _flush_buffer()

    def _stream_text(self, file_path: str, smiles_col: int) -> Iterator[str]:
        """Yield column `smiles_col` (whitespace-split) of every line in a text file.

        Lines that cannot be parsed (e.g. fewer than `smiles_col + 1` columns)
        are reported to stdout and skipped rather than aborting the stream.
        """
        with open(file_path, "r") as file:
            for line in file:
                try:
                    yield line.split()[smiles_col]
                except Exception as e:
                    print(f"\nAn exception occured with line: {line}. Raised Error: {e}\n")

    def _stream_sdf(self, file_path: str) -> Iterator[str]:
        """Yield a SMILES string for each parseable molecule in an SDF/SD file.

        Molecules RDKit fails to parse (yielded as None by the supplier) are
        skipped silently; SMILES conversion failures are reported to stdout
        and skipped.

        Raises:
            ImportError: If RDKit is not installed.
        """
        try:
            from rdkit import Chem
        except ImportError as exc:
            raise ImportError(
                "RDKit is required to read SDF files. Please install rdkit to stream SDF inputs."
            ) from exc

        # ForwardSDMolSupplier streams molecules without loading the full file in memory
        with open(file_path, "rb") as file_handle:
            supplier = Chem.ForwardSDMolSupplier(file_handle)
            for mol in supplier:
                if mol is None:
                    continue
                try:
                    yield Chem.MolToSmiles(mol)
                except Exception as e:
                    print(f"\nFailed to convert molecule in {file_path} to SMILES: {e}\n")

parse_input(input_path, chunksize=None, verbose=0, smiles_col=0)

Reads input data from a file or a directory of files and yields the data in chunks.

This method processes each file provided by the input path, which can be either a single file or a directory containing multiple files. Lines in plain-text files are split and the column at smiles_col is used as the SMILES string. SDF/SD files are handled in a streaming fashion via RDKit and converted to SMILES on the fly. For each file, the extracted SMILES strings are placed in a buffer until it reaches chunksize, at which point the buffer is yielded and cleared. If chunksize is None or there are remaining lines after processing a file, those are yielded as a final chunk.

Parameters:

Name Type Description Default
input_path str

The path to a file or directory containing input files.

required
chunksize Optional[int]

The number of lines to include in each yielded chunk. If None, the entire file content is yielded as a single chunk.

None
verbose int

Level of verbosity. Default is 0.

0
smiles_col int

Column index for text inputs in which SMILES appear among several columns. Ignored for SDF inputs where SMILES strings are generated from molecule blocks.

0

Yields:

Type Description
List[str]

List[str]: A list of lines from the input file(s), where the list length will be equal to

List[str]

chunksize except possibly for the final chunk.

Source code in chelombus/streamer/data_streamer.py
def parse_input(self, input_path: str, chunksize: Optional[int] = None, verbose: int = 0, smiles_col: int = 0) -> Iterator[List[str]]:
    """
    Reads input data from a file or a directory of files and yields the data in chunks.

    This method processes each file provided by the input path, which can be either a single file
    or a directory containing multiple files. Lines in plain-text files are split and the column at
    `smiles_col` is used as the SMILES string. SDF/SD files are handled in a streaming fashion via
    RDKit and converted to SMILES on the fly. For each file, the extracted SMILES strings are placed
    in a buffer until it reaches `chunksize`, at which point the buffer is yielded and cleared. If
    `chunksize` is None or there are remaining lines after processing a file, those are yielded as a
    final chunk.

    Note:
        The buffer is flushed at the end of every file, so a yielded chunk never mixes
        SMILES from two different files (and may therefore be shorter than `chunksize`).

    Args:
        input_path (str): The path to a file or directory containing input files.
        chunksize (Optional[int]): The number of lines to include in each yielded chunk. If None, 
            the entire file content is yielded as a single chunk.
        verbose (int): Level of verbosity. Default is 0.
        smiles_col (int): Column index for text inputs in which SMILES appear among several columns.
            Ignored for SDF inputs where SMILES strings are generated from molecule blocks.

    Yields:
        List[str]: A list of lines from the input file(s), where the list length will be equal to 
        `chunksize` except possibly for the final chunk.
    """ 
    # Sanity check
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Data source {input_path} not found.")

    buffer: List[str] = []

    def _flush_buffer() -> Iterator[List[str]]:
        # Yield a copy so clearing the buffer does not mutate the chunk
        # already handed to the caller.
        if buffer:
            yield buffer[:]
            buffer.clear()

    # NOTE(review): _process_input presumably expands a directory into the
    # paths of the files it contains and passes a single file path through
    # unchanged -- confirm against its definition.
    for file_path in _process_input(input_paths=input_path):
        if verbose == 1:
            print("Processing file:", file_path)

        # Dispatch on extension: SDF/SD files need RDKit parsing, anything
        # else is treated as whitespace-delimited text.
        file_ext = os.path.splitext(file_path)[1].lower()
        if file_ext in {".sdf", ".sd"}:
            records = self._stream_sdf(file_path)
        else:
            records = self._stream_text(file_path, smiles_col)

        for smiles in records:
            buffer.append(smiles)
            if chunksize is not None and len(buffer) == int(chunksize):
                yield buffer[:]
                buffer.clear()

        # Process remaining items in the buffer for the current file
        yield from _flush_buffer()

FingerprintCalculator

chelombus.utils.fingerprints.FingerprintCalculator

A class to compute molecular fingerprints from a list of SMILES strings. Currently, only the 'morgan' and 'mqn' fingerprint types are supported.

Source code in chelombus/utils/fingerprints.py
class FingerprintCalculator:
    """
    A class to compute molecular fingerprints from a list of SMILES strings.
    Currently, only the 'morgan' and 'mqn' fingerprints type are supported.
    """
    def __init__(self):

        # Map fingerprint types to functions
        self.fingerprint_function_map = {
            #TODO: Support for other fingerprints
            'morgan': _calculate_morgan_fp,
            'mqn': _calculate_mqn_fp,
        }

    def FingerprintFromSmiles(self, smiles:List | str, fp:str, nprocesses:int | None = os.cpu_count(), **params) -> npt.NDArray:
        """
        Generate fingerprints for a list of SMILES strings in parallel.

        The method selects the appropriate fingerprint function based on the 'fp' parameter,
        binds additional keyword parameters using functools.partial, and then applies the function 
        across the SMILES list using multiprocessing.Pool.map.

        Args:
            smiles_list (list): A list of SMILES strings.
            fp (str): The fingerprint type to compute (e.g., 'morgan').
            nprocesses (int): Number of processes for multithreaded fingerprint calculation.  Default to cpu cores 
            **params: Additional keyword parameters for the fingerprint function.
                For 'morgan', required keys are:
                    - fpSize (int): Number of bits in the fingerprint.
                    - radius (int): Radius for the Morgan fingerprint.

        Returns:
            npt.NDArray: A NumPy array of fingerprints with shape (number of SMILES, fpSize).

        Raises:
            ValueError: If an unsupported fingerprint type is requested.
        """
        func = self.fingerprint_function_map.get(fp)
        if func is None:
            raise ValueError(f"Unsupported fingerprint type: '{fp}'")

        # Bind the additional parameters to the selected function.
        part_func = partial(func, **params)

        if isinstance(smiles, str):
            smiles = [smiles]
        try: 
            with Pool(processes=nprocesses) as pool:
                fingerprints = pool.map(part_func, smiles)
        finally: 
            pool.close()
            pool.join()

        fingerprints = [fp for fp in fingerprints if fp is not None]

        # Free memory
        del smiles
        return np.array(fingerprints)

FingerprintFromSmiles(smiles, fp, nprocesses=os.cpu_count(), **params)

Generate fingerprints for a list of SMILES strings in parallel.

The method selects the appropriate fingerprint function based on the 'fp' parameter, binds additional keyword parameters using functools.partial, and then applies the function across the SMILES list using multiprocessing.Pool.map.

Parameters:

Name Type Description Default
smiles_list list

A list of SMILES strings.

required
fp str

The fingerprint type to compute (e.g., 'morgan').

required
nprocesses int

Number of processes for multiprocess fingerprint calculation. Defaults to the number of CPU cores.

cpu_count()
**params

Additional keyword parameters for the fingerprint function. For 'morgan', required keys are: - fpSize (int): Number of bits in the fingerprint. - radius (int): Radius for the Morgan fingerprint.

{}

Returns:

Type Description
NDArray

npt.NDArray: A NumPy array of fingerprints with shape (number of SMILES, fpSize).

Raises:

Type Description
ValueError

If an unsupported fingerprint type is requested.

Source code in chelombus/utils/fingerprints.py
def FingerprintFromSmiles(self, smiles:List | str, fp:str, nprocesses:int | None = os.cpu_count(), **params) -> npt.NDArray:
    """
    Generate fingerprints for a list of SMILES strings in parallel.

    The method selects the appropriate fingerprint function based on the 'fp' parameter,
    binds additional keyword parameters using functools.partial, and then applies the function 
    across the SMILES list using multiprocessing.Pool.map.

    Args:
        smiles_list (list): A list of SMILES strings.
        fp (str): The fingerprint type to compute (e.g., 'morgan').
        nprocesses (int): Number of processes for multithreaded fingerprint calculation.  Default to cpu cores 
        **params: Additional keyword parameters for the fingerprint function.
            For 'morgan', required keys are:
                - fpSize (int): Number of bits in the fingerprint.
                - radius (int): Radius for the Morgan fingerprint.

    Returns:
        npt.NDArray: A NumPy array of fingerprints with shape (number of SMILES, fpSize).

    Raises:
        ValueError: If an unsupported fingerprint type is requested.
    """
    func = self.fingerprint_function_map.get(fp)
    if func is None:
        raise ValueError(f"Unsupported fingerprint type: '{fp}'")

    # Bind the additional parameters to the selected function.
    part_func = partial(func, **params)

    if isinstance(smiles, str):
        smiles = [smiles]
    try: 
        with Pool(processes=nprocesses) as pool:
            fingerprints = pool.map(part_func, smiles)
    finally: 
        pool.close()
        pool.join()

    fingerprints = [fp for fp in fingerprints if fp is not None]

    # Free memory
    del smiles
    return np.array(fingerprints)

PQEncoder

chelombus.encoder.encoder.PQEncoder

Bases: PQEncoderBase

Class to encode high-dimensional vectors into PQ-codes.

Source code in chelombus/encoder/encoder.py
class PQEncoder(PQEncoderBase):
    """
    Class to encode high-dimensional vectors into PQ-codes.
    """
    def __init__(self, k:int=256, m:int=8, iterations:int=20):
        """ Initializes the encoder with trained sub-block centroids.

        Args:
            k (int): Number of centroids. Default is 256. We assume that all subquantizers 
            have the same finite number (k') of reproduction values. So k = (k')^m 
            m (int): Number of distinct subvectors of the input X vector. 
            m is a subvector of dimension D' = D/m where D is the 
            dimension of the input vector X. D is therefore a multiple of m
            iterations (int): Number of iterations for the k-means

        High values of k increase the computational cost of the quantizer as well as memory 
        usage of storing the centroids (k' x D floating points). Using k=256 and m=8 is often
        a reasonable choice. 

        Reference: DOI: 10.1109/TPAMI.2010.57
        """

        self.m = m 
        self.k = k 
        self.iterations = iterations
        self.encoder_is_trained = False
        # Smallest unsigned dtype wide enough to hold a centroid index in [0, k).
        self.codebook_dtype =  np.uint8 if self.k <= 2**8 else (np.uint16 if self.k<= 2**16 else np.uint32)
        # Fitted sklearn KMeans models, one per subvector (populated on the CPU path only).
        self.pq_trained = []

        """ The codebook is defined as the Cartesian product of all centroids. 
        Storing the codebook C explicitly is not efficient, the full codebook would be (k')^m  centroids
        Instead we store the K' centroids of all the subquantizers (k' · m). We can later simulate the full 
        codebook by combining the centroids from each subquantizer. 
        """
    @property
    def is_trained(self) -> bool: return self.encoder_is_trained

    def fit(self, X_train:NDArray, verbose:int=1, device:str='cpu', **kwargs)->None:
        """ KMeans fitting of every subvector matrix from the X_train matrix. Populates
        the codebook by storing the cluster centers of every subvector

        X_train is the input matrix. For a vector that has dimension D
        then X_train is a matrix of size (N, D)
        where N is the number of rows (vectors) and D the number of columns (dimension of
        every vector i.e. fingerprint in the case of molecular data)

        Args:
           X_train(np.array): Input matrix to train the encoder.
           verbose(int): Level of verbosity. Default is 1
           device: 'cpu' for sklearn KMeans, 'gpu' for torch-based KMeans on CUDA,
                   'auto' to pick GPU if available. Default is 'cpu'.
           **kwargs: Optional keyword arguments passed to the underlying KMeans `fit()` function
                     (only used on the CPU path).

        Raises:
            RuntimeError: If device='gpu' is requested but CUDA is unavailable.
        """

        assert X_train.ndim == 2, "The input can only be a matrix (X.ndim == 2)"
        N, D = X_train.shape # N number of input vectors, D dimension of the vectors
        assert self.k < N, "the number of training vectors (N for N,D = X_train.shape) should be more than the number of centroids (K)"
        assert D % self.m == 0, f"Vector (fingeprint) dimension should be divisible by the number of subvectors (m). Got {D} / {self.m}"
        self.D_subvector = int(D / self.m) # Dimension of the subvector.
        self.og_D = D # We save the original dimensions of the input vector (fingerprint) for later use
        assert self.encoder_is_trained == False, "Encoder can only be fitted once"

        # One (k, D') centroid table per subvector; filled by the fit path below.
        self.codewords= np.zeros((self.m, self.k, self.D_subvector), dtype=np.float32)

        use_gpu = (device == 'gpu') or (device == 'auto' and _GPU_AVAILABLE)
        if use_gpu:
            if not _GPU_AVAILABLE:
                raise RuntimeError("GPU requested but CUDA not available")
            self._fit_gpu(X_train, verbose)
        else:
            self._fit_cpu(X_train, verbose, **kwargs)

        self.encoder_is_trained = True
        del X_train # remove initial training data from memory

    def _fit_cpu(self, X_train: NDArray, verbose: int = 1, **kwargs) -> None:
        """Fit one sklearn KMeans per subvector; stores both the fitted models
        (in `pq_trained`) and their cluster centers (in `codewords`)."""
        subvector_dim = self.D_subvector

        iterable = range(self.m)
        if verbose > 0:
            iterable = tqdm(iterable, desc='Training PQ-encoder', total=self.m)

        for subvector_idx in iterable:
            X_train_subvector = X_train[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)]
            kmeans = KMeans(n_clusters=self.k, init='k-means++', max_iter=self.iterations, **kwargs).fit(X_train_subvector)
            self.pq_trained.append(kmeans)
            self.codewords[subvector_idx] = kmeans.cluster_centers_

    @staticmethod
    def _gpu_encoder_batch_size(
        N: int,
        k: int,
        reserve_mb: int = 1024,
        min_batch: int = 250_000,
        max_batch: int = 2_000_000,
    ) -> int:
        """Choose a GPU assignment batch size from the current free VRAM.

        This is called after the resident tensors for one subvector are already
        allocated on the GPU. The main scratch tensors per batch point are:

        - one float32 distance row of length ``k``
        - one int64 label from ``argmin``
        - one float32 ``ones`` entry for ``index_add_``

        A reserve margin is kept for CUDA context, allocator fragmentation,
        and GEMM workspace. If VRAM telemetry is unavailable, the method falls
        back to a conservative fixed 1 GiB scratch budget.
        """
        if N <= 0:
            return 0

        floor = min(min_batch, N)
        ceiling = min(max_batch, N)
        # 4 bytes per distance entry (float32 * k), 8 for the int64 label, 4 for the ones entry.
        bytes_per_point = k * 4 + 8 + 4
        fallback_budget = 1 * 1024**3

        try:
            free, _ = torch.cuda.mem_get_info()
            usable = free - reserve_mb * 1024**2
            scratch_budget = usable if usable > 0 else max(free // 2, floor * bytes_per_point)
        except Exception:
            scratch_budget = fallback_budget

        batch = max(scratch_budget // bytes_per_point, floor)
        return int(min(max(batch, floor), ceiling))

    def _fit_gpu(self, X_train: NDArray, verbose: int = 1) -> None:
        """GPU-accelerated KMeans fitting with batched assignment.

        Each subvector's data (N x D_sub) is kept GPU-resident.  The
        distance matrix is computed in batches to cap VRAM at ~1 GB
        scratch regardless of N.  The GEMM form
        ``||x-c||² = ||x||² + ||c||² - 2·x·cᵀ`` avoids ``torch.cdist``
        workspace overhead and fuses well with cuBLAS.

        Note:
            Centroids are initialized from a random sample of the data (not
            k-means++ as on the CPU path), and `pq_trained` is left empty.
        """
        N = X_train.shape[0]
        subvector_dim = self.D_subvector
        X_f32 = np.ascontiguousarray(X_train, dtype=np.float32)
        rng = np.random.default_rng()

        iterable = range(self.m)
        if verbose > 0:
            iterable = tqdm(iterable, desc='Training PQ-encoder (GPU)', total=self.m)

        for subvector_idx in iterable:
            sub_slice = X_f32[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)]
            X_gpu = torch.from_numpy(sub_slice).cuda()
            # Precompute ||x||² (stays constant across iterations)
            x_sq = (X_gpu * X_gpu).sum(dim=1)  # (N,)
            B = self._gpu_encoder_batch_size(N, self.k)

            # Random init
            idx = rng.choice(N, size=self.k, replace=(N < self.k))
            centroids = X_gpu[torch.from_numpy(idx.astype(np.int64)).cuda()].clone()

            for _ in range(self.iterations):
                c_sq = (centroids * centroids).sum(dim=1)  # (k,)
                counts = torch.zeros(self.k, dtype=torch.float32, device='cuda')
                sums = torch.zeros_like(centroids)

                for start in range(0, N, B):
                    end = min(start + B, N)
                    x_batch = X_gpu[start:end]           # view, no copy
                    # ||x-c||² = ||x||² + ||c||² - 2·x·cᵀ
                    dist = (x_sq[start:end, None]
                            + c_sq[None, :]
                            - 2.0 * (x_batch @ centroids.T))
                    dist.clamp_min_(0.0)
                    labels = dist.argmin(dim=1)
                    del dist

                    ones = torch.ones(end - start, dtype=torch.float32, device='cuda')
                    counts.index_add_(0, labels, ones)
                    sums.index_add_(0, labels, x_batch)
                    del labels, ones

                # Keep an empty cluster's previous centroid instead of dividing by zero.
                empty = counts == 0
                counts.clamp_min_(1.0)
                new_centroids = sums / counts.unsqueeze(1)
                new_centroids[empty] = centroids[empty]
                centroids = new_centroids

            self.codewords[subvector_idx] = centroids.cpu().numpy()
            del X_gpu, centroids, x_sq


    def transform(self, X:NDArray, verbose:int=1, device:str='auto', **kwargs) -> NDArray:
        """
        Transforms the input matrix X into its PQ-codes.

        For each sample in X, the input vector is split into `m` equal-sized subvectors.
        Each subvector is assigned to the nearest cluster centroid
        and the index of the closest centroid is stored.

        The result is a compact representation of X, where each sample is encoded as a sequence of centroid indices.

        Args:
            X (np.ndarray): Input data matrix of shape (n_samples, n_features),
                            where n_features must be divisible by the number of subvectors `m`.
            verbose(int): Level of verbosity. Default is 1
            device: 'cpu' for sklearn, 'gpu' for torch.cdist on CUDA, 'auto' to pick GPU if available.
            **kwargs: Optional keyword arguments passed to the underlying KMeans `predict()` function.

        Returns:
            np.ndarray: PQ codes of shape (n_samples, m), where each element is the index of the nearest centroid
                        for the corresponding subvector.

        Raises:
            RuntimeError: If device='gpu' is requested but CUDA is unavailable.
        """

        assert self.encoder_is_trained == True, "PQEncoder must be trained before calling transform"

        use_gpu = (device == 'gpu') or (device == 'auto' and _GPU_AVAILABLE)
        if use_gpu:
            if not _GPU_AVAILABLE:
                raise RuntimeError("GPU requested but CUDA not available")
            return self._transform_gpu(X)

        return self._transform_cpu(X, verbose, **kwargs)

    def _transform_cpu(self, X: NDArray, verbose: int = 1, **kwargs) -> NDArray:
        """Assign each subvector to its nearest centroid on the CPU.

        Uses the fitted sklearn models when available (CPU-trained encoder);
        otherwise (e.g. GPU-trained or loaded without models) falls back to a
        chunked NumPy distance computation against the stored codewords,
        keeping the scratch distance tensor under a ~64 MB budget.
        """
        N, D = X.shape
        pq_codes = np.zeros((N, self.m), dtype=self.codebook_dtype)
        has_sklearn_models = len(self.pq_trained) == self.m

        iterable = range(self.m)
        if verbose > 0:
            iterable = tqdm(iterable, desc='Generating PQ-codes', total=self.m)

        subvector_dim = int(D / self.m)

        for subvector_idx in iterable:
            X_train_subvector = X[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)]
            if has_sklearn_models:
                pq_codes[:, subvector_idx] = self.pq_trained[subvector_idx].predict(X_train_subvector, **kwargs)
                continue

            codewords = np.ascontiguousarray(self.codewords[subvector_idx], dtype=np.float32)
            X_sub = np.ascontiguousarray(X_train_subvector, dtype=np.float32)
            bytes_budget = 64 * 1024**2
            bytes_per_row = max(codewords.shape[0] * codewords.shape[1] * 4, 1)
            chunk_size = max(1, min(N, bytes_budget // bytes_per_row))

            for start in range(0, N, chunk_size):
                end = min(start + chunk_size, N)
                chunk = X_sub[start:end]
                # Squared Euclidean distance of each row to every centroid.
                diff = chunk[:, None, :] - codewords[None, :, :]
                dists = np.sum(diff * diff, axis=2)
                pq_codes[start:end, subvector_idx] = np.argmin(dists, axis=1).astype(self.codebook_dtype)

        del X
        return pq_codes

    def _transform_gpu(self, X: NDArray) -> NDArray:
        """GPU-accelerated transform using torch.cdist + argmin.

        Batches points to avoid OOM.  Each batch is uploaded to the GPU once
        and then sliced per subvector on-device, avoiding m separate PCIe
        transfers and CPU-side contiguous copies.
        """
        N, D = X.shape
        subvector_dim = int(D / self.m)
        pq_codes = np.zeros((N, self.m), dtype=self.codebook_dtype)

        cw_gpu = [
            torch.from_numpy(np.ascontiguousarray(self.codewords[sub], dtype=np.float32)).cuda()
            for sub in range(self.m)
        ]

        # Budget: batch tensor (D floats) + distance matrix (k floats) per point
        bytes_per_point = (D + self.k) * 4
        max_batch = max((2 * 1024**3) // bytes_per_point, 1024)
        X_f32 = np.ascontiguousarray(X, dtype=np.float32)

        for start in range(0, N, max_batch):
            end = min(start + max_batch, N)
            batch_gpu = torch.from_numpy(X_f32[start:end]).cuda()
            for sub in range(self.m):
                chunk = batch_gpu[:, subvector_dim * sub : subvector_dim * (sub + 1)]
                dists = torch.cdist(chunk, cw_gpu[sub])
                pq_codes[start:end, sub] = dists.argmin(dim=1).cpu().numpy()
                del dists
            del batch_gpu

        del cw_gpu
        return pq_codes

    def fit_transform(self, X:NDArray, verbose:int=1, device:str='cpu', **kwargs) -> NDArray:
        """Fit and transform the input matrix `X` into its PQ-codes.

        The encoder is trained on the matrix and then, for each sample in X,
        the input vector is split into `m` equal-sized subvectors, each replaced
        by the index of the closest centroid. Returns a compact representation of X,
        where each sample is encoded as a sequence of centroid indices (i.e. PQ-codes).

        Args:
            X (np.array): Input data matrix of shape (n_samples, n_features)
            verbose (int, optional): Level of verbosity. Defaults to 1.
            device: 'cpu', 'gpu', or 'auto'. Default is 'cpu'.
            **kwargs: Optional keyword arguments. These arguments will be passed to the
            underlying KMeans `fit()` function.

        Returns:
            np.array: PQ codes of shape (n_samples, m), where each element is the index of the nearest
            centroid for the corresponding subvector
        """

        self.fit(X, verbose, device=device, **kwargs)
        return self.transform(X, verbose, device=device)


    def inverse_transform(self, 
                          pq_codes:NDArray, 
                          binary:bool=False, 
                          round:bool=True):
        """ Inverse transform. From PQ-code to the original vector. 
        This process is lossy so we don't expect to get the exact same data.
        If binary=True then the vectors will be returned in binary. 
        This is useful for the case where our original vectors were binary. 
        With binary=True then the returned vectors are transformed from 
        [0.32134, 0.8232, 0.0132, ... 0.1432, 1.19234] to 
        [0, 1, 0, ..., 0, 1]
        If round=True then the vector will be approximated to integer values
        (useful in cases where we expect to have a count-based fingerprint)

        Args:
            pq_codes: (np.array): Input data of PQ codes to be transformed into the
            original vectors.
            binary: (bool): Whether to return the vectors rounded to 0s and 1s. Default is False
            round: (bool): Round inversed vector to integer values. Default is True  
        """

        # Get shape of the input matrix of PQ codes
        N, D = pq_codes.shape

        # The dimension of the PQ vectors should be the same 
        # as the number of splits (subvectors) from the original data 

        assert D == (self.m), f"The dimension D of the PQ-codes (N,D) should be the same as the number of the subvectors or splits (m) . Got D = {D} for m = {self.m}"
        assert D == (self.og_D  / self.D_subvector), f"The dimension D of the PQ-codes (N,D) should be the same as the original vector dimension divided the subvector dimension"

        # Replace every centroid index with the centroid vector it points to.
        X_inversed = np.empty((N, D*self.D_subvector), dtype=float)
        for subvector_idx in range(self.m):
            X_inversed[:, subvector_idx*self.D_subvector:((subvector_idx+1)*self.D_subvector)] = self.codewords[subvector_idx][pq_codes[:, subvector_idx], :]

        if binary:
            # NOTE(review): binarization uses a 0.6 cutoff rather than 0.5 --
            # confirm this threshold is intentional.
            reconstructed_binary = (X_inversed>= 0.6).astype('int8')
            return reconstructed_binary 

        if round: return X_inversed.astype(int)
        else: return X_inversed


    def save(self, path: str | Path) -> None:
        """Serialize the encoder's configuration and trained state to `path` via joblib."""
        path = Path(path)
        payload = {
            "version": 1,
            "init": {"k": self.k, "m": self.m, "iterations": self.iterations},
            "state": {
                "encoder_is_trained": self.encoder_is_trained,
                "og_D": getattr(self, "og_D", None),
                "D_subvector": getattr(self, "D_subvector", None),
                "codewords": getattr(self, "codewords", None),
                "pq_trained": self.pq_trained,  # optional
            },
        }
        joblib.dump(payload, path)

    @classmethod
    def load(cls, path: str | Path) -> "PQEncoder":
        """Load an encoder previously written by `save`.

        Also accepts a file containing a directly-pickled PQEncoder instance,
        which is returned as-is.
        """
        path = Path(path)
        payload = joblib.load(path)

        if isinstance(payload, cls): return payload

        obj = cls(**payload["init"])
        state = payload["state"]
        obj.encoder_is_trained = state["encoder_is_trained"]
        obj.og_D = state["og_D"]
        obj.D_subvector = state["D_subvector"]
        obj.codewords = state["codewords"]
        obj.pq_trained = state.get("pq_trained", [])
        return obj

pq_trained = [] instance-attribute

The codebook is defined as the Cartesian product of all centroids. Storing the codebook C explicitly is not efficient, the full codebook would be (k')^m centroids Instead we store the K' centroids of all the subquantizers (k' · m). We can later simulate the full codebook by combining the centroids from each subquantizer.

__init__(k=256, m=8, iterations=20)

Initializes the encoder with trained sub-block centroids.

Parameters:

Name Type Description Default
k int

Number of centroids. Default is 256. We assume that all subquantizers

256
m int

Number of distinct subvectors of the input X vector.

8
iterations int

Number of iterations for the k-means

20

High values of k increase the computational cost of the quantizer as well as the memory usage of storing the centroids (k' x D floating points). Using k=256 and m=8 is often a reasonable choice.

Reference: DOI: 10.1109/TPAMI.2010.57

Source code in chelombus/encoder/encoder.py
def __init__(self, k:int=256, m:int=8, iterations:int=20):
    """ Initializes the encoder with trained sub-block centroids.

    Args:
        k (int): Number of centroids. Default is 256. We assume that all subquantizers
        have the same finite number (k') of reproduction values. So k = (k')^m
        m (int): Number of distinct subvectors of the input X vector.
        m is a subvector of dimension D' = D/m where D is the
        dimension of the input vector X. D is therefore a multiple of m
        iterations (int): Number of iterations for the k-means

    High values of k increase the computational cost of the quantizer as well as memory
    usage of storing the centroids (k' x D floating points). Using k=256 and m=8 is often
    a reasonable choice.

    Reference: DOI: 10.1109/TPAMI.2010.57
    """

    self.m = m
    self.k = k
    self.iterations = iterations
    self.encoder_is_trained = False
    # Smallest unsigned integer dtype able to hold a centroid index in [0, k):
    # uint8 up to 256 centroids, uint16 up to 65536, uint32 beyond.
    self.codebook_dtype = np.uint8 if self.k <= 2**8 else (np.uint16 if self.k <= 2**16 else np.uint32)
    self.pq_trained = []

    # NOTE: The codebook is defined as the Cartesian product of all centroids.
    # Storing the codebook C explicitly is not efficient: the full codebook
    # would contain (k')^m centroids. Instead we store the k' centroids of
    # every subquantizer (k' · m in total) and can later simulate the full
    # codebook by combining the centroids from each subquantizer.
    # (Previously this note was a dangling triple-quoted string, i.e. a no-op
    # statement; it is now an actual comment.)

fit(X_train, verbose=1, device='cpu', **kwargs)

KMeans fitting of every subvector matrix from the X_train matrix. Populates the codebook by storing the cluster centers of every subvector

X_train is the input matrix. For a vector that has dimension D then X_train is a matrix of size (N, D) where N is the number of rows (vectors) and D the number of columns (dimension of every vector i.e. fingerprint in the case of molecular data)

Parameters:

Name Type Description Default
X_train array

Input matrix to train the encoder.

required
verbose int

Level of verbosity. Default is 1

1
device str

'cpu' for sklearn KMeans, 'gpu' for torch-based KMeans on CUDA, 'auto' to pick GPU if available. Default is 'cpu'.

'cpu'
**kwargs

Optional keyword arguments passed to the underlying KMeans fit() function (only used on the CPU path).

{}
Source code in chelombus/encoder/encoder.py
def fit(self, X_train:NDArray, verbose:int=1, device:str='cpu', **kwargs)->None:
    """ Train the product quantizer on a matrix of input vectors.

    Runs one k-means per subvector block and stores the resulting cluster
    centers in ``self.codewords``. ``X_train`` must be a matrix of shape
    (N, D): N vectors (e.g. molecular fingerprints) of dimension D, with D
    divisible by ``self.m``.

    Args:
       X_train(np.array): Input matrix to train the encoder.
       verbose(int): Level of verbosity. Default is 1
       device: 'cpu' for sklearn KMeans, 'gpu' for torch-based KMeans on CUDA,
               'auto' to pick GPU if available. Default is 'cpu'.
       **kwargs: Optional keyword arguments passed to the underlying KMeans `fit()` function
                 (only used on the CPU path).
    """

    assert X_train.ndim == 2, "The input can only be a matrix (X.ndim == 2)"
    n_vectors, dim = X_train.shape
    assert self.k < n_vectors, "the number of training vectors (N for N,D = X_train.shape) should be more than the number of centroids (K)"
    assert dim % self.m == 0, f"Vector (fingeprint) dimension should be divisible by the number of subvectors (m). Got {dim} / {self.m}"
    self.D_subvector = dim // self.m  # Dimension of each subvector block.
    self.og_D = dim  # Remember the original input dimensionality for inverse_transform.
    assert self.encoder_is_trained == False, "Encoder can only be fitted once"

    # One (k, D_subvector) centroid table per subquantizer.
    self.codewords = np.zeros((self.m, self.k, self.D_subvector), dtype=np.float32)

    wants_gpu = device == 'gpu' or (device == 'auto' and _GPU_AVAILABLE)
    if not wants_gpu:
        self._fit_cpu(X_train, verbose, **kwargs)
    else:
        if not _GPU_AVAILABLE:
            raise RuntimeError("GPU requested but CUDA not available")
        self._fit_gpu(X_train, verbose)

    self.encoder_is_trained = True
    del X_train  # drop the local reference to the training data

transform(X, verbose=1, device='auto', **kwargs)

Transforms the input matrix X into its PQ-codes.

For each sample in X, the input vector is split into m equal-sized subvectors. Each subvector is assigned to the nearest cluster centroid and the index of the closest centroid is stored.

The result is a compact representation of X, where each sample is encoded as a sequence of centroid indices.

Parameters:

Name Type Description Default
X ndarray

Input data matrix of shape (n_samples, n_features), where n_features must be divisible by the number of subvectors m.

required
verbose int

Level of verbosity. Default is 1

1
device str

'cpu' for sklearn, 'gpu' for torch.cdist on CUDA, 'auto' to pick GPU if available.

'auto'
**kwargs

Optional keyword arguments passed to the underlying KMeans predict() function.

{}

Returns:

Type Description
NDArray

np.ndarray: PQ codes of shape (n_samples, m), where each element is the index of the nearest centroid for the corresponding subvector.

Source code in chelombus/encoder/encoder.py
def transform(self, X:NDArray, verbose:int=1, device:str='auto', **kwargs) -> NDArray:
    """
    Encode the input matrix X as PQ-codes.

    Every row of X is split into `m` equal-sized subvectors; each subvector
    is replaced by the index of its nearest trained centroid, yielding a
    compact integer representation of X.

    Args:
        X (np.ndarray): Input data matrix of shape (n_samples, n_features),
                        where n_features must be divisible by the number of subvectors `m`.
        verbose(int): Level of verbosity. Default is 1
        device: 'cpu' for sklearn, 'gpu' for torch.cdist on CUDA, 'auto' to pick GPU if available.
        **kwargs: Optional keyword arguments passed to the underlying KMeans `predict()` function.

    Returns:
        np.ndarray: PQ codes of shape (n_samples, m); each entry is the index
                    of the nearest centroid for the corresponding subvector.
    """

    assert self.encoder_is_trained == True, "PQEncoder must be trained before calling transform"

    on_gpu = device == 'gpu' or (device == 'auto' and _GPU_AVAILABLE)
    if not on_gpu:
        return self._transform_cpu(X, verbose, **kwargs)

    if not _GPU_AVAILABLE:
        raise RuntimeError("GPU requested but CUDA not available")
    return self._transform_gpu(X)

fit_transform(X, verbose=1, device='cpu', **kwargs)

Fit and transforms the input matrix X into its PQ-codes

The encoder is trained on the matrix and then, for each sample in X, the input vector is split into m equal-sized subvectors, each encoded by the index of its closest centroid. Returns a compact representation of X, where each sample is encoded as a sequence of centroid indices (i.e. PQ codes).

Parameters:

Name Type Description Default
X array

Input data matrix of shape (n_samples, n_features)

required
verbose int

Level of verbosity. Defaults to 1.

1
device str

'cpu', 'gpu', or 'auto'. Default is 'cpu'.

'cpu'
**kwargs

Optional keyword. These arguments will be passed to the underlying KMeans

{}

Returns:

Type Description
NDArray

np.array: PQ codes of shape (n_samples, m), where each element is the index of the nearest

NDArray

centroid for the corresponding subvector

Source code in chelombus/encoder/encoder.py
def fit_transform(self, X:NDArray, verbose:int=1, device:str='cpu', **kwargs) -> NDArray:
    """Train the encoder on `X` and immediately return its PQ-codes.

    Equivalent to calling ``fit`` followed by ``transform`` on the same
    matrix: each row of X is split into `m` equal-sized subvectors and
    replaced by the index of the closest centroid, producing a compact
    sequence of centroid indices (the PQ codes).

    Args:
        X (np.array): Input data matrix of shape (n_samples, n_features)
        verbose (int, optional): Level of verbosity. Defaults to 1.
        device: 'cpu', 'gpu', or 'auto'. Default is 'cpu'.
        **kwargs: Extra keyword arguments forwarded to the underlying KMeans
        fit() function.

    Returns:
        np.array: PQ codes of shape (n_samples, m); each entry is the index
        of the nearest centroid for the corresponding subvector.
    """

    self.fit(X, verbose, device=device, **kwargs)
    pq_codes = self.transform(X, verbose, device=device)
    return pq_codes

inverse_transform(pq_codes, binary=False, round=True)

Inverse transform: from PQ-codes back to the original vectors. This process is lossy, so we don't expect to recover the exact original data. If binary=True the vectors are returned in binary form, which is useful when the original vectors were binary: the reconstructed values are transformed from e.g. [0.32134, 0.8232, 0.0132, ... 0.1432, 1.19234] to [0, 1, 0, ..., 0, 1]. If round=True the reconstructed values are approximated to integer values (useful in cases where we expect a count-based fingerprint).

Parameters:

Name Type Description Default
pq_code

(np.array): Input data of PQ codes to be transformed back into the original vectors.

required
binary

(bool): Whether to return the vectors rounded to 0s and 1s. Default is False

False
round

(bool): Round inversed vector to integers values. Default is True

True
Source code in chelombus/encoder/encoder.py
def inverse_transform(self, 
                      pq_codes:NDArray, 
                      binary=False, 
                      round=True):
    """ Inverse transform: from PQ-codes back to the original vectors.

    This process is lossy, so we don't expect to recover the exact same data.
    If binary=True the vectors are returned in binary form (useful when the
    original vectors were binary): reconstructed values are thresholded at
    0.6, e.g. [0.32, 0.82, 0.01, ...] -> [0, 1, 0, ...].
    If round=True the reconstructed values are rounded to the nearest integer
    (useful in cases where we expect a count-based fingerprint).

    Args:
        pq_codes: (np.array): Input matrix of PQ codes, shape (N, m), to be
        transformed back into the original vectors.
        binary: (bool): Whether to return the vectors thresholded to 0s and 1s. Default is False
        round: (bool): Round reconstructed values to the nearest integers. Default is True
    """

    # Get shape of the input matrix of PQ codes
    N, D = pq_codes.shape

    # The dimension of the PQ vectors should be the same 
    # as the number of splits (subvectors) from the original data 

    assert D == (self.m), f"The dimension D of the PQ-codes (N,D) should be the same as the number of the subvectors or splits (m) . Got D = {D} for m = {self.m}"
    assert D == (self.og_D  / self.D_subvector), f"The dimension D of the PQ-codes (N,D) should be the same as the original vector dimension divided the subvector dimension"

    # Reconstruct each subvector block by looking up its centroid.
    X_inversed = np.empty((N, D*self.D_subvector), dtype=float)
    for subvector_idx in range(self.m):
        X_inversed[:, subvector_idx*self.D_subvector:((subvector_idx+1)*self.D_subvector)] = self.codewords[subvector_idx][pq_codes[:, subvector_idx], :]

    if binary:
        reconstructed_binary = (X_inversed>= 0.6).astype('int8')
        return reconstructed_binary 

    if round:
        # BUG FIX: astype(int) truncates toward zero (1.9 -> 1), which is not
        # the documented "approximate to integer values". np.rint rounds to
        # the nearest integer before the integer cast.
        return np.rint(X_inversed).astype(int)
    else:
        return X_inversed

PQKMeans

chelombus.clustering.PyQKmeans.PQKMeans

This class provides a scikit-learn-like interface to the PQk-means algorithm, which operates directly on PQ codes using symmetric distance.

Parameters:

Name Type Description Default
encoder PQEncoder

A trained PQEncoder instance

required
k int

Number of clusters

required
iteration int

Maximum number of k-means iterations (default: 20)

20
tol float

Early-stopping tolerance for the GPU path. Training stops when the fraction of changed center coordinates drops below tol. 0 means stop only on exact convergence. (default: 1e-3)

0.001
verbose bool

Whether to print progress information (default: False)

False
Example

encoder = PQEncoder(k=256, m=6) encoder.fit(training_data) pq_codes = encoder.transform(data) clusterer = PQKMeans(encoder, k=100000) labels = clusterer.fit_predict(pq_codes)

Source code in chelombus/clustering/PyQKmeans.py
class PQKMeans:
    """
    This class provides a scikit-learn-like interface to the PQk-means algorithm,
    which operates directly on PQ codes using symmetric distance.

    Args:
        encoder: A trained PQEncoder instance
        k: Number of clusters
        iteration: Maximum number of k-means iterations (default: 20)
        tol: Early-stopping tolerance for the GPU path. Training stops when
             the fraction of changed center coordinates drops below *tol*.
             0 means stop only on exact convergence. (default: 1e-3)
        verbose: Whether to print progress information (default: False)

    Example:
        >>> encoder = PQEncoder(k=256, m=6)
        >>> encoder.fit(training_data)
        >>> pq_codes = encoder.transform(data)
        >>> clusterer = PQKMeans(encoder, k=100000)
        >>> labels = clusterer.fit_predict(pq_codes)
    """

    def __init__(
        self,
        encoder: PQEncoder,
        k: int,
        iteration: int = 20,
        tol: float = 1e-3,
        verbose: bool = False
    ):
        if not encoder.encoder_is_trained: # type: ignore
            raise ValueError("Encoder must be trained before clustering")

        self.encoder = encoder
        self.k = k
        self.iteration = iteration
        self.tol = tol
        self.verbose = verbose
        self.trained = False
        # Lazily-built caches: distance tables and dtype-cast centers are
        # populated on first predict() and invalidated after every (re)fit.
        self._dtables = None
        self._centers_u8 = None
        self._fit_labels = None
        # Backend used for the CPU path; it also stores the cluster centers
        # of record (the GPU path writes into it via set_cluster_centers).
        self._cluster = pqkmeans.clustering.PQKMeans(
            encoder=encoder,
            k=k,
            iteration=iteration,
            verbose=verbose,
        )

    def _gpu_support_reason(self) -> str | None:
        """Return why the GPU path cannot be used, or None if it can."""
        if not _GPU_AVAILABLE:
            return "CUDA/Triton not available"
        if self.encoder.k > 256:
            return (
                "GPU path currently supports only 8-bit PQ codes "
                f"(encoder.k <= 256), got encoder.k={self.encoder.k}"
            )
        return None

    def _should_use_gpu(self, device: str) -> bool:
        """Resolve a device string ('auto'/'cpu'/'gpu') to a GPU yes/no decision.

        Raises:
            ValueError: If `device` is not one of the accepted strings.
            RuntimeError: If 'gpu' was requested explicitly but is unsupported.
        """
        if device not in {"auto", "cpu", "gpu"}:
            raise ValueError(f"device must be 'auto', 'cpu', or 'gpu', got {device!r}")

        gpu_reason = self._gpu_support_reason()
        if device == "gpu":
            if gpu_reason is not None:
                raise RuntimeError(f"GPU requested but unavailable: {gpu_reason}")
            return True

        return device == "auto" and gpu_reason is None

    @property
    def cluster_centers_(self) -> np.ndarray:
        """Get cluster centers (PQ codes of shape (k, m))."""
        return np.array(self._cluster.cluster_centers_)

    @cluster_centers_.setter
    def cluster_centers_(self, centers: np.ndarray) -> None:
        """Set cluster centers and mark as trained."""
        centers_codes = np.asarray(centers, dtype=self.encoder.codebook_dtype)
        self._cluster._impl.set_cluster_centers(centers_codes.tolist())
        self.trained = True
        # Invalidate caches derived from the previous centers.
        self._centers_u8 = None
        self._fit_labels = None

    def fit(self, X_train: np.ndarray, device: str = 'auto') -> 'PQKMeans':
        """Fit the PQk-means model to training PQ codes.

        Args:
            X_train: PQ codes of shape (n_samples, n_subvectors)
            device: 'cpu' uses the C++ backend,
                    'gpu' uses Triton assignment + CPU centroid update,
                    'auto' picks GPU when available (default).

        Returns:
            self
        """
        use_gpu = self._should_use_gpu(device)

        if use_gpu:
            self._fit_gpu(X_train)
        else:
            self._cluster.fit(X_train)

        self.trained = True
        # Reset caches so predict() rebuilds them from the new centers.
        self._dtables = None
        self._centers_u8 = None
        self._fit_labels = None
        return self

    def _fit_gpu(self, X_train: np.ndarray, return_labels: bool = False) -> np.ndarray | None:
        """GPU-accelerated training: Triton assignment + CPU centroid update.

        Args:
            X_train: PQ codes of shape (N, m); cast to uint8 for the kernel.
            return_labels: When True, also return per-sample cluster labels
                consistent with the final centers.

        Returns:
            Labels of shape (N,) when `return_labels` is True, else None.
        """
        import time

        pq_codes = np.ascontiguousarray(X_train, dtype=np.uint8)
        N, m = pq_codes.shape
        dtables = _build_distance_tables(self.encoder.codewords)
        fit_batch = max(N // 20, 1_000_000)

        # Initialise centres by sampling K data points at random
        rng = np.random.default_rng()
        indices = rng.choice(N, size=self.k, replace=(N < self.k))
        centers = pq_codes[indices].copy()

        labels = None
        prev_centers = None
        final_labels_match_centers = False
        for it in range(self.iteration):
            t0 = time.time()

            # assignment (GPU)
            # Force ~20 batches for progress reporting on large N
            labels = predict_gpu(pq_codes, centers, dtables,
                                 batch_size=fit_batch, verbose=self.verbose)
            t1 = time.time()

            # centroid update (CPU)
            new_centers = _update_centers(
                pq_codes,
                labels,
                self.k,
                dtables,
                previous_centers=centers,
            )
            t2 = time.time()

            changed = int(np.sum(new_centers != centers))
            total_coords = self.k * m
            frac = changed / total_coords

            if self.verbose:
                print(
                    f"  iter {it + 1}/{self.iteration}  "
                    f"assign={t1 - t0:.1f}s  update={t2 - t1:.1f}s  "
                    f"changed={changed}/{total_coords} ({frac:.4%})"
                )

            # Early stopping: below tolerance or an exact 2-cycle oscillation.
            converged = frac <= self.tol
            oscillating = prev_centers is not None and np.array_equal(new_centers, prev_centers)
            # If no center coordinate changed, the labels from this iteration
            # are already consistent with the final centers.
            final_labels_match_centers = changed == 0
            old_centers = centers
            centers = new_centers

            if converged or oscillating:
                if self.verbose:
                    reason = "converged" if converged else "oscillation"
                    print(f"  Stopped at iteration {it + 1} ({reason})")
                break
            prev_centers = old_centers.copy()

        # Store centres in the pqkmeans backend for compatibility
        self._cluster._impl.set_cluster_centers(centers.tolist())
        self._fit_labels = None

        if not return_labels:
            return None

        if labels is None:
            raise RuntimeError("GPU fit did not produce assignments")

        if final_labels_match_centers:
            return labels

        # Centers moved after the last assignment: re-assign once so the
        # returned labels match the stored centers.
        return predict_gpu(
            pq_codes,
            centers,
            dtables,
            batch_size=fit_batch,
            verbose=self.verbose,
        )

    def predict(self, X: np.ndarray, device: str = 'auto') -> np.ndarray:
        """Predict cluster labels for PQ codes.

        Args:
            X: PQ codes of shape (n_samples, n_subvectors), dtype uint8
            device: 'cpu' for Numba, 'gpu' for Triton/CUDA, 'auto' to pick GPU if available.

        Returns:
            Cluster labels of shape (n_samples,)
        """
        if not self.trained: # type: ignore
            raise ValueError("Must be trained before clustering. Use `.fit()` first")
        if self._dtables is None:
            # Lazily (re)build distance tables and cache centers in code dtype.
            self._dtables = _build_distance_tables(self.encoder.codewords)
            self._centers_u8 = self.cluster_centers_.astype(self.encoder.codebook_dtype)

        use_gpu = self._should_use_gpu(device)

        if use_gpu:
            codes = np.asarray(X, dtype=np.uint8)
            centers = np.asarray(self._centers_u8, dtype=np.uint8)
            return predict_gpu(codes, centers, self._dtables)

        codes = np.asarray(X, dtype=self.encoder.codebook_dtype)
        return _predict_numba(codes, self._centers_u8, self._dtables)

    def fit_predict(self, X: np.ndarray, device: str = 'auto') -> np.ndarray:
        """Fit the model and predict cluster labels in one step.

        Args:
            X: PQ codes of shape (n_samples, n_subvectors)
            device: 'cpu', 'gpu', or 'auto' (default).

        Returns:
            Cluster labels of shape (n_samples,)
        """
        use_gpu = self._should_use_gpu(device)

        if use_gpu:
            labels = self._fit_gpu(X, return_labels=True)
            self.trained = True
            self._dtables = None
            self._centers_u8 = None
            self._fit_labels = None
            return labels

        labels = np.array(self._cluster.fit_predict(X))
        self.trained = True
        self._dtables = None
        self._centers_u8 = None
        self._fit_labels = None
        return labels

    @property
    def is_trained(self) -> bool:
        """Whether the model has been fitted (or had centers assigned)."""
        return bool(self.trained) # type: ignore

    def __getstate__(self):
        """Exclude cached lookup structures from the pickled state.

        The caches are rebuilt lazily by predict(), so dropping them keeps
        pickles small without changing behavior.
        """
        state = self.__dict__.copy()
        state["_dtables"] = None
        state["_centers_u8"] = None
        state["_fit_labels"] = None
        return state

    def save(self, path: str | Path) -> None:
        """Persist this clusterer to `path` with joblib."""
        path = Path(path)
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: str | Path) -> "PQKMeans":
        """Load a PQKMeans previously saved with `save`.

        Backfills cache attributes that may be absent in pickles written by
        older versions of this class.
        """
        path = Path(path)
        obj = joblib.load(path)
        if not isinstance(obj, cls):
            raise TypeError(f"Expected {cls.__name__}, got {type(obj).__name__}")
        if not hasattr(obj, '_dtables'):
            obj._dtables = None
        if not hasattr(obj, '_centers_u8'):
            obj._centers_u8 = None
        if not hasattr(obj, '_fit_labels'):
            obj._fit_labels = None
        return obj

cluster_centers_ property writable

Get cluster centers (PQ codes of shape (k, m)).

fit(X_train, device='auto')

Fit the PQk-means model to training PQ codes.

Parameters:

Name Type Description Default
X_train ndarray

PQ codes of shape (n_samples, n_subvectors)

required
device str

'cpu' uses the C++ backend, 'gpu' uses Triton assignment + CPU centroid update, 'auto' picks GPU when available (default).

'auto'

Returns:

Type Description
PQKMeans

self

Source code in chelombus/clustering/PyQKmeans.py
def fit(self, X_train: np.ndarray, device: str = 'auto') -> 'PQKMeans':
    """Fit the PQk-means model to training PQ codes.

    Args:
        X_train: PQ codes of shape (n_samples, n_subvectors)
        device: 'cpu' uses the C++ backend,
                'gpu' uses Triton assignment + CPU centroid update,
                'auto' picks GPU when available (default).

    Returns:
        self
    """
    if self._should_use_gpu(device):
        self._fit_gpu(X_train)
    else:
        self._cluster.fit(X_train)

    # Mark as trained and invalidate any cached lookup structures so that
    # predict() rebuilds them from the new centers.
    self.trained = True
    self._dtables = None
    self._centers_u8 = None
    self._fit_labels = None
    return self

predict(X, device='auto')

Predict cluster labels for PQ codes.

Parameters:

Name Type Description Default
X ndarray

PQ codes of shape (n_samples, n_subvectors), dtype uint8

required
device str

'cpu' for Numba, 'gpu' for Triton/CUDA, 'auto' to pick GPU if available.

'auto'

Returns:

Type Description
ndarray

Cluster labels of shape (n_samples,)

Source code in chelombus/clustering/PyQKmeans.py
def predict(self, X: np.ndarray, device: str = 'auto') -> np.ndarray:
    """Predict cluster labels for PQ codes.

    Args:
        X: PQ codes of shape (n_samples, n_subvectors), dtype uint8
        device: 'cpu' for Numba, 'gpu' for Triton/CUDA, 'auto' to pick GPU if available.

    Returns:
        Cluster labels of shape (n_samples,)
    """
    if not self.trained:
        raise ValueError("Must be trained before clustering. Use `.fit()` first")

    if self._dtables is None:
        # Lazily (re)build the per-subvector distance tables and cache the
        # centers cast to the encoder's code dtype.
        self._dtables = _build_distance_tables(self.encoder.codewords)
        self._centers_u8 = self.cluster_centers_.astype(self.encoder.codebook_dtype)

    if self._should_use_gpu(device):
        return predict_gpu(
            np.asarray(X, dtype=np.uint8),
            np.asarray(self._centers_u8, dtype=np.uint8),
            self._dtables,
        )

    return _predict_numba(
        np.asarray(X, dtype=self.encoder.codebook_dtype),
        self._centers_u8,
        self._dtables,
    )

fit_predict(X, device='auto')

Fit the model and predict cluster labels in one step.

Parameters:

Name Type Description Default
X ndarray

PQ codes of shape (n_samples, n_subvectors)

required
device str

'cpu', 'gpu', or 'auto' (default).

'auto'

Returns:

Type Description
ndarray

Cluster labels of shape (n_samples,)

Source code in chelombus/clustering/PyQKmeans.py
def fit_predict(self, X: np.ndarray, device: str = 'auto') -> np.ndarray:
    """Fit the model and predict cluster labels in one step.

    Args:
        X: PQ codes of shape (n_samples, n_subvectors)
        device: 'cpu', 'gpu', or 'auto' (default).

    Returns:
        Cluster labels of shape (n_samples,)
    """
    if self._should_use_gpu(device):
        # GPU fit returns labels consistent with the final centers.
        labels = self._fit_gpu(X, return_labels=True)
    else:
        labels = np.array(self._cluster.fit_predict(X))

    # Mark as trained and invalidate cached lookup structures.
    self.trained = True
    self._dtables = None
    self._centers_u8 = None
    self._fit_labels = None
    return labels

Visualization

chelombus.utils.visualization.create_tmap(smiles, fingerprint='morgan', fingerprint_bits=2048, fingerprint_radius=2, properties=None, tmap_name='my_tmap', k_neighbors=20)

Create a TMAP visualization from SMILES strings.

Parameters:

Name Type Description Default
smiles list[str]

List of SMILES strings to visualize.

required
fingerprint str

Fingerprint type ('morgan' or 'mqn').

'morgan'
fingerprint_bits int

Number of bits for Morgan fingerprint (default: 2048).

2048
fingerprint_radius int

Radius for Morgan fingerprint (default: 2).

2
properties list[str] | None

List of molecular properties to display as color channels. If None, uses all available properties.

None
tmap_name str

Name for the output TMAP HTML file.

'my_tmap'
k_neighbors int

Number of neighbors for layout (default: 20).

20
Source code in chelombus/utils/visualization.py
def create_tmap(
    smiles: list[str],
    fingerprint: str = 'morgan',
    fingerprint_bits: int = 2048,
    fingerprint_radius: int = 2,
    properties: list[str] | None = None,
    tmap_name: str = 'my_tmap',
    k_neighbors: int = 20,
) -> None:
    """Create a TMAP visualization from SMILES strings.

    Computes fingerprints for the given molecules, fits a TMAP layout, and
    writes an interactive HTML file named `{tmap_name}.html` with one color
    channel per molecular property and SMILES tooltips.

    Args:
        smiles: List of SMILES strings to visualize.
        fingerprint: Fingerprint type ('morgan' or 'mqn').
        fingerprint_bits: Number of bits for Morgan fingerprint (default: 2048).
        fingerprint_radius: Radius for Morgan fingerprint (default: 2).
        properties: Molecular properties shown as color channels; None selects
            all available properties.
        tmap_name: Name for the output TMAP HTML file.
        k_neighbors: Number of neighbors for layout (default: 20).
    """
    selected_props = DEFAULT_PROPERTIES if properties is None else properties
    _validate_properties(selected_props)

    # Morgan fingerprints take sizing parameters; other types do not.
    extra_kwargs = (
        {'n_bits': fingerprint_bits, 'radius': fingerprint_radius}
        if fingerprint == 'morgan'
        else {}
    )
    fps = fingerprints_from_smiles(smiles, fp_type=fingerprint, **extra_kwargs)

    # Fit the TMAP layout on the fingerprints.
    model = TMAP(metric='jaccard', n_neighbors=k_neighbors).fit(fps)

    viz = model.to_tmapviz(include_edges=True)
    viz.title = tmap_name
    viz.background_color = "#FFFFFF"

    # One rainbow color channel per molecular property.
    for prop_name, values in molecular_properties(smiles, properties=selected_props).items():
        viz.add_color_layout(prop_name, values, categorical=False, color='rainbow')

    # SMILES enable structure rendering in tooltips.
    viz.add_smiles(smiles)

    viz.write_html(f"{tmap_name}.html")

chelombus.utils.visualization.representative_tmap(smiles, cluster_ids=None, fingerprint='mqn', fingerprint_bits=2048, fingerprint_radius=2, properties=None, tmap_name='representative_tmap', k_neighbors=21)

Create a TMAP for representative molecules.

Recommended for primary TMAPs showing cluster representatives.

Parameters:

Name Type Description Default
smiles list[str]

List of SMILES strings (typically cluster representatives).

required
cluster_ids Sequence[int | str] | None

Optional list of cluster IDs corresponding to each SMILES. If provided, added as a categorical color channel.

None
fingerprint str

Fingerprint type ('mqn' recommended, or 'morgan').

'mqn'
fingerprint_bits int

Number of bits for Morgan fingerprint (default: 2048).

2048
fingerprint_radius int

Radius for Morgan fingerprint (default: 2).

2
properties list[str] | None

List of molecular properties to display as color channels. If None, uses all available properties.

None
tmap_name str

Name for the output TMAP HTML file.

'representative_tmap'
k_neighbors int

Number of neighbors for KNN graph (default: 21).

21
Source code in chelombus/utils/visualization.py
def representative_tmap(
    smiles: list[str],
    cluster_ids: Sequence[int | str] | None = None,
    fingerprint: str = 'mqn',
    fingerprint_bits: int = 2048,
    fingerprint_radius: int = 2,
    properties: list[str] | None = None,
    tmap_name: str = 'representative_tmap',
    k_neighbors: int = 21,
) -> None:
    """Create a TMAP for representative molecules.

    Recommended for primary TMAPs showing cluster representatives. Writes an
    interactive HTML file named `{tmap_name}.html`; when `cluster_ids` is
    given, a categorical 'Cluster ID' color channel is added.

    Args:
        smiles: List of SMILES strings (typically cluster representatives).
        cluster_ids: Optional list of cluster IDs corresponding to each SMILES.
                    If provided, added as a categorical color channel.
        fingerprint: Fingerprint type ('mqn' recommended, or 'morgan').
        fingerprint_bits: Number of bits for Morgan fingerprint (default: 2048).
        fingerprint_radius: Radius for Morgan fingerprint (default: 2).
        properties: Molecular properties shown as color channels; None selects
            all available properties.
        tmap_name: Name for the output TMAP HTML file.
        k_neighbors: Number of neighbors for KNN graph (default: 21).
    """
    selected_props = DEFAULT_PROPERTIES if properties is None else properties
    _validate_properties(selected_props)

    # Morgan fingerprints take sizing parameters; MQN does not.
    extra_kwargs = (
        {'n_bits': fingerprint_bits, 'radius': fingerprint_radius}
        if fingerprint == 'morgan'
        else {}
    )
    fps = fingerprints_from_smiles(smiles, fp_type=fingerprint, **extra_kwargs)

    # MQN descriptors are dense, so euclidean distance is used for them;
    # jaccard otherwise.
    model = TMAP(
        metric='euclidean' if fingerprint == 'mqn' else 'jaccard',
        n_neighbors=k_neighbors,
    ).fit(fps)

    viz = model.to_tmapviz(include_edges=True)
    viz.title = tmap_name
    viz.background_color = "#FFFFFF"

    # One rainbow color channel per molecular property.
    for prop_name, values in molecular_properties(smiles, properties=selected_props).items():
        viz.add_color_layout(prop_name, values, categorical=False, color='rainbow')

    # Optional categorical channel for the cluster assignment.
    if cluster_ids is not None:
        viz.add_color_layout(
            'Cluster ID',
            [str(cid) for cid in cluster_ids],
            categorical=True,
            color='tab20',
        )

    # SMILES enable structure rendering in tooltips.
    viz.add_smiles(smiles)

    viz.write_html(f"{tmap_name}.html")

Cluster I/O

chelombus.utils.cluster_io

Cluster I/O utilities using DuckDB for efficient querying.

This module provides functions for querying and exporting clustered molecular data stored in chunked parquet files. It uses DuckDB for efficient SQL-based queries across multiple files without loading everything into memory.

Architecture

During pipeline.transform(), data is saved as chunked parquet files: results/chunk_00001.parquet (smiles, cluster_id) results/chunk_00002.parquet (smiles, cluster_id) ...

These functions query across all chunks using DuckDB's glob pattern support, enabling efficient filtering and export without loading all data into RAM.

Example

from chelombus.utils.cluster_io import query_cluster, export_all_clusters

Get molecules from cluster 42

df = query_cluster('results/', cluster_id=42)

Export all clusters (for HPC/SLURM processing)

stats = export_all_clusters('results/', 'clusters/')

query_cluster(results_dir, cluster_id, cluster_column='cluster_id', columns=None)

Query all molecules from a specific cluster.

Scans all chunked parquet files in results_dir and returns molecules belonging to the specified cluster as a pandas DataFrame.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files (output from pipeline.transform()).

required
cluster_id int

The cluster ID to query.

required
cluster_column str

Name of the column containing cluster IDs.

'cluster_id'
columns Optional[List[str]]

Optional list of columns to return. Default returns all columns. Common columns: ['smiles', 'cluster_id'].

None

Returns:

Type Description
'pd.DataFrame'

DataFrame containing all molecules from the specified cluster.

Raises:

Type Description
ImportError

If duckdb is not installed.

FileNotFoundError

If results_dir doesn't exist.

ValueError

If no parquet files are found in results_dir. (A missing cluster_id does not raise; an empty DataFrame is returned with a warning.)

Example

df = query_cluster('results/', cluster_id=42) print(f"Cluster 42 has {len(df)} molecules") print(df['smiles'].head())

Source code in chelombus/utils/cluster_io.py
def query_cluster(
    results_dir: Union[str, Path],
    cluster_id: int,
    cluster_column: str = "cluster_id",
    columns: Optional[List[str]] = None,
) -> "pd.DataFrame":
    """Query all molecules from a specific cluster.

    Scans all chunked parquet files in results_dir and returns molecules
    belonging to the specified cluster as a pandas DataFrame.

    Args:
        results_dir: Path to directory containing chunked parquet files
            (output from pipeline.transform()).
        cluster_id: The cluster ID to query.
        cluster_column: Name of the column containing cluster IDs.
        columns: Optional list of columns to return. Default returns all columns.
            Common columns: ['smiles', 'cluster_id'].

    Returns:
        DataFrame containing all molecules from the specified cluster.
        Empty (with a logged warning) if the cluster does not exist.

    Raises:
        ImportError: If duckdb is not installed.
        FileNotFoundError: If results_dir doesn't exist.
        ValueError: If no parquet files are found, or cluster_id is not
            interpretable as an integer.

    Example:
        >>> df = query_cluster('results/', cluster_id=42)
        >>> print(f"Cluster 42 has {len(df)} molecules")
        >>> print(df['smiles'].head())
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")
    quoted_cluster_column = _quote_identifier(cluster_column)

    # Coerce to a plain int so the value interpolated into the SQL text
    # cannot carry arbitrary SQL (defense against injection via cluster_id).
    cluster_id = int(cluster_id)

    # Build column selection; identifiers are quoted so column names with
    # spaces or reserved words still work.
    if columns:
        col_str = ", ".join(_quote_identifier(col) for col in columns)
    else:
        col_str = "*"

    query = f"""
        SELECT {col_str}
        FROM '{glob_pattern}'
        WHERE {quoted_cluster_column} = {cluster_id}
    """

    result = duckdb.query(query).df()

    if result.empty:
        logger.warning(f"No molecules found for cluster_id={cluster_id}")

    return result

export_cluster(results_dir, cluster_id, output_path, cluster_column='cluster_id', columns=None)

Export a single cluster to a file.

Queries all molecules from a specific cluster and writes them to a file. Supports parquet, CSV, and plain SMILES (.smi/.txt) output formats.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
cluster_id int

The cluster ID to export.

required
output_path Union[str, Path]

Output file path. Must end with .parquet, .csv, .smi, or .txt.

required
cluster_column str

Name of the column containing cluster IDs.

'cluster_id'
columns Optional[List[str]]

Optional list of columns to export. Default exports all columns.

None

Returns:

Type Description
int

Number of molecules exported.

Raises:

Type Description
ImportError

If duckdb is not installed.

ValueError

If output format is not supported.

Example

count = export_cluster('results/', 42, 'cluster_42.parquet') print(f"Exported {count} molecules")

Source code in chelombus/utils/cluster_io.py
def export_cluster(
    results_dir: Union[str, Path],
    cluster_id: int,
    output_path: Union[str, Path],
    cluster_column: str = "cluster_id",
    columns: Optional[List[str]] = None,
) -> int:
    """Export a single cluster to a file.

    Queries all molecules from a specific cluster and writes them to a file.
    Supports parquet, CSV, and plain SMILES (.smi/.txt) output formats.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_id: The cluster ID to export.
        output_path: Output file path. Must end with .parquet, .csv, .smi, or .txt.
        cluster_column: Name of the column containing cluster IDs.
        columns: Optional list of columns to export. Default exports all columns.
            Ignored for .smi/.txt output, which always writes only the smiles column.

    Returns:
        Number of molecules exported (0 if the cluster is empty or absent).

    Raises:
        ImportError: If duckdb is not installed.
        ValueError: If output format is not supported.

    Example:
        >>> count = export_cluster('results/', 42, 'cluster_42.parquet')
        >>> print(f"Exported {count} molecules")
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    output_path = Path(output_path)
    glob_pattern = str(path / "*.parquet")
    quoted_cluster_column = _quote_identifier(cluster_column)

    # Coerce to int so the value interpolated into SQL cannot carry arbitrary SQL.
    cluster_id = int(cluster_id)

    # Quote requested column names (consistent with query_cluster) so columns
    # with spaces or reserved words still work.
    if columns:
        col_str = ", ".join(_quote_identifier(col) for col in columns)
    else:
        col_str = "*"

    # Count first so we can short-circuit (and report) empty clusters before
    # touching the filesystem.
    count_query = f"""
        SELECT COUNT(*) as cnt
        FROM '{glob_pattern}'
        WHERE {quoted_cluster_column} = {cluster_id}
    """
    count = duckdb.query(count_query).fetchone()[0] # type: ignore

    if count == 0:
        logger.warning(f"No molecules found for cluster_id={cluster_id}")
        return 0

    # Create parent directory if needed
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Pick the COPY statement based on the output extension.
    suffix = output_path.suffix.lower()
    if suffix == ".parquet":
        export_query = f"""
            COPY (
                SELECT {col_str}
                FROM '{glob_pattern}'
                WHERE {quoted_cluster_column} = {cluster_id}
            ) TO '{output_path}' (FORMAT PARQUET)
        """
    elif suffix == ".csv":
        export_query = f"""
            COPY (
                SELECT {col_str}
                FROM '{glob_pattern}'
                WHERE {quoted_cluster_column} = {cluster_id}
            ) TO '{output_path}' (FORMAT CSV, HEADER)
        """
    elif suffix in (".smi", ".txt"):
        # For SMILES files, export only the smiles column without header
        export_query = f"""
            COPY (
                SELECT smiles
                FROM '{glob_pattern}'
                WHERE {quoted_cluster_column} = {cluster_id}
            ) TO '{output_path}' (FORMAT CSV, HEADER FALSE)
        """
    else:
        raise ValueError(
            f"Unsupported output format: {suffix}. "
            "Use .parquet, .csv, .smi, or .txt"
        )

    duckdb.query(export_query)
    logger.info(f"Exported {count} molecules to {output_path}")

    return count

export_all_clusters(results_dir, output_dir, format='parquet')

Export all clusters to individual files using partitioned writes.

This function exports all clusters to separate files in a single streaming pass through the data, using DuckDB's PARTITION_BY functionality for memory efficiency.

Output structure (Hive-style partitioning): output_dir/ ├── cluster_id=0/data_0.parquet ├── cluster_id=1/data_0.parquet ├── cluster_id=2/data_0.parquet ... └── cluster_id=99999/data_0.parquet

This structure is compatible with: - DuckDB queries: SELECT * FROM 'clusters/*/*.parquet' WHERE cluster_id = 42 - Spark/pandas: Reads partition column automatically - SLURM arrays: Process cluster_id=$SLURM_ARRAY_TASK_ID

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
output_dir Union[str, Path]

Output directory for partitioned cluster files.

required
format str

Output format, either 'parquet' or 'csv'. Default 'parquet'.

'parquet'

Returns:

Type Description
dict[int, int]

Dictionary mapping cluster_id to molecule count.

Raises:

Type Description
ImportError

If duckdb is not installed.

ValueError

If format is not supported.

Example

stats = export_all_clusters('results/', 'clusters/') print(f"Exported {len(stats)} clusters") print(f"Largest cluster: {max(stats.values())} molecules")

Note

For HPC/SLURM usage, you can process clusters in parallel:

# SLURM array job
#SBATCH --array=0-99999

chelombus visualize \
    --input clusters/cluster_id=$SLURM_ARRAY_TASK_ID/ \
    --output tmaps/cluster_$SLURM_ARRAY_TASK_ID.html
Source code in chelombus/utils/cluster_io.py
def export_all_clusters(
    results_dir: Union[str, Path],
    output_dir: Union[str, Path],
    format: str = "parquet",
) -> dict[int, int]:
    """Export all clusters to individual files using partitioned writes.

    This function exports all clusters to separate files in a single streaming
    pass through the data, using DuckDB's PARTITION_BY functionality for
    memory efficiency.

    Output structure (Hive-style partitioning):
        output_dir/
        ├── cluster_id=0/data_0.parquet
        ├── cluster_id=1/data_0.parquet
        ├── cluster_id=2/data_0.parquet
        ...
        └── cluster_id=99999/data_0.parquet

    This structure is compatible with:
    - DuckDB queries: SELECT * FROM 'clusters/*/*.parquet' WHERE cluster_id = 42
    - Spark/pandas: Reads partition column automatically
    - SLURM arrays: Process cluster_id=$SLURM_ARRAY_TASK_ID

    Args:
        results_dir: Path to directory containing chunked parquet files.
        output_dir: Output directory for partitioned cluster files.
        format: Output format, either 'parquet' or 'csv'. Default 'parquet'.

    Returns:
        Dictionary mapping cluster_id to molecule count.

    Raises:
        ImportError: If duckdb is not installed.
        ValueError: If format is not supported.

    Example:
        >>> stats = export_all_clusters('results/', 'clusters/')
        >>> print(f"Exported {len(stats)} clusters")
        >>> print(f"Largest cluster: {max(stats.values())} molecules")

    Note:
        For HPC/SLURM usage, you can process clusters in parallel:

        ```bash
        # SLURM array job
        #SBATCH --array=0-99999

        chelombus visualize \\
            --input clusters/cluster_id=$SLURM_ARRAY_TASK_ID/ \\
            --output tmaps/cluster_$SLURM_ARRAY_TASK_ID.html
        ```
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    output_path = Path(output_dir)
    glob_pattern = str(path / "*.parquet")

    # Validate before creating any output directories.
    if format not in ("parquet", "csv"):
        raise ValueError(f"Unsupported format: {format}. Use 'parquet' or 'csv'")

    # Create output directory
    output_path.mkdir(parents=True, exist_ok=True)

    logger.info(f"Exporting all clusters from {results_dir} to {output_dir}")

    # Use DuckDB's partitioned write - single pass through all data
    # This is memory-efficient as DuckDB streams the data
    format_str = "PARQUET" if format == "parquet" else "CSV"

    export_query = f"""
        COPY (
            SELECT smiles, cluster_id
            FROM '{glob_pattern}'
        ) TO '{output_path}' (
            FORMAT {format_str},
            PARTITION_BY (cluster_id),
            OVERWRITE_OR_IGNORE
        )
    """

    logger.info("Starting partitioned export (streaming, memory-efficient)...")
    duckdb.query(export_query)

    # Get cluster statistics after export
    logger.info("Calculating cluster statistics...")
    stats = get_cluster_stats(results_dir)

    # Convert to dictionary
    result = dict(zip(stats["cluster_id"], stats["molecule_count"]))

    logger.info(
        f"Exported {len(result)} clusters, "
        f"total {sum(result.values()):,} molecules"
    )

    return result

get_cluster_stats(results_dir, column='cluster_id')

Get statistics for all clusters.

Returns a DataFrame with cluster IDs and molecule counts, sorted by cluster_id.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
column str

Name of the column containing cluster IDs. Defaults to 'cluster_id'.

'cluster_id'

Returns:

Type Description
'pd.DataFrame'

DataFrame with columns ['cluster_id', 'molecule_count'].

Example

stats = get_cluster_stats('results/') print(f"Number of clusters: {len(stats)}") print(f"Average cluster size: {stats['molecule_count'].mean():.1f}") print(f"Largest cluster: {stats['molecule_count'].max()}")

Source code in chelombus/utils/cluster_io.py
def get_cluster_stats(
    results_dir: Union[str, Path],
    column: str = 'cluster_id',
) -> "pd.DataFrame":
    """Compute per-cluster molecule counts.

    Aggregates across every chunked parquet file in results_dir and returns
    one row per cluster, ordered by cluster ID.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        column: Name of the column containing cluster IDs. Defaults to
            'cluster_id'.

    Returns:
        DataFrame with columns ['cluster_id', 'molecule_count'].

    Example:
        >>> stats = get_cluster_stats('results/')
        >>> print(f"Number of clusters: {len(stats)}")
        >>> print(f"Average cluster size: {stats['molecule_count'].mean():.1f}")
        >>> print(f"Largest cluster: {stats['molecule_count'].max()}")
    """
    _check_duckdb_available()
    import duckdb

    parquet_glob = str(_validate_results_dir(results_dir) / "*.parquet")
    cluster_col = _quote_identifier(column)

    # GROUP BY collapses the chunks into one count per cluster; ORDER BY
    # yields a stable, sorted result.
    sql = (
        f"SELECT {cluster_col} as cluster_id, "
        f"COUNT(*) as molecule_count "
        f"FROM '{parquet_glob}' "
        f"GROUP BY {cluster_col} "
        f"ORDER BY {cluster_col}"
    )
    return duckdb.query(sql).df()

get_cluster_ids(results_dir, cluster_id_column_name='cluster_id')

Get list of all cluster IDs in the results.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required

Returns:

Type Description
List[int]

Sorted list of cluster IDs.

Example

cluster_ids = get_cluster_ids('results/') print(f"Clusters range from {min(cluster_ids)} to {max(cluster_ids)}")

Source code in chelombus/utils/cluster_io.py
def get_cluster_ids(
    results_dir: Union[str, Path],
    cluster_id_column_name: str = 'cluster_id',
) -> List[int]:
    """Get list of all cluster IDs in the results.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_id_column_name: Name of the column containing cluster IDs.
            Defaults to 'cluster_id'.

    Returns:
        Sorted list of cluster IDs.

    Raises:
        ImportError: If duckdb is not installed.

    Example:
        >>> cluster_ids = get_cluster_ids('results/')
        >>> print(f"Clusters range from {min(cluster_ids)} to {max(cluster_ids)}")
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")

    # DISTINCT + ORDER BY makes the database deduplicate and sort the IDs.
    query = f"""
        SELECT DISTINCT {_quote_identifier(cluster_id_column_name)} 
        FROM '{glob_pattern}'
        ORDER BY {_quote_identifier(cluster_id_column_name)}
    """

    # fetchall() returns one-element row tuples; unwrap them into a flat list.
    result = duckdb.query(query).fetchall()
    return [row[0] for row in result]

get_total_molecules(results_dir)

Get total number of molecules across all clusters.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required

Returns:

Type Description
int

Total molecule count.

Source code in chelombus/utils/cluster_io.py
def get_total_molecules(
    results_dir: Union[str, Path],
) -> int:
    """Count molecules across every chunked parquet file.

    Args:
        results_dir: Path to directory containing chunked parquet files.

    Returns:
        Total molecule count.
    """
    _check_duckdb_available()
    import duckdb

    parquet_glob = str(_validate_results_dir(results_dir) / "*.parquet")
    row = duckdb.query(f"SELECT COUNT(*) FROM '{parquet_glob}'").fetchone()
    return row[0]  # type: ignore

query_clusters_batch(results_dir, cluster_ids, columns=None, cluster_column='cluster_id')

Query molecules from multiple clusters at once.

More efficient than calling query_cluster() multiple times when you need data from several clusters.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
cluster_ids List[int]

List of cluster IDs to query.

required
columns Optional[List[str]]

Optional list of columns to return.

None

Returns:

Type Description
'pd.DataFrame'

DataFrame containing molecules from all specified clusters.

Example

df = query_clusters_batch('results/', [1, 2, 3, 4, 5]) cluster_counts = df.groupby('cluster_id').size()

Source code in chelombus/utils/cluster_io.py
def query_clusters_batch(
    results_dir: Union[str, Path],
    cluster_ids: List[int],
    columns: Optional[List[str]] = None,
    cluster_column: str = 'cluster_id',
) -> "pd.DataFrame":
    """Query molecules from multiple clusters at once.

    More efficient than calling query_cluster() multiple times when you need
    data from several clusters.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_ids: List of cluster IDs to query. Must be non-empty.
        columns: Optional list of columns to return.
        cluster_column: Name of the column containing cluster IDs.

    Returns:
        DataFrame containing molecules from all specified clusters.

    Raises:
        ImportError: If duckdb is not installed.
        ValueError: If cluster_ids is empty, or an ID is not interpretable
            as an integer.

    Example:
        >>> df = query_clusters_batch('results/', [1, 2, 3, 4, 5])
        >>> cluster_counts = df.groupby('cluster_id').size()
    """
    _check_duckdb_available()
    import duckdb
    quoted_cluster_column = _quote_identifier(cluster_column)

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")

    # An empty "IN ()" list is a DuckDB syntax error; fail with a clear message.
    if not cluster_ids:
        raise ValueError("cluster_ids must contain at least one cluster ID")

    # Quote column identifiers (consistent with query_cluster) and coerce IDs
    # to int so the interpolated values cannot carry arbitrary SQL.
    if columns:
        col_str = ", ".join(_quote_identifier(col) for col in columns)
    else:
        col_str = "*"
    ids_str = ", ".join(str(int(cid)) for cid in cluster_ids)

    query = f"""
        SELECT {col_str}
        FROM '{glob_pattern}'
        WHERE {quoted_cluster_column} IN ({ids_str})
    """

    return duckdb.query(query).df()

sample_from_cluster(results_dir, cluster_id, n=100, random_state=None)

Get a random sample of molecules from a cluster.

Useful for previewing cluster contents without loading all molecules.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
cluster_id int

The cluster ID to sample from.

required
n int

Number of molecules to sample.

100
random_state Optional[int]

Random seed for reproducibility.

None

Returns:

Type Description
'pd.DataFrame'

DataFrame with sampled molecules.

Example

sample = sample_from_cluster('results/', cluster_id=42, n=10) print(sample['smiles'].tolist())

Source code in chelombus/utils/cluster_io.py
def sample_from_cluster(
    results_dir: Union[str, Path],
    cluster_id: int,
    n: int = 100,
    random_state: Optional[int] = None,
    cluster_column: str = 'cluster_id',
) -> "pd.DataFrame":
    """Get a random sample of molecules from a cluster.

    Useful for previewing cluster contents without loading all molecules.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_id: The cluster ID to sample from.
        n: Number of molecules to sample.
        random_state: Random seed for reproducibility.
        cluster_column: Name of the column containing cluster IDs. Defaults
            to 'cluster_id' (matches the other query functions in this module).

    Returns:
        DataFrame with sampled molecules.

    Raises:
        ImportError: If duckdb is not installed.

    Example:
        >>> sample = sample_from_cluster('results/', cluster_id=42, n=10)
        >>> print(sample['smiles'].tolist())
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")
    quoted_cluster_column = _quote_identifier(cluster_column)

    # Coerce to plain ints so interpolated values cannot carry arbitrary SQL.
    cluster_id = int(cluster_id)
    n = int(n)

    # Use setseed() for reproducibility when random_state is provided.
    # NOTE(review): setseed expects a value in [-1, 1]; this assumes a typical
    # small seed with |random_state| < 2**31 — confirm against callers.
    if random_state is not None:
        duckdb.query(f"SELECT setseed({random_state / 2**31})")

    # Use ORDER BY RANDOM() for sampling, which is more portable
    query = f"""
        SELECT *
        FROM '{glob_pattern}'
        WHERE {quoted_cluster_column} = {cluster_id}
        ORDER BY RANDOM()
        LIMIT {n}
    """

    return duckdb.query(query).df()

Helper Functions

chelombus.utils.helper_functions

format_time(seconds)

Simple helper to transform seconds into a readable format.

Parameters:

Name Type Description Default
seconds int

Seconds

required

Returns:

Name Type Description
str

(int) h, (int) min, (float) seconds

Source code in chelombus/utils/helper_functions.py
def format_time(seconds):
    """Convert a duration in seconds into a human-readable string.

    Args:
        seconds (int | float): Duration in seconds.

    Returns:
        str: The duration formatted as "<h> h; <m>min; <s.ss> s".
    """
    hours, rem = divmod(seconds, 3600)
    minutes, seconds = divmod(rem, 60)
    return f"{int(hours)} h; {int(minutes)}min; {seconds:.2f} s"

save_chunk(fp_chunk, output_dir, chunk_index, file_format='npy', name='fingerprints_chunk', **kwargs)

Save a chunk of fingerprint data to a file in the specified format.

Parameters:

Name Type Description Default
fp_chunk ndarray

The fingerprint array chunk (each row corresponds to a fingerprint).

required
output_dir str

Directory path where the fingerprint chunk will be saved.

required
chunk_index int

The index number for the current chunk. This is used to generate a unique file name.

required
file_format str

Format to save the data. Options are: - 'npy': Save as a NumPy binary file. - 'parquet': Save as an Apache Parquet file. Default is 'npy'.

'npy'
name str

Name prefix for each the file to be generated.

'fingerprints_chunk'
**kwargs

Additional keyword arguments to pass to the saving function. For 'npy', kwargs are passed to np.save. For 'parquet', kwargs are passed to DataFrame.to_parquet.

{}

Returns:

Name Type Description
str str

The full path of the saved file.

Source code in chelombus/utils/helper_functions.py
def save_chunk(fp_chunk: np.ndarray | pd.DataFrame, output_dir: str, chunk_index: int,
                  file_format: str = 'npy', name:str="fingerprints_chunk", **kwargs) -> str:
    """
    Save a chunk of fingerprint data to a file in the specified format.

    Parameters:
        fp_chunk (np.ndarray): The fingerprint array chunk (each row corresponds to a fingerprint).
        output_dir (str): Directory path where the fingerprint chunk will be saved.
        chunk_index (int): The index number for the current chunk. This is used to generate a unique file name.
        file_format (str): Format to save the data. Options are:
                           - 'npy': Save as a NumPy binary file.
                           - 'parquet': Save as an Apache Parquet file.
                           Default is 'npy'.
        name (str): Name prefix for each the file to be generated. 
        **kwargs: Additional keyword arguments to pass to the saving function.
                  For 'npy', kwargs are passed to `np.save`.
                  For 'parquet', kwargs are passed to `DataFrame.to_parquet`.

    Returns:
        str: The full path of the saved file.
    """
    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    if file_format.lower() == 'npy':
        filename = os.path.join(output_dir, f"{name}_{chunk_index:05d}.npy")
        np.save(filename, fp_chunk, **kwargs)
        del fp_chunk
    elif file_format.lower() == 'parquet':
        filename = os.path.join(output_dir, f"{name}_{chunk_index:05d}.parquet")
        # Each row is a fingerprint and each column is a bit.
        df = pd.DataFrame(fp_chunk)
        del fp_chunk
        df.to_parquet(filename, **kwargs)
        del df 
    else:
        raise ValueError("Unsupported file format. Please choose 'npy' or 'parquet'.")
    return filename