Skip to content

nautobot.apps.jobs

Nautobot Jobs API.

nautobot.apps.jobs.BaseJob

Bases: Task

Base model for jobs.

Users can subclass this directly if they want to provide their own base class for implementing multiple jobs with shared functionality; if no such sharing is required, use Job class instead.

Jobs must define at minimum a run method.

Source code in nautobot/extras/jobs.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
class BaseJob(Task):
    """Base model for jobs.

    Users can subclass this directly if they want to provide their own base class for implementing multiple jobs
    with shared functionality; if no such sharing is required, use Job class instead.

    Jobs must define at minimum a run method.
    """

    class Meta:
        """
        Metaclass attributes - subclasses can define any or all of the following attributes:

        - name (str)
        - description (str)
        - hidden (bool)
        - field_order (list)
        - approval_required (bool)
        - soft_time_limit (int)
        - time_limit (int)
        - has_sensitive_variables (bool)
        - task_queues (list)
        - dryrun_default (bool)
        - read_only (bool)
        """

    def __init__(self):
        # One logger per job module; records are grouped in the UI via the `extra={"grouping": ...}` key
        # used throughout this class.
        # NOTE(review): super().__init__() is not called here -- presumably the Task base class needs no
        # instance initialization; confirm against the celery Task implementation.
        self.logger = get_task_logger(self.__module__)

    def __call__(self, *args, **kwargs):
        """Execute the job: deserialize inputs, set up the object-change context, and invoke `run()`.

        When `nautobot_job_profile` is set in the celery kwargs, the run is additionally profiled
        with cProfile and the stats are dumped to a temp file.
        """
        # Attempt to resolve serialized data back into original form by creating querysets or model instances
        # If we fail to find any objects, we consider this a job execution error, and fail.
        # This might happen when a job sits on the queue for a while (i.e. scheduled) and data has changed
        # or it might be bad input from an API request, or manual execution.
        try:
            deserialized_kwargs = self.deserialize_data(kwargs)
        except Exception as err:
            raise RunJobTaskFailed("Error initializing job") from err
        # Job hook receivers get a distinct change context so their object changes are attributed correctly.
        if isinstance(self, JobHookReceiver):
            change_context = ObjectChangeEventContextChoices.CONTEXT_JOB_HOOK
        else:
            change_context = ObjectChangeEventContextChoices.CONTEXT_JOB

        with web_request_context(user=self.user, context_detail=self.class_path, context=change_context):
            if self.celery_kwargs.get("nautobot_job_profile", False) is True:
                import cProfile

                # TODO: This should probably be available as a file download rather than dumped to the hard drive.
                # Pending this: https://github.com/nautobot/nautobot/issues/3352
                profiling_path = f"{tempfile.gettempdir()}/nautobot-jobresult-{self.job_result.id}.pstats"
                self.logger.info(
                    "Writing profiling information to %s.", profiling_path, extra={"grouping": "initialization"}
                )

                with cProfile.Profile() as pr:
                    try:
                        output = self.run(*args, **deserialized_kwargs)
                    except Exception as err:
                        # Dump stats even on failure so the partial profile isn't lost.
                        pr.dump_stats(profiling_path)
                        raise err
                    else:
                        pr.dump_stats(profiling_path)
                        return output
            else:
                return self.run(*args, **deserialized_kwargs)

    def __str__(self):
        """Display this job by its (class-property) name."""
        return f"{self.name}"

    # See https://github.com/PyCQA/pylint-django/issues/240 for why we have a pylint disable on each classproperty below

    # TODO(jathan): Could be interesting for custom stuff when the Job is
    # enabled in the database and then therefore registered in Celery
    @classmethod
    def on_bound(cls, app):
        """Called when the task is bound to an app.

        Note:
            This class method can be defined to do additional actions when
            the task class is bound to an app.

        The default implementation is a no-op.
        """

    # TODO(jathan): Could be interesting for showing the Job's class path as the
    # shadow name vs. the Celery task_name?
    def shadow_name(self, args, kwargs, options):
        """Override for custom task name in worker logs/monitoring.

        Example:
                from celery.utils.imports import qualname

                def shadow_name(task, args, kwargs, options):
                    return qualname(args[0])

                @app.task(shadow_name=shadow_name, serializer='pickle')
                def apply_function_async(fun, *args, **kwargs):
                    return fun(*args, **kwargs)

        Arguments:
            args (Tuple): Task positional arguments.
            kwargs (Dict): Task keyword arguments.
            options (Dict): Task execution options.

        Returns:
            (None): The default implementation returns None -- presumably celery then falls back to
                the regular task name; confirm against celery's shadow_name handling.
        """

    def before_start(self, task_id, args, kwargs):
        """Handler called before the task starts.

        Arguments:
            task_id (str): Unique id of the task to execute.
            args (Tuple): Original arguments for the task to execute.
            kwargs (Dict): Original keyword arguments for the task to execute.

        Returns:
            (None): The return value of this handler is ignored.

        Raises:
            RunJobTaskFailed: If the associated JobResult or Job database record cannot be found,
                or if the Job is not enabled.
        """
        # Celery reuses task instances across runs, so discard cached properties from any previous run.
        self.clear_cache()

        try:
            self.job_result
        except ObjectDoesNotExist as err:
            raise RunJobTaskFailed(f"Unable to find associated job result for job {task_id}") from err

        try:
            self.job_model
        except ObjectDoesNotExist as err:
            raise RunJobTaskFailed(f"Unable to find associated job model for job {task_id}") from err

        if not self.job_model.enabled:
            self.logger.error(
                "Job %s is not enabled to be run!",
                self.job_model,
                extra={"object": self.job_model, "grouping": "initialization"},
            )
            raise RunJobTaskFailed(f"Job {self.job_model} is not enabled to be run!")

        # Sanity-check the configured limits; database values take precedence over celery settings.
        # As the warning text below explains, a hard limit at or below the soft limit causes a silent failure.
        soft_time_limit = self.job_model.soft_time_limit or settings.CELERY_TASK_SOFT_TIME_LIMIT
        time_limit = self.job_model.time_limit or settings.CELERY_TASK_TIME_LIMIT
        if time_limit <= soft_time_limit:
            self.logger.warning(
                "The hard time limit of %s seconds is less than "
                "or equal to the soft time limit of %s seconds. "
                "This job will fail silently after %s seconds.",
                time_limit,
                soft_time_limit,
                time_limit,
                extra={"grouping": "initialization"},
            )

        self.logger.info("Running job", extra={"grouping": "initialization"})

    def run(self, *args, **kwargs):
        """
        Method invoked when this Job is run.

        Subclasses must override this; the base implementation always raises NotImplementedError.
        """
        raise NotImplementedError("Jobs must define the run method.")

    def on_success(self, retval, task_id, args, kwargs):
        """Success handler.

        Run by the worker if the task executes successfully.

        The default implementation does nothing; subclasses may override.

        Arguments:
            retval (Any): The return value of the task.
            task_id (str): Unique id of the executed task.
            args (Tuple): Original arguments for the executed task.
            kwargs (Dict): Original keyword arguments for the executed task.

        Returns:
            (None): The return value of this handler is ignored.
        """

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Retry handler.

        This is run by the worker when the task is to be retried.

        The default implementation does nothing; subclasses may override.

        Arguments:
            exc (Exception): The exception sent to :meth:`retry`.
            task_id (str): Unique id of the retried task.
            args (Tuple): Original arguments for the retried task.
            kwargs (Dict): Original keyword arguments for the retried task.
            einfo (~billiard.einfo.ExceptionInfo): Exception information.

        Returns:
            (None): The return value of this handler is ignored.
        """

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Error handler.

        This is run by the worker when the task fails.

        The default implementation does nothing; subclasses may override.

        Arguments:
            exc (Exception): The exception raised by the task.
            task_id (str): Unique id of the failed task.
            args (Tuple): Original arguments for the task that failed.
            kwargs (Dict): Original keyword arguments for the task that failed.
            einfo (~billiard.einfo.ExceptionInfo): Exception information.

        Returns:
            (None): The return value of this handler is ignored.
        """

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        """
        Handler called after the task returns.

        Cleans up any FileProxy objects created for FileVar inputs, logs completion, and
        delegates to the base implementation (required for chords to function).

        Parameters
            status - Current task state.
            retval - Task return value/exception.
            task_id - Unique id of the task.
            args - Original arguments for the task that returned.
            kwargs - Original keyword arguments for the task that returned.

        Keyword Arguments
            einfo - ExceptionInfo instance, containing the traceback (if any).

        Returns:
            (None): The return value of this handler is ignored.
        """

        # Cleanup FileProxy objects.
        # Guard with a membership test so a FileVar field that was never supplied in kwargs
        # doesn't raise a KeyError here and mask the job's actual outcome.
        file_fields = list(self._get_file_vars())
        file_ids = [kwargs[f] for f in file_fields if f in kwargs]
        if file_ids:
            self.delete_files(*file_ids)

        self.logger.info("Job completed", extra={"grouping": "post_run"})

        # TODO(gary): document this in job author docs
        # Super.after_return must be called for chords to function properly
        super().after_return(status, retval, task_id, args, kwargs, einfo=einfo)

    def apply(
        self,
        args=None,
        kwargs=None,
        link=None,
        link_error=None,
        task_id=None,
        retries=None,
        throw=None,
        logfile=None,
        loglevel=None,
        headers=None,
        **options,
    ):
        """Fix celery's apply method to propagate options to the task result.

        Runs the task eagerly (in-process), mirroring celery's own Task.apply(), but additionally
        attaches the full `options` dict to the request as "properties" so it reaches the task result.
        """
        # trace imports Task, so need to import inline.
        from celery.app.trace import build_tracer

        app = self._get_app()
        args = args or ()
        kwargs = kwargs or {}
        task_id = task_id or uuid()
        retries = retries or 0
        if throw is None:
            throw = app.conf.task_eager_propagates

        # Make sure we get the task instance, not class.
        task = app._tasks[self.name]

        # Build the simulated request dict that the tracer will execute against.
        request = {
            "id": task_id,
            "retries": retries,
            "is_eager": True,
            "logfile": logfile,
            "loglevel": loglevel or 0,
            "hostname": gethostname(),
            "callbacks": maybe_list(link),
            "errbacks": maybe_list(link_error),
            "headers": headers,
            "ignore_result": options.get("ignore_result", False),
            "delivery_info": {
                "is_eager": True,
                "exchange": options.get("exchange"),
                "routing_key": options.get("routing_key"),
                "priority": options.get("priority"),
            },
            "properties": options,  # one line fix to overloaded method
        }
        if "stamped_headers" in options:
            request["stamped_headers"] = maybe_list(options["stamped_headers"])
            request["stamps"] = {header: maybe_list(options.get(header, [])) for header in request["stamped_headers"]}

        tb = None
        tracer = build_tracer(
            task.name,
            task,
            eager=True,
            propagate=throw,
            app=self._get_app(),
        )
        ret = tracer(task_id, args, kwargs, request)
        retval = ret.retval
        # Unwrap exception results into (exception, traceback) pairs.
        if isinstance(retval, ExceptionInfo):
            retval, tb = retval.exception, retval.traceback
            if isinstance(retval, ExceptionWithTraceback):
                retval = retval.exc
        # A Retry with a signature means the task asked to be re-run; apply it with an incremented count.
        if isinstance(retval, Retry) and retval.sig is not None:
            return retval.sig.apply(retries=retries + 1)
        state = states.SUCCESS if ret.info is None else ret.info.state
        return EagerResult(task_id, retval, state, traceback=tb)

    @final
    @classproperty
    def file_path(cls) -> str:  # pylint: disable=no-self-argument
        """Path of the source file that defines this job class (via inspect.getfile)."""
        return inspect.getfile(cls)

    @final
    @classproperty
    def class_path(cls) -> str:  # pylint: disable=no-self-argument
        """
        Unique identifier of a specific Job class, formatted as <module_name>.<ClassName>.

        Examples:
        my_script.MyScript - Local Job
        nautobot.core.jobs.MySystemJob - System Job
        my_plugin.jobs.MyPluginJob - App-provided Job
        git_repository.jobs.myjob.MyJob - GitRepository Job
        """
        return ".".join((cls.__module__, cls.__name__))

    @final
    @classproperty
    def class_path_dotted(cls) -> str:  # pylint: disable=no-self-argument
        """
        Dotted class_path, suitable for use in things like Python logger names.

        Deprecated as of Nautobot 2.0: just use .class_path instead.
        (Kept only for backwards compatibility; simply delegates to `class_path`.)
        """
        return cls.class_path

    @final
    @classproperty
    def class_path_js_escaped(cls) -> str:  # pylint: disable=no-self-argument
        """
        Escape various characters so that the class_path can be used as a jQuery selector.

        Currently only the `.` separators are escaped (as `\\.`).
        """
        return cls.class_path.replace(".", r"\.")

    @final
    @classproperty
    def grouping(cls) -> str:  # pylint: disable=no-self-argument
        """Grouping for this job: the defining module's `name` attribute if set, else its dotted module name."""
        module = inspect.getmodule(cls)
        # Presumably modules loaded from sources such as Git repositories set a friendlier `name`
        # attribute -- confirm against the job-loading code.
        return getattr(module, "name", module.__name__)

    @final
    @classmethod
    def _get_meta_attr_and_assert_type(cls, attr_name, default, expected_type):
        """Read Meta.<attr_name> (falling back to `default`) and enforce that it has the expected type."""
        value = getattr(cls.Meta, attr_name, default)
        if isinstance(value, expected_type):
            return value
        raise TypeError(f"Meta.{attr_name} should be {expected_type}, not {type(value)}")

    @final
    @classproperty
    def name(cls) -> str:  # pylint: disable=no-self-argument
        """Human-readable name from Meta.name, defaulting to the class name."""
        return cls._get_meta_attr_and_assert_type("name", cls.__name__, expected_type=str)

    @final
    @classproperty
    def description(cls) -> str:  # pylint: disable=no-self-argument
        """Meta.description, dedented and stripped; defaults to the empty string."""
        return dedent(cls._get_meta_attr_and_assert_type("description", "", expected_type=str)).strip()

    @final
    @classproperty
    def description_first_line(cls) -> str:  # pylint: disable=no-self-argument
        """First line of the (possibly multi-line) description, or "" if there is no description."""
        if cls.description:  # pylint: disable=using-constant-test
            return cls.description.splitlines()[0]
        return ""

    @final
    @classproperty
    def dryrun_default(cls) -> bool:  # pylint: disable=no-self-argument
        """Meta.dryrun_default (bool), defaulting to False."""
        return cls._get_meta_attr_and_assert_type("dryrun_default", False, expected_type=bool)

    @final
    @classproperty
    def hidden(cls) -> bool:  # pylint: disable=no-self-argument
        """Meta.hidden (bool), defaulting to False."""
        return cls._get_meta_attr_and_assert_type("hidden", False, expected_type=bool)

    @final
    @classproperty
    def field_order(cls):  # pylint: disable=no-self-argument
        """Meta.field_order (list or tuple of field names), defaulting to an empty list."""
        return cls._get_meta_attr_and_assert_type("field_order", [], expected_type=(list, tuple))

    @final
    @classproperty
    def read_only(cls) -> bool:  # pylint: disable=no-self-argument
        """Meta.read_only (bool), defaulting to False."""
        return cls._get_meta_attr_and_assert_type("read_only", False, expected_type=bool)

    @final
    @classproperty
    def approval_required(cls) -> bool:  # pylint: disable=no-self-argument
        """Meta.approval_required (bool), defaulting to False."""
        return cls._get_meta_attr_and_assert_type("approval_required", False, expected_type=bool)

    @final
    @classproperty
    def soft_time_limit(cls) -> int:  # pylint: disable=no-self-argument
        """Meta.soft_time_limit in seconds (int), defaulting to 0 (falsy, i.e. "use the global setting")."""
        return cls._get_meta_attr_and_assert_type("soft_time_limit", 0, expected_type=int)

    @final
    @classproperty
    def time_limit(cls) -> int:  # pylint: disable=no-self-argument
        """Meta.time_limit in seconds (int), defaulting to 0 (falsy, i.e. "use the global setting")."""
        return cls._get_meta_attr_and_assert_type("time_limit", 0, expected_type=int)

    @final
    @classproperty
    def has_sensitive_variables(cls) -> bool:  # pylint: disable=no-self-argument
        """Meta.has_sensitive_variables (bool); note this defaults to True (opt-out)."""
        return cls._get_meta_attr_and_assert_type("has_sensitive_variables", True, expected_type=bool)

    @final
    @classproperty
    def supports_dryrun(cls) -> bool:  # pylint: disable=no-self-argument
        """True if this job class declares a `dryrun` attribute that is a DryRunVar."""
        return isinstance(getattr(cls, "dryrun", None), DryRunVar)

    @final
    @classproperty
    def task_queues(cls) -> list:  # pylint: disable=no-self-argument
        """Meta.task_queues (list or tuple of queue names), defaulting to an empty list."""
        return cls._get_meta_attr_and_assert_type("task_queues", [], expected_type=(list, tuple))

    @final
    @classproperty
    def properties_dict(cls) -> dict:  # pylint: disable=no-self-argument
        """
        Return all relevant classproperties as a dict.

        Used for convenient rendering into job_edit.html via the `json_script` template tag.
        """
        # Keys mirror the overridable job attributes surfaced on the job edit form.
        return {
            "name": cls.name,
            "grouping": cls.grouping,
            "description": cls.description,
            "approval_required": cls.approval_required,
            "hidden": cls.hidden,
            "soft_time_limit": cls.soft_time_limit,
            "time_limit": cls.time_limit,
            "has_sensitive_variables": cls.has_sensitive_variables,
            "task_queues": cls.task_queues,
        }

    @final
    @classproperty
    def registered_name(cls) -> str:  # pylint: disable=no-self-argument
        """Dotted <module>.<ClassName> identifier; computes the same value as `class_path`."""
        return f"{cls.__module__}.{cls.__name__}"

    @classmethod
    def _get_vars(cls):
        """
        Return a dictionary of ScriptVariable attributes defined on this class and any of its base classes.

        Ordering follows definition order, with variables from base classes listed before
        those defined on subclasses (reverse method resolution order).
        """
        cls_vars = {}
        # Walk [object, ..., BaseJob, Job, cls] so base-class attribute names come first.
        for base in reversed(inspect.getmro(cls)):
            for attr_name in base.__dict__:
                if attr_name in cls_vars:
                    continue
                attr_value = getattr(cls, attr_name, None)
                if isinstance(attr_value, ScriptVariable):
                    cls_vars[attr_name] = attr_value

        return cls_vars

    @classmethod
    def _get_file_vars(cls):
        """Return an ordered dict containing only the FileVar variables of this job."""
        return OrderedDict(
            (var_name, var) for var_name, var in cls._get_vars().items() if isinstance(var, FileVar)
        )

    @classmethod
    def as_form_class(cls):
        """
        Dynamically generate a Django form class corresponding to the variables in this Job.

        In most cases you should use `.as_form()` instead of calling this method directly.
        """
        # Subclass the base JobForm, injecting one form field per ScriptVariable.
        fields = {name: var.as_field() for name, var in cls._get_vars().items()}
        return type("JobForm", (JobForm,), fields)

    @classmethod
    def as_form(cls, data=None, files=None, initial=None, approval_view=False):
        """
        Return a Django form suitable for populating the context data required to run this Job.

        `approval_view` will disable all fields from modification and is used to display the form
        during an approval review workflow.
        """

        form = cls.as_form_class()(data, files, initial=initial)

        # Prefer database-level overrides of dryrun_default / task_queues when they exist.
        try:
            job_model = JobModel.objects.get_for_class_path(cls.class_path)
            dryrun_default = job_model.dryrun_default if job_model.dryrun_default_override else cls.dryrun_default
            task_queues = job_model.task_queues if job_model.task_queues_override else cls.task_queues
        except JobModel.DoesNotExist:
            logger.error("No Job instance found in the database corresponding to %s", cls.class_path)
            dryrun_default = cls.dryrun_default
            task_queues = cls.task_queues

        # Update task queue choices
        form.fields["_task_queue"].choices = task_queues_as_choices(task_queues)

        if cls.supports_dryrun and (not initial or "dryrun" not in initial):
            # Set initial "dryrun" checkbox state based on the Meta parameter
            form.fields["dryrun"].initial = dryrun_default
        if not settings.DEBUG:
            # Profiling toggle is only shown when DEBUG is enabled.
            form.fields["_profile"].widget = forms.HiddenInput()

        # https://github.com/PyCQA/pylint/issues/3484
        if cls.field_order:  # pylint: disable=using-constant-test
            form.order_fields(cls.field_order)

        if approval_view:
            # Set `disabled=True` on all fields
            for _, field in form.fields.items():
                field.disabled = True

        return form

    def clear_cache(self):
        """
        Drop any functools.cached_property values left over from a previous run, without accessing them.

        Required because celery reuses task instances for multiple runs.
        """
        for cached_name in ("celery_kwargs", "job_result", "job_model"):
            try:
                delattr(self, cached_name)
            except AttributeError:
                # Property was never accessed (or already cleared) -- nothing to do.
                pass

    @functools.cached_property
    def job_model(self):
        """Database Job record corresponding to this task's module and class name (cached per run)."""
        # NOTE(review): `self.__name__` is only resolvable from an instance if the class (or the
        # celery Task machinery) explicitly provides it -- plain classes expose `__name__` on the
        # class object only. Confirm, otherwise `type(self).__name__` would be needed here.
        return JobModel.objects.get(module_name=self.__module__, job_class_name=self.__name__)

    @functools.cached_property
    def job_result(self):
        """JobResult database record whose id matches this task invocation's request id (cached per run)."""
        return JobResult.objects.get(id=self.request.id)

    @functools.cached_property
    def celery_kwargs(self):
        """Celery kwargs recorded on the associated JobResult, or {} if none were stored."""
        return self.job_result.celery_kwargs or {}

    @property
    def user(self):
        """User associated with this job's JobResult, or None if unavailable."""
        return getattr(self.job_result, "user", None)

    @staticmethod
    def serialize_data(data):
        """
        Parse input data (from JobForm usually) and return a dict which is safe to serialize.

        A MultiObjectVar QuerySet becomes a list of pk's, an ObjectVar model instance becomes its
        pk value, an uploaded file is stored as a FileProxy (keeping its pk), and netaddr IP
        objects become strings. Anything else passes through unchanged.

        These are converted back during job execution (see `deserialize_data`).
        """
        serialized = {}
        for field_name, value in data.items():
            if isinstance(value, QuerySet):
                # MultiObjectVar
                serialized[field_name] = list(value.values_list("pk", flat=True))
            elif isinstance(value, Model):
                # ObjectVar
                serialized[field_name] = value.pk
            elif isinstance(value, InMemoryUploadedFile):
                # FileVar (save each FileVar as a FileProxy)
                serialized[field_name] = BaseJob.save_file(value)
            elif isinstance(value, netaddr.ip.BaseIP):
                # IPAddressVar, IPAddressWithMaskVar, IPNetworkVar
                serialized[field_name] = str(value)
            else:
                # Everything else passes through as-is.
                serialized[field_name] = value

        return serialized

    # TODO: can the deserialize_data logic be moved to NautobotKombuJSONEncoder?
    @classmethod
    def deserialize_data(cls, data):
        """
        Given data input for a job execution, deserialize it by resolving object references using defined variables.

        This converts a list of pk's back into a QuerySet for MultiObjectVar instances and single pk values into
        model instances for ObjectVar.

        Note that when resolving querysets or model instances by their PK, we do not catch DoesNotExist
        exceptions here, we leave it up the caller to handle those cases. The normal job execution code
        path would consider this a failure of the job execution, as described in `nautobot.extras.jobs.run_job`.

        Args:
            data (dict): Serialized job input, typically produced by `serialize_data()`.

        Returns:
            (dict): Mapping of field name to deserialized value.

        Raises:
            TypeError: If `data` is not a dict.
            ValidationError: If a required field is present but None.
        """
        cls_vars = cls._get_vars()
        return_data = {}

        if not isinstance(data, dict):
            raise TypeError("Data should be a dictionary.")

        for field_name, value in data.items():
            # If a field isn't a var, skip it (e.g. `_task_queue`).
            try:
                var = cls_vars[field_name]
            except KeyError:
                continue

            if value is None:
                if var.field_attrs.get("required"):
                    raise ValidationError(f"{field_name} is a required field")
                else:
                    return_data[field_name] = value
                    continue

            if isinstance(var, MultiObjectVar):
                queryset = var.field_attrs["queryset"].filter(pk__in=value)
                if queryset.count() < len(value):
                    # Not all objects found
                    found_pks = set(queryset.values_list("pk", flat=True))
                    not_found_pks = set(value).difference(found_pks)
                    # Coerce each pk to str -- pks are typically UUIDs, which str.join() cannot
                    # handle directly (it would raise TypeError and mask the real error).
                    raise queryset.model.DoesNotExist(
                        f"Failed to find requested objects for var {field_name}: "
                        f"[{', '.join(str(pk) for pk in not_found_pks)}]"
                    )
                # Reuse the queryset already constructed above rather than filtering a second time.
                return_data[field_name] = queryset
            elif isinstance(var, ObjectVar):
                # A dict value is a set of lookup kwargs; anything else is treated as a pk.
                if isinstance(value, dict):
                    return_data[field_name] = var.field_attrs["queryset"].get(**value)
                else:
                    return_data[field_name] = var.field_attrs["queryset"].get(pk=value)
            elif isinstance(var, FileVar):
                return_data[field_name] = cls.load_file(value)
            # IPAddressVar is a netaddr.IPAddress object
            elif isinstance(var, IPAddressVar):
                return_data[field_name] = netaddr.IPAddress(value)
            # IPAddressWithMaskVar, IPNetworkVar are netaddr.IPNetwork objects
            elif isinstance(var, (IPAddressWithMaskVar, IPNetworkVar)):
                return_data[field_name] = netaddr.IPNetwork(value)
            else:
                return_data[field_name] = value

        return return_data

    @classmethod
    def validate_data(cls, data, files=None):
        """
        Validate job input data against this job's variables, via the dynamically generated form.

        Args:
            data (dict): Serialized job input.
            files: Optional uploaded files, passed through to the form.

        Returns:
            (dict): The form's cleaned_data on success.

        Raises:
            ValidationError: If data is not a dict, contains an unknown property, or fails form validation.
        """
        cls_vars = cls._get_vars()

        if not isinstance(data, dict):
            raise ValidationError("Job data needs to be a dict")

        for k in data:
            if k not in cls_vars:
                raise ValidationError({k: "Job data contained an unknown property"})

        # defer validation to the form object
        f = cls.as_form(data=cls.deserialize_data(data), files=files)
        if not f.is_valid():
            raise ValidationError(f.errors)

        return f.cleaned_data

    @classmethod
    def prepare_job_kwargs(cls, job_kwargs):
        """Filter the given dict down to the kwargs that exist as ScriptVariables on this job."""
        known_vars = cls._get_vars()
        prepared = {}
        for key, value in job_kwargs.items():
            if key in known_vars:
                prepared[key] = value
        return prepared

    @staticmethod
    def load_file(pk):
        """Load a file proxy stored in the database by primary key.

        Args:
            pk (uuid): Primary key of the `FileProxy` to retrieve

        Returns:
            (FileProxy): A File-like object

        Raises:
            FileProxy.DoesNotExist: If no FileProxy exists with the given primary key.
        """
        fp = FileProxy.objects.get(pk=pk)
        return fp.file

    @staticmethod
    def save_file(uploaded_file):
        """
        Save an uploaded file to the database as a file proxy and return the
        primary key.

        The FileProxy keeps the uploaded file's original name.

        Args:
            uploaded_file (file): File handle of file to save to database

        Returns:
            (uuid): The pk of the `FileProxy` object
        """
        fp = FileProxy.objects.create(name=uploaded_file.name, file=uploaded_file)
        return fp.pk

    def delete_files(self, *files_to_delete):
        """Delete the `FileProxy` objects matching the given unpacked list of primary keys.

        Args:
            files_to_delete (*args): List of primary keys to delete

        Returns:
            (int): number of objects deleted
        """
        deleted_count = 0
        # Call delete() on each instance (rather than a bulk queryset delete) so the
        # associated `FileAttachment` is reaped as well.
        for file_proxy in FileProxy.objects.filter(pk__in=files_to_delete):
            file_proxy.delete()
            deleted_count += 1
        self.logger.debug("Deleted %d file proxies", deleted_count, extra={"grouping": "post_run"})
        return deleted_count

    # Convenience functions

    def load_yaml(self, filename):
        """
        Return data from a YAML file located alongside this job's source file.

        Args:
            filename (str): Name of the YAML file, relative to the directory containing the job module.

        Returns:
            The parsed YAML content.
        """
        file_path = os.path.join(os.path.dirname(self.file_path), filename)
        # Explicit encoding: YAML streams are UTF-8 by spec; don't depend on the locale default.
        with open(file_path, "r", encoding="utf-8") as datafile:
            data = yaml.safe_load(datafile)

        return data

    def load_json(self, filename):
        """
        Return data from a JSON file located alongside this job's source file.

        Args:
            filename (str): Name of the JSON file, relative to the directory containing the job module.

        Returns:
            The parsed JSON content.
        """
        file_path = os.path.join(os.path.dirname(self.file_path), filename)
        # Explicit encoding: JSON interchange is UTF-8; don't depend on the locale default.
        with open(file_path, "r", encoding="utf-8") as datafile:
            data = json.load(datafile)

        return data

