Source code for dig.xgraph.dataset.mol_dataset

import os
import re
import torch
import numpy as np
from torch_geometric.utils import dense_to_sparse
from torch_geometric.data import Data, InMemoryDataset, \
    download_url

try:
    from rdkit import Chem
except ImportError:
    Chem = None
import os.path as osp
import zipfile
import gzip

x_map = {
    'atomic_num':
    list(range(0, 119)),
    'chirality': [
        'CHI_UNSPECIFIED',
        'CHI_TETRAHEDRAL_CW',
        'CHI_TETRAHEDRAL_CCW',
        'CHI_OTHER',
    ],
    'degree':
    list(range(0, 11)),
    'formal_charge':
    list(range(-5, 7)),
    'num_hs':
    list(range(0, 9)),
    'num_radical_electrons':
    list(range(0, 5)),
    'hybridization': [
        'UNSPECIFIED',
        'S',
        'SP',
        'SP2',
        'SP3',
        'SP3D',
        'SP3D2',
        'OTHER',
    ],
    'is_aromatic': [False, True],
    'is_in_ring': [False, True],
}

e_map = {
    'bond_type': [
        'misc',
        'SINGLE',
        'DOUBLE',
        'TRIPLE',
        'AROMATIC',
    ],
    'stereo': [
        'STEREONONE',
        'STEREOZ',
        'STEREOE',
        'STEREOCIS',
        'STEREOTRANS',
        'STEREOANY',
    ],
    'is_conjugated': [False, True],
}


def maybe_log(path, log=True):
    if log:
        print('Extracting', path)


def extract_gz(path, folder, log=True):
    maybe_log(path, log)
    with gzip.open(path, 'r') as r:
        with open(osp.join(folder, '.'.join(os.path.basename(path).split('.')[:-1])), 'wb') as w:
            w.write(r.read())


def extract_zip(path, folder, log=True):
    r"""Extracts a zip archive to a specific folder.

    Args:
        path (string): The path to the tar archive.
        folder (string): The folder.
        log (bool, optional): If :obj:`False`, will not print anything to the
            console. (default: :obj:`True`)
    """
    maybe_log(path, log)
    with zipfile.ZipFile(path, 'r') as f:
        f.extractall(folder)


