주 콘텐츠로 건너뛰기

Qiskit Serverless 컴퓨팅 및 데이터 리소스 관리

패키지 버전

이 페이지의 코드는 다음 요구 사항을 사용하여 개발되었습니다. 이 버전 이상을 사용하는 것을 권장합니다.

qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0

Qiskit Serverless를 사용하면 CPU, QPU 및 기타 컴퓨팅 가속기를 포함하여 Qiskit 패턴 전반의 컴퓨팅 및 데이터를 관리할 수 있습니다.

세부 상태 설정

Serverless 워크로드는 워크플로우 전반에 걸쳐 여러 단계를 거칩니다. 기본적으로 job.status()로 다음 상태를 확인할 수 있습니다:

  • QUEUED: 워크로드가 클래식 리소스 대기열에 있습니다
  • INITIALIZING: 워크로드가 설정 중입니다
  • RUNNING: 워크로드가 현재 클래식 리소스에서 실행 중입니다
  • DONE: 워크로드가 성공적으로 완료되었습니다

다음과 같이 특정 워크플로우 단계를 더 자세히 설명하는 사용자 정의 상태를 설정할 수도 있습니다.

# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path

Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py

from qiskit_serverless import update_status, Job

## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)

## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)

## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)

## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)

## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py

이 워크로드가 성공적으로 완료되면 (save_result() 사용 시), 이 상태는 자동으로 DONE으로 업데이트됩니다.

병렬 워크플로우

병렬화할 수 있는 클래식 작업의 경우, @distribute_task 데코레이터를 사용하여 작업 수행에 필요한 컴퓨팅 요구 사항을 정의하세요. 첫 번째 Qiskit Serverless 프로그램 작성 주제에서 다음 코드로 transpile_remote.py 예시를 다시 확인하는 것으로 시작합니다.

다음 코드를 실행하려면 이미 자격 증명을 저장해야 합니다.

%%writefile ./source_files/transpile_remote.py

from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task

service = QiskitRuntimeService()

@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py

이 예시에서는 transpile_remote() 함수에 @distribute_task(target={"cpu": 1})를 데코레이터로 적용했습니다. 실행하면 단일 CPU 코어를 가진 비동기 병렬 워커 작업이 생성되며, 워커를 추적할 수 있는 참조를 반환합니다. 결과를 가져오려면 참조를 get() 함수에 전달하세요. 이를 사용하여 여러 병렬 작업을 실행할 수 있습니다:

%%writefile --append ./source_files/transpile_remote.py

from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job

# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation

update_status(Job.OPTIMIZING_HARDWARE)

start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]

transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata

result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}

save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.

def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid

title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")

test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

다양한 작업 구성 탐색

@distribute_task()를 통해 작업에 CPU, GPU 및 메모리를 유연하게 할당할 수 있습니다. IBM Quantum® Platform의 Qiskit Serverless에서는 각 프로그램에 16개의 CPU 코어와 32GB RAM이 제공되며, 필요에 따라 동적으로 할당할 수 있습니다.

CPU 코어는 다음과 같이 전체 CPU 코어 또는 분수 할당으로도 할당할 수 있습니다.

메모리는 바이트 수로 할당됩니다. 1킬로바이트는 1024바이트, 1메가바이트는 1024킬로바이트, 1기가바이트는 1024메가바이트임을 기억하세요. 워커에 2GB의 메모리를 할당하려면 "mem": 2 * 1024 * 1024 * 1024를 할당해야 합니다.

%%writefile --append ./source_files/transpile_remote.py

@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

프로그램 전반의 데이터 관리

Qiskit Serverless를 사용하면 모든 프로그램에서 /data 디렉토리의 파일을 관리할 수 있습니다. 여기에는 몇 가지 제한 사항이 있습니다:

  • 현재는 tarh5 파일만 지원됩니다
  • 이는 단순한 /data 플랫 스토리지이며, /data/folder/ 하위 디렉토리를 가질 수 없습니다

다음은 파일을 업로드하는 방법을 보여줍니다. IBM Quantum 계정으로 Qiskit Serverless에 인증되어 있는지 확인하세요 (지침은 IBM Quantum Platform에 배포를 참조하세요).

import tarfile
from qiskit_serverless import IBMServerlessClient

# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()

# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)

# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'

다음으로 data 디렉토리의 모든 파일을 나열할 수 있습니다. 이 데이터는 모든 프로그램에서 접근할 수 있습니다.

serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']

프로그램에서 file_download()를 사용하여 프로그램 환경에 파일을 다운로드하고 tar를 압축 해제하는 방식으로 이를 수행할 수 있습니다.

%%writefile ./source_files/extract_tarfile.py

import tarfile
from qiskit_serverless import IBMServerlessClient

serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)

with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()

이 시점에서 프로그램은 로컬 실험처럼 파일과 상호 작용할 수 있습니다. file_upload(), file_download(), file_delete()는 로컬 실험 또는 업로드된 프로그램에서 호출할 수 있어 일관되고 유연한 데이터 관리가 가능합니다.

다음 단계

권장 사항