原文:annas-archive.org/md5/59885cc35c2ca63f9f092895c4c3c9b9

译者:飞龙

协议:CC BY-NC-SA 4.0

前言

联邦学习FL)正在成为人工智能领域的一个颠覆性技术,因为它经常被说成,在联邦学习框架中,需要移动的是机器学习模型而不是数据本身,这样智能才能持续发展和增长。因此,人们将 FL 称为以模型为中心的方法,相对于传统的以数据为中心的方法,因此被认为是一项变革性技术。以模型为中心的方法的理念可以创建一个以智能为中心的平台,开创智慧驱动的世界。

通过采用 FL,你可以克服大数据人工智能长期面临的一些挑战,如数据隐私、训练成本和效率,以及最新智能的交付延迟。然而,FL 并不是一个通过盲目聚合机器学习模型就能解决大数据所有问题的魔法解决方案。我们需要非常仔细地设计分布式系统和学习机制,以同步所有分布式学习过程并一致地综合所有本地训练的机器学习模型。这样,我们可以创建一个可持续且具有弹性的 FL 系统,即使在规模化的实际操作中也能持续运行。

因此,这本书不仅超越了描述联邦学习概念和理论方面的内容,正如许多研究项目中通过模拟器或原型引入的,这些原型在大多数与该领域相关的文献中都有介绍。相反,你将通过查看简化联邦学习系统的代码来了解整个设计和实现原则,以验证框架的工作原理和结果。

在阅读完这本书之后,你将创建第一个基于联邦学习的应用程序,该程序可以在本地和云环境中各种设置中安装和测试。

这本书面向谁

这本书是为那些想要了解如何利用 FL 创建机器学习应用程序的机器学习工程师、数据科学家和 AI 爱好者而写的。为了开始阅读这本书,你需要具备基本的 Python 编程知识和机器学习概念。

这本书涵盖的内容

第一章大数据和传统人工智能的挑战,主要关于指导你了解大数据系统和传统集中式机器学习方法的当前问题,以及联邦学习如何解决这些问题,例如数据隐私、偏差和孤岛、模型漂移和性能下降。这将为你深入了解联邦学习系统的设计和实现做好准备。

第二章, 什么是联邦学习?,继续介绍 FL,并帮助你通过当前分布式学习的趋势以及机器学习基础知识来理解 FL 的概念。你还将了解 FL 作为以模型为中心的机器学习方法在数据隐私和安全、效率以及可扩展性方面的优势。

第三章联邦学习系统的工作原理,为你提供了一个坚实的理解,了解 FL 系统将如何工作以及 FL 系统内部分布式组件之间的交互。你将了解 FL 系统的基本架构,包括其流程、状态转换以及向 FL 系统连续运行的消息序列。

第四章, 使用 Python 实现联邦学习服务器,指导你学习 FL 服务器端系统的实现原理,包括数据库服务器和聚合器模块。本章还讨论了 FL 组件之间的通信、模型聚合以及如何管理分布式学习代理和本地及全局模型的状态。

第五章, 联邦学习客户端实现,描述了 FL 客户端功能以及可以由本地机器学习引擎和进程使用的库。核心客户端功能包括将分布式学习代理注册到 FL 平台、全局模型接收以及本地模型上传和共享。

第六章, 运行联邦学习系统并分析结果,将帮助你运行简化的 FL 框架,以了解 FL 系统的行为以及流程,以及联邦平均的最标准模型聚合方法。你还将能够通过分析两个示例测试用例的结果来验证运行 FL 框架的结果。

第七章, 模型聚合,是理解模型聚合的重要章节,这是 FL 的基础。你将了解不同的 FL 场景特征如何要求不同的聚合方法,并应该对如何实现这些算法有一个概念。

第八章, 介绍现有的联邦学习框架,解释了现有的 FL 项目和方法,如 PySyft、TFF、Flower、OpenFL 和 STADLE,以及它们的 API。有许多具有不同设计哲学的有用 FL 项目,你将了解这些框架的功能和差异。

第九章联邦学习应用的关键用例案例研究,介绍了 FL 在不同行业中的主要用例。您将熟悉 FL 在医疗保健、金融行业、边缘计算、物联网以及大数据分布式学习等各个领域的应用,在这些领域,FL 显示出克服许多重要技术挑战的显著潜力。

第十章未来趋势与发展,描述了由持续的研究和开发驱动的联邦学习(FL)技术的未来方向。您将了解到智能互联网和以智慧为中心的平台的新视角。因此,您将准备好迎接集体智慧的世界。

附录探索内部库,概述了内部库,包括用于实现 FL 系统的枚举类、通信协议、数据结构处理器以及辅助和支持函数。

要充分利用本书

您需要在计算机上安装 Python 3.7+版本。为了轻松运行书中的代码示例,建议在 macOS 或 Linux 上安装 Anaconda 以创建虚拟环境。

本书涵盖的软件/硬件 操作系统要求
Python 3.7+ macOS 或 Linux
Anaconda 环境
GitHub

您可以在任何云环境中安装 GitHub 仓库中提供的服务器端代码,例如亚马逊网络服务AWS)或谷歌云平台GCP),并设置适当的安全设置以建立分布式学习环境。

如果您正在使用本书的数字版,我们建议您亲自输入代码或从本书的 GitHub 仓库(下一节中有一个链接)获取代码。这样做将帮助您避免与代码复制粘贴相关的任何潜在错误。

下载示例代码文件

您可以从 GitHub(github.com/PacktPublishing/Federated-Learning-with-Python)下载本书的示例代码文件。如果代码有更新,它将在 GitHub 仓库中更新。

我们还有其他丰富的图书和视频的代码包,可在github.com/PacktPublishing/找到。查看它们吧!

请检查github.com/tie-set/simple-fl仓库以获取最新代码。

注意

您可以使用代码文件用于个人或教育目的。请注意,我们不会支持商业部署,并且不对使用代码引起的任何错误、问题或损害负责。

下载彩色图像

我们还提供了一个包含本书中使用的截图和图表彩色图像的 PDF 文件。您可以从这里下载:packt.link/qh1su

使用的约定

本书使用了多种文本约定。

文本中的代码:表示文本中的代码单词、数据库表名、文件夹名、文件名、文件扩展名、路径名、虚拟 URL、用户输入和 Twitter 昵称。以下是一个示例:“服务器代码为 FL 进程导入了StateManagerAggregator。”

代码块设置如下:

import tensorflow as tf
from tensorflow import keras
from sst_model import SSTModel

任何命令行输入或输出都应如下编写:

fx envoy start -n envoy_1 - -disable-tls --envoy-config-path envoy_config_1.yaml -dh localhost -dp 50051

小贴士或重要提示

看起来像这样。

联系我们

我们始终欢迎读者的反馈。

一般反馈:如果您对本书的任何方面有疑问,请通过电子邮件发送至 customercare@packtpub.com,并在邮件主题中提及书名。

勘误表:尽管我们已经尽最大努力确保内容的准确性,但错误仍然可能发生。如果您在这本书中发现了错误,我们将不胜感激,如果您能向我们报告,我们将不胜感激。请访问www.packtpub.com/support/errata并填写表格。

copyright@packt.com,并附有链接到该材料。

如果您有兴趣成为作者:如果您在某个主题上具有专业知识,并且您有兴趣撰写或为书籍做出贡献,请访问authors.packtpub.com

分享您的想法

一旦您阅读了《使用 Python 进行联邦学习》,我们很乐意听听您的想法!请点击此处直接进入此书的亚马逊评论页面并分享您的反馈。

您的评论对我们和科技社区都很重要,并将帮助我们确保我们提供高质量的内容。

前言

下载本书的免费 PDF 副本

感谢您购买本书!

您喜欢在路上阅读,但又无法携带您的印刷书籍到处走吗?

您的电子书购买是否与您选择的设备不兼容?

别担心,现在,每购买一本 Packt 图书,您都可以免费获得该书的 DRM 免费 PDF 版本。

在任何地方、任何设备上阅读。直接从您最喜欢的技术书籍中搜索、复制和粘贴代码到您的应用程序中。

优惠活动远不止这些,您还可以获得独家折扣、时事通讯和每天收件箱中的优质免费内容。

按照以下简单步骤获取福利:

  1. 扫描下面的二维码或访问以下链接

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_QR_Free_PDF.jpg

packt.link/free-ebook/978-1-80324-710-6

  1. 提交您的购买证明

  2. 就这些!我们将直接将您的免费 PDF 和其他福利发送到您的电子邮件。

第一部分 联邦学习 – 概念基础

在本部分,你将了解大数据 AI 和集中式传统机器学习方法所面临的挑战,以及联邦学习FL)如何解决它们的主要问题。你将学习 FL 系统的基本概念和运作原理,以及一些机器学习基础知识、分布式系统和计算原则。

本部分包括以下章节:

  • 第一章, 大数据和传统 AI 的挑战

  • 第二章, 什么是联邦学习?

  • 第三章, 联邦学习系统的运作原理

第一章:大数据和传统 AI 的挑战

在本章中,我们将详细解释为什么联邦学习FL)将成为 2020 年代的关键技术。您将了解大数据是什么,以及从数据隐私、模型偏差和漂移的角度看,它是如何成为问题的。对这些问题及其解决方案的深入理解将激励您开始一段充满挑战的旅程,以获取相关的知识和技能,并使用以下章节来规划 FL 的掌握。阅读本章后,您将明显地看到人工智能AI)和机器学习ML)正在发生巨大的范式转变,这是由于公众和商业对当前对大数据导向系统的依赖表示担忧而发生的。无需多言,让我们出发吧!

本章将涵盖以下主题:

  • 理解大数据的本质

  • 数据隐私作为瓶颈

  • 训练数据和模型偏差的影响

  • 模型漂移和性能下降

  • FL 作为数据问题的主要解决方案

理解大数据的本质

在 Algorithmia 对 403 位商业领袖进行的2021 年机器学习企业趋势调查中,76%的企业将 AI 和 ML 置于其他 IT 计划之上。COVID-19 全球大流行迫使一些公司加快 AI 和 ML 的发展,正如他们的首席信息官CIO)所述,83%的调查机构在 AI 和 ML 方面的预算同比增长YoY),其中四分之一的增长超过 50%。客户体验改善和流程自动化,无论是通过增加收入还是降低成本,都是这一变化的主要驱动力。其他研究,包括 KPMG 的最新报告在 AI 世界中繁荣发展,本质上讲述的是同样的故事。

深度学习DL)为代表的 AI 和 ML 发展的持续热潮,得益于过去十年大数据的出现。有了 Apache 的开源软件工具 Hadoop 和 Spark,以及亚马逊网络服务AWS)、谷歌云平台GCP)和微软 Azure等云计算服务,私营和公共部门的企业都可以通过处理以前无法想象的大量数据来解决问题。公司和机构不再需要在开发数据分析和模型设计时过于谨慎,以确保相关数据以适当的格式存储。相反,他们可以将可用的原始数据简单地级联到他们的数据湖中,期待他们的数据科学家通过检查它们之间的相关性来发现后续有价值的变量。

大数据似乎是一系列问题的终极解决方案,但正如我们将在以下章节中看到的,它存在一些固有的问题。为了清楚地了解大数据可能存在的问题,让我们首先明确什么是大数据。

大数据的定义

大数据代表大量信息。这些信息现在正以指数级增长。如今,人类每天产生两千万亿字节的数据,这使得大数据变得如此庞大,以至于使用现有的传统数据管理工具来非常有效地处理大数据变得相当困难。以下列出的三个 V 通常用来定义大数据的特征:

  • 数量:来自各种来源的数据,如商业交易、物联网(IoT)设备、社交媒体、工业设备、视频等,都为数据的巨大数量做出了贡献。

  • 速度:数据速度也是大数据的一个基本特征。通常,数据需要实时或接近实时。

  • 种类:数据以所有格式出现,如数值数据、文本文档、图像、视频、电子邮件、音频、金融交易等。

以下截图描述了三个 V 在大数据中的交集:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_01_01.jpg

图 1.1 – 大数据的三个 V

在 1880 年,美国人口普查局从人口普查中收集了大量数据,并估计处理这些数据需要 8 年时间。第二年,一个名叫赫尔曼·霍勒里希的人发明了霍勒里希制表机,这减少了处理数据所需的工作。第一个数据中心建于 1965 年,用于存储指纹数据和税务信息。

大数据现在

数据湖这一概念的出现,在引领我们今天处理数据时看到的巨大规模方面发挥了关键作用。数据湖为公司存储在运营过程中观察到的任意类型的数据提供了完全的自由,消除了否则会阻止公司收集一些最终变得必要的数据的限制。虽然这种自由允许数据湖保持公司生成数据的最大潜力,但它也可能导致一个关键问题——对收集到的数据的理解上的自满。以非结构化方式存储不同类型数据的便利性实际上可能导致“先存储,后整理”的心态。与处理非结构化数据的真正困难实际上源于其处理过程;因此,延迟处理的心态有可能导致数据湖变得非常繁琐,难以筛选和处理,这是由于数据收集的无限制增长。

原始数据的价值仅在于从中可以提取的模型和洞察力。中心数据湖方法导致了一些情况,其中从数据中提取的洞察力受到缺乏结构的限制,从而引发从存储效率低下到由于提取困难导致的实际智能效率低下等一系列问题。另一方面,先于数据湖的方法则因无法访问潜在的大量数据而受到简单限制。FL 允许避免这两类问题,这是 FL 作为推动大数据进入集体智能时代载体的关键驱动支持。

这一主张得到了 FL 将大数据流程从收集→提取智能转变为提取智能→收集的事实支持。对于人类来说,智能可以被视为大量经验的浓缩形式。以类似的方式,在数据源处提取智能——通过在源位置对数据进行训练——简洁地总结了数据,以最大化其实际应用的易用性格式。FL 的后期收集步骤导致在最大数据访问和数据存储效率下创建所需的全球智能。即使是对生成数据源的局部使用,也可以通过大量减少进入剩余数据湖的数据格式数量,从智能和数据联合存储中大大受益。

大数据的 AAA 心态

尽管已经提出了许多定义,强调不同的方面,牛津大学教授维克托·迈尔-舍恩伯格和《经济学人》高级编辑肯尼思·库克耶在 2013 年的国际畅销书《大数据:一场将改变我们生活、工作和思考方式的革命?》中,巧妙地阐明了大数据的本质。大数据并非关于服务器中数据的规模;大数据是关于三个相互关联的心态转变,这些转变相互强化。他们的论点归结为我们可以总结并称之为大数据的AAA 心态,它包括观察的丰富性、对混乱的接受和因果关系的模棱两可。让我们逐一看看它们。

观察的丰富性

从列和行或文件大小来看,大数据不必“大”。大数据有多个观察值,通常称为n,接近或等于感兴趣人群的大小。在传统统计学中,收集整个人群的数据——例如,纽约对健身感兴趣的人——是不可能的或不可行的,研究人员必须从人群中随机选择样本——例如,1000 名对健身感兴趣的纽约人。随机抽样往往很难进行,而且对特定子组的狭窄关注也是难以证明的:在健身房周围调查的人会错过在公园跑步和在家的瑜伽练习者,为什么是健身房会员而不是跑步者和瑜伽爱好者?然而,由于信息和通信技术ICT)系统的发展和复杂化,今天的研究人员可以通过多个来源访问大约所有人的数据——例如,关于健身的谷歌搜索记录。这种丰富性n = all的范式是有利的,因为数据所表达的内容可以解释为关于人群的真实陈述,而旧方法只能以显著水平的信心推断这种真理,通常以p 值表示,通常假设小于 0.05。小数据提供统计数据;大数据证明状态。

对杂乱程度的接受

大数据往往比较杂乱。如果我们用谷歌搜索数据作为某人兴趣的代理——例如——我们可能会错误地将他们设备上家人或朋友进行的某些搜索归因于他们,这样估计的兴趣程度将不准确,程度取决于这种非自有设备搜索的比例。在某些设备上,大量的搜索可能由多个用户进行,例如办公室的共用电脑或属于尚未拥有手机的孩子的智能手机。否则,人们可能会搜索在与其他人交谈中出现的词语,而不是自言自语,这并不一定反映他们自己的兴趣。在采用传统方法的研究中,研究人员必须确保这些设备不包括在他们的样本数据中,因为这种“杂乱”会显著影响推理的质量,因为观察的数量会很少。但在大数据研究中并非如此。随着观察数量的增加,其影响会相应减小,直到达到n = all。在大多数设备上,谷歌搜索通常由所有者自主进行,其他上下文中的搜索影响不大。

因果关系的矛盾

大数据通常用于研究相关性而不是因果关系——换句话说,它通常只能告诉我们是什么,而不能告诉我们为什么。对于许多实际问题,仅仅相关性就能提供答案。Mayer-Schönberger 和 Cukier 在《大数据:一场将改变我们生活、工作和思考方式的革命》一书中给出了几个例子,其中之一是 2011 年建立的 Fair Isaac Corporation 的Medication Adherence Score。在人们的行为模式被数据化的时代,收集对感兴趣变量的所有观察结果(n = all)是可能的,并且在这些变量之间发现的关联足够强大,足以指导我们的决策。我们不需要知道人们的一致性从众性的心理分数,这些分数导致他们遵守医疗处方;通过观察他们在生活中的其他方面的行为,我们可以预测他们是否会遵守处方。

通过拥抱丰饶、接受和矛盾的三重心态,企业和政府已经在从定价服务到推荐产品、优化运输路线和识别犯罪嫌疑人的任务中产生了智能。然而,这种心态在近年来受到了挑战,以下章节将展示这一点。首先,让我们简要了解一下通常被视为理所当然的观察结果的丰饶性目前正面临压力。

数据隐私作为瓶颈

FL 经常被认为是最受欢迎的隐私保护人工智能技术之一,因为生成高质量智能不需要收集或与第三方实体共享隐私数据。因此,在本节中,我们讨论 FL 试图解决以创造高质量智能的数据隐私瓶颈问题。

什么是数据隐私?2021 年 5 月,HCA Healthcare 宣布公司与谷歌达成协议,共享其患者记录和实时医疗数据。各种媒体迅速作出反应,警告公众关于这笔交易,因为谷歌因其在Project Nightingale中的行为而被提及,据称这家科技巨头利用了数百万美国患者的敏感数据。根据皮尤研究中心 2019 年的一项民意调查,超过 80%的公众认为公司收集数据的潜在风险超过了好处,因此如此规模的数据共享项目自然被视为对人们数据隐私的威胁。

数据隐私,也称为信息隐私,是个人控制其个人信息使用方式的权利,这要求第三方在法律允许的范围内妥善处理、处理、存储和使用此类信息。它常与数据安全混淆,数据安全确保数据准确、可靠,并且仅对授权用户可访问。在谷歌账户的情况下,数据隐私规定了公司如何使用账户持有者的信息,而数据安全则要求他们部署诸如密码保护和两步验证等措施。在解释这两个概念时,数据隐私经理使用了一个关于安全和隐私的比喻:安全是一个前提,而隐私则像一扇窗户和一扇窗帘:数据安全是数据隐私的前提。结合起来,它们构成了数据保护,如下面的图表所示:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_01_02.jpg

图 1.2 – 数据安全与数据隐私对比

从前面的图表中我们可以看出,虽然数据安全限制了谁可以访问数据,但数据隐私限制了数据中可以包含什么。理解这种区别非常重要,因为数据隐私可以放大数据安全失败的影响。让我们看看这是如何发生的。

处理私人数据的风险

数据保护失败代价高昂。根据 IBM 的数据泄露成本报告 2021 年,2021 年全球数据泄露的平均成本为美元USD)4240 万美元,比前一年的 3860 万美元高得多,并且是报告 17 年历史中的最高金额;COVID-19 疫情爆发后远程工作人数的增加被认为是这一激增的主要原因。平均总成本最高的五个行业是医疗保健、金融、制药、技术和能源。该年度近一半的数据泄露包括了客户的个人可识别信息PII),平均每条记录的成本为 180 美元。一旦客户的 PII 遭到泄露,就会随之而来系统响应期间的系统停机、客户流失、需要获取新客户、声誉损失和商誉下降等负面影响;因此,成本高昂。

IBM 的研究还发现,未能遵守数据保护法规是放大数据泄露成本的主要因素之一(www.ibm.com/downloads/cas/ojdvqgry)。

增加的数据保护法规

随着技术的进步,保护客户数据的需求变得更加重要。消费者在每次交易中都要求并期望得到隐私保护;许多简单的活动都可能危及个人数据,无论是网上银行还是使用手机应用。

全世界各国政府最初对制定法律和法规以保护个人数据免受身份盗窃、网络犯罪和数据隐私侵犯的反应都比较缓慢。然而,现在情况正在改变,因为全球数据保护法律开始成形。

规章法规增加的几个驱动因素包括大量数据的增长,我们需要更多的数据安全和隐私保护来防止用户遭受诸如身份盗窃等恶意活动。

让我们以下面的子部分来看看一些旨在保护数据隐私的措施。

通用数据保护条例(GDPR)

欧洲联盟的通用数据保护条例GDPR)被认为是现代数据经济中的第一项数据保护法规,并被许多国家效仿以制定自己的法规。GDPR 于 2012 年提出,2016 年由欧盟理事会和议会通过,并于 2018 年 5 月生效。它取代了 1995 年通过的《数据保护指令》。

GDPR 之所以具有划时代意义,在于其强调对个人身份信息(PII)的保护,包括人们的姓名、位置、种族或民族起源、政治或性取向、宗教信仰、协会会员资格以及基因/生物特征/健康信息。欧盟内外组织和个人在处理欧盟居民的个人信息时都必须遵守该法规。GDPR 有七个原则,其中六个是从数据保护指令继承而来的;新的原则是问责制,要求数据使用者保持关于个人数据使用目的和程序的文档。

GDPR 向公众展示了其违规的后果。根据违规的严重程度,GDPR 的罚款可以从全球年营业额的 2%或 1000 万欧元(以较高者为准),或者全球年营业额的 4%或 2000 万欧元(以较高者为准)。2018 年 5 月,成千上万的欧洲人通过法国组织 La Quadrature du Net(也称为英文中的Squaring the Net)对亚马逊公司提起诉讼,指控该公司未经客户同意使用其广告定位系统。经过 3 年的调查,卢森堡的国家数据保护委员会CNDP)成为全球新闻头条:它对亚马逊开出 7.46 亿欧元的罚款。同样,WhatsApp 在 2021 年 9 月因 GDPR 违规被爱尔兰数据保护委员会罚款;再次,调查历时 3 年,罚款金额为 2.25 亿欧元。

目前,在美国,大多数州已经实施了隐私保护措施,或者很快将实施。此外,一些州已经加强了现有法规,例如加利福尼亚州、科罗拉多州和弗吉尼亚州。让我们逐一了解这些变化。

加利福尼亚消费者隐私法案(CCPA)

加利福尼亚州随后效仿。加利福尼亚消费者隐私法CCPA)于 2020 年 1 月 1 日起生效。正如其名所示,该法规的目的是像 GDPR 一样保护消费者的个人身份信息(PII)。与 GDPR 相比,CCPA 的适用范围显著缩小。CCPA 仅适用于每年从超过 50,000 个点(该州的居民、家庭或设备)收集数据、年营收超过 2500 万美元或通过出售此类信息获得其一半年度收入的营利性组织。然而,CCPA 违规的成本可能比 GDPR 违规的成本高得多,因为前者对其罚款没有上限(每条无意违规罚款 2500 美元;每条故意违规罚款 7500 美元)。

科罗拉多隐私法(CPA)

根据科罗拉多隐私法CPA),从 2024 年 7 月 1 日起,数据收集者和控制者必须遵循用户为生成定向广告和销售所选的通用退出规则。这项规则保护科罗拉多州的居民免受定向销售和广告的侵扰,以及某些类型的用户画像。

弗吉尼亚消费者数据保护法(CDPA)

弗吉尼亚州的消费者数据保护法CDPA)将于 2023 年 1 月 1 日带来多项变更,以增强安全和隐私。这些变更将适用于在弗吉尼亚州或与弗吉尼亚州居民进行业务往来的组织。数据收集者需要获得批准才能利用其私人数据。这些变更还试图确定 AI 供应商的隐私和安全是否充分,这可能需要删除这些数据。

这些只是美国数据法规将如何形成的几个简单例子。这对世界其他地区意味着什么呢?有些人估计,到 2024 年,全球 75%的人口将受到一种或多种隐私法规的保护。

另一个主要数据保护法规的例子是巴西的通用个人数据保护法LGPD),该法自 2020 年 9 月起生效。它取代了该国与数据隐私相关的数十项法律。LGPD 模仿了 GDPR,其内容几乎相同。在亚洲,日本是第一个引入数据保护法规的国家:2003 年通过的个人信息保护法APPI)于 2015 年进行了修订。2022 年 4 月,APPI 的最新版本生效,以应对对数据隐私的现代担忧。

佛罗里达州(FL)已被认定为一种关键技术,它能够与不同领域的隐私法规和监管合规性良好地协同工作。

从隐私设计到数据简约主义

组织已经适应了这些规定。TrustArc 的2021 年全球隐私基准调查发现,拥有专门隐私办公室的企业数量正在增加:调查中有 83%的受访者拥有隐私办公室,而 2020 年的比例仅为 67%。85%的企业已经实施了战略性和可报告的隐私管理计划,但其中 73%的人认为他们可以做得更多来保护隐私。他们的热情并不令人惊讶,因为 34%的受访者声称他们在过去 3 年内遭遇过数据泄露,其代价高昂的后果在本章中已提及。隐私办公室将由数据保护官DPO)领导,负责公司的数据保护影响评估DPIA),以确保遵守如 GDPR 等要求问责制和个人数据处理文件化的法规。DPO 还负责监督并确保其组织按照法律处理个人数据,而高层管理和董事会应提供必要的支持和资源,以便 DPO 完成其任务。

面对 GDPR,当前数据保护的趋势正在转向数据最小化。在此背景下,数据最小化并不一定鼓励最小化数据的大小;它更直接地涉及最小化数据中的 PII 因素,以便个人不能通过其数据点被识别。因此,数据最小化影响了 AI 行业在创建高性能 AI 应用方面的能力,因为 ML 过程中数据种类的短缺仅仅产生了性能不令人满意的 ML 模型偏差。

