0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何通過python輕松處理大文件

jf_TEuU2tls ? 來源:浩道linux ? 2023-04-27 10:54 ? 次閱讀

前言

大家好,這里是浩道linux,主要給大家分享linux、python、網(wǎng)絡(luò)通信相關(guān)的IT知識(shí)平臺(tái)。

眾所周知,python除了以簡(jiǎn)潔著稱,其成熟的第三方庫功能也是很強(qiáng)大的,今天浩道帶大家看看如何通過python輕松處理大文件,真讓人直呼yyds。

為了進(jìn)行并行處理,我們將任務(wù)劃分為子單元。它增加了程序處理的作業(yè)數(shù)量,減少了整體處理時(shí)間。

例如,如果你正在處理一個(gè)大的CSV文件,你想修改一個(gè)單列。我們將把數(shù)據(jù)以數(shù)組的形式輸入函數(shù),它將根據(jù)可用的進(jìn)程數(shù)量,一次并行處理多個(gè)值。這些進(jìn)程是基于你的處理器內(nèi)核的數(shù)量。

在這篇文章中,我們將學(xué)習(xí)如何使用multiprocessing、joblib和tqdm Python包減少大文件的處理時(shí)間。這是一個(gè)簡(jiǎn)單的教程,可以適用于任何文件、數(shù)據(jù)庫、圖像、視頻音頻。

開始

我們將使用來自 Kaggle 的 US Accidents (2016 - 2021) 數(shù)據(jù)集,它包括280萬條記錄和47個(gè)列。

https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents

我們將導(dǎo)入multiprocessing、joblib和tqdm用于并行處理,pandas用于數(shù)據(jù)導(dǎo)入,re、nltk和string用于文本處理。

# Parallel Computing


import multiprocessing as mp


from joblib import Parallel, delayed


from tqdm.notebook import tqdm


# Data Ingestion 


import pandas as pd


# Text Processing 


import re 


from nltk.corpus import stopwords


import string

在我們開始之前,讓我們通過加倍cpu_count()來設(shè)置n_workers。正如你所看到的,我們有8個(gè)workers。

n_workers = 2 * mp.cpu_count()


print(f"{n_workers} workers are available")


>>> 8 workers are available

下一步,我們將使用pandas read_csv函數(shù)讀取大型CSV文件。然后打印出dataframe的形狀、列的名稱和處理時(shí)間。

%%time
file_name="../input/us-accidents/US_Accidents_Dec21_updated.csv"
df = pd.read_csv(file_name)


