Apache Spark, büyük veri işleme ve analizinde sektör lideri bir platformdur. Yapısal olmayan verilere—belgeler, e-postalar, multimedya içerikler—artan taleple birlikte derin öğrenme (DL) ve büyük dil modelleri (LLM’ler) modern veri analitiği sürecinin temel bileşenleri haline gelmiştir. Bu modeller, görüntü açıklama, anlamsal etiketleme, belge özetleme gibi çeşitli alt görevleri gerçekleştirme yeteneği sağlar.
Ancak, GPU yoğun DL’i Spark ile birleştirmek tarihsel olarak zorlu bir süreç olmuştur. NVIDIA RAPIDS Accelerator for Apache Spark ve Spark RAPIDS ML kütüphanesi, veri çıkarımı, dönüştürme ve yükleme (ETL) ile klasik makine öğrenimi (ML) iş yükleri için kesintisiz GPU hızlandırması sağlar.
Son zamanlarda, Spark üzerinde dağıtık eğitim ve çıkarma için yeni API’ler önemli ilerlemeler sağlamıştır. Bu yazı, bu çalışmanın üzerine inşa ederek Spark üzerinde dağıtık çıkarım için en iyi uygulamaları tanıtmaktadır. NVIDIA Triton Inference Server gibi hizmet platformlarıyla entegrasyonu, vLLM ile performanslı LLM çıkarımı ve bulut platformlarında dağıtımı göstereceğiz.
Neden Batch Çıkarma?
Gerçek zamanlı çıkarım, etkileşimli uygulamalar için en uygun seçenekken, batch çıkarım büyük veri setlerini tek seferde işlemek için ölçeklenebilir, yüksek hacimli bir paradigma sunar. Bazı önemli kullanım senaryoları şunlardır:
- Anlamsal Arama: Büyük içerik havuzları için gömme ve anlamsal meta veriler üretmek, arama kalitesini artırır.
- Veri Dönüşümü: Yapısal şemalara dönüştürülebilen serbest metin veya görüntüler gibi yapısal olmayan veri setlerini çeviri, özetleme veya dönüştürme işlemleri.
- İçerik Üretimi: Büyük ölçekli içerik üretimi için otomatik olarak ürün açıklamaları, görüntü başlıkları, sosyal medya gönderileri veya pazarlama metinleri oluşturma.
DL/LLM modellerinin mevcut Spark boru hattına entegrasyonu, DL ve yaratıcı yapay zekanın yeteneklerini kurumsal verilerinize doğrudan getiren birleşik bir iş akışı sunar. Şimdi, bir özet ile başlayarak Spark’ın predict_batch_udf
API’sinin uygulamalarını keşfedelim.
Temel Dağıtım: predict_batch_udf ile Dağıtık Çıkarma
Spark 3.4, derin öğrenme modeli çıkarımı için basit bir arayüz sunan predict_batch_udf API’sini tanıtmıştır. Bu API, Spark DataFrame sütunlarını batched NumPy girdilerine otomatik dönüşüm sağlar ve modelleri Spark yürütücülerinde önbelleğe alır. Daha fazla bilgi için, “Distributed Deep Learning Made Easy with Spark 3.4” çalışmasına göz atabilirsiniz.
Örneğin, aşağıdaki kod, metin verisi içeren bir Spark DataFrame üzerinde dağıtık metin gömme işlemi gerçekleştirmek için Huggingface Sentence Transformers‘ı kullanarak gösterilmektedir:
from pyspark.sql.functions import predict_batch_udf
from pyspark.sql.types import *
def predict_batch_fn():
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("paraphrase-MiniLM-L6-v2", device="cuda")
def predict(inputs):
return model.encode(inputs)
return predict
embed_udf = predict_batch_udf(predict_batch_fn,
return_type=ArrayType(FloatType()),
batch_size=128)
df = spark.read.parquet("/path/to/text_data")
embeddings_df = df.withColumn("embedding", embed("text"))
embeddings_df.write.parquet("/path/to/embeddings")
Bu bir veri-paralel mimaridir (Şekil 1), burada her Python işçisi modelin bir kopyasını GPU’ya yükler ve verisetinin kendi bölümünde tahmin yapar.