Meta

Metaclass attributes - subclasses can define any or all of the following attributes:

  • name (str)
  • description (str)
  • hidden (bool)
  • field_order (list)
  • approval_required (bool)
  • soft_time_limit (int)
  • time_limit (int)
  • has_sensitive_variables (bool)
  • task_queues (list)
Source code in nautobot/extras/jobs.py
class Meta:
    """
    Metaclass attributes - subclasses can define any or all of the following attributes:

    - name (str)
    - description (str)
    - hidden (bool)
    - field_order (list)
    - approval_required (bool)
    - soft_time_limit (int)
    - time_limit (int)
    - has_sensitive_variables (bool)
    - task_queues (list)
    """

after_return(status, retval, task_id, args, kwargs, einfo)

Handler called after the task returns.

Parameters: status – current task state; retval – task return value or exception; task_id – unique id of the task; args – original arguments for the task that returned; kwargs – original keyword arguments for the task that returned.

Keyword arguments: einfo – ExceptionInfo instance containing the traceback (if any).

Returns:

Type: None — the return value of this handler is ignored.

Source code in nautobot/extras/jobs.py
def after_return(self, status, retval, task_id, args, kwargs, einfo):
    """
    Handler called after the task returns.

    Parameters
        status - Current task state.
        retval - Task return value/exception.
        task_id - Unique id of the task.
        args - Original arguments for the task that returned.
        kwargs - Original keyword arguments for the task that returned.

    Keyword Arguments
        einfo - ExceptionInfo instance, containing the traceback (if any).

    Returns:
        (None): The return value of this handler is ignored.
    """

    # Cleanup FileProxy objects
    # Any FileVar inputs were serialized as FileProxy primary keys (see serialize_data);
    # the task is done with them now, so reap the database rows.
    file_fields = list(self._get_file_vars())
    file_ids = [kwargs[f] for f in file_fields]
    if file_ids:
        self.delete_files(*file_ids)

    self.logger.info("Job completed", extra={"grouping": "post_run"})

    # TODO(gary): document this in job author docs
    # Super.after_return must be called for chords to function properly
    super().after_return(status, retval, task_id, args, kwargs, einfo=einfo)