print(f"Shape:{df.shape}

Column Names:
{df.columns}
")

輸出:

Shape:(2845342,47)


Column Names:


Index(['ID', 'Severity', 'Start_Time', 'End_Time', 'Start_Lat', 'Start_Lng',
'End_Lat', 'End_Lng', 'Distance(mi)', 'Description', 'Number', 'Street',
'Side', 'City', 'County', 'State', 'Zipcode', 'Country', 'Timezone',
'Airport_Code', 'Weather_Timestamp', 'Temperature(F)', 'Wind_Chill(F)',
'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Direction',
'Wind_Speed(mph)', 'Precipitation(in)', 'Weather_Condition', 'Amenity',
'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit', 'Railway',
'Roundabout', 'Station', 'Stop', 'Traffic_Calming', 'Traffic_Signal',
'Turning_Loop', 'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
'Astronomical_Twilight'],
dtype='object')


CPU times: user 33.9 s, sys: 3.93 s, total: 37.9 s
Wall time: 46.9 s

處理文本

clean_text是一個(gè)用于處理文本的簡(jiǎn)單函數(shù)。我們將使用nltk.copus獲得英語停止詞,并使用它來過濾掉文本行中的停止詞。之后,我們將刪除句子中的特殊字符和多余的空格。它將成為確定串行、并行和批處理的處理時(shí)間的基準(zhǔn)函數(shù)。

def clean_text(text): 
  # Remove stop words
  stops = stopwords.words("english")
  text = " ".join([word for word in text.split() if word 
 not in stops])
  # Remove Special Characters
  text = text.translate(str.maketrans('', '', string.punctuation))
  # removing the extra spaces
  text = re.sub(' +',' ', text)
  return text

串行處理

對(duì)于串行處理,我們可以使用pandas的.apply()函數(shù),但是如果你想看到進(jìn)度條,你需要為pandas激活tqdm,然后使用.progress_apply()函數(shù)。

我們將處理280萬條記錄,并將結(jié)果保存回 “Description” 列中。

%%time
tqdm.pandas()


df['Description'] = df['Description'].progress_apply(clean_text)

輸出

高端處理器串行處理280萬行花了9分5秒。

100%  2845342/2845342 [09:05<00:00, 5724.25it/s]


CPU times: user 8min 14s, sys: 53.6 s, total: 9min 7s
Wall time: 9min 5s

多進(jìn)程處理

有多種方法可以對(duì)文件進(jìn)行并行處理,我們將了解所有這些方法。multiprocessing是一個(gè)內(nèi)置的python包,通常用于并行處理大型文件。

我們將創(chuàng)建一個(gè)有8個(gè)workers的多處理池,并使用map函數(shù)來啟動(dòng)進(jìn)程。為了顯示進(jìn)度條,我們將使用tqdm。

map函數(shù)由兩部分組成。第一個(gè)部分需要函數(shù),第二個(gè)部分需要一個(gè)參數(shù)或參數(shù)列表。

%%time
p = mp.Pool(n_workers) 


df['Description'] = p.map(clean_text,tqdm(df['Description']))

輸出

我們的處理時(shí)間幾乎提高了3倍。處理時(shí)間從9分5秒下降到3分51秒。


100%  2845342/2845342 [02:58<00:00, 135646.12it/s]


CPU times: user 5.68 s, sys: 1.56 s, total: 7.23 s
Wall time: 3min 51s

并行處理

我們現(xiàn)在將學(xué)習(xí)另一個(gè)Python包來執(zhí)行并行處理。在本節(jié)中,我們將使用joblib的Parallel和delayed來復(fù)制map函數(shù)。

Parallel需要兩個(gè)參數(shù):n_job = 8和backend = multiprocessing。

然后,我們將在delayed函數(shù)中加入clean_text。

創(chuàng)建一個(gè)循環(huán),每次輸入一個(gè)值。

下面的過程是相當(dāng)通用的,你可以根據(jù)你的需要修改你的函數(shù)和數(shù)組。我曾用它來處理成千上萬的音頻和視頻文件,沒有任何問題。

建議:使用"try: "和"except: "添加異常處理。

def text_parallel_clean(array):
  result = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(clean_text)
  (text) 
  for text in tqdm(array)
  )
  return result

在text_parallel_clean()中添加“Description”列。

%%time
df['Description'] = text_parallel_clean(df['Description'])

輸出

我們的函數(shù)比多進(jìn)程處理Pool多花了13秒。即使如此,并行處理也比串行處理快4分59秒。


100%  2845342/2845342 [04:03<00:00, 10514.98it/s]


CPU times: user 44.2 s, sys: 2.92 s, total: 47.1 s
Wall time: 4min 4s

并行批量處理

有一個(gè)更好的方法來處理大文件,就是把它們分成若干批,然后并行處理。讓我們從創(chuàng)建一個(gè)批處理函數(shù)開始,該函數(shù)將在單一批次的值上運(yùn)行clean_function。

批量處理函數(shù)

def proc_batch(batch):
  return [
  clean_text(text)
  for text in batch
  ]

將文件分割成批

下面的函數(shù)將根據(jù)workers的數(shù)量把文件分成多個(gè)批次。在我們的例子中,我們得到8個(gè)批次。

def batch_file(array,n_workers):
  file_len = len(array)
  batch_size = round(file_len / n_workers)
  batches = [
  array[ix:ix+batch_size]
  for ix in tqdm(range(0, file_len, batch_size))
  ]
  return batches


batches = batch_file(df['Description'],n_workers)


>>> 100% 8/8 [00:00<00:00, 280.01it/s]

運(yùn)行并行批處理

最后,我們將使用Parallel和delayed來處理批次。

%%time
batch_output = Parallel(n_jobs=n_workers,backend="multiprocessing")(
  delayed(proc_batch)
  (batch) 
  for batch in tqdm(batches)
  )


df['Description'] = [j for i in batch_output for j in i]

輸出

我們已經(jīng)改善了處理時(shí)間。這種技術(shù)在處理復(fù)雜數(shù)據(jù)和訓(xùn)練深度學(xué)習(xí)模型方面非常有名。

