⚠️ 注意:有限維護
本專案不再積極維護。現有版本仍然可用,但沒有計劃中的更新、錯誤修復、新功能或安全補丁。使用者應注意漏洞可能無法得到解決。
使用 Torchserve 提供大型模型服務¶
本文件解釋了 Torchserve 如何支援大型模型服務,此處的大型模型指那些無法完全放入單個 GPU 中,因此需要在多個 GPU 上分割成多個分割槽執行的模型。本頁分為以下幾個部分:
工作原理?¶
對於較小模型的 GPU 推理,TorchServe 為每個工作程序執行一個獨立的程序,並分配一個 GPU。對於大型模型推理,模型需要分割到多個 GPU 上。有不同的模式可以實現這種分割,通常包括流水線並行 (PP)、張量並行或它們的組合。選擇哪種模式以及如何實現分割取決於所用框架的具體實現。TorchServe 允許使用者使用任何框架進行模型部署,並透過靈活的配置來適應各種框架的需求。一些框架要求為每個 GPU 執行單獨的程序(如 PiPPy, Deep Speed),而另一些則要求單個程序分配所有 GPU(如 vLLM)。如果需要多個程序,TorchServe 會利用 torchrun 為工作程序設定分散式環境。在設定過程中,torchrun 會為分配給工作程序的每個 GPU 啟動一個新的程序。是否使用 torchrun 取決於引數 parallelType,該引數可以在 model-config.yaml 中設定為以下選項之一:
pp- 用於流水線並行tp- 用於張量並行pptp- 用於流水線 + 張量並行custom
前三個選項使用 torchrun 設定環境,而“custom”選項則將並行化方式留給使用者,並將分配給工作程序的 GPU 分配給單個程序。分配的 GPU 數量由 torchrun 啟動的程序數(即透過 nproc-per-node 配置)或引數 parallelLevel 確定。這意味著如果設定了 nproc-per-node,則不應設定引數 parallelLevel,反之亦然。
預設情況下,TorchServe 使用輪詢演算法將 GPU 分配給主機上的工作程序。對於大型模型推理,分配給每個工作程序的 GPU 數量會根據 model_config.yaml 中指定的 GPU 數量自動計算。CUDA_VISIBLE_DEVICES 根據此數量設定。
例如,假設一個節點上有八個 GPU,一個工作程序需要 4 個 GPU(即 nproc-per-node=4 或 parallelLevel=4)。在這種情況下,TorchServe 會將 CUDA_VISIBLE_DEVICES=”0,1,2,3” 分配給 worker1,將 CUDA_VISIBLE_DEVICES=”4,5,6,7” 分配給 worker2。
除了這種預設行為,TorchServe 還提供了靈活性,允許使用者為工作程序指定 GPU。例如,如果使用者在模型配置 YAML 檔案中設定了“deviceIds: [2,3,4,5]”,並將 nproc-per-node(或 parallelLevel)設定為 2,則 TorchServe 會將 CUDA_VISIBLE_DEVICES=”2,3” 分配給 worker1,將 CUDA_VISIBLE_DEVICES=”4,5” 分配給 worker2。
以 PiPPy 整合為例,下圖展示了 TorchServe 大型模型推理的內部工作原理。有關使用 vLLM 的示例,請參見此示例。