apply(args=None, kwargs=None, link=None, link_error=None, task_id=None, retries=None, throw=None, logfile=None, loglevel=None, headers=None, **options)

Fix celery's apply method to propagate options to the task result

Source code in nautobot/extras/jobs.py
def apply(
    self,
    args=None,
    kwargs=None,
    link=None,
    link_error=None,
    task_id=None,
    retries=None,
    throw=None,
    logfile=None,
    loglevel=None,
    headers=None,
    **options,
):
    """Fix celery's apply method to propagate options to the task result.

    This is a vendored copy of celery's `Task.apply()` (eager/synchronous execution)
    with a one-line change: `options` is attached to the request as `"properties"`
    so the execution options reach the task result.

    Returns:
        (EagerResult): Result of executing the task inline on this process.
    """
    # trace imports Task, so need to import inline.
    from celery.app.trace import build_tracer

    app = self._get_app()
    args = args or ()
    kwargs = kwargs or {}
    task_id = task_id or uuid()
    retries = retries or 0
    if throw is None:
        throw = app.conf.task_eager_propagates

    # Make sure we get the task instance, not class.
    task = app._tasks[self.name]

    # Synthetic request mirroring what a worker would build for a real delivery.
    request = {
        "id": task_id,
        "retries": retries,
        "is_eager": True,
        "logfile": logfile,
        "loglevel": loglevel or 0,
        "hostname": gethostname(),
        "callbacks": maybe_list(link),
        "errbacks": maybe_list(link_error),
        "headers": headers,
        "ignore_result": options.get("ignore_result", False),
        "delivery_info": {
            "is_eager": True,
            "exchange": options.get("exchange"),
            "routing_key": options.get("routing_key"),
            "priority": options.get("priority"),
        },
        "properties": options,  # one line fix to overloaded method
    }
    if "stamped_headers" in options:
        request["stamped_headers"] = maybe_list(options["stamped_headers"])
        request["stamps"] = {header: maybe_list(options.get(header, [])) for header in request["stamped_headers"]}

    tb = None
    # Build and invoke the celery tracer that actually runs the task body.
    tracer = build_tracer(
        task.name,
        task,
        eager=True,
        propagate=throw,
        app=self._get_app(),
    )
    ret = tracer(task_id, args, kwargs, request)
    retval = ret.retval
    # Unwrap exception results into (exception, traceback) pairs.
    if isinstance(retval, ExceptionInfo):
        retval, tb = retval.exception, retval.traceback
        if isinstance(retval, ExceptionWithTraceback):
            retval = retval.exc
    # A Retry with a signature means the task asked to be re-run eagerly.
    if isinstance(retval, Retry) and retval.sig is not None:
        return retval.sig.apply(retries=retries + 1)
    state = states.SUCCESS if ret.info is None else ret.info.state
    return EagerResult(task_id, retval, state, traceback=tb)

