Skip to content

API Reference

Core Classes

DataStreamer

chelombus.streamer.data_streamer.DataStreamer

Class for streaming large datasets in manageable chunks.

Source code in chelombus/streamer/data_streamer.py
class DataStreamer:
    """
    Stream large datasets of SMILES strings in manageable chunks.
    """
    def parse_input(self, input_path: str, chunksize: Optional[int] = None, verbose: int = 0, smiles_col: int = 0) -> Iterator[List[str]]:
        """
        Read input data from a file or a directory of files and yield it in chunks.

        Every file produced by the input path (a single file or a directory of
        files) is processed in turn. Plain-text files are split per line and the
        column at `smiles_col` is taken as the SMILES string; SDF/SD files are
        streamed through RDKit and converted to SMILES on the fly. Extracted
        SMILES accumulate in a buffer that is yielded (and emptied) each time it
        reaches `chunksize`. Whatever remains after a file is exhausted -- the
        whole file when `chunksize` is None -- is yielded as a final chunk.

        Args:
            input_path (str): The path to a file or directory containing input files.
            chunksize (Optional[int]): Number of records per yielded chunk. If None,
                each file's full content is yielded as a single chunk.
            verbose (int): Level of verbosity. Default is 0.
            smiles_col (int): Column index holding the SMILES string in text inputs.
                Ignored for SDF inputs, where SMILES are generated from molecule blocks.

        Yields:
            List[str]: A list of SMILES strings whose length equals `chunksize`,
            except possibly for the final chunk of each file.
        """
        # Fail fast on a missing path before any file iteration starts.
        if not os.path.exists(input_path):
            raise FileNotFoundError(f"Data source {input_path} not found.")

        limit = None if chunksize is None else int(chunksize)
        pending: List[str] = []

        for file_path in _process_input(input_paths=input_path):
            if verbose == 1:
                print("Processing file:", file_path)

            extension = os.path.splitext(file_path)[1].lower()
            if extension in (".sdf", ".sd"):
                records = self._stream_sdf(file_path)
            else:
                records = self._stream_text(file_path, smiles_col)

            for smiles in records:
                pending.append(smiles)
                if limit is not None and len(pending) == limit:
                    yield list(pending)
                    pending.clear()

            # Emit whatever is left over for this file.
            if pending:
                yield list(pending)
                pending.clear()

    def _stream_text(self, file_path: str, smiles_col: int) -> Iterator[str]:
        # Yield the `smiles_col`-th whitespace-separated field of every line;
        # malformed lines are reported and skipped instead of aborting the stream.
        with open(file_path, "r") as handle:
            for line in handle:
                try:
                    yield line.split()[smiles_col]
                except Exception as e:
                    print(f"\nAn exception occured with line: {line}. Raised Error: {e}\n")

    def _stream_sdf(self, file_path: str) -> Iterator[str]:
        # RDKit is imported lazily so text-only users do not need it installed.
        try:
            from rdkit import Chem
        except ImportError as exc:
            raise ImportError(
                "RDKit is required to read SDF files. Please install rdkit to stream SDF inputs."
            ) from exc

        # ForwardSDMolSupplier streams molecules without loading the whole file in memory.
        with open(file_path, "rb") as handle:
            for mol in Chem.ForwardSDMolSupplier(handle):
                if mol is None:
                    # RDKit yields None for records it could not parse.
                    continue
                try:
                    yield Chem.MolToSmiles(mol)
                except Exception as e:
                    print(f"\nFailed to convert molecule in {file_path} to SMILES: {e}\n")

parse_input(input_path, chunksize=None, verbose=0, smiles_col=0)

Reads input data from a file or a directory of files and yields the data in chunks.

This method processes each file provided by the input path, which can be either a single file or a directory containing multiple files. Lines in plain-text files are split and the column at smiles_col is used as the SMILES string. SDF/SD files are handled in a streaming fashion via RDKit and converted to SMILES on the fly. For each file, the extracted SMILES strings are placed in a buffer until it reaches chunksize, at which point the buffer is yielded and cleared. If chunksize is None or there are remaining lines after processing a file, those are yielded as a final chunk.

Parameters:

Name Type Description Default
input_path str

The path to a file or directory containing input files.

required
chunksize Optional[int]

The number of lines to include in each yielded chunk. If None, the entire file content is yielded as a single chunk.

None
verbose int

Level of verbosity. Default is 0.

0
smiles_col int

Column index for text inputs in which SMILES appear among several columns. Ignored for SDF inputs where SMILES strings are generated from molecule blocks.

0

Yields:

Type Description
List[str]

List[str]: A list of lines from the input file(s), where the list length will be equal to

List[str]

chunksize except possibly for the final chunk.

Source code in chelombus/streamer/data_streamer.py
def parse_input(self, input_path: str, chunksize: Optional[int] = None, verbose: int = 0, smiles_col: int = 0) -> Iterator[List[str]]:
    """
    Read input data from a file or a directory of files and yield it in chunks.

    Every file produced by the input path (a single file or a directory of
    files) is processed in turn. Plain-text files are split per line and the
    column at `smiles_col` is taken as the SMILES string; SDF/SD files are
    streamed through RDKit and converted to SMILES on the fly. Extracted
    SMILES accumulate in a buffer that is yielded (and emptied) each time it
    reaches `chunksize`. Whatever remains after a file is exhausted -- the
    whole file when `chunksize` is None -- is yielded as a final chunk.

    Args:
        input_path (str): The path to a file or directory containing input files.
        chunksize (Optional[int]): Number of records per yielded chunk. If None,
            each file's full content is yielded as a single chunk.
        verbose (int): Level of verbosity. Default is 0.
        smiles_col (int): Column index holding the SMILES string in text inputs.
            Ignored for SDF inputs, where SMILES are generated from molecule blocks.

    Yields:
        List[str]: A list of SMILES strings whose length equals `chunksize`,
        except possibly for the final chunk of each file.
    """
    # Fail fast on a missing path before any file iteration starts.
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Data source {input_path} not found.")

    limit = None if chunksize is None else int(chunksize)
    pending: List[str] = []

    for file_path in _process_input(input_paths=input_path):
        if verbose == 1:
            print("Processing file:", file_path)

        extension = os.path.splitext(file_path)[1].lower()
        if extension in (".sdf", ".sd"):
            records = self._stream_sdf(file_path)
        else:
            records = self._stream_text(file_path, smiles_col)

        for smiles in records:
            pending.append(smiles)
            if limit is not None and len(pending) == limit:
                yield list(pending)
                pending.clear()

        # Emit whatever is left over for this file.
        if pending:
            yield list(pending)
            pending.clear()

FingerprintCalculator

chelombus.utils.fingerprints.FingerprintCalculator

A class to compute molecular fingerprints from a list of SMILES strings. Currently, only the 'morgan' and 'mqn' fingerprint types are supported.

Source code in chelombus/utils/fingerprints.py
class FingerprintCalculator:
    """
    A class to compute molecular fingerprints from a list of SMILES strings.
    Currently, only the 'morgan' and 'mqn' fingerprints type are supported.
    """
    def __init__(self):

        # Map fingerprint types to functions
        self.fingerprint_function_map = {
            #TODO: Support for other fingerprints
            'morgan': _calculate_morgan_fp,
            'mqn': _calculate_mqn_fp,
        }

    def FingerprintFromSmiles(self, smiles:List | str, fp:str, nprocesses:int | None = os.cpu_count(), **params) -> npt.NDArray:
        """
        Generate fingerprints for a list of SMILES strings in parallel.

        The method selects the appropriate fingerprint function based on the 'fp' parameter,
        binds additional keyword parameters using functools.partial, and then applies the function 
        across the SMILES list using multiprocessing.Pool.map.

        Args:
            smiles_list (list): A list of SMILES strings.
            fp (str): The fingerprint type to compute (e.g., 'morgan').
            nprocesses (int): Number of processes for multithreaded fingerprint calculation.  Default to cpu cores 
            **params: Additional keyword parameters for the fingerprint function.
                For 'morgan', required keys are:
                    - fpSize (int): Number of bits in the fingerprint.
                    - radius (int): Radius for the Morgan fingerprint.

        Returns:
            npt.NDArray: A NumPy array of fingerprints with shape (number of SMILES, fpSize).

        Raises:
            ValueError: If an unsupported fingerprint type is requested.
        """
        func = self.fingerprint_function_map.get(fp)
        if func is None:
            raise ValueError(f"Unsupported fingerprint type: '{fp}'")

        # Bind the additional parameters to the selected function.
        part_func = partial(func, **params)

        if isinstance(smiles, str):
            smiles = [smiles]
        try: 
            with Pool(processes=nprocesses) as pool:
                fingerprints = pool.map(part_func, smiles)
        finally: 
            pool.close()
            pool.join()

        fingerprints = [fp for fp in fingerprints if fp is not None]

        # Free memory
        del smiles
        return np.array(fingerprints)