Bu doğrudan çıkarım yaklaşımı sayesinde mevcut PyTorch, TensorFlow veya Huggingface çerçeve kodunuzu Spark’a taşımada minimum kod değişikliği ile dağıtık çıkarım gerçekleştirebilirsiniz.
Ancak, büyük modellerle çalışırken GPU’ya birden fazla model kopyası yüklemek sorunlu olabilir. Bu sorunun üstesinden gelmek için çıkarım sunumunun nasıl çalıştığını tartışacağız ve kaynak ayrımını nasıl geliştirdiğini ele alacağız.
Gelişmiş Dağıtım: Dağıtık Çıkarma Sunumu
Temel yaklaşımda, predict_batch_udf
ile paralel olarak görevler çalıştırmak, her Python işçisinin GPU üzerinde bir model kopyasını yüklemesine neden olur. Sonuç olarak, bellek hatası veya aşırı yükleme sorunları yaşamamak için yürütücüler başına kaç görev ayarlanacağını belirlemeniz gerekir. Tamamen GPU belleğini kaplayan büyük modeller—LLM’ler gibi—yürütücü başına bir görevle sınırlanmayı gerektirebilir (örneğin, spark.task.resource.gpu.amount=1
ayarı ile, uygulamanın tamamı için) (Şekil 2).

predict_batch_udf
‘nin bu sınırlaması, Spark zamanlamasında bir zorluğu vurgular: tüm görevleri eşit olarak değerlendirir, CPU ve GPU kaynak kullanımını ayırmaksızın işler.
Çıkarma sunumu, bunu GPU yürütmesini Spark görev zamanlamasından ayrıştırarak çözmektedir. Modelleri her Spark görevinde yüklemek yerine, her yürütücüde özel bir çıkarım sunucusu dağıtabiliriz. Birçok görev, veriyi paralel olarak yükleyebilir, ön işleyebilir ve yazabilirken, sunucu GPU’yu çıkarım için kullanacak şekilde yapılandırılır.

CPU ve GPU paralelliği arasında mantıksal bir ayrım sağlayarak, çıkarım sunumu GPU belleği açısından yürütücü başına görevleri ayarlama gereğini ortadan kaldırır. Ayrıca, model yönetimi ve dinamik toplama gibi sunum özelliklerinin kolay entegrasyonunu sağlar.
Sunucuları Spark kümeleri arasında başlatıp yönetmek için Spark-RAPIDS-Examples DL Inference repo‘da sunucu yardımcı programları sağlıyoruz. Bu, NVIDIA Triton Inference Server ve vLLM‘yi destekler. Bu örneklerin aktif olarak gelişmekte olduğunu ve belki de NVIDIA Dynamo ve NVIDIA NIM’ler gibi çıkarım çözümlerine destek eklemeyi hedeflediğimizi belirtmek gerekir.
Triton Çıkarma Sunucusu ile Sunum
NVIDIA Triton Inference Server, yüksek performanslı model sunumu için endüstri standardı bir platformdur. Birçok önemli özellikleri destekler: model bileşimleri, eş zamanlı yürütme ve dinamik toplama gibi. Triton genellikle bir Docker konteyneri içinde çalıştığı için, bulut tabanlı Spark ortamlarında konteynerlerin içinde çalışan yürütücülerle dağıtımı zorluklar doğurur; bunlar ayrıcalık gereksinimleri ve kaynak izolasyonu eksikliği gibi sorunları içerir.
Şans eseri, PyTriton, Triton’u doğrudan bir Python sürecinde çalıştırmak için Python doğasıyla yazılmış bir arayüz sunar ve bu da bulutta dağıtımı basitleştirir. PyTriton dağıtımına dair temel bilgiler için bu bloga göz atabilirsiniz.
server_utils
modülü Spark-RAPIDS-Examples DL Inference repo‘da, sunucuların yaşam döngüsünü yönetmek için bir TritonServerManager
sağlar. Bu, mevcuttaki bağlantı noktalarını bulma ve atama, her yürütücüde bir sunucu süreci başlatma ve çıkarım sonrası düzgün bir kapama işlemini yönetme gibi işlemleri içerir.
Bu sınıf ile Triton sunucularını dağıtmak için adımlar basittir:
triton_server
işlevini, çıkarım çerçevesi kodunu içeren PyTriton sunucu mantığı ile tanımlayın.- Model adı ve yolu ile
TritonServerManager
‘ın başlatın. TritonServerManager.start_servers
(triton_server
) çağrısı yapın ve sunucu işlevini küme genelinde dağıtın.
Bu adımları daha iyi anlamak için kodu inceleyelim. Öncelikle, triton_server
işlevini tanımlayalım. Kısalık açısından bunu atladık—kullanıcıların çerçeve dahilinde örnekler bulabileceği notlara göz atabilirsiniz.
def triton_server(ports: List[int], model_path: str):
# GPU'ya modeli yükle, çıkarım mantığını tanımla, sunucuya bağla
Sunucu mantığı tanımlandıktan sonra, model adı ve yolu ile sunucu yöneticisini başlatın ve sunucuları başlatırken triton_server
işlevini geçirin:
from server_utils import TritonServerManager
server_manager = TritonServerManager(model_name="my-model", model_path="path/to/my-model")
server_manager.start_servers(triton_server)
host_to_grpc_url = server_manager.host_to_grpc_url
ServerManager, sürücü üzerinde her yürütücüye bir başlatma görevi dağıtır ve bu görevler kullanıcı tanımlı triton_server
işlevini çalıştıran bir Python süreci başlatır (Şekil 4).

