Deploying OCI Container Images to Bare Metal with a Custom IPA Hardware Manager
What if you could deploy any OCI container image directly to bare metal, without building traditional disk images? Back in 2021, Dmitry Tantsur implemented custom deploy steps for Ironic, enabling alternative deployment methods beyond the standard image-based approach. This feature powers OpenShift’s bare metal provisioning with CoreOS, yet it remains surprisingly unknown to the broader Metal3 community. This post aims to change that by providing an example implementation of a custom IPA hardware manager that deploys Debian-based container images with EFI boot, LVM root filesystem, and optional RAID1 mirroring.
The Problem with Traditional Image-Based Deployments
Traditional bare metal provisioning with Metal3 and Ironic typically requires pre-built disk images. You need to maintain these images, update them regularly, and ensure they contain all necessary drivers and configurations. This approach has some drawbacks:
- Image building complexity - Building and maintaining OS disk images is not as trivial as creating container images
- Software RAID limitations - Image-based deployments with mdadm RAID and EFI boot require workarounds
What if we could leverage the container ecosystem instead? Container registries already solve the distribution problem, and OCI images are versioned, layered, simple to build and widely available. This approach allows you to:
- Use standard container images from any registry
- Avoid maintaining custom disk images
- Easily switch between OS versions by updating spec.image.url
- Get RAID1 redundancy with minimal configuration
Introducing the deb_oci_efi_lvm Hardware Manager
The DebOCIEFILVMHardwareManager is a custom IPA hardware manager that deploys Debian-based OCI container images directly to bare metal. It provides:
- EFI boot support - UEFI boot with GRUB, which, unlike systemd-boot, supports booting from LVM on top of mdadm software RAID
- LVM root filesystem - Flexible volume management for the root partition
- Optional RAID1 - Software mirroring across two disks for redundancy
- Cloud-init integration - Ironic configdrive data is written directly to the root filesystem, no separate configdrive partition
- Multi-architecture - Supports x86_64 and ARM64 via OCI multi-arch images
How It Works
The deployment process extracts an OCI image using Google’s crane tool,
then installs the necessary boot infrastructure on top. The hardware
manager supports three methods for specifying the OCI image (in priority
order):
- spec.image.url with oci:// prefix (e.g., oci://debian:12)
- Configdrive metadata annotation bmh.metal3.io/oci_image
- Default fallback: ubuntu:24.04
Root device hints can be specified using either standard BareMetalHost
rootDeviceHints fields or a simplified format via the
bmh.metal3.io/root_device_hints annotation (e.g., serial=ABC123 or
wwn=0x123456). For RAID1 configurations, provide two space-separated
values (e.g., serial=ABC123 DEF456).
Note: Alternatively, podman can be used instead of crane for OCI image extraction, as it is readily available in CentOS Stream 9 and also has an export command. This would require code modifications to the hardware manager.
The hardware manager performs these steps during deployment:
- Resolve OCI image - Check image_source, configdrive, or use default
- Resolve target disks - Parse root device hints (serial or WWN)
- Clean existing data - Wipe partitions, RAID arrays, and LVM based on disk wipe mode (all for RAID1, target for single disk by default)
- Partition disks - Create 2GB EFI partition and LVM partition (with RAID1 if two disks are specified)
- Create filesystems - FAT32 for EFI, ext4 for root LV
- Extract OCI image - Use crane export piped to tar for rootfs
- Install packages - Add cloud-init, GRUB, kernel, mdadm, lvm2
- Configure boot - Set up GRUB, initramfs, and fstab
- Install bootloader - GRUB to both EFI partitions for RAID1
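The extraction step above can be sketched as a two-process pipeline. This is a minimal illustration rather than the hardware manager's actual code: the helper names build_extract_commands and extract_rootfs are hypothetical, and the sketch assumes crane and tar are on PATH inside the ramdisk.

```python
import subprocess


def build_extract_commands(oci_image, oci_platform, target_dir):
    """Return the two argv lists for `crane export ... - | tar -xf - -C ...`.

    Hypothetical helper, for illustration only.
    """
    crane_cmd = ["crane", "export", "--platform", oci_platform, oci_image, "-"]
    tar_cmd = ["tar", "-xf", "-", "-C", target_dir]
    return crane_cmd, tar_cmd


def extract_rootfs(oci_image, oci_platform, target_dir):
    """Stream the flattened image filesystem into target_dir."""
    crane_cmd, tar_cmd = build_extract_commands(oci_image, oci_platform, target_dir)
    crane = subprocess.Popen(crane_cmd, stdout=subprocess.PIPE)
    try:
        # tar reads the tarball directly from crane's stdout
        tar = subprocess.run(tar_cmd, stdin=crane.stdout, check=False)
    finally:
        crane.stdout.close()
        crane_rc = crane.wait()
    if crane_rc != 0 or tar.returncode != 0:
        raise RuntimeError(
            f"rootfs extraction failed (crane={crane_rc}, tar={tar.returncode})"
        )
```

Streaming through a pipe avoids writing an intermediate tarball into the ramdisk, which matters because the ramdisk lives in memory.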
Disk Layout
The hardware manager creates the following partition layout:
| Partition | Size | Filesystem | Label | Mount Point |
|---|---|---|---|---|
| 1 (EFI) | 2 GB | FAT32 | EFI | /boot/efi |
| 2 (LVM/RAID) | Remaining | - | - | - |
The LVM configuration:
| Component | Name | Description |
|---|---|---|
| Volume Group | vg_root | Contains all logical volumes |
| Logical Volume | lv_root | Root filesystem (100% of VG) |
| Filesystem | ext4 | Label: ROOTFS |
For RAID1 configurations, both disks get identical partition tables, with partition 2 forming a RAID1 array that serves as the LVM physical volume.
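As a rough sketch of how this layout could be expressed, the helper below builds the parted invocations for one disk. It is an assumption-laden illustration: build_partition_commands is a hypothetical name, the 1 MiB alignment offset and the partition names are choices made here, and the real hardware manager may partition differently.

```python
EFI_SIZE_MIB = 2048  # 2 GB EFI partition, as in the layout table


def build_partition_commands(device, is_raid):
    """Return parted argv lists that create the two-partition GPT layout.

    Hypothetical helper: partition 1 is the EFI system partition,
    partition 2 is flagged for RAID (two disks) or LVM (single disk).
    """
    efi_end = 1 + EFI_SIZE_MIB  # start at 1MiB for alignment (assumption)
    second_flag = "raid" if is_raid else "lvm"
    return [
        ["parted", "-s", device, "mklabel", "gpt"],
        ["parted", "-s", device, "mkpart", "EFI", "fat32", "1MiB", f"{efi_end}MiB"],
        ["parted", "-s", device, "set", "1", "esp", "on"],
        ["parted", "-s", device, "mkpart", "root", f"{efi_end}MiB", "100%"],
        ["parted", "-s", device, "set", "2", second_flag, "on"],
    ]
```

For RAID1 the same command list would be generated once per disk, so that both disks end up with identical partition tables.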
Configuration
Basic Single-Disk Deployment
For a simple single-disk deployment, configure your BareMetalHost and Metal3MachineTemplate as follows:
apiVersion: metal3.io/v1alpha1
kind: BareMetalHost
metadata:
  name: my-server
  namespace: metal3
spec:
  online: true
  bootMode: UEFI
  # Preferred method: Use spec.image.url with oci:// prefix
  image:
    url: "oci://debian:12"
  rootDeviceHints:
    serialNumber: "DISK_SERIAL_NUMBER"
Alternatively, you can use annotations or simplified hint formats:
apiVersion: metal3.io/v1alpha1
kind: BareMetalHost
metadata:
  name: my-server-alt
  namespace: metal3
  annotations:
    # Alternative: Override default ubuntu:24.04 via annotation
    bmh.metal3.io/oci_image: "debian:12"
    # Alternative: Use simplified hint format
    bmh.metal3.io/root_device_hints: "serial=DISK_SERIAL_NUMBER"
spec:
  online: true
  bootMode: UEFI
The hardware manager supports three methods for specifying the OCI image (in priority order):
- spec.image.url with oci:// prefix (highest priority, recommended)
- Annotation bmh.metal3.io/oci_image passed via Metal3DataTemplate
- Default ubuntu:24.04 (fallback)
Root device hints support both standard format (serialNumber: "ABC123")
and simplified format via annotation (bmh.metal3.io/root_device_hints: "serial=ABC123").
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: Metal3MachineTemplate
metadata:
  name: my-worker-template
  namespace: metal3
spec:
  template:
    spec:
      customDeploy:
        method: "deb_oci_efi_lvm"
      dataTemplate:
        name: my-data-template
RAID1 Configuration
For production deployments requiring disk redundancy, specify two disk serial numbers. The hardware manager supports multiple formats:
Method 1: Standard format with space-separated values
apiVersion: metal3.io/v1alpha1
kind: BareMetalHost
metadata:
  name: my-ha-server
  namespace: metal3
spec:
  online: true
  bootMode: UEFI
  image:
    url: "oci://debian:13"
  rootDeviceHints:
    # Two space-separated serial numbers enable RAID1
    serialNumber: "DISK1_SERIAL DISK2_SERIAL"
Method 2: Simplified format via annotation
apiVersion: metal3.io/v1alpha1
kind: BareMetalHost
metadata:
  name: my-ha-server-alt
  namespace: metal3
  annotations:
    bmh.metal3.io/oci_image: "debian:13"
    # Simplified RAID1 hint format
    bmh.metal3.io/root_device_hints: "serial=DISK1_SERIAL DISK2_SERIAL"
spec:
  online: true
  bootMode: UEFI
With RAID1 enabled, the hardware manager will:
- Clean both disks (remove existing partitions, RAID arrays, and LVM)
- Create identical partition layouts on both disks
- Set up a RAID1 array (/dev/md0) for the LVM physical volume
- Install GRUB to both EFI partitions
- Configure a GRUB update hook to sync EFI partitions via rsync
Disk Wipe Mode Configuration
By default, the hardware manager wipes all block devices for RAID1 configurations (to prevent stray RAID/LVM metadata issues) and only target disks for single-disk setups. You can override this behavior:
apiVersion: metal3.io/v1alpha1
kind: BareMetalHost
metadata:
  name: my-server
  namespace: metal3
  annotations:
    # Control disk cleaning behavior
    # "all" - Wipe all block devices (recommended for RAID1)
    # "target" - Wipe only target disk(s) from root device hints
    bmh.metal3.io/disk_wipe_mode: "all"
spec:
  online: true
  bootMode: UEFI
  image:
    url: "oci://ubuntu:24.04"
  rootDeviceHints:
    serialNumber: "DISK_SERIAL_NUMBER"
The disk_wipe_mode annotation is useful when:
- You have multiple disks and want to ensure clean RAID/LVM state (all)
- You want to preserve data on non-target disks (target)
- You’re migrating from a previous RAID configuration
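To make the difference between the two modes concrete, here is a small sketch of how the set of disks to clean could be chosen. The function select_devices_to_wipe and its arguments are hypothetical names used for illustration; only the "all" and "target" values and their defaults come from the description above.

```python
def default_wipe_mode(is_raid):
    """Default described above: "all" for RAID1, "target" for a single disk."""
    return "all" if is_raid else "target"


def select_devices_to_wipe(wipe_mode, target_devices, all_devices):
    """Pick the block devices to clean for a given wipe mode.

    Hypothetical helper: target_devices are the disks resolved from the
    root device hints, all_devices is every block device on the host.
    """
    if wipe_mode == "all":
        return sorted(set(all_devices))
    if wipe_mode == "target":
        return list(target_devices)
    raise ValueError(f'Invalid disk_wipe_mode "{wipe_mode}"')
```

With three disks present and a single-disk deployment onto /dev/sda, the default mode leaves /dev/sdb and /dev/sdc untouched, while setting the annotation to "all" cleans every disk.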
Metal3DataTemplate Configuration
When using annotations (instead of spec.image.url), configure your
Metal3DataTemplate to pass them to the configdrive:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: Metal3DataTemplate
metadata:
  name: my-data-template
  namespace: metal3
spec:
  clusterName: my-cluster
  metaData:
    fromAnnotations:
      # Optional: Pass OCI image annotation (only if not using spec.image.url)
      - key: oci_image
        object: baremetalhost
        annotation: "bmh.metal3.io/oci_image"
      # Optional: Pass simplified root device hint
      - key: root_device_hints
        object: baremetalhost
        annotation: "bmh.metal3.io/root_device_hints"
      # Optional: Pass disk wipe mode
      - key: disk_wipe_mode
        object: baremetalhost
        annotation: "bmh.metal3.io/disk_wipe_mode"
    objectNames:
      - key: name
        object: machine
      - key: local-hostname
        object: machine
      - key: local_hostname
        object: machine
      - key: metal3-name
        object: baremetalhost
      - key: metal3-namespace
        object: baremetalhost
  networkData:
    links:
      ethernets:
        - id: enp1s0
          macAddress:
            fromHostInterface: enp1s0
          type: phy
    networks:
      ipv4:
        - id: baremetalv4
          ipAddressFromIPPool: my-ip-pool
          link: enp1s0
          routes:
            - gateway:
                fromIPPool: my-ip-pool
              network: 0.0.0.0
              prefix: 0
    services:
      dns:
        - 8.8.8.8
Note: When using spec.image.url with the oci:// prefix, you don’t need to pass the oci_image annotation through Metal3DataTemplate. The hardware manager reads directly from instance_info.image_source. This is the recommended approach for newer deployments.
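On the agent side, the keys declared under fromAnnotations end up in the configdrive meta_data. The sketch below shows one way those values could be read back; read_annotation_metadata and the example node are hypothetical, and the sketch assumes the configdrive reaches the agent as a dictionary under instance_info.

```python
ANNOTATION_KEYS = ("oci_image", "root_device_hints", "disk_wipe_mode")


def read_annotation_metadata(node):
    """Return the annotation-derived keys found in configdrive meta_data.

    Hypothetical helper: missing or empty keys are simply left out.
    """
    instance_info = node.get("instance_info") or {}
    configdrive = instance_info.get("configdrive") or {}
    meta_data = configdrive.get("meta_data") or {}
    return {key: meta_data[key] for key in ANNOTATION_KEYS if meta_data.get(key)}


# Example node, shaped the way the template above would populate it
example_node = {
    "instance_info": {
        "configdrive": {
            "meta_data": {
                "local-hostname": "worker-0",
                "oci_image": "debian:12",
                "root_device_hints": "serial=ABC123 DEF456",
                "disk_wipe_mode": "all",
            }
        }
    }
}
```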
Building an IPA Image with the Hardware Manager
To use this hardware manager, you need to build a custom IPA ramdisk image using ironic-python-agent-builder. This tool uses diskimage-builder (DIB) to create bootable ramdisk images containing the IPA and any custom elements you need.
Required Packages
The hardware manager requires several packages to be present in the IPA ramdisk:
| Package | Purpose |
|---|---|
| crane | OCI image extraction from container registries |
| mdadm | Software RAID array management |
| lvm2 | Logical Volume Manager for root filesystem |
| parted | Disk partitioning |
| dosfstools | FAT32 filesystem creation for EFI partition |
| grub2-efi-* | UEFI bootloader installation |
| curl | Downloading files during deployment |
| rsync | EFI partition synchronization for RAID |
Custom DIB Elements
DIB elements are modular components that customize the image build. Each element is a directory containing scripts that run at different phases of the build:
| Directory | Phase | Description |
|---|---|---|
| extra-data.d/ | Pre-build | Copy files into build environment |
| install.d/ | Chroot | Run inside chroot during build |
| post-install.d/ | Post-install | Run after package installation |
| finalise.d/ | Finalize | Run at end of build process |
Scripts are named with a numeric prefix (e.g., 50-crane) to control
execution order.
DIB element: crane (OCI image tool)
Create a DIB element to install Google’s crane tool for OCI image
extraction. Create the following directory structure:
crane/
├── element-deps
└── install.d/
    └── 50-crane
The element-deps file can be empty or list dependencies. The install
script (install.d/50-crane):
#!/bin/bash
# https://docs.openstack.org/diskimage-builder/latest/developer/developing_elements.html
if [ "${DIB_DEBUG_TRACE:-0}" -gt 0 ]; then
set -x
fi
set -eu
set -o pipefail
CRANE_VERSION="${DIB_CRANE_VERSION:-latest}"
# Detect architecture
ARCH=$(uname -m)
case "${ARCH}" in
x86_64)
CRANE_ARCH="x86_64"
;;
aarch64)
CRANE_ARCH="arm64"
;;
*)
echo "Unsupported architecture: ${ARCH}"
exit 1
;;
esac
echo "Installing crane (${CRANE_VERSION}) for ${CRANE_ARCH}..."
# Get the download URL
if [ "${CRANE_VERSION}" = "latest" ]; then
DOWNLOAD_URL=$(curl -s https://api.github.com/repos/google/go-containerregistry/releases/latest |
grep "browser_download_url.*Linux_${CRANE_ARCH}.tar.gz" |
cut -d '"' -f 4)
else
DOWNLOAD_URL="https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_${CRANE_ARCH}.tar.gz"
fi
if [ -z "${DOWNLOAD_URL}" ]; then
echo "Failed to determine crane download URL"
exit 1
fi
echo "Downloading crane from: ${DOWNLOAD_URL}"
# Download and extract crane
TEMP_DIR=$(mktemp -d)
curl -sL "${DOWNLOAD_URL}" | tar -xz -C "${TEMP_DIR}"
# Install crane binary
install -m 755 "${TEMP_DIR}/crane" /usr/local/bin/crane
# Cleanup
rm -rf "${TEMP_DIR}"
# Verify installation
if crane version; then
echo "crane installed successfully"
else
echo "crane installation verification failed"
exit 1
fi
DIB element: packages-install (extra packages)
Create a DIB element that installs packages from the DIB_EXTRA_PACKAGES
environment variable:
packages-install/
├── element-deps
└── install.d/
    └── 50-packages-install
The install script (install.d/50-packages-install):
#!/bin/bash
# https://docs.openstack.org/diskimage-builder/latest/developer/developing_elements.html
if [ "${DIB_DEBUG_TRACE:-0}" -gt 0 ]; then
set -x
fi
set -eu
set -o pipefail
# Enable CRB (CodeReady Builder) repository and install EPEL
echo "Enabling CRB repository..."
dnf config-manager --set-enabled crb || true
# Detect CentOS version and install appropriate EPEL
if [ -f /etc/os-release ]; then
# shellcheck disable=SC1091
. /etc/os-release
case "${VERSION_ID%%.*}" in
9)
echo "Installing EPEL for CentOS 9..."
dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm || true
;;
10)
echo "Installing EPEL for CentOS 10..."
dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-10.noarch.rpm || true
;;
*)
echo "Unknown CentOS version: ${VERSION_ID}, skipping EPEL installation"
;;
esac
fi
if [ -z "${DIB_EXTRA_PACKAGES:-}" ]; then
echo "No extra packages specified via DIB_EXTRA_PACKAGES, skipping"
exit 0
fi
echo "Updating system packages..."
dnf update -y
echo "Installing extra packages: ${DIB_EXTRA_PACKAGES}"
# shellcheck disable=SC2086
dnf install -y ${DIB_EXTRA_PACKAGES}
echo "Cleaning package cache..."
dnf clean all
echo "Extra packages installation complete"
Building the Image
Set the ELEMENTS_PATH to include your custom elements directory, then
run the builder:
export ELEMENTS_PATH="/path/to/your/dib-elements"
export DIB_EXTRA_PACKAGES="jq yq mdadm lvm2 curl parted util-linux \
squashfs-tools xfsprogs dosfstools grub2-efi-x64 grub2-tools rsync"
ironic-python-agent-builder \
-o ipa-custom \
-e extra-hardware \
-e crane \
-e packages-install \
--release 9-stream centos
This produces two files:
- ipa-custom.kernel - The Linux kernel
- ipa-custom.initramfs - The ramdisk containing IPA and tools
For ARM64 builds, the grub packages differ:
export DIB_EXTRA_PACKAGES="jq yq mdadm lvm2 curl parted util-linux \
squashfs-tools xfsprogs dosfstools grub2-efi-aa64 grub2-tools rsync"
Installing the Hardware Manager
The hardware manager must be placed in the IPA hardware managers directory
and registered in setup.cfg.
File location:
ironic_python_agent/hardware_managers/deb_oci_efi_lvm.py
setup.cfg entry point:
Add the following entry to the ironic_python_agent.hardware_managers
section in setup.cfg:
[entry_points]
ironic_python_agent.hardware_managers =
    deb_oci_efi_lvm = ironic_python_agent.hardware_managers.deb_oci_efi_lvm:DebOCIEFILVMHardwareManager
This registers the hardware manager as a plugin, allowing IPA to discover and load it at runtime.
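A quick way to check that the registration took effect is to list the entry points in that group from inside the ramdisk's Python environment. The sketch below uses only the standard library; list_hardware_manager_plugins is a hypothetical helper name, and the version check exists because importlib.metadata returned a plain dict before Python 3.10.

```python
from importlib.metadata import entry_points

HARDWARE_MANAGERS_GROUP = "ironic_python_agent.hardware_managers"


def list_hardware_manager_plugins(group=HARDWARE_MANAGERS_GROUP):
    """Return the sorted names of entry points registered in the group."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+: selectable entry point collection
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9: dict mapping group name to entry points
        selected = eps.get(group, [])
    return sorted(ep.name for ep in selected)


if __name__ == "__main__":
    print(list_hardware_manager_plugins())
```

If deb_oci_efi_lvm is missing from the printed list, the package metadata was most likely not regenerated after editing setup.cfg, and IPA needs to be reinstalled into the image.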
Source Code
The implementation is shown below in expandable sections. Full source: deb_oci_efi_lvm.py.
Note: The code below uses a custom run_command helper function instead of IPA’s built-in ironic_python_agent.utils.execute. This was a deliberate choice to minimize dependencies on IPA internals, avoiding the need to keep the hardware manager in constant sync with IPA changes. However, reusing IPA’s existing utilities is a valid alternative approach.
Imports and constants
Standard library and IPA imports, plus configuration constants for device paths, filesystem labels, and retry parameters.
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: 2025 s3rj1k
"""Debian/Ubuntu OCI EFI LVM deployment hardware manager.
This hardware manager deploys Debian-based OCI container images with:
- EFI boot partition
- LVM on root partition
- Optional RAID1 support for two-disk configurations
"""
import os
import platform
import re
import shutil
import stat as stat_module
import subprocess
import tempfile
import time
import yaml
from oslo_log import log
from ironic_python_agent import device_hints
from ironic_python_agent import hardware
LOG = log.getLogger(__name__)
# Default OCI image (can be overridden via node metadata 'oci_image')
DEFAULT_OCI_IMAGE = "ubuntu:24.04"
# Device/filesystem constants
RAID_DEVICE = "/dev/md0"
VG_NAME = "vg_root"
LV_NAME = "lv_root"
ROOT_FS_LABEL = "ROOTFS"
BOOT_FS_LABEL = "EFI"
BOOT_FS_LABEL2 = "EFI2"
DEVICE_PROBE_MAX_ATTEMPTS = 5
DEVICE_PROBE_DELAY = 5
DEVICE_WAIT_MAX_ATTEMPTS = 5
DEVICE_WAIT_DELAY = 5
run_command
Wrapper around subprocess.run with logging support.
def run_command(cmd, check=True, capture_output=True, timeout=300):
    """Run a shell command with logging.

    :param cmd: Command as list of strings
    :param check: Whether to raise on non-zero exit
    :param capture_output: Whether to capture stdout/stderr
    :param timeout: Command timeout in seconds
    :returns: CompletedProcess object
    :raises: subprocess.CalledProcessError on failure if check=True
    """
    LOG.debug("Running command: %s", " ".join(cmd))
    result = subprocess.run(
        cmd, check=check, capture_output=capture_output, text=True, timeout=timeout
    )
    if result.stdout:
        LOG.debug("stdout: %s", result.stdout)
    if result.stderr:
        LOG.debug("stderr: %s", result.stderr)
    return result
is_efi_system
Checks if the system booted in UEFI mode by testing for /sys/firmware/efi.
def is_efi_system():
    """Check if the system is booted in EFI mode.

    :returns: True if running under EFI, False otherwise
    """
    return os.path.isdir("/sys/firmware/efi")
probe_device
Runs partprobe and waits for device to appear in the kernel.
def probe_device(device):
    """Probe device until it is visible in the kernel.

    :param device: Device path (e.g., /dev/sda)
    :raises: RuntimeError if device doesn't appear after max attempts
    """
    for attempt in range(DEVICE_PROBE_MAX_ATTEMPTS):
        run_command(["partprobe", device], check=False)
        time.sleep(DEVICE_PROBE_DELAY)
        if os.path.exists(device):
            LOG.debug("Device %s visible after %d attempt(s)", device, attempt + 1)
            return
    raise RuntimeError(
        f"Device {device} not visible after {DEVICE_PROBE_MAX_ATTEMPTS} attempts"
    )
has_interactive_users
Checks for logged-in users via who command, used to pause deployment
for debugging via BMC console.
def has_interactive_users():
    """Check if there are any interactive users logged in.

    Uses 'who' command to check for logged-in users, which indicates
    someone has connected via BMC console for debugging.

    :returns: Boolean indicating if interactive users are logged in
    """
    try:
        result = run_command(["who"], check=True, timeout=5)
        # who returns empty output if no users are logged in
        users = result.stdout.strip()
        if users:
            LOG.debug("Interactive users detected: %s", users)
            return True
        return False
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError) as e:
        LOG.warning("Failed to check for interactive users: %s", e)
        return False
get_configdrive_data
Extracts configdrive dictionary from node’s instance_info.
def get_configdrive_data(node):
    """Extract configdrive data from node instance_info.

    :param node: Node dictionary containing instance_info
    :returns: Dictionary containing configdrive data
    :raises: ValueError if node is invalid or configdrive data is missing
    """
    if node is None:
        raise ValueError("Node cannot be None")
    if not isinstance(node, dict):
        raise ValueError("Node must be a dictionary")
    instance_info = node.get("instance_info", {})
    if not isinstance(instance_info, dict):
        raise ValueError("instance_info must be a dictionary")
    configdrive = instance_info.get("configdrive")
    if configdrive is None:
        raise ValueError("configdrive not found in instance_info")
    if not isinstance(configdrive, dict):
        raise ValueError("configdrive must be a dictionary")
    LOG.info("Extracted configdrive data: %s", configdrive)
    return configdrive
parse_prefixed_hint_string
Parses simplified hint format like serial=ABC123 or wwn=0x123456 into
IPA hint dictionary format. Supports RAID1 with space-separated values.
def parse_prefixed_hint_string(hint_string):
    """Parse a prefixed hint string into a hints dictionary.

    Supports simplified format for cloud-init/annotation use cases:
    - 'serial=ABC123' -> {'serial': 's== ABC123'}
    - 'wwn=0x123456' -> {'wwn': 's== 0x123456'}
    - 'serial=ABC123 DEF456' -> {'serial': 's== ABC123 DEF456'} (RAID1)
    - 'wwn=0x123 0x456' -> {'wwn': 's== 0x123 0x456'} (RAID1)

    :param hint_string: String with format 'hint_type=value1 [value2]'
    :returns: Dictionary containing root_device hints
    :raises: ValueError if format is invalid
    """
    if not hint_string or not isinstance(hint_string, str):
        raise ValueError("Hint string must be a non-empty string")
    hint_string = hint_string.strip()
    if "=" not in hint_string:
        raise ValueError(
            'Hint string must contain "=" separator. '
            'Expected format: "serial=VALUE" or "wwn=VALUE"'
        )
    # Split on first equals only
    parts = hint_string.split("=", 1)
    if len(parts) != 2:
        raise ValueError("Invalid hint string format")
    hint_type = parts[0].strip().lower()
    hint_values = parts[1].strip()
    if hint_type not in ("serial", "wwn"):
        raise ValueError(
            f'Unsupported hint type "{hint_type}". '
            'Only "serial" and "wwn" are supported.'
        )
    if not hint_values:
        raise ValueError(f"No value provided for {hint_type} hint")
    # Add s== operator prefix (string equality)
    hint_with_operator = f"s== {hint_values}"
    LOG.info(
        'Parsed prefixed hint string "%s" -> {"%s": "%s"}',
        hint_string,
        hint_type,
        hint_with_operator,
    )
    return {hint_type: hint_with_operator}
get_root_device_hints
Extracts root device hints from configdrive annotation or node’s
instance_info. Supports both simplified string format
(serial=ABC123) and standard dictionary format.
def get_root_device_hints(node, configdrive_data):
    """Extract root_device hints from node instance_info or annotation.

    Priority order:
    1. configdrive meta_data.root_device_hints (prefixed string format)
    2. node.instance_info.root_device (dict format with operators)

    :param node: Node dictionary containing instance_info
    :param configdrive_data: Configdrive dictionary
    :returns: Dictionary containing root_device hints
    :raises: ValueError if node is invalid or root_device not found anywhere
    """
    if node is None:
        raise ValueError("Node cannot be None")
    if not isinstance(node, dict):
        raise ValueError("Node must be a dictionary")
    instance_info = node.get("instance_info", {})
    if not isinstance(instance_info, dict):
        raise ValueError("instance_info must be a dictionary")
    # Check annotation first (via configdrive metadata)
    meta_data = configdrive_data.get("meta_data", {})
    annotation_hints = meta_data.get("root_device_hints")
    if annotation_hints is not None:
        # Annotations use prefixed string format only
        if not isinstance(annotation_hints, str):
            raise ValueError(
                "root_device_hints from annotation must be a string "
                'in format "serial=VALUE" or "wwn=VALUE"'
            )
        parsed_hints = parse_prefixed_hint_string(annotation_hints)
        LOG.info("Using root_device hints from annotation: %s", parsed_hints)
        return parsed_hints
    # Fall back to instance_info
    root_device = instance_info.get("root_device")
    if root_device is not None:
        if not isinstance(root_device, dict):
            raise ValueError("root_device must be a dictionary")
        LOG.info("Using root_device hints from instance_info: %s", root_device)
        return root_device
    # Neither source provided root_device hints
    raise ValueError("root_device hints not found in instance_info or annotation")
find_device_by_hints
Uses IPA’s device_hints module to find a block device by serial or WWN.
def find_device_by_hints(hints):
    """Find a single block device matching the given hints.

    :param hints: Dictionary containing device hints (serial or wwn)
    :returns: Device path (e.g., /dev/sda)
    :raises: ValueError if no device or multiple devices match
    """
    devices = hardware.list_all_block_devices()
    LOG.debug("list_all_block_devices returned type: %s", type(devices).__name__)
    LOG.info("Found %d block devices", len(devices))
    serialized_devs = [dev.serialize() for dev in devices]
    matched_raw = device_hints.find_devices_by_hints(serialized_devs, hints)
    matched = list(matched_raw)
    if not matched:
        raise ValueError(f"No device found matching hints: {hints}")
    if len(matched) > 1:
        device_names = [dev["name"] for dev in matched]
        raise ValueError(
            f"Multiple devices match hints: {device_names}. "
            f"Hints must match exactly one device."
        )
    return matched[0]["name"]
parse_hint_values
Parses hint strings, stripping operator prefixes and splitting multiple values for RAID1 configurations.
def parse_hint_values(hint):
    """Parse hint value, handling operator prefixes like 's=='.

    Returns list of values without the operator prefix.
    For RAID1: 's== SERIAL1 SERIAL2' -> ['SERIAL1', 'SERIAL2']
    For single: 's== SERIAL1' -> ['SERIAL1']
    For plain: 'SERIAL1 SERIAL2' -> ['SERIAL1', 'SERIAL2']

    :param hint: Hint string value (may include operator prefix)
    :returns: List of values without operator prefix
    """
    if not hint:
        return []
    parts = hint.split()
    # Check if first part is an operator (e.g., 's==', 'int', etc.)
    operators = ("s==", "s!=", "<in>", "<or>", "int", "float")
    if parts and parts[0] in operators:
        return parts[1:]  # Skip the operator
    return parts
resolve_root_devices
Resolves device paths from hints. Returns one device for single-disk or two devices for RAID1 configuration.
def resolve_root_devices(root_device_hints):
    """Resolve root device path(s) from hints.

    Only serial or wwn hints are supported. If the hint contains two
    space-separated values, both devices are resolved for RAID1 setup.

    :param root_device_hints: Dictionary containing root device hints
    :returns: Tuple of device paths - (primary,) for single device or
              (primary, secondary) for RAID1 configuration
    :raises: ValueError if device cannot be resolved or hints are invalid
    """
    if root_device_hints is None:
        raise ValueError("root_device_hints cannot be None")
    if not isinstance(root_device_hints, dict):
        raise ValueError("root_device_hints must be a dictionary")
    # Validate that only serial or wwn hints are present
    serial_hint = root_device_hints.get("serial")
    wwn_hint = root_device_hints.get("wwn")
    if not serial_hint and not wwn_hint:
        raise ValueError("root_device_hints must contain serial or wwn hint")
    # Check for unsupported hint types
    supported_hints = {"serial", "wwn"}
    provided_hints = set(root_device_hints.keys())
    unsupported = provided_hints - supported_hints
    if unsupported:
        raise ValueError(
            f"Unsupported root_device hints: {unsupported}. "
            f"Only serial and wwn are supported."
        )
    LOG.info("Resolving root devices from hints: %s", root_device_hints)
    # Parse hints - may contain one or two values (with optional operator)
    serial_values = parse_hint_values(serial_hint)
    wwn_values = parse_hint_values(wwn_hint)
    # Determine if this is a RAID1 configuration
    is_raid = len(serial_values) == 2 or len(wwn_values) == 2
    if is_raid:
        LOG.info("RAID1 configuration detected")
    # Resolve primary device
    primary_hints = {}
    if serial_values:
        primary_hints["serial"] = serial_values[0]
    if wwn_values:
        primary_hints["wwn"] = wwn_values[0]
    primary_device = find_device_by_hints(primary_hints)
    LOG.info("Resolved primary device: %s", primary_device)
    if not is_raid:
        return (primary_device,)
    # Resolve secondary device for RAID1
    secondary_hints = {}
    if len(serial_values) == 2:
        secondary_hints["serial"] = serial_values[1]
    if len(wwn_values) == 2:
        secondary_hints["wwn"] = wwn_values[1]
    secondary_device = find_device_by_hints(secondary_hints)
    LOG.info("Resolved secondary device: %s", secondary_device)
    return (primary_device, secondary_device)
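To see how a hint travels through these functions without needing real hardware, the following standalone sketch reproduces only the splitting logic. It is a simplified illustration: strip_operator and per_disk_hints are hypothetical names, and the actual device lookup through IPA is left out.

```python
OPERATORS = ("s==", "s!=", "<in>", "<or>", "int", "float")


def strip_operator(hint):
    """Split a hint value and drop a leading operator such as 's=='."""
    parts = (hint or "").split()
    return parts[1:] if parts and parts[0] in OPERATORS else parts


def per_disk_hints(root_device_hints):
    """Turn one hints dict into one dict per disk (two dicts for RAID1)."""
    serials = strip_operator(root_device_hints.get("serial"))
    wwns = strip_operator(root_device_hints.get("wwn"))
    if not serials and not wwns:
        raise ValueError("root_device_hints must contain serial or wwn hint")
    # Two values in either hint mean a RAID1 pair, otherwise a single disk
    disk_count = 2 if len(serials) == 2 or len(wwns) == 2 else 1
    disks = []
    for index in range(disk_count):
        hints = {}
        if index < len(serials):
            hints["serial"] = serials[index]
        if index < len(wwns):
            hints["wwn"] = wwns[index]
        disks.append(hints)
    return disks
```

Each resulting dictionary is what would be handed to the device lookup, one call per disk, so a hint has to identify exactly one device on its own.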
get_oci_image
Gets OCI image reference with priority: spec.image.url (with oci://
prefix) > configdrive annotation > default ubuntu:24.04.
def get_oci_image(node, configdrive_data):
    """Get OCI image from instance_info, metadata, or use default.

    Priority order:
    1. node.instance_info.image_source with oci:// prefix
    2. configdrive meta_data.oci_image (from annotation)
    3. DEFAULT_OCI_IMAGE

    :param node: Node dictionary containing instance_info
    :param configdrive_data: Configdrive dictionary
    :returns: OCI image reference string (without oci:// prefix)
    """
    # Check instance_info first
    instance_info = node.get("instance_info", {})
    image_source = (instance_info.get("image_source") or "").strip()
    if image_source.startswith("oci://"):
        oci_image = image_source.removeprefix("oci://").strip()
        if oci_image:
            LOG.info("Using OCI image from instance_info: %s", oci_image)
            return oci_image
        # An empty reference must not be returned; continue to the fallbacks
        LOG.warning(
            "Empty OCI image after stripping oci:// prefix, "
            "falling back to annotation/default"
        )
    # Fall back to annotation (via configdrive metadata)
    meta_data = configdrive_data.get("meta_data", {})
    annotation_image = (meta_data.get("oci_image") or "").strip()
    if annotation_image:
        LOG.info("Using OCI image from annotation: %s", annotation_image)
        return annotation_image
    # Fall back to default
    LOG.info("Using default OCI image: %s", DEFAULT_OCI_IMAGE)
    return DEFAULT_OCI_IMAGE
get_disk_wipe_mode
Determines disk cleaning behavior based on annotation or setup type. Returns
all to wipe all block devices (default for RAID1) or target to wipe only
specified disks (default for single disk).
def get_disk_wipe_mode(configdrive_data, is_raid):
    """Get disk wipe mode from configdrive or use default based on setup.

    Priority order:
    1. configdrive meta_data.disk_wipe_mode (from annotation)
    2. Default: "all" for RAID1, "target" for single disk

    :param configdrive_data: Configdrive dictionary
    :param is_raid: Boolean indicating if this is a RAID setup
    :returns: String "all" or "target"
    :raises: ValueError if disk_wipe_mode has invalid value
    """
    meta_data = configdrive_data.get("meta_data", {})
    wipe_mode = (meta_data.get("disk_wipe_mode") or "").strip().lower()
    if wipe_mode:
        if wipe_mode not in ("all", "target"):
            raise ValueError(
                f'Invalid disk_wipe_mode "{wipe_mode}". '
                'Valid values are: "all", "target"'
            )
        LOG.info("Using disk wipe mode from annotation: %s", wipe_mode)
        return wipe_mode
    # Use default based on setup type
    default_mode = "all" if is_raid else "target"
    LOG.info(
        "Using default disk wipe mode for %s setup: %s",
        "RAID1" if is_raid else "single disk",
        default_mode,
    )
    return default_mode
get_architecture_config
Returns architecture-specific settings for x86_64 or ARM64, including GRUB packages and UEFI target.
def get_architecture_config(oci_image):
    """Get architecture-specific configuration.

    :param oci_image: OCI image reference to use
    :returns: Dictionary with oci_image, oci_platform, uefi_target,
              and grub_packages
    :raises: RuntimeError if architecture is not supported
    """
    machine = platform.machine()
    if machine == "x86_64":
        return {
            "oci_image": oci_image,
            "oci_platform": "linux/amd64",
            "uefi_target": "x86_64-efi",
            "grub_packages": ["grub-efi-amd64", "grub-efi-amd64-signed", "shim-signed"],
        }
    elif machine == "aarch64":
        return {
            "oci_image": oci_image,
            "oci_platform": "linux/arm64",
            "uefi_target": "arm64-efi",
            "grub_packages": ["grub-efi-arm64", "grub-efi-arm64-bin"],
        }
    else:
        raise RuntimeError(f"Unsupported architecture: {machine}")
wait_for_device
Waits for a block device to become available with retries.
def wait_for_device(device):
    """Wait for a block device to become available.

    :param device: Device path (e.g., /dev/sda)
    :returns: True if device is available
    :raises: RuntimeError if device doesn't appear
    """
    for attempt in range(DEVICE_WAIT_MAX_ATTEMPTS):
        if os.path.exists(device):
            try:
                mode = os.stat(device).st_mode
                if stat_module.S_ISBLK(mode):
                    LOG.info("Device %s is available", device)
                    return True
            except OSError:
                pass
        LOG.debug(
            "Waiting for device %s (attempt %d/%d)",
            device,
            attempt + 1,
            DEVICE_WAIT_MAX_ATTEMPTS,
        )
        time.sleep(DEVICE_WAIT_DELAY)
    raise RuntimeError(f"Device {device} did not become available")
get_partition_path
Returns partition path, handling NVMe and MMC naming conventions.
def get_partition_path(device, partition_number):
    """Get the partition path for a device.

    :param device: Base device path (e.g., /dev/sda)
    :param partition_number: Partition number
    :returns: Partition path (e.g., /dev/sda1 or /dev/nvme0n1p1)
    """
    if re.match(r".*/nvme\d+n\d+$", device) or re.match(r".*/mmcblk\d+$", device):
        return f"{device}p{partition_number}"
    return f"{device}{partition_number}"
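As a quick illustration of the naming rule, the following sketch applies the same two regular expressions to a few example device paths. The function name `partition_path` is a stand-in chosen for this example.

```python
import re


def partition_path(device, number):
    """NVMe and MMC device names end in a digit, so they need a 'p' separator."""
    if re.match(r".*/nvme\d+n\d+$", device) or re.match(r".*/mmcblk\d+$", device):
        return f"{device}p{number}"
    return f"{device}{number}"


for dev in ("/dev/sda", "/dev/nvme0n1", "/dev/mmcblk0"):
    print(dev, "->", partition_path(dev, 1))
# /dev/sda -> /dev/sda1
# /dev/nvme0n1 -> /dev/nvme0n1p1
# /dev/mmcblk0 -> /dev/mmcblk0p1
```

Other kernel devices whose names end in a digit (for example /dev/md0 or /dev/loop0) also use the `p` separator for their partitions. The two patterns above do not cover those, which is acceptable here because only whole disks are passed in.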
clean_device
Removes existing partitions, RAID arrays, LVM structures, and wipes the device.
def clean_device(device):
    """Clean a device of existing partitions, RAID, and LVM.

    :param device: Device path to clean
    """
    LOG.info("Cleaning device: %s", device)
    # Stop any RAID arrays using this device
    try:
        result = run_command(["lsblk", "-nlo", "NAME,TYPE", device], check=False)
        for line in result.stdout.strip().split("\n"):
            parts = line.split()
            if len(parts) >= 2 and parts[1] in (
                "raid1",
                "raid0",
                "raid5",
                "raid6",
                "raid10",
            ):
                raid_dev = f"/dev/{parts[0]}"
                run_command(["mdadm", "--stop", raid_dev], check=False)
    except Exception:
        pass
    # Remove LVM if present (check device and all its partitions)
    try:
        # Get all block devices (device + partitions)
        result = run_command(["lsblk", "-nlo", "NAME", device], check=False)
        all_devs = []
        for line in result.stdout.strip().split("\n"):
            name = line.strip()
            if name:
                all_devs.append(f"/dev/{name}")
        # Find all VGs that use any of these devices
        vgs_to_remove = set()
        for dev in all_devs:
            result = run_command(["pvs", dev], check=False)
            if result.returncode == 0:
                vg_result = run_command(
                    ["pvs", "--noheadings", "-o", "vg_name", dev], check=False
                )
                vg_name = vg_result.stdout.strip()
                if vg_name:
                    vgs_to_remove.add(vg_name)
        # Deactivate, remove all LVs and VGs
        for vg_name in vgs_to_remove:
            # Deactivate all LVs in this VG
            run_command(["lvchange", "-an", vg_name], check=False)
            lv_result = run_command(
                ["lvs", "--noheadings", "-o", "lv_path", vg_name], check=False
            )
            for lv_path in lv_result.stdout.strip().split("\n"):
                lv_path = lv_path.strip()
                if lv_path:
                    # Try dmsetup remove for stubborn LVs
                    dm_name = lv_path.replace("/dev/", "").replace("/", "-")
                    run_command(
                        ["dmsetup", "remove", "--retry", "-f", dm_name], check=False
                    )
                    run_command(["lvremove", "-ff", lv_path], check=False)
            run_command(["vgremove", "-ff", vg_name], check=False)
        # Remove PVs from all devices
        for dev in all_devs:
            run_command(["pvremove", "-ff", "-y", dev], check=False)
    except Exception:
        pass
    # Zero RAID superblocks
    run_command(["mdadm", "--zero-superblock", "--force", device], check=False)
    # Zero superblocks on partitions
    try:
        result = run_command(["lsblk", "-nlo", "NAME", device], check=False)
        base_name = os.path.basename(device)
        for line in result.stdout.strip().split("\n"):
            name = line.strip()
            if name and name != base_name:
                part_dev = f"/dev/{name}"
                run_command(
                    ["mdadm", "--zero-superblock", "--force", part_dev], check=False
                )
                run_command(["wipefs", "--all", "--force", part_dev], check=False)
    except Exception:
        pass
    # Wipe device
    run_command(["wipefs", "--all", "--force", device], check=False)
    run_command(["sgdisk", "--zap-all", device], check=False)
    # Sync filesystem buffers and wait for udev to settle
    run_command(["sync"], check=False)
    run_command(["udevadm", "settle"], check=False)
    # Probe until device is visible again
    probe_device(device)
    LOG.info("Device %s cleaned", device)
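The RAID detection at the top of clean_device relies on the two-column output of `lsblk -nlo NAME,TYPE`. A minimal sketch of that parsing step, run against a made-up sample instead of a real disk, could look like this. The sample text and the helper name `find_raid_devices` are assumptions for illustration only.

```python
RAID_TYPES = {"raid0", "raid1", "raid5", "raid6", "raid10"}

# Hypothetical output of: lsblk -nlo NAME,TYPE /dev/sda
SAMPLE_LSBLK = """\
sda      disk
sda1     part
sda2     part
md0      raid1
vg-root  lvm
"""


def find_raid_devices(lsblk_output):
    """Return /dev paths of md arrays found in 'lsblk -nlo NAME,TYPE' output."""
    found = []
    for line in lsblk_output.strip().split("\n"):
        parts = line.split()
        if len(parts) >= 2 and parts[1] in RAID_TYPES:
            path = f"/dev/{parts[0]}"
            if path not in found:
                found.append(path)
    return found


print(find_raid_devices(SAMPLE_LSBLK))  # ['/dev/md0']
```

Each path returned this way is what the function above passes to `mdadm --stop` before it zeroes the superblocks.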
clean_all_devices
Cleans all block devices on the system to remove stray RAID/LVM metadata.
Useful when disk_wipe_mode is set to all (default for RAID1 setups).
def clean_all_devices():
    """Clean all block devices to remove stray RAID/LVM metadata.

    Useful for nodes that may have multiple disks with old metadata
    from previous deployments.
    """
    LOG.info("Cleaning all block devices on the system")
    try:
        devices = hardware.list_all_block_devices()
        LOG.info("Found %d block devices to clean", len(devices))
        for device_obj in devices:
            device = device_obj.name
            try:
                clean_device(device)
            except Exception as e:
                LOG.warning("Error cleaning device %s: %s", device, e)
        LOG.info("Finished cleaning all block devices")
    except Exception as e:
        LOG.error("Error listing block devices: %s", e)
clean_partition_signatures
Cleans RAID, LVM, and filesystem signatures from a partition without
removing the partition itself. Used internally by partition_disk() to
clean partitions before creating RAID arrays, ensuring no stray metadata
causes issues.
def clean_partition_signatures(partition):
    """Clean RAID, LVM, and filesystem signatures from a partition.

    Does not remove the partition itself, only metadata/signatures.

    :param partition: Partition path to clean
    """
    LOG.debug("Cleaning signatures from partition: %s", partition)
    run_command(["pvremove", "-ff", "-y", partition], check=False)
    run_command(["wipefs", "--all", "--force", partition], check=False)
    run_command(["mdadm", "--zero-superblock", "--force", partition], check=False)
partition_disk
Creates GPT partition table with EFI and LVM partitions. Sets up RAID1
array if second device is provided. Calls clean_partition_signatures()
before RAID creation to ensure clean metadata.
def partition_disk(
    device, vg_name, lv_name, second_device=None, raid_device=RAID_DEVICE, homehost=None
):
    """Partition disk with EFI and LVM (optionally on RAID).

    :param device: Primary device path
    :param vg_name: Volume group name
    :param lv_name: Logical volume name
    :param second_device: Optional second device for RAID
    :param raid_device: RAID device path
    :param homehost: Hostname for RAID array
    :returns: Tuple of (is_raid, pv_device)
    """
    LOG.info("Partitioning disk: %s", device)
    wait_for_device(device)
    # Ensure udev has finished processing before partitioning
    run_command(["udevadm", "settle"], check=False)
    # Create GPT partition table
    run_command(["parted", "-s", device, "mklabel", "gpt"])
    # Create EFI partition (2GB)
    run_command(
        [
            "parted",
            "-s",
            "-a",
            "optimal",
            device,
            "mkpart",
            "primary",
            "fat32",
            "2MiB",
            "2050MiB",
        ]
    )
    run_command(["parted", "-s", device, "set", "1", "esp", "on"])
    # Create data partition (rest of disk)
    run_command(
        ["parted", "-s", "-a", "optimal", device, "mkpart", "primary", "2050MiB", "99%"]
    )
    is_raid = second_device is not None
    if is_raid:
        run_command(["parted", "-s", device, "set", "2", "raid", "on"])
    else:
        run_command(["parted", "-s", device, "set", "2", "lvm", "on"])
    # Wipe new partitions
    try:
        result = run_command(["lsblk", "-nlo", "NAME", device], check=False)
        base_name = os.path.basename(device)
        for line in result.stdout.strip().split("\n"):
            name = line.strip()
            if name and name != base_name:
                run_command(["wipefs", "-a", f"/dev/{name}"], check=False)
    except Exception:
        pass
    data_partition = get_partition_path(device, 2)
    pv_device = data_partition
    if is_raid:
        probe_device(device)
        probe_device(second_device)
        # Clone partition table to second device
        sfdisk_result = run_command(["sfdisk", "-d", device])
        LOG.debug("Cloning partition table to %s", second_device)
        sfdisk_proc = subprocess.run(
            ["sfdisk", "--force", second_device],
            input=sfdisk_result.stdout,
            capture_output=True,
            text=True,
            check=False,
        )
        if sfdisk_proc.stdout:
            LOG.debug("sfdisk stdout: %s", sfdisk_proc.stdout)
        if sfdisk_proc.stderr:
            LOG.debug("sfdisk stderr: %s", sfdisk_proc.stderr)
        if sfdisk_proc.returncode != 0:
            raise subprocess.CalledProcessError(
                sfdisk_proc.returncode,
                ["sfdisk", "--force", second_device],
                sfdisk_proc.stdout,
                sfdisk_proc.stderr,
            )
        # Randomize partition GUIDs on second device
        run_command(["sgdisk", "--partition-guid=1:R", second_device])
        run_command(["sgdisk", "--partition-guid=2:R", second_device])
        second_data_partition = get_partition_path(second_device, 2)
        probe_device(second_data_partition)
        if not homehost:
            raise RuntimeError("homehost required for RAID configuration")
        # Clean new partitions before creating RAID
        LOG.info("Cleaning partition signatures before RAID creation")
        clean_partition_signatures(data_partition)
        clean_partition_signatures(second_data_partition)
        # Create RAID array
        run_command(
            [
                "mdadm",
                "--create",
                raid_device,
                "--level=1",
                "--raid-devices=2",
                "--metadata=1.2",
                "--name=root",
                "--bitmap=internal",
                f"--homehost={homehost}",
                "--force",
                "--run",
                "--assume-clean",
                data_partition,
                second_data_partition,
            ]
        )
        # Sync filesystem buffers before continuing
        run_command(["sync"], check=False)
        time.sleep(5)
        pv_device = raid_device
    else:
        probe_device(device)
    # Create LVM
    run_command(["pvcreate", "-ff", "-y", "--zero", "y", pv_device])
    run_command(["vgcreate", "-y", vg_name, pv_device])
    run_command(["lvcreate", "-y", "-W", "y", "-n", lv_name, "-l", "100%FREE", vg_name])
    LOG.info("Disk partitioned successfully, is_raid=%s", is_raid)
    return is_raid, pv_device
create_filesystems
Creates FAT32 filesystem on EFI partition and ext4 on root LV.
def create_filesystems(
    efi_partition,
    root_lv_path,
    boot_label=BOOT_FS_LABEL,
    root_label=ROOT_FS_LABEL,
    second_efi_partition=None,
    boot_label2=BOOT_FS_LABEL2,
):
    """Create filesystems on partitions.

    :param efi_partition: EFI partition path
    :param root_lv_path: Root LV path
    :param boot_label: EFI partition label
    :param root_label: Root partition label
    :param second_efi_partition: Second EFI partition for RAID
    :param boot_label2: Second EFI partition label
    """
    LOG.info("Creating filesystems")
    run_command(["mkfs.vfat", "-F", "32", "-n", boot_label, efi_partition])
    if second_efi_partition:
        run_command(
            ["mkfs.vfat", "-F", "32", "-n", boot_label2, second_efi_partition],
            check=False,
        )
    run_command(["mkfs.ext4", "-F", "-L", root_label, root_lv_path])
    LOG.info("Filesystems created")
setup_chroot
Mounts /proc, /sys, /dev, and /dev/pts, and copies the host's /etc/resolv.conf into the chroot so that DNS resolution works there.
def setup_chroot(chroot_dir):
    """Set up chroot environment with necessary mounts.

    :param chroot_dir: Path to chroot directory
    """
    LOG.info("Setting up chroot: %s", chroot_dir)
    run_command(["mount", "-t", "proc", "proc", f"{chroot_dir}/proc"])
    run_command(["mount", "-t", "sysfs", "sys", f"{chroot_dir}/sys"])
    run_command(["mount", "--bind", "/dev", f"{chroot_dir}/dev"])
    run_command(["mount", "--bind", "/dev/pts", f"{chroot_dir}/dev/pts"])
    os.makedirs(f"{chroot_dir}/run", exist_ok=True)
    # Set up resolv.conf
    resolv_link = os.path.join(chroot_dir, "etc", "resolv.conf")
    if os.path.islink(resolv_link):
        target = os.readlink(resolv_link)
        if target.startswith("/"):
            target_path = os.path.join(chroot_dir, target.lstrip("/"))
        else:
            target_path = os.path.join(chroot_dir, "etc", target)
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        shutil.copy("/etc/resolv.conf", target_path)
    else:
        shutil.copy("/etc/resolv.conf", resolv_link)
    LOG.info("Chroot setup complete")
teardown_chroot
Unmounts chroot bind mounts in reverse order.
def teardown_chroot(chroot_dir):
    """Tear down chroot environment.

    :param chroot_dir: Path to chroot directory
    """
    LOG.info("Tearing down chroot: %s", chroot_dir)
    mounts = [
        f"{chroot_dir}/run",
        f"{chroot_dir}/dev/pts",
        f"{chroot_dir}/dev",
        f"{chroot_dir}/sys",
        f"{chroot_dir}/proc",
    ]
    for mount in mounts:
        try:
            result = run_command(["mountpoint", "-q", mount], check=False)
            if result.returncode == 0:
                run_command(["umount", "-l", mount])
        except Exception as e:
            LOG.warning("Error unmounting %s: %s", mount, e)
    LOG.info("Chroot teardown complete")
extract_oci_image
Extracts OCI image filesystem using crane export piped to tar.
def extract_oci_image(image, platform, dest_dir):
    """Extract OCI image rootfs using crane.

    :param image: OCI image reference (e.g., ubuntu:24.04)
    :param platform: Target platform (e.g., linux/amd64)
    :param dest_dir: Destination directory for rootfs
    """
    LOG.info("Extracting OCI image %s (%s) to %s", image, platform, dest_dir)
    # Use crane export to extract the image filesystem
    # crane export outputs a tar stream, pipe to tar for extraction
    crane_cmd = ["crane", "export", "--platform", platform, image, "-"]
    tar_cmd = ["tar", "-xf", "-", "-C", dest_dir]
    LOG.info("Running: %s | %s", " ".join(crane_cmd), " ".join(tar_cmd))
    # Create pipeline: crane export | tar extract
    crane_proc = subprocess.Popen(
        crane_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    tar_proc = subprocess.Popen(
        tar_cmd, stdin=crane_proc.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    # Allow crane to receive SIGPIPE if tar exits
    crane_proc.stdout.close()
    # Wait for tar to complete
    tar_stdout, tar_stderr = tar_proc.communicate(timeout=1800)
    # Wait for crane to complete
    crane_proc.wait()
    if crane_proc.returncode != 0:
        _, crane_stderr = crane_proc.communicate()
        raise RuntimeError(
            f"crane export failed with code {crane_proc.returncode}: "
            f"{crane_stderr.decode() if crane_stderr else 'unknown error'}"
        )
    if tar_proc.returncode != 0:
        raise RuntimeError(
            f"tar extract failed with code {tar_proc.returncode}: "
            f"{tar_stderr.decode() if tar_stderr else 'unknown error'}"
        )
    if tar_stderr:
        LOG.debug("tar stderr: %s", tar_stderr.decode())
    LOG.info("OCI image extraction complete")
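The two-process pipeline used above is a general pattern, and it can be tried without crane or a registry. The sketch below substitutes two small Python one-liners for `crane export` and `tar`; the commands and the helper name `run_pipeline` are placeholders chosen for this example. Unlike the function above, it leaves the producer's stderr attached to the parent so that an unread pipe cannot fill up.

```python
import subprocess
import sys

# Stand-ins for "crane export ... -" and "tar -xf - -C dest": the producer
# writes bytes to stdout and the consumer reads them from stdin.
producer_cmd = [sys.executable, "-c", "import sys; sys.stdout.write('rootfs-bytes')"]
consumer_cmd = [
    sys.executable,
    "-c",
    "import sys; sys.stdout.write(sys.stdin.read().upper())",
]


def run_pipeline(producer, consumer, timeout=60):
    """Run 'producer | consumer' and return the consumer's stdout as text."""
    prod = subprocess.Popen(producer, stdout=subprocess.PIPE)
    cons = subprocess.Popen(consumer, stdin=prod.stdout, stdout=subprocess.PIPE)
    # Close our copy so the producer gets SIGPIPE if the consumer exits early.
    prod.stdout.close()
    out, _ = cons.communicate(timeout=timeout)
    prod.wait()
    if prod.returncode != 0:
        raise RuntimeError(f"producer failed with code {prod.returncode}")
    if cons.returncode != 0:
        raise RuntimeError(f"consumer failed with code {cons.returncode}")
    return out.decode()


print(run_pipeline(producer_cmd, consumer_cmd))  # ROOTFS-BYTES
```

Checking both return codes matters because a failing producer can still leave the consumer exiting with status 0 after reading a truncated stream.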
install_packages
Installs cloud-init, GRUB, kernel, and other required packages via apt.
def install_packages(chroot_dir, grub_packages):
    """Install required packages in chroot.

    :param chroot_dir: Path to chroot directory
    :param grub_packages: List of GRUB packages to install
    """
    LOG.info("Installing packages in chroot")
    # Remove snap packages if present
    snap_path = os.path.join(chroot_dir, "usr", "bin", "snap")
    if os.path.exists(snap_path):
        snap_patterns = [
            "!/^Name|^core|^snapd|^lxd/",
            "/^lxd/",
            "/^core/",
            "/^snapd/",
            "!/^Name/",
        ]
        for pattern in snap_patterns:
            try:
                run_command(
                    [
                        "chroot",
                        chroot_dir,
                        "sh",
                        "-c",
                        f"snap list 2>/dev/null | awk '{pattern} {{print $1}}' | "
                        "xargs -rI{} snap remove --purge {}",
                    ],
                    check=False,
                )
            except Exception:
                pass
    # Update package lists
    run_command(["chroot", chroot_dir, "apt-get", "update"])
    # Remove unwanted packages one by one, ignoring errors for missing packages
    for pkg in ["lxd", "lxd-agent-loader", "lxd-installer", "snapd"]:
        run_command(
            ["chroot", chroot_dir, "apt-get", "--purge", "remove", "-y", pkg],
            check=False,
        )
    # Install required packages
    packages = [
        "cloud-init",
        "curl",
        "efibootmgr",
        "grub-common",
        "initramfs-tools",
        "lvm2",
        "mdadm",
        "netplan.io",
        "rsync",
        "sudo",
        "systemd-sysv",
    ] + grub_packages
    run_command(["chroot", chroot_dir, "apt-get", "install", "-y"] + packages)
    # Install kernel based on distro
    try:
        os_release_path = os.path.join(chroot_dir, "etc", "os-release")
        distro_id = None
        version_id = None
        if os.path.exists(os_release_path):
            with open(os_release_path, "r", encoding="utf-8") as f:
                for line in f:
                    if line.startswith("ID="):
                        distro_id = line.split("=")[1].strip().strip('"')
                    elif line.startswith("VERSION_ID="):
                        version_id = line.split("=")[1].strip().strip('"')
        if distro_id == "ubuntu" and version_id:
            # Ubuntu: install HWE kernel
            run_command(
                [
                    "chroot",
                    chroot_dir,
                    "apt-get",
                    "install",
                    "-y",
                    f"linux-generic-hwe-{version_id}",
                ],
                check=False,
            )
        elif distro_id == "debian":
            # Debian: install standard kernel metapackage
            arch = platform.machine()
            if arch == "x86_64":
                kernel_pkg = "linux-image-amd64"
            elif arch == "aarch64":
                kernel_pkg = "linux-image-arm64"
            else:
                kernel_pkg = "linux-image-" + arch
            run_command(
                ["chroot", chroot_dir, "apt-get", "install", "-y", kernel_pkg],
                check=False,
            )
    except Exception as e:
        LOG.warning("Error installing kernel: %s", e)
    # Clean up removed packages
    try:
        result = run_command(["chroot", chroot_dir, "dpkg", "-l"], check=False)
        rc_packages = []
        for line in result.stdout.split("\n"):
            if line.startswith("rc "):
                parts = line.split()
                if len(parts) >= 2:
                    rc_packages.append(parts[1])
        if rc_packages:
            run_command(
                ["chroot", chroot_dir, "apt-get", "purge", "-y"] + rc_packages,
                check=False,
            )
    except Exception:
        pass
    run_command(
        ["chroot", chroot_dir, "apt-get", "autoremove", "--purge", "-y"], check=False
    )
    LOG.info("Package installation complete")
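The kernel selection step can be isolated into two pure functions, which makes the distro logic easy to check without a chroot. This is a sketch under the assumption that only the `ID` and `VERSION_ID` fields matter; `parse_os_release`, `kernel_package`, and the sample text are illustrative names and data, not part of the hardware manager.

```python
def parse_os_release(text):
    """Parse os-release style KEY=value lines into a dict."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key] = value.strip().strip('"')
    return values


def kernel_package(os_release, machine):
    """Pick a kernel metapackage using the same rules as the step above."""
    distro = os_release.get("ID")
    version = os_release.get("VERSION_ID")
    if distro == "ubuntu" and version:
        return f"linux-generic-hwe-{version}"
    if distro == "debian":
        arch = {"x86_64": "amd64", "aarch64": "arm64"}.get(machine, machine)
        return f"linux-image-{arch}"
    return None  # unknown distro: no kernel package is chosen


SAMPLE_OS_RELEASE = 'ID=ubuntu\nVERSION_ID="24.04"\nPRETTY_NAME="Ubuntu 24.04 LTS"\n'
print(kernel_package(parse_os_release(SAMPLE_OS_RELEASE), "x86_64"))
# linux-generic-hwe-24.04
```

Using `partition("=")` instead of `split("=")[1]` keeps values that themselves contain an equals sign intact.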
write_hosts_file
Writes /etc/hosts with localhost and IPv6 entries.
def write_hosts_file(mount_point, hostname):
    """Write /etc/hosts file with proper entries.

    :param mount_point: Root mount point
    :param hostname: System hostname
    """
    LOG.info("Writing /etc/hosts file")
    hosts_path = os.path.join(mount_point, "etc", "hosts")
    with open(hosts_path, "w", encoding="utf-8") as f:
        f.write(f"127.0.0.1\tlocalhost\t{hostname}\n")
        f.write("\n")
        f.write("# The following lines are desirable for IPv6 capable hosts\n")
        f.write("::1\tip6-localhost\tip6-loopback\n")
        f.write("fe00::0\tip6-localnet\n")
        f.write("ff00::0\tip6-mcastprefix\n")
        f.write("ff02::1\tip6-allnodes\n")
        f.write("ff02::2\tip6-allrouters\n")
        f.write("ff02::3\tip6-allhosts\n")
    LOG.info("/etc/hosts written with hostname: %s", hostname)
configure_cloud_init
Configures cloud-init NoCloud datasource with metadata, userdata, and network config from configdrive.
def configure_cloud_init(mount_point, configdrive_data):
    """Configure cloud-init with configdrive data.

    :param mount_point: Root mount point
    :param configdrive_data: Configdrive dictionary
    """
    LOG.info("Configuring cloud-init")
    cloud_init_cfg_dir = os.path.join(mount_point, "etc", "cloud", "cloud.cfg.d")
    os.makedirs(cloud_init_cfg_dir, exist_ok=True)
    nocloud_seed_dir = os.path.join(
        mount_point, "var", "lib", "cloud", "seed", "nocloud-net"
    )
    os.makedirs(nocloud_seed_dir, exist_ok=True)
    # Write datasource config
    datasource_cfg = os.path.join(cloud_init_cfg_dir, "99-nocloud-seed.cfg")
    with open(datasource_cfg, "w", encoding="utf-8") as f:
        f.write(
            """datasource_list: [ NoCloud, None ]
datasource:
  NoCloud:
    seedfrom: file:///var/lib/cloud/seed/nocloud-net/
"""
        )
    # Write meta-data
    meta_data = configdrive_data.get("meta_data", {})
    meta_data_path = os.path.join(nocloud_seed_dir, "meta-data")
    with open(meta_data_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(meta_data, f, default_flow_style=False)
    # Write user-data
    user_data = configdrive_data.get("user_data", "")
    user_data_path = os.path.join(nocloud_seed_dir, "user-data")
    with open(user_data_path, "w", encoding="utf-8") as f:
        f.write(user_data if user_data else "")
    # Write network-config if present
    network_data = configdrive_data.get("network_data", {})
    if network_data:
        network_config_path = os.path.join(nocloud_seed_dir, "network-config")
        with open(network_config_path, "w", encoding="utf-8") as f:
            yaml.safe_dump(network_data, f, default_flow_style=False)
    # Set permissions
    for filename in os.listdir(nocloud_seed_dir):
        filepath = os.path.join(nocloud_seed_dir, filename)
        os.chmod(filepath, 0o600)
    LOG.info("Cloud-init configuration complete")
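To see what the seed directory ends up containing, the following sketch writes a minimal seed into a temporary directory. It is an illustration under stated assumptions: `write_seed` is a hypothetical helper, the metadata value is made up, and JSON is used for meta-data only to avoid a PyYAML dependency in the example (the function above uses `yaml.safe_dump`).

```python
import json
import os
import stat
import tempfile


def write_seed(seed_dir, meta_data, user_data):
    """Write a minimal NoCloud seed directory and restrict file permissions."""
    os.makedirs(seed_dir, exist_ok=True)
    files = {
        # JSON is used here only to keep the sketch dependency-free.
        "meta-data": json.dumps(meta_data, indent=2) + "\n",
        "user-data": user_data or "",
    }
    for name, content in files.items():
        path = os.path.join(seed_dir, name)
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
        # Seed files may carry credentials, so keep them owner-only.
        os.chmod(path, 0o600)
    return sorted(os.listdir(seed_dir))


with tempfile.TemporaryDirectory() as root:
    seed = os.path.join(root, "var", "lib", "cloud", "seed", "nocloud-net")
    print(write_seed(seed, {"metal3-name": "node-0"}, "#cloud-config\n"))
    mode = stat.S_IMODE(os.stat(os.path.join(seed, "user-data")).st_mode)
    print(oct(mode))
```

On a Linux host this lists `meta-data` and `user-data` and reports mode `0o600` for the user-data file.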
write_fstab
Writes /etc/fstab with root and EFI entries, plus second EFI for RAID.
def write_fstab(mount_point, root_label, boot_label, is_raid, boot_label2=None):
    """Write /etc/fstab.

    :param mount_point: Root mount point
    :param root_label: Root partition label
    :param boot_label: EFI partition label
    :param is_raid: Whether RAID is configured
    :param boot_label2: Second EFI partition label
    """
    LOG.info("Writing fstab")
    fstab_path = os.path.join(mount_point, "etc", "fstab")
    with open(fstab_path, "w", encoding="utf-8") as f:
        f.write(f"LABEL={root_label}\t/\text4\terrors=remount-ro\t0\t1\n")
        f.write(f"LABEL={boot_label}\t/boot/efi\tvfat\tumask=0077,nofail\t0\t1\n")
        if is_raid and boot_label2:
            f.write(
                f"LABEL={boot_label2}\t/boot/efi2\tvfat\t"
                f"umask=0077,nofail,noauto\t0\t2\n"
            )
    LOG.info("fstab written")
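For reference, the resulting file can be previewed with a small rendering helper. The label values passed below are placeholders rather than the constants the hardware manager uses, and `render_fstab` is a name introduced only for this sketch.

```python
def render_fstab(root_label, boot_label, is_raid, boot_label2=None):
    """Return fstab content equivalent to what write_fstab() writes."""
    lines = [
        f"LABEL={root_label}\t/\text4\terrors=remount-ro\t0\t1",
        f"LABEL={boot_label}\t/boot/efi\tvfat\tumask=0077,nofail\t0\t1",
    ]
    if is_raid and boot_label2:
        # The second ESP is listed but not mounted at boot (noauto);
        # the GRUB hook mounts it on demand when syncing.
        lines.append(
            f"LABEL={boot_label2}\t/boot/efi2\tvfat\tumask=0077,nofail,noauto\t0\t2"
        )
    return "\n".join(lines) + "\n"


# Placeholder labels, chosen for illustration only.
print(render_fstab("ROOTFS", "EFI", is_raid=True, boot_label2="EFI2"), end="")
```

Mounting by filesystem label keeps the entries valid regardless of whether the root volume sits on a single disk or on the RAID1 array.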
write_mdadm_conf
Writes /etc/mdadm/mdadm.conf with RAID array configuration.
def write_mdadm_conf(mount_point):
    """Write mdadm configuration.

    :param mount_point: Root mount point
    """
    LOG.info("Writing mdadm.conf")
    mdadm_dir = os.path.join(mount_point, "etc", "mdadm")
    os.makedirs(mdadm_dir, exist_ok=True)
    mdadm_conf_path = os.path.join(mdadm_dir, "mdadm.conf")
    with open(mdadm_conf_path, "w", encoding="utf-8") as f:
        f.write("HOMEHOST <system>\n")
        f.write("MAILADDR root\n")
    # Append ARRAY lines from mdadm --detail --scan
    result = run_command(["mdadm", "--detail", "--scan", "--verbose"])
    with open(mdadm_conf_path, "a", encoding="utf-8") as f:
        for line in result.stdout.split("\n"):
            if line.startswith("ARRAY"):
                f.write(line + "\n")
    LOG.info("mdadm.conf written")
configure_initramfs
Configures initramfs-tools to include LVM and RAID modules.
def configure_initramfs(chroot_dir, is_raid):
    """Configure initramfs-tools for LVM and optionally RAID.

    This ensures initramfs includes LVM modules.

    :param chroot_dir: Chroot directory path
    :param is_raid: Whether RAID is configured
    """
    LOG.info("Configuring initramfs-tools")
    initramfs_conf_dir = os.path.join(chroot_dir, "etc", "initramfs-tools", "conf.d")
    os.makedirs(initramfs_conf_dir, exist_ok=True)
    # Disable resume (no swap partition)
    resume_conf = os.path.join(initramfs_conf_dir, "resume")
    with open(resume_conf, "w", encoding="utf-8") as f:
        f.write("RESUME=none\n")
    # Force LVM inclusion in initramfs
    # This is needed because during chroot, LVM volumes may not be
    # detected by the initramfs-tools hooks
    initramfs_conf = os.path.join(
        chroot_dir, "etc", "initramfs-tools", "initramfs.conf"
    )
    if os.path.exists(initramfs_conf):
        with open(initramfs_conf, "r", encoding="utf-8") as f:
            content = f.read()
        # Set MODULES to "most" to include storage drivers
        content = re.sub(r"^MODULES=.*$", "MODULES=most", content, flags=re.MULTILINE)
        with open(initramfs_conf, "w", encoding="utf-8") as f:
            f.write(content)
    # Add LVM modules explicitly
    modules_file = os.path.join(chroot_dir, "etc", "initramfs-tools", "modules")
    lvm_modules = ["dm-mod", "dm-snapshot", "dm-mirror", "dm-zero"]
    if is_raid:
        lvm_modules.extend(["raid1", "md-mod"])
    existing_modules = set()
    if os.path.exists(modules_file):
        with open(modules_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    existing_modules.add(line)
    with open(modules_file, "a", encoding="utf-8") as f:
        for module in lvm_modules:
            if module not in existing_modules:
                f.write(f"{module}\n")
    LOG.info("initramfs-tools configuration complete")
setup_grub_defaults
Configures /etc/default/grub with root device and RAID options.
def setup_grub_defaults(chroot_dir, root_label, is_raid):
    """Configure GRUB defaults.

    :param chroot_dir: Chroot directory path
    :param root_label: Root partition label
    :param is_raid: Whether RAID is configured
    """
    LOG.info("Setting up GRUB defaults")
    grub_default = os.path.join(chroot_dir, "etc", "default", "grub")
    with open(grub_default, "r", encoding="utf-8") as f:
        content = f.read()
    # Build GRUB_CMDLINE_LINUX
    cmdline = f"root=LABEL={root_label}"
    if is_raid:
        cmdline += " rd.auto=1"
    # Update GRUB_CMDLINE_LINUX
    content = re.sub(
        r"^#*\s*GRUB_CMDLINE_LINUX=.*$",
        f'GRUB_CMDLINE_LINUX="{cmdline}"',
        content,
        flags=re.MULTILINE,
    )
    # Update GRUB_DISABLE_LINUX_UUID
    if "GRUB_DISABLE_LINUX_UUID=" in content:
        content = re.sub(
            r"^#*\s*GRUB_DISABLE_LINUX_UUID=.*$",
            "GRUB_DISABLE_LINUX_UUID=true",
            content,
            flags=re.MULTILINE,
        )
    else:
        content += "\nGRUB_DISABLE_LINUX_UUID=true\n"
    # Add rootdelay for RAID
    if is_raid:
        if "GRUB_CMDLINE_LINUX_DEFAULT=" in content:
            if "rootdelay=" not in content:
                content = re.sub(
                    r'^(#*\s*GRUB_CMDLINE_LINUX_DEFAULT="[^"]*)',
                    r"\1 rootdelay=10",
                    content,
                    flags=re.MULTILINE,
                )
        else:
            content += '\nGRUB_CMDLINE_LINUX_DEFAULT="rootdelay=10"\n'
    with open(grub_default, "w", encoding="utf-8") as f:
        f.write(content)
    LOG.info("GRUB defaults configured")
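The effect of these substitutions is easiest to see on a sample file. The sketch below applies the same regular expressions to an in-memory string; the sample content and the `apply_grub_defaults` helper are assumptions for illustration, and the helper skips the fallback branches that append missing keys.

```python
import re

# Hypothetical /etc/default/grub content
SAMPLE_GRUB = """GRUB_DEFAULT=0
GRUB_TIMEOUT=5
GRUB_CMDLINE_LINUX_DEFAULT="quiet"
GRUB_CMDLINE_LINUX=""
#GRUB_DISABLE_LINUX_UUID=true
"""


def apply_grub_defaults(content, root_label, is_raid):
    """Apply the same edits as setup_grub_defaults() to a string."""
    cmdline = f"root=LABEL={root_label}"
    if is_raid:
        cmdline += " rd.auto=1"
    content = re.sub(
        r"^#*\s*GRUB_CMDLINE_LINUX=.*$",
        f'GRUB_CMDLINE_LINUX="{cmdline}"',
        content,
        flags=re.MULTILINE,
    )
    # Uncomment and force the setting so GRUB does not emit root=UUID=...
    content = re.sub(
        r"^#*\s*GRUB_DISABLE_LINUX_UUID=.*$",
        "GRUB_DISABLE_LINUX_UUID=true",
        content,
        flags=re.MULTILINE,
    )
    if is_raid and "rootdelay=" not in content:
        # Append inside the existing quotes of GRUB_CMDLINE_LINUX_DEFAULT.
        content = re.sub(
            r'^(#*\s*GRUB_CMDLINE_LINUX_DEFAULT="[^"]*)',
            r"\1 rootdelay=10",
            content,
            flags=re.MULTILINE,
        )
    return content


print(apply_grub_defaults(SAMPLE_GRUB, "ROOTFS", is_raid=True), end="")
```

With the sample above, the RAID case yields `GRUB_CMDLINE_LINUX="root=LABEL=ROOTFS rd.auto=1"` and `GRUB_CMDLINE_LINUX_DEFAULT="quiet rootdelay=10"`, and the commented-out UUID setting becomes active.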
setup_grub_efi_sync
Creates GRUB hook script to sync EFI partitions for RAID redundancy.
def setup_grub_efi_sync(chroot_dir, boot_label2):
    """Set up GRUB hook to sync EFI partitions for RAID.

    :param chroot_dir: Chroot directory path
    :param boot_label2: Second EFI partition label
    """
    LOG.info("Setting up GRUB EFI sync hook")
    grub_hook = os.path.join(chroot_dir, "etc", "grub.d", "90_copy_to_boot_efi2")
    with open(grub_hook, "w", encoding="utf-8") as f:
        f.write(
            f"""#!/bin/sh
# Sync GRUB updates to both EFI partitions for RAID redundancy
set -e
if mountpoint --quiet --nofollow /boot/efi; then
    mount LABEL={boot_label2} /boot/efi2 || :
    rsync --times --recursive --delete /boot/efi/ /boot/efi2/
    umount -l /boot/efi2
fi
exit 0
"""
        )
    os.chmod(grub_hook, 0o755)  # nosec B103
    LOG.info("GRUB EFI sync hook created")
class DebOCIEFILVMHardwareManager
Main hardware manager class implementing the deb_oci_efi_lvm deploy step.
Orchestrates the full deployment workflow.
class DebOCIEFILVMHardwareManager(hardware.HardwareManager):
    """Hardware manager for OCI EFI LVM RAID deployment."""

    HARDWARE_MANAGER_NAME = "DebOCIEFILVMHardwareManager"
    HARDWARE_MANAGER_VERSION = "1.0"

    def evaluate_hardware_support(self):
        LOG.info("DebOCIEFILVMHardwareManager: evaluate_hardware_support called")
        return hardware.HardwareSupport.SERVICE_PROVIDER

    def get_deploy_steps(self, node, ports):
        LOG.info("DebOCIEFILVMHardwareManager: get_deploy_steps called")
        return [
            {
                "step": "deb_oci_efi_lvm",
                "priority": 0,
                "interface": "deploy",
                "reboot_requested": False,
                "argsinfo": {},
            },
        ]

    def deb_oci_efi_lvm(self, node, ports):
        """Deploy Debian-based OCI image with EFI, LVM, and optional RAID.

        :param node: Node dictionary containing deployment configuration
        :param ports: List of port dictionaries for the node
        :raises: ValueError if configuration is invalid
        :raises: RuntimeError if deployment fails
        """
        LOG.info("DebOCIEFILVMHardwareManager: deb_oci_efi_lvm called")
        LOG.info("DebOCIEFILVMHardwareManager: node: %s", node)
        LOG.info("DebOCIEFILVMHardwareManager: ports: %s", ports)
        if not is_efi_system():
            raise RuntimeError(
                "This deployment requires EFI boot mode. "
                "System is not booted in EFI mode."
            )
        try:
            # Extract configuration from node
            configdrive_data = get_configdrive_data(node)
            root_device_hints = get_root_device_hints(node, configdrive_data)
            resolved_devices = resolve_root_devices(root_device_hints)
            meta_data = configdrive_data.get("meta_data", {})
            metal3_name = meta_data.get("metal3-name")
            root_device_path = resolved_devices[0]
            second_device = resolved_devices[1] if len(resolved_devices) > 1 else None
            LOG.info(
                "DebOCIEFILVMHardwareManager: root_device_path: %s", root_device_path
            )
            if second_device:
                LOG.info(
                    "DebOCIEFILVMHardwareManager: second_device: %s (RAID1)",
                    second_device,
                )
            # Get OCI image and architecture-specific configuration
            oci_image = get_oci_image(node, configdrive_data)
            arch_config = get_architecture_config(oci_image)
            LOG.info(
                "DebOCIEFILVMHardwareManager: architecture config: %s", arch_config
            )
            # Get disk wipe mode
            is_raid_setup = second_device is not None
            wipe_mode = get_disk_wipe_mode(configdrive_data, is_raid_setup)
            # Clean devices based on wipe mode
            if wipe_mode == "all":
                LOG.info("Cleaning all block devices (wipe_mode: all)")
                clean_all_devices()
                wait_for_device(root_device_path)
                if second_device:
                    wait_for_device(second_device)
            else:  # wipe_mode == 'target'
                LOG.info("Cleaning only target device(s) (wipe_mode: target)")
                wait_for_device(root_device_path)
                clean_device(root_device_path)
                if second_device:
                    wait_for_device(second_device)
                    clean_device(second_device)
            # Partition disk
            is_raid, pv_device = partition_disk(
                root_device_path,
                VG_NAME,
                LV_NAME,
                second_device=second_device,
                raid_device=RAID_DEVICE,
                homehost=metal3_name,
            )
            # Get partition paths
            efi_partition = get_partition_path(root_device_path, 1)
            second_efi_partition = None
            if is_raid and second_device:
                second_efi_partition = get_partition_path(second_device, 1)
            root_lv_path = f"/dev/{VG_NAME}/{LV_NAME}"
            # Create filesystems
            create_filesystems(
                efi_partition,
                root_lv_path,
                boot_label=BOOT_FS_LABEL,
                root_label=ROOT_FS_LABEL,
                second_efi_partition=second_efi_partition,
                boot_label2=BOOT_FS_LABEL2,
            )
            # Mount root filesystem
            root_mount = tempfile.mkdtemp()
            run_command(["mount", root_lv_path, root_mount])
            try:
                # Extract OCI image rootfs
                extract_oci_image(
                    arch_config["oci_image"], arch_config["oci_platform"], root_mount
                )
                # Mount EFI partition
                efi_mount = os.path.join(root_mount, "boot", "efi")
                os.makedirs(efi_mount, exist_ok=True)
                run_command(["mount", efi_partition, efi_mount])
                try:
                    # Set up chroot
                    setup_chroot(root_mount)
                    try:
                        # Install packages
                        install_packages(root_mount, arch_config["grub_packages"])
                        # Configure cloud-init
                        configure_cloud_init(root_mount, configdrive_data)
                        # Write /etc/hosts
                        write_hosts_file(root_mount, metal3_name)
                        # Write fstab
                        write_fstab(
                            root_mount,
                            ROOT_FS_LABEL,
                            BOOT_FS_LABEL,
                            is_raid,
                            BOOT_FS_LABEL2,
                        )
                        # Configure GRUB
                        setup_grub_defaults(root_mount, ROOT_FS_LABEL, is_raid)
                        # RAID-specific configuration
                        if is_raid:
                            write_mdadm_conf(root_mount)
                            setup_grub_efi_sync(root_mount, BOOT_FS_LABEL2)
                            efi2_mount = os.path.join(root_mount, "boot", "efi2")
                            os.makedirs(efi2_mount, exist_ok=True)
                        # Install GRUB to EFI
                        run_command(
                            [
                                "chroot",
                                root_mount,
                                "grub-install",
                                f'--target={arch_config["uefi_target"]}',
                                "--efi-directory=/boot/efi",
                                "--bootloader-id=ubuntu",
                                "--recheck",
                            ]
                        )
                        # Configure initramfs for LVM (required for Debian)
                        configure_initramfs(root_mount, is_raid)
                        # Update GRUB config and initramfs
                        run_command(["chroot", root_mount, "update-grub"])
                        run_command(
                            [
                                "chroot",
                                root_mount,
                                "update-initramfs",
                                "-u",
                                "-k",
                                "all",
                            ]
                        )
                        # Install GRUB to second EFI partition for RAID
                        if is_raid and second_efi_partition:
                            efi2_mount = os.path.join(root_mount, "boot", "efi2")
                            try:
                                run_command(["mount", second_efi_partition, efi2_mount])
                                run_command(
                                    [
                                        "rsync",
                                        "-a",
                                        f"{root_mount}/boot/efi/",
                                        f"{root_mount}/boot/efi2/",
                                    ]
                                )
                                run_command(
                                    [
                                        "chroot",
                                        root_mount,
                                        "grub-install",
                                        f'--target={arch_config["uefi_target"]}',
                                        "--efi-directory=/boot/efi2",
                                        "--bootloader-id=ubuntu",
                                        "--recheck",
                                    ]
                                )
                            except Exception as e:
                                LOG.warning(
                                    "Error installing GRUB to second EFI: %s", e
                                )
                            finally:
                                result = run_command(
                                    ["mountpoint", "-q", efi2_mount], check=False
                                )
                                if result.returncode == 0:
                                    run_command(["umount", "-l", efi2_mount])
                    finally:
                        teardown_chroot(root_mount)
                finally:
                    # Unmount EFI partition
                    result = run_command(["mountpoint", "-q", efi_mount], check=False)
                    if result.returncode == 0:
                        run_command(["umount", "-l", efi_mount])
            finally:
                # Unmount root filesystem
                result = run_command(["mountpoint", "-q", root_mount], check=False)
                if result.returncode == 0:
                    run_command(["umount", "-l", root_mount])
                # Clean up temporary directories
                if root_mount and os.path.exists(root_mount):
                    try:
                        os.rmdir(root_mount)
                        LOG.debug("Cleaned up root mount directory: %s", root_mount)
                    except Exception as e:
                        LOG.warning(
                            "Failed to clean up root mount dir %s: %s", root_mount, e
                        )
            LOG.info("DebOCIEFILVMHardwareManager: deb_oci_efi_lvm completed successfully")
        except Exception as e:
            LOG.error("DebOCIEFILVMHardwareManager: deb_oci_efi_lvm failed: %s", e)
            raise
        finally:
            # Wait for interactive users to logout
            if has_interactive_users():
                LOG.info(
                    "DebOCIEFILVMHardwareManager: "
                    "interactive users detected, waiting for logout"
                )
                while has_interactive_users():
                    LOG.info(
                        "DebOCIEFILVMHardwareManager: "
                        "users still logged in, checking again "
                        "in 60 seconds"
                    )
                    time.sleep(60)
                LOG.info("DebOCIEFILVMHardwareManager: all interactive users logged out")
Supported OCI Images
The hardware manager works with any Debian-based OCI image that has a
functional apt package manager. OCI multi-arch images are supported.
Tested images include:
- ubuntu:24.04
- debian:13
The key benefit of this approach is the ability to create custom OCI images with your specific OS configuration, packages, and settings. You can build and maintain your own Docker images and use them directly as the root filesystem for bare metal deployments. The deployment process installs additional packages (kernel, GRUB, cloud-init) on top of the base image.
Debugging Deployments
If a deployment fails, you can connect to the server via the BMC console during the IPA phase. The deploy step checks for interactive users in its final cleanup block and polls every 60 seconds until they have all logged out, whether the deployment succeeded or failed, so you have time to inspect the system state before the step returns.
Limitations and Considerations
The following are limitations of this specific deb_oci_efi_lvm
implementation, not of Metal3’s custom deploy mechanism itself. The
custom deploy framework is flexible and allows implementing alternative
hardware managers with different capabilities.
- EFI only - This implementation requires UEFI boot mode
- Debian-based only - The package installation assumes apt is available
- Network required - The IPA needs network access to pull OCI images from registries and install packages in the target system
- Root device hints - Only serial and wwn hints are supported for disk selection
Conclusion
The deb_oci_efi_lvm hardware manager demonstrates how custom deploy
steps can extend Ironic’s capabilities beyond traditional image-based
deployments. The source code and GitHub Actions for building custom IPA
images are available at
s3rj1k/ironic-python-agent.
Future Improvements
A potential enhancement would be native support for converting the OpenStack
network_data.json format to cloud-init v1 network configuration during
deployment.
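To give a feel for what such a conversion might involve, here is a rough sketch that handles only plain interfaces with IPv4 static or DHCP addressing plus DNS servers. It is not part of the hardware manager; the function name, the sample data, and the choice to use the link id as the interface name are all assumptions, and bonds, VLANs, IPv6, and additional routes are deliberately left out.

```python
def network_data_to_v1(network_data):
    """Convert a small subset of OpenStack network_data to cloud-init v1."""
    subnets_by_link = {}
    for net in network_data.get("networks", []):
        net_type = net.get("type")
        if net_type == "ipv4_dhcp":
            subnet = {"type": "dhcp4"}
        elif net_type == "ipv4":
            subnet = {
                "type": "static",
                "address": net["ip_address"],
                "netmask": net["netmask"],
            }
            for route in net.get("routes", []):
                # A 0.0.0.0/0.0.0.0 route is treated as the default gateway.
                if route.get("network") == "0.0.0.0" and route.get("netmask") == "0.0.0.0":
                    subnet["gateway"] = route["gateway"]
        else:
            continue  # IPv6 and other types are left out of this sketch
        subnets_by_link.setdefault(net["link"], []).append(subnet)

    config = []
    for link in network_data.get("links", []):
        if link.get("type") in ("bond", "vlan"):
            continue  # only plain interfaces are handled here
        entry = {
            "type": "physical",
            # Assumption: the link id is usable as the interface name.
            "name": link["id"],
            "mac_address": link["ethernet_mac_address"],
            "subnets": subnets_by_link.get(link["id"], []),
        }
        if link.get("mtu"):
            entry["mtu"] = link["mtu"]
        config.append(entry)

    dns = [
        svc["address"]
        for svc in network_data.get("services", [])
        if svc.get("type") == "dns"
    ]
    if dns:
        config.append({"type": "nameserver", "address": dns})
    return {"version": 1, "config": config}


# Made-up example input using documentation address ranges.
SAMPLE_NETWORK_DATA = {
    "links": [
        {"id": "eno1", "type": "phy", "ethernet_mac_address": "52:54:00:00:00:01", "mtu": 1500}
    ],
    "networks": [
        {
            "id": "net0",
            "type": "ipv4",
            "link": "eno1",
            "ip_address": "192.0.2.10",
            "netmask": "255.255.255.0",
            "routes": [{"network": "0.0.0.0", "netmask": "0.0.0.0", "gateway": "192.0.2.1"}],
        }
    ],
    "services": [{"type": "dns", "address": "192.0.2.53"}],
}

print(network_data_to_v1(SAMPLE_NETWORK_DATA))
```

A real implementation would also need to decide how interface names are resolved from MAC addresses on the target system, which this sketch sidesteps.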
References
- Integrating CoreOS Installer with Ironic - Dmitry Tantsur’s original blog post on custom deploy steps
- Ironic Deploy Steps Documentation
- Metal3 Custom Deploy Steps Design
- OpenShift CoreOS Install Hardware Manager