Skip to content

DB K8S

create_dump(context, dbname, host, port, username, password, file='', additional_params='')

Execute dump command in db pod.

Source code in saritasa_invocations/db_k8s.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@invoke.task
def create_dump(
    context: invoke.Context,
    dbname: str,
    host: str,
    port: str,
    username: str,
    password: str,
    file: str = "",
    additional_params: str = "",
) -> None:
    """Execute dump command in db pod.

    The dump command is built by `_generate_dump_command` from the
    connection arguments and is run behind the prefix returned by
    `_generate_exec_command`.

    Args:
        context: Invoke context used to run the command and load config.
        dbname: Database name forwarded to the dump command generator.
        host: Database host forwarded to the dump command generator.
        port: Database port forwarded to the dump command generator.
        username: Database user forwarded to the dump command generator.
        password: Sent (followed by a newline) whenever the command output
            matches `config.db.password_pattern`; it is not part of the
            generated command itself.
        file: Dump file name forwarded to the dump command generator.
        additional_params: Extra parameters forwarded to the dump command
            generator.

    """
    config = _config.Config.from_context(context)
    dump_command = _generate_dump_command(
        context,
        file=file,
        dbname=dbname,
        host=host,
        port=port,
        username=username,
        additional_params=additional_params,
    )
    k8s.success(context, f"Entering into db with {dump_command}")
    exec_prefix = _generate_exec_command(context)
    # Answer the password prompt automatically instead of embedding the
    # password into the command line.
    password_responder = invoke.Responder(
        pattern=config.db.password_pattern,
        response=f"{password}\n",
    )
    full_command = f"{exec_prefix} -- {dump_command}"
    context.run(full_command, watchers=(password_responder,))

get_dump(context, file='')

Download db data from db pod if it is present.

Source code in saritasa_invocations/db_k8s.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@invoke.task
def get_dump(
    context: invoke.Context,
    file: str = "",
) -> str:
    """Download db data from db pod if it is present.

    Args:
        context: Invoke context used to run commands and read the current
            environment's db configuration.
        file: Name of the dump file inside the configured dump directory.
            When empty, the name returned by `_get_db_k8s_dump_filename`
            is used.

    Returns:
        Name of the dump file; this is also the name passed as the save
        location (inside the current working directory) for the download.

    """
    config = k8s.get_current_env_config_from_context(context).db_config
    # Honour an explicitly requested file name. Previously the argument was
    # unconditionally overwritten, so passing `file` had no effect.
    file = file or _get_db_k8s_dump_filename(context)

    k8s.success(context, f"Downloading dump ({file}) from pod")
    dump_path = f"{config.dump_dir}/{file}"
    k8s.download_file_from_pod(
        context,
        pod_namespace=config.namespace,
        get_pod_name_command=_generate_get_pod_name_command(context),
        path_to_file_in_pod=dump_path,
        path_to_where_save_file=f"{pathlib.Path.cwd()}/{file}",
    )
    k8s.success(context, f"Downloaded dump ({file}) from pod. Clean up")
    # Remove the remote dump once it has been downloaded.
    context.run(f"{_generate_exec_command(context)} -- rm {dump_path}")
    return file