diff --git a/model_estimated_hellinger_distance_quantinuum_h2_56/PPO_1/events.out.tfevents.1764328882.cda-server-6.1587961.0 b/model_estimated_hellinger_distance_quantinuum_h2_56/PPO_1/events.out.tfevents.1764328882.cda-server-6.1587961.0 new file mode 100644 index 000000000..361ccf648 Binary files /dev/null and b/model_estimated_hellinger_distance_quantinuum_h2_56/PPO_1/events.out.tfevents.1764328882.cda-server-6.1587961.0 differ diff --git a/model_estimated_hellinger_distance_quantinuum_h2_56/PPO_2/events.out.tfevents.1764329185.cda-server-6.1587961.1 b/model_estimated_hellinger_distance_quantinuum_h2_56/PPO_2/events.out.tfevents.1764329185.cda-server-6.1587961.1 new file mode 100644 index 000000000..3378ea3d5 Binary files /dev/null and b/model_estimated_hellinger_distance_quantinuum_h2_56/PPO_2/events.out.tfevents.1764329185.cda-server-6.1587961.1 differ diff --git a/pyproject.toml b/pyproject.toml index 75a48259e..f0149a7fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,6 +44,10 @@ dependencies = [ "numpy>=1.26; python_version >= '3.12'", "numpy>=1.24; python_version >= '3.11'", "numpy>=1.22", + "numpy>=1.22,<2; sys_platform == 'darwin' and 'x86_64' in platform_machine and python_version < '3.13'", # Restrict numpy v2 for macOS x86 since it is not supported anymore since torch v2.3.0 + "optuna>=4.5.0", + "torch-geometric>=2.6.1,<2.7.0", + "torch>=2.7.1,<2.8.0; sys_platform == 'darwin' and 'x86_64' in platform_machine and python_version < '3.13'", # Restrict torch v2.3.0 for macOS x86 since it is not supported anymore. "typing-extensions>=4.1", # for `assert_never` ] @@ -118,6 +122,8 @@ xfail_strict = true filterwarnings = [ 'error', 'ignore:.*pytorch.*:UserWarning:', + "ignore:.*torch_geometric.*:UserWarning:", + "ignore:.*'type_params' parameter of 'typing\\._eval_type'.*:DeprecationWarning:", 'ignore:.*Values in x.*:RuntimeWarning:', 'ignore:.*The least populated class in y has only 3 members, which is less than n_splits=5.*:UserWarning:', 'ignore:.*divide by zero encountered in det.*:RuntimeWarning:', @@ -126,6 +132,7 @@ filterwarnings = [ 'ignore:.*qiskit.providers.models is deprecated since Qiskit 1.2*:DeprecationWarning:', 'ignore:.*The class ``qiskit.qobj.*`` is deprecated as of Qiskit 1.3.*:DeprecationWarning:', 'ignore:.*The property ``qiskit.circuit.instruction.Instruction.*`` is deprecated as of qiskit 1.3.0.*:DeprecationWarning:', + "ignore::DeprecationWarning:torch_geometric\\.distributed.*", ] [tool.coverage] @@ -163,9 +170,13 @@ implicit_reexport = true # recent versions of `gym` are typed, but stable-baselines3 pins a very old version of gym. # qiskit is not yet marked as typed, but is typed mostly. # the other libraries do not have type stubs. 
-module = ["qiskit.*", "joblib.*", "sklearn.*", "matplotlib.*", "gymnasium.*", "mqt.bench.*", "sb3_contrib.*", "bqskit.*", "qiskit_ibm_runtime.*", "networkx.*", "stable_baselines3.*"] +module = ["qiskit.*", "joblib.*", "sklearn.*", "matplotlib.*", "gymnasium.*", "mqt.bench.*", "sb3_contrib.*", "bqskit.*", "qiskit_ibm_runtime.*", "networkx.*", "stable_baselines3.*", "torch", "torch.*", "torch_geometric", "torch_geometric.*", "optuna.*"] ignore_missing_imports = true +[[tool.mypy.overrides]] +module = ["mqt.predictor.ml.*"] +disallow_subclassing_any = false + [tool.ruff] line-length = 120 extend-include = ["*.ipynb"] @@ -247,6 +258,7 @@ wille = "wille" anc = "anc" aer = "aer" fom = "fom" +TPE = "TPE" [tool.repo-review] ignore = [ diff --git a/src/mqt/predictor/_version.py b/src/mqt/predictor/_version.py new file mode 100644 index 000000000..79c219efe --- /dev/null +++ b/src/mqt/predictor/_version.py @@ -0,0 +1,40 @@ +# Copyright (c) 2023 - 2025 Chair for Design Automation, TUM +# Copyright (c) 2025 Munich Quantum Software Company GmbH +# All rights reserved. +# +# SPDX-License-Identifier: MIT +# +# Licensed under the MIT License + +# file generated by setuptools-scm +# don't change, don't track in version control +from __future__ import annotations + +__all__ = [ + "__commit_id__", + "__version__", + "__version_tuple__", + "commit_id", + "version", + "version_tuple", +] + +TYPE_CHECKING = False +if TYPE_CHECKING: + VERSION_TUPLE = tuple[int | str, ...] + COMMIT_ID = str | None +else: + VERSION_TUPLE = object + COMMIT_ID = object + +version: str +__version__: str +__version_tuple__: VERSION_TUPLE +version_tuple: VERSION_TUPLE +commit_id: COMMIT_ID +__commit_id__: COMMIT_ID + +__version__ = version = "0.1.dev719+g5ea17201a.d20250908" +__version_tuple__ = version_tuple = (0, 1, "dev719", "g5ea17201a.d20250908") + +__commit_id__ = commit_id = None diff --git a/src/mqt/predictor/hellinger/utils.py b/src/mqt/predictor/hellinger/utils.py index 6f1a3fffa..6f69f0e35 100644 --- a/src/mqt/predictor/hellinger/utils.py +++ b/src/mqt/predictor/hellinger/utils.py @@ -132,12 +132,13 @@ def calc_device_specific_features( return np.array(list(feature_dict.values())) -def get_hellinger_model_path(device: Target) -> Path: +def get_hellinger_model_path(device: Target, *, gnn: bool = False) -> Path: """Returns the path to the trained model folder resulting from the machine learning training.""" - training_data_path = Path(str(resources.files("mqt.predictor"))) / "ml" / "training_data" - model_path = ( - training_data_path - / "trained_model" - / ("trained_hellinger_distance_regressor_" + device.description + ".joblib") + training_data_path = Path(str(resources.files("mqt.predictor"))) / "ml" / "training_data" / "trained_model" + device_description = str(device.description) + filename = ( + ("trained_hellinger_distance_regressor_gnn_" + device_description + ".pth") + if gnn + else ("trained_hellinger_distance_regressor_" + device_description + ".joblib") ) - return Path(model_path) + return training_data_path / filename diff --git a/src/mqt/predictor/ml/__init__.py b/src/mqt/predictor/ml/__init__.py index 6887f5367..151ece6a4 100644 --- a/src/mqt/predictor/ml/__init__.py +++ b/src/mqt/predictor/ml/__init__.py @@ -13,4 +13,9 @@ from mqt.predictor.ml import helper from mqt.predictor.ml.predictor import Predictor, predict_device_for_figure_of_merit, setup_device_predictor -__all__ = ["Predictor", "helper", "predict_device_for_figure_of_merit", "setup_device_predictor"] +__all__ = [ + "Predictor", + "helper", + 
"predict_device_for_figure_of_merit", + "setup_device_predictor", +] diff --git a/src/mqt/predictor/ml/gnn.py b/src/mqt/predictor/ml/gnn.py new file mode 100644 index 000000000..4727ed9e5 --- /dev/null +++ b/src/mqt/predictor/ml/gnn.py @@ -0,0 +1,295 @@ +# Copyright (c) 2023 - 2025 Chair for Design Automation, TUM +# Copyright (c) 2025 Munich Quantum Software Company GmbH +# All rights reserved. +# +# SPDX-License-Identifier: MIT +# +# Licensed under the MIT License + +"""Graph neural network models using SAGEConv layers.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import torch +import torch.nn as nn +import torch.nn.functional as functional +from torch_geometric.nn import ( + GraphNorm, + SAGEConv, + SAGPooling, + global_mean_pool, +) + +if TYPE_CHECKING: + from collections.abc import Callable + + from torch_geometric.data import Data + + +class GraphConvolutionSage(nn.Module): + """Graph convolutional encoder using SAGEConv layers.""" + + def __init__( + self, + in_feats: int, + hidden_dim: int, + num_conv_wo_resnet: int, + num_resnet_layers: int, + *, + conv_activation: Callable[..., torch.Tensor] = functional.leaky_relu, + conv_act_kwargs: dict[str, Any] | None = None, + dropout_p: float = 0.2, + bidirectional: bool = True, + use_sag_pool: bool = False, + sag_ratio: float = 0.7, + sag_nonlinearity: Callable[..., torch.Tensor] = torch.tanh, + ) -> None: + """Initialize the graph convolutional encoder. + + The encoder consists of a stack of SAGEConv layers followed by + optional SAGPooling before the global readout. + + Args: + in_feats: Dimensionality of the node features. + hidden_dim: Output dimensionality of the first SAGEConv layer. + num_conv_wo_resnet: Number of SAGEConv layers before residual + connections are introduced. + num_resnet_layers: Number of SAGEConv layers with residual + connections. + conv_activation: Activation function applied after each graph + convolution. Defaults to torch.nn.functional.leaky_relu. + conv_act_kwargs: Additional keyword arguments passed to + conv_activation. Defaults to None. + dropout_p: Dropout probability applied after each graph layer. + Defaults to 0.2. + bidirectional: If True, apply message passing in both + directions (forward and reversed edges) and average the + results. Defaults to True. + use_sag_pool: If True, apply a single SAGPooling layer after + the convolutions and before readout. Defaults to False. + sag_ratio: Fraction of nodes to keep in SAGPooling. Must be in + (0, 1]. Defaults to 0.7. + sag_nonlinearity: Nonlinearity used inside SAGPooling for score + computation. Defaults to torch.tanh. 
+ """ + super().__init__() + + if num_conv_wo_resnet < 1: + msg = "num_conv_wo_resnet must be at least 1" + raise ValueError(msg) + + self.conv_activation = conv_activation + self.conv_act_kwargs = conv_act_kwargs or {} + self.bidirectional = bidirectional + self.use_sag_pool = use_sag_pool + + # --- GRAPH ENCODER --- + self.convs: nn.ModuleList[SAGEConv] = nn.ModuleList() + self.norms: nn.ModuleList[GraphNorm] = nn.ModuleList() + + # First layer: SAGE + self.convs.append(SAGEConv(in_feats, hidden_dim)) + out_dim = hidden_dim + self.graph_emb_dim = out_dim + self.norms.append(GraphNorm(out_dim)) + + # Subsequent layers: SAGE with fixed width == out_dim + for _ in range(num_conv_wo_resnet - 1): + self.convs.append(SAGEConv(out_dim, out_dim)) + self.norms.append(GraphNorm(out_dim)) + for _ in range(num_resnet_layers): + self.convs.append(SAGEConv(out_dim, out_dim)) + self.norms.append(GraphNorm(out_dim)) + + self.drop = nn.Dropout(dropout_p) + # Start residuals after the initial non-residual stack + self._residual_start = num_conv_wo_resnet + # Expose the final node embedding width + self.out_dim = out_dim + + # --- SAGPooling layer (applied once, after all convs) --- + # Uses SAGEConv internally for attention scoring to match the stack. + if self.use_sag_pool: + if not (0.0 < sag_ratio <= 1.0): + msg = "sag_ratio must be in (0, 1]" + raise ValueError(msg) + self.sag_pool: SAGPooling | None = SAGPooling( + in_channels=self.out_dim, + ratio=sag_ratio, + GNN=SAGEConv, + nonlinearity=sag_nonlinearity, + ) + else: + self.sag_pool = None + + def _apply_conv_bidir( + self, + conv: SAGEConv, + x: torch.Tensor, + edge_index: torch.Tensor, + ) -> torch.Tensor: + """Apply a SAGEConv layer in forward and backward directions and average. + + Args: + conv: Convolution layer taken from self.convs. + x: Node feature matrix of shape [num_nodes, in_channels]. + edge_index: Edge index tensor of shape [2, num_edges]. + + Returns: + Tensor with updated node features of shape + [num_nodes, out_channels]. + """ + x_f = conv(x, edge_index) + if not self.bidirectional: + return x_f + x_b = conv(x, edge_index.flip(0)) + return (x_f + x_b) / 2 + + def forward(self, data: Data) -> torch.Tensor: + """Encode a batch of graphs and return pooled graph embeddings. + + The input batch of graphs is processed by the SAGEConv stack, + optionally followed by SAGPooling, and finally aggregated with + global mean pooling. + + Args: + data: Batched torch_geometric.data.Data object. + Expected attributes: + - x: Node features of shape [num_nodes, in_feats]. + - edge_index: Edge indices of shape [2, num_edges]. + - batch: Graph indices for each node of shape + [num_nodes]. + + Returns: + Tensor of shape [num_graphs, out_dim] containing one embedding + per input graph. + """ + x, edge_index, batch = data.x, data.edge_index, data.batch + + for i, conv in enumerate(self.convs): + x_new = self._apply_conv_bidir(conv, x, edge_index) + x_new = self.norms[i](x_new, batch=batch) + x_new = self.conv_activation(x_new, **self.conv_act_kwargs) + x_new = self.drop(x_new) + + x = x_new if i < self._residual_start else x + x_new + + # --- SAGPooling (hierarchical pooling before readout) --- + if self.sag_pool is not None: + # SAGPooling may also return edge_attr, perm, score; we ignore those here. + x, edge_index, _, batch, _, _ = self.sag_pool( + x, + edge_index, + batch=batch, + ) + + return global_mean_pool(x, batch) + + +class GNN(nn.Module): + """Graph neural network with a SAGE-based encoder and MLP head. 
+ + This model first encodes each input graph using GraphConvolutionSage + and then applies a feed-forward neural network to the resulting graph + embeddings to produce the final prediction. + """ + + def __init__( + self, + in_feats: int, + hidden_dim: int, + num_conv_wo_resnet: int, + num_resnet_layers: int, + mlp_units: list[int], + *, + conv_activation: Callable[..., torch.Tensor] = functional.leaky_relu, + conv_act_kwargs: dict[str, Any] | None = None, + mlp_activation: Callable[..., torch.Tensor] = functional.leaky_relu, + mlp_act_kwargs: dict[str, Any] | None = None, + dropout_p: float = 0.2, + bidirectional: bool = True, + output_dim: int = 1, + use_sag_pool: bool = False, + sag_ratio: float = 0.7, + sag_nonlinearity: Callable[..., torch.Tensor] = torch.tanh, + ) -> None: + """Initialize the GNN model. + + Args: + in_feats: Dimensionality of the input node features. + hidden_dim: Hidden dimensionality of the SAGEConv layers. + num_conv_wo_resnet: Number of SAGEConv layers before residual + connections are introduced in the encoder. + num_resnet_layers: Number of SAGEConv layers with residual + connections in the encoder. + mlp_units: List specifying the number of units in each hidden + layer of the MLP head. + conv_activation: Activation function applied after each graph + convolution. Defaults to torch.nn.functional.leaky_relu. + conv_act_kwargs: Additional keyword arguments passed to + conv_activation. Defaults to None. + mlp_activation: Activation function applied after each MLP layer. + Defaults to torch.nn.functional.leaky_relu. + mlp_act_kwargs: Additional keyword arguments passed to + mlp_activation. Defaults to None. + dropout_p: Dropout probability applied in the graph encoder. + Defaults to 0.2. + bidirectional: If True, apply bidirectional message passing in + the encoder. Defaults to True. + output_dim: Dimensionality of the model output (e.g. number of + targets per graph). Defaults to 1. + use_sag_pool: If True, enable SAGPooling in the encoder. + Defaults to False. + sag_ratio: Fraction of nodes to keep in SAGPooling. Must be in + (0, 1]. Defaults to 0.7. + sag_nonlinearity: Nonlinearity used inside SAGPooling for score + computation. Defaults to torch.tanh. + """ + super().__init__() + + # Graph encoder + self.graph_conv = GraphConvolutionSage( + in_feats=in_feats, + hidden_dim=hidden_dim, + num_conv_wo_resnet=num_conv_wo_resnet, + num_resnet_layers=num_resnet_layers, + conv_activation=conv_activation, + conv_act_kwargs=conv_act_kwargs, + dropout_p=dropout_p, + bidirectional=bidirectional, + use_sag_pool=use_sag_pool, + sag_ratio=sag_ratio, + sag_nonlinearity=sag_nonlinearity, + ) + + # MLP head + self.mlp_activation = mlp_activation + self.mlp_act_kwargs = mlp_act_kwargs or {} + last_dim = self.graph_conv.graph_emb_dim + self.fcs: nn.ModuleList[nn.Linear] = nn.ModuleList() + for out_dim_ in mlp_units: + self.fcs.append(nn.Linear(last_dim, out_dim_)) + last_dim = out_dim_ + self.out = nn.Linear(last_dim, output_dim) + + def forward(self, data: Data) -> torch.Tensor: + """Compute predictions for a batch of graphs. + + The input graphs are encoded into graph embeddings by the + GraphConvolutionSage encoder, then passed through the MLP head + to obtain final predictions. + + Args: + data: Batched torch_geometric.data.Data object + containing the graphs to be evaluated. + + Returns: + Tensor of shape [num_graphs, output_dim] with the model + predictions for each graph in the batch. 
+ """ + x = self.graph_conv(data) + for fc in self.fcs: + x = self.mlp_activation(fc(x), **self.mlp_act_kwargs) + return self.out(x) diff --git a/src/mqt/predictor/ml/helper.py b/src/mqt/predictor/ml/helper.py index 4550cf015..bc981bc0a 100644 --- a/src/mqt/predictor/ml/helper.py +++ b/src/mqt/predictor/ml/helper.py @@ -15,12 +15,22 @@ from pathlib import Path from typing import TYPE_CHECKING +import numpy as np +import torch +from qiskit import transpile +from qiskit.converters import circuit_to_dag +from qiskit.dagcircuit import DAGOpNode +from qiskit.transpiler import PassManager +from qiskit.transpiler.passes import RemoveBarriers +from sklearn.metrics import accuracy_score, classification_report, mean_absolute_error, mean_squared_error, r2_score + from mqt.predictor.utils import calc_supermarq_features if TYPE_CHECKING: - import numpy as np + import torch_geometric from numpy._typing import NDArray from qiskit import QuantumCircuit + from torch import nn def get_path_training_data() -> Path: @@ -40,6 +50,13 @@ def get_path_trained_model(figure_of_merit: str) -> Path: return get_path_training_data() / "trained_model" / ("trained_clf_" + figure_of_merit + ".joblib") +def get_path_trained_model_gnn(figure_of_merit: str) -> Path: + """Return the path to the GNN checkpoint file for the given figure of merit.""" + base_dir = get_path_training_data() / "trained_model" + filename = f"trained_gnn_{figure_of_merit}.pth" + return base_dir / filename + + def get_path_training_circuits() -> Path: """Returns the path to the training circuits folder.""" return get_path_training_data() / "training_circuits" @@ -99,6 +116,41 @@ def get_openqasm_gates() -> list[str]: ] +def get_openqasm3_gates() -> list[str]: + """Returns a list of all quantum gates within the openQASM 3.0 standard header.""" + # according to https://openqasm.com/language/standard_library.html#standard-library + # Snapshot from OpenQASM 3.0 specification (version 3.0) + # Verify against latest spec when Qiskit or OpenQASM updates + return [ + "x", + "y", + "z", + "h", + "s", + "sdg", + "t", + "tdg", + "sx", + "p", + "rx", + "ry", + "rz", + "u", + "cx", + "cy", + "cz", + "ch", + "cp", + "crx", + "cry", + "crz", + "cu", + "swap", + "ccx", + "cswap", + ] + + def dict_to_featurevector(gate_dict: dict[str, int]) -> dict[str, int]: """Calculates and returns the feature vector of a given quantum circuit gate dictionary.""" res_dct = dict.fromkeys(get_openqasm_gates(), 0) @@ -137,14 +189,396 @@ def create_feature_vector(qc: QuantumCircuit) -> list[int | float]: return list(feature_dict.values()) +def create_dag(qc: QuantumCircuit) -> tuple[torch.Tensor, torch.Tensor, int]: + """Creates and returns the feature-annotated DAG of the quantum circuit. + + Arguments: + qc: The quantum circuit to be converted to a DAG. + + Returns: + node_vector: features per node = [one-hot gate, sin/cos params, arity, controls, + num_params, critical_flag, fan_prop] + edge_index: 2 for E tensor of edges (src, dst) + number_of_gates: number of nodes in the DAG. + """ + # 0) cleanup & DAG + pm = PassManager(RemoveBarriers()) + qc = pm.run(qc) + qc = transpile(qc, optimization_level=0, basis_gates=get_openqasm3_gates()) + dag = circuit_to_dag(qc) + + unique_gates = [*get_openqasm3_gates(), "measure"] + gate2idx = {g: i for i, g in enumerate(unique_gates)} + number_gates = len(unique_gates) + + # --- parameters sin/cos (max 3 param) --- + def param_vector(node: DAGOpNode, dim: int = 3) -> list[float]: + """Return [sin(p1), cos(p1), sin(p2), cos(p2), sin(p3), cos(p3)]. 
+ + Arguments: + node: DAG operation node + dim: number of parameters to consider (max 3) + + Returns: + list of sin/cos values of parameters + """ + # pad the parameters with zeros if less than dim + params = [float(val) for val in getattr(node.op, "params", [])][:dim] + params += [0.0] * (dim - len(params)) + out = [] + # for each param calculate sin and cos + for p in params: + out.extend([np.sin(p), np.cos(p)]) + return out # len = 2*dim + + nodes = list(dag.op_nodes()) + number_nodes = len(nodes) + + # prealloc + onehots = torch.zeros((number_nodes, number_gates), dtype=torch.float32) + num_params = torch.zeros((number_nodes, 1), dtype=torch.float32) + params = torch.zeros((number_nodes, 6), dtype=torch.float32) + arity = torch.zeros((number_nodes, 1), dtype=torch.float32) + controls = torch.zeros((number_nodes, 1), dtype=torch.float32) + fan_prop = torch.zeros((number_nodes, 1), dtype=torch.float32) + + for i, node in enumerate(nodes): + name = node.op.name + if name not in unique_gates: + msg = f"Unknown gate: {name}" + raise ValueError(msg) + onehots[i, gate2idx[name]] = 1.0 + params[i] = torch.tensor(param_vector(node), dtype=torch.float32) + arity[i] = float(len(node.qargs)) + controls[i] = float(getattr(node.op, "num_ctrl_qubits", 0)) + num_params[i] = float(len(getattr(node.op, "params", []))) + preds = [p for p in dag.predecessors(node) if isinstance(p, DAGOpNode)] + succs = [s for s in dag.successors(node) if isinstance(s, DAGOpNode)] + fan_prop[i] = float(len(succs)) / max(1, len(preds)) + + # edges DAG + idx_map = {node: i for i, node in enumerate(nodes)} + edges: list[list[int]] = [] + for src, dst, _ in dag.edges(): + if src in idx_map and dst in idx_map: + edges.append([idx_map[src], idx_map[dst]]) + if edges: + edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous() + else: + edge_index = torch.empty((2, 0), dtype=torch.long) + + # --- critical path detection --- + topo_nodes = list(dag.topological_op_nodes()) + if not topo_nodes: + # No operation nodes: return node features with zero critical flags + critical_flag = torch.zeros((number_nodes, 1), dtype=torch.float32) + node_vector = torch.cat([onehots, params, arity, controls, num_params, critical_flag, fan_prop], dim=1) + return node_vector, edge_index, number_nodes + + dist_in = dict.fromkeys(topo_nodes, 0) + for node in topo_nodes: + preds = [p for p in dag.predecessors(node) if isinstance(p, DAGOpNode)] + if preds: + dist_in[node] = max(dist_in[p] + 1 for p in preds) + + dist_out = dict.fromkeys(topo_nodes, 0) + for node in reversed(topo_nodes): + succs = [s for s in dag.successors(node) if isinstance(s, DAGOpNode)] + if succs: + dist_out[node] = max(dist_out[s] + 1 for s in succs) + + critical_len = max(dist_in[n] + dist_out[n] for n in topo_nodes) + + critical_flag = torch.zeros((number_nodes, 1), dtype=torch.float32) + for i, node in enumerate(nodes): + # set critical flag to 1 if on critical path + if dist_in[node] + dist_out[node] == critical_len: + critical_flag[i] = 1.0 + + # final concat of features + node_vector = torch.cat([onehots, params, arity, controls, num_params, critical_flag, fan_prop], dim=1) + + return node_vector, edge_index, number_nodes + + +def get_results_classes(preds: torch.Tensor, targets: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]: + """Return predicted and target class indices. 
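# --- Illustrative example (not part of this patch): a minimal end-to-end sketch tying
# create_dag to the GNN model added in mqt/predictor/ml/gnn.py. The node-feature width is
# 26 standard-library gates + "measure" (one-hot) + 6 sin/cos parameter entries + arity +
# controls + num_params + critical flag + fan ratio = 38, which matches the
# len(get_openqasm3_gates()) + 1 + 6 + 3 + 1 + 1 expression used by the predictor below.
import torch
from qiskit import QuantumCircuit
from torch_geometric.data import Batch, Data

from mqt.predictor.ml.gnn import GNN

qc = QuantumCircuit(3)
qc.h(0)
qc.cx(0, 1)
qc.cx(1, 2)
qc.measure_all()

x, edge_index, num_nodes = create_dag(qc)
data = Data(x=x, edge_index=edge_index, num_nodes=num_nodes)
model = GNN(in_feats=x.size(1), hidden_dim=16, num_conv_wo_resnet=1, num_resnet_layers=2, mlp_units=[32], output_dim=1)
model.eval()
with torch.no_grad():
    score = model(Batch.from_data_list([data]))  # shape [1, 1]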
+ + Arguments: + preds: model predictions + targets: ground truth targets + Returns: + pred_idx: predicted class indices + targets_idx: target class indices + """ + pred_idx = torch.argmax(preds, dim=1) + targets_idx = torch.argmax(targets, dim=1) + + return pred_idx, targets_idx + + +# --------------------------------------------------- +# Evaluation +# --------------------------------------------------- +def evaluate_classification_model( + model: nn.Module, + loader: torch_geometric.loader.DataLoader, + loss_fn: nn.Module, + device: str, + *, + return_arrays: bool = False, + verbose: bool = False, +) -> tuple[float, dict[str, float], tuple[np.ndarray, np.ndarray] | None]: + """Evaluate a classification model with the given loss function and compute accuracy metrics. + + Arguments: + model: classification model to be evaluated + loader: data loader for the evaluation dataset + loss_fn: loss function for evaluation + device: device to be used for evaluation (cuda or cpu) + return_arrays: whether to return prediction and target arrays + verbose: whether to print the metrics results. + + Returns: + avg_loss: average loss over the loader + metrics: {"custom_accuracy": ..., "classification_report": ..., "mse": ..., "rmse": ..., "mae": ..., "r2": ...} + arrays: (preds, y_true) if return_arrays=True, else None. + """ + device = torch.device(device) + + model.eval() + total_loss, total = 0.0, 0 + all_preds, all_targets = [], [] + + with torch.no_grad(): + for batch in loader: + batch_device = batch.to(device) + preds = model(batch_device) + preds = torch.clamp(preds, 0.0, 1.0) + targets = batch_device.y.float() + + if targets.dim() == 1: + targets = targets.unsqueeze(1) + if preds.shape != targets.shape: + msg = f"Shape mismatch: preds {preds.shape} vs targets {targets.shape}" + raise ValueError(msg) + + bs = targets.size(0) + loss = loss_fn(preds, targets) + total_loss += loss.item() * bs + total += bs + + all_preds.append(preds.detach().cpu()) + all_targets.append(targets.detach().cpu()) + + avg_loss = total_loss / max(1, total) + metrics = {"loss": float(avg_loss)} + + if not all_preds or not all_targets: + arrays = (np.array([]), np.array([])) if return_arrays else None + return avg_loss, metrics, arrays + + preds = torch.cat(all_preds, dim=0) + targets = torch.cat(all_targets, dim=0) + + # --- compute accuracy --- + pred_classes, target_classes = get_results_classes(preds, targets) + acc = accuracy_score(target_classes, pred_classes) + classification_report_res = classification_report(target_classes, pred_classes) + metrics["custom_accuracy"] = float(acc) + metrics["classification_report"] = classification_report_res + + if verbose: + mse = mean_squared_error(targets.numpy().reshape(-1), preds.numpy().reshape(-1)) + mae = mean_absolute_error(targets.numpy().reshape(-1), preds.numpy().reshape(-1)) + rmse = float(np.sqrt(mse)) + if targets.size(0) < 2 or torch.all(targets == targets[0]): + r2 = float("nan") + else: + r2 = float(r2_score(targets.numpy().reshape(-1), preds.numpy().reshape(-1))) + metrics.update({"mse": float(mse), "rmse": float(rmse), "mae": float(mae), "r2": float(r2)}) + + arrays = (preds.numpy(), targets.numpy()) if return_arrays else None + return avg_loss, metrics, arrays + + +def evaluate_regression_model( + model: nn.Module, + loader: torch_geometric.loader.DataLoader, + loss_fn: nn.Module, + device: str, + *, + return_arrays: bool = False, + verbose: bool = False, +) -> tuple[float, dict[str, float], tuple[np.ndarray, np.ndarray] | None]: + """Evaluate a regression 
model (logits = scalar predictions). + + Arguments: + model: regression model to be evaluated + loader: data loader for the evaluation dataset + loss_fn: loss function for evaluation + device: device to be used for evaluation (cuda or cpu) + return_arrays: whether to return prediction and target arrays + verbose: whether to print the metrics results. + + Returns: + avg_loss: average loss over the loader + metrics: {"rmse": ..., "mae": ..., "r2": ...} + arrays: (preds, y_true) if return_arrays=True, else None + """ + device = torch.device(device) + + model.eval() + total_loss, total = 0.0, 0 + all_preds, all_targets = [], [] + + with torch.no_grad(): + for batch in loader: + batch_device = batch.to(device) + logits = model(batch_device) + y = batch_device.y.float().view_as(logits) + + loss = loss_fn(logits, y) + bs = y.numel() + total_loss += loss.item() * bs + total += bs + + preds_1d = logits.view(-1).detach().cpu().numpy() + y_1d = y.view(-1).detach().cpu().numpy() + all_preds.append(preds_1d) + all_targets.append(y_1d) + + avg_loss = total_loss / max(1, total) + preds = np.concatenate(all_preds, axis=0) if all_preds else np.array([]) + y_true = np.concatenate(all_targets, axis=0) if all_targets else np.array([]) + + metrics: dict[str, float] = {"loss": float(avg_loss)} + if preds.size > 0: + rmse = float(np.sqrt(mean_squared_error(y_true, preds))) + mae = float(mean_absolute_error(y_true, preds)) + if y_true.shape[0] < 2 or np.all(y_true == y_true[0]): + r2 = float("nan") + else: + r2 = float(r2_score(y_true, preds)) if np.var(y_true) > 0 else float("nan") + metrics.update({"rmse": rmse, "mae": mae, "r2": r2}) + + if verbose: + print(f"[Eval] loss={avg_loss:.6f} | rmse={rmse:.6f} | mae={mae:.6f} | r2={metrics['r2']:.6f}") + + arrays = (preds, y_true) if return_arrays else None + return avg_loss, metrics, arrays + + +def train_model( + model: nn.Module, + train_loader: torch_geometric.loader.DataLoader, + optimizer: torch.optim.Optimizer, + loss_fn: nn.Module, + num_epochs: int, + task: str, + *, + device: str | None = None, + verbose: bool = True, + val_loader: torch_geometric.loader.DataLoader | None = None, + patience: int = 10, + min_delta: float = 0.0, + restore_best: bool = True, +) -> None: + """Trains model using MSE loss and validates with custom class accuracy. 
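# --- Illustrative example (not part of this patch): evaluating a trained regression GNN
# with the helper above. `graphs` (a non-empty list of torch_geometric Data objects whose
# .y holds a scalar target tensor per graph) and `model` (a trained GNN with output_dim=1)
# are assumed to exist already.
from torch import nn
from torch_geometric.loader import DataLoader

loader = DataLoader(graphs, batch_size=16, shuffle=False)
avg_loss, metrics, arrays = evaluate_regression_model(
    model, loader, loss_fn=nn.MSELoss(), device="cpu", return_arrays=True, verbose=True
)
print(metrics["rmse"], metrics["mae"], metrics["r2"])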
+ + Arguments: + model: regression model to be trained + train_loader: training set split into mini-batch + optimizer: optimizer for model training + loss_fn: loss function for training + num_epochs: number of training epochs + task: either "classification" or "regression" + device: device to be used for training (gpu or cpu) + verbose: whether to print progress messages + val_loader: validation set split into mini-batch (optional) + patience: number of epochs with no improvement after which training will be stopped + min_delta: minimum change in the monitored quantity to qualify as an improvement + restore_best: whether to restore model weights from the epoch with the best validation loss + """ + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + device = torch.device(device) + model.to(device) + + best_state, best_metric = None, float("inf") + epochs_no_improve = 0 + + for epoch in range(1, num_epochs + 1): + model.train() + running_loss, total = 0.0, 0 + + for batch in train_loader: + batch_device = batch.to(device) + preds = model(batch_device) + + targets = batch_device.y + if targets.dim() == 1: + targets = targets.unsqueeze(1) + loss = loss_fn(preds, targets) + + optimizer.zero_grad() + loss.backward() + optimizer.step() + + bs = targets.size(0) + running_loss += loss.item() * bs + total += bs + + train_loss = running_loss / max(1, total) + + if val_loader is not None: + if task == "classification": + val_loss, val_metrics, _ = evaluate_classification_model( + model, val_loader, loss_fn, device=str(device), verbose=True + ) + elif task == "regression": + val_loss, val_metrics, _ = evaluate_regression_model( + model, val_loader, loss_fn, device=str(device), verbose=True + ) + else: + # raise an error if task not classification or regression + msg = "Task variable not regression or classification" + raise ValueError(msg) + improved = (best_metric - val_loss) > min_delta + if improved: + best_metric = val_loss + best_state = {k: v.detach().cpu() for k, v in model.state_dict().items()} + epochs_no_improve = 0 + else: + epochs_no_improve += 1 + + if verbose: + print( + f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f} | " + f"val_loss={val_loss:.6f} | acc={val_metrics.get('custom_accuracy', 0):.4f} | patience={epochs_no_improve}/{patience} | r2={val_metrics.get('r2', 0):.4f}" + ) + + if epochs_no_improve >= patience: + if verbose: + print(f"Early stopping at epoch {epoch}.") + break + else: + if verbose: + print(f"Epoch {epoch:03d}/{num_epochs} | train_loss={train_loss:.6f}") + + if restore_best and best_state is not None: + model.load_state_dict(best_state) + model.to(device) + + @dataclass class TrainingData: - """Dataclass for the training data.""" + """Container for training/test data for both classical (numpy) and GNN (graph) models.""" - X_train: NDArray[np.float64] - y_train: NDArray[np.float64] - X_test: NDArray[np.float64] | None = None - y_test: NDArray[np.float64] | None = None + X_train: NDArray[np.float64] | list[torch_geometric.data.Data] + y_train: NDArray[np.float64] | torch.Tensor + X_test: NDArray[np.float64] | list[torch_geometric.data.Data] | None = None + y_test: NDArray[np.float64] | torch.Tensor | None = None indices_train: list[int] | None = None indices_test: list[int] | None = None names_list: list[str] | None = None diff --git a/src/mqt/predictor/ml/predictor.py b/src/mqt/predictor/ml/predictor.py index 3f0ec5497..e6343f270 100644 --- a/src/mqt/predictor/ml/predictor.py +++ b/src/mqt/predictor/ml/predictor.py @@ -15,32 
+15,51 @@ import zipfile from importlib import resources from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, TypedDict from joblib import dump as joblib_dump +from torch import nn +from torch_geometric.loader import DataLoader +from typing_extensions import Unpack + +from mqt.predictor.ml.gnn import GNN if sys.version_info >= (3, 11) and TYPE_CHECKING: # pragma: no cover from typing import assert_never else: from typing_extensions import assert_never +import gc + import matplotlib.pyplot as plt import numpy as np +import optuna +import torch from joblib import Parallel, delayed, load from mqt.bench.targets import get_device +from optuna.samplers import TPESampler + +# cspell:disable-next-line from qiskit import QuantumCircuit from qiskit.qasm2 import dump from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor -from sklearn.model_selection import GridSearchCV, train_test_split +from sklearn.model_selection import GridSearchCV, KFold, train_test_split +from torch_geometric.data import Data from mqt.predictor.hellinger import get_hellinger_model_path from mqt.predictor.ml.helper import ( TrainingData, + create_dag, create_feature_vector, + evaluate_classification_model, + evaluate_regression_model, + get_openqasm3_gates, get_path_trained_model, + get_path_trained_model_gnn, get_path_training_circuits, get_path_training_circuits_compiled, get_path_training_data, + train_model, ) from mqt.predictor.reward import ( crit_depth, @@ -53,15 +72,31 @@ from mqt.predictor.utils import timeout_watcher if TYPE_CHECKING: + import torch_geometric + from numpy._typing import NDArray from qiskit.transpiler import Target from mqt.predictor.reward import figure_of_merit +import json + +GNNSample = tuple[torch.Tensor, torch.Tensor, torch.Tensor, int, str] +FeatureSample = tuple[list[float], str] +TrainingSample = GNNSample | FeatureSample + plt.rcParams["font.family"] = "Times New Roman" logger = logging.getLogger("mqt-predictor") +class TrainGNNKwargs(TypedDict, total=False): + """Arguments for training the GNN model.""" + + num_epochs: int + num_trials: int + verbose: bool + + def setup_device_predictor( devices: list[Target], figure_of_merit: figure_of_merit = "expected_fidelity", @@ -69,6 +104,8 @@ def setup_device_predictor( path_compiled_circuits: Path | None = None, path_training_data: Path | None = None, timeout: int = 600, + gnn: bool = False, + **gnn_kwargs: Unpack[TrainGNNKwargs], ) -> bool: """Sets up the device predictor for the given figure of merit. @@ -79,14 +116,13 @@ def setup_device_predictor( path_compiled_circuits: The path to the directory where the compiled circuits should be saved. Defaults to None. path_training_data: The path to the directory where the generated training data should be saved. Defaults to None. timeout: The timeout in seconds for the compilation of a single circuit. Defaults to 600. + gnn: Whether to use a GNN for training. Defaults to False. + gnn_kwargs: Additional keyword arguments for GNN training. Returns: True if the setup was successful, False otherwise. 
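# --- Illustrative example (not part of this patch): enabling the GNN path end to end.
# The extra keywords (num_epochs, num_trials, verbose) are forwarded to train_gnn_model
# via TrainGNNKwargs; the paths are placeholders and mirror the updated tests below.
from pathlib import Path

from mqt.bench.targets import get_device
from mqt.predictor.ml import setup_device_predictor

ok = setup_device_predictor(
    devices=[get_device("ibm_falcon_127")],
    figure_of_merit="expected_fidelity",
    path_uncompiled_circuits=Path("./uncompiled_circuits"),
    path_compiled_circuits=Path("./compiled_circuits"),
    gnn=True,
    num_epochs=10,
    num_trials=2,
    verbose=True,
)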
""" - predictor = Predictor( - figure_of_merit=figure_of_merit, - devices=devices, - ) + predictor = Predictor(figure_of_merit=figure_of_merit, devices=devices, gnn=gnn) try: logger.info(f"Start the training for the figure of merit: {figure_of_merit}") # Step 1: Generate compiled circuits for all devices @@ -103,9 +139,14 @@ def setup_device_predictor( path_training_data=path_training_data, ) logger.info(f"Generated training data for {figure_of_merit}") + # Step 3: Train the random forest classifier - predictor.train_random_forest_model() - logger.info(f"Trained random forest classifier for {figure_of_merit}") + if not predictor.gnn: + predictor.train_random_forest_model() + logger.info(f"Trained random forest classifier for {figure_of_merit}") + else: + predictor.train_gnn_model(**gnn_kwargs) + logger.info(f"Trained random GNN for {figure_of_merit}") except FileNotFoundError: logger.exception("File not found during setup.") @@ -129,6 +170,7 @@ def __init__( self, devices: list[Target], figure_of_merit: figure_of_merit = "expected_fidelity", + gnn: bool = False, logger_level: int = logging.INFO, ) -> None: """Initializes the Predictor class. @@ -137,12 +179,13 @@ def __init__( figure_of_merit: The figure of merit to be used for training. devices: The devices to be used for training. logger_level: The level of the logger. Defaults to logging.INFO. - + gnn: Decide if using GNN or other models """ logger.setLevel(logger_level) self.figure_of_merit = figure_of_merit self.devices = devices + self.gnn = gnn self.devices.sort( key=lambda x: x.description ) # sorting is necessary to determine the ground truth label later on when generating the training data @@ -280,13 +323,30 @@ def generate_training_data( training_sample, circuit_name, scores = sample if all(score == -1 for score in scores): continue - training_data.append(training_sample) + + if self.gnn: + x, _y, edge_idx, n_nodes, target_label = training_sample + name_device = sorted(scores.keys()) + value_device = [scores[i] for i in name_device] + gnn_training_sample = Data( + x=x, + y=torch.tensor(value_device, dtype=torch.float32), + edge_index=edge_idx, + num_nodes=n_nodes, + target_label=target_label, + ) + + training_data.append(gnn_training_sample if self.gnn else training_sample) names_list.append(circuit_name) scores_list.append(scores) with resources.as_file(path_training_data) as path: - data = np.asarray(training_data, dtype=object) - np.save(str(path / ("training_data_" + self.figure_of_merit + ".npy")), data) + if self.gnn: + torch.save(training_data, str(path / ("graph_dataset_" + self.figure_of_merit + ".pt"))) + else: + data = np.asarray(training_data, dtype=object) + np.save(str(path / ("training_data_" + self.figure_of_merit + ".npy")), data) + data = np.asarray(names_list, dtype=str) np.save(str(path / ("names_list_" + self.figure_of_merit + ".npy")), data) data = np.asarray(scores_list, dtype=object) @@ -298,7 +358,9 @@ def _generate_training_sample( path_uncompiled_circuit: Path, path_compiled_circuits: Path, logger_level: int = logging.INFO, - ) -> tuple[tuple[list[Any], Any], str, list[float]]: + ) -> tuple[ + tuple[list[float], Any] | tuple[torch.Tensor, torch.Tensor, torch.Tensor, int, str], str, dict[str, float] + ]: """Handles to create a training sample from a given file. 
Arguments: @@ -356,15 +418,291 @@ def _generate_training_sample( if num_not_empty_entries == 0: logger.warning("no compiled circuits found for:" + str(file)) - scores_list = list(scores.values()) + scores_list = scores # list(scores.values()) target_label = max(scores, key=lambda k: scores[k]) qc = QuantumCircuit.from_qasm_file(path_uncompiled_circuit / file) - feature_vec = create_feature_vector(qc) - training_sample = (feature_vec, target_label) + training_sample: TrainingSample + if self.gnn: + x, edge_index, number_of_gates = create_dag(qc) + y = torch.tensor([[dev.description for dev in self.devices].index(target_label)], dtype=torch.float) + training_sample = (x, y, edge_index, number_of_gates, target_label) + else: + feature_vec = create_feature_vector(qc) + training_sample = (feature_vec, target_label) circuit_name = str(file).split(".")[0] return training_sample, circuit_name, scores_list + def objective( + self, + trial: optuna.Trial, + dataset: NDArray[np.float64] | list[torch_geometric.data.Data], + task: str, + in_feats: int, + num_outputs: int, + loss_fn: nn.Module, + k_folds: int, + batch_size: int = 32, + num_epochs: int = 10, + patience: int = 10, + verbose: bool = False, + device: str | None = None, + ) -> float: + """Objective function for Optuna GNN hyperparameter optimization. + + Arguments: + trial: The Optuna trial object. + dataset: The dataset to use for training and validation. + task: The task to optimize (e.g., "binary", "multiclass", or "regression"). + in_feats: number of input features. + num_outputs: number of output features. + device: device to use for training. + loss_fn: loss function to use. + optimizer: optimizer to use. + k_folds: number of folds for cross-validation. + classes: list of class names (for classification tasks). + batch_size: batch size for training. + num_epochs: number of epochs for training. + patience: patience for early stopping. + verbose: whether to print verbose output during training. + + + Returns: + mean_val: The mean value in validation considering the k-folds. 
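# --- Illustrative example (not part of this patch): how this objective is meant to be
# wrapped in a closure and handed to Optuna (train_gnn_model below does exactly this).
# `predictor` and `dataset` (a list of torch_geometric Data objects) are assumed to
# exist; in_feats=38 matches the create_dag feature width used in this patch.
import optuna
from optuna.samplers import TPESampler
from torch import nn

study = optuna.create_study(direction="minimize", sampler=TPESampler(n_startup_trials=10))
study.optimize(
    lambda trial: predictor.objective(
        trial=trial,
        dataset=dataset,
        task="regression",
        in_feats=38,
        num_outputs=1,
        loss_fn=nn.MSELoss(),
        k_folds=3,
    ),
    n_trials=5,
)
print(study.best_trial.params)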
+ """ + # Type of device used + if device is None: + device = "cuda" if torch.cuda.is_available() else "cpu" + device_obj = torch.device(device) + + hidden_dim = trial.suggest_int("hidden_dim", 8, 64) + num_conv_wo_resnet = trial.suggest_int("num_conv_wo_resnet", 1, 3) + num_resnet_layers = trial.suggest_int("num_resnet_layers", 1, 9) + dropout = trial.suggest_categorical("dropout", [0.0, 0.1, 0.2, 0.3]) + sag_pool = trial.suggest_categorical("sag_pool", [False, True]) + bidirectional = trial.suggest_categorical("bidirectional", [False, True]) # True, False]) + mlp_options = [ + "none", + "32", + "64", + "128", + "256", + "64,32", + "128,32", + "128,64", + "256,32", + "256,64", + "256,128", + "128,64,32", + "256,128,64", + ] + + mlp_str = trial.suggest_categorical("mlp", mlp_options) + mlp_units = [] if mlp_str == "none" else [int(x) for x in mlp_str.split(",")] + + num_outputs = max(1, len(self.devices)) + + # Split into k-folds + kf = KFold(n_splits=k_folds, shuffle=True) + fold_val_best_losses: list[float] = [] + + for _fold_idx, (train_idx, val_idx) in enumerate(kf.split(range(len(dataset)))): + train_subset = [dataset[i] for i in train_idx] + val_subset = [dataset[i] for i in val_idx] + # Transform the data into loaders + train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True) + val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False) + # Define the GNN + model = GNN( + in_feats=in_feats, + num_conv_wo_resnet=num_conv_wo_resnet, + hidden_dim=hidden_dim, + num_resnet_layers=num_resnet_layers, + mlp_units=mlp_units, + output_dim=num_outputs, + dropout_p=dropout, + bidirectional=bidirectional, + use_sag_pool=sag_pool, + sag_ratio=0.7, + conv_activation=torch.nn.functional.leaky_relu, + mlp_activation=torch.nn.functional.leaky_relu, + ).to(device_obj) + + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + # Based on the task, do a training and evaluation for regression or classification + train_model( + model, + train_loader, + optimizer, + loss_fn, + task=task, + num_epochs=num_epochs, + device=device, + verbose=verbose, + val_loader=val_loader, + patience=patience, + min_delta=0.0, + restore_best=True, + ) + if task == "regression": + val_loss, _, _ = evaluate_regression_model( + model, val_loader, loss_fn, device=device, return_arrays=False, verbose=verbose + ) + else: + val_loss, _, _ = evaluate_classification_model( + model, val_loader, loss_fn, device=device, return_arrays=False, verbose=verbose + ) + + fold_val_best_losses.append(float(val_loss)) + del train_loader, val_loader, train_subset, val_subset, optimizer, model + if device_obj.type == "cuda": + torch.cuda.empty_cache() + gc.collect() + # Take the mean value + return float(np.mean(fold_val_best_losses)) + + def train_gnn_model( + self, + training_data: TrainingData | None = None, + num_epochs: int = 10, + num_trials: int = 2, + patience: int = 10, + verbose: bool = False, + ) -> nn.Module: + """Train the GNN model(s) and return the trained model. + + Arguments: + training_data: The training data to use for training the model. + num_epochs: The number of epochs to train the model. + num_trials: The number of trials to run for hyperparameter optimization. + verbose: Whether to print verbose output during training. + patience: The patience variable for early stopping. + + Returns: + The trained GNN model. 
+ """ + # Figure out outputs and save path + if self.figure_of_merit == "hellinger_distance": + if len(self.devices) != 1: + msg = "A single device must be provided for Hellinger distance model training." + raise ValueError(msg) + num_outputs = 1 + save_mdl_path = str(get_hellinger_model_path(self.devices[0], gnn=True)) + else: + num_outputs = max(1, len(self.devices)) + save_mdl_path = str(get_path_trained_model_gnn(self.figure_of_merit)) + + # Prepare data + if training_data is None: + training_data = self._get_prepared_training_data() + number_in_features = int(len(get_openqasm3_gates()) + 1 + 6 + 3 + 1 + 1) + loss_fn = nn.MSELoss() + if self.figure_of_merit == "hellinger_distance": + task = "regression" + else: + if num_outputs == 1: + # task = "binary" + task = "classification" + [dev.description for dev in self.devices] + sampler_obj = TPESampler(n_startup_trials=10) + study = optuna.create_study(study_name="Best GNN Model", direction="minimize", sampler=sampler_obj) + k_folds = min(len(training_data.y_train), 5) + + def _obj(trial: optuna.Trial) -> float: + return self.objective( + trial=trial, + dataset=training_data.X_train, + task=task, + in_feats=number_in_features, + num_outputs=num_outputs, + loss_fn=loss_fn, + k_folds=k_folds, + num_epochs=num_epochs, + patience=patience, + verbose=verbose, + ) + + study.optimize(_obj, n_trials=num_trials) + dict_best_hyper = study.best_trial.params # user_attrs.get("best_hparams") + # Build model (ensure final layer outputs raw logits/no activation) + json_dict = study.best_trial.params + mlp_str = dict_best_hyper["mlp"] + mlp_units = [] if mlp_str == "none" else [int(x) for x in mlp_str.split(",")] + + json_dict["num_outputs"] = len(self.devices) if self.figure_of_merit != "hellinger_distance" else 1 + if self.figure_of_merit != "hellinger_distance": + model = GNN( + in_feats=int(len(get_openqasm3_gates()) + 1 + 6 + 3 + 1 + 1), + num_conv_wo_resnet=dict_best_hyper["num_conv_wo_resnet"], + hidden_dim=dict_best_hyper["hidden_dim"], + num_resnet_layers=dict_best_hyper["num_resnet_layers"], + mlp_units=mlp_units, + output_dim=len(self.devices), + dropout_p=dict_best_hyper["dropout"], + bidirectional=dict_best_hyper["bidirectional"], + use_sag_pool=dict_best_hyper["sag_pool"], + sag_ratio=0.7, + conv_activation=torch.nn.functional.leaky_relu, + mlp_activation=torch.nn.functional.leaky_relu, + ).to("cuda" if torch.cuda.is_available() else "cpu") + else: + model = GNN( + in_feats=int(len(get_openqasm3_gates()) + 1 + 6 + 3 + 1 + 1), + num_conv_wo_resnet=dict_best_hyper["num_conv_wo_resnet"], + hidden_dim=dict_best_hyper["hidden_dim"], + num_resnet_layers=dict_best_hyper["num_resnet_layers"], + mlp_units=mlp_units, + output_dim=1, + dropout_p=dict_best_hyper["dropout"], + bidirectional=dict_best_hyper["bidirectional"], + use_sag_pool=dict_best_hyper["sag_pool"], + sag_ratio=0.7, + conv_activation=torch.nn.functional.leaky_relu, + mlp_activation=torch.nn.functional.leaky_relu, + ).to("cuda" if torch.cuda.is_available() else "cpu") + + json_path = Path(save_mdl_path).with_suffix(".json") # works whether save_mdl_path is str or Path + json_dict["class_labels"] = [dev.description for dev in self.devices] + with json_path.open("w", encoding="utf-8") as f: + json.dump(json_dict, f, indent=4) + + # Device handling + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + model.to(device) + # Optimizer + optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) + x_train, x_val, _y_train, _y_val = train_test_split( + training_data.X_train, 
training_data.y_train, test_size=0.2, random_state=5 + ) + # Dataloader + train_loader = DataLoader(x_train, batch_size=16, shuffle=True) + + val_loader = DataLoader(x_val, batch_size=16, shuffle=False) + train_model( + model, + train_loader, + optimizer, + loss_fn, + task=task, + num_epochs=num_epochs, + device=device, + verbose=verbose, + val_loader=val_loader, + patience=30, + min_delta=0.0, + restore_best=True, + ) + if verbose: + test_loader = DataLoader(training_data.X_test, batch_size=16, shuffle=False) + avg_loss_test, dict_results, _ = evaluate_classification_model( + model, test_loader, loss_fn=loss_fn, device=device, verbose=verbose + ) + print(f"Test loss: {avg_loss_test:.4f}, {dict_results}") + + # Save the model + torch.save(model.state_dict(), save_mdl_path) + return model + def train_random_forest_model( self, training_data: TrainingData | None = None ) -> RandomForestRegressor | RandomForestClassifier: @@ -420,23 +758,29 @@ def _get_prepared_training_data(self) -> TrainingData: """ with resources.as_file(get_path_training_data() / "training_data_aggregated") as path: prefix = f"{self.figure_of_merit}.npy" - file_data = path / f"training_data_{prefix}" file_names = path / f"names_list_{prefix}" file_scores = path / f"scores_list_{prefix}" + file_data = ( + path / f"training_data_{prefix}" if not self.gnn else path / f"graph_dataset_{self.figure_of_merit}.pt" + ) if file_data.is_file() and file_names.is_file() and file_scores.is_file(): - training_data = np.load(file_data, allow_pickle=True) + training_data = ( + np.load(file_data, allow_pickle=True) if not self.gnn else torch.load(file_data, weights_only=False) + ) names_list = list(np.load(file_names, allow_pickle=True)) scores_list = [list(scores) for scores in np.load(file_scores, allow_pickle=True)] else: msg = "Training data not found." raise FileNotFoundError(msg) - - x_list, y_list = zip(*training_data, strict=False) - x = np.array(x_list, dtype=np.float64) - y = np.array(y_list, dtype=str) + if not self.gnn: + x_list, y_list = zip(*training_data, strict=False) + x = np.array(x_list, dtype=np.float64) + y = np.array(y_list, dtype=str) + else: + x = training_data + y = np.array([el.target_label for el in training_data]) indices = np.arange(len(y), dtype=np.int64) - x_train, x_test, y_train, y_test, indices_train, indices_test = train_test_split( x, y, indices, test_size=0.3, random_state=5 ) @@ -454,13 +798,14 @@ def _get_prepared_training_data(self) -> TrainingData: def predict_device_for_figure_of_merit( - qc: Path | QuantumCircuit, figure_of_merit: figure_of_merit = "expected_fidelity" + qc: Path | QuantumCircuit, figure_of_merit: figure_of_merit = "expected_fidelity", gnn: bool = False ) -> Target: """Returns the probabilities for all supported quantum devices to be the most suitable one for the given quantum circuit. Arguments: qc: The QuantumCircuit or Path to the respective qasm file. figure_of_merit: The figure of merit to be used for compilation. + gnn: Whether to use a GNN for prediction. Defaults to False. Returns: The probabilities for all supported quantum devices to be the most suitable one for the given quantum circuit. 
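# --- Illustrative example (not part of this patch): querying the trained GNN predictor
# for a circuit, as exercised by the updated tests below.
from mqt.bench import BenchmarkLevel, get_benchmark
from mqt.predictor.ml import predict_device_for_figure_of_merit

qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3)
best_device = predict_device_for_figure_of_merit(qc, figure_of_merit="expected_fidelity", gnn=True)
print(best_device.description)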
@@ -472,22 +817,60 @@ def predict_device_for_figure_of_merit( if isinstance(qc, Path) and qc.exists(): qc = QuantumCircuit.from_qasm_file(qc) assert isinstance(qc, QuantumCircuit) - - path = get_path_trained_model(figure_of_merit) + path = get_path_trained_model(figure_of_merit) if not gnn else get_path_trained_model_gnn(figure_of_merit) if not path.exists(): error_msg = "The ML model is not trained yet. Please train the model before using it." logger.error(error_msg) raise FileNotFoundError(error_msg) - clf = load(path) - - feature_vector = create_feature_vector(qc) - - probabilities = clf.predict_proba([feature_vector])[0] - class_labels = clf.classes_ - # sort all devices with decreasing probabilities - sorted_devices = np.array([ - label for _, label in sorted(zip(probabilities, class_labels, strict=False), reverse=True) - ]) + if not gnn: + clf = load(path) + + feature_vector = create_feature_vector(qc) + + probabilities = clf.predict_proba([feature_vector])[0] + class_labels = clf.classes_ + # sort all devices with decreasing probabilities + sorted_devices = np.array([ + label for _, label in sorted(zip(probabilities, class_labels, strict=False), reverse=True) + ]) + else: + # Open the json file save_mdl_path[:-4] + ".json" + with Path.open(path.with_suffix(".json"), encoding="utf-8") as f: + json_dict = json.load(f) + + mlp_str = json_dict["mlp"] + mlp_units = [] if mlp_str == "none" else [int(x) for x in mlp_str.split(",")] + + gnn_model = GNN( + in_feats=int(len(get_openqasm3_gates()) + 1 + 6 + 3 + 1 + 1), + num_conv_wo_resnet=json_dict["num_conv_wo_resnet"], + hidden_dim=json_dict["hidden_dim"], + num_resnet_layers=json_dict["num_resnet_layers"], + mlp_units=mlp_units, + output_dim=json_dict["num_outputs"], + dropout_p=json_dict["dropout"], + bidirectional=json_dict["bidirectional"], + use_sag_pool=json_dict["sag_pool"], + sag_ratio=0.7, + conv_activation=torch.nn.functional.leaky_relu, + mlp_activation=torch.nn.functional.leaky_relu, + ).to("cuda" if torch.cuda.is_available() else "cpu") + gnn_model.load_state_dict(torch.load(path)) + x, edge_index, number_of_gates = create_dag(qc) + feature_vector = Data(x=x, edge_index=edge_index, num_gates=number_of_gates).to( + "cuda" if torch.cuda.is_available() else "cpu" + ) + gnn_model.eval() + class_labels = json_dict["class_labels"] + with torch.no_grad(): + outputs = gnn_model(feature_vector) + assert class_labels is not None + if len(class_labels) != len(outputs): + msg = "outputs and class_labels must be same length" + raise ValueError(msg) + + pairs = sorted(zip(outputs.tolist(), class_labels, strict=False), reverse=True) + sorted_devices = np.array([label for _, label in pairs]) for dev_name in sorted_devices: dev = get_device(dev_name) diff --git a/tests/device_selection/test_helper_ml.py b/tests/device_selection/test_helper_ml.py index daeda6825..8b57cd027 100644 --- a/tests/device_selection/test_helper_ml.py +++ b/tests/device_selection/test_helper_ml.py @@ -13,6 +13,7 @@ from mqt.bench import BenchmarkLevel, get_benchmark from mqt.predictor.ml.helper import ( + create_dag, create_feature_vector, get_openqasm_gates, get_path_training_circuits, @@ -28,6 +29,13 @@ def test_create_feature_vector() -> None: assert feature_vector is not None +def test_create_dag() -> None: + """Test the creation of a DAG.""" + qc = get_benchmark("dj", BenchmarkLevel.INDEP, 3).decompose() + dag = create_dag(qc) + assert dag is not None + + def test_get_openqasm_gates() -> None: """Test the retrieval of the OpenQASM gates.""" assert 
get_openqasm_gates() is not None diff --git a/tests/device_selection/test_predictor_ml.py b/tests/device_selection/test_predictor_ml.py index 0b2f1485f..884d20bd0 100644 --- a/tests/device_selection/test_predictor_ml.py +++ b/tests/device_selection/test_predictor_ml.py @@ -35,7 +35,12 @@ def path_compiled_circuits() -> Path: return Path("./test_compiled_circuits") -def test_setup_device_predictor_with_prediction(path_uncompiled_circuits: Path, path_compiled_circuits: Path) -> None: +@pytest.mark.parametrize( + ("gnn", "verbose"), [(False, False), (True, False), (True, True)], ids=["rf", "gnn", "gnn_verbose"] +) +def test_setup_device_predictor_with_prediction( + path_uncompiled_circuits: Path, path_compiled_circuits: Path, gnn: bool, verbose: bool +) -> None: """Test the full training pipeline and prediction using a mock device.""" if not path_uncompiled_circuits.exists(): path_uncompiled_circuits.mkdir() @@ -49,22 +54,28 @@ def test_setup_device_predictor_with_prediction(path_uncompiled_circuits: Path, dump(qc, f) device = get_device("ibm_falcon_127") - success = setup_device_predictor( devices=[device], figure_of_merit="expected_fidelity", path_uncompiled_circuits=path_uncompiled_circuits, path_compiled_circuits=path_compiled_circuits, + gnn=gnn, + verbose=verbose, ) assert success data_path = get_path_training_data() / "training_data_aggregated" - assert (data_path / "training_data_expected_fidelity.npy").exists() - assert (data_path / "names_list_expected_fidelity.npy").exists() - assert (data_path / "scores_list_expected_fidelity.npy").exists() + if gnn: + assert (data_path / "graph_dataset_expected_fidelity.pt").exists() + assert (data_path / "names_list_expected_fidelity.npy").exists() + assert (data_path / "scores_list_expected_fidelity.npy").exists() + else: + assert (data_path / "training_data_expected_fidelity.npy").exists() + assert (data_path / "names_list_expected_fidelity.npy").exists() + assert (data_path / "scores_list_expected_fidelity.npy").exists() test_qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3) - predicted = predict_device_for_figure_of_merit(test_qc, figure_of_merit="expected_fidelity") + predicted = predict_device_for_figure_of_merit(test_qc, figure_of_merit="expected_fidelity", gnn=gnn) assert predicted.description == "ibm_falcon_127" @@ -86,7 +97,7 @@ def test_remove_files(path_uncompiled_circuits: Path, path_compiled_circuits: Pa data_path = get_path_training_data() / "training_data_aggregated" if data_path.exists(): for file in data_path.iterdir(): - if file.suffix == ".npy": + if file.suffix in (".npy", ".pt"): file.unlink() @@ -100,8 +111,9 @@ def test_predict_device_for_figure_of_merit_no_suitable_device() -> None: predict_device_for_figure_of_merit(qc) -def test_get_prepared_training_data_false_input() -> None: +@pytest.mark.parametrize("gnn", [False, True], ids=["rf", "gnn"]) +def test_get_prepared_training_data_false_input(gnn: bool) -> None: """Test the retrieval of prepared training data.""" - pred = Predictor(devices=[], figure_of_merit="expected_fidelity") + pred = Predictor(devices=[], figure_of_merit="expected_fidelity", gnn=gnn) with pytest.raises(FileNotFoundError, match=re.escape("Training data not found.")): pred._get_prepared_training_data() # noqa: SLF001 diff --git a/tests/hellinger_distance/test_estimated_hellinger_distance.py b/tests/hellinger_distance/test_estimated_hellinger_distance.py index 082acf610..e045f0e44 100644 --- a/tests/hellinger_distance/test_estimated_hellinger_distance.py +++ 
b/tests/hellinger_distance/test_estimated_hellinger_distance.py @@ -18,15 +18,17 @@ import numpy as np import pytest +import torch from mqt.bench import BenchmarkLevel, get_benchmark from mqt.bench.targets import get_available_device_names, get_device from qiskit import QuantumCircuit from qiskit.qasm2 import dump +from torch_geometric.data import Batch, Data from mqt.predictor.hellinger import calc_device_specific_features, hellinger_distance from mqt.predictor.ml import Predictor as ml_Predictor from mqt.predictor.ml import predict_device_for_figure_of_merit -from mqt.predictor.ml.helper import TrainingData, get_path_training_data +from mqt.predictor.ml.helper import TrainingData, create_dag, get_path_training_data from mqt.predictor.rl import Predictor as rl_Predictor if TYPE_CHECKING: @@ -152,20 +154,22 @@ def test_hellinger_distance_error() -> None: hellinger_distance(p=invalid, q=valid) -def test_train_random_forest_regressor_and_predict(device: Target) -> None: - """Test the training of the random forest regressor. The trained model is saved and used in the following tests.""" - # Setup the training environment +@pytest.mark.parametrize("gnn", [False, True], ids=["rf", "gnn"]) +def test_train_model_and_predict(device: Target, gnn: bool) -> None: + """Test the training of the RF and GNN models. The trained models are saved and used in the following tests.""" n_circuits = 20 qc = QuantumCircuit(device.num_qubits) for i in range(1, device.num_qubits): qc.cz(0, i) - # 1. Feature Extraction - feature_vector = calc_device_specific_features(qc, device) - feature_vector_list = [feature_vector] * n_circuits + if not gnn: + feature_vector = calc_device_specific_features(qc, device) + feature_vector_list = [feature_vector] * n_circuits + else: + x, edge_index, number_of_gates = create_dag(qc) + training_sample = [(x, edge_index, number_of_gates)] * n_circuits - # 2. Label Generation rng = np.random.default_rng() noisy = rng.random(device.num_qubits) noisy /= np.sum(noisy) @@ -173,17 +177,48 @@ def test_train_random_forest_regressor_and_predict(device: Target) -> None: noiseless[0] = 1.0 distance_label = hellinger_distance(noisy, noiseless) labels_list = [distance_label] * n_circuits - training_data = TrainingData(X_train=feature_vector_list, y_train=labels_list) + if not gnn: + training_data = TrainingData(X_train=feature_vector_list, y_train=labels_list) + else: + training_data_list = [] + for i in range(n_circuits): + x, edge_idx, n_nodes = training_sample[i] + gnn_training_sample = Data( + x=x, + y=torch.tensor(labels_list[i], dtype=torch.float32), + edge_index=edge_idx, + num_nodes=n_nodes, + ) + training_data_list.append(gnn_training_sample) + training_data = TrainingData(X_train=training_data_list, y_train=labels_list) # 3. 
Model Training - pred = ml_Predictor(figure_of_merit="hellinger_distance", devices=[device]) - trained_model = pred.train_random_forest_model(training_data) - - assert np.isclose(trained_model.predict([feature_vector]), distance_label) - - -def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: Path, device: Target) -> None: - """Test the entire predictor toolchain with the Hellinger distance model that was trained in the previous test.""" + pred = ml_Predictor(figure_of_merit="hellinger_distance", devices=[device], gnn=gnn) + if gnn: + trained_model = pred.train_gnn_model(training_data, num_epochs=100, patience=20) + else: + trained_model = pred.train_random_forest_model(training_data) + + if not gnn: + assert np.isclose(trained_model.predict([feature_vector]), distance_label) + else: + trained_model.eval() + model_device = next(trained_model.parameters()).device + with torch.no_grad(): + batch = Batch.from_data_list(training_data.X_train).to(model_device) + out = trained_model(batch) + out = out.squeeze(-1) + predicted_values = out.cpu().numpy() + labels = np.asarray(labels_list, dtype=np.float32) + # a loose tolerance of 2e-1 is used because of the small number of training samples + assert np.allclose(predicted_values, labels, atol=2e-1) + + +@pytest.mark.parametrize("gnn", [False, True], ids=["rf", "gnn"]) +def test_train_and_qcompile_with_hellinger_model( + source_path: Path, target_path: Path, device: Target, gnn: bool +) -> None: + """Test the entire predictor toolchain for estimating the Hellinger distance with both RF and GNN.""" figure_of_merit = "estimated_hellinger_distance" with warnings.catch_warnings(): @@ -202,7 +237,7 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: ) # 2.
Setup and train the machine learning model for device selection - ml_predictor = ml_Predictor(devices=[device], figure_of_merit=figure_of_merit) + ml_predictor = ml_Predictor(devices=[device], figure_of_merit=figure_of_merit, gnn=gnn) # Prepare uncompiled circuits if not source_path.exists(): @@ -217,10 +252,13 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: dump(qc, f) # Generate compiled circuits (using trained RL model) - if sys.platform == "win32": + if sys.platform == "win32" and not gnn: with pytest.warns(RuntimeWarning, match=re.escape("Timeout is not supported on Windows.")): ml_predictor.compile_training_circuits( - timeout=600, path_compiled_circuits=target_path, path_uncompiled_circuits=source_path, num_workers=1 + timeout=600, + path_compiled_circuits=target_path, + path_uncompiled_circuits=source_path, + num_workers=1, ) else: ml_predictor.compile_training_circuits( @@ -231,24 +269,55 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: ml_predictor.generate_training_data( path_uncompiled_circuits=source_path, path_compiled_circuits=target_path, num_workers=1 ) - - for file in [ - "training_data_estimated_hellinger_distance.npy", - "names_list_estimated_hellinger_distance.npy", - "scores_list_estimated_hellinger_distance.npy", - ]: - path = get_path_training_data() / "training_data_aggregated" / file - assert path.exists() + if gnn: + assert ( + get_path_training_data() / "training_data_aggregated" / "graph_dataset_estimated_hellinger_distance.pt" + ).exists() + else: + for file in [ + "training_data_estimated_hellinger_distance.npy", + "names_list_estimated_hellinger_distance.npy", + "scores_list_estimated_hellinger_distance.npy", + ]: + path = get_path_training_data() / "training_data_aggregated" / file + assert path.exists() # Train the ML model - ml_predictor.train_random_forest_model() + ml_predictor.train_gnn_model() if gnn else ml_predictor.train_random_forest_model() qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3) # Test the prediction - predicted_dev = predict_device_for_figure_of_merit(qc, figure_of_merit) + predicted_dev = predict_device_for_figure_of_merit(qc, figure_of_merit, gnn=gnn) assert predicted_dev.description in get_available_device_names() +def test_remove_files(source_path: Path, target_path: Path) -> None: + """Remove files created during testing.""" + if source_path.exists(): + for file in source_path.iterdir(): + if file.suffix == ".qasm": + file.unlink() + source_path.rmdir() + + if target_path.exists(): + for file in target_path.iterdir(): + if file.suffix == ".qasm": + file.unlink() + target_path.rmdir() + + data_path = get_path_training_data() / "training_data_aggregated" + if data_path.exists(): + for file in data_path.iterdir(): + if file.suffix in (".npy", ".pt"): + file.unlink() + + model_path = get_path_training_data() / "trained_model" + if model_path.exists(): + for file in model_path.iterdir(): + if file.suffix in (".joblib", ".pth", ".json"): + file.unlink() + + def test_predict_device_for_estimated_hellinger_distance_no_device_provided() -> None: """Test the error handling of the device selection predictor when no device is provided for the Hellinger distance model.""" rng = np.random.default_rng()
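For reference, a minimal usage sketch of the new gnn flag exercised by the tests above. It mirrors the test flow only; the directory names are placeholders, and it assumes the public helpers keep the signatures used in the tests (setup_device_predictor, predict_device_for_figure_of_merit, get_benchmark, get_device).

from pathlib import Path

from mqt.bench import BenchmarkLevel, get_benchmark
from mqt.bench.targets import get_device
from mqt.predictor.ml import predict_device_for_figure_of_merit, setup_device_predictor
from qiskit.qasm2 import dump

# Placeholder working directories for the training circuits.
uncompiled = Path("./my_uncompiled_circuits")
compiled = Path("./my_compiled_circuits")
uncompiled.mkdir(exist_ok=True)
compiled.mkdir(exist_ok=True)

# A handful of training circuits, dumped as OpenQASM 2 files.
for i in range(2, 6):
    qc = get_benchmark("ghz", BenchmarkLevel.ALG, i)
    with (uncompiled / f"qc_{i}.qasm").open("w", encoding="utf-8") as f:
        dump(qc, f)

# Train the GNN-based device predictor instead of the random-forest classifier.
device = get_device("ibm_falcon_127")
setup_device_predictor(
    devices=[device],
    figure_of_merit="expected_fidelity",
    path_uncompiled_circuits=uncompiled,
    path_compiled_circuits=compiled,
    gnn=True,
)

# With gnn=True, prediction loads the .pth checkpoint together with its .json
# hyperparameter sidecar and ranks the devices by the model output.
test_qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3)
best = predict_device_for_figure_of_merit(test_qc, figure_of_merit="expected_fidelity", gnn=True)
print(best.description)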