本章开头介绍的大数据丰富心态因此受到了公众对数据隐私的关注所约束。违反数据保护法规而被罚款的风险,加上拥有数据坟墓的浪费成本,呼吁实践数据最小化而不是数据丰富。

正因如此,联邦学习(FL)正成为许多 AI 解决方案提供商的必备解决方案,例如医疗行业,它们正在努力应对公众对隐私的关注和数据隐私问题,这基本上成为了一个问题,当第三方实体需要收集私人数据以改善 ML 模型及其应用的质量时。如前所述,FL 是一个有希望的隐私保护 AI 框架,因为数据的学习可以在任何地方进行;即使数据对 AI 服务提供商不可用,我们只需以一致的方式收集和汇总训练好的 ML 模型即可。

现在,让我们考虑大数据 Triple-A 思维模式面临的另一个挑战:接受杂乱无章的数据。

训练数据和模型偏差的影响

大数据量的纯粹数量消除了“垃圾输入,垃圾输出”的险恶现实。或者,是这样吗?事实上,只有当足够的数据可以从各种来源和分布中完全学习,而不造成学习结果的任何偏差时,数据的杂乱无章才能被接受。实际上,在集中位置对大数据进行训练确实需要大量的时间和巨大的计算资源及存储空间。此外,我们可能还需要找到方法来衡量和减少模型偏差,而无需直接收集和访问敏感和私人数据,这可能会与之前讨论的一些隐私法规相冲突。联邦学习(FL)也有分布式和协作学习的方面,这对于消除数据偏差和模型偏差以吸收数据的杂乱无章变得至关重要。通过协作和分布式学习,我们可以显著提高整个学习过程的数据可访问性和效率,这个过程通常既昂贵又耗时。这给了我们突破大数据训练过去所有限制的机遇,以下章节将进行讨论。

大数据昂贵的训练

根据报告:https://www.flexera.com/blog/cloud/cloud-computing-trends-2022-state-of-the-cloud-report,37%的企业每年花费超过 1200 万美元,80%的企业每年花费超过 120 万美元用于公共云。云上的训练成本并不便宜,并且可以假设随着对人工智能和机器学习的需求增加,这种成本将会显著增加。有时,由于以下问题,大数据无法完全用于机器学习训练:

  • 大数据存储:大数据存储是一种计算和存储架构,用于收集和管理大量数据集,以供人工智能应用或实时分析使用。全球企业公司仅为了云存储和数据中心成本就支付了超过 1000 亿美元(https://a16z.com/2021/05/27/cost-of-cloud-paradox-market-cap-cloud-lifecycle-scale-growth-repatriation-optimization/)。虽然其中一些数据集对于它们提供的应用至关重要,但它们真正想要的通常是能够从数据中提取的商业智能,而不仅仅是数据本身。

  • 显著的训练时间:构建和训练一个可以交付为真实产品的机器学习模型基本上需要大量的时间,这不仅包括训练过程,还包括机器学习管道的准备。因此,在许多情况下,智能的真正价值在机器学习模型交付时就已经丧失了。

  • 巨大的计算量:机器学习模型的训练通常消耗大量的计算资源。例如,使用机械手操作如魔方等部件的机器学习任务有时可能需要超过 1000 台计算机。仅运行一些专门的图形芯片几个月,就可能需要十几台机器。

  • 通信延迟:为了形成大数据,特别是在云中,需要将大量数据传输到服务器,这本身就会造成通信延迟。在大多数用例中,FL 需要从本地设备或学习环境传输到服务器(称为聚合器)的数据要少得多,该服务器用于综合从这些设备收集的本地 ML 模型。

  • 可扩展性:在传统的集中式系统中,由于大数据的复杂性和其昂贵的基础设施(如云服务器环境中的大量存储和计算资源),可扩展性成为一个问题。在 FL 服务器中,仅进行聚合以综合训练多个本地模型以更新全局模型。因此,当 ML 训练以分布式方式在边缘设备上进行时,系统和学习可扩展性显著提高,而不仅仅是单个集中式学习服务器。

FL 有效地利用了可用于 ML 模型轻量级训练的分布式计算资源。无论是实际物理设备上的训练还是云系统虚拟实例上的训练,将模型训练过程并行化到分布式环境中通常可以加速学习本身的速度。

此外,一旦收集了训练好的模型,FL 系统可以快速地将它们综合起来,生成一个更新的 ML 模型,称为全局模型,该模型在边缘侧吸收了足够的经验,因此实现近乎实时的智能传递成为可能。

模型偏差和训练数据

当 ML 算法由于 ML 过程中的错误假设而产生系统性的偏见结果时,就会发生 ML 偏差。ML 偏差有时也被称为算法偏差或 AI 偏差。

2018 年图灵奖得主,因其在深度学习发展方面的杰出贡献而获奖的 Yann LeCun 说:“当数据有偏差时,ML 系统是有偏差的” (twitter.com/ylecun/status/1274782757907030016)。这来自于 Nvidia 团队编译的Flickr-Faces-HQ数据集。基于人脸上采样系统,许多人被分类为白色,因为网络是在主要包含白人图片的Flickr-Faces-HQ数据上预训练的。对于这种误分类人群的问题,模型的架构并不是导致这种输出的关键问题。因此,结论是,一个种族偏差的数据集生成了一个中立的模型来产生偏差的结果。

关于 AI 和机器学习偏差的富有成效的对话是由谷歌前 AI 伦理负责人引领的。2018 年发表的性别阴影论文揭示了主要面部识别模型中的种族和性别偏差,国会的立法者寻求禁止美国联邦政府使用这项技术。包括亚马逊、IBM 和微软在内的科技公司也同意暂停或终止向警方销售面部识别模型。他们鼓励采取干预主义的数据收集方法,建议科学家和工程师明确模型开发的目标,制定严格的数据收集政策,并对收集到的数据进行彻底评估,以避免偏差——详细信息可在FATE/CV网站上找到(https://sites.google.com/view/fatecv-tutorial/home)。

FL 可能是克服数据孤岛问题的最有前途的机器学习技术之一。很多时候,数据甚至无法访问或用于训练,导致数据和模型存在重大偏差。自然地,FL 通过解决数据隐私和孤岛问题,这些问题成为避免数据偏差的瓶颈,因此 FL 对于克服偏差非常有用。在这种情况下,FL 正在成为大数据服务和应用的突破性实现,这在arxiv.org/pdf/2110.04160.pdf中得到了彻底的研究。

此外,还有一些技术试图在 FL 本身中减轻模型偏差,例如重新加权偏见消除器,这些都在arxiv.org/pdf/2012.02447.pdf中详细说明。

模型漂移和性能退化

模型漂移通常是指由于数据以及输入和输出变量之间关系的变化而导致机器学习模型性能下降,也称为模型退化。模型漂移可以通过持续学习来应对,以适应数据集或环境中的最新变化,几乎在实时进行。FL 的一个重要方面是通过在本地分布式环境中学习发生时即时更新机器学习模型,以实现一个连续学习框架。这样,FL 可以解决企业 AI 应用中常见的智能在交付生产时变得无用的状况。

我们现在将讨论模型如何退化或停止工作,以及一些当前模型运维ModelOps)的努力,以持续提高模型性能并实现可持续的 AI 运营。

模型如何停止工作

任何具有固定参数或权重的 AI 和 ML 模型,这些参数或权重是从训练数据中生成的,并调整到测试数据,当在模型接收的数据类似于训练和测试数据的环境中部署时,可以表现相当好。如果一个自动驾驶模型在晴朗的白天记录的数据上得到了良好的训练,那么该模型可以在晴天安全地驾驶车辆,因为它正在做它被训练去做的事情。然而,在一个雨天晚上,如果车辆是自动驾驶的,那么没有人应该坐在或靠近车辆:模型被喂以完全陌生的、黑暗和模糊的图像;其决策将完全不可靠。在这种情况下,模型的决策将偏离轨道,因此得名模型漂移。再次强调,如果模型在类似于训练和测试环境的环境中部署,并且环境在一段时间内没有显著变化,那么模型漂移不太可能发生。但在许多商业情况下,这个假设并不总是成立,模型漂移成为一个严重的问题。

模型漂移有两种类型:数据漂移概念漂移。数据漂移发生在部署的模型输入数据与模型训练数据显著不同时。换句话说,数据分布的变化是数据漂移的原因。上述日间自动驾驶模型在夜间表现不佳就是一个数据漂移的例子。另一个例子是,在加利福尼亚州训练的冰淇淋销售预测模型在新西兰部署;南半球的季节性与北半球相反,因此预计的冰淇淋销量在夏季会低,在冬季会高,这与实际销量相反。

相反,概念漂移是由于变量之间相关性变化的结果。在统计学的术语中,这意味着数据生成过程已经改变。这正是Google Flu Trends (GFT)所遭遇的,正如《潜行者经济学家》一书的作者在以下《金融时报》文章中所述:https://www.ft.com/content/21a6e7d8-b479-11e3-a09a-00144feabdc0#axzz30qfdzLCB。

在此时期之前,搜索查询与流感传播之间存在有意义的关联,因为主要是那些怀疑自己感染了流感的人会在浏览器中输入这些词语,因此模型运作成功。但在 2013 年,这种情况可能不再成立,因为其他类别的人,例如那些对潜在的大流行病持预防态度的人或只是好奇的人,也在搜索这些词语,他们可能是由谷歌的推荐引导去搜索的。这种概念漂移可能导致 GFT 高估了与疾病控制与预防中心(Centers for Disease Control and PreventionCDC)提供的医疗报告相比的传播情况。

不论是通过数据还是通过概念,模型漂移会导致模型性能下降,这是由于我们关注相关性所致。在数据科学领域的术语中,“ground truth”并不像物理学和化学等硬科学中的普遍真理那样,即因果关系。它仅仅是对给定数据中变量在特定环境中如何相互关联的真实陈述,并且不能保证当环境发生变化或不同时,相关性仍然存在。也就是说,我们估计的“ground truth”可能随时间和地点而变化,就像“ground”在历史上和地理上被地震事件重塑一样。

持续监控——放弃因果关系的代价

在 Redis Labs(https://venturebeat.com/business/redis-survey-finds-ai-is-stressing-it-infrastructure-to-breaking-point/)委托的一项调查中,大约一半的受访者将模型可靠性(48%)、模型性能(44%)、随时间变化的准确性(57%)和运行模型的延迟(51%)列为部署模型时面临的主要挑战。鉴于模型漂移的风险和担忧,AI 和机器学习模型的相关方在部署后需要完成两项额外的工作。首先,必须持续监控模型性能以检测模型漂移。数据漂移和概念漂移可能逐渐或突然发生。一旦检测到模型漂移,就需要使用新的训练数据重新训练模型,当发生概念漂移时,甚至可能需要使用新的模型架构来升级模型。

为了解决这些需求,一个新的机器学习原则,称为持续交付机器学习(Continuous Delivery for Machine Learning,简称CD4ML)已被提出。在 CD4ML 的框架中,模型首先使用训练数据进行编码和训练。然后,使用单独的数据集对模型进行测试,并根据某些指标进行评估,通常情况下,从多个候选模型中选择最佳模型。接下来,选定的模型通过进一步的测试进行生产化,以确保模型在部署后表现良好,一旦通过测试,它就会被部署。在这里,监控过程开始。当观察到模型漂移时,模型将使用新数据重新训练或根据漂移的严重程度给予新的架构。如果你熟悉软件工程,你可能会注意到 CD4ML 是机器学习领域对持续集成/持续交付(Continuous Integration/Continuous Delivery,简称CI/CD**)的采用。在类似的方式中,源自开发-运维(Development-Operations,简称DevOps软件工程框架的 AI 和机器学习操作框架ModelOps正在获得越来越多的关注。ModelOps 连接了机器学习操作(Machine Learning Operations,简称MLOps:数据工程与数据科学的集成**)和应用程序工程;它可以被视为 CD4ML 的推动者。

大数据“AAA 思维模式”的第三个因素让我们能够专注于相关性,并在过去十年中帮助快速构建 AI 和 ML 模型。发现相关性比发现因果关系要容易得多。对于许多多年来一直在告诉我们从人们的谷歌搜索模式中需要知道什么的人工智能和机器学习模型,我们必须检查它们今天是否仍然有效。明天也是如此。

正因如此,联邦学习(FL)是持续学习的重要方法之一。在创建和运营 FL 系统时,还重要的是要开发具有 ModelOps 功能的系统,因为 FL 的关键作用是以协作方式不断从各种学习环境中改进模型。甚至可以使用 FL 实现一个众包学习框架,以便平台上的用户可以将所需的 ML 模型带到本地进行适应和训练,并将更新的模型返回给 FL 服务器,由聚合器处理。通过一个高级模型聚合框架来过滤掉可能降低当前模型性能的有毒 ML 模型,FL 可以持续整合其他学习成果,从而实现一个可持续的持续学习操作,这对于具有 ModelOps 功能的平台至关重要。

FL 作为数据问题的主要解决方案

到目前为止,在本章中,我们确认了大数据存在需要解决的问题。必须保护数据隐私,不仅是为了保护个人,也是为了保护可能面临数据泄露和随之而来的罚款的数据用户。一组大数据中的偏差可以通过代理对推断产生重大影响,即使省略了关于性别和种族的因素,而且专注于相关性而不是因果关系使得预测模型容易受到模型漂移的影响。

在这里,让我们从架构、流程、问题和好处等方面讨论传统大数据 ML 系统和 FL 系统之间的区别。以下图表展示了传统大数据 ML 系统和 FL 系统之间的视觉比较:

![Figure 1.3 – 传统大数据 ML 系统和 FL 系统比较]

![img/B18369_01_03.jpg]

图 1.3 – 传统大数据 ML 系统和 FL 系统比较

在传统的数据系统中,数据被收集以创建大型数据存储。这些大型数据存储用于使用机器学习解决特定问题。由于训练数据量庞大,产生的模型显示出强大的泛化能力,并最终被部署。

然而,持续的数据收集需要大量的通信带宽。在注重隐私的应用中,数据的传输可能完全被禁止,这使得模型创建成为不可能。在大数据存储上训练大型 ML 模型计算成本高昂,传统的集中式训练效率受限于单机性能。缓慢的训练过程导致增量模型更新之间的延迟时间过长,导致无法灵活地适应新的数据趋势。

另一方面,在联邦学习系统中,机器学习训练直接在数据所在地进行。生成的训练模型被收集到中央服务器上。使用聚合算法从收集到的模型中生成一个聚合模型。聚合模型被发送回数据所在地进行进一步训练。

联邦学习方法通常需要在分布式系统设置中设置和维护训练性能,这会产生额外的开销。然而,即使架构和设置稍微复杂一些,也有其卓越的益处。训练在数据所在地进行,因此数据永远不会被传输,从而维护数据隐私。训练可以在多个节点上异步进行,这导致高效且易于扩展的分布式学习。仅在服务器和节点之间传输模型权重,因此联邦学习在通信方面是高效的。高级聚合算法甚至可以在受限场景中保持训练性能,并在标准机器学习场景中提高效率。

大多数人工智能项目似乎都无法交付,或者根本无法达到预期效果。要交付一个真正的人工智能应用和产品,之前讨论的所有问题都需要认真考虑。显然,联邦学习(FL)与其他关键技术的结合,用于处理机器学习(ML)管道和引擎处理的数据,正逐渐成为解决数据相关问题的连续和协作的关键解决方案。

我们如何利用人工智能和机器学习的力量,优化整个社会的技术系统——即,在数据最小化和道德的基础上,带来更加快乐、舒适、便捷和安全的世界,并持续进行改进?我们认为关键在于一个集体智慧以智能为中心的平台,这在第十章“未来趋势和发展”中也有讨论。在本书的后续章节中,我们介绍了联邦学习系统的概念、设计和实现,作为一种有前景的技术,用于通过人工智能和机器学习模型的网络来协调集体智慧,以满足之前讨论的需求。

摘要

本章概述了联邦学习如何通过首先理解大数据的定义及其本质,包括大量的观察、接受混乱和因果关系的矛盾性,来潜在地解决许多大数据问题。

我们从许多地区了解了隐私法规的多种形式,以及数据泄露和隐私侵犯的风险,这些最终导致利润损失,以及创建真正人工智能应用的瓶颈。联邦学习,按照设计,不会收集任何原始数据,可以保护数据隐私并遵守这些法规。

此外,使用 FL 框架,我们可以减少影响 ML 模型性能的固有偏见,并通过持续学习框架最小化模型漂移。因此,需要一个基于 FL 的分布式和协作学习框架,以实现更经济高效的方法。

这本导论章节以联邦学习(FL)作为解决上述大数据问题的主要解决方案的潜力结束,这一解决方案基于集体智能的范式转变思想,这种思想有可能取代当前主流的数据中心平台。

在下一章中,我们将看到联邦学习(FL)在数据科学领域中的位置以及它如何开启机器学习(ML)的新时代。

进一步阅读

要了解更多关于本章所涉及的主题,请参阅以下参考文献:

  • Algorithmia. (2021 年). 2021 年机器学习企业趋势. 西雅图:Algorithmia.

  • Mayer-Schönberger, V.Cukier, K. (2013 年). 大数据:一场将改变我们生活、工作和思考方式的革命. 波士顿/纽约:Eamon Dolan/Houghton Mifflin Harcourt.

  • 《经济学人》. (2010 年 2 月 27 日). 数据无处不在. 《经济学人》.

  • 数据隐私经理. (2021 年 10 月 1 日). 数据隐私与数据安全[定义和比较]. 《数据隐私经理》.

  • IBM. (2021 年). 2021 年数据泄露成本报告. 纽约:IBM.

  • Burgess, M. (2020 年 3 月 24 日). 什么是 GDPR?英国 GDPR 合规的总结指南. 《Wired》.

  • TrustArc. (2021 年). 2021 年全球隐私基准调查. 沃尔纳特克里克:TrustArc.

  • Auxier, B.Rainie, L.Anderson, M.Perrin, A.Kumar, M.Turner, E. (2019 年 11 月 15 日). 美国人和隐私:担忧、困惑,感觉无法控制他们的个人信息. 皮尤研究中心.

  • Hes, R.Borking, J. (1995 年). 隐私增强技术:通往匿名之路. 荷兰海牙:安大略省信息和隐私专员.

  • Goldsteen, A.Ezov, G.Shmelkin, R.Moffie, M.Farkash, A. (2021 年). 机器学习模型中 GDPR 合规的数据最小化. 《AI 和伦理》,1-15 页。

  • Knight, W. (2019 年 11 月 19 日). 苹果卡没有“看到”性别——这就是问题所在. 《Wired》.

  • Gebru, T.Denton, E. (2020 年). CVPR 2020 计算机视觉公平性、责任、透明度和伦理教程. 在线可用:sites.google.com/view/fatecv-tutorial/home.

  • Ukanwa, K. (2021 年 5 月 3 日). 算法偏见不仅不公平,而且对商业有害. 《波士顿环球报》.

  • O’Neil, C. (2016 年). 数学破坏武器:大数据如何加剧不平等并威胁民主. 纽约:Crown.

  • Blackman, R. (2020 年 10 月 15 日). 构建道德 AI 的实用指南. 《哈佛商业评论》.

  • Ginsberg, J.Mohebbi, M.Patel, R.Brammer, L.Smolinski, M. S.Brilliant, L. (2009). 利用搜索引擎查询数据检测流感大流行自然,第 457 卷,第 1012-1014 页

  • Anderson, C. (2008 年 6 月 23 日). 理论的终结:数据洪流使科学方法过时Wired

  • Butler, D. (2013). 当谷歌误判流感时自然,第 494 卷,第 155-156 页

  • Harford, T. (2014 年 3 月 28 日). 大数据:我们是否犯了大错误?金融时报

  • Dral, E.Samuylova, E. (2020 年 11 月 12 日). 机器学习监控,第五部分:为什么你应该关注数据漂移和概念漂移Evidently AI 博客

  • Forrester Consulting。(2021). 将 ML 模型部署到内存数据库:实现快速性能。从redis.com/wp-content/uploads/2021/06/forrester-ai-opportunity-snapshot.pdf检索。

  • Sato, D.Wider, A.Windheuser, C. (2019 年 9 月 19 日). 机器学习的持续交付:自动化机器学习应用的端到端生命周期。从martinFowler.com检索,martinfowler.com/articles/cd4ml.html

  • Verma, D. C. (2021). 联邦 AI 应用于现实世界商业场景纽约:CRC 出版社

  • Bostrom, R. P.Heinen, J. S. (1977). MIS 问题与失败:社会技术视角。第一部分:原因MIS 季刊,第 1 卷第 3 期,第 17 页

  • Weld, D. S.Lin, C. H.Bragg, J. (2015). 人工智能与集体智慧集体智慧手册,第 89-114 页

  • Abay, A.Zhou, Y.Baracaldo, N.Rajamoni, S.Chuba, E.Ludwig, H. 缓解联邦学习中的偏差。可在 https://arxiv.org/pdf/2012.02447.pdf 获取。

  • 《大数据:一场将改变我们生活、工作和思考方式的革命》 (www.amazon.com/Big-Data-Revolution-Transform-Think/dp/0544227751

第二章:什么是联邦学习?

第一章《大数据和传统人工智能的挑战》中,我们探讨了大数据和机器学习(ML)的潮流如何为新的实用 ML 应用方法奠定了基础。本章将联邦学习(FL)视为满足这种新 ML 方法需求的答案。简而言之,FL 是一种 ML 方法,允许模型在数据源之间并行训练,而不需要传输任何数据。

本章的目标是建立 FL 方法的论据,解释必要的概念构建块,以确保你可以达到对 FL 的技术方面和实际应用的类似理解。

阅读本章后,你应该对 FL 过程有一个高级别的理解,并且能够可视化这种方法在现实世界问题领域中的位置。

在本章中,我们将涵盖以下主题:

  • 理解当前机器学习(ML)的状态

  • 分布式学习特性——走向可扩展人工智能

  • 理解联邦学习(FL)

  • FL 系统考虑因素

理解当前机器学习(ML)的状态

为了理解从应用 FL 中获得的利益为何可能超过这种方法增加的复杂性,有必要了解 ML 当前的实践及其相关的限制。本节的目标是为你提供这个背景。

什么是模型?

术语“模型”在众多不同学科中都有应用;然而,我们感兴趣的广义定义可以缩小到某些所需系统内动态的工作表示。简单来说,我们通过 B 模型更好地理解现象 A,A 是现象 B 的某种现象,这是通过 B 提供的增加的交互性来实现的。考虑一个物体从真空中某个点掉落的现象。使用运动方程,我们可以精确计算出物体落地所需的时间——这就是上述现象的模型。

这种方法的强大之处在于,可以在不与所讨论的现象进行明确交互的情况下观察创建的模型的结果。例如,下落物体的模型使我们能够在某个高度上确定 10 公斤物体和 50 公斤物体下落时间的差异,而无需在真实真空中从该高度物理地释放真实物体。显然,对自然现象的建模在能够声称真正理解这些现象方面发挥着关键作用。消除对现象进行全面观察的需要,使得在决策过程中实现真正的泛化成为可能。

在计算机科学领域,模型的概念被大大缩小了。在这种情况下,模型是算法,它允许根据对所讨论现象的某些初始描述输出该现象的一些关键值。回到落体例子,计算机科学模型可能包括根据物体的质量和下落的高度计算值,如击中地面的时间和最大速度。这些计算机科学模型由于计算机在从无数起始现象配置中计算输出的超人类能力而独具特色,为我们提供了更深入的理解和更广泛的概括。

那么,我们如何创建这样的模型呢?第一个也是最简单的方法是构建基于规则的系统或白盒模型。白盒模型(也称为玻璃盒或透明盒)是通过明确写出感兴趣系统的底层函数来构建的。这只有在系统信息可预先获得的情况下才可能。自然地,在这种情况下,底层函数相对简单。一个这样的例子是分类随机选择的整数是奇数还是偶数的问题;我们可以很容易地编写一个算法通过检查整数除以二的余数来完成这个任务。如果你想看看加满油箱需要花多少钱,给定油箱的空余量和每加仑的价格,你只需将这些值相乘即可。尽管这些例子很简单,但它们说明了简单的模型可以在各个领域有大量的实际应用。

不幸的是,底层函数的白盒建模很快就会变得过于复杂而无法直接执行。一般来说,系统通常过于复杂,我们无法构建白盒模型。例如,假设你想预测你财产的未来价值。你有很多关于财产的指标,比如面积、它的年龄、位置,以及利率等等。你相信财产价值和所有这些指标之间可能存在线性关系,即所有这些指标的加权总和将给出财产价值。现在,如果你实际上基于这个假设尝试构建白盒模型,你必须直接确定每个指标的参数(权重),这意味着你必须知道房地产定价系统的底层函数。通常情况下,这不是事实。因此,我们需要另一种方法:黑盒建模。

ML – 自动化模型创建过程

黑盒系统的概念最早在二战期间的电气电路领域发展起来。正是著名的控制论学家诺伯特·维纳开始将黑盒视为一个抽象概念,并在 20 世纪 60 年代由马里奥·奥古斯特·邦格建立了通用理论。如前所述,估计未来房地产价值的函数是一个很好的黑盒例子。正如你所期望的,这个函数足够复杂,我们尝试编写一个白盒模型来表示它是不可行的。这就是机器学习发挥作用的地方,它允许我们创建一个作为黑盒的模型。

参考文献

你可能知道,黑盒建模因其缺乏可解释性而受到批评,这是本书范围之外的一个重要概念;我们推荐 Serg Masís 所著的 Packt 出版的《Python 可解释机器学习》作为该主题的参考。

机器学习是一种人工智能,用于自动生成用于决策和预测的模型参数。图 2.1 以非常简单的方式说明了这一点:那些已知值和未知值之间存在线性关系的案例允许应用一个流行的算法,称为普通最小二乘法OLS)。OLS 通过找到产生最接近预测的一组参数来计算线性关系的未知参数,这些参数是在一些已知示例(输入特征值集和真实输出值对)上进行的:

![图 2.1 – 机器学习确定模型参数]

![图片 B18369_02_01.jpg]

![图 2.1 – 机器学习确定模型参数]

上述图表显示了一个简单的二维线性回归问题,包含一个特征/输入变量和一个输出变量。在这个简单的二维案例中,我们可能相对容易直接提出代表最佳拟合关系的参数,无论是通过隐含知识还是通过测试不同的值。然而,很明显,随着特征变量的数量增加,这种方法很快就会变得难以处理。

最小二乘法(OLS)允许我们从相反的方向解决这个问题:我们不是产生线性关系并在数据上评估它们,而是可以直接使用数据来计算最佳拟合关系的参数。回顾房地产问题,假设我们已经收集了大量房地产估值数据点,包括相关的指标值和销售价格。我们可以应用 OLS 来取这些点,并找出每个指标与任何房地产销售价格之间的关系(仍然假设真实关系是线性的)。据此,我们可以输入我们房地产的指标值,并得到预测的销售价格。

这种方法的力量在于将这种关系计算从对问题的任何隐含知识中抽象出来。最小二乘法(OLS)算法并不关心数据代表什么——它只是找到给定数据的最佳直线。这类方法正是机器学习所包含的,它赋予了在没有内部关系知识的情况下,仅凭足够的数据创建现象模型的能力。

简而言之,机器学习(ML)让我们能够编写算法,这些算法可以从数据中学习创建模型,而我们这样做的原因是为了近似复杂系统。重要的是要记住,由于外部因素的影响,复杂系统的底层函数可能会随时间而改变,这使得从旧数据中创建的模型很快就会过时。例如,前面的线性回归模型可能无法用于估计遥远未来或遥远地区的财产价值。在只包含几十个参数的模型中,并没有考虑到这种宏观尺度上的变化,我们需要为相邻数据点的不同组别使用不同的模型——除非我们采用更复杂的机器学习方法,如深度学习

深度学习

那么,深度学习是如何在普通用法中与机器学习(ML)同义的?深度学习涉及应用深度神经网络(DNN),这是一种受大脑中神经元之间信号传递启发的超参数化模型。深度学习的基础是在 20 世纪 60 年代初由 Frank Rosenblatt 建立的,他被誉为深度学习之父。他的工作在 20 世纪 70 年代和 80 年代得到了包括 Geoffrey Hinton、Yann LeCun 和 Yoshua Bengio 在内的计算机科学家的进一步发展,而“深度学习”这一术语是由加州大学欧文分校的杰出教授 Rina Dechter 普及的。与简单的机器学习算法(如线性回归)相比,深度学习可以执行更复杂的任务。虽然具体内容超出了本书的范围,但深度学习能够解决的关键问题是复杂非线性关系的建模,由于它提供的建模能力增强,推动了机器学习作为整体在众多领域的最前沿。

这种能力已经通过针对不同模型大小案例的具体通用逼近定理在数学上得到证明。对于那些对深度学习或机器学习(ML)总体上都是新手的人来说,Sebastian Raschka 和 Vahid Mirjalili 合著的《Python 机器学习》第三版是一个很好的参考资料,可以了解更多关于这个主题的内容。

在过去十年中,随着大数据的背景,科技巨头们构建了越来越强大的模型,正如在第一章“大数据与传统人工智能的挑战”中讨论的那样。如果我们看看当今最先进的深度学习模型,它们可能拥有多达万亿个参数;预期地,这赋予了它们在建模复杂函数方面无与伦比的灵活性。深度学习模型之所以能够通过所谓的双重下降现象任意扩展以增加性能,而不同于之前使用的其他机器学习模型类型,是因为这种现象。这指的是某个参数化/训练阈值能够克服标准的偏差-方差权衡(其中增加复杂性会导致对训练数据的微调,减少偏差但增加方差),并继续提高性能。

关键的启示是,深度学习模型的性能可以被认为是仅受限于可用的计算能力和数据,这两个因素在过去十年中由于计算技术的进步以及设备数量和收集数据的软件数量的不断增加而迅速增长。深度学习已经与机器学习交织在一起,深度学习在当前的机器学习和大数据状态下发挥着重要作用。

本节重点在于确立当前机器学习技术所执行建模的重要性。从某种意义上说,这可以被认为是什么——FL(联邦学习)究竟试图做什么。接下来,我们将关注哪里,即众多机器学习应用所期望的设置。

分布式学习特性——向可扩展人工智能迈进

在本节中,我们介绍了分布式计算环境,并讨论了该环境与机器学习方法的交汇,以完全确立 FL(联邦学习)必要性的支持。本节的目标是让用户理解分布式计算环境带来的优势和局限性,以便理解 FL 如何解决其中的一些局限性。

分布式计算

在过去几年中,从分布式计算的角度来看,新方法的开发和对现有服务器基础设施的转换呈现出大幅但可预测的增长。进一步概括,分布式方法本身越来越多地从研究实现转向在生产环境中的广泛应用;这一现象的一个显著例子是亚马逊的 AWS、谷歌的Google Cloud Platform(GCP)和微软的 Azure 等云计算平台的运用。结果是,按需资源的灵活性使得在许多其他情况下,可能会受到本地服务器和计算能力的瓶颈限制的应用中,实现了成本节约和效率提升。

虽然不能完全将云计算与分布式计算的概念相提并论,但由此产生的关键好处在本质上相似。从高层次来看,分布式计算涉及将某些计算任务的必要工作分散到多个计算代理中,以便每个代理都能近乎自主地行动。以下图显示了在回答问题的高层次上下文中,集中式和分布式方法之间的差异:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_02_02.jpg

图 2.2 – 集中式与分布式问答

在这个玩具示例中,集中式方法涉及按顺序处理输入问题,而分布式方法能够同时处理每个问题。应该清楚的是,并行方法是在计算资源使用和增加回答速度之间进行权衡。那么,这个权衡对于现实世界的应用是否有益,自然就成为了一个问题。

现实世界案例 – 电子商务

为了理解分布式计算方法的实际好处,让我们通过传统计算和分布式计算的角度分析一个业务问题。考虑一个试图使用本地服务器托管其网站的电子商务业务。传统的方法是在业务方面进行足够分析,以确定未来某个时间点的预期流量量,并投资购买一台或几台足够处理该计算出的流量的服务器机器。

几个案例立即揭示了这种方法的缺陷。考虑一种情况,即网站的使用量远远超过了最初的预测。固定数量的服务器意味着所有升级都必须是硬件升级,导致必须购买且不再使用的旧硬件。进一步来说,现在增加的使用量是否保持不变也没有保证。使用量的进一步增加将导致更多的扩展成本,而使用量的减少将导致资源浪费(当小型机器就足够时,仍需维护大型服务器)。一个关键点是,由于使用单机方法来管理托管,额外服务器的集成并不简单。此外,我们必须考虑处理大量并发请求的硬件限制。每个机器处理并发请求的能力有限——大量的流量几乎肯定会最终成为瓶颈,无论每个服务器的可用功率如何。

相比之下,考虑基于分布式计算的解决方案。根据初步的商业预测,购买了一些较小的服务器机器,并且每台机器都设置好了以处理一些固定的流量量。如果出现超出预期的流量情况,不需要对现有机器进行修改;相反,可以购买更多类似尺寸的服务器,并配置它们来处理指定的新流量量。如果流量减少,相应数量的服务器可以被关闭或转移到处理其他任务。这意味着相同的硬件可以用于可变流量的处理。

这种能力能够快速扩展以处理任何时刻必要的计算任务,这正是分布式计算方法允许计算代理无缝开始和停止工作在所述任务上的原因。此外,与使用较少的大型机器相比,并行使用许多较小的机器意味着可以同时处理的请求数量显著更高。很明显,在这种情况下,分布式计算方法本身具有节省成本和灵活性的优势,这是更传统的方法无法比拟的。

分布式计算的好处

通常,分布式计算方法为任何计算任务提供了三个主要好处——可扩展性、吞吐量和弹性。在前面的网页托管案例中,可扩展性指的是根据进入的流量量调整部署的服务器数量,而吞吐量指的是通过较小服务器的固有并行性来减少请求处理延迟的能力。在这个例子中,弹性可能指的是其他部署的服务器能够承担停止工作的服务器的负载,从而使得托管能够相对不受影响地继续进行。

分布式计算通常在处理大量数据时得到应用,尤其是在使用单一机器进行数据分析在计算上不可行或不太理想的情况下。在这些情况下,可扩展性允许根据所需的运行时间和任何给定时间的数据量等因素部署可变数量的代理,而每个代理能够自主地并行处理数据子集的能力,使得处理吞吐量对于单一高性能机器来说是不可能的。结果发现,这种不依赖于尖端硬件的做法进一步降低了成本,因为硬件价格与性能比通常不是线性的。

虽然开发用于在分布式计算环境中运行的并行化软件并非易事,但希望这清楚地表明,许多实际计算任务都极大地受益于这种方法实现的可扩展性和吞吐量。

分布式机器学习

当考虑那些在实用应用中已被证明有价值且可能直接受益于增加可扩展性和吞吐量的计算任务类型时,很明显,快速增长的机器学习领域几乎处于顶端。事实上,我们可以将机器学习任务视为上述分析大量数据任务的具体例子,强调正在处理的数据和分析所执行的性质。廉价计算能力(例如,智能设备)的联合增长以及数据分析建模的既定益处,导致了一些公司拥有大量数据存储,并希望从这些数据中提取有意义的见解和预测。

第二部分正是机器学习旨在解决的问题,已经在各个领域完成了大量工作。然而,与其他计算任务一样,在大量数据上执行机器学习往往会导致时间-计算能力权衡,需要更强大的机器在合理的时间内完成这些任务。随着机器学习算法在计算和内存方面变得更加密集,例如最近具有数十亿参数的最新深度学习模型,硬件瓶颈使得增加计算能力变得不可行。因此,当前的机器学习任务必须应用分布式计算方法,以保持前沿地位,同时在使用时间内产生结果。

边缘推理

尽管前面描述的深度学习的普及,以及在第第一章中讨论的从大数据到集体智能的范式转变——“大数据与传统人工智能的挑战”,为分布式机器学习提供了足够的动力,但其物理基础却源于最近边缘计算的快速发展。这里的边缘指的是部署解决方案的附近区域;因此,边缘计算指的是在数据源附近或其处处理数据。将计算的概念扩展到机器学习导致边缘人工智能(Edge AI)的想法,其中模型直接集成到边缘设备中。一些流行的例子包括亚马逊 Alexa,其中边缘人工智能负责语音识别,以及自动驾驶汽车,它们收集真实世界的数据,并通过边缘人工智能逐步改进。

最普遍的例子是智能手机——一些潜在用途包括向用户推荐内容、带有语音助手的搜索和自动完成、自动将图片排序到相册和图库搜索等。为了利用这种潜力,智能手机制造商已经开始将专注于机器学习的处理器组件集成到他们与最新手机集成的芯片中,例如三星的神经处理单元和谷歌 Tensor 芯片上的张量处理单元。谷歌还通过他们的Android ML Kit SDK为 Android 应用程序开发了专注于机器学习的 API。从这一点来看,机器学习应用正转向边缘计算范式。

假设智能手机需要使用深度学习模型来进行单词推荐。这样,当你用手机输入文字时,它会为你提供下一个单词的建议,目的是节省你的时间。在集中式计算过程的方案中,中央服务器是唯一可以访问这个文本预测模型的组件,而没有任何一部手机本地存储了这个模型。中央服务器处理来自手机的所有请求,以返回单词推荐。当你输入时,你的手机必须将已输入的内容以及一些关于你的个人信息发送到中央服务器。服务器接收到这些信息,使用深度学习模型进行预测,然后将结果发送回手机。以下图反映了这个场景:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_02_03.jpg

图 2.3 – 集中式推理场景

当你观察这个场景时,会出现一些明显的问题。首先,即使只有半秒到一秒的延迟,推荐的速度也会比你自己输入所有内容慢,使得系统变得无用。此外,如果没有互联网连接,推荐根本无法工作。这个方案的另一个限制是需要中央服务器处理所有这些请求。想象一下世界上有多少部智能手机正在使用,你就会意识到由于这个解决方案的极端规模,其可行性存在不足。

现在,让我们从边缘计算的角度来看同一个问题。如果智能手机本身包含深度学习模型会怎样?中央服务器只负责管理最新的训练模型,并与每部手机通信这个模型。现在,无论何时你开始输入,你的手机都可以使用接收到的模型在本地进行推荐。以下图反映了这个场景:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_02_04.jpg

图 2.4 – 边缘推理场景

这样既消除了延迟问题,又避免了在中央位置处理传入的推理请求的需要。此外,手机不再需要与服务器保持连接以进行推荐。每部手机负责满足其用户的请求。这是边缘计算的核心优势:我们将计算负载从中央服务器转移到了边缘设备/服务器

边缘训练

集中式与分布式计算之间的区别可以扩展到模型训练的概念。让我们继续以智能手机为例,但思考一下我们如何训练预测模型。首先,在集中式机器学习过程中,用于训练推荐模型的所有数据都必须从用户的设备中收集并存储在中央服务器上。然后,收集到的数据用于训练一个模型,最终发送到所有手机。这意味着中央服务器仍然需要能够处理大量涌入的用户数据,并以高效的方式存储它,以便能够训练模型。

这种设计导致了集中式计算方法中存在的问题:随着连接到服务器的手机数量的增加,服务器处理传入数据的能力需要扩展,以维持训练过程。此外,由于数据需要在这种方法中集中传输和存储,因此始终存在传输被拦截或存储数据受到攻击的可能性。在许多情况下,需要或强烈希望保护数据机密性和隐私;例如,金融和医疗行业中的应用。因此,集中式模型训练限制了用例,需要一种直接在边缘设备上处理数据的工作方式。这正是 FL 的动机所在。

理解 FL

本节重点在于提供对 FL(联邦学习)如何作为解决上一节所述问题设置的一种解决方案的高级技术理解。本节的目标是让您了解 FL 作为解决方案的适用性,并为后续章节提供概念基础。

定义 FL

联邦学习是一种从边缘训练的本地模型中综合全局模型的方法。FL 首次由 Google 在 2016 年为他们的 Gboard 应用开发,该应用结合了 Android 用户打字历史的上下文来建议更正并提出后续单词的候选词。确实,这正是我们在边缘推理边缘训练部分讨论的精确单词推荐问题。Google 提出的解决方案是一种去中心化的训练方法,其中迭代过程会在边缘计算模型训练更新,并将这些更新聚合起来以生成应用于模型的全球更新。这种聚合模型更新的核心概念对于从边缘训练产生单个、高性能的模型至关重要。

让我们进一步分解这个概念。期望的模型分布在边缘,并在边缘收集的本地数据上训练。当然,我们可以预期,在一个特定数据源上训练的模型不会代表整个数据集。因此,我们将使用有限数据训练的这种模型称为本地模型。这种方法的直接好处是,它使得在集中式情况下由于隐私和效率问题而无法收集的数据上的机器学习成为可能。

聚合,FL 的关键理论步骤,允许我们期望的单一全局模型从某些迭代产生的本地模型集合中创建出来。最著名的聚合算法,因其简单性和出人意料的性能而受到欢迎,被称为联邦平均FedAvg)。FedAvg 通过计算模型间的参数算术平均值来在本地模型集上执行,生成一个聚合模型。重要的是要理解,仅进行一次聚合不足以产生一个好的全局聚合模型;相反,是通过在本地训练先前的全局模型并将产生的本地模型聚合到一个新的全局模型中,这个过程允许实现全局训练的进展。

FL 流程

为了从迭代过程的角度更好地理解 FL,我们将它分解为单个迭代或的核心组成步骤。

一轮的步骤可以描述如下:

  1. 将聚合全局模型参数发送到每个用户的设备。

  2. 用户设备上接收到的 ML 模型使用本地数据进行训练。

  3. 在一定量的训练之后,本地模型参数被发送到中心服务器。

  4. 中心服务器通过应用聚合函数来聚合本地模型,生成一个新的聚合全局模型。

这些步骤在图 2.5中有所描述:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_02_05.jpg

图 2.5 – FL 步骤

步骤 1 到 4的流程构成了一个 FL 的单轮。下一轮开始于用户服务器/设备接收到新创建的聚合模型并开始在本地数据上训练。

让我们回顾一下谷歌为 Gboard 提供的单词推荐功能。在某个时间点,每部手机都会存储足够多的用户打字数据。边缘训练过程可以从这些数据中创建一个本地模型,并将参数发送到中央服务器。在收到一定数量的手机发送的参数后,服务器将它们汇总以创建一个全局模型,并将其发送到手机。这样,连接到服务器的每部手机都会接收到一个反映所有手机本地数据的模型,而无需从它们那里传输数据。反过来,当收集到另一批足够的数据时,每部手机都会重新训练模型,将模型发送到服务器,并接收一个新的全局模型。这个周期会根据 FL 系统的配置反复进行,从而实现全局模型的持续监控和更新。

注意,用户数据从未离开边缘,只有模型参数;也不需要将所有数据放入中央服务器以生成全局模型,这允许数据最小化。此外,可以使用 FL 方法减轻模型偏差,如第第三章中讨论的,联邦学习系统的运作。这就是为什么 FL 可以被视为解决在第一章中引入的大数据三个问题的解决方案,即大数据和传统 AI 的挑战

迁移学习

FL 与机器学习中的一个概念迁移学习TL)密切相关。迁移学习允许我们使用研究人员使用大量计算能力和资源在非常通用的数据集上训练的大型深度学习模型。这些模型可以应用于更具体的问题。

例如,我们可以取一个训练用于在图像中定位和命名特定对象的物体检测模型,并在包含我们感兴趣且未包含在原始数据中的特定对象的有限数据集上重新训练它。如果你要从原始数据中取出,添加我们感兴趣的那些对象的资料,然后从头开始训练一个模型,将需要大量的计算时间和能力。有了迁移学习,你可以通过利用现有大型通用模型的一个关键事实来加快这个过程。大型深度神经网络的中层通常非常擅长提取特征,这些特征被后续层用于特定的机器学习任务。我们可以通过保留这些层中的参数来维持其提取特征的学习能力。

换句话说,现有预训练模型某些层的参数可以保留并用于检测新对象——我们不需要重新发明轮子。这种技术被称为参数冻结。在 FL 中,模型训练通常在计算能力有限的本地设备/服务器上进行。一个使用 Gboard 场景的例子是在预训练的词嵌入层上执行参数冻结,以便训练可以专注于特定任务的信息,利用嵌入的先前训练来大大减少可训练参数的数量。

将这个概念进一步扩展,联邦学习(FL)和迁移学习(TL)的交集被称为联邦迁移学习FTL)。FTL 允许在本地数据集结构不同的场景下应用 FL 方法,通过在模型的一个共享子集上执行 FL,该子集可以后来扩展用于特定任务。例如,一个情感分析模型和一个文本摘要模型可以共享一个句子编码组件,该组件可以使用 FL 进行训练,并用于这两个任务。TL(以及由此扩展的 FTL)是允许在 FL 中实现训练效率和增量改进的关键概念。