PiPPy(PyTorch 大型模型推理原生解決方案)¶
PiPPy 為無法完全放入單個 GPU 的大型模型提供流水線並行支援。它會將您的模型分割成指定裝置數量的、大小相等的部分(階段)。然後使用微批處理來執行您的批處理輸入進行推理(對於批次大小 >1 的情況更最佳化)。
如何在 Torchserve 中使用 PiPPy¶
要在 Torchserve 中使用 PiPPy,我們需要使用繼承自 base_pippy_handler 的自定義處理程式,並將我們的設定放入 model-config.yaml 中。
Torchserve 中的自定義處理程式是一個簡單的 Python 指令碼,它定義了特定於您的工作流的模型載入、預處理、推理和後處理邏輯。
它看起來像下面這樣:
建立 custom_handler.py 或其他具有描述性的名稱的檔案。
#DO import the necessary packages along with following
from ts.torch_handler.distributed.base_pippy_handler import BasePippyHandler
from ts.handler_utils.distributed.pt_pippy import initialize_rpc_workers, get_pipline_driver
class ModelHandler(BasePippyHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
self.device = self.local_rank % torch.cuda.device_count()# being used to move model inputs to (self.device)
self.model = get_pipline_driver(model,self.world_size, ctx)
您的 model-config.yaml 檔案需要如下所示,該配置檔案非常靈活,您可以新增與前端、後端和處理程式相關的設定。
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "pp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
pippy:
chunks: 1 # This sets the microbatch sizes, microbatch = batch size/ chunks
input_names: ['input_ids'] # input arg names to the model, this is required for FX tracing
model_type: "HF" # set the model type to HF if you are using Huggingface model other wise leave it blank or any other model you use.
rpc_timeout: 1800
num_worker_threads: 512 #set number of threads for rpc worker init.
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
如何在處理程式中訪問它? 這是一個例子:
def initialize(self, ctx):
model_type = ctx.model_yaml_config["pippy"]["model_type"]
其餘部分與 Torchserve 中的常規操作相同,基本上是打包您的模型並啟動伺服器。
打包模型的命令示例,請確保傳遞 model-config.yaml 檔案:
torch-model-archiver --model-name bloom --version 1.0 --handler pippy_handler.py --extra-files $MODEL_CHECKPOINTS_PATH -r requirements.txt --config-file model-config.yaml --archive-format tgz
張量並行支援正在進行中,一旦就緒就會新增。
DeepSpeed¶
DeepSpeed-Inference 是微軟的一個開源專案。它為服務無法完全放入單個 GPU 記憶體中的大型基於 Transformer 的 PyTorch 模型提供了模型並行支援。
如何在 TorchServe 中使用 DeepSpeed¶
要在 TorchServe 中使用 DeepSpeed,我們需要使用繼承自 base_deepspeed_handler 的自定義處理程式,並將我們的設定放入 model-config.yaml 中。
它看起來像下面這樣:
建立 custom_handler.py 或其他具有描述性的名稱的檔案。
#DO import the necessary packages along with following
from ts.handler_utils.distributed.deepspeed import get_ds_engine
from ts.torch_handler.distributed.base_deepspeed_handler import BaseDeepSpeedHandler
class ModelHandler(BaseDeepSpeedHandler, ABC):
def __init__(self):
super(ModelHandler, self).__init__()
self.initialized = False
def initialize(self, ctx):
model = # load your model from model_dir
ds_engine = get_ds_engine(self.model, ctx)
self.model = ds_engine.module
self.initialized = True
您的 model-config.yaml 檔案需要如下所示,該配置檔案非常靈活,您可以新增與前端、後端和處理程式相關的設定。
#frontend settings
minWorkers: 1
maxWorkers: 1
maxBatchDelay: 100
responseTimeout: 120
deviceType: "gpu"
parallelType: "tp" # options depending on the solution, pp(pipeline parallelism), tp(tensor parallelism), pptp ( pipeline and tensor parallelism)
# This will be used to route input to either rank0 or all ranks from fontend based on the solution (e.g. DeepSpeed support tp, PiPPy support pp)
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
#backend settings
deepspeed:
config: ds-config.json # DeepSpeed config json filename.
# Details:https://www.deepspeed.ai/docs/config-json/
handler:
max_length: 80 # max length of tokens for tokenizer in the handler
這裡是一個 ds-config.json 的例子:
{
"dtype": "torch.float16",
"replace_with_kernel_inject": true,
"tensor_parallel": {
"tp_size": 2
}
}
安裝 DeepSpeed
方法 1: requirements.txt
方法 2: 透過命令預安裝(推薦用於加快模型載入速度)
# See https://www.deepspeed.ai/tutorials/advanced-install/
DS_BUILD_OPS=1 pip install deepspeed
其餘部分與 Torchserve 中的常規操作相同,基本上是打包您的模型並啟動伺服器。
打包模型的命令示例,請確保傳遞 model-config.yaml 檔案:
# option 1: Using model_dir
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files $MODEL_CHECKPOINTS_PATH,ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format tgz
# option 2: Using HF model_name
torch-model-archiver --model-name bloom --version 1.0 --handler deepspeed_handler.py --extra-files ds-config.json -r requirements.txt --config-file model-config.yaml --archive-format
DeepSpeed MII¶
如果使用此處所示的受支援模型之一,您可以利用 Deep Speed MII 的優勢。Deep Speed MII 使用 Deep Speed Inference 以及深度學習領域的進一步進展來最小化延遲並最大化吞吐量。它針對特定的模型型別、模型大小、批次大小和可用硬體資源進行最佳化。
有關如何在受支援模型上利用 Deep Speed MII 的更多資訊,請參閱此處的資訊。您還可以在此處找到如何將其應用於 TorchServe 的示例。
使用 Accelerate 提供大型 Hugging Face 模型服務¶
如果使用大型 Hugging Face 模型但資源有限,您可以使用 accelerate 來提供這些模型服務。為此,您需要在 setup_config.json 檔案中設定 low_cpu_mem_usage=True 和設定 `device_map=”auto”。
有關將 accelerate 與大型 Hugging Face 模型一起使用的更多資訊,請參閱此示例。
大型模型推理技巧¶
減少模型載入延遲¶
為了減少模型延遲,我們建議:
在容器或主機上預安裝模型並行庫,例如 Deepspeed。
預下載模型檢查點。例如,如果使用 HuggingFace,可以透過 Download_model.py 指令碼預下載預訓練模型。
設定環境變數 HUGGINGFACE_HUB_CACHE 和 TRANSFORMERS_CACHE
透過工具 Download_model.py 將模型下載到 HuggingFace 快取目錄。
調整模型配置 YAML 檔案¶
您可以調整模型配置 YAML 檔案以獲得更好的效能,方法如下:
如果模型推理延遲較高導致響應超時,請更新 responseTimeout。
如果模型載入延遲較高導致啟動超時,請更新 startupTimeout。
調整 torchrun 引數。支援的引數定義在此處。例如,預設情況下,
OMP_NUMBER_THREADS為 1。可以在 YAML 檔案中修改此引數。
#frontend settings
torchrun:
nproc-per-node: 4 # specifies the number of processes torchrun starts to serve your model, set to world_size or number of
# gpus you wish to split your model
OMP_NUMBER_THREADS: 2
延遲敏感型應用¶
Job Ticket¶
Job Ticket 功能推薦用於延遲敏感型推理場景。啟用 Job Ticket 後,TorchServe 會驗證模型是否存在可用於處理客戶端請求的活動工作程序。如果存在活動工作程序,請求會被接受並立即處理,無需等待作業佇列或動態批處理;否則,會向客戶端返回 503 響應。
此功能有助於處理推理延遲可能較高的使用案例,例如生成模型、像 chatGPT 這樣的自迴歸解碼器模型。此功能幫助這類應用根據業務需求採取有效行動,例如將拒絕的請求路由到不同的伺服器,或擴充套件模型伺服器容量。以下是啟用 Job Ticket 的示例:
minWorkers: 2
maxWorkers: 2
jobQueueSize: 2
useJobTicket: true
在此示例中,一個模型有 2 個工作程序,作業佇列大小為 2。推理請求要麼立即由 TorchServe 處理,要麼被拒絕並返回響應程式碼 503。
透過 HTTP 1.1 分塊編碼進行流式響應¶
TorchServe 的推理 API 支援流式響應,允許透過 HTTP 1.1 分塊編碼傳送一系列推理響應。此功能僅推薦用於完整響應推理延遲較高且推理中間結果會發送給客戶端的場景。一個例子是用於生成應用的 LLMs,其中生成“n”個 token 的延遲可能很高。在這種情況下,使用者可以在每個生成的 token 就緒時接收,直到完整響應完成。要實現流式響應,後端處理程式呼叫“send_intermediate_predict_response”將一箇中間結果傳送到前端,並以現有方式返回最後一個結果。例如,
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]
客戶端接收分塊資料。
import test_utils
def test_echo_stream_inference():
test_utils.start_torchserve(no_config_snapshots=True, gen_mar=False)
test_utils.register_model('echo_stream',
'https://torchserve.pytorch.org/mar_files/echo_stream.mar')
response = requests.post(TF_INFERENCE_API + '/predictions/echo_stream', data="foo", stream=True)
assert response.headers['Transfer-Encoding'] == 'chunked'
prediction = []
for chunk in (response.iter_content(chunk_size=None)):
if chunk:
prediction.append(chunk.decode("utf-8"))
assert str(" ".join(prediction)) == "hello hello hello hello world "
test_utils.unregister_model('echo_stream')
GRPC 伺服器端流¶
TorchServe GRPC API 添加了推理 API “StreamPredictions” 的伺服器端流支援,允許透過同一個 GRPC 流傳送一系列推理響應。此 API 僅推薦用於完整響應推理延遲較高且推理中間結果會發送給客戶端的場景。一個例子是用於生成應用的 LLMs,其中生成“n”個 token 的延遲可能很高。與 HTTP 1.1 分塊編碼類似,透過此功能,使用者可以在每個生成的 token 就緒時接收,直到完整響應完成。此 API 自動強制 batchSize 為 1。
service InferenceAPIsService {
// Check health status of the TorchServe server.
rpc Ping(google.protobuf.Empty) returns (TorchServeHealthResponse) {}
// Predictions entry point to get inference using default model version.
rpc Predictions(PredictionsRequest) returns (PredictionResponse) {}
// Streaming response for an inference request.
rpc StreamPredictions(PredictionsRequest) returns (stream PredictionResponse) {}
}
後端處理程式呼叫“send_intermediate_predict_response”將一箇中間結果傳送到前端,並以現有方式返回最後一個結果。例如
from ts.handler_utils.utils import send_intermediate_predict_response
''' Note: TorchServe v1.0.0 will deprecate
"from ts.protocol.otf_message_handler import send_intermediate_predict_response".
Please replace it with "from ts.handler_utils.utils import send_intermediate_predict_response".
'''
def handle(data, context):
if type(data) is list:
for i in range (3):
send_intermediate_predict_response(["intermediate_response"], context.request_ids, "Intermediate Prediction success", 200, context)
return ["hello world "]