Using QBitBridge

The workflow makes use of Prefect, Postgres, and Slurm. We do not discuss setting up a Slurm service here, as it will likely already be provided by the HPC centre.

Running A Workflow

To run a workflow, there are a few key steps:

  • Launch the Prefect and Postgres services on a node where long-running services are allowed. These services typically do not require significant computational or memory resources.

  • Ensure you have set PREFECT_API_URL to the appropriate hostname and port, e.g. export PREFECT_API_URL=http://<host>:4200/api.

  • Launch the workflow with python <workflow.py>.

Services to run Prefect

Running Postgres

The workflows are best run with a Prefect server running under uvicorn and backed by a Postgres database. The bundle includes two scripts located in workflow/scripts/ designed to launch these services. By default, both services are assumed to run on the same host, but that need not be the case.

The database can be started with the start_postgres.sh script, which launches Postgres using the Singularity container engine. The script makes use of several key POSTGRES environment variables (such as POSTGRES_ADDR) and pulls the latest Postgres container image from Docker Hub, storing it locally. The script could be altered to use other container engines as well.

Running Prefect Server

The server can be started with start_prefect.sh, which launches Prefect using

python -m uvicorn \
  --app-dir ${prefect_python_venv}/lib/python3.11/site-packages/ \
  --factory prefect.server.api.server:create_app \
  --host 0.0.0.0 \
  --port 4200 \
  --timeout-keep-alive 10 \
  --limit-max-requests 4096 \
  --timeout-graceful-shutdown 7200 &

and when running on the command line with a simple workflow, one can follow the reported message and set export PREFECT_API_URL=http://${POSTGRES_ADDR}:4200/api, where POSTGRES_ADDR is the host on which the Postgres database (and, by default, the Prefect server) is running.

Note

At the moment, the scripts are geared towards using Singularity and running both services on the same host. You will need to alter them to use other container engines.

Prefect UI

Once these services are running, you can load the Prefect UI in your browser at 127.0.0.1:4200, provided you have set up an ssh tunnel with ssh -N -f -L 4200:<remote>:4200 <remote>.

Setting up the cluster yaml file

It is important to consider what resources a given flow will need. These resources are set by the DaskTaskRunner used by the flow, which is defined in the cluster yaml file. It is therefore important to write a cluster configuration appropriate for the HPC cluster and the available Slurm partitions (a sketch of the kind of task runner such a configuration describes is given after the list below). Example cluster configurations can be found in workflow/clusters/. The configuration needs a certain minimum set of named configurations:

  • generic: a small resource for running computationally simple tasks (such as launching some services)

  • vqpu: for running the gpu-accelerated vQPU service.

  • circuit: for running the circuit submission service (you will likely need to ensure Python paths are correct)

  • cpu: for moderately cpu-heavy tasks

  • gpu: for gpu tasks

You will also find generic-aws and circuit-aws configurations in the examples for running AWS tasks, which need environment variables set up to log in to the AWS CLI. Similarly, there are generic-quera and circuit-quera configurations that also need some environment variables set for access.
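
For orientation, the sketch below shows the kind of resource specification a named entry such as gpu ultimately describes: a Prefect DaskTaskRunner backed by a dask_jobqueue SLURMCluster. This is an illustrative sketch assuming the prefect-dask and dask-jobqueue packages; the partition name, memory, walltime, and GPU directive are placeholder values, and the exact yaml schema read by QBitBridge may differ.

# illustrative sketch only: the values below are assumptions for your HPC system
from prefect import flow
from prefect_dask import DaskTaskRunner

gpu_runner = DaskTaskRunner(
   cluster_class="dask_jobqueue.SLURMCluster",
   cluster_kwargs={
      "queue": "gpuq",  # Slurm partition (assumed name)
      "cores": 8,  # cores per Slurm job
      "memory": "64GB",  # memory per Slurm job
      "walltime": "02:00:00",  # walltime per Slurm job
      "job_extra_directives": ["--gres=gpu:1"],  # request a GPU
   },
)

@flow(task_runner=gpu_runner)
def my_gpu_flow():
   ...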

Designing A Hybrid Workflow

There are several things to consider when designing a workflow. As a starting point, we suggest looking at the example workflows in the examples/flows/ directory (see Example Workflow).

Let’s start with tutorial_workflow.py. This workflow begins with some key imports.

QbitBridge imports

Load the key classes and other useful routines

# import key classes
from qbitbridge.vqpubase import (
   QPUMetaData,
   HybridQuantumWorkflowBase,
)

# import useful utilities
from qbitbridge.utils import (
   EventFile, # Event files
   save_artifact, # to save prefect artifacts
   get_num_gpus, # to get number of gpus
)

# import basic flows and tasks from the vqpuflow module as desired
from qbitbridge.vqpuflow import (
   # tasks
   run_cpu, # run a cpu task
   run_gpu, # run a gpu task
   # and here are some flows
   launch_vqpu_workflow, # launch a vqpu workflow
   cpu_workflow, # launch a cpu workflow
   gpu_workflow, # launch a gpu workflow
)

Prefect imports

It is also necessary to import the relevant Prefect items, along with datetime, which is used to timestamp run names.

import asyncio # for asynchronous tasks and flows
import datetime # for timestamping task and flow run names
from prefect import task, flow # task and flow decorators
from prefect.logging import get_run_logger # logger

Define tasks and flows

Then you can start defining some simple tasks. Here we have a standard Prefect task along with an asyncio async task.

# let's create some tasks
@task(name="Example task", task_run_name="example_task-{date:%Y-%m-%d:%H:%M:%S}")
def simple_task(
   date: datetime.datetime = datetime.datetime.now(),
):
   """Task"""
   pass


@task(
   name="Example async task",
   task_run_name="example_async_task-{date:%Y-%m-%d:%H:%M:%S}",
)
async def simple_async_task(
   date: datetime.datetime = datetime.datetime.now(),
):
   """Async task"""
   pass

Flows can be simple, just submitting several tasks and getting the results, or can include dependencies between tasks. We show a standard Prefect flow and an asyncio asynchronous one.

@flow(
   name="Example flow",
   flow_run_name="example_flow-{date:%Y-%m-%d:%H:%M:%S}",
)
def workflow(
   myqpuworkflow: HybridQuantumWorkflowBase,
   date: datetime.datetime = datetime.datetime.now(),
) -> None:
   """Example flow"""
   logger = get_run_logger()
   logger.info("Example flow")
   # let's submit a task to the flow
   future = simple_task.submit()
   # and then get results
   future.result()
   logger.info("Finished flow")


@flow(
   name="Example async flow",
   flow_run_name="example_async_flow-{date:%Y-%m-%d:%H:%M:%S}",
)
async def async_workflow(
   myqpuworkflow: HybridQuantumWorkflowBase,
   date: datetime.datetime = datetime.datetime.now(),
) -> None:
   """Example async flow that can call asynchronous functions"""
   logger = get_run_logger()
   logger.info("Example async flow")
   # submit several tasks at once
   futures = []
   for i in range(10):
      futures.append(simple_task.submit())
   for f in futures:
      f.result()
   # We can also submit some asynchronous tasks
   async with asyncio.TaskGroup() as tg:
      for i in range(10):
         tg.create_task(simple_async_task.submit())
      # once the async taskgroup is finished all tasks have been submitted
   # we can also just create a list of tasks
   tg = []
   for i in range(10):
      tg.append(asyncio.create_task(simple_async_task.submit()))
   done, pending = await asyncio.wait(tg)
   # once they are all done, let's get the results
   for d in done:
      d.result()

   logger.info("Finished async flow")

More details of a complex flow can be found in Example Workflow.
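
Finally, for the script to be launched with python <workflow.py> as described above, it needs an entry point that constructs the HybridQuantumWorkflowBase and calls the flows. The sketch below is illustrative only: the constructor arguments shown (a cluster yaml file) are assumptions, not the actual API.

# illustrative entry point; the HybridQuantumWorkflowBase arguments are assumed
if __name__ == "__main__":
   myqpuworkflow = HybridQuantumWorkflowBase(
      cluster="workflow/clusters/example.yaml",  # hypothetical argument
   )
   # run the standard flow
   workflow(myqpuworkflow=myqpuworkflow)
   # run the asynchronous flow
   asyncio.run(async_workflow(myqpuworkflow=myqpuworkflow))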