PHPFixing

Monday, August 1, 2022

[FIXED] How do I set FTP passive mode in Spark to read a file from an FTP server?

apache-commons-net, apache-spark, docker, ftp, hadoop

Issue

I am reading a file from an FTP server into a Spark RDD like this:

val rdd = spark.sparkContext.textFile("ftp://anonymous:pwd@<hostname>/data.gz")
rdd.count
...

This works when I run the Spark application from my local machine (Mac), but when I run the same application from a Docker container (running on the same Mac), I get the following exception:

Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication.
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:313)
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:290)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:479)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:552)
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:601)
    at org.apache.commons.net.ftp.FTP.quit(FTP.java:809)
    at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:979)
    at org.apache.hadoop.fs.ftp.FTPFileSystem.disconnect(FTPFileSystem.java:168)
    at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:415)
    at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1676)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:259)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:626)
    at org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:99)
    at org.apache.spark.rdd.RDD.$anonfun$preferredLocations$2(RDD.scala:300)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:300)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(DAGScheduler.scala:2098)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:2072)
    at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1794)
    at org.apache.spark.rdd.DefaultPartitionCoalescer.currPrefLocs(CoalescedRDD.scala:180)
    at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.$anonfun$getAllPrefLocs$1(CoalescedRDD.scala:198)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.getAllPrefLocs(CoalescedRDD.scala:197)
    at org.apache.spark.rdd.DefaultPartitionCoalescer$PartitionLocations.<init>(CoalescedRDD.scala:190)
    at org.apache.spark.rdd.DefaultPartitionCoalescer.coalesce(CoalescedRDD.scala:391)
    at org.apache.spark.rdd.CoalescedRDD.getPartitions(CoalescedRDD.scala:90)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
    at com.mypackage.Myapp$.parseData(Myapp.scala:76)

In the container, even the ftp command-line utility has the same issue. However, I found that after enabling passive mode in the ftp CLI, I can successfully transfer the file from the FTP server to the container:

ftp <host>
...
ftp> passive
Passive mode on.
ftp> get data.gz
227 Entering Passive Mode ...
226 Transfer complete
20676672 bytes received in 25.53 secs (790.9552 kB/s)

So my question is: how do I set the passive mode property when reading the file in Spark using param.spark.sparkContext.textFile("ftp://anonymous:pwd@<hostname>/data.gz")?


Solution

I do not have experience with Spark, so I do not know how it integrates with Hadoop. But in Hadoop, you can enable FTP passive mode by setting the fs.ftp.data.connection.mode configuration option:

fs.ftp.data.connection.mode=PASSIVE_LOCAL_DATA_CONNECTION_MODE

You need at least Hadoop 2.9: https://issues.apache.org/jira/browse/HADOOP-13953
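
For Spark, here is a minimal sketch of how this Hadoop option could be passed through. It assumes Hadoop 2.9 or later is on the classpath and that the job is launched with spark-submit. Spark copies any property prefixed with spark.hadoop. into the Hadoop configuration, and the same key can also be set directly on sparkContext.hadoopConfiguration. The object name FtpPassiveRead, the application name, and taking the FTP URL from the first program argument are illustrative assumptions, not part of the original question.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical driver object; the name is an assumption for illustration.
object FtpPassiveRead {
  def main(args: Array[String]): Unit = {
    // FTP URL supplied by the caller, in the same form as in the question.
    val ftpUrl = args(0)

    val spark = SparkSession.builder()
      .appName("ftp-passive-read")
      // Properties prefixed with "spark.hadoop." are copied into the Hadoop Configuration.
      .config("spark.hadoop.fs.ftp.data.connection.mode",
              "PASSIVE_LOCAL_DATA_CONNECTION_MODE")
      .getOrCreate()

    // Alternative for a session that already exists: set the key on the
    // Hadoop configuration directly, before the first read from ftp://.
    spark.sparkContext.hadoopConfiguration
      .set("fs.ftp.data.connection.mode", "PASSIVE_LOCAL_DATA_CONNECTION_MODE")

    val rdd = spark.sparkContext.textFile(ftpUrl)
    println(rdd.count())

    spark.stop()
  }
}
```

Either of the two settings should be sufficient on its own; both are shown only to illustrate the options. Whether passive mode resolves the exception in a given container still depends on the Docker network setup.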



Answered By - Martin Prikryl
Answer Checked By - Pedro (PHPFixing Volunteer)