diff --git a/spacy/language.py b/spacy/language.py
index 80077bf69..fd616483b 100644
--- a/spacy/language.py
+++ b/spacy/language.py
@@ -739,6 +739,11 @@ class Language:
                 )
             )
         pipe = source.get_pipe(source_name)
+        # There is no actual solution here. Either the component has the right
+        # name for the source pipeline or the component has the right name for
+        # the current pipeline. This prioritizes the current pipeline.
+        if hasattr(pipe, "name"):
+            pipe.name = name
         # Make sure the source config is interpolated so we don't end up with
         # orphaned variables in our final config
         source_config = source.config.interpolate()
@@ -816,6 +821,7 @@ class Language:
         pipe_index = self._get_pipe_index(before, after, first, last)
         self._pipe_meta[name] = self.get_factory_meta(factory_name)
         self._components.insert(pipe_index, (name, pipe_component))
+        self._link_components()
         return pipe_component
 
     def _get_pipe_index(
@@ -951,6 +957,7 @@ class Language:
         if old_name in self._config["initialize"]["components"]:
             init_cfg = self._config["initialize"]["components"].pop(old_name)
             self._config["initialize"]["components"][new_name] = init_cfg
+        self._link_components()
 
     def remove_pipe(self, name: str) -> Tuple[str, PipeCallable]:
         """Remove a component from the pipeline.
@@ -974,6 +981,7 @@ class Language:
         # Make sure the name is also removed from the set of disabled components
         if name in self.disabled:
             self._disabled.remove(name)
+        self._link_components()
         return removed
 
     def disable_pipe(self, name: str) -> None:
@@ -1702,8 +1710,16 @@ class Language:
         # The problem is we need to do it during deserialization...And the
         # components don't receive the pipeline then. So this does have to be
         # here :(
+        # First, fix up all the internal component names in case they have
+        # gotten out of sync due to sourcing components from different
+        # pipelines, since find_listeners uses proc2.name for the listener
+        # map.
+        for name, proc in self.pipeline:
+            if hasattr(proc, "name"):
+                proc.name = name
         for i, (name1, proc1) in enumerate(self.pipeline):
             if isinstance(proc1, ty.ListenedToComponent):
+                proc1.listener_map = {}
                 for name2, proc2 in self.pipeline[i + 1 :]:
                     proc1.find_listeners(proc2)
 
@@ -1837,6 +1853,7 @@ class Language:
                         raw_config=raw_config,
                     )
                 else:
+                    assert "source" in pipe_cfg
                     # We need the sourced components to reference the same
                     # vocab without modifying the current vocab state **AND**
                     # we still want to load the source model vectors to perform
@@ -1856,6 +1873,10 @@ class Language:
                     source_name = pipe_cfg.get("component", pipe_name)
                     listeners_replaced = False
                     if "replace_listeners" in pipe_cfg:
+                        # Make sure that the listened-to component has the
+                        # state of the source pipeline listener map so that the
+                        # replace_listeners method below works as intended.
+                        source_nlps[model]._link_components()
                         for name, proc in source_nlps[model].pipeline:
                             if source_name in getattr(proc, "listening_components", []):
                                 source_nlps[model].replace_listeners(
@@ -1867,6 +1888,8 @@ class Language:
                     nlp.add_pipe(
                         source_name, source=source_nlps[model], name=pipe_name
                     )
+                    # At this point after nlp.add_pipe, the listener map
+                    # corresponds to the new pipeline.
                     if model not in source_nlp_vectors_hashes:
                         source_nlp_vectors_hashes[model] = hash(
                             source_nlps[model].vocab.vectors.to_bytes(
@@ -1921,27 +1944,6 @@ class Language:
             raise ValueError(
                 Errors.E942.format(name="pipeline_creation", value=type(nlp))
             )
-        # Detect components with listeners that are not frozen consistently
-        for name, proc in nlp.pipeline:
-            if isinstance(proc, ty.ListenedToComponent):
-                # Remove listeners not in the pipeline
-                listener_names = proc.listening_components
-                unused_listener_names = [
-                    ll for ll in listener_names if ll not in nlp.pipe_names
-                ]
-                for listener_name in unused_listener_names:
-                    for listener in proc.listener_map.get(listener_name, []):
-                        proc.remove_listener(listener, listener_name)
-
-                for listener_name in proc.listening_components:
-                    # e.g. tok2vec/transformer
-                    # If it's a component sourced from another pipeline, we check if
-                    # the tok2vec listeners should be replaced with standalone tok2vec
-                    # models (e.g. so component can be frozen without its performance
-                    # degrading when other components/tok2vec are updated)
-                    paths = sourced.get(listener_name, {}).get("replace_listeners", [])
-                    if paths:
-                        nlp.replace_listeners(name, listener_name, paths)
         return nlp
 
     def replace_listeners(
diff --git a/spacy/tests/pipeline/test_tok2vec.py b/spacy/tests/pipeline/test_tok2vec.py
index 76c7d6f62..998f0472c 100644
--- a/spacy/tests/pipeline/test_tok2vec.py
+++ b/spacy/tests/pipeline/test_tok2vec.py
@@ -192,8 +192,7 @@ def test_tok2vec_listener(with_vectors):
         for tag in t[1]["tags"]:
             tagger.add_label(tag)
 
-    # Check that the Tok2Vec component finds it listeners
-    assert tok2vec.listeners == []
+    # Check that the Tok2Vec component finds its listeners
     optimizer = nlp.initialize(lambda: train_examples)
     assert tok2vec.listeners == [tagger_tok2vec]
 
@@ -221,7 +220,6 @@ def test_tok2vec_listener_callback():
     assert nlp.pipe_names == ["tok2vec", "tagger"]
     tagger = nlp.get_pipe("tagger")
     tok2vec = nlp.get_pipe("tok2vec")
-    nlp._link_components()
     docs = [nlp.make_doc("A random sentence")]
     tok2vec.model.initialize(X=docs)
     gold_array = [[1.0 for tag in ["V", "Z"]] for word in docs]
@@ -430,29 +428,46 @@ def test_replace_listeners_from_config():
         nlp.to_disk(dir_path)
         base_model = str(dir_path)
         new_config = {
-            "nlp": {"lang": "en", "pipeline": ["tok2vec", "tagger", "ner"]},
+            "nlp": {
+                "lang": "en",
+                "pipeline": ["tok2vec", "tagger2", "ner3", "tagger4"],
+            },
             "components": {
                 "tok2vec": {"source": base_model},
-                "tagger": {
+                "tagger2": {
                     "source": base_model,
+                    "component": "tagger",
                     "replace_listeners": ["model.tok2vec"],
                 },
-                "ner": {"source": base_model},
+                "ner3": {
+                    "source": base_model,
+                    "component": "ner",
+                },
+                "tagger4": {
+                    "source": base_model,
+                    "component": "tagger",
+                },
             },
         }
         new_nlp = util.load_model_from_config(new_config, auto_fill=True)
         new_nlp.initialize(lambda: examples)
         tok2vec = new_nlp.get_pipe("tok2vec")
-        tagger = new_nlp.get_pipe("tagger")
-        ner = new_nlp.get_pipe("ner")
-        assert tok2vec.listening_components == ["ner"]
+        tagger = new_nlp.get_pipe("tagger2")
+        ner = new_nlp.get_pipe("ner3")
+        assert "ner" not in new_nlp.pipe_names
+        assert "tagger" not in new_nlp.pipe_names
+        assert tok2vec.listening_components == ["ner3", "tagger4"]
         assert any(isinstance(node, Tok2VecListener) for node in ner.model.walk())
         assert not any(isinstance(node, Tok2VecListener) for node in tagger.model.walk())
         t2v_cfg = new_nlp.config["components"]["tok2vec"]["model"]
         assert t2v_cfg["@architectures"] == "spacy.Tok2Vec.v2"
-        assert new_nlp.config["components"]["tagger"]["model"]["tok2vec"] == t2v_cfg
+        assert new_nlp.config["components"]["tagger2"]["model"]["tok2vec"] == t2v_cfg
         assert (
-            new_nlp.config["components"]["ner"]["model"]["tok2vec"]["@architectures"]
+            new_nlp.config["components"]["ner3"]["model"]["tok2vec"]["@architectures"]
             == "spacy.Tok2VecListener.v1"
         )
+        assert (
+            new_nlp.config["components"]["tagger4"]["model"]["tok2vec"]["@architectures"]
+            == "spacy.Tok2VecListener.v1"
+        )
 
@@ -544,3 +559,57 @@ def test_tok2vec_listeners_textcat():
     assert cats1["imperative"] < 0.9
     assert [t.tag_ for t in docs[0]] == ["V", "J", "N"]
     assert [t.tag_ for t in docs[1]] == ["N", "V", "J", "N"]
+
+
+def test_tok2vec_listener_source_link_name():
+    """The component's internal name and the tok2vec listener map correspond
+    to the most recently modified pipeline.
+    """
+    orig_config = Config().from_str(cfg_string_multi)
+    nlp1 = util.load_model_from_config(orig_config, auto_fill=True, validate=True)
+    assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"]
+
+    nlp2 = English()
+    nlp2.add_pipe("tok2vec", source=nlp1)
+    nlp2.add_pipe("tagger", name="tagger2", source=nlp1)
+
+    # there is no way to have the component have the right name for both
+    # pipelines, right now the most recently modified pipeline is prioritized
+    assert nlp1.get_pipe("tagger").name == nlp2.get_pipe("tagger2").name == "tagger2"
+
+    # there is no way to have the tok2vec have the right listener map for both
+    # pipelines, right now the most recently modified pipeline is prioritized
+    assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2"]
+    nlp2.add_pipe("ner", name="ner3", source=nlp1)
+    assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2", "ner3"]
+    nlp2.remove_pipe("ner3")
+    assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2"]
+    nlp2.remove_pipe("tagger2")
+    assert nlp2.get_pipe("tok2vec").listening_components == []
+
+    # at this point the tok2vec component corresponds to nlp2
+    assert nlp1.get_pipe("tok2vec").listening_components == []
+
+    # modifying the nlp1 pipeline syncs the tok2vec listener map back to nlp1
+    nlp1.add_pipe("sentencizer")
+    assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"]
+
+    # modifying nlp2 syncs it back to nlp2
+    nlp2.add_pipe("sentencizer")
+    assert nlp1.get_pipe("tok2vec").listening_components == []
+
+
+def test_tok2vec_listener_source_replace_listeners():
+    orig_config = Config().from_str(cfg_string_multi)
+    nlp1 = util.load_model_from_config(orig_config, auto_fill=True, validate=True)
+    assert nlp1.get_pipe("tok2vec").listening_components == ["tagger", "ner"]
+    nlp1.replace_listeners("tok2vec", "tagger", ["model.tok2vec"])
+    assert nlp1.get_pipe("tok2vec").listening_components == ["ner"]
+
+    nlp2 = English()
+    nlp2.add_pipe("tok2vec", source=nlp1)
+    assert nlp2.get_pipe("tok2vec").listening_components == []
+    nlp2.add_pipe("tagger", source=nlp1)
+    assert nlp2.get_pipe("tok2vec").listening_components == []
+    nlp2.add_pipe("ner", name="ner2", source=nlp1)
+    assert nlp2.get_pipe("tok2vec").listening_components == ["ner2"]
diff --git a/spacy/training/initialize.py b/spacy/training/initialize.py
index 39dc06b9e..3a46b6632 100644
--- a/spacy/training/initialize.py
+++ b/spacy/training/initialize.py
@@ -76,7 +76,8 @@ def init_nlp(config: Config, *, use_gpu: int = -1) -> "Language":
         with nlp.select_pipes(enable=resume_components):
             logger.info("Resuming training for: %s", resume_components)
             nlp.resume_training(sgd=optimizer)
-    # Make sure that listeners are defined before initializing further
+    # Make sure that internal component names are synced and listeners are
+    # defined before initializing further
    nlp._link_components()
    with nlp.select_pipes(disable=[*frozen_components, *resume_components]):
        if T["max_epochs"] == -1:
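
A minimal usage sketch (not part of the diff) of the sourcing behavior the
patch introduces; it mirrors test_tok2vec_listener_source_link_name above.
cfg_string_multi is the tok2vec + tagger + ner config string defined in
test_tok2vec.py, and the assertions only hold with these changes applied:

    from thinc.api import Config
    from spacy import util
    from spacy.lang.en import English

    # Source pipeline with a shared tok2vec listened to by tagger and ner
    # (cfg_string_multi comes from spacy/tests/pipeline/test_tok2vec.py).
    orig_config = Config().from_str(cfg_string_multi)
    nlp1 = util.load_model_from_config(orig_config, auto_fill=True, validate=True)

    # Source the tagger into a second pipeline under a new name. The component
    # object is shared between pipelines, so its internal name and the tok2vec
    # listener map now track the most recently modified pipeline (nlp2).
    nlp2 = English()
    nlp2.add_pipe("tok2vec", source=nlp1)
    nlp2.add_pipe("tagger", name="tagger2", source=nlp1)
    assert nlp1.get_pipe("tagger").name == "tagger2"
    assert nlp2.get_pipe("tok2vec").listening_components == ["tagger2"]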