Automatic Serialization with Celery and Pydantic Models

I’m working on a project that uses FastAPI (which uses Pydantic models under the hood) and Celery. I wanted a clean way to pass Pydantic models to Celery tasks so I could avoid translating back and forth between models and plain dicts. To my surprise this doesn’t work natively, though it does seem to have been considered.

I ended up solving this using Celery’s ability to add custom serializers and this helpful starting example.

Structure

main.py
models/
  __init__.py
  models.py
pydanticserializer/
  __init__.py
  pydanticserializer.py

models.py

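Put the models in their own package. The decoder further down looks classes up by name on the models package, so the classes need to be visible there: re-export them from models/__init__.py (for example, from .models import MyCustomTypeOne, MyCustomTypeTwo).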


from pydantic import BaseModel

class MyCustomTypeOne(BaseModel):
    myfield: str


class MyCustomTypeTwo(BaseModel):
    myotherfield: str

pydanticserializer.py


import json
from pydantic import BaseModel
import models

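class PydanticSerializer(json.JSONEncoder):
    """JSON encoder that tags Pydantic models with their class name."""

    def default(self, obj):
        if isinstance(obj, BaseModel):
            # Dump the fields and record the class name so the decoder can rebuild it.
            return obj.model_dump() | {'__type__': type(obj).__name__}
        return super().default(obj)


def pydantic_decoder(obj):
    """object_hook: rebuild a model from any dict carrying a known '__type__' tag."""
    type_name = obj.get('__type__')
    cls = getattr(models, type_name, None) if isinstance(type_name, str) else None
    # Only instantiate Pydantic models, never arbitrary attributes of the module.
    if isinstance(cls, type) and issubclass(cls, BaseModel):
        # model_validate replaces parse_obj, which is deprecated in Pydantic v2.
        fields = {k: v for k, v in obj.items() if k != '__type__'}
        return cls.model_validate(fields)
    return obj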


# Encoder function      
def pydantic_dumps(obj):
    return json.dumps(obj, cls=PydanticSerializer)

# Decoder function
def pydantic_loads(obj):
    return json.loads(obj, object_hook=pydantic_decoder)
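A quick way to sanity-check the encoder and decoder before involving Celery is a plain JSON round trip. The sketch below is self-contained, so it redefines one model and swaps the lookup on the models package for a hypothetical REGISTRY dict (that name is my own, not part of the original code). It assumes Pydantic v2, since it relies on model_dump and model_validate.

```python
import json

from pydantic import BaseModel


class MyCustomTypeOne(BaseModel):
    myfield: str


# Hypothetical explicit registry, standing in for the lookup on the models package.
REGISTRY = {cls.__name__: cls for cls in (MyCustomTypeOne,)}


class PydanticSerializer(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, BaseModel):
            # Tag the dumped fields with the class name.
            return obj.model_dump() | {"__type__": type(obj).__name__}
        return super().default(obj)


def pydantic_decoder(obj):
    type_name = obj.get("__type__")
    cls = REGISTRY.get(type_name) if isinstance(type_name, str) else None
    if cls is None:
        # Unknown or missing tag: leave the dict untouched.
        return obj
    fields = {k: v for k, v in obj.items() if k != "__type__"}
    return cls.model_validate(fields)


# Models nested inside ordinary containers are handled too, because json calls
# default() for each non-serializable object and object_hook for each decoded dict.
payload = json.dumps({"args": [MyCustomTypeOne(myfield="hello")]}, cls=PydanticSerializer)
restored = json.loads(payload, object_hook=pydantic_decoder)

print(payload)
print(restored)
```

An explicit registry like this is one possible design choice: it limits decoding to classes you have listed, at the cost of having to keep the list up to date.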

main.py

from celery import Celery
from kombu.serialization import register
# Assumes pydanticserializer/__init__.py re-exports pydantic_dumps and pydantic_loads.
import pydanticserializer

# Register new serializer methods into kombu
register(
    "pydantic",
    pydanticserializer.pydantic_dumps,
    pydanticserializer.pydantic_loads,
    content_type="application/x-pydantic",
    content_encoding="utf-8",
)

celery_app = Celery()

celery_app.conf.update(
    task_serializer="pydantic",
    result_serializer="pydantic",
    event_serializer="pydantic",
    accept_content=["application/json", "application/x-pydantic"],
    result_accept_content=["application/json", "application/x-pydantic"],
)

Now that everything is wired up, you should be able to set up tasks that accept and return these models natively:

from models import MyCustomTypeOne, MyCustomTypeTwo

@celery_app.task(bind=True)
def mytask(self, req: MyCustomTypeOne) -> MyCustomTypeTwo:
    ...
    return MyCustomTypeTwo(myotherfield="...")

You can also invoke them like this:


req = MyCustomTypeOne(myfield="...")
async_result = mytask.delay(req)
...
result: MyCustomTypeTwo = async_result.get()