-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[data] continue grabbing task state until response is not None #60592
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[data] continue grabbing task state until response is not None #60592
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request aims to fix a race condition where get_task could return None for a hanging task if queried too quickly. The proposed change correctly adds a condition to re-fetch the task state if it was previously None. However, this introduces a subtle bug where the hanging task timer is incorrectly reset, which could delay or prevent the detection of hanging tasks. I've added a comment with details on the issue.
Regarding your question on testing, this race condition could be tested by mocking ray.util.state.get_task to return None on the first call for a given task, and a valid TaskState on a subsequent call. You could then assert that the task state is eventually populated in the detector's internal state and that the hanging issue is correctly reported with the full task details.
python/ray/data/_internal/issue_detection/detectors/hanging_detector.py
Outdated
Show resolved
Hide resolved
| # NOTE: The task_id + node_id will not change once we grab the task state. | ||
| # Therefore, we can avoid an rpc call if we have already retrieved state info. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I felt confused while reading this code because I don't think it's obvious that task_id and node_id are fields on the task_state dataclass. Could you clarify?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, what about all of the other fields that can possibly change? Do we not care about those?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TaskState is defined by core: https://github.com/iamjustinhsu/ray/blob/d35d310a0759a0112335e6a74583ebe164a7d648/python/ray/util/state/common.py#L731. My previous implementation assume that tasks cannot change their node_id, or task_id. Upon thinking about this more, I'm not sure that is true if a task is retried. Because of this and the interest of simplicity, I decided to grab the new state every time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Description
Previously, I added
task_id,node_id, andattempt_numberfor hanging tasks in #59793. However, this introduced a race condition when querying for task state:get_taskreturnsNonehttps://github.com/iamjustinhsu/ray/blob/75f9731f69f4b9c7b973f53b74d0580adb3c4ab9/python/ray/data/_internal/issue_detection/detectors/hanging_detector.py#L161 because task state not ready.for 2), we only fire off when the task wasn't hanging before, or if the task has produced bytes since last checked. My fix is to also check if
previous_state.task_stateisNonetooI ran this many times, and the race condition stopped. Open to ideas on testing this too
Related issues
Additional information