Sending text¶
使用文字流在參與者之間發送任意數量的文字。
Overview¶
文字流提供了一種在參與者之間即時發送文字的簡單方法,支援聊天、串流 LLM 回應等用例。每個單獨的串流都與一個主題相關聯,並且您必須註冊一個處理程序 (handler) 來接收該主題的傳入流。流可以針對特定參與者或整個房間。
若要傳送其他類型的數據,請改用 byte streams。
Sending all at once¶
當整個字串預先可用時,使用 sendText
方法。輸入字串會自動分塊並串流傳輸,因此字串大小沒有限制。
Streaming incrementally¶
如果您的文字是增量生成的,請使用 streamText
開啟流寫入器。發送完資料後,必須明確關閉流。
writer = await room.local_participant.stream_text(
topic="my-topic",
)
print(f"Opened text stream with ID: {writer.stream_id}")
# In a real application, you might receive chunks of text from an LLM or other source
text_chunks = ["Lorem ", "ipsum ", "dolor ", "sit ", "amet..."]
for chunk in text_chunks:
await writer.write(chunk)
await writer.close()
print(f"Closed text stream with ID: {writer.stream_id}")
const streamWriter = await room.localParticipant.streamText({
topic: 'my-topic',
});
console.log(`Opened text stream with ID: ${streamWriter.info.id}`);
// In a real app, you would generate this text asynchronously / incrementally as well
const textChunks = ["Lorem ", "ipsum ", "dolor ", "sit ", "amet..."]
for (const chunk of textChunks) {
await streamWriter.write(chunk)
}
// The stream must be explicitly closed when done
await streamWriter.close();
console.log(`Closed text stream with ID: ${streamWriter.info.id}`);
const streamWriter = await room.localParticipant.streamText({
topic: 'my-topic',
});
console.log(`Opened text stream with ID: ${streamWriter.info.id}`);
// In a real app, you would generate this text asynchronously / incrementally as well
const textChunks = ["Lorem ", "ipsum ", "dolor ", "sit ", "amet..."]
for (const chunk of textChunks) {
await streamWriter.write(chunk)
}
// The stream must be explicitly closed when done
await streamWriter.close();
console.log(`Closed text stream with ID: ${streamWriter.info.id}`);
let writer = try await room.localParticipant
.streamText(for: "my-topic")
print("Opened text stream with ID: \(writer.info.id)")
// In a real application, you might receive chunks of text from an LLM or other source
let textChunks = ["Lorem ", "ipsum ", "dolor ", "sit ", "amet..."]
for chunk in textChunks {
try await writer.write(chunk)
}
// The stream must be explicitly closed when done
try await writer.close()
print("Closed text stream with ID: \(writer.info.id)")
let options = StreamTextOptions {
topic: "my-topic".to_string(),
..Default::default()
};
let stream_writer = room.local_participant()
.stream_text(options).await?;
let id = stream_writer.info().id.clone();
println!("Opened text stream with ID: {}", id);
let text_chunks = ["Lorem ", "ipsum ", "dolor ", "sit ", "amet..."];
for chunk in text_chunks {
stream_writer.write(&chunk).await?;
}
// The stream can be closed explicitly or will be closed implicitly
// when the last writer is dropped
stream_writer.close().await?;
println!("Closed text stream with ID: {}", id);
// In a real application, you would generate this text asynchronously / incrementally as well
textChunks := []string{"Lorem ", "ipsum ", "dolor ", "sit ", "amet..."}
writer := room.LocalParticipant.SendText(livekit.StreamTextOptions{
Topic: "my-topic",
})
for i, chunk := range textChunks {
// Close the stream when the last chunk is sent
onDone := func() {
if i == len(textChunks) - 1 {
writer.Close()
}
}
writer.Write(chunk, onDone)
}
fmt.Printf("Closed text stream with ID: %s\n", writer.Info.ID)
Handling incoming streams¶
無論資料是透過 sendText
還是 streamText
發送的,它總是以流的形式接收。您必須註冊一個處理程序來接收它。
import asyncio
# Store active tasks to prevent garbage collection
_active_tasks = set()
async def async_handle_text_stream(reader, participant_identity):
info = reader.info
print(
f'Text stream received from {participant_identity}\n'
f' Topic: {info.topic}\n'
f' Timestamp: {info.timestamp}\n'
f' ID: {info.id}\n'
f' Size: {info.size}' # Optional, only available if the stream was sent with `send_text`
)
# Option 1: Process the stream incrementally using an async for loop.
async for chunk in reader:
print(f"Next chunk: {chunk}")
# Option 2: Get the entire text after the stream completes.
text = await reader.read_all()
print(f"Received text: {text}")
def handle_text_stream(reader, participant_identity):
task = asyncio.create_task(async_handle_text_stream(reader, participant_identity))
_active_tasks.add(task)
task.add_done_callback(lambda t: _active_tasks.remove(t))
room.register_text_stream_handler(
"my-topic",
handle_text_stream
)
room.registerTextStreamHandler('my-topic', (reader, participantInfo) => {
const info = reader.info;
console.log(
`Received text stream from ${participantInfo.identity}\n` +
` Topic: ${info.topic}\n` +
` Timestamp: ${info.timestamp}\n` +
` ID: ${info.id}\n` +
` Size: ${info.size}` // Optional, only available if the stream was sent with `sendText`
);
// Option 1: Process the stream incrementally using a for-await loop.
for await (const chunk of reader) {
console.log(`Next chunk: ${chunk}`);
}
// Option 2: Get the entire text after the stream completes.
const text = await reader.readAll();
console.log(`Received text: ${text}`);
});
room.registerTextStreamHandler('my-topic', (reader, participantInfo) => {
const info = reader.info;
console.log(
`Received text stream from ${participantInfo.identity}\n` +
` Topic: ${info.topic}\n` +
` Timestamp: ${info.timestamp}\n` +
` ID: ${info.id}\n` +
` Size: ${info.size}` // Optional, only available if the stream was sent with `sendText`
);
// Option 1: Process the stream incrementally using a for-await loop.
for await (const chunk of reader) {
console.log(`Next chunk: ${chunk}`);
}
// Option 2: Get the entire text after the stream completes.
const text = await reader.readAll();
console.log(`Received text: ${text}`);
});
try await room.localParticipant
.registerTextStreamHandler(for: "my-topic") { reader, participantIdentity in
let info = reader.info
print("""
Text stream received from \(participantIdentity)
Topic: \(info.topic)
Timestamp: \(info.timestamp)
ID: \(info.id)
Size: \(info.size) (only available if the stream was sent with `sendText`)
""")
// Option 1: Process the stream incrementally using a for-await loop
for try await chunk in reader {
print("Next chunk: \(chunk)")
}
// Option 2: Get the entire text after the stream completes
let text = try await reader.readAll()
print("Received text: \(text)")
}
Rust API 與其他 SDK 略有不同。如果您希望處理流程,則無需註冊主題處理程序,而是處理 TextStreamOpened
房間事件並從事件中取得讀取器。
while let Some(event) = room.subscribe().recv().await {
match event {
RoomEvent::TextStreamOpened { reader, topic, participant_identity } => {
if topic != "my-topic" { continue };
let Some(mut reader) = reader.take() else { continue };
let info = reader.info();
println!("Text stream received from {participant_identity}");
println!(" Topic: {}", info.topic);
println!(" Timestamp: {}", info.timestamp);
println!(" ID: {}", info.id);
println!(" Size: {:?}", info.total_length);
// Option 1: Process the stream incrementally as a Stream
// using `TryStreamExt` from the `futures_util` crate
while let Some(chunk) = reader.try_next().await? {
println!("Next chunk: {chunk}");
}
// Option 2: Get the entire text after the stream completes
let text = reader.read_all().await?;
println!("Received text: {text}");
}
_ => {}
}
}
room.RegisterTextStreamHandler(
"my-topic",
func(reader livekit.TextStreamReader, participantIdentity livekit.ParticipantIdentity) {
fmt.Printf("Text stream received from %s\n", participantIdentity)
// Option 1: Process the stream incrementally
res := ""
for {
// ReadString takes a delimiter
word, err := reader.ReadString(' ')
fmt.Printf("read word: %s\n", word)
res += word
if err != nil {
// EOF represents the end of the stream
if err == io.EOF {
break
} else {
fmt.Printf("failed to read text stream: %v\n", err)
break
}
}
}
// Similar to ReadString, there is Read(p []bytes), ReadByte(), ReadBytes(delim byte) and ReadRune() as well
// All of these methods return io.EOF when the stream is closed
// If the stream has no data, it will block until there is data or the stream is closed
// If the stream has data, but not as much as requested, it will return what is available without any error
// Option 2: Get the entire text after the stream completes
text := reader.ReadAll()
fmt.Printf("received text: %s\n", text)
},
)
Stream properties¶
這些是文字流上可用的所有屬性,可以從 send/stream
方法中設定或從處理程序(handler)中讀取。
Property | Description | Type |
---|---|---|
id | 此流的唯一識別碼。 | string |
topic | 用於將流路由到適當處理程序的主題名稱。 | string |
timestamp | 流的創建時間。 | number |
size | 預計總大小(以位元組為單位,UTF-8 編碼)(如果知道)。 | number |
attributes | 您的應用程式所需的附加屬性。 | string dict |
destinationIdentities | 發送流的參與者的身分。如果為空,則發送給所有人。 | array |
Concurrency¶
可以同時寫入或讀取多個流。如果對相同主題多次呼叫 sendText
或 streamText
,則接收方的處理程序將被呼叫多次,每個流調用一次。這些呼叫將按照發送方打開流的順序發生,並且流讀取器將按照發送方關閉流的順序關閉。
Joining mid-stream¶
在直播開始後加入房間的參與者將不會收到任何訊息。只有在串流開啟時連接的參與者才有資格接收它。
No message persistence¶
LiveKit 不包含文字流的長期持久性。所有數據僅在連接的參與者之間即時傳輸。如果您需要訊息歷史記錄,則需要使用資料庫或其他持久層自行實現儲存。
Chat components¶
LiveKit 為聊天等常見的文字流用例提供了預先建立的 React 元件。有關詳細信息,請參閱 Chat component 和 useChat hook。
Info
流是一種簡單而強大的發送文字的方式,但如果您需要精確控制單個資料包的行為,則較低級別的 data packets API 可能更合適。