# For licensing see accompanying LICENSE file.
# Copyright (C) 2026 Apple Inc. All Rights Reserved.
import asyncio
import json
import logging
from apple_fm_sdk.transcript import Transcript
from .c_helpers import (
_ManagedObject,
_register_handle,
_session_callback,
_session_structured_callback,
_unregister_handle,
StreamingCallback,
)
from .core import SystemLanguageModel
from .tool import Tool
from .generable import Generable, GeneratedContent
from .generation_schema import GenerationSchema
import threading
import queue
from typing import Any, Optional, AsyncIterator, Type, Union, overload
from .errors import FoundationModelsError
import ctypes
logger = logging.getLogger(__name__)
try:
from . import _ctypes_bindings as lib
except ImportError:
raise ImportError(
"Foundation Models C bindings not found. Please ensure _foundationmodels_ctypes.py is available."
)
Prompt = str # Alias for prompt type
[docs]
class LanguageModelSession(_ManagedObject):
"""Represents a language model session for foundation model interactions.
A ``LanguageModelSession`` manages the lifecycle of a session with a foundation model,
maintaining session history (transcript), handling tool calls, and providing both
synchronous and streaming response capabilities.
The session is thread-safe for sequential requests but does not support concurrent
requests. If a request is in progress, attempting another request will wait for the
first to complete.
**Session Lifecycle:**
1. **Creation**: Initialize with optional instructions, model configuration, and tools
2. **Active Use**: Make requests via ``respond()`` or ``stream_response()``
3. **Cleanup**: Automatically handled via context manager or explicit cleanup
**Concurrent Request Handling:**
Sessions use an internal lock to prevent concurrent requests. If you need to handle
multiple requests simultaneously, create multiple session instances.
Examples:
Basic session creation and usage::
import apple_fm_sdk as fm
# Create a simple session
session = fm.LanguageModelSession()
response = await session.respond("Hello, how are you?")
print(response)
Session with instructions::
import apple_fm_sdk as fm
# Guide the model's behavior with instructions
session = fm.LanguageModelSession(
instructions="You are a helpful bird expert. Provide concise, "
"accurate information about birds."
)
response = await session.respond("What is a Swift?")
Session with custom model and tools::
import apple_fm_sdk as fm
from my_tools import CalculatorTool, WeatherTool
model = fm.SystemLanguageModel(
temperature=0.7,
top_p=0.9
)
session = fm.LanguageModelSession(
instructions="You are a helpful assistant with access to tools.",
model=model,
tools=[CalculatorTool(), WeatherTool()]
)
response = await session.respond("What's the weather like in Cupertino?")
See Also:
- :class:`~apple_fm_sdk.core.SystemLanguageModel`: For model configuration
- :class:`~apple_fm_sdk.tool.Tool`: For creating custom tools
- :class:`~apple_fm_sdk.transcript.Transcript`: For accessing session history
"""
[docs]
def __init__(
self,
instructions: Optional[str] = None,
model: Optional[SystemLanguageModel] = None,
tools: Optional[list[Tool]] = None,
_ptr=None,
):
"""Create a language model session.
:param instructions: Optional system instructions to guide the model's behavior throughout
the session. These instructions persist across all requests in the session.
Example: "You are a helpful coding assistant."
:type instructions: Optional[str]
:param model: Optional specialized system model configuration. If not provided, uses default
SystemLanguageModel() with standard settings. Use this to customize temperature,
top_p, and other generation parameters.
:type model: Optional[SystemLanguageModel]
:param tools: Optional list of Tool instances that the model can invoke during generation.
Tools enable the model to perform actions like calculations, API calls, or
database queries. The model will automatically decide when to use tools based
on the session context.
:type tools: Optional[list[Tool]]
:raises FoundationModelsError: If session creation fails
Note:
The session maintains a transcript of all interactions, which can be accessed
via the ``transcript`` property. This transcript is automatically updated after
each request.
"""
# Initialize request lock for preventing concurrent requests
self._request_lock = asyncio.Lock()
self._active_task = None
if _ptr is not None:
# Internal constructor for specific ptr
super().__init__(_ptr)
else:
# Create model pointer
model_ptr = model._ptr if model else None
# Encode instructions if provided
instructions_cstr = None
if instructions:
instructions_cstr = instructions.encode("utf-8")
# Create array of tool pointers
tool_count = len(tools) if tools else 0
tool_refs = (ctypes.c_void_p * tool_count)()
if tools:
for i, tool in enumerate(tools):
tool_refs[i] = tool._ptr
# Create the session via C binding
ptr = lib.FMLanguageModelSessionCreateFromSystemLanguageModel(
model_ptr, instructions_cstr, tool_refs, tool_count
)
# Create transcript
self.transcript = Transcript(ptr)
# model object will be retained by LanguageModelSession in Swift
# so here we don't need to retain model
super().__init__(ptr)
# This opaque pointer already has 1 ref count by `passRetained`
@property
def is_responding(self) -> bool:
"""Check if the session is currently responding to a request.
Returns:
bool: True if the session is currently processing a request, False otherwise
"""
return lib.FMLanguageModelSessionIsResponding(self._ptr)
def _reset_task_state(self):
"""Reset the task memory management state after a cancelled or failed request.
This is an internal function that cleans up task-related state to ensure the
session is ready to accept new requests. It does NOT create a new session or
clear the session transcript - it only resets the internal task handling
state.
This function is automatically called after cancellations and errors, so client
code should not need to call it directly.
"""
lib.FMLanguageModelSessionReset(self._ptr)
@overload # This overload helps the type checker understand the return type
async def respond(self, prompt: Prompt) -> str: ...
@overload # This overload helps the type checker understand the return type
async def respond(
self, prompt: Prompt, *, generating: type[Generable]
) -> Type[Any]: ...
@overload # This overload helps the type checker understand the return type
async def respond(self, prompt: Prompt, *, generating: Generable) -> Type[Any]: ...
@overload # This overload helps the type checker understand the return type
async def respond(
self, prompt: Prompt, *, schema: GenerationSchema
) -> GeneratedContent: ...
@overload # This overload helps the type checker understand the return type
async def respond(self, prompt: str, *, json_schema: dict) -> GeneratedContent: ...
[docs]
async def respond(
self,
prompt: str,
generating: Optional[Union[Type[Generable], Generable]] = None,
*,
schema: Optional[GenerationSchema] = None,
json_schema: Optional[dict] = None,
) -> Union[str, Any, GeneratedContent]:
"""Get a response to a prompt with optional guided generation.
This function supports multiple response modes:
1. **Basic text response**: Returns a plain string
2. **Guided generation with Generable**: Returns a typed Python object
3. **Guided generation with schema**: Returns structured GeneratedContent
4. **Guided generation with JSON schema**: Returns structured GeneratedContent
The session automatically updates its transcript after each response, maintaining
the full session history.
:param prompt: The input prompt string to send to the model
:type prompt: str
:param generating: Optional Generable type or instance for type-safe guided generation.
When provided, the response will be constrained to match the structure of
the Generable type and automatically converted to an instance of that type.
:type generating: Optional[Union[Type[Generable], Generable]]
:param schema: Optional GenerationSchema for explicit schema-based guided generation.
Use this for custom schemas that don't map to a Generable type.
:type schema: Optional[GenerationSchema]
:param json_schema: Optional JSON schema dictionary for guided generation. The schema
should follow JSON Schema specification.
:type json_schema: Optional[dict]
:return: Plain text response if no generation constraints are specified, or
instance of generating type if ``generating`` parameter is provided, or
structured content if ``schema`` or ``json_schema`` is provided
:rtype: Union[str, Any, GeneratedContent]
:raises FoundationModelsError: If the response fails or times out
:raises ValueError: If both ``generating`` and ``schema`` are provided, or if the
generating type is not a valid Generable
:raises asyncio.CancelledError: If the request is cancelled
Examples:
Basic text response::
import apple_fm_sdk as fm
session = fm.LanguageModelSession()
response = await session.respond("What is the capital of France?")
print(response) # Plain string response
Guided generation with Generable type::
import apple_fm_sdk as fm
@fm.generable()
class Cat:
name: str
age: int
profile: str
session = fm.LanguageModelSession()
cat = await session.respond(
"Generate a cat named Maomao",
generating=Cat
)
print(f"{cat.name} is {cat.age} years old")
Multi-turn session::
import apple_fm_sdk as fm
session = fm.LanguageModelSession(
instructions="You are a helpful expert on architecture."
)
# First turn
response1 = await session.respond("What is the tallest building in the world?")
print(response1)
# Second turn - context is maintained
response2 = await session.respond(
"What's the architectural style of that building?"
)
print(response2)
Note:
- Only one of ``generating``, ``schema``, or ``json_schema`` can be specified
- The session maintains session context across multiple ``respond()`` calls
- Concurrent calls to ``respond()`` on the same session will be serialized
- For streaming responses, use :meth:`stream_response` instead
See Also:
- :meth:`stream_response`: For streaming text responses
- :class:`~apple_fm_sdk.generable.Generable`: For creating typed response structures
- :class:`~apple_fm_sdk.generation_schema.GenerationSchema`: For custom schemas
"""
# Validate arguments
if generating is not None and schema is not None:
raise ValueError("Cannot specify both 'generating' and 'schema' arguments")
# Handle guided generation with generable type
if generating is not None:
if not isinstance(generating, Generable):
raise ValueError(
f"{generating.__name__} is not a Generable type. Use @generable decorator."
)
# Get the generation schema for the type
gen_schema = generating.generation_schema()
# Use the schema-based respond method
generated_content = await self._respond_with_schema(prompt, gen_schema)
# Convert GeneratedContent to the target type
return generating._from_generated_content(generated_content)
# Handle guided generation with explicit schema
if schema is not None:
return await self._respond_with_schema(prompt, schema)
# Handle guided generation from raw JSON schema string
if json_schema is not None:
return await self._respond_with_schema_from_json(prompt, json_schema)
# Handle basic text response
return await self._respond_basic(prompt)
async def _respond_basic(self, prompt: str) -> str:
"""Get a complete basic text response to a prompt.
Args:
prompt: The input prompt
Returns:
The complete response text
Raises:
GenerationError: If the response fails with specific error type
FoundationModelsError: For other errors like timeout
ConcurrentRequestsError: If another request is already in progress
"""
# Acquire lock to prevent concurrent requests
async with self._request_lock:
loop = asyncio.get_running_loop()
future = loop.create_future()
prompt_bytes = prompt.encode("utf-8")
future_handle = _register_handle(future)
task = lib.FMLanguageModelSessionRespond(
self._ptr, prompt_bytes, future_handle, _session_callback
)
# Store active task reference
self._active_task = task
try:
await future
except asyncio.CancelledError as e:
# Cancel the native task
lib.FMTaskCancel(task)
future.cancel()
# Wait for the task to actually complete cancellation
# Poll is_responding until it becomes False, with timeout
max_wait_time = 1.0 # Maximum 1 second wait
poll_interval = 0.01 # Poll every 10ms
elapsed = 0.0
while self.is_responding and elapsed < max_wait_time:
await asyncio.sleep(poll_interval)
elapsed += poll_interval
# Reset task state to ensure the session is ready for new requests
self._reset_task_state()
raise e
finally:
# Clean up handle to prevent memory leaks
_unregister_handle(future_handle)
lib.FMRelease(task)
self._active_task = None
return future.result()
async def _respond_with_schema(
self, prompt: str, schema: GenerationSchema
) -> GeneratedContent:
"""Internal method for guided generation using a GenerationSchema."""
# Acquire lock to prevent concurrent requests
async with self._request_lock:
loop = asyncio.get_running_loop()
future = loop.create_future()
prompt_bytes = prompt.encode("utf-8")
future_handle = _register_handle(future)
# Always use the proper C binding for guided generation
task = lib.FMLanguageModelSessionRespondWithSchema(
self._ptr,
prompt_bytes,
schema._ptr,
future_handle,
_session_structured_callback,
)
# Store active task reference
self._active_task = task
try:
await future
except asyncio.CancelledError as e:
# Cancel the native task
lib.FMTaskCancel(task)
future.cancel()
# Wait for the task to actually complete cancellation
# Poll is_responding until it becomes False, with timeout
max_wait_time = 1.0 # Maximum 1 second wait
poll_interval = 0.01 # Poll every 10ms
elapsed = 0.0
while self.is_responding and elapsed < max_wait_time:
await asyncio.sleep(poll_interval)
elapsed += poll_interval
# Reset task state to ensure the session is ready for new requests
self._reset_task_state()
raise e
except Exception as e:
# On any error, reset task state to ensure clean state
self._reset_task_state()
raise e
finally:
# Clean up handle to prevent memory leaks
_unregister_handle(future_handle)
lib.FMRelease(task)
self._active_task = None
return future.result()
async def _respond_with_schema_from_json(
self, prompt: str, json_schema: dict
) -> GeneratedContent:
"""Internal method for guided generation using a JSON schema string."""
# Acquire lock to prevent concurrent requests
async with self._request_lock:
loop = asyncio.get_running_loop()
future = loop.create_future()
prompt_bytes = prompt.encode("utf-8")
json_schema_bytes = json.dumps(json_schema).encode("utf-8")
future_handle = _register_handle(future)
# Use the C binding for guided generation with JSON schema
task = lib.FMLanguageModelSessionRespondWithSchemaFromJSON(
self._ptr,
prompt_bytes,
json_schema_bytes,
future_handle,
_session_structured_callback,
)
# Store active task reference
self._active_task = task
try:
await future
except asyncio.CancelledError as e:
# Cancel the native task
lib.FMTaskCancel(task)
future.cancel()
# Wait for the task to actually complete cancellation
# Poll is_responding until it becomes False, with timeout
max_wait_time = 1.0 # Maximum 1 second wait
poll_interval = 0.01 # Poll every 10ms
elapsed = 0.0
while self.is_responding and elapsed < max_wait_time:
await asyncio.sleep(poll_interval)
elapsed += poll_interval
# Reset task state to ensure the session is ready for new requests
self._reset_task_state()
raise e
except Exception as e:
# On any error, reset task state to ensure clean state
self._reset_task_state()
raise e
finally:
# Clean up handle to prevent memory leaks
_unregister_handle(future_handle)
lib.FMRelease(task)
self._active_task = None
return future.result()
[docs]
async def stream_response(self, prompt: Prompt) -> AsyncIterator:
"""Stream response chunks for a prompt (text only).
This function provides real-time streaming of the model's response, yielding text
snapshots as they become available. Each yielded value represents the complete
response text generated so far, rather than the delta from the previous chunk.
**Streaming Behavior:**
- Yields complete text snapshots (not deltas) as generation progresses
- The final yield contains the complete response
- Automatically updates the session transcript after completion
- Does not support guided generation (text responses only)
- Can be cancelled mid-stream using asyncio cancellation
:param prompt: The input prompt string to send to the model
:type prompt: str
:yields: Progressive snapshots of the response text. Each snapshot contains
the full text generated so far, rather than only the new tokens.
:ytype: str
:raises FoundationModelsError: If streaming fails or encounters an error
:raises asyncio.CancelledError: If the stream is cancelled
Examples:
Basic streaming::
import apple_fm_sdk as fm
session = fm.LanguageModelSession()
async for chunk in session.stream_response("Tell me a story"):
print(chunk, end="", flush=True)
Cancelling a stream::
import asyncio
import apple_fm_sdk as fm
session = fm.LanguageModelSession()
async def stream_with_timeout():
try:
async for chunk in session.stream_response("Write a long essay"):
print(chunk)
# Simulate some processing
await asyncio.sleep(0.1)
except asyncio.CancelledError:
print("Stream cancelled")
raise
# Cancel after 5 seconds
task = asyncio.create_task(stream_with_timeout())
await asyncio.sleep(5)
task.cancel()
Streaming with error handling::
import apple_fm_sdk as fm
session = fm.LanguageModelSession()
try:
async for chunk in session.stream_response("Hello"):
print(chunk)
except fm.FoundationModelsError as e:
print(f"Streaming error: {e}")
Note:
- Streaming currently only supports basic text responses
- For guided generation, use :meth:`respond` instead
- Each snapshot contains the full text, rather than only new tokens
- The session transcript is updated only after streaming completes
- Breaking out of the async for loop early will properly clean up resources
See Also:
- :meth:`respond`: For non-streaming responses with guided generation support
"""
# Handle basic text streaming only
async for chunk in self._stream_response_basic(prompt):
yield chunk
async def _stream_response_basic(self, prompt: Prompt) -> AsyncIterator[str]:
"""Stream basic text response chunks for a prompt.
Args:
prompt: The input prompt
Yields:
Response text snapshots as they become available
"""
callback = StreamingCallback()
stream_thread = None
stream_ptr_holder = [None] # Use list to allow modification in nested function
def _start_stream():
prompt_bytes = prompt.encode("utf-8")
stream_ptr = lib.FMLanguageModelSessionStreamResponse(
self._ptr, prompt_bytes
)
stream_ptr_holder[0] = stream_ptr # Store for cleanup
if not stream_ptr:
callback.error = FoundationModelsError(
"Failed to create response stream"
)
callback.queue.put(None)
callback.completed.set()
return
try:
lib.FMLanguageModelSessionResponseStreamIterate(
stream_ptr, None, callback._callback
)
except Exception as e:
callback.error = FoundationModelsError(f"Stream iteration error: {e}")
callback.queue.put(None)
callback.completed.set()
try:
# Start streaming in a separate thread
stream_thread = threading.Thread(target=_start_stream)
stream_thread.daemon = True
stream_thread.start()
# Yield snapshots as they become available
while True:
try:
# Use a timeout to allow checking for completion
snapshot = callback.queue.get(timeout=0.1)
if snapshot is None: # End signal
break
yield snapshot
except queue.Empty:
# Check if we're done or have an error
if callback.completed.is_set():
# Check for any remaining items in queue
try:
while True:
snapshot = callback.queue.get_nowait()
if snapshot is None:
break
yield snapshot
except queue.Empty:
pass
break
# Continue waiting for more data
continue
# Check for errors after completion
if callback.error:
raise callback.error
finally:
# Ensure the stream thread completes before we exit
# This prevents segfaults when breaking early from the stream
if stream_thread and stream_thread.is_alive():
# Wait for thread to finish - the native iteration must complete
# before we can safely release the stream pointer
stream_thread.join(timeout=2.0)
# Now it's safe to release the stream pointer
# This must happen after the thread completes to prevent segfaults
if stream_ptr_holder[0]:
lib.FMRelease(stream_ptr_holder[0])