Потоковая обработка данных с использованием Kafka и Apache Spark Stereaming
2. Создание входных потоков Kafka на Python с использованием Twitter API и VK API.
Что такое Spark Stereaming.
В этой статье мы попытаемся объединить все полученные знания в единую систему. Spark Stereaming — компонент Spark для обработки потоковых данных практически в реальном масштабе времени.
Spark Streaming предоставляет набор программных интерфейсов для потоковой обработки, что позволяет создавать потоковые задания так же, как пакетные задания. Он поддерживает Java, Scala и Python. Spark Streaming автоматически восстанавливает потерянные данные и состояние исполнительного модуля без какого-либо дополнительного кода со стороны разработчика. Spark Streaming позволяет повторно использовать один и тот же код для пакетной обработки, объединять потоки с данными или выполнять специальные запросы по состоянию потока.
Spark Streaming может считывать данные из HDFS , Flume , Kafka , Twitter и ZeroMQ. Можно определить свои собственные источники данных. Spark Streaming может запускаться в режиме автономного кластера Spark или в других поддерживаемых диспетчерах ресурсов кластера. Он также включает режим локального запуска для разработки. В производственной среде Spark Streaming использует ZooKeeper и HDFS для обеспечения высокой доступности.
Создание входных потоков Kafka на Python с использованием Twitter API и VK API.
Для создания входных потоков будем использовать модный нынче язык программирования Python. Получать данные будем через API социальных сетей, хотя их возможности в буржуйских сетях на данный момент сильно урезаются, а в Twitter для новых аккаунтов просто недоступны. Альтернативой может быть парсинг веб-страниц, но это требует большего количества времени на разработку, и не об этом сейчас речь.
Начнем с Twitter.
Первый этап взаимодействия с любыми API социальных сетей — аутентификация. Для этого нам нужно зарегистрировать в данной сети свое приложение, и получить для него ключи безопасности и токены. В Twitter их аж 4 штуки: consumer_key, consumer_secret, access_token, access_token_secret. Данные ключи доступны по следующей ссылке в разделе Keys and tokens.
Теперь про Python, для взаимодействия с Twitter API я использовал библиотеку tweepy.
Код аутентификации выглядит следующим образом:
from tweepy import OAuthHandler
from tweepy import API
auth = OAuthHandler(consumer_key, consumer_secret)
Все, мы авторизовались, теперь можем отправлять запросы к социальной сети. Параметры запроса:
keywords_to_track = [u’Russia’] — отбирать твиты со словом Russia в кодировке юникод.
myStream.filter(track = keywords_to_track, languages=[‘en’])— язык английский.
В потоке (класс MyStreamListener) определяем брокер kafka, у меня он bootstrap_servers=[‘172.20.161.141:9094’]. master.mshome.net как на скриншоте, более удобно.
Далее указываем тему my_topic = ‘Twitter’, парсим полученный json и отправляем в Kafka.
Скриншот скрипта представлен ниже.
Скрипт коротенький, поэтому еще что-то пояснять не вижу смысла.
Теперь поработаем с VK.
Комментарии
Комментировать могуть только зарегистрированные пользователи
Потоковая обработка с помощью полностью управляемых подсистем обработки данных с открытым кодом
В этой статье представлен пример решения потоковой передачи, использующего полностью управляемые службы данных Azure.
Архитектура
Скачайте файл Visio этой архитектуры.
Рабочий процесс
- Компонент Центров событий для Apache Kafka передает события от производителей Kafka.
- Apache Spark потребляет события. AKS предоставляет управляемую среду для заданий Apache Spark.
- Приложение, использующее Azure Cosmos DB для Apache Cassandra, записывает события в Cassandra. Эта база данных служит в качестве платформы хранения для событий. AKS размещает микрослужбы, которые выполняют запись в Cassandra.
- Компонент канала изменений Azure Cosmos DB обрабатывает события в реальном времени.
- Запланированные приложения выполняют пакетную обработку для событий в Cassandra.
- Хранилища эталонных данных обогащают сведения о событиях. Пакетные приложения записывают обогащенные сведения о событиях в PostgreSQL. К типичным хранилищам эталонных данных относятся следующие:
- Azure Data Lake Storage, которое может хранить данные в открытых форматах, например Parquet.
- Реляционные хранилища данных с открытым кодом, например PostgreSQL и MySQL.
- Пакетные приложения обрабатывают данные Cassandra. Такое приложение сохраняет обработанные данные в Базе данных Azure для PostgreSQL. Это реляционное хранилище данных предоставляет данные целевым приложениям, нуждающимся в обогащенной информации.
- Приложения и средства для создания отчетов анализируют данные в базе данных PostgreSQL. Например, Power BI подключается к базе данных с помощью соединителя Базы данных Azure для PostgreSQL. Эта служба отчетов затем отображает подробные визуальные представления данных.
- Кэш Azure для Redis предоставляет кэш в памяти. В этом решении кэш содержит данные о критических событиях. Приложение сохраняет данные в кэше и извлекает их из кэша.
- Веб-сайты и другие приложения используют кэшированные данные для улучшения времени отклика. Иногда данные недоступны в кэше. В таких случаях эти приложения используют шаблон программирования отдельно от кэша или подобную стратегию для извлечения данных из Cassandra в Azure Cosmos DB.
Компоненты
- Центры событий — это полностью управляемая платформа потоковой передачи, которая может обрабатывать миллионы событий в секунду. Центры событий предоставляют конечную точку для Apache Kafka, популярной платформы обработки данных потоковой передачи с открытым кодом. Если организации используют компонент конечной точки, им не нужно создавать и обслуживать кластеры Kafka для обработки данных потоковой передачи. Вместо этого они могут получить все преимущества полностью управляемой реализации Kafka, предлагаемых Центрами событий.
- Azure Cosmos DB — это полностью управляемая база данных NoSQL и реляционная база данных, которая предлагает репликацию с несколькими master. Azure Cosmos DB поддерживает API с открытым кодом для множества баз данных, языков и платформ. Примеры:
- Apache Cassandra.
- Gremlin.
- MongoDB.
С помощью Azure Cosmos DB для Apache Cassandra можно получить доступ к данным Azure Cosmos DB с помощью инструментов, языков и драйверов Apache Cassandra. Apache Cassandra — это база данных NoSQL с открытым кодом, которая хорошо подходит для рабочих нагрузок с высокой интенсивностью записи.
Альтернативные варианты
Продукты и службы с поддержкой открытого кода в этом решении можно заменить на другие. Дополнительные сведения о службах с открытым кодом, доступных в Azure, см. в статье Открытый код в Azure.
Сведения о сценарии
Полностью управляемые службы данных Azure, которые выполняют подсистемы с открытым исходным кодом, составляют это решение потоковой передачи:
- Центры событий Azure предлагают реализацию Kafka для приема потоковой передачи.
- Azure Cosmos DB поддерживает хранение событий в Cassandra.
- Служба Azure Kubernetes (AKS) размещает микрослужбы Kubernetes для обработки потоковой передачи.
- База данных Azure для PostgreSQL управляет реляционным хранилищем данных в PostgreSQL.
- Кэш Azure для Redis управляет хранилищами данных Redis в памяти.
Технологии с открытым кодом предоставляют множество преимуществ. Например, организации могут использовать технологии с открытым для следующего:
- Перенос существующих рабочих нагрузок.
- Поддержка широкого сообщества, работающего с открытым кодом.
- Устранение зависимости от одного поставщика.
Предоставляя технологии с открытым кодом, средства и службы Azure помогают организациям получить их преимущества и разработать нужные им решения.
Это решение использует полностью управляемые службы на основе модели PaaS (платформа как услуга). Поэтому за установку исправлений, обслуживание в рамках SLA и другие задачи управления отвечает корпорация Майкрософт. Еще одним преимуществом является встроенная интеграция с инфраструктурой безопасности Azure.
Потенциальные варианты использования
Это решение применяется к разным сценариям:
- Использование служб PaaS Azure для создания современных решений потоковой передачи, которые используют технологии с открытым кодом.
- Перенос решений обработки потоковой передачи с открытым кодом в Azure.
Рекомендации
Эти рекомендации реализуют основные принципы Azure Well-Architected Framework, которая представляет собой набор руководящих принципов, которые можно использовать для повышения качества рабочей нагрузки. Дополнительные сведения см. в статье Microsoft Azure Well-Architected Framework.
Разрабатывайте и реализуйте каждую службу с применением лучших методик. Рекомендации по каждой службе см. на сайте документации Майкрософт. Кроме того, ознакомьтесь со сведениями в следующих разделах:
Производительность
- Реализуйте пулы подключений для Базы данных Azure для PostgreSQL. Вы можете использовать библиотеку пулов подключений в самом приложении. Или же вы можете использовать средство для создания пулов подключений, например PgBouncer или Pgpool. Установка подключения с PostgreSQL — это ресурсоемкая операция. Благодаря пулам подключений вы можете избежать ухудшения производительности приложения. PgBouncer встроен в База данных Azure для PostgreSQL гибкий сервер.
- Настройте Azure Cosmos DB для Apache Cassandra для оптимальной производительности с помощью соответствующей стратегии секционирования. Решите, будете ли вы использовать первичный ключ с одним полем, составной первичный ключ или составной ключ секции при секционировании таблиц.
Масштабируемость
- При выборе уровня Центра событий учитывайте свои требования к потоковой передаче:
- Если вам требуется пропускная способность на уровне не более 120 Мбит/с, рекомендуем уровень «Премиум». Он позволяет гибко масштабироваться для соответствия требованиям потоковой передачи.
- Для рабочих нагрузок потоковой передачи высокого уровня с входящим трафиком на уровне гигабайт данных, рекомендуем уровень «Выделенный». Этот уровень является однотенатным предложением с гарантированной емкостью. Масштаб выделенных кластеров можно уменьшать и увеличивать.
Безопасность
Безопасность обеспечивает гарантии от преднамеренных атак и злоупотреблений ценными данными и системами. Дополнительные сведения см. в статье Общие сведения о принципах безопасности.
- Используйте Приватный канал Azure, чтобы включить службы Azure в свою виртуальную сеть. При использовании Приватного канала трафик между службами и вашей сетью передается по магистрали Azure без прохода по общедоступным сетям Интернета. Службы Azure в этом решении поддерживают Приватный канал только для отдельных SKU.
- Проверьте соответствие политикам безопасности в своей организации. С помощью Azure Cosmos DB для Apache Cassandra ключи предоставляют доступ к таким ресурсам, как пространства ключей и таблицы. Такие ключи хранятся в экземпляре Azure Cosmos DB. Ваши политики безопасности могут требовать передачи таких ключей в службу управления ключами, например Azure Key Vault. Также обеспечьте смену ключей в соответствии с политиками организации.
Устойчивость
Используйте зоны доступности для защиты критически важных для бизнеса приложений от сбоев в центре обработки данных. Службы этого решения поддерживают зоны доступности для выбранных SKU в регионах с зонами доступности. Актуальные сведения см. в списке служб с поддержкой зон доступности.
Оптимизация затрат
Оптимизация затрат заключается в поиске способов уменьшения ненужных расходов и повышения эффективности работы. Дополнительные сведения см. в разделе Обзор критерия «Оптимизация затрат».
Для оценки стоимости решения используйте калькулятор цен Azure. Также учитывайте следующее:
- Центры событий доступны в уровнях «Базовый», «Стандартный», «Премиум» и «Выделенный». Уровни «Премиум» и «Выделенный» оптимальны для крупномасштабных рабочих нагрузок потоковой передачи. Вы можете масштабировать пропускную способность, поэтому мы рекомендуем начать с небольшого объема ресурсов и масштабироваться при повышении спроса.
- Azure Cosmos DB предлагает две модели:
- Модель с подготовленной пропускной способностью, которая идеально подходит для требовательных рабочих нагрузок. Эта модель доступна в двух вариантах управления емкостью: стандартный и с автомасштабированием.
- Бессерверная модель, которая хорошо подходит для выполнения небольших рабочих нагрузок с частыми пиками.
- Кластеризация
- Сохраняемость
- Активная георепликация
Развертывание этого сценария
При развертывании решения учитывайте следующее:
- При развертывании Центров событий для Kafka изучите статью Краткое руководство. Потоковая передача данных с помощью Центров событий с использованием протокола Kafka. Эта статья содержит следующие сведения.
- Как отправлять и получать сообщения с помощью Kafka в Центрах событий.
- Пример кода для публикации приложения.
- Как перенести существующие приложения Kafka в Центры событий для Kafka, внеся изменения в конфигурацию.
- Сведения о создании базового приложения Spark См. в статье Подключение приложения Apache Spark к Центрам событий Azure.
- Сведения о размещении приложения Spark в AKS см. в статье Выполнение заданий Apache Spark в AKS.
- Как использовать предикаты запросов в языке запросов Cassandra (CQL) для отправки запросов к API канала изменений.
- Пример кода для приложения Java.
Соавторы
Эта статья обновляется и поддерживается корпорацией Майкрософт. Первоначально она была написана следующими авторами.
- Аджит Анантрам | Архитектор облачных решений
Дальнейшие действия
- Руководство разработчика Apache Kafka для концентраторов событий Azure
- Часто задаваемые вопросы об Azure Cosmos DB для Apache Cassandra
- Рекомендации по созданию приложения с помощью Базы данных Azure для PostgreSQL
- Вопросы и ответы по кэшу Redis для Azure
Связанные ресурсы
Чтобы узнать больше о связанных решениях, см. следующие сведения:
- Проектирование архитектуры аналитики
- Выбор хранилища аналитических данных в Azure
- Выбор технологии для анализа данных в Azure
- Использование Azure Kubernetes при обработке потока событий
- Потоковая передача данных с помощью AKS
Конвейер реального времени. Потоковая обработка данных
Правильный способ сбора данных в реальном времени. Авторитетное руководство по освоению архитектурного проектирования потоковых приложений корпоративного уровня. Подробное объяснение различных систем, стратегий и инструментов для реализации потоковой обработки данных. Эта книга содержит все необходимое для понимания потоковой обработки. Эта насыщенная идеями книга научит вас думать об эффективном взаимодействии с быстрыми потоками данных. В ней выдержан идеальный баланс между широкой картиной и деталями реализации. На содержательных примерах и практических задачах вы узнаете о проектировании приложений, которые читают, анализируют, разделяют и сохраняют потоковые данные. Попутно вы поймете, какую роль играют такие технологии, как Spark, Storm, Kafka, Flink, RabbitMQ и другие. Издание ориентировано на разработчиков, знакомых с концепциями реляционных баз данных.
Apache Kafka и потоковая обработка данных с помощью Spark Streaming
Привет, Хабр! Сегодня мы построим систему, которая будет при помощи Spark Streaming обрабатывать потоки сообщений Apache Kafka и записывать результат обработки в облачную базу данных AWS RDS.
Представим, что некая кредитная организация ставит перед нами задачу обработки входящих транзакций «на лету» по всем своим филиалам. Это может быть сделано с целью оперативного расчета открытой валютой позиции для казначейства, лимитов или финансового результата по сделкам и т.д.
Как реализовать этот кейс без применения магии и волшебных заклинаний — читаем под катом! Поехали!
(Источник картинки)
Введение
Безусловно, обработка большого массива данных в реальном времени предоставляет широкие возможности для использования в современных системах. Одной из популярнейших комбинаций для этого является тандем Apache Kafka и Spark Streaming, где Kafka — создает поток пакетов входящих сообщений, а Spark Streaming обрабатывает эти пакеты через заданный интервал времени.
Для повышения отказоустойчивости приложения, будем использовать контрольные точки — чекпоинты (checkpoints). При помощи этого механизма, когда модулю Spark Streaming потребуется восстановить утраченные данные, ему нужно будет только вернуться к последней контрольной точке и возобновить вычисления от нее.
Архитектура разрабатываемой системы
- Apache Kafka — это распределенная система обмена сообщениями с публикацией и подпиской. Подходит как для автономного, так и для онлайнового потребления сообщений. Для предотвращения потери данных сообщения Kafka сохраняются на диске и реплицируются внутри кластера. Система Kafka построена поверх службы синхронизации ZooKeeper;
- Apache Spark Streaming — компонент Spark для обработки потоковых данных. Модуль Spark Streaming построен с применением «микропакетной» архитектуры (micro-batch architecture), когда поток данных интерпретируется как непрерывная последовательность маленьких пакетов данных. Spark Streaming принимает данные из разных источников и объединяет их в небольшие пакеты. Новые пакеты создаются через регулярные интервалы времени. В начале каждого интервала времени создается новый пакет, и любые данные, поступившие в течение этого интервала, включаются в пакет. В конце интервала увеличение пакета прекращается. Размер интервала определяется параметром, который называется интервал пакетирования (batch interval);
- Apache Spark SQL — объединяет реляционную обработку с функциональным программированием Spark. Под структурированными данными подразумеваются данные, имеющие схему, то есть единый набор полей для всех записей. Spark SQL поддерживает ввод из множества источников структурированных данных и, благодаря наличию информации о схеме, он может эффективно извлекать только необходимые поля записей, а также предоставляет API-интерфейсы DataFrame;
- AWS RDS — это cравнительно недорогая облачная реляционная база данных, веб-сервис, который упрощает настройку, эксплуатацию и масштабирование, администрируется непосредcтвенно Amazon.
Установка и запуск сервера Kafka
Перед непосредственным использованием Kafka, необходимо убедиться в наличии Java, т.к. для работы используется JVM:
sudo apt-get update sudo apt-get install default-jre java -version
Создадим нового пользователя для работы с Kafka:
sudo useradd kafka -m sudo passwd kafka sudo adduser kafka sudo
Далее скачиваем дистрибутив с официального сайта Apache Kafka:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
Распаковываем скаченный архив:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Следующий шаг — опциональный. Дело в том, что настройки по умолчанию не позволяют полноценно использовать все возможности Apache Kafka. Например, удалять тему, категорию, группу, на которые могут быть опубликованы сообщения. Чтобы изменить это, отредактируем файл конфигурации:
vim ~/kafka/config/server.properties
Добавьте в конец файла следующее:
delete.topic.enable = true
Перед запуском сервера Kafka, необходимо стартовать сервер ZooKeeper, будем использовать вспомогательный скрипт, который поставляется вместе с дистрибутивом Kafka:
Cd ~/kafka bin/zookeeper-server-start.sh config/zookeeper.properties
После того, как ZooKeeper успешно стартовал, в отдельном терминале запускаем сервер Kafka:
bin/kafka-server-start.sh config/server.properties
Создадим новый топик под названием Transaction:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Убедимся, что топик с нужным количеством партиций и репликацией был создан:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Упустим моменты тестирования продюсера и консьюмера для вновь созданного топика. Более подробно о том, как можно протестировать отправку и прием сообщений, написано в официальной документации — Send some messages . Ну а мы переходим к написанию продюсера на Python с использованием KafkaProducer API.
Написание продюсера
Продюсер будет генерить случайные данные — по 100 сообщений каждую секунду. Под случайными данными будем понимать словарь, состоящий из трех полей:
- Branch — наименование точки продаж кредитной организации;
- Currency — валюта сделки;
- Amount — сумма сделки. Сумма будет положительным числом, если это покупка валюты Банком, и отрицательным — если продажа.
Код для продюсера выглядит следующим образом:
from numpy.random import choice, randint def get_random_value(): new_dict = <> branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut'] currency_list = ['RUB', 'USD', 'EUR', 'GBP'] new_dict['branch'] = choice(branch_list) new_dict['currency'] = choice(currency_list) new_dict['amount'] = randint(-100, 100) return new_dict
Далее, используя метод send, отправляем сообщение на сервер, в нужный нам топик, в формате JSON:
from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x:dumps(x).encode('utf-8'), compression_type='gzip') my_topic = 'transaction' data = get_random_value() try: future = producer.send(topic = my_topic, value = data) record_metadata = future.get(timeout=10) print('--> The message has been sent to a topic: <>, partition: <>, offset: <>' .format(record_metadata.topic, record_metadata.partition, record_metadata.offset )) except Exception as e: print('--> It seems an Error occurred: <>'.format(e)) finally: producer.flush()
При запуске скрипта получаем в терминале следующие сообщения:
Это означает, что все работает как мы хотели — продюсер генерит и отправляет сообщения в нужный нам топик.
Следующим шагом будет установка Spark и обработка этого потока сообщений.Установка Apache Spark
Apache Spark — это универсальная и высокопроизводительная кластерная вычислительная платформа.
По производительности Spark превосходит популярные реализации модели MapReduce, попутно обеспечивая поддержку более широкого диапазона типов вычислений, включая интерактивные запросы и потоковую обработку. Скорость играет важную роль при обработке больших объемов данных, так как именно скорость позволяет работать в интерактивном режиме, не тратя минуты или часы на ожидание. Одно из важнейших достоинств Spark, обеспечивающих столь высокую скорость, — способность выполнять вычисления в памяти.
Данный фреймворк написан на Scala, поэтому необходимо установить ее в первую очередь:
sudo apt-get install scala
Скачиваем с официального сайта дистрибутив Spark:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Добавляем путь к Spark в bash-файл:
vim ~/.bashrc
Добавляем через редактор следующие строчки:
SPARK_HOME=/usr/local/spark export PATH=$SPARK_HOME/bin:$PATH
Выполняем команду ниже после внесения правок в bashrc:
source ~/.bashrc
Развертывание AWS PostgreSQL
Осталось развернуть базу данных, куда будем заливать обработанную информацию из потоков. Для этого будем использовать сервис AWS RDS.
Заходим в консоль AWS —> AWS RDS —> Databases —> Create database:
Выбираем PostgreSQL и нажимаем кнопку Next:
Т.к. данный пример разбирается исключительно в образовательных целях, будем использовать бесплатный сервер «на минималках» (Free Tier):
Далее, ставим галочку в блоке Free Tier, и после этого нам автоматом будет предложен инстанс класса t2.micro — хоть и слабенький, но бесплатный и вполне подойдет для нашей задачи:
Следом идут очень важные вещи: наименование инстанса БД, имя мастер-пользователя и его пароль. Назовем инстанст: myHabrTest, мастер-пользователь: habr, пароль: habr12345 и нажимаем на кнопку Next:
На следующей странице находятся параметры, отвечающие за доступность нашего сервера БД извне (Public accessibility) и доступность портов:
Давайте создадим новую настройку для VPC security group, которая позволит извне обращаться к нашему серверу БД через порт 5432 (PostgreSQL).
Перейдем в отдельном окне браузера к консоли AWS в раздел VPC Dashboard —> Security Groups —> Create security group:Задаем имя для Security group — PostgreSQL, описание, указываем к какой VPC данная группа должна быть ассоциирована и нажимаем кнопку Create:
Заполняем для свежесозданной группы Inbound rules для порта 5432, как показано на картинке ниже. Вручную порт можно не указывать, а выбрать PostgreSQL из раскрывающегося списка Type.
Строго говоря, значение ::/0 означает доступность входящего траффика для сервера со всего мира, что канонически не совсем верно, но для разбора примера позволим себе использовать такой подход:
Возвращаемся к странице браузера, где у нас открыто «Configure advanced settings» и выбираем в разделе VPC security groups —> Choose existing VPC security groups —> PostgreSQL:
Далее, в разделе Database options —> Database name —> задаем имя — habrDB.
Остальные параметры, за исключением разве что отключения бэкапирования (backup retention period — 0 days), мониторинга и Performance Insights, можем оставить по умолчанию. Нажимаем на кнопку Create database:
Обработчик потоков
Завершающим этапом будет разработка Spark-джобы, которая будет каждые две секунды обрабатывать новые данные, пришедшие от Kafka и заносить результат в базу данных.
Как было отмечено выше, контрольные точки (сheckpoints) — это основной механизм в SparkStreaming, который должен быть настроен для обеспечения отказоустойчивости. Будем использовать контрольные точки и, в случае падения процедуры, модулю Spark Streaming для восстановления утраченных данных нужно будет только вернуться к последней контрольной точке и возобновить вычисления от нее.
Контрольную точку можно включить, установив каталог в отказоустойчивой, надежной файловой системе (например, HDFS, S3 и т. Д.), в которой будет сохранена информация контрольной точки. Это делается с помощью, например:
streamingContext.checkpoint(checkpointDirectory)
В нашем примере будем использовать следующий подход, а именно, если checkpointDirectory существует, то контекст будет воссоздан из данных контрольной точки. Если каталог не существует (т.е. выполняется в первый раз), то вызывается функция functionToCreateContext для создания нового контекста и настройки DStreams:
from pyspark.streaming import StreamingContext context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Создаем объект DirectStream с целью подключения к топику «transaction» при помощи метода createDirectStream библиотеки KafkaUtils:
from pyspark.streaming.kafka import KafkaUtils sc = SparkContext(conf=conf) ssc = StreamingContext(sc, 2) broker_list = 'localhost:9092' topic = 'transaction' directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], )
Парсим входящие данные в формате JSON:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'], currency=w['currency'], amount=w['amount'])) testDataFrame = spark.createDataFrame(rowRdd) testDataFrame.createOrReplaceTempView("treasury_stream")
Используя Spark SQL, делаем несложную группировку и выводим результат в консоль:
select from_unixtime(unix_timestamp()) as curr_time, t.branch as branch_name, t.currency as currency_code, sum(amount) as batch_value from treasury_stream t group by t.branch, t.currency
Получение текста запроса и запуск его через Spark SQL:
sql_query = get_sql_query() testResultDataFrame = spark.sql(sql_query) testResultDataFrame.show(n=5)
А затем сохраняем полученные агрегированные данные в таблицу в AWS RDS. Чтобы сохранить результаты агрегации в таблицу базы данных, будем использовать метод write объекта DataFrame:
testResultDataFrame.write .format("jdbc") .mode("append") .option("driver", 'org.postgresql.Driver') .option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB") .option("dbtable", "transaction_flow") .option("user", "habr") .option("password", "habr12345") .save()
Несколько слов о настройке подключения к AWS RDS. Пользователя и пароль к нему мы создавали на шаге «Развертывание AWS PostgreSQL». В качестве url сервера баз данных следует использовать Endpoint, который отображается в разделе Connectivity & security:
В целях корректной связки Spark и Kafka, следует запускать джобу через smark-submit с использованием артефакта spark-streaming-kafka-0-8_2.11. Дополнительно применим также артефакт для взаимодействия с базой данных PostgreSQL, их будем передавать через —packages.
Для гибкости скрипта, вынесем в качестве входных параметров также наименование сервера сообщений и топик, из которого хотим получать данные.
Итак, пришло время запустить и проверить работоспособность системы:
spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2, org.postgresql:postgresql:9.4.1207 spark_job.py localhost:9092 transaction
Все получилось! Как видно на картинке ниже — в процессе работы приложения новые результаты агрегации выводятся каждые 2 секунды, потому что мы установили интервал пакетирования равным 2 секундам, когда создавали объект StreamingContext:
Далее, делаем нехитрый запрос к базе данных, чтобы проверить наличие записей в таблице transaction_flow:
Заключение
В данной статье был рассмотрен пример поточной обработки информации с использованием Spark Streaming в связке с Apache Kafka и PostgreSQL. С ростом объемов данных из различных источников, сложно переоценить практическую ценность Spark Streaming для создания потоковых приложений и приложений, действующих в масштабе реального времени.
Полный исходный код вы можете найти в моем репозитории на GitHub .
С удовольствием готов обсудить данную статью, жду Ваших комментариев, а также, надеюсь на конструктивную критику всех неравнодушных читателей.
Ps. Первоначально планировалось использовать локальную БД PostgreSQL, но учитывая мою любовь к AWS, я решил вынести базу данных в облако. В следующей статье по этой теме я покажу, как реализовать целиком вышеописанную систему в AWS при помощи AWS Kinesis и AWS EMR. Следите за новостями!
Добавить комментарий Отменить ответ
Для отправки комментария вам необходимо авторизоваться.