PySpark Fundamentals and Implementing the Top Ten Pattern

Explanation video link on YouTube

Chinese explanation video link on Bilibili (B站)

# Install Java 8, download Spark 3.1.2 with Hadoop 3.2, and install findspark (Colab setup).
# Note: Apache moves older Spark releases off downloads.apache.org, so this URL may need to be
# switched to the archive.apache.org/dist/spark/ mirror if the download 404s.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

import os
# Point findspark at the unpacked Spark distribution so it can add PySpark to sys.path.
# If Spark cannot find Java, also setting os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" usually helps on Colab.
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"
import findspark
findspark.init()
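
As an optional sanity check (assuming the setup above succeeded), PySpark should now be importable and report the version that was downloaded:

import pyspark
print(pyspark.__version__)  # should print 3.1.2 for the download above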

from pyspark.sql import SparkSession

# Create a local SparkSession, then build an RDD of animal names split across 2 partitions.
spark = SparkSession.builder.master("local").appName("Colab").getOrCreate()
rdd = spark.sparkContext.parallelize(["bird", "cat", "dog", "cat", "dog", "dog", "dog", "lion", "pig", "pig", "cat", "snake"], 2)
# glom() turns each partition into a list, so collect() shows how the elements are distributed.
rdd.glom().collect()
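
Before running the distributed version, it helps to know the answer we expect. Here is a minimal plain-Python reference (using collections.Counter; not part of the Spark pipeline) for the top 2 animals in the same list:

from collections import Counter

data = ["bird", "cat", "dog", "cat", "dog", "dog", "dog", "lion", "pig", "pig", "cat", "snake"]
print(Counter(data).most_common(2))  # [('dog', 4), ('cat', 3)]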

def f(iterator):
  # Count occurrences within one partition: pair each element with 1,
  # sort so that equal keys are adjacent, then group and sum the 1s per key.
  new_it = sorted(map(lambda x: (x, 1), iterator))
  from itertools import groupby
  groups = groupby(new_it, lambda y: y[0])
  key_to_count = [(key, sum(count for _, count in group)) for key, group in groups]

  # Keep only this partition's 2 most frequent keys as local candidates.
  from heapq import nlargest
  for pair in nlargest(2, key_to_count, key=lambda e: e[1]):
    yield pair
  
# Apply f to each partition; each partition now holds only its local top-2 (animal, count) pairs.
rdd_after_map = rdd.mapPartitions(f)
rdd_after_map.glom().collect()
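
Because f is ordinary Python, it can also be exercised locally on a made-up sample partition, independently of Spark, to confirm the per-partition top-2 logic:

sample_partition = ["dog", "cat", "dog", "bird", "dog", "cat"]  # hypothetical partition contents
print(list(f(sample_partition)))  # expected: [('dog', 3), ('cat', 2)]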

from operator import add

# Merge the per-partition candidates: counts for the same animal are summed across partitions.
rdd_after_group = rdd_after_map.reduceByKey(add)
print(rdd_after_group.collect())

from heapq import nlargest

# Bring the merged (animal, count) pairs to the driver and pick the global top 2.
nlargest(2, rdd_after_group.collect(), key=lambda e: e[1])
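
For comparison, here is a minimal alternative sketch for this last step that keeps the top-N selection inside Spark instead of collecting first, using RDD.top on the same rdd_after_group:

# top() selects the N largest elements (by the given key) and returns them to the driver in descending order.
rdd_after_group.top(2, key=lambda e: e[1])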
