Skip to content

Worker options

了解建立 worker 可用的選項。

WorkerOptions parameters

建立 worker 的介面是透過 WorkerOptions 類別實現的。以下僅包含部分可用參數。完整清單請參閱 WorkerOptions 參考

Python:

opts = WorkerOptions(
    # 當作業被指派給此 Worker 時,會呼叫 entrypoint 函數
    # 這是 WorkerOptions 中唯一必要的參數
    # https://docs.livekit.io/agents/worker/job/#entrypoint
    entrypoint_fnc,

    # 檢查請求並決定目前 worker 是否應該處理它。
    request_fnc,

    # 一個在新進程啟動時需要被執行的必要初始化的函數。
    prewarm_fnc,

    # 一個收集目前系統負載(CPU 或 RAM 等)的 callback 函數。
    load_fnc,

    # load_fnc 的最大值,超過該值將不會產生新的 job 流程
    load_threshold,

    # 代理是否可以訂閱軌道、發布資料、更新元資料等。
    permissions,

    # 收到 SIGTERM 或 SIGINT 時等待現有作業完成的時間
    drain_timeout,

    # 要建立的工作者類型,JT_ROOM 或 JT_PUBLISHER
    worker_type=WorkerType.ROOM,
)

# start the worker
cli.run_app(opts)

Warn

🔥 Caution

出於安全目的,將 LiveKit API 金鑰和機密設定為環境變量,而不是 WorkerOptions 參數。

Entrypoint

entrypoint_fnc 是每個新作業呼叫的主函數,也是代理應用的核心。要了解更多信息,請參閱作業生命週期文章中的 entrypoint 文件

Python:

async def entrypoint(ctx: JobContext):
    # connect to the room
    await ctx.connect()
    # handle the session
    ...

Request handler

每當 LiveKit 伺服器有代理程式的作業時,都會執行 request_fnc 函數。框架要求 worker 明確接受或拒絕每個 job request。如果接受請求,則會呼叫 entrypoint_fnc 函數。如果請求被拒絕,則會將其傳送到下一個可用的 worker。

預設情況下,如果留空,則行為是自動接受所有傳送給工作器的請求。

Python:

async def request_fnc(req: JobRequest):
    # accept the job request
    await req.accept(
        # the agent's name (Participant.name), defaults to ""
        name="agent",
        # the agent's identity (Participant.identity), defaults to "agent-<jobid>"
        identity="identity",
        # attributes to set on the agent participant upon join
        attributes={"myagent": "rocks"},
    )

    # or reject it
    # await req.reject()

opts = WorkerOptions(entrypoint_fnc=entrypoint, request_fnc=request_fnc)

Prewarm function

出於隔離和效能方面的考慮,agent fraemwork 會在每個代理作業的獨立進程中運行它們。代理通常需要存取需要較長時間載入的模型檔案。為了解決這個問題,您可以使用 prewarm 函數在向其分配任何作業之前預熱該進程。您可以使用 num_idle_processes 參數控制需要預熱的進程數量。

Python:

def prewarm_fnc(proc: JobProcess):
    # 載入 silero 權重並儲存以處理使用者數據
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    # 存取已載入的 silero 實例
    vad: silero.VAD = ctx.proc.userdata["vad"]

opts = WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm_fnc)

Load function

load_fncload_threshold 控制 worker 何時停止接受新 jobs。

  • load_fnc: 以 0 到 1.0 之間的浮點數傳回 worker 目前負載的函數。
  • load_threshold: Worker 仍能接受新工作的最大負載值。

如果未提供 load_fnc ,則預設使用 worker 在 5 秒視窗內的平均 CPU 使用率。

以下範例展示如何定義一個自訂負載函數,將工作器限制為 9 個併發作業,且不受 CPU 使用率的影響:

from livekit.agents import Worker, WorkerOptions

def compute_load(worker: Worker) -> float:
    return min(len(worker.active_jobs) / 10, 1.0)

opts = WorkerOptions(
    load_fnc=compute_load,
    load_threshold=0.9,
)

Drain timeout

由於 agent sessions 是有狀態的,因此在進程關閉時不應突然終止它們。代理框架支援優雅終止:當收到 SIGTERMSIGINT 訊號時,worker 將進入 draining 狀態。在此狀態下,它會停止接受新任務,但允許現有任務完成,最長可達配置的逾時時間。

drain_timeout 參數設定等待活動作業完成的最長時間。預設值為 30 分鐘。

Permissions

預設情況下,代理可以向同一房間內的其他參與者發布訊息,也可以向他們訂閱訊息。不過,您可以透過設定 WorkerOptions 中的 permissions 參數來自訂這些權限。要查看完整的參數列表,請參閱 WorkerPermissions 參考

Python:

opts = WorkerOptions(
    ...
    permissions=WorkerPermissions(
        can_publish=True,
        can_subscribe=True,
        can_publish_data=True,
        # 設定為 true 時,房間中的其他人將看不到該代理。
        # 隱藏時,它也無法將軌跡發佈到房間,因為它不可見。
        hidden=False,
    ),
)

Worker type

您可以選擇為每個房間或房間內的每個發布者啟動一個新的 agent 實例。您可以在註冊 worker 時進行設定:

Python:

opts = WorkerOptions(
    ...
    # when omitted, the default is WorkerType.ROOM
    worker_type=WorkerType.ROOM,
)

Node.js:

const opts = new WorkerOptions({
  // when omitted, the default is JobType.JT_ROOM
  workerType: JobType.JT_ROOM,
});

WorkerType 枚舉有兩個選項: - ROOM: 為每個房間建立一個新的代理實例。 - PUBLISHER: 為房間中的每個發布者建立一個新的代理實例。

如果代理程式在可能包含多個發布者的房間中執行資源密集型操作(例如,處理來自一組安全攝影機的傳入視訊),則可以將 worker_type 設定為 JT_PUBLISHER,以確保每個發布者都有自己的代理實例。

對於 PUBLISHER jobs,為房間中的每個發布者呼叫一次 entrypoint 函數。 JobContext.publisher 物件包含一個代表該發布者的 RemoteParticipant

Starting the worker

若要使用 WorkerOptions 定義的設定啟動 worker,請呼叫 CLI:

Python:

if __name__ == "__main__":
    cli.run_app(opts)

Node.js:

cli.runApp(opts);

Agents 工作器 CLI 提供了兩個子指令: startdev。前者將原始 JSON 資料輸出到標準輸出,建議用於生產環境。而 dev 則建議用於開發環境,因為它輸出人性化的彩色日誌,並且支援 Python 的熱重載。