From fdf4dd395ba5e45692d7ba3b118f7947eaf24141 Mon Sep 17 00:00:00 2001 From: Min RK Date: Fri, 27 Sep 2024 16:14:02 +0200 Subject: [PATCH 1/3] add `kbatch job logs` allows getting logs by job name, which is usually what folks have rather than needing to do job -> pod lookup --- kbatch-proxy/kbatch_proxy/main.py | 23 +++++++++++-- kbatch/kbatch/__init__.py | 12 ++++--- kbatch/kbatch/_core.py | 54 ++++++++++++++++++++++++------- kbatch/kbatch/cli.py | 50 +++++++++++++++++++++------- 4 files changed, 110 insertions(+), 29 deletions(-) diff --git a/kbatch-proxy/kbatch_proxy/main.py b/kbatch-proxy/kbatch_proxy/main.py index 795ea14..c8b45ae 100644 --- a/kbatch-proxy/kbatch_proxy/main.py +++ b/kbatch-proxy/kbatch_proxy/main.py @@ -229,6 +229,25 @@ async def create_job(request: Request, user: User = Depends(get_current_user)): return _create_job(data, V1Job, user) +@router.get("/jobs/logs/{job_name}/", response_class=Response) +async def job_logs( + job_name: str, + user: User = Depends(get_current_user), + stream: Optional[bool] = False, +): + core_api, _ = get_k8s_api() + pods = core_api.list_namespaced_pod( + namespace=user.namespace, + label_selector=f"batch.kubernetes.io/job-name={job_name}", + ) + if not pods.items: + raise HTTPException( + status.HTTP_404_NOT_FOUND, detail=f"No pods found for job {job_name}" + ) + pod_name = pods.items[0].metadata.name + return await pod_logs(pod_name, user=user, stream=stream) + + # pods # @router.get("/pods/{pod_name}") async def read_pod(pod_name: str, user: User = Depends(get_current_user)): @@ -250,8 +269,8 @@ async def read_pods( return result.to_dict() -@router.get("/jobs/logs/{pod_name}/", response_class=Response) -async def logs( +@router.get("/pods/logs/{pod_name}/", response_class=Response) +async def pod_logs( pod_name: str, user: User = Depends(get_current_user), stream: Optional[bool] = False, diff --git a/kbatch/kbatch/__init__.py b/kbatch/kbatch/__init__.py index ddb7164..03988d1 100644 --- a/kbatch/kbatch/__init__.py +++ b/kbatch/kbatch/__init__.py @@ -6,10 +6,12 @@ configure, delete_job, format_jobs, + job_logs, + job_logs_streaming, list_jobs, list_pods, - logs, - logs_streaming, + pod_logs, + pod_logs_streaming, show_job, submit_job, ) @@ -26,12 +28,14 @@ "Job", "delete_job", "format_jobs", + "job_logs", + "job_logs_streaming", "list_jobs", "list_pods", - "logs_streaming", - "logs", "make_cronjob", "make_job", + "pod_logs", + "pod_logs_streaming", "show_job", "submit_job", ] diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index fc5ccce..a023d36 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -137,7 +137,7 @@ def _request_action( def show_job( - resource_name, + resource_name: str, kbatch_url: str | None = None, token: str | None = None, model: Union[V1Job, V1CronJob] = V1Job, @@ -146,7 +146,7 @@ def show_job( def delete_job( - resource_name, + resource_name: str, kbatch_url: str | None = None, token: str | None = None, model: Union[V1Job, V1CronJob] = V1Job, @@ -167,11 +167,14 @@ def submit_job( kbatch_url: str | None = None, token: str | None = None, model: Union[V1Job, V1CronJob] = V1Job, - code=None, - profile=None, + code: Path | None = None, + profile: str | None = None, ): from ._backend import make_job, make_cronjob + if isinstance(profile, str): + profile = load_profile(profile, kbatch_url=kbatch_url, token=token) + profile = profile or {} if issubclass(model, V1Job): @@ -221,8 +224,32 @@ def list_pods( return r.json() -def logs( - pod_name, +def job_logs( + job_name: str, + kbatch_url: str | None = None, + token: str | None = None, + read_timeout: int = 60, +): + gen = _logs( + job_name, kbatch_url, token, stream=False, read_timeout=read_timeout, kind="job" + ) + result = next(gen) + return result + + +def job_logs_streaming( + job_name: str, + kbatch_url: str | None = None, + token: str | None = None, + read_timeout: int = 60, +): + return _logs( + job_name, kbatch_url, token, stream=True, read_timeout=read_timeout, kind="job" + ) + + +def pod_logs( + pod_name: str, kbatch_url: str | None = None, token: str | None = None, read_timeout: int = 60, @@ -232,8 +259,8 @@ def logs( return result -def logs_streaming( - pod_name, +def pod_logs_streaming( + pod_name: str, kbatch_url: str | None = None, token: str | None = None, read_timeout: int = 60, @@ -242,7 +269,12 @@ def logs_streaming( def _logs( - pod_name, kbatch_url, token, stream: Optional[bool] = False, read_timeout: int = 60 + name, + kbatch_url, + token, + stream: Optional[bool] = False, + read_timeout: int = 60, + kind: str = "pod", ): config = load_config() client = httpx.Client( @@ -258,7 +290,7 @@ def _logs( if stream: with client.stream( "GET", - urllib.parse.urljoin(kbatch_url, f"jobs/logs/{pod_name}/"), + urllib.parse.urljoin(kbatch_url, f"{kind}s/logs/{name}/"), headers=headers, params=dict(stream=stream), ) as r: @@ -267,7 +299,7 @@ def _logs( else: r = client.get( - urllib.parse.urljoin(kbatch_url, f"jobs/logs/{pod_name}/"), + urllib.parse.urljoin(kbatch_url, f"{kind}s/logs/{name}/"), headers=headers, ) r.raise_for_status() diff --git a/kbatch/kbatch/cli.py b/kbatch/kbatch/cli.py index 7aba6e7..e1e66ef 100644 --- a/kbatch/kbatch/cli.py +++ b/kbatch/kbatch/cli.py @@ -80,7 +80,7 @@ def cronjob(): @cronjob.command(name="show") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.argument("cronjob_name") def show_cronjob(cronjob_name, kbatch_url, token): """Show the details for a cronjob.""" @@ -90,7 +90,7 @@ def show_cronjob(cronjob_name, kbatch_url, token): @cronjob.command(name="delete") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.argument("cronjob_name") def delete_cronjob(cronjob_name, kbatch_url, token): """Delete a cronjob, cancelling running jobs and pods.""" @@ -100,7 +100,7 @@ def delete_cronjob(cronjob_name, kbatch_url, token): @cronjob.command(name="list") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.option( "-o", "--output", @@ -203,7 +203,7 @@ def job(): @job.command(name="show") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.argument("job_name") def show_job(job_name, kbatch_url, token): """Show the details for a job.""" @@ -213,7 +213,7 @@ def show_job(job_name, kbatch_url, token): @job.command(name="delete") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.argument("job_name") def delete_job(job_name, kbatch_url, token): """Delete a job, cancelling running pods.""" @@ -223,7 +223,7 @@ def delete_job(job_name, kbatch_url, token): @job.command(name="list") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.option( "-o", "--output", @@ -312,6 +312,32 @@ def submit_job( print(result["metadata"]["name"]) +@job.command("logs") +@click.argument("job_name") +@click.option("--kbatch-url", help="URL to the kbatch server.") +@click.option("--token", help="Auth token") +@click.option("--stream/--no-stream", help="Whether to stream the logs", default=False) +@click.option("--read-timeout", help="Timeout for reading data", default=60, type=int) +@click.option("--pretty/--no-pretty", default=True) +def job_logs(job_name, kbatch_url, token, stream, pretty, read_timeout): + """Get the logs for a kbatch job.""" + if pretty: + print = rich.print + + if stream: + result = _core.job_logs_streaming( + job_name, kbatch_url, token, read_timeout=read_timeout + ) + else: + result = _core.job_logs(job_name, kbatch_url, token, read_timeout=read_timeout) + + if stream: + for line in result: + print(line) + else: + print(result) + + # POD @cli.group() def pod(): @@ -321,7 +347,7 @@ def pod(): @pod.command(name="list") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.option( "--job-name", help="The name of the job to limit the results to.", default=None ) @@ -348,24 +374,24 @@ def list_pods(kbatch_url, token, job_name, output): # TODO show pod -@pod.command() +@pod.command("logs") @click.argument("pod_name") @click.option("--kbatch-url", help="URL to the kbatch server.") -@click.option("--token", help="File to execute.") +@click.option("--token", help="kbatch auth token") @click.option("--stream/--no-stream", help="Whether to stream the logs", default=False) @click.option("--read-timeout", help="Timeout for reading data", default=60, type=int) @click.option("--pretty/--no-pretty", default=True) -def logs(pod_name, kbatch_url, token, stream, pretty, read_timeout): +def pod_logs(pod_name, kbatch_url, token, stream, pretty, read_timeout): """Get the logs for a kbatch pod.""" if pretty: print = rich.print if stream: - result = _core.logs_streaming( + result = _core.pod_logs_streaming( pod_name, kbatch_url, token, read_timeout=read_timeout ) else: - result = _core.logs(pod_name, kbatch_url, token, read_timeout=read_timeout) + result = _core.pod_logs(pod_name, kbatch_url, token, read_timeout=read_timeout) if stream: for line in result: From 40d5b4fe0ba44e46dacef0568615ea032fa128fe Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 30 Sep 2024 11:50:05 +0200 Subject: [PATCH 2/3] update test for job logs --- kbatch/tests/test_core.py | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/kbatch/tests/test_core.py b/kbatch/tests/test_core.py index c5026cc..f4f81fa 100644 --- a/kbatch/tests/test_core.py +++ b/kbatch/tests/test_core.py @@ -5,6 +5,7 @@ import re import pathlib import zipfile +from types import GeneratorType import httpx import respx @@ -224,27 +225,22 @@ def test_list_pods(respx_mock: respx.MockRouter): assert result == data -def test_logs(respx_mock: respx.MockRouter): +@pytest.mark.parametrize("kind", ["pod", "job"]) +@pytest.mark.parametrize("streaming", ["streaming", ""]) +def test_logs(respx_mock: respx.MockRouter, kind, streaming): data = HERE.joinpath("data", "list_jobs.json").read_text() - respx_mock.get("http://kbatch.com/jobs/logs/mypod/").mock( - return_value=httpx.Response(200, text=data) - ) - - result = kbatch.logs("mypod", "http://kbatch.com/", token="abc") - assert result == data - - -def test_logs_streaming(respx_mock: respx.MockRouter): - data = HERE.joinpath("data", "list_jobs.json").read_text() - respx_mock.get("http://kbatch.com/jobs/logs/mypod/").mock( + respx_mock.get(f"http://kbatch.com/{kind}s/logs/mypod/").mock( return_value=httpx.Response(200, text=data) ) + if streaming: + logs = getattr(kbatch, f"{kind}_logs_streaming") + else: + logs = getattr(kbatch, f"{kind}_logs") - buffers = [] - gen = kbatch.logs_streaming("mypod", "http://kbatch.com/", token="abc") - for batch in gen: - buffers.append(batch) - result = "".join(buffers) + result = logs("mypod", "http://kbatch.com/", token="abc") + if streaming: + assert isinstance(result, GeneratorType) + result = "".join(result) assert result == data From 7f87ff27ca4392e57a3753d9c898ec4bbe7016a6 Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 30 Sep 2024 11:54:48 +0200 Subject: [PATCH 3/3] fix type on code, profile --- kbatch/kbatch/_core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kbatch/kbatch/_core.py b/kbatch/kbatch/_core.py index a023d36..770755f 100644 --- a/kbatch/kbatch/_core.py +++ b/kbatch/kbatch/_core.py @@ -167,13 +167,13 @@ def submit_job( kbatch_url: str | None = None, token: str | None = None, model: Union[V1Job, V1CronJob] = V1Job, - code: Path | None = None, - profile: str | None = None, + code: Path | str | None = None, + profile: str | dict | None = None, ): from ._backend import make_job, make_cronjob if isinstance(profile, str): - profile = load_profile(profile, kbatch_url=kbatch_url, token=token) + profile = load_profile(profile, kbatch_url=kbatch_url) profile = profile or {}