as_form(data=None, files=None, initial=None, approval_view=False) classmethod

Return a Django form suitable for populating the context data required to run this Job.

approval_view will disable all fields from modification and is used to display the form during an approval review workflow.

Source code in nautobot/extras/jobs.py
@classmethod
def as_form(cls, data=None, files=None, initial=None, approval_view=False):
    """
    Return a Django form suitable for populating the context data required to run this Job.

    `approval_view` will disable all fields from modification and is used to display the form
    during an approval review workflow.
    """

    form = cls.as_form_class()(data, files, initial=initial)

    # Database overrides (the Job model record) take precedence over class-level defaults.
    try:
        job_model = JobModel.objects.get_for_class_path(cls.class_path)
        dryrun_default = job_model.dryrun_default if job_model.dryrun_default_override else cls.dryrun_default
        task_queues = job_model.task_queues if job_model.task_queues_override else cls.task_queues
    except JobModel.DoesNotExist:
        # Fall back to class attributes when no database record exists.
        logger.error("No Job instance found in the database corresponding to %s", cls.class_path)
        dryrun_default = cls.dryrun_default
        task_queues = cls.task_queues

    # Update task queue choices
    form.fields["_task_queue"].choices = task_queues_as_choices(task_queues)

    if cls.supports_dryrun and (not initial or "dryrun" not in initial):
        # Set initial "dryrun" checkbox state based on the Meta parameter
        form.fields["dryrun"].initial = dryrun_default
    if not settings.DEBUG:
        # Profiling is a developer-only feature; hide it outside DEBUG mode.
        form.fields["_profile"].widget = forms.HiddenInput()

    # https://github.com/PyCQA/pylint/issues/3484
    if cls.field_order:  # pylint: disable=using-constant-test
        form.order_fields(cls.field_order)

    if approval_view:
        # Set `disabled=True` on all fields
        for _, field in form.fields.items():
            field.disabled = True

    return form

as_form_class() classmethod

Dynamically generate a Django form class corresponding to the variables in this Job.

In most cases you should use .as_form() instead of calling this method directly.

Source code in nautobot/extras/jobs.py
@classmethod
def as_form_class(cls):
    """
    Dynamically generate a Django form class corresponding to the variables in this Job.

    In most cases you should use `.as_form()` instead of calling this method directly.
    """
    # Turn every declared ScriptVariable into a Django form field, then build
    # a JobForm subclass carrying those fields.
    form_attrs = {}
    for var_name, script_var in cls._get_vars().items():
        form_attrs[var_name] = script_var.as_field()
    return type("JobForm", (JobForm,), form_attrs)

before_start(task_id, args, kwargs)

Handler called before the task starts.

Parameters:

Name Type Description Default
task_id str

Unique id of the task to execute.

required
args Tuple

Original arguments for the task to execute.

required
kwargs Dict

Original keyword arguments for the task to execute.

required

Returns:

Type Description
None

The return value of this handler is ignored.