100%  8/8 [00:00<00:00, 2.19it/s]


CPU times: user 3.39 s, sys: 1.42 s, total: 4.81 s
Wall time: 3min 56s

tqdm 并發(fā)

tqdm將多處理帶到了一個(gè)新的水平。它簡(jiǎn)單而強(qiáng)大。

process_map需要:

函數(shù)名稱

Dataframe列名

max_workers

chucksize與批次大小類似。我們將用workers的數(shù)量來計(jì)算批處理的大小,或者你可以根據(jù)你的喜好來添加這個(gè)數(shù)字。

%%time
from tqdm.contrib.concurrent import process_map
batch = round(len(df)/n_workers)


df['Description'] = process_map(clean_text,df['Description'], max_workers=n_workers, chunksize=batch)

輸出

通過一行代碼,我們得到了最好的結(jié)果:

100%  2845342/2845342 [03:48<00:00, 1426320.93it/s]


CPU times: user 7.32 s, sys: 1.97 s, total: 9.29 s
Wall time: 3min 51s

結(jié)論

我們需要找到一個(gè)平衡點(diǎn),它可以是串行處理,并行處理,或批處理。如果你正在處理一個(gè)較小的、不太復(fù)雜的數(shù)據(jù)集,并行處理可能會(huì)適得其反。

在這個(gè)教程中,我們已經(jīng)了解了各種處理大文件的Python包,它們?cè)试S我們對(duì)數(shù)據(jù)函數(shù)進(jìn)行并行處理。

如果你只處理一個(gè)表格數(shù)據(jù)集,并且想提高處理性能,那么建議你嘗試Dask、datatable和RAPIDS。

審核編輯:湯梓紅
聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • Linux
    +關(guān)注

    關(guān)注

    87

    文章

    11207

    瀏覽量

    208714
  • 文件
    +關(guān)注

    關(guān)注

    1

    文章

    561

    瀏覽量

    24671
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4277

    瀏覽量

    62323
  • python
    +關(guān)注

    關(guān)注

    55

    文章

    4767

    瀏覽量

    84375
  • csv
    csv
    +關(guān)注

    關(guān)注

    0

    文章

    38

    瀏覽量

    5797

原文標(biāo)題:【yyds】python處理大文件太輕松了!