个性化

当边缘设备处理的数据不是独立同分布IID)时,每个设备都可以定制全局模型。这是一个被称为个性化的概念,可以被视为使用本地数据对全局模型进行微调,或者是在数据中战略性地使用偏差。

例如,考虑一个在两个具有不同本地人口统计数据的地区运营的购物中心连锁店(即,该连锁店处理非 IID 数据)。如果该连锁店使用 FL 为两个地点寻求租户推荐,那么每个地点都可以通过个性化的模型比单一的全局模型更好地得到服务,从而有助于吸引当地客户。由于个性化模型是通过本地数据微调或偏差的,我们可以预期它在通用数据上的性能不会像全局模型那样好。另一方面,我们也可以预期个性化模型在为模型个性化设计的本地数据上的性能会优于全局模型。在用户特定性能和泛化性之间存在权衡,而 FL 系统的强大之处在于其灵活性,可以根据需求平衡它们。

水平和垂直 FL

FL 有两种类型:水平同质FL 和 垂直异质FL。水平 FL,也称为基于样本的 FL,适用于所有与聚合服务器连接的本地数据集具有相同的特征但包含不同的样本的情况。前面讨论的 Gboard 应用就是一个很好的水平 FL 的例子,即跨设备 FL,也就是说,本地训练发生在边缘设备上。所有安卓手机的数据库格式相同,但内容独特,反映了用户的打字历史。另一方面,垂直 FL,或基于特征的 FL,是一种更先进的技术,允许持有相同样本的不同特征的各方合作生成一个全局模型。

例如,一家银行和一家电子商务公司可能都存储了一个城市的居民数据,但他们的特征会有所不同:前者了解公民的信用和支出模式,后者了解他们的购物行为。他们都可以通过共享有价值的见解而无需共享客户数据来受益。首先,银行和电子商务公司可以使用私有集合交集PSI)技术来识别他们的共同用户,同时使用Rivest-Shamir-AdlemanRSA)加密来保护数据隐私。接下来,每个当事人使用包含独特特征的本地数据训练一个初步模型。然后,这些模型被聚合起来构建一个全局模型。通常,垂直 FL 涉及多个数据孤岛,在这种情况下,它也被称为跨孤岛 FL。在中国,联邦人工智能生态系统FATE)因其涉及 WeBank 的垂直 FL 的开创性演示而闻名。如果您对 FL 的进一步概念细节感兴趣,Cloudera Fast Forward Labs 有一个非常说明性和文笔优美的报告,网址为 https://federated.fastforwardlabs.com/。

本节中包含的 FL 信息应该足以理解以下章节,这些章节将进一步深入探讨此处介绍的一些关键概念。本章的最后部分旨在涵盖一些关注 FL 实际应用的辅助概念。

FL 系统考虑因素

本节主要关注 FL 的多方计算方面,包括理论安全措施和完全去中心化方法。本节的目标是让您意识到一些在实际 FL 应用中应考虑的更实际的考虑因素。

FL 系统的安全性

尽管这项技术尚处于起步阶段,但 FL 在一些领域的实验性使用已经出现。具体来说,金融行业的反洗钱(AML)和医疗行业的药物发现与诊断已经看到了有希望的结果,正如 Consilient 和 Owkin 等公司在该领域的概念验证已经成功实施。在 AML 用例中,银行可以相互合作,以高效地识别欺诈交易,而无需共享他们的账户数据;医院可以在保持其患者数据的同时,提高检测健康问题的机器学习模型。

这些解决方案利用了相对简单的横向跨领域 FL 的力量,如理解 FL部分所述,其应用正在扩展到其他领域。例如,Edgify 是一家位于英国的公司,与英特尔和惠普合作,致力于自动化零售店的收银员。在德国慕尼黑,另一家英国公司 Fetch.ai 正在利用其基于 FL 的技术开发智能城市基础设施。很明显,FL 的实际应用正在迅速增长。

尽管 FL 可以通过其隐私设计(模型参数不暴露隐私)和数据最小化(数据不在中央服务器收集)的方法绕过对数据隐私的担忧,正如在第一章“大数据和传统 AI 的挑战”中讨论的那样,但其实施可能存在潜在的障碍;其中一个例子是 FL 项目参与者之间的不信任。考虑一种情况,银行 A银行 B同意使用 FL 来开发一个协作式反洗钱(AML)解决方案。他们决定采用共同的模型架构,以便每个银行都可以使用自己的数据训练本地模型,并将结果汇总以创建一个全局模型供双方使用。

FL 的简单实现可能允许一个银行使用其本地模型和聚合模型从另一个银行重建本地模型。据此,该银行可能能够从用于训练另一个银行模型的训练数据中提取关键信息。因此,可能会出现关于哪个方应该托管服务器以聚合本地模型的争议。一个可能的解决方案是由第三方托管服务器并负责模型聚合。然而,银行 A如何知道第三方没有与银行 B勾结,反之亦然?进一步来说,将 FL 系统集成到以安全为重点的领域会导致对每个系统组件的安全性和稳定性的新担忧。与不同 FL 系统方法相关的已知安全问题可能会给对抗性攻击带来额外的潜在弱点,这种弱点超过了该方法的益处。

有几种安全措施可以在不强迫参与者相互信任的情况下允许 FL 协作。使用一种名为差分隐私DP)的统计方法,每个参与者可以向他们的本地模型参数添加随机噪声,以防止从传输的参数中获取训练数据分布或特定元素的信息。通过从具有零均值和相对较低方差(例如高斯、拉普拉斯)的对称分布中采样随机噪声,预期在聚合时添加到本地模型中的随机差异将被抵消。因此,预期全局模型将与没有 DP 生成的模型非常相似。

然而,这种方法存在一个关键的局限性;为了使添加的随机噪声收敛到零,必须足够多的参与者加入联盟。对于仅涉及少数银行或医院的工程项目来说,这种情况可能并不适用,在这种情况下使用差分隐私(DP)会损害全局模型的完整性。可能需要采取一些额外的措施,例如,每个参与者发送他们本地模型的多个副本以增加模型数量,从而使噪声得到抵消。在某些完全去中心化的联邦学习(FL)系统中,另一种可能性是安全多方计算MPC)。基于 MPC 的聚合允许代理之间相互通信并计算聚合模型,而不涉及可信的第三方服务器,从而保持模型参数的隐私性。

参与者如何确保系统免受外部攻击?同态加密HE)在加密过程中保留了加法和乘法对数据的影响,允许以加密形式将本地模型聚合到全局模型中。这阻止了模型参数暴露给没有解密密钥的外部人员。然而,HE 在保护参与者之间通信方面的有效性伴随着极高的计算成本:使用 HE 算法处理数据的操作可能比其他方式慢数百万亿倍!

为了缓解这一挑战,可以使用部分同态加密(HE),它仅与加密中的加法或乘法操作之一兼容;因此,它在计算上比完全同态加密轻得多。使用这种方案,联盟中的每个参与者都可以加密并发送他们的本地模型到聚合器,然后聚合器将所有本地模型相加,并将聚合模型发送回参与者,参与者随后解密模型并将参数除以参与者的数量以接收全局模型。

HE 和 DP 都是 FL 在实际应用中的关键技术。对 FL 在现实场景中实施感兴趣的人可以从 IBM 研究学者 Dinesh C. Verma 所著的《Federated AI for Real-World Business Scenarios》一书中学到很多。

去中心化 FL 和区块链

到目前为止讨论的 FL 架构是基于客户端-服务器网络,即边缘设备与中央聚合服务器交换模型。然而,由于之前讨论的 FL 联盟参与者之间的信任问题;然而,建立一个以聚合器作为单独和中央实体的系统可能会出现问题。聚合器的主持人可能难以保持公正和无偏见地对待自己的数据。此外,拥有一个中央服务器不可避免地会导致 FL 系统中的单点故障,这导致系统弹性低。此外,如果聚合器设置在云服务器上,实施这样的 FL 系统将需要一个熟练的 DevOps 工程师,这可能很难找到且成本高昂。