[docs]class MoleculeDataset(InMemoryDataset): r""" The extension of MoleculeNet with `MUTAG <https://pubs.acs.org/doi/10.1021/jm00106a046>`_. The `MoleculeNet benchmark collection <http://moleculenet.ai/datasets-1>`_ from the `MoleculeNet: A Benchmark for Molecular Machine Learning <https://arxiv.org/abs/1703.00564>`_ paper, containing datasets from physical chemistry, biophysics and physiology. The MoleculeNet datasets come with the additional node and edge features introduced by the `Open Graph Benchmark <https://ogb.stanford.edu/docs/graphprop/>`_, and the node features in MUTAG dataset are one hot features denoting the atom types. Args: root (string): Root directory where the dataset should be saved. name (string): The name of the dataset (:obj:`"MUTAG"`, :obj:`"ESOL"`, :obj:`"FreeSolv"`, :obj:`"Lipo"`, :obj:`"PCBA"`, :obj:`"MUV"`, :obj:`"HIV"`, :obj:`"BACE"`, :obj:`"BBPB"`, :obj:`"Tox21"`, :obj:`"ToxCast"`, :obj:`"SIDER"`, :obj:`"ClinTox"`). transform (callable, optional): A function/transform that takes in an :obj:`torch_geometric.data.Data` object and returns a transformed version. The data object will be transformed before every access. (default: :obj:`None`) pre_transform (callable, optional): A function/transform that takes in an :obj:`torch_geometric.data.Data` object and returns a transformed version. The data object will be transformed before being saved to disk. (default: :obj:`None`) pre_filter (callable, optional): A function that takes in an :obj:`torch_geometric.data.Data` object and returns a boolean value, indicating whether the data object should be included in the final dataset. (default: :obj:`None`) """ url = 'https://deepchemdata.s3-us-west-1.amazonaws.com/datasets/{}' mutag_url = 'https://github.com/divelab/DIG_storage/raw/main/xgraph/datasets/{}' # Format: name: [display_name, url_name, filename, smiles_idx, y_idx] names = { 'mutag': ['MUTAG', 'MUTAG.zip', None, None], 'esol': ['ESOL', 'delaney-processed.csv', 'delaney-processed.csv', -1, -2], 'freesolv': ['FreeSolv', 'SAMPL.csv', 'SAMPL.csv', 1, 2], 'lipo': ['Lipophilicity', 'Lipophilicity.csv', 'Lipophilicity.csv', 2, 1], 'pcba': ['PCBA', 'pcba.csv.gz', 'pcba.csv', -1, slice(0, 128)], 'muv': ['MUV', 'muv.csv.gz', 'muv.csv', -1, slice(0, 17)], 'hiv': ['HIV', 'HIV.csv', 'HIV.csv', 0, -1], 'bace': ['BACE', 'bace.csv', 'bace.csv', 0, 2], 'bbbp': ['BBBP', 'BBBP.csv', 'BBBP.csv', -1, -2], 'tox21': ['Tox21', 'tox21.csv.gz', 'tox21.csv', -1, slice(0, 12)], 'toxcast': ['ToxCast', 'toxcast_data.csv.gz', 'toxcast_data.csv', 0, slice(1, 618)], 'sider': ['SIDER', 'sider.csv.gz', 'sider.csv', 0, slice(1, 28)], 'clintox': ['ClinTox', 'clintox.csv.gz', 'clintox.csv', 0, slice(1, 3)], } def __init__(self, root, name, transform=None, pre_transform=None, pre_filter=None): if Chem is None: raise ImportError('`MoleculeNet` requires `rdkit`.') self.name = name.lower() assert self.name in self.names.keys() super(MoleculeDataset, self).__init__(root, transform, pre_transform, pre_filter) self.data, self.slices = torch.load(self.processed_paths[0]) @property def raw_dir(self): return osp.join(self.root, self.name, 'raw') @property def processed_dir(self): return osp.join(self.root, self.name, 'processed') @property def raw_file_names(self): if self.name.lower() == 'MUTAG'.lower(): return ['MUTAG_A.txt', 'MUTAG_graph_labels.txt', 'MUTAG_graph_indicator.txt', 'MUTAG_node_labels.txt', 'README.txt'] else: return self.names[self.name][2] @property def processed_file_names(self): return 'data.pt'
[docs] def download(self): if self.name.lower() == 'MUTAG'.lower(): url = self.mutag_url.format(self.names[self.name][1]) else: url = self.url.format(self.names[self.name][1]) path = download_url(url, self.raw_dir) if self.names[self.name][1][-2:] == 'gz': extract_gz(path, self.raw_dir) os.unlink(path) elif self.names[self.name][1][-3:] == 'zip': extract_zip(path, self.raw_dir) os.unlink(path)
[docs] def process(self): if self.name.lower() == 'MUTAG'.lower(): with open(os.path.join(self.raw_dir, 'MUTAG_node_labels.txt'), 'r') as f: nodes_all_temp = f.read().splitlines() nodes_all = [int(i) for i in nodes_all_temp] adj_all = np.zeros((len(nodes_all), len(nodes_all))) with open(os.path.join(self.raw_dir, 'MUTAG_A.txt'), 'r') as f: adj_list = f.read().splitlines() for item in adj_list: lr = item.split(', ') l = int(lr[0]) r = int(lr[1]) adj_all[l - 1, r - 1] = 1 with open(os.path.join(self.raw_dir, 'MUTAG_graph_indicator.txt'), 'r') as f: graph_indicator_temp = f.read().splitlines() graph_indicator = [int(i) for i in graph_indicator_temp] graph_indicator = np.array(graph_indicator) with open(os.path.join(self.raw_dir, 'MUTAG_graph_labels.txt'), 'r') as f: graph_labels_temp = f.read().splitlines() graph_labels = [int(i) for i in graph_labels_temp] data_list = [] for i in range(1, 189): idx = np.where(graph_indicator == i) graph_len = len(idx[0]) adj = adj_all[idx[0][0]:idx[0][0] + graph_len, idx[0][0]:idx[0][0] + graph_len] label = int(graph_labels[i - 1] == 1) feature = nodes_all[idx[0][0]:idx[0][0] + graph_len] nb_clss = 7 targets = np.array(feature).reshape(-1) one_hot_feature = np.eye(nb_clss)[targets] data_example = Data(x=torch.from_numpy(one_hot_feature).float(), edge_index=dense_to_sparse(torch.from_numpy(adj))[0], y=label) data_list.append(data_example) else: with open(self.raw_paths[0], 'r') as f: dataset = f.read().split('\n')[1:-1] dataset = [x for x in dataset if len(x) > 0] # Filter empty lines. data_list = [] for line in dataset: line = re.sub(r'\".*\"', '', line) # Replace ".*" strings. line = line.split(',') smiles = line[self.names[self.name][3]] ys = line[self.names[self.name][4]] ys = ys if isinstance(ys, list) else [ys] ys = [float(y) if len(y) > 0 else float('NaN') for y in ys] y = torch.tensor(ys, dtype=torch.float).view(1, -1) mol = Chem.MolFromSmiles(smiles) if mol is None: continue xs = [] for atom in mol.GetAtoms(): x = [] x.append(x_map['atomic_num'].index(atom.GetAtomicNum())) x.append(x_map['chirality'].index(str(atom.GetChiralTag()))) x.append(x_map['degree'].index(atom.GetTotalDegree())) x.append(x_map['formal_charge'].index(atom.GetFormalCharge())) x.append(x_map['num_hs'].index(atom.GetTotalNumHs())) x.append(x_map['num_radical_electrons'].index( atom.GetNumRadicalElectrons())) x.append(x_map['hybridization'].index( str(atom.GetHybridization()))) x.append(x_map['is_aromatic'].index(atom.GetIsAromatic())) x.append(x_map['is_in_ring'].index(atom.IsInRing())) xs.append(x) x = torch.tensor(xs, dtype=torch.long).view(-1, 9) edge_indices, edge_attrs = [], [] for bond in mol.GetBonds(): i = bond.GetBeginAtomIdx() j = bond.GetEndAtomIdx() e = [] e.append(e_map['bond_type'].index(str(bond.GetBondType()))) e.append(e_map['stereo'].index(str(bond.GetStereo()))) e.append(e_map['is_conjugated'].index(bond.GetIsConjugated())) edge_indices += [[i, j], [j, i]] edge_attrs += [e, e] edge_index = torch.tensor(edge_indices) edge_index = edge_index.t().to(torch.long).view(2, -1) edge_attr = torch.tensor(edge_attrs, dtype=torch.long).view(-1, 3) # Sort indices. if edge_index.numel() > 0: perm = (edge_index[0] * x.size(0) + edge_index[1]).argsort() edge_index, edge_attr = edge_index[:, perm], edge_attr[perm] data = Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=y, smiles=smiles) if self.pre_filter is not None and not self.pre_filter(data): continue if self.pre_transform is not None: data = self.pre_transform(data) data_list.append(data) torch.save(self.collate(data_list), self.processed_paths[0])
def __repr__(self): return '{}({})'.format(self.names[self.name][0], len(self))
if __name__ == '__main__': dataset = MoleculeDataset(root='.', name='MUTAG')