# -*- coding: utf-8 -*-
"""Small helpers for fanning work out over threads and processes."""
import concurrent.futures
import multiprocessing as mlti
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import Thread, Lock, Event, Condition


class orth:
    """Namespace of thread helpers; the callables are used via the class."""

    # Serialises output written through thpr().
    lock = Lock()
    # Kept for backward compatibility; threadAny() now synchronises on a
    # private per-call Condition instead of this shared event.
    orEv = Event()

    @staticmethod
    def thpr(*a):
        """Print the argument tuple followed by "-" while holding ``lock``."""
        with orth.lock:
            print(a, end="-")

    @staticmethod
    def run(cb, args):
        """Start ``cb(*args)`` in a new thread and return the started Thread."""
        worker = Thread(target=cb, args=args)
        worker.start()
        return worker

    def runi(sl, cb, data, mod, vl):
        """Start ``mod`` threads running ``cb(vl, x, data, [])`` for x in range(mod).

        ``sl`` is not used by the body. Returns the list of started threads;
        they are not joined here.
        """
        workers = [Thread(target=cb, args=(vl, x, data, [])) for x in range(mod)]
        for worker in workers:
            worker.start()
        return workers

    @staticmethod
    def thrun(thFind, iList, nm, ars):
        """Split ``iList`` into index ranges and process each in its own thread.

        Each worker is called as ``thFind(ars, (start, end), iList, out)``,
        where ``out`` is a fresh list owned by that worker. The chunk size is
        ``len(iList) // nm + 1``, so fewer than ``nm`` workers may be started.

        Blocks until every worker has finished and returns the list of ``out``
        lists, one per started worker, in range order.

        Raises:
            ZeroDivisionError: if ``nm`` is 0.
        """
        total = len(iList)
        chunk = total // nm + 1
        results = []
        workers = []
        for i in range(nm):
            out = []
            results.append(out)
            start = chunk * i
            end = min(chunk * (i + 1), total)
            workers.append(Thread(target=thFind, args=(ars, (start, end), iList, out)))
            if chunk * (i + 1) >= total:
                # This range reaches the end of the input; no more workers.
                break
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        return results

    @staticmethod
    def threadAny(thFind, iList, ars, timeout=5):
        """Run one thread per item and return the first non-empty result.

        Each worker is called as ``thFind(ars, item, out)`` with its own
        ``out`` list. The call returns as soon as a worker has finished
        leaving something in its ``out`` list, once all workers have finished,
        or after ``timeout`` seconds, whichever comes first.

        Returns the first non-empty ``out`` list in completion order, or ``0``
        when no finished worker produced anything. Workers still running at
        that point are not joined.
        """
        results = [[] for _ in range(len(iList))]
        finished = []  # out lists of workers that returned, in completion order
        done = Condition()

        def worker(item, out):
            try:
                thFind(ars, item, out)
            finally:
                # Record completion even if thFind raised, so the waiter
                # does not sit out the full timeout for a dead worker.
                with done:
                    finished.append(out)
                    done.notify_all()

        workers = [
            Thread(target=worker, args=(item, out))
            for item, out in zip(iList, results)
        ]
        for thread in workers:
            thread.start()

        def first_hit():
            return next((out for out in finished if out), None)

        with done:
            done.wait_for(
                lambda: first_hit() is not None or len(finished) == len(workers),
                timeout=timeout,
            )
            hit = first_hit()
        return 0 if hit is None else hit


def pRun(Cb, list, th=20, fil=()):
    """Run ``Cb(item)`` for every item of ``list`` in a process pool.

    Results are printed in completion order. ``th`` is the maximum number of
    worker processes; ``fil`` is accepted but not used. Returns None.
    """
    with concurrent.futures.ProcessPoolExecutor(max_workers=th) as executor:
        futures = [executor.submit(Cb, t) for t in list]
        # as_completed() yields the given futures in the order they finish.
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    # Every future was consumed above, so nothing is pending when the pool
    # shuts down on leaving the with-block.


def pool(cb, urls, th=20, ve=(), pl="t"):
    """Map ``cb(url, *ve)`` over ``urls`` with a thread or process pool.

    ``pl`` selects the executor: "t" for threads, "p" for processes.
    Returns a list of results in the same order as ``urls``.

    Raises:
        KeyError: if ``pl`` is neither "t" nor "p".
    """
    results = [None] * len(urls)
    executor_cls = {"t": ThreadPoolExecutor, "p": ProcessPoolExecutor}[pl]
    with executor_cls(max_workers=th) as executor:
        index_of = {
            executor.submit(cb, url, *ve): x for x, url in enumerate(urls)
        }
        for future in concurrent.futures.as_completed(index_of):
            results[index_of[future]] = future.result()
    return results


def pro(conn, cb, ars, li):
    """Send every item yielded by ``cb(ars)`` over ``conn`` and print each reply.

    Defined at module level so a child process can resolve it by name.
    ``li`` is not used by the body.
    """
    for item in cb(ars):
        conn.send(item)
        reply = conn.recv()
        print(reply)


def main():  # incremental search
    """Experimental driver: start one ``pro`` process per character of "inc".

    Also stores a throttled character generator factory in the global ``pr1``.
    """
    global pr1

    def timeo(duration=0.2):
        def fi(te="something"):
            for ch in te:
                time.sleep(duration)
                yield ch
                newflag = 0
                if newflag:  # cancel this thread
                    return 2
            return 1
        return fi

    pr1 = timeo()

    ps = []
    for ch in "inc":
        pare, child = mlti.Pipe()
        # NOTE(review): 1234 is a placeholder for the callback (a local
        # callback cannot be pickled); pro() will fail when it calls it.
        ps.append(
            (mlti.Process(target=pro, args=(child, 1234, ch * 9, None)), pare, child)
        )
    for p in ps:
        p[0].start()
    time.sleep(0.5)


if __name__ == '__main__':
    # main()
    from multiprocessing import shared_memory
    import mmap

    # NOTE(review): the third positional argument is a tag name on Windows
    # but an integer flags value on Unix - presumably Windows-only; confirm.
    m = mmap.mmap(-1, 3, "fil")
    print(m.tell(), m.read())