考虑到这些担忧,本书的主要作者 Kiyoshi Nakayama 共同撰写了一篇关于首次使用区块链技术进行完全去中心化 FL 实验的文章(www.kiyoshi-nakayama.com/publications/BAFFLE.pdf)。利用智能合约来协调模型更新和聚合,构建了一个私有以太坊网络以无服务器方式执行 FL。实验结果表明,基于对等网络的去中心化 FL 可以比基于聚合器的集中式 FL 更加高效和可扩展。在惠普和德国研究机构最近进行的一项实验中,确认了去中心化架构的优越性,他们给使用区块链技术的去中心化 FL 起了一个独特的名字:swarm learning

虽然 FL 领域的研发正在转向去中心化模式,但本书的其余部分假设使用聚合服务器进行集中式架构。这种设计有两个原因。首先,区块链技术仍然是一个新兴技术,AI 和 ML 研究人员并不一定熟悉。引入对等通信方案可能会使主题过于复杂。其次,FL 本身的逻辑独立于网络架构,集中式模型没有问题,可以说明 FL 的工作原理。

摘要

在本章中,我们讨论了由于所有级别的可访问计算能力的增长而带来的两个关键发展。首先,我们探讨了模型的重要性以及它是如何使 ML 在实用应用中显著增长,计算能力的增加允许产生比手动创建的白色盒系统更强的模型,这些模型可以持续产生。我们称之为 FL 的what – ML 是我们试图使用 FL 执行的内容。

然后,我们退后一步,看看边缘设备是如何达到在合理时间内执行复杂计算的阶段,这对于现实世界的应用来说是有意义的,比如我们手机上的文本推荐模型。我们称之为 FL 的哪里——我们想要执行机器学习的环境。

从“什么”和“哪里”,我们得到这两个发展的交集——直接在边缘设备上使用机器学习模型。记住,对于边缘机器学习案例,标准的中心训练方法由于需要集中收集所有数据而严重受限,这阻止了需要高效通信或数据隐私的应用。我们展示了FL如何直接解决这个问题,通过在数据存储的同一位置进行所有训练以产生本地模型聚合算法将这些本地模型组合成一个全局模型。通过在本地训练和聚合之间迭代切换,FL 允许创建一个模型,该模型实际上已经在所有数据存储中进行过训练,而无需集中收集数据。

我们通过跳出有效聚合背后的理论,审视系统与架构设计方面的考虑,如模型隐私和完全去中心化等方面,来结束本章。阅读完本章后,应该清楚,当前机器学习(ML)、边缘计算以及实际联邦学习(FL)应用的初步增长表明,FL 在不久的将来有望实现显著增长。

在下一章中,我们将从系统层面考察联邦学习的实现。

进一步阅读

想了解更多本章涉及的主题,请参阅以下参考文献:

  • Ruiz, N. W. (2020, 12 月 22 日). 未来是联邦化的. Medium.

  • Contessa, G. (2010). 科学模型与虚构对象. Synthese, 172(2): 215–229.

  • Frigg, R. 和 Hartmann, S. (2020). 科学中的模型. 在 Zalta, E. N. (编). 斯坦福哲学百科全书.

  • Bunge, M. (1963). 通用黑盒理论. 科学哲学, 30(4): 346-358.

  • Moore, S. K.,Schneider, D. 和 Strickland, E. (2021, 9 月 28 日). 深度学习是如何工作的:驱动今天人工智能的神经网络内部. IEEE Spectrum.

  • Raschka, S. 和 Mirjalili, V. (2019). Python 机器学习:使用 Python、scikit-learn 和 TensorFlow 2 进行机器学习和深度学习(第 3 版). 伯明翰:Packt Publishing.

  • McMahan, B., Moore, E., Ramage, D., Hampson, S. 和 Arcas, B. A. (2016). 从去中心化数据中高效学习深度网络. JMLR WandCP, 54.

  • 黄,X.,丁,Y.,江,Z. L. 和 Qi,S. (2020). DP-FL:一种用于不平衡数据的创新差分隐私联邦学习框架. World Wide Web, 23: 2529–254.

  • 杨,Q.,刘,Y.,陈,T.,和童,Y. (2019). 联邦机器学习:概念与应用. ACM 智能系统与技术交易(TIST),10(2),1-19.

  • Verma, D. C. (2021). 联邦人工智能:现实世界商业场景应用. 佛罗里达:CRC 出版社.

  • Ramanan, P., and Nakayama, K. (2020). Baffle: Blockchain Based Aggregator Free Federated Learning. 2020 IEEE International Conference on Blockchain, 72-81.

  • Warnat-Herresthal, S.,Schultze, H.,Shastry, K. L.,Manamohan, S.,Mukherjee, S.,Garg, V.,和 Schultze, J. L. (2021). 群智学习:用于去中心化和机密临床机器学习. 自然,594(7862),265-270.

第三章:联邦学习系统的工作原理

本章将概述联邦学习系统(FL)的架构、流程流程、消息序列和模型聚合的基本原理。如第二章“什么是联邦学习?”中讨论的,FL 框架的概念基础非常简单且易于理解。然而,FL 框架的真正实现需要具备对 AI 和分布式系统的良好理解。

本章内容基于联邦学习系统最标准的基石,这些基石将在本书后面的实际练习中使用。首先,我们将介绍联邦学习系统的构建块,例如带有联邦学习服务器的聚合器、带有联邦学习客户端的代理、数据库服务器以及这些组件之间的通信。本章介绍的架构以解耦的方式设计,以便对系统的进一步增强将比包含所有内容在一个机器上的联邦学习系统更容易。然后,我们将解释从初始化到聚合的联邦学习操作流程。

最后,我们将探讨如何通过横向设计去中心化的联邦学习设置来扩展联邦学习系统的规模。

本章涵盖了以下主题:

  • 联邦学习系统架构

  • 理解联邦学习系统流程——从初始化到持续运行

  • 模型聚合的基本原理

  • 通过横向设计进一步扩展可伸缩性

联邦学习系统架构

联邦学习系统是分散到服务器和分布式客户端的分布式系统。在这里,我们将定义一个具有以下组件的联邦学习系统的代表性架构:带有联邦学习服务器的聚合器、带有联邦学习客户端的代理和数据库:

  • 集群聚合器(或聚合器):一个带有联邦学习服务器的系统,它收集和聚合在多个分布式代理(稍后定义)上训练的机器学习模型,并创建全局机器学习模型,这些模型被发送回代理。该系统作为集群聚合器,或者更简单地说,作为联邦学习系统聚合器

  • 分布式代理(或代理):一个带有联邦学习客户端(如本地边缘设备、移动应用、平板电脑或任何分布式云环境)的分布式学习环境,在这些环境中以分布式方式训练机器学习模型并将其发送到聚合器。代理可以通过聚合器的联邦学习客户端通信模块连接到聚合器的联邦学习服务器。联邦学习客户端端的代码包含一系列库,这些库可以集成到由个别机器学习工程师和数据科学家设计和实现的本地机器学习应用中。

  • 数据库服务器(或数据库):用于存储与聚合器、代理、全局和本地机器学习模型及其性能指标相关的数据的数据库及其服务器。数据库服务器处理来自聚合器的查询,并将必要的数据发送回聚合器。为了简化联邦学习系统设计,代理不需要直接连接到数据库服务器。

图 3.1展示了典型的整体架构,包括单个集群聚合器和数据库服务器,以及多个分布式代理:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_01.jpg

图 3.1 – 联邦学习系统的整体架构

联邦学习系统架构的一个优点是用户不需要将私有原始数据发送到服务器,尤其是第三方拥有的数据。相反,他们只需将本地训练的模型发送给聚合器。本地训练的模型可以有多种格式,例如整个机器学习模型的权重、权重的变化(梯度),甚至它们的子集。另一个优点是减少通信负载,因为用户只需交换通常比原始数据轻得多的模型。

聚类聚合器

集群聚合器由联邦学习服务器模块、联邦学习状态管理模块和模型聚合模块组成,如图 3.1所示。我们只称带有联邦学习服务器的集群聚合器为聚合器。虽然这些模块是聚合器的基础,但可以添加高级模块以确保进一步的安全性和灵活性。由于本书的主要目的是理解联邦学习系统的基本结构和系统流程,因此本书提供的simple-fl GitHub 仓库中未实现一些高级模块。在聚合器系统中,与联邦学习服务器、联邦学习状态管理和模型聚合相关的以下模块是实现聚合器端功能的关键。

  • 联邦学习服务器模块:联邦学习服务器模块有三个主要功能,包括通信处理程序、系统配置处理程序和模型合成例程:

    • 通信处理器:作为聚合器的一个模块,支持与代理和数据库的通信。通常,此模块接受来自代理的轮询消息并向它们发送响应。他们接收的消息类型包括使用安全凭证和认证机制进行代理注册、初始化作为未来聚合过程初始模型的 ML 模型、确认代理是否参与某一轮次以及重新训练于分布式代理(如移动设备和本地边缘机器)的本地 ML 模型。通信处理器还可以查询数据库服务器,以访问系统数据以及数据库中的 ML 模型,并在聚合器接收或创建新模型后推送和存储这些数据和模型。此模块可以使用 HTTP、WebSocket 或任何其他通信框架来实现其实现。

    • 系统配置处理器:处理代理的注册和跟踪连接的代理及其状态。聚合器需要了解代理的连接和注册状态。如果代理使用已建立的认证机制进行注册,它们将接受消息并相应地处理它们。否则,此模块将执行认证过程,例如验证从代理发送的令牌,以便下次此代理连接到 FL 服务器时,系统能够正确识别该代理。

    • 模型合成例程:支持检查本地 ML 模型的收集状态,并在满足收集标准后进行聚合。收集标准包括连接的代理收集的本地模型数量。例如,当 80%的连接代理将训练好的本地模型发送到聚合器时,就会发生聚合。实现这一目标的设计模式之一是定期检查代理上传的 ML 模型数量,这些操作在 FL 服务器运行时持续进行。模型合成例程将定期访问数据库或本地缓冲区,以检查本地模型收集的状态并聚合这些模型,以生成将存储在数据库服务器中并发送回代理的全局模型。

  • FL 状态管理器:状态管理器跟踪聚合器和连接的代理的状态信息。它存储聚合器的易失性信息,例如代理提供的本地和全局模型、从数据库拉取的集群模型、FL 轮次信息或连接到聚合器的代理。缓冲的本地模型由模型聚合模块使用,以生成发送回连接到聚合器的每个活动代理的全局模型。

  • 模型聚合模块:模型聚合模块是本章“模型聚合基础”部分和第七章“模型聚合”中介绍的模型聚合算法的集合。最典型的聚合算法是联邦平均,它平均收集到的 ML 模型的权重,考虑到每个模型用于其本地训练的样本数量。

分布式代理

分布式代理由一个 FL 客户端模块组成,该模块包括通信处理程序和客户端库,以及通过 FL 客户端库连接到 FL 系统的本地 ML 应用程序:

  • FL 客户端模块:FL 客户端模块主要有四个关键功能,包括通信处理程序、代理参与处理程序、模型交换例程和客户端库:

    • 通信处理程序:作为与分配给代理的聚合器通信的通道。发送给聚合器的消息包括代理本身的注册有效载荷和一个将成为聚合模型基础的初始模型。该消息还包含本地训练的模型及其性能数据。此模块支持推送轮询机制,并可以利用 HTTP 或 WebSocket 框架来实现其实现。

    • FL 参与处理程序:通过向聚合器发送包含要注册在 FL 平台上的代理信息本身的消息来处理代理在 FL 过程和周期中的参与。响应消息将设置代理以进行持续和持续的 FL 过程,并且通常包括代理可以利用和本地训练的最新全局模型。

    • 模型交换例程:支持一个同步功能,该功能不断检查是否有新的全局模型可用。如果新的全局模型可用,此模块将从聚合器下载全局模型,并在需要时用全局模型替换本地模型。此模块还会检查客户端状态,并在本地训练过程完成后发送重新训练的模型。

    • 客户端库:包括管理库和通用 FL 客户端库:

      • 当注册其他代理将使用的初始模型时使用管理库。也可以由具有更高控制能力的行政代理请求 FL 系统的任何配置更改。

      • 通用 FL 客户端库提供基本功能,例如启动 FL 客户端核心线程、将本地模型发送到聚合器、在本地机器的特定位置保存模型、操作客户端状态以及下载全局模型。本书主要讨论这种通用类型的库。

  • 本地 ML 引擎和数据管道:这些部分由个别 ML 工程师和科学家设计,可以独立于 FL 客户端功能。此模块本身有一个 ML 模型,用户可以立即使用它进行更准确的推理,一个可以插入 FL 客户端库的培训和测试环境,以及数据管道的实现。虽然上述模块和库可以通用并提供为任何 ML 应用程序的应用程序编程接口API)或库,但此模块根据要开发的 AI 应用程序的需求是独特的。

数据库服务器

数据库服务器由数据库查询处理器和数据库组成,作为存储。数据库服务器可以位于服务器端,例如在云上,并且与聚合器紧密相连,而推荐的设计是将此数据库服务器与聚合器服务器分开,以解耦功能,增强系统的简单性和弹性。数据库查询处理器和示例数据库表的功能如下:

  • 数据库查询处理器:接受来自聚合器的传入请求,并将所需数据和 ML 模型发送回聚合器。

  • 数据库:存储所有与 FL 过程相关的信息。我们在此列出数据库的一些潜在条目:

    • 聚合器信息:此聚合器相关信息包括聚合器本身的 ID、IP 地址和各种端口号、系统注册和更新时间以及系统状态。此外,此条目还可以包括模型聚合相关信息,例如 FL 的轮次及其信息以及聚合标准。

    • 代理信息:此代理相关信息包括代理本身的 ID、IP 地址和各种端口号、系统注册和更新时间以及系统状态。此条目还可以包含用于同步 FL(在本章的同步和异步 FL部分中解释)的 opt-in/out 状态,以及一个标志来记录代理是否在过去有过不良行为(例如,参与投毒攻击,或返回结果非常慢)。

    • 基础模型信息:基础模型信息用于注册初始 ML 模型,其架构和信息用于 FL 轮次的整个流程。

    • 本地模型:本地模型的信息包括唯一标识个别 ML 模型的模型 ID、模型的生成时间、上传模型的代理 ID、从代理接收模型的聚合器 ID 等。通常,模型 ID 唯一映射到实际 ML 模型文件的存储位置,这些文件可以存储在数据库服务器或某些云存储服务中,例如亚马逊网络服务的 S3 存储桶等。

    • 集群全局模型:集群全局模型的信息类似于本地模型可以在数据库中记录的信息,包括模型 ID、聚合器 ID、模型的生成时间等。一旦聚合器创建了一个聚合模型,数据库服务器将接受全局模型并将它们存储在数据库服务器或任何云存储服务中。任何全局模型都可以由聚合器请求。

    • 性能数据:本地和全局模型的表现可以追踪,作为附加到这些模型的元数据。这些性能数据将被用于确保在将聚合模型实际部署到用户 ML 应用之前,聚合模型的表现足够好。

注意

simple-fl存储库的代码示例中,仅覆盖了与本地模型和集群模型相关的数据库表,以简化整个 FL 过程的解释。

现在 FL 系统的基本架构已经介绍完毕,接下来我们将讨论如果代理端设备计算资源有限时,如何增强 FL 系统的架构。

中间服务器用于低计算能力的代理设备

有时,本地用户设备的计算能力有限 – 在这些设备上可能难以进行 ML 训练,但仅通过下载全局模型就可以进行推理或预测。在这些情况下,FL 平台可能能够设置一个额外的中间服务器层,例如使用智能手机、平板电脑或边缘服务器。例如,在医疗 AI 应用中,用户可以在他们的智能手表上管理他们的健康信息,这些信息可以传输到他们的智能平板电脑或与笔记本电脑同步。在这些设备上,重新训练 ML 模型和集成分布式代理功能都很简单。

因此,系统架构需要根据 FL 系统集成的应用进行修改或重新设计,并且可以使用分布式代理应用中间服务器概念来实现 FL 过程。我们不需要修改聚合器与中间服务器之间的交互和通信机制。只需在用户设备和中间服务器之间实现 API,大多数情况下就可以实现 FL。

图 3.2 展示了聚合器、中间服务器和用户设备之间的交互:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_02.jpg

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_02.jpg

图 3.2 – 带有中间服务器的 FL 系统

现在我们已经了解了 FL 系统的基本架构和组件,接下来我们将探讨在下一节中 FL 系统是如何运行的。

理解 FL 系统流程 – 从初始化到持续运行

每个分布式代理都属于一个由 FL 服务器管理的聚合器,在那里进行 ML 模型聚合,以合成一个将要发送回代理的全局模型。这个概念听起来很简单,因此我们将更详细地探讨这些过程的整个流程。

我们还定义了一个集群全局模型,我们简单地称之为集群模型全局模型,它是从分布式代理收集的本地模型的聚合 ML 模型。

注意

在接下来的两个章节中,我们将指导您如何实现本章讨论的程序和消息序列。然而,一些系统操作视角,如聚合器或代理在数据库中的系统注册,在simple-fl存储库的代码示例中没有介绍,以简化整个 FL 过程的解释。

数据库、聚合器和代理的初始化

初始化过程的顺序相当简单。初始化和注册过程需要按照数据库、聚合器和代理的顺序进行。聚合器和代理在数据库中的整体注册顺序如图 3.3 所示:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_03.jpg

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_03.jpg

图 3.3 – 数据库服务器中聚合器和代理注册的过程

这里是 FL 系统中每个组件的初始化和注册过程:

  • 数据库服务器初始化:FL 系统操作的第一步是启动数据库服务器。多个组织提供了一些简单的框架,这些框架不包括数据库或数据库服务器。然而,为了维护联邦 ML 模型的过程,建议您使用数据库,即使是一个轻量级的数据库,如 SQLite 数据库。

  • 聚合器初始化和注册:在代理开始运行并上传机器学习模型之前,应该设置并运行聚合器。当聚合器开始运行并首次连接到数据库服务器时,注册过程会自动进行,同时也会检查聚合器是否安全连接。如果注册过程失败,它会收到数据库发送回来的注册失败消息。此外,如果聚合器在失去与数据库的连接后再次尝试连接到数据库,数据库服务器会检查聚合器是否已经注册。如果是这种情况,数据库服务器响应中会包含已注册聚合器的系统信息,以便聚合器可以从它停止的地方开始。如果聚合器使用 HTTP 或 WebSocket,它可能需要发布 IP 地址和端口号以便代理连接。

  • 代理初始化和注册:通常情况下,如果代理知道它想要连接的聚合器,注册过程类似于聚合器连接到数据库服务器的方式。连接过程应该足够简单,只需使用 IP 地址、聚合器的端口号(如果我们使用某些框架,如 HTTP 或 WebSocket)以及一个认证令牌向该聚合器发送一个参与消息。如果代理在失去与聚合器的连接后再次尝试连接到聚合器,数据库服务器会检查代理是否已经注册。如果代理已经注册,数据库服务器响应中会包含已注册代理的系统信息,以便代理可以从与聚合器断开连接的点开始。

特别地,当它收到代理的参与消息时,聚合器会按照图 3.4中的流程进行操作。接收参与请求后的关键过程是(i)检查代理是否可信,或者代理是否已经注册,以及(ii)检查初始全局模型是否已经注册。如果(i)成立,注册过程将继续。如果(初始)全局模型已经注册,代理将能够接收全局模型并开始使用该全局模型在代理端进行本地训练过程。

在聚合器端,代理的参与和注册过程如图 3.4所示:

![图 3.4 – 代理通过聚合器的注册过程

![img/B18369_03_04_New.jpg]

图 3.4 – 代理通过聚合器的注册过程

现在我们已经了解了 FL 系统组件的初始化和注册过程,让我们继续到正在进行的 FL 过程的基本配置,这涉及到上传初始机器学习模型。

初始代理的初始模型上传过程

运行 FL 过程的下一步是注册初始机器学习模型,该模型的架构将被 FL 的所有聚合器和代理在整个连续过程中使用。初始模型可以由拥有机器学习应用程序和 FL 服务器的公司分发。他们可能会将初始基模型作为聚合器配置的一部分提供。

我们将用作模型聚合参考的初始机器学习模型称为基模型。我们还将上传初始基模型的代理称为初始代理。基模型信息可能包括机器学习模型本身以及生成时间和初始性能数据。因此,初始化基模型的过程可以在图 3.5中看到:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_05.jpg

](https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_05.jpg)

图 3.5 – 初始代理的基模型上传过程

现在,FL 过程已准备好进行。接下来,我们将了解 FL 周期,这是 FL 过程的核心部分。

FL 系统的整体循环和过程

在本节中,我们只提供一个单代理和聚合器的示例,但在实际案例和操作中,代理环境是多样的,分散到分布式设备中。以下是如何上传、聚合、存储本地模型并将其作为全局模型发送回代理的流程列表:

  1. 除了初始代理之外的其他代理将请求全局模型,这是一个更新的聚合机器学习模型,以便将其部署到自己的应用程序中。

  2. 一旦代理从聚合器获取更新后的模型并部署它,代理将使用随后获得的新数据在本地重新训练机器学习模型,以反映数据的最新性和时效性。代理还可以参与多个轮次,使用不同的数据来吸收其本地示例和倾向。再次强调,这些本地数据不会与聚合器共享,并保持在分布式设备上本地。

  3. 在重新训练本地机器学习模型(当然,该模型与 FL 的全局或基模型具有相同的架构)之后,代理调用 FL 客户端 API 将模型发送到聚合器。

  4. 聚合器接收本地机器学习模型并将模型推送到数据库。聚合器跟踪收集到的本地模型数量,只要联盟轮次开放,就会继续接受本地模型。轮次可以通过任何定义的标准关闭,例如聚合器接收足够多的机器学习模型以进行聚合。当满足标准时,聚合器将聚合本地模型并生成一个更新的全局模型,该模型已准备好发送回代理。

  5. 在此过程中,代理会不断查询聚合器,以确定全局模型是否已准备好,或者在某些情况下,根据通信系统设计和网络约束,聚合器可能会将全局模型推送到连接到聚合器的代理。然后,更新的模型被发送回代理。

  6. 在收到更新的全局模型后,代理在准备好时在本地部署和重新训练全局模型。描述的整个过程会重复进行,直到满足联邦学习的终止条件。在某些情况下,没有终止条件来停止这个联邦学习循环和重新训练过程,以便全局模型持续学习最新的现象、当前趋势或与用户相关的倾向。联邦学习回合可以通过手动停止来为一些评估做准备,在部署之前。

图 3.6 展示了联邦学习在代理、聚合器和数据库之间持续进行的过程的总体情况:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_06.jpg

图 3.6 – 联邦学习连续循环概述

现在我们已经了解了联邦学习过程的整体流程,我们将探讨联邦学习过程中的不同回合管理方法:同步联邦学习和异步联邦学习。

同步和异步联邦学习

当模型聚合发生在聚合器时,存在多个与从哪些代理收集多少本地模型相关的标准。在本节中,我们将简要讨论同步和异步联邦学习之间的差异,这些差异已在许多文献中讨论过,例如 https://iqua.ece.toronto.edu/papers/ningxinsu-iwqos22.pdf,因此请参阅以进一步了解这些概念。

同步联邦学习

同步联邦学习要求聚合器在每个回合中选择需要发送本地模型的代理,以便进行模型聚合。这种同步联邦学习方法设计简单,易于实现,适用于需要明确选择代理的联邦学习应用。然而,如果代理的数量变得太多,聚合器可能需要等待很长时间才能完成当前回合,因为代理的计算能力可能不同,其中一些可能存在上传问题或未能上传其本地模型。因此,一些代理在向聚合器发送模型时可能会变得缓慢或完全无法工作。这些缓慢的代理在分布式机器学习中被称为 拖沓者,这促使我们使用异步联邦学习模式。

异步联邦学习

异步 FL 不需要聚合器选择必须上传其本地模型的代理。相反,它为任何受信任的代理在任何时候上传模型打开了大门。此外,聚合器想要生成全局模型时,无论是否有收集所需的最小数量本地模型的标准,或者聚合器需要等待接收来自代理的本地模型直到该轮次聚合发生的一些预定义间隔或截止日期,都可以结束联盟轮次。这种异步 FL 方法为每个 FL 轮次的模型聚合提供了 FL 系统更多的灵活性,但设计可能比简单的同步聚合框架更复杂。

在管理 FL 轮次时,需要考虑运行轮次的实际操作,例如调度和处理延迟响应,所需的最小参与水平,示例存储的细节,使用下载或训练的模型在边缘设备上的应用中进行改进推理,以及处理不良或缓慢的代理。

我们将探讨 FL 过程和流程,重点关注聚合器端。

聚合器端的 FL 周期和流程

聚合器运行两个线程来接受和缓存本地模型,并聚合收集到的本地机器学习模型。在本节中,我们将描述这些过程。

接受和缓存本地机器学习模型

聚合器端接受和缓存本地机器学习模型的流程在图 3.7中展示,并如下解释:

  1. 聚合器将等待代理上传本地机器学习模型。这种方法听起来像是异步 FL。然而,如果聚合器已经决定接受哪些代理上传模型,它只需排除来自不受欢迎的代理的模型上传即可。其他系统或模块可能已经告知不受欢迎的代理不要参与这一轮。

  2. 一旦收到机器学习模型,聚合器会检查该模型是否由受信任的代理上传。此外,如果上传本地模型的代理不在 FL 操作员希望接受的代理列表中,聚合器将丢弃该模型。此外,聚合器需要有一种机制来仅过滤有效模型——否则,存在毒害全局模型并搞乱整个 FL 过程的危险。

  3. 如果上传的本地机器学习模型有效,聚合器将把模型推送到数据库。如果数据库位于不同的服务器上,聚合器将打包模型并发送到数据库服务器。

  4. 当上传的模型存储在数据库中时,它们应在聚合器状态管理器的内存中以适当的格式(如 NumPy 数组)进行缓冲。

此流程会一直运行,直到满足终止条件或 FL 系统的操作员选择停止该过程。图 3.7 描述了接受和缓存本地机器学习模型的流程:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_07.jpg

图 3.7 – 接受和缓存本地机器学习模型的流程

一旦本地机器学习模型被接受并缓存,FL 系统继续进行下一个流程:聚合本地模型。