FingerprintFromSmiles(smiles, fp, nprocesses=os.cpu_count(), **params)

Generate fingerprints for a list of SMILES strings in parallel.

The method selects the appropriate fingerprint function based on the 'fp' parameter, binds additional keyword parameters using functools.partial, and then applies the function across the SMILES list using multiprocessing.Pool.map.

Parameters:

Name Type Description Default
smiles_list list

A list of SMILES strings.

required
fp str

The fingerprint type to compute (e.g., 'morgan').

required
nprocesses int

Number of processes for parallel (multiprocess) fingerprint calculation. Defaults to the number of CPU cores

cpu_count()
**params

Additional keyword parameters for the fingerprint function. For 'morgan', required keys are: - fpSize (int): Number of bits in the fingerprint. - radius (int): Radius for the Morgan fingerprint.

{}

Returns:

Type Description
NDArray

npt.NDArray: A NumPy array of fingerprints with shape (number of SMILES, fpSize).

Raises:

Type Description
ValueError

If an unsupported fingerprint type is requested.

Source code in chelombus/utils/fingerprints.py
def FingerprintFromSmiles(self, smiles:List | str, fp:str, nprocesses:int | None = os.cpu_count(), **params) -> npt.NDArray:
    """
    Generate fingerprints for a list of SMILES strings in parallel.

    The method selects the appropriate fingerprint function based on the 'fp' parameter,
    binds additional keyword parameters using functools.partial, and then applies the function 
    across the SMILES list using multiprocessing.Pool.map.

    Args:
        smiles_list (list): A list of SMILES strings.
        fp (str): The fingerprint type to compute (e.g., 'morgan').
        nprocesses (int): Number of processes for multithreaded fingerprint calculation.  Default to cpu cores 
        **params: Additional keyword parameters for the fingerprint function.
            For 'morgan', required keys are:
                - fpSize (int): Number of bits in the fingerprint.
                - radius (int): Radius for the Morgan fingerprint.

    Returns:
        npt.NDArray: A NumPy array of fingerprints with shape (number of SMILES, fpSize).

    Raises:
        ValueError: If an unsupported fingerprint type is requested.
    """
    func = self.fingerprint_function_map.get(fp)
    if func is None:
        raise ValueError(f"Unsupported fingerprint type: '{fp}'")

    # Bind the additional parameters to the selected function.
    part_func = partial(func, **params)

    if isinstance(smiles, str):
        smiles = [smiles]
    try: 
        with Pool(processes=nprocesses) as pool:
            fingerprints = pool.map(part_func, smiles)
    finally: 
        pool.close()
        pool.join()

    fingerprints = [fp for fp in fingerprints if fp is not None]

    # Free memory
    del smiles
    return np.array(fingerprints)

PQEncoder

chelombus.encoder.encoder.PQEncoder

Bases: PQEncoderBase

Class to encode high-dimensional vectors into PQ-codes.

Source code in chelombus/encoder/encoder.py
class PQEncoder(PQEncoderBase):
    """
    Class to encode high-dimensional vectors into PQ-codes
    (Product Quantization; reference DOI: 10.1109/TPAMI.2010.57).
    """
    def __init__(self, k:int=256, m:int=8, iterations=20):
        """ Initializes the encoder with trained sub-block centroids.

        Args:
            k (int): Number of centroids. Default is 256. We assume that all subquantizers 
            have the same finite number (k') of reproduction values. So k = (k')^m 
            m (int): Number of distinct subvectors of the input X vector. 
            Each subvector has dimension D' = D/m where D is the 
            dimension of the input vector X. D must therefore be a multiple of m.
            iterations (int): Number of iterations for the k-means

        High values of k increase the computational cost of the quantizer as well as memory 
        usage of storing the centroids (k' x D floating points). Using k=256 and m=8 is often
        a reasonable choice. 

        Reference: DOI: 10.1109/TPAMI.2010.57
        """

        self.m = m 
        self.k = k 
        self.iterations = iterations
        self.encoder_is_trained = False
        # Smallest unsigned integer dtype able to hold a centroid index in [0, k).
        self.codebook_dtype =  np.uint8 if self.k <= 2**8 else (np.uint16 if self.k<= 2**16 else np.uint32)
        self.pq_trained = []

        """ The codebook is defined as the Cartesian product of all centroids. 
        Storing the codebook C explicitly is not efficient, the full codebook would be (k')^m  centroids
        Instead we store the K' centroids of all the subquantizers (k' · m). We can later simulate the full 
        codebook by combining the centroids from each subquantizer. 
        """
    @property
    def is_trained(self) -> bool: return self.encoder_is_trained

    def fit(self, X_train:NDArray, verbose:int=1, **kwargs)->None:
        """ KMeans fitting of every subvector matrix from the X_train matrix. Populates 
        the codebook by storing the cluster centers of every subvector

        X_train is the input matrix. For a vector that has dimension D  
        then X_train is a matrix of size (N, D)
        where N is the number of rows (vectors) and D the number of columns (dimension of
        every vector i.e. fingerprint in the case of molecular data)

        Args:
           X_train(np.array): Input matrix to train the encoder.  
           verbose(int): Level of verbosity. Default is 1
           **kwargs: Optional keyword arguments forwarded to the underlying KMeans
               constructor (not to its `fit()` call).
        """

        assert X_train.ndim == 2, "The input can only be a matrix (X.ndim == 2)"
        N, D = X_train.shape # N number of input vectors, D dimension of the vectors
        assert self.k < N, "the number of training vectors (N for N,D = X_train.shape) should be more than the number of centroids (K)"
        assert D % self.m == 0, f"Vector (fingeprint) dimension should be divisible by the number of subvectors (m). Got {D} / {self.m}" 
        self.D_subvector = int(D / self.m) # Dimension of the subvector. 
        self.og_D = D # We save the original dimensions of the input vector (fingerprint) for later use
        assert self.encoder_is_trained == False, "Encoder can only be fitted once"

        # One (k, D') centroid table per subquantizer.
        self.codewords= np.zeros((self.m, self.k, self.D_subvector), dtype=float)

        iterable = range(self.m)
        if verbose > 0: 
            iterable = tqdm(iterable, desc='Training PQ-encoder', total=self.m)    

        # Divide the input vector into m subvectors 
        subvector_dim = int(D / self.m) 

        for subvector_idx in iterable:
            X_train_subvector = X_train[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)] 
            # For every subvector, run KMeans and store the centroids in the codebook 
            kmeans = KMeans(n_clusters=self.k, init='k-means++', max_iter=self.iterations, **kwargs).fit(X_train_subvector)
            self.pq_trained.append(kmeans)

            # Results for training 1M 1024 dimensional fingerprints were: 5 min 58 s, with k=256, m=4, iterations=200, this is much faster than using scipy
            # Store the cluster_centers coordinates in the codebook
            self.codewords[subvector_idx] = kmeans.cluster_centers_ # Store the cluster_centers coordinates in the codebook

        self.encoder_is_trained = True
        # NOTE: only drops the local reference; the caller's array is not freed by this.
        del X_train # remove initial training data from memory


    def transform(self, X:NDArray, verbose:int=1, **kwargs) -> NDArray:
        """
        Transforms the input matrix X into its PQ-codes.

        For each sample in X, the input vector is split into `m` equal-sized subvectors. 
        Each subvector is assigned to the nearest cluster centroid
        and the index of the closest centroid is stored. 

        The result is a compact representation of X, where each sample is encoded as a sequence of centroid indices.

        Args:
            X (np.ndarray): Input data matrix of shape (n_samples, n_features), 
                            where n_features must be divisible by the number of subvectors `m`.
            verbose(int): Level of verbosity. Default is 1
            **kwargs: Optional keyword arguments passed to the underlying KMeans `predict()` function.

        Returns:
            np.ndarray: PQ codes of shape (n_samples, m), where each element is the index of the nearest centroid 
                        for the corresponding subvector.
        """

        assert self.encoder_is_trained == True, "PQEncoder must be trained before calling transform"

        N, D = X.shape
        # Store the index of the Nearest centroid for each subvector
        pq_codes = np.zeros((N, self.m), dtype=self.codebook_dtype)

        iterable = range(self.m)
        # If our original vector is 1024 and our m (splits) is 8 then each subvector will be of dim= 1024/8 = 128
        if verbose > 0: 
            iterable = tqdm(iterable, desc='Generating PQ-codes', total=self.m)

        subvector_dim = int(D / self.m)

        for subvector_idx in iterable:
            X_train_subvector = X[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)] 
            # For every subvector, run KMeans.predict(). Then look in the codebook for the index of the cluster that is closest
            # Appends the centroid index to the pq_code.  
            pq_codes[:, subvector_idx] =  self.pq_trained[subvector_idx].predict(X_train_subvector, **kwargs)

        # Free memory (only the local reference to X is dropped)
        del  X

        # Return pq_codes (labels of the centroids for every subvector from the X_test data)
        return pq_codes

    def fit_transform(self, X:NDArray, verbose:int=1, **kwargs) -> NDArray:
        """Fit and transforms the input matrix `X` into its PQ-codes

        The encoder is trained on the matrix and then for each sample in X,
          the input vector is split into `m` equal-sized subvectors composed 
          by the index of the closest centroid. Returns a compact representation of X, 
          where each sample is encoded as a sequence of centroid indices (i.e PQcodes)

        Args:
            X (np.array): Input data matrix of shape (n_samples, n_features)
            verbose (int, optional): Level of verbosity. Defaults to 1.
            **kwargs: Optional keyword arguments. These are forwarded to `fit()`
            (and hence to the KMeans constructor); `transform()` is called
            without extra keyword arguments.

        Returns:
            np.array: PQ codes of shape (n_samples, m), where each element is the index of the nearest
            centroid for the corresponding subvector
        """

        self.fit(X, verbose, **kwargs)
        return self.transform(X, verbose)


    def inverse_transform(self, 
                          pq_codes:NDArray, 
                          binary=False, 
                          round=True):
        """ Inverse transform. From PQ-code to the original vector. 
        This process is lossy so we don't expect to get the exact same data.
        If binary=True then the vectors will be returned in binary. 
        This is useful for the case where our original vectors were binary. 
        With binary=True then the returned vectors are transformed from 
        [0.32134, 0.8232, 0.0132, ... 0.1432, 1.19234] to 
        [0, 1, 0, ..., 0, 1]
        If round=True then the vector will be approximated to integer values
        (useful in cases where we expect to have a count-based fingerprint)

        Args:
            pq_codes: (np.array): Input data of PQ codes to be transformed into the
            original vectors.
            binary: (bool): Whether to return the vectors rounded to 0s and 1s. Default is False
            round: (bool): Round inversed vector to integer values (truncation via astype). Default is True  
        """

        # Get shape of the input matrix of PQ codes
        N, D = pq_codes.shape

        # The dimension of the PQ vectors should be the same 
        # as the number of splits (subvectors) from the original data 

        assert D == (self.m), f"The dimension D of the PQ-codes (N,D) should be the same as the number of the subvectors or splits (m) . Got D = {D} for m = {self.m}"
        assert D == (self.og_D  / self.D_subvector), f"The dimension D of the PQ-codes (N,D) should be the same as the original vector dimension divided the subvector dimension"

        # Reconstruct each subvector by looking up its centroid coordinates.
        X_inversed = np.empty((N, D*self.D_subvector), dtype=float)
        for subvector_idx in range(self.m):
            X_inversed[:, subvector_idx*self.D_subvector:((subvector_idx+1)*self.D_subvector)] = self.codewords[subvector_idx][pq_codes[:, subvector_idx], :]

        if binary:
            # NOTE(review): binarization cutoff is 0.6, not 0.5 — confirm this threshold is intentional.
            reconstructed_binary = (X_inversed>= 0.6).astype('int8')
            return reconstructed_binary 

        # astype(int) truncates toward zero rather than rounding to nearest.
        if round: return X_inversed.astype(int)
        else: return X_inversed


    def save(self, path: str | Path) -> None:
        """Persist the encoder configuration and trained state to `path` via joblib."""
        path = Path(path)
        payload = {
            "version": 1,
            "init": {"k": self.k, "m": self.m, "iterations": self.iterations},
            "state": {
                "encoder_is_trained": self.encoder_is_trained,
                "og_D": getattr(self, "og_D", None),
                "D_subvector": getattr(self, "D_subvector", None),
                "codewords": getattr(self, "codewords", None),
                "pq_trained": self.pq_trained,  # optional
            },
        }
        joblib.dump(payload, path)

    @classmethod
    def load(cls, path: str | Path) -> "PQEncoder":
        """Load an encoder previously persisted with `save()`.

        Also accepts a joblib file containing a pickled PQEncoder instance
        directly, which is returned as-is.
        """
        path = Path(path)
        payload = joblib.load(path)

        if isinstance(payload, cls): return payload

        obj = cls(**payload["init"])
        state = payload["state"]
        obj.encoder_is_trained = state["encoder_is_trained"]
        obj.og_D = state["og_D"]
        obj.D_subvector = state["D_subvector"]
        obj.codewords = state["codewords"]
        obj.pq_trained = state.get("pq_trained", [])
        return obj