start_servers()
işlevi, her yürütücüde triton_server
sürecinin dağıtımını başlatır.Sonrasında, predict_batch_udf
kullanarak bir grup girdi ön işleme yapın ve tahminleri sunucuya göndermek için PyTriton’unModelClient API’sini kullanın.
def triton_predict_fn(model_name, host_to_url):
import socket
from pytriton.client import ModelClient
url = host_to_url.get(socket.gethostname())
def infer_batch(inputs):
with ModelClient(url, model_name) as client:
# Ön işleme yap...
result_data = client.infer_batch(inputs) # Sunucuya grubu gönder
return result_data["predictions"] # Tahminleri döndür
return infer_batch
predict_udf = predict_batch_udf(partial(triton_predict_fn, model_name="my-model", host_to_url=host_to_grpc_url),
return_type=ArrayType(FloatType()),
batch_size=32)
# Çıkarma işlemini çalıştır
df = spark.read.parquet("/path/to/my-data")
predictions_df = df.withColumn("predictions", predict_udf(col("data")))
predictions_df.write.parquet("/path/to/predictions.parquet")
# İşimiz bitince sunucuları durdur
server_manager.stop_servers()
Görün ki, UDF’de yapılan yükleme ve ön işleme işlemleri artık CPU üzerinde gerçekleştirilir ve bunlar GPU’daki çıkarımdan ayrılmıştır. Bu sayede Spark, bu görevleri paralel bir şekilde planlama yapabilir ve GPU belleğinde ekstra model kopyaları oluşturma gereğinden kaçınabilir.
vLLM Sunucu ile Sunum
Triton, özelleşmiş çıkarım mantığını, çoklu çerçeveleri ve çeşitli model türlerini yönetmede mükemmel olsa da, vLLM, LLM’ler için özel olarak optimize edilmiş basit bir sunum alternatifidir. Üretim dağıtımları için OpenAI uyumlu bir HTTP sunucusu sağlar.
vLLM sunumunu Spark üzerinde kullanmak için yardımcı sınıfımızdaki VLLMServerManager‘ı kullanıyoruz. Önceki Şekil 3 gibi, bu yaklaşım her Spark yürütücüsünde bir vLLM sunucu süreci başlatır ve CPU ile GPU yürütmesini ayırır. Özelleştirilmiş bir sunucu işlevi yerine, sunucuları başlatırken desteklenen vLLM CLI argümanlarından herhangi birini geçebilirsiniz:
from server_utils import VLLMServerManager
server_manager = VLLMServerManager(model_name="qwen-2.5-7b",
model_path="/path/to/Qwen2.5-7B")
server_manager.start_servers(gpu_memory_utilization=0.95,
max_model_len=6600,
task="generate")
host_to_http_url = server_manager.host_to_http_url
Benzer bir şekilde, sunucuya istek göndererek dağıtık çıkarım gerçekleştirebilirsiniz. Bu durumda, Open-AI uyumlu JSON formatını kullanarak bunu gerçekleştirebilirsiniz:
def vllm_fn(model_name, host_to_url):
import socket
import numpy as np
import requests
url = host_to_url[socket.gethostname()]
def predict(inputs):
response = requests.post(
f"{url}/v1/completions",
json={
"model": model_name,
"prompt": inputs.tolist(),
"max_tokens": 256,
}
)
return np.array([r["text"] for r in response.json()["choices"]])
return predict
generate = predict_batch_udf(partial(vllm_fn, model_name="qwen-2.5-7b", host_to_url=host_to_http_url),
return_type=StringType(),
batch_size=32)
# Çıkarma işlemini çalıştır
preds = text_df.withColumn("response", generate("prompt"))
# İşimiz bitince sunucuları durdur
server_manager.stop_servers()
Özet: Dağıtım Stratejinizi Seçme
Her iki dağıtım yaklaşımını da inceledikten sonra, uygulamanızı yönlendirecek olan güçlü ve zayıf yönlerini karşılaştırabiliriz. Genelde, basit prototiplendirme için temel yaklaşımı, esneklik ve kaynak ayrımı temizliği içinse gelişmiş yaklaşımı öneriyoruz. Aşağıda bu konuda bir tablo bulunmaktadır:
Değerlendirme | Temel Dağıtım (predict_batch_udf) | Gelişmiş Dağıtım (Çıkarma Sunucusu) |
Kaynak Yönetimi | GPU belleğini konumlandırmak için görev paralelliğini ayarlamak gerekmektedir. | —CPU ve GPU zamanlamasını ayırdığı için görev paralelliği ayarı gerekmez. |
Kurulum Karmaşıklığı | Basit, doğrudan çerçeve kodunuzu aktarın. | Sunucu yöneticisi kullanımı ile basit, fakat ek istemci/sunucu kodu gerektirir. |
Çıkarma Özellikleri | Çerçevenin sunduğu özelliklerle sınırlı | Ek sunucuya özgü özellikler (dinamik toplama, model bileşimleri) |
Taşınabilirlik | Çıkarma kodu UDF’ye özeldir. | Sunucu içinde bir kez tanımlanan çıkarım mantığını çevrimiçi/çevrimdışı uygulamalarda yeniden kullanın. |
En İyisi | Küçük modeller, basit boru hatları ve prototipleme için | Daha büyük modeller ve karmaşık boru hatları için |
Bulut Platformlarında Dağıtım
Daha önceki blogumuz yerel dağıtımı göstermişti, fakat Spark-RAPIDS-Examples DL Inference repo‘nu güncelledik ve ihtiyaç duyduğunuz her şeyi bulut SPARK kümelerinde DL/LLM çıkarım iş yüklerini dağıtmak için sağlıyoruz.
Buluta Hazır Şablonlar
CSP talimatları, Databricks ve Dataproc Spark ortamlarına yönelik dağıtım ve çalışma işlemeni ayarlamak için buluta hazır şablonlar sağlar. Bu, aşağıdakileri kapsar:
- Grupları başlatıp gerekli kütüphaneleri yüklemek için önceden yapılandırılmış başlatma betikleri.
- Batch çıkarım için önerilen Spark yapılandırmaları.
- Modelleri bulut depolama alanına kaydetme ve yükleme konusunda en iyi uygulamalar.
Notlar, hiçbir kod değişikliği gerektirmeden, yerel bağımsız, Databricks veya Dataproc kümeleri dahil her ortamda uçtan uca çalışacak şekilde yapılandırılmıştır.
GPU Örneklerini Yapılandırma
Örnekleri çalıştırmak için, yeterli GPU belleğini sağlamak amacıyla A10/L4 veya daha yeni GPU örneklerini öneriyoruz (örneğin, NVadsA10, g5/g6, g2-standard). Büyük LLM’ler için A100/H100 GPU’ları daha iyi bir seçimdir—örneğin, yarı hassasiyetle çalışan Llama 70b, iki H100’de rahatlıkla çalışır.
Büyük modeller için çoklu GPU’lara yaymak sıklıkla gerekli olmaktadır ve birden fazla GPU’nun bulunduğu Spark kümelerinde bunu gerçekleştirmek mümkündür. spark.executor.resource.gpu.amount=(node başına GPU)
ayarını yaparak, her yürütücüye gerekli GPU’ları tahsis edin. Böylelikle çıkarım sunucularına görünür hale gelir. Model daha sonra çerçeve aracılığıyla eşit olarak paralelleştirilebilir: örneğin, vLLM ile tensor_parallel_size=(node başına GPU)
ayarını yaparak. vLLM tensor paralelliği örneğine göz atabilirsiniz.
Başlarken
Başlamak için, örnek notlara göz atabilir, bunlar açık kaynaklı modeller ve veri setleri kullanarak görüntü sınıflandırması, duygu analizi, belge özetleme gibi bir dizi kullanımı gösteren uçtan uca Spark uygulamalarını içermektedir. Bu uygulamaları bulutta dağıtmak için lütfen CSP platformlarında çalışma kılavuzumuzu inceleyin.