Source code in nautobot/extras/jobs.py
def before_start(self, task_id, args, kwargs):
    """Handler called before the task starts.

    Arguments:
        task_id (str): Unique id of the task to execute.
        args (Tuple): Original arguments for the task to execute.
        kwargs (Dict): Original keyword arguments for the task to execute.

    Returns:
        (None): The return value of this handler is ignored.

    Raises:
        RunJobTaskFailed: If the JobResult or Job database records cannot be
            found, or if the Job is not enabled.
    """
    # Celery reuses task instances between runs, so drop any cached properties first.
    self.clear_cache()

    try:
        self.job_result
    except ObjectDoesNotExist as err:
        raise RunJobTaskFailed(f"Unable to find associated job result for job {task_id}") from err

    try:
        self.job_model
    except ObjectDoesNotExist as err:
        raise RunJobTaskFailed(f"Unable to find associated job model for job {task_id}") from err

    if not self.job_model.enabled:
        self.logger.error(
            "Job %s is not enabled to be run!",
            self.job_model,
            extra={"object": self.job_model, "grouping": "initialization"},
        )
        raise RunJobTaskFailed(f"Job {self.job_model} is not enabled to be run!")

    # Warn when the hard limit would preempt the soft limit, since the soft-limit
    # handler would then never get a chance to run.
    soft_time_limit = self.job_model.soft_time_limit or settings.CELERY_TASK_SOFT_TIME_LIMIT
    time_limit = self.job_model.time_limit or settings.CELERY_TASK_TIME_LIMIT
    if time_limit <= soft_time_limit:
        self.logger.warning(
            "The hard time limit of %s seconds is less than "
            "or equal to the soft time limit of %s seconds. "
            "This job will fail silently after %s seconds.",
            time_limit,
            soft_time_limit,
            time_limit,
            extra={"grouping": "initialization"},
        )

    self.logger.info("Running job", extra={"grouping": "initialization"})

class_path()

Unique identifier of a specific Job class, in the form `<module_name>.<ClassName>`.

Examples:

my_script.MyScript - Local Job nautobot.core.jobs.MySystemJob - System Job my_plugin.jobs.MyPluginJob - App-provided Job git_repository.jobs.myjob.MyJob - GitRepository Job

Source code in nautobot/extras/jobs.py
@final
@classproperty
def class_path(cls) -> str:  # pylint: disable=no-self-argument
    """
    Unique identifier of a specific Job class, in the form <module_name>.<ClassName>.

    Examples:
    my_script.MyScript - Local Job
    nautobot.core.jobs.MySystemJob - System Job
    my_plugin.jobs.MyPluginJob - App-provided Job
    git_repository.jobs.myjob.MyJob - GitRepository Job
    """
    return ".".join((cls.__module__, cls.__name__))

class_path_dotted()

Dotted class_path, suitable for use in things like Python logger names.

Deprecated as of Nautobot 2.0: just use .class_path instead.

Source code in nautobot/extras/jobs.py
@final
@classproperty
def class_path_dotted(cls) -> str:  # pylint: disable=no-self-argument
    """
    Dotted class_path, suitable for use in things like Python logger names.

    Deprecated as of Nautobot 2.0: just use .class_path instead.
    """
    # Retained purely as a backwards-compatible alias.
    return cls.class_path

class_path_js_escaped()

Escape various characters so that the class_path can be used as a jQuery selector.

Source code in nautobot/extras/jobs.py
@final
@classproperty
def class_path_js_escaped(cls) -> str:  # pylint: disable=no-self-argument
    """
    Escape various characters so that the class_path can be used as a jQuery selector.
    """
    # Dots are class/ID separators in jQuery selectors, so backslash-escape them.
    return cls.class_path.replace(".", "\\.")

clear_cache()

Clear all cached properties on this instance without accessing them. This is required because celery reuses task instances for multiple runs.

Source code in nautobot/extras/jobs.py
def clear_cache(self):
    """
    Clear all cached properties on this instance without accessing them. This is required because
    celery reuses task instances for multiple runs.
    """
    for cached_attr in ("celery_kwargs", "job_result", "job_model"):
        try:
            delattr(self, cached_attr)
        except AttributeError:
            # Property was never computed (or already cleared) -- nothing to do.
            pass

delete_files(*files_to_delete)

Given an unpacked list of primary keys for FileProxy objects, delete them.

Parameters:

Name Type Description Default
files_to_delete *args

List of primary keys to delete

()

Returns:

Type Description
int

number of objects deleted

Source code in nautobot/extras/jobs.py
def delete_files(self, *files_to_delete):
    """Given an unpacked list of primary keys for `FileProxy` objects, delete them.

    Args:
        files_to_delete (*args): List of primary keys to delete

    Returns:
        (int): number of objects deleted
    """
    deleted_count = 0
    for file_proxy in FileProxy.objects.filter(pk__in=files_to_delete):
        # delete() is invoked per-object so the underlying `FileAttachment` is reaped too.
        file_proxy.delete()
        deleted_count += 1
    self.logger.debug("Deleted %d file proxies", deleted_count, extra={"grouping": "post_run"})
    return deleted_count

deserialize_data(data) classmethod

Given data input for a job execution, deserialize it by resolving object references using defined variables.

This converts a list of pk's back into a QuerySet for MultiObjectVar instances and single pk values into model instances for ObjectVar.

Note that when resolving querysets or model instances by their PK, we do not catch DoesNotExist exceptions here; we leave it up to the caller to handle those cases. The normal job execution code path would consider this a failure of the job execution, as described in nautobot.extras.jobs.run_job.

Source code in nautobot/extras/jobs.py
@classmethod
def deserialize_data(cls, data):
    """
    Given data input for a job execution, deserialize it by resolving object references using defined variables.

    This converts a list of pk's back into a QuerySet for MultiObjectVar instances and single pk values into
    model instances for ObjectVar.

    Note that when resolving querysets or model instances by their PK, we do not catch DoesNotExist
    exceptions here, we leave it up to the caller to handle those cases. The normal job execution code
    path would consider this a failure of the job execution, as described in `nautobot.extras.jobs.run_job`.

    Args:
        data (dict): Serialized job input, keyed by variable name.

    Returns:
        dict: The same keys with values resolved to model instances, querysets,
        netaddr objects, file handles, or passed through unchanged.

    Raises:
        TypeError: If `data` is not a dict.
        ValidationError: If a required variable's value is None.
    """
    cls_vars = cls._get_vars()
    return_data = {}

    if not isinstance(data, dict):
        raise TypeError("Data should be a dictionary.")

    for field_name, value in data.items():
        # If a field isn't a var, skip it (e.g. `_task_queue`).
        try:
            var = cls_vars[field_name]
        except KeyError:
            continue

        if value is None:
            if var.field_attrs.get("required"):
                raise ValidationError(f"{field_name} is a required field")
            else:
                return_data[field_name] = value
                continue

        if isinstance(var, MultiObjectVar):
            queryset = var.field_attrs["queryset"].filter(pk__in=value)
            if queryset.count() < len(value):
                # Not all objects found
                found_pks = set(queryset.values_list("pk", flat=True))
                not_found_pks = set(value).difference(found_pks)
                # Coerce pks to str: UUID (or other non-str) primary keys would
                # otherwise raise TypeError inside str.join().
                raise queryset.model.DoesNotExist(
                    f"Failed to find requested objects for var {field_name}: "
                    f"[{', '.join(str(pk) for pk in not_found_pks)}]"
                )
            # Reuse the queryset built above rather than filtering a second time.
            return_data[field_name] = queryset

        elif isinstance(var, ObjectVar):
            if isinstance(value, dict):
                return_data[field_name] = var.field_attrs["queryset"].get(**value)
            else:
                return_data[field_name] = var.field_attrs["queryset"].get(pk=value)
        elif isinstance(var, FileVar):
            return_data[field_name] = cls.load_file(value)
        # IPAddressVar is a netaddr.IPAddress object
        elif isinstance(var, IPAddressVar):
            return_data[field_name] = netaddr.IPAddress(value)
        # IPAddressWithMaskVar, IPNetworkVar are netaddr.IPNetwork objects
        elif isinstance(var, (IPAddressWithMaskVar, IPNetworkVar)):
            return_data[field_name] = netaddr.IPNetwork(value)
        else:
            return_data[field_name] = value

    return return_data

load_file(pk) staticmethod

Load a file proxy stored in the database by primary key.

Parameters:

Name Type Description Default
pk uuid

Primary key of the FileProxy to retrieve

required

Returns:

Type Description
FileProxy

A File-like object

Source code in nautobot/extras/jobs.py
@staticmethod
def load_file(pk):
    """Load a file proxy stored in the database by primary key.

    Args:
        pk (uuid): Primary key of the `FileProxy` to retrieve

    Returns:
        (FileProxy): A File-like object
    """
    # Raises FileProxy.DoesNotExist if no row matches; callers handle that.
    return FileProxy.objects.get(pk=pk).file

load_json(filename)

Return data from a JSON file

Source code in nautobot/extras/jobs.py
def load_json(self, filename):
    """Load and parse a JSON file located alongside this Job's source file.

    Args:
        filename (str): Name of the JSON file, relative to the directory
            containing this job's module (`self.file_path`).

    Returns:
        The deserialized JSON content.
    """
    file_path = os.path.join(os.path.dirname(self.file_path), filename)
    # JSON interchange is UTF-8 (RFC 8259); don't rely on the platform default encoding.
    with open(file_path, "r", encoding="utf-8") as datafile:
        data = json.load(datafile)

    return data

load_yaml(filename)

Return data from a YAML file