pq_trained = [] instance-attribute

The codebook is defined as the Cartesian product of all centroids. Storing the codebook C explicitly is not efficient, the full codebook would be (k')^m centroids Instead we store the K' centroids of all the subquantizers (k' · m). We can later simulate the full codebook by combining the centroids from each subquantizer.

__init__(k=256, m=8, iterations=20)

Initializes the encoder with trained sub-block centroids.

Parameters:

Name Type Description Default
k int

Number of centroids. Default is 256. We assume that all subquantizers

256
m int

Number of distinct subvectors of the input X vector.

8
iterations int

Number of iterations for the k-means

20

High values of k increase the computational cost of the quantizer as well as the memory usage of storing the centroids (k' x D floating points). Using k=256 and m=8 is often a reasonable choice.

Reference: DOI: 10.1109/TPAMI.2010.57

Source code in chelombus/encoder/encoder.py
def __init__(self, k:int=256, m:int=8, iterations=20):
    """Initialize the encoder configuration prior to training.

    Args:
        k (int): Number of centroids per subquantizer. Default is 256. All
            subquantizers are assumed to share the same finite number (k') of
            reproduction values, so the implicit full codebook has (k')^m entries.
        m (int): Number of distinct subvectors the input vector X is split into.
            Each subvector has dimension D' = D/m, where D is the dimension of
            X, so D must be a multiple of m.
        iterations (int): Number of iterations for the k-means training.

    High values of k increase the computational cost of the quantizer as well
    as the memory needed to store the centroids (k' x D floating points).
    Using k=256 and m=8 is often a reasonable choice.

    Reference: DOI: 10.1109/TPAMI.2010.57
    """
    self.k = k
    self.m = m
    self.iterations = iterations
    self.encoder_is_trained = False

    # Smallest unsigned integer dtype that can hold a centroid index in [0, k).
    if self.k <= 2**8:
        self.codebook_dtype = np.uint8
    elif self.k <= 2**16:
        self.codebook_dtype = np.uint16
    else:
        self.codebook_dtype = np.uint32

    # The codebook is defined as the Cartesian product of all centroids.
    # Storing it explicitly is not efficient (it would hold (k')^m centroids);
    # instead only the k' centroids of each of the m subquantizers are kept
    # here, and the full codebook can be simulated by combining centroids
    # across subquantizers.
    self.pq_trained = []

fit(X_train, verbose=1, **kwargs)

KMeans fitting of every subvector matrix from the X_train matrix. Populates the codebook by storing the cluster centers of every subvector

X_train is the input matrix. For a vector that has dimension D
then X_train is a matrix of size (N, D) where N is the number of rows (vectors) and D the number of columns (dimension of every vector i.e. fingerprint in the case of molecular data)

Parameters:

Name Type Description Default
X_train array

Input matrix to train the encoder.

required
verbose int

Level of verbosity. Default is 1

1
**kwargs

Optional keyword arguments passed to the underlying KMeans fit() function.

{}
Source code in chelombus/encoder/encoder.py
def fit(self, X_train:NDArray, verbose:int=1, **kwargs)->None:
    """Train one KMeans model per subvector slice and populate the codebook.

    X_train is the (N, D) input matrix: N vectors (rows) of dimension D
    (columns), e.g. one fingerprint per row for molecular data. The matrix
    is split column-wise into `m` equal slices; a KMeans model is fitted on
    each slice and its cluster centers become that subquantizer's codewords.

    Args:
       X_train(np.array): Input matrix to train the encoder.
       verbose(int): Level of verbosity. Default is 1.
       **kwargs: Optional keyword arguments forwarded to the underlying
           KMeans constructor.
    """
    assert X_train.ndim == 2, "The input can only be a matrix (X.ndim == 2)"
    N, D = X_train.shape
    assert self.k < N, "the number of training vectors (N for N,D = X_train.shape) should be more than the number of centroids (K)"
    assert D % self.m == 0, f"Vector (fingeprint) dimension should be divisible by the number of subvectors (m). Got {D} / {self.m}"
    self.D_subvector = int(D / self.m)  # dimension of each subvector slice
    self.og_D = D  # original input dimension, kept for inverse_transform checks
    assert self.encoder_is_trained == False, "Encoder can only be fitted once"

    # One (k, D') centroid table per subquantizer.
    self.codewords = np.zeros((self.m, self.k, self.D_subvector), dtype=float)

    width = self.D_subvector
    indices = range(self.m)
    if verbose > 0:
        indices = tqdm(indices, desc='Training PQ-encoder', total=self.m)

    for idx in indices:
        start = width * idx
        slice_ = X_train[:, start:start + width]
        # Fit KMeans on this subvector slice and record both the fitted model
        # (for later predict calls) and its centroids (the codewords).
        model = KMeans(n_clusters=self.k, init='k-means++', max_iter=self.iterations, **kwargs).fit(slice_)
        self.pq_trained.append(model)
        self.codewords[idx] = model.cluster_centers_

    self.encoder_is_trained = True
    # Drop the local reference to the training data.
    del X_train

transform(X, verbose=1, **kwargs)

Transforms the input matrix X into its PQ-codes.

For each sample in X, the input vector is split into m equal-sized subvectors. Each subvector is assigned to the nearest cluster centroid and the index of the closest centroid is stored.

The result is a compact representation of X, where each sample is encoded as a sequence of centroid indices.

Parameters:

Name Type Description Default
X ndarray

Input data matrix of shape (n_samples, n_features), where n_features must be divisible by the number of subvectors m.

required
verbose int

Level of verbosity. Default is 1

1
**kwargs

Optional keyword arguments passed to the underlying KMeans predict() function.

{}

Returns:

Type Description
NDArray

np.ndarray: PQ codes of shape (n_samples, m), where each element is the index of the nearest centroid for the corresponding subvector.

Source code in chelombus/encoder/encoder.py
def transform(self, X:NDArray, verbose:int=1, **kwargs) -> NDArray:
    """
    Transforms the input matrix X into its PQ-codes.

    For each sample in X, the input vector is split into `m` equal-sized subvectors.
    Each subvector is assigned to the nearest cluster centroid and the index of the
    closest centroid is stored, giving a compact representation of X where each
    sample is encoded as a sequence of `m` centroid indices.

    Args:
        X (np.ndarray): Input data matrix of shape (n_samples, n_features), 
                        where n_features must be divisible by the number of subvectors `m`.
        verbose(int): Level of verbosity. Default is 1
        **kwargs: Optional keyword arguments passed to the underlying KMeans `predict()` function.

    Returns:
        np.ndarray: PQ codes of shape (n_samples, m), where each element is the index of the nearest centroid 
                    for the corresponding subvector.

    Raises:
        ValueError: If the encoder has not been trained, or if n_features is
            not divisible by `m`.
    """

    # `assert` is stripped under `python -O`, so validate explicitly.
    # ValueError matches the error type PQKMeans.predict() raises for the
    # equivalent "not trained" condition.
    if not self.encoder_is_trained:
        raise ValueError("PQEncoder must be trained before calling transform")

    N, D = X.shape
    # Enforce the documented contract; previously trailing features were
    # silently ignored when D was not divisible by m.
    if D % self.m != 0:
        raise ValueError(f"n_features ({D}) must be divisible by m ({self.m})")

    # Store the index of the nearest centroid for each subvector
    pq_codes = np.zeros((N, self.m), dtype=self.codebook_dtype)

    iterable = range(self.m)
    # If our original vector is 1024 and our m (splits) is 8 then each subvector will be of dim = 1024/8 = 128
    if verbose > 0: 
        iterable = tqdm(iterable, desc='Generating PQ-codes', total=self.m)

    subvector_dim = D // self.m

    for subvector_idx in iterable:
        X_subvector = X[:, subvector_dim * subvector_idx : subvector_dim * (subvector_idx + 1)] 
        # For every subvector, run KMeans.predict() on the trained per-subvector
        # model and store the index of the closest centroid.
        pq_codes[:, subvector_idx] = self.pq_trained[subvector_idx].predict(X_subvector, **kwargs)

    # Return pq_codes (labels of the centroids for every subvector of X)
    return pq_codes

fit_transform(X, verbose=1, **kwargs)

Fit and transforms the input matrix X into its PQ-codes

The encoder is trained on the matrix and then, for each sample in X, the input vector is split into m equal-sized subvectors, each replaced by the index of its closest centroid. Returns a compact representation of X, where each sample is encoded as a sequence of centroid indices (i.e. PQ-codes)

Parameters:

Name Type Description Default
X array

Input data matrix of shape (n_samples, n_features)

required
verbose int

Level of verbosity. Defaults to 1.

1
**kwargs

Optional keyword. These arguments will be passed to the underlying KMeans

{}

Returns:

Type Description
NDArray

np.array: PQ codes of shape (n_samples, m), where each element is the index of the nearest

NDArray

centroid for the corresponding subvector

Source code in chelombus/encoder/encoder.py
def fit_transform(self, X:NDArray, verbose:int=1, **kwargs) -> NDArray:
    """Fit the encoder on `X`, then transform `X` into its PQ-codes.

    The encoder is trained on the matrix; then, for each sample in X,
    the input vector is split into `m` equal-sized subvectors and each
    subvector is replaced by the index of its closest centroid. Returns a
    compact representation of X, where each sample is encoded as a
    sequence of centroid indices (i.e. PQ-codes).

    Args:
        X (np.array): Input data matrix of shape (n_samples, n_features)
        verbose (int, optional): Level of verbosity. Defaults to 1.
        **kwargs: Optional keyword arguments forwarded to `fit()` (and from
            there to the underlying KMeans constructor). They are NOT passed
            on to `transform()` / `predict()`.

    Returns:
        np.array: PQ codes of shape (n_samples, m), where each element is the index of the nearest
        centroid for the corresponding subvector
    """

    self.fit(X, verbose, **kwargs)
    return self.transform(X, verbose)

inverse_transform(pq_codes, binary=False, round=True)

Inverse transform. From PQ-code to the original vector. This process is lossy so we don't expect to get the exact same data. If binary=True then the vectors will be returned in binary. This is useful for the case where our original vectors were binary. With binary=True then the returned vectors are transformed from [0.32134, 0.8232, 0.0132, ... 0.1432, 1.19234] to [0, 1, 0, ..., 0, 1]. If round=True then the vector will be approximated to integer values (useful in cases where we expect to have a count-based fingerprint)

Parameters:

Name Type Description Default
pq_code

(np.array): Input data of PQ codes to be transformed into the

required
binary

(bool): Whether to return the vectors rounded to 0s and 1s. Default is False

False
round

(bool): Round inversed vector to integers values. Default is True

True
Source code in chelombus/encoder/encoder.py
def inverse_transform(self, 
                      pq_codes:NDArray, 
                      binary=False, 
                      round=True):
    """ Inverse transform. From PQ-code back to an approximate original vector.

    This process is lossy so we don't expect to get the exact same data:
    each subvector is replaced by the coordinates of its centroid.

    If binary=True the reconstructed vectors are thresholded to 0s and 1s.
    This is useful for the case where our original vectors were binary, e.g.
    [0.32134, 0.8232, 0.0132, ... 0.1432, 1.19234] becomes [0, 1, 0, ..., 0, 1].
    If round=True the reconstruction is rounded to the nearest integer
    (useful in cases where we expect to have a count-based fingerprint).

    Args:
        pq_codes: (np.array): Input data of PQ codes to be transformed into the
        original vectors.
        binary: (bool): Whether to return the vectors thresholded to 0s and 1s.
        Default is False. Takes precedence over `round`.
        round: (bool): Round the reconstructed vectors to the nearest integer.
        Default is True.

    Returns:
        np.ndarray: Reconstructed matrix of shape (N, m * D_subvector).

    Raises:
        ValueError: If the PQ-code dimension does not match the encoder.
    """

    # Get shape of the input matrix of PQ codes
    N, D = pq_codes.shape

    # The dimension of the PQ vectors should be the same as the number of
    # splits (subvectors) from the original data. Raise instead of assert
    # so the check survives `python -O`.
    if D != self.m:
        raise ValueError(f"The dimension D of the PQ-codes (N,D) should be the same as the number of the subvectors or splits (m) . Got D = {D} for m = {self.m}")
    if D != self.og_D / self.D_subvector:
        raise ValueError("The dimension D of the PQ-codes (N,D) should be the same as the original vector dimension divided the subvector dimension")

    X_inversed = np.empty((N, D*self.D_subvector), dtype=float)
    for subvector_idx in range(self.m):
        # Look up the centroid coordinates for each code in this subvector.
        X_inversed[:, subvector_idx*self.D_subvector:((subvector_idx+1)*self.D_subvector)] = self.codewords[subvector_idx][pq_codes[:, subvector_idx], :]

    if binary:
        # 0.6 threshold (rather than 0.5) biases the reconstruction towards
        # zeros; value kept from the original implementation.
        reconstructed_binary = (X_inversed >= 0.6).astype('int8')
        return reconstructed_binary 

    if round:
        # np.rint rounds to the nearest integer; a plain astype(int) would
        # truncate towards zero (e.g. 0.9 -> 0), contradicting "round".
        return np.rint(X_inversed).astype(int)
    return X_inversed

PQKMeans

chelombus.clustering.PyQKmeans.PQKMeans

This class provides a scikit-learn-like interface to the PQk-means algorithm, which operates directly on PQ codes using symmetric distance.

Parameters:

Name Type Description Default
encoder PQEncoder

A trained PQEncoder instance

required
k int

Number of clusters

required
iteration int

Number of k-means iterations (default: 20)

20
verbose bool

Whether to print progress information (default: False)

False
Example

encoder = PQEncoder(k=256, m=6) encoder.fit(training_data) pq_codes = encoder.transform(data) clusterer = PQKMeans(encoder, k=100000) labels = clusterer.fit_predict(pq_codes)

Source code in chelombus/clustering/PyQKmeans.py
class PQKMeans:
    """
    This class provides a scikit-learn-like interface to the PQk-means algorithm,
    which operates directly on PQ codes using symmetric distance.

    Args:
        encoder: A trained PQEncoder instance
        k: Number of clusters
        iteration: Number of k-means iterations (default: 20)
        verbose: Whether to print progress information (default: False)

    Raises:
        ValueError: If `encoder` has not been trained.

    Example:
        >>> encoder = PQEncoder(k=256, m=6)
        >>> encoder.fit(training_data)
        >>> pq_codes = encoder.transform(data)
        >>> clusterer = PQKMeans(encoder, k=100000)
        >>> labels = clusterer.fit_predict(pq_codes)
    """

    def __init__(
        self,
        encoder: PQEncoder,
        k: int,
        iteration: int = 20,
        verbose: bool = False
    ):
        if not encoder.encoder_is_trained: # type: ignore
            raise ValueError("Encoder must be trained before clustering")

        self.encoder = encoder
        self.k = k
        # NOTE(review): `iteration` is stored but not forwarded to the
        # underlying pqkmeans clusterer below — confirm this is intended.
        self.iteration = iteration
        self.verbose = verbose
        self.trained = False
        # Lazy caches used by predict(); invalidated whenever centers change.
        self._dtables = None
        self._centers_u8 = None
        self._cluster = pqkmeans.clustering.PQKMeans(
            encoder=encoder,
            k=k,
            verbose=verbose
        )

    @property
    def cluster_centers_(self) -> np.ndarray:
        """Get cluster centers (PQ codes of shape (k, m))."""
        return np.array(self._cluster.cluster_centers_)

    @cluster_centers_.setter
    def cluster_centers_(self, centers: np.ndarray) -> None:
        """Set cluster centers and mark as trained."""
        centers_uint8 = np.array(centers).astype(np.uint8)
        self._cluster._impl.set_cluster_centers(centers_uint8.tolist())
        self.trained = True

    def fit(self, X_train: np.ndarray) -> 'PQKMeans':
        """Fit the PQk-means model to training PQ codes.

        Args:
            X_train: PQ codes of shape (n_samples, n_subvectors)

        Returns:
            self
        """
        self._cluster.fit(X_train)
        self.trained = True
        # Invalidate predict() caches built from any previous centers.
        self._dtables = None
        self._centers_u8 = None
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Predict cluster labels for PQ codes.

        Uses Numba JIT-compiled parallel assignment with precomputed
        symmetric distance lookup tables

        Args:
            X: PQ codes of shape (n_samples, n_subvectors), dtype uint8

        Returns:
            Cluster labels of shape (n_samples,)

        Raises:
            ValueError: If the model has not been fitted yet.
        """
        if not self.trained: # type: ignore
            raise ValueError("Must be trained before clustering. Use `.fit()` first")
        # Build the lookup tables lazily, once per set of cluster centers.
        if self._dtables is None:
            self._dtables = _build_distance_tables(self.encoder.codewords)
            self._centers_u8 = self.cluster_centers_.astype(np.uint8)
        return _predict_numba(np.asarray(X, dtype=np.uint8), self._centers_u8, self._dtables)

    def fit_predict(self, X: np.ndarray) -> np.ndarray:
        """Fit the model and predict cluster labels in one step.

        Args:
            X: PQ codes of shape (n_samples, n_subvectors)

        Returns:
            Cluster labels of shape (n_samples,)
        """
        # Run the clustering first so `trained` is only set on success.
        labels = np.array(self._cluster.fit_predict(X))
        self.trained = True
        # Invalidate predict() caches built from any previous centers,
        # mirroring fit(); otherwise predict() would use stale tables.
        self._dtables = None
        self._centers_u8 = None
        return labels

    @property
    def is_trained(self) -> bool:
        """Whether the model has been fitted (or centers assigned)."""
        return bool(self.trained) # type: ignore

    def save(self, path: str | Path) -> None:
        """Serialize this instance to `path` with joblib."""
        path = Path(path)
        joblib.dump(self, path)

    @classmethod
    def load(cls, path: str | Path) -> "PQKMeans":
        """Load a PQKMeans instance previously written by `save()`.

        Raises:
            TypeError: If the file does not contain a PQKMeans object.
        """
        path = Path(path)
        obj = joblib.load(path)
        if not isinstance(obj, cls):
            raise TypeError(f"Expected {cls.__name__}, got {type(obj).__name__}")
        # Backwards compatibility with pickles saved before the lazy
        # predict() caches existed: restore BOTH cache attributes.
        if not hasattr(obj, '_dtables'):
            obj._dtables = None
        if not hasattr(obj, '_centers_u8'):
            obj._centers_u8 = None
        return obj

cluster_centers_ property writable

Get cluster centers (PQ codes of shape (k, m)).

fit(X_train)

Fit the PQk-means model to training PQ codes.

Parameters:

Name Type Description Default
X_train ndarray

PQ codes of shape (n_samples, n_subvectors)

required

Returns:

Type Description
PQKMeans

self

Source code in chelombus/clustering/PyQKmeans.py
def fit(self, X_train: np.ndarray) -> 'PQKMeans':
    """Train the PQk-means model on a set of PQ codes.

    Args:
        X_train: PQ codes of shape (n_samples, n_subvectors)

    Returns:
        self
    """
    self._cluster.fit(X_train)
    # Drop caches derived from the previous cluster centers.
    self._dtables = None
    self._centers_u8 = None
    self.trained = True
    return self

predict(X)

Predict cluster labels for PQ codes.

Uses Numba JIT-compiled parallel assignment with precomputed symmetric distance lookup tables

Parameters:

Name Type Description Default
X ndarray

PQ codes of shape (n_samples, n_subvectors), dtype uint8

required

Returns:

Type Description
ndarray

Cluster labels of shape (n_samples,)

Source code in chelombus/clustering/PyQKmeans.py
def predict(self, X: np.ndarray) -> np.ndarray:
    """Assign each PQ-code row of X to its nearest cluster center.

    Uses Numba JIT-compiled parallel assignment with precomputed
    symmetric distance lookup tables

    Args:
        X: PQ codes of shape (n_samples, n_subvectors), dtype uint8

    Returns:
        Cluster labels of shape (n_samples,)

    Raises:
        ValueError: If the model has not been fitted yet.
    """
    if not self.trained: # type: ignore
        raise ValueError("Must be trained before clustering. Use `.fit()` first")
    # Build the lookup tables lazily, once per set of cluster centers.
    if self._dtables is None:
        self._dtables = _build_distance_tables(self.encoder.codewords)
        self._centers_u8 = self.cluster_centers_.astype(np.uint8)
    codes_u8 = np.asarray(X, dtype=np.uint8)
    return _predict_numba(codes_u8, self._centers_u8, self._dtables)

fit_predict(X)

Fit the model and predict cluster labels in one step.

Parameters:

Name Type Description Default
X ndarray

PQ codes of shape (n_samples, n_subvectors)

required

Returns:

Type Description
ndarray

Cluster labels of shape (n_samples,)

Source code in chelombus/clustering/PyQKmeans.py
def fit_predict(self, X: np.ndarray) -> np.ndarray:
    """Fit the model and predict cluster labels in one step.

    Args:
        X: PQ codes of shape (n_samples, n_subvectors)

    Returns:
        Cluster labels of shape (n_samples,)
    """
    # Run the clustering first so `trained` is only set on success.
    labels = np.array(self._cluster.fit_predict(X))
    self.trained = True
    # Invalidate predict() caches built from any previous centers,
    # mirroring fit(); otherwise predict() would use stale tables.
    self._dtables = None
    self._centers_u8 = None
    return labels

Cluster I/O

chelombus.utils.cluster_io

Cluster I/O utilities using DuckDB for efficient querying.

This module provides functions for querying and exporting clustered molecular data stored in chunked parquet files. It uses DuckDB for efficient SQL-based queries across multiple files without loading everything into memory.

Architecture

During pipeline.transform(), data is saved as chunked parquet files: results/chunk_00001.parquet (smiles, cluster_id) results/chunk_00002.parquet (smiles, cluster_id) ...

These functions query across all chunks using DuckDB's glob pattern support, enabling efficient filtering and export without loading all data into RAM.

Example

from chelombus.utils.cluster_io import query_cluster, export_all_clusters

Get molecules from cluster 42

df = query_cluster('results/', cluster_id=42)

Export all clusters (for HPC/SLURM processing)

stats = export_all_clusters('results/', 'clusters/')

query_cluster(results_dir, cluster_id, cluster_column='cluster_id', columns=None)

Query all molecules from a specific cluster.

Scans all chunked parquet files in results_dir and returns molecules belonging to the specified cluster as a pandas DataFrame.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files (output from pipeline.transform()).

required
cluster_id int

The cluster ID to query.

required
cluster_column str

Name of the column containing cluster IDs.

'cluster_id'
columns Optional[List[str]]

Optional list of columns to return. Default returns all columns. Common columns: ['smiles', 'cluster_id'].

None

Returns:

Type Description
'pd.DataFrame'

DataFrame containing all molecules from the specified cluster.

Raises:

Type Description
ImportError

If duckdb is not installed.

FileNotFoundError

If results_dir doesn't exist.

ValueError

If no parquet files found or cluster_id doesn't exist.

Example

df = query_cluster('results/', cluster_id=42) print(f"Cluster 42 has {len(df)} molecules") print(df['smiles'].head())

Source code in chelombus/utils/cluster_io.py
def query_cluster(
    results_dir: Union[str, Path],
    cluster_id: int,
    cluster_column: str = "cluster_id",
    columns: Optional[List[str]] = None,
) -> "pd.DataFrame":
    """Query all molecules from a specific cluster.

    Scans all chunked parquet files in results_dir and returns molecules
    belonging to the specified cluster as a pandas DataFrame.

    Args:
        results_dir: Path to directory containing chunked parquet files
            (output from pipeline.transform()).
        cluster_id: The cluster ID to query.
        cluster_column: Name of the column containing cluster IDs.
        columns: Optional list of columns to return. Default returns all columns.
            Common columns: ['smiles', 'cluster_id'].

    Returns:
        DataFrame containing all molecules from the specified cluster.

    Raises:
        ImportError: If duckdb is not installed.
        FileNotFoundError: If results_dir doesn't exist.
        ValueError: If no parquet files found or cluster_id doesn't exist.

    Example:
        >>> df = query_cluster('results/', cluster_id=42)
        >>> print(f"Cluster 42 has {len(df)} molecules")
        >>> print(df['smiles'].head())
    """
    _check_duckdb_available()
    import duckdb

    # Coerce to int so a non-int cluster_id cannot inject SQL into the
    # interpolated query below.
    cluster_id = int(cluster_id)

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")
    quoted_cluster_column = _quote_identifier(cluster_column)

    # Build column selection (identifiers quoted to survive unusual names)
    if columns:
        col_str = ", ".join(_quote_identifier(col) for col in columns)
    else:
        col_str = "*"

    query = f"""
        SELECT {col_str}
        FROM '{glob_pattern}'
        WHERE {quoted_cluster_column} = {cluster_id}
    """

    result = duckdb.query(query).df()

    if result.empty:
        logger.warning(f"No molecules found for cluster_id={cluster_id}")

    return result

export_cluster(results_dir, cluster_id, output_path, cluster_column='cluster_id', columns=None)

Export a single cluster to a file.

Queries all molecules from a specific cluster and writes them to a file. Supports parquet and CSV output formats.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
cluster_id int

The cluster ID to export.

required
output_path Union[str, Path]

Output file path. Must end with .parquet or .csv.

required
cluster_column str

Name of the column containing cluster IDs.

'cluster_id'
columns Optional[List[str]]

Optional list of columns to export. Default exports all columns.

None

Returns:

Type Description
int

Number of molecules exported.

Raises:

Type Description
ImportError

If duckdb is not installed.

ValueError

If output format is not supported.

Example

count = export_cluster('results/', 42, 'cluster_42.parquet') print(f"Exported {count} molecules")

Source code in chelombus/utils/cluster_io.py
def export_cluster(
    results_dir: Union[str, Path],
    cluster_id: int,
    output_path: Union[str, Path],
    cluster_column: str = "cluster_id",
    columns: Optional[List[str]] = None,
) -> int:
    """Export a single cluster to a file.

    Queries all molecules from a specific cluster and writes them to a file.
    Supports parquet, CSV, and plain SMILES output formats.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_id: The cluster ID to export.
        output_path: Output file path. Must end with .parquet, .csv, .smi,
            or .txt (.smi/.txt write only the smiles column, no header).
        cluster_column: Name of the column containing cluster IDs.
        columns: Optional list of columns to export. Default exports all columns.
            Ignored for .smi/.txt output.

    Returns:
        Number of molecules exported.

    Raises:
        ImportError: If duckdb is not installed.
        ValueError: If output format is not supported.

    Example:
        >>> count = export_cluster('results/', 42, 'cluster_42.parquet')
        >>> print(f"Exported {count} molecules")
    """
    _check_duckdb_available()
    import duckdb

    # Coerce to int so a non-int cluster_id cannot inject SQL below.
    cluster_id = int(cluster_id)

    path = _validate_results_dir(results_dir)
    output_path = Path(output_path)
    glob_pattern = str(path / "*.parquet")
    quoted_cluster_column = _quote_identifier(cluster_column)

    # Build column selection (quoted, consistent with query_cluster())
    if columns:
        col_str = ", ".join(_quote_identifier(col) for col in columns)
    else:
        col_str = "*"

    # First get the count
    count_query = f"""
        SELECT COUNT(*) as cnt
        FROM '{glob_pattern}'
        WHERE {quoted_cluster_column} = {cluster_id}
    """
    count = duckdb.query(count_query).fetchone()[0] # type: ignore

    if count == 0:
        logger.warning(f"No molecules found for cluster_id={cluster_id}")
        return 0

    # Create parent directory if needed
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Export based on format
    suffix = output_path.suffix.lower()
    if suffix == ".parquet":
        export_query = f"""
            COPY (
                SELECT {col_str}
                FROM '{glob_pattern}'
                WHERE {quoted_cluster_column} = {cluster_id}
            ) TO '{output_path}' (FORMAT PARQUET)
        """
    elif suffix == ".csv":
        export_query = f"""
            COPY (
                SELECT {col_str}
                FROM '{glob_pattern}'
                WHERE {quoted_cluster_column} = {cluster_id}
            ) TO '{output_path}' (FORMAT CSV, HEADER)
        """
    elif suffix in (".smi", ".txt"):
        # For SMILES files, export only the smiles column without header
        export_query = f"""
            COPY (
                SELECT smiles
                FROM '{glob_pattern}'
                WHERE {quoted_cluster_column} = {cluster_id}
            ) TO '{output_path}' (FORMAT CSV, HEADER FALSE)
        """
    else:
        raise ValueError(
            f"Unsupported output format: {suffix}. "
            "Use .parquet, .csv, .smi, or .txt"
        )

    duckdb.query(export_query)
    logger.info(f"Exported {count} molecules to {output_path}")

    return count

export_all_clusters(results_dir, output_dir, format='parquet')

Export all clusters to individual files using partitioned writes.

This function exports all clusters to separate files in a single streaming pass through the data, using DuckDB's PARTITION_BY functionality for memory efficiency.

Output structure (Hive-style partitioning): output_dir/ ├── cluster_id=0/data_0.parquet ├── cluster_id=1/data_0.parquet ├── cluster_id=2/data_0.parquet ... └── cluster_id=99999/data_0.parquet

This structure is compatible with: - DuckDB queries: SELECT * FROM 'clusters/*/*.parquet' WHERE cluster_id = 42 - Spark/pandas: Reads partition column automatically - SLURM arrays: Process cluster_id=$SLURM_ARRAY_TASK_ID

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
output_dir Union[str, Path]

Output directory for partitioned cluster files.

required
format str

Output format, either 'parquet' or 'csv'. Default 'parquet'.

'parquet'
batch_size

Number of clusters to process per batch for progress reporting. Does not affect memory usage (DuckDB handles streaming internally).

required

Returns:

Type Description
dict[int, int]

Dictionary mapping cluster_id to molecule count.

Raises:

Type Description
ImportError

If duckdb is not installed.

ValueError

If format is not supported.

Example

stats = export_all_clusters('results/', 'clusters/') print(f"Exported {len(stats)} clusters") print(f"Largest cluster: {max(stats.values())} molecules")

Note

For HPC/SLURM usage, you can process clusters in parallel:

# SLURM array job
#SBATCH --array=0-99999

chelombus visualize \
    --input clusters/cluster_id=$SLURM_ARRAY_TASK_ID/ \
    --output tmaps/cluster_$SLURM_ARRAY_TASK_ID.html
Source code in chelombus/utils/cluster_io.py
def export_all_clusters(
    results_dir: Union[str, Path],
    output_dir: Union[str, Path],
    format: str = "parquet",
) -> dict[int, int]:
    """Export all clusters to individual files using partitioned writes.

    This function exports all clusters to separate files in a single streaming
    pass through the data, using DuckDB's PARTITION_BY functionality for
    memory efficiency.

    Output structure (Hive-style partitioning):
        output_dir/
        ├── cluster_id=0/data_0.parquet
        ├── cluster_id=1/data_0.parquet
        ├── cluster_id=2/data_0.parquet
        ...
        └── cluster_id=99999/data_0.parquet

    This structure is compatible with:
    - DuckDB queries: SELECT * FROM 'clusters/*/*.parquet' WHERE cluster_id = 42
    - Spark/pandas: Reads partition column automatically
    - SLURM arrays: Process cluster_id=$SLURM_ARRAY_TASK_ID

    Args:
        results_dir: Path to directory containing chunked parquet files.
        output_dir: Output directory for partitioned cluster files.
        format: Output format, either 'parquet' or 'csv'. Default 'parquet'.

    Returns:
        Dictionary mapping cluster_id to molecule count.

    Raises:
        ImportError: If duckdb is not installed.
        ValueError: If format is not supported.

    Example:
        >>> stats = export_all_clusters('results/', 'clusters/')
        >>> print(f"Exported {len(stats)} clusters")
        >>> print(f"Largest cluster: {max(stats.values())} molecules")

    Note:
        For HPC/SLURM usage, you can process clusters in parallel:

        ```bash
        # SLURM array job
        #SBATCH --array=0-99999

        chelombus visualize \\
            --input clusters/cluster_id=$SLURM_ARRAY_TASK_ID/ \\
            --output tmaps/cluster_$SLURM_ARRAY_TASK_ID.html
        ```
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    output_path = Path(output_dir)
    glob_pattern = str(path / "*.parquet")

    if format not in ("parquet", "csv"):
        raise ValueError(f"Unsupported format: {format}. Use 'parquet' or 'csv'")

    # Create output directory
    output_path.mkdir(parents=True, exist_ok=True)

    logger.info(f"Exporting all clusters from {results_dir} to {output_dir}")

    # Use DuckDB's partitioned write - single pass through all data
    # This is memory-efficient as DuckDB streams the data
    format_str = "PARQUET" if format == "parquet" else "CSV"

    export_query = f"""
        COPY (
            SELECT smiles, cluster_id
            FROM '{glob_pattern}'
        ) TO '{output_path}' (
            FORMAT {format_str},
            PARTITION_BY (cluster_id),
            OVERWRITE_OR_IGNORE
        )
    """

    logger.info("Starting partitioned export (streaming, memory-efficient)...")
    duckdb.query(export_query)

    # Get cluster statistics after export
    logger.info("Calculating cluster statistics...")
    stats = get_cluster_stats(results_dir)

    # Convert to dictionary
    result = dict(zip(stats["cluster_id"], stats["molecule_count"]))

    logger.info(
        f"Exported {len(result)} clusters, "
        f"total {sum(result.values()):,} molecules"
    )

    return result

get_cluster_stats(results_dir, column='cluster_id')

Get statistics for all clusters.

Returns a DataFrame with cluster IDs and molecule counts, sorted by cluster_id.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
column str

Name for the column containing cluster ID defaults to 'cluster_id'

'cluster_id'

Returns:

Type Description
'pd.DataFrame'

DataFrame with columns ['cluster_id', 'molecule_count'].

Example

stats = get_cluster_stats('results/') print(f"Number of clusters: {len(stats)}") print(f"Average cluster size: {stats['molecule_count'].mean():.1f}") print(f"Largest cluster: {stats['molecule_count'].max()}")

Source code in chelombus/utils/cluster_io.py
def get_cluster_stats(
    results_dir: Union[str, Path],
    column: str = 'cluster_id',
) -> "pd.DataFrame":
    """Summarise cluster sizes across all chunked parquet files.

    Produces one row per cluster with its molecule count, ordered by
    cluster_id.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        column: Name for the column containing cluster ID defaults to 'cluster_id'

    Returns:
        DataFrame with columns ['cluster_id', 'molecule_count'].

    Example:
        >>> stats = get_cluster_stats('results/')
        >>> print(f"Number of clusters: {len(stats)}")
        >>> print(f"Average cluster size: {stats['molecule_count'].mean():.1f}")
        >>> print(f"Largest cluster: {stats['molecule_count'].max()}")
    """
    _check_duckdb_available()
    import duckdb

    parquet_glob = str(_validate_results_dir(results_dir) / "*.parquet")
    cluster_col = _quote_identifier(column)

    sql = f"""
        SELECT
            {cluster_col} as cluster_id,
            COUNT(*) as molecule_count
        FROM '{parquet_glob}'
        GROUP BY {cluster_col}
        ORDER BY {cluster_col}
    """

    return duckdb.query(sql).df()

get_cluster_ids(results_dir, cluster_id_column_name='cluster_id')

Get list of all cluster IDs in the results.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required

Returns:

Type Description
List[int]

Sorted list of cluster IDs.

Example

cluster_ids = get_cluster_ids('results/') print(f"Clusters range from {min(cluster_ids)} to {max(cluster_ids)}")

Source code in chelombus/utils/cluster_io.py
def get_cluster_ids(
    results_dir: Union[str, Path],
    cluster_id_column_name: str = 'cluster_id',
) -> List[int]:
    """Get list of all cluster IDs in the results.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_id_column_name: Name of the column containing cluster IDs.
            Defaults to 'cluster_id'.

    Returns:
        Sorted list of cluster IDs.

    Example:
        >>> cluster_ids = get_cluster_ids('results/')
        >>> print(f"Clusters range from {min(cluster_ids)} to {max(cluster_ids)}")
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")
    # Quote the identifier once instead of twice.
    quoted_column = _quote_identifier(cluster_id_column_name)

    query = f"""
        SELECT DISTINCT {quoted_column} 
        FROM '{glob_pattern}'
        ORDER BY {quoted_column}
    """

    result = duckdb.query(query).fetchall()
    return [row[0] for row in result]

get_total_molecules(results_dir)

Get total number of molecules across all clusters.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required

Returns:

Type Description
int

Total molecule count.

Source code in chelombus/utils/cluster_io.py
def get_total_molecules(
    results_dir: Union[str, Path],
) -> int:
    """Count molecules across every chunked parquet file.

    Args:
        results_dir: Path to directory containing chunked parquet files.

    Returns:
        Total molecule count.
    """
    _check_duckdb_available()
    import duckdb

    parquet_glob = str(_validate_results_dir(results_dir) / "*.parquet")
    count_query = f"SELECT COUNT(*) FROM '{parquet_glob}'"
    return duckdb.query(count_query).fetchone()[0] # type: ignore

query_clusters_batch(results_dir, cluster_ids, columns=None, cluster_column='cluster_id')

Query molecules from multiple clusters at once.

More efficient than calling query_cluster() multiple times when you need data from several clusters.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
cluster_ids List[int]

List of cluster IDs to query.

required
columns Optional[List[str]]

Optional list of columns to return.

None

Returns:

Type Description
'pd.DataFrame'

DataFrame containing molecules from all specified clusters.

Example

df = query_clusters_batch('results/', [1, 2, 3, 4, 5]) cluster_counts = df.groupby('cluster_id').size()

Source code in chelombus/utils/cluster_io.py
def query_clusters_batch(
    results_dir: Union[str, Path],
    cluster_ids: List[int],
    columns: Optional[List[str]] = None,
    cluster_column: str = 'cluster_id'
) -> "pd.DataFrame":
    """Query molecules from multiple clusters at once.

    More efficient than calling query_cluster() multiple times when you need
    data from several clusters.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_ids: List of cluster IDs to query.
        columns: Optional list of columns to return.
        cluster_column: Name of the column holding cluster IDs.

    Returns:
        DataFrame containing molecules from all specified clusters.

    Raises:
        ValueError: If `cluster_ids` is empty (would otherwise build
            invalid SQL `IN ()`).

    Example:
        >>> df = query_clusters_batch('results/', [1, 2, 3, 4, 5])
        >>> cluster_counts = df.groupby('cluster_id').size()
    """
    _check_duckdb_available()
    import duckdb
    quoted_cluster_column = _quote_identifier(cluster_column)

    if not cluster_ids:
        raise ValueError("cluster_ids must contain at least one cluster ID.")

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")

    # Quote requested columns for consistency with the other query helpers
    # (prevents breakage on reserved words / odd column names, and avoids
    # interpolating raw caller strings into the SQL text).
    col_str = ", ".join(_quote_identifier(c) for c in columns) if columns else "*"
    # int() both validates and normalizes the IDs, so only integer literals
    # ever reach the SQL string.
    ids_str = ", ".join(str(int(cid)) for cid in cluster_ids)

    query = f"""
        SELECT {col_str}
        FROM '{glob_pattern}'
        WHERE {quoted_cluster_column} IN ({ids_str})
    """

    return duckdb.query(query).df()

sample_from_cluster(results_dir, cluster_id, n=100, random_state=None)

Get a random sample of molecules from a cluster.

Useful for previewing cluster contents without loading all molecules.

Parameters:

Name Type Description Default
results_dir Union[str, Path]

Path to directory containing chunked parquet files.

required
cluster_id int

The cluster ID to sample from.

required
n int

Number of molecules to sample.

100
random_state Optional[int]

Random seed for reproducibility.

None

Returns:

Type Description
'pd.DataFrame'

DataFrame with sampled molecules.

Example

>>> sample = sample_from_cluster('results/', cluster_id=42, n=10)
>>> print(sample['smiles'].tolist())

Source code in chelombus/utils/cluster_io.py
def sample_from_cluster(
    results_dir: Union[str, Path],
    cluster_id: int,
    n: int = 100,
    random_state: Optional[int] = None,
    cluster_column: str = 'cluster_id',
) -> "pd.DataFrame":
    """Get a random sample of molecules from a cluster.

    Useful for previewing cluster contents without loading all molecules.

    Args:
        results_dir: Path to directory containing chunked parquet files.
        cluster_id: The cluster ID to sample from.
        n: Number of molecules to sample.
        random_state: Random seed for reproducibility.
        cluster_column: Name of the column holding cluster IDs. Defaults to
            'cluster_id' (previous hard-coded behavior).

    Returns:
        DataFrame with sampled molecules.

    Example:
        >>> sample = sample_from_cluster('results/', cluster_id=42, n=10)
        >>> print(sample['smiles'].tolist())
    """
    _check_duckdb_available()
    import duckdb

    path = _validate_results_dir(results_dir)
    glob_pattern = str(path / "*.parquet")

    # Use setseed() for reproducibility when random_state is provided.
    # NOTE(review): setseed() expects a value in [-1, 1]; random_state values
    # outside the int32 range would fall outside that interval — confirm
    # whether callers ever pass such seeds.
    if random_state is not None:
        duckdb.query(f"SELECT setseed({random_state / 2**31})")

    # Quote the column name (consistent with query_clusters_batch) and coerce
    # numeric arguments so only validated values reach the SQL string.
    query = f"""
        SELECT *
        FROM '{glob_pattern}'
        WHERE {_quote_identifier(cluster_column)} = {int(cluster_id)}
        ORDER BY RANDOM()
        LIMIT {int(n)}
    """

    return duckdb.query(query).df()

Helper Functions

chelombus.utils.helper_functions

format_time(seconds)

Simple helper to transform a duration in seconds into a readable format

Parameters:

Name Type Description Default
seconds int

Seconds

required

Returns:

Name Type Description
str

(int) h, (int) min, (float) seconds

Source code in chelombus/utils/helper_functions.py
def format_time(seconds):
    """Transform a duration in seconds into a human-readable string.

    Args:
        seconds (int | float): Duration in seconds.

    Returns:
        str: The duration formatted as "<h> h; <m>min; <s.ss> s".
    """
    hours, rem = divmod(seconds, 3600)
    minutes, secs = divmod(rem, 60)
    return f"{int(hours)} h; {int(minutes)}min; {secs:.2f} s"

save_chunk(fp_chunk, output_dir, chunk_index, file_format='npy', name='fingerprints_chunk', **kwargs)

Save a chunk of fingerprint data to a file in the specified format.

Parameters:

Name Type Description Default
fp_chunk ndarray

The fingerprint array chunk (each row corresponds to a fingerprint).

required
output_dir str

Directory path where the fingerprint chunk will be saved.

required
chunk_index int

The index number for the current chunk. This is used to generate a unique file name.

required
file_format str

Format to save the data. Options are: - 'npy': Save as a NumPy binary file. - 'parquet': Save as an Apache Parquet file. Default is 'npy'.

'npy'
name str

Name prefix for the file to be generated.

'fingerprints_chunk'
**kwargs

Additional keyword arguments to pass to the saving function. For 'npy', kwargs are passed to np.save. For 'parquet', kwargs are passed to DataFrame.to_parquet.

{}

Returns:

Name Type Description
str str

The full path of the saved file.

Source code in chelombus/utils/helper_functions.py
def save_chunk(fp_chunk: np.ndarray | pd.DataFrame, output_dir: str, chunk_index: int,
                  file_format: str = 'npy', name:str="fingerprints_chunk", **kwargs) -> str:
    """
    Save a chunk of fingerprint data to a file in the specified format.

    Parameters:
        fp_chunk (np.ndarray): The fingerprint array chunk (each row corresponds to a fingerprint).
        output_dir (str): Directory path where the fingerprint chunk will be saved.
        chunk_index (int): The index number for the current chunk. This is used to generate a unique file name.
        file_format (str): Format to save the data. Options are:
                           - 'npy': Save as a NumPy binary file.
                           - 'parquet': Save as an Apache Parquet file.
                           Default is 'npy'.
        name (str): Name prefix for each the file to be generated. 
        **kwargs: Additional keyword arguments to pass to the saving function.
                  For 'npy', kwargs are passed to `np.save`.
                  For 'parquet', kwargs are passed to `DataFrame.to_parquet`.

    Returns:
        str: The full path of the saved file.
    """
    # Ensure the output directory exists.
    os.makedirs(output_dir, exist_ok=True)

    if file_format.lower() == 'npy':
        filename = os.path.join(output_dir, f"{name}_{chunk_index:05d}.npy")
        np.save(filename, fp_chunk, **kwargs)
        del fp_chunk
    elif file_format.lower() == 'parquet':
        filename = os.path.join(output_dir, f"{name}_{chunk_index:05d}.parquet")
        # Each row is a fingerprint and each column is a bit.
        df = pd.DataFrame(fp_chunk)
        del fp_chunk
        df.to_parquet(filename, **kwargs)
        del df 
    else:
        raise ValueError("Unsupported file format. Please choose 'npy' or 'parquet'.")
    return filename