"""Graph convolutional network for SMILES property prediction.

The module bundles a Hugging Face configuration (``GCNConfig``), a
SMILES-to-graph converter (``SmilesDataset``), the network itself
(``GCNNet`` with an ``MLPReadout`` head) and a ``PreTrainedModel`` wrapper
(``GCNModel``) that can run batch prediction and write the result to CSV.
"""
import os
from typing import List, Optional

import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from rdkit import Chem
from rdkit.Chem import AllChem
from torch_geometric.data import Data
from torch_geometric.loader import DataLoader
from torch_geometric.nn import GCNConv
from torch_scatter import scatter
from tqdm import tqdm
from transformers import AutoModel
from transformers import PretrainedConfig
from transformers import PreTrainedModel

# NOTE: the class defined below rebinds the name ``GCNConfig`` and therefore
# shadows this import for the rest of the module.
from gcn_model.configuration_gcn import GCNConfig


class GCNConfig(PretrainedConfig):
    """Configuration holding the hyper-parameters of :class:`GCNModel`.

    Args:
        input_feature: Dimension of the input feature.
        emb_input: Embedding dimension of the input feature; it is used as
            the number of rows of the atom-type embedding table.
        hidden_size: Hidden size of the GCN.
        n_layers: Number of GCN layers.
        num_classes: Number of output classes.
        smiles: Optional list of SMILES strings handed to the processor.
        processor_class: Name of the processor class stored in the config.
        **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
    """

    model_type = "gcn"

    def __init__(
        self,
        input_feature: int = 64,
        emb_input: int = 20,
        hidden_size: int = 64,
        n_layers: int = 6,
        num_classes: int = 1,
        smiles: Optional[List[str]] = None,
        processor_class: str = "SmilesProcessor",
        **kwargs,
    ):
        self.input_feature = input_feature
        self.emb_input = emb_input
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.num_classes = num_classes
        self.smiles = smiles
        self.processor_class = processor_class
        super().__init__(**kwargs)


class SmilesDataset(torch.utils.data.Dataset):
    """Dataset that converts SMILES strings into ``torch_geometric`` graphs."""

    # Atom symbol -> integer type index fed to the embedding layer.
    _ATOM_TYPES = {'H': 0, 'C': 1, 'N': 2, 'O': 3, 'S': 4}

    def __init__(self, smiles):
        """Remember ``smiles``; graphs are only built by :meth:`get_data`."""
        self.smiles_list = smiles
        self.data_list = []

    def __len__(self):
        """Return the number of graphs built so far."""
        return len(self.data_list)

    def __getitem__(self, idx):
        """Return the graph stored at position ``idx``."""
        return self.data_list[idx]

    def get_data(self, smiles):
        """Build one graph per valid SMILES string.

        Molecules that cannot be parsed, cannot be embedded in 3D, or contain
        an atom outside the supported set are reported with ``print`` and
        skipped, so the result may be shorter than ``smiles``.

        Args:
            smiles: Sequence of SMILES strings.

        Returns:
            The list of ``Data`` objects, which is also kept in
            ``self.data_list``.
        """
        self.smiles_list = smiles
        # Start from a clean list so that repeated calls do not accumulate
        # graphs from earlier inputs.
        self.data_list = []
        for smi in self.smiles_list:
            graph = self._smiles_to_graph(smi)
            if graph is not None:
                self.data_list.append(graph)
        return self.data_list

    def _smiles_to_graph(self, smi):
        """Convert a single SMILES string to ``Data``; return None on failure."""
        # Parse the SMILES string into an RDKit molecule.
        mol = Chem.MolFromSmiles(smi)
        if mol is None:
            print("无法创建Mol对象", smi)
            return None

        # RDKit keeps hydrogens implicit by default, but they strongly affect
        # the computed geometry, so add them before generating a conformer.
        mol3d = Chem.AddHs(mol)
        if mol3d is None:
            print("无法创建mol3d对象", smi)
            return None

        # Generate a 3D conformer with a fixed seed.
        AllChem.EmbedMolecule(mol3d, randomSeed=1)
        if mol3d.GetNumConformers() == 0:
            print("无法创建comfor", smi)
            return None

        symbols = [atom.GetSymbol() for atom in mol3d.GetAtoms()]
        unsupported = sorted(set(symbols) - set(self._ATOM_TYPES))
        if unsupported:
            # Skip the molecule instead of aborting the whole batch with a
            # KeyError on the first unknown element.
            print("unsupported atom type(s) {}:".format(unsupported), smi)
            return None

        num_atoms = mol3d.GetNumAtoms()
        pos = torch.tensor(mol3d.GetConformer().GetPositions(), dtype=torch.float)
        x = torch.tensor([self._ATOM_TYPES[s] for s in symbols]).to(torch.float)

        # Every bond contributes two directed edges.
        row, col = [], []
        for bond in mol3d.GetBonds():
            start, end = bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()
            row += [start, end]
            col += [end, start]
        edge_index = torch.tensor([row, col], dtype=torch.long)

        # Order edges by (source, target).
        perm = (edge_index[0] * num_atoms + edge_index[1]).argsort()
        edge_index = edge_index[:, perm]

        return Data(x=x, pos=pos, edge_index=edge_index, smiles=smi)


