Skip to content

K8S

download_file(context, path_to_file_in_pod, path_to_where_save_file, component='')

Download file from pod.

Source code in saritasa_invocations/k8s.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
@invoke.task
def download_file(
    context: invoke.Context,
    path_to_file_in_pod: str,
    path_to_where_save_file: str,
    component: str = "",
) -> None:
    """Download file from pod."""
    config = get_current_env_config_from_context(context)
    download_file_from_pod(
        context,
        pod_namespace=config.namespace,
        get_pod_name_command=get_pod_cmd(
            context,
            component or config.default_component,
        ),
        path_to_file_in_pod=path_to_file_in_pod,
        path_to_where_save_file=path_to_where_save_file,
    )

download_file_and_remove_afterwards(context, path_to_file_in_pod, path_to_where_save_file)

Download file from k8s and delete it after work is done.

Source code in saritasa_invocations/k8s.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@contextlib.contextmanager
def download_file_and_remove_afterwards(
    context: invoke.Context,
    path_to_file_in_pod: str,
    path_to_where_save_file: str,
) -> collections.abc.Generator[str, typing.Any, None]:
    """Download file from k8s and delete it after work is done."""
    download_file(
        context,
        path_to_file_in_pod=path_to_file_in_pod,
        path_to_where_save_file=path_to_where_save_file,
    )
    try:
        yield path_to_where_save_file
    finally:
        printing.print_success(
            f"Deleting file({path_to_where_save_file}) after use",
        )
        pathlib.Path(path_to_where_save_file).unlink()

download_file_from_pod(context, pod_namespace, get_pod_name_command, path_to_file_in_pod, path_to_where_save_file, retries=-1)

Download file from pod.

Source code in saritasa_invocations/k8s.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def download_file_from_pod(
    context: invoke.Context,
    pod_namespace: str,
    get_pod_name_command: str,
    path_to_file_in_pod: str,
    path_to_where_save_file: str,
    retries: int = -1,
) -> None:
    """Download file from pod."""
    context.run(
        "kubectl cp"
        f" --namespace {pod_namespace}"
        f" --retries={retries}"
        f" $({get_pod_name_command}):{path_to_file_in_pod}"
        f" {path_to_where_save_file}",
    )

execute(context, entry='', command='', env_params='', component='', pty=None, hide=None)

Execute command inside k8s pod.

Source code in saritasa_invocations/k8s.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@invoke.task
def execute(
    context: invoke.Context,
    entry: str = "",
    command: str = "",
    env_params: str = "",
    component: str = "",
    pty: bool | None = None,
    hide: str | None = None,
) -> invoke.Result | None:
    """Execute command inside k8s pod."""
    config = get_current_env_config_from_context(context)
    component = component or config.default_component
    if not entry:
        entry = config.default_entry
        command = command or config.default_command
    if env_params:
        env_params = f"env {env_params}"
    entry_cmd = " ".join(filter(None, (env_params, entry, command)))
    success(context, f"Entering into {component} with {entry_cmd}")
    pod_cmd = get_pod_cmd(context, component)
    return context.run(
        f"kubectl exec -ti $({pod_cmd}) -- {entry_cmd}",
        pty=pty,
        hide=hide,
    )

get_current_env_config_from_context(context)

Return current environment data class based on current cluster.

Source code in saritasa_invocations/k8s.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def get_current_env_config_from_context(
    context: invoke.Context,
) -> _config.K8SGeneratedSettings:
    """Return current environment data class based on current cluster."""
    run_result = context.run(
        "kubectl config current-context",
        echo=False,
        hide="out",
        warn=True,
    )
    if not run_result or not run_result.stdout:
        return handle_error_on_getting_config(
            context=context,
            run_result=run_result,
        )
    current_context = run_result.stdout.splitlines()[0]
    run_result = context.run(
        "kubectl config view --minify | grep namespace | awk '{print $2}'",
        echo=False,
        hide="out",
    )
    if not run_result or not run_result.stdout:
        return handle_error_on_getting_config(
            context=context,
            run_result=run_result,
        )
    current_namespace = run_result.stdout.splitlines()[0]

    found_env: _config.K8SSettings | None = None
    config = _config.Config.from_context(context)
    for env in config.k8s_configs.values():
        context_matches = (env.context or env.proxy) == current_context
        is_known_namespace = current_namespace in env.namespace
        if context_matches and is_known_namespace:
            found_env = env
            break

    if not found_env:
        picked_env = rich.prompt.Prompt.ask(
            (
                f"Environment data class for the context `{current_context}`"
                f" and namespace `{current_namespace}` doesn't exist. "
                "Need to set context, which `env` to use?"
            ),
            choices=list(config.k8s_configs),
            default=config.default_k8s_env,
        )
        set_context(context, env=picked_env)
        return get_current_env_config_from_context(context)
    return _config.K8SGeneratedSettings.merge_settings(
        default=config.k8s_defaults,
        env_settings=found_env,
    )

get_env_secrets(context)

Get secrets from k8s and save it to file.

Source code in saritasa_invocations/k8s.py
307
308
309
310
311
312
313
314
315
316
317
318
@contextlib.contextmanager
def get_env_secrets(
    context: invoke.Context,
) -> collections.abc.Generator[str, typing.Any, None]:
    """Get secrets from k8s and save it to file."""
    config = get_current_env_config_from_context(context)
    with download_file_and_remove_afterwards(
        context,
        path_to_file_in_pod=config.secret_file_path_in_pod,
        path_to_where_save_file=config.temp_secret_file_path,
    ) as file_path:
        yield file_path

get_environment(context, env_name)