Source code in nautobot/extras/jobs.py
def load_yaml(self, filename):
    """Load and parse a YAML file located alongside this Job's source file.

    Args:
        filename (str): Name of the YAML file, relative to the directory
            containing this job's module (`self.file_path`).

    Returns:
        The deserialized YAML content (typically a dict or list).
    """
    file_path = os.path.join(os.path.dirname(self.file_path), filename)
    # Explicit encoding: YAML files are conventionally UTF-8; without it,
    # open() falls back to the platform default (e.g. cp1252 on Windows).
    with open(file_path, "r", encoding="utf-8") as datafile:
        data = yaml.safe_load(datafile)

    return data

on_bound(app) classmethod

Called when the task is bound to an app.

Note

This class method can be defined to do additional actions when the task class is bound to an app.

Source code in nautobot/extras/jobs.py
@classmethod
def on_bound(cls, app):
    """Hook invoked when the task class is bound to a celery app.

    Intentionally a no-op; subclasses may override to perform additional
    setup at bind time.
    """

on_failure(exc, task_id, args, kwargs, einfo)

Error handler.

This is run by the worker when the task fails.

Parameters:

Name Type Description Default
exc Exception

The exception raised by the task.

required
task_id str

Unique id of the failed task.

required
args Tuple

Original arguments for the task that failed.

required
kwargs Dict

Original keyword arguments for the task that failed.

required
einfo ~billiard.einfo.ExceptionInfo

Exception information.

required

Returns:

Type Description
None

The return value of this handler is ignored.

Source code in nautobot/extras/jobs.py
def on_failure(self, exc, task_id, args, kwargs, einfo):
    """Error handler invoked by the worker when the task fails.

    Intentionally a no-op here; subclasses may override it to react to failures.

    Arguments:
        exc (Exception): The exception raised by the task.
        task_id (str): Unique id of the failed task.
        args (Tuple): Original arguments for the task that failed.
        kwargs (Dict): Original keyword arguments for the task that failed.
        einfo (~billiard.einfo.ExceptionInfo): Exception information.

    Returns:
        (None): The return value of this handler is ignored.
    """

on_retry(exc, task_id, args, kwargs, einfo)

Retry handler.

This is run by the worker when the task is to be retried.

Parameters:

Name Type Description Default
exc Exception

The exception sent to :meth:retry.

required
task_id str

Unique id of the retried task.

required
args Tuple

Original arguments for the retried task.

required
kwargs Dict

Original keyword arguments for the retried task.

required
einfo ~billiard.einfo.ExceptionInfo

Exception information.

required

Returns:

Type Description
None

The return value of this handler is ignored.

Source code in nautobot/extras/jobs.py
def on_retry(self, exc, task_id, args, kwargs, einfo):
    """Retry handler invoked by the worker when the task is to be retried.

    Intentionally a no-op here; subclasses may override it to react to retries.

    Arguments:
        exc (Exception): The exception sent to :meth:`retry`.
        task_id (str): Unique id of the retried task.
        args (Tuple): Original arguments for the retried task.
        kwargs (Dict): Original keyword arguments for the retried task.
        einfo (~billiard.einfo.ExceptionInfo): Exception information.

    Returns:
        (None): The return value of this handler is ignored.
    """

on_success(retval, task_id, args, kwargs)

Success handler.

Run by the worker if the task executes successfully.

Parameters:

Name Type Description Default
retval Any

The return value of the task.

required
task_id str

Unique id of the executed task.

required
args Tuple

Original arguments for the executed task.

required
kwargs Dict

Original keyword arguments for the executed task.

required

Returns:

Type Description
None

The return value of this handler is ignored.

Source code in nautobot/extras/jobs.py
def on_success(self, retval, task_id, args, kwargs):
    """Success handler invoked by the worker if the task executes successfully.

    Intentionally a no-op here; subclasses may override it to react to success.

    Arguments:
        retval (Any): The return value of the task.
        task_id (str): Unique id of the executed task.
        args (Tuple): Original arguments for the executed task.
        kwargs (Dict): Original keyword arguments for the executed task.

    Returns:
        (None): The return value of this handler is ignored.
    """

prepare_job_kwargs(job_kwargs) classmethod

Process dict and return kwargs that exist as ScriptVariables on this job.

Source code in nautobot/extras/jobs.py
@classmethod
def prepare_job_kwargs(cls, job_kwargs):
    """Process dict and return kwargs that exist as ScriptVariables on this job."""
    declared_vars = cls._get_vars()
    filtered = {}
    for kwarg_name, kwarg_value in job_kwargs.items():
        # Drop anything that isn't a declared ScriptVariable (e.g. `_task_queue`).
        if kwarg_name in declared_vars:
            filtered[kwarg_name] = kwarg_value
    return filtered

properties_dict()

Return all relevant classproperties as a dict.

Used for convenient rendering into job_edit.html via the json_script template tag.

Source code in nautobot/extras/jobs.py
@final
@classproperty
def properties_dict(cls) -> dict:  # pylint: disable=no-self-argument
    """
    Return all relevant classproperties as a dict.

    Used for convenient rendering into job_edit.html via the `json_script` template tag.
    """
    # Keep this tuple in sync with the overridable Meta attributes.
    property_names = (
        "name",
        "grouping",
        "description",
        "approval_required",
        "hidden",
        "soft_time_limit",
        "time_limit",
        "has_sensitive_variables",
        "task_queues",
    )
    return {prop: getattr(cls, prop) for prop in property_names}

run(*args, **kwargs)

Method invoked when this Job is run.

Source code in nautobot/extras/jobs.py
def run(self, *args, **kwargs):
    """
    Entry point invoked when this Job is executed; concrete Jobs must override it.
    """
    raise NotImplementedError("Jobs must define the run method.")

save_file(uploaded_file) staticmethod

Save an uploaded file to the database as a file proxy and return the primary key.

Parameters:

Name Type Description Default
uploaded_file file

File handle of file to save to database

required

Returns:

Type Description
uuid

The pk of the FileProxy object

Source code in nautobot/extras/jobs.py
@staticmethod
def save_file(uploaded_file):
    """
    Save an uploaded file to the database as a file proxy and return the
    primary key.

    Args:
        uploaded_file (file): File handle of file to save to database

    Returns:
        (uuid): The pk of the `FileProxy` object
    """
    proxy = FileProxy.objects.create(name=uploaded_file.name, file=uploaded_file)
    return proxy.pk

serialize_data(data) staticmethod

This method parses input data (from JobForm usually) and returns a dict which is safe to serialize

Here we convert the QuerySet of a MultiObjectVar to a list of the pk's and the model instance of an ObjectVar into the pk value.

These are converted back during job execution.

Source code in nautobot/extras/jobs.py
@staticmethod
def serialize_data(data):
    """
    This method parses input data (from JobForm usually) and returns a dict which is safe to serialize

    Here we convert the QuerySet of a MultiObjectVar to a list of the pk's and the model instance
    of an ObjectVar into the pk value.

    These are converted back during job execution.
    """
    serialized = {}
    for key, raw_value in data.items():
        if isinstance(raw_value, QuerySet):
            # MultiObjectVar -> list of primary keys
            serialized[key] = list(raw_value.values_list("pk", flat=True))
        elif isinstance(raw_value, Model):
            # ObjectVar -> single primary key
            serialized[key] = raw_value.pk
        elif isinstance(raw_value, InMemoryUploadedFile):
            # FileVar -> persist as a FileProxy and store its pk
            serialized[key] = BaseJob.save_file(raw_value)
        elif isinstance(raw_value, netaddr.ip.BaseIP):
            # IPAddressVar, IPAddressWithMaskVar, IPNetworkVar -> string form
            serialized[key] = str(raw_value)
        else:
            # Everything else passes through unchanged.
            serialized[key] = raw_value
    return serialized

shadow_name(args, kwargs, options)

Override for custom task name in worker logs/monitoring.

Example

from celery.utils.imports import qualname

def shadow_name(task, args, kwargs, options): return qualname(args[0])

@app.task(shadow_name=shadow_name, serializer='pickle') def apply_function_async(fun, *args, **kwargs): return fun(*args, **kwargs)

Parameters:

Name Type Description Default
args Tuple

Task positional arguments.

required
kwargs Dict

Task keyword arguments.

required
options Dict

Task execution options.

required
Source code in nautobot/extras/jobs.py
def shadow_name(self, args, kwargs, options):
    """Hook to customize the task name shown in worker logs/monitoring.

    Returning None (the default here) keeps the task's regular name.

    Example:
            from celery.utils.imports import qualname

            def shadow_name(task, args, kwargs, options):
                return qualname(args[0])

            @app.task(shadow_name=shadow_name, serializer='pickle')
            def apply_function_async(fun, *args, **kwargs):
                return fun(*args, **kwargs)

    Arguments:
        args (Tuple): Task positional arguments.
        kwargs (Dict): Task keyword arguments.
        options (Dict): Task execution options.
    """

nautobot.apps.jobs.BooleanVar

Bases: ScriptVariable

Boolean representation (true/false). Renders as a checkbox.

Source code in nautobot/extras/jobs.py
class BooleanVar(ScriptVariable):
    """
    Boolean representation (true/false). Renders as a checkbox.
    """

    form_field = forms.BooleanField

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # A required checkbox would have to be checked to validate, so the
        # field is always marked optional.
        self.field_attrs["required"] = False

nautobot.apps.jobs.ChoiceVar

Bases: ScriptVariable

Select one of several predefined static choices, passed as a list of two-tuples. Example:

color = ChoiceVar(
    choices=(
        ('#ff0000', 'Red'),
        ('#00ff00', 'Green'),
        ('#0000ff', 'Blue')
    )
)
Source code in nautobot/extras/jobs.py
class ChoiceVar(ScriptVariable):
    """
    Select one of several predefined static choices, passed as a list of two-tuples. Example:

        color = ChoiceVar(
            choices=(
                ('#ff0000', 'Red'),
                ('#00ff00', 'Green'),
                ('#0000ff', 'Blue')
            )
        )
    """

    form_field = forms.ChoiceField

    def __init__(self, choices, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Hand the static (value, label) pairs straight to the ChoiceField.
        self.field_attrs["choices"] = choices

nautobot.apps.jobs.DatabaseFileField

Bases: forms.FileField

Specialized FileField for use with DatabaseFileStorage storage backend.

Source code in nautobot/extras/jobs.py
class DatabaseFileField(forms.FileField):
    """`FileField` variant for the `DatabaseFileStorage` backend, using a DB-aware clearable widget."""

    widget = DBClearableFileInput

nautobot.apps.jobs.DryRunVar

Bases: BooleanVar

Special boolean variable that bypasses approval requirements if this is set to True on job execution.

Source code in nautobot/extras/jobs.py
class DryRunVar(BooleanVar):
    """
    Special boolean variable that bypasses approval requirements if this is set to True on job execution.
    """

    description = "Check to run job in dryrun mode."

    def __init__(self, *args, **kwargs):
        # Default must be false unless overridden through `dryrun_default` meta attribute
        kwargs["default"] = False
        # Fall back to the class-level description when the caller didn't supply one.
        kwargs.setdefault("description", self.description)
        super().__init__(*args, **kwargs)

nautobot.apps.jobs.FileVar

Bases: ScriptVariable

An uploaded file.

Source code in nautobot/extras/jobs.py
class FileVar(ScriptVariable):
    """A file uploaded by the user, rendered via a database-backed file field."""

    form_field = DatabaseFileField

nautobot.apps.jobs.GitRepositoryDryRun

Bases: Job

System Job to perform a dry run on a Git repository.

Source code in nautobot/core/jobs/__init__.py
class GitRepositoryDryRun(Job):
    """System Job to perform a dry run on a Git repository."""

    repository = ObjectVar(
        description="Git Repository to dry-run",
        label="Git Repository",
        model=GitRepository,
    )

    class Meta:
        name = "Git Repository: Dry-Run"
        has_sensitive_variables = False

    def run(self, repository):
        """Execute the dry run against `repository`, logging start and completion time."""
        job_result = self.job_result
        # Use lazy %-style logger arguments (matching the rest of this module)
        # instead of eagerly-formatted f-strings.
        self.logger.info('Performing a Dry Run on Git repository "%s"...', repository.name)

        try:
            git_repository_dry_run(repository, logger=self.logger)
        finally:
            # Log duration even if the dry run raised.
            self.logger.info("Repository dry run completed in %s", job_result.duration)

nautobot.apps.jobs.GitRepositorySync

Bases: Job

System job to clone and/or pull a Git repository, then invoke refresh_datasource_content().

Source code in nautobot/core/jobs/__init__.py
class GitRepositorySync(Job):
    """
    System job to clone and/or pull a Git repository, then invoke `refresh_datasource_content()`.
    """

    repository = ObjectVar(
        description="Git Repository to pull and refresh",
        label="Git Repository",
        model=GitRepository,
    )

    class Meta:
        name = "Git Repository: Sync"
        has_sensitive_variables = False

    def run(self, repository):
        """Sync `repository` to its remote, refresh its datasource content, and notify workers."""
        job_result = self.job_result
        user = job_result.user

        # Use lazy %-style logger arguments (matching the rest of this module)
        # instead of eagerly-formatted f-strings.
        self.logger.info('Creating/refreshing local copy of Git repository "%s"...', repository.name)

        try:
            ensure_git_repository(repository, logger=self.logger)
            refresh_datasource_content("extras.gitrepository", repository, user, job_result, delete=False)
            # Given that the above succeeded, tell all workers (including ourself) to call ensure_git_repository()
            app.control.broadcast("refresh_git_repository", repository_pk=repository.pk, head=repository.current_head)
        finally:
            # Log duration even if the sync raised.
            self.logger.info("Repository synchronization completed in %s", job_result.duration)

nautobot.apps.jobs.IPAddressVar

Bases: ScriptVariable

An IPv4 or IPv6 address without a mask.

Source code in nautobot/extras/jobs.py
class IPAddressVar(ScriptVariable):
    """A single IPv4 or IPv6 address, without a mask."""

    form_field = IPAddressFormField

nautobot.apps.jobs.IPAddressWithMaskVar

Bases: ScriptVariable

An IPv4 or IPv6 address with a mask.

Source code in nautobot/extras/jobs.py
class IPAddressWithMaskVar(ScriptVariable):
    """A single IPv4 or IPv6 address, including a mask."""

    form_field = IPNetworkFormField

nautobot.apps.jobs.IPNetworkVar

Bases: ScriptVariable

An IPv4 or IPv6 prefix.

Source code in nautobot/extras/jobs.py
class IPNetworkVar(ScriptVariable):
    """
    An IPv4 or IPv6 prefix.
    """

    form_field = IPNetworkFormField

    def __init__(self, min_prefix_length=None, max_prefix_length=None, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # The value must always be a valid prefix; length-bound validators are optional.
        validators = [prefix_validator]
        if min_prefix_length is not None:
            validators.append(MinPrefixLengthValidator(min_prefix_length))
        if max_prefix_length is not None:
            validators.append(MaxPrefixLengthValidator(max_prefix_length))
        self.field_attrs["validators"] = validators

nautobot.apps.jobs.IntegerVar

Bases: ScriptVariable

Integer representation. Can enforce minimum/maximum values.

Source code in nautobot/extras/jobs.py
class IntegerVar(ScriptVariable):
    """
    Integer representation. Can enforce minimum/maximum values.
    """

    form_field = forms.IntegerField

    def __init__(self, min_value=None, max_value=None, *args, **kwargs):
        """
        Args:
            min_value (int, optional): Inclusive lower bound enforced by the form field.
            max_value (int, optional): Inclusive upper bound enforced by the form field.
        """
        super().__init__(*args, **kwargs)

        # Optional minimum/maximum values.
        # Compare against None explicitly: a bound of 0 is falsy, but is still
        # a valid limit and must not be silently dropped.
        if min_value is not None:
            self.field_attrs["min_value"] = min_value
        if max_value is not None:
            self.field_attrs["max_value"] = max_value

nautobot.apps.jobs.Job

Bases: BaseJob

Classes which inherit from this model will appear in the list of available jobs.

Source code in nautobot/extras/jobs.py
class Job(BaseJob):
    """Concrete job base class; subclasses of this appear in the list of available jobs."""

nautobot.apps.jobs.JobButtonReceiver

Bases: Job

Base class for job button receivers. Job button receivers are jobs that are initiated from UI buttons and are not intended to be run from the job form UI or API like standard jobs.

Source code in nautobot/extras/jobs.py
class JobButtonReceiver(Job):
    """
    Base class for job button receivers.

    Job button receivers are jobs initiated from UI buttons; they are not meant
    to be launched from the standard job form UI or API.
    """

    object_pk = StringVar()
    object_model_name = StringVar()

    def run(self, object_pk, object_model_name):
        """JobButtonReceiver subclasses generally shouldn't need to override this method."""
        # Resolve the model class from its dotted name, then fetch the target record.
        model_class = get_model_from_name(object_model_name)
        target = model_class.objects.get(pk=object_pk)
        self.receive_job_button(obj=target)

    def receive_job_button(self, obj):
        """
        Method to be implemented by concrete JobButtonReceiver subclasses.

        :param obj: an instance of the object
        """
        raise NotImplementedError

receive_job_button(obj)

Method to be implemented by concrete JobButtonReceiver subclasses.

:param obj: an instance of the object

Source code in nautobot/extras/jobs.py
def receive_job_button(self, obj):
    """
    Hook for concrete JobButtonReceiver subclasses; the base implementation is abstract.

    :param obj: an instance of the object
    """
    raise NotImplementedError

run(object_pk, object_model_name)

JobButtonReceiver subclasses generally shouldn't need to override this method.

Source code in nautobot/extras/jobs.py
def run(self, object_pk, object_model_name):
    """JobButtonReceiver subclasses generally shouldn't need to override this method."""
    # Look up the model class by its dotted name, fetch the object, and hand it off.
    target = get_model_from_name(object_model_name).objects.get(pk=object_pk)
    self.receive_job_button(obj=target)

nautobot.apps.jobs.JobHookReceiver

Bases: Job

Base class for job hook receivers. Job hook receivers are jobs that are initiated from object changes and are not intended to be run from the UI or API like standard jobs.

Source code in nautobot/extras/jobs.py
class JobHookReceiver(Job):
    """
    Base class for job hook receivers.

    Job hook receivers are jobs triggered by object changes; they are not meant
    to be run from the UI or API like standard jobs.
    """

    object_change = ObjectVar(model=ObjectChange)

    def run(self, object_change):
        """JobHookReceiver subclasses generally shouldn't need to override this method."""
        # Unpack the ObjectChange record into the pieces subclasses care about.
        self.receive_job_hook(
            change=object_change,
            action=object_change.action,
            changed_object=object_change.changed_object,
        )

    def receive_job_hook(self, change, action, changed_object):
        """
        Method to be implemented by concrete JobHookReceiver subclasses.

        :param change: an instance of `nautobot.extras.models.ObjectChange`
        :param action: a string with the action performed on the changed object ("create", "update" or "delete")
        :param changed_object: an instance of the object that was changed, or `None` if the object has been deleted
        """
        raise NotImplementedError

receive_job_hook(change, action, changed_object)

Method to be implemented by concrete JobHookReceiver subclasses.

:param change: an instance of `nautobot.extras.models.ObjectChange`
:param action: a string with the action performed on the changed object ("create", "update" or "delete")
:param changed_object: an instance of the object that was changed, or `None` if the object has been deleted

Source code in nautobot/extras/jobs.py
def receive_job_hook(self, change, action, changed_object):
    """
    Hook for concrete JobHookReceiver subclasses; the base implementation is abstract.

    :param change: an instance of `nautobot.extras.models.ObjectChange`
    :param action: a string with the action performed on the changed object ("create", "update" or "delete")
    :param changed_object: an instance of the object that was changed, or `None` if the object has been deleted
    """
    raise NotImplementedError

run(object_change)

JobHookReceiver subclasses generally shouldn't need to override this method.

Source code in nautobot/extras/jobs.py
def run(self, object_change):
    """JobHookReceiver subclasses generally shouldn't need to override this method."""
    # Forward the change record, its action, and the affected object to the hook.
    self.receive_job_hook(
        change=object_change,
        action=object_change.action,
        changed_object=object_change.changed_object,
    )

nautobot.apps.jobs.MultiChoiceVar

Bases: ChoiceVar

Like ChoiceVar, but allows for the selection of multiple choices.

Source code in nautobot/extras/jobs.py
class MultiChoiceVar(ChoiceVar):
    """
    A ChoiceVar variant that permits selecting more than one choice.
    """

    # Swap in Django's multi-select field; choice handling is inherited from ChoiceVar.
    form_field = forms.MultipleChoiceField

nautobot.apps.jobs.MultiObjectVar

Bases: ObjectVar

Like ObjectVar, but can represent one or more objects.

Source code in nautobot/extras/jobs.py
class MultiObjectVar(ObjectVar):
    """
    An ObjectVar variant that can represent one or more objects.
    """

    # Swap in the multiple-choice form field; queryset handling is inherited from ObjectVar.
    form_field = DynamicModelMultipleChoiceField

nautobot.apps.jobs.NautobotKombuJSONEncoder

Bases: JSONEncoder

Custom json encoder based on restframework's JSONEncoder that serializes objects that implement the nautobot_serialize() method via the __nautobot_type__ interface. This is useful in passing special objects to and from Celery tasks.

This pattern should generally be avoided by passing pointers to persisted objects to the Celery tasks and retrieving them from within the task execution. While this is always possible for model instances (which covers 99% of use cases), for rare instances where it does not, and the actual object must be passed, this pattern allows for encoding and decoding of such objects.

It requires a conforming class to implement the instance method nautobot_serialize() which returns a json serializable dictionary of the object representation. The class must also implement the nautobot_deserialize() class method which takes the dictionary representation and returns an actual instance of the class.

Source code in nautobot/core/celery/encoders.py
class NautobotKombuJSONEncoder(JSONEncoder):
    """
    Custom json encoder based on restframework's JSONEncoder that serializes objects that implement
    the `nautobot_serialize()` method via the `__nautobot_type__` interface, which is useful when
    passing special objects to and from Celery tasks.

    Prefer passing pointers to persisted objects to Celery tasks and re-fetching them inside the
    task; that works for model instances (the vast majority of cases). For the rare object that
    must itself be passed, this encoder provides the encoding half of the round trip.

    A conforming class implements the instance method `nautobot_serialize()`, returning a
    json-serializable dict, and the class method `nautobot_deserialize()`, which rebuilds an
    instance from that dict.
    """

    def default(self, obj):
        # Deferred import: the core app registry isn't ready at module import time,
        # so importing these at the top of the file would raise ImproperlyConfigured.
        from nautobot.core.models import BaseModel
        from nautobot.core.models.managers import TagsManager

        if isinstance(obj, BaseModel):
            cls = obj.__class__
            # Fully qualified dotted import path for the model class.
            qual_name = ".".join([cls.__module__, cls.__qualname__])
            logger.debug("Performing nautobot serialization on %s for type %s", obj, qual_name)
            return {
                "id": obj.id,
                "__nautobot_type__": qual_name,
                # TODO: change to natural key to provide additional context if object is deleted from the db
                "display": getattr(obj, "display", str(obj)),
            }
        if isinstance(obj, set):
            # Sets aren't JSON-serializable; round-trip them as lists.
            return list(obj)
        if isinstance(obj, TagsManager):
            return obj.values_list("id", flat=True)
        if isinstance(obj, Exception):
            # JobResult.result uses NautobotKombuJSONEncoder as an encoder and expects a
            # JSONSerializable object, although an exception, such as a RuntimeException,
            # can be supplied as the obj.
            return f"{obj.__class__.__name__}: {obj}"
        return super().default(obj)

nautobot.apps.jobs.ObjectVar

Bases: ScriptVariable

A single object within Nautobot.

:param model: The Nautobot model being referenced
:param display_field: The attribute of the returned object to display in the selection list (default: 'name')
:param query_params: A dictionary of additional query parameters to attach when making REST API requests (optional)
:param null_option: The label to use as a "null" selection option (optional)

Source code in nautobot/extras/jobs.py
class ObjectVar(ScriptVariable):
    """
    A single object within Nautobot.

    :param model: The Nautobot model being referenced
    :param display_field: The attribute of the returned object to display in the selection list (default: 'name')
    :param query_params: A dictionary of additional query parameters to attach when making REST API requests (optional)
    :param null_option: The label to use as a "null" selection option (optional)
    """

    form_field = DynamicModelChoiceField

    def __init__(
        self,
        model=None,
        queryset=None,
        display_field="display",
        query_params=None,
        null_option=None,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        # Determine the form field's queryset. "queryset" remains accepted only
        # for backward compatibility and emits a warning when used.
        if model is not None:
            self.field_attrs["queryset"] = model.objects.all()
        elif queryset is not None:
            warnings.warn(
                f'{self}: Specifying a queryset for ObjectVar is no longer supported. Please use "model" instead.'
            )
            self.field_attrs["queryset"] = queryset
        else:
            raise TypeError("ObjectVar must specify a model")

        # Presentation and API-filtering attributes consumed by the dynamic field.
        self.field_attrs["display_field"] = display_field
        self.field_attrs["query_params"] = query_params
        self.field_attrs["null_option"] = null_option

nautobot.apps.jobs.RunJobTaskFailed

Bases: Exception

Celery task failed for some reason.

Source code in nautobot/extras/jobs.py
class RunJobTaskFailed(Exception):
    """Raised when the underlying Celery task fails for any reason."""

nautobot.apps.jobs.ScriptVariable

Base model for script variables

Source code in nautobot/extras/jobs.py
class ScriptVariable:
    """
    Base model for script variables
    """

    form_field = forms.CharField

    def __init__(self, label="", description="", default=None, required=True, widget=None):
        # Subclasses may have populated field_attrs before delegating here.
        if not hasattr(self, "field_attrs"):
            self.field_attrs = {}
        # Only record presentation attributes that were actually provided.
        for attr_name, attr_value in (("label", label), ("help_text", description), ("widget", widget)):
            if attr_value:
                self.field_attrs[attr_name] = attr_value
        if default is not None:
            self.field_attrs["initial"] = default
        self.field_attrs["required"] = required

    def as_field(self):
        """
        Render the variable as a Django form field.
        """
        field = self.form_field(**self.field_attrs)
        # Checkboxes don't take the Bootstrap "form-control" styling class.
        if not isinstance(field.widget, forms.CheckboxInput):
            attrs = field.widget.attrs
            if attrs and "class" in attrs:
                attrs["class"] += " form-control"
            else:
                attrs["class"] = "form-control"
        return field

as_field()

Render the variable as a Django form field.

Source code in nautobot/extras/jobs.py
def as_field(self):
    """
    Render the variable as a Django form field.
    """
    field = self.form_field(**self.field_attrs)
    # Checkbox widgets are left unstyled; everything else gets "form-control".
    if not isinstance(field.widget, forms.CheckboxInput):
        attrs = field.widget.attrs
        if attrs and "class" in attrs:
            attrs["class"] += " form-control"
        else:
            attrs["class"] = "form-control"
    return field

nautobot.apps.jobs.StringVar

Bases: ScriptVariable

Character string representation. Can enforce minimum/maximum length and/or regex validation.

Source code in nautobot/extras/jobs.py
class StringVar(ScriptVariable):
    """
    Character string representation. Can enforce minimum/maximum length and/or regex validation.
    """

    def __init__(self, min_length=None, max_length=None, regex=None, *args, **kwargs):
        """
        :param min_length: Optional minimum accepted length (inclusive).
        :param max_length: Optional maximum accepted length (inclusive).
        :param regex: Optional regular expression the value must match.
        """
        super().__init__(*args, **kwargs)

        # Explicit `is not None` checks so a bound of 0 is honored; a plain
        # truthiness test would silently discard min_length=0 or max_length=0.
        if min_length is not None:
            self.field_attrs["min_length"] = min_length
        if max_length is not None:
            self.field_attrs["max_length"] = max_length

        # Optional regular expression validation
        if regex:
            self.field_attrs["validators"] = [
                RegexValidator(
                    regex=regex,
                    message=f"Invalid value. Must match regex: {regex}",
                    code="invalid",
                )
            ]

nautobot.apps.jobs.TextVar

Bases: ScriptVariable

Free-form text data. Renders as a