# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import filecmp import os import shutil import tempfile from typing import Dict, Optional, Set, Union import pytest import torch from huggingface_hub.hf_api import ModelFilter from omegaconf import DictConfig, OmegaConf, open_dict from nemo.collections.asr.models import EncDecCTCModel, EncDecCTCModelBPE from nemo.collections.nlp.models import PunctuationCapitalizationModel from nemo.core.classes import ModelPT from nemo.core.connectors import save_restore_connector from nemo.utils.app_state import AppState from nemo.utils.exceptions import NeMoBaseException def classpath(cls): return f'{cls.__module__}.{cls.__name__}' def get_dir_size(path='.'): total = 0 with os.scandir(path) as it: for entry in it: if entry.is_file(): total += entry.stat().st_size elif entry.is_dir(): total += get_dir_size(entry.path) return total def get_size(path='.'): if os.path.isfile(path): return os.path.getsize(path) elif os.path.isdir(path): return get_dir_size(path) def getattr2(object, attr): if not '.' in attr: return getattr(object, attr) else: arr = attr.split('.') return getattr2(getattr(object, arr[0]), '.'.join(arr[1:])) class MockModel(ModelPT): def __init__(self, cfg, trainer=None): super(MockModel, self).__init__(cfg=cfg, trainer=trainer) self.w = torch.nn.Linear(10, 1) # mock temp file if 'temp_file' in self.cfg and self.cfg.temp_file is not None: self.setup_data_from_file(self.cfg.temp_file) else: self.temp_file = None self.temp_data = None def setup_data_from_file(self, temp_file): """ Load data from temp_file to `self.temp_data` Allows to test changing resource after instantiation """ with open_dict(self.cfg): self.cfg.temp_file = temp_file self.temp_file = self.register_artifact('temp_file', self.cfg.temp_file) with open(self.temp_file, 'r', encoding='utf-8') as f: self.temp_data = f.readlines() def change_stub_number(self, new_number: int): """ Change stub number in config, useful for testing nested models, since child can mutate config independently """ self.cfg.stub_number = new_number def forward(self, x): y = self.w(x) return y, self.cfg.temp_file def setup_training_data(self, train_data_config: Union[DictConfig, Dict]): self._train_dl = None def setup_validation_data(self, val_data_config: Union[DictConfig, Dict]): self._validation_dl = None def setup_test_data(self, test_data_config: Union[DictConfig, Dict]): self._test_dl = None @classmethod def list_available_models(cls): return [] class MockModelWithChildren(MockModel): """ Mock Model, can contain 2 children (other NeMo models) """ def __init__(self, cfg, trainer=None): super().__init__(cfg=cfg, trainer=trainer) # variant 1 for creating nested NeMo model: # load model directly from config # variant 2 for creating nested NeMo model: # - initialize child model from .nemo checkpoint, subconfig will be automatically saved # - after saving model will be restored directly from subconfig (attribute `config_field` of self.cfg) # child 1 self.child1_model: Optional[MockModel] # annotate type for IDE autocompletion and type checking if cfg.get("child1_model") is not None: self.register_nemo_submodule( "child1_model", config_field="child1_model", model=MockModel(self.cfg.child1_model), ) elif cfg.get("child1_model_path") is not None: self.register_nemo_submodule( "child1_model", config_field="child1_model", model=MockModel.restore_from(self.cfg.child1_model_path), ) else: self.child1_model = None # child 2 # can have sub-children self.child2_model: Optional[MockModelWithChildren] # annotate type for IDE autocompletion and type checking if cfg.get("child2_model") is not None: self.register_nemo_submodule( "child2_model", config_field="child2_model", model=MockModelWithChildren(self.cfg.child2_model), ) elif cfg.get("child2_model_path") is not None: self.register_nemo_submodule( "child2_model", config_field="child2_model", model=MockModelWithChildren.restore_from(self.cfg.child2_model_path), ) else: self.child2_model = None class MockModelWithChildEncDecCTCBPE(MockModel): """ Mock Model, will contain EncDecCTC model as a child Useful for testing nested models with children initialized from pretrained NeMo models """ def __init__(self, cfg, trainer=None): super().__init__(cfg=cfg, trainer=trainer) # variant 3 for creating nested NeMo model: # - initialize child model from pretrained NeMo model, subconfig will be automatically saved # - after saving model will be restored directly from subconfig (attribute `config_field` of self.cfg) self.ctc_model: EncDecCTCModelBPE # annotate type for IDE autocompletion and type checking if cfg.get("ctc_model", None) is not None: self.register_nemo_submodule( "ctc_model", config_field="ctc_model", model=EncDecCTCModelBPE(self.cfg.ctc_model), ) else: # model is mandatory assert cfg.get("ctc_model_pretrained", None) is not None self.register_nemo_submodule( "ctc_model", config_field="ctc_model", model=EncDecCTCModelBPE.from_pretrained(self.cfg.ctc_model_pretrained), ) class MockModelWithChildCustomConfigPath(MockModel): """ Mock Model, can contain 1 child Path in config is not equal to name of the attribute Config is stored in `child1_model_config` Child model is stored in `child1_model` attribute NB: This is not recommended if it's not necessary. But here we test that it works. """ def __init__(self, cfg, trainer=None): super().__init__(cfg=cfg, trainer=trainer) self.child1_model: Optional[MockModel] # annotate type for IDE autocompletion and type checking if cfg.get("child1_model_config") is not None: self.register_nemo_submodule( "child1_model", config_field="child1_model_config", model=MockModel(self.cfg.child1_model_config), ) else: self.child1_model = None class MockModelIncorrectWithNemoArtifact(MockModel): """ Incorrect model that tries to use .nemo model checkpoint as an artifact Expected to fail, since it is not supported """ def __init__(self, cfg, trainer=None): super().__init__(cfg=cfg, trainer=trainer) assert cfg.get("child_model_path") is not None # this will fail, since .nemo model checkpoint is not supported as an artifact child_model_path = self.register_artifact("child_model_path", cfg.child_model_path) self.child_model = ModelPT.restore_from(child_model_path) def _mock_model_config(): conf = {'temp_file': None, 'target': classpath(MockModel), 'stub_number': 1} conf = OmegaConf.create({'model': conf}) OmegaConf.set_struct(conf, True) return conf def _mock_model_with_children_config( child1_model_path: Optional[str] = None, child2_model_path: Optional[str] = None, child2_model_cfg: Optional[DictConfig] = None, ) -> DictConfig: """ Child 1 always constructed from .nemo model checkpoint (optional) Child 2 can be constructed directly from subconfig (optional) or from .nemo model checkpoint (optional) """ conf = { 'temp_file': None, 'target': classpath(MockModelWithChildren), 'child1_model': None, 'child1_model_path': child1_model_path, 'child2_model': child2_model_cfg, 'child2_model_path': child2_model_path, 'stub_number': 1, } conf = OmegaConf.create({'model': conf}) OmegaConf.set_struct(conf, True) return conf def _mock_model_with_child_encdecctcbpe_config(pretrained_model_name: str) -> DictConfig: conf = {'temp_file': None, 'ctc_model_pretrained': pretrained_model_name, 'stub_number': 1} conf = OmegaConf.create({'model': conf}) OmegaConf.set_struct(conf, True) return conf def _mock_model_with_child_custom_config_path_config(): conf = { 'temp_file': None, 'child1_model_config': _mock_model_config().model, 'target': classpath(MockModelWithChildCustomConfigPath), 'stub_number': 1, } conf = OmegaConf.create({'model': conf}) OmegaConf.set_struct(conf, True) return conf def _mock_model_incorrect_with_nemo_artifact_config(child_model_path: str): conf = {'temp_file': None, 'child_model_path': child_model_path, 'stub_number': 1} conf = OmegaConf.create({'model': conf}) OmegaConf.set_struct(conf, True) return conf class TestSaveRestore: def __test_restore_elsewhere( self, model: ModelPT, attr_for_eq_check: Set[str] = None, override_config_path: Optional[Union[str, DictConfig]] = None, map_location: Optional[Union[torch.device, str]] = None, strict: bool = False, return_config: bool = False, ): """Test's logic: 1. Save model into temporary folder (save_folder) 2. Copy .nemo file from save_folder to restore_folder 3. Delete save_folder 4. Attempt to restore from .nemo file in restore_folder and compare to original instance """ # Create a new temporary directory with tempfile.TemporaryDirectory() as restore_folder: with tempfile.TemporaryDirectory() as save_folder: save_folder_path = save_folder # Where model will be saved model_save_path = os.path.join(save_folder, f"{model.__class__.__name__}.nemo") model.save_to(save_path=model_save_path) # Where model will be restored from model_restore_path = os.path.join(restore_folder, f"{model.__class__.__name__}.nemo") shutil.copy(model_save_path, model_restore_path) # at this point save_folder should not exist assert save_folder_path is not None and not os.path.exists(save_folder_path) assert not os.path.exists(model_save_path) assert os.path.exists(model_restore_path) # attempt to restore model_copy = model.__class__.restore_from( restore_path=model_restore_path, map_location=map_location, strict=strict, return_config=return_config, override_config_path=override_config_path, ) if return_config: return model_copy assert model.num_weights == model_copy.num_weights if attr_for_eq_check is not None and len(attr_for_eq_check) > 0: for attr in attr_for_eq_check: assert getattr2(model, attr) == getattr2(model_copy, attr) return model_copy @pytest.mark.with_downloads() @pytest.mark.unit def test_EncDecCTCModel(self): # TODO: Switch to using named configs because here we don't really care about weights qn = EncDecCTCModel.from_pretrained(model_name="QuartzNet15x5Base-En") self.__test_restore_elsewhere(model=qn, attr_for_eq_check=set(["decoder._feat_in", "decoder._num_classes"])) @pytest.mark.with_downloads() @pytest.mark.unit def test_EncDecCTCModelBPE(self): # TODO: Switch to using named configs because here we don't really care about weights cn = EncDecCTCModelBPE.from_pretrained(model_name="stt_en_citrinet_256") self.__test_restore_elsewhere(model=cn, attr_for_eq_check=set(["decoder._feat_in", "decoder._num_classes"])) @pytest.mark.with_downloads() @pytest.mark.unit def test_EncDecCTCModelBPE_v2(self): # TODO: Switch to using named configs because here we don't really care about weights cn = EncDecCTCModelBPE.from_pretrained(model_name="stt_en_conformer_ctc_small") self.__test_restore_elsewhere(model=cn, attr_for_eq_check=set(["decoder._feat_in", "decoder._num_classes"])) @pytest.mark.with_downloads() @pytest.mark.unit def test_EncDecCTCModelBPE_v3(self): # TODO: Switch to using named configs because here we don't really care about weights cn = EncDecCTCModelBPE.from_pretrained(model_name="stt_en_squeezeformer_ctc_xsmall_ls") self.__test_restore_elsewhere(model=cn, attr_for_eq_check=set(["decoder._feat_in", "decoder._num_classes"])) @pytest.mark.with_downloads() @pytest.mark.unit def test_EncDecCTCModelBPE_HF(self): # TODO: Switch to using named configs because here we don't really care about weights # Specifically use ModelPT instead of EncDecCTCModelBPE in order to test target class resolution. cn = ModelPT.from_pretrained(model_name="nvidia/stt_en_citrinet_256_ls") self.__test_restore_elsewhere(model=cn, attr_for_eq_check=set(["decoder._feat_in", "decoder._num_classes"])) @pytest.mark.with_downloads() @pytest.mark.unit def test_PunctuationCapitalization(self): # TODO: Switch to using named configs because here we don't really care about weights pn = PunctuationCapitalizationModel.from_pretrained(model_name='punctuation_en_distilbert') self.__test_restore_elsewhere( model=pn, attr_for_eq_check=set(["punct_classifier.log_softmax", "punct_classifier.log_softmax"]) ) @pytest.mark.unit def test_mock_save_to_restore_from(self): with tempfile.NamedTemporaryFile('w') as empty_file: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() # Update config cfg = _mock_model_config() cfg.model.temp_file = empty_file.name # Create model model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') assert model.temp_file == empty_file.name # Save test model_copy = self.__test_restore_elsewhere(model, map_location='cpu') # Restore test diff = model.w.weight - model_copy.w.weight # because of caching - cache gets prepended assert os.path.basename(model_copy.temp_file).endswith(os.path.basename(model.temp_file)) assert diff.mean() <= 1e-9 # assert os.path.basename(model.temp_file) == model_copy.temp_file assert model_copy.temp_data == ["*****\n"] @pytest.mark.unit def test_mock_restore_from_config_only(self): with tempfile.NamedTemporaryFile('w') as empty_file: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() # Update config cfg = _mock_model_config() cfg.model.temp_file = os.path.abspath(empty_file.name) # Inject arbitrary config arguments (after creating model) with open_dict(cfg.model): cfg.model.xyz = "abc" # Create model model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') assert model.temp_file == empty_file.name model_copy = self.__test_restore_elsewhere(model, map_location='cpu', return_config=False) # because of caching - cache gets prepended assert os.path.basename(model_copy.temp_file).endswith(os.path.basename(model.temp_file)) # assert filecmp.cmp(model.temp_file, model_copy._cfg.temp_file) assert model.cfg.xyz == model_copy.cfg.xyz @pytest.mark.unit def test_mock_restore_from_config_override_with_OmegaConf(self): with tempfile.NamedTemporaryFile('w') as empty_file: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() # Update config cfg = _mock_model_config() cfg.model.temp_file = empty_file.name # Create model model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') assert model.temp_file == empty_file.name # Inject arbitrary config arguments (after creating model) with open_dict(cfg.model): cfg.model.xyz = "abc" # Save test (with overriden config as OmegaConf object) model_copy = self.__test_restore_elsewhere(model, map_location='cpu', override_config_path=cfg) # Restore test diff = model.w.weight - model_copy.w.weight assert diff.mean() <= 1e-9 assert model_copy.temp_data == ["*****\n"] # Test that new config has arbitrary content assert model_copy.cfg.xyz == "abc" @pytest.mark.unit def test_mock_restore_from_config_override_with_yaml(self): with tempfile.NamedTemporaryFile('w') as empty_file, tempfile.NamedTemporaryFile('w') as config_file: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() # Update config cfg = _mock_model_config() cfg.model.temp_file = empty_file.name # Create model model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') assert model.temp_file == empty_file.name # Inject arbitrary config arguments (after creating model) with open_dict(cfg.model): cfg.model.xyz = "abc" # Write new config into file OmegaConf.save(cfg, config_file) # Save test (with overriden config as OmegaConf object) model_copy = self.__test_restore_elsewhere( model, map_location='cpu', override_config_path=config_file.name ) # Restore test diff = model.w.weight - model_copy.w.weight assert diff.mean() <= 1e-9 assert filecmp.cmp(model.temp_file, model_copy.temp_file) assert model_copy.temp_data == ["*****\n"] # Test that new config has arbitrary content assert model_copy.cfg.xyz == "abc" @pytest.mark.unit def test_mock_save_to_restore_from_with_target_class(self): with tempfile.NamedTemporaryFile('w') as empty_file: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() # Update config cfg = _mock_model_config() cfg.model.temp_file = empty_file.name # Create model model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') # type: MockModel assert model.temp_file == empty_file.name # Save file using MockModel with tempfile.TemporaryDirectory() as save_folder: save_path = os.path.join(save_folder, "temp.nemo") model.save_to(save_path) # Restore test (using ModelPT as restorer) # This forces the target class = MockModel to be used as resolver model_copy = ModelPT.restore_from(save_path, map_location='cpu') # because of caching - cache gets prepended assert os.path.basename(model_copy.temp_file).endswith(os.path.basename(model.temp_file)) # assert filecmp.cmp(model.temp_file, model_copy.temp_file) # Restore test diff = model.w.weight - model_copy.w.weight assert diff.mean() <= 1e-9 assert isinstance(model_copy, MockModel) assert model_copy.temp_data == ["*****\n"] @pytest.mark.unit def test_mock_save_to_restore_from_multiple_models(self): with tempfile.NamedTemporaryFile('w') as empty_file, tempfile.NamedTemporaryFile('w') as empty_file2: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() empty_file2.writelines(["+++++\n"]) empty_file2.flush() # Update config + create ,pde;s cfg = _mock_model_config() cfg.model.temp_file = empty_file.name cfg2 = _mock_model_config() cfg2.model.temp_file = empty_file2.name # Create models model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') model2 = MockModel(cfg=cfg2.model, trainer=None) model2 = model2.to('cpu') assert model.temp_file == empty_file.name assert model2.temp_file == empty_file2.name # Save test model_copy = self.__test_restore_elsewhere(model, map_location='cpu') model2_copy = self.__test_restore_elsewhere(model2, map_location='cpu') # Restore test assert model_copy.temp_data == ["*****\n"] assert model2_copy.temp_data == ["+++++\n"] @pytest.mark.unit def test_mock_save_to_restore_from_multiple_models_inverted_order(self): with tempfile.NamedTemporaryFile('w') as empty_file, tempfile.NamedTemporaryFile('w') as empty_file2: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() empty_file2.writelines(["+++++\n"]) empty_file2.flush() # Update config + create ,pde;s cfg = _mock_model_config() cfg.model.temp_file = empty_file.name cfg2 = _mock_model_config() cfg2.model.temp_file = empty_file2.name # Create models model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') model2 = MockModel(cfg=cfg2.model, trainer=None) model2 = model2.to('cpu') assert model.temp_file == empty_file.name assert model2.temp_file == empty_file2.name # Save test (inverted order) model2_copy = self.__test_restore_elsewhere(model2, map_location='cpu') model_copy = self.__test_restore_elsewhere(model, map_location='cpu') # Restore test assert model_copy.temp_data == ["*****\n"] assert model2_copy.temp_data == ["+++++\n"] @pytest.mark.unit def test_mock_save_to_restore_chained(self): with tempfile.NamedTemporaryFile('w') as empty_file, tempfile.NamedTemporaryFile('w') as empty_file2: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() # Update config + create ,pde;s cfg = _mock_model_config() cfg.model.temp_file = empty_file.name # Create models model = MockModel(cfg=cfg.model, trainer=None) model = model.to('cpu') assert model.temp_file == empty_file.name def save_copy(model, save_folder, restore_folder): # Where model will be saved model_save_path = os.path.join(save_folder, f"{model.__class__.__name__}.nemo") model.save_to(save_path=model_save_path) # Where model will be restored from model_restore_path = os.path.join(restore_folder, f"{model.__class__.__name__}.nemo") shutil.copy(model_save_path, model_restore_path) return model_restore_path # Save test with tempfile.TemporaryDirectory() as level4: with tempfile.TemporaryDirectory() as level3: with tempfile.TemporaryDirectory() as level2: with tempfile.TemporaryDirectory() as level1: path = save_copy(model, level1, level2) model_copy2 = model.__class__.restore_from(path) path = save_copy(model_copy2, level2, level3) model_copy3 = model.__class__.restore_from(path) path = save_copy(model_copy3, level3, level4) model_copy = model.__class__.restore_from(path) # Restore test assert model_copy.temp_data == ["*****\n"] # AppState test appstate = AppState() metadata = appstate.get_model_metadata_from_guid(model_copy.model_guid) assert metadata.guid != model.model_guid assert metadata.restoration_path == path @pytest.mark.unit def test_mock_save_to_multiple_times(self): with tempfile.NamedTemporaryFile('w') as empty_file, tempfile.TemporaryDirectory() as tmpdir: # Write some data empty_file.writelines(["*****\n"]) empty_file.flush() # Update config cfg = _mock_model_config() cfg.model.temp_file = empty_file.name # Create model model = MockModel(cfg=cfg.model, trainer=None) # type: MockModel model = model.to('cpu') assert model.temp_file == empty_file.name # Save test model.save_to(os.path.join(tmpdir, 'save_0.nemo')) model.save_to(os.path.join(tmpdir, 'save_1.nemo')) model.save_to(os.path.join(tmpdir, 'save_2.nemo')) @pytest.mark.unit def test_multiple_model_save_restore_connector(self): class MySaveRestoreConnector(save_restore_connector.SaveRestoreConnector): def save_to(self, model, save_path: str): save_path = save_path.replace(".nemo", "_XYZ.nemo") super(MySaveRestoreConnector, self).save_to(model, save_path) with tempfile.TemporaryDirectory() as tmpdir: # Update config cfg = _mock_model_config() # Create model model = MockModel(cfg=cfg.model, trainer=None) model_with_custom_connector = MockModel(cfg=cfg.model, trainer=None) model_with_custom_connector._save_restore_connector = MySaveRestoreConnector() model_with_custom_connector.save_to(os.path.join(tmpdir, 'save_custom.nemo')) assert os.path.exists(os.path.join(tmpdir, 'save_custom_XYZ.nemo')) assert isinstance(model._save_restore_connector, save_restore_connector.SaveRestoreConnector) assert isinstance(model_with_custom_connector._save_restore_connector, MySaveRestoreConnector) assert type(MockModel._save_restore_connector) == save_restore_connector.SaveRestoreConnector @pytest.mark.unit def test_restore_from_save_restore_connector(self): class MySaveRestoreConnector(save_restore_connector.SaveRestoreConnector): def save_to(self, model, save_path: str): save_path = save_path.replace(".nemo", "_XYZ.nemo") super().save_to(model, save_path) class MockModelV2(MockModel): pass with tempfile.TemporaryDirectory() as tmpdir: # Update config cfg = _mock_model_config() # Create model save_path = os.path.join(tmpdir, 'save_custom.nemo') model_with_custom_connector = MockModel(cfg=cfg.model, trainer=None) model_with_custom_connector._save_restore_connector = MySaveRestoreConnector() model_with_custom_connector.save_to(save_path) assert os.path.exists(os.path.join(tmpdir, 'save_custom_XYZ.nemo')) restored_model = MockModelV2.restore_from( save_path.replace(".nemo", "_XYZ.nemo"), save_restore_connector=MySaveRestoreConnector() ) assert type(restored_model) == MockModelV2 assert type(restored_model._save_restore_connector) == MySaveRestoreConnector @pytest.mark.unit def test_mock_model_model_collision(self): # The usual pipeline is working just fine. cfg = _mock_model_config() model = MockModel(cfg=cfg.model, trainer=None) # type: MockModel model = model.to('cpu') # Let's create a custom config with a 'model.model' node. cfg = _mock_model_config() OmegaConf.set_struct(cfg, False) cfg.model.model = 'aaa' OmegaConf.set_struct(cfg, True) # Failing due to collision. with pytest.raises(ValueError, match="Creating model config node is forbidden"): model = MockModel(cfg=cfg.model, trainer=None) # type: MockModel model = model.to('cpu') @pytest.mark.unit @pytest.mark.parametrize("change_child_number", [False, True]) @pytest.mark.parametrize("child2_model_from_path", [False, True]) def test_mock_model_nested(self, change_child_number: bool, child2_model_from_path: bool): """ Test model with 2 children Model and each child can be saved/restored separately Model is constructed using saved child models (.nemo checkpoints) Args: change_child_number: if change_child_number is True, child model changes its config without notifying parent model, and saved parent model should handle this correctly. child2_model_from_path: if child2_model_from_path is True, child2 model is restored from .nemo checkpoint, otherwise constructed directly from config. Child1 model always loaded from checkpoint. """ # children - models without sub-children cfg_child1 = _mock_model_config() cfg_child2 = _mock_model_with_children_config() # no children # Create models child1 = MockModel(cfg=cfg_child1.model, trainer=None) child1 = child1.to('cpu') with tempfile.TemporaryDirectory() as tmpdir_parent: parent_path = os.path.join(tmpdir_parent, "parent.nemo") with tempfile.TemporaryDirectory() as tmpdir_child: # save children child1_path = os.path.join(tmpdir_child, 'child1.nemo') child1.save_to(child1_path) if child2_model_from_path: child2 = MockModelWithChildren(cfg=cfg_child2.model, trainer=None) child2 = child2.to('cpu') child2_path = os.path.join(tmpdir_child, 'child2.nemo') child2.save_to(child2_path) # create model with children using saved .nemo model checkpoints cfg_parent = _mock_model_with_children_config( child1_model_path=child1_path, child2_model_path=child2_path ) else: # child 2 model will be directly constructed from subconfig cfg_parent = _mock_model_with_children_config( child1_model_path=child1_path, child2_model_path=None, child2_model_cfg=cfg_child2.get("model") ) parent = MockModelWithChildren(cfg_parent.model) if change_child_number: parent.child2_model.change_stub_number(10) parent.save_to(parent_path) # restore, separate children checkpoints are not available here (tmpdir_child destroyed) parent = ModelPT.restore_from(parent_path) # check model is transparent, child models can be accessed and can be saved/restored separately _ = self.__test_restore_elsewhere(parent.child1_model, map_location='cpu') child2 = self.__test_restore_elsewhere(parent.child2_model, map_location='cpu') if change_child_number: assert child2.cfg.stub_number == 10 # check model itself can be saved/restored parent = self.__test_restore_elsewhere(parent, map_location='cpu') if change_child_number: assert parent.child2_model.cfg.stub_number == 10 @pytest.mark.unit @pytest.mark.parametrize("change_child_resource", [False, True]) @pytest.mark.parametrize("child2_model_from_path", [False, True]) def test_mock_model_nested_with_resources(self, change_child_resource: bool, child2_model_from_path: bool): """ Test nested model with 2 children: model and each child can be saved/restored separately child models and parent model itself contain resources Args: change_child_resource: if change_child_resource is True, child model resources are changed after instantiation parent model. child2_model_from_path: if child2_model_from_path is True, child2 model is restored from .nemo checkpoint, otherwise constructed directly from config. Child1 model always loaded from checkpoint. """ with tempfile.NamedTemporaryFile('w') as file_child1, tempfile.NamedTemporaryFile( 'w' ) as file_child2, tempfile.NamedTemporaryFile('w') as file_child2_other, tempfile.NamedTemporaryFile( 'w' ) as file_parent: # write text data, use these files as resources parent_data = ["*****\n"] child1_data = ["+++++\n"] child2_data = ["-----\n"] child2_data_other = [".....\n"] file_parent.writelines(parent_data) file_parent.flush() file_child1.writelines(child1_data) file_child1.flush() file_child2.writelines(child2_data) file_child2.flush() file_child2_other.writelines(child2_data_other) file_child2_other.flush() # construct child models with resources # create configs cfg_child1 = _mock_model_config() cfg_child1.model.temp_file = file_child1.name cfg_child2 = _mock_model_with_children_config() # no sub-children cfg_child2.model.temp_file = file_child2.name # create child models child1 = MockModel(cfg=cfg_child1.model, trainer=None) child1 = child1.to('cpu') with tempfile.TemporaryDirectory() as tmpdir_parent: parent_path = os.path.join(tmpdir_parent, "parent.nemo") with tempfile.TemporaryDirectory() as tmpdir_child: # save children child1_path = os.path.join(tmpdir_child, 'child1.nemo') child1.save_to(child1_path) if child2_model_from_path: child2 = MockModelWithChildren(cfg=cfg_child2.model, trainer=None) child2 = child2.to('cpu') child2_path = os.path.join(tmpdir_child, 'child2.nemo') child2.save_to(child2_path) # create model with children using saved .nemo model checkpoints cfg_parent = _mock_model_with_children_config( child1_model_path=child1_path, child2_model_path=child2_path ) else: # child 2 model will be directly constructed from subconfig cfg_parent = _mock_model_with_children_config( child1_model_path=child1_path, child2_model_path=None, child2_model_cfg=cfg_child2.get("model"), ) cfg_parent.model.temp_file = file_parent.name # add resource parent = MockModelWithChildren(cfg_parent.model) if change_child_resource: parent.child2_model.setup_data_from_file(file_child2_other.name) parent.save_to(parent_path) # restore, separate children checkpoints are not available here (tmpdir_child destroyed) parent = ModelPT.restore_from(parent_path) # check model is transparent, child models can be accessed and can be saved/restored separately child1 = self.__test_restore_elsewhere(parent.child1_model, map_location='cpu') child2 = self.__test_restore_elsewhere(parent.child2_model, map_location='cpu') # test parent save/restore parent = self.__test_restore_elsewhere(parent, map_location='cpu') # test resources # check separately restored child models assert child1.temp_data == child1_data if change_child_resource: assert child2.temp_data == child2_data_other else: assert child2.temp_data == child2_data # test parent model + child models assert parent.temp_data == parent_data assert parent.child1_model.temp_data == child1_data if change_child_resource: assert parent.child2_model.temp_data == child2_data_other else: assert parent.child2_model.temp_data == child2_data @pytest.mark.unit def test_mock_model_nested_with_resources_multiple_passes(self): """ Test nested model with 2 children: multiple save-restore passes child models and parent model itself contain resources """ with tempfile.NamedTemporaryFile('w') as file_child1, tempfile.NamedTemporaryFile( 'w' ) as file_child2, tempfile.NamedTemporaryFile('w') as file_child2_other, tempfile.NamedTemporaryFile( 'w' ) as file_parent: # write text data, use these files as resources parent_data = ["*****\n"] child1_data = ["+++++\n"] child2_data = ["-----\n"] child2_data_other = [".....\n"] file_parent.writelines(parent_data) file_parent.flush() file_child1.writelines(child1_data) file_child1.flush() file_child2.writelines(child2_data) file_child2.flush() file_child2_other.writelines(child2_data_other) file_child2_other.flush() # construct child models with resources # create configs cfg_child1 = _mock_model_config() cfg_child1.model.temp_file = file_child1.name cfg_child2 = _mock_model_with_children_config() # no sub-children cfg_child2.model.temp_file = file_child2.name # create child models child1 = MockModel(cfg=cfg_child1.model, trainer=None) child1 = child1.to('cpu') child2 = MockModelWithChildren(cfg=cfg_child2.model, trainer=None) child2 = child2.to('cpu') with tempfile.TemporaryDirectory() as tmpdir_parent1, tempfile.TemporaryDirectory() as tmpdir_parent2, tempfile.TemporaryDirectory() as tmpdir_parent3, tempfile.TemporaryDirectory() as tmpdir_parent4: parent_path1 = os.path.join(tmpdir_parent1, "parent.nemo") parent_path2 = os.path.join(tmpdir_parent2, "parent.nemo") with tempfile.TemporaryDirectory() as tmpdir_child: # save children child1_path = os.path.join(tmpdir_child, 'child1.nemo') child1.save_to(child1_path) child2_path = os.path.join(tmpdir_child, 'child2.nemo') child2.save_to(child2_path) # create model with children using saved "nemo" checkpoints cfg_parent = _mock_model_with_children_config( child1_model_path=child1_path, child2_model_path=child2_path ) cfg_parent.model.temp_file = file_parent.name # add resource parent = MockModelWithChildren(cfg_parent.model) # save-restore first pass # save to different locations parent.save_to(parent_path1) parent.save_to(parent_path2) # restore, separate children checkpoints are not available here (tmpdir_child destroyed) parent1 = ModelPT.restore_from(parent_path1) parent2 = ModelPT.restore_from(parent_path2) # check resources for parent in (parent1, parent2): assert parent.temp_data == parent_data assert parent.child1_model.temp_data == child1_data assert parent.child2_model.temp_data == child2_data del parent2 # use parent1 for second pass # save-restore second pass parent_path3 = os.path.join(tmpdir_parent3, "parent.nemo") parent_path4 = os.path.join(tmpdir_parent4, "parent.nemo") parent1.save_to(parent_path3) parent1.save_to(parent_path4) parent3 = ModelPT.restore_from(parent_path3) parent4 = ModelPT.restore_from(parent_path4) # check resources for parent in (parent3, parent4): assert parent.temp_data == parent_data assert parent.child1_model.temp_data == child1_data assert parent.child2_model.temp_data == child2_data @pytest.mark.unit def test_mock_model_nested_double_with_resources(self): """ test nested model: parent -> child_with_child -> child; model and each child can be saved/restored separately all models can contain resources """ with tempfile.NamedTemporaryFile('w') as file_child, tempfile.NamedTemporaryFile( 'w' ) as file_child_with_child, tempfile.NamedTemporaryFile('w') as file_parent: # write text data, use these files as resources parent_data = ["*****\n"] child_with_child_data = ["+++++\n"] child_data = ["-----\n"] file_parent.writelines(parent_data) file_parent.flush() file_child_with_child.writelines(child_with_child_data) file_child_with_child.flush() file_child.writelines(child_data) file_child.flush() # construct child model (leaf) with resource cfg_child = _mock_model_config() cfg_child.model.temp_file = file_child.name child = MockModel(cfg=cfg_child.model, trainer=None) child = child.to('cpu') with tempfile.TemporaryDirectory() as tmpdir_parent: parent_path = os.path.join(tmpdir_parent, "parent.nemo") with tempfile.TemporaryDirectory() as tmpdir_child_with_child: child_with_child_path = os.path.join(tmpdir_child_with_child, 'child_with_child.nemo') with tempfile.TemporaryDirectory() as tmpdir_child: # save child child_path = os.path.join(tmpdir_child, 'child.nemo') child.save_to(child_path) # create child model with child cfg_child_with_child = _mock_model_with_children_config( child1_model_path=None, child2_model_path=child_path ) cfg_child_with_child.model.temp_file = file_child_with_child.name child_with_child = MockModelWithChildren(cfg_child_with_child.model) child_with_child.save_to(child_with_child_path) # create parent model with child-with-child, leaf checkpoint is not available here cfg_parent = _mock_model_with_children_config( child1_model_path=None, child2_model_path=child_with_child_path ) cfg_parent.model.temp_file = file_parent.name parent = MockModelWithChildren(cfg_parent.model) parent.save_to(parent_path) # restore, separate children checkpoints are not available here # tmpdir_child, tmpdir_child_with_child are destroyed parent = ModelPT.restore_from(parent_path) # model is transparent, children and model itself can be saved/restored child = self.__test_restore_elsewhere(parent.child2_model.child2_model, map_location='cpu') child_with_child = self.__test_restore_elsewhere(parent.child2_model, map_location='cpu') parent = self.__test_restore_elsewhere(parent, map_location='cpu') # test resources for all restored models # leaf model assert child.temp_data == child_data # child with child assert child_with_child.temp_data == child_with_child_data assert child_with_child.child2_model.temp_data == child_data # parent assert parent.temp_data == parent_data assert parent.child2_model.temp_data == child_with_child_data assert parent.child2_model.child2_model.temp_data == child_data # check named_nemo_modules: parent -> child2 -> child2.child2, # tuples of (attribute_path, cfg_path, module) named_nemo_modules = list(parent.named_nemo_modules()) etalon_nemo_modules = [ ("", "", parent), ("child2_model", "child2_model", parent.child2_model), ("child2_model.child2_model", "child2_model.child2_model", parent.child2_model.child2_model), ] assert len(named_nemo_modules) == len(etalon_nemo_modules) for etalon, actual in zip(etalon_nemo_modules, named_nemo_modules): assert etalon[0] == actual[0] assert etalon[1] == actual[1] assert etalon[2] is actual[2] @pytest.mark.unit @pytest.mark.with_downloads def test_mock_model_nested_child_from_pretrained(self): """ Test nested model with child initialized from pretrained model """ cfg = _mock_model_with_child_encdecctcbpe_config("stt_en_conformer_ctc_small") parent = MockModelWithChildEncDecCTCBPE(cfg=cfg.model, trainer=None) with tempfile.TemporaryDirectory() as tmpdir_parent: parent_path = os.path.join(tmpdir_parent, "parent.nemo") # save, then restore parent.save_to(parent_path) parent = ModelPT.restore_from(parent_path) # test child can be saved/restored _ = self.__test_restore_elsewhere(parent.ctc_model, map_location='cpu') # test parent can be saved/restored parent = self.__test_restore_elsewhere(parent, map_location='cpu') assert isinstance(parent.ctc_model, EncDecCTCModel) @pytest.mark.unit def test_mock_model_nested_custom_config_field(self): """ Test nested model with custom config field not equal to attribute name Config is stored in `child1_model_config` Child model is stored in `child1_model` attribute """ with tempfile.NamedTemporaryFile('w') as file_child1, tempfile.NamedTemporaryFile('w') as file_parent: # write text data, use these files as resources parent_data = ["*****\n"] child1_data = ["+++++\n"] file_parent.writelines(parent_data) file_parent.flush() file_child1.writelines(child1_data) file_child1.flush() cfg = _mock_model_with_child_custom_config_path_config() cfg.model.temp_file = file_parent.name cfg.model.child1_model_config.temp_file = file_child1.name # construct parent model parent = MockModelWithChildCustomConfigPath(cfg=cfg.model, trainer=None) with tempfile.TemporaryDirectory() as tmpdir_parent: parent_path = os.path.join(tmpdir_parent, "parent.nemo") # save, then restore parent.save_to(parent_path) parent = ModelPT.restore_from(parent_path) # test child can be saved/restored _ = self.__test_restore_elsewhere(parent.child1_model, map_location='cpu') # test parent can be saved/restored parent = self.__test_restore_elsewhere(parent, map_location='cpu') # check data assert parent.temp_data == parent_data assert parent.child1_model.temp_data == child1_data # check named_nemo_modules: parent -> child, tuples of (attribute_path, cfg_path, module) named_nemo_modules = list(parent.named_nemo_modules()) etalon_nemo_modules = [("", "", parent), ("child1_model", "child1_model_config", parent.child1_model)] assert len(named_nemo_modules) == len(etalon_nemo_modules) for etalon, actual in zip(etalon_nemo_modules, named_nemo_modules): assert etalon[0] == actual[0] assert etalon[1] == actual[1] assert etalon[2] is actual[2] @pytest.mark.unit def test_using_nemo_checkpoint_as_artifact_disallowed(self): """ Test that using nemo checkpoint as artifact is disallowed """ cfg_child = _mock_model_config() child = MockModel(cfg=cfg_child.model, trainer=None).to("cpu") with tempfile.TemporaryDirectory() as tmpdir: child_path = os.path.join(tmpdir, "child.nemo") child.save_to(child_path) cfg_parent = _mock_model_incorrect_with_nemo_artifact_config(child_path) with pytest.raises(NeMoBaseException): # registering .nemo checkpoint as an artifact is not allowed _ = MockModelIncorrectWithNemoArtifact(cfg=cfg_parent.model, trainer=None) @pytest.mark.unit def test_restore_from_save_restore_connector_extracted_dir(self): class MySaveRestoreConnector(save_restore_connector.SaveRestoreConnector): def save_to(self, model, save_path: str): save_path = save_path.replace(".nemo", "_XYZ.nemo") super().save_to(model, save_path) class MockModelV2(MockModel): pass with tempfile.TemporaryDirectory() as extracted_tempdir: with tempfile.TemporaryDirectory() as tmpdir: # Update config cfg = _mock_model_config() # Create model save_path = os.path.join(tmpdir, 'save_custom.nemo') model_with_custom_connector = MockModel(cfg=cfg.model, trainer=None) model_with_custom_connector._save_restore_connector = MySaveRestoreConnector() model_with_custom_connector.save_to(save_path) nemo_filepath = os.path.join(tmpdir, 'save_custom_XYZ.nemo') assert os.path.exists(nemo_filepath) # extract the contents to this dir apriori # simulate by extracting now before calling restore_from connector = MySaveRestoreConnector() MySaveRestoreConnector._unpack_nemo_file(nemo_filepath, extracted_tempdir) assert get_size(extracted_tempdir) > 0 # delete the old directory and preserve only the new extracted directory (escape scope of old dir) # next, set the model's extracted directory path connector.model_extracted_dir = extracted_tempdir # note, we pass in the "old" nemo_filepath, stored somewhere other than the extracted directory # this nemo_filepath is no longer valid, and has been deleted. restored_model = MockModelV2.restore_from(nemo_filepath, save_restore_connector=connector) assert type(restored_model) == MockModelV2 assert type(restored_model._save_restore_connector) == MySaveRestoreConnector # assert models have correct restoration information and paths appstate = AppState() original_metadata = appstate.get_model_metadata_from_guid(model_with_custom_connector.model_guid) assert original_metadata.restoration_path is None restored_metadata = appstate.get_model_metadata_from_guid(restored_model.model_guid) assert restored_metadata.restoration_path is not None # assert that the restore path was the path of the pre-extracted directory # irrespective of whether an old `nemo_filepath` (which doesnt exist anymore) was passed to restore_from. assert extracted_tempdir in restored_metadata.restoration_path assert extracted_tempdir not in nemo_filepath assert not os.path.exists(nemo_filepath) # test for parameter equality model_with_custom_connector = model_with_custom_connector.to('cpu') restored_model = restored_model.to('cpu') original_state_dict = model_with_custom_connector.state_dict() restored_state_dict = restored_model.state_dict() for orig, restored in zip(original_state_dict.keys(), restored_state_dict.keys()): assert (original_state_dict[orig] - restored_state_dict[restored]).abs().mean() < 1e-6 @pytest.mark.unit def test_hf_model_filter(self): filt = ModelPT.get_hf_model_filter() assert isinstance(filt, ModelFilter) assert filt.library == 'nemo' @pytest.mark.with_downloads() @pytest.mark.unit def test_hf_model_info(self): filt = ModelPT.get_hf_model_filter() # check no override results model_infos = ModelPT.search_huggingface_models(model_filter=None) assert len(model_infos) > 0 # check with default override results (should match above) default_model_infos = ModelPT.search_huggingface_models(model_filter=filt) assert len(model_infos) == len(default_model_infos) @pytest.mark.with_downloads() @pytest.mark.unit def test_hf_model_info_with_card_data(self): filt = ModelPT.get_hf_model_filter() # check no override results model_infos = ModelPT.search_huggingface_models(model_filter=filt) assert len(model_infos) > 0 assert not hasattr(model_infos[0], 'cardData') # check overriden defaults filt.resolve_card_info = True model_infos = ModelPT.search_huggingface_models(model_filter=filt) assert len(model_infos) > 0 for info in model_infos: if hasattr(info, 'cardData'): assert info.cardData is not None break @pytest.mark.with_downloads() @pytest.mark.unit def test_hf_model_info_with_limited_results(self): filt = ModelPT.get_hf_model_filter() # check no override results model_infos = ModelPT.search_huggingface_models(model_filter=filt) assert len(model_infos) > 0 # check overriden defaults filt.limit_results = 5 new_model_infos = ModelPT.search_huggingface_models(model_filter=filt) assert len(new_model_infos) <= 5 assert len(new_model_infos) < len(model_infos)