# -*- coding: utf-8 -*-
"""
这是 SDK 的核心模块, 实现了各种底层方法.
"""
import typing as T
import time
from boto_session_manager import BotoSesManager
import aws_ssm_run_command.api as aws_ssm_run_command
from acore_server_metadata.api import Server
from ..agent.api import SOAPRequest, SOAPResponse
from ..exc import EC2IsNotRunningError, RunCommandError
[docs]def build_cli_arg_for_gm(
command: str,
username: T.Optional[str] = None,
password: T.Optional[str] = None,
raises: bool = True,
s3uri_output: T.Optional[str] = None,
path_cli: str = "/home/ubuntu/git_repos/acore_soap_app-project/.venv/bin/acsoap",
) -> str:
"""
构造最终的 acsoap 命令行参数. 以便之后 pass 给
:meth:`acore_soap_app.cli.main.Command.gm` 命令.
"""
args = [
path_cli,
"gm",
f'"{command}"',
]
if username is not None:
args.append(f"-u={username}")
if password is not None:
args.append(f"-p={password}")
if raises is not None:
if raises:
args.append(f"-r=True")
else:
args.append(f"-r=False")
if s3uri_output is not None:
args.append(f"-s={s3uri_output}")
return " ".join(args)
[docs]def run_soap_command(
bsm: BotoSesManager,
server_id: str,
request_like: T.Union[
str,
T.List[str],
SOAPRequest,
T.List[SOAPRequest],
],
username: T.Optional[str] = None,
password: T.Optional[str] = None,
raises: bool = True,
s3uri_input: T.Optional[str] = None,
s3uri_output: T.Optional[str] = None,
path_cli: str = "/home/ubuntu/git_repos/acore_soap_app-project/.venv/bin/acsoap",
sync: bool = True,
delays: int = 1,
timeout: int = 10,
verbose: bool = True,
) -> T.Union[T.List[SOAPResponse], str]:
"""
从任何地方, 通过 SSM Run Command, 远程执行 SOAP 命令.
Usage Example:
.. code-block:: python
>>> response = run_soap_command(bsm, "sbx-blue", ".server info")
>>> response = run_soap_command(bsm, "sbx-blue", [".account create test1 test1", ".account create test2 test2"])
:param bsm: ``boto_session_manager.BotoSesManager`` 对象, 定义了 AWS 权限.
:param server_id: AzerothCore 服务器的逻辑 ID, 命名规律为 "${env_name}-${server_name}",
例如 "sbx-blue"
:param request_like: 请参考
:class:`~acore_soap_app.agent.impl.SOAPRequest.batch_load`
:param username: 默认的用户名, 只有当 request.username 为 None 的时候才会用到.
:param password: 默认的密码, 只有当 request.password 为 None 的时候才会用到.
:param raises: 默认为 True. 如果为 True, 则在遇到错误时抛出异常. 反之则将
failed SOAP Response 原封不动地返回.
:param s3uri_input: 如果指定, 则将输入写入 S3. 常用于 Payload 比较大的情况.
如果你一次性发送的 request 大于 20 条, 则必须使用这个参数.
:param s3uri_output: 如果不指定, 则默认将输出作为 JSON 打印. 如果指定了 s3uri,
则将输出写入到 S3.
:param path_cli: EC2 上 acsoap 命令行工具的绝对路径.
:param sync: 同步和异步模式, 默认为同步模式
- 如果以同步模式运行, 则会等待 SSM Run Command 完成
- 如果以异步模式运行, 则会立刻返回一个 SSM Run Command 的 command id.
:param delays: 同步模式下的等待间隔
:param timeout: 同步模式下的超时限制
:param verbose: 同步模式下是否显示进度条
"""
# load requests
requests = SOAPRequest.batch_load(
request_like=request_like,
username=username,
password=password,
s3_client=bsm.s3_client,
)
# get ec2 instance id
server = Server(id=server_id)
server.refresh(ec2_client=bsm.ec2_client, rds_client=bsm.rds_client)
if server.is_running() is False:
raise EC2IsNotRunningError(f"EC2 {server_id!r} is not running")
instance_id = server.ec2_inst.id
# identify the run strategy
if len(requests) >= 20:
if s3uri_input is None:
raise ValueError(
"'s3uri_input' must be specified when the number of requests is greater than 20"
)
is_s3_input = True
else:
is_s3_input = s3uri_input is not None
if is_s3_input:
SOAPRequest.batch_dump_to_s3(
s3_client=bsm.s3_client,
instances=requests,
s3uri=s3uri_input,
)
commands = [
build_cli_arg_for_gm(
command=s3uri_input,
username=username,
password=password,
raises=raises,
s3uri_output=s3uri_output,
path_cli=path_cli,
)
]
else:
commands = [
build_cli_arg_for_gm(
command=request.command,
username=request.username,
password=request.password,
raises=raises,
s3uri_output=s3uri_output,
path_cli=path_cli,
)
for request in requests
]
# run command
command = aws_ssm_run_command.better_boto.run_shell_script_async(
ssm_client=bsm.ssm_client,
commands=commands,
instance_ids=instance_id,
)
command_id = command.CommandId
if sync is False: # async mode, return immediately
return command_id
# sync mode, wait until command succeeded
time.sleep(1)
# get command response
command_invocation = aws_ssm_run_command.better_boto.wait_until_send_command_succeeded(
ssm_client=bsm.ssm_client,
command_id=command_id,
instance_id=instance_id,
raises=False,
delays=delays,
timeout=timeout,
verbose=verbose,
)
if command_invocation.ResponseCode != 0:
raise RunCommandError.from_command_invocation(command_invocation)
# parse response
if s3uri_output is None:
output = command_invocation.StandardOutputContent
lines = output.splitlines()
responses = [SOAPResponse.from_json(json_str) for json_str in lines]
else:
responses = SOAPResponse.batch_load_from_s3(
s3_client=bsm.s3_client, s3uri=s3uri_output
)
return responses