AberSheeran
Aber Sheeran

分布式任务处理

起笔自
共计 4603 个字符
落笔于

在传统开发中,总会有一种任务,它们一般无状态,独立运行并且耗时,于是我们不得不在将它们托付给其他进程进行处理。

但额外维护一个专门处理任务的程序麻烦且奢侈,它们并不总是在工作,反而需要你时刻关心它们空转时是不是占据了太多的服务器资源,或者是不是能够正确接受任务并处理任务。

至少在我的上一家公司里,一个核心的Django程序,附带启动了接近二十个不同的Celery进程,它们时不时的吞噬CPU和内存,令人头疼。虽然我认为是设计原因,但额外维持进程总是痛苦的。

阿里云的OSS和Serverless Function,给了我一个灵感。用它们组成一个分布式的任务处理系统岂不是美哉?(其实主要是因为我没钱,而这两都有很高的免费额度)

介绍

OSS(对象存储)

阿里云的对象存储服务,不仅能拿来存图片,还能存二进制数据(废话),是一个缺少功能的k-v数据库。

用它存任务队列,经济实惠,而且耐操。以阿里云的技术,OSS的读写性能足够供应绝大部分的业务需求。

Serverless Function

阿里云的无服务函数,提供了一个OSS触发器。当触发器所指定的OSS中被新建了一个对象,而对象的Key能匹配到此触发器,则会调用这个无服务函数。

使用它作为消费者,就不再需要考虑消费者集群是否存活,是否正常运行的问题。大大减少了运维成本。

并且让我这种穷人很心动的一点是无服务函数只有被调用才会计费,每个月有百万次的免费调用额度。一般中小型企业的用量也达不到百万次,对于个人开发者更是等于不要钱。

实践代码

Talk is cheap, show me the code.

下面以一个最简单的发送邮件为例。

预备步骤

首先肯定得把无服务函数与OSS服务开启,然后创建对应的两个RAM用户,分别拥有无服务函数的全部权限和OSS的全部权限。

然后建立一个OSS bucket,用于任务队列的读写。

编写代码

使用阿里云的fun工具,首先编写一个template.yml用于一键部署。

ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
    SendMail:
      Type: 'Aliyun::Serverless::Function'
      Properties:
        Initializer: sendmail.init
        InitializationTimeout: 10
        Handler: sendmail.handler
        Timeout: 60
        Runtime: python3
        CodeUri: './'
        EnvironmentVariables:
          OSS_BUCKET: 'your bucket name'
          OSS_KEYID: ''
          OSS_SECRET: ''
          OSS_LINK: 'https://oss-cn-hongkong.aliyuncs.com'
          EMAIL_SERVER_HOST: 'smtp server'
          EMAIL_SERVER_PORT: port
          EMAIL_USERNAME: 'your email username'
          EMAIL_PASSWORD: 'your email password'
      Events:
        SendMail:
          Type: OSS
          Properties:
            BucketName: your bucket name
            Events:
              - oss:ObjectCreated:*
            Filter:
              Key:
                  Prefix: mail/
                  Suffix: .task

配置都是什么意思,不一一解释。关键的配置在于Events

这里配置了SendMail这个无服务函数监听一个OSS bucket的创建事件,并且Filter中指定,只有Key是以mail/开头,.task结束的对象被创建时,才会触发此函数。

再写一个sendmail.py作为无服务函数进行部署

import os
import json
import typing
import pickle
import hashlib
import smtplib
import logging
from email.mime.text import MIMEText
from email.header import Header

import oss2

OSS_BUCKET = os.environ.get("OSS_BUCKET")
OSS_KEYID = os.environ.get("OSS_KEYID")
OSS_SECRET = os.environ.get("OSS_SECRET")
OSS_LINK = os.environ.get("OSS_LINK")
EMAIL_SERVER_HOST = os.environ.get("EMAIL_SERVER_HOST")
EMAIL_SERVER_PORT = int(os.environ.get("EMAIL_SERVER_PORT"))
EMAIL_USERNAME = os.environ.get("EMAIL_USERNAME")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD")


smtpObj = None


def sendmail(nickname: str, to_email: str, subject: str, content: str) -> typing.Tuple[bool, typing.Optional[str]]:
    """发送邮件通知"""
    global smtpObj

    message = MIMEText(content, 'html', 'utf-8')
    message['From'] = Header(f"Trotter<{EMAIL_USERNAME}>", 'utf-8')
    message['To'] = Header(f'{nickname}<{to_email}>', 'utf-8')
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj.sendmail(EMAIL_USERNAME, to_email, message.as_string())
        return True, None
    except smtplib.SMTPException as e:
        return False, str(e)


def init(context):
    global smtpObj
    smtpObj = smtplib.SMTP(EMAIL_SERVER_HOST, EMAIL_SERVER_PORT)
    smtpObj.starttls()
    smtpObj.login(EMAIL_USERNAME, EMAIL_PASSWORD)


def handler(event, context):
    evt = json.loads(event)
    bucket = oss2.Bucket(
        oss2.Auth(OSS_KEYID, OSS_SECRET),
        OSS_LINK,
        OSS_BUCKET
    )
    object_name = evt['events'][0]['oss']['object']['key']
    remote_stream = bucket.get_object(object_name)
    remote_stream = remote_stream.read()
    task = pickle.loads(remote_stream)
    res, err = sendmail(**task)
    if err:
        logging.error(err)
    if res:
        bucket.delete_object(object_name)

由于没有使用任何需要安装的第三方库(oss2是阿里无服务函数Python3 runtime自带的),所以不需要去管什么依赖安装。

最后fun deploy一键部署上去,发送邮件的任务系统就弄完了。可能会有点让人诧异,但对于这种无状态的任务来说,无服务函数一键部署,就是这么简单。

那么接下来看看如何调用它去发送邮件。

import oss2


bucket = oss2.Bucket(
    oss2.Auth(OSS_KEYID, OSS_SECRET),
    OSS_LINK,
    OSS_BUCKET
)

bucket.put_object(f'mail/test.task', pickle.dumps({
    "nickname": 'Testing',
    "to_email": Email Address,
    "subject": "测试邮件",
    "content": "仅仅是测试"
}))

如上代码在项目中使用时,应当封装成一个函数。

如果你觉得本文值得,不妨赏杯茶
没有上一篇
Sentry 消息转发