class MLPReadout(nn.Module):
    """MLP layer used after the graph vector representation.

    The head consists of ``L`` linear layers that each halve the width
    (integer division), each followed by ReLU, and a final linear layer that
    maps to ``output_dim``.
    """

    def __init__(self, input_dim, output_dim, L=2):  # L = number of hidden layers
        super().__init__()
        layers = [
            nn.Linear(input_dim // 2 ** l, input_dim // 2 ** (l + 1), bias=True)
            for l in range(L)
        ]
        layers.append(nn.Linear(input_dim // 2 ** L, output_dim, bias=True))
        self.FC_layers = nn.ModuleList(layers)
        self.L = L

    def forward(self, x):
        """Apply the hidden layers with ReLU, then the output layer."""
        y = x
        for layer in self.FC_layers[:self.L]:
            y = F.relu(layer(y))
        return self.FC_layers[self.L](y)


class GCNNet(torch.nn.Module):
    """Atom-type embedding, GCN layers, mean pooling and an MLP head.

    NOTE(review): ``conv1`` is a single layer applied ``n_layers`` times, so
    its weights are shared across iterations. ``padding_idx=0`` coincides
    with the index ``SmilesDataset`` assigns to hydrogen. Both are left
    untouched to stay compatible with the saved checkpoint - confirm that
    they are intended.
    """

    def __init__(self, input_feature=64, emb_input=20, hidden_size=64, n_layers=6, num_classes=1):
        super(GCNNet, self).__init__()
        self.embedding = torch.nn.Embedding(emb_input, hidden_size, padding_idx=0)
        self.input_feature = input_feature
        self.n_layers = n_layers
        self.num_classes = num_classes
        self.conv1 = GCNConv(hidden_size, hidden_size)
        self.conv2 = GCNConv(hidden_size, 32)
        self.mlp = MLPReadout(32, num_classes)

    def forward_features(self, data):
        """Return one prediction per graph contained in ``data``.

        Args:
            data: Batch object exposing ``x``, ``edge_index`` and ``batch``.

        Returns:
            The MLP output with a trailing dimension of size one squeezed away.
        """
        x, edge_index, batch = data.x.long(), data.edge_index, data.batch
        x = self.embedding(x.reshape(-1))
        for _ in range(self.n_layers):
            x = F.relu(self.conv1(x, edge_index))
        x = F.relu(self.conv2(x, edge_index))
        # Average the node features of each graph.
        x = scatter(x, batch, dim=-2, reduce='mean')
        x = self.mlp(x)
        return x.squeeze(-1)


class GCNModel(PreTrainedModel):
    """Hugging Face wrapper around :class:`GCNNet` with a prediction helper."""

    config_class = GCNConfig

    def __init__(self, config):
        super().__init__(config)
        self.model = GCNNet(
            input_feature=config.input_feature,
            emb_input=config.emb_input,
            hidden_size=config.hidden_size,
            n_layers=config.n_layers,
            num_classes=config.num_classes,
        )
        self.process = SmilesDataset(
            smiles=config.smiles,
        )

        # State filled in by predict_smiles().
        self.gcn_model = None
        self.dataset = None
        self.output = None
        self.data_loader = None
        self.pred_data = None

    def forward(self, tensor):
        """Run the wrapped network on a batch of graphs."""
        return self.model.forward_features(tensor)

    def predict_smiles(self, smiles, device: str = 'cpu', result_dir: str = './', **kwargs):
        """Predict a value for each SMILES string and save the result as CSV.

        NOTE(review): inference uses the model fetched from the
        ``Huhujingjing/custom-gcn`` hub repository, not the weights held by
        this instance.

        Args:
            smiles: Sequence of SMILES strings; invalid entries are skipped.
            device: Device on which inference runs.
            result_dir: Directory that receives ``gcn.csv``; it is created
                when it does not exist.
            **kwargs: Optional ``batch_size`` (default 1), ``shuffle``
                (default False), ``drop_last`` (default False) and
                ``num_workers`` (default 0) for the data loader.

        Returns:
            A human-readable report, which is also stored in ``self.output``.
        """
        batch_size = kwargs.pop('batch_size', 1)
        shuffle = kwargs.pop('shuffle', False)
        drop_last = kwargs.pop('drop_last', False)
        num_workers = kwargs.pop('num_workers', 0)

        self.dataset = self.process.get_data(smiles)
        self.pred_data = {
            'smiles': [],
            'pred': []
        }
        report = ["predicted samples num: {}\n".format(len(self.dataset))]

        if not self.dataset:
            # Nothing could be converted; indexing the first sample below
            # would raise IndexError, so report and stop here.
            self.data_loader = None
            report.append("no valid molecules to predict\n")
            self.output = "".join(report)
            return self.output

        # Load the hub model once and reuse it on later calls.
        if self.gcn_model is None:
            self.gcn_model = AutoModel.from_pretrained(
                "Huhujingjing/custom-gcn", trust_remote_code=True)
        self.gcn_model = self.gcn_model.to(device)
        self.gcn_model.eval()

        report.append("predicted samples:{}\n".format(self.dataset[0]))
        self.data_loader = DataLoader(self.dataset,
                                      batch_size=batch_size,
                                      shuffle=shuffle,
                                      drop_last=drop_last,
                                      num_workers=num_workers
                                      )

        with torch.no_grad():
            for batch in tqdm(self.data_loader):
                batch = batch.to(device)
                self.pred_data['smiles'] += batch['smiles']
                self.pred_data['pred'] += self.gcn_model(batch).cpu().tolist()

        # Always hand pandas a flat Python list, whatever the device string is.
        self.pred_data['pred'] = torch.tensor(self.pred_data['pred']).reshape(-1).tolist()
        pred_df = pd.DataFrame(self.pred_data)
        pred_df['pred'] = pred_df['pred'].apply(lambda x: round(x, 2))

        if result_dir:
            os.makedirs(result_dir, exist_ok=True)
        csv_path = os.path.join(result_dir, 'gcn.csv')
        pred_df.to_csv(csv_path, index=False)

        report.append('-' * 40 + '\n' + 'predicted result: \n' + '{}\n'.format(pred_df))
        report.append('-' * 40)
        report.append('\nsave predicted result to {}\n'.format(csv_path))
        self.output = "".join(report)
        return self.output


if __name__ == "__main__":
    gcn_config = GCNConfig(input_feature=64, emb_input=20, hidden_size=64, n_layers=6,
                           num_classes=1, smiles=["C", "CC", "CCC"],
                           processor_class="SmilesProcessor")
    gcnd = GCNModel(gcn_config)
    # map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    gcnd.model.load_state_dict(
        torch.load(r'G:\Trans_MXM\gcn_model\gcn.pt', map_location="cpu"))
    gcnd.save_pretrained("custom-gcn")