Get environment by its name.

Source code in saritasa_invocations/k8s.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def get_environment(
    context: invoke.Context,
    env_name: str,
) -> _config.K8SGeneratedSettings:
    """Get environment by its name."""
    config = _config.Config.from_context(context)
    environment = config.k8s_configs.get(env_name)
    if not environment:
        raise invoke.Exit(
            code=1,
            message=(
                f"Data class for `{env_name}` environment not found. "
                "Available environments: "
                f"{', '.join(config.k8s_configs.keys())}"
            ),
        )
    return _config.K8SGeneratedSettings.merge_settings(
        default=config.k8s_defaults,
        env_settings=environment,
    )

get_pod_cmd(context, component)

Get command for getting exact pod.

Source code in saritasa_invocations/k8s.py
115
116
117
118
119
120
121
122
123
124
def get_pod_cmd(
    context: invoke.Context,
    component: str,
) -> str:
    """Get command for getting exact pod."""
    config = get_current_env_config_from_context(context)
    return config.get_pod_name_command.format(
        component_selector=config.component_selector,
        component=component,
    )

handle_error_on_getting_config(context, run_result)

Handle error when fetching config.

Source code in saritasa_invocations/k8s.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def handle_error_on_getting_config(
    context: invoke.Context,
    run_result: invoke.runners.Result | None,
) -> _config.K8SGeneratedSettings:
    """Handle error when fetching config."""
    error_message = ""
    if run_result is None:
        error_message = "Unexpected error!"
    elif run_result.exited == 1:
        error_message = f"Encountered error: {run_result.stdout.strip()}"
    elif not run_result.stdout:
        error_message = "Unable to fetch data from command!"
    else:
        error_message = "Unexpected error!"
    printing.print_error(error_message)
    config = _config.Config.from_context(context)
    env = rich.prompt.Prompt.ask(
        "Usually such errors fixed by setting context, which `env` to use?",
        choices=list(config.k8s_configs),
        default=config.default_k8s_env,
    )
    set_context(context, env=env)
    return get_current_env_config_from_context(context)

health_check(context, component='')

Check health of component.

Source code in saritasa_invocations/k8s.py
237
238
239
240
241
242
243
244
@invoke.task
def health_check(
    context: invoke.Context,
    component: str = "",
) -> None:
    """Check health of component."""
    config = get_current_env_config_from_context(context)
    execute(context, component=component, entry=config.health_check)

login(context, proxy=None, auth=None, cluster=None)

Login into k8s via teleport.

Source code in saritasa_invocations/k8s.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@invoke.task
def login(
    context: invoke.Context,
    proxy: str | None = None,
    auth: str | None = None,
    cluster: str | None = None,
) -> None:
    """Login into k8s via teleport."""
    printing.print_success("Login into kubernetes CI")
    if not proxy and not auth and not cluster:
        config = get_current_env_config_from_context(context)
        proxy = config.proxy
        auth = config.auth
        cluster = config.cluster or config.proxy
    context.run(
        f"tsh login --proxy={proxy} --auth={auth} --kube-cluster={cluster}",
    )

logs(context, component='')

Get logs for k8s pod.

Source code in saritasa_invocations/k8s.py
176
177
178
179
180
181
182
183
184
185
186
187
@invoke.task
def logs(
    context: invoke.Context,
    component: str = "",
) -> None:
    """Get logs for k8s pod."""
    config = get_current_env_config_from_context(context)
    component = component or config.default_component
    success(context, f"Getting logs from {component}")
    context.run(
        f"kubectl logs $({get_pod_cmd(context, component=component)})",
    )

pods(context)

Get pods from k8s.

Source code in saritasa_invocations/k8s.py
190
191
192
193
194
195
196
@invoke.task
def pods(
    context: invoke.Context,
) -> None:
    """Get pods from k8s."""
    success(context, "Getting pods")
    context.run("kubectl get pods")

python_shell(context, component='')

Enter into python shell.

Source code in saritasa_invocations/k8s.py
227
228
229
230
231
232
233
234
@invoke.task
def python_shell(
    context: invoke.Context,
    component: str = "",
) -> None:
    """Enter into python shell."""
    config = get_current_env_config_from_context(context)
    execute(context, component=component, entry=config.python_shell)

set_context(context, env='')

Set k8s context to current project.

Source code in saritasa_invocations/k8s.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@invoke.task
def set_context(context: invoke.Context, env: str = "") -> None:
    """Set k8s context to current project."""
    printing.print_success("Setting context for k8s")
    config = _config.Config.from_context(context)
    env = env or config.default_k8s_env
    environment = get_environment(context, env)
    try:
        context.run(f"kubectl config use-context {environment.context}")
    except invoke.UnexpectedExit:
        printing.print_warn(
            "User is not logged into the environment, attempting to login",
        )
        # User needs to login for first time to be able to use env
        login(
            context,
            proxy=environment.proxy,
            auth=environment.auth,
            cluster=environment.cluster or environment.proxy,
        )
        context.run(f"kubectl config use-context {environment.context}")
    context.run(
        f"kubectl config set-context"
        f" --current --namespace={environment.namespace}",
    )
    printing.print_success(
        f"`{environment.name}` environment has been set up successfully.",
    )

success(context, message)

Display success message with mention of environment.

Source code in saritasa_invocations/k8s.py
321
322
323
324
325
326
327
328
329
330
def success(
    context: invoke.Context,
    message: str,
) -> None:
    """Display success message with mention of environment."""
    env = get_current_env_config_from_context(context)
    printing.print_success(
        message,
        title=f"[{env.env_color} bold underline]{env.name.upper()}",
    )