apple / th.py
t5
apple
5e4f92f
# -*- coding: utf-8 -*-
from threading import Thread,Lock,Event,Condition
import multiprocessing as mlti
class orth:
    """Namespace of small helpers for fanning work out over threads.

    Every helper except ``runi`` takes no instance and is meant to be called
    on the class itself, e.g. ``orth.thrun(...)``.
    """

    # Serialises print() calls made through thpr.
    lock = Lock()
    # Set every time a threadAny worker returns.
    orEv = Event()

    @staticmethod
    def thpr(*a):
        """Print the tuple of positional arguments followed by "-".

        The print happens while holding ``orth.lock`` so that output from
        several threads is not interleaved.
        """
        with orth.lock:
            print(a, end="-")

    @staticmethod
    def run(cb, args):
        """Start ``cb(*args)`` in a new thread and return the Thread."""
        t = Thread(target=cb, args=args)
        t.start()
        return t

    def runi(sl, cb, data, mod, vl):
        """Start ``mod`` threads, each running ``cb(vl, x, data, [])``.

        ``x`` is the thread's index in ``range(mod)`` and the last argument
        is a fresh list per thread.  The started threads are returned
        without being joined.  ``sl`` is unused.
        """
        threads = [
            Thread(target=cb, args=(vl, x, data, []))
            for x in range(mod)
        ]
        for worker in threads:
            worker.start()
        return threads

    @staticmethod
    def thrun(thFind, iList, nm, ars):
        """Process ``iList`` in contiguous chunks, one thread per chunk.

        Each worker is called as ``thFind(ars, (start, stop), iList, out)``
        where ``(start, stop)`` is the half-open index range of its chunk
        and ``out`` is a list private to that worker.

        The chunk size is ``len(iList) // nm + 1``, so fewer than ``nm``
        workers may be started when the data runs out early.  All workers
        are joined before returning.

        Returns the list of per-worker ``out`` lists, in chunk order.
        Raises ZeroDivisionError when ``nm`` is 0.
        """
        total = len(iList)
        chunk = total // nm + 1
        letlist = []
        workers = []
        for i in range(nm):
            start = chunk * i
            stop = min(chunk * (i + 1), total)
            bucket = []
            letlist.append(bucket)
            workers.append(
                Thread(target=thFind, args=(ars, (start, stop), iList, bucket))
            )
            if stop == total:
                # Everything is covered; no further workers are needed.
                break
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        return letlist

    @staticmethod
    def threadAny(thFind, iList, ars):
        """Run one thread per item and return the first non-empty result.

        Each worker is called as ``thFind(ars, item, out)`` with a list
        ``out`` private to that worker.  Whenever a worker finishes, the
        finished workers are scanned in input order and the first
        non-empty ``out`` list is returned; remaining workers keep running.

        Returns 0 when no worker finishes within 5 seconds of the previous
        one, when every worker finishes without producing anything, or
        when ``iList`` is empty.
        """
        letlist = [[] for _ in iList]
        finished = [False] * len(letlist)
        # A per-call condition: completions cannot be lost between a
        # clear() and a wait(), and concurrent calls do not interfere.
        finished_cond = Condition()

        def tracked(idx):
            def runner(*a):
                try:
                    thFind(*a)
                finally:
                    with finished_cond:
                        finished[idx] = True
                        finished_cond.notify_all()
                    # Keep signalling the shared class-level event too.
                    orth.orEv.set()
            return runner

        workers = [
            Thread(target=tracked(idx), args=(ars, item, letlist[idx]))
            for idx, item in enumerate(iList)
        ]
        for worker in workers:
            worker.start()

        reported = 0
        with finished_cond:
            while reported < len(workers):
                progressed = finished_cond.wait_for(
                    lambda seen=reported: sum(finished) > seen, timeout=5
                )
                if not progressed:
                    return 0
                reported = sum(finished)
                for idx, bucket in enumerate(letlist):
                    if finished[idx] and bucket:
                        return bucket
        return 0
