-
Notifications
You must be signed in to change notification settings - Fork 146
feat: preserve initial message in task history by reusing TaskManager #801
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -14,10 +14,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.server.tasks.TaskSerializationException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.server.tasks.TaskStore; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.A2AError; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.A2AServerException; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.Event; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.InternalError; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.Message; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.Task; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.StreamingEventKind; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.a2aproject.sdk.spec.TaskArtifactUpdateEvent; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -207,7 +205,7 @@ private void processEvent(MainEventBusContext context) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // If this throws, we distribute an error to ensure "persist before client visibility" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean isFinal = updateTaskStore(taskId, event, isReplicated); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean isFinal = updateTaskStore(taskId, event, isReplicated, mainQueue); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| eventToDistribute = event; // Success - distribute original event | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -279,9 +277,10 @@ private void processEvent(MainEventBusContext context) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Updates TaskStore using TaskManager.process(). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * <p> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Creates a temporary TaskManager instance for this event and delegates to its process() method, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * This leverages existing TaskManager logic for status updates, artifact appending, message history, etc. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * Attempts to reuse the TaskManager from the MainQueue (if available), which preserves | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * the initial message in the task's history. Falls back to creating a temporary TaskManager | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * if none is available. The TaskManager.process() method handles all event types | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * </p> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * <p> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * If persistence fails, the exception is propagated to processEvent() which distributes an | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -291,21 +290,32 @@ private void processEvent(MainEventBusContext context) { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param taskId the task ID | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param event the event to persist | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param isReplicated whether this is a replicated event | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param mainQueue the main queue for this task | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @return true if the task reached a final state, false otherwise | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @throws InternalError if persistence fails | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private boolean updateTaskStore(String taskId, Event event, boolean isReplicated) throws InternalError { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private boolean updateTaskStore(String taskId, Event event, boolean isReplicated, EventQueue.MainQueue mainQueue) throws InternalError { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Extract contextId from event (all relevant events have it) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| String contextId = extractContextId(event); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Create temporary TaskManager instance for this event | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Try to get TaskManager from MainQueue (preserves initial message for new tasks) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Falls back to creating a temporary one for existing tasks | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| TaskManager taskManager = mainQueue.getInitialMessageTaskManager(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (taskManager == null) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Create temporary TaskManager instance for this event | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| taskManager = new TaskManager(taskId, contextId, taskStore, null); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.debug("Created temporary TaskManager for task {} (no TaskManager on queue)", taskId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.debug("Reusing TaskManager from queue for task {} (preserves initial message)", taskId); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+303
to
+312
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To prevent memory leaks and ensure the
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Use TaskManager.process() - handles all event types with existing logic | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| boolean isFinal = taskManager.process(event, isReplicated); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {} (final: {}, replicated: {})", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| taskId, event.getClass().getSimpleName(), isFinal, isReplicated); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return isFinal; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (TaskSerializationException e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clearing the
TaskManagerhere introduces a race condition for non-blocking requests (returnImmediately=true). InDefaultRequestHandler.onMessageSend, theChildQueueis closed immediately in thefinallyblock (line 617). If theMainEventBusProcessorhasn't processed the first event yet, theTaskManager(and thus the initial message) will be lost before it can be used. It is safer to clear theTaskManagerinMainEventBusProcessorafter its first successful use, or inMainQueue.doCloseas a final cleanup.