在传统开发中,总会有一种任务,它们一般无状态,独立运行并且耗时,于是我们不得不在将它们托付给其他进程进行处理。
但额外维护一个专门处理任务的程序麻烦且奢侈,它们并不总是在工作,反而需要你时刻关心它们空转时是不是占据了太多的服务器资源,或者是不是能够正确接受任务并处理任务。
至少在我的上一家公司里,一个核心的Django程序,附带启动了接近二十个不同的Celery进程,它们时不时的吞噬CPU和内存,令人头疼。虽然我认为是设计原因,但额外维持进程总是痛苦的。
阿里云的OSS和Serverless Function,给了我一个灵感。用它们组成一个分布式的任务处理系统岂不是美哉?(其实主要是因为我没钱,而这两都有很高的免费额度)
介绍
OSS(对象存储)
阿里云的对象存储服务,不仅能拿来存图片,还能存二进制数据(废话),是一个缺少功能的k-v数据库。
用它存任务队列,经济实惠,而且耐操。以阿里云的技术,OSS的读写性能足够供应绝大部分的业务需求。
Serverless Function
阿里云的无服务函数,提供了一个OSS触发器。当触发器所指定的OSS中被新建了一个对象,而对象的Key能匹配到此触发器,则会调用这个无服务函数。
使用它作为消费者,就不再需要考虑消费者集群是否存活,是否正常运行的问题。大大减少了运维成本。
并且让我这种穷人很心动的一点是无服务函数只有被调用才会计费,每个月有百万次的免费调用额度。一般中小型企业的用量也达不到百万次,对于个人开发者更是等于不要钱。
实践代码
Talk is cheap, show me the code.
下面以一个最简单的发送邮件为例。
预备步骤
首先肯定得把无服务函数与OSS服务开启,然后创建对应的两个RAM用户,分别拥有无服务函数的全部权限和OSS的全部权限。
然后建立一个OSS bucket,用于任务队列的读写。
编写代码
使用阿里云的fun工具,首先编写一个template.yml
用于一键部署。
ROSTemplateFormatVersion: '2015-09-01'
Transform: 'Aliyun::Serverless-2018-04-03'
Resources:
SendMail:
Type: 'Aliyun::Serverless::Function'
Properties:
Initializer: sendmail.init
InitializationTimeout: 10
Handler: sendmail.handler
Timeout: 60
Runtime: python3
CodeUri: './'
EnvironmentVariables:
OSS_BUCKET: 'your bucket name'
OSS_KEYID: ''
OSS_SECRET: ''
OSS_LINK: 'https://oss-cn-hongkong.aliyuncs.com'
EMAIL_SERVER_HOST: 'smtp server'
EMAIL_SERVER_PORT: port
EMAIL_USERNAME: 'your email username'
EMAIL_PASSWORD: 'your email password'
Events:
SendMail:
Type: OSS
Properties:
BucketName: your bucket name
Events:
- oss:ObjectCreated:*
Filter:
Key:
Prefix: mail/
Suffix: .task
配置都是什么意思,不一一解释。关键的配置在于Events
。
这里配置了SendMail这个无服务函数监听一个OSS bucket的创建事件,并且Filter
中指定,只有Key是以mail/
开头,.task
结束的对象被创建时,才会触发此函数。
再写一个sendmail.py
作为无服务函数进行部署
import os
import json
import typing
import pickle
import hashlib
import smtplib
import logging
from email.mime.text import MIMEText
from email.header import Header
import oss2
OSS_BUCKET = os.environ.get("OSS_BUCKET")
OSS_KEYID = os.environ.get("OSS_KEYID")
OSS_SECRET = os.environ.get("OSS_SECRET")
OSS_LINK = os.environ.get("OSS_LINK")
EMAIL_SERVER_HOST = os.environ.get("EMAIL_SERVER_HOST")
EMAIL_SERVER_PORT = int(os.environ.get("EMAIL_SERVER_PORT"))
EMAIL_USERNAME = os.environ.get("EMAIL_USERNAME")
EMAIL_PASSWORD = os.environ.get("EMAIL_PASSWORD")
smtpObj = None
def sendmail(nickname: str, to_email: str, subject: str, content: str) -> typing.Tuple[bool, typing.Optional[str]]:
"""发送邮件通知"""
global smtpObj
message = MIMEText(content, 'html', 'utf-8')
message['From'] = Header(f"Trotter<{EMAIL_USERNAME}>", 'utf-8')
message['To'] = Header(f'{nickname}<{to_email}>', 'utf-8')
message['Subject'] = Header(subject, 'utf-8')
try:
smtpObj.sendmail(EMAIL_USERNAME, to_email, message.as_string())
return True, None
except smtplib.SMTPException as e:
return False, str(e)
def init(context):
global smtpObj
smtpObj = smtplib.SMTP(EMAIL_SERVER_HOST, EMAIL_SERVER_PORT)
smtpObj.starttls()
smtpObj.login(EMAIL_USERNAME, EMAIL_PASSWORD)
def handler(event, context):
evt = json.loads(event)
bucket = oss2.Bucket(
oss2.Auth(OSS_KEYID, OSS_SECRET),
OSS_LINK,
OSS_BUCKET
)
object_name = evt['events'][0]['oss']['object']['key']
remote_stream = bucket.get_object(object_name)
remote_stream = remote_stream.read()
task = pickle.loads(remote_stream)
res, err = sendmail(**task)
if err:
logging.error(err)
if res:
bucket.delete_object(object_name)
由于没有使用任何需要安装的第三方库(oss2
是阿里无服务函数Python3 runtime自带的),所以不需要去管什么依赖安装。
最后fun deploy
一键部署上去,发送邮件的任务系统就弄完了。可能会有点让人诧异,但对于这种无状态的任务来说,无服务函数一键部署,就是这么简单。
那么接下来看看如何调用它去发送邮件。
import oss2
bucket = oss2.Bucket(
oss2.Auth(OSS_KEYID, OSS_SECRET),
OSS_LINK,
OSS_BUCKET
)
bucket.put_object(f'mail/test.task', pickle.dumps({
"nickname": 'Testing',
"to_email": Email Address,
"subject": "测试邮件",
"content": "仅仅是测试"
}))
如上代码在项目中使用时,应当封装成一个函数。