import concurrent
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def pRun(Cb,list,th=20,fil=()):
    """Run ``Cb(item)`` for every item of ``list`` in a process pool.

    Results are printed one per line in completion order, which is not
    necessarily input order.  Nothing is returned.

    Cb   -- callable taking one item; must be picklable for the pool.
    list -- iterable of items to submit.
    th   -- maximum number of worker processes.
    fil  -- unused.

    An exception raised by ``Cb`` is re-raised here when its result is
    collected.
    """
    # `th` is the worker count; the pool is closed even if a task raises.
    with concurrent.futures.ProcessPoolExecutor(max_workers=th) as executor:
        # Submitting starts the work immediately.
        futures = [executor.submit(Cb, t) for t in list]
        # as_completed() yields the futures in the order they finish and
        # blocks until the next one is ready.
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
#thpool
def pool(cb, urls, th=20, ve=(), pl="t"):
    """Map ``cb`` over ``urls`` with an executor, keeping input order.

    Each item is submitted as ``cb(url, *ve)``.  ``pl`` chooses the
    executor: "t" for a thread pool, "p" for a process pool; ``th`` is the
    maximum number of workers.

    Returns a list whose element ``i`` is the result for ``urls[i]``.
    """
    results = [None] * len(urls)
    executor_cls = {"t": ThreadPoolExecutor, "p": ProcessPoolExecutor}[pl]
    with executor_cls(max_workers=th) as executor:
        # Remember which input position each future belongs to.
        position_of = {}
        for position, url in enumerate(urls):
            position_of[executor.submit(cb, url, *ve)] = position
        for finished in concurrent.futures.as_completed(position_of):
            results[position_of[finished]] = finished.result()
    return results
##print(thpe(lambda ab,cd:ab+"23",["A","B"],23,["CD"]))
def main():#incremental search
    """Experiment: start one child process per letter of "inc", each wired to a Pipe.

    Calling this rebinds the module-level names ``pro`` and ``pr1``.
    Nothing is returned.
    """
    import time
    def timeo(duration=0.2):
        # Factory returning a generator function that yields the items of
        # its argument, sleeping `duration` seconds before each one.
        def fi(te="something"):
            for j in te:
                time.sleep(duration)
                yield j
                # newflag is always 0 here, so this early exit is never taken.
                newflag=0
                if(newflag):#cancel this thread
                    return 2
            return 1
        return fi
    # pro and pr1 are created as module globals rather than locals.
    # presumably so the child process can look `pro` up by name - TODO confirm
    global pro,pr1;
    def pro(conn,cb,ars,li):
        # For every item produced by cb(ars): send it over conn, then block
        # until a reply arrives and print that reply.  `li` is unused.
        for j in cb(ars):
            conn.send(j)
            ri=conn.recv()
            print(ri)
    pr1=timeo()
    # ps collects (process, parent_end, child_end) triples.
    ps=[]
    for j in "inc":
        pare, child = mlti.Pipe()
        # NOTE(review): cb is passed as the integer 1234, so `cb(ars)` inside
        # pro will raise TypeError in the child; pr1 looks like the intended
        # callback - confirm.
        ps.append(( mlti.Process(target=pro, args=(child,1234,(j*9),None)),pare,child))
        # can't pickle a local callback
    for p in ps:
        p[0].start()
        time.sleep(0.5)
    # NOTE(review): nothing here sends on the parent ends or joins the
    # processes, so a child that reached conn.recv() would block - confirm.
    ## print pare.recv() # prints "[23, None, 'hello']"
    ## for p in ps: jw(p,)
if __name__ == '__main__':
    # Scratch entry point: map three bytes of anonymous memory and print
    # the current position and contents.
    ## main()
    import mmap
    import sys
    from multiprocessing import shared_memory
    ## m=mlti.shared_memory.SharedMemory("fil",True,1)
    ## m=mlti.shared_memory.SharedMemory("fil",False,1)
    ## print(m)
    if sys.platform == "win32":
        # Tag names exist only on Windows; pass it by keyword so it can
        # never be mistaken for the POSIX `flags` argument.
        mapping = mmap.mmap(-1, 3, tagname="fil")
    else:
        mapping = mmap.mmap(-1, 3)
    # The context manager closes the mapping once it has been printed.
    with mapping as m:
        print(m.tell(), m.read())