From c067b5264cfb42b38f46068af7316297411b43d5 Mon Sep 17 00:00:00 2001 From: Adriane Boyd Date: Tue, 27 Jun 2023 10:47:07 +0200 Subject: [PATCH] Address issues with source with component names and replacing listeners (#12701) When sourcing a component, the object from the original pipeline is added to the new pipeline as the same object. This creates a situation where there are several attributes that cannot be in sync between the original pipeline and the new pipeline at the same time for this one object: * component.name * component.listener_map / component.listening_components for tok2vec and transformer When running replace_listeners on a component, the config is not updated correctly if the state of the component is incorrect for the current pipeline (in particular changes that should be applied from model.attrs["replace_listener_cfg"] as used in spacy-transformers) due to the fact that: * find_listeners relies on component.name to set the name in the listener_map * replace_listeners relies on listener_map to determine how to modify the configs In addition, there are several places where pipeline components are modified and the listener map and/or internal component names aren't currently updated. In cases where there is a component shared by two pipelines that cannot be in sync, this PR chooses to prioritize the most recently modified or initialized pipeline. There is no actual solution with the current source behavior that will make both pipelines usable, so the current pipeline is updated whenever components are added/renamed/removed or the pipeline is initialized for training. --- spacy/language.py | 44 +++++++------- spacy/tests/pipeline/test_tok2vec.py | 91 ++++++++++++++++++++++++---- spacy/training/initialize.py | 3 +- 3 files changed, 105 insertions(+), 33 deletions(-) diff --git a/spacy/language.py b/spacy/language.py index 80077bf69..fd616483b 100644 --- a/spacy/language.py +++ b/spacy/language.py @@ -739,6 +739,11 @@ class Language: ) ) pipe = source.get_pipe(source_name) + # There is no actual solution here. Either the component has the right + # name for the source pipeline or the component has the right name for + # the current pipeline. This prioritizes the current pipeline. + if hasattr(pipe, "name"): + pipe.name = name # Make sure the source config is interpolated so we don't end up with # orphaned variables in our final config source_config = source.config.interpolate() @@ -816,6 +821,7 @@ class Language: pipe_index = self._get_pipe_index(before, after, first, last) self._pipe_meta[name] = self.get_factory_meta(factory_name) self._components.insert(pipe_index, (name, pipe_component)) + self._link_components() return pipe_component def _get_pipe_index( @@ -951,6 +957,7 @@ class Language: if old_name in self._config["initialize"]["components"]: init_cfg = self._config["initialize"]["components"].pop(old_name) self._config["initialize"]["components"][new_name] = init_cfg + self._link_components() def remove_pipe(self, name: str) -> Tuple[str, PipeCallable]: """Remove a component from the pipeline. @@ -974,6 +981,7 @@ class Language: # Make sure the name is also removed from the set of disabled components if name in self.disabled: self._disabled.remove(name) + self._link_components() return removed def disable_pipe(self, name: str) -> None: @@ -1702,8 +1710,16 @@ class Language: # The problem is we need to do it during deserialization...And the # components don't receive the pipeline then. So this does have to be # here :( + # First, fix up all the internal component names in case they have + # gotten out of sync due to sourcing components from different + # pipelines, since find_listeners uses proc2.name for the listener + # map. + for name, proc in self.pipeline: + if hasattr(proc, "name"): + proc.name = name for i, (name1, proc1) in enumerate(self.pipeline): if isinstance(proc1, ty.ListenedToComponent): + proc1.listener_map = {} for name2, proc2 in self.pipeline[i + 1 :]: proc1.find_listeners(proc2) @@ -1837,6 +1853,7 @@ class Language: raw_config=raw_config, ) else: + assert "source" in pipe_cfg # We need the sourced components to reference the same # vocab without modifying the current vocab state **AND** # we still want to load the source model vectors to perform @@ -1856,6 +1873,10 @@ class Language: source_name = pipe_cfg.get("component", pipe_name) listeners_replaced = False if "replace_listeners" in pipe_cfg: + # Make sure that the listened-to component has the + # state of the source pipeline listener map so that the + # replace_listeners method below works as intended. + source_nlps[model]._link_components() for name, proc in source_nlps[model].pipeline: if source_name in getattr(proc, "listening_components", []): source_nlps[model].replace_listeners( @@ -1867,6 +1888,8 @@ class Language: nlp.add_pipe( source_name, source=source_nlps[model], name=pipe_name ) + # At this point after nlp.add_pipe, the listener map + # corresponds to the new pipeline. if model not in source_nlp_vectors_hashes: source_nlp_vectors_hashes[model] = hash( source_nlps[model].vocab.vectors.to_bytes( @@ -1921,27 +1944,6 @@ class Language: raise ValueError( Errors.E942.format(name="pipeline_creation", value=type(nlp)) ) - # Detect components with listeners that are not frozen consistently - for name, proc in nlp.pipeline: - if isinstance(proc, ty.ListenedToComponent): - # Remove listeners not in the pipeline - listener_names = proc.listening_components - unused_listener_names = [ - ll for ll in listener_names if ll not in nlp.pipe_names - ] - for listener_name in unused_listener_names: - for listener in proc.listener_map.get(listener_name, []): - proc.remove_listener(listener, listener_name) - - for listener_name in proc.listening_components: - # e.g. tok2vec/transformer - # If it's a component sourced from another pipeline, we check if - # the tok2vec listeners should be replaced with standalone tok2vec - # models (e.g. so component can be frozen without its performance - # degrading when other components/tok2vec are updated) - paths = sourced.get(listener_name, {}).get("replace_listeners", []) - if paths: - nlp.replace_listeners(name, listener_name, paths) return nlp def replace_listeners( diff --git a/spacy/tests/pipeline/test_tok2vec.py b/spacy/tests/pipeline/test_tok2vec.py index 76c7d6f62..998f0472c 100644 --- a/spacy/tests/pipeline/test_tok2vec.py +++ b/spacy/tests/pipeline/test_tok2vec.py @@ -192,8 +192,7 @@ def test_tok2vec_listener(with_vectors): for tag in t[1]["tags"]: tagger.add_label(tag) - # Check that the Tok2Vec component finds it listeners - assert tok2vec.listeners == [] + # Check that the Tok2Vec component finds its listeners optimizer = nlp.initialize(lambda: train_examples) assert tok2vec.listeners == [tagger_tok2vec] @@ -221,7 +220,6 @@ def test_tok2vec_listener_callback(): assert nlp.pipe_names == ["tok2vec", "tagger"] tagger = nlp.get_pipe("tagger") tok2vec = nlp.get_pipe("tok2vec") - nlp._link_components() docs = [nlp.make_doc("A random sentence")] tok2vec.model.initialize(X=docs) gold_array = [[1.0 for tag in ["V", "Z"]] for word in docs] @@ -430,29 +428,46 @@ def test_replace_listeners_from_config(): nlp.to_disk(dir_path) base_model = str(dir_path) new_config = { - "nlp": {"lang": "en", "pipeline": ["tok2vec", "tagger", "ner"]}, + "nlp": { + "lang": "en", + "pipeline": ["tok2vec", "tagger2", "ner3", "tagger4"], + }, "components": { "tok2vec": {"source": base_model}, - "tagger": { + "tagger2": { "source": base_model, + "component": "tagger", "replace_listeners": ["model.tok2vec"], }, - "ner": {"source": base_model}, + "ner3": { + "source": base_model, + "component": "ner", + }, + "tagger4": { + "source": base_model, + "component": "tagger", + }, }, } new_nlp = util.load_model_from_config(new_config, auto_fill=True) new_nlp.initialize(lambda: examples) tok2vec = new_nlp.get_pipe("tok2vec") - tagger = new_nlp.get_pipe("tagger") - ner = new_nlp.get_pipe("ner") - assert tok2vec.listening_components == ["ner"] + tagger = new_nlp.get_pipe("tagger2") + ner = new_nlp.get_pipe("ner3") + assert "ner" not in new_nlp.pipe_names + assert "tagger" not in new_nlp.pipe_names + assert tok2vec.listening_components == ["ner3", "tagger4"] assert any(isinstance(node, Tok2VecListener) for node in ner.model.walk()) assert not any(isinstance(node, Tok2VecListener) for node in tagger.model.walk()) t2v_cfg = new_nlp.config["components"]["tok2vec"]["model"] assert t2v_cfg["@architectures"] == "spacy.Tok2Vec.v2" - assert new_nlp.config["components"]["tagger"]["model"]["tok2vec"] == t2v_cfg + assert new_nlp.config["components"]["tagger2"]["model"]["tok2vec"] == t2v_cfg assert ( - new_nlp.config["components"]["ner"]["model"]["tok2vec"]["@architectures"] + new_nlp.config["components"]["ner3"]["model"]["tok2vec"]["@architectures"] + == "spacy.Tok2VecListener.v1" + ) + assert ( + new_nlp.config["components"]["tagger4"]["model"]["tok2vec"]["@architectures"] == "spacy.Tok2VecListener.v1" ) @@ -544,3 +559,57 @@ def test_tok2vec_listeners_textcat(): assert cats1["imperative"] < 0.9 assert [t.tag_ for t in docs[0]] == ["V", "J", "N"] assert [t.tag_ for t in docs[1]] == ["N", "V", "J", "N"] + + +def test_tok2vec_listener_source_link_name(): + """The component's internal name and the tok2vec listener map correspond + to the most recently modified pipeline. + """ + orig_config = Config().from_str(cfg_string_multi) + nlp1 = util.load_model_from_config(orig_config, auto_fill=True, validate=True) + assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"] + + nlp2 = English() + nlp2.add_pipe("tok2vec", source=nlp1) + nlp2.add_pipe("tagger", name="tagger2", source=nlp1) + + # there is no way to have the component have the right name for both + # pipelines, right now the most recently modified pipeline is prioritized + assert nlp1.get_pipe("tagger").name == nlp2.get_pipe("tagger2").name == "tagger2" + + # there is no way to have the tok2vec have the right listener map for both + # pipelines, right now the most recently modified pipeline is prioritized + assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2"] + nlp2.add_pipe("ner", name="ner3", source=nlp1) + assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2", "ner3"] + nlp2.remove_pipe("ner3") + assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2"] + nlp2.remove_pipe("tagger2") + assert nlp2.get_pipe("tok2vec").listening_components == [] + + # at this point the tok2vec component corresponds to nlp2 + assert nlp1.get_pipe("tok2vec").listening_components == [] + + # modifying the nlp1 pipeline syncs the tok2vec listener map back to nlp1 + nlp1.add_pipe("sentencizer") + assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"] + + # modifying nlp2 syncs it back to nlp2 + nlp2.add_pipe("sentencizer") + assert nlp1.get_pipe("tok2vec").listening_components == [] + + +def test_tok2vec_listener_source_replace_listeners(): + orig_config = Config().from_str(cfg_string_multi) + nlp1 = util.load_model_from_config(orig_config, auto_fill=True, validate=True) + assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"] + nlp1.replace_listeners("tok2vec", "tagger", ["model.tok2vec"]) + assert nlp1.get_pipe("tok2vec").listening_components == ["ner"] + + nlp2 = English() + nlp2.add_pipe("tok2vec", source=nlp1) + assert nlp2.get_pipe("tok2vec").listening_components == [] + nlp2.add_pipe("tagger", source=nlp1) + assert nlp2.get_pipe("tok2vec").listening_components == [] + nlp2.add_pipe("ner", name="ner2", source=nlp1) + assert nlp2.get_pipe("tok2vec").listening_components == ["ner2"] diff --git a/spacy/training/initialize.py b/spacy/training/initialize.py index 39dc06b9e..3a46b6632 100644 --- a/spacy/training/initialize.py +++ b/spacy/training/initialize.py @@ -76,7 +76,8 @@ def init_nlp(config: Config, *, use_gpu: int = -1) -> "Language": with nlp.select_pipes(enable=resume_components): logger.info("Resuming training for: %s", resume_components) nlp.resume_training(sgd=optimizer) - # Make sure that listeners are defined before initializing further + # Make sure that internal component names are synced and listeners are + # defined before initializing further nlp._link_components() with nlp.select_pipes(disable=[*frozen_components, *resume_components]): if T["max_epochs"] == -1: