当前位置: 首页 > news >正文

lerobot框架部署act模型

onnx模型导出

src\lerobot\policies\act\modeling_act.py修改如下:

#!/usr/bin/env python# Copyright 2024 Tony Z. Zhao and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Action Chunking Transformer PolicyAs per Learning Fine-Grained Bimanual Manipulation with Low-Cost Hardware (https://huggingface.co/papers/2304.13705).
The majority of changes here involve removing unused code, unifying naming, and adding helpful comments.
"""import math
from collections import deque
from collections.abc import Callable
from itertools import chainimport einops
import numpy as np
import torch
import torch.nn.functional as F  # noqa: N812
import torchvision
from torch import Tensor, nn
from torchvision.models._utils import IntermediateLayerGetter
from torchvision.ops.misc import FrozenBatchNorm2dfrom lerobot.constants import ACTION, OBS_IMAGES
from lerobot.policies.act.configuration_act import ACTConfig
from lerobot.policies.normalize import Normalize, Unnormalize
from lerobot.policies.pretrained import PreTrainedPolicyclass ACTPolicy(PreTrainedPolicy):"""Action Chunking Transformer Policy as per Learning Fine-Grained Bimanual Manipulation with Low-CostHardware (paper: https://huggingface.co/papers/2304.13705, code: https://github.com/tonyzhaozh/act)"""config_class = ACTConfigname = "act"def __init__(self,config: ACTConfig,dataset_stats: dict[str, dict[str, Tensor]] | None = None,):"""Args:config: Policy configuration class instance or None, in which case the default instantiation ofthe configuration class is used.dataset_stats: Dataset statistics to be used for normalization. If not passed here, it is expectedthat they will be passed with a call to `load_state_dict` before the policy is used."""super().__init__(config)config.validate_features()self.config = configself.normalize_inputs = Normalize(config.input_features, config.normalization_mapping, dataset_stats)self.normalize_targets = Normalize(config.output_features, config.normalization_mapping, dataset_stats)self.unnormalize_outputs = Unnormalize(config.output_features, config.normalization_mapping, dataset_stats)self.model = ACT(config)if config.temporal_ensemble_coeff is not None:self.temporal_ensembler = ACTTemporalEnsembler(config.temporal_ensemble_coeff, config.chunk_size)self.reset()def get_optim_params(self) -> dict:# TODO(aliberts, rcadene): As of now, lr_backbone == lr# Should we remove this and just `return self.parameters()`?return [{"params": [pfor n, p in self.named_parameters()if not n.startswith("model.backbone") and p.requires_grad]},{"params": [pfor n, p in self.named_parameters()if n.startswith("model.backbone") and p.requires_grad],"lr": self.config.optimizer_lr_backbone,},]def reset(self):"""This should be called whenever the environment is reset."""if self.config.temporal_ensemble_coeff is not None:self.temporal_ensembler.reset()else:self._action_queue = deque([], maxlen=self.config.n_action_steps)@torch.no_grad()def select_action(self, batch: dict[str, Tensor]) -> Tensor:"""Select a single action given environment observations.This method wraps `select_actions` in order to return one action at a time for execution in theenvironment. It works by managing the actions in a queue and only calling `select_actions` when thequeue is empty."""self.eval()  # keeping the policy in eval mode as it could be set to train mode while queue is consumedif self.config.temporal_ensemble_coeff is not None:actions = self.predict_action_chunk(batch)action = self.temporal_ensembler.update(actions)return action# Action queue logic for n_action_steps > 1. When the action_queue is depleted, populate it by# querying the policy.if len(self._action_queue) == 0:actions = self.predict_action_chunk(batch)[:, : self.config.n_action_steps]# `self.model.forward` returns a (batch_size, n_action_steps, action_dim) tensor, but the queue# effectively has shape (n_action_steps, batch_size, *), hence the transpose.self._action_queue.extend(actions.transpose(0, 1))return self._action_queue.popleft()@torch.no_grad()def predict_action_chunk(self, batch: dict[str, Tensor]) -> Tensor:"""Predict a chunk of actions given environment observations."""self.eval()batch = self.normalize_inputs(batch)if self.config.image_features:batch = dict(batch)  # shallow copy so that adding a key doesn't modify the originalbatch[OBS_IMAGES] = [batch[key] for key in self.config.image_features]actions = self.model(batch)[0]actions = self.unnormalize_outputs({ACTION: actions})[ACTION]return actions# def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, dict]:#     """Run the batch through the model and compute the loss for training or validation."""#     batch = self.normalize_inputs(batch)#     if self.config.image_features:#         batch = dict(batch)  # shallow copy so that adding a key doesn't modify the original#         batch[OBS_IMAGES] = [batch[key] for key in self.config.image_features]#     batch = self.normalize_targets(batch)#     actions_hat, (mu_hat, log_sigma_x2_hat) = self.model(batch)#     l1_loss = (#         F.l1_loss(batch[ACTION], actions_hat, reduction="none") * ~batch["action_is_pad"].unsqueeze(-1)#     ).mean()#     loss_dict = {"l1_loss": l1_loss.item()}#     if self.config.use_vae:#         # Calculate Dₖₗ(latent_pdf || standard_normal). Note: After computing the KL-divergence for#         # each dimension independently, we sum over the latent dimension to get the total#         # KL-divergence per batch element, then take the mean over the batch.#         # (See App. B of https://huggingface.co/papers/1312.6114 for more details).#         mean_kld = (#             (-0.5 * (1 + log_sigma_x2_hat - mu_hat.pow(2) - (log_sigma_x2_hat).exp())).sum(-1).mean()#         )#         loss_dict["kld_loss"] = mean_kld.item()#         loss = l1_loss + mean_kld * self.config.kl_weight#     else:#         loss = l1_loss#     return loss, loss_dictdef forward(self, image, state):batch = {'observation.images.top':image, 'observation.state':state}self.eval()  # keeping the policy in eval mode as it could be set to train mode while queue is consumedif self.config.temporal_ensemble_coeff is not None:actions = self.predict_action_chunk(batch)action = self.temporal_ensembler.update(actions)return action# Action queue logic for n_action_steps > 1. When the action_queue is depleted, populate it by# querying the policy.if len(self._action_queue) == 0:actions = self.predict_action_chunk(batch)[:, : self.config.n_action_steps]# `self.model.forward` returns a (batch_size, n_action_steps, action_dim) tensor, but the queue# effectively has shape (n_action_steps, batch_size, *), hence the transpose.self._action_queue.extend(actions.transpose(0, 1))return self._action_queue.popleft()class ACTTemporalEnsembler:def __init__(self, temporal_ensemble_coeff: float, chunk_size: int) -> None:"""Temporal ensembling as described in Algorithm 2 of https://huggingface.co/papers/2304.13705.The weights are calculated as wᵢ = exp(-temporal_ensemble_coeff * i) where w₀ is the oldest action.They are then normalized to sum to 1 by dividing by Σwᵢ. Here's some intuition around how thecoefficient works:- Setting it to 0 uniformly weighs all actions.- Setting it positive gives more weight to older actions.- Setting it negative gives more weight to newer actions.NOTE: The default value for `temporal_ensemble_coeff` used by the original ACT work is 0.01. Thisresults in older actions being weighed more highly than newer actions (the experiments documented inhttps://github.com/huggingface/lerobot/pull/319 hint at why highly weighing new actions might bedetrimental: doing so aggressively may diminish the benefits of action chunking).Here we use an online method for computing the average rather than caching a history of actions inorder to compute the average offline. For a simple 1D sequence it looks something like:```import torchseq = torch.linspace(8, 8.5, 100)print(seq)m = 0.01exp_weights = torch.exp(-m * torch.arange(len(seq)))print(exp_weights)# Calculate offlineavg = (exp_weights * seq).sum() / exp_weights.sum()print("offline", avg)# Calculate onlinefor i, item in enumerate(seq):if i == 0:avg = itemcontinueavg *= exp_weights[:i].sum()avg += item * exp_weights[i]avg /= exp_weights[: i + 1].sum()print("online", avg)```"""self.chunk_size = chunk_sizeself.ensemble_weights = torch.exp(-temporal_ensemble_coeff * torch.arange(chunk_size))self.ensemble_weights_cumsum = torch.cumsum(self.ensemble_weights, dim=0)self.reset()def reset(self):"""Resets the online computation variables."""self.ensembled_actions = None# (chunk_size,) count of how many actions are in the ensemble for each time step in the sequence.self.ensembled_actions_count = Nonedef update(self, actions: Tensor) -> Tensor:"""Takes a (batch, chunk_size, action_dim) sequence of actions, update the temporal ensemble for alltime steps, and pop/return the next batch of actions in the sequence."""self.ensemble_weights = self.ensemble_weights.to(device=actions.device)self.ensemble_weights_cumsum = self.ensemble_weights_cumsum.to(device=actions.device)if self.ensembled_actions is None:# Initializes `self._ensembled_action` to the sequence of actions predicted during the first# time step of the episode.self.ensembled_actions = actions.clone()# Note: The last dimension is unsqueeze to make sure we can broadcast properly for tensor# operations later.self.ensembled_actions_count = torch.ones((self.chunk_size, 1), dtype=torch.long, device=self.ensembled_actions.device)else:# self.ensembled_actions will have shape (batch_size, chunk_size - 1, action_dim). Compute# the online update for those entries.self.ensembled_actions *= self.ensemble_weights_cumsum[self.ensembled_actions_count - 1]self.ensembled_actions += actions[:, :-1] * self.ensemble_weights[self.ensembled_actions_count]self.ensembled_actions /= self.ensemble_weights_cumsum[self.ensembled_actions_count]self.ensembled_actions_count = torch.clamp(self.ensembled_actions_count + 1, max=self.chunk_size)# The last action, which has no prior online average, needs to get concatenated onto the end.self.ensembled_actions = torch.cat([self.ensembled_actions, actions[:, -1:]], dim=1)self.ensembled_actions_count = torch.cat([self.ensembled_actions_count, torch.ones_like(self.ensembled_actions_count[-1:])])# "Consume" the first action.action, self.ensembled_actions, self.ensembled_actions_count = (self.ensembled_actions[:, 0],self.ensembled_actions[:, 1:],self.ensembled_actions_count[1:],)return actionclass ACT(nn.Module):"""Action Chunking Transformer: The underlying neural network for ACTPolicy.Note: In this code we use the terms `vae_encoder`, 'encoder', `decoder`. The meanings are as follows.- The `vae_encoder` is, as per the literature around variational auto-encoders (VAE), the part of themodel that encodes the target data (a sequence of actions), and the condition (the robotjoint-space).- A transformer with an `encoder` (not the VAE encoder) and `decoder` (not the VAE decoder) withcross-attention is used as the VAE decoder. For these terms, we drop the `vae_` prefix because wehave an option to train this model without the variational objective (in which case we drop the`vae_encoder` altogether, and nothing about this model has anything to do with a VAE).TransformerUsed alone for inference(acts as VAE decoderduring training)┌───────────────────────┐│             Outputs   ││                ▲      ││     ┌─────►┌───────┐  │┌──────┐     │     │      │Transf.│  ││      │     │     ├─────►│decoder│  │┌────┴────┐ │     │     │      │       │  ││         │ │     │ ┌───┴───┬─►│       │  ││ VAE     │ │     │ │       │  └───────┘  ││ encoder │ │     │ │Transf.│             ││         │ │     │ │encoder│             │└───▲─────┘ │     │ │       │             ││       │     │ └▲──▲─▲─┘             ││       │     │  │  │ │               │inputs    └─────┼──┘  │ image emb.      ││    state emb.         │└───────────────────────┘"""def __init__(self, config: ACTConfig):# BERT style VAE encoder with input tokens [cls, robot_state, *action_sequence].# The cls token forms parameters of the latent's distribution (like this [*means, *log_variances]).super().__init__()self.config = configif self.config.use_vae:self.vae_encoder = ACTEncoder(config, is_vae_encoder=True)self.vae_encoder_cls_embed = nn.Embedding(1, config.dim_model)# Projection layer for joint-space configuration to hidden dimension.if self.config.robot_state_feature:self.vae_encoder_robot_state_input_proj = nn.Linear(self.config.robot_state_feature.shape[0], config.dim_model)# Projection layer for action (joint-space target) to hidden dimension.self.vae_encoder_action_input_proj = nn.Linear(self.config.action_feature.shape[0],config.dim_model,)# Projection layer from the VAE encoder's output to the latent distribution's parameter space.self.vae_encoder_latent_output_proj = nn.Linear(config.dim_model, config.latent_dim * 2)# Fixed sinusoidal positional embedding for the input to the VAE encoder. Unsqueeze for batch# dimension.num_input_token_encoder = 1 + config.chunk_sizeif self.config.robot_state_feature:num_input_token_encoder += 1self.register_buffer("vae_encoder_pos_enc",create_sinusoidal_pos_embedding(num_input_token_encoder, config.dim_model).unsqueeze(0),)# Backbone for image feature extraction.if self.config.image_features:backbone_model = getattr(torchvision.models, config.vision_backbone)(replace_stride_with_dilation=[False, False, config.replace_final_stride_with_dilation],weights=config.pretrained_backbone_weights,norm_layer=FrozenBatchNorm2d,)# Note: The assumption here is that we are using a ResNet model (and hence layer4 is the final# feature map).# Note: The forward method of this returns a dict: {"feature_map": output}.self.backbone = IntermediateLayerGetter(backbone_model, return_layers={"layer4": "feature_map"})# Transformer (acts as VAE decoder when training with the variational objective).self.encoder = ACTEncoder(config)self.decoder = ACTDecoder(config)# Transformer encoder input projections. The tokens will be structured like# [latent, (robot_state), (env_state), (image_feature_map_pixels)].if self.config.robot_state_feature:self.encoder_robot_state_input_proj = nn.Linear(self.config.robot_state_feature.shape[0], config.dim_model)if self.config.env_state_feature:self.encoder_env_state_input_proj = nn.Linear(self.config.env_state_feature.shape[0], config.dim_model)self.encoder_latent_input_proj = nn.Linear(config.latent_dim, config.dim_model)if self.config.image_features:self.encoder_img_feat_input_proj = nn.Conv2d(backbone_model.fc.in_features, config.dim_model, kernel_size=1)# Transformer encoder positional embeddings.n_1d_tokens = 1  # for the latentif self.config.robot_state_feature:n_1d_tokens += 1if self.config.env_state_feature:n_1d_tokens += 1self.encoder_1d_feature_pos_embed = nn.Embedding(n_1d_tokens, config.dim_model)if self.config.image_features:self.encoder_cam_feat_pos_embed = ACTSinusoidalPositionEmbedding2d(config.dim_model // 2)# Transformer decoder.# Learnable positional embedding for the transformer's decoder (in the style of DETR object queries).self.decoder_pos_embed = nn.Embedding(config.chunk_size, config.dim_model)# Final action regression head on the output of the transformer's decoder.self.action_head = nn.Linear(config.dim_model, self.config.action_feature.shape[0])self._reset_parameters()def _reset_parameters(self):"""Xavier-uniform initialization of the transformer parameters as in the original code."""for p in chain(self.encoder.parameters(), self.decoder.parameters()):if p.dim() > 1:nn.init.xavier_uniform_(p)def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, tuple[Tensor, Tensor] | tuple[None, None]]:"""A forward pass through the Action Chunking Transformer (with optional VAE encoder).`batch` should have the following structure:{[robot_state_feature] (optional): (B, state_dim) batch of robot states.[image_features]: (B, n_cameras, C, H, W) batch of images.AND/OR[env_state_feature]: (B, env_dim) batch of environment states.[action_feature] (optional, only if training with VAE): (B, chunk_size, action dim) batch of actions.}Returns:(B, chunk_size, action_dim) batch of action sequencesTuple containing the latent PDF's parameters (mean, log(σ²)) both as (B, L) tensors where L is thelatent dimension."""if self.config.use_vae and self.training:assert "action" in batch, ("actions must be provided when using the variational objective in training mode.")if "observation.images" in batch:batch_size = batch["observation.images"][0].shape[0]else:batch_size = batch["observation.environment_state"].shape[0]# Prepare the latent for input to the transformer encoder.if self.config.use_vae and "action" in batch and self.training:# Prepare the input to the VAE encoder: [cls, *joint_space_configuration, *action_sequence].cls_embed = einops.repeat(self.vae_encoder_cls_embed.weight, "1 d -> b 1 d", b=batch_size)  # (B, 1, D)if self.config.robot_state_feature:robot_state_embed = self.vae_encoder_robot_state_input_proj(batch["observation.state"])robot_state_embed = robot_state_embed.unsqueeze(1)  # (B, 1, D)action_embed = self.vae_encoder_action_input_proj(batch["action"])  # (B, S, D)if self.config.robot_state_feature:vae_encoder_input = [cls_embed, robot_state_embed, action_embed]  # (B, S+2, D)else:vae_encoder_input = [cls_embed, action_embed]vae_encoder_input = torch.cat(vae_encoder_input, axis=1)# Prepare fixed positional embedding.# Note: detach() shouldn't be necessary but leaving it the same as the original code just in case.pos_embed = self.vae_encoder_pos_enc.clone().detach()  # (1, S+2, D)# Prepare key padding mask for the transformer encoder. We have 1 or 2 extra tokens at the start of the# sequence depending whether we use the input states or not (cls and robot state)# False means not a padding token.cls_joint_is_pad = torch.full((batch_size, 2 if self.config.robot_state_feature else 1),False,device=batch["observation.state"].device,)key_padding_mask = torch.cat([cls_joint_is_pad, batch["action_is_pad"]], axis=1)  # (bs, seq+1 or 2)# Forward pass through VAE encoder to get the latent PDF parameters.cls_token_out = self.vae_encoder(vae_encoder_input.permute(1, 0, 2),pos_embed=pos_embed.permute(1, 0, 2),key_padding_mask=key_padding_mask,)[0]  # select the class token, with shape (B, D)latent_pdf_params = self.vae_encoder_latent_output_proj(cls_token_out)mu = latent_pdf_params[:, : self.config.latent_dim]# This is 2log(sigma). Done this way to match the original implementation.log_sigma_x2 = latent_pdf_params[:, self.config.latent_dim :]# Sample the latent with the reparameterization trick.latent_sample = mu + log_sigma_x2.div(2).exp() * torch.randn_like(mu)else:# When not using the VAE encoder, we set the latent to be all zeros.mu = log_sigma_x2 = None# TODO(rcadene, alexander-soare): remove call to `.to` to speedup forward ; precompute and use bufferlatent_sample = torch.zeros([batch_size, self.config.latent_dim], dtype=torch.float32).to(batch["observation.state"].device)# Prepare transformer encoder inputs.encoder_in_tokens = [self.encoder_latent_input_proj(latent_sample)]encoder_in_pos_embed = list(self.encoder_1d_feature_pos_embed.weight.unsqueeze(1))# Robot state token.if self.config.robot_state_feature:encoder_in_tokens.append(self.encoder_robot_state_input_proj(batch["observation.state"]))# Environment state token.if self.config.env_state_feature:encoder_in_tokens.append(self.encoder_env_state_input_proj(batch["observation.environment_state"]))if self.config.image_features:# For a list of images, the H and W may vary but H*W is constant.# NOTE: If modifying this section, verify on MPS devices that# gradients remain stable (no explosions or NaNs).for img in batch["observation.images"]:cam_features = self.backbone(img)["feature_map"]cam_pos_embed = self.encoder_cam_feat_pos_embed(cam_features).to(dtype=cam_features.dtype)cam_features = self.encoder_img_feat_input_proj(cam_features)# Rearrange features to (sequence, batch, dim).cam_features = einops.rearrange(cam_features, "b c h w -> (h w) b c")cam_pos_embed = einops.rearrange(cam_pos_embed, "b c h w -> (h w) b c")# Extend immediately instead of accumulating and concatenating# Convert to list to extend properlyencoder_in_tokens.extend(list(cam_features))encoder_in_pos_embed.extend(list(cam_pos_embed))# Stack all tokens along the sequence dimension.encoder_in_tokens = torch.stack(encoder_in_tokens, axis=0)encoder_in_pos_embed = torch.stack(encoder_in_pos_embed, axis=0)# Forward pass through the transformer modules.encoder_out = self.encoder(encoder_in_tokens, pos_embed=encoder_in_pos_embed)# TODO(rcadene, alexander-soare): remove call to `device` ; precompute and use bufferdecoder_in = torch.zeros((self.config.chunk_size, batch_size, self.config.dim_model),dtype=encoder_in_pos_embed.dtype,device=encoder_in_pos_embed.device,)decoder_out = self.decoder(decoder_in,encoder_out,encoder_pos_embed=encoder_in_pos_embed,decoder_pos_embed=self.decoder_pos_embed.weight.unsqueeze(1),)# Move back to (B, S, C).decoder_out = decoder_out.transpose(0, 1)actions = self.action_head(decoder_out)return actions, (mu, log_sigma_x2)class ACTEncoder(nn.Module):"""Convenience module for running multiple encoder layers, maybe followed by normalization."""def __init__(self, config: ACTConfig, is_vae_encoder: bool = False):super().__init__()self.is_vae_encoder = is_vae_encodernum_layers = config.n_vae_encoder_layers if self.is_vae_encoder else config.n_encoder_layersself.layers = nn.ModuleList([ACTEncoderLayer(config) for _ in range(num_layers)])self.norm = nn.LayerNorm(config.dim_model) if config.pre_norm else nn.Identity()def forward(self, x: Tensor, pos_embed: Tensor | None = None, key_padding_mask: Tensor | None = None) -> Tensor:for layer in self.layers:x = layer(x, pos_embed=pos_embed, key_padding_mask=key_padding_mask)x = self.norm(x)return xclass ACTEncoderLayer(nn.Module):def __init__(self, config: ACTConfig):super().__init__()self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)# Feed forward layers.self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)self.dropout = nn.Dropout(config.dropout)self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)self.norm1 = nn.LayerNorm(config.dim_model)self.norm2 = nn.LayerNorm(config.dim_model)self.dropout1 = nn.Dropout(config.dropout)self.dropout2 = nn.Dropout(config.dropout)self.activation = get_activation_fn(config.feedforward_activation)self.pre_norm = config.pre_normdef forward(self, x, pos_embed: Tensor | None = None, key_padding_mask: Tensor | None = None) -> Tensor:skip = xif self.pre_norm:x = self.norm1(x)q = k = x if pos_embed is None else x + pos_embedx = self.self_attn(q, k, value=x, key_padding_mask=key_padding_mask)x = x[0]  # note: [0] to select just the output, not the attention weightsx = skip + self.dropout1(x)if self.pre_norm:skip = xx = self.norm2(x)else:x = self.norm1(x)skip = xx = self.linear2(self.dropout(self.activation(self.linear1(x))))x = skip + self.dropout2(x)if not self.pre_norm:x = self.norm2(x)return xclass ACTDecoder(nn.Module):def __init__(self, config: ACTConfig):"""Convenience module for running multiple decoder layers followed by normalization."""super().__init__()self.layers = nn.ModuleList([ACTDecoderLayer(config) for _ in range(config.n_decoder_layers)])self.norm = nn.LayerNorm(config.dim_model)def forward(self,x: Tensor,encoder_out: Tensor,decoder_pos_embed: Tensor | None = None,encoder_pos_embed: Tensor | None = None,) -> Tensor:for layer in self.layers:x = layer(x, encoder_out, decoder_pos_embed=decoder_pos_embed, encoder_pos_embed=encoder_pos_embed)if self.norm is not None:x = self.norm(x)return xclass ACTDecoderLayer(nn.Module):def __init__(self, config: ACTConfig):super().__init__()self.self_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)self.multihead_attn = nn.MultiheadAttention(config.dim_model, config.n_heads, dropout=config.dropout)# Feed forward layers.self.linear1 = nn.Linear(config.dim_model, config.dim_feedforward)self.dropout = nn.Dropout(config.dropout)self.linear2 = nn.Linear(config.dim_feedforward, config.dim_model)self.norm1 = nn.LayerNorm(config.dim_model)self.norm2 = nn.LayerNorm(config.dim_model)self.norm3 = nn.LayerNorm(config.dim_model)self.dropout1 = nn.Dropout(config.dropout)self.dropout2 = nn.Dropout(config.dropout)self.dropout3 = nn.Dropout(config.dropout)self.activation = get_activation_fn(config.feedforward_activation)self.pre_norm = config.pre_normdef maybe_add_pos_embed(self, tensor: Tensor, pos_embed: Tensor | None) -> Tensor:return tensor if pos_embed is None else tensor + pos_embed.expand(-1, tensor.shape[1], -1)def forward(self,x: Tensor,encoder_out: Tensor,decoder_pos_embed: Tensor | None = None,encoder_pos_embed: Tensor | None = None,) -> Tensor:"""Args:x: (Decoder Sequence, Batch, Channel) tensor of input tokens.encoder_out: (Encoder Sequence, B, C) output features from the last layer of the encoder we arecross-attending with.decoder_pos_embed: (ES, 1, C) positional embedding for keys (from the encoder).encoder_pos_embed: (DS, 1, C) Positional_embedding for the queries (from the decoder).Returns:(DS, B, C) tensor of decoder output features."""skip = xif self.pre_norm:x = self.norm1(x)q = k = self.maybe_add_pos_embed(x, decoder_pos_embed)x = self.self_attn(q, k, value=x)[0]  # select just the output, not the attention weightsx = skip + self.dropout1(x)if self.pre_norm:skip = xx = self.norm2(x)else:x = self.norm1(x)skip = xx = self.multihead_attn(query=self.maybe_add_pos_embed(x, decoder_pos_embed),key=self.maybe_add_pos_embed(encoder_out, encoder_pos_embed),value=encoder_out,)[0]  # select just the output, not the attention weightsx = skip + self.dropout2(x)if self.pre_norm:skip = xx = self.norm3(x)else:x = self.norm2(x)skip = xx = self.linear2(self.dropout(self.activation(self.linear1(x))))x = skip + self.dropout3(x)if not self.pre_norm:x = self.norm3(x)return xdef create_sinusoidal_pos_embedding(num_positions: int, dimension: int) -> Tensor:"""1D sinusoidal positional embeddings as in Attention is All You Need.Args:num_positions: Number of token positions required.Returns: (num_positions, dimension) position embeddings (the first dimension is the batch dimension)."""def get_position_angle_vec(position):return [position / np.power(10000, 2 * (hid_j // 2) / dimension) for hid_j in range(dimension)]sinusoid_table = np.array([get_position_angle_vec(pos_i) for pos_i in range(num_positions)])sinusoid_table[:, 0::2] = np.sin(sinusoid_table[:, 0::2])  # dim 2isinusoid_table[:, 1::2] = np.cos(sinusoid_table[:, 1::2])  # dim 2i+1return torch.from_numpy(sinusoid_table).float()class ACTSinusoidalPositionEmbedding2d(nn.Module):"""2D sinusoidal positional embeddings similar to what's presented in Attention Is All You Need.The variation is that the position indices are normalized in [0, 2π] (not quite: the lower bound is 1/Hfor the vertical direction, and 1/W for the horizontal direction."""def __init__(self, dimension: int):"""Args:dimension: The desired dimension of the embeddings."""super().__init__()self.dimension = dimensionself._two_pi = 2 * math.piself._eps = 1e-6# Inverse "common ratio" for the geometric progression in sinusoid frequencies.self._temperature = 10000def forward(self, x: Tensor) -> Tensor:"""Args:x: A (B, C, H, W) batch of 2D feature map to generate the embeddings for.Returns:A (1, C, H, W) batch of corresponding sinusoidal positional embeddings."""not_mask = torch.ones_like(x[0, :1])  # (1, H, W)# Note: These are like range(1, H+1) and range(1, W+1) respectively, but in most implementations# they would be range(0, H) and range(0, W). Keeping it at as is to match the original code.y_range = not_mask.cumsum(1, dtype=torch.float32)x_range = not_mask.cumsum(2, dtype=torch.float32)# "Normalize" the position index such that it ranges in [0, 2π].# Note: Adding epsilon on the denominator should not be needed as all values of y_embed and x_range# are non-zero by construction. This is an artifact of the original code.y_range = y_range / (y_range[:, -1:, :] + self._eps) * self._two_pix_range = x_range / (x_range[:, :, -1:] + self._eps) * self._two_piinverse_frequency = self._temperature ** (2 * (torch.arange(self.dimension, dtype=torch.float32, device=x.device) // 2) / self.dimension)x_range = x_range.unsqueeze(-1) / inverse_frequency  # (1, H, W, 1)y_range = y_range.unsqueeze(-1) / inverse_frequency  # (1, H, W, 1)# Note: this stack then flatten operation results in interleaved sine and cosine terms.# pos_embed_x and pos_embed_y are (1, H, W, C // 2).pos_embed_x = torch.stack((x_range[..., 0::2].sin(), x_range[..., 1::2].cos()), dim=-1).flatten(3)pos_embed_y = torch.stack((y_range[..., 0::2].sin(), y_range[..., 1::2].cos()), dim=-1).flatten(3)pos_embed = torch.cat((pos_embed_y, pos_embed_x), dim=3).permute(0, 3, 1, 2)  # (1, C, H, W)return pos_embeddef get_activation_fn(activation: str) -> Callable:"""Return an activation function given a string."""if activation == "relu":return F.reluif activation == "gelu":return F.geluif activation == "glu":return F.gluraise RuntimeError(f"activation should be relu/gelu/glu, not {activation}.")

src\lerobot\scripts\eval.py修改下面几行:

# with torch.inference_mode():
#     action = policy.select_action(observation)  image = torch.randn(50, 3, 480, 640).to("cuda")
state = torch.randn(50, 14).to("cuda")
torch.onnx.register_custom_op_symbolic("aten::lift_fresh", lambda g, x: x, 13)
torch.onnx.export(policy, (image, state), "act.onnx", opset_version=13)
exit(0)

运行

python lerobot/scripts/eval.py \--policy.path=act_aloha_transfer \--env.type=aloha \--env.task=AlohaTransferCube-v0 \--eval.n_episodes=500 \--eval.batch_size=50 \--device=cuda \--use_amp=false

onnx模型部署

import numpy as np
import onnxruntimeonnx_session = onnxruntime.InferenceSession("act.onnx", providers=['CUDAExecutionProvider', 'CPUExecutionProvider'])input_name = []
for node in onnx_session.get_inputs():input_name.append(node.name)inputs = {}
inputs['onnx::Sub_0'] = np.random.randn(50, 3, 480, 640).astype(np.float32)
inputs['onnx::Sub_1'] = np.random.randn(50, 14).astype(np.float32)outputs = onnx_session.run(None, inputs)
print(outputs)

tensorrt模型部署

import numpy as np
import tensorrt as trt
import commonlogger = trt.Logger(trt.Logger.WARNING)
with open("act.engine", "rb") as f, trt.Runtime(logger) as runtime:engine = runtime.deserialize_cuda_engine(f.read())
context = engine.create_execution_context()
inputs, outputs, bindings, stream = common.allocate_buffers(engine)input0 = np.random.randn(50, 3, 480, 640).astype(np.float32)
input1 = np.random.randn(50, 14).astype(np.float32)
np.copyto(inputs[0].host, input0.ravel())
np.copyto(inputs[1].host, input1.ravel())output = common.do_inference(context,engine=engine, bindings=bindings,inputs=inputs, outputs=outputs, stream=stream)
print(output)

common.py

#
# SPDX-FileCopyrightText: Copyright (c) 1993-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#import argparse
import os
import ctypes
from typing import Optional, Listimport numpy as np
import tensorrt as trt
from cuda import cuda, cudarttry:# Sometimes python does not understand FileNotFoundErrorFileNotFoundError
except NameError:FileNotFoundError = IOErrorEXPLICIT_BATCH = 1 << (int)(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH)def check_cuda_err(err):if isinstance(err, cuda.CUresult):if err != cuda.CUresult.CUDA_SUCCESS:raise RuntimeError("Cuda Error: {}".format(err))if isinstance(err, cudart.cudaError_t):if err != cudart.cudaError_t.cudaSuccess:raise RuntimeError("Cuda Runtime Error: {}".format(err))else:raise RuntimeError("Unknown error type: {}".format(err))def cuda_call(call):err, res = call[0], call[1:]check_cuda_err(err)if len(res) == 1:res = res[0]return resdef GiB(val):return val * 1 << 30def add_help(description):parser = argparse.ArgumentParser(description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter)args, _ = parser.parse_known_args()def find_sample_data(description="Runs a TensorRT Python sample", subfolder="", find_files=[], err_msg=""):"""Parses sample arguments.Args:description (str): Description of the sample.subfolder (str): The subfolder containing data relevant to this samplefind_files (str): A list of filenames to find. Each filename will be replaced with an absolute path.Returns:str: Path of data directory."""# Standard command-line arguments for all samples.kDEFAULT_DATA_ROOT = os.path.join(os.sep, "usr", "src", "tensorrt", "data")parser = argparse.ArgumentParser(description=description, formatter_class=argparse.ArgumentDefaultsHelpFormatter)parser.add_argument("-d","--datadir",help="Location of the TensorRT sample data directory, and any additional data directories.",action="append",default=[kDEFAULT_DATA_ROOT],)args, _ = parser.parse_known_args()def get_data_path(data_dir):# If the subfolder exists, append it to the path, otherwise use the provided path as-is.data_path = os.path.join(data_dir, subfolder)if not os.path.exists(data_path):if data_dir != kDEFAULT_DATA_ROOT:print("WARNING: " + data_path + " does not exist. Trying " + data_dir + " instead.")data_path = data_dir# Make sure data directory exists.if not (os.path.exists(data_path)) and data_dir != kDEFAULT_DATA_ROOT:print("WARNING: {:} does not exist. Please provide the correct data path with the -d option.".format(data_path))return data_pathdata_paths = [get_data_path(data_dir) for data_dir in args.datadir]return data_paths, locate_files(data_paths, find_files, err_msg)def locate_files(data_paths, filenames, err_msg=""):"""Locates the specified files in the specified data directories.If a file exists in multiple data directories, the first directory is used.Args:data_paths (List[str]): The data directories.filename (List[str]): The names of the files to find.Returns:List[str]: The absolute paths of the files.Raises:FileNotFoundError if a file could not be located."""found_files = [None] * len(filenames)for data_path in data_paths:# Find all requested files.for index, (found, filename) in enumerate(zip(found_files, filenames)):if not found:file_path = os.path.abspath(os.path.join(data_path, filename))if os.path.exists(file_path):found_files[index] = file_path# Check that all files were foundfor f, filename in zip(found_files, filenames):if not f or not os.path.exists(f):raise FileNotFoundError("Could not find {:}. Searched in data paths: {:}\n{:}".format(filename, data_paths, err_msg))return found_filesclass HostDeviceMem:"""Pair of host and device memory, where the host memory is wrapped in a numpy array"""def __init__(self, size: int, dtype: np.dtype):nbytes = size * dtype.itemsizehost_mem = cuda_call(cudart.cudaMallocHost(nbytes))pointer_type = ctypes.POINTER(np.ctypeslib.as_ctypes_type(dtype))self._host = np.ctypeslib.as_array(ctypes.cast(host_mem, pointer_type), (size,))self._device = cuda_call(cudart.cudaMalloc(nbytes))self._nbytes = nbytes@propertydef host(self) -> np.ndarray:return self._host@host.setterdef host(self, arr: np.ndarray):if arr.size > self.host.size:raise ValueError(f"Tried to fit an array of size {arr.size} into host memory of size {self.host.size}")np.copyto(self.host[:arr.size], arr.flat, casting='safe')@propertydef device(self) -> int:return self._device@propertydef nbytes(self) -> int:return self._nbytesdef __str__(self):return f"Host:\n{self.host}\nDevice:\n{self.device}\nSize:\n{self.nbytes}\n"def __repr__(self):return self.__str__()def free(self):cuda_call(cudart.cudaFree(self.device))cuda_call(cudart.cudaFreeHost(self.host.ctypes.data))# Allocates all buffers required for an engine, i.e. host/device inputs/outputs.
# If engine uses dynamic shapes, specify a profile to find the maximum input & output size.
def allocate_buffers(engine: trt.ICudaEngine, profile_idx: Optional[int] = None):inputs = []outputs = []bindings = []stream = cuda_call(cudart.cudaStreamCreate())tensor_names = [engine.get_tensor_name(i) for i in range(engine.num_io_tensors)]for binding in tensor_names:# get_tensor_profile_shape returns (min_shape, optimal_shape, max_shape)# Pick out the max shape to allocate enough memory for the binding.shape = engine.get_tensor_shape(binding) if profile_idx is None else engine.get_tensor_profile_shape(binding, profile_idx)[-1]shape_valid = np.all([s >= 0 for s in shape])if not shape_valid and profile_idx is None:raise ValueError(f"Binding {binding} has dynamic shape, " +\"but no profile was specified.")size = trt.volume(shape)if engine.has_implicit_batch_dimension:size *= engine.max_batch_sizedtype = np.dtype(trt.nptype(engine.get_tensor_dtype(binding)))# Allocate host and device buffersbindingMemory = HostDeviceMem(size, dtype)# Append the device buffer to device bindings.bindings.append(int(bindingMemory.device))# Append to the appropriate list.if engine.get_tensor_mode(binding) == trt.TensorIOMode.INPUT:inputs.append(bindingMemory)else:outputs.append(bindingMemory)return inputs, outputs, bindings, stream# Frees the resources allocated in allocate_buffers
def free_buffers(inputs: List[HostDeviceMem], outputs: List[HostDeviceMem], stream: cudart.cudaStream_t):for mem in inputs + outputs:mem.free()cuda_call(cudart.cudaStreamDestroy(stream))# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_host_to_device(device_ptr: int, host_arr: np.ndarray):nbytes = host_arr.size * host_arr.itemsizecuda_call(cudart.cudaMemcpy(device_ptr, host_arr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyHostToDevice))# Wrapper for cudaMemcpy which infers copy size and does error checking
def memcpy_device_to_host(host_arr: np.ndarray, device_ptr: int):nbytes = host_arr.size * host_arr.itemsizecuda_call(cudart.cudaMemcpy(host_arr, device_ptr, nbytes, cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost))def _do_inference_base(inputs, outputs, stream, execute_async):# Transfer input data to the GPU.kind = cudart.cudaMemcpyKind.cudaMemcpyHostToDevice[cuda_call(cudart.cudaMemcpyAsync(inp.device, inp.host, inp.nbytes, kind, stream)) for inp in inputs]# Run inference.execute_async()# Transfer predictions back from the GPU.kind = cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost[cuda_call(cudart.cudaMemcpyAsync(out.host, out.device, out.nbytes, kind, stream)) for out in outputs]# Synchronize the streamcuda_call(cudart.cudaStreamSynchronize(stream))# Return only the host outputs.return [out.host for out in outputs]def do_inference(context, engine, bindings, inputs, outputs, stream):def execute_async_func():context.execute_async_v3(stream_handle=stream)# Setup context tensor address.num_io = engine.num_io_tensorsfor i in range(num_io):context.set_tensor_address(engine.get_tensor_name(i), bindings[i])return _do_inference_base(inputs, outputs, stream, execute_async_func)
http://www.dtcms.com/a/561283.html

相关文章:

  • 用织梦搭建网站外贸网站建设报价
  • 【软考架构】案例分析-web应用设计:SSH 和 SSM(Spring + Spring MVC + MyBatis ) 之间的区别,以及使用场景
  • 天津做网站的公司怎么样东莞全网合一网站
  • ☆☆FPGA图像处理算法开发学习教程——总目录
  • Nestjs框架: 微服务项目工程结构优化与构建方案
  • 佛山建网站的公司泰安房地产信息网官网
  • 重庆—阿尔及利亚南通道铁海联运线路首发
  • 八股训练营第 4 天 | HTTP1.0 和 HTTP1.1 的区别?HTTP2.0 与 HTTP1.1 的区别?HTTP3.0 有了解过吗?
  • 全球搜和外贸快车哪个好厦门seo传播
  • 给传销产品做网站班级优化大师免费下载学生版
  • 工信部 网站备案材料 复印件 电子版哪个网站做外链视频好
  • UE5C++GameplayStatics源代码
  • 关键词解释:梯度下降法(Gradient Descent)
  • 做外贸的网站哪个好湖南人文科技学院
  • deadbeef播放器歌词插件
  • 网站推广有什么好处咨询公司招聘条件
  • 网站定位授权开启权限怎么做精准营销模式
  • Flutter 开发环境配置教程
  • Go Gorm 深度解析:从内部原理到实战避坑指南
  • 保定企业建网站房产网站运营方案
  • 机械动力的能力
  • 山西省旅游网站建设分析廊坊网站制作网站
  • 【YashanDB认证】之二:Docker部署一体YashanDB(YDC,YCM)
  • C语言刷题(一)
  • 电子电气架构(EEA)最新调研-5
  • 【软考架构】案例分析-对比MySQL查询缓存与Memcached
  • 「经典图形题」集合 | C/C++
  • IT4IT是由The Open Group提出的面向数字化转型的IT管理参考架构框架
  • 学校网站怎么做的好南翔做网站公司
  • 解决 CentOS 8 报错:Failed to download metadata for repo ‘BaseOS‘