聚合本地机器学习模型

图 3.8 所示的聚合器端聚合本地机器学习模型的流程如下:

  1. 聚合器持续检查聚合标准是否满足。典型的聚合标准如下:

    • 在此 FL 轮次中迄今为止收集到的本地模型数量。例如,如果代理的数量是 10 个节点,在 8 个节点(意味着 80%的节点)报告了本地训练的模型后,聚合器开始聚合模型。

    • 收集到的模型数量和 FL 轮次所花费的时间的组合。这可以自动化聚合过程,防止系统陷入停滞。

  2. 一旦满足聚合标准,聚合器开始模型聚合过程。通常,联邦平均是一种非常典型但强大的聚合方法。关于模型聚合方法的进一步解释见本章的 模型聚合基础 部分,以及 第七章模型聚合。在本 FL 轮次中,聚合的模型被定义为全局模型。

在 FL 轮次的时间已过期且参与轮次的代理中上传的模型不足的情况下,该轮次可以被放弃或强制对迄今为止收集到的本地模型进行聚合。

  1. 一旦模型聚合完成,聚合器将聚合的全局模型推送到数据库。如果数据库位于不同的服务器上,聚合器将打包全局模型并将其发送到数据库服务器。

  2. 然后,聚合器将全局模型发送给所有代理,或者当代理轮询检查全局模型是否准备就绪时,聚合器将通知代理全局模型已准备就绪,并将其放入对代理的响应消息中。

  3. 在模型聚合的整个过程中,聚合器仅通过递增来更新 FL 轮次的数量。

图 3.8 显示了在收集到足够的模型后,聚合器从检查聚合标准到合成全局模型的流程:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_08.jpg

图 3.8 – 模型合成流程:聚合本地机器学习模型

已经解释了如何聚合本地模型以生成全局模型。现在,让我们看看代理端的 FL 周期,包括本地机器学习模型的重新训练过程。

代理端本地重训练周期和过程

在分布式智能体中,以下状态转换发生,并且为了 FL 循环的持续运行而重复:

  1. 等待 _gm状态,智能体轮询聚合器以接收与全局模型相关的任何更新。基本上,使用轮询方法定期查询更新的全局模型。然而,在某些特定设置下,聚合器可以将更新的全局模型推送到所有智能体。

  2. gm_ready是聚合器形成全局模型并由智能体下载后的状态。模型参数在智能体设备中缓存。智能体用下载的全局模型替换其本地机器学习模型。在完全用下载的模型替换本地模型之前,智能体可以检查全局模型的输出是否足够高效以供本地机器学习引擎使用。如果性能不符合预期,用户可以继续使用本地旧模型,直到接收到具有所需性能的全局模型。

  3. 接下来,在训练状态中,智能体可以本地训练模型以最大化其性能。训练好的模型保存在一个本地数据存储中,其中包含训练示例。智能体的 FL 客户端库确认其准备就绪,可以操作可以异步函数访问保护的本地区域模型。

  4. 在本地模型训练完成后,智能体检查新的全局模型是否已发送给智能体。如果全局模型已到达,则丢弃本地训练的机器学习模型,并返回到gm_ready状态。

  5. 在本地训练完成后,智能体进入发送状态,将更新后的本地模型发送回聚合器,然后,智能体返回到等待 _gm状态。

图 3.9 描述了智能体适应和更新机器学习模型的状态转换:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_09.jpg

图 3.9 – 智能体侧状态转换以适应和更新机器学习模型

接下来,我们将讨论基于基线输出的偏差的模型解释,这些基线输出用于异常检测和防止模型退化。

基于基线输出偏差的模型解释

我们还可以通过查看每个本地模型的输出提供解释框架。以下程序可以被认为是确保本地模型始终可用并且可以部署到生产中的方法:

  1. 获取智能体生成的最新机器学习输出以及一个基线输出,该输出可以是用户准备的一个典型期望输出。基线输出可能包括基于过去窗口或操作员、主题专家或基于规则的算法定义的参考点的平均值输出。

  2. 计算本地模型输出与基线输出之间的偏差。

  3. 通过检查偏差是否超过操作员指定的阈值,可以检测到异常或性能下降。如果检测到异常,可以向操作员发送警报,指示故障或机器学习模型处于异常状态。

既然联邦学习的过程已经解释清楚,让我们来看看模型聚合的基本概念,它是联邦学习的关键部分。

模型聚合的基本概念

聚合是联邦学习(FL)中的一个核心概念。实际上,用于聚合模型的策略是联邦学习系统性能的关键理论驱动因素。本节的目的在于介绍在联邦学习系统背景下聚合的高级概念——更深入地讨论高级聚合策略的理论和示例将在第七章 模型聚合中进行。

模型聚合究竟意味着什么?

让我们回顾一下在理解联邦学习系统流程——从初始化到持续运行章节中讨论的聚合器端周期,在过程中分配给某个聚合器的代理完成本地训练并将这些模型传回该聚合器时。任何聚合策略,或任何聚合这些模型的方式,的目标是产生新的模型,这些模型在构成代理收集的所有数据上逐渐提高性能。

需要记住的一个重要观点是,按照定义,联邦学习是一种受限的分布式学习设置版本,其中每个代理收集的本地数据不能被其他代理直接访问。如果这种限制不存在,可以通过从每个代理收集数据并在联合数据集上训练来使模型在所有数据上简单地表现良好;因此,将这种集中训练的模型作为联邦学习方法的靶模型是有意义的。从高层次来看,我们可以将这种不受限制的分布式学习场景视为模型训练前的聚合(在这种情况下,聚合指的是结合每个代理的数据)。由于联邦学习不允许数据被其他代理访问,我们将这种情况视为模型训练后的聚合;在这种情况下,聚合指的是结合每个训练模型从其不同的本地数据集中捕获的智能。总结来说,聚合策略的目标是以一种最终导致泛化模型性能接近相应集中训练模型性能的方式结合模型。

FedAvg – 联邦平均

为了使一些想法更加具体,让我们先看看最著名且最直接的聚合策略之一,即联邦平均FedAvg)。FedAvg 算法的执行过程如下:设https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_F01.png<hthttps://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_F02.png的参数,每个代理拥有一个本地数据集大小为<httpshttps://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_F03.pngs://https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_F04.png/githttps://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_F05.pngvg 返回以下机器学习模型作为聚合模型:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_F06.jpg

实际上,我们通过对一组模型进行加权平均来执行 FedAvg,权重与用于训练模型的本地数据集大小成比例。因此,FedAvg 可以应用到的模型类型是那些可以表示为一些参数值集合的模型。深度神经网络是目前这类模型中最引人注目的——大多数分析 FedAvg 性能的结果都是与深度学习模型一起工作的。

真是令人惊讶,这种相对简单的方法竟然能导致最终模型产生泛化。我们可以通过在玩具二维参数空间中观察 FedAvg 的样子,来直观地检验聚合策略的好处:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_10.jpg

图 3.10 – 来自两个代理(圆形和方形)和目标模型(黑色 x)的二维参数空间中的本地模型

让我们考虑一个有两个新初始化的模型(圆形和方形点)属于不同代理的情况。前面图中的空间代表模型的参数空间,其中每个玩具模型由两个参数定义。随着模型的训练,这些点将在参数空间中移动——目标是接近参数空间中的局部最优解,通常对应于上述集中训练的模型:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_11.jpg

图 3.11 – 没有聚合的本地模型参数变化

每个模型都收敛到各自数据集特定的最优解(来自圆形和方形的两个 x 点),这些最优解不具有泛化能力。因为每个代理只能访问数据的一个子集,所以通过本地训练每个模型所达到的局部最优解将与真实的局部最优解不同;这种差异取决于每个代理的底层数据分布的相似程度。如果模型仅在本地进行训练,那么得到的模型可能无法泛化到所有数据:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_12.jpg

图 3.12 – 添加聚合将局部模型参数移动到每个步骤中两个模型的平均值,导致在目标模型上收敛

在每一步应用 FedAvg 允许我们创建一个聚合模型,该模型最终在参数空间中接近真实局部最优。

此示例展示了 FedAvg 产生泛化模型的基本能力。然而,与真实模型(如高度参数化的深度学习模型)一起工作引入了额外的复杂性,这由 FedAvg 处理,但不是更简单的方法。例如,我们可能会想知道为什么我们不是简单地完全训练每个本地模型,只在最后平均;虽然这种方法在这个玩具案例中可以工作,但观察到仅对真实模型进行一次平均会导致所有数据上的性能较差。FedAvg 过程允许在高度维参数空间内以更稳健的方式达到泛化模型。

本节仅旨在概述联邦学习中的聚合;第七章,模型聚合,包含对不同场景中聚合的更详细解释和示例。

我们现在已经理解了 FL 系统如何与基本模型聚合工作的整个过程。在某些应用中,FL 系统可能需要支持大量的代理以实现其可扩展性。下一节将为您提供一些关于如何更平滑地扩展的想法,特别是在去中心化的横向设计中。

进一步通过横向设计提高可扩展性

在本节中,我们将探讨在需要支持大量设备和用户时如何进一步提高可扩展性。

在实际案例中,集中式联邦学习提供了控制、易于维护和部署以及低通信开销。如果代理数量不是很大,坚持集中式联邦学习比去中心化联邦学习更有意义。然而,当参与代理的数量变得相当大时,可能值得考虑使用去中心化 FL 架构的横向扩展。如今自动扩展框架的最新发展,如Kubernetes框架(https://kubernetes.io/),可以很好地与本章讨论的主题相结合,尽管与 Kubernetes 的实际集成和实现超出了本书的范围。

带有半全局模型的横向设计

在某些用例中,需要许多聚合器来聚类一组代理并在这许多聚合器之上创建一个全局模型。谷歌采用集中式方法来实现这一点,正如在论文《迈向大规模联邦学习》中所述,而为管理多个聚合器设置一个集中式节点可能存在一些弹性问题。其想法很简单:定期在某个中央主节点上聚合所有聚类模型。

另一方面,我们可以实现由多个聚合器创建的集群模型的去中心化聚合方式。这种架构基于两个关键思想:

  • 在没有主节点的情况下,在单个集群聚合器之间进行的模型聚合

  • 半全局模型综合以聚合由其他聚合器生成的集群模型

为了创建半全局模型,去中心化的集群聚合器相互交换它们聚合的集群模型,并近似最优的全局模型。集群聚合器还可以使用数据库定期收集其他集群模型以生成半全局模型。这个框架允许通过综合最新的全局模型来吸收来自分散在许多聚合器上的不同用户集的训练结果,而不需要主节点概念。

基于这种去中心化架构,整个 FL 系统的鲁棒性可以得到增强,因为半全局模型可以在每个集群聚合器独立计算。FL 系统可以进一步扩展,因为每个集群聚合器都负责自己创建自己的半全局模型——不是通过这些聚合器的主节点——因此,去中心化的半全局模型形成具有弹性和移动性。

我们甚至可以将存储上传的本地模型、集群全局模型和半全局模型的数据库解耦。通过将分布式数据库引入 FL 系统,整个系统可以变得更加可扩展、弹性,并且安全,同时还有一些故障转移机制。

例如,每个集群聚合器将集群模型存储在分布式数据库中。集群聚合器可以通过定期从数据库中拉取模型来检索其他聚合器的集群模型。在每个集群聚合器,通过综合拉取的模型生成一个半全局 ML 模型。

图 3.13 展示了多聚合器 FL 系统去中心化水平设计的整体架构:

![图 3.13 – 具有多个聚合器的去中心化 FL 系统架构(水平设计)]

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_13_New.jpg

图 3.13 – 具有多个聚合器的去中心化 FL 系统架构(水平设计)

既然我们已经讨论了如何通过半全局模型概念使用水平设计来增强 FL 系统,接下来,我们将探讨分布式数据库框架以进一步确保可扩展性和弹性。

分布式数据库

此外,可以通过在数据驱动的分布式数据库中存储历史模型数据来提供模型更新的问责制。星际文件系统IPFS)和区块链是众所周知的分布式数据库,它们确保了全局模型更新的问责制。当一个集群聚合器基于其他集群模型生成半全局模型后,该半全局模型被存储在分布式数据库中。分布式数据库使用唯一标识符管理这些模型的信息。为了保持所有模型的一致性,包括本地、集群和半全局模型,每个机器学习模型都被分配了一个全局唯一的标识符,例如哈希值,这可以通过使用Chord 分布式哈希表Chord DHT)的概念来实现。Chord DHT 是一个用于互联网应用的可扩展的 P2P 查找协议。

集群聚合器可以在集群模型上存储元数据,例如时间戳和哈希标识符。这为我们提供了对模型合成的进一步问责制,确保集群模型没有被更改。一旦恶意模型可检测,还可以识别出一组发送有害集群模型以破坏半全局模型的聚合器。这些模型可以通过分析集群模型权重的模式或与其他集群模型的偏差来过滤,当差异太大而无法依赖时。

分布式数据库的本质是存储分布式 FL 系统所有易变的状态信息。在发生故障的情况下,FL 系统可以从分布式数据库中恢复。集群聚合器也会根据系统操作员定义的某个间隔交换它们的集群模型。因此,集群模型和聚合器之间的映射表需要与本地、集群和半全局模型上的元信息一起记录在数据库中,例如这些模型的生成时间和训练样本的大小。

在多聚合器场景中的异步代理参与

分布式代理可以在他们想要加入 FL 过程时向可连接的聚合器广播参与消息。参与消息可以包含代理的唯一 ID。然后,集群聚合器之一会返回一个集群聚合器 ID,这可能是基于一个共同哈希函数生成的值,代理应该属于该值。图 3.14展示了如何使用哈希函数将代理分配给特定的集群聚合器:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_14.jpg

图 3.14 – 代理加入 FL 系统中一个集群聚合器的序列

在下一节中,我们将探讨如何基于聚合多个集群全局模型来生成半全局模型。

半全局模型合成

在代理被分配到特定的集群聚合器后,代理开始参与 FL 过程。如果它已注册,则请求基础 ML 模型;否则,它需要上传基础模型以开始本地训练。上传本地模型、生成集群和半全局模型的过程将继续,直到代理或聚合器从系统中断开连接。本地和集群模型上传过程、聚合过程以及半全局模型合成和拉取的序列在图 3.15中说明:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_15.jpg

图 3.15 – 从上传本地模型到拉取半全局模型合成过程的序列

让我们看看使用代理、聚合器和分布式数据库之间的流程图来查看半全局模型合成。

聚合器从代理那里接收一个本地模型。在接收本地模型时,模型过滤过程将决定是否接受上传的模型。此框架可以使用许多不同的方法实现,例如检查全局和本地模型权重差异的基本方案。如果模型无效,则简单地丢弃本地模型。

然后,通过聚合所有接受的本地模型创建一个集群模型。聚合器将集群模型存储在数据库中,同时检索其他集群聚合器生成的集群模型。然后从这些集群模型中合成一个半全局模型,并将用于分配给集群聚合器的代理。

图 3.16展示了集群聚合器如何使用分布式数据库进行集群和半全局模型合成:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_03_16.jpg

图 3.16 – 半全局模型合成的流程和流程图

聚合器不需要检索每一轮生成的所有集群模型来创建半全局模型。为了合成一个半全局模型,全局模型可以最终基于每个聚合器随机选择的模型子集收敛。采用这种方法,通过在每次更新时妥协创建全局模型的条件,可以增强聚合器的鲁棒性和独立性。此框架还可以解决集中式 FL 系统典型的计算和通信瓶颈。

摘要

在本章中,我们讨论了 FL 系统内的潜在架构、流程流程和消息序列。典型的 FL 系统架构包括一个聚合器、代理和数据库服务器。这三个组件不断相互通信以交换系统信息和 ML 模型,以实现模型聚合。

实现良好的 FL 系统的关键是解耦关键组件并仔细设计它们之间的接口。我们专注于其设计的简单性,以便只需向系统中添加额外的组件即可实现进一步的增强。水平分布式设计也有助于实现可扩展的 FL 系统。

在下一章中,我们将讨论在服务器端实现 FL 的实现细节。由于本章已介绍了功能的一些关键方面,您将能够实现基本系统并使用一些机器学习应用程序顺利运行模拟。

进一步阅读

本章讨论的一些概念可以通过阅读以下论文进一步探索:

  • Keith Bonawitz, Hubert Eichner, Wolfgang Grieskamp, Dzmitry Huba, Alex Ingerman, Vladimir Ivanov, Chloe Kiddon 等. 迈向大规模联邦学习:系统设计. 机器学习与系统会议论文集 1 (2019): 374–388, (arxiv.org/abs/1902.01046).

  • Kairouz, P., McMahan, H. B., Avent, B., Bellet, A., Bennis, M., Bhagoji, A. N.,以及 Zhao, S. (2021). 联邦学习中的进展和开放问题. 机器学习基础与趋势, 14 (1 和 2): 1–210, (arxiv.org/abs/1912.04977).

  • Stoica, I., Morris, R., Liben-Nowell, D., Karger, D., Kaashoek, M., Dabek, F.,以及 Balakrishnan, H. (2003). Chord: 一个用于互联网应用的可扩展对等查找协议, IEEE/ACM Transactions on Networking., 第 11 卷,第 1 期,第 17–32 页,(resources.mpi-inf.mpg.de/d5/teaching/ws03_04/p2p-data/11-18-writeup1.pdf).

  • Juan Benet. (2014). IPFS – 基于内容寻址、版本化、P2P 文件系统, (arxiv.org/abs/1407.3561).

第二部分 联邦学习系统的设计与实现

在本部分,我们将使用 Python 解释联邦学习FL)系统的实现原理。您将学习如何设计软件组件并编写 FL 服务器和客户端的基本功能代码。此外,您还能够将您自己的机器学习过程集成到 FL 系统中,并运行和分析基于 FL 的应用程序。

本部分包括以下章节:

  • 第四章, 使用 Python 实现联邦学习服务器

  • 第五章, 联邦学习客户端实现

  • 第六章, 运行联邦学习系统并分析结果

  • 第七章, 模型聚合

第四章:使用 Python 实现联邦学习服务器

联邦学习(federated learningFL)系统的服务器端实现对于实现真正的 FL 启用应用至关重要。我们在前一章中讨论了基本系统架构和流程。在本章中,我们将讨论更多实际实现,以便您可以创建一个简单的 FL 系统服务器和聚合器,各种机器学习ML)应用可以连接并在此测试。

本章描述了在第三章“联邦学习系统的工作原理”中讨论的 FL 服务器端组件的实际实现方面。基于对 FL 系统整个工作流程的理解,您将能够进一步使用这里和 GitHub 上提供的示例代码来实现。一旦您理解了使用示例代码的基本实现原则,根据您自己的设计增强 FL 服务器功能将是一个有趣的部分。

在本章中,我们将涵盖以下主题:

  • 聚合器的主要软件组件

  • 实现 FL 服务器端功能

  • 使用状态管理维护聚合模型

  • 聚合本地模型

  • 运行 FL 服务器

  • 实现和运行数据库服务器

  • FL 服务器的潜在增强

技术要求

本章中介绍的所有代码文件都可以在 GitHub 上找到:github.com/tie-set/simple-fl

重要提示

您可以使用代码文件用于个人或教育目的。然而,请注意,我们不会支持商业部署,并且不会对使用代码造成的任何错误、问题或损害负责。

聚合器和数据库的主要软件组件

在前一章中介绍了具有 FL 服务器的聚合器架构。在这里,我们将介绍实现 FL 系统基本功能的代码。聚合器和数据库端的 Python 软件组件列在fl_mainaggregator目录中,以及lib/utilpseudodb文件夹中,如图 4.1所示:

![图 4.1 – 聚合器的 Python 软件组件以及内部库和伪数据库]

](https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_04_01.jpg)

图 4.1 – 聚合器的 Python 软件组件以及内部库和伪数据库

以下是对聚合器中 Python 代码文件的简要描述。

聚合器端代码

在本节中,我们将涉及与 FL 服务器线程、FL 状态管理以及模型聚合本身相关的聚合器端的主要 Python 文件。这些聚合器端的代码文件位于aggregator文件夹中。仓库中的代码仅捕获模型聚合的视角,而不是创建全面 FL 平台的整个工程方面。

FL 服务器代码(server_th.py)

这是实现联邦学习过程整个基本流程的主要代码,包括聚合器本身、代理和数据库之间的通信过程,以及协调代理参与和 ML 模型的聚合。它还初始化从第一个连接的代理发送的全球集群模型。它管理接收本地模型和集群模型合成例程,在收集足够的本地模型后形成集群全局模型。

联邦学习状态管理器(state_manager.py)

状态管理器缓冲本地模型和集群模型数据,这些数据对于聚合过程是必需的。当聚合器从代理接收本地模型时,缓冲区将被填充,在进入联邦学习过程的下一轮时将被清除。聚合标准检查函数也定义在此文件中。

聚合代码(aggregation.py)

聚合 Python 代码将列出聚合模型的基本算法。在本章中使用的代码示例中,我们只介绍称为联邦平均FedAvg)的平均方法,它考虑了本地数据集的大小来平均收集的本地模型的权重,从而生成集群全局模型。

lib/util 代码

内部库的 Python 文件(communication_handler.pydata_struc.pyhelpers.pymessengers.pystates.py)将在附录探索内部库中进行解释。

数据库端代码

数据库端代码包括伪数据库和位于pseudodb文件夹中的 SQLite 数据库 Python 代码文件。伪数据库代码运行一个服务器以接收来自聚合器的消息,并将它们处理为可用于联邦学习过程的 ML 模型数据。

伪数据库代码(pseudo_db.py)

伪数据库 Python 代码的功能是接受来自聚合器与本地和全球集群模型相关的消息,并将信息推送到数据库。它还将在本地文件系统中保存 ML 模型二进制文件。

SQLite 数据库代码(sqlite_db.py)

SQLite 数据库 Python 代码在指定的路径创建实际的 SQLite 数据库。它还具有将有关本地和全球集群模型的数据条目插入数据库的功能。

现在聚合器和数据库端软件组件已经定义好了,让我们继续进行聚合器的配置。

面向聚合器的配置

以下代码是聚合器端配置参数的示例,这些参数定义在config_aggregator.json文件中,该文件位于setups文件夹中:

{
    "aggr_ip": "localhost",
    "db_ip": "localhost",
    "reg_socket": "8765",
    "exch_socket": "7890",
    "recv_socket": "4321",
    "db_socket": "9017",
    "round_interval": 5,
    "aggregation_threshold": 1.0,
    "polling": 1
}

参数包括聚合器的 IP(FL 服务器的 IP)、数据库服务器的 IP 以及数据库和代理的各种端口号。轮询间隔是检查聚合标准的时间间隔,聚合阈值定义了开始聚合过程所需的收集的本地 ML 模型的百分比。轮询标志与是否利用polling方法进行聚合器和代理之间的通信有关。

现在我们已经介绍了聚合器侧的配置文件的概念,让我们继续了解代码的设计和实现。

实现 FL 服务器端功能

在本节中,我们将通过实际的代码示例来解释如何使用 FL 服务器系统实现聚合器的第一个版本,这些示例代码位于aggregator目录下的server_th.py文件中。通过这种方式,您将了解 FL 服务器系统的核心功能以及它们的实现方式,以便您能够进一步扩展更多功能。因此,我们只涵盖进行简单 FL 过程所必需的重要和核心功能。潜在的提升将在本章后面的部分列出,即FL 服务器的潜在提升

server_th.py处理与 FL 服务器端相关的所有基本功能方面,所以让我们在下一节中看看。

导入 FL 服务器的库

FL 服务器端的代码从导入必要的库开始。特别是,lib.util处理使 FL 实现变得容易的基本支持功能。代码的详细信息可以在 GitHub 仓库中找到。

服务器代码导入StateManagerAggregator用于 FL 过程。关于状态管理器和聚合的代码将在本章后面的部分讨论,即使用状态管理器维护聚合模型聚合本地模型

下面是导入必要库的代码:

import asyncio, logging, time, numpy as np
from typing import List, Dict, Any
from fl_main.lib.util.communication_handler import init_fl_server, send, send_websocket, receive 
from fl_main.lib.util.data_struc import convert_LDict_to_Dict
from fl_main.lib.util.helpers import read_config, set_config_file
from fl_main.lib.util.messengers import generate_db_push_message, generate_ack_message, generate_cluster_model_dist_message, generate_agent_participation_confirmation_message
from fl_main.lib.util.states import ParticipateMSGLocation, ModelUpMSGLocation, PollingMSGLocation, ModelType, AgentMsgType
from .state_manager import StateManager
from .aggregation import Aggregator

在我们导入必要的库之后,让我们继续设计一个 FL Server类。

定义 FL 服务器类

在实践中,定义Server类是明智的,使用它可以创建一个具有在第三章联邦学习系统的工作原理中讨论的功能的 FL 服务器实例,如下所示:

class Server:
    """
    FL Server class defining the functionalities of 
    agent registration, global model synthesis, and
    handling mechanisms of messages by agents. 
    """

再次强调,server类主要提供代理注册和全局模型合成的功能,并处理上传的本地模型和来自代理的轮询消息的机制。它还充当聚合器和数据库以及聚合器和代理之间的接口。

FL 服务器类的功能现在很清晰——接下来是初始化和配置服务器。

初始化 FL 服务器

__init__ 构造函数内部的以下代码是 Server 实例初始化过程的示例。

def __init__(self):
    config_file = set_config_file("aggregator")
    self.config = read_config(config_file)
    self.sm = StateManager()
    self.agg = Aggregator(self.sm)
    self.aggr_ip = self.config['aggr_ip']
    self.reg_socket = self.config['reg_socket']
    self.recv_socket = self.config['recv_socket']
    self.exch_socket = self.config['exch_socket']
    self.db_ip = self.config['db_ip']
    self.db_socket = self.config['db_socket']
    self.round_interval = self.config['round_interval']
    self.is_polling = bool(self.config['polling'])
    self.sm.agg_threshold = 
                     self.config['aggregation_threshold']

然后,self.config 存储了前面代码块中讨论的 config_aggregator.json 文件中的信息。

self.smself.agg 分别是下面讨论的状态管理类和聚合类实例。

self.aggr_ip 从聚合器的配置文件中读取一个 IP 地址。

