Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kbatch job logs #81

Merged
merged 3 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions kbatch-proxy/kbatch_proxy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,25 @@ async def create_job(request: Request, user: User = Depends(get_current_user)):
return _create_job(data, V1Job, user)


@router.get("/jobs/logs/{job_name}/", response_class=Response)
async def job_logs(
job_name: str,
user: User = Depends(get_current_user),
stream: Optional[bool] = False,
):
core_api, _ = get_k8s_api()
pods = core_api.list_namespaced_pod(
namespace=user.namespace,
label_selector=f"batch.kubernetes.io/job-name={job_name}",
)
if not pods.items:
raise HTTPException(
status.HTTP_404_NOT_FOUND, detail=f"No pods found for job {job_name}"
)
pod_name = pods.items[0].metadata.name
return await pod_logs(pod_name, user=user, stream=stream)


# pods #
@router.get("/pods/{pod_name}")
async def read_pod(pod_name: str, user: User = Depends(get_current_user)):
Expand All @@ -250,8 +269,8 @@ async def read_pods(
return result.to_dict()


@router.get("/jobs/logs/{pod_name}/", response_class=Response)
async def logs(
@router.get("/pods/logs/{pod_name}/", response_class=Response)
async def pod_logs(
pod_name: str,
user: User = Depends(get_current_user),
stream: Optional[bool] = False,
Expand Down
12 changes: 8 additions & 4 deletions kbatch/kbatch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
configure,
delete_job,
format_jobs,
job_logs,
job_logs_streaming,
list_jobs,
list_pods,
logs,
logs_streaming,
pod_logs,
pod_logs_streaming,
show_job,
submit_job,
)
Expand All @@ -26,12 +28,14 @@
"Job",
"delete_job",
"format_jobs",
"job_logs",
"job_logs_streaming",
"list_jobs",
"list_pods",
"logs_streaming",
"logs",
"make_cronjob",
"make_job",
"pod_logs",
"pod_logs_streaming",
"show_job",
"submit_job",
]
54 changes: 43 additions & 11 deletions kbatch/kbatch/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def _request_action(


def show_job(
resource_name,
resource_name: str,
kbatch_url: str | None = None,
token: str | None = None,
model: Union[V1Job, V1CronJob] = V1Job,
Expand All @@ -146,7 +146,7 @@ def show_job(


def delete_job(
resource_name,
resource_name: str,
kbatch_url: str | None = None,
token: str | None = None,
model: Union[V1Job, V1CronJob] = V1Job,
Expand All @@ -167,11 +167,14 @@ def submit_job(
kbatch_url: str | None = None,
token: str | None = None,
model: Union[V1Job, V1CronJob] = V1Job,
code=None,
profile=None,
code: Path | str | None = None,
profile: str | dict | None = None,
):
from ._backend import make_job, make_cronjob

if isinstance(profile, str):
profile = load_profile(profile, kbatch_url=kbatch_url)

profile = profile or {}

if issubclass(model, V1Job):
Expand Down Expand Up @@ -221,8 +224,32 @@ def list_pods(
return r.json()


def logs(
pod_name,
def job_logs(
job_name: str,
kbatch_url: str | None = None,
token: str | None = None,
read_timeout: int = 60,
):
gen = _logs(
job_name, kbatch_url, token, stream=False, read_timeout=read_timeout, kind="job"
)
result = next(gen)
return result


def job_logs_streaming(
job_name: str,
kbatch_url: str | None = None,
token: str | None = None,
read_timeout: int = 60,
):
return _logs(
job_name, kbatch_url, token, stream=True, read_timeout=read_timeout, kind="job"
)


def pod_logs(
pod_name: str,
kbatch_url: str | None = None,
token: str | None = None,
read_timeout: int = 60,
Expand All @@ -232,8 +259,8 @@ def logs(
return result


def logs_streaming(
pod_name,
def pod_logs_streaming(
pod_name: str,
kbatch_url: str | None = None,
token: str | None = None,
read_timeout: int = 60,
Expand All @@ -242,7 +269,12 @@ def logs_streaming(


def _logs(
pod_name, kbatch_url, token, stream: Optional[bool] = False, read_timeout: int = 60
name,
kbatch_url,
token,
stream: Optional[bool] = False,
read_timeout: int = 60,
kind: str = "pod",
):
config = load_config()
client = httpx.Client(
Expand All @@ -258,7 +290,7 @@ def _logs(
if stream:
with client.stream(
"GET",
urllib.parse.urljoin(kbatch_url, f"jobs/logs/{pod_name}/"),
urllib.parse.urljoin(kbatch_url, f"{kind}s/logs/{name}/"),
headers=headers,
params=dict(stream=stream),
) as r:
Expand All @@ -267,7 +299,7 @@ def _logs(

else:
r = client.get(
urllib.parse.urljoin(kbatch_url, f"jobs/logs/{pod_name}/"),
urllib.parse.urljoin(kbatch_url, f"{kind}s/logs/{name}/"),
headers=headers,
)
r.raise_for_status()
Expand Down
50 changes: 38 additions & 12 deletions kbatch/kbatch/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def cronjob():

@cronjob.command(name="show")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.argument("cronjob_name")
def show_cronjob(cronjob_name, kbatch_url, token):
"""Show the details for a cronjob."""
Expand All @@ -90,7 +90,7 @@ def show_cronjob(cronjob_name, kbatch_url, token):

@cronjob.command(name="delete")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.argument("cronjob_name")
def delete_cronjob(cronjob_name, kbatch_url, token):
"""Delete a cronjob, cancelling running jobs and pods."""
Expand All @@ -100,7 +100,7 @@ def delete_cronjob(cronjob_name, kbatch_url, token):

@cronjob.command(name="list")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.option(
"-o",
"--output",
Expand Down Expand Up @@ -203,7 +203,7 @@ def job():

@job.command(name="show")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.argument("job_name")
def show_job(job_name, kbatch_url, token):
"""Show the details for a job."""
Expand All @@ -213,7 +213,7 @@ def show_job(job_name, kbatch_url, token):

@job.command(name="delete")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.argument("job_name")
def delete_job(job_name, kbatch_url, token):
"""Delete a job, cancelling running pods."""
Expand All @@ -223,7 +223,7 @@ def delete_job(job_name, kbatch_url, token):

@job.command(name="list")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.option(
"-o",
"--output",
Expand Down Expand Up @@ -312,6 +312,32 @@ def submit_job(
print(result["metadata"]["name"])


@job.command("logs")
@click.argument("job_name")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="Auth token")
@click.option("--stream/--no-stream", help="Whether to stream the logs", default=False)
@click.option("--read-timeout", help="Timeout for reading data", default=60, type=int)
@click.option("--pretty/--no-pretty", default=True)
def job_logs(job_name, kbatch_url, token, stream, pretty, read_timeout):
"""Get the logs for a kbatch job."""
if pretty:
print = rich.print

if stream:
result = _core.job_logs_streaming(
job_name, kbatch_url, token, read_timeout=read_timeout
)
else:
result = _core.job_logs(job_name, kbatch_url, token, read_timeout=read_timeout)

if stream:
for line in result:
print(line)
else:
print(result)


# POD
@cli.group()
def pod():
Expand All @@ -321,7 +347,7 @@ def pod():

@pod.command(name="list")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.option(
"--job-name", help="The name of the job to limit the results to.", default=None
)
Expand All @@ -348,24 +374,24 @@ def list_pods(kbatch_url, token, job_name, output):
# TODO show pod


@pod.command()
@pod.command("logs")
@click.argument("pod_name")
@click.option("--kbatch-url", help="URL to the kbatch server.")
@click.option("--token", help="File to execute.")
@click.option("--token", help="kbatch auth token")
@click.option("--stream/--no-stream", help="Whether to stream the logs", default=False)
@click.option("--read-timeout", help="Timeout for reading data", default=60, type=int)
@click.option("--pretty/--no-pretty", default=True)
def logs(pod_name, kbatch_url, token, stream, pretty, read_timeout):
def pod_logs(pod_name, kbatch_url, token, stream, pretty, read_timeout):
"""Get the logs for a kbatch pod."""
if pretty:
print = rich.print

if stream:
result = _core.logs_streaming(
result = _core.pod_logs_streaming(
pod_name, kbatch_url, token, read_timeout=read_timeout
)
else:
result = _core.logs(pod_name, kbatch_url, token, read_timeout=read_timeout)
result = _core.pod_logs(pod_name, kbatch_url, token, read_timeout=read_timeout)

if stream:
for line in result:
Expand Down
30 changes: 13 additions & 17 deletions kbatch/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
import pathlib
import zipfile
from types import GeneratorType

import httpx
import respx
Expand Down Expand Up @@ -224,27 +225,22 @@ def test_list_pods(respx_mock: respx.MockRouter):
assert result == data


def test_logs(respx_mock: respx.MockRouter):
@pytest.mark.parametrize("kind", ["pod", "job"])
@pytest.mark.parametrize("streaming", ["streaming", ""])
def test_logs(respx_mock: respx.MockRouter, kind, streaming):
data = HERE.joinpath("data", "list_jobs.json").read_text()
respx_mock.get("http://kbatch.com/jobs/logs/mypod/").mock(
return_value=httpx.Response(200, text=data)
)

result = kbatch.logs("mypod", "http://kbatch.com/", token="abc")
assert result == data


def test_logs_streaming(respx_mock: respx.MockRouter):
data = HERE.joinpath("data", "list_jobs.json").read_text()
respx_mock.get("http://kbatch.com/jobs/logs/mypod/").mock(
respx_mock.get(f"http://kbatch.com/{kind}s/logs/mypod/").mock(
return_value=httpx.Response(200, text=data)
)
if streaming:
logs = getattr(kbatch, f"{kind}_logs_streaming")
else:
logs = getattr(kbatch, f"{kind}_logs")

buffers = []
gen = kbatch.logs_streaming("mypod", "http://kbatch.com/", token="abc")
for batch in gen:
buffers.append(batch)
result = "".join(buffers)
result = logs("mypod", "http://kbatch.com/", token="abc")
if streaming:
assert isinstance(result, GeneratorType)
result = "".join(result)
assert result == data


Expand Down
Loading