文章出處:【微信號(hào):浩道linux,微信公眾號(hào):浩道linux】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    如何在Linux使用find和du命令搜索大文件和目錄

    隨著時(shí)間的推移,您的磁盤驅(qū)動(dòng)器可能會(huì)被大文件占用大量磁盤空間,不必要文件弄得亂七八糟。
    發(fā)表于 12-07 17:17 ?4255次閱讀

    Linux平臺(tái)大文件生成和處理方法

    在日常工作中,為了驗(yàn)證某些場(chǎng)景下的功能,經(jīng)常需要人為構(gòu)造一些大文件進(jìn)行測(cè)試,有時(shí)需要用大文件來測(cè)試下載速度,有時(shí)需要用大文件來覆蓋磁盤空間;偶爾會(huì)看到一些網(wǎng)絡(luò)博文會(huì)教大家如何構(gòu)造大文件
    發(fā)表于 07-14 16:38 ?3842次閱讀

    labview讀取大文件問題

    labview讀取二進(jìn)制文件最大能讀取多大的文件,我讀160多兆的文件就不能讀取了,誰還有讀取大文件的好方法,分享一下。qq419069469
    發(fā)表于 07-29 16:57

    在Linux下如何清空或刪除大文件和大量文件

    快速刪除大文件1. 通過重定向到 Null 來清空文件內(nèi)容清空或者讓一個(gè)文件成為空白的最簡(jiǎn)單方式,是像下面那樣,通過 shell 重定向 n
    發(fā)表于 07-11 07:50

    cc3200如何讀寫大文件

    請(qǐng)教一下TI的工程師,如果需要讀寫一個(gè)大文件, 文件大小為300k。 而cc3200 RAM 為256k, ROM可以為1M。而且寫的時(shí)候,只能全部讀出來,再寫。 不能追加。 那有什么好方法可以讀寫一個(gè)大文件了?是不是將這個(gè)
    發(fā)表于 06-16 14:27

    請(qǐng)問如何上傳CSharp大文件?

    CSharp大文件上傳解決方案
    發(fā)表于 11-16 07:48

    ESP8266如何通過wifi傳輸大文件?

    我最近接到了一項(xiàng)關(guān)于 ESP12-F 模塊的任務(wù)。用戶 SPI 連接到皮質(zhì) m4 uC?,F(xiàn)在我需要通過 WiFi 將一個(gè)大文件 (150MB) 發(fā)送到 ESP8266,然后將其存儲(chǔ)在連接到 uC
    發(fā)表于 02-23 06:56

    基于PHP大文件上傳的研究和設(shè)計(jì)

    基于PHP大文件上傳的研究和設(shè)計(jì),感興趣的可以看看。
    發(fā)表于 02-22 18:15 ?6次下載

    TXT大文件切割軟體應(yīng)用程序免費(fèi)下載

    本文檔的主要內(nèi)容詳細(xì)是TXT大文件切割軟體應(yīng)用程序免費(fèi)下載,可以切割任意大小的txt文件,可以根據(jù)大小,數(shù)量,標(biāo)題等類別進(jìn)行切割,絕對(duì)是大文件切割的必備工具,該軟體綠色免安裝,誰用誰知道~
    發(fā)表于 11-07 08:00 ?5次下載
    TXT<b class='flag-5'>大文件</b>切割軟體應(yīng)用程序免費(fèi)下載

    JAVA中NIO通過MappedByteBuffer操作大文件

    java io操作中通常采用BufferedReader,BufferedInputStream等帶緩沖的IO類處理大文件,不過java nio中引入了一種基于MappedByteBuffer操作大文件的方式,其讀寫性能極高,本
    的頭像 發(fā)表于 05-05 23:42 ?3468次閱讀

    華為 8 款機(jī)型升級(jí) EMUI 11 最新版,上線暢連大文件閃傳和智感支付

    捷的智感支付。 IT之家了解到,暢連大文件閃傳功能由華為 Mate40 系列首發(fā)搭載,兩臺(tái)手機(jī)點(diǎn)對(duì)點(diǎn)異地直傳,依托華為在 5G 芯片、5G 終端和 5G 網(wǎng)絡(luò)端到端能力,可以輕松實(shí)現(xiàn)超大文件疾速可達(dá),斷點(diǎn)仍可續(xù)傳。據(jù)華為介紹,超
    的頭像 發(fā)表于 01-08 10:27 ?2929次閱讀

    Linux系統(tǒng)下傳輸大文件的切割與合并實(shí)例分析

    往往是因?yàn)榫W(wǎng)絡(luò)傳輸?shù)南拗?,?dǎo)致很多時(shí)候,我們需要在 Linux 系統(tǒng)下進(jìn)行大文件的切割。這樣將一個(gè)大文件切割成為多個(gè)小文件,進(jìn)行傳輸,傳輸完畢之后進(jìn)行合并即可。
    的頭像 發(fā)表于 07-02 11:47 ?1789次閱讀
    Linux系統(tǒng)下傳輸<b class='flag-5'>大文件</b>的切割與合并實(shí)例分析

    網(wǎng)絡(luò)工程師學(xué)Python-文件處理

    當(dāng)涉及到 Python 文件處理時(shí), 我們通常會(huì)涉及到文件的讀取和寫入, 以及文件的操作和處理。
    的頭像 發(fā)表于 04-27 09:21 ?632次閱讀

    大文件傳輸?shù)?種方法

    SendBig是在全球范圍內(nèi)發(fā)送你的文件的最簡(jiǎn)單方法。免費(fèi)分享高達(dá)30GB的大文件、照片和視頻,讓文件共享變得簡(jiǎn)單。在不注冊(cè)的情況下,可以發(fā)送高達(dá)50MB的數(shù)據(jù),如果你想發(fā)送更大的文件
    的頭像 發(fā)表于 07-29 11:12 ?3815次閱讀
    <b class='flag-5'>大文件</b>傳輸?shù)?種方法

    如何使用Python讀取寫入Word文件

    01 準(zhǔn)備 Python 是一種通用編程語言,也可以用于處理 Microsoft Word 文件。在本文中,我將向你介紹如何使用 Python
    的頭像 發(fā)表于 09-27 17:03 ?1985次閱讀