然后,将设置 reg_socketrecv_socket,其中 reg_socket 用于代理注册自身以及存储为 self.aggr_ip 的聚合器 IP 地址,而 recv_socket 用于从代理接收本地模型,以及存储为 self.aggr_ip 的聚合器 IP 地址。在这个示例代码中,reg_socketrecv_socket 都可以读取自聚合器的配置文件。

exch_socket 是用于将全局模型连同代理的 IP 地址一起发送回代理的端口号,该端口号在初始化过程中通过配置参数进行初始化。

随后,将配置连接到数据库服务器的信息,其中 dp_ipdb_socket 分别是数据库服务器的 IP 地址和端口号,所有这些信息都是从 config_aggregator.json 文件中读取的。

round_interval 是一个检查是否满足启动模型聚合过程聚合标准的时间间隔。

is_polling 标志与是否使用代理端的 polling 方法有关。轮询标志必须与代理端配置文件中使用的标志相同。

agg_threshold 也是在 ready_for_local_aggregation 函数中使用的超过收集到的本地模型数量的百分比,如果收集到的模型百分比等于或超过 agg_threshold,则 FL 服务器开始本地模型的聚合过程。

在这个示例代码中,self.round_intervalself.agg_threshold 都是从配置文件中读取的。

现在配置已经设置好了,我们将讨论如何注册尝试参与联邦学习(FL)过程的代理。

代理注册函数

在本节中,描述了简化和异步的 register 函数,用于接收指定模型结构的参与消息,并返回用于未来模型交换的套接字信息。它还向代理发送欢迎消息作为响应。

代理的注册过程在下面的示例代码中进行了描述:

async def register(self, websocket: str, path):        
    msg = await receive(websocket)
    es = self._get_exch_socket(msg)
    agent_nm = msg[int(ParticipateMSGLocation.agent_name)]
    agent_id = msg[int(ParticipateMSGLocation.agent_id)]
    ip = msg[int(ParticipateMSGLocation.agent_ip)]
    id, es = self.sm.add_agent(agent_nm, agent_id, ip, es)
    if self.sm.round == 0:
        await self._initialize_fl(msg)
    await self._send_updated_global_model( \
        websocket, id, es)

在这个示例代码中,从代理接收到的消息,在此定义为 msg,是通过从 communication_handler 代码导入的 receive 函数进行解码的。

特别是,self.sm.add_agent(agent_name, agent_id, addr, es) 函数接收代理名称、代理 ID、代理 IP 地址以及包含在 msg 消息中的 exch_socket 号码,以便接受来自该代理的消息,即使代理暂时断开连接然后再次连接。

之后,注册函数检查是否应该根据 self.sm.round 追踪的 FL 轮次继续到初始化模型的过程。如果 FL 进程尚未开始,即 self.sm.round0,它将调用 _initialize_fl(msg) 函数以初始化 FL 进程。

然后,FL 服务器通过调用 _send_updated_global_model(websocket, id, es) 函数将更新的全局模型发送回代理。该函数接受 WebSocket、代理 ID 和 exch_socket 作为参数,并向代理创建一个回复消息以通知其参与消息是否已被接受。

在此示例代码中,简化了代理与 FL 服务器的注册过程。在生产环境中,所有来自代理的系统信息都将推送到数据库,以便任何失去与 FL 服务器连接的代理都可以通过重新连接到 FL 服务器来随时恢复。

通常情况下,如果 FL 服务器安装在云端,并且代理从其本地环境连接到 FL 服务器,由于安全设置(如防火墙)等原因,聚合器到代理的这种回推机制将不会工作。本书中我们不详细讨论安全问题,因此鼓励您使用 simple-fl 代码中实现的 polling 方法在基于云的聚合器和本地代理之间进行通信。

获取套接字信息以将全局模型推回代理

下面的 _get_exch_socket 函数从代理接收参与消息,并根据消息中的模拟标志决定使用哪个端口来联系代理:

def _get_exch_socket(self, msg):
    if msg[int(ParticipateMSGLocation.sim_flag)]:
        es = msg[int(ParticipateMSGLocation.exch_socket)]
    else:
        es = self.exch_socket
    return es

在这个实现练习中,我们支持进行模拟运行,通过这种方式,你可以在一台机器上运行数据库、聚合器和多个代理的所有 FL 系统组件。

如有必要,初始化 FL 进程

异步的 _initialize_fl 函数用于初始化一个 FL 进程,该进程仅在 FL 轮次为 0 时被调用。以下是其代码实现:

async def _initialize_fl(self, msg):
    agent_id = msg[int(ParticipateMSGLocation.agent_id)]
    model_id = msg[int(ParticipateMSGLocation.model_id)]
    gene_time = msg[int(ParticipateMSGLocation.gene_time)]
    lmodels = msg[int(ParticipateMSGLocation.lmodels)] 
    perf_val = msg[int(ParticipateMSGLocation.meta_data)]
    init_flag = \
        bool(msg[int(ParticipateMSGLocation.init_flag)])
    self.sm.initialize_model_info(lmodels, init_flag)
    await self._push_local_models( \
        agent_id, model_id, lmodels, gene_time, perf_val)
    self.sm.increment_round()

从接收到的消息中提取代理 ID (agent_id)、模型 ID (model_id)、来自代理的本地模型 (lmodels)、模型的生成时间 (gene_time)、性能数据 (perf_val) 和 init_flag 的值后,调用状态管理器代码中的 initialize_model_info 函数,该函数将在本章后面的部分进行解释。

此函数随后通过调用本节中描述的_push_local_models函数将本地模型推送到数据库。您可以参考将本地和全局模型推送到数据库的函数部分。

然后,轮次增加以进入 FL 的第一轮。

使用更新的全局模型确认代理参与

在初始化(集群)全局模型后,需要通过此注册过程将全局模型发送到连接到聚合器的代理。以下异步的_send_updated_global_model函数处理将全局模型发送到代理的过程,它以 WebSocket 信息、代理 ID 和用于联系代理的端口号作为参数。以下代码块描述了该过程:

async def _send_updated_global_model( \
                   self, websocket, agent_id, exch_socket):
    model_id = self.sm.cluster_model_ids[-1]
    cluster_models = \
       convert_LDict_to_Dict(self.sm.cluster_models)
    reply = generate_agent_participation_confirm_message(
       self.sm.id, model_id, cluster_models, self.sm.round,
       agent_id, exch_socket, self.recv_socket)
    await send_websocket(reply, websocket)

如果 FL 过程已经启动,即self.sm.round已经大于 0,我们将从它们的缓冲区获取集群模型,并使用convert_LDict_to_Dict库函数将它们转换为字典格式。

然后,使用generate_ agent_participation_confirm_message函数包装回复消息,并通过调用send_websocket(reply, websocket)函数将其发送给刚刚连接或重新连接到聚合器的代理。请参阅将全局模型发送到代理的函数部分。

既然我们已经了解了代理的注册过程,让我们继续到处理本地机器学习模型和轮询消息的实现。

处理来自本地代理的消息的服务器

FL 服务器上的异步receive_msg_from_agent过程持续运行,以接收本地模型更新并将它们推送到数据库和内存缓冲区,临时保存本地模型。它还响应来自本地代理的轮询消息。以下代码解释了这一功能:

async def receive_msg_from_agent(self, websocket, path):
    msg = await receive(websocket)
    if msg[int(ModelUpMSGLocation.msg_type)] == \
                                       AgentMsgType.update:
        await self._process_lmodel_upload(msg)
    elif msg[int(PollingMSGLocation.msg_type)] == \
                                      AgentMsgType.polling:
        await self._process_polling(msg, websocket)  

接下来,我们将查看由receive_msg_from_agent函数调用的两个函数,如前述代码块所示,它们是_process_lmodel_upload_process_polling函数。

处理本地代理的模型上传

异步的_process_lmodel_upload函数处理AgentMsgType.update消息。以下代码块是关于接收本地机器学习模型并将它们放入状态管理器缓冲区的函数:

async def _process_lmodel_upload(self, msg):
    lmodels = msg[int(ModelUpMSGLocation.lmodels)]
    agent_id = msg[int(ModelUpMSGLocation.agent_id)]
    model_id = msg[int(ModelUpMSGLocation.model_id)]
    gene_time = msg[int(ModelUpMSGLocation.gene_time)]
    perf_val = msg[int(ModelUpMSGLocation.meta_data)]
    await self._push_local_models( \ 
        agent_id, model_id, lmodels, gene_time, perf_val)
    self.sm.buffer_local_models( \ 
        lmodels, participate=False, meta_data=perf_val)

首先,它从接收到的消息中提取代理 ID(agent_id)、模型 ID(model_id)、来自代理的本地模型(lmodels)、模型的生成时间(gene_time)和性能数据(perf_val),然后调用_push_local_models函数将本地模型推送到数据库。

然后调用buffer_local_models函数以将本地模型(lmodels)保存在内存缓冲区中。buffer_local_models函数在使用状态管理器维护聚合模型部分中描述。

处理代理的轮询

以下异步的_process_polling函数处理AgentMsgType.polling消息:

async def _process_polling(self, msg, websocket):
    if self.sm.round > \
                   int(msg[int(PollingMSGLocation.round)]):
        model_id = self.sm.cluster_model_ids[-1]
        cluster_models = \
            convert_LDict_to_Dict(self.sm.cluster_models)
        msg = generate_cluster_model_dist_message( \
            self.sm.id, model_id, self.sm.round, \
            cluster_models)
        await send_websocket(msg, websocket)
    else:
        msg = generate_ack_message()
        await send_websocket(msg, websocket)  

如果 FL 轮次(self.sm.round)大于本地 FL 轮次,该轮次包含在本地代理自身维护的接收消息中,这意味着模型聚合是在代理上次向聚合器轮询的时间和现在之间的期间完成的。

在此情况下,通过generate_cluster_model_dist_message函数将cluster_models转换为字典格式后,通过send_websocket函数打包成响应消息并回传给代理。

否则,聚合器将仅通过generate_ack_message函数生成的ACK消息返回给代理。

现在我们已经准备好聚合从代理接收到的本地模型,让我们来看看模型聚合例程。

全局模型合成例程

在 FL 服务器中设计的async def model_synthesis_routine(self)过程通过定期检查存储的模型数量,并在收集到足够多的本地模型以满足聚合阈值时执行全局模型合成。

以下代码描述了模型合成例程过程,该过程定期检查聚合标准并执行模型合成:

async def model_synthesis_routine(self):
    while True:
        await asyncio.sleep(self.round_interval)
        if self.sm.ready_for_local_aggregation():  
            self.agg.aggregate_local_models()
            await self._push_cluster_models()
            if self.is_polling == False:
                await self._send_cluster_models_to_all()
            self.sm.increment_round()

此过程是异步的,使用while循环运行。

特别是,一旦满足ready_for_local_aggregation(在使用状态管理器维护聚合模型部分中解释)设定的条件,就会调用从aggregator.py文件导入的aggregate_local_models函数,该函数基于FedAvg对收集到的本地模型权重进行平均。关于aggregate_local_models函数的进一步解释可以在聚合本地模型部分找到。

然后,调用await self._push_cluster_models()以将聚合的集群全局模型推送到数据库。

await self._send_cluster_models_to_all()用于在未使用polling方法的情况下,将更新的全局模型发送给连接到聚合器的所有代理。

最后但同样重要的是,FL 轮次通过self.sm.increment_round()递增。

一旦生成集群全局模型,就需要使用以下章节中描述的函数将模型发送到连接的代理。

用于将全局模型发送到代理的函数

将全局模型发送到连接的代理的功能由_send_cluster_models_to_all函数处理。这是一个异步函数,用于将集群全局模型发送到本聚合器下的所有代理,如下所示:

async def _send_cluster_models_to_all(self):
    model_id = self.sm.cluster_model_ids[-1]
    cluster_models = \
        convert_LDict_to_Dict(self.sm.cluster_models)
    msg = generate_cluster_model_dist_message( \
        self.sm.id, model_id, self.sm.round, \
        cluster_models)
    for agent in self.sm.agent_set:
        await send(msg, agent['agent_ip'], agent['socket'])

在获取集群模型信息后,它使用generate_cluster_model_dist_message函数创建包含集群模型、轮次、模型 ID 和聚合器 ID 信息的消息,并调用communication_handler库中的send函数,将全局模型发送到通过代理参与过程注册的agent_set中的所有代理。

已经解释了将集群全局模型发送到连接的代理。接下来,我们将解释如何将本地和集群模型推送到数据库。

将本地和全局模型推送到数据库的函数

_push_local_models_push_cluster_models函数都内部调用以将本地模型和集群全局模型推送到数据库。

将本地模型推送到数据库

以下是将一组本地模型推送到数据库的_push_local_models函数:

async def _push_local_models(self, agent_id: str, \
        model_id: str, local_models: Dict[str, np.array], \
        gene_time: float, performance: Dict[str, float]) \
        -> List[Any]:
    return await self._push_models(
        agent_id, ModelType.local, local_models, \
        model_id, gene_time, performance)

_push_local_models函数接受诸如代理 ID、本地模型、模型 ID、模型的生成时间以及性能数据等参数,如果有响应消息则返回。

将集群模型推送到数据库

以下是将集群全局模型推送到数据库的_push_cluster_models函数:

async def _push_cluster_models(self) -> List[Any]:
    model_id = self.sm.cluster_model_ids[-1] 
    models = convert_LDict_to_Dict(self.sm.cluster_models)
    meta_dict = dict({ \
        "num_samples" : self.sm.own_cluster_num_samples})
    return await self._push_models( \
        self.sm.id, ModelType.cluster, models, model_id, \
        time.time(), meta_dict)

在此代码中,_push_cluster_models函数不接收任何参数,因为这些参数可以从状态管理器的实例信息和缓冲内存数据中获取。例如,self.sm.cluster_model_ids[-1]获取最新集群模型的 ID,而self.sm.cluster_models存储最新的集群模型本身,并将其转换为字典格式的models以发送到数据库。它还创建mata_dict来存储样本数量。

将机器学习模型推送到数据库

上述两个函数都按照如下方式调用_push_models函数:

async def _push_models(
    self, component_id: str, model_type: ModelType,
    models: Dict[str, np.array], model_id: str,
    gene_time: float, performance_dict: Dict[str, float])
    -> List[Any]:
    msg = generate_db_push_message(component_id, \
        self.sm.round, model_type, models, model_id, \
        gene_time, performance_dict)
    resp = await send(msg, self.db_ip, self.db_socket)
    return resp

在此代码示例中,_push_models函数接受诸如component_id(聚合器或代理的 ID)、model_type(如本地或集群模型)、models本身、model_idgene_time(模型创建的时间)以及performance_dict(作为模型的性能指标)等参数。然后,通过generate_db_push_message函数创建要发送到数据库的消息(使用send函数),这些参数包括 FL 轮次信息。它从数据库返回响应消息。

现在我们已经解释了与 FL 服务器相关的所有核心功能,让我们来看看状态管理器的角色,它维护聚合过程所需的全部模型。

使用状态管理器维护聚合所需的模型

在本节中,我们将解释state_manager.py,该文件处理维护模型以及与本地模型聚合相关的必要易失性信息。

导入状态管理器的库

此代码导入了以下内容。data_struchelpersstates 的内部库在 附录探索内部库 中介绍:

import numpy as np
import logging
import time
from typing import Dict, Any
from fl_main.lib.util.data_struc import LimitedDict
from fl_main.lib.util.helpers import generate_id, generate_model_id
from fl_main.lib.util.states import IDPrefix

在导入必要的库之后,让我们定义状态管理器类。

定义状态管理器类

状态管理器类(Class StateManager),如 state_manager.py 中所见,在以下代码中定义:

class StateManager:
    """
    StateManager instance keeps the state of an aggregator.
    Functions are listed with this indentation.
    """

这跟踪聚合器的状态信息。聚合器和代理的易变状态也应存储,例如本地模型、连接到聚合器的代理信息、聚合过程生成的聚类模型以及当前轮次编号。

在定义了状态管理器之后,让我们继续初始化状态管理器。

初始化状态管理器

__init__ 构造函数中,配置了与联邦学习过程相关的信息。以下代码是构建状态管理器的一个示例:

def __init__(self):
    self.id = generate_id()
    self.agent_set = list()
    self.mnames = list()
    self.round = 0
    self.local_model_buffers = LimitedDict(self.mnames)
    self.local_model_num_samples = list()
    self.cluster_models = LimitedDict(self.mnames)
    self.cluster_model_ids = list()
    self.initialized = False
    self.agg_threshold = 1.0

self.id 聚合器的 ID 可以使用来自 util.helpers 库的 generate_id() 函数随机生成。

self.agent_set 是连接到聚合器的代理集合,其中集合的格式是字典信息的集合,在这种情况下与代理相关。

self.mnames 以列表格式存储要聚合的 ML 模型中每一层的名称。

self.round 被初始化为 0 以初始化联邦学习的轮次。

local_model_buffers 是由代理收集并存储在内存空间中的本地模型列表。local_model_buffers 接受来自代理的每个联邦学习轮次的本地模型,一旦聚合过程完成该轮次,此缓冲区将被清除并开始接受下一轮的本地模型。

self.local_model_num_samples 是一个列表,用于存储收集在缓冲区中的模型的样本数量。

self.cluster_models 是以 LimitedDict 格式存储的全局聚类模型集合,而 self.cluster_model_ids 是聚类模型 ID 的列表。

一旦设置了初始全局模型,self.initialized 变为 True,否则为 False

self.agg_threshold 被初始化为 1.0,该值会被 config_aggregator.json 文件中指定的值覆盖。

在初始化状态管理器之后,让我们接下来调查初始化全局模型。

初始化全局模型

以下 initialize_model_info 函数设置了其他代理将使用的初始全局模型:

def initialize_model_info(self, lmodels, \
                          init_weights_flag):
    for key in lmodels.keys():
        self.mnames.append(key)
    self.local_model_buffers = LimitedDict(self.mnames)
    self.cluster_models = LimitedDict(self.mnames)
    self.clear_lmodel_buffers()
    if init_weights_flag:
        self.initialize_models(lmodels, \
                            weight_keep=init_weights_flag)
    else:
        self.initialize_models(lmodels, weight_keep=False)

它填充了从初始代理发送的本地模型(lmodels)中提取的模型名称(self.mnames)。与模型名称一起,local_model_bufferscluster_models 也被重新初始化。在清除本地模型缓冲区后,它调用 initialize_models 函数。

以下 initialize_models 函数根据作为模型参数接收的初始基础模型(以字典格式 strnp.array)初始化神经网络的结构(numpy.array):

def initialize_models(self, models: Dict[str, np.array], \
                                weight_keep: bool = False):
    self.clear_saved_models()
    for mname in self.mnames:
        if weight_keep:
            m = models[mname]
        else:
            m = np.zeros_like(models[mname])
        self.cluster_models[mname].append(m)
        id = generate_model_id(IDPrefix.aggregator, \
                 self.id, time.time())
        self.cluster_model_ids.append(id)
        self.initialized = True

对于模型的每一层,这里定义为模型名称,此函数填写模型参数。根据weight_keep标志,模型以零或接收到的参数初始化。这样,初始的集群全局模型与随机模型 ID 一起构建。如果代理发送的 ML 模型与这里定义的模型架构不同,聚合器将拒绝接受该模型或向代理发送错误信息。不返回任何内容。

因此,我们已经涵盖了全局模型的初始化。在下一节中,我们将解释 FL 过程的主体部分,即检查聚合标准。

检查聚合标准

以下名为ready_for_local_aggregation的代码用于检查聚合标准:

def ready_for_local_aggregation(self) -> bool:
    if len(self.mnames) == 0:
            return False
    num_agents = int(self.agg_threshold * \
                                       len(self.agent_set))
    if num_agents == 0: num_agents = 1
    num_collected_lmodels = \
        len(self.local_model_buffers[self.mnames[0]])
    if num_collected_lmodels >= num_agents:
        return True
    else:
        return False            

ready_for_local_aggregation函数返回一个bool值,以标识聚合器是否可以开始聚合过程。如果满足聚合标准(例如收集足够的本地模型以进行聚合),则返回True,否则返回False。聚合阈值agg_thresholdconfig_aggregator.json文件中配置。

下一节是关于缓存用于聚合过程的本地模型。

缓存本地模型

以下在buffer_local_models上的代码将代理的本地模型存储在本地模型缓冲区中:

def buffer_local_models(self, models: Dict[str, np.array], 
        participate=False, meta_data: Dict[Any, Any] = {}):
    if not participate:  
        for key, model in models.items():
            self.local_model_buffers[key].append(model)
        try:
            num_samples = meta_data["num_samples"]
        except:
            num_samples = 1
        self.local_model_num_samples.append( \
                int(num_samples))
    else:  
        pass
    if not self.initialized:
        self.initialize_models(models)

参数包括以字典格式表示的本地models以及如样本数量等元信息。

首先,此函数通过检查参与标志来确定从代理发送的本地模型是初始模型还是不是。如果是初始模型,它将调用initialize_model函数,如前述代码块所示。

否则,对于用模型名称定义的模型的每一层,它将numpy数组存储在self.local_model_buffers中。key是模型名称,前述代码中提到的model是模型的实际参数。可选地,它可以接受代理用于重新训练过程的样本数量或数据源,并将其推送到self. local_model_num_samples缓冲区。

当 FL 服务器在receive_msg_from_agent过程中从代理接收本地模型时,会调用此函数。

这样,本地模型缓冲区已经解释完毕。接下来,我们将解释如何清除已保存的模型,以便聚合可以继续进行,而无需在缓冲区中存储不必要的模型。

清除已保存的模型

以下clear_saved_models函数清除本轮存储的所有集群模型:

def clear_saved_models(self):
    for mname in self.mnames:
        self.cluster_models[mname].clear()

此函数在初始化 FL 过程之初被调用,集群全局模型被清空,以便再次开始新一轮的 FL。

以下函数,clear_lmodel_buffers函数,清除所有缓存的本地模型,为下一轮 FL 做准备:

def clear_lmodel_buffers(self):
    for mname in self.mnames:
        self.local_model_buffers[mname].clear()
    self.local_model_num_samples = list()

在进行下一轮 FL 之前清除local_model_buffers中的本地模型是至关重要的。如果没有这个过程,要聚合的模型将与来自其他轮次的非相关模型混合,最终 FL 的性能有时会下降。

接下来,我们将解释在 FL 过程中添加代理的基本框架。

添加代理

这个add_agent函数处理使用系统内存进行简短的代理注册:

def add_agent(self, agent_name: str, agent_id: str, \
                               agent_ip: str, socket: str):
    for agent in self.agent_set:
        if agent_name == agent['agent_name']:
            return agent['agent_id'], agent['socket']
    agent = {
        'agent_name': agent_name,
        'agent_id': agent_id,
        'agent_ip': agent_ip,
        'socket': socket
    }
    self.agent_set.append(agent)
    return agent_id, socket

此函数仅向self.agent_set列表添加与代理相关的信息。代理信息包括代理名称、代理 ID、代理 IP 地址以及用于联系代理的socket编号。socket编号可以在将集群全局模型发送到连接到聚合器的代理时使用,以及在聚合器和代理之间使用push方法进行通信时使用。此函数仅在代理注册过程中调用,并返回代理 ID 和socket编号。

如果代理已经注册,这意味着agent_set中已经存在具有相同名称的代理,它将返回现有代理的代理 ID 和socket编号。

再次强调,从聚合器到代理的此push通信方法在特定安全情况下不起作用。建议使用代理使用的polling方法,以不断检查聚合器是否有更新的全局模型。

可以使用数据库扩展代理注册机制,这将为您提供更好的分布式系统管理。

接下来,我们将涉及 FL 轮次的增加。

增加 FL 轮次

increment_round函数仅精确增加由状态管理器管理的轮次编号:

def increment_round(self):
    self.round += 1

增加轮次是 FL 过程中支持连续学习操作的关键部分。此函数仅在注册初始全局模型或每次模型聚合过程之后调用。

现在我们已经了解了 FL 如何与状态管理器协同工作,在接下来的部分,我们将讨论模型聚合框架。

聚合本地模型

aggregation.py代码处理使用一系列聚合算法对本地模型进行聚合。在代码示例中,我们只支持在以下章节中讨论的FedAvg

导入聚合器的库

aggregation.py代码导入以下内容:

import logging
import time
import numpy as np
from typing import List
from .state_manager import StateManager
from fl_main.lib.util.helpers import generate_model_id
from fl_main.lib.util.states import IDPrefix

使用状态管理器维护聚合模型部分中讨论了导入的状态管理器的角色和功能,并在附录探索内部库中介绍了helpersstates库。

在导入必要的库之后,让我们定义聚合器类。

定义和初始化聚合器类

以下class Aggregator的代码定义了聚合器的核心过程,它提供了一套数学函数用于计算聚合模型:

class Aggregator:
    """
    Aggregator class instance provides a set of 
    mathematical functions to compute aggregated models.
    """

以下 __init__ 函数只是设置聚合器的状态管理器以访问模型缓冲区:

def __init__(self, sm: StateManager):
    self.sm = sm

一旦聚合器类被定义和初始化,让我们看看实际的 FedAvg 算法实现。

定义 aggregate_local_models 函数

以下 aggregate_local_models 函数是聚合本地模型的代码:

def aggregate_local_models(self):
    for mname in self.sm.mnames:
        self.sm.cluster_models[mname][0] \
            = self._average_aggregate( \
                self.sm.local_model_buffers[mname], \
                self.sm.local_model_num_samples)
    self.sm.own_cluster_num_samples = \
        sum(self.sm.local_model_num_samples)
    id = generate_model_id( \
        IDPrefix.aggregator, self.sm.id, time.time())
    self.sm.cluster_model_ids.append(id)
    self.sm.clear_lmodel_buffers()

此函数可以在聚合标准满足后调用,例如在 config_aggregator.json 文件中定义的聚合阈值。聚合过程使用状态管理器内存中缓存的本地 ML 模型。这些本地 ML 模型来自注册的代理。对于由 mname 定义的模型的每一层,模型权重由 _average_aggregate 函数如下平均,以实现 FedAvg。在平均所有层的模型参数后,cluster_models 被更新,并发送给所有代理。

然后,清除本地模型缓冲区,为下一轮 FL 流程做好准备。

FedAvg 函数

以下函数 _average_aggregate,由前面的 aggregate_local_models 函数调用,是实现 FedAvg 聚合方法的代码:

def _average_aggregate(self, buffer: List[np.array], 
                       num_samples: List[int]) -> np.array:
    denominator = sum(num_samples)
    model = float(num_samples[0])/denominator * buffer[0]
    for i in range(1, len(buffer)):
        model += float(num_samples[i]) / 
                                    denominator * buffer[i]
    return model

_average_aggregate 函数中,计算足够简单,对于给定的 ML 模型列表的每个缓冲区,它为模型取平均参数。模型聚合的基本原理在 第三章联邦学习系统的工作原理 中讨论。它使用 np.array 返回加权聚合模型。

现在我们已经涵盖了 FL 服务器和聚合器的所有基本功能,接下来,我们将讨论如何运行 FL 服务器本身。

运行 FL 服务器

这里是一个运行 FL 服务器的示例。为了运行 FL 服务器,你只需执行以下代码:

if __name__ == "__main__":
    s = Server()
    init_fl_server(s.register, 
                   s.receive_msg_from_agent, 
                   s.model_synthesis_routine(), 
                   s.aggr_ip, s.reg_socket, s.recv_socket)

FL 服务器实例的 registerreceive_msg_from_agentmodel_synthesis_routine 函数用于启动代理的注册过程、接收代理的消息以及启动模型合成过程以创建全局模型,所有这些都是使用 communication_handler 库中的 init_fl_server 函数启动的。

我们已经使用 FL 服务器涵盖了聚合器的所有核心模块。它们可以与数据库服务器一起工作,这将在下一节中讨论。

实现和运行数据库服务器

数据库服务器可以托管在聚合器服务器所在的同一台机器上,也可以与聚合器服务器分开。无论数据库服务器是否托管在同一台机器上,这里引入的代码都适用于这两种情况。数据库相关的代码可以在本书提供的 GitHub 仓库的 fl_main/pseudodb 文件夹中找到。

面向数据库的配置

以下代码是作为 config_db.json 保存的数据库端配置参数的示例:

{
    "db_ip": "localhost",
    "db_socket": "9017",
    "db_name": "sample_data",
    "db_data_path": "./db",
    "db_model_path": "./db/models"
}

特别是,db_data_path是 SQLite 数据库的位置,db_model_path是 ML 模型二进制文件的位置。config_db.json文件可以在setup文件夹中找到。

接下来,让我们定义数据库服务器并导入必要的库。

定义数据库服务器

pseudo_db.py代码的主要功能是接收包含本地和集群全局模型的包含消息。

导入伪数据库所需的库

首先,pseudo_db.py代码导入了以下内容:

import pickle, logging, time, os
from typing import Any, List
from .sqlite_db import SQLiteDBHandler
from fl_main.lib.util.helpers import generate_id, read_config, set_config_file
from fl_main.lib.util.states import DBMsgType, DBPushMsgLocation, ModelType
from fl_main.lib.util.communication_handler import init_db_server, send_websocket, receive

它导入了基本通用库以及SQLiteDBHandler(在使用 SQLite 定义数据库部分中讨论)和来自lib/util库的函数,这些函数在附录探索内部库中讨论。

定义 PseudoDB 类

然后定义了PseudoDB类以创建一个实例,该实例从聚合器接收模型及其数据并将其推送到实际的数据库(在这个例子中是 SQLite):

class PseudoDB:
    """
    PseudoDB class instance receives models and their data
    from an aggregator, and pushes them to database
    """

现在,让我们继续初始化PseudoDB的实例。

初始化 PseudoDB

然后,初始化过程__init__被定义为以下内容:

def __init__(self):
    self.id = generate_id()
    self.config = read_config(set_config_file("db"))
    self.db_ip = self.config['db_ip']
    self.db_socket = self.config['db_socket']
    self.data_path = self.config['db_data_path']
    if not os.path.exists(self.data_path):
        os.makedirs(self.data_path)
    self.db_file = \
        f'{self.data_path}/model_data{time.time()}.db'
    self.dbhandler = SQLiteDBHandler(self.db_file)
    self.dbhandler.initialize_DB()
    self.db_model_path = self.config['db_model_path']
    if not os.path.exists(self.db_model_path):
        os.makedirs(self.db_model_path)

初始化过程生成实例 ID 并设置各种参数,如数据库套接字(db_socket)、数据库 IP 地址(db_ip)、数据库路径(data_path)和数据库文件(db_file),所有这些均从config_db.json配置。

dbhandler存储SQLiteDBHandler的实例并调用initialize_DB函数来创建 SQLite 数据库。

如果不存在,则创建data_pathdb_model_path的文件夹。

PseudoDB的初始化过程之后,我们需要设计通信模块以接收来自聚合器的消息。我们再次使用 WebSocket 与聚合器进行通信,并将此模块作为服务器启动以接收和响应来自聚合器的消息。在这个设计中,我们不将来自数据库服务器的消息推送到聚合器或代理,以简化 FL 机制。

处理来自聚合器的消息

以下是对async def handler函数的代码,该函数以websocket作为参数,接收来自聚合器的消息并返回所需的信息:

async def handler(self, websocket, path):
    msg = await receive(websocket)
    msg_type = msg[DBPushMsgLocation.msg_type]
    reply = list()
    if msg_type == DBMsgType.push:
        self._push_all_data_to_db(msg)
        reply.append('confirmation')
    else:
        raise TypeError(f'Undefined DB Message Type: \
                                              {msg_type}.')
    await send_websocket(reply, websocket)

handler函数中,一旦它解码了从聚合器接收到的消息,handler函数会检查消息类型是否为push。如果是,它将尝试通过调用_push_all_data_to_db函数将本地或集群模型推送到数据库。否则,它将显示错误消息。然后可以将确认消息发送回聚合器。

在这里,我们只定义了push消息的类型,但你可以定义尽可能多的类型,同时增强数据库模式设计。

将所有数据推送到数据库

以下_push_all_data_to_db代码将模型信息推送到数据库:

def _push_all_data_to_db(self, msg: List[Any]):
    pm = self._parse_message(msg)
    self.dbhandler.insert_an_entry(*pm)
    model_id = msg[int(DBPushMsgLocation.model_id)]
    models = msg[int(DBPushMsgLocation.models)]
    fname = f'{self.db_model_path}/{model_id}.binaryfile'
    with open(fname, 'wb') as f:
        pickle.dump(models, f)

模型的信息通过_parse_message函数提取,并传递给_insert_an_entry函数。然后,实际的模型保存在本地服务器文件系统中,其中模型的文件名和路径由这里的db_model_pathfname定义。

解析消息

_parse_message函数仅从接收到的消息中提取参数:

def _parse_message(self, msg: List[Any]):
    component_id = msg[int(DBPushMsgLocation.component_id)]
    r = msg[int(DBPushMsgLocation.round)]
    mt = msg[int(DBPushMsgLocation.model_type)]
    model_id = msg[int(DBPushMsgLocation.model_id)]
    gene_time = msg[int(DBPushMsgLocation.gene_time)]
    meta_data = msg[int(DBPushMsgLocation.meta_data)]
    local_prfmc = 0.0
    if mt == ModelType.local:
        try: local_prfmc = meta_data["accuracy"]
        except: pass
    num_samples = 0
    try: num_samples = meta_data["num_samples"]
    except: pass
    return component_id, r, mt, model_id, gene_time, \
                                   local_prfmc, num_samples

此函数将接收到的消息解析为与代理 ID 或聚合器 ID(component_id)、轮数(r)、消息类型(mt)、model_id、模型生成时间(gene_time)以及以字典格式(meta_data)的性能数据相关的参数。当模型类型为本地时,提取本地性能数据local_prfmc。从meta_dect中提取在本地设备上使用的样本数据量。所有这些提取的参数在最后返回。

在以下部分,我们将解释使用 SQLite 框架实现的数据库。

使用 SQLite 定义数据库

sqlite_db.py代码创建 SQLite 数据库并处理从数据库中存储和检索数据。

导入 SQLite 数据库的库

sqlite_db.py按照如下方式导入基本通用库和ModelType

import sqlite3
import datetime
import logging
from fl_main.lib.util.states import ModelType

lib/util中的ModelType定义了模型类型:本地模型和(全局)集群模型。

定义和初始化 SQLiteDBHandler 类

然后,以下与SQLiteDBHandler类相关的代码创建并初始化 SQLite 数据库,并将模型插入 SQLite 数据库:

class SQLiteDBHandler:
    """
    SQLiteDB Handler class that creates and initialize
    SQLite DB, and inserts models to the SQLiteDB
    """

初始化非常简单——只需将PseudoDB实例传递的db_file参数设置为self.db_file

def __init__(self, db_file):
    self.db_file = db_file

初始化数据库

在下面的initialize_DB函数中,使用 SQLite(sqlite3)定义了本地和集群模型的数据库表:

def initialize_DB(self):
    conn = sqlite3.connect(f'{self.db_file}')
    c = conn.cursor()
    c.execute('''CREATE TABLE local_models(model_id, \
        generation_time, agent_id, round, performance, \
        num_samples)''')
    c.execute('''CREATE TABLE cluster_models(model_id, \
        generation_time, aggregator_id, round, \
        num_samples)''')
    conn.commit()
    conn.close()

在本例中,表被简化了,这样你可以轻松地跟踪上传的本地模型及其性能,以及由聚合器创建的全局模型。

local_models表具有模型 ID(model_id)、模型生成时间(generation_time)、上传的本地模型代理 ID(agent_id)、轮次信息(round)、本地模型的性能数据(performance)以及用于 FedAvg 聚合使用的样本数量(num_samples)。

cluster_models具有模型 ID(model_id)、模型生成时间(generation_time)、聚合器 ID(aggregator_id)、轮次信息(round)和样本数量(num_samples)。

将条目插入数据库

以下代码用于insert_an_entry,使用sqlite3库将接收到的参数数据插入:

def insert_an_entry(self, component_id: str, r: int, mt: \
    ModelType, model_id: str, gtime: float, local_prfmc: \
    float, num_samples: int):
    conn = sqlite3.connect(self.db_file)
    c = conn.cursor()
    t = datetime.datetime.fromtimestamp(gtime)
    gene_time = t.strftime('%m/%d/%Y %H:%M:%S')
    if mt == ModelType.local:
        c.execute('''INSERT INTO local_models VALUES \
        (?, ?, ?, ?, ?, ?);''', (model_id, gene_time, \
        component_id, r, local_prfmc, num_samples))
    elif mt == ModelType.cluster:
        c.execute('''INSERT INTO cluster_models VALUES \
        (?, ?, ?, ?, ?);''', (model_id, gene_time, \
        component_id, r, num_samples))
    conn.commit()
    conn.close()

此函数接受component_id(代理 ID 或聚合器 ID)、轮数(r)、消息类型(mt)、模型 ID(model_id)、模型生成时间(gtime)、本地模型性能数据(local_prfmc)和要插入的样本数量(num_samples)作为参数,使用 SQLite 库的execute函数插入条目。

如果模型类型是本地,则将模型信息插入到local_models表中。如果模型类型是集群,则将模型信息插入到cluster_models表中。

其他功能,例如更新和删除数据库中的数据,在此示例代码中未实现,您需要自行编写这些附加功能。

在下一节中,我们将解释如何运行数据库服务器。

运行数据库服务器

下面是使用 SQLite 数据库运行数据库服务器的代码:

if __name__ == "__main__":
    pdb = PseudoDB()
    init_db_server(pdb.handler, pdb.db_ip, pdb.db_socket)

PseudoDB类的实例被创建为pdbpdb.handler、数据库的 IP 地址(pdb.db_ip)和数据库套接字(pdb.db_socket)用于启动从init_db_server函数中启用并由communication_handler库在util/lib文件夹中提供的聚合器接收本地和集群模型的过程。

现在,我们了解了如何实现和运行数据库服务器。本章中讨论的数据库表和模式设计得尽可能简单,以便我们理解 FL 服务器流程的基本原理。在下一节中,我们将讨论 FL 服务器的潜在增强功能。

FL 服务器的潜在增强功能

在本章中讨论了 FL 服务器的一些关键潜在增强功能。

重新设计数据库

在本书中,数据库有意设计为包含最少的表信息,需要扩展,例如通过在数据库中添加聚合器本身、代理、初始基础模型和项目信息等表。例如,本章中描述的 FL 系统不支持服务器和代理进程的终止和重启。因此,FL 服务器的实现并不完整,因为它在系统停止或失败时丢失了大部分信息。

自动注册初始模型

为了简化注册初始模型的过程的解释,我们使用模型名称定义了 ML 模型的层。在系统中注册此模型可以自动化,这样只需加载具有.pt/.pth.h5等文件扩展名的特定 ML 模型(如 PyTorch 或 Keras 模型),FL 系统的用户就可以开始这个过程。

本地模型和全局模型性能指标

再次,为了简化对 FL 服务器和数据库端功能的解释,准确度值仅被用作模型性能标准之一。通常,机器学习应用有更多指标需要跟踪作为性能数据,并且它们需要与数据库和通信协议设计一起增强。

微调聚合

为了简化聚合本地模型的过程,我们仅使用了 FedAvg,这是一种加权平均方法。样本数量可以根据本地环境动态变化,这一方面由你增强。还有各种模型聚合方法,这些方法将在本书的第七章“模型聚合”中解释,以便你可以根据要创建和集成到 FL 系统中的 ML 应用选择最佳的聚合方法。

摘要

在本章中,通过实际的代码示例解释了 FL 服务器端实现的基本原理和原则。跟随本章内容后,你现在应该能够使用模型聚合机制构建 FL 服务器端功能。

这里介绍的服务器端组件包括基本的通信、代理和初始模型的注册、用于聚合的状态信息管理,以及创建全局集群模型的聚合机制。此外,我们还讨论了仅存储 ML 模型信息的数据库实现。代码被简化,以便你能够理解服务器端功能的原则。构建更可持续、弹性、可扩展的 FL 系统的许多其他方面的进一步改进取决于你。

在下一章中,我们将讨论实现 FL 客户端和代理功能的原则。客户端需要为机器学习应用提供一些精心设计的 API 以供插件使用。因此,本章将讨论 FL 客户端的核心功能库以及库集成到非常简单的 ML 应用中,以实现整个 FL 过程。

第五章:联邦学习客户端实现

根据系统架构、顺序和流程,如第3 章“联邦学习系统的工作原理”中所述,联邦学习FL)系统的客户端模块可以基于系统架构、顺序和流程实现。FL 客户端功能可以将进行本地训练和测试的分布式机器学习ML)应用程序与聚合器通过客户端库中嵌入的通信模块连接起来。

在使用 FL 客户端库的本地 ML 引擎示例中,将讨论最小引擎包示例,使用虚拟 ML 模型来理解与本章设计的 FL 客户端库集成的过程。通过遵循集成示例代码,您将了解如何实际启用与 FL 客户端相关的整个流程,如第3 章“联邦学习系统的工作原理”中所述,同时将在第6 章“运行联邦学习系统并分析结果”中讨论最小示例将发生什么。

在本章中,将讨论在本地 ML 引擎中使用的 FL 客户端功能的设计和实现原理概述。通过阅读本章,您将能够编写 FL 客户端模块和库以及分布式本地 ML 引擎的代码,例如使用卷积神经网络CNNs)进行图像分类。

在本章中,我们将涵盖以下主题:

  • FL 客户端组件概述

  • 实现 FL 客户端主要功能

  • 设计 FL 客户端库

  • 本地 ML 引擎集成到 FL 系统中

  • 将图像分类集成到 FL 系统中的示例

技术要求

本章中介绍的所有代码文件都可以在 GitHub 上找到(github.com/tie-set/simple-fl)。

重要提示

您可以使用代码文件用于个人或教育目的。请注意,我们不会支持商业部署,并且不对使用代码造成的任何错误、问题或损害负责。

FL 客户端组件概述

在第3 章“联邦学习系统的工作原理”中介绍了 FL 客户端作为代理的架构。在此,我们将介绍实现 FL 客户端基本功能的代码。在此处,软件架构的客户端被简化,仅可以使用此示例中的client.py文件,以及来自lib/util文件夹的支持函数,如图5.1所示:

![图 5.1 – 作为代理的 FL 客户端的 Python 软件组件

![img/B18369_05_01.jpg]

图 5.1 – 作为代理的 FL 客户端的 Python 软件组件

以下部分简要描述了 FL 系统代理的 Python 文件。

分布式代理端代码

对于代理端,fl_main/agent目录中有一个主要文件client.py,它处理大多数 FL 客户端功能。

FL 客户端代码(client.py)

agent文件夹中的client.py文件包含参与 FL 周期、与聚合器进行 ML 模型交换的框架以及推送轮询机制以与聚合器通信的功能。客户端的功能还可以作为本地 ML 应用程序与 FL 系统本身的接口,为 ML 引擎提供 FL 客户端库。这是将本地训练的 ML 模型连接到 FL 服务器和聚合器的主要代码。您需要自己准备一个本地 ML 应用程序,我们将帮助您了解如何使用 FL 客户端库将您的 ML 引擎集成到 FL 系统中,这是本章的另一个主要主题。

lib/util 代码

将对支持 Python 代码(communication_handler.pydata_struc.pyhelpers.pymessengers.pystates.py)作为内部库的解释包含在附录,探索内部库中。

代理配置

以下是我们使用的代码中保存为config_agent.json的客户端配置参数示例:

{
    "aggr_ip": "localhost",
    "reg_socket": "8765",
    "model_path": "./data/agents",
    "local_model_file_name": "lms.binaryfile",
    "global_model_file_name": "gms.binaryfile",
    "state_file_name": "state",
    "init_weights_flag": 1,
    "polling": 1
}

聚合器的 IP(aggr_ip)和端口号(reg_socket)用于连接到 FL 服务器,在那里进行本地模型的聚合。此外,模型路径参数model_path指定了本地模型(命名为local_model_file_name)和全局模型(命名为global_model_file_name)的位置。本地和全局模型存储为二进制文件(本例中为lms.binaryfilegms.binaryfile)。状态文件(命名为state_file_name)记录客户端的本地状态,定义了等待全局模型、训练模型、发送训练好的模型等。init_weights_flag在系统操作员希望使用某些权重初始化全局模型时使用。如果标志为1,代理将发送预配置的模型;否则,模型将在聚合器端填充为零。轮询标志(polling)涉及是否在代理和聚合器之间使用轮询方法进行通信。

现在我们已经讨论了 FL 客户端模块,让我们来看看实际的实现和一些代码,以实现 FL 客户端的功能。

实现 FL 客户端主要功能

在本节中,我们将解释如何实现基本的联邦学习客户端代码,该代码在 agent 目录下的 client.py 文件中有描述。请参阅第三章联邦学习系统的工作原理,以了解联邦学习客户端架构、序列和流程。通过学习这段客户端代码,您将了解如何实现智能体的注册过程、模型交换同步以及推送/轮询机制,以及智能体和聚合器之间的通信协议,以及一些将在其他机器学习应用中作为应用程序编程接口API)调用的函数。

让我们先看看实现联邦学习客户端功能需要导入哪些库。

为智能体导入库

在这个 client.py 文件示例中,智能体导入了通用库,例如 asynciotime(关于这些库的详细解释超出了本书的范围):

import asyncio, time, logging, sys, os
from typing import Dict, Any
from threading import Thread
from fl_main.lib.util.communication_handler import \
     init_client_server, send, receive
from fl_main.lib.util.helpers import read_config, \
     init_loop, save_model_file, load_model_file, \
     read_state, write_state, generate_id, \
     set_config_file, get_ip, compatible_data_dict_read, \
     generate_model_id, create_data_dict_from_models, \
     create_meta_data_dict
from fl_main.lib.util.states import ClientState, \
     AggMsgType, ParticipateConfirmationMSGLocation, \
     GMDistributionMsgLocation, IDPrefix
from fl_main.lib.util.messengers import \
     generate_lmodel_update_message, \
     generate_agent_participation_message, \
     generate_polling_message

至于从 fl_main.lib.util 导入的 communication_handlerhelpersstatesmessengers 库,它们旨在启用联邦学习的一般功能,请参阅附录,探索内部库

在导入必要的库之后,您将定义 Client 类。

定义客户端类

让我们定义实现联邦学习客户端核心功能的 Client 类,包括智能体自身的参与机制、模型交换框架以及智能体和聚合器之间的通信接口,以及为在智能体端本地机器学习引擎中使用提供的库:

class Client:
    """
    Client class instance with FL client-side functions
    and libraries used in the agent's ML engine
    """

然后,您将在 __init__ 函数下初始化 Client 类,如下一节所述。

初始化客户端

以下 __init__ 构造函数中的代码是客户端初始化过程的示例:

def __init__(self):
    self.agent_name = 'default_agent'
    self.id = generate_id()
    self.agent_ip = get_ip()
    self.simulation_flag = False
    if len(sys.argv) > 1:
        self.simulation_flag = bool(int(sys.argv[1]))
    config_file = set_config_file("agent")
    self.config = read_config(config_file)
    self.aggr_ip = self.config['aggr_ip']
    self.reg_socket = self.config['reg_socket']
    self.msend_socket = 0
    self.exch_socket = 0
    if self.simulation_flag:
        self.exch_socket = int(sys.argv[2])
        self.agent_name = sys.argv[3]
    self.model_path = f'{self.config["model_path"]}
                                        /{self.agent_name}'
    if not os.path.exists(self.model_path):
        os.makedirs(self.model_path)
    self.lmfile = self.config['local_model_file_name']
    self.gmfile = self.config['global_model_file_name']
    self.statefile = self.config['state_file_name']
    self.round = 0
    self.init_weights_flag = \
                     bool(self.config['init_weights_flag'])
    self.is_polling = bool(self.config['polling'])

首先,客户端为自己生成一个唯一的 ID 作为标识符,该标识符将在许多场景中用于执行联邦学习。

第二,客户端通过使用 get_ip() 函数获取自己的 IP 地址。

此外,本实现练习支持模拟运行,其中我们可以在一台机器上运行数据库、服务器和多个智能体的所有联邦学习系统组件。如果需要进行模拟,则 simulation_flag 参数需要设置为 True(有关如何设置模拟模式的说明,请参阅 GitHub 上的 README 文件)。

然后,self.cofig 读取并存储 config_agent.json 的信息。

然后,客户端配置聚合器的信息以连接到其服务器,其中 self.aggr_ip 从智能体配置文件中读取聚合器机器或实例的 IP 地址。

之后,将设置 reg_socket 端口,其中 reg_socket 用于智能体的注册,以及存储为 self.aggr_ip 的聚合器 IP 地址。在这个示例中,reg_socket 的值也可以从智能体配置文件中读取。

用于在模型交换例程中发送本地机器学习模型的 msend_socket,将在代理通过向联邦学习服务器发送消息并接收响应后参与联邦学习过程时进行配置。

当通信不是在 轮询 模式下时,exch_socket 用于接收来自聚合器的全局模型,同时与存储为 self.agent_ip 的代理 IP 地址一起使用。

在本例中,exch_socket 可以根据模拟模式从命令行参数读取或由聚合器决定。

在本例中,当聚合器被设置为能够向连接的代理推送消息时,这在轮询模式下是不成立的,exch_socket 可以由聚合器动态配置。

self.model_path 存储本地和全局模型的路径,可以根据模拟模式从代理配置文件或命令行参数中读取。如果没有目录来保存这些模型文件,它将确保创建该目录。

self.lmfile, self.gmfile, 和 self.statefile 分别是本地模型、全局模型和客户端状态的文件名,它们从代理的配置文件中读取。特别是,在 self.statefile 中,保存了 ClientState 的值。ClientState 是客户端自身的枚举值,其中有一个等待全局模型的状态(waiting_gm),一个用于本地训练的状态(training),一个用于发送本地模型的状态(sending),以及一个拥有更新后的全局模型的状态(gm_ready)。

联邦学习过程的轮次信息,定义为 self.round,初始化为 0,并在模型聚合过程中随着联邦学习轮次的进行而更新,通常聚合器会通知轮次的变化。

self.init_weights_flag 是当系统操作员想要使用某些参数初始化全局模型时使用的标志,如代理配置中所述。

self.is_polling 标志涉及是否在代理和聚合器之间的通信中使用轮询方法。轮询标志必须与聚合器端设置的标志相同。

这里讨论的关于 __init__ 构造函数的代码可以在 GitHub 上的 fl_main/agent 文件夹中的 client.py 文件中找到 (github.com/tie-set/simple-fl)。

既然我们已经讨论了如何初始化客户端模块,在下一节中,我们将探讨参与机制是如何与一些示例代码一起工作的。

代理参与联邦学习周期

这个参与或注册过程是代理能够与其他代理一起参与联邦学习过程所必需的。因此,代理需要被添加到可以发送本地训练的机器学习模型到聚合器的授权代理列表中。

异步的 participate 函数向聚合器发送第一条消息以加入 FL 循环,并将接收状态和通信信息,例如来自聚合器的套接字编号。

代理通过 config_agent.json 文件知道加入 FL 平台的 IP 地址和端口号。当加入 FL 平台时,代理发送包含以下信息的参与消息:

  • agent_name:代理自身的唯一名称。

  • id:代理自身的唯一标识符。

  • model_id:要发送给聚合器的模型的唯一标识符。

  • models:按模型名称键控的模型字典。如果 init_flagFalse,则模型权重不需要训练,因为它仅由聚合器用于记住模型的形状。

  • init_weights_flag:一个布尔标志,表示发送的模型权重是否应作为基础模型使用。如果为 True 且没有准备好的全局模型,聚合器将此组本地模型作为第一个全局模型,并发送给所有代理。

  • simulation_flag:如果是模拟运行,则为 True;否则为 False

  • exch_socket:等待从聚合器接收全局模型的端口号。

  • gene_time:模型生成的时间。

  • performance_dict:以字典格式存储与模型相关的性能数据。

  • agent_ip:代理自身的 IP 地址。

定义了所有上述参与消息后,代理准备好与聚合器交换模型,实现参与过程的代码如下:

async def participate(self):
    data_dict, performance_dict = \
       load_model_file(self.model_path, self.lmfile)
    _, gene_time, models, model_id = \
       compatible_data_dict_read(data_dict)
    msg = generate_agent_participation_message(
         self.agent_name, self.id, model_id, models,
         self.init_weights_flag, self.simulation_flag,
         self.exch_socket, gene_time, performance_dict,
         self.agent_ip)
    resp = await send(msg, self.aggr_ip, self.reg_socket)
    self.round = resp[ \
       int(ParticipateConfirmaMSGLocation.round)]
    self.exch_socket = resp[ \
       int(ParticipateConfirmationMSGLocation.exch_socket)]
    self.msend_socket = resp[ \
       int(ParticipateConfirmationMSGLocation.recv_socket)]
    self.id = resp[ \
       int(ParticipateConfirmationMSGLocation.agent_id)]
    self.save_model_from_message(resp, \
        ParticipateConfirmationMSGLocation)

代理读取本地模型以告知聚合器 ML 模型的结构,初始模型不一定需要训练。data_dictperformance_dict 分别存储模型及其性能数据。

然后,使用 generate_agent_participation_message 函数包装包含如 ML models 和其 model_id 等信息的消息 msg

在发送消息时,在这个例子中,使用聚合器的 IP 地址 (aggr_ip) 和注册端口号 (reg_socket) 构建 WebSocket 以连接到聚合器。

通过从 communication_handler 导入的异步 send 函数向聚合器发送消息后,代理从聚合器接收响应消息 resp。响应将包括轮次信息、接收全局模型 exch_socket 的端口号、将本地模型发送到聚合器 msend_socket 的端口号以及更新的代理 ID。

最后,通过调用 save_model_from_message 函数将响应消息中的全局模型保存在本地。

代理的参与机制已经解释。在下一节中,我们将学习模型交换同步的框架。

模型交换同步

模型交换同步,如下面的代码所示,是为了检查代理的状态并根据状态调用适当的函数:

Async def model_exchange_routine(self):
    while True:
        await asyncio.sleep(5)
        state = read_state(self.model_path, self.statefile)
        if state == ClientState.sending:
            await self.send_models()
        elif state == ClientState.waiting_gm:
            if self.is_polling == True:
               await self.process_polling()
            else: pass
        elif state == ClientState.training: pass
        elif state == ClientState.gm_ready: pass
        else: pass

基本上,当客户端存活时,这个过程始终在运行,而while循环则定期用来检查客户端的状态,并在必要时进行下一步操作。

while循环中,等待几秒钟后,它首先通过read_state函数检查客户端状态。read_state函数中的参数是用来定位存储在本地环境中的状态文件。

正如所述,ClientState具有客户端状态的枚举值,定义了发送本地模型的状态(发送)、等待全局模型的状态(等待 _sgm)、本地训练的状态(训练)和接收更新后的全局模型的状态(gm_ready)。

如果客户端处于发送状态(state == ClientState.sending),这意味着它已准备好将本地训练的模型发送到聚合器。因此,代理调用send_models函数将本地训练的机器学习模型发送到聚合器。

当状态是等待全局模型state == ClientState.waiting_gm)时,如果开启轮询模式,则通过process_polling从代理向聚合器进行轮询;如果轮询模式关闭,则什么都不做。

如果客户端处于训练状态(state == ClientState.training),这意味着客户端现在正在训练本地模型,只需等待几秒钟,如果需要的话打印训练状态。也可以根据需要添加任何程序。

如果客户端处于gm_ready状态(state == ClientState.gm_ready),这意味着客户端已收到全局模型。此状态将由本地机器学习应用程序处理,它除了显示全局模型的就绪状态外,不做任何事情。

在下一节中,我们将讨论如何实现联邦学习周期中的推送轮询机制。

推送和轮询实现

一旦代理初始化并确认参与联邦学习过程,它就开始等待从聚合器发送的全局模型。从聚合器接收全局模型有两种方式:推送方法和轮询方法。尽管为了简化,这里没有在联邦学习客户端代码中实现安全套接字层SSL)或传输层安全性TSL)框架,但建议支持它们以确保持续的通信安全。

让我们来看看每个通信框架的机制。

从聚合器到代理的推送方法

使用推送方法,聚合器将在全局模型生成后立即将包含全局模型的消息推送到所有连接的代理。

以下代码展示了从聚合器接受和保存全局模型的推送机制:

async def wait_models(self, websocket, path):
    gm_msg = await receive(websocket)
    self.save_model_from_message( \
        gm_msg, GMDistributionMsgLocation)

wait_models异步函数接受websocket作为参数。当聚合器向代理发送消息时,它通过await recieve(websocket)接收gm_msg消息,并通过调用在设计 FL 客户端库部分定义的save_model_from_message函数,将全局模型本地保存。

代理到聚合器的轮询方法

使用polling方法,代理将持续询问(轮询)聚合器,以查看全局模型是否已经形成。一旦创建并准备好发送给连接的代理,轮询的消息将返回给代理,并在响应中包含更新的全局模型。

以下关于process_polling异步函数的代码说明了polling方法:

async def process_polling(self):
    msg = generate_polling_message(self.round, self.id)
    resp = await send(msg, self.aggr_ip, self.msend_socket)
    if resp[int(PollingMSGLocation.msg_type)] \
                                      == AggMsgType.update:
        self.save_model_from_message(resp, \
            GMDistributionMsgLocation)
    else: pass

它首先使用generate_polling_message函数生成要发送给聚合器的轮询消息。在收到聚合器发送的响应消息resp后,如果消息类型是AggMsgType.update,意味着响应消息包含更新的全局模型,它将调用save_model_from_message函数。否则,它不执行任何操作。

上述函数是 FL 客户端的基本但核心功能,这些函数需要作为库被用户端的 ML 应用程序高效地使用。

现在 FL 客户端设计,包括初始化、参与和模型交换,已经解释过了,我们将学习如何设计 FL 客户端库。

设计 FL 客户端库

在本节中,我们将解释如何封装基本函数作为库提供给用户。在本例中,将它们作为库封装的最简单方法将被讨论。这需要根据你的需求和自己的 FL 客户端框架设计进行扩展。通过将 FL 客户端模块作为库封装,开发者将能够轻松地将 FL 客户端的功能集成到本地 ML 引擎中。

让我们从如何定义一个库来启动和注册 FL 客户端开始。

启动 FL 客户端核心线程

为了使本地 ML 应用开发者能够集成 FL 客户端相关的函数,有时需要将它们封装为线程函数。

以下注册 FL 系统中代理的代码简单地将一个participate函数放入asyncio.get_event_loop函数的run_until_complete函数中:

def register_client(self):
    asyncio.get_event_loop().run_until_complete( \
        self.participate())

此外,start_wait_model_server函数被封装,如下面的代码块所示,其中Thread函数负责持续运行。这样,你将能够在并行运行本地 ML 模块的同时,在 FL 系统处于推送通信模式时,在wait_models线程中接收全局模型:

def start_wait_model_server(self):
    th = Thread(target = init_client_server, \
        args=[self.wait_models, self.agent_ip, \
        self.exch_socket])
    th.start()

同样,start_model_exhange_server 函数可以是一个线程,用于运行模型交换例程以同步本地和全局模型,同时本地机器学习模块并行运行。您只需调用以下 start_model_exchange_server 函数作为库来启用此功能:

def start_model_exchange_server(self):
    self.agent_running = True
    th = Thread(target = init_loop, \
        args=[self.model_exchange_routine()])
    th.start()

最后,当它们在 Client 类外部被调用时,同时执行所有这三个函数可能是有帮助的。因此,我们引入了以下关于 start_fl_client 的代码,该代码聚合了注册代理、等待全局模型和模型交换例程以启动 FL 客户端核心功能的功能:

def start_fl_client(self):
    self.register_client()
    if self.is_polling == False:
        self.start_wait_model_server()
    self.start_model_exchange_server()

FL 客户端的初始化现在被封装到 start_fl_client 中。接下来,我们将定义保存的机器学习模型库。

保存全局模型

虽然 loadsave 模型函数由 lib/util 中的辅助函数提供,这些将在后面的 附录探索内部库 中解释,但为机器学习开发者提供一个从聚合器发送的消息中保存全局模型的接口是有帮助的。

以下 save_model_from_message 函数是一个从代理中提取并保存全局模型,并更改客户端状态为 gm_ready 的函数。此函数将消息(msg)和消息位置(MSG_LOC)信息作为参数:

def save_model_from_message(self, msg, MSG_LOC):
    data_dict = create_data_dict_from_models( \
        msg[int(MSG_LOC.model_id)],
        msg[int(MSG_LOC.global_models)],
        msg[int(MSG_LOC.aggregator_id)])
    self.round = msg[int(MSG_LOC.round)]
    save_model_file(data_dict, self.model_path, \
        self.gmfile)
    self.tran_state(ClientState.gm_ready)

使用 create_data_dict_from_models 库从消息中提取全局模型、模型 ID 和聚合器 ID,并将它们放入字典中。根据接收到的消息,也更新了轮次信息。

然后,使用 save_model_file 库将接收到的全局模型保存到本地文件中,其中指定了数据字典、模型路径和全局模型文件名以保存模型。

接收到全局模型后,它通过调用 tran_state 函数将客户端状态更改为 gm_ready,该状态表示全局模型已准备好由本地机器学习使用,tran_state 函数将在下一节中解释。

定义了保存全局模型的函数后,我们就可以继续下一节,了解如何操作客户端状态。

操作客户端状态

为了操作客户端状态以便它可以逻辑地处理本地和全局模型,我们准备了 read_statetran_state 函数,这些函数可以从代码内部和外部访问。

以下 read_state 函数读取存储在 statefile 中、由 model_path 指定位置的值。使用 ClientState 的枚举值来更改客户端状态:

def read_state(self) -> ClientState:
    return read_state(self.model_path, self.statefile)

以下 tran_state 函数更改代理的状态。在这个代码示例中,状态仅在本地 state 文件中维护:

def tran_state(self, state: ClientState):
    write_state(self.model_path, self.statefile, state)

接下来,让我们定义可以将本地模型发送到聚合器的函数。

将本地模型发送到聚合器

以下异步的 send_models 函数是关于将本地保存的模型发送到聚合器的:

async def send_models(self):
    data_dict, performance_dict = \
        load_model_file(self.model_path, self.lmfile)
    , _, models, model_id = \
        compatible_data_dict_read(data_dict)
    msg = generate_lmodel_update_message( \
        self.id, model_id, models, performance_dict)
    await send(msg, self.aggr_ip, self.msend_socket)
    self.tran_state(ClientState.waiting_gm)

它首先使用load_model_file辅助函数提取data_dictperformance_dict,然后根据compatible_data_dict_read函数从data_dict中提取模型及其 ID。然后,使用generate_lmodel_update_message库包装消息,并通过communication_handlersend函数发送到聚合器。之后,通过tran_state函数将客户端状态更改为waiting_gm。再次强调,可以添加 SSL/TSL 框架来确保通信安全,但在此处未实现,以保持联邦学习客户端编码的简单性。

当你想将初始基础模型发送到模型架构的聚合器以进行注册目的时,会调用以下send_initial_model函数。它接受初始模型、样本数量和性能值作为输入,并调用将在本节后面解释的setup_sending_model

def send_initial_model(self, initial_models, \
                             num_samples=1, perf_val=0.0):
    self.setup_sending_models( \
        initial_models, num_samples, perf_val)

当你在联邦学习周期内想要向聚合器发送训练好的本地模型时,会调用以下send_trained_model函数。它接受训练好的模型、样本数量和性能值作为输入,并且只有在客户端状态不是gm_ready时才调用setup_sending_model

def send_trained_model(self, models, \
                             num_samples, perf_value):
    state = self.read_state()
    if state == ClientState.gm_ready:
        pass
    else:
        self.setup_sending_models( \
            models, num_samples, perf_value)

以下setup_sending_models函数被设计为内部库,用于设置将本地训练的模型发送到聚合器的过程。它接受模型的参数作为np.array,样本数量作为整数,以及性能数据作为浮点值:

def setup_sending_models(self, models, \
                               num_samples, perf_val):
    model_id = generate_model_id( \
                   IDPrefix.agent, self.id, time.time())
    data_dict = create_data_dict_from_models( \
                    model_id, models, self.id)
    meta_data_dict = create_meta_data_dict( \
                         perf_val, num_samples)
    save_model_file(data_dict, self.model_path, \
        self.lmfile, meta_data_dict)
    self.tran_state(ClientState.sending)

基本上,这个函数使用generate_model_id辅助函数创建一个唯一的模型 ID,使用create_data_dict_from_models辅助函数创建的data_dict来存储本地机器学习模型数据,以及使用create_meta_data_dict辅助函数创建的meta_data_dict来存储性能数据。然后,所有与模型和性能相关的上述数据都使用save_model_file函数保存在由self.model_path指定的位置。接着,它将客户端状态更改为sending,以便mode_exchange_routine函数可以注意客户端状态的改变,并开始向聚合器发送训练好的本地模型。

既然我们已经了解了将机器学习模型发送到聚合器的库,那么让我们来了解一个在代理端等待全局模型的重要函数。

等待聚合器提供的全局模型

以下wait_for_global_model函数对于持续进行联邦学习周期非常重要:

def wait_for_global_model(self):
    while (self.read_state() != ClientState.gm_ready):
        time.sleep(5)
    data_dict, _ = load_model_file( \
                       self.model_path, self.gmfile)
    global_models = data_dict['models']
    self.tran_state(ClientState.training)
    return global_models

原则上,该函数会等待客户端状态变为gm_ready。当代理端接收到全局模型时,客户端状态会变为gm_ready。一旦客户端状态变为gm_ready,它将继续从data_dict中加载全局模型,使用load_model_file函数提取,将客户端状态更改为training,并将全局模型返回到本地机器学习模块。

我们已经讨论了如何设计 FL 客户端函数的库。在下一节中,我们将讨论如何将这些库集成到本地机器学习过程中。

本地机器学习引擎集成到 FL 系统中

将 FL 客户端库成功集成到本地机器学习引擎中,对于后续在分布式环境中进行联邦学习至关重要。

图 5.2所示,GitHub 仓库github.com/tie-set/simple-flexamples/minimal目录下的minimal_MLEngine.py文件提供了一个将 FL 客户端库集成到最小化 ML 引擎包的示例:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_05_02.jpg

图 5.2 – 最小化机器学习引擎包

在下一节中,我们将解释需要将哪些库导入到本地机器学习引擎中。

导入本地机器学习引擎的库

以下代码显示了导入过程,其中首先导入了通用库,如numpytimeDict。这个过程的关键部分是,从fl_main.agent文件夹中的client.py文件导入Client。这样,开发者不需要了解 FL 系统内部太多的代码,只需调用作为库定义的重要功能,正如在设计 FL 客户端库的方向一节中讨论的那样。

本书将不会介绍pip安装打包的内容,但可以使用私有或公共 PyPI 服务器托管客户端代码:

import numpy as np
import time, logging, sys
from typing import Dict
from fl_main.agent.client import Client

在导入必要的库之后,让我们看看为本地训练和测试定义的函数。

定义机器学习模型、训练和测试函数

您首先定义要集成到 FL 系统中的模型、训练和测试函数。在本代码示例中,我们将使用虚拟模型和训练/测试函数,使用户能够理解最小化 FL 流程,而无需被特定的机器学习复杂性所困扰。

被称为init_models的以下函数返回模型的模板(以字典格式),以告知 ML 模型结构。模型不一定需要训练。在这种情况下,模型由model1model2定义了两个层,每个层分配了一些随机的 NumPy 数组,如下所示:

def init_models() -> Dict[str,np.array]:
    models = dict()
    models['model1'] = np.array([[1, 2, 3], [4, 5, 6]])
    models['model2'] = np.array([[1, 2], [3, 4]])
    return models

在初始化模型后,您将设计以下training函数,它可以作为每个 ML 应用的占位符函数:

def training(models: Dict[str,np.array],
           init_flag: bool = False) -> Dict[str,np.array]:
    # return templates of models to tell the structure
    # This model is not necessarily actually trained
    if init_flag:
        return init_models()
    # ML Training. In this example, no actual training.
    models = dict()
    models['model1'] = np.array([[1, 2, 3], [4, 5, 6]])
    models['model2'] = np.array([[1, 2], [3, 4]])
    return models

此函数的逻辑顺序应该是接收模型作为输入,训练它们,并返回训练后的本地模型。作为输入参数,它接受具有Dict[str,np.array]格式的模型和init_flag布尔值,指示是否是初始化步骤。

当您想要调用并返回预定义的init_models时,init_flagTrue,如果是实际的训练步骤,则为False

最终,此函数返回分解为 NumPy 数组的训练模型,在这个例子中是一个 Dict[str,np.array] 字典。

在这个虚拟例子中,我们只是提供了虚拟模型,跳过了实际的训练过程。

然后,以下 compute_performance 函数被设计用来计算给定一组模型和测试数据集的模型性能:

def compute_performance(models: Dict[str,np.array], \
                                      testdata) -> float:
    # replace with actual performance computation logic
    accuracy = 0.5
    return

再次强调,在这个例子中,只提供了一个虚拟准确度值 0.5,以保持事情简单。

然后,你可能想要定义以下 judge_termination 函数来决定结束训练过程并退出联邦学习过程的准则:

def judge_termination(training_count: int = 0,
              global_arrival_count: int = 0) -> bool:
    # Depending on termination criteria, change the return bool value
    # Call a performance tracker to check if the current models satisfy the required performance
    return True

如何设计这个终止条件取决于你。这个函数接受诸如完成训练过程的数量(training_count)、接收全局模型的次数(global_arrival_count)等参数,返回一个布尔值,其中标志为 True 表示继续联邦学习过程,False 表示停止。在这里,它只提供了一个 True 布尔值,意味着除非代理被强制在函数外部停止,否则联邦学习过程不会停止。

如果需要准备测试数据,你可以定义一个如 prep_test_data 的函数:

def prep_test_data():
    testdata = 0
    return

在这个例子中,它被设置为 0

现在测试和训练所需的函数已经定义,我们将客户端库集成到本地机器学习(ML)引擎中,以运行与联邦服务器端组件(如聚合器和数据库)一起工作的联邦学习代理。

将客户端库集成到您的本地 ML 引擎中

现在,一切准备就绪,可以开始你的第一个联邦学习(FL)过程,尽管模型、训练和测试函数都使用虚拟变量设置。

首要任务是创建一个 Client 实例,如下所示,以便你可以调用其库:

# Step1: Create Client instance
cl = Client()

第二,你使用训练函数创建 initial_models,如下所示:

# Step2: Create template models (to tell the shapes)
initial_models = training(dict(), init_flag=True)

之后,通过调用 cl.send_initial_model 并将 initial_models 作为参数,它将初始模型发送到联邦聚合器:

# Step3: Send initial models
cl.send_initial_model(initial_model)

然后,让我们通过调用 cl.start_fl_client() 来启动客户端的联邦学习过程。正如在 启动联邦客户端核心线程 部分中解释的那样,此函数可以同时启动三个过程:注册代理、等待全局模型以及模型交换例程:

# Step4: Start the FL client
cl.start_fl_client()

然后,我们通过有效地集成几个联邦客户端库,设计客户端的本地训练/测试和发送/接收模型的联邦学习周期:

# Step5: Run the local FL loop
training_count, gm_arrival_count = 0, 0
while judge_termination(training_count, gm_arrival_count):
    global_models = cl.wait_for_global_model()
    gm_arrival_count += 1
    global_model_performance_data = \
       compute_performance(global_models, prep_test_data())
    models = training(global_models)
    training_count += 1
    perf_value = compute_performance( \
                     models, prep_test_data())
    cl.send_trained_model(models, 1, perf_value)

我们使用 while 循环和 judge_termination 函数来检查系统是否需要退出循环。是否使用 training_countgm_arrival_count 来判断联邦学习周期的终止取决于你。

然后,代理通过 cl.wait_for_global_model() 继续等待全局模型。当从聚合器接收到全局模型后,它提取 global_models,增加 gm_arrival_count,并在 wait_for_global_model 函数中将客户端状态设置为 training 状态。

接下来,使用 compute_performance 函数计算 global_model_performance_data,输入为 global_modelsprep_test_data

当在 training 状态下执行 training(global_models) 时,客户端可能会从聚合器接收新的全局模型。这种情况发生在客户端的本地训练速度过慢,聚合器决定利用其他本地模型来创建一组新的全局模型。如果新的全局模型已经到达代理,客户端的状态将变为 gm_ready,并且当前正在训练的 ML 模型将被丢弃。

在本地训练阶段完成,并生成 models 后,代理通过 compute_performance 函数增加 training_count 并计算当前 ML 模型的性能数据 perf_value

然后,代理尝试通过 cl.send_trained_model 将训练好的本地模型上传到聚合器,并将训练好的模型和之前计算的性能值作为参数。

send_trained_model 函数中,客户端状态被设置为 sending。一旦客户端的 model_exchange_routine 观察到状态转换为 sending 状态,它将训练好的本地模型(以二进制文件的形式存储)发送给聚合器。发送模型后,它回到 send_models 函数中的 waiting_gm 状态。

在发送本地模型后,聚合器将其存储在缓冲区中,并等待下一轮全局模型聚合,直到有足够的本地模型由代理上传。

在下一节中,我们将简要介绍如何将图像分类机器学习集成到我们讨论过的联邦学习系统中。

将图像分类集成到联邦系统中的示例

我们通过一个最小示例学习了如何启动一个联邦学习过程。在本节中,我们将给出使用 CNN 进行图像分类(IC)的联邦学习的一个简要示例。

首先,包含图像分类示例代码的包位于 GitHub 仓库 github.com/tie-set/simple-fl 中的 examples/image_classification/ 文件夹,如图 5.3 所示:

https://github.com/OpenDocCN/freelearn-ml-zh/raw/master/docs/fdr-lrn-py/img/B18369_05_03.jpg

图 5.3 – 图像分类包

负责将 IC 算法集成到 FL 系统中的主要代码位于 classification_engine.py 文件中。

当导入库时,我们使用一些额外的文件,包括与 IC 算法相关的 CNN 模型、转换函数和数据管理器。详细信息请参阅 GitHub 代码 github.com/tie-set/simple-fl

接下来,让我们导入一些标准的 ML 库以及我们讨论过的 FL 代码中的客户端库:

import logging
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from typing import Dict
from .cnn import Net
from .conversion import Converter
from .ic_training import DataManger, execute_ic_training
from fl_main.agent.client import Client

在这种情况下,我们定义 TrainingMetaData,它只提供了将要发送给聚合器并在执行 FedAvg 算法时使用的数据量。聚合算法在 第四章 使用 Python 的联邦学习服务器实现 以及在 第七章 模型聚合 中进行了讨论:

class TrainingMetaData:
    # The number of training data used for each round
    # This will be used for the weighted averaging
    # Set to a natural number > 0
    num_training_data = 8000

init_models 函数的内容现在被替换为一个转换为 NumPy 数组的 CNN。它以字典格式返回 CNN 的模板以告知结构:

def init_models() -> Dict[str,np.array]:
    net = Net()
    return Converter.cvtr().convert_nn_to_dict_nparray(net)

训练函数 training 现在填充了使用 CIFAR-10 数据集的实际训练算法。它以模型和 init_flag 作为参数,并将训练后的模型作为 Dict[str,np.array] 返回。init_flag 是一个 bool 值,如果是初始步骤则为 True,如果是实际训练步骤则为 False。在准备训练数据时,由于批量大小,我们使用一定的阈值进行训练。在这种情况下,阈值是 4

然后,我们使用 net = Converter.cvtr().convert_dict_nparray_to_nn(models) 创建一个基于 CNN 的聚类全局模型。

我们定义损失函数和优化器如下:

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

然后,实际的训练将通过 trained_net = execute_ic_training(DataManger.dm(), net, criterion, optimizer) 进行,其中 IC 训练的实际代码可以在 ic_training.py 文件中找到。

训练完成后,将返回转换后的模型。

算法总结如下:

def training(models: Dict[str,np.array], \
             init_flag: bool=False) -> Dict[str,np.array]:
    if init_flag:
        DataManger.dm( \
            int(TrainingMetaData.num_training_data / 4))
        return init_models()
    net = \
        Converter.cvtr().convert_dict_nparray_to_nn(models)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(net.parameters(), \
                          lr=0.001, momentum=0.9)
    trained_net = execute_ic_training(DataManger.dm(), \
                              net, criterion, optimizer)
    models = Converter.cvtr(). \
                 convert_nn_to_dict_nparray(trained_net)
    return models

下面的 compute_performance 函数填充了一个用于计算准确率的算法,这个算法足够简单——只需将正确结果的数量除以总标签的数量。给定一组模型和测试数据集,它使用 modelstestdata 作为参数计算模型的性能:

def compute_performance(models: Dict[str,np.array], \
                       testdata, is_local: bool) -> float:
    # Convert np arrays to a CNN
    net = \
       Converter.cvtr().convert_dict_nparray_to_nn(models)
    correct, total = 0, 0
    with torch.no_grad():
        for data in DataManger.dm().testloader:
            images, labels = data
            _, predicted = torch.max(net(images).data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    acc = float(correct) / total
    return acc

judge_terminationprep_test_data 函数与最小示例中的函数相同。

客户端库集成到 IC 示例中

现在,一切准备就绪,可以开始 IC 算法了,整合前面函数的所有代码与之前 将客户端库集成到您的本地 ML 引擎 部分中使用的代码相同。请查看 classification_engine.py 文件,确保代码相同,除了显示我们实际发送的数据样本数量的部分。这样,通过仅重写前面的函数,您就可以轻松地将自己的本地 ML 应用程序连接到我们在这里讨论的 FL 系统。请参阅 第六章 运行联邦学习系统并分析结果 中的 运行图像分类及其分析 部分,以检查本节讨论的代码的运行结果。

摘要

在本章中,我们讨论了联邦学习(FL)的客户端实现。在参与 FL 过程中有三个基本但重要的功能:通过推送或轮询机制接收来自聚合器的全局模型,以及在本地训练过程结束后向聚合器发送本地模型。为了有效地实现与 FL 聚合器合作的客户端机器学习引擎,理解客户端状态非常重要。客户端状态包括等待全局模型的状态、表示本地训练正在进行的状态、显示准备发送本地模型的状态,以及拥有更新后的全局模型的状态。

我们还讨论了 FL 客户端库的设计理念,其中核心功能需要有效地打包,以提供对机器学习开发人员和工程师友好的接口。

最后但同样重要的是,我们学习了如何实际使用 FL 客户端库将本地机器学习引擎集成到 FL 系统中,其中我们使用了最小示例和 IC 示例来理解集成过程本身。

在下一章中,我们将实际运行本章和前几章中介绍过的代码,以便我们可以深入了解模型的情况,这些模型通过最小示例以及 IC 示例进行聚合。

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